RegularDev

Intro

Recently, I encountered a challenge – I needed to access a KafkaJS consumer within a NestJS microservice, but there was no built-in method for this. After searching the Internet, I found out that creating a custom strategy was the key to gaining more control over the transport layer. Kamil Mysliwiec’s comment on a related GitHub issue pointed me in the right direction.

At first, I missed the important “inherit from existing one” part of Kamil Mysliwiec’s comment. As a result, I ended up handling all of the Kafka message processing logic by hand, implementing the generic CustomTransportStrategy interface only to retain the convenience of the @MessagePattern decorator.

This approach meant writing a lot of new code compared to the standard Transport.KAFKA usage in NestJS microservices. As a developer, I prefer using well-maintained and tested features instead of reinventing the wheel. So, I explored the source code of the built-in ServerKafka implementation for insights.

The Problem

Let’s take a closer look at the core issue. I had been dealing with KafkaJS consumer’s silent failures. There are ongoing discussions about this odd behavior on GitHub Issues. Additionally, my specific cases were worsened by connection issues between Kafka and my microservice. To tackle these problems, I decided to use a health check endpoint to reboot my microservice’s container whenever the consumer failed.

Accessing the KafkaJS Consumer Instance

A typical implementation of Transport.KAFKA in a NestJS microservice looks something like this:

// code of Transport.KAFKA NestJS microservice implementation
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { NestApplication, NestFactory } from '@nestjs/core';
import { AppModule } from 'src/app';
 
async function bootstrap() {
    const app: NestApplication = await NestFactory.create(AppModule);

    // Attach the Kafka transport to the HTTP application (hybrid application).
    const kafkaMicroservice = app.connectMicroservice<MicroserviceOptions>({
        transport: Transport.KAFKA,
        options: {
            client: {
                brokers: ['localhost:14244', 'localhost:14245'],
            },
            consumer: {
                groupId: 'some-group-id',
            },
        },
    });

    await kafkaMicroservice.listen();

    await app.listen(3030, '0.0.0.0');
}

bootstrap();

With a custom strategy, the same bootstrap code passes a strategy instance instead of a transport:

// code of Custom Strategy NestJS microservice implementation
import { MicroserviceOptions } from '@nestjs/microservices';
import { NestApplication, NestFactory } from '@nestjs/core';
import { AppModule } from 'src/app';
import { KafkaCustomStrategy } from 'src/custom-strategy';
 
async function bootstrap() {
    const app: NestApplication = await NestFactory.create(AppModule);

    // The options object matches the KafkaCustomStrategy constructor.
    const kafkaMicroservice = app.connectMicroservice<MicroserviceOptions>({
        strategy: new KafkaCustomStrategy({
            brokers: ['localhost:14244', 'localhost:14245'],
            groupId: 'some-group-id',
        }),
    });

    await kafkaMicroservice.listen();

    await app.listen(3030, '0.0.0.0');
}

bootstrap();

To use the strategy feature, we need to implement a class with the CustomTransportStrategy interface. In our KafkaJS case, it looks like this:

// code of implementation of CustomStrategy NestJS microservice class to use KafkaJS
import { Server, CustomTransportStrategy } from '@nestjs/microservices';
import { Consumer, Kafka, KafkaMessage } from 'kafkajs';
 
export class KafkaCustomStrategy extends Server implements CustomTransportStrategy {
    private readonly kafka: Kafka;
 
    private readonly consumer: Consumer;
 
    constructor(private readonly options: { clientId?: string; brokers: string[]; groupId: string }) {
        super();
        this.kafka = new Kafka({
            clientId: this.options.clientId,
            brokers: this.options.brokers,
        });
        this.consumer = this.kafka.consumer({ groupId: this.options.groupId });
    }
 
    async listen(callback: () => void) {
        await this.consumer.connect();
        const patterns = [...this.messageHandlers.keys()];
 
        await Promise.all(
            patterns.map(async (pattern) => {
                await this.consumer.subscribe({ topic: pattern });
            }),
        );
 
        await this.consumer.run({
            eachMessage: async ({ topic, partition, message }: { topic: string; partition: number; message: KafkaMessage }) => {
                const handler = this.getHandlerByPattern(topic);
 
                if (handler) {
                    const { value } = message;
 
                    await handler(value?.toString());
                }
            },
        });
 
        callback();
    }
 
    async close() {
        await this.consumer.disconnect();
    }
}

Pay attention to these lines in the ‘listen’ method:

// code that is needed for @MessagePattern to work properly
const patterns = [...this.messageHandlers.keys()];
 
await Promise.all(
	patterns.map(async (pattern) => {
		await this.consumer.subscribe({ topic: pattern });
	}),
);
 
await this.consumer.run({
	eachMessage: async ({ topic, partition, message }: { topic: string; partition: number; message: KafkaMessage }) => {
		const handler = this.getHandlerByPattern(topic);
 
		if (handler) {
			const { value } = message;
 
			await handler(value?.toString());
		}
	},
});

Use the messageHandlers property inherited from the Server class to collect the topic names the consumer should subscribe to. messageHandlers is a Map whose keys are the patterns (topic names, in our case) and whose values are the corresponding handlers. These handlers come from all controller methods marked with the @MessagePattern decorator. Similarly, the getHandlerByPattern method, also inherited from the Server class, retrieves a specific handler by topic name.
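To make the lookup more concrete, here is a minimal stand-alone sketch of a pattern-to-handler registry. It deliberately does not use NestJS: HandlerRegistry, addHandler, topics, and dispatch are hypothetical names chosen for illustration, and the sketch only mirrors the idea of a Map keyed by topic name, not the actual Server internals.

```typescript
// Hypothetical stand-in for the handler bookkeeping a transport server needs.
// None of these names come from NestJS.
type Handler = (payload: string | undefined) => Promise<unknown>;

class HandlerRegistry {
  private readonly handlers = new Map<string, Handler>();

  // Roughly what registering a @MessagePattern('topic') method boils down to.
  addHandler(topic: string, handler: Handler): void {
    this.handlers.set(topic, handler);
  }

  // Topic names the consumer should subscribe to.
  topics(): string[] {
    return Array.from(this.handlers.keys());
  }

  // Route an incoming message; resolves to undefined when no handler matches.
  async dispatch(topic: string, payload: string | undefined): Promise<unknown> {
    const handler = this.handlers.get(topic);
    return handler ? handler(payload) : undefined;
  }
}

const registry = new HandlerRegistry();
registry.addHandler('orders.created', async (payload) => `handled:${payload}`);
```

Messages for topics without a registered handler are silently skipped here, which matches the `if (handler)` guard in the strategy above.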

That’s quite a bit of new code, isn’t it? Fortunately, the built-in class used by default for Transport.KAFKA in NestJS microservices already implements the CustomTransportStrategy interface! Let’s examine the source code to locate where the Kafka consumer instance is stored:

// source code of ServerKafka (excerpt)
public async listen(
  callback: (err?: unknown, ...optionalParams: unknown[]) => void,
): Promise<void> {
  try {
    this.client = this.createClient();
    await this.start(callback);
  } catch (err) {
    callback(err);
  }
}
 
public async start(callback: () => void): Promise<void> {
  const consumerOptions = Object.assign(this.options.consumer || {}, {
    groupId: this.groupId,
  });
  this.consumer = this.client.consumer(consumerOptions);
  this.producer = this.client.producer(this.options.producer);
 
  await this.consumer.connect();
  await this.producer.connect();
  await this.bindEvents(this.consumer);
  callback();
}

There it is. The Kafka consumer is instantiated in the start method and stored in the ‘consumer’ property. When we invoke the listen method of a microservice, NestJS will call the ‘listen’ method of the ServerKafka class instance. This is an ideal place to access the consumer instance by extending the ServerKafka class and overriding the listen method:

// overridden listen method; the line marked with "+" is where we can reach the KafkaJS consumer
public async listen(
    callback: (err?: unknown, ...optionalParams: unknown[]) => void,
): Promise<void> {
    try {
        this.client = this.createClient();
        await this.start(callback);
 
+       this.setupConsumerHealthCheck();
 
    } catch (err) {
        callback(err);
    }
}

Setting Up a Health Check

Now that we’ve found the KafkaJS consumer instance, we need to connect its HEARTBEAT event with a health check endpoint. Let’s first set up the health check endpoint with @nestjs/terminus, following the NestJS documentation:

'health.controller.ts'

// code of several files about setting up Terminus health check
import { Controller, Get } from '@nestjs/common';
import { HealthCheckService, HealthCheck } from '@nestjs/terminus';
import { CustomConsumerHealthIndicator } from './custom-consumer.health';
 
@Controller('health')
export class HealthController {
  constructor(
    private health: HealthCheckService,
    private customConsumerHealthIndicator: CustomConsumerHealthIndicator,
  ) {}
 
  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () => this.customConsumerHealthIndicator.isHealthy(),
    ]);
  }
}

'health.module.ts'

import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
 
import { HealthController } from './health.controller';
import { CustomConsumerHealthIndicator } from './custom-consumer.health';
 
@Module({
    controllers: [HealthController],
    imports: [TerminusModule],
    providers: [CustomConsumerHealthIndicator],
})
export class HealthModule { }

'custom-consumer.health.ts'

import { Injectable } from '@nestjs/common';
import { HealthCheckError, HealthIndicator, HealthIndicatorResult } from '@nestjs/terminus';
 
@Injectable()
export class CustomConsumerHealthIndicator extends HealthIndicator {
    // Named differently from the isHealthy() method to avoid a duplicate identifier.
    private healthy = true;

    async isHealthy(): Promise<HealthIndicatorResult> {
        const currentStatus = this.healthy;
        const result = this.getStatus('kafka-consumer-health', currentStatus);

        if (currentStatus) {
            return result;
        }
        throw new HealthCheckError('Consumer is down!', result);
    }

    // Called by the Kafka strategy when heartbeats stop arriving.
    setIsNotHealthy() {
        this.healthy = false;
    }
}

Next, we need to use the CustomConsumerHealthIndicator in our ExtendedKafkaStrategy ‘listen’ method. We can do this by passing the CustomConsumerHealthIndicator instance to the ExtendedKafkaStrategy constructor:

// code that illustrates how to pass a health check service to the microservice
import { MicroserviceOptions } from '@nestjs/microservices';
import { NestApplication, NestFactory } from '@nestjs/core';
import { AppModule } from 'src/app';
import { CustomConsumerHealthIndicator } from 'src/modules/health/custom-consumer.health';
// Assumed path; adjust it to wherever ExtendedKafkaStrategy lives in your project.
import { ExtendedKafkaStrategy } from 'src/extended-kafka-strategy';
 
async function bootstrap() {
    const app: NestApplication = await NestFactory.create(AppModule);

    // Requires HealthModule to be imported by AppModule.
    const customConsumerHealthIndicator = app.get(CustomConsumerHealthIndicator);

    const kafkaMicroservice = app.connectMicroservice<MicroserviceOptions>({
        strategy: new ExtendedKafkaStrategy(
            {
                client: {
                    brokers: ['localhost:14244', 'localhost:14245'],
                },
                consumer: {
                    groupId: 'some-group-id',
                },
            },
            customConsumerHealthIndicator,
        ),
    });

    await kafkaMicroservice.listen();

    await app.listen(3030, '0.0.0.0');
}

bootstrap();

All set! Now, we can implement the ExtendedKafkaStrategy class, extending the ServerKafka class and accepting two arguments in the constructor: the Kafka options and the health indicator. We also need to override the listen method of the ServerKafka class to add logic for checking the consumer’s health by measuring intervals between HEARTBEAT events:

// code of ExtendedKafkaStrategy class
import { KafkaOptions, ServerKafka } from '@nestjs/microservices';
import { CustomConsumerHealthIndicator } from 'src/modules/health/custom-consumer.health';
 
const ALLOWED_DELAY = 600000;
const CHECK_INTERVAL = 10000;
 
export class ExtendedKafkaStrategy extends ServerKafka {
    lastHeartbeat: Date;
 
    constructor(
        protected readonly options: KafkaOptions['options'],
        protected readonly healthIndicator: CustomConsumerHealthIndicator,
    ) {
        super(options);
        this.lastHeartbeat = new Date();
    }
 
    public async listen(
        callback: (err?: unknown, ...optionalParams: unknown[]) => void,
    ): Promise<void> {
        try {
            this.client = this.createClient();
            await this.start(callback);
 
            this.setupConsumerHealthCheck();
 
        } catch (err) {
            callback(err);
        }
    }
 
    /*
     * Every 10 seconds, checks the time elapsed since the last HEARTBEAT event.
     * If it exceeds 10 minutes, the consumer is considered dead or unhealthy,
     * so the health check fails, which eventually leads to a container restart.
     */
    setupConsumerHealthCheck() {
        const { HEARTBEAT } = this.consumer.events;
 
        this.consumer.on(HEARTBEAT, () => {
            this.lastHeartbeat = new Date();
        });
 
        setInterval(() => {
            const now = new Date();
            const currentDelay = now.getTime() - this.lastHeartbeat.getTime();
 
            if (currentDelay >= ALLOWED_DELAY) {
                this.healthIndicator.setIsNotHealthy();
            }
        }, CHECK_INTERVAL);
    }
}

I chose a 10-minute delay as the allowed interval because KafkaJS consumer’s reconnection sometimes requires Kafka’s group rebalancing, which can take a few minutes.
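The timing logic can also be pulled out into a small helper that is easy to test without Kafka. The following is only a sketch under assumptions: HeartbeatWatchdog, beat, check, start, stop, and worstCaseDetectionMs are hypothetical names that belong to neither KafkaJS nor NestJS. It adds a stop method so the interval can be cleared when the strategy is closed, and it spells out the worst-case detection time for the values used above.

```typescript
// Hypothetical helper; names and structure are illustrative only.
class HeartbeatWatchdog {
  private lastHeartbeat: number;
  private timer?: ReturnType<typeof setInterval>;

  constructor(
    private readonly allowedDelayMs: number,
    private readonly onStale: () => void,
    now: number = Date.now(),
  ) {
    this.lastHeartbeat = now;
  }

  // Call this from the consumer's HEARTBEAT listener.
  beat(now: number = Date.now()): void {
    this.lastHeartbeat = now;
  }

  // Returns true (and invokes onStale) whenever the gap has reached the limit.
  check(now: number = Date.now()): boolean {
    const stale = now - this.lastHeartbeat >= this.allowedDelayMs;
    if (stale) {
      this.onStale();
    }
    return stale;
  }

  // Starts periodic checks, replacing any timer that is already running.
  start(checkIntervalMs: number): void {
    this.stop();
    this.timer = setInterval(() => this.check(), checkIntervalMs);
  }

  // Call this when the strategy closes so the timer does not outlive it.
  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }
}

// Upper bound on the time between the last heartbeat and the check that
// notices it went missing: the allowed delay plus one check interval.
const worstCaseDetectionMs = (allowedDelayMs: number, checkIntervalMs: number): number =>
  allowedDelayMs + checkIntervalMs;
```

With ALLOWED_DELAY = 600000 and CHECK_INTERVAL = 10000, a missing heartbeat is noticed at most 610000 ms (10 minutes and 10 seconds) after the last one. How quickly the container actually restarts then depends on how often your environment polls the health endpoint.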

Conclusion

That’s it! Now, if the consumer unexpectedly fails or encounters issues that cannot be resolved by automatic reconnection, your microservice can signal to your environment that it needs a reboot.