Skip to main content
deleted 2 characters in body
Source Link
toolic
  • 16.4k
  • 6
  • 29
  • 221

So, I was reading about the implementation of distributed locks where we need to verify the lease using a fencing token as per this article - https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

package distributedlock

import (
    "errors"
    "fmt"
    "math"
    "runtime"
    "sync"
    "sync/atomic"
    "time"

    "github.com/pratyush-prateek/distributed-lock-go/utils"
)

const LockState_LOCKED = 1 << 1
const LockState_UNLOCKED = 1 << 2
const FencingToken_MAX = math.MaxUint32

type DistributedLock struct {
    State               uint32
    CurrentFencingToken uint32
    Name                string
}

type DistributedLocker struct {
    lockerGuard *sync.RWMutex
    Locks       map[string]DistributedLock
}

type Locker interface {
    CreateLock(name string, clientId string) error
    AcquireLock(name string, timeInSeconds int, clientId string) (uint32, error)
    VerifyLease(name string, leaseToken uint32, clientId string) (bool, error)
    ReleaseLock(name string, leaseToken uint32, clientId string) (bool, error)
}

// Create a distributed lock
func (distributedLocker *DistributedLocker) CreateLock(name string, clientId string) error {
    err := utils.VerifyStringEmpty(clientId, "clientId")

    if err != nil {
        return err
    }

    distributedLocker.lockerGuard.Lock()
    defer distributedLocker.lockerGuard.Unlock()

    if _, exists := distributedLocker.Locks[name]; exists {
        fmt.Printf("lock %v already created by client %v\n", name, clientId)
        return nil
    }

    distributedLocker.Locks[name] = DistributedLock{
        State:               uint32(LockState_UNLOCKED),
        CurrentFencingToken: uint32(0),
        Name:                name,
    }
    fmt.Printf("new lock %v created by client %v\n", name, clientId)
    return nil
}

// Acquire a distributed lock
func (distributedLocker *DistributedLocker) AcquireLock(name string, timeInSeconds int, clientId string) (uint32, error) {
    err := utils.VerifyStringEmpty(clientId, "clientId")

    if err != nil {
        return uint32(0), err
    }

    if timeInSeconds <= 0 {
        return uint32(0), errors.New("non zero value of lock expiry time is expected")
    }

    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return uint32(0), fmt.Errorf("lock %v does not exists", name)
    } else {
        // Try to change lock state
        if atomic.CompareAndSwapUint32(&lock.State, LockState_UNLOCKED, LockState_LOCKED) {
            // Get a fencing token and set a timer in another goroutine to expire the lock
            updatedToken := atomic.AddUint32(&lock.CurrentFencingToken, 1) // safe to do, as token can only be updated by changing lock state
            go func() {
                timer := time.NewTimer(time.Second * time.Duration(timeInSeconds))
                <-timer.C // wait for timer to expire
                if atomic.CompareAndSwapUint32(&lock.State, LockState_LOCKED, LockState_UNLOCKED) {
                    fmt.Printf("Lock on %v by client %v expired after %v seconds\n", name, clientId, timeInSeconds)
                } else {
                    // Lock has already been released
                    fmt.Printf("Lock on %v already released\n", name)
                }

                runtime.Goexit() // For safety
            }()

            fmt.Printf("Lock %v successfully acquired by client %v", name, clientId)
            return updatedToken, nil
        } else {
            // Else state is locked, failed to acquire the lock
            return uint32(0), fmt.Errorf("failed to acquire lock %v by client %v", name, clientId)
        }
    }
}

