1

I'm working on a Go microservice where I am using robfig/cron package to schedule jobs, initially thought of using time.Ticker() but package made it easier for me. My question is how to ensure cron job is registered only once until the running cron completes and prevent multiple goroutines from running simultaneously.

This is different from singleton approach, the block of code inside RunProcess() must not run until the previous RunProcess function completes.

I am new to golang and its convention. :(

Here is a simplified version of my current implementation:

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/robfig/cron/v3"
)

var isProcessRunning = false
var mu sync.Mutex

func RunProcess() {
    mu.Lock()
    defer mu.Unlock()

    if isProcessRunning {
        fmt.Println("Already Running...")
        return
    }

    isProcessRunning = true
    fmt.Println("Running...")

    // Simulate work
    time.Sleep(15 * time.Second)
    isProcessRunning = false
}

func InitCron() {
    // Create a new cron scheduler
    c := cron.New(cron.WithSeconds())

    // Add the task to run every 10 seconds
    _, err := c.AddFunc("*/10 * * * * *", RunProcess)
    if err != nil {
        fmt.Println("Error adding cron job:", err)
        return
    }

    // Start the cron scheduler
    c.Start()

    // Block indefinitely to keep the cron jobs running
    select {}
}

func main() {
    InitCron()
}

However, I noticed that when InitCron is called multiple times, it can potentially create multiple cron jobs, leading to concurrency issues and unexpected behavior for an lightweight microservice.

Any advice or examples on how to manage this properly would be greatly appreciated.

Running cron in golang and tryin to prevent the cron until it first cron is finished

4
  • This question is similar to: Singleton in go. If you believe it’s different, please edit the question, make it clear how it’s different and/or how the answers on that question are not helpful for your problem (as your question seems to be how to prevent InitCron running multiple times). Note that RunProcess is probably not doing what you want playground. Commented Jul 3, 2024 at 8:35
  • 1
    "RunProcess() must not run.." your use of a mutex in RunProcess already ensures this will not run concurrently, so you appear to have already achieved your goal? The code is confusing because "Already Running" will never be printed (due to the locked Mutex); my playground link shows how to address that, but it's unclear if that is what you want. The link ref singleton was in answer to your "noticed that when InitCron is called multiple times..." comment (which seemed to be your main question?). Commented Jul 3, 2024 at 9:42
  • Thank you @Brits for the response. I have edited my question. I am trying to lockRunProcess execution using isProcessRunning variable, when next cron cycle calls RunProcess it should stop running same code. I am trying to achieve message de-qeueuing in 1st process, the 2nd/3rd process must not affect 1st RunProcess until 1st process marked as complete. In your code you have used wg.done() its kind of acknowledgment right? Commented Jul 3, 2024 at 10:35
  • 1
    "...you have used wg.done() its kind of acknowledgment right" - Not really; this is just used to wait until the Goroutines have completed (otherwise the program would end, terminating everything, before the goroutines complete). I'm still not 100% clear on your aim but will post an answer with suggestions as this is difficult in the comments. Commented Jul 3, 2024 at 20:22

2 Answers 2

2

I think the issue is you are trying to make a generic programming solution in go where you have a powerful set of (simple) tools for it.

To make sure I understood right, you want to trigger a function to run every X seconds, and if its already running to skip the current trigger.

a cron job periodically runs a process, so instead of trying to change its inherent behaviour I would recommend just using something more suitable.

My suggestion is a channel that continuesly listens to triggers and processes them, this means it will only process one at a time by nature. if you keep the channel size default (size 0) it will block until the processing is finished so they wont be able to run in parallel.

the only slight issue I see is it means it wont "wait" the additional time between triggers if a trigger is called while processing is ongoing. if you dont like this behavior. simply within the processing function add a timer to wait your interval.

// sendTriggers sends a trigger to the channel every 10 seconds.
func sendTriggers(triggerChan chan<- struct{}) {
    ticker := time.NewTicker(10 * time.Second)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            triggerChan <- struct{}{}
        }
    }
}

// processTriggers listens to the channel and processes each trigger one at a time.
func processTriggers(triggerChan <-chan struct{}) {
    for {
        select {
        case <-triggerChan:
            fmt.Println("Received trigger, processing...")
            // Simulate work with sleep
            time.Sleep(5 * time.Second)
            fmt.Println("Processing complete.")
        }
    }
}

func main() {
    triggerChan := make(chan struct{})
    
    go sendTriggers(triggerChan)
    go processTriggers(triggerChan)
    
    // Prevent the main function from exiting
    select {}
}
Sign up to request clarification or add additional context in comments.

Comments

1

Your code is already preventing "multiple goroutines from running simultaneously" but probably not in the way you expect. There are two ways you could handle this:

  1. Concurrent calls block until the earlier call completes (a queue may form).
  2. Concurrent calls exit without performing the task (i.e. time.Sleep).

Your code currently takes the first approach; but the fmt.Println("Already Running...") indicates that you expect it to take the second.

So why is "Already Running..." never output? This is due to the way you are using the Mutex. When you call Lock:

Lock locks m. If the lock is already in use, the calling goroutine blocks until the mutex is available.

This means that when RunProcess is called concurrently, execution will block at mu.Lock() until mu.Unlock() is called (in a seperate goroutine). You defer the mu.Unlock() call which means the Mutex is unlocked "the moment the surrounding function returns". This may be easier to see if taken step by step:

  1. First RunProcess() begins, calls mu.Lock() and proceeds to time.Sleep(15 * time.Second).
  2. Second RunProcess() begins, blocks at mu.Lock().
  3. First RunProcess() completes, this means the deferred mu.Unlock() runs.
  4. Second RunProcess() acquires Mutex and proceeds to time.Sleep(15 * time.Second).

I believe that whay you intend to happen is for the second RunProcess() to output "Already Running..." and terminate; if that is the case your code would need to be more like playground:

func RunProcess() {
    mu.Lock() // Mutex protects isProcessRunning variable    
    if isProcessRunning {
        fmt.Println("Already Running...")
        mu.Unlock()
        return
    }
    isProcessRunning = true
    mu.Unlock() // isProcessRunning has been checked/set so release the Mutex (allowing other checks to proceed)

    fmt.Println("Running...")

    // Simulate work
    time.Sleep(2 * time.Second)

    fmt.Println("Done")

    mu.Lock() // Need to access isProcessRunning so acquire Mutex
    isProcessRunning = false
    mu.Unlock()
}

Hopefully this answers the first part of your question. I believe the second part ref "InitCron is called multiple times" is a duplicate (i.e. use sync.Once). However it could be argued that documenting that InitCron must only be called once may be sufficient.

Moving on to how your solution could be improved; I'm going to assume that you have chosen robfig/cron for valid reasons (I use it for scheduling reports etc). Preventing concurrent execution of tasks is a common requirement so robfig/cron provides SkipIfStillRunning; there is an example in the docs.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.