mirror of
https://dev.azure.com/globalhealthx/EMR/_git/helix-engage-server
synced 2026-04-11 18:08:16 +00:00
feat: Phase 2 — missed call queue ingestion, auto-assignment, endpoints
- MissedQueueService: polls Ozonetel abandonCalls every 30s, dedup by phone - Auto-assigns oldest PENDING_CALLBACK call on agent Ready (dispose + state change) - GET /api/worklist/missed-queue, PATCH /api/worklist/missed-queue/:id/status - Worklist query updated with callback fields and FIFO ordering - PlatformGraphqlService.query() made public for server-to-server ops - forwardRef circular dependency resolution between WorklistModule and OzonetelAgentModule Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
225
src/worklist/missed-queue.service.ts
Normal file
225
src/worklist/missed-queue.service.ts
Normal file
@@ -0,0 +1,225 @@
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
|
||||
|
||||
// Normalize phone to +91XXXXXXXXXX format
|
||||
export function normalizePhone(raw: string): string {
|
||||
let digits = raw.replace(/[^0-9]/g, '');
|
||||
// Strip leading country code variations: 0091, 91, 0
|
||||
if (digits.startsWith('0091')) digits = digits.slice(4);
|
||||
else if (digits.startsWith('91') && digits.length > 10) digits = digits.slice(2);
|
||||
else if (digits.startsWith('0') && digits.length > 10) digits = digits.slice(1);
|
||||
return `+91${digits.slice(-10)}`;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class MissedQueueService implements OnModuleInit {
|
||||
private readonly logger = new Logger(MissedQueueService.name);
|
||||
private readonly pollIntervalMs: number;
|
||||
private readonly processedUcids = new Set<string>();
|
||||
private assignmentMutex = false;
|
||||
|
||||
constructor(
|
||||
private readonly config: ConfigService,
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
private readonly ozonetel: OzonetelAgentService,
|
||||
) {
|
||||
this.pollIntervalMs = this.config.get<number>('missedQueue.pollIntervalMs', 30000);
|
||||
}
|
||||
|
||||
onModuleInit() {
|
||||
this.logger.log(`Starting missed call ingestion polling every ${this.pollIntervalMs}ms`);
|
||||
setInterval(() => this.ingest().catch(err => this.logger.error('Ingestion failed', err)), this.pollIntervalMs);
|
||||
}
|
||||
|
||||
async ingest(): Promise<{ created: number; updated: number }> {
|
||||
let created = 0;
|
||||
let updated = 0;
|
||||
|
||||
const now = new Date();
|
||||
const fiveMinAgo = new Date(now.getTime() - 5 * 60 * 1000);
|
||||
const format = (d: Date) => d.toISOString().replace('T', ' ').slice(0, 19);
|
||||
|
||||
let abandonCalls: any[];
|
||||
try {
|
||||
abandonCalls = await this.ozonetel.getAbandonCalls({ fromTime: format(fiveMinAgo), toTime: format(now) });
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to fetch abandon calls: ${err}`);
|
||||
return { created: 0, updated: 0 };
|
||||
}
|
||||
|
||||
if (!abandonCalls?.length) return { created: 0, updated: 0 };
|
||||
|
||||
for (const call of abandonCalls) {
|
||||
const ucid = call.monitorUCID;
|
||||
if (!ucid || this.processedUcids.has(ucid)) continue;
|
||||
this.processedUcids.add(ucid);
|
||||
|
||||
const phone = normalizePhone(call.callerID || '');
|
||||
if (!phone || phone.length < 13) continue;
|
||||
|
||||
const did = call.did || '';
|
||||
const callTime = call.callTime || new Date().toISOString();
|
||||
|
||||
try {
|
||||
const existing = await this.platform.query<any>(
|
||||
`{ calls(first: 1, filter: {
|
||||
callbackstatus: { eq: PENDING_CALLBACK },
|
||||
callerNumber: { primaryPhoneNumber: { eq: "${phone}" } }
|
||||
}) { edges { node { id missedcallcount } } } }`,
|
||||
);
|
||||
|
||||
const existingNode = existing?.calls?.edges?.[0]?.node;
|
||||
|
||||
if (existingNode) {
|
||||
const newCount = (existingNode.missedcallcount || 1) + 1;
|
||||
await this.platform.query<any>(
|
||||
`mutation { updateCall(id: "${existingNode.id}", data: {
|
||||
missedcallcount: ${newCount},
|
||||
startedAt: "${callTime}",
|
||||
callsourcenumber: "${did}"
|
||||
}) { id } }`,
|
||||
);
|
||||
updated++;
|
||||
this.logger.log(`Dedup missed call ${phone}: count now ${newCount}`);
|
||||
} else {
|
||||
await this.platform.query<any>(
|
||||
`mutation { createCall(data: {
|
||||
callStatus: MISSED,
|
||||
direction: INBOUND,
|
||||
callerNumber: { primaryPhoneNumber: "${phone}", primaryPhoneCallingCode: "+91" },
|
||||
callsourcenumber: "${did}",
|
||||
callbackstatus: PENDING_CALLBACK,
|
||||
missedcallcount: 1,
|
||||
startedAt: "${callTime}"
|
||||
}) { id } }`,
|
||||
);
|
||||
created++;
|
||||
this.logger.log(`Created missed call record for ${phone}`);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to process abandon call ${ucid}: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Trim processedUcids to prevent unbounded growth
|
||||
if (this.processedUcids.size > 500) {
|
||||
const arr = Array.from(this.processedUcids);
|
||||
this.processedUcids.clear();
|
||||
arr.slice(-200).forEach(u => this.processedUcids.add(u));
|
||||
}
|
||||
|
||||
if (created || updated) this.logger.log(`Ingestion: ${created} created, ${updated} updated`);
|
||||
return { created, updated };
|
||||
}
|
||||
|
||||
async assignNext(agentName: string): Promise<any | null> {
|
||||
if (this.assignmentMutex) return null;
|
||||
this.assignmentMutex = true;
|
||||
|
||||
try {
|
||||
// Find oldest unassigned PENDING_CALLBACK call (empty agentName)
|
||||
let result = await this.platform.query<any>(
|
||||
`{ calls(first: 1, filter: {
|
||||
callbackstatus: { eq: PENDING_CALLBACK },
|
||||
agentName: { eq: "" }
|
||||
}, orderBy: [{ startedAt: AscNullsLast }]) {
|
||||
edges { node {
|
||||
id callerNumber { primaryPhoneNumber }
|
||||
startedAt callsourcenumber missedcallcount
|
||||
} }
|
||||
} }`,
|
||||
);
|
||||
|
||||
let call = result?.calls?.edges?.[0]?.node;
|
||||
|
||||
// Also check for null agentName
|
||||
if (!call) {
|
||||
result = await this.platform.query<any>(
|
||||
`{ calls(first: 1, filter: {
|
||||
callbackstatus: { eq: PENDING_CALLBACK },
|
||||
agentName: { is: NULL }
|
||||
}, orderBy: [{ startedAt: AscNullsLast }]) {
|
||||
edges { node {
|
||||
id callerNumber { primaryPhoneNumber }
|
||||
startedAt callsourcenumber missedcallcount
|
||||
} }
|
||||
} }`,
|
||||
);
|
||||
call = result?.calls?.edges?.[0]?.node;
|
||||
}
|
||||
|
||||
if (!call) return null;
|
||||
|
||||
await this.platform.query<any>(
|
||||
`mutation { updateCall(id: "${call.id}", data: { agentName: "${agentName}" }) { id } }`,
|
||||
);
|
||||
this.logger.log(`Assigned missed call ${call.id} to ${agentName}`);
|
||||
return call;
|
||||
} catch (err) {
|
||||
this.logger.warn(`Assignment failed: ${err}`);
|
||||
return null;
|
||||
} finally {
|
||||
this.assignmentMutex = false;
|
||||
}
|
||||
}
|
||||
|
||||
async updateStatus(callId: string, status: string, authHeader: string): Promise<any> {
|
||||
const validStatuses = ['PENDING_CALLBACK', 'CALLBACK_ATTEMPTED', 'CALLBACK_COMPLETED', 'INVALID', 'WRONG_NUMBER'];
|
||||
if (!validStatuses.includes(status)) {
|
||||
throw new Error(`Invalid status: ${status}. Must be one of: ${validStatuses.join(', ')}`);
|
||||
}
|
||||
|
||||
const dataParts: string[] = [`callbackstatus: ${status}`];
|
||||
if (status === 'CALLBACK_ATTEMPTED') {
|
||||
dataParts.push(`callbackattemptedat: "${new Date().toISOString()}"`);
|
||||
}
|
||||
|
||||
return this.platform.queryWithAuth<any>(
|
||||
`mutation { updateCall(id: "${callId}", data: { ${dataParts.join(', ')} }) { id callbackstatus callbackattemptedat } }`,
|
||||
undefined,
|
||||
authHeader,
|
||||
);
|
||||
}
|
||||
|
||||
async getMissedQueue(agentName: string, authHeader: string): Promise<{
|
||||
pending: any[];
|
||||
attempted: any[];
|
||||
completed: any[];
|
||||
invalid: any[];
|
||||
}> {
|
||||
const fields = `id name createdAt direction callStatus agentName
|
||||
callerNumber { primaryPhoneNumber }
|
||||
startedAt endedAt durationSec disposition leadId
|
||||
callbackstatus callsourcenumber missedcallcount callbackattemptedat`;
|
||||
|
||||
const buildQuery = (status: string) => `{ calls(first: 50, filter: {
|
||||
agentName: { eq: "${agentName}" },
|
||||
callStatus: { eq: MISSED },
|
||||
callbackstatus: { eq: ${status} }
|
||||
}, orderBy: [{ startedAt: AscNullsLast }]) { edges { node { ${fields} } } } }`;
|
||||
|
||||
try {
|
||||
const [pending, attempted, completed, invalid, wrongNumber] = await Promise.all([
|
||||
this.platform.queryWithAuth<any>(buildQuery('PENDING_CALLBACK'), undefined, authHeader),
|
||||
this.platform.queryWithAuth<any>(buildQuery('CALLBACK_ATTEMPTED'), undefined, authHeader),
|
||||
this.platform.queryWithAuth<any>(buildQuery('CALLBACK_COMPLETED'), undefined, authHeader),
|
||||
this.platform.queryWithAuth<any>(buildQuery('INVALID'), undefined, authHeader),
|
||||
this.platform.queryWithAuth<any>(buildQuery('WRONG_NUMBER'), undefined, authHeader),
|
||||
]);
|
||||
|
||||
const extract = (r: any) => r?.calls?.edges?.map((e: any) => e.node) ?? [];
|
||||
|
||||
return {
|
||||
pending: extract(pending),
|
||||
attempted: extract(attempted),
|
||||
completed: [...extract(completed), ...extract(wrongNumber)],
|
||||
invalid: extract(invalid),
|
||||
};
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to fetch missed queue: ${err}`);
|
||||
return { pending: [], attempted: [], completed: [], invalid: [] };
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user