
I need someone to help, or at least give a tip. I'm trying to read large files (100 MB to 11 GB) line by line and then store some data in a map.

var m = make(map[string]string) // initialized, so writes don't panic
var mutex sync.Mutex

// expensive func
func stress(s string, mutex sync.Mutex)  {
    // some very costly operation... that's why I want to use goroutines
    mutex.Lock()
    m[s] = s // store result
    mutex.Unlock()
}

func main() {
    file, err := os.Open("somefile.txt")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer func() {
        if err = file.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        go stress(scanner.Text(), mutex)
    }
}

Without goroutines it works fine but slowly. As you can see, the file is large, so the loop will spawn a lot of goroutines, and that causes two problems:

  1. Sometimes the mutex doesn't work properly and the program crashes. (How many goroutines can a mutex handle?)
  2. Every time, some data is simply lost (but the program doesn't crash).

I suppose I should use a WaitGroup, but I can't figure out how. I also guess there should be some limit on goroutines, maybe a counter. It would be great to run this with 5-20 goroutines.


UPD. Yes, as @user229044 mentioned, I have to pass the mutex by pointer. But the problem of limiting the goroutines within the loop is still open.


UPD2. This is how I worked around the problem. I don't fully understand how the program handles these goroutines, or what the memory and CPU cost is. Also, almost all commenters focus on the map structure, but my main problem was managing the goroutines at runtime. How many goroutines get spawned if there are 10 billion iterations of the Scan() loop, and how are goroutines stored in RAM?

func stress(s string, mutex *sync.Mutex)  {
    defer wg.Done()
    // a lot of costly ops
    // ...
    // ...
    mutex.Lock()
    m[where] = result // store result
    mutex.Unlock()
}

// main
for scanner.Scan() {
    wg.Add(1)
    go func(data string) {
        stress(data, &mutex)
    }(scanner.Text())
}
wg.Wait()
  • My two cents: send the lines to a buffered channel, and when you're done, just close it. Start several goroutines (use a WaitGroup) that range over that channel. Each goroutine writes to its own local map[string]struct{}, and you merge everything at the end. By fine-tuning the number of goroutines you can find an acceptable solution. Commented May 29, 2022 at 21:34
  • @mh-cbon please stop thinking about the map. It stores filtered data; the map is not big. For each 10-millionth chunk of input strings, the calculated data is stored in map[i]. Better to think about the RAM for 10B goroutines... Commented May 30, 2022 at 7:50
  • @mh-cbon I know I was spawning an unbounded number of goroutines. So the main question is how to bound it without changing the architecture. I updated my question with a solution that works for me, but now I don't understand why it works. I ran this code on my PC. There's no network involved, just computed math (some graphs). Each goroutine should run longer than it takes for the next Scan() to happen, so there will be a ton of goroutines. By my reasoning, even 1B goroutines should cost a lot. Then why does my PC NOT run out of memory? Is there some built-in mechanism that controls all these routines? Commented May 30, 2022 at 8:10

1 Answer


Your specific problem is that you're copying the mutex by value. You should pass a pointer to the mutex, so that a single instance of your mutex is shared by all function invocations. You're also spawning an unbounded number of goroutines, which will eventually exhaust your system's memory.

However, you can spawn as many goroutines as you want and you'll only waste resources for no gain; juggling all of those useless goroutines will probably cause a net loss of performance. Increased parallelism can't help you when every parallel process has to wait for serialized access to a data structure, as is the case with your map. sync.WaitGroup and mutexes are the wrong approach here.

Instead, to add and control concurrency, you want a buffered channel and a single goroutine responsible for map inserts. This way you have one process reading from the file and one process inserting into the map, decoupling the disk IO from the map insertion.

Something like this:

scanner := bufio.NewScanner(file)

ch := make(chan string, 10)
done := make(chan struct{})

go func() {
    for s := range ch {
        m[s] = s
    }
    close(done) // every insert has finished
}()

for scanner.Scan() {
    ch <- scanner.Text()
}

close(ch)
<-done // wait for the inserter to drain the channel before using the map

5 Comments

Why won't goroutines help? I tested it with a throwaway but expensive function, and the parallelism looks pretty good. go.dev/play/p/7vLBcIOdEpT
@mh-cbon maybe I described the problem badly. I have a function that is very costly: it takes an input string, performs some operations, then stores the result in a map. The problem is not storing into the map asynchronously, but making the whole function a bit cheaper.
To @mh-cbon's first comment: moving the map logic to the main goroutine ensures the task is complete and all writes are finished. The other way, there's a tiny chance of a race as the final write lands while the map may be read from elsewhere.
@Shamelezz If you have more work to do than simply inserting the values into a map, that work may be parallelizable. But your example does nothing but attempt to parallelize map inserts, which can't be parallelized. Maps are not safe for concurrent writing. When your entire function does nothing but lock a mutex/work/unlock the mutex, there is zero opportunity to parallelize that function. The whole point of the mutex is to guard critical sections so that only one thread of execution can enter that part of your code. You'll have one routine executing, and all other routines waiting.
It's the stuff outside the mutex that may be safe to parallelize, but your question doesn't have anything outside the mutex. So as written, the best course of action is to drop the mutex and rely on a simple buffered channel to serialize access to the map.
