Go to the documentation of this file.00001 #ifndef _DDS_SUBSCRIBER_H
00002 #define _DDS_SUBSCRIBER_H
00003
00004 #include <DDSHelper.h>
00005 #include <acsddsncDataReaderListener.h>
00006 #include <dds/DCPS/SubscriberImpl.h>
00007
00008 namespace ddsnc{
00009
00016 class DDSSubscriber : public ddsnc::DDSHelper{
00017 private:
00018 DDS::Subscriber_var sub;
00019 OpenDDS::DCPS::SubscriberImpl *sub_impl;
00020 DDS::DataReaderListener_var *listener;
00021
00027 int createSubscriber();
00028
00029 protected:
00030 DDS::SubscriberQos subQos;
00031
00032 public:
00033 DDS::DataReaderQos drQos;
00048 DDSSubscriber(CORBA::String_var channel_name);
00049
00050 ~DDSSubscriber()
00051 {
00052 delete listener;
00053 }
00054
00061 void consumerReady();
00062
00079 template <class DRV, class DR, class D>
00080 void addSubscription(
00081 void (*templateFunction)(D, void *), void *handlerParam=0)
00082 {
00083 std::cerr << "DDSSubscriber::addSubscription" << std::endl;
00084
00085 listener = new DDS::DataReaderListener_var
00086 (new ddsnc::ACSDDSNCDataReaderListener
00087 <DRV,DR,D>(templateFunction, handlerParam));
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097 }
00098
00099
00119 template <class D, class TSV, class TSI>
00120 void initialize()
00121 {
00122 std::cerr<< "DDSSubscriber::initialize()" << std::endl;
00123 createParticipant();
00124 if (CORBA::is_nil (participant.in()))
00125 std::cerr << "Participant is nil" << std::endl;
00126
00127 if(partitionName!=NULL){
00128 participant->get_default_subscriber_qos(subQos);
00129 subQos.partition.name.length(1);
00130 subQos.partition.name[0]=CORBA::string_dup(partitionName);
00131 }
00132
00133 createSubscriber();
00134
00135
00136 TSV ts;
00137 ts = new TSI();
00138 if (DDS::RETCODE_OK != ts->register_type(participant.in(),"")){
00139 std::cerr << "register_type failed" << std::endl;
00140 }
00141
00142
00143 initializeTopic(ts->get_type_name());
00144 if(CORBA::is_nil(topic.in()))
00145 std::cerr<< "Topic is nil" << std::endl;
00146
00147 sub->get_default_datareader_qos (drQos);
00148
00149 drQos.reliability.kind = ::DDS::RELIABLE_RELIABILITY_QOS;
00150 drQos.reliability.max_blocking_time.sec = 1;
00151
00152 drQos.history.kind = ::DDS::KEEP_LAST_HISTORY_QOS;
00153
00154 drQos.history.depth = 1;
00155 }
00156 };
00157 }
00158
/**
 * Creates a ddsnc::DDSSubscriber with `new`, initializes it for the given
 * IDL struct and registers the handler function.
 *
 * The subscriber is allocated with `new` and assigned to subscriber_p, so
 * the caller is responsible for deleting it.
 *
 * The expansion is a brace block rather than do { } while(0), so that
 * existing call sites keep compiling; wrap the invocation in braces when
 * using it as the body of an if/else.
 *
 * @param subscriber_p expression designating a ddsnc::DDSSubscriber pointer;
 *                     it is parenthesized in the expansion so that
 *                     expressions such as *pp bind as intended
 * @param idlStruct    IDL struct type; the TypeSupport_var, TypeSupportImpl,
 *                     DataReader_var and DataReader suffixes are appended by
 *                     token pasting
 * @param channelName  channel name passed to the DDSSubscriber constructor
 * @param handlerFunc  function taking (idlStruct, void *) called for data
 * @param handlerParam pointer passed through to handlerFunc
 */
#define ACS_NEW_DDS_SUBSCRIBER(subscriber_p, idlStruct, channelName, handlerFunc, handlerParam) \
{ \
    (subscriber_p) = new ddsnc::DDSSubscriber(channelName); \
    (subscriber_p)->initialize<idlStruct, idlStruct##TypeSupport_var, idlStruct##TypeSupportImpl>(); \
    (subscriber_p)->addSubscription<idlStruct##DataReader_var, idlStruct##DataReader, idlStruct>(handlerFunc, handlerParam); \
}
00175
00176 #endif