// Verify lease on a distributed lock, if it has expired or not
// For clients where their is some sort of pause between acquiring lock and accessing the critical section
// clients should verify the lease through fencing token
func (distributedLocker *DistributedLocker) VerifyLease(name string, leaseToken uint32, clientId string) (bool, error) {
    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return false, fmt.Errorf("lock %v does not exists", name)
    } else {
        if atomic.LoadUint32(&lock.State) == LockState_LOCKED {
            if atomic.LoadUint32(&lock.CurrentFencingToken) == leaseToken {
                fmt.Printf("Lease on lock %v successfully verified by client %v", lock.Name, clientId)
                return true, nil
            } else {
                return false, fmt.Errorf("lease on lock %v has expired", lock.Name)
            }
        } else {
            // Lock has been expired
            return false, fmt.Errorf("lock %v has already been unlocked and lease on this lock has expired", lock.Name)
        }
    }
}

// Release a distributed lock, requires a lease token
// Clients cannot release an expired lock, or a lock with old lease token
func (distributedLocker *DistributedLocker) ReleaseLock(name string, leaseToken uint32, clientId string) (bool, error) {
    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return false, fmt.Errorf("lock %v does not exists", name)
    } else {
        // Check lock state
        if atomic.LoadUint32(&lock.State) == LockState_LOCKED {
            // If locked, then only verify the lease and unlock using CAS
            if atomic.LoadUint32(&lock.CurrentFencingToken) == leaseToken {
                if atomic.CompareAndSwapUint32(&lock.State, LockState_LOCKED, LockState_UNLOCKED) {
                    // Lock successfully released
                    fmt.Printf("lock %v successfully released by client %v after lease verification", name, clientId)
                    return true, nil
                } else {
                    // Lease is verified but CAS failed, means post lease verification, someone else released the lock
                    // Or it expired in between
                    return false, fmt.Errorf("lease on lock %v verified by client %v but the lock expired", name, clientId)
                }
            } else {
                return false, fmt.Errorf("lease on lock %v has expired", lock.Name)
            }
        } else {
            // If unlocked, means lock has been expired due to timer, or unlocked previously by user
            return false, fmt.Errorf("the lock on %v has expired or the lock has already been released", name)
        }
    }
}
```

So, I was reading about the implementation of distributed locks where we need to verify the lease using a fencing token as per this article - https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

package distributedlock

import (
    "errors"
    "fmt"
    "math"
    "runtime"
    "sync"
    "sync/atomic"
    "time"

    "github.com/pratyush-prateek/distributed-lock-go/utils"
)

const LockState_LOCKED = 1 << 1
const LockState_UNLOCKED = 1 << 2
const FencingToken_MAX = math.MaxUint32

type DistributedLock struct {
    State               uint32
    CurrentFencingToken uint32
    Name                string
}

type DistributedLocker struct {
    lockerGuard *sync.RWMutex
    Locks       map[string]DistributedLock
}

type Locker interface {
    CreateLock(name string, clientId string) error
    AcquireLock(name string, timeInSeconds int, clientId string) (uint32, error)
    VerifyLease(name string, leaseToken uint32, clientId string) (bool, error)
    ReleaseLock(name string, leaseToken uint32, clientId string) (bool, error)
}

// Create a distributed lock
func (distributedLocker *DistributedLocker) CreateLock(name string, clientId string) error {
    err := utils.VerifyStringEmpty(clientId, "clientId")

    if err != nil {
        return err
    }

    distributedLocker.lockerGuard.Lock()
    defer distributedLocker.lockerGuard.Unlock()

    if _, exists := distributedLocker.Locks[name]; exists {
        fmt.Printf("lock %v already created by client %v\n", name, clientId)
        return nil
    }

    distributedLocker.Locks[name] = DistributedLock{
        State:               uint32(LockState_UNLOCKED),
        CurrentFencingToken: uint32(0),
        Name:                name,
    }
    fmt.Printf("new lock %v created by client %v\n", name, clientId)
    return nil
}

// Acquire a distributed lock
func (distributedLocker *DistributedLocker) AcquireLock(name string, timeInSeconds int, clientId string) (uint32, error) {
    err := utils.VerifyStringEmpty(clientId, "clientId")

    if err != nil {
        return uint32(0), err
    }

    if timeInSeconds <= 0 {
        return uint32(0), errors.New("non zero value of lock expiry time is expected")
    }

    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return uint32(0), fmt.Errorf("lock %v does not exists", name)
    } else {
        // Try to change lock state
        if atomic.CompareAndSwapUint32(&lock.State, LockState_UNLOCKED, LockState_LOCKED) {
            // Get a fencing token and set a timer in another goroutine to expire the lock
            updatedToken := atomic.AddUint32(&lock.CurrentFencingToken, 1) // safe to do, as token can only be updated by changing lock state
            go func() {
                timer := time.NewTimer(time.Second * time.Duration(timeInSeconds))
                <-timer.C // wait for timer to expire
                if atomic.CompareAndSwapUint32(&lock.State, LockState_LOCKED, LockState_UNLOCKED) {
                    fmt.Printf("Lock on %v by client %v expired after %v seconds\n", name, clientId, timeInSeconds)
                } else {
                    // Lock has already been released
                    fmt.Printf("Lock on %v already released\n", name)
                }

                runtime.Goexit() // For safety
            }()

            fmt.Printf("Lock %v successfully acquired by client %v", name, clientId)
            return updatedToken, nil
        } else {
            // Else state is locked, failed to acquire the lock
            return uint32(0), fmt.Errorf("failed to acquire lock %v by client %v", name, clientId)
        }
    }
}

// Verify lease on a distributed lock, if it has expired or not
// For clients where their is some sort of pause between acquiring lock and accessing the critical section
// clients should verify the lease through fencing token
func (distributedLocker *DistributedLocker) VerifyLease(name string, leaseToken uint32, clientId string) (bool, error) {
    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return false, fmt.Errorf("lock %v does not exists", name)
    } else {
        if atomic.LoadUint32(&lock.State) == LockState_LOCKED {
            if atomic.LoadUint32(&lock.CurrentFencingToken) == leaseToken {
                fmt.Printf("Lease on lock %v successfully verified by client %v", lock.Name, clientId)
                return true, nil
            } else {
                return false, fmt.Errorf("lease on lock %v has expired", lock.Name)
            }
        } else {
            // Lock has been expired
            return false, fmt.Errorf("lock %v has already been unlocked and lease on this lock has expired", lock.Name)
        }
    }
}

// Release a distributed lock, requires a lease token
// Clients cannot release an expired lock, or a lock with old lease token
func (distributedLocker *DistributedLocker) ReleaseLock(name string, leaseToken uint32, clientId string) (bool, error) {
    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return false, fmt.Errorf("lock %v does not exists", name)
    } else {
        // Check lock state
        if atomic.LoadUint32(&lock.State) == LockState_LOCKED {
            // If locked, then only verify the lease and unlock using CAS
            if atomic.LoadUint32(&lock.CurrentFencingToken) == leaseToken {
                if atomic.CompareAndSwapUint32(&lock.State, LockState_LOCKED, LockState_UNLOCKED) {
                    // Lock successfully released
                    fmt.Printf("lock %v successfully released by client %v after lease verification", name, clientId)
                    return true, nil
                } else {
                    // Lease is verified but CAS failed, means post lease verification, someone else released the lock
                    // Or it expired in between
                    return false, fmt.Errorf("lease on lock %v verified by client %v but the lock expired", name, clientId)
                }
            } else {
                return false, fmt.Errorf("lease on lock %v has expired", lock.Name)
            }
        } else {
            // If unlocked, means lock has been expired due to timer, or unlocked previously by user
            return false, fmt.Errorf("the lock on %v has expired or the lock has already been released", name)
        }
    }
}
```

