From 34e053204f20ace6c8419d3a117d6ca12caa428b Mon Sep 17 00:00:00 2001 From: saridsa2 Date: Wed, 15 Apr 2026 11:23:53 +0530 Subject: [PATCH] feat(leads): sidecar polling service for auto-assigning unassigned leads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the untrusted platform function path (SDK's lead-auto-assign was written but never deployed to either workspace — all leads created after seeding are orphan). Polls every 60s: 1. Fetch up to 100 unassigned leads (assignedAgent empty or null) 2. Fetch platform Agents whose live SupervisorService state is ready/calling/in-call/acw (skip offline/break/training/unknown) 3. Build open-lead count per agent (single paginated query) 4. Assign each unassigned lead to the least-loaded active agent — writes agent.name into lead.assignedAgent to match the worklist filter (assignedAgent: { eq: agentName }) Catches every lead-creation path: CSV import, enquiry form, missed-call webhook, widget, livekit. No platform changes needed. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/app.module.ts | 2 + src/leads/lead-auto-assign.service.ts | 182 ++++++++++++++++++++++++++ src/leads/leads.module.ts | 11 ++ 3 files changed, 195 insertions(+) create mode 100644 src/leads/lead-auto-assign.service.ts create mode 100644 src/leads/leads.module.ts diff --git a/src/app.module.ts b/src/app.module.ts index 680b687..6b18437 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -22,6 +22,7 @@ import { ConfigThemeModule } from './config/config-theme.module'; import { WidgetModule } from './widget/widget.module'; import { TeamModule } from './team/team.module'; import { MasterdataModule } from './masterdata/masterdata.module'; +import { LeadsModule } from './leads/leads.module'; import { TelephonyRegistrationService } from './telephony-registration.service'; @Module({ @@ -51,6 +52,7 @@ import { TelephonyRegistrationService } from './telephony-registration.service'; WidgetModule, TeamModule, MasterdataModule, + LeadsModule, ], providers: [TelephonyRegistrationService], }) diff --git a/src/leads/lead-auto-assign.service.ts b/src/leads/lead-auto-assign.service.ts new file mode 100644 index 0000000..0d7f682 --- /dev/null +++ b/src/leads/lead-auto-assign.service.ts @@ -0,0 +1,182 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { PlatformGraphqlService } from '../platform/platform-graphql.service'; +import { SupervisorService } from '../supervisor/supervisor.service'; + +const TICK_INTERVAL_MS = 60 * 1000; // 60s +const KICKOFF_DELAY_MS = 45_000; // let sidecar boot settle +const MAX_LEADS_PER_TICK = 100; // guard against runaway batches +const ACTIVE_STATES = new Set(['ready', 'calling', 'in-call', 'acw']); +// Excluded: 'offline' (agent logged out), 'break' / 'training' (explicitly away). +// ACW is included — the agent is still handling work and will return to Ready soon. + +/** + * Polls for unassigned leads every 60s and assigns them least-loaded across + * active agents. + * + * Why polling instead of platform functions or Redpanda events: + * - The platform's lead.created hook isn't wired to the sidecar (no bridge) + * - The SDK's lead-auto-assign.function.ts is written but hasn't been + * deployed/published to either workspace + * - Polling catches EVERY lead creation path (CSV import, enquiry form, + * missed-call webhook, widget, livekit) with no per-path instrumentation + * + * Assignment strategy: + * - Count each active agent's OPEN leads (status in NEW/CONTACTED/QUALIFIED) + * - Pick the agent with the lowest count — ties broken by platform ordering + * - Write agent.name (display name) to lead.assignedAgent (worklist filter matches on this) + * + * Edge cases: + * - No active agents → skip tick; next run retries + * - agentName empty → skip agent + * - Mutation errors → log, continue with next lead + */ +@Injectable() +export class LeadAutoAssignService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(LeadAutoAssignService.name); + private timer: NodeJS.Timeout | null = null; + private running = false; + + constructor( + private readonly platform: PlatformGraphqlService, + private readonly supervisor: SupervisorService, + ) {} + + onModuleInit() { + setTimeout(() => { + this.runOnce().catch((err) => this.logger.warn(`[AUTO-ASSIGN] Kickoff failed: ${err?.message ?? err}`)); + }, KICKOFF_DELAY_MS); + + this.timer = setInterval(() => { + this.runOnce().catch((err) => this.logger.warn(`[AUTO-ASSIGN] Tick failed: ${err?.message ?? err}`)); + }, TICK_INTERVAL_MS); + } + + onModuleDestroy() { + if (this.timer) clearInterval(this.timer); + } + + async runOnce(): Promise<{ assigned: number; skipped: number; noAgents: boolean }> { + // Guard against concurrent runs (prev tick hasn't finished). + if (this.running) return { assigned: 0, skipped: 0, noAgents: false }; + this.running = true; + try { + const unassigned = await this.fetchUnassignedLeads(); + if (unassigned.length === 0) return { assigned: 0, skipped: 0, noAgents: false }; + + const active = await this.fetchActiveAgents(); + if (active.length === 0) { + this.logger.debug(`[AUTO-ASSIGN] ${unassigned.length} leads waiting — no active agents`); + return { assigned: 0, skipped: unassigned.length, noAgents: true }; + } + + // Seed current-load map: lead count per agent across their OPEN leads. + // Fetch once per tick (not per lead) — the map is updated locally as we assign. + const loadByAgent = await this.fetchOpenLeadCounts(active.map((a) => a.name)); + + let assigned = 0; + let skipped = 0; + + for (const lead of unassigned) { + // Pick the least-loaded active agent. + const target = [...active].sort( + (a, b) => (loadByAgent.get(a.name) ?? 0) - (loadByAgent.get(b.name) ?? 0), + )[0]; + if (!target?.name) { skipped++; continue; } + + try { + await this.platform.query( + `mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`, + { id: lead.id, data: { assignedAgent: target.name } }, + ); + assigned++; + loadByAgent.set(target.name, (loadByAgent.get(target.name) ?? 0) + 1); + await new Promise((r) => setTimeout(r, 40)); // gentle pacing + } catch (err: any) { + this.logger.warn(`[AUTO-ASSIGN] updateLead failed for ${lead.id}: ${err?.message ?? err}`); + skipped++; + } + } + + if (assigned > 0 || skipped > 0) { + const loadSummary = active.map((a) => `${a.name}=${loadByAgent.get(a.name) ?? 0}`).join(', '); + this.logger.log(`[AUTO-ASSIGN] Pass complete — assigned=${assigned} skipped=${skipped} load=[${loadSummary}]`); + } + return { assigned, skipped, noAgents: false }; + } finally { + this.running = false; + } + } + + private async fetchUnassignedLeads(): Promise> { + try { + const data: any = await this.platform.query( + `{ leads(first: ${MAX_LEADS_PER_TICK}, filter: { + or: [ + { assignedAgent: { eq: "" } }, + { assignedAgent: { is: NULL } } + ] + }, orderBy: [{ createdAt: AscNullsLast }]) { + edges { node { id campaignId } } + } }`, + ); + return (data?.leads?.edges ?? []).map((e: any) => e.node); + } catch (err: any) { + this.logger.warn(`[AUTO-ASSIGN] fetch unassigned failed: ${err?.message ?? err}`); + return []; + } + } + + private async fetchActiveAgents(): Promise> { + try { + const data: any = await this.platform.query( + `{ agents(first: 100) { edges { node { id name ozonetelAgentId } } } }`, + ); + const all: Array<{ id: string; name: string; ozonetelAgentId: string }> = + (data?.agents?.edges ?? []).map((e: any) => e.node); + // Filter to agents whose in-memory state (from Ozonetel webhooks) is active. + // If state is unknown (never seen a state event), treat as offline. + return all.filter((a) => { + if (!a.name || !a.ozonetelAgentId) return false; + const entry = this.supervisor.getAgentState(a.ozonetelAgentId); + return entry ? ACTIVE_STATES.has(entry.state) : false; + }); + } catch (err: any) { + this.logger.warn(`[AUTO-ASSIGN] fetch agents failed: ${err?.message ?? err}`); + return []; + } + } + + private async fetchOpenLeadCounts(agentNames: string[]): Promise> { + const map = new Map(); + for (const name of agentNames) map.set(name, 0); + if (agentNames.length === 0) return map; + + // Single aggregated query — pull ALL open leads with assignedAgent set, + // count by agent locally. Avoids N+1 over agents. + try { + let after: string | null = null; + for (let page = 0; page < 20; page++) { + const cursor: string = after ? `, after: "${after}"` : ''; + const data: any = await this.platform.query( + `{ leads(first: 200${cursor}, filter: { + status: { in: [NEW, CONTACTED, QUALIFIED] } + }) { + edges { node { assignedAgent } } + pageInfo { hasNextPage endCursor } + } }`, + ); + const edges = data?.leads?.edges ?? []; + for (const e of edges) { + const name = e.node.assignedAgent; + if (name && map.has(name)) map.set(name, (map.get(name) ?? 0) + 1); + } + const info: { hasNextPage?: boolean; endCursor?: string } = data?.leads?.pageInfo ?? {}; + if (!info.hasNextPage) break; + after = info.endCursor ?? null; + } + } catch (err: any) { + this.logger.warn(`[AUTO-ASSIGN] fetch open-lead counts failed: ${err?.message ?? err}`); + } + return map; + } +} diff --git a/src/leads/leads.module.ts b/src/leads/leads.module.ts new file mode 100644 index 0000000..7299f3a --- /dev/null +++ b/src/leads/leads.module.ts @@ -0,0 +1,11 @@ +import { Module, forwardRef } from '@nestjs/common'; +import { PlatformModule } from '../platform/platform.module'; +import { SupervisorModule } from '../supervisor/supervisor.module'; +import { LeadAutoAssignService } from './lead-auto-assign.service'; + +@Module({ + imports: [PlatformModule, forwardRef(() => SupervisorModule)], + providers: [LeadAutoAssignService], + exports: [LeadAutoAssignService], +}) +export class LeadsModule {}