Потоки (Stream) в NodeJS — реки, в которые войдешь дважды
Поток — это концепция, которая была сначала реализована в UNIX системах для передачи данных из одной программы в другую в операциях ввода/вывода. Это позволяет каждой программе быть очень специализированной в том, что она делает — быть независимым модулем. Сочетание таких простых программ помогает в создании более сложных систем путем «объединения» их в цепочку вызовов.
Потоки позволяют обмениваться данными небольшими частями, что в свою очередь дает возможность в своей работе не расходовать много памяти. Конечно, это зависит от того, как вы реализуется внутренний функционал потока.
Распространенная задача — парсинг файла большого объема. Например, в текстовом файле с данными логов нужно найти строку, содержащую определенный текст. Вместо того, чтобы файл полностью загрузить в память, и потом начать разбирать в нем строки в поисках нужной, мы можем его считывать небольшими порциями. Тем самым не занимаем память сверх необходимого, а лишь столько памяти, сколько нужно для буферизации считанных данных. Как только найдем требуемую запись, сразу прекратим дальнейшую работу. Или можем передать найденную запись в другой поток по цепочке, например, для преобразование в другой формат, или сохранения в другой файл.
Модуль stream предоставляет базовый API по работе с потоками в Node.JS. Документации Node.JS вполне достаточно, чтобы разобраться в данном вопросе, но мы попытаемся составить что-то вроде шпаргалки с пояснениями некоторых моментов.
Виды потоков
Есть четыре вида потоков:
- Readable — поток, который предоставляет данные на чтение;
- Writable — поток, в который данные можно записывать;
- Duplex — поток, из которого можно как читать данные (Readable), так и записывать в него (Writable), при этом процесс чтения и записи просиходит независимо друго от друга;
- Transform — разновидность Duplex потоков, которые могут изменять данные при их записи и чтении в/из потока (чаще используется как промежуточное звено в цепочке передачи данных).
Stream instanceof EventEmitter
Все потоки являются экземплярами EventEmitter, то есть можно генерировать события StreamClass.emit('eventName', data), и обрабатывать их StreamClass.on('eventName', (data)=>);
Метод pipe
Чтобы передать данные из одного потока в другой, самый простой способ вызвать над потоками метод pipe:
Последняя цепочка вызовов показывает, что реализовывать свои классы потоков лучше таким образом, чтобы каждый их них решал свою задачу.
Как видно — метод pipe возвращает экземпляр потока, который был передан в него, что и позволяет потоки объединять между собой.
Метод pipe, реализован таким образом, что он решает задачу контроля «скорости» передачи данных из одного потока в другой (превышение объема внутреннего буфера потока). Например, Writable поток работает на запись медленнее, чем их передает источник данных Readable. В этом случае передача данных «приостанавливается» до тех пор, пока Writable «не сообщит» (внутренний буфер очистится), что он готов принимать следующую порцию данных.
Buffering
Потоки хранят данные в своем внутреннем буфере. Размер буфера можно указать через параметр highWaterMark, который можно задать в конструкторе класса.
Физический смысл значение highWaterMark зависит от другой опции — objectMode.
В Readable потоке данные буферизируются, когда над ним вызвается метод push(data), и остаются в буфере до тех пор, пока их не прочитают, вызвав метод read(). Как только общий размер внутреннего буфера Readable потока достигнет порогового значения, указанного в highWaterMark, поток временно прекратит чтение данных.
Для Writable буферизация происходит во время вызова над ним метода write(data). Метод вернет true, пока размер буфера не достиг значения highWaterMark, и false, когда буфер переполнен. При использовании метода pipe(), как раз в этот момент он «останавливает» чтение данных, ожидает событие «drain», после чего передача данных возобновляется.
Object Mode
По умолчанию, потоки работают с данными в виде буфера, но так же могут работать как со строками, так и с другими объектами JavaScript (например, ), за исключением null-объекта, который играет отдельную роль при передаче данных (если поток получает null, это является сигналом, что данных для обработки больше нет, и чтение или запись данных завершена). Как установить тот или иной режим потока при его инициализации покажем в примерах ниже.
Состояние flowing или paused потока Readable
- flowing — данные поступают непрерывно и как можно быстро для процесса, который их считывает;
- paused — режим по умолчанию для всех типов потоков, данные передаются только если их явно запросили — явный вызов метода read() (метод read() неявно вызывается «внутри» метода pipe()).
- данные передаются другим потокам через метод pipe();
- и/или у него есть обработчик события 'data';
- и/или над ним вызван метод resume().
- если «разорвем» связь между источником данных и их потребителем (Readable.pipe(Writable); Readable.unpipe(Writable)), и/или удалим обработчик события 'data';
- или вызовем метод Readable.pause().
Readable Streams — потоки как источник данных
Readable потоки работают в одном из двух состояний: flowing и paused. В состоянии paused для считывания данных необходимо явно вызывать метод read(). Когда вы передаете данные из одного потока в другой (R.pipe(W)), метод read() вызывается автоматически.
Весь текущий буфер данных можно получить с помощью свойства Readable._readableState.buffer.
Writable Streams — потоки для записи данных
Весь текущий буфер данных можно получить с помощью метода writable._writableState.getBuffer().
Transform Streams — потоки изменения данных
Transform — разновидность Duplex потоков. Решили сначала показать пример с ним.
Duplex Streams — потоки на запись и на чтение
Duplex реализуют в себе как Readable, таки Writable потоки. При этом их «работа» происходит независимо друг от друга.
Если вы заинтересовались темой потоков, предлагаем поэкспериментировать над реализацией своих Duplex потоков самостоятельно.