mirror of
https://dev.azure.com/globalhealthx/EMR/_git/helix-engage-server
synced 2026-04-11 10:07:22 +00:00
- SSE agent state stream: supervisor maintains state map from Ozonetel webhooks, streams via /api/supervisor/agent-state/stream
- Force-logout via SSE: distinct force-logout event type avoids conflict with normal login cycle
- Maint module (/api/maint): OTP-guarded endpoints for force-ready, unlock-agent, backfill-missed-calls, fix-timestamps
- Fix Ozonetel IST→UTC timestamp conversion: istToUtc() in webhook controller and missed-queue service
- Missed call lead lookup: ingestion queries leads by phone, stores leadId + leadName on Call entity
- Timestamp backfill endpoint: throttled at 700ms/mutation, idempotent (skips already-fixed records)
- Structured logging: full JSON payloads for agent/call webhooks, [DISPOSE] trace with agentId
- Fix dead code: agent-state endpoint auto-assign was after return statement
- Export SupervisorService for cross-module injection

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
264 lines
12 KiB
TypeScript
264 lines
12 KiB
TypeScript
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
|
import { ConfigService } from '@nestjs/config';
|
|
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
|
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
|
|
|
|
// Ozonetel sends all timestamps in IST — convert to UTC for storage
|
|
/**
 * Converts an IST (UTC+05:30) wall-clock timestamp into an ISO-8601 UTC string.
 *
 * Strings shaped like `YYYY-MM-DD HH:mm[:ss[.SSS]]` (space or `T` separator, no
 * timezone designator) are read field by field and shifted with UTC arithmetic,
 * so the result does not depend on the timezone the server process runs in.
 * Any other parseable format falls back to the legacy behaviour: parse with
 * `new Date()` and subtract 330 minutes.
 *
 * @param istDateStr - Timestamp expressed in IST, or `null`.
 * @returns The same instant as `Date#toISOString()` output, or `null` when the
 *          input is empty, unparseable, or names an impossible date/time.
 */
export function istToUtc(istDateStr: string | null): string | null {
  if (!istDateStr) return null;

  const IST_OFFSET_MINUTES = 330; // IST is UTC+5:30

  // Callers pass loosely-typed webhook fields, so guard before using string methods.
  const wallClockParts =
    typeof istDateStr === 'string'
      ? /^(\d{4})-(\d{2})-(\d{2})[ T](\d{2}):(\d{2})(?::(\d{2})(?:\.(\d{1,3})\d*)?)?$/.exec(istDateStr.trim())
      : null;

  if (wallClockParts) {
    const year = Number(wallClockParts[1]);
    const month = Number(wallClockParts[2]);
    const day = Number(wallClockParts[3]);
    const hour = Number(wallClockParts[4]);
    const minute = Number(wallClockParts[5]);
    const second = Number(wallClockParts[6] ?? 0);
    const millis = Number((wallClockParts[7] ?? '').padEnd(3, '0'));

    // Build the wall-clock reading as if it were UTC, then verify nothing rolled
    // over (month 13, day 31 in a 30-day month, hour 25, ...) before shifting.
    const wallClock = new Date(Date.UTC(year, month - 1, day, hour, minute, second, millis));
    const fieldsRoundTrip =
      wallClock.getUTCFullYear() === year &&
      wallClock.getUTCMonth() === month - 1 &&
      wallClock.getUTCDate() === day &&
      wallClock.getUTCHours() === hour &&
      wallClock.getUTCMinutes() === minute &&
      wallClock.getUTCSeconds() === second;
    if (!fieldsRoundTrip) return null;

    return new Date(wallClock.getTime() - IST_OFFSET_MINUTES * 60 * 1000).toISOString();
  }

  // Legacy path for formats the pattern above does not recognise.
  // NOTE(review): this still parses timezone-less input in the server's local
  // zone, and also shifts strings that carry an explicit offset — confirm
  // whether any upstream payload actually reaches this branch.
  const d = new Date(istDateStr);
  if (isNaN(d.getTime())) return null;
  d.setMinutes(d.getMinutes() - IST_OFFSET_MINUTES);
  return d.toISOString();
}
|
|
|
|
// Normalize phone to +91XXXXXXXXXX format
|
|
/**
 * Reduces a raw phone string to its digits, drops a leading `0091`, `91` or `0`
 * prefix, and returns `+91` followed by at most the last ten remaining digits.
 *
 * The `91` and `0` prefixes are only removed when more than ten digits are
 * present, so a ten-digit number that happens to start with `91` or `0` is kept
 * whole. Inputs with fewer than ten digits yield a correspondingly shorter result.
 *
 * @param raw - Phone number in any punctuation/spacing.
 * @returns `+91` plus up to ten trailing digits.
 */
