aboutsummaryrefslogtreecommitdiff
path: root/node_modules/merge2/index.mjs
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/merge2/index.mjs')
-rw-r--r--node_modules/merge2/index.mjs109
1 files changed, 109 insertions, 0 deletions
diff --git a/node_modules/merge2/index.mjs b/node_modules/merge2/index.mjs
new file mode 100644
index 0000000..ece6734
--- /dev/null
+++ b/node_modules/merge2/index.mjs
@@ -0,0 +1,109 @@
+'use strict'
+/*
+ * merge2
+ * https://github.com/teambition/merge2
+ *
+ * Copyright (c) 2014-2016 Teambition
+ * Licensed under the MIT license.
+ */
+import Stream from 'stream'
+
+const PassThrough = Stream.PassThrough
+const slice = Array.prototype.slice
+
+function merge2 () {
+ const streamsQueue = []
+ let merging = false
+ let args = slice.call(arguments)
+ let options = args[args.length - 1]
+
+ if (options && !Array.isArray(options) && options.pipe == null) args.pop()
+ else options = {}
+
+ let doEnd = options.end !== false
+ if (options.objectMode == null) options.objectMode = true
+ if (options.highWaterMark == null) options.highWaterMark = 64 * 1024
+ const mergedStream = PassThrough(options)
+
+ function addStream () {
+ for (let i = 0, len = arguments.length; i < len; i++) {
+ streamsQueue.push(pauseStreams(arguments[i], options))
+ }
+ mergeStream()
+ return this
+ }
+
+ function mergeStream () {
+ if (merging) return
+ merging = true
+
+ let streams = streamsQueue.shift()
+ if (!streams) {
+ process.nextTick(endStream)
+ return
+ }
+ if (!Array.isArray(streams)) streams = [streams]
+
+ let pipesCount = streams.length + 1
+
+ function next () {
+ if (--pipesCount > 0) return
+ merging = false
+ mergeStream()
+ }
+
+ function pipe (stream) {
+ function onend () {
+ stream.removeListener('merge2UnpipeEnd', onend)
+ stream.removeListener('end', onend)
+ next()
+ }
+ // skip ended stream
+ if (stream._readableState.endEmitted) return next()
+
+ stream.on('merge2UnpipeEnd', onend)
+ stream.on('end', onend)
+ stream.pipe(mergedStream, { end: false })
+ // compatible for old stream
+ stream.resume()
+ }
+
+ for (let i = 0; i < streams.length; i++) pipe(streams[i])
+
+ next()
+ }
+
+ function endStream () {
+ merging = false
+ // emit 'queueDrain' when all streams merged.
+ mergedStream.emit('queueDrain')
+ return doEnd && mergedStream.end()
+ }
+
+ mergedStream.setMaxListeners(0)
+ mergedStream.add = addStream
+ mergedStream.on('unpipe', function (stream) {
+ stream.emit('merge2UnpipeEnd')
+ })
+
+ if (args.length) addStream.apply(null, args)
+ return mergedStream
+}
+
+// check and pause streams for pipe.
+function pauseStreams (streams, options) {
+ if (!Array.isArray(streams)) {
+ // Backwards-compat with old-style streams
+ if (!streams._readableState && streams.pipe) streams = streams.pipe(PassThrough(options))
+ if (!streams._readableState || !streams.pause || !streams.pipe) {
+ throw new Error('Only readable stream can be merged.')
+ }
+ streams.pause()
+ } else {
+ for (let i = 0, len = streams.length; i < len; i++) streams[i] = pauseStreams(streams[i], options)
+ }
+ return streams
+}
+
+export default merge2
+export { merge2 }