OpenSplice DDS Forum
Is it possible to make persistent queue for the reader only?

kuolty   

Hi,

I have a data writer with TRANSIENT_LOCAL durability and a data reader with PERSISTENT durability and KEEP_ALL history. What I am trying to achieve is to use the DDS durability service to implement a queue of topic samples that must be stored persistently until they are taken from the reader's cache (the "queue" in my terms). So the writer can send data even if the data is considered volatile, but if the reader is alive and receives the data, it must store the data in its cache for postponed processing.

 

Writer:

        HelloWorldData::Msg msgInstance(1, "Hello World");

        dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

        dds::topic::qos::TopicQos topicQos
                = dp.default_topic_qos()
                << dds::core::policy::Durability::TransientLocal()
                << dds::core::policy::Reliability::Reliable();

        dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

        std::string name = "HelloWorldPartition";
        dds::pub::qos::PublisherQos pubQos
                = dp.default_publisher_qos()
                << dds::core::policy::Partition(name);
        dds::pub::Publisher pub(dp, pubQos);
        
        dds::pub::qos::DataWriterQos dwqos = topic.qos();
        dwqos << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();

        /** A dds::pub::DataWriter is created on the Publisher & Topic with the modified Qos. */
        dds::pub::DataWriter<HelloWorldData::Msg> dw(pub, topic, dwqos);
        
        dw << msgInstance;

        std::cout << "Data reader is started at this point" << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(10));

Reader:

    class HelloListener : public dds::sub::NoOpDataReaderListener<HelloWorldData::Msg>
    {
    public:
        
        void on_data_available(dds::sub::DataReader<HelloWorldData::Msg>& dr)
        {
            std::cout << "----------on_data_available-----------" << std::endl;
        }

        void on_liveliness_changed(dds::sub::DataReader<HelloWorldData::Msg>& dr, const dds::core::status::LivelinessChangedStatus& status)
        {
            std::cout << "!!! Liveliness Changed !!!" << std::endl;
        }
        
        
        virtual void on_sample_rejected(
            dds::sub::DataReader<HelloWorldData::Msg>& reader,
            const dds::core::status::SampleRejectedStatus& status)
        {
            std::cout << "----------on_sample_rejected-----------" << std::endl;
        }
        
        virtual void on_requested_deadline_missed(
            dds::sub::DataReader<HelloWorldData::Msg>& reader,
            const dds::core::status::RequestedDeadlineMissedStatus& status)
        {
            std::cout << "----------on_requested_deadline_missed-----------" << std::endl;
        }

        virtual void on_requested_incompatible_qos(
            dds::sub::DataReader<HelloWorldData::Msg>& reader,
            const dds::core::status::RequestedIncompatibleQosStatus& status)
        {
        }
        
        virtual void on_subscription_matched(
            dds::sub::DataReader<HelloWorldData::Msg>& reader,
            const dds::core::status::SubscriptionMatchedStatus& status)
        {
            std::cout << "----------on_subscription_matched-----------" << std::endl;
        }

        virtual void on_sample_lost(
            dds::sub::DataReader<HelloWorldData::Msg>& reader,
            const dds::core::status::SampleLostStatus& status)
        {
            std::cout << "----------on_sample_lost-----------" << std::endl;
        }
        
    };
    

    HelloListener listener;
    dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());
    
    dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
            << dds::core::policy::Reliability::Reliable()
            << dds::core::policy::Durability::Persistent()
            << dds::core::policy::History::KeepAll();
    
    dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);
    
    std::string name = "HelloWorldPartition";

    dds::sub::qos::SubscriberQos subQos= dp.default_subscriber_qos()
            << dds::core::policy::Partition(name);
    dds::sub::Subscriber sub(dp, subQos);

    dds::sub::qos::DataReaderQos drqos = topic.qos();

    dds::sub::DataReader<HelloWorldData::Msg> dr(sub, topic, drqos, &listener);

    //dds::core::Duration timeout(4, 10000000); 
    //dr.wait_for_historical_data(timeout);

    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

Some notes/suggestions:

  • If you want to decouple the lifecycle of the data from that of the application(s), you should exploit DDS's durability support for TRANSIENT and/or PERSISTENT data.
  • For that to work, you should set the topic-level durability QoS to either TRANSIENT or PERSISTENT (and configure a persistent-store location in the middleware configuration XML).
  • It is then up to a writer to write samples as either volatile or non-volatile data, but only non-volatile/durable (i.e. TRANSIENT or PERSISTENT) data will be preserved by the middleware's durability services (of which you need at least one instance).
  • Our community edition doesn't support federated deployment, where you could run a 'federation' somewhere (even without applications) whose durability service would maintain durable data and provide late-joiners with historical data. However, each 'single-process' application embeds a durability service out of the box, so you'd need to have/keep at least one application running.
  • Late-joining applications that create a TRANSIENT or PERSISTENT reader will then receive historical data from the middleware (don't forget to call wait_for_historical_data() to get synchronized with the delivery of that non-volatile data to your late-joining reader).
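The late-joiner point above could be sketched roughly as follows (not verified against a running system; the reader setup and `HelloWorldData::Msg` type follow the code earlier in this thread, the `message()` accessor follows the HelloWorld example IDL, and the 10-second timeout is an arbitrary choice):

```cpp
// Late-joining PERSISTENT reader: block until the durability service has
// delivered all historical samples, then take them from the reader's cache.
dds::sub::DataReader<HelloWorldData::Msg> dr(sub, topic, drqos);

// Wait at most 10 seconds for historical data to be delivered;
// throws dds::core::TimeoutError if delivery does not complete in time.
dr.wait_for_historical_data(dds::core::Duration(10, 0));

// Now it is safe to process: the cache contains the non-volatile samples
// written before this reader joined.
dds::sub::LoanedSamples<HelloWorldData::Msg> samples = dr.take();
for (const auto& sample : samples) {
    if (sample.info().valid()) {
        std::cout << sample.data().message() << std::endl;
    }
}
```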

Furthermore, I noticed that you didn't create a default writer-QoS object but instead used the topic QoS for your writer. That's not good practice, as the topic-level QoS doesn't provide all writer (or reader, for that matter) QoS policies. It's best to follow the pattern of all our examples: first create a set of default QoS's for your reader/writer, then copy the topic-level QoS's onto them (copy_from_topic_qos) to pick up those policies for which you've created (system-wide) values at topic level.

Also note that, as durability operates independently of readers/writers, you have to 'notify' the middleware of the required durability behavior by registering the non-volatile topic(s) with the correct QoS policies for DURABILITY_SERVICE (i.e. history kind, depth, resource limits and cleanup delay).
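Under those recommendations, the writer side could look roughly like this (a sketch only: the DurabilityService constructor argument order follows the ISO C++ DCPS PSM, the concrete limit and delay values are arbitrary, and the assignment-from-TopicQos idiom is assumed as the ISO C++ counterpart of the classic copy_from_topic_qos()):

```cpp
// Topic QoS: mark the data as PERSISTENT and register the required
// DURABILITY_SERVICE behavior (cleanup delay, history kind/depth,
// resource limits) so the durability service knows how to maintain it.
dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
        << dds::core::policy::Reliability::Reliable()
        << dds::core::policy::Durability::Persistent()
        << dds::core::policy::DurabilityService(
               dds::core::Duration(3600, 0),              // service_cleanup_delay
               dds::core::policy::HistoryKind::KEEP_ALL,  // history kind
               -1,                                        // history depth (ignored for KEEP_ALL)
               dds::core::LENGTH_UNLIMITED,               // max_samples
               dds::core::LENGTH_UNLIMITED,               // max_instances
               dds::core::LENGTH_UNLIMITED);              // max_samples_per_instance

dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

// Writer QoS: start from the Publisher's defaults, then overlay the
// topic-level policies rather than using the topic QoS object directly.
dds::pub::qos::DataWriterQos dwqos = pub.default_datawriter_qos();
dwqos = topic.qos();  // copies only the policies that exist at topic level
dwqos << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();

dds::pub::DataWriter<HelloWorldData::Msg> dw(pub, topic, dwqos);
```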

kuolty   

"... best to follow the pattern of all our examples where you first create a set of default QoS's for your reader/writer, then copy the topic-level QoS's ..."

I'm curious which examples you are talking about, because my code is based on examples/dcps/HelloWorld/isocpp/implementation.cpp:

        /** A dds::domain::DomainParticipant is created for the default domain. */
        dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

        /** The Durability::Transient policy is specified as a dds::topic::qos::TopicQos
         * so that even if the subscriber does not join until after the sample is written
         * then the DDS will still retain the sample for it. The Reliability::Reliable
         * policy is also specified to guarantee delivery. */
        dds::topic::qos::TopicQos topicQos
             = dp.default_topic_qos()
                << dds::core::policy::Durability::Transient()
                << dds::core::policy::Reliability::Reliable();

        /** A dds::topic::Topic is created for our sample type on the domain participant. */
        dds::topic::Topic<HelloWorldData::Msg> topic(dp, "HelloWorldData_Msg", topicQos);

        /** A dds::pub::Publisher is created on the domain participant. */
        std::string name = "HelloWorld example";
        dds::pub::qos::PublisherQos pubQos
            = dp.default_publisher_qos()
                << dds::core::policy::Partition(name);
        dds::pub::Publisher pub(dp, pubQos);

        /** The dds::pub::qos::DataWriterQos is derived from the topic qos and the
         * WriterDataLifecycle::ManuallyDisposeUnregisteredInstances policy is
         * specified as an addition. This is so the publisher can optionally be run (and
         * exit) before the subscriber. It prevents the middleware default 'clean up' of
         * the topic instance after the writer deletion, this deletion implicitly performs
         * DataWriter::unregister_instance */
        dds::pub::qos::DataWriterQos dwqos = topic.qos();
        dwqos << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();

        /** A dds::pub::DataWriter is created on the Publisher & Topic with the modified Qos. */
        dds::pub::DataWriter<HelloWorldData::Msg> dw(pub, topic, dwqos);

