ddt  0.1
Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
ddt::DdtConnectionManager Class Reference

#include <ddtConnectionManager.hpp>

Inheritance diagram for ddt::DdtConnectionManager:
DdtConnectionManagerFake

Public Member Functions

 DdtConnectionManager (DdtLogger *ddt_logger, const std::string uri_string)
 
 DdtConnectionManager (DdtMemoryManager *mmgr, DdtLogger *ddt_logger, const std::string uri_string)
 
virtual ~DdtConnectionManager ()
 
int32_t RegisterPublisher (const std::string &data_stream_identifier, int32_t latency, int32_t deadline, int32_t max_data_sample_size, int32_t number_of_samples, bool compute_checksum, const std::string &publishing_uri) override
 
int32_t UnregisterPublisher (const std::string &data_stream_identifier) override
 
int32_t UnregisterPublishers ()
 
void PublishData (const std::string &data_stream_identifier) override
 
int32_t RegisterSubscriber (const std::string &subscriber_uuid, const std::string &data_stream_identifier, const std::string &remote_broker_uri, int32_t latency, int32_t deadline) override
 
int32_t RegisterRemoteSubscriber (const std::string &remote_broker, const std::string &subscriber_uuid, const std::string &data_stream_identifier, const int32_t latency, const int32_t deadline) override
 
int32_t UnregisterSubscriber (const std::string &data_stream_identifier, const std::string &subscriber_uuid) override
 
int32_t UnregisterSubscribers ()
 
int32_t get_max_data_sample_size (const std::string &data_stream_identifier) override
 
int32_t get_number_of_samples (const std::string &data_stream_identifier) override
 
std::string get_publishing_uri (const std::string &data_stream_identifier) override
 
int32_t get_notification_port (const std::string &data_stream_identifier) override
 
std::vector< std::string > get_statistics (const std::string &data_stream_identifier) override
 
int32_t get_heartbeat_interval () override
 
int32_t get_heartbeat_timeout () override
 
bool get_compute_checksum (const std::string &data_stream_identifier) override
 
std::string get_shm_id (const std::string &data_stream_identifier) override
 
void UpdateHeartbeat (const std::string &identifier) override
 
bool CheckPubRegistrationPermitted (const std::string &data_stream_identifier) override
 
bool CheckPublisherExists (const std::string &data_stream_identifier) override
 
bool CheckRemotePublisherExists (const std::string &remote_broker_uri, const std::string &data_stream_identifier) override
 
void UpdateStatistics (const std::string &data_stream_identifier, const int32_t datavec_size, const uint64_t source_timestamp) override
 
int32_t GetMaxPossibleBufferSize (int32_t max_data_sample_size) override
 
std::vector< std::string > GetRegisteredStreams () override
 
std::vector< std::string > GetConnectedBrokers () override
 

Protected Member Functions

void LoadDefaults ()
 
const std::string GetConfigPath ()
 
void ReadIni ()
 

Protected Attributes

const int32_t SHM_TIMEOUT_DEFAULT = 10
 
const int32_t WAITING_TIME_DEFAULT = 1000
 
const int32_t REPLY_TIME_DEFAULT = 6
 
const int32_t HEARTBEAT_INTERVAL_DEFAULT = 1
 
const int32_t HEARTBEAT_TIMEOUT_DEFAULT = 10
 
int32_t shm_timeout
 
int32_t waiting_time
 
int32_t reply_time
 
int32_t heartbeat_interval
 
int32_t heartbeat_timeout
 

Detailed Description

This class manages the connection handling between Data Brokers and Publisher / Subscriber applications.

Constructor & Destructor Documentation

◆ DdtConnectionManager() [1/2]

DdtConnectionManager::DdtConnectionManager ( DdtLogger *  ddt_logger,
const std::string  uri_string 
)
explicit

Constructor

◆ DdtConnectionManager() [2/2]

DdtConnectionManager::DdtConnectionManager ( DdtMemoryManager *  mmgr,
DdtLogger *  ddt_logger,
const std::string  uri_string 
)
explicit

Constructor

◆ ~DdtConnectionManager()

DdtConnectionManager::~DdtConnectionManager ( )
virtual

Destructor

Member Function Documentation

◆ CheckPublisherExists()

bool DdtConnectionManager::CheckPublisherExists ( const std::string &  data_stream_identifier)
override

