My goal is to create a kind of web crawler in Dart. For this I want to maintain a task queue that stores the elements that still need to be crawled (e.g. URLs). The elements are crawled by the crawl function, which returns a List of further elements that need to be processed; these are then added to the queue. Example code:

import "dart:collection";
import "package:http/http.dart" as http;

final queue = Queue<String>();

main() async {
  queue
    ..add("...")
    ..add("...")
    ..add("...");
  while (queue.isNotEmpty) {
    // Only one element is in flight at a time.
    final results = await crawl(queue.removeFirst());
    queue.addAll(results);
  }
}

Future<List<String>> crawl(String x) async {
  ...
  final res = await http.get(Uri.parse(x));
  ...
  return results;
}

This code of course only processes one element at a time. However, I want a pool of workers (for example 5) that take elements out of the queue, process them at the same time, and add the results back to the queue. Since the bottleneck is the HTTP request, I think a Future.wait() call with multiple workers could speed up the execution. However, I don't want to overload the servers, so I also want to limit the number of workers.

Can this be realized with basic async primitives and semaphores? I would like to avoid isolates if possible in order to keep the solution as easy as possible.

2 Answers

I don't know if there is already a package that provides this functionality, but since it is not that complicated to write your own logic, I have made the following example:

import 'dart:async';
import 'dart:collection';
import 'dart:math';

class TaskRunner<A, B> {
  final Queue<A> _input = Queue();
  final StreamController<B> _streamController = StreamController();
  final Future<B> Function(A) task;

  final int maxConcurrentTasks;
  int runningTasks = 0;

  TaskRunner(this.task, {this.maxConcurrentTasks = 5});

  Stream<B> get stream => _streamController.stream;

  void add(A value) {
    _input.add(value);
    _startExecution();
  }

  void addAll(Iterable<A> iterable) {
    _input.addAll(iterable);
    _startExecution();
  }

  void _startExecution() {
    // Nothing to do if all slots are busy or there is no work.
    if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
      return;
    }

    // Start workers until the queue is empty or the limit is reached.
    while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
      runningTasks++;
      print('Concurrent workers: $runningTasks');

      task(_input.removeFirst()).then((value) async {
        _streamController.add(value);

        // Keep this worker alive for as long as there is queued work.
        while (_input.isNotEmpty) {
          _streamController.add(await task(_input.removeFirst()));
        }

        runningTasks--;
        print('Concurrent workers: $runningTasks');
      });
    }
  }
}

Random _rnd = Random();
Future<List<String>> crawl(String x) =>
    Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));

void main() {
  final runner = TaskRunner(crawl, maxConcurrentTasks: 3);

  runner.stream.forEach((listOfString) {
    if (listOfString.length == 1) {
      print('DONE: ${listOfString.first}');
    } else {
      print('PUTTING STRINGS ON QUEUE: $listOfString');
      runner.addAll(listOfString);
    }
  });

  runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}

An example run printed the following (the delays are random, so the order differs between runs):

Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70

I am sure the usability of the class can be improved, but I think the core concept is easy enough to understand. We define a Queue, and every time we add something to it we check whether we can start executing new async tasks. Otherwise we simply return, since every currently running async task checks for more content on the Queue before "closing down".
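
The scheduling idea described above can also be condensed into a single function. The following is only a sketch under stated assumptions, not a replacement for the class: crawlAll, fakeCrawl and the decision to skip failed items are hypothetical choices made for illustration. It returns a Future instead of a Stream, which completes once the queue is empty and no call is still in flight.

```dart
import 'dart:async';
import 'dart:collection';

/// Processes [seeds] and everything [crawl] discovers, running at most
/// [maxConcurrent] calls at a time. Completes with the processed items
/// once the queue is empty and no call is still in flight.
Future<List<String>> crawlAll(
  Iterable<String> seeds,
  Future<List<String>> Function(String) crawl, {
  int maxConcurrent = 5,
}) {
  final queue = Queue<String>.of(seeds);
  final processed = <String>[];
  final done = Completer<List<String>>();
  var running = 0;

  void pump() {
    // Start new calls while there is both work and a free slot.
    while (running < maxConcurrent && queue.isNotEmpty) {
      final item = queue.removeFirst();
      running++;
      () async {
        try {
          queue.addAll(await crawl(item));
          processed.add(item);
        } catch (_) {
          // Assumption: a failed item is simply skipped.
        } finally {
          running--;
          pump(); // A slot is free again, so re-check the queue.
        }
      }();
    }
    // Finished only when nothing is queued and nothing is running.
    if (running == 0 && queue.isEmpty && !done.isCompleted) {
      done.complete(processed);
    }
  }

  pump();
  return done.future;
}

/// Stand-in for a real crawl (hypothetical): splits on '-' and reports
/// no further work for single items.
Future<List<String>> fakeCrawl(String x) async {
  await Future.delayed(const Duration(milliseconds: 20));
  final parts = x.split('-');
  return parts.length == 1 ? <String>[] : parts;
}

Future<void> main() async {
  final processed =
      await crawlAll(['1-2-3', '4-5'], fakeCrawl, maxConcurrent: 2);
  print('processed ${processed.length} items: $processed');
}
```

Because the completion check runs every time a call finishes, items that a late result puts back on the queue are still picked up before the returned Future completes.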

The results are returned through a Stream, which you can subscribe to and, for example, add more content to the TaskRunner based on each result, as shown in my example. Results are delivered in the order in which the tasks finish.

It is important to note that this is NOT a way to run tasks in multiple threads. All the code runs in a single Dart isolate, but because HTTP requests are I/O-bound, there is a point in starting multiple Futures and waiting for the results.
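
To illustrate why overlapping waits help even in a single isolate, here is a small timing sketch. fakeRequest and timeConcurrent are hypothetical helpers, with Future.delayed standing in for an HTTP request.

```dart
import 'dart:async';

/// Stand-in for an I/O-bound call such as an HTTP request (hypothetical).
Future<int> fakeRequest(int ms) =>
    Future.delayed(Duration(milliseconds: ms), () => ms);

/// Starts all requests at once and returns the elapsed wall-clock time.
Future<Duration> timeConcurrent(List<int> delaysMs) async {
  final stopwatch = Stopwatch()..start();
  await Future.wait(delaysMs.map(fakeRequest));
  return stopwatch.elapsed;
}

Future<void> main() async {
  final elapsed = await timeConcurrent([200, 300, 400]);
  // The waits overlap, so the total should be close to the longest delay
  // rather than the sum of all three.
  print('elapsed: ${elapsed.inMilliseconds} ms');
}
```

No code runs in parallel here; the isolate is simply free to start the next request while the earlier ones are still waiting.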

4 Comments

Awesome, thanks a lot! How come "Concurrent workers: 4" is shown even though maxConcurrentTasks=3 in your example?
The line while (_input.isNotEmpty && runningTasks <= maxConcurrentTasks) { should be while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) { instead, I guess.
Is there any way to await until all tasks from the queue are processed?
@TomRaganowicz It is possible, but it is a bit hard to make something generically usable: if we use a Future to signal completion, we could in theory end up putting more events on the queue before the "isDone" code is triggered. I don't really have a good, simple idea that would work in all situations.

Similar to the p-limit package in JS, there is now a p_limit package for Dart (disclaimer: I am the author of the Dart implementation). See the documentation.

Just instantiate it with the number of concurrent tasks you want, then build a list of Futures, each wrapped with the resulting limit function, as below.

import 'dart:collection';

import 'package:http/http.dart' as http;
import 'package:p_limit/p_limit.dart';

Future<void> main() async {
  // Example concurrency of 3 futures at once
  final limit = PLimit<http.Response>(3);

  final queue = Queue<String>();
  queue
    ..add("http://www.exampleone.com/")
    ..add("http://www.exampletwo.com/")
    ..add("http://www.examplethree.com/")
    ..add("http://www.examplefour.com/");

  final futures = queue.map((url) {
    // wrap the function we are calling in the limit function we defined above
    return limit(() => http.get(Uri.parse(url)));
  });

  // Only three futures are run at once (as defined above)
  final results = await Future.wait(futures);
  print(results);
}
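
For readers who prefer to avoid a dependency, the same kind of limiting can be sketched with a small counting semaphore built on Completer. This is an illustration only and not how p_limit is implemented; the Semaphore class and its acquire, release and withPermit methods are hypothetical names.

```dart
import 'dart:async';
import 'dart:collection';

/// Minimal counting semaphore for async code (hypothetical helper).
class Semaphore {
  Semaphore(this._permits);

  int _permits;
  final Queue<Completer<void>> _waiters = Queue();

  /// Completes as soon as a permit is available.
  Future<void> acquire() {
    if (_permits > 0) {
      _permits--;
      return Future.value();
    }
    final waiter = Completer<void>();
    _waiters.add(waiter);
    return waiter.future;
  }

  /// Returns a permit, handing it to the longest-waiting caller if any.
  void release() {
    if (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete();
    } else {
      _permits++;
    }
  }

  /// Runs [action] while holding a permit and always releases it afterwards.
  Future<T> withPermit<T>(Future<T> Function() action) async {
    await acquire();
    try {
      return await action();
    } finally {
      release();
    }
  }
}

Future<void> main() async {
  final semaphore = Semaphore(2);
  var active = 0;
  var peak = 0;

  // Simulated job that records how many jobs run at the same time.
  Future<int> job(int n) => semaphore.withPermit(() async {
        active++;
        if (active > peak) peak = active;
        await Future.delayed(const Duration(milliseconds: 50));
        active--;
        return n * 2;
      });

  final results = await Future.wait([for (var i = 1; i <= 5; i++) job(i)]);
  print('results: $results, peak concurrency: $peak');
}
```

All five jobs are created up front, but each one waits inside withPermit until a permit is free, so no more jobs run at once than the number passed to the constructor.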
