OpenSplice DDS Forum

kuolty

Members
  • Content count

    15
  • Joined

  • Last visited

About kuolty

  • Rank
    Member

Profile Information

  • Company
    Lika Tech
  1. How to receive only not-read samples once

     I think it's because every time I run the publisher, DDS considers that a new node has appeared and sends the historical data to it? Am I right that DDS can't distinguish between data readers (nodes)? Or is it possible to assign some kind of name/ID to a data reader to make it known to DDS?
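     If nothing built-in exists, one idea I have (an untested sketch, using the Command type from my later posts purely as an illustration) is to attach an application-level name to the reader via the standard UserData QoS. As far as I understand, though, that only labels the reader for discovery and tooling; it does not change which historical data the durability service delivers.

         #include <string>
         #include "Command_DCPS.hpp"  // generated type support, as in my other code

         // Sketch: carry a reader name in the (opaque) UserData QoS so that other
         // participants or tools can identify this reader during discovery.
         dds::sub::DataReader<Command> make_named_reader(dds::sub::Subscriber& sub,
                                                         dds::topic::Topic<Command>& topic,
                                                         const std::string& readerName)
         {
             dds::sub::qos::DataReaderQos drqos = topic.qos();
             drqos << dds::core::policy::UserData(
                 dds::core::ByteSeq(readerName.begin(), readerName.end()));
             return dds::sub::DataReader<Command>(sub, topic, drqos);
         }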
  2. Hi, I was inspecting the Durability example with persistent storage enabled and found that a late-joining subscriber receives the previously published samples (saved to the persistent storage) every time it runs. This confuses me, because according to the data state we should receive only not-read samples:

         dds::sub::status::DataState ds;
         ds << dds::sub::status::SampleState::not_read()
            << dds::sub::status::ViewState::new_view()
            << dds::sub::status::InstanceState::any();

     My steps:

       • cd $OSPL_HOME/examples/dcps/Durability/isocpp
       • Run ./publisher persistent false false and wait 10 seconds before exit, to make sure the data is stored in the persistent storage
       • Run ./subscriber persistent false false

     So, which data state do I have to use to receive a not-read sample only once?
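     In the meantime, one workaround I'm considering (an untested sketch, using the Command type from my own application purely as an illustration): give the reader a Volatile durability QoS. The read/not-read sample state lives only in the reader's own cache and is not remembered across process restarts, so a freshly started reader with Transient or Persistent durability is always handed the stored history again, while a Volatile reader never requests it.

         #include "Command_DCPS.hpp"  // generated type support, as in my other code

         // Sketch: a reader that only sees live data and never requests
         // historical samples from the durability service.
         dds::sub::DataReader<Command> make_volatile_reader(dds::sub::Subscriber& sub,
                                                            dds::topic::Topic<Command>& topic)
         {
             dds::sub::qos::DataReaderQos drqos = topic.qos();
             // A Volatile reader does not receive historical data, so restarting
             // the process does not replay samples written before it existed.
             drqos << dds::core::policy::Durability::Volatile();
             return dds::sub::DataReader<Command>(sub, topic, drqos);
         }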
  3. I want to receive messages in the order they were published, so I added dds::core::policy::DestinationOrder::SourceTimestamp() to my topic QoS. Now I can't receive any messages at all. What am I doing wrong? There are no errors in the log files.

         dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());
         dds::topic::qos::TopicQos topicQosCommand = dp.default_topic_qos()
             << dds::core::policy::Durability::Persistent()
             << dds::core::policy::Reliability::Reliable()
             << dds::core::policy::DestinationOrder::SourceTimestamp();
         dds::topic::Topic<Command> topicCommand(dp, "Command", topicQosCommand);

         dds::sub::qos::SubscriberQos subQos_Command = dp.default_subscriber_qos();
         dds::sub::Subscriber sub_command(dp, subQos_Command);
         dds::sub::qos::DataReaderQos drqosCommand = topicCommand.qos();
         dds::sub::DataReader<NSQC442::Command> drCommand(sub_command, topicCommand, drqosCommand);
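     My current guess (please correct me if this is wrong): DestinationOrder is a requested/offered (RxO) policy, so a reader requesting SourceTimestamp will not be matched with a writer that still offers the default ReceptionTimestamp, and no data flows at all. A sketch of what I mean by giving both ends the same policy, derived from one shared topic QoS:

         #include "Command_DCPS.hpp"  // generated type support, as in my snippet above

         // Sketch: derive both the writer and the reader QoS from the same topic
         // QoS, so the offered and requested DestinationOrder are identical.
         void make_matching_pair(dds::domain::DomainParticipant& dp)
         {
             dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
                 << dds::core::policy::Durability::Persistent()
                 << dds::core::policy::Reliability::Reliable()
                 << dds::core::policy::DestinationOrder::SourceTimestamp();
             dds::topic::Topic<Command> topic(dp, "Command", topicQos);

             dds::pub::Publisher pub(dp);
             dds::pub::qos::DataWriterQos dwqos = topic.qos();   // offers SourceTimestamp
             dds::pub::DataWriter<Command> dw(pub, topic, dwqos);

             dds::sub::Subscriber sub(dp);
             dds::sub::qos::DataReaderQos drqos = topic.qos();   // requests SourceTimestamp
             dds::sub::DataReader<Command> dr(sub, topic, drqos);

             (void)dw;
             (void)dr;
         }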
  4. One more thing. I've just added support for a second topic, "UnitAddress", to the subscriber application, and now I don't receive the Command sample any more. The code below works, but if I simply uncomment the inactive code then everything stops working: I no longer receive the Command sample from the publisher. It's very strange, because I have only added the same code for another topic. There are no warnings or errors in the log files.

         #include <csignal>
         #include <iostream>
         #include <thread>
         #include "Command_DCPS.hpp"
         #include "UnitAddress_DCPS.hpp"

         struct ShutdownHandler {
             static bool stop;
             static void shutdown(int /*sig*/) {
                 std::signal(SIGINT, NULL);
                 stop = true;
             }
         };

         bool ShutdownHandler::stop = false;

         int main(int argc, char** argv)
         {
             std::signal(SIGINT, &ShutdownHandler::shutdown);
             std::signal(SIGTERM, &ShutdownHandler::shutdown);

             dds::core::cond::WaitSet ws;
             auto ds = dds::sub::status::DataState::new_data();
             int result = 0;
             try {
                 dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

                 dds::topic::qos::TopicQos topicQosCommand = dp.default_topic_qos()
                     << dds::core::policy::Durability::Persistent()
                     << dds::core::policy::Reliability::Reliable()
                     << dds::core::policy::History::KeepAll();
                 // dds::topic::qos::TopicQos topicQosUnit = dp.default_topic_qos()
                 //     << dds::core::policy::Durability::Persistent()
                 //     << dds::core::policy::Reliability::Reliable()
                 //     << dds::core::policy::History::KeepAll();

                 dds::topic::Topic<Command> topicCommand(dp, "Command", topicQosCommand);
                 // dds::topic::Topic<UnitAddress> topicUnitAddress(dp, "UnitAddress", topicQosUnit);

                 dds::sub::qos::SubscriberQos subQos_Command = dp.default_subscriber_qos();
                 // dds::sub::qos::SubscriberQos subQos_Unit = dp.default_subscriber_qos();
                 dds::sub::Subscriber sub_command(dp, subQos_Command);
                 // dds::sub::Subscriber sub_unit(dp, subQos_Unit);

                 dds::sub::qos::DataReaderQos drqosCommand = topicCommand.qos();
                 // dds::sub::qos::DataReaderQos drqosUnit = topicUnitAddress.qos();
                 dds::sub::DataReader<Command> drCommand(sub_command, topicCommand, drqosCommand);
                 // dds::sub::DataReader<UnitAddress> drUnit(sub_unit, topicUnitAddress, drqosUnit);

                 drCommand.wait_for_historical_data(dds::core::Duration(4));
                 // drUnit.wait_for_historical_data(dds::core::Duration(4));

                 dds::sub::cond::ReadCondition rcCommand(drCommand, ds);
                 // dds::sub::cond::ReadCondition rcUnit(drUnit, ds);

                 // Attach the condition
                 ws += rcCommand;
                 // ws += rcUnit;

                 while (!ShutdownHandler::stop) {
                     try {
                         auto condSeq = ws.wait(dds::core::Duration(1));
                         for (auto& cond : condSeq) {
                             if (cond == rcCommand) {
                                 std::cout << "Command rc triggered" << std::endl;
                                 auto commandSamples = drCommand.select().state(ds).take();
                                 if (0 == commandSamples.length()) {
                                     std::cout << "Command sample empty" << std::endl;
                                 }
                                 for (auto& commandSample : commandSamples) {
                                     if (commandSample.info().valid()) {
                                         std::cout << "Command sample is ok" << std::endl;
                                     } else {
                                         std::cout << "Command sample is not valid" << std::endl;
                                     }
                                 }
                             // } else if (cond == rcUnit) {
                             //     std::cout << "Unit rc triggered" << std::endl;
                             //     auto unitSamples = drUnit.select().state(ds).take();
                             //
                             //     for (auto& unitSample : unitSamples) {
                             //         if (unitSample.info().valid()) {
                             //             std::cout << "Unit sample is ok" << std::endl;
                             //         } else {
                             //             std::cout << "Unit sample is not valid" << std::endl;
                             //         }
                             //     }
                             //
                             } else {
                                 std::cout << "Unknown rc triggered" << std::endl;
                             }
                         }
                     } catch (dds::core::TimeoutError& e) {
                         (void)e;
                     }
                 }
                 std::cout << "Shutting down ..." << std::endl;
             } catch (const dds::core::Exception& e) {
                 std::cerr << "ERROR: Exception: " << e.what() << std::endl;
                 result = 1;
             }
             return result;
         }
  5. If I change the subscriber TopicQos to dds::core::policy::History::KeepAll(), then the warnings in ospl-info.log are eliminated and I receive the sample message only once.
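     A minimal sketch of that change (the helper name is just for illustration): both the publisher and the subscriber now create the "Command" topic with the exact same History policy, so the second topic creation is no longer rejected with "Unmatching QoS Policy: 'History'".

         #include "Command_DCPS.hpp"  // generated type support, as in my other code

         // Sketch: one helper used by both publisher and subscriber, so the
         // kernel sees a single, consistent definition of the topic.
         dds::topic::Topic<Command> make_command_topic(dds::domain::DomainParticipant& dp)
         {
             dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
                 << dds::core::policy::Durability::Persistent()
                 << dds::core::policy::Reliability::Reliable()
                 << dds::core::policy::History::KeepAll();   // identical on both sides
             return dds::topic::Topic<Command>(dp, "Command", topicQos);
         }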
  6. I have a data reader and a data writer. I want late joiners to receive all not-read data from the persistent storage (only once!). My steps:

     I run only the publisher application, which writes one data sample to the topic. It then waits about 10 seconds before exiting, to make sure that no heartbeat is detected and my data is written to the persistent storage. At this point everything works fine and I can see how the persistent storage (a folder with XML files) is populated with new data.

     I run the subscriber application for the first time and wait... Nothing happens. No new data arrives from the persistent storage, but a few warnings are added to ospl-info.log:

         ========================================================================================
         Report      : WARNING
         Date        : Thu May 18 18:30:17 +03 2017
         Description : Create Topic <Command> failed: Unmatching QoS Policy: 'History'.
         Node        : DELLE5470
         Process     : sub1 <6153>
         Thread      : durability 7f0069807700
         Internals   : V6.4.151118OSS///v_topicNew/v_topic.c/428/0/1495121417.776491192
         ========================================================================================
         Report      : WARNING
         Date        : Thu May 18 18:30:17 +03 2017
         Description : Create kernel entity failed. For Topic: <Command>
         Node        : DELLE5470
         Process     : sub1 <6153>
         Thread      : durability 7f0069807700
         Internals   : V6.4.151118OSS///u_topicNew/u_topic.c/74/0/1495121417.776645897
         ========================================================================================

     Here we come to the first question: what's wrong with the QoS? The only difference between the topic QoS for the data writer and the data reader is that the topic QoS for the data reader is explicitly set to dds::core::policy::History::KeepLast(1), while the data writer topic QoS is set to dds::core::policy::History::KeepAll().

     I run the subscriber application for the second time and finally receive the message "Command sample is ok", which means the data reader received a new message. And there are no more warnings in the log file about the unmatching QoS policy.

     Publisher:

         #include <chrono>
         #include <iostream>
         #include <thread>
         #include "Command_DCPS.hpp"

         int main(int argc, char** argv)
         {
             int result = 0;
             try {
                 Command command1(1, CommandType::READY);

                 dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

                 dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
                     << dds::core::policy::Durability::Persistent()
                     << dds::core::policy::Reliability::Reliable()
                     << dds::core::policy::History::KeepAll();
                     // << dds::core::policy::DestinationOrder::SourceTimestamp();
                 dds::topic::Topic<Command> topicCommand(dp, "Command", topicQos);

                 dds::pub::qos::PublisherQos pubQos_command = dp.default_publisher_qos();
                 dds::pub::Publisher pub_command(dp, pubQos_command);

                 dds::pub::qos::DataWriterQos dwqosCommand = topicCommand.qos();
                 dwqosCommand << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();
                 dds::pub::DataWriter<Command> dw_command(pub_command, topicCommand, dwqosCommand);

                 dw_command << command1;

                 std::cout << "Wait for 10 seconds to let samples be added to the persistent storage" << std::endl;
                 std::this_thread::sleep_for(std::chrono::seconds(10));
             } catch (const dds::core::Exception& e) {
                 std::cerr << "ERROR: Exception: " << e.what() << std::endl;
                 result = 1;
             }
             return result;
         }

     Subscriber:

         #include <csignal>
         #include <iostream>
         #include <thread>
         #include "Command_DCPS.hpp"

         struct ShutdownHandler {
             static bool stop;
             static void shutdown(int /*sig*/) {
                 std::signal(SIGINT, NULL);
                 stop = true;
             }
         };

         bool ShutdownHandler::stop = false;

         int main(int argc, char** argv)
         {
             std::signal(SIGINT, &ShutdownHandler::shutdown);
             std::signal(SIGTERM, &ShutdownHandler::shutdown);

             dds::core::cond::WaitSet ws;
             auto ds = dds::sub::status::DataState::new_data();
             int result = 0;
             try {
                 dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

                 dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
                     << dds::core::policy::Durability::Persistent()
                     << dds::core::policy::Reliability::Reliable()
                     << dds::core::policy::History::KeepLast(1);
                     // << dds::core::policy::DestinationOrder::SourceTimestamp();
                 dds::topic::Topic<Command> topicCommand(dp, "Command", topicQos);

                 dds::sub::qos::SubscriberQos subQos_Command = dp.default_subscriber_qos();
                 dds::sub::Subscriber sub_command(dp, subQos_Command);

                 dds::sub::qos::DataReaderQos drqosCommand = topicCommand.qos();
                 dds::sub::DataReader<Command> drCommand(sub_command, topicCommand, drqosCommand);

                 drCommand.wait_for_historical_data(dds::core::Duration(4));

                 dds::sub::cond::ReadCondition rcCommand(drCommand, ds);
                 // Attach the condition
                 ws += rcCommand;

                 while (!ShutdownHandler::stop) {
                     try {
                         ws.wait(dds::core::Duration(1));
                         auto commandSamples = drCommand.select().state(ds).take();
                         for (auto& commandSample : commandSamples) {
                             if (commandSample.info().valid()) {
                                 std::cout << "Command sample is ok" << std::endl;
                             } else {
                                 std::cout << "Command sample is not valid" << std::endl;
                             }
                         }
                     } catch (dds::core::TimeoutError& e) {
                         (void)e;
                     }
                 }
                 std::cout << "Shutting down ..." << std::endl;
             } catch (const dds::core::Exception& e) {
                 std::cerr << "ERROR: Exception: " << e.what() << std::endl;
                 result = 1;
             }
             return result;
         }
  7. Read conditions attached to the waitset will be dispatched in exactly the same order they were attached. Am I correct?
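     In case the order is not guaranteed, here is the order-independent handling I have in mind (an untested sketch, passing in the waitset and the read conditions from my subscriber code above): simply compare each condition returned by wait() against the attached read conditions and ignore its position in the returned sequence.

         #include <iostream>
         #include "Command_DCPS.hpp"  // generated type support, which pulls in the DDS headers

         // Sketch: dispatch on the identity of each triggered condition, not on
         // the order in which conditions appear in the sequence returned by wait().
         void handle_triggered(dds::core::cond::WaitSet& ws,
                               dds::sub::cond::ReadCondition& rcCommand,
                               dds::sub::cond::ReadCondition& rcUnit)
         {
             auto condSeq = ws.wait(dds::core::Duration(1));
             for (auto& cond : condSeq) {
                 if (cond == rcCommand) {
                     std::cout << "Command condition triggered" << std::endl;
                 } else if (cond == rcUnit) {
                     std::cout << "UnitAddress condition triggered" << std::endl;
                 } else {
                     std::cout << "Some other attached condition triggered" << std::endl;
                 }
             }
         }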
  8. It is still not clear to me how I can receive all samples regardless of the durability service settings, without using the payload in the keylist.
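     The only alternative I can see to keying on the payload is to ask the durability service itself to keep the full history per instance via the topic's DurabilityService QoS. A sketch of what I mean (assuming the ISO C++ API's dds::core::policy::DurabilityService policy and its setter-style accessors are available in this OpenSplice version; I have not verified that):

         #include "Command_DCPS.hpp"  // generated type support, as in my other code

         // Sketch: request that the durability service retain every sample of
         // every instance, instead of only the most recent sample per instance.
         dds::topic::qos::TopicQos make_keep_all_topic_qos(dds::domain::DomainParticipant& dp)
         {
             dds::core::policy::DurabilityService svc;                     // start from defaults
             svc.history_kind(dds::core::policy::HistoryKind::KEEP_ALL);   // keep every sample
             svc.max_samples_per_instance(-1);                             // -1 = unlimited

             return dp.default_topic_qos()
                 << dds::core::policy::Durability::Persistent()
                 << dds::core::policy::Reliability::Reliable()
                 << dds::core::policy::History::KeepAll()
                 << svc;
         }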
  9. Hi, let's assume that we want to listen to DDS messages and forward some of them to the cloud (Amazon IoT). For now I have N data readers for the N topics I am interested in. Is it possible to serialize a topic sample to a string in some way? I know that it is possible to use Google Protobuf in place of OMG IDL. Would that make it easy to convert the C data structure to a standardized protobuf string for later transmission over the network? Thanks.
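     To make the question more concrete, what I have in mind is roughly the sketch below: copy the DDS sample field by field into a protobuf message and then serialize it either to a compact binary string or to JSON for the cloud side. Everything protobuf-related here is hypothetical (myapp::MsgProto, msg.pb.h and set_message() are things I would define in my own .proto file, not anything generated by OpenSplice):

         #include <string>
         #include <google/protobuf/util/json_util.h>
         #include "HelloWorldData_DCPS.hpp"  // DDS generated type (header name assumed)
         #include "msg.pb.h"                 // hypothetical hand-written .proto for the same data

         // Sketch: copy a DDS sample field by field into a protobuf message, then
         // serialize it for transmission (binary or JSON).
         std::string to_wire_format(const HelloWorldData::Msg& sample, bool as_json)
         {
             myapp::MsgProto proto;                 // hypothetical protobuf message type
             proto.set_message(sample.message());   // repeat for every field of interest

             if (as_json) {
                 std::string json;
                 google::protobuf::util::MessageToJsonString(proto, &json);
                 return json;                       // human-readable form
             }
             return proto.SerializeAsString();      // compact binary form
         }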
  10. I'm curious which examples you are talking about, because my code is based on examples/dcps/HelloWorld/isocpp/implementation.cpp:

          /** A dds::domain::DomainParticipant is created for the default domain. */
          dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

          /** The Durability::Transient policy is specified as a dds::topic::qos::TopicQos
           * so that even if the subscriber does not join until after the sample is written
           * then the DDS will still retain the sample for it. The Reliability::Reliable
           * policy is also specified to guarantee delivery. */
          dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
              << dds::core::policy::Durability::Transient()
              << dds::core::policy::Reliability::Reliable();

          /** A dds::topic::Topic is created for our sample type on the domain participant. */
          dds::topic::Topic<HelloWorldData::Msg> topic(dp, "HelloWorldData_Msg", topicQos);

          /** A dds::pub::Publisher is created on the domain participant. */
          std::string name = "HelloWorld example";
          dds::pub::qos::PublisherQos pubQos = dp.default_publisher_qos()
              << dds::core::policy::Partition(name);
          dds::pub::Publisher pub(dp, pubQos);

          /** The dds::pub::qos::DataWriterQos is derived from the topic qos and the
           * WriterDataLifecycle::ManuallyDisposeUnregisteredInstances policy is
           * specified as an addition. This is so the publisher can optionally be run (and
           * exit) before the subscriber. It prevents the middleware default 'clean up' of
           * the topic instance after the writer deletion, this deletion implicitly performs
           * DataWriter::unregister_instance */
          dds::pub::qos::DataWriterQos dwqos = topic.qos();
          dwqos << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();

          /** A dds::pub::DataWriter is created on the Publisher & Topic with the modified Qos. */
          dds::pub::DataWriter<HelloWorldData::Msg> dw(pub, topic, dwqos);
  11. Hi, thanks for your comment. I understand the drawbacks of the chosen fix, but I don't know how to receive the whole history of changes (not only the last updated value) for userID #1 without adding the payload as an extra key. Sure, I can extend the fields by adding a timestamp field, but that would duplicate the DDS timestamp and I'm not sure it would be a better solution.
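      For the timestamp part at least, I could read the source timestamp that DDS already attaches to every sample from the SampleInfo instead of duplicating it in the payload; a small untested sketch:

          #include <iostream>
          #include "HelloWorldData_DCPS.hpp"  // generated type support (header name assumed)

          // Sketch: print each message together with the per-sample source timestamp
          // taken from SampleInfo, so no timestamp field is needed in the IDL payload.
          void print_with_timestamps(dds::sub::DataReader<HelloWorldData::Msg>& dr)
          {
              auto samples = dr.take();
              for (auto& sample : samples) {
                  if (!sample.info().valid()) {
                      continue;  // skip invalid samples (e.g. dispose/unregister notifications)
                  }
                  const dds::core::Time t = sample.info().timestamp();  // source timestamp
                  std::cout << t.sec() << "." << t.nanosec() << "  "
                            << sample.data().message() << std::endl;
              }
          }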
  12. The problem was with the keylist in the IDL file. This is how my IDL file originally looked:

          module HelloWorldData {
              struct Msg {
                  /** User ID */
                  long userID;
                  /** message */
                  string message;
              };
          #pragma keylist Msg userID
          };

      And this is how it must look:

          module HelloWorldData {
              struct Msg {
                  /** User ID */
                  long userID;
                  /** message */
                  string message;
              };
          #pragma keylist Msg userID message
          };
  13. I want a late-joining subscriber to read all previously written samples. My steps:

        • run the publisher
        • check the persistent storage file to ensure that the data was really saved
        • run the subscriber

      I don't know why, but I receive only the latest published sample, even though I specified keeping all history.

      Writer.cpp:

          HelloWorldData::Msg msgInstance1(1, "Hello World 1");
          HelloWorldData::Msg msgInstance2(1, "Hello World 2");

          dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

          dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
              << dds::core::policy::Durability::Persistent()
              << dds::core::policy::Reliability::Reliable()
              << dds::core::policy::History::KeepAll();
          dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

          std::string name = "HelloWorldPartition";
          dds::pub::qos::PublisherQos pubQos = dp.default_publisher_qos()
              << dds::core::policy::Partition(name);
          dds::pub::Publisher pub(dp, pubQos);

          dds::pub::qos::DataWriterQos dwqos = topic.qos();
          dwqos << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();
          dds::pub::DataWriter<HelloWorldData::Msg> dw(pub, topic, dwqos);

          dw << msgInstance1;
          dw << msgInstance2;

          // wait until the heartbeat times out and the published instances are
          // stored in the persistent storage
          std::this_thread::sleep_for(std::chrono::seconds(10));

      Reader:

          auto ds = dds::sub::status::DataState::new_data();
          ds << dds::sub::status::SampleState::not_read();

          dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

          dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
              << dds::core::policy::Reliability::Reliable()
              << dds::core::policy::Durability::Persistent()
              << dds::core::policy::History::KeepAll();
          dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

          std::string name = "HelloWorldPartition";
          dds::sub::qos::SubscriberQos subQos = dp.default_subscriber_qos()
              << dds::core::policy::Partition(name);
          dds::sub::Subscriber sub(dp, subQos);

          dds::sub::qos::DataReaderQos drqos = topic.qos();
          dds::sub::DataReader<HelloWorldData::Msg> dr(sub, topic, drqos);
          //dr.wait_for_historical_data(dds::core::Duration(4));

          dds::sub::cond::ReadCondition rc(dr, ds);
          // Attach the condition
          ws += rc;

          while (!stop) {
              try {
                  ws.wait(dds::core::Duration(1));
                  auto samples = dr.select().state(ds).take();
                  std::for_each(samples.begin(), samples.end(),
                      [](const dds::sub::Sample<HelloWorldData::Msg>& sample) {
                          if (sample.info().valid()) {
                              std::cout << sample.data().message() << std::endl;
                          } else {
                              std::cout << "Sample is not valid" << std::endl;
                          }
                      });
              } catch (dds::core::TimeoutError& e) {
                  (void)e;
              }
          }
  14. Hi, I have a data writer with transient-local durability and a data reader with persistent durability and keep-all history. What I am trying to achieve is to use the DDS durability service to implement a queue of topic samples that are stored persistently until they are taken from the reader's cache (the "queue", in my terms). So the writer can send data even if it is considered volatile, but if the reader is alive and receives the data, it must store it in its cache for postponed processing.

      Writer:

          HelloWorldData::Msg msgInstance(1, "Hello World");

          dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

          dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
              << dds::core::policy::Durability::TransientLocal()
              << dds::core::policy::Reliability::Reliable();
          dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

          std::string name = "HelloWorldPartition";
          dds::pub::qos::PublisherQos pubQos = dp.default_publisher_qos()
              << dds::core::policy::Partition(name);
          dds::pub::Publisher pub(dp, pubQos);

          dds::pub::qos::DataWriterQos dwqos = topic.qos();
          dwqos << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();
          /** A dds::pub::DataWriter is created on the Publisher & Topic with the modified Qos. */
          dds::pub::DataWriter<HelloWorldData::Msg> dw(pub, topic, dwqos);

          dw << msgInstance;

          std::cout << "Data reader is started at this point" << std::endl;
          std::this_thread::sleep_for(std::chrono::seconds(10));

      Reader:

          class HelloListener : public dds::sub::NoOpDataReaderListener<HelloWorldData::Msg>
          {
          public:
              void on_data_available(dds::sub::DataReader<HelloWorldData::Msg>& dr)
              {
                  std::cout << "----------on_data_available-----------" << std::endl;
              }

              void on_liveliness_changed(dds::sub::DataReader<HelloWorldData::Msg>& dr,
                                         const dds::core::status::LivelinessChangedStatus& status)
              {
                  std::cout << "!!! Liveliness Changed !!!" << std::endl;
              }

              virtual void on_sample_rejected(dds::sub::DataReader<HelloWorldData::Msg>& reader,
                                              const dds::core::status::SampleRejectedStatus& status)
              {
                  std::cout << "----------on_sample_rejected-----------" << std::endl;
              }

              virtual void on_requested_deadline_missed(dds::sub::DataReader<HelloWorldData::Msg>& reader,
                                                        const dds::core::status::RequestedDeadlineMissedStatus& status)
              {
                  std::cout << "----------on_requested_deadline_missed-----------" << std::endl;
              }

              virtual void on_requested_incompatible_qos(dds::sub::DataReader<HelloWorldData::Msg>& reader,
                                                         const dds::core::status::RequestedIncompatibleQosStatus& status)
              {
              }

              virtual void on_subscription_matched(dds::sub::DataReader<HelloWorldData::Msg>& reader,
                                                   const dds::core::status::SubscriptionMatchedStatus& status)
              {
                  std::cout << "----------on_subscription_matched-----------" << std::endl;
              }

              virtual void on_sample_lost(dds::sub::DataReader<HelloWorldData::Msg>& reader,
                                          const dds::core::status::SampleLostStatus& status)
              {
                  std::cout << "----------on_sample_lost-----------" << std::endl;
              }
          };

          HelloListener listener;

          dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

          dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
              << dds::core::policy::Reliability::Reliable()
              << dds::core::policy::Durability::Persistent()
              << dds::core::policy::History::KeepAll();
          dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

          std::string name = "HelloWorldPartition";
          dds::sub::qos::SubscriberQos subQos = dp.default_subscriber_qos()
              << dds::core::policy::Partition(name);
          dds::sub::Subscriber sub(dp, subQos);

          dds::sub::qos::DataReaderQos drqos = topic.qos();
          dds::sub::DataReader<HelloWorldData::Msg> dr(sub, topic, drqos, &listener);

          //dds::core::Duration timeout(4, 10000000);
          //dr.wait_for_historical_data(timeout);

          while (true) {
              std::this_thread::sleep_for(std::chrono::seconds(1));
          }