OpenSplice DDS Forum
kuolty

I receive only the latest published sample with persistent durability instead of all

Recommended Posts

I want a late-joining subscriber to read all previously written samples.

 

My steps:

  1. run publisher
  2. check persistent storage file to ensure that data was really saved
  3. run subscriber

I don't know why, but I receive only the latest published sample, even though I specified KEEP_ALL history.

 

Writer.cpp

        HelloWorldData::Msg msgInstance1(1, "Hello World 1");
        HelloWorldData::Msg msgInstance2(1, "Hello World 2");

        dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

        dds::topic::qos::TopicQos topicQos
                = dp.default_topic_qos()
                << dds::core::policy::Durability::Persistent()
                << dds::core::policy::Reliability::Reliable()
                << dds::core::policy::History::KeepAll();

        dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

        std::string name = "HelloWorldPartition";
        dds::pub::qos::PublisherQos pubQos
                = dp.default_publisher_qos()
                << dds::core::policy::Partition(name);
        dds::pub::Publisher pub(dp, pubQos);

        dds::pub::qos::DataWriterQos dwqos = topic.qos();

        dwqos << dds::core::policy::WriterDataLifecycle::ManuallyDisposeUnregisteredInstances();

        dds::pub::DataWriter<HelloWorldData::Msg> dw(pub, topic, dwqos);
        
        dw << msgInstance1;
        dw << msgInstance2;

        // wait until the heartbeat times out and the published instances are stored in persistent storage
        std::this_thread::sleep_for(std::chrono::seconds(10));

Reader.cpp

        auto ds = dds::sub::status::DataState::new_data();
        ds << dds::sub::status::SampleState::not_read();

        dds::domain::DomainParticipant dp(org::opensplice::domain::default_id());

        dds::topic::qos::TopicQos topicQos = dp.default_topic_qos()
                << dds::core::policy::Reliability::Reliable()
                << dds::core::policy::Durability::Persistent()
                << dds::core::policy::History::KeepAll();
        
        dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

        std::string name = "HelloWorldPartition";
        dds::sub::qos::SubscriberQos subQos
            = dp.default_subscriber_qos()
                << dds::core::policy::Partition(name);
        dds::sub::Subscriber sub(dp, subQos);

        dds::sub::qos::DataReaderQos drqos = topic.qos();

        dds::sub::DataReader<HelloWorldData::Msg> dr(sub, topic, drqos);
        
        //dr.wait_for_historical_data(dds::core::Duration(4));
        
        dds::sub::cond::ReadCondition rc(dr, ds);

        // Attach the condition to a waitset
        dds::core::cond::WaitSet ws;
        ws += rc;

        std::atomic<bool> stop{false}; // set elsewhere (e.g. by a signal handler) to exit the loop

        while (!stop) {
            
            try {
                ws.wait(dds::core::Duration(1));
                
                auto samples = dr.select().state(ds).take();

                std::for_each(samples.begin(), samples.end(),[](const dds::sub::Sample<HelloWorldData::Msg>& sample) {
                            
                    if (sample.info().valid()) {
                        std::cout << sample.data().message() << std::endl;
                    } else {
                        std::cout << "Sample is not valid" << std::endl;
                    }
                });


            } catch(dds::core::TimeoutError& e) {
                (void)e;
            }
            
        }


The problem was with the keylist in the IDL file.

 

This is how my IDL file originally looked:

module HelloWorldData
{
   struct Msg
   {
      /** User ID */
      long userID;
      /**  message */
      string message;
   };
   #pragma keylist Msg userID
};


And that's how it must look:

module HelloWorldData
{
   struct Msg
   {
      /** User ID */
      long userID;
      /**  message */
      string message;
   };
   #pragma keylist Msg userID message
};


Hi,

 

I'd have guessed that the 'real' issue is that you wrote both messages with the same userID value '1'.

Your solution of adding the payload itself as an extra key might 'bypass' that error, but it's not a very efficient solution :)

 

-Hans


Hi

 

Thanks for your comment. I understand the drawbacks of the chosen fix, but I don't know how to receive the full history of changes (not only the latest value) for userID #1 without adding the payload as an extra key. Sure, I could add a timestamp field, but that would duplicate the DDS timestamp, and I'm not sure it would be a better solution.


Ok, I think I understand the source of the confusion: QoS policies defined at topic level do not automatically get 'transferred' to the policies of readers/writers. They serve as 'defaults', but they have to be explicitly copied (via the copy-from-topic-qos APIs). The reason is that the people who come up with the data model, i.e. the topics, are often also the domain experts who can 'reason' about the QoS policies that define global behavior, such as durability, reliability, urgency and importance. So if they define the proper topic-level QoS policies, the individual application programmers who eventually write/read samples of those topics can 'inherit' that knowledge by reusing the policies defined at topic level.
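A minimal sketch of that copy-from-topic-QoS pattern in the ISO C++ API, reusing the topic, subscriber and types from the snippets above (the explicit History override is an illustration of stating the policy locally, not something required by the API):

```cpp
// Topic-level policies act as defaults; a reader only picks them up when
// they are explicitly copied into its QoS, e.g. by converting the TopicQos:
dds::sub::qos::DataReaderQos drqos = topic.qos();   // copy topic-level policies
drqos << dds::core::policy::History::KeepAll();     // state the history explicitly

dds::sub::DataReader<HelloWorldData::Msg> dr(sub, topic, drqos);
```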

 

In your case, although you have specified a KEEP_ALL history policy at topic level, that wasn't 'effectuated' for your reader, which is still using the default KEEP_LAST policy with a depth of 1.

 

Since a reader's history is more about local behavior than system-wide behavior, it usually isn't specified at topic level, unless your topic is non-volatile, i.e. TRANSIENT or PERSISTENT, in which case you MUST specify the DURABILITY_SERVICE QoS policy at topic level (i.e. history kind, history depth and resource limits).

 

So for your example there are two things to do:

1. specify the DURABILITY_SERVICE policies for your topic so that the middleware knows how to 'retain' those non-volatile samples

2. specify a (matching) history policy for your reader so that sufficient historical samples are maintained in your reader's history cache
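A hedged sketch of those two steps in the ISO C++ API; the DurabilityService arguments below (cleanup delay, history kind/depth, resource limits) are illustrative assumptions for this example, not recommendations:

```cpp
// 1. Topic: tell the durability service how to retain non-volatile samples.
dds::topic::qos::TopicQos topicQos
        = dp.default_topic_qos()
        << dds::core::policy::Durability::Persistent()
        << dds::core::policy::Reliability::Reliable()
        << dds::core::policy::DurabilityService(
                dds::core::Duration(0, 0),                  // service_cleanup_delay
                dds::core::policy::HistoryKind::KEEP_LAST,  // history kind
                100,                                        // history depth (illustrative)
                dds::core::LENGTH_UNLIMITED,                // max_samples
                dds::core::LENGTH_UNLIMITED,                // max_instances
                dds::core::LENGTH_UNLIMITED);               // max_samples_per_instance

dds::topic::Topic<HelloWorldData::Msg> topic(dp, "THelloWorldData", topicQos);

// 2. Reader: give the reader's own cache a matching history, so the
//    historical samples delivered by the durability service are kept.
dds::sub::qos::DataReaderQos drqos = topic.qos();
drqos << dds::core::policy::History::KeepLast(100);

dds::sub::DataReader<HelloWorldData::Msg> dr(sub, topic, drqos);
dr.wait_for_historical_data(dds::core::Duration(10, 0));
```

Waiting for historical data before the first read avoids racing the durability service's alignment.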

 

Typically I'd suggest a KEEP_LAST policy with a sufficient depth rather than a KEEP_ALL policy, as KEEP_ALL will cause end-to-end flow control when resource limits are reached (or, if you don't specify resource limits, you can run out of memory if you're not careful with your instance management and/or disposing of stale data).

 

Hope this helps.


It's still not clear to me how I can receive all samples, regardless of the durability service settings, without using the payload in the keylist.

