Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 86 additions & 11 deletions token/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hyperledger-labs/fabric-token-sdk/token/driver"
"github.com/hyperledger-labs/fabric-token-sdk/token/driver/protos-go/request"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/utils"
"github.com/hyperledger-labs/fabric-token-sdk/token/services/validation"
"github.com/hyperledger-labs/fabric-token-sdk/token/token"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -283,24 +284,30 @@ func (r *Request) ID() RequestAnchor {
// Additional options can be passed to customize the action.
func (r *Request) Issue(ctx context.Context, wallet *IssuerWallet, receiver Identity, typ token.Type, q uint64, opts ...IssueOption) (*IssueAction, error) {
logger.DebugfContext(ctx, "Start issue")
logger.DebugfContext(ctx, "Done issue")

if wallet == nil {
return nil, errors.Errorf("wallet is nil")
}
if typ == "" {
return nil, errors.Errorf("type is empty")
if err := validation.ValidateTokenType(string(typ)); err != nil {
return nil, errors.Wrap(err, "invalid token type")
}
if err := validation.ValidateAmount(q, 0); err != nil {
return nil, errors.Wrap(err, "invalid amount")
}
if receiver.IsNone() {
return nil, errors.Errorf("all recipients should be defined")
}
if q == 0 {
return nil, errors.Errorf("q is zero")
if r.TokenService == nil || r.TokenService.PublicParametersManager() == nil || r.TokenService.PublicParametersManager().PublicParameters() == nil {
return nil, errors.Errorf("token service is not properly initialized")
}

// Validate amount doesn't exceed max token value
maxTokenValue := r.TokenService.PublicParametersManager().PublicParameters().MaxTokenValue()
if q > maxTokenValue {
return nil, errors.Errorf("q is larger than max token value [%d]", maxTokenValue)
return nil, errors.Errorf("amount exceeds max token value [%d]", maxTokenValue)
}

if receiver.IsNone() {
return nil, errors.Errorf("all recipients should be defined")
}
logger.DebugfContext(ctx, "Done issue")

id, err := wallet.GetIssuerIdentity(typ)
if err != nil {
Expand All @@ -312,6 +319,11 @@ func (r *Request) Issue(ctx context.Context, wallet *IssuerWallet, receiver Iden
return nil, errors.WithMessagef(err, "failed compiling options [%v]", opts)
}

// Validate metadata using the validation package
if err := validation.ValidateMetadata(opt.Attributes); err != nil {
return nil, errors.Wrap(err, "invalid metadata")
}

// Compute Issue
action, metaRaw, err := r.TokenService.tms.IssueService().Issue(
ctx,
Expand Down Expand Up @@ -343,15 +355,51 @@ func (r *Request) Issue(ctx context.Context, wallet *IssuerWallet, receiver Iden
// In other words, owners[0] will receives values[0], and so on.
// Additional options can be passed to customize the action.
func (r *Request) Transfer(ctx context.Context, wallet *OwnerWallet, typ token.Type, values []uint64, owners []Identity, opts ...TransferOption) (*TransferAction, error) {
for _, v := range values {
if wallet == nil {
return nil, errors.Errorf("wallet is nil")
}
if r.TokenService == nil || r.TokenService.PublicParametersManager() == nil || r.TokenService.PublicParametersManager().PublicParameters() == nil {
return nil, errors.Errorf("token service is not properly initialized")
}

// Validate token type
if err := validation.ValidateTokenType(string(typ)); err != nil {
return nil, errors.Wrap(err, "invalid token type")
}

// Validate values using the validation package
maxTokenValue := r.TokenService.PublicParametersManager().PublicParameters().MaxTokenValue()
for i, v := range values {
if v == 0 {
return nil, errors.Errorf("value is zero")
return nil, errors.Errorf("value at index %d is zero", i)
}
if v > maxTokenValue {
return nil, errors.Errorf("value at index %d exceeds max token value [%d]", i, maxTokenValue)
}
}

// Validate owners match values length
if len(owners) != len(values) {
return nil, errors.Errorf("number of owners [%d] does not match number of values [%d]", len(owners), len(values))
}

// Validate all owners are defined
for i, owner := range owners {
if owner.IsNone() {
return nil, errors.Errorf("owner at index %d is not defined", i)
}
}

opt, err := CompileTransferOptions(opts...)
if err != nil {
return nil, errors.WithMessagef(err, "failed compiling options [%v]", opts)
}

// Validate metadata using the validation package
if err := validation.ValidateMetadata(opt.Attributes); err != nil {
return nil, errors.Wrap(err, "invalid metadata")
}

tokenIDs, outputTokens, err := r.prepareTransfer(ctx, false, wallet, typ, values, owners, opt)
if err != nil {
return nil, errors.Wrap(err, "failed preparing transfer")
Expand Down Expand Up @@ -397,10 +445,37 @@ func (r *Request) Transfer(ctx context.Context, wallet *OwnerWallet, typ token.T
// The action redeems tokens of the passed type for a total amount matching the passed value.
// Additional options can be passed to customize the action.
func (r *Request) Redeem(ctx context.Context, wallet *OwnerWallet, typ token.Type, value uint64, opts ...TransferOption) (*TransferAction, error) {
if wallet == nil {
return nil, errors.Errorf("wallet is nil")
}
if r.TokenService == nil || r.TokenService.PublicParametersManager() == nil || r.TokenService.PublicParametersManager().PublicParameters() == nil {
return nil, errors.Errorf("token service is not properly initialized")
}

// Validate token type
if err := validation.ValidateTokenType(string(typ)); err != nil {
return nil, errors.Wrap(err, "invalid token type")
}

// Validate value doesn't exceed max
maxTokenValue := r.TokenService.PublicParametersManager().PublicParameters().MaxTokenValue()
if value == 0 {
return nil, errors.Errorf("redeem value is zero")
}
if value > maxTokenValue {
return nil, errors.Errorf("redeem value exceeds max token value [%d]", maxTokenValue)
}

opt, err := CompileTransferOptions(opts...)
if err != nil {
return nil, errors.WithMessagef(err, "failed compiling options [%v]", opts)
}

// Validate metadata using the validation package
if err := validation.ValidateMetadata(opt.Attributes); err != nil {
return nil, errors.Wrap(err, "invalid metadata")
}

tokenIDs, outputTokens, err := r.prepareTransfer(ctx, true, wallet, typ, []uint64{value}, []Identity{nil}, opt)
if err != nil {
return nil, errors.Wrap(err, "failed preparing transfer")
Expand Down
45 changes: 38 additions & 7 deletions token/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,15 +525,15 @@ func TestRequest_Issue(t *testing.T) {
wallet := &IssuerWallet{}
_, err := req.Issue(ctx, wallet, Identity("receiver"), "", 100)
require.Error(t, err)
assert.Contains(t, err.Error(), "type is empty")
assert.Contains(t, err.Error(), "invalid token type")
})

t.Run("zero quantity", func(t *testing.T) {
req := NewRequest(nil, "test-anchor")
wallet := &IssuerWallet{}
_, err := req.Issue(ctx, wallet, Identity("receiver"), "USD", 0)
require.Error(t, err)
assert.Contains(t, err.Error(), "q is zero")
assert.Contains(t, err.Error(), "invalid amount")
})

t.Run("none receiver", func(t *testing.T) {
Expand Down Expand Up @@ -577,28 +577,59 @@ func TestRequest_Issue(t *testing.T) {
mockWallet := &IssuerWallet{}
_, err := req.Issue(ctx, mockWallet, Identity("receiver"), "USD", 200)
require.Error(t, err)
assert.Contains(t, err.Error(), "q is larger than max token value")
assert.Contains(t, err.Error(), "amount exceeds max token value")
})
}

// TestRequest_Transfer tests the Transfer function
func TestRequest_Transfer(t *testing.T) {
ctx := t.Context()

t.Run("zero value", func(t *testing.T) {
t.Run("nil wallet", func(t *testing.T) {
req := NewRequest(nil, "test-anchor")
_, err := req.Transfer(ctx, nil, "USD", []uint64{100}, []Identity{Identity("receiver")})
require.Error(t, err)
assert.Contains(t, err.Error(), "wallet is nil")
})

t.Run("zero value", func(t *testing.T) {
mockPP := &driver2.PublicParameters{}
mockPP.MaxTokenValueReturns(1000000)

mockPPM := &driver2.PublicParamsManager{}
mockPPM.PublicParametersReturns(mockPP)

tms := &ManagementService{
publicParametersManager: &PublicParametersManager{
ppm: mockPPM,
pp: &PublicParameters{PublicParameters: mockPP},
},
}
req := NewRequest(tms, "test-anchor")
wallet := &OwnerWallet{}
_, err := req.Transfer(ctx, wallet, "USD", []uint64{0, 100}, []Identity{Identity("receiver1"), Identity("receiver2")})
require.Error(t, err)
assert.Contains(t, err.Error(), "value is zero")
assert.Contains(t, err.Error(), "value at index 0 is zero")
})

t.Run("multiple zero values", func(t *testing.T) {
req := NewRequest(nil, "test-anchor")
mockPP := &driver2.PublicParameters{}
mockPP.MaxTokenValueReturns(1000000)

mockPPM := &driver2.PublicParamsManager{}
mockPPM.PublicParametersReturns(mockPP)

tms := &ManagementService{
publicParametersManager: &PublicParametersManager{
ppm: mockPPM,
pp: &PublicParameters{PublicParameters: mockPP},
},
}
req := NewRequest(tms, "test-anchor")
wallet := &OwnerWallet{}
_, err := req.Transfer(ctx, wallet, "USD", []uint64{100, 0}, []Identity{Identity("receiver1"), Identity("receiver2")})
require.Error(t, err)
assert.Contains(t, err.Error(), "value is zero")
assert.Contains(t, err.Error(), "value at index 1 is zero")
})
}

Expand Down
80 changes: 74 additions & 6 deletions token/services/selector/sherdlock/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ package sherdlock

import (
"context"
"io"
"sync"
"sync/atomic"
"time"

"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections"
"github.com/hyperledger-labs/fabric-smart-client/platform/common/utils/collections/iterators"
"github.com/hyperledger-labs/fabric-token-sdk/token"
Expand Down Expand Up @@ -170,7 +172,11 @@ type cachedFetcher struct {
queriesResponded uint32
// prevKeys tracks cache keys from the previous update cycle to identify stale entries that need removal.
prevKeys map[string]struct{}
mu sync.RWMutex
// isUpdating indicates if a cache refresh is currently in progress.
isUpdating bool
// updateCond allows goroutines to wait for an in-progress update to complete.
updateCond *sync.Cond
mu sync.RWMutex
}

// NewCachedFetcher creates a fetcher that maintains a periodically refreshed cache of all tokens.
Expand Down Expand Up @@ -198,48 +204,110 @@ func NewCachedFetcher(tokenDB TokenDB, cacheSize int64, freshnessInterval time.D
panic("failed to create ristretto cache: " + err.Error())
}

return &cachedFetcher{
f := &cachedFetcher{
tokenDB: tokenDB,
cache: ristrettoCache,
freshnessInterval: freshnessInterval,
maxQueriesBeforeRefresh: uint32(maxQueriesBeforeRefresh),
prevKeys: make(map[string]struct{}),
}
f.updateCond = sync.NewCond(&f.mu)

return f
}

// finishUpdate signals completion and releases the lock.
// Must be called while holding f.mu.
func (f *cachedFetcher) finishUpdate() {
f.isUpdating = false
f.updateCond.Broadcast()
f.mu.Unlock()
}

// completeUpdate signals completion without unlocking.
// Use this when the lock is not held (e.g., on error paths before re-acquiring lock).
func (f *cachedFetcher) completeUpdate() {
f.isUpdating = false
f.updateCond.Broadcast()
}

// update refreshes the token cache from the database. It releases the lock during the
Comment thread
NETIZEN-11 marked this conversation as resolved.
// potentially slow DB operation to avoid blocking other goroutines, then re-acquires
// the lock to atomically update the cache. A re-check of staleness is performed
// after the DB call completes to avoid overwriting a cache that was refreshed by
// another goroutine while waiting for the database.
func (f *cachedFetcher) update(ctx context.Context) {
f.mu.Lock()
defer f.mu.Unlock()
if f.isUpdating {
// Wait for the in-progress update to finish
for f.isUpdating {
f.updateCond.Wait()
}
f.mu.Unlock()

return
}

if !f.isCacheStale() && !f.isCacheOverused() {
logger.DebugfContext(ctx, "Cache renewed in the meantime by another process")
f.mu.Unlock()

return
}
logger.DebugfContext(ctx, "Renew token cache")
f.isUpdating = true

// Release lock during slow DB operation to not block other token operations
f.mu.Unlock()

it, err := f.tokenDB.SpendableTokensIteratorBy(ctx, "", "")
if err != nil {
logger.Warnf("Failed to get token iterator: %v", err)
f.completeUpdate()

return
}
defer it.Close()

m := f.groupTokensByKey(ctx, it)
m, err := f.groupTokensByKey(ctx, it)
if err != nil {
logger.Warnf("Failed to group tokens from iterator: %v", err)
f.completeUpdate()

return
}

f.mu.Lock()
// Re-check: another goroutine may have refreshed while we waited for DB
if !f.isCacheStale() && !f.isCacheOverused() {
logger.DebugfContext(ctx, "Cache renewed in the meantime by another process, skipping")
f.finishUpdate()

return
}

f.updateCache(ctx, m)
atomic.StoreInt64(&f.lastFetched, time.Now().UnixNano())
atomic.StoreUint32(&f.queriesResponded, 0)
f.finishUpdate()
}

// groupTokensByKey reads tokens from the iterator and groups them by wallet/currency key.
func (f *cachedFetcher) groupTokensByKey(ctx context.Context, it driver.SpendableTokensIterator) map[string][]*token2.UnspentTokenInWallet {
// It returns an error if the iterator fails mid-way to prevent partial updates.
func (f *cachedFetcher) groupTokensByKey(ctx context.Context, it driver.SpendableTokensIterator) (map[string][]*token2.UnspentTokenInWallet, error) {
m := map[string][]*token2.UnspentTokenInWallet{}
for t, err := it.Next(); err == nil && t != nil; t, err = it.Next() {
key := tokenKey(t.WalletID, t.Type)
logger.DebugfContext(ctx, "Adding token with key [%s]", key)
m[key] = append(m[key], t)
}
// Re-check for error after loop termination
_, err := it.Next()
if err != nil && !errors.Is(err, io.EOF) {
return nil, err
}

return m
return m, nil
}

// updateCache updates the cache by adding new entries before removing stale ones.
Expand Down
Loading