You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 2 Next »

Introduction

ACS provides two mechanisms for bulk data transfer, which are implemented in the BulkData and BulkDataNT modules. The first is the original bulk data implementation and is based on the ACE/TAO Audio/Video Streaming Service. After several years of use, it was superseded at ALMA by BulkDataNT, a new implementation based on the proprietary RTI DDS product, which allows for better tuning and supports multicast to reduce network usage.

As the BulkDataNT version is based on proprietary software, it is not part of the ACS distributed to the community, and as such it is not a candidate for most users of ACS. The old implementation, based on TAO A/V, has not been maintained in recent years and, even when it was used in production, it had robustness problems that were never fixed.

Presentation

  • Scope
    • BulkData and BulkDataNT Architecture
  • Duration: 10 minutes

Hands On

Since the available implementation is no longer maintained, it doesn't make sense to run the hands-on exercise. Still, for informational purposes, example code is provided.

Sender Component

SenderExample.idl
#ifndef _SENDER_EXAMPLE_IDL_
#define _SENDER_EXAMPLE_IDL_

#pragma prefix "alma"
#include <bulkDataSender.idl>

// Module holding the example sender component interface.
module test {
    // SenderExample declares no operations of its own; everything it offers
    // is inherited from bulkdata::BulkDataSender.
    interface SenderExample : bulkdata::BulkDataSender {
    };
};

#endif
SenderExampleImpl.h
#ifndef _SENDER_EXAMPLE_IMPL_H
#define _SENDER_EXAMPLE_IMPL_H
#include "SenderExampleS.h"
#include "bulkDataSenderImpl.h"

class SenderExampleImpl : public virtual BulkDataSenderDefaultImpl, public virtual POA_test::SenderExample {
  public:
    SenderExample(const ACE_CString& name, maci::ContainerServices* containerServices);
    virtual ~SenderExample();
    virtual void startSend();
    virtual void paceData();
    virtual void stopSend();
};
#endif
SenderExampleImpl.cpp
#include "SenderExampleImpl.h"

using namespace ACSBulkDataError;

/**
 * Builds the component by handing both arguments straight to the
 * BulkDataSenderDefaultImpl base; no further setup is done here.
 */
SenderExampleImpl::SenderExampleImpl(const ACE_CString& name, maci::ContainerServices* containerServices)
    : BulkDataSenderDefaultImpl(name, containerServices)
{
}

/** Destructor; the body is intentionally empty. */
SenderExampleImpl::~SenderExampleImpl()
{
}

void BulkDataSenderPerfImpl::startSend()
{
    int size;
    size = 10;
    try {
        ACE_Message_Block *mb;
        mb = new ACE_Message_Block(size);
        for (CORBA::Long j = 0; j < (size-1); j++) {
            *mb->wr_ptr()='p';
            mb->wr_ptr(sizeof(char));
        }
        *mb->wr_ptr()='\0';
        mb->wr_ptr(sizeof(char));

        CORBA::ULong flowNumber = 1;
        getSender()->startSend(flowNumber, mb);

        mb->release();
    } catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::startSend UNKNOWN exception"));
        AVStartSendErrorExImpl err = AVStartSendErrorExImpl(__FILE__,__LINE__,"SenderExampleImpl::startSend");
        throw err.getAVStartSendErrorEx();
    }
}
void BulkDataSenderPerfImpl::paceData() {
    int size;
    CORBA::ULong flowNumber;
    size = 30000000;
    try {
        ACE_Message_Block *mb;
        mb = new ACE_Message_Block(size);
        for (CORBA::Long j = 0; j < (size-1); j++) {
            *mb->wr_ptr()='d';
            mb->wr_ptr(sizeof(char));
        }
        *mb->wr_ptr()='\0';
        mb->wr_ptr(sizeof(char));

        flowNumber = 1;
        getSender()->sendData(flowNumber, mb);

        mb->release();
    }
    catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::paceData UNKNOWN exception"));
        AVPaceDataErrorExImpl err = AVPaceDataErrorExImpl(__FILE__,__LINE__,"SenderExamplePerfImpl::paceData");
        throw err.getAVPaceDataErrorEx();
    }
}
void BulkDataSenderPerfImpl::stopSend()
{
    CORBA::ULong flowNumber;
    try {
        flowNumber = 1;
        getSender()->stopSend(flowNumber);
    } catch (...) {
        ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::stopSend UNKNOWN exception"));
        AVStopSendErrorExImpl err = AVStopSendErrorExImpl(__FILE__,__LINE__,"SenderExamplePerfImpl::stopSend");
        throw err.getAVStopSendErrorEx();
    }
}

