feat: real-time active call SSE — hold/unhold status for supervisor live monitor

- SupervisorService: added activeCallSubject (RxJS Subject), emits on all
  activeCalls Map mutations (Answered, Calling, Disconnect, Hold, Unhold)
- SupervisorController: new @Sse('active-calls/stream') endpoint
- OzonetelAgentController: callControl HOLD/UNHOLD updates activeCalls Map
  status via supervisor.updateCallStatus()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-17 05:45:14 +05:30
parent 9cf0f69dde
commit 9a016a2ed0
3 changed files with 33 additions and 4 deletions

View File

@@ -382,6 +382,13 @@ export class OzonetelAgentController {
try { try {
const result = await this.ozonetelAgent.callControl(body); const result = await this.ozonetelAgent.callControl(body);
if (body.action === 'HOLD') {
this.supervisor.updateCallStatus(body.ucid, 'on-hold');
} else if (body.action === 'UNHOLD') {
this.supervisor.updateCallStatus(body.ucid, 'active');
}
return result; return result;
} catch (error: any) { } catch (error: any) {
const message = error.response?.data?.message ?? error.message ?? 'Call control failed'; const message = error.response?.data?.message ?? error.message ?? 'Call control failed';

View File

@@ -13,6 +13,16 @@ export class SupervisorController {
return this.supervisor.getActiveCalls(); return this.supervisor.getActiveCalls();
} }
@Sse('active-calls/stream')
streamActiveCalls(): Observable<MessageEvent> {
this.logger.log('[SSE] Active calls stream opened');
return this.supervisor.activeCallSubject.pipe(
map(event => ({
data: JSON.stringify(event),
} as MessageEvent)),
);
}
@Get('team-performance') @Get('team-performance')
async getTeamPerformance(@Query('date') date?: string) { async getTeamPerformance(@Query('date') date?: string) {
const targetDate = date ?? new Date().toISOString().split('T')[0]; const targetDate = date ?? new Date().toISOString().split('T')[0];

View File

@@ -36,6 +36,7 @@ export class SupervisorService implements OnModuleInit {
private readonly agentStates = new Map<string, AgentStateEntry>(); private readonly agentStates = new Map<string, AgentStateEntry>();
private readonly acwTimers = new Map<string, NodeJS.Timeout>(); private readonly acwTimers = new Map<string, NodeJS.Timeout>();
readonly agentStateSubject = new Subject<{ agentId: string; state: AgentOzonetelState | string; timestamp: string }>(); readonly agentStateSubject = new Subject<{ agentId: string; state: AgentOzonetelState | string; timestamp: string }>();
readonly activeCallSubject = new Subject<{ type: 'update' | 'remove'; call?: ActiveCall; ucid: string }>();
// Worklist update stream — emitted when a missed call is created or // Worklist update stream — emitted when a missed call is created or
// assigned. Frontend SSE listener triggers an immediate worklist // assigned. Frontend SSE listener triggers an immediate worklist
// refresh so agents see new missed calls without waiting for the 30s poll. // refresh so agents see new missed calls without waiting for the 30s poll.
@@ -95,10 +96,9 @@ export class SupervisorService implements OnModuleInit {
this.logger.warn(`Ignoring call event for offline agent ${agentId} (${ucid})`); this.logger.warn(`Ignoring call event for offline agent ${agentId} (${ucid})`);
return; return;
} }
this.activeCalls.set(ucid, { const call: ActiveCall = { ucid, agentId, callerNumber, callType, startTime: eventTime, status: 'active' };
ucid, agentId, callerNumber, this.activeCalls.set(ucid, call);
callType, startTime: eventTime, status: 'active', this.activeCallSubject.next({ type: 'update', call, ucid });
});
this.logger.log(`Active call: ${agentId}${callerNumber} (${ucid})`); this.logger.log(`Active call: ${agentId}${callerNumber} (${ucid})`);
// Persist CALL_START as AgentEvent on the "Answered" moment // Persist CALL_START as AgentEvent on the "Answered" moment
@@ -130,6 +130,7 @@ export class SupervisorService implements OnModuleInit {
} else if (action === 'Disconnect') { } else if (action === 'Disconnect') {
const wasActive = this.activeCalls.get(ucid); const wasActive = this.activeCalls.get(ucid);
this.activeCalls.delete(ucid); this.activeCalls.delete(ucid);
this.activeCallSubject.next({ type: 'remove', ucid });
this.logger.log(`Call ended: ${ucid}`); this.logger.log(`Call ended: ${ucid}`);
// Persist CALL_END — pair against the start for duration. // Persist CALL_END — pair against the start for duration.
@@ -294,6 +295,17 @@ export class SupervisorService implements OnModuleInit {
// definitely stale (e.g. Disconnect webhook was dropped). // definitely stale (e.g. Disconnect webhook was dropped).
private static readonly NON_CALL_AGENT_STATES = new Set(['ready', 'offline', 'paused']); private static readonly NON_CALL_AGENT_STATES = new Set(['ready', 'offline', 'paused']);
updateCallStatus(ucid: string, status: 'active' | 'on-hold') {
const call = this.activeCalls.get(ucid);
if (!call) {
this.logger.warn(`[CALL-STATUS] No active call found for UCID ${ucid}`);
return;
}
call.status = status;
this.activeCallSubject.next({ type: 'update', call, ucid });
this.logger.log(`[CALL-STATUS] ${ucid}${status} (agent=${call.agentId})`);
}
getActiveCalls(): ActiveCall[] { getActiveCalls(): ActiveCall[] {
// Sweep stale entries before returning. The activeCalls Map is a // Sweep stale entries before returning. The activeCalls Map is a
// best-effort in-memory projection of Ozonetel call events — if // best-effort in-memory projection of Ozonetel call events — if