feat(supervisor): Phase 2 metrics ingest — AgentEvent/AgentSession rollup

- New AgentHistoryService: persistAgentEvent pairs START/END for durationS, patchCallTiming updates Call SLA fields
- Supervisor service wires handleCallEvent (CALL_START on Answered, CALL_END on Disconnect) and handleAgentEvent (LOGIN/LOGOUT/PAUSE/RESUME/ACW_START/ACW_END/READY) via priorState-aware mapping
- setInterval-based nightly-ish rollup: every 15min aggregates AgentEvent into AgentSession per IST day (idempotent upsert by agentId+date)
- Ozonetel dispose flow extracts HandlingTime/WrapupDuration/HoldDuration from CDR, patches Call timing fields
- Field names match platform truncation: durationS, loginDurationS, busyTimeS, idleTimeS, pauseTimeS, wrapupTimeS, avgHandlingTimeS, handlingTimeS, acwDurationS, holdDurationS, responseTimeS, sessionDate → date
- Skips cleanly on workspaces where AgentEvent entity isn't synced

Known issue: pending-pair map has single slot per agent, so ACW_START overwrites pending CALL_START and CALL_END computes 0s duration. Fix in followup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-15 06:49:15 +05:30
parent fbe782b5ac
commit 4eb8cb80b2
4 changed files with 527 additions and 14 deletions

View File

@@ -0,0 +1,363 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
/**
 * Event kinds stored on the AgentEvent entity (mirror of the enum in the
 * SDK app's agent-event.object.ts). Ozonetel webhook actions are mapped
 * onto these Helix event types.
 */
export type AgentEventType =
  // Session boundaries
  | 'LOGIN'
  | 'LOGOUT'
  // Availability
  | 'READY'
  | 'PAUSE'
  | 'RESUME'
  // Call handling
  | 'CALL_START'
  | 'CALL_END'
  // After-call work (wrap-up)
  | 'ACW_START'
  | 'ACW_END';
/**
 * A "start" event that has been seen for an agent and is still waiting for
 * its matching "end" event, so the end event's duration can be computed.
 */
type PendingStart = {
  /** Which kind of interval was opened. */
  eventType: 'PAUSE' | 'CALL_START' | 'ACW_START';
  /** When the interval was opened, as a millisecond epoch timestamp. */
  at: number;
};
/**
 * Persists agent activity and per-call timing into the platform entities
 * added in Phase 1 (AgentEvent, Call SLA fields), and periodically rolls
 * AgentEvent rows up into one AgentSession row per agent per IST day.
 *
 * Called from:
 * - supervisor.service.handleAgentEvent → persistAgentEvent()
 * - supervisor.service.handleCallEvent → patchCallTiming()
 * - ozonetel-agent.controller dispose flow → patchCallTiming()
 */
@Injectable()
export class AgentHistoryService implements OnModuleInit {
  private readonly logger = new Logger(AgentHistoryService.name);

  // ozonetelAgentId → Agent entity UUID. Loaded at startup, refreshed on a
  // cache miss.
  private readonly agentUuidByOzonetelId = new Map<string, string>();

  // ozonetelAgentId → "start" events still awaiting their paired "end".
  // Holds at most one entry per start type, so an ACW_START arriving before
  // CALL_END no longer clobbers the pending CALL_START.
  private readonly pendingStartsByAgent = new Map<string, PendingStart[]>();

  private rollupTimer: ReturnType<typeof setInterval> | null = null;

  // IST date handled by the most recent rollup tick; used to finalise the
  // previous day once the date rolls over.
  private lastRollupDate: string | null = null;

  // Ensures the "entity not synced" warning is logged only once.
  private warnedEntityMissing = false;

  constructor(private readonly platform: PlatformGraphqlService) {}

  /**
   * Loads the agent cache, schedules the 15-minute rollup and kicks off one
   * rollup immediately (not awaited) so the dashboard has data on boot.
   */
  async onModuleInit(): Promise<void> {
    await this.refreshAgentCache();
    // Rollup is idempotent (upsert by agent+date), so missing a tick is
    // safe — the next tick recomputes from AgentEvent history. Written with
    // setInterval because @nestjs/schedule isn't installed in this sidecar.
    const rollupIntervalMs = 15 * 60 * 1000;
    this.rollupTimer = setInterval(() => {
      void this.runRollupTick();
    }, rollupIntervalMs);
    void this.runRollupTick();
  }

  /** Stops the rollup timer. */
  onModuleDestroy(): void {
    if (this.rollupTimer) {
      clearInterval(this.rollupTimer);
      this.rollupTimer = null;
    }
  }

