// Source listing metadata: 538 lines, 13 KiB, JavaScript
'use strict'

const { version } = require('./package.json')
const { EventEmitter } = require('events')
const { Worker } = require('worker_threads')
const { join } = require('path')
const { pathToFileURL } = require('url')
const { wait } = require('./lib/wait')
const {
  WRITE_INDEX,
  READ_INDEX
} = require('./lib/indexes')
const buffer = require('buffer')
const assert = require('assert')
 | 
						|
 | 
						|
// Symbol under which each stream instance stores its internal state object.
const kImpl = Symbol('kImpl')

// V8 limit for string size
const MAX_STRING = buffer.constants.MAX_STRING_LENGTH
 | 
						|
 | 
						|
/**
 * Stand-in with the same `deref()` surface as WeakRef that keeps a strong
 * reference: `deref()` always returns the value given to the constructor.
 */
class FakeWeakRef {
  constructor (value) {
    this._value = value
  }

  deref () {
    return this._value
  }
}
 | 
						|
 | 
						|
/**
 * No-op stand-in for FinalizationRegistry: `register` and `unregister`
 * accept any arguments and do nothing, so no cleanup callback ever runs.
 */
class FakeFinalizationRegistry {
  register () {}

  unregister () {}
}
 | 
						|
 | 
						|
// Currently using FinalizationRegistry with code coverage breaks the world
// Ref: https://github.com/nodejs/node/issues/49344
// The fakes are used when NODE_V8_COVERAGE is set or when the runtime does
// not provide the real implementations.
const FinalizationRegistry = process.env.NODE_V8_COVERAGE ? FakeFinalizationRegistry : globalThis.FinalizationRegistry || FakeFinalizationRegistry
const WeakRef = process.env.NODE_V8_COVERAGE ? FakeWeakRef : globalThis.WeakRef || FakeWeakRef

// Streams are registered with their worker as the held value; when the
// registry callback runs, the worker is terminated unless it already exited.
const registry = new FinalizationRegistry((worker) => {
  if (worker.exited) {
    return
  }
  worker.terminate()
})
 | 
						|
 | 
						|
/**
 * Spawns the worker thread that is handed the stream's shared buffers.
 *
 * @param {object} stream stream whose kImpl state provides `dataBuf` and `stateBuf`
 * @param {object} opts
 * @param {string} opts.filename path or `file://` URL; plain paths are converted to a file URL
 * @param {object} [opts.workerData] spread into the nested `workerData` after `$context`
 * @param {object} [opts.workerOpts] spread into the Worker options before the fixed ones
 * @returns {Worker} the worker, with `worker.stream` set and 'message'/'exit' handlers attached
 */
function createWorker (stream, opts) {
  const { filename, workerData } = opts

  // A bundler may publish an alternative location for the worker entry point.
  const bundlerOverrides = '__bundlerPathsOverrides' in globalThis ? globalThis.__bundlerPathsOverrides : {}
  const toExecute = bundlerOverrides['thread-stream-worker'] || join(__dirname, 'lib', 'worker.js')

  const worker = new Worker(toExecute, {
    ...opts.workerOpts,
    trackUnmanagedFds: false,
    workerData: {
      filename: filename.startsWith('file://')
        ? filename
        : pathToFileURL(filename).href,
      dataBuf: stream[kImpl].dataBuf,
      stateBuf: stream[kImpl].stateBuf,
      workerData: {
        $context: {
          threadStreamVersion: version
        },
        ...workerData
      }
    }
  })

  // We keep a strong reference for now,
  // we need to start writing first
  worker.stream = new FakeWeakRef(stream)

  worker.on('message', onWorkerMessage)
  worker.on('exit', onWorkerExit)
  registry.register(stream, worker)

  return worker
}
 | 
						|
 | 
						|
/**
 * Emits 'drain' once if the stream is flagged as needing it, clearing the flag.
 * Asserts that the stream is not in sync mode.
 *
 * @param {object} stream stream carrying kImpl state and an `emit` method
 */
function drain (stream) {
  assert(!stream[kImpl].sync)
  if (stream[kImpl].needDrain) {
    stream[kImpl].needDrain = false
    stream.emit('drain')
  }
}
 | 
						|
 | 
						|
