123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- 'use strict';
- module.exports = PullStream;
- require("setimmediate");
- var inherits = require("util").inherits;
- var PassThrough = require('readable-stream/passthrough');
- var over = require('over');
- var SliceStream = require('slice-stream');
- function PullStream(opts) {
- var self = this;
- this.opts = opts || {};
- PassThrough.call(this, opts);
- this.once('finish', function() {
- self._writesFinished = true;
- if (self._flushed) {
- self._finish();
- }
- });
- this.on('readable', function() {
- self._process();
- });
- }
- inherits(PullStream, PassThrough);
- PullStream.prototype.pull = over([
- [over.numberOptionalWithDefault(null), over.func, function (len, callback) {
- if (len === 0) {
- return callback(null, new Buffer(0));
- }
- var self = this;
- pullServiceRequest();
- function pullServiceRequest() {
- self._serviceRequests = null;
- if (self._flushed) {
- return callback(new Error('End of Stream'));
- }
- var data = self.read(len || undefined);
- if (data) {
- setImmediate(callback.bind(null, null, data));
- } else {
- self._serviceRequests = pullServiceRequest;
- }
- }
- }]
- ]);
- PullStream.prototype.pullUpTo = over([
- [over.numberOptionalWithDefault(null), function (len) {
- var data = this.read(len);
- if (len && !data) {
- data = this.read();
- }
- return data;
- }]
- ]);
- PullStream.prototype.pipe = over([
- [over.numberOptionalWithDefault(null), over.object, function (len, destStream) {
- if (!len) {
- return PassThrough.prototype.pipe.call(this, destStream);
- }
- if (len === 0) {
- return destStream.end();
- }
- var pullstream = this;
- pullstream
- .pipe(new SliceStream({ length: len }, function (buf, sliceEnd, extra) {
- if (!sliceEnd) {
- return this.push(buf);
- }
- pullstream.unpipe();
- pullstream.unshift(extra);
- this.push(buf);
- return this.push(null);
- }))
- .pipe(destStream);
- return destStream;
- }]
- ]);
- PullStream.prototype._process = function () {
- if (this._serviceRequests) {
- this._serviceRequests();
- }
- };
- PullStream.prototype.prepend = function (chunk) {
- this.unshift(chunk);
- };
- PullStream.prototype.drain = function (len, callback) {
- if (this._flushed) {
- return callback(new Error('End of Stream'));
- }
- var data = this.pullUpTo(len);
- var bytesDrained = data && data.length || 0;
- if (bytesDrained === len) {
- setImmediate(callback);
- } else if (bytesDrained > 0) {
- this.drain(len - bytesDrained, callback);
- } else {
- //internal buffer is empty, wait until data can be consumed
- this.once('readable', this.drain.bind(this, len - bytesDrained, callback));
- }
- };
- PullStream.prototype._flush = function (callback) {
- var self = this;
- if (this._readableState.length > 0) {
- return setImmediate(self._flush.bind(self, callback));
- }
- this._flushed = true;
- if (self._writesFinished) {
- self._finish(callback);
- } else {
- callback();
- }
- };
- PullStream.prototype._finish = function (callback) {
- callback = callback || function () {};
- if (this._serviceRequests) {
- this._serviceRequests();
- }
- setImmediate(callback);
- };
|