import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; import { PlatformGraphqlService } from '../platform/platform-graphql.service'; // AgentEvent enum values (mirror of the SDK app's agent-event.object.ts). // Ozonetel webhook actions → Helix event types. export type AgentEventType = | 'LOGIN' | 'LOGOUT' | 'READY' | 'PAUSE' | 'RESUME' | 'CALL_START' | 'CALL_END' | 'ACW_START' | 'ACW_END'; // Separate pending slots per event category. Call + ACW overlap (agent // enters ACW before the CALL_END arrives), so a single shared slot would // let ACW_START clobber pending CALL_START and produce 0-second call // durations. Keep one slot per category so each END event pairs cleanly. type PendingSlot = 'pause' | 'call' | 'acw'; type PendingStarts = { pause?: number; // PAUSE eventAt ms call?: number; // CALL_START eventAt ms acw?: number; // ACW_START eventAt ms }; /** * Persists agent activity and per-call timing into the platform entities * we added in Phase 1 (AgentEvent, Call SLA fields). Reads AgentSession * later via the rollup job. * * Called from: * - supervisor.service.handleAgentEvent → persistAgentEvent() * - supervisor.service.handleCallEvent → patchCallTiming() * - ozonetel-agent.controller dispose flow → patchCallTiming() */ @Injectable() export class AgentHistoryService implements OnModuleInit { private readonly logger = new Logger(AgentHistoryService.name); // ozonetelAgentId → Agent entity UUID. Loaded at startup. private readonly agentUuidByOzonetelId = new Map(); // agentId → map of pending start events per category, used to compute // durationSec on the matching END event. private readonly pendingStartsByAgent = new Map(); constructor(private readonly platform: PlatformGraphqlService) {} private rollupTimer: NodeJS.Timeout | null = null; async onModuleInit() { await this.refreshAgentCache(); // Roll up today's sessions every 15 minutes. Rollup is idempotent // (upsert by agent+date), so missing a tick is safe — the next tick // recomputes from AgentEvent history. Written with setInterval because // @nestjs/schedule isn't installed in this sidecar. this.rollupTimer = setInterval(() => { this.rollupSessions(this.currentSessionDate()).catch((err) => { this.logger.warn(`[HISTORY] Rollup tick failed: ${err?.message ?? err}`); }); }, 15 * 60 * 1000); // Kick off one immediately so the dashboard has data on boot. this.rollupSessions(this.currentSessionDate()).catch(() => {}); } onModuleDestroy() { if (this.rollupTimer) clearInterval(this.rollupTimer); } // IST day boundary — agents work in IST, so the rollup is by IST date. private currentSessionDate(): string { const now = new Date(); const ist = new Date(now.getTime() + 5.5 * 60 * 60 * 1000); return ist.toISOString().slice(0, 10); } private async refreshAgentCache(): Promise { try { const data = await this.platform.query( `{ agents(first: 50) { edges { node { id ozonetelAgentId } } } }`, ); const edges = data?.agents?.edges ?? []; this.agentUuidByOzonetelId.clear(); for (const edge of edges) { const n = edge.node; if (n.ozonetelAgentId) { this.agentUuidByOzonetelId.set(n.ozonetelAgentId, n.id); } } this.logger.log(`[HISTORY] Loaded ${this.agentUuidByOzonetelId.size} agent UUIDs into cache`); } catch (err) { this.logger.warn(`[HISTORY] Failed to refresh agent cache: ${err}`); } } private async resolveAgentUuid(ozonetelAgentId: string): Promise { if (!ozonetelAgentId) return null; const cached = this.agentUuidByOzonetelId.get(ozonetelAgentId); if (cached) return cached; // Cache miss — refresh once (handles late-provisioned agents like Ganesh) await this.refreshAgentCache(); return this.agentUuidByOzonetelId.get(ozonetelAgentId) ?? null; } /** * Record an agent activity event. Computes durationSec for END events * (RESUME, CALL_END, ACW_END) by pairing against the most recent START. * Non-fatal on failure — realtime SSE flow continues even if the * platform write errors. */ async persistAgentEvent(params: { ozonetelAgentId: string; eventType: AgentEventType; eventAt: string; // ISO pauseReason?: string | null; callId?: string | null; }): Promise { const agentUuid = await this.resolveAgentUuid(params.ozonetelAgentId); if (!agentUuid) { this.logger.warn(`[HISTORY] No Agent entity for ozonetelAgentId=${params.ozonetelAgentId} — skipping event persist`); return; } // Pair START → END events by category. CALL and ACW can overlap // (agent enters ACW before CALL_END arrives), so each lives in its // own slot. READY is a fallback close — supervisor.service already // maps 'release'/'IDLE' to RESUME / ACW_END when it knows the prior // state; READY only fires when that disambiguation failed, so it // clears anything dangling. let durationSec: number | null = null; const endSlot = this.slotForEnd(params.eventType); const startSlot = this.slotForStart(params.eventType); const eventMs = new Date(params.eventAt).getTime(); if (endSlot) { const pending = this.pendingStartsByAgent.get(params.ozonetelAgentId); const at = pending?.[endSlot]; if (at !== undefined) { durationSec = Math.max(0, Math.round((eventMs - at) / 1000)); delete pending![endSlot]; if (!pending!.pause && !pending!.call && !pending!.acw) { this.pendingStartsByAgent.delete(params.ozonetelAgentId); } } } else if (startSlot) { const existing = this.pendingStartsByAgent.get(params.ozonetelAgentId) ?? {}; existing[startSlot] = eventMs; this.pendingStartsByAgent.set(params.ozonetelAgentId, existing); } else if (params.eventType === 'READY' || params.eventType === 'LOGOUT') { // Defensive flush of any lingering slots on session boundaries. this.pendingStartsByAgent.delete(params.ozonetelAgentId); } const data: Record = { name: `${params.ozonetelAgentId} ${params.eventType}`, eventType: params.eventType, eventAt: params.eventAt, source: 'OZONETEL_SUBSCRIPTION', agentId: agentUuid, }; if (params.pauseReason) data.pauseReason = params.pauseReason; if (durationSec !== null) data.durationS = durationSec; if (params.callId) data.callId = params.callId; try { await this.platform.query( `mutation($data: AgentEventCreateInput!) { createAgentEvent(data: $data) { id } }`, { data }, ); } catch (err: any) { if (this.isEntityMissingError(err)) { if (!this.warnedEntityMissing) { this.logger.warn('[HISTORY] AgentEvent entity not synced on this workspace — skipping persistence'); this.warnedEntityMissing = true; } return; } this.logger.warn(`[HISTORY] createAgentEvent failed: ${err}`); } } private warnedEntityMissing = false; private isEntityMissingError(err: unknown): boolean { const msg = String((err as any)?.message ?? err ?? ''); return msg.includes('Cannot query field') || msg.includes('Unknown type') || msg.includes('AgentEventCreateInput') || msg.includes('AgentSessionCreateInput'); } private slotForStart(eventType: AgentEventType): PendingSlot | null { if (eventType === 'PAUSE') return 'pause'; if (eventType === 'CALL_START') return 'call'; if (eventType === 'ACW_START') return 'acw'; return null; } private slotForEnd(eventType: AgentEventType): PendingSlot | null { if (eventType === 'RESUME') return 'pause'; if (eventType === 'CALL_END') return 'call'; if (eventType === 'ACW_END') return 'acw'; return null; } /** * Patch a Call record with SLA / timing fields derived from Ozonetel * webhooks or post-call CDR. All fields optional — caller passes only * what it has. Used for response-time and ACW histograms on the * supervisor dashboard. */ async patchCallTiming(callId: string, fields: { assignedAt?: string; answeredAt?: string; responseTimeSec?: number; handlingTimeSec?: number; acwDurationSec?: number; holdDurationSec?: number; }): Promise { // Platform truncates `*Sec` → `*S` on field names. const fieldNameMap: Record = { responseTimeSec: 'responseTimeS', handlingTimeSec: 'handlingTimeS', acwDurationSec: 'acwDurationS', holdDurationSec: 'holdDurationS', }; const data: Record = {}; for (const [k, v] of Object.entries(fields)) { if (v !== undefined && v !== null) { data[fieldNameMap[k] ?? k] = v; } } if (Object.keys(data).length === 0) return; try { await this.platform.query( `mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`, { id: callId, data }, ); } catch (err) { this.logger.warn(`[HISTORY] updateCall timing failed (${callId}): ${err}`); } } /** * Aggregate AgentEvent rows into an AgentSession row per agent for the * given IST date. Called on a 15-minute interval; upserts by (agent, * sessionDate) so re-runs are safe. */ async rollupSessions(sessionDate: string): Promise { if (this.agentUuidByOzonetelId.size === 0) await this.refreshAgentCache(); const agentUuids = Array.from(new Set(this.agentUuidByOzonetelId.values())); if (agentUuids.length === 0) return; const startIso = `${sessionDate}T00:00:00+05:30`; const endIso = `${sessionDate}T23:59:59+05:30`; let succeeded = 0; for (const agentUuid of agentUuids) { try { const events = await this.fetchAgentEvents(agentUuid, startIso, endIso); const totals = this.aggregateEvents(events); await this.upsertSession(agentUuid, sessionDate, totals); succeeded++; } catch (err: any) { if (this.isEntityMissingError(err)) { if (!this.warnedEntityMissing) { this.logger.warn('[HISTORY] AgentEvent/AgentSession entities not synced on this workspace — skipping rollup'); this.warnedEntityMissing = true; } return; } this.logger.warn(`[HISTORY] Rollup failed for agent ${agentUuid}: ${err?.message ?? err}`); } } this.logger.log(`[HISTORY] Rollup complete for ${sessionDate} — ${succeeded}/${agentUuids.length} agents`); } // Platform strips the `Sec` suffix on numeric field names — schema uses // `durationS`, `loginDurationS`, etc. Map back to our canonical names // when reading. private async fetchAgentEvents(agentUuid: string, startIso: string, endIso: string): Promise> { const events: Array<{ eventType: AgentEventType; durationSec: number | null; eventAt: string }> = []; let after: string | null = null; for (let page = 0; page < 20; page++) { const cursorArg: string = after ? `, after: "${after}"` : ''; const data: any = await this.platform.query( `{ agentEvents(first: 200${cursorArg}, filter: { agentId: { eq: "${agentUuid}" }, eventAt: { gte: "${startIso}", lte: "${endIso}" } }, orderBy: [{ eventAt: AscNullsLast }]) { edges { node { eventType eventAt durationS } } pageInfo { hasNextPage endCursor } } }`, ); const edges = data?.agentEvents?.edges ?? []; for (const e of edges) { events.push({ eventType: e.node.eventType, eventAt: e.node.eventAt, durationSec: e.node.durationS ?? null, }); } const pageInfo: { hasNextPage?: boolean; endCursor?: string } = data?.agentEvents?.pageInfo ?? {}; if (!pageInfo.hasNextPage) break; after = pageInfo.endCursor ?? null; } return events; } private aggregateEvents(events: Array<{ eventType: AgentEventType; durationSec: number | null; eventAt: string }>) { let busyTimeSec = 0; let pauseTimeSec = 0; let wrapupTimeSec = 0; let handlingSum = 0; let handlingCount = 0; // Login duration: sum each LOGIN → (next LOGOUT on same day | now) span. // Ozonetel doesn't emit a LOGOUT if the agent just closes the tab, so // cap open sessions at the end of the rollup day. let loginDurationSec = 0; let openLoginAt: number | null = null; for (const e of events) { if (e.eventType === 'LOGIN') { openLoginAt = new Date(e.eventAt).getTime(); } else if (e.eventType === 'LOGOUT' && openLoginAt !== null) { loginDurationSec += Math.max(0, Math.round((new Date(e.eventAt).getTime() - openLoginAt) / 1000)); openLoginAt = null; } else if (e.eventType === 'CALL_END' && e.durationSec) { busyTimeSec += e.durationSec; handlingSum += e.durationSec; handlingCount++; } else if (e.eventType === 'RESUME' && e.durationSec) { pauseTimeSec += e.durationSec; } else if (e.eventType === 'ACW_END' && e.durationSec) { wrapupTimeSec += e.durationSec; } } if (openLoginAt !== null) { // Still logged in — count up to now (capped to the rollup day end). loginDurationSec += Math.max(0, Math.round((Date.now() - openLoginAt) / 1000)); } const avgHandlingTimeSec = handlingCount > 0 ? Math.round(handlingSum / handlingCount) : null; const idleTimeSec = Math.max(0, loginDurationSec - busyTimeSec - pauseTimeSec - wrapupTimeSec); return { loginDurationSec, busyTimeSec, pauseTimeSec, wrapupTimeSec, idleTimeSec, avgHandlingTimeSec }; } // AgentSession fields map: our `*Sec` → platform `*S`, `sessionDate` → `date`. private async upsertSession( agentUuid: string, sessionDate: string, totals: { loginDurationSec: number; busyTimeSec: number; pauseTimeSec: number; wrapupTimeSec: number; idleTimeSec: number; avgHandlingTimeSec: number | null }, ): Promise { const existing = await this.platform.query( `{ agentSessions(first: 1, filter: { agentId: { eq: "${agentUuid}" }, date: { eq: "${sessionDate}" } }) { edges { node { id } } } }`, ); const existingId = existing?.agentSessions?.edges?.[0]?.node?.id; const data: Record = { loginDurationS: totals.loginDurationSec, busyTimeS: totals.busyTimeSec, pauseTimeS: totals.pauseTimeSec, wrapupTimeS: totals.wrapupTimeSec, idleTimeS: totals.idleTimeSec, source: 'COMPUTED', lastSyncedAt: new Date().toISOString(), }; if (totals.avgHandlingTimeSec !== null) data.avgHandlingTimeS = totals.avgHandlingTimeSec; if (existingId) { await this.platform.query( `mutation($id: UUID!, $data: AgentSessionUpdateInput!) { updateAgentSession(id: $id, data: $data) { id } }`, { id: existingId, data }, ); } else { await this.platform.query( `mutation($data: AgentSessionCreateInput!) { createAgentSession(data: $data) { id } }`, { data: { ...data, name: `Session ${sessionDate}`, agentId: agentUuid, date: sessionDate } }, ); } } }