初识系列:nodejs之stream可读流

March 24, 2021

前言

这是初识系列的第一篇:stream可读流。刚接触stream的时候有点难以理解,在客户端开发,基本接触不到stream,顶多也就是文档下载的时候,后端返回文件流,这个和stream沾边的东西。如此神秘,自然成为了首个研究的对象。nodejs对象里面有可读流,可写流,还有可读可写流,像HTTP响应Response对象就是可读流,而服务端的是可写流,下面介绍一下可读流Readable。

基本

常见用到可读流的情景是用fs.createReadStream(path[, options]),并通过监听可读流的dataend事件来操作,或则是用pipe方法将可读流的数据流到可写流里面。

可读流里面有两个构造函数,一个是Readable,一个是ReadableState。先看看ReadableState的构造函数:

function ReadableState(options, stream) {
  // ...省略部分代码
  // objectMode 对象流模式,返回的是对象而不是n字节缓存,hwm:高水位标志
  var hwm = options.highWaterMark;
  var defaultHwm = this.objectMode ? 16 : 16 * 1024;
  this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
  this.highWaterMark = Math.floor(this.highWaterMark);

  //BufferList是可读流的缓冲区,其结构类似与C语言的链表,操作比数组快
  this.buffer = new BufferList();
  //缓冲区大小长度
  this.length = 0;
  // pipes:目标对象流,pipesCount为目标对象流长度
  this.pipes = null;
  this.pipesCount = 0;
  // flowing模式标志,ended为可读流结束标志,endEmitted:是否已经触发ended
  // reading:是否正在正在调用this._read方法
  this.flowing = null;
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;
  // 异步标志置为true,用来控制'readable'/'data'事件是否立即执行
  this.sync = true;
  // 是否需要触发readable事件,
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;

  this.destroyed = false;

  // 目标流不处于drain状态时,等待drain事件的数量;
  this.awaitDrain = 0;
  // 是否正在读取更多数据,maybeReadMore函数
  this.readingMore = false;
  //.. 省略编解码相关部分
}

BufferList,就是读取过程中操作的缓冲池。ReadableState构造函数基本上是用来控制可读流的标志,其中最常见的就是flowing。可读流的工作模式分为flowing模式和pause模式。一般直接使用readable.pipe() 方法来消费流数据,因为它是最简单的一种实现,如果你想要精细控制,那就是通过控制flowing标志以及其他来实现。

添加chunk

可读流的数据从哪里来?或许最常见的就是用fs模块来createReadStream,然后直接读取就好了,似乎不涉及到chunk的添加过程。但是createReadStream又是如何创建可读流的呢?到最后还是需要Readable的push这个API,不断地push(chunk),也就是往缓冲区添加数据。下面介绍一下push方法:

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync) {
    stream.emit('data', chunk);
    stream.read(0);
  } else {
    // update the buffer info.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk);

    if (state.needReadable) emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

上面是addChunk方法,而Readable的push,会先检查objectMode,若不是objectMode,当压入的数据chunk是一个Buffer, Uint8Array或者string,objectMode就可以是any了,在readableAddChunk函数里面会state.reading = false添加块的过程并不是在执行_read方法。 回到addChunk方法,先看看else语句,可以发现添加chunk只是修改state.length,同时调用push/unshift来把chunk添加到缓冲区里面。并根据情况来触发readable事件。readable事件表明会有新的动态,要么有新的数据,要么到了流的尾部。而前面if语句里面,为flowing模式,并且缓冲区没有数据,且为同步模式下,才会触发data事件,并执行read(0)read(0)在满足条件的情况下,只是简单触发readable事件而不会读取当前缓冲区,后面会介绍到。addChunk结束部分还调用了maybeReadMore,在read部分会介绍到。

read

Readable.prototype.read方法,用于读取缓冲池里面的数据,其开始如下:

  if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this);
    return null;
  }

