pullstream.js 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. 'use strict';
  2. module.exports = PullStream;
  3. require("setimmediate");
  4. var inherits = require("util").inherits;
  5. var PassThrough = require('readable-stream/passthrough');
  6. var over = require('over');
  7. var SliceStream = require('slice-stream');
  /**
   * PassThrough stream that additionally lets consumers pull data on demand.
   *
   * The constructor records the options, initialises the PassThrough base and
   * wires two listeners:
   *  - 'finish' (once): marks that all writes are done and, if the stream was
   *    already flushed, runs `_finish()`.
   *  - 'readable': runs `_process()` so a waiting pull request can be retried.
   *
   * @param {Object} [opts] - Options forwarded unchanged to PassThrough; also
   *   stored on `this.opts` (an empty object when omitted).
   */
  function PullStream(opts) {
    var stream = this;

    stream.opts = opts || {};
    PassThrough.call(stream, opts);

    stream.once('finish', function () {
      stream._writesFinished = true;
      if (!stream._flushed) {
        return;
      }
      stream._finish();
    });

    stream.on('readable', function () {
      stream._process();
    });
  }

  // PullStream inherits its prototype chain from PassThrough.
  inherits(PullStream, PassThrough);
  /**
   * Pull data from the stream and deliver it to `callback`.
   *
   * Arguments are dispatched through `over`: `len` is an optional number that
   * defaults to null, `callback` is a function.
   *
   * Behaviour:
   *  - `len === 0`: `callback(null, <empty Buffer>)` is invoked synchronously.
   *  - stream already flushed: `callback(Error('End of Stream'))` is invoked
   *    synchronously.
   *  - data available: `callback(null, data)` is scheduled with setImmediate.
   *  - otherwise the request is parked in `this._serviceRequests` and retried
   *    whenever that function is invoked again (see `_process` / `_finish`).
   *    Only a single parked request is stored at a time.
   *
   * @param {?number} len - Number of bytes to read; null/omitted reads
   *   without an explicit size.
   * @param {function(?Error, Buffer=)} callback - Receives the error or data.
   */
  PullStream.prototype.pull = over([
    [over.numberOptionalWithDefault(null), over.func, function (len, callback) {
      if (len === 0) {
        // `new Buffer(size)` is deprecated; use Buffer.alloc when the runtime
        // provides it and fall back to the constructor on legacy runtimes.
        var emptyBuffer = typeof Buffer.alloc === 'function'
          ? Buffer.alloc(0)
          : new Buffer(0);
        return callback(null, emptyBuffer);
      }

      var self = this;

      function pullServiceRequest() {
        // Clear the parked request first; it is re-parked below if the read
        // still cannot be satisfied.
        self._serviceRequests = null;

        if (self._flushed) {
          return callback(new Error('End of Stream'));
        }

        var data = self.read(len || undefined);
        if (data) {
          setImmediate(callback.bind(null, null, data));
        } else {
          self._serviceRequests = pullServiceRequest;
        }
      }

      pullServiceRequest();
    }]
  ]);
  /**
   * Synchronously read at most `len` bytes from the stream.
   *
   * First attempts `read(len)`. If a non-zero `len` was requested and that
   * read produced nothing, falls back to `read()` with no size, returning
   * whatever that call yields.
   *
   * @param {?number} len - Upper bound on the bytes wanted (optional,
   *   defaults to null via `over`).
   * @returns {*} The value returned by `read`, which may be falsy when
   *   nothing was read.
   */
  PullStream.prototype.pullUpTo = over([
    [over.numberOptionalWithDefault(null), function (len) {
      var chunk = this.read(len);
      if (chunk || !len) {
        return chunk;
      }
      // A sized read came back empty: retry without a size.
      return this.read();
    }]
  ]);
  /**
   * Pipe this stream into `destStream`, optionally limited to `len` bytes.
   *
   * Arguments are dispatched through `over`: `len` is an optional number that
   * defaults to null, `destStream` is an object.
   *
   *  - `len === 0`: nothing is piped; `destStream` is ended immediately.
   *  - `len` omitted (or otherwise falsy): behaves like the inherited
   *    PassThrough `pipe`.
   *  - otherwise: data is routed through a SliceStream configured with
   *    `{ length: len }`; when the slice callback reports the end of the
   *    slice, this stream is unpiped, `extra` is pushed back with `unshift`,
   *    and the slice is closed so `destStream` sees its end.
   *
   * @param {?number} len - Number of bytes to forward.
   * @param {Object} destStream - Destination stream.
   * @returns {Object} `destStream` for the zero-length and sliced cases, or
   *   the return value of the inherited `pipe` otherwise.
   */
  PullStream.prototype.pipe = over([
    [over.numberOptionalWithDefault(null), over.object, function (len, destStream) {
      // `!len` is also true for 0, so the zero-length case has to be handled
      // before the generic "no length given" check.
      if (len === 0) {
        destStream.end();
        return destStream;
      }

      if (!len) {
        return PassThrough.prototype.pipe.call(this, destStream);
      }

      var pullstream = this;
      pullstream
        .pipe(new SliceStream({ length: len }, function (buf, sliceEnd, extra) {
          if (!sliceEnd) {
            return this.push(buf);
          }
          // End of the slice: detach from the source and hand `extra` back to
          // it (presumably the bytes received past the slice boundary —
          // confirm against slice-stream).
          pullstream.unpipe();
          pullstream.unshift(extra);
          this.push(buf);
          return this.push(null);
        }))
        .pipe(destStream);

      return destStream;
    }]
  ]);
  /**
   * Retry the parked pull request, if there is one.
   *
   * Invoked from the 'readable' listener installed by the constructor.
   */
  PullStream.prototype._process = function () {
    if (!this._serviceRequests) {
      return;
    }
    this._serviceRequests();
  };
  /**
   * Put `chunk` back at the front of the stream by delegating to `unshift`.
   *
   * @param {*} chunk - Data handed to `this.unshift`.
   */
  PullStream.prototype.prepend = function (chunk) {
    this.unshift(chunk);
  };
  /**
   * Discard `len` bytes from the stream, then invoke `callback`.
   *
   * Reads via `pullUpTo` and keeps going until the requested amount has been
   * consumed: a partial read recurses immediately for the remainder, an
   * empty read waits for the next 'readable' event before retrying.
   *
   * @param {number} len - Number of bytes to throw away.
   * @param {function(Error=)} callback - Scheduled with setImmediate (no
   *   arguments) on success; called synchronously with an 'End of Stream'
   *   error if the stream has already been flushed.
   */
  PullStream.prototype.drain = function (len, callback) {
    if (this._flushed) {
      return callback(new Error('End of Stream'));
    }

    var chunk = this.pullUpTo(len);
    var drained = chunk ? (chunk.length || 0) : 0;

    if (drained === len) {
      // Everything requested has been consumed.
      setImmediate(callback);
      return;
    }

    if (drained > 0) {
      // Partial progress: immediately try for the rest.
      this.drain(len - drained, callback);
      return;
    }

    // Internal buffer is empty; wait until data can be consumed.
    this.once('readable', this.drain.bind(this, len - drained, callback));
  };
  /**
   * Flush hook: marks the stream as flushed once its readable buffer is empty.
   *
   * While `_readableState.length` is still positive the call is rescheduled
   * with setImmediate. Once the buffer is empty `_flushed` is set; if writes
   * have already finished `_finish(callback)` runs, otherwise `callback` is
   * invoked directly.
   *
   * NOTE(review): presumably invoked by the Transform machinery underlying
   * PassThrough when the writable side ends — confirm against readable-stream.
   *
   * @param {function()} callback - Completion callback.
   * @returns {*} The setImmediate handle when rescheduled, otherwise undefined.
   */
  PullStream.prototype._flush = function (callback) {
    var stillBuffered = this._readableState.length > 0;
    if (stillBuffered) {
      // Unread data remains; check again on a later tick.
      return setImmediate(this._flush.bind(this, callback));
    }

    this._flushed = true;

    if (!this._writesFinished) {
      callback();
      return;
    }
    this._finish(callback);
  };
  /**
   * Final step once the stream is both flushed and finished writing.
   *
   * Gives the parked pull request (if any) one more run, then schedules
   * `callback` with setImmediate.
   *
   * @param {function()} [callback] - Optional completion callback; a no-op
   *   function is scheduled when omitted.
   */
  PullStream.prototype._finish = function (callback) {
    var done = callback || function () {};

    if (this._serviceRequests) {
      this._serviceRequests();
    }

    setImmediate(done);
  };