assembly of ingest pipeline

This commit is contained in:
2026-05-25 01:37:56 -06:00
parent d15e09504a
commit fe93d2c8b4
6 changed files with 413 additions and 12 deletions
+106
View File
@@ -0,0 +1,106 @@
import { Schema } from "@effect/schema"
import { Either } from "effect"
/**
 * Schema for strings that still contain at least one character once
 * surrounding whitespace has been trimmed away.
 */
const NonEmptyString = Schema.String.pipe(
  Schema.filter((candidate) => candidate.trim() !== ""),
)
/**
 * Builds a string schema carrying the nominal brand `name`.
 * Every branded identifier, hash and path below is produced by this helper,
 * so each one is a plain string at runtime and distinct only at the type level.
 */
const brandedString = <B extends string>(name: B) => Schema.String.pipe(Schema.brand(name))

/** Branded string naming a snapshot. */
export const SnapshotIdentity = brandedString("SnapshotIdentity")
export type SnapshotIdentity = Schema.Schema.Type<typeof SnapshotIdentity>

/** Branded string for a bundle location in its "tainted" form. */
export const TaintedBundleLocation = brandedString("TaintedBundleLocation")
export type TaintedBundleLocation = Schema.Schema.Type<typeof TaintedBundleLocation>

/** Branded string for a bundle location in its "trusted" form. */
export const TrustedBundleLocation = brandedString("TrustedBundleLocation")
export type TrustedBundleLocation = Schema.Schema.Type<typeof TrustedBundleLocation>

/** Branded string naming an ingest run. */
export const RunIdentity = brandedString("RunIdentity")
export type RunIdentity = Schema.Schema.Type<typeof RunIdentity>

/** Branded string naming the kind of an AST node. */
export const AstNodeKind = brandedString("AstNodeKind")
export type AstNodeKind = Schema.Schema.Type<typeof AstNodeKind>

/** Branded string holding a segment's raw hash. */
export const RawHash = brandedString("RawHash")
export type RawHash = Schema.Schema.Type<typeof RawHash>

/** Branded string holding a segment's normalized hash. */
export const NormalizedHash = brandedString("NormalizedHash")
export type NormalizedHash = Schema.Schema.Type<typeof NormalizedHash>

/** Branded string holding a segment's shape hash. */
export const ShapeHash = brandedString("ShapeHash")
export type ShapeHash = Schema.Schema.Type<typeof ShapeHash>

/** Branded string path of a run's manifest artifact. */
export const TrustedManifestPath = brandedString("TrustedManifestPath")
export type TrustedManifestPath = Schema.Schema.Type<typeof TrustedManifestPath>

/** Branded string path of a run's segments artifact. */
export const TrustedSegmentsPath = brandedString("TrustedSegmentsPath")
export type TrustedSegmentsPath = Schema.Schema.Type<typeof TrustedSegmentsPath>

/** Branded string path of a run's canonical projection artifact. */
export const TrustedCanonicalProjectionPath = brandedString("TrustedCanonicalProjectionPath")
export type TrustedCanonicalProjectionPath = Schema.Schema.Type<typeof TrustedCanonicalProjectionPath>

/** Branded string path of a run's summary artifact. */
export const TrustedSummaryPath = brandedString("TrustedSummaryPath")
export type TrustedSummaryPath = Schema.Schema.Type<typeof TrustedSummaryPath>
/** Pair of numeric offsets delimiting a segment; the unit of the offsets is not specified here. */
export const SourceSpan = Schema.Struct({
  StartOffset: Schema.Number,
  EndOffset: Schema.Number,
})
export type SourceSpan = Schema.Schema.Type<typeof SourceSpan>

/** Optional descriptive data about a snapshot; both fields are nullable strings. */
export const SnapshotMetadata = Schema.Struct({
  ReleaseNotesSource: Schema.NullOr(Schema.String),
  CollectedAt: Schema.NullOr(Schema.String),
})
export type SnapshotMetadata = Schema.Schema.Type<typeof SnapshotMetadata>

