流是什么?

流,通俗来讲就是数据流动,数据从一个地方缓慢的流到另一个地方。

举个栗子,可以借助水管中的水流来辅助理解,当打开水龙头后,水便可以从源头流出水龙头;关闭水龙头,水便不再流动。

为什么需要流

那为什么会需要流呐?

其它介质和内存的数据规模不一致,例如磁盘的内存往往远远大于内存,因此磁盘中有可能会出现大于内存的文件,此时内存无法一次读入该文件。这种情形可以把水库比作磁盘,洗碗池比作内存,如果不加限制,水库的水量轻轻拿捏洗碗池,因此就需要水管来进行传输,限制水的流量。

其他介质和内存的数据处理能力不一致,内存的处理速度其他介质很难比,内存迅速处理数据,一波流传给硬盘,硬盘很难吃得消。

为了更深刻得理解流的作用,接下来我们来试一下不使用流需要如何进行文件读写。

文件读写

首先我们来实现最简单的文件拷贝功能,这个比较简单,我们可以借助 fs 模块的 readFile 和 writeFile 方法来实现。

readFile 和 writeFile 并没有 promise 化,可以借助 util.promiseify 方法将其 promise 化,但这里并不是文章的重点,因此依旧采用回调的方式

const fs = require("fs");
const path = require("path");
// 利用 path 上的方法组装路径
fs.readFile(path.resolve(__dirname, "test.txt"), (err, data) => {
  if (err) return console.log("error", err);
  fs.writeFile(path.resolve(__dirname, "result.txt"), data, () => {
    console.log("拷贝成功");
  });
});

上面的代码虽然成功实现了文件拷贝,但问题也很明显,不适用于大文件,当文件大于或接近内存时,会淹没内存,这也响应了为什么需要流的第一点。

对于大文件,如何进行读写那: 边读边写,读一点写一点,这样我们便可以控制文件读写的速率 ,也称作分片读写。总的来说就是边读边写。

分片读写

分片读写需要使用 fs 模块中的 read,write,close,open 方法。

既然 fs 有方法可以实现边读边写,那为什么还会有流的出现的?这几个方法太麻烦了,参数太多,这里只做一个演示。

首先来实现单个文字的读写。

// 创建一个存储单位 1 的 Buffer 空间,来存储中间读取的数据
let buf = Buffer.alloc(1);
// 读取源文件中的数据
fs.open(path.resolve(__dirname, "test.js"), "r", function (err, rfd) {
  // rfd 可以理解为文字指针
  // 你看到了吗?6 个参数,麻爪
  // 甚至都有点解释不动
  fs.read(rfd, buf, 0, 1, 0, function (err, bytesRead) {
    // bytesRead读取到的字节长度
    // 读取到的第一个数据存入 buf 中
    console.log(buf); // <Buffer 31>
    // 打开目标文件。
    fs.open(path.resolve(__dirname, "result.js"), "w", function (err, wfd) {
      // 6 个参数
      // 这里做的就是将 buf 内容写入 result
      fs.write(wfd, buf, 0, 1, 0, function (err, bytesWritten) {
        console.log("拷贝成功");
      });
    });
  });
});

上面的方法实现了单次数据的读取,我们只需要重复这个过程就可以实现大文件的读写。

如何重复实现上述过程那?递归,没错,就是递归,将读写部分封装成函数,在写成功的回调函数中再次调用该函数。