I was reading about the implementation of distributed locks where we need to verify the lease using a fencing token as per this article - https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

package distributedlock

import (
    "errors"
    "fmt"
    "math"
    "runtime"
    "sync"
    "sync/atomic"
    "time"

    "github.com/pratyush-prateek/distributed-lock-go/utils"
)

const LockState_LOCKED = 1 << 1
const LockState_UNLOCKED = 1 << 2
const FencingToken_MAX = math.MaxUint32

type DistributedLock struct {
    State               uint32
    CurrentFencingToken uint32
    Name                string
}

type DistributedLocker struct {
    lockerGuard *sync.RWMutex
    Locks       map[string]DistributedLock
}

type Locker interface {
    CreateLock(name string, clientId string) error
    AcquireLock(name string, timeInSeconds int, clientId string) (uint32, error)
    VerifyLease(name string, leaseToken uint32, clientId string) (bool, error)
    ReleaseLock(name string, leaseToken uint32, clientId string) (bool, error)
}

// Create a distributed lock
func (distributedLocker *DistributedLocker) CreateLock(name string, clientId string) error {
    err := utils.VerifyStringEmpty(clientId, "clientId")

    if err != nil {
        return err
    }

    distributedLocker.lockerGuard.Lock()
    defer distributedLocker.lockerGuard.Unlock()

    if _, exists := distributedLocker.Locks[name]; exists {
        fmt.Printf("lock %v already created by client %v\n", name, clientId)
        return nil
    }

    distributedLocker.Locks[name] = DistributedLock{
        State:               uint32(LockState_UNLOCKED),
        CurrentFencingToken: uint32(0),
        Name:                name,
    }
    fmt.Printf("new lock %v created by client %v\n", name, clientId)
    return nil
}

