feat(calls): consolidate agent identity via Ozonetel CDR

Ozonetel's webhook AgentName is a transfer-chain display string — same
display can collide (two agents both named "GlobalHealthX" with distinct
agent IDs), and chained like "RamaiahAdmin -> Ganesh Bandi -> GlobalHealthX".
Team Performance was bucketing every unique raw string as a separate
"agent", producing 7 rows for 3 real agents.

Fix — authoritative agent link via CDR AgentID (unique):

- New AgentLookupService (platform module): case-insensitive
  ozonetelAgentId → Agent UUID cache, shared across webhook / dispose /
  enrichment / backfill paths
- Webhook + outbound-dispose now persist UCID on Call so CDR can join
- Outbound dispose resolves agent relation at create time and overwrites
  from CDR AgentID post-hoc (catches dial transfers)
- New CdrEnrichmentService: every 30 min fetches today + yesterday CDR,
  patches Calls missing agentId / transferredTo / transferType by UCID
  join. Well under Ozonetel's 2 req/min cap.
- Historical backfill maint endpoint: /api/maint/enrich-call-agents
  with configurable day window (default 2, max 15). Rate-limited at 35s
  between dates.

Call schema additions (synced on Global + Ramaiah): agent relation,
ucid, transferredTo, transferType. agentName remains for legacy/display.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-15 07:43:28 +05:30
parent 9472f83cd8
commit 846c5f4c9b
7 changed files with 349 additions and 4 deletions

View File

