feat(leads): sidecar polling service for auto-assigning unassigned leads

Replaces the untrusted platform function path (SDK's lead-auto-assign
was written but never deployed to either workspace — all leads created
after seeding are orphan).

Polls every 60s:
  1. Fetch up to 100 unassigned leads (assignedAgent empty or null)
  2. Fetch platform Agents whose live SupervisorService state is
     ready/calling/in-call/acw (skip offline/break/training/unknown)
  3. Build open-lead count per agent (single paginated query)
  4. Assign each unassigned lead to the least-loaded active agent —
     writes agent.name into lead.assignedAgent to match the worklist
     filter (assignedAgent: { eq: agentName })

Catches every lead-creation path: CSV import, enquiry form,
missed-call webhook, widget, livekit. No platform changes needed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-15 11:23:53 +05:30
parent 98f5bc0347
commit 34e053204f
3 changed files with 195 additions and 0 deletions

View File

@@ -0,0 +1,182 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PlatformGraphqlService } from '../platform/platform-graphql.service';
import { SupervisorService } from '../supervisor/supervisor.service';
const TICK_INTERVAL_MS = 60 * 1000; // 60s
const KICKOFF_DELAY_MS = 45_000; // let sidecar boot settle
const MAX_LEADS_PER_TICK = 100; // guard against runaway batches
const ACTIVE_STATES = new Set(['ready', 'calling', 'in-call', 'acw']);
// Excluded: 'offline' (agent logged out), 'break' / 'training' (explicitly away).
// ACW is included — the agent is still handling work and will return to Ready soon.
/**
* Polls for unassigned leads every 60s and assigns them least-loaded across
* active agents.
*
* Why polling instead of platform functions or Redpanda events:
* - The platform's lead.created hook isn't wired to the sidecar (no bridge)
* - The SDK's lead-auto-assign.function.ts is written but hasn't been
* deployed/published to either workspace
* - Polling catches EVERY lead creation path (CSV import, enquiry form,
* missed-call webhook, widget, livekit) with no per-path instrumentation
*
* Assignment strategy:
* - Count each active agent's OPEN leads (status in NEW/CONTACTED/QUALIFIED)
* - Pick the agent with the lowest count — ties broken by platform ordering
* - Write agent.name (display name) to lead.assignedAgent (worklist filter matches on this)
*
* Edge cases:
* - No active agents → skip tick; next run retries
* - agentName empty → skip agent
* - Mutation errors → log, continue with next lead
*/
@Injectable()
export class LeadAutoAssignService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(LeadAutoAssignService.name);
private timer: NodeJS.Timeout | null = null;
private running = false;
constructor(
private readonly platform: PlatformGraphqlService,
private readonly supervisor: SupervisorService,
) {}
onModuleInit() {
setTimeout(() => {
this.runOnce().catch((err) => this.logger.warn(`[AUTO-ASSIGN] Kickoff failed: ${err?.message ?? err}`));
}, KICKOFF_DELAY_MS);
this.timer = setInterval(() => {
this.runOnce().catch((err) => this.logger.warn(`[AUTO-ASSIGN] Tick failed: ${err?.message ?? err}`));
}, TICK_INTERVAL_MS);
}
onModuleDestroy() {
if (this.timer) clearInterval(this.timer);
}
async runOnce(): Promise<{ assigned: number; skipped: number; noAgents: boolean }> {
// Guard against concurrent runs (prev tick hasn't finished).
if (this.running) return { assigned: 0, skipped: 0, noAgents: false };
this.running = true;
try {
const unassigned = await this.fetchUnassignedLeads();
if (unassigned.length === 0) return { assigned: 0, skipped: 0, noAgents: false };
const active = await this.fetchActiveAgents();
if (active.length === 0) {
this.logger.debug(`[AUTO-ASSIGN] ${unassigned.length} leads waiting — no active agents`);
return { assigned: 0, skipped: unassigned.length, noAgents: true };
}
// Seed current-load map: lead count per agent across their OPEN leads.
// Fetch once per tick (not per lead) — the map is updated locally as we assign.
const loadByAgent = await this.fetchOpenLeadCounts(active.map((a) => a.name));
let assigned = 0;
let skipped = 0;
for (const lead of unassigned) {
// Pick the least-loaded active agent.
const target = [...active].sort(
(a, b) => (loadByAgent.get(a.name) ?? 0) - (loadByAgent.get(b.name) ?? 0),
)[0];
if (!target?.name) { skipped++; continue; }
try {
await this.platform.query<any>(
`mutation($id: UUID!, $data: LeadUpdateInput!) { updateLead(id: $id, data: $data) { id } }`,
{ id: lead.id, data: { assignedAgent: target.name } },
);
assigned++;
loadByAgent.set(target.name, (loadByAgent.get(target.name) ?? 0) + 1);
await new Promise((r) => setTimeout(r, 40)); // gentle pacing
} catch (err: any) {
this.logger.warn(`[AUTO-ASSIGN] updateLead failed for ${lead.id}: ${err?.message ?? err}`);
skipped++;
}
}
if (assigned > 0 || skipped > 0) {
const loadSummary = active.map((a) => `${a.name}=${loadByAgent.get(a.name) ?? 0}`).join(', ');
this.logger.log(`[AUTO-ASSIGN] Pass complete — assigned=${assigned} skipped=${skipped} load=[${loadSummary}]`);
}
return { assigned, skipped, noAgents: false };
} finally {
this.running = false;
}
}
private async fetchUnassignedLeads(): Promise<Array<{ id: string; campaignId: string | null }>> {
try {
const data: any = await this.platform.query<any>(
`{ leads(first: ${MAX_LEADS_PER_TICK}, filter: {
or: [
{ assignedAgent: { eq: "" } },
{ assignedAgent: { is: NULL } }
]
}, orderBy: [{ createdAt: AscNullsLast }]) {
edges { node { id campaignId } }
} }`,
);
return (data?.leads?.edges ?? []).map((e: any) => e.node);
} catch (err: any) {
this.logger.warn(`[AUTO-ASSIGN] fetch unassigned failed: ${err?.message ?? err}`);
return [];
}
}
private async fetchActiveAgents(): Promise<Array<{ id: string; name: string; ozonetelAgentId: string }>> {
try {
const data: any = await this.platform.query<any>(
`{ agents(first: 100) { edges { node { id name ozonetelAgentId } } } }`,
);
const all: Array<{ id: string; name: string; ozonetelAgentId: string }> =
(data?.agents?.edges ?? []).map((e: any) => e.node);
// Filter to agents whose in-memory state (from Ozonetel webhooks) is active.
// If state is unknown (never seen a state event), treat as offline.
return all.filter((a) => {
if (!a.name || !a.ozonetelAgentId) return false;
const entry = this.supervisor.getAgentState(a.ozonetelAgentId);
return entry ? ACTIVE_STATES.has(entry.state) : false;
});
} catch (err: any) {
this.logger.warn(`[AUTO-ASSIGN] fetch agents failed: ${err?.message ?? err}`);
return [];
}
}
private async fetchOpenLeadCounts(agentNames: string[]): Promise<Map<string, number>> {
const map = new Map<string, number>();
for (const name of agentNames) map.set(name, 0);
if (agentNames.length === 0) return map;
// Single aggregated query — pull ALL open leads with assignedAgent set,
// count by agent locally. Avoids N+1 over agents.
try {
let after: string | null = null;
for (let page = 0; page < 20; page++) {
const cursor: string = after ? `, after: "${after}"` : '';
const data: any = await this.platform.query<any>(
`{ leads(first: 200${cursor}, filter: {
status: { in: [NEW, CONTACTED, QUALIFIED] }
}) {
edges { node { assignedAgent } }
pageInfo { hasNextPage endCursor }
} }`,
);
const edges = data?.leads?.edges ?? [];
for (const e of edges) {
const name = e.node.assignedAgent;
if (name && map.has(name)) map.set(name, (map.get(name) ?? 0) + 1);
}
const info: { hasNextPage?: boolean; endCursor?: string } = data?.leads?.pageInfo ?? {};
if (!info.hasNextPage) break;
after = info.endCursor ?? null;
}
} catch (err: any) {
this.logger.warn(`[AUTO-ASSIGN] fetch open-lead counts failed: ${err?.message ?? err}`);
}
return map;
}
}

11
src/leads/leads.module.ts Normal file
View File

@@ -0,0 +1,11 @@
import { Module, forwardRef } from '@nestjs/common';
import { PlatformModule } from '../platform/platform.module';
import { SupervisorModule } from '../supervisor/supervisor.module';
import { LeadAutoAssignService } from './lead-auto-assign.service';
@Module({
imports: [PlatformModule, forwardRef(() => SupervisorModule)],
providers: [LeadAutoAssignService],
exports: [LeadAutoAssignService],
})
export class LeadsModule {}