Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 68 additions & 6 deletions token/services/selector/sherdlock/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ package sherdlock

import (
"context"
"io"
"sync"
"sync/atomic"
"time"

Comment thread
NETIZEN-11 marked this conversation as resolved.
"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections/iterators"
"github.com/hyperledger-labs/fabric-token-sdk/token"
Expand Down Expand Up @@ -170,7 +172,11 @@ type cachedFetcher struct {
queriesResponded uint32
// prevKeys tracks cache keys from the previous update cycle to identify stale entries that need removal.
prevKeys map[string]struct{}
mu sync.RWMutex
// isUpdating indicates if a cache refresh is currently in progress.
isUpdating bool
// updateCond allows goroutines to wait for an in-progress update to complete.
updateCond *sync.Cond
mu sync.RWMutex
}

// NewCachedFetcher creates a fetcher that maintains a periodically refreshed cache of all tokens.
Expand Down Expand Up @@ -198,24 +204,60 @@ func NewCachedFetcher(tokenDB TokenDB, cacheSize int64, freshnessInterval time.D
panic("failed to create ristretto cache: " + err.Error())
}

return &cachedFetcher{
f := &cachedFetcher{
tokenDB: tokenDB,
cache: ristrettoCache,
freshnessInterval: freshnessInterval,
maxQueriesBeforeRefresh: uint32(maxQueriesBeforeRefresh),
prevKeys: make(map[string]struct{}),
}
f.updateCond = sync.NewCond(&f.mu)

return f
}

// finishUpdate releases the update lock and signals all waiting goroutines.
// Broadcast wakes all goroutines that are waiting on updateCond, not just one.
// Must be called while holding f.mu.
func (f *cachedFetcher) finishUpdate() {
f.isUpdating = false
f.updateCond.Broadcast()
Comment thread
NETIZEN-11 marked this conversation as resolved.
f.mu.Unlock()
}

// update refreshes the token cache from the database. It releases the lock during the
// potentially slow DB operation to avoid blocking other goroutines, then re-acquires
// the lock to atomically update the cache. A re-check of staleness is performed
// after the DB call completes to avoid overwriting a cache that was refreshed by
// another goroutine while waiting for the database.
func (f *cachedFetcher) update(ctx context.Context) {
f.mu.Lock()
defer f.mu.Unlock()
if f.isUpdating {
// Wait for the in-progress update to finish
for f.isUpdating {
f.updateCond.Wait()
}
f.mu.Unlock()

return
}

if !f.isCacheStale() && !f.isCacheOverused() {
logger.DebugfContext(ctx, "Cache renewed in the meantime by another process")
f.mu.Unlock()

return
}
logger.DebugfContext(ctx, "Renew token cache")
f.isUpdating = true

// Defer finishUpdate to ensure cleanup and signaling happens regardless of exit path.
// This avoids repeating finishUpdate calls at every return site.
defer f.finishUpdate()

// Release lock during slow DB operation to not block other token operations
f.mu.Unlock()

it, err := f.tokenDB.SpendableTokensIteratorBy(ctx, "", "")
if err != nil {
logger.Warnf("Failed to get token iterator: %v", err)
Expand All @@ -224,22 +266,42 @@ func (f *cachedFetcher) update(ctx context.Context) {
}
defer it.Close()

m := f.groupTokensByKey(ctx, it)
m, err := f.groupTokensByKey(ctx, it)
if err != nil {
logger.Warnf("Failed to group tokens from iterator: %v", err)

return
}

f.mu.Lock()
// Re-check: another goroutine may have refreshed while we waited for DB
if !f.isCacheStale() && !f.isCacheOverused() {
logger.DebugfContext(ctx, "Cache renewed in the meantime by another process, skipping")

return
}

f.updateCache(ctx, m)
atomic.StoreInt64(&f.lastFetched, time.Now().UnixNano())
atomic.StoreUint32(&f.queriesResponded, 0)
}

// groupTokensByKey reads tokens from the iterator and groups them by wallet/currency key.
func (f *cachedFetcher) groupTokensByKey(ctx context.Context, it driver.SpendableTokensIterator) map[string][]*token2.UnspentTokenInWallet {
// It returns an error if the iterator fails mid-way to prevent partial updates.
func (f *cachedFetcher) groupTokensByKey(ctx context.Context, it driver.SpendableTokensIterator) (map[string][]*token2.UnspentTokenInWallet, error) {
m := map[string][]*token2.UnspentTokenInWallet{}
for t, err := it.Next(); err == nil && t != nil; t, err = it.Next() {
key := tokenKey(t.WalletID, t.Type)
logger.DebugfContext(ctx, "Adding token with key [%s]", key)
m[key] = append(m[key], t)
}
// Re-check for error after loop termination
_, err := it.Next()
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}

return m
return m, nil
}

// updateCache updates the cache by adding new entries before removing stale ones.
Expand Down
186 changes: 171 additions & 15 deletions token/services/selector/sherdlock/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import (
"context"
"errors"
"sync"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -81,8 +82,8 @@
atomic.StoreInt64(&fetcher.lastFetched, time.Now().UnixNano())
assert.False(t, fetcher.isCacheStale())

// Wait for cache to become stale
time.Sleep(150 * time.Millisecond)
// Manually set lastFetched to the past instead of sleeping
fetcher.lastFetched = time.Now().Add(-fetcher.freshnessInterval * 2)

Check failure on line 86 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / lint

cannot use time.Now().Add(-fetcher.freshnessInterval * 2) (value of struct type "time".Time) as int64 value in assignment

Check failure on line 86 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / checks

cannot use time.Now().Add(-fetcher.freshnessInterval * 2) (value of struct type "time".Time) as int64 value in assignment
assert.True(t, fetcher.isCacheStale())
}

Expand Down Expand Up @@ -235,8 +236,9 @@
ctx := t.Context()
fetcher.update(ctx)

// Wait for cache to become stale
time.Sleep(100 * time.Millisecond)
// Trigger hard refresh by setting lastFetched to the past
fetcher.lastFetched = time.Now().Add(-fetcher.freshnessInterval * 2)

Check failure on line 240 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / lint

cannot use time.Now().Add(-fetcher.freshnessInterval * 2) (value of struct type "time".Time) as int64 value in assignment

Check failure on line 240 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / checks

cannot use time.Now().Add(-fetcher.freshnessInterval * 2) (value of struct type "time".Time) as int64 value in assignment
assert.True(t, fetcher.isCacheStale())

// Setup second call expectation
tokens2 := []*token2.UnspentTokenInWallet{
Expand Down Expand Up @@ -303,10 +305,6 @@

fetcher.update(ctx)

// Note: Ristretto cache uses probabilistic eviction and may not immediately reflect changes
// We wait a bit for the cache to process the clear and new additions
time.Sleep(10 * time.Millisecond)

// New key should exist
_, ok3 := fetcher.cache.Get(tokenKey("wallet3", "GBP"))
assert.True(t, ok3)
Expand Down Expand Up @@ -608,7 +606,8 @@
mockIterator := iterators.Slice(tokens)

ctx := t.Context()
grouped := fetcher.groupTokensByKey(ctx, mockIterator)
grouped, err := fetcher.groupTokensByKey(ctx, mockIterator)
require.NoError(t, err)

// Should have 3 keys: wallet1-USD, wallet1-EUR, wallet2-USD
assert.Len(t, grouped, 3)
Expand All @@ -628,7 +627,8 @@
mockIterator := iterators.Slice(tokens)

ctx := t.Context()
grouped := fetcher.groupTokensByKey(ctx, mockIterator)
grouped, err := fetcher.groupTokensByKey(ctx, mockIterator)
require.NoError(t, err)

assert.Empty(t, grouped)
})
Expand Down Expand Up @@ -667,9 +667,6 @@
}
fetcher.updateCache(ctx, tokensByKey2)

