Remote, short, UDP request that echoes input.
EDIT
Presently the main branch of my repository uses externally_connectable to communicate over IPC between an arbitrary tab or window to a Web extension which opens an Isolated Web App (IWA) window where a Direct Sockets TCPSocket or UDPSocket is created, and receives and sends the TCP or UDP socket data to and from the IWA window and arbitrary tab or window using text.
I'm far more interested in Direct Sockets than the idea of an "isolated web app", so I'm using various means to open the IWA window from arbitrary tabs and windows so I can exploit TCPServerSocket, TCPSocket and UDPSocket for my own purposes.
My development branch is fetch-webrtc, where I use TCPServerSocket to create a local server, and fetch() to the IWA to exchange SDP for WebRTC RTCDataChannel connection, so ArrayBuffer's can be substituted for text communication.
main https://github.com/guest271314/sockets. fetch-webrtc https://github.com/guest271314/sockets/tree/fetch-webrtc.
I started working on the fetch-webrtc development branch to get away from text streaming.
Now, what does the code do?
When the IWA window is opened a local Web extension starts a local TCP and UDP server using txiki.js. That allows me to communication with local applications over those protocols.
I'm also able to make remote UDP and TCP connections from the IWA which get forwarded to the arbitrary tab or window, where I usually make the calls in DevTools Console or Snippets. All of this is happening on Chromium based browsers where Direct Sockets are exposed in Isolated Web Apps.
Making an HTTP request over TCP can be brief, or can theoretically last forever. Similar with UDP.
So the present review question surround how to best handle the different types of communications that can occur in a single function - that will have to change signatures slightly to facilitate the different connections and data streams.
I'll add here that during the development of the directSockets function I wrote two (2) versions; one for local echoing, one for remote TCP and UDP connections.
My goal here is to merge handling direct local and remote TCP connections, HTTP over TCP port 80, local and remote UDP connections, and eventually TCP and UDP over TLS when necessary, all using the same base code - and using that same stream function to process the data "client" side.
That function below is the async stream function.
I thought I could use a callback pattern to pass in an anonymous function as parameter to directSocket({fn:async function stream(...){}}). However, that would require exposing the writeable and readable some kind of way, which leads to creating a class, which is what I did in main, above. Or, somehow overloading the stream function with default parameters that provide placeholders for the readable, writable, and bufferedAmountLow function, which is RTCDataChannel buffereamountlow event handler called with {once: true} to apply backpressure to the wrapped WHATWG Streams that I use over the RTCDataChannel.
There's a few things going on here...
So here's the full original function I posted here, in this example making a remote UDP connection
async function directSocket(options) {
var decoder = new TextDecoder;
var local = new RTCPeerConnection({
sdpSemantics: "unified-plan",
iceServers: []
});
var {resolve: transportClosedResolve, promise: transportClosedPromise} = Promise.withResolvers();
["signalingstatechange", "iceconnectionstatechange", "icegatheringstatechange"].forEach( (evt) => local.addEventListener(evt, (e) => {
if (e.type === "iceconnectionstatechange" && e.target.sctp.state === "closed") {
transportClosedResolve();
}
{}
}
));
local.onicecandidate = async (e) => {
if (!e.candidate) {
try {
if (globalThis?.openIsolatedWebApp) {
await openIsolatedWebApp(`?name=TCPSocket`);
} else {
if (globalThis?.setTitle) {
setTitle(`?=TCPSocket`);
} else {
document.title = "?=TCPSocket";
}
}
await scheduler.postTask( () => {}
, {
delay: 1000,
priority: "user-visible"
});
console.log("sdp:", local.localDescription.toJSON().sdp);
var abortable = new AbortController;
var {signal} = abortable;
var sdp = await (await fetch("http://0.0.0.0:44819", {
method: "post",
body: new TextEncoder().encode(local.localDescription.sdp),
signal
})).text();
await local.setRemoteDescription({
type: "answer",
sdp
});
console.log("Done signaling SDP");
} catch (e) {
console.error(e);
}
}
}
;
var channel = local.createDataChannel(JSON.stringify({
address: options.address,
port: options.port,
protocol: options.protocol
}), {
negotiated: false,
ordered: true,
id: 0,
binaryType: "arraybuffer",
protocol: options.protocol
});
var readableController;
var writableController;
var {resolve, promise: dataChannelStream} = Promise.withResolvers();
channel.onopen = async (e) => {
var readable = new ReadableStream({
start(_) {
return readableController = _;
},
cancel(reason) {
console.log(reason);
}
});
var writable = new WritableStream({
start(_) {
return writableController = _;
},
write(v) {
channel.send(v);
},
close() {
channel.close();
readableController.close();
},
abort(reason) {
console.log(reason);
}
});
resolve({
readable,
writable
});
}
;
var {resolve: dataChannelCloseResolve, promise: dataChannelClosePromise} = Promise.withResolvers();
channel.onclose = async (e) => {
const {binaryType, label, bufferedAmount, ordered, protocol, readyState, reliable} = e.target;
dataChannelCloseResolve({
binaryType,
label: JSON.parse(label),
bufferedAmount,
ordered,
protocol,
readyState,
reliable
});
}
;
channel.onclosing = async (e) => {
console.log(e.type);
}
;
channel.onerror = async (e) => {
console.log(e.type, e.target);
await Promise.allSettled([readable.closed, writable.closed]).then( (args) => console.log(readable.locked, writable.locked)).catch(console.log);
}
;
channel.onmessage = (e) => {
readableController.enqueue(e.data);
}
;
var offer = await local.createOffer({
voiceActivityDetection: false,
offerToReceiveAudio: false,
offerToReceiveVideo: false,
iceRestart: false
});
await local.setLocalDescription(offer);
var {readable, writable} = await dataChannelStream;
var writer = writable.getWriter();
var reader = readable.getReader();
await scheduler.postTask( () => {}
, {
delay: 500,
priority: "background"
});
var maxMessageSize = options.protocol === "udp" ? 65507 : local.sctp.maxMessageSize;
var bufferedAmountLow = async () => await new Promise( (resolve) => {
channel.addEventListener("bufferedamountlow", resolve, {
once: true
});
}
);
async function stream(input) {
let result = "";
await writer.ready;
await writer.write(input);
await bufferedAmountLow();
await scheduler.postTask(() => Promise.all([writer.ready, writer.close(), writer.closed]), {delay:30050});
return reader.read().then(async function read({value, done}) {
if (done) {
return result;
}
result += decoder.decode(value);
return reader.read().then(read);
})
}
var result = await stream(options.data).catch( (e) => e);
reader.releaseLock();
writer.releaseLock();
return Promise.allSettled([writable.closed, readable.cancel(), transportClosedPromise, dataChannelClosePromise]).then( ([,,,{value: transport}]) => ({transport, result}));
}
directSocket({
protocol: "udp",
address: "52.43.121.77",
port: 10001,
data: new TextEncoder().encode(`Remote`So UDPwe echo`need people to have weird new
ideas ... we need more ideas to break it
and make it better ...
Use it. Break it. File bugs. Request features.
- Soledad Penadés, Real time front-end alchemy, or: capturing, playing,
altering and encoding video and audio streams, without
servers or plugins!`)
}).then(async (result) => console.log(result)).catch( (e) => {
console.warn(e);
}
).finally( () => document.title = "");
Local TCPAnd an example of making an HTTP request, echoing on port 80 using the same function.
We'll have to handle processing 20 MB a little different than processing HTTP response headers and the body responded by GitHub for an old repository - before GitHub made HTTPS mandatory for GH Pages.
async function directSocket(options) {
var maxMessageSize = options.protocol === "udp" ? 65507 : local.sctp.maxMessageSize;
var bufferedAmountLow = async() => await new Promise( (resolve) => {
channel.addEventListener("bufferedamountlow", resolve"tcp", {
onceaddress: true
});
}
);
async function stream(input) {
var len = 0;
for (let i = 0; i < input.length; i += maxMessageSize) {
const data = input.subarray(i, i + maxMessageSize);
await writer"guest271314.ready;
await writergithub.write(data);
await bufferedAmountLow();
let readLength = 0;
do {
var {valueio", done} = await reader.read();
len +=port: value.byteLength80, done;
readLength += value.byteLength;
} while (readLength < data.length);
}
await writer.close();
return len;
}
var result = await stream(options.data).catch( (e) => e);
reader.releaseLock();
: new writer.releaseLockTextEncoder();
return Promise.allSettled([writable.closed, readable.cancelencode(),"GET transportClosedPromise,/ dataChannelClosePromise])HTTP/1.then( ([,,,{value: transport}]) => ({transport, result}));
}
directSocket({
protocol: "tcp",
address1\r\n\Host: "127.0guest271314.0github.1",
port: 8000,
data: new Uint8Array(1024 ** 2 * 20io\r\n\r\n")
}).then(async (result) => console.log(result)).catch( (e) => {
console.warn(e);
}
).finally( () => document.title = "");
Remote HTTP request over TCP on port 80, which can useThat's where my question came in with regard to the samestructue of that stream function as, and the original question title "Class, overload, callback, different function".
I see the people don't get what I'm saying in the snippets I posted, so you got the bulk of what's going on, above, now.
Since I gathered the people didn't get what was going on, nor the context of my question, I went ahead and invested a few hours into writing and testing the class approach. Here it is.
Now you should be able to get your review on, I think...
directSocket({ protocol: "tcp", address: "guest271314.github.io", port: 80, data: new TextEncoder().encode("GET / HTTP/1.1\r\n\Host:guest271314.github.io\r\n\r\n") }).then(async (result) => console.log(result)).catch( (e) => { console.warn(e);var DirectSockets = class { options; opened; close; closed; constructor(options) { this.options = options; const { resolve: transportClosedResolve, reject: transportClosedReject, promise: transportClosedPromise, } = Promise.withResolvers(); const { resolve: dataChannelOpenResolve, reject: dataChannelOpenReject, promise: dataChannelOpenPromise, } = Promise.withResolvers(); const { resolve: dataChannelCloseResolve, reject: dataChannelCloseReject, promise: dataChannelClosePromise, } = Promise.withResolvers(); let local, channel, readableController, writableController, rejectOpen, rejectClose; this.opened = new Promise(async (resolveOpen, _) => { rejectOpen = _; var decoder = new TextDecoder(); local = new RTCPeerConnection({ sdpSemantics: "unified-plan", iceServers: [], }); for ( const rtcPeerConnectionEvent of [ "signalingstatechange", "iceconnectionstatechange", "icegatheringstatechange", ] ) { local.addEventListener(rtcPeerConnectionEvent, async (e) => { if ( e.type === "iceconnectionstatechange" && e.target.sctp.state === "closed" ) { await dataChannelClosePromise; this.options.readyState = channel.readyState; transportClosedResolve(); } }); } local.onicecandidate = async (e) => { if (!e.candidate) { try { if (globalThis?.openIsolatedWebApp) { await openIsolatedWebApp(`?name=TCPSocket`); } else { if (globalThis?.setTitle) { setTitle(`?=TCPSocket`); } else { document.title = "?=TCPSocket"; } } await scheduler.postTask(() => {}, { delay: 1000, priority: "user-visible", }); // console.log("sdp:", local.localDescription.toJSON().sdp); // var abortable = new AbortController(); // var { signal } = abortable; const sdp = await (await fetch("http://0.0.0.0:44819", { method: "post", body: new TextEncoder().encode(local.localDescription.sdp), // signal, })).text(); await local.setRemoteDescription({ type: "answer", sdp, }); // console.log("Done signaling SDP"); } catch (e) { console.error(e); } } }; channel = local.createDataChannel( JSON.stringify({ address: options.address, port: options.port, protocol: options.protocol, }), { negotiated: false, ordered: true, id: 0, binaryType: "arraybuffer", protocol: options.protocol, }, ); channel.onopen = async (e) => { const { binaryType, label, bufferedAmount, ordered, protocol, readyState, reliable, } = e.target; Object.assign(this.options, { binaryType, label, bufferedAmount, ordered, protocol, readyState, reliable, }); const readable = new ReadableStream({ start(_) { return readableController = _; }, cancel(reason) { console.log(reason); }, }); const writable = new WritableStream({ start(_) { return writableController = _; }, write(v) { if (channel.readyState === "open") { channel.send(v); } }, close() { channel.close(); readableController.close(); }, abort(reason) { console.log(reason); }, }); dataChannelOpenResolve({ readable, writable, }); }; channel.onclose = async (e) => { console.log(local); // iceConnectionState, connectionState, signalingState this.options.readyState = e.target.readyState; if ( local.connectionState === "closed" || local?.sctp?.state === "closed" ) { dataChannelCloseReject(); } else { if (local.sctp.state === "connected") { dataChannelCloseResolve(); } } }; channel.onclosing = async (e) => { console.log(e.type); }; channel.onerror = async (e) => { console.log(e.type, e.target); await Promise.allSettled([readable.closed, writable.closed]).then(( args, ) => console.log(readable.locked, writable.locked)).catch(console.log); }; channel.onmessage = (e) => { readableController.enqueue(e.data); }; const offer = await local.createOffer({ voiceActivityDetection: false, offerToReceiveAudio: false, offerToReceiveVideo: false, iceRestart: false, }); await local.setLocalDescription(offer); const { readable, writable } = await dataChannelOpenPromise; await scheduler.postTask(() => {}, { delay: 500, priority: "background", }); this.options.maxMessageSize = options.protocol === "udp" ? 65507 : local.sctp.maxMessageSize; this.bufferedAmountLow = async () => await new Promise((resolve) => { channel.addEventListener("bufferedamountlow", resolve, { once: true, }); }); resolveOpen({ readable, writable }); }).catch((e) => { throw e; }); this.closed = Promise.allSettled([ new Promise(async (r, _) => { rejectClose = _; r(transportClosedPromise); }), dataChannelClosePromise, ]).catch((e) => { throw e; }); this.close = () => { try { transportClosedReject(); rejectOpen(); rejectClose(); channel?.close(); local?.close(); } catch (e) { console.log(e); } finally { this.options.readyState = channel.readyState; } }; } }; var socket = new DirectSockets({ protocol: "udp", address: "127.0.0.1", port: 8000, }); console.log(socket); socket.closed.then((args) => console.log("socket closed", args, socket.options.readyState) ).catch((e) => console.log("socket closed reject", e, socket.options.readyState) ); // Early close //socket.close(); var decoder = new TextDecoder(); var p = await socket.opened.catch((e) => { console.log("socket opened reject", e); }); console.log(p); if (p?.readable) { var { readable, writable } = p; var reader = readable.getReader(); var writer = writable.getWriter(); /* async function stream(input) { let result2 = ""; await writer.ready; await writer.write(input); await socket.bufferedAmountLow(); //console.log(writer.desiredSize); //socket.close(); await scheduler.postTask( () => Promise.all([writer.close(), writer.closed]), { delay: 300 }, ); //console.log(writer.desiredSize); return reader.read().then(async function read({ value, done }) { // console.log(value, done); if (done) { return result2; } result2 += decoder.decode(value); return reader.read().then(read); }); } var result = await stream(new TextEncoder().encode( `So we need people to have weird new ideas ... we need more ideas to break it and make it better ... Use it. Break it. File bugs. Request features. - Soledad Penadés, Real time front-end alchemy, or: capturing, playing, altering and encoding video and audio streams, without servers or plugins! `, )).catch((e) => e); */ async function stream(input) { var len = 0; for (let i = 0; i < input.length; i += socket.options.maxMessageSize) { const data = input.subarray(i, i + socket.options.maxMessageSize); await writer.ready; await writer.write(data).catch((e) => { throw e; }); await socket.bufferedAmountLow(); let readLength = 0; do { var { value, done } = await reader.read(); len += value.byteLength, done; readLength += value.byteLength; } while (readLength < data.length); } await Promise.all([writer.close(), writer.closed]); return len; } var result = await stream(new Uint8Array(1024 ** 2 * 20)).catch((e) => e); console.log(result); reader.releaseLock(); writer.releaseLock();} ).finally(// ()Later =>close // documentsocket.title = ""close(); ```