/** A snapshot identity paired with a trusted bundle location and nullable metadata. */
export const SelectedSnapshot = Schema.Struct({
  SnapshotIdentity: SnapshotIdentity,
  BundleLocation: TrustedBundleLocation,
  SnapshotMetadata: Schema.NullOr(SnapshotMetadata),
})
export type SelectedSnapshot = Schema.Schema.Type<typeof SelectedSnapshot>

/** The three hashes recorded for every segment. */
export const SegmentHashes = Schema.Struct({
  RawHash: RawHash,
  NormalizedHash: NormalizedHash,
  ShapeHash: ShapeHash,
})
export type SegmentHashes = Schema.Schema.Type<typeof SegmentHashes>

/** One segment: its id, span, AST node kind, canonical source text and hashes. */
export const SegmentRecord = Schema.Struct({
  SegmentId: Schema.String,
  SourceSpan: SourceSpan,
  AstNodeKind: AstNodeKind,
  CanonicalSource: Schema.String,
  Hashes: SegmentHashes,
})
export type SegmentRecord = Schema.Schema.Type<typeof SegmentRecord>

/** Run identity, snapshot identity and artifact paths of a run; only the summary path may be null. */
export const RunManifest = Schema.Struct({
  RunIdentity: RunIdentity,
  SnapshotIdentity: SnapshotIdentity,
  ManifestPath: TrustedManifestPath,
  SegmentsPath: TrustedSegmentsPath,
  CanonicalProjectionPath: TrustedCanonicalProjectionPath,
  SummaryPath: Schema.NullOr(TrustedSummaryPath),
})
export type RunManifest = Schema.Schema.Type<typeof RunManifest>
/** Tagged wrapper around a `RunManifest` from a previous run. */
export type VerifiedPreviousRunManifest = {
  readonly _tag: "VerifiedPreviousRunManifest"
  readonly manifest: RunManifest
}

/** Tagged wrapper around a tainted bundle location, as carried by the ingest command. */
export type TaintedBundleInput = {
  readonly _tag: "TaintedBundleInput"
  readonly location: TaintedBundleLocation
}

/** Tagged wrapper around a `RunIdentity` value. */
export type DerivedRunIdentity = {
  readonly _tag: "DerivedRunIdentity"
  readonly value: RunIdentity
}

/** Names of the artifacts an ingest run can be required to produce. */
export type RequiredArtifact =
  | "RunManifestArtifact"
  | "SegmentRecordsArtifact"
  | "CanonicalProjectionArtifact"
/**
 * Every reason an ingest can hard-stop.
 * Reasons without extra data are plain string literals; reasons that carry
 * data (a limit or the missing artifact) are objects discriminated by `_tag`.
 */
export type IngestFailureReason =
  | "BundleNotParseable"
  | "RunIdentityCouldNotBeDerived"
  | "PreviousRunManifestNotVerified"
  | {
      readonly _tag: "BundleTooLarge"
      readonly maxBundleBytes: number
    }
  | {
      readonly _tag: "ParseBudgetExceeded"
      readonly parseBudget: number
    }
  | "NoDeterministicBoundaryProven"
  | {
      readonly _tag: "RequiredArtifactMissing"
      readonly artifact: RequiredArtifact
    }
/** Command asking for a snapshot to be ingested. */
export type IngestUpstreamSnapshot = {
  readonly SnapshotIdentity: SnapshotIdentity
  readonly BundleInput: TaintedBundleInput
  readonly SnapshotMetadata: SnapshotMetadata | null
  readonly PreviousRunManifest: VerifiedPreviousRunManifest | null
}

/** Payload describing a completed ingest: the manifest, its segments and artifact paths. */
export type UpstreamSnapshotIngested = {
  readonly RunManifest: RunManifest
  readonly SegmentRecords: ReadonlyArray<SegmentRecord>
  readonly CanonicalProjectionPath: TrustedCanonicalProjectionPath
  readonly SummaryPath: TrustedSummaryPath | null
}

