mirror of
https://dev.azure.com/globalhealthx/EMR/_git/helix-engage-server
synced 2026-05-18 20:08:19 +00:00
Inbound transferred calls arrive with AgentName like 'RamaiahAdmin -> GlobalHealthX'. The webhook was persisting the raw chain string and leaving agentId null; the CDR enrichment cron then silently skipped 100% of rows because the bulk CDR keys on caller-leg UCID while the webhook stores monitorUCID — the join never matched. - missed-call-webhook: split chain on ' -> ', take final handler, resolve via AgentLookupService (ozonetelAgentId + display name) - cdr-enrichment: index CDR rows by both UCID and monitorUCID so the cron actually patches historical rows - enrichment also parses chain in CDR AgentName as a second fallback - spec: add CallerResolutionService + AgentLookupService mocks
175 lines
8.2 KiB
TypeScript
175 lines
8.2 KiB
TypeScript
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
|
import { OzonetelAgentService } from './ozonetel-agent.service';
|
|
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
|
import { AgentLookupService } from '../platform/agent-lookup.service';
|
|
|
|
/**
 * Periodically pulls Ozonetel CDR (per-row, includes unique AgentID) and
 * enriches Call records that were created from the missed-call webhook
 * or outbound dispose without the authoritative agent relation.
 *
 * Runs every 30 minutes. Each tick makes one CDR fetch per date in the
 * window (today + yesterday, IST), spaced 35s apart so the sidecar stays
 * under Ozonetel's 2-req/min cap on the CDR endpoints.
 *
 * Pairs Call rows to CDR rows by `ucid`, matched against either the CDR
 * row's `UCID` or its `monitorUCID`. Only patches Calls that are
 * missing `agentId` / `transferredTo` / `transferType` — idempotent.
 */
|
|
/** Gap between enrichment passes, in milliseconds (30 minutes). */
const ENRICHMENT_INTERVAL_MS = 30 * 60_000;
|
|
// Number of IST calendar dates scanned per pass, counting back from today.
const ENRICHMENT_DATE_WINDOW_DAYS = 2; // today + yesterday, in case late-arriving calls straddle IST midnight
|
|
|
|
/** Delay before the first enrichment pass after module init, in milliseconds. */
const FIRST_RUN_DELAY_MS = 60_000;

/** Pause between per-date CDR fetches inside one pass, in milliseconds. */
const PER_DATE_PAUSE_MS = 35_000;

/** Pause after each successful Call patch, in milliseconds. */
const PER_PATCH_PAUSE_MS = 50;

/** Upper bound on pages pulled per date when listing Calls (200 rows per page). */
const MAX_CALL_PAGES = 20;

/** Fields selected for each Call returned by the missing-agent query. */
type CallMissingAgent = {
    id: string;
    ucid: string | null;
    agentId: string | null;
    transferredTo: string | null;
    transferType: string | null;
};

/**
 * Background job that back-fills agent / transfer fields on Call records
 * from Ozonetel CDR rows. Scheduling is driven by the Nest lifecycle hooks;
 * `runOnce` performs a single pass and can also be invoked directly.
 */
@Injectable()
export class CdrEnrichmentService implements OnModuleInit, OnModuleDestroy {
    private readonly logger = new Logger(CdrEnrichmentService.name);

    /** One-shot timer for the delayed first pass; retained so shutdown can cancel it. */
    private startupTimer: NodeJS.Timeout | null = null;

    /** Recurring timer that triggers a pass every ENRICHMENT_INTERVAL_MS. */
    private timer: NodeJS.Timeout | null = null;

    constructor(
        private readonly ozonetel: OzonetelAgentService,
        private readonly platform: PlatformGraphqlService,
        private readonly agentLookup: AgentLookupService,
    ) {}

    /**
     * Schedules the first pass after a boot delay and then a recurring pass.
     * Failures of individual passes are logged and never propagate.
     */
    async onModuleInit() {
        // Kick off after 60s so the sidecar isn't hammering platform during boot,
        // then settle into the 30-min cadence.
        this.startupTimer = setTimeout(() => {
            this.startupTimer = null;
            this.runOnce().catch((err) => {
                this.logger.warn(`[CDR-ENRICH] First run failed: ${err?.message ?? err}`);
            });
        }, FIRST_RUN_DELAY_MS);

        this.timer = setInterval(() => {
            this.runOnce().catch((err) => {
                this.logger.warn(`[CDR-ENRICH] Tick failed: ${err?.message ?? err}`);
            });
        }, ENRICHMENT_INTERVAL_MS);
    }

