1、介绍

本文介绍了使用 node.js streams 开发程序的基本方法。

<code class="hljs mizar">"We should have some ways of connecting programs like garden hose--screw in
another segment when it becomes necessary to massage data in
another way. This is the way of IO also."
Doug McIlroy. October 11, 1964</code>

最早接触Stream是从早期的unix开始的数十年的实践证明Stream 思想可以很简单的开发出一些庞大的系统。在unix里,Stream是通过 |实现的;在node中,作为内置的stream模块,很多核心模块和三方模块都使用到。和unix一样,node Stream主要的操作也是.pipe(),使用者可以使用反压力机制来控制读和写的平衡。

Stream 可以为开发者提供可以重复使用统一的接口,通过抽象的Stream接口来控制Stream之间的读写平衡。

2、为什么使用Stream

node中的I/O是异步的,因此对磁盘和网络的读写需要通过回调函数来读取数据,下面是一个文件下载服务器的简单代码:

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
fs.readFile(__dirname   '/data.txt', function (err, data) {
res.end(data);
});
});
server.listen(8000);</code>

这些代码可以实现需要的功能,但是服务在发送文件数据之前需要缓存整个文件数据到内存,如果"data.txt"文件很大且并发量很大的话,会浪费很多内存。因为用户需要等到整个文件缓存到内存才能接受的文件数据,这样导致用户体验相当不好。不过还好(req, res)两个参数都是Stream,这样我们可以用fs.createReadStream()代替fs.readFile():

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname   '/data.txt');
stream.pipe(res);
});
server.listen(8000);</code>

.pipe()方法监听fs.createReadStream()的'data' 和'end'事件,这样"data.txt"文件就不需要缓存整个文件,当客户端连接完成之后马上可以发送一个数据块到客户端。使用.pipe()另一个好处是可以解决当客户端延迟非常大时导致的读写不平衡问题。如果想压缩文件再发送,可以使用三方模块实现:

<code class="hljs javascript">var http = require('http');
var fs = require('fs');
var oppressor = require('oppressor');
var server = http.createServer(function (req, res) {
var stream = fs.createReadStream(__dirname   '/data.txt');
stream.pipe(oppressor(req)).pipe(res);
});
server.listen(8000);</code>

这样文件就会对支持gzip和deflate的浏览器进行压缩。oppressor 模块会处理所有的content-encoding。

Stream使开发程序变得简单。

3、基础概念

有五种基本的Stream: readable, writable, transform, duplex, and”classic”.

3-1、pipe

所有类型的Stream收是使用 .pipe() 来创建一个输入输出对,接收一个可读流src并将其数据输出到可写流dst,如下:

<code class="hljs perl">src.pipe(dst)</code>

.pipe( dst )方法为返回dst流,这样就可以接连使用多个.pipe(),如下:

<code class="hljs perl">a.pipe( b ).pipe( c ).pipe( d )</code>

功能与下面的代码相同:

<code class="hljs perl">a.pipe( b );
b.pipe( c );
c.pipe( d );</code>

3-2、readable streams

通过调用Readable streams的 .pipe()方法可以把Readable streams的数据写入一个Writable , Transform, 或者Duplex stream。

<code class="hljs perl">readableStream.pipe( dst )</code>

1>创建 readable stream

这里我们创建一个readable stream!

<code class="hljs perl">var Readable = require('stream').Readable;
var rs = new Readable;
rs.push('beep ');
rs.push('boop\n');
rs.push(null);
rs.pipe(process.stdout);
$ node read0.js
beep boop
</code>

rs.push( null ) 通知数据接收者数据已经发送完毕.

注意到我们在将所有数据内容压入可读流之前并没有调用rs.pipe(process.stdout);,但是我们压入的所有数据内容还是完全的输出了,这是因为可读流在接收者没有读取数据之前,会缓存所有压入的数据。但是在很多情况下, 更好的方法是只有数据接收着请求数据的时候,才压入数据到可读流而不是缓存整个数据。下面我们重写 一下._read()函数:

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97;
rs._read = function () {
rs.push(String.fromCharCode(c  ));
if (c > 'z'.charCodeAt(0)) rs.push(null);
};
rs.pipe(process.stdout);</code>
<code class="hljs bash">$ node read1.js
abcdefghijklmnopqrstuvwxyz</code>

上面的代码通过重写_read()方法实现了只有在数据接受者请求数据才向可读流中压入数据。_read()方法也可以接收一个size参数表示数据请求着请求的数据大小,但是可读流可以根据需要忽略这个参数。

