热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Node.js中StreamAPI的使用

基本介绍在Node.js中,读取文件的方式有两种,一种是用fs.readFile,另外一种是利用fs.createReadStream来读

基本介绍

在 Node.js 中,读取文件的方式有两种,一种是用 fs.readFile,另外一种是利用 fs.createReadStream 来读取。

fs.readFile 对于每个 Node.js 使用者来说最熟悉不过了,简单易懂,很好上手。但它的缺点是会先将数据全部读入内存,一旦遇到大文件的时候,这种方式读取的效率就非常低下了。

fs.createReadStream 则是通过 Stream 来读取数据,它会把文件(数据)分割成小块,然后触发一些特定的事件,我们可以监听这些事件,编写特定的处理函数。这种方式相对上面来说,并不好上手,但它效率非常高。

事实上, Stream 在 Node.js 中并非仅仅用在文件处理上,其他地方也可以看到它的身影,如 process.stdin/stdout, http, tcp sockets, zlib, crypto 等都有用到。

本文是我学习 Node.js 中的 Stream API 中的一点总结,希望对大家有用。

特点

  • 基于事件通讯

  • 可以通过 pipe 来连接流

种类

  • Readable Stream 可读数据流

  • Writeable Stream 可写数据流

  • Duplex Stream 双向数据流,可以同时读和写

  • Transform Stream 转换数据流,可读可写,同时可以转换(处理)数据

事件

可读数据流的事件

  • readable 数据向外流时触发

  • data 对于那些没有显式暂停的数据流,添加data事件监听函数,会将数据流切换到流动态,尽快向外提供数据

  • end 读取完数据时触发。注意不能和 writeableStream.end() 混淆,writeableStream 并没有 end 事件,只有 .end() 方法

  • close 数据源关闭时触发

  • error 读取数据发生错误时触发

可写数据流的事件

  • drain writable.write(chunk) 返回 false 之后,缓存全部写入完成,可以重新写入时就会触发

  • finish 调用 .end 方法时,所有缓存的数据释放后触发,类似于可读数据流中的 end 事件,表示写入过程结束

  • pipe 作为 pipe 目标时触发

  • unpipe 作为 unpipe 目标时触发

  • error 写入数据发生错误时触发

状态

可读数据流有两种状态:流动态暂停态,改变数据流状态的方法如下:

暂停态 -> 流动态

  • 添加 data 事件的监听函数

  • 调用 resume 方法

  • 调用 pipe 方法

注意: 如果转为流动态时,没有 data 事件的监听函数,也没有 pipe 方法的目的地,那么数据将遗失。

流动态 -> 暂停态

  • 不存在 pipe 方法的目的地时,调用 pause 方法

  • 存在 pipe 方法的目的地时,移除所有 data 事件的监听函数,并且调用 unpipe 方法,移除所有 pipe 方法的目的地

注意: 只移除 data 事件的监听函数,并不会自动引发数据流进入「暂停态」。另外,存在 pipe 方法的目的地时,调用 pause 方法,并不能保证数据流总是处于暂停态,一旦那些目的地发出数据请求,数据流有可能会继续提供数据。

用法

读写文件

var fs = require('fs');
// 新建可读数据流
var rs = fs.createReadStream('./test1.txt');
// 新建可写数据流
var ws = fs.createWriteStream('./test2.txt');// 监听可读数据流结束事件
rs.on('end', function() {console.log('read text1.txt successfully!');
});
// 监听可写数据流结束事件
ws.on('finish', function() {console.log('write text2.txt successfully!');
});
// 把可读数据流转换成流动态,流进可写数据流中
rs.pipe(ws);

读取 CSV 文件,并上传数据(我在生产环境中写过)

var fs = require('fs');
var es = require('event-stream');
var csv = require('csv');
var parser = csv.parse();
var transformer = csv.transform(function(record) {return record.join(',');
});var data = fs.createReadStream('./demo.csv');
data.pipe(parser).pipe(transformer)// 处理前一个 stream 传递过来的数据.pipe(es.map(function(data, callback) {upload(data, function(err) {callback(err);});}))// 相当于监听前一个 stream 的 end 事件.pipe(es.wait(function(err, body) {process.stdout.write('done!');}));

更多用法

可以参考一下 https://github.com/jeresig/node-stream-playground ,进去示例网站之后直接点 add stream 就能看到结果了。

常见坑

  • rs.pipe(ws) 的方式来写文件并不是把 rs 的内容 append 到 ws 后面,而是直接用 rs 的内容覆盖 ws 原有的内容

  • 已结束/关闭的流不能重复使用,必须重新创建数据流

  • pipe 方法返回的是目标数据流,如 a.pipe(b) 返回的是 b,因此监听事件的时候请注意你监听的对象是否正确

    • 如果你要监听多个数据流,同时你又使用了 pipe 方法来串联数据流的话,你就要写成:

      data.on('end', function() {console.log('data end');}).pipe(a).on('end', function() {console.log('a end');}).pipe(b).on('end', function() {console.log('b end');});

常用类库

  • event-stream 用起来有函数式编程的感觉,个人比较喜欢

  • awesome-nodejs#streams 由于其他 stream 库我都没用过,所以有需求的就直接看这里吧

出处

https://scarletsky.github.io/2015/10/22/node-stream-api-learning/

参考资料

