I'm trying to develop an Event Hub-triggered Azure Function that receives events from a first Event Hub and forwards them to a second Event Hub. As additional features, I'd like the function to be asynchronous and to store checkpoints in Azure Blob Storage. To do so, I wanted to use the EventHubConsumerClient class of the azure-eventhub library (https://pypi.org/project/azure-eventhub/, https://learn.microsoft.com/en-us/javascript/api/@azure/event-hubs/eventhubconsumerclient?view=azure-node-latest).
However, it seems I cannot even receive the events when I test the function locally in VS Code.
The Event Hub I am listening to has two partitions. Its shared access policy grants Send and Listen. I have a small script that sends it test messages, and that works fine. My Azure Functions runtime is 4.x with Python 3.9.13, running locally in a conda base environment.
Here is the code in my __init__.py that should receive the events with the EventHubConsumerClient class:
import logging
import asyncio
import os
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
import azure.functions as func
CONNECTION_STR = os.environ.get("EVENT_HUB_CONN_STR")
EVENTHUB_NAME = os.environ.get("EVENT_HUB_NAME")
STORAGE_CONNECTION_STR = os.environ.get("AZURE_STORAGE_CONN_STR")
BLOB_CONTAINER_NAME = os.environ.get("AZURE_STORAGE_NAME")
async def on_event(partition_context, event):
    logging.info("Received event with body: {} from partition: {}.".format(event.body_as_str(encoding="UTF-8"), partition_context.partition_id))
    await partition_context.update_checkpoint(event)
async def receive(client):
    await client.receive(
        on_event=on_event,
        starting_position="-1",  # "-1" is from the beginning of the partition.
    )
async def main(one_event: func.EventHubEvent):
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, BLOB_CONTAINER_NAME)
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR,
        consumer_group="$Default",
        eventhub_name=EVENTHUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        await receive(client)
if __name__ == '__main__':
    asyncio.run(main())
Note: I know that one_event is not used inside main, but I want it to act as the trigger that runs main.
My function.json file is:
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "one_event",
      "direction": "in",
      "eventHubName": "<My_event_hub_name>",
      "connection": "<My_event_hub_co_str>",
      "cardinality": "one",
      "consumerGroup": "$Default"
    }
  ]
}
I defined an Event Hub input binding there to use as the trigger.
I also have a local.settings.json that contains the environment variables used above, and a requirements.txt that does not seem to be missing any libraries.
FYI: I have tested another method (here: https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-event-hubs-trigger?tabs=in-process%2Cfunctionsv2%2Cextensionv5&pivots=programming-language-python) (without using EventHubConsumerClient class) to receive the events and it works fine but I do not have the checkpoint and async capabilities.
When I run the function locally with "func start", instead of receiving the event and printing some basic information about it, I get a continuous stream of messages in my terminal. The output never stops and locks the terminal, so I have to kill it manually and open a new one.
So it seems that my code is not working properly.
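To illustrate what I suspect is happening, here is a minimal sketch that uses only the standard library. It assumes that client.receive() does not return until the client is closed, so each triggered invocation of main would never finish. The names fake_receive, invocation_that_never_returns and demo are hypothetical stand-ins of mine and are not part of azure-eventhub.

```python
import asyncio


async def fake_receive(on_event):
    """Hypothetical stand-in for client.receive(): loops until cancelled."""
    n = 0
    while True:
        await on_event(f"event-{n}")
        n += 1
        await asyncio.sleep(0.01)


async def invocation_that_never_returns(seen):
    """Mimics my main(): it awaits a receive loop that has no natural end."""

    async def on_event(body):
        seen.append(body)

    await fake_receive(on_event)  # only ends if something cancels it


async def demo():
    seen = []
    try:
        # Cancel the invocation from outside after 0.1 s, because it
        # would otherwise keep running forever.
        await asyncio.wait_for(invocation_that_never_returns(seen), timeout=0.1)
        finished = True
    except asyncio.TimeoutError:
        finished = False
    return finished, seen


finished, seen = asyncio.run(demo())
print("invocation finished on its own:", finished)
print("events handled before cancellation:", len(seen))
```

If that assumption holds, it would explain why the output never stops. Separately, the asyncio.run(main()) line at the bottom of my file would fail if the file were executed directly, because main requires the one_event argument.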
*I am probably getting something wrong with main() and asyncio.run().* Do you know what the problem might be? Thank you very much!