export function normalizePhone(raw: string): string {
  const onlyDigits = raw.replace(/[^0-9]/g, '');
  const longerThanNational = onlyDigits.length > 10;

  let national: string;
  if (onlyDigits.startsWith('0091')) {
    national = onlyDigits.slice(4);
  } else if (longerThanNational && onlyDigits.startsWith('91')) {
    national = onlyDigits.slice(2);
  } else if (longerThanNational && onlyDigits.startsWith('0')) {
    national = onlyDigits.slice(1);
  } else {
    national = onlyDigits;
  }

  const lastTen = national.slice(-10);
  return `+91${lastTen}`;
}
|
|
|
|
/**
 * Renders a value as a double-quoted GraphQL string literal.
 * JSON string escaping (quotes, backslashes, control characters) is used so
 * that values such as a lead name containing `"` cannot break or alter the
 * surrounding query document.
 */
function gqlString(value: unknown): string {
  return JSON.stringify(String(value));
}

/**
 * Polls Ozonetel for abandoned calls, records them as missed calls on the
 * platform, and exposes assignment / status / listing operations over the
 * resulting callback queue.
 */
@Injectable()
export class MissedQueueService implements OnModuleInit {
  private readonly logger = new Logger(MissedQueueService.name);
  private readonly pollIntervalMs: number;
  /** UCIDs already handled by this process, so overlapping poll windows do not double-count. */
  private readonly processedUcids = new Set<string>();
  /** In-process guard: only one assignNext() runs at a time. */
  private assignmentMutex = false;
  /** Handle of the ingestion timer, kept so it can be cleared on shutdown. */
  private pollTimer: ReturnType<typeof setInterval> | null = null;

  constructor(
    private readonly config: ConfigService,
    private readonly platform: PlatformGraphqlService,
    private readonly ozonetel: OzonetelAgentService,
  ) {
    this.pollIntervalMs = this.config.get<number>('missedQueue.pollIntervalMs', 30000);
  }

  /** Starts the recurring ingestion poll. */
  onModuleInit() {
    this.logger.log(`Starting missed call ingestion polling every ${this.pollIntervalMs}ms`);
    this.pollTimer = setInterval(
      () => this.ingest().catch(err => this.logger.error('Ingestion failed', err)),
      this.pollIntervalMs,
    );
  }

  /**
   * Stops the ingestion poll so the timer does not outlive the module.
   * Declared without the `OnModuleDestroy` interface to avoid a new import;
   * this relies on Nest invoking lifecycle hooks by method name.
   */
  onModuleDestroy() {
    if (this.pollTimer) {
      clearInterval(this.pollTimer);
      this.pollTimer = null;
    }
  }

