1

I am trying to obtain data from a redis channel by using a subscription on my client application. I am using python with asyncio and aioredis for this purpose.

I would like to use my subscription to have a variable of my main application updated when this one changes on the server, but I cannot manage to pass the data received from the subscription to my main thread.

According to aioredis website, I implemented my Subscription with:

sub = await aioredis.create_redis(
     'redis://localhost')

ch1 = await sub.subscribe('channel:1')
assert isinstance(ch1, aioredis.Channel)

async def async_reader(channel, globarVar):
    while await channel.wait_message():
        msg = await channel.get(encoding='utf-8')
        # ... process message ...
        globarVar = float(msg)
        print("message in {}: {}".format(channel.name, msg))

tsk1 = asyncio.ensure_future(async_reader(ch1, upToDateValue))

But I cannot get to update the global variable, I guess python pass just the current value as argument (which I expected to, but wanted to be sure).

Is there any viable option to get data out of a subscription? or to pass a reference to a shared variable or queue I could use?

1 Answer 1

1

You should redesign your code so you don't need a global variable. All of your processing should occur when receiving the message. However to modify a global variable you need to declare it in the function with the global keyword. You don't pass global variables around - you just use them.

Sub:

import aioredis
import asyncio
import json

gvar = 2

# Do everything you need here or call another function
# based on the message.  Don't use a global variable.
async def process_message(msg):
  global gvar
  gvar = msg

async def async_reader(channel):
  while await channel.wait_message():
    j = await channel.get(encoding='utf-8')
    msg = json.loads(j)
    if msg == "stop":
      break
    print(gvar)
    await process_message(msg)
    print(gvar)

async def run(loop):
  sub = await aioredis.create_redis('redis://localhost')
  res = await sub.subscribe('channel:1')
  ch1 = res[0]
  assert isinstance(ch1, aioredis.Channel)

  await async_reader(ch1)

  await sub.unsubscribe('channel:1')
  sub.close()

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

publisher:

import asyncio
import aioredis

async def main():
    pub = await aioredis.create_redis('redis://localhost')

    res = await pub.publish_json('channel:1', ["Hello", "world"])
    await asyncio.sleep(1)
    res = await pub.publish_json('channel:1', "stop")

    pub.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
Sign up to request clarification or add additional context in comments.

1 Comment

Thanks you MarkReedZ, very good example and explanation. I would like to mention as well for those who might try your code, please be sure you have installed tornado==4.5.3 and not a newer version of it, or else you may run into an error: "asyncio: RuntimeError this event loop is already running"

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.