monitor.js 21 KB


  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.RTTSampler = exports.MonitorInterval = exports.RTTPinger = exports.Monitor = exports.ServerMonitoringMode = void 0;
  4. const timers_1 = require("timers");
  5. const bson_1 = require("../bson");
  6. const connect_1 = require("../cmap/connect");
  7. const client_metadata_1 = require("../cmap/handshake/client_metadata");
  8. const constants_1 = require("../constants");
  9. const error_1 = require("../error");
  10. const mongo_logger_1 = require("../mongo_logger");
  11. const mongo_types_1 = require("../mongo_types");
  12. const utils_1 = require("../utils");
  13. const common_1 = require("./common");
  14. const events_1 = require("./events");
  15. const server_1 = require("./server");
  16. const STATE_IDLE = 'idle';
  17. const STATE_MONITORING = 'monitoring';
  18. const stateTransition = (0, utils_1.makeStateMachine)({
  19. [common_1.STATE_CLOSING]: [common_1.STATE_CLOSING, STATE_IDLE, common_1.STATE_CLOSED],
  20. [common_1.STATE_CLOSED]: [common_1.STATE_CLOSED, STATE_MONITORING],
  21. [STATE_IDLE]: [STATE_IDLE, STATE_MONITORING, common_1.STATE_CLOSING],
  22. [STATE_MONITORING]: [STATE_MONITORING, STATE_IDLE, common_1.STATE_CLOSING]
  23. });
  24. const INVALID_REQUEST_CHECK_STATES = new Set([common_1.STATE_CLOSING, common_1.STATE_CLOSED, STATE_MONITORING]);
  25. function isInCloseState(monitor) {
  26. return monitor.s.state === common_1.STATE_CLOSED || monitor.s.state === common_1.STATE_CLOSING;
  27. }
  28. /** @public */
  29. exports.ServerMonitoringMode = Object.freeze({
  30. auto: 'auto',
  31. poll: 'poll',
  32. stream: 'stream'
  33. });
  34. /** @internal */
  35. class Monitor extends mongo_types_1.TypedEventEmitter {
  36. constructor(server, options) {
  37. super();
  38. /** @internal */
  39. this.component = mongo_logger_1.MongoLoggableComponent.TOPOLOGY;
  40. this.on('error', utils_1.noop);
  41. this.server = server;
  42. this.connection = null;
  43. this.cancellationToken = new mongo_types_1.CancellationToken();
  44. this.cancellationToken.setMaxListeners(Infinity);
  45. this.monitorId = undefined;
  46. this.s = {
  47. state: common_1.STATE_CLOSED
  48. };
  49. this.address = server.description.address;
  50. this.options = Object.freeze({
  51. connectTimeoutMS: options.connectTimeoutMS ?? 10000,
  52. heartbeatFrequencyMS: options.heartbeatFrequencyMS ?? 10000,
  53. minHeartbeatFrequencyMS: options.minHeartbeatFrequencyMS ?? 500,
  54. serverMonitoringMode: options.serverMonitoringMode
  55. });
  56. this.isRunningInFaasEnv = (0, client_metadata_1.getFAASEnv)() != null;
  57. this.mongoLogger = this.server.topology.client?.mongoLogger;
  58. this.rttSampler = new RTTSampler(10);
  59. const cancellationToken = this.cancellationToken;
  60. // TODO: refactor this to pull it directly from the pool, requires new ConnectionPool integration
  61. const connectOptions = {
  62. id: '<monitor>',
  63. generation: server.pool.generation,
  64. cancellationToken,
  65. hostAddress: server.description.hostAddress,
  66. ...options,
  67. // force BSON serialization options
  68. raw: false,
  69. useBigInt64: false,
  70. promoteLongs: true,
  71. promoteValues: true,
  72. promoteBuffers: true
  73. };
  74. // ensure no authentication is used for monitoring
  75. delete connectOptions.credentials;
  76. if (connectOptions.autoEncrypter) {
  77. delete connectOptions.autoEncrypter;
  78. }
  79. this.connectOptions = Object.freeze(connectOptions);
  80. }
  81. connect() {
  82. if (this.s.state !== common_1.STATE_CLOSED) {
  83. return;
  84. }
  85. // start
  86. const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
  87. const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
  88. this.monitorId = new MonitorInterval(monitorServer(this), {
  89. heartbeatFrequencyMS: heartbeatFrequencyMS,
  90. minHeartbeatFrequencyMS: minHeartbeatFrequencyMS,
  91. immediate: true
  92. });
  93. }
  94. requestCheck() {
  95. if (INVALID_REQUEST_CHECK_STATES.has(this.s.state)) {
  96. return;
  97. }
  98. this.monitorId?.wake();
  99. }
  100. reset() {
  101. const topologyVersion = this.server.description.topologyVersion;
  102. if (isInCloseState(this) || topologyVersion == null) {
  103. return;
  104. }
  105. stateTransition(this, common_1.STATE_CLOSING);
  106. resetMonitorState(this);
  107. // restart monitor
  108. stateTransition(this, STATE_IDLE);
  109. // restart monitoring
  110. const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
  111. const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
  112. this.monitorId = new MonitorInterval(monitorServer(this), {
  113. heartbeatFrequencyMS: heartbeatFrequencyMS,
  114. minHeartbeatFrequencyMS: minHeartbeatFrequencyMS
  115. });
  116. }
  117. close() {
  118. if (isInCloseState(this)) {
  119. return;
  120. }
  121. stateTransition(this, common_1.STATE_CLOSING);
  122. resetMonitorState(this);
  123. // close monitor
  124. this.emit('close');
  125. stateTransition(this, common_1.STATE_CLOSED);
  126. }
  127. get roundTripTime() {
  128. return this.rttSampler.average();
  129. }
  130. get minRoundTripTime() {
  131. return this.rttSampler.min();
  132. }
  133. get latestRtt() {
  134. return this.rttSampler.last;
  135. }
  136. addRttSample(rtt) {
  137. this.rttSampler.addSample(rtt);
  138. }
  139. clearRttSamples() {
  140. this.rttSampler.clear();
  141. }
  142. }
  143. exports.Monitor = Monitor;
  144. function resetMonitorState(monitor) {
  145. monitor.monitorId?.stop();
  146. monitor.monitorId = undefined;
  147. monitor.rttPinger?.close();
  148. monitor.rttPinger = undefined;
  149. monitor.cancellationToken.emit('cancel');
  150. monitor.connection?.destroy();
  151. monitor.connection = null;
  152. monitor.clearRttSamples();
  153. }
  154. function useStreamingProtocol(monitor, topologyVersion) {
  155. // If we have no topology version we always poll no matter
  156. // what the user provided, since the server does not support
  157. // the streaming protocol.
  158. if (topologyVersion == null)
  159. return false;
  160. const serverMonitoringMode = monitor.options.serverMonitoringMode;
  161. if (serverMonitoringMode === exports.ServerMonitoringMode.poll)
  162. return false;
  163. if (serverMonitoringMode === exports.ServerMonitoringMode.stream)
  164. return true;
  165. // If we are in auto mode, we need to figure out if we're in a FaaS
  166. // environment or not and choose the appropriate mode.
  167. if (monitor.isRunningInFaasEnv)
  168. return false;
  169. return true;
  170. }
  171. function checkServer(monitor, callback) {
  172. let start;
  173. let awaited;
  174. const topologyVersion = monitor.server.description.topologyVersion;
  175. const isAwaitable = useStreamingProtocol(monitor, topologyVersion);
  176. monitor.emitAndLogHeartbeat(server_1.Server.SERVER_HEARTBEAT_STARTED, monitor.server.topology.s.id, undefined, new events_1.ServerHeartbeatStartedEvent(monitor.address, isAwaitable));
  177. function onHeartbeatFailed(err) {
  178. monitor.connection?.destroy();
  179. monitor.connection = null;
  180. monitor.emitAndLogHeartbeat(server_1.Server.SERVER_HEARTBEAT_FAILED, monitor.server.topology.s.id, undefined, new events_1.ServerHeartbeatFailedEvent(monitor.address, (0, utils_1.calculateDurationInMs)(start), err, awaited));
  181. const error = !(err instanceof error_1.MongoError)
  182. ? new error_1.MongoError(error_1.MongoError.buildErrorMessage(err), { cause: err })
  183. : err;
  184. error.addErrorLabel(error_1.MongoErrorLabel.ResetPool);
  185. if (error instanceof error_1.MongoNetworkTimeoutError) {
  186. error.addErrorLabel(error_1.MongoErrorLabel.InterruptInUseConnections);
  187. }
  188. monitor.emit('resetServer', error);
  189. callback(err);
  190. }
  191. function onHeartbeatSucceeded(hello) {
  192. if (!('isWritablePrimary' in hello)) {
  193. // Provide hello-style response document.
  194. hello.isWritablePrimary = hello[constants_1.LEGACY_HELLO_COMMAND];
  195. }
  196. // NOTE: here we use the latestRtt as this measurement corresponds with the value
  197. // obtained for this successful heartbeat, if there is no latestRtt, then we calculate the
  198. // duration
  199. const duration = isAwaitable && monitor.rttPinger
  200. ? (monitor.rttPinger.latestRtt ?? (0, utils_1.calculateDurationInMs)(start))
  201. : (0, utils_1.calculateDurationInMs)(start);
  202. monitor.addRttSample(duration);
  203. monitor.emitAndLogHeartbeat(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, monitor.server.topology.s.id, hello.connectionId, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, hello, isAwaitable));
  204. if (isAwaitable) {
  205. // If we are using the streaming protocol then we immediately issue another 'started'
  206. // event, otherwise the "check" is complete and return to the main monitor loop
  207. monitor.emitAndLogHeartbeat(server_1.Server.SERVER_HEARTBEAT_STARTED, monitor.server.topology.s.id, undefined, new events_1.ServerHeartbeatStartedEvent(monitor.address, true));
  208. // We have not actually sent an outgoing handshake, but when we get the next response we
  209. // want the duration to reflect the time since we last heard from the server
  210. start = (0, utils_1.now)();
  211. }
  212. else {
  213. monitor.rttPinger?.close();
  214. monitor.rttPinger = undefined;
  215. callback(undefined, hello);
  216. }
  217. }
  218. const { connection } = monitor;
  219. if (connection && !connection.closed) {
  220. const { serverApi, helloOk } = connection;
  221. const connectTimeoutMS = monitor.options.connectTimeoutMS;
  222. const maxAwaitTimeMS = monitor.options.heartbeatFrequencyMS;
  223. const cmd = {
  224. [serverApi?.version || helloOk ? 'hello' : constants_1.LEGACY_HELLO_COMMAND]: 1,
  225. ...(isAwaitable && topologyVersion
  226. ? { maxAwaitTimeMS, topologyVersion: makeTopologyVersion(topologyVersion) }
  227. : {})
  228. };
  229. const options = isAwaitable
  230. ? {
  231. socketTimeoutMS: connectTimeoutMS ? connectTimeoutMS + maxAwaitTimeMS : 0,
  232. exhaustAllowed: true
  233. }
  234. : { socketTimeoutMS: connectTimeoutMS };
  235. if (isAwaitable && monitor.rttPinger == null) {
  236. monitor.rttPinger = new RTTPinger(monitor);
  237. }
  238. // Record new start time before sending handshake
  239. start = (0, utils_1.now)();
  240. if (isAwaitable) {
  241. awaited = true;
  242. return connection.exhaustCommand((0, utils_1.ns)('admin.$cmd'), cmd, options, (error, hello) => {
  243. if (error)
  244. return onHeartbeatFailed(error);
  245. return onHeartbeatSucceeded(hello);
  246. });
  247. }
  248. awaited = false;
  249. connection
  250. .command((0, utils_1.ns)('admin.$cmd'), cmd, options)
  251. .then(onHeartbeatSucceeded, onHeartbeatFailed);
  252. return;
  253. }
  254. // connecting does an implicit `hello`
  255. (async () => {
  256. const socket = await (0, connect_1.makeSocket)(monitor.connectOptions);
  257. const connection = (0, connect_1.makeConnection)(monitor.connectOptions, socket);
  258. // The start time is after socket creation but before the handshake
  259. start = (0, utils_1.now)();
  260. try {
  261. await (0, connect_1.performInitialHandshake)(connection, monitor.connectOptions);
  262. return connection;
  263. }
  264. catch (error) {
  265. connection.destroy();
  266. throw error;
  267. }
  268. })().then(connection => {
  269. if (isInCloseState(monitor)) {
  270. connection.destroy();
  271. return;
  272. }
  273. const duration = (0, utils_1.calculateDurationInMs)(start);
  274. monitor.addRttSample(duration);
  275. monitor.connection = connection;
  276. monitor.emitAndLogHeartbeat(server_1.Server.SERVER_HEARTBEAT_SUCCEEDED, monitor.server.topology.s.id, connection.hello?.connectionId, new events_1.ServerHeartbeatSucceededEvent(monitor.address, duration, connection.hello, useStreamingProtocol(monitor, connection.hello?.topologyVersion)));
  277. callback(undefined, connection.hello);
  278. }, error => {
  279. monitor.connection = null;
  280. awaited = false;
  281. onHeartbeatFailed(error);
  282. });
  283. }
  284. function monitorServer(monitor) {
  285. return (callback) => {
  286. if (monitor.s.state === STATE_MONITORING) {
  287. process.nextTick(callback);
  288. return;
  289. }
  290. stateTransition(monitor, STATE_MONITORING);
  291. function done() {
  292. if (!isInCloseState(monitor)) {
  293. stateTransition(monitor, STATE_IDLE);
  294. }
  295. callback();
  296. }
  297. checkServer(monitor, (err, hello) => {
  298. if (err) {
  299. // otherwise an error occurred on initial discovery, also bail
  300. if (monitor.server.description.type === common_1.ServerType.Unknown) {
  301. return done();
  302. }
  303. }
  304. // if the check indicates streaming is supported, immediately reschedule monitoring
  305. if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
  306. (0, timers_1.setTimeout)(() => {
  307. if (!isInCloseState(monitor)) {
  308. monitor.monitorId?.wake();
  309. }
  310. }, 0);
  311. }
  312. done();
  313. });
  314. };
  315. }
  316. function makeTopologyVersion(tv) {
  317. return {
  318. processId: tv.processId,
  319. // tests mock counter as just number, but in a real situation counter should always be a Long
  320. // TODO(NODE-2674): Preserve int64 sent from MongoDB
  321. counter: bson_1.Long.isLong(tv.counter) ? tv.counter : bson_1.Long.fromNumber(tv.counter)
  322. };
  323. }
  324. /** @internal */
  325. class RTTPinger {
  326. constructor(monitor) {
  327. this.connection = undefined;
  328. this.cancellationToken = monitor.cancellationToken;
  329. this.closed = false;
  330. this.monitor = monitor;
  331. this.latestRtt = monitor.latestRtt ?? undefined;
  332. const heartbeatFrequencyMS = monitor.options.heartbeatFrequencyMS;
  333. this.monitorId = (0, timers_1.setTimeout)(() => this.measureRoundTripTime(), heartbeatFrequencyMS);
  334. }
  335. get roundTripTime() {
  336. return this.monitor.roundTripTime;
  337. }
  338. get minRoundTripTime() {
  339. return this.monitor.minRoundTripTime;
  340. }
  341. close() {
  342. this.closed = true;
  343. (0, timers_1.clearTimeout)(this.monitorId);
  344. this.connection?.destroy();
  345. this.connection = undefined;
  346. }
  347. measureAndReschedule(start, conn) {
  348. if (this.closed) {
  349. conn?.destroy();
  350. return;
  351. }
  352. if (this.connection == null) {
  353. this.connection = conn;
  354. }
  355. this.latestRtt = (0, utils_1.calculateDurationInMs)(start);
  356. this.monitorId = (0, timers_1.setTimeout)(() => this.measureRoundTripTime(), this.monitor.options.heartbeatFrequencyMS);
  357. }
  358. measureRoundTripTime() {
  359. const start = (0, utils_1.now)();
  360. if (this.closed) {
  361. return;
  362. }
  363. const connection = this.connection;
  364. if (connection == null) {
  365. (0, connect_1.connect)(this.monitor.connectOptions).then(connection => {
  366. this.measureAndReschedule(start, connection);
  367. }, () => {
  368. this.connection = undefined;
  369. });
  370. return;
  371. }
  372. const commandName = connection.serverApi?.version || connection.helloOk ? 'hello' : constants_1.LEGACY_HELLO_COMMAND;
  373. connection.command((0, utils_1.ns)('admin.$cmd'), { [commandName]: 1 }, undefined).then(() => this.measureAndReschedule(start), () => {
  374. this.connection?.destroy();
  375. this.connection = undefined;
  376. return;
  377. });
  378. }
  379. }
  380. exports.RTTPinger = RTTPinger;
  381. /**
  382. * @internal
  383. */
  384. class MonitorInterval {
  385. constructor(fn, options = {}) {
  386. this.isExpeditedCallToFnScheduled = false;
  387. this.stopped = false;
  388. this.isExecutionInProgress = false;
  389. this.hasExecutedOnce = false;
  390. this._executeAndReschedule = () => {
  391. if (this.stopped)
  392. return;
  393. if (this.timerId) {
  394. (0, timers_1.clearTimeout)(this.timerId);
  395. }
  396. this.isExpeditedCallToFnScheduled = false;
  397. this.isExecutionInProgress = true;
  398. this.fn(() => {
  399. this.lastExecutionEnded = (0, utils_1.now)();
  400. this.isExecutionInProgress = false;
  401. this._reschedule(this.heartbeatFrequencyMS);
  402. });
  403. };
  404. this.fn = fn;
  405. this.lastExecutionEnded = -Infinity;
  406. this.heartbeatFrequencyMS = options.heartbeatFrequencyMS ?? 1000;
  407. this.minHeartbeatFrequencyMS = options.minHeartbeatFrequencyMS ?? 500;
  408. if (options.immediate) {
  409. this._executeAndReschedule();
  410. }
  411. else {
  412. this._reschedule(undefined);
  413. }
  414. }
  415. wake() {
  416. const currentTime = (0, utils_1.now)();
  417. const timeSinceLastCall = currentTime - this.lastExecutionEnded;
  418. // TODO(NODE-4674): Add error handling and logging to the monitor
  419. if (timeSinceLastCall < 0) {
  420. return this._executeAndReschedule();
  421. }
  422. if (this.isExecutionInProgress) {
  423. return;
  424. }
  425. // debounce multiple calls to wake within the `minInterval`
  426. if (this.isExpeditedCallToFnScheduled) {
  427. return;
  428. }
  429. // reschedule a call as soon as possible, ensuring the call never happens
  430. // faster than the `minInterval`
  431. if (timeSinceLastCall < this.minHeartbeatFrequencyMS) {
  432. this.isExpeditedCallToFnScheduled = true;
  433. this._reschedule(this.minHeartbeatFrequencyMS - timeSinceLastCall);
  434. return;
  435. }
  436. this._executeAndReschedule();
  437. }
  438. stop() {
  439. this.stopped = true;
  440. if (this.timerId) {
  441. (0, timers_1.clearTimeout)(this.timerId);
  442. this.timerId = undefined;
  443. }
  444. this.lastExecutionEnded = -Infinity;
  445. this.isExpeditedCallToFnScheduled = false;
  446. }
  447. toString() {
  448. return JSON.stringify(this);
  449. }
  450. toJSON() {
  451. const currentTime = (0, utils_1.now)();
  452. const timeSinceLastCall = currentTime - this.lastExecutionEnded;
  453. return {
  454. timerId: this.timerId != null ? 'set' : 'cleared',
  455. lastCallTime: this.lastExecutionEnded,
  456. isExpeditedCheckScheduled: this.isExpeditedCallToFnScheduled,
  457. stopped: this.stopped,
  458. heartbeatFrequencyMS: this.heartbeatFrequencyMS,
  459. minHeartbeatFrequencyMS: this.minHeartbeatFrequencyMS,
  460. currentTime,
  461. timeSinceLastCall
  462. };
  463. }
  464. _reschedule(ms) {
  465. if (this.stopped)
  466. return;
  467. if (this.timerId) {
  468. (0, timers_1.clearTimeout)(this.timerId);
  469. }
  470. this.timerId = (0, timers_1.setTimeout)(this._executeAndReschedule, ms || this.heartbeatFrequencyMS);
  471. }
  472. }
  473. exports.MonitorInterval = MonitorInterval;
  474. /** @internal
  475. * This class implements the RTT sampling logic specified for [CSOT](https://github.com/mongodb/specifications/blob/bbb335e60cd7ea1e0f7cd9a9443cb95fc9d3b64d/source/client-side-operations-timeout/client-side-operations-timeout.md#drivers-use-minimum-rtt-to-short-circuit-operations)
  476. *
  477. * This is implemented as a [circular buffer](https://en.wikipedia.org/wiki/Circular_buffer) keeping
  478. * the most recent `windowSize` samples
  479. * */
  480. class RTTSampler {
  481. constructor(windowSize = 10) {
  482. this.rttSamples = new Float64Array(windowSize);
  483. this.length = 0;
  484. this.writeIndex = 0;
  485. }
  486. /**
  487. * Adds an rtt sample to the end of the circular buffer
  488. * When `windowSize` samples have been collected, `addSample` overwrites the least recently added
  489. * sample
  490. */
  491. addSample(sample) {
  492. this.rttSamples[this.writeIndex++] = sample;
  493. if (this.length < this.rttSamples.length) {
  494. this.length++;
  495. }
  496. this.writeIndex %= this.rttSamples.length;
  497. }
  498. /**
  499. * When \< 2 samples have been collected, returns 0
  500. * Otherwise computes the minimum value samples contained in the buffer
  501. */
  502. min() {
  503. if (this.length < 2)
  504. return 0;
  505. let min = this.rttSamples[0];
  506. for (let i = 1; i < this.length; i++) {
  507. if (this.rttSamples[i] < min)
  508. min = this.rttSamples[i];
  509. }
  510. return min;
  511. }
  512. /**
  513. * Returns mean of samples contained in the buffer
  514. */
  515. average() {
  516. if (this.length === 0)
  517. return 0;
  518. let sum = 0;
  519. for (let i = 0; i < this.length; i++) {
  520. sum += this.rttSamples[i];
  521. }
  522. return sum / this.length;
  523. }
  524. /**
  525. * Returns most recently inserted element in the buffer
  526. * Returns null if the buffer is empty
  527. * */
  528. get last() {
  529. if (this.length === 0)
  530. return null;
  531. return this.rttSamples[this.writeIndex === 0 ? this.length - 1 : this.writeIndex - 1];
  532. }
  533. /**
  534. * Clear the buffer
  535. * NOTE: this does not overwrite the data held in the internal array, just the pointers into
  536. * this array
  537. */
  538. clear() {
  539. this.length = 0;
  540. this.writeIndex = 0;
  541. }
  542. }
  543. exports.RTTSampler = RTTSampler;
  544. //# sourceMappingURL=monitor.js.map