Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 54 additions & 79 deletions locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,12 @@ import (

// callback function is a function that should be executed right after it inserted into hashmap
// generally callback is responsible for the removing itself from the hashmap
// id - id of the lock
// notifyCh - channel to notify that all locks were removed
// stopCh - broadcast channel to stop all the callbacks associated with the resource
type callback func(id string, notifyCh chan<- struct{}, stopCh <-chan struct{})
type callback func(notifyCh chan<- struct{}, stopCh <-chan struct{})

// item represents callback element
type item struct {
// callback to remove the item
callback callback
// item's stop channel
stopCh chan struct{}
// item's update TTL channel
Expand Down Expand Up @@ -102,21 +99,19 @@ func (l *locker) lock(ctx context.Context, res, id string, ttl int) bool {

rr.ownerID.Store(new(id))
rr.writerCount.Store(1)
rr.readerCount.Store(0)

l.resources[res] = rr

callb, stopCbCh, updateTTLCh := l.makeLockCallback(res, id, ttl)

rr.locks.Store(id, &item{
callback: callb,
stopCh: stopCbCh,
updateTTLCh: updateTTLCh,
})

// run the callback
go func() {
callb(id, rr.notificationCh, rr.stopCh)
callb(rr.notificationCh, rr.stopCh)
}()

l.globalMu.unlock()
Expand Down Expand Up @@ -167,7 +162,7 @@ func (l *locker) lock(ctx context.Context, res, id string, ttl int) bool {
l.log.Debug("got release mutex back", "id", id)

// inconsistent, still have readers/writers after notification
if r.writerCount.Load() != 0 && r.readerCount.Load() != 0 {
if r.writerCount.Load() != 0 || r.readerCount.Load() != 0 {
l.log.Error("inconsistent state, should be zero writers and zero readers",
"resource", res,
"id", id,
Expand All @@ -183,13 +178,12 @@ func (l *locker) lock(ctx context.Context, res, id string, ttl int) bool {
callb, stopCbCh, updateTTLCh := l.makeLockCallback(res, id, ttl)

r.locks.Store(id, &item{
callback: callb,
stopCh: stopCbCh,
updateTTLCh: updateTTLCh,
})
// run the callback
go func() {
callb(id, r.notificationCh, r.stopCh)
callb(r.notificationCh, r.stopCh)
}()

r.resourceMu.unlock()
Expand Down Expand Up @@ -264,7 +258,7 @@ func (l *locker) lock(ctx context.Context, res, id string, ttl int) bool {
}

// inconsistent, still have readers/writers after notification
if r.writerCount.Load() != 0 && r.readerCount.Load() != 0 {
if r.writerCount.Load() != 0 || r.readerCount.Load() != 0 {
l.log.Error("inconsistent state, should be zero writers and zero readers",
"resource", res,
"id", id,
Expand All @@ -284,14 +278,13 @@ func (l *locker) lock(ctx context.Context, res, id string, ttl int) bool {
callb, stopCbCh, updateTTLCh := l.makeLockCallback(res, id, ttl)

r.locks.Store(id, &item{
callback: callb,
stopCh: stopCbCh,
updateTTLCh: updateTTLCh,
})

// run the callback
go func() {
callb(id, r.notificationCh, r.stopCh)
callb(r.notificationCh, r.stopCh)
}()

r.resourceMu.unlock()
Expand Down Expand Up @@ -322,14 +315,13 @@ func (l *locker) lock(ctx context.Context, res, id string, ttl int) bool {
callb, stopCbCh, updateTTLCh := l.makeLockCallback(res, id, ttl)

r.locks.Store(id, &item{
callback: callb,
stopCh: stopCbCh,
updateTTLCh: updateTTLCh,
})

// run the callback
go func() {
callb(id, r.notificationCh, r.stopCh)
callb(r.notificationCh, r.stopCh)
}()

r.resourceMu.unlock()
Expand Down Expand Up @@ -363,14 +355,13 @@ func (l *locker) lock(ctx context.Context, res, id string, ttl int) bool {
callb, stopCbCh, updateTTLCh := l.makeLockCallback(res, id, ttl)

r.locks.Store(id, &item{
callback: callb,
stopCh: stopCbCh,
updateTTLCh: updateTTLCh,
})

// run the callback
go func() {
callb(id, r.notificationCh, r.stopCh)
callb(r.notificationCh, r.stopCh)
}()

r.resourceMu.unlock()
Expand Down Expand Up @@ -416,22 +407,20 @@ func (l *locker) lockRead(ctx context.Context, res, id string, ttl int) bool {
}

rr.ownerID.Store(new(""))
rr.writerCount.Store(0)
rr.readerCount.Store(1)

l.resources[res] = rr

callb, stopCbCh, updateTTLCh := l.makeLockCallback(res, id, ttl)

rr.locks.Store(id, &item{
callback: callb,
stopCh: stopCbCh,
updateTTLCh: updateTTLCh,
})

// run the callback
go func() {
callb(id, rr.notificationCh, rr.stopCh)
callb(rr.notificationCh, rr.stopCh)
}()

l.globalMu.unlock()
Expand Down Expand Up @@ -490,7 +479,7 @@ func (l *locker) lockRead(ctx context.Context, res, id string, ttl int) bool {
}

// inconsistent, still have readers/writers after notification
if r.writerCount.Load() != 0 && r.readerCount.Load() != 0 {
if r.writerCount.Load() != 0 || r.readerCount.Load() != 0 {
l.log.Error("inconsistent state, should be zero writers and zero readers",
"resource", res,
"id", id,
Expand All @@ -507,14 +496,13 @@ func (l *locker) lockRead(ctx context.Context, res, id string, ttl int) bool {
callb, stopCbCh, updateTTLCh := l.makeLockCallback(res, id, ttl)

r.locks.Store(id, &item{
callback: callb,
stopCh: stopCbCh,
updateTTLCh: updateTTLCh,
})

// run the callback
go func() {
callb(id, r.notificationCh, r.stopCh)
callb(r.notificationCh, r.stopCh)
}()

r.resourceMu.unlock()
Expand All @@ -534,21 +522,19 @@ func (l *locker) lockRead(ctx context.Context, res, id string, ttl int) bool {
"resource", res,
"id", id)
// increase readers
r.writerCount.Store(0)
r.readerCount.Add(1)

// we have TTL, create callback
callb, stopCbCh, updateTTLCh := l.makeLockCallback(res, id, ttl)

r.locks.Store(id, &item{
callback: callb,
stopCh: stopCbCh,
updateTTLCh: updateTTLCh,
})

// run the callback
go func() {
callb(id, r.notificationCh, r.stopCh)
callb(r.notificationCh, r.stopCh)
}()

r.resourceMu.unlock()
Expand Down Expand Up @@ -698,11 +684,7 @@ func (l *locker) exists(ctx context.Context, res, id string) bool {

// Special case, check if we have any locks
if id == "*" {
if r.writerCount.Load() > 0 || r.readerCount.Load() > 0 {
return true
}

return false
return r.writerCount.Load() > 0 || r.readerCount.Load() > 0
}

if _, existsID := r.locks.Load(id); !existsID {
Expand Down Expand Up @@ -751,15 +733,6 @@ func (l *locker) updateTTL(ctx context.Context, res, id string, ttl int) bool {
"resource", res,
"id", id)

if !ok {
l.log.Warn("no such resource",
"resource", res,
"id", id)

r.resourceMu.unlockRelease()
return false
}

rl, ok := r.locks.Load(id)
if !ok {
l.log.Warn("no such resource ID",
Expand Down Expand Up @@ -811,7 +784,7 @@ func (l *locker) makeLockCallback(res, id string, ttl int) (callback, chan struc
updateTTLCh := make(chan int, 1)

// at this point, when adding lock, we should not have the callback
return func(lockID string, notifCh chan<- struct{}, sCh <-chan struct{}) {
return func(notifCh chan<- struct{}, sCh <-chan struct{}) {
// case for the items without TTL. We should add such items to control their flow
cbttl := ttl
if cbttl == 0 {
Expand All @@ -820,45 +793,47 @@ func (l *locker) makeLockCallback(res, id string, ttl int) (callback, chan struc

// TTL channel
ta := time.NewTicker(time.Microsecond * time.Duration(cbttl))
loop:
select {
case <-ta.C:
l.log.Debug("r/lock: ttl expired",
"resource", res,
"id", lockID,
"ttl microseconds", cbttl,
)
ta.Stop()
// broadcast stop channel
case <-sCh:
l.log.Debug("r/lock: ttl removed, stop broadcast call",
"resource", res,
"id", lockID,
"ttl microseconds", cbttl,
)
ta.Stop()
// item stop channel
case <-stopCbCh:
l.log.Debug("r/lock: ttl removed, stop callback call",
"resource", res,
"id", lockID,
"ttl microseconds", cbttl,
)
ta.Stop()
case newTTL := <-updateTTLCh:
// if the new TTL is 0, we should treat it as unlimited
if newTTL == 0 {
newTTL = 31555952000000 // year
for {
select {
case <-ta.C:
l.log.Debug("r/lock: ttl expired",
"resource", res,
"id", id,
"ttl microseconds", cbttl,
)
ta.Stop()
// broadcast stop channel
case <-sCh:
l.log.Debug("r/lock: ttl removed, stop broadcast call",
"resource", res,
"id", id,
"ttl microseconds", cbttl,
)
ta.Stop()
// item stop channel
case <-stopCbCh:
l.log.Debug("r/lock: ttl removed, stop callback call",
"resource", res,
"id", id,
"ttl microseconds", cbttl,
)
ta.Stop()
case newTTL := <-updateTTLCh:
// if the new TTL is 0, we should treat it as unlimited
if newTTL == 0 {
newTTL = 31555952000000 // year
}
l.log.Debug("r/lock: ttl was updated",
"resource", res,
"id", id,
"new ttl microseconds", newTTL)
// update the initial ttl
cbttl = newTTL
ta.Reset(time.Microsecond * time.Duration(cbttl))
// in case of TTL, we don't need to remove the item, only update TTL
continue
}
l.log.Debug("r/lock: ttl was updated",
"resource", res,
"id", id,
"new ttl microseconds", newTTL)
// update the initial ttl
cbttl = newTTL
ta.Reset(time.Microsecond * time.Duration(cbttl))
// in case of TTL, we don't need to remove the item, only update TTL
goto loop
break
}

// unlimited but should not be long
Expand Down
4 changes: 2 additions & 2 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package lock

import (
"context"
stderr "errors"
"errors"
"time"

"connectrpc.com/connect"
Expand All @@ -11,7 +11,7 @@ import (

const defaultImmediateTimeout = time.Millisecond

var errEmptyID = stderr.New("empty ID is not allowed")
var errEmptyID = errors.New("empty ID is not allowed")

type rpc struct {
pl *Plugin
Expand Down
Loading