feat: SSE agent state, maint module, timestamp fix, missed call lead lookup

- SSE agent state stream: supervisor maintains state map from Ozonetel webhooks, streams via /api/supervisor/agent-state/stream
- Force-logout via SSE: distinct force-logout event type avoids conflict with normal login cycle
- Maint module (/api/maint): OTP-guarded endpoints for force-ready, unlock-agent, backfill-missed-calls, fix-timestamps
- Fix Ozonetel IST→UTC timestamp conversion: istToUtc() in webhook controller and missed-queue service
- Missed call lead lookup: ingestion queries leads by phone, stores leadId + leadName on Call entity
- Timestamp backfill endpoint: throttled at 700ms/mutation, idempotent (skips already-fixed records)
- Structured logging: full JSON payloads for agent/call webhooks, [DISPOSE] trace with agentId
- Fix dead code: agent-state endpoint auto-assign was after return statement
- Export SupervisorService for cross-module injection

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-24 22:04:31 +05:30
parent d3331e56c0
commit eb4000961f
10 changed files with 388 additions and 62 deletions

View File

@@ -13,6 +13,7 @@ import { WorklistModule } from './worklist/worklist.module';
import { CallAssistModule } from './call-assist/call-assist.module';
import { SearchModule } from './search/search.module';
import { SupervisorModule } from './supervisor/supervisor.module';
import { MaintModule } from './maint/maint.module';
@Module({
imports: [
@@ -32,6 +33,7 @@ import { SupervisorModule } from './supervisor/supervisor.module';
CallAssistModule,
SearchModule,
SupervisorModule,
MaintModule,
],
})
export class AppModule {}

View File

