0
\$\begingroup\$

A branch of a repository I maintain has a function that can do different things based on the input and expected result, and structure of a particular inner function call.

I can echo 20 MB to and from a local TCP or UDP connection locally; make an external HTTP request over TCP and read raw response, including headers and body; make an external UDP connection; establish and keep a local or remote TCP or UDP connection active, potentially indefinitely.

Right now I use two different scripts that have the bulk of the same signature - except for the way the inner function is written.

Would you create a class; an overload with default parameters, essentially a callback-like function that is passed; keep using two (or more) function signatures? Remaining the same is a manageable option.

The basic structure of the outer function directSocket, and the inner function stream, in pertinent parts. The stream function is the one that could be passed asa callback, though the caller would have to already know what variables are used inside of directSocket - outside of the directSocket function.

Remote, short, UDP request that echoes input.

EDIT

Presently the main branch of my repository uses externally_connectable to communicate over IPC between an arbitrary tab or window to a Web extension which opens an Isolated Web App (IWA) window where a Direct Sockets TCPSocket or UDPSocket is created, and receives and sends the TCP or UDP socket data to and from the IWA window and arbitrary tab or window using text.

I'm far more interested in Direct Sockets than the idea of an "isolated web app", so I'm using various means to open the IWA window from arbitrary tabs and windows so I can exploit TCPServerSocket, TCPSocket and UDPSocket for my own purposes.

My development branch is fetch-webrtc, where I use TCPServerSocket to create a local server, and fetch() to the IWA to exchange SDP for WebRTC RTCDataChannel connection, so ArrayBuffer's can be substituted for text communication.

main https://github.com/guest271314/sockets. fetch-webrtc https://github.com/guest271314/sockets/tree/fetch-webrtc.

I started working on the fetch-webrtc development branch to get away from text streaming.

Now, what does the code do?

When the IWA window is opened a local Web extension starts a local TCP and UDP server using txiki.js. That allows me to communication with local applications over those protocols.

I'm also able to make remote UDP and TCP connections from the IWA which get forwarded to the arbitrary tab or window, where I usually make the calls in DevTools Console or Snippets. All of this is happening on Chromium based browsers where Direct Sockets are exposed in Isolated Web Apps.

Making an HTTP request over TCP can be brief, or can theoretically last forever. Similar with UDP.

So the present review question surround how to best handle the different types of communications that can occur in a single function - that will have to change signatures slightly to facilitate the different connections and data streams.

I'll add here that during the development of the directSockets function I wrote two (2) versions; one for local echoing, one for remote TCP and UDP connections.

My goal here is to merge handling direct local and remote TCP connections, HTTP over TCP port 80, local and remote UDP connections, and eventually TCP and UDP over TLS when necessary, all using the same base code - and using that same stream function to process the data "client" side.

That function below is the async stream function.

I thought I could use a callback pattern to pass in an anonymous function as parameter to directSocket({fn:async function stream(...){}}). However, that would require exposing the writeable and readable some kind of way, which leads to creating a class, which is what I did in main, above. Or, somehow overloading the stream function with default parameters that provide placeholders for the readable, writable, and bufferedAmountLow function, which is RTCDataChannel buffereamountlow event handler called with {once: true} to apply backpressure to the wrapped WHATWG Streams that I use over the RTCDataChannel.

There's a few things going on here...

So here's the full original function I posted here, in this example making a remote UDP connection

