diff --git a/example/tests/benchmark.tsx b/example/tests/benchmark.tsx index 7726fca..29dd7bb 100644 --- a/example/tests/benchmark.tsx +++ b/example/tests/benchmark.tsx @@ -76,7 +76,7 @@ const TESTS = [ }, ] -const BASE_URL = Platform.OS === 'android' ? 'ws://10.0.2.2' : 'ws://localhost' +export const BASE_URL = Platform.OS === 'android' ? '10.0.2.2' : 'localhost' export function BenchmarkUI() { return ( @@ -278,7 +278,7 @@ async function runSingleTest(opts: TestCase): Promise { const { Ws, port, messageCount, testCase, payload } = opts return new Promise((resolve) => { - const ws = new Ws(`${BASE_URL}:${port}`) + const ws = new Ws(`ws://${BASE_URL}:${port}`) let outgoingTime: number let incomingTime: number let received = 0 diff --git a/example/tests/filesystem.tsx b/example/tests/filesystem.tsx index 67bfbcb..52f6a2c 100644 --- a/example/tests/filesystem.tsx +++ b/example/tests/filesystem.tsx @@ -7,6 +7,8 @@ import { showOpenFilePicker, } from 'react-native-fast-io' +import { BASE_URL } from './benchmark' + export function FileSystemUI() { const [file, setFile] = useState(null) @@ -24,7 +26,7 @@ export function FileSystemUI() { const body = compression ? file.stream().pipeThrough(new CompressionStream(compression)) : file - await fetch('http://localhost:3002/upload', { + await fetch(`http://${BASE_URL}:3002/upload`, { method: 'POST', body, }) @@ -32,7 +34,7 @@ export function FileSystemUI() { const logContents = async () => { for await (const chunk of file!.stream()) { - console.log(chunk) + console.log('Chunk length: ', chunk.length) } } diff --git a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridDuplexStream.kt b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridDuplexStream.kt index 7a88210..4f0f088 100644 --- a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridDuplexStream.kt +++ b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridDuplexStream.kt @@ -1,14 +1,24 @@ package com.margelo.nitro.fastio -class HybridDuplexStream : HybridDuplexStreamSpec() { - override var inputStream: HybridInputStreamSpec - get() = throw NotImplementedError("HybridDuplexStream.inputStream getter not implemented") - set(_) = throw NotImplementedError("HybridDuplexStream.inputStream setter not implemented") +import java.io.PipedInputStream +import java.io.PipedOutputStream - override var outputStream: HybridOutputStreamSpec - get() = throw NotImplementedError("HybridDuplexStream.outputStream getter not implemented") - set(_) = throw NotImplementedError("HybridDuplexStream.outputStream setter not implemented") +class HybridDuplexStream : HybridDuplexStreamSpec() { + private val pipedInputStream = PipedInputStream(HybridStreamFactory.BUFFER_SIZE) + private val pipedOutputStream = PipedOutputStream(pipedInputStream) + override var inputStream = HybridInputStream(pipedInputStream) as HybridInputStreamSpec + override var outputStream = HybridOutputStream(pipedOutputStream) as HybridOutputStreamSpec + override val memorySize: Long - get() = 0L + get() = inputStream.memorySize + outputStream.memorySize + + fun close() { + try { + outputStream.close() + inputStream.close() + } catch (e: Exception) { + println("Error closing duplex stream: ${e.message}") + } + } } diff --git a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridInputStream.kt b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridInputStream.kt index 1835dca..32567f0 100644 --- a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridInputStream.kt +++ b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridInputStream.kt @@ -1,44 +1,42 @@ package com.margelo.nitro.fastio import com.margelo.nitro.core.ArrayBuffer +import com.margelo.nitro.core.Promise import java.io.InputStream -class HybridInputStream(private val stream: InputStream) : HybridInputStreamSpec() { - private var isOpen = true - - override fun hasBytesAvailable(): Boolean { - if (!isOpen) return false - - return try { - stream.available() > 0 - } catch (e: Exception) { - false +class HybridInputStream(public val stream: InputStream) : HybridInputStreamSpec() { + override fun read(): Promise { + return Promise.async { + val bytes = ByteArray(HybridStreamFactory.BUFFER_SIZE) + val bytesRead = stream.read(bytes, 0, bytes.size) + + when { + bytesRead == -1 -> { + // End of stream + ArrayBuffer.allocate(0) + } + bytesRead > 0 -> { + val arrayBuffer = ArrayBuffer.allocate(bytesRead) + + val destBuffer = arrayBuffer.getBuffer(false) + destBuffer.put(bytes, 0, bytesRead) + + arrayBuffer + } + else -> { + // Error case + throw Error("Unexpected error reading stream") + } + } } } - override fun read(buffer: ArrayBuffer, maxLength: Double): Double { - val byteBuffer = buffer.getBuffer(false) - - val tempBuffer = ByteArray(minOf(maxLength.toInt(), buffer.size)) - - val bytesRead = stream.read(tempBuffer, 0, tempBuffer.size) - - if (bytesRead > 0) { - byteBuffer.put(tempBuffer, 0, bytesRead) - } - - return bytesRead.toDouble() - } - override fun open() { - // no-op + // No explicit open needed for Java InputStreams } override fun close() { - if (!isOpen) return - stream.close() - isOpen = false } override val memorySize: Long diff --git a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridNetwork.kt b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridNetwork.kt index 96dd114..b5c7e56 100644 --- a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridNetwork.kt +++ b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridNetwork.kt @@ -1,10 +1,44 @@ package com.margelo.nitro.fastio +import android.util.Log import com.margelo.nitro.core.Promise +import java.net.HttpURLConnection +import java.net.URL class HybridNetwork : HybridNetworkSpec() { override fun request(opts: RequestOptions): Promise { - throw NotImplementedError("HybridNetwork.request() not implemented") + return Promise.async { + val connection = URL(opts.url).openConnection() as HttpURLConnection + + connection.apply { + requestMethod = opts.method.name.uppercase() + doInput = true + + opts.body?.let { hybridStream -> + (hybridStream as HybridInputStream).stream.use { input -> + outputStream.use { output -> + val buffer = ByteArray(HybridStreamFactory.BUFFER_SIZE) + var bytesRead: Int + + while (input.read(buffer).also { bytesRead = it } != -1) { + output.write(buffer, 0, bytesRead) + output.flush() // Important: flush each chunk + } + } + } + } + + connect() + + if (responseCode in 200..299) { + // tbd + } else { + throw Error("HTTP Error: $responseCode") + } + } + + connection.disconnect() + } } override val memorySize: Long diff --git a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridOutputStream.kt b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridOutputStream.kt index c47b684..b0c9a85 100644 --- a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridOutputStream.kt +++ b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridOutputStream.kt @@ -1,22 +1,32 @@ package com.margelo.nitro.fastio import com.margelo.nitro.core.ArrayBuffer +import com.margelo.nitro.core.Promise +import java.io.OutputStream -class HybridOutputStream : HybridOutputStreamSpec() { - override fun hasSpaceAvailable(): Boolean { - throw NotImplementedError("HybridOutputStream.hasSpaceAvailable() not implemented") - } +class HybridOutputStream(private val stream: OutputStream) : HybridOutputStreamSpec() { + + override fun write(buffer: ArrayBuffer): Promise { + val byteBuffer = buffer.getBuffer(false) + val bytes = ByteArray(buffer.size) + byteBuffer.get(bytes) - override fun write(buffer: ArrayBuffer, maxLength: Double): Double { - throw NotImplementedError("HybridOutputStream.write() not implemented") + return Promise.async { + stream.write(bytes) + } } override fun open() { - throw NotImplementedError("HybridOutputStream.open() not implemented") + // No explicit open needed for Java OutputStreams } override fun close() { - throw NotImplementedError("HybridOutputStream.close() not implemented") + try { + stream.flush() + stream.close() + } catch (e: Exception) { + println("Error closing stream: ${e.message}") + } } override val memorySize: Long diff --git a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridStreamFactory.kt b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridStreamFactory.kt index d13a733..e6c1cbf 100644 --- a/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridStreamFactory.kt +++ b/packages/react-native-fast-io/android/src/main/java/com/margelo/nitro/fastio/HybridStreamFactory.kt @@ -22,6 +22,9 @@ class HybridStreamFactory : HybridStreamFactorySpec() { get() = 0L companion object { + @JvmStatic + val BUFFER_SIZE: Int = getBufferSize() + @JvmStatic private external fun getBufferSize(): Int } diff --git a/packages/react-native-fast-io/ios/HybridDuplexStream.swift b/packages/react-native-fast-io/ios/HybridDuplexStream.swift index 0fdaffb..749ee3c 100644 --- a/packages/react-native-fast-io/ios/HybridDuplexStream.swift +++ b/packages/react-native-fast-io/ios/HybridDuplexStream.swift @@ -12,10 +12,10 @@ class HybridDuplexStream : HybridDuplexStreamSpec { var outputStream: (any HybridOutputStreamSpec) init() { - var inputStreamRef: InputStream? = InputStream() + var inputStreamRef: InputStream? = InputStream() var outputStreamRef: OutputStream? = OutputStream(toMemory: ()) - Stream.getBoundStreams(withBufferSize: HybridStreamFactory.BUFFER_SIZE, inputStream: &inputStreamRef, outputStream: &outputStreamRef) + Stream.getBoundStreams(withBufferSize: Int(HybridStreamFactory.BUFFER_SIZE), inputStream: &inputStreamRef, outputStream: &outputStreamRef) guard let inputStreamRef, let outputStreamRef else { fatalError("Could not create streams") diff --git a/packages/react-native-fast-io/ios/HybridInputStream.swift b/packages/react-native-fast-io/ios/HybridInputStream.swift index 93cb32a..c99f527 100644 --- a/packages/react-native-fast-io/ios/HybridInputStream.swift +++ b/packages/react-native-fast-io/ios/HybridInputStream.swift @@ -15,16 +15,32 @@ class HybridInputStream : HybridInputStreamSpec { self.stream = stream } - func hasBytesAvailable() throws -> Bool { - stream.hasBytesAvailable - } - func open() throws -> Void { stream.open() } - func read(buffer: ArrayBufferHolder, maxLength: Double) throws -> Double { - Double(stream.read(buffer.data, maxLength: Int(maxLength))) + func read() throws -> Promise { + let promise = Promise() + + Task { + let size = Int(HybridStreamFactory.BUFFER_SIZE) + let data = UnsafeMutablePointer.allocate(capacity: size) + + let bytesRead = stream.read(data, maxLength: size) + + let deleteFunc = { + data.deallocate() + } + + if (bytesRead >= 0) { + promise.resolve(withResult: ArrayBufferHolder.wrap(dataWithoutCopy: data, size: bytesRead, onDelete: deleteFunc)) + } else { + deleteFunc() + promise.reject(withError: stream.streamError ?? RuntimeError.error(withMessage: "Unexpected error reading stream")) + } + } + + return promise } func close() { diff --git a/packages/react-native-fast-io/ios/HybridOutputStream.swift b/packages/react-native-fast-io/ios/HybridOutputStream.swift index fc8a7e8..1b182fa 100644 --- a/packages/react-native-fast-io/ios/HybridOutputStream.swift +++ b/packages/react-native-fast-io/ios/HybridOutputStream.swift @@ -15,16 +15,26 @@ class HybridOutputStream : HybridOutputStreamSpec { self.stream = stream } - func hasSpaceAvailable() throws -> Bool { - stream.hasSpaceAvailable - } - func open() throws -> Void { stream.open() } - func write(buffer: ArrayBufferHolder, maxLength: Double) throws -> Double { - Double(stream.write(buffer.data, maxLength: Int(maxLength))) + func write(buffer: ArrayBufferHolder) throws -> Promise { + let promise = Promise() + + let data = buffer.data + let length = buffer.size + + Task { + let bytesWritten = stream.write(data, maxLength: length) + if (bytesWritten == length) { + promise.resolve(withResult: ()) + } else { + promise.reject(withError: stream.streamError ?? RuntimeError.error(withMessage: "Unexpected error writing to stream")) + } + } + + return promise } func close() { diff --git a/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.cpp b/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.cpp index e16a52a..73486e7 100644 --- a/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.cpp +++ b/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.cpp @@ -10,7 +10,9 @@ // Forward declaration of `ArrayBuffer` to properly resolve imports. namespace NitroModules { class ArrayBuffer; } +#include #include +#include #include namespace margelo::nitro::fastio { @@ -34,15 +36,21 @@ namespace margelo::nitro::fastio { // Methods - bool JHybridInputStreamSpec::hasBytesAvailable() { - static const auto method = _javaPart->getClass()->getMethod("hasBytesAvailable"); + std::future> JHybridInputStreamSpec::read() { + static const auto method = _javaPart->getClass()->getMethod()>("read"); auto __result = method(_javaPart); - return static_cast(__result); - } - double JHybridInputStreamSpec::read(const std::shared_ptr& buffer, double maxLength) { - static const auto method = _javaPart->getClass()->getMethod /* buffer */, double /* maxLength */)>("read"); - auto __result = method(_javaPart, JArrayBuffer::wrap(buffer), maxLength); - return __result; + return [&]() { + auto __promise = std::make_shared>>(); + __result->cthis()->addOnResolvedListener([=](const jni::alias_ref& __boxedResult) { + auto __result = jni::static_ref_cast(__boxedResult); + __promise->set_value(__result->cthis()->getArrayBuffer()); + }); + __result->cthis()->addOnRejectedListener([=](const jni::alias_ref& __message) { + std::runtime_error __error(__message->toStdString()); + __promise->set_exception(std::make_exception_ptr(__error)); + }); + return __promise->get_future(); + }(); } void JHybridInputStreamSpec::open() { static const auto method = _javaPart->getClass()->getMethod("open"); diff --git a/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.hpp b/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.hpp index 6d8cf04..e4f3e49 100644 --- a/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.hpp +++ b/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridInputStreamSpec.hpp @@ -51,8 +51,7 @@ namespace margelo::nitro::fastio { public: // Methods - bool hasBytesAvailable() override; - double read(const std::shared_ptr& buffer, double maxLength) override; + std::future> read() override; void open() override; void close() override; diff --git a/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridOutputStreamSpec.cpp b/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridOutputStreamSpec.cpp index 692dcbb..a70a4c9 100644 --- a/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridOutputStreamSpec.cpp +++ b/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridOutputStreamSpec.cpp @@ -10,6 +10,8 @@ // Forward declaration of `ArrayBuffer` to properly resolve imports. namespace NitroModules { class ArrayBuffer; } +#include +#include #include #include @@ -34,15 +36,20 @@ namespace margelo::nitro::fastio { // Methods - bool JHybridOutputStreamSpec::hasSpaceAvailable() { - static const auto method = _javaPart->getClass()->getMethod("hasSpaceAvailable"); - auto __result = method(_javaPart); - return static_cast(__result); - } - double JHybridOutputStreamSpec::write(const std::shared_ptr& buffer, double maxLength) { - static const auto method = _javaPart->getClass()->getMethod /* buffer */, double /* maxLength */)>("write"); - auto __result = method(_javaPart, JArrayBuffer::wrap(buffer), maxLength); - return __result; + std::future JHybridOutputStreamSpec::write(const std::shared_ptr& buffer) { + static const auto method = _javaPart->getClass()->getMethod(jni::alias_ref /* buffer */)>("write"); + auto __result = method(_javaPart, JArrayBuffer::wrap(buffer)); + return [&]() { + auto __promise = std::make_shared>(); + __result->cthis()->addOnResolvedListener([=](const jni::alias_ref& __boxedResult) { + __promise->set_value(); + }); + __result->cthis()->addOnRejectedListener([=](const jni::alias_ref& __message) { + std::runtime_error __error(__message->toStdString()); + __promise->set_exception(std::make_exception_ptr(__error)); + }); + return __promise->get_future(); + }(); } void JHybridOutputStreamSpec::open() { static const auto method = _javaPart->getClass()->getMethod("open"); diff --git a/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridOutputStreamSpec.hpp b/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridOutputStreamSpec.hpp index b13f035..2154b8b 100644 --- a/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridOutputStreamSpec.hpp +++ b/packages/react-native-fast-io/nitrogen/generated/android/c++/JHybridOutputStreamSpec.hpp @@ -51,8 +51,7 @@ namespace margelo::nitro::fastio { public: // Methods - bool hasSpaceAvailable() override; - double write(const std::shared_ptr& buffer, double maxLength) override; + std::future write(const std::shared_ptr& buffer) override; void open() override; void close() override; diff --git a/packages/react-native-fast-io/nitrogen/generated/android/kotlin/com/margelo/nitro/fastio/HybridInputStreamSpec.kt b/packages/react-native-fast-io/nitrogen/generated/android/kotlin/com/margelo/nitro/fastio/HybridInputStreamSpec.kt index db85292..4bc2b4d 100644 --- a/packages/react-native-fast-io/nitrogen/generated/android/kotlin/com/margelo/nitro/fastio/HybridInputStreamSpec.kt +++ b/packages/react-native-fast-io/nitrogen/generated/android/kotlin/com/margelo/nitro/fastio/HybridInputStreamSpec.kt @@ -43,11 +43,7 @@ abstract class HybridInputStreamSpec: HybridObject() { // Methods @DoNotStrip @Keep - abstract fun hasBytesAvailable(): Boolean - - @DoNotStrip - @Keep - abstract fun read(buffer: ArrayBuffer, maxLength: Double): Double + abstract fun read(): Promise @DoNotStrip @Keep diff --git a/packages/react-native-fast-io/nitrogen/generated/android/kotlin/com/margelo/nitro/fastio/HybridOutputStreamSpec.kt b/packages/react-native-fast-io/nitrogen/generated/android/kotlin/com/margelo/nitro/fastio/HybridOutputStreamSpec.kt index c20a94d..85d2e2d 100644 --- a/packages/react-native-fast-io/nitrogen/generated/android/kotlin/com/margelo/nitro/fastio/HybridOutputStreamSpec.kt +++ b/packages/react-native-fast-io/nitrogen/generated/android/kotlin/com/margelo/nitro/fastio/HybridOutputStreamSpec.kt @@ -43,11 +43,7 @@ abstract class HybridOutputStreamSpec: HybridObject() { // Methods @DoNotStrip @Keep - abstract fun hasSpaceAvailable(): Boolean - - @DoNotStrip - @Keep - abstract fun write(buffer: ArrayBuffer, maxLength: Double): Double + abstract fun write(buffer: ArrayBuffer): Promise @DoNotStrip @Keep diff --git a/packages/react-native-fast-io/nitrogen/generated/ios/FastIO-Swift-Cxx-Bridge.hpp b/packages/react-native-fast-io/nitrogen/generated/ios/FastIO-Swift-Cxx-Bridge.hpp index 0cf1d15..da13f0b 100644 --- a/packages/react-native-fast-io/nitrogen/generated/ios/FastIO-Swift-Cxx-Bridge.hpp +++ b/packages/react-native-fast-io/nitrogen/generated/ios/FastIO-Swift-Cxx-Bridge.hpp @@ -173,6 +173,15 @@ namespace margelo::nitro::fastio::bridge::swift { std::shared_ptr create_std__shared_ptr_margelo__nitro__fastio__HybridNetworkSpec_(void* _Nonnull swiftUnsafePointer); void* _Nonnull get_std__shared_ptr_margelo__nitro__fastio__HybridNetworkSpec_(std__shared_ptr_margelo__nitro__fastio__HybridNetworkSpec_ cppType); + // pragma MARK: PromiseHolder> + /** + * Specialized version of `PromiseHolder>`. + */ + using PromiseHolder_std__shared_ptr_ArrayBuffer__ = PromiseHolder>; + inline PromiseHolder> create_PromiseHolder_std__shared_ptr_ArrayBuffer__() { + return PromiseHolder>(); + } + // pragma MARK: std::shared_ptr /** * Specialized version of `std::shared_ptr`. diff --git a/packages/react-native-fast-io/nitrogen/generated/ios/c++/HybridInputStreamSpecSwift.hpp b/packages/react-native-fast-io/nitrogen/generated/ios/c++/HybridInputStreamSpecSwift.hpp index d70e59f..957536b 100644 --- a/packages/react-native-fast-io/nitrogen/generated/ios/c++/HybridInputStreamSpecSwift.hpp +++ b/packages/react-native-fast-io/nitrogen/generated/ios/c++/HybridInputStreamSpecSwift.hpp @@ -17,7 +17,9 @@ namespace NitroModules { class ArrayBuffer; } // Forward declaration of `ArrayBufferHolder` to properly resolve imports. namespace NitroModules { class ArrayBufferHolder; } +#include #include +#include #include #if __has_include() @@ -63,13 +65,9 @@ namespace margelo::nitro::fastio { public: // Methods - inline bool hasBytesAvailable() override { - auto __result = _swiftPart.hasBytesAvailable(); - return __result; - } - inline double read(const std::shared_ptr& buffer, double maxLength) override { - auto __result = _swiftPart.read(ArrayBufferHolder(buffer), std::forward(maxLength)); - return __result; + inline std::future> read() override { + auto __result = _swiftPart.read(); + return __result.getFuture(); } inline void open() override { _swiftPart.open(); diff --git a/packages/react-native-fast-io/nitrogen/generated/ios/c++/HybridOutputStreamSpecSwift.hpp b/packages/react-native-fast-io/nitrogen/generated/ios/c++/HybridOutputStreamSpecSwift.hpp index 5714a8f..8625f52 100644 --- a/packages/react-native-fast-io/nitrogen/generated/ios/c++/HybridOutputStreamSpecSwift.hpp +++ b/packages/react-native-fast-io/nitrogen/generated/ios/c++/HybridOutputStreamSpecSwift.hpp @@ -17,6 +17,8 @@ namespace NitroModules { class ArrayBuffer; } // Forward declaration of `ArrayBufferHolder` to properly resolve imports. namespace NitroModules { class ArrayBufferHolder; } +#include +#include #include #include @@ -63,13 +65,9 @@ namespace margelo::nitro::fastio { public: // Methods - inline bool hasSpaceAvailable() override { - auto __result = _swiftPart.hasSpaceAvailable(); - return __result; - } - inline double write(const std::shared_ptr& buffer, double maxLength) override { - auto __result = _swiftPart.write(ArrayBufferHolder(buffer), std::forward(maxLength)); - return __result; + inline std::future write(const std::shared_ptr& buffer) override { + auto __result = _swiftPart.write(ArrayBufferHolder(buffer)); + return __result.getFuture(); } inline void open() override { _swiftPart.open(); diff --git a/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridInputStreamSpec.swift b/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridInputStreamSpec.swift index a727103..bbbb625 100644 --- a/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridInputStreamSpec.swift +++ b/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridInputStreamSpec.swift @@ -32,8 +32,7 @@ public protocol HybridInputStreamSpec: AnyObject, HybridObjectSpec { // Methods - func hasBytesAvailable() throws -> Bool - func read(buffer: ArrayBufferHolder, maxLength: Double) throws -> Double + func read() throws -> Promise func open() throws -> Void func close() throws -> Void } diff --git a/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridInputStreamSpecCxx.swift b/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridInputStreamSpecCxx.swift index 48fb96c..8f3def9 100644 --- a/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridInputStreamSpecCxx.swift +++ b/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridInputStreamSpecCxx.swift @@ -99,21 +99,16 @@ public class HybridInputStreamSpecCxx { // Methods @inline(__always) - public func hasBytesAvailable() -> Bool { + public func read() -> bridge.PromiseHolder_std__shared_ptr_ArrayBuffer__ { do { - let __result = try self.__implementation.hasBytesAvailable() - return __result - } catch { - let __message = "\(error.localizedDescription)" - fatalError("Swift errors can currently not be propagated to C++! See https://github.com/swiftlang/swift/issues/75290 (Error: \(__message))") - } - } - - @inline(__always) - public func read(buffer: ArrayBufferHolder, maxLength: Double) -> Double { - do { - let __result = try self.__implementation.read(buffer: buffer, maxLength: maxLength) - return __result + let __result = try self.__implementation.read() + return { () -> bridge.PromiseHolder_std__shared_ptr_ArrayBuffer__ in + let __promiseHolder = bridge.create_PromiseHolder_std__shared_ptr_ArrayBuffer__() + __result + .then({ __result in __promiseHolder.resolve(__result.getArrayBuffer()) }) + .catch({ __error in __promiseHolder.reject(std.string(String(describing: __error))) }) + return __promiseHolder + }() } catch { let __message = "\(error.localizedDescription)" fatalError("Swift errors can currently not be propagated to C++! See https://github.com/swiftlang/swift/issues/75290 (Error: \(__message))") diff --git a/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridOutputStreamSpec.swift b/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridOutputStreamSpec.swift index 69708ca..01b6346 100644 --- a/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridOutputStreamSpec.swift +++ b/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridOutputStreamSpec.swift @@ -32,8 +32,7 @@ public protocol HybridOutputStreamSpec: AnyObject, HybridObjectSpec { // Methods - func hasSpaceAvailable() throws -> Bool - func write(buffer: ArrayBufferHolder, maxLength: Double) throws -> Double + func write(buffer: ArrayBufferHolder) throws -> Promise func open() throws -> Void func close() throws -> Void } diff --git a/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridOutputStreamSpecCxx.swift b/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridOutputStreamSpecCxx.swift index cc06fb1..1ba49eb 100644 --- a/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridOutputStreamSpecCxx.swift +++ b/packages/react-native-fast-io/nitrogen/generated/ios/swift/HybridOutputStreamSpecCxx.swift @@ -99,21 +99,16 @@ public class HybridOutputStreamSpecCxx { // Methods @inline(__always) - public func hasSpaceAvailable() -> Bool { + public func write(buffer: ArrayBufferHolder) -> bridge.PromiseHolder_void_ { do { - let __result = try self.__implementation.hasSpaceAvailable() - return __result - } catch { - let __message = "\(error.localizedDescription)" - fatalError("Swift errors can currently not be propagated to C++! See https://github.com/swiftlang/swift/issues/75290 (Error: \(__message))") - } - } - - @inline(__always) - public func write(buffer: ArrayBufferHolder, maxLength: Double) -> Double { - do { - let __result = try self.__implementation.write(buffer: buffer, maxLength: maxLength) - return __result + let __result = try self.__implementation.write(buffer: buffer) + return { () -> bridge.PromiseHolder_void_ in + let __promiseHolder = bridge.create_PromiseHolder_void_() + __result + .then({ __result in __promiseHolder.resolve() }) + .catch({ __error in __promiseHolder.reject(std.string(String(describing: __error))) }) + return __promiseHolder + }() } catch { let __message = "\(error.localizedDescription)" fatalError("Swift errors can currently not be propagated to C++! See https://github.com/swiftlang/swift/issues/75290 (Error: \(__message))") diff --git a/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridInputStreamSpec.cpp b/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridInputStreamSpec.cpp index bb4f450..87a29f8 100644 --- a/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridInputStreamSpec.cpp +++ b/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridInputStreamSpec.cpp @@ -14,7 +14,6 @@ namespace margelo::nitro::fastio { HybridObject::loadHybridMethods(); // load custom methods/properties registerHybrids(this, [](Prototype& prototype) { - prototype.registerHybridMethod("hasBytesAvailable", &HybridInputStreamSpec::hasBytesAvailable); prototype.registerHybridMethod("read", &HybridInputStreamSpec::read); prototype.registerHybridMethod("open", &HybridInputStreamSpec::open); prototype.registerHybridMethod("close", &HybridInputStreamSpec::close); diff --git a/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridInputStreamSpec.hpp b/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridInputStreamSpec.hpp index 674f1c3..39f6941 100644 --- a/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridInputStreamSpec.hpp +++ b/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridInputStreamSpec.hpp @@ -16,6 +16,7 @@ // Forward declaration of `ArrayBuffer` to properly resolve imports. namespace NitroModules { class ArrayBuffer; } +#include #include namespace margelo::nitro::fastio { @@ -49,8 +50,7 @@ namespace margelo::nitro::fastio { public: // Methods - virtual bool hasBytesAvailable() = 0; - virtual double read(const std::shared_ptr& buffer, double maxLength) = 0; + virtual std::future> read() = 0; virtual void open() = 0; virtual void close() = 0; diff --git a/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridOutputStreamSpec.cpp b/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridOutputStreamSpec.cpp index c0990d6..82763b8 100644 --- a/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridOutputStreamSpec.cpp +++ b/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridOutputStreamSpec.cpp @@ -14,7 +14,6 @@ namespace margelo::nitro::fastio { HybridObject::loadHybridMethods(); // load custom methods/properties registerHybrids(this, [](Prototype& prototype) { - prototype.registerHybridMethod("hasSpaceAvailable", &HybridOutputStreamSpec::hasSpaceAvailable); prototype.registerHybridMethod("write", &HybridOutputStreamSpec::write); prototype.registerHybridMethod("open", &HybridOutputStreamSpec::open); prototype.registerHybridMethod("close", &HybridOutputStreamSpec::close); diff --git a/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridOutputStreamSpec.hpp b/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridOutputStreamSpec.hpp index 98f86b1..4f13652 100644 --- a/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridOutputStreamSpec.hpp +++ b/packages/react-native-fast-io/nitrogen/generated/shared/c++/HybridOutputStreamSpec.hpp @@ -16,6 +16,7 @@ // Forward declaration of `ArrayBuffer` to properly resolve imports. namespace NitroModules { class ArrayBuffer; } +#include #include namespace margelo::nitro::fastio { @@ -49,8 +50,7 @@ namespace margelo::nitro::fastio { public: // Methods - virtual bool hasSpaceAvailable() = 0; - virtual double write(const std::shared_ptr& buffer, double maxLength) = 0; + virtual std::future write(const std::shared_ptr& buffer) = 0; virtual void open() = 0; virtual void close() = 0; diff --git a/packages/react-native-fast-io/src/native/streams.nitro.ts b/packages/react-native-fast-io/src/native/streams.nitro.ts index 2063676..975998e 100644 --- a/packages/react-native-fast-io/src/native/streams.nitro.ts +++ b/packages/react-native-fast-io/src/native/streams.nitro.ts @@ -1,16 +1,14 @@ import { getHybridObjectConstructor, HybridObject, NitroModules } from 'react-native-nitro-modules' export interface InputStream extends HybridObject<{ ios: 'swift'; android: 'kotlin' }> { - hasBytesAvailable(): boolean - read(buffer: ArrayBuffer, maxLength: number): number + read(): Promise open(): void close(): void } export interface OutputStream extends HybridObject<{ ios: 'swift'; android: 'kotlin' }> { - hasSpaceAvailable(): boolean - write(buffer: ArrayBuffer, maxLength: number): number + write(buffer: ArrayBuffer): Promise open(): void close(): void diff --git a/packages/react-native-fast-io/src/w3c/streams.ts b/packages/react-native-fast-io/src/w3c/streams.ts index 5d106e2..ed1be27 100644 --- a/packages/react-native-fast-io/src/w3c/streams.ts +++ b/packages/react-native-fast-io/src/w3c/streams.ts @@ -1,35 +1,17 @@ -import { - CompressorFactory, - DuplexStream, - InputStream, - OutputStream, - StreamFactory, -} from '../native/streams.nitro' +import { CompressorFactory, DuplexStream, InputStream, OutputStream } from '../native/streams.nitro' export const toReadableStream = (inputStream: InputStream) => { const stream = new ReadableStream({ start() { inputStream.open() }, - pull(controller) { - const buffer = new ArrayBuffer(StreamFactory.bufferSize) - - if (!inputStream.hasBytesAvailable()) { - inputStream.close() + async pull(controller) { + const buffer = await inputStream.read() + if (buffer.byteLength == 0) { controller.close() return } - - const bytesRead = inputStream.read(buffer, StreamFactory.bufferSize) - if (bytesRead < 0) { - inputStream.close() - controller.error('Error reading from stream.') - return - } - - if (bytesRead > 0) { - controller.enqueue(new Uint8Array(buffer.slice(0, bytesRead))) - } + controller.enqueue(new Uint8Array(buffer)) }, cancel() { inputStream.close() @@ -48,16 +30,7 @@ export const toWritableStream = (outputStream: OutputStream) => { if (chunk.byteLength === 0) { return } - - // tbd: implement better backpressure mechanism - while (!outputStream.hasSpaceAvailable()) { - await new Promise((resolve) => setTimeout(resolve, 1)) - } - - const bytesWritten = outputStream.write(chunk.buffer, chunk.byteLength) - if (bytesWritten < 0) { - throw new Error('Failed to write to output stream') - } + await outputStream.write(chunk.buffer) }, close() { outputStream.close() @@ -86,12 +59,8 @@ export class CompressionStream implements globalThis.CompressionStream { const { readable, writable } = new TransformStream({ transform(chunk, controller) { - try { - const compressedData = compressor.compress(chunk.buffer) - controller.enqueue(new Uint8Array(compressedData)) - } catch (error) { - console.error(error) - } + const compressedData = compressor.compress(chunk.buffer) + controller.enqueue(new Uint8Array(compressedData)) }, flush(controller) { const finalData = compressor.finalize()