ddt 1.2.1
Loading...
Searching...
No Matches
Protected Attributes | List of all members
ddt::DdtConnectionManager Class Reference

#include <ddtConnectionManager.hpp>

Inheritance diagram for ddt::DdtConnectionManager:
DdtConnectionManagerFake

Protected Attributes

const int32_t SHM_TIMEOUT_DEFAULT = 10
 
const int32_t WAITING_TIME_DEFAULT = 1000
 
const int32_t REPLY_TIME_DEFAULT = 6
 
const int32_t HEARTBEAT_INTERVAL_DEFAULT = 1
 
const int32_t HEARTBEAT_TIMEOUT_DEFAULT = 10
 
int32_t shm_timeout
 
int32_t waiting_time
 
int32_t reply_time
 
int32_t heartbeat_interval
 
int32_t heartbeat_timeout
 
std::set< std::string > registered_publishers
 
std::map< std::string, DdtStatistics > statistics_map
 
 DdtConnectionManager (DdtLogger *ddt_logger, const std::string uri_string, const std::string config)
 
 DdtConnectionManager (DdtMemoryManager *const mmgr, DdtLogger *ddt_logger, const std::string uri_string, const std::string config)
 
 ~DdtConnectionManager () override
 
int32_t RegisterPublisher (const std::string &data_stream_identifier, const int32_t latency, int32_t deadline, const int32_t max_data_sample_size, const int32_t number_of_samples, const bool compute_checksum, const std::string &publishing_uri) override
 
int32_t UnregisterPublisher (const std::string &data_stream_identifier) override
 
int32_t UnregisterPublishers ()
 
void PublishData (const std::string &data_stream_identifier) override
 
int32_t RegisterSubscriber (const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, const int32_t latency, const int32_t deadline) override
 