// source 源文件
// target 目标文件
// cb 回调函数
// bufferSize buffer固定长度,即一次读写的数量
function copy(source, target, cb, bufferSize = 3) {
  const SOURCE_PATH = path.resolve(__dirname, source);
  const TARGET_PATH = path.resolve(__dirname, target);
  let buf = Buffer.alloc(bufferSize); // 创建 buffer 实例
  let rOffset = 0; // 读取偏移量
  let wOffset = 0; // 写入偏移量
  fs.open(SOURCE_PATH, "r", function (err, rfd) {
    if (err) return cb(err);
    fs.open(TARGET_PATH, "w", function (err, wfd) {
      if (err) return cb(err);
      // 递归读写函数 next
      function next() {
        fs.read(rfd, buf, 0, bufferSize, rOffset, function (err, bytesRead) {
          if (err) return cb(err);
          // bytesRead 代表一次读取的字节数
          // 当 bytesRead 为 0 时,代表文件已经成功读完
          // 则可以停止读写操作,关闭文件
          if (bytesRead == 0) {
            let index = 0;
            let done = () => {
              if (  index == 2) {
                cb();
              }
            };
            fs.close(wfd, done);
            fs.close(rfd, done);
            return;
          }
          fs.write(
            wfd,
            buf,
            0,
            bytesRead,
            wOffset,
            function (err, bytesWritten) {
              if (err) return cb(err);
              // 读取成功,并更新偏移量
              rOffset  = bytesRead;
              wOffset  = bytesWritten;
              next();
            }
          );
        });
      }
      next();
    });
  });
}
copy("test.js", "result.js", function (err) {
  if (err) return console.log(err);
  console.log("拷贝成功");
});

这样我们就成功地实现大文件分片读写,但可以明显发现:

  • write/read 方法参数多,用起来非常繁琐
  • 上面的代码有些回调地狱的倾向,不宜维护和扩展

因此,流就出现了,下面一起来了解一下 nodejs 中的流。

可读流及源码编写

node 中有四种流,下面我们来依次介绍一下,本文主要介绍 Readable 可读流的使用及其源码编写。

Node.js 中的流同样位于 fs 模块

EventListener

Nodejs 中的流都继承于 EventListener ,也就是说其工作原理都是基于发布订阅模式。

Readable 可读流

可读流用于文件内容的读取,它主要有两种读取模式:

  • 流动模式: 可读流自动读取数据,通过 EventListener 接口将数据传递给应用
  • 暂停模式: 这种模式下不会主动通过 EventListener 给应用传递数据,当显式调用 stream.read 后重启数据流动

通过 createReadStream 方法可以创建可读流,该方法有两个参数:

  • 参数一读取文件的路径
  • 参数二是 options 配置项,该项有八个参数,但日常我们只需要常用带星号的几个配置。
  • flags* :标识位,默认为 r;
  • encoding :字符编码,默认为 null;
  • fd :文件描述符,默认为 null;
  • mode :权限位,默认为 0o666;
  • autoClose :是否自动关闭文件,默认为 true;
  • start :读取文件的起始位置;
  • end :读取文件的(包含)结束位置;
  • highWaterMark* :最大读取文件的字节数,默认 64 * 1024 。

highWaterMark 是最值得注意的,它表示每次读取的文件字节长度。

看起来流的参数很多,用起来会很复杂,那你就错了,下面来看个例子。

// 流是基于发布订阅模式实现的
// 因此我们只需要订阅对应事件即可
const fs = require("fs");
const path = require("path");
// 返回一个可读流
const rs = fs.createReadStream(path.resolve(__dirname, "test.txt"), {
  highWaterMark: 3, // 每次读取 3kb
});
// 文件打开的钩子函数
rs.on("open", (fd) => {
  console.log(fd); // 3
});
// 当可读流处于流动模式时,data 事件会不断触发
// 在这里我们可以获取到读取的数据,进行后续操作
rs.on("data", (chunk) => {
  console.log(chunk);
});
rs.on("end", () => {
  console.log("end"); // 结束事件
});

data 事件会一直触发,也就是说在文件读取完成前, data 会一直传递数据,有时候我们并非需要一直读取,例如读取一下暂停一下,那该如何实现那?

// 借助 pause 和 resume 方法可以实现数据读取的暂停与恢复
rs.on("data", function (data) {
  // 读取的数据为 buffer 类型
  console.log(`读取了 ${data.length} 字节数据 : ${data.toString()}`);

  //使流动模式的流停止触发'data'事件,切换出流动模式,数据都会保留在内部缓存中。
  rs.pause();

  //等待3秒后,再恢复触发'data'事件,将流切换回流动模式。
  setTimeout(function () {
    rs.resume();
  }, 3000);
});

下面我们来实现一下可读流的源码。

源码实现

Step1: 定义可读流

可读流继承于 EventListener ,因此我们首先建立 ReadStream 类继承于 EventListener ,这样 ReadStream 便可以使用 EventListener 类的方法。