async function directSocket(options) {
  var decoder = new TextDecoder;
  var local = new RTCPeerConnection({
    sdpSemantics: "unified-plan",
    iceServers: []
  });
  var {resolve: transportClosedResolve, promise: transportClosedPromise} = Promise.withResolvers();
  ["signalingstatechange", "iceconnectionstatechange", "icegatheringstatechange"].forEach( (evt) => local.addEventListener(evt, (e) => {
    if (e.type === "iceconnectionstatechange" && e.target.sctp.state === "closed") {
      transportClosedResolve();
    }
    {}
  }
  ));
  local.onicecandidate = async (e) => {
    if (!e.candidate) {
      try {
        if (globalThis?.openIsolatedWebApp) {
          await openIsolatedWebApp(`?name=TCPSocket`);
        } else {
          if (globalThis?.setTitle) {
            setTitle(`?=TCPSocket`);
          } else {
            document.title = "?=TCPSocket";
          }
        }
        await scheduler.postTask( () => {}
        , {
          delay: 1000,
          priority: "user-visible"
        });
        console.log("sdp:", local.localDescription.toJSON().sdp);
        var abortable = new AbortController;
        var {signal} = abortable;
        var sdp = await (await fetch("http://0.0.0.0:44819", {
          method: "post",
          body: new TextEncoder().encode(local.localDescription.sdp),
          signal
        })).text();
        await local.setRemoteDescription({
          type: "answer",
          sdp
        });
        console.log("Done signaling SDP");
      } catch (e) {
        console.error(e);
      }
    }
  }
  ;
  var channel = local.createDataChannel(JSON.stringify({
    address: options.address,
    port: options.port,
    protocol: options.protocol
  }), {
    negotiated: false,
    ordered: true,
    id: 0,
    binaryType: "arraybuffer",
    protocol: options.protocol
  });
  var readableController;
  var writableController;
  var {resolve, promise: dataChannelStream} = Promise.withResolvers();
  channel.onopen = async (e) => {
    var readable = new ReadableStream({
      start(_) {
        return readableController = _;
      },
      cancel(reason) {
        console.log(reason);
      }
    });
    var writable = new WritableStream({
      start(_) {
        return writableController = _;
      },
      write(v) {
        channel.send(v);
      },
      close() {
        channel.close();
        readableController.close();
      },
      abort(reason) {
        console.log(reason);
      }
    });
    resolve({
      readable,
      writable
    });
  }
  ;
  var {resolve: dataChannelCloseResolve, promise: dataChannelClosePromise} = Promise.withResolvers();
  channel.onclose = async (e) => {
    const {binaryType, label, bufferedAmount, ordered, protocol, readyState, reliable} = e.target;
    dataChannelCloseResolve({
      binaryType,
      label: JSON.parse(label),
      bufferedAmount,
      ordered,
      protocol,
      readyState,
      reliable
    });
  }
  ;
  channel.onclosing = async (e) => {
    console.log(e.type);
  }
  ;
  channel.onerror = async (e) => {
    console.log(e.type, e.target);
    await Promise.allSettled([readable.closed, writable.closed]).then( (args) => console.log(readable.locked, writable.locked)).catch(console.log);
  }
  ;
  channel.onmessage = (e) => {
    readableController.enqueue(e.data);
  }
  ;
  var offer = await local.createOffer({
    voiceActivityDetection: false,
    offerToReceiveAudio: false,
    offerToReceiveVideo: false,
    iceRestart: false
  });
  await local.setLocalDescription(offer);
  var {readable, writable} = await dataChannelStream;
  var writer = writable.getWriter();
  var reader = readable.getReader();
  await scheduler.postTask( () => {}
  , {
    delay: 500,
    priority: "background"
  });
  var maxMessageSize = options.protocol === "udp" ? 65507 : local.sctp.maxMessageSize;
  var bufferedAmountLow = async () => await new Promise( (resolve) => {
    channel.addEventListener("bufferedamountlow", resolve, {
      once: true
    });
  }
  );

  async function stream(input) {
    let result = "";
    await writer.ready;
    await writer.write(input);
    await bufferedAmountLow();
    await scheduler.postTask(() => Promise.all([writer.ready, writer.close(), writer.closed]), {delay:50});
    return reader.read().then(async function read({value, done}) {
      if (done) {
        return result;
      }
      result += decoder.decode(value);
      return reader.read().then(read);
    })
  }
  var result = await stream(options.data).catch( (e) => e);
  reader.releaseLock();
  writer.releaseLock();
  return Promise.allSettled([writable.closed, readable.cancel(), transportClosedPromise, dataChannelClosePromise]).then( ([,,,{value: transport}]) => ({transport, result}));
}
directSocket({
  protocol: "udp",
  address: "52.43.121.77",
  port: 10001,
  data: new TextEncoder().encode(`So we need people to have weird new
ideas ... we need more ideas to break it
and make it better ...

Use it. Break it. File bugs. Request features.

- Soledad Penadés, Real time front-end alchemy, or: capturing, playing,
  altering and encoding video and audio streams, without
  servers or plugins!`)
}).then(async (result) => console.log(result)).catch( (e) => {
  console.warn(e);
}
).finally( () => document.title = "");

And an example of making an HTTP request on port 80 using the same function.

We'll have to handle processing 20 MB a little different than processing HTTP response headers and the body responded by GitHub for an old repository - before GitHub made HTTPS mandatory for GH Pages.

directSocket({
  protocol: "tcp",
  address: "guest271314.github.io",
  port: 80,
  data: new TextEncoder().encode("GET / HTTP/1.1\r\n\Host:guest271314.github.io\r\n\r\n")
}).then(async (result) => console.log(result)).catch( (e) => {
  console.warn(e);
}
).finally( () => document.title = "");

That's where my question came in with regard to the structue of that stream function, and the original question title "Class, overload, callback, different function".

I see the people don't get what I'm saying in the snippets I posted, so you got the bulk of what's going on, above, now.

Since I gathered the people didn't get what was going on, nor the context of my question, I went ahead and invested a few hours into writing and testing the class approach. Here it is.

Now you should be able to get your review on, I think...

