Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 51 additions & 5 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,61 @@
'use strict'

const assert = require('node:assert')
const { finished } = require('node:stream')
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, InvalidReturnValueError } = require('../core/errors')
const util = require('../core/util')
const { addSignal, removeSignal } = require('./abort-signal')

function noop () {}

function getWritableError (stream) {
return stream.errored ?? stream.writableErrored ?? stream._writableState?.errored
}

function createPrematureCloseError () {
const err = new Error('Premature close')
err.code = 'ERR_STREAM_PREMATURE_CLOSE'
return err
}

function trackWritableLifecycle (stream, callback) {
let done = false

const cleanup = () => {
stream.removeListener('close', onClose)
stream.removeListener('error', onError)
stream.removeListener('finish', onFinish)
}

const finish = (err, fromErrorEvent = false) => {
if (done) {
return
}

done = true
cleanup()
callback(err, fromErrorEvent)
}

const onClose = () => {
const err = getWritableError(stream)
finish(err ?? (!stream.writableFinished ? createPrematureCloseError() : undefined))
}

const onError = (err) => finish(err, true)
const onFinish = () => finish()

stream.on('close', onClose)
stream.on('error', onError)
stream.on('finish', onFinish)

if (stream.closed) {
process.nextTick(onClose)
} else if (stream.writableFinished) {
process.nextTick(onFinish)
}
}

class StreamHandler extends AsyncResource {
constructor (opts, factory, callback) {
if (!opts || typeof opts !== 'object') {
Expand Down Expand Up @@ -117,20 +164,19 @@ class StreamHandler extends AsyncResource {
throw new InvalidReturnValueError('expected Writable')
}

// TODO: Avoid finished. It registers an unnecessary amount of listeners.
finished(res, { readable: false }, (err) => {
trackWritableLifecycle(res, (err, fromErrorEvent) => {
const { callback, res, opaque, trailers, abort } = this

this.res = null
if (err || !res?.readable) {
util.destroy(res, err)
util.destroy(res, fromErrorEvent ? undefined : err)
}

this.callback = null
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

if (err) {
abort()
abort(err)
}
})

Expand Down
59 changes: 59 additions & 0 deletions test/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,38 @@ test('stream GET destroy res', async (t) => {
await t.completed
})

test('stream GET destroy res without error', async (t) => {
t = tspl(t, { plan: 1 })

const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.write('hello')
setImmediate(() => {
res.end(' world')
})
})
after(() => server.close())

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
after(() => client.close())

client.stream({
path: '/',
method: 'GET'
}, () => {
const pt = new PassThrough()
pt.on('data', () => {
pt.destroy()
})
return pt
}, (err) => {
t.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE')
})
})

await t.completed
})

test('stream GET remote destroy', async (t) => {
t = tspl(t, { plan: 4 })

Expand Down Expand Up @@ -283,6 +315,33 @@ test('stream waits only for writable side', async (t) => {
await t.completed
})

test('stream accepts already finished writable', async (t) => {
t = tspl(t, { plan: 1 })

const server = createServer({ joinDuplicateHeaders: true }, (req, res) => {
res.end()
})
after(() => server.close())

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
after(() => client.close())

client.stream({
path: '/',
method: 'GET'
}, () => {
const pt = new PassThrough({ autoDestroy: false })
pt.end()
return pt
}, (err) => {
t.ifError(err)
})
})

await t.completed
})

test('stream args validation', async (t) => {
t = tspl(t, { plan: 3 })

Expand Down
Loading