  /**
   * One scheduled rollup pass. Rolls up the current IST date and, on the
   * first tick after the date changed, the previous date as well so events
   * recorded after that day's last tick are not lost. Never rejects.
   */
  private async runRollupTick(): Promise<void> {
    const today = this.currentSessionDate();
    const previous = this.lastRollupDate;
    this.lastRollupDate = today;
    try {
      if (previous !== null && previous !== today) {
        await this.rollupSessions(previous);
      }
      await this.rollupSessions(today);
    } catch (err) {
      this.logger.warn(`[HISTORY] Rollup tick failed: ${this.errorMessage(err)}`);
    }
  }

  // IST day boundary — agents work in IST, so the rollup is by IST date.
  private currentSessionDate(): string {
    const istOffsetMs = 5.5 * 60 * 60 * 1000;
    return new Date(Date.now() + istOffsetMs).toISOString().slice(0, 10);
  }

  /** Mirrors the `err?.message ?? err` formatting used in log lines. */
  private errorMessage(err: unknown): string {
    return String((err as { message?: unknown } | null | undefined)?.message ?? err);
  }

  /**
   * Reloads the ozonetelAgentId → UUID cache from the platform. The cache is
   * only replaced after a successful query; failures are logged and the old
   * contents kept.
   *
   * NOTE(review): only the first 50 agents are requested and the query is
   * not paginated — confirm this is enough for every workspace.
   */
  private async refreshAgentCache(): Promise<void> {
    try {
      const data = await this.platform.query<any>(
        `{ agents(first: 50) { edges { node { id ozonetelAgentId } } } }`,
      );
      const edges: any[] = data?.agents?.edges ?? [];
      this.agentUuidByOzonetelId.clear();
      for (const edge of edges) {
        const node = edge?.node;
        if (node?.ozonetelAgentId) {
          this.agentUuidByOzonetelId.set(node.ozonetelAgentId, node.id);
        }
      }
      this.logger.log(`[HISTORY] Loaded ${this.agentUuidByOzonetelId.size} agent UUIDs into cache`);
    } catch (err) {
      this.logger.warn(`[HISTORY] Failed to refresh agent cache: ${err}`);
    }
  }

  /** Resolves an Ozonetel agent id to the Agent entity UUID, or null. */
  private async resolveAgentUuid(ozonetelAgentId: string): Promise<string | null> {
    if (!ozonetelAgentId) return null;
    const cached = this.agentUuidByOzonetelId.get(ozonetelAgentId);
    if (cached) return cached;
    // Cache miss — refresh once (handles late-provisioned agents).
    await this.refreshAgentCache();
    return this.agentUuidByOzonetelId.get(ozonetelAgentId) ?? null;
  }

  /**
   * Record an agent activity event. For END events the duration is computed
   * by pairing against the matching pending START:
   * RESUME ↔ PAUSE, CALL_END ↔ CALL_START, ACW_END ↔ ACW_START.
   * READY closes whatever is open (see closePendingStart).
   *
   * Non-fatal on failure — platform write errors are logged, not thrown, so
   * the realtime flow continues.
   *
   * @param params.ozonetelAgentId Ozonetel agent id; the event is skipped
   *   when no Agent entity is known for it.
   * @param params.eventAt ISO timestamp of the event.
   */
  async persistAgentEvent(params: {
    ozonetelAgentId: string;
    eventType: AgentEventType;
    eventAt: string; // ISO
    pauseReason?: string | null;
    callId?: string | null;
  }): Promise<void> {
    const agentUuid = await this.resolveAgentUuid(params.ozonetelAgentId);
    if (!agentUuid) {
      this.logger.warn(`[HISTORY] No Agent entity for ozonetelAgentId=${params.ozonetelAgentId} — skipping event persist`);
      return;
    }

    const eventType = params.eventType;
    const eventAtMs = new Date(params.eventAt).getTime();
    // An unparseable eventAt must not produce a NaN duration or poison the
    // pending state with a NaN start time.
    const hasValidTime = Number.isFinite(eventAtMs);
    let durationSec: number | null = null;
    if (this.isStartEvent(eventType)) {
      if (hasValidTime) {
        this.openPendingStart(params.ozonetelAgentId, { eventType, at: eventAtMs });
      }
    } else {
      const startedAtMs = this.closePendingStart(params.ozonetelAgentId, eventType);
      if (startedAtMs !== null && hasValidTime) {
        durationSec = Math.max(0, Math.round((eventAtMs - startedAtMs) / 1000));
      }
    }

    const data: Record<string, any> = {
      eventType,
      eventAt: params.eventAt,
      source: 'OZONETEL_SUBSCRIPTION',
      agentId: agentUuid,
    };
    if (params.pauseReason) data.pauseReason = params.pauseReason;
    if (durationSec !== null) data.durationS = durationSec;
    if (params.callId) data.callId = params.callId;

    try {
      await this.platform.query<any>(
        `mutation($data: AgentEventCreateInput!) { createAgentEvent(data: $data) { id } }`,
        { data },
      );
    } catch (err) {
      if (this.isEntityMissingError(err)) {
        if (!this.warnedEntityMissing) {
          this.logger.warn('[HISTORY] AgentEvent entity not synced on this workspace — skipping persistence');
          this.warnedEntityMissing = true;
        }
        return;
      }
      this.logger.warn(`[HISTORY] createAgentEvent failed: ${err}`);
    }
  }

