| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- 'use strict';
- /*!
- * Module dependencies.
- */
- const EventEmitter = require('events').EventEmitter;
- const MongooseError = require('../error/mongooseError');
- /*!
- * ignore
- */
- const driverChangeStreamEvents = ['close', 'change', 'end', 'error', 'resumeTokenChanged'];
- /*!
- * ignore
- */
- class ChangeStream extends EventEmitter {
- constructor(changeStreamThunk, pipeline, options) {
- super();
- this.driverChangeStream = null;
- this.closed = false;
- this.bindedEvents = false;
- this.pipeline = pipeline;
- this.options = options;
- this.errored = false;
- if (options?.hydrate && !options.model) {
- throw new Error(
- 'Cannot create change stream with `hydrate: true` ' +
- 'unless calling `Model.watch()`'
- );
- }
- let syncError = null;
- this.$driverChangeStreamPromise = new Promise((resolve, reject) => {
- // This wrapper is necessary because of buffering.
- try {
- changeStreamThunk((err, driverChangeStream) => {
- if (err != null) {
- this.errored = true;
- this.emit('error', err);
- return reject(err);
- }
- this.driverChangeStream = driverChangeStream;
- this.emit('ready');
- resolve();
- });
- } catch (err) {
- syncError = err;
- this.errored = true;
- this.emit('error', err);
- reject(err);
- }
- });
- // Because a ChangeStream is an event emitter, there's no way to register an 'error' handler
- // that catches errors which occur in the constructor, unless we force sync errors into async
- // errors with setImmediate(). For cleaner stack trace, we just immediately throw any synchronous
- // errors that occurred with changeStreamThunk().
- if (syncError != null) {
- throw syncError;
- }
- }
- _bindEvents() {
- if (this.bindedEvents) {
- return;
- }
- this.bindedEvents = true;
- if (this.driverChangeStream == null) {
- this.$driverChangeStreamPromise.then(
- () => {
- this.driverChangeStream.on('close', () => {
- this.closed = true;
- });
- driverChangeStreamEvents.forEach(ev => {
- this.driverChangeStream.on(ev, data => {
- if (data?.fullDocument != null && this.options?.hydrate) {
- data.fullDocument = this.options.model.hydrate(data.fullDocument);
- }
- this.emit(ev, data);
- });
- });
- },
- () => {} // No need to register events if opening change stream failed
- );
- return;
- }
- this.driverChangeStream.on('close', () => {
- this.closed = true;
- });
- driverChangeStreamEvents.forEach(ev => {
- this.driverChangeStream.on(ev, data => {
- if (data?.fullDocument != null && this.options?.hydrate) {
- data.fullDocument = this.options.model.hydrate(data.fullDocument);
- }
- this.emit(ev, data);
- });
- });
- }
- hasNext(cb) {
- if (this.errored) {
- throw new MongooseError('Cannot call hasNext() on errored ChangeStream');
- }
- return this.driverChangeStream.hasNext(cb);
- }
- next(cb) {
- if (this.errored) {
- throw new MongooseError('Cannot call next() on errored ChangeStream');
- }
- if (this.options?.hydrate) {
- if (cb != null) {
- const originalCb = cb;
- cb = (err, data) => {
- if (err != null) {
- return originalCb(err);
- }
- if (data.fullDocument != null) {
- data.fullDocument = this.options.model.hydrate(data.fullDocument);
- }
- return originalCb(null, data);
- };
- }
- let maybePromise = this.driverChangeStream.next(cb);
- if (typeof maybePromise?.then === 'function') {
- maybePromise = maybePromise.then(data => {
- if (data.fullDocument != null) {
- data.fullDocument = this.options.model.hydrate(data.fullDocument);
- }
- return data;
- });
- }
- return maybePromise;
- }
- return this.driverChangeStream.next(cb);
- }
- addListener(event, handler) {
- if (this.errored) {
- throw new MongooseError('Cannot call addListener() on errored ChangeStream');
- }
- this._bindEvents();
- return super.addListener(event, handler);
- }
- on(event, handler) {
- if (this.errored) {
- throw new MongooseError('Cannot call on() on errored ChangeStream');
- }
- this._bindEvents();
- return super.on(event, handler);
- }
- once(event, handler) {
- if (this.errored) {
- throw new MongooseError('Cannot call once() on errored ChangeStream');
- }
- this._bindEvents();
- return super.once(event, handler);
- }
- _queue(cb) {
- this.once('ready', () => cb());
- }
- close() {
- this.closed = true;
- if (this.driverChangeStream) {
- return this.driverChangeStream.close();
- } else {
- return this.$driverChangeStreamPromise.then(
- () => this.driverChangeStream.close(),
- () => {} // No need to close if opening the change stream failed
- );
- }
- }
- }
- /*!
- * ignore
- */
- module.exports = ChangeStream;
|