How to Send/Receive Data in Azure Event Hub with Python?


Azure Event Hub Setup:

We will create an Event Hubs namespace, an event hub and a storage account on Azure, and then connect them to our Python scripts through connection strings. Before creating the other resources, we create a common resource group: rg-eventhub.
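The scripts in Step 4 need two connection strings and two names. One option, sketched here as an assumption rather than the article's own method, is to read them from environment variables instead of hard-coding them. The variable names reuse the constants from the scripts; `EventHubSettings` and `load_settings` are hypothetical helpers.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class EventHubSettings:
    """Connection settings shared by send.py and receive.py (hypothetical)."""
    eventhub_conn_str: str
    eventhub_name: str
    storage_conn_str: str
    container_name: str


# Field name -> environment variable name (assumed naming convention).
_ENV_NAMES = {
    "eventhub_conn_str": "EVENT_HUB_CONNECTION_STR",
    "eventhub_name": "EVENT_HUB_NAME",
    "storage_conn_str": "BLOB_STORAGE_CONNECTION_STRING",
    "container_name": "BLOB_CONTAINER_NAME",
}


def load_settings(env=None):
    """Build EventHubSettings from a mapping (defaults to os.environ).

    Raises KeyError listing every variable that is missing or empty.
    """
    env = os.environ if env is None else env
    missing = [var for var in _ENV_NAMES.values() if not env.get(var)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return EventHubSettings(**{field: env[var] for field, var in _ENV_NAMES.items()})
```

Keeping secrets out of the source files also means the scripts can be committed to version control without leaking keys.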


Step 1: Event Hub Namespace

Now we create an Event Hubs namespace with the following configuration.

  • Subscription: Azure subscription 1
  • Resource Group: rg-eventhub
  • Namespace name: knolduseventhub
  • Location: East US
  • Pricing tier: Basic
  • Throughput units: 1 (one unit allows up to 1 MB/s ingress)
  • All other settings: default

Then select Review + create, verify the settings and select Create.
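To sanity-check the throughput-unit setting, the sketch below converts an expected load into a TU count. It assumes the published per-TU limits (ingress up to 1 MB/s or 1,000 events/s, egress up to 2 MB/s or 4,096 events/s); `required_throughput_units` is our own helper, not an Azure API, and the result is only an estimate.

```python
import math

# Per-throughput-unit limits assumed for the Basic/Standard tiers.
INGRESS_MB_PER_TU = 1.0
INGRESS_EVENTS_PER_TU = 1000
EGRESS_MB_PER_TU = 2.0
EGRESS_EVENTS_PER_TU = 4096


def required_throughput_units(ingress_mb_s, ingress_events_s,
                              egress_mb_s=0.0, egress_events_s=0):
    """Smallest whole number of TUs that stays within all four limits."""
    needed = max(
        ingress_mb_s / INGRESS_MB_PER_TU,
        ingress_events_s / INGRESS_EVENTS_PER_TU,
        egress_mb_s / EGRESS_MB_PER_TU,
        egress_events_s / EGRESS_EVENTS_PER_TU,
    )
    # A namespace always has at least one TU.
    return max(1, math.ceil(needed))
```

The sender in Step 4 emits three short events per second, so `required_throughput_units(0.001, 3)` gives 1, which matches the setting above.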


Step 2: Event Hub

A namespace can hold multiple event hubs. To add one, open the namespace, go to the Entities section and select Event Hubs.


Now, we will create a single event hub with the following configurations.

  • Name: knoldusevent
  • Partition count: 2
  • Retention
    • Cleanup Policy: Delete
    • Retention time (hrs): 1
  • Capture: not available in the Basic tier

After that, select Review + create and then Create.
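With a retention time of 1 hour, a receiver that starts from the beginning of a partition only finds events enqueued within the last hour. The simplified model below (a hypothetical helper, not part of the SDK) expresses that window check:

```python
from datetime import datetime, timedelta, timezone


def still_retained(enqueued_at, now=None, retention_hours=1):
    """Return True if an event enqueued at `enqueued_at` is inside the retention window.

    Simplified model of the Delete cleanup policy: events older than
    `retention_hours` are treated as gone. Both datetimes must be timezone-aware.
    """
    now = datetime.now(timezone.utc) if now is None else now
    return now - enqueued_at < timedelta(hours=retention_hours)
```

In practice this means receive.py should be started within the hour after send.py has run if you want to see those events.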


Step 3: Storage

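The receiver needs a blob container to use as a checkpoint store: it records, per partition, how far the consumer group has read (plus ownership records used for load balancing), so a restarted receiver does not re-read old events. So we will create a storage account and then add a blob container to it.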
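Conceptually, a checkpoint store is a small per-partition record of the last processed position. The toy in-memory model below is only an illustration (the real `BlobCheckpointStore` persists richer data in blobs); here an event is a `(sequence_number, body)` tuple and `-1` simply means "before the first event".

```python
class InMemoryCheckpointStore:
    """Toy stand-in for a checkpoint store: one checkpoint per partition."""

    def __init__(self):
        # partition_id -> sequence number of the last processed event
        self._checkpoints = {}

    def update_checkpoint(self, partition_id, sequence_number):
        self._checkpoints[partition_id] = sequence_number

    def start_after(self, partition_id, default=-1):
        """Position to resume after; `default` when no checkpoint exists yet."""
        return self._checkpoints.get(partition_id, default)


def unread(events, store, partition_id):
    """Events a restarted reader would still process for one partition."""
    start = store.start_after(partition_id)
    return [event for event in events if event[0] > start]
```

Without a checkpoint every event is unread; after checkpointing sequence number 1 on partition "0", only the third event remains, while partition "1" is unaffected.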

Step 3.1: The storage account

We will create a storage account with the following configurations.

  • Subscription: Azure subscription 1
  • Resource group: rg-eventhub
  • Name: knoldusstorage
  • Performance: Standard
  • Redundancy: Locally-redundant storage (LRS)
  • Network access: Enable public access from all networks
  • Routing preference: Internet routing
  • Encryption type: Microsoft-managed keys (MMK)
  • All other settings: default

After that, review and create.
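Storage account names must be 3 to 24 characters long and contain only lowercase letters and digits; they must also be unique across Azure, which only Azure itself can check. A minimal local validator for the naming rule (hypothetical helper):

```python
import re

# 3-24 characters, lowercase letters and digits only.
_ACCOUNT_NAME = re.compile(r"[a-z0-9]{3,24}")


def is_valid_storage_account_name(name):
    """Check the naming rule locally; global uniqueness is not checked here."""
    return _ACCOUNT_NAME.fullmatch(name) is not None
```

For example, knoldusstorage passes, while knoldus-storage is rejected because of the hyphen.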


Step 3.2: Blob container

First, go to the Data storage section of the storage account and open Containers. Then create a blob container with the following configuration.

  • Name: knoldusblob
  • Public access level: Private (no anonymous access; the receiver authenticates with the storage connection string)
Then select Create.
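Container names follow a different rule from account names: 3 to 63 characters, lowercase letters, digits and hyphens, starting and ending with a letter or digit, with no two hyphens in a row. A small local check (hypothetical helper) can catch typos before the portal does:

```python
import re

# Letters/digits, optionally separated by single hyphens; no leading or trailing hyphen.
_CONTAINER_NAME = re.compile(r"[a-z0-9](?:[a-z0-9]|-(?=[a-z0-9]))*")


def is_valid_container_name(name):
    """Return True if `name` satisfies the blob container naming rule."""
    return 3 <= len(name) <= 63 and _CONTAINER_NAME.fullmatch(name) is not None
```

knoldusblob passes; names such as my--blob, blob- or Knoldus are rejected.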

Step 4: Python scripts

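We will install the Python dependencies locally with pip so that the scripts can import them. The asyncio module ships with the Python standard library, so it should not be installed from PyPI (the asyncio package there is an outdated backport). azure-identity is only needed for passwordless authentication; the connection-string scripts below do not use it.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp
pip install azure-eventhub-checkpointstoreblob-aio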
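To confirm the installation, a short check can ask `importlib.metadata` whether each distribution is present. This is a sketch; the package list mirrors the pip commands above and `missing_packages` is a hypothetical helper.

```python
import asyncio  # standard library: available without pip
from importlib.metadata import PackageNotFoundError, version

REQUIRED = (
    "azure-eventhub",
    "azure-eventhub-checkpointstoreblob-aio",
    "aiohttp",
    "azure-identity",
)


def missing_packages(names=REQUIRED):
    """Return the distributions from `names` that are not installed."""
    missing = []
    for name in names:
        try:
            version(name)
        except PackageNotFoundError:
            missing.append(name)
    return missing


if __name__ == "__main__":
    print("asyncio available:", asyncio.__name__ == "asyncio")
    print("missing:", missing_packages() or "none")
```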

Next, add the code that sends data and save it as send.py.

import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

EVENT_HUB_CONNECTION_STR = "<YOUR_EVENT_HUB_CONNECTION_STR>"
EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"

async def run():
    # Create the producer once and reuse it for every batch.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    async with producer:
        while True:
            # Create a batch.
            event_data_batch = await producer.create_batch()

            # Add events to the batch.
            event_data_batch.add(EventData("First event"))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))

            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
            print("sent to azure successfully")

            # Wait one second before sending the next batch.
            await asyncio.sleep(1)


if __name__ == "__main__":
    try:
        # asyncio.run() creates the event loop, runs run() and closes the loop.
        asyncio.run(run())
    except KeyboardInterrupt:
        pass
    finally:
        print("Closing the loop")
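The sender above adds three short events, which always fit in one batch. When payloads are larger, the usual pattern is to add events until the batch is full, send it and start a new one; `EventDataBatch.add` raises `ValueError` when an event no longer fits. The pure-Python sketch below models that grouping with a simple byte budget (an approximation: the real batch also counts per-event protocol overhead), and the SDK form is shown in comments.

```python
def split_into_batches(payloads, max_batch_bytes):
    """Group UTF-8 payloads so each group's total size stays within max_batch_bytes.

    With the SDK, the same idea reads (inside an async function):
        try:
            batch.add(EventData(body))
        except ValueError:  # batch is full
            await producer.send_batch(batch)
            batch = await producer.create_batch()
            batch.add(EventData(body))
    """
    batches, current, current_size = [], [], 0
    for payload in payloads:
        size = len(payload.encode("utf-8"))
        if size > max_batch_bytes:
            raise ValueError(f"single payload of {size} bytes exceeds the limit")
        # Close the current batch if this payload would overflow it.
        if current and current_size + size > max_batch_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(payload)
        current_size += size
    if current:
        batches.append(current)
    return batches
```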

You can get the connection string from Shared access policies in the Settings section of the event hub (not the event hub namespace). If no policy exists there, create one with the following configuration.

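  • Name: RootPolicyAccess
  • Permission: Manage (selecting it also grants Send and Listen)
Then select Create, open the new policy and copy its primary connection string.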
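A connection string copied from an event-hub-level policy has the form `Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...;EntityPath=<event hub>`, and its EntityPath should name the same event hub as `EVENT_HUB_NAME`. The sketch below (hypothetical helpers; the key in the example is fake) splits the string and checks that match.

```python
def parse_connection_string(conn_str):
    """Split 'Key=Value;Key=Value' into a dict.

    Each segment is split on the first '=' only, because SAS keys are
    base64 and often end with '='.
    """
    parts = {}
    for segment in conn_str.strip().split(";"):
        if segment:
            key, _, value = segment.partition("=")
            parts[key] = value
    return parts


def check_entity_path(conn_str, eventhub_name):
    """Raise ValueError if the string's EntityPath names a different event hub."""
    entity = parse_connection_string(conn_str).get("EntityPath")
    if entity is not None and entity != eventhub_name:
        raise ValueError(
            f"EntityPath is {entity!r} but EVENT_HUB_NAME is {eventhub_name!r}"
        )
    return entity


example = (
    "Endpoint=sb://knolduseventhub.servicebus.windows.net/;"
    "SharedAccessKeyName=RootPolicyAccess;SharedAccessKey=FAKEKEY123=;"
    "EntityPath=knoldusevent"
)
```

Here `check_entity_path(example, "knoldusevent")` returns "knoldusevent", while any other event hub name raises.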

Now create the receiver code and save it as receive.py.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (
    BlobCheckpointStore,
)

BLOB_STORAGE_CONNECTION_STRING = "YOUR_STORAGE_ACCOUNT_CONNECTION_STR"
BLOB_CONTAINER_NAME = "YOUR_CONTAINER_NAME"
EVENT_HUB_CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STR"
EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"

async def on_event(partition_context, event):
    # Print the event data.
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group="$Default",
        eventhub_name=EVENT_HUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        # Start receiving. starting_position="-1" means "from the beginning
        # of the partition" and only applies to partitions without a
        # checkpoint; otherwise the stored checkpoint is used.
        await client.receive(on_event=on_event, starting_position="-1")


if __name__ == "__main__":
    try:
        # asyncio.run() creates the event loop, runs main() and closes the loop.
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
    finally:
        print("Closing the loop")
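receive.py writes a checkpoint after every event, which costs one storage write per event. A common alternative, sketched here as an assumption rather than the article's approach, is to checkpoint every N events per partition, accepting that up to N-1 events may be re-read after a restart. `CheckpointThrottle` is a hypothetical helper; the wiring into `on_event` is shown in comments.

```python
from collections import defaultdict


class CheckpointThrottle:
    """Decide when to checkpoint: once every `every` events per partition."""

    def __init__(self, every=50):
        self.every = every
        self._counts = defaultdict(int)  # partition_id -> events since last checkpoint

    def should_checkpoint(self, partition_id):
        self._counts[partition_id] += 1
        if self._counts[partition_id] >= self.every:
            self._counts[partition_id] = 0
            return True
        return False


# Hypothetical wiring into receive.py:
# throttle = CheckpointThrottle(every=50)
#
# async def on_event(partition_context, event):
#     print(event.body_as_str(encoding="UTF-8"))
#     if throttle.should_checkpoint(partition_context.partition_id):
#         await partition_context.update_checkpoint(event)
```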

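You can get the storage account connection string from the Access keys page under Security + networking in the storage account. Set BLOB_CONTAINER_NAME to the container from Step 3.2 (knoldusblob) and EVENT_HUB_NAME to the event hub from Step 2 (knoldusevent).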
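Copying the string from the portal is the simplest route. If you only have the account name and one of its keys, the same format can be assembled by hand; the sketch below assumes the public Azure cloud endpoint suffix `core.windows.net`, and both helpers are hypothetical.

```python
def storage_connection_string(account_name, account_key,
                              endpoint_suffix="core.windows.net"):
    """Assemble a storage connection string in the format shown under Access keys."""
    return (
        "DefaultEndpointsProtocol=https;"
        f"AccountName={account_name};"
        f"AccountKey={account_key};"
        f"EndpointSuffix={endpoint_suffix}"
    )


def blob_endpoint(account_name, endpoint_suffix="core.windows.net"):
    """Blob service URL that the connection string resolves to."""
    return f"https://{account_name}.blob.{endpoint_suffix}"
```

For the account in Step 3.1, `blob_endpoint("knoldusstorage")` gives https://knoldusstorage.blob.core.windows.net.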


Step 5: Run the code

First, run receive.py, then run send.py in a second terminal. The receiver prints every event it receives; both scripts keep running until you stop them with Ctrl+C.



Written by 

Vaibhav Kumar is a DevOps Engineer at Knoldus | Part of Nashtech, with experience in architecting and automating integral deployments over infrastructure. Proficient in Jenkins, Git, AWS and developing CI pipelines. Able to perform configuration management using Ansible and infrastructure management using Terraform. Likes to script and develop in Python. Besides AWS, has experience with Google Cloud and Azure cloud services; besides Jenkins, with CI/CD in Azure Pipelines, GitHub Actions and TeamCity. Loves to explore new technologies and ways to improve work with automation.