• Classes
  • Modules
  • Namespaces
  • Files
  • Related Pages
  • File List
  • File Members

bulkDataSender.h

Go to the documentation of this file.
00001 #ifndef _BULKDATA_SENDER_H
00002 #define _BULKDATA_SENDER_H
00003 /*******************************************************************************
00004  *    ALMA - Atacama Large Millimeter Array
00005  *    (c) European Southern Observatory, 2002
00006  *    Copyright by ESO (in the framework of the ALMA collaboration)
00007  *    and Cosylab 2002, All rights reserved
00008  *
00009  *    This library is free software; you can redistribute it and/or
00010  *    modify it under the terms of the GNU Lesser General Public
00011  *    License as published by the Free Software Foundation; either
00012  *    version 2.1 of the License, or (at your option) any later version.
00013  *
00014  *    This library is distributed in the hope that it will be useful,
00015  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017  *    Lesser General Public License for more details.
00018  *
00019  *    You should have received a copy of the GNU Lesser General Public
00020  *    License along with this library; if not, write to the Free Software
00021  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00022  *
00023  *
00024  * "@(#)"
00025  *
00026  * who       when      what
00027  * --------  --------  ----------------------------------------------
00028  * oat       27/01/05  created 
00029  */
00030 
00031 #include <iostream>
00032 
00033 #include "orbsvcs/AV/AVStreams_i.h"
00034 #include "orbsvcs/AV/Endpoint_Strategy.h"
00035 #include "orbsvcs/AV/Protocol_Factory.h"
00036 #include "orbsvcs/AV/Flows_T.h"
00037 #include "orbsvcs/AV/Transport.h"
00038 #include "orbsvcs/AV/Policy.h"
00039 
00040 #include "ACSBulkDataError.h"
00041 #include "bulkDataSenderDefaultCb.h"
00042 #include "bulkDataFlowProducer.h"
00043 
00044 #include "bulkDataC.h"
00045 
00046 // #include "ace/High_Res_Timer.h"
00047 
00048 // #include <acsQoS.h>
00049 
00053 namespace AcsBulkdata
00054 {  
          /**
           * Declaration of the BulkDataSender class template.
           *
           * The template argument TSenderCallback is forwarded to
           * BulkDataFlowProducer<TSenderCallback> in the flow-endpoint map
           * typedefs of the private section.
           *
           * Only declarations (plus the inline getStreamCtrl()) are visible in
           * this header; the member definitions come from the
           * #include "bulkDataSender.i" at the end of the file. Descriptions of
           * behavior below that are derived from names only are marked
           * "presumably" and should be confirmed against that file.
           *
           * NOTE(review): getFlowNames() uses std::vector and std::string, but no
           * direct #include of <vector> or <string> is visible in this header -
           * presumably they arrive transitively; confirm.
           */
00074     template<class TSenderCallback>
00075     class BulkDataSender
00076     {
00077       public:
00078       
              /** Default constructor (defined outside this header). */
00082         BulkDataSender();
00083       
              /** Virtual destructor, so derived classes can be destroyed through a BulkDataSender pointer. */
00087         virtual ~BulkDataSender();
00088       
              /** Presumably sets up the sender before any flow is created - TODO confirm. */
00096         void initialize();
00097 
              /** Presumably creates one flow with built-in settings; takes no configuration argument. */
00106         void createSingleFlow();
00107 
              /**
               * Presumably creates one flow per entry described by fepsConfig.
               * @param fepsConfig C string with the flow endpoint configuration (format not visible here).
               */
00118         void createMultipleFlows(const char *fepsConfig);
00119 
              /**
               * Presumably connects this sender to the receiver described by the configuration.
               * @param recvConfig_p pointer to the receiver configuration.
               */
00128         void connectToPeer(bulkdata::BulkDataReceiverConfig *recvConfig_p);
00129 
              /**
               * Looks up the protocol object for a flow.
               * @param flowname           name of the flow.
               * @param currentProtocol_p  reference to a pointer: acts as an output slot the callee can fill in.
               */
00140         void getFlowProtocol(ACE_CString &flowname, TAO_AV_Protocol_Object *&currentProtocol_p);
00141 
              /**
               * Presumably starts a send on the given flow, passing an optional parameter block.
               * @param flownumber number of the flow (base of the numbering not visible here).
               * @param param      message block with the parameter; defaults to a null pointer.
               */
00152         void startSend(CORBA::ULong flownumber, ACE_Message_Block *param = 0);
00153 
              /**
               * Overload of startSend() taking the parameter as a raw character buffer.
               * @param flownumber number of the flow.
               * @param param      pointer to the parameter bytes.
               * @param len        length associated with param (presumably in bytes).
               */
00164         void startSend(CORBA::ULong flownumber, const char *param, size_t len);
00165 
              /**
               * Presumably sends the content of a message block on the given flow.
               * @param flownumber number of the flow.
               * @param buffer     message block holding the data.
               */
00176         void sendData(CORBA::ULong flownumber, ACE_Message_Block *buffer);
00177            
              /**
               * Overload of sendData() taking the data as a raw character buffer.
               * @param flownumber number of the flow.
               * @param buffer     pointer to the data bytes.
               * @param len        length associated with buffer (presumably in bytes).
               */
00190         void sendData(CORBA::ULong flownumber, const char *buffer, size_t len);
00191 
              /**
               * Presumably ends the send started with startSend() on the given flow.
               * @param flownumber number of the flow.
               */
00202         void stopSend(CORBA::ULong flownumber);
00203 
              /** Presumably tears down the connection established by connectToPeer(). */
00211         void disconnectPeer();
00212  
              /**
               * Accessor for the stream controller.
               * @return the raw pointer stored in the streamctrl_p member, unchanged.
               */
00213         TAO_StreamCtrl * getStreamCtrl() 
00214             {
00215                 return streamctrl_p;
00216             }
00217 
              /**
               * @param flowName name of the flow.
               * @return C string, presumably the flow specification of that flow
               *         (lifetime/ownership of the returned string not visible here).
               */
00218         const char *getFlowSpec(const ACE_CString & flowName);
00219 
              /** @return a vector of strings, by value; presumably the names of the existing flows. */
00226         std::vector<std::string> getFlowNames();
00227 
00228         /* THE FOLLOWING METHODS ARE UNDER TESTING - PLEASE DO NOT USE THEM */
00229         /********************************************************************/
00230 
00231         void startStream(CORBA::ULong flownumber);
00232         
00233         void sendStream(CORBA::ULong flownumber, ACE_Message_Block *buffer);
00234 
00235         void stopStream(CORBA::ULong flownumber);
00236 
00237         /********************************************************************/
00238 
00239 
00240       private:
00241 
              // Map (and matching iterator) from an ACE_CString key to a BulkDataFlowProducer<TSenderCallback> pointer,
              // instantiated with the ACE_Null_Mutex lock type.
00242         typedef ACE_Hash_Map_Manager<ACE_CString, BulkDataFlowProducer<TSenderCallback> *, ACE_Null_Mutex> FepObjects;
00243         typedef ACE_Hash_Map_Iterator<ACE_CString, BulkDataFlowProducer<TSenderCallback> *, ACE_Null_Mutex>  FepObjectsIterator;
00244 
              // Map (and matching iterator) from an ACE_CString key to an ACE_HANDLE, also with ACE_Null_Mutex.
00245         typedef ACE_Hash_Map_Manager<ACE_CString, ACE_HANDLE, ACE_Null_Mutex> HandleMap;
00246         typedef ACE_Hash_Map_Iterator<ACE_CString, ACE_HANDLE, ACE_Null_Mutex>  HandleMapIterator;
00247 
              /** Private initialization helper for the "A" side (details in bulkDataSender.i). */
00254         void initPartA();
00255 
              /** @return a stream endpoint "A" reference; presumably newly created. */
00262         AVStreams::StreamEndPoint_A_ptr createSepA();
00263 
              /**
               * @param flowname  name of the flow.
               * @param protocols protocol specification, passed by value.
               * @param format    format string of the flow.
               * @param strctrl_p stream controller pointer.
               * @return a flow producer reference; presumably newly created.
               */
00273         AVStreams::FlowProducer_ptr createFepProducerA(ACE_CString &flowname,
00274                                                        AVStreams::protocolSpec protocols,
00275                                                        ACE_CString &format,
00276                                                        TAO_StreamCtrl *strctrl_p);
00277 
00278 
              /**
               * Presumably registers the flow producer with the stream endpoint.
               * @param locSepA_p stream endpoint "A" reference.
               * @param locFepA_p flow producer reference.
               */
00287         void addFepToSep(AVStreams::StreamEndPoint_A_ptr locSepA_p, AVStreams::FlowProducer_ptr locFepA_p);
00288 
00289 
              /** @return a TAO_StreamCtrl pointer; presumably a newly created controller (ownership not visible here). */
00296         TAO_StreamCtrl *createStreamCtrl();     
00297 
00298 
00305         //      AVStreams::streamQoS * create_QoS();
00306 
00307 
              /**
               * Presumably assembles a forward flow specification string from its components.
               * All seven components are taken by non-const reference.
               * @return C string (lifetime/ownership not visible here).
               */
00321         const char * createFwdFlowSpec(ACE_CString &flowname,
00322                                        ACE_CString &direction,
00323                                        ACE_CString &formatName,
00324                                        ACE_CString &flowProtocol,
00325                                        ACE_CString &carrierProtocol,
00326                                        ACE_CString &localAddress,
00327                                        ACE_CString &remoteAddress);
00328 
00329 
              /** @param recvConfig_p pointer to the receiver configuration to take over. */
00337         void setReceiverConfig(bulkdata::BulkDataReceiverConfig *recvConfig_p);
00338 
00339                     
00340 
              /** @return a stream endpoint "A" reference; presumably the one held in sepA_p - TODO confirm. */
00347         AVStreams::StreamEndPoint_A_ptr getStreamEndPointA();
00348 
00349 
              // Cleanup helpers; what exactly each one releases is defined in bulkDataSender.i.
00356         void deleteStreamCtrl();
00357 
00358 
00359 
00366         void deleteFepsA();
00367 
00374         void deleteSepA();
00375 
00376         void deleteConnector();
00377 
00378         void deleteHandler();
00379 
              /**
               * @param flowname    name of the flow.
               * @param fepProtocol protocol string of the flow endpoint.
               * @return C string, presumably the flow specification built from the two arguments.
               */
00388         const char * createFlowSpec(ACE_CString &flowname,
00389                                     ACE_CString &fepProtocol);
00390 
              /** Presumably combines the sender and receiver flow specifications - TODO confirm. */
00391         void mergeFlowSpecs();
00392 
              // Reactive endpoint strategy for the "A" side, held by value.
00396         TAO_AV_Endpoint_Reactive_Strategy_A<TAO_StreamEndPoint_A,TAO_VDev,AV_Null_MediaCtrl> endpointStrategy_m;
00397 
              // CORBA _var holders for the local ("A") and remote ("B") stream endpoints.
00398         AVStreams::StreamEndPoint_A_var sepA_p;
00399 
00400         AVStreams::StreamEndPoint_B_var sepB_p;
00401 
00402 
00403         //AVStreams::streamQoS_var the_qos;
              // Per-flow configuration triple: flow name, format and protocol.
00404         struct FepsCfgA
00405         {
00406             ACE_CString fepsFlowname;
00407             ACE_CString fepsFormat;
00408             ACE_CString fepsProtocol;
00409         };
00410 
              // ACE_CString -> BulkDataFlowProducer<TSenderCallback>* map (see FepObjects typedef).
00411         FepObjects fepMap_m;
00412 
              // ACE_CString -> ACE_HANDLE map (see HandleMap typedef).
00413         HandleMap handleMap_m;
00414 
              // Flow specifications: receiver side held through a _var, sender side held by value.
00415         AVStreams::flowSpec_var recvFeps_p;
00416 
00417         AVStreams::flowSpec senderFeps_m;
00418 
              // Raw pointer to a TAO_StreamEndPoint_A; judging by the name it is kept for reference counting - TODO confirm.
00419         TAO_StreamEndPoint_A * sepRefCount_p;
00420 
00421         CORBA::Boolean disconnectPeerFlag;
00422     
00423         AVStreams::flowSpec flowSpec_m;
00424 
              // Raw pointer returned as-is by getStreamCtrl().
00425         TAO_StreamCtrl *streamctrl_p;
00426 
              // NOTE(review): the assignment operator declaration below is commented out, so the
              // compiler-generated copy operations remain available although the class holds raw pointers.
00430         //void operator=(const BulkDataSender&);
00431 
00432 //      public:
00433 // this part should be private
00434 // deprecated and back-incompatible
00435 // needed for the distributer but will be removed as soon as possible
00436 //
00437 //      AVStreams::flowSpec flowSpec_m;
00438 //      TAO_StreamCtrl *streamctrl_p;
00439     };
00440 }
00441 
00442 #include "bulkDataSender.i"
00443 
00444 #endif /* _BULKDATA_SENDER_H */

Generated on Thu Jul 8 2010 19:47:47 for ACS-9.0 C++ API by  doxygen 1.7.0