server_selection.js 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.MIN_SECONDARY_WRITE_WIRE_VERSION = void 0;
  4. exports.writableServerSelector = writableServerSelector;
  5. exports.sameServerSelector = sameServerSelector;
  6. exports.secondaryWritableServerSelector = secondaryWritableServerSelector;
  7. exports.readPreferenceServerSelector = readPreferenceServerSelector;
  8. const error_1 = require("../error");
  9. const read_preference_1 = require("../read_preference");
  10. const common_1 = require("./common");
  11. // Max staleness constants.
  12. const IDLE_WRITE_PERIOD = 10000; // milliseconds; added to heartbeatFrequencyMS to derive the smallest acceptable maxStalenessSeconds
  13. const SMALLEST_MAX_STALENESS_SECONDS = 90; // seconds; absolute lower bound enforced on maxStalenessSeconds
  14. // Minimum wire version to try writes on secondaries; lower wire versions are always routed to the primary.
  15. exports.MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
  /**
   * Builds a server selector that keeps only writable servers and then narrows
   * them to the acceptable latency window.
   *
   * @returns A selector accepting a topology description and a list of server descriptions
   */
  function writableServerSelector() {
    return function writableServer(topologyDescription, servers) {
      const writable = servers.filter(({ isWritable }) => isWritable);
      return latencyWindowReducer(topologyDescription, writable);
    };
  }
  /**
   * Builds a selector that re-selects the server identified by `description`,
   * but only while that server is in a state where commands can be sent to it.
   *
   * @param description - Description of the server to select again; may be nullish
   * @returns A selector yielding the matching servers of known type, or an empty list
   */
  function sameServerSelector(description) {
    return function sameServerSelector(topologyDescription, servers) {
      // Without a description there is nothing to match against.
      if (!description) {
        return [];
      }
      // A server whose type is unknown is never selected, even when its address matches.
      const isSameKnownServer = (candidate) =>
        candidate.address === description.address &&
        candidate.type !== common_1.ServerType.Unknown;
      return servers.filter(isSameKnownServer);
    };
  }
  /**
   * Builds a selector for operations that may perform a write on a secondary,
   * honoring the supplied read preference only when the server is new enough.
   *
   * - Server version < 5.0 (or no wire version known): always select the primary.
   * - Server version >= 5.0: use the supplied read preference, or primary when none is given.
   *
   * @param wireVersion - The wire version to check against MIN_SECONDARY_WRITE_WIRE_VERSION
   * @param readPreference - The read preference to use when secondary writes are allowed
   * @returns The selector produced by readPreferenceServerSelector
   */
  function secondaryWritableServerSelector(wireVersion, readPreference) {
    const primaryOnly = () => readPreferenceServerSelector(read_preference_1.ReadPreference.primary);
    if (!readPreference) {
      return primaryOnly();
    }
    if (!wireVersion) {
      return primaryOnly();
    }
    if (wireVersion < exports.MIN_SECONDARY_WRITE_WIRE_VERSION) {
      return primaryOnly();
    }
    return readPreferenceServerSelector(readPreference);
  }
  /**
   * Narrows a list of servers according to the "Max Staleness" specification:
   *
   * @see https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.md
   *
   * When the read preference carries no (or a negative) `maxStalenessSeconds`, the
   * list is returned untouched. Otherwise the option is validated and, for replica
   * set topologies, each server's estimated staleness is compared against it.
   *
   * @param readPreference - The read preference providing max staleness guidance
   * @param topologyDescription - The topology description
   * @param servers - The list of server descriptions to be reduced
   * @returns The list of servers that satisfy the requirements of max staleness
   * @throws MongoInvalidArgumentError when `maxStalenessSeconds` is below the allowed minimum
   */
  function maxStalenessReducer(readPreference, topologyDescription, servers) {
    const maxStaleness = readPreference.maxStalenessSeconds;
    if (maxStaleness == null || maxStaleness < 0) {
      return servers;
    }

    const { heartbeatFrequencyMS } = topologyDescription;
    const minimumFromHeartbeat = (heartbeatFrequencyMS + IDLE_WRITE_PERIOD) / 1000;
    if (maxStaleness < minimumFromHeartbeat) {
      throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${minimumFromHeartbeat} seconds`);
    }
    if (maxStaleness < SMALLEST_MAX_STALENESS_SECONDS) {
      throw new error_1.MongoInvalidArgumentError(`Option "maxStalenessSeconds" must be at least ${SMALLEST_MAX_STALENESS_SECONDS} seconds`);
    }

    const topologyType = topologyDescription.type;

    if (topologyType === common_1.TopologyType.ReplicaSetWithPrimary) {
      const primary = [...topologyDescription.servers.values()].find(primaryFilter);
      // The primary is dereferenced per candidate (not up front) so that an empty
      // candidate list never touches it.
      return servers.filter((server) => {
        const stalenessMS = server.lastUpdateTime -
          server.lastWriteDate -
          (primary.lastUpdateTime - primary.lastWriteDate) +
          heartbeatFrequencyMS;
        return stalenessMS / 1000 <= maxStaleness;
      });
    }

    if (topologyType === common_1.TopologyType.ReplicaSetNoPrimary) {
      if (servers.length === 0) {
        return servers;
      }
      // With no primary, staleness is measured against the most recently written server.
      const freshest = servers.reduce((best, candidate) => candidate.lastWriteDate > best.lastWriteDate ? candidate : best);
      return servers.filter((server) => {
        const stalenessMS = freshest.lastWriteDate - server.lastWriteDate + heartbeatFrequencyMS;
        return stalenessMS / 1000 <= maxStaleness;
      });
    }

    return servers;
  }
  /**
   * Checks whether every key/value pair of a requested tag set is present, with a
   * strictly equal value, among a server's own enumerable tags.
   *
   * @param tagSet - The requested tag set to match
   * @param serverTags - The server's tags
   * @returns `true` when all requested tags are satisfied (trivially so for an empty tag set)
   */
  function tagSetMatch(tagSet, serverTags) {
    const requestedKeys = Object.keys(tagSet);
    const availableKeys = Object.keys(serverTags);
    return requestedKeys.every((key) => availableKeys.includes(key) && serverTags[key] === tagSet[key]);
  }
  /**
   * Narrows a list of server descriptions to those matching the tag sets requested
   * by the read preference. Tag sets are tried in order and the first one matching
   * at least one server wins.
   *
   * @param readPreference - The read preference providing the requested tags
   * @param servers - The list of server descriptions to reduce
   * @returns The servers matching the first satisfiable tag set; the input list when
   * no tags are requested; an empty list when no tag set matches any server
   */
  function tagSetReducer(readPreference, servers) {
    const tagSets = readPreference.tags;
    const nothingRequested = tagSets == null || (Array.isArray(tagSets) && tagSets.length === 0);
    if (nothingRequested) {
      return servers;
    }
    for (let index = 0; index < tagSets.length; index += 1) {
      const requested = tagSets[index];
      const candidates = servers.filter((server) => tagSetMatch(requested, server.tags));
      if (candidates.length) {
        return candidates;
      }
    }
    return [];
  }
  /**
   * Keeps only the servers whose round trip time lies within the latency window:
   * from the fastest server's round trip time up to that value plus the topology's
   * `localThresholdMS`. This is further specified in the "Server Selection"
   * specification, found here:
   *
   * @see https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md
   *
   * @param topologyDescription - The topology description
   * @param servers - The list of servers to reduce
   * @returns The servers which fall within an acceptable latency window
   */
  function latencyWindowReducer(topologyDescription, servers) {
    let fastest = Infinity;
    servers.forEach(({ roundTripTime }) => {
      fastest = Math.min(roundTripTime, fastest);
    });
    const slowestAllowed = fastest + topologyDescription.localThresholdMS;
    return servers.filter(({ roundTripTime }) => roundTripTime <= slowestAllowed && roundTripTime >= fastest);
  }
  // filters
  /**
   * Accepts servers whose type is a replica set primary.
   *
   * @param server - The server description to test
   */
  function primaryFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.RSPrimary;
  }
  /**
   * Accepts servers whose type is a replica set secondary.
   *
   * @param server - The server description to test
   */
  function secondaryFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.RSSecondary;
  }
  /**
   * Accepts any data-bearing replica set member, i.e. a secondary or a primary.
   *
   * @param server - The server description to test
   */
  function nearestFilter(server) {
    const { type } = server;
    const { RSSecondary, RSPrimary } = common_1.ServerType;
    return type === RSSecondary || type === RSPrimary;
  }
  /**
   * Accepts every server whose type is anything other than unknown.
   *
   * @param server - The server description to test
   */
  function knownFilter(server) {
    const { type } = server;
    return type !== common_1.ServerType.Unknown;
  }
  /**
   * Accepts servers whose type is a load balancer.
   *
   * @param server - The server description to test
   */
  function loadBalancerFilter(server) {
    const { type } = server;
    return type === common_1.ServerType.LoadBalancer;
  }
  /**
   * Builds a server selector driven by a read preference.
   *
   * The returned selector first dispatches on the topology type (load balanced,
   * unknown, single and sharded topologies do not consult the read preference
   * mode) and otherwise applies the mode together with the max staleness, tag set
   * and latency window reductions.
   *
   * @param readPreference - The read preference to select with
   * @returns A selector accepting a topology description, the candidate servers and
   * an optional list of deprioritized servers (only consulted for sharded topologies)
   * @throws MongoInvalidArgumentError when the read preference reports itself invalid
   */
  function readPreferenceServerSelector(readPreference) {
    if (!readPreference.isValid()) {
      throw new error_1.MongoInvalidArgumentError('Invalid read preference specified');
    }
    return function readPreferenceServers(topologyDescription, servers, deprioritized = []) {
      const { TopologyType } = common_1;
      switch (topologyDescription.type) {
        case TopologyType.LoadBalanced:
          return servers.filter(loadBalancerFilter);
        case TopologyType.Unknown:
          return [];
        case TopologyType.Single:
          return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
        case TopologyType.Sharded: {
          // Prefer servers that were not deprioritized; fall back to the
          // deprioritized ones when nothing else is left.
          const preferred = servers.filter((server) => !deprioritized.includes(server));
          const pool = preferred.length > 0 ? preferred : deprioritized;
          return latencyWindowReducer(topologyDescription, pool.filter(knownFilter));
        }
        default:
          break;
      }

      const { ReadPreference } = read_preference_1;
      const { mode } = readPreference;

      if (mode === ReadPreference.PRIMARY) {
        return servers.filter(primaryFilter);
      }
      if (mode === ReadPreference.PRIMARY_PREFERRED) {
        const primaries = servers.filter(primaryFilter);
        if (primaries.length) {
          return primaries;
        }
      }

      // Every mode other than "nearest" draws its candidates from the secondaries.
      const candidateFilter = mode === ReadPreference.NEAREST ? nearestFilter : secondaryFilter;
      const candidates = servers.filter(candidateFilter);
      const freshEnough = maxStalenessReducer(readPreference, topologyDescription, candidates);
      const tagged = tagSetReducer(readPreference, freshEnough);
      const selectedServers = latencyWindowReducer(topologyDescription, tagged);

      if (mode === ReadPreference.SECONDARY_PREFERRED && selectedServers.length === 0) {
        return servers.filter(primaryFilter);
      }
      return selectedServers;
    };
  }
  231. //# sourceMappingURL=server_selection.js.map