Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 25 additions & 9 deletions targeting/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (e *Engine) EvaluateContextResolved(ctx context.Context, resolved *Resolved
// 5. Resolve artifact topics from Store (per-request, can't cache).
var artifactTopics []string
for _, artifact := range artifactRefs {
topics, err := e.store.SetMembers(ctx, "topics:artifact:"+artifact)
topics, err := e.store.SetMembers(ctx, keyPrefixTopicsArtifact+artifact)
if err != nil {
e.metrics.StoreError(StageTopicMatch, err)
} else {
Expand Down Expand Up @@ -347,8 +347,8 @@ func (e *Engine) EvaluateIdentityResolved(ctx context.Context, resolved *Resolve
keys := make([]string, 0, len(identities)*2)
for _, uid := range identities {
hash := HashToken(uid.UserToken)
keys = append(keys, "user:profile:"+hash)
keys = append(keys, "user:exposures:"+hash)
keys = append(keys, keyPrefixUserProfile+hash)
keys = append(keys, keyPrefixUserExposures+hash)
}

// 2. Single MGet — 1 round-trip.
Expand Down Expand Up @@ -452,7 +452,7 @@ func (e *Engine) SetUserProfile(ctx context.Context, userToken string, segments
if err != nil {
return err
}
return e.store.Set(ctx, "user:profile:"+hash, string(data), 0)
return e.store.Set(ctx, keyPrefixUserProfile+hash, string(data), 0)
}

// SetUserProfiles writes segment memberships for multiple users in a single batch.
Expand All @@ -466,11 +466,27 @@ func (e *Engine) SetUserProfiles(ctx context.Context, profiles map[string]map[st
if err != nil {
return err
}
kvs["user:profile:"+hash] = string(data)
kvs[keyPrefixUserProfile+hash] = string(data)
}
return e.store.MSet(ctx, kvs, 0)
}

// DeleteUserProfile removes a user's segment profile.
func (e *Engine) DeleteUserProfile(ctx context.Context, userToken string) error {
hash := HashToken(userToken)
return e.store.Del(ctx, keyPrefixUserProfile+hash)
}

// DeleteUserProfiles removes segment profiles for multiple users in a single batch.
// The userTokens slice contains user tokens.
func (e *Engine) DeleteUserProfiles(ctx context.Context, userTokens []string) error {
keys := make([]string, len(userTokens))
for i, userToken := range userTokens {
keys[i] = keyPrefixUserProfile + HashToken(userToken)
}
return e.store.MDel(ctx, keys...)
}

// RecordExposure records an impression to the exposure log for all UIDs.
// Each UID's exposure log is read, the new entry is appended,
// old entries are pruned, and the log is written back.
Expand Down Expand Up @@ -529,7 +545,7 @@ func (e *Engine) RecordExposure(ctx context.Context, req *ExposeRequest) (*Expos
// Write to each UID's exposure log. Capture the first UID's log for the response.
var firstLog BinaryExposureLog
for i, hash := range hashes {
key := "user:exposures:" + hash
key := keyPrefixUserExposures + hash

val, _, err := e.store.Get(ctx, key)
if err != nil {
Expand Down Expand Up @@ -636,7 +652,7 @@ func (e *Engine) checkURLFilter(ctx context.Context, artifacts []string, pkgID s
urlHash := HashURL(artifact)

if cfg.URLBlocklist {
blocked, err := e.store.SetIsMember(ctx, "url:blocklist:"+pkgID, urlHash)
blocked, err := e.store.SetIsMember(ctx, keyPrefixURLBlocklist+pkgID, urlHash)
if err != nil {
return false, err
}
Expand All @@ -646,7 +662,7 @@ func (e *Engine) checkURLFilter(ctx context.Context, artifacts []string, pkgID s
}

if cfg.URLAllowlist {
allowKey := "url:allowlist:" + pkgID
allowKey := keyPrefixURLAllowlist + pkgID
exists, err := e.store.Exists(ctx, allowKey)
if err != nil {
return false, err
Expand All @@ -672,7 +688,7 @@ func (e *Engine) checkTopicMatch(ctx context.Context, artifacts []string, pkgID
return true, nil
}
for _, artifact := range artifacts {
intersection, err := e.store.SetIntersect(ctx, "topics:package:"+pkgID, "topics:artifact:"+artifact)
intersection, err := e.store.SetIntersect(ctx, keyPrefixTopicsPackage+pkgID, keyPrefixTopicsArtifact+artifact)
if err != nil {
return false, err
}
Expand Down
18 changes: 9 additions & 9 deletions targeting/identity_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
// PackageIdentityConfig is the identity-side configuration for a package,
// stored in the Store as JSON at key "config:pkg:{packageID}".
type PackageIdentityConfig struct {
CampaignID string `json:"campaign_id,omitempty"`
CampaignID string `json:"campaign_id,omitempty"`
FrequencyRules []FrequencyRuleJSON `json:"frequency_rules,omitempty"`
TargetSegments []string `json:"target_segments,omitempty"`
TargetSegments []string `json:"target_segments,omitempty"`
}

// CampaignFreqConfig is the frequency cap configuration for a campaign,
Expand Down Expand Up @@ -42,7 +42,7 @@ func toFrequencyRules(rules []FrequencyRuleJSON) []FrequencyRule {
// loadPackageIdentityConfig reads identity config for a package from the Store.
// Returns nil if no config is found (package has no identity dimensions).
func loadPackageIdentityConfig(ctx context.Context, store Store, pkgID string) (*PackageIdentityConfig, error) {
key := fmt.Sprintf("config:pkg:%s", pkgID)
key := keyPrefixConfigPkg + pkgID
val, ok, err := store.Get(ctx, key)
if err != nil {
return nil, err
Expand All @@ -64,7 +64,7 @@ func batchLoadPackageContextConfigs(ctx context.Context, store Store, pkgIDs []s
}
keys := make([]string, len(pkgIDs))
for i, id := range pkgIDs {
keys[i] = fmt.Sprintf("config:pkg:%s:context", id)
keys[i] = keyPrefixConfigPkg + id + ":context"
}
values, err := store.MGet(ctx, keys...)
if err != nil {
Expand All @@ -91,7 +91,7 @@ func batchLoadPackageIdentityConfigs(ctx context.Context, store Store, pkgIDs []
}
keys := make([]string, len(pkgIDs))
for i, id := range pkgIDs {
keys[i] = fmt.Sprintf("config:pkg:%s", id)
keys[i] = keyPrefixConfigPkg + id
}
values, err := store.MGet(ctx, keys...)
if err != nil {
Expand All @@ -118,7 +118,7 @@ func batchLoadCampaignFreqConfigs(ctx context.Context, store Store, campaignIDs
}
keys := make([]string, len(campaignIDs))
for i, id := range campaignIDs {
keys[i] = fmt.Sprintf("config:campaign:%s", id)
keys[i] = keyPrefixConfigCampaign + id
}
values, err := store.MGet(ctx, keys...)
if err != nil {
Expand All @@ -144,7 +144,7 @@ func SeedPackageIdentityConfig(ctx context.Context, store Store, pkgID string, c
if err != nil {
return err
}
return store.Set(ctx, fmt.Sprintf("config:pkg:%s", pkgID), string(data), 0)
return store.Set(ctx, keyPrefixConfigPkg+pkgID, string(data), 0)
}

// SeedCampaignFreqConfig writes frequency config for a campaign to any Store.
Expand All @@ -153,13 +153,13 @@ func SeedCampaignFreqConfig(ctx context.Context, store Store, campaignID string,
if err != nil {
return err
}
return store.Set(ctx, fmt.Sprintf("config:campaign:%s", campaignID), string(data), 0)
return store.Set(ctx, keyPrefixConfigCampaign+campaignID, string(data), 0)
}

// loadCampaignFreqConfig reads frequency cap config for a campaign from the Store.
// Returns nil if no config is found.
func loadCampaignFreqConfig(ctx context.Context, store Store, campaignID string) (*CampaignFreqConfig, error) {
key := fmt.Sprintf("config:campaign:%s", campaignID)
key := keyPrefixConfigCampaign + campaignID
val, ok, err := store.Get(ctx, key)
if err != nil {
return nil, err
Expand Down
33 changes: 24 additions & 9 deletions targeting/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package targeting
import (
"context"
"encoding/json"
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -63,15 +62,15 @@ func (m *MockStore) SetPackageIdentityConfig(pkgID string, cfg PackageIdentityCo
data, _ := json.Marshal(cfg)
m.mu.Lock()
defer m.mu.Unlock()
m.strings[fmt.Sprintf("config:pkg:%s", pkgID)] = stringEntry{value: string(data)}
m.strings[keyPrefixConfigPkg+pkgID] = stringEntry{value: string(data)}
}

// SetCampaignFreqConfig stores frequency config for a campaign. Test helper.
func (m *MockStore) SetCampaignFreqConfig(campaignID string, cfg CampaignFreqConfig) {
data, _ := json.Marshal(cfg)
m.mu.Lock()
defer m.mu.Unlock()
m.strings[fmt.Sprintf("config:campaign:%s", campaignID)] = stringEntry{value: string(data)}
m.strings[keyPrefixConfigCampaign+campaignID] = stringEntry{value: string(data)}
}

// SetMediaBuy stores a media buy and adds it to the seller's set. Test helper.
Expand All @@ -80,15 +79,15 @@ func (m *MockStore) SetMediaBuy(mb MediaBuy) {
m.mu.Lock()
defer m.mu.Unlock()
// Add to seller set.
sellerKey := "mediabuy:seller:" + mb.SellerID
sellerKey := keyPrefixMediaBuySeller + mb.SellerID
s, ok := m.sets[sellerKey]
if !ok {
s = make(map[string]struct{})
m.sets[sellerKey] = s
}
s[mb.MediaBuyID] = struct{}{}
// Store media buy JSON.
m.strings["mediabuy:"+mb.MediaBuyID] = stringEntry{value: string(data)}
m.strings[keyPrefixMediaBuy+mb.MediaBuyID] = stringEntry{value: string(data)}
}

// SetUserProfile stores a user's segment memberships. Test helper.
Expand All @@ -98,7 +97,7 @@ func (m *MockStore) SetUserProfile(token string, segments map[string]float64) {
data, _ := json.Marshal(profile)
m.mu.Lock()
defer m.mu.Unlock()
m.strings["user:profile:"+hash] = stringEntry{value: string(data)}
m.strings[keyPrefixUserProfile+hash] = stringEntry{value: string(data)}
}

// SetUserExposures stores a user's exposure log in binary format. Test helper.
Expand All @@ -107,13 +106,13 @@ func (m *MockStore) SetUserExposures(token string, entries []ExposureEntry) {
bin := EncodeBinaryExposureLog(entries)
m.mu.Lock()
defer m.mu.Unlock()
m.strings["user:exposures:"+hash] = stringEntry{value: string(bin)}
m.strings[keyPrefixUserExposures+hash] = stringEntry{value: string(bin)}
}

// AddExposure appends an exposure entry to a user's log. Test helper.
func (m *MockStore) AddExposure(token string, entry ExposureEntry) {
hash := HashToken(token)
key := "user:exposures:" + hash
key := keyPrefixUserExposures + hash
m.mu.Lock()
defer m.mu.Unlock()
existing := BinaryExposureLog(m.strings[key].value)
Expand All @@ -127,7 +126,7 @@ func (m *MockStore) SetPackageContextConfig(pkgID string, cfg PackageContextConf
data, _ := json.Marshal(cfg)
m.mu.Lock()
defer m.mu.Unlock()
m.strings[fmt.Sprintf("config:pkg:%s:context", pkgID)] = stringEntry{value: string(data)}
m.strings[keyPrefixConfigPkg+pkgID+":context"] = stringEntry{value: string(data)}
}

func (m *MockStore) SetIsMember(_ context.Context, key, member string) (bool, error) {
Expand Down Expand Up @@ -334,6 +333,22 @@ func (m *MockStore) MSet(_ context.Context, kvs map[string]string, ttl time.Dura
return nil
}

func (m *MockStore) Del(_ context.Context, key string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.strings, key)
return nil
}

func (m *MockStore) MDel(_ context.Context, keys ...string) error {
m.mu.Lock()
defer m.mu.Unlock()
for _, key := range keys {
delete(m.strings, key)
}
return nil
}

// isExpired checks key-level expiry. Must be called with lock held.
func (m *MockStore) isExpired(key string) bool {
exp, ok := m.expiry[key]
Expand Down
10 changes: 5 additions & 5 deletions targeting/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type MediaBuyPackage struct {
// Total: 2 Store round-trips (1 SetMembers + 1 MGet) regardless of media buy count.
func ResolvePackages(ctx context.Context, store Store, sellerID, propertyID, country string, now time.Time) ([]tmproto.AvailablePackage, error) {
// 1. Get all media buy IDs for this seller.
mbIDs, err := store.SetMembers(ctx, "mediabuy:seller:"+sellerID)
mbIDs, err := store.SetMembers(ctx, keyPrefixMediaBuySeller+sellerID)
if err != nil {
return nil, fmt.Errorf("resolve media buys for seller %s: %w", sellerID, err)
}
Expand All @@ -49,7 +49,7 @@ func ResolvePackages(ctx context.Context, store Store, sellerID, propertyID, cou
// 2. Batch-load all media buy JSON.
keys := make([]string, len(mbIDs))
for i, id := range mbIDs {
keys[i] = "mediabuy:" + id
keys[i] = keyPrefixMediaBuy + id
}
values, err := store.MGet(ctx, keys...)
if err != nil {
Expand Down Expand Up @@ -157,22 +157,22 @@ func Resolve(ctx context.Context, store Store, sellerID, propertyID, country str
}

// Topic index: load topic set from Store.
topics, _ := store.SetMembers(ctx, "topics:package:"+pkgID)
topics, _ := store.SetMembers(ctx, keyPrefixTopicsPackage+pkgID)
for _, topic := range topics {
topicIdx[topic] = append(topicIdx[topic], pkgID)
}

// URL blocklist index: load blocklist from Store.
if cc := ctxConfigs[pkgID]; cc != nil && cc.URLBlocklist {
blocked, _ := store.SetMembers(ctx, "url:blocklist:"+pkgID)
blocked, _ := store.SetMembers(ctx, keyPrefixURLBlocklist+pkgID)
for _, hash := range blocked {
urlBlockIdx[hash] = append(urlBlockIdx[hash], pkgID)
}
}

// URL allowlist: load allowlist from Store.
if cc := ctxConfigs[pkgID]; cc != nil && cc.URLAllowlist {
allowed, _ := store.SetMembers(ctx, "url:allowlist:"+pkgID)
allowed, _ := store.SetMembers(ctx, keyPrefixURLAllowlist+pkgID)
if len(allowed) > 0 {
set := make(map[string]struct{}, len(allowed))
for _, hash := range allowed {
Expand Down
20 changes: 20 additions & 0 deletions targeting/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,20 @@ import (
"time"
)

// Store key prefixes.
const (
keyPrefixUserProfile = "user:profile:"
keyPrefixUserExposures = "user:exposures:"
keyPrefixTopicsArtifact = "topics:artifact:"
keyPrefixTopicsPackage = "topics:package:"
keyPrefixURLBlocklist = "url:blocklist:"
keyPrefixURLAllowlist = "url:allowlist:"
keyPrefixMediaBuySeller = "mediabuy:seller:"
keyPrefixMediaBuy = "mediabuy:"
keyPrefixConfigPkg = "config:pkg:"
keyPrefixConfigCampaign = "config:campaign:"
)

// Store is a storage backend for the targeting engine.
// Implementations wrap Valkey/Redis or an in-memory mock.
type Store interface {
Expand Down Expand Up @@ -43,4 +57,10 @@ type Store interface {

// MSet stores multiple key-value pairs with an optional TTL. Zero TTL means no expiry.
MSet(ctx context.Context, kvs map[string]string, ttl time.Duration) error

// Del removes a key. It is a no-op if the key does not exist.
Del(ctx context.Context, key string) error

// MDel removes multiple keys. It is a no-op for keys that do not exist.
MDel(ctx context.Context, keys ...string) error
}
11 changes: 11 additions & 0 deletions targeting/valkeystore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ func (s *Store) MGet(ctx context.Context, keys ...string) ([]string, error) {
return results, nil
}

func (s *Store) Del(ctx context.Context, key string) error {
return s.rdb.Del(ctx, key).Err()
}

func (s *Store) MDel(ctx context.Context, keys ...string) error {
if len(keys) == 0 {
return nil
}
return s.rdb.Del(ctx, keys...).Err()
}

func (s *Store) MSet(ctx context.Context, kvs map[string]string, ttl time.Duration) error {
if len(kvs) == 0 {
return nil
Expand Down
Loading