/**
 * Asynchronous write pump: moves as much of the pending string (`buf`) as
 * fits into the shared data buffer, then re-invokes itself until `buf` is
 * empty. When the shared buffer is full it waits (via `stream.flush`) for
 * the reader to catch up and rewinds both indexes to 0.
 *
 * @param {object} stream stream carrying kImpl state
 */
function nextFlush (stream) {
  const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
  let leftover = stream[kImpl].data.length - writeIndex

  if (leftover > 0) {
    if (stream[kImpl].buf.length === 0) {
      // Nothing left to pump: leave flushing mode and run deferred work.
      stream[kImpl].flushing = false

      if (stream[kImpl].ending) {
        end(stream)
      } else if (stream[kImpl].needDrain) {
        process.nextTick(drain, stream)
      }

      return
    }

    // `leftover` is a byte count but slices characters, so the chunk may
    // encode to more bytes than are free; that case is handled below.
    const toWrite = stream[kImpl].buf.slice(0, leftover)
    const toWriteBytes = Buffer.byteLength(toWrite)
    if (toWriteBytes <= leftover) {
      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
      // process._rawDebug('writing ' + toWrite.length)
      write(stream, toWrite, nextFlush.bind(null, stream))
    } else {
      // multi-byte utf-8
      stream.flush(() => {
        // err is already handled in flush()
        if (stream.destroyed) {
          return
        }

        Atomics.store(stream[kImpl].state, READ_INDEX, 0)
        Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)

        // Re-slice from the current buffer: a synchronous flush may have
        // drained `buf` while we were waiting, and the chunk captured
        // before the wait would then be written a second time.
        let chunk = stream[kImpl].buf.slice(0, leftover)
        let chunkBytes = Buffer.byteLength(chunk)

        // Find a toWrite length that fits the buffer
        // it must exists as the buffer is at least 4 bytes length
        // and the max utf-8 length for a char is 4 bytes.
        while (chunkBytes > stream[kImpl].data.length) {
          leftover = Math.floor(leftover / 2)
          chunk = stream[kImpl].buf.slice(0, leftover)
          chunkBytes = Buffer.byteLength(chunk)
        }
        stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
        write(stream, chunk, nextFlush.bind(null, stream))
      })
    }
  } else if (leftover === 0) {
    if (writeIndex === 0 && stream[kImpl].buf.length === 0) {
      // we had a flushSync in the meanwhile
      return
    }
    stream.flush(() => {
      // err is already handled in flush(); do not rewind the indexes or
      // keep pumping once the stream has been destroyed.
      if (stream.destroyed) {
        return
      }

      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
      nextFlush(stream)
    })
  } else {
    // This should never happen
    destroy(stream, new Error('overwritten'))
  }
}
 | 
						|
 | 
						|
/**
 * 'message' handler attached to the worker (`this` is the worker).
 * Dispatches on `msg.code`:
 *   READY   - swaps the strong stream reference for a WeakRef, then flushes
 *             and marks the stream ready / emits 'ready'
 *   ERROR   - destroys the stream with `msg.err`
 *   EVENT   - re-emits `msg.name` on the stream (array args are spread)
 *   WARNING - forwards `msg.err` to process.emitWarning
 * Any other code destroys the stream. If the stream is no longer reachable
 * the worker is marked exited and terminated.
 *
 * @param {object} msg message posted by the worker
 */
function onWorkerMessage (msg) {
  const stream = this.stream.deref()
  if (stream === undefined) {
    this.exited = true
    // Terminate the worker.
    this.terminate()
    return
  }

  switch (msg.code) {
    case 'READY':
      // Replace the FakeWeakRef with a
      // proper one.
      this.stream = new WeakRef(stream)

      stream.flush(() => {
        stream[kImpl].ready = true
        stream.emit('ready')
      })
      break
    case 'ERROR':
      destroy(stream, msg.err)
      break
    case 'EVENT':
      if (Array.isArray(msg.args)) {
        stream.emit(msg.name, ...msg.args)
      } else {
        stream.emit(msg.name, msg.args)
      }
      break
    case 'WARNING':
      process.emitWarning(msg.err)
      break
    default:
      destroy(stream, new Error('this should not happen: ' + msg.code))
  }
}
 | 
						|
 | 
						|
/**
 * 'exit' handler attached to the worker (`this` is the worker).
 * Unregisters the stream from the finalization registry, marks the worker
 * as exited, detaches itself and destroys the stream. A non-zero exit code
 * destroys the stream with an error; exit code 0 destroys it without one.
 *
 * @param {number} code worker exit code
 */
