connection_pool.js 24 KB


  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ConnectionPool = exports.PoolState = void 0;
  4. const timers_1 = require("timers");
  5. const constants_1 = require("../constants");
  6. const error_1 = require("../error");
  7. const mongo_types_1 = require("../mongo_types");
  8. const timeout_1 = require("../timeout");
  9. const utils_1 = require("../utils");
  10. const connect_1 = require("./connect");
  11. const connection_1 = require("./connection");
  12. const connection_pool_events_1 = require("./connection_pool_events");
  13. const errors_1 = require("./errors");
  14. const metrics_1 = require("./metrics");
  15. /** @internal */
  16. exports.PoolState = Object.freeze({
  17. paused: 'paused',
  18. ready: 'ready',
  19. closed: 'closed'
  20. });
  21. /**
  22. * A pool of connections which dynamically resizes, and emits events related to pool activity
  23. * @internal
  24. */
  25. class ConnectionPool extends mongo_types_1.TypedEventEmitter {
  26. /**
  27. * Emitted when the connection pool is created.
  28. * @event
  29. */
  30. static { this.CONNECTION_POOL_CREATED = constants_1.CONNECTION_POOL_CREATED; }
  31. /**
  32. * Emitted once when the connection pool is closed
  33. * @event
  34. */
  35. static { this.CONNECTION_POOL_CLOSED = constants_1.CONNECTION_POOL_CLOSED; }
  36. /**
  37. * Emitted each time the connection pool is cleared and it's generation incremented
  38. * @event
  39. */
  40. static { this.CONNECTION_POOL_CLEARED = constants_1.CONNECTION_POOL_CLEARED; }
  41. /**
  42. * Emitted each time the connection pool is marked ready
  43. * @event
  44. */
  45. static { this.CONNECTION_POOL_READY = constants_1.CONNECTION_POOL_READY; }
  46. /**
  47. * Emitted when a connection is created.
  48. * @event
  49. */
  50. static { this.CONNECTION_CREATED = constants_1.CONNECTION_CREATED; }
  51. /**
  52. * Emitted when a connection becomes established, and is ready to use
  53. * @event
  54. */
  55. static { this.CONNECTION_READY = constants_1.CONNECTION_READY; }
  56. /**
  57. * Emitted when a connection is closed
  58. * @event
  59. */
  60. static { this.CONNECTION_CLOSED = constants_1.CONNECTION_CLOSED; }
  61. /**
  62. * Emitted when an attempt to check out a connection begins
  63. * @event
  64. */
  65. static { this.CONNECTION_CHECK_OUT_STARTED = constants_1.CONNECTION_CHECK_OUT_STARTED; }
  66. /**
  67. * Emitted when an attempt to check out a connection fails
  68. * @event
  69. */
  70. static { this.CONNECTION_CHECK_OUT_FAILED = constants_1.CONNECTION_CHECK_OUT_FAILED; }
  71. /**
  72. * Emitted each time a connection is successfully checked out of the connection pool
  73. * @event
  74. */
  75. static { this.CONNECTION_CHECKED_OUT = constants_1.CONNECTION_CHECKED_OUT; }
  76. /**
  77. * Emitted each time a connection is successfully checked into the connection pool
  78. * @event
  79. */
  80. static { this.CONNECTION_CHECKED_IN = constants_1.CONNECTION_CHECKED_IN; }
  81. constructor(server, options) {
  82. super();
  83. this.on('error', utils_1.noop);
  84. this.options = Object.freeze({
  85. connectionType: connection_1.Connection,
  86. ...options,
  87. maxPoolSize: options.maxPoolSize ?? 100,
  88. minPoolSize: options.minPoolSize ?? 0,
  89. maxConnecting: options.maxConnecting ?? 2,
  90. maxIdleTimeMS: options.maxIdleTimeMS ?? 0,
  91. waitQueueTimeoutMS: options.waitQueueTimeoutMS ?? 0,
  92. minPoolSizeCheckFrequencyMS: options.minPoolSizeCheckFrequencyMS ?? 100,
  93. autoEncrypter: options.autoEncrypter
  94. });
  95. if (this.options.minPoolSize > this.options.maxPoolSize) {
  96. throw new error_1.MongoInvalidArgumentError('Connection pool minimum size must not be greater than maximum pool size');
  97. }
  98. this.poolState = exports.PoolState.paused;
  99. this.server = server;
  100. this.connections = new utils_1.List();
  101. this.pending = 0;
  102. this.checkedOut = new Set();
  103. this.minPoolSizeTimer = undefined;
  104. this.generation = 0;
  105. this.serviceGenerations = new Map();
  106. this.connectionCounter = (0, utils_1.makeCounter)(1);
  107. this.cancellationToken = new mongo_types_1.CancellationToken();
  108. this.cancellationToken.setMaxListeners(Infinity);
  109. this.waitQueue = new utils_1.List();
  110. this.metrics = new metrics_1.ConnectionPoolMetrics();
  111. this.processingWaitQueue = false;
  112. this.mongoLogger = this.server.topology.client?.mongoLogger;
  113. this.component = 'connection';
  114. process.nextTick(() => {
  115. this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new connection_pool_events_1.ConnectionPoolCreatedEvent(this));
  116. });
  117. }
  118. /** The address of the endpoint the pool is connected to */
  119. get address() {
  120. return this.options.hostAddress.toString();
  121. }
  122. /**
  123. * Check if the pool has been closed
  124. *
  125. * TODO(NODE-3263): We can remove this property once shell no longer needs it
  126. */
  127. get closed() {
  128. return this.poolState === exports.PoolState.closed;
  129. }
  130. /** An integer expressing how many total connections (available + pending + in use) the pool currently has */
  131. get totalConnectionCount() {
  132. return (this.availableConnectionCount + this.pendingConnectionCount + this.currentCheckedOutCount);
  133. }
  134. /** An integer expressing how many connections are currently available in the pool. */
  135. get availableConnectionCount() {
  136. return this.connections.length;
  137. }
  138. get pendingConnectionCount() {
  139. return this.pending;
  140. }
  141. get currentCheckedOutCount() {
  142. return this.checkedOut.size;
  143. }
  144. get waitQueueSize() {
  145. return this.waitQueue.length;
  146. }
  147. get loadBalanced() {
  148. return this.options.loadBalanced;
  149. }
  150. get serverError() {
  151. return this.server.description.error;
  152. }
  153. /**
  154. * This is exposed ONLY for use in mongosh, to enable
  155. * killing all connections if a user quits the shell with
  156. * operations in progress.
  157. *
  158. * This property may be removed as a part of NODE-3263.
  159. */
  160. get checkedOutConnections() {
  161. return this.checkedOut;
  162. }
  163. /**
  164. * Get the metrics information for the pool when a wait queue timeout occurs.
  165. */
  166. waitQueueErrorMetrics() {
  167. return this.metrics.info(this.options.maxPoolSize);
  168. }
  169. /**
  170. * Set the pool state to "ready"
  171. */
  172. ready() {
  173. if (this.poolState !== exports.PoolState.paused) {
  174. return;
  175. }
  176. this.poolState = exports.PoolState.ready;
  177. this.emitAndLog(ConnectionPool.CONNECTION_POOL_READY, new connection_pool_events_1.ConnectionPoolReadyEvent(this));
  178. (0, timers_1.clearTimeout)(this.minPoolSizeTimer);
  179. this.ensureMinPoolSize();
  180. }
  181. /**
  182. * Check a connection out of this pool. The connection will continue to be tracked, but no reference to it
  183. * will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
  184. * explicitly destroyed by the new owner.
  185. */
  186. async checkOut(options) {
  187. const checkoutTime = (0, utils_1.now)();
  188. this.emitAndLog(ConnectionPool.CONNECTION_CHECK_OUT_STARTED, new connection_pool_events_1.ConnectionCheckOutStartedEvent(this));
  189. const { promise, resolve, reject } = (0, utils_1.promiseWithResolvers)();
  190. const timeout = options.timeoutContext.connectionCheckoutTimeout;
  191. const waitQueueMember = {
  192. resolve,
  193. reject,
  194. cancelled: false,
  195. checkoutTime
  196. };
  197. const abortListener = (0, utils_1.addAbortListener)(options.signal, function () {
  198. waitQueueMember.cancelled = true;
  199. reject(this.reason);
  200. });
  201. this.waitQueue.push(waitQueueMember);
  202. process.nextTick(() => this.processWaitQueue());
  203. try {
  204. timeout?.throwIfExpired();
  205. return await (timeout ? Promise.race([promise, timeout]) : promise);
  206. }
  207. catch (error) {
  208. if (timeout_1.TimeoutError.is(error)) {
  209. timeout?.clear();
  210. waitQueueMember.cancelled = true;
  211. this.emitAndLog(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, 'timeout', waitQueueMember.checkoutTime));
  212. const timeoutError = new errors_1.WaitQueueTimeoutError(this.loadBalanced
  213. ? this.waitQueueErrorMetrics()
  214. : 'Timed out while checking out a connection from connection pool', this.address);
  215. if (options.timeoutContext.csotEnabled()) {
  216. throw new error_1.MongoOperationTimeoutError('Timed out during connection checkout', {
  217. cause: timeoutError
  218. });
  219. }
  220. throw timeoutError;
  221. }
  222. throw error;
  223. }
  224. finally {
  225. abortListener?.[utils_1.kDispose]();
  226. timeout?.clear();
  227. }
  228. }
  229. /**
  230. * Check a connection into the pool.
  231. *
  232. * @param connection - The connection to check in
  233. */
  234. checkIn(connection) {
  235. if (!this.checkedOut.has(connection)) {
  236. return;
  237. }
  238. const poolClosed = this.closed;
  239. const stale = this.connectionIsStale(connection);
  240. const willDestroy = !!(poolClosed || stale || connection.closed);
  241. if (!willDestroy) {
  242. connection.markAvailable();
  243. this.connections.unshift(connection);
  244. }
  245. this.checkedOut.delete(connection);
  246. this.emitAndLog(ConnectionPool.CONNECTION_CHECKED_IN, new connection_pool_events_1.ConnectionCheckedInEvent(this, connection));
  247. if (willDestroy) {
  248. const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
  249. this.destroyConnection(connection, reason);
  250. }
  251. process.nextTick(() => this.processWaitQueue());
  252. }
  253. /**
  254. * Clear the pool
  255. *
  256. * Pool reset is handled by incrementing the pool's generation count. Any existing connection of a
  257. * previous generation will eventually be pruned during subsequent checkouts.
  258. */
  259. clear(options = {}) {
  260. if (this.closed) {
  261. return;
  262. }
  263. // handle load balanced case
  264. if (this.loadBalanced) {
  265. const { serviceId } = options;
  266. if (!serviceId) {
  267. throw new error_1.MongoRuntimeError('ConnectionPool.clear() called in load balanced mode with no serviceId.');
  268. }
  269. const sid = serviceId.toHexString();
  270. const generation = this.serviceGenerations.get(sid);
  271. // Only need to worry if the generation exists, since it should
  272. // always be there but typescript needs the check.
  273. if (generation == null) {
  274. throw new error_1.MongoRuntimeError('Service generations are required in load balancer mode.');
  275. }
  276. else {
  277. // Increment the generation for the service id.
  278. this.serviceGenerations.set(sid, generation + 1);
  279. }
  280. this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLEARED, new connection_pool_events_1.ConnectionPoolClearedEvent(this, { serviceId }));
  281. return;
  282. }
  283. // handle non load-balanced case
  284. const interruptInUseConnections = options.interruptInUseConnections ?? false;
  285. const oldGeneration = this.generation;
  286. this.generation += 1;
  287. const alreadyPaused = this.poolState === exports.PoolState.paused;
  288. this.poolState = exports.PoolState.paused;
  289. this.clearMinPoolSizeTimer();
  290. if (!alreadyPaused) {
  291. this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLEARED, new connection_pool_events_1.ConnectionPoolClearedEvent(this, {
  292. interruptInUseConnections
  293. }));
  294. }
  295. if (interruptInUseConnections) {
  296. process.nextTick(() => this.interruptInUseConnections(oldGeneration));
  297. }
  298. this.processWaitQueue();
  299. }
  300. /**
  301. * Closes all stale in-use connections in the pool with a resumable PoolClearedOnNetworkError.
  302. *
  303. * Only connections where `connection.generation <= minGeneration` are killed.
  304. */
  305. interruptInUseConnections(minGeneration) {
  306. for (const connection of this.checkedOut) {
  307. if (connection.generation <= minGeneration) {
  308. connection.onError(new errors_1.PoolClearedOnNetworkError(this));
  309. }
  310. }
  311. }
  312. /** For MongoClient.close() procedures */
  313. closeCheckedOutConnections() {
  314. for (const conn of this.checkedOut) {
  315. conn.onError(new error_1.MongoClientClosedError());
  316. }
  317. }
  318. /** Close the pool */
  319. close() {
  320. if (this.closed) {
  321. return;
  322. }
  323. // immediately cancel any in-flight connections
  324. this.cancellationToken.emit('cancel');
  325. // end the connection counter
  326. if (typeof this.connectionCounter.return === 'function') {
  327. this.connectionCounter.return(undefined);
  328. }
  329. this.poolState = exports.PoolState.closed;
  330. this.clearMinPoolSizeTimer();
  331. this.processWaitQueue();
  332. for (const conn of this.connections) {
  333. this.emitAndLog(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(this, conn, 'poolClosed'));
  334. conn.destroy();
  335. }
  336. this.connections.clear();
  337. this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new connection_pool_events_1.ConnectionPoolClosedEvent(this));
  338. }
  339. /**
  340. * @internal
  341. * Reauthenticate a connection
  342. */
  343. async reauthenticate(connection) {
  344. const authContext = connection.authContext;
  345. if (!authContext) {
  346. throw new error_1.MongoRuntimeError('No auth context found on connection.');
  347. }
  348. const credentials = authContext.credentials;
  349. if (!credentials) {
  350. throw new error_1.MongoMissingCredentialsError('Connection is missing credentials when asked to reauthenticate');
  351. }
  352. const resolvedCredentials = credentials.resolveAuthMechanism(connection.hello);
  353. const provider = this.server.topology.client.s.authProviders.getOrCreateProvider(resolvedCredentials.mechanism, resolvedCredentials.mechanismProperties);
  354. if (!provider) {
  355. throw new error_1.MongoMissingCredentialsError(`Reauthenticate failed due to no auth provider for ${credentials.mechanism}`);
  356. }
  357. await provider.reauth(authContext);
  358. return;
  359. }
  360. /** Clear the min pool size timer */
  361. clearMinPoolSizeTimer() {
  362. const minPoolSizeTimer = this.minPoolSizeTimer;
  363. if (minPoolSizeTimer) {
  364. (0, timers_1.clearTimeout)(minPoolSizeTimer);
  365. }
  366. }
  367. destroyConnection(connection, reason) {
  368. this.emitAndLog(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(this, connection, reason));
  369. // destroy the connection
  370. connection.destroy();
  371. }
  372. connectionIsStale(connection) {
  373. const serviceId = connection.serviceId;
  374. if (this.loadBalanced && serviceId) {
  375. const sid = serviceId.toHexString();
  376. const generation = this.serviceGenerations.get(sid);
  377. return connection.generation !== generation;
  378. }
  379. return connection.generation !== this.generation;
  380. }
  381. connectionIsIdle(connection) {
  382. return !!(this.options.maxIdleTimeMS && connection.idleTime > this.options.maxIdleTimeMS);
  383. }
  384. /**
  385. * Destroys a connection if the connection is perished.
  386. *
  387. * @returns `true` if the connection was destroyed, `false` otherwise.
  388. */
  389. destroyConnectionIfPerished(connection) {
  390. const isStale = this.connectionIsStale(connection);
  391. const isIdle = this.connectionIsIdle(connection);
  392. if (!isStale && !isIdle && !connection.closed) {
  393. return false;
  394. }
  395. const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle';
  396. this.destroyConnection(connection, reason);
  397. return true;
  398. }
  399. createConnection(callback) {
  400. // Note that metadata may have changed on the client but have
  401. // been frozen here, so we pull the metadata promise always from the client
  402. // no matter what options were set at the construction of the pool.
  403. const connectOptions = {
  404. ...this.options,
  405. id: this.connectionCounter.next().value,
  406. generation: this.generation,
  407. cancellationToken: this.cancellationToken,
  408. mongoLogger: this.mongoLogger,
  409. authProviders: this.server.topology.client.s.authProviders,
  410. metadata: this.server.topology.client.options.metadata
  411. };
  412. this.pending++;
  413. // This is our version of a "virtual" no-I/O connection as the spec requires
  414. const connectionCreatedTime = (0, utils_1.now)();
  415. this.emitAndLog(ConnectionPool.CONNECTION_CREATED, new connection_pool_events_1.ConnectionCreatedEvent(this, { id: connectOptions.id }));
  416. (0, connect_1.connect)(connectOptions).then(connection => {
  417. // The pool might have closed since we started trying to create a connection
  418. if (this.poolState !== exports.PoolState.ready) {
  419. this.pending--;
  420. connection.destroy();
  421. callback(this.closed ? new errors_1.PoolClosedError(this) : new errors_1.PoolClearedError(this));
  422. return;
  423. }
  424. // forward all events from the connection to the pool
  425. for (const event of [...constants_1.APM_EVENTS, connection_1.Connection.CLUSTER_TIME_RECEIVED]) {
  426. connection.on(event, (e) => this.emit(event, e));
  427. }
  428. if (this.loadBalanced) {
  429. connection.on(connection_1.Connection.PINNED, pinType => this.metrics.markPinned(pinType));
  430. connection.on(connection_1.Connection.UNPINNED, pinType => this.metrics.markUnpinned(pinType));
  431. const serviceId = connection.serviceId;
  432. if (serviceId) {
  433. let generation;
  434. const sid = serviceId.toHexString();
  435. if ((generation = this.serviceGenerations.get(sid))) {
  436. connection.generation = generation;
  437. }
  438. else {
  439. this.serviceGenerations.set(sid, 0);
  440. connection.generation = 0;
  441. }
  442. }
  443. }
  444. connection.markAvailable();
  445. this.emitAndLog(ConnectionPool.CONNECTION_READY, new connection_pool_events_1.ConnectionReadyEvent(this, connection, connectionCreatedTime));
  446. this.pending--;
  447. callback(undefined, connection);
  448. }, error => {
  449. this.pending--;
  450. this.server.handleError(error);
  451. this.emitAndLog(ConnectionPool.CONNECTION_CLOSED, new connection_pool_events_1.ConnectionClosedEvent(this, { id: connectOptions.id, serviceId: undefined }, 'error',
  452. // TODO(NODE-5192): Remove this cast
  453. error));
  454. if (error instanceof error_1.MongoNetworkError || error instanceof error_1.MongoServerError) {
  455. error.connectionGeneration = connectOptions.generation;
  456. }
  457. callback(error ?? new error_1.MongoRuntimeError('Connection creation failed without error'));
  458. });
  459. }
  460. ensureMinPoolSize() {
  461. const minPoolSize = this.options.minPoolSize;
  462. if (this.poolState !== exports.PoolState.ready) {
  463. return;
  464. }
  465. this.connections.prune(connection => this.destroyConnectionIfPerished(connection));
  466. if (this.totalConnectionCount < minPoolSize &&
  467. this.pendingConnectionCount < this.options.maxConnecting) {
  468. // NOTE: ensureMinPoolSize should not try to get all the pending
  469. // connection permits because that potentially delays the availability of
  470. // the connection to a checkout request
  471. this.createConnection((err, connection) => {
  472. if (!err && connection) {
  473. this.connections.push(connection);
  474. process.nextTick(() => this.processWaitQueue());
  475. }
  476. if (this.poolState === exports.PoolState.ready) {
  477. (0, timers_1.clearTimeout)(this.minPoolSizeTimer);
  478. this.minPoolSizeTimer = (0, timers_1.setTimeout)(() => this.ensureMinPoolSize(), this.options.minPoolSizeCheckFrequencyMS);
  479. }
  480. });
  481. }
  482. else {
  483. (0, timers_1.clearTimeout)(this.minPoolSizeTimer);
  484. this.minPoolSizeTimer = (0, timers_1.setTimeout)(() => this.ensureMinPoolSize(), this.options.minPoolSizeCheckFrequencyMS);
  485. }
  486. }
  487. processWaitQueue() {
  488. if (this.processingWaitQueue) {
  489. return;
  490. }
  491. this.processingWaitQueue = true;
  492. while (this.waitQueueSize) {
  493. const waitQueueMember = this.waitQueue.first();
  494. if (!waitQueueMember) {
  495. this.waitQueue.shift();
  496. continue;
  497. }
  498. if (waitQueueMember.cancelled) {
  499. this.waitQueue.shift();
  500. continue;
  501. }
  502. if (this.poolState !== exports.PoolState.ready) {
  503. const reason = this.closed ? 'poolClosed' : 'connectionError';
  504. const error = this.closed ? new errors_1.PoolClosedError(this) : new errors_1.PoolClearedError(this);
  505. this.emitAndLog(ConnectionPool.CONNECTION_CHECK_OUT_FAILED, new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, reason, waitQueueMember.checkoutTime, error));
  506. this.waitQueue.shift();
  507. waitQueueMember.reject(error);
  508. continue;
  509. }
  510. if (!this.availableConnectionCount) {
  511. break;
  512. }
  513. const connection = this.connections.shift();
  514. if (!connection) {
  515. break;
  516. }
  517. if (!this.destroyConnectionIfPerished(connection)) {
  518. this.checkedOut.add(connection);
  519. this.emitAndLog(ConnectionPool.CONNECTION_CHECKED_OUT, new connection_pool_events_1.ConnectionCheckedOutEvent(this, connection, waitQueueMember.checkoutTime));
  520. this.waitQueue.shift();
  521. waitQueueMember.resolve(connection);
  522. }
  523. }
  524. const { maxPoolSize, maxConnecting } = this.options;
  525. while (this.waitQueueSize > 0 &&
  526. this.pendingConnectionCount < maxConnecting &&
  527. (maxPoolSize === 0 || this.totalConnectionCount < maxPoolSize)) {
  528. const waitQueueMember = this.waitQueue.shift();
  529. if (!waitQueueMember || waitQueueMember.cancelled) {
  530. continue;
  531. }
  532. this.createConnection((err, connection) => {
  533. if (waitQueueMember.cancelled) {
  534. if (!err && connection) {
  535. this.connections.push(connection);
  536. }
  537. }
  538. else {
  539. if (err) {
  540. this.emitAndLog(ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
  541. // TODO(NODE-5192): Remove this cast
  542. new connection_pool_events_1.ConnectionCheckOutFailedEvent(this, 'connectionError', waitQueueMember.checkoutTime, err));
  543. waitQueueMember.reject(err);
  544. }
  545. else if (connection) {
  546. this.checkedOut.add(connection);
  547. this.emitAndLog(ConnectionPool.CONNECTION_CHECKED_OUT, new connection_pool_events_1.ConnectionCheckedOutEvent(this, connection, waitQueueMember.checkoutTime));
  548. waitQueueMember.resolve(connection);
  549. }
  550. }
  551. process.nextTick(() => this.processWaitQueue());
  552. });
  553. }
  554. this.processingWaitQueue = false;
  555. }
  556. }
  557. exports.ConnectionPool = ConnectionPool;
  558. //# sourceMappingURL=connection_pool.js.map