mirror of
https://dev.azure.com/globalhealthx/EMR/_git/helix-engage-server
synced 2026-05-18 20:08:19 +00:00
Replaces the untrusted platform function path (the SDK's lead-auto-assign
function was written but never deployed to either workspace, so all leads
created after seeding are orphaned).
Polls every 60s:
1. Fetch up to 100 unassigned leads (assignedAgent empty or null)
2. Fetch platform Agents whose live SupervisorService state is
ready/calling/in-call/acw (skip offline/break/training/unknown)
3. Build open-lead count per agent (single paginated query)
4. Assign each unassigned lead to the least-loaded active agent —
writes agent.name into lead.assignedAgent to match the worklist
filter (assignedAgent: { eq: agentName })
Catches every lead-creation path: CSV import, enquiry form,
missed-call webhook, widget, livekit. No platform changes needed.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
183 lines
8.3 KiB
TypeScript
183 lines
8.3 KiB
TypeScript
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
|
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
|
|
import { SupervisorService } from '../supervisor/supervisor.service';
|
|
|
|
/** Interval between assignment passes. */
const TICK_INTERVAL_MS = 60 * 1000; // 60s

/** Delay before the first pass after module init — lets the sidecar boot settle. */
const KICKOFF_DELAY_MS = 45_000;

/** Maximum number of unassigned leads fetched per pass — guards against runaway batches. */
const MAX_LEADS_PER_TICK = 100;

/**
 * Agent states that make an agent eligible to receive new leads.
 *
 * Excluded: 'offline' (agent logged out), 'break' / 'training' (explicitly away).
 * ACW is included — the agent is still handling work and will return to Ready soon.
 *
 * Typed as ReadonlySet so this shared module-level set cannot be mutated by accident.
 */
const ACTIVE_STATES: ReadonlySet<string> = new Set(['ready', 'calling', 'in-call', 'acw']);
|
|
|
|
/**
 * Polls for unassigned leads every 60s and assigns them least-loaded across
 * active agents.
 *
 * Why polling instead of platform functions or Redpanda events:
 *   - The platform's lead.created hook isn't wired to the sidecar (no bridge)
 *   - The SDK's lead-auto-assign.function.ts is written but hasn't been
 *     deployed/published to either workspace
 *   - Polling catches EVERY lead creation path (CSV import, enquiry form,
 *     missed-call webhook, widget, livekit) with no per-path instrumentation
 *
 * Assignment strategy:
 *   - Count each active agent's OPEN leads (status in NEW/CONTACTED/QUALIFIED)
 *   - Pick the agent with the lowest count — ties broken by platform ordering
 *     (the first agent returned by the agents query wins)
 *   - Write agent.name (display name) to lead.assignedAgent (worklist filter matches on this)
 *
 * Edge cases:
 *   - No active agents → skip tick; next run retries
 *   - Agent with empty name or empty ozonetelAgentId → never considered
 *   - Mutation errors → log, count the lead as skipped, continue with next lead
 *   - Fetch errors → logged and treated as "nothing to do" / "zero load";
 *     the next tick retries from scratch
 */
@Injectable()
export class LeadAutoAssignService implements OnModuleInit, OnModuleDestroy {
  /** Page size used when scanning open leads to build the per-agent load map. */
  private static readonly OPEN_LEADS_PAGE_SIZE = 200;

  /** Hard cap on pages scanned per tick (20 × 200 = 4000 open leads). */
  private static readonly OPEN_LEADS_MAX_PAGES = 20;

  /** Pause after each successful mutation — gentle pacing for the platform API. */
  private static readonly ASSIGN_PACING_MS = 40;

  private readonly logger = new Logger(LeadAutoAssignService.name);

  /** Handle of the one-shot kickoff timer; kept so shutdown can cancel it. */
  private kickoffTimer: NodeJS.Timeout | null = null;

  /** Handle of the recurring tick timer. */
  private timer: NodeJS.Timeout | null = null;

  /** True while a pass is in flight; prevents overlapping passes. */
  private running = false;

  constructor(
    private readonly platform: PlatformGraphqlService,
    private readonly supervisor: SupervisorService,
  ) {}

  /**
   * Schedules the first pass after KICKOFF_DELAY_MS and a recurring pass every
   * TICK_INTERVAL_MS. Failures of a pass are logged, never thrown.
   */
  onModuleInit() {
    this.kickoffTimer = setTimeout(() => {
      this.kickoffTimer = null;
      this.runOnce().catch((err: unknown) =>
        this.logger.warn(`[AUTO-ASSIGN] Kickoff failed: ${LeadAutoAssignService.describeError(err)}`),
      );
    }, KICKOFF_DELAY_MS);

    this.timer = setInterval(() => {
      this.runOnce().catch((err: unknown) =>
        this.logger.warn(`[AUTO-ASSIGN] Tick failed: ${LeadAutoAssignService.describeError(err)}`),
      );
    }, TICK_INTERVAL_MS);
  }

