T - See base class.public class NCSubscriber<T extends org.omg.CORBA.portable.IDLEntity> extends AcsEventSubscriberImplBase<T> implements ReconnectableParticipant
AcsEventSubscriber interface.
This class is used to receive events asynchronously from notification channel suppliers.
It is the replacement of alma.acs.nc.Consumer, and to keep things simple it no longer
supports the inheritance mode, but instead supports type-safe delegation of incoming calls to
a user-supplied handler.
The lifecycle steps are:
Filter objects getting attached to the
proxy supplier, see AcsEventSubscriberImplBase.addSubscription(alma.acs.nc.AcsEventSubscriber.Callback). AcsEventSubscriberImplBase.addSubscription(alma.acs.nc.AcsEventSubscriber.Callback))
and/or for all events (AcsEventSubscriberImplBase.addGenericSubscription(alma.acs.nc.AcsEventSubscriber.GenericCallback)) can be registered.
AcsEventSubscriberImplBase.startReceivingEvents() is called, Corba NCs push events to this class, which delegates
the events to the registered handlers.
| Modifier and Type | Class and Description |
|---|---|
static class |
NCSubscriber.AdminReuseCompatibilityHack
Encapsulates the hack of using a dummy ProxyType.PUSH_ANY proxy to mark a shared consumer admin used by NCSubscriber
(or other next-gen subscribers), to distinguish it from non-reusable (subscriber-owned) admin objects
as they are used by the old-generation subscribers.
|
AcsEventSubscriber.Callback<U>, AcsEventSubscriber.GenericCallback| Modifier and Type | Field and Description |
|---|---|
protected alma.acs.nc.AnyAide |
anyAide
Helper class used to manipulate CORBA anys.
|
protected boolean |
autoreconnect
Auto reconnect attribute.
|
protected EventChannel |
channel
There can be only one notification channel for any given subscriber.
|
protected java.lang.String |
channelName
The channel has exactly one name registered in the CORBA Naming Service.
|
protected java.lang.String |
channelNotifyServiceDomainName
The channel notification service domain name, can be
null. |
protected java.util.HashMap<java.lang.String,java.lang.Double> |
handlerTimeoutMap
Maps event names to the maximum amount of time allowed for receiver
methods to complete.
|
protected Helper |
helper
Provides access to the notify service and CDB, creates NCs, etc
|
protected static int |
PROXIES_PER_ADMIN
Maximum number of proxies (subscribers) per admin object.
|
protected StructuredProxyPushSupplier |
proxySupplier
The supplier proxy we are connected to.
|
protected ConsumerAdmin |
sharedConsumerAdmin
The consumer admin object attached to the NC,
which is used by subscribers to get a reference to the structured supplier proxy.
|
protected java.util.Map<java.lang.String,java.lang.Integer> |
subscriptionsFilters
Contains a list of the added and removed subscription filters applied.
|
clientName, EVENT_QUEUE_CAPACITY, eventType, genericReceiver, logger, processTimeLogRepeatGuard, receivers, services, stateMachine, stateMachineSignalDispatcher| Constructor and Description |
|---|
NCSubscriber(java.lang.String channelName,
java.lang.String channelNotifyServiceDomainName,
ContainerServicesBase services,
org.omg.CosNaming.NamingContext namingService,
java.lang.String clientName,
java.lang.Class<T> eventType)
Creates a new instance of NCSubscriber.
|
| Modifier and Type | Method and Description |
|---|---|
protected int |
addFilter(java.lang.String eventTypeName)
This method manages the filtering capabilities used to control subscriptions.
|
protected void |
createConnectionAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.createConnectionAction().
|
protected void |
createEnvironmentAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.createEnvironmentAction().
|
protected void |
destroyConnectionAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Handler for "destroyConnection" state machine action.
|
protected void |
destroyEnvironmentAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.destroyEnvironmentAction().
|
void |
disconnect_structured_push_consumer()
ACS does not provide an implementation of this method.
|
protected java.lang.String |
getFilterLanguage()
This method returns a string to the type of filter constraint language to
be used for filtering events, which is normally equivalent to
acsnc::FILTER_LANGUAGE_NAME (ETCL = Extended Trader Constraint Language).
|
protected double |
getMaxProcessTimeSeconds(java.lang.String eventName)
Gets the configured (or default) max time that a receiver may take to process an event,
regardless of the actual event rate.
|
protected java.lang.String |
getNotificationFactoryName()
This method returns the notify service name as registered with the CORBA
Naming Service.
|
protected boolean |
isTraceEventsEnabled() |
protected void |
logEventProcessingTimeExceeded(java.lang.String eventName,
long logOcurrencesNumber)
Logs the error that event processing time was exceeded.
|
protected void |
logEventProcessingTooSlowForEventRate(long numEventsDiscarded,
java.lang.String eventName)
Logs the error that the receiver cannot keep up with the actual event rate,
in spite of the small event buffering done.
|
protected void |
logEventReceiveHandlerException(java.lang.String eventName,
java.lang.String receiverClassName,
java.lang.Throwable thr)
Logs an exception thrown by an event handler (user code).
|
protected void |
logNoEventReceiver(java.lang.String eventName)
Logs or ignores the fact that an event was received for which no receiver could be found.
|
protected void |
logQueueShutdownError(int timeoutMillis,
int remainingEvents)
Logs the error that the local event buffer could not be emptied before shutting down the subscriber.
|
protected void |
notifyFirstSubscription(java.lang.Class<?> structClass)
Adds a filter on the server-side supplier proxy that lets the given event type pass through.
|
protected void |
notifyNoSubscription() |
protected void |
notifySubscriptionRemoved(java.lang.Class<?> structClass) |
void |
offer_change(EventType[] added,
EventType[] removed)
ACS does not provide an implementation of this method.
|
protected boolean |
push_structured_event_called(StructuredEvent structuredEvent)
Users can override this method to get notified of raw events, for additional statistics,
to handle event data given as a sequence of IDL structs (exceptional case in acssamp),
or for DynAny access (eventGUI).
|
void |
push_structured_event(StructuredEvent structuredEvent)
This method is called by the notification channel (supplier proxy) each time an event is received.
|
void |
reconnect(EventChannelFactory ecf) |
protected void |
resumeAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.resumeAction().
|
void |
setAutoreconnect(boolean autoreconnect)
This method sets the attribute autoreconnect which is used to determine if
a reconnection to the channel must be done when the worker thread that checks
the connection detects that's been lost.
|
boolean |
setConnectionCheckerFreq(int connectionCheckerFreq)
Frequency in seconds at which the connection status will be checked
|
boolean |
setEventReceptionTimeout(int eventReceptionTimeout)
Time to wait in seconds after the last received event to consider that the consumer is not receiving events.
|
protected void |
suspendAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.suspendAction().
|
addGenericSubscription, addSubscription, disconnect, execute, getLifecycleState, getNumberOfReceivers, hasGenericReceiver, isDisconnected, isSuspended, processEvent, processEventAsync, removeGenericSubscription, removeSubscription, resume, startReceivingEvents, suspendprotected final Helper helper
protected EventChannel channel
protected final java.lang.String channelName
protected final java.lang.String channelNotifyServiceDomainName
null.protected ConsumerAdmin sharedConsumerAdmin
null when the subscriber is not connected to a NC.
The TAO extensions would allow us to set a meaningful name for the admin object, but it
still does not get used as the ID, but as a separate name field.
You can get the consumer admin object ID from here, see ConsumerAdmin#MyID().
(In the NC spec, it says "The MyID attribute is a readonly attribute that maintains
the unique identifier of the target ConsumerAdmin instance, which is assigned to it
upon creation by the Notification Service event channel.) It is an integer type, which makes
it necessarily different from the name used with the TAO extensions.
We try to reuse an admin object for a limited number of subscribers, to not allocate a new thread in the notification service for every subscriber but instead get a flexible thread::subscriber mapping.
PROXIES_PER_ADMINprotected static final int PROXIES_PER_ADMIN
sharedConsumerAdmin,
Constant Field Valuesprotected StructuredProxyPushSupplier proxySupplier
protected alma.acs.nc.AnyAide anyAide
protected final java.util.HashMap<java.lang.String,java.lang.Double> handlerTimeoutMap
protected final java.util.Map<java.lang.String,java.lang.Integer> subscriptionsFilters
protected boolean autoreconnect
public NCSubscriber(java.lang.String channelName,
java.lang.String channelNotifyServiceDomainName,
ContainerServicesBase services,
org.omg.CosNaming.NamingContext namingService,
java.lang.String clientName,
java.lang.Class<T> eventType)
throws AcsJException
channelName - Subscribe to events on this channel registered in the CORBA
Naming Service. If the channel does not exist, it's
registered.channelNotifyServiceDomainName - Channel domain name, which is being used to determine the
notification service that should host the NC.
Passing null results in the default notify service "NotifyEventChannelFactory" being used.services - To get ACS logger, access to the CDB, etc.namingService - Must be passed explicitly, instead of the old hidden approach via ORBInitRef.NameService property.clientName - eventType - Our type parameter, either IDLEntity as base type or a concrete IDL-defined struct.AcsJException - Thrown on any really bad error conditions encountered.protected void createEnvironmentAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsEventSubscriberImplBasecreateEnvironmentAction in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>AcsJStateMachineActionExprotected void destroyEnvironmentAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsEventSubscriberImplBasedestroyEnvironmentAction in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>AcsJStateMachineActionExprotected void createConnectionAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsEventSubscriberImplBasecreateConnectionAction in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>AcsJStateMachineActionExprotected void destroyConnectionAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsEventSubscriberImplBaseShuts down the event queue. Queued events may still be processed by the receivers afterwards, but here we wait for up to 500 ms to log it if it is the case.
Further events delivered to AcsEventSubscriberImplBase.processEventAsync(Object, EventDescription)
will cause an exception there.
Subclass may override, but must call super.destroyConnectionAction().
destroyConnectionAction in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>AcsJStateMachineActionExprotected void suspendAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsEventSubscriberImplBasesuspendAction in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>AcsJStateMachineActionExprotected void resumeAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsEventSubscriberImplBaseresumeAction in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>AcsJStateMachineActionExprotected boolean isTraceEventsEnabled()
isTraceEventsEnabled in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>true if events should be logged when they are received.protected void logEventReceiveHandlerException(java.lang.String eventName,
java.lang.String receiverClassName,
java.lang.Throwable thr)
AcsEventSubscriberImplBaselogEventReceiveHandlerException in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>protected void logEventProcessingTimeExceeded(java.lang.String eventName,
long logOcurrencesNumber)
AcsEventSubscriberImplBaselogEventProcessingTimeExceeded in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>protected void logEventProcessingTooSlowForEventRate(long numEventsDiscarded,
java.lang.String eventName)
AcsEventSubscriberImplBaselogEventProcessingTooSlowForEventRate in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>numEventsDiscarded - The number of events that have actually been discarded since the last log.
Will often be 0 when we just warn about the queue filling up.protected void logNoEventReceiver(java.lang.String eventName)
AcsEventSubscriberImplBaseThe subclass must know whether such a condition is expected or not, e.g. because event filtering is set up outside of the subscriber and only subscribed event types are expected to arrive.
logNoEventReceiver in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>protected void logQueueShutdownError(int timeoutMillis,
int remainingEvents)
AcsEventSubscriberImplBaselogQueueShutdownError in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>protected double getMaxProcessTimeSeconds(java.lang.String eventName)
AcsEventSubscriberImplBasegetMaxProcessTimeSeconds in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>protected void notifyFirstSubscription(java.lang.Class<?> structClass)
throws AcsJEventSubscriptionEx
Note that we derive the event type name from the simple class name of struct,
as done in other parts of ACS, which requires IDL event structs to have globally unique names
across IDL name spaces.
If structClass is null (generic subscription),
then "*" is used as the event type name,
which in ETCL is understood as a wildcard for all event type names.
notifyFirstSubscription in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>structClass - AcsJEventSubscriptionExprotected void notifySubscriptionRemoved(java.lang.Class<?> structClass)
throws AcsJEventSubscriptionEx
notifySubscriptionRemoved in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>AcsJEventSubscriptionExprotected void notifyNoSubscription()
notifyNoSubscription in class AcsEventSubscriberImplBase<T extends org.omg.CORBA.portable.IDLEntity>protected int addFilter(java.lang.String eventTypeName)
throws AcsJCORBAProblemEx
A constraint evaluates to true when both of the following conditions are true: A member of the constraint's EventTypeSeq matches the message's event type. The constraint expression evaluates to true.
AcsJCORBAProblemExprotected java.lang.String getNotificationFactoryName()
NotifyEventChannelFactory.protected java.lang.String getFilterLanguage()
public final void push_structured_event(StructuredEvent structuredEvent)
throws Disconnected
It is declared final because it is crucial for the functioning of the NC library
and thus cannot be overwritten by a subclass.
If for special purposes a notification of raw event reception is needed,
a subclass can implement push_structured_event_called(StructuredEvent), which gets called from this
method as the first thing it does.
structuredEvent - The structured event sent by a supplier.Disconnected - If this subscriber is disconnected from the NC.
See NC spec 3.3.7.1: "if the invocation of push_structured_event upon a StructuredPushConsumer instance
by a StructuredProxyPushSupplier instance results in the Disconnected exception being raised,
the StructuredProxyPushSupplier will invoke its own disconnect_structured_push_supplier operation,
resulting in the destruction of that StructuredProxyPushSupplier instance."
This serves only as a backup mechanism, since normally we explicitly disconnect the subscriber.org.omg.CosNotifyComm.StructuredPushConsumerOperations#push_structured_event(org.omg.CosNotification.StructuredEvent)protected boolean push_structured_event_called(StructuredEvent structuredEvent)
Usually this method should not be overridden.
structuredEvent - true if normal event processing should continue,
false if NCSubscriber should not process this event.public void disconnect_structured_push_consumer()
org.omg.CORBA.NO_IMPLEMENTorg.omg.CosNotifyComm.StructuredPushConsumerOperations#disconnect_structured_push_consumer()public void offer_change(EventType[] added,
EventType[] removed)
throws InvalidEventType
org.omg.CORBA.NO_IMPLEMENTInvalidEventTypeorg.omg.CosNotifyComm.NotifyPublishOperations#offer_change(org.omg.CosNotification.EventType[],
org.omg.CosNotification.EventType[])public void reconnect(EventChannelFactory ecf)
reconnect in interface ReconnectableParticipantecf - The new EventChannelFactory reference as delivered by the NotifyService's callback.alma.acs.nc.ReconnectableParticipant#reconnect(gov.sandia.NotifyMonitoringExt.EventChannelFactory)public void setAutoreconnect(boolean autoreconnect)
autoreconnect - Whether autoreconneting to the channel should be donepublic boolean setEventReceptionTimeout(int eventReceptionTimeout)
public boolean setConnectionCheckerFreq(int connectionCheckerFreq)