import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { RulesEngineService } from '../rules-engine.service';
import { RulesStorageService } from '../rules-storage.service';
import { PerformanceFactsProvider } from '../facts/performance-facts.provider';
import { PlatformGraphqlService } from '../../platform/platform-graphql.service';

const TICK_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
const KICKOFF_DELAY_MS = 90_000; // wait for boot to settle

/** Minimal agent shape selected by the GraphQL query in `fetchAgents`. */
type AgentRef = { id: string; name: string };

/**
 * Evaluates `on_schedule` performance rules every 5 minutes for the
 * platform Agents returned by `fetchAgents` (currently the first 100).
 * Facts come from PerformanceFactsProvider; matching rules dispatch the
 * escalate action which persists a PerformanceAlert.
 *
 * Skips quietly when no scheduled performance rules are configured.
 */
@Injectable()
export class PerformanceConsumer implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(PerformanceConsumer.name);

  /** Handle of the recurring tick; null until `onModuleInit` runs and after destroy. */
  private timer: NodeJS.Timeout | null = null;

  /** Handle of the one-shot delayed first run; null once it has fired or been cancelled. */
  private kickoffTimer: NodeJS.Timeout | null = null;

  constructor(
    private readonly engine: RulesEngineService,
    private readonly storage: RulesStorageService,
    private readonly facts: PerformanceFactsProvider,
    private readonly platform: PlatformGraphqlService,
  ) {}

  /**
   * Schedules a delayed first run (KICKOFF_DELAY_MS after init) and a
   * recurring run every TICK_INTERVAL_MS. Failures of either are logged
   * as warnings and never propagate.
   */
  onModuleInit() {
    // Keep the handle so onModuleDestroy can cancel a first run that has
    // not fired yet (e.g. the module is torn down during the boot delay).
    this.kickoffTimer = setTimeout(() => {
      this.kickoffTimer = null;
      this.runOnce().catch((err) => {
        this.logger.warn(`[PERF-CONSUMER] First run failed: ${err?.message ?? err}`);
      });
    }, KICKOFF_DELAY_MS);

    this.timer = setInterval(() => {
      this.runOnce().catch((err) => {
        this.logger.warn(`[PERF-CONSUMER] Tick failed: ${err?.message ?? err}`);
      });
    }, TICK_INTERVAL_MS);
  }

  /** Cancels both the pending first run and the recurring tick. */
  onModuleDestroy() {
    if (this.kickoffTimer) {
      clearTimeout(this.kickoffTimer);
      this.kickoffTimer = null;
    }
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
  }

  /**
   * Runs one evaluation pass over all fetched agents.
   *
   * @returns the number of agents scanned and the number of rule results
   *   that succeeded without being flagged as deduped. Both are 0 when
   *   there are no applicable rules or no agents.
   */
  async runOnce(): Promise<{ agentsScanned: number; alertsFired: number }> {
    // Storage.getByTrigger doesn't sub-discriminate on_schedule rules, so
    // filter to only those that reference agent.* facts in their conditions.
    // Anything else (e.g. SLA-breach rules over call.* facts) belongs to
    // other consumers.
    const allScheduled = await this.storage.getByTrigger('on_schedule');
    const rules = allScheduled.filter((r) => this.referencesAgentFacts(r.conditions));
    if (rules.length === 0) {
      this.logger.debug('[PERF-CONSUMER] No agent-fact on_schedule rules — skipping');
      return { agentsScanned: 0, alertsFired: 0 };
    }

    const agents = await this.fetchAgents();
    if (agents.length === 0) return { agentsScanned: 0, alertsFired: 0 };

    let alertsFired = 0;
    for (const agent of agents) {
      // One failing agent must not stop the pass; log and move on.
      try {
        const factContext = await this.facts.resolveFacts({ agentId: agent.id, agentName: agent.name });

        // Each rule's escalate action needs to know which fact value
        // to surface as the alert's value (e.g. "65m" for idle).
        // Inject _alertValue per-rule below.
        for (const rule of rules) {
          // Copy so one rule's _alertValue never leaks into the next rule's facts.
          const ruleFacts = { ...factContext };
          const valueFact = (rule.action?.params as any)?.valueFact as string | undefined;
          if (valueFact && ruleFacts[valueFact] != null) {
            ruleFacts['_alertValue'] = ruleFacts[valueFact];
          }

          // NOTE(review): evaluate() receives the trigger and category, not
          // `rule` itself. If it evaluates every matching rule on each call,
          // alerts may be counted once per loop iteration — confirm against
          // RulesEngineService.
          const result = await this.engine.evaluate('on_schedule', 'performance', ruleFacts);
          alertsFired += result.results.filter((r: any) => r.success && !r.data?.deduped).length;
        }
      } catch (err: any) {
        this.logger.warn(`[PERF-CONSUMER] Eval failed for agent=${agent.id}: ${err?.message ?? err}`);
      }
    }

    if (alertsFired > 0) {
      this.logger.log(`[PERF-CONSUMER] Tick complete — agents=${agents.length} alertsFired=${alertsFired}`);
    }
    return { agentsScanned: agents.length, alertsFired };
  }

  /**
   * Reports whether a condition group (or any nested group) contains a
   * condition whose `fact` starts with `agent.`.
   *
   * Both the `all` and `any` lists of a group are inspected, so a group
   * that carries both keys is scanned completely. Null entries and
   * non-array `all`/`any` values are ignored.
   *
   * @param group condition tree node with optional `all` / `any` arrays
   */
  private referencesAgentFacts(group: any): boolean {
    if (!group) return false;

    const items: any[] = [
      ...(Array.isArray(group.all) ? group.all : []),
      ...(Array.isArray(group.any) ? group.any : []),
    ];

    return items.some((item) => {
      if (!item) return false;
      // Nested group: the answer is decided by its own children.
      if (item.all || item.any) return this.referencesAgentFacts(item);
      return typeof item.fact === 'string' && item.fact.startsWith('agent.');
    });
  }

  /**
   * Fetches agent ids and names from the platform GraphQL API.
   *
   * Only the first 100 agents are requested; there is no pagination here.
   * Edges without a node are dropped. Any failure is logged and yields
   * an empty list so the caller simply skips the tick.
   */
  private async fetchAgents(): Promise<Array<{ id: string; name: string }>> {
    try {
      // Response shape is not visible from this file; treated as untyped
      // and read defensively.
      const data: any = await this.platform.query(
        `{ agents(first: 100) { edges { node { id name } } } }`,
      );
      const edges: Array<{ node?: AgentRef | null } | null> = data?.agents?.edges ?? [];
      return edges
        .map((edge) => edge?.node)
        .filter((node): node is AgentRef => node != null);
    } catch (err) {
      this.logger.warn(`[PERF-CONSUMER] Agent fetch failed: ${err}`);
      return [];
    }
  }
}