I don't know whether a package already provides this functionality, but since it is not that complicated to write your own logic, I have made the following example:
import 'dart:async';
import 'dart:collection';
import 'dart:math';

class TaskRunner<A, B> {
  // Pending inputs that have not yet been handed to a task.
  final Queue<A> _input = Queue();
  // Results are published here in the order the tasks finish.
  final StreamController<B> _streamController = StreamController();
  final Future<B> Function(A) task;
  final int maxConcurrentTasks;
  int runningTasks = 0;

  TaskRunner(this.task, {this.maxConcurrentTasks = 5});

  Stream<B> get stream => _streamController.stream;

  void add(A value) {
    _input.add(value);
    _startExecution();
  }

  void addAll(Iterable<A> iterable) {
    _input.addAll(iterable);
    _startExecution();
  }

  void _startExecution() {
    // Nothing to do if all workers are busy or there is no pending input.
    if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
      return;
    }

    // Start new workers until the limit is reached or the queue is empty.
    while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
      runningTasks++;
      print('Concurrent workers: $runningTasks');

      task(_input.removeFirst()).then((value) async {
        _streamController.add(value);

        // Keep this worker alive as long as there is more input to process.
        while (_input.isNotEmpty) {
          _streamController.add(await task(_input.removeFirst()));
        }

        runningTasks--;
        print('Concurrent workers: $runningTasks');
      });
    }
  }
}

Random _rnd = Random();

// Simulated crawl: waits 0 to 4 seconds, then splits the input on '-'.
Future<List<String>> crawl(String x) =>
    Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));

void main() {
  final runner = TaskRunner(crawl, maxConcurrentTasks: 3);

  runner.stream.forEach((listOfString) {
    if (listOfString.length == 1) {
      print('DONE: ${listOfString.first}');
    } else {
      // A result with several parts is fed back into the runner as new input.
      print('PUTTING STRINGS ON QUEUE: $listOfString');
      runner.addAll(listOfString);
    }
  });

  runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}
Which outputs something like this (the exact order varies from run to run because the delays are random):
Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70
I am sure the usability of the class can be improved, but I think the core concept is easy enough to understand. The idea is that we define a Queue, and every time we add something to this Queue, we check whether we can start executing new async tasks. If not, we just skip it, since every currently running async task checks for more content on the Queue before "closing down".
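To make the queue-and-workers idea easier to see in isolation, here is a minimal sketch without the Stream. The function name `runLimited` and its parameters are my own invention for illustration and not part of the TaskRunner class. It assumes all inputs are known up front, so unlike TaskRunner it does not support adding more work later.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical helper (not part of TaskRunner): runs [task] on every input
/// with at most [maxConcurrent] calls in flight at the same time.
/// Results are collected in the order the tasks finish.
Future<List<B>> runLimited<A, B>(
  Iterable<A> inputs,
  Future<B> Function(A) task, {
  int maxConcurrent = 3,
}) async {
  final queue = Queue<A>.of(inputs);
  final results = <B>[];

  // Each worker keeps pulling from the shared queue until it is empty,
  // which is the "check for more content before closing down" step.
  Future<void> worker() async {
    while (queue.isNotEmpty) {
      results.add(await task(queue.removeFirst()));
    }
  }

  // Start the workers and wait until all of them have run out of input.
  await Future.wait([for (var i = 0; i < maxConcurrent; i++) worker()]);
  return results;
}
```

It could be called as `await runLimited(urls, crawl, maxConcurrent: 3)`, where `urls` is whatever list of inputs you have. Because the workers are plain async functions, an error thrown by `task` surfaces through the returned Future instead of being lost.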
The results are returned through a Stream, which you can subscribe to and, e.g., add more content to the TaskRunner based on the results, as I show in my example. The data is returned in the order the tasks finish, not the order in which the inputs were added.
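The completion-order behaviour can be shown with a small sketch. The helper `completionOrder` is a made-up name for illustration: it starts one delayed Future per label, pushes each result onto a StreamController as it finishes, and returns what the Stream delivered.

```dart
import 'dart:async';

/// Illustrative helper (not part of TaskRunner): starts one delayed Future
/// per entry in [delaysMs] and returns the labels in the order the Futures
/// completed, as seen by a Stream listener.
Future<List<String>> completionOrder(Map<String, int> delaysMs) {
  final controller = StreamController<String>();
  var pending = delaysMs.length;
  if (pending == 0) controller.close();

  delaysMs.forEach((label, ms) {
    Future.delayed(Duration(milliseconds: ms), () => label).then((value) {
      controller.add(value);
      // Close the stream once the last Future has delivered its result.
      if (--pending == 0) controller.close();
    });
  });

  return controller.stream.toList();
}
```

Calling `completionOrder({'slow': 60, 'fast': 10, 'medium': 30})` yields the labels sorted by delay rather than by insertion order. If you need to know which input produced which result, one option is to let the task return the input together with its result.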
It is important to note that this is NOT a way to run tasks in multiple threads. All the code runs in a single Dart isolate, but because HTTP requests are IO-bound and spend most of their time waiting, there is a point in starting multiple Futures and waiting for the results.
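A rough way to see why this helps even on a single isolate is to compare awaiting simulated requests one by one with starting them all up front. In this sketch `fakeRequest` is a stand-in I made up for an HTTP call, implemented with `Future.delayed`, and the 100 ms delay is an arbitrary assumption.

```dart
import 'dart:async';

/// Simulated IO-bound call (hypothetical stand-in for an HTTP request).
Future<int> fakeRequest(int id) =>
    Future.delayed(const Duration(milliseconds: 100), () => id);

/// Elapsed milliseconds when each request is awaited before the next starts.
Future<int> sequentialMs(int n) async {
  final sw = Stopwatch()..start();
  for (var i = 0; i < n; i++) {
    await fakeRequest(i);
  }
  return sw.elapsedMilliseconds;
}

/// Elapsed milliseconds when all requests are started before awaiting any.
Future<int> concurrentMs(int n) async {
  final sw = Stopwatch()..start();
  await Future.wait([for (var i = 0; i < n; i++) fakeRequest(i)]);
  return sw.elapsedMilliseconds;
}
```

With three requests, the sequential version should take roughly three times as long as the concurrent one, since the waits overlap in the second case. The same trick would not speed up CPU-heavy work, because only one piece of Dart code runs at a time inside an isolate.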