aboutsummaryrefslogtreecommitdiff
path: root/node_modules/merge2/index.js
blob: f644e3c86c7b90a4d7d6cf11bd3a75ff3d509b17 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
'use strict'
/*
 * merge2
 * https://github.com/teambition/merge2
 *
 * Copyright (c) 2014-2016 Teambition
 * Licensed under the MIT license.
 */
// Node.js core stream module.
const Stream = require('stream')
// PassThrough is the stream class instantiated further down in this file.
const PassThrough = Stream.PassThrough
// Shorthand for Array.prototype.slice.
const slice = Array.prototype.slice

// The module's sole export is the merge2 function.
module.exports = merge2

/**
 * Merge readable streams into a single PassThrough stream.
 *
 * Every argument is one "group": either a single readable stream or an array
 * of readable streams. Groups are merged strictly one after another, while the
 * streams inside one array are piped into the result at the same time.
 *
 * If the last argument is an object (or function) that is not an array and has
 * no `pipe` property, it is taken as the options. The options are forwarded to
 * the PassThrough constructor after applying these defaults:
 *   - `objectMode`: true
 *   - `highWaterMark`: 64 * 1024
 * and `end: false` keeps the merged stream open once the queue is drained.
 * The caller's options object is never modified.
 *
 * @param {...*} streams groups to merge, optionally followed by options
 * @returns {PassThrough} the merged stream. It carries an extra `add(...)`
 *   method to queue more groups and emits 'queueDrain' each time the queue
 *   runs empty.
 */
function merge2 () {
  const streamsQueue = []
  let merging = false
  const args = Array.from(arguments)
  const lastArg = args[args.length - 1]

  // Work on a shallow copy so the defaults below are never written into an
  // object the caller owns (which may be shared between calls or frozen).
  // Primitives are not options: they stay in `args` and get rejected by the
  // stream validation instead of being silently swallowed.
  let options = {}
  if (
    lastArg &&
    (typeof lastArg === 'object' || typeof lastArg === 'function') &&
    !Array.isArray(lastArg) &&
    lastArg.pipe == null
  ) {
    options = Object.assign({}, args.pop())
  }

  const doEnd = options.end !== false
  if (options.objectMode == null) options.objectMode = true
  if (options.highWaterMark == null) options.highWaterMark = 64 * 1024
  const mergedStream = new PassThrough(options)

  // Queue every argument as its own group, then (re)start merging.
  function addStream () {
    for (let i = 0, len = arguments.length; i < len; i++) {
      streamsQueue.push(pauseStreams(arguments[i], options))
    }
    mergeStream()
    return this
  }

  // Take the next group off the queue and pipe it into the merged stream.
  function mergeStream () {
    if (merging) return
    merging = true

    let streams = streamsQueue.shift()
    if (!streams) {
      // Nothing left: finish asynchronously on the next tick.
      process.nextTick(endStream)
      return
    }
    if (!Array.isArray(streams)) streams = [streams]

    // One slot per stream plus one that is released by the `next()` call after
    // the loop below, so streams that are skipped synchronously cannot advance
    // the queue before every stream of the group has been set up.
    let pipesCount = streams.length + 1

    function next () {
      if (--pipesCount > 0) return
      merging = false
      mergeStream()
    }

    function pipe (stream) {
      function onend () {
        stream.removeListener('merge2UnpipeEnd', onend)
        stream.removeListener('end', onend)
        next()
      }
      // skip ended stream
      if (stream._readableState.endEmitted) return next()

      // A source counts as done when it ends or when it is unpiped from the
      // merged stream (signalled through the 'merge2UnpipeEnd' event).
      stream.on('merge2UnpipeEnd', onend)
      stream.on('end', onend)
      // The merged stream is ended by endStream(), never by a single source.
      stream.pipe(mergedStream, { end: false })
      // compatible for old stream
      stream.resume()
    }

    for (let i = 0; i < streams.length; i++) pipe(streams[i])

    next()
  }

  function endStream () {
    merging = false
    // emit 'queueDrain' when all streams merged.
    mergedStream.emit('queueDrain')
    return doEnd && mergedStream.end()
  }

  // 0 disables the max-listeners limit for the merged stream.
  mergedStream.setMaxListeners(0)
  mergedStream.add = addStream
  // Tell an unpiped source that its part of the merge is over.
  mergedStream.on('unpipe', function (stream) {
    stream.emit('merge2UnpipeEnd')
  })

  if (args.length) addStream.apply(null, args)
  return mergedStream
}

/**
 * Check and pause stream(s) so they can be piped later.
 *
 * Accepts a single stream or an array of streams (arrays are processed
 * recursively). An old-style stream, i.e. an object with `pipe` but without
 * `_readableState`, is piped into a new PassThrough built from `options`, and
 * that PassThrough is used in its place.
 *
 * @param {*} streams a stream or an array of streams
 * @param {Object} [options] options for the PassThrough wrapping old-style streams
 * @returns {*} the paused stream, or a new array of paused streams; the input
 *   array itself is left unmodified
 * @throws {Error} if a value is not a readable stream (including null/undefined)
 */
function pauseStreams (streams, options) {
  if (Array.isArray(streams)) {
    // Build a fresh array instead of overwriting the caller's entries.
    const paused = []
    for (let i = 0, len = streams.length; i < len; i++) {
      paused.push(pauseStreams(streams[i], options))
    }
    return paused
  }

  let stream = streams
  // Backwards-compat with old-style streams
  if (stream != null && !stream._readableState && stream.pipe) {
    stream = stream.pipe(new PassThrough(options))
  }
  // The nullish check makes null/undefined fail with the same descriptive
  // error as any other non-stream value instead of a property-access TypeError.
  if (stream == null || !stream._readableState || !stream.pause || !stream.pipe) {
    throw new Error('Only readable stream can be merged.')
  }
  stream.pause()
  return stream
}