diff --git a/src/sentry/api/serializers/models/userreport.py b/src/sentry/api/serializers/models/userreport.py index ea7ab510054441..2664c2850add80 100644 --- a/src/sentry/api/serializers/models/userreport.py +++ b/src/sentry/api/serializers/models/userreport.py @@ -8,6 +8,7 @@ from sentry.models.group import Group from sentry.models.project import Project from sentry.models.userreport import UserReport +from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter from sentry.services import eventstore from sentry.services.eventstore.models import Event from sentry.snuba.dataset import Dataset @@ -46,12 +47,14 @@ def get_attrs(self, item_list, user, **kwargs): project = Project.objects.get(id=item_list[0].project_id) retention = quotas.backend.get_event_retention(organization=project.organization) + event_ids = [item.event_id for item in item_list] events = eventstore.backend.get_events( filter=eventstore.Filter( - event_ids=[item.event_id for item in item_list], + event_ids=event_ids, project_ids=[project.id], start=timezone.now() - timedelta(days=retention) if retention else None, ), + eap_conditions=build_event_id_in_filter(event_ids), referrer="UserReportSerializer.get_attrs", dataset=Dataset.Events, tenant_ids={"organization_id": project.organization_id}, diff --git a/src/sentry/deletions/tasks/nodestore.py b/src/sentry/deletions/tasks/nodestore.py index 2b24d2f79ab5c4..0b57284fcd1682 100644 --- a/src/sentry/deletions/tasks/nodestore.py +++ b/src/sentry/deletions/tasks/nodestore.py @@ -4,6 +4,7 @@ from typing import Any import sentry_sdk +from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter from snuba_sdk import DeleteQuery, Request from taskbroker_client.retry import Retry @@ -13,6 +14,11 @@ from sentry.exceptions import DeleteAborted from sentry.models.eventattachment import EventAttachment from sentry.models.userreport import UserReport +from sentry.search.eap.occurrences.query_utils import ( + build_group_id_in_filter, + build_keyset_pagination_filter, +) +from sentry.search.eap.rpc_utils import and_trace_item_filters from sentry.services import eventstore from sentry.services.eventstore.models import Event from sentry.silo.base import SiloMode @@ -155,6 +161,7 @@ def fetch_events_from_eventstore( ) -> list[Event]: logger.info("Fetching %s events for deletion.", limit) conditions = [] + eap_conditions: TraceItemFilter | None = build_group_id_in_filter(group_ids) if last_event_id and last_event_timestamp: conditions.extend( [ @@ -162,6 +169,13 @@ def fetch_events_from_eventstore( [["timestamp", "<", last_event_timestamp], ["event_id", "<", last_event_id]], ] ) + eap_conditions = and_trace_item_filters( + eap_conditions, + build_keyset_pagination_filter( + timestamp_value=last_event_timestamp, + event_id=last_event_id, + ), + ) events = eventstore.backend.get_unfetched_events( filter=eventstore.Filter( @@ -169,6 +183,7 @@ def fetch_events_from_eventstore( project_ids=[project_id], group_ids=group_ids, ), + eap_conditions=eap_conditions, limit=limit, referrer=referrer, orderby=["-timestamp", "-event_id"], diff --git a/src/sentry/feedback/tasks/update_user_reports.py b/src/sentry/feedback/tasks/update_user_reports.py index b3b50bafca85df..ff7ccf601f6844 100644 --- a/src/sentry/feedback/tasks/update_user_reports.py +++ b/src/sentry/feedback/tasks/update_user_reports.py @@ -9,6 +9,7 @@ from sentry.feedback.usecases.ingest.shim_to_feedback import shim_to_feedback from sentry.models.project import Project from sentry.models.userreport import UserReport +from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter from sentry.services import eventstore from sentry.silo.base import SiloMode from sentry.snuba.referrer import Referrer @@ -90,7 +91,9 @@ def update_user_reports( ) try: events_chunk = eventstore.backend.get_events( - filter=snuba_filter, referrer=Referrer.TASKS_UPDATE_USER_REPORTS.value + filter=snuba_filter, + eap_conditions=build_event_id_in_filter(event_id_chunk), + referrer=Referrer.TASKS_UPDATE_USER_REPORTS.value, ) events.extend(events_chunk) except Exception: diff --git a/src/sentry/issues/endpoints/organization_eventid.py b/src/sentry/issues/endpoints/organization_eventid.py index e3da65c606a5d7..04f6e47c72759e 100644 --- a/src/sentry/issues/endpoints/organization_eventid.py +++ b/src/sentry/issues/endpoints/organization_eventid.py @@ -3,6 +3,8 @@ from drf_spectacular.utils import extend_schema from rest_framework.request import Request from rest_framework.response import Response +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter from sentry.api.api_owners import ApiOwner from sentry.api.api_publish_status import ApiPublishStatus @@ -18,6 +20,8 @@ from sentry.models.organization import Organization from sentry.models.project import Project from sentry.ratelimits.config import RateLimitConfig +from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter +from sentry.search.eap.rpc_utils import and_trace_item_filters from sentry.services import eventstore from sentry.types.ratelimit import RateLimit, RateLimitCategory from sentry.utils.validators import INVALID_ID_DETAILS, is_event_id @@ -76,8 +80,21 @@ def get(self, request: Request, organization: Organization, event_id: str) -> Re project_ids=list(project_slugs_by_id.keys()), event_ids=[event_id], ) + eap_conditions = and_trace_item_filters( + build_event_id_in_filter([event_id]), + TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="type", type=AttributeKey.TYPE_STRING), + op=ComparisonFilter.OP_NOT_EQUALS, + value=AttributeValue(val_str="transaction"), + ) + ), + ) event = eventstore.backend.get_events( - filter=snuba_filter, limit=1, tenant_ids={"organization_id": organization.id} + filter=snuba_filter, + eap_conditions=eap_conditions, + limit=1, + tenant_ids={"organization_id": organization.id}, )[0] except IndexError: raise ResourceDoesNotExist() diff --git a/src/sentry/issues/endpoints/project_events.py b/src/sentry/issues/endpoints/project_events.py index 44e7bdc96616a0..19dc36599ccc95 100644 --- a/src/sentry/issues/endpoints/project_events.py +++ b/src/sentry/issues/endpoints/project_events.py @@ -4,6 +4,7 @@ from rest_framework.exceptions import ParseError from rest_framework.request import Request from rest_framework.response import Response +from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter from sentry.api.api_owners import ApiOwner from sentry.api.api_publish_status import ApiPublishStatus @@ -95,6 +96,7 @@ def get(self, request: Request, project: Project) -> Response: data_fn = partial( eventstore.backend.get_events, filter=event_filter, + eap_conditions=TraceItemFilter(), # TODO: not currently taking the query into account referrer="api.project-events", tenant_ids={"organization_id": project.organization_id}, ) diff --git a/src/sentry/models/group.py b/src/sentry/models/group.py index 8171735a35426c..492224f95ac3f4 100644 --- a/src/sentry/models/group.py +++ b/src/sentry/models/group.py @@ -18,6 +18,8 @@ from django.utils import timezone from django.utils.http import urlencode from django.utils.translation import gettext_lazy as _ +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter from snuba_sdk import Column, Condition, Op from sentry import eventstore, eventtypes, options, tagstore @@ -44,6 +46,8 @@ from sentry.models.commit import Commit from sentry.models.grouphistory import record_group_history, record_group_history_from_activity_type from sentry.models.organization import Organization +from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter +from sentry.search.eap.rpc_utils import and_trace_item_filters from sentry.services.eventstore.models import GroupEvent from sentry.snuba.dataset import Dataset from sentry.snuba.referrer import Referrer @@ -435,6 +439,14 @@ def filter_by_event_id(self, project_ids, event_id, tenant_ids=None): project_ids=project_ids, conditions=[["group_id", "IS NOT NULL", None]], ), + eap_conditions=and_trace_item_filters( + build_event_id_in_filter([event_id]), + TraceItemFilter( + exists_filter=ExistsFilter( + key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT) + ) + ), + ), limit=max(len(project_ids), 100), referrer="Group.filter_by_event_id", tenant_ids=tenant_ids, diff --git a/src/sentry/search/eap/occurrences/query_utils.py b/src/sentry/search/eap/occurrences/query_utils.py index f6ae9bc8971a67..a4e6bab1437d02 100644 --- a/src/sentry/search/eap/occurrences/query_utils.py +++ b/src/sentry/search/eap/occurrences/query_utils.py @@ -2,9 +2,18 @@ from datetime import datetime from typing import Any +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeKey, + AttributeValue, + IntArray, + StrArray, +) +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter + from sentry.models.environment import Environment from sentry.models.organization import Organization from sentry.models.project import Project +from sentry.search.eap.rpc_utils import and_trace_item_filters, or_trace_item_filters from sentry.search.events.types import SnubaParams @@ -74,3 +83,56 @@ def _to_count_map(rows: Sequence[Mapping[str, Any]]) -> dict[Hashable, int]: return False return all(exp_count <= control_map[key] for key, exp_count in experimental_map.items()) + + +def build_group_id_in_filter(group_ids: Sequence[int]) -> TraceItemFilter: + return TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="group_id", type=AttributeKey.TYPE_INT), + op=ComparisonFilter.OP_IN, + value=AttributeValue(val_int_array=IntArray(values=list(group_ids))), + ) + ) + + +def build_event_id_in_filter(event_ids: Sequence[str]) -> TraceItemFilter: + return TraceItemFilter( + comparison_filter=ComparisonFilter( + key=AttributeKey(name="sentry.item_id", type=AttributeKey.TYPE_STRING), + op=ComparisonFilter.OP_IN, + value=AttributeValue(val_str_array=StrArray(values=list(event_ids))), + ) + ) + + +def build_keyset_pagination_filter( + timestamp_value: str, + event_id: str, +) -> TraceItemFilter | None: + ts_epoch = datetime.fromisoformat(timestamp_value).timestamp() + timestamp_key = AttributeKey(name="sentry.timestamp", type=AttributeKey.TYPE_DOUBLE) + event_id_key = AttributeKey(name="sentry.item_id", type=AttributeKey.TYPE_STRING) + + ts_lte = TraceItemFilter( + comparison_filter=ComparisonFilter( + key=timestamp_key, + op=ComparisonFilter.OP_LESS_THAN_OR_EQUALS, + value=AttributeValue(val_double=ts_epoch), + ) + ) + ts_lt = TraceItemFilter( + comparison_filter=ComparisonFilter( + key=timestamp_key, + op=ComparisonFilter.OP_LESS_THAN, + value=AttributeValue(val_double=ts_epoch), + ) + ) + eid_lt = TraceItemFilter( + comparison_filter=ComparisonFilter( + key=event_id_key, + op=ComparisonFilter.OP_LESS_THAN, + value=AttributeValue(val_str=event_id), + ) + ) + + return and_trace_item_filters(ts_lte, or_trace_item_filters(ts_lt, eid_lt)) diff --git a/src/sentry/seer/explorer/tools.py b/src/sentry/seer/explorer/tools.py index c5d5dc1df390df..0910968b8db180 100644 --- a/src/sentry/seer/explorer/tools.py +++ b/src/sentry/seer/explorer/tools.py @@ -32,6 +32,7 @@ from sentry.replays.post_process import process_raw_response from sentry.replays.query import query_replay_id_by_prefix, query_replay_instance from sentry.search.eap.constants import BOOLEAN, DOUBLE, INT, STRING +from sentry.search.eap.occurrences.query_utils import build_event_id_in_filter from sentry.search.eap.resolver import SearchResolver from sentry.search.eap.types import SearchResolverConfig from sentry.search.events.constants import ISSUE_ID_ALIAS @@ -1255,6 +1256,7 @@ def get_event_details( organization_id=organization_id, project_ids=project_ids, ), + eap_conditions=build_event_id_in_filter([event_id]), limit=1, tenant_ids={"organization_id": organization_id}, dataset=dataset, @@ -1355,6 +1357,7 @@ def get_issue_and_event_details_v2( organization_id=organization_id, project_ids=project_ids, ), + eap_conditions=build_event_id_in_filter([event_id]), limit=1, tenant_ids={"organization_id": organization_id}, dataset=dataset, diff --git a/src/sentry/services/eventstore/base.py b/src/sentry/services/eventstore/base.py index 79016ff981f4c9..876998ee1db4a1 100644 --- a/src/sentry/services/eventstore/base.py +++ b/src/sentry/services/eventstore/base.py @@ -7,12 +7,14 @@ from typing import Any, Literal, Self, overload import sentry_sdk +from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter from snuba_sdk import Condition from sentry import nodestore from sentry.services.eventstore.models import Event, GroupEvent from sentry.snuba.dataset import Dataset from sentry.snuba.events import Columns +from sentry.snuba.referrer import Referrer from sentry.utils.services import Service @@ -166,10 +168,11 @@ class EventStorage(Service): def get_events( self, filter: Filter, + eap_conditions: TraceItemFilter | None = None, orderby: Sequence[str] | None = None, limit: int = 100, offset: int = 0, - referrer: str = "eventstore.get_events", + referrer: str = Referrer.EVENTSTORE_GET_EVENTS.value, dataset: Dataset = Dataset.Events, tenant_ids: Mapping[str, Any] | None = None, ) -> list[Event]: @@ -180,11 +183,12 @@ def get_events( transaction events. Returns an empty list if no events match the filter. Arguments: - snuba_filter (Filter): Filter + filter (Filter): Snuba query filter + eap_conditions (TraceItemFilter | None): EAP query conditions orderby (Sequence[str]): List of fields to order by - default ['-time', '-event_id'] limit (int): Query limit - default 100 offset (int): Query offset - default 0 - referrer (string): Referrer - default "eventstore.get_events" + referrer (string): Referrer """ raise NotImplementedError @@ -208,28 +212,30 @@ def get_events_snql( def get_unfetched_events( self, filter: Filter, + eap_conditions: TraceItemFilter | None = None, orderby: Sequence[str] | None = None, limit: int = 100, offset: int = 0, - referrer: str = "eventstore.get_unfetched_events", + referrer: str = Referrer.EVENTSTORE_GET_UNFETCHED_EVENTS.value, dataset: Dataset = Dataset.Events, tenant_ids: Mapping[str, Any] | None = None, ) -> list[Event]: """ - Same as get_events but returns events without their node datas loaded. - Only the event ID, projectID, groupID and timestamp field will be present without - an additional fetch to nodestore. + Same as get_events but returns events without their node data loaded. + Only the event ID, project ID, group ID, and timestamp fields will be present + without an additional fetch to nodestore. Used for fetching large volumes of events that do not need data loaded from nodestore. Currently this is just used for event data deletions where we just need the event IDs in order to process the deletions. Arguments: - snuba_filter (Filter): Filter + filter (Filter): Snuba query filter + eap_conditions (TraceItemFilter | None): EAP query conditions orderby (Sequence[str]): List of fields to order by - default ['-time', '-event_id'] limit (int): Query limit - default 100 offset (int): Query offset - default 0 - referrer (string): Referrer - default "eventstore.get_unfetched_events" + referrer (string): Referrer """ raise NotImplementedError diff --git a/src/sentry/services/eventstore/snuba/backend.py b/src/sentry/services/eventstore/snuba/backend.py index 59a4203984f1cc..85ea6f710c8e15 100644 --- a/src/sentry/services/eventstore/snuba/backend.py +++ b/src/sentry/services/eventstore/snuba/backend.py @@ -9,6 +9,7 @@ import sentry_sdk from django.utils import timezone +from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter from snuba_sdk import ( Column, Condition, @@ -227,10 +228,11 @@ def get_events_snql( def get_events( self, filter: Filter, + eap_conditions: TraceItemFilter | None = None, orderby: Sequence[str] | None = None, limit: int = DEFAULT_LIMIT, offset: int = DEFAULT_OFFSET, - referrer: str = "eventstore.get_events", + referrer: str = Referrer.EVENTSTORE_GET_EVENTS.value, dataset: Dataset = Dataset.Events, tenant_ids: Mapping[str, Any] | None = None, ) -> list[Event]: @@ -240,6 +242,7 @@ def get_events( with sentry_sdk.start_span(op="eventstore.snuba.get_events"): return self.__get_events( filter, + eap_conditions=eap_conditions, orderby=orderby, limit=limit, offset=offset, @@ -252,10 +255,11 @@ def get_events( def get_unfetched_events( self, filter: Filter, + eap_conditions: TraceItemFilter | None = None, orderby: Sequence[str] | None = None, limit: int = DEFAULT_LIMIT, offset: int = DEFAULT_OFFSET, - referrer: str = "eventstore.get_unfetched_events", + referrer: str = Referrer.EVENTSTORE_GET_UNFETCHED_EVENTS.value, dataset: Dataset = Dataset.Events, tenant_ids: Mapping[str, Any] | None = None, ) -> list[Event]: @@ -264,6 +268,7 @@ def get_unfetched_events( """ return self.__get_events( filter, + eap_conditions=eap_conditions, orderby=orderby, limit=limit, offset=offset, @@ -276,10 +281,11 @@ def get_unfetched_events( def __get_events( self, filter: Filter, + eap_conditions: TraceItemFilter | None = None, orderby: Sequence[str] | None = None, limit: int = DEFAULT_LIMIT, offset: int = DEFAULT_OFFSET, - referrer: str = "eventstore.get_unfetched_events", + referrer: str = Referrer.EVENTSTORE_GET_UNFETCHED_EVENTS.value, should_bind_nodes: bool = False, dataset: Dataset = Dataset.Events, tenant_ids: Mapping[str, Any] | None = None, @@ -368,6 +374,56 @@ def __get_events( events = [self.__make_event(evt) for evt in result["data"]] if should_bind_nodes: self.bind_nodes(events) + + callsite = "eventstore.backend.get_events" + if ( + eap_conditions is not None + and tenant_ids + and "organization_id" in tenant_ids + and EAPOccurrencesComparator.should_check_experiment(callsite) + ): + occurrence_category = ( + OccurrenceCategory.ISSUE_PLATFORM + if dataset == Dataset.IssuePlatform + else OccurrenceCategory.ERROR + ) + eap_results = self._get_events_eap( + eap_conditions=eap_conditions, + project_ids=filter.project_ids or [], + organization_id=tenant_ids["organization_id"], + occurrence_category=occurrence_category, + orderby=orderby, + limit=limit, + offset=offset, + referrer=referrer, + start=filter.start, + end=filter.end, + group_ids=filter.group_ids, + ) + control_data = {(e.event_id, e.group_id) for e in events} + experimental_data = ( + {(row["id"], row["group_id"]) for row in eap_results} + if eap_results is not None + else set() + ) + EAPOccurrencesComparator.check_and_choose( + control_data=control_data, + experimental_data=experimental_data, + callsite=callsite, + is_experimental_data_a_null_result=eap_results is None, + reasonable_match_comparator=lambda ctl, exp: exp.issubset(ctl), + debug_context={ + "project_ids": list(filter.project_ids) if filter.project_ids else [], + "group_ids": list(filter.group_ids) if filter.group_ids else [], + "dataset": dataset.value, + "limit": limit, + "offset": offset, + "referrer": referrer, + "control_count": len(events), + "experimental_count": len(experimental_data), + }, + ) + return events return [] @@ -606,6 +662,90 @@ def _get_event_by_id_eap( ) return None + def _get_events_eap( + self, + eap_conditions: TraceItemFilter, + project_ids: Sequence[int], + organization_id: int, + occurrence_category: OccurrenceCategory, + orderby: Sequence[str] | None = None, + limit: int = DEFAULT_LIMIT, + offset: int = DEFAULT_OFFSET, + referrer: str = Referrer.EVENTSTORE_GET_EVENTS.value, + start: datetime | None = None, + end: datetime | None = None, + group_ids: Sequence[int] | None = None, + ) -> list[dict[str, Any]] | None: + try: + try: + organization = Organization.objects.get_from_cache(id=organization_id) + except Organization.DoesNotExist: + return None + + projects = list( + Project.objects.filter(id__in=project_ids, organization_id=organization_id) + ) + if not projects: + return None + + try: + prepared_start, prepared_end = _prepare_start_end( + start, + end, + organization_id, + group_ids, + ) + except (snuba.QueryOutsideRetentionError, snuba.QueryOutsideGroupActivityError): + return None + + snuba_params = SnubaParams( + start=prepared_start, + end=prepared_end, + organization=organization, + projects=projects, + environments=[], + ) + + eap_orderby: list[str] | None = None + if orderby: + eap_orderby = [] + for col in orderby: + stripped = col.lstrip("-") + prefix = "-" if col.startswith("-") else "" + # EAP uses "id" as the public alias for event_id + eap_name = "id" if stripped == "event_id" else stripped + eap_orderby.append(f"{prefix}{eap_name}") + + selected_columns = ["id", "group_id", "project_id", "timestamp"] + if occurrence_category == OccurrenceCategory.ISSUE_PLATFORM: + selected_columns.append("issue_occurrence_id") + + result = Occurrences.run_table_query( + params=snuba_params, + query_string="", + selected_columns=selected_columns, + orderby=eap_orderby, + offset=offset, + limit=limit, + referrer=referrer, + config=SearchResolverConfig(), + occurrence_category=occurrence_category, + extra_conditions=eap_conditions, + ) + + return result.get("data", []) + + except Exception: + logger.exception( + "EAP query failed in _get_events_eap", + extra={ + "project_ids": list(project_ids), + "group_ids": list(group_ids) if group_ids else [], + "referrer": referrer, + }, + ) + return None + def _get_dataset_for_event(self, event: Event | GroupEvent) -> Dataset: if getattr(event, "occurrence", None) or event.get_event_type() == "generic": return Dataset.IssuePlatform diff --git a/src/sentry/snuba/occurrences_rpc.py b/src/sentry/snuba/occurrences_rpc.py index f1a73c5005e3d9..708dea54892f84 100644 --- a/src/sentry/snuba/occurrences_rpc.py +++ b/src/sentry/snuba/occurrences_rpc.py @@ -15,6 +15,7 @@ from sentry.search.eap.columns import ColumnDefinitions, ResolvedAttribute, ResolvedColumn from sentry.search.eap.occurrences.definitions import OCCURRENCE_DEFINITIONS from sentry.search.eap.resolver import SearchResolver +from sentry.search.eap.rpc_utils import and_trace_item_filters from sentry.search.eap.types import AdditionalQueries, EAPResponse, SearchResolverConfig from sentry.search.events.types import SAMPLING_MODES, SnubaData, SnubaParams from sentry.snuba import rpc_dataset_common @@ -59,6 +60,7 @@ def run_table_query( page_token: PageToken | None = None, additional_queries: AdditionalQueries | None = None, occurrence_category: OccurrenceCategory | None = None, + extra_conditions: TraceItemFilter | None = None, ) -> EAPResponse: return cls._run_table_query( rpc_dataset_common.TableQuery( @@ -73,7 +75,10 @@ def run_table_query( resolver=search_resolver or cls.get_resolver(params, config), page_token=page_token, additional_queries=additional_queries, - extra_conditions=cls._build_category_filter(occurrence_category), + extra_conditions=and_trace_item_filters( + cls._build_category_filter(occurrence_category), + extra_conditions, + ), ), params.debug, ) diff --git a/src/sentry/tasks/reprocessing2.py b/src/sentry/tasks/reprocessing2.py index 1d8a4dbf27a4fd..3d4d9198c3c01a 100644 --- a/src/sentry/tasks/reprocessing2.py +++ b/src/sentry/tasks/reprocessing2.py @@ -9,6 +9,7 @@ from sentry import eventstream, nodestore from sentry.models.project import Project from sentry.reprocessing2 import buffered_delete_old_primary_hash +from sentry.search.eap.occurrences.query_utils import build_group_id_in_filter from sentry.services import eventstore from sentry.services.eventstore.models import Event from sentry.silo.base import SiloMode @@ -73,6 +74,7 @@ def reprocess_group( tenant_ids={ "organization_id": Project.objects.get_from_cache(id=project_id).organization_id }, + eap_conditions=build_group_id_in_filter([group_id]), ) if not events: diff --git a/src/sentry/tasks/unmerge.py b/src/sentry/tasks/unmerge.py index b4a8b5836737e2..c4bdbaa2917765 100644 --- a/src/sentry/tasks/unmerge.py +++ b/src/sentry/tasks/unmerge.py @@ -24,6 +24,7 @@ from sentry.models.project import Project from sentry.models.release import Release from sentry.models.userreport import UserReport +from sentry.search.eap.occurrences.query_utils import build_group_id_in_filter from sentry.services import eventstore from sentry.services.eventstore.models import GroupEvent from sentry.silo.base import SiloMode @@ -532,6 +533,7 @@ def unmerge(*posargs: Any, **kwargs: Any) -> None: state=last_event, referrer="unmerge", tenant_ids={"organization_id": source.project.organization_id}, + eap_conditions=build_group_id_in_filter([source.id]), ) # Convert Event objects to GroupEvent objects events: list[GroupEvent] = [event.for_group(source) for event in raw_events] diff --git a/src/sentry/utils/query.py b/src/sentry/utils/query.py index c9a694aeb60dca..b20fe8e61fb262 100644 --- a/src/sentry/utils/query.py +++ b/src/sentry/utils/query.py @@ -13,6 +13,7 @@ from django.db.models.sql.constants import ROW_COUNT from django.db.models.sql.subqueries import DeleteQuery from django.db.utils import OperationalError +from sentry_protos.snuba.v1.trace_item_filter_pb2 import TraceItemFilter from sentry.db.models.base import Model from sentry.services import eventstore @@ -40,6 +41,7 @@ def task_run_batch_query( state: TaskBulkQueryState | None = None, fetch_events: bool = True, tenant_ids: dict[str, int | str] | None = None, + eap_conditions: TraceItemFilter | None = None, ) -> tuple[TaskBulkQueryState | None, list[Event]]: """ A tool for batched queries similar in purpose to RangeQuerySetWrapper that @@ -69,6 +71,17 @@ def task_run_batch_query( filter.conditions.append( [["timestamp", "<", state["timestamp"]], ["event_id", "<", state["event_id"]]] ) + if eap_conditions is not None: + from sentry.search.eap.occurrences.query_utils import build_keyset_pagination_filter + from sentry.search.eap.rpc_utils import and_trace_item_filters + + eap_conditions = and_trace_item_filters( + eap_conditions, + build_keyset_pagination_filter( + timestamp_value=state["timestamp"], + event_id=state["event_id"], + ), + ) method = ( eventstore.backend.get_events if fetch_events else eventstore.backend.get_unfetched_events @@ -77,6 +90,7 @@ def task_run_batch_query( events = list( method( filter=filter, + eap_conditions=eap_conditions, limit=batch_size, referrer=referrer, orderby=["-timestamp", "-event_id"], diff --git a/tests/sentry/services/eventstore/snuba/test_backend.py b/tests/sentry/services/eventstore/snuba/test_backend.py index d6f742b87692e1..26ef60d9ab8856 100644 --- a/tests/sentry/services/eventstore/snuba/test_backend.py +++ b/tests/sentry/services/eventstore/snuba/test_backend.py @@ -6,6 +6,12 @@ from snuba_sdk import Column, Condition, Op from sentry.issues.grouptype import PerformanceNPlusOneGroupType +from sentry.search.eap.occurrences.query_utils import ( + build_group_id_in_filter, + build_keyset_pagination_filter, +) +from sentry.search.eap.occurrences.rollout_utils import EAPOccurrencesComparator +from sentry.search.eap.rpc_utils import and_trace_item_filters from sentry.services.eventstore.base import Filter from sentry.services.eventstore.models import Event from sentry.services.eventstore.snuba.backend import SnubaEventStorage @@ -891,3 +897,175 @@ def test_get_event_by_id_eap_respects_time_window(self) -> None: occurrence_category=OccurrenceCategory.ERROR, ) assert result is None + + def test_get_events_eap_basic(self) -> None: + group = self.create_group(project=self.project) + event_ids = [uuid4().hex for _ in range(3)] + timestamps = [ + before_now(minutes=3), + before_now(minutes=2), + before_now(minutes=1), + ] + + items = [ + self.create_eap_occurrence( + group_id=group.id, + event_id=eid, + timestamp=ts, + ) + for eid, ts in zip(event_ids, timestamps) + ] + self.store_eap_items(items) + + result = self.eventstore._get_events_eap( + eap_conditions=build_group_id_in_filter([group.id]), + project_ids=[self.project.id], + organization_id=self.organization.id, + occurrence_category=OccurrenceCategory.ERROR, + orderby=["-timestamp", "-event_id"], + limit=10, + offset=0, + referrer="test.get_events_eap_basic", + ) + + assert result is not None + assert len(result) == 3 + for row in result: + assert row["id"] in event_ids + assert row["group_id"] == group.id + assert row["project_id"] == self.project.id + assert "timestamp" in row + + # Verify descending timestamp order + result_timestamps = [row["timestamp"] for row in result] + assert result_timestamps == sorted(result_timestamps, reverse=True) + + def test_get_events_eap_with_pagination(self) -> None: + group = self.create_group(project=self.project) + timestamps = [before_now(minutes=i) for i in range(5, 0, -1)] + event_ids = [uuid4().hex for _ in range(5)] + + items = [ + self.create_eap_occurrence( + group_id=group.id, + event_id=eid, + timestamp=ts, + ) + for eid, ts in zip(event_ids, timestamps) + ] + self.store_eap_items(items) + + # Use the middle event as a pagination cursor + middle_ts = timestamps[2].isoformat() + middle_eid = event_ids[2] + + eap_conditions = and_trace_item_filters( + build_group_id_in_filter([group.id]), + build_keyset_pagination_filter( + timestamp_value=middle_ts, + event_id=middle_eid, + ), + ) + assert eap_conditions is not None + + result = self.eventstore._get_events_eap( + eap_conditions=eap_conditions, + project_ids=[self.project.id], + organization_id=self.organization.id, + occurrence_category=OccurrenceCategory.ERROR, + orderby=["-timestamp", "-event_id"], + limit=10, + offset=0, + referrer="test.get_events_eap_pagination", + ) + + assert result is not None + # Should only include events at or before the middle timestamp + assert len(result) <= 3 + + def test_get_events_eap_occurrence_category_filtering(self) -> None: + group = self.create_group(project=self.project) + error_event_id = uuid4().hex + ip_event_id = uuid4().hex + occurrence_id = uuid4().hex + + error_item = self.create_eap_occurrence( + group_id=group.id, + event_id=error_event_id, + timestamp=self.now, + ) + ip_item = self.create_eap_occurrence( + group_id=group.id, + event_id=ip_event_id, + timestamp=self.now, + issue_occurrence_id=occurrence_id, + ) + self.store_eap_items([error_item, ip_item]) + + eap_conditions = build_group_id_in_filter([group.id]) + + error_result = self.eventstore._get_events_eap( + eap_conditions=eap_conditions, + project_ids=[self.project.id], + organization_id=self.organization.id, + occurrence_category=OccurrenceCategory.ERROR, + ) + assert error_result is not None + error_ids = {row["id"] for row in error_result} + assert error_event_id in error_ids + assert ip_event_id not in error_ids + + ip_result = self.eventstore._get_events_eap( + eap_conditions=eap_conditions, + project_ids=[self.project.id], + organization_id=self.organization.id, + occurrence_category=OccurrenceCategory.ISSUE_PLATFORM, + ) + assert ip_result is not None + ip_ids = {row["id"] for row in ip_result} + assert ip_event_id in ip_ids + assert error_event_id not in ip_ids + + def test_get_events_double_read_end_to_end(self) -> None: + event = self.store_event( + data={ + "event_id": uuid4().hex, + "type": "default", + "platform": "python", + "fingerprint": ["group1"], + "timestamp": self.now.isoformat(), + }, + project_id=self.project.id, + ) + + trace_item = self.create_eap_occurrence( + group_id=event.group_id, + event_id=event.event_id, + timestamp=self.now, + ) + self.store_eap_items([trace_item]) + + eap_conditions = build_group_id_in_filter([event.group_id]) + callsite = "eventstore.backend.get_events" + + with self.options( + { + EAPOccurrencesComparator._should_eval_option_name(): True, + EAPOccurrencesComparator._callsite_allowlist_option_name(): [callsite], + } + ): + events = self.eventstore.get_events( + filter=Filter( + project_ids=[self.project.id], + group_ids=[event.group_id], + ), + eap_conditions=eap_conditions, + tenant_ids={ + "organization_id": self.organization.id, + "referrer": "test.double_read", + }, + ) + + assert len(events) == 1 + assert events[0].event_id == event.event_id + assert events[0].group_id == event.group_id