var DirectSockets = class {
  options;
  opened;
  close;
  closed;
  constructor(options) {
    this.options = options;
    const {
      resolve: transportClosedResolve,
      reject: transportClosedReject,
      promise: transportClosedPromise,
    } = Promise.withResolvers();
    const {
      resolve: dataChannelOpenResolve,
      reject: dataChannelOpenReject,
      promise: dataChannelOpenPromise,
    } = Promise.withResolvers();
    const {
      resolve: dataChannelCloseResolve,
      reject: dataChannelCloseReject,
      promise: dataChannelClosePromise,
    } = Promise.withResolvers();
    let local,
      channel,
      readableController,
      writableController,
      rejectOpen,
      rejectClose;
    this.opened = new Promise(async (resolveOpen, _) => {
      rejectOpen = _;
      var decoder = new TextDecoder();
      local = new RTCPeerConnection({
        sdpSemantics: "unified-plan",
        iceServers: [],
      });

      for (
        const rtcPeerConnectionEvent of [
          "signalingstatechange",
          "iceconnectionstatechange",
          "icegatheringstatechange",
        ]
      ) {
        local.addEventListener(rtcPeerConnectionEvent, async (e) => {
          if (
            e.type === "iceconnectionstatechange" &&
            e.target.sctp.state === "closed"
          ) {
            await dataChannelClosePromise;
            this.options.readyState = channel.readyState;
            transportClosedResolve();
          }
        });
      }
      local.onicecandidate = async (e) => {
        if (!e.candidate) {
          try {
            if (globalThis?.openIsolatedWebApp) {
              await openIsolatedWebApp(`?name=TCPSocket`);
            } else {
              if (globalThis?.setTitle) {
                setTitle(`?=TCPSocket`);
              } else {
                document.title = "?=TCPSocket";
              }
            }
            await scheduler.postTask(() => {}, {
              delay: 1000,
              priority: "user-visible",
            });
            // console.log("sdp:", local.localDescription.toJSON().sdp);
            // var abortable = new AbortController();
            // var { signal } = abortable;
            const sdp = await (await fetch("http://0.0.0.0:44819", {
              method: "post",
              body: new TextEncoder().encode(local.localDescription.sdp),
              // signal,
            })).text();
            await local.setRemoteDescription({
              type: "answer",
              sdp,
            });
            // console.log("Done signaling SDP");
          } catch (e) {
            console.error(e);
          }
        }
      };
      channel = local.createDataChannel(
        JSON.stringify({
          address: options.address,
          port: options.port,
          protocol: options.protocol,
        }),
        {
          negotiated: false,
          ordered: true,
          id: 0,
          binaryType: "arraybuffer",
          protocol: options.protocol,
        },
      );

      channel.onopen = async (e) => {
        const {
          binaryType,
          label,
          bufferedAmount,
          ordered,
          protocol,
          readyState,
          reliable,
        } = e.target;
        Object.assign(this.options, {
          binaryType,
          label,
          bufferedAmount,
          ordered,
          protocol,
          readyState,
          reliable,
        });
        const readable = new ReadableStream({
          start(_) {
            return readableController = _;
          },
          cancel(reason) {
            console.log(reason);
          },
        });
        const writable = new WritableStream({
          start(_) {
            return writableController = _;
          },
          write(v) {
            if (channel.readyState === "open") {
              channel.send(v);
            }
          },
          close() {
            channel.close();
            readableController.close();
          },
          abort(reason) {
            console.log(reason);
          },
        });
        dataChannelOpenResolve({
          readable,
          writable,
        });
      };
      channel.onclose = async (e) => {
        console.log(local);
        // iceConnectionState, connectionState, signalingState
        this.options.readyState = e.target.readyState;
        if (
          local.connectionState === "closed" || local?.sctp?.state === "closed"
        ) {
          dataChannelCloseReject();
        } else {
          if (local.sctp.state === "connected") {
            dataChannelCloseResolve();
          }
        }
      };
      channel.onclosing = async (e) => {
        console.log(e.type);
      };
      channel.onerror = async (e) => {
        console.log(e.type, e.target);
        await Promise.allSettled([readable.closed, writable.closed]).then((
          args,
        ) => console.log(readable.locked, writable.locked)).catch(console.log);
      };
      channel.onmessage = (e) => {
        readableController.enqueue(e.data);
      };
      const offer = await local.createOffer({
        voiceActivityDetection: false,
        offerToReceiveAudio: false,
        offerToReceiveVideo: false,
        iceRestart: false,
      });
      await local.setLocalDescription(offer);
      const { readable, writable } = await dataChannelOpenPromise;
      await scheduler.postTask(() => {}, {
        delay: 500,
        priority: "background",
      });
      this.options.maxMessageSize = options.protocol === "udp"
        ? 65507
        : local.sctp.maxMessageSize;
      this.bufferedAmountLow = async () =>
        await new Promise((resolve) => {
          channel.addEventListener("bufferedamountlow", resolve, {
            once: true,
          });
        });
      resolveOpen({ readable, writable });
    }).catch((e) => {
      throw e;
    });
    this.closed = Promise.allSettled([
      new Promise(async (r, _) => {
        rejectClose = _;
        r(transportClosedPromise);
      }),
      dataChannelClosePromise,
    ]).catch((e) => {
      throw e;
    });
    this.close = () => {
      try {
        transportClosedReject();
        rejectOpen();
        rejectClose();
        channel?.close();
        local?.close();
      } catch (e) {
        console.log(e);
      } finally {
        this.options.readyState = channel.readyState;
      }
    };
  }
};
var socket = new DirectSockets({
  protocol: "udp",
  address: "127.0.0.1",
  port: 8000,
});
console.log(socket);
socket.closed.then((args) =>
  console.log("socket closed", args, socket.options.readyState)
).catch((e) =>
  console.log("socket closed reject", e, socket.options.readyState)
);
// Early close
//socket.close();
var decoder = new TextDecoder();
var p = await socket.opened.catch((e) => {
  console.log("socket opened reject", e);
});
console.log(p);
if (p?.readable) {
  var { readable, writable } = p;
  var reader = readable.getReader();
  var writer = writable.getWriter();
  /*
  async function stream(input) {
    let result2 = "";
    await writer.ready;
    await writer.write(input);
    await socket.bufferedAmountLow();
    //console.log(writer.desiredSize);
    //socket.close();
    await scheduler.postTask(
      () => Promise.all([writer.close(), writer.closed]),
      { delay: 300 },
    );
    //console.log(writer.desiredSize);
    return reader.read().then(async function read({ value, done }) {
     // console.log(value, done);
      if (done) {
        return result2;
      }
      result2 += decoder.decode(value);
      return reader.read().then(read);
    });
  }
  var result = await stream(new TextEncoder().encode(
    `So we need people to have weird new
ideas ... we need more ideas to break it
and make it better ...

Use it. Break it. File bugs. Request features.

- Soledad Penadés, Real time front-end alchemy, or: capturing, playing,
  altering and encoding video and audio streams, without
  servers or plugins!
`,
  )).catch((e) => e);
  */
  async function stream(input) {
    var len = 0;
    for (let i = 0; i < input.length; i += socket.options.maxMessageSize) {
      const data = input.subarray(i, i + socket.options.maxMessageSize);
      await writer.ready;
      await writer.write(data).catch((e) => {
        throw e;
      });
      await socket.bufferedAmountLow();
      let readLength = 0;

      do {
        var { value, done } = await reader.read();
        len += value.byteLength, done;
        readLength += value.byteLength;
      } while (readLength < data.length);
    }
    await Promise.all([writer.close(), writer.closed]);
    return len;
  }
  var result = await stream(new Uint8Array(1024 ** 2 * 20)).catch((e) => e);

  console.log(result);
  reader.releaseLock();
  writer.releaseLock();
}
// Later close
// socket.close();

