#ifndef CONSUMER_H
#define CONSUMER_H

#include "acsncHelper.h"
#include <acstimeProfiler.h>
#include "acsncCDBProperties.h"

#include <list>
namespace nc {

00053 class Consumer :
00054 protected Helper,
00055 public POA_CosNotifyComm::StructuredPushConsumer,
00056 protected virtual PortableServer::RefCountServantBase
00057 {
00058 public:
00069 Consumer(const char* channelName, const char* acsNCDomainName = 0);
00070
00080 Consumer(const char* channelName,
00081 CORBA::ORB_ptr orb_mp,
00082 const char* acsNCDomainName = 0);
00083
00107 Consumer(const char* channelName,
00108 int argc,
00109 char *argv[],
00110 const char* acsNCDomainName = 0);
00111
00122 virtual void
00123 disconnect();
00124
00138 virtual void
00139 push_structured_event(const CosNotification::StructuredEvent &publishedEvent)= 0;
00140
00154 virtual void
00155 offer_change(const CosNotification::EventTypeSeq &added,
00156 const CosNotification::EventTypeSeq &removed);
00157
00167 virtual void
00168 disconnect_structured_push_consumer();
00169
00180 void
00181 consumerReady();
00182
00192 void
00193 resume();
00194
00205 void
00206 suspend();
00207
00219 template <class T> void
00220 addSubscription()
00221 {
00222
00223 CORBA::Any any;
00224
00225 T junk;
00226
00227
00228
00229
00230 any <<= junk;
00231
00232
00233
00234
00235 if (any.type()->kind()!=CORBA::tk_sequence)
00236 {
00237 addSubscription(any.type()->name());
00238 }
00239 else
00240 {
00241 std::string etName= acsnc::SEQUENCE_EVENT_TYPE_PREFIX;
00242 CORBA::Any a;
00243 a._tao_set_typecode(any.type()->content_type());
00244 etName+=a.type()->name();
00245 addSubscription(etName.c_str());
00246 }
00247 }
00248
00259 template <class T> void
00260 removeSubscription()
00261 {
00262
00263 CORBA::Any any;
00264
00265 T junk;
00266
00267
00268
00269
00270 any <<= junk;
00271
00272
00273
00274 if (any.type()->kind()!=CORBA::tk_sequence)
00275 {
00276 removeSubscription(any.type()->name());
00277 }
00278 else
00279 {
00280 std::string etName= acsnc::SEQUENCE_EVENT_TYPE_PREFIX;
00281 CORBA::Any a;
00282 a._tao_set_typecode(any.type()->content_type());
00283 etName+=a.type()->name();
00284 removeSubscription(etName.c_str());
00285 }
00286 }
00287
00304 int
00305 addFilter(const char* type_name,
00306 const char* filter);
00307
00318 bool
00319 removeFilter(int filter_id);
00320
00325 virtual void reconnect(::NotifyMonitoringExt::EventChannelFactory *ecf);
00326
00327 void setAntennaName(std::string antennaName);
00328
00329 protected:
00341 void
00342 init();
00343
00347 virtual ~Consumer(){}
00349
00359 void
00360 createConsumer();
00361
00362
00364
00368 CosNotifyChannelAdmin::ConsumerAdmin_var consumerAdmin_m;
00369
00374 CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxySupplier_m;
00375
00380 unsigned long long numEvents_m;
00381
00385 CosNotifyComm::StructuredPushConsumer_var reference_m;
00386
00396 virtual const char*
00397 getFilterLanguage();
00398
00402 CDBProperties::EventHandlerTimeoutMap handlerTimeoutMap_m;
00403
00410 static double DEFAULT_MAX_PROCESS_TIME;
00411
00415 Profiler *profiler_mp;
00416
00417 std::string antennaName;
00418 private:
00419
00429 void
00430 init(CORBA::ORB_ptr orb);
00431
00435 CORBA::ORB_ptr orb_mp;
00436
00440 void operator=(const Consumer&);
00441
00445 Consumer(const Consumer&);
00446
00458 void
00459 addSubscription(const char* type_name);
00460
00471 void
00472 removeSubscription(const char* type_name);
00473
00474 CosNotifyChannelAdmin::AdminID adminid;
00475 CosNotifyChannelAdmin::ProxyID proxySupplierID;
00477 };
};

#endif