// Acquire a distributed lock
func (distributedLocker *DistributedLocker) AcquireLock(name string, timeInSeconds int, clientId string) (uint32, error) {
    err := utils.VerifyStringEmpty(clientId, "clientId")

    if err != nil {
        return uint32(0), err
    }

    if timeInSeconds <= 0 {
        return uint32(0), errors.New("non zero value of lock expiry time is expected")
    }

    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return uint32(0), fmt.Errorf("lock %v does not exists", name)
    } else {
        // Try to change lock state
        if atomic.CompareAndSwapUint32(&lock.State, LockState_UNLOCKED, LockState_LOCKED) {
            // Get a fencing token and set a timer in another goroutine to expire the lock
            updatedToken := atomic.AddUint32(&lock.CurrentFencingToken, 1) // safe to do, as token can only be updated by changing lock state
            go func() {
                timer := time.NewTimer(time.Second * time.Duration(timeInSeconds))
                <-timer.C // wait for timer to expire
                if atomic.CompareAndSwapUint32(&lock.State, LockState_LOCKED, LockState_UNLOCKED) {
                    fmt.Printf("Lock on %v by client %v expired after %v seconds\n", name, clientId, timeInSeconds)
                } else {
                    // Lock has already been released
                    fmt.Printf("Lock on %v already released\n", name)
                }

                runtime.Goexit() // For safety
            }()

            fmt.Printf("Lock %v successfully acquired by client %v", name, clientId)
            return updatedToken, nil
        } else {
            // Else state is locked, failed to acquire the lock
            return uint32(0), fmt.Errorf("failed to acquire lock %v by client %v", name, clientId)
        }
    }
}

// Verify lease on a distributed lock, if it has expired or not
// For clients where their is some sort of pause between acquiring lock and accessing the critical section
// clients should verify the lease through fencing token
func (distributedLocker *DistributedLocker) VerifyLease(name string, leaseToken uint32, clientId string) (bool, error) {
    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return false, fmt.Errorf("lock %v does not exists", name)
    } else {
        if atomic.LoadUint32(&lock.State) == LockState_LOCKED {
            if atomic.LoadUint32(&lock.CurrentFencingToken) == leaseToken {
                fmt.Printf("Lease on lock %v successfully verified by client %v", lock.Name, clientId)
                return true, nil
            } else {
                return false, fmt.Errorf("lease on lock %v has expired", lock.Name)
            }
        } else {
            // Lock has been expired
            return false, fmt.Errorf("lock %v has already been unlocked and lease on this lock has expired", lock.Name)
        }
    }
}

