# 08 · Workers & runtime wiring

How the packages from 02-05 actually run on Cloudflare. Workers stay thin: every behavior here must be testable in `packages/`, with each worker reduced to a ~50-line shell.

## 1 · Bindings per worker

| Worker | Trigger | Bindings | Secrets |
|---|---|---|---|
| `scheduler` | cron `0 * * * *`, plus a second `*/30 * * * *` cron that no-ops unless some entity is in launch state (together they give the 30-minute launch cadence) | D1 `avb-state`, producer → `avb-content-pulls`, R2 creds | `X_BEARER_TOKEN`, `S3_*`, `SLACK_WEBHOOK` |
| `ingest` | consumer `avb-content-pulls` (batch ≤ 5, timeout 30s) | D1, producer → `avb-enrich`, R2 creds | `X_BEARER_TOKEN`, `S3_*` |
| `enricher` | consumer `avb-enrich` (batch ≤ 3) | D1, producer → publish queue (`PublishMsg`), R2 creds | `LLM_API_KEY`, `S3_*` |
| `publisher` | cron `*/30 * * * *` (safety net) + consumer of the enricher's `PublishMsg`, sent on batch completion | D1, R2 creds | `S3_*`, `SLACK_WEBHOOK` |

All R2 access goes through `packages/storage`, which uses **aws4fetch** to sign requests against R2's S3-compatible endpoint (`https://<account>.r2.cloudflarestorage.com`). The storage layer does not use native R2 bindings by default, per the S3-swappability rule in README §6; a native binding is permitted only as an opt-in optimization behind the same interface.
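A minimal sketch of that seam, assuming a `BlobStore` interface (a hypothetical name, as are `r2ObjectUrl` and `MemoryBlobStore`). It shows path-style addressing on the R2 endpoint and the kind of in-memory double package tests could use; the aws4fetch-signed `fetch` call itself is left out so the block runs without dependencies.

```typescript
// Hypothetical sketch: BlobStore, r2ObjectUrl and MemoryBlobStore are illustrative names,
// not the actual packages/storage API.

/** The seam workers depend on; an R2-over-S3 client and an in-memory double both satisfy it. */
export interface BlobStore {
  put(key: string, body: string, cacheControl?: string): Promise<void>;
  get(key: string): Promise<string | null>;
  list(prefix: string): Promise<string[]>;
  delete(key: string): Promise<void>;
}

/** Path-style object URL on R2's S3-compatible endpoint; each key segment is percent-encoded. */
export function r2ObjectUrl(accountId: string, bucket: string, key: string): string {
  const encodedKey = key.split("/").map(encodeURIComponent).join("/");
  return `https://${accountId}.r2.cloudflarestorage.com/${bucket}/${encodedKey}`;
}

/** In-memory double for package tests (ignores cache-control). */
export class MemoryBlobStore implements BlobStore {
  private readonly objects = new Map<string, string>();

  async put(key: string, body: string): Promise<void> {
    this.objects.set(key, body);
  }
  async get(key: string): Promise<string | null> {
    return this.objects.get(key) ?? null;
  }
  async list(prefix: string): Promise<string[]> {
    return Array.from(this.objects.keys()).filter((k) => k.startsWith(prefix)).sort();
  }
  async delete(key: string): Promise<void> {
    this.objects.delete(key);
  }
}
```

Because workers would only see `BlobStore`, moving to another S3-compatible store, or adding the opt-in native-binding path, changes one constructor call rather than any package logic.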

## 2 · runId and idempotency

`runId = <scheduledTimeISO>/<entity>/<purpose>` (e.g. `2026-06-09T21:00:00Z/fable-5/content-relevancy`). Derived from the cron's `scheduledTime`, never `Date.now()`, so a retried invocation regenerates the same runId.
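A sketch of the derivation, assuming the Workers scheduled handler's `scheduledTime` (epoch milliseconds) is passed straight through; `runIdFor` is a hypothetical helper name.

```typescript
// Hypothetical helper name (runIdFor); the output format is the one defined above.
/** Deterministic runId from the cron's scheduledTime (epoch ms, as the scheduled handler receives it). */
export function runIdFor(scheduledTimeMs: number, entitySlug: string, purpose: string): string {
  // toISOString() yields "2026-06-09T21:00:00.000Z"; drop the milliseconds to match the documented format.
  const iso = new Date(scheduledTimeMs).toISOString().replace(/\.\d{3}Z$/, "Z");
  return `${iso}/${entitySlug}/${purpose}`;
}

// A retried invocation carries the same scheduledTime, so it reproduces the same runId.
const scheduled = Date.UTC(2026, 5, 9, 21, 0, 0); // months are 0-based: 5 = June
console.log(runIdFor(scheduled, "fable-5", "content-relevancy"));
// → 2026-06-09T21:00:00Z/fable-5/content-relevancy
```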

Idempotency ledger (everything keyed on runId or natural keys):
- raw writes: key contains runId → retry overwrites identically, harmless
- `posts_seen`: `INSERT OR IGNORE`
- classifications: cache key `(post_id, prompt_version)`
- budget debit: the conditional update in §6, keyed `(day, entity)`; a `debits(run_id PRIMARY KEY)` table prevents a double debit when the queue redelivers a message
- events: `events_emitted(event_key)` PK
- publish: same runId → same manifest content; overwrite is a no-op

Queues are **at-least-once**: every consumer must be a no-op on redelivery. This is a test in 07, not a hope.
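One way to make a side effect a no-op on redelivery is to claim a natural key before acting. The sketch below models that with an in-memory `KeyLedger` (hypothetical) standing in for a PK table such as `events_emitted`. Claiming first means a crash between claim and effect drops the effect (at-most-once), which suits Slack alerts; for D1-only effects, putting the key insert and the writes in one `db.batch` avoids that gap.

```typescript
// Hypothetical sketch: KeyLedger stands in for a PK table such as events_emitted(event_key);
// in the workers the same check would be an INSERT OR IGNORE whose change count is 0 or 1.
export class KeyLedger {
  private readonly keys = new Set<string>();

  /** Mirrors INSERT OR IGNORE: 1 if the key was new, 0 if it already existed. */
  insertOrIgnore(key: string): 0 | 1 {
    if (this.keys.has(key)) return 0;
    this.keys.add(key);
    return 1;
  }

  /** Undo a claim so a later redelivery can retry the effect. */
  release(key: string): void {
    this.keys.delete(key);
  }
}

/** Runs `effect` at most once per key; a redelivered message becomes a no-op and returns false. */
export function onceByKey(ledger: KeyLedger, key: string, effect: () => void): boolean {
  if (ledger.insertOrIgnore(key) === 0) return false; // already handled on an earlier delivery
  try {
    effect();
  } catch (err) {
    ledger.release(key); // failed: free the key so the queue's retry can run the effect again
    throw err;
  }
  return true;
}
```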

## 3 · Scheduler tick (pseudocode)

```
onCron(scheduledTime):
  registry = loadRegistry()                          // from repo bundle, not network
  states   = d1.entity_state.all()
  countsBySlug = {}
  for e of registry.entities (concurrency 5):
    counts = collector.counts(compileCountsQuery(e), { granularity: "hour" })
    storage.putRaw(counts)                           // raw/<source>/<e>/<date>/<runId>/counts.json
    d1.upsertHourlyCounts(e, counts)                 // small rolling table for state eval + sparklines
    countsBySlug[e.slug] = counts                    // keep every entity's counts, not just the last
  newStates = evaluateStateMachine(states, countsBySlug, registry.launchFlags)   // 05 §4, pure fn
  d1.writeStates(newStates); fire Slack on baseline→elevated/launch transitions
  for e of registry.entities:
    if contentPullDue(e, newStates[e.slug], scheduledTime):     // cadence table 03 §4
      for mode of ["relevancy", "recency"]:
        // one runId per purpose, so each pull has its own idempotency and debit key;
        // runIdFor is a hypothetical helper producing the §2 format
        runId = runIdFor(scheduledTime, e.slug, "content-" + mode)
        enqueue(avb-content-pulls, { entity: e.slug, mode, runId })
  if utcHour(scheduledTime) == 6: enqueue daily metrics-refresh messages (one per entity)   // cron times are UTC
```

The tick must complete in < 30s wall-clock for 20 entities (counts calls dominate, and they run in parallel, 5 at a time). Missed or late crons are fine: `since_id` heals content gaps, and counts are backfilled via `start_time` on the next tick (counts/recent covers the last 7 days).
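The `start_time` for a backfilling counts call can be computed as a pure function of the newest stored hour and `scheduledTime`. A sketch follows; `countsStartTime` is a hypothetical name, and the one-hour margin that keeps `start_time` strictly inside the 7-day window is an assumption, not a documented API requirement.

```typescript
// Hypothetical helper; the 7-day window is the counts/recent lookback named in the text.
const HOUR_MS = 3_600_000;
const WINDOW_MS = 7 * 24 * HOUR_MS;

const floorHour = (ms: number): number => Math.floor(ms / HOUR_MS) * HOUR_MS;
const isoNoMillis = (ms: number): string => new Date(ms).toISOString().replace(/\.\d{3}Z$/, "Z");

/**
 * start_time for the next counts call: the hour after the newest stored bucket, clamped to
 * stay inside the lookback window (one hour of margin, assumed). Null means nothing to backfill.
 */
export function countsStartTime(lastStoredHourMs: number | null, scheduledTimeMs: number): string | null {
  const earliest = floorHour(scheduledTimeMs - WINDOW_MS + HOUR_MS);
  const wanted = lastStoredHourMs === null ? earliest : lastStoredHourMs + HOUR_MS;
  const start = Math.max(wanted, earliest);
  // The bucket starting at floorHour(scheduledTime) is still open, so stop before it.
  return start >= floorHour(scheduledTimeMs) ? null : isoNoMillis(start);
}
```

After a missed 19:00 and 20:00 tick, a 21:00 tick whose newest stored bucket is 18:00 asks for counts from 19:00 onward; if nothing is stored at all, it falls back to the oldest hour the window allows.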

## 4 · Queue message contracts (in `packages/contracts`)

```ts
type ContentPullMsg   = { entity: string, mode: "relevancy" | "recency", runId: string }
type MetricsRefreshMsg = { entity: string, post_ids: string[], runId: string }
type EnrichMsg        = { entity: string, post_ids: string[], runId: string }   // ≤ 50 ids per msg
type PublishMsg       = { reason: "enrich-complete" | "safety-cron", runId: string }
```
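Queue consumers receive message bodies as untyped JSON, so a runtime check before use is worthwhile. A hand-rolled sketch of guards for two of the contracts (the guard names are hypothetical; `packages/contracts` could equally use zod, which the LLM client already relies on):

```typescript
// Hypothetical runtime guards; the shapes mirror the contract types above.
export type ContentPullMsg = { entity: string; mode: "relevancy" | "recency"; runId: string };
export type EnrichMsg = { entity: string; post_ids: string[]; runId: string };

/** Queue bodies arrive as `unknown`; narrow them before the consumer acts on any field. */
export function isContentPullMsg(body: unknown): body is ContentPullMsg {
  if (typeof body !== "object" || body === null) return false;
  const m = body as Record<string, unknown>;
  return (
    typeof m.entity === "string" &&
    (m.mode === "relevancy" || m.mode === "recency") &&
    typeof m.runId === "string"
  );
}

/** Also enforces the "≤ 50 ids per msg" limit from the contract comment. */
export function isEnrichMsg(body: unknown): body is EnrichMsg {
  if (typeof body !== "object" || body === null) return false;
  const m = body as Record<string, unknown>;
  return (
    typeof m.entity === "string" &&
    typeof m.runId === "string" &&
    Array.isArray(m.post_ids) &&
    m.post_ids.length <= 50 &&
    m.post_ids.every((id) => typeof id === "string")
  );
}
```

What a consumer does with a message that fails the guard (ack and log, or let it reach the DLQ) is a policy choice this section does not fix.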

- `ingest` consumer: budget-debit (refuse → ack + log, never retry) → collector.posts() → normalize → dedup vs `posts_seen` → raw write → `INSERT OR IGNORE posts_seen` → chunk new IDs into EnrichMsgs.
- `enricher` consumer: load NormalizedPosts from raw → spam heuristics → cache-check D1 → LLM classify misses → write `classifications` (cache) + flat `enriched_posts` rows (the analytical tier, 01 schema) in one `db.batch` → author classification for unseen authors → send PublishMsg.
- `publisher`: query window rows from `enriched_posts` + `hourly_counts` (indexed SQL, never re-read raw/JSONL) → run all rollups/scores (pure functions taking rows) → zod-validate → leak-check → atomic publish (§7) → events diff → append publish-log. Monthly prune in the same worker: `DELETE FROM enriched_posts WHERE created_at < date('now','-90 days')` (raw keeps everything; D1 is a rebuildable view).
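The tail of the `ingest` pipeline (dedup, then chunk into `EnrichMsg`s of at most 50 IDs) reduces to a pure function. A sketch, with `toEnrichMsgs` as a hypothetical name; the `seen` set stands in for the `posts_seen` lookup.

```typescript
// Hypothetical helper for the tail of the ingest consumer.
type EnrichMsg = { entity: string; post_ids: string[]; runId: string };

const MAX_IDS_PER_ENRICH_MSG = 50; // the "≤ 50 ids per msg" limit from the contracts

/** Keep IDs not yet in posts_seen (and not repeated within this pull), then split into EnrichMsgs. */
export function toEnrichMsgs(
  entity: string,
  runId: string,
  fetchedIds: string[],
  seen: ReadonlySet<string>,
): EnrichMsg[] {
  const fresh = Array.from(new Set(fetchedIds)).filter((id) => !seen.has(id));
  const msgs: EnrichMsg[] = [];
  for (let i = 0; i < fresh.length; i += MAX_IDS_PER_ENRICH_MSG) {
    msgs.push({ entity, post_ids: fresh.slice(i, i + MAX_IDS_PER_ENRICH_MSG), runId });
  }
  return msgs;
}
```

On redelivery every fetched ID is already in `posts_seen`, so the function returns an empty array and nothing is re-enqueued.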

## 5 · D1 patterns

- Always `db.batch([...])` (atomic) for multi-statement writes.
- Hot tables get covering indexes: `posts_seen(entity, first_seen)`, `classifications(post_id)`.
- Keep rolling hourly counts in D1 (`hourly_counts(entity, hour, mentions)` PK(entity,hour), pruned > 35 days) so the scheduler and sparklines never re-read raw.
- D1 is the only mutable state. If D1 is lost: re-create schema, replay raw (09 §4); nothing published depends on D1 history beyond convenience.
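A sketch of the `hourly_counts` write as one atomic batch (upsert each bucket, then prune past 35 days). The `Db`/`Stmt` interfaces are a hypothetical minimal slice of D1's `prepare`/`bind`/`batch`, declared locally so the function can be unit-tested with a fake; storing `hour` as an ISO string is an assumption.

```typescript
// Hypothetical minimal slice of the D1 API (prepare, bind, batch), declared locally so this
// compiles and tests without Workers types.
interface Stmt {
  bind(...values: unknown[]): Stmt;
}
interface Db {
  prepare(sql: string): Stmt;
  batch(statements: Stmt[]): Promise<unknown[]>;
}

export type HourCount = { hour: string; mentions: number }; // hour as "YYYY-MM-DDTHH:00:00Z" (assumed)

const UPSERT_SQL =
  "INSERT INTO hourly_counts (entity, hour, mentions) VALUES (?1, ?2, ?3) " +
  "ON CONFLICT (entity, hour) DO UPDATE SET mentions = excluded.mentions";
const PRUNE_SQL = "DELETE FROM hourly_counts WHERE entity = ?1 AND hour < ?2";

/** Statements for one atomic db.batch: upsert every bucket, then drop this entity's expired ones. */
export function hourlyCountsBatch(db: Db, entity: string, rows: HourCount[], pruneBefore: string): Stmt[] {
  const upsert = db.prepare(UPSERT_SQL);
  const statements = rows.map((r) => upsert.bind(entity, r.hour, r.mentions));
  statements.push(db.prepare(PRUNE_SQL).bind(entity, pruneBefore));
  return statements;
}
```

In a worker this would be called as `await env.DB.batch(hourlyCountsBatch(env.DB, slug, rows, cutoffIso))`, where the binding name `DB` is an assumption.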

## 6 · Budget guard (exact SQL)

```sql
-- Both statements go in one db.batch (atomic). D1 binds ordered parameters:
-- ?1 = today, ?2 = entity, ?3 = posts requested, ?4 = daily cap.
INSERT OR IGNORE INTO budget_ledger (day, entity, posts_read) VALUES (?1, ?2, 0);
-- One conditional statement, so there is no read-modify-write race:
UPDATE budget_ledger SET posts_read = posts_read + ?3
 WHERE day = ?1 AND entity = ?2 AND posts_read + ?3 <= ?4;
-- UPDATE changes == 0 → the cap would be exceeded → REFUSE the pull
```

The `INSERT OR IGNORE` guarantees the row exists, so a zero-change `UPDATE` can only mean the cap was hit. Global monthly check: before each pull, run `SELECT SUM(posts_read) FROM budget_ledger WHERE day >= month_start`; at ≥ 90% of `MONTHLY_POST_BUDGET`, clamp every entity to baseline cadence and alert Slack (at most once per day, via `events_emitted` key `global:budget90:<day>`).
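For unit-testing callers without D1, the ledger semantics can be modeled in memory. The sketch assumes each pull message carries its own runId (so it can double as the `debits` key) and that a redelivered, already-debited runId should proceed without being charged again; `BudgetLedger` and `shouldClampToBaseline` are hypothetical names.

```typescript
// Hypothetical in-memory model of budget_ledger + debits, for unit-testing callers without D1.
export class BudgetLedger {
  private readonly used = new Map<string, number>(); // "day|entity" → posts_read
  private readonly debited = new Set<string>(); // debits(run_id PRIMARY KEY)

  /**
   * Same outcome as the batch above: debit only if it fits under the daily cap.
   * A runId already debited (queue redelivery) is not charged again and is treated as paid.
   */
  tryDebit(runId: string, day: string, entity: string, req: number, cap: number): boolean {
    if (this.debited.has(runId)) return true;
    const key = `${day}|${entity}`;
    const current = this.used.get(key) ?? 0; // INSERT OR IGNORE seeds the row at 0
    if (current + req > cap) return false; // conditional UPDATE changes 0 rows → refuse
    this.used.set(key, current + req);
    this.debited.add(runId);
    return true;
  }

  /** SUM(posts_read) WHERE day >= monthStart; "YYYY-MM-DD" strings sort chronologically. */
  monthTotal(monthStart: string): number {
    let total = 0;
    this.used.forEach((n, key) => {
      if (key.split("|")[0] >= monthStart) total += n;
    });
    return total;
  }
}

/** ≥ 90% of MONTHLY_POST_BUDGET → clamp to baseline; integer math avoids float rounding. */
export function shouldClampToBaseline(monthUsed: number, monthlyBudget: number): boolean {
  return monthUsed * 10 >= monthlyBudget * 9;
}
```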

## 7 · Atomic publish: manifest pointer (supersedes the staging-copy sketch in 05 §5)

1. Write all artifacts to immutable keys: `public/runs/<runId>/index.json`, `…/models/<slug>/summary.json`, etc. (cache-control: `public, max-age=31536000, immutable`)
2. Last, overwrite tiny `public/manifest.json` → `{ schema_version, runId, updated_at, paths: {...} }` (cache-control: `no-store`)
3. Site always fetches manifest first, then the immutable files it points to. Readers can never see a torn run; rollback = rewrite manifest to the previous runId.
4. Janitor (inside publisher): delete `public/runs/*` older than 48h except the live and previous run.
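The ordering and the janitor's selection rule are both pure enough to sketch and test. Below, `publishPlan` and `runsToDelete` are hypothetical names, and `writtenAtMs` assumes the run's write time is available (for example from the object listing). The caller must await every immutable write before writing the manifest.

```typescript
// Hypothetical shapes and names; only the key layout and cache-control values come from the steps above.
export type Manifest = { schema_version: number; runId: string; updated_at: string; paths: Record<string, string> };
export type Write = { key: string; body: string; cacheControl: string };
export type RunDir = { runId: string; writtenAtMs: number };

const IMMUTABLE = "public, max-age=31536000, immutable";
const RETAIN_MS = 48 * 3_600_000;

/** Steps 1-2: every immutable run file first, the tiny manifest pointer strictly last. */
export function publishPlan(runId: string, files: Record<string, string>, manifest: Manifest): Write[] {
  const writes: Write[] = Object.entries(files).map(([path, body]) => ({
    key: `public/runs/${runId}/${path}`,
    body,
    cacheControl: IMMUTABLE,
  }));
  writes.push({ key: "public/manifest.json", body: JSON.stringify(manifest), cacheControl: "no-store" });
  return writes;
}

/** Step 4: runs older than 48h, never the live run or the previous one that rollback needs. */
export function runsToDelete(runs: RunDir[], liveRunId: string, previousRunId: string | null, nowMs: number): string[] {
  return runs
    .filter((r) => r.runId !== liveRunId && r.runId !== previousRunId)
    .filter((r) => nowMs - r.writtenAtMs > RETAIN_MS)
    .map((r) => r.runId);
}
```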

## 8 · LLM client (`packages/enrich/llm.ts`)

- Env-driven: `LLM_PROVIDER=google|anthropic`, `LLM_MODEL` (default `gemini-2.5-flash`; alternate `claude-haiku-4-5`). One function: `complete(systemPrompt, userJson, zodSchema)`.
- Force JSON output (provider JSON mode); strip markdown fences before parse; zod-parse; one repair retry with the validation error appended; second failure → DLQ the batch.
- Truncate post text to 500 chars for classification (accuracy degrades on long batches, and nothing past 500 chars changes a label).
- Concurrency 4; on HTTP 429, wait for the provider's retry-after hint before retrying; keep a per-day token ledger in D1 (observability only).
- Test double: `MockLLM` replays recorded fixture responses keyed by content hash; CI never calls a real LLM (README §constraint).
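Three small helpers for the rules above, sketched under assumptions: fence stripping handles a single wrapping fence only; truncation counts Unicode code points so emoji are not split; and `retryAfterMs` reads a standard `Retry-After` header (delta-seconds or HTTP-date), which not every provider uses for 429s. All names are hypothetical.

```typescript
// Hypothetical helpers for packages/enrich; names and defaults are illustrative.

/** Strip one wrapping code fence (three backticks, optional language tag); otherwise return trimmed text. */
export function stripFences(text: string): string {
  const match = text.trim().match(/^`{3}[a-zA-Z]*[ \t]*\n([\s\S]*?)\n?`{3}$/);
  return match ? match[1].trim() : text.trim();
}

/** Cap post text at `max` code points (not UTF-16 units) so surrogate pairs are never split. */
export function truncateForClassification(text: string, max = 500): string {
  const codePoints = Array.from(text);
  return codePoints.length <= max ? text : codePoints.slice(0, max).join("");
}

/** Retry-After is delta-seconds or an HTTP-date; fall back to a default when absent or unparseable. */
export function retryAfterMs(header: string | null, nowMs: number, fallbackMs = 1_000): number {
  if (header === null) return fallbackMs;
  const trimmed = header.trim();
  if (/^\d+$/.test(trimmed)) return Number(trimmed) * 1_000;
  const dateMs = Date.parse(trimmed);
  return Number.isNaN(dateMs) ? fallbackMs : Math.max(0, dateMs - nowMs);
}
```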

## 9 · Local dev & deploy

- `pnpm dev:pipeline` → `wrangler dev` with Miniflare-simulated D1/queues/R2 (`--persist-to .wrangler/state`), `FixtureCollector` + `MockLLM` wired via `LOCAL_FIXTURES=1`. One command runs a full tick → publish against fixtures.
- `pnpm dev:site` → Astro dev server reading `http://localhost:8787/public/manifest.json`, served by a tiny static server over the `.wrangler` state or `fixtures/public/`.
- Deploy: `pnpm deploy:workers` (wrangler deploy each worker, ordered scheduler-last) and `pnpm deploy:site` (Pages). CI deploys on main after the full gate; secrets only ever set via `wrangler secret put` (09 §1).
