mirror of
https://dev.azure.com/globalhealthx/EMR/_git/helix-engage-server
synced 2026-05-18 20:08:19 +00:00
Audited all 23 sidecar create-mutation call sites; 7 were missing the top-level data.name field that the platform uses as the record title:
- caller-resolution.service.ts createPatient — full name from first/last
- maint.controller.ts createPatient (backfill-lead-patient-links) — same
- widget.service.ts createPatient (chat path + booking path) — full name
- widget.service.ts createAppointment — "<Patient> — <date>"
- worklist/missed-queue.service.ts createCall — "Missed — <phone>"
- rules-engine/actions/escalate.action.ts createPerformanceAlert — "<agent>: <message> (<value>)"
- supervisor/agent-history.service.ts createAgentEvent / createAgentSession

Cosmetic only — the app fetches fullName/agentName for display, so end users never saw "Untitled". Fixes platform-side admin browsing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
382 lines
17 KiB
TypeScript
382 lines
17 KiB
TypeScript
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
|
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
|
|
|
/**
 * Agent activity event types persisted as AgentEvent rows.
 *
 * Mirrors the AgentEvent enum values in the SDK app's agent-event.object.ts.
 * Ozonetel webhook actions are mapped onto these Helix event types.
 */
export type AgentEventType =
  | 'LOGIN'
  | 'LOGOUT'
  | 'READY'
  // PAUSE opens a pause interval; RESUME closes it.
  | 'PAUSE'
  | 'RESUME'
  // CALL_START opens a call interval; CALL_END closes it.
  | 'CALL_START'
  | 'CALL_END'
  // ACW_START opens an after-call-work interval; ACW_END closes it.
  | 'ACW_START'
  | 'ACW_END';
|
|
|
|
// Separate pending slots per event category. Call + ACW overlap (agent
// enters ACW before the CALL_END arrives), so a single shared slot would
// let ACW_START clobber pending CALL_START and produce 0-second call
// durations. Keep one slot per category so each END event pairs cleanly.
type PendingSlot = 'pause' | 'call' | 'acw';

/**
 * Start timestamps (epoch milliseconds of the START event's eventAt) that are
 * still waiting for their matching END event:
 *   pause → PAUSE, call → CALL_START, acw → ACW_START.
 *
 * Derived from PendingSlot so that adding a slot automatically adds the
 * matching optional field here.
 */
type PendingStarts = Partial<Record<PendingSlot, number>>;
|
|
|
|
/** One AgentEvent row as read back from the platform, using our `*Sec` naming. */
type AgentEventRow = {
  eventType: AgentEventType;
  durationSec: number | null;
  eventAt: string;
};

/** Totals computed for one agent over one rollup day; all values are seconds. */
type SessionTotals = {
  loginDurationSec: number;
  busyTimeSec: number;
  pauseTimeSec: number;
  wrapupTimeSec: number;
  idleTimeSec: number;
  avgHandlingTimeSec: number | null;
};

/**
 * Persists agent activity and per-call timing into the platform entities
 * we added in Phase 1 (AgentEvent, Call SLA fields), and rolls AgentEvent
 * rows up into one AgentSession row per agent per IST day.
 *
 * Called from:
 *   - supervisor.service.handleAgentEvent → persistAgentEvent()
 *   - supervisor.service.handleCallEvent → patchCallTiming()
 *   - ozonetel-agent.controller dispose flow → patchCallTiming()
 */
@Injectable()
export class AgentHistoryService implements OnModuleInit {
  /** Interval between automatic session rollups. */
  private static readonly ROLLUP_INTERVAL_MS = 15 * 60 * 1000;

  /** IST is UTC+05:30. */
  private static readonly IST_OFFSET_MS = 5.5 * 60 * 60 * 1000;

  /** Page size and page cap used when reading AgentEvent rows for a rollup. */
  private static readonly EVENT_PAGE_SIZE = 200;
  private static readonly MAX_EVENT_PAGES = 20;

