Intro
Microservices architecture is gaining traction in enterprise-level applications for its ability to enhance scalability, maintainability, and flexibility. Event-Driven Architecture (EDA) is particularly beneficial in the Microservices context, enabling microservices to reduce dependencies, allowing each microservice to evolve autonomously.
Kafka has emerged as a leading choice for an event bus in EDA. Its popularity can be attributed to its scalability, fault-tolerance, and durability. Kafka's distributed nature allows it to handle large volumes of events with low latency, making it well-suited for the demands of modern, distributed systems. Additionally, Kafka's durability ensures that events are persisted, enabling reliable event delivery even in the face of failures.
Problem
According to Confluent, the creators of Kafka, effective topic management is crucial for both optimizing the performance and clarity of system. They recommend the following considerations:
- if one entity depends on another (e.g. an address belongs to a customer), or if they are often needed together, they might as well go in the same topic.
- If several consumers all read a particular group of topics, this suggests that maybe those topics should be combined. If you combine the fine-grained topics into coarser-grained ones, some consumers may receive unwanted events that they need to ignore. That is not a big deal: consuming messages from Kafka is very cheap, so even if a consumer ends up ignoring half of the events, the cost of this overconsumption is probably not significant.
nestjs/microservices
library would be the one you are going to use first when you introduce Microservices architecture(or EDA) in NestJS applications. While the library simplifies producing and consuming messages with Kafka in applications, it does come with a limitation. It is related to grouping events into a single topic, which has been recommended by Confluent as best practice.The library's current implementation uses the
@EventPattern
decorator to subscribe a topic to a handler method. However, this mapping of the pattern to a single topic can make it tricky to implement the Rule of Thumb for event grouping effectively.Solution
Before delving into the solution, let's revisit the fundamental goal – efficient event grouping using NestJS along with Kafka. We will try to follow rule of thumb that Confluent provided.
Events of the same type or category should be grouped within a single topic. This ensures that events with similar characteristics and purposes are logically organized.
Given the limitations outlined above, let's explore your solution to overcome these challenges and implement effective event grouping in NestJS using Kafka:
Implement custom ServerKafka strategy
This class extends
ServerKafka
and overrides the bindEvents
method to customize event binding logic. It parses the pattern registered by @EventPattern()
decorators to convert it into a single topic name, enabling event grouping. The consumer subscribes to the topic using converted name, ensuring proper application-wide event distribution.export class CustomServerKafka extends ServerKafka { constructor(readonly options: KafkaOptions['options'] & { deserializer: CustomKafkaRequestDeserializer }) { super(options); } override async bindEvents(consumer: Consumer): Promise<void> { const registeredPatterns = [...this.messageHandlers.keys()]; const consumerSubscribeOptions = this.options?.subscribe ?? {}; const subscribePattern = (pattern: string) => consumer.subscribe({ topic: HandlerPatternTopicTransformer.toTopic(pattern), ...consumerSubscribeOptions, }); await Promise.all(registeredPatterns.map(subscribePattern)); const consumerRunOptions = Object.assign(this.options?.run ?? {}, { eachMessage: this.getMessageHandler(), }); await consumer.run(consumerRunOptions); } }
Create custom Deserializer
This class implements the
Deserializer
interface to extract eventGroup
from the topic name and eventName
from the metadata of the message. It reconstructs the handler pattern using these details, allowing the consumer to invoke the proper handler methods in the application. export interface KafkaEvent<T extends Array<Record<keyof T, unknown>> | Record<keyof T, unknown>> { id: string; eventName: string; producedAt: string; payload: T; }
Â
import { Deserializer, IncomingEvent, IncomingRequest } from '@nestjs/microservices'; import { KafkaRequest } from '@nestjs/microservices/serializers'; export class CustomKafkaRequestDeserializer implements Deserializer { deserialize(data: KafkaRequest<KafkaEvent<unknown>>, options?: { channel: string }): IncomingRequest | IncomingEvent { if (!options) { return { pattern: undefined, data: undefined }; } const kafkaEvent = Object.assign({}, data.value); const { dataName: eventGroup } = HandlerPatternTopicTransformer.parseTopic(options.channel); return { pattern: HandlerPatternTopicTransformer.createHandlerPatternFrom({ eventGroup, eventName: kafkaEvent.eventName }), data: kafkaEvent.payload, }; } }
Connecting Microservices to NestApplication
On consumer microservice, you need to connect the microservice to the
NestApplication
instance using CustomServerKafka
and CustomKafkaRequestDeserializer
that we have implemented. import { NestFactory } from '@nestjs/core'; import { MicroserviceOptions } from '@nestjs/microservices'; import { AppModule } from './app.module'; async function bootstrap() { const app = await NestFactory.create(AppModule); app.connectMicroservice<MicroserviceOptions>({ strategy: new CustomServerKafka({ client: { brokers: ['localhost:9092'] }, deserializer: new CustomKafkaRequestDeserializer(), }), }); await app.startAllMicroservices(); } bootstrap();
MessageProducer
On producer microservice, it is required to produce message in a specified format. The
MessageProducer
class abstracts the complexity of message production and ensures adherence to the specified topic and message format rules.import { Inject } from '@nestjs/common'; import { ClientKafka } from '@nestjs/microservices'; import { v4 } from 'uuid'; import { HandlerPatternTopicTransformer } from '../common'; import { KafkaEvent } from '../custom-kafka/type'; import { KAFKA_CLIENT_INJECTION_TOKEN } from './constant'; import { EnqueueParams, LogParams, ProduceParams } from './type'; export class MessageProducer { constructor(@Inject(KAFKA_CLIENT_INJECTION_TOKEN) private readonly client: ClientKafka) {} enqueue<Payload>(params: EnqueueParams<Payload>) { const { eventGroup, eventName, payload } = params; this.produce<Payload>({ eventGroup, eventName, payload, message: 'queuing' }); } log<Payload>(params: LogParams<Payload>) { const { eventGroup, eventName, payload } = params; this.produce<Payload>({ eventGroup, eventName, payload, message: 'logging' }); } private produce<Payload>({ eventGroup, eventName, payload, message }: ProduceParams<Payload>) { const topic = HandlerPatternTopicTransformer.createTopicFrom({ message, dataName: eventGroup, dataFormat: 'json' }); const event: KafkaEvent<Payload> = { id: v4(), eventName, producedAt: new Date().toISOString(), payload }; this.client.emit(topic, event); } }
You can check the entire codes in following repository.
- GitHub repository: https://github.com/Pigrabbit/nest-kafka
Conclusion
Through the creation of
CustomServerKafka
and CustomKafkaRequestDeserializer
, we've forged a path to meet Confluent's guidelines, enabling the efficient grouping of events into single topics within NestJS applications.In conclusion, by understanding the principles of effective event grouping and implementing custom solutions tailored to their specific needs, developers can harness the power of EDA to build resilient and adaptable Microservices architectures that meet the demands of modern applications.
With these insights and tools at hand, developers are well-equipped to navigate the complexities of event-driven systems and drive innovation in their respective domains. If you have any further questions or require additional assistance, feel free to reach out.
Reference
Â