EventListener 实现其实并不困难,小包前面的文章也讲过 EventListener 源码的解读及编写。

let fs = require("fs");
let EventEmitter = require("events");
class ReadStream extends EventEmitter {}

Step2: 参数配置

可读流有两个参数, path 路径和 options 配置项,我们把对应的参数配置在类上,因此我们需要编写一下构造函数。

constructor(path, options = {}) {
  // 使用继承,子类必须调用 super 函数
  super();
  this.path = path; //指定要读取的文件地址
  this.highWaterMark = options.highWaterMark || 64 * 1024;
  this.autoClose = options.autoClose || true; //是否自动关闭文件
  this.start = options.start || 0; // 从文件哪个位置开始读取
  this.end = options.end || null; // null表示没传递
  this.encoding = options.encoding || null;// buffer编码
  this.flags = options.flags || 'r';
}

除了 ReadStream 所需的参数外,我们还需要添加几个控制参数

  • pos : 记录当前文件读取到的位置
  • flowing : 当前读取的模式, true 为流动模式
  • buffer : 每次读取内容的存储位置
constructor() {
  // ...
  this.pos = this.start;
  this.flowing = null;
  this.buffer = Buffer.alloc(this.highWaterMark);
}

Step3: 打开待读文件

ReadStream 中分别使用 close、open、error 注册事件来控制对应行为的产生,当打开文件后,触发 open 事件;打开失败,触发 error 事件。

这里我们处理一下上面几个事件的触发时机,使用 fs.open 方法来打开文件。

open() {
    fs.open(this.path, this.flags, (err, fd) => {
        if (err) {
            if (this.autoClose) { // 如果需要自动关闭则去关闭文件
                this.destroy(); // 销毁(关闭文件,触发关闭事件)
            }
            this.emit('error', err); // 打开错误,触发 error 事件
            return;
        }
        this.fd = fd; // 保存文件描述符,方便后续轮询判断
        this.emit('open', this.fd); // 文件打开,触发 open 事件
    });
}

Step4: 读取文件内容

上文提到, ReadStream 有两种模式: 流动模式和暂停模式,并用 flowing 属性来标识两种模式。

ReadStream 通过监听 data 事件来启动文件读取,即:

rs.on("data", (chunk) => {
  console.log(chunk);
});

这里实现有两个难点:

  • 当监听 data 事件后, ReadStream 才开启数据读取,那应该如何监听 data 事件的注册那?
  • fs.open 是异步读取操作,因此有可能出现 data 事件触发时,文件还未读取完毕,那我们应该如何处理这种情况那?

一个问题一个问题来解决, EventListener 中提供了 newListener 事件,当注册新事件后,该事件的处理函数触发,因此我们可以监听该事件,判断事件类型,如果为 data 事件,打开 flowing ,开始读取

class ReadStream extends EventEmitter {
  constructor(path, options) {
    // 监听newListener事件,判断当前监听事件是否为 data 事件
    // 如果为 data 事件,开启文件读取
    this.on("newListener", (type) => {
      if (type === "data") {
        //  开启流动模式,开始读取文件中的内容
        this.flowing = true;
        this.read();
      }
    });
  }
}

由于 data 事件的触发可能发生在 fs.open 读取之前,因此 read 函数中要做一个 轮询操作 ,每次判断是否成功读取。

read() {
    // 文件如果未打卡,fd 是没有值的
    if (typeof this.fd !== "number") {
        // 如果文件未打开,触发 open 事件
        return this.once("open", () => this.read());
    }
}

Step5: 编写 read 方法

上面编写完毕后,我们可以成功的监听到 data 事件,且可以打开文件,后续就可以进行文件的读取了。

文件读取的内容上文案例中提到过,即利用 fs.read 方法进行读取,下面直接在源码上进行解释。

