00001 #ifndef _ACS_REQUEST_H_
00002 #define _ACS_REQUEST_H_
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
#include "acsdaemonS.h"
#include <acsThread.h>
#include <acsutilPorts.h>
#include <cstdlib>
#include <cstring>
#include <queue>
#include <memory>

#include <ace/Task.h>
#include <ace/Activation_Queue.h>
#include <ace/Method_Request.h>
#include <ace/Guard_T.h>

#include <logging.h>

#include <set>

#include "acsDaemonUtils.h"
00047
00048
/**
 * Timeout value used for CORBA calls.
 * NOTE(review): the unit is not stated in this header (presumably milliseconds) -- confirm
 * at the places where the macro is consumed.
 */
#define CORBA_TIMEOUT   5000

/**
 * @name EC_* result codes
 * NOTE(review): the meanings below are inferred from the identifiers only; this header does
 * not show where the codes are produced or consumed -- confirm before relying on them.
 */
/*@{*/
#define EC_OK           0   /* success */
#define EC_CANNOTCREATE 40  /* presumably: a resource could not be created */
#define EC_CANNOTUSE    41  /* presumably: a resource exists but cannot be used */
#define EC_FAILURE      42  /* presumably: generic failure */
#define EC_BADARGS      43  /* presumably: invalid arguments */
#define EC_NOPORT       44  /* presumably: no port available */
#define EC_TIMEOUT      45  /* presumably: operation timed out */
/*@}*/
00066
00067
00068
00069
00070
00071
00072
/**
 * Identifiers of the ACS services handled by the daemon.
 *
 * The numeric value of each enumerator is used as an index into the acsServices[] table
 * (see ACSServiceRequestDescription::getACSServiceName()), so the order of the enumerators
 * must stay in sync with the order of the table entries.  UNKNOWN comes last: it indexes
 * the all-NULL sentinel entry of the table and terminates the dependency lists.
 */