/* --------------- [ MACI DLL support functions ] -----------------*/
// NOTE(review): the macro is defined in maciACSComponentDefines.h; it
// presumably emits the library entry points used to instantiate
// SenderExampleImpl -- confirm against that header.
#include <maciACSComponentDefines.h>
MACI_DLL_SUPPORT_FUNCTIONS(SenderExampleImpl)
/* ----------------------------------------------------------------*/

Receiver Component

ReceiverExample.idl
#ifndef _RECEIVER_EXAMPLE_IDL_
#define _RECEIVER_EXAMPLE_IDL_

#pragma prefix "alma"

#include <bulkDataReceiver.idl>

// Module holding the example receiver component interface.
module test {
    // ReceiverExample declares no operations of its own; everything it offers
    // is inherited from bulkdata::BulkDataReceiver.
    interface ReceiverExample : bulkdata::BulkDataReceiver {
    };
};

#endif
ReceiverExampleImpl.h
#ifndef _RECEIVER_EXAMPLE_IMPL_H
#define _RECEIVER_EXAMPLE_IMPL_H

#include "ReceiverExampleS.h"
#include "bulkDataReceiverImpl.h"

/**
 * Example bulk data receiver component.
 *
 * Implements the test::ReceiverExample IDL interface (via the
 * POA_test::ReceiverExample skeleton) on top of
 * BulkDataReceiverImpl<TCallback>.
 *
 * @tparam TCallback  callback class handed to BulkDataReceiverImpl
 */
template<class TCallback>
class ReceiverExampleImpl : public virtual BulkDataReceiverImpl<TCallback>, public virtual POA_test::ReceiverExample {
  public:
    /**
     * @param name               component name, forwarded to the base class
     * @param containerServices  container services, forwarded to the base class
     */
    ReceiverExampleImpl(const ACE_CString& name,maci::ContainerServices* containerServices);

    virtual ~ReceiverExampleImpl();

    /** Component clean-up hook. */
    void cleanUp();
};

#include "ReceiverExampleImpl.i"

#endif
ReceiverExampleImpl.i
template<class TCallback>
// Builds the component by forwarding both arguments to the
// BulkDataReceiverImpl<TCallback> base, then emits a trace message.
ReceiverExampleImpl<TCallback>::ReceiverExampleImpl(const ACE_CString& name, maci::ContainerServices* containerServices)
    : BulkDataReceiverImpl<TCallback>(name, containerServices)
{
    ACS_TRACE("ReceiverExampleImpl<>::ReceiverExampleImpl");
}

/** Destructor; only emits a trace message. */
template<class TCallback>
ReceiverExampleImpl<TCallback>::~ReceiverExampleImpl()
{
    ACS_TRACE("ReceiverExampleImpl<>::~ReceiverExampleImpl");
}

/** Clean-up hook; this example only emits a trace message. */
template<class TCallback>
void ReceiverExampleImpl<TCallback>::cleanUp() {
    ACS_TRACE("ReceiverExampleImpl<>::cleanUp");
}
ReceiverExampleImpl.cpp
#include "ReceiverExampleImpl.h"
#include "ReceiverExampleCb.h"


/* --------------- [ MACI DLL support functions ] -----------------*/
// NOTE(review): the macro is defined in maciACSComponentDefines.h; it
// presumably emits the library entry points used to instantiate the receiver
// with ReceiverExampleCb as its callback -- confirm against that header.
#include <maciACSComponentDefines.h>
MACI_DLL_SUPPORT_FUNCTIONS(ReceiverExampleImpl<ReceiverExampleCb>)
/* ----------------------------------------------------------------*/
ReceiverExampleCb.h
// Include guard: the tested and the defined macro must be the same name,
// otherwise a second inclusion redefines ReceiverExampleCb.
#ifndef _RECEIVER_EXAMPLE_CB_H
#define _RECEIVER_EXAMPLE_CB_H

#include <vector>

#include "bulkDataCallback.h"
#include "ace/High_Res_Timer.h"

/**
 * Receiver callback that records inter-frame arrival times and the number of
 * bytes received, and writes a statistics report when the transfer stops.
 */
class ReceiverExampleCb : public BulkDataCallback
{
  public:
    ReceiverExampleCb();
    virtual ~ReceiverExampleCb();

    /** Resets the per-transfer counters. Returns 0. */
    virtual int cbStart(ACE_Message_Block* userParam_p = 0);

