- Client network socket disconnected before secure TLS connection was established
- SASL SCRAM SHA512 authentication failed: Not connected
this is an intermittent issue. Major times I don’t encounter this error. Sometimes it occurs, especially like when I execute the producer multiple times simultaneously.
I just need thoughts from experienced people here, what might be the solution to fix this intermittent problem.
Btw, I’m using AWS MSK
Here is my code:
/* eslint-disable import/no-extraneous-dependencies */
/* eslint-disable lines-between-class-members */
const httpStatus = require('http-status');
const { Kafka, Partitioners } = require('kafkajs');
const logger = require('../logger');
const { KAFKA_ACKS } = require('../constants/kafka-constants');
const { KafkaInstanceError } = require('../../utils/custom-errors/class-errors');
const { KAFKA_INSTANCE_ERRORS } = require('../constants/error-messages');
const { HTTP_RESPONSE_CODE } = require('../../utils/response-codes');
class Producer {
#config;
#producerInstance;
/**
* Kafka Config Object (Producer)
* @type {
* {
* topic: string,
* kafka: object,
* producer: object
* producerSendOptions: object
* }
* }
*/
constructor(kafkaConfig = {}) {
this.#config = kafkaConfig;
this.#producerInstance = this.#initProducer();
}
#initProducer() {
const kafka = new Kafka(this.#config.kafka);
const producer = kafka.producer({
...this.#config?.producer,
createPartitioner: Partitioners.DefaultPartitioner,
});
return producer;
}
async produceEvent(events = {}) {
try {
if (Object.keys(events).length === 0) {
throw new KafkaInstanceError(
KAFKA_INSTANCE_ERRORS.EVENT_OBJECT_EMPTY,
HTTP_RESPONSE_CODE.DATA_PROCESSING_ERROR,
httpStatus.BAD_REQUEST,
);
}
await this.#producerInstance.connect();
logger.info('sending events to kafka notification');
await this.#producerInstance.send({
topic: this.#config.topic,
acks: -1,
messages: [events],
});
logger.info('event sent to kafka notification');
return events;
} catch (error) {
logger.error(error.message);
throw new KafkaInstanceError(
error.message,
error.code || HTTP_RESPONSE_CODE.INTERNAL_SERVER_ERROR,
error.statusCode || httpStatus.INTERNAL_SERVER_ERROR,
);
} finally {
await this.#producerInstance.disconnect();
}
}
}
module.exports = {
Producer,
};