OpenSplice DDS Forum
saso.skube@cosylab.com

[java][v6.7] Subscriber memory leak

Hi!

I have been testing the basic functionality of the new DDS version (VortexOpenSplice-6.7.170523OSS-HDE-x86_64.linux-gcc5.4.0-glibc2.23-installer.tar) in Java (dcpssaj5.jar). I noticed a memory leak on the subscriber side, where the heap usage attributed to the following threads keeps increasing:

- RMI TCP Connection

- JMX server connection timeout

- ListenerEventThread

I have no idea about the other two, but ListenerEventThread increases its heap usage each time the take() method is called in the onDataAvailable callback. The increase is about the same as the size of the received message. The garbage collector doesn't clean anything up, and based on the QoS settings there shouldn't be more than 10 samples in the data reader at any point.

The leak seems to be somewhere in the Java library, since the C++ code doesn't have it, and I can see Java objects increasing in size. I'm guessing that JNI is still holding references somewhere...

Now the question: is there really a memory leak, or am I missing something? Here's the subscriber code:

package test.opensplice.history;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

import org.omg.dds.core.ServiceEnvironment;
import org.omg.dds.core.event.DataAvailableEvent;
import org.omg.dds.core.policy.Durability;
import org.omg.dds.core.policy.DurabilityService;
import org.omg.dds.core.policy.History;
import org.omg.dds.core.policy.PolicyFactory;
import org.omg.dds.core.policy.Reliability;
import org.omg.dds.core.policy.History.Kind;
import org.omg.dds.core.status.Status;
import org.omg.dds.domain.DomainParticipant;
import org.omg.dds.domain.DomainParticipantFactory;
import org.omg.dds.sub.DataReader;
import org.omg.dds.sub.DataReaderAdapter;
import org.omg.dds.sub.Sample;
import org.omg.dds.sub.SampleState;
import org.omg.dds.sub.Subscriber;
import org.omg.dds.sub.Subscriber.DataState;
import org.omg.dds.sub.ViewState;
import org.omg.dds.topic.Topic;

import test.opensplice.history.NewsData.LargeData;

public class NewsSub extends DataReaderAdapter<LargeData> {
	private ServiceEnvironment env = null;
	DomainParticipant participant = null;
	private DataReader<LargeData> reader = null;
	
	public NewsSub(String topicName) {
		System.setProperty(
                ServiceEnvironment.IMPLEMENTATION_CLASS_NAME_PROPERTY,
                "org.opensplice.dds.core.OsplServiceEnvironment");
    	
    	env = ServiceEnvironment.createInstance(NewsPub.class.getClassLoader());
    	participant = DomainParticipantFactory.getInstance(env).createParticipant();
    	Subscriber subscriber = participant.createSubscriber();
    	
    	
    	// Set up QOS
    	Reliability rel = PolicyFactory.getPolicyFactory(env).Reliability().withReliable();
		Durability dur = PolicyFactory.getPolicyFactory(env).Durability().withTransientLocal();
		History hist = PolicyFactory.getPolicyFactory(env).History().withKeepLast(10);
		DurabilityService durServ = PolicyFactory.getPolicyFactory(env).DurabilityService().withHistoryKind(Kind.KEEP_LAST).withHistoryDepth(10);
    	
    	
    	// Set up topic
    	Collection<Class<? extends Status>> status = new HashSet<Class<? extends Status>>();
    	Topic<LargeData> topic = participant.createTopic(topicName,  LargeData.class, participant.getDefaultTopicQos().withPolicies(rel, dur, hist, durServ), null, status);
    	
    	// Set up data reader
    	reader = subscriber.createDataReader(topic, subscriber.copyFromTopicQos(subscriber.getDefaultDataReaderQos(), topic.getQos()));
    	reader.select().dataState(subscriber.createDataState().withAnySampleState().withAnyViewState().withAnyInstanceState());
    	reader.setListener(this);
	}
	
	public void run(long sendPeriodMillis) {
		while (true) {
            try {
                Thread.sleep(sendPeriodMillis);
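            } catch (InterruptedException e) {
            	// Interrupted: release the DDS resources and stop waiting,
            	// instead of looping forever on a closed participant.
            	participant.close();
            	return;
            }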
        }
	}
	
	@Override
    public void onDataAvailable(DataAvailableEvent<LargeData> status) {
        try {
			Iterator<Sample<LargeData>> samples = status.getSource().take();
			while (samples.hasNext()) {
			    Sample<LargeData> sample = samples.next();
			    LargeData message = sample.getData();
			    if (message != null) { //Check if the sample is valid.
			         System.out.println(String.format("Message %d received states: Sample=%d, View=%d, Instance=%d",
			        		 message.element[0], 
			        		 sample.getSampleState().value, 
			        		 sample.getViewState().value, 
			        		 sample.getInstanceState().value));
			    }
			}
			
        	/*
            List<Sample<LargeData>> samples = new ArrayList<Sample<LargeData>>();

        	status.getSource().take(samples);
            for (Sample<LargeData> sample : samples) {
            	LargeData message = sample.getData();
                if (message != null) { //Check if the sample is valid.
                     System.out.println("Message " + message.element[0] + " received");
                     //System.out.println("Message: " + message.ID + ": " + message.title + " -> " + message.message);
                }
            }*/
        } catch(Exception e) {
            e.printStackTrace();
        }
        
        
    }
	
	public static void main(String[] args) {
		NewsSub app = new NewsSub("LargeTopic");
		app.run(1000);

	}
}

Note that even with just status.getSource().take() in the onDataAvailable() callback, the problem persists.

Best regards,

Sašo


Hi Saso, 

I think you need to call the return function here. Not doing so is probably the reason for your memory leak. Check the documentation for return.

Cheers,

Akhil 


Hi!

I thought there was something about returning the loan, but I couldn't find the function for returning it.

Here is what I did wrong:

Instead of declaring the iterator as java.util.Iterator

Iterator<Sample<LargeData>> samples = status.getSource().take();

I should have declared it as org.omg.dds.sub.Sample.Iterator

Sample.Iterator<LargeData> samples = status.getSource().take();

Then the close() function becomes available on the samples iterator.

Here is the working code for the onDataAvailable function:

Sample.Iterator<LargeData> samples = status.getSource().take();
try {
    // Process each sample and print its first element and its states.
    while (samples.hasNext()) {
        Sample<LargeData> sample = samples.next();
        LargeData message = sample.getData();
        if (message != null) { // Check if the sample is valid.
            System.out.println(String.format("Message %d received states: Sample=%d, View=%d, Instance=%d",
                    message.element[0],
                    sample.getSampleState().value,
                    sample.getViewState().value,
                    sample.getInstanceState().value));
        }
    }
} finally {
    // Return the loan even if processing throws.
    samples.close();
}
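
For anyone who hits the same thing, here is a minimal, self-contained sketch of why the declared type matters. It does not use the OpenSplice API at all: FakeReader and LoanedIterator are hypothetical stand-ins invented for this illustration, and the loan counter is only an assumed model of what the reader keeps alive until close() is called. The point is that a variable declared as java.util.Iterator cannot see close(), so the loan is never returned.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class LoanDemo {
    /** Hypothetical stand-in for a data reader that lends samples out. */
    static class FakeReader {
        private int outstandingLoans = 0;

        LoanedIterator take(List<String> data) {
            outstandingLoans++; // the loan stays open until the iterator is closed
            return new LoanedIterator(this, data.iterator());
        }

        int outstandingLoans() {
            return outstandingLoans;
        }
    }

    /** Hypothetical stand-in for an iterator that is also Closeable. */
    static class LoanedIterator implements Iterator<String>, Closeable {
        private final FakeReader owner;
        private final Iterator<String> inner;
        private boolean closed = false;

        LoanedIterator(FakeReader owner, Iterator<String> inner) {
            this.owner = owner;
            this.inner = inner;
        }

        @Override public boolean hasNext() { return inner.hasNext(); }
        @Override public String next() { return inner.next(); }

        @Override public void close() {
            if (!closed) { // closing twice must not return the loan twice
                closed = true;
                owner.outstandingLoans--;
            }
        }
    }

    /** Declared as java.util.Iterator: close() is not visible, so the loan leaks. */
    static int leakyTake(FakeReader reader) {
        Iterator<String> samples = reader.take(Arrays.asList("a", "b"));
        while (samples.hasNext()) {
            samples.next();
        }
        // samples.close() would not compile here: java.util.Iterator has no close().
        return reader.outstandingLoans();
    }

    /** Declared with the closeable type: try-with-resources returns the loan. */
    static int closingTake(FakeReader reader) {
        try (LoanedIterator samples = reader.take(Arrays.asList("a", "b"))) {
            while (samples.hasNext()) {
                samples.next();
            }
        }
        return reader.outstandingLoans();
    }

    public static void main(String[] args) {
        System.out.println("leaky take, outstanding loans:   " + leakyTake(new FakeReader()));
        System.out.println("closing take, outstanding loans: " + closingTake(new FakeReader()));
    }
}
```

In this sketch the leaky variant leaves one loan outstanding per call, while the closing variant leaves none. Whether try-with-resources can be used directly with the real Sample.Iterator depends on the exception its close() declares, so treat that part as an assumption and fall back to try/finally if needed.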
