I have three async functions that I want to call from multiple threads in parallel. So far I have tried the following approach:

int numOfThreads = 4;
var taskList = new List<Task>();
using (fs = new FileStream(inputFilePath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite))
{
    for (int i = 1; i <= numOfThreads; i++)
    {
        taskList.Add(Task.Run(async () =>
        {
            byte[] buffer = new byte[length]; // length could be up to a few thousand
            await Function1Async(); // Reads from the file into a byte array
            long result = await Function2Async(); // Does some async operation with that byte array data
            await Function3Async(result); // Writes the result into the file
        }));
    }
}
Task.WaitAll(taskList.ToArray());

However, not all of the tasks complete before execution reaches the end. I have limited experience with threading in C#. What am I doing wrong in my code? Or should I take an alternative approach?

EDIT: I made some changes to my approach. I got rid of Function3Async for now:

for (int i = 1; i <= numOfThreads; i++)
{
    using (fs = new FileStream(----))
    {
        taskList.Add(Task.Run(async () =>
        {
            byte[] buffer = new byte[length]; // length could be up to a few thousand
            await Function1Async(buffer); // Reads from the file into a byte array
            Stream data = new MemoryStream(buffer);
            /** Write the Stream into a file and return
             * the offset at which the write operation was done
             */
            long blockStartOffset = await Function2Async(data);

            Console.WriteLine($"Block written at - {blockStartOffset}");
        }));
    }
}
Task.WaitAll(taskList.ToArray());

Now all threads seem to run to completion, but Function2Async seems to randomly write some Japanese characters to the output file. I guess it is a threading issue? Here is the implementation of Function2Async:

public async Task<long> Function2Async(Stream data)
{
    long offset = getBlockOffset();
    using (var outputFs = new FileStream(fileName,
        FileMode.OpenOrCreate,
        FileAccess.ReadWrite,
        FileShare.ReadWrite))
    {
        outputFs.Seek(offset, SeekOrigin.Begin);
        await data.CopyToAsync(outputFs);
    }
    return offset;
}
  • If WaitAll returns then all Tasks have completed. Have you examined each Task to see whether they completed successfully? Commented Jun 13, 2022 at 5:30
  • Just add await before async in the Task call Commented Jun 13, 2022 at 5:54
  • @user18387401 yes I have examined that. Commented Jun 13, 2022 at 7:37
  • Could you include in the question the Function1Async and Function3Async methods? Commented Jun 13, 2022 at 11:26
  • @Theodor Zoulias I have made an edit to my question to include some more code and my changed approach. Commented Jun 14, 2022 at 6:48

1 Answer

In your example you have passed neither fs nor buffer into Function1Async but your comment says it reads from fs into buffer, so I will assume that is what happens.

You cannot read from a single stream in parallel. A stream has one current position, and it does not support concurrent reads. Even if you find one that supports it, it will be horribly inefficient, because that is how hard disk storage works. It is even worse on a network drive.

Read from the stream into your buffers first, in sequence, then let your threads loose to run your logic in parallel on the buffers that are already in memory.

Writing, by the way, has the same problem if you write to the same file. Writing to one file per buffer is fine; otherwise, do the writes sequentially.
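The read-then-process-then-write order described above could look roughly like the sketch below. It is only an illustration under assumptions: `BlockPipeline`, `RunAsync`, and `ProcessAsync` are hypothetical names, and `ProcessAsync` (which just reverses each block) stands in for whatever the question's Function2Async really does.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

public static class BlockPipeline
{
    // Hypothetical stand-in for the real per-block work; it only reverses the bytes.
    public static Task<byte[]> ProcessAsync(byte[] block) =>
        Task.Run(() => block.Reverse().ToArray());

    public static async Task RunAsync(string inputPath, string outputPath, int blockSize)
    {
        // 1. Read sequentially: a single reader owns the stream and its position.
        var blocks = new List<byte[]>();
        using (var input = new FileStream(inputPath, FileMode.Open, FileAccess.Read))
        {
            while (true)
            {
                var buffer = new byte[blockSize];
                int filled = 0;
                int read;
                // ReadAsync may return fewer bytes than requested, so loop until the block is full or the file ends.
                while (filled < blockSize &&
                       (read = await input.ReadAsync(buffer, filled, blockSize - filled)) > 0)
                {
                    filled += read;
                }
                if (filled == 0) break;                        // end of file
                if (filled < blockSize) Array.Resize(ref buffer, filled); // last, shorter block
                blocks.Add(buffer);
            }
        }

        // 2. Process in parallel: every task works on its own in-memory buffer.
        byte[][] results = await Task.WhenAll(blocks.Select(ProcessAsync));

        // 3. Write sequentially: results come back in the same order as the input blocks.
        using (var output = new FileStream(outputPath, FileMode.Create, FileAccess.Write))
        {
            foreach (var result in results)
            {
                await output.WriteAsync(result, 0, result.Length);
            }
        }
    }
}
```

A caller would await `BlockPipeline.RunAsync(inputPath, outputPath, blockSize)`. Note that this version holds the whole file in memory, so it only suits inputs that fit comfortably in RAM.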


3 Comments

OK, but the file that I am reading could be quite large. I decided to store each block (that is, the data that each single thread will write) in a List<byte[]>. I changed my code to first read the data sequentially and synchronously into this list, and then write the data from multiple threads. But will this approach work well for huge files?
You could modify this: stackoverflow.com/questions/17188357/… to work for bytes instead. The only available generator is for text files and lines, you would have to build your own and tell it where your byte chunks end.
@Mapper you might want to learn about the producer-consumer pattern. In .NET it is usually implemented with the BlockingCollection<T> class.
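For huge files, the producer-consumer idea from the last comment might be sketched as follows. One producer reads blocks in sequence into a bounded BlockingCollection<byte[]>, and several consumers process them in parallel, so only a limited number of blocks is in memory at a time. The names `BoundedBlockReader`, `Run`, and the `process` delegate are assumptions made for illustration, not part of the original code, and blocks are not processed in file order.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

public static class BoundedBlockReader
{
    // Reads inputPath in blocks and returns the sum of process(block) over all blocks.
    public static long Run(string inputPath, int blockSize, int workers, Func<byte[], long> process)
    {
        // Bounded queue: Add blocks the producer once workers * 2 blocks are waiting.
        using (var queue = new BlockingCollection<byte[]>(workers * 2))
        {
            var producer = Task.Run(() =>
            {
                try
                {
                    using (var input = File.OpenRead(inputPath))
                    {
                        while (true)
                        {
                            var buffer = new byte[blockSize];
                            int filled = 0;
                            int read;
                            // Fill the block completely unless the file ends first.
                            while (filled < blockSize &&
                                   (read = input.Read(buffer, filled, blockSize - filled)) > 0)
                            {
                                filled += read;
                            }
                            if (filled == 0) break;
                            if (filled < blockSize) Array.Resize(ref buffer, filled);
                            queue.Add(buffer);
                        }
                    }
                }
                finally
                {
                    // Always signal completion so the consumers can finish, even after a read error.
                    queue.CompleteAdding();
                }
            });

            // Each consumer takes blocks until the queue is empty and marked complete.
            var consumers = Enumerable.Range(0, workers).Select(_ => Task.Run(() =>
            {
                long sum = 0;
                foreach (var block in queue.GetConsumingEnumerable())
                {
                    sum += process(block);
                }
                return sum;
            })).ToArray();

            long total = consumers.Sum(c => c.Result); // blocks until every consumer is done
            producer.Wait();                           // rethrows any read error
            return total;
        }
    }
}
```

For example, `BoundedBlockReader.Run(path, 4096, 4, block => block.Length)` would return the file size in bytes. The bounded capacity is the design choice that matters here: it keeps memory use roughly proportional to the number of workers rather than to the file size.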