```
\$\endgroup\$
6
  • 4
    \$\begingroup\$ Please edit your question so that the title describes the purpose of the code, rather than its mechanism. We really need to understand the motivational context to give good reviews. It's best to describe what value this code provides to its user. \$\endgroup\$ Commented Sep 19 at 14:50
  • \$\begingroup\$ @TobySpeight "Please edit your question so that the title describes the purpose of the code" I did. Read the 2d paragraph in OP. \$\endgroup\$ Commented Sep 20 at 0:29
  • 2
    \$\begingroup\$ The second paragraph of the post doesn't clarify what the high-level point of all this code is, nor does the title. Please edit to clarify the purpose. For what goal did you bother writing this code, and who is supposed to use it? Why not use builtin UDP/TCP libs? \$\endgroup\$ Commented Sep 20 at 2:35
  • \$\begingroup\$ @guest271314, you don't seem to have edited at all since I wrote that. Almost every question has classes, overloads, callbacks and/or functions, so the current title doesn't give any meaningful information. See the guidance in How do I ask a good question? to help you provide a title that helps users decide whether to look at your question. A good title really helps to attract the people best able to help you with answers. \$\endgroup\$ Commented Sep 20 at 11:02
  • 1
    \$\begingroup\$ I don't think this code can be meaningfully reviewed as currently written. There are three snippets (two functions and a usage example?), referring to a bunch of undeclared variables (local, channel, writer, maybe more, and none of them are available as browser or node globals), and the overall purpose is totally unclear (you wrote a paragraph listing potential uses of the function, but those don't sound like a reasonable "single responsibility" of some function). I rarely downvote questions on CodeReview, but this is one of such questions. \$\endgroup\$ Commented Sep 21 at 14:11

0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.