Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Because the BulkDataNT version is based on proprietary software, it is not part of the ACS distribution released to the community, and as such it is not an option for most ACS users. The old implementation, based on TAO A/V, has not been maintained in recent years, and even when it was used in production it had robustness problems that were never fixed.

Presentation

  • Scope
    • BulkData and BulkDataNT Architecture
  • Duration: 10 minutes

Hands On

Since the available implementation is no longer maintained, it does not make sense to work through a hands-on exercise. Still, for informational purposes, example code is provided.

Sender Component

Code Block
languagecpp
titleSenderExample.idl
linenumberstrue
collapsetrue
#ifndef _SENDER_EXAMPLE_IDL_
#define _SENDER_EXAMPLE_IDL_

#pragma prefix "alma"
#include <bulkDataSender.idl>

// Module that holds the example sender component interface.
module test {
    // Example sender interface: it declares no operations of its own and
    // inherits everything from bulkdata::BulkDataSender.
    interface SenderExample : bulkdata::BulkDataSender {
    };
};

#endif
Code Block
languagecpp
titleSenderExampleImpl.h
linenumberstrue
collapsetrue
#ifndef _SENDER_EXAMPLE_IMPL_H
#define _SENDER_EXAMPLE_IMPL_H
#include "SenderExampleS.h"
#include "bulkDataSenderImpl.h"

class SenderExampleImpl : public virtual BulkDataSenderDefaultImpl, public virtual POA_test::SenderExample {
  public:
    SenderExample(const ACE_CString& name, maci::ContainerServices* containerServices);
    virtual ~SenderExample();
    virtual void startSend();
    virtual void paceData();
    virtual void stopSend();
};
#endif
Code Block
languagecpp
titleSenderExampleImpl.cpp
linenumberstrue
collapsetrue
#include "SenderExampleImpl.h"

using namespace ACSBulkDataError;

/**
 * Builds the sender component; both arguments are handed straight to the
 * BulkDataSenderDefaultImpl base class.
 */
SenderExampleImpl::SenderExampleImpl(const ACE_CString& name,
                                     maci::ContainerServices* containerServices)
    : BulkDataSenderDefaultImpl(name, containerServices)
{
}

/** Empty destructor: this class performs no cleanup of its own. */
SenderExampleImpl::~SenderExampleImpl()
{
}

void BulkDataSenderPerfImpl::startSend()
{
    int size;
    size = 10;
    try {
        ACE_Message_Block *mb;
        mb = new ACE_Message_Block(size);
        for (CORBA::Long j = 0; j < (size-1); j++) {
            *mb->wr_ptr()='p';
            mb->wr_ptr(sizeof(char));
        }
        *mb->wr_ptr()='\0';
        mb->wr_ptr(sizeof(char));

        CORBA::ULong flowNumber = 1;
        getSender()->startSend(flowNumber, mb);

        mb->release();
    } catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::startSend UNKNOWN exception"));
        AVStartSendErrorExImpl err = AVStartSendErrorExImpl(__FILE__,__LINE__,"SenderExampleImpl::startSend");
        throw err.getAVStartSendErrorEx();
    }
}
void BulkDataSenderPerfImpl::paceData() {
    int size;
    CORBA::ULong flowNumber;
    size = 30000000;
    try {
        ACE_Message_Block *mb;
        mb = new ACE_Message_Block(size);
        for (CORBA::Long j = 0; j < (size-1); j++) {
            *mb->wr_ptr()='d';
            mb->wr_ptr(sizeof(char));
        }
        *mb->wr_ptr()='\0';
        mb->wr_ptr(sizeof(char));

        flowNumber = 1;
        getSender()->sendData(flowNumber, mb);

        mb->release();
    }
    catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::paceData UNKNOWN exception"));
        AVPaceDataErrorExImpl err = AVPaceDataErrorExImpl(__FILE__,__LINE__,"SenderExamplePerfImpl::paceData");
        throw err.getAVPaceDataErrorEx();
    }
}
void BulkDataSenderPerfImpl::stopSend()
{
    CORBA::ULong flowNumber;
    try {
        flowNumber = 1;
        getSender()->stopSend(flowNumber);
    } catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::stopSend UNKNOWN exception"));
        AVStopSendErrorExImpl err = AVStopSendErrorExImpl(__FILE__,__LINE__,"SenderExamplePerfImpl::stopSend");
        throw err.getAVStopSendErrorEx();
    }
}

