diff --git a/.server-changes/runs-child-status-tooltip.md b/.server-changes/runs-child-status-tooltip.md new file mode 100644 index 00000000000..eb9644cecb7 --- /dev/null +++ b/.server-changes/runs-child-status-tooltip.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Root run status cells on the runs table show a tooltip with a breakdown of child run statuses, aggregated in ClickHouse (roots resolved in Postgres by friendly ID). diff --git a/.server-changes/runs-list-live-reload.md b/.server-changes/runs-list-live-reload.md new file mode 100644 index 00000000000..fdad689b51e --- /dev/null +++ b/.server-changes/runs-list-live-reload.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +The runs index live-reloads visible run statuses and shows a "new runs created" refresh banner. Polling pauses while the browser tab is hidden. diff --git a/apps/webapp/app/components/primitives/Buttons.tsx b/apps/webapp/app/components/primitives/Buttons.tsx index 600ff9da325..0f9f4e9a4d7 100644 --- a/apps/webapp/app/components/primitives/Buttons.tsx +++ b/apps/webapp/app/components/primitives/Buttons.tsx @@ -318,12 +318,12 @@ export function ButtonContent(props: ButtonContentPropsType) { type ButtonPropsType = Pick< JSX.IntrinsicElements["button"], - "type" | "disabled" | "onClick" | "name" | "value" | "form" | "autoFocus" + "type" | "disabled" | "onClick" | "name" | "value" | "form" | "autoFocus" | "aria-label" > & React.ComponentProps; export const Button = forwardRef( - ({ type, disabled, autoFocus, onClick, ...props }, ref) => { + ({ type, disabled, autoFocus, onClick, "aria-label": ariaLabel, ...props }, ref) => { const innerRef = useRef(null); useImperativeHandle(ref, () => innerRef.current as HTMLButtonElement); @@ -352,6 +352,7 @@ export const Button = forwardRef( ref={innerRef} form={props.form} autoFocus={autoFocus} + aria-label={ariaLabel} > + + + + ); +} diff --git a/apps/webapp/app/components/primitives/Tooltip.tsx b/apps/webapp/app/components/primitives/Tooltip.tsx index c03492abcfd..92f95babce0 100644 --- a/apps/webapp/app/components/primitives/Tooltip.tsx +++ b/apps/webapp/app/components/primitives/Tooltip.tsx @@ -66,6 +66,7 @@ function SimpleTooltip({ sideOffset, open, onOpenChange, + delayDuration, }: { button: React.ReactNode; content: React.ReactNode; @@ -80,12 +81,13 @@ function SimpleTooltip({ sideOffset?: number; open?: boolean; onOpenChange?: (open: boolean) => void; + delayDuration?: number; }) { return ( - + a.status.localeCompare(b.status)) + .map((entry) => `${entry.status}:${entry.count}`) + .join("|"); +} + +function areChildStatusesEqual(previous: ChildStatusEntry[] | undefined, next: ChildStatusEntry[]) { + if (previous === undefined) return false; + return childStatusesKey(previous) === childStatusesKey(next); +} + +function hasActiveChildStatuses(statuses: ChildStatusEntry[] | undefined) { + if (statuses === undefined) return false; + + return statuses.some((entry) => entry.count > 0 && !isFinalRunStatus(entry.status)); +} + +function shouldPollWhileTooltipOpen( + statuses: ChildStatusEntry[] | undefined, + rootHasFinished: boolean +) { + if (statuses === undefined) return true; + // Empty child statuses while the root is still running can mean + // children have not been created yet, so keep polling. + if (statuses.length === 0) return !rootHasFinished; + + // All current children may be final while the root is still running — more + // dependents can still be created. + return hasActiveChildStatuses(statuses) || !rootHasFinished; +} + +function ChildStatusBreakdown({ + orderedChildStatuses, +}: { + orderedChildStatuses: { status: NextRunListItem["status"]; count: number }[]; +}) { + return ( +
+

Child run statuses

+ + {orderedChildStatuses.map((entry) => ( + + + + {entry.count} + + + ))} + +
+ ); +} + +function useChildRunStatusesTooltip({ + friendlyId, + hasFinished, + childrenStatusesBasePath, +}: { + friendlyId: string; + hasFinished: boolean; + childrenStatusesBasePath: string; +}) { + const fetcher = useFetcher({ + key: `child-statuses-${friendlyId}`, + }); + const fetcherStateRef = useRef(fetcher.state); + fetcherStateRef.current = fetcher.state; + + const [childStatuses, setChildStatuses] = useState(); + const isOpenRef = useRef(false); + const pollIntervalRef = useRef>(); + const prevHasFinishedRef = useRef(hasFinished); + + const childrenStatusesUrl = useMemo( + () => `${childrenStatusesBasePath}/children-statuses?runIds=${encodeURIComponent(friendlyId)}`, + [childrenStatusesBasePath, friendlyId] + ); + + const loadChildStatuses = useCallback(() => { + if (fetcherStateRef.current !== "idle") return; + fetcher.load(childrenStatusesUrl); + }, [childrenStatusesUrl, fetcher]); + + // Keep the latest loader callback available to the polling interval + // without recreating the interval on every render. + const loadChildStatusesRef = useRef(loadChildStatuses); + loadChildStatusesRef.current = loadChildStatuses; + + const stopPolling = useCallback(() => { + if (pollIntervalRef.current) { + clearInterval(pollIntervalRef.current); + pollIntervalRef.current = undefined; + } + }, []); + + const startPolling = useCallback(() => { + if (pollIntervalRef.current) return; + + pollIntervalRef.current = setInterval(() => { + if (document.visibilityState !== "visible") return; + loadChildStatusesRef.current(); + }, TOOLTIP_POLL_INTERVAL_MS); + }, []); + + useEffect(() => { + if (!fetcher.data?.runs) return; + + const entry = fetcher.data.runs.find((run) => run.friendlyId === friendlyId); + if (!entry) return; + + setChildStatuses((previous) => + areChildStatusesEqual(previous, entry.statuses) ? previous : entry.statuses + ); + + if (isOpenRef.current && !shouldPollWhileTooltipOpen(entry.statuses, hasFinished)) { + stopPolling(); + } + }, [fetcher.data, friendlyId, hasFinished, stopPolling]); + + const onOpenChange = useCallback( + (open: boolean) => { + isOpenRef.current = open; + if (open) { + loadChildStatuses(); + startPolling(); + } else { + stopPolling(); + } + }, + [loadChildStatuses, startPolling, stopPolling] + ); + + useEffect(() => { + prevHasFinishedRef.current = hasFinished; + stopPolling(); + setChildStatuses(undefined); + if (isOpenRef.current) { + loadChildStatuses(); + startPolling(); + } + // Only reset when the hovered run changes, not when hasFinished toggles. + // eslint-disable-next-line react-hooks/exhaustive-deps -- friendlyId + }, [friendlyId]); + + useEffect(() => { + if (!isOpenRef.current) return; + if (prevHasFinishedRef.current === hasFinished) return; + + prevHasFinishedRef.current = hasFinished; + loadChildStatuses(); + }, [hasFinished, loadChildStatuses]); + + useEffect(() => () => stopPolling(), [stopPolling]); + + return { + childStatuses, + onOpenChange, + }; +} + +export function RunStatusCellTooltip({ + friendlyId, + status, + hasFinished, + childrenStatusesBasePath, +}: { + friendlyId: string; + status: NextRunListItem["status"]; + hasFinished: boolean; + childrenStatusesBasePath: string; +}) { + const { childStatuses, onOpenChange } = useChildRunStatusesTooltip({ + friendlyId, + hasFinished, + childrenStatusesBasePath, + }); + + const orderedChildStatuses = useMemo(() => { + const childStatusesMap = new Map( + (childStatuses ?? []).map((entry) => [entry.status, entry.count]) + ); + + return allTaskRunStatuses + .map((s) => ({ + status: s, + count: childStatusesMap.get(s) ?? 0, + })) + .filter((entry) => entry.count > 0); + }, [childStatuses]); + + const hasChildStatuses = orderedChildStatuses.length > 0; + + return ( + + ) : ( + descriptionForTaskRunStatus(status) + ) + } + disableHoverableContent + button={ + + + + } + /> + ); +} diff --git a/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx b/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx index 5e645dab877..9c670864900 100644 --- a/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx +++ b/apps/webapp/app/components/runs/v3/TaskRunsTable.tsx @@ -57,6 +57,7 @@ import { filterableTaskRunStatuses, TaskRunStatusCombo, } from "./TaskRunStatus"; +import { RunStatusCellTooltip } from "./RunStatusCellTooltip"; import { TaskTriggerSourceIcon } from "./TaskTriggerSource"; import { useOptimisticLocation } from "~/hooks/useOptimisticLocation"; import { useSearchParams } from "~/hooks/useSearchParam"; @@ -74,6 +75,7 @@ type RunsTableProps = { variant?: TableVariant; disableAdjacentRows?: boolean; additionalTableState?: Record; + childrenStatusesBasePath?: string; }; export function TaskRunsTable({ @@ -87,6 +89,7 @@ export function TaskRunsTable({ allowSelection = false, variant = "dimmed", additionalTableState, + childrenStatusesBasePath, }: RunsTableProps) { const regions = useRegions(); const regionByMasterQueue = new Map(regions.map((r) => [r.masterQueue, r] as const)); @@ -371,11 +374,20 @@ export function TaskRunsTable({ {run.version ?? "–"} - } - /> + {run.rootTaskRunId === null && childrenStatusesBasePath ? ( + + ) : ( + } + /> + )} {run.startedAt ? : "–"} diff --git a/apps/webapp/app/hooks/useInterval.ts b/apps/webapp/app/hooks/useInterval.ts index 4d5413e977e..b59884d455a 100644 --- a/apps/webapp/app/hooks/useInterval.ts +++ b/apps/webapp/app/hooks/useInterval.ts @@ -6,6 +6,8 @@ type UseIntervalOptions = { onLoad?: boolean; onFocus?: boolean; disabled?: boolean; + /** Skip interval ticks while the document tab is hidden */ + pauseWhenHidden?: boolean; callback: () => void; }; @@ -14,6 +16,7 @@ export function useInterval({ onLoad = true, onFocus = true, disabled = false, + pauseWhenHidden = false, callback, }: UseIntervalOptions) { // Always keep the latest callback in a ref so the effects below @@ -28,11 +31,14 @@ export function useInterval({ if (!interval || interval <= 0 || disabled) return; const intervalId = setInterval(() => { + if (pauseWhenHidden && document.visibilityState !== "visible") { + return; + } latestCallback.current(); }, interval); return () => clearInterval(intervalId); - }, [interval, disabled]); + }, [interval, disabled, pauseWhenHidden]); // On focus useEffect(() => { diff --git a/apps/webapp/app/presenters/v3/mapRunToLiveFields.server.ts b/apps/webapp/app/presenters/v3/mapRunToLiveFields.server.ts new file mode 100644 index 00000000000..abbf1db7fd3 --- /dev/null +++ b/apps/webapp/app/presenters/v3/mapRunToLiveFields.server.ts @@ -0,0 +1,21 @@ +import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus"; +import type { ListedRun } from "~/services/runsRepository/runsRepository.server"; + +export function mapRunToLiveFields(run: ListedRun) { + const hasFinished = isFinalRunStatus(run.status); + const startedAt = run.startedAt ?? run.lockedAt; + + return { + friendlyId: run.friendlyId, + status: run.status, + updatedAt: run.updatedAt.toISOString(), + startedAt: startedAt?.toISOString(), + finishedAt: hasFinished ? run.completedAt?.toISOString() ?? run.updatedAt.toISOString() : undefined, + hasFinished, + isCancellable: isCancellableRunStatus(run.status), + isPending: isPendingRunStatus(run.status), + usageDurationMs: Number(run.usageDurationMs), + costInCents: run.costInCents, + baseCostInCents: run.baseCostInCents, + }; +} diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx index cc41f738a29..3dff2bcd335 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/route.tsx @@ -1,5 +1,5 @@ import { BeakerIcon, BookOpenIcon } from "@heroicons/react/24/solid"; -import { type MetaFunction, useNavigation } from "@remix-run/react"; +import { type MetaFunction, useNavigation, useRevalidator } from "@remix-run/react"; import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; import { Suspense } from "react"; import { @@ -14,11 +14,12 @@ import { DevDisconnectedBanner, useDevPresence } from "~/components/DevPresence" import { StepContentContainer } from "~/components/StepContentContainer"; import { MainCenteredContainer, PageBody } from "~/components/layout/AppLayout"; import { Badge } from "~/components/primitives/Badge"; -import { LinkButton } from "~/components/primitives/Buttons"; +import { Button, LinkButton } from "~/components/primitives/Buttons"; import { Header1 } from "~/components/primitives/Headers"; import { InfoPanel } from "~/components/primitives/InfoPanel"; import { NavBar, PageAccessories, PageTitle } from "~/components/primitives/PageHeader"; import { Paragraph } from "~/components/primitives/Paragraph"; +import { PulsingDot } from "~/components/primitives/PulsingDot"; import { RESIZABLE_PANEL_ANIMATION, ResizableHandle, @@ -64,6 +65,7 @@ import { throwNotFound } from "~/utils/httpErrors"; import { ListPagination } from "../../components/ListPagination"; import { CreateBulkActionInspector } from "../resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction"; import { Callout } from "~/components/primitives/Callout"; +import { useRunsLiveReload } from "./useRunsLiveReload"; export const meta: MetaFunction = () => { return [ @@ -89,7 +91,10 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { const filters = await getRunFiltersFromRequest(request); - const clickhouse = await clickhouseFactory.getClickhouseForOrganization(project.organizationId, "standard"); + const clickhouse = await clickhouseFactory.getClickhouseForOrganization( + project.organizationId, + "standard" + ); const presenter = new NextRunListPresenter($replica, clickhouse); const list = presenter.call(project.organizationId, environment.id, { userId, @@ -203,12 +208,36 @@ function RunsList({ rootOnlyDefault: boolean; filters: TaskRunListSearchFilters; }) { + const revalidator = useRevalidator(); const navigation = useNavigation(); const isLoading = navigation.state !== "idle"; const organization = useOrganization(); const project = useProject(); const environment = useEnvironment(); const { has, replace } = useSearchParams(); + const { visibleRuns, showNewRunsBanner, newRunsCount, dismissNewRuns, childrenStatusesBasePath } = + useRunsLiveReload({ + runs: list.runs, + hasAnyRuns: list.hasAnyRuns, + isLoading, + organizationSlug: organization.slug, + projectSlug: project.slug, + environmentSlug: environment.slug, + }); + + const onClickShowNewRuns = () => { + const isPaginated = has("cursor") || has("direction"); + dismissNewRuns(); + if (isPaginated) { + replace({ + cursor: undefined, + direction: undefined, + }); + return; + } + + revalidator.revalidate(); + }; // Shortcut keys for bulk actions useShortcutKeys({ @@ -265,6 +294,22 @@ function RunsList({ rootOnlyDefault={rootOnlyDefault} />
+ {showNewRunsBanner && ( + + + + )} {!isShowingBulkActionInspector && ( >["runs"][number]; +type LivePollFetcherData = Awaited> | undefined; + +function hasNewRunsCountFields( + data: LivePollFetcherData +): data is NonNullable & { count: number; since: number } { + return data !== undefined && "count" in data && "since" in data; +} + +function maxCreatedAtMs(runs: ListedRun[]): number | undefined { + if (runs.length === 0) return undefined; + + return runs.reduce((maxTimestamp, run) => { + const runTimestamp = new Date(run.createdAt).getTime(); + return Math.max(maxTimestamp, runTimestamp); + }, 0); +} + +function filterParamsWithoutPagination(search: string) { + const params = new URLSearchParams(search); + for (const key of RUNS_SEARCH_PARAMS_TO_REMOVE) { + params.delete(key); + } + return params; +} + +function getRunsSearchKeyWithoutPagination(search: string) { + return filterParamsWithoutPagination(search).toString(); +} + +function isNewRunsCheckTick(tick: number) { + return tick === 1 || tick % NEW_RUNS_EVERY_N_POLL_TICKS === 0; +} + +function appendNewRunsSearchParams( + searchParams: URLSearchParams, + { locationSearch, since }: { locationSearch: string; since: number } +) { + const filterParams = filterParamsWithoutPagination(locationSearch); + for (const [key, value] of filterParams) { + searchParams.append(key, value); + } + searchParams.set("includeNewRuns", "true"); + searchParams.set("since", String(since)); +} + +function patchVisibleRunsWithLiveUpdates(currentRuns: ListedRun[], liveRuns: LiveRunFields[]) { + const updatesById = new Map(liveRuns.map((run) => [run.friendlyId, run])); + + return currentRuns.map((run) => { + const update = updatesById.get(run.friendlyId); + if (!update) return run; + + return { + ...run, + status: update.status, + updatedAt: update.updatedAt, + startedAt: update.startedAt, + finishedAt: update.finishedAt, + hasFinished: update.hasFinished, + isCancellable: update.isCancellable, + isPending: update.isPending, + usageDurationMs: update.usageDurationMs, + costInCents: update.costInCents, + baseCostInCents: update.baseCostInCents, + }; + }); +} + +function useNewRunsDetection({ + runs, + hasAnyRuns, + isLoading, +}: { + runs: ListedRun[]; + hasAnyRuns: boolean; + isLoading: boolean; +}) { + const pollTickRef = useRef(0); + const [knownNewestRunMs, setKnownNewestRunMs] = useState(() => maxCreatedAtMs(runs) ?? Date.now()); + const [newRunsCount, setNewRunsCount] = useState(0); + + const shouldPollForNewRuns = hasAnyRuns && !isLoading && newRunsCount < 100; + + // Re-baseline the cutoff and clear banner/throttle state. The parent calls + // this from its single "list context changed" reset path. + const resetNewRunsTracking = useCallback(() => { + setKnownNewestRunMs(maxCreatedAtMs(runs) ?? Date.now()); + setNewRunsCount(0); + pollTickRef.current = 0; + }, [runs]); + + const dismissNewRuns = useCallback(() => { + setNewRunsCount(0); + setKnownNewestRunMs(Date.now()); + pollTickRef.current = 0; + }, []); + + const checkNewRunsOnTick = useCallback(() => { + pollTickRef.current += 1; + return shouldPollForNewRuns && isNewRunsCheckTick(pollTickRef.current); + }, [shouldPollForNewRuns]); + + const showNewRunsBanner = newRunsCount > 0; + + return { + knownNewestRunMs, + newRunsCount, + setNewRunsCount, + shouldPollForNewRuns, + showNewRunsBanner, + dismissNewRuns, + checkNewRunsOnTick, + resetNewRunsTracking, + }; +} + +export function useRunsLiveReload({ + runs, + hasAnyRuns, + isLoading, + organizationSlug, + projectSlug, + environmentSlug, +}: { + runs: ListedRun[]; + hasAnyRuns: boolean; + isLoading: boolean; + organizationSlug: string; + projectSlug: string; + environmentSlug: string; +}) { + const location = useLocation(); + const runsPollFetcher = useTypedFetcher(); + const runsPollFetcherStateRef = useRef(runsPollFetcher.state); + runsPollFetcherStateRef.current = runsPollFetcher.state; + + const [visibleRuns, setVisibleRuns] = useState(runs); + + const searchKeyWithoutPagination = useMemo( + () => getRunsSearchKeyWithoutPagination(location.search), + [location.search] + ); + + const { + knownNewestRunMs, + newRunsCount, + setNewRunsCount, + shouldPollForNewRuns, + showNewRunsBanner, + dismissNewRuns, + checkNewRunsOnTick, + resetNewRunsTracking, + } = useNewRunsDetection({ + runs, + hasAnyRuns, + isLoading, + }); + + // Single reset path: new loader data or changed filters re-baseline both the + // visible rows and new-run tracking. + useEffect(() => { + setVisibleRuns(runs); + resetNewRunsTracking(); + }, [runs, searchKeyWithoutPagination, resetNewRunsTracking]); + + // Patch visible rows from the live response. Keyed to the response alone so a + // loader refresh never re-applies a stale poll payload over fresh rows. + useEffect(() => { + const data = runsPollFetcher.data; + if (!data?.runs.length) return; + + setVisibleRuns((currentRuns) => patchVisibleRunsWithLiveUpdates(currentRuns, data.runs)); + }, [runsPollFetcher.data]); + + // Update new-runs count from the poll response. Re-evaluates when the cutoff + // changes, even if the response object itself is unchanged. + useEffect(() => { + const data = runsPollFetcher.data; + if (!hasNewRunsCountFields(data)) return; + + if (data.since === knownNewestRunMs) { + setNewRunsCount(data.count); + } + }, [runsPollFetcher.data, knownNewestRunMs, setNewRunsCount]); + + const activeRunIdsParam = useMemo( + () => + visibleRuns + .filter((run) => !run.hasFinished) + .map((run) => run.friendlyId) + .join(","), + [visibleRuns] + ); + const hasActiveRuns = activeRunIdsParam.length > 0; + + const runsResourcesBasePath = useMemo( + () => + `/resources/orgs/${organizationSlug}/projects/${projectSlug}/env/${environmentSlug}/runs`, + [organizationSlug, projectSlug, environmentSlug] + ); + + const loadRunsPoll = useCallback( + (checkForNewRuns: boolean) => { + if (runsPollFetcherStateRef.current !== "idle") return; + + if (!hasActiveRuns && !checkForNewRuns) return; + + const searchParams = new URLSearchParams(); + if (hasActiveRuns) { + searchParams.set("runIds", activeRunIdsParam); + } + + if (checkForNewRuns) { + appendNewRunsSearchParams(searchParams, { + locationSearch: location.search, + since: knownNewestRunMs, + }); + } + + runsPollFetcher.load(`${runsResourcesBasePath}/live?${searchParams.toString()}`); + }, + [ + activeRunIdsParam, + hasActiveRuns, + location.search, + knownNewestRunMs, + runsPollFetcher, + runsResourcesBasePath, + ] + ); + + const shouldPoll = !isLoading && (hasActiveRuns || shouldPollForNewRuns); + + useInterval({ + interval: RUNS_POLL_INTERVAL_MS, + onLoad: true, + pauseWhenHidden: true, + disabled: !shouldPoll, + callback: () => { + loadRunsPoll(checkNewRunsOnTick()); + }, + }); + + return { + visibleRuns, + showNewRunsBanner, + newRunsCount, + dismissNewRuns, + childrenStatusesBasePath: runsResourcesBasePath, + }; +} diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.children-statuses.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.children-statuses.ts new file mode 100644 index 00000000000..896dd25dd79 --- /dev/null +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.children-statuses.ts @@ -0,0 +1,110 @@ +import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { type TaskRunStatus } from "@trigger.dev/database"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; +import { loadProjectEnvironmentFromRequest } from "~/services/loadProjectEnvironmentFromRequest.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; +import { runIdsQueryParam } from "~/utils/searchParams"; + +const SearchParamsSchema = z.object({ + runIds: runIdsQueryParam, +}); + +const ROOT_CREATED_AT_SAFETY_MARGIN_MS = 5 * 60 * 1000; + +type RootRun = { id: string; friendlyId: string; createdAt: Date }; +type ChildStatusEntry = { status: TaskRunStatus; count: number }; +type GroupedChildStatus = { + rootRunId: string; + status: TaskRunStatus; + count: number; +}; + +function mapGroupedStatusesToFriendlyIds( + grouped: GroupedChildStatus[], + roots: RootRun[] +): Map { + const rootFriendlyIdById = new Map(roots.map((run) => [run.id, run.friendlyId])); + const statusesByFriendlyId = new Map(); + + for (const item of grouped) { + const friendlyId = rootFriendlyIdById.get(item.rootRunId); + if (!friendlyId) continue; + + const existing = statusesByFriendlyId.get(friendlyId) ?? []; + existing.push({ + status: item.status, + count: item.count, + }); + statusesByFriendlyId.set(friendlyId, existing); + } + + return statusesByFriendlyId; +} + +function childrenStatusesResponseForRunIds( + runIds: string[], + statusesByFriendlyId: Map +) { + return { + runs: runIds.map((friendlyId) => ({ + friendlyId, + statuses: (statusesByFriendlyId.get(friendlyId) ?? []).filter((entry) => entry.count > 0), + })), + }; +} + +export async function loader({ request, params }: LoaderFunctionArgs) { + const url = new URL(request.url); + const { runIds } = SearchParamsSchema.parse(Object.fromEntries(url.searchParams)); + + if (runIds.length === 0) { + return { runs: [] }; + } + + const { project, environment } = await loadProjectEnvironmentFromRequest(request, params); + + const clickhouse = await clickhouseFactory.getClickhouseForOrganization( + project.organizationId, + "standard" + ); + const runsRepository = new RunsRepository({ clickhouse, prisma: $replica }); + + const { runs: roots } = await runsRepository.listRuns({ + organizationId: project.organizationId, + projectId: project.id, + environmentId: environment.id, + runId: runIds, + page: { size: 100 }, + }); + + if (roots.length === 0) { + return { runs: [] }; + } + + const earliestRootCreatedAtMs = Math.min(...roots.map((run) => run.createdAt.getTime())); + const sinceMs = Math.max(0, earliestRootCreatedAtMs - ROOT_CREATED_AT_SAFETY_MARGIN_MS); + + const [queryError, groupedRows] = await clickhouse.taskRuns.getChildRunStatusCounts({ + organizationId: project.organizationId, + projectId: project.id, + environmentId: environment.id, + rootRunIds: roots.map((run) => run.id), + since: sinceMs, + }); + + if (queryError) { + throw queryError; + } + + const grouped: GroupedChildStatus[] = groupedRows.map((row) => ({ + rootRunId: row.root_run_id, + status: row.status as TaskRunStatus, + count: row.count, + })); + + const statusesByFriendlyId = mapGroupedStatusesToFriendlyIds(grouped, roots); + + return childrenStatusesResponseForRunIds(runIds, statusesByFriendlyId); +} diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.live.ts b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.live.ts new file mode 100644 index 00000000000..e9993b722c7 --- /dev/null +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.live.ts @@ -0,0 +1,93 @@ +import { type LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { mapRunToLiveFields } from "~/presenters/v3/mapRunToLiveFields.server"; +import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server"; +import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; +import { loadProjectEnvironmentFromRequest } from "~/services/loadProjectEnvironmentFromRequest.server"; +import { RunsRepository } from "~/services/runsRepository/runsRepository.server"; +import { runIdsQueryParam } from "~/utils/searchParams"; + +const SearchParamsSchema = z.object({ + runIds: runIdsQueryParam, + includeNewRuns: z + .string() + .optional() + .transform((value) => value === "true"), + since: z.coerce.number().optional(), +}); + +export async function loader({ request, params }: LoaderFunctionArgs) { + const url = new URL(request.url); + const { runIds, includeNewRuns, since } = SearchParamsSchema.parse( + Object.fromEntries(url.searchParams) + ); + + const newRunsSince = + includeNewRuns && since !== undefined ? since : undefined; + + if (runIds.length === 0 && newRunsSince === undefined) { + return { runs: [] }; + } + + const { project, environment } = await loadProjectEnvironmentFromRequest(request, params); + + const clickhouse = await clickhouseFactory.getClickhouseForOrganization( + project.organizationId, + "standard" + ); + const runsRepository = new RunsRepository({ clickhouse, prisma: $replica }); + + const [runs, newRunsResult] = await Promise.all([ + runIds.length > 0 + ? runsRepository + .listRuns({ + organizationId: project.organizationId, + projectId: project.id, + environmentId: environment.id, + runId: runIds, + page: { size: 100 }, + }) + .then(({ runs: listedRuns }) => listedRuns.map(mapRunToLiveFields)) + : Promise.resolve([]), + newRunsSince !== undefined + ? (async () => { + const filters = await getRunFiltersFromRequest(request); + + if (filters.to !== undefined && filters.to <= newRunsSince) { + return { count: 0, since: newRunsSince }; + } + + const newRunIds = await runsRepository.listRunIds({ + organizationId: project.organizationId, + projectId: project.id, + environmentId: environment.id, + tasks: filters.tasks, + versions: filters.versions, + statuses: filters.statuses, + tags: filters.tags, + scheduleId: filters.scheduleId, + period: filters.period, + from: Math.max(filters.from ?? 0, newRunsSince + 1), + to: filters.to, + rootOnly: filters.rootOnly, + batchId: filters.batchId, + runId: filters.runId, + bulkId: filters.bulkId, + queues: filters.queues, + machines: filters.machines, + errorId: filters.errorId, + page: { size: 100 }, + }); + + return { count: newRunIds.length, since: newRunsSince }; + })() + : Promise.resolve(undefined), + ]); + + if (newRunsResult) { + return { runs, ...newRunsResult }; + } + + return { runs }; +} diff --git a/apps/webapp/app/services/loadProjectEnvironmentFromRequest.server.ts b/apps/webapp/app/services/loadProjectEnvironmentFromRequest.server.ts new file mode 100644 index 00000000000..76c39caba67 --- /dev/null +++ b/apps/webapp/app/services/loadProjectEnvironmentFromRequest.server.ts @@ -0,0 +1,25 @@ +import { type Params } from "@remix-run/react"; +import { findProjectBySlug } from "~/models/project.server"; +import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { requireUserId } from "~/services/session.server"; +import { EnvironmentParamSchema } from "~/utils/pathBuilder"; + +export async function loadProjectEnvironmentFromRequest( + request: Request, + params: Params +) { + const userId = await requireUserId(request); + const { organizationSlug, projectParam, envParam } = EnvironmentParamSchema.parse(params); + + const project = await findProjectBySlug(organizationSlug, projectParam, userId); + if (!project) { + throw new Response(undefined, { status: 404, statusText: "Project not found" }); + } + + const environment = await findEnvironmentBySlug(project.id, envParam, userId); + if (!environment) { + throw new Response(undefined, { status: 404, statusText: "Environment not found" }); + } + + return { userId, project, environment }; +} diff --git a/apps/webapp/app/utils/searchParams.ts b/apps/webapp/app/utils/searchParams.ts index 4e3b0682b06..79220c7801e 100644 --- a/apps/webapp/app/utils/searchParams.ts +++ b/apps/webapp/app/utils/searchParams.ts @@ -1,6 +1,23 @@ -import { ZodType } from "zod"; +import { z, ZodType } from "zod"; import { fromZodError } from "zod-validation-error"; +/** + * Parses a comma-separated `runIds` query param into a trimmed, de-duplicated + * list of run friendly IDs, capped at 100. Shared by the runs `/live` and + * `/children-statuses` resource routes. + */ +export const runIdsQueryParam = z + .string() + .optional() + .transform((value) => { + const ids = + value + ?.split(",") + .map((id) => id.trim()) + .filter(Boolean) ?? []; + return [...new Set(ids)].slice(0, 100); + }); + export function objectToSearchParams( obj: | undefined diff --git a/apps/webapp/test/presenters/mapRunToLiveFields.test.ts b/apps/webapp/test/presenters/mapRunToLiveFields.test.ts new file mode 100644 index 00000000000..954b502c18e --- /dev/null +++ b/apps/webapp/test/presenters/mapRunToLiveFields.test.ts @@ -0,0 +1,79 @@ +import { describe, expect, it } from "vitest"; +import { mapRunToLiveFields } from "~/presenters/v3/mapRunToLiveFields.server"; + +describe("mapRunToLiveFields", () => { + it("maps an executing run with lockedAt fallback and non-final flags", () => { + const updatedAt = new Date("2026-05-07T10:00:00.000Z"); + const lockedAt = new Date("2026-05-07T09:59:50.000Z"); + + const result = mapRunToLiveFields({ + friendlyId: "run_123", + status: "EXECUTING", + updatedAt, + startedAt: null, + lockedAt, + completedAt: null, + usageDurationMs: BigInt(2500), + costInCents: 10, + baseCostInCents: 5, + }); + + expect(result).toEqual({ + friendlyId: "run_123", + status: "EXECUTING", + updatedAt: updatedAt.toISOString(), + startedAt: lockedAt.toISOString(), + finishedAt: undefined, + hasFinished: false, + isCancellable: true, + isPending: false, + usageDurationMs: 2500, + costInCents: 10, + baseCostInCents: 5, + }); + }); + + it("maps a final run and prefers completedAt for finishedAt", () => { + const updatedAt = new Date("2026-05-07T10:00:00.000Z"); + const startedAt = new Date("2026-05-07T09:59:00.000Z"); + const completedAt = new Date("2026-05-07T09:59:30.000Z"); + + const result = mapRunToLiveFields({ + friendlyId: "run_456", + status: "COMPLETED_SUCCESSFULLY", + updatedAt, + startedAt, + lockedAt: null, + completedAt, + usageDurationMs: 1200, + costInCents: 20, + baseCostInCents: 7, + }); + + expect(result.finishedAt).toBe(completedAt.toISOString()); + expect(result.startedAt).toBe(startedAt.toISOString()); + expect(result.hasFinished).toBe(true); + expect(result.isCancellable).toBe(false); + }); + + it("falls back to updatedAt when a final run has no completedAt", () => { + const updatedAt = new Date("2026-05-07T10:00:00.000Z"); + + const result = mapRunToLiveFields({ + friendlyId: "run_789", + status: "CRASHED", + updatedAt, + startedAt: null, + lockedAt: null, + completedAt: null, + usageDurationMs: 0, + costInCents: 0, + baseCostInCents: 0, + }); + + expect(result.finishedAt).toBe(updatedAt.toISOString()); + expect(result.hasFinished).toBe(true); + expect(result.isPending).toBe(false); + expect(result.isCancellable).toBe(false); + }); +}); diff --git a/apps/webapp/test/runsRepository.part2.test.ts b/apps/webapp/test/runsRepository.part2.test.ts index 5fbe80ce52e..793e19236c0 100644 --- a/apps/webapp/test/runsRepository.part2.test.ts +++ b/apps/webapp/test/runsRepository.part2.test.ts @@ -789,4 +789,107 @@ describe("RunsRepository (part 2/2)", () => { expect(secondPage.pagination.previousCursor).toBeTruthy(); } ); -}); \ No newline at end of file + + containerTest( + "should count new runs with listRunIds", + async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => { + const { clickhouse } = await setupClickhouseReplication({ + prisma, + databaseUrl: postgresContainer.getConnectionUri(), + clickhouseUrl: clickhouseContainer.getConnectionUrl(), + redisOptions, + }); + + const organization = await prisma.organization.create({ + data: { + title: "test", + slug: "test", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "test", + slug: "test", + organizationId: organization.id, + externalRef: "test", + }, + }); + + const runtimeEnvironment = await prisma.runtimeEnvironment.create({ + data: { + slug: "test", + type: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + apiKey: "test", + pkApiKey: "test", + shortcode: "test", + }, + }); + + const taskRun = await prisma.taskRun.create({ + data: { + friendlyId: "run_has_new", + taskIdentifier: "my-task", + payload: JSON.stringify({ foo: "bar" }), + traceId: "1234", + spanId: "1234", + queue: "test", + runtimeEnvironmentId: runtimeEnvironment.id, + projectId: project.id, + organizationId: organization.id, + environmentType: "DEVELOPMENT", + engine: "V2", + }, + }); + + await setTimeout(1000); + + const runsRepository = new RunsRepository({ + prisma, + clickhouse, + }); + + const baseOptions = { + projectId: project.id, + environmentId: runtimeEnvironment.id, + organizationId: organization.id, + }; + + const createdAtMs = taskRun.createdAt.getTime(); + + const newRunIdsBefore = await runsRepository.listRunIds({ + ...baseOptions, + from: createdAtMs - 1, + page: { size: 100 }, + }); + expect(newRunIdsBefore.length).toBeGreaterThanOrEqual(1); + + const newRunIdsAfter = await runsRepository.listRunIds({ + ...baseOptions, + from: createdAtMs + 60_000, + page: { size: 100 }, + }); + expect(newRunIdsAfter).toHaveLength(0); + + const fromBeforeRun = createdAtMs - 1; + + const matchingTaskIds = await runsRepository.listRunIds({ + ...baseOptions, + from: fromBeforeRun, + tasks: ["my-task"], + page: { size: 100 }, + }); + expect(matchingTaskIds.length).toBeGreaterThanOrEqual(1); + + const otherTaskIds = await runsRepository.listRunIds({ + ...baseOptions, + from: fromBeforeRun, + tasks: ["other-task"], + page: { size: 100 }, + }); + expect(otherTaskIds).toHaveLength(0); + } + ); +}); diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index 17e48623f9f..d8a97296d1d 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -9,6 +9,7 @@ import { getTaskRunsQueryBuilder, getTaskActivityQueryBuilder, getCurrentRunningStats, + getChildRunStatusCounts, getAverageDurations, getTaskUsageByOrganization, getTaskRunsCountQueryBuilder, @@ -228,6 +229,7 @@ export class ClickHouse { pendingVersionIdsQueryBuilder: getPendingVersionIdsQueryBuilder(this.reader), getTaskActivity: getTaskActivityQueryBuilder(this.reader), getCurrentRunningStats: getCurrentRunningStats(this.reader), + getChildRunStatusCounts: getChildRunStatusCounts(this.reader), getAverageDurations: getAverageDurations(this.reader), getTaskUsageByOrganization: getTaskUsageByOrganization(this.reader), }; diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index 2d35ab0d420..76c2267b073 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -2,6 +2,8 @@ import { clickhouseTest } from "@internal/testcontainers"; import { z } from "zod"; import { ClickhouseClient } from "./client/client.js"; import { + TASK_RUN_INDEX, + getChildRunStatusCounts, getTaskRunsQueryBuilder, insertRawTaskRunPayloadsCompactArrays, insertTaskRunsCompactArrays, @@ -416,6 +418,311 @@ describe("Task Runs V2", () => { } ); + clickhouseTest( + "should aggregate child status counts with FINAL and ignore deleted rows", + async ({ clickhouseContainer }) => { + const client = new ClickhouseClient({ + name: "test", + url: clickhouseContainer.getConnectionUrl(), + }); + + const insert = insertTaskRunsCompactArrays(client, { + async_insert: 0, // turn off async insert for this test + }); + + const baseCreatedAt = new Date("2025-05-01T12:00:00.000Z").getTime(); + const oldCreatedAt = new Date("2025-04-15T12:00:00.000Z").getTime(); + const since = baseCreatedAt - 60_000; + + const rootRun: TaskRunInsertArray = [ + "env_agg", // environment_id + "org_agg", // organization_id + "project_agg", // project_id + "root_run_1", // run_id + baseCreatedAt, // updated_at + baseCreatedAt, // created_at + "EXECUTING", // status + "DEVELOPMENT", // environment_type + "run_root_1", // friendly_id + 1, // attempt + "V2", // engine + "root-task", // task_identifier + "task/root-task", // queue + "", // schedule_id + "", // batch_id + null, // completed_at + baseCreatedAt, // started_at + null, // executed_at + null, // delay_until + baseCreatedAt, // queued_at + null, // expired_at + 0, // usage_duration_ms + 0, // cost_in_cents + 0, // base_cost_in_cents + { data: null }, // output + { data: null }, // error + "", // error_fingerprint + [], // tags + "", // task_version + "", // sdk_version + "", // cli_version + "", // machine_preset + "", // root_run_id + "", // parent_run_id + 0, // depth + "span_root", // span_id + "trace_root", // trace_id + "", // idempotency_key + "", // idempotency_key_user + "", // idempotency_key_scope + "", // expiration_ttl + true, // is_test + "1", // _version + 0, // _is_deleted + "", // concurrency_key + [], // bulk_action_group_ids + "", // worker_queue + null, // max_duration_in_seconds + "", // trigger_source + "", // root_trigger_source + "", // task_kind + null, // is_warm_start + ]; + + const childA_v1: TaskRunInsertArray = [ + "env_agg", + "org_agg", + "project_agg", + "child_a", + baseCreatedAt + 1_000, + baseCreatedAt + 1_000, + "PENDING", + "DEVELOPMENT", + "run_child_a", + 1, + "V2", + "child-task", + "task/child-task", + "", + "", + null, + null, + null, + null, + baseCreatedAt + 1_000, + null, + 0, + 0, + 0, + { data: null }, + { data: null }, + "", + [], + "", + "", + "", + "", + "root_run_1", + "root_run_1", + 1, + "span_child_a", + "trace_root", + "", + "", + "", + "", + true, + "1", + 0, + "", + [], + "", + null, + "", + "", + "", + null, + ]; + + const childA_v2: TaskRunInsertArray = [ + ...childA_v1, + ]; + childA_v2[TASK_RUN_INDEX.status] = "COMPLETED_SUCCESSFULLY"; + childA_v2[TASK_RUN_INDEX._version] = "2"; + + const childB: TaskRunInsertArray = [ + "env_agg", + "org_agg", + "project_agg", + "child_b", + baseCreatedAt + 2_000, + baseCreatedAt + 2_000, + "EXECUTING", + "DEVELOPMENT", + "run_child_b", + 1, + "V2", + "child-task", + "task/child-task", + "", + "", + null, + baseCreatedAt + 2_000, + null, + null, + baseCreatedAt + 2_000, + null, + 0, + 0, + 0, + { data: null }, + { data: null }, + "", + [], + "", + "", + "", + "", + "root_run_1", + "root_run_1", + 1, + "span_child_b", + "trace_root", + "", + "", + "", + "", + true, + "1", + 0, + "", + [], + "", + null, + "", + "", + "", + null, + ]; + + const childDeleted_v1: TaskRunInsertArray = [ + "env_agg", + "org_agg", + "project_agg", + "child_deleted", + baseCreatedAt + 3_000, + baseCreatedAt + 3_000, + "PENDING", + "DEVELOPMENT", + "run_child_deleted", + 1, + "V2", + "child-task", + "task/child-task", + "", + "", + null, + null, + null, + null, + baseCreatedAt + 3_000, + null, + 0, + 0, + 0, + { data: null }, + { data: null }, + "", + [], + "", + "", + "", + "", + "root_run_1", + "root_run_1", + 1, + "span_child_deleted", + "trace_root", + "", + "", + "", + "", + true, + "1", + 0, + "", + [], + "", + null, + "", + "", + "", + null, + ]; + + const childDeleted_v2: TaskRunInsertArray = [ + ...childDeleted_v1, + ]; + childDeleted_v2[TASK_RUN_INDEX._version] = "2"; + childDeleted_v2[TASK_RUN_INDEX._is_deleted] = 1; + + const childWrongRoot: TaskRunInsertArray = [ + ...childB, + ]; + childWrongRoot[TASK_RUN_INDEX.run_id] = "child_wrong_root"; + childWrongRoot[TASK_RUN_INDEX.friendly_id] = "run_child_wrong_root"; + childWrongRoot[TASK_RUN_INDEX.root_run_id] = "other_root"; + childWrongRoot[TASK_RUN_INDEX.parent_run_id] = "other_root"; + childWrongRoot[TASK_RUN_INDEX.span_id] = "span_child_wrong_root"; + + const childOld: TaskRunInsertArray = [ + ...childB, + ]; + childOld[TASK_RUN_INDEX.run_id] = "child_old"; + childOld[TASK_RUN_INDEX.created_at] = oldCreatedAt; + childOld[TASK_RUN_INDEX.updated_at] = oldCreatedAt; + childOld[TASK_RUN_INDEX.started_at] = oldCreatedAt; + childOld[TASK_RUN_INDEX.queued_at] = oldCreatedAt; + childOld[TASK_RUN_INDEX.friendly_id] = "run_child_old"; + childOld[TASK_RUN_INDEX.span_id] = "span_child_old"; + + const [insertError] = await insert([ + rootRun, + childA_v1, + childA_v2, + childB, + childDeleted_v1, + childDeleted_v2, + childWrongRoot, + childOld, + ]); + + expect(insertError).toBeNull(); + + const [queryError, result] = await getChildRunStatusCounts(client)({ + organizationId: "org_agg", + projectId: "project_agg", + environmentId: "env_agg", + rootRunIds: ["root_run_1"], + since, + }); + + expect(queryError).toBeNull(); + expect(result).toEqual([ + { + root_run_id: "root_run_1", + status: "COMPLETED_SUCCESSFULLY", + count: 1, + }, + { + root_run_id: "root_run_1", + status: "EXECUTING", + count: 1, + }, + ]); + } + ); + clickhouseTest( "should be able to insert payloads with a duplicate path", async ({ clickhouseContainer }) => { diff --git a/internal-packages/clickhouse/src/taskRuns.ts b/internal-packages/clickhouse/src/taskRuns.ts index 40b2daac206..77dca1f7726 100644 --- a/internal-packages/clickhouse/src/taskRuns.ts +++ b/internal-packages/clickhouse/src/taskRuns.ts @@ -514,6 +514,51 @@ export function getCurrentRunningStats(ch: ClickhouseReader, settings?: ClickHou }); } +export const ChildRunStatusCountsQueryResult = z.object({ + root_run_id: z.string(), + status: z.string(), + count: z.number().int(), +}); + +export type ChildRunStatusCountsQueryResult = z.infer; + +export const ChildRunStatusCountsQueryParams = z.object({ + organizationId: z.string(), + projectId: z.string(), + environmentId: z.string(), + rootRunIds: z.array(z.string()).min(1), + since: z.number().int(), +}); + +export function getChildRunStatusCounts(ch: ClickhouseReader, settings?: ClickHouseSettings) { + return ch.query({ + name: "getChildRunStatusCounts", + query: ` + SELECT + root_run_id, + status, + count() as count + FROM trigger_dev.task_runs_v2 FINAL + WHERE + organization_id = {organizationId: String} + AND project_id = {projectId: String} + AND environment_id = {environmentId: String} + AND root_run_id IN {rootRunIds: Array(String)} + AND created_at >= fromUnixTimestamp64Milli({since: Int64}) + AND _is_deleted = 0 + GROUP BY + root_run_id, + status + ORDER BY + root_run_id ASC, + status ASC + `, + schema: ChildRunStatusCountsQueryResult, + params: ChildRunStatusCountsQueryParams, + settings, + }); +} + export const AverageDurationsQueryResult = z.object({ task_identifier: z.string(), duration: z.number(),