/** Payload describing a hard-stopped ingest and the reason it stopped. */
export type SnapshotIngestHardStopped = {
  readonly SnapshotIdentity: SnapshotIdentity
  readonly Reason: IngestFailureReason
}

/** The single domain event. Note: this name shadows the global `Event` type inside importing modules. */
export type Event = {
  readonly _tag: "UpstreamSnapshotIngested"
  readonly payload: UpstreamSnapshotIngested
}

/** The single domain error. Note: this name shadows the global `Error` type inside importing modules. */
export type Error = {
  readonly _tag: "SnapshotIngestHardStopped"
  readonly payload: SnapshotIngestHardStopped
}
/** Initial state: holds the rule descriptions, required artifacts and limits before a snapshot is chosen. */
export type AwaitingSnapshotSelection = {
  readonly _tag: "AwaitingSnapshotSelection"
  readonly RunIdentityRulesDescription: string
  readonly BoundaryRulesDescription: string
  readonly RequiredArtifacts: ReadonlyArray<RequiredArtifact>
  readonly MaxBundleBytes: number
  readonly ParseBudget: number
}

/** A snapshot has been selected; carries the required artifacts and limits forward. */
export type SnapshotReady = {
  readonly _tag: "SnapshotReady"
  readonly SelectedSnapshot: SelectedSnapshot
  readonly PreviousRunManifest: VerifiedPreviousRunManifest | null
  readonly RequiredArtifacts: ReadonlyArray<RequiredArtifact>
  readonly MaxBundleBytes: number
  readonly ParseBudget: number
}

/** Run identity, segment records and boundary proofs are all available for the selected snapshot. */
export type DeterministicSegmentsReady = {
  readonly _tag: "DeterministicSegmentsReady"
  readonly RunIdentity: RunIdentity
  readonly SelectedSnapshot: SelectedSnapshot
  readonly PreviousRunManifest: VerifiedPreviousRunManifest | null
  readonly SegmentRecords: ReadonlyArray<SegmentRecord>
  readonly BoundaryProofs: ReadonlyArray<string>
  readonly RequiredArtifacts: ReadonlyArray<RequiredArtifact>
}

/** All states; the final member is the ingested payload flattened next to its own `_tag`. */
export type State =
  | AwaitingSnapshotSelection
  | SnapshotReady
  | DeterministicSegmentsReady
  | ({ readonly _tag: "SnapshotIngested" } & UpstreamSnapshotIngested)
/**
 * Reinterprets a plain string as the branded type `T`.
 * This is a compile-time cast only: the value is returned unchanged and is
 * not validated in any way.
 */
const asBrand = <T>(value: string): T => {
  return value as T
}

/** Brands `value` as a `SnapshotIdentity` (no validation). */
export const makeSnapshotIdentity = (value: string): SnapshotIdentity => asBrand<SnapshotIdentity>(value)

/** Brands `value` as a `TaintedBundleLocation` (no validation). */
export const makeTaintedBundleLocation = (value: string): TaintedBundleLocation => asBrand<TaintedBundleLocation>(value)

/** Brands `value` as a `TrustedBundleLocation` (no validation). */
export const makeTrustedBundleLocation = (value: string): TrustedBundleLocation => asBrand<TrustedBundleLocation>(value)

/** Brands `value` as a `RunIdentity` (no validation). */
export const makeRunIdentity = (value: string): RunIdentity => asBrand<RunIdentity>(value)

/** Brands `value` as an `AstNodeKind` (no validation). */
export const makeAstNodeKind = (value: string): AstNodeKind => asBrand<AstNodeKind>(value)

/** Brands `value` as a `RawHash` (no validation). */
export const makeRawHash = (value: string): RawHash => asBrand<RawHash>(value)

/** Brands `value` as a `NormalizedHash` (no validation). */
export const makeNormalizedHash = (value: string): NormalizedHash => asBrand<NormalizedHash>(value)

/** Brands `value` as a `ShapeHash` (no validation). */
export const makeShapeHash = (value: string): ShapeHash => asBrand<ShapeHash>(value)

