Build reliable RabbitMQ consumers with acknowledgments and prefetch
✓Works with OpenClaudeYou are a message queue engineer. The user wants to build a reliable RabbitMQ consumer that handles acknowledgments correctly and uses prefetch to control message flow.
What to check first
- Verify RabbitMQ server is running:
rabbitmq-diagnostics -s ping - Check that
amqplibis installed:npm list amqplib(orpip list | grep pikafor Python) - Confirm the queue exists or will be declared by the consumer
Steps
- Connect to RabbitMQ using
amqplib.connect()with proper error handling for reconnection - Create a channel with
connection.createChannel()to isolate this consumer's operations - Declare the queue using
channel.assertQueue()withdurable: trueto persist it across restarts - Set prefetch with
channel.prefetch(1)to pull only one message at a time (prevents overwhelming the worker) - Consume messages using
channel.consume()withnoAck: falseto enable manual acknowledgment - Process the message in the callback and call
channel.ack(msg)only after successful processing - Implement error handling to call
channel.nack(msg, false, true)to requeue failed messages - Add graceful shutdown to close the channel and connection on process termination
Code
const amqp = require('amqplib');
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost';
const QUEUE_NAME = 'task_queue';
const PREFETCH_COUNT = 1;
async function startConsumer() {
let connection;
let channel;
try {
// Step 1: Connect to RabbitMQ
connection = await amqp.connect(RABBITMQ_URL);
console.log('Connected to RabbitMQ');
// Step 2: Create a channel
channel = await connection.createChannel();
// Step 3: Declare the queue (idempotent)
await channel.assertQueue(QUEUE_NAME, {
durable: true,
arguments: {
'x-message-ttl': 86400000, // 24 hours
},
});
// Step 4: Set prefetch to 1 (process one message at a time)
await channel.prefetch(PREFETCH_COUNT);
console.log(`Waiting for messages in queue: ${QUEUE_NAME}`);
// Step 5: Start consuming with manual acknowledgment
channel.consume(QUEUE_NAME, async (msg) => {
if (!msg) return;
try {
// Parse the message content
const content = JSON.parse(msg.content.toString());
console.log(`Received message:`, content);
// Simulate async work
await processMessage(content);
// Step
Note: this example was truncated in the source. See the GitHub repo for the latest full version.
Common Pitfalls
- Treating this skill as a one-shot solution — most workflows need iteration and verification
- Skipping the verification steps — you don't know it worked until you measure
- Applying this skill without understanding the underlying problem — read the related docs first
When NOT to Use This Skill
- When a simpler manual approach would take less than 10 minutes
- On critical production systems without testing in staging first
- When you don't have permission or authorization to make these changes
How to Verify It Worked
- Run the verification steps documented above
- Compare the output against your expected baseline
- Check logs for any warnings or errors — silent failures are the worst kind
Production Considerations
- Test in staging before deploying to production
- Have a rollback plan — every change should be reversible
- Monitor the affected systems for at least 24 hours after the change
Related Networking Skills
Other Claude Code skills in the same category — free to download.
HTTP Client
Create configured HTTP client with interceptors
Retry Logic
Implement retry logic with exponential backoff
Circuit Breaker
Implement circuit breaker pattern
Request Queue
Queue and batch HTTP requests
Proxy Setup
Set up reverse proxy configuration
SSL Setup
Configure SSL/TLS certificates
DNS Setup
Configure DNS records
Load Balancer
Set up load balancing configuration
Want a Networking skill personalized to YOUR project?
This is a generic skill that works for everyone. Our AI can generate one tailored to your exact tech stack, naming conventions, folder structure, and coding patterns — with 3x more detail.