OpenSplice DDS Forum
kuolty

Unmatching QoS policy warning, and data from persistent storage received only on the second run of the application


I have a data reader and a data writer. I want late joiners to receive all unread data from persistent storage (only once!).

My steps:

  1. I run only the publisher application, which writes one data sample to the topic. It then waits about 10 seconds before exiting, to make sure that no heartbeat is detected and that my data is written to the persistent storage. At this point everything works fine, and I can see the persistent storage (a folder with XML files) being populated with new data.
  2. I run the subscriber application for the first time and wait... Nothing happens. No data arrives from persistent storage, but a few warnings are added to ospl-info.log:
    ========================================================================================
    Report      : WARNING
    Date        : Thu May 18 18:30:17 +03 2017
    Description : Create Topic <Command> failed: Unmatching QoS Policy: 'History'.
    Node        : DELLE5470
    Process     : sub1 <6153>
    Thread      : durability 7f0069807700
    Internals   : V6.4.151118OSS///v_topicNew/v_topic.c/428/0/1495121417.776491192
    ========================================================================================
    Report      : WARNING
    Date        : Thu May 18 18:30:17 +03 2017
    Description : Create kernel entity failed. For Topic: <Command>
    Node        : DELLE5470
    Process     : sub1 <6153>
    Thread      : durability 7f0069807700
    Internals   : V6.4.151118OSS///u_topicNew/u_topic.c/74/0/1495121417.776645897
    ========================================================================================
    

    Here we come to the first question. What's wrong with the QoS? The only difference between the topic QoS used by the data writer and the one used by the data reader is that the reader's topic QoS is explicitly set to dds::core::policy::History::KeepLast(1), while the writer's topic QoS is set to dds::core::policy::History::KeepAll().

  3. I run the subscriber application for the second time and finally receive the message "Command sample is ok", which means the data reader received a new sample. No more warnings about an unmatching QoS policy appear in the log file.

Publisher:

#include <chrono>
#include <iostream>
#include <thread>
#include "Command_DCPS.hpp"

int main(int argc, char** argv)
{
    
    int result = 0;
    try {
        
        Command command1(1, CommandType::READY);

        dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

        dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
                << dds::core::policy::Durability::Persistent()
                << dds::core::policy::Reliability::Reliable()
                << dds::core::policy::History::KeepAll();
//                << dds::core::policy::DestinationOrder::SourceTimestamp();
        
        dds::topic::Topic<Command> topicCommand(dp, "Command", topicQos);
        dds::pub::qos::PublisherQos pubQos_command = dp.default_publisher_qos();

        dds::pub::Publisher pub_command(dp, pubQos_command);
        dds::pub::qos::DataWriterQos dwqosCommand = topicCommand.qos();
        
        dwqosCommand << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();

        dds::pub::DataWriter<Command> dw_command(pub_command, topicCommand, dwqosCommand);
    
        dw_command << command1;
        
        std::cout << "Waiting 10 seconds to let samples be added to the persistent storage" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(10));
        
    } catch (const dds::core::Exception& e) {
        std::cerr << "ERROR: Exception: " << e.what() << std::endl;
        result = 1;
    }
    
    return result;
}

Subscriber:

#include <csignal>
#include <iostream>
#include <thread>
#include "Command_DCPS.hpp"

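struct ShutdownHandler
{
    // Written from a signal handler, so it must be volatile std::sig_atomic_t.
    static volatile std::sig_atomic_t stop;
    
    static void shutdown(int /*sig*/)
    {
        // Restore the default action so a second Ctrl+C terminates immediately.
        std::signal(SIGINT, SIG_DFL);
        stop = 1;
    }
};

volatile std::sig_atomic_t ShutdownHandler::stop = 0;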


int main(int argc, char** argv)
{
    std::signal(SIGINT, &ShutdownHandler::shutdown);
    std::signal(SIGTERM, &ShutdownHandler::shutdown);
    
    dds::core::cond::WaitSet ws;
    auto ds = dds::sub::status::DataState::new_data();
    
    int result = 0;
    try {
        
        dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

        dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
                << dds::core::policy::Durability::Persistent()
                << dds::core::policy::Reliability::Reliable()
                << dds::core::policy::History::KeepLast(1);
//                << dds::core::policy::DestinationOrder::SourceTimestamp();
        
        dds::topic::Topic<Command> topicCommand(dp, "Command", topicQos);
        
        dds::sub::qos::SubscriberQos subQos_Command = dp.default_subscriber_qos();
        dds::sub::Subscriber sub_command(dp, subQos_Command);        

        dds::sub::qos::DataReaderQos drqosCommand = topicCommand.qos();
        
        dds::sub::DataReader<Command> drCommand(sub_command, topicCommand, drqosCommand);
        
        drCommand.wait_for_historical_data(dds::core::Duration(4));
        
        dds::sub::cond::ReadCondition rcCommand(drCommand, ds);

        // Attach the condition
        ws += rcCommand;
        
        while (!ShutdownHandler::stop) {
            
            try {
                ws.wait(dds::core::Duration(1));
                
                auto commandSamples = drCommand.select().state(ds).take();
                
                for (auto& commandSample : commandSamples) {
                    if (commandSample.info().valid()) {
                        std::cout << "Command sample is ok" << std::endl;
                    } else {
                        std::cout << "Command sample is not valid" << std::endl;
                    }
                }


            } catch (const dds::core::TimeoutError&) {
                // No data within 1 second; loop again and re-check the stop flag.
            }
            
        }   
        
        std::cout << "Shutting down ... " << std::endl;
        
    } catch (const dds::core::Exception& e) {
        std::cerr << "ERROR: Exception: " << e.what() << std::endl;
        result = 1;
    }
    
    return result;
}


If I change the subscriber TopicQos to dds::core::policy::History::KeepAll(), the warnings disappear from the ospl-info.log file and I receive the sample only once.
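
This observation fits the warning text. The "Unmatching QoS Policy: 'History'" report comes from the durability thread, and it goes away once both applications create "Command" with the same History policy. As far as I can tell, every party that creates the topic has to use the same topic QoS. One way to keep the two programs from drifting apart is to build the topic QoS in a single shared helper. The sketch below only models that idea with stand-in types: `TopicQosModel`, `HistoryModel`, `command_topic_qos` and `first_unmatching_policy` are hypothetical names for illustration, not part of the OpenSplice or ISO C++ DDS API, and the comparison order is an assumption.

```cpp
#include <cassert>
#include <string>

// Stand-in types for illustration only; these are NOT the OpenSplice classes.
enum class DurabilityKind { Volatile, TransientLocal, Transient, Persistent };
enum class ReliabilityKind { BestEffort, Reliable };
enum class HistoryKind { KeepLast, KeepAll };

struct HistoryModel {
    HistoryKind kind;
    int depth;  // only meaningful for KeepLast

    static HistoryModel keep_all() { return {HistoryKind::KeepAll, 0}; }
    static HistoryModel keep_last(int depth) {
        assert(depth >= 1);
        return {HistoryKind::KeepLast, depth};
    }
};

struct TopicQosModel {
    DurabilityKind durability;
    ReliabilityKind reliability;
    HistoryModel history;
};

// Hypothetical shared helper: publisher and subscriber would both call this
// instead of assembling the "Command" topic QoS separately.
TopicQosModel command_topic_qos() {
    return {DurabilityKind::Persistent, ReliabilityKind::Reliable,
            HistoryModel::keep_all()};
}

// Returns the name of the first policy that differs between the two
// definitions, or an empty string when they match.
std::string first_unmatching_policy(const TopicQosModel& a,
                                    const TopicQosModel& b) {
    if (a.durability != b.durability) return "Durability";
    if (a.reliability != b.reliability) return "Reliability";
    if (a.history.kind != b.history.kind) return "History";
    if (a.history.kind == HistoryKind::KeepLast &&
        a.history.depth != b.history.depth) return "History";
    return "";
}
```

In the real applications the same idea would be a small shared header that returns a dds::topic::qos::TopicQos built with the `<<` operators used in the publisher and subscriber code, so that KeepAll() and KeepLast(1) cannot end up on opposite sides by accident.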


One more thing. I've just added support for a second topic, "UnitAddress", to the subscriber application, and now I don't receive the command sample. The code below works, but if I simply uncomment the commented-out code, everything stops working: I no longer receive the command sample from the publisher. It's very strange, since I have only added the same code for another topic! There are no warnings or errors in the log files.

#include <csignal>
#include <iostream>
#include <thread>
#include "Command_DCPS.hpp"
#include "UnitAddress_DCPS.hpp"

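struct ShutdownHandler
{
    // Written from a signal handler, so it must be volatile std::sig_atomic_t.
    static volatile std::sig_atomic_t stop;
    
    static void shutdown(int /*sig*/)
    {
        // Restore the default action so a second Ctrl+C terminates immediately.
        std::signal(SIGINT, SIG_DFL);
        stop = 1;
    }
};

volatile std::sig_atomic_t ShutdownHandler::stop = 0;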


int main(int argc, char** argv)
{
    std::signal(SIGINT, &ShutdownHandler::shutdown);
    std::signal(SIGTERM, &ShutdownHandler::shutdown);
    
    dds::core::cond::WaitSet ws;
    auto ds = dds::sub::status::DataState::new_data();
    
    int result = 0;
    try {
        
        dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

        dds::topic::qos::TopicQos topicQosCommand = dp.default_topic_qos()
                << dds::core::policy::Durability::Persistent()
                << dds::core::policy::Reliability::Reliable()
                << dds::core::policy::History::KeepAll();
        
//        dds::topic::qos::TopicQos topicQosUnit = dp.default_topic_qos()
//                << dds::core::policy::Durability::Persistent()
//                << dds::core::policy::Reliability::Reliable()
//                << dds::core::policy::History::KeepAll();
        
        dds::topic::Topic<Command> topicCommand(dp, "Command", topicQosCommand);
//        dds::topic::Topic<UnitAddress> topicUnitAddress(dp, "UnitAddress", topicQosUnit);
        
        dds::sub::qos::SubscriberQos subQos_Command = dp.default_subscriber_qos();
//        dds::sub::qos::SubscriberQos subQos_Unit = dp.default_subscriber_qos();
        dds::sub::Subscriber sub_command(dp, subQos_Command);        
//        dds::sub::Subscriber sub_unit(dp, subQos_Unit);        

        dds::sub::qos::DataReaderQos drqosCommand = topicCommand.qos();
//        dds::sub::qos::DataReaderQos drqosUnit = topicUnitAddress.qos();
        
        dds::sub::DataReader<Command> drCommand(sub_command, topicCommand, drqosCommand);
//        dds::sub::DataReader<UnitAddress> drUnit(sub_unit, topicUnitAddress, drqosUnit);
        
        
        drCommand.wait_for_historical_data(dds::core::Duration(4));
//        drUnit.wait_for_historical_data(dds::core::Duration(4));
        
        dds::sub::cond::ReadCondition rcCommand(drCommand, ds);
//        dds::sub::cond::ReadCondition rcUnit(drUnit, ds);

        // Attach the condition
        ws += rcCommand;
//        ws += rcUnit;
        
        while (!ShutdownHandler::stop) {
            
            try {
                auto condSeq = ws.wait(dds::core::Duration(1));
                
                for(auto& cond: condSeq) {

                    if (cond == rcCommand) {
                        std::cout << "Command rc triggered" << std::endl;
                        auto commandSamples = drCommand.select().state(ds).take();
                        if (0 == commandSamples.length()) {
                            std::cout << "Command sample empty" << std::endl;
                            
                        }
                        for (auto& commandSample : commandSamples) {
                            if (commandSample.info().valid()) {
                                std::cout << "Command sample is ok" << std::endl;
                            } else {
                                std::cout << "Command sample is not valid" << std::endl;
                            }
                        }
//                    } else if (cond == rcUnit) {
//                        std::cout << "Unit rc triggered" << std::endl;
//                        auto unitSamples = drUnit.select().state(ds).take();
//
//                        for (auto& unitSample : unitSamples) {
//                            if (unitSample.info().valid()) {
//                                std::cout << "Unit sample is ok" << std::endl;
//                            } else {
//                                std::cout << "Unit sample is not valid" << std::endl;
//                            }
//                        }
//                        
                    } else {
                        std::cout << "Unknown rc triggered" << std::endl;
                    }
                    
                }

            } catch(dds::core::TimeoutError& e) {
                (void)e;
            }
            
        }   
        
        std::cout << "Shutting down ... " << std::endl;
        
    } catch (const dds::core::Exception& e) {
        std::cerr << "ERROR: Exception: " << e.what() << std::endl;
        result = 1;
    }
    
    return result;
}
