| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.OpCompressedRequest = exports.OpMsgResponse = exports.OpMsgRequest = exports.DocumentSequence = exports.OpReply = exports.OpQueryRequest = void 0;
- const BSON = require("../bson");
- const error_1 = require("../error");
- const compression_1 = require("./wire_protocol/compression");
- const constants_1 = require("./wire_protocol/constants");
- // Incrementing request id
- let _requestId = 0;
- // Query flags
- const OPTS_TAILABLE_CURSOR = 2;
- const OPTS_SECONDARY = 4;
- const OPTS_OPLOG_REPLAY = 8;
- const OPTS_NO_CURSOR_TIMEOUT = 16;
- const OPTS_AWAIT_DATA = 32;
- const OPTS_EXHAUST = 64;
- const OPTS_PARTIAL = 128;
- // Response flags
- const CURSOR_NOT_FOUND = 1;
- const QUERY_FAILURE = 2;
- const SHARD_CONFIG_STALE = 4;
- const AWAIT_CAPABLE = 8;
- const encodeUTF8Into = BSON.BSON.onDemand.ByteUtils.encodeUTF8Into;
- /** @internal */
- class OpQueryRequest {
- constructor(databaseName, query, options) {
- /** moreToCome is an OP_MSG only concept */
- this.moreToCome = false;
- // Basic options needed to be passed in
- // TODO(NODE-3483): Replace with MongoCommandError
- const ns = `${databaseName}.$cmd`;
- if (typeof databaseName !== 'string') {
- throw new error_1.MongoRuntimeError('Database name must be a string for a query');
- }
- // TODO(NODE-3483): Replace with MongoCommandError
- if (query == null)
- throw new error_1.MongoRuntimeError('A query document must be specified for query');
- // Validate that we are not passing 0x00 in the collection name
- if (ns.indexOf('\x00') !== -1) {
- // TODO(NODE-3483): Use MongoNamespace static method
- throw new error_1.MongoRuntimeError('Namespace cannot contain a null character');
- }
- // Basic optionsa
- this.databaseName = databaseName;
- this.query = query;
- this.ns = ns;
- // Additional options
- this.numberToSkip = options.numberToSkip || 0;
- this.numberToReturn = options.numberToReturn || 0;
- this.returnFieldSelector = options.returnFieldSelector || undefined;
- this.requestId = options.requestId ?? OpQueryRequest.getRequestId();
- // special case for pre-3.2 find commands, delete ASAP
- this.pre32Limit = options.pre32Limit;
- // Serialization option
- this.serializeFunctions =
- typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
- this.ignoreUndefined =
- typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;
- this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16;
- this.checkKeys = typeof options.checkKeys === 'boolean' ? options.checkKeys : false;
- this.batchSize = this.numberToReturn;
- // Flags
- this.tailable = false;
- this.secondaryOk = typeof options.secondaryOk === 'boolean' ? options.secondaryOk : false;
- this.oplogReplay = false;
- this.noCursorTimeout = false;
- this.awaitData = false;
- this.exhaust = false;
- this.partial = false;
- }
- /** Assign next request Id. */
- incRequestId() {
- this.requestId = _requestId++;
- }
- /** Peek next request Id. */
- nextRequestId() {
- return _requestId + 1;
- }
- /** Increment then return next request Id. */
- static getRequestId() {
- return ++_requestId;
- }
- // Uses a single allocated buffer for the process, avoiding multiple memory allocations
- toBin() {
- const buffers = [];
- let projection = null;
- // Set up the flags
- let flags = 0;
- if (this.tailable) {
- flags |= OPTS_TAILABLE_CURSOR;
- }
- if (this.secondaryOk) {
- flags |= OPTS_SECONDARY;
- }
- if (this.oplogReplay) {
- flags |= OPTS_OPLOG_REPLAY;
- }
- if (this.noCursorTimeout) {
- flags |= OPTS_NO_CURSOR_TIMEOUT;
- }
- if (this.awaitData) {
- flags |= OPTS_AWAIT_DATA;
- }
- if (this.exhaust) {
- flags |= OPTS_EXHAUST;
- }
- if (this.partial) {
- flags |= OPTS_PARTIAL;
- }
- // If batchSize is different to this.numberToReturn
- if (this.batchSize !== this.numberToReturn)
- this.numberToReturn = this.batchSize;
- // Allocate write protocol header buffer
- const header = Buffer.alloc(4 * 4 + // Header
- 4 + // Flags
- Buffer.byteLength(this.ns) +
- 1 + // namespace
- 4 + // numberToSkip
- 4 // numberToReturn
- );
- // Add header to buffers
- buffers.push(header);
- // Serialize the query
- const query = BSON.serialize(this.query, {
- checkKeys: this.checkKeys,
- serializeFunctions: this.serializeFunctions,
- ignoreUndefined: this.ignoreUndefined
- });
- // Add query document
- buffers.push(query);
- if (this.returnFieldSelector && Object.keys(this.returnFieldSelector).length > 0) {
- // Serialize the projection document
- projection = BSON.serialize(this.returnFieldSelector, {
- checkKeys: this.checkKeys,
- serializeFunctions: this.serializeFunctions,
- ignoreUndefined: this.ignoreUndefined
- });
- // Add projection document
- buffers.push(projection);
- }
- // Total message size
- const totalLength = header.length + query.length + (projection ? projection.length : 0);
- // Set up the index
- let index = 4;
- // Write total document length
- header[3] = (totalLength >> 24) & 0xff;
- header[2] = (totalLength >> 16) & 0xff;
- header[1] = (totalLength >> 8) & 0xff;
- header[0] = totalLength & 0xff;
- // Write header information requestId
- header[index + 3] = (this.requestId >> 24) & 0xff;
- header[index + 2] = (this.requestId >> 16) & 0xff;
- header[index + 1] = (this.requestId >> 8) & 0xff;
- header[index] = this.requestId & 0xff;
- index = index + 4;
- // Write header information responseTo
- header[index + 3] = (0 >> 24) & 0xff;
- header[index + 2] = (0 >> 16) & 0xff;
- header[index + 1] = (0 >> 8) & 0xff;
- header[index] = 0 & 0xff;
- index = index + 4;
- // Write header information OP_QUERY
- header[index + 3] = (constants_1.OP_QUERY >> 24) & 0xff;
- header[index + 2] = (constants_1.OP_QUERY >> 16) & 0xff;
- header[index + 1] = (constants_1.OP_QUERY >> 8) & 0xff;
- header[index] = constants_1.OP_QUERY & 0xff;
- index = index + 4;
- // Write header information flags
- header[index + 3] = (flags >> 24) & 0xff;
- header[index + 2] = (flags >> 16) & 0xff;
- header[index + 1] = (flags >> 8) & 0xff;
- header[index] = flags & 0xff;
- index = index + 4;
- // Write collection name
- index = index + header.write(this.ns, index, 'utf8') + 1;
- header[index - 1] = 0;
- // Write header information flags numberToSkip
- header[index + 3] = (this.numberToSkip >> 24) & 0xff;
- header[index + 2] = (this.numberToSkip >> 16) & 0xff;
- header[index + 1] = (this.numberToSkip >> 8) & 0xff;
- header[index] = this.numberToSkip & 0xff;
- index = index + 4;
- // Write header information flags numberToReturn
- header[index + 3] = (this.numberToReturn >> 24) & 0xff;
- header[index + 2] = (this.numberToReturn >> 16) & 0xff;
- header[index + 1] = (this.numberToReturn >> 8) & 0xff;
- header[index] = this.numberToReturn & 0xff;
- index = index + 4;
- // Return the buffers
- return buffers;
- }
- }
- exports.OpQueryRequest = OpQueryRequest;
- /** @internal */
- class OpReply {
- constructor(message, msgHeader, msgBody, opts) {
- this.index = 0;
- this.sections = [];
- /** moreToCome is an OP_MSG only concept */
- this.moreToCome = false;
- this.parsed = false;
- this.raw = message;
- this.data = msgBody;
- this.opts = opts ?? {
- useBigInt64: false,
- promoteLongs: true,
- promoteValues: true,
- promoteBuffers: false,
- bsonRegExp: false
- };
- // Read the message header
- this.length = msgHeader.length;
- this.requestId = msgHeader.requestId;
- this.responseTo = msgHeader.responseTo;
- this.opCode = msgHeader.opCode;
- this.fromCompressed = msgHeader.fromCompressed;
- // Flag values
- this.useBigInt64 = typeof this.opts.useBigInt64 === 'boolean' ? this.opts.useBigInt64 : false;
- this.promoteLongs = typeof this.opts.promoteLongs === 'boolean' ? this.opts.promoteLongs : true;
- this.promoteValues =
- typeof this.opts.promoteValues === 'boolean' ? this.opts.promoteValues : true;
- this.promoteBuffers =
- typeof this.opts.promoteBuffers === 'boolean' ? this.opts.promoteBuffers : false;
- this.bsonRegExp = typeof this.opts.bsonRegExp === 'boolean' ? this.opts.bsonRegExp : false;
- }
- isParsed() {
- return this.parsed;
- }
- parse() {
- // Don't parse again if not needed
- if (this.parsed)
- return this.sections[0];
- // Position within OP_REPLY at which documents start
- // (See https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#wire-op-reply)
- this.index = 20;
- // Read the message body
- this.responseFlags = this.data.readInt32LE(0);
- this.cursorId = new BSON.Long(this.data.readInt32LE(4), this.data.readInt32LE(8));
- this.startingFrom = this.data.readInt32LE(12);
- this.numberReturned = this.data.readInt32LE(16);
- if (this.numberReturned < 0 || this.numberReturned > 2 ** 32 - 1) {
- throw new RangeError(`OP_REPLY numberReturned is an invalid array length ${this.numberReturned}`);
- }
- this.cursorNotFound = (this.responseFlags & CURSOR_NOT_FOUND) !== 0;
- this.queryFailure = (this.responseFlags & QUERY_FAILURE) !== 0;
- this.shardConfigStale = (this.responseFlags & SHARD_CONFIG_STALE) !== 0;
- this.awaitCapable = (this.responseFlags & AWAIT_CAPABLE) !== 0;
- // Parse Body
- for (let i = 0; i < this.numberReturned; i++) {
- const bsonSize = this.data[this.index] |
- (this.data[this.index + 1] << 8) |
- (this.data[this.index + 2] << 16) |
- (this.data[this.index + 3] << 24);
- const section = this.data.subarray(this.index, this.index + bsonSize);
- this.sections.push(section);
- // Adjust the index
- this.index = this.index + bsonSize;
- }
- // Set parsed
- this.parsed = true;
- return this.sections[0];
- }
- }
- exports.OpReply = OpReply;
- // Msg Flags
- const OPTS_CHECKSUM_PRESENT = 1;
- const OPTS_MORE_TO_COME = 2;
- const OPTS_EXHAUST_ALLOWED = 1 << 16;
- /** @internal */
- class DocumentSequence {
- /**
- * Create a new document sequence for the provided field.
- * @param field - The field it will replace.
- */
- constructor(field, documents) {
- this.field = field;
- this.documents = [];
- this.chunks = [];
- this.serializedDocumentsLength = 0;
- // Document sequences starts with type 1 at the first byte.
- // Field strings must always be UTF-8.
- const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
- buffer[0] = 1;
- // Third part is the field name at offset 5 with trailing null byte.
- encodeUTF8Into(buffer, `${this.field}\0`, 5);
- this.chunks.push(buffer);
- this.header = buffer;
- if (documents) {
- for (const doc of documents) {
- this.push(doc, BSON.serialize(doc));
- }
- }
- }
- /**
- * Push a document to the document sequence. Will serialize the document
- * as well and return the current serialized length of all documents.
- * @param document - The document to add.
- * @param buffer - The serialized document in raw BSON.
- * @returns The new total document sequence length.
- */
- push(document, buffer) {
- this.serializedDocumentsLength += buffer.length;
- // Push the document.
- this.documents.push(document);
- // Push the document raw bson.
- this.chunks.push(buffer);
- // Write the new length.
- this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
- return this.serializedDocumentsLength + this.header.length;
- }
- /**
- * Get the fully serialized bytes for the document sequence section.
- * @returns The section bytes.
- */
- toBin() {
- return Buffer.concat(this.chunks);
- }
- }
- exports.DocumentSequence = DocumentSequence;
- /** @internal */
- class OpMsgRequest {
- constructor(databaseName, command, options) {
- // Basic options needed to be passed in
- if (command == null)
- throw new error_1.MongoInvalidArgumentError('Query document must be specified for query');
- // Basic optionsa
- this.databaseName = databaseName;
- this.command = command;
- this.command.$db = databaseName;
- // Ensure empty options
- this.options = options ?? {};
- // Additional options
- this.requestId = options.requestId ? options.requestId : OpMsgRequest.getRequestId();
- // Serialization option
- this.serializeFunctions =
- typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
- this.ignoreUndefined =
- typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;
- this.checkKeys = typeof options.checkKeys === 'boolean' ? options.checkKeys : false;
- this.maxBsonSize = options.maxBsonSize || 1024 * 1024 * 16;
- // flags
- this.checksumPresent = false;
- this.moreToCome = options.moreToCome ?? command.writeConcern?.w === 0;
- this.exhaustAllowed =
- typeof options.exhaustAllowed === 'boolean' ? options.exhaustAllowed : false;
- }
- toBin() {
- const buffers = [];
- let flags = 0;
- if (this.checksumPresent) {
- flags |= OPTS_CHECKSUM_PRESENT;
- }
- if (this.moreToCome) {
- flags |= OPTS_MORE_TO_COME;
- }
- if (this.exhaustAllowed) {
- flags |= OPTS_EXHAUST_ALLOWED;
- }
- const header = Buffer.alloc(4 * 4 + // Header
- 4 // Flags
- );
- buffers.push(header);
- let totalLength = header.length;
- const command = this.command;
- totalLength += this.makeSections(buffers, command);
- header.writeInt32LE(totalLength, 0); // messageLength
- header.writeInt32LE(this.requestId, 4); // requestID
- header.writeInt32LE(0, 8); // responseTo
- header.writeInt32LE(constants_1.OP_MSG, 12); // opCode
- header.writeUInt32LE(flags, 16); // flags
- return buffers;
- }
- /**
- * Add the sections to the OP_MSG request's buffers and returns the length.
- */
- makeSections(buffers, document) {
- const sequencesBuffer = this.extractDocumentSequences(document);
- const payloadTypeBuffer = Buffer.allocUnsafe(1);
- payloadTypeBuffer[0] = 0;
- const documentBuffer = this.serializeBson(document);
- // First section, type 0
- buffers.push(payloadTypeBuffer);
- buffers.push(documentBuffer);
- // Subsequent sections, type 1
- buffers.push(sequencesBuffer);
- return payloadTypeBuffer.length + documentBuffer.length + sequencesBuffer.length;
- }
- /**
- * Extracts the document sequences from the command document and returns
- * a buffer to be added as multiple sections after the initial type 0
- * section in the message.
- */
- extractDocumentSequences(document) {
- // Pull out any field in the command document that's value is a document sequence.
- const chunks = [];
- for (const [key, value] of Object.entries(document)) {
- if (value instanceof DocumentSequence) {
- chunks.push(value.toBin());
- // Why are we removing the field from the command? This is because it needs to be
- // removed in the OP_MSG request first section, and DocumentSequence is not a
- // BSON type and is specific to the MongoDB wire protocol so there's nothing
- // our BSON serializer can do about this. Since DocumentSequence is not exposed
- // in the public API and only used internally, we are never mutating an original
- // command provided by the user, just our own, and it's cheaper to delete from
- // our own command than copying it.
- delete document[key];
- }
- }
- if (chunks.length > 0) {
- return Buffer.concat(chunks);
- }
- // If we have no document sequences we return an empty buffer for nothing to add
- // to the payload.
- return Buffer.alloc(0);
- }
- serializeBson(document) {
- return BSON.serialize(document, {
- checkKeys: this.checkKeys,
- serializeFunctions: this.serializeFunctions,
- ignoreUndefined: this.ignoreUndefined
- });
- }
- static getRequestId() {
- _requestId = (_requestId + 1) & 0x7fffffff;
- return _requestId;
- }
- }
- exports.OpMsgRequest = OpMsgRequest;
- /** @internal */
- class OpMsgResponse {
- constructor(message, msgHeader, msgBody, opts) {
- this.index = 0;
- this.sections = [];
- this.parsed = false;
- this.raw = message;
- this.data = msgBody;
- this.opts = opts ?? {
- useBigInt64: false,
- promoteLongs: true,
- promoteValues: true,
- promoteBuffers: false,
- bsonRegExp: false
- };
- // Read the message header
- this.length = msgHeader.length;
- this.requestId = msgHeader.requestId;
- this.responseTo = msgHeader.responseTo;
- this.opCode = msgHeader.opCode;
- this.fromCompressed = msgHeader.fromCompressed;
- // Read response flags
- this.responseFlags = msgBody.readInt32LE(0);
- this.checksumPresent = (this.responseFlags & OPTS_CHECKSUM_PRESENT) !== 0;
- this.moreToCome = (this.responseFlags & OPTS_MORE_TO_COME) !== 0;
- this.exhaustAllowed = (this.responseFlags & OPTS_EXHAUST_ALLOWED) !== 0;
- this.useBigInt64 = typeof this.opts.useBigInt64 === 'boolean' ? this.opts.useBigInt64 : false;
- this.promoteLongs = typeof this.opts.promoteLongs === 'boolean' ? this.opts.promoteLongs : true;
- this.promoteValues =
- typeof this.opts.promoteValues === 'boolean' ? this.opts.promoteValues : true;
- this.promoteBuffers =
- typeof this.opts.promoteBuffers === 'boolean' ? this.opts.promoteBuffers : false;
- this.bsonRegExp = typeof this.opts.bsonRegExp === 'boolean' ? this.opts.bsonRegExp : false;
- }
- isParsed() {
- return this.parsed;
- }
- parse() {
- // Don't parse again if not needed
- if (this.parsed)
- return this.sections[0];
- this.index = 4;
- while (this.index < this.data.length) {
- const payloadType = this.data.readUInt8(this.index++);
- if (payloadType === 0) {
- const bsonSize = this.data.readUInt32LE(this.index);
- const bin = this.data.subarray(this.index, this.index + bsonSize);
- this.sections.push(bin);
- this.index += bsonSize;
- }
- else if (payloadType === 1) {
- // It was decided that no driver makes use of payload type 1
- // TODO(NODE-3483): Replace with MongoDeprecationError
- throw new error_1.MongoRuntimeError('OP_MSG Payload Type 1 detected unsupported protocol');
- }
- }
- this.parsed = true;
- return this.sections[0];
- }
- }
- exports.OpMsgResponse = OpMsgResponse;
- const MESSAGE_HEADER_SIZE = 16;
- const COMPRESSION_DETAILS_SIZE = 9; // originalOpcode + uncompressedSize, compressorID
- /**
- * @internal
- *
- * An OP_COMPRESSED request wraps either an OP_QUERY or OP_MSG message.
- */
- class OpCompressedRequest {
- constructor(command, options) {
- this.command = command;
- this.options = {
- zlibCompressionLevel: options.zlibCompressionLevel,
- agreedCompressor: options.agreedCompressor
- };
- }
- // Return whether a command contains an uncompressible command term
- // Will return true if command contains no uncompressible command terms
- static canCompress(command) {
- const commandDoc = command instanceof OpMsgRequest ? command.command : command.query;
- const commandName = Object.keys(commandDoc)[0];
- return !compression_1.uncompressibleCommands.has(commandName);
- }
- async toBin() {
- const concatenatedOriginalCommandBuffer = Buffer.concat(this.command.toBin());
- // otherwise, compress the message
- const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);
- // Extract information needed for OP_COMPRESSED from the uncompressed message
- const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);
- // Compress the message body
- const compressedMessage = await (0, compression_1.compress)(this.options, messageToBeCompressed);
- // Create the msgHeader of OP_COMPRESSED
- const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
- msgHeader.writeInt32LE(MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, 0); // messageLength
- msgHeader.writeInt32LE(this.command.requestId, 4); // requestID
- msgHeader.writeInt32LE(0, 8); // responseTo (zero)
- msgHeader.writeInt32LE(constants_1.OP_COMPRESSED, 12); // opCode
- // Create the compression details of OP_COMPRESSED
- const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
- compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
- compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
- compressionDetails.writeUInt8(compression_1.Compressor[this.options.agreedCompressor], 8); // compressorID
- return [msgHeader, compressionDetails, compressedMessage];
- }
- }
- exports.OpCompressedRequest = OpCompressedRequest;
- //# sourceMappingURL=commands.js.map
|