• Classes
  • Modules
  • Namespaces
  • Files
  • Related Pages
  • File List
  • File Members

bulkDataDistributer.h

Go to the documentation of this file.
00001 #ifndef _BULKDATA_DISTRIBUTER_H
00002 #define _BULKDATA_DISTRIBUTER_H
00003 /*******************************************************************************
00004  *    ALMA - Atacama Large Millimeter Array
00005  *    (c) European Southern Observatory, 2002
00006  *    Copyright by ESO (in the framework of the ALMA collaboration)
00007  *    and Cosylab 2002, All rights reserved
00008  *
00009  *    This library is free software; you can redistribute it and/or
00010  *    modify it under the terms of the GNU Lesser General Public
00011  *    License as published by the Free Software Foundation; either
00012  *    version 2.1 of the License, or (at your option) any later version.
00013  *
00014  *    This library is distributed in the hope that it will be useful,
00015  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017  *    Lesser General Public License for more details.
00018  *
00019  *    You should have received a copy of the GNU Lesser General Public
00020  *    License along with this library; if not, write to the Free Software
00021  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00022  *
00023  *
00024  * "@(#)"
00025  *
00026  * who       when      what
00027  * --------  --------  ----------------------------------------------
00028  * oat       02/03/05  created 
00029  */
00030 
00031 
00032 #include "bulkDataSender.h"
00033 #include "bulkDataReceiver.h"
00034 
00035 #include <Pair_T.h>
00036 
00037 #include <acsQoS.h>
00038 
00042 namespace AcsBulkdata
00043 {  
00062     // Forward declaration: BulkDataDistributer stores a pointer to this callback class, which is defined after it. No default for TSenderCallback is given here; the default is supplied on the definition further down.
00063     template<class TReceiverCallback, class TSenderCallback>
00064     class BulkDataDistributerNotifCb;
00065     
00066     template<class TReceiverCallback, class TSenderCallback>
00067     class BulkDataDistributer
00068     { 
00069         // Embeds one BulkDataReceiver<TReceiverCallback> and keeps a map of BulkDataSender<TSenderCallback> pointers. Most members are declared only; NOTE(review): their definitions presumably live in bulkDataDistributer.i - confirm.
00070         enum Flow_Status
00071         {
00072             FLOW_AVAILABLE,
00073             FLOW_NOT_AVAILABLE
00074         };
00075         // Value type of the sender map: a receiver reference paired with a pointer to a sender.
00076         typedef ACE_Pair< bulkdata::BulkDataReceiver_ptr, BulkDataSender<TSenderCallback> *> Sender_Map_Pair;
00077         // Commented-out variant of the map typedefs below, mapping directly to the sender pointer (inactive).
00078         /*typedef ACE_Hash_Map_Manager <ACE_CString, BulkDataSender<TSenderCallback> *,ACE_Null_Mutex>  Sender_Map;
00079           typedef ACE_Hash_Map_Entry <ACE_CString, BulkDataSender<TSenderCallback> * > Sender_Map_Entry;
00080           typedef ACE_Hash_Map_Iterator <ACE_CString, BulkDataSender<TSenderCallback> * ,ACE_Null_Mutex>  Sender_Map_Iterator;*/
00081         // Sender map: ACE_CString key -> Sender_Map_Pair. NOTE(review): the key is presumably the receiver name, and ACE_Null_Mutex presumably means no internal locking - confirm both.
00082         typedef ACE_Hash_Map_Manager <ACE_CString, Sender_Map_Pair, ACE_Null_Mutex>  Sender_Map;
00083         typedef ACE_Hash_Map_Entry <ACE_CString, Sender_Map_Pair > Sender_Map_Entry;
00084         typedef ACE_Hash_Map_Iterator <ACE_CString, Sender_Map_Pair ,ACE_Null_Mutex>  Sender_Map_Iterator;
00085         // Flow status map: CORBA::ULong key -> Flow_Status. NOTE(review): the key is presumably the flow number - confirm.
00086         typedef ACE_Hash_Map_Manager <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map;
00087         typedef ACE_Hash_Map_Entry <CORBA::ULong, Flow_Status> Flows_Status_Map_Entry;
00088         typedef ACE_Hash_Map_Iterator <CORBA::ULong, Flow_Status, ACE_Null_Mutex> Flows_Status_Map_Iterator;
00089         // Receiver status map: ACE_CString key -> CORBA::ULong. The meaning of the stored value is not visible in this header.
00090         typedef ACE_Hash_Map_Manager <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map;
00091         typedef ACE_Hash_Map_Entry <ACE_CString, CORBA::ULong> Recv_Status_Map_Entry;
00092         typedef ACE_Hash_Map_Iterator <ACE_CString, CORBA::ULong, ACE_Null_Mutex> Recv_Status_Map_Iterator;
00093 
00094 
00095 
00096       public:
00097         // Constructor; declared only, defined out of line.
00101         BulkDataDistributer();
00102         // Virtual destructor; declared only, defined out of line.
00106         virtual ~BulkDataDistributer();
00107         // Declared only. NOTE(review): presumably connects the receiver named receiverName to the distributer using the given configurations - confirm against the definition.
00111         virtual void multiConnect(bulkdata::BulkDataReceiverConfig *recvConfig_p, const char *fepsConfig, const ACE_CString& receiverName);
00112         // Declared only. NOTE(review): presumably the inverse of multiConnect() for the named receiver - confirm.
00116         virtual void multiDisconnect(const ACE_CString& receiverName);
00117         // Returns the address of the embedded receiver_m member; the object is a member of this class, so callers must not delete it.
00118         virtual BulkDataReceiver<TReceiverCallback> *getReceiver() 
00119             {
00120                 return &receiver_m;
00121             }
00122         // Returns the address of senderMap_m, giving callers direct access to the internal map.
00123         virtual Sender_Map *getSenderMap() 
00124             {
00125                 return &senderMap_m;
00126             }
00127         // Three connection queries keyed by receiver name; declared only, so how they differ is not visible here.
00128         virtual bool isRecvConnected (const ACE_CString& receiverName);
00129 
00130         virtual bool isSenderConnected (const ACE_CString& receiverName);
00131 
00132         virtual bool isReceiverConnected (const ACE_CString& receiverName);
00133         // Distribution entry points (start / data / stop), declared only. Note that flowName is taken by non-const reference.
00134         virtual void distSendStart (ACE_CString& flowName, CORBA::ULong flowNumber);
00135 
00136         virtual int distSendDataHsk (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00137 
00138         virtual int distSendData (ACE_CString& flowName, ACE_Message_Block * frame_p, CORBA::ULong flowNumber);
00139 
00140         virtual CORBA::Boolean distSendStopTimeout (ACE_CString& flowName, CORBA::ULong flowNumber);
00141 
00142         virtual void distSendStop (ACE_CString& flowName, CORBA::ULong flowNumber);
00143         // Stores user_timeout in timeout_m; the unit is not stated in this header.
00144         void setTimeout (CORBA::ULong user_timeout) 
00145             { timeout_m = user_timeout; }
00146         // Stores the container-services pointer as given; no ownership handling is visible here.
00147         void setContSvc (maci::ContainerServices * services_p)
00148             {  contSvc_p = services_p; }  
00149         // Declared only; takes an ACS::CBvoid callback reference.
00153         void subscribeNotification(ACS::CBvoid_ptr notifCb);
00154         // Declared only; called from BulkDataDistributerNotifCb::done() with the completion it received.
00158         void notifySender(const ACSErr::Completion& comp);
00159         // Forwards to getSenderConnectionState() on the receiver obtained from getReceiver().
00160         bulkdata::Connection getSenderConnectionState()
00161             {
00162                 return getReceiver()->getSenderConnectionState();
00163             }
00164 
00165       private:
00166         // Private status helpers keyed by receiver name and flow number; declared only.
00167         CORBA::Boolean getFlowReceiverStatus(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00168 
00169         CORBA::Boolean isFlowReceiverAvailable(const ACE_CString& receiverName, CORBA::ULong flowNumber);
00170         // Raw sender pointer; it is not initialised or released in this header. NOTE(review): confirm ownership in the out-of-line definitions.
00171         BulkDataSender<TSenderCallback> *sender_p;
00172         // Receiver held by value; getReceiver() hands out its address.
00173         BulkDataReceiver<TReceiverCallback> receiver_m;
00174         // Backing store for getSenderMap().
00175         Sender_Map senderMap_m;
00176         // Status maks are not touched by any inline code in this header.
00177         Recv_Status_Map recvStatusMap_m;
00178         Flows_Status_Map flowsStatusMap_m;
00179         // timeout_m is written by setTimeout(); numberOfFlows and offset are not touched by any inline code in this header.
00180         CORBA::ULong timeout_m;
00181         CORBA::ULong numberOfFlows;
00182         CORBA::ULong offset;
00183         // Written by setContSvc().
00184         maci::ContainerServices *contSvc_p;
00185         // Pointer to the notification callback object; it is not created or released in this header.
00186         BulkDataDistributerNotifCb<TReceiverCallback, TSenderCallback> *distributerNotifCb_p;
00187         // Callback reference. NOTE(review): presumably related to subscribeNotification() - confirm.
00188         ACS::CBvoid_ptr locNotifCb_p;
00189     };
00190 
00191 
00192 
00193     template<class TReceiverCallback, class TSenderCallback = BulkDataSenderDefaultCallback>
00194     class BulkDataDistributerNotifCb: public virtual POA_ACS::CBvoid
00195     {
00196         // POA_ACS::CBvoid implementation whose done() relays the completion to a BulkDataDistributer through notifySender().
00197       public:
00198         // Stores distr without taking ownership (the destructor does not delete it). done() dereferences it unconditionally, so it must be non-null by then.
00199         BulkDataDistributerNotifCb(BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr)
00200             {
00201                 ACS_TRACE("BulkDataDistributerNotifCb<>::BulkDataDistributerNotifCb");
00202 
00203                 distr_p = distr;
00204             }
00205         // Only emits a trace; distr_p is not released.
00206         ~BulkDataDistributerNotifCb()
00207             {
00208                 ACS_TRACE("BulkDataDistributerNotifCb<>::~BulkDataDistributerNotifCb");
00209             }
00210         // Empty body: working() notifications are ignored.
00211         void working(const Completion &comp, const ACS::CBDescOut &desc) 
00212             {
00213             }
00214         // Forwards comp to the distributer; desc is unused. Every exception is logged and swallowed, so none propagates to the caller.
00215         void done(const Completion &comp, const ACS::CBDescOut &desc) 
00216             {
00217                 try
00218                     {
00219                     distr_p->notifySender(comp);
00220                     }
00221                 catch(ACSErr::ACSbaseExImpl &ex)
00222                     {
00223                     ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done error"));
00224                     ex.log();
00225                     }
00226                 catch(...)
00227                     {
00228                     ACS_SHORT_LOG((LM_ERROR,"BulkDataDistributerNotifCb::done unknown error"));     
00229                     }
00230             }
00231         // Always returns true, regardless of the arguments.
00232         CORBA::Boolean negotiate (ACS::TimeInterval timeToTransmit, const ACS::CBDescOut &desc) 
00233             {
00234                 return true;
00235             }
00236 
00237       private:
00238         // Non-owning back-pointer assigned in the constructor.
00239         BulkDataDistributer<TReceiverCallback, TSenderCallback> *distr_p;
00240     };   
00241 
00242 
00243 }
00244 
00245 
00246 #include "bulkDataDistributer.i"
00247 
00248 #endif /* _BULKDATA_DISTRIBUTER_H */

Generated on Thu Jul 8 2010 19:47:47 for ACS-9.0 C++ API by  doxygen 1.7.0