• Classes
  • Modules
  • Namespaces
  • Files
  • Related Pages
  • File List
  • File Members

bulkDataNTArrayThread.h

Go to the documentation of this file.
00001 #ifndef BD_ARRAY_THREAD_H
00002 #define BD_ARRAY_THREAD_H
00003 /*******************************************************************************
00004  * ALMA - Atacama Large Millimeter Array
00005  * Copyright (c) AUI - Associated Universities Inc., 2011
00006  * (in the framework of the ALMA collaboration).
00007  * All rights reserved.
00008  * 
00009  * This library is free software; you can redistribute it and/or
00010  * modify it under the terms of the GNU Lesser General Public
00011  * License as published by the Free Software Foundation; either
00012  * version 2.1 of the License, or (at your option) any later version.
00013  * 
00014  * This library is distributed in the hope that it will be useful,
00015  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00016  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00017  * Lesser General Public License for more details.
00018  * 
00019  * You should have received a copy of the GNU Lesser General Public
00020  * License along with this library; if not, write to the Free Software
00021  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00022  *******************************************************************************
00023  * 
00024  * "@(#) $Id: bulkDataNTArrayThread.h,v 1.1 2013/02/11 18:37:36 rbourtem Exp $"
00025  *
00026  * who       when        what
00027  * --------  ----------  ----------------------------------------------
00028  */
00029 
00030 //
00031 // System stuff
00032 //
00033 #include <set>
00034 #include <list>
00035 //#include <boost/tuple/tuple.hpp>
00036 #include <utility>
00037 #include <System_Time.h>
00038 #include <pthread.h>
00039 
00040 //
00041 // ACS stuff
00042 //
00043 #include <acsThread.h>
00044 /*#include <bulkDataDistributerC.h>
00045 #include <acsncRTSupplier.h>
00046 #include <acsncSimpleConsumer.h>
00047 #include <RepeatGuard.h>*/
00048 
00049 //
00050 // ICD stuff
00051 //
00052 /*#include <ObservationControlC.h>
00053 #include <SpectralResolutionType.h>
00054 #include <ControlBasicInterfacesC.h>*/
00055 
00056 //
00057 // CORR stuff
00058 //
00059 /*#include <CorrEx.h>
00060 #include <CorrMemoryHeap.h>
00061 #include <CorrThreadSyncGuard.h>
00062 #include <CorrTimingStats.h>
00063 #include <DataCollector.h>
00064 #include <NodeC.h>*/
00065 
00066 //
00067 // Local stuff
00068 //
00069 /*#include "MasterAlarm.h"
00070 #include "MasterBulkDataSender.h"
00071 #include "ConfigBasket.h"
00072 #include "NodesCluster.h"
00073 #include "Interferometer.h"
00074 #include "BlobStreamerThread.h"*/
00075 #include "bulkDataNTGenStreamerThread.h"
00076 
00077 
00078 namespace AcsBulkdata
00079 {
00087 class BulkDataNTArrayThread : public ACS::Thread // ACS::Thread subclass that keeps a list of (buffer, size) pairs and a StreamerThread pointer; all method bodies except exit() live outside this header.
00088 {
00089 public:
00093         BulkDataNTArrayThread(const ACE_CString &name,        // Constructor (defined elsewhere). NOTE(review): each argument presumably initialises the like-named member below -- confirm in the .cpp.
00094                         const std::string &streamName,        // cf. m_streamName
00095                         const std::string &sendFlowName,      // cf. m_sendFlowName
00096                         const std::string &qosLib,            // cf. m_qosLib
00097                         const double &throttling,             // cf. m_throttling; units are not stated in this header
00098                         const double &sendTimeout,            // cf. m_sendTimeout; units are not stated in this header
00099                         const double &ACKTimeout);            // cf. m_ACKTimeout; units are not stated in this header
00100         // Destructor (defined elsewhere).
00103         ~BulkDataNTArrayThread();
00104         // Declared only; presumably the thread body invoked through ACS::Thread -- base class is not visible here, confirm.
00107         void run();
00108         // Returns a copy of the `name` member.
00109         ACE_CString getName() const { return name; };
00110         // Takes a read-only byte buffer plus its size and returns a bool. NOTE(review): presumably queues the data in m_frontEndBuffer; the meaning of the return value is not visible here.
00117         bool addDataEvent(const uint8_t *buffer, const size_t size);
00118         // Declared only. NOTE(review): presumably starts a send sequence (see the sequence flags below) -- confirm.
00122         void startSequence();
00123         // Declared only. NOTE(review): presumably requests the running sequence to stop (see m_sequenceStopFlag) -- confirm.
00127         void stopSequence();
00128 
00129 
00130 private:
00131         // Value returned by getName().
00132         ACE_CString name;
00133         // The three strings below mirror the constructor parameters of the same names.
00134         std::string m_streamName;
00135 
00136         std::string m_sendFlowName;
00137 
00138         std::string m_qosLib;
00139         // Result codes; this is the return type of handleSequenceLoop().
00142         enum SequenceEndStatus
00143         {
00144                 SequenceEndStatus_OK,
00145                 SequenceEndStatus_EXCEPTION,
00146                 SequenceEndStatus_TIMEOUT,
00147                 SequenceEndStatus_STOPPED
00148         };
00149 
00150         //
00151         // lower limit for data packages sent through a blob streamer
00152         //
00153         //static const unsigned int m_blobThresholdSize = 41943040;
00154 
00155         //
00156         // copies not allowed
00157         // (the copy constructor is only declared, and is private)
00158         BulkDataNTArrayThread(const BulkDataNTArrayThread &toCopy);
00159 
00160         //
00161         // assignment not allowed
00162         // (the copy-assignment operator is only declared, and is private)
00163         BulkDataNTArrayThread &operator =(const BulkDataNTArrayThread &);
00164         // Class-wide timeout constant. NOTE(review): units are not stated here; if ACS::TimeInterval counts 100 ns ticks this is 5 s -- confirm.
00167         static const ACS::TimeInterval m_accessTimeout = 50000000LLU;
00168         //static const ACS::TimeInterval m_accessTimeout = 5000000000LLU;
00171         //Corr::CDP::MemoryHeap *m_mh_p;
00172 
00176         //TODO
00177         StreamerThread * m_Streamer; // Raw pointer to the streamer; who creates and deletes it is not visible in this header.
00178         //std::map<SpectralResolutionTypeMod::SpectralResolutionType, Corr::CDP::Master::Blob::StreamerThread *> m_blobStreamer;
00179         // List of (byte pointer, size) pairs; same element type as the lists passed to waitForDataEvent(), deleteFrontEndBuffer() and relyDataToStreamer().
00182         std::list< std::pair<uint8_t *, size_t> > m_frontEndBuffer;
00183         // NOTE(review): presumably guards m_frontEndBuffer -- confirm in the .cpp.
00186         pthread_mutex_t m_eventListMutex;
00187         // NOTE(review): what this mutex protects is not visible in this header.
00192         pthread_mutex_t m_accessMutex;
00193         // Condition variable that exit() broadcasts on.
00199     pthread_cond_t m_condition;
00200 
00204         //CorrEx m_eventListErr;
00205         // NOTE(review): presumably signalled when addDataEvent() delivers new data -- confirm.
00209         pthread_cond_t m_newDataCondition;
00210         // NOTE(review): by its name a logging-related flag for addDataEvent(); exact use is not visible here.
00216         bool m_addDataEventLogFlag;
00217         // NOTE(review): presumably used together with m_sequenceStopFlag -- confirm.
00227          pthread_cond_t m_stopCondition;
00228 
00234         bool m_sequenceStopFlag;
00235         // Note: unlike the other data members (except `name`) this one has no m_ prefix.
00239         bool sequenceAlreadyRunningFlag;
00240         // The three doubles below mirror the constructor parameters of the same names.
00241         double m_throttling;
00242 
00243         double m_sendTimeout;
00244 
00245         double m_ACKTimeout;
00246         // Runs the base-class ACS::Thread::exit() first, then wakes every thread blocked on m_condition.
00252         virtual void exit()
00253         {
00254                 ACS::Thread::exit();
00255                 // Broadcast (not signal) so that all waiters on m_condition are released, not just one.
00256                 pthread_cond_broadcast(&m_condition);
00257         }
00258         // Declared only; reports its outcome as a SequenceEndStatus.
00263         SequenceEndStatus handleSequenceLoop();
00264         // Declared only; takes a time interval `to` and a list passed by non-const reference (`out`). NOTE(review): presumably waits up to `to` and moves queued events into `out` -- confirm.
00271         void waitForDataEvent(const ACS::TimeInterval to, std::list< std::pair<uint8_t *, size_t> > &out);
00272         // Declared only; same parameter shape as waitForDataEvent(). NOTE(review): whether it frees the buffers pointed to is not visible here.
00277         void deleteFrontEndBuffer(const ACS::TimeInterval _to, std::list< std::pair<uint8_t *, size_t> > &out);
00278         // Declared only. NOTE(review): "rely" looks like a misspelling of "relay"; presumably forwards `data` to m_Streamer -- confirm.
00279         void relyDataToStreamer(std::list< std::pair<uint8_t *, size_t> > &data);
00280         // Declared only; takes a textual reason. What aborting entails is not visible in this header.
00283         void abort(const std::string &reason);
00284 };
00285 };
00286 
00287 #endif /* BD_ARRAY_THREAD_H */
00288 
00289 /*___oOo___*/

Generated on Mon May 4 2015 08:27:43 for ACS-2015.4 C++ API by  doxygen 1.7.0