T - The event (base) type. If all events are of the same type then that type should be used;
otherwise a common base type for all events that may be sent on the given "channel" should be used,
such as Object or IDLEntity.public abstract class AcsEventSubscriberImplBase<T> extends java.lang.Object implements AcsEventSubscriber<T>, AcsScxmlActionExecutor<EventSubscriberAction>
We will have to see if it can also be used for DDS-based events, or if the required modifications will be too heavy.
AcsEventSubscriber.Callback<U>, AcsEventSubscriber.GenericCallback| Modifier and Type | Field and Description |
|---|---|
protected java.lang.String |
clientName
A name that identifies the client of this NCSubscriber, to be used
both for logging and also (if applicable) as the supplier proxy name so that
looking at the proxy objects of the NC we can figure out who the clients are.
|
static int |
EVENT_QUEUE_CAPACITY
Event queue should hold at least two events to avoid unnecessary scary logs about slow receivers,
but must be short enough to get receivers to actually implement their own queue and discard mechanism
instead of relying on this ACS queue which may buffer events for a limited time and thus obscure the problem.
|
protected java.lang.Class<T> |
eventType
Runtime access to the type parameter <T>.
|
protected AcsEventSubscriber.GenericCallback |
genericReceiver
Contains a generic receiver to be used by the
#addGenericSubscription() method. |
protected java.util.logging.Logger |
logger
Provides access to the ACS logging system.
|
protected MultipleRepeatGuard |
processTimeLogRepeatGuard
Contains a list of repeat guards for each different type of event.
|
protected java.util.Map<java.lang.Class<? extends T>,AcsEventSubscriber.Callback<? extends T>> |
receivers
Contains a list of receiver functions to be invoked when an event
of a particular type is received.
|
protected ContainerServicesBase |
services |
protected AcsScxmlEngine<EventSubscriberSignal,EventSubscriberAction> |
stateMachine
State machine for the subscriber lifecycle.
|
protected EventSubscriberSignalDispatcher |
stateMachineSignalDispatcher |
| Constructor and Description |
|---|
AcsEventSubscriberImplBase(ContainerServicesBase services,
java.lang.String clientName,
java.lang.Class<T> eventType)
Base class constructor, to be called from subclass ctor.
|
| Modifier and Type | Method and Description |
|---|---|
void |
addGenericSubscription(AcsEventSubscriber.GenericCallback receiver)
Subscribes to all events.
|
<U extends T> |
addSubscription(AcsEventSubscriber.Callback<U> receiver)
Adds a handler that will receive events of a specific type.
|
protected void |
createConnectionAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.createConnectionAction().
|
protected void |
createEnvironmentAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.createEnvironmentAction().
|
protected void |
destroyConnectionAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Handler for "destroyConnection" state machine action.
|
protected void |
destroyEnvironmentAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.destroyEnvironmentAction().
|
void |
disconnect()
Disconnects this subscriber from the Notification Channel, and releases all
the resources associated with it.
|
boolean |
execute(EventSubscriberAction action,
EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Dispatches the action enum to one of the action handler methods.
|
java.lang.String |
getLifecycleState()
Returns the lifecycle state as obtained from an internal state machine.
|
protected abstract double |
getMaxProcessTimeSeconds(java.lang.String eventName)
Gets the configured (or default) max time that a receiver may take to process an event,
regardless of the actual event rate.
|
int |
getNumberOfReceivers()
Use only for unit testing!
|
boolean |
hasGenericReceiver()
Use only for unit testing!
|
boolean |
isDisconnected() |
boolean |
isSuspended()
Returns
true if this subscriber has been suspended. |
protected abstract boolean |
isTraceEventsEnabled() |
protected abstract void |
logEventProcessingTimeExceeded(java.lang.String eventName,
long logOcurrencesNumber)
Logs the error that event processing time was exceeded.
|
protected abstract void |
logEventProcessingTooSlowForEventRate(long numEventsDiscarded,
java.lang.String eventName)
Logs the error that the receiver cannot keep up with the actual event rate,
in spite of the small event buffering done.
|
protected abstract void |
logEventReceiveHandlerException(java.lang.String eventName,
java.lang.String receiverClassName,
java.lang.Throwable thr)
Logs an exception thrown by an event handler (user code).
|
protected abstract void |
logNoEventReceiver(java.lang.String eventName)
Logs or ignores the fact that an event was received for which no receiver could be found.
|
protected abstract void |
logQueueShutdownError(int timeoutMillis,
int remainingEvents)
Logs the error that the local event buffer could not be emptied before shutting down the subscriber.
|
protected abstract void |
notifyFirstSubscription(java.lang.Class<?> structClass) |
protected abstract void |
notifyNoSubscription() |
protected abstract void |
notifySubscriptionRemoved(java.lang.Class<?> structClass) |
protected void |
processEvent(java.lang.Object eventData,
EventDescription eventDesc)
This method should be called from the subclass-specific method that receives the event,
for example
push_structured_event in case of Corba NC,
or preferably via processEventAsync(Object, EventDescription). |
protected void |
processEventAsync(java.lang.Object eventData,
EventDescription eventDesc)
Asynchronously calls
processEvent(Object, EventDescription),
using eventHandlingExecutor. |
void |
removeGenericSubscription()
Removes the generic receiver handler.
|
<U extends T> |
removeSubscription(java.lang.Class<U> structClass)
Removes the subscription for a specified event type or for all events types, so that the handler previously
registered for that event type will no longer receive events.
|
void |
resume()
Used to reenable the Subscriber after a call to the
suspend() method. |
protected void |
resumeAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.resumeAction().
|
void |
startReceivingEvents()
This method must be called to actually start receiving events.
|
void |
suspend()
Used to temporarily halt receiving events of all types.
|
protected void |
suspendAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
Subclass may override, but must call super.suspendAction().
|
protected final java.lang.String clientName
protected final ContainerServicesBase services
protected final java.util.logging.Logger logger
protected final AcsScxmlEngine<EventSubscriberSignal,EventSubscriberAction> stateMachine
protected final EventSubscriberSignalDispatcher stateMachineSignalDispatcher
public static final int EVENT_QUEUE_CAPACITY
eventHandlingExecutor,
Constant Field Valuesprotected AcsEventSubscriber.GenericCallback genericReceiver
#addGenericSubscription() method.protected final java.util.Map<java.lang.Class<? extends T>,AcsEventSubscriber.Callback<? extends T>> receivers
key = the event type (Java class derived from IDL struct).
value = the matching event handler.
protected final MultipleRepeatGuard processTimeLogRepeatGuard
protected final java.lang.Class<T> eventType
public AcsEventSubscriberImplBase(ContainerServicesBase services, java.lang.String clientName, java.lang.Class<T> eventType) throws AcsJException
Normally an ACS class such as container services will act as the factory for event subscriber objects, but for exceptional cases it is also possible to create one stand-alone, as long as the required parameters can be provided.
services - To get ACS logger, access to the CDB, etc.clientName - A name that identifies the client of this NCSubscriber.
TODO: Check if we still need this name to be specified separately from services#getName().AcsJException - Thrown on any really bad error conditions encountered.protected abstract boolean isTraceEventsEnabled()
true if events should be logged when they are received.public boolean execute(EventSubscriberAction action, EventDispatcher evtDispatcher, ErrorReporter errRep, SCInstance scInstance, java.util.Collection<TriggerEvent> derivedEvents) throws AcsJStateMachineActionEx
The overhead of implementing this method is the price we pay for avoiding reflection and allowing flexible action implementation in the SM design.
execute in interface AcsScxmlActionExecutor<EventSubscriberAction>action was recognized.AcsJStateMachineActionExalma.acs.nc.sm.generic.AcsScxmlActionExecutor#execute(java.lang.Enum, org.apache.commons.scxml.EventDispatcher, org.apache.commons.scxml.ErrorReporter, org.apache.commons.scxml.SCInstance, java.util.Collection)protected void createEnvironmentAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsJStateMachineActionExprotected void destroyEnvironmentAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsJStateMachineActionExprotected void createConnectionAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsJStateMachineActionExprotected void destroyConnectionAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
Shuts down the event queue. Queued events may still be processed by the receivers afterwards, but here we wait for up to 500 ms to log it if it is the case.
Further events delivered to processEventAsync(Object, EventDescription)
will cause an exception there.
Subclass may override, but must call super.destroyConnectionAction().
AcsJStateMachineActionExprotected void suspendAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsJStateMachineActionExprotected void resumeAction(EventDispatcher evtDispatcher,
ErrorReporter errRep,
SCInstance scInstance,
java.util.Collection<TriggerEvent> derivedEvents)
throws AcsJStateMachineActionEx
AcsJStateMachineActionExprotected abstract double getMaxProcessTimeSeconds(java.lang.String eventName)
protected abstract void logEventReceiveHandlerException(java.lang.String eventName,
java.lang.String receiverClassName,
java.lang.Throwable thr)
protected abstract void logEventProcessingTimeExceeded(java.lang.String eventName,
long logOcurrencesNumber)
protected abstract void logEventProcessingTooSlowForEventRate(long numEventsDiscarded,
java.lang.String eventName)
numEventsDiscarded - The number of events that have actually been discarded since the last log.
Will often be 0 when we just warn about the queue filling up.eventName - protected abstract void logNoEventReceiver(java.lang.String eventName)
The subclass must know whether such a condition is expected or not, e.g. because event filtering is set up outside of the subscriber and only subscribed event types are expected to arrive.
protected abstract void logQueueShutdownError(int timeoutMillis,
int remainingEvents)
protected void processEventAsync(java.lang.Object eventData,
EventDescription eventDesc)
processEvent(Object, EventDescription),
using eventHandlingExecutor.
This method should be called from the subclass-specific method that receives the event,
for example push_structured_event in case of Corba NC.
This method is thread-safe.
eventData - (defined as Object instead of T to include data for generic subscription).eventDesc - event meta dataprotected void processEvent(java.lang.Object eventData,
EventDescription eventDesc)
push_structured_event in case of Corba NC,
or preferably via processEventAsync(Object, EventDescription).
No exception is allowed to be thrown by this method, even if the receiver implementation throws a RuntimeExecption
eventData - (defined as Object instead of T to include data for generic subscription).eventDesc - public java.lang.String getLifecycleState()
AcsEventSubscriberThis state can be used only for debug output. The state names may change over time and should not be used to control execution.
getLifecycleState in interface AcsEventSubscriber<T>public boolean hasGenericReceiver()
public int getNumberOfReceivers()
public final void addGenericSubscription(AcsEventSubscriber.GenericCallback receiver) throws AcsJEventSubscriptionEx
If in addition to this generic subscription we also have specific subscriptions via
#addSubscription(Class, Callback),
then those more specific subscriptions will take precedence in receiving an event.
Notice though that any server-side filters previously created for the event type specific subscriptions get deleted when calling this method, so that even after removing a generic subscription the network performance gain of server-side filtering is lost.
addGenericSubscription in interface AcsEventSubscriber<T>receiver - The callback to use when receiving eventsAcsJEventSubscriptionEx - If there is a problem and the generic receiver cannot
be addedpublic final void removeGenericSubscription()
throws AcsJEventSubscriptionEx
removeGenericSubscription in interface AcsEventSubscriber<T>AcsJCORBAProblemExAcsJEventSubscriptionEx - If there is any problem while unsubscribing the generic receiverpublic final <U extends T> void addSubscription(AcsEventSubscriber.Callback<U> receiver) throws AcsJEventSubscriptionEx
AcsEventSubscriberAcsEventSubscriber.Callback.getEventType()
on the receiver parameter.
Note that the same event type can only be subscribed to with one handler, which means that another handler added for the same type will replace the previous handler.
The event type must be that of actual events, not of base classes.
For example, the AcsEventSubscriber could be parameterized with base type IDLEntity
and we could then add two subscriptions for events defined as IDL structs,
say AntennaStatus and TemperatureData.
If you want to subscribe to events without knowing their exact type,
use AcsEventSubscriber.addGenericSubscription(GenericCallback) instead.
addSubscription in interface AcsEventSubscriber<T>receiver - The callback to use when receiving events for the specified type.AcsJEventSubscriptionEx - If there is a problem and the receiver cannot be addedpublic final <U extends T> void removeSubscription(java.lang.Class<U> structClass) throws AcsJEventSubscriptionEx
AcsEventSubscriberremoveSubscription in interface AcsEventSubscriber<T>structClass - the event type to be unsubscribed. If null, then all subscriptions but the generic
subscription are removed.AcsJEventSubscriptionEx - if the specified event type has not been previously subscribed or if the removal fails with a
technical problem.public final void startReceivingEvents()
throws AcsJIllegalStateEventEx,
AcsJCouldntPerformActionEx
AcsEventSubscriber
No further invocations should be attempted on this method after one
has been already successful. Otherwise, an AcsJIllegalStateEventEx
will be thrown.
If this method is not called, no event will ever be received
startReceivingEvents in interface AcsEventSubscriber<T>AcsJIllegalStateEventEx - If the user calls this method on an object that
is already receiving eventsAcsJCouldntPerformActionEx - If any error happens while trying
to start receiving eventspublic final void disconnect()
throws AcsJIllegalStateEventEx,
AcsJCouldntPerformActionEx
AcsEventSubscriber
Calling this method over a subscriber object that has been already disconnected
will throw an AcsJIllegalStateEventEx.
disconnect in interface AcsEventSubscriber<T>AcsJIllegalStateEventEx - If this method is called on an AcsEventSubscriber object
that has been already disconnectedAcsJCouldntPerformActionExAcsEventSubscriber.disconnect()public final void suspend()
throws AcsJIllegalStateEventEx,
AcsJCouldntPerformActionEx
AcsEventSubscriber
If the Subscriber has been connected already (method AcsEventSubscriber.startReceivingEvents(),
then after calling this method, incoming events will be buffered instead of being discarded;
unexpired events will be received later, after a call to AcsEventSubscriber.resume().
// * This call has no effect if the Subscriber is not connected, or if it is // * connected but already suspended.
suspend in interface AcsEventSubscriber<T>AcsJIllegalStateEventEx - if the subscriber is not connected to an NC.AcsJCouldntPerformActionExpublic final void resume()
throws AcsJIllegalStateEventEx,
AcsJCouldntPerformActionEx
AcsEventSubscribersuspend() method. Queued events will be received after
this call, see AcsEventSubscriber.suspend().
This call has no effect if the Subscriber is not connected, or if it is
connected and already processing events.resume in interface AcsEventSubscriber<T>AcsJIllegalStateEventExAcsJCouldntPerformActionExprotected abstract void notifyFirstSubscription(java.lang.Class<?> structClass)
throws AcsJEventSubscriptionEx
structClass - Can be null in case of generic subscription.AcsJEventSubscriptionExprotected abstract void notifySubscriptionRemoved(java.lang.Class<?> structClass)
throws AcsJEventSubscriptionEx
structClass - AcsJEventSubscriptionExprotected abstract void notifyNoSubscription()
public boolean isSuspended()
AcsEventSubscribertrue if this subscriber has been suspended.isSuspended in interface AcsEventSubscriber<T>public final boolean isDisconnected()