confluent-kafka-python consumer unable to read messages from Kafka Topic

I am stuck on an issue with a Kafka consumer built with the confluent-kafka Python library.

CONTEXT
I have a Kafka topic on an Azure Ubuntu VM that I need to consume from.

SCENARIO
The consumer script (my_topic_consumer.py) uses confluent-kafka-python to create a consumer (shown below), subscribe to the ‘my_topic’ topic, and trigger a REST API call based on the message content. The issue is that the consumer is not able to read any messages from the Kafka cluster.

from confluent_kafka import Consumer, KafkaError
import requests
import json

# Kafka configuration
kafka_bootstrap_servers="X.X.X.X:9092"
kafka_topic="my_topic"
kafka_group_id = 'test-consumer-group'

# REST API configuration
api_url="https://X.X.X.X/api/v1/webhooks/cnas/zabbix_alerts?st2-api-key=ODFjZGJkNzRjZDQ5MDk1ZDljMzMxYTk5MzE5OWQ0YzU3N2ZmODBlZGViOWRiMjBjZDMxM2Q1OGNjYWQyNTgxYQ"

# Create Kafka consumer
consumer_conf = {
    'bootstrap.servers': kafka_bootstrap_servers,
    'group.id': kafka_group_id,
    'auto.offset.reset': 'earliest'
    #'enable.auto.commit': False  # Disable auto-commit
}

consumer = Consumer(consumer_conf)
consumer.subscribe([kafka_topic])

try:
    while True:
        msg = consumer.poll(10.0)

        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break

        # Print received Kafka message
        print(f"Received Kafka message: {msg.value().decode('utf-8')}")

        # Process the received message
        try:
            message_value = json.loads(msg.value().decode('utf-8'))

            # Trigger REST API with the message content
            response = requests.post(api_url, json=message_value, verify=False)

            # Print API response
            print(f"API response: {response.text}")

            if response.status_code == 202:
                print(f"API call successful for message: {message_value}")
                consumer.commit()  # Manually commit offset
            else:
                print(f"API call failed with status code {response.status_code}")

        except Exception as e:
            print(f"Error processing message: {e}")

except KeyboardInterrupt:
    pass

finally:
    # Close down consumer to commit final offsets.
    consumer.close()
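
To make it easier to rule out the processing step, here is a minimal sketch of the per-message handling from the script above, pulled out so it can be exercised without a broker. The names `handle_message`, `ACCEPTED`, and the `post` callable are hypothetical and only for illustration; `post` stands in for the `requests.post` call and is assumed to return the HTTP status code.

```python
import json
from typing import Callable

ACCEPTED = 202  # status code the script above treats as success


def handle_message(raw_value: bytes, post: Callable[[dict], int]) -> bool:
    """Decode one message value, forward it, and report whether to commit.

    `post` is a hypothetical callable that sends the payload to the REST API
    and returns the HTTP status code.
    """
    try:
        # Kafka message values arrive as bytes; the topic carries UTF-8 JSON.
        payload = json.loads(raw_value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        print(f"Skipping undecodable message: {exc}")
        return False

    status = post(payload)
    return status == ACCEPTED


if __name__ == "__main__":
    # A shortened record in the same shape as the topic's messages.
    sample = b'{"ProblemID": "26547402", "ProblemStatus": "OPEN"}'

    def fake_post(payload: dict) -> int:
        # Stand-in for the real REST call; always reports "accepted".
        return ACCEPTED

    print(handle_message(sample, fake_post))
```

In the polling loop this would be called as `handle_message(msg.value(), post)`, committing the offset only when it returns `True`. Since this part can be run on its own against a sample record, it should help separate problems in the processing logic from problems in reading from the topic.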




When I run the console consumer script against the same broker and topic, I can see the data:

```
root@CNAS-STACKSTORM-DEV-1:/home/kafka/kafka/bin# ./kafka-console-consumer.sh --bootstrap-server X.X.X.X:9092 --from-beginning --topic my_topic  notification --partition 0
{"ProblemID":"26547402","Incident":"PROBLEM","Severity":" Warning","Problem":"HighCPUUsage","ImpactedEntity":"Dell XR12","ProblemDescription":"HighCPUUsage","ProblemStatus":"OPEN","StartDateTime":"2023-02-21 14:44:02","EndDateTime":"","NodeName":"esxi012.5g.lab","NodeType":"ESXi 7.0","NodeMake":"VMWare","HWDetails":"esxi012.5g.lab-10.151.18.133","Location":"Sciangai 53,Rome,Rome","Country":"Italy","Region":"Rome","City":"Rome","DCLocation":"Accenture - Via Sciangai 53","Duration":"2s","HostName":"4c4c4544-0036-5610-8036-b7c04f325133","ProblemAcknowledged":"No","TopologyName":"CPU usage"}
```

Any help and feedback is appreciated.
