• Classes
  • Modules
  • Namespaces
  • Files
  • Related Pages
  • File List
  • File Members

acsDaemonImpl.h

Go to the documentation of this file.
00001 #ifndef _ACS_DAEMON_IMPL_H_
00002 #define _ACS_DAEMON_IMPL_H_
00003 
00004 /*******************************************************************************
00005 *    ALMA - Atacama Large Millimiter Array
00006 *    (c) European Southern Observatory, 2002
00007 *    Copyright by ESO (in the framework of the ALMA collaboration)
00008 *    and Cosylab 2002, All rights reserved
00009 *
00010 *    This library is free software; you can redistribute it and/or
00011 *    modify it under the terms of the GNU Lesser General Public
00012 *    License as published by the Free Software Foundation; either
00013 *    version 2.1 of the License, or (at your option) any later version.
00014 *
00015 *    This library is distributed in the hope that it will be useful,
00016 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 *    Lesser General Public License for more details.
00019 *
00020 *    You should have received a copy of the GNU Lesser General Public
00021 *    License along with this library; if not, write to the Free Software
00022 *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00023 *
00024 * "@(#) $Id: acsDaemonImpl.h,v 1.12 2012/05/15 09:06:34 msekoran Exp $"
00025 *
00026 * who       when      what
00027 * --------  --------  ----------------------------------------------
00028 * msekoran 2006-06-21 created 
00029 * agrimstr 2007-11-07 refactored common daemon implementation code to
00030 *                     template classes
00031 */
00032 
00033 #ifndef __cplusplus
00034 #error This is a C++ include file and cannot be used from plain C
00035 #endif
00036 
00037 #include "acsdaemonS.h"
00038 #include <ace/SString.h>
00039 #include "logging.h"
00040 #include <getopt.h>
00041 #include <acsutilPorts.h>
00042 #include <tao/IORTable/IORTable.h>
00043 #include <acserr.h>
00044 #include <acsdaemonErrType.h>
00045 #include <ACSErrTypeCommon.h>
00046 #include <acsutilTempFile.h>
00047 
00048 #include "acsdaemonORBTask.h"
00049 
00050 // thread-pool
00051 #include "acsRequest.h"
00052 
00059 template <typename T>
00060 class ACSDaemonServiceImpl {
00061 
00062   public:
00063     
00067     ACSDaemonServiceImpl(LoggingProxy &logProxy, bool isProtected, int nbServiceThreads);
00068   
00072     virtual ~ACSDaemonServiceImpl();
00073     
00077     bool 
00078     isInitialized() { return m_isInitialized; }
00079     
00083     bool 
00084     isProtected() { return m_isProtected; }
00085     
00086 
00090     std::string
00091     getPort() { return handler.getPort(); }
00092 
00096     std::string
00097     getName() { return handler.getName(); }
00098 
00102     int
00103     startup (int argc, char *argv[]);
00104 
00109     int 
00110     run ();
00111 
00115     void
00116     shutdown (bool wait_for_completition); 
00117 
00121     const char*
00122     getIOR() const { return m_ior.in(); };
00123     
00124   protected:
00128     virtual int 
00129     init_ORB (int& argc, char *argv []);
00130 
00131     //--Common data members-------------------------------------
00132 
00134     bool m_isInitialized;
00135 
00137     bool m_isProtected;
00138 
00140     bool m_blockTermination;
00141 
00143     CORBA::ORB_var m_orb;
00144 
00146     LoggingProxy &m_logProxy;
00147 
00149     CORBA::String_var m_ior;
00150 
00152     T handler;
00153     
00155     int m_serverThreads;
00156 };
00157 
00158 template <typename T>
00159 ACSDaemonServiceImpl<T>::ACSDaemonServiceImpl (LoggingProxy &logProxy, bool isProtected, int nbServerThreads) :
00160     m_isInitialized(false), m_logProxy(logProxy)
00161 {
00162     // noop here
00163 
00164     m_isInitialized = true;
00165 
00166     m_isProtected = isProtected;
00167 
00168     m_blockTermination = false;
00169     
00170     m_serverThreads = nbServerThreads;
00171 
00172     ACS_CHECK_LOGGER;
00173     // daemon is a standalone process, replace global logger with named logger
00174     Logging::Logger::setGlobalLogger(getNamedLogger(handler.getName()));
00175 
00176     handler.setService(this);
00177 }
00178 
00179 template <typename T>
00180 ACSDaemonServiceImpl<T>::~ACSDaemonServiceImpl (void)
00181 {
00182 }
00183 
00184 template <typename T>
00185 int ACSDaemonServiceImpl<T>::startup (int argc, char *argv[])
00186 {
00187     ACS_SHORT_LOG ((LM_INFO, "Starting up the %s...", handler.getName().c_str()));
00188 
00189     // Initalize the ORB.
00190     if (init_ORB (argc, argv) != 0)
00191         {
00192         return -1;
00193         }
00194 
00195     // Initialize AES.
00196     if (!ACSError::init(m_orb.in()))
00197         {
00198         ACS_SHORT_LOG ((LM_ERROR, "Failed to initalize the ACS Error System."));
00199         return -1;
00200         }
00201 
00202     ACS_SHORT_LOG ((LM_INFO, "%s is initialized.", handler.getName().c_str()));
00203 
00204     return 0;
00205 }
00206 
00207 template <typename T>
00208 int ACSDaemonServiceImpl<T>::run (void)
00209 {
00210     ACS_SHORT_LOG ((LM_INFO, "%s is up and running...", handler.getName().c_str()));
00211 
00212   
00213     try
00214         {
00215         handler.initialize(m_orb.in());
00216 //      this->m_orb->run ();
00217 
00218       // constrcut CORBA ORB request handler task
00219       ORBTask task (this->m_orb.in(), &m_logProxy);
00220 
00221       // activate task, i.e. spawn threads and handle requests
00222       if (task.activate (THR_NEW_LWP | THR_JOINABLE, m_serverThreads) == 0)
00223           // wait until CORBA ORB is shutdown or destroyed
00224           task.thr_mgr()->wait ();
00225       else
00226         {
00227           // failed to spawn threads
00228           ACS_LOG(LM_RUNTIME_CONTEXT, "ACSDaemonServiceImpl<T>::run", (LM_INFO, "Failed to activate CORBA ORB request handler threads..."));
00229           return -1;
00230         }
00231 
00232         }
00233     catch(...)
00234         {
00235         return -1;
00236         }
00237 
00238     return 0;
00239 }
00240 
00241 template <typename T>
00242 void ACSDaemonServiceImpl<T>::shutdown (bool wait_for_completition)
00243 {
00244     if (!m_blockTermination)
00245     {
00246         ACS_SHORT_LOG ((LM_INFO, "Stopping the %s...", this->getName().c_str()));
00247         m_blockTermination=true;
00248 
00249         AsyncRequestThreadPool::destroy();
00250 
00251         // shutdown the ORB.
00252         if (!CORBA::is_nil (m_orb.in ()))
00253         {
00254             handler.dispose(m_orb.in());
00255             this->m_orb->shutdown (wait_for_completition);
00256         }
00257         // shutdown AES
00258         ACSError::done();
00259     }
00260 }
00261 
00262 template <typename T>
00263 int ACSDaemonServiceImpl<T>::init_ORB  (int& argc, char *argv [])
00264 {
00265     m_orb = CORBA::ORB_init(argc, argv, "TAO");
00266 
00267     try
00268         {
00269         // get a reference to the RootPOA
00270         CORBA::Object_var obj = m_orb->resolve_initial_references("RootPOA");
00271         PortableServer::POA_var root_poa = PortableServer::POA::_narrow(obj.in());
00272         PortableServer::POAManager_var poa_manager = root_poa->the_POAManager();
00273       
00274         // create policies
00275         CORBA::PolicyList policy_list;
00276         policy_list.length(5);
00277         policy_list[0] = root_poa->create_request_processing_policy(PortableServer::USE_DEFAULT_SERVANT);
00278         policy_list[1] =  root_poa->create_id_uniqueness_policy(PortableServer::MULTIPLE_ID);
00279         policy_list[2] = root_poa->create_id_assignment_policy(PortableServer::USER_ID); 
00280         policy_list[3] = root_poa->create_servant_retention_policy(PortableServer::NON_RETAIN); 
00281         policy_list[4] =  root_poa->create_lifespan_policy(PortableServer::PERSISTENT);
00282       
00283         // create a ACSDaemon POA with policies 
00284         PortableServer::POA_var poa = root_poa->create_POA(handler.getType().c_str(), poa_manager.in(), policy_list);
00285 
00286         // destroy policies
00287         for (CORBA::ULong i = 0; i < policy_list.length(); ++i)
00288             {
00289             CORBA::Policy_ptr policy = policy_list[i];
00290             policy->destroy();
00291             }
00292 
00293         // set as default servant
00294         poa->set_servant(&handler);
00295 
00296         // create reference
00297         PortableServer::ObjectId_var oid = PortableServer::string_to_ObjectId(handler.getType().c_str());
00298         obj = poa->create_reference_with_id (oid.in(), handler._interface_repository_id());
00299         m_ior = m_orb->object_to_string(obj.in());
00300 
00301         // bind to IOR table
00302         CORBA::Object_var table_object = m_orb->resolve_initial_references("IORTable");
00303         IORTable::Table_var adapter = IORTable::Table::_narrow(table_object.in());
00304       
00305         if (CORBA::is_nil(adapter.in()))
00306             {
00307             ACS_SHORT_LOG ((LM_ERROR, "Nil IORTable"));
00308             return -1;
00309             }
00310         else
00311             {
00312             adapter->bind(handler.getType().c_str(), m_ior.in());
00313             }
00314 
00315         // activate POA
00316         poa_manager->activate();
00317 
00318         ACS_SHORT_LOG((LM_INFO, "%s is waiting for incoming requests.", handler.getName().c_str()));
00319       
00320         }
00321     catch( CORBA::Exception &ex )
00322         {
00323         ex._tao_print_exception("EXCEPTION CAUGHT");
00324         return -1;
00325         }
00326   
00327     return 0;
00328 }
00329 
00337 template <typename T>
00338 class acsDaemonImpl
00339 {
00340 
00341   public:
00342 
00346     acsDaemonImpl(int argc, char *argv[]);
00347     
00351     ~acsDaemonImpl();
00352     
00356     void usage(const char *argv);
00357 
00361     int run();
00362 
00366     void shutdown();
00367 
00368   private:
00369 
00371     ACSDaemonServiceImpl<T> *service;
00372 
00374     ACE_CString iorFile;
00375 
00377     ACE_CString ORBEndpoint;
00378     
00380     int m_serverThreads;
00381 
00383     int nargc;
00384     char** nargv;
00385     
00387     LoggingProxy *m_logger;
00388 };
00389 
00390 
00392 static struct option long_options[] = {
00393     {"help",        no_argument,       0, 'h'},
00394     {"outfile",     required_argument, 0, 'o'},
00395     {"ORBEndpoint", required_argument, 0, 'O'},
00396     {"unprotected", no_argument,       0, 'u'},
00397     {"nthreads",    required_argument, 0, 'n'},
00398     {0, 0, 0, '\0'}};
00399 
00400 template <typename T>
00401 void acsDaemonImpl<T>::usage(const char *argv)
00402 {
00403     ACE_OS::printf ("\n\tusage: %s {-h} [-O iiop://ip:port] [-o iorfile]\n", argv);
00404     ACE_OS::printf ("\t   -h, --help         show this help message\n");
00405     ACE_OS::printf ("\t   -O, --ORBEndpoint  ORB end point\n");
00406     ACE_OS::printf ("\t   -o, --outfile      IOR output file\n");
00407     ACE_OS::printf ("\t   -u, --unprotected  start in unprotected mode\n");
00408     ACE_OS::printf ("\t   -n, --nthreads     number of threads to handle CORBA requests\n");
00409 }
00410 
00411 template <typename T>
00412 acsDaemonImpl<T>::acsDaemonImpl(int argc, char *argv[])
00413 {
00414     nargc = 0;
00415     nargv = 0;
00416     service = 0;
00417     m_logger = 0;
00418     bool unprotected = false;
00419     m_serverThreads = 5;
00420 
00421     // Extract and validate command line arguments
00422     int c;
00423     for(;;)
00424         {
00425         int option_index = 0;
00426         c = getopt_long (argc, argv, "ho:O:un:",
00427                          long_options, &option_index); 
00428         if (c==-1) break;
00429         switch(c)
00430             {
00431                 case 'h':
00432                     usage(argv[0]);
00433                     return;
00434                 case 'o':
00435                     iorFile = optarg;
00436                     break;
00437                 case 'O':
00438                     ORBEndpoint = optarg;
00439                     break;
00440                 case 'u':
00441                     unprotected = true;
00442                     break;
00443                 case 'n':
00444                     m_serverThreads = atoi(optarg);
00445                     break;
00446                 default:
00447                     ACE_OS::printf("Ignoring unrecognized option %s", 
00448                                     argv[option_index]);
00449             }
00450         }
00451 
00452     // Host IP information is needed to initialize the logging system
00453     // and for ORBEndpoint creation
00454     const char* hostName = ACSPorts::getIP();
00455 
00456     // store env. var. value and disable logging to local file cache
00457     char * acsLogFileEnv = ACE_OS::getenv("ACS_LOG_FILE");
00458     ACE_CString acsLogFileValue(acsLogFileEnv);
00459     ACE_OS::setenv("ACS_LOG_FILE", "/dev/null", 1);
00460 
00461     // create logging proxy
00462     LoggingProxy::ProcessName(argv[0]);
00463     LoggingProxy::ThreadName("main");
00464     ACE_Log_Msg::instance()->local_host(hostName);
00465     // for daemons/imp we set some cache by default
00466     m_logger = new LoggingProxy (1024, 0, 31, 0);
00467     
00468     LoggingProxy::init (m_logger);  
00469 
00470     // reset ACS_LOG_FILE back
00471     if (acsLogFileEnv)
00472         ACE_OS::setenv("ACS_LOG_FILE", acsLogFileValue.c_str(), 1);
00473     else
00474     {
00475                 #define DEFAULT_LOG_FILE_NAME "acs_local_log"
00476         ACE_CString daemonsLogFileName = getTempFileName(0, DEFAULT_LOG_FILE_NAME);
00477 
00478         // replace "ACS_INSTANCE.x" with "daemons_" + <timestamp>
00479         ACE_CString daemonsDir = "daemons_" + getStringifiedTimeStamp();
00480 
00481         ACE_CString instancePart("ACS_INSTANCE.");
00482         ACE_CString::size_type pos = daemonsLogFileName.find(instancePart);
00483         daemonsLogFileName =
00484                         daemonsLogFileName.substring(0, pos) +
00485                         daemonsDir +
00486                         daemonsLogFileName.substring(pos + instancePart.length() + 1);  // +1 for skipping instance number
00487 
00488         ACE_OS::setenv("ACS_LOG_FILE", daemonsLogFileName.c_str(), 1);
00489         //ACE_OS::unsetenv("ACS_LOG_FILE"1);
00490     }
00491 
00492     AsyncRequestThreadPool::configure(argv[0], m_logger, m_serverThreads);      // threads for async
00493 
00494     // Ready the service manager
00495     service = new ACSDaemonServiceImpl<T>(*m_logger, !unprotected, m_serverThreads);
00496 
00497     // Generate the CORBA configuration for the service
00498     ACE_CString argStr;
00499     
00500     if(ORBEndpoint.length()<=0)
00501         {
00502         argStr = ACE_CString("-ORBEndpoint iiop://") + hostName + ":";
00503         argStr = argStr + service->getPort().c_str();
00504         }
00505     else
00506         {
00507         argStr = ACE_CString("-ORBEndpoint ") + ORBEndpoint;
00508         }
00509 
00510     ACS_SHORT_LOG((LM_INFO, "Command line is: %s", argStr.c_str()));
00511     ACE_OS::string_to_argv ((ACE_TCHAR*)argStr.c_str(), nargc, nargv);
00512 }
00513 
00514 template <typename T>
00515 acsDaemonImpl<T>::~acsDaemonImpl()
00516 {
00517     if (service != 0) delete service;
00518     if (m_logger != 0)
00519     {
00520         LoggingProxy::done();
00521         delete m_logger;
00522     }
00523 }
00524 
00525 
00526 template <typename T>
00527 int acsDaemonImpl<T>::run()
00528 {
00529     ACS_TRACE("acsDaemonImpl<T>::run");
00530     if (!service || !service->isInitialized())
00531         {
00532         return -1;
00533         }
00534     try
00535         {
00536         if (service->startup (nargc, nargv) != 0)
00537             {
00538             return -1;
00539             }
00540 
00541         // write IOR to file, if necessary
00542         if (iorFile.length() > 0)
00543             {
00544             FILE *output_file = ACE_OS::fopen (iorFile.c_str(), "w");
00545             if (output_file == 0) 
00546                 {
00547                 ACS_SHORT_LOG ((LM_ERROR, "Cannot open output file '%s' to write IOR.", iorFile.c_str()));
00548                 return  -1;
00549                 }
00550 
00551             int result = ACE_OS::fprintf (output_file, "%s", service->getIOR());
00552             if (result < 0) 
00553                 {
00554                 ACS_SHORT_LOG ((LM_ERROR, "ACE_OS::fprintf failed to write IOR."));
00555                 return  -1;
00556                 }
00557 
00558             ACE_OS::fclose (output_file);
00559             ACS_SHORT_LOG((LM_INFO, "%s IOR has been written into file '%s'.", service->getName().c_str(), iorFile.c_str()));
00560             }
00561 
00562         // run, run, run...
00563         if (service->run () == -1)
00564             {
00565             this->shutdown ();
00566             ACS_SHORT_LOG ((LM_ERROR, "Failed to run the %s.", service->getName().c_str()));
00567             return  1;
00568             }
00569         }
00570     catch(...)
00571         {
00572         ACS_SHORT_LOG((LM_ERROR, "Failed to start the %s.", service->getName().c_str()));
00573         return 1;
00574         }
00575   
00576 
00577     this->shutdown();
00578   
00579     ACS_SHORT_LOG ((LM_INFO, "%s stopped.", service->getName().c_str()));
00580 
00581     return 0;
00582 }
00583 
00584 template <typename T>
00585 void acsDaemonImpl<T>::shutdown()
00586 {
00587     service->shutdown(true);
00588 }
00589 
00590 #endif

Generated on Mon May 4 2015 08:27:41 for ACS-2015.4 C++ API by  doxygen 1.7.0