Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Scope
    • BulkData and BulkDataNT Architecture
  • Duration: 10 minutes

Hands On

Since the available implementation is no longer maintained it doesn't make sense to exercise the hands-on. Still, for informative purposes, example code is provided.

Sender Component

Code Block
languagecpp
titleSenderExample.idl
linenumberstrue
collapsetrue
#ifndef _SENDER_EXAMPLE_IDL_
#define _SENDER_EXAMPLE_IDL_

#pragma prefix "alma"
#include <bulkDataSender.idl>

module test {
    interface SenderExample : bulkdata::BulkDataSender {
    };
};

#endif
Code Block
languagecpp
titleSenderExampleImpl.h
linenumberstrue
collapsetrue
#ifndef _SENDER_EXAMPLE_IMPL_H
#define _SENDER_EXAMPLE_IMPL_H
#include "SenderExampleS.h"
#include "bulkDataSenderImpl.h"

class SenderExampleImpl : public virtual BulkDataSenderDefaultImpl, public virtual POA_test::SenderExample {
  public:
    SenderExample(const ACE_CString& name, maci::ContainerServices* containerServices);
    virtual ~SenderExample();
    virtual void startSend();
    virtual void paceData();
    virtual void stopSend();
};
#endif
Code Block
languagecpp
titleSenderExampleImpl.cpp
linenumberstrue
collapsetrue
#include "SenderExampleImpl.h"

using namespace ACSBulkDataError;

SenderExampleImpl::SenderExampleImpl(const ACE_CString& name, maci::ContainerServices* containerServices) : BulkDataSenderDefaultImpl(name,containerServices) {
}

SenderExampleImpl::~SenderExampleImpl() {
}

void BulkDataSenderPerfImpl::startSend()
{
    int size;
    size = 10;
    try {
        ACE_Message_Block *mb;
        mb = new ACE_Message_Block(size);
        for (CORBA::Long j = 0; j < (size-1); j++) {
            *mb->wr_ptr()='p';
            mb->wr_ptr(sizeof(char));
        }
        *mb->wr_ptr()='\0';
        mb->wr_ptr(sizeof(char));

        CORBA::ULong flowNumber = 1;
        getSender()->startSend(flowNumber, mb);

        mb->release();
    } catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::startSend UNKNOWN exception"));
        AVStartSendErrorExImpl err = AVStartSendErrorExImpl(__FILE__,__LINE__,"SenderExampleImpl::startSend");
        throw err.getAVStartSendErrorEx();
    }
}
void BulkDataSenderPerfImpl::paceData() {
    int size;
    CORBA::ULong flowNumber;
    size = 30000000;
    try {
        ACE_Message_Block *mb;
        mb = new ACE_Message_Block(size);
        for (CORBA::Long j = 0; j < (size-1); j++) {
            *mb->wr_ptr()='d';
            mb->wr_ptr(sizeof(char));
        }
        *mb->wr_ptr()='\0';
        mb->wr_ptr(sizeof(char));

        flowNumber = 1;
        getSender()->sendData(flowNumber, mb);

        mb->release();
    }
    catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::paceData UNKNOWN exception"));
        AVPaceDataErrorExImpl err = AVPaceDataErrorExImpl(__FILE__,__LINE__,"SenderExamplePerfImpl::paceData");
        throw err.getAVPaceDataErrorEx();
    }
}
void BulkDataSenderPerfImpl::stopSend()
{
    CORBA::ULong flowNumber;
    try {
        flowNumber = 1;
        getSender()->stopSend(flowNumber);
    } catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::stopSend UNKNOWN exception"));
        AVStopSendErrorExImpl err = AVStopSendErrorExImpl(__FILE__,__LINE__,"SenderExamplePerfImpl::stopSend");
        throw err.getAVStopSendErrorEx();
    }
}

/* --------------- [ MACI DLL support functions ] -----------------*/
#include <maciACSComponentDefines.h>
MACI_DLL_SUPPORT_FUNCTIONS(SenderExampleImpl)
/* ----------------------------------------------------------------*/

