Węzeł.js przesyłanie tego samego czytelnego strumienia do wielu (zapisywalnych) celów

Muszę uruchomić dwa polecenia w serii, które muszą odczytywać dane z tego samego strumienia. Po przekierowaniu strumienia do innego bufor jest opróżniany, więc nie mogę ponownie odczytać danych z tego strumienia, więc to nie działa:

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data',function(chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end',function() {
  var size = getSize(Buffer.concat(chunks)); //width
  var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer){
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}

Prośba narzeka na to

Error: You cannot pipe after data has been emitted from the response.

I zmiana strumienia wejściowego na fs.createWriteStream daje oczywiście ten sam problem. Nie chcę zapisywać do pliku, ale ponownie użyć w jakiś sposób strumienia, który request produkuje (lub jakikolwiek inny dla to ma znaczenie).

Czy istnieje sposób na ponowne użycie czytelnego strumienia po zakończeniu rurociągu? Jaki byłby najlepszy sposób na osiągnięcie czegoś takiego jak powyższy przykład?

Author: Alexander Mills, 2013-10-24

5 answers

Musisz utworzyć duplikat strumienia przez Orurowanie go do dwóch strumieni. Możesz utworzyć prosty strumień ze strumieniem Przejściowym, który po prostu przekazuje wejście na wyjście.

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

a.stdout.pipe(b);
a.stdout.pipe(c);

let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});

Wyjście:

8
hi user
 66
Author: user568109,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2018-05-08 07:43:58

Pierwsza odpowiedź działa tylko wtedy, gdy przetwarzanie danych zajmuje mniej więcej tyle samo czasu. Jeśli trwa to znacznie dłużej, tym szybciej zażąda nowych danych, w konsekwencji nadpisując dane nadal używane przez wolniejszego (miałem ten problem po próbie rozwiązania go za pomocą duplikatu strumienia).

Następujący wzór działał bardzo dobrze dla mnie. Wykorzystuje bibliotekę opartą na strumieniach Stream2, Streamz i obiecuje synchronizować strumienie asynchroniczne poprzez wywołanie zwrotne. Korzystanie z znany przykład z pierwszej odpowiedzi:

spawn = require('child_process').spawn;
pass = require('stream').PassThrough;
streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;   

a.stdout.pipe(streamz(combineStreamOperations)); 

function combineStreamOperations(data, next){
  Promise.join(b, c, function(b, c){ //perform n operations on the same data
  next(); //request more
}

count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });
 11
Author: artikas,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-11-23 19:38:12

Dla ogólnego problemu, poniższy kod działa dobrze

var PassThrough = require('stream').PassThrough
a=PassThrough()
b1=PassThrough()
b2=PassThrough()
a.pipe(b1)
a.pipe(b2)
b1.on('data', function(data) {
  console.log('b1:', data.toString())
})
b2.on('data', function(data) {
  console.log('b2:', data.toString())
})
a.write('text')
 1
Author: Jake,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2015-11-06 09:55:14

A co z rurociągiem do dwóch lub więcej strumieni Nie w tym samym czasie ?

Na przykład:

var PassThrough = require('stream').PassThrough;
var mybiraryStream = stream.start(); //never ending audio stream
var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'})
var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'})
var mypass = PassThrough
mybinaryStream.pipe(mypass)
mypass.pipe(file1)
setTimeout(function(){
   mypass.pipe(file2);
},2000)

Powyższy kod nie powoduje żadnych błędów, ale plik2 jest pusty

 1
Author: user3683370,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2016-11-29 20:14:41

Mam inne rozwiązanie do zapisu do dwóch strumieni jednocześnie, naturalnie, czas na zapis będzie dodanie dwóch razy, ale używam go do odpowiedzi na żądanie pobierania, gdzie chcę zachować kopię pobranego pliku na moim serwerze (właściwie używam kopii zapasowej S3, więc buforuję najczęściej używane pliki lokalnie, aby uniknąć wielokrotnego transferu plików) {]}

/**
 * A utility class made to write to a file while answering a file download request
 */
class TwoOutputStreams {
  constructor(streamOne, streamTwo) {
    this.streamOne = streamOne
    this.streamTwo = streamTwo
  }

  setHeader(header, value) {
    if (this.streamOne.setHeader)
      this.streamOne.setHeader(header, value)
    if (this.streamTwo.setHeader)
      this.streamTwo.setHeader(header, value)
  }

  write(chunk) {
    this.streamOne.write(chunk)
    this.streamTwo.write(chunk)
  }

  end() {
    this.streamOne.end()
    this.streamTwo.end()
  }
}

Możesz użyć tego jako regularnego strumienia wyjściowego

const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)

I przekaż go do swojej metody, jakby była to odpowiedź lub fileOutputStream

 0
Author: Zied Hamdi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/doraprojects.net/template/agent.layouts/content.php on line 54
2017-11-08 17:50:37