From f1c026cf7a40d23122b33abed99d189ec8f503c1 Mon Sep 17 00:00:00 2001 From: saridsa2 Date: Mon, 20 Apr 2026 20:48:55 +0530 Subject: [PATCH] fix(flow): serialize per-phone execution to prevent concurrent flows Two messages arriving close together could start two parallel flow executions for the same phone. The second would create a new session while the first was mid-AI-block, causing duplicate greetings and race conditions. Per-phone async lock ensures sequential execution. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/messaging/flow/flow-execution.service.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/messaging/flow/flow-execution.service.ts b/src/messaging/flow/flow-execution.service.ts index 770aad5..b31c797 100644 --- a/src/messaging/flow/flow-execution.service.ts +++ b/src/messaging/flow/flow-execution.service.ts @@ -44,9 +44,28 @@ export class FlowExecutionService { this.auth = apiKey ? `Bearer ${apiKey}` : ''; } + // Per-phone lock to prevent concurrent flow executions + private readonly locks = new Map>(); + async handleMessage(message: NormalizedMessage): Promise { const { phone } = message; + // Serialize executions per phone — prevent two concurrent flows + const existing = this.locks.get(phone); + const execute = async () => { + if (existing) await existing.catch(() => {}); + await this._handleMessage(message); + }; + const promise = execute(); + this.locks.set(phone, promise); + await promise.finally(() => { + if (this.locks.get(phone) === promise) this.locks.delete(phone); + }); + } + + private async _handleMessage(message: NormalizedMessage): Promise { + const { phone } = message; + // 1. Load existing session or start new flow let session = await this.sessions.load(phone); let flow: Flow | null = null;