
Azure Event Hub Setup:
We will create an Eventhub namespace, an event hub and a storage account on Azure Cloud. And then, we will integrate them using a connection string. Before we create other resources, we will create a common resource group: rg-eventhub.

Step 1: Event Hub Namespace
Now we create an event hub namespace with the following configurations.
- Subscription: Azure subscription 1
- Resource Group: rg-eventhub
- Namespace name: knolduseventhub
- Location: East US
- Pricing tier: Basic
- Throughput Units: 1 Mb/s
- Rest remains as the default
And then, verify and Create.

Step 2: Event Hub
Inside the event hub namespace, we can create multiple event hubs. Then, open the namespace and go to the Entities section and open Event hubs.

Now, we will create a single event hub with the following configurations.
- Name: knoldusevent
- Partition count: 2
- Retention
- Cleanup Policy: Delete
- Retention time(hrs): 1
- Capture: Not Available in the Basic tier namespace
After that, review and Create.

Step 3: Storage
We need a blob container to store the inputs from the receiver part. So we will create a storage account and then attach a blob container to it.
Step 3.1: The storage account
We will create a storage account with the following configurations.
- Subscription: Azure subscription 1
- Resource group: rg-eventhub
- Name: knoldusstorage
- Performance: Standard
- Redundancy: Locally-redundant storage (LRS)
- Network access: Enable public access from all networks
- Routing preference: Internet routing
- Encryption type: Microsoft-managed keys (MMK)
- Rest configurations are default one
After that, review and create.

Step 3.2: blob container
Firstly, go to the Data Storage section of the storage account and open containers. And then, create a blob container with the following configurations.
- Name: knoldusblob
- Public access level: Container
- Create

Step 4: python script
We will install all dependencies of python locally using pip. And then, you will be able to import these into your python scripts.
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
pip install azure-eventhub-checkpointstoreblob-aio
pip install asyncio
After that, we will add the code to send data. And then, we name the code send.py.
import asyncio
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
EVENT_HUB_CONNECTION_STR = "<YOUR_EVENT_HUB_CONNECTION_STR>"
EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"
async def run():
while True:
await asyncio.sleep(1)
producer = EventHubProducerClient.from_connection_string(
conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
)
async with producer:
# Create a batch.
event_data_batch = await producer.create_batch()
# Add events to the batch.
event_data_batch.add(EventData("First event "))
event_data_batch.add(EventData("Second event"))
event_data_batch.add(EventData("Third event"))
# Send the batch of events to the event hub.
await producer.send_batch(event_data_batch)
print("sent to azure successfully")
loop = asyncio.get_event_loop()
try:
asyncio.run(run())
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
print("Closing the loop")
loop.close()
You can get the Connection string from Shared Access Policies in the setting section of the event hub (not the event hub namespace). If not there create a policy with the following configurations.
- Name: RootPolicyAccess
- type: Manage
- Create

Now we create the receiver code. Named as receive.py.
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (
BlobCheckpointStore,
)
BLOB_STORAGE_CONNECTION_STRING = "YOUR_STORAGE_ACCOUNT_CONNECTION_STR"
BLOB_CONTAINER_NAME = "YOUR_CONTAINER_NAME"
EVENT_HUB_CONNECTION_STR = "YOUR_EVENT_HUB_CONNECTION_STR"
EVENT_HUB_NAME = "YOUR_EVENT_HUB_NAME"
async def on_event(partition_context, event):
# Print the event data.
print(
'Received the event: "{}" from the partition with ID: "{}"'.format(
event.body_as_str(encoding="UTF-8"), partition_context.partition_id
)
)
# Update the checkpoint so that the program doesn't read the events
# that it has already read when you run it next time.
await partition_context.update_checkpoint(event)
async def main():
# Create an Azure blob checkpoint store to store the checkpoints.
checkpoint_store = BlobCheckpointStore.from_connection_string(
BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
)
# Create a consumer client for the event hub.
client = EventHubConsumerClient.from_connection_string(
EVENT_HUB_CONNECTION_STR,
consumer_group="$Default",
eventhub_name=EVENT_HUB_NAME,
checkpoint_store=checkpoint_store,
)
async with client:
# Call the receive method. Read from the beginning of the
# partition (starting_position: "-1")
await client.receive(on_event=on_event, starting_position="-1")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
# Run the main method.
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
pass
finally:
print("Closing the loop")
loop.close()
And then, you can get your storage account string from the security+networking section in the access keys.

Step 5: Run the code
First, run the receive.py and then send.py. And then, you will get the output on the receiver end.

References
- https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send?tabs=passwordless%2Croles-azure-portal
- https://blog.knoldus.com/how-to-create-resources-using-an-arm-template-locally/
