diff --git a/targeting/engine.go b/targeting/engine.go index 79dab32..a3e2143 100644 --- a/targeting/engine.go +++ b/targeting/engine.go @@ -243,7 +243,7 @@ func (e *Engine) EvaluateContextResolved(ctx context.Context, resolved *Resolved // 5. Resolve artifact topics from Store (per-request, can't cache). var artifactTopics []string for _, artifact := range artifactRefs { - topics, err := e.store.SetMembers(ctx, "topics:artifact:"+artifact) + topics, err := e.store.SetMembers(ctx, keyPrefixTopicsArtifact+artifact) if err != nil { e.metrics.StoreError(StageTopicMatch, err) } else { @@ -347,8 +347,8 @@ func (e *Engine) EvaluateIdentityResolved(ctx context.Context, resolved *Resolve keys := make([]string, 0, len(identities)*2) for _, uid := range identities { hash := HashToken(uid.UserToken) - keys = append(keys, "user:profile:"+hash) - keys = append(keys, "user:exposures:"+hash) + keys = append(keys, keyPrefixUserProfile+hash) + keys = append(keys, keyPrefixUserExposures+hash) } // 2. Single MGet — 1 round-trip. @@ -452,7 +452,7 @@ func (e *Engine) SetUserProfile(ctx context.Context, userToken string, segments if err != nil { return err } - return e.store.Set(ctx, "user:profile:"+hash, string(data), 0) + return e.store.Set(ctx, keyPrefixUserProfile+hash, string(data), 0) } // SetUserProfiles writes segment memberships for multiple users in a single batch. @@ -466,11 +466,27 @@ func (e *Engine) SetUserProfiles(ctx context.Context, profiles map[string]map[st if err != nil { return err } - kvs["user:profile:"+hash] = string(data) + kvs[keyPrefixUserProfile+hash] = string(data) } return e.store.MSet(ctx, kvs, 0) } +// DeleteUserProfile removes a user's segment profile. +func (e *Engine) DeleteUserProfile(ctx context.Context, userToken string) error { + hash := HashToken(userToken) + return e.store.Del(ctx, keyPrefixUserProfile+hash) +} + +// DeleteUserProfiles removes segment profiles for multiple users in a single batch. +// The userTokens slice contains user tokens. +func (e *Engine) DeleteUserProfiles(ctx context.Context, userTokens []string) error { + keys := make([]string, len(userTokens)) + for i, userToken := range userTokens { + keys[i] = keyPrefixUserProfile + HashToken(userToken) + } + return e.store.MDel(ctx, keys...) +} + // RecordExposure records an impression to the exposure log for all UIDs. // Each UID's exposure log is read, the new entry is appended, // old entries are pruned, and the log is written back. @@ -529,7 +545,7 @@ func (e *Engine) RecordExposure(ctx context.Context, req *ExposeRequest) (*Expos // Write to each UID's exposure log. Capture the first UID's log for the response. var firstLog BinaryExposureLog for i, hash := range hashes { - key := "user:exposures:" + hash + key := keyPrefixUserExposures + hash val, _, err := e.store.Get(ctx, key) if err != nil { @@ -636,7 +652,7 @@ func (e *Engine) checkURLFilter(ctx context.Context, artifacts []string, pkgID s urlHash := HashURL(artifact) if cfg.URLBlocklist { - blocked, err := e.store.SetIsMember(ctx, "url:blocklist:"+pkgID, urlHash) + blocked, err := e.store.SetIsMember(ctx, keyPrefixURLBlocklist+pkgID, urlHash) if err != nil { return false, err } @@ -646,7 +662,7 @@ func (e *Engine) checkURLFilter(ctx context.Context, artifacts []string, pkgID s } if cfg.URLAllowlist { - allowKey := "url:allowlist:" + pkgID + allowKey := keyPrefixURLAllowlist + pkgID exists, err := e.store.Exists(ctx, allowKey) if err != nil { return false, err @@ -672,7 +688,7 @@ func (e *Engine) checkTopicMatch(ctx context.Context, artifacts []string, pkgID return true, nil } for _, artifact := range artifacts { - intersection, err := e.store.SetIntersect(ctx, "topics:package:"+pkgID, "topics:artifact:"+artifact) + intersection, err := e.store.SetIntersect(ctx, keyPrefixTopicsPackage+pkgID, keyPrefixTopicsArtifact+artifact) if err != nil { return false, err } diff --git a/targeting/identity_config.go b/targeting/identity_config.go index 7a731bb..0ecd627 100644 --- a/targeting/identity_config.go +++ b/targeting/identity_config.go @@ -10,9 +10,9 @@ import ( // PackageIdentityConfig is the identity-side configuration for a package, // stored in the Store as JSON at key "config:pkg:{packageID}". type PackageIdentityConfig struct { - CampaignID string `json:"campaign_id,omitempty"` + CampaignID string `json:"campaign_id,omitempty"` FrequencyRules []FrequencyRuleJSON `json:"frequency_rules,omitempty"` - TargetSegments []string `json:"target_segments,omitempty"` + TargetSegments []string `json:"target_segments,omitempty"` } // CampaignFreqConfig is the frequency cap configuration for a campaign, @@ -42,7 +42,7 @@ func toFrequencyRules(rules []FrequencyRuleJSON) []FrequencyRule { // loadPackageIdentityConfig reads identity config for a package from the Store. // Returns nil if no config is found (package has no identity dimensions). func loadPackageIdentityConfig(ctx context.Context, store Store, pkgID string) (*PackageIdentityConfig, error) { - key := fmt.Sprintf("config:pkg:%s", pkgID) + key := keyPrefixConfigPkg + pkgID val, ok, err := store.Get(ctx, key) if err != nil { return nil, err @@ -64,7 +64,7 @@ func batchLoadPackageContextConfigs(ctx context.Context, store Store, pkgIDs []s } keys := make([]string, len(pkgIDs)) for i, id := range pkgIDs { - keys[i] = fmt.Sprintf("config:pkg:%s:context", id) + keys[i] = keyPrefixConfigPkg + id + ":context" } values, err := store.MGet(ctx, keys...) if err != nil { @@ -91,7 +91,7 @@ func batchLoadPackageIdentityConfigs(ctx context.Context, store Store, pkgIDs [] } keys := make([]string, len(pkgIDs)) for i, id := range pkgIDs { - keys[i] = fmt.Sprintf("config:pkg:%s", id) + keys[i] = keyPrefixConfigPkg + id } values, err := store.MGet(ctx, keys...) if err != nil { @@ -118,7 +118,7 @@ func batchLoadCampaignFreqConfigs(ctx context.Context, store Store, campaignIDs } keys := make([]string, len(campaignIDs)) for i, id := range campaignIDs { - keys[i] = fmt.Sprintf("config:campaign:%s", id) + keys[i] = keyPrefixConfigCampaign + id } values, err := store.MGet(ctx, keys...) if err != nil { @@ -144,7 +144,7 @@ func SeedPackageIdentityConfig(ctx context.Context, store Store, pkgID string, c if err != nil { return err } - return store.Set(ctx, fmt.Sprintf("config:pkg:%s", pkgID), string(data), 0) + return store.Set(ctx, keyPrefixConfigPkg+pkgID, string(data), 0) } // SeedCampaignFreqConfig writes frequency config for a campaign to any Store. @@ -153,13 +153,13 @@ func SeedCampaignFreqConfig(ctx context.Context, store Store, campaignID string, if err != nil { return err } - return store.Set(ctx, fmt.Sprintf("config:campaign:%s", campaignID), string(data), 0) + return store.Set(ctx, keyPrefixConfigCampaign+campaignID, string(data), 0) } // loadCampaignFreqConfig reads frequency cap config for a campaign from the Store. // Returns nil if no config is found. func loadCampaignFreqConfig(ctx context.Context, store Store, campaignID string) (*CampaignFreqConfig, error) { - key := fmt.Sprintf("config:campaign:%s", campaignID) + key := keyPrefixConfigCampaign + campaignID val, ok, err := store.Get(ctx, key) if err != nil { return nil, err diff --git a/targeting/mock_store.go b/targeting/mock_store.go index fced5ec..5cbbe0b 100644 --- a/targeting/mock_store.go +++ b/targeting/mock_store.go @@ -3,7 +3,6 @@ package targeting import ( "context" "encoding/json" - "fmt" "sort" "sync" "time" @@ -63,7 +62,7 @@ func (m *MockStore) SetPackageIdentityConfig(pkgID string, cfg PackageIdentityCo data, _ := json.Marshal(cfg) m.mu.Lock() defer m.mu.Unlock() - m.strings[fmt.Sprintf("config:pkg:%s", pkgID)] = stringEntry{value: string(data)} + m.strings[keyPrefixConfigPkg+pkgID] = stringEntry{value: string(data)} } // SetCampaignFreqConfig stores frequency config for a campaign. Test helper. @@ -71,7 +70,7 @@ func (m *MockStore) SetCampaignFreqConfig(campaignID string, cfg CampaignFreqCon data, _ := json.Marshal(cfg) m.mu.Lock() defer m.mu.Unlock() - m.strings[fmt.Sprintf("config:campaign:%s", campaignID)] = stringEntry{value: string(data)} + m.strings[keyPrefixConfigCampaign+campaignID] = stringEntry{value: string(data)} } // SetMediaBuy stores a media buy and adds it to the seller's set. Test helper. @@ -80,7 +79,7 @@ func (m *MockStore) SetMediaBuy(mb MediaBuy) { m.mu.Lock() defer m.mu.Unlock() // Add to seller set. - sellerKey := "mediabuy:seller:" + mb.SellerID + sellerKey := keyPrefixMediaBuySeller + mb.SellerID s, ok := m.sets[sellerKey] if !ok { s = make(map[string]struct{}) @@ -88,7 +87,7 @@ func (m *MockStore) SetMediaBuy(mb MediaBuy) { } s[mb.MediaBuyID] = struct{}{} // Store media buy JSON. - m.strings["mediabuy:"+mb.MediaBuyID] = stringEntry{value: string(data)} + m.strings[keyPrefixMediaBuy+mb.MediaBuyID] = stringEntry{value: string(data)} } // SetUserProfile stores a user's segment memberships. Test helper. @@ -98,7 +97,7 @@ func (m *MockStore) SetUserProfile(token string, segments map[string]float64) { data, _ := json.Marshal(profile) m.mu.Lock() defer m.mu.Unlock() - m.strings["user:profile:"+hash] = stringEntry{value: string(data)} + m.strings[keyPrefixUserProfile+hash] = stringEntry{value: string(data)} } // SetUserExposures stores a user's exposure log in binary format. Test helper. @@ -107,13 +106,13 @@ func (m *MockStore) SetUserExposures(token string, entries []ExposureEntry) { bin := EncodeBinaryExposureLog(entries) m.mu.Lock() defer m.mu.Unlock() - m.strings["user:exposures:"+hash] = stringEntry{value: string(bin)} + m.strings[keyPrefixUserExposures+hash] = stringEntry{value: string(bin)} } // AddExposure appends an exposure entry to a user's log. Test helper. func (m *MockStore) AddExposure(token string, entry ExposureEntry) { hash := HashToken(token) - key := "user:exposures:" + hash + key := keyPrefixUserExposures + hash m.mu.Lock() defer m.mu.Unlock() existing := BinaryExposureLog(m.strings[key].value) @@ -127,7 +126,7 @@ func (m *MockStore) SetPackageContextConfig(pkgID string, cfg PackageContextConf data, _ := json.Marshal(cfg) m.mu.Lock() defer m.mu.Unlock() - m.strings[fmt.Sprintf("config:pkg:%s:context", pkgID)] = stringEntry{value: string(data)} + m.strings[keyPrefixConfigPkg+pkgID+":context"] = stringEntry{value: string(data)} } func (m *MockStore) SetIsMember(_ context.Context, key, member string) (bool, error) { @@ -334,6 +333,22 @@ func (m *MockStore) MSet(_ context.Context, kvs map[string]string, ttl time.Dura return nil } +func (m *MockStore) Del(_ context.Context, key string) error { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.strings, key) + return nil +} + +func (m *MockStore) MDel(_ context.Context, keys ...string) error { + m.mu.Lock() + defer m.mu.Unlock() + for _, key := range keys { + delete(m.strings, key) + } + return nil +} + // isExpired checks key-level expiry. Must be called with lock held. func (m *MockStore) isExpired(key string) bool { exp, ok := m.expiry[key] diff --git a/targeting/resolver.go b/targeting/resolver.go index 1be9fa3..6689dd7 100644 --- a/targeting/resolver.go +++ b/targeting/resolver.go @@ -38,7 +38,7 @@ type MediaBuyPackage struct { // Total: 2 Store round-trips (1 SetMembers + 1 MGet) regardless of media buy count. func ResolvePackages(ctx context.Context, store Store, sellerID, propertyID, country string, now time.Time) ([]tmproto.AvailablePackage, error) { // 1. Get all media buy IDs for this seller. - mbIDs, err := store.SetMembers(ctx, "mediabuy:seller:"+sellerID) + mbIDs, err := store.SetMembers(ctx, keyPrefixMediaBuySeller+sellerID) if err != nil { return nil, fmt.Errorf("resolve media buys for seller %s: %w", sellerID, err) } @@ -49,7 +49,7 @@ func ResolvePackages(ctx context.Context, store Store, sellerID, propertyID, cou // 2. Batch-load all media buy JSON. keys := make([]string, len(mbIDs)) for i, id := range mbIDs { - keys[i] = "mediabuy:" + id + keys[i] = keyPrefixMediaBuy + id } values, err := store.MGet(ctx, keys...) if err != nil { @@ -157,14 +157,14 @@ func Resolve(ctx context.Context, store Store, sellerID, propertyID, country str } // Topic index: load topic set from Store. - topics, _ := store.SetMembers(ctx, "topics:package:"+pkgID) + topics, _ := store.SetMembers(ctx, keyPrefixTopicsPackage+pkgID) for _, topic := range topics { topicIdx[topic] = append(topicIdx[topic], pkgID) } // URL blocklist index: load blocklist from Store. if cc := ctxConfigs[pkgID]; cc != nil && cc.URLBlocklist { - blocked, _ := store.SetMembers(ctx, "url:blocklist:"+pkgID) + blocked, _ := store.SetMembers(ctx, keyPrefixURLBlocklist+pkgID) for _, hash := range blocked { urlBlockIdx[hash] = append(urlBlockIdx[hash], pkgID) } @@ -172,7 +172,7 @@ func Resolve(ctx context.Context, store Store, sellerID, propertyID, country str // URL allowlist: load allowlist from Store. if cc := ctxConfigs[pkgID]; cc != nil && cc.URLAllowlist { - allowed, _ := store.SetMembers(ctx, "url:allowlist:"+pkgID) + allowed, _ := store.SetMembers(ctx, keyPrefixURLAllowlist+pkgID) if len(allowed) > 0 { set := make(map[string]struct{}, len(allowed)) for _, hash := range allowed { diff --git a/targeting/store.go b/targeting/store.go index a74ac42..55cbda6 100644 --- a/targeting/store.go +++ b/targeting/store.go @@ -5,6 +5,20 @@ import ( "time" ) +// Store key prefixes. +const ( + keyPrefixUserProfile = "user:profile:" + keyPrefixUserExposures = "user:exposures:" + keyPrefixTopicsArtifact = "topics:artifact:" + keyPrefixTopicsPackage = "topics:package:" + keyPrefixURLBlocklist = "url:blocklist:" + keyPrefixURLAllowlist = "url:allowlist:" + keyPrefixMediaBuySeller = "mediabuy:seller:" + keyPrefixMediaBuy = "mediabuy:" + keyPrefixConfigPkg = "config:pkg:" + keyPrefixConfigCampaign = "config:campaign:" +) + // Store is a storage backend for the targeting engine. // Implementations wrap Valkey/Redis or an in-memory mock. type Store interface { @@ -43,4 +57,10 @@ type Store interface { // MSet stores multiple key-value pairs with an optional TTL. Zero TTL means no expiry. MSet(ctx context.Context, kvs map[string]string, ttl time.Duration) error + + // Del removes a key. It is a no-op if the key does not exist. + Del(ctx context.Context, key string) error + + // MDel removes multiple keys. It is a no-op for keys that do not exist. + MDel(ctx context.Context, keys ...string) error } diff --git a/targeting/valkeystore/store.go b/targeting/valkeystore/store.go index 2ed3dd6..693b3e9 100644 --- a/targeting/valkeystore/store.go +++ b/targeting/valkeystore/store.go @@ -101,6 +101,17 @@ func (s *Store) MGet(ctx context.Context, keys ...string) ([]string, error) { return results, nil } +func (s *Store) Del(ctx context.Context, key string) error { + return s.rdb.Del(ctx, key).Err() +} + +func (s *Store) MDel(ctx context.Context, keys ...string) error { + if len(keys) == 0 { + return nil + } + return s.rdb.Del(ctx, keys...).Err() +} + func (s *Store) MSet(ctx context.Context, kvs map[string]string, ttl time.Duration) error { if len(kvs) == 0 { return nil