diff --git a/documentation/behind-the-scenes-of-adding-a-file.md b/documentation/behind-the-scenes-of-adding-a-file.md index 82ce8095..58389f0f 100644 --- a/documentation/behind-the-scenes-of-adding-a-file.md +++ b/documentation/behind-the-scenes-of-adding-a-file.md @@ -83,7 +83,7 @@ v1 CAR containing the Merkle DAG representing the provided file. There is one r *Expected duration:* -This is a function of the size of the input file and the hardware. Typical DAGification of files and directories is relatively quick as it's simply a matter of chunking and hashing using common algorithms. The most time-consuming part is the generation of the ["Piece CID](glossary.md#piece-cid) of the whole CAR on the client side prior to upload, where a a 1Gb input can take upwards of a minute. As the car is being created, it can be streamed to an SP, which is most likely the bottleneck. +Depends on input size, local disk/CPU, and uplink speed. DAGification itself is mostly chunk-and-hash and is fast on modern hardware. As we build the CAR, we stream it directly to the SP (no full buffering); [Synapse](glossary.md#synapse) overlaps DAG creation, [Piece CID](glossary.md#piece-cid) calculation, and upload, so on a fast machine the bottleneck is usually your upload bandwidth. ### Upload CAR @@ -97,13 +97,13 @@ The upload includes [metadata](glossary.md#metadata) that will be stored on-chai *Outputs:* -SP parks the Piece and queues it up for processing, while the client gets an HTTP response with the [Piece CID](glossary.md#piece-cid). The server calculates the Piece CID for the data and confirms that it matches the Piece CID calculated and provided by the Filecoin Pin client to provide assurance that we are providing the exact bytes we expect. +SP parks the Piece and queues it up for processing, while the client gets an HTTP response with the [Piece CID](glossary.md#piece-cid). 
Synapse handles Piece CID calculation and validation while streaming, keeping the client and SP in sync without an extra buffering pass. Since the SP has the data for the Piece, it can be retrieved with https://sp.domain/piece/$pieceCid retrieval. *Expected duration:* -This is a function of the CAR size and the throughput between the client and the SP. +This is a function of the CAR size and the throughput between the client and the SP. The current maximum piece size is dictated by Synapse + SP limits (tracked in https://github.com/FilOzone/synapse-sdk/issues/110). ### Index and Advertise CAR CIDs diff --git a/documentation/glossary.md b/documentation/glossary.md index 4e136afb..1a7e2da0 100644 --- a/documentation/glossary.md +++ b/documentation/glossary.md @@ -119,6 +119,10 @@ With [Filecoin Pin](#filecoin-pin), the Piece is the [CAR](#car) file itself; an PieceCID, or "CommP" (Commitment of Piece), is a specific form of [CID](#cid) used in Filecoin to commit Merkle proofs of large _pieces_ of data on chain. A PieceCID includes a digest of the contiguous bytes, with no special handling of any internal format or packing (including CAR formats containing IPFS data). It uses a modified form of SHA2-256 internally, and further details can be found in [FRC-0069](https://github.com/filecoin-project/FIPs/blob/master/FRCs/frc-0069.md). PieceCID is a variant of CID specifically for use in Filecoin's proof system, and will differ from the CIDs used in IPFS. When presented in standard base32 format, it will begin with the characters `bafkzcib` and be between 64 and 65 characters long. +## Streaming Uploads + +[Filecoin Pin](#filecoin-pin) streams CAR data to [Service Providers](#service-provider) via [Synapse](#synapse), so files are not buffered fully in memory during upload. The maximum supported piece size is bounded by the Synapse SDK and SP configuration (tracked in https://github.com/FilOzone/synapse-sdk/issues/110). 
+ ## `/piece` Retrieval This is a Filecoin-defined retrieval specification outlined in https://github.com/filecoin-project/FIPs/blob/master/FRCs/frc-0066.md. It is for retrieving pieces by [Piece CID](#piece-cid), optionally taking a byte range specified by standard HTTP request format. Piece retrieval is useful for downloading the bytes _as they are stored and proven_ in Filecoin, either to request the original non-IPFS data stored, or downloading the CAR format data generated by Filecoin Pin. diff --git a/src/add/add.ts b/src/add/add.ts index 3a41f749..e07c7efb 100644 --- a/src/add/add.ts +++ b/src/add/add.ts @@ -5,7 +5,9 @@ * It encodes content as UnixFS, creates CAR files, and uploads to Filecoin. */ -import { readFile, stat } from 'node:fs/promises' +import { createReadStream } from 'node:fs' +import { stat } from 'node:fs/promises' +import { Readable } from 'node:stream' import pc from 'picocolors' import pino from 'pino' import { warnAboutCDNPricingLimitations } from '../common/cdn-warning.js' @@ -151,11 +153,12 @@ export async function runAdd(options: AddOptions): Promise { spinner.stop(`${pc.green('✓')} ${isDirectory ? 
'Directory' : 'File'} packed with root CID: ${rootCid.toString()}`) - // Read CAR data - spinner.start('Loading packed IPFS content ...') - const carData = await readFile(tempCarPath) - const carSize = carData.length - spinner.stop(`${pc.green('✓')} IPFS content loaded (${formatFileSize(carSize)})`) + // Prepare CAR data stream + spinner.start('Preparing packed IPFS content for upload...') + const carStats = await stat(tempCarPath) + const carSize = carStats.size + const carStream = Readable.toWeb(createReadStream(tempCarPath)) as ReadableStream + spinner.stop(`${pc.green('✓')} IPFS content ready (${formatFileSize(carSize)})`) if (options.autoFund) { // Perform auto-funding if requested (now that we know the file size) @@ -204,7 +207,7 @@ export async function runAdd(options: AddOptions): Promise { const synapseService: SynapseService = { synapse, storage, providerInfo } // Upload to Synapse - const uploadResult = await performUpload(synapseService, carData, rootCid, { + const uploadResult = await performUpload(synapseService, carStream, rootCid, { contextType: 'add', fileSize: carSize, logger, diff --git a/src/common/upload-flow.ts b/src/common/upload-flow.ts index 20e3aaf9..d9a95006 100644 --- a/src/common/upload-flow.ts +++ b/src/common/upload-flow.ts @@ -17,6 +17,7 @@ import { getDownloadURL, getServiceURL, type SynapseUploadResult, + type UploadData, } from '../core/upload/index.js' import { formatUSDFC } from '../core/utils/format.js' import { autoFund } from '../payments/fund.js' @@ -256,7 +257,7 @@ function displayPaymentIssues(capacityCheck: PaymentCapacityCheck, fileSize: num */ export async function performUpload( synapseService: SynapseService, - carData: Uint8Array, + carData: UploadData, rootCid: CID, options: UploadFlowOptions ): Promise { diff --git a/src/core/upload/index.ts b/src/core/upload/index.ts index f682b624..b7377dc0 100644 --- a/src/core/upload/index.ts +++ b/src/core/upload/index.ts @@ -17,9 +17,9 @@ import { type 
WaitForIpniProviderResultsOptions, waitForIpniProviderResults, } from '../utils/validate-ipni-advertisement.js' -import { type SynapseUploadResult, type UploadProgressEvents, uploadToSynapse } from './synapse.js' +import { type SynapseUploadResult, type UploadData, type UploadProgressEvents, uploadToSynapse } from './synapse.js' -export type { SynapseUploadOptions, SynapseUploadResult, UploadProgressEvents } from './synapse.js' +export type { SynapseUploadOptions, SynapseUploadResult, UploadData, UploadProgressEvents } from './synapse.js' export { getDownloadURL, getServiceURL, uploadToSynapse } from './synapse.js' /** @@ -217,7 +217,7 @@ export interface UploadExecutionResult extends SynapseUploadResult { */ export async function executeUpload( synapseService: SynapseService, - carData: Uint8Array, + carData: UploadData, rootCid: CID, options: UploadExecutionOptions ): Promise { diff --git a/src/core/upload/synapse.ts b/src/core/upload/synapse.ts index 8f6e4064..f019bf89 100644 --- a/src/core/upload/synapse.ts +++ b/src/core/upload/synapse.ts @@ -11,6 +11,8 @@ import type { Logger } from 'pino' import type { SynapseService } from '../synapse/index.js' import type { ProgressEvent, ProgressEventHandler } from '../utils/types.js' +export type UploadData = Uint8Array | ReadableStream + export type UploadProgressEvents = | ProgressEvent<'onUploadComplete', { pieceCid: PieceCID }> | ProgressEvent<'onPieceAdded', { txHash: `0x${string}` | undefined }> @@ -64,7 +66,7 @@ export function getServiceURL(providerInfo: ProviderInfo): string { * 3. 
Return piece information * * @param synapseService - Initialized Synapse service - * @param carData - CAR file data as Uint8Array + * @param carData - CAR file data as Uint8Array or streaming source * @param rootCid - The IPFS root CID to associate with this piece * @param logger - Logger instance for tracking * @param options - Optional callbacks and context @@ -72,7 +74,7 @@ export function getServiceURL(providerInfo: ProviderInfo): string { */ export async function uploadToSynapse( synapseService: SynapseService, - carData: Uint8Array, + carData: UploadData, rootCid: CID, logger: Logger, options: SynapseUploadOptions = {} diff --git a/src/filecoin-pin-store.ts b/src/filecoin-pin-store.ts index ff1bb167..bc3d9a7e 100644 --- a/src/filecoin-pin-store.ts +++ b/src/filecoin-pin-store.ts @@ -1,6 +1,8 @@ import { EventEmitter } from 'node:events' -import { readFile, unlink } from 'node:fs/promises' +import { createReadStream } from 'node:fs' +import { unlink } from 'node:fs/promises' import { join } from 'node:path' +import { Readable } from 'node:stream' import type { Helia } from 'helia' import type { CID } from 'multiformats/cid' import type { Logger } from 'pino' @@ -304,12 +306,11 @@ export class FilecoinPinStore extends EventEmitter { // 3. Track the returned piece information in application state // 4. 
Handle errors gracefully with proper cleanup try { - // Read the CAR file (streaming not yet supported in Synapse) - // TODO: When Synapse supports streaming, this could be optimized - const carData = await readFile(pinStatus.filecoin.carFilePath) + // Stream the CAR file to Synapse to avoid buffering large data in memory + const carStream = Readable.toWeb(createReadStream(pinStatus.filecoin.carFilePath)) as ReadableStream // Upload using shared function with pinId as context and IPFS root CID metadata - const uploadResult = await uploadToSynapse(this.synapseService, carData, cid, this.logger, { + const uploadResult = await uploadToSynapse(this.synapseService, carStream, cid, this.logger, { contextId: pinId, }) diff --git a/src/import/import.ts b/src/import/import.ts index fae97d8c..fcde311b 100644 --- a/src/import/import.ts +++ b/src/import/import.ts @@ -6,7 +6,8 @@ */ import { createReadStream } from 'node:fs' -import { readFile, stat } from 'node:fs/promises' +import { stat } from 'node:fs/promises' +import { Readable } from 'node:stream' import { CarReader } from '@ipld/car' import { CID } from 'multiformats/cid' import pc from 'picocolors' @@ -244,13 +245,12 @@ export async function runCarImport(options: ImportOptions): Promise // Upload using common upload flow - const uploadResult = await performUpload(synapseService, carData, rootCid, { + const uploadResult = await performUpload(synapseService, carStream, rootCid, { contextType: 'import', fileSize: fileStat.size, logger, diff --git a/src/test/unit/add.test.ts b/src/test/unit/add.test.ts index 56514bac..74b37881 100644 --- a/src/test/unit/add.test.ts +++ b/src/test/unit/add.test.ts @@ -90,11 +90,19 @@ vi.mock('../../utils/cli-helpers.js', () => ({ })) // We need to partially mock fs/promises to keep real file operations for test setup -// but mock readFile for the CAR reading part +// but mock readFile/stat for the CAR handling part vi.mock('node:fs/promises', async () => { const actual = await 
vi.importActual('node:fs/promises') return { ...actual, + stat: vi.fn((path: string) => { + if (path === '/tmp/test.car') { + return Promise.resolve({ + size: 1024, + } as any) + } + return actual.stat(path as any) + }), readFile: vi.fn((path: string) => { // If it's reading the temp CAR, return mock data if (path === '/tmp/test.car') { diff --git a/src/test/unit/import.test.ts b/src/test/unit/import.test.ts index 93ec4687..579ca7e7 100644 --- a/src/test/unit/import.test.ts +++ b/src/test/unit/import.test.ts @@ -13,6 +13,7 @@ import { createWriteStream } from 'node:fs' import { mkdir, rm, stat, writeFile } from 'node:fs/promises' import { join } from 'node:path' import { pipeline } from 'node:stream/promises' +import { ReadableStream } from 'node:stream/web' import { CarWriter } from '@ipld/car' import { CID } from 'multiformats/cid' import * as raw from 'multiformats/codecs/raw' @@ -455,10 +456,9 @@ describe('CAR Import', () => { ) const { performUpload } = await import('../../common/upload-flow.js') - expect(vi.mocked(performUpload)).toHaveBeenCalledWith( - expect.any(Object), - expect.any(Uint8Array), - expect.any(Object), + const lastCall = vi.mocked(performUpload).mock.calls.at(-1) + expect(lastCall?.[1]).toBeInstanceOf(ReadableStream) + expect(lastCall?.[3]).toEqual( expect.objectContaining({ pieceMetadata: { ics: '8004' }, }) diff --git a/upload-action/src/filecoin.js b/upload-action/src/filecoin.js index ebeb0284..0c02d1ba 100644 --- a/upload-action/src/filecoin.js +++ b/upload-action/src/filecoin.js @@ -1,4 +1,5 @@ -import { promises as fs } from 'node:fs' +import { createReadStream } from 'node:fs' +import { Readable } from 'node:stream' import { calculateRequiredTopUp, calculateStorageRunway, @@ -125,8 +126,8 @@ export async function handlePayments(synapse, options, logger) { export async function uploadCarToFilecoin(synapse, carPath, ipfsRootCid, options, logger) { const { withCDN, providerAddress, providerId } = options - // Read CAR data - const 
carBytes = await fs.readFile(carPath) + // Stream CAR data + const carStream = Readable.toWeb(createReadStream(carPath)) // Create storage context with provider selection /** @type {CreateStorageContextOptions} */ @@ -151,7 +152,7 @@ export async function uploadCarToFilecoin(synapse, carPath, ipfsRootCid, options console.log('\nStarting upload to storage provider...') console.log('⏳ Uploading data to PDP server...') - const uploadResult = await executeUpload(synapseService, carBytes, cid, { + const uploadResult = await executeUpload(synapseService, carStream, cid, { logger, contextId: `gha-upload-${Date.now()}`, onProgress: (event) => {