import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs'; import type { EventPayload } from './event-types'; type EventHandler = (payload: any) => Promise; @Injectable() export class EventBusService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(EventBusService.name); private kafka: Kafka; private producer: Producer; private consumer: Consumer; private handlers = new Map(); private connected = false; constructor() { const brokers = (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(','); this.kafka = new Kafka({ clientId: 'helix-engage-sidecar', brokers, retry: { retries: 5, initialRetryTime: 1000 }, logLevel: 1, // ERROR only }); this.producer = this.kafka.producer(); this.consumer = this.kafka.consumer({ groupId: 'helix-engage-workers' }); } async onModuleInit() { try { await this.producer.connect(); await this.consumer.connect(); this.connected = true; this.logger.log('Event bus connected (Kafka/Redpanda)'); // Subscribe to all topics we have handlers for // Handlers are registered by consumer modules during their onModuleInit // We start consuming after a short delay to let all handlers register setTimeout(() => this.startConsuming(), 2000); } catch (err: any) { this.logger.warn(`Event bus not available (${err.message}) — running without events`); this.connected = false; } } async onModuleDestroy() { if (this.connected) { await this.consumer.disconnect().catch(() => {}); await this.producer.disconnect().catch(() => {}); } } async emit(topic: string, payload: EventPayload): Promise { if (!this.connected) { this.logger.debug(`[EVENT] Skipped (not connected): ${topic}`); return; } try { await this.producer.send({ topic, messages: [{ value: JSON.stringify(payload), timestamp: Date.now().toString() }], }); this.logger.log(`[EVENT] Emitted: ${topic}`); } catch (err: any) { this.logger.error(`[EVENT] Failed to emit ${topic}: ${err.message}`); } } on(topic: string, handler: EventHandler): void { const existing = this.handlers.get(topic) ?? []; existing.push(handler); this.handlers.set(topic, existing); this.logger.log(`[EVENT] Handler registered for: ${topic}`); } private async startConsuming(): Promise { if (!this.connected) return; const topics = Array.from(this.handlers.keys()); if (topics.length === 0) { this.logger.log('[EVENT] No handlers registered — skipping consumer'); return; } try { for (const topic of topics) { await this.consumer.subscribe({ topic, fromBeginning: false }); } await this.consumer.run({ eachMessage: async (payload: EachMessagePayload) => { const { topic, message } = payload; const handlers = this.handlers.get(topic) ?? []; if (handlers.length === 0 || !message.value) return; try { const data = JSON.parse(message.value.toString()); for (const handler of handlers) { await handler(data).catch(err => this.logger.error(`[EVENT] Handler error on ${topic}: ${err.message}`), ); } } catch (err: any) { this.logger.error(`[EVENT] Parse error on ${topic}: ${err.message}`); } }, }); this.logger.log(`[EVENT] Consuming: ${topics.join(', ')}`); } catch (err: any) { this.logger.error(`[EVENT] Consumer failed: ${err.message}`); } } }