// Wait for cache to process deletions
time.Sleep(10 * time.Millisecond)

// First key should still exist, second should be removed
_, ok1 = fetcher.cache.Get(tokenKey("wallet1", "USD"))
assert.True(t, ok1)
Expand Down Expand Up @@ -731,12 +728,12 @@
},
}
fetcher.updateCache(ctx, newTokens)
time.Sleep(5 * time.Millisecond)
time.Sleep(20 * time.Millisecond)
}

// Stop readers
close(stopReading)
time.Sleep(20 * time.Millisecond)
time.Sleep(100 * time.Millisecond)

// Check for errors
select {
Expand Down Expand Up @@ -787,6 +784,61 @@
})
}

func TestCachedFetcher_Update_ThunderingHerd(t *testing.T) {
mockDB := new(mockTokenDB)
// Short freshness interval
fetcher := newCachedFetcher(mockDB, 0, 50*time.Millisecond, 100)

Check failure on line 790 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: newCachedFetcher

Check failure on line 790 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / checks

undefined: newCachedFetcher

// Initial population
mockDB.On("SpendableTokensIteratorBy", mock.Anything, "", token2.Type("")).
Return(iterators.Slice([]*token2.UnspentTokenInWallet{}), nil).Once()

ctx := t.Context()
fetcher.update(ctx)

// Trigger staleness manually
fetcher.mu.Lock()
fetcher.lastFetched = time.Now().Add(-10 * time.Second)
fetcher.mu.Unlock()

// Block the next DB call with a mock that waits
dbCallStarted := make(chan struct{})
dbCallRelease := make(chan struct{})

mockDB.On("SpendableTokensIteratorBy", mock.Anything, "", token2.Type("")).
Run(func(args mock.Arguments) {
close(dbCallStarted)
<-dbCallRelease
}).
Return(iterators.Slice([]*token2.UnspentTokenInWallet{}), nil).Once()

// Start multiple concurrent reads
var wg sync.WaitGroup
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
_, _ = fetcher.UnspentTokensIteratorBy(ctx, "wallet1", "USD")
}()
}

// Wait for at least one to start the DB call
select {
case <-dbCallStarted:
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for DB call to start")
}

