...
As the BulkDataNT version is based in the proprietary software, is not part of the ACS distributed to the community and as such it's not a candidate for most users of ACS. The old implementation based on TAO A/V, has not been maintained in the later years and even when used in production, it had robustness problem that where never fixed.
Presentation
- Scope
- BulkData and BulkDataNT Architecture
- Duration: 10 minutes
Hands On
Since the available implementation is no longer maintained it doesn't make sense to exercise the hands-on. Still, for informative purposes, example code is provided.
Sender Component
Code Block |
---|
language | cpp |
---|
title | SenderExample.idl |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#ifndef _SENDER_EXAMPLE_IDL_
#define _SENDER_EXAMPLE_IDL_
#pragma prefix "alma"
#include <bulkDataSender.idl>
module test {
interface SenderExample : bulkdata::BulkDataSender {
};
};
#endif |
Code Block |
---|
language | cpp |
---|
title | SenderExampleImpl.h |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#ifndef _SENDER_EXAMPLE_IMPL_H
#define _SENDER_EXAMPLE_IMPL_H
#include "SenderExampleS.h"
#include "bulkDataSenderImpl.h"
class SenderExampleImpl : public virtual BulkDataSenderDefaultImpl, public virtual POA_test::SenderExample {
public:
SenderExample(const ACE_CString& name, maci::ContainerServices* containerServices);
virtual ~SenderExample();
virtual void startSend();
virtual void paceData();
virtual void stopSend();
};
#endif |
Code Block |
---|
language | cpp |
---|
title | SenderExampleImpl.cpp |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#include "SenderExampleImpl.h"
using namespace ACSBulkDataError;
SenderExampleImpl::SenderExampleImpl(const ACE_CString& name, maci::ContainerServices* containerServices) : BulkDataSenderDefaultImpl(name,containerServices) {
}
SenderExampleImpl::~SenderExampleImpl() {
}
void BulkDataSenderPerfImpl::startSend()
{
int size;
size = 10;
try {
ACE_Message_Block *mb;
mb = new ACE_Message_Block(size);
for (CORBA::Long j = 0; j < (size-1); j++) {
*mb->wr_ptr()='p';
mb->wr_ptr(sizeof(char));
}
*mb->wr_ptr()='\0';
mb->wr_ptr(sizeof(char));
CORBA::ULong flowNumber = 1;
getSender()->startSend(flowNumber, mb);
mb->release();
} catch (...) {
ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::startSend UNKNOWN exception"));
AVStartSendErrorExImpl err = AVStartSendErrorExImpl(__FILE__,__LINE__,"SenderExampleImpl::startSend");
throw err.getAVStartSendErrorEx();
}
}
void BulkDataSenderPerfImpl::paceData() {
int size;
CORBA::ULong flowNumber;
size = 30000000;
try {
ACE_Message_Block *mb;
mb = new ACE_Message_Block(size);
for (CORBA::Long j = 0; j < (size-1); j++) {
*mb->wr_ptr()='d';
mb->wr_ptr(sizeof(char));
}
*mb->wr_ptr()='\0';
mb->wr_ptr(sizeof(char));
flowNumber = 1;
getSender()->sendData(flowNumber, mb);
mb->release();
}
catch (...) {
ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::paceData UNKNOWN exception"));
AVPaceDataErrorExImpl err = AVPaceDataErrorExImpl(__FILE__,__LINE__,"SenderExamplePerfImpl::paceData");
throw err.getAVPaceDataErrorEx();
}
}
void BulkDataSenderPerfImpl::stopSend()
{
CORBA::ULong flowNumber;
try {
flowNumber = 1;
getSender()->stopSend(flowNumber);
} catch (...) {
ACS_SHORT_LOG((LM_INFO,"SenderExampleImpl::stopSend UNKNOWN exception"));
AVStopSendErrorExImpl err = AVStopSendErrorExImpl(__FILE__,__LINE__,"SenderExamplePerfImpl::stopSend");
throw err.getAVStopSendErrorEx();
}
}
/* --------------- [ MACI DLL support functions ] -----------------*/
#include <maciACSComponentDefines.h>
MACI_DLL_SUPPORT_FUNCTIONS(SenderExampleImpl)
/* ----------------------------------------------------------------*/ |
ReceiverComponent
Code Block |
---|
language | cpp |
---|
title | ReceiverExample.idl |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#ifndef _RECEIVER_EXAMPLE_IDL_
#define _RECEIVER_EXAMPLE_IDL_
#pragma prefix "alma"
#include <bulkDataReceiver.idl>
module test {
interface ReceiverExample : bulkdata::BulkDataReceiver {
};
};
#endif |
Code Block |
---|
language | cpp |
---|
title | ReceiverExample.h |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#ifndef _RECEIVER_EXAMPLE_IMPL_H
#define _RECEIVER_EXAMPLE_IMPL_H
#include "ReceiverExampleS.h"
#include "bulkDataReceiverImpl.h"
template<class TCallback>
class ReceiverExampleImpl : public virtual BulkDataReceiverImpl<TCallback>, public virtual POA_bulkdata::ReceiverExample {
public:
BulkDataReceiver1PerfImpl(const ACE_CString& name,maci::ContainerServices* containerServices);
virtual ~BulkDataReceiver1PerfImpl();
void cleanUp();
};
#include "ReceiverExampleImpl.i"
#endif |
Code Block |
---|
language | cpp |
---|
title | ReceiverExample.i |
---|
linenumbers | true |
---|
collapse | true |
---|
|
template<class TCallback>
ReceiverExampleImpl<TCallback>::ReceiverExampleImpl(const ACE_CString& name, maci::ContainerServices* containerServices) : BulkDataReceiverImpl<TCallback>(name,containerServices) {
ACS_TRACE("ReceiverExampleImpl<>::ReceiverExampleImpl");
}
template<class TCallback>
ReceiverExampleImpl<TCallback>::~ReceiverExampleImpl() {
ACS_TRACE("ReceiverExampleImpl<>::~ReceiverExampleImpl");
}
template<class TCallback>
void ReceiverExampleImpl<TCallback>::cleanUp()
{
ACS_TRACE("ReceiverExampleImpl<>::cleanUp");
} |
Code Block |
---|
language | cpp |
---|
title | ReceiverExampleImpl.cpp |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#include "ReceiverExampleImpl.h"
#include "ReceiverExampleCb.h"
/* --------------- [ MACI DLL support functions ] -----------------*/
#include <maciACSComponentDefines.h>
MACI_DLL_SUPPORT_FUNCTIONS(ReceiverExampleImpl<ReceiverExampleCb>)
/* ----------------------------------------------------------------*/ |
Code Block |
---|
language | cpp |
---|
title | ReceiverExampleCb.h |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#ifndef _RECEIVER_EXAMPLE_CB_H
#define _RECIEVER_EXAMPLE_CB_H
#include "bulkDataCallback.h"
#include "ace/High_Res_Timer.h"
class ReceiverExampleCb : public BulkDataCallback {
public:
ReceiverExampleCb();
virtual ~ReceiverExampleCb();
virtual int cbStart(ACE_Message_Block * userParam_p = 0);
virtual int cbReceive(ACE_Message_Block * frame_p);
virtual int cbStop();
private:
void dump_stats();
double stats_avg();
double sum_frame();
ACE_Time_Value start_time;
std::vector<double> dstats;
int start;
long count;
long size;
CORBA::ULong count1_m;
};
#endif |
Code Block |
---|
language | cpp |
---|
title | ReceiverExampleCb.cpp |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#include "ReceiverExampleCb.h"
ReceiverExampleCb::ReceiverExampleCb() : count1_m(0) {
dstats.reserve(10000);
}
ReceiverExampleCb::~ReceiverExampleCb() {
}
int BulkDataReceiver1PerfCb::cbStart(ACE_Message_Block * userParam_p) {
count = 0;
size = 0;
start = 1;
return 0;
}
int BulkDataReceiver1PerfCb::cbReceive(ACE_Message_Block * frame_p) {
count++;
double dtime;
if (start) {
start_time = ACE_OS::gettimeofday();
start = 0;
} else {
ACE_Time_Value elapsed_time = ACE_OS::gettimeofday() - start_time;
dtime = elapsed_time.sec() + ( elapsed_time.usec() / 1000000. );
dstats.push_back(dtime);
start_time = ACE_OS::gettimeofday();
}
size += frame_p->length();
return 0;
}
int BulkDataReceiver1PerfCb::cbStop() {
dump_stats();
return 0;
}
double BulkDataReceiver1PerfCb::stats_avg() {
double sum = 0;
for (size_t i = 0; i < dstats.size(); i++) {
sum += dstats[i];
}
return sum/dstats.size();
}
double BulkDataReceiver1PerfCb::sum_frame()
{
double sum = 0.;
for (size_t i = 0; i < dstats.size(); i++) {
sum += dstats[i];
}
return sum;
}
void BulkDataReceiver1PerfCb::dump_stats (void) {
FILE* stats_file = ACE_OS::fopen ("StatsReceiver1.dat", "w");
if (stats_file == 0) {
ACE_ERROR ((LM_ERROR, "StatsReceiver1.dat cannot be opened \n"));
}
// first dump what the caller has to say.
ACE_OS::fprintf (stats_file, "Average Inter-Frame Arrival Time = %f sec\n",stats_avg ());
ACE_OS::fprintf (stats_file, "Amount transmitted = %ld bytes\n",size);
ACE_OS::fprintf (stats_file, "Total time = %f sec\n",(sum_frame()));
//ACE_OS::fprintf (stats_file, "Total time = %f sec\n",(sum_frame()/1000000.));
double tmp = sum_frame();
if (tmp != 0.) {
ACE_OS::fprintf (stats_file, "Transfer rate = %f Mbits/sec \n", (size/tmp)/(1024.*1024.)*8. );
} else {
ACE_OS::fprintf (stats_file, "Could not calculate transfer rate. Total amount of time = 0\n");
}
ACE_OS::fprintf (stats_file, "Single frame arrival time (sec):\n");
for (size_t i = 0; i < dstats.size(); i++)
ACE_OS::fprintf (stats_file, "%f\n",dstats[i]);
ACE_OS::fclose (stats_file);
ACS_SHORT_LOG((LM_INFO,"Done!"));
} |
Discussion
- How important is bulk data transfer in the ACS community?
- TAO A/V BulkData Limitations
- Lack of maintenance
- Long Standing Bug
- No Multicast
- Only Implemented for Components (No standalone clients)
- RTI DDS Limitations
- Licensing Costs
- Complex Tuning (Though very flexible)
- ZeroMQ Implementation by CTA/UTFSM Agreement
- Other ideas or topics?