  /**
   * Fetches abandoned calls from the last five minutes and upserts them as
   * missed-call records: an existing PENDING_CALLBACK record for the same
   * caller gets its count bumped, otherwise a new record is created.
   *
   * Failures for an individual call are logged and do not abort the batch; the
   * call's UCID is released again so a later poll can retry it.
   *
   * @returns How many records were created and how many were updated.
   */
  async ingest(): Promise<{ created: number; updated: number }> {
    let created = 0;
    let updated = 0;

    // Ozonetel fromTime/toTime use HH:MM:SS format (time of day, filters within current day)
    // NOTE(review): toTimeString() yields the server's local time of day, and a
    // window that straddles midnight produces fromTime > toTime — confirm both
    // against what getAbandonCalls expects.
    const now = new Date();
    const fiveMinAgo = new Date(now.getTime() - 5 * 60 * 1000);
    const toHHMMSS = (d: Date) => d.toTimeString().slice(0, 8);

    let abandonCalls: any[];
    try {
      abandonCalls = await this.ozonetel.getAbandonCalls({ fromTime: toHHMMSS(fiveMinAgo), toTime: toHHMMSS(now) });
    } catch (err) {
      this.logger.warn(`Failed to fetch abandon calls: ${err}`);
      return { created: 0, updated: 0 };
    }

    if (!abandonCalls?.length) return { created: 0, updated: 0 };

    for (const call of abandonCalls) {
      const ucid = call.monitorUCID;
      if (!ucid || this.processedUcids.has(ucid)) continue;
      // Claim the UCID before the first await so an overlapping ingest() run skips it.
      this.processedUcids.add(ucid);

      const phone = normalizePhone(call.callerID || '');
      // "+91" plus ten digits is 13 characters; anything shorter is not a usable number.
      if (!phone || phone.length < 13) continue;

      const did = call.did || '';
      const callTime = istToUtc(call.callTime) ?? new Date().toISOString();

      try {
        const { leadId, leadName } = await this.lookupLead(phone);

        const existing = await this.platform.query<any>(
          `{ calls(first: 1, filter: {
            callbackstatus: { eq: PENDING_CALLBACK },
            callerNumber: { primaryPhoneNumber: { eq: ${gqlString(phone)} } }
          }) { edges { node { id missedcallcount } } } }`,
        );

        const existingNode = existing?.calls?.edges?.[0]?.node;

        if (existingNode) {
          const newCount = (existingNode.missedcallcount || 1) + 1;
          const updateParts = [
            `missedcallcount: ${newCount}`,
            `startedAt: ${gqlString(callTime)}`,
            `callsourcenumber: ${gqlString(did)}`,
          ];
          if (leadId) updateParts.push(`leadId: ${gqlString(leadId)}`);
          if (leadName) updateParts.push(`leadName: ${gqlString(leadName)}`);
          await this.platform.query<any>(
            `mutation { updateCall(id: ${gqlString(existingNode.id)}, data: { ${updateParts.join(', ')} }) { id } }`,
          );
          updated++;
          this.logger.log(`Dedup missed call ${phone}: count now ${newCount}${leadName ? ` (${leadName})` : ''}`);
        } else {
          const dataParts = [
            `callStatus: MISSED`,
            `direction: INBOUND`,
            `callerNumber: { primaryPhoneNumber: ${gqlString(phone)}, primaryPhoneCallingCode: "+91" }`,
            `callsourcenumber: ${gqlString(did)}`,
            `callbackstatus: PENDING_CALLBACK`,
            `missedcallcount: 1`,
            `startedAt: ${gqlString(callTime)}`,
          ];
          if (leadId) dataParts.push(`leadId: ${gqlString(leadId)}`);
          if (leadName) dataParts.push(`leadName: ${gqlString(leadName)}`);
          await this.platform.query<any>(
            `mutation { createCall(data: { ${dataParts.join(', ')} }) { id } }`,
          );
          created++;
          this.logger.log(`Created missed call record for ${phone}${leadName ? ` → ${leadName}` : ''}`);
        }
      } catch (err) {
        // Nothing was recorded for this call; release the UCID so the next
        // poll (whose window still overlaps) can try again instead of
        // silently dropping the missed call.
        this.processedUcids.delete(ucid);
        this.logger.warn(`Failed to process abandon call ${ucid}: ${err}`);
      }
    }

    // Trim processedUcids to prevent unbounded growth
    if (this.processedUcids.size > 500) {
      const arr = Array.from(this.processedUcids);
      this.processedUcids.clear();
      arr.slice(-200).forEach(u => this.processedUcids.add(u));
    }

    if (created || updated) this.logger.log(`Ingestion: ${created} created, ${updated} updated`);
    return { created, updated };
  }

  /**
   * Assigns the oldest unassigned PENDING_CALLBACK call to the given agent.
   *
   * @param agentName - Agent to write onto the call record.
   * @returns The call node that was assigned, or `null` when no unassigned call
   *          exists, another assignment is already in progress in this
   *          process, or the platform request failed.
   */
  async assignNext(agentName: string): Promise<any | null> {
    if (this.assignmentMutex) return null;
    this.assignmentMutex = true;

    try {
      // Unassigned records may carry either an empty string or NULL as agentName.
      const call =
        (await this.findOldestPending('{ eq: "" }')) ||
        (await this.findOldestPending('{ is: NULL }'));

      if (!call) return null;

      await this.platform.query<any>(
        `mutation { updateCall(id: ${gqlString(call.id)}, data: { agentName: ${gqlString(agentName)} }) { id } }`,
      );
      this.logger.log(`Assigned missed call ${call.id} to ${agentName}`);
      return call;
    } catch (err) {
      this.logger.warn(`Assignment failed: ${err}`);
      return null;
    } finally {
      this.assignmentMutex = false;
    }
  }

  /**
   * Sets the callback status of a call, stamping `callbackattemptedat` when the
   * new status is CALLBACK_ATTEMPTED.
   *
   * @param callId - Id of the call record to update.
   * @param status - One of PENDING_CALLBACK, CALLBACK_ATTEMPTED,
   *                 CALLBACK_COMPLETED, INVALID, WRONG_NUMBER.
   * @param authHeader - Authorization header forwarded to the platform.
   * @returns The platform's response to the update mutation.
   * @throws Error when `status` is not one of the accepted values.
   */
  async updateStatus(callId: string, status: string, authHeader: string): Promise<any> {
    const validStatuses = ['PENDING_CALLBACK', 'CALLBACK_ATTEMPTED', 'CALLBACK_COMPLETED', 'INVALID', 'WRONG_NUMBER'];
    if (!validStatuses.includes(status)) {
      throw new Error(`Invalid status: ${status}. Must be one of: ${validStatuses.join(', ')}`);
    }

    // status is safe to splice in as a bare enum token: it was just checked against the whitelist.
    const dataParts: string[] = [`callbackstatus: ${status}`];
    if (status === 'CALLBACK_ATTEMPTED') {
      dataParts.push(`callbackattemptedat: "${new Date().toISOString()}"`);
    }

    return this.platform.queryWithAuth<any>(
      `mutation { updateCall(id: ${gqlString(callId)}, data: { ${dataParts.join(', ')} }) { id callbackstatus callbackattemptedat } }`,
      undefined,
      authHeader,
    );
  }