/* --------------- [ MACI DLL support functions ] -----------------*/
#include <maciACSComponentDefines.h>
MACI_DLL_SUPPORT_FUNCTIONS(SenderExampleImpl)
/* ----------------------------------------------------------------*/

Receiver Component

Code Block
languagecpp
titleReceiverExample.idl
linenumberstrue
collapsetrue
#ifndef _RECEIVER_EXAMPLE_IDL_
#define _RECEIVER_EXAMPLE_IDL_

#pragma prefix "alma"

#include <bulkDataReceiver.idl>

// Module that holds the example receiver component interface.
module test {
    // Example receiver interface: it declares no operations of its own and
    // inherits everything from bulkdata::BulkDataReceiver.
    interface ReceiverExample : bulkdata::BulkDataReceiver {
    };
};

#endif
Code Block
languagecpp
titleReceiverExampleImpl.h
linenumberstrue
collapsetrue
#ifndef _RECEIVER_EXAMPLE_IMPL_H
#define _RECEIVER_EXAMPLE_IMPL_H

#include "ReceiverExampleS.h"
#include "bulkDataReceiverImpl.h"

/**
 * Example bulk data receiver component.
 *
 * Implements the test::ReceiverExample IDL interface on top of
 * BulkDataReceiverImpl, parameterised on the callback class that handles
 * the received data.
 *
 * @tparam TCallback callback type forwarded to BulkDataReceiverImpl
 */
template<class TCallback>
class ReceiverExampleImpl : public virtual BulkDataReceiverImpl<TCallback>, public virtual POA_test::ReceiverExample {
  public:
    /**
     * @param name               component name, passed on to BulkDataReceiverImpl
     * @param containerServices  container services, passed on to BulkDataReceiverImpl
     */
    ReceiverExampleImpl(const ACE_CString& name, maci::ContainerServices* containerServices);
    virtual ~ReceiverExampleImpl();

    /** Component clean-up hook; the example implementation only emits a trace. */
    void cleanUp();
};

#include "ReceiverExampleImpl.i"

#endif
Code Block
languagecpp
titleReceiverExampleImpl.i
linenumberstrue
collapsetrue
/**
 * Builds the receiver component: hands both arguments to the
 * BulkDataReceiverImpl<TCallback> base class and emits a trace.
 */
template <class TCallback>
ReceiverExampleImpl<TCallback>::ReceiverExampleImpl(
        const ACE_CString& name,
        maci::ContainerServices* containerServices)
    : BulkDataReceiverImpl<TCallback>(name, containerServices)
{
    ACS_TRACE("ReceiverExampleImpl<>::ReceiverExampleImpl");
}

/** Destructor: only emits a trace. */
template <class TCallback>
ReceiverExampleImpl<TCallback>::~ReceiverExampleImpl()
{
    ACS_TRACE("ReceiverExampleImpl<>::~ReceiverExampleImpl");
}

/** Clean-up hook: this example only emits a trace. */
template <class TCallback>
void ReceiverExampleImpl<TCallback>::cleanUp() {
    ACS_TRACE("ReceiverExampleImpl<>::cleanUp");
}
Code Block
languagecpp
titleReceiverExampleImpl.cpp
linenumberstrue
collapsetrue
#include "ReceiverExampleImpl.h"
#include "ReceiverExampleCb.h"


/* --------------- [ MACI DLL support functions ] -----------------*/
#include <maciACSComponentDefines.h>
MACI_DLL_SUPPORT_FUNCTIONS(ReceiverExampleImpl<ReceiverExampleCb>)
/* ----------------------------------------------------------------*/
Code Block
languagecpp
titleReceiverExampleCb.h
linenumberstrue
collapsetrue
#ifndef _RECEIVER_EXAMPLE_CB_H
#define _RECEIVER_EXAMPLE_CB_H

#include <vector>

#include "bulkDataCallback.h"
#include "ace/High_Res_Timer.h"

/**
 * Receiver callback for the example: it measures the time between
 * consecutive frames and the total number of bytes received, and writes
 * the statistics to a file when the transfer stops.
 */
class ReceiverExampleCb : public BulkDataCallback {
  public:
    ReceiverExampleCb();
    virtual ~ReceiverExampleCb();

    /** Resets the counters at the start of a transfer; returns 0. */
    virtual int cbStart(ACE_Message_Block * userParam_p = 0);

    /** Records the arrival time and size of one frame; returns 0. */
    virtual int cbReceive(ACE_Message_Block * frame_p);

