From 028d7d6b0aaaea90c306097b59581dfe8eb23dab Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 14:49:40 -0700 Subject: [PATCH 01/12] improvement(db): add opt-in read-replica client and harden migration runner --- apps/sim/.env.example | 4 + apps/sim/app/api/logs/export/route.ts | 4 +- apps/sim/app/api/logs/stats/route.ts | 6 +- apps/sim/app/api/v1/audit-logs/query.ts | 6 +- apps/sim/hooks/use-inline-rename.ts | 2 +- apps/sim/lib/billing/core/usage-log.test.ts | 4 + apps/sim/lib/billing/core/usage-log.ts | 8 +- apps/sim/lib/core/config/env.ts | 1 + .../sim/lib/data-drains/sources/audit-logs.ts | 4 +- .../lib/data-drains/sources/copilot-chats.ts | 6 +- .../lib/data-drains/sources/copilot-runs.ts | 4 +- apps/sim/lib/data-drains/sources/helpers.ts | 4 +- apps/sim/lib/data-drains/sources/job-logs.ts | 4 +- .../lib/data-drains/sources/workflow-logs.ts | 4 +- apps/sim/lib/logs/list-logs.test.ts | 3 + apps/sim/lib/logs/list-logs.ts | 6 +- packages/db/.env.example | 3 +- packages/db/db.ts | 25 +++- packages/db/scripts/migrate.ts | 121 ++++++++++++++++-- packages/testing/src/mocks/database.mock.ts | 30 +++-- 20 files changed, 196 insertions(+), 53 deletions(-) diff --git a/apps/sim/.env.example b/apps/sim/.env.example index 180e9b56e98..d26ff64e52f 100644 --- a/apps/sim/.env.example +++ b/apps/sim/.env.example @@ -1,5 +1,9 @@ # Database (Required) DATABASE_URL="postgresql://postgres:your_password@localhost:5432/simstudio" +# Optional read-replica connection string for offloading heavy read paths +# (logs listing, audit logs, dashboard aggregations). Reads fall back to +# DATABASE_URL when unset. +# DATABASE_REPLICA_URL="" # Authentication (Required unless DISABLE_AUTH=true) BETTER_AUTH_SECRET=your_secret_key # Use `openssl rand -hex 32` to generate, or visit https://www.better-auth.com/docs/installation diff --git a/apps/sim/app/api/logs/export/route.ts b/apps/sim/app/api/logs/export/route.ts index f85816e26a1..560eee71618 100644 --- a/apps/sim/app/api/logs/export/route.ts +++ b/apps/sim/app/api/logs/export/route.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, desc, eq, sql } from 'drizzle-orm' @@ -80,7 +80,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => { let offset = 0 try { while (true) { - const rows = await db + const rows = await dbReplica .select(selectColumns) .from(workflowExecutionLogs) .leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id)) diff --git a/apps/sim/app/api/logs/stats/route.ts b/apps/sim/app/api/logs/stats/route.ts index 930e2e36d39..359a8b7505d 100644 --- a/apps/sim/app/api/logs/stats/route.ts +++ b/apps/sim/app/api/logs/stats/route.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, sql } from 'drizzle-orm' @@ -48,7 +48,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => { const commonFilters = buildFilterConditions(params, { useSimpleLevelFilter: true }) const whereCondition = commonFilters ? and(workspaceFilter, commonFilters) : workspaceFilter - const boundsQuery = await db + const boundsQuery = await dbReplica .select({ minTime: sql`MIN(${workflowExecutionLogs.startedAt})`, maxTime: sql`MAX(${workflowExecutionLogs.startedAt})`, @@ -83,7 +83,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => { const segmentMs = Math.max(60000, Math.floor(totalMs / params.segmentCount)) const startTimeIso = startTime.toISOString() - const statsQuery = await db + const statsQuery = await dbReplica .select({ workflowId: sql`COALESCE(${workflowExecutionLogs.workflowId}, 'deleted')`, workflowName: sql`COALESCE(${workflow.name}, 'Deleted Workflow')`, diff --git a/apps/sim/app/api/v1/audit-logs/query.ts b/apps/sim/app/api/v1/audit-logs/query.ts index 8a4173be863..d1bdcbf79d6 100644 --- a/apps/sim/app/api/v1/audit-logs/query.ts +++ b/apps/sim/app/api/v1/audit-logs/query.ts @@ -1,5 +1,5 @@ import { AuditResourceType } from '@sim/audit' -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { auditLog, workspace } from '@sim/db/schema' import type { InferSelectModel } from 'drizzle-orm' import { and, desc, eq, gte, ilike, inArray, isNull, lt, lte, or, type SQL, sql } from 'drizzle-orm' @@ -73,7 +73,7 @@ export function buildFilterConditions(params: AuditLogFilterParams): SQL { - const rows = await db + const rows = await dbReplica .select({ id: workspace.id }) .from(workspace) .where(eq(workspace.organizationId, organizationId)) @@ -156,7 +156,7 @@ export async function queryAuditLogs( if (cursorCondition) allConditions.push(cursorCondition) } - const rows = await db + const rows = await dbReplica .select() .from(auditLog) .where(allConditions.length > 0 ? and(...allConditions) : undefined) diff --git a/apps/sim/hooks/use-inline-rename.ts b/apps/sim/hooks/use-inline-rename.ts index 87c819e1a3b..5f0cba4fabf 100644 --- a/apps/sim/hooks/use-inline-rename.ts +++ b/apps/sim/hooks/use-inline-rename.ts @@ -9,7 +9,7 @@ interface UseInlineRenameProps { * `mutateAsync(...)`) — NOT a fire-and-forget `mutate(...)` — so `isSaving` * spans the in-flight request and a rejection can revive the edit session. */ - onSave: (id: string, newName: string) => void | Promise + onSave: (id: string, newName: string) => undefined | Promise } /** diff --git a/apps/sim/lib/billing/core/usage-log.test.ts b/apps/sim/lib/billing/core/usage-log.test.ts index 448cb2eab42..d88e7847382 100644 --- a/apps/sim/lib/billing/core/usage-log.test.ts +++ b/apps/sim/lib/billing/core/usage-log.test.ts @@ -28,6 +28,10 @@ vi.mock('@sim/db', () => ({ insert: mockInsert, transaction: mockTransaction, }, + dbReplica: { + insert: mockInsert, + transaction: mockTransaction, + }, })) vi.mock('@sim/db/schema', () => ({ diff --git a/apps/sim/lib/billing/core/usage-log.ts b/apps/sim/lib/billing/core/usage-log.ts index 5d654723d49..a7efa4d52ef 100644 --- a/apps/sim/lib/billing/core/usage-log.ts +++ b/apps/sim/lib/billing/core/usage-log.ts @@ -1,5 +1,5 @@ import { createHash } from 'node:crypto' -import { db } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { usageLog, workspace } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { toError } from '@sim/utils/errors' @@ -579,7 +579,7 @@ export async function getUserUsageLogs( } if (cursor) { - const cursorLog = await db + const cursorLog = await dbReplica .select({ createdAt: usageLog.createdAt }) .from(usageLog) .where(eq(usageLog.id, cursor)) @@ -592,7 +592,7 @@ export async function getUserUsageLogs( } } - const logs = await db + const logs = await dbReplica .select() .from(usageLog) .where(and(...conditions)) @@ -621,7 +621,7 @@ export async function getUserUsageLogs( if (startDate) summaryConditions.push(gte(usageLog.createdAt, startDate)) if (endDate) summaryConditions.push(lte(usageLog.createdAt, endDate)) - const summaryResult = await db + const summaryResult = await dbReplica .select({ source: usageLog.source, totalCost: sql`SUM(${usageLog.cost})`, diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 87af1a75f35..7ba9c92511d 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -19,6 +19,7 @@ export const env = createEnv({ server: { // Core Database & Authentication DATABASE_URL: z.string().url(), // Primary database connection string + DATABASE_REPLICA_URL: z.string().url().optional(), // Read-replica connection string; opt-in reads fall back to the primary when unset BETTER_AUTH_URL: z.string().url(), // Base URL for Better Auth service BETTER_AUTH_SECRET: z.string().min(32), // Secret key for Better Auth JWT signing DISABLE_REGISTRATION: z.boolean().optional(), // Flag to disable new user registration diff --git a/apps/sim/lib/data-drains/sources/audit-logs.ts b/apps/sim/lib/data-drains/sources/audit-logs.ts index fbbbbf77aa1..070aef9886e 100644 --- a/apps/sim/lib/data-drains/sources/audit-logs.ts +++ b/apps/sim/lib/data-drains/sources/audit-logs.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { auditLog } from '@sim/db/schema' import { and, inArray, isNull, or, sql } from 'drizzle-orm' import { @@ -35,7 +35,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { while (!input.signal.aborted) { const cursorClause = timeCursorPredicate(auditLog.createdAt, auditLog.id, cursor) - const rows = await db + const rows = await dbReplica .select() .from(auditLog) .where(and(scopeClause, cursorClause)) diff --git a/apps/sim/lib/data-drains/sources/copilot-chats.ts b/apps/sim/lib/data-drains/sources/copilot-chats.ts index 6bde4632dca..c6b406492d4 100644 --- a/apps/sim/lib/data-drains/sources/copilot-chats.ts +++ b/apps/sim/lib/data-drains/sources/copilot-chats.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { copilotChats, copilotMessages } from '@sim/db/schema' import { and, asc, inArray, isNull, sql } from 'drizzle-orm' import { @@ -55,7 +55,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { while (!input.signal.aborted) { const cursorClause = timeCursorPredicate(copilotChats.createdAt, copilotChats.id, cursor) - const metaRows = await db + const metaRows = await dbReplica .select(chatColumns) .from(copilotChats) .where(and(inArray(copilotChats.workspaceId, workspaceIds), cursorClause)) @@ -65,7 +65,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { if (metaRows.length === 0) return const chatIds = metaRows.map((r) => r.id) - const messageRows = await db + const messageRows = await dbReplica .select({ chatId: copilotMessages.chatId, content: copilotMessages.content }) .from(copilotMessages) .where(and(inArray(copilotMessages.chatId, chatIds), isNull(copilotMessages.deletedAt))) diff --git a/apps/sim/lib/data-drains/sources/copilot-runs.ts b/apps/sim/lib/data-drains/sources/copilot-runs.ts index 4b2b0503ae7..7fe2d8ac22a 100644 --- a/apps/sim/lib/data-drains/sources/copilot-runs.ts +++ b/apps/sim/lib/data-drains/sources/copilot-runs.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { copilotRuns } from '@sim/db/schema' import { and, inArray, isNotNull } from 'drizzle-orm' import { @@ -24,7 +24,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { while (!input.signal.aborted) { const cursorClause = timeCursorPredicate(copilotRuns.completedAt, copilotRuns.id, cursor) - const rows = await db + const rows = await dbReplica .select() .from(copilotRuns) .where( diff --git a/apps/sim/lib/data-drains/sources/helpers.ts b/apps/sim/lib/data-drains/sources/helpers.ts index 33ff90ab42c..c04bd2825bf 100644 --- a/apps/sim/lib/data-drains/sources/helpers.ts +++ b/apps/sim/lib/data-drains/sources/helpers.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { workspace } from '@sim/db/schema' import { eq } from 'drizzle-orm' @@ -7,7 +7,7 @@ import { eq } from 'drizzle-orm' * sources whose underlying tables are workspace-scoped rather than org-scoped. */ export async function getOrganizationWorkspaceIds(organizationId: string): Promise { - const rows = await db + const rows = await dbReplica .select({ id: workspace.id }) .from(workspace) .where(eq(workspace.organizationId, organizationId)) diff --git a/apps/sim/lib/data-drains/sources/job-logs.ts b/apps/sim/lib/data-drains/sources/job-logs.ts index 789118e6e67..43d33bd15ba 100644 --- a/apps/sim/lib/data-drains/sources/job-logs.ts +++ b/apps/sim/lib/data-drains/sources/job-logs.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { jobExecutionLogs } from '@sim/db/schema' import { and, inArray, isNotNull } from 'drizzle-orm' import { @@ -24,7 +24,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { while (!input.signal.aborted) { const cursorClause = timeCursorPredicate(jobExecutionLogs.endedAt, jobExecutionLogs.id, cursor) - const rows = await db + const rows = await dbReplica .select() .from(jobExecutionLogs) .where( diff --git a/apps/sim/lib/data-drains/sources/workflow-logs.ts b/apps/sim/lib/data-drains/sources/workflow-logs.ts index b64ce154791..562e5119577 100644 --- a/apps/sim/lib/data-drains/sources/workflow-logs.ts +++ b/apps/sim/lib/data-drains/sources/workflow-logs.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { workflowExecutionLogs } from '@sim/db/schema' import { and, inArray, isNotNull } from 'drizzle-orm' import { MATERIALIZE_CONCURRENCY, mapWithConcurrency } from '@/lib/core/utils/concurrency' @@ -33,7 +33,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { cursor ) - const rows = await db + const rows = await dbReplica .select() .from(workflowExecutionLogs) .where( diff --git a/apps/sim/lib/logs/list-logs.test.ts b/apps/sim/lib/logs/list-logs.test.ts index 95fdda6bea0..55e2551b650 100644 --- a/apps/sim/lib/logs/list-logs.test.ts +++ b/apps/sim/lib/logs/list-logs.test.ts @@ -10,6 +10,9 @@ vi.mock('@sim/db', () => ({ db: { select: selectMock, }, + dbReplica: { + select: selectMock, + }, })) // Local drizzle-orm mock: the global mock's `sql` lacks `.as()` and the chain diff --git a/apps/sim/lib/logs/list-logs.ts b/apps/sim/lib/logs/list-logs.ts index 131092fcd6f..0165d60ad73 100644 --- a/apps/sim/lib/logs/list-logs.ts +++ b/apps/sim/lib/logs/list-logs.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { dbReplica } from '@sim/db' import { jobExecutionLogs, pausedExecutions, @@ -186,7 +186,7 @@ export async function listLogs(params: ListLogsParams, userId: string): Promise< levelList.length > 0 && !levelList.some((l) => l === 'error' || l === 'info') const includeJobLogs = !hasWorkflowSpecificFilters && !triggersExcludeJobs && !levelExcludesJobs - const workflowQuery = db + const workflowQuery = dbReplica .select({ id: workflowExecutionLogs.id, workflowId: workflowExecutionLogs.workflowId, @@ -313,7 +313,7 @@ export async function listLogs(params: ListLogsParams, userId: string): Promise< } const jobQuery = includeJobLogs - ? db + ? dbReplica .select({ id: jobExecutionLogs.id, executionId: jobExecutionLogs.executionId, diff --git a/packages/db/.env.example b/packages/db/.env.example index 14459e61f51..34e85c68f89 100644 --- a/packages/db/.env.example +++ b/packages/db/.env.example @@ -1,5 +1,6 @@ # Database connection used by @sim/db scripts (drizzle-kit generate, # db:migrate, register-sso-provider, etc.). Must match DATABASE_URL in -# apps/sim/.env and apps/realtime/.env. +# apps/sim/.env and apps/realtime/.env. Migrations always run against the +# primary — never set a replica URL here. DATABASE_URL="postgresql://postgres:postgres@localhost:5432/simstudio" diff --git a/packages/db/db.ts b/packages/db/db.ts index 9e5597fb57b..dad18ba8c6a 100644 --- a/packages/db/db.ts +++ b/packages/db/db.ts @@ -7,12 +7,31 @@ if (!connectionString) { throw new Error('Missing DATABASE_URL environment variable') } -const postgresClient = postgres(connectionString, { +const poolOptions = { prepare: false, idle_timeout: 20, connect_timeout: 30, - max: 15, onnotice: () => {}, -}) +} as const + +const postgresClient = postgres(connectionString, { ...poolOptions, max: 15 }) export const db = drizzle(postgresClient, { schema }) + +/** + * Read-replica client — EXPLICIT OPT-IN. + * + * Import `dbReplica` only for reads that tolerate bounded staleness and have no + * read-your-writes dependency: logs listing/search, audit logs, dashboard + * aggregations, bulk exports. Never use it for auth/session lookups, workflow + * state, billing-limit enforcement, or any read inside a write-reconciling + * flow. + * + * Falls back to the primary client when `DATABASE_REPLICA_URL` is unset (dev, + * self-hosted, realtime), so call sites never need to branch. + */ +const replicaUrl = process.env.DATABASE_REPLICA_URL + +export const dbReplica: typeof db = replicaUrl + ? drizzle(postgres(replicaUrl, { ...poolOptions, max: 10 }), { schema }) + : db diff --git a/packages/db/scripts/migrate.ts b/packages/db/scripts/migrate.ts index ed0af3b1c4d..f3fd505b453 100644 --- a/packages/db/scripts/migrate.ts +++ b/packages/db/scripts/migrate.ts @@ -1,3 +1,4 @@ +import { sleep } from '@sim/utils/helpers' import { drizzle } from 'drizzle-orm/postgres-js' import { migrate } from 'drizzle-orm/postgres-js/migrator' import postgres from 'postgres' @@ -12,10 +13,13 @@ import postgres from 'postgres' * `CREATE INDEX CONCURRENTLY` cannot run inside a transaction block. * * So, after generating a migration that adds an index on a large/hot table, edit - * the generated SQL to end drizzle's transaction first, then build concurrently - * and idempotently: + * the generated SQL to end drizzle's transaction first, clear the session + * `lock_timeout` (set below for fail-fast DDL; it would cancel CONCURRENTLY's + * legitimate waits on old transactions and leave an INVALID index), then build + * concurrently and idempotently: * * COMMIT;--> statement-breakpoint + * SET lock_timeout = 0;--> statement-breakpoint * CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_name" ON "table" (...); * * Notes: @@ -26,7 +30,8 @@ import postgres from 'postgres' * failed CONCURRENTLY build is safe — fresh DBs and re-applies both work. * - CONCURRENTLY only takes a SHARE UPDATE EXCLUSIVE lock (allows reads/writes). * - Always validate on staging before prod; a failed CONCURRENTLY build can - * leave an INVALID index that must be dropped and rebuilt. + * leave an INVALID index that must be dropped and rebuilt — the + * `warnOnInvalidIndexes` check below logs any such index after every run. */ const url = process.env.DATABASE_URL @@ -48,21 +53,40 @@ const client = postgres(url, { max: 1, connect_timeout: 10 }) * state and die (e.g. `DROP TABLE "form"` → `table "form" does not exist`, * exit 1 / TaskFailedToStart). * - * A session-level `pg_advisory_lock` serializes runners: the first to acquire it - * migrates while the rest block, then each loser acquires the lock, re-reads - * `__drizzle_migrations`, finds nothing pending, and exits cleanly. Session locks - * auto-release if the connection drops, so a crashed runner never wedges the lock. + * Acquisition is a bounded `pg_try_advisory_lock` retry loop rather than a plain + * `pg_advisory_lock`: an unbounded wait meant one wedged runner silently hung + * every other deploy sidecar (and the whole ECS deployment) behind it. With a + * deadline, a stuck winner turns into a visible non-zero exit on the losers that + * the deploy orchestrator can retry or surface. Session locks auto-release if + * the connection drops, so a crashed runner never wedges the lock. */ const MIGRATION_LOCK_KEY = 4_961_002_270n +const LOCK_ACQUIRE_DEADLINE_MS = 10 * 60_000 +const LOCK_RETRY_INTERVAL_MS = 5_000 + +/** + * How long any single migration statement may QUEUE for a table lock before + * failing with SQLSTATE 55P03. Without this, DDL needing an AccessExclusiveLock + * (e.g. `DROP TABLE ... CASCADE`) queues indefinitely behind long-running reads + * — and every other query on that table queues behind the pending exclusive + * lock, stalling all reads/writes table-wide until the DDL gets its turn + * (observed in production: a ~15-minute full stall). Failing fast keeps the + * world unblocked; we retry below, then let the deploy retry. + */ +const DDL_LOCK_TIMEOUT = '5s' +const MAX_MIGRATE_ATTEMPTS = 3 +const MIGRATE_RETRY_DELAY_MS = 15_000 try { // statement_timeout=0: index builds (esp. CONCURRENTLY on large tables) can run // far longer than the app default; a migration must never be killed mid-build. await client`SET statement_timeout = 0` - await client`SELECT pg_advisory_lock(${MIGRATION_LOCK_KEY})` + await client`SET lock_timeout = ${DDL_LOCK_TIMEOUT}` + await acquireMigrationLock() try { - await migrate(drizzle(client), { migrationsFolder: './migrations' }) + await runMigrationsWithRetry() console.log('Migrations applied successfully.') + await warnOnInvalidIndexes() } finally { await releaseMigrationLock() } @@ -74,6 +98,85 @@ try { await client.end() } +/** + * Acquire the cross-process migration lock, failing loudly after the deadline + * instead of blocking forever behind a wedged runner. + */ +async function acquireMigrationLock(): Promise { + const deadline = Date.now() + LOCK_ACQUIRE_DEADLINE_MS + for (;;) { + const [{ locked }] = await client`SELECT pg_try_advisory_lock(${MIGRATION_LOCK_KEY}) AS locked` + if (locked) return + if (Date.now() >= deadline) { + throw new Error( + `Timed out after ${LOCK_ACQUIRE_DEADLINE_MS}ms waiting for the migration advisory lock; ` + + 'another runner is likely stuck mid-migration. Investigate before retrying.' + ) + } + await sleep(LOCK_RETRY_INTERVAL_MS) + } +} + +/** + * Run pending migrations, retrying when a statement loses the `lock_timeout` + * race (SQLSTATE 55P03). drizzle wraps each migration file in a transaction, so + * a lock-timeout failure rolls that file back atomically and the retry re-reads + * `__drizzle_migrations` and re-runs only what is still unapplied. + */ +async function runMigrationsWithRetry(): Promise { + for (let attempt = 1; ; attempt++) { + try { + await migrate(drizzle(client), { migrationsFolder: './migrations' }) + return + } catch (error) { + if (!isLockNotAvailable(error) || attempt >= MAX_MIGRATE_ATTEMPTS) throw error + console.warn( + `WARN: migration DDL hit lock_timeout (attempt ${attempt}/${MAX_MIGRATE_ATTEMPTS}); ` + + `retrying in ${MIGRATE_RETRY_DELAY_MS}ms.` + ) + await sleep(MIGRATE_RETRY_DELAY_MS) + // Re-assert: a migration file's post-COMMIT `SET lock_timeout = 0` (the + // CONCURRENTLY convention above) is session-level and would otherwise + // leak into the retry. + await client`SET lock_timeout = ${DDL_LOCK_TIMEOUT}` + } + } +} + +/** + * SQLSTATE 55P03 (`lock_not_available`) — raised when `lock_timeout` expires + * while a statement queues for a lock. + */ +function isLockNotAvailable(error: unknown): boolean { + return error instanceof Error && (error as { code?: string }).code === '55P03' +} + +/** + * A `CREATE INDEX CONCURRENTLY` that fails partway leaves an INVALID index that + * `IF NOT EXISTS` then silently skips on every future run. Surface any such + * index loudly (warn, don't fail — the migration itself committed) so it can be + * dropped and rebuilt. + */ +async function warnOnInvalidIndexes(): Promise { + try { + const rows = await client` + SELECT n.nspname AS schema, c.relname AS index + FROM pg_index i + JOIN pg_class c ON c.oid = i.indexrelid + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE NOT i.indisvalid + ` + for (const row of rows) { + console.warn( + `WARN: invalid index ${row.schema}.${row.index} — a CONCURRENTLY build failed partway. ` + + 'Drop and rebuild it; IF NOT EXISTS will keep skipping it.' + ) + } + } catch (checkError) { + console.warn('WARN: could not check for invalid indexes.', checkError) + } +} + /** * Release the advisory lock without ever failing the process. The session-level * lock auto-releases when the connection closes, so a thrown unlock — e.g. the diff --git a/packages/testing/src/mocks/database.mock.ts b/packages/testing/src/mocks/database.mock.ts index f055746753e..13313edc130 100644 --- a/packages/testing/src/mocks/database.mock.ts +++ b/packages/testing/src/mocks/database.mock.ts @@ -235,17 +235,21 @@ export function resetDbChainMock(): void { * vi.mock('@sim/db', () => dbChainMock) * ``` */ +const dbChainInstance = { + select, + selectDistinct, + selectDistinctOn, + insert, + update, + delete: del, + execute, + transaction, +} + export const dbChainMock = { - db: { - select, - selectDistinct, - selectDistinctOn, - insert, - update, - delete: del, - execute, - transaction, - }, + db: dbChainInstance, + // Same instance as `db` so per-test chain overrides cover both clients. + dbReplica: dbChainInstance, } /** @@ -309,8 +313,12 @@ export function createMockDb() { * vi.mock('@sim/db', () => databaseMock) * ``` */ +const mockDbInstance = createMockDb() + export const databaseMock = { - db: createMockDb(), + db: mockDbInstance, + // Same instance as `db` so per-test overrides cover both clients. + dbReplica: mockDbInstance, sql: createMockSql(), ...createMockSqlOperators(), } From 232c9ea5bd15237751feca0d4f6967e949d99e99 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 15:03:41 -0700 Subject: [PATCH 02/12] fix(db): detect wrapped lock-timeout errors, jittered retries, direct migration DSN support --- packages/db/.env.example | 5 +++++ packages/db/scripts/migrate.ts | 37 +++++++++++++++++++++++++++------- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/packages/db/.env.example b/packages/db/.env.example index 34e85c68f89..2f758f81173 100644 --- a/packages/db/.env.example +++ b/packages/db/.env.example @@ -4,3 +4,8 @@ # primary — never set a replica URL here. DATABASE_URL="postgresql://postgres:postgres@localhost:5432/simstudio" + +# Direct (non-pooled) DSN for db:migrate. Required when DATABASE_URL points at +# a transaction-pooling PgBouncer: session advisory locks and session SETs are +# unsupported through transaction pooling. Falls back to DATABASE_URL. +# MIGRATION_DATABASE_URL="" diff --git a/packages/db/scripts/migrate.ts b/packages/db/scripts/migrate.ts index f3fd505b453..04e0be2c137 100644 --- a/packages/db/scripts/migrate.ts +++ b/packages/db/scripts/migrate.ts @@ -1,4 +1,5 @@ import { sleep } from '@sim/utils/helpers' +import { backoffWithJitter } from '@sim/utils/retry' import { drizzle } from 'drizzle-orm/postgres-js' import { migrate } from 'drizzle-orm/postgres-js/migrator' import postgres from 'postgres' @@ -34,7 +35,21 @@ import postgres from 'postgres' * `warnOnInvalidIndexes` check below logs any such index after every run. */ -const url = process.env.DATABASE_URL +/** + * Migrations must run on a DIRECT Postgres connection, never through a + * transaction-pooling PgBouncer. Session-level advisory locks, session `SET`s + * (`statement_timeout`/`lock_timeout` below), and `pg_advisory_unlock` are all + * officially unsupported in transaction pooling — statements can land on + * different server connections, so the lock may not guard the migration, the + * unlock can strand the lock on a pooled connection (wedging the NEXT deploy + * for the full acquisition deadline), and timeout settings can leak into app + * traffic. This is the same reason Prisma requires `directUrl` for migrate. + * + * Set MIGRATION_DATABASE_URL to the direct (non-pooled) DSN in environments + * where DATABASE_URL points at a PgBouncer; it falls back to DATABASE_URL for + * dev/self-hosted setups that connect directly anyway. + */ +const url = process.env.MIGRATION_DATABASE_URL || process.env.DATABASE_URL if (!url) { console.error('ERROR: Missing DATABASE_URL environment variable.') console.error('Ensure packages/db/.env is configured.') @@ -74,8 +89,8 @@ const LOCK_RETRY_INTERVAL_MS = 5_000 * world unblocked; we retry below, then let the deploy retry. */ const DDL_LOCK_TIMEOUT = '5s' -const MAX_MIGRATE_ATTEMPTS = 3 -const MIGRATE_RETRY_DELAY_MS = 15_000 +const MAX_MIGRATE_ATTEMPTS = 8 +const MIGRATE_RETRY_BACKOFF = { baseMs: 2_000, maxMs: 30_000 } as const try { // statement_timeout=0: index builds (esp. CONCURRENTLY on large tables) can run @@ -130,11 +145,12 @@ async function runMigrationsWithRetry(): Promise { return } catch (error) { if (!isLockNotAvailable(error) || attempt >= MAX_MIGRATE_ATTEMPTS) throw error + const delayMs = Math.round(backoffWithJitter(attempt, null, MIGRATE_RETRY_BACKOFF)) console.warn( `WARN: migration DDL hit lock_timeout (attempt ${attempt}/${MAX_MIGRATE_ATTEMPTS}); ` + - `retrying in ${MIGRATE_RETRY_DELAY_MS}ms.` + `retrying in ${delayMs}ms.` ) - await sleep(MIGRATE_RETRY_DELAY_MS) + await sleep(delayMs) // Re-assert: a migration file's post-COMMIT `SET lock_timeout = 0` (the // CONCURRENTLY convention above) is session-level and would otherwise // leak into the retry. @@ -145,10 +161,17 @@ async function runMigrationsWithRetry(): Promise { /** * SQLSTATE 55P03 (`lock_not_available`) — raised when `lock_timeout` expires - * while a statement queues for a lock. + * while a statement queues for a lock. drizzle's `migrate()` wraps driver + * failures (e.g. `DrizzleQueryError` with the Postgres error on `cause`), so + * walk the whole cause chain looking for the code. */ function isLockNotAvailable(error: unknown): boolean { - return error instanceof Error && (error as { code?: string }).code === '55P03' + let current: unknown = error + for (let depth = 0; depth < 10 && current instanceof Error; depth++) { + if ((current as { code?: string }).code === '55P03') return true + current = current.cause + } + return false } /** From 7202c12d576f8f9ec1cda0313685d24c300c8005 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 15:22:09 -0700 Subject: [PATCH 03/12] =?UTF-8?q?fix(db):=20audit=20fixes=20=E2=80=94=20un?= =?UTF-8?q?parameterized=20SET,=20primary=20reads=20for=20authz=20scoping,?= =?UTF-8?q?=20shared=20error=20helper?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/sim/app/api/v1/audit-logs/query.ts | 8 ++- .../components/table-grid/table-grid.tsx | 2 +- apps/sim/lib/billing/core/usage-log.test.ts | 14 ++-- apps/sim/lib/data-drains/sources/helpers.ts | 8 ++- apps/sim/lib/logs/list-logs.test.ts | 12 ++-- packages/db/db.ts | 7 +- packages/db/scripts/migrate.ts | 67 ++++++++++--------- packages/testing/src/mocks/database.mock.ts | 4 +- 8 files changed, 63 insertions(+), 59 deletions(-) diff --git a/apps/sim/app/api/v1/audit-logs/query.ts b/apps/sim/app/api/v1/audit-logs/query.ts index d1bdcbf79d6..ba343fcb848 100644 --- a/apps/sim/app/api/v1/audit-logs/query.ts +++ b/apps/sim/app/api/v1/audit-logs/query.ts @@ -1,5 +1,5 @@ import { AuditResourceType } from '@sim/audit' -import { dbReplica } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { auditLog, workspace } from '@sim/db/schema' import type { InferSelectModel } from 'drizzle-orm' import { and, desc, eq, gte, ilike, inArray, isNull, lt, lte, or, type SQL, sql } from 'drizzle-orm' @@ -71,9 +71,13 @@ export function buildFilterConditions(params: AuditLogFilterParams): SQL { - const rows = await dbReplica + const rows = await db .select({ id: workspace.id }) .from(workspace) .where(eq(workspace.organizationId, organizationId)) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx index 9705169c036..c95c2e66090 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/table-grid.tsx @@ -845,7 +845,7 @@ export function TableGrid({ const oldName = columnsRef.current.find((c) => c.key === columnName)?.name ?? columnName pushUndoRef.current({ type: 'rename-column', oldName, newName, columnId: columnName }) handleColumnRename(columnName, newName) - updateColumnMutation.mutate({ columnName, updates: { name: newName } }) + return updateColumnMutation.mutateAsync({ columnName, updates: { name: newName } }) }, }) diff --git a/apps/sim/lib/billing/core/usage-log.test.ts b/apps/sim/lib/billing/core/usage-log.test.ts index d88e7847382..ff2f446c832 100644 --- a/apps/sim/lib/billing/core/usage-log.test.ts +++ b/apps/sim/lib/billing/core/usage-log.test.ts @@ -23,16 +23,10 @@ const { mockUpdate: vi.fn(), })) -vi.mock('@sim/db', () => ({ - db: { - insert: mockInsert, - transaction: mockTransaction, - }, - dbReplica: { - insert: mockInsert, - transaction: mockTransaction, - }, -})) +vi.mock('@sim/db', () => { + const instance = { insert: mockInsert, transaction: mockTransaction } + return { db: instance, dbReplica: instance } +}) vi.mock('@sim/db/schema', () => ({ usageLog: { diff --git a/apps/sim/lib/data-drains/sources/helpers.ts b/apps/sim/lib/data-drains/sources/helpers.ts index c04bd2825bf..58058d63f15 100644 --- a/apps/sim/lib/data-drains/sources/helpers.ts +++ b/apps/sim/lib/data-drains/sources/helpers.ts @@ -1,13 +1,17 @@ -import { dbReplica } from '@sim/db' +import { db } from '@sim/db' import { workspace } from '@sim/db/schema' import { eq } from 'drizzle-orm' /** * Returns the IDs of all workspaces belonging to the organization. Used by * sources whose underlying tables are workspace-scoped rather than org-scoped. + * + * Reads the primary, not the replica: this scoping list gates which rows a + * drain exports while its cursor advances monotonically — a workspace missing + * from a lagging replica would have its rows skipped past permanently. */ export async function getOrganizationWorkspaceIds(organizationId: string): Promise { - const rows = await dbReplica + const rows = await db .select({ id: workspace.id }) .from(workspace) .where(eq(workspace.organizationId, organizationId)) diff --git a/apps/sim/lib/logs/list-logs.test.ts b/apps/sim/lib/logs/list-logs.test.ts index 55e2551b650..b80bde60a9b 100644 --- a/apps/sim/lib/logs/list-logs.test.ts +++ b/apps/sim/lib/logs/list-logs.test.ts @@ -6,14 +6,10 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' const { selectMock } = vi.hoisted(() => ({ selectMock: vi.fn() })) -vi.mock('@sim/db', () => ({ - db: { - select: selectMock, - }, - dbReplica: { - select: selectMock, - }, -})) +vi.mock('@sim/db', () => { + const instance = { select: selectMock } + return { db: instance, dbReplica: instance } +}) // Local drizzle-orm mock: the global mock's `sql` lacks `.as()` and the chain // mock doesn't support `.orderBy().limit()`. We only need condition/sql builders diff --git a/packages/db/db.ts b/packages/db/db.ts index dad18ba8c6a..c8fa42020c5 100644 --- a/packages/db/db.ts +++ b/packages/db/db.ts @@ -12,7 +12,7 @@ const poolOptions = { idle_timeout: 20, connect_timeout: 30, onnotice: () => {}, -} as const +} const postgresClient = postgres(connectionString, { ...poolOptions, max: 15 }) @@ -31,6 +31,11 @@ export const db = drizzle(postgresClient, { schema }) * self-hosted, realtime), so call sites never need to branch. */ const replicaUrl = process.env.DATABASE_REPLICA_URL +if (replicaUrl && !/^postgres(ql)?:\/\//.test(replicaUrl)) { + throw new Error( + 'DATABASE_REPLICA_URL is set but is not a postgres:// DSN — fix or unset it (reads fall back to the primary when unset)' + ) +} export const dbReplica: typeof db = replicaUrl ? drizzle(postgres(replicaUrl, { ...poolOptions, max: 10 }), { schema }) diff --git a/packages/db/scripts/migrate.ts b/packages/db/scripts/migrate.ts index 04e0be2c137..2f2c1c45bf4 100644 --- a/packages/db/scripts/migrate.ts +++ b/packages/db/scripts/migrate.ts @@ -1,3 +1,4 @@ +import { getPostgresErrorCode } from '@sim/utils/errors' import { sleep } from '@sim/utils/helpers' import { backoffWithJitter } from '@sim/utils/retry' import { drizzle } from 'drizzle-orm/postgres-js' @@ -16,19 +17,27 @@ import postgres from 'postgres' * So, after generating a migration that adds an index on a large/hot table, edit * the generated SQL to end drizzle's transaction first, clear the session * `lock_timeout` (set below for fail-fast DDL; it would cancel CONCURRENTLY's - * legitimate waits on old transactions and leave an INVALID index), then build - * concurrently and idempotently: + * legitimate waits on old transactions and leave an INVALID index), build + * concurrently and idempotently, then RESTORE the fail-fast timeout so later + * statements and files in the same run keep the protection (the SET is + * session-level and would otherwise persist): * * COMMIT;--> statement-breakpoint * SET lock_timeout = 0;--> statement-breakpoint - * CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_name" ON "table" (...); + * CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_name" ON "table" (...);--> statement-breakpoint + * SET lock_timeout = '5s'; * * Notes: * - Put the `COMMIT` breakpoint AFTER all transactional DDL (ALTER TABLE/TYPE) * in the file and only the concurrent CREATE INDEX statements below it. - * - Use `IF NOT EXISTS` (and make sibling DDL idempotent, e.g. - * `ADD COLUMN IF NOT EXISTS`, `ADD VALUE IF NOT EXISTS`) so a re-run after a - * failed CONCURRENTLY build is safe — fresh DBs and re-applies both work. + * - drizzle's migrate() wraps ALL pending files in ONE transaction, so the + * embedded `COMMIT` ends that batch transaction: everything after it — in + * this file AND any later pending files — runs in autocommit, one statement + * at a time. This is why EVERY statement in a file using this convention, + * and in any file that can follow it in a batch, must be idempotent + * (`IF NOT EXISTS` / `ADD COLUMN IF NOT EXISTS` / `ADD VALUE IF NOT EXISTS`): + * a failure after the COMMIT cannot roll back, and the re-run starts from + * the top of the batch. * - CONCURRENTLY only takes a SHARE UPDATE EXCLUSIVE lock (allows reads/writes). * - Always validate on staging before prod; a failed CONCURRENTLY build can * leave an INVALID index that must be dropped and rebuilt — the @@ -76,7 +85,7 @@ const client = postgres(url, { max: 1, connect_timeout: 10 }) * the connection drops, so a crashed runner never wedges the lock. */ const MIGRATION_LOCK_KEY = 4_961_002_270n -const LOCK_ACQUIRE_DEADLINE_MS = 10 * 60_000 +const LOCK_ACQUIRE_DEADLINE_MS = 30 * 60_000 const LOCK_RETRY_INTERVAL_MS = 5_000 /** @@ -96,7 +105,6 @@ try { // statement_timeout=0: index builds (esp. CONCURRENTLY on large tables) can run // far longer than the app default; a migration must never be killed mid-build. await client`SET statement_timeout = 0` - await client`SET lock_timeout = ${DDL_LOCK_TIMEOUT}` await acquireMigrationLock() try { await runMigrationsWithRetry() @@ -134,46 +142,39 @@ async function acquireMigrationLock(): Promise { /** * Run pending migrations, retrying when a statement loses the `lock_timeout` - * race (SQLSTATE 55P03). drizzle wraps each migration file in a transaction, so - * a lock-timeout failure rolls that file back atomically and the retry re-reads - * `__drizzle_migrations` and re-runs only what is still unapplied. + * race (SQLSTATE 55P03, detected anywhere in the error's `cause` chain since + * drizzle wraps driver failures). + * + * The fail-fast `lock_timeout` is (re)asserted at the top of EVERY attempt: + * `SET` is rejected by Postgres when parameterized, so the constant is inlined + * via `client.unsafe`, and a prior attempt's migration file may have left the + * session at `lock_timeout = 0` (the CONCURRENTLY convention above). + * + * Retry safety: drizzle wraps the whole pending batch in one transaction, so a + * lock-timeout failure rolls the batch back and the retry replays it from the + * top. Files using the embedded-`COMMIT` CONCURRENTLY convention break out of + * that transaction — their post-COMMIT statements are required to be + * idempotent (see the convention notes above) precisely so a replay is safe. */ async function runMigrationsWithRetry(): Promise { for (let attempt = 1; ; attempt++) { + await client.unsafe(`SET lock_timeout = '${DDL_LOCK_TIMEOUT}'`) try { await migrate(drizzle(client), { migrationsFolder: './migrations' }) return } catch (error) { - if (!isLockNotAvailable(error) || attempt >= MAX_MIGRATE_ATTEMPTS) throw error - const delayMs = Math.round(backoffWithJitter(attempt, null, MIGRATE_RETRY_BACKOFF)) + const isLockTimeout = getPostgresErrorCode(error) === '55P03' + if (!isLockTimeout || attempt >= MAX_MIGRATE_ATTEMPTS) throw error + const delayMs = backoffWithJitter(attempt, null, MIGRATE_RETRY_BACKOFF) console.warn( `WARN: migration DDL hit lock_timeout (attempt ${attempt}/${MAX_MIGRATE_ATTEMPTS}); ` + - `retrying in ${delayMs}ms.` + `retrying in ${Math.round(delayMs)}ms.` ) await sleep(delayMs) - // Re-assert: a migration file's post-COMMIT `SET lock_timeout = 0` (the - // CONCURRENTLY convention above) is session-level and would otherwise - // leak into the retry. - await client`SET lock_timeout = ${DDL_LOCK_TIMEOUT}` } } } -/** - * SQLSTATE 55P03 (`lock_not_available`) — raised when `lock_timeout` expires - * while a statement queues for a lock. drizzle's `migrate()` wraps driver - * failures (e.g. `DrizzleQueryError` with the Postgres error on `cause`), so - * walk the whole cause chain looking for the code. - */ -function isLockNotAvailable(error: unknown): boolean { - let current: unknown = error - for (let depth = 0; depth < 10 && current instanceof Error; depth++) { - if ((current as { code?: string }).code === '55P03') return true - current = current.cause - } - return false -} - /** * A `CREATE INDEX CONCURRENTLY` that fails partway leaves an INVALID index that * `IF NOT EXISTS` then silently skips on every future run. Surface any such diff --git a/packages/testing/src/mocks/database.mock.ts b/packages/testing/src/mocks/database.mock.ts index 13313edc130..7c7b9d6b489 100644 --- a/packages/testing/src/mocks/database.mock.ts +++ b/packages/testing/src/mocks/database.mock.ts @@ -248,7 +248,7 @@ const dbChainInstance = { export const dbChainMock = { db: dbChainInstance, - // Same instance as `db` so per-test chain overrides cover both clients. + /** Same instance as `db` so per-test chain overrides cover both clients. */ dbReplica: dbChainInstance, } @@ -317,7 +317,7 @@ const mockDbInstance = createMockDb() export const databaseMock = { db: mockDbInstance, - // Same instance as `db` so per-test overrides cover both clients. + /** Same instance as `db` so per-test overrides cover both clients. */ dbReplica: mockDbInstance, sql: createMockSql(), ...createMockSqlOperators(), From a050e3538881a7877f82836e1865ad8881998244 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 16:55:16 -0700 Subject: [PATCH 04/12] =?UTF-8?q?fix(db):=20pin=20migration=20session=20?= =?UTF-8?q?=E2=80=94=20disable=20max=5Flifetime=20recycling,=20guard=20bac?= =?UTF-8?q?kend=20pid=20across=20retries?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/db/scripts/migrate.ts | 62 ++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/packages/db/scripts/migrate.ts b/packages/db/scripts/migrate.ts index 2f2c1c45bf4..5c089f722fb 100644 --- a/packages/db/scripts/migrate.ts +++ b/packages/db/scripts/migrate.ts @@ -11,7 +11,7 @@ import postgres from 'postgres' * drizzle-kit emits plain `CREATE INDEX`, which takes a SHARE lock and blocks all * writes for the build duration — on a big, write-hot table (e.g. * workflow_execution_logs, usage_log) that stalls every in-flight workflow - * completion for minutes. drizzle wraps each migration in a transaction, and + * completion for minutes. drizzle runs migrations inside a transaction, and * `CREATE INDEX CONCURRENTLY` cannot run inside a transaction block. * * So, after generating a migration that adds an index on a large/hot table, edit @@ -36,8 +36,8 @@ import postgres from 'postgres' * at a time. This is why EVERY statement in a file using this convention, * and in any file that can follow it in a batch, must be idempotent * (`IF NOT EXISTS` / `ADD COLUMN IF NOT EXISTS` / `ADD VALUE IF NOT EXISTS`): - * a failure after the COMMIT cannot roll back, and the re-run starts from - * the top of the batch. + * a failure after the COMMIT cannot roll back, and the re-run replays every + * file whose journal record has not yet committed. * - CONCURRENTLY only takes a SHARE UPDATE EXCLUSIVE lock (allows reads/writes). * - Always validate on staging before prod; a failed CONCURRENTLY build can * leave an INVALID index that must be dropped and rebuilt — the @@ -65,7 +65,13 @@ if (!url) { process.exit(1) } -const client = postgres(url, { max: 1, connect_timeout: 10 }) +/** + * `max_lifetime: null` is load-bearing: postgres-js defaults to recycling the + * connection after a randomized 30–60 minutes, and a transparent reconnect + * silently drops the session advisory lock and session `SET`s. The migration + * session must live exactly as long as the run. + */ +const client = postgres(url, { max: 1, connect_timeout: 10, max_lifetime: null }) /** * Cross-process migration lock key (a stable, app-wide 64-bit constant). @@ -101,10 +107,15 @@ const DDL_LOCK_TIMEOUT = '5s' const MAX_MIGRATE_ATTEMPTS = 8 const MIGRATE_RETRY_BACKOFF = { baseMs: 2_000, maxMs: 30_000 } as const +/** + * Backend pid of the session that acquired the advisory lock. Re-checked at the + * top of every migration attempt: if the connection was silently replaced + * (server restart, network failure), the new session does NOT hold the lock, + * and running migrations on it would break mutual exclusion — abort loudly. + */ +let lockSessionPid = 0 + try { - // statement_timeout=0: index builds (esp. CONCURRENTLY on large tables) can run - // far longer than the app default; a migration must never be killed mid-build. - await client`SET statement_timeout = 0` await acquireMigrationLock() try { await runMigrationsWithRetry() @@ -128,8 +139,12 @@ try { async function acquireMigrationLock(): Promise { const deadline = Date.now() + LOCK_ACQUIRE_DEADLINE_MS for (;;) { - const [{ locked }] = await client`SELECT pg_try_advisory_lock(${MIGRATION_LOCK_KEY}) AS locked` - if (locked) return + const [{ locked, pid }] = + await client`SELECT pg_try_advisory_lock(${MIGRATION_LOCK_KEY}) AS locked, pg_backend_pid() AS pid` + if (locked) { + lockSessionPid = pid + return + } if (Date.now() >= deadline) { throw new Error( `Timed out after ${LOCK_ACQUIRE_DEADLINE_MS}ms waiting for the migration advisory lock; ` + @@ -145,19 +160,32 @@ async function acquireMigrationLock(): Promise { * race (SQLSTATE 55P03, detected anywhere in the error's `cause` chain since * drizzle wraps driver failures). * - * The fail-fast `lock_timeout` is (re)asserted at the top of EVERY attempt: - * `SET` is rejected by Postgres when parameterized, so the constant is inlined - * via `client.unsafe`, and a prior attempt's migration file may have left the - * session at `lock_timeout = 0` (the CONCURRENTLY convention above). + * Every attempt starts by verifying the session still holds the advisory lock + * (backend pid unchanged) and re-asserting the session timeouts: + * `statement_timeout = 0` because index builds (esp. CONCURRENTLY on large + * tables) can run far longer than any app default and must never be killed + * mid-build, and the fail-fast `lock_timeout` because a prior attempt's + * migration file may have left the session at `lock_timeout = 0` (the + * CONCURRENTLY convention above). `SET` is rejected by Postgres when + * parameterized, so the constants are inlined via `client.unsafe`. * * Retry safety: drizzle wraps the whole pending batch in one transaction, so a - * lock-timeout failure rolls the batch back and the retry replays it from the - * top. Files using the embedded-`COMMIT` CONCURRENTLY convention break out of - * that transaction — their post-COMMIT statements are required to be - * idempotent (see the convention notes above) precisely so a replay is safe. + * lock-timeout failure rolls the batch back and the retry resumes from the + * first file whose journal record has not committed. Files using the + * embedded-`COMMIT` CONCURRENTLY convention break out of that transaction — + * their post-COMMIT statements are required to be idempotent (see the + * convention notes above) precisely so a replay is safe. */ async function runMigrationsWithRetry(): Promise { for (let attempt = 1; ; attempt++) { + const [{ pid }] = await client`SELECT pg_backend_pid() AS pid` + if (pid !== lockSessionPid) { + throw new Error( + `Database session changed mid-run (backend pid ${lockSessionPid} -> ${pid}); ` + + 'the migration advisory lock was lost. Aborting so a fresh runner can retry safely.' + ) + } + await client.unsafe('SET statement_timeout = 0') await client.unsafe(`SET lock_timeout = '${DDL_LOCK_TIMEOUT}'`) try { await migrate(drizzle(client), { migrationsFolder: './migrations' }) From 077fe06a30233b1d80b3b8390e559acde614b337 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 17:00:22 -0700 Subject: [PATCH 05/12] fix(db): gate pid session guard to direct migration connections (pooler pids legitimately vary) --- packages/db/scripts/migrate.ts | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/packages/db/scripts/migrate.ts b/packages/db/scripts/migrate.ts index 5c089f722fb..75ad68eb38c 100644 --- a/packages/db/scripts/migrate.ts +++ b/packages/db/scripts/migrate.ts @@ -65,6 +65,16 @@ if (!url) { process.exit(1) } +/** + * The backend-pid session guard below is only sound on a DIRECT connection: + * through a transaction-pooling PgBouncer, consecutive statements can + * legitimately run on different server backends, so a pid change does not mean + * the session was lost and the guard would false-positive on every run. + * MIGRATION_DATABASE_URL is by contract the direct DSN; when falling back to + * DATABASE_URL (which may be pooled), the guard is skipped. + */ +const hasDirectMigrationUrl = Boolean(process.env.MIGRATION_DATABASE_URL) + /** * `max_lifetime: null` is load-bearing: postgres-js defaults to recycling the * connection after a randomized 30–60 minutes, and a transparent reconnect @@ -178,12 +188,14 @@ async function acquireMigrationLock(): Promise { */ async function runMigrationsWithRetry(): Promise { for (let attempt = 1; ; attempt++) { - const [{ pid }] = await client`SELECT pg_backend_pid() AS pid` - if (pid !== lockSessionPid) { - throw new Error( - `Database session changed mid-run (backend pid ${lockSessionPid} -> ${pid}); ` + - 'the migration advisory lock was lost. Aborting so a fresh runner can retry safely.' - ) + if (hasDirectMigrationUrl) { + const [{ pid }] = await client`SELECT pg_backend_pid() AS pid` + if (pid !== lockSessionPid) { + throw new Error( + `Database session changed mid-run (backend pid ${lockSessionPid} -> ${pid}); ` + + 'the migration advisory lock was lost. Aborting so a fresh runner can retry safely.' + ) + } } await client.unsafe('SET statement_timeout = 0') await client.unsafe(`SET lock_timeout = '${DDL_LOCK_TIMEOUT}'`) From 1abc8e0e9784d1e5f92df143d15fca7b11090bbd Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 17:15:10 -0700 Subject: [PATCH 06/12] improvement(billing): route display-only usage aggregations to the read replica via executor threading --- apps/sim/app/api/billing/route.ts | 6 +- .../[id]/members/[memberId]/route.ts | 4 +- apps/sim/app/api/usage/route.ts | 3 +- .../app/api/users/me/usage-limits/route.ts | 3 +- .../admin/organizations/[id]/billing/route.ts | 4 +- apps/sim/lib/billing/core/billing.ts | 37 +++++--- apps/sim/lib/billing/core/organization.ts | 38 +++++--- apps/sim/lib/billing/core/usage-log.ts | 10 +- apps/sim/lib/billing/core/usage.ts | 93 ++++++++++++------- apps/sim/lib/billing/credits/daily-refresh.ts | 24 +++-- 10 files changed, 138 insertions(+), 84 deletions(-) diff --git a/apps/sim/app/api/billing/route.ts b/apps/sim/app/api/billing/route.ts index 2cc338abd48..364f4d707cc 100644 --- a/apps/sim/app/api/billing/route.ts +++ b/apps/sim/app/api/billing/route.ts @@ -1,4 +1,4 @@ -import { db } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { member } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' @@ -65,7 +65,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => { } const [billingResult, billingStatus] = await Promise.all([ - getSimplifiedBillingSummary(session.user.id, contextId || undefined), + getSimplifiedBillingSummary(session.user.id, contextId || undefined, dbReplica), getEffectiveBillingStatus(session.user.id), ]) billingData = billingResult @@ -114,7 +114,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => { } // Get organization-specific billing - const rawBillingData = await getOrganizationBillingData(contextId!) + const rawBillingData = await getOrganizationBillingData(contextId!, dbReplica) if (!rawBillingData) { return NextResponse.json( diff --git a/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts b/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts index 0cb2dd979b4..19592f6b246 100644 --- a/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts +++ b/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts @@ -1,5 +1,5 @@ import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit' -import { db } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { member, user, userStats } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq } from 'drizzle-orm' @@ -97,7 +97,7 @@ export const GET = withRouteHandler( .where(eq(userStats.userId, memberId)) .limit(1) - const computed = await getUserUsageData(memberId) + const computed = await getUserUsageData(memberId, dbReplica) if (usageData.length > 0) { // currentPeriodCost is only a baseline; add this member's attributed diff --git a/apps/sim/app/api/usage/route.ts b/apps/sim/app/api/usage/route.ts index 5ee3f0a5401..c8271ace599 100644 --- a/apps/sim/app/api/usage/route.ts +++ b/apps/sim/app/api/usage/route.ts @@ -1,3 +1,4 @@ +import { dbReplica } from '@sim/db' import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { getUsageLimitContract, updateUsageLimitContract } from '@/lib/api/contracts/subscription' @@ -62,7 +63,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => { return NextResponse.json({ error: 'Permission denied' }, { status: 403 }) } - const org = await getOrganizationBillingData(organizationId) + const org = await getOrganizationBillingData(organizationId, dbReplica) return NextResponse.json({ success: true, context, diff --git a/apps/sim/app/api/users/me/usage-limits/route.ts b/apps/sim/app/api/users/me/usage-limits/route.ts index 4dfd60b1a0a..f85f46ac317 100644 --- a/apps/sim/app/api/users/me/usage-limits/route.ts +++ b/apps/sim/app/api/users/me/usage-limits/route.ts @@ -1,3 +1,4 @@ +import { dbReplica } from '@sim/db' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' import { type NextRequest, NextResponse } from 'next/server' @@ -43,7 +44,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => { const [usageCheck, effectiveCost, storageUsage, storageLimit] = await Promise.all([ checkServerSideUsageLimits(authenticatedUserId), - getEffectiveCurrentPeriodCost(authenticatedUserId), + getEffectiveCurrentPeriodCost(authenticatedUserId, dbReplica), getUserStorageUsage(authenticatedUserId), getUserStorageLimit(authenticatedUserId), ]) diff --git a/apps/sim/app/api/v1/admin/organizations/[id]/billing/route.ts b/apps/sim/app/api/v1/admin/organizations/[id]/billing/route.ts index 9dd083a30a2..c19168b7a6f 100644 --- a/apps/sim/app/api/v1/admin/organizations/[id]/billing/route.ts +++ b/apps/sim/app/api/v1/admin/organizations/[id]/billing/route.ts @@ -15,7 +15,7 @@ * Response: AdminSingleResponse<{ success: true, orgUsageLimit: string | null }> */ -import { db } from '@sim/db' +import { db, dbReplica } from '@sim/db' import { member, organization } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { count, eq } from 'drizzle-orm' @@ -96,7 +96,7 @@ export const GET = withRouteHandler( return singleResponse(data) } - const billingData = await getOrganizationBillingData(organizationId) + const billingData = await getOrganizationBillingData(organizationId, dbReplica) if (!billingData) { return notFoundResponse('Organization or subscription') diff --git a/apps/sim/lib/billing/core/billing.ts b/apps/sim/lib/billing/core/billing.ts index 2db6b3ef7c7..aca33c6e3d7 100644 --- a/apps/sim/lib/billing/core/billing.ts +++ b/apps/sim/lib/billing/core/billing.ts @@ -22,6 +22,7 @@ import { isOrgScopedSubscription, } from '@/lib/billing/subscriptions/utils' import { Decimal, toDecimal, toNumber } from '@/lib/billing/utils/decimal' +import type { DbOrTx } from '@/lib/db/types' export { getPlanPricing } @@ -384,7 +385,8 @@ export async function calculateSubscriptionOverage(sub: { */ export async function getSimplifiedBillingSummary( userId: string, - organizationId?: string + organizationId?: string, + executor: DbOrTx = db ): Promise<{ type: 'individual' | 'organization' plan: string @@ -432,7 +434,7 @@ export async function getSimplifiedBillingSummary( organizationId ? getOrganizationSubscription(organizationId) : getHighestPrioritySubscription(userId), - getUserUsageData(userId), + getUserUsageData(userId, executor), ]) const plan = subscription?.plan || 'free' @@ -469,7 +471,9 @@ export async function getSimplifiedBillingSummary( const ledgerUsage = orgBillingPeriod ? await getBillingPeriodUsageCost( { type: 'organization', id: organizationId }, - orgBillingPeriod + orgBillingPeriod, + undefined, + executor ) : 0 // Copilot breakdown = member baselines (copilot + MCP) + the copilot-family @@ -481,7 +485,8 @@ export async function getSimplifiedBillingSummary( ? await getBillingPeriodUsageCost( { type: 'organization', id: organizationId }, orgBillingPeriod, - COPILOT_USAGE_SOURCES + COPILOT_USAGE_SOURCES, + executor ) : 0) let refreshDeduction = 0 @@ -492,15 +497,18 @@ export async function getSimplifiedBillingSummary( organizationId, subscription.periodStart ) - refreshDeduction = await computeDailyRefreshConsumed({ - userIds: pooled.memberIds, - periodStart: subscription.periodStart, - periodEnd: subscription.periodEnd ?? null, - planDollars, - seats: subscription.seats || 1, - userBounds: Object.keys(userBounds).length > 0 ? userBounds : undefined, - billingEntity: { type: 'organization', id: organizationId }, - }) + refreshDeduction = await computeDailyRefreshConsumed( + { + userIds: pooled.memberIds, + periodStart: subscription.periodStart, + periodEnd: subscription.periodEnd ?? null, + planDollars, + seats: subscription.seats || 1, + userBounds: Object.keys(userBounds).length > 0 ? userBounds : undefined, + billingEntity: { type: 'organization', id: organizationId }, + }, + executor + ) } } const effectiveCurrentUsage = Math.max(0, rawCurrentUsage + ledgerUsage - refreshDeduction) @@ -609,7 +617,8 @@ export async function getSimplifiedBillingSummary( totalCopilotCost += await getBillingPeriodUsageCost( copilotEntity, copilotBillingPeriod, - COPILOT_USAGE_SOURCES + COPILOT_USAGE_SOURCES, + executor ) } diff --git a/apps/sim/lib/billing/core/organization.ts b/apps/sim/lib/billing/core/organization.ts index dd3389213f9..f7240550ebf 100644 --- a/apps/sim/lib/billing/core/organization.ts +++ b/apps/sim/lib/billing/core/organization.ts @@ -19,6 +19,7 @@ import { hasUsableSubscriptionStatus, } from '@/lib/billing/subscriptions/utils' import { toDecimal, toNumber } from '@/lib/billing/utils/decimal' +import type { DbOrTx } from '@/lib/db/types' const logger = createLogger('OrganizationBilling') @@ -64,7 +65,8 @@ interface MemberUsageData { */ export async function getOrgMemberLedgerByUser( organizationId: string, - period?: { start: Date; end: Date } | null + period?: { start: Date; end: Date } | null, + executor: DbOrTx = db ): Promise> { let billingPeriod = period ?? null if (period === undefined) { @@ -77,7 +79,9 @@ export async function getOrgMemberLedgerByUser( if (!billingPeriod) return new Map() return getBillingPeriodUsageCostByUser( { type: 'organization', id: organizationId }, - billingPeriod + billingPeriod, + undefined, + executor ) } @@ -85,7 +89,8 @@ export async function getOrgMemberLedgerByUser( * Get comprehensive organization billing and usage data */ export async function getOrganizationBillingData( - organizationId: string + organizationId: string, + executor: DbOrTx = db ): Promise { try { // Get organization info @@ -134,7 +139,7 @@ export async function getOrganizationBillingData( subscription.periodStart && subscription.periodEnd ? { start: subscription.periodStart, end: subscription.periodEnd } : null - const usageByUser = await getOrgMemberLedgerByUser(organizationId, billingPeriod) + const usageByUser = await getOrgMemberLedgerByUser(organizationId, billingPeriod, executor) // Process member data const members: MemberUsageData[] = membersWithUsage.map((memberRecord) => { @@ -168,7 +173,9 @@ export async function getOrganizationBillingData( if (billingPeriod) { totalCurrentUsage += await getBillingPeriodUsageCost( { type: 'organization', id: subscription.referenceId }, - billingPeriod + billingPeriod, + undefined, + executor ) } @@ -180,15 +187,18 @@ export async function getOrganizationBillingData( subscription.referenceId, subscription.periodStart ) - const refreshConsumed = await computeDailyRefreshConsumed({ - userIds: memberIds, - periodStart: subscription.periodStart, - periodEnd: subscription.periodEnd ?? null, - planDollars, - seats: subscription.seats || 1, - userBounds: Object.keys(userBounds).length > 0 ? userBounds : undefined, - billingEntity: { type: 'organization', id: subscription.referenceId }, - }) + const refreshConsumed = await computeDailyRefreshConsumed( + { + userIds: memberIds, + periodStart: subscription.periodStart, + periodEnd: subscription.periodEnd ?? null, + planDollars, + seats: subscription.seats || 1, + userBounds: Object.keys(userBounds).length > 0 ? userBounds : undefined, + billingEntity: { type: 'organization', id: subscription.referenceId }, + }, + executor + ) totalCurrentUsage = Math.max(0, totalCurrentUsage - refreshConsumed) } } diff --git a/apps/sim/lib/billing/core/usage-log.ts b/apps/sim/lib/billing/core/usage-log.ts index a7efa4d52ef..c5b7435d877 100644 --- a/apps/sim/lib/billing/core/usage-log.ts +++ b/apps/sim/lib/billing/core/usage-log.ts @@ -169,7 +169,8 @@ async function resolveBillingContext( export async function getBillingPeriodUsageCost( billingEntity: BillingEntity, billingPeriod: { start: Date; end: Date }, - source?: UsageLogSource | UsageLogSource[] + source?: UsageLogSource | UsageLogSource[], + executor: DbOrTx = db ): Promise { const conditions = [ eq(usageLog.billingEntityType, billingEntity.type), @@ -183,7 +184,7 @@ export async function getBillingPeriodUsageCost( ) } - const [row] = await db + const [row] = await executor .select({ cost: sql`COALESCE(SUM(${usageLog.cost}), 0)`, }) @@ -196,7 +197,8 @@ export async function getBillingPeriodUsageCost( export async function getBillingPeriodUsageCostByUser( billingEntity: BillingEntity, billingPeriod: { start: Date; end: Date }, - source?: UsageLogSource | UsageLogSource[] + source?: UsageLogSource | UsageLogSource[], + executor: DbOrTx = db ): Promise> { const conditions = [ eq(usageLog.billingEntityType, billingEntity.type), @@ -210,7 +212,7 @@ export async function getBillingPeriodUsageCostByUser( ) } - const rows = await db + const rows = await executor .select({ userId: usageLog.userId, cost: sql`COALESCE(SUM(${usageLog.cost}), 0)`, diff --git a/apps/sim/lib/billing/core/usage.ts b/apps/sim/lib/billing/core/usage.ts index 972a199b0fb..7845312d316 100644 --- a/apps/sim/lib/billing/core/usage.ts +++ b/apps/sim/lib/billing/core/usage.ts @@ -34,6 +34,7 @@ import type { BillingData, UsageData, UsageLimitInfo } from '@/lib/billing/types import { Decimal, toDecimal, toNumber } from '@/lib/billing/utils/decimal' import { isBillingEnabled } from '@/lib/core/config/feature-flags' import { getBaseUrl } from '@/lib/core/utils/urls' +import type { DbOrTx } from '@/lib/db/types' import { sendEmail } from '@/lib/messaging/email/mailer' import { getEmailPreferences } from '@/lib/messaging/email/unsubscribe' @@ -179,7 +180,7 @@ export async function ensureUserStatsExists(userId: string): Promise { /** * Get comprehensive usage data for a user */ -export async function getUserUsageData(userId: string): Promise { +export async function getUserUsageData(userId: string, executor: DbOrTx = db): Promise { try { await ensureUserStatsExists(userId) @@ -203,7 +204,12 @@ export async function getUserUsageData(userId: string): Promise { let currentUsageDecimal = toDecimal(stats.currentPeriodCost) if (!orgScoped) { currentUsageDecimal = currentUsageDecimal.plus( - await getBillingPeriodUsageCost({ type: 'user', id: userId }, billingPeriod) + await getBillingPeriodUsageCost( + { type: 'user', id: userId }, + billingPeriod, + undefined, + executor + ) ) } @@ -242,7 +248,9 @@ export async function getUserUsageData(userId: string): Promise { lastPeriodCost = pooled.lastPeriodCost const ledgerUsage = await getBillingPeriodUsageCost( { type: 'organization', id: subscription.referenceId }, - billingPeriod + billingPeriod, + undefined, + executor ) currentUsage = pooled.currentPeriodCost + ledgerUsage } else { @@ -264,24 +272,30 @@ export async function getUserUsageData(userId: string): Promise { subscription.referenceId, billingPeriodStart ) - dailyRefreshConsumed = await computeDailyRefreshConsumed({ - userIds: orgMemberIds, + dailyRefreshConsumed = await computeDailyRefreshConsumed( + { + userIds: orgMemberIds, + periodStart: billingPeriodStart, + periodEnd: billingPeriodEnd, + planDollars, + seats: subscription.seats || 1, + userBounds: Object.keys(userBounds).length > 0 ? userBounds : undefined, + billingEntity: { type: 'organization', id: subscription.referenceId }, + }, + executor + ) + } + } else { + dailyRefreshConsumed = await computeDailyRefreshConsumed( + { + userIds: [userId], periodStart: billingPeriodStart, periodEnd: billingPeriodEnd, planDollars, - seats: subscription.seats || 1, - userBounds: Object.keys(userBounds).length > 0 ? userBounds : undefined, - billingEntity: { type: 'organization', id: subscription.referenceId }, - }) - } - } else { - dailyRefreshConsumed = await computeDailyRefreshConsumed({ - userIds: [userId], - periodStart: billingPeriodStart, - periodEnd: billingPeriodEnd, - planDollars, - billingEntity: { type: 'user', id: userId }, - }) + billingEntity: { type: 'user', id: userId }, + }, + executor + ) } } } @@ -612,7 +626,10 @@ export async function syncUsageLimitsFromSubscription(userId: string): Promise { +export async function getEffectiveCurrentPeriodCost( + userId: string, + executor: DbOrTx = db +): Promise { const subscription = await getHighestPrioritySubscription(userId) const orgScoped = isOrgScopedSubscription(subscription, userId) @@ -631,7 +648,9 @@ export async function getEffectiveCurrentPeriodCost(userId: string): Promise 0 ? userBounds : undefined, - billingEntity: - orgScoped && subscription - ? { type: 'organization', id: subscription.referenceId } - : { type: 'user', id: userId }, - }) + const refreshConsumed = await computeDailyRefreshConsumed( + { + userIds: refreshUserIds, + periodStart: subscription.periodStart, + periodEnd: subscription.periodEnd ?? null, + planDollars, + seats: subscription.seats || 1, + userBounds: Object.keys(userBounds).length > 0 ? userBounds : undefined, + billingEntity: + orgScoped && subscription + ? { type: 'organization', id: subscription.referenceId } + : { type: 'user', id: userId }, + }, + executor + ) return Math.max(0, rawCost - refreshConsumed) } diff --git a/apps/sim/lib/billing/credits/daily-refresh.ts b/apps/sim/lib/billing/credits/daily-refresh.ts index 4835c091f31..7391c74192a 100644 --- a/apps/sim/lib/billing/credits/daily-refresh.ts +++ b/apps/sim/lib/billing/credits/daily-refresh.ts @@ -16,6 +16,7 @@ import { member, usageLog, userStats } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, eq, gte, inArray, lt, or, sql, sum } from 'drizzle-orm' import { DAILY_REFRESH_RATE } from '@/lib/billing/constants' +import type { DbOrTx } from '@/lib/db/types' const logger = createLogger('DailyRefresh') @@ -41,15 +42,18 @@ export interface PerUserBounds { * * @returns Total dollars of refresh consumed across all days (to subtract from usage) */ -export async function computeDailyRefreshConsumed(params: { - userIds: string[] - periodStart: Date - periodEnd?: Date | null - planDollars: number - seats?: number - userBounds?: Record - billingEntity?: { type: 'user' | 'organization'; id: string } -}): Promise { +export async function computeDailyRefreshConsumed( + params: { + userIds: string[] + periodStart: Date + periodEnd?: Date | null + planDollars: number + seats?: number + userBounds?: Record + billingEntity?: { type: 'user' | 'organization'; id: string } + }, + executor: DbOrTx = db +): Promise { const { userIds, periodStart, @@ -114,7 +118,7 @@ export async function computeDailyRefreshConsumed(params: { if (rowFilters.length === 0) return 0 - const rows = await db + const rows = await executor .select({ dayIndex: sql`FLOOR((EXTRACT(EPOCH FROM ${usageLog.createdAt}) - ${Math.floor(periodStart.getTime() / 1000)}) / 86400)`.as( From 52757a994155b792bc8097c57c0ce8b0e330e37b Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 17:21:43 -0700 Subject: [PATCH 07/12] chore(db): drop call-site justification comments --- apps/sim/app/api/v1/audit-logs/query.ts | 4 ---- apps/sim/lib/data-drains/sources/helpers.ts | 4 ---- 2 files changed, 8 deletions(-) diff --git a/apps/sim/app/api/v1/audit-logs/query.ts b/apps/sim/app/api/v1/audit-logs/query.ts index ba343fcb848..795c54ecfb4 100644 --- a/apps/sim/app/api/v1/audit-logs/query.ts +++ b/apps/sim/app/api/v1/audit-logs/query.ts @@ -71,10 +71,6 @@ export function buildFilterConditions(params: AuditLogFilterParams): SQL { const rows = await db diff --git a/apps/sim/lib/data-drains/sources/helpers.ts b/apps/sim/lib/data-drains/sources/helpers.ts index 58058d63f15..33ff90ab42c 100644 --- a/apps/sim/lib/data-drains/sources/helpers.ts +++ b/apps/sim/lib/data-drains/sources/helpers.ts @@ -5,10 +5,6 @@ import { eq } from 'drizzle-orm' /** * Returns the IDs of all workspaces belonging to the organization. Used by * sources whose underlying tables are workspace-scoped rather than org-scoped. - * - * Reads the primary, not the replica: this scoping list gates which rows a - * drain exports while its cursor advances monotonically — a workspace missing - * from a lagging replica would have its rows skipped past permanently. */ export async function getOrganizationWorkspaceIds(organizationId: string): Promise { const rows = await db From c277a595e3cb9d7e63ef6ea6b6be09cd61c6a06e Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 17:39:05 -0700 Subject: [PATCH 08/12] chore(db): tighten doc comments --- packages/db/db.ts | 14 +--- packages/db/scripts/migrate.ts | 142 ++++++++------------------------- 2 files changed, 38 insertions(+), 118 deletions(-) diff --git a/packages/db/db.ts b/packages/db/db.ts index c8fa42020c5..397e11a894c 100644 --- a/packages/db/db.ts +++ b/packages/db/db.ts @@ -19,16 +19,10 @@ const postgresClient = postgres(connectionString, { ...poolOptions, max: 15 }) export const db = drizzle(postgresClient, { schema }) /** - * Read-replica client — EXPLICIT OPT-IN. - * - * Import `dbReplica` only for reads that tolerate bounded staleness and have no - * read-your-writes dependency: logs listing/search, audit logs, dashboard - * aggregations, bulk exports. Never use it for auth/session lookups, workflow - * state, billing-limit enforcement, or any read inside a write-reconciling - * flow. - * - * Falls back to the primary client when `DATABASE_REPLICA_URL` is unset (dev, - * self-hosted, realtime), so call sites never need to branch. + * Opt-in read-replica client for reads that tolerate bounded staleness and have + * no read-your-writes dependency (logs, exports, dashboard aggregations). Never + * for auth, workflow state, or billing enforcement. Falls back to the primary + * when `DATABASE_REPLICA_URL` is unset, so call sites never branch. */ const replicaUrl = process.env.DATABASE_REPLICA_URL if (replicaUrl && !/^postgres(ql)?:\/\//.test(replicaUrl)) { diff --git a/packages/db/scripts/migrate.ts b/packages/db/scripts/migrate.ts index 75ad68eb38c..4211001e670 100644 --- a/packages/db/scripts/migrate.ts +++ b/packages/db/scripts/migrate.ts @@ -6,57 +6,26 @@ import { migrate } from 'drizzle-orm/postgres-js/migrator' import postgres from 'postgres' /** - * Concurrent-index convention (avoid write-blocking index builds on large tables) - * -------------------------------------------------------------------------------- - * drizzle-kit emits plain `CREATE INDEX`, which takes a SHARE lock and blocks all - * writes for the build duration — on a big, write-hot table (e.g. - * workflow_execution_logs, usage_log) that stalls every in-flight workflow - * completion for minutes. drizzle runs migrations inside a transaction, and - * `CREATE INDEX CONCURRENTLY` cannot run inside a transaction block. - * - * So, after generating a migration that adds an index on a large/hot table, edit - * the generated SQL to end drizzle's transaction first, clear the session - * `lock_timeout` (set below for fail-fast DDL; it would cancel CONCURRENTLY's - * legitimate waits on old transactions and leave an INVALID index), build - * concurrently and idempotently, then RESTORE the fail-fast timeout so later - * statements and files in the same run keep the protection (the SET is - * session-level and would otherwise persist): + * Concurrent-index convention: plain `CREATE INDEX` write-blocks large/hot + * tables, and CONCURRENTLY cannot run inside drizzle's migration transaction. + * For indexes on big tables, edit the generated SQL to: * * COMMIT;--> statement-breakpoint * SET lock_timeout = 0;--> statement-breakpoint * CREATE INDEX CONCURRENTLY IF NOT EXISTS "idx_name" ON "table" (...);--> statement-breakpoint * SET lock_timeout = '5s'; * - * Notes: - * - Put the `COMMIT` breakpoint AFTER all transactional DDL (ALTER TABLE/TYPE) - * in the file and only the concurrent CREATE INDEX statements below it. - * - drizzle's migrate() wraps ALL pending files in ONE transaction, so the - * embedded `COMMIT` ends that batch transaction: everything after it — in - * this file AND any later pending files — runs in autocommit, one statement - * at a time. This is why EVERY statement in a file using this convention, - * and in any file that can follow it in a batch, must be idempotent - * (`IF NOT EXISTS` / `ADD COLUMN IF NOT EXISTS` / `ADD VALUE IF NOT EXISTS`): - * a failure after the COMMIT cannot roll back, and the re-run replays every - * file whose journal record has not yet committed. - * - CONCURRENTLY only takes a SHARE UPDATE EXCLUSIVE lock (allows reads/writes). - * - Always validate on staging before prod; a failed CONCURRENTLY build can - * leave an INVALID index that must be dropped and rebuilt — the - * `warnOnInvalidIndexes` check below logs any such index after every run. + * The embedded COMMIT ends the batch transaction, so everything after it (in + * this and later pending files) runs in autocommit and must be idempotent + * (`IF NOT EXISTS` etc.) — a failed run replays unjournaled files from the top. + * A failed CONCURRENTLY build leaves an INVALID index that `IF NOT EXISTS` + * skips; `warnOnInvalidIndexes` below surfaces those. */ /** - * Migrations must run on a DIRECT Postgres connection, never through a - * transaction-pooling PgBouncer. Session-level advisory locks, session `SET`s - * (`statement_timeout`/`lock_timeout` below), and `pg_advisory_unlock` are all - * officially unsupported in transaction pooling — statements can land on - * different server connections, so the lock may not guard the migration, the - * unlock can strand the lock on a pooled connection (wedging the NEXT deploy - * for the full acquisition deadline), and timeout settings can leak into app - * traffic. This is the same reason Prisma requires `directUrl` for migrate. - * - * Set MIGRATION_DATABASE_URL to the direct (non-pooled) DSN in environments - * where DATABASE_URL points at a PgBouncer; it falls back to DATABASE_URL for - * dev/self-hosted setups that connect directly anyway. + * Prefer a direct (non-pooled) DSN: session advisory locks and session `SET`s + * are unsupported through PgBouncer transaction pooling. Falls back to + * DATABASE_URL for setups that connect directly anyway. */ const url = process.env.MIGRATION_DATABASE_URL || process.env.DATABASE_URL if (!url) { @@ -66,63 +35,38 @@ if (!url) { } /** - * The backend-pid session guard below is only sound on a DIRECT connection: - * through a transaction-pooling PgBouncer, consecutive statements can - * legitimately run on different server backends, so a pid change does not mean - * the session was lost and the guard would false-positive on every run. - * MIGRATION_DATABASE_URL is by contract the direct DSN; when falling back to - * DATABASE_URL (which may be pooled), the guard is skipped. + * The pid guard is only sound on a direct connection — through transaction + * pooling, consecutive statements legitimately land on different backends. */ const hasDirectMigrationUrl = Boolean(process.env.MIGRATION_DATABASE_URL) /** - * `max_lifetime: null` is load-bearing: postgres-js defaults to recycling the - * connection after a randomized 30–60 minutes, and a transparent reconnect - * silently drops the session advisory lock and session `SET`s. The migration - * session must live exactly as long as the run. + * `max_lifetime: null` pins the session for the whole run: the postgres-js + * default recycles the connection after 30–60 min, silently dropping the + * session advisory lock and `SET`s. */ const client = postgres(url, { max: 1, connect_timeout: 10, max_lifetime: null }) /** - * Cross-process migration lock key (a stable, app-wide 64-bit constant). - * - * drizzle's `migrate()` has no built-in lock, so when a deployment starts N app - * replicas at once — each with a migration sidecar — all N read - * `__drizzle_migrations`, all see the same migration pending, and all try to apply - * it concurrently. One wins; the losers run the same DDL against already-mutated - * state and die (e.g. `DROP TABLE "form"` → `table "form" does not exist`, - * exit 1 / TaskFailedToStart). - * - * Acquisition is a bounded `pg_try_advisory_lock` retry loop rather than a plain - * `pg_advisory_lock`: an unbounded wait meant one wedged runner silently hung - * every other deploy sidecar (and the whole ECS deployment) behind it. With a - * deadline, a stuck winner turns into a visible non-zero exit on the losers that - * the deploy orchestrator can retry or surface. Session locks auto-release if - * the connection drops, so a crashed runner never wedges the lock. + * Cross-process migration lock. drizzle's `migrate()` has no built-in lock, so + * concurrent runners (one per app replica at deploy time) must be serialized. + * Acquisition is a bounded try-lock loop: a plain `pg_advisory_lock` wait let + * one wedged runner silently hang every other runner and the whole deploy. */ const MIGRATION_LOCK_KEY = 4_961_002_270n const LOCK_ACQUIRE_DEADLINE_MS = 30 * 60_000 const LOCK_RETRY_INTERVAL_MS = 5_000 /** - * How long any single migration statement may QUEUE for a table lock before - * failing with SQLSTATE 55P03. Without this, DDL needing an AccessExclusiveLock - * (e.g. `DROP TABLE ... CASCADE`) queues indefinitely behind long-running reads - * — and every other query on that table queues behind the pending exclusive - * lock, stalling all reads/writes table-wide until the DDL gets its turn - * (observed in production: a ~15-minute full stall). Failing fast keeps the - * world unblocked; we retry below, then let the deploy retry. + * Max time a migration statement may queue for a table lock (SQLSTATE 55P03 on + * expiry). Without it, DDL waiting on an AccessExclusiveLock queues every other + * query on the table behind it — a table-wide stall for the whole wait. */ const DDL_LOCK_TIMEOUT = '5s' const MAX_MIGRATE_ATTEMPTS = 8 const MIGRATE_RETRY_BACKOFF = { baseMs: 2_000, maxMs: 30_000 } as const -/** - * Backend pid of the session that acquired the advisory lock. Re-checked at the - * top of every migration attempt: if the connection was silently replaced - * (server restart, network failure), the new session does NOT hold the lock, - * and running migrations on it would break mutual exclusion — abort loudly. - */ +/** Backend pid of the lock-holding session; a change means the lock was lost. */ let lockSessionPid = 0 try { @@ -166,25 +110,12 @@ async function acquireMigrationLock(): Promise { } /** - * Run pending migrations, retrying when a statement loses the `lock_timeout` - * race (SQLSTATE 55P03, detected anywhere in the error's `cause` chain since - * drizzle wraps driver failures). - * - * Every attempt starts by verifying the session still holds the advisory lock - * (backend pid unchanged) and re-asserting the session timeouts: - * `statement_timeout = 0` because index builds (esp. CONCURRENTLY on large - * tables) can run far longer than any app default and must never be killed - * mid-build, and the fail-fast `lock_timeout` because a prior attempt's - * migration file may have left the session at `lock_timeout = 0` (the - * CONCURRENTLY convention above). `SET` is rejected by Postgres when - * parameterized, so the constants are inlined via `client.unsafe`. - * - * Retry safety: drizzle wraps the whole pending batch in one transaction, so a - * lock-timeout failure rolls the batch back and the retry resumes from the - * first file whose journal record has not committed. Files using the - * embedded-`COMMIT` CONCURRENTLY convention break out of that transaction — - * their post-COMMIT statements are required to be idempotent (see the - * convention notes above) precisely so a replay is safe. + * Run pending migrations, retrying on lock timeout (55P03, found anywhere in + * the wrapped `cause` chain). Each attempt re-verifies the lock session (pid) + * and re-asserts the session timeouts — a migration file may have changed them, + * and `SET` cannot be parameterized, hence `client.unsafe` with constants. + * Replays are safe: drizzle rolls the batch back on failure, and post-COMMIT + * CONCURRENTLY statements are idempotent by convention. */ async function runMigrationsWithRetry(): Promise { for (let attempt = 1; ; attempt++) { @@ -216,10 +147,8 @@ async function runMigrationsWithRetry(): Promise { } /** - * A `CREATE INDEX CONCURRENTLY` that fails partway leaves an INVALID index that - * `IF NOT EXISTS` then silently skips on every future run. Surface any such - * index loudly (warn, don't fail — the migration itself committed) so it can be - * dropped and rebuilt. + * A failed CONCURRENTLY build leaves an INVALID index that `IF NOT EXISTS` + * silently skips forever — surface it (warn only; the migration committed). */ async function warnOnInvalidIndexes(): Promise { try { @@ -242,11 +171,8 @@ async function warnOnInvalidIndexes(): Promise { } /** - * Release the advisory lock without ever failing the process. The session-level - * lock auto-releases when the connection closes, so a thrown unlock — e.g. the - * connection dropped right after `migrate()` committed — must be swallowed. - * Letting it reach the outer `catch` would exit 1 and falsely report a - * successful migration as failed to the deploy orchestrator. + * Unlock errors are swallowed: the session lock auto-releases on disconnect, + * and a thrown unlock would falsely report a committed migration as failed. */ async function releaseMigrationLock(): Promise { try { From 8b6184fe4b4d575aa2afcc6f136369f33c481707 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 17:45:55 -0700 Subject: [PATCH 09/12] ci(migrations): map optional direct-connection DSN secrets --- .github/workflows/migrations.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/migrations.yml b/.github/workflows/migrations.yml index 240972f9a03..ea5ca453968 100644 --- a/.github/workflows/migrations.yml +++ b/.github/workflows/migrations.yml @@ -52,10 +52,14 @@ jobs: # The expression maps the explicit environment input to exactly one repo # secret, so the job never holds another environment's database URL. An # unknown environment resolves to empty and the guard below fails the job. + # MIGRATION_DATABASE_URL is the optional direct (non-pooled) DSN preferred + # by migrate.ts; when the secret is unset it resolves to empty and the + # script falls back to DATABASE_URL. - name: Apply database schema changes working-directory: ./packages/db env: DATABASE_URL: ${{ inputs.environment == 'production' && secrets.DATABASE_URL || inputs.environment == 'staging' && secrets.STAGING_DATABASE_URL || inputs.environment == 'dev' && secrets.DEV_DATABASE_URL || '' }} + MIGRATION_DATABASE_URL: ${{ inputs.environment == 'production' && secrets.MIGRATION_DATABASE_URL || inputs.environment == 'staging' && secrets.STAGING_MIGRATION_DATABASE_URL || '' }} ENVIRONMENT: ${{ inputs.environment }} run: | if [ -z "$DATABASE_URL" ]; then From 1fd5b0dd324080594205d65eae29f0bef8f2bd1e Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 17:53:41 -0700 Subject: [PATCH 10/12] fix(data-drains): add stability window to time cursors so late-visible rows are never skipped --- apps/sim/lib/data-drains/sources/audit-logs.ts | 3 ++- apps/sim/lib/data-drains/sources/copilot-chats.ts | 9 ++++++++- apps/sim/lib/data-drains/sources/copilot-runs.ts | 2 ++ apps/sim/lib/data-drains/sources/cursor.ts | 11 +++++++++++ apps/sim/lib/data-drains/sources/job-logs.ts | 2 ++ apps/sim/lib/data-drains/sources/workflow-logs.ts | 2 ++ 6 files changed, 27 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/data-drains/sources/audit-logs.ts b/apps/sim/lib/data-drains/sources/audit-logs.ts index 070aef9886e..b94ce50ae8b 100644 --- a/apps/sim/lib/data-drains/sources/audit-logs.ts +++ b/apps/sim/lib/data-drains/sources/audit-logs.ts @@ -6,6 +6,7 @@ import { encodeTimeCursor, timeCursorOrderBy, timeCursorPredicate, + timeCursorStabilityBound, } from '@/lib/data-drains/sources/cursor' import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers' import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types' @@ -38,7 +39,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { const rows = await dbReplica .select() .from(auditLog) - .where(and(scopeClause, cursorClause)) + .where(and(scopeClause, timeCursorStabilityBound(auditLog.createdAt), cursorClause)) .orderBy(...timeCursorOrderBy(auditLog.createdAt, auditLog.id)) .limit(input.chunkSize) diff --git a/apps/sim/lib/data-drains/sources/copilot-chats.ts b/apps/sim/lib/data-drains/sources/copilot-chats.ts index c6b406492d4..34a16096381 100644 --- a/apps/sim/lib/data-drains/sources/copilot-chats.ts +++ b/apps/sim/lib/data-drains/sources/copilot-chats.ts @@ -6,6 +6,7 @@ import { encodeTimeCursor, timeCursorOrderBy, timeCursorPredicate, + timeCursorStabilityBound, } from '@/lib/data-drains/sources/cursor' import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers' import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types' @@ -58,7 +59,13 @@ async function* pages(input: SourcePageInput): AsyncIterable { const metaRows = await dbReplica .select(chatColumns) .from(copilotChats) - .where(and(inArray(copilotChats.workspaceId, workspaceIds), cursorClause)) + .where( + and( + inArray(copilotChats.workspaceId, workspaceIds), + timeCursorStabilityBound(copilotChats.createdAt), + cursorClause + ) + ) .orderBy(...timeCursorOrderBy(copilotChats.createdAt, copilotChats.id)) .limit(input.chunkSize) diff --git a/apps/sim/lib/data-drains/sources/copilot-runs.ts b/apps/sim/lib/data-drains/sources/copilot-runs.ts index 7fe2d8ac22a..698008916e4 100644 --- a/apps/sim/lib/data-drains/sources/copilot-runs.ts +++ b/apps/sim/lib/data-drains/sources/copilot-runs.ts @@ -6,6 +6,7 @@ import { encodeTimeCursor, timeCursorOrderBy, timeCursorPredicate, + timeCursorStabilityBound, } from '@/lib/data-drains/sources/cursor' import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers' import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types' @@ -31,6 +32,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { and( inArray(copilotRuns.workspaceId, workspaceIds), isNotNull(copilotRuns.completedAt), + timeCursorStabilityBound(copilotRuns.completedAt), cursorClause ) ) diff --git a/apps/sim/lib/data-drains/sources/cursor.ts b/apps/sim/lib/data-drains/sources/cursor.ts index 484e5eb1273..b133a6449d0 100644 --- a/apps/sim/lib/data-drains/sources/cursor.ts +++ b/apps/sim/lib/data-drains/sources/cursor.ts @@ -54,3 +54,14 @@ export function timeCursorPredicate( export function timeCursorOrderBy(timestampCol: PgColumn, idCol: PgColumn): [SQL, SQL] { return [sql`date_trunc('milliseconds', ${timestampCol}) asc`, sql`${idCol} asc`] } + +/** + * Excludes rows newer than a short stability window. Timestamp cursors assume + * rows become visible in timestamp order, but out-of-order commits and replica + * lag can surface an earlier-stamped row after the cursor has advanced past it + * — permanently skipping it. Leaving the freshest rows for the next run bounds + * both. + */ +export function timeCursorStabilityBound(timestampCol: PgColumn): SQL { + return sql`${timestampCol} <= now() - interval '5 minutes'` +} diff --git a/apps/sim/lib/data-drains/sources/job-logs.ts b/apps/sim/lib/data-drains/sources/job-logs.ts index 43d33bd15ba..dcb2d9c98c5 100644 --- a/apps/sim/lib/data-drains/sources/job-logs.ts +++ b/apps/sim/lib/data-drains/sources/job-logs.ts @@ -6,6 +6,7 @@ import { encodeTimeCursor, timeCursorOrderBy, timeCursorPredicate, + timeCursorStabilityBound, } from '@/lib/data-drains/sources/cursor' import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers' import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types' @@ -31,6 +32,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { and( inArray(jobExecutionLogs.workspaceId, workspaceIds), isNotNull(jobExecutionLogs.endedAt), + timeCursorStabilityBound(jobExecutionLogs.endedAt), cursorClause ) ) diff --git a/apps/sim/lib/data-drains/sources/workflow-logs.ts b/apps/sim/lib/data-drains/sources/workflow-logs.ts index 562e5119577..4466c5a4c60 100644 --- a/apps/sim/lib/data-drains/sources/workflow-logs.ts +++ b/apps/sim/lib/data-drains/sources/workflow-logs.ts @@ -7,6 +7,7 @@ import { encodeTimeCursor, timeCursorOrderBy, timeCursorPredicate, + timeCursorStabilityBound, } from '@/lib/data-drains/sources/cursor' import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers' import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types' @@ -40,6 +41,7 @@ async function* pages(input: SourcePageInput): AsyncIterable { and( inArray(workflowExecutionLogs.workspaceId, workspaceIds), isNotNull(workflowExecutionLogs.endedAt), + timeCursorStabilityBound(workflowExecutionLogs.endedAt), cursorClause ) ) From 3d6c9662b7fa64b38e2485ac0c9975401ee073dd Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 18:01:05 -0700 Subject: [PATCH 11/12] fix(billing): resolve pagination cursor on primary; replica for member ledger display --- .../app/api/organizations/[id]/members/[memberId]/route.ts | 3 ++- apps/sim/lib/billing/core/usage-log.ts | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts b/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts index 19592f6b246..f6f3dd68944 100644 --- a/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts +++ b/apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts @@ -109,7 +109,8 @@ export const GET = withRouteHandler( organizationId, computed.billingPeriodStart && computed.billingPeriodEnd ? { start: computed.billingPeriodStart, end: computed.billingPeriodEnd } - : null + : null, + dbReplica ) ).get(memberId) ?? 0 memberData = { diff --git a/apps/sim/lib/billing/core/usage-log.ts b/apps/sim/lib/billing/core/usage-log.ts index c5b7435d877..147e0406146 100644 --- a/apps/sim/lib/billing/core/usage-log.ts +++ b/apps/sim/lib/billing/core/usage-log.ts @@ -581,7 +581,10 @@ export async function getUserUsageLogs( } if (cursor) { - const cursorLog = await dbReplica + // Cursor resolution stays on the primary: the page itself reads a + // load-balanced replica, and a laggier sibling replica missing the cursor + // row would silently restart pagination from page 1. + const cursorLog = await db .select({ createdAt: usageLog.createdAt }) .from(usageLog) .where(eq(usageLog.id, cursor)) From f07823379661d280737ac5e53fea6f57d294bf81 Mon Sep 17 00:00:00 2001 From: waleed Date: Wed, 10 Jun 2026 18:11:20 -0700 Subject: [PATCH 12/12] fix(billing): usage-limits returns cost and limit from one computation --- apps/sim/app/api/users/me/usage-limits/route.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/apps/sim/app/api/users/me/usage-limits/route.ts b/apps/sim/app/api/users/me/usage-limits/route.ts index f85f46ac317..8f2b18d024e 100644 --- a/apps/sim/app/api/users/me/usage-limits/route.ts +++ b/apps/sim/app/api/users/me/usage-limits/route.ts @@ -1,4 +1,3 @@ -import { dbReplica } from '@sim/db' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' import { type NextRequest, NextResponse } from 'next/server' @@ -6,7 +5,6 @@ import { usageLimitsRequestSchema } from '@/lib/api/contracts/usage-limits' import { AuthType, checkHybridAuth } from '@/lib/auth/hybrid' import { checkServerSideUsageLimits } from '@/lib/billing' import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription' -import { getEffectiveCurrentPeriodCost } from '@/lib/billing/core/usage' import { getUserStorageLimit, getUserStorageUsage } from '@/lib/billing/storage' import { RateLimiter } from '@/lib/core/rate-limiter' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' @@ -42,14 +40,15 @@ export const GET = withRouteHandler(async (request: NextRequest) => { ), ]) - const [usageCheck, effectiveCost, storageUsage, storageLimit] = await Promise.all([ + const [usageCheck, storageUsage, storageLimit] = await Promise.all([ checkServerSideUsageLimits(authenticatedUserId), - getEffectiveCurrentPeriodCost(authenticatedUserId, dbReplica), getUserStorageUsage(authenticatedUserId), getUserStorageLimit(authenticatedUserId), ]) - const currentPeriodCost = effectiveCost + // Same computation as `limit` (one source, one tier) — the pair can never + // disagree under replication lag or mixed baseline/ledger tiers. + const currentPeriodCost = usageCheck.currentUsage return NextResponse.json({ success: true,