// execute_operation.js 11 KB


  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.executeOperation = executeOperation;
  4. exports.autoConnect = autoConnect;
  5. const constants_1 = require("../cmap/wire_protocol/constants");
  6. const error_1 = require("../error");
  7. const read_preference_1 = require("../read_preference");
  8. const server_selection_1 = require("../sdam/server_selection");
  9. const timeout_1 = require("../timeout");
  10. const utils_1 = require("../utils");
  11. const aggregate_1 = require("./aggregate");
  12. const operation_1 = require("./operation");
  13. const MMAPv1_RETRY_WRITES_ERROR_CODE = error_1.MONGODB_ERROR_CODES.IllegalOperation;
  14. const MMAPv1_RETRY_WRITES_ERROR_MESSAGE = 'This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string.';
  15. /**
  16. * Executes the given operation with provided arguments.
  17. * @internal
  18. *
  19. * @remarks
  20. * Allows for a single point of entry to provide features such as implicit sessions, which
  21. * are required by the Driver Sessions specification in the event that a ClientSession is
  22. * not provided.
  23. *
  24. * The expectation is that this function:
  25. * - Connects the MongoClient if it has not already been connected, see {@link autoConnect}
  26. * - Creates a session if none is provided and cleans up the session it creates
  27. * - Tries an operation and retries under certain conditions, see {@link tryOperation}
  28. *
  29. * @typeParam T - The operation's type
  30. * @typeParam TResult - The type of the operation's result, calculated from T
  31. *
  32. * @param client - The MongoClient to execute this operation with
  33. * @param operation - The operation to execute
  34. */
  35. async function executeOperation(client, operation, timeoutContext) {
  36. if (!(operation instanceof operation_1.AbstractOperation)) {
  37. // TODO(NODE-3483): Extend MongoRuntimeError
  38. throw new error_1.MongoRuntimeError('This method requires a valid operation instance');
  39. }
  40. const topology = client.topology == null
  41. ? await (0, utils_1.abortable)(autoConnect(client), operation.options)
  42. : client.topology;
  43. // The driver sessions spec mandates that we implicitly create sessions for operations
  44. // that are not explicitly provided with a session.
  45. let session = operation.session;
  46. let owner;
  47. if (session == null) {
  48. owner = Symbol();
  49. session = client.startSession({ owner, explicit: false });
  50. }
  51. else if (session.hasEnded) {
  52. throw new error_1.MongoExpiredSessionError('Use of expired sessions is not permitted');
  53. }
  54. else if (session.snapshotEnabled &&
  55. (0, utils_1.maxWireVersion)(topology) < constants_1.MIN_SUPPORTED_SNAPSHOT_READS_WIRE_VERSION) {
  56. throw new error_1.MongoCompatibilityError('Snapshot reads require MongoDB 5.0 or later');
  57. }
  58. else if (session.client !== client) {
  59. throw new error_1.MongoInvalidArgumentError('ClientSession must be from the same MongoClient');
  60. }
  61. operation.session ??= session;
  62. const readPreference = operation.readPreference ?? read_preference_1.ReadPreference.primary;
  63. const inTransaction = !!session?.inTransaction();
  64. const hasReadAspect = operation.hasAspect(operation_1.Aspect.READ_OPERATION);
  65. if (inTransaction &&
  66. !readPreference.equals(read_preference_1.ReadPreference.primary) &&
  67. (hasReadAspect || operation.commandName === 'runCommand')) {
  68. throw new error_1.MongoTransactionError(`Read preference in a transaction must be primary, not: ${readPreference.mode}`);
  69. }
  70. if (session?.isPinned && session.transaction.isCommitted && !operation.bypassPinningCheck) {
  71. session.unpin();
  72. }
  73. timeoutContext ??= timeout_1.TimeoutContext.create({
  74. session,
  75. serverSelectionTimeoutMS: client.s.options.serverSelectionTimeoutMS,
  76. waitQueueTimeoutMS: client.s.options.waitQueueTimeoutMS,
  77. timeoutMS: operation.options.timeoutMS
  78. });
  79. try {
  80. return await tryOperation(operation, {
  81. topology,
  82. timeoutContext,
  83. session,
  84. readPreference
  85. });
  86. }
  87. finally {
  88. if (session?.owner != null && session.owner === owner) {
  89. await session.endSession();
  90. }
  91. }
  92. }
  93. /**
  94. * Connects a client if it has not yet been connected
  95. * @internal
  96. */
  97. async function autoConnect(client) {
  98. if (client.topology == null) {
  99. if (client.s.hasBeenClosed) {
  100. throw new error_1.MongoNotConnectedError('Client must be connected before running operations');
  101. }
  102. client.s.options.__skipPingOnConnect = true;
  103. try {
  104. await client.connect();
  105. if (client.topology == null) {
  106. throw new error_1.MongoRuntimeError('client.connect did not create a topology but also did not throw');
  107. }
  108. return client.topology;
  109. }
  110. finally {
  111. delete client.s.options.__skipPingOnConnect;
  112. }
  113. }
  114. return client.topology;
  115. }
  116. /**
  117. * Executes an operation and retries as appropriate
  118. * @internal
  119. *
  120. * @remarks
  121. * Implements behaviour described in [Retryable Reads](https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md) and [Retryable
  122. * Writes](https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.md) specification
  123. *
  124. * This function:
  125. * - performs initial server selection
  126. * - attempts to execute an operation
  127. * - retries the operation if it meets the criteria for a retryable read or a retryable write
  128. *
  129. * @typeParam T - The operation's type
  130. * @typeParam TResult - The type of the operation's result, calculated from T
  131. *
  132. * @param operation - The operation to execute
  133. * */
  134. async function tryOperation(operation, { topology, timeoutContext, session, readPreference }) {
  135. let selector;
  136. if (operation.hasAspect(operation_1.Aspect.MUST_SELECT_SAME_SERVER)) {
  137. // GetMore and KillCursor operations must always select the same server, but run through
  138. // server selection to potentially force monitor checks if the server is
  139. // in an unknown state.
  140. selector = (0, server_selection_1.sameServerSelector)(operation.server?.description);
  141. }
  142. else if (operation instanceof aggregate_1.AggregateOperation && operation.hasWriteStage) {
  143. // If operation should try to write to secondary use the custom server selector
  144. // otherwise provide the read preference.
  145. selector = (0, server_selection_1.secondaryWritableServerSelector)(topology.commonWireVersion, readPreference);
  146. }
  147. else {
  148. selector = readPreference;
  149. }
  150. let server = await topology.selectServer(selector, {
  151. session,
  152. operationName: operation.commandName,
  153. timeoutContext,
  154. signal: operation.options.signal
  155. });
  156. const hasReadAspect = operation.hasAspect(operation_1.Aspect.READ_OPERATION);
  157. const hasWriteAspect = operation.hasAspect(operation_1.Aspect.WRITE_OPERATION);
  158. const inTransaction = session?.inTransaction() ?? false;
  159. const willRetryRead = topology.s.options.retryReads && !inTransaction && operation.canRetryRead;
  160. const willRetryWrite = topology.s.options.retryWrites &&
  161. !inTransaction &&
  162. (0, utils_1.supportsRetryableWrites)(server) &&
  163. operation.canRetryWrite;
  164. const willRetry = operation.hasAspect(operation_1.Aspect.RETRYABLE) &&
  165. session != null &&
  166. ((hasReadAspect && willRetryRead) || (hasWriteAspect && willRetryWrite));
  167. if (hasWriteAspect && willRetryWrite && session != null) {
  168. operation.options.willRetryWrite = true;
  169. session.incrementTransactionNumber();
  170. }
  171. const maxTries = willRetry ? (timeoutContext.csotEnabled() ? Infinity : 2) : 1;
  172. let previousOperationError;
  173. let previousServer;
  174. for (let tries = 0; tries < maxTries; tries++) {
  175. if (previousOperationError) {
  176. if (hasWriteAspect && previousOperationError.code === MMAPv1_RETRY_WRITES_ERROR_CODE) {
  177. throw new error_1.MongoServerError({
  178. message: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
  179. errmsg: MMAPv1_RETRY_WRITES_ERROR_MESSAGE,
  180. originalError: previousOperationError
  181. });
  182. }
  183. if (operation.hasAspect(operation_1.Aspect.COMMAND_BATCHING) && !operation.canRetryWrite) {
  184. throw previousOperationError;
  185. }
  186. if (hasWriteAspect && !(0, error_1.isRetryableWriteError)(previousOperationError))
  187. throw previousOperationError;
  188. if (hasReadAspect && !(0, error_1.isRetryableReadError)(previousOperationError)) {
  189. throw previousOperationError;
  190. }
  191. if (previousOperationError instanceof error_1.MongoNetworkError &&
  192. operation.hasAspect(operation_1.Aspect.CURSOR_CREATING) &&
  193. session != null &&
  194. session.isPinned &&
  195. !session.inTransaction()) {
  196. session.unpin({ force: true, forceClear: true });
  197. }
  198. server = await topology.selectServer(selector, {
  199. session,
  200. operationName: operation.commandName,
  201. previousServer,
  202. signal: operation.options.signal
  203. });
  204. if (hasWriteAspect && !(0, utils_1.supportsRetryableWrites)(server)) {
  205. throw new error_1.MongoUnexpectedServerResponseError('Selected server does not support retryable writes');
  206. }
  207. }
  208. operation.server = server;
  209. try {
  210. // If tries > 0 and we are command batching we need to reset the batch.
  211. if (tries > 0 && operation.hasAspect(operation_1.Aspect.COMMAND_BATCHING)) {
  212. operation.resetBatch();
  213. }
  214. try {
  215. const result = await server.command(operation, timeoutContext);
  216. return operation.handleOk(result);
  217. }
  218. catch (error) {
  219. return operation.handleError(error);
  220. }
  221. }
  222. catch (operationError) {
  223. if (!(operationError instanceof error_1.MongoError))
  224. throw operationError;
  225. if (previousOperationError != null &&
  226. operationError.hasErrorLabel(error_1.MongoErrorLabel.NoWritesPerformed)) {
  227. throw previousOperationError;
  228. }
  229. previousServer = server.description;
  230. previousOperationError = operationError;
  231. // Reset timeouts
  232. timeoutContext.clear();
  233. }
  234. }
  235. throw (previousOperationError ??
  236. new error_1.MongoRuntimeError('Tried to propagate retryability error, but no error was found.'));
  237. }
//# sourceMappingURL=execute_operation.js.map