function onWorkerExit (code) {
  const stream = this.stream.deref()
  if (stream === undefined) {
    // Nothing to do, the worker already exit
    return
  }
  registry.unregister(stream)
  stream.worker.exited = true
  stream.worker.off('exit', onWorkerExit)
  destroy(stream, code !== 0 ? new Error('the worker thread exited') : null)
}
 | 
						|
 | 
						|
/**
 * Writable-like stream that hands string data to a worker thread through a
 * pair of shared buffers: `dataBuf` carries the bytes and `stateBuf` carries
 * the read/write indexes. Internal state lives under the `kImpl` symbol.
 *
 * Emits 'ready', 'drain', 'finish', 'error' and 'close', plus any events
 * relayed from the worker. 'message' events emitted on the stream are posted
 * to the worker.
 */
class ThreadStream extends EventEmitter {
  /**
   * @param {object} [opts]
   * @param {number} [opts.bufferSize] size in bytes of the shared data buffer;
   *   defaults to 4 MiB and must be at least 4
   * @param {boolean} [opts.sync] when truthy every write() is pushed to the
   *   shared buffer synchronously
   * The same `opts` object is passed on to createWorker.
   * @throws {Error} if `opts.bufferSize` is smaller than 4
   */
  constructor (opts = {}) {
    super()

    if (opts.bufferSize < 4) {
      throw new Error('bufferSize must at least fit a 4-byte utf-8 char')
    }

    this[kImpl] = {}
    this[kImpl].stateBuf = new SharedArrayBuffer(128)
    this[kImpl].state = new Int32Array(this[kImpl].stateBuf)
    this[kImpl].dataBuf = new SharedArrayBuffer(opts.bufferSize || 4 * 1024 * 1024)
    this[kImpl].data = Buffer.from(this[kImpl].dataBuf)
    this[kImpl].sync = opts.sync || false
    this[kImpl].ending = false
    this[kImpl].ended = false
    this[kImpl].needDrain = false
    this[kImpl].destroyed = false
    this[kImpl].flushing = false
    this[kImpl].ready = false
    this[kImpl].finished = false
    this[kImpl].errored = null
    this[kImpl].closed = false
    // Pending data not yet copied into the shared buffer.
    this[kImpl].buf = ''

    // TODO (fix): Make private?
    this.worker = createWorker(this, opts) // TODO (fix): make private
    this.on('message', (message, transferList) => {
      this.worker.postMessage(message, transferList)
    })
  }

  /**
   * Queues `data` for the worker.
   *
   * @param {string} data text appended to the pending buffer
   * @returns {boolean} false when the stream is destroyed or ending (an
   *   'error' event is scheduled), when a synchronous write fails (the stream
   *   is destroyed), or when the caller should wait for 'drain'; true otherwise
   */
  write (data) {
    if (this[kImpl].destroyed) {
      error(this, new Error('the worker has exited'))
      return false
    }

    if (this[kImpl].ending) {
      error(this, new Error('the worker is ending'))
      return false
    }

    // Appending would reach the maximum string length: push the backlog out
    // synchronously first, then stay in flushing mode.
    if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) {
      try {
        writeSync(this)
        this[kImpl].flushing = true
      } catch (err) {
        destroy(this, err)
        return false
      }
    }

    this[kImpl].buf += data

    if (this[kImpl].sync) {
      try {
        writeSync(this)
        return true
      } catch (err) {
        destroy(this, err)
        return false
      }
    }

    if (!this[kImpl].flushing) {
      this[kImpl].flushing = true
      setImmediate(nextFlush, this)
    }

