193 lines
4.3 KiB
JavaScript

const { Duplex, pipeline } = require('streamx')
module.exports = class Composer extends Duplex {
constructor (opts) {
super(opts)
this._writable = null
this._readable = null
this._isPipeline = false
this._pipelineMissing = 2
this._writeCallback = null
this._finalCallback = null
this._ondata = this._pushData.bind(this)
this._onend = this._pushEnd.bind(this, null)
this._ondrain = this._continueWrite.bind(this, null)
this._onfinish = this._maybeFinal.bind(this)
this._onerror = this.destroy.bind(this)
this._onclose = this.destroy.bind(this, null)
}
static pipeline (...streams) {
const c = new Composer()
c.setPipeline(...streams)
return c
}
static duplexer (ws = null, rs = null) {
const c = new Composer()
c.setWritable(ws)
c.setReadable(rs)
return c
}
setPipeline (first, ...streams) {
const all = Array.isArray(first) ? first : [first, ...streams]
this._isPipeline = true
this.setWritable(all[0])
this.setReadable(all[all.length - 1])
pipeline(all, (err) => {
if (err) this.destroy(err)
})
return this
}
setReadable (rs) {
if (this._readable) {
this._readable.removeListener('data', this._ondata)
this._readable.removeListener('end', this._onend)
this._readable.removeListener('error', this._onerror)
this._readable.removeListener('close', this._onclose)
}
if (rs === null) {
this._readable = null
this.push(null)
this.resume()
return
}
this._readable = rs
this._readable.on('data', this._ondata)
this._readable.on('end', this._onend)
this._readable.on('error', this._onerror)
this._readable.on('close', this._onclose)
if (this.destroying && this._readable.destroy) {
this._readable.destroy()
}
return this
}
setWritable (ws) {
if (this._writable) {
this._writable.removeListener('drain', this._ondrain)
this._writable.removeListener('finish', this._onfinish)
this._writable.removeListener('error', this._onerror)
this._writable.removeListener('close', this._onclose)
}
if (ws === null) {
this._writable = null
this._continueWrite(null)
this.end()
return
}
this._writable = ws
this._writable.on('drain', this._ondrain)
this._writable.on('finish', this._onfinish)
this._writable.on('error', this._onerror)
this._writable.on('close', this._onclose)
if (this.destroying && this._writable.destroy) {
this._writable.destroy()
}
return this
}
_read (cb) {
if (this._readable !== null) {
this._readable.resume()
}
cb(null)
}
_pushData (data) {
if (this.push(data) === false && this._readable !== null) {
this._readable.pause()
}
}
_pushEnd () {
if (this._isPipeline) {
this.on('end', this._decrementPipeline.bind(this))
}
this.push(null)
if (this._readable !== null) {
this._readable.removeListener('close', this._onclose)
}
}
_decrementPipeline () {
if (--this._pipelineMissing === 0) this._continueFinal(null)
}
_maybeFinal () {
if (this._writable !== null) {
this._writable.removeListener('close', this._onclose)
}
if (this._isPipeline) this._decrementPipeline()
else this._continueFinal(null)
}
_continueFinal (err) {
if (this._finalCallback === null) return
const cb = this._finalCallback
this._finalCallback = null
cb(err)
}
_continueWrite (err) {
if (this._writeCallback === null) return
const cb = this._writeCallback
this._writeCallback = null
cb(err)
}
_predestroy () {
if (this._writable !== null && this._writable.destroy) this._writable.destroy()
if (this._readable !== null && this._readable.destroy) this._readable.destroy()
this._continueWrite(new Error('Stream destroyed'))
this._continueFinal(new Error('Stream destroyed'))
}
_writev (datas, cb) {
if (this._writable === null) {
return cb(null)
}
let flushed = true
for (const data of datas) {
flushed = this._writable.write(data)
}
if (!flushed) {
this._writeCallback = cb
return
}
cb(null)
}
_final (cb) {
if (this._writable === null) {
return cb(null)
}
this._finalCallback = cb
this._writable.end()
}
}