Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 58 additions & 15 deletions apps/sim/lib/copilot/chat/messages-dual-write.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ const assistantMsg: PersistedMessage = {
timestamp: '2026-01-01T00:00:01.000Z',
}

/** The first arg passed to the most recent `.values(...)` call. */
function lastValuesRows() {
const calls = dbChainMockFns.values.mock.calls
return calls[calls.length - 1][0] as Array<Record<string, unknown>>
}

describe('messages-dual-write', () => {
beforeEach(() => {
vi.clearAllMocks()
Expand All @@ -43,7 +49,7 @@ describe('messages-dual-write', () => {

expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)
expect(dbChainMockFns.values).toHaveBeenCalledTimes(1)
const rows = dbChainMockFns.values.mock.calls[0][0]
const rows = lastValuesRows()
expect(rows).toHaveLength(2)

expect(rows[0]).toMatchObject({
Expand All @@ -54,22 +60,34 @@ describe('messages-dual-write', () => {
model: null,
streamId: null,
})
expect(rows[0].createdAt).toEqual(new Date(userMsg.timestamp))
expect(rows[0].updatedAt).toEqual(new Date(userMsg.timestamp))
expect(rows[0].createdAt as Date).toEqual(new Date(userMsg.timestamp))
expect(rows[0].updatedAt as Date).toEqual(new Date(userMsg.timestamp))

expect(rows[1]).toMatchObject({
chatId: 'chat-1',
messageId: 'msg-asst-1',
role: 'assistant',
content: assistantMsg,
})
expect(rows[1].createdAt).toEqual(new Date(assistantMsg.timestamp))
expect(rows[1].createdAt as Date).toEqual(new Date(assistantMsg.timestamp))
})

it('preserves per-message ordering via timestamp', async () => {
it('assigns seq as 0-based array index when the chat has no prior rows', async () => {
dbChainMockFns.where.mockResolvedValueOnce([{ maxSeq: null }])

await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
const rows = dbChainMockFns.values.mock.calls[0][0]
expect(rows[0].createdAt.getTime()).toBeLessThan(rows[1].createdAt.getTime())
const rows = lastValuesRows()
expect(rows[0].seq).toBe(0)
expect(rows[1].seq).toBe(1)
})
Comment thread
waleedlatif1 marked this conversation as resolved.

it('continues seq from MAX(seq)+1 when the chat already has rows', async () => {
dbChainMockFns.where.mockResolvedValueOnce([{ maxSeq: 4 }])

await appendCopilotChatMessages('chat-1', [userMsg, assistantMsg])
const rows = lastValuesRows()
expect(rows[0].seq).toBe(5)
expect(rows[1].seq).toBe(6)
})

it('passes chatModel and streamId options to every row', async () => {
Expand All @@ -78,14 +96,14 @@ describe('messages-dual-write', () => {
streamId: 'stream-xyz',
})

const rows = dbChainMockFns.values.mock.calls[0][0]
const rows = lastValuesRows()
expect(rows[0].model).toBe('claude-sonnet-4-5')
expect(rows[0].streamId).toBe('stream-xyz')
expect(rows[1].model).toBe('claude-sonnet-4-5')
expect(rows[1].streamId).toBe('stream-xyz')
})

it('uses ON CONFLICT DO UPDATE with chat_id + message_id target', async () => {
it('uses ON CONFLICT DO UPDATE that PRESERVES existing seq', async () => {
await appendCopilotChatMessages('chat-1', [userMsg])

expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
Expand All @@ -96,6 +114,14 @@ describe('messages-dual-write', () => {
expect(conflictArg.set).toHaveProperty('model')
expect(conflictArg.set).toHaveProperty('streamId')
expect(conflictArg.set).toHaveProperty('updatedAt')
expect(conflictArg.set.seq.strings.join('')).toContain('COALESCE(')
})

it('collapses duplicate message ids to a single row', async () => {
await appendCopilotChatMessages('chat-1', [userMsg, { ...userMsg, content: 'dupe' }])
const rows = lastValuesRows()
expect(rows).toHaveLength(1)
expect(rows[0].messageId).toBe('msg-user-1')
})

it('swallows DB errors so the legacy JSONB write stays canonical', async () => {
Expand All @@ -120,25 +146,42 @@ describe('messages-dual-write', () => {
expect(dbChainMockFns.delete).toHaveBeenCalledTimes(1)
expect(dbChainMockFns.insert).toHaveBeenCalledTimes(1)

const rows = dbChainMockFns.values.mock.calls[0][0]
const rows = lastValuesRows()
expect(rows).toHaveLength(2)
expect(rows.map((r: { messageId: string }) => r.messageId)).toEqual([
'msg-user-1',
'msg-asst-1',
])
expect(rows.map((r) => r.messageId)).toEqual(['msg-user-1', 'msg-asst-1'])

expect(dbChainMockFns.onConflictDoUpdate).toHaveBeenCalledTimes(1)
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
expect(conflictArg.set).toHaveProperty('streamId')
expect(conflictArg.set).toHaveProperty('model')
})

it('assigns seq as the snapshot array index (0-based)', async () => {
await replaceCopilotChatMessages('chat-1', [userMsg, assistantMsg])
const rows = lastValuesRows()
expect(rows[0].seq).toBe(0)
expect(rows[1].seq).toBe(1)
})

it('OVERWRITES seq on conflict so positions re-densify after a delete', async () => {
await replaceCopilotChatMessages('chat-1', [userMsg])
const conflictArg = dbChainMockFns.onConflictDoUpdate.mock.calls[0][0]
expect(conflictArg.set.seq.strings.join('')).toBe('excluded.seq')
})

it('collapses duplicate message ids to a single row', async () => {
await replaceCopilotChatMessages('chat-1', [userMsg, { ...userMsg, content: 'dupe' }])
const rows = lastValuesRows()
expect(rows).toHaveLength(1)
expect(rows[0].seq).toBe(0)
})

it('passes chatModel to every row in the snapshot', async () => {
await replaceCopilotChatMessages('chat-1', [userMsg], {
chatModel: 'gpt-4o-mini',
})

const rows = dbChainMockFns.values.mock.calls[0][0]
const rows = lastValuesRows()
expect(rows[0].model).toBe('gpt-4o-mini')
})

Expand Down
50 changes: 42 additions & 8 deletions apps/sim/lib/copilot/chat/messages-dual-write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,26 @@ import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'

const logger = createLogger('CopilotMessagesDualWrite')

/**
* Keep the first occurrence of each message id. A single `INSERT ... ON
* CONFLICT` cannot touch the same conflict target twice, so a repeated id
* would otherwise throw.
*/
function dedupeById(messages: PersistedMessage[]): PersistedMessage[] {
const seen = new Set<string>()
const out: PersistedMessage[] = []
for (const m of messages) {
if (seen.has(m.id)) continue
seen.add(m.id)
out.push(m)
}
return out
}

function toRow(
chatId: string,
message: PersistedMessage,
seq: number,
options?: { chatModel?: string | null; streamId?: string | null }
): typeof copilotMessages.$inferInsert {
const ts = new Date(message.timestamp)
Expand All @@ -18,6 +35,7 @@ function toRow(
messageId: message.id,
role: message.role,
content: message,
seq,
model: options?.chatModel ?? null,
streamId: options?.streamId ?? null,
createdAt: ts,
Expand All @@ -27,8 +45,15 @@ function toRow(

/**
* Append messages to the new `copilot_messages` table. Best-effort — errors
* are logged but never thrown, since the legacy `copilot_chats.messages`
* JSONB column remains the source of truth during the dual-write rollout.
* are logged but never thrown; the legacy `copilot_chats.messages` JSONB
* column stays the source of truth during the dual-write rollout.
*
* `seq` is `MAX(seq) + index`, computed in JS (not in SQL, where every row of
* a multi-row INSERT would read the same pre-insert MAX and collide). The
* read-then-insert is non-atomic, so interleaved appends to one chat can tie
* `seq`; that window is bounded by the cutover read order (`seq, created_at,
* id`) and `replaceCopilotChatMessages`, which re-densifies `seq` from the
* authoritative JSONB order on the next snapshot save.
*/
export async function appendCopilotChatMessages(
chatId: string,
Expand All @@ -37,16 +62,23 @@ export async function appendCopilotChatMessages(
): Promise<void> {
if (messages.length === 0) return
try {
const deduped = dedupeById(messages)
const [maxRow] = await db
.select({ maxSeq: sql<number | null>`max(${copilotMessages.seq})` })
.from(copilotMessages)
.where(eq(copilotMessages.chatId, chatId))
const base = (maxRow?.maxSeq ?? -1) + 1
await db
.insert(copilotMessages)
.values(messages.map((m) => toRow(chatId, m, options)))
.values(deduped.map((m, i) => toRow(chatId, m, base + i, options)))
Comment thread
waleedlatif1 marked this conversation as resolved.
.onConflictDoUpdate({
target: [copilotMessages.chatId, copilotMessages.messageId],
set: {
content: sql`excluded.content`,
role: sql`excluded.role`,
model: sql`COALESCE(excluded.model, ${copilotMessages.model})`,
streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`,
seq: sql`COALESCE(${copilotMessages.seq}, excluded.seq)`,
updatedAt: sql`now()`,
},
})
Expand All @@ -69,7 +101,8 @@ export async function replaceCopilotChatMessages(
options?: { chatModel?: string | null }
): Promise<void> {
try {
const newMessageIds = messages.map((m) => m.id)
const deduped = dedupeById(messages)
const newMessageIds = deduped.map((m) => m.id)
await db.transaction(async (tx) => {
// Drop rows for messages not in the new snapshot.
await tx
Expand All @@ -82,19 +115,20 @@ export async function replaceCopilotChatMessages(
)
: eq(copilotMessages.chatId, chatId)
)
if (messages.length === 0) return
// Upsert remaining rows. ON CONFLICT preserves existing stream_id / model
// so a snapshot save doesn't clobber metadata set during streaming.
if (deduped.length === 0) return
// Snapshot is authoritative on order, so seq = array index is overwritten
// on conflict; stream_id / model are preserved via COALESCE.
await tx
.insert(copilotMessages)
.values(messages.map((m) => toRow(chatId, m, options)))
.values(deduped.map((m, i) => toRow(chatId, m, i, options)))
.onConflictDoUpdate({
target: [copilotMessages.chatId, copilotMessages.messageId],
set: {
content: sql`excluded.content`,
role: sql`excluded.role`,
model: sql`COALESCE(excluded.model, ${copilotMessages.model})`,
streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`,
seq: sql`excluded.seq`,
updatedAt: sql`now()`,
},
})
Expand Down
19 changes: 19 additions & 0 deletions packages/db/migrations/0219_amused_leo.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
ALTER TABLE "copilot_messages" ADD COLUMN "seq" integer;--> statement-breakpoint
WITH ordered AS (
SELECT c."id" AS chat_id, elem.value->>'id' AS message_id, elem.ord AS ord
FROM "copilot_chats" c
CROSS JOIN LATERAL jsonb_array_elements(c."messages") WITH ORDINALITY AS elem(value, ord)
WHERE jsonb_typeof(c."messages") = 'array' AND jsonb_array_length(c."messages") > 0
),
first_occurrence AS (
SELECT chat_id, message_id, MIN(ord) AS first_ord FROM ordered GROUP BY chat_id, message_id
),
ranked AS (
SELECT chat_id, message_id,
(ROW_NUMBER() OVER (PARTITION BY chat_id ORDER BY first_ord) - 1) AS seq
FROM first_occurrence
)
UPDATE "copilot_messages" m SET "seq" = r.seq
FROM ranked r
WHERE m."chat_id" = r.chat_id AND m."message_id" = r.message_id;--> statement-breakpoint
CREATE INDEX "copilot_messages_chat_seq_idx" ON "copilot_messages" USING btree ("chat_id","seq") WHERE "copilot_messages"."deleted_at" IS NULL;
Loading
Loading