change_stream.js 19 KB


  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ChangeStream = void 0;
  4. exports.filterOutOptions = filterOutOptions;
  5. const collection_1 = require("./collection");
  6. const constants_1 = require("./constants");
  7. const abstract_cursor_1 = require("./cursor/abstract_cursor");
  8. const change_stream_cursor_1 = require("./cursor/change_stream_cursor");
  9. const db_1 = require("./db");
  10. const error_1 = require("./error");
  11. const mongo_client_1 = require("./mongo_client");
  12. const mongo_types_1 = require("./mongo_types");
  13. const timeout_1 = require("./timeout");
  14. const utils_1 = require("./utils");
  15. const CHANGE_DOMAIN_TYPES = {
  16. COLLECTION: Symbol('Collection'),
  17. DATABASE: Symbol('Database'),
  18. CLUSTER: Symbol('Cluster')
  19. };
  20. const CHANGE_STREAM_EVENTS = [constants_1.RESUME_TOKEN_CHANGED, constants_1.END, constants_1.CLOSE];
  21. const NO_RESUME_TOKEN_ERROR = 'A change stream document has been received that lacks a resume token (_id).';
  22. const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';
  23. const INVALID_STAGE_OPTIONS = buildDisallowedChangeStreamOptions();
  24. function filterOutOptions(options) {
  25. return Object.fromEntries(Object.entries(options).filter(([k, _]) => !INVALID_STAGE_OPTIONS.has(k)));
  26. }
  27. /**
  28. * Creates a new Change Stream instance. Normally created using {@link Collection#watch|Collection.watch()}.
  29. * @public
  30. */
  31. class ChangeStream extends mongo_types_1.TypedEventEmitter {
  32. /**
  33. * @experimental
  34. * An alias for {@link ChangeStream.close|ChangeStream.close()}.
  35. */
  36. async [Symbol.asyncDispose]() {
  37. await this.close();
  38. }
  39. /** @event */
  40. static { this.RESPONSE = constants_1.RESPONSE; }
  41. /** @event */
  42. static { this.MORE = constants_1.MORE; }
  43. /** @event */
  44. static { this.INIT = constants_1.INIT; }
  45. /** @event */
  46. static { this.CLOSE = constants_1.CLOSE; }
  47. /**
  48. * Fired for each new matching change in the specified namespace. Attaching a `change`
  49. * event listener to a Change Stream will switch the stream into flowing mode. Data will
  50. * then be passed as soon as it is available.
  51. * @event
  52. */
  53. static { this.CHANGE = constants_1.CHANGE; }
  54. /** @event */
  55. static { this.END = constants_1.END; }
  56. /** @event */
  57. static { this.ERROR = constants_1.ERROR; }
  58. /**
  59. * Emitted each time the change stream stores a new resume token.
  60. * @event
  61. */
  62. static { this.RESUME_TOKEN_CHANGED = constants_1.RESUME_TOKEN_CHANGED; }
  63. /**
  64. * @internal
  65. *
  66. * @param parent - The parent object that created this change stream
  67. * @param pipeline - An array of {@link https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/|aggregation pipeline stages} through which to pass change stream documents
  68. */
  69. constructor(parent, pipeline = [], options = {}) {
  70. super();
  71. this.pipeline = pipeline;
  72. this.options = { ...options };
  73. let serverSelectionTimeoutMS;
  74. delete this.options.writeConcern;
  75. if (parent instanceof collection_1.Collection) {
  76. this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
  77. serverSelectionTimeoutMS = parent.s.db.client.options.serverSelectionTimeoutMS;
  78. }
  79. else if (parent instanceof db_1.Db) {
  80. this.type = CHANGE_DOMAIN_TYPES.DATABASE;
  81. serverSelectionTimeoutMS = parent.client.options.serverSelectionTimeoutMS;
  82. }
  83. else if (parent instanceof mongo_client_1.MongoClient) {
  84. this.type = CHANGE_DOMAIN_TYPES.CLUSTER;
  85. serverSelectionTimeoutMS = parent.options.serverSelectionTimeoutMS;
  86. }
  87. else {
  88. throw new error_1.MongoChangeStreamError('Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient');
  89. }
  90. this.contextOwner = Symbol();
  91. this.parent = parent;
  92. this.namespace = parent.s.namespace;
  93. if (!this.options.readPreference && parent.readPreference) {
  94. this.options.readPreference = parent.readPreference;
  95. }
  96. // Create contained Change Stream cursor
  97. this.cursor = this._createChangeStreamCursor(options);
  98. this.isClosed = false;
  99. this.mode = false;
  100. // Listen for any `change` listeners being added to ChangeStream
  101. this.on('newListener', eventName => {
  102. if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {
  103. this._streamEvents(this.cursor);
  104. }
  105. });
  106. this.on('removeListener', eventName => {
  107. if (eventName === 'change' && this.listenerCount('change') === 0 && this.cursor) {
  108. this.cursorStream?.removeAllListeners('data');
  109. }
  110. });
  111. if (this.options.timeoutMS != null) {
  112. this.timeoutContext = new timeout_1.CSOTTimeoutContext({
  113. timeoutMS: this.options.timeoutMS,
  114. serverSelectionTimeoutMS
  115. });
  116. }
  117. }
  118. /** The cached resume token that is used to resume after the most recently returned change. */
  119. get resumeToken() {
  120. return this.cursor?.resumeToken;
  121. }
  122. /** Check if there is any document still available in the Change Stream */
  123. async hasNext() {
  124. this._setIsIterator();
  125. // Change streams must resume indefinitely while each resume event succeeds.
  126. // This loop continues until either a change event is received or until a resume attempt
  127. // fails.
  128. this.timeoutContext?.refresh();
  129. try {
  130. while (true) {
  131. try {
  132. const hasNext = await this.cursor.hasNext();
  133. return hasNext;
  134. }
  135. catch (error) {
  136. try {
  137. await this._processErrorIteratorMode(error, this.cursor.id != null);
  138. }
  139. catch (error) {
  140. if (error instanceof error_1.MongoOperationTimeoutError && this.cursor.id == null) {
  141. throw error;
  142. }
  143. try {
  144. await this.close();
  145. }
  146. catch (error) {
  147. (0, utils_1.squashError)(error);
  148. }
  149. throw error;
  150. }
  151. }
  152. }
  153. }
  154. finally {
  155. this.timeoutContext?.clear();
  156. }
  157. }
  158. /** Get the next available document from the Change Stream. */
  159. async next() {
  160. this._setIsIterator();
  161. // Change streams must resume indefinitely while each resume event succeeds.
  162. // This loop continues until either a change event is received or until a resume attempt
  163. // fails.
  164. this.timeoutContext?.refresh();
  165. try {
  166. while (true) {
  167. try {
  168. const change = await this.cursor.next();
  169. const processedChange = this._processChange(change ?? null);
  170. return processedChange;
  171. }
  172. catch (error) {
  173. try {
  174. await this._processErrorIteratorMode(error, this.cursor.id != null);
  175. }
  176. catch (error) {
  177. if (error instanceof error_1.MongoOperationTimeoutError && this.cursor.id == null) {
  178. throw error;
  179. }
  180. try {
  181. await this.close();
  182. }
  183. catch (error) {
  184. (0, utils_1.squashError)(error);
  185. }
  186. throw error;
  187. }
  188. }
  189. }
  190. }
  191. finally {
  192. this.timeoutContext?.clear();
  193. }
  194. }
  195. /**
  196. * Try to get the next available document from the Change Stream's cursor or `null` if an empty batch is returned
  197. */
  198. async tryNext() {
  199. this._setIsIterator();
  200. // Change streams must resume indefinitely while each resume event succeeds.
  201. // This loop continues until either a change event is received or until a resume attempt
  202. // fails.
  203. this.timeoutContext?.refresh();
  204. try {
  205. while (true) {
  206. try {
  207. const change = await this.cursor.tryNext();
  208. if (!change) {
  209. return null;
  210. }
  211. const processedChange = this._processChange(change);
  212. return processedChange;
  213. }
  214. catch (error) {
  215. try {
  216. await this._processErrorIteratorMode(error, this.cursor.id != null);
  217. }
  218. catch (error) {
  219. if (error instanceof error_1.MongoOperationTimeoutError && this.cursor.id == null)
  220. throw error;
  221. try {
  222. await this.close();
  223. }
  224. catch (error) {
  225. (0, utils_1.squashError)(error);
  226. }
  227. throw error;
  228. }
  229. }
  230. }
  231. }
  232. finally {
  233. this.timeoutContext?.clear();
  234. }
  235. }
  236. async *[Symbol.asyncIterator]() {
  237. if (this.closed) {
  238. return;
  239. }
  240. try {
  241. // Change streams run indefinitely as long as errors are resumable
  242. // So the only loop breaking condition is if `next()` throws
  243. while (true) {
  244. yield await this.next();
  245. }
  246. }
  247. finally {
  248. try {
  249. await this.close();
  250. }
  251. catch (error) {
  252. (0, utils_1.squashError)(error);
  253. }
  254. }
  255. }
  256. /** Is the cursor closed */
  257. get closed() {
  258. return this.isClosed || this.cursor.closed;
  259. }
  260. /**
  261. * Frees the internal resources used by the change stream.
  262. */
  263. async close() {
  264. this.timeoutContext?.clear();
  265. this.timeoutContext = undefined;
  266. this.isClosed = true;
  267. const cursor = this.cursor;
  268. try {
  269. await cursor.close();
  270. }
  271. finally {
  272. this._endStream();
  273. }
  274. }
  275. /**
  276. * Return a modified Readable stream including a possible transform method.
  277. *
  278. * NOTE: When using a Stream to process change stream events, the stream will
  279. * NOT automatically resume in the case a resumable error is encountered.
  280. *
  281. * @throws MongoChangeStreamError if the underlying cursor or the change stream is closed
  282. */
  283. stream() {
  284. if (this.closed) {
  285. throw new error_1.MongoChangeStreamError(CHANGESTREAM_CLOSED_ERROR);
  286. }
  287. return this.cursor.stream();
  288. }
  289. /** @internal */
  290. _setIsEmitter() {
  291. if (this.mode === 'iterator') {
  292. // TODO(NODE-3485): Replace with MongoChangeStreamModeError
  293. throw new error_1.MongoAPIError('ChangeStream cannot be used as an EventEmitter after being used as an iterator');
  294. }
  295. this.mode = 'emitter';
  296. }
  297. /** @internal */
  298. _setIsIterator() {
  299. if (this.mode === 'emitter') {
  300. // TODO(NODE-3485): Replace with MongoChangeStreamModeError
  301. throw new error_1.MongoAPIError('ChangeStream cannot be used as an iterator after being used as an EventEmitter');
  302. }
  303. this.mode = 'iterator';
  304. }
  305. /**
  306. * Create a new change stream cursor based on self's configuration
  307. * @internal
  308. */
  309. _createChangeStreamCursor(options) {
  310. const changeStreamStageOptions = filterOutOptions(options);
  311. if (this.type === CHANGE_DOMAIN_TYPES.CLUSTER) {
  312. changeStreamStageOptions.allChangesForCluster = true;
  313. }
  314. const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline];
  315. const client = this.type === CHANGE_DOMAIN_TYPES.CLUSTER
  316. ? this.parent
  317. : this.type === CHANGE_DOMAIN_TYPES.DATABASE
  318. ? this.parent.client
  319. : this.type === CHANGE_DOMAIN_TYPES.COLLECTION
  320. ? this.parent.client
  321. : null;
  322. if (client == null) {
  323. // This should never happen because of the assertion in the constructor
  324. throw new error_1.MongoRuntimeError(`Changestream type should only be one of cluster, database, collection. Found ${this.type.toString()}`);
  325. }
  326. const changeStreamCursor = new change_stream_cursor_1.ChangeStreamCursor(client, this.namespace, pipeline, {
  327. ...options,
  328. timeoutContext: this.timeoutContext
  329. ? new abstract_cursor_1.CursorTimeoutContext(this.timeoutContext, this.contextOwner)
  330. : undefined
  331. });
  332. for (const event of CHANGE_STREAM_EVENTS) {
  333. changeStreamCursor.on(event, e => this.emit(event, e));
  334. }
  335. if (this.listenerCount(ChangeStream.CHANGE) > 0) {
  336. this._streamEvents(changeStreamCursor);
  337. }
  338. return changeStreamCursor;
  339. }
  340. /** @internal */
  341. _closeEmitterModeWithError(error) {
  342. this.emit(ChangeStream.ERROR, error);
  343. this.close().then(undefined, utils_1.squashError);
  344. }
  345. /** @internal */
  346. _streamEvents(cursor) {
  347. this._setIsEmitter();
  348. const stream = this.cursorStream ?? cursor.stream();
  349. this.cursorStream = stream;
  350. stream.on('data', change => {
  351. try {
  352. const processedChange = this._processChange(change);
  353. this.emit(ChangeStream.CHANGE, processedChange);
  354. }
  355. catch (error) {
  356. this.emit(ChangeStream.ERROR, error);
  357. }
  358. this.timeoutContext?.refresh();
  359. });
  360. stream.on('error', error => this._processErrorStreamMode(error, this.cursor.id != null));
  361. }
  362. /** @internal */
  363. _endStream() {
  364. this.cursorStream?.removeAllListeners('data');
  365. this.cursorStream?.removeAllListeners('close');
  366. this.cursorStream?.removeAllListeners('end');
  367. this.cursorStream?.destroy();
  368. this.cursorStream = undefined;
  369. }
  370. /** @internal */
  371. _processChange(change) {
  372. if (this.isClosed) {
  373. // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
  374. throw new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
  375. }
  376. // a null change means the cursor has been notified, implicitly closing the change stream
  377. if (change == null) {
  378. // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
  379. throw new error_1.MongoRuntimeError(CHANGESTREAM_CLOSED_ERROR);
  380. }
  381. if (change && !change._id) {
  382. throw new error_1.MongoChangeStreamError(NO_RESUME_TOKEN_ERROR);
  383. }
  384. // cache the resume token
  385. this.cursor.cacheResumeToken(change._id);
  386. // wipe the startAtOperationTime if there was one so that there won't be a conflict
  387. // between resumeToken and startAtOperationTime if we need to reconnect the cursor
  388. this.options.startAtOperationTime = undefined;
  389. return change;
  390. }
  391. /** @internal */
  392. _processErrorStreamMode(changeStreamError, cursorInitialized) {
  393. // If the change stream has been closed explicitly, do not process error.
  394. if (this.isClosed)
  395. return;
  396. if (cursorInitialized &&
  397. ((0, error_1.isResumableError)(changeStreamError, this.cursor.maxWireVersion) ||
  398. changeStreamError instanceof error_1.MongoOperationTimeoutError)) {
  399. this._endStream();
  400. this.cursor
  401. .close()
  402. .then(() => this._resume(changeStreamError), e => {
  403. (0, utils_1.squashError)(e);
  404. return this._resume(changeStreamError);
  405. })
  406. .then(() => {
  407. if (changeStreamError instanceof error_1.MongoOperationTimeoutError)
  408. this.emit(ChangeStream.ERROR, changeStreamError);
  409. }, () => this._closeEmitterModeWithError(changeStreamError));
  410. }
  411. else {
  412. this._closeEmitterModeWithError(changeStreamError);
  413. }
  414. }
  415. /** @internal */
  416. async _processErrorIteratorMode(changeStreamError, cursorInitialized) {
  417. if (this.isClosed) {
  418. // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
  419. throw new error_1.MongoAPIError(CHANGESTREAM_CLOSED_ERROR);
  420. }
  421. if (cursorInitialized &&
  422. ((0, error_1.isResumableError)(changeStreamError, this.cursor.maxWireVersion) ||
  423. changeStreamError instanceof error_1.MongoOperationTimeoutError)) {
  424. try {
  425. await this.cursor.close();
  426. }
  427. catch (error) {
  428. (0, utils_1.squashError)(error);
  429. }
  430. await this._resume(changeStreamError);
  431. if (changeStreamError instanceof error_1.MongoOperationTimeoutError)
  432. throw changeStreamError;
  433. }
  434. else {
  435. try {
  436. await this.close();
  437. }
  438. catch (error) {
  439. (0, utils_1.squashError)(error);
  440. }
  441. throw changeStreamError;
  442. }
  443. }
  444. async _resume(changeStreamError) {
  445. this.timeoutContext?.refresh();
  446. const topology = (0, utils_1.getTopology)(this.parent);
  447. try {
  448. await topology.selectServer(this.cursor.readPreference, {
  449. operationName: 'reconnect topology in change stream',
  450. timeoutContext: this.timeoutContext
  451. });
  452. this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
  453. }
  454. catch {
  455. // if the topology can't reconnect, close the stream
  456. await this.close();
  457. throw changeStreamError;
  458. }
  459. }
  460. }
  461. exports.ChangeStream = ChangeStream;
  462. /**
  463. * This function returns a list of options that are *not* supported by the $changeStream
  464. * aggregation stage. This is best-effort - it uses the options "officially supported" by the driver
  465. * to derive a list of known, unsupported options for the $changeStream stage.
  466. *
  467. * Notably, at runtime, users can still provide options unknown to the driver and the driver will
  468. * *not* filter them out of the options object (see NODE-5510).
  469. */
  470. function buildDisallowedChangeStreamOptions() {
  471. const denyList = {
  472. allowDiskUse: '',
  473. authdb: '',
  474. batchSize: '',
  475. bsonRegExp: '',
  476. bypassDocumentValidation: '',
  477. bypassPinningCheck: '',
  478. checkKeys: '',
  479. collation: '',
  480. comment: '',
  481. cursor: '',
  482. dbName: '',
  483. enableUtf8Validation: '',
  484. explain: '',
  485. fieldsAsRaw: '',
  486. hint: '',
  487. ignoreUndefined: '',
  488. let: '',
  489. maxAwaitTimeMS: '',
  490. maxTimeMS: '',
  491. omitMaxTimeMS: '',
  492. out: '',
  493. promoteBuffers: '',
  494. promoteLongs: '',
  495. promoteValues: '',
  496. raw: '',
  497. rawData: '',
  498. readConcern: '',
  499. readPreference: '',
  500. serializeFunctions: '',
  501. session: '',
  502. timeoutContext: '',
  503. timeoutMS: '',
  504. timeoutMode: '',
  505. useBigInt64: '',
  506. willRetryWrite: '',
  507. writeConcern: ''
  508. };
  509. return new Set(Object.keys(denyList));
  510. }
  511. //# sourceMappingURL=change_stream.js.map