I tried to write a DelayQueue in Go, using the Java implementation as a reference. The general logic is to keep a min-heap and, on every take, look at the top element of the heap: if it has expired, remove and return it; otherwise, use Cond#Wait to suspend the current goroutine.
One of the biggest problems along the way is that Go's Cond does not support waiting with a timeout. For example, if the top element of the heap has 1 minute left before it expires, I want to wait for at most 1 minute. So I found a Cond implementation that supports timeouts.
As mentioned in this issue, using Cond is discouraged in Go in favor of channels. However, I do not know whether a channel is a better choice here.
I am new to Go, so there are probably many other problems with my code, and I would appreciate your advice.
waitable_cond.go:
package main
import (
"sync"
"sync/atomic"
"time"
"unsafe"
)
// Code from : https://gist.github.com/zviadm/c234426882bfc8acba88f3503edaaa36#file-cond2-go
//
// Cond is a condition variable built on a channel rather than sync.Cond, so
// that a waiter can select on the notification together with other events
// such as a timeout.
type Cond struct {
// L is the lock callers hold around Wait and WaitWithTimeout; both methods
// unlock it while blocked and lock it again before returning.
L sync.Locker
// n stores a *chan struct{} pointing at the current notification channel.
// It is only read and replaced through sync/atomic operations.
n unsafe.Pointer
}
// NewCond returns a Cond that uses l as its lock and starts with a fresh,
// open notification channel.
func NewCond(l sync.Locker) *Cond {
	ch := make(chan struct{})
	return &Cond{
		L: l,
		n: unsafe.Pointer(&ch),
	}
}
// Wait blocks until the next Broadcast call. Like sync.Cond.Wait, it is
// called with c.L held: the lock is released while waiting and re-acquired
// before Wait returns.
func (c *Cond) Wait() {
	// Fetch the channel before unlocking so that a Broadcast issued right
	// after the unlock closes the very channel this call waits on.
	notify := c.NotifyChan()
	c.L.Unlock()
	defer c.L.Lock()
	<-notify
}
// WaitWithTimeout behaves like Wait but gives up after t has elapsed. It
// returns once Broadcast is called or the timeout expires, whichever happens
// first; the return does not say which, so callers must re-check their
// condition. c.L is held on entry and held again on return.
func (c *Cond) WaitWithTimeout(t time.Duration) {
	// Fetch the channel before unlocking so that a Broadcast issued right
	// after the unlock closes the very channel this call waits on.
	notify := c.NotifyChan()
	c.L.Unlock()
	defer c.L.Lock()

	// Use an explicit timer instead of time.After so it can be stopped as soon
	// as a Broadcast arrives. Before Go 1.23 an unstopped time.After timer is
	// not reclaimed until it fires, which adds up with long timeouts.
	timer := time.NewTimer(t)
	defer timer.Stop()

	select {
	case <-notify:
	case <-timer.C:
	}
}
// NotifyChan returns the channel that the next Broadcast call will close.
// Receiving from it blocks until that Broadcast happens.
func (c *Cond) NotifyChan() <-chan struct{} {
	current := (*chan struct{})(atomic.LoadPointer(&c.n))
	return *current
}
// Broadcast wakes every goroutine currently waiting on the Cond. It atomically
// installs a fresh channel for future waiters and then closes the previous
// one, which releases everyone blocked on it.
func (c *Cond) Broadcast() {
	fresh := make(chan struct{})
	prev := (*chan struct{})(atomic.SwapPointer(&c.n, unsafe.Pointer(&fresh)))
	close(*prev)
}
priority_queue.go:
package main
import (
"container/heap"
)
// Code from :https://pkg.go.dev/container/heap
// An Item is something we manage in a priority queue.
type Item struct {
value interface{} // The value of the item; arbitrary.
priority int64 // The priority of the item in the queue; Less orders smaller values first.
// The index is needed by update and is maintained by the heap.Interface methods
// (Push and Swap set it; Pop resets it to -1).
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds Items. Less compares
// priorities with <, so when the slice is maintained through container/heap
// the Item with the smallest priority is at index 0.
type PriorityQueue []*Item
// initCap is the capacity newPriorityQueue reserves for a new, empty queue.
const initCap = 12
// newPriorityQueue returns an empty PriorityQueue whose backing slice has
// room for initCap items.
func newPriorityQueue() PriorityQueue {
	pq := make(PriorityQueue, 0, initCap)
	return pq
}
// Len reports the number of items in the queue. It implements heap.Interface.
func (pq PriorityQueue) Len() int {
	return len(pq)
}
// Less reports whether the item at index i has a strictly smaller priority
// than the item at index j. It implements heap.Interface.
func (pq PriorityQueue) Less(i, j int) bool {
	left, right := pq[i], pq[j]
	return left.priority < right.priority
}
// Swap exchanges the items at indexes i and j and updates each item's index
// field to its new position. It implements heap.Interface.
func (pq PriorityQueue) Swap(i, j int) {
	a, b := pq[i], pq[j]
	pq[i], pq[j] = b, a
	b.index = i
	a.index = j
}
// Push appends x, which must be an *Item, to the end of the queue and records
// its position in the item's index field. It implements heap.Interface and is
// meant to be called through heap.Push, which restores the heap ordering
// afterwards. Push panics if x is not an *Item.
func (pq *PriorityQueue) Push(x interface{}) {
	item := x.(*Item)
	item.index = len(*pq)
	// append already grows the backing array when it is full, so no manual
	// reallocation is needed before adding the item.
	*pq = append(*pq, item)
}
// Pop removes and returns the last item of the underlying slice. It
// implements heap.Interface and is meant to be called through heap.Pop, which
// first moves the minimum item to the end. Pop panics on an empty queue.
func (pq *PriorityQueue) Pop() interface{} {
	items := *pq
	last := len(items) - 1
	popped := items[last]
	items[last] = nil // drop the reference so the slot does not keep the item alive
	popped.index = -1 // mark the item as no longer being in the heap
	*pq = items[:last]
	return popped
}
// Peek returns the item at index 0 without removing it, or nil when the queue
// is empty. A non-nil result holds an *Item.
func (pq *PriorityQueue) Peek() interface{} {
	if items := *pq; len(items) > 0 {
		return items[0]
	}
	return nil
}
// update modifies the priority and value of an Item in the queue, then calls
// heap.Fix at the item's current index to restore the heap ordering. value
// accepts any type, matching the Item.value field it is stored in.
func (pq *PriorityQueue) update(item *Item, value interface{}, priority int64) {
	item.value = value
	item.priority = priority
	heap.Fix(pq, item.index)
}
delay_queue.go:
package main
import (
"container/heap"
"sync"
"time"
)
// DelayQueue holds items that Take hands out only once their deadline has
// passed. Put and Take lock cond.L around every access to pq.
type DelayQueue struct {
// wakeup is allocated by NewDelayQueue.
// NOTE(review): nothing in the visible code sends on or receives from it — confirm whether it is still needed.
wakeup chan interface{}
// cond supplies the lock guarding pq and lets Take sleep until Put
// broadcasts or a timeout elapses.
cond *Cond
// pq is the heap of pending items, ordered by Item.priority.
pq PriorityQueue
}
// DelayItem is the payload Put stores in Item.value for each queued entry.
type DelayItem struct {
// deadline is the instant after which Take may return the entry.
deadline time.Time
// data is the caller's value passed to Put and returned by Take.
data interface{}
}
// NewDelayQueue returns an empty DelayQueue that is ready for use. It takes
// no size argument; the queue grows as items are added.
func NewDelayQueue() *DelayQueue {
	dq := new(DelayQueue)
	dq.wakeup = make(chan interface{})
	dq.pq = newPriorityQueue()
	dq.cond = NewCond(&sync.Mutex{})
	return dq
}
// Put adds data to the queue; Take may return it once delay has elapsed.
// Put does not wait for a consumer; it only blocks while acquiring the
// queue lock.
func (dq *DelayQueue) Put(data interface{}, delay time.Duration) {
	dq.cond.L.Lock()
	defer dq.cond.L.Unlock()

	// Read the clock once so the stored deadline and the heap priority
	// describe exactly the same instant.
	deadline := time.Now().Add(delay)
	item := &Item{
		value: DelayItem{deadline: deadline, data: data},
		// Order by nanoseconds: with whole seconds, items expiring within the
		// same second would tie, and a later one could sit at the top of the
		// heap and hold back an earlier one.
		priority: deadline.UnixNano(),
	}
	heap.Push(&dq.pq, item)

	// A new earliest item changes how long waiters should sleep, so wake them
	// to re-evaluate the head of the heap.
	if dq.pq.Peek() == item {
		dq.cond.Broadcast()
	}
}
// Take blocks until the item at the head of the queue has reached its
// deadline, then removes it and returns the data that was passed to Put.
// While the queue is empty it waits for a Put; while the head is not yet due
// it sleeps for the remaining time or until a Broadcast signals a new head.
func (dq *DelayQueue) Take() interface{} {
	dq.cond.L.Lock()
	defer dq.cond.L.Unlock()

	for {
		first := dq.pq.Peek()
		if first == nil {
			dq.cond.Wait()
			continue
		}

		head := first.(*Item).value.(DelayItem)
		// Read the clock once so the expiry test and the sleep duration agree;
		// a deadline that is exactly now counts as expired.
		remaining := time.Until(head.deadline)
		if remaining <= 0 {
			// The lock is held, so the heap minimum is still the item peeked above.
			heap.Pop(&dq.pq)
			return head.data
		}
		dq.cond.WaitWithTimeout(remaining)
	}
}