  /** True when the error text looks like the entity/input type is absent from the schema. */
  private isEntityMissingError(err: unknown): boolean {
    const msg = String((err as { message?: unknown } | null | undefined)?.message ?? err ?? '');
    return msg.includes('Cannot query field') || msg.includes('Unknown type')
      || msg.includes('AgentEventCreateInput') || msg.includes('AgentSessionCreateInput');
  }

  /** True for events that open an interval (PAUSE, CALL_START, ACW_START). */
  private isStartEvent(eventType: AgentEventType): eventType is PendingStart['eventType'] {
    return eventType === 'PAUSE' || eventType === 'CALL_START' || eventType === 'ACW_START';
  }

  /** The start type a specific end event pairs with, or null if it has none. */
  private startTypeClosedBy(endType: AgentEventType): PendingStart['eventType'] | null {
    switch (endType) {
      case 'RESUME':
        return 'PAUSE';
      case 'CALL_END':
        return 'CALL_START';
      case 'ACW_END':
        return 'ACW_START';
      default:
        return null;
    }
  }

  /** Registers a pending start, replacing any pending start of the same type. */
  private openPendingStart(ozonetelAgentId: string, start: PendingStart): void {
    const stillOpen = (this.pendingStartsByAgent.get(ozonetelAgentId) ?? [])
      .filter((p) => p.eventType !== start.eventType);
    stillOpen.push(start);
    this.pendingStartsByAgent.set(ozonetelAgentId, stillOpen);
  }

  /**
   * Consumes the pending start closed by `endType` and returns its start
   * time in ms, or null when nothing matches.
   *
   * - READY: Ozonetel emits a single "release" action for both post-pause
   *   and post-ACW, so READY closes whatever is open. The most recent start
   *   is used and all pending starts for the agent are dropped.
   * - RESUME / CALL_END / ACW_END: only the matching start type is consumed.
   * - LOGIN / LOGOUT are session boundaries and close nothing.
   */
  private closePendingStart(ozonetelAgentId: string, endType: AgentEventType): number | null {
    const open = this.pendingStartsByAgent.get(ozonetelAgentId);
    if (!open || open.length === 0) return null;

    if (endType === 'READY') {
      this.pendingStartsByAgent.delete(ozonetelAgentId);
      return Math.max(...open.map((p) => p.at));
    }

    const startType = this.startTypeClosedBy(endType);
    if (startType === null) return null;
    const match = open.find((p) => p.eventType === startType);
    if (!match) return null;

    const remaining = open.filter((p) => p !== match);
    if (remaining.length > 0) {
      this.pendingStartsByAgent.set(ozonetelAgentId, remaining);
    } else {
      this.pendingStartsByAgent.delete(ozonetelAgentId);
    }
    return match.at;
  }

