diff --git a/src/messaging/flow/default-flows/appointment-booking.json b/src/messaging/flow/default-flows/appointment-booking.json new file mode 100644 index 0000000..1a1bc43 --- /dev/null +++ b/src/messaging/flow/default-flows/appointment-booking.json @@ -0,0 +1,320 @@ +{ + "id": "flow-appointment-booking", + "name": "Appointment Booking", + "description": "AI-driven appointment booking via WhatsApp with interactive department, doctor, date, and slot selection.", + "trigger": { "type": "default" }, + "version": 1, + "status": "published", + "variables": [ + { "id": "v1", "name": "intent", "type": "string" }, + { "id": "v2", "name": "selectedDepartment", "type": "string" }, + { "id": "v3", "name": "selectedDepartmentTitle", "type": "string" }, + { "id": "v4", "name": "selectedDoctor", "type": "string" }, + { "id": "v5", "name": "selectedDoctorTitle", "type": "string" }, + { "id": "v6", "name": "doctorId", "type": "string" }, + { "id": "v7", "name": "dateChoice", "type": "string" }, + { "id": "v8", "name": "selectedDate", "type": "string" }, + { "id": "v9", "name": "selectedSlot", "type": "string" }, + { "id": "v10", "name": "confirmation", "type": "string" }, + { "id": "v11", "name": "bookingResult", "type": "object" }, + { "id": "v12", "name": "deptListResult", "type": "object" }, + { "id": "v13", "name": "docListResult", "type": "object" }, + { "id": "v14", "name": "slotListResult", "type": "object" }, + { "id": "v15", "name": "aiGreeting", "type": "string" }, + { "id": "v16", "name": "reason", "type": "string" } + ], + "groups": [ + { + "id": "g1", + "title": "Greeting", + "blocks": [ + { + "id": "b1", + "type": "ai", + "prompt": "Greet the patient {{_senderName}} warmly in 1-2 sentences. They messaged: \"{{_initialMessage}}\". You are a WhatsApp assistant for Ramaiah Hospital. Be concise, no markdown.", + "outputVariableId": "aiGreeting", + "sendToPatient": true + }, + { + "id": "b2", + "type": "message", + "content": { + "format": "buttons", + "text": "How can I help you today?", + "buttons": [ + { "id": "intent:book", "title": "Book Appointment" }, + { "id": "intent:check", "title": "Check Appointment" }, + { "id": "intent:question", "title": "Ask a Question" } + ] + } + }, + { + "id": "b3", + "type": "input", + "inputType": "any", + "variableId": "intent" + }, + { + "id": "b4", + "type": "condition", + "conditions": [ + { "id": "c1", "variableId": "intent", "operator": "contains", "value": "book" }, + { "id": "c2", "variableId": "intent", "operator": "contains", "value": "check" } + ] + } + ] + }, + { + "id": "g2", + "title": "Department Selection", + "blocks": [ + { + "id": "b5", + "type": "tool_call", + "toolName": "send_department_list", + "inputs": {}, + "outputVariableId": "deptListResult" + }, + { + "id": "b6", + "type": "input", + "inputType": "any", + "variableId": "selectedDepartment" + }, + { + "id": "b7", + "type": "set_variable", + "variableId": "selectedDepartmentTitle", + "value": "selectedDepartment", + "expression": "extract_id" + } + ] + }, + { + "id": "g3", + "title": "Doctor Selection", + "blocks": [ + { + "id": "b8", + "type": "tool_call", + "toolName": "send_doctor_list", + "inputs": { "department": "{{selectedDepartmentTitle}}" }, + "outputVariableId": "docListResult" + }, + { + "id": "b9", + "type": "input", + "inputType": "any", + "variableId": "selectedDoctor" + }, + { + "id": "b10", + "type": "set_variable", + "variableId": "doctorId", + "value": "selectedDoctor", + "expression": "extract_id" + } + ] + }, + { + "id": "g4", + "title": "Date Selection", + "blocks": [ + { + "id": "b11", + "type": "message", + "content": { + "format": "buttons", + "text": "When would you like to visit?", + "buttons": [ + { "id": "date:tomorrow", "title": "Tomorrow" }, + { "id": "date:day_after", "title": "Day After Tomorrow" }, + { "id": "date:other", "title": "Choose Another Date" } + ] + } + }, + { + "id": "b12", + "type": "input", + "inputType": "any", + "variableId": "dateChoice" + }, + { + "id": "b13", + "type": "condition", + "conditions": [ + { "id": "c3", "variableId": "dateChoice", "operator": "contains", "value": "tomorrow" }, + { "id": "c4", "variableId": "dateChoice", "operator": "contains", "value": "day_after" } + ] + }, + { + "id": "b14", + "type": "set_variable", + "variableId": "selectedDate", + "value": "", + "expression": "date_tomorrow" + } + ] + }, + { + "id": "g4a", + "title": "Date - Day After", + "blocks": [ + { + "id": "b15", + "type": "set_variable", + "variableId": "selectedDate", + "value": "", + "expression": "date_day_after" + } + ] + }, + { + "id": "g5", + "title": "Slot Selection", + "blocks": [ + { + "id": "b16", + "type": "tool_call", + "toolName": "send_slot_list", + "inputs": { + "doctorId": "{{doctorId}}", + "doctorName": "{{selectedDoctor_title}}", + "date": "{{selectedDate}}" + }, + "outputVariableId": "slotListResult" + }, + { + "id": "b17", + "type": "input", + "inputType": "any", + "variableId": "selectedSlot" + } + ] + }, + { + "id": "g6", + "title": "Reason", + "blocks": [ + { + "id": "b18", + "type": "message", + "content": { + "format": "text", + "text": "What is the reason for your visit? (e.g., General Consultation, Follow-up, etc.)" + } + }, + { + "id": "b19", + "type": "input", + "inputType": "text", + "variableId": "reason" + } + ] + }, + { + "id": "g7", + "title": "Confirmation", + "blocks": [ + { + "id": "b20", + "type": "tool_call", + "toolName": "send_confirm_buttons", + "inputs": { + "summary": "Appointment Summary:\nDoctor: {{selectedDoctor_title}}\nDate: {{selectedDate}}\nTime: {{selectedSlot_title}}\nReason: {{reason}}\n\nShall I confirm this booking?" + } + }, + { + "id": "b21", + "type": "input", + "inputType": "any", + "variableId": "confirmation" + }, + { + "id": "b22", + "type": "condition", + "conditions": [ + { "id": "c5", "variableId": "confirmation", "operator": "contains", "value": "confirm" }, + { "id": "c6", "variableId": "confirmation", "operator": "contains", "value": "cancel" } + ] + } + ] + }, + { + "id": "g8", + "title": "Booking", + "blocks": [ + { + "id": "b23", + "type": "tool_call", + "toolName": "book_appointment", + "inputs": { + "patientName": "{{_senderName}}", + "phoneNumber": "{{_phone}}", + "department": "{{selectedDepartmentTitle}}", + "doctorName": "{{selectedDoctor_title}}", + "scheduledAt": "{{selectedSlot}}", + "reason": "{{reason}}" + }, + "outputVariableId": "bookingResult" + }, + { + "id": "b24", + "type": "message", + "content": { + "format": "text", + "text": "Your appointment is confirmed!\n\nDoctor: {{selectedDoctor_title}}\nDate: {{selectedDate}}\nTime: {{selectedSlot_title}}\nReason: {{reason}}\n\nThank you for choosing Ramaiah Hospital. See you soon!" + } + } + ] + }, + { + "id": "g9", + "title": "Cancelled", + "blocks": [ + { + "id": "b25", + "type": "message", + "content": { + "format": "text", + "text": "No problem! Your booking has been cancelled. Feel free to message us again whenever you'd like to book an appointment." + } + } + ] + }, + { + "id": "g10", + "title": "Check Appointments", + "blocks": [ + { + "id": "b26", + "type": "tool_call", + "toolName": "lookup_appointments", + "inputs": {}, + "outputVariableId": "existingAppts" + }, + { + "id": "b27", + "type": "ai", + "prompt": "The patient {{_senderName}} asked to check their appointments. Here are their appointments: {{existingAppts}}. Summarize them in a friendly WhatsApp message. If no appointments, say they have none and offer to book one. Be concise, no markdown.", + "outputVariableId": "apptSummary", + "sendToPatient": true + } + ] + } + ], + "edges": [ + { "id": "e1", "from": { "blockId": "b4", "conditionId": "c1" }, "to": { "groupId": "g2" } }, + { "id": "e2", "from": { "blockId": "b4", "conditionId": "c2" }, "to": { "groupId": "g10" } }, + { "id": "e3", "from": { "blockId": "b7" }, "to": { "groupId": "g3" } }, + { "id": "e4", "from": { "blockId": "b10" }, "to": { "groupId": "g4" } }, + { "id": "e5", "from": { "blockId": "b13", "conditionId": "c3" }, "to": { "groupId": "g5" } }, + { "id": "e6", "from": { "blockId": "b13", "conditionId": "c4" }, "to": { "groupId": "g4a" } }, + { "id": "e7", "from": { "blockId": "b14" }, "to": { "groupId": "g5" } }, + { "id": "e8", "from": { "blockId": "b15" }, "to": { "groupId": "g5" } }, + { "id": "e9", "from": { "blockId": "b17" }, "to": { "groupId": "g6" } }, + { "id": "e10", "from": { "blockId": "b19" }, "to": { "groupId": "g7" } }, + { "id": "e11", "from": { "blockId": "b22", "conditionId": "c5" }, "to": { "groupId": "g8" } }, + { "id": "e12", "from": { "blockId": "b22", "conditionId": "c6" }, "to": { "groupId": "g9" } } + ] +} diff --git a/src/messaging/flow/flow-execution.service.ts b/src/messaging/flow/flow-execution.service.ts new file mode 100644 index 0000000..770aad5 --- /dev/null +++ b/src/messaging/flow/flow-execution.service.ts @@ -0,0 +1,325 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { generateText, stepCountIs } from 'ai'; +import { createAiModel } from '../../ai/ai-provider'; +import { AiConfigService } from '../../config/ai-config.service'; +import { CallerResolutionService } from '../../caller/caller-resolution.service'; +import { CallerContextService } from '../../caller/caller-context.service'; +import { PlatformGraphqlService } from '../../platform/platform-graphql.service'; +import { MessagingProvider } from '../providers/messaging-provider.interface'; +import { FlowSessionService } from './flow-session.service'; +import { FlowStoreService } from './flow-store.service'; +import { FlowVariableService } from './flow-variable.service'; +import { ToolRegistry } from './tool-registry'; +import type { Flow, FlowSession, Group, Block, ConditionBlock, ToolContext } from './flow-types'; +import type { NormalizedMessage } from '../types'; +import type { LanguageModel } from 'ai'; + +@Injectable() +export class FlowExecutionService { + private readonly logger = new Logger(FlowExecutionService.name); + private readonly aiModel: LanguageModel | null; + private readonly auth: string; + + constructor( + private config: ConfigService, + private provider: MessagingProvider, + private sessions: FlowSessionService, + private store: FlowStoreService, + private variables: FlowVariableService, + private tools: ToolRegistry, + private caller: CallerResolutionService, + private callerContext: CallerContextService, + private platform: PlatformGraphqlService, + private aiConfig: AiConfigService, + ) { + const cfg = aiConfig.getConfig(); + this.aiModel = createAiModel({ + provider: cfg.provider, + model: cfg.model, + anthropicApiKey: config.get('ai.anthropicApiKey'), + openaiApiKey: config.get('ai.openaiApiKey'), + }); + const apiKey = config.get('platform.apiKey') ?? ''; + this.auth = apiKey ? `Bearer ${apiKey}` : ''; + } + + async handleMessage(message: NormalizedMessage): Promise { + const { phone } = message; + + // 1. Load existing session or start new flow + let session = await this.sessions.load(phone); + let flow: Flow | null = null; + + if (session) { + flow = this.store.getById(session.flowId); + if (!flow) { + this.logger.warn(`[FLOW] Flow ${session.flowId} not found — clearing session`); + await this.sessions.clear(phone); + session = null; + } + } + + if (!session) { + flow = this.store.matchFlow(message.text); + if (!flow) { + this.logger.log(`[FLOW] No matching flow for: ${message.text.substring(0, 50)}`); + await this.provider.sendText(phone, 'Sorry, I didn\'t understand. Please try again.'); + return; + } + + // Initialize session + const firstGroup = flow.groups[0]; + if (!firstGroup) { + this.logger.error(`[FLOW] Flow ${flow.id} has no groups`); + return; + } + + session = { + flowId: flow.id, + currentGroupId: firstGroup.id, + currentBlockIndex: 0, + variables: this.initializeVariables(flow, message), + startedAt: Date.now(), + lastActiveAt: Date.now(), + }; + + // Resolve caller and inject context variables + const resolved = await this.caller.resolve(phone, this.auth).catch(() => null); + if (resolved) { + session.variables['_callerName'] = `${resolved.firstName} ${resolved.lastName}`.trim(); + session.variables['_leadId'] = resolved.leadId; + session.variables['_patientId'] = resolved.patientId; + session.variables['_isNew'] = resolved.isNew; + session.variables['_phone'] = phone; + } + + this.logger.log(`[FLOW] Started flow "${flow.name}" for ${phone}`); + } + + // 2. If paused at an InputBlock, process the reply + const currentGroup = flow!.groups.find(g => g.id === session!.currentGroupId); + if (currentGroup) { + const currentBlock = currentGroup.blocks[session!.currentBlockIndex]; + if (currentBlock?.type === 'input') { + const value = message.interactiveReply?.id ?? message.text; + session!.variables[currentBlock.variableId] = value; + + // Also store the display title for interactive replies + if (message.interactiveReply?.title) { + session!.variables[currentBlock.variableId + '_title'] = message.interactiveReply.title; + } + + this.logger.log(`[FLOW] Input received: ${currentBlock.variableId}=${value}`); + session!.currentBlockIndex++; + } + } + + // 3. Walk forward + await this.walkForward(phone, session!, flow!); + } + + private async walkForward(phone: string, session: FlowSession, flow: Flow): Promise { + let iterations = 0; + const maxIterations = 50; // safety valve + + while (iterations++ < maxIterations) { + const group = flow.groups.find(g => g.id === session.currentGroupId); + if (!group) { + this.logger.log(`[FLOW] Group ${session.currentGroupId} not found — flow complete`); + await this.sessions.clear(phone); + return; + } + + // End of group — follow outgoing edge + if (session.currentBlockIndex >= group.blocks.length) { + const edge = this.findGroupEdge(flow, group); + if (!edge) { + this.logger.log(`[FLOW] No outgoing edge from group "${group.title}" — flow complete`); + await this.sessions.clear(phone); + return; + } + session.currentGroupId = edge.to.groupId; + session.currentBlockIndex = 0; + continue; + } + + const block = group.blocks[session.currentBlockIndex]; + this.logger.log(`[FLOW] Executing block ${block.id} (${block.type}) in group "${group.title}"`); + + const shouldStop = await this.executeBlock(block, phone, session, flow); + if (shouldStop) { + await this.sessions.save(phone, session); + return; + } + } + + this.logger.error(`[FLOW] Max iterations reached for ${phone} — possible infinite loop`); + await this.sessions.clear(phone); + } + + // Returns true if execution should pause (InputBlock) + private async executeBlock(block: Block, phone: string, session: FlowSession, flow: Flow): Promise { + const ctx: ToolContext = { + phone, + session, + provider: this.provider, + platform: this.platform, + auth: this.auth, + }; + + switch (block.type) { + case 'message': { + const content = block.content; + if (content.format === 'text') { + const text = this.variables.interpolate(content.text, session.variables); + await this.provider.sendText(phone, text); + } else if (content.format === 'buttons') { + const text = this.variables.interpolate(content.text, session.variables); + await this.provider.sendButtons(phone, text, content.buttons); + } else if (content.format === 'list') { + const text = this.variables.interpolate(content.text, session.variables); + await this.provider.sendList(phone, text, content.buttonText, content.sections); + } + session.currentBlockIndex++; + return false; + } + + case 'input': { + // Pause — wait for next message + this.logger.log(`[FLOW] Waiting for input → ${block.variableId}`); + return true; + } + + case 'condition': { + const matched = this.evaluateConditions(block, session); + if (matched) { + const edge = flow.edges.find(e => + e.from.blockId === block.id && e.from.conditionId === matched.id, + ); + if (edge) { + session.currentGroupId = edge.to.groupId; + session.currentBlockIndex = 0; + return false; + } + } + // No match — fall through to next block + session.currentBlockIndex++; + return false; + } + + case 'set_variable': { + if (block.expression) { + const rawValue = session.variables[block.value] ?? block.value; + session.variables[block.variableId] = this.variables.evaluateExpression( + block.expression, String(rawValue), session.variables, + ); + } else { + session.variables[block.variableId] = this.variables.interpolate(block.value, session.variables); + } + this.logger.log(`[FLOW] Set ${block.variableId}=${session.variables[block.variableId]}`); + session.currentBlockIndex++; + return false; + } + + case 'tool_call': { + const inputs = this.variables.interpolateObject(block.inputs, session.variables); + const result = await this.tools.execute(block.toolName, inputs, ctx); + if (block.outputVariableId) { + session.variables[block.outputVariableId] = result; + } + session.currentBlockIndex++; + return false; + } + + case 'ai': { + if (!this.aiModel) { + session.currentBlockIndex++; + return false; + } + const prompt = this.variables.interpolate(block.prompt, session.variables); + try { + const result = await generateText({ + model: this.aiModel, + prompt, + stopWhen: stepCountIs(1), + }); + const text = result.text?.trim() ?? ''; + if (block.outputVariableId) { + session.variables[block.outputVariableId] = text; + } + if (block.sendToPatient && text) { + await this.provider.sendText(phone, text); + } + } catch (err: any) { + this.logger.error(`[FLOW] AI block failed: ${err.message}`); + } + session.currentBlockIndex++; + return false; + } + + case 'jump': { + session.currentGroupId = block.targetGroupId; + session.currentBlockIndex = 0; + return false; + } + + default: + this.logger.warn(`[FLOW] Unknown block type: ${(block as any).type}`); + session.currentBlockIndex++; + return false; + } + } + + private evaluateConditions(block: ConditionBlock, session: FlowSession) { + for (const cond of block.conditions) { + const value = session.variables[cond.variableId]; + const target = cond.value ? this.variables.interpolate(cond.value, session.variables) : undefined; + + let match = false; + switch (cond.operator) { + case 'equals': match = String(value) === target; break; + case 'contains': match = String(value ?? '').toLowerCase().includes((target ?? '').toLowerCase()); break; + case 'exists': match = value !== undefined && value !== null && value !== ''; break; + case 'not_exists': match = value === undefined || value === null || value === ''; break; + case 'starts_with': match = String(value ?? '').startsWith(target ?? ''); break; + case 'gt': match = Number(value) > Number(target); break; + case 'lt': match = Number(value) < Number(target); break; + } + + if (match) return cond; + } + return null; + } + + private findGroupEdge(flow: Flow, group: Group) { + // Find edge from the last block in the group (default outgoing) + const lastBlock = group.blocks[group.blocks.length - 1]; + if (lastBlock) { + const edge = flow.edges.find(e => e.from.blockId === lastBlock.id && !e.from.conditionId); + if (edge) return edge; + } + // Fallback: any edge from any block in this group without conditionId + for (const block of group.blocks) { + const edge = flow.edges.find(e => e.from.blockId === block.id && !e.from.conditionId); + if (edge) return edge; + } + return null; + } + + private initializeVariables(flow: Flow, message: NormalizedMessage): Record { + const vars: Record = {}; + for (const v of flow.variables) { + vars[v.name] = v.defaultValue ?? null; + } + // Inject message context + vars['_initialMessage'] = message.text; + vars['_senderName'] = message.name; + return vars; + } + + // Check if flow engine has any published flows + hasFlows(): boolean { + return this.store.getAll().some(f => f.status === 'published'); + } +} diff --git a/src/messaging/flow/flow-session.service.ts b/src/messaging/flow/flow-session.service.ts new file mode 100644 index 0000000..ff66a5f --- /dev/null +++ b/src/messaging/flow/flow-session.service.ts @@ -0,0 +1,39 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; +import type { FlowSession } from './flow-types'; + +@Injectable() +export class FlowSessionService { + private readonly logger = new Logger(FlowSessionService.name); + private readonly redis: Redis; + private readonly ttlSec = 24 * 60 * 60; // 24h + + constructor(config: ConfigService) { + const redisUrl = config.get('redis.url') ?? 'redis://localhost:6379'; + this.redis = new Redis(redisUrl); + } + + private key(phone: string): string { + return `wa:flow:${phone}`; + } + + async load(phone: string): Promise { + const raw = await this.redis.get(this.key(phone)); + if (!raw) return null; + try { + return JSON.parse(raw); + } catch { + return null; + } + } + + async save(phone: string, session: FlowSession): Promise { + session.lastActiveAt = Date.now(); + await this.redis.setex(this.key(phone), this.ttlSec, JSON.stringify(session)); + } + + async clear(phone: string): Promise { + await this.redis.del(this.key(phone)); + } +} diff --git a/src/messaging/flow/flow-store.service.ts b/src/messaging/flow/flow-store.service.ts new file mode 100644 index 0000000..e66fefc --- /dev/null +++ b/src/messaging/flow/flow-store.service.ts @@ -0,0 +1,102 @@ +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { existsSync, readFileSync, writeFileSync, readdirSync } from 'fs'; +import { join } from 'path'; +import type { Flow } from './flow-types'; + +const FLOWS_DIR = join(process.cwd(), 'data', 'flows'); +const DEFAULTS_DIR = join(__dirname, 'default-flows'); + +@Injectable() +export class FlowStoreService implements OnModuleInit { + private readonly logger = new Logger(FlowStoreService.name); + private flows: Map = new Map(); + + onModuleInit() { + this.ensureDirectory(); + this.seedDefaults(); + this.loadAll(); + } + + private ensureDirectory() { + const { mkdirSync } = require('fs'); + if (!existsSync(FLOWS_DIR)) { + mkdirSync(FLOWS_DIR, { recursive: true }); + } + } + + private seedDefaults() { + // Copy default flows if data/flows/ is empty + if (!existsSync(DEFAULTS_DIR)) return; + const existing = readdirSync(FLOWS_DIR).filter(f => f.endsWith('.json')); + if (existing.length > 0) return; + + const defaults = readdirSync(DEFAULTS_DIR).filter(f => f.endsWith('.json')); + for (const file of defaults) { + const src = join(DEFAULTS_DIR, file); + const dest = join(FLOWS_DIR, file); + const content = readFileSync(src, 'utf-8'); + writeFileSync(dest, content); + this.logger.log(`[FLOW-STORE] Seeded default flow: ${file}`); + } + } + + private loadAll() { + this.flows.clear(); + const files = readdirSync(FLOWS_DIR).filter(f => f.endsWith('.json')); + for (const file of files) { + try { + const raw = readFileSync(join(FLOWS_DIR, file), 'utf-8'); + const flow: Flow = JSON.parse(raw); + this.flows.set(flow.id, flow); + this.logger.log(`[FLOW-STORE] Loaded flow: ${flow.name} (${flow.id}) status=${flow.status}`); + } catch (err: any) { + this.logger.error(`[FLOW-STORE] Failed to load ${file}: ${err.message}`); + } + } + this.logger.log(`[FLOW-STORE] ${this.flows.size} flow(s) loaded`); + } + + getById(id: string): Flow | null { + return this.flows.get(id) ?? null; + } + + // Match inbound message to a published flow by trigger + matchFlow(messageText: string): Flow | null { + let defaultFlow: Flow | null = null; + + for (const flow of this.flows.values()) { + if (flow.status !== 'published') continue; + + if (flow.trigger.type === 'default') { + defaultFlow = flow; + continue; + } + + if (flow.trigger.type === 'message' && flow.trigger.conditions) { + const { keywords, regex } = flow.trigger.conditions; + const lower = messageText.toLowerCase(); + + if (keywords?.some(k => lower.includes(k.toLowerCase()))) { + return flow; + } + if (regex && new RegExp(regex, 'i').test(messageText)) { + return flow; + } + } + } + + return defaultFlow; + } + + // CRUD for admin API (future) + getAll(): Flow[] { + return Array.from(this.flows.values()); + } + + save(flow: Flow): void { + this.flows.set(flow.id, flow); + const file = join(FLOWS_DIR, `${flow.id}.json`); + writeFileSync(file, JSON.stringify(flow, null, 2)); + this.logger.log(`[FLOW-STORE] Saved flow: ${flow.name} (${flow.id})`); + } +} diff --git a/src/messaging/flow/flow-types.ts b/src/messaging/flow/flow-types.ts new file mode 100644 index 0000000..31656ec --- /dev/null +++ b/src/messaging/flow/flow-types.ts @@ -0,0 +1,133 @@ +// ── Flow Definition ── + +export type Flow = { + id: string; + name: string; + description: string; + trigger: FlowTrigger; + groups: Group[]; + edges: Edge[]; + variables: VariableDefinition[]; + version: number; + status: 'draft' | 'published'; +}; + +export type FlowTrigger = + | { type: 'message'; conditions?: { keywords?: string[]; regex?: string } } + | { type: 'default' }; + +export type VariableDefinition = { + id: string; + name: string; + type: 'string' | 'number' | 'boolean' | 'object' | 'array'; + defaultValue?: any; +}; + +// ── Groups & Edges ── + +export type Group = { + id: string; + title: string; + blocks: Block[]; +}; + +export type Edge = { + id: string; + from: { blockId: string; conditionId?: string }; + to: { groupId: string; blockId?: string }; +}; + +// ── Blocks ── + +export type Block = + | MessageBlock + | InputBlock + | ConditionBlock + | SetVariableBlock + | ToolCallBlock + | AIBlock + | JumpBlock; + +export type MessageBlock = { + id: string; + type: 'message'; + content: + | { format: 'text'; text: string } + | { format: 'buttons'; text: string; buttons: { id: string; title: string }[] } + | { format: 'list'; text: string; buttonText: string; sections: { title: string; rows: { id: string; title: string; description?: string }[] }[] }; +}; + +export type InputBlock = { + id: string; + type: 'input'; + inputType: 'text' | 'interactive_reply' | 'any'; + variableId: string; + validation?: { regex?: string; errorMessage?: string }; +}; + +export type ConditionBlock = { + id: string; + type: 'condition'; + conditions: { + id: string; + variableId: string; + operator: 'equals' | 'contains' | 'exists' | 'not_exists' | 'gt' | 'lt' | 'starts_with'; + value?: string; + }[]; +}; + +export type SetVariableBlock = { + id: string; + type: 'set_variable'; + variableId: string; + value: string; + expression?: 'extract_id' | 'date_tomorrow' | 'date_day_after'; +}; + +export type ToolCallBlock = { + id: string; + type: 'tool_call'; + toolName: string; + inputs: Record; + outputVariableId?: string; +}; + +export type AIBlock = { + id: string; + type: 'ai'; + prompt: string; + outputVariableId?: string; + sendToPatient: boolean; +}; + +export type JumpBlock = { + id: string; + type: 'jump'; + targetGroupId: string; +}; + +// ── Session State ── + +export type FlowSession = { + flowId: string; + currentGroupId: string; + currentBlockIndex: number; + variables: Record; + startedAt: number; + lastActiveAt: number; +}; + +// ── Tool Registry ── + +export type ToolHandler = ( + inputs: Record, + context: ToolContext, +) => Promise; + +export type ToolContext = { + phone: string; + session: FlowSession; + provider: import('../providers/messaging-provider.interface').MessagingProvider; + platform: import('../../platform/platform-graphql.service').PlatformGraphqlService; + auth: string; +}; diff --git a/src/messaging/flow/flow-variable.service.ts b/src/messaging/flow/flow-variable.service.ts new file mode 100644 index 0000000..0a20066 --- /dev/null +++ b/src/messaging/flow/flow-variable.service.ts @@ -0,0 +1,44 @@ +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class FlowVariableService { + // Replace {{variableName}} with values from session variables + interpolate(template: string, variables: Record): string { + return template.replace(/\{\{(\w+)\}\}/g, (match, name) => { + const value = variables[name]; + if (value === undefined || value === null) return match; // keep placeholder if unresolved + if (typeof value === 'object') return JSON.stringify(value); + return String(value); + }); + } + + // Interpolate all string values in an object + interpolateObject(obj: Record, variables: Record): Record { + const result: Record = {}; + for (const [key, value] of Object.entries(obj)) { + result[key] = this.interpolate(value, variables); + } + return result; + } + + // Execute expressions for SetVariableBlock + evaluateExpression(expression: string, value: string, variables: Record): any { + switch (expression) { + case 'extract_id': { + // Extract UUID from "doc:{uuid}:{name}" or "dept:{name}" or "slot:{id}:{datetime}" + const parts = value.split(':'); + return parts.length >= 2 ? parts[1] : value; + } + case 'date_tomorrow': { + const d = new Date(Date.now() + 86400000); + return d.toISOString().split('T')[0]; + } + case 'date_day_after': { + const d = new Date(Date.now() + 2 * 86400000); + return d.toISOString().split('T')[0]; + } + default: + return this.interpolate(value, variables); + } + } +} diff --git a/src/messaging/flow/tool-registry.ts b/src/messaging/flow/tool-registry.ts new file mode 100644 index 0000000..491322d --- /dev/null +++ b/src/messaging/flow/tool-registry.ts @@ -0,0 +1,217 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { PlatformGraphqlService } from '../../platform/platform-graphql.service'; +import { CallerResolutionService } from '../../caller/caller-resolution.service'; +import { DOCTOR_VISIT_SLOTS_FRAGMENT, normalizeDoctors } from '../../shared/doctor-utils'; +import type { ToolHandler, ToolContext } from './flow-types'; +import type { ListSection, InteractiveButton } from '../types'; + +@Injectable() +export class ToolRegistry { + private readonly logger = new Logger(ToolRegistry.name); + private readonly tools: Map = new Map(); + + constructor( + private platform: PlatformGraphqlService, + private caller: CallerResolutionService, + ) { + this.registerDefaults(); + } + + register(name: string, handler: ToolHandler) { + this.tools.set(name, handler); + } + + async execute(name: string, inputs: Record, context: ToolContext): Promise { + const handler = this.tools.get(name); + if (!handler) { + this.logger.error(`[TOOL] Unknown tool: ${name}`); + return { error: `Unknown tool: ${name}` }; + } + this.logger.log(`[TOOL] ${name} inputs=${JSON.stringify(inputs).substring(0, 200)}`); + const result = await handler(inputs, context); + this.logger.log(`[TOOL] ${name} result=${JSON.stringify(result).substring(0, 200)}`); + return result; + } + + private registerDefaults() { + this.register('resolve_caller', async (inputs, ctx) => { + const phone = inputs.phone ?? ctx.phone; + const resolved = await this.caller.resolve(phone, ctx.auth).catch(() => null); + return resolved ?? { isNew: true, leadId: '', patientId: '', phone }; + }); + + this.register('send_department_list', async (_inputs, ctx) => { + const data = await this.platform.query( + `{ doctors(first: 50) { edges { node { department } } } }`, + ); + const departments = [...new Set( + data.doctors.edges.map((e: any) => e.node.department).filter(Boolean), + )] as string[]; + + if (!departments.length) return { sent: false, message: 'No departments available.' }; + + const sections: ListSection[] = [{ + title: 'Departments', + rows: departments.slice(0, 10).map(d => ({ + id: `dept:${d}`, + title: d.substring(0, 24), + })), + }]; + await ctx.provider.sendList(ctx.phone, 'Which department would you like to visit?', 'View Departments', sections); + return { sent: true, departments }; + }); + + this.register('send_doctor_list', async (inputs, ctx) => { + const department = inputs.department; + const data = await this.platform.query( + `{ doctors(first: 50) { edges { node { + id fullName { firstName lastName } + department specialty + consultationFeeNew { amountMicros currencyCode } + ${DOCTOR_VISIT_SLOTS_FRAGMENT} + } } } }`, + ); + const allDocs = normalizeDoctors(data.doctors.edges.map((e: any) => e.node)); + const deptDocs = allDocs.filter((d: any) => + d.department?.toLowerCase() === department.toLowerCase(), + ); + + if (!deptDocs.length) return { sent: false, message: `No doctors found in ${department}.` }; + + const sections: ListSection[] = [{ + title: department.substring(0, 24), + rows: deptDocs.slice(0, 10).map((d: any) => { + const docName = `Dr. ${d.fullName?.firstName ?? ''} ${d.fullName?.lastName ?? ''}`.trim(); + const fee = d.consultationFeeNew?.amountMicros + ? `₹${(d.consultationFeeNew.amountMicros / 1000000).toFixed(0)}` + : ''; + return { + id: `doc:${d.id}:${docName}`, + title: docName.substring(0, 24), + description: fee ? `${d.specialty ?? department} — ${fee}` : (d.specialty ?? department), + }; + }), + }]; + await ctx.provider.sendList(ctx.phone, `Doctors in ${department}:`, 'View Doctors', sections); + return { sent: true, count: deptDocs.length }; + }); + + this.register('send_slot_list', async (inputs, ctx) => { + const { doctorId, doctorName, date } = inputs; + const targetDate = date ?? new Date(Date.now() + 86400000).toISOString().split('T')[0]; + const dayNames = ['SUNDAY', 'MONDAY', 'TUESDAY', 'WEDNESDAY', 'THURSDAY', 'FRIDAY', 'SATURDAY']; + const targetDay = dayNames[new Date(targetDate + 'T00:00:00+05:30').getDay()]; + + const data = await this.platform.query( + `{ doctors(first: 50) { edges { node { + id fullName { firstName lastName } + ${DOCTOR_VISIT_SLOTS_FRAGMENT} + } } } }`, + ); + const rawDocs = data.doctors.edges.map((e: any) => e.node); + const doctor = rawDocs.find((d: any) => d.id === doctorId); + if (!doctor) return { sent: false, message: 'Doctor not found.' }; + + const rawSlots = doctor.visitSlots?.edges?.map((e: any) => e.node) ?? []; + const daySlots = rawSlots.filter((s: any) => s.dayOfWeek === targetDay); + + if (!daySlots.length) { + const dayLabel = targetDay.charAt(0) + targetDay.slice(1).toLowerCase(); + return { sent: false, message: `${doctorName} is not available on ${dayLabel} (${targetDate}).` }; + } + + const timeSlots: { time: string; clinic: string }[] = []; + for (const ds of daySlots) { + const startHour = parseInt(ds.startTime?.split(':')[0] ?? '9', 10); + const endHour = parseInt(ds.endTime?.split(':')[0] ?? '17', 10); + const clinicName = ds.clinic?.clinicName ?? ''; + for (let h = startHour; h < endHour && timeSlots.length < 10; h++) { + timeSlots.push({ time: `${String(h).padStart(2, '0')}:00`, clinic: clinicName }); + } + } + + if (!timeSlots.length) return { sent: false, message: `No slots for ${doctorName} on ${targetDate}.` }; + + const sections: ListSection[] = [{ + title: targetDate, + rows: timeSlots.map(s => ({ + id: `slot:${doctorId}:${targetDate}T${s.time}:00`, + title: s.time, + description: s.clinic || undefined, + })), + }]; + await ctx.provider.sendList(ctx.phone, `Available slots for ${doctorName}:`, 'View Slots', sections); + return { sent: true, slots: timeSlots.length }; + }); + + this.register('send_confirm_buttons', async (inputs, ctx) => { + const buttons: InteractiveButton[] = [ + { id: 'confirm_booking', title: 'Confirm' }, + { id: 'cancel_booking', title: 'Cancel' }, + ]; + await ctx.provider.sendButtons(ctx.phone, inputs.summary, buttons); + return { sent: true }; + }); + + this.register('book_appointment', async (inputs, ctx) => { + const { patientName, phoneNumber, department, doctorName, scheduledAt, reason } = inputs; + const cleanPhone = (phoneNumber ?? ctx.phone).replace(/[^0-9]/g, '').slice(-10); + + // Conflict check + const bookingDate = scheduledAt.split('T')[0]; + const existingAppts = await this.platform.query( + `{ appointments(first: 50, filter: { doctorName: { eq: "${doctorName}" } }, orderBy: [{ scheduledAt: AscNullsLast }]) { edges { node { id scheduledAt status patientName } } } }`, + ).catch(() => ({ appointments: { edges: [] } })); + + const conflicts = existingAppts.appointments.edges + .map((e: any) => e.node) + .filter((a: any) => a.status === 'SCHEDULED' && a.scheduledAt?.startsWith(bookingDate)); + + const slotConflicts = conflicts.filter((a: any) => a.scheduledAt === scheduledAt); + if (slotConflicts.length >= 3) { + return { booked: false, message: `${doctorName} is fully booked at this time.` }; + } + + // Create lead/patient if new + const resolved = await this.caller.resolve(cleanPhone, ctx.auth).catch(() => null); + if (resolved?.isNew && patientName) { + const firstName = patientName.split(' ')[0]; + const lastName = patientName.split(' ').slice(1).join(' ') || ''; + try { + const p = await this.platform.query( + `mutation($data: PatientCreateInput!) { createPatient(data: $data) { id } }`, + { data: { fullName: { firstName, lastName }, phones: { primaryPhoneNumber: `+91${cleanPhone}` }, patientType: 'NEW' } }, + ); + const patientId = p?.createPatient?.id; + await this.platform.query( + `mutation($data: LeadCreateInput!) { createLead(data: $data) { id } }`, + { data: { name: `WhatsApp — ${patientName}`, contactName: { firstName, lastName }, contactPhone: { primaryPhoneNumber: `+91${cleanPhone}` }, source: 'WHATSAPP', status: 'NEW', interestedService: department, ...(patientId ? { patientId } : {}) } }, + ); + } catch {} + } + + // Book + const result = await this.platform.query( + `mutation($data: AppointmentCreateInput!) { createAppointment(data: $data) { id } }`, + { data: { name: `WhatsApp Booking — ${patientName} (${department})`, scheduledAt, status: 'SCHEDULED', doctorName, department, reasonForVisit: reason ?? 'General Consultation' } }, + ); + const id = result?.createAppointment?.id; + if (id) { + return { booked: true, appointmentId: id, reference: id.substring(0, 8) }; + } + return { booked: false, message: 'Booking failed.' }; + }); + + this.register('lookup_appointments', async (inputs, ctx) => { + const resolved = await this.caller.resolve(ctx.phone, ctx.auth).catch(() => null); + if (!resolved?.patientId) return { appointments: [], message: 'No patient record found.' }; + + const data = await this.platform.query( + `{ appointments(first: 10, filter: { patientId: { eq: "${resolved.patientId}" } }, orderBy: [{ scheduledAt: DescNullsLast }]) { edges { node { + id scheduledAt status doctorName department reasonForVisit + } } } }`, + ); + return { appointments: data.appointments.edges.map((e: any) => e.node) }; + }); + } +} diff --git a/src/messaging/messaging.module.ts b/src/messaging/messaging.module.ts index da886e6..966d76d 100644 --- a/src/messaging/messaging.module.ts +++ b/src/messaging/messaging.module.ts @@ -7,6 +7,11 @@ import { MessagingService } from './messaging.service'; import { MessagingConversationService } from './messaging-conversation.service'; import { GupshupProvider } from './providers/gupshup.provider'; import { MessagingProvider } from './providers/messaging-provider.interface'; +import { FlowExecutionService } from './flow/flow-execution.service'; +import { FlowSessionService } from './flow/flow-session.service'; +import { FlowStoreService } from './flow/flow-store.service'; +import { FlowVariableService } from './flow/flow-variable.service'; +import { ToolRegistry } from './flow/tool-registry'; @Module({ imports: [PlatformModule, CallerResolutionModule], @@ -14,11 +19,14 @@ import { MessagingProvider } from './providers/messaging-provider.interface'; providers: [ MessagingService, MessagingConversationService, + FlowExecutionService, + FlowSessionService, + FlowStoreService, + FlowVariableService, + ToolRegistry, { provide: MessagingProvider, useFactory: (config: ConfigService) => { - // Future: switch on config.get('messaging.provider') to return - // OzonetelProvider, MetaCloudProvider, etc. return new GupshupProvider(config); }, inject: [ConfigService], diff --git a/src/messaging/messaging.service.ts b/src/messaging/messaging.service.ts index ee8e9a6..42d9cb7 100644 --- a/src/messaging/messaging.service.ts +++ b/src/messaging/messaging.service.ts @@ -1,9 +1,10 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Inject, Logger, Optional } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { generateText, tool, stepCountIs } from 'ai'; import { z } from 'zod'; import { MessagingProvider } from './providers/messaging-provider.interface'; import { MessagingConversationService } from './messaging-conversation.service'; +import { FlowExecutionService } from './flow/flow-execution.service'; import { CallerResolutionService } from '../caller/caller-resolution.service'; import { CallerContextService } from '../caller/caller-context.service'; import { PlatformGraphqlService } from '../platform/platform-graphql.service'; @@ -27,6 +28,7 @@ export class MessagingService { private callerContext: CallerContextService, private platform: PlatformGraphqlService, private aiConfig: AiConfigService, + @Optional() private flowExecution: FlowExecutionService, ) { const cfg = aiConfig.getConfig(); this.aiModel = createAiModel({ @@ -51,6 +53,14 @@ export class MessagingService { const replyId = message.interactiveReply?.id; this.logger.log(`[WA] Inbound from ${phone} (${name}): ${text.substring(0, 100)}${replyId ? ` [reply_id=${replyId}]` : ''}`); + // Delegate to flow engine if published flows exist + if (this.flowExecution?.hasFlows()) { + this.logger.log(`[WA] Delegating to flow engine`); + await this.flowExecution.handleMessage(message); + return; + } + + // Fallback: hardcoded AI chat (legacy — will be removed once flows are validated) if (!this.aiModel) { await this.provider.sendText(phone, 'Our assistant is temporarily unavailable. Please call us directly.'); return;