Stream 手册[译]

原文链接:https://github.com/substack/stream-handbook#duplex

简介

“We should have some ways of connecting programs like garden hose–screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also.”
— Doug McIlroy. October 11, 1964

Streams在Unix早期就来到我们身边,并且在过去的几十年已经证明了它们是一个可信的方式–通过小部件来组成大系统,小部件只需要做好某一件事。在Unix中,Streams由shell用|管道实现。在node中,内置的stream模块由核心库使用并且可以供user-space模块使用。和Unix类似,node中Stream模块的主要组成运算符名为pipe(),你也可以免费获得一个背压机制来节流对一个缓慢接收者的写入。

流可以帮你免受困扰,因为它们将实现方式限制为可以重复使用的一致的接口。然后,您可以将一个流的输出插入另一个流的输入,并使用对流进行抽象运算的库来进行更高级别的流控制。

Stream是小程序设计和unix哲学的重要组成部分,但还有许多其他重要的抽象概念值得考虑。只要记住,技术债务是敌人,并寻求解决问题的最佳抽象方法。

为什么你需要使用 streams

node中I/O是异步的,所以与磁盘和网络交互涉及将回调传递给函数。

1
2
3
4
5
6
7
8
9
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname + '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);

这段代码可以生效但是太笨重了,在将结果写回客户端之前,它会为每个请求缓存 data.txt 到内存中。如果 data.txt 非常大,你的程序可能会很吃内存,因为它同时服务于大量用户,特别是对于慢速连接的用户。

这样的用户体验极其糟糕,因为在用户可以接收到任何内容之前,需要等待整个文件缓存至内存。

幸运的是,这两个参数(req,res)都是流,这意味着我们可以用更好的方式来书写代码,通过使用fs.createReadStream()代替 fs.readFile():

1
2
3
4
5
6
7
8
var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(res);
});
server.listen(8000);

这里pipe()关心的是监听fs.createReadStream()dataend事件。这段代码不仅更干净,而且现在data.txt文件将被从磁盘接收一次写入客户端一次。

使用.pipe()也具有其他好处,例如自动处理背压,以便当远程客户端处于非常慢或高延迟的连接时,node不必将块缓存到内存中。

想压缩?还有流模块呢!

1
2
3
4
5
6
7
8
9
var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname + '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);

现在我们的文件已经为浏览器进行压缩了,它支持gzip和deflate。我们可以让oppressor处理所有内容编码的东西。

一旦你学习了 stream 的 api,你就能像拼接乐高积木和橡胶软管一样组合这些流模块,而不是记住如何通过不靠谱的非流式的自定义 API 来传递数据。

Streams 使得在node中变成更简单,优雅,可组合(自由)。

basics

这里有5种流:可读,可写,转换,双工和“经典”。

pipe

所有的不同类型的流都通过pipe()来配对输入与输出。

pipe()只是一个获取可读源流src并将输出挂接到一个目标可写流dst的函数。

1
src.pipe(dst)

.pipe(dst)返回dst以便你可以将多个pipe()连接在一起。

1
a.pipe(b).pipe(c).pipe(d)

就像这样:

1
2
3
a.pipe(b);
b.pipe(c);
c.pipe(d);

这个非常像你可能会通过命令行一起管理程序的方式:

1
a | b | c | d

node 除外而不是shell!

可读流

可读流提供一个可以通过调用.pipe()转换成可写,转换,双工流的数据。

创建一个可读流

让我们来创造一个可读流!

1
2
3
4
5
6
7
8
var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
1
2
$ node read0.js
beep boop

rs.push(null)告诉接收者rs已经完成了数据输出。

注意,我们在将可读流rs传输到process.stdout前将内容内容推送到rs,但完整的消息仍能写入。

这是因为当你.push()到可读流时,你推送的那些块都被缓存了,直到接收者准备好读取它们。

但是,如果我们能够避免将数据一起缓存并且只在接收者需要的时候再生成数据,很多时候会更好。