// Release a distributed lock, requires a lease token
// Clients cannot release an expired lock, or a lock with old lease token
func (distributedLocker *DistributedLocker) ReleaseLock(name string, leaseToken uint32, clientId string) (bool, error) {
    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return false, fmt.Errorf("lock %v does not exists", name)
    } else {
        // Check lock state
        if atomic.LoadUint32(&lock.State) == LockState_LOCKED {
            // If locked, then only verify the lease and unlock using CAS
            if atomic.LoadUint32(&lock.CurrentFencingToken) == leaseToken {
                if atomic.CompareAndSwapUint32(&lock.State, LockState_LOCKED, LockState_UNLOCKED) {
                    // Lock successfully released
                    fmt.Printf("lock %v successfully released by client %v after lease verification", name, clientId)
                    return true, nil
                } else {
                    // Lease is verified but CAS failed, means post lease verification, someone else released the lock
                    // Or it expired in between
                    return false, fmt.Errorf("lease on lock %v verified by client %v but the lock expired", name, clientId)
                }
            } else {
                return false, fmt.Errorf("lease on lock %v has expired", lock.Name)
            }
        } else {
            // If unlocked, means lock has been expired due to timer, or unlocked previously by user
            return false, fmt.Errorf("the lock on %v has expired or the lock has already been released", name)
        }
    }
}
Source Link

Distributed locking with fencing token implementation in Golang

So, I was reading about the implementation of distributed locks where we need to verify the lease using a fencing token as per this article - https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html

I tried to implement a library in Golang which can run as a standalone Golang application. Clients open a TCP connection with this Golang application and issue commands to create a distributed lock and get a fencing token, verify lease on a lock and release a lock. Now, the code for accepting concurrent TCP connections is still in progress, I wanted to check whether the following logic for implementing a simple distributed lock using atomic counters for fencing token would work in a simple scenario?

It works when I try to test it using concurrent calls, but wanted to make sure there are no bugs.

package distributedlock

import (
    "errors"
    "fmt"
    "math"
    "runtime"
    "sync"
    "sync/atomic"
    "time"

    "github.com/pratyush-prateek/distributed-lock-go/utils"
)

const LockState_LOCKED = 1 << 1
const LockState_UNLOCKED = 1 << 2
const FencingToken_MAX = math.MaxUint32

type DistributedLock struct {
    State               uint32
    CurrentFencingToken uint32
    Name                string
}

type DistributedLocker struct {
    lockerGuard *sync.RWMutex
    Locks       map[string]DistributedLock
}

type Locker interface {
    CreateLock(name string, clientId string) error
    AcquireLock(name string, timeInSeconds int, clientId string) (uint32, error)
    VerifyLease(name string, leaseToken uint32, clientId string) (bool, error)
    ReleaseLock(name string, leaseToken uint32, clientId string) (bool, error)
}

// Create a distributed lock
func (distributedLocker *DistributedLocker) CreateLock(name string, clientId string) error {
    err := utils.VerifyStringEmpty(clientId, "clientId")

    if err != nil {
        return err
    }

    distributedLocker.lockerGuard.Lock()
    defer distributedLocker.lockerGuard.Unlock()

    if _, exists := distributedLocker.Locks[name]; exists {
        fmt.Printf("lock %v already created by client %v\n", name, clientId)
        return nil
    }

    distributedLocker.Locks[name] = DistributedLock{
        State:               uint32(LockState_UNLOCKED),
        CurrentFencingToken: uint32(0),
        Name:                name,
    }
    fmt.Printf("new lock %v created by client %v\n", name, clientId)
    return nil
}

