connection.js 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.CryptoConnection = exports.SizedMessageTransform = exports.Connection = void 0;
  4. exports.hasSessionSupport = hasSessionSupport;
  5. const stream_1 = require("stream");
  6. const timers_1 = require("timers");
  7. const bson_1 = require("../bson");
  8. const constants_1 = require("../constants");
  9. const error_1 = require("../error");
  10. const mongo_logger_1 = require("../mongo_logger");
  11. const mongo_types_1 = require("../mongo_types");
  12. const read_preference_1 = require("../read_preference");
  13. const common_1 = require("../sdam/common");
  14. const sessions_1 = require("../sessions");
  15. const timeout_1 = require("../timeout");
  16. const utils_1 = require("../utils");
  17. const command_monitoring_events_1 = require("./command_monitoring_events");
  18. const commands_1 = require("./commands");
  19. const stream_description_1 = require("./stream_description");
  20. const compression_1 = require("./wire_protocol/compression");
  21. const on_data_1 = require("./wire_protocol/on_data");
  22. const responses_1 = require("./wire_protocol/responses");
  23. const shared_1 = require("./wire_protocol/shared");
  24. /** @internal */
  25. function hasSessionSupport(conn) {
  26. const description = conn.description;
  27. return description.logicalSessionTimeoutMinutes != null;
  28. }
  29. function streamIdentifier(stream, options) {
  30. if (options.proxyHost) {
  31. // If proxy options are specified, the properties of `stream` itself
  32. // will not accurately reflect what endpoint this is connected to.
  33. return options.hostAddress.toString();
  34. }
  35. const { remoteAddress, remotePort } = stream;
  36. if (typeof remoteAddress === 'string' && typeof remotePort === 'number') {
  37. return utils_1.HostAddress.fromHostPort(remoteAddress, remotePort).toString();
  38. }
  39. return (0, utils_1.uuidV4)().toString('hex');
  40. }
  41. /** @internal */
  42. class Connection extends mongo_types_1.TypedEventEmitter {
  43. /** @event */
  44. static { this.COMMAND_STARTED = constants_1.COMMAND_STARTED; }
  45. /** @event */
  46. static { this.COMMAND_SUCCEEDED = constants_1.COMMAND_SUCCEEDED; }
  47. /** @event */
  48. static { this.COMMAND_FAILED = constants_1.COMMAND_FAILED; }
  49. /** @event */
  50. static { this.CLUSTER_TIME_RECEIVED = constants_1.CLUSTER_TIME_RECEIVED; }
  51. /** @event */
  52. static { this.CLOSE = constants_1.CLOSE; }
  53. /** @event */
  54. static { this.PINNED = constants_1.PINNED; }
  55. /** @event */
  56. static { this.UNPINNED = constants_1.UNPINNED; }
  57. constructor(stream, options) {
  58. super();
  59. this.lastHelloMS = -1;
  60. this.helloOk = false;
  61. this.delayedTimeoutId = null;
  62. /** Indicates that the connection (including underlying TCP socket) has been closed. */
  63. this.closed = false;
  64. this.clusterTime = null;
  65. this.error = null;
  66. this.dataEvents = null;
  67. this.on('error', utils_1.noop);
  68. this.socket = stream;
  69. this.id = options.id;
  70. this.address = streamIdentifier(stream, options);
  71. this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
  72. this.monitorCommands = options.monitorCommands;
  73. this.serverApi = options.serverApi;
  74. this.mongoLogger = options.mongoLogger;
  75. this.established = false;
  76. this.description = new stream_description_1.StreamDescription(this.address, options);
  77. this.generation = options.generation;
  78. this.lastUseTime = (0, utils_1.now)();
  79. this.messageStream = this.socket
  80. .on('error', this.onSocketError.bind(this))
  81. .pipe(new SizedMessageTransform({ connection: this }))
  82. .on('error', this.onTransformError.bind(this));
  83. this.socket.on('close', this.onClose.bind(this));
  84. this.socket.on('timeout', this.onTimeout.bind(this));
  85. this.messageStream.pause();
  86. }
  87. get hello() {
  88. return this.description.hello;
  89. }
  90. // the `connect` method stores the result of the handshake hello on the connection
  91. set hello(response) {
  92. this.description.receiveResponse(response);
  93. Object.freeze(this.description);
  94. }
  95. get serviceId() {
  96. return this.hello?.serviceId;
  97. }
  98. get loadBalanced() {
  99. return this.description.loadBalanced;
  100. }
  101. get idleTime() {
  102. return (0, utils_1.calculateDurationInMs)(this.lastUseTime);
  103. }
  104. get hasSessionSupport() {
  105. return this.description.logicalSessionTimeoutMinutes != null;
  106. }
  107. get supportsOpMsg() {
  108. return (this.description != null &&
  109. // TODO(NODE-6672,NODE-6287): This guard is primarily for maxWireVersion = 0
  110. (0, utils_1.maxWireVersion)(this) >= 6 &&
  111. !this.description.__nodejs_mock_server__);
  112. }
  113. get shouldEmitAndLogCommand() {
  114. return ((this.monitorCommands ||
  115. (this.established &&
  116. !this.authContext?.reauthenticating &&
  117. this.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.COMMAND, mongo_logger_1.SeverityLevel.DEBUG))) ??
  118. false);
  119. }
  120. markAvailable() {
  121. this.lastUseTime = (0, utils_1.now)();
  122. }
  123. onSocketError(cause) {
  124. this.onError(new error_1.MongoNetworkError(cause.message, { cause }));
  125. }
  126. onTransformError(error) {
  127. this.onError(error);
  128. }
  129. onError(error) {
  130. this.cleanup(error);
  131. }
  132. onClose() {
  133. const message = `connection ${this.id} to ${this.address} closed`;
  134. this.cleanup(new error_1.MongoNetworkError(message));
  135. }
  136. onTimeout() {
  137. this.delayedTimeoutId = (0, timers_1.setTimeout)(() => {
  138. const message = `connection ${this.id} to ${this.address} timed out`;
  139. const beforeHandshake = this.hello == null;
  140. this.cleanup(new error_1.MongoNetworkTimeoutError(message, { beforeHandshake }));
  141. }, 1).unref(); // No need for this timer to hold the event loop open
  142. }
  143. destroy() {
  144. if (this.closed) {
  145. return;
  146. }
  147. // load balanced mode requires that these listeners remain on the connection
  148. // after cleanup on timeouts, errors or close so we remove them before calling
  149. // cleanup.
  150. this.removeAllListeners(Connection.PINNED);
  151. this.removeAllListeners(Connection.UNPINNED);
  152. const message = `connection ${this.id} to ${this.address} closed`;
  153. this.cleanup(new error_1.MongoNetworkError(message));
  154. }
  155. /**
  156. * A method that cleans up the connection. When `force` is true, this method
  157. * forcibly destroys the socket.
  158. *
  159. * If an error is provided, any in-flight operations will be closed with the error.
  160. *
  161. * This method does nothing if the connection is already closed.
  162. */
  163. cleanup(error) {
  164. if (this.closed) {
  165. return;
  166. }
  167. this.socket.destroy();
  168. this.error = error;
  169. this.dataEvents?.throw(error).then(undefined, utils_1.squashError);
  170. this.closed = true;
  171. this.emit(Connection.CLOSE);
  172. }
  173. prepareCommand(db, command, options) {
  174. let cmd = { ...command };
  175. const readPreference = (0, shared_1.getReadPreference)(options);
  176. const session = options?.session;
  177. let clusterTime = this.clusterTime;
  178. if (this.serverApi) {
  179. const { version, strict, deprecationErrors } = this.serverApi;
  180. cmd.apiVersion = version;
  181. if (strict != null)
  182. cmd.apiStrict = strict;
  183. if (deprecationErrors != null)
  184. cmd.apiDeprecationErrors = deprecationErrors;
  185. }
  186. if (this.hasSessionSupport && session) {
  187. if (session.clusterTime &&
  188. clusterTime &&
  189. session.clusterTime.clusterTime.greaterThan(clusterTime.clusterTime)) {
  190. clusterTime = session.clusterTime;
  191. }
  192. const sessionError = (0, sessions_1.applySession)(session, cmd, options);
  193. if (sessionError)
  194. throw sessionError;
  195. }
  196. else if (session?.explicit) {
  197. throw new error_1.MongoCompatibilityError('Current topology does not support sessions');
  198. }
  199. // if we have a known cluster time, gossip it
  200. if (clusterTime) {
  201. cmd.$clusterTime = clusterTime;
  202. }
  203. // For standalone, drivers MUST NOT set $readPreference.
  204. if (this.description.type !== common_1.ServerType.Standalone) {
  205. if (!(0, shared_1.isSharded)(this) &&
  206. !this.description.loadBalanced &&
  207. this.supportsOpMsg &&
  208. options.directConnection === true &&
  209. readPreference?.mode === 'primary') {
  210. // For mongos and load balancers with 'primary' mode, drivers MUST NOT set $readPreference.
  211. // For all other types with a direct connection, if the read preference is 'primary'
  212. // (driver sets 'primary' as default if no read preference is configured),
  213. // the $readPreference MUST be set to 'primaryPreferred'
  214. // to ensure that any server type can handle the request.
  215. cmd.$readPreference = read_preference_1.ReadPreference.primaryPreferred.toJSON();
  216. }
  217. else if ((0, shared_1.isSharded)(this) && !this.supportsOpMsg && readPreference?.mode !== 'primary') {
  218. // When sending a read operation via OP_QUERY and the $readPreference modifier,
  219. // the query MUST be provided using the $query modifier.
  220. cmd = {
  221. $query: cmd,
  222. $readPreference: readPreference.toJSON()
  223. };
  224. }
  225. else if (readPreference?.mode !== 'primary') {
  226. // For mode 'primary', drivers MUST NOT set $readPreference.
  227. // For all other read preference modes (i.e. 'secondary', 'primaryPreferred', ...),
  228. // drivers MUST set $readPreference
  229. cmd.$readPreference = readPreference.toJSON();
  230. }
  231. }
  232. const commandOptions = {
  233. numberToSkip: 0,
  234. numberToReturn: -1,
  235. checkKeys: false,
  236. // This value is not overridable
  237. secondaryOk: readPreference.secondaryOk(),
  238. ...options
  239. };
  240. options.timeoutContext?.addMaxTimeMSToCommand(cmd, options);
  241. const message = this.supportsOpMsg
  242. ? new commands_1.OpMsgRequest(db, cmd, commandOptions)
  243. : new commands_1.OpQueryRequest(db, cmd, commandOptions);
  244. return message;
  245. }
  246. async *sendWire(message, options, responseType) {
  247. this.throwIfAborted();
  248. const timeout = options.socketTimeoutMS ??
  249. options?.timeoutContext?.getSocketTimeoutMS() ??
  250. this.socketTimeoutMS;
  251. this.socket.setTimeout(timeout);
  252. try {
  253. await this.writeCommand(message, {
  254. agreedCompressor: this.description.compressor ?? 'none',
  255. zlibCompressionLevel: this.description.zlibCompressionLevel,
  256. timeoutContext: options.timeoutContext,
  257. signal: options.signal
  258. });
  259. if (message.moreToCome) {
  260. yield responses_1.MongoDBResponse.empty;
  261. return;
  262. }
  263. this.throwIfAborted();
  264. if (options.timeoutContext?.csotEnabled() &&
  265. options.timeoutContext.minRoundTripTime != null &&
  266. options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime) {
  267. throw new error_1.MongoOperationTimeoutError('Server roundtrip time is greater than the time remaining');
  268. }
  269. for await (const response of this.readMany(options)) {
  270. this.socket.setTimeout(0);
  271. const bson = response.parse();
  272. const document = (responseType ?? responses_1.MongoDBResponse).make(bson);
  273. yield document;
  274. this.throwIfAborted();
  275. this.socket.setTimeout(timeout);
  276. }
  277. }
  278. finally {
  279. this.socket.setTimeout(0);
  280. }
  281. }
  282. async *sendCommand(ns, command, options, responseType) {
  283. options?.signal?.throwIfAborted();
  284. const message = this.prepareCommand(ns.db, command, options);
  285. let started = 0;
  286. if (this.shouldEmitAndLogCommand) {
  287. started = (0, utils_1.now)();
  288. this.emitAndLogCommand(this.monitorCommands, Connection.COMMAND_STARTED, message.databaseName, this.established, new command_monitoring_events_1.CommandStartedEvent(this, message, this.description.serverConnectionId));
  289. }
  290. // If `documentsReturnedIn` not set or raw is not enabled, use input bson options
  291. // Otherwise, support raw flag. Raw only works for cursors that hardcode firstBatch/nextBatch fields
  292. const bsonOptions = options.documentsReturnedIn == null || !options.raw
  293. ? options
  294. : {
  295. ...options,
  296. raw: false,
  297. fieldsAsRaw: { [options.documentsReturnedIn]: true }
  298. };
  299. /** MongoDBResponse instance or subclass */
  300. let document = undefined;
  301. /** Cached result of a toObject call */
  302. let object = undefined;
  303. try {
  304. this.throwIfAborted();
  305. for await (document of this.sendWire(message, options, responseType)) {
  306. object = undefined;
  307. if (options.session != null) {
  308. (0, sessions_1.updateSessionFromResponse)(options.session, document);
  309. }
  310. if (document.$clusterTime) {
  311. this.clusterTime = document.$clusterTime;
  312. this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
  313. }
  314. if (document.ok === 0) {
  315. if (options.timeoutContext?.csotEnabled() && document.isMaxTimeExpiredError) {
  316. throw new error_1.MongoOperationTimeoutError('Server reported a timeout error', {
  317. cause: new error_1.MongoServerError((object ??= document.toObject(bsonOptions)))
  318. });
  319. }
  320. throw new error_1.MongoServerError((object ??= document.toObject(bsonOptions)));
  321. }
  322. if (this.shouldEmitAndLogCommand) {
  323. this.emitAndLogCommand(this.monitorCommands, Connection.COMMAND_SUCCEEDED, message.databaseName, this.established, new command_monitoring_events_1.CommandSucceededEvent(this, message, message.moreToCome ? { ok: 1 } : (object ??= document.toObject(bsonOptions)), started, this.description.serverConnectionId));
  324. }
  325. if (responseType == null) {
  326. yield (object ??= document.toObject(bsonOptions));
  327. }
  328. else {
  329. yield document;
  330. }
  331. this.throwIfAborted();
  332. }
  333. }
  334. catch (error) {
  335. if (this.shouldEmitAndLogCommand) {
  336. this.emitAndLogCommand(this.monitorCommands, Connection.COMMAND_FAILED, message.databaseName, this.established, new command_monitoring_events_1.CommandFailedEvent(this, message, error, started, this.description.serverConnectionId));
  337. }
  338. throw error;
  339. }
  340. }
  341. async command(ns, command, options = {}, responseType) {
  342. this.throwIfAborted();
  343. options.signal?.throwIfAborted();
  344. for await (const document of this.sendCommand(ns, command, options, responseType)) {
  345. if (options.timeoutContext?.csotEnabled()) {
  346. if (responses_1.MongoDBResponse.is(document)) {
  347. if (document.isMaxTimeExpiredError) {
  348. throw new error_1.MongoOperationTimeoutError('Server reported a timeout error', {
  349. cause: new error_1.MongoServerError(document.toObject())
  350. });
  351. }
  352. }
  353. else {
  354. if ((Array.isArray(document?.writeErrors) &&
  355. document.writeErrors.some(error => error?.code === error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired)) ||
  356. document?.writeConcernError?.code === error_1.MONGODB_ERROR_CODES.MaxTimeMSExpired) {
  357. throw new error_1.MongoOperationTimeoutError('Server reported a timeout error', {
  358. cause: new error_1.MongoServerError(document)
  359. });
  360. }
  361. }
  362. }
  363. return document;
  364. }
  365. throw new error_1.MongoUnexpectedServerResponseError('Unable to get response from server');
  366. }
  367. exhaustCommand(ns, command, options, replyListener) {
  368. const exhaustLoop = async () => {
  369. this.throwIfAborted();
  370. for await (const reply of this.sendCommand(ns, command, options)) {
  371. replyListener(undefined, reply);
  372. this.throwIfAborted();
  373. }
  374. throw new error_1.MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');
  375. };
  376. exhaustLoop().then(undefined, replyListener);
  377. }
  378. throwIfAborted() {
  379. if (this.error)
  380. throw this.error;
  381. }
  382. /**
  383. * @internal
  384. *
  385. * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
  386. * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
  387. */
  388. async writeCommand(command, options) {
  389. const finalCommand = options.agreedCompressor === 'none' || !commands_1.OpCompressedRequest.canCompress(command)
  390. ? command
  391. : new commands_1.OpCompressedRequest(command, {
  392. agreedCompressor: options.agreedCompressor ?? 'none',
  393. zlibCompressionLevel: options.zlibCompressionLevel ?? 0
  394. });
  395. const buffer = Buffer.concat(await finalCommand.toBin());
  396. if (options.timeoutContext?.csotEnabled()) {
  397. if (options.timeoutContext.minRoundTripTime != null &&
  398. options.timeoutContext.remainingTimeMS < options.timeoutContext.minRoundTripTime) {
  399. throw new error_1.MongoOperationTimeoutError('Server roundtrip time is greater than the time remaining');
  400. }
  401. }
  402. try {
  403. if (this.socket.write(buffer))
  404. return;
  405. }
  406. catch (writeError) {
  407. const networkError = new error_1.MongoNetworkError('unexpected error writing to socket', {
  408. cause: writeError
  409. });
  410. this.onError(networkError);
  411. throw networkError;
  412. }
  413. const drainEvent = (0, utils_1.once)(this.socket, 'drain', options);
  414. const timeout = options?.timeoutContext?.timeoutForSocketWrite;
  415. const drained = timeout ? Promise.race([drainEvent, timeout]) : drainEvent;
  416. try {
  417. return await drained;
  418. }
  419. catch (writeError) {
  420. if (timeout_1.TimeoutError.is(writeError)) {
  421. const timeoutError = new error_1.MongoOperationTimeoutError('Timed out at socket write');
  422. this.onError(timeoutError);
  423. throw timeoutError;
  424. }
  425. else if (writeError === options.signal?.reason) {
  426. this.onError(writeError);
  427. }
  428. throw writeError;
  429. }
  430. finally {
  431. timeout?.clear();
  432. }
  433. }
  434. /**
  435. * @internal
  436. *
  437. * Returns an async generator that yields full wire protocol messages from the underlying socket. This function
  438. * yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
  439. * by calling `return` on the generator.
  440. *
  441. * Note that `for-await` loops call `return` automatically when the loop is exited.
  442. */
  443. async *readMany(options) {
  444. try {
  445. this.dataEvents = (0, on_data_1.onData)(this.messageStream, options);
  446. this.messageStream.resume();
  447. for await (const message of this.dataEvents) {
  448. const response = await (0, compression_1.decompressResponse)(message);
  449. yield response;
  450. if (!response.moreToCome) {
  451. return;
  452. }
  453. }
  454. }
  455. catch (readError) {
  456. if (timeout_1.TimeoutError.is(readError)) {
  457. const timeoutError = new error_1.MongoOperationTimeoutError(`Timed out during socket read (${readError.duration}ms)`);
  458. this.dataEvents = null;
  459. this.onError(timeoutError);
  460. throw timeoutError;
  461. }
  462. else if (readError === options.signal?.reason) {
  463. this.onError(readError);
  464. }
  465. throw readError;
  466. }
  467. finally {
  468. this.dataEvents = null;
  469. this.messageStream.pause();
  470. }
  471. }
  472. }
  473. exports.Connection = Connection;
  474. /** @internal */
  475. class SizedMessageTransform extends stream_1.Transform {
  476. constructor({ connection }) {
  477. super({ writableObjectMode: false, readableObjectMode: true });
  478. this.bufferPool = new utils_1.BufferPool();
  479. this.connection = connection;
  480. }
  481. _transform(chunk, encoding, callback) {
  482. if (this.connection.delayedTimeoutId != null) {
  483. (0, timers_1.clearTimeout)(this.connection.delayedTimeoutId);
  484. this.connection.delayedTimeoutId = null;
  485. }
  486. this.bufferPool.append(chunk);
  487. while (this.bufferPool.length) {
  488. // While there are any bytes in the buffer
  489. // Try to fetch a size from the top 4 bytes
  490. const sizeOfMessage = this.bufferPool.getInt32();
  491. if (sizeOfMessage == null) {
  492. // Not even an int32 worth of data. Stop the loop, we need more chunks.
  493. break;
  494. }
  495. if (sizeOfMessage < 0) {
  496. // The size in the message has a negative value, this is probably corruption, throw:
  497. return callback(new error_1.MongoParseError(`Message size cannot be negative: ${sizeOfMessage}`));
  498. }
  499. if (sizeOfMessage > this.bufferPool.length) {
  500. // We do not have enough bytes to make a sizeOfMessage chunk
  501. break;
  502. }
  503. // Add a message to the stream
  504. const message = this.bufferPool.read(sizeOfMessage);
  505. if (!this.push(message)) {
  506. // We only subscribe to data events so we should never get backpressure
  507. // if we do, we do not have the handling for it.
  508. return callback(new error_1.MongoRuntimeError(`SizedMessageTransform does not support backpressure`));
  509. }
  510. }
  511. callback();
  512. }
  513. }
  514. exports.SizedMessageTransform = SizedMessageTransform;
  515. /** @internal */
  516. class CryptoConnection extends Connection {
  517. constructor(stream, options) {
  518. super(stream, options);
  519. this.autoEncrypter = options.autoEncrypter;
  520. }
  521. async command(ns, cmd, options, responseType) {
  522. const { autoEncrypter } = this;
  523. if (!autoEncrypter) {
  524. throw new error_1.MongoRuntimeError('No AutoEncrypter available for encryption');
  525. }
  526. const serverWireVersion = (0, utils_1.maxWireVersion)(this);
  527. if (serverWireVersion === 0) {
  528. // This means the initial handshake hasn't happened yet
  529. return await super.command(ns, cmd, options, responseType);
  530. }
  531. // Save sort or indexKeys based on the command being run
  532. // the encrypt API serializes our JS objects to BSON to pass to the native code layer
  533. // and then deserializes the encrypted result, the protocol level components
  534. // of the command (ex. sort) are then converted to JS objects potentially losing
  535. // import key order information. These fields are never encrypted so we can save the values
  536. // from before the encryption and replace them after encryption has been performed
  537. const sort = cmd.find || cmd.findAndModify ? cmd.sort : null;
  538. const indexKeys = cmd.createIndexes
  539. ? cmd.indexes.map((index) => index.key)
  540. : null;
  541. const encrypted = await autoEncrypter.encrypt(ns.toString(), cmd, options);
  542. // Replace the saved values
  543. if (sort != null && (cmd.find || cmd.findAndModify)) {
  544. encrypted.sort = sort;
  545. }
  546. if (indexKeys != null && cmd.createIndexes) {
  547. for (const [offset, index] of indexKeys.entries()) {
  548. // @ts-expect-error `encrypted` is a generic "command", but we've narrowed for only `createIndexes` commands here
  549. encrypted.indexes[offset].key = index;
  550. }
  551. }
  552. const encryptedResponse = await super.command(ns, encrypted, options,
  553. // Eventually we want to require `responseType` which means we would satisfy `T` as the return type.
  554. // In the meantime, we want encryptedResponse to always be _at least_ a MongoDBResponse if not a more specific subclass
  555. // So that we can ensure we have access to the on-demand APIs for decorate response
  556. responseType ?? responses_1.MongoDBResponse);
  557. const result = await autoEncrypter.decrypt(encryptedResponse.toBytes(), options);
  558. const decryptedResponse = responseType?.make(result) ?? (0, bson_1.deserialize)(result, options);
  559. if (autoEncrypter[constants_1.kDecorateResult]) {
  560. if (responseType == null) {
  561. (0, utils_1.decorateDecryptionResult)(decryptedResponse, encryptedResponse.toObject(), true);
  562. }
  563. else if (decryptedResponse instanceof responses_1.CursorResponse) {
  564. decryptedResponse.encryptedResponse = encryptedResponse;
  565. }
  566. }
  567. return decryptedResponse;
  568. }
  569. }
  570. exports.CryptoConnection = CryptoConnection;
  571. //# sourceMappingURL=connection.js.map