在开始的时候如果n=0,并且符合其他条件,则会执行emitReadable,接着触发readable事件,并返回null,这个时候read(0)并不会触发缓冲区的数据读取,只是简单的触发readable事件,这个实现还是很巧妙的。

  n = howMuchToRead(n, state);
  if (n === 0 && state.ended) {
    if (state.length === 0) endReadable(this);
    return null;
  }
  var doRead = state.needReadable;
  debug('need readable', doRead);
  // 数据比高水线低,那就需要读
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  if (state.ended || state.reading) {
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    if (state.length === 0) state.needReadable = true;
    this._read(state.highWaterMark);
    state.sync = false;
    if (!state.reading) n = howMuchToRead(nOrig, state);
  }

这里面先是重新计算n, 当n===NaN的时候,读取缓冲区的第一个节点或则所有缓冲数据, 当n<=state.length返回n,并且当n>hwm的时候,重新计算高水位,为最小大于n的2^x, 当n>state.length的时候,返回0,并使得state.needReadable=true, 下面则是通过needReadable来判断是否执行_read方法,_read方法是需要自定义实现的,在方法里面手动添加数据到缓冲区里面,以便后面读取数据以及判断缓冲区长度。由于_read可能是同步的方法,修改缓冲区,所以需要重新评估n,以便后面获取数据。 后面部分,获取数据:

  var ret;
  if (n > 0) ret = fromList(n, state);else ret = null;
  if (ret === null) {
    state.needReadable = true;
    n = 0;
  } else {
    state.length -= n;
  }
  if (state.length === 0) {
    if (!state.ended) state.needReadable = true;
    if (nOrig !== n && state.ended) endReadable(this);
  }
  if (ret !== null) this.emit('data', ret);

  return ret;

这一部分就简单了,就是获取数据,设置needReadable,并触发data事件,供可读流监听,操作chunk。

needReadable的作用

在read方法里面,needReadable经常被修改为true,这个有什么用呢? 在addChunk里面若执行if语句里面的read(0),由于需要state.length>=htm是不会触发readable事件的,相反执行else语句,添加chunk之后,就触发readable事件,并将所有的缓冲区数据读出来,相当于把之前的数据,和本次加的数据都读出来了。addChunk的最后面也会通过调用maybeReadMore来执行read(0),其实现如下:

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    processNextTick(maybeReadMore_, stream, state);
  }
}
function maybeReadMore_(stream, state) {
  var len = state.length;
  while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length)
      // didn't get any data, stop spinning.
      break;else len = state.length;
  }
  state.readingMore = false;
}

当添加chunk的时候,若state.lenght<hwm则会执行stream.read(0),从而肯定会执行_read方法,导致缓冲区增加,直到缓冲区长度超过高水线,同时最后一次调用stream.read(0),也不会触发readable事件。 另外在Readable.prototype.on函数,readable事件的处理函数里面,异步调用read(0)。 ps:maybeReadMore用了异步调用maybeReadMore_,是为了让本轮循环里面调用的_read执行完先,在_read里面的每个添加chunk的步骤,都会执行maybeReadMore函数,若同步执行maybeReadMore_,缓冲区数据将会远远超标。

数据读取ended

当数据读取结束后,要显式的执行push(null)/unshift(null)来调用onEofChunk方法。在onEofChunk里面,会通过emitReadable处理剩余的数据,并设置ended为true。emitReadable方法里面,通过while循环来读取剩余数据,使得state.length为0,并执行endReadable方法。

function endReadableNT(state, stream) {
  // Check that we didn't get one last unshift.
  if (!state.endEmitted && state.length === 0) {
    state.endEmitted = true;
    stream.readable = false;
    stream.emit('end');
  }
}

endReadable方法异步调用endReadableNT,置readable为false,并触发end事件。这样数据读取就结束了。

pipe管道

readable里面的pipe方法,主要部分是事件上的监听,核心部分如下:

  dest.emit('pipe', src);

  // start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

触发pipe事件,同时开启flowing模式,并在结束的时候,会调用cleanup清理掉之前监听的事件。如果在pipe的时候,src又添加数据,而目标文件不处于drain状态,就需要监听drain事件,来单独处理。