  /**
   * Patch a Call record with SLA / timing fields derived from Ozonetel
   * webhooks or post-call CDR. All fields optional — caller passes only
   * what it has; nothing is sent when every field is absent. Failures are
   * logged, not thrown.
   */
  async patchCallTiming(callId: string, fields: {
    assignedAt?: string;
    answeredAt?: string;
    responseTimeSec?: number;
    handlingTimeSec?: number;
    acwDurationSec?: number;
    holdDurationSec?: number;
  }): Promise<void> {
    // Platform truncates `*Sec` → `*S` on field names.
    const fieldNameMap: Record<string, string> = {
      responseTimeSec: 'responseTimeS',
      handlingTimeSec: 'handlingTimeS',
      acwDurationSec: 'acwDurationS',
      holdDurationSec: 'holdDurationS',
    };
    const data: Record<string, any> = {};
    for (const [key, value] of Object.entries(fields)) {
      if (value !== undefined && value !== null) {
        data[fieldNameMap[key] ?? key] = value;
      }
    }
    if (Object.keys(data).length === 0) return;
    try {
      await this.platform.query<any>(
        `mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
        { id: callId, data },
      );
    } catch (err) {
      this.logger.warn(`[HISTORY] updateCall timing failed (${callId}): ${err}`);
    }
  }

  /**
   * Aggregate AgentEvent rows into an AgentSession row per agent for the
   * given IST date. Upserts by (agent, date) so re-runs are safe. A failure
   * for one agent is logged and the loop continues; if the entities are not
   * synced on this workspace the whole rollup is skipped.
   *
   * @param sessionDate IST calendar date, `YYYY-MM-DD`.
   */
  async rollupSessions(sessionDate: string): Promise<void> {
    if (this.agentUuidByOzonetelId.size === 0) await this.refreshAgentCache();
    const agentUuids = Array.from(new Set(this.agentUuidByOzonetelId.values()));
    if (agentUuids.length === 0) return;

    const startIso = `${sessionDate}T00:00:00+05:30`;
    // Millisecond precision so events in the final second are included.
    const endIso = `${sessionDate}T23:59:59.999+05:30`;
    const dayEndMs = new Date(endIso).getTime();

    let succeeded = 0;
    for (const agentUuid of agentUuids) {
      try {
        const events = await this.fetchAgentEvents(agentUuid, startIso, endIso);
        const totals = this.aggregateEvents(events, dayEndMs);
        await this.upsertSession(agentUuid, sessionDate, totals);
        succeeded++;
      } catch (err) {
        if (this.isEntityMissingError(err)) {
          if (!this.warnedEntityMissing) {
            this.logger.warn('[HISTORY] AgentEvent/AgentSession entities not synced on this workspace — skipping rollup');
            this.warnedEntityMissing = true;
          }
          return;
        }
        this.logger.warn(`[HISTORY] Rollup failed for agent ${agentUuid}: ${this.errorMessage(err)}`);
      }
    }
    this.logger.log(`[HISTORY] Rollup complete for ${sessionDate} — ${succeeded}/${agentUuids.length} agents`);
  }

  // Platform strips the `Sec` suffix on numeric field names — schema uses
  // `durationS`, `loginDurationS`, etc. Map back to our canonical names
  // when reading.
  /**
   * Fetches an agent's events with eventAt in [startIso, endIso], ordered by
   * eventAt ascending, following cursors for at most 20 pages of 200 rows.
   */
  private async fetchAgentEvents(agentUuid: string, startIso: string, endIso: string): Promise<Array<{ eventType: AgentEventType; durationSec: number | null; eventAt: string }>> {
    const events: Array<{ eventType: AgentEventType; durationSec: number | null; eventAt: string }> = [];
    const maxPages = 20;
    let after: string | null = null;
    for (let page = 0; page < maxPages; page++) {
      const cursorArg: string = after ? `, after: "${after}"` : '';
      const data: any = await this.platform.query<any>(
        `{ agentEvents(first: 200${cursorArg}, filter: { agentId: { eq: "${agentUuid}" }, eventAt: { gte: "${startIso}", lte: "${endIso}" } }, orderBy: [{ eventAt: AscNullsLast }]) {
          edges { node { eventType eventAt durationS } }
          pageInfo { hasNextPage endCursor }
        } }`,
      );
      const edges: any[] = data?.agentEvents?.edges ?? [];
      for (const edge of edges) {
        events.push({
          eventType: edge.node.eventType,
          eventAt: edge.node.eventAt,
          durationSec: edge.node.durationS ?? null,
        });
      }
      const pageInfo: { hasNextPage?: boolean; endCursor?: string | null } = data?.agentEvents?.pageInfo ?? {};
      // Without a cursor the next request would re-read page 1 and duplicate
      // every event, so stop even if the platform claims more pages exist.
      if (!pageInfo.hasNextPage || !pageInfo.endCursor) return events;
      after = pageInfo.endCursor;
    }
    this.logger.warn(`[HISTORY] agentEvents for agent ${agentUuid} exceeded ${maxPages} pages — totals may be truncated`);
    return events;
  }

  /**
   * Reduces one agent's events for one day to session totals (seconds).
   *
   * - Login duration: sum of each LOGIN → next LOGOUT span. Ozonetel doesn't
   *   emit a LOGOUT if the agent just closes the tab, so a login still open
   *   at the end of the list is counted up to now, capped at `dayEndMs`.
   * - Busy / pause / wrap-up: sums of durationSec on CALL_END / RESUME /
   *   ACW_END events; events with a missing or zero duration are ignored.
   * - Idle: login minus busy, pause and wrap-up, floored at 0.
   *
   * NOTE(review): a login that started on an earlier day has no LOGIN event
   * in `events`, so it contributes nothing here — confirm that is intended.
   *
   * @param dayEndMs End of the rollup day as a ms epoch timestamp; defaults
   *   to no cap.
   */
  private aggregateEvents(
    events: Array<{ eventType: AgentEventType; durationSec: number | null; eventAt: string }>,
    dayEndMs: number = Number.POSITIVE_INFINITY,
  ) {
    let busyTimeSec = 0;
    let pauseTimeSec = 0;
    let wrapupTimeSec = 0;
    let handlingSum = 0;
    let handlingCount = 0;
    let loginDurationSec = 0;
    let openLoginAt: number | null = null;

    for (const e of events) {
      if (e.eventType === 'LOGIN') {
        openLoginAt = new Date(e.eventAt).getTime();
      } else if (e.eventType === 'LOGOUT' && openLoginAt !== null) {
        loginDurationSec += Math.max(0, Math.round((new Date(e.eventAt).getTime() - openLoginAt) / 1000));
        openLoginAt = null;
      } else if (e.eventType === 'CALL_END' && e.durationSec) {
        busyTimeSec += e.durationSec;
        handlingSum += e.durationSec;
        handlingCount++;
      } else if (e.eventType === 'RESUME' && e.durationSec) {
        pauseTimeSec += e.durationSec;
      } else if (e.eventType === 'ACW_END' && e.durationSec) {
        wrapupTimeSec += e.durationSec;
      }
    }

    if (openLoginAt !== null) {
      // Still logged in — count up to now, capped to the rollup day end so a
      // rollup of a past date does not keep growing.
      const nowMs = Date.now();
      const openUntilMs = Number.isNaN(dayEndMs) ? nowMs : Math.min(nowMs, dayEndMs);
      loginDurationSec += Math.max(0, Math.round((openUntilMs - openLoginAt) / 1000));
    }

    const avgHandlingTimeSec = handlingCount > 0 ? Math.round(handlingSum / handlingCount) : null;
    const idleTimeSec = Math.max(0, loginDurationSec - busyTimeSec - pauseTimeSec - wrapupTimeSec);
    return { loginDurationSec, busyTimeSec, pauseTimeSec, wrapupTimeSec, idleTimeSec, avgHandlingTimeSec };
  }

  // AgentSession fields map: our `*Sec` → platform `*S`, `sessionDate` → `date`.
  /**
   * Creates or updates the AgentSession row for (agent, date): looks up an
   * existing row first and updates it when found, otherwise creates one.
   */
  private async upsertSession(
    agentUuid: string,
    sessionDate: string,
    totals: { loginDurationSec: number; busyTimeSec: number; pauseTimeSec: number; wrapupTimeSec: number; idleTimeSec: number; avgHandlingTimeSec: number | null },
  ): Promise<void> {
    const existing = await this.platform.query<any>(
      `{ agentSessions(first: 1, filter: { agentId: { eq: "${agentUuid}" }, date: { eq: "${sessionDate}" } }) { edges { node { id } } } }`,
    );
    const existingId = existing?.agentSessions?.edges?.[0]?.node?.id;

    const data: Record<string, any> = {
      loginDurationS: totals.loginDurationSec,
      busyTimeS: totals.busyTimeSec,
      pauseTimeS: totals.pauseTimeSec,
      wrapupTimeS: totals.wrapupTimeSec,
      idleTimeS: totals.idleTimeSec,
      source: 'COMPUTED',
      lastSyncedAt: new Date().toISOString(),
    };
    if (totals.avgHandlingTimeSec !== null) data.avgHandlingTimeS = totals.avgHandlingTimeSec;

    if (existingId) {
      await this.platform.query<any>(
        `mutation($id: UUID!, $data: AgentSessionUpdateInput!) { updateAgentSession(id: $id, data: $data) { id } }`,
        { id: existingId, data },
      );
    } else {
      await this.platform.query<any>(
        `mutation($data: AgentSessionCreateInput!) { createAgentSession(data: $data) { id } }`,
        { data: { ...data, agentId: agentUuid, date: sessionDate } },
      );
    }
  }
}