
Reading the first n bytes of a byte stream (in the form of an AsyncIterable) feels cumbersome and error-prone.

Is there a better way to implement this?

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);

  let offset = 0;

  const iterator = stream[Symbol.asyncIterator]();

  while (true) {
    const { done, value } = await iterator.next();

    if (done) {
      throw new Error("Buffer underflow");
    } else {
      const chunk = value;
      if (chunk.length < length - offset) {
        prefix.set(chunk, offset);
        offset += chunk.length;
      } else {
        const slice = chunk.slice(0, length - offset);
        prefix.set(slice, offset);

        return [prefix, prepend(chunk.slice(slice.length), stream)];
      }
    }
  }
}

async function* prepend(
  prefix: Uint8Array,
  stream: AsyncIterable<Uint8Array>
) {
  yield prefix;
  yield* stream;
}
  • If you're seeking code review, there's a dedicated SE site for that: codereview.stackexchange.com. Otherwise, "Is there a better way...?" questions solicit opinion-based answers, so they're generally not on-topic for SO. Commented May 4, 2023 at 12:19
  • Before posting on Code Review, please read A guide to Code Review for Stack Overflow users and How do I ask a good question?. Commented May 4, 2023 at 13:17
  • Please don't cross-post on Code Review. There is no reason to post there when you have 2 valid answers (and one accepted answer) here on Stack Overflow. Commented May 5, 2023 at 13:14

2 Answers


stream primitives

We'll start by defining stream primitives -

flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T>
take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T>
toArray<T>(t: AsyncIterable<T>): Promise<Array<T>>
async function *flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncIterable<T> {
  for await (const a of t) {
    yield *a
  }
}

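async function *take<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  if (n <= 0) return
  for await (const v of t) {
    yield v
    if (--n <= 0) return // stop right after the n-th value, without pulling another one
  }
  throw Error("buffer underflow") // the source ended before n values arrived
}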

async function *skip<T>(t: AsyncIterable<T>, n: number): AsyncIterable<T> {
  for await (const v of t) {
    if (n-- > 0) continue
    yield v
  }
  if (n > 0) throw Error("buffer underflow")
}

async function toArray<T>(t: AsyncIterable<T>): Promise<Array<T>> {
  const r = []
  for await (const v of t) r.push(v)
  return r
}

shift

Using these stream primitives, we can write shift in a comfortable and safe way -

shift(stream: AsyncIterable<Uint8Array>, count: number): Promise<[Uint8Array, AsyncIterable<number>]>
async function shift(stream: AsyncIterable<Uint8Array>, count: number) {
  return [
    new Uint8Array(await toArray(take(flatten(stream), count))),
    skip(flatten(stream), count)
  ] as const
}

Let's create a mock buffer and test it -

const buffer: AsyncIterable<Uint8Array> = {
  async *[Symbol.asyncIterator]() {
    for (const v of [[0,1,2],[3,4],[5,6,7,8],[9]]) {
      yield new Uint8Array(v)
      await new Promise(r => setTimeout(r, 100))
    }
  }
}

async function main() {
  const [first, rest] = await shift(buffer, 4)
  console.log({
    first: Array.from(first),
    rest: await toArray(rest)
  })
}

main().catch(console.error)
{
  first: [0, 1, 2, 3],
  rest: [4, 5, 6, 7, 8, 9]
}

demo

Run and verify the result on the TypeScript playground.


3 Comments

Instead of defining these yourself, you may as well use the iterator helper methods
I don't think your shift method works as the OP expects: it actually iterates the stream twice
Thanks @Bergi, I was unaware of the helper methods.
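The comment about iterating the stream twice matters when the source can only be consumed once, for example an async generator object. In that case `take` closes the shared iterator when it stops early, so the second `flatten(stream)` call would find the source already finished. Below is a minimal single-pass sketch that keeps the byte-level approach of this answer. It flattens the stream once, reads the prefix with `next()`, and returns that same generator as the rest. `shiftOnce` is a hypothetical name. `flatten` is retyped here to return an `AsyncGenerator` so that `next()` can be called on it directly.

```typescript
// Same idea as the answer's flatten, but typed as an AsyncGenerator so that
// next() is available to the caller.
async function* flatten<T>(t: AsyncIterable<Iterable<T>>): AsyncGenerator<T> {
  for await (const a of t) {
    yield* a
  }
}

// Hypothetical single-pass variant: the stream is flattened exactly once.
async function shiftOnce(
  stream: AsyncIterable<Uint8Array>,
  count: number
): Promise<[Uint8Array, AsyncIterable<number>]> {
  const bytes = flatten<number>(stream)
  const prefix = new Uint8Array(count)
  for (let i = 0; i < count; i++) {
    const r = await bytes.next()
    if (r.done) throw Error("buffer underflow")
    prefix[i] = r.value
  }
  // An async generator is itself AsyncIterable, so iterating it later
  // resumes right after the last byte read by next().
  return [prefix, bytes]
}
```

As in the answer's `shift`, the rest is an iterable of individual bytes rather than chunks. This is simple, but it pays a per-byte iteration cost.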

I think the iterator logic itself can be simplified by using a notClosing helper and normal iteration:

async function shift(
  length: number,
  stream: AsyncIterable<Uint8Array>
): Promise<[Uint8Array, AsyncIterable<Uint8Array>]> {
  const prefix = new Uint8Array(length);
  const iter = stream[Symbol.asyncIterator]();
  let offset = 0;
  for await (const chunk of notClosing(iter)) {
    if (chunk.length < length - offset) {
      prefix.set(chunk, offset);
      offset += chunk.length;
    } else {
      const slice = chunk.slice(0, length - offset);
      prefix.set(slice, offset);
      return [prefix, prepend(chunk.slice(slice.length), iter)];
    }
  }
  throw new Error("Buffer underflow");
}

Unless you want to convert the stream from an iterator of chunks into a much less efficient iterator of individual bytes, there's nothing you can further simplify about the offset logic.

const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype)) as AsyncIterator<any>;
function prepend<T>(val: T, iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    first: true,
    async next() {
      if (this.first) {
        const res = {done: false, value: val};
        val = undefined!; // drop the reference so the chunk can be garbage-collected
        this.first = false;
        return res;
      }
      return iter.next();
    },
    return: iter.return ? () => iter.return!() : undefined,
  });
}
function notClosing<T>(iter: AsyncIterator<T>): AsyncIterable<T> & AsyncIterator<T> {
  return Object.assign(Object.create(AsyncIteratorPrototype), {
    next: iter.next.bind(iter),
  });
}
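Here is why `notClosing` is needed. When a `for await` loop is left early (by `return`, `break` or a throw), the loop calls the iterator's `return()` method if it has one, and for an async generator that finishes it. `notClosing` exposes only `next`, so the `return` inside `shift`'s loop leaves `iter` open and `prepend` can continue from it. The sketch below shows that effect with a hypothetical `nextOnly` wrapper. It is a plain object rather than one built on `AsyncIteratorPrototype`, but it hides `return()` in the same way.

```typescript
// Hypothetical stand-in for notClosing: exposes only next(), so a for await
// loop finds no return() to call when it exits early.
function nextOnly<T>(iter: AsyncIterator<T>): AsyncIterable<T> {
  return { [Symbol.asyncIterator]: () => ({ next: () => iter.next() }) }
}

async function* numbers() {
  yield 1
  yield 2
  yield 3
}

// Reads one value in a loop that exits early, then drains whatever is left.
async function readFirstThenRest(wrap: boolean): Promise<number[]> {
  const iter = numbers()
  const seen: number[] = []
  const loop: AsyncIterable<number> = wrap ? nextOnly(iter) : iter
  for await (const v of loop) {
    seen.push(v)
    break // early exit: for await calls return() on the loop's iterator if it has one
  }
  for await (const v of iter) seen.push(v)
  return seen
}
```

Without the wrapper, the early exit closes the generator and the second loop sees nothing, giving `[1]`. With it, the generator stays suspended and the second loop yields the remaining values, giving `[1, 2, 3]`.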
