diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index a9a4944d73c758..04fcded8941adb 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -880,6 +880,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: "sentry.integrations.github.tasks.pr_comment", "sentry.integrations.github.tasks.sync_repos", "sentry.integrations.github.tasks.sync_repos_on_install_change", + "sentry.integrations.source_code_management.sync_repos", "sentry.integrations.gitlab.tasks", "sentry.integrations.jira.tasks", "sentry.integrations.opsgenie.tasks", @@ -1258,8 +1259,8 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: "task": "sdk.control:sentry.tasks.release_registry.fetch_release_registry_data_control", "schedule": crontab("*/5", "*", "*", "*", "*"), }, - "github-repo-sync-beat": { - "task": "integrations.control:sentry.integrations.github.tasks.sync_repos.github_repo_sync_beat", + "scm-repo-sync-beat": { + "task": "integrations.control:sentry.integrations.source_code_management.sync_repos.scm_repo_sync_beat", "schedule": timedelta(minutes=1), }, } diff --git a/src/sentry/integrations/github/tasks/__init__.py b/src/sentry/integrations/github/tasks/__init__.py index d86fd1ec513b6e..c2f359a7950efc 100644 --- a/src/sentry/integrations/github/tasks/__init__.py +++ b/src/sentry/integrations/github/tasks/__init__.py @@ -2,7 +2,7 @@ from .codecov_account_unlink import codecov_account_unlink from .link_all_repos import link_all_repos from .pr_comment import github_comment_workflow -from .sync_repos import github_repo_sync_beat, sync_repos_for_org +from .sync_repos import github_repo_sync_beat, scm_repo_sync_beat, sync_repos_for_org from .sync_repos_on_install_change import sync_repos_on_install_change __all__ = ( @@ -11,6 +11,7 @@ "github_comment_workflow", "github_repo_sync_beat", "link_all_repos", + "scm_repo_sync_beat", "sync_repos_for_org", "sync_repos_on_install_change", ) diff --git a/src/sentry/integrations/github/tasks/sync_repos.py b/src/sentry/integrations/github/tasks/sync_repos.py index c008433b1308b8..0a0eb8d2327c5c 100644 --- a/src/sentry/integrations/github/tasks/sync_repos.py +++ b/src/sentry/integrations/github/tasks/sync_repos.py @@ -1,256 +1,20 @@ """ -Periodic repo sync for GitHub integrations. - -The beat task (`github_repo_sync_beat`) runs on a schedule and uses -CursoredScheduler to iterate over all active GitHub OrganizationIntegrations. -For each one, it dispatches `sync_repos_for_org` which diffs GitHub's repo -list against Sentry's Repository table and creates/disables/re-enables as needed. +Backwards-compatible re-exports. The sync task has moved to +sentry.integrations.source_code_management.sync_repos. """ -import logging -from datetime import timedelta - -from taskbroker_client.retry import Retry - -from sentry import features -from sentry.constants import ObjectStatus -from sentry.integrations.models.organization_integration import OrganizationIntegration -from sentry.integrations.services.integration import integration_service -from sentry.integrations.services.repository.service import repository_service -from sentry.integrations.source_code_management.metrics import ( - SCMIntegrationInteractionEvent, - SCMIntegrationInteractionType, +from sentry.integrations.source_code_management.sync_repos import ( + scm_repo_sync_beat as scm_repo_sync_beat, ) -from sentry.integrations.source_code_management.repo_audit import log_repo_change -from sentry.organizations.services.organization import organization_service -from sentry.plugins.providers.integration_repository import get_integration_repository_provider -from sentry.shared_integrations.exceptions import ApiError -from sentry.silo.base import SiloMode -from sentry.tasks.base import instrumented_task, retry -from sentry.taskworker.namespaces import integrations_control_tasks -from sentry.utils import metrics -from sentry.utils.cursored_scheduler import CursoredScheduler - -from .link_all_repos import get_repo_config - -logger = logging.getLogger(__name__) - - -@instrumented_task( - name="sentry.integrations.github.tasks.sync_repos.sync_repos_for_org", - namespace=integrations_control_tasks, - retry=Retry(times=3, delay=120), - processing_deadline_duration=120, - silo_mode=SiloMode.CONTROL, +from sentry.integrations.source_code_management.sync_repos import ( + sync_repos_for_org as sync_repos_for_org, ) -@retry() -def sync_repos_for_org(organization_integration_id: int) -> None: - """ - Sync repositories for a single OrganizationIntegration. - - Fetches all repos from GitHub, diffs against Sentry's Repository table, - and creates/disables/re-enables repos as needed. - """ - try: - oi = OrganizationIntegration.objects.get( - id=organization_integration_id, - status=ObjectStatus.ACTIVE, - ) - except OrganizationIntegration.DoesNotExist: - logger.info( - "sync_repos_for_org.missing_org_integration", - extra={"organization_integration_id": organization_integration_id}, - ) - return - - integration = integration_service.get_integration( - integration_id=oi.integration_id, status=ObjectStatus.ACTIVE - ) - if integration is None: - logger.info( - "sync_repos_for_org.missing_integration", - extra={"integration_id": oi.integration_id}, - ) - return - - organization_id = oi.organization_id - org_context = organization_service.get_organization_by_id( - id=organization_id, include_projects=False, include_teams=False - ) - if org_context is None: - logger.info( - "sync_repos_for_org.missing_organization", - extra={"organization_id": organization_id}, - ) - return - - rpc_org = org_context.organization - if not features.has("organizations:github-repo-auto-sync", rpc_org): - return - - provider = f"integrations:{integration.provider}" - dry_run = not features.has("organizations:github-repo-auto-sync-apply", rpc_org) - - with SCMIntegrationInteractionEvent( - interaction_type=SCMIntegrationInteractionType.SYNC_REPOS, - integration_id=integration.id, - organization_id=organization_id, - provider_key=integration.provider, - ).capture(): - installation = integration.get_installation(organization_id=organization_id) - client = installation.get_client() - - try: - github_repos = client.get_repos() - except ApiError as e: - if installation.is_rate_limited_error(e): - logger.info( - "sync_repos_for_org.rate_limited", - extra={ - "integration_id": integration.id, - "organization_id": organization_id, - }, - ) - raise - - github_external_ids = {str(repo["id"]) for repo in github_repos} - all_repos = repository_service.get_repositories( - organization_id=organization_id, - integration_id=integration.id, - providers=[provider], - ) - active_repos = [r for r in all_repos if r.status == ObjectStatus.ACTIVE and r.external_id] - disabled_repos = [ - r for r in all_repos if r.status == ObjectStatus.DISABLED and r.external_id - ] +# Legacy alias +github_repo_sync_beat = scm_repo_sync_beat - sentry_active_ids = {r.external_id for r in active_repos} - sentry_disabled_ids = {r.external_id for r in disabled_repos} - - new_ids = github_external_ids - sentry_active_ids - sentry_disabled_ids - removed_ids = sentry_active_ids - github_external_ids - restored_ids = sentry_disabled_ids & github_external_ids - - metric_tags = { - "provider": integration.provider, - "dry_run": str(dry_run), - } - metrics.distribution("scm.repo_sync.new_repos", len(new_ids), tags=metric_tags) - metrics.distribution("scm.repo_sync.removed_repos", len(removed_ids), tags=metric_tags) - metrics.distribution("scm.repo_sync.restored_repos", len(restored_ids), tags=metric_tags) - metrics.distribution( - "scm.repo_sync.provider_total", len(github_external_ids), tags=metric_tags - ) - metrics.distribution( - "scm.repo_sync.sentry_active", len(sentry_active_ids), tags=metric_tags - ) - metrics.distribution( - "scm.repo_sync.sentry_disabled", len(sentry_disabled_ids), tags=metric_tags - ) - - if new_ids or removed_ids or restored_ids: - logger.info( - "scm.repo_sync.diff", - extra={ - "provider": integration.provider, - "integration_id": integration.id, - "organization_id": organization_id, - "dry_run": dry_run, - "provider_total": len(github_external_ids), - "sentry_active": len(sentry_active_ids), - "sentry_disabled": len(sentry_disabled_ids), - "new": len(new_ids), - "removed": len(removed_ids), - "restored": len(restored_ids), - }, - ) - - if dry_run: - return - - repo_by_external_id = {r.external_id: r for r in active_repos + disabled_repos} - - if new_ids: - integration_repo_provider = get_integration_repository_provider(integration) - repo_configs = [ - get_repo_config(repo, integration.id) - for repo in github_repos - if str(repo["id"]) in new_ids - ] - if repo_configs: - created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories( - configs=repo_configs, organization=rpc_org - ) - - for repo in created_repos: - log_repo_change( - event_name="REPO_ADDED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=integration.provider, - ) - - for repo in reactivated_repos: - log_repo_change( - event_name="REPO_ENABLED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=integration.provider, - ) - - if removed_ids: - repository_service.disable_repositories_by_external_ids( - organization_id=organization_id, - integration_id=integration.id, - provider=provider, - external_ids=list(removed_ids), - ) - - for eid in removed_ids: - removed_repo = repo_by_external_id.get(eid) - if removed_repo: - log_repo_change( - event_name="REPO_DISABLED", - organization_id=organization_id, - repo=removed_repo, - source="automatic SCM syncing", - provider=integration.provider, - ) - - if restored_ids: - for repo in disabled_repos: - if repo.external_id in restored_ids: - repo.status = ObjectStatus.ACTIVE - repository_service.update_repository( - organization_id=organization_id, update=repo - ) - log_repo_change( - event_name="REPO_ENABLED", - organization_id=organization_id, - repo=repo, - source="automatic SCM syncing", - provider=integration.provider, - ) - - -@instrumented_task( - name="sentry.integrations.github.tasks.sync_repos.github_repo_sync_beat", - namespace=integrations_control_tasks, - silo_mode=SiloMode.CONTROL, -) -def github_repo_sync_beat() -> None: - scheduler = CursoredScheduler( - name="github_repo_sync", - schedule_key="github-repo-sync-beat", - queryset=OrganizationIntegration.objects.filter( - integration__provider__in=["github", "github_enterprise"], - integration__status=ObjectStatus.ACTIVE, - status=ObjectStatus.ACTIVE, - ), - task=sync_repos_for_org, - cycle_duration=timedelta(hours=24), - ) - scheduler.tick() +__all__ = [ + "github_repo_sync_beat", + "scm_repo_sync_beat", + "sync_repos_for_org", +] diff --git a/src/sentry/integrations/source_code_management/sync_repos.py b/src/sentry/integrations/source_code_management/sync_repos.py new file mode 100644 index 00000000000000..f51d490c85cef8 --- /dev/null +++ b/src/sentry/integrations/source_code_management/sync_repos.py @@ -0,0 +1,256 @@ +""" +Periodic repo sync for SCM integrations. + +The beat task (`scm_repo_sync_beat`) runs on a schedule and uses +CursoredScheduler to iterate over all active SCM OrganizationIntegrations. +For each one, it dispatches `sync_repos_for_org` which diffs the provider's +repo list against Sentry's Repository table and creates/disables/re-enables +as needed. +""" + +import logging +from datetime import timedelta + +from taskbroker_client.retry import Retry + +from sentry import features +from sentry.constants import ObjectStatus +from sentry.integrations.github.tasks.link_all_repos import get_repo_config +from sentry.integrations.models.organization_integration import OrganizationIntegration +from sentry.integrations.services.integration import integration_service +from sentry.integrations.services.repository.service import repository_service +from sentry.integrations.source_code_management.metrics import ( + SCMIntegrationInteractionEvent, + SCMIntegrationInteractionType, +) +from sentry.integrations.source_code_management.repo_audit import log_repo_change +from sentry.organizations.services.organization import organization_service +from sentry.plugins.providers.integration_repository import get_integration_repository_provider +from sentry.shared_integrations.exceptions import ApiError +from sentry.silo.base import SiloMode +from sentry.tasks.base import instrumented_task, retry +from sentry.taskworker.namespaces import integrations_control_tasks +from sentry.utils import metrics +from sentry.utils.cursored_scheduler import CursoredScheduler + +logger = logging.getLogger(__name__) + + +@instrumented_task( + name="sentry.integrations.source_code_management.sync_repos.sync_repos_for_org", + namespace=integrations_control_tasks, + retry=Retry(times=3, delay=120), + processing_deadline_duration=120, + silo_mode=SiloMode.CONTROL, +) +@retry() +def sync_repos_for_org(organization_integration_id: int) -> None: + """ + Sync repositories for a single OrganizationIntegration. + + Fetches all repos from GitHub, diffs against Sentry's Repository table, + and creates/disables/re-enables repos as needed. + """ + try: + oi = OrganizationIntegration.objects.get( + id=organization_integration_id, + status=ObjectStatus.ACTIVE, + ) + except OrganizationIntegration.DoesNotExist: + logger.info( + "sync_repos_for_org.missing_org_integration", + extra={"organization_integration_id": organization_integration_id}, + ) + return + + integration = integration_service.get_integration( + integration_id=oi.integration_id, status=ObjectStatus.ACTIVE + ) + if integration is None: + logger.info( + "sync_repos_for_org.missing_integration", + extra={"integration_id": oi.integration_id}, + ) + return + + organization_id = oi.organization_id + org_context = organization_service.get_organization_by_id( + id=organization_id, include_projects=False, include_teams=False + ) + if org_context is None: + logger.info( + "sync_repos_for_org.missing_organization", + extra={"organization_id": organization_id}, + ) + return + + rpc_org = org_context.organization + if not features.has("organizations:github-repo-auto-sync", rpc_org): + return + + provider = f"integrations:{integration.provider}" + dry_run = not features.has("organizations:github-repo-auto-sync-apply", rpc_org) + + with SCMIntegrationInteractionEvent( + interaction_type=SCMIntegrationInteractionType.SYNC_REPOS, + integration_id=integration.id, + organization_id=organization_id, + provider_key=integration.provider, + ).capture(): + installation = integration.get_installation(organization_id=organization_id) + client = installation.get_client() + + try: + github_repos = client.get_repos() + except ApiError as e: + if installation.is_rate_limited_error(e): + logger.info( + "sync_repos_for_org.rate_limited", + extra={ + "integration_id": integration.id, + "organization_id": organization_id, + }, + ) + raise + + github_external_ids = {str(repo["id"]) for repo in github_repos} + + all_repos = repository_service.get_repositories( + organization_id=organization_id, + integration_id=integration.id, + providers=[provider], + ) + active_repos = [r for r in all_repos if r.status == ObjectStatus.ACTIVE and r.external_id] + disabled_repos = [ + r for r in all_repos if r.status == ObjectStatus.DISABLED and r.external_id + ] + + sentry_active_ids = {r.external_id for r in active_repos} + sentry_disabled_ids = {r.external_id for r in disabled_repos} + + new_ids = github_external_ids - sentry_active_ids - sentry_disabled_ids + removed_ids = sentry_active_ids - github_external_ids + restored_ids = sentry_disabled_ids & github_external_ids + + metric_tags = { + "provider": integration.provider, + "dry_run": str(dry_run), + } + metrics.distribution("scm.repo_sync.new_repos", len(new_ids), tags=metric_tags) + metrics.distribution("scm.repo_sync.removed_repos", len(removed_ids), tags=metric_tags) + metrics.distribution("scm.repo_sync.restored_repos", len(restored_ids), tags=metric_tags) + metrics.distribution( + "scm.repo_sync.provider_total", len(github_external_ids), tags=metric_tags + ) + metrics.distribution( + "scm.repo_sync.sentry_active", len(sentry_active_ids), tags=metric_tags + ) + metrics.distribution( + "scm.repo_sync.sentry_disabled", len(sentry_disabled_ids), tags=metric_tags + ) + + if new_ids or removed_ids or restored_ids: + logger.info( + "scm.repo_sync.diff", + extra={ + "provider": integration.provider, + "integration_id": integration.id, + "organization_id": organization_id, + "dry_run": dry_run, + "provider_total": len(github_external_ids), + "sentry_active": len(sentry_active_ids), + "sentry_disabled": len(sentry_disabled_ids), + "new": len(new_ids), + "removed": len(removed_ids), + "restored": len(restored_ids), + }, + ) + + if dry_run: + return + + repo_by_external_id = {r.external_id: r for r in active_repos + disabled_repos} + + if new_ids: + integration_repo_provider = get_integration_repository_provider(integration) + repo_configs = [ + get_repo_config(repo, integration.id) + for repo in github_repos + if str(repo["id"]) in new_ids + ] + if repo_configs: + created_repos, reactivated_repos, _ = integration_repo_provider.create_repositories( + configs=repo_configs, organization=rpc_org + ) + + for repo in created_repos: + log_repo_change( + event_name="REPO_ADDED", + organization_id=organization_id, + repo=repo, + source="automatic SCM syncing", + provider=integration.provider, + ) + + for repo in reactivated_repos: + log_repo_change( + event_name="REPO_ENABLED", + organization_id=organization_id, + repo=repo, + source="automatic SCM syncing", + provider=integration.provider, + ) + + if removed_ids: + repository_service.disable_repositories_by_external_ids( + organization_id=organization_id, + integration_id=integration.id, + provider=provider, + external_ids=list(removed_ids), + ) + + for eid in removed_ids: + removed_repo = repo_by_external_id.get(eid) + if removed_repo: + log_repo_change( + event_name="REPO_DISABLED", + organization_id=organization_id, + repo=removed_repo, + source="automatic SCM syncing", + provider=integration.provider, + ) + + if restored_ids: + for repo in disabled_repos: + if repo.external_id in restored_ids: + repo.status = ObjectStatus.ACTIVE + repository_service.update_repository( + organization_id=organization_id, update=repo + ) + log_repo_change( + event_name="REPO_ENABLED", + organization_id=organization_id, + repo=repo, + source="automatic SCM syncing", + provider=integration.provider, + ) + + +@instrumented_task( + name="sentry.integrations.source_code_management.sync_repos.scm_repo_sync_beat", + namespace=integrations_control_tasks, + silo_mode=SiloMode.CONTROL, +) +def scm_repo_sync_beat() -> None: + scheduler = CursoredScheduler( + name="scm_repo_sync", + schedule_key="scm-repo-sync-beat", + queryset=OrganizationIntegration.objects.filter( + integration__provider__in=["github", "github_enterprise"], + integration__status=ObjectStatus.ACTIVE, + status=ObjectStatus.ACTIVE, + ), + task=sync_repos_for_org, + cycle_duration=timedelta(hours=24), + ) + scheduler.tick() diff --git a/tests/sentry/integrations/github/tasks/test_sync_repos.py b/tests/sentry/integrations/source_code_management/test_sync_repos.py similarity index 96% rename from tests/sentry/integrations/github/tasks/test_sync_repos.py rename to tests/sentry/integrations/source_code_management/test_sync_repos.py index bb1f14f74fb51a..4471f72be2babc 100644 --- a/tests/sentry/integrations/github/tasks/test_sync_repos.py +++ b/tests/sentry/integrations/source_code_management/test_sync_repos.py @@ -7,9 +7,8 @@ from sentry import audit_log from sentry.constants import ObjectStatus from sentry.integrations.github.integration import GitHubIntegrationProvider -from sentry.integrations.github.tasks.sync_repos import sync_repos_for_org -from sentry.integrations.github_enterprise.integration import GitHubEnterpriseIntegrationProvider from sentry.integrations.models.organization_integration import OrganizationIntegration +from sentry.integrations.source_code_management.sync_repos import sync_repos_for_org from sentry.models.auditlogentry import AuditLogEntry from sentry.models.repository import Repository from sentry.silo.base import SiloMode @@ -24,13 +23,13 @@ class SyncReposForOrgTestCase(IntegrationTestCase): base_url = "https://api.github.com" key = "github" - def setUp(self): + def setUp(self) -> None: super().setUp() self.oi = OrganizationIntegration.objects.get( organization_id=self.organization.id, integration=self.integration ) - def _add_repos_response(self, repos): + def _add_repos_response(self, repos: list[dict[str, object]]) -> None: responses.add( responses.GET, self.base_url + "/installation/repositories?per_page=100", @@ -225,6 +224,10 @@ def test_rate_limited_raises_for_retry(self, _: MagicMock) -> None: class SyncReposForOrgGHETestCase(TestCase): @patch("sentry.integrations.github.client.GitHubBaseClient.get_repos") def test_creates_new_repos_for_ghe(self, mock_get_repos: MagicMock) -> None: + from sentry.integrations.github_enterprise.integration import ( + GitHubEnterpriseIntegrationProvider, + ) + GitHubEnterpriseIntegrationProvider().setup() integration = self.create_integration(