Checks if a publisher with the specified data stream identifier exists.

◆ CheckPubRegistrationPermitted()

bool DdtConnectionManager::CheckPubRegistrationPermitted ( const std::string &  data_stream_identifier)
override

Checks whether a publisher is permitted to register. A publisher is rejected if its data stream identifier is already in use, unless the heartbeats of the existing publisher are missing; in that case the publisher is allowed to re-register.

◆ CheckRemotePublisherExists()

bool DdtConnectionManager::CheckRemotePublisherExists ( const std::string &  remote_broker_uri,
const std::string &  data_stream_identifier 
)
override

Checks whether a remote publisher for the specified data stream identifier exists.

◆ get_compute_checksum()

bool DdtConnectionManager::get_compute_checksum ( const std::string &  data_stream_identifier)
override

Returns compute_checksum from the memory accessor.

◆ get_heartbeat_interval()

int32_t DdtConnectionManager::get_heartbeat_interval ( )
override

Returns heartbeat_interval.

◆ get_heartbeat_timeout()

int32_t DdtConnectionManager::get_heartbeat_timeout ( )
override

Returns heartbeat_timeout.

◆ get_max_data_sample_size()

int32_t DdtConnectionManager::get_max_data_sample_size ( const std::string &  data_stream_identifier)
override

Returns max_data_sample_size.

◆ get_notification_port()

int32_t DdtConnectionManager::get_notification_port ( const std::string &  data_stream_identifier)
override

Returns notification_port.

◆ get_number_of_samples()

int32_t DdtConnectionManager::get_number_of_samples ( const std::string &  data_stream_identifier)
override

Returns number_of_samples.

◆ get_publishing_uri()

std::string DdtConnectionManager::get_publishing_uri ( const std::string &  data_stream_identifier)
override

Returns publishing_uri.

◆ get_shm_id()

std::string DdtConnectionManager::get_shm_id ( const std::string &  data_stream_identifier)
override

Returns the shared memory identifier.

◆ get_statistics()

std::vector< std::string > DdtConnectionManager::get_statistics ( const std::string &  data_stream_identifier)
override

Returns the statistics.

◆ GetConfigPath()

const std::string DdtConnectionManager::GetConfigPath ( )
protected

This function reads the environment variable DDT_TRANSFERCONFIG_PATH and returns the path of the configuration file.

Returns
A string containing the path to the configuration file.

◆ GetConnectedBrokers()

std::vector< std::string > DdtConnectionManager::GetConnectedBrokers ( )
override

Returns the URIs of all connected remote brokers.

Returns
A vector containing the URIs of all connected remote brokers.

◆ GetMaxPossibleBufferSize()

int32_t DdtConnectionManager::GetMaxPossibleBufferSize ( int32_t  max_data_sample_size)
override

Requests the max possible buffer size (computed by the memory manager).

Parameters
max_data_sample_size: The maximum data sample size.
Returns
The max possible buffer size.

◆ GetRegisteredStreams()

std::vector< std::string > DdtConnectionManager::GetRegisteredStreams ( )
override

Returns all registered data streams.

Returns
A vector containing all data stream identifiers.

◆ LoadDefaults()

void DdtConnectionManager::LoadDefaults ( )
protected

Loads default values for configuration parameters.

◆ PublishData()

void DdtConnectionManager::PublishData ( const std::string &  data_stream_identifier)
override

This function

  1. notifies the memory reader that new data was written into shared memory by a publisher
  2. fetches the latest data packet using the memory accessor
  3. publishes the data packet over network using the DdtDataProducer object
  4. notifies all local subscribers that new data is available

If there is at least one remote subscriber, the packet is read from shared memory and published over the network.

◆ ReadIni()

void DdtConnectionManager::ReadIni ( )
protected

Reads the databroker configuration file.

An exception is thrown if the config file does not exist. In that case the default values are used instead.

◆ RegisterPublisher()

int32_t DdtConnectionManager::RegisterPublisher ( const std::string &  data_stream_identifier,
int32_t  latency,
int32_t  deadline,
int32_t  max_data_sample_size,
int32_t  number_of_samples,
bool  compute_checksum,
const std::string &  publishing_uri 
)
override

This function is called from Publisher applications by the DdtDataPublishers. For each Publisher application a DdtDataProducer object is created and stored in a producer map. This also triggers the registration process in the DdtMemoryManager.