注意我们也可以用util.inherits()继承可读流。为了说明只有在数据接受者请求数据时_read()方法才被调用,我们在向可读流压入数据时做一个延时,如下:

<code class="hljs javascript">var Readable = require('stream').Readable;
var rs = Readable();
var c = 97 - 1;
rs._read = function () {
if (c >= 'z'.charCodeAt(0)) return rs.push(null);
setTimeout(function () {
rs.push(String.fromCharCode(  c));
}, 100);
};
rs.pipe(process.stdout);
process.on('exit', function () {
console.error('\n_read() called '   (c - 97)   ' times');
});
process.stdout.on('error', process.exit);</code>

用下面的命令运行程序我们发现_read()方法只调用了5次:

<code class="hljs bash">$ node read2.js | head -c5
abcde
_read() called 5 times</code>

使用计时器的原因是系统需要时间来发送信号来通知程序关闭管道。使用process.stdout.on('error', fn) 是为了处理系统因为header命令关闭管道而发送SIGPIPE信号,因为这样会导致process.stdout触发EPIPE事件。如果想创建一个的可以压入任意形式数据的可读流,只要在创建流的时候设置参数objectMode为true即可,例如:Readable({ objectMode: true })。

2>读取readable stream数据

大部分情况下我们只要简单的使用pipe方法将可读流的数据重定向到另外形式的流,但是在某些情况下也许直接从可读流中读取数据更有用。如下:

<code class="hljs php">process.stdin.on('readable', function () {
var buf = process.stdin.read();
console.dir(buf);
});
$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume0.js 
<buffer 0a="" 61="" 62="" 63="">
<buffer 0a="" 64="" 65="" 66="">
<buffer 0a="" 67="" 68="" 69="">
null</buffer></buffer></buffer></code>

当可读流中有数据可读取时,流会触发'readable' 事件,这样就可以调用.read()方法来读取相关数据,当可读流中没有数据可读取时,.read() 会返回null,这样就可以结束.read() 的调用, 等待下一次'readable' 事件的触发。下面是一个使用.read(n)从标准输入每次读取3个字节的例子:

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
});</code>

如下运行程序发现,输出结果并不完全!

<code class="hljs bash">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume1.js 
<buffer 61="" 62="" 63="">
<buffer 0a="" 64="" 65="">
<buffer 0a="" 66="" 67=""></buffer></buffer></buffer></code>

这是应为额外的数据数据留在流的内部缓冲区里了,而我们需要通知流我们要读取更多的数据.read(0)可以达到这个目的。

<code class="hljs javascript">process.stdin.on('readable', function () {
var buf = process.stdin.read(3);
console.dir(buf);
process.stdin.read(0);
});</code>

这次运行结果如下:

<code class="hljs xml">$ (echo abc; sleep 1; echo def; sleep 1; echo ghi) | node consume2.js 
<buffer 0a="" 64="" 65="">
<buffer 0a="" 68="" 69=""></buffer></buffer></code>

我们可以使用 .unshift() 将数据重新押回流数据队列的头部,这样可以接续读取押回的数据。如下面的代码,会按行输出标准输入的内容:

<code class="hljs javascript">var offset = 0;
process.stdin.on('readable', function () {
var buf = process.stdin.read();
if (!buf) return;
for (; offset < buf.length; offset  ) {
if (buf[offset] === 0x0a) {
console.dir(buf.slice(0, offset).toString());
buf = buf.slice(offset   1);
offset = 0;
process.stdin.unshift(buf);
return;
}
}
process.stdin.unshift(buf);
});
$ tail -n  50000 /usr/share/dict/american-english | head -n10 | node lines.js 
'hearties'
'heartiest'
'heartily'
'heartiness'
'heartiness\'s'
'heartland'
'heartland\'s'
'heartlands'
'heartless'
'heartlessly'</code>

当然,有很多模块可以实现这个功能,如:split 。

3-3、writable streams

writable streams只可以作为.pipe()函数的目的参数。如下代码:

<code class="hljs perl">src.pipe( writableStream );</code>

1>创建 writable stream

重写 ._write(chunk, enc, next) 方法就可以接受一个readable stream的数据。

<code class="hljs php">var Writable = require('stream').Writable;
var ws = Writable();
ws._write = function (chunk, enc, next) {
console.dir(chunk);
next();
};
process.stdin.pipe(ws);
$ (echo beep; sleep 1; echo boop) | node write0.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70=""></buffer></buffer></code>

第一个参数chunk是数据输入者写入的数据。第二个参数end是数据的编码格式。第三个参数next(err)通过回调函数通知数据写入者可以写入更多的时间。如果readable stream写入的是字符串,那么字符串会默认转换为Buffer,如果在创建流的时候设置Writable({ decodeStrings: false })参数,那么不会做转换。如果readable stream写入的数据时对象,那么需要这样创建writable stream