    /** Dumps the collected statistics; returns 0. */
    virtual int cbStop();
  private:
    /** Writes the collected statistics to "StatsReceiver1.dat". */
    void dump_stats();

    /** Average of the recorded inter-frame times, in seconds. */
    double stats_avg();

    /** Sum of the recorded inter-frame times, in seconds. */
    double sum_frame();

    ACE_Time_Value start_time;   // arrival time of the previous frame
    std::vector<double> dstats;  // inter-frame arrival times, in seconds
    int start;                   // non-zero until the first frame has been seen
    long count;                  // number of frames received
    long size;                   // number of bytes received
    CORBA::ULong count1_m;
};

#endif
Code Block
languagecpp
titleReceiverExampleCb.cpp
linenumberstrue
collapsetrue
#include "ReceiverExampleCb.h"

/**
 * Initialises every counter so that cbReceive() and dump_stats() never read
 * an indeterminate value, even if cbStart() was not called first, and
 * pre-allocates room for the timing samples.
 */
ReceiverExampleCb::ReceiverExampleCb()
    : start(1),   // same state cbStart() sets: the first frame only records a timestamp
      count(0),
      size(0),
      count1_m(0)
{
    dstats.reserve(10000);
}

/** Empty destructor: no explicit cleanup is performed here. */
ReceiverExampleCb::~ReceiverExampleCb()
{
}

int BulkDataReceiver1PerfCb::cbStart(ACE_Message_Block * userParam_p) {
    count = 0;
    size = 0;
    start = 1;
    return 0;
}

int BulkDataReceiver1PerfCb::cbReceive(ACE_Message_Block * frame_p) {
    count++;
    double dtime;

    if (start) {
        start_time = ACE_OS::gettimeofday();
        start = 0;
    } else {
        ACE_Time_Value elapsed_time = ACE_OS::gettimeofday() - start_time;
        dtime = elapsed_time.sec() + ( elapsed_time.usec() / 1000000. );

        dstats.push_back(dtime);

        start_time = ACE_OS::gettimeofday();
    }
    size += frame_p->length();
    return 0;
}

int BulkDataReceiver1PerfCb::cbStop() {
    dump_stats();
    return 0;
}

double BulkDataReceiver1PerfCb::stats_avg() {
    double sum = 0;
    for (size_t i = 0; i < dstats.size(); i++) {
        sum += dstats[i];
    }
    return sum/dstats.size();
}

double BulkDataReceiver1PerfCb::sum_frame()
{
    double sum = 0.;
    for (size_t i = 0; i < dstats.size(); i++) {
        sum += dstats[i];
    }
    return sum;
}

void BulkDataReceiver1PerfCb::dump_stats (void) {
    FILE* stats_file = ACE_OS::fopen ("StatsReceiver1.dat", "w");
    if (stats_file == 0) {
        ACE_ERROR ((LM_ERROR, "StatsReceiver1.dat cannot be opened \n"));
    }
    // first dump what the caller has to say.
    ACE_OS::fprintf (stats_file, "Average Inter-Frame Arrival Time = %f sec\n",stats_avg ());
    ACE_OS::fprintf (stats_file, "Amount transmitted = %ld bytes\n",size);
    ACE_OS::fprintf (stats_file, "Total time = %f sec\n",(sum_frame()));
    //ACE_OS::fprintf (stats_file, "Total time = %f sec\n",(sum_frame()/1000000.));
    double tmp = sum_frame();
    if (tmp != 0.) {
        ACE_OS::fprintf (stats_file, "Transfer rate = %f Mbits/sec \n",  (size/tmp)/(1024.*1024.)*8. );
    } else {
        ACE_OS::fprintf (stats_file, "Could not calculate transfer rate. Total amount of time = 0\n");
    }
    ACE_OS::fprintf (stats_file, "Single frame arrival time (sec):\n");
    for (size_t i = 0; i < dstats.size(); i++)
        ACE_OS::fprintf (stats_file, "%f\n",dstats[i]);
    ACE_OS::fclose (stats_file);
    ACS_SHORT_LOG((LM_INFO,"Done!"));
}


Discussion

  • How important is bulk data transfer in the ACS community?
  • TAO A/V BulkData Limitations
    • Lack of maintenance
    • Long Standing Bug
    • No Multicast
    • Only Implemented for Components (No standalone clients)
  • RTI DDS Limitations
    • Licensing Costs
    • Complex Tuning (Though very flexible)
  • ZeroMQ Implementation by CTA/UTFSM Agreement
  • Other ideas or topics?