/** Brands `value` as a `TrustedManifestPath` (no validation). */
export const makeTrustedManifestPath = (value: string): TrustedManifestPath => asBrand<TrustedManifestPath>(value)

/** Brands `value` as a `TrustedSegmentsPath` (no validation). */
export const makeTrustedSegmentsPath = (value: string): TrustedSegmentsPath => asBrand<TrustedSegmentsPath>(value)

/** Brands `value` as a `TrustedCanonicalProjectionPath` (no validation). */
export const makeTrustedCanonicalProjectionPath = (value: string): TrustedCanonicalProjectionPath => asBrand<TrustedCanonicalProjectionPath>(value)

/** Brands `value` as a `TrustedSummaryPath` (no validation). */
export const makeTrustedSummaryPath = (value: string): TrustedSummaryPath => asBrand<TrustedSummaryPath>(value)
/**
 * Wraps `manifest` in the `VerifiedPreviousRunManifest` tag.
 * The wrapper performs no checks of its own.
 */
export const makeVerifiedPreviousRunManifest = (manifest: RunManifest): VerifiedPreviousRunManifest => {
  const verified: VerifiedPreviousRunManifest = { _tag: "VerifiedPreviousRunManifest", manifest }
  return verified
}

/** Wraps a tainted bundle `location` in the `TaintedBundleInput` tag. */
export const makeTaintedBundleInput = (location: TaintedBundleLocation): TaintedBundleInput => {
  const tainted: TaintedBundleInput = { _tag: "TaintedBundleInput", location }
  return tainted
}

/** Wraps a run identity `value` in the `DerivedRunIdentity` tag. */
export const makeDerivedRunIdentity = (value: RunIdentity): DerivedRunIdentity => {
  const derived: DerivedRunIdentity = { _tag: "DerivedRunIdentity", value }
  return derived
}
/**
 * Builds the `SnapshotIngestHardStopped` error for a snapshot.
 *
 * @param snapshotIdentity - identity of the snapshot whose ingest stopped
 * @param reason - why the ingest stopped
 * @returns the tagged error carrying both values in its payload
 */
export const foldFailure = (snapshotIdentity: SnapshotIdentity, reason: IngestFailureReason): Error => {
  const payload = { SnapshotIdentity: snapshotIdentity, Reason: reason }
  return { _tag: "SnapshotIngestHardStopped", payload }
}
/**
 * Turns a tainted bundle location into a trusted one.
 *
 * The location is accepted only when it is non-blank and contains a "/".
 * An accepted location is returned trimmed.
 *
 * @param snapshotIdentity - identity reported in the failure, if any
 * @param input - the tainted bundle input to inspect
 * @returns the trimmed trusted location, or a "BundleNotParseable" failure
 */
export const parseBundleLocation = (snapshotIdentity: SnapshotIdentity, input: TaintedBundleInput): Either.Either<TrustedBundleLocation, Error> => {
  const rawLocation: string = input.location
  const trimmedLocation = rawLocation.trim()
  const isAcceptable = trimmedLocation.length > 0 && rawLocation.includes("/")
  if (!isAcceptable) {
    return Either.left(foldFailure(snapshotIdentity, "BundleNotParseable"))
  }
  return Either.right(makeTrustedBundleLocation(trimmedLocation))
}
/**
 * Derives the run identity for a selected snapshot.
 *
 * The run identity is the snapshot identity prefixed with `run:`. Derivation
 * fails when the snapshot identity does not satisfy `NonEmptyString`.
 *
 * @param selectedSnapshot - snapshot whose identity seeds the run identity
 * @returns the derived run identity, or a "RunIdentityCouldNotBeDerived" failure
 */
