• Classes
  • Modules
  • Namespaces
  • Files
  • Related Pages
  • File List
  • File Members

acsncConsumer.h

Go to the documentation of this file.
00001 #ifndef CONSUMER_H
00002 #define CONSUMER_H
00003 
00004 /* @(#) $Id: acsncConsumer.h,v 1.72 2011/11/24 14:18:52 rtobar Exp $
00005 *
00006 *    Consumer Abstract base class for notification channel push structured event
00007 *    consumers.
00008 *    ALMA - Atacama Large Millimiter Array
00009 *
00010 *    (c) Associated Universities Inc., 2002   
00011 *    (c) European Southern Observatory, 2002
00012 *    Copyright by ESO (in the framework of the ALMA collaboration)
00013 *    and Cosylab 2002, All rights reserved
00014 *
00015 *    This library is free software; you can redistribute it and/or
00016 *    modify it under the terms of the GNU Lesser General Public
00017 *    License as published by the Free Software Foundation; either
00018 *    version 2.1 of the License, or (at your option) any later version.
00019 *
00020 *    This library is distributed in the hope that it will be useful,
00021 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00022 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00023 *    Lesser General Public License for more details.
00024 *
00025 *    You should have received a copy of the GNU Lesser General Public
00026 *    License along with this library; if not, write to the Free Software
00027 *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00028 */
00029 
00034 #include "acsncHelper.h"
00035 #include <acstimeProfiler.h>
00036 #include "acsncCDBProperties.h"
00037 
00038 #include <list>
00039 namespace nc {
00040 
00053 class Consumer :
00054     protected Helper,
00055     public POA_CosNotifyComm::StructuredPushConsumer,
00056     protected virtual PortableServer::RefCountServantBase
00057 {
00058   public:
00069     Consumer(const char* channelName, const char* acsNCDomainName = 0);
00070 
00080     Consumer(const char* channelName, 
00081              CORBA::ORB_ptr orb_mp,
00082                  const char* acsNCDomainName = 0);
00083     
00107     Consumer(const char* channelName, 
00108              int argc, 
00109              char *argv[],
00110                  const char* acsNCDomainName = 0);
00111 
00122     virtual void 
00123     disconnect();
00124     
00138     virtual void 
00139     push_structured_event(const CosNotification::StructuredEvent &publishedEvent)= 0;
00140     
00154     virtual void 
00155     offer_change(const CosNotification::EventTypeSeq &added,
00156                  const CosNotification::EventTypeSeq &removed);
00157     
00167     virtual void 
00168     disconnect_structured_push_consumer();
00169 
00180     void
00181     consumerReady();
00182 
00192     void
00193     resume();
00194     
00205     void
00206     suspend();
00207 
00219     template <class T> void
00220     addSubscription()
00221         {
00222             //Create a tempory any
00223             CORBA::Any any;
00224             //Create a temporary instance of the ALMA event.
00225             T junk;
00226             
00227             //Store the ALMA event in the any to extract the name
00228             //Probably a nicer way of doing this but it's the best that
00229             //I can come up with at the moment.
00230             any <<= junk;
00231 
00232             //Use the version of addSubscription which really subscribes
00233             //to events from the channel
00234            
00235             if (any.type()->kind()!=CORBA::tk_sequence)
00236                 {
00237                 addSubscription(any.type()->name());
00238                 }
00239             else
00240                 {
00241                 std::string etName= acsnc::SEQUENCE_EVENT_TYPE_PREFIX; //_SequenceOf_
00242                 CORBA::Any a;
00243                 a._tao_set_typecode(any.type()->content_type());
00244                 etName+=a.type()->name();
00245                 addSubscription(etName.c_str());
00246                 }
00247         }
00248 
00259     template <class T> void
00260     removeSubscription()
00261         {
00262             //Create a tempory any
00263             CORBA::Any any;
00264             //Create a temporary instance of the ALMA event.
00265             T junk;
00266             
00267             //Store the ALMA event in the any to extract the name
00268             //Probably a nicer way of doing this but it's the best that
00269             //I can come up with at the moment.
00270             any <<= junk;
00271             
00272             //Use the version of removeSubscription which really unsubscribes
00273             //from an event type.
00274             if (any.type()->kind()!=CORBA::tk_sequence)
00275                 {
00276                 removeSubscription(any.type()->name());
00277                 }
00278             else
00279                 {
00280                 std::string etName= acsnc::SEQUENCE_EVENT_TYPE_PREFIX; //_SequenceOf_"
00281                 CORBA::Any a;
00282                 a._tao_set_typecode(any.type()->content_type());
00283                 etName+=a.type()->name();
00284                 removeSubscription(etName.c_str());
00285                 }
00286         }
00287     
00304     int
00305     addFilter(const char* type_name,
00306               const char* filter);
00307     
00318     bool
00319     removeFilter(int filter_id);
00320 
00325     virtual void reconnect(::NotifyMonitoringExt::EventChannelFactory *ecf);
00326     
00327     void setAntennaName(std::string antennaName);
00328 
00336     void setAutoreconnect(bool autoreconnect);
00337 
00342     bool setEventReceptionTimeout(int eventReceptionTimeout); 
00343 
00348     bool setConnectionCheckerFreq(int connectionCheckerFreq);
00349 
00350 
00351   protected:
00363     void
00364     init();
00365 
00369     virtual ~Consumer(){}
00371 
00381     void 
00382     createConsumer();
00383         
00384 
00386 
00390     CosNotifyChannelAdmin::ConsumerAdmin_var consumerAdmin_m;
00391 
00396     CosNotifyChannelAdmin::StructuredProxyPushSupplier_var proxySupplier_m;
00397 
00402     unsigned long long numEvents_m;
00403     
00407     CosNotifyComm::StructuredPushConsumer_var reference_m;
00408 
00418     virtual const char* 
00419     getFilterLanguage();
00420 
00424     CDBProperties::EventHandlerTimeoutMap handlerTimeoutMap_m;
00425 
00432     static double DEFAULT_MAX_PROCESS_TIME;
00433 
00437     Profiler *profiler_mp;
00438 
00439     std::string antennaName;
00440 
00446     bool autoreconnect_m;
00447 
00448   private:
00449 
00459     void
00460     init(CORBA::ORB_ptr orb);
00461 
00465     void reinit();
00466 
00470     bool shouldReconnect();
00471 
00476     static void* ncChecker(void* arg);
00477 
00482     void checkNotifyChannel();
00483 
00489     void createCheckerThread();
00490 
00495     void destroyCheckerThread();
00496 
00500     CORBA::ORB_ptr orb_mp;
00501 
00505     void operator=(const Consumer&);
00506 
00510     Consumer(const Consumer&);
00511 
00523     void
00524     addSubscription(const char* type_name);
00525 
00536     void
00537     removeSubscription(const char* type_name);
00538 
00539     CosNotifyChannelAdmin::AdminID adminid;
00540     CosNotifyChannelAdmin::ProxyID proxySupplierID;
00541 
00542     static const bool DEFAULT_AUTORECONNECT;
00543     static const int DEFAULT_EVENT_RECEPTION_TIMEOUT;
00544     static const int DEFAULT_CONNECTION_CHECKER_FREQ;
00545     bool stopNCCheckerThread;
00546     pthread_t ncCheckerThread;
00547     ACE_Thread_Mutex checkerThMutex_m;
00548     int eventReceptionTimeout_m;
00549     int connectionCheckerFreq_m;
00551 };
00552  }; 
00553 
00554 #endif /* CONSUMER_H */

Generated on Mon Jul 4 2016 08:50:20 for ACS-2016.6 C++ API by  doxygen 1.7.0