enum ACSServiceType {
    NAMING_SERVICE       = 0,
    INTERFACE_REPOSITORY = 1,
    CDB                  = 2,
    RDB_CDB              = 3,
    NOTIFICATION_SERVICE = 4,
    LOGGING_SERVICE      = 5,
    ACS_LOG_SERVICE      = 6,
    ALARM_SERVICE        = 7,
    MANAGER              = 8,
    UNKNOWN              = 9
};
00085
00086 struct ACSService {
00087 const char *xmltag;
00088 const char *script;
00089 const char *impname;
00090 const char *imptype;
00091 const char *impport;
00092 const char *impexec;
00093 const char *svccorbaurl;
00094 std::string (*svcport)(int);
00095 std::string (*namedsvcport)(int, const char *);
00096 bool autorestart;
00097 bool async;
00098 const ACSServiceType* depententService;
00099 };
00100
/** Number of real service types: UNKNOWN is the last enumerator of ACSServiceType. */
#define ACS_SERVICE_TYPES     UNKNOWN
/** NOTE(review): presumably the number of supported ACS instances -- confirm at the use sites. */
#define ACS_SERVICE_INSTANCES 11
00103
00104
00105 const ACSServiceType noDependency[] = { UNKNOWN };
00106
00107
00108 const ACSServiceType namingServiceDependency[] = { NAMING_SERVICE, UNKNOWN };
00109
00110
00111 const ACSServiceType loggingServiceServiceDependency[] = { LOGGING_SERVICE, UNKNOWN };
00112
00113
00114
00115
00116 const ACSServiceType cdbDependency[] = { CDB, RDB_CDB, UNKNOWN };
00117
00121 const ACSService acsServices[] = {
00122 {
00123 "naming_service",
00124 "acsNamingService",
00125 "Naming Service Imp",
00126 "NamingServiceImp",
00127 "2981",
00128 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonNamingServiceImp",
00129 "corbaloc::%s:%s/NameService",
00130 &ACSPorts::getNamingServicePort,
00131 NULL,
00132 true,
00133 false,
00134 noDependency
00135 }, {
00136 "interface_repository",
00137 "acsInterfaceRepository",
00138 "Interface Repository Imp",
00139 "InterfaceRepositoryImp",
00140 "2987",
00141 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonInterfaceRepositoryImp",
00142 "corbaloc::%s:%s/InterfaceRepository",
00143 &ACSPorts::getIRPort,
00144 NULL,
00145 false,
00146 true,
00147 namingServiceDependency
00148 }, {
00149 "cdb",
00150 "acsConfigurationDatabase",
00151 "CDB Imp",
00152 "ConfigurationDatabaseImp",
00153 "2983",
00154 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonConfigurationDatabaseImp",
00155 "corbaloc::%s:%s/CDB",
00156 &ACSPorts::getCDBPort,
00157 NULL,
00158 false,
00159 true,
00160 loggingServiceServiceDependency
00161 }, {
00162 "rdb_cdb",
00163 "acsRDBConfigurationDatabase",
00164 "CDB Imp",
00165 "ConfigurationDatabaseImp",
00166 "2983",
00167 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonConfigurationDatabaseImp",
00168 "corbaloc::%s:%s/CDB",
00169 &ACSPorts::getCDBPort,
00170 NULL,
00171 false,
00172 true,
00173 loggingServiceServiceDependency
00174 }, {
00175 "notification_service",
00176 "acsNotifyService",
00177 "Notification Service Imp",
00178 "NotificationServiceImp",
00179 "2982",
00180 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonNotificationServiceImp",
00181 "corbaloc::%s:%s/%s",
00182 NULL,
00183 &ACSPorts::getNotifyServicePort,
00184 true,
00185 false,
00186 namingServiceDependency
00187 }, {
00188 "logging_service",
00189 "acsLoggingService",
00190 "Logging Service Imp",
00191 "LoggingServiceImp",
00192 "2986",
00193 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonLoggingServiceImp",
00194 "corbaloc::%s:%s/Log",
00195 &ACSPorts::getLoggingServicePort,
00196 NULL,
00197 true,
00198 false,
00199 namingServiceDependency
00200 }, {
00201 "acs_log",
00202 "acsACSLogService",
00203 "ACS Log Service Imp",
00204 "ACSLogServiceImp",
00205 "2985",
00206 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonACSLogServiceImp",
00207 "corbaloc::%s:%s/ACSLogSvc",
00208 &ACSPorts::getLogPort,
00209 NULL,
00210 false,
00211 false,
00212 namingServiceDependency
00213 }, {
00214 "alarm_service",
00215 "acsAlarmService",
00216 "Alarm Service Imp",
00217 "AlarmServiceImp",
00218 "2988",
00219 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonAlarmServiceImp",
00220 "corbaloc::%s:%s/AcsAlarmService",
00221 &ACSPorts::getAlarmServicePort,
00222 NULL,
00223 false,
00224 true,
00225 cdbDependency
00226 }, {
00227 "manager",
00228 "acsManager",
00229 "Manager Imp",
00230 "ManagerImp",
00231 "2984",
00232 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonManagerImp",
00233 "corbaloc::%s:%s/Manager",
00234 &ACSPorts::getManagerPort,
00235 NULL,
00236 false,
00237 false,
00238 cdbDependency
00239 }, { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, false, false, noDependency }
00240 };
00241
/** Kind of operation a service request asks for. */
enum ACSServiceRequestType {
    START_SERVICE = 0,
    STOP_SERVICE  = 1
};
00246
/**
 * Where a service request is to be carried out.
 * NOTE(review): the exact meaning of each target is defined by ACSServiceRequest::execute(),
 * which is not part of this header.
 */
enum ACSServiceRequestTarget {
    LOCAL  = 0,
    DAEMON = 1,
    IMP    = 2
};
00252
00253 ACSServiceType acsServiceXMLTagToEnum(const char *service);
00254
class RequestProcessorThread;

/**
 * Abstract base of everything that can be handed to a RequestProcessorThread.
 */
class Request {
  public:
    /** Virtual so that requests can be destroyed through a Request pointer. */
    virtual ~Request() {}

    /** Called to give up on the request; the precise contract is defined by the implementer. */
    virtual void abort() = 0;