我们可以通过定义一个._read函数来按需推送块:

1
2
3
4
5
6
7
8
9
10
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c++));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);
1
2
$ node read1.js
abcdefghijklmnopqrstuvwxyz

这里我们传递了字母az,但是只在接收者准备接受它们的时候。

_read函数同样会接手一个临时的size参数作为他的第一个入参来指定接收者想要读取多少个字节,但是你的可读流可以忽略size,只要它想。

注意,你同样可以使用util.inherits()子类化一个可读流,但是该方法并没有一个可以很好理解它的例子。

为了展示我们的 _read 函数只在被接收者请求时才调用,我们可以稍微修改一下我们的可读流代码来添加一个延迟:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(++c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called ' + (c - 97) + ' times');
});
process.stdout.on('error', process.exit);

运行这个程序我们可以看到,当我们只请求5个字节的输出时,_read()被调用了五次。

1
2
3
$ node read2.js | head -c5
abcde
_read() called 5 times

setTimeout延迟是必须的,因为操作系统需要一点时间来向我们发送一些相关信号以关闭管道。

process.stdout.on('error', fn)处理器也是必要的,因为当head对我们的程序输出不再有兴趣时,操作系统发送一个 SIGPIPE 我们的进程,在process.stdout上作为EPIPE错误被发出

当与外部操作系统管道交互时,这些额外的复杂性是必需的,但是当我们与node流直接交互时,它们是自动的。

如果要创建一个可读取的流,可以推送任意值而不是仅仅是字符串和缓存,请确保使用Readable({ objectMode: true })来创建可读流 。

使用可读流

大多数时候,传输一个可读流转换成其他的流或者一个由类似 through or concat-stream 的模块创建的流是很容易的,但有时候直接使用可读流是有用的。

1
2
3
4
process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
1
2
3
4
5
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js
<Buffer 61 62 63 0a>
<Buffer 64 65 66 0a>
<Buffer 67 68 69 0a>
null

当数据可用时,’readable’事件触发,你可以调用.read() 从缓冲区中获取一些数据。

当流完成后,.read()返回null,因为没有更多的字节可获取了。

您还可以告诉.read(n)返回n字节的数据。读取多个字节只是建议,对于对象流不起作用,但所有核心流都支持它。

以下是使用.read(n)将标准输入缓存为3字节块的示例:

1
2
3
4
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});

运行这个例子,它会给我们提供不完整的数据。

1
2
3
4
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>

这是因为内部缓存区有额外的数据留存。我们需要给节点一个“kick”来告诉它,我们对已读取的3个字节以外的更多数据感兴趣。一个简单的.read(0)可以达成:

1
2
3
4
5
process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});

现在我们的代码如我们预期的一样以3字节的块在运行!

1
2
3
4
5
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js
<Buffer 61 62 63>
<Buffer 0a 64 65>
<Buffer 66 0a 67>
<Buffer 68 69 0a>

你同样可以使用.unshift()放回数据,这样的话当.read()给到更多你想要的数据时,相同的读取逻辑就会触发。

使用.unshift()使我们避免生成不必要的缓冲区副本。在这里我们可以构建一个可读的解析器来分割换行符:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset++) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset + 1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
1
2
3
4
5
6
7
8
9
10
11
$ tail -n +50000 /usr/share/dict/american-english | head -n10 | node lines.js
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'

但是,你应该使用node中诸如 split 此类的模块而不是驱动你自己的行解析逻辑。

可写流

一个可写流是一个你可以传输进的流而不是传输出去的流:

1
src.pipe(writableStream)

创建一个可写流

只需定义一个._write(chunk, enc, next)函数,然后您可以传输可读流进来:

1
2
3
4
5
6
7
8
var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
1
2
3
$ (echo beep; sleep 1; echo boop) | node write0.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>

第一个参数,chunk 是一个被提供者写入的数据。

