00001 #ifndef _ACS_REQUEST_H_
00002 #define _ACS_REQUEST_H_
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031 #include "acsdaemonS.h"
00032 #include <acsThread.h>
00033 #include <acsutilPorts.h>
00034 #include <queue>
00035 #include <memory>
00036
00037 #include <ace/Task.h>
00038 #include <ace/Activation_Queue.h>
00039 #include <ace/Method_Request.h>
00040 #include <ace/Guard_T.h>
00041
00042 #include <logging.h>
00043
00044 #include <set>
00045
00046 #include "acsDaemonUtils.h"
00047
00048
00049 #define CORBA_TIMEOUT 5000
00050
00051
00052
00053 #define EC_OK 0
00054
00055 #define EC_CANNOTCREATE 40
00056
00057 #define EC_CANNOTUSE 41
00058
00059 #define EC_FAILURE 42
00060
00061 #define EC_BADARGS 43
00062
00063 #define EC_NOPORT 44
00064
00065 #define EC_TIMEOUT 45
00066
00067
00068 enum ACSServiceType {
00069 NAMING_SERVICE = 0,
00070 INTERFACE_REPOSITORY,
00071 CDB,
00072 RDB_CDB,
00073 NOTIFICATION_SERVICE,
00074 LOGGING_SERVICE,
00075 ACS_LOG_SERVICE,
00076 ALARM_SERVICE,
00077 MANAGER,
00078 UNKNOWN
00079 };
00080
00081 struct ACSService {
00082 const char *xmltag;
00083 const char *script;
00084 const char *impname;
00085 const char *imptype;
00086 const char *impport;
00087 const char *impexec;
00088 const char *svccorbaurl;
00089 std::string (*svcport)(int);
00090 std::string (*namedsvcport)(int, const char *);
00091 bool autorestart;
00092 bool async;
00093 const ACSServiceType* depententService;
00094 };
00095
00096 #define ACS_SERVICE_TYPES UNKNOWN
00097 #define ACS_SERVICE_INSTANCES 11
00098
00099 const ACSServiceType noDependency[] = { UNKNOWN };
00100 const ACSServiceType namingServiceDependency[] = { NAMING_SERVICE, UNKNOWN };
00101 const ACSServiceType cdbDependency[] = { CDB, RDB_CDB, UNKNOWN };
00102
00103
00104 const ACSService acsServices[] = {
00105 {
00106 "naming_service",
00107 "acsNamingService",
00108 "Naming Service Imp",
00109 "NamingServiceImp",
00110 "2981",
00111 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonNamingServiceImp",
00112 "corbaloc::%s:%s/NameService",
00113 &ACSPorts::getNamingServicePort,
00114 NULL,
00115 true,
00116 false,
00117 noDependency
00118 }, {
00119 "interface_repository",
00120 "acsInterfaceRepository",
00121 "Interface Repository Imp",
00122 "InterfaceRepositoryImp",
00123 "2987",
00124 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonInterfaceRepositoryImp",
00125 "corbaloc::%s:%s/InterfaceRepository",
00126 &ACSPorts::getIRPort,
00127 NULL,
00128 false,
00129 true,
00130 namingServiceDependency
00131 }, {
00132 "cdb",
00133 "acsConfigurationDatabase",
00134 "CDB Imp",
00135 "ConfigurationDatabaseImp",
00136 "2983",
00137 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonConfigurationDatabaseImp",
00138 "corbaloc::%s:%s/CDB",
00139 &ACSPorts::getCDBPort,
00140 NULL,
00141 false,
00142 true,
00143 namingServiceDependency
00144 }, {
00145 "rdb_cdb",
00146 "acsRDBConfigurationDatabase",
00147 "CDB Imp",
00148 "ConfigurationDatabaseImp",
00149 "2983",
00150 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonConfigurationDatabaseImp",
00151 "corbaloc::%s:%s/CDB",
00152 &ACSPorts::getCDBPort,
00153 NULL,
00154 false,
00155 true,
00156 namingServiceDependency
00157 }, {
00158 "notification_service",
00159 "acsNotifyService",
00160 "Notification Service Imp",
00161 "NotificationServiceImp",
00162 "2982",
00163 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonNotificationServiceImp",
00164 "corbaloc::%s:%s/%s",
00165 NULL,
00166 &ACSPorts::getNotifyServicePort,
00167 true,
00168 false,
00169 namingServiceDependency
00170 }, {
00171 "logging_service",
00172 "acsLoggingService",
00173 "Logging Service Imp",
00174 "LoggingServiceImp",
00175 "2986",
00176 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonLoggingServiceImp",
00177 "corbaloc::%s:%s/Log",
00178 &ACSPorts::getLoggingServicePort,
00179 NULL,
00180 true,
00181 false,
00182 namingServiceDependency
00183 }, {
00184 "acs_log",
00185 "acsACSLogService",
00186 "ACS Log Service Imp",
00187 "ACSLogServiceImp",
00188 "2985",
00189 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonACSLogServiceImp",
00190 "corbaloc::%s:%s/ACSLogSvc",
00191 &ACSPorts::getLogPort,
00192 NULL,
00193 false,
00194 false,
00195 namingServiceDependency
00196 }, {
00197 "alarm_service",
00198 "acsAlarmService",
00199 "Alarm Service Imp",
00200 "AlarmServiceImp",
00201 "2988",
00202 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonAlarmServiceImp",
00203 "corbaloc::%s:%s/AcsAlarmService",
00204 &ACSPorts::getAlarmServicePort,
00205 NULL,
00206 false,
00207 true,
00208 cdbDependency
00209 }, {
00210 "manager",
00211 "acsManager",
00212 "Manager Imp",
00213 "ManagerImp",
00214 "2984",
00215 "acsutilBlock -t 15 -s -k -b \"Imp is up and running...\" acsdaemonManagerImp",
00216 "corbaloc::%s:%s/Manager",
00217 &ACSPorts::getManagerPort,
00218 NULL,
00219 false,
00220 false,
00221 cdbDependency
00222 }, { NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, false, false, noDependency }
00223 };
00224
00225 enum ACSServiceRequestType {
00226 START_SERVICE,
00227 STOP_SERVICE
00228 };
00229
00230 enum ACSServiceRequestTarget {
00231 LOCAL,
00232 DAEMON,
00233 IMP
00234 };
00235
00236 ACSServiceType acsServiceXMLTagToEnum(const char *service);
00237
00238 class RequestProcessorThread;
00239
00240 class Request {
00241 public:
00242 virtual ~Request() {};
00243 virtual void abort() = 0;
00244 virtual bool execute() = 0;
00245 };
00246
00247 class RequestProcessorThread : public ACS::Thread {
00248 private:
00249 ACE_Thread_Mutex *m_mutex;
00250 ACE_Condition<ACE_Thread_Mutex> *m_wait;
00251 std::queue<Request*> pending;
00252 volatile bool running;
00253 public:
00254 RequestProcessorThread(const ACE_CString &name,
00255 const ACS::TimeInterval& responseTime=ThreadBase::defaultResponseTime,
00256 const ACS::TimeInterval& sleepTime=ThreadBase::defaultSleepTime);
00257 ~RequestProcessorThread();
00258 void onStart();
00259 void stop();
00260 void exit();
00261 void runLoop() throw(CORBA::SystemException, ::ACSErrTypeCommon::BadParameterEx);
00262 void process(Request* r);
00263 };
00264
00265 template <class R> class RequestChainContext;
00266
00267 template <class R> class ChainedRequest : public Request {
00268 private:
00269 RequestChainContext<R> *context;
00270 friend class RequestChainContext<R>;
00271 protected:
00272 virtual void complete();
00273 public:
00274 ChainedRequest() : context(NULL) {}
00275 void process(RequestChainContext<R> *icontext);
00276 virtual bool isAsync() { return false; }
00277 };
00278
00279 template <class R> class RequestChainContext {
00280 public:
00281 typedef std::deque<R*> Queue;
00282 Queue requests;
00283 private:
00284 RequestProcessorThread *rpt;
00285 R *curreq;
00286 bool inprocess;
00287
00288 bool hasAsync;
00289 bool failed;
00290 int asyncToComplete;
00291 std::set<ACSServiceType> asyncStartInProgress;
00292 bool syncPending;
00293
00294 ACE_Thread_Mutex mutex;
00295
00296 protected:
00297 virtual bool requestDone(R *request) = 0;
00298 virtual void chainDone() = 0;
00299 virtual void chainAborted() = 0;
00300 public:
00301 RequestChainContext(RequestProcessorThread *irpt) : rpt(irpt), curreq(NULL), inprocess(false),
00302 hasAsync(false), failed(false), asyncToComplete(0), asyncStartInProgress(), syncPending(false) {}
00303 virtual ~RequestChainContext() {
00304 while (!requests.empty()) {
00305 delete requests.front();
00306 requests.pop_front();
00307 }
00308 }
00309 RequestProcessorThread *getRequestProcessor() { return rpt; }
00310 void appendRequest(R *request) { requests.push_back(request); }
00311 void appendRequestOrdered(R *request) {
00312 ACSServiceType thisType = request->getDescription()->getACSService();
00313 typename Queue::iterator it = requests.begin();
00314 while (it != requests.end() && (*it)->getDescription()->getACSService() <= thisType)
00315 it++;
00316 requests.insert(it, request);
00317 }
00318 void prependRequest(R *request) { requests.push_front(request); }
00319 void prependRequestOrdered(R *request) {
00320 ACSServiceType thisType = request->getDescription()->getACSService();
00321 typename Queue::iterator it = requests.begin();
00322 while (it != requests.end() && (*it)->getDescription()->getACSService() > thisType)
00323 it++;
00324 requests.insert(it, request);
00325 }
00326 void proceed(R *lastreq = NULL);
00327 };
00328
00329
00330
00331 class ACSServiceRequestChainContext;
00332 class ACSDaemonContext;
00333
00334 class ACSServiceRequestDescription {
00335 private:
00336 ACSServiceType service;
00337 int instance_number;
00338 const char *host, *name, *corbalocName, *domain, *cdbxmldir;
00339 bool loadir, wait, recovery, async;
00340 AcsDaemonUtils m_daemonUtils;
00341
00342 ACE_CString prepareCommand(ACSDaemonContext *context, ACSServiceRequestType request_type, bool log);
00343
00344 public:
00345 ACSServiceRequestDescription(ACSServiceType iservice, int iinstance_number);
00346 ACSServiceRequestDescription(const ACSServiceRequestDescription &desc);
00347 ~ACSServiceRequestDescription();
00348 ACSErr::Completion_var executeLocal(ACSDaemonContext *context, ACSServiceRequestType request_type);
00349 ACSErr::Completion_var executeRemote(ACSDaemonContext *context, ACSServiceRequestType request_type, CORBA::ORB_ptr orb, acsdaemon::DaemonCallback_ptr cbptr, const char *corbaloc);
00350 void setFromXMLAttributes(const char **atts);
00351 void setName(const char *iname) { name = iname == NULL ? NULL : strdup(iname); }
00352 void setCorbalocName(const char *iname) { corbalocName = iname == NULL ? NULL : strdup(iname); }
00353 void setDomain(const char *idomain) { domain = idomain == NULL ? NULL : strdup(idomain); }
00354 void setLoadIR(bool iloadir) { loadir = iloadir; }
00355 void setWaitLoadIR(bool iwait) { wait = iwait; }
00356 void setRecovery(bool irecovery) { recovery = irecovery; }
00357 void setCdbXMLDir(const char *icdbxmldir) { cdbxmldir = icdbxmldir == NULL ? NULL : strdup(icdbxmldir); }
00358 int getInstanceNumber() { return instance_number; }
00359 const char *getName() { return name; }
00360 const char *getCorbalocName() { return corbalocName; }
00361 const char *getHost() { return host == NULL ? ACSPorts::getIP() : host; }
00362 ACSServiceType getACSService() { return service; }
00363 const char *getACSServiceName() { return acsServices[service].xmltag; }
00364 bool isAsync() { return async; }
00365 const ACSServiceType* getDependentService() { return acsServices[service].depententService; }
00366 };
00367
00368
00369 class ACSServiceRequest : public ChainedRequest<ACSServiceRequest>, POA_acsdaemon::DaemonCallback {
00370 private:
00371 ACSDaemonContext *context;
00372 ACSServiceRequestTarget target;
00373 ACSServiceRequestType request_type;
00374 ACSServiceRequestDescription *desc;
00375 acsdaemon::DaemonCallback_var callback;
00376 const ACSErr::Completion *completion;
00377 acsdaemon::DaemonCallback_var cbvar;
00378 acsdaemon::DaemonCallback_ptr cbptr();
00379 void release();
00380 protected:
00381 void complete();
00382 void abort();
00383 bool execute();
00384 public:
00385 ACSServiceRequest(ACSDaemonContext *icontext, ACSServiceRequestTarget itarget, ACSServiceRequestType itype, ACSServiceRequestDescription *idesc, acsdaemon::DaemonCallback_ptr icallback = NULL);
00386 ~ACSServiceRequest();
00387 void done(const ::ACSErr::Completion &comp);
00388 void working(const ::ACSErr::Completion &comp);
00389 const ACSErr::Completion *getCompletion() { return completion; }
00390 bool isErrorFree() { return completion == NULL || completion->previousError.length() == 0; }
00391 ACSServiceRequestTarget getRequestTarget() { return target; }
00392 ACSServiceRequestType getRequestType() { return request_type; }
00393 ACSServiceRequestDescription *getDescription() { return desc; }
00394 const char *getACSServiceName() { return desc->getACSServiceName(); }
00395 int getInstanceNumber() { return desc->getInstanceNumber(); }
00396 const char *getHost() { return desc->getHost(); }
00397 virtual bool isAsync() { return desc->isAsync(); }
00398 };
00399
00400 class ACSServiceRequestChainContext : public RequestChainContext<ACSServiceRequest> {
00401 private:
00402 ACSDaemonContext *context;
00403 ACSServiceRequestType request_type;
00404 bool reuse_services;
00405 acsdaemon::DaemonSequenceCallback_var callback;
00406 int instance_number;
00407 bool free_instance;
00408 protected:
00409 bool requestDone(ACSServiceRequest *request);
00410 void chainDone();
00411 void chainAborted();
00412 public:
00413 ACSServiceRequestChainContext(ACSDaemonContext *icontext, ACSServiceRequestType itype, bool ireuse_services, acsdaemon::DaemonSequenceCallback_ptr icallback);
00414 ~ACSServiceRequestChainContext();
00415 void addRequest(const char *iservice, const char **atts);
00416 void startProcessing() { proceed(); }
00417 };
00418
00419
00420 class AsyncRequestThreadPool : public ACE_Task_Base, public Logging::Loggable
00421 {
00422 public:
00423
00424 static void configure(const char* processName, LoggingProxy *log, int threads)
00425 {
00426 ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
00427 process_name = processName;
00428 logger = log;
00429 conf_threads = threads;
00430 }
00431
00432 static ACE_CString getProcessName()
00433 {
00434 ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
00435 return process_name;
00436 }
00437
00438
00439 static AsyncRequestThreadPool* getInstance()
00440 {
00441 ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
00442 if (!instance_)
00443 instance_ = new AsyncRequestThreadPool(conf_threads);
00444 return instance_;
00445 }
00446
00447 static void destroy()
00448 {
00449 ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
00450 if (instance_)
00451 {
00452 instance_->shutdown();
00453 delete instance_;
00454 instance_ = 0;
00455 }
00456 }
00457
00458 virtual int svc (void);
00459
00460 int enqueue (ACE_Method_Request *request);
00461
00462 void shutdown();
00463
00464 private:
00465
00466 AsyncRequestThreadPool (int n_threads = 1);
00467
00468 static ACE_Thread_Mutex mutex_;
00469 static int conf_threads;
00470 static ACE_CString process_name;
00471 static LoggingProxy *logger;
00472
00473 static AsyncRequestThreadPool* instance_;
00474
00475 ACE_Activation_Queue activation_queue_;
00476
00477 int m_threads;
00478 };
00479
00480 #endif
00481