mirror of
https://dev.azure.com/globalhealthx/EMR/_git/helix-engage-server
synced 2026-05-18 20:08:19 +00:00
Compare commits
50 Commits
feature/om
...
1dd8413297
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dd8413297 | ||
|
|
7d8424b446 | ||
|
|
55b8680923 | ||
|
|
973614749b | ||
| 6adb3985cb | |||
| 67c41f4783 | |||
| d459d6469a | |||
| 60d2329dd8 | |||
| f375e7736c | |||
| 96977e84a1 | |||
| 00303df95b | |||
| 34e053204f | |||
| 98f5bc0347 | |||
| 048545317d | |||
| 8dcfa5a72f | |||
| 5b40f49b65 | |||
| fb616d47ee | |||
| 6fd17acf78 | |||
| 846c5f4c9b | |||
| 9472f83cd8 | |||
| 6de1989536 | |||
| 2acba59963 | |||
| 4eb8cb80b2 | |||
| fbe782b5ac | |||
| b6b597fdda | |||
| a4ff052fef | |||
| 5969441868 | |||
| 01348123e6 | |||
| d97d73dd1a | |||
| 7b178f9dc7 | |||
| 3d790e51dc | |||
| 1c3e42ad7c | |||
| ea60787da0 | |||
| c23792496b | |||
| 27a3fbcfed | |||
| 0f5bd7d61a | |||
| f1313f0e2f | |||
| 44f1ec36e1 | |||
| 4bd08a9b02 | |||
| 0248c4cad1 | |||
| be505b8d1f | |||
| dbefa9675a | |||
| 9dc02e107a | |||
| c807cf737f | |||
| 96d0c32000 | |||
| 9665500b63 | |||
| 9f5935e417 | |||
| 898ff65951 | |||
| 7717536622 | |||
| 33dc8b5669 |
24
.woodpecker.yml
Normal file
24
.woodpecker.yml
Normal file
@@ -0,0 +1,24 @@
|
||||
# Woodpecker CI pipeline for Helix Engage Server (sidecar)
|
||||
|
||||
when:
|
||||
- event: [push, manual]
|
||||
|
||||
steps:
|
||||
unit-tests:
|
||||
image: node:20
|
||||
commands:
|
||||
- npm ci
|
||||
- npm test -- --ci --forceExit
|
||||
|
||||
notify-teams:
|
||||
image: curlimages/curl
|
||||
environment:
|
||||
TEAMS_WEBHOOK:
|
||||
from_secret: teams_webhook
|
||||
commands:
|
||||
- >
|
||||
curl -s -X POST "$TEAMS_WEBHOOK"
|
||||
-H "Content-Type:application/json"
|
||||
-d '{"type":"message","attachments":[{"contentType":"application/vnd.microsoft.card.adaptive","content":{"type":"AdaptiveCard","version":"1.4","body":[{"type":"TextBlock","size":"Medium","weight":"Bolder","text":"Helix Engage Server — Build #'"$CI_PIPELINE_NUMBER"'"},{"type":"TextBlock","text":"Branch: '"$CI_COMMIT_BRANCH"'","wrap":true},{"type":"TextBlock","text":"'"$(echo $CI_COMMIT_MESSAGE | head -c 80)"'","wrap":true}],"actions":[{"type":"Action.OpenUrl","title":"View Pipeline","url":"https://operations.healix360.net/repos/2/pipeline/'"$CI_PIPELINE_NUMBER"'"}]}}]}'
|
||||
when:
|
||||
- status: [success, failure]
|
||||
@@ -5,6 +5,7 @@ import { generateText, streamText, tool, stepCountIs } from 'ai';
|
||||
import type { LanguageModel } from 'ai';
|
||||
import { z } from 'zod';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { CallerResolutionService } from '../caller/caller-resolution.service';
|
||||
import { createAiModel, isAiConfigured } from './ai-provider';
|
||||
import { AiConfigService } from '../config/ai-config.service';
|
||||
import { DOCTOR_VISIT_SLOTS_FRAGMENT, normalizeDoctors } from '../shared/doctor-utils';
|
||||
@@ -26,6 +27,7 @@ export class AiChatController {
|
||||
private config: ConfigService,
|
||||
private platform: PlatformGraphqlService,
|
||||
private aiConfig: AiConfigService,
|
||||
private caller: CallerResolutionService,
|
||||
) {
|
||||
const cfg = aiConfig.getConfig();
|
||||
this.aiModel = createAiModel({
|
||||
@@ -271,7 +273,7 @@ export class AiChatController {
|
||||
inputSchema: z.object({}),
|
||||
execute: async () => {
|
||||
const data = await platformService.queryWithAuth<any>(
|
||||
`{ calls(first: 100, filter: { callStatus: { eq: MISSED }, callbackstatus: { eq: PENDING_CALLBACK } }) { edges { node { id callerNumber { primaryPhoneNumber } startedAt agentName sla } } } }`,
|
||||
`{ calls(first: 100, filter: { callStatus: { eq: MISSED }, callbackStatus: { eq: PENDING_CALLBACK } }) { edges { node { id callerNumber { primaryPhoneNumber } startedAt agentName sla } } } }`,
|
||||
undefined, auth,
|
||||
);
|
||||
const breached = data.calls.edges
|
||||
@@ -380,17 +382,18 @@ export class AiChatController {
|
||||
}),
|
||||
|
||||
book_appointment: tool({
|
||||
description: 'Book an appointment for a patient. Collect patient name, phone, department, doctor, preferred date/time, and reason before calling this.',
|
||||
description: 'Book an appointment for a patient. Collect patient name, phone, department, doctor, clinic/branch, preferred date/time, and reason before calling this.',
|
||||
inputSchema: z.object({
|
||||
patientName: z.string().describe('Full name of the patient'),
|
||||
phoneNumber: z.string().describe('Patient phone number'),
|
||||
department: z.string().describe('Department for the appointment'),
|
||||
doctorName: z.string().describe('Doctor name'),
|
||||
clinicId: z.string().optional().describe('Clinic/branch ID — get from lookup_doctor results'),
|
||||
scheduledAt: z.string().describe('Date and time in ISO format (e.g. 2026-04-01T10:00:00)'),
|
||||
reason: z.string().describe('Reason for visit'),
|
||||
}),
|
||||
execute: async ({ patientName, phoneNumber, department, doctorName, scheduledAt, reason }) => {
|
||||
this.logger.log(`[TOOL] book_appointment: ${patientName} | ${phoneNumber} | ${department} | ${doctorName} | ${scheduledAt}`);
|
||||
execute: async ({ patientName, phoneNumber, department, doctorName, clinicId, scheduledAt, reason }) => {
|
||||
this.logger.log(`[TOOL] book_appointment: ${patientName} | ${phoneNumber} | ${department} | ${doctorName} | clinic=${clinicId ?? 'none'} | ${scheduledAt}`);
|
||||
try {
|
||||
const result = await platformService.queryWithAuth<any>(
|
||||
`mutation($data: AppointmentCreateInput!) { createAppointment(data: $data) { id } }`,
|
||||
@@ -402,6 +405,7 @@ export class AiChatController {
|
||||
doctorName,
|
||||
department,
|
||||
reasonForVisit: reason,
|
||||
...(clinicId ? { clinicId } : {}),
|
||||
},
|
||||
},
|
||||
auth,
|
||||
@@ -429,16 +433,60 @@ export class AiChatController {
|
||||
this.logger.log(`[TOOL] create_lead: ${name} | ${phoneNumber} | ${interest}`);
|
||||
try {
|
||||
const cleanPhone = phoneNumber.replace(/[^0-9]/g, '').slice(-10);
|
||||
const result = await platformService.queryWithAuth<any>(
|
||||
`mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`,
|
||||
const resolved = await this.caller.resolve(cleanPhone, auth);
|
||||
const firstName = name.split(' ')[0];
|
||||
const lastName = name.split(' ').slice(1).join(' ') || '';
|
||||
|
||||
if (resolved.isNew) {
|
||||
// Net-new caller — create Patient + Lead with
|
||||
// the AI-collected name from the conversation.
|
||||
let patientId: string | undefined;
|
||||
try {
|
||||
const p = await platformService.queryWithAuth<any>(
|
||||
`mutation($data: PatientCreateInput!) { createPatient(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
fullName: { firstName, lastName },
|
||||
phones: { primaryPhoneNumber: `+91${cleanPhone}` },
|
||||
patientType: 'NEW',
|
||||
},
|
||||
},
|
||||
auth,
|
||||
);
|
||||
patientId = p?.createPatient?.id;
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[TOOL] create_lead patient create failed: ${err.message}`);
|
||||
}
|
||||
const created = await platformService.queryWithAuth<any>(
|
||||
`mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `AI Enquiry — ${name}`,
|
||||
contactName: { firstName, lastName },
|
||||
contactPhone: { primaryPhoneNumber: `+91${cleanPhone}` },
|
||||
source: 'PHONE',
|
||||
status: 'NEW',
|
||||
interestedService: interest,
|
||||
...(patientId ? { patientId } : {}),
|
||||
},
|
||||
},
|
||||
auth,
|
||||
);
|
||||
const id = created?.createLead?.id;
|
||||
if (id) {
|
||||
return { created: true, leadId: id, message: `Lead created for ${name}. Our team will follow up on ${phoneNumber}.` };
|
||||
}
|
||||
return { created: false, message: 'Lead creation failed.' };
|
||||
}
|
||||
|
||||
// Existing record — update with AI-collected name.
|
||||
await platformService.queryWithAuth<any>(
|
||||
`mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`,
|
||||
{
|
||||
id: resolved.leadId,
|
||||
data: {
|
||||
name: `AI Enquiry — ${name}`,
|
||||
contactName: {
|
||||
firstName: name.split(' ')[0],
|
||||
lastName: name.split(' ').slice(1).join(' ') || '',
|
||||
},
|
||||
contactPhone: { primaryPhoneNumber: `+91${cleanPhone}` },
|
||||
contactName: { firstName, lastName },
|
||||
source: 'PHONE',
|
||||
status: 'NEW',
|
||||
interestedService: interest,
|
||||
@@ -446,11 +494,14 @@ export class AiChatController {
|
||||
},
|
||||
auth,
|
||||
);
|
||||
const id = result?.createLead?.id;
|
||||
if (id) {
|
||||
return { created: true, leadId: id, message: `Lead created for ${name}. Our team will follow up on ${phoneNumber}.` };
|
||||
if (resolved.patientId) {
|
||||
await platformService.queryWithAuth<any>(
|
||||
`mutation($id: UUID!, $data: PatientUpdateInput!) { updatePatient(id: $id, data: $data) { id } }`,
|
||||
{ id: resolved.patientId, data: { fullName: { firstName, lastName } } },
|
||||
auth,
|
||||
).catch(() => {});
|
||||
}
|
||||
return { created: false, message: 'Lead creation failed.' };
|
||||
return { created: true, leadId: resolved.leadId, message: `Lead updated for ${name}. Our team will follow up on ${phoneNumber}.` };
|
||||
} catch (err: any) {
|
||||
this.logger.error(`[TOOL] create_lead failed: ${err.message}`);
|
||||
return { created: false, message: `Failed: ${err.message}` };
|
||||
@@ -516,16 +567,23 @@ export class AiChatController {
|
||||
`{ clinics(first: 20) { edges { node {
|
||||
id name clinicName
|
||||
addressCustom { addressStreet1 addressCity addressState addressPostcode }
|
||||
weekdayHours saturdayHours sundayHours
|
||||
openMonday openTuesday openWednesday openThursday openFriday openSaturday openSunday
|
||||
opensAt closesAt
|
||||
status walkInAllowed onlineBooking
|
||||
cancellationWindowHours arriveEarlyMin requiredDocuments
|
||||
cancellationWindowHours arriveEarlyMin
|
||||
acceptsCash acceptsCard acceptsUpi
|
||||
requiredDocuments { edges { node { documentType notes } } }
|
||||
} } } }`,
|
||||
undefined, auth,
|
||||
);
|
||||
const clinics = clinicData.clinics.edges.map((e: any) => e.node);
|
||||
if (clinics.length) {
|
||||
sections.push('## CLINICS & TIMINGS');
|
||||
const dayFlags: Array<[string, string]> = [
|
||||
['Mon', 'openMonday'], ['Tue', 'openTuesday'], ['Wed', 'openWednesday'],
|
||||
['Thu', 'openThursday'], ['Fri', 'openFriday'],
|
||||
['Sat', 'openSaturday'], ['Sun', 'openSunday'],
|
||||
];
|
||||
for (const c of clinics) {
|
||||
const name = c.clinicName ?? c.name;
|
||||
const addr = c.addressCustom
|
||||
@@ -533,9 +591,15 @@ export class AiChatController {
|
||||
: '';
|
||||
sections.push(`### ${name}`);
|
||||
if (addr) sections.push(` Address: ${addr}`);
|
||||
if (c.weekdayHours) sections.push(` Mon–Fri: ${c.weekdayHours}`);
|
||||
if (c.saturdayHours) sections.push(` Saturday: ${c.saturdayHours}`);
|
||||
sections.push(` Sunday: ${c.sundayHours ?? 'Closed'}`);
|
||||
const openDays = dayFlags.filter(([, flag]) => c[flag]).map(([label]) => label);
|
||||
if (openDays.length) {
|
||||
const hours = c.opensAt && c.closesAt ? ` ${c.opensAt}–${c.closesAt}` : '';
|
||||
sections.push(` Open: ${openDays.join(', ')}${hours}`);
|
||||
}
|
||||
const closedDays = dayFlags.filter(([, flag]) => !c[flag]).map(([label]) => label);
|
||||
if (closedDays.length) {
|
||||
sections.push(` Closed: ${closedDays.join(', ')}`);
|
||||
}
|
||||
if (c.walkInAllowed) sections.push(` Walk-ins: Accepted`);
|
||||
}
|
||||
|
||||
@@ -543,7 +607,8 @@ export class AiChatController {
|
||||
const rules: string[] = [];
|
||||
if (rulesClinic.cancellationWindowHours) rules.push(`Free cancellation up to ${rulesClinic.cancellationWindowHours}h before`);
|
||||
if (rulesClinic.arriveEarlyMin) rules.push(`Arrive ${rulesClinic.arriveEarlyMin}min early`);
|
||||
if (rulesClinic.requiredDocuments) rules.push(`First-time patients bring ${rulesClinic.requiredDocuments}`);
|
||||
const docs = rulesClinic.requiredDocuments?.edges?.map((e: any) => e.node?.documentType).filter(Boolean) ?? [];
|
||||
if (docs.length) rules.push(`First-time patients bring: ${docs.join(', ')}`);
|
||||
if (rulesClinic.walkInAllowed) rules.push('Walk-ins accepted');
|
||||
if (rulesClinic.onlineBooking) rules.push('Online booking available');
|
||||
if (rules.length) {
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { Module, forwardRef } from '@nestjs/common';
|
||||
import { PlatformModule } from '../platform/platform.module';
|
||||
import { AiEnrichmentService } from './ai-enrichment.service';
|
||||
import { AiChatController } from './ai-chat.controller';
|
||||
import { CallerResolutionModule } from '../caller/caller-resolution.module';
|
||||
|
||||
@Module({
|
||||
imports: [PlatformModule],
|
||||
imports: [PlatformModule, forwardRef(() => CallerResolutionModule)],
|
||||
controllers: [AiChatController],
|
||||
providers: [AiEnrichmentService],
|
||||
exports: [AiEnrichmentService],
|
||||
|
||||
@@ -21,6 +21,9 @@ import { RulesEngineModule } from './rules-engine/rules-engine.module';
|
||||
import { ConfigThemeModule } from './config/config-theme.module';
|
||||
import { WidgetModule } from './widget/widget.module';
|
||||
import { TeamModule } from './team/team.module';
|
||||
import { MasterdataModule } from './masterdata/masterdata.module';
|
||||
import { LeadsModule } from './leads/leads.module';
|
||||
import { TelephonyRegistrationService } from './telephony-registration.service';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@@ -48,6 +51,9 @@ import { TeamModule } from './team/team.module';
|
||||
ConfigThemeModule,
|
||||
WidgetModule,
|
||||
TeamModule,
|
||||
MasterdataModule,
|
||||
LeadsModule,
|
||||
],
|
||||
providers: [TelephonyRegistrationService],
|
||||
})
|
||||
export class AppModule {}
|
||||
|
||||
@@ -107,11 +107,9 @@ export class AuthController {
|
||||
|
||||
// Determine app role from platform roles
|
||||
let appRole = 'executive'; // default
|
||||
if (roleLabels.includes('HelixEngage Manager')) {
|
||||
if (roleLabels.includes('HelixEngage Manager') || roleLabels.includes('HelixEngage Supervisor')) {
|
||||
appRole = 'admin';
|
||||
} else if (roleLabels.includes('HelixEngage User')) {
|
||||
// Distinguish CC agent from executive by email convention or config
|
||||
// For now, emails containing 'cc' map to cc-agent
|
||||
const email = workspaceMember?.userEmail ?? body.email;
|
||||
appRole = email.includes('cc') ? 'cc-agent' : 'executive';
|
||||
}
|
||||
|
||||
@@ -55,6 +55,26 @@ export class SessionService {
|
||||
await this.redis.del(this.key(agentId));
|
||||
}
|
||||
|
||||
// Enumerate every active session lock so the maint UI can show which
|
||||
// agentIds are currently held (and by whom) vs free. Uses SCAN, not
|
||||
// KEYS, to avoid blocking Redis on workspaces with many keys.
|
||||
async listLockedSessions(): Promise<Array<{ agentId: string; memberId: string; ip: string; lockedAt: string }>> {
|
||||
const out: Array<{ agentId: string; memberId: string; ip: string; lockedAt: string }> = [];
|
||||
const stream = this.redis.scanStream({ match: 'agent:session:*', count: 100 });
|
||||
const keys: string[] = [];
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
stream.on('data', (chunk: string[]) => keys.push(...chunk));
|
||||
stream.on('end', resolve);
|
||||
stream.on('error', reject);
|
||||
});
|
||||
for (const key of keys) {
|
||||
const agentId = key.slice('agent:session:'.length);
|
||||
const session = await this.getSession(agentId);
|
||||
if (session) out.push({ agentId, ...session });
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
// Generic cache operations for any module
|
||||
async getCache(key: string): Promise<string | null> {
|
||||
return this.redis.get(key);
|
||||
|
||||
@@ -99,17 +99,9 @@ export class LeadEnrichController {
|
||||
);
|
||||
}
|
||||
|
||||
// 5. Invalidate the caller cache so the next incoming call from
|
||||
// this phone number does a fresh platform lookup (and picks
|
||||
// up the corrected identity + new summary).
|
||||
if (body?.phone) {
|
||||
try {
|
||||
await this.callerResolution.invalidate(body.phone);
|
||||
this.logger.log(`[LEAD-ENRICH] Caller cache invalidated for ${body.phone}`);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[LEAD-ENRICH] Failed to invalidate caller cache: ${err}`);
|
||||
}
|
||||
}
|
||||
// Caller resolution no longer caches — every resolve() hits the
|
||||
// platform fresh via an indexed phone filter. No invalidation
|
||||
// needed after enrichment.
|
||||
|
||||
this.logger.log(`[LEAD-ENRICH] Lead ${leadId} enriched successfully`);
|
||||
|
||||
|
||||
@@ -23,14 +23,4 @@ export class CallerResolutionController {
|
||||
const result = await this.resolution.resolve(phone, auth);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Post('invalidate')
|
||||
async invalidate(@Body('phone') phone: string) {
|
||||
if (!phone) {
|
||||
throw new HttpException('phone is required', HttpStatus.BAD_REQUEST);
|
||||
}
|
||||
this.logger.log(`[RESOLVE] Invalidating cache for: ${phone}`);
|
||||
await this.resolution.invalidate(phone);
|
||||
return { status: 'ok' };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { Module, forwardRef } from '@nestjs/common';
|
||||
import { PlatformModule } from '../platform/platform.module';
|
||||
import { AuthModule } from '../auth/auth.module';
|
||||
import { CallerResolutionController } from './caller-resolution.controller';
|
||||
import { CallerResolutionService } from './caller-resolution.service';
|
||||
|
||||
@Module({
|
||||
imports: [PlatformModule, AuthModule],
|
||||
imports: [PlatformModule, forwardRef(() => AuthModule)],
|
||||
controllers: [CallerResolutionController],
|
||||
providers: [CallerResolutionService],
|
||||
exports: [CallerResolutionService],
|
||||
|
||||
@@ -1,9 +1,5 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { SessionService } from '../auth/session.service';
|
||||
|
||||
const CACHE_TTL = 3600; // 1 hour
|
||||
const CACHE_PREFIX = 'caller:';
|
||||
|
||||
export type ResolvedCaller = {
|
||||
leadId: string;
|
||||
@@ -11,7 +7,7 @@ export type ResolvedCaller = {
|
||||
firstName: string;
|
||||
lastName: string;
|
||||
phone: string;
|
||||
isNew: boolean; // true if we just created the lead+patient pair
|
||||
isNew: boolean; // true if no Lead/Patient exists for this phone
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
@@ -20,28 +16,24 @@ export class CallerResolutionService {
|
||||
|
||||
constructor(
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
private readonly cache: SessionService,
|
||||
) {}
|
||||
|
||||
// Resolve a caller by phone number. Always returns a paired lead + patient.
|
||||
// Resolve a caller by phone number via indexed platform queries. No
|
||||
// cache — every call hits the DB fresh. Cache was previously used to
|
||||
// compensate for client-side `leads(first: 200)` scans, but we now
|
||||
// filter by phone directly which is O(log n) with the DB index.
|
||||
// Cost: ~2 fast queries per resolve; eventual-consistency window = 0.
|
||||
async resolve(phone: string, auth: string): Promise<ResolvedCaller> {
|
||||
const normalized = phone.replace(/\D/g, '').slice(-10);
|
||||
if (normalized.length < 10) {
|
||||
throw new Error(`Invalid phone number: ${phone}`);
|
||||
}
|
||||
|
||||
// 1. Check cache
|
||||
const cached = await this.cache.getCache(`${CACHE_PREFIX}${normalized}`);
|
||||
if (cached) {
|
||||
this.logger.log(`[RESOLVE] Cache hit for ${normalized}`);
|
||||
return JSON.parse(cached);
|
||||
}
|
||||
|
||||
// 2. Look up lead by phone
|
||||
const lead = await this.findLeadByPhone(normalized, auth);
|
||||
|
||||
// 3. Look up patient by phone
|
||||
const patient = await this.findPatientByPhone(normalized, auth);
|
||||
// Lookup lead + patient by phone, in parallel.
|
||||
const [lead, patient] = await Promise.all([
|
||||
this.findLeadByPhone(normalized, auth),
|
||||
this.findPatientByPhone(normalized, auth),
|
||||
]);
|
||||
|
||||
let result: ResolvedCaller;
|
||||
|
||||
@@ -51,6 +43,11 @@ export class CallerResolutionService {
|
||||
await this.linkLeadToPatient(lead.id, patient.id, auth);
|
||||
this.logger.log(`[RESOLVE] Linked existing lead ${lead.id} → patient ${patient.id}`);
|
||||
}
|
||||
// PRD: "Returning patient (Y/N) will be taken care of by the system"
|
||||
// Patient is recognized on a subsequent contact → mark as RETURNING
|
||||
if (patient.patientType === 'NEW') {
|
||||
this.upgradeToReturning(patient.id, auth);
|
||||
}
|
||||
result = {
|
||||
leadId: lead.id,
|
||||
patientId: patient.id,
|
||||
@@ -76,6 +73,9 @@ export class CallerResolutionService {
|
||||
// Patient exists, no lead — create lead
|
||||
const newLead = await this.createLead(patient.firstName, patient.lastName, normalized, patient.id, auth);
|
||||
this.logger.log(`[RESOLVE] Created lead ${newLead.id} for existing patient ${patient.id}`);
|
||||
if (patient.patientType === 'NEW') {
|
||||
this.upgradeToReturning(patient.id, auth);
|
||||
}
|
||||
result = {
|
||||
leadId: newLead.id,
|
||||
patientId: patient.id,
|
||||
@@ -85,13 +85,18 @@ export class CallerResolutionService {
|
||||
isNew: false,
|
||||
};
|
||||
} else {
|
||||
// Neither exists — create both
|
||||
const newPatient = await this.createPatient('', '', normalized, auth);
|
||||
const newLead = await this.createLead('', '', normalized, newPatient.id, auth);
|
||||
this.logger.log(`[RESOLVE] Created new lead ${newLead.id} + patient ${newPatient.id} for ${normalized}`);
|
||||
// Neither exists — return empty IDs with isNew=true. Caller
|
||||
// code is responsible for creating records with the real name
|
||||
// they've collected (enquiry form, appointment form, widget,
|
||||
// AI tools). This avoids the "Unknown" placeholder cascade:
|
||||
// no Lead/Patient is ever written unless we have a real name
|
||||
// to attach to it. Missed-call / poller paths that have no
|
||||
// name persist the Call record with leadName=phone as the
|
||||
// honest snapshot.
|
||||
this.logger.log(`[RESOLVE] No existing records for ${normalized} — returning isNew=true`);
|
||||
result = {
|
||||
leadId: newLead.id,
|
||||
patientId: newPatient.id,
|
||||
leadId: '',
|
||||
patientId: '',
|
||||
firstName: '',
|
||||
lastName: '',
|
||||
phone: normalized,
|
||||
@@ -99,43 +104,30 @@ export class CallerResolutionService {
|
||||
};
|
||||
}
|
||||
|
||||
// 4. Cache the result
|
||||
await this.cache.setCache(`${CACHE_PREFIX}${normalized}`, JSON.stringify(result), CACHE_TTL);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Invalidate cache for a phone number (call after updates)
|
||||
async invalidate(phone: string): Promise<void> {
|
||||
const normalized = phone.replace(/\D/g, '').slice(-10);
|
||||
await this.cache.setCache(`${CACHE_PREFIX}${normalized}`, '', 1); // expire immediately
|
||||
}
|
||||
|
||||
// Indexed lookup — platform filters by phone server-side. Matches on
|
||||
// the last 10 digits regardless of stored format (+919XXXX / 91XXXX /
|
||||
// XXXX / +91-XXXX), via the `like: "%XXXXXXXXXX"` predicate.
|
||||
private async findLeadByPhone(phone10: string, auth: string): Promise<{ id: string; firstName: string; lastName: string; patientId: string | null } | null> {
|
||||
try {
|
||||
const data = await this.platform.queryWithAuth<{ leads: { edges: { node: any }[] } }>(
|
||||
`{ leads(first: 200) { edges { node {
|
||||
`{ leads(first: 1, filter: { contactPhone: { primaryPhoneNumber: { like: "%${phone10}" } } }) { edges { node {
|
||||
id
|
||||
contactName { firstName lastName }
|
||||
contactPhone { primaryPhoneNumber }
|
||||
patientId
|
||||
} } } }`,
|
||||
undefined,
|
||||
auth,
|
||||
);
|
||||
|
||||
const match = data.leads.edges.find(e => {
|
||||
const num = (e.node.contactPhone?.primaryPhoneNumber ?? '').replace(/\D/g, '').slice(-10);
|
||||
return num.length >= 10 && num === phone10;
|
||||
});
|
||||
|
||||
const match = data.leads.edges[0]?.node;
|
||||
if (!match) return null;
|
||||
|
||||
return {
|
||||
id: match.node.id,
|
||||
firstName: match.node.contactName?.firstName ?? '',
|
||||
lastName: match.node.contactName?.lastName ?? '',
|
||||
patientId: match.node.patientId || null,
|
||||
id: match.id,
|
||||
firstName: match.contactName?.firstName ?? '',
|
||||
lastName: match.contactName?.lastName ?? '',
|
||||
patientId: match.patientId || null,
|
||||
};
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[RESOLVE] Lead lookup failed: ${err.message}`);
|
||||
@@ -143,29 +135,24 @@ export class CallerResolutionService {
|
||||
}
|
||||
}
|
||||
|
||||
private async findPatientByPhone(phone10: string, auth: string): Promise<{ id: string; firstName: string; lastName: string } | null> {
|
||||
private async findPatientByPhone(phone10: string, auth: string): Promise<{ id: string; firstName: string; lastName: string; patientType: string | null } | null> {
|
||||
try {
|
||||
const data = await this.platform.queryWithAuth<{ patients: { edges: { node: any }[] } }>(
|
||||
`{ patients(first: 200) { edges { node {
|
||||
`{ patients(first: 1, filter: { phones: { primaryPhoneNumber: { like: "%${phone10}" } } }) { edges { node {
|
||||
id
|
||||
fullName { firstName lastName }
|
||||
phones { primaryPhoneNumber }
|
||||
patientType
|
||||
} } } }`,
|
||||
undefined,
|
||||
auth,
|
||||
);
|
||||
|
||||
const match = data.patients.edges.find(e => {
|
||||
const num = (e.node.phones?.primaryPhoneNumber ?? '').replace(/\D/g, '').slice(-10);
|
||||
return num.length >= 10 && num === phone10;
|
||||
});
|
||||
|
||||
const match = data.patients.edges[0]?.node;
|
||||
if (!match) return null;
|
||||
|
||||
return {
|
||||
id: match.node.id,
|
||||
firstName: match.node.fullName?.firstName ?? '',
|
||||
lastName: match.node.fullName?.lastName ?? '',
|
||||
id: match.id,
|
||||
firstName: match.fullName?.firstName ?? '',
|
||||
lastName: match.fullName?.lastName ?? '',
|
||||
patientType: match.patientType ?? null,
|
||||
};
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[RESOLVE] Patient lookup failed: ${err.message}`);
|
||||
@@ -178,6 +165,7 @@ export class CallerResolutionService {
|
||||
`mutation($data: PatientCreateInput!) { createPatient(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `${firstName || 'Unknown'} ${lastName || ''}`.trim(),
|
||||
fullName: { firstName: firstName || 'Unknown', lastName: lastName || '' },
|
||||
phones: { primaryPhoneNumber: `+91${phone}` },
|
||||
patientType: 'NEW',
|
||||
@@ -206,6 +194,19 @@ export class CallerResolutionService {
|
||||
return data.createLead;
|
||||
}
|
||||
|
||||
private upgradeToReturning(patientId: string, auth: string): void {
|
||||
// Fire-and-forget — don't block caller resolution
|
||||
this.platform.queryWithAuth<any>(
|
||||
`mutation($id: UUID!, $data: PatientUpdateInput!) { updatePatient(id: $id, data: $data) { id } }`,
|
||||
{ id: patientId, data: { patientType: 'RETURNING' } },
|
||||
auth,
|
||||
).then(() => {
|
||||
this.logger.log(`[RESOLVE] Upgraded patient ${patientId} to RETURNING`);
|
||||
}).catch(err => {
|
||||
this.logger.warn(`[RESOLVE] Failed to upgrade patient type: ${err.message}`);
|
||||
});
|
||||
}
|
||||
|
||||
private async linkLeadToPatient(leadId: string, patientId: string, auth: string): Promise<void> {
|
||||
await this.platform.queryWithAuth<any>(
|
||||
`mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`,
|
||||
|
||||
@@ -22,6 +22,16 @@ export default () => ({
|
||||
missedQueue: {
|
||||
pollIntervalMs: parseInt(process.env.MISSED_QUEUE_POLL_INTERVAL_MS ?? '30000', 10),
|
||||
},
|
||||
worklist: {
|
||||
// Per-page fetch size from the platform GraphQL endpoint. Tuned to
|
||||
// balance response size vs. page count. Platform's Relay pagination
|
||||
// typically caps at 100–200 per page.
|
||||
pageSize: parseInt(process.env.WORKLIST_PAGE_SIZE ?? '50', 10),
|
||||
// Hard ceiling on pages fetched per poll. Safety valve against
|
||||
// unbounded cost when a tenant has thousands of pending callbacks.
|
||||
// maxPages * pageSize = effective worklist size.
|
||||
maxPages: parseInt(process.env.WORKLIST_MAX_PAGES ?? '10', 10),
|
||||
},
|
||||
ai: {
|
||||
provider: process.env.AI_PROVIDER ?? 'openai',
|
||||
anthropicApiKey: process.env.ANTHROPIC_API_KEY ?? '',
|
||||
|
||||
@@ -53,4 +53,20 @@ export class SetupStateController {
|
||||
const updated = this.setupState.resetState();
|
||||
return { ...updated, wizardRequired: this.setupState.isWizardRequired() };
|
||||
}
|
||||
|
||||
// UI-level flags the frontend reads at app boot to tailor which admin
|
||||
// surfaces are available. Driven by sidecar env vars so each workspace
|
||||
// can be configured independently without touching the frontend build.
|
||||
//
|
||||
// setupManaged=true means "the product team handles setup for this
|
||||
// workspace" — hide the Settings nav, routes, and the resume-setup
|
||||
// banner. The wizard + setup-state APIs stay functional for ops use
|
||||
// (a support engineer can still PUT /steps/:step or hit the routes
|
||||
// directly); only the end-user admin UI is hidden.
|
||||
@Get('ui-flags')
|
||||
uiFlags() {
|
||||
return {
|
||||
setupManaged: process.env.HELIX_SETUP_MANAGED === 'true',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,6 +46,7 @@ export class TelephonyConfigService implements OnModuleInit {
|
||||
ozonetel: {
|
||||
...c.ozonetel,
|
||||
agentPassword: c.ozonetel.agentPassword ? '***masked***' : '',
|
||||
adminPassword: c.ozonetel.adminPassword ? '***masked***' : '',
|
||||
},
|
||||
};
|
||||
}
|
||||
@@ -68,6 +69,9 @@ export class TelephonyConfigService implements OnModuleInit {
|
||||
if (merged.ozonetel.agentPassword === '***masked***') {
|
||||
merged.ozonetel.agentPassword = current.ozonetel.agentPassword;
|
||||
}
|
||||
if (merged.ozonetel.adminPassword === '***masked***') {
|
||||
merged.ozonetel.adminPassword = current.ozonetel.adminPassword;
|
||||
}
|
||||
this.backup();
|
||||
this.writeFile(merged);
|
||||
this.cached = merged;
|
||||
|
||||
@@ -22,6 +22,11 @@ export type TelephonyConfig = {
|
||||
sipId: string;
|
||||
// Default outbound campaign name on Ozonetel CloudAgent.
|
||||
campaignName: string;
|
||||
// Ozonetel portal admin credentials — used by supervisor barge/whisper/listen.
|
||||
// These are the login credentials for the Ozonetel admin dashboard
|
||||
// (api.cloudagent.ozonetel.com/auth/login), NOT an agent ID.
|
||||
adminUsername: string;
|
||||
adminPassword: string;
|
||||
};
|
||||
// Ozonetel WebRTC gateway used by the staff portal softphone.
|
||||
sip: {
|
||||
@@ -46,6 +51,8 @@ export const DEFAULT_TELEPHONY_CONFIG: TelephonyConfig = {
|
||||
did: '',
|
||||
sipId: '',
|
||||
campaignName: '',
|
||||
adminUsername: '',
|
||||
adminPassword: '',
|
||||
},
|
||||
sip: {
|
||||
domain: 'blr-pub-rtc4.ozonetel.com',
|
||||
@@ -62,8 +69,11 @@ export const DEFAULT_TELEPHONY_CONFIG: TelephonyConfig = {
|
||||
// Field-by-field mapping from legacy env var names to config paths. Used by
|
||||
// the first-boot seeder. Keep in sync with the migration target sites.
|
||||
export const TELEPHONY_ENV_SEEDS: Array<{ env: string; path: string[] }> = [
|
||||
{ env: 'OZONETEL_AGENT_ID', path: ['ozonetel', 'agentId'] },
|
||||
// OZONETEL_AGENT_ID removed — agentId is per-user on the Agent entity,
|
||||
// not a sidecar-level config. All endpoints require agentId from caller.
|
||||
{ env: 'OZONETEL_AGENT_PASSWORD', path: ['ozonetel', 'agentPassword'] },
|
||||
{ env: 'OZONETEL_ADMIN_USERNAME', path: ['ozonetel', 'adminUsername'] },
|
||||
{ env: 'OZONETEL_ADMIN_PASSWORD', path: ['ozonetel', 'adminPassword'] },
|
||||
{ env: 'OZONETEL_DID', path: ['ozonetel', 'did'] },
|
||||
{ env: 'OZONETEL_SIP_ID', path: ['ozonetel', 'sipId'] },
|
||||
{ env: 'OZONETEL_CAMPAIGN_NAME', path: ['ozonetel', 'campaignName'] },
|
||||
|
||||
182
src/leads/lead-auto-assign.service.ts
Normal file
182
src/leads/lead-auto-assign.service.ts
Normal file
@@ -0,0 +1,182 @@
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { SupervisorService } from '../supervisor/supervisor.service';
|
||||
|
||||
const TICK_INTERVAL_MS = 60 * 1000; // 60s
|
||||
const KICKOFF_DELAY_MS = 45_000; // let sidecar boot settle
|
||||
const MAX_LEADS_PER_TICK = 100; // guard against runaway batches
|
||||
const ACTIVE_STATES = new Set(['ready', 'calling', 'in-call', 'acw']);
|
||||
// Excluded: 'offline' (agent logged out), 'break' / 'training' (explicitly away).
|
||||
// ACW is included — the agent is still handling work and will return to Ready soon.
|
||||
|
||||
/**
|
||||
* Polls for unassigned leads every 60s and assigns them least-loaded across
|
||||
* active agents.
|
||||
*
|
||||
* Why polling instead of platform functions or Redpanda events:
|
||||
* - The platform's lead.created hook isn't wired to the sidecar (no bridge)
|
||||
* - The SDK's lead-auto-assign.function.ts is written but hasn't been
|
||||
* deployed/published to either workspace
|
||||
* - Polling catches EVERY lead creation path (CSV import, enquiry form,
|
||||
* missed-call webhook, widget, livekit) with no per-path instrumentation
|
||||
*
|
||||
* Assignment strategy:
|
||||
* - Count each active agent's OPEN leads (status in NEW/CONTACTED/QUALIFIED)
|
||||
* - Pick the agent with the lowest count — ties broken by platform ordering
|
||||
* - Write agent.name (display name) to lead.assignedAgent (worklist filter matches on this)
|
||||
*
|
||||
* Edge cases:
|
||||
* - No active agents → skip tick; next run retries
|
||||
* - agentName empty → skip agent
|
||||
* - Mutation errors → log, continue with next lead
|
||||
*/
|
||||
@Injectable()
|
||||
export class LeadAutoAssignService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(LeadAutoAssignService.name);
|
||||
private timer: NodeJS.Timeout | null = null;
|
||||
private running = false;
|
||||
|
||||
constructor(
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
private readonly supervisor: SupervisorService,
|
||||
) {}
|
||||
|
||||
onModuleInit() {
|
||||
setTimeout(() => {
|
||||
this.runOnce().catch((err) => this.logger.warn(`[AUTO-ASSIGN] Kickoff failed: ${err?.message ?? err}`));
|
||||
}, KICKOFF_DELAY_MS);
|
||||
|
||||
this.timer = setInterval(() => {
|
||||
this.runOnce().catch((err) => this.logger.warn(`[AUTO-ASSIGN] Tick failed: ${err?.message ?? err}`));
|
||||
}, TICK_INTERVAL_MS);
|
||||
}
|
||||
|
||||
onModuleDestroy() {
|
||||
if (this.timer) clearInterval(this.timer);
|
||||
}
|
||||
|
||||
async runOnce(): Promise<{ assigned: number; skipped: number; noAgents: boolean }> {
|
||||
// Guard against concurrent runs (prev tick hasn't finished).
|
||||
if (this.running) return { assigned: 0, skipped: 0, noAgents: false };
|
||||
this.running = true;
|
||||
try {
|
||||
const unassigned = await this.fetchUnassignedLeads();
|
||||
if (unassigned.length === 0) return { assigned: 0, skipped: 0, noAgents: false };
|
||||
|
||||
const active = await this.fetchActiveAgents();
|
||||
if (active.length === 0) {
|
||||
this.logger.debug(`[AUTO-ASSIGN] ${unassigned.length} leads waiting — no active agents`);
|
||||
return { assigned: 0, skipped: unassigned.length, noAgents: true };
|
||||
}
|
||||
|
||||
// Seed current-load map: lead count per agent across their OPEN leads.
|
||||
// Fetch once per tick (not per lead) — the map is updated locally as we assign.
|
||||
const loadByAgent = await this.fetchOpenLeadCounts(active.map((a) => a.name));
|
||||
|
||||
let assigned = 0;
|
||||
let skipped = 0;
|
||||
|
||||
for (const lead of unassigned) {
|
||||
// Pick the least-loaded active agent.
|
||||
const target = [...active].sort(
|
||||
(a, b) => (loadByAgent.get(a.name) ?? 0) - (loadByAgent.get(b.name) ?? 0),
|
||||
)[0];
|
||||
if (!target?.name) { skipped++; continue; }
|
||||
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`,
|
||||
{ id: lead.id, data: { assignedAgent: target.name } },
|
||||
);
|
||||
assigned++;
|
||||
loadByAgent.set(target.name, (loadByAgent.get(target.name) ?? 0) + 1);
|
||||
await new Promise((r) => setTimeout(r, 40)); // gentle pacing
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[AUTO-ASSIGN] updateLead failed for ${lead.id}: ${err?.message ?? err}`);
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
|
||||
if (assigned > 0 || skipped > 0) {
|
||||
const loadSummary = active.map((a) => `${a.name}=${loadByAgent.get(a.name) ?? 0}`).join(', ');
|
||||
this.logger.log(`[AUTO-ASSIGN] Pass complete — assigned=${assigned} skipped=${skipped} load=[${loadSummary}]`);
|
||||
}
|
||||
return { assigned, skipped, noAgents: false };
|
||||
} finally {
|
||||
this.running = false;
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchUnassignedLeads(): Promise<Array<{ id: string; campaignId: string | null }>> {
|
||||
try {
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ leads(first: ${MAX_LEADS_PER_TICK}, filter: {
|
||||
or: [
|
||||
{ assignedAgent: { eq: "" } },
|
||||
{ assignedAgent: { is: NULL } }
|
||||
]
|
||||
}, orderBy: [{ createdAt: AscNullsLast }]) {
|
||||
edges { node { id campaignId } }
|
||||
} }`,
|
||||
);
|
||||
return (data?.leads?.edges ?? []).map((e: any) => e.node);
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[AUTO-ASSIGN] fetch unassigned failed: ${err?.message ?? err}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchActiveAgents(): Promise<Array<{ id: string; name: string; ozonetelAgentId: string }>> {
|
||||
try {
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ agents(first: 100) { edges { node { id name ozonetelAgentId } } } }`,
|
||||
);
|
||||
const all: Array<{ id: string; name: string; ozonetelAgentId: string }> =
|
||||
(data?.agents?.edges ?? []).map((e: any) => e.node);
|
||||
// Filter to agents whose in-memory state (from Ozonetel webhooks) is active.
|
||||
// If state is unknown (never seen a state event), treat as offline.
|
||||
return all.filter((a) => {
|
||||
if (!a.name || !a.ozonetelAgentId) return false;
|
||||
const entry = this.supervisor.getAgentState(a.ozonetelAgentId);
|
||||
return entry ? ACTIVE_STATES.has(entry.state) : false;
|
||||
});
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[AUTO-ASSIGN] fetch agents failed: ${err?.message ?? err}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchOpenLeadCounts(agentNames: string[]): Promise<Map<string, number>> {
|
||||
const map = new Map<string, number>();
|
||||
for (const name of agentNames) map.set(name, 0);
|
||||
if (agentNames.length === 0) return map;
|
||||
|
||||
// Single aggregated query — pull ALL open leads with assignedAgent set,
|
||||
// count by agent locally. Avoids N+1 over agents.
|
||||
try {
|
||||
let after: string | null = null;
|
||||
for (let page = 0; page < 20; page++) {
|
||||
const cursor: string = after ? `, after: "${after}"` : '';
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ leads(first: 200${cursor}, filter: {
|
||||
status: { in: [NEW, CONTACTED, QUALIFIED] }
|
||||
}) {
|
||||
edges { node { assignedAgent } }
|
||||
pageInfo { hasNextPage endCursor }
|
||||
} }`,
|
||||
);
|
||||
const edges = data?.leads?.edges ?? [];
|
||||
for (const e of edges) {
|
||||
const name = e.node.assignedAgent;
|
||||
if (name && map.has(name)) map.set(name, (map.get(name) ?? 0) + 1);
|
||||
}
|
||||
const info: { hasNextPage?: boolean; endCursor?: string } = data?.leads?.pageInfo ?? {};
|
||||
if (!info.hasNextPage) break;
|
||||
after = info.endCursor ?? null;
|
||||
}
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[AUTO-ASSIGN] fetch open-lead counts failed: ${err?.message ?? err}`);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
}
|
||||
11
src/leads/leads.module.ts
Normal file
11
src/leads/leads.module.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { Module, forwardRef } from '@nestjs/common';
|
||||
import { PlatformModule } from '../platform/platform.module';
|
||||
import { SupervisorModule } from '../supervisor/supervisor.module';
|
||||
import { LeadAutoAssignService } from './lead-auto-assign.service';
|
||||
|
||||
@Module({
|
||||
imports: [PlatformModule, forwardRef(() => SupervisorModule)],
|
||||
providers: [LeadAutoAssignService],
|
||||
exports: [LeadAutoAssignService],
|
||||
})
|
||||
export class LeadsModule {}
|
||||
@@ -27,6 +27,27 @@ async function gql<T = any>(query: string, variables?: Record<string, unknown>):
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve a phone to a {leadId, patientId} pair via the sidecar's
|
||||
// caller-resolution endpoint. Always returns populated IDs (creates
|
||||
// placeholder lead+patient when none exist).
|
||||
async function resolveCaller(phone: string): Promise<{ leadId: string; patientId: string; firstName: string; lastName: string; isNew: boolean } | null> {
|
||||
try {
|
||||
const res = await fetch(`${SIDECAR_URL}/api/caller/resolve`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ phone }),
|
||||
});
|
||||
if (!res.ok) {
|
||||
console.error('[AGENT-RESOLVE] Failed:', res.status, await res.text().catch(() => ''));
|
||||
return null;
|
||||
}
|
||||
return await res.json();
|
||||
} catch (err) {
|
||||
console.error('[AGENT-RESOLVE] Failed:', err);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// Hospital context — loaded on startup
|
||||
let hospitalContext = {
|
||||
doctors: [] as Array<{ name: string; department: string; specialty: string; id: string }>,
|
||||
@@ -128,28 +149,58 @@ const bookAppointment = llm.tool({
|
||||
doctorName: doctor?.name ?? doctorName ?? 'To be assigned',
|
||||
department,
|
||||
reasonForVisit: reason,
|
||||
...((doctor as any)?.clinicId ? { clinicId: (doctor as any).clinicId } : {}),
|
||||
},
|
||||
},
|
||||
);
|
||||
|
||||
// Create or find lead
|
||||
// Resolve caller — if isNew, create Lead + Patient with the
|
||||
// AI-collected name; otherwise update the existing record.
|
||||
const cleanPhone = phoneNumber.replace(/[^0-9]/g, '').slice(-10);
|
||||
await gql(
|
||||
`mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `AI — ${patientName}`,
|
||||
contactName: {
|
||||
firstName: patientName.split(' ')[0],
|
||||
lastName: patientName.split(' ').slice(1).join(' ') || '',
|
||||
const resolved = await resolveCaller(cleanPhone);
|
||||
const fn = patientName.split(' ')[0];
|
||||
const ln = patientName.split(' ').slice(1).join(' ') || '';
|
||||
if (resolved?.isNew) {
|
||||
const p = await gql(
|
||||
`mutation($data: PatientCreateInput!) { createPatient(data: $data) { id } }`,
|
||||
{ data: { fullName: { firstName: fn, lastName: ln }, phones: { primaryPhoneNumber: `+91${cleanPhone}` }, patientType: 'NEW' } },
|
||||
);
|
||||
const newPatientId = p?.createPatient?.id;
|
||||
await gql(
|
||||
`mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `AI — ${patientName}`,
|
||||
contactName: { firstName: fn, lastName: ln },
|
||||
contactPhone: { primaryPhoneNumber: `+91${cleanPhone}` },
|
||||
source: 'PHONE',
|
||||
status: 'APPOINTMENT_SET',
|
||||
interestedService: department,
|
||||
...(newPatientId ? { patientId: newPatientId } : {}),
|
||||
},
|
||||
contactPhone: { primaryPhoneNumber: `+91${cleanPhone}` },
|
||||
source: 'PHONE',
|
||||
status: 'APPOINTMENT_SET',
|
||||
interestedService: department,
|
||||
},
|
||||
},
|
||||
);
|
||||
);
|
||||
} else if (resolved?.leadId) {
|
||||
await gql(
|
||||
`mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`,
|
||||
{
|
||||
id: resolved.leadId,
|
||||
data: {
|
||||
name: `AI — ${patientName}`,
|
||||
contactName: { firstName: fn, lastName: ln },
|
||||
source: 'PHONE',
|
||||
status: 'APPOINTMENT_SET',
|
||||
interestedService: department,
|
||||
},
|
||||
},
|
||||
);
|
||||
if (resolved.patientId) {
|
||||
await gql(
|
||||
`mutation($id: UUID!, $data: PatientUpdateInput!) { updatePatient(id: $id, data: $data) { id } }`,
|
||||
{ id: resolved.patientId, data: { fullName: { firstName: fn, lastName: ln } } },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const refNum = `GH-${Date.now().toString().slice(-6)}`;
|
||||
if (result?.createAppointment?.id) {
|
||||
@@ -171,25 +222,53 @@ const collectLeadInfo = llm.tool({
|
||||
console.log(`[LIVEKIT-AGENT] Lead: ${name} | ${phoneNumber} | ${interest}`);
|
||||
|
||||
const cleanPhone = phoneNumber.replace(/[^0-9]/g, '').slice(-10);
|
||||
const result = await gql(
|
||||
`mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `AI Enquiry — ${name}`,
|
||||
contactName: {
|
||||
firstName: name.split(' ')[0],
|
||||
lastName: name.split(' ').slice(1).join(' ') || '',
|
||||
},
|
||||
contactPhone: { primaryPhoneNumber: `+91${cleanPhone}` },
|
||||
source: 'PHONE',
|
||||
status: 'NEW',
|
||||
interestedService: interest,
|
||||
},
|
||||
},
|
||||
);
|
||||
const resolved = await resolveCaller(cleanPhone);
|
||||
const fn = name.split(' ')[0];
|
||||
const ln = name.split(' ').slice(1).join(' ') || '';
|
||||
|
||||
if (result?.createLead?.id) {
|
||||
console.log(`[LIVEKIT-AGENT] Lead created: ${result.createLead.id}`);
|
||||
if (resolved?.isNew) {
|
||||
// Net-new caller — create Patient + Lead with the AI-collected name.
|
||||
const p = await gql(
|
||||
`mutation($data: PatientCreateInput!) { createPatient(data: $data) { id } }`,
|
||||
{ data: { fullName: { firstName: fn, lastName: ln }, phones: { primaryPhoneNumber: `+91${cleanPhone}` }, patientType: 'NEW' } },
|
||||
);
|
||||
const newPatientId = p?.createPatient?.id;
|
||||
const created = await gql(
|
||||
`mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `AI Enquiry — ${name}`,
|
||||
contactName: { firstName: fn, lastName: ln },
|
||||
contactPhone: { primaryPhoneNumber: `+91${cleanPhone}` },
|
||||
source: 'PHONE',
|
||||
status: 'NEW',
|
||||
interestedService: interest,
|
||||
...(newPatientId ? { patientId: newPatientId } : {}),
|
||||
},
|
||||
},
|
||||
);
|
||||
console.log(`[LIVEKIT-AGENT] Lead created: ${created?.createLead?.id ?? 'none'} (patient ${newPatientId ?? 'none'})`);
|
||||
} else if (resolved?.leadId) {
|
||||
await gql(
|
||||
`mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`,
|
||||
{
|
||||
id: resolved.leadId,
|
||||
data: {
|
||||
name: `AI Enquiry — ${name}`,
|
||||
contactName: { firstName: fn, lastName: ln },
|
||||
source: 'PHONE',
|
||||
status: 'NEW',
|
||||
interestedService: interest,
|
||||
},
|
||||
},
|
||||
);
|
||||
if (resolved.patientId) {
|
||||
await gql(
|
||||
`mutation($id: UUID!, $data: PatientUpdateInput!) { updatePatient(id: $id, data: $data) { id } }`,
|
||||
{ id: resolved.patientId, data: { fullName: { firstName: fn, lastName: ln } } },
|
||||
);
|
||||
}
|
||||
console.log(`[LIVEKIT-AGENT] Lead updated: ${resolved.leadId} (patient ${resolved.patientId})`);
|
||||
}
|
||||
return `Thank you ${name}. I have noted your enquiry about ${interest}. One of our team members will call you back on ${phoneNumber} shortly.`;
|
||||
},
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
import { Controller, Post, UseGuards, Logger } from '@nestjs/common';
|
||||
import { Body, Controller, HttpException, Post, UseGuards, Logger } from '@nestjs/common';
|
||||
import { MaintGuard } from './maint.guard';
|
||||
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { SessionService } from '../auth/session.service';
|
||||
import { SupervisorService } from '../supervisor/supervisor.service';
|
||||
import { AgentHistoryService, AgentEventType } from '../supervisor/agent-history.service';
|
||||
import { CallerResolutionService } from '../caller/caller-resolution.service';
|
||||
import { TelephonyConfigService } from '../config/telephony-config.service';
|
||||
import { AgentLookupService } from '../platform/agent-lookup.service';
|
||||
import { CdrEnrichmentService } from '../ozonetel/cdr-enrichment.service';
|
||||
|
||||
@Controller('api/maint')
|
||||
@UseGuards(MaintGuard)
|
||||
@@ -19,14 +22,20 @@ export class MaintController {
|
||||
private readonly session: SessionService,
|
||||
private readonly supervisor: SupervisorService,
|
||||
private readonly callerResolution: CallerResolutionService,
|
||||
private readonly history: AgentHistoryService,
|
||||
private readonly agentLookup: AgentLookupService,
|
||||
private readonly cdrEnrichment: CdrEnrichmentService,
|
||||
) {}
|
||||
|
||||
@Post('force-ready')
|
||||
async forceReady() {
|
||||
async forceReady(@Body() body: { agentId: string }) {
|
||||
if (!body?.agentId) throw new HttpException('agentId required', 400);
|
||||
const agentId = body.agentId;
|
||||
const oz = this.telephony.getConfig().ozonetel;
|
||||
const agentId = oz.agentId || 'agent3';
|
||||
const password = oz.agentPassword || 'Test123$';
|
||||
const sipId = oz.sipId || '521814';
|
||||
const password = oz.agentPassword;
|
||||
if (!password) throw new HttpException('agent password not configured', 400);
|
||||
const sipId = oz.sipId;
|
||||
if (!sipId) throw new HttpException('SIP ID not configured', 400);
|
||||
|
||||
this.logger.log(`[MAINT] Force ready: agent=${agentId}`);
|
||||
|
||||
@@ -47,9 +56,63 @@ export class MaintController {
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the current per-agent session state — which ozonetelAgentIds
|
||||
// are currently locked (held by a member IP) and which are free. Used
|
||||
// by the maint OTP modal to render a picker so a supervisor can unlock
|
||||
// the right agent without knowing the id off the top of their head.
|
||||
// Read-only; OTP-guarded like the rest of /api/maint.
|
||||
@Post('session-status')
|
||||
async sessionStatus() {
|
||||
const data = await this.platform.query<any>(
|
||||
`{ agents(first: 100) { edges { node { id name ozonetelAgentId ozonetelDisplayName } } } }`,
|
||||
).catch(() => ({ agents: { edges: [] } }));
|
||||
|
||||
const allAgents = (data?.agents?.edges ?? []).map((e: any) => e.node).filter((a: any) => a.ozonetelAgentId);
|
||||
const sessions = await this.session.listLockedSessions();
|
||||
const sessionByAgent = new Map(sessions.map((s) => [s.agentId.toLowerCase(), s]));
|
||||
|
||||
const locked: Array<any> = [];
|
||||
const free: Array<any> = [];
|
||||
const seenAgentIds = new Set<string>();
|
||||
|
||||
for (const agent of allAgents) {
|
||||
const key = String(agent.ozonetelAgentId).toLowerCase();
|
||||
seenAgentIds.add(key);
|
||||
const session = sessionByAgent.get(key);
|
||||
const row = {
|
||||
agentId: agent.ozonetelAgentId,
|
||||
displayName: agent.name ?? agent.ozonetelDisplayName ?? agent.ozonetelAgentId,
|
||||
};
|
||||
if (session) {
|
||||
locked.push({ ...row, heldByIp: session.ip, lockedAt: session.lockedAt });
|
||||
} else {
|
||||
free.push(row);
|
||||
}
|
||||
}
|
||||
|
||||
// Surface orphan locks (Redis holds a session for an ozonetelAgentId
|
||||
// with no matching Agent entity). Rare but possible after SDK renames
|
||||
// or workspace resets — without surfacing them, the operator can't
|
||||
// clear the stale lock via the UI.
|
||||
for (const session of sessions) {
|
||||
const key = session.agentId.toLowerCase();
|
||||
if (!seenAgentIds.has(key)) {
|
||||
locked.push({
|
||||
agentId: session.agentId,
|
||||
displayName: `${session.agentId} (orphan — no Agent record)`,
|
||||
heldByIp: session.ip,
|
||||
lockedAt: session.lockedAt,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return { locked, free };
|
||||
}
|
||||
|
||||
@Post('unlock-agent')
|
||||
async unlockAgent() {
|
||||
const agentId = this.telephony.getConfig().ozonetel.agentId || 'agent3';
|
||||
async unlockAgent(@Body() body: { agentId: string }) {
|
||||
if (!body?.agentId) throw new HttpException('agentId required', 400);
|
||||
const agentId = body.agentId;
|
||||
this.logger.log(`[MAINT] Unlock agent session: ${agentId}`);
|
||||
|
||||
try {
|
||||
@@ -267,6 +330,7 @@ export class MaintController {
|
||||
`mutation($data: PatientCreateInput!) { createPatient(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `${firstName} ${lastName}`.trim(),
|
||||
fullName: { firstName, lastName },
|
||||
phones: { primaryPhoneNumber: `+91${phone}` },
|
||||
patientType: 'NEW',
|
||||
@@ -313,4 +377,586 @@ export class MaintController {
|
||||
this.logger.log(`[MAINT] Backfill complete: ${linked} linked, ${created} patients created, ${apptLinked} appointments linked, ${skipped} skipped`);
|
||||
return { status: 'ok', leads: { total: leads.length, linked, created, skipped }, appointments: { total: appointments.length, linked: apptLinked } };
|
||||
}
|
||||
|
||||
// Backfill Call records that lost their identity at ingest (missed-call
|
||||
// webhook / poller / dispose flow before the caller-resolution wiring).
|
||||
// Routes each phone through CallerResolutionService so the same code
|
||||
// path the live system uses also fixes historical rows. Idempotent —
|
||||
// safe to re-run; only patches calls that are currently missing
|
||||
// leadName / patientId / leadId.
|
||||
@Post('backfill-caller-resolution')
|
||||
async backfillCallerResolution() {
|
||||
this.logger.log('[MAINT] Backfill caller resolution — patching Calls + Leads via resolver');
|
||||
|
||||
const apiKey = process.env.PLATFORM_API_KEY ?? '';
|
||||
const auth = apiKey ? `Bearer ${apiKey}` : '';
|
||||
if (!auth) throw new HttpException('PLATFORM_API_KEY not configured', 500);
|
||||
|
||||
let callsScanned = 0;
|
||||
let callsPatched = 0;
|
||||
let callsSkipped = 0;
|
||||
let leadsResolved = 0;
|
||||
let resolveErrors = 0;
|
||||
|
||||
// Phone → resolved cache so multiple calls from the same number
|
||||
// only resolve once during this run.
|
||||
const resolvedByPhone = new Map<string, { leadId: string; patientId: string; firstName: string; lastName: string }>();
|
||||
|
||||
// Page through all calls in chunks of 200. We're after rows where
|
||||
// leadName is empty OR leadId is null OR patientId is missing.
|
||||
let cursor: string | null = null;
|
||||
let hasNext = true;
|
||||
while (hasNext) {
|
||||
const pageQuery = cursor
|
||||
? `{ calls(first: 200, after: "${cursor}") { edges { cursor node { id leadId leadName callerNumber { primaryPhoneNumber } } } pageInfo { hasNextPage endCursor } } }`
|
||||
: `{ calls(first: 200) { edges { cursor node { id leadId leadName callerNumber { primaryPhoneNumber } } } pageInfo { hasNextPage endCursor } } }`;
|
||||
let page: any;
|
||||
try {
|
||||
page = await this.platform.query<any>(pageQuery);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[MAINT] calls page query failed: ${err}`);
|
||||
break;
|
||||
}
|
||||
const edges = page?.calls?.edges ?? [];
|
||||
hasNext = page?.calls?.pageInfo?.hasNextPage ?? false;
|
||||
cursor = page?.calls?.pageInfo?.endCursor ?? null;
|
||||
|
||||
for (const edge of edges) {
|
||||
const call = edge.node;
|
||||
callsScanned++;
|
||||
|
||||
const phoneRaw = call.callerNumber?.primaryPhoneNumber ?? '';
|
||||
const phone10 = phoneRaw.replace(/\D/g, '').slice(-10);
|
||||
const needsName = !call.leadName || call.leadName === '';
|
||||
const needsLead = !call.leadId;
|
||||
|
||||
if (!phone10 || phone10.length < 10) { callsSkipped++; continue; }
|
||||
if (!needsName && !needsLead) { callsSkipped++; continue; }
|
||||
|
||||
let resolved = resolvedByPhone.get(phone10) ?? null;
|
||||
if (!resolved) {
|
||||
try {
|
||||
const r = await this.callerResolution.resolve(phone10, auth);
|
||||
resolved = {
|
||||
leadId: r.leadId,
|
||||
patientId: r.patientId,
|
||||
firstName: r.firstName,
|
||||
lastName: r.lastName,
|
||||
};
|
||||
resolvedByPhone.set(phone10, resolved);
|
||||
leadsResolved++;
|
||||
} catch (err) {
|
||||
this.logger.warn(`[MAINT] resolve failed for ${phone10}: ${err}`);
|
||||
resolveErrors++;
|
||||
callsSkipped++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
const fullName = `${resolved.firstName} ${resolved.lastName}`.trim();
|
||||
const updateParts: string[] = [];
|
||||
if (needsLead && resolved.leadId) updateParts.push(`leadId: "${resolved.leadId}"`);
|
||||
if (needsName && fullName) updateParts.push(`leadName: "${fullName.replace(/"/g, '\\"')}"`);
|
||||
if (updateParts.length === 0) { callsSkipped++; continue; }
|
||||
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation { updateCall(id: "${call.id}", data: { ${updateParts.join(', ')} }) { id } }`,
|
||||
);
|
||||
callsPatched++;
|
||||
} catch (err) {
|
||||
this.logger.warn(`[MAINT] updateCall failed for ${call.id}: ${err}`);
|
||||
callsSkipped++;
|
||||
}
|
||||
|
||||
// Throttle so the platform isn't hammered
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`[MAINT] Backfill caller resolution complete: scanned=${callsScanned} patched=${callsPatched} skipped=${callsSkipped} uniquePhones=${resolvedByPhone.size} leadsResolved=${leadsResolved} resolveErrors=${resolveErrors}`);
|
||||
return {
|
||||
status: 'ok',
|
||||
calls: { scanned: callsScanned, patched: callsPatched, skipped: callsSkipped },
|
||||
phones: { unique: resolvedByPhone.size, resolved: leadsResolved, errors: resolveErrors },
|
||||
};
|
||||
}
|
||||
|
||||
// Recompute durationS on existing AgentEvent rows using the per-category
|
||||
// pairing logic. Fixes rows written before the slot-split fix where
|
||||
// ACW_START clobbered CALL_START's pending entry. Also re-runs the
|
||||
// session rollup for each affected date. Idempotent — only updates rows
|
||||
// whose stored durationS differs from the recomputed value.
|
||||
//
|
||||
// POST /api/maint/backfill-agent-event-durations
|
||||
// body: { date?: "YYYY-MM-DD" | "all" } — default today IST
|
||||
@Post('backfill-agent-event-durations')
|
||||
async backfillAgentEventDurations(@Body() body: { date?: string }) {
|
||||
const target = body?.date ?? this.todayIst();
|
||||
this.logger.log(`[MAINT] Backfill AgentEvent durations — target=${target}`);
|
||||
|
||||
// Pull events for the range. If "all", no filter; otherwise scope to the IST day.
|
||||
let events = await this.fetchAgentEventsForBackfill(target);
|
||||
if (events.length === 0) {
|
||||
return { status: 'ok', scanned: 0, patched: 0, skipped: 0, dates: [] };
|
||||
}
|
||||
this.logger.log(`[MAINT] Fetched ${events.length} AgentEvent rows`);
|
||||
|
||||
// Group by agent, sort by eventAt ascending.
|
||||
const byAgent = new Map<string, typeof events>();
|
||||
for (const e of events) {
|
||||
const k = e.agentId;
|
||||
if (!k) continue;
|
||||
if (!byAgent.has(k)) byAgent.set(k, []);
|
||||
byAgent.get(k)!.push(e);
|
||||
}
|
||||
for (const list of byAgent.values()) {
|
||||
list.sort((a, b) => new Date(a.eventAt).getTime() - new Date(b.eventAt).getTime());
|
||||
}
|
||||
|
||||
// Per-category slot pairing, same logic as the live ingest.
|
||||
const slotForStart = (t: AgentEventType): 'pause' | 'call' | 'acw' | null =>
|
||||
t === 'PAUSE' ? 'pause' : t === 'CALL_START' ? 'call' : t === 'ACW_START' ? 'acw' : null;
|
||||
const slotForEnd = (t: AgentEventType): 'pause' | 'call' | 'acw' | null =>
|
||||
t === 'RESUME' ? 'pause' : t === 'CALL_END' ? 'call' : t === 'ACW_END' ? 'acw' : null;
|
||||
|
||||
let patched = 0;
|
||||
let skipped = 0;
|
||||
const affectedDates = new Set<string>();
|
||||
|
||||
for (const [agentId, agentEvents] of byAgent) {
|
||||
const pending: { pause?: number; call?: number; acw?: number } = {};
|
||||
for (const e of agentEvents) {
|
||||
const eventMs = new Date(e.eventAt).getTime();
|
||||
const endSlot = slotForEnd(e.eventType);
|
||||
const startSlot = slotForStart(e.eventType);
|
||||
|
||||
let computed: number | null = null;
|
||||
|
||||
if (endSlot) {
|
||||
const at = pending[endSlot];
|
||||
if (at !== undefined) {
|
||||
computed = Math.max(0, Math.round((eventMs - at) / 1000));
|
||||
delete pending[endSlot];
|
||||
}
|
||||
} else if (startSlot) {
|
||||
pending[startSlot] = eventMs;
|
||||
} else if (e.eventType === 'READY' || e.eventType === 'LOGOUT') {
|
||||
delete pending.pause;
|
||||
delete pending.call;
|
||||
delete pending.acw;
|
||||
}
|
||||
|
||||
// Only patch END events that now have a computed duration
|
||||
// different from what's stored.
|
||||
if (endSlot && computed !== null && computed !== (e.durationS ?? null)) {
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation { updateAgentEvent(id: "${e.id}", data: { durationS: ${computed} }) { id } }`,
|
||||
);
|
||||
patched++;
|
||||
const datePart = (e.eventAt ?? '').slice(0, 10);
|
||||
if (datePart) affectedDates.add(datePart);
|
||||
this.logger.log(`[MAINT] Patched AgentEvent ${e.id} ${e.eventType} agent=${agentId} ${e.durationS ?? 'null'}s → ${computed}s`);
|
||||
await new Promise((r) => setTimeout(r, 80));
|
||||
} catch (err) {
|
||||
this.logger.warn(`[MAINT] Patch failed for ${e.id}: ${err}`);
|
||||
skipped++;
|
||||
}
|
||||
} else {
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Re-run rollup for each affected date so AgentSession numbers update.
|
||||
const dates = Array.from(affectedDates);
|
||||
for (const d of dates) {
|
||||
try {
|
||||
await this.history.rollupSessions(d);
|
||||
this.logger.log(`[MAINT] Rollup re-run for ${d}`);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[MAINT] Rollup failed for ${d}: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`[MAINT] Backfill AgentEvent durations complete: scanned=${events.length} patched=${patched} skipped=${skipped} dates=${dates.join(',')}`);
|
||||
return { status: 'ok', scanned: events.length, patched, skipped, dates };
|
||||
}
|
||||
|
||||
private todayIst(): string {
|
||||
const ist = new Date(Date.now() + 5.5 * 60 * 60 * 1000);
|
||||
return ist.toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
private async fetchAgentEventsForBackfill(date: string): Promise<Array<{ id: string; eventType: AgentEventType; eventAt: string; durationS: number | null; agentId: string }>> {
|
||||
const events: Array<{ id: string; eventType: AgentEventType; eventAt: string; durationS: number | null; agentId: string }> = [];
|
||||
let after: string | null = null;
|
||||
const rangeFilter = date === 'all'
|
||||
? ''
|
||||
: `, filter: { eventAt: { gte: "${date}T00:00:00+05:30", lte: "${date}T23:59:59+05:30" } }`;
|
||||
|
||||
for (let page = 0; page < 50; page++) {
|
||||
const cursorArg: string = after ? `, after: "${after}"` : '';
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ agentEvents(first: 200${cursorArg}${rangeFilter}, orderBy: [{ eventAt: AscNullsLast }]) {
|
||||
edges { node { id eventType eventAt durationS agentId } }
|
||||
pageInfo { hasNextPage endCursor }
|
||||
} }`,
|
||||
);
|
||||
const edges = data?.agentEvents?.edges ?? [];
|
||||
for (const e of edges) events.push(e.node);
|
||||
const pageInfo: { hasNextPage?: boolean; endCursor?: string } = data?.agentEvents?.pageInfo ?? {};
|
||||
if (!pageInfo.hasNextPage) break;
|
||||
after = pageInfo.endCursor ?? null;
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
// Historical enrichment: runs the same CDR-enrichment loop the cron runs,
|
||||
// but kicks it off immediately and (optionally) widens the date window
|
||||
// beyond "today + yesterday" up to the CDR API's 15-day limit.
|
||||
//
|
||||
// POST /api/maint/enrich-call-agents
|
||||
// Headers: x-maint-otp: <OTP>
|
||||
// Body: { days?: number } — default 2 (matches the cron); max 15
|
||||
@Post('enrich-call-agents')
|
||||
async enrichCallAgents(@Body() body: { days?: number }) {
|
||||
const requestedDays = Math.max(1, Math.min(15, body?.days ?? 2));
|
||||
this.logger.log(`[MAINT] Enrich call agents — days=${requestedDays}`);
|
||||
|
||||
// Call the enrichment service once per date, respecting the 2-req/min
|
||||
// CDR rate limit. Each tick fetches one date's CDR (1 req) so we can
|
||||
// iterate up to 2 dates per minute — enforce a 35s gap between dates.
|
||||
const dates = this.recentDatesIst(requestedDays);
|
||||
let totalScanned = 0;
|
||||
let totalEnriched = 0;
|
||||
let totalSkipped = 0;
|
||||
|
||||
for (let i = 0; i < dates.length; i++) {
|
||||
const date = dates[i];
|
||||
try {
|
||||
const result = await this.enrichSingleDate(date);
|
||||
totalScanned += result.scanned;
|
||||
totalEnriched += result.enriched;
|
||||
totalSkipped += result.skipped;
|
||||
this.logger.log(`[MAINT] ${date} — scanned=${result.scanned} enriched=${result.enriched} skipped=${result.skipped}`);
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[MAINT] Enrich failed for ${date}: ${err?.message ?? err}`);
|
||||
}
|
||||
// Rate limiting: 35s between dates to stay under 2 req/min on CDR.
|
||||
if (i < dates.length - 1) await new Promise((r) => setTimeout(r, 35_000));
|
||||
}
|
||||
|
||||
this.logger.log(`[MAINT] Enrichment complete: scanned=${totalScanned} enriched=${totalEnriched} skipped=${totalSkipped} across ${dates.length} dates`);
|
||||
return { status: 'ok', scanned: totalScanned, enriched: totalEnriched, skipped: totalSkipped, dates };
|
||||
}
|
||||
|
||||
// Fallback backfill for historical Calls that pre-date UCID persistence.
|
||||
// Can't join to CDR without UCID, so parse the agentName string (which
|
||||
// may be a transfer chain "A -> B -> C"), take the final segment, and
|
||||
// resolve to an Agent entity by name or ozonetelAgentId (case-insensitive).
|
||||
//
|
||||
// POST /api/maint/backfill-call-agents-by-name
|
||||
// Headers: x-maint-otp: <OTP>
|
||||
// Body: {}
|
||||
@Post('backfill-call-agents-by-name')
|
||||
async backfillCallAgentsByName() {
|
||||
this.logger.log('[MAINT] Backfill call agents by name — matching agentName last-segment to Agent entity');
|
||||
|
||||
// Pull all active agents — cheap, cached at service level but we
|
||||
// also need name → UUID maps for this pass. Three indexes:
|
||||
// - ozonetelAgentId (e.g. "globalhealthx") — matches outbound dispose rows
|
||||
// - ozonetelDisplayName (e.g. "Ganesh Bandi") — matches inbound webhook rows
|
||||
// - platform Agent.name (e.g. "Ganesh Iyer") — last-resort fallback
|
||||
const agentData = await this.platform.query<any>(
|
||||
`{ agents(first: 100) { edges { node { id name ozonetelAgentId ozonetelDisplayName } } } }`,
|
||||
);
|
||||
const agentUuidByName = new Map<string, string>();
|
||||
const agentUuidByOzonetelId = new Map<string, string>();
|
||||
const agentUuidByDisplayName = new Map<string, string>();
|
||||
for (const edge of agentData?.agents?.edges ?? []) {
|
||||
const a = edge.node;
|
||||
if (a.name) agentUuidByName.set(a.name.toLowerCase().trim(), a.id);
|
||||
if (a.ozonetelAgentId) agentUuidByOzonetelId.set(a.ozonetelAgentId.toLowerCase().trim(), a.id);
|
||||
if (a.ozonetelDisplayName) agentUuidByDisplayName.set(a.ozonetelDisplayName.toLowerCase().trim(), a.id);
|
||||
}
|
||||
|
||||
let scanned = 0;
|
||||
let patched = 0;
|
||||
let skipped = 0;
|
||||
let unmatched = 0;
|
||||
const unmatchedSamples = new Set<string>();
|
||||
|
||||
// Paginate through all Calls with agentId=null and agentName set.
|
||||
let after: string | null = null;
|
||||
for (let page = 0; page < 50; page++) {
|
||||
const cursorArg: string = after ? `, after: "${after}"` : '';
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ calls(first: 200${cursorArg}, filter: {
|
||||
agentId: { is: NULL },
|
||||
agentName: { is: NOT_NULL }
|
||||
}) {
|
||||
edges { node { id agentName } }
|
||||
pageInfo { hasNextPage endCursor }
|
||||
} }`,
|
||||
).catch(() => ({ calls: { edges: [], pageInfo: {} } }));
|
||||
const edges = data?.calls?.edges ?? [];
|
||||
scanned += edges.length;
|
||||
|
||||
for (const edge of edges) {
|
||||
const call = edge.node;
|
||||
if (!call.agentName || call.agentName.trim() === '') { skipped++; continue; }
|
||||
|
||||
// Take the final hop of the transfer chain, trimmed.
|
||||
const segments = call.agentName.split('->').map((s: string) => s.trim()).filter(Boolean);
|
||||
const last = segments[segments.length - 1];
|
||||
if (!last) { skipped++; continue; }
|
||||
|
||||
// Prefer ozonetelAgentId match (outbound rows store
|
||||
// agentName=agentId); fall back to ozonetelDisplayName
|
||||
// (inbound webhook rows store the Ozonetel display string);
|
||||
// last-resort match on platform Agent.name.
|
||||
const key = last.toLowerCase();
|
||||
const uuid = agentUuidByOzonetelId.get(key)
|
||||
?? agentUuidByDisplayName.get(key)
|
||||
?? agentUuidByName.get(key);
|
||||
if (!uuid) {
|
||||
unmatched++;
|
||||
if (unmatchedSamples.size < 10) unmatchedSamples.add(last);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Store the raw chain on transferredTo if it was actually chained,
|
||||
// so the audit trail is preserved even without CDR data.
|
||||
const patchData: Record<string, any> = { agentId: uuid };
|
||||
if (segments.length > 1) patchData.transferredTo = call.agentName;
|
||||
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
|
||||
{ id: call.id, data: patchData },
|
||||
);
|
||||
patched++;
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
} catch (err) {
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
|
||||
const pageInfo = data?.calls?.pageInfo ?? {};
|
||||
if (!pageInfo.hasNextPage) break;
|
||||
after = pageInfo.endCursor ?? null;
|
||||
}
|
||||
|
||||
this.logger.log(`[MAINT] Backfill by name complete: scanned=${scanned} patched=${patched} unmatched=${unmatched} skipped=${skipped}`);
|
||||
return {
|
||||
status: 'ok',
|
||||
scanned,
|
||||
patched,
|
||||
unmatched,
|
||||
skipped,
|
||||
unmatchedSamples: Array.from(unmatchedSamples),
|
||||
};
|
||||
}
|
||||
|
||||
private async enrichSingleDate(date: string): Promise<{ scanned: number; enriched: number; skipped: number }> {
|
||||
// Reuse the cdr-enrichment path via its runOnce method, but scoped.
|
||||
// For simplicity we reimplement the single-date logic here so we can
|
||||
// parameterize the date without leaking CDR-enrichment internals.
|
||||
const cdrRows = await this.ozonetel.fetchCDR({ date });
|
||||
if (cdrRows.length === 0) return { scanned: 0, enriched: 0, skipped: 0 };
|
||||
|
||||
const byUcid = new Map<string, any>();
|
||||
for (const row of cdrRows) {
|
||||
const ucid = String(row.UCID ?? '').trim();
|
||||
if (ucid) byUcid.set(ucid, row);
|
||||
}
|
||||
|
||||
// Fetch calls missing agent link on this date
|
||||
const gte = `${date}T00:00:00+05:30`;
|
||||
const lte = `${date}T23:59:59+05:30`;
|
||||
const calls: Array<any> = [];
|
||||
let after: string | null = null;
|
||||
for (let page = 0; page < 30; page++) {
|
||||
const cursorArg: string = after ? `, after: "${after}"` : '';
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ calls(first: 200${cursorArg}, filter: {
|
||||
startedAt: { gte: "${gte}", lte: "${lte}" },
|
||||
ucid: { is: NOT_NULL },
|
||||
agentId: { is: NULL }
|
||||
}) {
|
||||
edges { node { id ucid agentId transferredTo transferType } }
|
||||
pageInfo { hasNextPage endCursor }
|
||||
} }`,
|
||||
).catch(() => ({ calls: { edges: [], pageInfo: {} } }));
|
||||
const edges = data?.calls?.edges ?? [];
|
||||
for (const e of edges) calls.push(e.node);
|
||||
const pageInfo = data?.calls?.pageInfo ?? {};
|
||||
if (!pageInfo.hasNextPage) break;
|
||||
after = pageInfo.endCursor ?? null;
|
||||
}
|
||||
|
||||
let enriched = 0;
|
||||
let skipped = 0;
|
||||
for (const call of calls) {
|
||||
const cdrRow = byUcid.get(String(call.ucid).trim());
|
||||
if (!cdrRow) { skipped++; continue; }
|
||||
const patch: Record<string, any> = {};
|
||||
if (cdrRow.AgentID && !call.agentId) {
|
||||
const uuid = await this.agentLookup.resolveByOzonetelId(cdrRow.AgentID);
|
||||
if (uuid) patch.agentId = uuid;
|
||||
if (cdrRow.AgentName) patch.agentName = cdrRow.AgentName;
|
||||
}
|
||||
if (cdrRow.TransferredTo && !call.transferredTo) patch.transferredTo = cdrRow.TransferredTo;
|
||||
if (cdrRow.TransferType && !call.transferType) patch.transferType = cdrRow.TransferType;
|
||||
|
||||
if (Object.keys(patch).length === 0) { skipped++; continue; }
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
|
||||
{ id: call.id, data: patch },
|
||||
);
|
||||
enriched++;
|
||||
await new Promise((r) => setTimeout(r, 80));
|
||||
} catch (err) {
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
return { scanned: calls.length, enriched, skipped };
|
||||
}
|
||||
|
||||
private recentDatesIst(n: number): string[] {
|
||||
const dates: string[] = [];
|
||||
for (let i = 0; i < n; i++) {
|
||||
const d = new Date(Date.now() + 5.5 * 60 * 60 * 1000 - i * 24 * 60 * 60 * 1000);
|
||||
dates.push(d.toISOString().slice(0, 10));
|
||||
}
|
||||
return dates;
|
||||
}
|
||||
|
||||
// Infer clinicId on historical Appointments that were written before
|
||||
// the clinicId-persistence fix went live. Lookup path:
|
||||
// Appointment.doctorId + Appointment.scheduledAt.dayOfWeek
|
||||
// → DoctorVisitSlot rows for that doctor on that weekday
|
||||
// → if single clinic → use it
|
||||
// → if multiple clinics → match by time-of-day window (slot covers scheduledAt time)
|
||||
// → if still ambiguous → match by department, else skip
|
||||
//
|
||||
// POST /api/maint/backfill-appointment-clinics
|
||||
// Headers: x-maint-otp: <OTP>
|
||||
@Post('backfill-appointment-clinics')
|
||||
async backfillAppointmentClinics() {
|
||||
this.logger.log('[MAINT] Backfill Appointment.clinicId — inferring from doctorVisitSlots');
|
||||
|
||||
// 1. Pull all appointments missing clinicId
|
||||
const appointments: Array<{ id: string; doctorId: string | null; scheduledAt: string | null; department: string | null }> = [];
|
||||
let after: string | null = null;
|
||||
for (let page = 0; page < 50; page++) {
|
||||
const cursor: string = after ? `, after: "${after}"` : '';
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ appointments(first: 200${cursor}, filter: { clinicId: { is: NULL } }) {
|
||||
edges { node { id doctorId scheduledAt department } }
|
||||
pageInfo { hasNextPage endCursor }
|
||||
} }`,
|
||||
).catch(() => ({ appointments: { edges: [], pageInfo: {} } }));
|
||||
const edges = data?.appointments?.edges ?? [];
|
||||
for (const e of edges) appointments.push(e.node);
|
||||
const info = data?.appointments?.pageInfo ?? {};
|
||||
if (!info.hasNextPage) break;
|
||||
after = info.endCursor ?? null;
|
||||
}
|
||||
this.logger.log(`[MAINT] Found ${appointments.length} appointments missing clinicId`);
|
||||
if (appointments.length === 0) {
|
||||
return { status: 'ok', scanned: 0, patched: 0, skipped: 0 };
|
||||
}
|
||||
|
||||
// 2. For each unique doctorId, pre-load visit slots (7 weekdays × clinic rows).
|
||||
const uniqueDoctorIds = [...new Set(appointments.map((a) => a.doctorId).filter(Boolean) as string[])];
|
||||
const slotsByDoctor = new Map<string, Array<{ dayOfWeek: string; startTime: string; endTime: string; clinicId: string; clinicName: string }>>();
|
||||
for (const docId of uniqueDoctorIds) {
|
||||
try {
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ doctorVisitSlots(first: 50, filter: { doctorId: { eq: "${docId}" } }) {
|
||||
edges { node { dayOfWeek startTime endTime clinic { id clinicName } } }
|
||||
} }`,
|
||||
);
|
||||
const rows = (data?.doctorVisitSlots?.edges ?? []).map((e: any) => ({
|
||||
dayOfWeek: e.node.dayOfWeek,
|
||||
startTime: e.node.startTime,
|
||||
endTime: e.node.endTime,
|
||||
clinicId: e.node.clinic?.id,
|
||||
clinicName: e.node.clinic?.clinicName ?? '',
|
||||
})).filter((r: any) => r.clinicId);
|
||||
slotsByDoctor.set(docId, rows);
|
||||
} catch {
|
||||
slotsByDoctor.set(docId, []);
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
}
|
||||
|
||||
// 3. Walk each appointment, infer the clinic, patch.
|
||||
let patched = 0;
|
||||
let skipped = 0;
|
||||
const skippedReasons: Record<string, number> = { noDoctor: 0, noScheduledAt: 0, noSlots: 0, ambiguous: 0 };
|
||||
|
||||
for (const appt of appointments) {
|
||||
if (!appt.doctorId) { skipped++; skippedReasons.noDoctor++; continue; }
|
||||
if (!appt.scheduledAt) { skipped++; skippedReasons.noScheduledAt++; continue; }
|
||||
|
||||
const slots = slotsByDoctor.get(appt.doctorId) ?? [];
|
||||
if (slots.length === 0) { skipped++; skippedReasons.noSlots++; continue; }
|
||||
|
||||
// Appointment time in IST
|
||||
const ist = new Date(new Date(appt.scheduledAt).getTime() + 5.5 * 60 * 60 * 1000);
|
||||
const dayOfWeek = ist.toLocaleDateString('en-US', { weekday: 'long', timeZone: 'UTC' }).toUpperCase();
|
||||
const apptMinutes = ist.getUTCHours() * 60 + ist.getUTCMinutes();
|
||||
|
||||
// Match slots for same weekday where the appointment time falls within the window
|
||||
const toMin = (hhmm: string): number => {
|
||||
const [h, m] = hhmm.split(':').map(Number);
|
||||
return h * 60 + (m ?? 0);
|
||||
};
|
||||
let candidates = slots.filter((s) => s.dayOfWeek === dayOfWeek);
|
||||
if (candidates.length > 0) {
|
||||
const inWindow = candidates.filter((s) => {
|
||||
const start = toMin(s.startTime ?? '00:00');
|
||||
const end = toMin(s.endTime ?? '23:59');
|
||||
return apptMinutes >= start && apptMinutes < end;
|
||||
});
|
||||
if (inWindow.length > 0) candidates = inWindow;
|
||||
}
|
||||
// Distinct clinics among candidates
|
||||
const distinctClinics = [...new Set(candidates.map((c) => c.clinicId))];
|
||||
let clinicId: string | null = null;
|
||||
if (distinctClinics.length === 1) {
|
||||
clinicId = distinctClinics[0];
|
||||
} else if (distinctClinics.length > 1) {
|
||||
// Ambiguous — doctor visits multiple clinics in this window.
|
||||
// Pick deterministically by clinic id lex-order so re-runs land
|
||||
// on the same choice. Log the ambiguity so QA can review.
|
||||
clinicId = [...distinctClinics].sort()[0];
|
||||
this.logger.debug(`[MAINT] Ambiguous clinic for appt=${appt.id} — doctor=${appt.doctorId} day=${dayOfWeek} candidates=${distinctClinics.join(',')} picked=${clinicId}`);
|
||||
}
|
||||
// Last resort: any clinic for that doctor (pick first)
|
||||
if (!clinicId && slots.length > 0) clinicId = slots[0].clinicId;
|
||||
|
||||
if (!clinicId) { skipped++; skippedReasons.ambiguous++; continue; }
|
||||
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: AppointmentUpdateInput!) { updateAppointment(id: $id, data: $data) { id } }`,
|
||||
{ id: appt.id, data: { clinicId } },
|
||||
);
|
||||
patched++;
|
||||
await new Promise((r) => setTimeout(r, 40));
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[MAINT] updateAppointment(${appt.id}) failed: ${err?.message ?? err}`);
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.log(`[MAINT] Appointment clinic backfill complete: scanned=${appointments.length} patched=${patched} skipped=${skipped} reasons=${JSON.stringify(skippedReasons)}`);
|
||||
return { status: 'ok', scanned: appointments.length, patched, skipped, skippedReasons };
|
||||
}
|
||||
}
|
||||
|
||||
45
src/masterdata/masterdata.controller.ts
Normal file
45
src/masterdata/masterdata.controller.ts
Normal file
@@ -0,0 +1,45 @@
|
||||
import { Controller, Get, Query, Logger } from '@nestjs/common';
|
||||
import { MasterdataService } from './masterdata.service';
|
||||
|
||||
@Controller('api/masterdata')
|
||||
export class MasterdataController {
|
||||
private readonly logger = new Logger(MasterdataController.name);
|
||||
|
||||
constructor(private masterdata: MasterdataService) {}
|
||||
|
||||
@Get('departments')
|
||||
async departments() {
|
||||
return this.masterdata.getDepartments();
|
||||
}
|
||||
|
||||
@Get('doctors')
|
||||
async doctors() {
|
||||
return this.masterdata.getDoctors();
|
||||
}
|
||||
|
||||
@Get('clinics')
|
||||
async clinics() {
|
||||
return this.masterdata.getClinics();
|
||||
}
|
||||
|
||||
// Available time slots for a doctor on a given date.
|
||||
// Computed from DoctorVisitSlot entities (doctor × clinic × dayOfWeek).
|
||||
// Returns 30-min slots within the doctor's visiting window for that day.
|
||||
//
|
||||
// GET /api/masterdata/slots?doctorId=xxx&date=2026-04-15
|
||||
@Get('slots')
|
||||
async slots(
|
||||
@Query('doctorId') doctorId: string,
|
||||
@Query('date') date: string,
|
||||
) {
|
||||
if (!doctorId || !date) return [];
|
||||
return this.masterdata.getAvailableSlots(doctorId, date);
|
||||
}
|
||||
|
||||
// Force cache refresh (admin use)
|
||||
@Get('refresh')
|
||||
async refresh() {
|
||||
await this.masterdata.invalidateAll();
|
||||
return { refreshed: true };
|
||||
}
|
||||
}
|
||||
13
src/masterdata/masterdata.module.ts
Normal file
13
src/masterdata/masterdata.module.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { PlatformModule } from '../platform/platform.module';
|
||||
import { AuthModule } from '../auth/auth.module';
|
||||
import { MasterdataController } from './masterdata.controller';
|
||||
import { MasterdataService } from './masterdata.service';
|
||||
|
||||
@Module({
|
||||
imports: [PlatformModule, AuthModule],
|
||||
controllers: [MasterdataController],
|
||||
providers: [MasterdataService],
|
||||
exports: [MasterdataService],
|
||||
})
|
||||
export class MasterdataModule {}
|
||||
213
src/masterdata/masterdata.service.ts
Normal file
213
src/masterdata/masterdata.service.ts
Normal file
@@ -0,0 +1,213 @@
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { SessionService } from '../auth/session.service';
|
||||
|
||||
// Master data: cached lookups for departments, doctors, clinics.
|
||||
// Fetched from the platform on first request, cached in Redis with TTL.
|
||||
// Frontend dropdowns use these instead of direct GraphQL queries.
|
||||
|
||||
const CACHE_TTL = 300; // 5 minutes
|
||||
const KEY_DEPARTMENTS = 'masterdata:departments';
|
||||
const KEY_DOCTORS = 'masterdata:doctors';
|
||||
const KEY_CLINICS = 'masterdata:clinics';
|
||||
|
||||
@Injectable()
|
||||
export class MasterdataService implements OnModuleInit {
|
||||
private readonly logger = new Logger(MasterdataService.name);
|
||||
private readonly apiKey: string;
|
||||
|
||||
constructor(
|
||||
private config: ConfigService,
|
||||
private platform: PlatformGraphqlService,
|
||||
private cache: SessionService,
|
||||
) {
|
||||
this.apiKey = this.config.get<string>('platform.apiKey') ?? process.env.PLATFORM_API_KEY ?? '';
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
// Warm cache on startup
|
||||
try {
|
||||
await this.getDepartments();
|
||||
await this.getDoctors();
|
||||
await this.getClinics();
|
||||
this.logger.log('Master data cache warmed');
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`Cache warm failed: ${err.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
async getDepartments(): Promise<string[]> {
|
||||
const cached = await this.cache.getCache(KEY_DEPARTMENTS);
|
||||
if (cached) return JSON.parse(cached);
|
||||
|
||||
const auth = `Bearer ${this.apiKey}`;
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
`{ doctors(first: 500) { edges { node { department } } } }`,
|
||||
undefined, auth,
|
||||
);
|
||||
|
||||
const departments = Array.from(new Set(
|
||||
data.doctors.edges
|
||||
.map((e: any) => e.node.department)
|
||||
.filter((d: string) => d && d.trim()),
|
||||
)).sort() as string[];
|
||||
|
||||
await this.cache.setCache(KEY_DEPARTMENTS, JSON.stringify(departments), CACHE_TTL);
|
||||
this.logger.log(`Cached ${departments.length} departments`);
|
||||
return departments;
|
||||
}
|
||||
|
||||
async getDoctors(): Promise<Array<{ id: string; name: string; department: string; qualifications: string }>> {
|
||||
const cached = await this.cache.getCache(KEY_DOCTORS);
|
||||
if (cached) return JSON.parse(cached);
|
||||
|
||||
const auth = `Bearer ${this.apiKey}`;
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
`{ doctors(first: 500) { edges { node {
|
||||
id name department qualifications specialty active
|
||||
fullName { firstName lastName }
|
||||
} } } }`,
|
||||
undefined, auth,
|
||||
);
|
||||
|
||||
const doctors = data.doctors.edges
|
||||
.map((e: any) => ({
|
||||
id: e.node.id,
|
||||
name: e.node.name ?? `${e.node.fullName?.firstName ?? ''} ${e.node.fullName?.lastName ?? ''}`.trim(),
|
||||
department: e.node.department ?? '',
|
||||
qualifications: e.node.qualifications ?? '',
|
||||
specialty: e.node.specialty ?? '',
|
||||
active: e.node.active ?? true,
|
||||
}))
|
||||
.filter((d: any) => d.active !== false);
|
||||
|
||||
await this.cache.setCache(KEY_DOCTORS, JSON.stringify(doctors), CACHE_TTL);
|
||||
this.logger.log(`Cached ${doctors.length} doctors`);
|
||||
return doctors;
|
||||
}
|
||||
|
||||
async getClinics(): Promise<Array<{ id: string; name: string; phone: string; address: string; opensAt: string; closesAt: string }>> {
|
||||
const cached = await this.cache.getCache(KEY_CLINICS);
|
||||
if (cached) return JSON.parse(cached);
|
||||
|
||||
const auth = `Bearer ${this.apiKey}`;
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
`{ clinics(first: 50) { edges { node {
|
||||
id clinicName status opensAt closesAt
|
||||
phone { primaryPhoneNumber }
|
||||
addressCustom { addressCity addressState }
|
||||
} } } }`,
|
||||
undefined, auth,
|
||||
);
|
||||
|
||||
const clinics = data.clinics.edges
|
||||
.filter((e: any) => e.node.status !== 'INACTIVE')
|
||||
.map((e: any) => ({
|
||||
id: e.node.id,
|
||||
name: e.node.clinicName ?? '',
|
||||
phone: e.node.phone?.primaryPhoneNumber ?? '',
|
||||
opensAt: e.node.opensAt ?? '08:00',
|
||||
closesAt: e.node.closesAt ?? '20:00',
|
||||
address: [e.node.addressCustom?.addressCity, e.node.addressCustom?.addressState].filter(Boolean).join(', '),
|
||||
}));
|
||||
|
||||
await this.cache.setCache(KEY_CLINICS, JSON.stringify(clinics), CACHE_TTL);
|
||||
this.logger.log(`Cached ${clinics.length} clinics`);
|
||||
return clinics;
|
||||
}
|
||||
|
||||
// Available time slots for a doctor on a given date.
|
||||
// Reads DoctorVisitSlot entities for the matching dayOfWeek,
|
||||
// then generates 30-min slots within each visiting window.
|
||||
async getAvailableSlots(doctorId: string, date: string): Promise<Array<{ time: string; label: string; clinicId: string; clinicName: string }>> {
|
||||
const dayOfWeek = new Date(date).toLocaleDateString('en-US', { weekday: 'long' }).toUpperCase();
|
||||
const cacheKey = `masterdata:slots:${doctorId}:${dayOfWeek}`;
|
||||
|
||||
// Cache stores the UNFILTERED full-day slot list (keyed by dayOfWeek,
|
||||
// so it's reusable across dates that fall on the same weekday). The
|
||||
// "hide past slots on today" filter is applied AFTER cache read so it
|
||||
// stays correct as real-time advances without cache churn.
|
||||
const cached = await this.cache.getCache(cacheKey);
|
||||
if (cached) return this.filterPastSlotsForToday(JSON.parse(cached), date);
|
||||
|
||||
const auth = `Bearer ${this.apiKey}`;
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
`{ doctorVisitSlots(first: 100, filter: { doctorId: { eq: "${doctorId}" }, dayOfWeek: { eq: ${dayOfWeek} } }) {
|
||||
edges { node { id startTime endTime clinic { id clinicName } } }
|
||||
} }`,
|
||||
undefined, auth,
|
||||
);
|
||||
|
||||
const slots: Array<{ time: string; label: string; clinicId: string; clinicName: string }> = [];
|
||||
|
||||
for (const edge of data.doctorVisitSlots?.edges ?? []) {
|
||||
const node = edge.node;
|
||||
const clinicId = node.clinic?.id ?? '';
|
||||
const clinicName = node.clinic?.clinicName ?? '';
|
||||
const startTime = node.startTime ?? '09:00';
|
||||
const endTime = node.endTime ?? '17:00';
|
||||
|
||||
// Generate 30-min slots within visiting window
|
||||
const [startH, startM] = startTime.split(':').map(Number);
|
||||
const [endH, endM] = endTime.split(':').map(Number);
|
||||
let h = startH, m = startM ?? 0;
|
||||
const endMin = endH * 60 + (endM ?? 0);
|
||||
|
||||
while (h * 60 + m < endMin) {
|
||||
const hh = h.toString().padStart(2, '0');
|
||||
const mm = m.toString().padStart(2, '0');
|
||||
const ampm = h < 12 ? 'AM' : 'PM';
|
||||
const displayH = h === 0 ? 12 : h > 12 ? h - 12 : h;
|
||||
slots.push({
|
||||
time: `${hh}:${mm}`,
|
||||
label: `${displayH}:${mm.toString().padStart(2, '0')} ${ampm} — ${clinicName}`,
|
||||
clinicId,
|
||||
clinicName,
|
||||
});
|
||||
m += 30;
|
||||
if (m >= 60) { h++; m = 0; }
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by time
|
||||
slots.sort((a, b) => a.time.localeCompare(b.time));
|
||||
|
||||
// Cache the full UNFILTERED list so reuse across dates (same dayOfWeek)
|
||||
// doesn't mis-serve filtered data from an earlier date.
|
||||
await this.cache.setCache(cacheKey, JSON.stringify(slots), CACHE_TTL);
|
||||
this.logger.log(`Generated ${slots.length} slots for doctor ${doctorId} on ${dayOfWeek}`);
|
||||
|
||||
return this.filterPastSlotsForToday(slots, date);
|
||||
}
|
||||
|
||||
// When the requested date is today (IST), hide slots whose time has
|
||||
// already passed (30-min buffer so we don't offer the impossible-to-keep
|
||||
// "in 5 minutes" slot). Applies to both cache-hit and fresh fetch paths.
|
||||
private filterPastSlotsForToday(
|
||||
slots: Array<{ time: string; label: string; clinicId: string; clinicName: string }>,
|
||||
date: string,
|
||||
): Array<{ time: string; label: string; clinicId: string; clinicName: string }> {
|
||||
const todayIst = new Date().toLocaleDateString('en-CA', { timeZone: 'Asia/Kolkata' });
|
||||
if (date !== todayIst) return slots;
|
||||
|
||||
const nowHHMM = new Date().toLocaleTimeString('en-GB', {
|
||||
timeZone: 'Asia/Kolkata', hour: '2-digit', minute: '2-digit',
|
||||
});
|
||||
const [nowH, nowM] = nowHHMM.split(':').map(Number);
|
||||
const cutoff = nowH * 60 + nowM + 30; // 30-min buffer
|
||||
const filtered = slots.filter((s) => {
|
||||
const [h, m] = s.time.split(':').map(Number);
|
||||
return h * 60 + m >= cutoff;
|
||||
});
|
||||
this.logger.log(`[SLOTS] Today filter: ${slots.length} → ${filtered.length} (now=${nowHHMM} IST, cutoff=${Math.floor(cutoff / 60)}:${String(cutoff % 60).padStart(2, '0')})`);
|
||||
return filtered;
|
||||
}
|
||||
|
||||
async invalidateAll(): Promise<void> {
|
||||
await this.cache.setCache(KEY_DEPARTMENTS, '', 1);
|
||||
await this.cache.setCache(KEY_DOCTORS, '', 1);
|
||||
await this.cache.setCache(KEY_CLINICS, '', 1);
|
||||
this.logger.log('Master data cache invalidated');
|
||||
}
|
||||
}
|
||||
174
src/ozonetel/cdr-enrichment.service.ts
Normal file
174
src/ozonetel/cdr-enrichment.service.ts
Normal file
@@ -0,0 +1,174 @@
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { OzonetelAgentService } from './ozonetel-agent.service';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { AgentLookupService } from '../platform/agent-lookup.service';
|
||||
|
||||
/**
|
||||
* Periodically pulls Ozonetel CDR (per-row, includes unique AgentID) and
|
||||
* enriches Call records that were created from the missed-call webhook
|
||||
* or outbound dispose without the authoritative agent relation.
|
||||
*
|
||||
* Runs every 30 minutes — well under Ozonetel's 2-req/min cap on the CDR
|
||||
* endpoints (one fetch per workspace per tick = 2/hour).
|
||||
*
|
||||
* Pairs Call rows to CDR rows by `ucid`. Only patches Calls that are
|
||||
* missing `agentId` / `transferredTo` / `transferType` — idempotent.
|
||||
*/
|
||||
const ENRICHMENT_INTERVAL_MS = 30 * 60 * 1000;
|
||||
const ENRICHMENT_DATE_WINDOW_DAYS = 2; // today + yesterday in case late-arriving calls straddle IST midnight
|
||||
|
||||
@Injectable()
|
||||
export class CdrEnrichmentService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(CdrEnrichmentService.name);
|
||||
private timer: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor(
|
||||
private readonly ozonetel: OzonetelAgentService,
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
private readonly agentLookup: AgentLookupService,
|
||||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
// Kick off after 60s so the sidecar isn't hammering platform during boot,
|
||||
// then settle into the 30-min cadence.
|
||||
setTimeout(() => {
|
||||
this.runOnce().catch((err) => {
|
||||
this.logger.warn(`[CDR-ENRICH] First run failed: ${err?.message ?? err}`);
|
||||
});
|
||||
}, 60_000);
|
||||
this.timer = setInterval(() => {
|
||||
this.runOnce().catch((err) => {
|
||||
this.logger.warn(`[CDR-ENRICH] Tick failed: ${err?.message ?? err}`);
|
||||
});
|
||||
}, ENRICHMENT_INTERVAL_MS);
|
||||
}
|
||||
|
||||
onModuleDestroy() {
|
||||
if (this.timer) clearInterval(this.timer);
|
||||
}
|
||||
|
||||
async runOnce(): Promise<{ scanned: number; enriched: number; skipped: number }> {
|
||||
let scanned = 0;
|
||||
let enriched = 0;
|
||||
let skipped = 0;
|
||||
|
||||
// Walk the IST-date window. For each date, pull CDR + patch Calls.
|
||||
// Sleep 35s between dates — Ozonetel caps CDR endpoints at 2 req/min
|
||||
// and the dispose flow shares that budget (fetchCdrByUCID per outbound).
|
||||
const dates = this.recentDatesIst(ENRICHMENT_DATE_WINDOW_DAYS);
|
||||
for (let i = 0; i < dates.length; i++) {
|
||||
const date = dates[i];
|
||||
if (i > 0) await new Promise((r) => setTimeout(r, 35_000));
|
||||
const cdrRows = await this.ozonetel.fetchCDR({ date }).catch(() => []);
|
||||
if (cdrRows.length === 0) continue;
|
||||
|
||||
// Build UCID → cdr-row map so we can O(1) join per Call.
|
||||
// Ozonetel emits two identifiers per call — `UCID` (caller-leg)
|
||||
// and `monitorUCID` (agent-leg). The webhook stores `monitorUCID`,
|
||||
// but the bulk CDR rows are keyed on caller-leg `UCID`. Index
|
||||
// both so the lookup at line ~79 finds the row regardless of
|
||||
// which side was persisted. Without this, transferred inbound
|
||||
// calls never get their agent relation enriched.
|
||||
const byUcid = new Map<string, any>();
|
||||
for (const row of cdrRows) {
|
||||
const ucid = String(row.UCID ?? '').trim();
|
||||
const monitorUcid = String(row.monitorUCID ?? '').trim();
|
||||
if (ucid) byUcid.set(ucid, row);
|
||||
if (monitorUcid && monitorUcid !== ucid) byUcid.set(monitorUcid, row);
|
||||
}
|
||||
if (byUcid.size === 0) continue;
|
||||
|
||||
// Pull Calls in the same date window that are missing agent linkage
|
||||
// (i.e. ucid set, agentId null). Patch each.
|
||||
const calls = await this.fetchCallsMissingAgent(date);
|
||||
scanned += calls.length;
|
||||
|
||||
for (const call of calls) {
|
||||
const cdrRow = byUcid.get(String(call.ucid).trim());
|
||||
if (!cdrRow) { skipped++; continue; }
|
||||
|
||||
const patch: Record<string, any> = {};
|
||||
if (!call.agentId) {
|
||||
// Primary resolution: use AgentID from CDR (unique lowercase id).
|
||||
const cdrAgentId = cdrRow.AgentID;
|
||||
let uuid = cdrAgentId
|
||||
? await this.agentLookup.resolveByOzonetelId(cdrAgentId)
|
||||
: null;
|
||||
// Fallback: CDR AgentName may be a chain ("A -> B") for
|
||||
// transferred calls. Pick the final handler (last segment)
|
||||
// and look it up by display name or ozonetelId. Matches
|
||||
// the write-time resolution in missed-call-webhook.
|
||||
if (!uuid && cdrRow.AgentName) {
|
||||
const segments = String(cdrRow.AgentName).split('->').map((s) => s.trim()).filter(Boolean);
|
||||
const finalHandler = segments[segments.length - 1];
|
||||
if (finalHandler) {
|
||||
uuid =
|
||||
(await this.agentLookup.resolveByOzonetelId(finalHandler)) ??
|
||||
(await this.agentLookup.resolveByDisplayName(finalHandler));
|
||||
}
|
||||
}
|
||||
if (uuid) patch.agentId = uuid;
|
||||
if (cdrRow.AgentName) patch.agentName = cdrRow.AgentName;
|
||||
}
|
||||
if (cdrRow.TransferredTo && !call.transferredTo) patch.transferredTo = cdrRow.TransferredTo;
|
||||
if (cdrRow.TransferType && !call.transferType) patch.transferType = cdrRow.TransferType;
|
||||
|
||||
if (Object.keys(patch).length === 0) { skipped++; continue; }
|
||||
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
|
||||
{ id: call.id, data: patch },
|
||||
);
|
||||
enriched++;
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[CDR-ENRICH] Patch failed for ${call.id}: ${err?.message ?? err}`);
|
||||
skipped++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (scanned > 0 || enriched > 0) {
|
||||
this.logger.log(`[CDR-ENRICH] Pass complete — dates=[${dates.join(',')}] scanned=${scanned} enriched=${enriched} skipped=${skipped}`);
|
||||
}
|
||||
return { scanned, enriched, skipped };
|
||||
}
|
||||
|
||||
private async fetchCallsMissingAgent(date: string): Promise<Array<{ id: string; ucid: string | null; agentId: string | null; transferredTo: string | null; transferType: string | null }>> {
|
||||
// Bound by IST day. CDR window is 15 days; we only ever need recent.
|
||||
const gte = `${date}T00:00:00+05:30`;
|
||||
const lte = `${date}T23:59:59+05:30`;
|
||||
const results: Array<any> = [];
|
||||
let after: string | null = null;
|
||||
|
||||
for (let page = 0; page < 20; page++) {
|
||||
const cursorArg: string = after ? `, after: "${after}"` : '';
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ calls(first: 200${cursorArg}, filter: {
|
||||
startedAt: { gte: "${gte}", lte: "${lte}" },
|
||||
ucid: { is: NOT_NULL },
|
||||
agentId: { is: NULL }
|
||||
}) {
|
||||
edges { node { id ucid agentId transferredTo transferType } }
|
||||
pageInfo { hasNextPage endCursor }
|
||||
} }`,
|
||||
).catch(() => ({ calls: { edges: [], pageInfo: {} } }));
|
||||
const edges = data?.calls?.edges ?? [];
|
||||
for (const e of edges) results.push(e.node);
|
||||
const pageInfo = data?.calls?.pageInfo ?? {};
|
||||
if (!pageInfo.hasNextPage) break;
|
||||
after = pageInfo.endCursor ?? null;
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
private recentDatesIst(n: number): string[] {
|
||||
const dates: string[] = [];
|
||||
for (let i = 0; i < n; i++) {
|
||||
const d = new Date(Date.now() + 5.5 * 60 * 60 * 1000 - i * 24 * 60 * 60 * 1000);
|
||||
dates.push(d.toISOString().slice(0, 10));
|
||||
}
|
||||
return dates;
|
||||
}
|
||||
}
|
||||
127
src/ozonetel/ozonetel-admin-auth.service.ts
Normal file
127
src/ozonetel/ozonetel-admin-auth.service.ts
Normal file
@@ -0,0 +1,127 @@
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { publicEncrypt, constants as cryptoConstants } from 'crypto';
|
||||
import axios from 'axios';
|
||||
import { TelephonyConfigService } from '../config/telephony-config.service';
|
||||
|
||||
// Ozonetel admin API auth — login with RSA-encrypted credentials, cache JWT.
|
||||
// Used by supervisor barge endpoints to call dashboardApi.
|
||||
//
|
||||
// Auth flow (from CA-Admin source code):
|
||||
// 1. GET /api/auth/public-key → { publicKey, keyId }
|
||||
// 2. RSA-encrypt username + password with publicKey
|
||||
// 3. POST /auth/login → JWT token
|
||||
// 4. All admin API calls use: Authorization: Bearer <jwt>, userId, userName, isSuperAdmin
|
||||
|
||||
@Injectable()
|
||||
export class OzonetelAdminAuthService implements OnModuleInit {
|
||||
private readonly logger = new Logger(OzonetelAdminAuthService.name);
|
||||
private cachedToken: string | null = null;
|
||||
private cachedUserId: string | null = null;
|
||||
private cachedUserName: string | null = null;
|
||||
private tokenExpiresAt = 0;
|
||||
|
||||
constructor(private readonly telephony: TelephonyConfigService) {}
|
||||
|
||||
async onModuleInit() {
|
||||
const config = this.telephony.getConfig();
|
||||
if (config.ozonetel.adminUsername && config.ozonetel.adminPassword) {
|
||||
this.logger.log('Ozonetel admin credentials configured — will authenticate on first use');
|
||||
} else {
|
||||
this.logger.warn('Ozonetel admin credentials not configured — supervisor barge will be unavailable');
|
||||
}
|
||||
}
|
||||
|
||||
private get apiBase(): string {
|
||||
return 'https://api.cloudagent.ozonetel.com';
|
||||
}
|
||||
|
||||
async getAuthHeaders(): Promise<Record<string, string>> {
|
||||
const token = await this.getToken();
|
||||
return {
|
||||
'Content-Type': 'application/json',
|
||||
'Authorization': `Bearer ${token}`,
|
||||
'userId': this.cachedUserId ?? '',
|
||||
'userName': this.cachedUserName ?? '',
|
||||
'isSuperAdmin': 'true',
|
||||
'dAccessType': 'false',
|
||||
};
|
||||
}
|
||||
|
||||
async getToken(): Promise<string> {
|
||||
if (this.cachedToken && Date.now() < this.tokenExpiresAt) {
|
||||
return this.cachedToken;
|
||||
}
|
||||
return this.login();
|
||||
}
|
||||
|
||||
private rsaEncrypt(publicKeyRaw: string, plaintext: string): string {
|
||||
// Ozonetel returns raw base64 without PEM headers — wrap it
|
||||
const pem = publicKeyRaw.includes('-----BEGIN')
|
||||
? publicKeyRaw
|
||||
: `-----BEGIN PUBLIC KEY-----\n${publicKeyRaw}\n-----END PUBLIC KEY-----`;
|
||||
const buffer = Buffer.from(plaintext, 'utf8');
|
||||
const encrypted = publicEncrypt(
|
||||
{ key: pem, padding: cryptoConstants.RSA_PKCS1_PADDING },
|
||||
buffer,
|
||||
);
|
||||
return encrypted.toString('base64');
|
||||
}
|
||||
|
||||
private async login(): Promise<string> {
|
||||
const config = this.telephony.getConfig();
|
||||
const { adminUsername, adminPassword } = config.ozonetel;
|
||||
|
||||
if (!adminUsername || !adminPassword) {
|
||||
throw new Error('Ozonetel admin credentials not configured');
|
||||
}
|
||||
|
||||
// Step 1: Get RSA public key
|
||||
this.logger.log('Fetching Ozonetel public key...');
|
||||
const preLoginRes = await axios.get(`${this.apiBase}/api/auth/public-key`);
|
||||
const { publicKey, keyId } = preLoginRes.data;
|
||||
|
||||
if (!publicKey || !keyId) {
|
||||
throw new Error('Failed to get Ozonetel public key');
|
||||
}
|
||||
|
||||
// Step 2: RSA-encrypt credentials using Node crypto
|
||||
const encryptedUsername = this.rsaEncrypt(publicKey, adminUsername);
|
||||
const encryptedPassword = this.rsaEncrypt(publicKey, adminPassword);
|
||||
|
||||
// Step 3: Login
|
||||
this.logger.log('Logging into Ozonetel admin portal...');
|
||||
const loginRes = await axios.post(`${this.apiBase}/auth/login`, {
|
||||
username: encryptedUsername,
|
||||
password: encryptedPassword,
|
||||
keyId,
|
||||
ltype: 'PORTAL',
|
||||
}, {
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
});
|
||||
|
||||
const data = loginRes.data;
|
||||
if (!data.token) {
|
||||
throw new Error(`Ozonetel admin login failed: ${JSON.stringify(data)}`);
|
||||
}
|
||||
|
||||
this.cachedToken = data.token;
|
||||
this.cachedUserId = data.userId?.toString() ?? data.UserId?.toString() ?? '';
|
||||
this.cachedUserName = data.name ?? adminUsername;
|
||||
|
||||
// Decode token expiry — fallback to 6 hours
|
||||
try {
|
||||
const payload = JSON.parse(Buffer.from(data.token.split('.')[1], 'base64').toString());
|
||||
this.tokenExpiresAt = (payload.exp ?? 0) * 1000 - 60_000; // refresh 1 min early
|
||||
} catch {
|
||||
this.tokenExpiresAt = Date.now() + 6 * 60 * 60 * 1000;
|
||||
}
|
||||
|
||||
this.logger.log(`Ozonetel admin login successful (userId=${this.cachedUserId}, expires in ${Math.round((this.tokenExpiresAt - Date.now()) / 60000)}min)`);
|
||||
return this.cachedToken!;
|
||||
}
|
||||
|
||||
isConfigured(): boolean {
|
||||
const config = this.telephony.getConfig();
|
||||
return !!(config.ozonetel.adminUsername && config.ozonetel.adminPassword);
|
||||
}
|
||||
}
|
||||
@@ -2,9 +2,22 @@ import { Controller, Post, Get, Body, Query, Logger, HttpException } from '@nest
|
||||
import { OzonetelAgentService } from './ozonetel-agent.service';
|
||||
import { MissedQueueService } from '../worklist/missed-queue.service';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { AgentLookupService } from '../platform/agent-lookup.service';
|
||||
import { EventBusService } from '../events/event-bus.service';
|
||||
import { Topics } from '../events/event-types';
|
||||
import { TelephonyConfigService } from '../config/telephony-config.service';
|
||||
import { SupervisorService } from '../supervisor/supervisor.service';
|
||||
import { AgentHistoryService } from '../supervisor/agent-history.service';
|
||||
|
||||
// Convert Ozonetel "HH:MM:SS" (or null/empty) to integer seconds.
|
||||
// Returns null when input is missing or all-zero.
|
||||
function parseHmsToSec(raw: any): number | null {
|
||||
if (!raw || typeof raw !== 'string') return null;
|
||||
if (raw === '00:00:00') return null;
|
||||
const parts = raw.split(':').map((p) => parseInt(p, 10));
|
||||
if (parts.length !== 3 || parts.some((n) => isNaN(n))) return null;
|
||||
return parts[0] * 3600 + parts[1] * 60 + parts[2];
|
||||
}
|
||||
|
||||
@Controller('api/ozonetel')
|
||||
export class OzonetelAgentController {
|
||||
@@ -16,17 +29,14 @@ export class OzonetelAgentController {
|
||||
private readonly missedQueue: MissedQueueService,
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
private readonly eventBus: EventBusService,
|
||||
private readonly supervisor: SupervisorService,
|
||||
private readonly agentLookup: AgentLookupService,
|
||||
private readonly agentHistory: AgentHistoryService,
|
||||
) {}
|
||||
|
||||
// Read-through accessors so admin updates take effect immediately.
|
||||
private get defaultAgentId(): string {
|
||||
return this.telephony.getConfig().ozonetel.agentId || 'agent3';
|
||||
}
|
||||
private get defaultAgentPassword(): string {
|
||||
return this.telephony.getConfig().ozonetel.agentPassword;
|
||||
}
|
||||
private get defaultSipId(): string {
|
||||
return this.telephony.getConfig().ozonetel.sipId || '521814';
|
||||
private requireAgentId(agentId: string | undefined | null): string {
|
||||
if (!agentId) throw new HttpException('agentId required', 400);
|
||||
return agentId;
|
||||
}
|
||||
|
||||
@Post('agent-login')
|
||||
@@ -65,17 +75,18 @@ export class OzonetelAgentController {
|
||||
|
||||
@Post('agent-state')
|
||||
async agentState(
|
||||
@Body() body: { state: 'Ready' | 'Pause'; pauseReason?: string },
|
||||
@Body() body: { agentId: string; state: 'Ready' | 'Pause'; pauseReason?: string },
|
||||
) {
|
||||
if (!body.state) {
|
||||
throw new HttpException('state required', 400);
|
||||
}
|
||||
const agentId = this.requireAgentId(body.agentId);
|
||||
|
||||
this.logger.log(`[AGENT-STATE] ${this.defaultAgentId} → ${body.state} (${body.pauseReason ?? 'none'})`);
|
||||
this.logger.log(`[AGENT-STATE] ${agentId} → ${body.state} (${body.pauseReason ?? 'none'})`);
|
||||
|
||||
try {
|
||||
const result = await this.ozonetelAgent.changeAgentState({
|
||||
agentId: this.defaultAgentId,
|
||||
agentId,
|
||||
state: body.state,
|
||||
pauseReason: body.pauseReason,
|
||||
});
|
||||
@@ -84,7 +95,7 @@ export class OzonetelAgentController {
|
||||
// Auto-assign missed call when agent goes Ready
|
||||
if (body.state === 'Ready') {
|
||||
try {
|
||||
const assigned = await this.missedQueue.assignNext(this.defaultAgentId);
|
||||
const assigned = await this.missedQueue.assignNext(agentId);
|
||||
if (assigned) {
|
||||
this.logger.log(`[AGENT-STATE] Auto-assigned missed call ${assigned.id}`);
|
||||
return { ...result, assignedCall: assigned };
|
||||
@@ -110,10 +121,12 @@ export class OzonetelAgentController {
|
||||
@Body() body: {
|
||||
ucid: string;
|
||||
disposition: string;
|
||||
agentId: string;
|
||||
callerPhone?: string;
|
||||
direction?: string;
|
||||
durationSec?: number;
|
||||
leadId?: string;
|
||||
leadName?: string;
|
||||
notes?: string;
|
||||
missedCallId?: string;
|
||||
},
|
||||
@@ -122,13 +135,17 @@ export class OzonetelAgentController {
|
||||
throw new HttpException('ucid and disposition required', 400);
|
||||
}
|
||||
|
||||
const agentId = this.requireAgentId(body.agentId);
|
||||
const ozonetelDisposition = this.mapToOzonetelDisposition(body.disposition);
|
||||
|
||||
this.logger.log(`[DISPOSE] ucid=${body.ucid} disposition=${body.disposition} → ozonetel="${ozonetelDisposition}" agentId=${this.defaultAgentId} callerPhone=${body.callerPhone ?? 'none'} direction=${body.direction ?? 'unknown'} leadId=${body.leadId ?? 'none'}`);
|
||||
// Cancel the ACW auto-dispose timer — the frontend submitted disposition
|
||||
this.supervisor.cancelAcwTimer(agentId);
|
||||
|
||||
this.logger.log(`[DISPOSE] ucid=${body.ucid} disposition=${body.disposition} → ozonetel="${ozonetelDisposition}" agentId=${agentId} callerPhone=${body.callerPhone ?? 'none'} direction=${body.direction ?? 'unknown'} leadId=${body.leadId ?? 'none'}`);
|
||||
|
||||
try {
|
||||
const result = await this.ozonetelAgent.setDisposition({
|
||||
agentId: this.defaultAgentId,
|
||||
agentId,
|
||||
ucid: body.ucid,
|
||||
disposition: ozonetelDisposition,
|
||||
});
|
||||
@@ -139,20 +156,121 @@ export class OzonetelAgentController {
|
||||
this.logger.error(`[DISPOSE] FAILED: ${message} ${responseData}`);
|
||||
}
|
||||
|
||||
// Create call record for outbound calls. Inbound calls are
|
||||
// created by the webhook — but we skip outbound in the webhook
|
||||
// (they're not "missed calls"). So the dispose endpoint is the
|
||||
// only place that creates the call record for outbound dials.
|
||||
if (body.direction === 'OUTBOUND' && body.callerPhone) {
|
||||
try {
|
||||
const durationSec = body.durationSec ?? 0;
|
||||
const endedAt = new Date().toISOString();
|
||||
const startedAt = durationSec > 0
|
||||
? new Date(Date.now() - durationSec * 1000).toISOString()
|
||||
: endedAt;
|
||||
const callData: Record<string, any> = {
|
||||
name: `Outbound — ${body.callerPhone}`,
|
||||
direction: 'OUTBOUND',
|
||||
callStatus: 'COMPLETED',
|
||||
callerNumber: { primaryPhoneNumber: `+91${body.callerPhone.replace(/^\+?91/, '')}` },
|
||||
agentName: agentId,
|
||||
durationSec,
|
||||
disposition: body.disposition,
|
||||
startedAt,
|
||||
endedAt,
|
||||
};
|
||||
// Persist UCID so the CDR enrichment cron and backfill can
|
||||
// resolve the authoritative agent relation even if the initial
|
||||
// lookup misses.
|
||||
if (body.ucid) callData.ucid = body.ucid;
|
||||
// Resolve the agent relation from the logged-in agentId. For
|
||||
// outbound, the dispatching agent IS the handler — no transfer.
|
||||
const agentUuid = await this.agentLookup.resolveByOzonetelId(agentId);
|
||||
if (agentUuid) callData.agentId = agentUuid;
|
||||
if (body.leadId) callData.leadId = body.leadId;
|
||||
if (body.leadName) callData.leadName = body.leadName;
|
||||
|
||||
const apiKey = process.env.PLATFORM_API_KEY;
|
||||
if (apiKey) {
|
||||
const result = await this.platform.queryWithAuth<any>(
|
||||
`mutation($data: CallCreateInput!) { createCall(data: $data) { id } }`,
|
||||
{ data: callData },
|
||||
`Bearer ${apiKey}`,
|
||||
);
|
||||
this.logger.log(`[DISPOSE] Created outbound call record: ${result.createCall.id}`);
|
||||
|
||||
// Fetch recording URL from CDR after a delay (Ozonetel needs time to process)
|
||||
const callId = result.createCall.id;
|
||||
const ucid = body.ucid;
|
||||
const dateStr = new Date().toISOString().split('T')[0];
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
// fetchCdrByUCID is the targeted lookup — Ozonetel resolves
|
||||
// leg-pair UCIDs server-side, so the agent-facing UCID we
|
||||
// hold reliably returns the call row and its CallAudio.
|
||||
const record = await this.ozonetelAgent.fetchCdrByUCID({ date: dateStr, ucid });
|
||||
const audioUrl = record?.CallAudio || record?.AudioFile;
|
||||
// Compose a single update with recording + SLA timing
|
||||
// fields. CDR exposes HandlingTime, WrapupDuration,
|
||||
// HoldDuration as HH:MM:SS strings.
|
||||
const updateData: Record<string, any> = {};
|
||||
if (audioUrl) {
|
||||
updateData.recording = { primaryLinkUrl: audioUrl, primaryLinkLabel: 'Recording' };
|
||||
}
|
||||
const handlingSec = parseHmsToSec(record?.HandlingTime);
|
||||
const wrapupSec = parseHmsToSec(record?.WrapupDuration);
|
||||
const holdSec = parseHmsToSec(record?.HoldDuration);
|
||||
if (handlingSec !== null) updateData.handlingTimeS = handlingSec;
|
||||
if (wrapupSec !== null) updateData.acwDurationS = wrapupSec;
|
||||
if (holdSec !== null) updateData.holdDurationS = holdSec;
|
||||
// Overwrite agent relation with CDR's AgentID (the
|
||||
// actual final handler; may differ from the caller
|
||||
// agentId if Ozonetel transferred the dial).
|
||||
const cdrAgentId = record?.AgentID;
|
||||
if (cdrAgentId) {
|
||||
const cdrAgentUuid = await this.agentLookup.resolveByOzonetelId(cdrAgentId);
|
||||
if (cdrAgentUuid) updateData.agentId = cdrAgentUuid;
|
||||
if (record.AgentName) updateData.agentName = record.AgentName;
|
||||
}
|
||||
if (record?.TransferredTo) updateData.transferredTo = record.TransferredTo;
|
||||
if (record?.TransferType) updateData.transferType = record.TransferType;
|
||||
if (Object.keys(updateData).length > 0) {
|
||||
await this.platform.queryWithAuth<any>(
|
||||
`mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
|
||||
{ id: callId, data: updateData },
|
||||
`Bearer ${apiKey}`,
|
||||
);
|
||||
this.logger.log(`[DISPOSE] Updated outbound call ${callId} ${audioUrl ? 'with recording + ' : ''}timing (handling=${handlingSec ?? 'na'}s wrap=${wrapupSec ?? 'na'}s hold=${holdSec ?? 'na'}s)`);
|
||||
} else {
|
||||
this.logger.warn(`[DISPOSE] No CallAudio or timing for ucid=${ucid} — record=${JSON.stringify(record ?? null)}`);
|
||||
}
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[DISPOSE] Failed to fetch recording for outbound call: ${err.message}`);
|
||||
}
|
||||
}, 30_000);
|
||||
}
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[DISPOSE] Failed to create outbound call record: ${err.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle missed call callback status update
|
||||
if (body.missedCallId) {
|
||||
const statusMap: Record<string, string> = {
|
||||
APPOINTMENT_BOOKED: 'CALLBACK_COMPLETED',
|
||||
APPOINTMENT_RESCHEDULED: 'CALLBACK_COMPLETED',
|
||||
APPOINTMENT_CANCELLED: 'CALLBACK_COMPLETED',
|
||||
INFO_PROVIDED: 'CALLBACK_COMPLETED',
|
||||
FOLLOW_UP_SCHEDULED: 'CALLBACK_COMPLETED',
|
||||
CALLBACK_REQUESTED: 'CALLBACK_COMPLETED',
|
||||
NOT_INTERESTED: 'CALLBACK_COMPLETED',
|
||||
WRONG_NUMBER: 'WRONG_NUMBER',
|
||||
NO_ANSWER: 'CALLBACK_ATTEMPTED',
|
||||
};
|
||||
const newStatus = statusMap[body.disposition];
|
||||
if (newStatus) {
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation { updateCall(id: "${body.missedCallId}", data: { callbackstatus: ${newStatus} }) { id } }`,
|
||||
`mutation { updateCall(id: "${body.missedCallId}", data: { callbackStatus: ${newStatus}, disposition: ${body.disposition} }) { id } }`,
|
||||
);
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to update missed call status: ${err}`);
|
||||
@@ -162,7 +280,7 @@ export class OzonetelAgentController {
|
||||
|
||||
// Auto-assign next missed call to this agent
|
||||
try {
|
||||
await this.missedQueue.assignNext(this.defaultAgentId);
|
||||
await this.missedQueue.assignNext(agentId);
|
||||
} catch (err) {
|
||||
this.logger.warn(`Auto-assignment after dispose failed: ${err}`);
|
||||
}
|
||||
@@ -171,7 +289,7 @@ export class OzonetelAgentController {
|
||||
this.eventBus.emit(Topics.CALL_COMPLETED, {
|
||||
callId: null,
|
||||
ucid: body.ucid,
|
||||
agentId: this.defaultAgentId,
|
||||
agentId,
|
||||
callerPhone: body.callerPhone ?? '',
|
||||
direction: body.direction ?? 'INBOUND',
|
||||
durationSec: body.durationSec ?? 0,
|
||||
@@ -186,19 +304,27 @@ export class OzonetelAgentController {
|
||||
|
||||
@Post('dial')
|
||||
async dial(
|
||||
@Body() body: { phoneNumber: string; campaignName?: string; leadId?: string },
|
||||
@Body() body: { phoneNumber: string; agentId: string; campaignName?: string; leadId?: string },
|
||||
) {
|
||||
if (!body.phoneNumber) {
|
||||
throw new HttpException('phoneNumber required', 400);
|
||||
}
|
||||
|
||||
const campaignName = body.campaignName ?? this.telephony.getConfig().ozonetel.campaignName ?? 'Inbound_918041763265';
|
||||
const agentId = this.requireAgentId(body.agentId);
|
||||
const did = this.telephony.getConfig().ozonetel.did;
|
||||
const campaignName = body.campaignName
|
||||
|| this.telephony.getConfig().ozonetel.campaignName
|
||||
|| (did ? `Inbound_${did}` : '');
|
||||
|
||||
this.logger.log(`[DIAL] phone=${body.phoneNumber} campaign=${campaignName} agentId=${this.defaultAgentId} lead=${body.leadId ?? 'none'}`);
|
||||
if (!campaignName) {
|
||||
throw new HttpException('Campaign name not configured — set in Telephony settings or pass campaignName', 400);
|
||||
}
|
||||
|
||||
this.logger.log(`[DIAL] phone=${body.phoneNumber} campaign=${campaignName} agentId=${agentId} lead=${body.leadId ?? 'none'}`);
|
||||
|
||||
try {
|
||||
const result = await this.ozonetelAgent.manualDial({
|
||||
agentId: this.defaultAgentId,
|
||||
agentId,
|
||||
campaignName,
|
||||
customerNumber: body.phoneNumber,
|
||||
});
|
||||
@@ -276,23 +402,56 @@ export class OzonetelAgentController {
|
||||
}
|
||||
|
||||
@Get('performance')
|
||||
async performance(@Query('date') date?: string) {
|
||||
async performance(@Query('date') date?: string, @Query('agentId') agentId?: string) {
|
||||
const agent = this.requireAgentId(agentId);
|
||||
const targetDate = date ?? new Date().toISOString().split('T')[0];
|
||||
this.logger.log(`Performance: date=${targetDate} agent=${this.defaultAgentId}`);
|
||||
this.logger.log(`Performance: date=${targetDate} agent=${agent}`);
|
||||
|
||||
const [cdr, summary, aht] = await Promise.all([
|
||||
// Trigger an on-demand rollup for the requested date so the
|
||||
// AgentSession row reflects the current open session (caps at now)
|
||||
// instead of waiting up to 15 min for the background tick. Fire-and-
|
||||
// forget with a short await so we don't block the whole response on
|
||||
// cache-refresh tail but still hand the read a fresh row when Redpanda
|
||||
// is quiet. Safe to error — AgentSession just stays stale.
|
||||
await this.agentHistory.rollupSessions(targetDate).catch(() => {});
|
||||
|
||||
const [cdr, summary, aht, agentSessionBreakdown] = await Promise.all([
|
||||
this.ozonetelAgent.fetchCDR({ date: targetDate }),
|
||||
this.ozonetelAgent.getAgentSummary(this.defaultAgentId, targetDate),
|
||||
this.ozonetelAgent.getAHT(this.defaultAgentId),
|
||||
this.ozonetelAgent.getAgentSummary(agent, targetDate),
|
||||
this.ozonetelAgent.getAHT(agent),
|
||||
this.fetchAgentSessionTimeBreakdown(agent, targetDate),
|
||||
]);
|
||||
|
||||
const totalCalls = cdr.length;
|
||||
const inbound = cdr.filter((c: any) => c.Type === 'InBound').length;
|
||||
const outbound = cdr.filter((c: any) => c.Type === 'Manual' || c.Type === 'Progressive').length;
|
||||
const answered = cdr.filter((c: any) => c.Status === 'Answered').length;
|
||||
const missed = cdr.filter((c: any) => c.Status === 'Unanswered' || c.Status === 'NotAnswered').length;
|
||||
// Prefer our AgentSession rollup when present — it correctly counts
|
||||
// the current OPEN session (caps at now), while Ozonetel's summaryReport
|
||||
// only tallies CLOSED login→logout pairs. Fall back to Ozonetel if
|
||||
// our rollup hasn't captured this agent yet (e.g., brand-new agent,
|
||||
// workspace without AgentEvent entity synced).
|
||||
const timeUtilization = agentSessionBreakdown ?? summary;
|
||||
|
||||
const talkTimes = cdr
|
||||
// Filter CDR to this agent only — fetchCDR returns all agents' calls
|
||||
// Use case-insensitive matching — Ozonetel field casing varies
|
||||
const agentLower = agent.toLowerCase();
|
||||
const agentCdr = cdr.filter((c: any) =>
|
||||
(c.AgentID ?? '').toLowerCase() === agentLower ||
|
||||
(c.AgentName ?? '').toLowerCase() === agentLower,
|
||||
);
|
||||
this.logger.log(`[PERFORMANCE] CDR total=${cdr.length} agentFiltered=${agentCdr.length} agent="${agent}"`);
|
||||
if (cdr.length > 0 && agentCdr.length === 0) {
|
||||
const sampleIds = cdr.slice(0, 3).map((c: any) => `AgentID="${c.AgentID}" AgentName="${c.AgentName}"`);
|
||||
this.logger.warn(`[PERFORMANCE] No CDR match for agent "${agent}". Sample CDR agents: ${sampleIds.join(', ')}`);
|
||||
}
|
||||
|
||||
const totalCalls = agentCdr.length;
|
||||
const inbound = agentCdr.filter((c: any) => (c.Type ?? '').toLowerCase() === 'inbound').length;
|
||||
const outbound = agentCdr.filter((c: any) => {
|
||||
const type = (c.Type ?? '').toLowerCase();
|
||||
return type === 'manual' || type === 'progressive' || type === 'outbound';
|
||||
}).length;
|
||||
const answered = agentCdr.filter((c: any) => (c.Status ?? '').toLowerCase() === 'answered').length;
|
||||
const missed = agentCdr.filter((c: any) => (c.Status ?? '').toLowerCase() === 'notanswered').length;
|
||||
|
||||
const talkTimes = agentCdr
|
||||
.filter((c: any) => c.TalkTime && c.TalkTime !== '00:00:00')
|
||||
.map((c: any) => {
|
||||
const parts = c.TalkTime.split(':').map(Number);
|
||||
@@ -303,12 +462,12 @@ export class OzonetelAgentController {
|
||||
: 0;
|
||||
|
||||
const dispositions: Record<string, number> = {};
|
||||
for (const c of cdr) {
|
||||
for (const c of agentCdr) {
|
||||
const d = (c as any).Disposition || 'No Disposition';
|
||||
dispositions[d] = (dispositions[d] ?? 0) + 1;
|
||||
}
|
||||
|
||||
const appointmentsBooked = cdr.filter((c: any) =>
|
||||
const appointmentsBooked = agentCdr.filter((c: any) =>
|
||||
c.Disposition?.toLowerCase().includes('appointment'),
|
||||
).length;
|
||||
|
||||
@@ -319,7 +478,7 @@ export class OzonetelAgentController {
|
||||
avgHandlingTime: aht,
|
||||
conversionRate: totalCalls > 0 ? Math.round((appointmentsBooked / totalCalls) * 100) : 0,
|
||||
appointmentsBooked,
|
||||
timeUtilization: summary,
|
||||
timeUtilization,
|
||||
dispositions,
|
||||
};
|
||||
}
|
||||
@@ -328,12 +487,63 @@ export class OzonetelAgentController {
|
||||
// Campaign only has 'General Enquiry' configured currently
|
||||
const map: Record<string, string> = {
|
||||
'APPOINTMENT_BOOKED': 'General Enquiry',
|
||||
'APPOINTMENT_RESCHEDULED': 'General Enquiry',
|
||||
'APPOINTMENT_CANCELLED': 'General Enquiry',
|
||||
'FOLLOW_UP_SCHEDULED': 'General Enquiry',
|
||||
'INFO_PROVIDED': 'General Enquiry',
|
||||
'NO_ANSWER': 'General Enquiry',
|
||||
'WRONG_NUMBER': 'General Enquiry',
|
||||
'NOT_INTERESTED': 'General Enquiry',
|
||||
'CALLBACK_REQUESTED': 'General Enquiry',
|
||||
};
|
||||
return map[disposition] ?? 'General Enquiry';
|
||||
}
|
||||
|
||||
// Convert our AgentSession rollup (seconds per category) into the HH:MM:SS
|
||||
// shape the frontend expects — so My Performance gets LOGIN TIME with the
|
||||
// current open session included, not just closed sessions from Ozonetel.
|
||||
private async fetchAgentSessionTimeBreakdown(ozonetelAgentId: string, date: string): Promise<{
|
||||
totalLoginDuration: string;
|
||||
totalBusyTime: string;
|
||||
totalIdleTime: string;
|
||||
totalPauseTime: string;
|
||||
totalWrapupTime: string;
|
||||
totalDialTime: string;
|
||||
} | null> {
|
||||
try {
|
||||
const agentUuid = await this.agentLookup.resolveByOzonetelId(ozonetelAgentId);
|
||||
if (!agentUuid) return null;
|
||||
const data = await this.platform.query<any>(
|
||||
`{ agentSessions(first: 1, filter: {
|
||||
agentId: { eq: "${agentUuid}" },
|
||||
date: { eq: "${date}" }
|
||||
}) { edges { node {
|
||||
loginDurationS busyTimeS idleTimeS pauseTimeS wrapupTimeS dialTimeS
|
||||
} } } }`,
|
||||
);
|
||||
const node = data?.agentSessions?.edges?.[0]?.node;
|
||||
if (!node) return null;
|
||||
const hms = (sec: number | null | undefined): string => {
|
||||
const s = Math.max(0, Math.round(sec ?? 0));
|
||||
const h = Math.floor(s / 3600);
|
||||
const m = Math.floor((s % 3600) / 60);
|
||||
const r = s % 60;
|
||||
return `${h.toString().padStart(2, '0')}:${m.toString().padStart(2, '0')}:${r.toString().padStart(2, '0')}`;
|
||||
};
|
||||
// If the entire rollup is zero, treat as "no data yet" — fall back
|
||||
// to Ozonetel's summaryReport so the KPI isn't all zeroes.
|
||||
const total = (node.loginDurationS ?? 0) + (node.busyTimeS ?? 0) + (node.idleTimeS ?? 0) + (node.pauseTimeS ?? 0) + (node.wrapupTimeS ?? 0);
|
||||
if (total === 0) return null;
|
||||
return {
|
||||
totalLoginDuration: hms(node.loginDurationS),
|
||||
totalBusyTime: hms(node.busyTimeS),
|
||||
totalIdleTime: hms(node.idleTimeS),
|
||||
totalPauseTime: hms(node.pauseTimeS),
|
||||
totalWrapupTime: hms(node.wrapupTimeS),
|
||||
totalDialTime: hms(node.dialTimeS),
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,13 +2,15 @@ import { Module, forwardRef } from '@nestjs/common';
|
||||
import { OzonetelAgentController } from './ozonetel-agent.controller';
|
||||
import { OzonetelAgentService } from './ozonetel-agent.service';
|
||||
import { KookooIvrController } from './kookoo-ivr.controller';
|
||||
import { CdrEnrichmentService } from './cdr-enrichment.service';
|
||||
import { WorklistModule } from '../worklist/worklist.module';
|
||||
import { PlatformModule } from '../platform/platform.module';
|
||||
import { SupervisorModule } from '../supervisor/supervisor.module';
|
||||
|
||||
@Module({
|
||||
imports: [PlatformModule, forwardRef(() => WorklistModule)],
|
||||
imports: [PlatformModule, forwardRef(() => WorklistModule), forwardRef(() => SupervisorModule)],
|
||||
controllers: [OzonetelAgentController, KookooIvrController],
|
||||
providers: [OzonetelAgentService],
|
||||
exports: [OzonetelAgentService],
|
||||
providers: [OzonetelAgentService, CdrEnrichmentService],
|
||||
exports: [OzonetelAgentService, CdrEnrichmentService],
|
||||
})
|
||||
export class OzonetelAgentModule {}
|
||||
|
||||
@@ -394,6 +394,48 @@ export class OzonetelAgentService {
|
||||
}
|
||||
}
|
||||
|
||||
// Fetch a single CDR record by UCID. Preferred over fetchCDR + .find()
|
||||
// for recording lookups — Ozonetel resolves leg-pair UCIDs internally,
|
||||
// so the agent-side UCID we hold reliably returns the call row.
|
||||
// Same rate limit as fetchCDR (2 req/min, 15-day window).
|
||||
async fetchCdrByUCID(params: { date: string; ucid: string }): Promise<Record<string, any> | null> {
|
||||
const url = `https://${this.apiDomain}/ca_reports/fetchCdrByUCID`;
|
||||
this.logger.log(`Fetch CDR by UCID: ucid=${params.ucid} date=${params.date}`);
|
||||
|
||||
try {
|
||||
const token = await this.getToken();
|
||||
const body = {
|
||||
userName: this.accountId,
|
||||
fromDate: `${params.date} 00:00:00`,
|
||||
toDate: `${params.date} 23:59:59`,
|
||||
ucid: params.ucid,
|
||||
};
|
||||
|
||||
const response = await axios({
|
||||
method: 'GET',
|
||||
url,
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
data: JSON.stringify(body),
|
||||
});
|
||||
|
||||
const data = response.data;
|
||||
if (data.status === 'success' && Array.isArray(data.details) && data.details.length > 0) {
|
||||
return data.details[0];
|
||||
}
|
||||
if (data.status === 'success' && data.details && !Array.isArray(data.details)) {
|
||||
return data.details;
|
||||
}
|
||||
return null;
|
||||
} catch (error: any) {
|
||||
const responseData = error?.response?.data ? JSON.stringify(error.response.data) : '';
|
||||
this.logger.error(`Fetch CDR by UCID failed: ${error.message} ${responseData}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
async getAgentSummary(agentId: string, date: string): Promise<{
|
||||
totalLoginDuration: string;
|
||||
totalBusyTime: string;
|
||||
|
||||
70
src/platform/agent-lookup.service.ts
Normal file
70
src/platform/agent-lookup.service.ts
Normal file
@@ -0,0 +1,70 @@
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from './platform-graphql.service';
|
||||
|
||||
/**
|
||||
* Maps Ozonetel agent identifiers (unique — e.g. "ramaiahadmin",
|
||||
* "globalhealthx", "global") to the platform Agent entity UUID. Used by
|
||||
* ingest paths (webhook, dispose, CDR enrichment, backfill) so every Call
|
||||
* ends up with the correct `agent` relation regardless of how Ozonetel
|
||||
* formats the display name (AgentName collisions, transfer chains like
|
||||
* "A -> B -> C", etc.).
|
||||
*
|
||||
* The cache is case-insensitive because Ozonetel occasionally mixes
|
||||
* casing ("global" vs "Global" vs "GLOBAL") across webhook/CDR responses.
|
||||
*/
|
||||
@Injectable()
|
||||
export class AgentLookupService implements OnModuleInit {
|
||||
private readonly logger = new Logger(AgentLookupService.name);
|
||||
private readonly uuidByOzonetelId = new Map<string, string>();
|
||||
private readonly uuidByDisplayName = new Map<string, string>();
|
||||
|
||||
constructor(private readonly platform: PlatformGraphqlService) {}
|
||||
|
||||
async onModuleInit() {
|
||||
await this.refresh();
|
||||
}
|
||||
|
||||
async refresh(): Promise<void> {
|
||||
try {
|
||||
const data = await this.platform.query<any>(
|
||||
`{ agents(first: 100) { edges { node { id ozonetelAgentId ozonetelDisplayName } } } }`,
|
||||
);
|
||||
const edges = data?.agents?.edges ?? [];
|
||||
this.uuidByOzonetelId.clear();
|
||||
this.uuidByDisplayName.clear();
|
||||
for (const edge of edges) {
|
||||
const n = edge.node;
|
||||
if (n.ozonetelAgentId) {
|
||||
this.uuidByOzonetelId.set(n.ozonetelAgentId.toLowerCase(), n.id);
|
||||
}
|
||||
if (n.ozonetelDisplayName) {
|
||||
this.uuidByDisplayName.set(n.ozonetelDisplayName.toLowerCase().trim(), n.id);
|
||||
}
|
||||
}
|
||||
this.logger.log(`[AGENT-LOOKUP] Loaded ${this.uuidByOzonetelId.size} agents (${this.uuidByDisplayName.size} with display name)`);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[AGENT-LOOKUP] Refresh failed: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
async resolveByOzonetelId(ozonetelId: string | null | undefined): Promise<string | null> {
|
||||
if (!ozonetelId) return null;
|
||||
const key = ozonetelId.toLowerCase();
|
||||
const cached = this.uuidByOzonetelId.get(key);
|
||||
if (cached) return cached;
|
||||
// Cache miss — refresh once (handles late-provisioned agents)
|
||||
await this.refresh();
|
||||
return this.uuidByOzonetelId.get(key) ?? null;
|
||||
}
|
||||
|
||||
// Resolve by Ozonetel display name (e.g. "Ganesh Bandi") — used by
|
||||
// missed-call webhook backfill where only AgentName (display) is available.
|
||||
async resolveByDisplayName(displayName: string | null | undefined): Promise<string | null> {
|
||||
if (!displayName) return null;
|
||||
const key = displayName.toLowerCase().trim();
|
||||
const cached = this.uuidByDisplayName.get(key);
|
||||
if (cached) return cached;
|
||||
await this.refresh();
|
||||
return this.uuidByDisplayName.get(key) ?? null;
|
||||
}
|
||||
}
|
||||
@@ -1,8 +1,9 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from './platform-graphql.service';
|
||||
import { AgentLookupService } from './agent-lookup.service';
|
||||
|
||||
@Module({
|
||||
providers: [PlatformGraphqlService],
|
||||
exports: [PlatformGraphqlService],
|
||||
providers: [PlatformGraphqlService, AgentLookupService],
|
||||
exports: [PlatformGraphqlService, AgentLookupService],
|
||||
})
|
||||
export class PlatformModule {}
|
||||
|
||||
@@ -1,12 +1,95 @@
|
||||
// src/rules-engine/actions/escalate.action.ts
|
||||
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from '../../platform/platform-graphql.service';
|
||||
import type { ActionHandler, ActionResult } from '../types/action.types';
|
||||
import type { RuleAction } from '../types/rule.types';
|
||||
import type { RuleAction, EscalateActionParams } from '../types/rule.types';
|
||||
|
||||
/**
|
||||
* Persists a PerformanceAlert when a rule's escalate action fires.
|
||||
*
|
||||
* Dedupes by (agentId, alertType, IST date) — a single rule firing every
|
||||
* 5 min should only produce ONE alert per day per agent until dismissed.
|
||||
* If a row already exists for that key today and is not dismissed, the
|
||||
* action is a no-op (returns the existing id). If the existing row was
|
||||
* dismissed earlier today, we don't re-fire — supervisor explicitly
|
||||
* acknowledged.
|
||||
*/
|
||||
@Injectable()
|
||||
export class EscalateActionHandler implements ActionHandler {
|
||||
type = 'escalate';
|
||||
private readonly logger = new Logger(EscalateActionHandler.name);
|
||||
|
||||
async execute(_action: RuleAction, _context: Record<string, any>): Promise<ActionResult> {
|
||||
return { success: true, data: { stub: true, action: 'escalate' } };
|
||||
constructor(private readonly platform: PlatformGraphqlService) {}
|
||||
|
||||
async execute(action: RuleAction, context: Record<string, any>): Promise<ActionResult> {
|
||||
const params = action.params as EscalateActionParams & { ruleId?: string; alertType?: string };
|
||||
const agentId = context['agent.id'] as string | undefined;
|
||||
const agentName = (context['agent.name'] as string | undefined) ?? '';
|
||||
const valueRaw = context['_alertValue'];
|
||||
const valueText = valueRaw != null ? String(valueRaw) : null;
|
||||
|
||||
if (!agentId) {
|
||||
return { success: false, error: 'agent.id missing from facts' };
|
||||
}
|
||||
|
||||
const alertType = params.alertType ?? this.inferAlertType(params.message);
|
||||
const severity = (params.severity ?? 'warning').toUpperCase(); // INFO | WARNING | CRITICAL
|
||||
const today = this.todayIst();
|
||||
|
||||
// Dedupe: any non-dismissed alert today for this agent + type?
|
||||
try {
|
||||
const existing = await this.platform.query<any>(
|
||||
`{ performanceAlerts(first: 1, filter: {
|
||||
agentId: { eq: "${agentId}" },
|
||||
alertType: { eq: ${alertType} },
|
||||
firedAt: { gte: "${today}T00:00:00+05:30", lte: "${today}T23:59:59+05:30" }
|
||||
}) { edges { node { id dismissedAt value } } } }`,
|
||||
);
|
||||
const existingNode = existing?.performanceAlerts?.edges?.[0]?.node;
|
||||
if (existingNode) {
|
||||
// Already fired today. If value changed, update it; otherwise no-op.
|
||||
if (!existingNode.dismissedAt && existingNode.value !== valueText) {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: PerformanceAlertUpdateInput!) { updatePerformanceAlert(id: $id, data: $data) { id } }`,
|
||||
{ id: existingNode.id, data: { value: valueText } },
|
||||
);
|
||||
}
|
||||
return { success: true, data: { id: existingNode.id, deduped: true, agentId, alertType } };
|
||||
}
|
||||
|
||||
const created = await this.platform.query<any>(
|
||||
`mutation($data: PerformanceAlertCreateInput!) { createPerformanceAlert(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `${agentName || agentId}: ${params.message ?? alertType}${valueText ? ` (${valueText})` : ''}`,
|
||||
agentId,
|
||||
alertType,
|
||||
severity,
|
||||
message: params.message ?? alertType,
|
||||
value: valueText,
|
||||
ruleId: params.ruleId ?? null,
|
||||
firedAt: new Date().toISOString(),
|
||||
},
|
||||
},
|
||||
);
|
||||
const id = created?.createPerformanceAlert?.id;
|
||||
this.logger.log(`[ESCALATE] Created alert ${id} agent=${agentName ?? agentId} type=${alertType} value=${valueText}`);
|
||||
return { success: true, data: { id, agentId, alertType, severity, message: params.message } };
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[ESCALATE] Failed for agent=${agentId}: ${err?.message ?? err}`);
|
||||
return { success: false, error: String(err?.message ?? err) };
|
||||
}
|
||||
}
|
||||
|
||||
private inferAlertType(message: string | undefined): string {
|
||||
const m = (message ?? '').toLowerCase();
|
||||
if (m.includes('idle')) return 'EXCESSIVE_IDLE';
|
||||
if (m.includes('nps')) return 'LOW_NPS';
|
||||
if (m.includes('conversion')) return 'LOW_CONVERSION';
|
||||
return 'OTHER';
|
||||
}
|
||||
|
||||
private todayIst(): string {
|
||||
const ist = new Date(Date.now() + 5.5 * 60 * 60 * 1000);
|
||||
return ist.toISOString().slice(0, 10);
|
||||
}
|
||||
}
|
||||
|
||||
114
src/rules-engine/consumers/performance.consumer.ts
Normal file
114
src/rules-engine/consumers/performance.consumer.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { RulesEngineService } from '../rules-engine.service';
|
||||
import { RulesStorageService } from '../rules-storage.service';
|
||||
import { PerformanceFactsProvider } from '../facts/performance-facts.provider';
|
||||
import { PlatformGraphqlService } from '../../platform/platform-graphql.service';
|
||||
|
||||
const TICK_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
|
||||
const KICKOFF_DELAY_MS = 90_000; // wait for boot to settle
|
||||
|
||||
/**
|
||||
* Evaluates `on_schedule` performance rules every 5 minutes for every
|
||||
* platform Agent. Facts come from PerformanceFactsProvider; matching
|
||||
* rules dispatch the escalate action which persists a PerformanceAlert.
|
||||
*
|
||||
* Skips quietly when no scheduled performance rules are configured.
|
||||
*/
|
||||
@Injectable()
|
||||
export class PerformanceConsumer implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(PerformanceConsumer.name);
|
||||
private timer: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor(
|
||||
private readonly engine: RulesEngineService,
|
||||
private readonly storage: RulesStorageService,
|
||||
private readonly facts: PerformanceFactsProvider,
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
) {}
|
||||
|
||||
onModuleInit() {
|
||||
setTimeout(() => {
|
||||
this.runOnce().catch((err) => {
|
||||
this.logger.warn(`[PERF-CONSUMER] First run failed: ${err?.message ?? err}`);
|
||||
});
|
||||
}, KICKOFF_DELAY_MS);
|
||||
|
||||
this.timer = setInterval(() => {
|
||||
this.runOnce().catch((err) => {
|
||||
this.logger.warn(`[PERF-CONSUMER] Tick failed: ${err?.message ?? err}`);
|
||||
});
|
||||
}, TICK_INTERVAL_MS);
|
||||
}
|
||||
|
||||
onModuleDestroy() {
|
||||
if (this.timer) clearInterval(this.timer);
|
||||
}
|
||||
|
||||
async runOnce(): Promise<{ agentsScanned: number; alertsFired: number }> {
|
||||
// Storage.getByTrigger doesn't sub-discriminate on_schedule rules, so
|
||||
// filter to only those that reference agent.* facts in their conditions.
|
||||
// Anything else (e.g. SLA-breach rules over call.* facts) belongs to
|
||||
// other consumers.
|
||||
const allScheduled = await this.storage.getByTrigger('on_schedule');
|
||||
const rules = allScheduled.filter((r) => this.referencesAgentFacts(r.conditions));
|
||||
if (rules.length === 0) {
|
||||
this.logger.debug('[PERF-CONSUMER] No agent-fact on_schedule rules — skipping');
|
||||
return { agentsScanned: 0, alertsFired: 0 };
|
||||
}
|
||||
|
||||
const agents = await this.fetchAgents();
|
||||
if (agents.length === 0) return { agentsScanned: 0, alertsFired: 0 };
|
||||
|
||||
let alertsFired = 0;
|
||||
for (const agent of agents) {
|
||||
try {
|
||||
const factContext = await this.facts.resolveFacts({ agentId: agent.id, agentName: agent.name });
|
||||
|
||||
// Each rule's escalate action needs to know which fact value
|
||||
// to surface as the alert's value (e.g. "65m" for idle).
|
||||
// Inject _alertValue per-rule below.
|
||||
for (const rule of rules) {
|
||||
const ruleFacts = { ...factContext };
|
||||
const valueFact = (rule.action.params as any)?.valueFact as string | undefined;
|
||||
if (valueFact && ruleFacts[valueFact] != null) {
|
||||
ruleFacts['_alertValue'] = ruleFacts[valueFact];
|
||||
}
|
||||
const result = await this.engine.evaluate('on_schedule', 'performance', ruleFacts);
|
||||
alertsFired += result.results.filter((r: any) => r.success && !r.data?.deduped).length;
|
||||
}
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[PERF-CONSUMER] Eval failed for agent=${agent.id}: ${err?.message ?? err}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (alertsFired > 0) {
|
||||
this.logger.log(`[PERF-CONSUMER] Tick complete — agents=${agents.length} alertsFired=${alertsFired}`);
|
||||
}
|
||||
return { agentsScanned: agents.length, alertsFired };
|
||||
}
|
||||
|
||||
private referencesAgentFacts(group: any): boolean {
|
||||
if (!group) return false;
|
||||
const items = group.all ?? group.any ?? [];
|
||||
for (const item of items) {
|
||||
if (item.all || item.any) {
|
||||
if (this.referencesAgentFacts(item)) return true;
|
||||
} else if (typeof item.fact === 'string' && item.fact.startsWith('agent.')) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private async fetchAgents(): Promise<Array<{ id: string; name: string }>> {
|
||||
try {
|
||||
const data = await this.platform.query<any>(
|
||||
`{ agents(first: 100) { edges { node { id name } } } }`,
|
||||
);
|
||||
return (data?.agents?.edges ?? []).map((e: any) => e.node);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[PERF-CONSUMER] Agent fetch failed: ${err}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -18,10 +18,10 @@ export class CallFactsProvider implements FactProvider {
|
||||
'call.status': call.callStatus ?? null,
|
||||
'call.disposition': call.disposition ?? null,
|
||||
'call.durationSeconds': call.durationSeconds ?? call.durationSec ?? 0,
|
||||
'call.callbackStatus': call.callbackstatus ?? call.callbackStatus ?? null,
|
||||
'call.callbackStatus': call.callbackStatus ?? call.callbackStatus ?? null,
|
||||
'call.slaElapsedPercent': slaElapsedPercent,
|
||||
'call.slaBreached': slaElapsedPercent > 100,
|
||||
'call.missedCount': call.missedcallcount ?? call.missedCount ?? 0,
|
||||
'call.missedCount': call.missedCallCount ?? call.missedCount ?? 0,
|
||||
'call.taskType': taskType,
|
||||
};
|
||||
}
|
||||
|
||||
93
src/rules-engine/facts/performance-facts.provider.ts
Normal file
93
src/rules-engine/facts/performance-facts.provider.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from '../../platform/platform-graphql.service';
|
||||
import type { FactProvider, FactValue } from '../types/fact.types';
|
||||
|
||||
/**
|
||||
* Resolves per-agent performance facts for the rules engine.
|
||||
* Used by the PerformanceConsumer to evaluate alert rules every 5 min.
|
||||
*
|
||||
* Facts exposed:
|
||||
* - agent.idleMinutes — from today's AgentSession.idleTimeS
|
||||
* - agent.busyMinutes — from AgentSession.busyTimeS
|
||||
* - agent.totalCallsToday — count of Calls started today
|
||||
* - agent.bookedCallsToday — count of Calls today with disposition=APPOINTMENT_BOOKED
|
||||
* - agent.conversionPercent — bookedCallsToday / totalCallsToday × 100
|
||||
* - agent.id, agent.name — for routing alerts back to the right agent
|
||||
*
|
||||
* NPS deferred — no source signal exists yet.
|
||||
*/
|
||||
@Injectable()
|
||||
export class PerformanceFactsProvider implements FactProvider {
|
||||
name = 'performance';
|
||||
private readonly logger = new Logger(PerformanceFactsProvider.name);
|
||||
|
||||
constructor(private readonly platform: PlatformGraphqlService) {}
|
||||
|
||||
/**
|
||||
* @param entityData { agentId: string, agentName?: string }
|
||||
*/
|
||||
async resolveFacts(entityData: { agentId: string; agentName?: string }): Promise<Record<string, FactValue>> {
|
||||
const agentId = entityData.agentId;
|
||||
const today = this.todayIst();
|
||||
|
||||
const session = await this.fetchTodaySession(agentId, today);
|
||||
const callTotals = await this.fetchTodayCallTotals(agentId, today);
|
||||
|
||||
const idleMinutes = Math.round((session?.idleTimeS ?? 0) / 60);
|
||||
const busyMinutes = Math.round((session?.busyTimeS ?? 0) / 60);
|
||||
const conversionPercent = callTotals.total > 0
|
||||
? Math.round((callTotals.booked / callTotals.total) * 100)
|
||||
: 0;
|
||||
|
||||
return {
|
||||
'agent.id': agentId,
|
||||
'agent.name': entityData.agentName ?? '',
|
||||
'agent.idleMinutes': idleMinutes,
|
||||
'agent.busyMinutes': busyMinutes,
|
||||
'agent.totalCallsToday': callTotals.total,
|
||||
'agent.bookedCallsToday': callTotals.booked,
|
||||
'agent.conversionPercent': conversionPercent,
|
||||
};
|
||||
}
|
||||
|
||||
private todayIst(): string {
|
||||
const ist = new Date(Date.now() + 5.5 * 60 * 60 * 1000);
|
||||
return ist.toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
private async fetchTodaySession(agentId: string, date: string): Promise<{ idleTimeS: number; busyTimeS: number } | null> {
|
||||
try {
|
||||
const data = await this.platform.query<any>(
|
||||
`{ agentSessions(first: 1, filter: { agentId: { eq: "${agentId}" }, date: { eq: "${date}" } }) {
|
||||
edges { node { idleTimeS busyTimeS } }
|
||||
} }`,
|
||||
);
|
||||
const node = data?.agentSessions?.edges?.[0]?.node;
|
||||
if (!node) return null;
|
||||
return { idleTimeS: node.idleTimeS ?? 0, busyTimeS: node.busyTimeS ?? 0 };
|
||||
} catch (err) {
|
||||
this.logger.warn(`[PERF-FACTS] Session fetch failed for agent=${agentId}: ${err}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchTodayCallTotals(agentId: string, date: string): Promise<{ total: number; booked: number }> {
|
||||
const gte = `${date}T00:00:00+05:30`;
|
||||
const lte = `${date}T23:59:59+05:30`;
|
||||
try {
|
||||
const data = await this.platform.query<any>(
|
||||
`{ calls(first: 200, filter: {
|
||||
agentId: { eq: "${agentId}" },
|
||||
startedAt: { gte: "${gte}", lte: "${lte}" }
|
||||
}) { edges { node { disposition } } } }`,
|
||||
);
|
||||
const edges = data?.calls?.edges ?? [];
|
||||
const total = edges.length;
|
||||
const booked = edges.filter((e: any) => e.node.disposition === 'APPOINTMENT_BOOKED').length;
|
||||
return { total, booked };
|
||||
} catch (err) {
|
||||
this.logger.warn(`[PERF-FACTS] Call totals fetch failed for agent=${agentId}: ${err}`);
|
||||
return { total: 0, booked: 0 };
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,14 +1,26 @@
|
||||
// src/rules-engine/rules-engine.module.ts
|
||||
|
||||
import { Module } from '@nestjs/common';
|
||||
import { PlatformModule } from '../platform/platform.module';
|
||||
import { RulesEngineController } from './rules-engine.controller';
|
||||
import { RulesEngineService } from './rules-engine.service';
|
||||
import { RulesStorageService } from './rules-storage.service';
|
||||
import { WorklistConsumer } from './consumers/worklist.consumer';
|
||||
import { PerformanceConsumer } from './consumers/performance.consumer';
|
||||
import { EscalateActionHandler } from './actions/escalate.action';
|
||||
import { PerformanceFactsProvider } from './facts/performance-facts.provider';
|
||||
|
||||
@Module({
|
||||
imports: [PlatformModule],
|
||||
controllers: [RulesEngineController],
|
||||
providers: [RulesEngineService, RulesStorageService, WorklistConsumer],
|
||||
exports: [RulesEngineService, RulesStorageService, WorklistConsumer],
|
||||
providers: [
|
||||
RulesEngineService,
|
||||
RulesStorageService,
|
||||
WorklistConsumer,
|
||||
PerformanceConsumer,
|
||||
EscalateActionHandler,
|
||||
PerformanceFactsProvider,
|
||||
],
|
||||
exports: [RulesEngineService, RulesStorageService, WorklistConsumer, PerformanceConsumer],
|
||||
})
|
||||
export class RulesEngineModule {}
|
||||
|
||||
@@ -20,11 +20,14 @@ export class RulesEngineService {
|
||||
private readonly agentFacts = new AgentFactsProvider();
|
||||
private readonly actionHandlers: Map<string, ActionHandler>;
|
||||
|
||||
constructor(private readonly storage: RulesStorageService) {
|
||||
constructor(
|
||||
private readonly storage: RulesStorageService,
|
||||
private readonly escalateHandler: EscalateActionHandler,
|
||||
) {
|
||||
this.actionHandlers = new Map([
|
||||
['score', new ScoreActionHandler()],
|
||||
['assign', new AssignActionHandler()],
|
||||
['escalate', new EscalateActionHandler()],
|
||||
['escalate', this.escalateHandler],
|
||||
]);
|
||||
}
|
||||
|
||||
|
||||
@@ -84,6 +84,51 @@
|
||||
"trigger": { "type": "on_schedule", "interval": "5m" },
|
||||
"conditions": { "all": [{ "fact": "call.slaBreached", "operator": "equal", "value": true }, { "fact": "call.callbackStatus", "operator": "equal", "value": "PENDING_CALLBACK" }] },
|
||||
"action": { "type": "escalate", "params": { "channel": "notification", "recipients": "supervisor", "message": "SLA breached — no callback attempted", "severity": "critical" } }
|
||||
},
|
||||
{
|
||||
"ruleType": "automation",
|
||||
"name": "Excessive idle time",
|
||||
"description": "Agent has been idle for more than the configured threshold today",
|
||||
"enabled": true,
|
||||
"priority": 2,
|
||||
"trigger": { "type": "on_schedule", "interval": "5m" },
|
||||
"conditions": { "all": [{ "fact": "agent.idleMinutes", "operator": "greaterThan", "value": 60 }] },
|
||||
"action": {
|
||||
"type": "escalate",
|
||||
"params": {
|
||||
"channel": "notification",
|
||||
"recipients": "supervisor",
|
||||
"message": "Excessive Idle Time",
|
||||
"severity": "warning",
|
||||
"alertType": "EXCESSIVE_IDLE",
|
||||
"valueFact": "agent.idleMinutes"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"ruleType": "automation",
|
||||
"name": "Low conversion rate",
|
||||
"description": "Agent's conversion (booked/total) is below the workspace floor",
|
||||
"enabled": true,
|
||||
"priority": 3,
|
||||
"trigger": { "type": "on_schedule", "interval": "5m" },
|
||||
"conditions": {
|
||||
"all": [
|
||||
{ "fact": "agent.conversionPercent", "operator": "lessThan", "value": 15 },
|
||||
{ "fact": "agent.totalCallsToday", "operator": "greaterThan", "value": 10 }
|
||||
]
|
||||
},
|
||||
"action": {
|
||||
"type": "escalate",
|
||||
"params": {
|
||||
"channel": "notification",
|
||||
"recipients": "supervisor",
|
||||
"message": "Low Conversion",
|
||||
"severity": "warning",
|
||||
"alertType": "LOW_CONVERSION",
|
||||
"valueFact": "agent.conversionPercent"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
381
src/supervisor/agent-history.service.ts
Normal file
381
src/supervisor/agent-history.service.ts
Normal file
@@ -0,0 +1,381 @@
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
|
||||
// AgentEvent enum values (mirror of the SDK app's agent-event.object.ts).
|
||||
// Ozonetel webhook actions → Helix event types.
|
||||
export type AgentEventType =
|
||||
| 'LOGIN'
|
||||
| 'LOGOUT'
|
||||
| 'READY'
|
||||
| 'PAUSE'
|
||||
| 'RESUME'
|
||||
| 'CALL_START'
|
||||
| 'CALL_END'
|
||||
| 'ACW_START'
|
||||
| 'ACW_END';
|
||||
|
||||
// Separate pending slots per event category. Call + ACW overlap (agent
|
||||
// enters ACW before the CALL_END arrives), so a single shared slot would
|
||||
// let ACW_START clobber pending CALL_START and produce 0-second call
|
||||
// durations. Keep one slot per category so each END event pairs cleanly.
|
||||
type PendingSlot = 'pause' | 'call' | 'acw';
|
||||
type PendingStarts = {
|
||||
pause?: number; // PAUSE eventAt ms
|
||||
call?: number; // CALL_START eventAt ms
|
||||
acw?: number; // ACW_START eventAt ms
|
||||
};
|
||||
|
||||
/**
|
||||
* Persists agent activity and per-call timing into the platform entities
|
||||
* we added in Phase 1 (AgentEvent, Call SLA fields). Reads AgentSession
|
||||
* later via the rollup job.
|
||||
*
|
||||
* Called from:
|
||||
* - supervisor.service.handleAgentEvent → persistAgentEvent()
|
||||
* - supervisor.service.handleCallEvent → patchCallTiming()
|
||||
* - ozonetel-agent.controller dispose flow → patchCallTiming()
|
||||
*/
|
||||
@Injectable()
|
||||
export class AgentHistoryService implements OnModuleInit {
|
||||
private readonly logger = new Logger(AgentHistoryService.name);
|
||||
|
||||
// ozonetelAgentId → Agent entity UUID. Loaded at startup.
|
||||
private readonly agentUuidByOzonetelId = new Map<string, string>();
|
||||
|
||||
// agentId → map of pending start events per category, used to compute
|
||||
// durationSec on the matching END event.
|
||||
private readonly pendingStartsByAgent = new Map<string, PendingStarts>();
|
||||
|
||||
constructor(private readonly platform: PlatformGraphqlService) {}
|
||||
|
||||
private rollupTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
async onModuleInit() {
|
||||
await this.refreshAgentCache();
|
||||
// Roll up today's sessions every 15 minutes. Rollup is idempotent
|
||||
// (upsert by agent+date), so missing a tick is safe — the next tick
|
||||
// recomputes from AgentEvent history. Written with setInterval because
|
||||
// @nestjs/schedule isn't installed in this sidecar.
|
||||
this.rollupTimer = setInterval(() => {
|
||||
this.rollupSessions(this.currentSessionDate()).catch((err) => {
|
||||
this.logger.warn(`[HISTORY] Rollup tick failed: ${err?.message ?? err}`);
|
||||
});
|
||||
}, 15 * 60 * 1000);
|
||||
// Kick off one immediately so the dashboard has data on boot.
|
||||
this.rollupSessions(this.currentSessionDate()).catch(() => {});
|
||||
}
|
||||
|
||||
onModuleDestroy() {
|
||||
if (this.rollupTimer) clearInterval(this.rollupTimer);
|
||||
}
|
||||
|
||||
// IST day boundary — agents work in IST, so the rollup is by IST date.
|
||||
private currentSessionDate(): string {
|
||||
const now = new Date();
|
||||
const ist = new Date(now.getTime() + 5.5 * 60 * 60 * 1000);
|
||||
return ist.toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
private async refreshAgentCache(): Promise<void> {
|
||||
try {
|
||||
const data = await this.platform.query<any>(
|
||||
`{ agents(first: 50) { edges { node { id ozonetelAgentId } } } }`,
|
||||
);
|
||||
const edges = data?.agents?.edges ?? [];
|
||||
this.agentUuidByOzonetelId.clear();
|
||||
for (const edge of edges) {
|
||||
const n = edge.node;
|
||||
if (n.ozonetelAgentId) {
|
||||
this.agentUuidByOzonetelId.set(n.ozonetelAgentId, n.id);
|
||||
}
|
||||
}
|
||||
this.logger.log(`[HISTORY] Loaded ${this.agentUuidByOzonetelId.size} agent UUIDs into cache`);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[HISTORY] Failed to refresh agent cache: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async resolveAgentUuid(ozonetelAgentId: string): Promise<string | null> {
|
||||
if (!ozonetelAgentId) return null;
|
||||
const cached = this.agentUuidByOzonetelId.get(ozonetelAgentId);
|
||||
if (cached) return cached;
|
||||
// Cache miss — refresh once (handles late-provisioned agents like Ganesh)
|
||||
await this.refreshAgentCache();
|
||||
return this.agentUuidByOzonetelId.get(ozonetelAgentId) ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Record an agent activity event. Computes durationSec for END events
|
||||
* (RESUME, CALL_END, ACW_END) by pairing against the most recent START.
|
||||
* Non-fatal on failure — realtime SSE flow continues even if the
|
||||
* platform write errors.
|
||||
*/
|
||||
async persistAgentEvent(params: {
|
||||
ozonetelAgentId: string;
|
||||
eventType: AgentEventType;
|
||||
eventAt: string; // ISO
|
||||
pauseReason?: string | null;
|
||||
callId?: string | null;
|
||||
}): Promise<void> {
|
||||
const agentUuid = await this.resolveAgentUuid(params.ozonetelAgentId);
|
||||
if (!agentUuid) {
|
||||
this.logger.warn(`[HISTORY] No Agent entity for ozonetelAgentId=${params.ozonetelAgentId} — skipping event persist`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Pair START → END events by category. CALL and ACW can overlap
|
||||
// (agent enters ACW before CALL_END arrives), so each lives in its
|
||||
// own slot. READY is a fallback close — supervisor.service already
|
||||
// maps 'release'/'IDLE' to RESUME / ACW_END when it knows the prior
|
||||
// state; READY only fires when that disambiguation failed, so it
|
||||
// clears anything dangling.
|
||||
let durationSec: number | null = null;
|
||||
const endSlot = this.slotForEnd(params.eventType);
|
||||
const startSlot = this.slotForStart(params.eventType);
|
||||
const eventMs = new Date(params.eventAt).getTime();
|
||||
|
||||
if (endSlot) {
|
||||
const pending = this.pendingStartsByAgent.get(params.ozonetelAgentId);
|
||||
const at = pending?.[endSlot];
|
||||
if (at !== undefined) {
|
||||
durationSec = Math.max(0, Math.round((eventMs - at) / 1000));
|
||||
delete pending![endSlot];
|
||||
if (!pending!.pause && !pending!.call && !pending!.acw) {
|
||||
this.pendingStartsByAgent.delete(params.ozonetelAgentId);
|
||||
}
|
||||
}
|
||||
} else if (startSlot) {
|
||||
const existing = this.pendingStartsByAgent.get(params.ozonetelAgentId) ?? {};
|
||||
existing[startSlot] = eventMs;
|
||||
this.pendingStartsByAgent.set(params.ozonetelAgentId, existing);
|
||||
} else if (params.eventType === 'READY' || params.eventType === 'LOGOUT') {
|
||||
// Defensive flush of any lingering slots on session boundaries.
|
||||
this.pendingStartsByAgent.delete(params.ozonetelAgentId);
|
||||
}
|
||||
|
||||
const data: Record<string, any> = {
|
||||
name: `${params.ozonetelAgentId} ${params.eventType}`,
|
||||
eventType: params.eventType,
|
||||
eventAt: params.eventAt,
|
||||
source: 'OZONETEL_SUBSCRIPTION',
|
||||
agentId: agentUuid,
|
||||
};
|
||||
if (params.pauseReason) data.pauseReason = params.pauseReason;
|
||||
if (durationSec !== null) data.durationS = durationSec;
|
||||
if (params.callId) data.callId = params.callId;
|
||||
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($data: AgentEventCreateInput!) { createAgentEvent(data: $data) { id } }`,
|
||||
{ data },
|
||||
);
|
||||
} catch (err: any) {
|
||||
if (this.isEntityMissingError(err)) {
|
||||
if (!this.warnedEntityMissing) {
|
||||
this.logger.warn('[HISTORY] AgentEvent entity not synced on this workspace — skipping persistence');
|
||||
this.warnedEntityMissing = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
this.logger.warn(`[HISTORY] createAgentEvent failed: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
private warnedEntityMissing = false;
|
||||
|
||||
private isEntityMissingError(err: unknown): boolean {
|
||||
const msg = String((err as any)?.message ?? err ?? '');
|
||||
return msg.includes('Cannot query field') || msg.includes('Unknown type')
|
||||
|| msg.includes('AgentEventCreateInput') || msg.includes('AgentSessionCreateInput');
|
||||
}
|
||||
|
||||
private slotForStart(eventType: AgentEventType): PendingSlot | null {
|
||||
if (eventType === 'PAUSE') return 'pause';
|
||||
if (eventType === 'CALL_START') return 'call';
|
||||
if (eventType === 'ACW_START') return 'acw';
|
||||
return null;
|
||||
}
|
||||
|
||||
private slotForEnd(eventType: AgentEventType): PendingSlot | null {
|
||||
if (eventType === 'RESUME') return 'pause';
|
||||
if (eventType === 'CALL_END') return 'call';
|
||||
if (eventType === 'ACW_END') return 'acw';
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Patch a Call record with SLA / timing fields derived from Ozonetel
|
||||
* webhooks or post-call CDR. All fields optional — caller passes only
|
||||
* what it has. Used for response-time and ACW histograms on the
|
||||
* supervisor dashboard.
|
||||
*/
|
||||
async patchCallTiming(callId: string, fields: {
|
||||
assignedAt?: string;
|
||||
answeredAt?: string;
|
||||
responseTimeSec?: number;
|
||||
handlingTimeSec?: number;
|
||||
acwDurationSec?: number;
|
||||
holdDurationSec?: number;
|
||||
}): Promise<void> {
|
||||
// Platform truncates `*Sec` → `*S` on field names.
|
||||
const fieldNameMap: Record<string, string> = {
|
||||
responseTimeSec: 'responseTimeS',
|
||||
handlingTimeSec: 'handlingTimeS',
|
||||
acwDurationSec: 'acwDurationS',
|
||||
holdDurationSec: 'holdDurationS',
|
||||
};
|
||||
const data: Record<string, any> = {};
|
||||
for (const [k, v] of Object.entries(fields)) {
|
||||
if (v !== undefined && v !== null) {
|
||||
data[fieldNameMap[k] ?? k] = v;
|
||||
}
|
||||
}
|
||||
if (Object.keys(data).length === 0) return;
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: CallUpdateInput!) { updateCall(id: $id, data: $data) { id } }`,
|
||||
{ id: callId, data },
|
||||
);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[HISTORY] updateCall timing failed (${callId}): ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate AgentEvent rows into an AgentSession row per agent for the
|
||||
* given IST date. Called on a 15-minute interval; upserts by (agent,
|
||||
* sessionDate) so re-runs are safe.
|
||||
*/
|
||||
async rollupSessions(sessionDate: string): Promise<void> {
|
||||
if (this.agentUuidByOzonetelId.size === 0) await this.refreshAgentCache();
|
||||
const agentUuids = Array.from(new Set(this.agentUuidByOzonetelId.values()));
|
||||
if (agentUuids.length === 0) return;
|
||||
|
||||
const startIso = `${sessionDate}T00:00:00+05:30`;
|
||||
const endIso = `${sessionDate}T23:59:59+05:30`;
|
||||
|
||||
let succeeded = 0;
|
||||
for (const agentUuid of agentUuids) {
|
||||
try {
|
||||
const events = await this.fetchAgentEvents(agentUuid, startIso, endIso);
|
||||
const totals = this.aggregateEvents(events);
|
||||
await this.upsertSession(agentUuid, sessionDate, totals);
|
||||
succeeded++;
|
||||
} catch (err: any) {
|
||||
if (this.isEntityMissingError(err)) {
|
||||
if (!this.warnedEntityMissing) {
|
||||
this.logger.warn('[HISTORY] AgentEvent/AgentSession entities not synced on this workspace — skipping rollup');
|
||||
this.warnedEntityMissing = true;
|
||||
}
|
||||
return;
|
||||
}
|
||||
this.logger.warn(`[HISTORY] Rollup failed for agent ${agentUuid}: ${err?.message ?? err}`);
|
||||
}
|
||||
}
|
||||
this.logger.log(`[HISTORY] Rollup complete for ${sessionDate} — ${succeeded}/${agentUuids.length} agents`);
|
||||
}
|
||||
|
||||
// Platform strips the `Sec` suffix on numeric field names — schema uses
|
||||
// `durationS`, `loginDurationS`, etc. Map back to our canonical names
|
||||
// when reading.
|
||||
private async fetchAgentEvents(agentUuid: string, startIso: string, endIso: string): Promise<Array<{ eventType: AgentEventType; durationSec: number | null; eventAt: string }>> {
|
||||
const events: Array<{ eventType: AgentEventType; durationSec: number | null; eventAt: string }> = [];
|
||||
let after: string | null = null;
|
||||
for (let page = 0; page < 20; page++) {
|
||||
const cursorArg: string = after ? `, after: "${after}"` : '';
|
||||
const data: any = await this.platform.query<any>(
|
||||
`{ agentEvents(first: 200${cursorArg}, filter: { agentId: { eq: "${agentUuid}" }, eventAt: { gte: "${startIso}", lte: "${endIso}" } }, orderBy: [{ eventAt: AscNullsLast }]) {
|
||||
edges { node { eventType eventAt durationS } }
|
||||
pageInfo { hasNextPage endCursor }
|
||||
} }`,
|
||||
);
|
||||
const edges = data?.agentEvents?.edges ?? [];
|
||||
for (const e of edges) {
|
||||
events.push({
|
||||
eventType: e.node.eventType,
|
||||
eventAt: e.node.eventAt,
|
||||
durationSec: e.node.durationS ?? null,
|
||||
});
|
||||
}
|
||||
const pageInfo: { hasNextPage?: boolean; endCursor?: string } = data?.agentEvents?.pageInfo ?? {};
|
||||
if (!pageInfo.hasNextPage) break;
|
||||
after = pageInfo.endCursor ?? null;
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
private aggregateEvents(events: Array<{ eventType: AgentEventType; durationSec: number | null; eventAt: string }>) {
|
||||
let busyTimeSec = 0;
|
||||
let pauseTimeSec = 0;
|
||||
let wrapupTimeSec = 0;
|
||||
let handlingSum = 0;
|
||||
let handlingCount = 0;
|
||||
|
||||
// Login duration: sum each LOGIN → (next LOGOUT on same day | now) span.
|
||||
// Ozonetel doesn't emit a LOGOUT if the agent just closes the tab, so
|
||||
// cap open sessions at the end of the rollup day.
|
||||
let loginDurationSec = 0;
|
||||
let openLoginAt: number | null = null;
|
||||
|
||||
for (const e of events) {
|
||||
if (e.eventType === 'LOGIN') {
|
||||
openLoginAt = new Date(e.eventAt).getTime();
|
||||
} else if (e.eventType === 'LOGOUT' && openLoginAt !== null) {
|
||||
loginDurationSec += Math.max(0, Math.round((new Date(e.eventAt).getTime() - openLoginAt) / 1000));
|
||||
openLoginAt = null;
|
||||
} else if (e.eventType === 'CALL_END' && e.durationSec) {
|
||||
busyTimeSec += e.durationSec;
|
||||
handlingSum += e.durationSec;
|
||||
handlingCount++;
|
||||
} else if (e.eventType === 'RESUME' && e.durationSec) {
|
||||
pauseTimeSec += e.durationSec;
|
||||
} else if (e.eventType === 'ACW_END' && e.durationSec) {
|
||||
wrapupTimeSec += e.durationSec;
|
||||
}
|
||||
}
|
||||
if (openLoginAt !== null) {
|
||||
// Still logged in — count up to now (capped to the rollup day end).
|
||||
loginDurationSec += Math.max(0, Math.round((Date.now() - openLoginAt) / 1000));
|
||||
}
|
||||
|
||||
const avgHandlingTimeSec = handlingCount > 0 ? Math.round(handlingSum / handlingCount) : null;
|
||||
const idleTimeSec = Math.max(0, loginDurationSec - busyTimeSec - pauseTimeSec - wrapupTimeSec);
|
||||
|
||||
return { loginDurationSec, busyTimeSec, pauseTimeSec, wrapupTimeSec, idleTimeSec, avgHandlingTimeSec };
|
||||
}
|
||||
|
||||
// AgentSession fields map: our `*Sec` → platform `*S`, `sessionDate` → `date`.
|
||||
private async upsertSession(
|
||||
agentUuid: string,
|
||||
sessionDate: string,
|
||||
totals: { loginDurationSec: number; busyTimeSec: number; pauseTimeSec: number; wrapupTimeSec: number; idleTimeSec: number; avgHandlingTimeSec: number | null },
|
||||
): Promise<void> {
|
||||
const existing = await this.platform.query<any>(
|
||||
`{ agentSessions(first: 1, filter: { agentId: { eq: "${agentUuid}" }, date: { eq: "${sessionDate}" } }) { edges { node { id } } } }`,
|
||||
);
|
||||
const existingId = existing?.agentSessions?.edges?.[0]?.node?.id;
|
||||
|
||||
const data: Record<string, any> = {
|
||||
loginDurationS: totals.loginDurationSec,
|
||||
busyTimeS: totals.busyTimeSec,
|
||||
pauseTimeS: totals.pauseTimeSec,
|
||||
wrapupTimeS: totals.wrapupTimeSec,
|
||||
idleTimeS: totals.idleTimeSec,
|
||||
source: 'COMPUTED',
|
||||
lastSyncedAt: new Date().toISOString(),
|
||||
};
|
||||
if (totals.avgHandlingTimeSec !== null) data.avgHandlingTimeS = totals.avgHandlingTimeSec;
|
||||
|
||||
if (existingId) {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: AgentSessionUpdateInput!) { updateAgentSession(id: $id, data: $data) { id } }`,
|
||||
{ id: existingId, data },
|
||||
);
|
||||
} else {
|
||||
await this.platform.query<any>(
|
||||
`mutation($data: AgentSessionCreateInput!) { createAgentSession(data: $data) { id } }`,
|
||||
{ data: { ...data, name: `Session ${sessionDate}`, agentId: agentUuid, date: sessionDate } },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
91
src/supervisor/performance-alerts.controller.ts
Normal file
91
src/supervisor/performance-alerts.controller.ts
Normal file
@@ -0,0 +1,91 @@
|
||||
import { Controller, Get, Post, Param, Logger } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
|
||||
/**
|
||||
* Read API for the supervisor notification bell. Returns active (non-
|
||||
* dismissed) PerformanceAlert rows the rules engine has emitted.
|
||||
*
|
||||
* Frontend polls every 60s. Dismiss is per-alert.
|
||||
*/
|
||||
@Controller('api/supervisor/performance-alerts')
|
||||
export class PerformanceAlertsController {
|
||||
private readonly logger = new Logger(PerformanceAlertsController.name);
|
||||
|
||||
constructor(private readonly platform: PlatformGraphqlService) {}
|
||||
|
||||
@Get()
|
||||
async list() {
|
||||
const data = await this.platform.query<any>(
|
||||
`{ performanceAlerts(
|
||||
first: 50,
|
||||
filter: { dismissedAt: { is: NULL } },
|
||||
orderBy: [{ firedAt: DescNullsLast }]
|
||||
) {
|
||||
edges { node {
|
||||
id alertType severity message value ruleId firedAt
|
||||
agent { id name }
|
||||
} }
|
||||
} }`,
|
||||
);
|
||||
const edges = data?.performanceAlerts?.edges ?? [];
|
||||
return {
|
||||
alerts: edges.map((e: any) => {
|
||||
const n = e.node;
|
||||
return {
|
||||
id: n.id,
|
||||
agent: n.agent?.name ?? 'Unknown',
|
||||
agentId: n.agent?.id ?? null,
|
||||
type: this.toLabel(n.alertType),
|
||||
severity: (n.severity ?? 'WARNING').toLowerCase(),
|
||||
value: n.value ?? '',
|
||||
message: n.message,
|
||||
firedAt: n.firedAt,
|
||||
ruleId: n.ruleId,
|
||||
};
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
@Post(':id/dismiss')
|
||||
async dismiss(@Param('id') id: string) {
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: PerformanceAlertUpdateInput!) { updatePerformanceAlert(id: $id, data: $data) { id } }`,
|
||||
{ id, data: { dismissedAt: new Date().toISOString() } },
|
||||
);
|
||||
return { status: 'ok' };
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[ALERTS] Dismiss failed for ${id}: ${err?.message ?? err}`);
|
||||
return { status: 'error', message: String(err?.message ?? err) };
|
||||
}
|
||||
}
|
||||
|
||||
private toLabel(alertType: string | null | undefined): string {
|
||||
switch (alertType) {
|
||||
case 'EXCESSIVE_IDLE': return 'Excessive Idle Time';
|
||||
case 'LOW_NPS': return 'Low NPS';
|
||||
case 'LOW_CONVERSION': return 'Low Conversion';
|
||||
default: return alertType ?? 'Alert';
|
||||
}
|
||||
}
|
||||
|
||||
@Post('dismiss-all')
|
||||
async dismissAll() {
|
||||
const now = new Date().toISOString();
|
||||
const data = await this.platform.query<any>(
|
||||
`{ performanceAlerts(first: 100, filter: { dismissedAt: { is: NULL } }) { edges { node { id } } } }`,
|
||||
);
|
||||
const ids = (data?.performanceAlerts?.edges ?? []).map((e: any) => e.node.id);
|
||||
let dismissed = 0;
|
||||
for (const id of ids) {
|
||||
try {
|
||||
await this.platform.query<any>(
|
||||
`mutation($id: UUID!, $data: PerformanceAlertUpdateInput!) { updatePerformanceAlert(id: $id, data: $data) { id } }`,
|
||||
{ id, data: { dismissedAt: now } },
|
||||
);
|
||||
dismissed++;
|
||||
} catch {}
|
||||
}
|
||||
return { status: 'ok', dismissed };
|
||||
}
|
||||
}
|
||||
163
src/supervisor/supervisor-barge.controller.ts
Normal file
163
src/supervisor/supervisor-barge.controller.ts
Normal file
@@ -0,0 +1,163 @@
|
||||
import { Controller, Post, Get, Body, HttpException, Logger } from '@nestjs/common';
|
||||
import axios from 'axios';
|
||||
import { OzonetelAdminAuthService } from '../ozonetel/ozonetel-admin-auth.service';
|
||||
import { SupervisorService } from './supervisor.service';
|
||||
import { TelephonyConfigService } from '../config/telephony-config.service';
|
||||
|
||||
// Supervisor barge/whisper/listen endpoints.
|
||||
// Proxies requests to Ozonetel's dashboardApi using admin JWT auth.
|
||||
//
|
||||
// API reference (from CA-Admin source code):
|
||||
// apiId 63 → CALL_BARGEIN (initiate barge)
|
||||
// apiId 158 → Redis barge state (insert/delete)
|
||||
// apiId 139 → SIP credential pool (sipSubscribe)
|
||||
|
||||
@Controller('api/supervisor/barge')
|
||||
export class SupervisorBargeController {
|
||||
private readonly logger = new Logger(SupervisorBargeController.name);
|
||||
private readonly dashboardApiUrl = 'https://api.cloudagent.ozonetel.com/dashboardApi/monitor/api';
|
||||
private readonly adminApiUrl = 'https://api.cloudagent.ozonetel.com/ca-admin-Api/CloudAgentAPI';
|
||||
|
||||
constructor(
|
||||
private readonly adminAuth: OzonetelAdminAuthService,
|
||||
private readonly supervisor: SupervisorService,
|
||||
private readonly telephony: TelephonyConfigService,
|
||||
) {}
|
||||
|
||||
@Get('sip-credentials')
|
||||
async getSipCredentials() {
|
||||
if (!this.adminAuth.isConfigured()) {
|
||||
throw new HttpException('Ozonetel admin not configured — add credentials in Settings → Telephony', 503);
|
||||
}
|
||||
|
||||
const config = this.telephony.getConfig();
|
||||
const sipGateway = `${config.sip.domain}:${config.sip.wsPort}`;
|
||||
const headers = await this.adminAuth.getAuthHeaders();
|
||||
|
||||
try {
|
||||
const res = await axios.post(`${this.adminApiUrl}/endpoint/sipnumber/sipSubscribe`, {
|
||||
apiId: 139,
|
||||
sipURL: sipGateway,
|
||||
}, { headers });
|
||||
|
||||
const data = res.data;
|
||||
this.logger.log(`[BARGE] SIP credentials response: ${JSON.stringify(data)}`);
|
||||
|
||||
if (!data?.sip_number) {
|
||||
throw new HttpException('No SIP numbers available in pool', 503);
|
||||
}
|
||||
|
||||
return {
|
||||
sipNumber: data.sip_number,
|
||||
sipPassword: data.password,
|
||||
sipDomain: data.pop_location ?? config.sip.domain,
|
||||
sipPort: config.sip.wsPort,
|
||||
};
|
||||
} catch (err: any) {
|
||||
this.logger.error(`[BARGE] SIP credentials failed: ${err.message}`);
|
||||
if (err instanceof HttpException) throw err;
|
||||
throw new HttpException('Failed to fetch SIP credentials', 502);
|
||||
}
|
||||
}
|
||||
|
||||
@Post()
|
||||
async initiateBarge(@Body() body: { ucid: string; agentId: string; agentNumber: string; supervisorId?: string }) {
|
||||
if (!body.ucid || !body.agentNumber) {
|
||||
throw new HttpException('ucid and agentNumber required', 400);
|
||||
}
|
||||
if (!this.adminAuth.isConfigured()) {
|
||||
throw new HttpException('Ozonetel admin not configured — add credentials in Settings → Telephony', 503);
|
||||
}
|
||||
|
||||
// Prevent double-barge on same agent
|
||||
const existing = this.supervisor.getBargeSession(body.agentId);
|
||||
if (existing) {
|
||||
throw new HttpException(`Agent ${body.agentId} is already being monitored`, 409);
|
||||
}
|
||||
|
||||
// Get SIP credentials from Ozonetel pool
|
||||
const sipCreds = await this.getSipCredentials();
|
||||
const headers = await this.adminAuth.getAuthHeaders();
|
||||
|
||||
try {
|
||||
const res = await axios.post(this.dashboardApiUrl, {
|
||||
apiId: 63,
|
||||
ucid: body.ucid,
|
||||
action: 'CALL_BARGEIN',
|
||||
isSip: true,
|
||||
phoneno: sipCreds.sipNumber,
|
||||
agentNumber: body.agentNumber,
|
||||
cbURL: 'helix-engage',
|
||||
}, { headers });
|
||||
|
||||
this.logger.log(`[BARGE] Initiated: ucid=${body.ucid} agent=${body.agentId} sip=${sipCreds.sipNumber} response=${JSON.stringify(res.data)}`);
|
||||
|
||||
// Track the session
|
||||
this.supervisor.startBargeSession({
|
||||
supervisorId: body.supervisorId ?? 'admin',
|
||||
agentId: body.agentId,
|
||||
sipNumber: sipCreds.sipNumber,
|
||||
mode: 'listen',
|
||||
startedAt: new Date().toISOString(),
|
||||
});
|
||||
|
||||
return {
|
||||
status: 'ok',
|
||||
...sipCreds,
|
||||
ozonetelResponse: res.data,
|
||||
};
|
||||
} catch (err: any) {
|
||||
this.logger.error(`[BARGE] Initiation failed: ${err.message} ${err.response?.data ? JSON.stringify(err.response.data) : ''}`);
|
||||
throw new HttpException(`Barge failed: ${err.response?.data?.Message ?? err.message}`, 502);
|
||||
}
|
||||
}
|
||||
|
||||
@Post('mode')
|
||||
async updateMode(@Body() body: { agentId: string; mode: 'listen' | 'whisper' | 'barge' }) {
|
||||
if (!body.agentId || !body.mode) {
|
||||
throw new HttpException('agentId and mode required', 400);
|
||||
}
|
||||
if (!['listen', 'whisper', 'barge'].includes(body.mode)) {
|
||||
throw new HttpException('mode must be listen, whisper, or barge', 400);
|
||||
}
|
||||
|
||||
const session = this.supervisor.getBargeSession(body.agentId);
|
||||
if (!session) {
|
||||
throw new HttpException('No active barge session for this agent', 404);
|
||||
}
|
||||
|
||||
this.supervisor.updateBargeMode(body.agentId, body.mode);
|
||||
return { status: 'ok', mode: body.mode };
|
||||
}
|
||||
|
||||
@Post('end')
|
||||
async endBarge(@Body() body: { agentId: string }) {
|
||||
if (!body.agentId) {
|
||||
throw new HttpException('agentId required', 400);
|
||||
}
|
||||
|
||||
const session = this.supervisor.getBargeSession(body.agentId);
|
||||
if (!session) {
|
||||
return { status: 'ok', message: 'No active session' };
|
||||
}
|
||||
|
||||
// Clear Redis tracking on Ozonetel side (best-effort)
|
||||
if (this.adminAuth.isConfigured()) {
|
||||
try {
|
||||
const headers = await this.adminAuth.getAuthHeaders();
|
||||
await axios.post(this.dashboardApiUrl, {
|
||||
apiId: 158,
|
||||
Action: 'delete',
|
||||
AgentId: body.agentId,
|
||||
Sip: session.sipNumber,
|
||||
}, { headers });
|
||||
this.logger.log(`[BARGE] Redis cleanup: ${body.agentId}`);
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`[BARGE] Redis cleanup failed (non-critical): ${err.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
this.supervisor.endBargeSession(body.agentId);
|
||||
return { status: 'ok' };
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,20 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { Module, forwardRef } from '@nestjs/common';
|
||||
import { PlatformModule } from '../platform/platform.module';
|
||||
import { OzonetelAgentModule } from '../ozonetel/ozonetel-agent.module';
|
||||
import { SupervisorController } from './supervisor.controller';
|
||||
import { SupervisorBargeController } from './supervisor-barge.controller';
|
||||
import { PerformanceAlertsController } from './performance-alerts.controller';
|
||||
import { SupervisorService } from './supervisor.service';
|
||||
import { AgentHistoryService } from './agent-history.service';
|
||||
import { OzonetelAdminAuthService } from '../ozonetel/ozonetel-admin-auth.service';
|
||||
|
||||
// Note: TelephonyConfigService is available without import because
|
||||
// ConfigThemeModule is @Global(). Do NOT import ConfigThemeModule here
|
||||
// — it causes a circular dependency via AuthModule.
|
||||
@Module({
|
||||
imports: [PlatformModule, OzonetelAgentModule],
|
||||
controllers: [SupervisorController],
|
||||
providers: [SupervisorService],
|
||||
exports: [SupervisorService],
|
||||
imports: [PlatformModule, forwardRef(() => OzonetelAgentModule)],
|
||||
controllers: [SupervisorController, SupervisorBargeController, PerformanceAlertsController],
|
||||
providers: [SupervisorService, AgentHistoryService, OzonetelAdminAuthService],
|
||||
exports: [SupervisorService, AgentHistoryService, OzonetelAdminAuthService],
|
||||
})
|
||||
export class SupervisorModule {}
|
||||
|
||||
@@ -3,6 +3,7 @@ import { ConfigService } from '@nestjs/config';
|
||||
import { Subject } from 'rxjs';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
|
||||
import { AgentHistoryService, AgentEventType } from './agent-history.service';
|
||||
|
||||
type ActiveCall = {
|
||||
ucid: string;
|
||||
@@ -20,23 +21,53 @@ type AgentStateEntry = {
|
||||
timestamp: string;
|
||||
};
|
||||
|
||||
// ACW auto-dispose: if an agent has been in ACW for longer than this
|
||||
// without the frontend calling /api/ozonetel/dispose, the server
|
||||
// auto-disposes with a default disposition + autoRelease. This is the
|
||||
// Layer 3 safety net — covers browser crash, tab close, page refresh
|
||||
// where sendBeacon didn't fire, or any other frontend failure.
|
||||
const ACW_TIMEOUT_MS = 30_000; // 30 seconds
|
||||
const ACW_DEFAULT_DISPOSITION = 'General Enquiry';
|
||||
|
||||
@Injectable()
|
||||
export class SupervisorService implements OnModuleInit {
|
||||
private readonly logger = new Logger(SupervisorService.name);
|
||||
private readonly activeCalls = new Map<string, ActiveCall>();
|
||||
private readonly agentStates = new Map<string, AgentStateEntry>();
|
||||
readonly agentStateSubject = new Subject<{ agentId: string; state: AgentOzonetelState; timestamp: string }>();
|
||||
private readonly acwTimers = new Map<string, NodeJS.Timeout>();
|
||||
readonly agentStateSubject = new Subject<{ agentId: string; state: AgentOzonetelState | string; timestamp: string }>();
|
||||
|
||||
// Barge session tracking — key is agentId
|
||||
private readonly bargeSessions = new Map<string, {
|
||||
supervisorId: string;
|
||||
agentId: string;
|
||||
sipNumber: string;
|
||||
mode: 'listen' | 'whisper' | 'barge';
|
||||
startedAt: string;
|
||||
}>();
|
||||
|
||||
constructor(
|
||||
private platform: PlatformGraphqlService,
|
||||
private ozonetel: OzonetelAgentService,
|
||||
private config: ConfigService,
|
||||
private history: AgentHistoryService,
|
||||
) {}
|
||||
|
||||
async onModuleInit() {
|
||||
this.logger.log('Supervisor service initialized');
|
||||
}
|
||||
|
||||
// Called by the dispose endpoint to cancel the ACW timer
|
||||
// (agent submitted disposition before the timeout)
|
||||
cancelAcwTimer(agentId: string) {
|
||||
const timer = this.acwTimers.get(agentId);
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
this.acwTimers.delete(agentId);
|
||||
this.logger.log(`[ACW-TIMER] Cancelled for ${agentId} (disposition received)`);
|
||||
}
|
||||
}
|
||||
|
||||
handleCallEvent(event: any) {
|
||||
const action = event.action;
|
||||
const ucid = event.ucid ?? event.monitorUCID;
|
||||
@@ -44,37 +75,155 @@ export class SupervisorService implements OnModuleInit {
|
||||
const callerNumber = event.caller_id ?? event.callerID;
|
||||
const callType = event.call_type ?? event.Type;
|
||||
const eventTime = event.event_time ?? event.eventTime ?? new Date().toISOString();
|
||||
const iso = this.parseOzonetelTime(eventTime);
|
||||
|
||||
if (!ucid) return;
|
||||
|
||||
if (action === 'Answered' || action === 'Calling') {
|
||||
// Don't show calls for offline agents (ghost calls)
|
||||
const agentState = this.agentStates.get(agentId);
|
||||
if (agentState?.state === 'offline') {
|
||||
this.logger.warn(`Ignoring call event for offline agent ${agentId} (${ucid})`);
|
||||
return;
|
||||
}
|
||||
this.activeCalls.set(ucid, {
|
||||
ucid, agentId, callerNumber,
|
||||
callType, startTime: eventTime, status: 'active',
|
||||
});
|
||||
this.logger.log(`Active call: ${agentId} ↔ ${callerNumber} (${ucid})`);
|
||||
|
||||
// Persist CALL_START as AgentEvent on the "Answered" moment
|
||||
// (that's when busy-time actually begins). "Calling" is the
|
||||
// ring — doesn't count as busy.
|
||||
if (action === 'Answered' && agentId) {
|
||||
this.history.persistAgentEvent({
|
||||
ozonetelAgentId: agentId,
|
||||
eventType: 'CALL_START',
|
||||
eventAt: iso,
|
||||
}).catch(() => {});
|
||||
}
|
||||
} else if (action === 'Disconnect') {
|
||||
const wasActive = this.activeCalls.get(ucid);
|
||||
this.activeCalls.delete(ucid);
|
||||
this.logger.log(`Call ended: ${ucid}`);
|
||||
|
||||
// Persist CALL_END — pair against the start for duration.
|
||||
if (wasActive?.agentId) {
|
||||
this.history.persistAgentEvent({
|
||||
ozonetelAgentId: wasActive.agentId,
|
||||
eventType: 'CALL_END',
|
||||
eventAt: iso,
|
||||
}).catch(() => {});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Ozonetel sends timestamps in "YYYY-MM-DD HH:MM:SS" IST format. Normalise.
|
||||
private parseOzonetelTime(raw: string): string {
|
||||
if (!raw) return new Date().toISOString();
|
||||
const asDate = new Date(raw);
|
||||
if (!isNaN(asDate.getTime())) return asDate.toISOString();
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
handleAgentEvent(event: any) {
|
||||
const agentId = event.agentId ?? event.agent_id ?? 'unknown';
|
||||
const action = event.action ?? 'unknown';
|
||||
const eventData = event.eventData ?? '';
|
||||
const eventData = event.eventData ?? event.data ?? '';
|
||||
const pauseReason = event.pauseReason ?? event.pause_reason ?? event.breakReason ?? '';
|
||||
const eventTime = event.event_time ?? event.eventTime ?? new Date().toISOString();
|
||||
this.logger.log(`[AGENT-STATE] ${agentId} → ${action}${eventData ? ` (${eventData})` : ''} at ${eventTime}`);
|
||||
this.logger.log(`[AGENT-STATE] ${agentId} → ${action} eventData="${eventData}" pauseReason="${pauseReason}" at ${eventTime}`);
|
||||
this.logger.log(`[AGENT-STATE] Full event payload: ${JSON.stringify(event)}`);
|
||||
|
||||
const mapped = this.mapOzonetelAction(action, eventData);
|
||||
const priorState = this.agentStates.get(agentId)?.state;
|
||||
const mapped = this.mapOzonetelAction(action, eventData, pauseReason);
|
||||
if (mapped) {
|
||||
this.agentStates.set(agentId, { state: mapped, timestamp: eventTime });
|
||||
this.agentStateSubject.next({ agentId, state: mapped, timestamp: eventTime });
|
||||
this.logger.log(`[AGENT-STATE] Emitted: ${agentId} → ${mapped}`);
|
||||
|
||||
// Persist to AgentEvent table. CALL_START/CALL_END are
|
||||
// handled in handleCallEvent (they arrive via a separate
|
||||
// Ozonetel webhook). Everything else is captured here.
|
||||
// Pass priorState so 'release' → RESUME / ACW_END / READY can
|
||||
// be disambiguated for the session rollup.
|
||||
const historyEventType = this.mapToHistoryEventType(action, priorState);
|
||||
if (historyEventType) {
|
||||
const resolvedPauseReason = (pauseReason || eventData || '') || null;
|
||||
this.history.persistAgentEvent({
|
||||
ozonetelAgentId: agentId,
|
||||
eventType: historyEventType,
|
||||
eventAt: this.parseOzonetelTime(eventTime),
|
||||
pauseReason: historyEventType === 'PAUSE' ? resolvedPauseReason : null,
|
||||
}).catch(() => {});
|
||||
}
|
||||
|
||||
// Layer 3: ACW auto-dispose safety net
|
||||
if (mapped === 'acw') {
|
||||
// Find the most recent UCID for this agent
|
||||
const lastCall = Array.from(this.activeCalls.values())
|
||||
.filter(c => c.agentId === agentId)
|
||||
.pop();
|
||||
const ucid = lastCall?.ucid;
|
||||
|
||||
this.cancelAcwTimer(agentId); // clear any existing timer
|
||||
const timer = setTimeout(async () => {
|
||||
// Check if agent is STILL in ACW (they might have disposed by now)
|
||||
const current = this.agentStates.get(agentId);
|
||||
if (current?.state !== 'acw') {
|
||||
this.logger.log(`[ACW-TIMER] ${agentId} no longer in ACW — skipping auto-dispose`);
|
||||
return;
|
||||
}
|
||||
this.logger.warn(`[ACW-TIMER] ${agentId} stuck in ACW for ${ACW_TIMEOUT_MS / 1000}s — auto-disposing${ucid ? ` (UCID ${ucid})` : ''}`);
|
||||
try {
|
||||
if (ucid) {
|
||||
await this.ozonetel.setDisposition({ agentId, ucid, disposition: ACW_DEFAULT_DISPOSITION });
|
||||
} else {
|
||||
await this.ozonetel.changeAgentState({ agentId, state: 'Ready' });
|
||||
}
|
||||
this.logger.log(`[ACW-TIMER] Auto-dispose successful for ${agentId}`);
|
||||
} catch (err: any) {
|
||||
this.logger.error(`[ACW-TIMER] Auto-dispose failed for ${agentId}: ${err.message}`);
|
||||
// Last resort: try force-ready
|
||||
try {
|
||||
await this.ozonetel.changeAgentState({ agentId, state: 'Ready' });
|
||||
} catch {}
|
||||
}
|
||||
this.acwTimers.delete(agentId);
|
||||
}, ACW_TIMEOUT_MS);
|
||||
this.acwTimers.set(agentId, timer);
|
||||
this.logger.log(`[ACW-TIMER] Started ${ACW_TIMEOUT_MS / 1000}s timer for ${agentId}`);
|
||||
} else if (mapped === 'ready' || mapped === 'offline') {
|
||||
// Agent left ACW normally — cancel the timer
|
||||
this.cancelAcwTimer(agentId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private mapOzonetelAction(action: string, eventData: string): AgentOzonetelState | null {
|
||||
// Map the Ozonetel webhook action to our AgentEvent.eventType enum.
|
||||
// 'release' means "agent is available again" — could be post-pause,
|
||||
// post-ACW, or post-call. Use the previous agent state to emit the
|
||||
// specific close-out event so session rollups can sum durations by
|
||||
// category (pause vs wrapup vs busy) without extra metadata.
|
||||
private mapToHistoryEventType(action: string, priorState: AgentOzonetelState | undefined): AgentEventType | null {
|
||||
switch (action) {
|
||||
case 'login': return 'LOGIN';
|
||||
case 'logout': return 'LOGOUT';
|
||||
case 'ACW': return 'ACW_START';
|
||||
case 'pause':
|
||||
case 'AUX':
|
||||
return 'PAUSE';
|
||||
case 'release':
|
||||
case 'IDLE':
|
||||
if (priorState === 'acw') return 'ACW_END';
|
||||
if (priorState === 'break' || priorState === 'training') return 'RESUME';
|
||||
return 'READY';
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private mapOzonetelAction(action: string, eventData: string, pauseReason?: string): AgentOzonetelState | null {
|
||||
switch (action) {
|
||||
case 'release': return 'ready';
|
||||
case 'IDLE': return 'ready'; // agent available after unanswered/canceled call
|
||||
@@ -82,11 +231,16 @@ export class SupervisorService implements OnModuleInit {
|
||||
case 'incall': return 'in-call';
|
||||
case 'ACW': return 'acw';
|
||||
case 'logout': return 'offline';
|
||||
case 'AUX':
|
||||
case 'pause': // Ozonetel sends 'pause' via webhook when agent is paused
|
||||
case 'AUX': {
|
||||
// "changeMode" is the brief AUX during login — not a real pause
|
||||
if (eventData === 'changeMode') return null;
|
||||
if (eventData?.toLowerCase().includes('training')) return 'training';
|
||||
// Check pauseReason first (explicit field), then fall back to eventData
|
||||
const reason = (pauseReason || eventData || '').toLowerCase();
|
||||
this.logger.log(`[AGENT-STATE] Pause reason resolved: "${reason}"`);
|
||||
if (reason.includes('training')) return 'training';
|
||||
return 'break';
|
||||
}
|
||||
case 'login': return null; // wait for release
|
||||
default: return null;
|
||||
}
|
||||
@@ -120,20 +274,152 @@ export class SupervisorService implements OnModuleInit {
|
||||
);
|
||||
const agents = agentData?.agents?.edges?.map((e: any) => e.node) ?? [];
|
||||
|
||||
// Fetch Ozonetel time summary per agent
|
||||
// Fetch AgentSession rows for this date — the authoritative source
|
||||
// for time breakdowns now that Phase 2 ingest is live. Keyed by
|
||||
// agentId (UUID on platform) so we can match back by agent.id.
|
||||
const sessionByAgentId = await this.fetchAgentSessionsByDate(date);
|
||||
|
||||
// Fetch CDR for the entire account for this date (one call, not per-agent)
|
||||
let allCdr: any[] = [];
|
||||
try {
|
||||
allCdr = await this.ozonetel.fetchCDR({ date });
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to fetch CDR for ${date}: ${err}`);
|
||||
}
|
||||
|
||||
// Merge AgentSession → timeBreakdown (Ozonetel shape for UI compat);
|
||||
// fall back to Ozonetel summary when no session row exists.
|
||||
const summaries = await Promise.all(
|
||||
agents.map(async (agent: any) => {
|
||||
if (!agent.ozonetelAgentId) return { ...agent, timeBreakdown: null };
|
||||
if (!agent.ozonetelAgentId) return { ...agent, timeBreakdown: null, calls: null };
|
||||
try {
|
||||
const summary = await this.ozonetel.getAgentSummary(agent.ozonetelAgentId, date);
|
||||
return { ...agent, timeBreakdown: summary };
|
||||
let timeBreakdown: any = null;
|
||||
let source: 'AGENT_SESSION' | 'OZONETEL_SUMMARY' | 'NONE' = 'NONE';
|
||||
|
||||
const session = sessionByAgentId.get(agent.id);
|
||||
if (session) {
|
||||
timeBreakdown = this.sessionToTimeBreakdown(session);
|
||||
source = 'AGENT_SESSION';
|
||||
} else {
|
||||
timeBreakdown = await this.ozonetel.getAgentSummary(agent.ozonetelAgentId, date);
|
||||
if (timeBreakdown) source = 'OZONETEL_SUMMARY';
|
||||
}
|
||||
|
||||
// Filter CDR to this agent
|
||||
const agentCdr = allCdr.filter(
|
||||
(c: any) => c.AgentID === agent.ozonetelAgentId || c.AgentName === agent.ozonetelAgentId,
|
||||
);
|
||||
const totalCalls = agentCdr.length;
|
||||
const inbound = agentCdr.filter((c: any) => c.Type === 'InBound').length;
|
||||
const outbound = agentCdr.filter((c: any) => c.Type === 'Manual' || c.Type === 'Progressive').length;
|
||||
const answered = agentCdr.filter((c: any) => c.Status === 'Answered').length;
|
||||
const missed = agentCdr.filter((c: any) => c.Status === 'NotAnswered').length;
|
||||
|
||||
return {
|
||||
...agent,
|
||||
timeBreakdown,
|
||||
timeBreakdownSource: source,
|
||||
calls: { total: totalCalls, inbound, outbound, answered, missed },
|
||||
};
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to get summary for ${agent.ozonetelAgentId}: ${err}`);
|
||||
return { ...agent, timeBreakdown: null };
|
||||
return { ...agent, timeBreakdown: null, calls: null };
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
return { date, agents: summaries };
|
||||
// Aggregate team totals
|
||||
const teamTotals = {
|
||||
totalCalls: summaries.reduce((sum, a) => sum + (a.calls?.total ?? 0), 0),
|
||||
inbound: summaries.reduce((sum, a) => sum + (a.calls?.inbound ?? 0), 0),
|
||||
outbound: summaries.reduce((sum, a) => sum + (a.calls?.outbound ?? 0), 0),
|
||||
answered: summaries.reduce((sum, a) => sum + (a.calls?.answered ?? 0), 0),
|
||||
missed: summaries.reduce((sum, a) => sum + (a.calls?.missed ?? 0), 0),
|
||||
};
|
||||
|
||||
return { date, agents: summaries, teamTotals };
|
||||
}
|
||||
|
||||
// Pull AgentSession rows for the given IST date, keyed by agent UUID so
|
||||
// getTeamPerformance can look them up per-agent.
|
||||
private async fetchAgentSessionsByDate(date: string): Promise<Map<string, any>> {
|
||||
const map = new Map<string, any>();
|
||||
try {
|
||||
const data = await this.platform.query<any>(
|
||||
`{ agentSessions(first: 100, filter: { date: { eq: "${date}" } }) {
|
||||
edges { node {
|
||||
agentId loginDurationS busyTimeS idleTimeS pauseTimeS
|
||||
wrapupTimeS dialTimeS avgHandlingTimeS source lastSyncedAt
|
||||
} }
|
||||
} }`,
|
||||
);
|
||||
const edges = data?.agentSessions?.edges ?? [];
|
||||
for (const e of edges) {
|
||||
if (e.node?.agentId) map.set(e.node.agentId, e.node);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(`[PERF] Failed to fetch AgentSession rows for ${date}: ${err}`);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
// Render AgentSession seconds in the HH:MM:SS shape the frontend expects
|
||||
// (matches Ozonetel's summary so team-performance.tsx can parseTime() it
|
||||
// without changing the page code).
|
||||
private sessionToTimeBreakdown(session: any): any {
|
||||
const hms = (sec: number | null | undefined): string => {
|
||||
const s = Math.max(0, Math.round(sec ?? 0));
|
||||
const h = Math.floor(s / 3600);
|
||||
const m = Math.floor((s % 3600) / 60);
|
||||
const r = s % 60;
|
||||
return `${h}:${String(m).padStart(2, '0')}:${String(r).padStart(2, '0')}`;
|
||||
};
|
||||
return {
|
||||
totalLoginTime: hms(session.loginDurationS),
|
||||
totalBusyTime: hms(session.busyTimeS),
|
||||
totalIdleTime: hms(session.idleTimeS),
|
||||
totalPauseTime: hms(session.pauseTimeS),
|
||||
totalWrapupTime: hms(session.wrapupTimeS),
|
||||
totalDialTime: hms(session.dialTimeS),
|
||||
avgHandlingTime: hms(session.avgHandlingTimeS),
|
||||
};
|
||||
}
|
||||
|
||||
// --- Barge session management ---
|
||||
|
||||
getBargeSession(agentId: string) {
|
||||
return this.bargeSessions.get(agentId) ?? null;
|
||||
}
|
||||
|
||||
startBargeSession(session: { supervisorId: string; agentId: string; sipNumber: string; mode: 'listen' | 'whisper' | 'barge'; startedAt: string }) {
|
||||
this.bargeSessions.set(session.agentId, session);
|
||||
this.logger.log(`[BARGE] Started: ${session.supervisorId} → ${session.agentId} (${session.mode})`);
|
||||
}
|
||||
|
||||
updateBargeMode(agentId: string, mode: 'listen' | 'whisper' | 'barge') {
|
||||
const session = this.bargeSessions.get(agentId);
|
||||
if (!session) return;
|
||||
|
||||
const previousMode = session.mode;
|
||||
session.mode = mode;
|
||||
|
||||
// Emit SSE to agent — whisper/barge show indicator, listen is silent
|
||||
if (mode === 'whisper' || mode === 'barge') {
|
||||
this.agentStateSubject.next({ agentId, state: `supervisor-${mode}`, timestamp: new Date().toISOString() });
|
||||
} else if (previousMode !== 'listen') {
|
||||
// Switching back to listen from whisper/barge
|
||||
this.agentStateSubject.next({ agentId, state: 'supervisor-left', timestamp: new Date().toISOString() });
|
||||
}
|
||||
|
||||
this.logger.log(`[BARGE] Mode: ${agentId} → ${mode}`);
|
||||
}
|
||||
|
||||
endBargeSession(agentId: string) {
|
||||
const session = this.bargeSessions.get(agentId);
|
||||
if (!session) return;
|
||||
|
||||
this.bargeSessions.delete(agentId);
|
||||
this.agentStateSubject.next({ agentId, state: 'supervisor-left', timestamp: new Date().toISOString() });
|
||||
this.logger.log(`[BARGE] Ended: ${session.supervisorId} → ${agentId}`);
|
||||
}
|
||||
}
|
||||
|
||||
114
src/telephony-registration.service.ts
Normal file
114
src/telephony-registration.service.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import axios from 'axios';
|
||||
import { PlatformGraphqlService } from './platform/platform-graphql.service';
|
||||
|
||||
// On startup, registers this sidecar with the telephony dispatcher
|
||||
// so Ozonetel events are routed to the correct sidecar by agentId.
|
||||
//
|
||||
// Flow:
|
||||
// 1. Load agent list from platform (Agent entities in this workspace)
|
||||
// 2. POST /api/supervisor/register to the dispatcher
|
||||
// 3. Start heartbeat interval (every 30s)
|
||||
// 4. On shutdown, DELETE /api/supervisor/register
|
||||
|
||||
const HEARTBEAT_INTERVAL_MS = 30_000;
|
||||
|
||||
@Injectable()
|
||||
export class TelephonyRegistrationService implements OnModuleInit, OnModuleDestroy {
|
||||
private readonly logger = new Logger(TelephonyRegistrationService.name);
|
||||
private heartbeatTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor(
|
||||
private config: ConfigService,
|
||||
private platform: PlatformGraphqlService,
|
||||
) {}
|
||||
|
||||
private get dispatcherUrl(): string {
|
||||
return this.config.get<string>('TELEPHONY_DISPATCHER_URL') ?? '';
|
||||
}
|
||||
|
||||
private get sidecarUrl(): string {
|
||||
return this.config.get<string>('TELEPHONY_CALLBACK_URL') ?? '';
|
||||
}
|
||||
|
||||
private get workspace(): string {
|
||||
return process.env.PLATFORM_WORKSPACE_SUBDOMAIN ?? 'unknown';
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
if (!this.dispatcherUrl || !this.sidecarUrl) {
|
||||
this.logger.warn('TELEPHONY_DISPATCHER_URL or TELEPHONY_CALLBACK_URL not set — skipping telephony registration');
|
||||
return;
|
||||
}
|
||||
|
||||
await this.register();
|
||||
|
||||
this.heartbeatTimer = setInterval(async () => {
|
||||
try {
|
||||
await axios.post(`${this.dispatcherUrl}/api/supervisor/heartbeat`, {
|
||||
sidecarUrl: this.sidecarUrl,
|
||||
}, { timeout: 5000 });
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`Heartbeat failed: ${err.message} — attempting re-registration`);
|
||||
await this.register();
|
||||
}
|
||||
}, HEARTBEAT_INTERVAL_MS);
|
||||
}
|
||||
|
||||
async onModuleDestroy() {
|
||||
if (this.heartbeatTimer) clearInterval(this.heartbeatTimer);
|
||||
|
||||
if (this.dispatcherUrl && this.sidecarUrl) {
|
||||
try {
|
||||
await axios.delete(`${this.dispatcherUrl}/api/supervisor/register`, {
|
||||
data: { sidecarUrl: this.sidecarUrl },
|
||||
timeout: 5000,
|
||||
});
|
||||
this.logger.log('Deregistered from telephony dispatcher');
|
||||
} catch {
|
||||
// Best-effort — TTL will clean up anyway
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async register() {
|
||||
try {
|
||||
const agents = await this.loadAgentIds();
|
||||
if (agents.length === 0) {
|
||||
this.logger.warn('No agents found in workspace — skipping registration');
|
||||
return;
|
||||
}
|
||||
|
||||
await axios.post(`${this.dispatcherUrl}/api/supervisor/register`, {
|
||||
sidecarUrl: this.sidecarUrl,
|
||||
workspace: this.workspace,
|
||||
agents,
|
||||
}, { timeout: 5000 });
|
||||
|
||||
this.logger.log(`Registered with telephony dispatcher: ${agents.length} agents (${agents.join(', ')})`);
|
||||
} catch (err: any) {
|
||||
this.logger.error(`Registration failed: ${err.message}`);
|
||||
}
|
||||
}
|
||||
|
||||
private async loadAgentIds(): Promise<string[]> {
|
||||
try {
|
||||
const apiKey = this.config.get<string>('PLATFORM_API_KEY');
|
||||
if (!apiKey) return [];
|
||||
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
`{ agents(first: 50) { edges { node { ozonetelAgentId } } } }`,
|
||||
undefined,
|
||||
`Bearer ${apiKey}`,
|
||||
);
|
||||
|
||||
return (data.agents?.edges ?? [])
|
||||
.map((e: any) => e.node.ozonetelAgentId)
|
||||
.filter((id: string) => id && id !== 'PENDING');
|
||||
} catch (err: any) {
|
||||
this.logger.warn(`Failed to load agents from platform: ${err.message}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { Module, forwardRef } from '@nestjs/common';
|
||||
import { WidgetController } from './widget.controller';
|
||||
import { WebhooksController } from './webhooks.controller';
|
||||
import { WidgetService } from './widget.service';
|
||||
@@ -6,12 +6,13 @@ import { WidgetChatService } from './widget-chat.service';
|
||||
import { PlatformModule } from '../platform/platform.module';
|
||||
import { AuthModule } from '../auth/auth.module';
|
||||
import { ConfigThemeModule } from '../config/config-theme.module';
|
||||
import { CallerResolutionModule } from '../caller/caller-resolution.module';
|
||||
|
||||
// WidgetKeysService lives in ConfigThemeModule now — injected here via the
|
||||
// module's exports. This module only owns the widget-facing API endpoints
|
||||
// (init / chat / book / lead) plus the NestJS guards that consume the keys.
|
||||
@Module({
|
||||
imports: [PlatformModule, AuthModule, ConfigThemeModule],
|
||||
imports: [PlatformModule, AuthModule, ConfigThemeModule, forwardRef(() => CallerResolutionModule)],
|
||||
controllers: [WidgetController, WebhooksController],
|
||||
providers: [WidgetService, WidgetChatService],
|
||||
})
|
||||
|
||||
@@ -4,6 +4,7 @@ import { ConfigService } from '@nestjs/config';
|
||||
import type { WidgetInitResponse, WidgetBookRequest, WidgetLeadRequest } from './widget.types';
|
||||
import { ThemeService } from '../config/theme.service';
|
||||
import { DOCTOR_VISIT_SLOTS_FRAGMENT, normalizeDoctors, type NormalizedDoctor } from '../shared/doctor-utils';
|
||||
import { CallerResolutionService } from '../caller/caller-resolution.service';
|
||||
|
||||
// Dedup window: any lead created for this phone within the last 24h is
|
||||
// considered the same visitor's lead — chat + book + contact by the same
|
||||
@@ -25,6 +26,7 @@ export class WidgetService {
|
||||
private platform: PlatformGraphqlService,
|
||||
private theme: ThemeService,
|
||||
private config: ConfigService,
|
||||
private caller: CallerResolutionService,
|
||||
) {
|
||||
this.apiKey = config.get<string>('platform.apiKey') ?? '';
|
||||
}
|
||||
@@ -37,8 +39,10 @@ export class WidgetService {
|
||||
return raw.replace(/[^0-9]/g, '').slice(-10);
|
||||
}
|
||||
|
||||
// Shared lead dedup: finds a lead created in the last 24h for the same
|
||||
// phone, or creates a new one. Public so WidgetChatService can reuse it.
|
||||
// Shared lead dedup. Resolves via CallerResolutionService; when isNew
|
||||
// (no prior Lead/Patient), we have a name here (widget form field),
|
||||
// so we create both records inline. When an existing record is
|
||||
// returned we update it with the latest channel + name.
|
||||
async findOrCreateLeadByPhone(
|
||||
name: string,
|
||||
rawPhone: string,
|
||||
@@ -47,53 +51,86 @@ export class WidgetService {
|
||||
const phone = this.normalizePhone(rawPhone);
|
||||
if (!phone) throw new Error('Invalid phone number');
|
||||
|
||||
const since = new Date(Date.now() - LEAD_DEDUP_WINDOW_MS).toISOString();
|
||||
|
||||
try {
|
||||
const existing = await this.platform.queryWithAuth<any>(
|
||||
`query($phone: String!, $since: DateTime!) {
|
||||
leads(
|
||||
first: 1,
|
||||
filter: {
|
||||
contactPhone: { primaryPhoneNumber: { like: $phone } },
|
||||
createdAt: { gte: $since }
|
||||
},
|
||||
orderBy: [{ createdAt: DescNullsLast }]
|
||||
) { edges { node { id createdAt } } }
|
||||
}`,
|
||||
{ phone: `%${phone}`, since },
|
||||
this.auth,
|
||||
);
|
||||
const match = existing?.leads?.edges?.[0]?.node;
|
||||
if (match?.id) {
|
||||
this.logger.log(`Lead dedup: reusing ${match.id} for phone ${phone}`);
|
||||
return match.id as string;
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(`Lead dedup lookup failed, falling through to create: ${err}`);
|
||||
}
|
||||
|
||||
const firstName = name.split(' ')[0] || name;
|
||||
const resolved = await this.caller.resolve(phone, this.auth);
|
||||
const firstName = name.split(' ')[0] || name || 'Unknown';
|
||||
const lastName = name.split(' ').slice(1).join(' ') || '';
|
||||
|
||||
const created = await this.platform.queryWithAuth<any>(
|
||||
`mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name,
|
||||
contactName: { firstName, lastName },
|
||||
contactPhone: { primaryPhoneNumber: `+91${phone}` },
|
||||
source: opts.source ?? 'WEBSITE',
|
||||
status: opts.status ?? 'NEW',
|
||||
interestedService: opts.interestedService ?? 'Website Enquiry',
|
||||
if (resolved.isNew) {
|
||||
// Net-new visitor — create Patient + Lead with the widget-
|
||||
// collected name. Both records get the real name from the
|
||||
// first moment they exist.
|
||||
let patientId: string | undefined;
|
||||
try {
|
||||
const p = await this.platform.queryWithAuth<any>(
|
||||
`mutation($data: PatientCreateInput!) { createPatient(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name: `${firstName} ${lastName}`.trim() || 'Unknown',
|
||||
fullName: { firstName, lastName },
|
||||
phones: { primaryPhoneNumber: `+91${phone}` },
|
||||
patientType: 'NEW',
|
||||
},
|
||||
},
|
||||
this.auth,
|
||||
);
|
||||
patientId = p?.createPatient?.id;
|
||||
} catch (err) {
|
||||
this.logger.warn(`Widget patient create failed (${phone}): ${err}`);
|
||||
}
|
||||
const created = await this.platform.queryWithAuth<any>(
|
||||
`mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`,
|
||||
{
|
||||
data: {
|
||||
name,
|
||||
contactName: { firstName, lastName },
|
||||
contactPhone: { primaryPhoneNumber: `+91${phone}` },
|
||||
source: opts.source ?? 'WEBSITE',
|
||||
status: opts.status ?? 'NEW',
|
||||
interestedService: opts.interestedService ?? 'Website Enquiry',
|
||||
...(patientId ? { patientId } : {}),
|
||||
},
|
||||
},
|
||||
},
|
||||
this.auth,
|
||||
);
|
||||
const id = created?.createLead?.id;
|
||||
if (!id) throw new Error('Lead creation returned no id');
|
||||
this.logger.log(`Lead dedup: created ${id} for ${name} (${phone})`);
|
||||
return id as string;
|
||||
this.auth,
|
||||
);
|
||||
const leadId = created?.createLead?.id;
|
||||
if (!leadId) throw new Error('Lead creation returned no id');
|
||||
this.logger.log(`Widget lead created: ${leadId} (patient ${patientId ?? 'none'}) for ${name} (${phone})`);
|
||||
return leadId;
|
||||
}
|
||||
|
||||
// Existing Lead found — update with widget-supplied details.
|
||||
const leadId = resolved.leadId;
|
||||
try {
|
||||
await this.platform.queryWithAuth<any>(
|
||||
`mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`,
|
||||
{
|
||||
id: leadId,
|
||||
data: {
|
||||
name,
|
||||
contactName: { firstName, lastName },
|
||||
source: opts.source ?? 'WEBSITE',
|
||||
status: opts.status ?? 'NEW',
|
||||
interestedService: opts.interestedService ?? 'Website Enquiry',
|
||||
},
|
||||
},
|
||||
this.auth,
|
||||
);
|
||||
} catch (err) {
|
||||
this.logger.warn(`Lead update after resolve failed (lead=${leadId}): ${err}`);
|
||||
}
|
||||
if (resolved.patientId) {
|
||||
try {
|
||||
await this.platform.queryWithAuth<any>(
|
||||
`mutation($id: UUID!, $data: PatientUpdateInput!) { updatePatient(id: $id, data: $data) { id } }`,
|
||||
{ id: resolved.patientId, data: { fullName: { firstName, lastName } } },
|
||||
this.auth,
|
||||
);
|
||||
} catch (err) {
|
||||
this.logger.warn(`Patient rename after resolve failed (patient=${resolved.patientId}): ${err}`);
|
||||
}
|
||||
}
|
||||
this.logger.log(`Widget lead updated: ${leadId} (patient ${resolved.patientId}) for ${name} (${phone})`);
|
||||
return leadId;
|
||||
}
|
||||
|
||||
// Upgrade a lead's status — used when an existing lead is promoted from
|
||||
@@ -183,6 +220,7 @@ export class WidgetService {
|
||||
const created = await this.platform.queryWithAuth<any>(
|
||||
`mutation($data: PatientCreateInput!) { createPatient(data: $data) { id } }`,
|
||||
{ data: {
|
||||
name: req.patientName.trim() || 'Unknown',
|
||||
fullName: { firstName, lastName },
|
||||
phones: { primaryPhoneNumber: `+91${phone}` },
|
||||
patientType: 'NEW',
|
||||
@@ -196,6 +234,7 @@ export class WidgetService {
|
||||
const appt = await this.platform.queryWithAuth<any>(
|
||||
`mutation($data: AppointmentCreateInput!) { createAppointment(data: $data) { id } }`,
|
||||
{ data: {
|
||||
name: `${req.patientName.trim() || 'Patient'} — ${new Date(req.scheduledAt).toISOString().slice(0, 10)}`,
|
||||
scheduledAt: req.scheduledAt,
|
||||
durationMin: 30,
|
||||
appointmentType: 'CONSULTATION',
|
||||
@@ -204,6 +243,7 @@ export class WidgetService {
|
||||
department: req.departmentId,
|
||||
reasonForVisit: req.chiefComplaint ?? '',
|
||||
patientId,
|
||||
...(req.clinicId ? { clinicId: req.clinicId } : {}),
|
||||
} },
|
||||
this.auth,
|
||||
);
|
||||
|
||||
@@ -15,6 +15,7 @@ export type WidgetInitResponse = {
|
||||
export type WidgetBookRequest = {
|
||||
departmentId: string;
|
||||
doctorId: string;
|
||||
clinicId?: string;
|
||||
scheduledAt: string;
|
||||
patientName: string;
|
||||
patientPhone: string;
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import { Controller, Post, Body, Headers, Logger } from '@nestjs/common';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { AgentLookupService } from '../platform/agent-lookup.service';
|
||||
import { CallerResolutionService } from '../caller/caller-resolution.service';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
|
||||
// Ozonetel sends all timestamps in IST — convert to UTC for storage
|
||||
@@ -20,6 +22,8 @@ export class MissedCallWebhookController {
|
||||
constructor(
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
private readonly config: ConfigService,
|
||||
private readonly caller: CallerResolutionService,
|
||||
private readonly agentLookup: AgentLookupService,
|
||||
) {
|
||||
this.apiKey = config.get<string>('platform.apiKey') ?? '';
|
||||
}
|
||||
@@ -53,9 +57,17 @@ export class MissedCallWebhookController {
|
||||
return { received: true, processed: false };
|
||||
}
|
||||
|
||||
// Skip outbound calls — an unanswered outbound dial is NOT a
|
||||
// "missed call" in the call-center sense. Outbound call records
|
||||
// are created by the disposition flow, not the webhook.
|
||||
if (type === 'Manual' || type === 'OutBound') {
|
||||
this.logger.log(`Skipping outbound call webhook (type=${type}, status=${status})`);
|
||||
return { received: true, processed: false, reason: 'outbound' };
|
||||
}
|
||||
|
||||
// Determine call status for our platform
|
||||
const callStatus = status === 'Answered' ? 'COMPLETED' : 'MISSED';
|
||||
const direction = type === 'InBound' ? 'INBOUND' : 'OUTBOUND';
|
||||
const direction = 'INBOUND'; // only inbound reaches here now
|
||||
|
||||
// Use API key auth for server-to-server writes
|
||||
const authHeader = this.apiKey ? `Bearer ${this.apiKey}` : '';
|
||||
@@ -65,7 +77,38 @@ export class MissedCallWebhookController {
|
||||
}
|
||||
|
||||
try {
|
||||
// Step 1: Create call record
|
||||
// Step 1: Resolve caller. CallerResolutionService looks up BOTH
|
||||
// leads and patients — for an existing patient with no lead yet
|
||||
// it creates the lead on the fly and returns the name. This is
|
||||
// the single source of truth for caller identity across webhook,
|
||||
// polling, and agent-initiated paths.
|
||||
let resolved: { leadId: string; leadName: string | null; patientId: string } = {
|
||||
leadId: '',
|
||||
leadName: null,
|
||||
patientId: '',
|
||||
};
|
||||
try {
|
||||
const r = await this.caller.resolve(callerPhone, authHeader);
|
||||
const fullName = `${r.firstName} ${r.lastName}`.trim();
|
||||
resolved = {
|
||||
leadId: r.leadId,
|
||||
// Resolver returns isNew when no Lead/Patient exists for
|
||||
// this phone. We do NOT auto-create records from the
|
||||
// webhook — agents don't have a name to attach, so we
|
||||
// persist the phone as leadName (honest snapshot). The
|
||||
// first agent action (enquiry, appointment) will create
|
||||
// real Lead+Patient records and retroactive identity
|
||||
// isn't a data-layer concern.
|
||||
leadName: r.isNew ? `+91${callerPhone}` : (fullName || null),
|
||||
patientId: r.patientId,
|
||||
};
|
||||
this.logger.log(`[WEBHOOK] Resolved ${callerPhone} → lead=${resolved.leadId || 'none'} name=${resolved.leadName ?? 'unresolved'} isNew=${r.isNew}`);
|
||||
} catch (err) {
|
||||
this.logger.warn(`[WEBHOOK] Caller resolution failed for ${callerPhone}: ${err}`);
|
||||
}
|
||||
|
||||
// Step 2: Create call record with leadId + leadName baked in so
|
||||
// the worklist row renders the patient name immediately.
|
||||
const callId = await this.createCall({
|
||||
callerPhone,
|
||||
direction,
|
||||
@@ -77,25 +120,21 @@ export class MissedCallWebhookController {
|
||||
recordingUrl,
|
||||
disposition,
|
||||
ucid,
|
||||
leadId: resolved.leadId || null,
|
||||
leadName: resolved.leadName,
|
||||
}, authHeader);
|
||||
|
||||
this.logger.log(`Created call record: ${callId} (${callStatus})`);
|
||||
this.logger.log(`Created call record: ${callId} (${callStatus})${resolved.leadName ? ` linked to ${resolved.leadName}` : ''}`);
|
||||
|
||||
// Step 2: Find matching lead by phone number
|
||||
const lead = await this.findLeadByPhone(callerPhone, authHeader);
|
||||
|
||||
if (lead) {
|
||||
// Step 3: Link call to lead
|
||||
await this.updateCall(callId, { leadId: lead.id }, authHeader);
|
||||
|
||||
// Step 4: Create lead activity
|
||||
// Step 3: Lead-side side-effects (activity log + contact stats)
|
||||
if (resolved.leadId) {
|
||||
const summary = callStatus === 'MISSED'
|
||||
? `Missed inbound call from ${callerPhone} (${duration}s, ${hangupBy ?? 'unknown'})`
|
||||
: `Inbound call from ${callerPhone} — ${duration}s, ${disposition || 'no disposition'}`;
|
||||
|
||||
await this.createLeadActivity({
|
||||
leadId: lead.id,
|
||||
activityType: callStatus === 'MISSED' ? 'CALL_RECEIVED' : 'CALL_RECEIVED',
|
||||
leadId: resolved.leadId,
|
||||
activityType: 'CALL_RECEIVED',
|
||||
summary,
|
||||
channel: 'PHONE',
|
||||
performedBy: agentName ?? 'System',
|
||||
@@ -103,18 +142,16 @@ export class MissedCallWebhookController {
|
||||
outcome: callStatus === 'MISSED' ? 'NO_ANSWER' : 'SUCCESSFUL',
|
||||
}, authHeader);
|
||||
|
||||
// Step 5: Update lead contact timestamps
|
||||
await this.updateLead(lead.id, {
|
||||
// Bump contact timestamps. Read current contactAttempts first
|
||||
// (kept local rather than extending resolve() signature).
|
||||
const leadMeta = await this.findLeadByPhone(callerPhone, authHeader);
|
||||
await this.updateLead(resolved.leadId, {
|
||||
lastContacted: startTime ? new Date(startTime).toISOString() : new Date().toISOString(),
|
||||
contactAttempts: (lead.contactAttempts ?? 0) + 1,
|
||||
contactAttempts: ((leadMeta?.contactAttempts) ?? 0) + 1,
|
||||
}, authHeader);
|
||||
|
||||
this.logger.log(`Linked call to lead ${lead.id} (${lead.name}), activity created`);
|
||||
} else {
|
||||
this.logger.log(`No matching lead for ${callerPhone} — call record created without lead link`);
|
||||
}
|
||||
|
||||
return { received: true, processed: true, callId, leadId: lead?.id ?? null };
|
||||
return { received: true, processed: true, callId, leadId: resolved.leadId || null };
|
||||
} catch (err: any) {
|
||||
const responseData = err?.response?.data ? JSON.stringify(err.response.data) : '';
|
||||
this.logger.error(`Webhook processing failed: ${err.message} ${responseData}`);
|
||||
@@ -133,6 +170,8 @@ export class MissedCallWebhookController {
|
||||
recordingUrl: string | null;
|
||||
disposition: string | null;
|
||||
ucid: string | null;
|
||||
leadId?: string | null;
|
||||
leadName?: string | null;
|
||||
}, authHeader: string): Promise<string> {
|
||||
const callData: Record<string, any> = {
|
||||
name: `${data.direction === 'INBOUND' ? 'Inbound' : 'Outbound'} — ${data.callerPhone}`,
|
||||
@@ -145,15 +184,40 @@ export class MissedCallWebhookController {
|
||||
durationSec: data.duration,
|
||||
disposition: this.mapDisposition(data.disposition),
|
||||
};
|
||||
// Persist UCID so the 30-min CDR enrichment cron and historical
|
||||
// backfill can pair this row to a CDR record and fill in the
|
||||
// authoritative agent relation.
|
||||
if (data.ucid) callData.ucid = data.ucid;
|
||||
if (data.leadId) callData.leadId = data.leadId;
|
||||
if (data.leadName) callData.leadName = data.leadName;
|
||||
// Set callback tracking fields for missed calls so they appear in the worklist
|
||||
if (data.callStatus === 'MISSED') {
|
||||
callData.callbackstatus = 'PENDING_CALLBACK';
|
||||
callData.missedcallcount = 1;
|
||||
callData.callbackStatus = 'PENDING_CALLBACK';
|
||||
callData.missedCallCount = 1;
|
||||
}
|
||||
if (data.recordingUrl) {
|
||||
callData.recording = { primaryLinkUrl: data.recordingUrl, primaryLinkLabel: 'Recording' };
|
||||
}
|
||||
|
||||
// Resolve agent relation at write-time so the supervisor dashboard
|
||||
// can bucket the row immediately. Ozonetel sends transferred calls
|
||||
// with a chain-style AgentName like "RamaiahAdmin -> GlobalHealthX" —
|
||||
// the final handler is the last segment, so split on " -> " and
|
||||
// resolve that. Try both ozonetelAgentId (lowercase unique) and
|
||||
// ozonetelDisplayName (mixed-case human label) since Ozonetel mixes
|
||||
// formats across webhook payloads. Leaves agentId null on miss so
|
||||
// the cdr-enrichment cron can still attempt a match by UCID later.
|
||||
if (data.agentName) {
|
||||
const segments = data.agentName.split('->').map((s) => s.trim()).filter(Boolean);
|
||||
const finalHandler = segments[segments.length - 1];
|
||||
if (finalHandler) {
|
||||
const uuid =
|
||||
(await this.agentLookup.resolveByOzonetelId(finalHandler)) ??
|
||||
(await this.agentLookup.resolveByDisplayName(finalHandler));
|
||||
if (uuid) callData.agentId = uuid;
|
||||
}
|
||||
}
|
||||
|
||||
const result = await this.platform.queryWithAuth<any>(
|
||||
`mutation($data: CallCreateInput!) { createCall(data: $data) { id } }`,
|
||||
{ data: callData },
|
||||
@@ -234,8 +298,9 @@ export class MissedCallWebhookController {
|
||||
'General Enquiry': 'INFO_PROVIDED',
|
||||
'Appointment Booked': 'APPOINTMENT_BOOKED',
|
||||
'Follow Up': 'FOLLOW_UP_SCHEDULED',
|
||||
'Not Interested': 'CALLBACK_REQUESTED',
|
||||
'Not Interested': 'NOT_INTERESTED',
|
||||
'Wrong Number': 'WRONG_NUMBER',
|
||||
'No Answer': 'NO_ANSWER',
|
||||
};
|
||||
return map[disposition] ?? null;
|
||||
}
|
||||
|
||||
@@ -11,6 +11,8 @@ import { Test } from '@nestjs/testing';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { MissedCallWebhookController } from './missed-call-webhook.controller';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { AgentLookupService } from '../platform/agent-lookup.service';
|
||||
import { CallerResolutionService } from '../caller/caller-resolution.service';
|
||||
import {
|
||||
WEBHOOK_INBOUND_ANSWERED,
|
||||
WEBHOOK_INBOUND_MISSED,
|
||||
@@ -48,11 +50,28 @@ describe('MissedCallWebhookController', () => {
|
||||
}),
|
||||
};
|
||||
|
||||
const mockCaller = {
|
||||
resolve: jest.fn().mockResolvedValue({
|
||||
leadId: '',
|
||||
firstName: '',
|
||||
lastName: '',
|
||||
patientId: '',
|
||||
isNew: true,
|
||||
}),
|
||||
};
|
||||
|
||||
const mockAgentLookup = {
|
||||
resolveByOzonetelId: jest.fn().mockResolvedValue(null),
|
||||
resolveByDisplayName: jest.fn().mockResolvedValue(null),
|
||||
};
|
||||
|
||||
const module = await Test.createTestingModule({
|
||||
controllers: [MissedCallWebhookController],
|
||||
providers: [
|
||||
{ provide: PlatformGraphqlService, useValue: mockPlatformGql },
|
||||
{ provide: ConfigService, useValue: mockConfig },
|
||||
{ provide: CallerResolutionService, useValue: mockCaller },
|
||||
{ provide: AgentLookupService, useValue: mockAgentLookup },
|
||||
],
|
||||
}).compile();
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@ import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
|
||||
import { TelephonyConfigService } from '../config/telephony-config.service';
|
||||
import { CallerResolutionService } from '../caller/caller-resolution.service';
|
||||
|
||||
// Ozonetel sends all timestamps in IST — convert to UTC for storage
|
||||
export function istToUtc(istDateStr: string | null): string | null {
|
||||
@@ -33,10 +35,17 @@ export class MissedQueueService implements OnModuleInit {
|
||||
private readonly config: ConfigService,
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
private readonly ozonetel: OzonetelAgentService,
|
||||
private readonly telephony: TelephonyConfigService,
|
||||
private readonly caller: CallerResolutionService,
|
||||
) {
|
||||
this.pollIntervalMs = this.config.get<number>('missedQueue.pollIntervalMs', 30000);
|
||||
}
|
||||
|
||||
// Read-through so admin config changes take effect without restart
|
||||
private get ownCampaign(): string {
|
||||
return this.telephony.getConfig().ozonetel.campaignName ?? '';
|
||||
}
|
||||
|
||||
onModuleInit() {
|
||||
this.logger.log(`Starting missed call ingestion polling every ${this.pollIntervalMs}ms`);
|
||||
setInterval(() => this.ingest().catch(err => this.logger.error('Ingestion failed', err)), this.pollIntervalMs);
|
||||
@@ -61,7 +70,17 @@ export class MissedQueueService implements OnModuleInit {
|
||||
|
||||
if (!abandonCalls?.length) return { created: 0, updated: 0 };
|
||||
|
||||
for (const call of abandonCalls) {
|
||||
// Filter to this sidecar's campaign only — the Ozonetel API
|
||||
// returns ALL abandoned calls across the account.
|
||||
const filtered = this.ownCampaign
|
||||
? abandonCalls.filter((c: any) => c.campaign === this.ownCampaign)
|
||||
: abandonCalls;
|
||||
|
||||
if (filtered.length < abandonCalls.length) {
|
||||
this.logger.log(`Filtered ${abandonCalls.length - filtered.length} calls from other campaigns (own=${this.ownCampaign})`);
|
||||
}
|
||||
|
||||
for (const call of filtered) {
|
||||
const ucid = call.monitorUCID;
|
||||
if (!ucid || this.processedUcids.has(ucid)) continue;
|
||||
this.processedUcids.add(ucid);
|
||||
@@ -73,43 +92,46 @@ export class MissedQueueService implements OnModuleInit {
|
||||
const callTime = istToUtc(call.callTime) ?? new Date().toISOString();
|
||||
|
||||
try {
|
||||
// Look up lead by phone number — strip +91 prefix for flexible matching
|
||||
const phoneDigits = phone.replace(/^\+91/, '');
|
||||
// Resolve caller via the shared service — covers the case
|
||||
// where there's an existing patient but no lead yet (the
|
||||
// service creates the lead on the fly and returns the name).
|
||||
// Same source of truth as the webhook path.
|
||||
let leadId: string | null = null;
|
||||
let leadName: string | null = null;
|
||||
try {
|
||||
const leadResult = await this.platform.query<any>(
|
||||
`{ leads(first: 1, filter: {
|
||||
contactPhone: { primaryPhoneNumber: { like: "%${phoneDigits}" } }
|
||||
}) { edges { node { id contactName { firstName lastName } patientId } } } }`,
|
||||
);
|
||||
const matchedLead = leadResult?.leads?.edges?.[0]?.node;
|
||||
if (matchedLead) {
|
||||
leadId = matchedLead.id;
|
||||
const fn = matchedLead.contactName?.firstName ?? '';
|
||||
const ln = matchedLead.contactName?.lastName ?? '';
|
||||
leadName = `${fn} ${ln}`.trim() || null;
|
||||
this.logger.log(`Matched missed call ${phone} → lead ${leadId} (${leadName})`);
|
||||
const apiKey = this.config.get<string>('platform.apiKey') ?? '';
|
||||
const auth = apiKey ? `Bearer ${apiKey}` : '';
|
||||
const r = await this.caller.resolve(phone, auth);
|
||||
if (r.isNew) {
|
||||
// No existing Lead/Patient — write phone as leadName.
|
||||
// Record creation is deferred to the first agent
|
||||
// action (enquiry / appointment).
|
||||
leadName = phone;
|
||||
} else if (r.leadId) {
|
||||
leadId = r.leadId;
|
||||
const fullName = `${r.firstName} ${r.lastName}`.trim();
|
||||
leadName = fullName || null;
|
||||
this.logger.log(`Matched missed call ${phone} → lead ${leadId} (${leadName ?? 'no name'})`);
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(`Lead lookup failed for ${phone}: ${err}`);
|
||||
this.logger.warn(`Caller resolution failed for ${phone}: ${err}`);
|
||||
}
|
||||
|
||||
const existing = await this.platform.query<any>(
|
||||
`{ calls(first: 1, filter: {
|
||||
callbackstatus: { eq: PENDING_CALLBACK },
|
||||
callbackStatus: { eq: PENDING_CALLBACK },
|
||||
callerNumber: { primaryPhoneNumber: { eq: "${phone}" } }
|
||||
}) { edges { node { id missedcallcount } } } }`,
|
||||
}) { edges { node { id missedCallCount } } } }`,
|
||||
);
|
||||
|
||||
const existingNode = existing?.calls?.edges?.[0]?.node;
|
||||
|
||||
if (existingNode) {
|
||||
const newCount = (existingNode.missedcallcount || 1) + 1;
|
||||
const newCount = (existingNode.missedCallCount || 1) + 1;
|
||||
const updateParts = [
|
||||
`missedcallcount: ${newCount}`,
|
||||
`missedCallCount: ${newCount}`,
|
||||
`startedAt: "${callTime}"`,
|
||||
`callsourcenumber: "${did}"`,
|
||||
`callSourceNumber: "${did}"`,
|
||||
];
|
||||
if (leadId) updateParts.push(`leadId: "${leadId}"`);
|
||||
if (leadName) updateParts.push(`leadName: "${leadName}"`);
|
||||
@@ -120,12 +142,13 @@ export class MissedQueueService implements OnModuleInit {
|
||||
this.logger.log(`Dedup missed call ${phone}: count now ${newCount}${leadName ? ` (${leadName})` : ''}`);
|
||||
} else {
|
||||
const dataParts = [
|
||||
`name: "Missed — ${phone}"`,
|
||||
`callStatus: MISSED`,
|
||||
`direction: INBOUND`,
|
||||
`callerNumber: { primaryPhoneNumber: "${phone}", primaryPhoneCallingCode: "+91" }`,
|
||||
`callsourcenumber: "${did}"`,
|
||||
`callbackstatus: PENDING_CALLBACK`,
|
||||
`missedcallcount: 1`,
|
||||
`callSourceNumber: "${did}"`,
|
||||
`callbackStatus: PENDING_CALLBACK`,
|
||||
`missedCallCount: 1`,
|
||||
`startedAt: "${callTime}"`,
|
||||
];
|
||||
if (leadId) dataParts.push(`leadId: "${leadId}"`);
|
||||
@@ -160,12 +183,12 @@ export class MissedQueueService implements OnModuleInit {
|
||||
// Find oldest unassigned PENDING_CALLBACK call (empty agentName)
|
||||
let result = await this.platform.query<any>(
|
||||
`{ calls(first: 1, filter: {
|
||||
callbackstatus: { eq: PENDING_CALLBACK },
|
||||
callbackStatus: { eq: PENDING_CALLBACK },
|
||||
agentName: { eq: "" }
|
||||
}, orderBy: [{ startedAt: AscNullsLast }]) {
|
||||
edges { node {
|
||||
id callerNumber { primaryPhoneNumber }
|
||||
startedAt callsourcenumber missedcallcount
|
||||
startedAt callSourceNumber missedCallCount
|
||||
} }
|
||||
} }`,
|
||||
);
|
||||
@@ -176,12 +199,12 @@ export class MissedQueueService implements OnModuleInit {
|
||||
if (!call) {
|
||||
result = await this.platform.query<any>(
|
||||
`{ calls(first: 1, filter: {
|
||||
callbackstatus: { eq: PENDING_CALLBACK },
|
||||
callbackStatus: { eq: PENDING_CALLBACK },
|
||||
agentName: { is: NULL }
|
||||
}, orderBy: [{ startedAt: AscNullsLast }]) {
|
||||
edges { node {
|
||||
id callerNumber { primaryPhoneNumber }
|
||||
startedAt callsourcenumber missedcallcount
|
||||
startedAt callSourceNumber missedCallCount
|
||||
} }
|
||||
} }`,
|
||||
);
|
||||
@@ -209,13 +232,13 @@ export class MissedQueueService implements OnModuleInit {
|
||||
throw new Error(`Invalid status: ${status}. Must be one of: ${validStatuses.join(', ')}`);
|
||||
}
|
||||
|
||||
const dataParts: string[] = [`callbackstatus: ${status}`];
|
||||
const dataParts: string[] = [`callbackStatus: ${status}`];
|
||||
if (status === 'CALLBACK_ATTEMPTED') {
|
||||
dataParts.push(`callbackattemptedat: "${new Date().toISOString()}"`);
|
||||
dataParts.push(`callbackAttemptedAt: "${new Date().toISOString()}"`);
|
||||
}
|
||||
|
||||
return this.platform.queryWithAuth<any>(
|
||||
`mutation { updateCall(id: "${callId}", data: { ${dataParts.join(', ')} }) { id callbackstatus callbackattemptedat } }`,
|
||||
`mutation { updateCall(id: "${callId}", data: { ${dataParts.join(', ')} }) { id callbackStatus callbackAttemptedAt } }`,
|
||||
undefined,
|
||||
authHeader,
|
||||
);
|
||||
@@ -230,12 +253,12 @@ export class MissedQueueService implements OnModuleInit {
|
||||
const fields = `id name createdAt direction callStatus agentName
|
||||
callerNumber { primaryPhoneNumber }
|
||||
startedAt endedAt durationSec disposition leadId
|
||||
callbackstatus callsourcenumber missedcallcount callbackattemptedat`;
|
||||
callbackStatus callSourceNumber missedCallCount callbackAttemptedAt`;
|
||||
|
||||
const buildQuery = (status: string) => `{ calls(first: 50, filter: {
|
||||
agentName: { eq: "${agentName}" },
|
||||
callStatus: { eq: MISSED },
|
||||
callbackstatus: { eq: ${status} }
|
||||
callbackStatus: { eq: ${status} }
|
||||
}, orderBy: [{ startedAt: AscNullsLast }]) { edges { node { ${fields} } } } }`;
|
||||
|
||||
try {
|
||||
|
||||
@@ -15,6 +15,7 @@ import { ConfigService } from '@nestjs/config';
|
||||
import { MissedQueueService, istToUtc, normalizePhone } from './missed-queue.service';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
|
||||
import { TelephonyConfigService } from '../config/telephony-config.service';
|
||||
import { ABANDON_CALL_RECORD } from '../__fixtures__/ozonetel-payloads';
|
||||
|
||||
describe('MissedQueueService', () => {
|
||||
@@ -57,6 +58,16 @@ describe('MissedQueueService', () => {
|
||||
getAbandonCalls: jest.fn().mockResolvedValue([ABANDON_CALL_RECORD]),
|
||||
},
|
||||
},
|
||||
{
|
||||
provide: TelephonyConfigService,
|
||||
useValue: {
|
||||
getConfig: () => ({
|
||||
ozonetel: { campaignName: 'Inbound_918041763400', agentId: '', agentPassword: '', did: '918041763400', sipId: '' },
|
||||
sip: { domain: 'test', wsPort: '444' },
|
||||
exotel: { apiKey: '', accountSid: '', subdomain: '' },
|
||||
}),
|
||||
},
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ import { PlatformModule } from '../platform/platform.module';
|
||||
import { OzonetelAgentModule } from '../ozonetel/ozonetel-agent.module';
|
||||
import { AuthModule } from '../auth/auth.module';
|
||||
import { RulesEngineModule } from '../rules-engine/rules-engine.module';
|
||||
import { CallerResolutionModule } from '../caller/caller-resolution.module';
|
||||
import { TelephonyConfigService } from '../config/telephony-config.service';
|
||||
import { WorklistController } from './worklist.controller';
|
||||
import { WorklistService } from './worklist.service';
|
||||
import { MissedQueueService } from './missed-queue.service';
|
||||
@@ -10,9 +12,9 @@ import { MissedCallWebhookController } from './missed-call-webhook.controller';
|
||||
import { KookooCallbackController } from './kookoo-callback.controller';
|
||||
|
||||
@Module({
|
||||
imports: [PlatformModule, forwardRef(() => OzonetelAgentModule), forwardRef(() => AuthModule), RulesEngineModule],
|
||||
imports: [PlatformModule, forwardRef(() => OzonetelAgentModule), forwardRef(() => AuthModule), RulesEngineModule, forwardRef(() => CallerResolutionModule)],
|
||||
controllers: [WorklistController, MissedCallWebhookController, KookooCallbackController],
|
||||
providers: [WorklistService, MissedQueueService],
|
||||
providers: [WorklistService, MissedQueueService, TelephonyConfigService],
|
||||
exports: [MissedQueueService],
|
||||
})
|
||||
export class WorklistModule {}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
||||
import { WorklistConsumer } from '../rules-engine/consumers/worklist.consumer';
|
||||
|
||||
@@ -16,8 +17,49 @@ export class WorklistService {
|
||||
constructor(
|
||||
private readonly platform: PlatformGraphqlService,
|
||||
private readonly worklistConsumer: WorklistConsumer,
|
||||
private readonly config: ConfigService,
|
||||
) {}
|
||||
|
||||
private get pageSize(): number {
|
||||
return this.config.get<number>('worklist.pageSize', 50);
|
||||
}
|
||||
|
||||
private get maxPages(): number {
|
||||
return this.config.get<number>('worklist.maxPages', 10);
|
||||
}
|
||||
|
||||
// Paginate a Relay connection query. Caller provides a function that
|
||||
// builds the query for a given cursor ('' on first page). Stops when
|
||||
// the platform reports no more pages OR the safety ceiling hits.
|
||||
private async fetchAllPages<T>(
|
||||
buildQuery: (cursorClause: string) => string,
|
||||
connectionKey: string,
|
||||
authHeader: string,
|
||||
): Promise<T[]> {
|
||||
const all: T[] = [];
|
||||
let cursor = '';
|
||||
for (let page = 0; page < this.maxPages; page++) {
|
||||
const cursorClause = cursor ? `, after: "${cursor}"` : '';
|
||||
try {
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
buildQuery(cursorClause),
|
||||
undefined,
|
||||
authHeader,
|
||||
);
|
||||
const conn = data?.[connectionKey];
|
||||
if (!conn) break;
|
||||
all.push(...(conn.edges?.map((e: any) => e.node) ?? []));
|
||||
if (!conn.pageInfo?.hasNextPage) break;
|
||||
cursor = conn.pageInfo.endCursor ?? '';
|
||||
if (!cursor) break;
|
||||
} catch (err) {
|
||||
this.logger.warn(`[WORKLIST] ${connectionKey} page ${page} failed: ${err}`);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return all;
|
||||
}
|
||||
|
||||
async getWorklist(agentName: string, authHeader: string): Promise<WorklistResponse> {
|
||||
const [rawMissedCalls, rawFollowUps, rawMarketingLeads] = await Promise.all([
|
||||
this.getMissedCalls(agentName, authHeader),
|
||||
@@ -49,69 +91,95 @@ export class WorklistService {
|
||||
}
|
||||
|
||||
private async getAssignedLeads(agentName: string, authHeader: string): Promise<any[]> {
|
||||
try {
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
`{ leads(first: 20, filter: { assignedAgent: { eq: "${agentName}" } }, orderBy: [{ createdAt: AscNullsLast }]) { edges { node {
|
||||
id createdAt
|
||||
contactName { firstName lastName }
|
||||
contactPhone { primaryPhoneNumber }
|
||||
contactEmail { primaryEmail }
|
||||
source status interestedService
|
||||
assignedAgent campaignId
|
||||
contactAttempts spamScore isSpam
|
||||
aiSummary aiSuggestedAction
|
||||
} } } }`,
|
||||
undefined,
|
||||
authHeader,
|
||||
);
|
||||
return data.leads.edges.map((e: any) => e.node);
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to fetch assigned leads: ${err}`);
|
||||
return [];
|
||||
}
|
||||
return this.fetchAllPages<any>(
|
||||
(cursor) => `{ leads(first: ${this.pageSize}${cursor}, filter: { assignedAgent: { eq: "${agentName}" } }, orderBy: [{ createdAt: AscNullsLast }]) { edges { node {
|
||||
id createdAt
|
||||
contactName { firstName lastName }
|
||||
contactPhone { primaryPhoneNumber }
|
||||
contactEmail { primaryEmail }
|
||||
source status interestedService
|
||||
assignedAgent campaignId
|
||||
contactAttempts spamScore isSpam
|
||||
aiSummary aiSuggestedAction
|
||||
patientId
|
||||
} } pageInfo { hasNextPage endCursor } } }`,
|
||||
'leads',
|
||||
authHeader,
|
||||
);
|
||||
}
|
||||
|
||||
private async getPendingFollowUps(agentName: string, authHeader: string): Promise<any[]> {
|
||||
const raw = await this.fetchAllPages<any>(
|
||||
(cursor) => `{ followUps(first: ${this.pageSize}${cursor}, filter: { assignedAgent: { eq: "${agentName}" } }) { edges { node {
|
||||
id name createdAt
|
||||
typeCustom status scheduledAt completedAt
|
||||
priority assignedAgent
|
||||
patientId
|
||||
} } pageInfo { hasNextPage endCursor } } }`,
|
||||
'followUps',
|
||||
authHeader,
|
||||
);
|
||||
// Filter to PENDING/OVERDUE client-side since platform may not support in-filter on remapped fields
|
||||
const followUps = raw.filter((f: any) => f.status === 'PENDING' || f.status === 'OVERDUE');
|
||||
try {
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
`{ followUps(first: 20, filter: { assignedAgent: { eq: "${agentName}" } }) { edges { node {
|
||||
id name createdAt
|
||||
typeCustom status scheduledAt completedAt
|
||||
priority assignedAgent
|
||||
patientId
|
||||
} } } }`,
|
||||
undefined,
|
||||
authHeader,
|
||||
|
||||
// Enrich with patient name/phone so the worklist can render them.
|
||||
// FollowUp stores only patientId — the name in fu.name is free-form
|
||||
// and phone isn't stored at all, so one patient fetch fills both.
|
||||
const patientIds: string[] = Array.from(
|
||||
new Set(followUps.map((f: any) => f.patientId).filter((id: any): id is string => typeof id === 'string' && id.length > 0)),
|
||||
);
|
||||
// Filter to PENDING/OVERDUE client-side since platform may not support in-filter on remapped fields
|
||||
return data.followUps.edges
|
||||
.map((e: any) => e.node)
|
||||
.filter((f: any) => f.status === 'PENDING' || f.status === 'OVERDUE');
|
||||
if (patientIds.length > 0) {
|
||||
try {
|
||||
const idsGql = patientIds.map((id) => `"${id}"`).join(',');
|
||||
const patientsData = await this.platform.queryWithAuth<any>(
|
||||
`{ patients(first: ${patientIds.length}, filter: { id: { in: [${idsGql}] } }) { edges { node {
|
||||
id fullName { firstName lastName } phones { primaryPhoneNumber }
|
||||
} } } }`,
|
||||
undefined,
|
||||
authHeader,
|
||||
);
|
||||
const patientMap = new Map<string, { name: string; phone: string }>();
|
||||
for (const edge of patientsData.patients.edges) {
|
||||
const p = edge.node;
|
||||
const name = [p.fullName?.firstName, p.fullName?.lastName].filter(Boolean).join(' ').trim();
|
||||
const phone = p.phones?.primaryPhoneNumber ?? '';
|
||||
patientMap.set(p.id, { name, phone });
|
||||
}
|
||||
for (const fu of followUps) {
|
||||
if (fu.patientId && patientMap.has(fu.patientId)) {
|
||||
const p = patientMap.get(fu.patientId)!;
|
||||
fu.patientName = p.name;
|
||||
fu.patientPhone = p.phone;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to enrich follow-ups with patient data: ${err}`);
|
||||
}
|
||||
}
|
||||
|
||||
return followUps;
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to fetch follow-ups: ${err}`);
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
private async getMissedCalls(agentName: string, authHeader: string): Promise<any[]> {
|
||||
try {
|
||||
// FIFO ordering (AscNullsLast) — oldest first. No agentName filter — missed calls are a shared queue.
|
||||
const data = await this.platform.queryWithAuth<any>(
|
||||
`{ calls(first: 20, filter: { callStatus: { eq: MISSED }, callbackstatus: { in: [PENDING_CALLBACK, CALLBACK_ATTEMPTED] } }, orderBy: [{ startedAt: AscNullsLast }]) { edges { node {
|
||||
id name createdAt
|
||||
direction callStatus agentName
|
||||
callerNumber { primaryPhoneNumber }
|
||||
startedAt endedAt durationSec
|
||||
disposition leadId
|
||||
callbackstatus callsourcenumber missedcallcount callbackattemptedat
|
||||
} } } }`,
|
||||
undefined,
|
||||
authHeader,
|
||||
);
|
||||
return data.calls.edges.map((e: any) => e.node);
|
||||
} catch (err) {
|
||||
this.logger.warn(`Failed to fetch missed calls: ${err}`);
|
||||
return [];
|
||||
}
|
||||
private async getMissedCalls(_agentName: string, authHeader: string): Promise<any[]> {
|
||||
// FIFO ordering (AscNullsLast) — oldest first. No agentName filter —
|
||||
// missed calls are a shared queue. Paginated via WORKLIST_PAGE_SIZE
|
||||
// × WORKLIST_MAX_PAGES ceiling.
|
||||
return this.fetchAllPages<any>(
|
||||
(cursor) => `{ calls(first: ${this.pageSize}${cursor}, filter: { callStatus: { eq: MISSED }, callbackStatus: { in: [PENDING_CALLBACK, CALLBACK_ATTEMPTED] } }, orderBy: [{ startedAt: AscNullsLast }]) { edges { node {
|
||||
id name createdAt
|
||||
direction callStatus agentName
|
||||
callerNumber { primaryPhoneNumber }
|
||||
startedAt endedAt durationSec
|
||||
disposition leadId leadName
|
||||
callbackStatus callSourceNumber missedCallCount callbackAttemptedAt
|
||||
} } pageInfo { hasNextPage endCursor } } }`,
|
||||
'calls',
|
||||
authHeader,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user