
I have some tasks executing in a WhenAll(). I get a compile error when a task returns an object and calls an async method inside its lambda. The async method fetches some string content from Blob storage, then constructs and returns an object.

Do you know how to solve this issue, while maintaining the batch download done by tasks? I need a list with those FinalWrapperObjects.

Error message

Cannot convert async lambda expression to delegate type 'Func&lt;FinalWrapperObject&gt;'. An async lambda expression may return void, Task or Task&lt;T&gt;, none of which are convertible to 'Func&lt;FinalWrapperObject&gt;'.

...
List<FinalWrapperObject> finalReturns = new List<FinalWrapperObject>();
List<Task<FinalWrapperObject>> tasks = new List<Task<FinalWrapperObject>>();
var resultsBatch = fetchedObjects.Skip(i).Take(10).ToList();

foreach (var resultBatchItem in resultsBatch)
{
    tasks.Add(
        new Task<FinalWrapperObject>(async () => //!! errors here on arrow
        {
            var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
            return new FinalWrapperObject {
                BlobContent = blobContent,
                CreationDateTime = resultBatchItem.CreationDateTime
            };
        })
    );
}

FinalWrapperObject[] listFinalWrapperObjects = await Task.WhenAll(tasks);
finalReturns.AddRange(listFinalWrapperObjects);

return finalReturns;
  • Which Task<T> constructor are you expecting to use? Note that explicitly constructing a Task<T> is very rare. I would suggest just using a local async method, and calling it within the loop... Commented Jun 1, 2022 at 10:01
  • Replace new Task<FinalWrapperObject> with Task<FinalWrapperObject>.Run or simply Task.Run? Commented Jun 1, 2022 at 10:15
  • Do you have any particular reason to invoke the azureBlobService.GetAsync method on ThreadPool threads, instead of invoking it on the current thread? Commented Jun 1, 2022 at 10:50
  • Also are you aware that on .NET 6 there is available a Chunk LINQ operator, that makes the .Skip(i).Take(10) shenanigans obsolete? Commented Jun 1, 2022 at 10:57
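Regarding the last comment, a minimal sketch of the .NET 6 Chunk operator (the list here is sample data, not the question's fetchedObjects):

```csharp
using System;
using System.Linq;

// .NET 6+ LINQ Chunk: splits a sequence into arrays of at most N items,
// replacing a manual Skip(i).Take(10) batching loop.
var fetched = Enumerable.Range(1, 25).ToList();

foreach (var batch in fetched.Chunk(10))
{
    // batch is an array of up to 10 items; lengths here are 10, 10, 5
    Console.WriteLine($"batch of {batch.Length}");
}
```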

1 Answer


Your code never starts any tasks. Tasks aren't threads anyway. They're a promise that something will complete and maybe produce a value in the future. Some tasks require a thread to run; these are executed using threads that come from a thread pool. Others, e.g. async IO operations, don't require a thread at all. Downloading or uploading a file is such an IO operation.
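To illustrate the "never starts" point, a small self-contained sketch of what the Task constructor actually produces:

```csharp
using System;
using System.Threading.Tasks;

// A Task created with the constructor is "cold": it does nothing until
// Start() is called. Awaiting such a task (e.g. via Task.WhenAll) would
// wait forever, which is why the constructor is almost never used.
var cold = new Task<int>(() => 42);
// cold.Status is TaskStatus.Created at this point

cold.Start();               // schedules the delegate on the thread pool
int result = cold.Result;   // blocks until the task completes: 42
Console.WriteLine(result);
```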

Your lambda is asynchronous and already returns a Task, so there's no reason to use Task.Run. You can execute it once per item, collect the Tasks in a list and await all of them. That's the bare-bones way:

async Task<FinalWrapperObject> UploadItemAsync(BatchItem resultBatchItem)
{
    var blobContent = await azureBlobService.GetAsync(resultBatchItem.StoragePath);
    return new FinalWrapperObject {
        BlobContent = blobContent,
        CreationDateTime = resultBatchItem.CreationDateTime
    };
}

...

var tasks = resultsBatch.Select(UploadItemAsync);
var results = await Task.WhenAll(tasks);

Using TPL Dataflow

A better option would be to use the TPL Dataflow classes to upload items concurrently and even construct a pipeline from processing blocks.

var options= new ExecutionDataflowBlockOptions
         {
            MaxDegreeOfParallelism = 10
         };
var results=new BufferBlock<FinalWrapperObject>();
var uploader=new TransformBlock<BatchItem,FinalWrapperObject>(UploadItemAsync,options);
uploader.LinkTo(results);

foreach(var item in fetchedObjects)
{
    uploader.Post(item);
}
uploader.Complete();
await uploader.Completion;

By default, a block only processes one message at a time. Using MaxDegreeOfParallelism = 10 we're telling it to process 10 items concurrently. This code will upload up to 10 items at a time, as long as there are items posted to the uploader block.

The results are forwarded to the results BufferBlock. The items can be extracted with TryReceiveAll:

IList<FinalWrapperObject> items;
results.TryReceiveAll(out items);

Dataflow blocks can be combined into a pipeline. You could have a block that loads items from disk, another to upload them and a final one that stores the response to another file or database:

var dop10= new ExecutionDataflowBlockOptions
         {
            MaxDegreeOfParallelism = 10,
            BoundedCapacity=4
         };
var bounded= new ExecutionDataflowBlockOptions
         {
            BoundedCapacity=4
         };

var loader=new TransformBlock<FileInfo,BatchItem>(LoadFile,bounded);
var uploader=new TransformBlock<BatchItem,FinalWrapperObject>(UploadItemAsync,dop10);
// SaveResultAsync stands in for your own persistence method
var dbLogger = new ActionBlock<FinalWrapperObject>(SaveResultAsync, bounded);

var linkOptions=new DataflowLinkOptions {PropagateCompletion=true};
loader.LinkTo(uploader,linkOptions);
uploader.LinkTo(dbLogger,linkOptions);

var folder=new DirectoryInfo(rootPath);
foreach(var item in folder.EnumerateFiles())
{
    await loader.SendAsync(item);
}
loader.Complete();

await dbLogger.Completion;

In this case, all files in a folder are posted to the loader block which loads files one by one and forwards a BatchItem. The uploader uploads the file and the results are stored by dbLogger. In the end, we tell loader we're finished and wait for all items to get processed all the way to the end with await dbLogger.Completion.

The BoundedCapacity is used to put a limit on how many items can be held at each block's input buffer. This prevents loading all files into memory.


1 Comment

Hi! Can you show better where would this part go in my code? async Task<FinalWrapperObject> UploadItemAsync(BatchItem resultBatchItem) ... I don't know where to put this function to make it compile.
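For the comment above, a minimal placement sketch. The BatchItem record and the blob-service delegate here are stand-ins, since the originals aren't shown in the question; one natural home for UploadItemAsync is a local function inside the method that processes the batch, so it can capture the blob service from the enclosing scope (a private method on the same class works equally well):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Stand-in types for the ones the question doesn't show.
public record BatchItem(string StoragePath, DateTime CreationDateTime);

public class FinalWrapperObject
{
    public string BlobContent { get; set; }
    public DateTime CreationDateTime { get; set; }
}

public class BatchDownloader
{
    // Stand-in for azureBlobService.GetAsync
    private readonly Func<string, Task<string>> getBlobAsync;

    public BatchDownloader(Func<string, Task<string>> getBlobAsync) =>
        this.getBlobAsync = getBlobAsync;

    public async Task<List<FinalWrapperObject>> ProcessBatchAsync(
        IEnumerable<BatchItem> resultsBatch)
    {
        // Local function: captures getBlobAsync from the enclosing scope.
        async Task<FinalWrapperObject> UploadItemAsync(BatchItem item)
        {
            var blobContent = await getBlobAsync(item.StoragePath);
            return new FinalWrapperObject
            {
                BlobContent = blobContent,
                CreationDateTime = item.CreationDateTime
            };
        }

        var results = await Task.WhenAll(resultsBatch.Select(UploadItemAsync));
        return results.ToList();
    }
}
```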