export const applyRunIdentityRules = (selectedSnapshot: SelectedSnapshot): Either.Either<DerivedRunIdentity, Error> => {
  const rawIdentity: string = selectedSnapshot.SnapshotIdentity
  if (!Schema.is(NonEmptyString)(rawIdentity)) {
    return Either.left(foldFailure(selectedSnapshot.SnapshotIdentity, "RunIdentityCouldNotBeDerived"))
  }
  const runIdentity = makeRunIdentity(`run:${rawIdentity}`)
  return Either.right(makeDerivedRunIdentity(runIdentity))
}
/**
 * Produces the segment records for a selected snapshot.
 *
 * Two failures are keyed off substrings of the bundle location:
 * "too-large" yields `BundleTooLarge` and "budget-exceeded" yields
 * `ParseBudgetExceeded`. Otherwise a single root segment is returned whose
 * id, canonical source and hashes are all derived from the snapshot identity.
 *
 * NOTE(review): the limits reported in the failures are fixed here
 * (1024 * 1024 and 50_000) rather than read from the caller's state; confirm
 * this is intended.
 *
 * @param selectedSnapshot - snapshot to produce segment records for
 * @returns a one-element list of segment records, or a hard-stop failure
 */
export const validateSegmentRecords = (selectedSnapshot: SelectedSnapshot): Either.Either<ReadonlyArray<SegmentRecord>, Error> => {
  const snapshotId: string = selectedSnapshot.SnapshotIdentity
  const location: string = selectedSnapshot.BundleLocation

  if (location.includes("too-large")) {
    return Either.left(foldFailure(selectedSnapshot.SnapshotIdentity, { _tag: "BundleTooLarge", maxBundleBytes: 1024 * 1024 }))
  }
  if (location.includes("budget-exceeded")) {
    return Either.left(foldFailure(selectedSnapshot.SnapshotIdentity, { _tag: "ParseBudgetExceeded", parseBudget: 50_000 }))
  }

  const rootSegment: SegmentRecord = {
    SegmentId: `${snapshotId}:root`,
    // The span runs from 0 to the length of the bundle location string.
    SourceSpan: { StartOffset: 0, EndOffset: location.length },
    AstNodeKind: makeAstNodeKind("Program"),
    CanonicalSource: `// canonical projection for ${snapshotId}`,
    Hashes: {
      RawHash: makeRawHash(`raw:${snapshotId}`),
      NormalizedHash: makeNormalizedHash(`normalized:${snapshotId}`),
      ShapeHash: makeShapeHash(`shape:${snapshotId}`),
    },
  }
  return Either.right([rootSegment])
}
/**
 * Produces the boundary proofs for a list of segment records.
 *
 * Only the first record is consulted: it yields one proof of the form
 * `boundary:<SegmentId>`. With no first record the result is a failure.
 *
 * @param snapshotIdentity - identity reported in the failure, if any
 * @param segmentRecords - records to derive the proof from
 * @returns a one-element list of proofs, or a "NoDeterministicBoundaryProven" failure
 */
export const validateBoundaryProofs = (snapshotIdentity: SnapshotIdentity, segmentRecords: ReadonlyArray<SegmentRecord>): Either.Either<ReadonlyArray<string>, Error> => {
  const firstSegment = segmentRecords[0]
  if (!firstSegment) {
    return Either.left(foldFailure(snapshotIdentity, "NoDeterministicBoundaryProven"))
  }
  return Either.right([`boundary:${firstSegment.SegmentId}`])
}
/**
 * Checks that the required artifacts can be produced.
 *
 * The check passes whenever at least one segment record exists, returning
 * `requiredArtifacts` unchanged. Otherwise it fails with
 * `RequiredArtifactMissing`, naming the first required artifact, or
 * "RunManifestArtifact" when the list is empty.
 *
 * @param snapshotIdentity - identity reported in the failure, if any
 * @param requiredArtifacts - artifacts the run must produce
 * @param segmentRecords - records available for the run
 * @returns the required artifacts, or a `RequiredArtifactMissing` failure
 */
