1

I'm trying to read messages from Azure ServiceBus Topics using async/await and then forward the content to another application via HTTP. My code is simple:

import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio.async_client import ServiceBusService

bus_service = ServiceBusService(service_namespace=..., shared_access_key_name=..., shared_access_key_value=...)

async def watch(topic_name, subscription_name):
    print('{} started'.format(topic_name))

    message = bus_service.receive_subscription_message(topic_name, subscription_name, peek_lock=False, timeout=1)

    if message.body is not None:
        async with ClientSession() as session:
            await session.post('ip:port/endpoint',
                               headers={'Content-type': 'application/x-www-form-urlencoded'},
                               data={'data': message.body.decode()})


async def do():
    while True:
        for topic in ['topic1', 'topic2', 'topic3']:
            await watch(topic, 'watcher')


if __name__ == "__main__":
    asyncio.run(do())

I want to look for messages (forever) from various topics and when a message arrives send the POST. I import the aio package from azure which should work in an async way. After many attempts, the only solution I got is this with while True and setting the timeout=1. This is not what I wanted, I'm doing it sequentially.

azure-servicebus version 0.50.3.

This is my first time with async/await probably I'm missing something...
Any solution/suggestions?

2
  • docs I'm not familiar with the library but you are calling a method to receive a message. This is a blocking call. You should subscribe and then handle received events. It's somewhere here Commented Jul 29, 2020 at 9:02
  • Please use azure-servicebus 7.0.0 to use asyncio pypi.org/project/azure-servicebus Commented Dec 18, 2020 at 18:01

2 Answers 2

3

Here's how you'll do it with the latest major version v7 of servicebus Please take a look a the async samples to send and receive subscription messages https://github.com/Azure/azure-sdk-for-python/blob/04290863fa8963ec525a0b2f4079595287e15d93/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py

import os
import asyncio
from aiohttp import ClientSession
from azure.servicebus.aio import ServiceBusClient
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']

async def watch(topic_name, subscription_name):
    async with ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) as servicebus_client:
        subscription_receiver = servicebus_client.get_subscription_receiver(
            topic_name=topic_name,
            subscription_name=subscription_name,
        )
    async with subscription_receiver:
         message = await subscription_receiver.receive_messages(max_wait_time=1)

    if message.body is not None:
        async with ClientSession() as session:
            await session.post('ip:port/endpoint',
                               headers={'Content-type': 'application/x-www-form-urlencoded'},
                               data={'data': message.body.decode()})

async def do():
    while True:
        for topic in ['topic1', 'topic2', 'topic3']:
            await watch(topic, 'watcher')


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do())
Sign up to request clarification or add additional context in comments.

Comments

0

You will have to use the package : azure.servicebus.aio

They have the below modules for async : enter image description here

We will have to use the Receive handler class - it can instantiated with get_receiver() method. With this object you will be able to iterate through the message Asynchronously. Spun up a sample script which does that you could further optimise it :

from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()

        
Receiving = True

#Topic 1 receiver : 
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver =  SubsClient.get_receiver()

#Topic 2 receiver : 
conn_str2= "<>"
name2="Allmessages2"
SubsClient2 = SubscriptionClient.from_connection_string(conn_str2, name2)
receiver2 =  SubsClient2.get_receiver()
    
#obj= SubscriptionClient("svijayservicebus","mytopic1", shared_access_key_name="RootManageSharedAccessKey", shared_access_key_value="ySr+maBCmIRDK4I1aGgkoBl5sNNxJt4HTwINo0FQ/tc=")
async def receive_message_from1():
    await receiver.open()
    print("Opening the Receiver for Topic1")
    async with receiver:
      while(Receiving):
        msgs =  await receiver.fetch_next()
        for m in msgs:
            print("Received the message from topic 1.....")
            print(str(m))
            await m.complete()
       
async def receive_message_from2():
    await receiver2.open()
    print("Opening the Receiver for Topic2")
    async with receiver2:
      while(Receiving):
        msgs =  await receiver2.fetch_next()
        for m in msgs:
            print("Received the message from topic 2.....")
            print(str(m))
            await m.complete()
               



loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
topic2receiver = loop.create_task(receive_message_from2())

I have created two tasks to facilitate the concurrency. You could refer this post to get more clarity on them.

Output : enter image description here

4 Comments

I already tried something similar using azure-servicebus == 7.0.0b4 but I got ConnectionClose('ErrorCodes.UnknownError: Connection in an unexpected error state.'), which is the same error I'm getting using your solution. I also copied and pasted one example from Azure official Github and had the same error.
Are you getting the above error after sometime or at the beginning of the execution?
Beginning. The connection string is topic related, right?
Yes that's right. I would be needing few more details like the debug logs. I can also take it up offline for a closer look and provide a quick and specialized assistance, please send an email with subject line “Attn:Sathya” to AzCommunity[at]Microsoft[dot]com referencing this thread along with the code sample you re using and debug log file. Ref : learn.microsoft.com/en-us/azure/developer/python/…

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.