commands.js 22 KB


  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.OpCompressedRequest = exports.OpMsgResponse = exports.OpMsgRequest = exports.DocumentSequence = exports.OpReply = exports.OpQueryRequest = void 0;
  4. const BSON = require("../bson");
  5. const error_1 = require("../error");
  6. const compression_1 = require("./wire_protocol/compression");
  7. const constants_1 = require("./wire_protocol/constants");
  8. // Incrementing request id
  9. let _requestId = 0;
  10. // Query flags
  11. const OPTS_TAILABLE_CURSOR = 2;
  12. const OPTS_SECONDARY = 4;
  13. const OPTS_OPLOG_REPLAY = 8;
  14. const OPTS_NO_CURSOR_TIMEOUT = 16;
  15. const OPTS_AWAIT_DATA = 32;
  16. const OPTS_EXHAUST = 64;
  17. const OPTS_PARTIAL = 128;
  18. // Response flags
  19. const CURSOR_NOT_FOUND = 1;
  20. const QUERY_FAILURE = 2;
  21. const SHARD_CONFIG_STALE = 4;
  22. const AWAIT_CAPABLE = 8;
  23. const encodeUTF8Into = BSON.BSON.onDemand.ByteUtils.encodeUTF8Into;
  24. /** @internal */
  25. class OpQueryRequest {
  26. constructor(databaseName, query, options) {
  27. /** moreToCome is an OP_MSG only concept */
  28. this.moreToCome = false;
  29. // Basic options needed to be passed in
  30. // TODO(NODE-3483): Replace with MongoCommandError
  31. const ns = `${databaseName}.$cmd`;
  32. if (typeof databaseName !== 'string') {
  33. throw new error_1.MongoRuntimeError('Database name must be a string for a query');
  34. }
  35. // TODO(NODE-3483): Replace with MongoCommandError
  36. if (query == null)
  37. throw new error_1.MongoRuntimeError('A query document must be specified for query');
  38. // Validate that we are not passing 0x00 in the collection name
  39. if (ns.indexOf('\x00') !== -1) {
  40. // TODO(NODE-3483): Use MongoNamespace static method
  41. throw new error_1.MongoRuntimeError('Namespace cannot contain a null character');
  42. }
  43. // Basic optionsa
  44. this.databaseName = databaseName;
  45. this.query = query;
  46. this.ns = ns;
  47. // Additional options
  48. this.numberToSkip = options.numberToSkip || 0;
  49. this.numberToReturn = options.numberToReturn || 0;
  50. this.returnFieldSelector = options.returnFieldSelector || undefined;
  51. this.requestId = options.requestId ?? OpQueryRequest.getRequestId();
  52. // special case for pre-3.2 find commands, delete ASAP
  53. this.pre32Limit = options.pre32Limit;
  54. // Serialization option
  55. this.serializeFunctions =
  56. typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
  57. this.ignoreUndefined =
  58. typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;
  59. this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16;
  60. this.checkKeys = typeof options.checkKeys === 'boolean' ? options.checkKeys : false;
  61. this.batchSize = this.numberToReturn;
  62. // Flags
  63. this.tailable = false;
  64. this.secondaryOk = typeof options.secondaryOk === 'boolean' ? options.secondaryOk : false;
  65. this.oplogReplay = false;
  66. this.noCursorTimeout = false;
  67. this.awaitData = false;
  68. this.exhaust = false;
  69. this.partial = false;
  70. }
  71. /** Assign next request Id. */
  72. incRequestId() {
  73. this.requestId = _requestId++;
  74. }
  75. /** Peek next request Id. */
  76. nextRequestId() {
  77. return _requestId + 1;
  78. }
  79. /** Increment then return next request Id. */
  80. static getRequestId() {
  81. return ++_requestId;
  82. }
  83. // Uses a single allocated buffer for the process, avoiding multiple memory allocations
  84. toBin() {
  85. const buffers = [];
  86. let projection = null;
  87. // Set up the flags
  88. let flags = 0;
  89. if (this.tailable) {
  90. flags |= OPTS_TAILABLE_CURSOR;
  91. }
  92. if (this.secondaryOk) {
  93. flags |= OPTS_SECONDARY;
  94. }
  95. if (this.oplogReplay) {
  96. flags |= OPTS_OPLOG_REPLAY;
  97. }
  98. if (this.noCursorTimeout) {
  99. flags |= OPTS_NO_CURSOR_TIMEOUT;
  100. }
  101. if (this.awaitData) {
  102. flags |= OPTS_AWAIT_DATA;
  103. }
  104. if (this.exhaust) {
  105. flags |= OPTS_EXHAUST;
  106. }
  107. if (this.partial) {
  108. flags |= OPTS_PARTIAL;
  109. }
  110. // If batchSize is different to this.numberToReturn
  111. if (this.batchSize !== this.numberToReturn)
  112. this.numberToReturn = this.batchSize;
  113. // Allocate write protocol header buffer
  114. const header = Buffer.alloc(4 * 4 + // Header
  115. 4 + // Flags
  116. Buffer.byteLength(this.ns) +
  117. 1 + // namespace
  118. 4 + // numberToSkip
  119. 4 // numberToReturn
  120. );
  121. // Add header to buffers
  122. buffers.push(header);
  123. // Serialize the query
  124. const query = BSON.serialize(this.query, {
  125. checkKeys: this.checkKeys,
  126. serializeFunctions: this.serializeFunctions,
  127. ignoreUndefined: this.ignoreUndefined
  128. });
  129. // Add query document
  130. buffers.push(query);
  131. if (this.returnFieldSelector && Object.keys(this.returnFieldSelector).length > 0) {
  132. // Serialize the projection document
  133. projection = BSON.serialize(this.returnFieldSelector, {
  134. checkKeys: this.checkKeys,
  135. serializeFunctions: this.serializeFunctions,
  136. ignoreUndefined: this.ignoreUndefined
  137. });
  138. // Add projection document
  139. buffers.push(projection);
  140. }
  141. // Total message size
  142. const totalLength = header.length + query.length + (projection ? projection.length : 0);
  143. // Set up the index
  144. let index = 4;
  145. // Write total document length
  146. header[3] = (totalLength >> 24) & 0xff;
  147. header[2] = (totalLength >> 16) & 0xff;
  148. header[1] = (totalLength >> 8) & 0xff;
  149. header[0] = totalLength & 0xff;
  150. // Write header information requestId
  151. header[index + 3] = (this.requestId >> 24) & 0xff;
  152. header[index + 2] = (this.requestId >> 16) & 0xff;
  153. header[index + 1] = (this.requestId >> 8) & 0xff;
  154. header[index] = this.requestId & 0xff;
  155. index = index + 4;
  156. // Write header information responseTo
  157. header[index + 3] = (0 >> 24) & 0xff;
  158. header[index + 2] = (0 >> 16) & 0xff;
  159. header[index + 1] = (0 >> 8) & 0xff;
  160. header[index] = 0 & 0xff;
  161. index = index + 4;
  162. // Write header information OP_QUERY
  163. header[index + 3] = (constants_1.OP_QUERY >> 24) & 0xff;
  164. header[index + 2] = (constants_1.OP_QUERY >> 16) & 0xff;
  165. header[index + 1] = (constants_1.OP_QUERY >> 8) & 0xff;
  166. header[index] = constants_1.OP_QUERY & 0xff;
  167. index = index + 4;
  168. // Write header information flags
  169. header[index + 3] = (flags >> 24) & 0xff;
  170. header[index + 2] = (flags >> 16) & 0xff;
  171. header[index + 1] = (flags >> 8) & 0xff;
  172. header[index] = flags & 0xff;
  173. index = index + 4;
  174. // Write collection name
  175. index = index + header.write(this.ns, index, 'utf8') + 1;
  176. header[index - 1] = 0;
  177. // Write header information flags numberToSkip
  178. header[index + 3] = (this.numberToSkip >> 24) & 0xff;
  179. header[index + 2] = (this.numberToSkip >> 16) & 0xff;
  180. header[index + 1] = (this.numberToSkip >> 8) & 0xff;
  181. header[index] = this.numberToSkip & 0xff;
  182. index = index + 4;
  183. // Write header information flags numberToReturn
  184. header[index + 3] = (this.numberToReturn >> 24) & 0xff;
  185. header[index + 2] = (this.numberToReturn >> 16) & 0xff;
  186. header[index + 1] = (this.numberToReturn >> 8) & 0xff;
  187. header[index] = this.numberToReturn & 0xff;
  188. index = index + 4;
  189. // Return the buffers
  190. return buffers;
  191. }
  192. }
  193. exports.OpQueryRequest = OpQueryRequest;
  194. /** @internal */
  195. class OpReply {
  196. constructor(message, msgHeader, msgBody, opts) {
  197. this.index = 0;
  198. this.sections = [];
  199. /** moreToCome is an OP_MSG only concept */
  200. this.moreToCome = false;
  201. this.parsed = false;
  202. this.raw = message;
  203. this.data = msgBody;
  204. this.opts = opts ?? {
  205. useBigInt64: false,
  206. promoteLongs: true,
  207. promoteValues: true,
  208. promoteBuffers: false,
  209. bsonRegExp: false
  210. };
  211. // Read the message header
  212. this.length = msgHeader.length;
  213. this.requestId = msgHeader.requestId;
  214. this.responseTo = msgHeader.responseTo;
  215. this.opCode = msgHeader.opCode;
  216. this.fromCompressed = msgHeader.fromCompressed;
  217. // Flag values
  218. this.useBigInt64 = typeof this.opts.useBigInt64 === 'boolean' ? this.opts.useBigInt64 : false;
  219. this.promoteLongs = typeof this.opts.promoteLongs === 'boolean' ? this.opts.promoteLongs : true;
  220. this.promoteValues =
  221. typeof this.opts.promoteValues === 'boolean' ? this.opts.promoteValues : true;
  222. this.promoteBuffers =
  223. typeof this.opts.promoteBuffers === 'boolean' ? this.opts.promoteBuffers : false;
  224. this.bsonRegExp = typeof this.opts.bsonRegExp === 'boolean' ? this.opts.bsonRegExp : false;
  225. }
  226. isParsed() {
  227. return this.parsed;
  228. }
  229. parse() {
  230. // Don't parse again if not needed
  231. if (this.parsed)
  232. return this.sections[0];
  233. // Position within OP_REPLY at which documents start
  234. // (See https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#wire-op-reply)
  235. this.index = 20;
  236. // Read the message body
  237. this.responseFlags = this.data.readInt32LE(0);
  238. this.cursorId = new BSON.Long(this.data.readInt32LE(4), this.data.readInt32LE(8));
  239. this.startingFrom = this.data.readInt32LE(12);
  240. this.numberReturned = this.data.readInt32LE(16);
  241. if (this.numberReturned < 0 || this.numberReturned > 2 ** 32 - 1) {
  242. throw new RangeError(`OP_REPLY numberReturned is an invalid array length ${this.numberReturned}`);
  243. }
  244. this.cursorNotFound = (this.responseFlags & CURSOR_NOT_FOUND) !== 0;
  245. this.queryFailure = (this.responseFlags & QUERY_FAILURE) !== 0;
  246. this.shardConfigStale = (this.responseFlags & SHARD_CONFIG_STALE) !== 0;
  247. this.awaitCapable = (this.responseFlags & AWAIT_CAPABLE) !== 0;
  248. // Parse Body
  249. for (let i = 0; i < this.numberReturned; i++) {
  250. const bsonSize = this.data[this.index] |
  251. (this.data[this.index + 1] << 8) |
  252. (this.data[this.index + 2] << 16) |
  253. (this.data[this.index + 3] << 24);
  254. const section = this.data.subarray(this.index, this.index + bsonSize);
  255. this.sections.push(section);
  256. // Adjust the index
  257. this.index = this.index + bsonSize;
  258. }
  259. // Set parsed
  260. this.parsed = true;
  261. return this.sections[0];
  262. }
  263. }
  264. exports.OpReply = OpReply;
  265. // Msg Flags
  266. const OPTS_CHECKSUM_PRESENT = 1;
  267. const OPTS_MORE_TO_COME = 2;
  268. const OPTS_EXHAUST_ALLOWED = 1 << 16;
  269. /** @internal */
  270. class DocumentSequence {
  271. /**
  272. * Create a new document sequence for the provided field.
  273. * @param field - The field it will replace.
  274. */
  275. constructor(field, documents) {
  276. this.field = field;
  277. this.documents = [];
  278. this.chunks = [];
  279. this.serializedDocumentsLength = 0;
  280. // Document sequences starts with type 1 at the first byte.
  281. // Field strings must always be UTF-8.
  282. const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
  283. buffer[0] = 1;
  284. // Third part is the field name at offset 5 with trailing null byte.
  285. encodeUTF8Into(buffer, `${this.field}\0`, 5);
  286. this.chunks.push(buffer);
  287. this.header = buffer;
  288. if (documents) {
  289. for (const doc of documents) {
  290. this.push(doc, BSON.serialize(doc));
  291. }
  292. }
  293. }
  294. /**
  295. * Push a document to the document sequence. Will serialize the document
  296. * as well and return the current serialized length of all documents.
  297. * @param document - The document to add.
  298. * @param buffer - The serialized document in raw BSON.
  299. * @returns The new total document sequence length.
  300. */
  301. push(document, buffer) {
  302. this.serializedDocumentsLength += buffer.length;
  303. // Push the document.
  304. this.documents.push(document);
  305. // Push the document raw bson.
  306. this.chunks.push(buffer);
  307. // Write the new length.
  308. this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
  309. return this.serializedDocumentsLength + this.header.length;
  310. }
  311. /**
  312. * Get the fully serialized bytes for the document sequence section.
  313. * @returns The section bytes.
  314. */
  315. toBin() {
  316. return Buffer.concat(this.chunks);
  317. }
  318. }
  319. exports.DocumentSequence = DocumentSequence;
  320. /** @internal */
  321. class OpMsgRequest {
  322. constructor(databaseName, command, options) {
  323. // Basic options needed to be passed in
  324. if (command == null)
  325. throw new error_1.MongoInvalidArgumentError('Query document must be specified for query');
  326. // Basic optionsa
  327. this.databaseName = databaseName;
  328. this.command = command;
  329. this.command.$db = databaseName;
  330. // Ensure empty options
  331. this.options = options ?? {};
  332. // Additional options
  333. this.requestId = options.requestId ? options.requestId : OpMsgRequest.getRequestId();
  334. // Serialization option
  335. this.serializeFunctions =
  336. typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
  337. this.ignoreUndefined =
  338. typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;
  339. this.checkKeys = typeof options.checkKeys === 'boolean' ? options.checkKeys : false;
  340. this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16;
  341. // flags
  342. this.checksumPresent = false;
  343. this.moreToCome = options.moreToCome ?? command.writeConcern?.w === 0;
  344. this.exhaustAllowed =
  345. typeof options.exhaustAllowed === 'boolean' ? options.exhaustAllowed : false;
  346. }
  347. toBin() {
  348. const buffers = [];
  349. let flags = 0;
  350. if (this.checksumPresent) {
  351. flags |= OPTS_CHECKSUM_PRESENT;
  352. }
  353. if (this.moreToCome) {
  354. flags |= OPTS_MORE_TO_COME;
  355. }
  356. if (this.exhaustAllowed) {
  357. flags |= OPTS_EXHAUST_ALLOWED;
  358. }
  359. const header = Buffer.alloc(4 * 4 + // Header
  360. 4 // Flags
  361. );
  362. buffers.push(header);
  363. let totalLength = header.length;
  364. const command = this.command;
  365. totalLength += this.makeSections(buffers, command);
  366. header.writeInt32LE(totalLength, 0); // messageLength
  367. header.writeInt32LE(this.requestId, 4); // requestID
  368. header.writeInt32LE(0, 8); // responseTo
  369. header.writeInt32LE(constants_1.OP_MSG, 12); // opCode
  370. header.writeUInt32LE(flags, 16); // flags
  371. return buffers;
  372. }
  373. /**
  374. * Add the sections to the OP_MSG request's buffers and returns the length.
  375. */
  376. makeSections(buffers, document) {
  377. const sequencesBuffer = this.extractDocumentSequences(document);
  378. const payloadTypeBuffer = Buffer.allocUnsafe(1);
  379. payloadTypeBuffer[0] = 0;
  380. const documentBuffer = this.serializeBson(document);
  381. // First section, type 0
  382. buffers.push(payloadTypeBuffer);
  383. buffers.push(documentBuffer);
  384. // Subsequent sections, type 1
  385. buffers.push(sequencesBuffer);
  386. return payloadTypeBuffer.length + documentBuffer.length + sequencesBuffer.length;
  387. }
  388. /**
  389. * Extracts the document sequences from the command document and returns
  390. * a buffer to be added as multiple sections after the initial type 0
  391. * section in the message.
  392. */
  393. extractDocumentSequences(document) {
  394. // Pull out any field in the command document that's value is a document sequence.
  395. const chunks = [];
  396. for (const [key, value] of Object.entries(document)) {
  397. if (value instanceof DocumentSequence) {
  398. chunks.push(value.toBin());
  399. // Why are we removing the field from the command? This is because it needs to be
  400. // removed in the OP_MSG request first section, and DocumentSequence is not a
  401. // BSON type and is specific to the MongoDB wire protocol so there's nothing
  402. // our BSON serializer can do about this. Since DocumentSequence is not exposed
  403. // in the public API and only used internally, we are never mutating an original
  404. // command provided by the user, just our own, and it's cheaper to delete from
  405. // our own command than copying it.
  406. delete document[key];
  407. }
  408. }
  409. if (chunks.length > 0) {
  410. return Buffer.concat(chunks);
  411. }
  412. // If we have no document sequences we return an empty buffer for nothing to add
  413. // to the payload.
  414. return Buffer.alloc(0);
  415. }
  416. serializeBson(document) {
  417. return BSON.serialize(document, {
  418. checkKeys: this.checkKeys,
  419. serializeFunctions: this.serializeFunctions,
  420. ignoreUndefined: this.ignoreUndefined
  421. });
  422. }
  423. static getRequestId() {
  424. _requestId = (_requestId + 1) & 0x7fffffff;
  425. return _requestId;
  426. }
  427. }
  428. exports.OpMsgRequest = OpMsgRequest;
  429. /** @internal */
  430. class OpMsgResponse {
  431. constructor(message, msgHeader, msgBody, opts) {
  432. this.index = 0;
  433. this.sections = [];
  434. this.parsed = false;
  435. this.raw = message;
  436. this.data = msgBody;
  437. this.opts = opts ?? {
  438. useBigInt64: false,
  439. promoteLongs: true,
  440. promoteValues: true,
  441. promoteBuffers: false,
  442. bsonRegExp: false
  443. };
  444. // Read the message header
  445. this.length = msgHeader.length;
  446. this.requestId = msgHeader.requestId;
  447. this.responseTo = msgHeader.responseTo;
  448. this.opCode = msgHeader.opCode;
  449. this.fromCompressed = msgHeader.fromCompressed;
  450. // Read response flags
  451. this.responseFlags = msgBody.readInt32LE(0);
  452. this.checksumPresent = (this.responseFlags & OPTS_CHECKSUM_PRESENT) !== 0;
  453. this.moreToCome = (this.responseFlags & OPTS_MORE_TO_COME) !== 0;
  454. this.exhaustAllowed = (this.responseFlags & OPTS_EXHAUST_ALLOWED) !== 0;
  455. this.useBigInt64 = typeof this.opts.useBigInt64 === 'boolean' ? this.opts.useBigInt64 : false;
  456. this.promoteLongs = typeof this.opts.promoteLongs === 'boolean' ? this.opts.promoteLongs : true;
  457. this.promoteValues =
  458. typeof this.opts.promoteValues === 'boolean' ? this.opts.promoteValues : true;
  459. this.promoteBuffers =
  460. typeof this.opts.promoteBuffers === 'boolean' ? this.opts.promoteBuffers : false;
  461. this.bsonRegExp = typeof this.opts.bsonRegExp === 'boolean' ? this.opts.bsonRegExp : false;
  462. }
  463. isParsed() {
  464. return this.parsed;
  465. }
  466. parse() {
  467. // Don't parse again if not needed
  468. if (this.parsed)
  469. return this.sections[0];
  470. this.index = 4;
  471. while (this.index < this.data.length) {
  472. const payloadType = this.data.readUInt8(this.index++);
  473. if (payloadType === 0) {
  474. const bsonSize = this.data.readUInt32LE(this.index);
  475. const bin = this.data.subarray(this.index, this.index + bsonSize);
  476. this.sections.push(bin);
  477. this.index += bsonSize;
  478. }
  479. else if (payloadType === 1) {
  480. // It was decided that no driver makes use of payload type 1
  481. // TODO(NODE-3483): Replace with MongoDeprecationError
  482. throw new error_1.MongoRuntimeError('OP_MSG Payload Type 1 detected unsupported protocol');
  483. }
  484. }
  485. this.parsed = true;
  486. return this.sections[0];
  487. }
  488. }
  489. exports.OpMsgResponse = OpMsgResponse;
  490. const MESSAGE_HEADER_SIZE = 16;
  491. const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID
  492. /**
  493. * @internal
  494. *
  495. * An OP_COMPRESSED request wraps either an OP_QUERY or OP_MSG message.
  496. */
  497. class OpCompressedRequest {
  498. constructor(command, options) {
  499. this.command = command;
  500. this.options = {
  501. zlibCompressionLevel: options.zlibCompressionLevel,
  502. agreedCompressor: options.agreedCompressor
  503. };
  504. }
  505. // Return whether a command contains an uncompressible command term
  506. // Will return true if command contains no uncompressible command terms
  507. static canCompress(command) {
  508. const commandDoc = command instanceof OpMsgRequest ? command.command : command.query;
  509. const commandName = Object.keys(commandDoc)[0];
  510. return !compression_1.uncompressibleCommands.has(commandName);
  511. }
  512. async toBin() {
  513. const concatenatedOriginalCommandBuffer = Buffer.concat(this.command.toBin());
  514. // otherwise, compress the message
  515. const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
  516. // Extract information needed for OP_COMPRESSED from the uncompressed message
  517. const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
  518. // Compress the message body
  519. const compressedMessage = await (0, compression_1.compress)(this.options, messageToBeCompressed);
  520. // Create the msgHeader of OP_COMPRESSED
  521. const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
  522. msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, 0); // messageLength
  523. msgHeader.writeInt32LE(this.command.requestId, 4); // requestID
  524. msgHeader.writeInt32LE(0, 8); // responseTo (zero)
  525. msgHeader.writeInt32LE(constants_1.OP_COMPRESSED, 12); // opCode
  526. // Create the compression details of OP_COMPRESSED
  527. const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
  528. compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
  529. compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
  530. compressionDetails.writeUInt8(compression_1.Compressor[this.options.agreedCompressor], 8); // compressorID
  531. return [msgHeader, compressionDetails, compressedMessage];
  532. }
  533. }
  534. exports.OpCompressedRequest = OpCompressedRequest;
  535. //# sourceMappingURL=commands.js.map