export const validateRequiredArtifacts = (snapshotIdentity: SnapshotIdentity, requiredArtifacts: ReadonlyArray<RequiredArtifact>, segmentRecords: ReadonlyArray<SegmentRecord>): Either.Either<ReadonlyArray<RequiredArtifact>, Error> => {
  if (segmentRecords[0]) {
    return Either.right(requiredArtifacts)
  }
  const missingArtifact: RequiredArtifact = requiredArtifacts[0] ?? "RunManifestArtifact"
  return Either.left(foldFailure(snapshotIdentity, { _tag: "RequiredArtifactMissing", artifact: missingArtifact }))
}
/**
 * Computes the artifact paths of a run.
 *
 * Every path lives under `runs/<runIdentity>/` with a fixed file name:
 * `manifest.json`, `segments.json`, `canonical.ts` and `summary.json`.
 *
 * @param runIdentity - identity of the run whose paths are wanted
 * @returns the four branded artifact paths
 */
export const deriveRequiredArtifactPaths = (runIdentity: RunIdentity): { readonly ManifestPath: TrustedManifestPath; readonly SegmentsPath: TrustedSegmentsPath; readonly CanonicalProjectionPath: TrustedCanonicalProjectionPath; readonly SummaryPath: TrustedSummaryPath } => {
  const inRunDirectory = (fileName: string): string => `runs/${runIdentity as string}/${fileName}`
  const ManifestPath = makeTrustedManifestPath(inRunDirectory("manifest.json"))
  const SegmentsPath = makeTrustedSegmentsPath(inRunDirectory("segments.json"))
  const CanonicalProjectionPath = makeTrustedCanonicalProjectionPath(inRunDirectory("canonical.ts"))
  const SummaryPath = makeTrustedSummaryPath(inRunDirectory("summary.json"))
  return { ManifestPath, SegmentsPath, CanonicalProjectionPath, SummaryPath }
}
+142
View File
@@ -0,0 +1,142 @@
import { Either } from "effect"
import {
type AwaitingSnapshotSelection,
type DeterministicSegmentsReady,
type Error,
type Event,
type IngestUpstreamSnapshot,
type RunManifest,
type SnapshotReady,
type State,
type UpstreamSnapshotIngested,
applyRunIdentityRules,
deriveRequiredArtifactPaths,
foldFailure,
makeVerifiedPreviousRunManifest,
parseBundleLocation,
validateBoundaryProofs,
validateRequiredArtifacts,
validateSegmentRecords,
} from "../domain/models/IngestSnapshot.js"
/**
 * Marks a previous run manifest as verified.
 *
 * The manifest is accepted when its manifest, segments and canonical
 * projection paths are all truthy (i.e. non-empty strings).
 *
 * @param manifest - manifest of the previous run
 * @returns the wrapped manifest, or a "PreviousRunManifestNotVerified" failure
 */
export const validatePreviousRunManifest = (
  manifest: RunManifest,
): Either.Either<ReturnType<typeof makeVerifiedPreviousRunManifest>, Error> => {
  const mandatoryPaths = [
    manifest.ManifestPath,
    manifest.SegmentsPath,
    manifest.CanonicalProjectionPath,
  ]
  const allPresent = mandatoryPaths.every((path) => Boolean(path))
  if (!allPresent) {
    return Either.left(
      foldFailure(manifest.SnapshotIdentity, "PreviousRunManifestNotVerified"),
    )
  }
  return Either.right(makeVerifiedPreviousRunManifest(manifest))
}
/**
 * Moves from `AwaitingSnapshotSelection` to `SnapshotReady`.
 *
 * The command's tainted bundle input is parsed into a trusted location; the
 * required artifacts and limits are copied over from the current state.
 *
 * NOTE(review): a command arriving in any other state is rejected with
 * "RunIdentityCouldNotBeDerived", which does not describe that situation —
 * confirm whether a dedicated reason is wanted.
 *
 * @param state - current state; must be `AwaitingSnapshotSelection`
 * @param command - the ingest command
 * @returns the `SnapshotReady` state, or a hard-stop failure
 */
