1

How to I go about implementing the aggregation pattern in Go, I have to send a bunch of http request concurrently where each go routine will call the endpoint and send the response status on a channel. Now on the main calling function I will range through the channel and display all the responses.

The problem is how do I unblock the channel ?? - I cannot close the channel from the go routines as it will be closed before the complete work is done

package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"

    "golang.org/x/net/context"
)

func main() {

    var wg sync.WaitGroup
    wg.Add(10)
    c := make(chan string, 100)
    ctx := context.Background()

    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

    defer cancel()
    for i := 1; i <= 10; i++ {
        go SendHttpRequest(ctx, c, &wg)
    }

    for v := range c {
        fmt.Println(v)
    }

    wg.Wait()

}

func SendHttpRequest(ctx context.Context, c chan string, wg *sync.WaitGroup) {

    //defer wg.Done()
    client := http.Client{}
    req, err := http.NewRequest("POST", "https://jsonplaceholder.typicode.com/posts/1", nil)
    if err != nil {
        panic(err)
    }
    req.WithContext(ctx)

    res, _ := client.Do(req)

    select {
    case <-time.After(1 * time.Microsecond):
        c <- res.Status
    case <-ctx.Done():
        c <- "599 ToLong"
    }
    if res != nil {
        defer res.Body.Close()
    }
    //close(c)
    defer wg.Done()
}
1
  • 1
    Random note: replace "golang.org/x/net/context" with "context". I think you are supposed to use "context" as of 1.8. Commented Dec 7, 2018 at 20:38

2 Answers 2

2

Use the WaitGroup

go func(){
  wg.Wait()
  close(c)
}()

for v := range c {
  fmt.Println(v)
}

// Don't bother with wg.Wait() here
Sign up to request clarification or add additional context in comments.

2 Comments

Thankyou. it worked well. I didnt think we could close it inside a goroutine.
You can because the code is synchronized via the WaitGroup.
0

In this kind of situation use a generator and idiomatic early defer patterns:

import (
    "fmt"
    "errors"
    "net/http"
    "sync"
    "time"

    "golang.org/x/net/context"
)

func main() {

    ctx := context.Background()
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel() // defer early context cancel 

    for v := range requests(ctx) {
        fmt.Println(v)
    }

}

// requests generator (handling synchro)
func requests(ctx context.Context)<-chan string {

    c := make(chan string/*, 100*/) // No need for buffer, do it on the fly
    go func(){
        defer close(c) // defer early chan close, will also check goroutine ending
        
        var wg sync.WaitGroup
        defer wg.Wait() // defer early wait
    
        wg.Add(10)
        for i := 1; i <= 10; i++ {
            go func() {
                defer wg.Done() // defer early goroutine waitgroup done

                if status, err := SendHttpRequest(ctx, c); err != nil {
                    c <- status
                }
            }()
        }
    }

    return c
}

// SendHttpRequest looks more conventional, no goroutines, no syncro (waitgroup not spread)
func SendHttpRequest(ctx context.Context) (status string, err error) {

    client := http.Client{}
    req, err := http.NewRequest("POST", "https://jsonplaceholder.typicode.com/posts/1", nil)
    if err != nil {
        return
    }
    req.WithContext(ctx)

    res, err := client.Do(req)
    if err != nil {
        if errors.Is(err, context.Canceled) { // check that request was not cancelled by context cancel trigger
            status = "599 ToLong"
        }
        return 
    }
    defer res.Body.Close() // defer early response body close (in case of no error)
    status = res.Status
    return
}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.