Потоки (Stream) в NodeJS — реки, в которые войдешь дважды

Потоки (Stream) в NodeJS — реки, в которые войдешь дважды

Поток — это концепция, которая была сначала реализована в UNIX системах для передачи данных из одной программы в другую в операциях ввода/вывода. Это позволяет каждой программе быть очень специализированной в том, что она делает — быть независимым модулем. Сочетание таких простых программ помогает в создании более сложных систем путем «объединения» их в цепочку вызовов.

Потоки позволяют обмениваться данными небольшими частями, что в свою очередь дает возможность в своей работе не расходовать много памяти. Конечно, это зависит от того, как вы реализуется внутренний функционал потока.

Распространенная задача — парсинг файла большого объема. Например, в текстовом файле с данными логов нужно найти строку, содержащую определенный текст. Вместо того, чтобы файл полностью загрузить в память, и потом начать разбирать в нем строки в поисках нужной, мы можем его считывать небольшими порциями. Тем самым не занимаем память сверх необходимого, а лишь столько памяти, сколько нужно для буферизации считанных данных. Как только найдем требуемую запись, сразу прекратим дальнейшую работу. Или можем передать найденную запись в другой поток по цепочке, например, для преобразование в другой формат, или сохранения в другой файл.

Модуль stream предоставляет базовый API по работе с потоками в Node.JS. Документации Node.JS вполне достаточно, чтобы разобраться в данном вопросе, но мы попытаемся составить что-то вроде шпаргалки с пояснениями некоторых моментов.

Виды потоков

Есть четыре вида потоков:

  • Readable — поток, который предоставляет данные на чтение;
  • Writable — поток, в который данные можно записывать;
  • Duplex — поток, из которого можно как читать данные (Readable), так и записывать в него (Writable), при этом процесс чтения и записи просиходит независимо друго от друга;
  • Transform — разновидность Duplex потоков, которые могут изменять данные при их записи и чтении в/из потока (чаще используется как промежуточное звено в цепочке передачи данных).

Stream instanceof EventEmitter

Все потоки являются экземплярами EventEmitter, то есть можно генерировать события StreamClass.emit('eventName', data), и обрабатывать их StreamClass.on('eventName', (data)=>);

Метод pipe

Чтобы передать данные из одного потока в другой, самый простой способ вызвать над потоками метод pipe:

Последняя цепочка вызовов показывает, что реализовывать свои классы потоков лучше таким образом, чтобы каждый их них решал свою задачу.

Как видно — метод pipe возвращает экземпляр потока, который был передан в него, что и позволяет потоки объединять между собой.

Метод pipe, реализован таким образом, что он решает задачу контроля «скорости» передачи данных из одного потока в другой (превышение объема внутреннего буфера потока). Например, Writable поток работает на запись медленнее, чем их передает источник данных Readable. В этом случае передача данных «приостанавливается» до тех пор, пока Writable «не сообщит» (внутренний буфер очистится), что он готов принимать следующую порцию данных.

Buffering

Потоки хранят данные в своем внутреннем буфере. Размер буфера можно указать через параметр highWaterMark, который можно задать в конструкторе класса.

Физический смысл значение highWaterMark зависит от другой опции — objectMode.

В Readable потоке данные буферизируются, когда над ним вызвается метод push(data), и остаются в буфере до тех пор, пока их не прочитают, вызвав метод read(). Как только общий размер внутреннего буфера Readable потока достигнет порогового значения, указанного в highWaterMark, поток временно прекратит чтение данных.

Для Writable буферизация происходит во время вызова над ним метода write(data). Метод вернет true, пока размер буфера не достиг значения highWaterMark, и false, когда буфер переполнен. При использовании метода pipe(), как раз в этот момент он «останавливает» чтение данных, ожидает событие «drain», после чего передача данных возобновляется.

Object Mode

По умолчанию, потоки работают с данными в виде буфера, но так же могут работать как со строками, так и с другими объектами JavaScript (например, ), за исключением null-объекта, который играет отдельную роль при передаче данных (если поток получает null, это является сигналом, что данных для обработки больше нет, и чтение или запись данных завершена). Как установить тот или иной режим потока при его инициализации покажем в примерах ниже.

Состояние flowing или paused потока Readable

  • flowing — данные поступают непрерывно и как можно быстро для процесса, который их считывает;
  • paused — режим по умолчанию для всех типов потоков, данные передаются только если их явно запросили — явный вызов метода read() (метод read() неявно вызывается «внутри» метода pipe()).
  • данные передаются другим потокам через метод pipe();
  • и/или у него есть обработчик события 'data';
  • и/или над ним вызван метод resume().
  • если «разорвем» связь между источником данных и их потребителем (Readable.pipe(Writable); Readable.unpipe(Writable)), и/или удалим обработчик события 'data';
  • или вызовем метод Readable.pause().

Readable Streams — потоки как источник данных

Readable потоки работают в одном из двух состояний: flowing и paused. В состоянии paused для считывания данных необходимо явно вызывать метод read(). Когда вы передаете данные из одного потока в другой (R.pipe(W)), метод read() вызывается автоматически.

Весь текущий буфер данных можно получить с помощью свойства Readable._readableState.buffer.

Writable Streams — потоки для записи данных

Весь текущий буфер данных можно получить с помощью метода writable._writableState.getBuffer().

Transform Streams — потоки изменения данных

Transform — разновидность Duplex потоков. Решили сначала показать пример с ним.

Duplex Streams — потоки на запись и на чтение

Duplex реализуют в себе как Readable, таки Writable потоки. При этом их «работа» происходит независимо друг от друга.

Если вы заинтересовались темой потоков, предлагаем поэкспериментировать над реализацией своих Duplex потоков самостоятельно.

📎📎📎📎📎📎📎📎📎📎