Skip to content

Commit 73c97f7

Browse files
committed
fix(logs): keep snapshot dedup a single atomic upsert (no select race)
Addresses review: the DO NOTHING + follow-up select could fail if cleanup deletes the conflicting (orphaned, aged) snapshot between the no-op insert and the select. Revert to one atomic upsert but SET only state_hash, so RETURNING always yields the row (no race) while the unchanged TOASTed state_data jsonb is still not rewritten under MVCC — keeping the per-execution write tiny.
1 parent 0b7ad99 commit 73c97f7

2 files changed

Lines changed: 56 additions & 104 deletions

File tree

apps/sim/lib/logs/execution/snapshot/service.test.ts

Lines changed: 36 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -381,36 +381,24 @@ describe('SnapshotService', () => {
381381
createdAt: Date
382382
}
383383

384-
/** Mock the insert → values → onConflictDoNothing → returning chain. */
385-
function mockInsertReturning(rows: SnapshotRow[]) {
384+
/** Mock the insert → values → onConflictDoUpdate → returning chain. */
385+
function mockUpsertReturning(rows: SnapshotRow[]) {
386386
let capturedConflictConfig: Record<string, unknown> | undefined
387-
const onConflictDoNothing = vi.fn().mockImplementation((config: Record<string, unknown>) => {
387+
const onConflictDoUpdate = vi.fn().mockImplementation((config: Record<string, unknown>) => {
388388
capturedConflictConfig = config
389389
return { returning: vi.fn().mockResolvedValue(rows) }
390390
})
391-
const values = vi.fn().mockReturnValue({ onConflictDoNothing })
391+
const values = vi.fn().mockReturnValue({ onConflictDoUpdate })
392392
databaseMock.db.insert = vi.fn().mockReturnValue({ values })
393-
return {
394-
values,
395-
onConflictDoNothing,
396-
getConflictConfig: () => capturedConflictConfig,
397-
}
398-
}
399-
400-
/** Mock the select → from → where → limit chain used on the reuse path. */
401-
function mockSelectReturning(rows: SnapshotRow[]) {
402-
const limit = vi.fn().mockResolvedValue(rows)
403-
const where = vi.fn().mockReturnValue({ limit })
404-
const from = vi.fn().mockReturnValue({ where })
405-
databaseMock.db.select = vi.fn().mockReturnValue({ from })
406-
return databaseMock.db.select
393+
databaseMock.db.select = vi.fn()
394+
return { values, onConflictDoUpdate, getConflictConfig: () => capturedConflictConfig }
407395
}
408396

409-
it('inserts a new snapshot via onConflictDoNothing without a follow-up select', async () => {
397+
it('inserts a new snapshot in a single atomic upsert', async () => {
410398
const service = new SnapshotService()
411399
const workflowId = 'wf-123'
412400

413-
const { values } = mockInsertReturning([
401+
const { values } = mockUpsertReturning([
414402
{
415403
id: 'generated-uuid-1',
416404
workflowId,
@@ -419,7 +407,6 @@ describe('SnapshotService', () => {
419407
createdAt: new Date('2026-02-19T00:00:00Z'),
420408
},
421409
])
422-
const select = mockSelectReturning([])
423410

424411
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
425412

@@ -428,57 +415,57 @@ describe('SnapshotService', () => {
428415
)
429416
expect(result.snapshot.id).toBe('generated-uuid-1')
430417
expect(result.isNew).toBe(true)
431-
// New row returned by the insert → no extra read needed.
432-
expect(select).not.toHaveBeenCalled()
418+
// Single atomic statement — never a follow-up select (which would race with cleanup).
419+
expect(databaseMock.db.select).not.toHaveBeenCalled()
433420
})
434421

435-
it('does NOT rewrite state_data on conflict (onConflictDoNothing, no set clause)', async () => {
422+
it('reuses the existing snapshot atomically when the returned id differs', async () => {
436423
const service = new SnapshotService()
437424
const workflowId = 'wf-123'
438425

439-
const { onConflictDoNothing, getConflictConfig } = mockInsertReturning([
426+
mockUpsertReturning([
440427
{
441-
id: 'generated-uuid-1',
428+
id: 'existing-snapshot-id',
442429
workflowId,
443430
stateHash: 'abc123',
444431
stateData: mockState,
445432
createdAt: new Date('2026-02-19T00:00:00Z'),
446433
},
447434
])
448-
mockSelectReturning([])
449435

450-
await service.createSnapshotWithDeduplication(workflowId, mockState)
436+
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
451437

452-
expect(onConflictDoNothing).toHaveBeenCalledTimes(1)
453-
const config = getConflictConfig()
454-
expect(config?.target).toBeDefined()
455-
// The whole point of this change: no SET clause, so the large jsonb is never rewritten.
456-
expect(config).not.toHaveProperty('set')
438+
expect(result.snapshot.id).toBe('existing-snapshot-id')
439+
expect(result.isNew).toBe(false)
440+
expect(databaseMock.db.select).not.toHaveBeenCalled()
457441
})
458442

459-
it('reuses the existing snapshot via a follow-up select when the insert no-ops', async () => {
443+
it('SET targets only state_hash on conflict, never the large state_data', async () => {
460444
const service = new SnapshotService()
461445
const workflowId = 'wf-123'
462446

463-
mockInsertReturning([]) // conflict → insert returns nothing
464-
const select = mockSelectReturning([
447+
const { onConflictDoUpdate, getConflictConfig } = mockUpsertReturning([
465448
{
466-
id: 'existing-snapshot-id',
449+
id: 'generated-uuid-1',
467450
workflowId,
468451
stateHash: 'abc123',
469452
stateData: mockState,
470453
createdAt: new Date('2026-02-19T00:00:00Z'),
471454
},
472455
])
473456

474-
const result = await service.createSnapshotWithDeduplication(workflowId, mockState)
457+
await service.createSnapshotWithDeduplication(workflowId, mockState)
475458

476-
expect(result.snapshot.id).toBe('existing-snapshot-id')
477-
expect(result.isNew).toBe(false)
478-
expect(select).toHaveBeenCalledTimes(1)
459+
expect(onConflictDoUpdate).toHaveBeenCalledTimes(1)
460+
const config = getConflictConfig()
461+
expect(config?.target).toBeDefined()
462+
// The crux of this change: the SET touches state_hash only, so the unchanged
463+
// TOASTed state_data jsonb is never rewritten.
464+
expect(config?.set).toHaveProperty('stateHash')
465+
expect(config?.set).not.toHaveProperty('stateData')
479466
})
480467

481-
it('does not throw on concurrent inserts with the same hash (loser falls back to select)', async () => {
468+
it('does not throw on concurrent inserts with the same hash', async () => {
482469
const service = new SnapshotService()
483470
const workflowId = 'wf-123'
484471

@@ -491,39 +478,24 @@ describe('SnapshotService', () => {
491478
}
492479
const existingRow: SnapshotRow = { ...newRow, id: 'existing-snapshot-id' }
493480

494-
// First caller wins the insert; second caller's insert no-ops and selects.
495-
let insertCall = 0
481+
let upsertCall = 0
496482
databaseMock.db.insert = vi.fn().mockImplementation(() => ({
497483
values: vi.fn().mockReturnValue({
498-
onConflictDoNothing: vi.fn().mockReturnValue({
499-
returning: vi.fn().mockResolvedValue(insertCall++ === 0 ? [newRow] : []),
484+
onConflictDoUpdate: vi.fn().mockReturnValue({
485+
returning: vi.fn().mockResolvedValue(upsertCall++ === 0 ? [newRow] : [existingRow]),
500486
}),
501487
}),
502488
}))
503-
mockSelectReturning([existingRow])
504489

505490
const [result1, result2] = await Promise.all([
506491
service.createSnapshotWithDeduplication(workflowId, mockState),
507492
service.createSnapshotWithDeduplication(workflowId, mockState),
508493
])
509494

510-
const byId = [result1, result2].sort((a, b) => a.snapshot.id.localeCompare(b.snapshot.id))
511-
expect(byId[0].snapshot.id).toBe('existing-snapshot-id')
512-
expect(byId[0].isNew).toBe(false)
513-
expect(byId[1].snapshot.id).toBe('generated-uuid-1')
514-
expect(byId[1].isNew).toBe(true)
515-
})
516-
517-
it('throws a descriptive error when neither the insert nor the select yields a row', async () => {
518-
const service = new SnapshotService()
519-
const workflowId = 'wf-123'
520-
521-
mockInsertReturning([])
522-
mockSelectReturning([])
523-
524-
await expect(service.createSnapshotWithDeduplication(workflowId, mockState)).rejects.toThrow(
525-
/Failed to create or load execution snapshot/
526-
)
495+
expect(result1.snapshot.id).toBe('generated-uuid-1')
496+
expect(result1.isNew).toBe(true)
497+
expect(result2.snapshot.id).toBe('existing-snapshot-id')
498+
expect(result2.isNew).toBe(false)
527499
})
528500
})
529501

apps/sim/lib/logs/execution/snapshot/service.ts

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -38,51 +38,31 @@ export class SnapshotService implements ISnapshotService {
3838
}
3939

4040
/**
41-
* Insert the snapshot, or do nothing if a row already exists for this
42-
* (workflowId, stateHash). The hash is a sha256 of the normalized state, so
43-
* an existing row's stateData is byte-identical — there is nothing to update.
41+
* Insert the snapshot, or — when an identical (workflowId, stateHash) row
42+
* already exists — return it without rewriting the large stateData jsonb.
4443
*
45-
* The previous implementation used onConflictDoUpdate(set state_data), which
46-
* rewrote the full (tens-of-KB) state jsonb on every execution. Under Postgres
47-
* MVCC that churned a dead tuple + TOAST/WAL per run for no change.
48-
* onConflictDoNothing avoids the write entirely on the reuse path.
44+
* The hash is a sha256 of the normalized state, so an existing row's stateData
45+
* is byte-identical; there is nothing to update. The previous implementation
46+
* SET state_data on conflict, which rewrote the full (tens-of-KB) jsonb every
47+
* run. We keep a single atomic upsert — so RETURNING always yields the row and
48+
* there is no race with snapshot cleanup (unlike DO NOTHING + a follow-up
49+
* select) — but SET only the small state_hash column to itself. Under Postgres
50+
* MVCC the unchanged, TOASTed stateData is not rewritten: its existing
51+
* out-of-line storage is reused, so the per-execution write drops from the
52+
* full blob to a tiny heap tuple.
4953
*/
50-
const [insertedSnapshot] = await db
54+
const [upsertedSnapshot] = await db
5155
.insert(workflowExecutionSnapshots)
5256
.values(snapshotData)
53-
.onConflictDoNothing({
57+
.onConflictDoUpdate({
5458
target: [workflowExecutionSnapshots.workflowId, workflowExecutionSnapshots.stateHash],
59+
set: {
60+
stateHash: sql`excluded.state_hash`,
61+
},
5562
})
5663
.returning()
5764

58-
const isNew = Boolean(insertedSnapshot)
59-
60-
/**
61-
* On conflict the insert returns no row, so load the existing snapshot by its
62-
* unique (workflowId, stateHash). A freshly created snapshot cannot be removed
63-
* in this window — cleanupOrphanedSnapshots only targets rows older than its
64-
* cutoff — so this lookup is guaranteed to resolve.
65-
*/
66-
const snapshotRow =
67-
insertedSnapshot ??
68-
(
69-
await db
70-
.select()
71-
.from(workflowExecutionSnapshots)
72-
.where(
73-
and(
74-
eq(workflowExecutionSnapshots.workflowId, workflowId),
75-
eq(workflowExecutionSnapshots.stateHash, stateHash)
76-
)
77-
)
78-
.limit(1)
79-
)[0]
80-
81-
if (!snapshotRow) {
82-
throw new Error(
83-
`Failed to create or load execution snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
84-
)
85-
}
65+
const isNew = upsertedSnapshot.id === snapshotData.id
8666

8767
logger.info(
8868
isNew
@@ -92,9 +72,9 @@ export class SnapshotService implements ISnapshotService {
9272

9373
return {
9474
snapshot: {
95-
...snapshotRow,
96-
stateData: snapshotRow.stateData as WorkflowState,
97-
createdAt: snapshotRow.createdAt.toISOString(),
75+
...upsertedSnapshot,
76+
stateData: upsertedSnapshot.stateData as WorkflowState,
77+
createdAt: upsertedSnapshot.createdAt.toISOString(),
9878
},
9979
isNew,
10080
}

0 commit comments

Comments
 (0)