  /** Platform truncates `*Sec` → `*S` on Call field names. */
  private static readonly CALL_FIELD_NAMES: Readonly<Record<string, string>> = {
    responseTimeSec: 'responseTimeS',
    handlingTimeSec: 'handlingTimeS',
    acwDurationSec: 'acwDurationS',
    holdDurationSec: 'holdDurationS',
  };

  private readonly logger = new Logger(AgentHistoryService.name);

  // ozonetelAgentId → Agent entity UUID. Loaded at startup.
  private readonly agentUuidByOzonetelId = new Map<string, string>();

  // ozonetelAgentId → pending start events per category, used to compute
  // durationSec on the matching END event. In-memory only.
  private readonly pendingStartsByAgent = new Map<string, PendingStarts>();

  // Handle of the periodic rollup timer; null while no timer is running.
  private rollupTimer: ReturnType<typeof setInterval> | null = null;

  // Set after the first "entity not synced" warning so the log is not
  // flooded. Shared by event persistence and the rollup.
  private warnedEntityMissing = false;

  constructor(private readonly platform: PlatformGraphqlService) {}

  /**
   * Loads the agent cache, starts the 15-minute rollup timer and kicks off
   * one rollup immediately so the dashboard has data on boot.
   *
   * Rollup is idempotent (upsert by agent+date), so missing a tick is safe —
   * the next tick recomputes from AgentEvent history. Written with
   * setInterval because @nestjs/schedule isn't installed in this sidecar.
   */
  async onModuleInit(): Promise<void> {
    await this.refreshAgentCache();

    // Never leave a previous timer running if init is invoked again.
    if (this.rollupTimer) clearInterval(this.rollupTimer);
    this.rollupTimer = setInterval(
      () => this.runRollupInBackground('tick'),
      AgentHistoryService.ROLLUP_INTERVAL_MS,
    );

    this.runRollupInBackground('on boot');
  }

  /** Stops the periodic rollup timer. */
  onModuleDestroy(): void {
    if (this.rollupTimer) {
      clearInterval(this.rollupTimer);
      this.rollupTimer = null;
    }
  }

  /**
   * Fire-and-forget rollup for the current IST date. Failures are logged
   * (never thrown) so neither the timer nor module start-up is affected.
   */
  private runRollupInBackground(label: string): void {
    this.rollupSessions(this.currentSessionDate()).catch((err: unknown) => {
      this.logger.warn(`[HISTORY] Rollup ${label} failed: ${this.errorText(err)}`);
    });
  }

  // IST day boundary — agents work in IST, so the rollup is by IST date.
  private currentSessionDate(): string {
    // Shift "now" by the IST offset, then read the date part of the UTC
    // rendering: that is the calendar date in IST, as YYYY-MM-DD.
    const shifted = new Date(Date.now() + AgentHistoryService.IST_OFFSET_MS);
    return shifted.toISOString().slice(0, 10);
  }

  /**
   * Reloads the ozonetelAgentId → Agent UUID cache from the platform.
   * On failure the existing cache is kept and a warning is logged.
   */
  private async refreshAgentCache(): Promise<void> {
    try {
      // NOTE(review): only the first 50 agents are requested; agents beyond
      // that would never resolve. Confirm the workspace stays under the cap.
      const data = await this.platform.query<any>(
        `{ agents(first: 50) { edges { node { id ozonetelAgentId } } } }`,
      );
      const edges: any[] = data?.agents?.edges ?? [];

      // Clear only after the query succeeded so a failed refresh does not
      // wipe a previously good cache.
      this.agentUuidByOzonetelId.clear();
      for (const edge of edges) {
        const node = edge?.node;
        if (node?.ozonetelAgentId && node.id) {
          this.agentUuidByOzonetelId.set(node.ozonetelAgentId, node.id);
        }
      }
      this.logger.log(`[HISTORY] Loaded ${this.agentUuidByOzonetelId.size} agent UUIDs into cache`);
    } catch (err) {
      this.logger.warn(`[HISTORY] Failed to refresh agent cache: ${err}`);
    }
  }