    /** Records timing and size for one received frame. Returns 0. */
    virtual int cbReceive(ACE_Message_Block* frame_p);

    /** Writes the statistics report. Returns 0. */
    virtual int cbStop();

  private:
    /** Writes the collected statistics to "StatsReceiver1.dat". */
    void dump_stats();

    /** Mean of the recorded inter-frame times. */
    double stats_avg();

    /** Sum of the recorded inter-frame times. */
    double sum_frame();

    ACE_Time_Value start_time;   // time stamp taken at the previous frame
    std::vector<double> dstats;  // inter-frame arrival times, in seconds
    int start;                   // non-zero until the first frame has been seen
    long count;                  // frames received since cbStart()
    long size;                   // bytes received since cbStart()
    CORBA::ULong count1_m;       // set to 0 by the constructor
};

#endif
ReceiverExampleCb.cpp
#include "ReceiverExampleCb.h"

/**
 * Initialises every counter to the same state cbStart() establishes, so that
 * cbReceive() never reads indeterminate values even if it runs before
 * cbStart(). Also pre-allocates room for 10000 timing samples.
 */
ReceiverExampleCb::ReceiverExampleCb() : start(1), count(0), size(0), count1_m(0) {
    dstats.reserve(10000);
}

/** Destructor; the body is intentionally empty. */
ReceiverExampleCb::~ReceiverExampleCb()
{
}

int BulkDataReceiver1PerfCb::cbStart(ACE_Message_Block * userParam_p) {
    count = 0;
    size = 0;
    start = 1;
    return 0;
}

int BulkDataReceiver1PerfCb::cbReceive(ACE_Message_Block * frame_p) {
    count++;
    double dtime;

    if (start) {
        start_time = ACE_OS::gettimeofday();
        start = 0;
    } else {
        ACE_Time_Value elapsed_time = ACE_OS::gettimeofday() - start_time;
        dtime = elapsed_time.sec() + ( elapsed_time.usec() / 1000000. );

        dstats.push_back(dtime);

        start_time = ACE_OS::gettimeofday();
    }
    size += frame_p->length();
    return 0;
}

int BulkDataReceiver1PerfCb::cbStop() {
    dump_stats();
    return 0;
}

double BulkDataReceiver1PerfCb::stats_avg() {
    double sum = 0;
    for (size_t i = 0; i < dstats.size(); i++) {
        sum += dstats[i];
    }
    return sum/dstats.size();
}

double BulkDataReceiver1PerfCb::sum_frame()
{
    double sum = 0.;
    for (size_t i = 0; i < dstats.size(); i++) {
        sum += dstats[i];
    }
    return sum;
}

void BulkDataReceiver1PerfCb::dump_stats (void) {
    FILE* stats_file = ACE_OS::fopen ("StatsReceiver1.dat", "w");
    if (stats_file == 0) {
        ACE_ERROR ((LM_ERROR, "StatsReceiver1.dat cannot be opened \n"));
    }
    // first dump what the caller has to say.
    ACE_OS::fprintf (stats_file, "Average Inter-Frame Arrival Time = %f sec\n",stats_avg ());
    ACE_OS::fprintf (stats_file, "Amount transmitted = %ld bytes\n",size);
    ACE_OS::fprintf (stats_file, "Total time = %f sec\n",(sum_frame()));
    //ACE_OS::fprintf (stats_file, "Total time = %f sec\n",(sum_frame()/1000000.));
    double tmp = sum_frame();
    if (tmp != 0.) {
        ACE_OS::fprintf (stats_file, "Transfer rate = %f Mbits/sec \n",  (size/tmp)/(1024.*1024.)*8. );
    } else {
        ACE_OS::fprintf (stats_file, "Could not calculate transfer rate. Total amount of time = 0\n");
    }
    ACE_OS::fprintf (stats_file, "Single frame arrival time (sec):\n");
    for (size_t i = 0; i < dstats.size(); i++)
        ACE_OS::fprintf (stats_file, "%f\n",dstats[i]);
    ACE_OS::fclose (stats_file);
    ACS_SHORT_LOG((LM_INFO,"Done!"));
}


Discussion

  • How important is bulk data transfer in the ACS community?
  • TAO A/V BulkData Limitations
    • Lack of maintenance
    • Long Standing Bug
    • No Multicast
    • Only Implemented for Components (No standalone clients)
  • RTI DDS Limitations
    • Licensing Costs
    • Complex Tuning (Though very flexible)
  • ZeroMQ Implementation by CTA/UTFSM Agreement
  • Other ideas or topics?
  • No labels