I am stuck on an issue with a Kafka consumer built with the confluent-kafka Python library.
CONTEXT
I have a Kafka topic on an Azure Ubuntu VM that I need to consume from.
SCENARIO
The consumer script (my_topic_consumer.py) uses confluent-kafka-python to create a consumer (shown below), subscribe to the ‘my_topic’ topic, and trigger a REST API call based on the message content. The issue is that the consumer does not read any messages from the Kafka cluster.
```python
from confluent_kafka import Consumer, KafkaError
import requests
import json

# Kafka configuration
kafka_bootstrap_servers = "X.X.X.X:9092"
kafka_topic = "my_topic"
kafka_group_id = 'test-consumer-group'

# REST API configuration
api_url = "https://X.X.X.X/api/v1/webhooks/cnas/zabbix_alerts?st2-api-key=ODFjZGJkNzRjZDQ5MDk1ZDljMzMxYTk5MzE5OWQ0YzU3N2ZmODBlZGViOWRiMjBjZDMxM2Q1OGNjYWQyNTgxYQ"

# Create Kafka consumer
consumer_conf = {
    'bootstrap.servers': kafka_bootstrap_servers,
    'group.id': kafka_group_id,
    'auto.offset.reset': 'earliest'
    #'enable.auto.commit': False # Disable auto-commit
}
consumer = Consumer(consumer_conf)
consumer.subscribe([kafka_topic])

try:
    while True:
        msg = consumer.poll(10.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event
                continue
            else:
                print(msg.error())
                break
        # Print received Kafka message
        print(f"Received Kafka message: {msg.value().decode('utf-8')}")
        # Process the received message
        try:
            message_value = json.loads(msg.value().decode('utf-8'))
            # Trigger REST API with the message content
            response = requests.post(api_url, json=message_value, verify=False)
            # Print API response
            print(f"API response: {response.text}")
            if response.status_code == 202:
                print(f"API call successful for message: {message_value}")
                consumer.commit()  # Manually commit offset
            else:
                print(f"API call failed with status code {response.status_code}")
        except Exception as e:
            print(f"Error processing message: {e}")
except KeyboardInterrupt:
    pass
finally:
    # Close down consumer to commit final offsets.
    consumer.close()
```
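To narrow this down, here is a minimal diagnostic sketch of the same subscribe-based consumer, assuming the same broker address, topic, and group ID as above. The helper names (`build_debug_conf`, `describe_assignment`, `on_assign`, `run`) are made up for illustration; `debug`, `error_cb`, and the `on_assign` callback of `subscribe()` are existing confluent-kafka options. It leaves the REST call out and only logs client errors, partition assignment, and whatever `poll()` returns:

```python
import sys


def build_debug_conf(bootstrap_servers, group_id):
    """Return a consumer config with librdkafka debug logging switched on."""
    return {
        'bootstrap.servers': bootstrap_servers,
        'group.id': group_id,
        'auto.offset.reset': 'earliest',
        # Log consumer-group, fetch, and broker activity to stderr.
        'debug': 'cgrp,fetch,broker',
        # Report client-level errors (for example, brokers unreachable)
        # that are not delivered as messages from poll().
        'error_cb': lambda err: print(f"Kafka client error: {err}", file=sys.stderr),
    }


def describe_assignment(partitions):
    """Format assigned partitions as 'topic[partition]@offset' strings."""
    return [f"{p.topic}[{p.partition}]@{p.offset}" for p in partitions]


def on_assign(consumer, partitions):
    # Invoked from poll() once the group has handed partitions to this consumer.
    print(f"Assigned: {describe_assignment(partitions)}")


def run(bootstrap_servers="X.X.X.X:9092", topic="my_topic",
        group_id="test-consumer-group"):
    # Imported here so the helpers above work without the library installed.
    from confluent_kafka import Consumer

    consumer = Consumer(build_debug_conf(bootstrap_servers, group_id))
    consumer.subscribe([topic], on_assign=on_assign)
    try:
        while True:
            msg = consumer.poll(10.0)
            if msg is None:
                print("No message within 10 s")
                continue
            if msg.error():
                print(f"Consumer error: {msg.error()}")
                continue
            print(f"{msg.topic()}[{msg.partition()}]@{msg.offset()}: {msg.value()!r}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
```

Calling `run()` starts the loop. If no "Assigned:" line ever appears, the consumer never received a partition from the group, which would point at group coordination rather than at the message handling code.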
When I run the console consumer script, I can see the data:
```
root@CNAS-STACKSTORM-DEV-1:/home/kafka/kafka/bin# ./kafka-console-consumer.sh --bootstrap-server X.X.X.X:9092 --from-beginning --topic my_topic notification --partition 0
{"ProblemID":"26547402","Incident":"PROBLEM","Severity":" Warning","Problem":"HighCPUUsage","ImpactedEntity":"Dell XR12","ProblemDescription":"HighCPUUsage","ProblemStatus":"OPEN","StartDateTime":"2023-02-21 14:44:02","EndDateTime":"","NodeName":"esxi012.5g.lab","NodeType":"ESXi 7.0","NodeMake":"VMWare","HWDetails":"esxi012.5g.lab-10.151.18.133","Location":"Sciangai 53,Rome,Rome","Country":"Italy","Region":"Rome","City":"Rome","DCLocation":"Accenture - Via Sciangai 53","Duration":"2s","HostName":"4c4c4544-0036-5610-8036-b7c04f325133","ProblemAcknowledged":"No","TopologyName":"CPU usage"}
```
Any help or feedback is appreciated.
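
For comparison, here is a rough Python counterpart of the console command above. It reads partition 0 from the beginning via `assign()` instead of joining the group with `subscribe()`. This is only a sketch: `read_partition`, `main`, and the group ID `manual-read-check` are hypothetical names, and it assumes the same broker address and topic as the script.

```python
def read_partition(consumer, topic_partition, max_idle_polls=3, timeout=5.0):
    """Assign one partition explicitly and collect decoded values until idle."""
    consumer.assign([topic_partition])
    values, idle = [], 0
    while idle < max_idle_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            idle += 1          # nothing arrived within the timeout
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            idle += 1          # count errors as idle so the loop still ends
            continue
        idle = 0
        values.append(msg.value().decode('utf-8'))
    return values


def main(bootstrap_servers="X.X.X.X:9092", topic="my_topic", partition=0):
    # Imported here so read_partition() can be exercised without the library.
    from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

    consumer = Consumer({
        'bootstrap.servers': bootstrap_servers,
        # The client requires a group.id even when assign() is used.
        'group.id': 'manual-read-check',
        'enable.auto.commit': False,   # do not store offsets for this check
    })
    try:
        start = TopicPartition(topic, partition, OFFSET_BEGINNING)
        for value in read_partition(consumer, start):
            print(value)
    finally:
        consumer.close()
```

If `main()` prints the records while the `subscribe()` version stays silent, connectivity is probably fine and the difference more likely lies in group membership or in offsets already committed for `test-consumer-group`, since `auto.offset.reset` only applies when the group has no valid committed offset.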