Reuse the publishing URI if the publisher was restarted.

It takes some time to establish the communication mechanism on startup of the publishers and subscribers (see MAL API description). Wait after the publisher has been instantiated. This does NOT guarantee that the first n messages will not be lost.

◆ RegisterRemoteSubscriber()

int32_t DdtConnectionManager::RegisterRemoteSubscriber ( const std::string &  remote_broker,
const std::string &  subscriber_uuid,
const std::string &  data_stream_identifier,
const int32_t  latency,
const int32_t  deadline 
)
override

This function is called from remote brokers to register a remote subscriber. This triggers the registration process in the DdtMemoryManager and creates a new DdtDataConsumer object.

◆ RegisterSubscriber()

int32_t DdtConnectionManager::RegisterSubscriber ( const std::string &  subscriber_uuid,
const std::string &  data_stream_identifier,
const std::string &  remote_broker_uri,
int32_t  latency,
int32_t  deadline 
)
override

Registers Subscriber applications. Distinguishes between local and remote subscribers.

If local:

  • register the subscriber at the DdtMemoryManager
  • create a DdtDataConsumer object and store it in a consumer map

If remote:

  • create a MAL client and ask the remote broker for the required buffer size
  • register the subscriber at the DdtMemoryManager
  • register the subscriber at the remote broker
  • create a DdtDataConsumer object and store it in a consumer map
  • start MAL subscription on the DdtDataConsumer

◆ UnregisterPublisher()

int32_t DdtConnectionManager::UnregisterPublisher ( const std::string &  data_stream_identifier)
override

This function stops the memory reader, unregisters the Publisher application from the DdtMemoryManager and deletes the DdtDataProducer.

◆ UnregisterPublishers()

int32_t DdtConnectionManager::UnregisterPublishers ( )

Unregisters all publishers.

◆ UnregisterSubscriber()

int32_t DdtConnectionManager::UnregisterSubscriber ( const std::string &  data_stream_identifier,
const std::string &  subscriber_uuid 
)
override

Unregisters subscribers from their brokers. To unregister from remote brokers a MAL client is created. If it is the last subscriber of a data stream:

Set publishing uri to "" if there are no remote subscribers.

◆ UnregisterSubscribers()

int32_t DdtConnectionManager::UnregisterSubscribers ( )

Unregisters all subscribers.

◆ UpdateHeartbeat()

void DdtConnectionManager::UpdateHeartbeat ( const std::string &  identifier)
override

Updates the timestamp in the client map.

◆ UpdateStatistics()

void DdtConnectionManager::UpdateStatistics ( const std::string &  data_stream_identifier,
const int32_t  datavec_size,
const uint64_t  source_timestamp 
)
override

Updates the statistics counters.

Parameters
data_stream_identifier: The data stream identifier.
datavec_size: The size of the image data.
source_timestamp: The source timestamp.

Member Data Documentation

◆ heartbeat_interval

int32_t ddt::DdtConnectionManager::heartbeat_interval
protected

◆ HEARTBEAT_INTERVAL_DEFAULT

const int32_t ddt::DdtConnectionManager::HEARTBEAT_INTERVAL_DEFAULT = 1
protected

◆ heartbeat_timeout

int32_t ddt::DdtConnectionManager::heartbeat_timeout
protected

◆ HEARTBEAT_TIMEOUT_DEFAULT

const int32_t ddt::DdtConnectionManager::HEARTBEAT_TIMEOUT_DEFAULT = 10
protected

◆ reply_time

int32_t ddt::DdtConnectionManager::reply_time
protected

◆ REPLY_TIME_DEFAULT

const int32_t ddt::DdtConnectionManager::REPLY_TIME_DEFAULT = 6
protected

◆ shm_timeout

int32_t ddt::DdtConnectionManager::shm_timeout
protected

◆ SHM_TIMEOUT_DEFAULT

const int32_t ddt::DdtConnectionManager::SHM_TIMEOUT_DEFAULT = 10
protected

◆ waiting_time

int32_t ddt::DdtConnectionManager::waiting_time
protected

◆ WAITING_TIME_DEFAULT

const int32_t ddt::DdtConnectionManager::WAITING_TIME_DEFAULT = 1000
protected

The documentation for this class was generated from the following files: