Azure Service Bus SDK for Python messaging. Use for queues, topics, subscriptions, and enterprise messaging patterns.
✓Works with OpenClaudeEnterprise messaging for reliable cloud communication with queues and pub/sub topics.
Installation
pip install azure-servicebus azure-identity
Environment Variables
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
Authentication
from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
client = ServiceBusClient(
fully_qualified_namespace=namespace,
credential=credential
)
Client Types
| Client | Purpose | Get From |
|---|---|---|
ServiceBusClient | Connection management | Direct instantiation |
ServiceBusSender | Send messages | client.get_queue_sender() / get_topic_sender() |
ServiceBusReceiver | Receive messages | client.get_queue_receiver() / get_subscription_receiver() |
Send Messages (Async)
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
async def send_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
sender = client.get_queue_sender(queue_name="myqueue")
async with sender:
# Single message
message = ServiceBusMessage("Hello, Service Bus!")
await sender.send_messages(message)
# Batch of messages
messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
await sender.send_messages(messages)
# Message batch (for size control)
batch = await sender.create_message_batch()
for i in range(100):
try:
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
except ValueError: # Batch full
await sender.send_messages(batch)
batch = await sender.create_message_batch()
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
await sender.send_messages(batch)
asyncio.run(send_messages())
Receive Messages (Async)
async def receive_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
receiver = client.get_queue_receiver(queue_name="myqueue")
async with receiver:
# Receive batch
messages = await receiver.receive_messages(
max_message_count=10,
max_wait_time=5 # seconds
)
for msg in messages:
print(f"Received: {str(msg)}")
await receiver.complete_message(msg) # Remove from queue
asyncio.run(receive_messages())
Receive Modes
| Mode | Behavior | Use Case |
|---|---|---|
PEEK_LOCK (default) | Message locked, must complete/abandon | Reliable processing |
RECEIVE_AND_DELETE | Removed immediately on receive | At-most-once delivery |
from azure.servicebus import ServiceBusReceiveMode
receiver = client.get_queue_receiver(
queue_name="myqueue",
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)
Message Settlement
async with receiver:
messages = await receiver.receive_messages(max_message_count=1)
for msg in messages:
try:
# Process message...
await receiver.complete_message(msg) # Success - remove from queue
except ProcessingError:
await receiver.abandon_message(msg) # Retry later
except PermanentError:
await receiver.dead_letter_message(
msg,
reason="ProcessingFailed",
error_description="Could not process"
)
| Action | Effect |
|---|---|
complete_message() | Remove from queue (success) |
abandon_message() | Release lock, retry immediately |
dead_letter_message() | Move to dead-letter queue |
defer_message() | Set aside, receive by sequence number |
Topics and Subscriptions
# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
await sender.send_messages(ServiceBusMessage("Topic message"))
# Receive from subscription
receiver = client.get_subscription_receiver(
topic_name="mytopic",
subscription_name="mysubscription"
)
async with receiver:
messages = await receiver.receive_messages(max_message_count=10)
Sessions (FIFO)
# Send with session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)
# Receive from specific session
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id="order-123"
)
# Receive from next available session
from azure.servicebus import NEXT_AVAILABLE_SESSION
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id=NEXT_AVAILABLE_SESSION
)
Scheduled Messages
from datetime import datetime, timedelta, timezone
message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)
# Schedule message
sequence_number = await sender.schedule_messages(message, scheduled_time)
# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_number)
Dead-Letter Queue
from azure.servicebus import ServiceBusSubQueue
# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
queue_name="myqueue",
sub_queue=ServiceBusSubQueue.DEAD_LETTER
)
async with dlq_receiver:
messages = await dlq_receiver.receive_messages(max_message_count=10)
for msg in messages:
print(f"Dead-lettered: {msg.dead_letter_reason}")
await dlq_receiver.complete_message(msg)
Sync Client (for simple scripts)
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=DefaultAzureCredential()
) as client:
with client.get_queue_sender("myqueue") as sender:
sender.send_messages(ServiceBusMessage("Sync message"))
with client.get_queue_receiver("myqueue") as receiver:
for msg in receiver:
print(str(msg))
receiver.complete_message(msg)
Best Practices
- Use async client for production workloads
- Use context managers (
async with) for proper cleanup - Complete messages after successful processing
- Use dead-letter queue for poison messages
- Use sessions for ordered, FIFO processing
- Use message batches for high-throughput scenarios
- Set
max_wait_timeto avoid infinite blocking
Reference Files
| File | Contents |
|---|---|
| references/patterns.md | Competing consumers, sessions, retry patterns, request-response, transactions |
| references/dead-letter.md | DLQ handling, poison messages, reprocessing strategies |
| scripts/setup_servicebus.py | CLI for queue/topic/subscription management and DLQ monitoring |
When to Use
This skill is applicable to execute the workflow or actions described in the overview.
Related Cloud (AWS/GCP/Azure) Skills
Other Claude Code skills in the same category — free to download.
Lambda Function
Create AWS Lambda function with handler
S3 Operations
Set up S3 bucket operations (upload, download, presigned URLs)
DynamoDB CRUD
Create DynamoDB CRUD operations
SQS Setup
Set up SQS queue producer and consumer
SNS Notifications
Configure SNS for push notifications
CloudFront Setup
Set up CloudFront CDN distribution
Cognito Auth
Implement AWS Cognito authentication
RDS Setup
Configure RDS database connection
Want a Cloud (AWS/GCP/Azure) skill personalized to YOUR project?
This is a generic skill that works for everyone. Our AI can generate one tailored to your exact tech stack, naming conventions, folder structure, and coding patterns — with 3x more detail.