<code class="hljs css">Writable({ objectMode: true })</code>

2>写数据到 writable stream

调用writable stream的.write(data)方法即可完成数据写入。

<code class="hljs vala">process.stdout.write('beep boop\n');</code>

调用.end()方法通知writable stream 数据已经写入完成。

<code class="hljs javascript">var fs = require('fs');
var ws = fs.createWriteStream('message.txt');
ws.write('beep ');
setTimeout(function () {
ws.end('boop\n');
}, 1000);
$ node writing1.js 
$ cat message.txt
beep boop</code>

如果需要设置writable stream的缓冲区的大小,那么在创建流的时候,需要设置opts.highWaterMark,这样如果缓冲区里的数据超过opts.highWaterMark,.write(data)方法会返回false。当缓冲区可写的时候,writable stream会触发'drain' 事件。

3-4、classic streams

Classic streams比较老的接口了,最早出现在node 0.4版本中,但是了解一下其运行原理还是十分有好
处的。当一个流被注册了"data" 事件的回到函数,那么流就会工作在老版本模式下,即会使用老的API。

1>classic readable streams

Classic readable streams事件就是一个事件触发器,如果Classic readable streams有数据可读取,那么其触发 "data" 事件,等到数据读取完毕时,会触发"end" 事件。.pipe() 方法通过检查stream.readable 的值确定流是否有数据可读。下面是一个使用Classic readable streams打印A-J字母的例子:

<code class="hljs javascript">var Stream = require('stream');
var stream = new Stream;
stream.readable = true;
var c = 64;
var iv = setInterval(function () {
if (  c >= 75) {
clearInterval(iv);
stream.emit('end');
}
else stream.emit('data', String.fromCharCode(c));
}, 100);
stream.pipe(process.stdout);
$ node classic0.js
ABCDEFGHIJ</code>

如果要从classic readable stream中读取数据,注册"data" 和"end"两个事件的回调函数即可,代码如下:

<code class="hljs php">process.stdin.on('data', function (buf) {
console.log(buf);
});
process.stdin.on('end', function () {
console.log('__END__');
});
$ (echo beep; sleep 1; echo boop) | node classic1.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>

需要注意的是如果你使用这种方式读取数据,那么会失去使用新接口带来的好处。比如你在往一个 延迟非常大的流写数据时,需要注意读取数据和写数据的平衡问题,否则会导致大量数据缓存在内存中,导致浪费大量内存。一般这时候强烈建议使用流的.pipe()方法,这样就不用自己监听”data” 和”end”事件了,也不用担心读写不平衡的问题了。当然你也可以用 through代替自己监听”data” 和”end” 事件,如下面的代码:

<code class="hljs php">var through = require('through');
process.stdin.pipe(through(write, end));
function write (buf) {
console.log(buf);
}
function end () {
console.log('__END__');
}
$ (echo beep; sleep 1; echo boop) | node through.js 
<buffer 0a="" 62="" 65="" 70="">
<buffer 0a="" 62="" 6f="" 70="">
__END__</buffer></buffer></code>

或者也可以使用concat-stream来缓存整个流的内容:

<code class="hljs oxygene">var concat = require('concat-stream');
process.stdin.pipe(concat(function (body) {
console.log(JSON.parse(body));
}));
$ echo '{"beep":"boop"}' | node concat.js 
{ beep: 'boop' }</code>

当然如果你非要自己监听"data" 和"end"事件,那么你可以在写数据的流不可写的时候使用.pause()方法暂停Classic readable streams继续触发”data” 事件。等到写数据的流可写的时候再使用.resume() 方法通知流继续触发"data" 事件继续读取
数据。

2>classic writable streams

Classic writable streams 非常简单。只有 .write(buf), .end(buf)和.destroy()三个方法。.end(buf) 方法的buf参数是可选的,如果选择该参数,相当于stream.write(buf); stream.end() 这样的操作,需要注意的是当流的缓冲区写满即流不可写时.write(buf)方法会返回false,如果流再次可写时,流会触发drain事件。

4、transform

transform是一个对读入数据过滤然输出的流。

5、duplex

duplex stream是一个可读也可写的双向流,如下面的a就是一个duplex stream:

<code class="hljs livecodeserver">a.pipe(b).pipe(a)</code>

以上内容是小编给大家介绍的Nodejs Stream 数据流使用手册,希望对大家有所帮助!

