0

I tried to de-duplicate the sent arrays and then merge them into arrays

import { from, BehaviorSubject, distinct, switchMap, toArray } from "rxjs";

let list$ = new BehaviorSubject([1, 2, 3, 2, 3, 5]);

list$.pipe(
  switchMap((e) => from(e)),
  distinct(),
  toArray()
).subscribe(console.log);

expected result:

BehaviorSubject -> [1, 2, 3, 2, 3, 5]
switchMap       -> 1 2 3 2 3 5
distinct        -> 1 2 3 5
toArray         -> [1, 2, 3, 5]
console.log     -> [1, 2, 3, 5]

Actually did not receive any value in console.log, why is this and how can I work as expected

"rxjs": "^7.2.0"

4 Answers 4

3

toArray only emits once the source observable completes.

The following should work as expected.

list$.pipe(
  take(1),
  switchMap(e => e),
  distinct(),
  toArray()
).subscribe(console.log);

If what you really want to do is filter unique values of an array, then RxJS's unique operator might be overkill. I wouldn't bother turning your array into a stream. Just filter the array.

list$.pipe(
  map(a => [...new Set(a)])
).subscribe(console.log);
Sign up to request clarification or add additional context in comments.

3 Comments

Thank you for your answer, but my source will not stop after sending it once. Is there an operator that replaces toArray
Hey @januwa, If that's the case, I don't think you need RxJS to do your uniqueness filtering. See my update above.
Love the seconds example. saved my situation! I have an array of objects coming in from a stream. Needed a unique set of certain properties from the objects in the array (e.g. address) pipe( map( items => [...new Set(items.map(x => x.address))]) ) is what worked for me.
1

So, if the source does not stop after the first notification, I assume that it will continue emit other arrays and that you want to filter the duplicates on each array emitted. In other words, if the list$ of your example emits first [1, 2, 3, 2, 3, 5] and then [3, 2, 1, 6, 6, 6,] what you want to log are 2 arrays, [1, 2, 3, 5] and [3, 2, 1, 6].

If my assumption is right, than the solution could be the following

list$.pipe(
  concatMap((e) => from(e).pipe(
    distinct(),
    toArray()
  )),
).subscribe(console.log);

The trick here is that each from(e) stream will complete when there are no more elements in the array. Therefore, since it completes, the toArray operator can actually work.

Comments

1

scan could do the trick.

list$.pipe(
  switchMap((e) => from(e)),
  distinct(),
  scan((acc, curr) => [...acc, curr], []),
).subscribe(console.log);
// will print: [1], [1, 2], [1, 2, 3], [1, 2, 3, 5]

You could insert debounceTime in the pipe, if you need less emissions:

list$.pipe(
  switchMap((e) => from(e)),
  distinct(),
  scan((acc, curr) => [...acc, curr], []),
  debounceTime(0)
).subscribe(console.log); // will print [1, 2, 3, 5]

Comments

1

If the only requirement is to remove duplicates, you're better off handling it using vaniall JS. See here: https://stackoverflow.com/a/9229821/6513921

We'll take the shortest solution without any regards to performance: uniq = [...new Set(array)];

You could then write a custom RxJS operator to include it in the pipe with other operators.

const { BehaviorSubject, from } = rxjs;
const { map, switchMap } = rxjs.operators;

const uniqueArray = (obs$) => {
  return (obs$) => {
    return obs$.pipe(
      map(arr => [...new Set(arr)])
    );
  };
};

const sub = new BehaviorSubject([1, 2, 3, 2, 3, 5]);

sub.asObservable().pipe(
  uniqueArray()
).subscribe(console.log);

sub.next([6, 3, 1, 6, 7, 1, 1]);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.