I am writing a simple producer/consumer program.

import asyncio
import zmq

@asyncio.coroutine
def receive_data(future,s):
        print("begin to recv sth from.....socket")
        my_data = s.recv()
        future.set_result(my_data)

@asyncio.coroutine
def producer(loop,q,s):
        while True:
                future = asyncio.Future()
                yield from receive_data(future,s)
                data = str(future.result())
                yield from q.put(data)
@asyncio.coroutine
def consumer(loop,q):
       while True:
          a = yield from q.get()
          print("i am get..."+str(a)+"..."+str(type(a)))  
loop = asyncio.get_event_loop()

c = zmq.Context()
s = c.socket(zmq.REP)
s.bind('tcp://127.0.0.1:5515')

q = asyncio.Queue()
tasks=[asyncio.Task(producer(loop,q,s)),asyncio.Task(consumer(loop,q))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
s.close()

It appears the consumer has no chance to execute.

The socket receives data every 500 ms, so I expected that when the yield from on receive_data suspends the producer coroutine, the consumer coroutine would get a chance to run and print its info.

What could explain this?

3 Answers

s.recv() is a blocking call, so receive_data hangs until a new ZMQ message arrives.

That blocks the event loop, so the consumer has no chance to run.
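
The effect can be reproduced without ZMQ at all. The following is a minimal sketch, not the asker's program: time.sleep() stands in for the blocking s.recv(), asyncio.sleep() stands in for a receive that actually suspends, and the helper names (blocking_wait, cooperative_wait, run) are made up for illustration.

```python
import asyncio
import time


async def producer(q, log, wait, n=3):
    for i in range(n):
        await wait()              # stand-in for receiving one message
        log.append(("put", i))
        await q.put(i)            # never suspends on an unbounded queue
    await q.put(None)             # sentinel: tell the consumer to stop


async def consumer(q, log):
    while True:
        item = await q.get()
        if item is None:
            break
        log.append(("got", item))


async def blocking_wait():
    time.sleep(0.01)              # blocks the whole event loop, like s.recv()


async def cooperative_wait():
    await asyncio.sleep(0.01)     # suspends only the calling coroutine


async def run(wait):
    q, log = asyncio.Queue(), []
    await asyncio.gather(producer(q, log, wait), consumer(q, log))
    return log


blocking_log = asyncio.run(run(blocking_wait))
cooperative_log = asyncio.run(run(cooperative_wait))
print(blocking_log)
print(cooperative_log)
```

With the blocking stand-in, every "put" is logged before the first "got", because the producer never hands control back to the loop. With the cooperative stand-in the two coroutines alternate. In the question's program the producer loops forever, so the consumer never gets its turn.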

You can pass the zmq.NOBLOCK flag to .recv (it then raises zmq.Again when no message is waiting) and call asyncio.sleep(0) when no data is available, which gives the event loop a chance to run other ready tasks.
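
A rough sketch of that polling approach, assuming pyzmq is installed. The PUSH/PULL pair over inproc://demo, the poll_interval parameter, and the sender/receiver helpers are assumptions made only to keep the example self-contained; they are not part of the question's setup, which uses a REP socket on TCP.

```python
import asyncio
import zmq


async def receive_data(s, poll_interval=0.01):
    """Poll a regular zmq socket without stalling the event loop."""
    while True:
        try:
            # With NOBLOCK, recv returns at once or raises zmq.Again.
            return s.recv(zmq.NOBLOCK)
        except zmq.Again:
            # Nothing queued yet: yield to the loop before retrying.
            # asyncio.sleep(0) also yields, but it keeps the CPU spinning.
            await asyncio.sleep(poll_interval)


async def main():
    ctx = zmq.Context()
    pull = ctx.socket(zmq.PULL)       # hypothetical receiving side
    pull.bind("inproc://demo")
    push = ctx.socket(zmq.PUSH)       # hypothetical sending side
    push.connect("inproc://demo")

    async def sender():
        for i in range(3):
            await asyncio.sleep(0.05)
            push.send(b"msg %d" % i)

    async def receiver():
        return [await receive_data(pull) for _ in range(3)]

    _, received = await asyncio.gather(sender(), receiver())
    push.close()
    pull.close()
    ctx.term()
    return received


received = asyncio.run(main())
print(received)
```

The sender only makes progress because receive_data yields between polls. Replacing the try/except with a plain s.recv() would stall the loop before the first message is ever sent.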

Or just use the aiozmq library :)

1 Comment

Or you can use ROUTER/DEALER instead, as it is a kind of asynchronous request-reply pattern. See the request-reply recap documentation.

You are mixing synchronous and asynchronous calls, so the result will be synchronous. If you want to keep using asyncio, you should create an asynchronous context with c = zmq.asyncio.Context() and use a ROUTER socket, s = c.socket(zmq.ROUTER). Then, following asyncio syntax, you should yield from recv_multipart(), so your my_data = s.recv() becomes my_data = yield from s.recv_multipart().
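
A small sketch of this idea, written with async/await rather than yield from and assuming a pyzmq version whose zmq.asyncio sockets work on the default asyncio loop. The DEALER client and the inproc://router-demo endpoint are hypothetical, added only so the example can run on its own.

```python
import asyncio
import zmq
import zmq.asyncio


async def producer(q, router, n):
    for _ in range(n):
        # Awaiting suspends only this coroutine; the loop stays free.
        frames = await router.recv_multipart()
        # A ROUTER socket prepends the sender identity; the payload is last.
        await q.put(frames[-1])


async def consumer(q, n):
    return [await q.get() for _ in range(n)]


async def main():
    ctx = zmq.asyncio.Context()
    router = ctx.socket(zmq.ROUTER)
    router.bind("inproc://router-demo")
    dealer = ctx.socket(zmq.DEALER)   # hypothetical client side
    dealer.connect("inproc://router-demo")

    q = asyncio.Queue()
    for i in range(3):
        await dealer.send(b"msg %d" % i)

    _, items = await asyncio.gather(producer(q, router, 3), consumer(q, 3))
    dealer.close()
    router.close()
    ctx.term()
    return items


items = asyncio.run(main())
print(items)
```

Because recv_multipart() on a zmq.asyncio socket returns an awaitable, the consumer can run whenever the producer is waiting for the next message.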

Here's a guideline of what needs to happen:

  • you should use Context and ZMQEventLoop from zmq.asyncio
  • you should make asyncio use the zmq loop by calling set_event_loop()
  • for testing purposes only, I have switched to a ROUTER_RAW socket

Working example:

import asyncio
import zmq
from zmq.asyncio import Context, ZMQEventLoop

async def receive_data(s):
        data = await s.recv()
        print('receive_data', data)
        return data

async def producer(q, s):
        while True:
                data = await receive_data(s)
                await q.put(data)

async def consumer(q):
       while True:
          a = await q.get()
          print('i got... {} ... {}'.format(a, type(a)))  

loop = ZMQEventLoop()
asyncio.set_event_loop(loop)

c = Context()
s = c.socket(zmq.ROUTER)
s.setsockopt(zmq.ROUTER_RAW, 1)
s.bind('tcp://127.0.0.1:5515')

q = asyncio.Queue()
tasks=[producer(q, s), consumer(q)]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
s.close()

You can test it out with ROUTER_RAW using telnet:

$ telnet localhost 5515
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
test
abcd1234
^]
telnet> Connection closed.
$ 

The response from the app for this would be:

receive_data b'\x00\xa3\x8e\x1f)'
receive_data b''
i got... b'\x00\xa3\x8e\x1f)' ... <class 'bytes'>
i got... b'' ... <class 'bytes'>
receive_data b'\x00\xa3\x8e\x1f)'
receive_data b'test\r\n'
i got... b'\x00\xa3\x8e\x1f)' ... <class 'bytes'>
i got... b'test\r\n' ... <class 'bytes'>
receive_data b'\x00\xa3\x8e\x1f)'
receive_data b'abcd1234\r\n'
i got... b'\x00\xa3\x8e\x1f)' ... <class 'bytes'>
i got... b'abcd1234\r\n' ... <class 'bytes'>
receive_data b'\x00\xa3\x8e\x1f)'
receive_data b''
i got... b'\x00\xa3\x8e\x1f)' ... <class 'bytes'>
i got... b'' ... <class 'bytes'>
