"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.FsaNodeWriteStream = void 0; const stream_1 = require("stream"); const Defer_1 = require("../thingies/Defer"); const concurrency_1 = require("../thingies/concurrency"); const util_1 = require("../node/util"); const queueMicrotask_1 = require("../queueMicrotask"); /** * This WriteStream implementation does not build on top of the `fs` module, * but instead uses the lower-level `FileSystemFileHandle` interface. The reason * is the different semantics in `fs` and FSA (File System Access API) write streams. * * When data is written to an FSA file, a new FSA stream is created, it copies * the file to a temporary swap file. After each written chunk, that swap file * is closed and the original file is replaced with the swap file. This means, * if WriteStream was built on top of `fs`, each chunk write would result in * a file copy, write, close, rename operations, which is not what we want. * * Instead this implementation hooks into the lower-level and closes the swap * file only once the stream is closed. The downside is that the written data * is not immediately visible to other processes (because it is written to the * swap file), but that is the trade-off we have to make. * * @todo Could make this flush the data to the original file periodically, so that * the data is visible to other processes. * @todo This stream could work through `FileSystemSyncAccessHandle.write` in a * Worker thread instead. */ class FsaNodeWriteStream extends stream_1.Writable { constructor(handle, path, options) { super(); this.path = path; this.options = options; this.__pending__ = true; this.__closed__ = false; this.__bytes__ = 0; this.__mutex__ = (0, concurrency_1.concurrency)(1); if (options.start !== undefined) { if (typeof options.start !== 'number') { throw new TypeError('"start" option must be a Number'); } if (options.start < 0) { throw new TypeError('"start" must be >= zero'); } } const stream = new Defer_1.Defer(); this.__stream__ = stream.promise; (async () => { var _a, _b; const fsaHandle = await handle; const fileWasOpened = !options.fd; if (fileWasOpened) this.emit('open', fsaHandle.fd); const flags = (0, util_1.flagsToNumber)((_a = options.flags) !== null && _a !== void 0 ? _a : 'w'); const keepExistingData = flags & 1024 /* FLAG.O_APPEND */ ? true : false; const writable = await fsaHandle.file.createWritable({ keepExistingData }); if (keepExistingData) { const start = Number((_b = options.start) !== null && _b !== void 0 ? _b : 0); if (start) await writable.seek(start); } this.__pending__ = false; stream.resolve(writable); })().catch(error => { stream.reject(error); }); } async ___write___(buffers) { await this.__mutex__(async () => { if (this.__closed__) return; // if (this.__closed__) throw new Error('WriteStream is closed'); const writable = await this.__stream__; for (const buffer of buffers) { await writable.write(buffer); this.__bytes__ += buffer.byteLength; } }); } async __close__() { const emitClose = this.options.emitClose; await this.__mutex__(async () => { if (this.__closed__ && emitClose) { (0, queueMicrotask_1.default)(() => this.emit('close')); return; } try { const writable = await this.__stream__; this.__closed__ = true; await writable.close(); if (emitClose) this.emit('close'); } catch (error) { this.emit('error', error); if (emitClose) this.emit('close', error); } }); } // ------------------------------------------------------------- IWriteStream get bytesWritten() { return this.__bytes__; } get pending() { return this.__pending__; } close(cb) { if (cb) this.once('close', cb); this.__close__().catch(() => { }); } // ----------------------------------------------------------------- Writable _write(chunk, encoding, callback) { this.___write___([chunk]) .then(() => { if (callback) callback(null); }) .catch(error => { if (callback) callback(error); }); } _writev(chunks, callback) { const buffers = chunks.map(({ chunk }) => chunk); this.___write___(buffers) .then(() => { if (callback) callback(null); }) .catch(error => { if (callback) callback(error); }); } _final(callback) { this.__close__() .then(() => { if (callback) callback(null); }) .catch(error => { if (callback) callback(error); }); } } exports.FsaNodeWriteStream = FsaNodeWriteStream; //# sourceMappingURL=FsaNodeWriteStream.js.map