Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
f24d8cc
feat(data-retention): granular PII redaction stages (input + block ou…
TheodoreSpeaks Jun 29, 2026
36f2a3d
fix(data-retention): propagate block-output redaction into child work…
TheodoreSpeaks Jun 30, 2026
bb3a84b
fix(data-retention): close block-output redaction gaps on streaming +…
TheodoreSpeaks Jun 30, 2026
0b81fed
fix(data-retention): drain+mask streamed output, resolve PII policy u…
TheodoreSpeaks Jun 30, 2026
2d75987
test(testing): support leftJoin().where().limit() in shared db mock
TheodoreSpeaks Jun 30, 2026
eb6b25a
fix(data-retention): mask agent/Pi memory writes under block-output r…
TheodoreSpeaks Jun 30, 2026
324b04c
Merge remote-tracking branch 'origin/staging' into feat/pii-granular-…
TheodoreSpeaks Jun 30, 2026
d55b557
fix(data-retention): guard partial PII stages in GET normalize
TheodoreSpeaks Jun 30, 2026
83ffe4d
fix(data-retention): mask seeded memory messages under block-output r…
TheodoreSpeaks Jun 30, 2026
a911af8
fix(guardrails): fail closed on misaligned Presidio batch responses
TheodoreSpeaks Jun 30, 2026
31f2e3f
fix(data-retention): enabled stage with no entity types redacts all (…
TheodoreSpeaks Jun 30, 2026
437d2bb
fix(data-retention): reject enabled stage with no entity types; empty…
TheodoreSpeaks Jun 30, 2026
78b2c56
docs(data-retention): note resume remask covers inline values only
TheodoreSpeaks Jun 30, 2026
8f86d77
fix(data-retention): scrub offloaded large-value refs from logs when …
TheodoreSpeaks Jun 30, 2026
6e9587a
fix(data-retention): hydrate, mask, and re-store large-value refs in …
TheodoreSpeaks Jun 30, 2026
f0c71cc
fix(data-retention): always apply logs policy to large-value refs whe…
TheodoreSpeaks Jun 30, 2026
9cc035d
perf(data-retention): drop redaction byte ceiling, parallelize chunks…
TheodoreSpeaks Jul 1, 2026
c31633a
Merge remote-tracking branch 'origin/staging' into feat/pii-granular-…
TheodoreSpeaks Jul 1, 2026
965eb65
feat(data-retention): gate granular PII stages behind pii-granular-re…
TheodoreSpeaks Jul 1, 2026
e7b822a
docs(pii): describe Presidio as a standalone service, not a sidecar
TheodoreSpeaks Jul 1, 2026
2175408
fix(data-retention): re-mask offloaded large-value refs on resume + d…
TheodoreSpeaks Jul 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 90 additions & 24 deletions apps/pii/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
from typing import Any

from fastapi import FastAPI
from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer, RecognizerResult
from presidio_analyzer import (
AnalyzerEngine,
BatchAnalyzerEngine,
Pattern,
PatternRecognizer,
RecognizerResult,
)
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_analyzer.predefined_recognizers import (
AuAbnRecognizer,
Expand Down Expand Up @@ -133,6 +139,7 @@ def build_analyzer() -> AnalyzerEngine:


analyzer = build_analyzer()
batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)
anonymizer = AnonymizerEngine()

# Propagates to uvicorn's root handler, so timing lands in the container log stream.
Expand All @@ -149,13 +156,65 @@ class AnalyzeRequest(BaseModel):
return_decision_process: bool = False


class AnalyzeBatchRequest(BaseModel):
texts: list[str]
language: str = "en"
entities: list[str] | None = None
score_threshold: float | None = None


class AnonymizeRequest(BaseModel):
text: str
analyzer_results: list[dict[str, Any]] = []
anonymizers: dict[str, dict[str, Any]] | None = None
operators: dict[str, dict[str, Any]] | None = None


class AnonymizeBatchItem(BaseModel):
text: str
analyzer_results: list[dict[str, Any]] = []


class AnonymizeBatchRequest(BaseModel):
items: list[AnonymizeBatchItem] = []
anonymizers: dict[str, dict[str, Any]] | None = None
operators: dict[str, dict[str, Any]] | None = None


def build_operators(
raw_operators: dict[str, dict[str, Any]] | None,
) -> dict[str, OperatorConfig] | None:
if not raw_operators:
return None
operators: dict[str, OperatorConfig] = {}
for entity, raw_cfg in raw_operators.items():
op_cfg = dict(raw_cfg)
op_type = op_cfg.pop("type", "replace")
operators[entity] = OperatorConfig(op_type, op_cfg)
return operators


def run_anonymize(
text: str,
raw_results: list[dict[str, Any]],
operators: dict[str, OperatorConfig] | None,
):
analyzer_results = [
RecognizerResult(
entity_type=r["entity_type"],
start=r["start"],
end=r["end"],
score=r.get("score", 1.0),
)
for r in raw_results
]
return anonymizer.anonymize(
text=text,
analyzer_results=analyzer_results,
operators=operators,
)


@app.get("/health")
def health() -> dict[str, str]:
return {"status": "ok"}
Expand Down Expand Up @@ -186,35 +245,28 @@ def analyze(req: AnalyzeRequest) -> list[dict[str, Any]]:
return [r.to_dict() for r in results]


@app.post("/analyze_batch")
def analyze_batch(req: AnalyzeBatchRequest) -> list[list[dict[str, Any]]]:
"""Analyze many texts in one pass (spaCy nlp.pipe), returning one span list
per input in request order — the batched counterpart to /analyze."""
results = batch_analyzer.analyze_iterator(
texts=req.texts,
language=req.language,
entities=req.entities or None,
score_threshold=req.score_threshold,
)
return [[r.to_dict() for r in per_text] for per_text in results]


@app.post("/anonymize")
def anonymize(req: AnonymizeRequest) -> dict[str, Any]:
started = time.perf_counter()
analyzer_results = [
RecognizerResult(
entity_type=r["entity_type"],
start=r["start"],
end=r["end"],
score=r.get("score", 1.0),
)
for r in req.analyzer_results
]
raw_operators = req.anonymizers or req.operators
operators = None
if raw_operators:
operators = {}
for entity, raw_cfg in raw_operators.items():
op_cfg = dict(raw_cfg)
op_type = op_cfg.pop("type", "replace")
operators[entity] = OperatorConfig(op_type, op_cfg)
result = anonymizer.anonymize(
text=req.text,
analyzer_results=analyzer_results,
operators=operators,
)
operators = build_operators(req.anonymizers or req.operators)
result = run_anonymize(req.text, req.analyzer_results, operators)
logger.info(
"anonymize chars=%d spans=%d duration_ms=%.1f",
len(req.text),
len(analyzer_results),
len(req.analyzer_results),
(time.perf_counter() - started) * 1000,
)
return {
Expand All @@ -230,3 +282,17 @@ def anonymize(req: AnonymizeRequest) -> dict[str, Any]:
for item in result.items
],
}


@app.post("/anonymize_batch")
def anonymize_batch(req: AnonymizeBatchRequest) -> dict[str, list[str]]:
"""Mask many texts in one pass, returning masked text per item in request
order — the batched counterpart to /anonymize. Anonymization is pure string
work (no NLP), so callers should send only items with detected spans."""
operators = build_operators(req.anonymizers or req.operators)
return {
"texts": [
run_anonymize(item.text, item.analyzer_results, operators).text
for item in req.items
]
}
9 changes: 5 additions & 4 deletions apps/sim/app/api/guardrails/mask-batch/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ const logger = createLogger('GuardrailsMaskBatchAPI')

/**
* Internal batch PII masking. The log-redaction persist path runs in both the
* Next.js server and the trigger.dev runtime, but the Presidio sidecars live only
* in the app task — so redaction calls this endpoint server-to-server (internal
* JWT) to keep Presidio centralized here.
* Next.js server and the trigger.dev runtime, but only the app task reaches the
* Presidio service (it holds `PII_URL` and the internal-network access) — so
* redaction calls this endpoint server-to-server (internal JWT) to keep the
* Presidio call centralized here.
*/
export const POST = withRouteHandler(async (request: NextRequest) => {
const auth = await checkInternalAuth(request, { requireWorkflowId: false })
Expand All @@ -35,7 +36,7 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
})
return NextResponse.json({ masked })
} catch (error) {
// An unreachable/misconfigured Presidio sidecar makes maskPIIBatch throw; fail
// An unreachable/misconfigured Presidio service makes maskPIIBatch throw; fail
// loudly here (the caller scrubs to REDACTION_FAILED, so PII is never leaked).
logger.error('PII batch masking failed', {
error: getErrorMessage(error),
Expand Down
72 changes: 70 additions & 2 deletions apps/sim/app/api/organizations/[id]/data-retention/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,50 @@ function normalizeConfigured(
rules: settings.piiRedaction.rules.map((rule) => ({
...rule,
language: coercePiiLanguage(rule.language),
stages: rule.stages
? {
input: {
...rule.stages.input,
language: coercePiiLanguage(rule.stages.input?.language),
},
blockOutputs: {
...rule.stages.blockOutputs,
language: coercePiiLanguage(rule.stages.blockOutputs?.language),
},
logs: {
...rule.stages.logs,
language: coercePiiLanguage(rule.stages.logs?.language),
},
Comment thread
cursor[bot] marked this conversation as resolved.
}
: undefined,
})),
}
: null,
retentionOverrides: settings?.retentionOverrides ?? null,
}
}

/**
* Which granular stages (`input`/`blockOutputs`) are already enabled per rule
* target (`workspaceId ?? ''` = the org default). Used to gate the
* `pii-granular-redaction` flag on *new* enablement only: when the flag is off,
* an org that already configured granular stages must still be able to re-save
* unrelated settings (the UI re-sends the full PII snapshot every save), so we
* reject only a stage transitioning off→on, never a preserved one.
*/
function granularStageEnablement(
settings: OrganizationRetentionValues['piiRedaction']
): Map<string, { input: boolean; blockOutputs: boolean }> {
const map = new Map<string, { input: boolean; blockOutputs: boolean }>()
for (const rule of settings?.rules ?? []) {
map.set(rule.workspaceId ?? '', {
input: rule.stages?.input?.enabled === true,
blockOutputs: rule.stages?.blockOutputs?.enabled === true,
})
}
return map
}

/**
* GET /api/organizations/[id]/data-retention
* Returns the organization's data retention settings.
Expand Down Expand Up @@ -87,7 +124,10 @@ export const GET = withRouteHandler(
}

const isEnterprise = !isBillingEnabled || (await isOrganizationOnEnterprisePlan(organizationId))
const piiRedactionEnabled = await isFeatureEnabled('pii-redaction')
const [piiRedactionEnabled, piiGranularRedactionEnabled] = await Promise.all([
isFeatureEnabled('pii-redaction'),
isFeatureEnabled('pii-granular-redaction'),
])
const configured = normalizeConfigured(org.dataRetentionSettings)
const defaults = enterpriseDefaults()

Expand All @@ -99,6 +139,7 @@ export const GET = withRouteHandler(
configured,
effective: isEnterprise ? configured : defaults,
piiRedactionEnabled,
piiGranularRedactionEnabled,
},
})
}
Expand Down Expand Up @@ -167,7 +208,10 @@ export const PUT = withRouteHandler(
return NextResponse.json({ error: 'Organization not found' }, { status: 404 })
}

const piiRedactionEnabled = await isFeatureEnabled('pii-redaction')
const [piiRedactionEnabled, piiGranularRedactionEnabled] = await Promise.all([
isFeatureEnabled('pii-redaction'),
isFeatureEnabled('pii-granular-redaction'),
])

const current = normalizeConfigured(currentOrg.dataRetentionSettings)
const merged: DataRetentionSettings = { ...current }
Expand All @@ -187,6 +231,29 @@ export const PUT = withRouteHandler(
{ status: 403 }
)
}
if (!piiGranularRedactionEnabled) {
// Reject only a granular stage transitioning off→on; a body that merely
// preserves already-enabled granular stages must still save (the UI
// re-sends the full snapshot on every save), so existing orgs aren't
// locked out of unrelated retention changes when the flag is off.
const currentGranular = granularStageEnablement(current.piiRedaction)
const newlyEnablesGranular = (body.piiRedaction?.rules ?? []).some((rule) => {
const cur = currentGranular.get(rule.workspaceId ?? '')
return (
(rule.stages?.input?.enabled === true && !cur?.input) ||
(rule.stages?.blockOutputs?.enabled === true && !cur?.blockOutputs)
)
})
if (newlyEnablesGranular) {
return NextResponse.json(
{
error:
'Granular PII redaction (workflow input and block outputs) is not enabled for this organization',
},
{ status: 403 }
)
}
}
Comment thread
cursor[bot] marked this conversation as resolved.
merged.piiRedaction = body.piiRedaction
}
if (body.retentionOverrides !== undefined) {
Expand Down Expand Up @@ -251,6 +318,7 @@ export const PUT = withRouteHandler(
configured,
effective: configured,
piiRedactionEnabled,
piiGranularRedactionEnabled,
},
})
}
Expand Down
Loading
Loading