1

I want to make parallel API calls in Go using goroutines. Once the requests are fired:

  1. I need to wait for all responses (which take different amounts of time).
  2. If any request fails and returns an error, I want to stop the remaining goroutines.
  3. I also want a timeout value associated with each goroutine (or API call).

I have implemented the code below for 1 and 2, but I need help implementing 3. Feedback on 1 and 2 would also be welcome.

package main

import (
    "errors"
    "fmt"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup

    c := make(chan interface{}, 1)
    c2 := make(chan interface{}, 1)
    err := make(chan interface{})

    wg.Add(1)
    go func() {
        defer wg.Done()
        result, e := doSomeWork()
        if e != nil {
            err <- e
            return
        }
        c <- result
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        result2, e := doSomeWork2()
        if e != nil {
            err <- e
            return
        }
        c2 <- result2
    }()

    go func() {
        wg.Wait()
        close(c)
        close(c2)
        close(err)
    }()

    for e := range err {
        // an error occurred; you could exit the caller function here
        fmt.Println("Error==>", e)
        return

    }
    fmt.Println(<-c, <-c2)

}

// mimic api call 1
func doSomeWork() (function1, error) {
    time.Sleep(10 * time.Second)
    obj := function1{"ABC", "29"}
    return obj, nil
}

type function1 struct {
    Name string
    Age  string
}

// mimic api call 2
func doSomeWork2() (function2, error) {
    time.Sleep(4 * time.Second)
    r := errors.New("error occurred")
    if 1 == 2 {
        fmt.Println(r)
    }
    obj := function2{"Delhi", "Delhi"}
    // return error as nil for now
    return obj, nil
}

type function2 struct {
    City  string
    State string
}

Thanks in advance.

2 Answers

3

This kind of fork-and-join pattern is exactly what golang.org/x/sync/errgroup was designed for. (Identifying the appropriate “first error” from a group of goroutines can be surprisingly subtle.)

You can use errgroup.WithContext to obtain a context.Context that is cancelled as soon as any goroutine in the group returns a non-nil error (or when Wait returns, whichever happens first). The (*Group).Wait method waits for all the goroutines to complete and returns the first non-nil error, if any.

For your example, that might look something like: https://play.golang.org/p/jqYeb4chHCZ.
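If adding golang.org/x/sync as a dependency is not an option, the same fork-and-join idea can be approximated with the standard library alone. The following is only a rough sketch, not errgroup itself: runAll, task, sleepTask, and demo are hypothetical names invented for illustration, and the sleeps stand in for real API calls.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// task is a hypothetical unit of work that honors context cancellation.
type task func(ctx context.Context) error

// runAll starts every task in its own goroutine, cancels the shared context
// as soon as one task fails, waits for all tasks, and returns the first error.
func runAll(parent context.Context, tasks ...task) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, t := range tasks {
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			if err := t(ctx); err != nil {
				// Only the first failure is recorded; later ones
				// (often just "context canceled") are dropped.
				once.Do(func() {
					firstErr = err
					cancel()
				})
			}
		}(t)
	}
	wg.Wait()
	return firstErr
}

// sleepTask mimics an API call that takes d and then returns err (possibly nil).
func sleepTask(d time.Duration, err error) task {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demo runs one slow task and one quickly failing task. It returns the
// error text and whether the whole group finished in under a second.
func demo() (string, bool) {
	start := time.Now()
	err := runAll(context.Background(),
		sleepTask(2*time.Second, nil),
		sleepTask(50*time.Millisecond, errors.New("call 2 failed")),
	)
	msg := ""
	if err != nil {
		msg = err.Error()
	}
	return msg, time.Since(start) < time.Second
}

func main() {
	fmt.Println(demo())
}
```

The sync.Once is what keeps the "first error" well defined: once the context is cancelled, the other tasks typically fail with ctx.Err(), and those follow-on errors should not overwrite the original cause.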


You can then inject a timeout within any given call by wrapping the Context using context.WithTimeout.
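As a minimal sketch of that wrapping, assuming a hypothetical slowCall that stands in for one API call, each call can derive its own deadline from whatever parent context it was given. The helper names (slowCall, callWithTimeout, timedOutMs) are made up for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowCall is a hypothetical stand-in for an API call that takes d to finish.
func slowCall(ctx context.Context, d time.Duration) (string, error) {
	select {
	case <-time.After(d):
		return "ok", nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// callWithTimeout gives a single call its own deadline, derived from parent.
// Cancelling parent still aborts the call; the timeout only adds a limit.
func callWithTimeout(parent context.Context, timeout, work time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel() // release the timer as soon as the call returns
	return slowCall(ctx, work)
}

// timedOutMs reports whether a call needing workMs milliseconds hit a
// deadline of timeoutMs milliseconds.
func timedOutMs(timeoutMs, workMs int) bool {
	_, err := callWithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond,
		time.Duration(workMs)*time.Millisecond)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOutMs(20, 1000)) // deadline shorter than the work
	fmt.Println(timedOutMs(1000, 20)) // deadline longer than the work
}
```

Because the per-call context is a child of the parent, a cancellation triggered by another goroutine's failure still propagates to it.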

(However, in my experience if you've plumbed in cancellation correctly, explicit timeouts are almost never helpful — the end user can cancel explicitly if they get tired of waiting, and you probably don't want to promote degraded service to a complete outage if something starts to take just a bit longer than you expected.)

1

To support timeouts and cancelation of goroutine work, the standard mechanism is to use context.Context.

ctx := context.Background() // root context

// wrap the context with a timeout and/or cancelation mechanism

ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // with timeout or cancel
//ctx, cancel := context.WithCancel(ctx)               // no   timeout just cancel

defer cancel() // avoid memory leak if we never cancel/timeout

Next, your worker goroutines need to accept the ctx and monitor its state. To do this while still mimicking a long computation, replace the time.Sleep with a channel-based wait:

// mimic api call 1
func doSomeWork(ctx context.Context) (function1, error) {
    //time.Sleep(10 * time.Second)
    select {
    case <-time.After(10 * time.Second):
        // wait completed
    case <-ctx.Done():
        return function1{}, ctx.Err()
    }
    // ...
}
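One possible refinement, sketched here under the assumption that the wait is factored into a helper: using time.NewTimer with a deferred Stop releases the timer promptly when the context wins, whereas a timer created by time.After may linger until it fires on older Go releases. The names sleepCtx and demoSleep are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sleepCtx (hypothetical helper) waits for d or until ctx is done,
// whichever comes first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop() // release the timer if the context wins the race
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoSleep sleeps for sleepMs under a context that is cancelled after
// cancelMs, and returns the resulting error text ("" when the sleep finished).
func demoSleep(sleepMs, cancelMs int) string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	time.AfterFunc(time.Duration(cancelMs)*time.Millisecond, cancel)

	if err := sleepCtx(ctx, time.Duration(sleepMs)*time.Millisecond); err != nil {
		return err.Error()
	}
	return ""
}

func main() {
	fmt.Printf("%q\n", demoSleep(1000, 20)) // cancelled before the sleep ends
	fmt.Printf("%q\n", demoSleep(20, 1000)) // sleep ends before the cancel
}
```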

If one worker goroutine fails, it can signal the other workers to abort by simply calling the cancel() function:

result, e := doSomeWork(ctx)
if e != nil {
    cancel()    // <- add this
    err <- e
    return
}

Pulling this all together:

https://play.golang.org/p/1Kpe_tre7XI


EDIT: the sleep example above is obviously a contrived example of how to abort a "fake" task. In the real world, HTTP or SQL DB calls would be involved, and since Go 1.7 and 1.8 the standard library has added context support to many of these potentially blocking calls (http.NewRequestWithContext, used below, arrived in Go 1.13):

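// age is a hypothetical query parameter added so the snippet compiles;
// imports needed: "context", "database/sql", "net/http", plus a MySQL driver.
func doSomeWork(ctx context.Context, age int) error {

    // DB

    db, err := sql.Open("mysql", "...")
    if err != nil {
        return err
    }
    defer db.Close()

    //rows, err := db.Query("SELECT age FROM users WHERE age > ?", age)
    rows, err := db.QueryContext(ctx, "SELECT age FROM users WHERE age > ?", age)
    if err != nil {
        return err // will return with error if context is canceled
    }
    defer rows.Close()

    // http

    // req, err := http.NewRequest("GET", "http://example.com", nil)
    req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil)
    if err != nil {
        return err
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err // will return with error if context is canceled
    }
    defer resp.Body.Close()

    return nil
}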
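To see the HTTP half of this in action without an external service, here is a small sketch that points a context-bound request at a local httptest server. The server, its 200ms delay, and the helper names fetchStatus and demoFetch are all assumptions made for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"time"
)

// fetchStatus issues a GET bound to ctx and returns the HTTP status code.
func fetchStatus(ctx context.Context, url string) (int, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return 0, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err // includes cancellation and deadline errors
	}
	defer resp.Body.Close()
	io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
	return resp.StatusCode, nil
}

// demoFetch calls a local test server that takes about 200ms to answer,
// using a client-side deadline of timeoutMs milliseconds.
func demoFetch(timeoutMs int) string {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case <-time.After(200 * time.Millisecond):
			w.WriteHeader(http.StatusOK)
		case <-r.Context().Done(): // the client gave up
		}
	}))
	defer srv.Close()

	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()

	code, err := fetchStatus(ctx, srv.URL)
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return "deadline exceeded"
	case err != nil:
		return "error: " + err.Error()
	default:
		return fmt.Sprint(code)
	}
}

func main() {
	fmt.Println(demoFetch(20))   // deadline shorter than the server delay
	fmt.Println(demoFetch(2000)) // deadline longer than the server delay
}
```

The error returned by the client wraps the context error, so errors.Is can distinguish a timeout from other failures.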

EDIT (2): to poll a context's state without blocking, leverage select's default branch:

 select {
 case <-ctx.Done():
     return ctx.Err()
 default:
     // if ctx is not done - this branch is used
 }

The default branch can optionally have code in it, but even if it is empty, its presence prevents the select from blocking, so the select just polls the status of the context at that instant.
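A common way to apply this, sketched here with hypothetical names (processAll, demoProcess), is to split the work into small units and poll the context between units. Note the limitation: a unit that is already running still finishes, so cancellation is only noticed at the next check.

```go
package main

import (
	"context"
	"fmt"
)

// processAll handles items one at a time and checks for cancellation before
// each item. It returns how many items were handled and any context error.
func processAll(ctx context.Context, items []int, handle func(int)) (int, error) {
	done := 0
	for _, it := range items {
		select {
		case <-ctx.Done():
			return done, ctx.Err()
		default:
			// context still live: fall through and do the work
		}
		handle(it)
		done++
	}
	return done, nil
}

// demoProcess processes the items 1..n and cancels the context while
// handling the item equal to cancelAt (0 means never cancel).
func demoProcess(n, cancelAt int) (int, string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	items := make([]int, n)
	for i := range items {
		items[i] = i + 1
	}
	done, err := processAll(ctx, items, func(it int) {
		if it == cancelAt {
			cancel() // simulates a failure elsewhere in the program
		}
	})
	if err != nil {
		return done, err.Error()
	}
	return done, ""
}

func main() {
	fmt.Println(demoProcess(5, 2))
	fmt.Println(demoProcess(3, 0))
}
```

For work that cannot be split this way, passing ctx into the blocking call itself (as with QueryContext or NewRequestWithContext) is usually the better fit, since the call then aborts on its own.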

7 Comments

Beat me to it. The cancelFunc() is sometimes excessive though. It can be omitted if one can assert that context cancellation will occur anyway. (Which is the case for context.WithTimeout(); the cancelFunc is just an added bonus for even earlier cancellation in this case)
From the code that you shared for doSomeWork function, since select is a blocking call, how will the go routine perform the work it intends to do? suppose that this go routine had to fetch data from db, where and how should the statement to fetch data be put in the code? At the same time, I want to keep listening to the context.Done signal to come in from the parent go routine.
@colm.anseo thanks for your answer. It did help me. Just one more thing: what would the last function in the chain that receives the context look like? func lastFuncToReceiveContext(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() } // Some processing code here. } Since the select statement is a blocking call, the processing statements in this func never execute (please correct me if I am wrong). If I put this select in an infinite for loop in another goroutine, I will need to handle the cancelled context in the new goroutine I create too. Can you help, please?
@Eshan final update: on how to avoid blocking by polling a context's state instead.
@colm.anseo yes, I agree that default would unblock the select but once it goes to default, it would never go back to the other case. Assuming that I put my processing instructions in the default case, and during the time when default case is getting executed, if context gets cancelled, it would still execute completely. Don't you think? If I put the entire thing in an infinite for loop (to check the context.done repeatedly), the default case may execute multiple times which isn't what I need.