阮一峰 - stream接口
nodejs.org Stream
Transforming data with Node.js transform streams
NodeJS: What's the difference between a Duplex stream and a Transform stream?



推荐阅读
  • Node.js 教程第五讲:深入解析 EventEmitter(事件监听与发射机制)
    本文将深入探讨 Node.js 中的 EventEmitter 模块,详细介绍其在事件监听与发射机制中的应用。内容涵盖事件驱动的基本概念、如何在 Node.js 中注册和触发自定义事件,以及 EventEmitter 的核心 API 和使用方法。通过本教程,读者将能够全面理解并熟练运用 EventEmitter 进行高效的事件处理。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 本指南从零开始介绍Scala编程语言的基础知识,重点讲解了Scala解释器REPL(读取-求值-打印-循环)的使用方法。REPL是Scala开发中的重要工具,能够帮助初学者快速理解和实践Scala的基本语法和特性。通过详细的示例和练习,读者将能够熟练掌握Scala的基础概念和编程技巧。 ... [详细]
  • 在探讨 MySQL 正则表达式 REGEXP 的功能与应用之前,我们先通过一个小实验来对比 REGEXP 和 LIKE 的性能。通过具体的代码示例,我们将评估这两种查询方式的效率,以确定 REGEXP 是否值得深入研究。实验结果将为后续的详细解析提供基础。 ... [详细]
  • 初探性能优化:入门指南与实践技巧
    在编程领域,常有“尚未精通编码便急于优化”的声音。为了从性能优化的角度提升代码质量,本文将带领读者初步探索性能优化的基本概念与实践技巧。即使程序看似运行良好,数据处理效率仍有待提高,通过系统学习性能优化,能够帮助开发者编写更加高效、稳定的代码。文章不仅介绍了性能优化的基础知识,还提供了实用的调优方法和工具,帮助读者在实际项目中应用这些技术。 ... [详细]
  • 本文深入解析了Java 8并发编程中的`AtomicInteger`类,详细探讨了其源码实现和应用场景。`AtomicInteger`通过硬件级别的原子操作,确保了整型变量在多线程环境下的安全性和高效性,避免了传统加锁方式带来的性能开销。文章不仅剖析了`AtomicInteger`的内部机制,还结合实际案例展示了其在并发编程中的优势和使用技巧。 ... [详细]
  • 本文深入探讨了HTTP头部中的Expires与Cache-Control字段及其缓存机制。Cache-Control字段主要用于控制HTTP缓存行为,其在HTTP/1.1中得到了广泛应用,而HTTP/1.0中主要使用Pragma:no-cache来实现类似功能。Expires字段则定义了资源的过期时间,帮助浏览器决定是否从缓存中读取资源。文章详细解析了这两个字段的具体用法、相互关系以及在不同场景下的应用效果,为开发者提供了全面的缓存管理指南。 ... [详细]
  • 使用 `git stash` 可以将当前未提交的修改保存到一个临时存储区,以便在后续恢复工作目录时使用。例如,在处理中间状态时,可以通过 `git stash` 命令将当前的所有未提交更改推送到一个新的储藏中,从而保持工作目录的整洁。此外,本文还将详细介绍如何解决 `git stash pop` 时可能出现的冲突问题,帮助用户高效地管理代码变更。 ... [详细]
  • 探讨上传下载 API 的常见问题及解决方案 ... [详细]
  • 在HTML5应用中,Accordion(手风琴,又称抽屉)效果因其独特的展开和折叠样式而广泛使用。本文探讨了三种不同的Accordion交互效果,通过层次结构优化信息展示和页面布局,提升用户体验。这些效果不仅增强了视觉效果,还提高了内容的可访问性和互动性。 ... [详细]
  • 本文探讨了BERT模型在自然语言处理领域的应用与实践。详细介绍了Transformers库(曾用名pytorch-transformers和pytorch-pretrained-bert)的使用方法,涵盖了从模型加载到微调的各个环节。此外,还分析了BERT在文本分类、情感分析和命名实体识别等任务中的性能表现,并讨论了其在实际项目中的优势和局限性。 ... [详细]
  • Java 8 引入了 Stream API,这一新特性极大地增强了集合数据的处理能力。通过 Stream API,开发者可以更加高效、简洁地进行集合数据的遍历、过滤和转换操作。本文将详细解析 Stream API 的核心概念和常见用法,帮助读者更好地理解和应用这一强大的工具。 ... [详细]
  • 深入理解Spark框架:RDD核心概念与操作详解
    RDD是Spark框架的核心计算模型,全称为弹性分布式数据集(Resilient Distributed Dataset)。本文详细解析了RDD的基本概念、特性及其在Spark中的关键操作,包括创建、转换和行动操作等,帮助读者深入理解Spark的工作原理和优化策略。通过具体示例和代码片段,进一步阐述了如何高效利用RDD进行大数据处理。 ... [详细]
  • 本文介绍了Python高级网络编程及TCP/IP协议簇的OSI七层模型。首先简单介绍了七层模型的各层及其封装解封装过程。然后讨论了程序开发中涉及到的网络通信内容,主要包括TCP协议、UDP协议和IPV4协议。最后还介绍了socket编程、聊天socket实现、远程执行命令、上传文件、socketserver及其源码分析等相关内容。 ... [详细]
author-avatar
夏y儿
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有