@@ -0,0 +1,191 @@
import { Controller, Post, UseGuards, Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { MaintGuard } from './maint.guard';
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
import { SessionService } from '../auth/session.service';
import { SupervisorService } from '../supervisor/supervisor.service';
@Controller('api/maint')
@UseGuards(MaintGuard)
export class MaintController {
private readonly logger = new Logger(MaintController.name);
constructor(
private readonly config: ConfigService,
private readonly ozonetel: OzonetelAgentService,
private readonly platform: PlatformGraphqlService,
private readonly session: SessionService,
private readonly supervisor: SupervisorService,
) {}
@Post('force-ready')
async forceReady() {
const agentId = this.config.get<string>('OZONETEL_AGENT_ID') ?? 'agent3';
const password = process.env.OZONETEL_AGENT_PASSWORD ?? 'Test123$';
const sipId = this.config.get<string>('OZONETEL_SIP_ID') ?? '521814';
this.logger.log(`[MAINT] Force ready: agent=${agentId}`);
try {
await this.ozonetel.logoutAgent({ agentId, password });
const result = await this.ozonetel.loginAgent({
agentId,
password,
phoneNumber: sipId,
mode: 'blended',
});
this.logger.log(`[MAINT] Force ready complete: ${JSON.stringify(result)}`);
return { status: 'ok', message: `Agent ${agentId} force-readied`, result };
} catch (error: any) {
const message = error.response?.data?.message ?? error.message ?? 'Force ready failed';
this.logger.error(`[MAINT] Force ready failed: ${message}`);
return { status: 'error', message };
}
}
@Post('unlock-agent')
async unlockAgent() {
const agentId = this.config.get<string>('OZONETEL_AGENT_ID') ?? 'agent3';
this.logger.log(`[MAINT] Unlock agent session: ${agentId}`);
try {
const existing = await this.session.getSession(agentId);
if (!existing) {
return { status: 'ok', message: `No active session for ${agentId}` };
}
await this.session.unlockSession(agentId);
// Push force-logout via SSE to all connected browsers for this agent
this.supervisor.emitForceLogout(agentId);
this.logger.log(`[MAINT] Session unlocked + force-logout pushed for ${agentId} (was held by IP ${existing.ip} since ${existing.lockedAt})`);
return { status: 'ok', message: `Session unlocked and force-logout sent for ${agentId}`, previousSession: existing };
} catch (error: any) {
this.logger.error(`[MAINT] Unlock failed: ${error.message}`);
return { status: 'error', message: error.message };
}
}
@Post('backfill-missed-calls')
async backfillMissedCalls() {
this.logger.log('[MAINT] Backfill missed call lead names — starting');
// Fetch all missed calls without a leadId
const result = await this.platform.query<any>(
`{ calls(first: 200, filter: {
callStatus: { eq: MISSED },
leadId: { is: NULL }
}) { edges { node { id callerNumber { primaryPhoneNumber } } } } }`,
);
const calls = result?.calls?.edges?.map((e: any) => e.node) ?? [];
if (calls.length === 0) {
this.logger.log('[MAINT] No missed calls without leadId found');
return { status: 'ok', total: 0, patched: 0 };
}
this.logger.log(`[MAINT] Found ${calls.length} missed calls without leadId`);
let patched = 0;
let skipped = 0;
for (const call of calls) {
const phone = call.callerNumber?.primaryPhoneNumber;
if (!phone) { skipped++; continue; }
const phoneDigits = phone.replace(/^\+91/, '');
try {
const leadResult = await this.platform.query<any>(
`{ leads(first: 1, filter: {
contactPhone: { primaryPhoneNumber: { like: "%${phoneDigits}" } }
}) { edges { node { id contactName { firstName lastName } } } } }`,
);
const lead = leadResult?.leads?.edges?.[0]?.node;
if (!lead) { skipped++; continue; }
const fn = lead.contactName?.firstName ?? '';
const ln = lead.contactName?.lastName ?? '';
const leadName = `${fn} ${ln}`.trim();
await this.platform.query<any>(
`mutation { updateCall(id: "${call.id}", data: {
leadId: "${lead.id}"${leadName ? `, leadName: "${leadName}"` : ''}
}) { id } }`,
);
patched++;
this.logger.log(`[MAINT] Patched ${phone}${leadName} (${lead.id})`);
} catch (err) {
this.logger.warn(`[MAINT] Failed to patch ${call.id}: ${err}`);
skipped++;
}
}
this.logger.log(`[MAINT] Backfill complete: ${patched} patched, ${skipped} skipped out of ${calls.length}`);
return { status: 'ok', total: calls.length, patched, skipped };
}
@Post('fix-timestamps')
async fixTimestamps() {
this.logger.log('[MAINT] Fix call timestamps — subtracting 5:30 IST offset from existing records');
const result = await this.platform.query<any>(
`{ calls(first: 200) { edges { node { id startedAt endedAt createdAt } } } }`,
);
const calls = result?.calls?.edges?.map((e: any) => e.node) ?? [];
if (calls.length === 0) {
return { status: 'ok', total: 0, fixed: 0 };
}
this.logger.log(`[MAINT] Found ${calls.length} call records to check`);
let fixed = 0;
let skipped = 0;
for (const call of calls) {
if (!call.startedAt) { skipped++; continue; }
// Skip records that don't need fixing: if startedAt is BEFORE createdAt,
// it was already corrected (or is naturally correct)
const started = new Date(call.startedAt).getTime();
const created = new Date(call.createdAt).getTime();
if (started <= created) {
skipped++;
continue;
}
try {
const updates: string[] = [];
const startDate = new Date(call.startedAt);
startDate.setMinutes(startDate.getMinutes() - 330);
updates.push(`startedAt: "${startDate.toISOString()}"`);
if (call.endedAt) {
const endDate = new Date(call.endedAt);
endDate.setMinutes(endDate.getMinutes() - 330);
updates.push(`endedAt: "${endDate.toISOString()}"`);
}
await this.platform.query<any>(
`mutation { updateCall(id: "${call.id}", data: { ${updates.join(', ')} }) { id } }`,
);
fixed++;
// Throttle: 700ms between mutations to stay under 100/min rate limit
await new Promise(resolve => setTimeout(resolve, 700));
} catch (err) {
this.logger.warn(`[MAINT] Failed to fix ${call.id}: ${err}`);
skipped++;
}
}
this.logger.log(`[MAINT] Timestamp fix complete: ${fixed} fixed, ${skipped} skipped out of ${calls.length}`);
return { status: 'ok', total: calls.length, fixed, skipped };
}
}

20
src/maint/maint.guard.ts Normal file
View File

@@ -0,0 +1,20 @@
import { CanActivate, ExecutionContext, Injectable, HttpException } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
@Injectable()
export class MaintGuard implements CanActivate {
private readonly otp: string;
constructor(private config: ConfigService) {
this.otp = process.env.MAINT_OTP ?? '400168';
}
canActivate(context: ExecutionContext): boolean {
const request = context.switchToHttp().getRequest();
const provided = request.headers['x-maint-otp'] ?? request.body?.otp;
if (!provided || provided !== this.otp) {
throw new HttpException('Invalid maintenance OTP', 403);
}
return true;
}
}

12
src/maint/maint.module.ts Normal file
View File

@@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { PlatformModule } from '../platform/platform.module';
import { OzonetelAgentModule } from '../ozonetel/ozonetel-agent.module';
import { AuthModule } from '../auth/auth.module';
import { SupervisorModule } from '../supervisor/supervisor.module';
import { MaintController } from './maint.controller';
@Module({
imports: [PlatformModule, OzonetelAgentModule, AuthModule, SupervisorModule],
controllers: [MaintController],
})
export class MaintModule {}

View File

@@ -65,7 +65,7 @@ export class OzonetelAgentController {
throw new HttpException('state required', 400);
}
this.logger.log(`Agent state change: ${this.defaultAgentId}${body.state} (${body.pauseReason ?? ''})`);
this.logger.log(`[AGENT-STATE] ${this.defaultAgentId}${body.state} (${body.pauseReason ?? 'none'})`);
try {
const result = await this.ozonetelAgent.changeAgentState({
@@ -73,47 +73,31 @@ export class OzonetelAgentController {
state: body.state,
pauseReason: body.pauseReason,
});
this.logger.log(`[AGENT-STATE] Ozonetel response: ${JSON.stringify(result)}`);
// Auto-assign missed call when agent goes Ready
if (body.state === 'Ready') {
try {
const assigned = await this.missedQueue.assignNext(this.defaultAgentId);
if (assigned) {
this.logger.log(`[AGENT-STATE] Auto-assigned missed call ${assigned.id}`);
return { ...result, assignedCall: assigned };
}
} catch (err) {
this.logger.warn(`[AGENT-STATE] Auto-assignment on Ready failed: ${err}`);
}
}
return result;
} catch (error: any) {
const message = error.response?.data?.message ?? error.message ?? 'State change failed';
const responseData = error.response?.data ? JSON.stringify(error.response.data) : '';
this.logger.error(`[AGENT-STATE] FAILED: ${message} ${responseData}`);
return { status: 'error', message };
}
// Auto-assign missed call when agent goes Ready
if (body.state === 'Ready') {
try {
const assigned = await this.missedQueue.assignNext(this.defaultAgentId);
if (assigned) {
return { status: 'ok', message: `State changed to Ready. Assigned missed call ${assigned.id}`, assignedCall: assigned };
}
} catch (err) {
this.logger.warn(`Auto-assignment on Ready failed: ${err}`);
}
}
}
@Post('agent-ready')
async agentReady() {
this.logger.log(`Force ready: logging out and back in agent ${this.defaultAgentId}`);
try {
await this.ozonetelAgent.logoutAgent({
agentId: this.defaultAgentId,
password: this.defaultAgentPassword,
});
const result = await this.ozonetelAgent.loginAgent({
agentId: this.defaultAgentId,
password: this.defaultAgentPassword,
phoneNumber: this.defaultSipId,
mode: 'blended',
});
return result;
} catch (error: any) {
const message = error.response?.data?.message ?? error.message ?? 'Force ready failed';
this.logger.error(`Force ready failed: ${message}`);
throw new HttpException(message, error.response?.status ?? 502);
}
}
// force-ready moved to /api/maint/force-ready
@Post('dispose')
async dispose(
@@ -132,19 +116,21 @@ export class OzonetelAgentController {
throw new HttpException('ucid and disposition required', 400);
}
this.logger.log(`Dispose: ucid=${body.ucid} disposition=${body.disposition}`);
const ozonetelDisposition = this.mapToOzonetelDisposition(body.disposition);
this.logger.log(`[DISPOSE] ucid=${body.ucid} disposition=${body.disposition} → ozonetel="${ozonetelDisposition}" agentId=${this.defaultAgentId} callerPhone=${body.callerPhone ?? 'none'} direction=${body.direction ?? 'unknown'} leadId=${body.leadId ?? 'none'}`);
try {
const result = await this.ozonetelAgent.setDisposition({
agentId: this.defaultAgentId,
ucid: body.ucid,
disposition: ozonetelDisposition,
});
this.logger.log(`[DISPOSE] Ozonetel response: ${JSON.stringify(result)}`);
} catch (error: any) {
const message = error.response?.data?.message ?? error.message ?? 'Disposition failed';
this.logger.error(`Dispose failed: ${message}`);
const responseData = error.response?.data ? JSON.stringify(error.response.data) : '';
this.logger.error(`[DISPOSE] FAILED: ${message} ${responseData}`);
}
// Handle missed call callback status update
@@ -188,7 +174,7 @@ export class OzonetelAgentController {
const campaignName = body.campaignName ?? process.env.OZONETEL_CAMPAIGN_NAME ?? 'Inbound_918041763265';
this.logger.log(`Manual dial: ${body.phoneNumber} campaign=${campaignName} (lead: ${body.leadId ?? 'none'})`);
this.logger.log(`[DIAL] phone=${body.phoneNumber} campaign=${campaignName} agentId=${this.defaultAgentId} lead=${body.leadId ?? 'none'}`);
try {
const result = await this.ozonetelAgent.manualDial({

View File

@@ -1,4 +1,5 @@
import { Controller, Get, Post, Body, Query, Logger } from '@nestjs/common';
import { Controller, Get, Post, Body, Query, Sse, Logger } from '@nestjs/common';
import { Observable, filter, map } from 'rxjs';
import { SupervisorService } from './supervisor.service';
@Controller('api/supervisor')
@@ -22,7 +23,7 @@ export class SupervisorController {
@Post('call-event')
handleCallEvent(@Body() body: any) {
const event = body.data ?? body;
this.logger.log(`Call event: ${event.action} ucid=${event.ucid ?? event.monitorUCID} agent=${event.agent_id ?? event.agentID}`);
this.logger.log(`[CALL-EVENT] ${JSON.stringify(event)}`);
this.supervisor.handleCallEvent(event);
return { received: true };
}
@@ -30,8 +31,25 @@ export class SupervisorController {
@Post('agent-event')
handleAgentEvent(@Body() body: any) {
const event = body.data ?? body;
this.logger.log(`Agent event: ${event.action} agent=${event.agentId ?? event.agent_id}`);
this.logger.log(`[AGENT-EVENT] ${JSON.stringify(event)}`);
this.supervisor.handleAgentEvent(event);
return { received: true };
}
@Get('agent-state')
getAgentState(@Query('agentId') agentId: string) {
const state = this.supervisor.getAgentState(agentId);
return state ?? { state: 'offline', timestamp: null };
}
@Sse('agent-state/stream')
streamAgentState(@Query('agentId') agentId: string): Observable<MessageEvent> {
this.logger.log(`[SSE] Agent state stream opened for ${agentId}`);
return this.supervisor.agentStateSubject.pipe(
filter(event => event.agentId === agentId),
map(event => ({
data: JSON.stringify({ state: event.state, timestamp: event.timestamp }),
} as MessageEvent)),
);
}
}

View File

@@ -8,5 +8,6 @@ import { SupervisorService } from './supervisor.service';
imports: [PlatformModule, OzonetelAgentModule],
controllers: [SupervisorController],
providers: [SupervisorService],
exports: [SupervisorService],
})
export class SupervisorModule {}

View File

@@ -1,5 +1,6 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Subject } from 'rxjs';
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
@@ -12,10 +13,19 @@ type ActiveCall = {
status: 'active' | 'on-hold';
};
export type AgentOzonetelState = 'ready' | 'break' | 'training' | 'calling' | 'in-call' | 'acw' | 'offline';
type AgentStateEntry = {
state: AgentOzonetelState;
timestamp: string;
};
@Injectable()
export class SupervisorService implements OnModuleInit {
private readonly logger = new Logger(SupervisorService.name);
private readonly activeCalls = new Map<string, ActiveCall>();
private readonly agentStates = new Map<string, AgentStateEntry>();
readonly agentStateSubject = new Subject<{ agentId: string; state: AgentOzonetelState; timestamp: string }>();
constructor(
private platform: PlatformGraphqlService,
@@ -50,7 +60,46 @@ export class SupervisorService implements OnModuleInit {
}
handleAgentEvent(event: any) {
this.logger.log(`Agent event: ${event.agentId ?? event.agent_id}${event.action}`);
const agentId = event.agentId ?? event.agent_id ?? 'unknown';
const action = event.action ?? 'unknown';
const eventData = event.eventData ?? '';
const eventTime = event.event_time ?? event.eventTime ?? new Date().toISOString();
this.logger.log(`[AGENT-STATE] ${agentId}${action}${eventData ? ` (${eventData})` : ''} at ${eventTime}`);
const mapped = this.mapOzonetelAction(action, eventData);
if (mapped) {
this.agentStates.set(agentId, { state: mapped, timestamp: eventTime });
this.agentStateSubject.next({ agentId, state: mapped, timestamp: eventTime });
this.logger.log(`[AGENT-STATE] Emitted: ${agentId}${mapped}`);
}
}
private mapOzonetelAction(action: string, eventData: string): AgentOzonetelState | null {
switch (action) {
case 'release': return 'ready';
case 'calling': return 'calling';
case 'incall': return 'in-call';
case 'ACW': return 'acw';
case 'logout': return 'offline';
case 'AUX':
// "changeMode" is the brief AUX during login — not a real pause
if (eventData === 'changeMode') return null;
if (eventData?.toLowerCase().includes('training')) return 'training';
return 'break';
case 'login': return null; // wait for release
default: return null;
}
}
getAgentState(agentId: string): AgentStateEntry | null {
return this.agentStates.get(agentId) ?? null;
}
emitForceLogout(agentId: string) {
this.logger.log(`[AGENT-STATE] Emitting force-logout for ${agentId}`);
this.agentStates.set(agentId, { state: 'offline', timestamp: new Date().toISOString() });
// Use a special state so frontend can distinguish admin force-logout from normal Ozonetel logout
this.agentStateSubject.next({ agentId, state: 'force-logout' as any, timestamp: new Date().toISOString() });
}
getActiveCalls(): ActiveCall[] {

View File

@@ -2,6 +2,16 @@ import { Controller, Post, Body, Headers, Logger } from '@nestjs/common';
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
import { ConfigService } from '@nestjs/config';
// Ozonetel sends all timestamps in IST — convert to UTC for storage
function istToUtc(istDateStr: string | null): string | null {
if (!istDateStr) return null;
// Parse as-is, then subtract 5:30 to get UTC
const d = new Date(istDateStr);
if (isNaN(d.getTime())) return null;
d.setMinutes(d.getMinutes() - 330); // IST is UTC+5:30
return d.toISOString();
}
@Controller('webhooks/ozonetel')
export class MissedCallWebhookController {
private readonly logger = new Logger(MissedCallWebhookController.name);
@@ -130,8 +140,8 @@ export class MissedCallWebhookController {
callStatus: data.callStatus,
callerNumber: { primaryPhoneNumber: `+91${data.callerPhone}` },
agentName: data.agentName,
startedAt: data.startTime ? new Date(data.startTime).toISOString() : null,
endedAt: data.endTime ? new Date(data.endTime).toISOString() : null,
startedAt: istToUtc(data.startTime),
endedAt: istToUtc(data.endTime),
durationSec: data.duration,
disposition: this.mapDisposition(data.disposition),
};

View File

@@ -3,6 +3,15 @@ import { ConfigService } from '@nestjs/config';
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
import { OzonetelAgentService } from '../ozonetel/ozonetel-agent.service';
// Ozonetel sends all timestamps in IST — convert to UTC for storage
export function istToUtc(istDateStr: string | null): string | null {
if (!istDateStr) return null;
const d = new Date(istDateStr);
if (isNaN(d.getTime())) return null;
d.setMinutes(d.getMinutes() - 330); // IST is UTC+5:30
return d.toISOString();
}
// Normalize phone to +91XXXXXXXXXX format
export function normalizePhone(raw: string): string {
let digits = raw.replace(/[^0-9]/g, '');
@@ -61,9 +70,31 @@ export class MissedQueueService implements OnModuleInit {
if (!phone || phone.length < 13) continue;
const did = call.did || '';
const callTime = call.callTime || new Date().toISOString();
const callTime = istToUtc(call.callTime) ?? new Date().toISOString();
try {
// Look up lead by phone number — strip +91 prefix for flexible matching
const phoneDigits = phone.replace(/^\+91/, '');
let leadId: string | null = null;
let leadName: string | null = null;
try {
const leadResult = await this.platform.query<any>(
`{ leads(first: 1, filter: {
contactPhone: { primaryPhoneNumber: { like: "%${phoneDigits}" } }
}) { edges { node { id contactName { firstName lastName } patientId } } } }`,
);
const matchedLead = leadResult?.leads?.edges?.[0]?.node;
if (matchedLead) {
leadId = matchedLead.id;
const fn = matchedLead.contactName?.firstName ?? '';
const ln = matchedLead.contactName?.lastName ?? '';
leadName = `${fn} ${ln}`.trim() || null;
this.logger.log(`Matched missed call ${phone} → lead ${leadId} (${leadName})`);
}
} catch (err) {
this.logger.warn(`Lead lookup failed for ${phone}: ${err}`);
}
const existing = await this.platform.query<any>(
`{ calls(first: 1, filter: {
callbackstatus: { eq: PENDING_CALLBACK },
@@ -75,29 +106,35 @@ export class MissedQueueService implements OnModuleInit {
if (existingNode) {
const newCount = (existingNode.missedcallcount || 1) + 1;
const updateParts = [
`missedcallcount: ${newCount}`,
`startedAt: "${callTime}"`,
`callsourcenumber: "${did}"`,
];
if (leadId) updateParts.push(`leadId: "${leadId}"`);
if (leadName) updateParts.push(`leadName: "${leadName}"`);
await this.platform.query<any>(
`mutation { updateCall(id: "${existingNode.id}", data: {
missedcallcount: ${newCount},
startedAt: "${callTime}",
callsourcenumber: "${did}"
}) { id } }`,
`mutation { updateCall(id: "${existingNode.id}", data: { ${updateParts.join(', ')} }) { id } }`,
);
updated++;
this.logger.log(`Dedup missed call ${phone}: count now ${newCount}`);
this.logger.log(`Dedup missed call ${phone}: count now ${newCount}${leadName ? ` (${leadName})` : ''}`);
} else {
const dataParts = [
`callStatus: MISSED`,
`direction: INBOUND`,
`callerNumber: { primaryPhoneNumber: "${phone}", primaryPhoneCallingCode: "+91" }`,
`callsourcenumber: "${did}"`,
`callbackstatus: PENDING_CALLBACK`,
`missedcallcount: 1`,
`startedAt: "${callTime}"`,
];
if (leadId) dataParts.push(`leadId: "${leadId}"`);
if (leadName) dataParts.push(`leadName: "${leadName}"`);
await this.platform.query<any>(
`mutation { createCall(data: {
callStatus: MISSED,
direction: INBOUND,
callerNumber: { primaryPhoneNumber: "${phone}", primaryPhoneCallingCode: "+91" },
callsourcenumber: "${did}",
callbackstatus: PENDING_CALLBACK,
missedcallcount: 1,
startedAt: "${callTime}"
}) { id } }`,
`mutation { createCall(data: { ${dataParts.join(', ')} }) { id } }`,
);
created++;
this.logger.log(`Created missed call record for ${phone}`);
this.logger.log(`Created missed call record for ${phone}${leadName ? `${leadName}` : ''}`);
}
} catch (err) {
this.logger.warn(`Failed to process abandon call ${ucid}: ${err}`);