    // Backpressure: the pending string no longer fits in the space left after
    // the current write index.
    this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0
    return !this[kImpl].needDrain
  }

  /**
   * Marks the stream as ending and starts the end sequence. No-op once the
   * stream is destroyed.
   */
  end () {
    if (this[kImpl].destroyed) {
      return
    }

    this[kImpl].ending = true
    end(this)
  }

  /**
   * Waits until the read index reaches the write index observed when the call
   * was made, then invokes `cb` on the next tick.
   *
   * @param {Function} [cb] called with an error if the stream is destroyed or
   *   the wait fails, otherwise with no arguments. Optional: non-function
   *   values are ignored.
   */
  flush (cb) {
    // process.nextTick() throws when its callback is not a function, so a
    // missing callback must never be scheduled.
    const hasCallback = typeof cb === 'function'

    if (this[kImpl].destroyed) {
      if (hasCallback) {
        process.nextTick(cb, new Error('the worker has exited'))
      }
      return
    }

    // TODO write all .buf
    const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX)
    // process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`)
    wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
      if (err) {
        destroy(this, err)
        if (hasCallback) {
          process.nextTick(cb, err)
        }
        return
      }
      if (res === 'not-equal') {
        // TODO handle deadlock
        this.flush(cb)
        return
      }
      if (hasCallback) {
        process.nextTick(cb)
      }
    })
  }

  /**
   * Synchronously pushes all pending data into the shared buffer and blocks
   * until it has been read. No-op once the stream is destroyed.
   */
  flushSync () {
    if (this[kImpl].destroyed) {
      return
    }

    writeSync(this)
    flushSync(this)
  }

  /** Calls `unref()` on the worker. */
  unref () {
    this.worker.unref()
  }

  /** Calls `ref()` on the worker. */
  ref () {
    this.worker.ref()
  }

  /** True once the READY handshake flush has completed. */
  get ready () {
    return this[kImpl].ready
  }

  get destroyed () {
    return this[kImpl].destroyed
  }

  /** True once 'close' has been (or is about to be) emitted. */
  get closed () {
    return this[kImpl].closed
  }

  get writable () {
    return !this[kImpl].destroyed && !this[kImpl].ending
  }

  get writableEnded () {
    return this[kImpl].ending
  }

  get writableFinished () {
    return this[kImpl].finished
  }

  get writableNeedDrain () {
    return this[kImpl].needDrain
  }

  get writableObjectMode () {
    return false
  }

  /** The error the stream was destroyed with, or null. */
  get writableErrored () {
    return this[kImpl].errored
  }
}
 | 
						|
 | 
						|
/**
 * Emits 'error' on the stream asynchronously (via setImmediate), so callers
 * can attach listeners after the call that triggered the error returns.
 *
 * @param {object} stream emitter to notify
 * @param {*} err value passed to the 'error' listeners
 */
function error (stream, err) {
  setImmediate(() => {
    stream.emit('error', err)
  })
}
 | 
						|
 | 
						|
/**
 * Destroys the stream once; later calls are ignored.
 * Records and reports `err` when given, terminates the worker if it has not
 * exited yet, and finally marks the stream closed and emits 'close'.
 *
 * @param {object} stream stream carrying kImpl state and a `worker`
 * @param {*} [err] error to store in `errored` and emit; falsy for a clean destroy
 */
function destroy (stream, err) {
  if (stream[kImpl].destroyed) {
    return
  }
  stream[kImpl].destroyed = true

  if (err) {
    stream[kImpl].errored = err
    error(stream, err)
  }

  if (!stream.worker.exited) {
    stream.worker.terminate()
      // A failed terminate must not prevent 'close' from being emitted.
      .catch(() => {})
      .then(() => {
        stream[kImpl].closed = true
        stream.emit('close')
      })
  } else {
    setImmediate(() => {
      stream[kImpl].closed = true
      stream.emit('close')
    })
  }
}
 | 
						|
 | 
						|
/**
 * Copies `data` into the shared data buffer at the current write index,
 * advances the index by the encoded byte length, notifies waiters on the
 * write index and then invokes `cb` synchronously.
 *
 * The caller must ensure the encoded data fits in the space left after the
 * write index; no bounds check is done here.
 *
 * @param {object} stream stream carrying kImpl state
 * @param {string} data text to copy
 * @param {Function} cb invoked with no arguments after the index is published
 * @returns {boolean} always true
 */
function write (stream, data, cb) {
  // data is smaller than the shared buffer length
  const current = Atomics.load(stream[kImpl].state, WRITE_INDEX)
  const length = Buffer.byteLength(data)
  stream[kImpl].data.write(data, current)
  Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
  Atomics.notify(stream[kImpl].state, WRITE_INDEX)
  cb()
  return true
}
 | 
						|
 | 
						|
/**
 * Runs the end sequence once: flushes synchronously, publishes -1 as the
 * write index, then blocks until the read index becomes -1 before scheduling
 * 'finish'.
 *
 * Does nothing if the stream already ended, is not ending, or is still
 * flushing (the flush paths call end() again when they complete).
 * The stream is destroyed if the read index becomes -2, if the wait exceeds
 * ten one-second rounds, or if anything in the sequence throws.
 *
 * @param {object} stream stream carrying kImpl state
 */
