1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
|
'use strict';
const Readable = require('stream').Readable;
const EventEmitter = require('events').EventEmitter;
const path = require('path');
const normalizeOptions = require('./normalize-options');
const stat = require('./stat');
const call = require('./call');
/**
 * Reads the contents of a directory (and, depending on the options, its subdirectories)
 * through the configured facade, and streams the results via a {@link stream.Readable}.
 */
class DirectoryReader {
  /**
   * @param {string} dir - The absolute or relative directory path to read
   * @param {object} [options] - User-specified options, if any (see {@link normalizeOptions})
   * @param {object} internalOptions - Internal options that aren't part of the public API
   * @class
   */
  constructor (dir, options, internalOptions) {
    const normalized = normalizeOptions(options, internalOptions);
    this.options = normalized;

    // Indicates whether we should keep reading.
    // This is set false if stream.Readable.push() returns false.
    this.shouldRead = true;

    // The directories to read
    // (initialized with the top-level directory)
    this.queue = [{
      path: dir,
      basePath: normalized.basePath,
      posixBasePath: normalized.posixBasePath,
      depth: 0
    }];

    // The number of directories that are currently being processed
    this.pending = 0;

    // The data that has been read, but not yet emitted
    this.buffer = [];

    this.stream = new Readable({ objectMode: true });
    this.stream._read = () => {
      // Start (or resume) reading
      this.shouldRead = true;

      // If we have data in the buffer, then send the next chunk
      if (this.buffer.length > 0) {
        this.pushFromBuffer();
      }

      // If we have directories queued, then start processing the next one
      if (this.queue.length > 0) {
        if (this.options.facade.sync) {
          // A synchronous facade drains the whole queue in one go
          while (this.queue.length > 0) {
            this.readNextDirectory();
          }
        }
        else {
          this.readNextDirectory();
        }
      }

      this.checkForEOF();
    };
  }

  /**
   * Reads the next directory in the queue.
   *
   * The directory is removed from the queue and counted as pending until
   * {@link DirectoryReader#finishedReadingDirectory} is called for it.
   */
  readNextDirectory () {
    const facade = this.options.facade;
    const dir = this.queue.shift();
    this.pending++;

    // Read the directory listing
    call.safe(facade.fs.readdir, dir.path, (err, items) => {
      if (err) {
        // fs.readdir threw an error
        this.emit('error', err);
        return this.finishedReadingDirectory();
      }

      try {
        // Process each item in the directory (simultaneously, if async)
        facade.forEach(
          items,
          this.processItem.bind(this, dir),
          this.finishedReadingDirectory.bind(this)
        );
      }
      catch (err2) {
        // facade.forEach threw an error
        // (probably because fs.readdir returned an invalid result)
        this.emit('error', err2);
        this.finishedReadingDirectory();
      }
    });
  }

  /**
   * This method is called after all items in a directory have been processed.
   *
   * NOTE: This does not necessarily mean that the reader is finished, since there may still
   * be other directories queued or pending.
   */
  finishedReadingDirectory () {
    this.pending--;

    if (this.shouldRead) {
      // If we have directories queued, then start processing the next one
      if (this.queue.length > 0 && this.options.facade.async) {
        this.readNextDirectory();
      }

      this.checkForEOF();
    }
  }

  /**
   * Determines whether the reader has finished processing all items in all directories.
   * If so, then the "end" event is fired (via {@link Readable#push})
   */
  checkForEOF () {
    if (this.buffer.length === 0 && // The stuff we've already read
    this.pending === 0 && // The stuff we're currently reading
    this.queue.length === 0) { // The stuff we haven't read yet
      // There's no more stuff!
      this.stream.push(null);
    }
  }

  /**
   * Processes a single item in a directory.
   *
   * If the item is a directory, and `option.deep` is enabled, then the item will be added
   * to the directory queue.
   *
   * If the item meets the filter criteria, then it will be emitted to the reader's stream.
   *
   * @param {object} dir - A directory object from the queue
   * @param {string} item - The name of the item (name only, no path)
   * @param {function} done - A callback function that is called after the item has been processed
   */
  processItem (dir, item, done) {
    const stream = this.stream;
    const options = this.options;
    const itemPath = dir.basePath + item;
    const posixPath = dir.posixBasePath + item;
    const fullPath = path.join(dir.path, item);

    // If `options.deep` is a number, and we've already recursed to the max depth,
    // then there's no need to check fs.Stats to know if it's a directory.
    // If `options.deep` is a function, then we'll need fs.Stats
    const maxDepthReached = dir.depth >= options.recurseDepth;

    // Do we need to call `fs.stat`?
    // (listener counts use the instance method; the static EventEmitter.listenerCount is deprecated)
    const needStats =
      !maxDepthReached || // we need the fs.Stats to know if it's a directory
      options.stats || // the user wants fs.Stats objects returned
      options.recurseFn || // we need fs.Stats for the recurse function
      options.filterFn || // we need fs.Stats for the filter function
      stream.listenerCount('file') || // we need the fs.Stats to know if it's a file
      stream.listenerCount('directory') || // we need the fs.Stats to know if it's a directory
      stream.listenerCount('symlink'); // we need the fs.Stats to know if it's a symlink

    // If we don't need stats, then exit early
    if (!needStats) {
      if (this.filter(itemPath, posixPath)) {
        this.pushOrBuffer({ data: itemPath });
      }
      return done();
    }

    // Get the fs.Stats object for this path
    stat(options.facade.fs, fullPath, (err, stats) => {
      if (err) {
        // fs.stat threw an error
        this.emit('error', err);
        return done();
      }

      try {
        // Add the item's path to the fs.Stats object
        // The base of this path, and its separators are determined by the options
        // (i.e. options.basePath and options.sep)
        stats.path = itemPath;

        // Add the depth of the path to the fs.Stats object for use in the filter function
        stats.depth = dir.depth;

        if (this.shouldRecurse(stats, posixPath, maxDepthReached)) {
          // Add this subdirectory to the queue
          this.queue.push({
            path: fullPath,
            basePath: itemPath + options.sep,
            posixBasePath: posixPath + '/',
            depth: dir.depth + 1,
          });
        }

        // Determine whether this item matches the filter criteria
        if (this.filter(stats, posixPath)) {
          this.pushOrBuffer({
            data: options.stats ? stats : itemPath,
            file: stats.isFile(),
            directory: stats.isDirectory(),
            symlink: stats.isSymbolicLink(),
          });
        }

        done();
      }
      catch (err2) {
        // An error occurred while processing the item
        // (probably during a user-specified function, such as options.deep, options.filter, etc.)
        this.emit('error', err2);
        done();
      }
    });
  }