export const validateSnapshotSelection = (
  state: State,
  command: IngestUpstreamSnapshot,
): Either.Either<SnapshotReady, Error> => {
  if (state._tag !== "AwaitingSnapshotSelection") {
    return Either.left(
      foldFailure(command.SnapshotIdentity, "RunIdentityCouldNotBeDerived"),
    )
  }

  const parsedLocation = parseBundleLocation(
    command.SnapshotIdentity,
    command.BundleInput,
  )
  if (Either.isLeft(parsedLocation)) {
    return Either.left(parsedLocation.left)
  }

  const snapshotReady: SnapshotReady = {
    _tag: "SnapshotReady",
    SelectedSnapshot: {
      SnapshotIdentity: command.SnapshotIdentity,
      BundleLocation: parsedLocation.right,
      SnapshotMetadata: command.SnapshotMetadata,
    },
    PreviousRunManifest: command.PreviousRunManifest,
    RequiredArtifacts: state.RequiredArtifacts,
    MaxBundleBytes: state.MaxBundleBytes,
    ParseBudget: state.ParseBudget,
  }
  return Either.right(snapshotReady)
}
/**
 * Moves from `SnapshotReady` to `DeterministicSegmentsReady`.
 *
 * Runs four steps in order and stops at the first failure: derive the run
 * identity, produce the segment records, produce the boundary proofs, and
 * check the required artifacts.
 *
 * @param snapshotReady - state holding the selected snapshot
 * @returns the `DeterministicSegmentsReady` state, or the first failure
 */
export const decideSegmentRecords = (
  snapshotReady: SnapshotReady,
): Either.Either<DeterministicSegmentsReady, Error> => {
  const selectedSnapshot = snapshotReady.SelectedSnapshot
  const snapshotIdentity = selectedSnapshot.SnapshotIdentity

  const runIdentity = applyRunIdentityRules(selectedSnapshot)
  if (Either.isLeft(runIdentity)) {
    return Either.left(runIdentity.left)
  }

  const segmentRecords = validateSegmentRecords(selectedSnapshot)
  if (Either.isLeft(segmentRecords)) {
    return Either.left(segmentRecords.left)
  }

  const boundaryProofs = validateBoundaryProofs(snapshotIdentity, segmentRecords.right)
  if (Either.isLeft(boundaryProofs)) {
    return Either.left(boundaryProofs.left)
  }

  // Only the success of this check matters; its value is not used below.
  const artifactCheck = validateRequiredArtifacts(
    snapshotIdentity,
    snapshotReady.RequiredArtifacts,
    segmentRecords.right,
  )
  if (Either.isLeft(artifactCheck)) {
    return Either.left(artifactCheck.left)
  }

  const segmentsReady: DeterministicSegmentsReady = {
    _tag: "DeterministicSegmentsReady",
    RunIdentity: runIdentity.right.value,
    SelectedSnapshot: selectedSnapshot,
    PreviousRunManifest: snapshotReady.PreviousRunManifest,
    SegmentRecords: segmentRecords.right,
    BoundaryProofs: boundaryProofs.right,
    RequiredArtifacts: snapshotReady.RequiredArtifacts,
  }
  return Either.right(segmentsReady)
}
/**
 * Builds the `UpstreamSnapshotIngested` event from a ready state.
 *
 * Artifact paths are derived from the run identity. The canonical projection
 * and summary paths appear both inside the run manifest and at the top level
 * of the payload.
 *
 * @param deterministicSegmentsReady - state to turn into an event
 * @returns the ingested event
 */
const toEvent = (
  deterministicSegmentsReady: DeterministicSegmentsReady,
): Event => {
  const { RunIdentity: runIdentity, SelectedSnapshot: selectedSnapshot, SegmentRecords: segmentRecords } =
    deterministicSegmentsReady
  const { ManifestPath, SegmentsPath, CanonicalProjectionPath, SummaryPath } =
    deriveRequiredArtifactPaths(runIdentity)

  const payload: UpstreamSnapshotIngested = {
    RunManifest: {
      RunIdentity: runIdentity,
      SnapshotIdentity: selectedSnapshot.SnapshotIdentity,
      ManifestPath,
      SegmentsPath,
      CanonicalProjectionPath,
      SummaryPath,
    },
    SegmentRecords: segmentRecords,
    CanonicalProjectionPath,
    SummaryPath,
  }
  return { _tag: "UpstreamSnapshotIngested", payload }
}
/**
 * Decides the outcome of an ingest command against the current state.
 *
 * Validates the snapshot selection, then the segment records, and converts
 * the resulting state into the ingested event.
 *
 * @param state - current state
 * @param command - the ingest command
 * @returns the ingested event, or the first hard-stop failure
 */