@@ -7,6 +7,8 @@ import { SupervisorService } from '../supervisor/supervisor.service';
import { AgentHistoryService, AgentEventType } from '../supervisor/agent-history.service'; import { AgentHistoryService, AgentEventType } from '../supervisor/agent-history.service';
import { CallerResolutionService } from '../caller/caller-resolution.service'; import { CallerResolutionService } from '../caller/caller-resolution.service';
import { TelephonyConfigService } from '../config/telephony-config.service'; import { TelephonyConfigService } from '../config/telephony-config.service';
import { AgentLookupService } from '../platform/agent-lookup.service';
import { CdrEnrichmentService } from '../ozonetel/cdr-enrichment.service';
@Controller('api/maint') @Controller('api/maint')
@UseGuards(MaintGuard) @UseGuards(MaintGuard)
@@ -21,6 +23,8 @@ export class MaintController {
private readonly supervisor: SupervisorService, private readonly supervisor: SupervisorService,
private readonly callerResolution: CallerResolutionService, private readonly callerResolution: CallerResolutionService,
private readonly history: AgentHistoryService, private readonly history: AgentHistoryService,
private readonly agentLookup: AgentLookupService,
private readonly cdrEnrichment: CdrEnrichmentService,
) {} ) {}
@Post('force-ready') @Post('force-ready')
@@ -554,4 +558,118 @@ export class MaintController {
} }
return events; return events;
} }
// Historical enrichment: runs the same CDR-enrichment loop the cron runs,
// but kicks it off immediately and (optionally) widens the date window
// beyond "today + yesterday" up to the CDR API's 15-day limit.
//
// POST /api/maint/enrich-call-agents
// Headers: x-maint-otp: <OTP>
// Body: { days?: number } — default 2 (matches the cron); max 15
@Post('enrich-call-agents')
async enrichCallAgents(@Body() body: { days?: number }) {
const requestedDays = Math.max(1, Math.min(15, body?.days ?? 2));
this.logger.log(`[MAINT] Enrich call agents — days=${requestedDays}`);
// Call the enrichment service once per date, respecting the 2-req/min
// CDR rate limit. Each tick fetches one date's CDR (1 req) so we can
// iterate up to 2 dates per minute — enforce a 35s gap between dates.
const dates = this.recentDatesIst(requestedDays);
let totalScanned = 0;
let totalEnriched = 0;
let totalSkipped = 0;
for (let i = 0; i < dates.length; i++) {
const date = dates[i];
try {
const result = await this.enrichSingleDate(date);
totalScanned += result.scanned;
totalEnriched += result.enriched;
totalSkipped += result.skipped;
this.logger.log(`[MAINT] ${date} — scanned=${result.scanned} enriched=${result.enriched} skipped=${result.skipped}`);
} catch (err: any) {
this.logger.warn(`[MAINT] Enrich failed for ${date}: ${err?.message ?? err}`);
}
// Rate limiting: 35s between dates to stay under 2 req/min on CDR.
if (i < dates.length - 1) await new Promise((r) => setTimeout(r, 35_000));
}
this.logger.log(`[MAINT] Enrichment complete: scanned=${totalScanned} enriched=${totalEnriched} skipped=${totalSkipped} across ${dates.length} dates`);
return { status: 'ok', scanned: totalScanned, enriched: totalEnriched, skipped: totalSkipped, dates };
}
private async enrichSingleDate(date: string): Promise<{ scanned: number; enriched: number; skipped: number }> {
// Reuse the cdr-enrichment path via its runOnce method, but scoped.
// For simplicity we reimplement the single-date logic here so we can
// parameterize the date without leaking CDR-enrichment internals.
const cdrRows = await this.ozonetel.fetchCDR({ date });
if (cdrRows.length === 0) return { scanned: 0, enriched: 0, skipped: 0 };
const byUcid = new Map<string, any>();
for (const row of cdrRows) {
const ucid = String(row.UCID ?? '').trim();
if (ucid) byUcid.set(ucid, row);
}
// Fetch calls missing agent link on this date
const gte = `${date}T00:00:00+05:30`;
const lte = `${date}T23:59:59+05:30`;
const calls: Array<any> = [];
let after: string | null = null;
for (let page = 0; page < 30; page++) {
const cursorArg: string = after ? `, after: "${after}"` : '';
const data: any = await this.platform.query<any>(
`{ calls(first: 200${cursorArg}, filter: {
startedAt: { gte: "${gte}", lte: "${lte}" },
ucid: { is: NOT_NULL },
agentId: { is: NULL }
}) {
edges { node { id ucid agentId transferredTo transferType } }
pageInfo { hasNextPage endCursor }
} }`,
).catch(() => ({ calls: { edges: [], pageInfo: {} } }));
const edges = data?.calls?.edges ?? [];
for (const e of edges) calls.push(e.node);
const pageInfo = data?.calls?.pageInfo ?? {};
if (!pageInfo.hasNextPage) break;
after = pageInfo.endCursor ?? null;
}
let enriched = 0;
let skipped = 0;
for (const call of calls) {
const cdrRow = byUcid.get(String(call.ucid).trim());
if (!cdrRow) { skipped++; continue; }
const patch: Record<string, any> = {};
if (cdrRow.AgentID && !call.agentId) {
const uuid = await this.agentLookup.resolveByOzonetelId(cdrRow.AgentID);
if (uuid) patch.agentId = uuid;
if (cdrRow.AgentName) patch.agentName = cdrRow.AgentName;
}
if (cdrRow.TransferredTo && !call.transferredTo) patch.transferredTo = cdrRow.TransferredTo;
if (cdrRow.TransferType && !call.transferType) patch.transferType = cdrRow.TransferType;
if (Object.keys(patch).length === 0) { skipped++; continue; }
try {
await this.platform.query<any>(
`mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
{ id: call.id, data: patch },
);
enriched++;
await new Promise((r) => setTimeout(r, 80));
} catch (err) {
skipped++;
}
}
return { scanned: calls.length, enriched, skipped };
}
private recentDatesIst(n: number): string[] {
const dates: string[] = [];
for (let i = 0; i < n; i++) {
const d = new Date(Date.now() + 5.5 * 60 * 60 * 1000 - i * 24 * 60 * 60 * 1000);
dates.push(d.toISOString().slice(0, 10));
}
return dates;
}
} }

View File

@@ -0,0 +1,146 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { OzonetelAgentService } from './ozonetel-agent.service';
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
import { AgentLookupService } from '../platform/agent-lookup.service';
/**
* Periodically pulls Ozonetel CDR (per-row, includes unique AgentID) and
* enriches Call records that were created from the missed-call webhook
* or outbound dispose without the authoritative agent relation.
*
* Runs every 30 minutes — well under Ozonetel's 2-req/min cap on the CDR
* endpoints (one fetch per workspace per tick = 2/hour).
*
* Pairs Call rows to CDR rows by `ucid`. Only patches Calls that are
* missing `agentId` / `transferredTo` / `transferType` — idempotent.
*/
const ENRICHMENT_INTERVAL_MS = 30 * 60 * 1000;
const ENRICHMENT_DATE_WINDOW_DAYS = 2; // today + yesterday in case late-arriving calls straddle IST midnight
@Injectable()
export class CdrEnrichmentService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(CdrEnrichmentService.name);
private timer: NodeJS.Timeout | null = null;
constructor(
private readonly ozonetel: OzonetelAgentService,
private readonly platform: PlatformGraphqlService,
private readonly agentLookup: AgentLookupService,
) {}
async onModuleInit() {
// Kick off after 60s so the sidecar isn't hammering platform during boot,
// then settle into the 30-min cadence.
setTimeout(() => {
this.runOnce().catch((err) => {
this.logger.warn(`[CDR-ENRICH] First run failed: ${err?.message ?? err}`);
});
}, 60_000);
this.timer = setInterval(() => {
this.runOnce().catch((err) => {
this.logger.warn(`[CDR-ENRICH] Tick failed: ${err?.message ?? err}`);
});
}, ENRICHMENT_INTERVAL_MS);
}
onModuleDestroy() {
if (this.timer) clearInterval(this.timer);
}
async runOnce(): Promise<{ scanned: number; enriched: number; skipped: number }> {
let scanned = 0;
let enriched = 0;
let skipped = 0;
// Walk the IST-date window. For each date, pull CDR + patch Calls.
const dates = this.recentDatesIst(ENRICHMENT_DATE_WINDOW_DAYS);
for (const date of dates) {
const cdrRows = await this.ozonetel.fetchCDR({ date }).catch(() => []);
if (cdrRows.length === 0) continue;
// Build UCID → cdr-row map so we can O(1) join per Call.
const byUcid = new Map<string, any>();
for (const row of cdrRows) {
const ucid = String(row.UCID ?? '').trim();
if (ucid) byUcid.set(ucid, row);
}
if (byUcid.size === 0) continue;
// Pull Calls in the same date window that are missing agent linkage
// (i.e. ucid set, agentId null). Patch each.
const calls = await this.fetchCallsMissingAgent(date);
scanned += calls.length;
for (const call of calls) {
const cdrRow = byUcid.get(String(call.ucid).trim());
if (!cdrRow) { skipped++; continue; }
const patch: Record<string, any> = {};
const cdrAgentId = cdrRow.AgentID;
if (cdrAgentId && !call.agentId) {
const uuid = await this.agentLookup.resolveByOzonetelId(cdrAgentId);
if (uuid) patch.agentId = uuid;
if (cdrRow.AgentName) patch.agentName = cdrRow.AgentName;
}
if (cdrRow.TransferredTo && !call.transferredTo) patch.transferredTo = cdrRow.TransferredTo;
if (cdrRow.TransferType && !call.transferType) patch.transferType = cdrRow.TransferType;
if (Object.keys(patch).length === 0) { skipped++; continue; }
try {
await this.platform.query<any>(
`mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
{ id: call.id, data: patch },
);
enriched++;
await new Promise((r) => setTimeout(r, 50));
} catch (err: any) {
this.logger.warn(`[CDR-ENRICH] Patch failed for ${call.id}: ${err?.message ?? err}`);
skipped++;
}
}
}
if (scanned > 0 || enriched > 0) {
this.logger.log(`[CDR-ENRICH] Pass complete — dates=[${dates.join(',')}] scanned=${scanned} enriched=${enriched} skipped=${skipped}`);
}
return { scanned, enriched, skipped };
}
private async fetchCallsMissingAgent(date: string): Promise<Array<{ id: string; ucid: string | null; agentId: string | null; transferredTo: string | null; transferType: string | null }>> {
// Bound by IST day. CDR window is 15 days; we only ever need recent.
const gte = `${date}T00:00:00+05:30`;
const lte = `${date}T23:59:59+05:30`;
const results: Array<any> = [];
let after: string | null = null;
for (let page = 0; page < 20; page++) {
const cursorArg: string = after ? `, after: "${after}"` : '';
const data: any = await this.platform.query<any>(
`{ calls(first: 200${cursorArg}, filter: {
startedAt: { gte: "${gte}", lte: "${lte}" },
ucid: { is: NOT_NULL },
agentId: { is: NULL }
}) {
edges { node { id ucid agentId transferredTo transferType } }
pageInfo { hasNextPage endCursor }
} }`,
).catch(() => ({ calls: { edges: [], pageInfo: {} } }));
const edges = data?.calls?.edges ?? [];
for (const e of edges) results.push(e.node);
const pageInfo = data?.calls?.pageInfo ?? {};
if (!pageInfo.hasNextPage) break;
after = pageInfo.endCursor ?? null;
}
return results;
}
private recentDatesIst(n: number): string[] {
const dates: string[] = [];
for (let i = 0; i < n; i++) {
const d = new Date(Date.now() + 5.5 * 60 * 60 * 1000 - i * 24 * 60 * 60 * 1000);
dates.push(d.toISOString().slice(0, 10));
}
return dates;
}
}

View File

@@ -2,6 +2,7 @@ import { Controller, Post, Get, Body, Query, Logger, HttpException } from '@nest
import { OzonetelAgentService } from './ozonetel-agent.service'; import { OzonetelAgentService } from './ozonetel-agent.service';
import { MissedQueueService } from '../worklist/missed-queue.service'; import { MissedQueueService } from '../worklist/missed-queue.service';
import { PlatformGraphqlService } from '../platform/platform-graphql.service'; import { PlatformGraphqlService } from '../platform/platform-graphql.service';
import { AgentLookupService } from '../platform/agent-lookup.service';
import { EventBusService } from '../events/event-bus.service'; import { EventBusService } from '../events/event-bus.service';
import { Topics } from '../events/event-types'; import { Topics } from '../events/event-types';
import { TelephonyConfigService } from '../config/telephony-config.service'; import { TelephonyConfigService } from '../config/telephony-config.service';
@@ -28,6 +29,7 @@ export class OzonetelAgentController {
private readonly platform: PlatformGraphqlService, private readonly platform: PlatformGraphqlService,
private readonly eventBus: EventBusService, private readonly eventBus: EventBusService,
private readonly supervisor: SupervisorService, private readonly supervisor: SupervisorService,
private readonly agentLookup: AgentLookupService,
) {} ) {}
private requireAgentId(agentId: string | undefined | null): string { private requireAgentId(agentId: string | undefined | null): string {
@@ -174,6 +176,14 @@ export class OzonetelAgentController {
startedAt, startedAt,
endedAt, endedAt,
}; };
// Persist UCID so the CDR enrichment cron and backfill can
// resolve the authoritative agent relation even if the initial
// lookup misses.
if (body.ucid) callData.ucid = body.ucid;
// Resolve the agent relation from the logged-in agentId. For
// outbound, the dispatching agent IS the handler — no transfer.
const agentUuid = await this.agentLookup.resolveByOzonetelId(agentId);
if (agentUuid) callData.agentId = agentUuid;
if (body.leadId) callData.leadId = body.leadId; if (body.leadId) callData.leadId = body.leadId;
if (body.leadName) callData.leadName = body.leadName; if (body.leadName) callData.leadName = body.leadName;
@@ -210,6 +220,17 @@ export class OzonetelAgentController {
if (handlingSec !== null) updateData.handlingTimeS = handlingSec; if (handlingSec !== null) updateData.handlingTimeS = handlingSec;
if (wrapupSec !== null) updateData.acwDurationS = wrapupSec; if (wrapupSec !== null) updateData.acwDurationS = wrapupSec;
if (holdSec !== null) updateData.holdDurationS = holdSec; if (holdSec !== null) updateData.holdDurationS = holdSec;
// Overwrite agent relation with CDR's AgentID (the
// actual final handler; may differ from the caller
// agentId if Ozonetel transferred the dial).
const cdrAgentId = record?.AgentID;
if (cdrAgentId) {
const cdrAgentUuid = await this.agentLookup.resolveByOzonetelId(cdrAgentId);
if (cdrAgentUuid) updateData.agentId = cdrAgentUuid;
if (record.AgentName) updateData.agentName = record.AgentName;
}
if (record?.TransferredTo) updateData.transferredTo = record.TransferredTo;
if (record?.TransferType) updateData.transferType = record.TransferType;
if (Object.keys(updateData).length > 0) { if (Object.keys(updateData).length > 0) {
await this.platform.queryWithAuth<any>( await this.platform.queryWithAuth<any>(
`mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`, `mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,

View File

@@ -2,6 +2,7 @@ import { Module, forwardRef } from '@nestjs/common';
import { OzonetelAgentController } from './ozonetel-agent.controller'; import { OzonetelAgentController } from './ozonetel-agent.controller';
import { OzonetelAgentService } from './ozonetel-agent.service'; import { OzonetelAgentService } from './ozonetel-agent.service';
import { KookooIvrController } from './kookoo-ivr.controller'; import { KookooIvrController } from './kookoo-ivr.controller';
import { CdrEnrichmentService } from './cdr-enrichment.service';
import { WorklistModule } from '../worklist/worklist.module'; import { WorklistModule } from '../worklist/worklist.module';
import { PlatformModule } from '../platform/platform.module'; import { PlatformModule } from '../platform/platform.module';
import { SupervisorModule } from '../supervisor/supervisor.module'; import { SupervisorModule } from '../supervisor/supervisor.module';
@@ -9,7 +10,7 @@ import { SupervisorModule } from '../supervisor/supervisor.module';
@Module({ @Module({
imports: [PlatformModule, forwardRef(() => WorklistModule), forwardRef(() => SupervisorModule)], imports: [PlatformModule, forwardRef(() => WorklistModule), forwardRef(() => SupervisorModule)],
controllers: [OzonetelAgentController, KookooIvrController], controllers: [OzonetelAgentController, KookooIvrController],
providers: [OzonetelAgentService], providers: [OzonetelAgentService, CdrEnrichmentService],
exports: [OzonetelAgentService], exports: [OzonetelAgentService, CdrEnrichmentService],
}) })
export class OzonetelAgentModule {} export class OzonetelAgentModule {}

View File

@@ -0,0 +1,54 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { PlatformGraphqlService } from './platform-graphql.service';
/**
* Maps Ozonetel agent identifiers (unique — e.g. "ramaiahadmin",
* "globalhealthx", "global") to the platform Agent entity UUID. Used by
* ingest paths (webhook, dispose, CDR enrichment, backfill) so every Call
* ends up with the correct `agent` relation regardless of how Ozonetel
* formats the display name (AgentName collisions, transfer chains like
* "A -> B -> C", etc.).
*
* The cache is case-insensitive because Ozonetel occasionally mixes
* casing ("global" vs "Global" vs "GLOBAL") across webhook/CDR responses.
*/
@Injectable()
export class AgentLookupService implements OnModuleInit {
private readonly logger = new Logger(AgentLookupService.name);
private readonly uuidByOzonetelId = new Map<string, string>();
constructor(private readonly platform: PlatformGraphqlService) {}
async onModuleInit() {
await this.refresh();
}
async refresh(): Promise<void> {
try {
const data = await this.platform.query<any>(
`{ agents(first: 100) { edges { node { id ozonetelAgentId } } } }`,
);
const edges = data?.agents?.edges ?? [];
this.uuidByOzonetelId.clear();
for (const edge of edges) {
const n = edge.node;
if (n.ozonetelAgentId) {
this.uuidByOzonetelId.set(n.ozonetelAgentId.toLowerCase(), n.id);
}
}
this.logger.log(`[AGENT-LOOKUP] Loaded ${this.uuidByOzonetelId.size} agents`);
} catch (err) {
this.logger.warn(`[AGENT-LOOKUP] Refresh failed: ${err}`);
}
}
async resolveByOzonetelId(ozonetelId: string | null | undefined): Promise<string | null> {
if (!ozonetelId) return null;
const key = ozonetelId.toLowerCase();
const cached = this.uuidByOzonetelId.get(key);
if (cached) return cached;
// Cache miss — refresh once (handles late-provisioned agents)
await this.refresh();
return this.uuidByOzonetelId.get(key) ?? null;
}
}

View File

@@ -1,8 +1,9 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { PlatformGraphqlService } from './platform-graphql.service'; import { PlatformGraphqlService } from './platform-graphql.service';
import { AgentLookupService } from './agent-lookup.service';
@Module({ @Module({
providers: [PlatformGraphqlService], providers: [PlatformGraphqlService, AgentLookupService],
exports: [PlatformGraphqlService], exports: [PlatformGraphqlService, AgentLookupService],
}) })
export class PlatformModule {} export class PlatformModule {}

View File

@@ -182,6 +182,10 @@ export class MissedCallWebhookController {
durationSec: data.duration, durationSec: data.duration,
disposition: this.mapDisposition(data.disposition), disposition: this.mapDisposition(data.disposition),
}; };
// Persist UCID so the 30-min CDR enrichment cron and historical
// backfill can pair this row to a CDR record and fill in the
// authoritative agent relation.
if (data.ucid) callData.ucid = data.ucid;
if (data.leadId) callData.leadId = data.leadId; if (data.leadId) callData.leadId = data.leadId;
if (data.leadName) callData.leadName = data.leadName; if (data.leadName) callData.leadName = data.leadName;
// Set callback tracking fields for missed calls so they appear in the worklist // Set callback tracking fields for missed calls so they appear in the worklist