// upload.js
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.GridFSBucketWriteStream = void 0;
  4. const stream_1 = require("stream");
  5. const bson_1 = require("../bson");
  6. const abstract_cursor_1 = require("../cursor/abstract_cursor");
  7. const error_1 = require("../error");
  8. const timeout_1 = require("../timeout");
  9. const utils_1 = require("../utils");
  10. const write_concern_1 = require("./../write_concern");
  11. /**
  12. * A writable stream that enables you to write buffers to GridFS.
  13. *
  14. * Do not instantiate this class directly. Use `openUploadStream()` instead.
  15. * @public
  16. */
  17. class GridFSBucketWriteStream extends stream_1.Writable {
  18. /**
  19. * @param bucket - Handle for this stream's corresponding bucket
  20. * @param filename - The value of the 'filename' key in the files doc
  21. * @param options - Optional settings.
  22. * @internal
  23. */
  24. constructor(bucket, filename, options) {
  25. super();
  26. /**
  27. * The document containing information about the inserted file.
  28. * This property is defined _after_ the finish event has been emitted.
  29. * It will remain `null` if an error occurs.
  30. *
  31. * @example
  32. * ```ts
  33. * fs.createReadStream('file.txt')
  34. * .pipe(bucket.openUploadStream('file.txt'))
  35. * .on('finish', function () {
  36. * console.log(this.gridFSFile)
  37. * })
  38. * ```
  39. */
  40. this.gridFSFile = null;
  41. options = options ?? {};
  42. this.bucket = bucket;
  43. this.chunks = bucket.s._chunksCollection;
  44. this.filename = filename;
  45. this.files = bucket.s._filesCollection;
  46. this.options = options;
  47. this.writeConcern = write_concern_1.WriteConcern.fromOptions(options) || bucket.s.options.writeConcern;
  48. // Signals the write is all done
  49. this.done = false;
  50. this.id = options.id ? options.id : new bson_1.ObjectId();
  51. // properly inherit the default chunksize from parent
  52. this.chunkSizeBytes = options.chunkSizeBytes || this.bucket.s.options.chunkSizeBytes;
  53. this.bufToStore = Buffer.alloc(this.chunkSizeBytes);
  54. this.length = 0;
  55. this.n = 0;
  56. this.pos = 0;
  57. this.state = {
  58. streamEnd: false,
  59. outstandingRequests: 0,
  60. errored: false,
  61. aborted: false
  62. };
  63. if (options.timeoutMS != null)
  64. this.timeoutContext = new timeout_1.CSOTTimeoutContext({
  65. timeoutMS: options.timeoutMS,
  66. serverSelectionTimeoutMS: (0, utils_1.resolveTimeoutOptions)(this.bucket.s.db.client, {})
  67. .serverSelectionTimeoutMS
  68. });
  69. }
  70. /**
  71. * @internal
  72. *
  73. * The stream is considered constructed when the indexes are done being created
  74. */
  75. _construct(callback) {
  76. if (!this.bucket.s.calledOpenUploadStream) {
  77. this.bucket.s.calledOpenUploadStream = true;
  78. checkIndexes(this).then(() => {
  79. this.bucket.s.checkedIndexes = true;
  80. this.bucket.emit('index');
  81. callback();
  82. }, error => {
  83. if (error instanceof error_1.MongoOperationTimeoutError) {
  84. return handleError(this, error, callback);
  85. }
  86. (0, utils_1.squashError)(error);
  87. callback();
  88. });
  89. }
  90. else {
  91. return process.nextTick(callback);
  92. }
  93. }
  94. /**
  95. * @internal
  96. * Write a buffer to the stream.
  97. *
  98. * @param chunk - Buffer to write
  99. * @param encoding - Optional encoding for the buffer
  100. * @param callback - Function to call when the chunk was added to the buffer, or if the entire chunk was persisted to MongoDB if this chunk caused a flush.
  101. */
  102. _write(chunk, encoding, callback) {
  103. doWrite(this, chunk, encoding, callback);
  104. }
  105. /** @internal */
  106. _final(callback) {
  107. if (this.state.streamEnd) {
  108. return process.nextTick(callback);
  109. }
  110. this.state.streamEnd = true;
  111. writeRemnant(this, callback);
  112. }
  113. /**
  114. * Places this write stream into an aborted state (all future writes fail)
  115. * and deletes all chunks that have already been written.
  116. */
  117. async abort() {
  118. if (this.state.streamEnd) {
  119. // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
  120. throw new error_1.MongoAPIError('Cannot abort a stream that has already completed');
  121. }
  122. if (this.state.aborted) {
  123. // TODO(NODE-3485): Replace with MongoGridFSStreamClosed
  124. throw new error_1.MongoAPIError('Cannot call abort() on a stream twice');
  125. }
  126. this.state.aborted = true;
  127. const remainingTimeMS = this.timeoutContext?.getRemainingTimeMSOrThrow(`Upload timed out after ${this.timeoutContext?.timeoutMS}ms`);
  128. await this.chunks.deleteMany({ files_id: this.id }, { timeoutMS: remainingTimeMS });
  129. }
  130. }
  131. exports.GridFSBucketWriteStream = GridFSBucketWriteStream;
  132. function handleError(stream, error, callback) {
  133. if (stream.state.errored) {
  134. process.nextTick(callback);
  135. return;
  136. }
  137. stream.state.errored = true;
  138. process.nextTick(callback, error);
  139. }
  140. function createChunkDoc(filesId, n, data) {
  141. return {
  142. _id: new bson_1.ObjectId(),
  143. files_id: filesId,
  144. n,
  145. data
  146. };
  147. }
  148. async function checkChunksIndex(stream) {
  149. const index = { files_id: 1, n: 1 };
  150. let remainingTimeMS;
  151. remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow(`Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`);
  152. let indexes;
  153. try {
  154. indexes = await stream.chunks
  155. .listIndexes({
  156. timeoutMode: remainingTimeMS != null ? abstract_cursor_1.CursorTimeoutMode.LIFETIME : undefined,
  157. timeoutMS: remainingTimeMS
  158. })
  159. .toArray();
  160. }
  161. catch (error) {
  162. if (error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound) {
  163. indexes = [];
  164. }
  165. else {
  166. throw error;
  167. }
  168. }
  169. const hasChunksIndex = !!indexes.find(index => {
  170. const keys = Object.keys(index.key);
  171. if (keys.length === 2 && index.key.files_id === 1 && index.key.n === 1) {
  172. return true;
  173. }
  174. return false;
  175. });
  176. if (!hasChunksIndex) {
  177. remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow(`Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`);
  178. await stream.chunks.createIndex(index, {
  179. ...stream.writeConcern,
  180. background: true,
  181. unique: true,
  182. timeoutMS: remainingTimeMS
  183. });
  184. }
  185. }
  186. function checkDone(stream, callback) {
  187. if (stream.done) {
  188. return process.nextTick(callback);
  189. }
  190. if (stream.state.streamEnd && stream.state.outstandingRequests === 0 && !stream.state.errored) {
  191. // Set done so we do not trigger duplicate createFilesDoc
  192. stream.done = true;
  193. // Create a new files doc
  194. const gridFSFile = createFilesDoc(stream.id, stream.length, stream.chunkSizeBytes, stream.filename, stream.options.metadata);
  195. if (isAborted(stream, callback)) {
  196. return;
  197. }
  198. const remainingTimeMS = stream.timeoutContext?.remainingTimeMS;
  199. if (remainingTimeMS != null && remainingTimeMS <= 0) {
  200. return handleError(stream, new error_1.MongoOperationTimeoutError(`Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`), callback);
  201. }
  202. stream.files
  203. .insertOne(gridFSFile, { writeConcern: stream.writeConcern, timeoutMS: remainingTimeMS })
  204. .then(() => {
  205. stream.gridFSFile = gridFSFile;
  206. callback();
  207. }, error => {
  208. return handleError(stream, error, callback);
  209. });
  210. return;
  211. }
  212. process.nextTick(callback);
  213. }
  214. async function checkIndexes(stream) {
  215. let remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow(`Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`);
  216. const doc = await stream.files.findOne({}, {
  217. projection: { _id: 1 },
  218. timeoutMS: remainingTimeMS
  219. });
  220. if (doc != null) {
  221. // If at least one document exists assume the collection has the required index
  222. return;
  223. }
  224. const index = { filename: 1, uploadDate: 1 };
  225. let indexes;
  226. remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow(`Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`);
  227. const listIndexesOptions = {
  228. timeoutMode: remainingTimeMS != null ? abstract_cursor_1.CursorTimeoutMode.LIFETIME : undefined,
  229. timeoutMS: remainingTimeMS
  230. };
  231. try {
  232. indexes = await stream.files.listIndexes(listIndexesOptions).toArray();
  233. }
  234. catch (error) {
  235. if (error instanceof error_1.MongoError && error.code === error_1.MONGODB_ERROR_CODES.NamespaceNotFound) {
  236. indexes = [];
  237. }
  238. else {
  239. throw error;
  240. }
  241. }
  242. const hasFileIndex = !!indexes.find(index => {
  243. const keys = Object.keys(index.key);
  244. if (keys.length === 2 && index.key.filename === 1 && index.key.uploadDate === 1) {
  245. return true;
  246. }
  247. return false;
  248. });
  249. if (!hasFileIndex) {
  250. remainingTimeMS = stream.timeoutContext?.getRemainingTimeMSOrThrow(`Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`);
  251. await stream.files.createIndex(index, { background: false, timeoutMS: remainingTimeMS });
  252. }
  253. await checkChunksIndex(stream);
  254. }
  255. function createFilesDoc(_id, length, chunkSize, filename, metadata) {
  256. const ret = {
  257. _id,
  258. length,
  259. chunkSize,
  260. uploadDate: new Date(),
  261. filename
  262. };
  263. if (metadata) {
  264. ret.metadata = metadata;
  265. }
  266. return ret;
  267. }
  268. function doWrite(stream, chunk, encoding, callback) {
  269. if (isAborted(stream, callback)) {
  270. return;
  271. }
  272. const inputBuf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
  273. stream.length += inputBuf.length;
  274. // Input is small enough to fit in our buffer
  275. if (stream.pos + inputBuf.length < stream.chunkSizeBytes) {
  276. inputBuf.copy(stream.bufToStore, stream.pos);
  277. stream.pos += inputBuf.length;
  278. process.nextTick(callback);
  279. return;
  280. }
  281. // Otherwise, buffer is too big for current chunk, so we need to flush
  282. // to MongoDB.
  283. let inputBufRemaining = inputBuf.length;
  284. let spaceRemaining = stream.chunkSizeBytes - stream.pos;
  285. let numToCopy = Math.min(spaceRemaining, inputBuf.length);
  286. let outstandingRequests = 0;
  287. while (inputBufRemaining > 0) {
  288. const inputBufPos = inputBuf.length - inputBufRemaining;
  289. inputBuf.copy(stream.bufToStore, stream.pos, inputBufPos, inputBufPos + numToCopy);
  290. stream.pos += numToCopy;
  291. spaceRemaining -= numToCopy;
  292. let doc;
  293. if (spaceRemaining === 0) {
  294. doc = createChunkDoc(stream.id, stream.n, Buffer.from(stream.bufToStore));
  295. const remainingTimeMS = stream.timeoutContext?.remainingTimeMS;
  296. if (remainingTimeMS != null && remainingTimeMS <= 0) {
  297. return handleError(stream, new error_1.MongoOperationTimeoutError(`Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`), callback);
  298. }
  299. ++stream.state.outstandingRequests;
  300. ++outstandingRequests;
  301. if (isAborted(stream, callback)) {
  302. return;
  303. }
  304. stream.chunks
  305. .insertOne(doc, { writeConcern: stream.writeConcern, timeoutMS: remainingTimeMS })
  306. .then(() => {
  307. --stream.state.outstandingRequests;
  308. --outstandingRequests;
  309. if (!outstandingRequests) {
  310. checkDone(stream, callback);
  311. }
  312. }, error => {
  313. return handleError(stream, error, callback);
  314. });
  315. spaceRemaining = stream.chunkSizeBytes;
  316. stream.pos = 0;
  317. ++stream.n;
  318. }
  319. inputBufRemaining -= numToCopy;
  320. numToCopy = Math.min(spaceRemaining, inputBufRemaining);
  321. }
  322. }
  323. function writeRemnant(stream, callback) {
  324. // Buffer is empty, so don't bother to insert
  325. if (stream.pos === 0) {
  326. return checkDone(stream, callback);
  327. }
  328. // Create a new buffer to make sure the buffer isn't bigger than it needs
  329. // to be.
  330. const remnant = Buffer.alloc(stream.pos);
  331. stream.bufToStore.copy(remnant, 0, 0, stream.pos);
  332. const doc = createChunkDoc(stream.id, stream.n, remnant);
  333. // If the stream was aborted, do not write remnant
  334. if (isAborted(stream, callback)) {
  335. return;
  336. }
  337. const remainingTimeMS = stream.timeoutContext?.remainingTimeMS;
  338. if (remainingTimeMS != null && remainingTimeMS <= 0) {
  339. return handleError(stream, new error_1.MongoOperationTimeoutError(`Upload timed out after ${stream.timeoutContext?.timeoutMS}ms`), callback);
  340. }
  341. ++stream.state.outstandingRequests;
  342. stream.chunks
  343. .insertOne(doc, { writeConcern: stream.writeConcern, timeoutMS: remainingTimeMS })
  344. .then(() => {
  345. --stream.state.outstandingRequests;
  346. checkDone(stream, callback);
  347. }, error => {
  348. return handleError(stream, error, callback);
  349. });
  350. }
  351. function isAborted(stream, callback) {
  352. if (stream.state.aborted) {
  353. process.nextTick(callback, new error_1.MongoAPIError('Stream has been aborted'));
  354. return true;
  355. }
  356. return false;
  357. }
//# sourceMappingURL=upload.js.map