    /**
     * Carries out the request.
     * NOTE(review): the meaning of the returned flag is defined by the caller of execute(),
     * which is not visible in this header -- confirm before documenting it further.
     */
    virtual bool execute() = 0;
};
00263
00264 class RequestProcessorThread : public ACS::Thread {
00265 private:
00266 ACE_Thread_Mutex *m_mutex;
00267 ACE_Condition<ACE_Thread_Mutex> *m_wait;
00268 std::queue<Request*> pending;
00269 volatile bool running;
00270 public:
00271 RequestProcessorThread(const ACE_CString &name,
00272 const ACS::TimeInterval& responseTime=ThreadBase::defaultResponseTime,
00273 const ACS::TimeInterval& sleepTime=ThreadBase::defaultSleepTime);
00274 ~RequestProcessorThread();
00275 void onStart();
00276 void stop();
00277 void exit();
00278 void runLoop() throw(CORBA::SystemException, ::ACSErrTypeCommon::BadParameterEx);
00279 void process(Request* r);
00280 };
00281
00282 template <class R> class RequestChainContext;
00283
00284 template <class R> class ChainedRequest : public Request {
00285 private:
00286 RequestChainContext<R> *context;
00287 friend class RequestChainContext<R>;
00288 protected:
00289 virtual void complete();
00290 public:
00291 ChainedRequest() : context(NULL) {}
00292 void process(RequestChainContext<R> *icontext);
00293 virtual bool isAsync() { return false; }
00294 };
00295
00296 template <class R> class RequestChainContext {
00297 public:
00298 typedef std::deque<R*> Queue;
00299 Queue requests;
00300 private:
00301 RequestProcessorThread *rpt;
00302 R *curreq;
00303 bool inprocess;
00304
00305 bool hasAsync;
00306 bool failed;
00307 int asyncToComplete;
00308 std::set<ACSServiceType> asyncStartInProgress;
00309 bool syncPending;
00310
00311 ACE_Thread_Mutex mutex;
00312
00313 protected:
00314 virtual bool requestDone(R *request) = 0;
00315 virtual void chainDone() = 0;
00316 virtual void chainAborted() = 0;
00317 public:
00318 RequestChainContext(RequestProcessorThread *irpt) : rpt(irpt), curreq(NULL), inprocess(false),
00319 hasAsync(false), failed(false), asyncToComplete(0), asyncStartInProgress(), syncPending(false) {}
00320 virtual ~RequestChainContext() {
00321 while (!requests.empty()) {
00322 delete requests.front();
00323 requests.pop_front();
00324 }
00325 }
00326 RequestProcessorThread *getRequestProcessor() { return rpt; }
00327 void appendRequest(R *request) { requests.push_back(request); }
00328 void appendRequestOrdered(R *request) {
00329 ACSServiceType thisType = request->getDescription()->getACSService();
00330 typename Queue::iterator it = requests.begin();
00331 while (it != requests.end() && (*it)->getDescription()->getACSService() <= thisType)
00332 it++;
00333 requests.insert(it, request);
00334 }
00335 void prependRequest(R *request) { requests.push_front(request); }
00336 void prependRequestOrdered(R *request) {
00337 ACSServiceType thisType = request->getDescription()->getACSService();
00338 typename Queue::iterator it = requests.begin();
00339 while (it != requests.end() && (*it)->getDescription()->getACSService() > thisType)
00340 it++;
00341 requests.insert(it, request);
00342 }
00343 void proceed(R *lastreq = NULL);
00344 };
00345
00346
00347
00348 class ACSServiceRequestChainContext;
00349 class ACSDaemonContext;
00350
00351 class ACSServiceRequestDescription {
00352 private:
00353 ACSServiceType service;
00354 int instance_number;
00355 const char *host, *name, *corbalocName, *domain, *cdbxmldir;
00356 bool loadir, wait, recovery, async;
00357 AcsDaemonUtils m_daemonUtils;
00358
00359 ACE_CString prepareCommand(ACSDaemonContext *context, ACSServiceRequestType request_type, bool log);
00360
00361 public:
00362 ACSServiceRequestDescription(ACSServiceType iservice, int iinstance_number);
00363 ACSServiceRequestDescription(const ACSServiceRequestDescription &desc);
00364 ~ACSServiceRequestDescription();
00365 ACSErr::Completion_var executeLocal(ACSDaemonContext *context, ACSServiceRequestType request_type);
00366 ACSErr::Completion_var executeRemote(ACSDaemonContext *context, ACSServiceRequestType request_type, CORBA::ORB_ptr orb, acsdaemon::DaemonCallback_ptr cbptr, const char *corbaloc);
00367 void setFromXMLAttributes(const char **atts);
00368 void setName(const char *iname) { name = iname == NULL ? NULL : strdup(iname); }
00369 void setCorbalocName(const char *iname) { corbalocName = iname == NULL ? NULL : strdup(iname); }
00370 void setDomain(const char *idomain) { domain = idomain == NULL ? NULL : strdup(idomain); }
00371 void setLoadIR(bool iloadir) { loadir = iloadir; }
00372 void setWaitLoadIR(bool iwait) { wait = iwait; }
00373 void setRecovery(bool irecovery) { recovery = irecovery; }
00374 void setCdbXMLDir(const char *icdbxmldir) { cdbxmldir = icdbxmldir == NULL ? NULL : strdup(icdbxmldir); }
00375 int getInstanceNumber() { return instance_number; }
00376 const char *getName() { return name; }
00377 const char *getCorbalocName() { return corbalocName; }
00378 const char *getHost() { return host == NULL ? ACSPorts::getIP() : host; }
00379 ACSServiceType getACSService() { return service; }
00380 const char *getACSServiceName() { return acsServices[service].xmltag; }
00381 bool isAsync() { return async; }
00382 const ACSServiceType* getDependentService() { return acsServices[service].depententService; }
00383 };
00384
00385
00386 class ACSServiceRequest : public ChainedRequest<ACSServiceRequest>, POA_acsdaemon::DaemonCallback {
00387 private:
00388 ACSDaemonContext *context;
00389 ACSServiceRequestTarget target;
00390 ACSServiceRequestType request_type;
00391 ACSServiceRequestDescription *desc;
00392 acsdaemon::DaemonCallback_var callback;
00393 const ACSErr::Completion *completion;
00394 acsdaemon::DaemonCallback_var cbvar;
00395 acsdaemon::DaemonCallback_ptr cbptr();
00396 void release();
00397 protected:
00398 void complete();
00399 void abort();
00400 bool execute();
00401 public:
00402 ACSServiceRequest(ACSDaemonContext *icontext, ACSServiceRequestTarget itarget, ACSServiceRequestType itype, ACSServiceRequestDescription *idesc, acsdaemon::DaemonCallback_ptr icallback = NULL);
00403 ~ACSServiceRequest();
00404 void done(const ::ACSErr::Completion &comp);
00405 void working(const ::ACSErr::Completion &comp);
00406 const ACSErr::Completion *getCompletion() { return completion; }
00407 bool isErrorFree() { return completion == NULL || completion->previousError.length() == 0; }
00408 ACSServiceRequestTarget getRequestTarget() { return target; }
00409 ACSServiceRequestType getRequestType() { return request_type; }
00410 ACSServiceRequestDescription *getDescription() { return desc; }
00411 const char *getACSServiceName() { return desc->getACSServiceName(); }
00412 int getInstanceNumber() { return desc->getInstanceNumber(); }
00413 const char *getHost() { return desc->getHost(); }
00414 virtual bool isAsync() { return desc->isAsync(); }
00415 };
00416
00417 class ACSServiceRequestChainContext : public RequestChainContext<ACSServiceRequest> {
00418 private:
00419 ACSDaemonContext *context;
00420 ACSServiceRequestType request_type;
00421 bool reuse_services;
00422 acsdaemon::DaemonSequenceCallback_var callback;
00423 int instance_number;
00424 bool free_instance;
00425 protected:
00426 bool requestDone(ACSServiceRequest *request);
00427 void chainDone();
00428 void chainAborted();
00429 public:
00430 ACSServiceRequestChainContext(ACSDaemonContext *icontext, ACSServiceRequestType itype, bool ireuse_services, acsdaemon::DaemonSequenceCallback_ptr icallback);
00431 ~ACSServiceRequestChainContext();
00432 void addRequest(const char *iservice, const char **atts);
00433 void startProcessing() { proceed(); }
00434 };
00435
00436
00437 class AsyncRequestThreadPool : public ACE_Task_Base, public Logging::Loggable
00438 {
00439 public:
00440
00441 static void configure(const char* processName, LoggingProxy *log, int threads)
00442 {
00443 ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
00444 process_name = processName;
00445 logger = log;
00446 conf_threads = threads;
00447 }
00448
00449 static ACE_CString getProcessName()
00450 {
00451 ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
00452 return process_name;
00453 }
00454
00455
00456 static AsyncRequestThreadPool* getInstance()
00457 {
00458 ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
00459 if (!instance_)
00460 instance_ = new AsyncRequestThreadPool(conf_threads);
00461 return instance_;
00462 }
00463
00464 static void destroy()
00465 {
00466 ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
00467 if (instance_)
00468 {
00469 instance_->shutdown();
00470 delete instance_;
00471 instance_ = 0;
00472 }
00473 }
00474
00475 virtual int svc (void);
00476
00477 int enqueue (ACE_Method_Request *request);
00478
00479 void shutdown();
00480
00481 private:
00482
00483 AsyncRequestThreadPool (int n_threads = 1);
00484
00485 static ACE_Thread_Mutex mutex_;
00486 static int conf_threads;
00487 static ACE_CString process_name;
00488 static LoggingProxy *logger;
00489
00490 static AsyncRequestThreadPool* instance_;
00491
00492 ACE_Activation_Queue activation_queue_;
00493
00494 int m_threads;
00495 };
00496
00497 #endif
00498