int32_t RegisterRemoteSubscriber (const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
 
int32_t UnregisterSubscriber (const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
 
int32_t UnregisterSubscribers ()
 
int32_t get_max_data_sample_size (const std::string &data_stream_identifier) override
 
int32_t get_number_of_samples (const std::string &data_stream_identifier) override
 
std::string get_publishing_uri (const std::string &data_stream_identifier) override
 
int32_t get_notification_port (const std::string &data_stream_identifier) override
 
std::vector< std::string > get_statistics (const std::string &data_stream_identifier) override
 
int32_t get_heartbeat_interval () override
 
int32_t get_heartbeat_timeout () override
 
bool get_compute_checksum (const std::string &data_stream_identifier) override
 
std::string get_shm_id (const std::string &data_stream_identifier) override
 
std::string get_shm_full_path (const std::string &data_stream_identifier) override
 
std::string get_broker_uri () override
 
void UpdateHeartbeat (const std::string &identifier) override
 
bool CheckPubRegistrationPermitted (const std::string &data_stream_identifier) override
 
bool CheckPublisherExists (const std::string &data_stream_identifier) override
 
bool CheckRemotePublisherExists (const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
 
void UpdateStatistics (const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
 
int32_t GetMaxPossibleBufferSize (int32_t max_data_sample_size) override
 
std::vector< std::string > GetRegisteredStreams () override
 
std::vector< std::string > GetConnectedBrokers () override
 
void LoadDefaults ()
 
std::string GetConfigPath () const
 
void ReadIni ()
 
std::string CreateSubscriptionUri (const std::string publishing_uri, const std::string remote_broker_uri) const
 
bool CheckStreamIdInUse (const std::string &data_stream_identifier)
 
void CreateStatistics (const std::string &data_stream_identifier, const int32_t number_of_samples)
 
void ResetStatistics (const std::string &data_stream_identifier)
 

Detailed Description

This class manages the connection handling between Data Brokers and Publisher / Subscriber applications.

Constructor & Destructor Documentation

◆ DdtConnectionManager() [1/2]

DdtConnectionManager::DdtConnectionManager ( DdtLogger * ddt_logger,
const std::string uri_string,
const std::string config )
explicit

Constructor

Parameters
ddt_logger: A DDT logger object (no transfer of ownership).
uri_string: URI of the broker.
config: Broker configuration file.

◆ DdtConnectionManager() [2/2]

DdtConnectionManager::DdtConnectionManager ( DdtMemoryManager *const mmgr,
DdtLogger * ddt_logger,
const std::string uri_string,
const std::string config )
explicit

Constructor

Parameters
mmgr: A DDT memory manager object (no transfer of ownership).
ddt_logger: A DDT logger object (no transfer of ownership).
uri_string: URI of the broker.
config: Broker configuration file.

◆ ~DdtConnectionManager()

DdtConnectionManager::~DdtConnectionManager ( )
override

Destructor

Member Function Documentation

◆ CheckPublisherExists()

bool DdtConnectionManager::CheckPublisherExists ( const std::string & data_stream_identifier)
override

Checks if a publisher with the specified data stream identifier exists.

Parameters
data_stream_identifier: The data stream identifier.
Returns
True if a publisher for the specified data stream exists, false otherwise.

◆ CheckPubRegistrationPermitted()

bool DdtConnectionManager::CheckPubRegistrationPermitted ( const std::string & data_stream_identifier)
override

Checks if a publisher is permitted to register. A publisher is rejected if its stream identifier is already in use, unless heartbeats are missing; in that case the publisher is allowed to re-register.

Parameters
data_stream_identifier: The data stream identifier.
Returns
True if registration is permitted, false otherwise.

◆ CheckRemotePublisherExists()

bool DdtConnectionManager::CheckRemotePublisherExists ( const std::string & remote_broker_uri,
const std::string & data_stream_identifier )
override

Check if remote publisher for specified stream id exists.

Parameters
remote_broker_uri: URI of the remote broker.
data_stream_identifier: The data stream identifier.
Returns
True if a remote publisher for the specified data stream exists, false otherwise.

◆ CheckStreamIdInUse()

bool DdtConnectionManager::CheckStreamIdInUse ( const std::string & data_stream_identifier)
protected

This function checks if the stream identifier is already in use. This will prevent a publisher using the same stream identifier as a subscriber that has subscribed to a remote publisher.

Parameters
data_stream_identifier: The data stream identifier.
Returns
True if stream identifier is in use, false if not.

◆ CreateStatistics()

void DdtConnectionManager::CreateStatistics ( const std::string & data_stream_identifier,
const int32_t number_of_samples )
protected

Inserts a DdtStatistics object into a map. This is done for each data stream.

Parameters
data_stream_identifier: The data stream identifier.
number_of_samples: The queue capacity.

Create a map for the statistics. The statistics are bound to the lifetime of the shared memory. In case a publisher is restarted within the shm timeout while subscribers are still alive, the statistics are not reset.

◆ CreateSubscriptionUri()

std::string DdtConnectionManager::CreateSubscriptionUri ( const std::string publishing_uri,
const std::string remote_broker_uri ) const
protected

Creates the subscription uri.

Parameters
publishing_uri: Publishing URI.
remote_broker_uri: URI of the remote broker.
Returns
A URI for subscriptions.

Creates the subscription URI. To do so, it extracts the port and the path element from the publishing URI and takes the IP address from the remote broker URI. Example:
publishing_uri: zpb.ps://222.22.222.222:5100/ds1
remote_broker_uri: zpb.rr://111.11.111.111:5001/broker/Broker1
resulting subscription_uri: zpb.ps://111.11.111.111:5100/ds1

◆ get_broker_uri()

std::string DdtConnectionManager::get_broker_uri ( )
override

Returns the URI of the broker including the host name.

Returns
broker_uri.

◆ get_compute_checksum()

bool DdtConnectionManager::get_compute_checksum ( const std::string & data_stream_identifier)
override

Returns compute_checksum from the memory accessor.

Parameters
data_stream_identifier: The data stream identifier.
Returns
True if the checksum computation is switched on, false otherwise.

◆ get_heartbeat_interval()

int32_t DdtConnectionManager::get_heartbeat_interval ( )
override

Returns heartbeat_interval.

Returns
The heartbeat interval in [s].

◆ get_heartbeat_timeout()

int32_t DdtConnectionManager::get_heartbeat_timeout ( )
override

Returns heartbeat_timeout.

Returns
The heartbeat timeout in [s].

◆ get_max_data_sample_size()

int32_t DdtConnectionManager::get_max_data_sample_size ( const std::string & data_stream_identifier)
override

Returns max_data_sample_size.

Parameters
data_stream_identifier: The data stream identifier.
Returns
Maximum data sample size.

◆ get_notification_port()

int32_t DdtConnectionManager::get_notification_port ( const std::string & data_stream_identifier)
override

Returns notification_port.

Parameters
data_stream_identifier: The data stream identifier.
Returns
Port used for notifications.

◆ get_number_of_samples()

int32_t DdtConnectionManager::get_number_of_samples ( const std::string & data_stream_identifier)
override

Returns number_of_samples.

Parameters
data_stream_identifier: The data stream identifier.
Returns
Number of samples stored in the SHM ring buffer.

◆ get_publishing_uri()

std::string DdtConnectionManager::get_publishing_uri ( const std::string & data_stream_identifier)
override

Returns publishing_uri.

Parameters
data_stream_identifier: The data stream identifier.
Returns
Publishing URI.

◆ get_shm_full_path()

std::string DdtConnectionManager::get_shm_full_path ( const std::string & data_stream_identifier)
override

Returns a full path to the shm file.

Parameters
data_stream_identifier: The data stream identifier.
Returns
The full path to the shm file.

◆ get_shm_id()

std::string DdtConnectionManager::get_shm_id ( const std::string & data_stream_identifier)
override

Returns the shared memory identifier.

Parameters
data_stream_identifier: The data stream identifier.
Returns
SHM identifier.

◆ get_statistics()

std::vector< std::string > DdtConnectionManager::get_statistics ( const std::string & data_stream_identifier)
override

Returns the statistics.

Parameters
data_stream_identifier: The data stream identifier.
Returns
A vector containing the statistics.

◆ GetConfigPath()

std::string DdtConnectionManager::GetConfigPath ( ) const
protected

This function reads the environment variable DDT_TRANSFERCONFIG_PATH and returns the path of the configuration file.

Returns
A string containing the path to the configuration file.

Checks if DDT_TRANSFERCONFIG_PATH is set; returns an empty string if it is not set.

◆ GetConnectedBrokers()

std::vector< std::string > DdtConnectionManager::GetConnectedBrokers ( )
override

Returns the URIs of all connected remote brokers.

Returns
A vector containing the URIs of all connected remote brokers.

◆ GetMaxPossibleBufferSize()

int32_t DdtConnectionManager::GetMaxPossibleBufferSize ( int32_t max_data_sample_size)
override

Requests the max possible buffer size (computed by the memory manager).

Parameters
max_data_sample_size: The maximum data sample size.
Returns
The max possible buffer size.

◆ GetRegisteredStreams()

std::vector< std::string > DdtConnectionManager::GetRegisteredStreams ( )
override

Returns all registered data streams

Returns
A vector containing all data stream identifiers.

◆ LoadDefaults()

void DdtConnectionManager::LoadDefaults ( )
protected

Loads default values for configuration parameters.

◆ PublishData()

void DdtConnectionManager::PublishData ( const std::string & data_stream_identifier)
override

This function

  1. notifies the memory reader that new data was written into shared memory by a publisher
  2. fetches the latest data packet using the memory accessor
  3. publishes the data packet over network using the DdtDataProducer object
  4. notifies all local subscribers that new data is available
    Parameters
    data_stream_identifier: The data stream identifier.

If there is at least one remote subscriber, the packet is read from shared memory and published over the network.

◆ ReadIni()

void DdtConnectionManager::ReadIni ( )
protected

Reads the databroker configuration file.

An exception is thrown if the config file does not exist. In that case the default values are used instead.

Make sure shm_timeout is at least SHM_TIMEOUT_MIN; set it to the default value otherwise.

Make sure waiting_time is at least WAITING_TIME_MIN; set it to the default value otherwise.

Make sure reply_time is at least REPLY_TIME_MIN; set it to the default value otherwise.

Make sure heartbeat_interval is at least HEARTBEAT_INTERVAL_MIN; set it to the default value otherwise.

Make sure heartbeat_timeout is at least HEARTBEAT_TIMEOUT_MIN; set it to the default value otherwise.

Make sure heartbeat_timeout is greater than reply_time; set it to the default value otherwise.

◆ RegisterPublisher()

int32_t DdtConnectionManager::RegisterPublisher ( const std::string & data_stream_identifier,
const int32_t latency,
int32_t deadline,
const int32_t max_data_sample_size,
const int32_t number_of_samples,
const bool compute_checksum,
const std::string & publishing_uri )
override

This function is called from Publisher applications by the DdtDataPublishers. For each Publisher application a DdtDataProducer object is created and stored in a producer map. This also triggers the registration process in the DdtMemoryManager.

Parameters
data_stream_identifier: The data stream identifier.
latency: A MAL QoS parameter. Shall be the maximum time a sample may remain in-transit between the publisher and subscriber in [ms].
deadline: A MAL QoS parameter. Shall be the maximum age of a sample in [s].
max_data_sample_size: The maximum size of the data samples.
number_of_samples: Number of samples stored in the SHM ring buffer.
compute_checksum: Specifies if checksum computation is switched on.
publishing_uri: Publishing URI.
Returns
1 if registration was successful, -1 in case of a general error, -2 in case of a memory error.

Reuse the publishing URI if the publisher was restarted.

It takes some time to establish the communication mechanism on startup of the publishers and subscribers (see MAL API description). Wait after the publisher has been instantiated. This does NOT guarantee that the first n messages will not be lost.

◆ RegisterRemoteSubscriber()

int32_t DdtConnectionManager::RegisterRemoteSubscriber ( const std::string & remote_broker,
const std::string & subscriber_uuid,
const std::string & data_stream_identifier,
const int32_t latency,
const int32_t deadline )
override

This function is called from remote brokers to register a remote subscriber. This triggers the registration process in the DdtMemoryManager and creates a new DdtDataConsumer object.

Parameters
remote_broker: URI of the remote broker.
subscriber_uuid: UUID of the subscriber.
data_stream_identifier: The data stream identifier.
latency: A MAL QoS parameter. Shall be the maximum time a sample may remain in-transit between the publisher and subscriber in [ms].
deadline: A MAL QoS parameter. Shall be the maximum age of a sample in [s].
Returns
1 if registration was successful.

◆ RegisterSubscriber()

int32_t DdtConnectionManager::RegisterSubscriber ( const std::string & subscriber_uuid,
const std::string & data_stream_identifier,
const std::string & remote_broker_uri,
const int32_t latency,
const int32_t deadline )
override

Registers Subscriber applications. Distinguishes between local and remote subscribers.

If local:

  • register the subscriber at the DdtMemoryManager
  • create a DdtDataConsumer object and store it in a consumer map

If remote:

  • create a MAL client and ask remote broker for the required buffer size
  • register the subscriber at the DdtMemoryManager
  • register the subscriber at the remote broker
  • create a DdtDataConsumer object and store it in a consumer map
  • start MAL subscription on the DdtDataConsumer
    Parameters
    subscriber_uuid: UUID of the subscriber.
    data_stream_identifier: The data stream identifier.
    remote_broker_uri: URI of the remote broker.
    latency: A MAL QoS parameter. Shall be the maximum time a sample may remain in-transit between the publisher and subscriber in [ms].
    deadline: A MAL QoS parameter. Shall be the maximum age of a sample in [s].
    Returns
    1 if registration was successful, -1 otherwise.

Check if the remote broker URI was specified; register as a local subscriber if this is not the case.

◆ ResetStatistics()

void DdtConnectionManager::ResetStatistics ( const std::string & data_stream_identifier)
protected

Resets / deletes the statistics in case the shared memory is deleted.

Parameters
data_stream_identifier: The data stream identifier.

◆ UnregisterPublisher()

int32_t DdtConnectionManager::UnregisterPublisher ( const std::string & data_stream_identifier)
override

This function stops the memory reader, unregisters the Publisher application from the DdtMemoryManager and deletes the DdtDataProducer.

Parameters
data_stream_identifier: The data stream identifier.
Returns
1 if unregistration was successful.

◆ UnregisterPublishers()

int32_t DdtConnectionManager::UnregisterPublishers ( )

Unregisters all publishers.

Returns
1 if unregistration was successful.

Goes through the list of all publishers and unregisters each of them.

◆ UnregisterSubscriber()

int32_t DdtConnectionManager::UnregisterSubscriber ( const std::string & data_stream_identifier,
const std::string & subscriber_uuid )
override

Unregisters subscribers from their brokers. To unregister from remote brokers a MAL client is created. If it is the last subscriber of a data stream:

  • stops MAL subscription on the DdtDataConsumer
  • unregisters the Subscriber application from the DdtMemoryManager
  • deletes the DdtDataConsumer object
    Parameters
    data_stream_identifier: The data stream identifier.
    subscriber_uuid: UUID of the subscriber.
    Returns
    1 if unregistration was successful.

Set publishing uri to "" if there are no remote subscribers.

◆ UnregisterSubscribers()

int32_t DdtConnectionManager::UnregisterSubscribers ( )

Unregisters all subscribers.

Returns
1 if unregistration was successful.

Goes through the list of all subscribers and unregisters each of them.

◆ UpdateHeartbeat()

void DdtConnectionManager::UpdateHeartbeat ( const std::string & identifier)
override

Updates the timestamp in the client map.

Parameters
identifier: The data stream identifier.

◆ UpdateStatistics()

void DdtConnectionManager::UpdateStatistics ( const std::string & data_stream_identifier,
const int32_t datavec_size,
const uint64_t source_timestamp )
override

Updates the statistic counters

Parameters
data_stream_identifier: The data stream identifier.
datavec_size: The size of the image data.
source_timestamp: The source timestamp.

Member Data Documentation

◆ heartbeat_interval

int32_t ddt::DdtConnectionManager::heartbeat_interval
protected

Interval for the heartbeat in [s].

◆ HEARTBEAT_INTERVAL_DEFAULT

const int32_t ddt::DdtConnectionManager::HEARTBEAT_INTERVAL_DEFAULT = 1
protected

Interval in [s] for the heartbeat which is used to monitor the status of the connection between Data Broker and Subscriber/Publisher.

◆ heartbeat_timeout

int32_t ddt::DdtConnectionManager::heartbeat_timeout
protected

Timeout for the heartbeat in [s].

◆ HEARTBEAT_TIMEOUT_DEFAULT

const int32_t ddt::DdtConnectionManager::HEARTBEAT_TIMEOUT_DEFAULT = 10
protected

Timeout in [s] for the heartbeat. If no heartbeat signal was received for the configured time the MAL client (Publisher or Subscriber) is unregistered.

◆ registered_publishers

std::set<std::string> ddt::DdtConnectionManager::registered_publishers
protected

A set containing all registered publishers.

Parameters
key: The data stream identifier.

◆ reply_time

int32_t ddt::DdtConnectionManager::reply_time
protected

Reply time for MAL clients in [s].

◆ REPLY_TIME_DEFAULT

const int32_t ddt::DdtConnectionManager::REPLY_TIME_DEFAULT = 6
protected

Maximum time in [s] for establishing a connection to a MAL server. If the time is exceeded the connection attempt fails.

◆ shm_timeout

int32_t ddt::DdtConnectionManager::shm_timeout
protected

Timeout after which a shared memory is deleted in [s].

◆ SHM_TIMEOUT_DEFAULT

const int32_t ddt::DdtConnectionManager::SHM_TIMEOUT_DEFAULT = 10
protected

Specifies the time in [s] after which an allocated SHM is deleted. This timeout is used when a publisher was unregistered but subscribers are still registered.

◆ statistics_map

std::map<std::string, DdtStatistics> ddt::DdtConnectionManager::statistics_map
protected

A map containing statistics for each data stream.

Parameters
key: The data stream identifier.
value: A DDT statistics object.

◆ waiting_time

int32_t ddt::DdtConnectionManager::waiting_time
protected

Waiting time for MAL publishers to establish communication in [ms].

◆ WAITING_TIME_DEFAULT

const int32_t ddt::DdtConnectionManager::WAITING_TIME_DEFAULT = 1000
protected

Time in [ms] that the MAL publishers may use for establishing the communication. If the time is exceeded the connection attempt fails.


The documentation for this class was generated from the following files: