mirror of
https://dev.azure.com/globalhealthx/EMR/_git/helix-engage-server
synced 2026-05-18 20:08:19 +00:00
feat(maint): backfill-agent-event-durations endpoint
Recomputes durationS on existing AgentEvent rows using the fixed
per-category pairing logic and re-runs the session rollup for every
affected date. Fixes the 0-second CALL_END durations written before the
slot-split fix.
Idempotent — only patches rows whose stored durationS differs from the
newly computed value. Safe to re-run.
POST /api/maint/backfill-agent-event-durations
Headers: x-maint-otp: <OTP>
Body: { "date": "YYYY-MM-DD" } (optional; defaults to today IST; use "all" to backfill every row)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,7 @@ import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { SessionService } from '../auth/session.service';
|
||||
import { SupervisorService } from '../supervisor/supervisor.service';
|
||||
import { AgentHistoryService, AgentEventType } from '../supervisor/agent-history.service';
|
||||
import { CallerResolutionService } from '../caller/caller-resolution.service';
|
||||
import { TelephonyConfigService } from '../config/telephony-config.service';
|
||||
|
||||
@@ -19,6 +20,7 @@ export class MaintController {
|
||||
private readonly session: SessionService,
|
||||
private readonly supervisor: SupervisorService,
|
||||
private readonly callerResolution: CallerResolutionService,
|
||||
private readonly history: AgentHistoryService,
|
||||
) {}
|
||||
|
||||
@Post('force-ready')
|
||||
@@ -421,4 +423,135 @@ export class MaintController {
|
||||
phones: { unique: resolvedByPhone.size, resolved: leadsResolved, errors: resolveErrors },
|
||||
};
|
||||
}
|
||||
|
||||
// Recompute durationS on existing AgentEvent rows using the per-category
|
||||
// pairing logic. Fixes rows written before the slot-split fix where
|
||||
// ACW_START clobbered CALL_START's pending entry. Also re-runs the
|
||||
// session rollup for each affected date. Idempotent — only updates rows
|
||||
// whose stored durationS differs from the recomputed value.
|
||||
//
|
||||
// POST /api/maint/backfill-agent-event-durations
|
||||
// body: { date?: "YYYY-MM-DD" | "all" } — default today IST
|
||||
@Post('backfill-agent-event-durations')
|
||||
async backfillAgentEventDurations(@Body() body: { date?: string }) {
|
||||
const target = body?.date ?? this.todayIst();
|
||||
this.logger.log(`[MAINT] Backfill AgentEvent durations — target=${target}`);
|
||||
|
||||
// Pull events for the range. If "all", no filter; otherwise scope to the IST day.
|
||||
let events = await this.fetchAgentEventsForBackfill(target);
|
||||
if (events.length === 0) {
|
||||
return { status: 'ok', scanned: 0, patched: 0, skipped: 0, dates: [] };
|
||||
}
|
||||
this.logger.log(`[MAINT] Fetched ${events.length} AgentEvent rows`);
|
||||
|
||||
// Group by agent, sort by eventAt ascending.
|
||||
const byAgent = new Map<string, typeof events>();
|
||||
for (const e of events) {
|
||||
const k = e.agentId;
|
||||
if (!k) continue;
|
||||
if (!byAgent.has(k)) byAgent.set(k, []);
|
||||
byAgent.get(k)!.push(e);
|
||||
}
|
||||
for (const list of byAgent.values()) {
|
||||
list.sort((a, b) => new Date(a.eventAt).getTime() - new Date(b.eventAt).getTime());
|
||||
}
|
||||
|
||||
// Per-category slot pairing, same logic as the live ingest.
|
||||
const slotForStart = (t: AgentEventType): 'pause' | 'call' | 'acw' | null =>
|
||||
t === 'PAUSE' ? 'pause' : t === 'CALL_START' ? 'call' : t === 'ACW_START' ? 'acw' : null;
|
||||
const slotForEnd = (t: AgentEventType): 'pause' | 'call' | 'acw' | null =>
|
||||
t === 'RESUME' ? 'pause' : t === 'CALL_END' ? 'call' : t === 'ACW_END' ? 'acw' : null;
|
||||
|
||||
let patched = 0;
|
||||
let skipped = 0;
|
||||
const affectedDates = new Set<string>();
|
||||
|
||||
for (const [agentId, agentEvents] of byAgent) {
|
||||
const pending: { pause?: number; call?: number; acw?: number } = {};
|
||||
for (const e of agentEvents) {
|
||||
const eventMs = new Date(e.eventAt).getTime();
|
||||
const endSlot = slotForEnd(e.eventType);
|
||||
const startSlot = slotForStart(e.eventType);
|
||||
|
||||
let computed: number | null = null;
|
||||
|
||||
if (endSlot) {
|
||||
const at = pending[endSlot];
|
||||
if (at !== undefined) {
|
||||
computed = Math.max(0, Math.round((eventMs - at) / 1000));
|
||||
delete pending[endSlot];
|
||||
}
|
||||
} else if (startSlot) {
|
||||
pending[startSlot] = eventMs;
|
||||
} else if (e.eventType === 'READY' || e.eventType === 'LOGOUT') {
|
||||
delete pending.pause;
|
||||
delete pending.call;
|
||||
delete pending.acw;
|
||||
}
|
||||
|
||||
// Only patch END events that now have a computed duration
|
||||
// different from what's stored.
|
||||
if (endSlot && computed !== null && computed !== (e.durationS ?? null)) {
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation { updateAgentEvent(id: "${e.id}", data: { durationS: ${computed} }) { id } }`,
|
||||
);
|
||||
patched++;
|
||||
const datePart = (e.eventAt ?? '').slice(0, 10);
|
||||
if (datePart) affectedDates.add(datePart);
|
||||
this.logger.log(`[MAINT] Patched AgentEvent ${e.id} ${e.eventType} agent=${agentId} ${e.durationS ?? 'null'}s → ${computed}s`);
|
||||
await new Promise((r) => setTimeout(r, 80));
|
||||
} catch (err) {
|
||||
this.logger.warn(`[MAINT] Patch failed for ${e.id}: ${err}`);
|
||||
skipped++;
|
||||
}
|
||||
} else {
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-run rollup for each affected date so AgentSession numbers update.
|
||||
const dates = Array.from(affectedDates);
|
||||
for (const d of dates) {
|
||||
try {
|
||||
await this.history.rollupSessions(d);
|
||||
this.logger.log(`[MAINT] Rollup re-run for ${d}`);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[MAINT] Rollup failed for ${d}: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`[MAINT] Backfill AgentEvent durations complete: scanned=${events.length} patched=${patched} skipped=${skipped} dates=${dates.join(',')}`);
|
||||
return { status: 'ok', scanned: events.length, patched, skipped, dates };
|
||||
}
|
||||
|
||||
private todayIst(): string {
|
||||
const ist = new Date(Date.now() + 5.5 * 60 * 60 * 1000);
|
||||
return ist.toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
private async fetchAgentEventsForBackfill(date: string): Promise<Array<{ id: string; eventType: AgentEventType; eventAt: string; durationS: number | null; agentId: string }>> {
|
||||
const events: Array<{ id: string; eventType: AgentEventType; eventAt: string; durationS: number | null; agentId: string }> = [];
|
||||
let after: string | null = null;
|
||||
const rangeFilter = date === 'all'
|
||||
? ''
|
||||
: `, filter: { eventAt: { gte: "${date}T00:00:00+05:30", lte: "${date}T23:59:59+05:30" } }`;
|
||||
|
||||
for (let page = 0; page < 50; page++) {
|
||||
const cursorArg: string = after ? `, after: "${after}"` : '';
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ agentEvents(first: 200${cursorArg}${rangeFilter}, orderBy: [{ eventAt: AscNullsLast }]) {
|
||||
edges { node { id eventType eventAt durationS agentId } }
|
||||
pageInfo { hasNextPage endCursor }
|
||||
} }`,
|
||||
);
|
||||
const edges = data?.agentEvents?.edges ?? [];
|
||||
for (const e of edges) events.push(e.node);
|
||||
const pageInfo: { hasNextPage?: boolean; endCursor?: string } = data?.agentEvents?.pageInfo ?? {};
|
||||
if (!pageInfo.hasNextPage) break;
|
||||
after = pageInfo.endCursor ?? null;
|
||||
}
|
||||
return events;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user