  /**
   * Cancels both timers. The kickoff timer must be cleared as well, otherwise a
   * module destroyed within the kickoff delay would still fire one pass.
   */
  onModuleDestroy() {
    if (this.kickoffTimer) {
      clearTimeout(this.kickoffTimer);
      this.kickoffTimer = null;
    }
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  /**
   * Runs a single assignment pass.
   *
   * @returns `assigned` — leads successfully updated; `skipped` — leads left
   *   unassigned this pass (no agent available or mutation failed);
   *   `noAgents` — true when leads were waiting but no agent was active.
   *   Returns all-zero counts immediately if a previous pass is still running.
   */
  async runOnce(): Promise<{ assigned: number; skipped: number; noAgents: boolean }> {
    // Guard against concurrent runs (prev tick hasn't finished).
    if (this.running) return { assigned: 0, skipped: 0, noAgents: false };
    this.running = true;
    try {
      const unassigned = await this.fetchUnassignedLeads();
      if (unassigned.length === 0) return { assigned: 0, skipped: 0, noAgents: false };

      const active = await this.fetchActiveAgents();
      if (active.length === 0) {
        this.logger.debug(`[AUTO-ASSIGN] ${unassigned.length} leads waiting — no active agents`);
        return { assigned: 0, skipped: unassigned.length, noAgents: true };
      }

      // Seed current-load map: lead count per agent across their OPEN leads.
      // Fetched once per tick (not per lead) — the map is updated locally as we assign.
      const loadByAgent = await this.fetchOpenLeadCounts(active.map((a) => a.name));

      let assigned = 0;
      let skipped = 0;

      for (const lead of unassigned) {
        const target = this.pickLeastLoaded(active, loadByAgent);
        if (!target?.name) {
          skipped++;
          continue;
        }

        try {
          await this.platform.query<any>(
            `mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`,
            { id: lead.id, data: { assignedAgent: target.name } },
          );
          assigned++;
          loadByAgent.set(target.name, (loadByAgent.get(target.name) ?? 0) + 1);
          await new Promise<void>((resolve) =>
            setTimeout(resolve, LeadAutoAssignService.ASSIGN_PACING_MS),
          );
        } catch (err: unknown) {
          this.logger.warn(
            `[AUTO-ASSIGN] updateLead failed for ${lead.id}: ${LeadAutoAssignService.describeError(err)}`,
          );
          skipped++;
        }
      }

      if (assigned > 0 || skipped > 0) {
        const loadSummary = active.map((a) => `${a.name}=${loadByAgent.get(a.name) ?? 0}`).join(', ');
        this.logger.log(`[AUTO-ASSIGN] Pass complete — assigned=${assigned} skipped=${skipped} load=[${loadSummary}]`);
      }
      return { assigned, skipped, noAgents: false };
    } finally {
      this.running = false;
    }
  }

  /**
   * Returns the agent with the lowest current load. Uses a strict `<` so the
   * first agent in input order wins ties — the same result a stable ascending
   * sort would put at index 0, without copying and sorting the array per lead.
   */
  private pickLeastLoaded<T extends { name: string }>(
    agents: readonly T[],
    loadByAgent: ReadonlyMap<string, number>,
  ): T | undefined {
    let best: T | undefined;
    let bestLoad = Number.POSITIVE_INFINITY;
    for (const agent of agents) {
      const load = loadByAgent.get(agent.name) ?? 0;
      if (load < bestLoad) {
        best = agent;
        bestLoad = load;
      }
    }
    return best;
  }

  /**
   * Fetches up to MAX_LEADS_PER_TICK leads whose assignedAgent is empty or
   * null, oldest first. Returns an empty list (and logs) on query failure.
   */
  private async fetchUnassignedLeads(): Promise<Array<{ id: string; campaignId: string | null }>> {
    try {
      const data: any = await this.platform.query<any>(
        `{ leads(first: ${MAX_LEADS_PER_TICK}, filter: {
          or: [
            { assignedAgent: { eq: "" } },
            { assignedAgent: { is: NULL } }
          ]
        }, orderBy: [{ createdAt: AscNullsLast }]) {
          edges { node { id campaignId } }
        } }`,
      );
      // Drop malformed edges so a single null node cannot abort the whole pass.
      return (data?.leads?.edges ?? [])
        .map((e: any) => e?.node)
        .filter((node: any) => Boolean(node?.id));
    } catch (err: unknown) {
      this.logger.warn(`[AUTO-ASSIGN] fetch unassigned failed: ${LeadAutoAssignService.describeError(err)}`);
      return [];
    }
  }

  /**
   * Fetches the first 100 platform agents and keeps those that have a name, an
   * ozonetelAgentId, and a supervisor-reported state in ACTIVE_STATES.
   * Returns an empty list (and logs) on query failure.
   */
  private async fetchActiveAgents(): Promise<Array<{ id: string; name: string; ozonetelAgentId: string }>> {
    try {
      const data: any = await this.platform.query<any>(
        `{ agents(first: 100) { edges { node { id name ozonetelAgentId } } } }`,
      );
      const all: Array<{ id: string; name: string; ozonetelAgentId: string }> =
        (data?.agents?.edges ?? []).map((e: any) => e.node);
      // Filter to agents whose in-memory state (from Ozonetel webhooks) is active.
      // If state is unknown (never seen a state event), treat as offline.
      return all.filter((a) => {
        if (!a.name || !a.ozonetelAgentId) return false;
        const entry = this.supervisor.getAgentState(a.ozonetelAgentId);
        return entry ? ACTIVE_STATES.has(entry.state) : false;
      });
    } catch (err: unknown) {
      this.logger.warn(`[AUTO-ASSIGN] fetch agents failed: ${LeadAutoAssignService.describeError(err)}`);
      return [];
    }
  }

  /**
   * Builds a map of agent name → number of OPEN leads (NEW/CONTACTED/QUALIFIED)
   * currently assigned to that agent. Every name in `agentNames` is present in
   * the result (0 when the agent has none). Leads assigned to other names are
   * ignored.
   *
   * Best-effort: on query failure the counts gathered so far are returned.
   *
   * @param agentNames display names of the agents to count for
   */
  private async fetchOpenLeadCounts(agentNames: string[]): Promise<Map<string, number>> {
    const map = new Map<string, number>();
    for (const name of agentNames) map.set(name, 0);
    if (agentNames.length === 0) return map;

    const pageSize = LeadAutoAssignService.OPEN_LEADS_PAGE_SIZE;
    const maxPages = LeadAutoAssignService.OPEN_LEADS_MAX_PAGES;

    // Single aggregated scan — pull ALL open leads and count by agent locally.
    // Avoids N+1 over agents.
    try {
      let after: string | null = null;
      for (let page = 0; page < maxPages; page++) {
        // JSON.stringify yields a correctly escaped string literal, so a cursor
        // containing quotes or backslashes cannot break the query text.
        const cursor: string = after ? `, after: ${JSON.stringify(after)}` : '';
        const data: any = await this.platform.query<any>(
          `{ leads(first: ${pageSize}${cursor}, filter: {
            status: { in: [NEW, CONTACTED, QUALIFIED] }
          }) {
            edges { node { assignedAgent } }
            pageInfo { hasNextPage endCursor }
          } }`,
        );
        const edges: any[] = data?.leads?.edges ?? [];
        for (const e of edges) {
          const name = e?.node?.assignedAgent;
          if (name && map.has(name)) map.set(name, (map.get(name) ?? 0) + 1);
        }

        const info: { hasNextPage?: boolean; endCursor?: string | null } = data?.leads?.pageInfo ?? {};
        if (!info.hasNextPage) break;
        if (!info.endCursor) {
          // Without a cursor the next request would re-read page 1 and
          // count the same leads again — stop instead.
          this.logger.warn('[AUTO-ASSIGN] open-lead scan: hasNextPage without endCursor — stopping early');
          break;
        }
        after = info.endCursor;

        if (page === maxPages - 1) {
          this.logger.warn(
            `[AUTO-ASSIGN] open-lead scan hit the ${maxPages}-page cap — load counts may be incomplete`,
          );
        }
      }
    } catch (err: unknown) {
      this.logger.warn(`[AUTO-ASSIGN] fetch open-lead counts failed: ${LeadAutoAssignService.describeError(err)}`);
    }
    return map;
  }

  /**
   * Renders a caught value for log output: the `message` of an Error (or of any
   * object carrying a non-nullish `message`), otherwise the stringified value.
   */
  private static describeError(err: unknown): string {
    if (err instanceof Error) return err.message;
    if (typeof err === 'object' && err !== null && 'message' in err) {
      const message = (err as { message?: unknown }).message;
      if (message != null) return String(message);
    }
    return String(err);
  }
}
|