  /**
   * Resolves an Ozonetel agent id to the platform Agent UUID.
   *
   * @returns the UUID, or null when the id is empty or still unknown after
   *          one cache refresh.
   */
  private async resolveAgentUuid(ozonetelAgentId: string): Promise<string | null> {
    if (!ozonetelAgentId) return null;

    const cached = this.agentUuidByOzonetelId.get(ozonetelAgentId);
    if (cached) return cached;

    // Cache miss — refresh once (handles late-provisioned agents like Ganesh).
    // NOTE(review): misses are not remembered, so every event for an unknown
    // agent triggers a refresh query.
    await this.refreshAgentCache();
    return this.agentUuidByOzonetelId.get(ozonetelAgentId) ?? null;
  }

  /**
   * Record an agent activity event. Computes durationSec for END events
   * (RESUME, CALL_END, ACW_END) by pairing against the most recent START.
   * Non-fatal on failure — realtime SSE flow continues even if the
   * platform write errors.
   *
   * @param params.ozonetelAgentId Ozonetel agent id; the event is skipped
   *        (with a warning) when no Agent entity matches it.
   * @param params.eventType event being recorded.
   * @param params.eventAt ISO timestamp of the event.
   * @param params.pauseReason optional pause reason, stored when present.
   * @param params.callId optional Call id, stored when present.
   */
  async persistAgentEvent(params: {
    ozonetelAgentId: string;
    eventType: AgentEventType;
    eventAt: string; // ISO
    pauseReason?: string | null;
    callId?: string | null;
  }): Promise<void> {
    const agentUuid = await this.resolveAgentUuid(params.ozonetelAgentId);
    if (!agentUuid) {
      this.logger.warn(`[HISTORY] No Agent entity for ozonetelAgentId=${params.ozonetelAgentId} — skipping event persist`);
      return;
    }

    const agentKey = params.ozonetelAgentId;
    const eventMs = new Date(params.eventAt).getTime();
    // An unparseable eventAt yields NaN; never let that leak into a pending
    // slot or a stored duration.
    const hasValidTime = Number.isFinite(eventMs);
    if (!hasValidTime) {
      this.logger.warn(`[HISTORY] Unparseable eventAt "${params.eventAt}" for ${agentKey} ${params.eventType} — duration pairing skipped`);
    }

    // Pair START → END events by category. CALL and ACW can overlap
    // (agent enters ACW before CALL_END arrives), so each lives in its
    // own slot. READY is a fallback close — supervisor.service already
    // maps 'release'/'IDLE' to RESUME / ACW_END when it knows the prior
    // state; READY only fires when that disambiguation failed, so it
    // clears anything dangling.
    let durationSec: number | null = null;
    const endSlot = this.slotForEnd(params.eventType);
    const startSlot = this.slotForStart(params.eventType);

    if (endSlot) {
      const pending = this.pendingStartsByAgent.get(agentKey);
      const startedAt = pending?.[endSlot];
      if (pending && startedAt !== undefined) {
        if (hasValidTime) {
          durationSec = Math.max(0, Math.round((eventMs - startedAt) / 1000));
        }
        // The END consumes its START even when no duration could be computed.
        delete pending[endSlot];
        if (Object.keys(pending).length === 0) {
          this.pendingStartsByAgent.delete(agentKey);
        }
      }
    } else if (startSlot) {
      if (hasValidTime) {
        const pending = this.pendingStartsByAgent.get(agentKey) ?? {};
        pending[startSlot] = eventMs;
        this.pendingStartsByAgent.set(agentKey, pending);
      }
    } else if (params.eventType === 'READY' || params.eventType === 'LOGOUT') {
      // Defensive flush of any lingering slots on session boundaries.
      this.pendingStartsByAgent.delete(agentKey);
    }

    const data: Record<string, unknown> = {
      // Top-level name is what the platform shows as the record title.
      name: `${params.ozonetelAgentId} ${params.eventType}`,
      eventType: params.eventType,
      eventAt: params.eventAt,
      source: 'OZONETEL_SUBSCRIPTION',
      agentId: agentUuid,
    };
    if (params.pauseReason) data.pauseReason = params.pauseReason;
    // Platform field is `durationS` (the `Sec` suffix is truncated).
    if (durationSec !== null) data.durationS = durationSec;
    if (params.callId) data.callId = params.callId;

    try {
      await this.platform.query<any>(
        `mutation($data: AgentEventCreateInput!) { createAgentEvent(data: $data) { id } }`,
        { data },
      );
    } catch (err: unknown) {
      if (this.isEntityMissingError(err)) {
        this.warnEntityMissingOnce('[HISTORY] AgentEvent entity not synced on this workspace — skipping persistence');
        return;
      }
      this.logger.warn(`[HISTORY] createAgentEvent failed: ${err}`);
    }
  }

