feat(v0.18.2.fork.1): migration v25 — ingest_log.source_id

Closes the upstream Step 5 deferral noted in schema-embedded.ts:202-204
("ingest_log.source_id is NOT added yet — lands in v17 alongside the sync
rewrite (Step 5)"). Upstream's v17 only addressed pages.source_id; the
ingest_log half was deferred without ever shipping.

- migrate.ts: v25 ALTER TABLE adds source_id NOT NULL DEFAULT 'default'
  REFERENCES sources(id) ON DELETE CASCADE + idx_ingest_log_source_id
- schema-embedded.ts: fresh-install schema mirrors the migration outcome
- types.ts: IngestLogInput.source_id?: string
- {postgres,pglite}-engine.ts logIngest: thread entry.source_id when set,
  fall back to schema DEFAULT 'default' otherwise
- import.ts + sync.ts: pass opts.sourceId to logIngest call sites

Tests:
- test/ingest-log-source-id.test.ts (new): col schema, FK enforcement,
  logIngest write-through both source-explicit and default-fallback paths

Strategy: fork-local commit, NOT sent upstream — kept separate from
Phase 1 to make it easy to drop if upstream eventually adds their own
ingest_log.source_id (would just be a rebase delete).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-06 22:14:22 +08:00
parent e5d94f63d9
commit 18f2dcdbe5
8 changed files with 162 additions and 13 deletions

View File

@@ -213,12 +213,13 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com
console.log(` ${chunksCreated} chunks created`);
}
// Log the ingest
// Log the ingest (v0.18.2.fork.1: scope to opts.sourceId when provided)
await engine.logIngest({
source_type: 'directory',
source_ref: dir,
pages_updated: importedSlugs,
summary: `Imported ${imported} pages, ${skipped} skipped, ${chunksCreated} chunks`,
source_id: opts.sourceId,
});
// Import → sync continuity: write sync checkpoint if this is a git repo.

View File

@@ -387,6 +387,7 @@ export async function performSync(engine: BrainEngine, opts: SyncOpts): Promise<
source_ref: `${repoPath} @ ${headCommit.slice(0, 8)}`,
pages_updated: pagesAffected,
summary: `Sync: +${filtered.added.length} ~${filtered.modified.length} -${filtered.deleted.length} R${filtered.renamed.length}, ${chunksCreated} chunks, ${elapsed}ms`,
source_id: opts.sourceId,
});
// Auto-extract links + timeline (always, extraction is cheap CPU)

View File

