Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Introduction

The Notification Channel is a mechanism for decoupled communication between a supplier one or more consumer which agree on a Channel and EventType to communicate with each other. The Notification Service used is based on CORBA Notification Service specification and in particular abstracts TAO Cos Notification implementation.

The implementation is available for Python, Java and C++ languages and can interact transparently between all of them.

Presentation

  • Scope
    • TAO's Notification Service
    • ACS Notification Channel Architecture
  • Duration: 15 minutes

Hands-On Exercise

Event Definition IDL

Code Block
languagecpp
titleExampleEvent.idl
linenumberstrue
collapsetrue
#ifndef _ExampleEvent_IDL_
#define _ExampleEvent_IDL_

#pragma prefix "alma"

module workshop {
    const string CHANNELNAME_EXAMPLE = "example";
    struct ExampleEvent {
        string msg;
        long value;
    };
};
#endif

Supplier

Python

Code Block
languagepy
titleExampleEventSupplier.py
linenumberstrue
collapsetrue
import workshop


from Acspy.Nc.Supplier import Supplier

event = workshop.ExampleEvent("Example Supplier", 10)


sup = Supplier(workshop.CHANNELNAME_EXAMPLE)
sup.publishEvent(simple_data=event)
sup.disconnect()

Java

Code Block
languagepy
titleExampleEventSupplier.java
linenumberstrue
collapsetrue
package ...;
 
import java.util.logging.Logger;


import org.omg.CORBA.portable.IDLEntity;


import alma.acs.nc.AcsEventPublisher;
import alma.acs.component.client.ComponentClient;


import alma.workshop.ExampleEvent;
import alma.workshop.CHANNELNAME_EXAMPLE;
 
public class ExampleEventSupplier extends ComponentClient { 
    private Logger m_logger;
 
    public ExampleEventSupplier() {
        String managerLoc = System.getProperty("ACS.manager");
        super(null, managerLoc, clientName);
        m_logger = getContainerServices().getLogger()
    }
 
    public doStuff() {
        ExampleEvent event = new ExampleEvent("Example Supplier", 10);

        AcsEventPublisher<IDLEntity> sup = getContainerServices().createNotificationChannelPublisher(CHANNELNAME_EXAMPLE.value, IDLEntity.class);
        sup.publishEvent(event)

        sup.disconnect()
    }
 
    public static void main(String[] args) {
        ExampleEventSupplier client = new ExampleEventSupplier();
        client.doStuff();
    }
}

C++

Code Block
languagepy
titleExampleEventSupplier.py
linenumberstrue
collapsetrue
#include <maciSimpleClient.h>
#include <acsncSimpleSupplier.h>


#include <ExampleEventC.h>
 
int main(int argc, char *argv[]) {
    maci::SimpleClient client;

    if (client.init(argc,argv) == 0) {
        return -1;
    } else {
        client.login();
    }


    workshop::ExampleEvent event;
    event.msg ="Example Supplier";
    event.value = 10;


    sup = new nc::SimpleSupplier(workshop::CHANNELNAME_EXAMPLE, NULL);
    sup->publishData<workshop::ExampleEvent>(event);
    sup.disconnect()

    client.logout();
 
    ACE_OS::sleep(3);
    return 0;
}

Consumer

Python

Code Block
languagepy
titleExampleEventConsumer.py
linenumberstrue
collapsetrue
import time
import workshop
from Acspy.Nc.Consumer import Consumer

def eventHandler(event):
    print("New messsage received: " + str(event))
    print(event.msg)
    print(event.value)

con = Consumer(workshop.CHANNELNAME_EXAMPLE)
con.addSubscription(workshop.ExampleEvent, eventHandler)

con.consumerReady()
time.sleep(10)

con.disconnect()

Java

Code Block
languagepy
titleExampleEventConsumer.py
linenumberstrue
collapsetrue
package ...;
 
import java.util.logging.Logger;

import alma.acs.nc.AcsEventSubscriber;
import alma.acs.component.client.ComponentClient;

import alma.workshop.ExampleEvent;
import alma.workshop.CHANNELNAME_EXAMPLE;
 
public class ExampleEventConsumer extends ComponentClient implements AcsEventSubscriber.Callback<ExampleEvent> { 
    private Logger m_logger;
 
    public ExampleEventConsumer() {
        String managerLoc = System.getProperty("ACS.manager");
        super(null, managerLoc, clientName);
        m_logger = getContainerServices().getLogger()
    }
 
    public doStuff() {
        ExampleEvent event = new ExampleEvent("Example Supplier", 10);

        AcsEventSubscriber<ExampleEvent> con = getContainerServices().createNotificationChannelSubscriber(CHANNELNAME_EXAMPLE.value, ExampleEvent.class);
        con.addSubscription(this);
        con.startReceivingEvents();
        Thread.sleep(10);
        
        con.disconnect();
    }


    public void receive(ExampleEvent event, EventDescription desc) {
        m_logger.info("New messsage received: " + event)
        m_logger.info(event.msg)
        m_logger.info(event.value)
    }
 
    public static void main(String[] args) {
        ExampleEventSupplier client = new ExampleEventSupplier();
        client.doStuff();
    }
}

C++

Code Block
languagepy
titleExampleEventConsumer.py
linenumberstrue
collapsetrue
import time
import workshop
from Acspy.Nc.Consumer import Consumer

def eventHandler(event):
    print("New messsage received: " + str(event))
    print(event.msg)
    print(event.value)

g = Consumer(workshop.CHANNELNAME_EXAMPLE)
g.addSubscription(workshop.ExampleEvent, eventHandler)

g.consumerReady()
time.sleep(10)

g.disconnect()

Discussion

  • Reliability Problems
  • New Technologies (ActiveMQ, ZeroMQ, Kafka, Akka Streams, etc.)