  /** Logs the given "entity not synced" warning at most once per process. */
  private warnEntityMissingOnce(message: string): void {
    if (this.warnedEntityMissing) return;
    this.logger.warn(message);
    this.warnedEntityMissing = true;
  }

  /** Best-effort text for an unknown thrown value: its `message`, else its string form. */
  private errorText(err: unknown): string {
    const message = (err as { message?: unknown } | null | undefined)?.message;
    return String(message ?? err ?? '');
  }

  /**
   * Heuristic: does this error look like the AgentEvent / AgentSession
   * entities are absent from the workspace schema?
   *
   * NOTE(review): the match is on message substrings, so any error that
   * merely mentions one of the input type names is classified as
   * "entity missing" too.
   */
  private isEntityMissingError(err: unknown): boolean {
    const msg = this.errorText(err);
    return msg.includes('Cannot query field') || msg.includes('Unknown type')
      || msg.includes('AgentEventCreateInput') || msg.includes('AgentSessionCreateInput');
  }

  /** Slot opened by a START-type event, or null when the event opens none. */
  private slotForStart(eventType: AgentEventType): PendingSlot | null {
    switch (eventType) {
      case 'PAUSE':
        return 'pause';
      case 'CALL_START':
        return 'call';
      case 'ACW_START':
        return 'acw';
      default:
        return null;
    }
  }

  /** Slot closed by an END-type event, or null when the event closes none. */
  private slotForEnd(eventType: AgentEventType): PendingSlot | null {
    switch (eventType) {
      case 'RESUME':
        return 'pause';
      case 'CALL_END':
        return 'call';
      case 'ACW_END':
        return 'acw';
      default:
        return null;
    }
  }