@@ -812,6 +812,33 @@ export const MIGRATIONS: Migration[] = [
END $$;
`,
},
{
// v0.18.2.fork.1 — Step 5 follow-up. The v0.18.0 schema-embedded.ts
// comment at the ingest_log block reads:
//
// "ingest_log.source_id is NOT added yet — lands in v17 alongside
// the sync rewrite (Step 5)"
//
// Upstream's v17 (= MIGRATIONS version 17 here, name=
// pages_source_id_composite_unique) only addressed pages.source_id.
// The ingest_log half was deferred without ever shipping. This fork
// closes the gap so per-source ingest history isn't permanently
// blind: dashboards / dream-cron / audits that JOIN ingest_log to
// sources can now scope cleanly without inferring source from
// pages_updated payload contents.
//
// Backward-compat: column is NOT NULL DEFAULT 'default' so legacy
// rows that never had a source_id get the same value the schema
// assumes for pages.source_id (matching v0.18.0's seed).
version: 25,
name: 'ingest_log_source_id',
sql: `
ALTER TABLE ingest_log
ADD COLUMN IF NOT EXISTS source_id TEXT NOT NULL DEFAULT 'default'
REFERENCES sources(id) ON DELETE CASCADE;
CREATE INDEX IF NOT EXISTS idx_ingest_log_source_id ON ingest_log(source_id);
`,
},
];
export const LATEST_VERSION = MIGRATIONS.length > 0

View File

@@ -1055,11 +1055,20 @@ export class PGLiteEngine implements BrainEngine {
// Ingest log
async logIngest(entry: IngestLogInput): Promise<void> {
await this.db.query(
`INSERT INTO ingest_log (source_type, source_ref, pages_updated, summary)
VALUES ($1, $2, $3::jsonb, $4)`,
[entry.source_type, entry.source_ref, JSON.stringify(entry.pages_updated), entry.summary]
);
// v0.18.2.fork.1: source_id explicit when provided, schema DEFAULT 'default' otherwise.
if (entry.source_id) {
await this.db.query(
`INSERT INTO ingest_log (source_id, source_type, source_ref, pages_updated, summary)
VALUES ($1, $2, $3, $4::jsonb, $5)`,
[entry.source_id, entry.source_type, entry.source_ref, JSON.stringify(entry.pages_updated), entry.summary]
);
} else {
await this.db.query(
`INSERT INTO ingest_log (source_type, source_ref, pages_updated, summary)
VALUES ($1, $2, $3::jsonb, $4)`,
[entry.source_type, entry.source_ref, JSON.stringify(entry.pages_updated), entry.summary]
);
}
}
async getIngestLog(opts?: { limit?: number }): Promise<IngestLogEntry[]> {

View File

@@ -1094,10 +1094,18 @@ export class PostgresEngine implements BrainEngine {
// Ingest log
async logIngest(entry: IngestLogInput): Promise<void> {
const sql = this.sql;
await sql`
INSERT INTO ingest_log (source_type, source_ref, pages_updated, summary)
VALUES (${entry.source_type}, ${entry.source_ref}, ${sql.json(entry.pages_updated)}, ${entry.summary})
`;
// v0.18.2.fork.1: source_id explicit when provided, schema DEFAULT 'default' otherwise.
if (entry.source_id) {
await sql`
INSERT INTO ingest_log (source_id, source_type, source_ref, pages_updated, summary)
VALUES (${entry.source_id}, ${entry.source_type}, ${entry.source_ref}, ${sql.json(entry.pages_updated)}, ${entry.summary})
`;
} else {
await sql`
INSERT INTO ingest_log (source_type, source_ref, pages_updated, summary)
VALUES (${entry.source_type}, ${entry.source_ref}, ${sql.json(entry.pages_updated)}, ${entry.summary})
`;
}
}
async getIngestLog(opts?: { limit?: number }): Promise<IngestLogEntry[]> {

View File

@@ -199,17 +199,20 @@ CREATE INDEX IF NOT EXISTS idx_versions_page ON page_versions(page_id);
-- ============================================================
-- ingest_log
-- ============================================================
-- NOTE (v0.18.0 Step 1): ingest_log.source_id is NOT added yet — lands
-- in v17 alongside the sync rewrite (Step 5), which starts writing
-- source-scoped entries.
-- v0.18.2.fork.1 (migration v25): source_id added per fork's Step 5
-- closure. Fresh installs get the column inline; existing brains pick
-- it up via the v25 ALTER TABLE migration (NOT NULL DEFAULT 'default').
CREATE TABLE IF NOT EXISTS ingest_log (
id SERIAL PRIMARY KEY,
source_id TEXT NOT NULL DEFAULT 'default'
REFERENCES sources(id) ON DELETE CASCADE,
source_type TEXT NOT NULL,
source_ref TEXT NOT NULL,
pages_updated JSONB NOT NULL DEFAULT '[]',
summary TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_ingest_log_source_id ON ingest_log(source_id);
-- ============================================================
-- config: brain-level settings

View File

@@ -246,6 +246,8 @@ export interface IngestLogInput {
source_ref: string;
pages_updated: string[];
summary: string;
/** v0.18.2.fork.1 — per-source ingest history. Falls back to schema DEFAULT 'default' when omitted. */
source_id?: string;
}
// Config

View File

@@ -0,0 +1,98 @@
/**
* v0.18.2.fork.1 — migration v25 ingest_log.source_id.
*
* Closes the upstream Step 5 deferral noted at schema-embedded.ts:202-204:
*
* "ingest_log.source_id is NOT added yet — lands in v17 alongside the
* sync rewrite (Step 5)"
*
* Verifies:
* - migration v25 adds the column with NOT NULL DEFAULT 'default'
* - existing rows backfill to 'default' (the schema seed exists)
* - new rows can be written with explicit source_id
* - logIngest signature accepts entry.source_id and threads it through
* - omitting source_id falls back to schema DEFAULT 'default'
*/
import { describe, test, expect, beforeAll, afterAll } from 'bun:test';
import { PGLiteEngine } from '../src/core/pglite-engine.ts';
let engine: PGLiteEngine;
beforeAll(async () => {
engine = new PGLiteEngine();
await engine.connect({ type: 'pglite' } as never);
await engine.initSchema();
await engine.executeRaw(
`INSERT INTO sources (id, name, config) VALUES
('memory-dashboard', 'memory-dashboard', '{"federated": true}'::jsonb)
ON CONFLICT (id) DO NOTHING`,
);
});
afterAll(async () => {
await engine.disconnect();
});
describe('v25 — ingest_log.source_id schema', () => {
test('source_id column exists with NOT NULL DEFAULT default', async () => {
const rows = await engine.executeRaw<{ column_default: string | null; is_nullable: string }>(
`SELECT column_default, is_nullable FROM information_schema.columns
WHERE table_name = 'ingest_log' AND column_name = 'source_id'`,
);
expect(rows.length).toBe(1);
expect(rows[0].is_nullable).toBe('NO');
expect(rows[0].column_default).toContain('default');
});
test('idx_ingest_log_source_id index exists', async () => {
const rows = await engine.executeRaw<{ indexname: string }>(
`SELECT indexname FROM pg_indexes WHERE indexname = 'idx_ingest_log_source_id'`,
);
expect(rows.length).toBe(1);
});
test('FK to sources(id) is enforced (insert with bogus source rejected)', async () => {
let threw = false;
try {
await engine.executeRaw(
`INSERT INTO ingest_log (source_id, source_type, source_ref, summary)
VALUES ('does-not-exist', 'directory', '/tmp', '')`,
);
} catch {
threw = true;
}
expect(threw).toBe(true);
});
});
describe('v25 — logIngest write-through', () => {
test('logIngest with source_id writes to that source', async () => {
await engine.logIngest({
source_type: 'directory',
source_ref: '/tmp/md',
pages_updated: ['a', 'b'],
summary: 'test ingest md',
source_id: 'memory-dashboard',
});
const rows = await engine.executeRaw<{ source_id: string }>(
`SELECT source_id FROM ingest_log WHERE source_ref = '/tmp/md'`,
);
expect(rows.length).toBe(1);
expect(rows[0].source_id).toBe('memory-dashboard');
});
test('logIngest without source_id falls back to schema DEFAULT default', async () => {
await engine.logIngest({
source_type: 'directory',
source_ref: '/tmp/legacy',
pages_updated: [],
summary: 'legacy single-source caller',
});
const rows = await engine.executeRaw<{ source_id: string }>(
`SELECT source_id FROM ingest_log WHERE source_ref = '/tmp/legacy'`,
);
expect(rows.length).toBe(1);
expect(rows[0].source_id).toBe('default');
});
});