Node.js Streams vs. Observables

#1
After learning about Observables, I find them quite similar to Node.js streams. Both have a mechanism for notifying the consumer whenever new data arrives, an error occurs, or there is no more data (EOF).

I would love to learn about the conceptual/functional differences between the two. Thanks!


#2
Both _Observables_ and Node.js _Streams_ let you solve the same underlying problem: asynchronously processing a sequence of values. The main difference between the two, I believe, lies in the context that motivated each of them. That context is reflected in the terminology and the API.

On the _Observables_ side you have an extension to ECMAScript that introduces the reactive programming model. It tries to fill the gap between value generation and asynchronicity with the minimalist and composable concepts of `Observer` and `Observable`.

On the Node.js _Streams_ side, the goal was to create an interface for asynchronous, performant processing of network streams and local files. The terminology derives from that initial context, so you get `pipe`, `chunk`, `encoding`, `flush`, `Duplex`, `Buffer`, etc. By taking a pragmatic approach that provides explicit support for particular use cases, you lose some ability to compose things because the API is not as uniform. For example, you use `push` on a `Readable` stream and `write` on a `Writable` although, conceptually, you are doing the same thing: publishing a value.
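To make the `push` versus `write` asymmetry concrete, here is a minimal sketch. The names `source`, `sink`, and `received` are made up for illustration, and `objectMode` is used only so each value is delivered exactly as it was published.

```javascript
const { Readable, Writable } = require('stream');

// Producer side of a Readable: values are published with push().
// read() is a no-op because we push manually (illustrative only).
const source = new Readable({ objectMode: true, read() {} });
source.push('a');
source.push('b');
source.push(null); // signals "no more data"

// A Writable receives values through write(); here it just collects them.
const received = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    received.push(chunk);
    callback();
  }
});

// Move values by hand: pull from the Readable, publish into the Writable.
let chunk;
while ((chunk = source.read()) !== null) {
  sink.write(chunk);
}
sink.end();

console.log(received);
```

Both `source.push(value)` and `sink.write(value)` hand a value to a stream; only the method name and the side of the stream differ.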

So, in practice, if you look at the concepts, and if you use the option `{ objectMode: true }`, you can match `Observable` with the `Readable` stream and `Observer` with the `Writable` stream. You can even create some simple adapters between the two models.

    var Readable = require('stream').Readable;
    var Writable = require('stream').Writable;
    var util = require('util');

    // An Observable is just a wrapper around its subscribe function.
    var Observable = function(subscriber) {
        this.subscribe = subscriber;
    };

    // A Subscription is just a wrapper around its unsubscribe function.
    var Subscription = function(unsubscribe) {
        this.unsubscribe = unsubscribe;
    };

    // Adapts a Readable stream to an Observable by mapping stream events
    // to the observer's next/return/throw handlers.
    Observable.fromReadable = function(readable) {
        return new Observable(function(observer) {
            function nop() {}

            var nextFn = observer.next ? observer.next.bind(observer) : nop;
            var returnFn = observer.return ? observer.return.bind(observer) : nop;
            var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

            readable.on('data', nextFn);
            readable.on('end', returnFn);
            readable.on('error', throwFn);

            // Unsubscribing detaches the listeners registered above.
            return new Subscription(function() {
                readable.removeListener('data', nextFn);
                readable.removeListener('end', returnFn);
                readable.removeListener('error', throwFn);
            });
        });
    };

    // An Observer holds the three handlers; missing ones default to no-ops.
    var Observer = function(handlers) {
        function nop() {}

        this.next = handlers.next || nop;
        this.return = handlers.return || nop;
        this.throw = handlers.throw || nop;
    };

    // Adapts a Writable stream to an Observer: next() writes a value and
    // return() ends the stream only when shouldEnd is truthy.
    Observer.fromWritable = function(writable, shouldEnd, throwFn) {
        return new Observer({
            next: writable.write.bind(writable),
            return: shouldEnd ? writable.end.bind(writable) : function() {},
            throw: throwFn
        });
    };


You may have noticed that I changed a few names and used the simpler concepts of `Observer` and `Subscription`, introduced here, to avoid the overloading of responsibilities that _Observables_ put on `Generator`. Basically, the `Subscription` allows you to unsubscribe from the `Observable`. Anyway, with the above code you can build a `pipe`.

    Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
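As an aside on the `Subscription` returned by `subscribe`, the sketch below shows unsubscribing in action. It uses condensed copies of the adapter code, and it assumes a bare `EventEmitter` (named `fakeReadable` here, purely for illustration) can stand in for a `Readable`, since the adapter only relies on `on` and `removeListener`.

```javascript
const { EventEmitter } = require('events');

// Condensed copies of Observable, Subscription, and fromReadable
// (only the 'data' path is kept to stay short).
function Observable(subscriber) { this.subscribe = subscriber; }
function Subscription(unsubscribe) { this.unsubscribe = unsubscribe; }

Observable.fromReadable = function (readable) {
  return new Observable(function (observer) {
    const nextFn = observer.next.bind(observer);
    readable.on('data', nextFn);
    return new Subscription(function () {
      readable.removeListener('data', nextFn);
    });
  });
};

// Assumption: an EventEmitter stands in for a real Readable so that
// events are emitted synchronously and the result is easy to follow.
const fakeReadable = new EventEmitter();
const seen = [];

const subscription = Observable.fromReadable(fakeReadable).subscribe({
  next: function (value) { seen.push(value); }
});

fakeReadable.emit('data', 'first');  // delivered to the observer
subscription.unsubscribe();
fakeReadable.emit('data', 'second'); // ignored: the listener was removed

console.log(seen);
```

Only the value emitted before `unsubscribe()` reaches the observer.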

Compared with `process.stdin.pipe(process.stdout)`, what you have is a way to combine, filter, and transform streams that also works for any other sequence of data. You can achieve the same with `Readable`, `Transform`, and `Writable` streams, but the API favors subclassing over chaining `Readable`s and applying functions. In the `Observable` model, for example, transforming values corresponds to applying a transformer function to the stream. It does not require a new subtype of `Transform`.

    // Creates an Observable that emits each argument in order, then completes.
    Observable.just = function(/*... arguments*/) {
        var values = arguments;
        return new Observable(function(observer) {
            [].forEach.call(values, function(value) {
                observer.next(value);
            });
            observer.return();
            return new Subscription(function() {});
        });
    };

    // Returns a new Observable whose values are the source values
    // passed through the transformer function.
    Observable.prototype.transform = function(transformer) {
        var source = this;
        return new Observable(function(observer) {
            return source.subscribe({
                next: function(v) {
                    observer.next(transformer(v));
                },
                return: observer.return.bind(observer),
                throw: observer.throw.bind(observer)
            });
        });
    };

    Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
        .subscribe(Observer.fromWritable(process.stdout));
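For comparison, here is a rough sketch of the same mapping done on the stream side with a `Transform` subtype. `MapTransform` is a hypothetical name, not part of Node.js, and the values are written and read by hand only to keep the example short.

```javascript
const { Transform } = require('stream');

// Hypothetical subtype: applies a plain function to every chunk.
class MapTransform extends Transform {
  constructor(fn) {
    super({ objectMode: true }); // pass values through as-is, not as Buffers
    this.fn = fn;
  }

  _transform(chunk, encoding, callback) {
    // Call fn with the chunk only, then publish the result downstream.
    callback(null, this.fn(chunk));
  }
}

const stringifier = new MapTransform(JSON.stringify);

// Feed values into the writable side...
[1, 2, 3].forEach(function (value) {
  stringifier.write(value);
});
stringifier.end();

// ...and drain the transformed values from the readable side.
const out = [];
let chunk;
while ((chunk = stringifier.read()) !== null) {
  out.push(chunk);
}

console.log(out);
```

In real code you would normally connect such a stream with `pipe` instead of calling `write` and `read` directly. The point is only that the mapping function has to be wrapped in a stream type before it can take part in the chain.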

The conclusion? It's easy to introduce the reactive model and the `Observable` concept anywhere. It's harder to implement an entire library around that concept. All those little functions need to work together consistently. After all, the ReactiveX project is still going at it. But if you really need to send the file content to the client, deal with encoding, and zip it, then the support is there, in Node.js, and it works pretty well.