class ReadStream extends EventEmitter {
  read() {
    // 计算当前读取字节
    const howManyToRead = this.end
      ? Math.min(this.highWaterMark, this.end - this.pos   1)
      : this.highWaterMark;
    // 创建 buffer 实例
    const buffer = Buffer.alloc(howManyToRead);
    // 利用 fs.read 进行文件内容读取
    fs.read(
      this.fd,
      buffer,
      0,
      howManyToRead,
      this.offset,
      (err, bytesRead) => {
        if (err) return this.destory(err);
        this.pos  = bytesRead;
        // 可能存在最后一次的 buffer 大小 大于 实际数据大小的情况,所以使用slice来进行截取
        // 将读取后的内容传递给 data 事件
        this.emit("data", buffer.slice(0, bytesRead));
      }
    );
  }
}

这样便可以实现一次读取,一次读取完毕后,接着调用 read 方法就可以实现不断读取,即流动模式

read() {
    // ...
    // 流动模式下,循环进行读取
    if (this.flowing) {
         this.read();
    }
}

Step6: 流动模式与暂停模式

ReadStream 使用 flowing 来控制可读流的读取与暂停,最后我们来实现可读流的暂停和恢复。

 pause() {
      // 判断当前是否读取完毕了
      if (this.flowing) {
        this.flowing = false;
      }
    }
    resume() {
      // 判断当前是否读取完毕了
      if (!this.flowing) {
        this.flowing = true;
        this.read();
      }
    }

总结

本文详细的讲解了流的前因后果,流可以说是 node 的核心之一,对此我们需要完美掌握,灵活运用。本文为了让大家更深入的了解流,从源码和应用出发,带你全方位了解流。可写流的编写更有意思,可以学到更多东西,后续小包会继续撰写文章。

以上就是一文带你搞懂Node中的流的详细内容,更多关于Node 流的资料请关注Devmax其它相关文章!

一文带你搞懂Node中的流的更多相关文章

  1. 利用Node实现HTML5离线存储的方法

    这篇文章主要介绍了利用Node实现HTML5离线存储的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  2. ios – 使用带有NodeJs HTTPS的certificates.cer

    我为IOS推送通知生成了一个.cer文件,我希望将它与NodeJSHTTPS模块一起使用.我发现HTTPS模块的唯一例子是使用.pem和.sfx文件,而不是.cer:有解决方案吗解决方法.cer文件可以使用两种不同的格式进行编码:PEM和DER.如果您的文件使用PEM格式编码,您可以像使用任何其他.pem文件一样使用它(有关详细信息,请参见Node.jsdocumentation):如果您的文件使

  3. 如何在XCode IDE中构建NodeJS?

    如何在XCodeIDE中将NodeJS构建为项目?NodeJS构建指令说它应该用以下内容构建:但是我希望在XCodeIDE中构建.我真正想要做的是在我的应用程序中嵌入NodeJS,所以我想如果我可以在XCode中构建NodeJS,那么我可以调整它以在我建立和运行NodeJS后添加我的应用程序.我想通过让V8在XCode中编译来取得一些进展,现在我正在尝试将NodeJS添加到V8项目中.解决方法在节点存储库根目录中运行./configure–xcode,您将获得所需的node.xcodeproj文件.

  4. 深入云存储系统Swift核心组件:Ring实现原理剖析

    它的目的是用于托管Rackspace的CloudFilesservice,原始项目代号是swift,所以沿用至今。Ring是Swift中最重要的组件,用于记录存储对象与物理位置间映射关系。先来看一下Swift文档中关于Ring的描述:Ring用来确定数据驻留在集群中的位置。有单独对应于Account数据库、container数据库和单个object的ring。Ring使用zone的概念来保证数据的隔离。每个partition的replica都确保放在了不同的zone中。本文逐步深入探讨了Swift如何通过

  5. Swift开发:创建XML文件,包含节点,属性值

    .append;//3创建第二个节点数据letitem2:Item=Item;for{letnode=Node;node.id=i+1;node.attributes=["ID":"\","Name":"N-\","disp":"1","Appliance":"1","Icon":"ic_switch_4"]item2.addNode;}xml.items?

  6. 泛型 – 符合Swift中Comparable的泛型类

    我正在尝试创建一个符合Comparable协议的简单通用节点类,以便我可以轻松地比较节点而无需访问其密钥.当我试图写

  7. swift3 – 将SceneKit对象放在SCNCamera当前方向的前面

    >生成SCNVector4,它定向节点,使其“面向”相机?但是让我有点失落.我看到了许多类似的问题,比如thisone,但没有答案.嘿,如果要将对象放在相对于另一个节点的某个位置,并且与参考节点的方向相同,则可以使用这个更简单的函数:如果您想将’node’2m放在某个’cameraNode’前面,你可以这样称呼:

  8. 如何在Swift中继承NSOperation以将SKAction对象排队以进行串行执行?

    Rob为子类化NSOperation提供了agreatObjective-Csolution,以实现SKAction对象的串行排队机制.我在自己的Swift项目中成功实现了这一点.要使用Actionoperation,请在客户端类中实例化NSOperationQueue类成员:在init方法中添加以下重要行:然后当您准备好向其添加SKActions时,它们会连续运行:您是否需要在任何时候终止操作:希望有所帮助!

  9. 核心数据 – 如何在Swift中定义CoreData关系?

    在CoreData中,我已经从Node到Tag定义了一个无序的多对多关系.我创建了一个这样的Swift实体:现在我想添加一个Tag到Node的一个实例,像这样:但是,这会失败,并显示以下错误:Terminatingappduetouncaughtexception‘NSinvalidargumentexception’,reason:‘Unacceptabletypeofvalueforto-ma

  10. 将“nil”值赋给Swift中的一般类型变量

    您需要将变量声明为可选项:不幸的是,这似乎触发了一个未实现的编译器功能:您可以通过使用NSObject的类型约束声明T来解决它:

随机推荐

  1. Error: Cannot find module ‘node:util‘问题解决

    控制台 安装 Vue-Cli 最后一步出现 Error: Cannot find module 'node:util' 问题解决方案1.问题C:\Windows\System32>cnpm install -g @vue/cli@4.0.3internal/modules/cjs/loader.js:638 throw err; &nbs

  2. yarn的安装和使用(全网最详细)

    一、yarn的简介:Yarn是facebook发布的一款取代npm的包管理工具。二、yarn的特点:速度超快。Yarn 缓存了每个下载过的包,所以再次使用时无需重复下载。 同时利用并行下载以最大化资源利用率,因此安装速度更快。超级安全。在执行代码之前,Yarn 会通过算法校验每个安装包的完整性。超级可靠。使用详细、简洁的锁文件格式和明确的安装算法,Yarn 能够保证在不同系统上无差异的工作。三、y

  3. 前端环境 本机可切换node多版本 问题源头是node使用的高版本

    前言投降投降 重头再来 重装环境 也就分分钟的事 偏要折腾 这下好了1天了 还没折腾出来问题的源头是node 使用的高版本 方案那就用 本机可切换多版本最终问题是因为nodejs的版本太高,导致的node-sass不兼容问题,我的node是v16.14.0的版本,项目中用了"node-sass": "^4.7.2"版本,无法匹配当前的node版本根据文章的提

  4. nodejs模块学习之connect解析

    这篇文章主要介绍了nodejs模块学习之connect解析,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  5. nodejs npm package.json中文文档

    这篇文章主要介绍了nodejs npm package.json中文文档,本文档中描述的很多行为都受npm-config(7)的影响,需要的朋友可以参考下

  6. 详解koa2学习中使用 async 、await、promise解决异步的问题

    这篇文章主要介绍了详解koa2学习中使用 async 、await、promise解决异步的问题,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

  7. Node.js编写爬虫的基本思路及抓取百度图片的实例分享

    这篇文章主要介绍了Node.js编写爬虫的基本思路及抓取百度图片的实例分享,其中作者提到了需要特别注意GBK转码的转码问题,需要的朋友可以参考下

  8. CentOS 8.2服务器上安装最新版Node.js的方法

    这篇文章主要介绍了CentOS 8.2服务器上安装最新版Node.js的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

  9. node.js三个步骤实现一个服务器及Express包使用

    这篇文章主要介绍了node.js三个步骤实现一个服务器及Express包使用,文章通过新建一个文件展开全文内容,具有一定的参考价值,需要的小伙伴可以参考一下

  10. node下使用UglifyJS压缩合并JS文件的方法

    下面小编就为大家分享一篇node下使用UglifyJS压缩合并JS文件的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

返回
顶部