  /**
   * Patch a Call record with SLA / timing fields derived from Ozonetel
   * webhooks or post-call CDR. All fields optional — caller passes only
   * what it has. Used for response-time and ACW histograms on the
   * supervisor dashboard.
   *
   * Does nothing when no field carries a value. Failures are logged and
   * swallowed.
   */
  async patchCallTiming(callId: string, fields: {
    assignedAt?: string;
    answeredAt?: string;
    responseTimeSec?: number;
    handlingTimeSec?: number;
    acwDurationSec?: number;
    holdDurationSec?: number;
  }): Promise<void> {
    const data: Record<string, unknown> = {};
    for (const [key, value] of Object.entries(fields)) {
      if (value !== undefined && value !== null) {
        // Rename `*Sec` keys to the platform's `*S`; other keys pass through.
        data[AgentHistoryService.CALL_FIELD_NAMES[key] ?? key] = value;
      }
    }
    if (Object.keys(data).length === 0) return;

    try {
      await this.platform.query<any>(
        `mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
        { id: callId, data },
      );
    } catch (err) {
      this.logger.warn(`[HISTORY] updateCall timing failed (${callId}): ${err}`);
    }
  }

  /**
   * Aggregate AgentEvent rows into an AgentSession row per agent for the
   * given IST date. Called on a 15-minute interval; upserts by (agent,
   * sessionDate) so re-runs are safe.
   *
   * A failure for one agent is logged and the loop continues; an
   * "entity not synced" error aborts the whole rollup.
   *
   * @param sessionDate IST calendar date, YYYY-MM-DD.
   */
  async rollupSessions(sessionDate: string): Promise<void> {
    if (this.agentUuidByOzonetelId.size === 0) await this.refreshAgentCache();
    const agentUuids = Array.from(new Set(this.agentUuidByOzonetelId.values()));
    if (agentUuids.length === 0) return;

    const startIso = `${sessionDate}T00:00:00+05:30`;
    // Millisecond precision so events in the final second of the day
    // (e.g. 23:59:59.500) are not dropped by the `lte` filter.
    const endIso = `${sessionDate}T23:59:59.999+05:30`;
    const dayEndMs = new Date(endIso).getTime();

    let succeeded = 0;
    for (const agentUuid of agentUuids) {
      try {
        const events = await this.fetchAgentEvents(agentUuid, startIso, endIso);
        const totals = this.aggregateEvents(events, dayEndMs);
        await this.upsertSession(agentUuid, sessionDate, totals);
        succeeded++;
      } catch (err: unknown) {
        if (this.isEntityMissingError(err)) {
          this.warnEntityMissingOnce('[HISTORY] AgentEvent/AgentSession entities not synced on this workspace — skipping rollup');
          return;
        }
        this.logger.warn(`[HISTORY] Rollup failed for agent ${agentUuid}: ${this.errorText(err)}`);
      }
    }
    this.logger.log(`[HISTORY] Rollup complete for ${sessionDate} — ${succeeded}/${agentUuids.length} agents`);
  }

  /**
   * Reads one agent's AgentEvent rows within [startIso, endIso], ordered by
   * eventAt ascending, following cursor pagination.
   *
   * Platform strips the `Sec` suffix on numeric field names — schema uses
   * `durationS`, `loginDurationS`, etc. Map back to our canonical names
   * when reading.
   */
  private async fetchAgentEvents(agentUuid: string, startIso: string, endIso: string): Promise<AgentEventRow[]> {
    const events: AgentEventRow[] = [];
    let after: string | null = null;

    for (let page = 0; page < AgentHistoryService.MAX_EVENT_PAGES; page++) {
      const cursorArg: string = after ? `, after: "${after}"` : '';
      const data: any = await this.platform.query<any>(
        `{ agentEvents(first: ${AgentHistoryService.EVENT_PAGE_SIZE}${cursorArg}, filter: { agentId: { eq: "${agentUuid}" }, eventAt: { gte: "${startIso}", lte: "${endIso}" } }, orderBy: [{ eventAt: AscNullsLast }]) {
          edges { node { eventType eventAt durationS } }
          pageInfo { hasNextPage endCursor }
        } }`,
      );

      const edges: any[] = data?.agentEvents?.edges ?? [];
      for (const edge of edges) {
        events.push({
          eventType: edge.node.eventType,
          eventAt: edge.node.eventAt,
          durationSec: edge.node.durationS ?? null,
        });
      }

      const pageInfo: { hasNextPage?: boolean; endCursor?: string | null } = data?.agentEvents?.pageInfo ?? {};
      if (!pageInfo.hasNextPage) return events;
      if (!pageInfo.endCursor) {
        // Without a cursor the next request would re-read the first page and
        // duplicate every event, so stop here instead.
        this.logger.warn(`[HISTORY] agentEvents reported another page without an endCursor for agent ${agentUuid} — stopping pagination`);
        return events;
      }
      after = pageInfo.endCursor;
    }

    this.logger.warn(`[HISTORY] agentEvents page cap (${AgentHistoryService.MAX_EVENT_PAGES}) reached for agent ${agentUuid} — rollup totals may be incomplete`);
    return events;
  }

  /**
   * Folds a day's events (expected in ascending eventAt order) into session
   * totals.
   *
   * - busy / pause / wrap-up time: sums of durationSec on CALL_END / RESUME /
   *   ACW_END events; events with a missing or zero duration are ignored.
   * - average handling time: mean CALL_END duration, null without calls.
   * - idle time: login time minus busy, pause and wrap-up, floored at 0.
   *
   * @param dayEndMs epoch ms of the end of the rollup day. A login that is
   *        still open is counted up to the earlier of now and this instant.
   *        Defaults to no cap.
   */
  private aggregateEvents(events: AgentEventRow[], dayEndMs: number = Number.POSITIVE_INFINITY): SessionTotals {
    let busyTimeSec = 0;
    let pauseTimeSec = 0;
    let wrapupTimeSec = 0;
    let handlingSum = 0;
    let handlingCount = 0;

    // Login duration: sum each LOGIN → (next LOGOUT on same day | now) span.
    // Ozonetel doesn't emit a LOGOUT if the agent just closes the tab, so
    // cap open sessions at the end of the rollup day.
    let loginDurationSec = 0;
    let openLoginAt: number | null = null;

    for (const event of events) {
      switch (event.eventType) {
        case 'LOGIN':
          openLoginAt = new Date(event.eventAt).getTime();
          break;
        case 'LOGOUT':
          if (openLoginAt !== null) {
            const logoutAt = new Date(event.eventAt).getTime();
            loginDurationSec += Math.max(0, Math.round((logoutAt - openLoginAt) / 1000));
            openLoginAt = null;
          }
          break;
        case 'CALL_END':
          if (event.durationSec) {
            busyTimeSec += event.durationSec;
            handlingSum += event.durationSec;
            handlingCount++;
          }
          break;
        case 'RESUME':
          if (event.durationSec) pauseTimeSec += event.durationSec;
          break;
        case 'ACW_END':
          if (event.durationSec) wrapupTimeSec += event.durationSec;
          break;
        default:
          break;
      }
    }

    if (openLoginAt !== null) {
      // Still logged in — count up to now, capped to the rollup day end so
      // a rollup of a past date does not keep growing.
      const now = Date.now();
      const openUntil = Number.isNaN(dayEndMs) ? now : Math.min(now, dayEndMs);
      loginDurationSec += Math.max(0, Math.round((openUntil - openLoginAt) / 1000));
    }

    const avgHandlingTimeSec = handlingCount > 0 ? Math.round(handlingSum / handlingCount) : null;
    const idleTimeSec = Math.max(0, loginDurationSec - busyTimeSec - pauseTimeSec - wrapupTimeSec);

    return { loginDurationSec, busyTimeSec, pauseTimeSec, wrapupTimeSec, idleTimeSec, avgHandlingTimeSec };
  }

  /**
   * Creates or updates the AgentSession row for (agent, date): looks up an
   * existing row first and updates it when found, otherwise creates one.
   *
   * AgentSession fields map: our `*Sec` → platform `*S`, `sessionDate` → `date`.
   * Errors from the platform propagate to the caller.
   */
  private async upsertSession(
    agentUuid: string,
    sessionDate: string,
    totals: { loginDurationSec: number; busyTimeSec: number; pauseTimeSec: number; wrapupTimeSec: number; idleTimeSec: number; avgHandlingTimeSec: number | null },
  ): Promise<void> {
    const existing = await this.platform.query<any>(
      `{ agentSessions(first: 1, filter: { agentId: { eq: "${agentUuid}" }, date: { eq: "${sessionDate}" } }) { edges { node { id } } } }`,
    );
    const existingId = existing?.agentSessions?.edges?.[0]?.node?.id;

    const data: Record<string, unknown> = {
      loginDurationS: totals.loginDurationSec,
      busyTimeS: totals.busyTimeSec,
      pauseTimeS: totals.pauseTimeSec,
      wrapupTimeS: totals.wrapupTimeSec,
      idleTimeS: totals.idleTimeSec,
      source: 'COMPUTED',
      lastSyncedAt: new Date().toISOString(),
    };
    // Sent only when at least one call was handled.
    if (totals.avgHandlingTimeSec !== null) data.avgHandlingTimeS = totals.avgHandlingTimeSec;

    if (existingId) {
      await this.platform.query<any>(
        `mutation($id: UUID!, $data: AgentSessionUpdateInput!) { updateAgentSession(id: $id, data: $data) { id } }`,
        { id: existingId, data },
      );
      return;
    }

    // Identity fields (name, agent, date) are only sent on create.
    await this.platform.query<any>(
      `mutation($data: AgentSessionCreateInput!) { createAgentSession(data: $data) { id } }`,
      { data: { ...data, name: `Session ${sessionDate}`, agentId: agentUuid, date: sessionDate } },
    );
  }
}
|