ddt 1.1.0
|
#include <ddtConnectionManager.hpp>
Protected Attributes | |
const int32_t | SHM_TIMEOUT_DEFAULT = 10 |
const int32_t | WAITING_TIME_DEFAULT = 1000 |
const int32_t | REPLY_TIME_DEFAULT = 6 |
const int32_t | HEARTBEAT_INTERVAL_DEFAULT = 1 |
const int32_t | HEARTBEAT_TIMEOUT_DEFAULT = 10 |
int32_t | shm_timeout |
int32_t | waiting_time |
int32_t | reply_time |
int32_t | heartbeat_interval |
int32_t | heartbeat_timeout |
std::set< std::string > | registered_publishers |
std::map< std::string, DdtStatistics > | statistics_map |
DdtConnectionManager (DdtLogger *ddt_logger, const std::string uri_string, const std::string config) | |
DdtConnectionManager (DdtMemoryManager *const mmgr, DdtLogger *ddt_logger, const std::string uri_string, const std::string config) | |
~DdtConnectionManager () override | |
int32_t | RegisterPublisher (const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override |
int32_t | UnregisterPublisher (const std::string &data_stream_identifier) override |
int32_t | UnregisterPublishers () |
void | PublishData (const std::string &data_stream_identifier) override |
int32_t | RegisterSubscriber (const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override |
int32_t | RegisterRemoteSubscriber (const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override |
int32_t | UnregisterSubscriber (const std::string &data_stream_identifier, const std::string &subscriber_uuid) override |
int32_t | UnregisterSubscribers () |
int32_t | get_max_data_sample_size (const std::string &data_stream_identifier) override |
int32_t | get_number_of_samples (const std::string &data_stream_identifier) override |
std::string | get_publishing_uri (const std::string &data_stream_identifier) override |
int32_t | get_notification_port (const std::string &data_stream_identifier) override |
std::vector< std::string > | get_statistics (const std::string &data_stream_identifier) override |
int32_t | get_heartbeat_interval () override |
int32_t | get_heartbeat_timeout () override |
bool | get_compute_checksum (const std::string &data_stream_identifier) override |
std::string | get_shm_id (const std::string &data_stream_identifier) override |
std::string | get_broker_uri () override |
void | UpdateHeartbeat (const std::string &identifier) override |
bool | CheckPubRegistrationPermitted (const std::string &data_stream_identifier) override |
bool | CheckPublisherExists (const std::string &data_stream_identifier) override |
bool | CheckRemotePublisherExists (const std::string &remote_broker_uri, const std::string &data_stream_identifier) override |
void | UpdateStatistics (const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override |
int32_t | GetMaxPossibleBufferSize (int32_t max_data_sample_size) override |
std::vector< std::string > | GetRegisteredStreams () override |
std::vector< std::string > | GetConnectedBrokers () override |
void | LoadDefaults () |
std::string | GetConfigPath () const |
void | ReadIni () |
std::string | CreateSubscriptionUri (const std::string publishing_uri, const std::string remote_broker_uri) const |
bool | CheckStreamIdInUse (const std::string &data_stream_identifier) |
void | CreateStatistics (const std::string &data_stream_identifier, const int32_t number_of_samples) |
void | ResetStatistics (const std::string &data_stream_identifier) |
This class manages the connection handling between Data Brokers and Publisher / Subscriber applications.
|
explicit |
Constructor
ddt_logger | A DDT logger object (no transfer of ownership). |
uri_string | URI of the broker. |
config | Broker configuration file. |
|
explicit |
Constructor
mmgr | A DDT memory manager object (no transfer of ownership). |
ddt_logger | A DDT logger object (no transfer of ownership). |
uri_string | URI of the broker. |
config | Broker configuration file. |
|
override |
Destructor
|
override |
Checks if a publisher with the specified data stream identifier exists.
data_stream_identifier | The data stream identifier. |
|
override |
Checks if a publisher is permitted for registration. A publisher is rejected if its stream identifier is already in use, except heartbeats are missing. Then a publisher is allowed to reregister.
data_stream_identifier | The data stream identifier. |
|
override |
Check if remote publisher for specified stream id exists.
remote_broker_uri | URI of the remote broker. |
data_stream_identifier | The data stream identifier. |
|
protected |
This function checks if the stream identifier is already in use. This will prevent a publisher using the same stream identifier as a subscriber that has subscribed to a remote publisher.
data_stream_identifier | The data stream identifier. |
|
protected |
Inserts a DdtStatistics object into a map. This is done for each data stream.
data_stream_identifier | The data stream identifier. |
number_of_samples | The queue capacity. |
Create a map for the statistics. The statistics are bound to the lifetime of the shared memory. In case a publisher is restarted within the shm timeout while subscribers are still alive, the statistics are not reset.
|
protected |
Creates the subscription uri.
publishing_uri | Publishing URI. |
remote_broker_uri | URI of the remote broker. |
Creates the subscription URI. Therefore, it extracts the port and the path element from the publishing uri and takes the ip address from the remote broker uri. Example: publishing_uri: zpb.ps://222.22.222.222:5100/ds1 remote_broker_uri: zpb.rr://111.11.111.111:5001/broker/Broker1 resulting subscription_uri: zpb.ps://111.11.111.111:5100/ds1
|
override |
Returns the URI of the broker including the host name.
|
override |
Returns compute_checksum from the memory accessor.
data_stream_identifier | The data stream identifier. |
|
override |
Returns heartbeat_interval.
|
override |
Returns heartbeat_timeout.
|
override |
Returns max_data_sample_size.
data_stream_identifier | The data stream identifier. |
|
override |
Returns notification_port.
data_stream_identifier | The data stream identifier. |
|
override |
Returns number_of_samples.
data_stream_identifier | The data stream identifier. |
|
override |
Returns publishing_uri.
data_stream_identifier | The data stream identifier. |
|
override |
Returns the shared memory identifier.
data_stream_identifier | The data stream identifier. |
|
override |
Returns the statistics.
data_stream_identifier | The data stream identifier. |
|
protected |
This function reads the environment variable DDT_TRANSFERCONFIG_PATH and returns the path of the configuration file.
check if DDT_TRANSFERCONFIG_PATH is set return empty string if not set
|
override |
Returns the URIs of all connected remote brokers.
|
override |
Requests the max possible buffer size (computed by the memory manager).
max_data_sample_size | The maximum data sample size. |
|
override |
Returns all registered data streams
|
protected |
Loads default values for configuration parameters.
|
override |
This function
data_stream_identifier | The data stream identifier. |
if there is at least one remote subscriber, read the packet from shared memory and publish it over the network
|
protected |
Reads the databroker configuration file.
An exception is thrown if the config file does not exist. In that case the default values are used instead.
make sure shm_timeout is at least SHM_TIMEOUT_MIN set to default value otherwise
make sure waiting_time is at least WAITING_TIME_MIN set to default value otherwise
make sure reply_time is at least REPLY_TIME_MIN set to default value otherwise
make sure heartbeat_interval is at least HEARTBEAT_INTERVAL_MIN set to default value otherwise
make sure heartbeat_timeout is at least HEARTBEAT_TIMEOUT_MIN set to default value otherwise
make sure heartbeat_timeout is greater than reply_time set to default value otherwise
|
override |
This function is called from Publisher applications by the DdtDataPublishers. For each Publisher application a DdtDataProducer object is created which are stored in a producer map. This also triggers the registration process in the DdtMemoryManager.
data_stream_identifier | The data stream identifier. |
latency | A MAL QoS parameter. Shall be the maximum time a sample may remain in-transit between the publisher and subscriber in [ms]. |
deadline | A MAL QoS parameter. Shall be the maximum age of a sample in [s]. |
max_data_sample_size | The maximum size of the data samples. |
number_of_samples | Number of samples stored in the SHM ring buffer. |
compute_checksum | Specifies if checksum computation is switched on. |
publishing_uri | Publishing URI. |
Reuse the publishing URI if the publisher was restarted.
It takes some time to establish communication mechanism on startup of the publishers and subscribers (see MAL API description). Wait after publisher has been instantiated. This does NOT guarantee that the first n message will not be lost.
|
override |
This function is called from remote brokers to register a remote subscriber. This triggers the registration process in the DdtMemoryManager and creates a new DdtDataConsumer object.
remote_broker | URI of the remote broker. |
subscriber_uuid | UUID of the subscriber. |
data_stream_identifier | The data stream identifier. |
latency | A MAL QoS parameter. Shall be the maximum time a sample may remain in-transit between the publisher and subscriber in [ms]. |
deadline | A MAL QoS parameter. Shall be the maximum age of a sample in [s]. |
|
override |
Registers Subscriber applications. Distinguishes between local and remote subscribers. if local:
subscriber_uuid | UUID of the subscriber. |
data_stream_identifier | The data stream identifier. |
remote_broker_uri | URI of the remote broker. |
latency | A MAL QoS parameter. Shall be the maximum time a sample may remain in-transit between the publisher and subscriber in [ms]. |
deadline | A MAL QoS parameter. Shall be the maximum age of a sample in [s]. |
Check if the remote broker uri was specified register as local subscriber if this is not the case
|
protected |
Resets / deletes the statistics in case the shared memory is deleted.
data_stream_identifier | The data stream identifier. |
|
override |
This function stops the memory reader, unregisters the Publisher application from the DdtMemoryManager and deletes the DdtDataProducer.
data_stream_identifier | The data stream identifier. |
int32_t DdtConnectionManager::UnregisterPublishers | ( | ) |
Unregisters all publishers.
go through the list of all publishers and unregister each of them
|
override |
Unregisters subscribers from their brokers. To unregister from remote brokers a MAL client is created. If it is the last subscriber of a data stream:
data_stream_identifier | The data stream identifier. |
subscriber_uuid | UUID of the subscriber. |
Set publishing uri to "" if there are no remote subscribers.
int32_t DdtConnectionManager::UnregisterSubscribers | ( | ) |
Unregisters all subscribers.
go through the list of all subscribers and unregister each of them
|
override |
Updates the timestamp in the client map.
identifier | The data stream identifier. |
|
override |
Updates the statistic counters
data_stream_identifier | The data stream identifier. |
datavec_size | The size of the image data. |
source_timestamp | The source timestamp. |
|
protected |
Interval for the heartbeat in [s].
|
protected |
Interval in [s] for the heartbeat which is used to monitor the status of the connection between Data Broker and Subscriber/Publisher.
|
protected |
Timeout for the heartbeat in [s].
|
protected |
Timeout in [s] for the heartbeat. If no heartbeat signal was received for the configured time the MAL client (Publisher or Subscriber) is unregistered.
|
protected |
A set containing all registered publishers.
key | The data stream identifier. |
|
protected |
Reply time for MAL clients in [s].
|
protected |
Maximum time in [s] for establishing a connection to a MAL server. If the time is exceeded the connection attempt fails.
|
protected |
Timeout after which a shared memory is deleted in [s].
|
protected |
Specifies the time in [s] after which an allocated SHM is deleted. This timeout is used when a publisher was unregistered but subscribers are still registered.
|
protected |
A map containing statistics for each data stream.
key | The data stream identifier. |
value | A DDT statistics object. |
|
protected |
Waiting time for MAL publishers to establish communication in [ms].
|
protected |
Time in [ms] that the MAL publishers may use for establishing the communication. If the time is exceeded the connection attempt fails.