diff --git a/packages/crawl-common/package.json b/packages/crawl-common/package.json index a8a4d8a..1c7a60e 100644 --- a/packages/crawl-common/package.json +++ b/packages/crawl-common/package.json @@ -16,6 +16,7 @@ "lint": "eslint src/" }, "devDependencies": { + "@types/node": "^24.12.0", "eslint": "^10.0.0", "tsup": "^8.0.0", "typescript": "^5.8.0", @@ -23,6 +24,7 @@ }, "dependencies": { "is-network-error": "^1.3.1", + "jose": "^6.2.2", "p-retry": "^8.0.0" } } diff --git a/packages/crawl-common/src/corpus-api/client.integration.ts b/packages/crawl-common/src/corpus-api/client.integration.ts new file mode 100644 index 0000000..1240bcb --- /dev/null +++ b/packages/crawl-common/src/corpus-api/client.integration.ts @@ -0,0 +1,179 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { + initCorpusApiClient, + updateApprovedCorpusItem, + CorpusApiError, + RETRY_MAX_TIMEOUT_MS, + TOKEN_REFRESH_WINDOW_MS, +} from './client.js'; +import { + CLIENT_OPTS, + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY, + mockResponse, +} from './test-helpers.js'; + +/** Extract the Bearer token from a fetch mock call. */ +function extractToken( + fetchMock: ReturnType, + callIndex: number, +): string { + const headers = fetchMock.mock.calls[callIndex][1].headers as Record< + string, + string + >; + return headers.authorization.replace('Bearer ', ''); +} + +/** + * Integration tests for retry and JWT refresh behavior. + * Uses fake timers to avoid real retry delays and to + * control token expiry. + */ +describe('Corpus API integration', () => { + const fetchMock = vi.fn(); + + afterEach(() => { + fetchMock.mockReset(); + vi.useRealTimers(); + vi.unstubAllGlobals(); + }); + + describe('retry', () => { + beforeEach(async () => { + vi.stubGlobal('fetch', fetchMock); + await initCorpusApiClient(CLIENT_OPTS); + // Prime the token cache with real timers so jose's + // async Web Crypto signing completes. Retry tests + // then use the cached token under fake timers. + fetchMock.mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + await updateApprovedCorpusItem(UPDATE_APPROVED_CORPUS_ITEM_INPUT); + fetchMock.mockReset(); + vi.useFakeTimers(); + }); + + it('retries on 5xx and succeeds', async () => { + fetchMock + .mockResolvedValueOnce(mockResponse({ error: 'server' }, 503)) + .mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + const promise = updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ); + await vi.advanceTimersByTimeAsync(RETRY_MAX_TIMEOUT_MS); + const result = await promise; + + expect(result.externalId).toBe( + UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY.data.updateApprovedCorpusItem + .externalId, + ); + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it('retries on network error and succeeds', async () => { + fetchMock + .mockRejectedValueOnce(new TypeError('fetch failed')) + .mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + const promise = updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ); + await vi.advanceTimersByTimeAsync(RETRY_MAX_TIMEOUT_MS); + const result = await promise; + + expect(result.externalId).toBe( + UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY.data.updateApprovedCorpusItem + .externalId, + ); + expect(fetchMock).toHaveBeenCalledTimes(2); + }); + + it('stops after max retries', async () => { + fetchMock.mockResolvedValue(mockResponse({ error: 'down' }, 500)); + + const promise = updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ).catch((e) => e); + await vi.advanceTimersByTimeAsync(RETRY_MAX_TIMEOUT_MS * 5); + const err = await promise; + + expect(err).toBeInstanceOf(CorpusApiError); + // 1 initial + 4 retries = 5 total. + expect(fetchMock).toHaveBeenCalledTimes(5); + }); + }); + + describe('token refresh', () => { + beforeEach(async () => { + vi.stubGlobal('fetch', fetchMock); + await initCorpusApiClient(CLIENT_OPTS); + vi.useFakeTimers(); + }); + + it('issues a new token after the refresh window', async () => { + // First call primes the token cache. + fetchMock.mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + const promise1 = updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ); + await vi.advanceTimersByTimeAsync(0); + await promise1; + + const token1 = extractToken(fetchMock, 0); + + // Advance past the refresh window. + await vi.advanceTimersByTimeAsync(TOKEN_REFRESH_WINDOW_MS + 1_000); + + // Second call should issue a new token. + fetchMock.mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + const promise2 = updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ); + await vi.advanceTimersByTimeAsync(0); + await promise2; + + const token2 = extractToken(fetchMock, 1); + expect(token2).not.toBe(token1); + }); + + it('reuses token within the refresh window', async () => { + fetchMock + .mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ) + .mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + // First call primes the cache. + const promise1 = updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ); + await vi.advanceTimersByTimeAsync(0); + await promise1; + + // Advance to the halfway point of the refresh window. + await vi.advanceTimersByTimeAsync(TOKEN_REFRESH_WINDOW_MS / 2); + + const promise2 = updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ); + await vi.advanceTimersByTimeAsync(0); + await promise2; + + const token1 = extractToken(fetchMock, 0); + const token2 = extractToken(fetchMock, 1); + expect(token1).toBe(token2); + }); + }); +}); diff --git a/packages/crawl-common/src/corpus-api/client.spec.ts b/packages/crawl-common/src/corpus-api/client.spec.ts new file mode 100644 index 0000000..6dce8f6 --- /dev/null +++ b/packages/crawl-common/src/corpus-api/client.spec.ts @@ -0,0 +1,167 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + initCorpusApiClient, + updateApprovedCorpusItem, + CorpusApiError, +} from './client.js'; +import { + TEST_JWK, + CLIENT_OPTS, + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY, + mockResponse, +} from './test-helpers.js'; + +let fetchMock: ReturnType; + +describe('corpus-api client', () => { + beforeEach(async () => { + fetchMock = vi.fn(); + vi.stubGlobal('fetch', fetchMock); + await initCorpusApiClient(CLIENT_OPTS); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + vi.restoreAllMocks(); + }); + + describe('request', () => { + it('sends a GraphQL mutation with JWT auth', async () => { + fetchMock.mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + await updateApprovedCorpusItem(UPDATE_APPROVED_CORPUS_ITEM_INPUT); + + expect(fetchMock).toHaveBeenCalledOnce(); + const [url, init] = fetchMock.mock.calls[0] as [string, RequestInit]; + expect(url).toBe(CLIENT_OPTS.endpoint); + expect(init.method).toBe('POST'); + + const headers = init.headers as Record; + expect(headers['content-type']).toBe('application/json'); + expect(headers.authorization).toMatch(/^Bearer eyJ/); + expect(headers['apollographql-client-name']).toBe('hnt-content'); + }); + + it('sends the mutation variables', async () => { + fetchMock.mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + await updateApprovedCorpusItem(UPDATE_APPROVED_CORPUS_ITEM_INPUT); + + const body = JSON.parse(fetchMock.mock.calls[0][1].body as string); + expect(body.variables.data.externalId).toBe( + UPDATE_APPROVED_CORPUS_ITEM_INPUT.externalId, + ); + expect(body.variables.data.title).toBe( + UPDATE_APPROVED_CORPUS_ITEM_INPUT.title, + ); + expect(body.query).toContain('updateApprovedCorpusItem'); + }); + }); + + describe('response', () => { + it('returns the mutation result on success', async () => { + fetchMock.mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + const result = await updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ); + + const expected = + UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY.data.updateApprovedCorpusItem; + expect(result.externalId).toBe(expected.externalId); + expect(result.title).toBe(expected.title); + }); + + it.each([ + { + scenario: 'GraphQL errors', + body: { errors: [{ message: 'Item not found' }] }, + status: 200, + }, + { scenario: 'null data', body: { data: null }, status: 200 }, + { scenario: '4xx', body: { error: 'bad request' }, status: 400 }, + ])( + 'throws CorpusApiError on $scenario without retrying', + async ({ body, status }) => { + fetchMock.mockResolvedValueOnce(mockResponse(body, status)); + + await expect( + updateApprovedCorpusItem(UPDATE_APPROVED_CORPUS_ITEM_INPUT), + ).rejects.toThrow(CorpusApiError); + expect(fetchMock).toHaveBeenCalledOnce(); + }, + ); + }); + + describe('jwt', () => { + it('caches the JWT token across calls', async () => { + fetchMock + .mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ) + .mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + await updateApprovedCorpusItem(UPDATE_APPROVED_CORPUS_ITEM_INPUT); + await updateApprovedCorpusItem(UPDATE_APPROVED_CORPUS_ITEM_INPUT); + + // Both calls should use the same token. + const token1 = ( + fetchMock.mock.calls[0][1].headers as Record + ).authorization; + const token2 = ( + fetchMock.mock.calls[1][1].headers as Record + ).authorization; + expect(token1).toBe(token2); + }); + + it('handles the {"keys": [...]} wrapper format', async () => { + const wrapped = JSON.stringify({ + keys: [JSON.parse(TEST_JWK)], + }); + await initCorpusApiClient({ + ...CLIENT_OPTS, + jwkJson: wrapped, + }); + fetchMock.mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + const result = await updateApprovedCorpusItem( + UPDATE_APPROVED_CORPUS_ITEM_INPUT, + ); + + expect(result.externalId).toBe( + UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY.data.updateApprovedCorpusItem + .externalId, + ); + }); + + it('includes kid in the JWT header', async () => { + fetchMock.mockResolvedValueOnce( + mockResponse(UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY), + ); + + await updateApprovedCorpusItem(UPDATE_APPROVED_CORPUS_ITEM_INPUT); + + const authHeader = ( + fetchMock.mock.calls[0][1].headers as Record + ).authorization; + const token = authHeader.replace('Bearer ', ''); + // Decode the JWT header (first segment, base64url). + const header = JSON.parse( + Buffer.from(token.split('.')[0], 'base64url').toString(), + ); + expect(header.alg).toBe('RS256'); + expect(header.kid).toBe('test-kid'); + }); + }); +}); diff --git a/packages/crawl-common/src/corpus-api/client.ts b/packages/crawl-common/src/corpus-api/client.ts new file mode 100644 index 0000000..cc0a821 --- /dev/null +++ b/packages/crawl-common/src/corpus-api/client.ts @@ -0,0 +1,231 @@ +import { importJWK, SignJWT } from 'jose'; +import type { JWK } from 'jose'; +import pRetry from 'p-retry'; +import type { + CorpusApiClientOptions, + UpdateApprovedCorpusItemInput, + UpdateApprovedCorpusItemResponse, +} from './types.js'; + +// JWT configuration matching content-ml-services +// admin_backend.py. +const JWT_TTL_SECONDS = 300; +const JWT_REFRESH_BUFFER = 0.95; +const JWT_USERNAME = 'ML'; +const JWT_GROUPS = ['mozilliansorg_pocket_scheduled_surface_curator_full']; + +/** + * How long a cached JWT stays fresh before the next call + * signs a new one. + */ +export const TOKEN_REFRESH_WINDOW_MS = + JWT_TTL_SECONDS * JWT_REFRESH_BUFFER * 1000; + +// 4 retries (5 attempts total) with exponential backoff +// designed to complete well within the 600s Pub/Sub ack +// deadline: ~2s + ~4s + ~8s + ~16s = ~30s worst case. +const MAX_RETRIES = 4; +const RETRY_MIN_TIMEOUT_MS = 2_000; + +/** Upper bound on retry delay for Corpus API calls. */ +export const RETRY_MAX_TIMEOUT_MS = 16_000; + +/** Error thrown when a Corpus Admin API request fails. */ +export class CorpusApiError extends Error { + constructor( + message: string, + readonly statusCode?: number, + ) { + super(message); + this.name = 'CorpusApiError'; + } +} + +// Module-level state. +let endpoint: string | undefined; +let clientName = 'hnt-content'; +let clientVersion = '1.0'; +let privateKey: CryptoKey | Uint8Array; +let kid: string; + +// Cached JWT token. +let cachedToken: string | undefined; +let tokenExpiresAt = 0; + +// JWT claim config. +let issuer: string; +let audience: string; + +/** + * Initialize the Corpus Admin API client. Must be called + * once before updateApprovedCorpusItem. + */ +export async function initCorpusApiClient( + opts: CorpusApiClientOptions, +): Promise { + endpoint = opts.endpoint; + issuer = opts.issuer; + audience = opts.audience; + clientName = opts.clientName ?? clientName; + clientVersion = opts.clientVersion ?? clientVersion; + + const jwk = parseJwk(opts.jwkJson); + kid = jwk.kid!; + privateKey = await importJWK(jwk, 'RS256'); + + // Reset cached token when re-initialized. + cachedToken = undefined; + tokenExpiresAt = 0; +} + +/** + * Parse a JWK JSON string, handling the {"keys": [...]} + * wrapper format used by some secret stores. + */ +function parseJwk(jwkJson: string): JWK { + const parsed = JSON.parse(jwkJson) as JWK | { keys: JWK[] }; + const jwk = + 'keys' in parsed && Array.isArray(parsed.keys) + ? parsed.keys[0] + : (parsed as JWK); + if (!jwk) { + throw new Error('JWK keys array is empty'); + } + if (!jwk.kid) { + throw new Error('JWK must include a kid field'); + } + return jwk; +} + +/** Get a valid JWT, using the cache when possible. */ +async function getToken(): Promise { + const now = Date.now(); + if (cachedToken && now < tokenExpiresAt) { + return cachedToken; + } + + cachedToken = await new SignJWT({ + iss: issuer, + aud: audience, + name: 'Article Crawler', + identities: [{ userId: JWT_USERNAME }], + // Stringified per Cognito convention expected by + // admin-api JWT validation. + 'custom:groups': JSON.stringify(JWT_GROUPS), + }) + .setProtectedHeader({ alg: 'RS256', kid }) + .setIssuedAt() + .setExpirationTime(`${JWT_TTL_SECONDS}s`) + .sign(privateKey); + + tokenExpiresAt = now + JWT_TTL_SECONDS * JWT_REFRESH_BUFFER * 1_000; + return cachedToken; +} + +const UPDATE_MUTATION = ` + mutation UpdateApprovedCorpusItem( + $data: UpdateApprovedCorpusItemInput! + ) { + updateApprovedCorpusItem(data: $data) { + externalId + url + title + excerpt + } + } +`; + +/** + * Call the updateApprovedCorpusItem GraphQL mutation. + * Retries transient HTTP errors (5xx, network) with + * exponential backoff. + */ +export async function updateApprovedCorpusItem( + input: UpdateApprovedCorpusItemInput, +): Promise { + if (!endpoint) { + throw new Error( + 'Corpus API client not initialized. ' + + 'Call initCorpusApiClient() first.', + ); + } + + return pRetry( + async () => { + const token = await getToken(); + const response = await fetch(endpoint!, { + method: 'POST', + headers: { + 'content-type': 'application/json', + authorization: `Bearer ${token}`, + 'apollographql-client-name': clientName, + 'apollographql-client-version': clientVersion, + }, + body: JSON.stringify({ + query: UPDATE_MUTATION, + variables: { data: input }, + }), + signal: AbortSignal.timeout(30_000), + }); + + if (response.status >= 500) { + throw new CorpusApiError( + `Corpus API error: ${response.status} for ` + + `item ${input.externalId}`, + response.status, + ); + } + + if (response.status >= 400) { + let body: unknown; + try { + body = await response.json(); + } catch { + // Response may not be JSON. + } + throw new CorpusApiError( + `Corpus API client error: ${response.status} ` + + `for item ${input.externalId}: ` + + JSON.stringify(body), + response.status, + ); + } + + const payload = (await response.json()) as { + data?: { + updateApprovedCorpusItem: UpdateApprovedCorpusItemResponse; + }; + errors?: Array<{ message: string }>; + }; + + if (payload.errors?.length) { + throw new CorpusApiError( + `GraphQL errors for item ${input.externalId}: ` + + payload.errors.map((e) => e.message).join('; '), + ); + } + + if (!payload.data?.updateApprovedCorpusItem) { + throw new CorpusApiError( + `No data returned for item ${input.externalId}: ` + + JSON.stringify(payload), + ); + } + + return payload.data.updateApprovedCorpusItem; + }, + { + retries: MAX_RETRIES, + minTimeout: RETRY_MIN_TIMEOUT_MS, + maxTimeout: RETRY_MAX_TIMEOUT_MS, + factor: 2, + shouldRetry({ error }) { + // Only retry server errors and network failures. + if (error instanceof CorpusApiError) { + return error.statusCode != null && error.statusCode >= 500; + } + return true; // Network errors. + }, + }, + ); +} diff --git a/packages/crawl-common/src/corpus-api/index.ts b/packages/crawl-common/src/corpus-api/index.ts new file mode 100644 index 0000000..6647448 --- /dev/null +++ b/packages/crawl-common/src/corpus-api/index.ts @@ -0,0 +1,11 @@ +export { + initCorpusApiClient, + updateApprovedCorpusItem, + CorpusApiError, + RETRY_MAX_TIMEOUT_MS, +} from './client.js'; +export type { + CorpusApiClientOptions, + UpdateApprovedCorpusItemInput, + UpdateApprovedCorpusItemResponse, +} from './types.js'; diff --git a/packages/crawl-common/src/corpus-api/test-helpers.ts b/packages/crawl-common/src/corpus-api/test-helpers.ts new file mode 100644 index 0000000..22572ac --- /dev/null +++ b/packages/crawl-common/src/corpus-api/test-helpers.ts @@ -0,0 +1,58 @@ +/** + * Shared test fixtures for corpus-api unit and integration + * tests. Uses a generated 2048-bit RSA JWK (the real private + * key is a secret loaded at runtime). + */ +import type { UpdateApprovedCorpusItemInput } from './types.js'; + +export const TEST_JWK = JSON.stringify({ + kty: 'RSA', + kid: 'test-kid', + n: 'tlGRF0xXZdfLwe6wLslVERpJS5oH7ZL2UD33DDkx_S__GaHYx2VesvKn5qQ3j1SWgO3VBJV8i2YXpj6xPpZ8Kj34TbuAFO4klTZ1oy8FxyH-yUC56OJi2FlmqIz9zOWVMpkvwWweZcXms1QaPHxCjPix8LaAd1y0_urXnMvbZvgFtgqV3Gv1-rO2hM_VNIgCzZCFQ8Iz1viHAAgdIoOm6Bs7i6skzw0XTC_gv-ZDQmVSsKb6RNQL3Lyto2rbnOmMWt5zxndwZ-AP7UzZuJbR77OphFnUsN0hyL3-ShKMbo8pIQPrJs-B3GAplMCWkxpvyKBqDvYVXt9vrrou_YrITw', + e: 'AQAB', + d: 'WIEEE_FFQ_UbvormD_BAUUsXZZHiY1vCInXSJabmM2hHR-QfXbxB2lCdXQM-zV9cqD3L-Kuwh-MJe_RXCnD22XK3xNROetqX-68yMAM1pNNF4eB_3yN2pFvRz-SRmBOi96sRWa3om7MUKN2c1tvjWpenmZieiFMCsfTCsiTr3vGYrnydPGTB5AJlG_lAKLCX8ta9bDwQK5kJLcc8JyP0Tivwv6Dh0u_wCpc7Iiiq_SmQKLrAvFeJZaF2evh3OY_0zIYREGBKOwooTTnRy0QURTwaiWl7P4PwDDERd0og1Lcar2dj5JHGtcUw0qgbTGz4Du6jzy6-ieWRLoXqeumU0Q', + p: '8IML8lVMrnMDQqylf2fAO0iU6ge80zDvL8dIVWpTfJsBkvL6_eNZkeqC72f3xxH_xSQbdOGKYNDacCQcR2ChMQAn4LKsKE7s7NPmQnE6p36l-GsvwhUWQUXR7Z_A9cTF3k77qyDjK7E9EQ_u_jKsgv2YGl7CRTz_7VByQbf7lks', + q: 'wg8tFkRFCFcydpfqw7xDKdbo1tZMQA_XgkPuSoI5wu5rgMCWQw29s0rL4HM-cdeeyXqgJCRPwOybaWGCm59H1sWZew_rtIqI7M34JhmnxuiX444ySoqo7jCcW0WoJ5xpv-3XXRKC0Si1MC10m2oAvhMHhEiONnMtQ7tCuHZOY40', + dp: '6-nfIgkBemxeWlw2yc3fBUegqh6E3TM2qsry7LWqxqLU3GtyPu9uwG4jmOmGZcIF_D36oJ9KuMSkPzNseacS9ZmNhB4-OBuS0orXZXzjZ8AW1KFu6xT8C3KNBGSbRXeKDxGyUp2jtwvXNpFGgBj8llBhjhw8uuWmtAUgzc3F_hk', + dq: 'kQnyss-3oLI7PzPv_Pc6Y40CXX-xYbf1ZKEM-pc2QKEdrA9Evz0H6XcfxdOcek2jmgaSpjCVgyXUSgDdMx7q_HSXb8jIbBmWmRagPymxohK5YxQmNlxIQi4GzpjTQze-OfqzmhZ5u4XnVejDXFzvzSA_3_iygbO3wwW0qlWR5Qk', + qi: '1l7lV0YqJuRptrOKwbhna6O2OavR7xxmSI2HUzkM45tEjmaZ-w9VcQ_XkTNUZEbkk-qioNMmUVGYwW30sWkMiVI2RC_ybsaFmJ61VvfUp_0j7jg_6KsFJ6UXoDDOL1kB2bWoI0DyLe3tLcVxfjTehmNAVr51w9I9JzCT6Y4DS4o', +}); + +export const CLIENT_OPTS = { + endpoint: 'https://admin-api.test/', + jwkJson: TEST_JWK, + issuer: 'https://getpocket.com', + audience: 'https://admin-api.test/', +}; + +export const UPDATE_APPROVED_CORPUS_ITEM_INPUT: UpdateApprovedCorpusItemInput = + { + externalId: 'abc-123', + title: 'Test Title', + excerpt: 'Test Excerpt', + authors: [{ name: 'Jane Doe', sortOrder: 0 }], + status: 'CORPUS', + language: 'EN', + publisher: 'Test Publisher', + imageUrl: 'https://s3.amazonaws.com/image.jpg', + topic: 'TECHNOLOGY', + isTimeSensitive: false, + }; + +export const UPDATE_APPROVED_CORPUS_ITEM_SUCCESS_BODY = { + data: { + updateApprovedCorpusItem: { + externalId: 'abc-123', + url: 'https://example.com/article', + title: 'Test Title', + excerpt: 'Test Excerpt', + }, + }, +}; + +export function mockResponse(body: unknown, status = 200): Response { + return new Response(JSON.stringify(body), { + status, + headers: { 'content-type': 'application/json' }, + }); +} diff --git a/packages/crawl-common/src/corpus-api/types.ts b/packages/crawl-common/src/corpus-api/types.ts new file mode 100644 index 0000000..826ae9c --- /dev/null +++ b/packages/crawl-common/src/corpus-api/types.ts @@ -0,0 +1,44 @@ +/** Options for configuring the Corpus Admin API client. */ +export interface CorpusApiClientOptions { + /** + * Admin API GraphQL endpoint (e.g. + * 'https://admin-api.getpocket.com/'). + */ + endpoint: string; + /** JWK JSON string containing the RSA private key. */ + jwkJson: string; + /** JWT issuer claim. */ + issuer: string; + /** JWT audience claim. */ + audience: string; + /** + * apollographql-client-name header value. Identifies + * this service in admin-api logs. + */ + clientName?: string; + /** apollographql-client-version header value. */ + clientVersion?: string; +} + +/** Input for the updateApprovedCorpusItem GraphQL mutation. */ +export interface UpdateApprovedCorpusItemInput { + externalId: string; + title: string; + excerpt: string; + authors: Array<{ name: string; sortOrder: number }>; + status: 'CORPUS' | 'RECOMMENDATION'; + language: 'EN' | 'DE' | 'ES' | 'FR' | 'IT'; + publisher: string; + imageUrl: string; + topic: string; + isTimeSensitive: boolean; + datePublished?: string; +} + +/** Subset of fields returned by the mutation. */ +export interface UpdateApprovedCorpusItemResponse { + externalId: string; + url: string; + title: string; + excerpt: string; +} diff --git a/packages/crawl-common/src/index.ts b/packages/crawl-common/src/index.ts index 11fddc6..f42456e 100644 --- a/packages/crawl-common/src/index.ts +++ b/packages/crawl-common/src/index.ts @@ -19,3 +19,21 @@ export type { ZyteArticleListItemMetadata, ZyteResponse, } from './zyte/index.js'; +export { normalizeText } from './utils/index.js'; +export { + initCorpusApiClient, + updateApprovedCorpusItem, + CorpusApiError, +} from './corpus-api/index.js'; +export type { + CorpusApiClientOptions, + UpdateApprovedCorpusItemInput, + UpdateApprovedCorpusItemResponse, +} from './corpus-api/index.js'; +export type { + CorpusItem, + CrawlArticleMessage, + ArticleAuthor, + ArticleBreadcrumb, + ArticleEvent, +} from './types/index.js'; diff --git a/packages/crawl-common/src/types/events.ts b/packages/crawl-common/src/types/events.ts new file mode 100644 index 0000000..c8af131 --- /dev/null +++ b/packages/crawl-common/src/types/events.ts @@ -0,0 +1,28 @@ +/** Author in an article event published to BigQuery. */ +export interface ArticleAuthor { + name: string; +} + +/** Breadcrumb in an article event published to BigQuery. */ +export interface ArticleBreadcrumb { + name?: string; + url?: string; +} + +/** + * Event published to the articles Pub/Sub topic and written + * to the crawl.articles BigQuery table via a BigQuery + * subscription. + */ +export interface ArticleEvent { + url: string; + extracted_at: string; + headline?: string; + description?: string; + authors?: ArticleAuthor[]; + main_image_url?: string; + body_truncated?: string; + published_at?: string; + breadcrumbs?: ArticleBreadcrumb[]; + language?: string; +} diff --git a/packages/crawl-common/src/types/index.ts b/packages/crawl-common/src/types/index.ts new file mode 100644 index 0000000..f0da155 --- /dev/null +++ b/packages/crawl-common/src/types/index.ts @@ -0,0 +1,6 @@ +export type { CorpusItem, CrawlArticleMessage } from './messages.js'; +export type { + ArticleAuthor, + ArticleBreadcrumb, + ArticleEvent, +} from './events.js'; diff --git a/packages/crawl-common/src/types/messages.ts b/packages/crawl-common/src/types/messages.ts new file mode 100644 index 0000000..9d5b658 --- /dev/null +++ b/packages/crawl-common/src/types/messages.ts @@ -0,0 +1,31 @@ +/** + * Curated corpus item metadata, present on crawl-article + * messages for live articles managed by editors. + */ +export interface CorpusItem { + external_id: string; + title: string; + excerpt: string; + authors: { name: string }[]; + status: 'CORPUS' | 'RECOMMENDATION'; + // Sync with CorpusLanguage in content-monorepo + // packages/content-common/src/types.ts. + language: 'EN' | 'DE' | 'ES' | 'FR' | 'IT'; + publisher: string; + image_url: string; + topic: string; + is_time_sensitive: boolean; +} + +/** + * Pub/Sub message consumed from the crawl-article + * subscription. corpus_item is present only for live + * articles published by the crawl agent. + */ +export interface CrawlArticleMessage { + url: string; + source_url: string; + crawl_id: string; + enqueued_at: string; + corpus_item?: CorpusItem; +} diff --git a/packages/crawl-common/src/utils/index.ts b/packages/crawl-common/src/utils/index.ts new file mode 100644 index 0000000..9f47182 --- /dev/null +++ b/packages/crawl-common/src/utils/index.ts @@ -0,0 +1 @@ +export { normalizeText } from './normalize.js'; diff --git a/packages/crawl-common/src/utils/normalize.spec.ts b/packages/crawl-common/src/utils/normalize.spec.ts new file mode 100644 index 0000000..1202fe8 --- /dev/null +++ b/packages/crawl-common/src/utils/normalize.spec.ts @@ -0,0 +1,67 @@ +import { describe, expect, it } from 'vitest'; +import { normalizeText } from './normalize.js'; + +describe('normalizeText', () => { + it.each([ + ['returns empty string for null', null, undefined, ''], + ['returns empty string for undefined', undefined, undefined, ''], + ['returns empty string for empty string', '', undefined, ''], + [ + 'strips leading and trailing whitespace', + ' hello world ', + undefined, + 'hello world', + ], + ['lowercases text', 'Hello World', undefined, 'hello world'], + ['normalizes unicode to NFC', '\u0065\u0301', undefined, '\u00e9'], + [ + 'normalizes smart single quotes', + '\u2018hello\u2019', + undefined, + "'hello'", + ], + [ + 'normalizes smart double quotes', + '\u201chello\u201d', + undefined, + '"hello"', + ], + [ + 'normalizes mixed quotes', + 'it\u2019s a \u201ctest\u201d with \u2018quotes\u2019', + undefined, + "it's a \"test\" with 'quotes'", + ], + ['strips trailing period', 'Hello world.', undefined, 'hello world'], + [ + 'strips multiple trailing periods', + 'Multiple periods...', + undefined, + 'multiple periods', + ], + ['collapses multiple spaces', 'hello world', undefined, 'hello world'], + [ + 'collapses tabs and newlines', + 'hello\t\n world', + undefined, + 'hello world', + ], + ['truncates to maxLength', 'A'.repeat(300), 255, 'a'.repeat(255)], + [ + 'truncates before stripping periods', + 'A'.repeat(254) + '..', + 255, + 'a'.repeat(254), + ], + ])( + '%s', + ( + _name: string, + input: string | null | undefined, + maxLength: number | undefined, + expected: string, + ) => { + expect(normalizeText(input, maxLength)).toBe(expected); + }, + ); +}); diff --git a/packages/crawl-common/src/utils/normalize.ts b/packages/crawl-common/src/utils/normalize.ts new file mode 100644 index 0000000..e4d634f --- /dev/null +++ b/packages/crawl-common/src/utils/normalize.ts @@ -0,0 +1,42 @@ +/** + * Normalize text for comparison only (not for storage). + * Detects meaningful changes between Zyte-extracted and + * Corpus API-stored metadata while ignoring cosmetic + * differences in case, whitespace, and quote style. + * + * The Corpus API stores title/excerpt as-is with no + * normalization. Steps ported from the existing Python + * crawler in content-ml-services diff.py:normalize_text(). + */ +export function normalizeText( + text: string | null | undefined, + maxLength?: number, +): string { + if (text == null) return ''; + + // Unicode NFC canonical composition. + let result = text.normalize('NFC'); + + result = result.trim(); + + if (maxLength != null) { + result = result.slice(0, maxLength); + } + + // Strip trailing periods (inconsistent between sources). + result = result.replace(/\.+$/, ''); + + // Collapse whitespace to a single space. + result = result.replace(/\s+/g, ' '); + + result = result.toLowerCase(); + + // Normalize smart quotes to straight quotes. + result = result + .replaceAll('\u2018', "'") + .replaceAll('\u2019', "'") + .replaceAll('\u201c', '"') + .replaceAll('\u201d', '"'); + + return result; +} diff --git a/packages/crawl-common/tsconfig.json b/packages/crawl-common/tsconfig.json index 133e96a..782d8ad 100644 --- a/packages/crawl-common/tsconfig.json +++ b/packages/crawl-common/tsconfig.json @@ -2,7 +2,8 @@ "extends": "../../tsconfig.json", "compilerOptions": { "outDir": "dist", - "rootDir": "src" + "rootDir": "src", + "types": ["node", "vitest/globals"] }, "include": ["src"], "exclude": ["src/**/*.spec.ts", "src/**/*.integration.ts"] diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d007acd..d4f8535 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -33,10 +33,16 @@ importers: is-network-error: specifier: ^1.3.1 version: 1.3.1 + jose: + specifier: ^6.2.2 + version: 6.2.2 p-retry: specifier: ^8.0.0 version: 8.0.0 devDependencies: + '@types/node': + specifier: ^24.12.0 + version: 24.12.0 eslint: specifier: ^10.0.0 version: 10.1.0 @@ -1146,6 +1152,9 @@ packages: isexe@2.0.0: resolution: {integrity: sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==} + jose@6.2.2: + resolution: {integrity: sha512-d7kPDd34KO/YnzaDOlikGpOurfF0ByC2sEV4cANCtdqLlTfBlw2p14O/5d/zv40gJPbIQxfES3nSx1/oYNyuZQ==} + joycon@3.1.1: resolution: {integrity: sha512-34wB/Y7MW7bzjKRjUKTa46I2Z7eV62Rkhva+KkopW7Qvv/OSWBqvkSY7vusOPrNuZcUG3tApvdVgNB8POj3SPw==} engines: {node: '>=10'} @@ -2688,6 +2697,8 @@ snapshots: isexe@2.0.0: {} + jose@6.2.2: {} + joycon@3.1.1: {} json-buffer@3.0.1: {} diff --git a/services/crawl-worker/src/handlers/extract-article.integration.ts b/services/crawl-worker/src/handlers/extract-article.integration.ts new file mode 100644 index 0000000..b1d9609 --- /dev/null +++ b/services/crawl-worker/src/handlers/extract-article.integration.ts @@ -0,0 +1,134 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import { + initCorpusApiClient, + initZyteClient, + type CrawlArticleMessage, +} from 'crawl-common'; +import { handleArticleExtraction } from './extract-article.js'; +import { + BASE_MESSAGE, + CORPUS_ITEM, + TEST_JWK, + TEST_URL, + ZYTE_ARTICLE, +} from './test-helpers.js'; + +const ZYTE_URL = 'https://api.zyte.com/v1/extract'; +const CORPUS_API_URL = 'https://admin-api.test/'; + +/** + * Integration test: exercises the handler against real Zyte + * and Corpus API clients, with fetch stubbed at the network + * boundary. Catches regressions in client wiring (JWT signing, + * request construction, response parsing) that module-mocked + * unit tests cannot. + */ +describe('extract-article integration', () => { + const fetchMock = vi.fn(); + + beforeEach(async () => { + vi.stubGlobal('fetch', fetchMock); + + initZyteClient({ apiKey: 'test-zyte-key', maxRetries: 0 }); + await initCorpusApiClient({ + endpoint: CORPUS_API_URL, + jwkJson: TEST_JWK, + issuer: 'https://getpocket.com', + audience: CORPUS_API_URL, + }); + }); + + afterEach(() => { + fetchMock.mockReset(); + vi.unstubAllGlobals(); + }); + + it('triggers a Corpus API update and returns the mapped event when a live article has a changed title', async () => { + const updatedHeadline = 'Updated Headline From Zyte'; + const zyteArticle = { + ...ZYTE_ARTICLE, + headline: updatedHeadline, + authors: [{ name: 'Author A' }, { name: 'Author B' }], + }; + + fetchMock.mockImplementation(async (url: string) => { + if (url === ZYTE_URL) { + return new Response( + JSON.stringify({ + article: zyteArticle, + url: zyteArticle.url, + statusCode: 200, + }), + { status: 200, headers: { 'content-type': 'application/json' } }, + ); + } + if (url === CORPUS_API_URL) { + return new Response( + JSON.stringify({ + data: { + updateApprovedCorpusItem: { + externalId: CORPUS_ITEM.external_id, + url: zyteArticle.url, + title: updatedHeadline, + excerpt: zyteArticle.description, + }, + }, + }), + { status: 200, headers: { 'content-type': 'application/json' } }, + ); + } + throw new Error(`Unexpected fetch: ${url}`); + }); + + const message: CrawlArticleMessage = { + ...BASE_MESSAGE, + corpus_item: CORPUS_ITEM, + }; + const event = await handleArticleExtraction(message); + + // Both clients hit the network. + expect(fetchMock).toHaveBeenCalledTimes(2); + + // Corpus API request carries a Bearer JWT and the + // expected GraphQL mutation variables. + const corpusCall = fetchMock.mock.calls.find(([u]) => u === CORPUS_API_URL); + expect(corpusCall).toBeDefined(); + const corpusInit = corpusCall![1]!; + const headers = corpusInit.headers as Record; + expect(headers.authorization).toMatch(/^Bearer /); + expect(headers['apollographql-client-name']).toBe('hnt-content'); + + const corpusBody = JSON.parse(corpusInit.body as string) as { + variables: { data: Record }; + }; + expect(corpusBody.variables.data).toMatchObject({ + externalId: CORPUS_ITEM.external_id, + title: updatedHeadline, + // Excerpt unchanged: extracted matches corpus_item. + excerpt: CORPUS_ITEM.excerpt, + authors: [ + { name: 'Author A', sortOrder: 0 }, + { name: 'Author B', sortOrder: 1 }, + ], + status: CORPUS_ITEM.status, + language: CORPUS_ITEM.language, + publisher: CORPUS_ITEM.publisher, + imageUrl: CORPUS_ITEM.image_url, + topic: CORPUS_ITEM.topic, + isTimeSensitive: CORPUS_ITEM.is_time_sensitive, + }); + + // Handler returned the expected ArticleEvent. + expect(event).toMatchObject({ + url: TEST_URL, + headline: updatedHeadline, + description: ZYTE_ARTICLE.description, + authors: [{ name: 'Author A' }, { name: 'Author B' }], + main_image_url: ZYTE_ARTICLE.mainImage?.url, + body_truncated: ZYTE_ARTICLE.articleBody, + published_at: ZYTE_ARTICLE.datePublished, + language: ZYTE_ARTICLE.inLanguage, + }); + expect(event.extracted_at).toBeDefined(); + }); +}); diff --git a/services/crawl-worker/src/handlers/extract-article.spec.ts b/services/crawl-worker/src/handlers/extract-article.spec.ts new file mode 100644 index 0000000..87de09e --- /dev/null +++ b/services/crawl-worker/src/handlers/extract-article.spec.ts @@ -0,0 +1,288 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; +import type { + CrawlArticleMessage, + UpdateApprovedCorpusItemInput, +} from 'crawl-common'; + +vi.mock('crawl-common', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + extractArticle: vi.fn(), + updateApprovedCorpusItem: vi.fn(), + }; +}); + +import { extractArticle, updateApprovedCorpusItem } from 'crawl-common'; +import { handleArticleExtraction } from './extract-article.js'; +import { + BASE_MESSAGE, + CORPUS_ITEM, + ZYTE_ARTICLE, + ZYTE_RESPONSE, +} from './test-helpers.js'; + +const extractArticleMock = vi.mocked(extractArticle); +const updateCorpusMock = vi.mocked(updateApprovedCorpusItem); + +describe('handleArticleExtraction', () => { + beforeEach(() => { + extractArticleMock.mockResolvedValue(ZYTE_RESPONSE); + updateCorpusMock.mockResolvedValue({ + externalId: 'ext-123', + url: 'https://example.com/article', + title: 'Test Headline', + excerpt: 'Test description', + }); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe('discovered article (no corpus_item)', () => { + it('returns an ArticleEvent with mapped fields', async () => { + const event = await handleArticleExtraction(BASE_MESSAGE); + + expect(event.url).toBe(BASE_MESSAGE.url); + expect(event.headline).toBe(ZYTE_ARTICLE.headline); + expect(event.description).toBe(ZYTE_ARTICLE.description); + expect(event.authors).toEqual(ZYTE_ARTICLE.authors); + expect(event.main_image_url).toBe(ZYTE_ARTICLE.mainImage?.url); + expect(event.body_truncated).toBe(ZYTE_ARTICLE.articleBody); + expect(event.published_at).toBe(ZYTE_ARTICLE.datePublished); + expect(event.breadcrumbs).toEqual(ZYTE_ARTICLE.breadcrumbs); + expect(event.language).toBe(ZYTE_ARTICLE.inLanguage); + expect(event.extracted_at).toBeDefined(); + }); + + it('calls extractArticle with httpResponseBody', async () => { + await handleArticleExtraction(BASE_MESSAGE); + + expect(extractArticleMock).toHaveBeenCalledWith(BASE_MESSAGE.url, { + extractFrom: 'httpResponseBody', + }); + }); + + it('does not call updateApprovedCorpusItem', async () => { + await handleArticleExtraction(BASE_MESSAGE); + + expect(updateCorpusMock).not.toHaveBeenCalled(); + }); + + it('truncates articleBody to 2000 characters', async () => { + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + articleBody: 'x'.repeat(5000), + }, + }); + + const event = await handleArticleExtraction(BASE_MESSAGE); + + expect(event.body_truncated).toHaveLength(2000); + }); + }); + + describe('live article (with corpus_item)', () => { + const liveMessage: CrawlArticleMessage = { + ...BASE_MESSAGE, + corpus_item: CORPUS_ITEM, + }; + + it('does not update when title and excerpt match', async () => { + await handleArticleExtraction(liveMessage); + + expect(updateCorpusMock).not.toHaveBeenCalled(); + }); + + it('updates when title changed and passes through corpus fields', async () => { + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + headline: 'New Headline', + }, + }); + + await handleArticleExtraction(liveMessage); + + expect(updateCorpusMock).toHaveBeenCalledOnce(); + const input = updateCorpusMock.mock + .calls[0][0] as UpdateApprovedCorpusItemInput; + expect(input.title).toBe('New Headline'); + // Unchanged excerpt uses corpus item value. + expect(input.excerpt).toBe(CORPUS_ITEM.excerpt); + // Passthrough fields from corpus_item. + expect(input.externalId).toBe(CORPUS_ITEM.external_id); + expect(input.status).toBe(CORPUS_ITEM.status); + expect(input.language).toBe(CORPUS_ITEM.language); + expect(input.publisher).toBe(CORPUS_ITEM.publisher); + expect(input.imageUrl).toBe(CORPUS_ITEM.image_url); + expect(input.topic).toBe(CORPUS_ITEM.topic); + expect(input.isTimeSensitive).toBe(CORPUS_ITEM.is_time_sensitive); + }); + + it('updates when excerpt changed', async () => { + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + description: 'Completely different excerpt.', + }, + }); + + await handleArticleExtraction(liveMessage); + + expect(updateCorpusMock).toHaveBeenCalledOnce(); + const input = updateCorpusMock.mock + .calls[0][0] as UpdateApprovedCorpusItemInput; + expect(input.excerpt).toBe('Completely different excerpt.'); + }); + + it.each([ + { + scenario: 'case differences', + corpusTitle: 'Test Headline', + zyteHeadline: 'test headline', + }, + { + scenario: 'smart quote differences', + corpusTitle: "It's a 'test'", + zyteHeadline: 'It\u2019s a \u2018test\u2019', + }, + ])( + 'ignores $scenario in comparison', + async ({ corpusTitle, zyteHeadline }) => { + const msg: CrawlArticleMessage = { + ...BASE_MESSAGE, + corpus_item: { ...CORPUS_ITEM, title: corpusTitle }, + }; + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { ...ZYTE_ARTICLE, headline: zyteHeadline }, + }); + + await handleArticleExtraction(msg); + + expect(updateCorpusMock).not.toHaveBeenCalled(); + }, + ); + + it('truncates excerpt to 255 chars for comparison', async () => { + const baseExcerpt = 'a'.repeat(255); + const msg: CrawlArticleMessage = { + ...BASE_MESSAGE, + corpus_item: { + ...CORPUS_ITEM, + excerpt: baseExcerpt, + }, + }; + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + // Same first 255 chars, different after. + description: baseExcerpt + ' extra text', + }, + }); + + await handleArticleExtraction(msg); + + expect(updateCorpusMock).not.toHaveBeenCalled(); + }); + + it('skips comparison when extracted fields are empty', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + headline: undefined, + description: undefined, + }, + }); + + await handleArticleExtraction(liveMessage); + + expect(updateCorpusMock).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining('Empty title and excerpt'), + ); + }); + + it('skips title comparison when only title is empty', async () => { + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + headline: '', + description: 'Completely different excerpt.', + }, + }); + + await handleArticleExtraction(liveMessage); + + expect(updateCorpusMock).toHaveBeenCalledOnce(); + const input = updateCorpusMock.mock + .calls[0][0] as UpdateApprovedCorpusItemInput; + // Falls back to corpus_item title since extracted is empty. + expect(input.title).toBe(CORPUS_ITEM.title); + expect(input.excerpt).toBe('Completely different excerpt.'); + }); + + it('prefers extracted authors over corpus_item', async () => { + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + headline: 'New Headline', + authors: [{ name: 'New Author One' }, { name: 'New Author Two' }], + }, + }); + + await handleArticleExtraction(liveMessage); + + const input = updateCorpusMock.mock + .calls[0][0] as UpdateApprovedCorpusItemInput; + expect(input.authors).toEqual([ + { name: 'New Author One', sortOrder: 0 }, + { name: 'New Author Two', sortOrder: 1 }, + ]); + }); + + it('falls back to corpus_item authors when extracted is empty', async () => { + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + headline: 'New Headline', + authors: [], + }, + }); + + await handleArticleExtraction(liveMessage); + + const input = updateCorpusMock.mock + .calls[0][0] as UpdateApprovedCorpusItemInput; + expect(input.authors).toEqual([{ name: 'Jane Doe', sortOrder: 0 }]); + }); + + it('throws when Corpus API update fails', async () => { + extractArticleMock.mockResolvedValueOnce({ + ...ZYTE_RESPONSE, + data: { + ...ZYTE_ARTICLE, + headline: 'New Headline', + }, + }); + updateCorpusMock.mockRejectedValueOnce(new Error('Corpus API down')); + + await expect(handleArticleExtraction(liveMessage)).rejects.toThrow( + 'Corpus API down', + ); + }); + }); +}); diff --git a/services/crawl-worker/src/handlers/extract-article.ts b/services/crawl-worker/src/handlers/extract-article.ts new file mode 100644 index 0000000..97f9fab --- /dev/null +++ b/services/crawl-worker/src/handlers/extract-article.ts @@ -0,0 +1,141 @@ +import { + extractArticle, + updateApprovedCorpusItem, + normalizeText, +} from 'crawl-common'; +import type { + CrawlArticleMessage, + ArticleEvent, + CorpusItem, + UpdateApprovedCorpusItemInput, + ZyteArticle, +} from 'crawl-common'; + +const BODY_TRUNCATE_LENGTH = 2_000; +const EXCERPT_COMPARE_LENGTH = 255; + +/** + * Extract an article via Zyte, map it to the articles event + * schema, and update the Curated Corpus API when a live + * article's title or excerpt has changed. + */ +export async function handleArticleExtraction( + message: CrawlArticleMessage, +): Promise { + const { data: article, url } = await extractArticle(message.url, { + extractFrom: 'httpResponseBody', + }); + + const event = mapToArticleEvent(article, url); + + if (message.corpus_item) { + await detectAndSyncChanges(article, message.corpus_item); + } + + return event; +} + +/** Map a Zyte article response to the BigQuery event schema. */ +function mapToArticleEvent(article: ZyteArticle, url: string): ArticleEvent { + return { + url, + extracted_at: new Date().toISOString(), + headline: article.headline ?? undefined, + description: article.description ?? undefined, + authors: article.authors?.map((a) => ({ name: a.name })), + main_image_url: article.mainImage?.url ?? undefined, + body_truncated: article.articleBody?.slice(0, BODY_TRUNCATE_LENGTH), + published_at: article.datePublished ?? undefined, + breadcrumbs: article.breadcrumbs?.map((b) => ({ + name: b.name, + url: b.url, + })), + language: article.inLanguage ?? undefined, + }; +} + +/** + * Compare extracted metadata against the corpus item and + * call the Corpus Admin API if title or excerpt changed. + * Throws on API failure so Pub/Sub redelivers the message. + */ +async function detectAndSyncChanges( + article: ZyteArticle, + corpusItem: CorpusItem, +): Promise { + const extractedTitle = article.headline; + const extractedExcerpt = article.description; + + if (!extractedTitle && !extractedExcerpt) { + console.warn( + 'Empty title and excerpt from Zyte for live article ' + + `${corpusItem.external_id}; skipping comparison`, + ); + return; + } + + const titleChanged = + extractedTitle != null && + extractedTitle.trim() !== '' && + normalizeText(extractedTitle) !== normalizeText(corpusItem.title); + + const excerptChanged = + extractedExcerpt != null && + extractedExcerpt.trim() !== '' && + normalizeText(extractedExcerpt, EXCERPT_COMPARE_LENGTH) !== + normalizeText(corpusItem.excerpt, EXCERPT_COMPARE_LENGTH); + + if (!titleChanged && !excerptChanged) return; + + const changedFields = [ + ...(titleChanged ? ['title'] : []), + ...(excerptChanged ? ['excerpt'] : []), + ]; + console.log( + `Detected changes in [${changedFields.join(', ')}] ` + + `for corpus item ${corpusItem.external_id}`, + ); + + const input = buildUpdateInput(article, corpusItem, { + title: titleChanged ? extractedTitle : undefined, + excerpt: excerptChanged ? extractedExcerpt : undefined, + }); + await updateApprovedCorpusItem(input); +} + +/** + * Build the GraphQL mutation input. Only overrides title or + * excerpt when the corresponding field actually changed; + * unchanged fields use the corpus item value to avoid + * overwriting curator edits with cosmetic differences. + */ +function buildUpdateInput( + article: ZyteArticle, + corpusItem: CorpusItem, + changed: { title?: string; excerpt?: string }, +): UpdateApprovedCorpusItemInput { + // Prefer extracted authors, fall back to corpus item. + const authors = + article.authors && article.authors.length > 0 + ? article.authors.map((a, i) => ({ + name: a.name, + sortOrder: i, + })) + : corpusItem.authors.map((a, i) => ({ + name: a.name, + sortOrder: i, + })); + + return { + externalId: corpusItem.external_id, + title: changed.title?.trim() ?? corpusItem.title, + excerpt: changed.excerpt?.trim() ?? corpusItem.excerpt, + authors, + status: corpusItem.status, + language: corpusItem.language, + publisher: corpusItem.publisher, + imageUrl: corpusItem.image_url, + topic: corpusItem.topic, + isTimeSensitive: corpusItem.is_time_sensitive, + }; +} diff --git a/services/crawl-worker/src/handlers/test-helpers.ts b/services/crawl-worker/src/handlers/test-helpers.ts new file mode 100644 index 0000000..2cadfec --- /dev/null +++ b/services/crawl-worker/src/handlers/test-helpers.ts @@ -0,0 +1,73 @@ +import type { + CorpusItem, + CrawlArticleMessage, + ZyteArticle, + ZyteResponse, +} from 'crawl-common'; + +export const TEST_URL = 'https://example.com/article'; +export const TEST_SOURCE_URL = 'https://example.com/news'; + +/** Base ZyteArticle. Tests spread and override specific fields. */ +export const ZYTE_ARTICLE: ZyteArticle = { + url: TEST_URL, + headline: 'Test Headline', + description: 'Test description of the article.', + authors: [{ name: 'Jane Doe' }], + mainImage: { url: 'https://example.com/image.jpg' }, + articleBody: 'Full article body text here.', + datePublished: '2025-06-01T12:00:00Z', + breadcrumbs: [{ name: 'News', url: TEST_SOURCE_URL }], + inLanguage: 'en', + metadata: { + probability: 0.95, + dateDownloaded: '2025-06-01T12:01:00Z', + }, +}; + +/** Base ZyteResponse envelope wrapping ZYTE_ARTICLE. */ +export const ZYTE_RESPONSE: ZyteResponse = { + data: ZYTE_ARTICLE, + url: TEST_URL, + statusCode: 200, +}; + +/** Base CrawlArticleMessage for a discovered article. */ +export const BASE_MESSAGE: CrawlArticleMessage = { + url: TEST_URL, + source_url: TEST_SOURCE_URL, + crawl_id: 'crawl-001', + enqueued_at: '2025-06-01T12:00:00Z', +}; + +/** Base CorpusItem whose title/excerpt match ZYTE_ARTICLE. */ +export const CORPUS_ITEM: CorpusItem = { + external_id: 'ext-123', + title: 'Test Headline', + excerpt: 'Test description of the article.', + authors: [{ name: 'Jane Doe' }], + status: 'CORPUS', + language: 'EN', + publisher: 'Example News', + image_url: 'https://s3.amazonaws.com/image.jpg', + topic: 'TECHNOLOGY', + is_time_sensitive: false, +}; + +/** + * Generated 2048-bit RSA JWK used by integration tests that + * need to sign real JWTs against the Corpus API client. The + * production key is a runtime secret. + */ +export const TEST_JWK = JSON.stringify({ + kty: 'RSA', + kid: 'test-kid', + n: 'tlGRF0xXZdfLwe6wLslVERpJS5oH7ZL2UD33DDkx_S__GaHYx2VesvKn5qQ3j1SWgO3VBJV8i2YXpj6xPpZ8Kj34TbuAFO4klTZ1oy8FxyH-yUC56OJi2FlmqIz9zOWVMpkvwWweZcXms1QaPHxCjPix8LaAd1y0_urXnMvbZvgFtgqV3Gv1-rO2hM_VNIgCzZCFQ8Iz1viHAAgdIoOm6Bs7i6skzw0XTC_gv-ZDQmVSsKb6RNQL3Lyto2rbnOmMWt5zxndwZ-AP7UzZuJbR77OphFnUsN0hyL3-ShKMbo8pIQPrJs-B3GAplMCWkxpvyKBqDvYVXt9vrrou_YrITw', + e: 'AQAB', + d: 'WIEEE_FFQ_UbvormD_BAUUsXZZHiY1vCInXSJabmM2hHR-QfXbxB2lCdXQM-zV9cqD3L-Kuwh-MJe_RXCnD22XK3xNROetqX-68yMAM1pNNF4eB_3yN2pFvRz-SRmBOi96sRWa3om7MUKN2c1tvjWpenmZieiFMCsfTCsiTr3vGYrnydPGTB5AJlG_lAKLCX8ta9bDwQK5kJLcc8JyP0Tivwv6Dh0u_wCpc7Iiiq_SmQKLrAvFeJZaF2evh3OY_0zIYREGBKOwooTTnRy0QURTwaiWl7P4PwDDERd0og1Lcar2dj5JHGtcUw0qgbTGz4Du6jzy6-ieWRLoXqeumU0Q', + p: '8IML8lVMrnMDQqylf2fAO0iU6ge80zDvL8dIVWpTfJsBkvL6_eNZkeqC72f3xxH_xSQbdOGKYNDacCQcR2ChMQAn4LKsKE7s7NPmQnE6p36l-GsvwhUWQUXR7Z_A9cTF3k77qyDjK7E9EQ_u_jKsgv2YGl7CRTz_7VByQbf7lks', + q: 'wg8tFkRFCFcydpfqw7xDKdbo1tZMQA_XgkPuSoI5wu5rgMCWQw29s0rL4HM-cdeeyXqgJCRPwOybaWGCm59H1sWZew_rtIqI7M34JhmnxuiX444ySoqo7jCcW0WoJ5xpv-3XXRKC0Si1MC10m2oAvhMHhEiONnMtQ7tCuHZOY40', + dp: '6-nfIgkBemxeWlw2yc3fBUegqh6E3TM2qsry7LWqxqLU3GtyPu9uwG4jmOmGZcIF_D36oJ9KuMSkPzNseacS9ZmNhB4-OBuS0orXZXzjZ8AW1KFu6xT8C3KNBGSbRXeKDxGyUp2jtwvXNpFGgBj8llBhjhw8uuWmtAUgzc3F_hk', + dq: 'kQnyss-3oLI7PzPv_Pc6Y40CXX-xYbf1ZKEM-pc2QKEdrA9Evz0H6XcfxdOcek2jmgaSpjCVgyXUSgDdMx7q_HSXb8jIbBmWmRagPymxohK5YxQmNlxIQi4GzpjTQze-OfqzmhZ5u4XnVejDXFzvzSA_3_iygbO3wwW0qlWR5Qk', + qi: '1l7lV0YqJuRptrOKwbhna6O2OavR7xxmSI2HUzkM45tEjmaZ-w9VcQ_XkTNUZEbkk-qioNMmUVGYwW30sWkMiVI2RC_ybsaFmJ61VvfUp_0j7jg_6KsFJ6UXoDDOL1kB2bWoI0DyLe3tLcVxfjTehmNAVr51w9I9JzCT6Y4DS4o', +});