server.js 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.Server = void 0;
  4. const connection_1 = require("../cmap/connection");
  5. const connection_pool_1 = require("../cmap/connection_pool");
  6. const errors_1 = require("../cmap/errors");
  7. const constants_1 = require("../constants");
  8. const error_1 = require("../error");
  9. const mongo_types_1 = require("../mongo_types");
  10. const aggregate_1 = require("../operations/aggregate");
  11. const transactions_1 = require("../transactions");
  12. const utils_1 = require("../utils");
  13. const write_concern_1 = require("../write_concern");
  14. const common_1 = require("./common");
  15. const monitor_1 = require("./monitor");
  16. const server_description_1 = require("./server_description");
  17. const server_selection_1 = require("./server_selection");
  18. const stateTransition = (0, utils_1.makeStateMachine)({
  19. [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, common_1.STATE_CONNECTING],
  20. [common_1.STATE_CONNECTING]: [common_1.STATE_CONNECTING, common_1.STATE_CLOSING, common_1.STATE_CONNECTED, common_1.STATE_CLOSED],
  21. [common_1.STATE_CONNECTED]: [common_1.STATE_CONNECTED, common_1.STATE_CLOSING, common_1.STATE_CLOSED],
  22. [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, common_1.STATE_CLOSED]
  23. });
  24. /** @internal */
  25. class Server extends mongo_types_1.TypedEventEmitter {
  26. /** @event */
  27. static { this.SERVER_HEARTBEAT_STARTED = constants_1.SERVER_HEARTBEAT_STARTED; }
  28. /** @event */
  29. static { this.SERVER_HEARTBEAT_SUCCEEDED = constants_1.SERVER_HEARTBEAT_SUCCEEDED; }
  30. /** @event */
  31. static { this.SERVER_HEARTBEAT_FAILED = constants_1.SERVER_HEARTBEAT_FAILED; }
  32. /** @event */
  33. static { this.CONNECT = constants_1.CONNECT; }
  34. /** @event */
  35. static { this.DESCRIPTION_RECEIVED = constants_1.DESCRIPTION_RECEIVED; }
  36. /** @event */
  37. static { this.CLOSED = constants_1.CLOSED; }
  38. /** @event */
  39. static { this.ENDED = constants_1.ENDED; }
  40. /**
  41. * Create a server
  42. */
  43. constructor(topology, description, options) {
  44. super();
  45. this.on('error', utils_1.noop);
  46. this.serverApi = options.serverApi;
  47. const poolOptions = { hostAddress: description.hostAddress, ...options };
  48. this.topology = topology;
  49. this.pool = new connection_pool_1.ConnectionPool(this, poolOptions);
  50. this.s = {
  51. description,
  52. options,
  53. state: common_1.STATE_CLOSED,
  54. operationCount: 0
  55. };
  56. for (const event of [...constants_1.CMAP_EVENTS, ...constants_1.APM_EVENTS]) {
  57. this.pool.on(event, (e) => this.emit(event, e));
  58. }
  59. this.pool.on(connection_1.Connection.CLUSTER_TIME_RECEIVED, (clusterTime) => {
  60. this.clusterTime = clusterTime;
  61. });
  62. if (this.loadBalanced) {
  63. this.monitor = null;
  64. // monitoring is disabled in load balancing mode
  65. return;
  66. }
  67. // create the monitor
  68. this.monitor = new monitor_1.Monitor(this, this.s.options);
  69. for (const event of constants_1.HEARTBEAT_EVENTS) {
  70. this.monitor.on(event, (e) => this.emit(event, e));
  71. }
  72. this.monitor.on('resetServer', (error) => markServerUnknown(this, error));
  73. this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event) => {
  74. this.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(this.description.hostAddress, event.reply, {
  75. roundTripTime: this.monitor?.roundTripTime,
  76. minRoundTripTime: this.monitor?.minRoundTripTime
  77. }));
  78. if (this.s.state === common_1.STATE_CONNECTING) {
  79. stateTransition(this, common_1.STATE_CONNECTED);
  80. this.emit(Server.CONNECT, this);
  81. }
  82. });
  83. }
  84. get clusterTime() {
  85. return this.topology.clusterTime;
  86. }
  87. set clusterTime(clusterTime) {
  88. this.topology.clusterTime = clusterTime;
  89. }
  90. get description() {
  91. return this.s.description;
  92. }
  93. get name() {
  94. return this.s.description.address;
  95. }
  96. get autoEncrypter() {
  97. if (this.s.options && this.s.options.autoEncrypter) {
  98. return this.s.options.autoEncrypter;
  99. }
  100. return;
  101. }
  102. get loadBalanced() {
  103. return this.topology.description.type === common_1.TopologyType.LoadBalanced;
  104. }
  105. /**
  106. * Initiate server connect
  107. */
  108. connect() {
  109. if (this.s.state !== common_1.STATE_CLOSED) {
  110. return;
  111. }
  112. stateTransition(this, common_1.STATE_CONNECTING);
  113. // If in load balancer mode we automatically set the server to
  114. // a load balancer. It never transitions out of this state and
  115. // has no monitor.
  116. if (!this.loadBalanced) {
  117. this.monitor?.connect();
  118. }
  119. else {
  120. stateTransition(this, common_1.STATE_CONNECTED);
  121. this.emit(Server.CONNECT, this);
  122. }
  123. }
  124. closeCheckedOutConnections() {
  125. return this.pool.closeCheckedOutConnections();
  126. }
  127. /** Destroy the server connection */
  128. close() {
  129. if (this.s.state === common_1.STATE_CLOSED) {
  130. return;
  131. }
  132. stateTransition(this, common_1.STATE_CLOSING);
  133. if (!this.loadBalanced) {
  134. this.monitor?.close();
  135. }
  136. this.pool.close();
  137. stateTransition(this, common_1.STATE_CLOSED);
  138. this.emit('closed');
  139. }
  140. /**
  141. * Immediately schedule monitoring of this server. If there already an attempt being made
  142. * this will be a no-op.
  143. */
  144. requestCheck() {
  145. if (!this.loadBalanced) {
  146. this.monitor?.requestCheck();
  147. }
  148. }
  149. async command(operation, timeoutContext) {
  150. if (this.s.state === common_1.STATE_CLOSING || this.s.state === common_1.STATE_CLOSED) {
  151. throw new error_1.MongoServerClosedError();
  152. }
  153. const session = operation.session;
  154. let conn = session?.pinnedConnection;
  155. this.incrementOperationCount();
  156. if (conn == null) {
  157. try {
  158. conn = await this.pool.checkOut({ timeoutContext, signal: operation.options.signal });
  159. }
  160. catch (checkoutError) {
  161. this.decrementOperationCount();
  162. if (!(checkoutError instanceof errors_1.PoolClearedError))
  163. this.handleError(checkoutError);
  164. throw checkoutError;
  165. }
  166. }
  167. let reauthPromise = null;
  168. const cleanup = () => {
  169. this.decrementOperationCount();
  170. if (session?.pinnedConnection !== conn) {
  171. if (reauthPromise != null) {
  172. // The reauth promise only exists if it hasn't thrown.
  173. const checkBackIn = () => {
  174. this.pool.checkIn(conn);
  175. };
  176. void reauthPromise.then(checkBackIn, checkBackIn);
  177. }
  178. else {
  179. this.pool.checkIn(conn);
  180. }
  181. }
  182. };
  183. let cmd;
  184. try {
  185. cmd = operation.buildCommand(conn, session);
  186. }
  187. catch (e) {
  188. cleanup();
  189. throw e;
  190. }
  191. const options = operation.buildOptions(timeoutContext);
  192. const ns = operation.ns;
  193. if (this.loadBalanced && isPinnableCommand(cmd, session) && !session?.pinnedConnection) {
  194. session?.pin(conn);
  195. }
  196. options.directConnection = this.topology.s.options.directConnection;
  197. const omitReadPreference = operation instanceof aggregate_1.AggregateOperation &&
  198. operation.hasWriteStage &&
  199. (0, utils_1.maxWireVersion)(conn) < server_selection_1.MIN_SECONDARY_WRITE_WIRE_VERSION;
  200. if (omitReadPreference) {
  201. delete options.readPreference;
  202. }
  203. if (this.description.iscryptd) {
  204. options.omitMaxTimeMS = true;
  205. }
  206. try {
  207. try {
  208. const res = await conn.command(ns, cmd, options, operation.SERVER_COMMAND_RESPONSE_TYPE);
  209. (0, write_concern_1.throwIfWriteConcernError)(res);
  210. return res;
  211. }
  212. catch (commandError) {
  213. throw this.decorateCommandError(conn, cmd, options, commandError);
  214. }
  215. }
  216. catch (operationError) {
  217. if (operationError instanceof error_1.MongoError &&
  218. operationError.code === error_1.MONGODB_ERROR_CODES.Reauthenticate) {
  219. reauthPromise = this.pool.reauthenticate(conn);
  220. reauthPromise.then(undefined, error => {
  221. reauthPromise = null;
  222. (0, utils_1.squashError)(error);
  223. });
  224. await (0, utils_1.abortable)(reauthPromise, options);
  225. reauthPromise = null; // only reachable if reauth succeeds
  226. try {
  227. const res = await conn.command(ns, cmd, options, operation.SERVER_COMMAND_RESPONSE_TYPE);
  228. (0, write_concern_1.throwIfWriteConcernError)(res);
  229. return res;
  230. }
  231. catch (commandError) {
  232. throw this.decorateCommandError(conn, cmd, options, commandError);
  233. }
  234. }
  235. else {
  236. throw operationError;
  237. }
  238. }
  239. finally {
  240. cleanup();
  241. }
  242. }
  243. /**
  244. * Handle SDAM error
  245. * @internal
  246. */
  247. handleError(error, connection) {
  248. if (!(error instanceof error_1.MongoError)) {
  249. return;
  250. }
  251. const isStaleError = error.connectionGeneration && error.connectionGeneration < this.pool.generation;
  252. if (isStaleError) {
  253. return;
  254. }
  255. const isNetworkNonTimeoutError = error instanceof error_1.MongoNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError);
  256. const isNetworkTimeoutBeforeHandshakeError = error instanceof error_1.MongoNetworkError && error.beforeHandshake;
  257. const isAuthHandshakeError = error.hasErrorLabel(error_1.MongoErrorLabel.HandshakeError);
  258. if (isNetworkNonTimeoutError || isNetworkTimeoutBeforeHandshakeError || isAuthHandshakeError) {
  259. // In load balanced mode we never mark the server as unknown and always
  260. // clear for the specific service id.
  261. if (!this.loadBalanced) {
  262. error.addErrorLabel(error_1.MongoErrorLabel.ResetPool);
  263. markServerUnknown(this, error);
  264. }
  265. else if (connection) {
  266. this.pool.clear({ serviceId: connection.serviceId });
  267. }
  268. }
  269. else {
  270. if ((0, error_1.isSDAMUnrecoverableError)(error)) {
  271. if (shouldHandleStateChangeError(this, error)) {
  272. const shouldClearPool = (0, error_1.isNodeShuttingDownError)(error);
  273. if (this.loadBalanced && connection && shouldClearPool) {
  274. this.pool.clear({ serviceId: connection.serviceId });
  275. }
  276. if (!this.loadBalanced) {
  277. if (shouldClearPool) {
  278. error.addErrorLabel(error_1.MongoErrorLabel.ResetPool);
  279. }
  280. markServerUnknown(this, error);
  281. process.nextTick(() => this.requestCheck());
  282. }
  283. }
  284. }
  285. }
  286. }
  287. /**
  288. * Ensure that error is properly decorated and internal state is updated before throwing
  289. * @internal
  290. */
  291. decorateCommandError(connection, cmd, options, error) {
  292. if (typeof error !== 'object' || error == null || !('name' in error)) {
  293. throw new error_1.MongoRuntimeError('An unexpected error type: ' + typeof error);
  294. }
  295. if (error.name === 'AbortError' && 'cause' in error && error.cause instanceof error_1.MongoError) {
  296. error = error.cause;
  297. }
  298. if (!(error instanceof error_1.MongoError)) {
  299. // Node.js or some other error we have not special handling for
  300. return error;
  301. }
  302. if (connectionIsStale(this.pool, connection)) {
  303. return error;
  304. }
  305. const session = options?.session;
  306. if (error instanceof error_1.MongoNetworkError) {
  307. if (session && !session.hasEnded && session.serverSession) {
  308. session.serverSession.isDirty = true;
  309. }
  310. // inActiveTransaction check handles commit and abort.
  311. if (inActiveTransaction(session, cmd) &&
  312. !error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  313. error.addErrorLabel(error_1.MongoErrorLabel.TransientTransactionError);
  314. }
  315. if ((isRetryableWritesEnabled(this.topology) || (0, transactions_1.isTransactionCommand)(cmd)) &&
  316. (0, utils_1.supportsRetryableWrites)(this) &&
  317. !inActiveTransaction(session, cmd)) {
  318. error.addErrorLabel(error_1.MongoErrorLabel.RetryableWriteError);
  319. }
  320. }
  321. else {
  322. if ((isRetryableWritesEnabled(this.topology) || (0, transactions_1.isTransactionCommand)(cmd)) &&
  323. (0, error_1.needsRetryableWriteLabel)(error, (0, utils_1.maxWireVersion)(this), this.description.type) &&
  324. !inActiveTransaction(session, cmd)) {
  325. error.addErrorLabel(error_1.MongoErrorLabel.RetryableWriteError);
  326. }
  327. }
  328. if (session &&
  329. session.isPinned &&
  330. error.hasErrorLabel(error_1.MongoErrorLabel.TransientTransactionError)) {
  331. session.unpin({ force: true });
  332. }
  333. this.handleError(error, connection);
  334. return error;
  335. }
  336. /**
  337. * Decrement the operation count, returning the new count.
  338. */
  339. decrementOperationCount() {
  340. return (this.s.operationCount -= 1);
  341. }
  342. /**
  343. * Increment the operation count, returning the new count.
  344. */
  345. incrementOperationCount() {
  346. return (this.s.operationCount += 1);
  347. }
  348. }
  349. exports.Server = Server;
  350. function markServerUnknown(server, error) {
  351. // Load balancer servers can never be marked unknown.
  352. if (server.loadBalanced) {
  353. return;
  354. }
  355. if (error instanceof error_1.MongoNetworkError && !(error instanceof error_1.MongoNetworkTimeoutError)) {
  356. server.monitor?.reset();
  357. }
  358. server.emit(Server.DESCRIPTION_RECEIVED, new server_description_1.ServerDescription(server.description.hostAddress, undefined, { error }));
  359. }
  360. function isPinnableCommand(cmd, session) {
  361. if (session) {
  362. return (session.inTransaction() ||
  363. (session.transaction.isCommitted && 'commitTransaction' in cmd) ||
  364. 'aggregate' in cmd ||
  365. 'find' in cmd ||
  366. 'getMore' in cmd ||
  367. 'listCollections' in cmd ||
  368. 'listIndexes' in cmd ||
  369. 'bulkWrite' in cmd);
  370. }
  371. return false;
  372. }
  373. function connectionIsStale(pool, connection) {
  374. if (connection.serviceId) {
  375. return (connection.generation !== pool.serviceGenerations.get(connection.serviceId.toHexString()));
  376. }
  377. return connection.generation !== pool.generation;
  378. }
  379. function shouldHandleStateChangeError(server, err) {
  380. const etv = err.topologyVersion;
  381. const stv = server.description.topologyVersion;
  382. return (0, server_description_1.compareTopologyVersion)(stv, etv) < 0;
  383. }
  384. function inActiveTransaction(session, cmd) {
  385. return session && session.inTransaction() && !(0, transactions_1.isTransactionCommand)(cmd);
  386. }
  387. /** this checks the retryWrites option passed down from the client options, it
  388. * does not check if the server supports retryable writes */
  389. function isRetryableWritesEnabled(topology) {
  390. return topology.s.options.retryWrites !== false;
  391. }
  392. //# sourceMappingURL=server.js.map