diff --git a/package-lock.json b/package-lock.json index 04aac62..5d99507 100644 --- a/package-lock.json +++ b/package-lock.json @@ -24,6 +24,7 @@ "ai": "^6.0.116", "axios": "^1.13.6", "ioredis": "^5.10.1", + "kafkajs": "^2.2.4", "reflect-metadata": "^0.2.2", "rxjs": "^7.8.1", "socket.io": "^4.8.3", @@ -10569,6 +10570,15 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/kafkajs": { + "version": "2.2.4", + "resolved": "http://localhost:4873/kafkajs/-/kafkajs-2.2.4.tgz", + "integrity": "sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==", + "license": "MIT", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/keyv": { "version": "4.5.4", "resolved": "http://localhost:4873/keyv/-/keyv-4.5.4.tgz", diff --git a/package.json b/package.json index 1dc9ec1..50b5bcd 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "ai": "^6.0.116", "axios": "^1.13.6", "ioredis": "^5.10.1", + "kafkajs": "^2.2.4", "reflect-metadata": "^0.2.2", "rxjs": "^7.8.1", "socket.io": "^4.8.3", diff --git a/src/app.module.ts b/src/app.module.ts index 5ea0383..6e7bc18 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -15,6 +15,7 @@ import { SearchModule } from './search/search.module'; import { SupervisorModule } from './supervisor/supervisor.module'; import { MaintModule } from './maint/maint.module'; import { RecordingsModule } from './recordings/recordings.module'; +import { EventsModule } from './events/events.module'; @Module({ imports: [ @@ -36,6 +37,7 @@ import { RecordingsModule } from './recordings/recordings.module'; SupervisorModule, MaintModule, RecordingsModule, + EventsModule, ], }) export class AppModule {} diff --git a/src/events/consumers/ai-insight.consumer.ts b/src/events/consumers/ai-insight.consumer.ts new file mode 100644 index 0000000..63cddff --- /dev/null +++ b/src/events/consumers/ai-insight.consumer.ts @@ -0,0 +1,119 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { generateObject } from 'ai'; +import { z } from 'zod'; +import { EventBusService } from '../event-bus.service'; +import { Topics } from '../event-types'; +import type { CallCompletedEvent } from '../event-types'; +import { PlatformGraphqlService } from '../../platform/platform-graphql.service'; +import { createAiModel } from '../../ai/ai-provider'; +import type { LanguageModel } from 'ai'; + +@Injectable() +export class AiInsightConsumer implements OnModuleInit { + private readonly logger = new Logger(AiInsightConsumer.name); + private readonly aiModel: LanguageModel | null; + + constructor( + private eventBus: EventBusService, + private platform: PlatformGraphqlService, + private config: ConfigService, + ) { + this.aiModel = createAiModel(config); + } + + onModuleInit() { + this.eventBus.on(Topics.CALL_COMPLETED, (event: CallCompletedEvent) => this.handleCallCompleted(event)); + } + + private async handleCallCompleted(event: CallCompletedEvent): Promise { + if (!event.leadId) { + this.logger.debug('[AI-INSIGHT] No leadId — skipping'); + return; + } + + if (!this.aiModel) { + this.logger.debug('[AI-INSIGHT] No AI model configured — skipping'); + return; + } + + this.logger.log(`[AI-INSIGHT] Generating insight for lead ${event.leadId}`); + + try { + // Fetch lead + all activities + const data = await this.platform.query( + `{ leads(filter: { id: { eq: "${event.leadId}" } }) { edges { node { + id name contactName { firstName lastName } + status source interestedService + contactAttempts lastContacted + } } } }`, + ); + const lead = data?.leads?.edges?.[0]?.node; + if (!lead) return; + + const activityData = await this.platform.query( + `{ leadActivities(first: 20, filter: { leadId: { eq: "${event.leadId}" } }, orderBy: [{ occurredAt: DescNullsLast }]) { + edges { node { activityType summary occurredAt channel durationSec outcome } } + } }`, + ); + const activities = activityData?.leadActivities?.edges?.map((e: any) => e.node) ?? []; + + const leadName = lead.contactName + ? `${lead.contactName.firstName ?? ''} ${lead.contactName.lastName ?? ''}`.trim() + : lead.name ?? 'Unknown'; + + // Build context + const activitySummary = activities.map((a: any) => + `${a.activityType}: ${a.summary} (${a.occurredAt ?? 'unknown date'})`, + ).join('\n'); + + // Generate insight + const { object } = await generateObject({ + model: this.aiModel, + schema: z.object({ + summary: z.string().describe('2-3 sentence summary of this lead based on all their interactions'), + suggestedAction: z.string().describe('One clear next action for the agent'), + }), + system: `You are a CRM assistant for Global Hospital Bangalore. +Generate a brief, actionable insight about this lead based on their interaction history. +Be specific — reference actual dates, dispositions, and patterns. +If the lead has booked appointments, mention upcoming ones. +If they keep calling about the same thing, note the pattern.`, + prompt: `Lead: ${leadName} +Status: ${lead.status ?? 'Unknown'} +Source: ${lead.source ?? 'Unknown'} +Interested in: ${lead.interestedService ?? 'Not specified'} +Contact attempts: ${lead.contactAttempts ?? 0} +Last contacted: ${lead.lastContacted ?? 'Never'} + +Recent activity (newest first): +${activitySummary || 'No activity recorded'} + +Latest call: +- Direction: ${event.direction} +- Duration: ${event.durationSec}s +- Disposition: ${event.disposition} +- Notes: ${event.notes ?? 'None'}`, + maxOutputTokens: 200, + }); + + // Update lead with new AI insight + await this.platform.query( + `mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`, + { + id: event.leadId, + data: { + aiSummary: object.summary, + aiSuggestedAction: object.suggestedAction, + lastContacted: new Date().toISOString(), + contactAttempts: (lead.contactAttempts ?? 0) + 1, + }, + }, + ); + + this.logger.log(`[AI-INSIGHT] Updated lead ${event.leadId}: "${object.summary.substring(0, 60)}..."`); + } catch (err: any) { + this.logger.error(`[AI-INSIGHT] Failed for lead ${event.leadId}: ${err.message}`); + } + } +} diff --git a/src/events/event-bus.service.ts b/src/events/event-bus.service.ts new file mode 100644 index 0000000..3903517 --- /dev/null +++ b/src/events/event-bus.service.ts @@ -0,0 +1,114 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs'; +import type { EventPayload } from './event-types'; + +type EventHandler = (payload: any) => Promise; + +@Injectable() +export class EventBusService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(EventBusService.name); + private kafka: Kafka; + private producer: Producer; + private consumer: Consumer; + private handlers = new Map(); + private connected = false; + + constructor() { + const brokers = (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(','); + this.kafka = new Kafka({ + clientId: 'helix-engage-sidecar', + brokers, + retry: { retries: 5, initialRetryTime: 1000 }, + logLevel: 1, // ERROR only + }); + this.producer = this.kafka.producer(); + this.consumer = this.kafka.consumer({ groupId: 'helix-engage-workers' }); + } + + async onModuleInit() { + try { + await this.producer.connect(); + await this.consumer.connect(); + this.connected = true; + this.logger.log('Event bus connected (Kafka/Redpanda)'); + + // Subscribe to all topics we have handlers for + // Handlers are registered by consumer modules during their onModuleInit + // We start consuming after a short delay to let all handlers register + setTimeout(() => this.startConsuming(), 2000); + } catch (err: any) { + this.logger.warn(`Event bus not available (${err.message}) — running without events`); + this.connected = false; + } + } + + async onModuleDestroy() { + if (this.connected) { + await this.consumer.disconnect().catch(() => {}); + await this.producer.disconnect().catch(() => {}); + } + } + + async emit(topic: string, payload: EventPayload): Promise { + if (!this.connected) { + this.logger.debug(`[EVENT] Skipped (not connected): ${topic}`); + return; + } + + try { + await this.producer.send({ + topic, + messages: [{ value: JSON.stringify(payload), timestamp: Date.now().toString() }], + }); + this.logger.log(`[EVENT] Emitted: ${topic}`); + } catch (err: any) { + this.logger.error(`[EVENT] Failed to emit ${topic}: ${err.message}`); + } + } + + on(topic: string, handler: EventHandler): void { + const existing = this.handlers.get(topic) ?? []; + existing.push(handler); + this.handlers.set(topic, existing); + this.logger.log(`[EVENT] Handler registered for: ${topic}`); + } + + private async startConsuming(): Promise { + if (!this.connected) return; + + const topics = Array.from(this.handlers.keys()); + if (topics.length === 0) { + this.logger.log('[EVENT] No handlers registered — skipping consumer'); + return; + } + + try { + for (const topic of topics) { + await this.consumer.subscribe({ topic, fromBeginning: false }); + } + + await this.consumer.run({ + eachMessage: async (payload: EachMessagePayload) => { + const { topic, message } = payload; + const handlers = this.handlers.get(topic) ?? []; + if (handlers.length === 0 || !message.value) return; + + try { + const data = JSON.parse(message.value.toString()); + for (const handler of handlers) { + await handler(data).catch(err => + this.logger.error(`[EVENT] Handler error on ${topic}: ${err.message}`), + ); + } + } catch (err: any) { + this.logger.error(`[EVENT] Parse error on ${topic}: ${err.message}`); + } + }, + }); + + this.logger.log(`[EVENT] Consuming: ${topics.join(', ')}`); + } catch (err: any) { + this.logger.error(`[EVENT] Consumer failed: ${err.message}`); + } + } +} diff --git a/src/events/event-types.ts b/src/events/event-types.ts new file mode 100644 index 0000000..ff93fc0 --- /dev/null +++ b/src/events/event-types.ts @@ -0,0 +1,36 @@ +// Event topic names +export const Topics = { + CALL_COMPLETED: 'call.completed', + CALL_MISSED: 'call.missed', + AGENT_STATE: 'agent.state', +} as const; + +// Event payloads +export type CallCompletedEvent = { + callId: string | null; + ucid: string; + agentId: string; + callerPhone: string; + direction: string; + durationSec: number; + disposition: string; + leadId: string | null; + notes: string | null; + timestamp: string; +}; + +export type CallMissedEvent = { + callId: string | null; + callerPhone: string; + leadId: string | null; + leadName: string | null; + timestamp: string; +}; + +export type AgentStateEvent = { + agentId: string; + state: string; + timestamp: string; +}; + +export type EventPayload = CallCompletedEvent | CallMissedEvent | AgentStateEvent; diff --git a/src/events/events.module.ts b/src/events/events.module.ts new file mode 100644 index 0000000..b612a79 --- /dev/null +++ b/src/events/events.module.ts @@ -0,0 +1,12 @@ +import { Module, Global } from '@nestjs/common'; +import { PlatformModule } from '../platform/platform.module'; +import { EventBusService } from './event-bus.service'; +import { AiInsightConsumer } from './consumers/ai-insight.consumer'; + +@Global() +@Module({ + imports: [PlatformModule], + providers: [EventBusService, AiInsightConsumer], + exports: [EventBusService], +}) +export class EventsModule {} diff --git a/src/ozonetel/ozonetel-agent.controller.ts b/src/ozonetel/ozonetel-agent.controller.ts index f7e1597..80740d5 100644 --- a/src/ozonetel/ozonetel-agent.controller.ts +++ b/src/ozonetel/ozonetel-agent.controller.ts @@ -3,6 +3,8 @@ import { ConfigService } from '@nestjs/config'; import { OzonetelAgentService } from './ozonetel-agent.service'; import { MissedQueueService } from '../worklist/missed-queue.service'; import { PlatformGraphqlService } from '../platform/platform-graphql.service'; +import { EventBusService } from '../events/event-bus.service'; +import { Topics } from '../events/event-types'; @Controller('api/ozonetel') export class OzonetelAgentController { @@ -17,6 +19,7 @@ export class OzonetelAgentController { private readonly config: ConfigService, private readonly missedQueue: MissedQueueService, private readonly platform: PlatformGraphqlService, + private readonly eventBus: EventBusService, ) { this.defaultAgentId = config.get('OZONETEL_AGENT_ID') ?? 'agent3'; this.defaultAgentPassword = config.get('OZONETEL_AGENT_PASSWORD') ?? ''; @@ -161,6 +164,20 @@ export class OzonetelAgentController { this.logger.warn(`Auto-assignment after dispose failed: ${err}`); } + // Emit event for downstream processing (AI insights, metrics, etc.) + this.eventBus.emit(Topics.CALL_COMPLETED, { + callId: null, + ucid: body.ucid, + agentId: this.defaultAgentId, + callerPhone: body.callerPhone ?? '', + direction: body.direction ?? 'INBOUND', + durationSec: body.durationSec ?? 0, + disposition: body.disposition, + leadId: body.leadId ?? null, + notes: body.notes ?? null, + timestamp: new Date().toISOString(), + }).catch(() => {}); + return { status: 'ok' }; }