changeStream.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. 'use strict';
  2. /*!
  3. * Module dependencies.
  4. */
  5. const EventEmitter = require('events').EventEmitter;
  6. const MongooseError = require('../error/mongooseError');
  7. /*!
  8. * ignore
  9. */
  // Names of the events that ChangeStream listens for on the underlying driver
  // change stream and re-emits on itself (see ChangeStream#_bindEvents).
  const driverChangeStreamEvents = ['close', 'change', 'end', 'error', 'resumeTokenChanged'];
  11. /*!
  12. * ignore
  13. */
  /**
   * Event-emitter wrapper around a driver change stream.
   *
   * The driver stream is obtained through a thunk, so it may only become
   * available later (for example while operations are being buffered). Until
   * then `driverChangeStream` is `null` and `$driverChangeStreamPromise` is
   * pending. Once available, driver events are re-emitted on this object and,
   * when `options.hydrate` is set, `fullDocument` is passed through
   * `options.model.hydrate()`.
   */
  class ChangeStream extends EventEmitter {
    /**
     * @param {Function} changeStreamThunk called once with a Node-style
     *   callback `(err, driverChangeStream)`; may invoke it synchronously or later
     * @param {*} pipeline stored as-is on `this.pipeline`
     * @param {Object} [options] stored on `this.options`; `hydrate` and `model`
     *   are the only keys read by this class
     * @throws {Error} if `options.hydrate` is set without `options.model`
     * @throws whatever `changeStreamThunk` throws synchronously
     */
    constructor(changeStreamThunk, pipeline, options) {
      super();

      this.driverChangeStream = null;
      this.closed = false;
      this.bindedEvents = false;
      this.pipeline = pipeline;
      this.options = options;
      this.errored = false;

      if (options?.hydrate && !options.model) {
        throw new Error(
          'Cannot create change stream with `hydrate: true` ' +
          'unless calling `Model.watch()`'
        );
      }

      let syncError = null;
      this.$driverChangeStreamPromise = new Promise((resolve, reject) => {
        // This wrapper is necessary because of buffering.
        try {
          changeStreamThunk((err, driverChangeStream) => {
            if (err != null) {
              this.errored = true;
              // Settle the promise before emitting: emit('error') throws when
              // there is no 'error' listener, and the promise must not be left
              // pending in that case (close() waits on it).
              reject(err);
              this.emit('error', err);
              return;
            }

            this.driverChangeStream = driverChangeStream;
            this.emit('ready');
            resolve();
          });
        } catch (err) {
          syncError = err;
          this.errored = true;
          // reject() runs first so the promise settles with the original error
          // even if the emit below throws for lack of a listener.
          reject(err);
          this.emit('error', err);
        }
      });

      // Failures are already reported through the synchronous throw below or the
      // 'error' event. Mark the rejection as handled so it does not additionally
      // surface as a process-level unhandledRejection. Handlers attached later
      // to $driverChangeStreamPromise still observe the rejection.
      this.$driverChangeStreamPromise.catch(() => {});

      // Because a ChangeStream is an event emitter, there's no way to register an 'error' handler
      // that catches errors which occur in the constructor, unless we force sync errors into async
      // errors with setImmediate(). For cleaner stack trace, we just immediately throw any synchronous
      // errors that occurred with changeStreamThunk().
      if (syncError != null) {
        throw syncError;
      }
    }

    /**
     * Start forwarding driver events to this emitter. Runs at most once; if the
     * driver stream is not available yet, forwarding starts when it is.
     */
    _bindEvents() {
      if (this.bindedEvents) {
        return;
      }

      this.bindedEvents = true;

      if (this.driverChangeStream == null) {
        this.$driverChangeStreamPromise.then(
          () => this._forwardDriverEvents(),
          () => {} // No need to register events if opening change stream failed
        );
        return;
      }

      this._forwardDriverEvents();
    }

    /**
     * Attach listeners to `this.driverChangeStream`: one that flips `closed` on
     * 'close', plus one per entry of `driverChangeStreamEvents` that re-emits
     * the event here (hydrating `fullDocument` first when requested).
     * Requires `this.driverChangeStream` to be set.
     */
    _forwardDriverEvents() {
      const driverStream = this.driverChangeStream;

      driverStream.on('close', () => {
        this.closed = true;
      });

      for (const ev of driverChangeStreamEvents) {
        driverStream.on(ev, data => {
          if (data?.fullDocument != null && this.options?.hydrate) {
            data.fullDocument = this.options.model.hydrate(data.fullDocument);
          }
          this.emit(ev, data);
        });
      }
    }

    /**
     * Run `operation(driverChangeStream)` now if the driver stream exists,
     * otherwise once it has been opened.
     *
     * @param {Function} operation receives the driver change stream
     * @param {Function} [cb] if given and opening fails, receives the error
     *   instead of the returned promise rejecting
     * @returns the result of `operation` when the stream is already available,
     *   otherwise a promise for it
     */
    _whenDriverReady(operation, cb) {
      if (this.driverChangeStream != null) {
        return operation(this.driverChangeStream);
      }

      return this.$driverChangeStreamPromise.then(
        () => operation(this.driverChangeStream),
        err => {
          if (typeof cb === 'function') {
            return cb(err);
          }
          throw err;
        }
      );
    }

    /**
     * Delegate to the driver stream's `hasNext()`, waiting for the stream to be
     * opened first if necessary.
     *
     * @param {Function} [cb] passed through to the driver
     * @throws {MongooseError} if opening the change stream failed
     */
    hasNext(cb) {
      if (this.errored) {
        throw new MongooseError('Cannot call hasNext() on errored ChangeStream');
      }
      return this._whenDriverReady(stream => stream.hasNext(cb), cb);
    }

    /**
     * Delegate to the driver stream's `next()`, waiting for the stream to be
     * opened first if necessary. With `options.hydrate`, a non-null
     * `fullDocument` on the result is replaced by `options.model.hydrate()` of
     * it, for both the callback and the promise result.
     *
     * @param {Function} [cb] passed through to the driver (wrapped when hydrating)
     * @throws {MongooseError} if opening the change stream failed
     */
    next(cb) {
      if (this.errored) {
        throw new MongooseError('Cannot call next() on errored ChangeStream');
      }

      if (!this.options?.hydrate) {
        return this._whenDriverReady(stream => stream.next(cb), cb);
      }

      // Tolerates a null/undefined change, matching the guard in the event path.
      const hydrateChange = data => {
        if (data?.fullDocument != null) {
          data.fullDocument = this.options.model.hydrate(data.fullDocument);
        }
        return data;
      };

      let driverCb = cb;
      if (cb != null) {
        driverCb = (err, data) => {
          if (err != null) {
            return cb(err);
          }
          return cb(null, hydrateChange(data));
        };
      }

      return this._whenDriverReady(stream => {
        const maybePromise = stream.next(driverCb);
        if (typeof maybePromise?.then === 'function') {
          return maybePromise.then(hydrateChange);
        }
        return maybePromise;
      }, driverCb);
    }

    /**
     * Same as EventEmitter#addListener, but first makes sure driver events are
     * being forwarded.
     *
     * @throws {MongooseError} if opening the change stream failed
     */
    addListener(event, handler) {
      if (this.errored) {
        throw new MongooseError('Cannot call addListener() on errored ChangeStream');
      }
      this._bindEvents();
      return super.addListener(event, handler);
    }

    /**
     * Same as EventEmitter#on, but first makes sure driver events are being
     * forwarded.
     *
     * @throws {MongooseError} if opening the change stream failed
     */
    on(event, handler) {
      if (this.errored) {
        throw new MongooseError('Cannot call on() on errored ChangeStream');
      }
      this._bindEvents();
      return super.on(event, handler);
    }

    /**
     * Same as EventEmitter#once, but first makes sure driver events are being
     * forwarded.
     *
     * @throws {MongooseError} if opening the change stream failed
     */
    once(event, handler) {
      if (this.errored) {
        throw new MongooseError('Cannot call once() on errored ChangeStream');
      }
      this._bindEvents();
      return super.once(event, handler);
    }

    /**
     * Call `cb` (with no arguments) on the next 'ready' event.
     * NOTE(review): if 'ready' was already emitted, `cb` is never called.
     */
    _queue(cb) {
      this.once('ready', () => cb());
    }

    /**
     * Mark this stream closed and close the driver stream, waiting for it to be
     * opened first if necessary.
     *
     * @returns the result of the driver's `close()`, or a promise that settles
     *   after the pending stream was opened and closed (resolving to
     *   `undefined` if opening failed)
     */
    close() {
      this.closed = true;
      if (this.driverChangeStream) {
        return this.driverChangeStream.close();
      } else {
        return this.$driverChangeStreamPromise.then(
          () => this.driverChangeStream.close(),
          () => {} // No need to close if opening the change stream failed
        );
      }
    }
  }
  166. /*!
  167. * ignore
  168. */
  // The ChangeStream class is this module's sole export.
  module.exports = ChangeStream;