  /**
   * Lists an agent's missed calls grouped by callback status (up to 50 per
   * status, oldest first). WRONG_NUMBER calls are reported under `completed`.
   *
   * @param agentName - Agent whose queue is requested.
   * @param authHeader - Authorization header forwarded to the platform.
   * @returns The grouped calls; every group is empty if any platform query fails.
   */
  async getMissedQueue(agentName: string, authHeader: string): Promise<{
    pending: any[];
    attempted: any[];
    completed: any[];
    invalid: any[];
  }> {
    const fields = `id name createdAt direction callStatus agentName
      callerNumber { primaryPhoneNumber }
      startedAt endedAt durationSec disposition leadId
      callbackstatus callsourcenumber missedcallcount callbackattemptedat`;

    const buildQuery = (status: string) => `{ calls(first: 50, filter: {
      agentName: { eq: ${gqlString(agentName)} },
      callStatus: { eq: MISSED },
      callbackstatus: { eq: ${status} }
    }, orderBy: [{ startedAt: AscNullsLast }]) { edges { node { ${fields} } } } }`;

    try {
      const [pending, attempted, completed, invalid, wrongNumber] = await Promise.all([
        this.platform.queryWithAuth<any>(buildQuery('PENDING_CALLBACK'), undefined, authHeader),
        this.platform.queryWithAuth<any>(buildQuery('CALLBACK_ATTEMPTED'), undefined, authHeader),
        this.platform.queryWithAuth<any>(buildQuery('CALLBACK_COMPLETED'), undefined, authHeader),
        this.platform.queryWithAuth<any>(buildQuery('INVALID'), undefined, authHeader),
        this.platform.queryWithAuth<any>(buildQuery('WRONG_NUMBER'), undefined, authHeader),
      ]);

      const extract = (r: any) => r?.calls?.edges?.map((e: any) => e.node) ?? [];

      return {
        pending: extract(pending),
        attempted: extract(attempted),
        completed: [...extract(completed), ...extract(wrongNumber)],
        invalid: extract(invalid),
      };
    } catch (err) {
      this.logger.warn(`Failed to fetch missed queue: ${err}`);
      return { pending: [], attempted: [], completed: [], invalid: [] };
    }
  }

  /**
   * Best-effort lead lookup by phone number. The +91 prefix is stripped and the
   * remaining digits are matched as a suffix. Lookup failures are logged and
   * reported as "no lead" so ingestion can continue.
   */
  private async lookupLead(phone: string): Promise<{ leadId: string | null; leadName: string | null }> {
    // Look up lead by phone number — strip +91 prefix for flexible matching
    const phoneDigits = phone.replace(/^\+91/, '');
    try {
      const leadResult = await this.platform.query<any>(
        `{ leads(first: 1, filter: {
          contactPhone: { primaryPhoneNumber: { like: "%${phoneDigits}" } }
        }) { edges { node { id contactName { firstName lastName } patientId } } } }`,
      );
      const matchedLead = leadResult?.leads?.edges?.[0]?.node;
      if (!matchedLead) return { leadId: null, leadName: null };

      const leadId: string | null = matchedLead.id;
      const fn = matchedLead.contactName?.firstName ?? '';
      const ln = matchedLead.contactName?.lastName ?? '';
      const leadName = `${fn} ${ln}`.trim() || null;
      this.logger.log(`Matched missed call ${phone} → lead ${leadId} (${leadName})`);
      return { leadId, leadName };
    } catch (err) {
      this.logger.warn(`Lead lookup failed for ${phone}: ${err}`);
      return { leadId: null, leadName: null };
    }
  }

  /**
   * Returns the oldest PENDING_CALLBACK call whose agentName matches the given
   * GraphQL filter expression, or undefined when there is none.
   * `agentNameFilter` must be a trusted, hard-coded filter fragment.
   */
  private async findOldestPending(agentNameFilter: string): Promise<any | undefined> {
    const result = await this.platform.query<any>(
      `{ calls(first: 1, filter: {
        callbackstatus: { eq: PENDING_CALLBACK },
        agentName: ${agentNameFilter}
      }, orderBy: [{ startedAt: AscNullsLast }]) {
        edges { node {
          id callerNumber { primaryPhoneNumber }
          startedAt callsourcenumber missedcallcount
        } }
      } }`,
    );
    return result?.calls?.edges?.[0]?.node;
  }
}
|