ReceiverComponent

Code Block
languagecpp
titleReceiverExample.idl
linenumberstrue
collapsetrue
#ifndef _RECEIVER_EXAMPLE_IDL_
#define _RECEIVER_EXAMPLE_IDL_

#pragma prefix "alma"

#include <bulkDataReceiver.idl>

module test {
    interface ReceiverExample : bulkdata::BulkDataReceiver {
    };
};

#endif
Code Block
languagecpp
titleReceiverExampleImpl.h
linenumberstrue
collapsetrue
#ifndef _RECEIVER_EXAMPLE_IMPL_H
#define _RECEIVER_EXAMPLE_IMPL_H

#include "ReceiverExampleS.h"
#include "bulkDataReceiverImpl.h"

template<class TCallback>
class ReceiverExampleImpl : public virtual BulkDataReceiverImpl<TCallback>, public virtual POA_bulkdata::ReceiverExample {
  public:
    BulkDataReceiver1PerfImpl(const ACE_CString& name,maci::ContainerServices* containerServices);
    virtual ~BulkDataReceiver1PerfImpl();
    void cleanUp();
};

#include "ReceiverExampleImpl.i"

#endif
Code Block
languagecpp
titleReceiverExampleImpl.i
linenumberstrue
collapsetrue
template<class TCallback>
ReceiverExampleImpl<TCallback>::ReceiverExampleImpl(const ACE_CString& name, maci::ContainerServices* containerServices) : BulkDataReceiverImpl<TCallback>(name,containerServices) {
    ACS_TRACE("ReceiverExampleImpl<>::ReceiverExampleImpl");
}

template<class TCallback>
ReceiverExampleImpl<TCallback>::~ReceiverExampleImpl() {
    ACS_TRACE("ReceiverExampleImpl<>::~ReceiverExampleImpl");
}

template<class TCallback>
void ReceiverExampleImpl<TCallback>::cleanUp()
{
    ACS_TRACE("ReceiverExampleImpl<>::cleanUp");
}
Code Block
languagecpp
titleReceiverExampleImpl.cpp
linenumberstrue
collapsetrue
#include "ReceiverExampleImpl.h"
#include "ReceiverExampleCb.h"


/* --------------- [ MACI DLL support functions ] -----------------*/
#include <maciACSComponentDefines.h>
MACI_DLL_SUPPORT_FUNCTIONS(ReceiverExampleImpl<ReceiverExampleCb>)
/* ----------------------------------------------------------------*/
Code Block
languagecpp
titleReceiverExampleCb.h
linenumberstrue
collapsetrue
#ifndef _RECEIVER_EXAMPLE_CB_H
#define _RECIEVER_EXAMPLE_CB_H

#include "bulkDataCallback.h"
#include "ace/High_Res_Timer.h"

class ReceiverExampleCb : public BulkDataCallback {
  public:
    ReceiverExampleCb();
    virtual ~ReceiverExampleCb();
    virtual int cbStart(ACE_Message_Block * userParam_p = 0);
    virtual int cbReceive(ACE_Message_Block * frame_p);
    virtual int cbStop();
  private:
    void dump_stats();
    double stats_avg();
    double sum_frame();

    ACE_Time_Value start_time;
    std::vector<double> dstats;
    int start;
    long count;
    long size;
    CORBA::ULong count1_m;
};

#endif

...

languagecpp
titleReceiverExampleCb.cpp
linenumberstrue
collapsetrue

...

View file
nameBulkData.pdf
height250

...

Discussion

  • How important is bulk data transfer in the ACS community?
  • TAO A/V BulkData Limitations
    • Lack of maintenance
    • Long Standing Bug
    • No Multicast
    • Only Implemented for Components (No standalone clients)
  • RTI DDS Limitations
    • Licensing Costs
    • Complex Tuning (Though very flexible)
  • ZeroMQ Implementation by CTA/UTFSM Agreement
  • Other ideas or topics?