connection.js 15 KB


  1. /*!
  2. * Module dependencies.
  3. */
  4. 'use strict';
  5. const MongooseConnection = require('../../connection');
  6. const MongooseError = require('../../error/index');
  7. const STATES = require('../../connectionState');
  8. const mongodb = require('mongodb');
  9. const pkg = require('../../../package.json');
  10. const processConnectionOptions = require('../../helpers/processConnectionOptions');
  11. const setTimeout = require('../../helpers/timers').setTimeout;
  12. const utils = require('../../utils');
  13. const Schema = require('../../schema');
  14. /**
  15. * A [node-mongodb-native](https://github.com/mongodb/node-mongodb-native) connection implementation.
  16. *
  17. * @inherits Connection
  18. * @api private
  19. */
  20. function NativeConnection() {
  21. MongooseConnection.apply(this, arguments);
  22. this._listening = false;
  23. // Tracks the last time (as unix timestamp) the connection received a
  24. // serverHeartbeatSucceeded or serverHeartbeatFailed event from the underlying MongoClient.
  25. // If we haven't received one in a while (like due to a frozen AWS Lambda container) then
  26. // `readyState` is likely stale.
  27. this._lastHeartbeatAt = null;
  28. }
  29. /**
  30. * Expose the possible connection states.
  31. * @api public
  32. */
  33. NativeConnection.STATES = STATES;
  34. /*!
  35. * Inherits from Connection.
  36. */
  37. Object.setPrototypeOf(NativeConnection.prototype, MongooseConnection.prototype);
  38. /**
  39. * Switches to a different database using the same connection pool.
  40. *
  41. * Returns a new connection object, with the new db. If you set the `useCache`
  42. * option, `useDb()` will cache connections by `name`.
  43. *
  44. * **Note:** Calling `close()` on a `useDb()` connection will close the base connection as well.
  45. *
  46. * @param {String} name The database name
  47. * @param {Object} [options]
  48. * @param {Boolean} [options.useCache=false] If true, cache results so calling `useDb()` multiple times with the same name only creates 1 connection object.
  49. * @return {Connection} New Connection Object
  50. * @api public
  51. */
  52. NativeConnection.prototype.useDb = function(name, options) {
  53. // Return immediately if cached
  54. options = options || {};
  55. if (options.useCache && this.relatedDbs[name]) {
  56. return this.relatedDbs[name];
  57. }
  58. // we have to manually copy all of the attributes...
  59. const newConn = new this.constructor();
  60. newConn.name = name;
  61. newConn.base = this.base;
  62. newConn.collections = {};
  63. newConn.models = {};
  64. newConn.replica = this.replica;
  65. newConn.config = Object.assign({}, this.config, newConn.config);
  66. newConn.name = this.name;
  67. newConn.options = this.options;
  68. newConn._readyState = this._readyState;
  69. newConn._closeCalled = this._closeCalled;
  70. newConn._hasOpened = this._hasOpened;
  71. newConn._listening = false;
  72. newConn._parent = this;
  73. newConn.host = this.host;
  74. newConn.port = this.port;
  75. newConn.user = this.user;
  76. newConn.pass = this.pass;
  77. // First, when we create another db object, we are not guaranteed to have a
  78. // db object to work with. So, in the case where we have a db object and it
  79. // is connected, we can just proceed with setting everything up. However, if
  80. // we do not have a db or the state is not connected, then we need to wait on
  81. // the 'open' event of the connection before doing the rest of the setup
  82. // the 'connected' event is the first time we'll have access to the db object
  83. const _this = this;
  84. newConn.client = _this.client;
  85. if (this.db && this._readyState === STATES.connected) {
  86. wireup();
  87. } else {
  88. this._queue.push({ fn: wireup });
  89. }
  90. function wireup() {
  91. newConn.client = _this.client;
  92. newConn.db = _this.client.db(name);
  93. newConn._lastHeartbeatAt = _this._lastHeartbeatAt;
  94. newConn.onOpen();
  95. }
  96. newConn.name = name;
  97. // push onto the otherDbs stack, this is used when state changes
  98. this.otherDbs.push(newConn);
  99. newConn.otherDbs.push(this);
  100. // push onto the relatedDbs cache, this is used when state changes
  101. if (options?.useCache) {
  102. this.relatedDbs[newConn.name] = newConn;
  103. newConn.relatedDbs = this.relatedDbs;
  104. }
  105. return newConn;
  106. };
  107. /**
  108. * Runs a [db-level aggregate()](https://www.mongodb.com/docs/manual/reference/method/db.aggregate/) on this connection's underlying `db`
  109. *
  110. * @param {Array} pipeline
  111. * @param {Object} [options]
  112. */
  113. NativeConnection.prototype.aggregate = function aggregate(pipeline, options) {
  114. return new this.base.Aggregate(null, this).append(pipeline).option(options ?? {});
  115. };
  116. /**
  117. * Removes the database connection with the given name created with `useDb()`.
  118. *
  119. * Throws an error if the database connection was not found.
  120. *
  121. * #### Example:
  122. *
  123. * // Connect to `initialdb` first
  124. * const conn = await mongoose.createConnection('mongodb://127.0.0.1:27017/initialdb').asPromise();
  125. *
  126. * // Creates an un-cached connection to `mydb`
  127. * const db = conn.useDb('mydb');
  128. *
  129. * // Closes `db`, and removes `db` from `conn.relatedDbs` and `conn.otherDbs`
  130. * await conn.removeDb('mydb');
  131. *
  132. * @method removeDb
  133. * @memberOf Connection
  134. * @param {String} name The database name
  135. * @return {Connection} this
  136. */
  137. NativeConnection.prototype.removeDb = function removeDb(name) {
  138. const dbs = this.otherDbs.filter(db => db.name === name);
  139. if (!dbs.length) {
  140. throw new MongooseError(`No connections to database "${name}" found`);
  141. }
  142. for (const db of dbs) {
  143. db._closeCalled = true;
  144. db._destroyCalled = true;
  145. db._readyState = STATES.disconnected;
  146. db.$wasForceClosed = true;
  147. }
  148. delete this.relatedDbs[name];
  149. this.otherDbs = this.otherDbs.filter(db => db.name !== name);
  150. };
  151. /**
  152. * Closes the connection
  153. *
  154. * @param {Boolean} [force]
  155. * @return {Connection} this
  156. * @api private
  157. */
  158. NativeConnection.prototype.doClose = async function doClose(force) {
  159. if (this.client == null) {
  160. return this;
  161. }
  162. let skipCloseClient = false;
  163. if (force != null && typeof force === 'object') {
  164. skipCloseClient = force.skipCloseClient;
  165. force = force.force;
  166. }
  167. if (skipCloseClient) {
  168. return this;
  169. }
  170. await this.client.close(force);
  171. // Defer because the driver will wait at least 1ms before finishing closing
  172. // the pool, see https://github.com/mongodb-js/mongodb-core/blob/a8f8e4ce41936babc3b9112bf42d609779f03b39/lib/connection/pool.js#L1026-L1030.
  173. // If there's queued operations, you may still get some background work
  174. // after the callback is called.
  175. await new Promise(resolve => setTimeout(resolve, 1));
  176. return this;
  177. };
  178. /**
  179. * Implementation of `listDatabases()` for MongoDB driver
  180. *
  181. * @return Promise
  182. * @api public
  183. */
  184. NativeConnection.prototype.listDatabases = async function listDatabases() {
  185. await this._waitForConnect();
  186. return await this.db.admin().listDatabases();
  187. };
  188. /*!
  189. * ignore
  190. */
  191. NativeConnection.prototype.createClient = async function createClient(uri, options) {
  192. if (typeof uri !== 'string') {
  193. throw new MongooseError('The `uri` parameter to `openUri()` must be a ' +
  194. `string, got "${typeof uri}". Make sure the first parameter to ` +
  195. '`mongoose.connect()` or `mongoose.createConnection()` is a string.');
  196. }
  197. if (this._destroyCalled) {
  198. throw new MongooseError(
  199. 'Connection has been closed and destroyed, and cannot be used for re-opening the connection. ' +
  200. 'Please create a new connection with `mongoose.createConnection()` or `mongoose.connect()`.'
  201. );
  202. }
  203. if (this.readyState === STATES.connecting || this.readyState === STATES.connected) {
  204. if (this._connectionString !== uri) {
  205. throw new MongooseError('Can\'t call `openUri()` on an active connection with ' +
  206. 'different connection strings. Make sure you aren\'t calling `mongoose.connect()` ' +
  207. 'multiple times. See: https://mongoosejs.com/docs/connections.html#multiple_connections');
  208. }
  209. }
  210. options = processConnectionOptions(uri, options);
  211. if (options) {
  212. const autoIndex = options.config?.autoIndex ?? options.autoIndex;
  213. if (autoIndex != null) {
  214. this.config.autoIndex = autoIndex !== false;
  215. delete options.config;
  216. delete options.autoIndex;
  217. }
  218. if ('autoCreate' in options) {
  219. this.config.autoCreate = !!options.autoCreate;
  220. delete options.autoCreate;
  221. }
  222. if ('sanitizeFilter' in options) {
  223. this.config.sanitizeFilter = options.sanitizeFilter;
  224. delete options.sanitizeFilter;
  225. }
  226. if ('autoSearchIndex' in options) {
  227. this.config.autoSearchIndex = options.autoSearchIndex;
  228. delete options.autoSearchIndex;
  229. }
  230. if ('bufferTimeoutMS' in options) {
  231. this.config.bufferTimeoutMS = options.bufferTimeoutMS;
  232. delete options.bufferTimeoutMS;
  233. }
  234. // Backwards compat
  235. if (options.user || options.pass) {
  236. options.auth = options.auth || {};
  237. options.auth.username = options.user;
  238. options.auth.password = options.pass;
  239. this.user = options.user;
  240. this.pass = options.pass;
  241. }
  242. delete options.user;
  243. delete options.pass;
  244. if (options.bufferCommands != null) {
  245. this.config.bufferCommands = options.bufferCommands;
  246. delete options.bufferCommands;
  247. }
  248. } else {
  249. options = {};
  250. }
  251. this._connectionOptions = options;
  252. const dbName = options.dbName;
  253. if (dbName != null) {
  254. this.$dbName = dbName;
  255. }
  256. delete options.dbName;
  257. if (!utils.hasUserDefinedProperty(options, 'driverInfo')) {
  258. options.driverInfo = {
  259. name: 'Mongoose',
  260. version: pkg.version
  261. };
  262. }
  263. const { schemaMap, encryptedFieldsMap } = this._buildEncryptionSchemas();
  264. if ((utils.hasOwnKeys(schemaMap) || utils.hasOwnKeys(encryptedFieldsMap)) && !options.autoEncryption) {
  265. throw new Error('Must provide `autoEncryption` when connecting with encrypted schemas.');
  266. }
  267. if (utils.hasOwnKeys(schemaMap)) {
  268. options.autoEncryption.schemaMap = schemaMap;
  269. }
  270. if (utils.hasOwnKeys(encryptedFieldsMap)) {
  271. options.autoEncryption.encryptedFieldsMap = encryptedFieldsMap;
  272. }
  273. this.readyState = STATES.connecting;
  274. this._connectionString = uri;
  275. let client;
  276. try {
  277. client = new mongodb.MongoClient(uri, options);
  278. } catch (error) {
  279. this.readyState = STATES.disconnected;
  280. throw error;
  281. }
  282. this.client = client;
  283. client.setMaxListeners(0);
  284. await client.connect();
  285. _setClient(this, client, options, dbName);
  286. for (const db of this.otherDbs) {
  287. _setClient(db, client, {}, db.name);
  288. }
  289. return this;
  290. };
  291. /**
  292. * Given a connection, which may or may not have encrypted models, build
  293. * a schemaMap and/or an encryptedFieldsMap for the connection, combining all models
  294. * into a single schemaMap and encryptedFields map.
  295. *
  296. * @returns the generated schemaMap and encryptedFieldsMap
  297. */
  298. NativeConnection.prototype._buildEncryptionSchemas = function() {
  299. const qeMappings = {};
  300. const csfleMappings = {};
  301. const encryptedModels = Object.values(this.models).filter(model => model.schema._hasEncryptedFields());
  302. // If discriminators are configured for the collection, there might be multiple models
  303. // pointing to the same namespace. For this scenario, we merge all the schemas for each namespace
  304. // into a single schema and then generate a schemaMap/encryptedFieldsMap for the combined schema.
  305. for (const model of encryptedModels) {
  306. const { schema, collection: { collectionName } } = model;
  307. const namespace = `${this.$dbName}.${collectionName}`;
  308. const mappings = schema.encryptionType() === 'csfle' ? csfleMappings : qeMappings;
  309. mappings[namespace] ??= new Schema({}, { encryptionType: schema.encryptionType() });
  310. const isNonRootDiscriminator = schema.discriminatorMapping && !schema.discriminatorMapping.isRoot;
  311. if (isNonRootDiscriminator) {
  312. const rootSchema = schema._baseSchema;
  313. schema.eachPath((pathname) => {
  314. if (rootSchema.path(pathname)) return;
  315. if (!mappings[namespace]._hasEncryptedField(pathname)) return;
  316. throw new Error(`Cannot have duplicate keys in discriminators with encryption. key=${pathname}`);
  317. });
  318. }
  319. mappings[namespace].add(schema);
  320. }
  321. const schemaMap = Object.fromEntries(Object.entries(csfleMappings).map(
  322. ([namespace, schema]) => ([namespace, schema._buildSchemaMap()])
  323. ));
  324. const encryptedFieldsMap = Object.fromEntries(Object.entries(qeMappings).map(
  325. ([namespace, schema]) => ([namespace, schema._buildEncryptedFields()])
  326. ));
  327. return {
  328. schemaMap, encryptedFieldsMap
  329. };
  330. };
  331. /*!
  332. * ignore
  333. */
  334. NativeConnection.prototype.setClient = function setClient(client) {
  335. if (!(client instanceof mongodb.MongoClient)) {
  336. throw new MongooseError('Must call `setClient()` with an instance of MongoClient');
  337. }
  338. if (this.readyState !== STATES.disconnected) {
  339. throw new MongooseError('Cannot call `setClient()` on a connection that is already connected.');
  340. }
  341. if (client.topology == null) {
  342. throw new MongooseError('Cannot call `setClient()` with a MongoClient that you have not called `connect()` on yet.');
  343. }
  344. this._connectionString = client.s.url;
  345. _setClient(this, client, {}, client.s.options.dbName);
  346. for (const model of Object.values(this.models)) {
  347. // Errors handled internally, so safe to ignore error
  348. model.init().catch(function $modelInitNoop() {});
  349. }
  350. return this;
  351. };
  352. /*!
  353. * ignore
  354. */
  355. function _setClient(conn, client, options, dbName) {
  356. const db = dbName != null ? client.db(dbName) : client.db();
  357. conn.db = db;
  358. conn.client = client;
  359. conn.host = client?.s?.options?.hosts?.[0]?.host;
  360. conn.port = client?.s?.options?.hosts?.[0]?.port;
  361. conn.name = dbName != null ? dbName : db.databaseName;
  362. conn._closeCalled = client._closeCalled;
  363. const _handleReconnect = () => {
  364. // If we aren't disconnected, we assume this reconnect is due to a
  365. // socket timeout. If there's no activity on a socket for
  366. // `socketTimeoutMS`, the driver will attempt to reconnect and emit
  367. // this event.
  368. if (conn.readyState !== STATES.connected) {
  369. conn.readyState = STATES.connected;
  370. conn.emit('reconnect');
  371. conn.emit('reconnected');
  372. conn.onOpen();
  373. }
  374. };
  375. const type = client?.topology?.description?.type || '';
  376. if (type === 'Single') {
  377. client.on('serverDescriptionChanged', ev => {
  378. const newDescription = ev.newDescription;
  379. if (newDescription.type === 'Unknown') {
  380. conn.readyState = STATES.disconnected;
  381. } else {
  382. _handleReconnect();
  383. }
  384. });
  385. } else if (type.startsWith('ReplicaSet')) {
  386. client.on('topologyDescriptionChanged', ev => {
  387. // Emit disconnected if we've lost connectivity to the primary
  388. const description = ev.newDescription;
  389. if (conn.readyState === STATES.connected && description.type !== 'ReplicaSetWithPrimary') {
  390. // Implicitly emits 'disconnected'
  391. conn.readyState = STATES.disconnected;
  392. } else if (conn.readyState === STATES.disconnected && description.type === 'ReplicaSetWithPrimary') {
  393. _handleReconnect();
  394. }
  395. });
  396. }
  397. conn._lastHeartbeatAt = null;
  398. client.on('serverHeartbeatSucceeded', () => {
  399. conn._lastHeartbeatAt = Date.now();
  400. for (const otherDb of conn.otherDbs) {
  401. otherDb._lastHeartbeatAt = conn._lastHeartbeatAt;
  402. }
  403. });
  404. if (options.monitorCommands) {
  405. client.on('commandStarted', (data) => conn.emit('commandStarted', data));
  406. client.on('commandFailed', (data) => conn.emit('commandFailed', data));
  407. client.on('commandSucceeded', (data) => conn.emit('commandSucceeded', data));
  408. }
  409. conn.onOpen();
  410. for (const i in conn.collections) {
  411. if (Object.hasOwn(conn.collections, i)) {
  412. conn.collections[i].onOpen();
  413. }
  414. }
  415. }
  416. /*!
  417. * Module exports.
  418. */
  419. module.exports = NativeConnection;