11import { CompressorFactory , DuplexStream , InputStream , OutputStream } from '../native/streams.nitro'
22
33export const toReadableStream = ( inputStream : InputStream ) => {
4- console . log ( 'Creating ReadableStream from InputStream' )
54 const stream = new ReadableStream < Uint8Array > ( {
6- async start ( ) {
7- console . log ( 'ReadableStream.start()' )
8- try {
9- await inputStream . open ( )
10- console . log ( 'InputStream opened successfully' )
11- } catch ( error ) {
12- console . error ( 'Error opening InputStream:' , error )
13- throw error
14- }
5+ start ( ) {
6+ inputStream . open ( )
157 } ,
168 async pull ( controller ) {
17- console . log ( 'ReadableStream.pull()' )
18- try {
19- const buffer = await inputStream . read ( )
20- console . log ( `Read ${ buffer . byteLength } bytes from InputStream` )
21-
22- if ( buffer . byteLength === 0 ) {
23- console . log ( 'End of stream reached, closing controller' )
24- controller . close ( )
25- return
26- }
27-
28- controller . enqueue ( new Uint8Array ( buffer ) )
29- console . log ( 'Enqueued data to controller' )
30- } catch ( error ) {
31- console . error ( 'Error reading from InputStream:' , error )
32- throw error
9+ const buffer = await inputStream . read ( )
10+ if ( buffer . byteLength == 0 ) {
11+ controller . close ( )
12+ return
3313 }
14+ controller . enqueue ( new Uint8Array ( buffer ) )
3415 } ,
3516 cancel ( ) {
36- console . log ( 'ReadableStream.cancel()' )
37- try {
38- inputStream . close ( )
39- console . log ( 'InputStream closed successfully' )
40- } catch ( error ) {
41- console . error ( 'Error closing InputStream:' , error )
42- }
17+ inputStream . close ( )
4318 } ,
4419 } )
4520
4621 return stream
4722}
4823
4924export const toWritableStream = ( outputStream : OutputStream ) => {
50- console . log ( 'Creating WritableStream from OutputStream' )
5125 return new WritableStream ( {
52- async start ( ) {
53- console . log ( 'WritableStream.start()' )
54- try {
55- await outputStream . open ( )
56- console . log ( 'OutputStream opened successfully' )
57- } catch ( error ) {
58- console . error ( 'Error opening OutputStream:' , error )
59- throw error
60- }
26+ start ( ) {
27+ outputStream . open ( )
6128 } ,
6229 async write ( chunk : Uint8Array ) {
63- console . log ( `WritableStream.write() with ${ chunk . byteLength } bytes` )
6430 if ( chunk . byteLength === 0 ) {
65- console . log ( 'Skipping empty chunk' )
6631 return
6732 }
68-
69- try {
70- await outputStream . write ( chunk . buffer )
71- console . log ( 'Successfully wrote chunk to OutputStream' )
72- } catch ( error ) {
73- console . error ( 'Error writing to OutputStream:' , error )
74- throw error
75- }
33+ await outputStream . write ( chunk . buffer )
7634 } ,
7735 close ( ) {
78- console . log ( 'WritableStream.close()' )
79- try {
80- outputStream . close ( )
81- console . log ( 'OutputStream closed successfully' )
82- } catch ( error ) {
83- console . error ( 'Error closing OutputStream:' , error )
84- }
36+ outputStream . close ( )
8537 } ,
8638 abort ( ) {
87- console . log ( 'WritableStream.abort()' )
88- try {
89- outputStream . close ( )
90- console . log ( 'OutputStream closed successfully on abort' )
91- } catch ( error ) {
92- console . error ( 'Error closing OutputStream on abort:' , error )
93- }
39+ outputStream . close ( )
9440 } ,
9541 } )
9642}
9743
9844export const fromReadableStream = ( stream : ReadableStream ) : InputStream => {
99- console . log ( 'Creating InputStream from ReadableStream' )
10045 const duplexStream = new DuplexStream ( )
101- console . log ( 'Created DuplexStream' )
10246
10347 const writableStream = toWritableStream ( duplexStream . outputStream )
104- console . log ( 'Piping ReadableStream to WritableStream' )
105- stream . pipeTo ( writableStream ) . catch ( ( error ) => {
106- console . error ( 'Error piping stream:' , error )
107- } )
48+ stream . pipeTo ( writableStream )
10849
10950 return duplexStream . inputStream
11051}
@@ -114,41 +55,28 @@ export class CompressionStream implements globalThis.CompressionStream {
11455 readonly writable : WritableStream < Uint8Array >
11556
11657 constructor ( format : CompressionFormat ) {
117- console . log ( `Creating CompressionStream with format: ${ format } ` )
11858 const compressor = CompressorFactory . create ( format )
119- console . log ( 'Created compressor' )
12059
12160 const { readable, writable } = new TransformStream < Uint8Array > ( {
12261 transform ( chunk , controller ) {
123- console . log ( `Compressing chunk of ${ chunk . byteLength } bytes` )
12462 try {
12563 const compressedData = compressor . compress ( chunk . buffer )
126- console . log ( `Compressed to ${ compressedData . byteLength } bytes` )
12764 controller . enqueue ( new Uint8Array ( compressedData ) )
128- console . log ( 'Enqueued compressed data' )
12965 } catch ( error ) {
130- console . error ( 'Error during compression:' , error )
131- throw error
66+ console . error ( error )
13267 }
13368 } ,
13469 flush ( controller ) {
135- console . log ( 'Finalizing compression' )
136- try {
137- const finalData = compressor . finalize ( )
138- console . log ( `Final compressed data: ${ finalData . byteLength } bytes` )
139- if ( finalData . byteLength > 0 ) {
140- controller . enqueue ( new Uint8Array ( finalData ) )
141- console . log ( 'Enqueued final compressed data' )
142- }
143- } catch ( error ) {
144- console . error ( 'Error finalizing compression:' , error )
145- throw error
70+ console . log ( 'flushing' )
71+ const finalData = compressor . finalize ( )
72+ if ( finalData . byteLength > 0 ) {
73+ console . log ( finalData . byteLength )
74+ controller . enqueue ( new Uint8Array ( finalData ) )
14675 }
14776 } ,
14877 } )
14978
15079 this . readable = readable
15180 this . writable = writable
152- console . log ( 'CompressionStream ready' )
15381 }
15482}
0 commit comments