Leak while using multiprocessing list with PyTango 9.3.1

Hi Tangoers! :)

I've been facing quite an annoying problem with my device server. Problem generally known as a memory leak smile. After a long time I manage to separate a few lines out of my code which were responsible of memory consumption. To my surprise, any call to ProxyList object was causing the device server to consume more and more memory.

I attach the Device Server class(DeviceTest.py), where comments point to each line where the ProxyList object was used.

The leak can be observed while using such an example script, which just reads attributes in a loop. (I also tested the behavior when using commands, but it is same- leaking).

import time
from PyTango import DeviceProxy

dev = DeviceProxy("test/devicetest/test")

while True:
test = dev.test_attr_2
print test,'.'
time.sleep(0.01)


My next step was to test the same approach while using simple python script (without Tango), but memory usage didn't grow. (script.py)

I used a memory_profiler module to trace the leak and currently working on Python2.7 with PyTango 9.3.1 on a Windows10 machine.

Any help would be appreciated. Maybe I'm missing something here…

FYI, the reason of using multiprocessing was to separate device communication and device memory read to another process. Device Server just shares variables with the second process. For example using
self.data_counter = multiprocessing.Value('i', 0)
and returning its value
return self.data_counter.value
in attributes doesn't cause a memory leak.

So in my case I created
self.current_memory = self.manager.list()
passed this to a process, started it and assigned values to this list within a process, so that attribute could return
[self.current_memory[registry], self.get_dataCounter()]


As tutorials point, this seems to be a proper list usage while using multiprocessing and manager caring of memory sharing.

What is more, the reason why I use ListProxy is that it can increase its size after creation - in contrary to
multiprocessing.Array("i", [0, 1])
where one has to specify its size on the very beginning. In my case user can decide in runtime which registers are read in the second process.

Anyway, I would be very grateful for helping me with a solution to my problem.
Regards,
Jagoda
Edited 4 years ago
Hi Jagoda,

I don't know the implementation details behind multiprocessing, but I suspect that the issue is because you are accessing multiprocessing list proxy from multiple threads (each attribute access is handled by a thread allocated by omniORB).

Following change to your example shows memory leak on my machine:

def access(self):
print self.my_list.__len__()

@profile
def print_list(self):
# print self.my_list.__len__()
t = threading.Thread(target=self.access)
t.start()
t.join()


Also, following change seems to fix memory leak in device server:

@profile
def init_device(self):
pid = os.getpid()
print("DS pid is", pid)

# —————————————-
self.manager = multiprocessing.Manager()
self.my_list = self.manager.list(range(24))

# use dedicated thread to access multiprocessing resource
self.req = queue.Queue()
self.resp = queue.Queue()

def access():
while True:
_ = self.req.get()
self.resp.put(self.my_list[:])

self.thread = threading.Thread(target=access)
self.thread.daemon = True
self.thread.start()

@profile
def get_test_attr_2(self):
# return self.my_list[:] # leaking cause
self.req.put(None)
return self.resp.get()


May be relevant:
https://stackoverflow.com/questions/47672076/sharing-manager-list-with-threads-and-process-creates-seperate-copies
Michal Liszcz
Yes, indeed creating a separate thread for accessing the data solves the issue if there is only one list in the thread. Thank you! I am really grateful for this solution.

But here comes my next question: Why cannot I get it to work when more queues' req and put are in this access thread?
When I put in the thread two lists (controlled by another queue objects as you showed in the example), it gets stucked on attribute read where its value is returned as follows:
self.req_d_memory.put(None)
return [self.resp_d_memory.get()[value], self.get_dataCounter()]

My access method looked like this:
def access():
while True:
_ = self.req_d_memory.get()
self.resp_d_memory.put(self.current_memory_d[:])

_ = self.req_m_memory.get()
self.resp_m_memory.put(self.current_memory_m[:])

What I did to control two lists:
def access():
while True:
_ = self.req_d_memory.get()
self.resp_d_memory.put(self.current_memory_d[:])
#time.sleep(0.001)
def access_2():
while True:
_ = self.req_m_memory.get()
self.resp_m_memory.put(self.current_memory_m[:])


self.thread = threading.Thread(target=access)
self.thread.daemon = True
self.thread.start()
self.thread_2 = threading.Thread(target=access_2)
self.thread_2.daemon = True
self.thread_2.start()


Is this the right approach, what do you think?
Regards,
Jagoda
Edited 4 years ago
Hi Jagoda,

I've used thread and queues just as a simple example how to serialize access to the shared resource. Spawning a thread and two queues for each resource does not seem very robust. Instead, you may want to look into one of the following solutions:

1. Create a ThreadPoolExecutor with just one worker thread and perform the reads using executor:

# in init_device (remember to close the executor in delete_device)
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# later
all_items = slice(None, None, None) # equivalent to x[:]
value = self.executor.submit(self.current_memory_d.__getitem__, all_items).result()

https://docs.python.org/3/library/concurrent.futures.html
Backport for Py2: https://pypi.org/project/futures/

2. Use actor pattern to implement the business logic of your device and from tango callbacks (getters/setters/commands) just perform blocking calls to the actor:

# in init_device (remember to stop the actor in delete_device)
self.actor = MyDeviceActor.start()
self.proxy = self.actor.proxy()
# later
value = self.proxy.get_current_memory_d().get()

Example library that implements actor model: https://www.pykka.org/en/latest/api/proxies/

3. Upgrade to Python 3 and use Asyncio green mode. Tango callbacks will run as coroutines invoked by the event loop (thus, from single thread).
https://pytango.readthedocs.io/en/stable/green_modes/green.html#id2

4. Stick to Python 2 and use Gevent green mode (But I have zero experience with Gevent and I don't know whether this will work or not)
https://pytango.readthedocs.io/en/stable/green_modes/green.html#id1
Michal Liszcz
 
Register or login to create to post a reply.