// Release the DB call
close(dbCallRelease)

// Wait for all to finish
wg.Wait()

// Mock should only have been called TWICE total (once for initial, once for the 10 concurrent ones)
mockDB.AssertNumberOfCalls(t, "SpendableTokensIteratorBy", 2)
}

// TestNewFetcherProvider verifies provider creation with valid/invalid strategies and zero values.
func TestNewFetcherProvider(t *testing.T) {
t.Run("creates provider with valid strategy", func(t *testing.T) {
Expand Down Expand Up @@ -980,3 +1032,107 @@

return nil, errors.New("not implemented")
}

// TestCachedFetcher_UpdateDoesNotBlockReaders tests that the update() function
// releases the lock during the potentially slow DB operation, allowing concurrent
// readers to access the cache. This is the fix for issue #16.
func TestCachedFetcher_UpdateDoesNotBlockReaders(t *testing.T) {
mockDB := new(mockTokenDB)
// Use long freshness interval so cache won't be stale
fetcher := newCachedFetcher(mockDB, 0, 10*time.Second, 100)

Check failure on line 1042 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: newCachedFetcher

Check failure on line 1042 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / checks

undefined: newCachedFetcher

// Pre-populate the cache so readers can hit it
initialTokens := []*token2.UnspentTokenInWallet{
{WalletID: "wallet1", Type: "USD", Quantity: "100"},
}
mockIterator := iterators.Slice(initialTokens)
mockDB.On("SpendableTokensIteratorBy", mock.Anything, "", token2.Type("")).Return(mockIterator, nil).Once()

ctx := t.Context()
fetcher.update(ctx)

// Make cache stale so update() will be called
fetcher.lastFetched = time.Now().Add(-20 * time.Second)

// Use channels to synchronize instead of Sleep
dbStarted := make(chan struct{})
slowDB := make(chan struct{})
tokensAfterSlowDB := []*token2.UnspentTokenInWallet{
{WalletID: "wallet1", Type: "USD", Quantity: "200"},
}
mockIterator2 := iterators.Slice(tokensAfterSlowDB)
mockDB.On("SpendableTokensIteratorBy", mock.Anything, "", token2.Type("")).Return(mockIterator2, nil).Run(func(args mock.Arguments) {
close(dbStarted) // Signal that the DB operation has started
<-slowDB // Wait before returning to simulate slow DB
}).Once()

// Track whether reader succeeded while update() was blocked on DB
var readerSuccess atomic.Bool
var readerWg sync.WaitGroup

// Start update in background (it will block on DB call)
readerWg.Add(1)
go func() {
defer readerWg.Done()
fetcher.update(ctx)
}()

// Wait for the background update to actually reach the DB call
select {
case <-dbStarted:
// Background update is now at line 240, having released the lock at line 238
case <-time.After(5 * time.Second):
t.Fatal("Timeout waiting for background update to reach DB operation")
}

// Reader should be able to acquire RLock while update() waits on DB
// This would deadlock before the fix
fetcher.mu.RLock()
_, ok := fetcher.cache.Get(tokenKey("wallet1", "USD"))
fetcher.mu.RUnlock()

if ok {
readerSuccess.Store(true)
}

// Signal DB to complete
close(slowDB)

// Wait for update to complete
readerWg.Wait()

// Verify reader succeeded - the cache should still be accessible during update
assert.True(t, readerSuccess.Load(), "reader should be able to access cache while update() is blocked on DB")
mockDB.AssertExpectations(t)
}

// TestCachedFetcher_UpdateReacquiresLockAfterDB tests that after the DB operation
// completes, update() correctly re-acquires the lock and performs the cache update.
func TestCachedFetcher_UpdateReacquiresLockAfterDB(t *testing.T) {
mockDB := new(mockTokenDB)
fetcher := newCachedFetcher(mockDB, 0, 1*time.Second, 100)

Check failure on line 1113 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / lint

undefined: newCachedFetcher (typecheck)

Check failure on line 1113 in token/services/selector/sherdlock/fetcher_test.go

View workflow job for this annotation

GitHub Actions / checks

undefined: newCachedFetcher

// Pre-populate to make cache appear stale
fetcher.lastFetched = time.Now().Add(-20 * time.Second)

tokens := []*token2.UnspentTokenInWallet{
{WalletID: "wallet1", Type: "USD", Quantity: "300"},
}
mockIterator := iterators.Slice(tokens)
mockDB.On("SpendableTokensIteratorBy", mock.Anything, "", token2.Type("")).Return(mockIterator, nil).Once()

ctx := t.Context()
fetcher.update(ctx)

// After update completes, cache should be refreshed (not stale)
assert.False(t, fetcher.isCacheStale())
assert.Equal(t, uint32(0), atomic.LoadUint32(&fetcher.queriesResponded))

// Token should be in cache
fetcher.mu.RLock()
_, ok := fetcher.cache.Get(tokenKey("wallet1", "USD"))
fetcher.mu.RUnlock()
assert.True(t, ok, "token should be in cache after update")

mockDB.AssertExpectations(t)
}
Loading
Loading