function end (stream) {
  if (stream[kImpl].ended || !stream[kImpl].ending || stream[kImpl].flushing) {
    return
  }
  stream[kImpl].ended = true

  try {
    stream.flushSync()

    let readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)

    // process._rawDebug('writing index')
    Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
    // process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`)
    Atomics.notify(stream[kImpl].state, WRITE_INDEX)

    // Wait for the process to complete
    let spins = 0
    while (readIndex !== -1) {
      // process._rawDebug(`read = ${read}`)
      // Blocks this thread for up to one second per round.
      Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
      readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)

      if (readIndex === -2) {
        destroy(stream, new Error('end() failed'))
        return
      }

      if (++spins === 10) {
        destroy(stream, new Error('end() took too long (10s)'))
        return
      }
    }

    process.nextTick(() => {
      stream[kImpl].finished = true
      stream.emit('finish')
    })
  } catch (err) {
    destroy(stream, err)
  }
  // process._rawDebug('end finished...')
}
 | 
						|
 | 
						|
/**
 * Synchronously drains the pending string (`buf`) into the shared data
 * buffer, blocking in flushSync() whenever the buffer has to be emptied by
 * the reader before more data fits. Clears the `flushing` flag first.
 *
 * @param {object} stream stream carrying kImpl state
 * @throws {Error} 'overwritten' if the write index is past the buffer end,
 *   or whatever flushSync() throws
 */
function writeSync (stream) {
  // Runs after every chunk: finish a pending end() or schedule 'drain'.
  const cb = () => {
    if (stream[kImpl].ending) {
      end(stream)
    } else if (stream[kImpl].needDrain) {
      process.nextTick(drain, stream)
    }
  }
  stream[kImpl].flushing = false

  while (stream[kImpl].buf.length !== 0) {
    const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
    let leftover = stream[kImpl].data.length - writeIndex
    if (leftover === 0) {
      // Buffer full: wait for the reader, rewind, and try again.
      flushSync(stream)
      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
      continue
    } else if (leftover < 0) {
      // stream should never happen
      throw new Error('overwritten')
    }

    // `leftover` is a byte count but slices characters, so the chunk may
    // encode to more bytes than are free; that case is handled below.
    let toWrite = stream[kImpl].buf.slice(0, leftover)
    let toWriteBytes = Buffer.byteLength(toWrite)
    if (toWriteBytes <= leftover) {
      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
      // process._rawDebug('writing ' + toWrite.length)
      write(stream, toWrite, cb)
    } else {
      // multi-byte utf-8
      flushSync(stream)
      Atomics.store(stream[kImpl].state, READ_INDEX, 0)
      Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)

      // Find a toWrite length that fits the buffer
      // it must exists as the buffer is at least 4 bytes length
      // and the max utf-8 length for a char is 4 bytes.
      // The bound is the shared buffer's capacity (data.length), not the
      // length of the pending string: after the rewind the whole buffer is
      // free, and a chunk larger than it would overrun the write index.
      while (toWriteBytes > stream[kImpl].data.length) {
        leftover = Math.floor(leftover / 2)
        toWrite = stream[kImpl].buf.slice(0, leftover)
        toWriteBytes = Buffer.byteLength(toWrite)
      }
      stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
      write(stream, toWrite, cb)
    }
  }
}
 | 
						|
 | 
						|
/**
 * Blocks until the read index equals the write index observed on entry.
 * Polls in rounds that each wait up to one second on the read index.
 *
 * @param {object} stream stream carrying kImpl state
 * @throws {Error} if the stream is in async flushing mode, if the read index
 *   becomes -2, or if ten rounds pass without the reader catching up
 */
function flushSync (stream) {
  if (stream[kImpl].flushing) {
    throw new Error('unable to flush while flushing')
  }

  // process._rawDebug('flushSync started')

  const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)

  let spins = 0

  // TODO handle deadlock
  while (true) {
    const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)

    if (readIndex === -2) {
      throw new Error('_flushSync failed')
    }

    // process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)
    if (readIndex !== writeIndex) {
      // TODO stream timeouts for some reason.
      Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
    } else {
      break
    }

    if (++spins === 10) {
      throw new Error('_flushSync took too long (10s)')
    }
  }
  // process._rawDebug('flushSync finished')
}
 | 
						|
 | 
						|
module.exports = ThreadStream
 |