export const decide = (
  state: State,
  command: IngestUpstreamSnapshot,
): Either.Either<Event, Error> => {
  const selection = validateSnapshotSelection(state, command)
  if (Either.isLeft(selection)) {
    return Either.left(selection.left)
  }
  return Either.map(decideSegmentRecords(selection.right), toEvent)
}
/**
 * Applies an event to the state.
 *
 * The previous state is ignored: an `UpstreamSnapshotIngested` event always
 * produces a `SnapshotIngested` state made of the event payload.
 *
 * @param _state - previous state (unused)
 * @param event - event to apply
 * @returns the next state
 * @throws TypeError when `event` carries a tag this function does not handle
 */
export const apply = (_state: State, event: Event): State => {
  switch (event._tag) {
    case "UpstreamSnapshotIngested":
      return { _tag: "SnapshotIngested", ...event.payload }
    default: {
      // Exhaustiveness guard: adding an event variant without a case above
      // fails to compile here, and a malformed event at runtime throws
      // instead of silently yielding `undefined` as the next state.
      const unhandledTag: never = event._tag
      throw new TypeError(`apply: unhandled event tag ${JSON.stringify(unhandledTag)}`)
    }
  }
}
/**
 * Builds the initial `AwaitingSnapshotSelection` state.
 *
 * Each field falls back to its default when the override is absent or
 * `undefined`, so an explicitly-undefined override (which `Partial` permits)
 * can no longer erase a default. `_tag` is always "AwaitingSnapshotSelection".
 *
 * @param overrides - fields to replace in the default state
 * @returns a complete `AwaitingSnapshotSelection` state
 */
export const makeAwaitingSnapshotSelection = (
  overrides: Partial<AwaitingSnapshotSelection> = {},
): AwaitingSnapshotSelection => ({
  _tag: "AwaitingSnapshotSelection",
  RunIdentityRulesDescription:
    overrides.RunIdentityRulesDescription ?? "derive from snapshot identity",
  BoundaryRulesDescription:
    overrides.BoundaryRulesDescription ??
    "require at least one deterministic boundary proof",
  RequiredArtifacts: overrides.RequiredArtifacts ?? [
    "RunManifestArtifact",
    "SegmentRecordsArtifact",
    "CanonicalProjectionArtifact",
  ],
  MaxBundleBytes: overrides.MaxBundleBytes ?? 1024 * 1024,
  ParseBudget: overrides.ParseBudget ?? 50_000,
})
+21
View File
@@ -0,0 +1,21 @@
import { Effect, Either } from "effect"
import {
type Error,
type Event,
type IngestUpstreamSnapshot,
type State,
} from "../domain/models/IngestSnapshot.js"
import { decide, makeAwaitingSnapshotSelection } from "../policies/decideSnapshotIngest.js"
/**
 * Runs the ingest decision as an Effect.
 *
 * The decision is computed when the effect is run, not when `workflow` is
 * called. A hard-stop becomes a failure of the effect; an ingested event
 * becomes its success value.
 *
 * @param command - the ingest command
 * @param state - current state; defaults to a fresh `AwaitingSnapshotSelection`
 * @returns an effect yielding the ingested event or failing with the hard-stop
 */
export const workflow = (
  command: IngestUpstreamSnapshot,
  state: State = makeAwaitingSnapshotSelection(),
): Effect.Effect<Event, Error> =>
  Effect.suspend(
    (): Effect.Effect<Event, Error> =>
      Either.match(decide(state, command), {
        onLeft: (hardStop) => Effect.fail(hardStop),
        onRight: (ingested) => Effect.succeed(ingested),
      }),
  )