    /** Cancels both the pending first-run timer and the recurring interval. */
    onModuleDestroy() {
        // The boot-delay timer must be cleared too; otherwise a pass could
        // still start after the module has been torn down.
        if (this.startupTimer) {
            clearTimeout(this.startupTimer);
            this.startupTimer = null;
        }
        if (this.timer) {
            clearInterval(this.timer);
            this.timer = null;
        }
    }

    /**
     * Runs one enrichment pass over the recent IST dates.
     *
     * For each date: fetches the CDR rows, lists Calls of that date that have
     * a ucid but no agentId, and patches each Call whose ucid matches a CDR row.
     *
     * @returns Counters for the pass: `scanned` Calls listed, `enriched` Calls
     *   successfully patched, `skipped` Calls with no CDR match, nothing to
     *   patch, or a failed patch.
     */
    async runOnce(): Promise<{ scanned: number; enriched: number; skipped: number }> {
        let scanned = 0;
        let enriched = 0;
        let skipped = 0;

        // Walk the IST-date window. For each date, pull CDR + patch Calls.
        // Sleep 35s between dates — Ozonetel caps CDR endpoints at 2 req/min
        // and the dispose flow shares that budget (fetchCdrByUCID per outbound).
        const dates = this.recentDatesIst(ENRICHMENT_DATE_WINDOW_DAYS);
        for (let i = 0; i < dates.length; i++) {
            const date = dates[i];
            if (i > 0) await new Promise((r) => setTimeout(r, PER_DATE_PAUSE_MS));

            // Best-effort: a failed fetch skips this date, but is logged so an
            // outage is distinguishable from a day with no CDR rows.
            const cdrRows = await this.ozonetel.fetchCDR({ date }).catch((err) => {
                this.logger.warn(`[CDR-ENRICH] CDR fetch failed for ${date}: ${err?.message ?? err}`);
                return [];
            });
            if (cdrRows.length === 0) continue;

            const byUcid = this.indexCdrRows(cdrRows);
            if (byUcid.size === 0) continue;

            // Pull Calls in the same date window that are missing agent linkage
            // (i.e. ucid set, agentId null). Patch each.
            const calls = await this.fetchCallsMissingAgent(date);
            scanned += calls.length;

            for (const call of calls) {
                const cdrRow = byUcid.get(String(call.ucid).trim());
                if (!cdrRow) { skipped++; continue; }

                const patch: Record<string, any> = {};
                if (!call.agentId) {
                    const uuid = await this.resolveAgentUuid(cdrRow);
                    if (uuid) patch.agentId = uuid;
                    // NOTE(review): when no UUID resolves, agentName is still written
                    // while agentId stays null, so the same Call is selected and
                    // patched again on later ticks — confirm this is intended.
                    if (cdrRow.AgentName) patch.agentName = cdrRow.AgentName;
                }
                if (cdrRow.TransferredTo && !call.transferredTo) patch.transferredTo = cdrRow.TransferredTo;
                if (cdrRow.TransferType && !call.transferType) patch.transferType = cdrRow.TransferType;

                if (Object.keys(patch).length === 0) { skipped++; continue; }

                try {
                    await this.platform.query<any>(
                        `mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
                        { id: call.id, data: patch },
                    );
                    enriched++;
                    // Small pause between mutations to avoid bursting the platform.
                    await new Promise((r) => setTimeout(r, PER_PATCH_PAUSE_MS));
                } catch (err: any) {
                    this.logger.warn(`[CDR-ENRICH] Patch failed for ${call.id}: ${err?.message ?? err}`);
                    skipped++;
                }
            }
        }

        if (scanned > 0 || enriched > 0) {
            this.logger.log(`[CDR-ENRICH] Pass complete — dates=[${dates.join(',')}] scanned=${scanned} enriched=${enriched} skipped=${skipped}`);
        }
        return { scanned, enriched, skipped };
    }

    /**
     * Builds a UCID → CDR-row map for O(1) joins against Calls.
     *
     * Ozonetel emits two identifiers per call — `UCID` (caller-leg) and
     * `monitorUCID` (agent-leg). The webhook stores `monitorUCID`, but the
     * bulk CDR rows are keyed on caller-leg `UCID`. Both are indexed so the
     * per-Call lookup in `runOnce` finds the row regardless of which side
     * was persisted. Without this, transferred inbound calls never get
     * their agent relation enriched.
     *
     * @param cdrRows Raw CDR rows as returned by the Ozonetel service.
     * @returns Map from trimmed identifier to its CDR row; empty identifiers are not indexed.
     */
    private indexCdrRows(cdrRows: any[]): Map<string, any> {
        const byUcid = new Map<string, any>();
        for (const row of cdrRows) {
            const ucid = String(row.UCID ?? '').trim();
            const monitorUcid = String(row.monitorUCID ?? '').trim();
            if (ucid) byUcid.set(ucid, row);
            if (monitorUcid && monitorUcid !== ucid) byUcid.set(monitorUcid, row);
        }
        return byUcid;
    }

    /**
     * Resolves the platform agent id for a CDR row.
     *
     * Primary: the row's `AgentID` via `resolveByOzonetelId`.
     * Fallback: CDR `AgentName` may be a chain ("A -> B") for transferred
     * calls; the final handler (last non-empty segment) is looked up first
     * as an Ozonetel id and then by display name. Matches the write-time
     * resolution in missed-call-webhook.
     *
     * @param cdrRow A single CDR row.
     * @returns The resolved agent id, or a nullish value when nothing matched.
     */
    private async resolveAgentUuid(cdrRow: any) {
        const cdrAgentId = cdrRow.AgentID;
        let uuid = cdrAgentId
            ? await this.agentLookup.resolveByOzonetelId(cdrAgentId)
            : null;

        if (!uuid && cdrRow.AgentName) {
            const segments = String(cdrRow.AgentName)
                .split('->')
                .map((s) => s.trim())
                .filter(Boolean);
            const finalHandler = segments[segments.length - 1];
            if (finalHandler) {
                uuid =
                    (await this.agentLookup.resolveByOzonetelId(finalHandler)) ??
                    (await this.agentLookup.resolveByDisplayName(finalHandler));
            }
        }
        return uuid;
    }

    /**
     * Lists Calls started on the given IST date that have a ucid but no agentId.
     *
     * Pages through the platform in batches of 200, up to MAX_CALL_PAGES pages.
     * A failed page query is logged and ends pagination; rows collected so
     * far are still returned.
     *
     * @param date IST calendar date as `YYYY-MM-DD`.
     * @returns The matching Call nodes (possibly partial on error or page cap).
     */
    private async fetchCallsMissingAgent(date: string): Promise<Array<CallMissingAgent>> {
        // Bound by IST day. CDR window is 15 days; we only ever need recent.
        const gte = `${date}T00:00:00+05:30`;
        const lte = `${date}T23:59:59+05:30`;
        const results: Array<CallMissingAgent> = [];
        let after: string | null = null;

        for (let page = 0; page < MAX_CALL_PAGES; page++) {
            const cursorArg: string = after ? `, after: "${after}"` : '';
            const data: any = await this.platform.query<any>(
                `{ calls(first: 200${cursorArg}, filter: {
                    startedAt: { gte: "${gte}", lte: "${lte}" },
                    ucid: { is: NOT_NULL },
                    agentId: { is: NULL }
                }) {
                    edges { node { id ucid agentId transferredTo transferType } }
                    pageInfo { hasNextPage endCursor }
                } }`,
            ).catch((err: any) => {
                this.logger.warn(`[CDR-ENRICH] Call query failed for ${date} (page ${page}): ${err?.message ?? err}`);
                // Empty page => hasNextPage is falsy => the loop ends below.
                return { calls: { edges: [], pageInfo: {} } };
            });

            const edges = data?.calls?.edges ?? [];
            for (const e of edges) results.push(e.node);

            const pageInfo = data?.calls?.pageInfo ?? {};
            if (!pageInfo.hasNextPage) break;

            // Without a cursor the next request would re-fetch the first page
            // and duplicate rows, so stop instead of looping.
            if (!pageInfo.endCursor) {
                this.logger.warn(`[CDR-ENRICH] Call query for ${date} reported more pages but no endCursor; stopping pagination`);
                break;
            }
            after = pageInfo.endCursor;

            if (page === MAX_CALL_PAGES - 1) {
                this.logger.warn(`[CDR-ENRICH] Call query for ${date} hit the ${MAX_CALL_PAGES}-page cap; remaining rows are left for a later pass`);
            }
        }
        return results;
    }

    /**
     * Returns the `n` most recent IST calendar dates, newest first.
     *
     * The current instant is shifted by +05:30 and formatted via
     * `toISOString()`, so the first 10 characters are the IST date.
     *
     * @param n Number of dates to return (today counts as the first).
     * @returns Dates formatted as `YYYY-MM-DD`.
     */
    private recentDatesIst(n: number): string[] {
        const dates: string[] = [];
        for (let i = 0; i < n; i++) {
            const d = new Date(Date.now() + 5.5 * 60 * 60 * 1000 - i * 24 * 60 * 60 * 1000);
            dates.push(d.toISOString().slice(0, 10));
        }
        return dates;
    }
}
|