// Acquire a distributed lock
func (distributedLocker *DistributedLocker) AcquireLock(name string, timeInSeconds int, clientId string) (uint32, error) {
    err := utils.VerifyStringEmpty(clientId, "clientId")

    if err != nil {
        return uint32(0), err
    }

    if timeInSeconds <= 0 {
        return uint32(0), errors.New("non zero value of lock expiry time is expected")
    }

    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return uint32(0), fmt.Errorf("lock %v does not exists", name)
    } else {
        // Try to change lock state
        if atomic.CompareAndSwapUint32(&lock.State, LockState_UNLOCKED, LockState_LOCKED) {
            // Get a fencing token and set a timer in another goroutine to expire the lock
            updatedToken := atomic.AddUint32(&lock.CurrentFencingToken, 1) // safe to do, as token can only be updated by changing lock state
            go func() {
                timer := time.NewTimer(time.Second * time.Duration(timeInSeconds))
                <-timer.C // wait for timer to expire
                if atomic.CompareAndSwapUint32(&lock.State, LockState_LOCKED, LockState_UNLOCKED) {
                    fmt.Printf("Lock on %v by client %v expired after %v seconds\n", name, clientId, timeInSeconds)
                } else {
                    // Lock has already been released
                    fmt.Printf("Lock on %v already released\n", name)
                }

                runtime.Goexit() // For safety
            }()

            fmt.Printf("Lock %v successfully acquired by client %v", name, clientId)
            return updatedToken, nil
        } else {
            // Else state is locked, failed to acquire the lock
            return uint32(0), fmt.Errorf("failed to acquire lock %v by client %v", name, clientId)
        }
    }
}

// Verify lease on a distributed lock, if it has expired or not
// For clients where their is some sort of pause between acquiring lock and accessing the critical section
// clients should verify the lease through fencing token
func (distributedLocker *DistributedLocker) VerifyLease(name string, leaseToken uint32, clientId string) (bool, error) {
    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return false, fmt.Errorf("lock %v does not exists", name)
    } else {
        if atomic.LoadUint32(&lock.State) == LockState_LOCKED {
            if atomic.LoadUint32(&lock.CurrentFencingToken) == leaseToken {
                fmt.Printf("Lease on lock %v successfully verified by client %v", lock.Name, clientId)
                return true, nil
            } else {
                return false, fmt.Errorf("lease on lock %v has expired", lock.Name)
            }
        } else {
            // Lock has been expired
            return false, fmt.Errorf("lock %v has already been unlocked and lease on this lock has expired", lock.Name)
        }
    }
}

// Release a distributed lock, requires a lease token
// Clients cannot release an expired lock, or a lock with old lease token
func (distributedLocker *DistributedLocker) ReleaseLock(name string, leaseToken uint32, clientId string) (bool, error) {
    distributedLocker.lockerGuard.RLock()
    lock, exists := distributedLocker.Locks[name]
    distributedLocker.lockerGuard.RUnlock()

    if !exists {
        return false, fmt.Errorf("lock %v does not exists", name)
    } else {
        // Check lock state
        if atomic.LoadUint32(&lock.State) == LockState_LOCKED {
            // If locked, then only verify the lease and unlock using CAS
            if atomic.LoadUint32(&lock.CurrentFencingToken) == leaseToken {
                if atomic.CompareAndSwapUint32(&lock.State, LockState_LOCKED, LockState_UNLOCKED) {
                    // Lock successfully released
                    fmt.Printf("lock %v successfully released by client %v after lease verification", name, clientId)
                    return true, nil
                } else {
                    // Lease is verified but CAS failed, means post lease verification, someone else released the lock
                    // Or it expired in between
                    return false, fmt.Errorf("lease on lock %v verified by client %v but the lock expired", name, clientId)
                }
            } else {
                return false, fmt.Errorf("lease on lock %v has expired", lock.Name)
            }
        } else {
            // If unlocked, means lock has been expired due to timer, or unlocked previously by user
            return false, fmt.Errorf("the lock on %v has expired or the lock has already been released", name)
        }
    }
}
```