Kafka Consumer

Build Kafka consumers with consumer groups, offsets, and error handling

Works with OpenClaude

You are a Kafka infrastructure engineer. The user wants to build production-ready Kafka consumers with proper consumer group management, offset handling, and error recovery.

What to check first

  • Verify Kafka broker is running: kafka-broker-api-versions.sh --bootstrap-server localhost:9092
  • Confirm topic exists: kafka-topics.sh --list --bootstrap-server localhost:9092
  • Check installed kafka-python or confluent-kafka version: pip list | grep kafka
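The broker check above can also be approximated from Python before a consumer is constructed. This is a minimal sketch using only the standard library; `broker_reachable` is a hypothetical helper name, and it only confirms that a TCP connection can be opened, not that the endpoint actually speaks the Kafka protocol.

```python
import socket


def broker_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Calling `broker_reachable("localhost", 9092)` before building the consumer gives a fast, readable failure instead of a long connection retry loop.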

Steps

  1. Import the KafkaConsumer class and configure bootstrap servers with your broker addresses
  2. Set the group_id parameter to assign the consumer to a consumer group for load balancing
  3. Set auto_offset_reset to 'earliest' or 'latest' to control where the consumer starts reading when the group has no committed offset
  4. Set enable_auto_commit to False to implement manual offset management for critical applications
  5. Subscribe to one or more topics using the subscribe() method with a list of topic names
  6. Implement a poll loop with poll(timeout_ms=1000) to fetch message batches (the example under Code iterates over the consumer instead, which polls internally)
  7. Add try-except blocks to handle deserialization errors, network timeouts, and offset commit failures
  8. Call commit() explicitly after successful message processing to persist offset positions
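Steps 6 to 8 can be sketched as an explicit poll loop. This is a sketch under stated assumptions, not a definitive implementation: `consume_batches`, `handler`, and `max_polls` are hypothetical names, and `consumer` is assumed to be any object exposing kafka-python's `poll()`, `seek()`, and `commit()` methods. On a processing failure the partition is rewound so the failed record is fetched again, which means a permanently bad record would be retried indefinitely unless you add a retry limit.

```python
import logging

logger = logging.getLogger(__name__)


def consume_batches(consumer, handler, max_polls, timeout_ms=1000):
    """Poll up to max_polls times, committing once per non-empty batch.

    Returns the number of records that `handler` processed successfully.
    """
    handled = 0
    for _ in range(max_polls):
        # poll() returns {partition: [records]}; it is empty on timeout
        batches = consumer.poll(timeout_ms=timeout_ms)
        if not batches:
            continue
        for tp, records in batches.items():
            for record in records:
                try:
                    handler(record)
                    handled += 1
                except Exception:
                    logger.exception("Failed at %s offset %s", tp, record.offset)
                    # Rewind this partition so the failed record is fetched again
                    consumer.seek(tp, record.offset)
                    break
        try:
            # Commits the current position of every assigned partition
            consumer.commit()
        except Exception:
            logger.exception("Offset commit failed; records may be redelivered")
    return handled
```

In a long-running service the bounded `range(max_polls)` would typically be replaced by a loop that runs until a shutdown flag is set; the bound is used here only to keep the sketch easy to exercise.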

Code

from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KafkaConsumerApp:
    def __init__(self, bootstrap_servers, group_id, topics):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            session_timeout_ms=30000,
            heartbeat_interval_ms=10000,
            max_poll_records=100
        )
        self.consumer.subscribe(topics)
        self.topics = topics
    
    def process_message(self, message):
        """Override this method to implement custom processing logic"""
        logger.info(f"Processing: {message.value}")
        return True
    
    def run(self):
        """Main consumer loop with error handling"""
        try:
            for message in self.consumer:
                try:
                    if self.process_message(message):
                        self.consumer.commit()
                        logger.info(
                            f"Committed through offset {message.offset} "
                            f"for partition {message.partition}"
                        )
                    else:
                        logger.warning(f"Failed to process message at offset {message.offset}")
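                except Exception as e:
                    # The source was truncated inside this logger.error( call;
                    # everything from here down is an assumed minimal completion.
                    logger.error(
                        f"Error processing offset {message.offset}: {e}"
                    )
        except KafkaError as e:
            logger.error(f"Kafka error: {e}")
        finally:
            # Always leave the group cleanly so partitions are rebalanced promptly
            self.consumer.close()

Note: the original example was truncated in the source at the logger.error( call; the lines after it are an assumed minimal completion. See the GitHub repo for the latest full version.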
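Step 7 mentions deserialization errors. The value_deserializer in the example raises on malformed JSON, and in kafka-python the deserializer appears to run while records are fetched, so such an exception would likely surface from the iteration itself rather than inside process_message. A minimal sketch of a more forgiving deserializer follows; `safe_json_deserializer` is a hypothetical name, and returning None for bad payloads is an assumption about how you want to handle them (a JSON `null` payload becomes indistinguishable from a decode failure).

```python
import json
import logging

logger = logging.getLogger(__name__)


def safe_json_deserializer(raw):
    """Decode UTF-8 JSON bytes; return None instead of raising on bad input."""
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        logger.warning("Skipping undecodable payload: %s", exc)
        return None
```

It could be passed as `value_deserializer=safe_json_deserializer`, with process_message treating a None value as a record to skip or route elsewhere.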

Common Pitfalls

  • Treating this skill as a one-shot solution — most workflows need iteration and verification
  • Skipping the verification steps — you don't know it worked until you measure
  • Applying this skill without understanding the underlying problem — read the related docs first

When NOT to Use This Skill

  • When a simpler manual approach would take less than 10 minutes
  • On critical production systems without testing in staging first
  • When you don't have permission or authorization to make these changes

How to Verify It Worked

  • Re-run the checks under "What to check first" to confirm the broker and topic are still reachable
  • Compare the output against your expected baseline
  • Check logs for any warnings or errors — silent failures are the worst kind
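One concrete baseline to compare against is consumer lag per partition. The sketch below is a hypothetical helper that works on plain dictionaries; with kafka-python the inputs could be gathered from `consumer.end_offsets(partitions)` and `consumer.committed(partition)`, which is an assumption about how you collect them. A partition with no committed offset is reported with its full end offset as lag, which overstates the lag if older records have already been deleted by retention.

```python
def partition_lag(end_offsets, committed_offsets):
    """Return {partition: lag}, where lag = end offset - committed offset."""
    lag = {}
    for partition, end in end_offsets.items():
        committed = committed_offsets.get(partition)
        # No commit yet: treat everything up to the end offset as unread
        lag[partition] = end - committed if committed is not None else end
    return lag
```

A lag that keeps growing while the consumer logs look healthy is a typical sign that commits are failing or processing is slower than production.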

Production Considerations

  • Test in staging before deploying to production
  • Have a rollback plan — every change should be reversible
  • Monitor the affected systems for at least 24 hours after the change

Quick Info

Category: Kafka
Difficulty: intermediate
Version: 1.0.0
Author: Claude Skills Hub
Tags: kafka, consumer, messaging

Install command:

curl -o ~/.claude/skills/kafka-consumer.md https://clskills.in/skills/kafka/kafka-consumer.md
