topology.js 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.Topology = void 0;
  4. const connection_string_1 = require("../connection_string");
  5. const constants_1 = require("../constants");
  6. const error_1 = require("../error");
  7. const mongo_logger_1 = require("../mongo_logger");
  8. const mongo_types_1 = require("../mongo_types");
  9. const read_preference_1 = require("../read_preference");
  10. const timeout_1 = require("../timeout");
  11. const utils_1 = require("../utils");
  12. const common_1 = require("./common");
  13. const events_1 = require("./events");
  14. const server_1 = require("./server");
  15. const server_description_1 = require("./server_description");
  16. const server_selection_1 = require("./server_selection");
  17. const server_selection_events_1 = require("./server_selection_events");
  18. const srv_polling_1 = require("./srv_polling");
  19. const topology_description_1 = require("./topology_description");
  20. // Global state
  21. let globalTopologyCounter = 0;
  22. const stateTransition = (0, utils_1.makeStateMachine)({
  23. [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, common_1.STATE_CONNECTING],
  24. [common_1.STATE_CONNECTING]: [common_1.STATE_CONNECTING, common_1.STATE_CLOSING, common_1.STATE_CONNECTED, common_1.STATE_CLOSED],
  25. [common_1.STATE_CONNECTED]: [common_1.STATE_CONNECTED, common_1.STATE_CLOSING, common_1.STATE_CLOSED],
  26. [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, common_1.STATE_CLOSED]
  27. });
  28. /**
  29. * A container of server instances representing a connection to a MongoDB topology.
  30. * @internal
  31. */
  32. class Topology extends mongo_types_1.TypedEventEmitter {
  33. /** @event */
  34. static { this.SERVER_OPENING = constants_1.SERVER_OPENING; }
  35. /** @event */
  36. static { this.SERVER_CLOSED = constants_1.SERVER_CLOSED; }
  37. /** @event */
  38. static { this.SERVER_DESCRIPTION_CHANGED = constants_1.SERVER_DESCRIPTION_CHANGED; }
  39. /** @event */
  40. static { this.TOPOLOGY_OPENING = constants_1.TOPOLOGY_OPENING; }
  41. /** @event */
  42. static { this.TOPOLOGY_CLOSED = constants_1.TOPOLOGY_CLOSED; }
  43. /** @event */
  44. static { this.TOPOLOGY_DESCRIPTION_CHANGED = constants_1.TOPOLOGY_DESCRIPTION_CHANGED; }
  45. /** @event */
  46. static { this.ERROR = constants_1.ERROR; }
  47. /** @event */
  48. static { this.OPEN = constants_1.OPEN; }
  49. /** @event */
  50. static { this.CONNECT = constants_1.CONNECT; }
  51. /** @event */
  52. static { this.CLOSE = constants_1.CLOSE; }
  53. /** @event */
  54. static { this.TIMEOUT = constants_1.TIMEOUT; }
  55. /**
  56. * @param seedlist - a list of HostAddress instances to connect to
  57. */
  58. constructor(client, seeds, options) {
  59. super();
  60. this.on('error', utils_1.noop);
  61. this.client = client;
  62. // Options should only be undefined in tests, MongoClient will always have defined options
  63. options = options ?? {
  64. hosts: [utils_1.HostAddress.fromString('localhost:27017')],
  65. ...Object.fromEntries(connection_string_1.DEFAULT_OPTIONS.entries())
  66. };
  67. if (typeof seeds === 'string') {
  68. seeds = [utils_1.HostAddress.fromString(seeds)];
  69. }
  70. else if (!Array.isArray(seeds)) {
  71. seeds = [seeds];
  72. }
  73. const seedlist = [];
  74. for (const seed of seeds) {
  75. if (typeof seed === 'string') {
  76. seedlist.push(utils_1.HostAddress.fromString(seed));
  77. }
  78. else if (seed instanceof utils_1.HostAddress) {
  79. seedlist.push(seed);
  80. }
  81. else {
  82. // FIXME(NODE-3483): May need to be a MongoParseError
  83. throw new error_1.MongoRuntimeError(`Topology cannot be constructed from ${JSON.stringify(seed)}`);
  84. }
  85. }
  86. const topologyType = topologyTypeFromOptions(options);
  87. const topologyId = globalTopologyCounter++;
  88. const selectedHosts = options.srvMaxHosts == null ||
  89. options.srvMaxHosts === 0 ||
  90. options.srvMaxHosts >= seedlist.length
  91. ? seedlist
  92. : (0, utils_1.shuffle)(seedlist, options.srvMaxHosts);
  93. const serverDescriptions = new Map();
  94. for (const hostAddress of selectedHosts) {
  95. serverDescriptions.set(hostAddress.toString(), new server_description_1.ServerDescription(hostAddress));
  96. }
  97. this.waitQueue = new utils_1.List();
  98. this.s = {
  99. // the id of this topology
  100. id: topologyId,
  101. // passed in options
  102. options,
  103. // initial seedlist of servers to connect to
  104. seedlist,
  105. // initial state
  106. state: common_1.STATE_CLOSED,
  107. // the topology description
  108. description: new topology_description_1.TopologyDescription(topologyType, serverDescriptions, options.replicaSet, undefined, undefined, undefined, options),
  109. serverSelectionTimeoutMS: options.serverSelectionTimeoutMS,
  110. heartbeatFrequencyMS: options.heartbeatFrequencyMS,
  111. minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS,
  112. // a map of server instances to normalized addresses
  113. servers: new Map(),
  114. credentials: options?.credentials,
  115. clusterTime: undefined,
  116. detectShardedTopology: ev => this.detectShardedTopology(ev),
  117. detectSrvRecords: ev => this.detectSrvRecords(ev)
  118. };
  119. this.mongoLogger = client.mongoLogger;
  120. this.component = 'topology';
  121. if (options.srvHost && !options.loadBalanced) {
  122. this.s.srvPoller =
  123. options.srvPoller ??
  124. new srv_polling_1.SrvPoller({
  125. heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
  126. srvHost: options.srvHost,
  127. srvMaxHosts: options.srvMaxHosts,
  128. srvServiceName: options.srvServiceName
  129. });
  130. this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
  131. }
  132. this.connectionLock = undefined;
  133. }
  134. detectShardedTopology(event) {
  135. const previousType = event.previousDescription.type;
  136. const newType = event.newDescription.type;
  137. const transitionToSharded = previousType !== common_1.TopologyType.Sharded && newType === common_1.TopologyType.Sharded;
  138. const srvListeners = this.s.srvPoller?.listeners(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY);
  139. const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords);
  140. if (transitionToSharded && !listeningToSrvPolling) {
  141. this.s.srvPoller?.on(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
  142. this.s.srvPoller?.start();
  143. }
  144. }
  145. detectSrvRecords(ev) {
  146. const previousTopologyDescription = this.s.description;
  147. this.s.description = this.s.description.updateFromSrvPollingEvent(ev, this.s.options.srvMaxHosts);
  148. if (this.s.description === previousTopologyDescription) {
  149. // Nothing changed, so return
  150. return;
  151. }
  152. updateServers(this);
  153. this.emitAndLog(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
  154. }
  155. /**
  156. * @returns A `TopologyDescription` for this topology
  157. */
  158. get description() {
  159. return this.s.description;
  160. }
  161. get loadBalanced() {
  162. return this.s.options.loadBalanced;
  163. }
  164. get serverApi() {
  165. return this.s.options.serverApi;
  166. }
  167. /** Initiate server connect */
  168. async connect(options) {
  169. this.connectionLock ??= this._connect(options);
  170. try {
  171. await this.connectionLock;
  172. return this;
  173. }
  174. finally {
  175. this.connectionLock = undefined;
  176. }
  177. }
  178. async _connect(options) {
  179. options = options ?? {};
  180. if (this.s.state === common_1.STATE_CONNECTED) {
  181. return this;
  182. }
  183. stateTransition(this, common_1.STATE_CONNECTING);
  184. // emit SDAM monitoring events
  185. this.emitAndLog(Topology.TOPOLOGY_OPENING, new events_1.TopologyOpeningEvent(this.s.id));
  186. // emit an event for the topology change
  187. this.emitAndLog(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, new topology_description_1.TopologyDescription(common_1.TopologyType.Unknown), // initial is always Unknown
  188. this.s.description));
  189. // connect all known servers, then attempt server selection to connect
  190. const serverDescriptions = Array.from(this.s.description.servers.values());
  191. this.s.servers = new Map(serverDescriptions.map(serverDescription => [
  192. serverDescription.address,
  193. createAndConnectServer(this, serverDescription)
  194. ]));
  195. // In load balancer mode we need to fake a server description getting
  196. // emitted from the monitor, since the monitor doesn't exist.
  197. if (this.s.options.loadBalanced) {
  198. for (const description of serverDescriptions) {
  199. const newDescription = new server_description_1.ServerDescription(description.hostAddress, undefined, {
  200. loadBalanced: this.s.options.loadBalanced
  201. });
  202. this.serverUpdateHandler(newDescription);
  203. }
  204. }
  205. const serverSelectionTimeoutMS = this.client.s.options.serverSelectionTimeoutMS;
  206. const readPreference = options.readPreference ?? read_preference_1.ReadPreference.primary;
  207. const timeoutContext = timeout_1.TimeoutContext.create({
  208. // TODO(NODE-6448): auto-connect ignores timeoutMS; potential future feature
  209. timeoutMS: undefined,
  210. serverSelectionTimeoutMS,
  211. waitQueueTimeoutMS: this.client.s.options.waitQueueTimeoutMS
  212. });
  213. const selectServerOptions = {
  214. operationName: 'handshake',
  215. ...options,
  216. timeoutContext
  217. };
  218. try {
  219. const server = await this.selectServer((0, server_selection_1.readPreferenceServerSelector)(readPreference), selectServerOptions);
  220. const skipPingOnConnect = this.s.options.__skipPingOnConnect === true;
  221. if (!skipPingOnConnect) {
  222. const connection = await server.pool.checkOut({ timeoutContext: timeoutContext });
  223. server.pool.checkIn(connection);
  224. stateTransition(this, common_1.STATE_CONNECTED);
  225. this.emit(Topology.OPEN, this);
  226. this.emit(Topology.CONNECT, this);
  227. return this;
  228. }
  229. stateTransition(this, common_1.STATE_CONNECTED);
  230. this.emit(Topology.OPEN, this);
  231. this.emit(Topology.CONNECT, this);
  232. return this;
  233. }
  234. catch (error) {
  235. this.close();
  236. throw error;
  237. }
  238. }
  239. closeCheckedOutConnections() {
  240. for (const server of this.s.servers.values()) {
  241. return server.closeCheckedOutConnections();
  242. }
  243. }
  244. /** Close this topology */
  245. close() {
  246. if (this.s.state === common_1.STATE_CLOSED || this.s.state === common_1.STATE_CLOSING) {
  247. return;
  248. }
  249. for (const server of this.s.servers.values()) {
  250. closeServer(server, this);
  251. }
  252. this.s.servers.clear();
  253. stateTransition(this, common_1.STATE_CLOSING);
  254. drainWaitQueue(this.waitQueue, new error_1.MongoTopologyClosedError());
  255. if (this.s.srvPoller) {
  256. this.s.srvPoller.stop();
  257. this.s.srvPoller.removeListener(srv_polling_1.SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
  258. }
  259. this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
  260. stateTransition(this, common_1.STATE_CLOSED);
  261. // emit an event for close
  262. this.emitAndLog(Topology.TOPOLOGY_CLOSED, new events_1.TopologyClosedEvent(this.s.id));
  263. }
  264. /**
  265. * Selects a server according to the selection predicate provided
  266. *
  267. * @param selector - An optional selector to select servers by, defaults to a random selection within a latency window
  268. * @param options - Optional settings related to server selection
  269. * @param callback - The callback used to indicate success or failure
  270. * @returns An instance of a `Server` meeting the criteria of the predicate provided
  271. */
  272. async selectServer(selector, options) {
  273. let serverSelector;
  274. if (typeof selector !== 'function') {
  275. if (typeof selector === 'string') {
  276. serverSelector = (0, server_selection_1.readPreferenceServerSelector)(read_preference_1.ReadPreference.fromString(selector));
  277. }
  278. else {
  279. let readPreference;
  280. if (selector instanceof read_preference_1.ReadPreference) {
  281. readPreference = selector;
  282. }
  283. else {
  284. read_preference_1.ReadPreference.translate(options);
  285. readPreference = options.readPreference || read_preference_1.ReadPreference.primary;
  286. }
  287. serverSelector = (0, server_selection_1.readPreferenceServerSelector)(readPreference);
  288. }
  289. }
  290. else {
  291. serverSelector = selector;
  292. }
  293. options = { serverSelectionTimeoutMS: this.s.serverSelectionTimeoutMS, ...options };
  294. if (this.client.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, mongo_logger_1.SeverityLevel.DEBUG)) {
  295. this.client.mongoLogger?.debug(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, new server_selection_events_1.ServerSelectionStartedEvent(selector, this.description, options.operationName));
  296. }
  297. let timeout;
  298. if (options.timeoutContext)
  299. timeout = options.timeoutContext.serverSelectionTimeout;
  300. else {
  301. timeout = timeout_1.Timeout.expires(options.serverSelectionTimeoutMS ?? 0);
  302. }
  303. const isSharded = this.description.type === common_1.TopologyType.Sharded;
  304. const session = options.session;
  305. const transaction = session && session.transaction;
  306. if (isSharded && transaction && transaction.server) {
  307. if (this.client.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, mongo_logger_1.SeverityLevel.DEBUG)) {
  308. this.client.mongoLogger?.debug(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, new server_selection_events_1.ServerSelectionSucceededEvent(selector, this.description, transaction.server.pool.address, options.operationName));
  309. }
  310. if (options.timeoutContext?.clearServerSelectionTimeout)
  311. timeout?.clear();
  312. return transaction.server;
  313. }
  314. const { promise: serverPromise, resolve, reject } = (0, utils_1.promiseWithResolvers)();
  315. const waitQueueMember = {
  316. serverSelector,
  317. topologyDescription: this.description,
  318. mongoLogger: this.client.mongoLogger,
  319. transaction,
  320. resolve,
  321. reject,
  322. cancelled: false,
  323. startTime: (0, utils_1.now)(),
  324. operationName: options.operationName,
  325. waitingLogged: false,
  326. previousServer: options.previousServer
  327. };
  328. const abortListener = (0, utils_1.addAbortListener)(options.signal, function () {
  329. waitQueueMember.cancelled = true;
  330. reject(this.reason);
  331. });
  332. this.waitQueue.push(waitQueueMember);
  333. processWaitQueue(this);
  334. try {
  335. timeout?.throwIfExpired();
  336. const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
  337. if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
  338. options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
  339. }
  340. return server;
  341. }
  342. catch (error) {
  343. if (timeout_1.TimeoutError.is(error)) {
  344. // Timeout
  345. waitQueueMember.cancelled = true;
  346. const timeoutError = new error_1.MongoServerSelectionError(`Server selection timed out after ${timeout?.duration} ms`, this.description);
  347. if (this.client.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, mongo_logger_1.SeverityLevel.DEBUG)) {
  348. this.client.mongoLogger?.debug(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, new server_selection_events_1.ServerSelectionFailedEvent(selector, this.description, timeoutError, options.operationName));
  349. }
  350. if (options.timeoutContext?.csotEnabled()) {
  351. throw new error_1.MongoOperationTimeoutError('Timed out during server selection', {
  352. cause: timeoutError
  353. });
  354. }
  355. throw timeoutError;
  356. }
  357. // Other server selection error
  358. throw error;
  359. }
  360. finally {
  361. abortListener?.[utils_1.kDispose]();
  362. if (options.timeoutContext?.clearServerSelectionTimeout)
  363. timeout?.clear();
  364. }
  365. }
  366. /**
  367. * Update the internal TopologyDescription with a ServerDescription
  368. *
  369. * @param serverDescription - The server to update in the internal list of server descriptions
  370. */
  371. serverUpdateHandler(serverDescription) {
  372. if (!this.s.description.hasServer(serverDescription.address)) {
  373. return;
  374. }
  375. // ignore this server update if its from an outdated topologyVersion
  376. if (isStaleServerDescription(this.s.description, serverDescription)) {
  377. return;
  378. }
  379. // these will be used for monitoring events later
  380. const previousTopologyDescription = this.s.description;
  381. const previousServerDescription = this.s.description.servers.get(serverDescription.address);
  382. if (!previousServerDescription) {
  383. return;
  384. }
  385. // Driver Sessions Spec: "Whenever a driver receives a cluster time from
  386. // a server it MUST compare it to the current highest seen cluster time
  387. // for the deployment. If the new cluster time is higher than the
  388. // highest seen cluster time it MUST become the new highest seen cluster
  389. // time. Two cluster times are compared using only the BsonTimestamp
  390. // value of the clusterTime embedded field."
  391. const clusterTime = serverDescription.$clusterTime;
  392. if (clusterTime) {
  393. (0, common_1._advanceClusterTime)(this, clusterTime);
  394. }
  395. // If we already know all the information contained in this updated description, then
  396. // we don't need to emit SDAM events, but still need to update the description, in order
  397. // to keep client-tracked attributes like last update time and round trip time up to date
  398. const equalDescriptions = previousServerDescription && previousServerDescription.equals(serverDescription);
  399. // first update the TopologyDescription
  400. this.s.description = this.s.description.update(serverDescription);
  401. if (this.s.description.compatibilityError) {
  402. this.emit(Topology.ERROR, new error_1.MongoCompatibilityError(this.s.description.compatibilityError));
  403. return;
  404. }
  405. // emit monitoring events for this change
  406. if (!equalDescriptions) {
  407. const newDescription = this.s.description.servers.get(serverDescription.address);
  408. if (newDescription) {
  409. this.emit(Topology.SERVER_DESCRIPTION_CHANGED, new events_1.ServerDescriptionChangedEvent(this.s.id, serverDescription.address, previousServerDescription, newDescription));
  410. }
  411. }
  412. // update server list from updated descriptions
  413. updateServers(this, serverDescription);
  414. // attempt to resolve any outstanding server selection attempts
  415. if (this.waitQueue.length > 0) {
  416. processWaitQueue(this);
  417. }
  418. if (!equalDescriptions) {
  419. this.emitAndLog(Topology.TOPOLOGY_DESCRIPTION_CHANGED, new events_1.TopologyDescriptionChangedEvent(this.s.id, previousTopologyDescription, this.s.description));
  420. }
  421. }
  422. auth(credentials, callback) {
  423. if (typeof credentials === 'function')
  424. ((callback = credentials), (credentials = undefined));
  425. if (typeof callback === 'function')
  426. callback(undefined, true);
  427. }
  428. isConnected() {
  429. return this.s.state === common_1.STATE_CONNECTED;
  430. }
  431. isDestroyed() {
  432. return this.s.state === common_1.STATE_CLOSED;
  433. }
  434. // NOTE: There are many places in code where we explicitly check the last hello
  435. // to do feature support detection. This should be done any other way, but for
  436. // now we will just return the first hello seen, which should suffice.
  437. lastHello() {
  438. const serverDescriptions = Array.from(this.description.servers.values());
  439. if (serverDescriptions.length === 0)
  440. return {};
  441. const sd = serverDescriptions.filter((sd) => sd.type !== common_1.ServerType.Unknown)[0];
  442. const result = sd || { maxWireVersion: this.description.commonWireVersion };
  443. return result;
  444. }
  445. get commonWireVersion() {
  446. return this.description.commonWireVersion;
  447. }
  448. get logicalSessionTimeoutMinutes() {
  449. return this.description.logicalSessionTimeoutMinutes;
  450. }
  451. get clusterTime() {
  452. return this.s.clusterTime;
  453. }
  454. set clusterTime(clusterTime) {
  455. this.s.clusterTime = clusterTime;
  456. }
  457. }
  458. exports.Topology = Topology;
  459. /** Destroys a server, and removes all event listeners from the instance */
  460. function closeServer(server, topology) {
  461. for (const event of constants_1.LOCAL_SERVER_EVENTS) {
  462. server.removeAllListeners(event);
  463. }
  464. server.close();
  465. topology.emitAndLog(Topology.SERVER_CLOSED, new events_1.ServerClosedEvent(topology.s.id, server.description.address));
  466. for (const event of constants_1.SERVER_RELAY_EVENTS) {
  467. server.removeAllListeners(event);
  468. }
  469. }
  470. /** Predicts the TopologyType from options */
  471. function topologyTypeFromOptions(options) {
  472. if (options?.directConnection) {
  473. return common_1.TopologyType.Single;
  474. }
  475. if (options?.replicaSet) {
  476. return common_1.TopologyType.ReplicaSetNoPrimary;
  477. }
  478. if (options?.loadBalanced) {
  479. return common_1.TopologyType.LoadBalanced;
  480. }
  481. return common_1.TopologyType.Unknown;
  482. }
  483. /**
  484. * Creates new server instances and attempts to connect them
  485. *
  486. * @param topology - The topology that this server belongs to
  487. * @param serverDescription - The description for the server to initialize and connect to
  488. */
  489. function createAndConnectServer(topology, serverDescription) {
  490. topology.emitAndLog(Topology.SERVER_OPENING, new events_1.ServerOpeningEvent(topology.s.id, serverDescription.address));
  491. const server = new server_1.Server(topology, serverDescription, topology.s.options);
  492. for (const event of constants_1.SERVER_RELAY_EVENTS) {
  493. server.on(event, (e) => topology.emit(event, e));
  494. }
  495. server.on(server_1.Server.DESCRIPTION_RECEIVED, description => topology.serverUpdateHandler(description));
  496. server.connect();
  497. return server;
  498. }
  499. /**
  500. * @param topology - Topology to update.
  501. * @param incomingServerDescription - New server description.
  502. */
  503. function updateServers(topology, incomingServerDescription) {
  504. // update the internal server's description
  505. if (incomingServerDescription && topology.s.servers.has(incomingServerDescription.address)) {
  506. const server = topology.s.servers.get(incomingServerDescription.address);
  507. if (server) {
  508. server.s.description = incomingServerDescription;
  509. if (incomingServerDescription.error instanceof error_1.MongoError &&
  510. incomingServerDescription.error.hasErrorLabel(error_1.MongoErrorLabel.ResetPool)) {
  511. const interruptInUseConnections = incomingServerDescription.error.hasErrorLabel(error_1.MongoErrorLabel.InterruptInUseConnections);
  512. server.pool.clear({ interruptInUseConnections });
  513. }
  514. else if (incomingServerDescription.error == null) {
  515. const newTopologyType = topology.s.description.type;
  516. const shouldMarkPoolReady = incomingServerDescription.isDataBearing ||
  517. (incomingServerDescription.type !== common_1.ServerType.Unknown &&
  518. newTopologyType === common_1.TopologyType.Single);
  519. if (shouldMarkPoolReady) {
  520. server.pool.ready();
  521. }
  522. }
  523. }
  524. }
  525. // add new servers for all descriptions we currently don't know about locally
  526. for (const serverDescription of topology.description.servers.values()) {
  527. if (!topology.s.servers.has(serverDescription.address)) {
  528. const server = createAndConnectServer(topology, serverDescription);
  529. topology.s.servers.set(serverDescription.address, server);
  530. }
  531. }
  532. // for all servers no longer known, remove their descriptions and destroy their instances
  533. for (const entry of topology.s.servers) {
  534. const serverAddress = entry[0];
  535. if (topology.description.hasServer(serverAddress)) {
  536. continue;
  537. }
  538. if (!topology.s.servers.has(serverAddress)) {
  539. continue;
  540. }
  541. const server = topology.s.servers.get(serverAddress);
  542. topology.s.servers.delete(serverAddress);
  543. // prepare server for garbage collection
  544. if (server) {
  545. closeServer(server, topology);
  546. }
  547. }
  548. }
  549. function drainWaitQueue(queue, drainError) {
  550. while (queue.length) {
  551. const waitQueueMember = queue.shift();
  552. if (!waitQueueMember) {
  553. continue;
  554. }
  555. if (!waitQueueMember.cancelled) {
  556. if (waitQueueMember.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, mongo_logger_1.SeverityLevel.DEBUG)) {
  557. waitQueueMember.mongoLogger?.debug(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, new server_selection_events_1.ServerSelectionFailedEvent(waitQueueMember.serverSelector, waitQueueMember.topologyDescription, drainError, waitQueueMember.operationName));
  558. }
  559. waitQueueMember.reject(drainError);
  560. }
  561. }
  562. }
  563. function processWaitQueue(topology) {
  564. if (topology.s.state === common_1.STATE_CLOSED) {
  565. drainWaitQueue(topology.waitQueue, new error_1.MongoTopologyClosedError());
  566. return;
  567. }
  568. const isSharded = topology.description.type === common_1.TopologyType.Sharded;
  569. const serverDescriptions = Array.from(topology.description.servers.values());
  570. const membersToProcess = topology.waitQueue.length;
  571. for (let i = 0; i < membersToProcess; ++i) {
  572. const waitQueueMember = topology.waitQueue.shift();
  573. if (!waitQueueMember) {
  574. continue;
  575. }
  576. if (waitQueueMember.cancelled) {
  577. continue;
  578. }
  579. let selectedDescriptions;
  580. try {
  581. const serverSelector = waitQueueMember.serverSelector;
  582. const previousServer = waitQueueMember.previousServer;
  583. selectedDescriptions = serverSelector
  584. ? serverSelector(topology.description, serverDescriptions, previousServer ? [previousServer] : [])
  585. : serverDescriptions;
  586. }
  587. catch (selectorError) {
  588. if (topology.client.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, mongo_logger_1.SeverityLevel.DEBUG)) {
  589. topology.client.mongoLogger?.debug(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, new server_selection_events_1.ServerSelectionFailedEvent(waitQueueMember.serverSelector, topology.description, selectorError, waitQueueMember.operationName));
  590. }
  591. waitQueueMember.reject(selectorError);
  592. continue;
  593. }
  594. let selectedServer;
  595. if (selectedDescriptions.length === 0) {
  596. if (!waitQueueMember.waitingLogged) {
  597. if (topology.client.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, mongo_logger_1.SeverityLevel.INFORMATIONAL)) {
  598. topology.client.mongoLogger?.info(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, new server_selection_events_1.WaitingForSuitableServerEvent(waitQueueMember.serverSelector, topology.description, topology.s.serverSelectionTimeoutMS !== 0
  599. ? topology.s.serverSelectionTimeoutMS - ((0, utils_1.now)() - waitQueueMember.startTime)
  600. : -1, waitQueueMember.operationName));
  601. }
  602. waitQueueMember.waitingLogged = true;
  603. }
  604. topology.waitQueue.push(waitQueueMember);
  605. continue;
  606. }
  607. else if (selectedDescriptions.length === 1) {
  608. selectedServer = topology.s.servers.get(selectedDescriptions[0].address);
  609. }
  610. else {
  611. const descriptions = (0, utils_1.shuffle)(selectedDescriptions, 2);
  612. const server1 = topology.s.servers.get(descriptions[0].address);
  613. const server2 = topology.s.servers.get(descriptions[1].address);
  614. selectedServer =
  615. server1 && server2 && server1.s.operationCount < server2.s.operationCount
  616. ? server1
  617. : server2;
  618. }
  619. if (!selectedServer) {
  620. const serverSelectionError = new error_1.MongoServerSelectionError('server selection returned a server description but the server was not found in the topology', topology.description);
  621. if (topology.client.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, mongo_logger_1.SeverityLevel.DEBUG)) {
  622. topology.client.mongoLogger?.debug(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, new server_selection_events_1.ServerSelectionFailedEvent(waitQueueMember.serverSelector, topology.description, serverSelectionError, waitQueueMember.operationName));
  623. }
  624. waitQueueMember.reject(serverSelectionError);
  625. return;
  626. }
  627. const transaction = waitQueueMember.transaction;
  628. if (isSharded && transaction && transaction.isActive && selectedServer) {
  629. transaction.pinServer(selectedServer);
  630. }
  631. if (topology.client.mongoLogger?.willLog(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, mongo_logger_1.SeverityLevel.DEBUG)) {
  632. topology.client.mongoLogger?.debug(mongo_logger_1.MongoLoggableComponent.SERVER_SELECTION, new server_selection_events_1.ServerSelectionSucceededEvent(waitQueueMember.serverSelector, waitQueueMember.topologyDescription, selectedServer.pool.address, waitQueueMember.operationName));
  633. }
  634. waitQueueMember.resolve(selectedServer);
  635. }
  636. if (topology.waitQueue.length > 0) {
  637. // ensure all server monitors attempt monitoring soon
  638. for (const [, server] of topology.s.servers) {
  639. process.nextTick(function scheduleServerCheck() {
  640. return server.requestCheck();
  641. });
  642. }
  643. }
  644. }
  645. function isStaleServerDescription(topologyDescription, incomingServerDescription) {
  646. const currentServerDescription = topologyDescription.servers.get(incomingServerDescription.address);
  647. const currentTopologyVersion = currentServerDescription?.topologyVersion;
  648. return ((0, server_description_1.compareTopologyVersion)(currentTopologyVersion, incomingServerDescription.topologyVersion) > 0);
  649. }
  650. //# sourceMappingURL=topology.js.map