From cec2526d3716a148c6073ea4dab5562b17be95d6 Mon Sep 17 00:00:00 2001 From: saridsa2 Date: Mon, 23 Mar 2026 09:17:33 +0530 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=202=20=E2=80=94=20missed=20call?= =?UTF-8?q?=20queue=20ingestion,=20auto-assignment,=20endpoints?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - MissedQueueService: polls Ozonetel abandonCalls every 30s, dedup by phone - Auto-assigns oldest PENDING_CALLBACK call on agent Ready (dispose + state change) - GET /api/worklist/missed-queue, PATCH /api/worklist/missed-queue/:id/status - Worklist query updated with callback fields and FIFO ordering - PlatformGraphqlService.query() made public for server-to-server ops - forwardRef circular dependency resolution between WorklistModule and OzonetelAgentModule Co-Authored-By: Claude Opus 4.6 (1M context) --- src/config/configuration.ts | 3 + src/ozonetel/ozonetel-agent.controller.ts | 50 ++++- src/ozonetel/ozonetel-agent.module.ts | 5 +- src/platform/platform-graphql.service.ts | 2 +- src/worklist/missed-queue.service.ts | 225 ++++++++++++++++++++++ src/worklist/worklist.controller.ts | 22 ++- src/worklist/worklist.module.ts | 9 +- src/worklist/worklist.service.ts | 4 +- 8 files changed, 310 insertions(+), 10 deletions(-) create mode 100644 src/worklist/missed-queue.service.ts diff --git a/src/config/configuration.ts b/src/config/configuration.ts index 416c9c2..638f5fa 100644 --- a/src/config/configuration.ts +++ b/src/config/configuration.ts @@ -12,6 +12,9 @@ export default () => ({ subdomain: process.env.EXOTEL_SUBDOMAIN ?? 'api.exotel.com', webhookSecret: process.env.EXOTEL_WEBHOOK_SECRET ?? '', }, + missedQueue: { + pollIntervalMs: parseInt(process.env.MISSED_QUEUE_POLL_INTERVAL_MS ?? '30000', 10), + }, ai: { provider: process.env.AI_PROVIDER ?? 'openai', anthropicApiKey: process.env.ANTHROPIC_API_KEY ?? '', diff --git a/src/ozonetel/ozonetel-agent.controller.ts b/src/ozonetel/ozonetel-agent.controller.ts index 40885e6..b8644c1 100644 --- a/src/ozonetel/ozonetel-agent.controller.ts +++ b/src/ozonetel/ozonetel-agent.controller.ts @@ -1,6 +1,8 @@ import { Controller, Post, Get, Body, Query, Logger, HttpException } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { OzonetelAgentService } from './ozonetel-agent.service'; +import { MissedQueueService } from '../worklist/missed-queue.service'; +import { PlatformGraphqlService } from '../platform/platform-graphql.service'; @Controller('api/ozonetel') export class OzonetelAgentController { @@ -13,6 +15,8 @@ export class OzonetelAgentController { constructor( private readonly ozonetelAgent: OzonetelAgentService, private readonly config: ConfigService, + private readonly missedQueue: MissedQueueService, + private readonly platform: PlatformGraphqlService, ) { this.defaultAgentId = config.get('OZONETEL_AGENT_ID') ?? 'agent3'; this.defaultAgentPassword = config.get('OZONETEL_AGENT_PASSWORD') ?? ''; @@ -74,6 +78,18 @@ export class OzonetelAgentController { const message = error.response?.data?.message ?? error.message ?? 'State change failed'; return { status: 'error', message }; } + + // Auto-assign missed call when agent goes Ready + if (body.state === 'Ready') { + try { + const assigned = await this.missedQueue.assignNext(this.defaultAgentId); + if (assigned) { + return { status: 'ok', message: `State changed to Ready. Assigned missed call ${assigned.id}`, assignedCall: assigned }; + } + } catch (err) { + this.logger.warn(`Auto-assignment on Ready failed: ${err}`); + } + } } @Post('agent-ready') @@ -109,6 +125,7 @@ export class OzonetelAgentController { durationSec?: number; leadId?: string; notes?: string; + missedCallId?: string; }, ) { if (!body.ucid || !body.disposition) { @@ -125,13 +142,40 @@ export class OzonetelAgentController { ucid: body.ucid, disposition: ozonetelDisposition, }); - return result; } catch (error: any) { const message = error.response?.data?.message ?? error.message ?? 'Disposition failed'; this.logger.error(`Dispose failed: ${message}`); - // Don't throw — disposition failure shouldn't block the UI - return { status: 'error', message }; } + + // Handle missed call callback status update + if (body.missedCallId) { + const statusMap: Record = { + APPOINTMENT_BOOKED: 'CALLBACK_COMPLETED', + INFO_PROVIDED: 'CALLBACK_COMPLETED', + FOLLOW_UP_SCHEDULED: 'CALLBACK_COMPLETED', + CALLBACK_REQUESTED: 'CALLBACK_COMPLETED', + WRONG_NUMBER: 'WRONG_NUMBER', + }; + const newStatus = statusMap[body.disposition]; + if (newStatus) { + try { + await this.platform.query( + `mutation { updateCall(id: "${body.missedCallId}", data: { callbackstatus: ${newStatus} }) { id } }`, + ); + } catch (err) { + this.logger.warn(`Failed to update missed call status: ${err}`); + } + } + } + + // Auto-assign next missed call to this agent + try { + await this.missedQueue.assignNext(this.defaultAgentId); + } catch (err) { + this.logger.warn(`Auto-assignment after dispose failed: ${err}`); + } + + return { status: 'ok' }; } @Post('dial') diff --git a/src/ozonetel/ozonetel-agent.module.ts b/src/ozonetel/ozonetel-agent.module.ts index bcecdc6..e6a8857 100644 --- a/src/ozonetel/ozonetel-agent.module.ts +++ b/src/ozonetel/ozonetel-agent.module.ts @@ -1,9 +1,12 @@ -import { Module } from '@nestjs/common'; +import { Module, forwardRef } from '@nestjs/common'; import { OzonetelAgentController } from './ozonetel-agent.controller'; import { OzonetelAgentService } from './ozonetel-agent.service'; import { KookooIvrController } from './kookoo-ivr.controller'; +import { WorklistModule } from '../worklist/worklist.module'; +import { PlatformModule } from '../platform/platform.module'; @Module({ + imports: [PlatformModule, forwardRef(() => WorklistModule)], controllers: [OzonetelAgentController, KookooIvrController], providers: [OzonetelAgentService], exports: [OzonetelAgentService], diff --git a/src/platform/platform-graphql.service.ts b/src/platform/platform-graphql.service.ts index d64d17a..cc8389a 100644 --- a/src/platform/platform-graphql.service.ts +++ b/src/platform/platform-graphql.service.ts @@ -14,7 +14,7 @@ export class PlatformGraphqlService { } // Server-to-server query using API key - private async query(query: string, variables?: Record): Promise { + async query(query: string, variables?: Record): Promise { return this.queryWithAuth(query, variables, `Bearer ${this.apiKey}`); } diff --git a/src/worklist/missed-queue.service.ts b/src/worklist/missed-queue.service.ts new file mode 100644 index 0000000..ee98d83 --- /dev/null +++ b/src/worklist/missed-queue.service.ts @@ -0,0 +1,225 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { PlatformGraphqlService } from '../platform/platform-graphql.service'; +import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service'; + +// Normalize phone to +91XXXXXXXXXX format +export function normalizePhone(raw: string): string { + let digits = raw.replace(/[^0-9]/g, ''); + // Strip leading country code variations: 0091, 91, 0 + if (digits.startsWith('0091')) digits = digits.slice(4); + else if (digits.startsWith('91') && digits.length > 10) digits = digits.slice(2); + else if (digits.startsWith('0') && digits.length > 10) digits = digits.slice(1); + return `+91${digits.slice(-10)}`; +} + +@Injectable() +export class MissedQueueService implements OnModuleInit { + private readonly logger = new Logger(MissedQueueService.name); + private readonly pollIntervalMs: number; + private readonly processedUcids = new Set(); + private assignmentMutex = false; + + constructor( + private readonly config: ConfigService, + private readonly platform: PlatformGraphqlService, + private readonly ozonetel: OzonetelAgentService, + ) { + this.pollIntervalMs = this.config.get('missedQueue.pollIntervalMs', 30000); + } + + onModuleInit() { + this.logger.log(`Starting missed call ingestion polling every ${this.pollIntervalMs}ms`); + setInterval(() => this.ingest().catch(err => this.logger.error('Ingestion failed', err)), this.pollIntervalMs); + } + + async ingest(): Promise<{ created: number; updated: number }> { + let created = 0; + let updated = 0; + + const now = new Date(); + const fiveMinAgo = new Date(now.getTime() - 5 * 60 * 1000); + const format = (d: Date) => d.toISOString().replace('T', ' ').slice(0, 19); + + let abandonCalls: any[]; + try { + abandonCalls = await this.ozonetel.getAbandonCalls({ fromTime: format(fiveMinAgo), toTime: format(now) }); + } catch (err) { + this.logger.warn(`Failed to fetch abandon calls: ${err}`); + return { created: 0, updated: 0 }; + } + + if (!abandonCalls?.length) return { created: 0, updated: 0 }; + + for (const call of abandonCalls) { + const ucid = call.monitorUCID; + if (!ucid || this.processedUcids.has(ucid)) continue; + this.processedUcids.add(ucid); + + const phone = normalizePhone(call.callerID || ''); + if (!phone || phone.length < 13) continue; + + const did = call.did || ''; + const callTime = call.callTime || new Date().toISOString(); + + try { + const existing = await this.platform.query( + `{ calls(first: 1, filter: { + callbackstatus: { eq: PENDING_CALLBACK }, + callerNumber: { primaryPhoneNumber: { eq: "${phone}" } } + }) { edges { node { id missedcallcount } } } }`, + ); + + const existingNode = existing?.calls?.edges?.[0]?.node; + + if (existingNode) { + const newCount = (existingNode.missedcallcount || 1) + 1; + await this.platform.query( + `mutation { updateCall(id: "${existingNode.id}", data: { + missedcallcount: ${newCount}, + startedAt: "${callTime}", + callsourcenumber: "${did}" + }) { id } }`, + ); + updated++; + this.logger.log(`Dedup missed call ${phone}: count now ${newCount}`); + } else { + await this.platform.query( + `mutation { createCall(data: { + callStatus: MISSED, + direction: INBOUND, + callerNumber: { primaryPhoneNumber: "${phone}", primaryPhoneCallingCode: "+91" }, + callsourcenumber: "${did}", + callbackstatus: PENDING_CALLBACK, + missedcallcount: 1, + startedAt: "${callTime}" + }) { id } }`, + ); + created++; + this.logger.log(`Created missed call record for ${phone}`); + } + } catch (err) { + this.logger.warn(`Failed to process abandon call ${ucid}: ${err}`); + } + } + + // Trim processedUcids to prevent unbounded growth + if (this.processedUcids.size > 500) { + const arr = Array.from(this.processedUcids); + this.processedUcids.clear(); + arr.slice(-200).forEach(u => this.processedUcids.add(u)); + } + + if (created || updated) this.logger.log(`Ingestion: ${created} created, ${updated} updated`); + return { created, updated }; + } + + async assignNext(agentName: string): Promise { + if (this.assignmentMutex) return null; + this.assignmentMutex = true; + + try { + // Find oldest unassigned PENDING_CALLBACK call (empty agentName) + let result = await this.platform.query( + `{ calls(first: 1, filter: { + callbackstatus: { eq: PENDING_CALLBACK }, + agentName: { eq: "" } + }, orderBy: [{ startedAt: AscNullsLast }]) { + edges { node { + id callerNumber { primaryPhoneNumber } + startedAt callsourcenumber missedcallcount + } } + } }`, + ); + + let call = result?.calls?.edges?.[0]?.node; + + // Also check for null agentName + if (!call) { + result = await this.platform.query( + `{ calls(first: 1, filter: { + callbackstatus: { eq: PENDING_CALLBACK }, + agentName: { is: NULL } + }, orderBy: [{ startedAt: AscNullsLast }]) { + edges { node { + id callerNumber { primaryPhoneNumber } + startedAt callsourcenumber missedcallcount + } } + } }`, + ); + call = result?.calls?.edges?.[0]?.node; + } + + if (!call) return null; + + await this.platform.query( + `mutation { updateCall(id: "${call.id}", data: { agentName: "${agentName}" }) { id } }`, + ); + this.logger.log(`Assigned missed call ${call.id} to ${agentName}`); + return call; + } catch (err) { + this.logger.warn(`Assignment failed: ${err}`); + return null; + } finally { + this.assignmentMutex = false; + } + } + + async updateStatus(callId: string, status: string, authHeader: string): Promise { + const validStatuses = ['PENDING_CALLBACK', 'CALLBACK_ATTEMPTED', 'CALLBACK_COMPLETED', 'INVALID', 'WRONG_NUMBER']; + if (!validStatuses.includes(status)) { + throw new Error(`Invalid status: ${status}. Must be one of: ${validStatuses.join(', ')}`); + } + + const dataParts: string[] = [`callbackstatus: ${status}`]; + if (status === 'CALLBACK_ATTEMPTED') { + dataParts.push(`callbackattemptedat: "${new Date().toISOString()}"`); + } + + return this.platform.queryWithAuth( + `mutation { updateCall(id: "${callId}", data: { ${dataParts.join(', ')} }) { id callbackstatus callbackattemptedat } }`, + undefined, + authHeader, + ); + } + + async getMissedQueue(agentName: string, authHeader: string): Promise<{ + pending: any[]; + attempted: any[]; + completed: any[]; + invalid: any[]; + }> { + const fields = `id name createdAt direction callStatus agentName + callerNumber { primaryPhoneNumber } + startedAt endedAt durationSec disposition leadId + callbackstatus callsourcenumber missedcallcount callbackattemptedat`; + + const buildQuery = (status: string) => `{ calls(first: 50, filter: { + agentName: { eq: "${agentName}" }, + callStatus: { eq: MISSED }, + callbackstatus: { eq: ${status} } + }, orderBy: [{ startedAt: AscNullsLast }]) { edges { node { ${fields} } } } }`; + + try { + const [pending, attempted, completed, invalid, wrongNumber] = await Promise.all([ + this.platform.queryWithAuth(buildQuery('PENDING_CALLBACK'), undefined, authHeader), + this.platform.queryWithAuth(buildQuery('CALLBACK_ATTEMPTED'), undefined, authHeader), + this.platform.queryWithAuth(buildQuery('CALLBACK_COMPLETED'), undefined, authHeader), + this.platform.queryWithAuth(buildQuery('INVALID'), undefined, authHeader), + this.platform.queryWithAuth(buildQuery('WRONG_NUMBER'), undefined, authHeader), + ]); + + const extract = (r: any) => r?.calls?.edges?.map((e: any) => e.node) ?? []; + + return { + pending: extract(pending), + attempted: extract(attempted), + completed: [...extract(completed), ...extract(wrongNumber)], + invalid: extract(invalid), + }; + } catch (err) { + this.logger.warn(`Failed to fetch missed queue: ${err}`); + return { pending: [], attempted: [], completed: [], invalid: [] }; + } + } +} diff --git a/src/worklist/worklist.controller.ts b/src/worklist/worklist.controller.ts index c87aaca..de3b9d5 100644 --- a/src/worklist/worklist.controller.ts +++ b/src/worklist/worklist.controller.ts @@ -1,6 +1,7 @@ -import { Controller, Get, Headers, HttpException, Logger } from '@nestjs/common'; +import { Controller, Get, Patch, Headers, Param, Body, HttpException, Logger } from '@nestjs/common'; import { PlatformGraphqlService } from '../platform/platform-graphql.service'; import { WorklistService } from './worklist.service'; +import { MissedQueueService } from './missed-queue.service'; @Controller('api/worklist') export class WorklistController { @@ -8,6 +9,7 @@ export class WorklistController { constructor( private readonly worklist: WorklistService, + private readonly missedQueue: MissedQueueService, private readonly platform: PlatformGraphqlService, ) {} @@ -23,6 +25,24 @@ export class WorklistController { return this.worklist.getWorklist(agentName, authHeader); } + @Get('missed-queue') + async getMissedQueue(@Headers('authorization') authHeader: string) { + if (!authHeader) throw new HttpException('Authorization header required', 401); + const agentName = await this.resolveAgentName(authHeader); + return this.missedQueue.getMissedQueue(agentName, authHeader); + } + + @Patch('missed-queue/:id/status') + async updateMissedCallStatus( + @Param('id') id: string, + @Headers('authorization') authHeader: string, + @Body() body: { status: string }, + ) { + if (!authHeader) throw new HttpException('Authorization header required', 401); + if (!body.status) throw new HttpException('status is required', 400); + return this.missedQueue.updateStatus(id, body.status, authHeader); + } + private async resolveAgentName(authHeader: string): Promise { try { const data = await this.platform.queryWithAuth( diff --git a/src/worklist/worklist.module.ts b/src/worklist/worklist.module.ts index 5b7731c..cfc64c6 100644 --- a/src/worklist/worklist.module.ts +++ b/src/worklist/worklist.module.ts @@ -1,13 +1,16 @@ -import { Module } from '@nestjs/common'; +import { Module, forwardRef } from '@nestjs/common'; import { PlatformModule } from '../platform/platform.module'; +import { OzonetelAgentModule } from '../ozonetel/ozonetel-agent.module'; import { WorklistController } from './worklist.controller'; import { WorklistService } from './worklist.service'; +import { MissedQueueService } from './missed-queue.service'; import { MissedCallWebhookController } from './missed-call-webhook.controller'; import { KookooCallbackController } from './kookoo-callback.controller'; @Module({ - imports: [PlatformModule], + imports: [PlatformModule, forwardRef(() => OzonetelAgentModule)], controllers: [WorklistController, MissedCallWebhookController, KookooCallbackController], - providers: [WorklistService], + providers: [WorklistService, MissedQueueService], + exports: [MissedQueueService], }) export class WorklistModule {} diff --git a/src/worklist/worklist.service.ts b/src/worklist/worklist.service.ts index 39a1624..81bc8bb 100644 --- a/src/worklist/worklist.service.ts +++ b/src/worklist/worklist.service.ts @@ -76,13 +76,15 @@ export class WorklistService { private async getMissedCalls(agentName: string, authHeader: string): Promise { try { + // FIFO ordering (AscNullsLast) — oldest first. Filter to active callback statuses only. const data = await this.platform.queryWithAuth( - `{ calls(first: 20, filter: { agentName: { eq: "${agentName}" }, callStatus: { eq: MISSED } }, orderBy: [{ startedAt: DescNullsLast }]) { edges { node { + `{ calls(first: 20, filter: { agentName: { eq: "${agentName}" }, callStatus: { eq: MISSED }, callbackstatus: { in: [PENDING_CALLBACK, CALLBACK_ATTEMPTED] } }, orderBy: [{ startedAt: AscNullsLast }]) { edges { node { id name createdAt direction callStatus agentName callerNumber { primaryPhoneNumber } startedAt endedAt durationSec disposition leadId + callbackstatus callsourcenumber missedcallcount callbackattemptedat } } } }`, undefined, authHeader,