Kafka JS Connection issue

  1. Client network socket disconnected before secure TLS connection was established
  2. SASL SCRAM SHA512 authentication failed: Not connected

this is an intermittent issue. Major times I don’t encounter this error. Sometimes it occurs, especially like when I execute the producer multiple times simultaneously.

I just need thoughts from experienced people here, what might be the solution to fix this intermittent problem.

Btw, I’m using AWS MSK

Here is my code:

/* eslint-disable import/no-extraneous-dependencies */
/* eslint-disable lines-between-class-members */
const httpStatus = require('http-status');
const { Kafka, Partitioners } = require('kafkajs');

const logger = require('../logger');

const { KAFKA_ACKS } = require('../constants/kafka-constants');
const { KafkaInstanceError } = require('../../utils/custom-errors/class-errors');
const { KAFKA_INSTANCE_ERRORS } = require('../constants/error-messages');
const { HTTP_RESPONSE_CODE } = require('../../utils/response-codes');

    class Producer {
      #config;
      #producerInstance;
    
      /**
       * Kafka Config Object (Producer)
       * @type {
       * {
       *          topic: string,
       *          kafka: object,
       *          producer: object
       *          producerSendOptions: object
       * }
       * }
       */
      constructor(kafkaConfig = {}) {
        this.#config = kafkaConfig;
        this.#producerInstance = this.#initProducer();
      }
    
      #initProducer() {
        const kafka = new Kafka(this.#config.kafka);
        const producer = kafka.producer({
          ...this.#config?.producer,
          createPartitioner: Partitioners.DefaultPartitioner,
        });
        return producer;
      }
    
      async produceEvent(events = {}) {
        try {
          if (Object.keys(events).length === 0) {
            throw new KafkaInstanceError(
              KAFKA_INSTANCE_ERRORS.EVENT_OBJECT_EMPTY,
              HTTP_RESPONSE_CODE.DATA_PROCESSING_ERROR,
              httpStatus.BAD_REQUEST,
            );
          }
    
          await this.#producerInstance.connect();
          logger.info('sending events to kafka notification');
    
          await this.#producerInstance.send({
            topic: this.#config.topic,
            acks: -1,
            messages: [events],
          });
    
          logger.info('event sent to kafka notification');
          return events;
        } catch (error) {
          logger.error(error.message);
          throw new KafkaInstanceError(
            error.message,
            error.code || HTTP_RESPONSE_CODE.INTERNAL_SERVER_ERROR,
            error.statusCode || httpStatus.INTERNAL_SERVER_ERROR,
          );
        } finally {
          await this.#producerInstance.disconnect();
        }
      }
    }
    
    module.exports = {
      Producer,
    };

Leave a Comment