  /**
   * Pushes the given chunk of data to the stream, or adds it to the buffer,
   * depending on the state of the stream.
   *
   * @param {object} chunk
   */
  pushOrBuffer (chunk) {
    // Add the chunk to the buffer
    this.buffer.push(chunk);

    // If we're still reading, then immediately emit the next chunk in the buffer
    // (which may or may not be the chunk that we just added)
    if (this.shouldRead) {
      this.pushFromBuffer();
    }
  }

  /**
   * Immediately pushes the next chunk in the buffer to the reader's stream
   * (via {@link Readable#push}), and records whether the stream wants more data.
   * In addition, the "file", "directory", and/or "symlink" events may be fired,
   * depending on the type properties of the chunk.
   *
   * Does nothing if the buffer is empty.
   */
  pushFromBuffer () {
    // Nothing buffered, so there is nothing to push or announce
    if (this.buffer.length === 0) {
      return;
    }

    const stream = this.stream;
    const chunk = this.buffer.shift();

    // Stream the data
    try {
      this.shouldRead = stream.push(chunk.data);
    }
    catch (err) {
      this.emit('error', err);
    }

    // Also emit specific events, based on the type of chunk
    chunk.file && this.emit('file', chunk.data);
    chunk.symlink && this.emit('symlink', chunk.data);
    chunk.directory && this.emit('directory', chunk.data);
  }

  /**
   * Determines whether the given directory meets the user-specified recursion criteria.
   * If the user didn't specify recursion criteria, then this function will default to true.
   *
   * @param {fs.Stats} stats - The directory's {@link fs.Stats} object
   * @param {string} posixPath - The item's POSIX path (used for glob matching)
   * @param {boolean} maxDepthReached - Whether we've already crawled the user-specified depth
   * @returns {boolean}
   */
  shouldRecurse (stats, posixPath, maxDepthReached) {
    const options = this.options;

    if (maxDepthReached) {
      // We've already crawled to the maximum depth. So no more recursion.
      return false;
    }

    if (!stats.isDirectory()) {
      // It's not a directory. So don't try to crawl it.
      return false;
    }

    if (options.recurseGlob) {
      // Glob patterns are always tested against the POSIX path, even on Windows
      // https://github.com/isaacs/node-glob#windows
      return options.recurseGlob.test(posixPath);
    }

    if (options.recurseRegExp) {
      // Regular expressions are tested against the normal path
      // (based on the OS or options.sep)
      return options.recurseRegExp.test(stats.path);
    }

    if (options.recurseFn) {
      try {
        // Run the user-specified recursion criteria
        return Boolean(options.recurseFn.call(null, stats));
      }
      catch (err) {
        // An error occurred in the user's code.
        // Emit an "error" event, and don't crawl this directory, so processing can continue.
        this.emit('error', err);
        return false;
      }
    }

    // No recursion criteria were specified, and we're within the maximum depth.
    // So crawl this directory.
    return true;
  }

  /**
   * Determines whether the given item meets the user-specified filter criteria.
   * If the user didn't specify a filter, then this function will always return true.
   *
   * @param {string|fs.Stats} value - Either the item's path, or the item's {@link fs.Stats} object
   * @param {string} posixPath - The item's POSIX path (used for glob matching)
   * @returns {boolean}
   */
  filter (value, posixPath) {
    const options = this.options;

    if (options.filterGlob) {
      // Glob patterns are always tested against the POSIX path, even on Windows
      // https://github.com/isaacs/node-glob#windows
      return options.filterGlob.test(posixPath);
    }

    if (options.filterRegExp) {
      // Regular expressions are tested against the normal path
      // (based on the OS or options.sep)
      return options.filterRegExp.test(value.path || value);
    }

    if (options.filterFn) {
      try {
        // Run the user-specified filter function
        return Boolean(options.filterFn.call(null, value));
      }
      catch (err) {
        // An error occurred in the user's code.
        // Emit an "error" event, and exclude this item, so processing can continue.
        this.emit('error', err);
        return false;
      }
    }

    // No filter was specified, so match everything
    return true;
  }

  /**
   * Emits an event on the reader's stream. If one of the event listeners throws an error,
   * then an "error" event is emitted instead. If emitting the "error" event itself throws,
   * the error is rethrown to the caller.
   *
   * @param {string} eventName
   * @param {*} data
   */
  emit (eventName, data) {
    const stream = this.stream;

    try {
      stream.emit(eventName, data);
    }
    catch (err) {
      if (eventName === 'error') {
        // Don't recursively emit "error" events.
        // If the first one fails, then just throw
        throw err;
      }
      else {
        stream.emit('error', err);
      }
    }
  }
}
// Expose the DirectoryReader class as this module's sole (CommonJS) export
module.exports = DirectoryReader;
|