第二个参数enc是一个带有字符串编码的字符串,但只有当 opts.decodeStringfalse且已经写入了一个字符串时才这样。

第三个参数,next(err) 是告诉使用者他们可以写入更多数据的回调。你可以选择传入一个可以在流的实例中触发 error 事件的 error 对象err

如果你正在传递的可读流在写入字符串,它们将被转换为缓存,除非你使用 Writable({ decodeStrings: false }) 创建了你的可写流。

如果你正在传递的可读流在写入对象,请使用 Writable({ objectMode: true }) 创建了你的可写流。

写入一个可写流

如果你想写入一个可写流,只需要用你想写入的 data 调用 .write(data) 就行了。

To tell the destination writable stream that you’re done writing, just call .end(). You can also give .end(data) some data to write before ending:

process.stdout.write('beep boop\n');

为了告诉目标可写流你已经写入完成,只需调用 .end()。你同样可以给 .end(data) 一些数据在结束之前写入。

1
2
3
4
5
6
7
8
var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
1
2
3
$ node writing1.js
$ cat message.txt
beep boop

如果你关心高水位和缓存区,当在接下来的缓存中有比 pts.highWaterMark 传递给 Writable() 还要多的数据时, .write() 会返回 false

If you want to wait for the buffer to empty again, listen for a ‘drain’ event.

如果你想等待缓存再次清空,监听 drain 事件即可。

转换

转换流是双工流的一种类型(可读可写)。区别在于,在转换流中,输出是输入以某种方式计算得出的。

你可能也听过称为 “through streams” 的转换流。

Through streams 是一个简单的读写过滤器,可以转换输入并产生输出。

双工

双工流是可读/可写的,并且流的两端参与双向交互,来回发送诸如电话的消息。rpc交换是双工流的一个很好的例子。任何时候你看到像这样的:

a.pipe(b).pipe(a)

N可能正在处理双工流。

经典流

经典流是一个老的接口,首次出现在 node 0.4 版本。很长时间内你都可能遇到这种风格的流,所以知道它们是如何工作是很棒的。

经典可读流

经典可读流只是时间触发器,当它们拥有数据供使用者使用时,会触发 data 事件,当它们已经提供完数据时触发 end 事件。

.pipe() 会通过检查 stream.readable 的真实性来检查一个经典流是否是一个可读流。

Here is a super simple readable stream that prints A through J, inclusive:

这里有一个特别简单的打印 AJ 的可读流,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (++c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
1
2
$ node classic0.js
ABCDEFGHIJ

要读取一个经典可读流,你需要注册 "data""end" 监听事件。这里有一个使用老的可读流风格从 process.stdin 读取的例子。

1
2
3
4
5
6
process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
1
2
3
4
$ (echo beep; sleep 1; echo boop) | node classic1.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

请注意,无论你何时注册一个 "data" 监听事件,你都可以让流进入兼容模式,以免丢失新的stream2 api的好处。

你自己应该几乎不会注册 "data""end" 处理器了。如果您需要与旧版流互动,尽可能使用可以 .pipe() 到的库。

例如,您可以使用 through 来避免显式设置 "data""end" 监听器:

1
2
3
4
5
6
7
8
9
var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
1
2
3
4
$ (echo beep; sleep 1; echo boop) | node through.js
<Buffer 62 65 65 70 0a>
<Buffer 62 6f 6f 70 0a>
__END__

或者使用 concat-stream 来缓存整个流的内容:

1
2
3
4
var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
1
2
$ echo '{"beep":"boop"}' | node concat.js
{ beep: 'boop' }

经典可读流具有 .pause().resume() 用于临时暂停流,但是这仅仅是建议性的。如果你要通过经典可读流使用 .pause().resume(),应该使用 through 来处理缓存而不是自己写。

经典可写流

经典可写流非常简单。只有 .write(buf), .end(buf).destroy()三个方法。

.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。

热评文章