Nodejs Stream 数据流使用手册的更多相关文章

  1. nodejs npm package.json中文文档

    这篇文章主要介绍了nodejs npm package.json中文文档,本文档中描述的很多行为都受npm-config(7)的影响,需要的朋友可以参考下

  2. 浅析Nodejs npm常用命令

    这篇文章主要介绍了浅析Nodejs npm常用命令的相关资料,非常不错,具有参考借鉴价值,需要的朋友可以参考下

  3. nodejs 使用nodejs-websocket模块实现点对点实时通讯

    这篇文章主要介绍了nodejs 使用nodejs-websocket模块实现点对点实时通讯的实例代码,代码简单易懂,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下

  4. nodeJs链接Mysql做增删改查的简单操作

    本篇文章主要介绍了nodeJs链接Mysql做增删改查的简单操作,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  5. Nodejs中使用captchapng模块生成图片验证码

    本篇文章主要介绍了Nodejs中使用captchapng模块实现图片验证码,非常具有实用价值,需要的朋友可以参考下

  6. nodejs 图片预览和上传的示例代码

    本篇文章主要介绍了nodejs 图片预览和上传的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  7. nodejs中函数的调用实例详解

    本文通过实例代码给大家介绍了nodejs函数的调用,代码简单易懂,非常不错,具有一定的参考借鉴价值,需要的朋友可以参考下

  8. NodeJS使用formidable实现文件上传

    这篇文章主要为大家详细介绍了NodeJS使用formidable实现文件上传的相关方法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  9. Nodejs获取网络数据并生成Excel表格

    这篇文章主要为大家详细介绍了Nodejs获取网络数据并生成Excel表格的具体实现方法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

  10. NodeJS实现不可逆加密与密码密文保存的方法

    这篇文章主要介绍了NodeJS实现不可逆加密与密码密文保存的方法,简单讲述了不可逆加密与密码密文保存的原理并结合实例形式分析了nodejs相关加密操作实现技巧,需要的朋友可以参考下

随机推荐

  1. Error: Cannot find module ‘node:util‘问题解决

    控制台 安装 Vue-Cli 最后一步出现 Error: Cannot find module 'node:util' 问题解决方案1.问题C:\Windows\System32>cnpm install -g @vue/cli@4.0.3internal/modules/cjs/loader.js:638 throw err; &nbs

  2. yarn的安装和使用(全网最详细)

    一、yarn的简介:Yarn是facebook发布的一款取代npm的包管理工具。二、yarn的特点:速度超快。Yarn 缓存了每个下载过的包,所以再次使用时无需重复下载。 同时利用并行下载以最大化资源利用率,因此安装速度更快。超级安全。在执行代码之前,Yarn 会通过算法校验每个安装包的完整性。超级可靠。使用详细、简洁的锁文件格式和明确的安装算法,Yarn 能够保证在不同系统上无差异的工作。三、y

  3. 前端环境 本机可切换node多版本 问题源头是node使用的高版本

    前言投降投降 重头再来 重装环境 也就分分钟的事 偏要折腾 这下好了1天了 还没折腾出来问题的源头是node 使用的高版本 方案那就用 本机可切换多版本最终问题是因为nodejs的版本太高,导致的node-sass不兼容问题,我的node是v16.14.0的版本,项目中用了"node-sass": "^4.7.2"版本,无法匹配当前的node版本根据文章的提

  4. nodejs模块学习之connect解析

    这篇文章主要介绍了nodejs模块学习之connect解析,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  5. nodejs npm package.json中文文档

    这篇文章主要介绍了nodejs npm package.json中文文档,本文档中描述的很多行为都受npm-config(7)的影响,需要的朋友可以参考下

  6. 详解koa2学习中使用 async 、await、promise解决异步的问题

    这篇文章主要介绍了详解koa2学习中使用 async 、await、promise解决异步的问题,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  7. Node.js编写爬虫的基本思路及抓取百度图片的实例分享

    这篇文章主要介绍了Node.js编写爬虫的基本思路及抓取百度图片的实例分享,其中作者提到了需要特别注意GBK转码的转码问题,需要的朋友可以参考下

  8. CentOS 8.2服务器上安装最新版Node.js的方法

    这篇文章主要介绍了CentOS 8.2服务器上安装最新版Node.js的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  9. node.js三个步骤实现一个服务器及Express包使用

    这篇文章主要介绍了node.js三个步骤实现一个服务器及Express包使用,文章通过新建一个文件展开全文内容,具有一定的参考价值,需要的小伙伴可以参考一下

  10. node下使用UglifyJS压缩合并JS文件的方法

    下面小编就为大家分享一篇node下使用UglifyJS压缩合并JS文件的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

返回
顶部