From 24efa26714a7c48cf8d2b422503239dbdd161f0c Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 13:59:56 -0400 Subject: [PATCH 01/23] Add S3 credentials and download helpers (#783) --- .../convert_observations/tempo_no2_tropo.yaml | 22 ++ .../tempo_no2_tropo.yaml | 33 ++ src/swell/tasks/download_obs.py | 347 +++++++++++++++++- 3 files changed, 390 insertions(+), 12 deletions(-) create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml new file mode 100644 index 000000000..62021f37d --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml @@ -0,0 +1,22 @@ +# TEMPO NO2 Tropospheric Column L2 — IODA converter configuration +# Converter: tempo_nc2ioda.py (must be available in `converter_path` or +# in jedi_bundle/build/bin/) +# +# Invocation (one call per cycle, all granules passed together): +# python3 /build/bin/tempo_nc2ioda.py +# -i ... +# -o /ioda/tempo_no2_tropo/tempo_no2_tropo_YYYYMMDDHH.nc +# -c troposphere +# -v no2 + +# Name of the ioda-converters Python script. +converter_script: tempo_nc2ioda.py + +# Output filename template (must match the source pattern in +# ingest_observations/tempo_no2_tropo.yaml once that is created). +output_filename_template: "tempo_no2_tropo_%Y%m%d%H.nc" + +# Additional flags passed verbatim to the converter after -i and -o. +extra_flags: + -c: troposphere # retrieval layer: troposphere column + -v: no2 # variable: nitrogen dioxide diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml new file mode 100644 index 000000000..51771dff4 --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml @@ -0,0 +1,33 @@ +# TEMPO NO2 Tropospheric Column L2 — download configuration +# Instrument: TEMPO (Tropospheric Emissions: Monitoring of Pollution) +# Product: TEMPO_NO2_L2 V03 (NRT) +# Source: NASA ASDC S3 bucket via Earthdata Cumulus +# (Earthdata authentication required in ~/.netrc for +# urs.earthdata.nasa.gov) +# +# Files are named: +# TEMPO_NO2_L2_V03_YYYYMMDDTHHMMSSz_YYYYMMDDTHHMMSSz_SG.nc +# where field 4 (0-based, split on '_') is the granule start timestamp. +# +# Available versions: +# V03 (NRT): s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD +# V04 (current): s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V04/YYYY.MM.DD +# Change s3_source below to switch versions. + +retrieval_method: s3_secure + +# S3 URI template — YYYY, MM, DD are resolved at runtime. +s3_source: "s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD" + +# Index (0-based) of the start-time token in the filename when split on '_'. +# TEMPO_NO2_L2_V03_20231015T180000Z_... → field 4 = '20231015T180000Z' +filename_datetime_field: 4 + +# strptime format for the start-time token above. +filename_datetime_format: "%Y%m%dT%H%M%SZ" + +# Maximum duration of a single TEMPO scan granule. Used to extend the file +# search window backwards so that granules starting before window_begin but +# containing data within the DA window are not missed. +# TEMPO scans roughly one hemisphere per hour; PT2H provides a safe margin. +max_orbit_duration: PT2H diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index a315baf8a..ae0e74d16 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -7,30 +7,53 @@ """ Task for downloading raw observation files from remote servers. -Downloads native observation files (e.g. HDF5) from HTTPS servers such as -NASA GES DISC prior to ingestion into R2D2. Authentication is handled via -~/.netrc (same mechanism used by wget/curl). +Downloads native observation files (e.g. HDF5, NetCDF) from either HTTPS +servers such as NASA GES DISC, or from S3 buckets via the NASA Earthdata +Cumulus distribution service. + +HTTPS authentication is handled via ~/.netrc (same mechanism used by +wget/curl). + +S3 (``retrieval_method: s3_secure``) authentication is done by exchanging +Earthdata credentials (read from ``~/.netrc`` for +``urs.earthdata.nasa.gov``) for temporary AWS credentials via the NASA ASDC +Cumulus S3 distribution endpoint. Requires ``boto3`` to be installed. """ +import base64 +import datetime +import netrc import os import re import yaml import isodate -import datetime import requests from swell.tasks.base.task_base import taskBase class DownloadObs(taskBase): - """Download raw observation files from a remote HTTPS server. + """Download raw observation files from a remote server. For each observation in ``obs_to_download``, this task reads a per-obs YAML from ``download_observations/.yaml`` in the model's - configuration directory and downloads all files whose start time - falls within the DA window (extended backwards by ``max_orbit_duration`` - to capture orbits that started before the window but contain data inside - it). + configuration directory. + + Two retrieval methods are supported: + + ``https`` (default) + Scrapes an HTML directory listing from ``remote_host`` + + ``remote_path_template``, matches filenames against + ``filename_pattern``, and streams files via HTTPS. + Authentication uses ``~/.netrc``. + + ``s3_secure`` + Downloads files from an S3 bucket path given by ``s3_source`` + (e.g. ``s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD``). + Temporary AWS credentials are obtained automatically via the NASA + Earthdata Cumulus distribution endpoint using the Earthdata login + stored in ``~/.netrc`` for ``urs.earthdata.nasa.gov``. + Requires ``boto3``. Raw obs files are placed in ``/download//``. @@ -40,8 +63,8 @@ class DownloadObs(taskBase): Args: config: Inherited from ``taskBase``. Relevant keys: - - ``obs_to_download``: list of YAML filenames with names matching the obs name - in ``download_observations/``. + - ``obs_to_download``: list of obs names whose YAML files are + present under ``download_observations/``. - ``window_length``: ISO-8601 duration (e.g. ``"PT6H"``). - ``dry_run``: if ``True``, skip actual downloads. @@ -109,10 +132,20 @@ def _download_obs( window_end_dto: datetime.datetime, dry_run: bool, ) -> tuple[int, int]: - """Download all files for one observation type. + """Dispatch to the correct retrieval method and download all files + for one observation type. Returns ``(n_downloaded, n_failed)``. """ + retrieval_method = obs_config.get('retrieval_method', 'https') + + if retrieval_method == 's3_secure': + return self._download_obs_s3_secure( + obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) + + # ------------------------------------------------------------------ + # Default: HTTPS directory listing + # ------------------------------------------------------------------ remote_host = obs_config['remote_host'] remote_path_template = obs_config['remote_path_template'] filename_pattern = obs_config['filename_pattern'] @@ -181,6 +214,263 @@ def _download_obs( return downloaded, failed + def _download_obs_s3_secure( + self, + obs_config: dict, + obs_name: str, + window_begin_dto: datetime.datetime, + window_end_dto: datetime.datetime, + dry_run: bool, + ) -> tuple[int, int]: + """Download files for one observation type from an S3 bucket using + temporary credentials obtained via the NASA Earthdata Cumulus + distribution endpoint. + + The ``obs_config`` dict must contain: + + - ``s3_source``: S3 URI template with ``YYYY``, ``MM``, ``DD`` + placeholders, e.g. + ``s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD``. + + Optional keys: + + - ``max_orbit_duration``: ISO-8601 duration; extends the search + window backwards (default ``PT0H``). + - ``filename_datetime_field``: 0-based index of the start-time + field when the filename is split on ``_`` (default ``4``, + matching the TEMPO L2 naming convention). + - ``filename_datetime_format``: strptime format string for that + field (default ``%Y%m%dT%H%M%SZ``). + + Returns ``(n_downloaded, n_failed)``. + """ + try: + import boto3 + from botocore.exceptions import BotoCoreError, ClientError + except ImportError: + self.logger.abort( + "boto3 is required for 's3_secure' retrieval but is not installed. " + "Install it with: pip install boto3") + + s3_source_template = obs_config['s3_source'] + max_orbit_dur = isodate.parse_duration( + obs_config.get('max_orbit_duration', 'PT0H')) + + # Index (0-based) of the start-time token when the filename is + # split on '_'. TEMPO L2 filenames look like: + # TEMPO_NO2_L2_V03_20231015T180000Z_20231015T190000Z_S001G01.nc + # ^^^^ field 4 = start time + datetime_field = obs_config.get('filename_datetime_field', 4) + datetime_fmt = obs_config.get('filename_datetime_format', '%Y%m%dT%H%M%SZ') + + # Extend window backwards for long-orbit instruments. + search_start = window_begin_dto - max_orbit_dur + search_end = window_end_dto + + # Make both bounds UTC-aware for comparison against parsed timestamps. + utc = datetime.timezone.utc + if search_start.tzinfo is None: + search_start = search_start.replace(tzinfo=utc) + if window_begin_dto.tzinfo is None: + window_begin_dto = window_begin_dto.replace(tzinfo=utc) + if window_end_dto.tzinfo is None: + window_end_dto = window_end_dto.replace(tzinfo=utc) + search_end = window_end_dto + + dest_dir = os.path.join(self.cycle_dir(), 'download', obs_name) + if not dry_run: + os.makedirs(dest_dir, exist_ok=True) + + # Parse the S3 URI: s3://bucket/prefix/template + without_scheme = s3_source_template[len('s3://'):] + bucket, _, prefix_template = without_scheme.partition('/') + + if dry_run: + for day_date in self._day_slots(search_start, search_end): + prefix = self._resolve_s3_prefix(prefix_template, day_date) + self.logger.info( + f' [DRY RUN] Would list s3://{bucket}/{prefix}') + return 0, 0 + + # Obtain temporary AWS credentials via Earthdata Cumulus. + try: + creds = self._get_earthdata_s3_credentials() + except Exception as exc: + self.logger.abort(f'Failed to obtain Earthdata S3 credentials: {exc}') + + s3_client = boto3.client( + 's3', + aws_access_key_id=creds['accessKeyId'], + aws_secret_access_key=creds['secretAccessKey'], + aws_session_token=creds['sessionToken'], + region_name='us-west-2', + ) + + downloaded = 0 + failed = 0 + + for day_date in self._day_slots(search_start, search_end): + prefix = self._resolve_s3_prefix(prefix_template, day_date) + self.logger.info( + f' Listing s3://{bucket}/{prefix}') + + try: + paginator = s3_client.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) + except (BotoCoreError, ClientError) as exc: + self.logger.error( + f' Failed to list s3://{bucket}/{prefix}: {exc}') + failed += 1 + continue + + for page in pages: + for obj in page.get('Contents', []): + key = obj['Key'] + filename = os.path.basename(key) + + # Skip sidecar/metadata files. + if filename.endswith(('.met', '.dmrpp')): + continue + + # Parse the granule start time from the filename. + try: + parts = filename.split('_') + ts_str = parts[datetime_field] + # Ensure the format string ends with Z and the + # token also ends with Z (strip trailing chars if any). + if not ts_str.endswith('Z'): + ts_str = ts_str[:15] + 'Z' + file_dt = datetime.datetime.strptime( + ts_str, datetime_fmt).replace(tzinfo=utc) + except (IndexError, ValueError) as exc: + self.logger.warning( + f' Could not parse timestamp from {filename}: {exc}') + continue + + # Include files whose granule start time falls within + # the extended search window (search_start..window_end). + if not (search_start <= file_dt <= search_end): + continue + + dest_path = os.path.join(dest_dir, filename) + if os.path.exists(dest_path): + self.logger.info( + f' Already exists, skipping: {filename}') + downloaded += 1 + continue + + try: + s3_client.download_file(bucket, key, dest_path) + self.logger.info(f' Downloaded: {filename}') + downloaded += 1 + except (BotoCoreError, ClientError) as exc: + self.logger.error( + f' Failed to download {filename}: {exc}') + failed += 1 + + return downloaded, failed + + def _get_earthdata_s3_credentials(self) -> dict: + """Obtain temporary AWS credentials via the NASA Earthdata Cumulus + S3 distribution endpoint. + + Reads Earthdata username and password from ``~/.netrc`` for + ``urs.earthdata.nasa.gov`` and performs the OAuth redirect chain + against the ASDC Cumulus distribution URL, returning a dict with + keys ``accessKeyId``, ``secretAccessKey``, ``sessionToken``, and + ``expiration``. + + The credentials are valid for **1 hour** (AWS STS role-chaining + limit). For DA cycles whose download phase takes longer than 1 + hour a new call to this method will be required; callers should + catch ``botocore.exceptions.ClientError`` with error code + ``ExpiredTokenException`` and re-invoke this method. + + **In-region access only**: the ASDC S3 bucket resides in + ``us-west-2``. Direct S3 access only works from compute running + in the same AWS region. + + Raises: + ValueError: if ``~/.netrc`` has no entry for Earthdata or if + the credential exchange fails. + + Reference: + https://data.asdc.earthdata.nasa.gov/s3credentialsREADME + """ + DISTRIBUTION_URL = 'https://data.asdc.earthdata.nasa.gov' + CREDENTIALS_URL = f'{DISTRIBUTION_URL}/s3credentials' + EARTHDATA_HOST = 'urs.earthdata.nasa.gov' + + # Read username/password from ~/.netrc. + try: + nrc = netrc.netrc() + auth_info = nrc.authenticators(EARTHDATA_HOST) + except FileNotFoundError: + raise ValueError('~/.netrc not found. Please create it with ' + f'credentials for {EARTHDATA_HOST}.') + if auth_info is None: + raise ValueError( + f'No credentials found for {EARTHDATA_HOST} in ~/.netrc. ' + 'Add a line: machine urs.earthdata.nasa.gov login password ') + + username, _, password = auth_info + auth_encoded = base64.b64encode( + f'{username}:{password}'.encode()).decode() + + # Step 1 — GET the credentials URL; Cumulus redirects to the + # Earthdata URS authorization endpoint. + login_resp = requests.get(CREDENTIALS_URL, allow_redirects=False, timeout=30) + login_resp.raise_for_status() + authorize_url = login_resp.headers.get('location', '').strip() + if not authorize_url: + raise ValueError( + 'No redirect received from Cumulus credentials endpoint. ' + f'Response status: {login_resp.status_code}') + + # Step 2 — POST base64-encoded credentials to URS to get a + # grant-code redirect. + auth_redirect = requests.post( + authorize_url, + data={'credentials': auth_encoded}, + headers={'Origin': DISTRIBUTION_URL}, + allow_redirects=False, + timeout=30, + ) + auth_redirect.raise_for_status() + redirect_url = auth_redirect.headers.get('location', '').strip() + if not redirect_url: + raise ValueError( + 'No redirect received after posting Earthdata credentials. ' + f'Response status: {auth_redirect.status_code}') + + # Step 3 — Follow the grant-code redirect to obtain the + # accessToken cookie (do NOT follow further redirects). + final = requests.get(redirect_url, allow_redirects=False, timeout=30) + + # Step 4 — Re-request the credentials URL passing the accessToken + # cookie; Cumulus returns JSON with temporary AWS keys. + results = requests.get( + CREDENTIALS_URL, + cookies={'accessToken': final.cookies['accessToken']}, + timeout=30, + ) + results.raise_for_status() + creds = results.json() + + required_keys = {'accessKeyId', 'secretAccessKey', 'sessionToken'} + if not required_keys.issubset(creds): + raise ValueError( + f'Unexpected credentials response (missing keys): {creds}') + + self.logger.info( + f'Obtained temporary S3 credentials from Earthdata Cumulus ' + f'(expires: {creds.get("expiration", "unknown")})') + return creds + + # ------------------------------------------------------------------ + # Slot/date helpers + # ------------------------------------------------------------------ + def _hour_slots( self, search_start: datetime.datetime, @@ -194,6 +484,24 @@ def _hour_slots( current += datetime.timedelta(hours=1) return slots + def _day_slots( + self, + search_start: datetime.datetime, + search_end: datetime.datetime, + ) -> list[datetime.date]: + """Return a list of unique date objects from search_start to search_end.""" + days = [] + current = search_start.date() + end_date = search_end.date() + while current <= end_date: + days.append(current) + current += datetime.timedelta(days=1) + return days + + # ------------------------------------------------------------------ + # Template resolution helpers + # ------------------------------------------------------------------ + def _resolve_path(self, template: str, date: datetime.date) -> str: """Substitute YYYY, MM, DD, JJJ placeholders in a path template.""" day_of_year = date.timetuple().tm_yday @@ -213,6 +521,21 @@ def _resolve_filename(self, template: str, date: datetime.date, hour: int) -> st .replace('JJJ', f'{day_of_year:03d}') .replace('HH', f'{hour:02d}')) + def _resolve_s3_prefix(self, template: str, date: datetime.date) -> str: + """Substitute YYYY, MM, DD in an S3 prefix template. + + Handles both slash-separated (``YYYY/MM/DD``) and dot-separated + (``YYYY.MM.DD``) date formats that appear in NASA ASDC S3 paths. + """ + return (template + .replace('YYYY', f'{date.year:04d}') + .replace('MM', f'{date.month:02d}') + .replace('DD', f'{date.day:02d}')) + + # ------------------------------------------------------------------ + # HTTPS helpers + # ------------------------------------------------------------------ + def _list_remote_dir(self, session: requests.Session, url: str) -> list[str]: """Return filenames found in an HTML directory listing at ``url``.""" response = session.get(url, timeout=(5, 30)) From 9e459dead0ffeb76df5b82216946c8a23cd53aa7 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 14:35:03 -0400 Subject: [PATCH 02/23] Fix S3 session cookies (#783) --- pyproject.toml | 2 ++ src/swell/tasks/download_obs.py | 47 ++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 759952ddc..413d03530 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,8 @@ dependencies = [ "pycodestyle>=2.11.0", "numpy<2", "pandas>=1.4.0", + "requests>=2.28.0", + "boto3>=1.26.0", "isodate>=0.5.4", "f90nml>=1.4.3", "questionary>=1.10.0", diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index ae0e74d16..c98dfcdc9 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -417,45 +417,48 @@ def _get_earthdata_s3_credentials(self) -> dict: auth_encoded = base64.b64encode( f'{username}:{password}'.encode()).decode() + # Use a single session so cookies are shared across all steps, + # matching the behaviour of curl's --cookie-jar (-c/-b) flags in + # the reference ewok bash script. + session = requests.Session() + # Step 1 — GET the credentials URL; Cumulus redirects to the # Earthdata URS authorization endpoint. - login_resp = requests.get(CREDENTIALS_URL, allow_redirects=False, timeout=30) - login_resp.raise_for_status() - authorize_url = login_resp.headers.get('location', '').strip() + r1 = session.get(CREDENTIALS_URL, allow_redirects=False, timeout=30) + r1.raise_for_status() + authorize_url = r1.headers.get('location', '').strip() if not authorize_url: raise ValueError( 'No redirect received from Cumulus credentials endpoint. ' - f'Response status: {login_resp.status_code}') + f'Response status: {r1.status_code}') # Step 2 — POST base64-encoded credentials to URS to get a # grant-code redirect. - auth_redirect = requests.post( + r2 = session.post( authorize_url, data={'credentials': auth_encoded}, headers={'Origin': DISTRIBUTION_URL}, allow_redirects=False, timeout=30, ) - auth_redirect.raise_for_status() - redirect_url = auth_redirect.headers.get('location', '').strip() + r2.raise_for_status() + redirect_url = r2.headers.get('location', '').strip() if not redirect_url: raise ValueError( 'No redirect received after posting Earthdata credentials. ' - f'Response status: {auth_redirect.status_code}') - - # Step 3 — Follow the grant-code redirect to obtain the - # accessToken cookie (do NOT follow further redirects). - final = requests.get(redirect_url, allow_redirects=False, timeout=30) - - # Step 4 — Re-request the credentials URL passing the accessToken - # cookie; Cumulus returns JSON with temporary AWS keys. - results = requests.get( - CREDENTIALS_URL, - cookies={'accessToken': final.cookies['accessToken']}, - timeout=30, - ) - results.raise_for_status() - creds = results.json() + f'Response status: {r2.status_code}') + + # Step 3 — Follow the full redirect chain from the grant-code URL. + # The session cookie jar captures the accessToken cookie + # regardless of which hop in the chain sets it. + session.get(redirect_url, allow_redirects=True, timeout=30) + + # Step 4 — Re-request the credentials URL; the session now carries + # the accessToken cookie and Cumulus returns JSON with + # temporary AWS keys. + r4 = session.get(CREDENTIALS_URL, timeout=30) + r4.raise_for_status() + creds = r4.json() required_keys = {'accessKeyId', 'secretAccessKey', 'sessionToken'} if not required_keys.issubset(creds): From d611e0c005ecf0b55fd73973c35ca7d3d482de18 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 14:47:49 -0400 Subject: [PATCH 03/23] Update yaml --- .../tempo_no2_tropo.yaml | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml index 51771dff4..12060ba91 100644 --- a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml +++ b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml @@ -1,30 +1,23 @@ # TEMPO NO2 Tropospheric Column L2 — download configuration # Instrument: TEMPO (Tropospheric Emissions: Monitoring of Pollution) -# Product: TEMPO_NO2_L2 V03 (NRT) -# Source: NASA ASDC S3 bucket via Earthdata Cumulus +# Product: TEMPO_NO2_L2 V03 +# Source: NASA ASDC Cumulus HTTPS distribution endpoint # (Earthdata authentication required in ~/.netrc for # urs.earthdata.nasa.gov) # # Files are named: # TEMPO_NO2_L2_V03_YYYYMMDDTHHMMSSz_YYYYMMDDTHHMMSSz_SG.nc -# where field 4 (0-based, split on '_') is the granule start timestamp. +# The filename_pattern uses YYYYMMDDTHH to match files by granule start hour. # -# Available versions: -# V03 (NRT): s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD -# V04 (current): s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V04/YYYY.MM.DD -# Change s3_source below to switch versions. +# Note: the same data is also available via direct S3 access for workflows +# running inside AWS us-west-2 (in-region access only): +# retrieval_method: s3_secure +# s3_source: "s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD" +# The HTTPS endpoint below works from any network (e.g. Discover/HPC). -retrieval_method: s3_secure - -# S3 URI template — YYYY, MM, DD are resolved at runtime. -s3_source: "s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD" - -# Index (0-based) of the start-time token in the filename when split on '_'. -# TEMPO_NO2_L2_V03_20231015T180000Z_... → field 4 = '20231015T180000Z' -filename_datetime_field: 4 - -# strptime format for the start-time token above. -filename_datetime_format: "%Y%m%dT%H%M%SZ" +remote_host: https://data.asdc.earthdata.nasa.gov +remote_path_template: /TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD/ +filename_pattern: "TEMPO_NO2_L2_V03_YYYYMMDDTHH*.nc" # Maximum duration of a single TEMPO scan granule. Used to extend the file # search window backwards so that granules starting before window_begin but From f9697cd972bade20204a02e05a886840e1feaffb Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 15:16:11 -0400 Subject: [PATCH 04/23] Use authenticated session in HTTPS path --- .../tempo_no2_tropo.yaml | 14 +-- src/swell/tasks/download_obs.py | 92 ++++++++++++------- 2 files changed, 66 insertions(+), 40 deletions(-) diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml index 12060ba91..a1bfc9c79 100644 --- a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml +++ b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml @@ -9,18 +9,18 @@ # TEMPO_NO2_L2_V03_YYYYMMDDTHHMMSSz_YYYYMMDDTHHMMSSz_SG.nc # The filename_pattern uses YYYYMMDDTHH to match files by granule start hour. # -# Note: the same data is also available via direct S3 access for workflows -# running inside AWS us-west-2 (in-region access only): +# Note: for workflows running inside AWS us-west-2, direct S3 access is also +# available (faster, no directory listing required): # retrieval_method: s3_secure # s3_source: "s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD" -# The HTTPS endpoint below works from any network (e.g. Discover/HPC). remote_host: https://data.asdc.earthdata.nasa.gov remote_path_template: /TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD/ filename_pattern: "TEMPO_NO2_L2_V03_YYYYMMDDTHH*.nc" -# Maximum duration of a single TEMPO scan granule. Used to extend the file -# search window backwards so that granules starting before window_begin but -# containing data within the DA window are not missed. -# TEMPO scans roughly one hemisphere per hour; PT2H provides a safe margin. +# Use the Earthdata OAuth session (accessToken cookie) instead of basic auth. +# Required for ASDC Cumulus endpoints which do not support ~/.netrc basic auth. +earthdata_auth: true + +# Maximum duration of a single TEMPO scan granule. max_orbit_duration: PT2H diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index c98dfcdc9..7a433fd50 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -163,8 +163,17 @@ def _download_obs( if not dry_run: os.makedirs(dest_dir, exist_ok=True) - # requests.Session uses ~/.netrc automatically for authentication. - session = requests.Session() + # requests.Session uses ~/.netrc automatically for basic-auth + # endpoints (e.g. GES DISC). For ASDC Cumulus endpoints that use + # OAuth, set earthdata_auth: true in the obs YAML to get a fully + # authenticated session with the accessToken cookie instead. + if obs_config.get('earthdata_auth', False): + try: + session = self._create_earthdata_session() + except Exception as exc: + self.logger.abort(f'Failed to create Earthdata session: {exc}') + else: + session = requests.Session() downloaded = 0 failed = 0 @@ -370,32 +379,19 @@ def _download_obs_s3_secure( return downloaded, failed - def _get_earthdata_s3_credentials(self) -> dict: - """Obtain temporary AWS credentials via the NASA Earthdata Cumulus - S3 distribution endpoint. + def _create_earthdata_session(self) -> requests.Session: + """Return an authenticated ``requests.Session`` for NASA Earthdata + protected HTTPS endpoints (e.g. ``data.asdc.earthdata.nasa.gov``). - Reads Earthdata username and password from ``~/.netrc`` for - ``urs.earthdata.nasa.gov`` and performs the OAuth redirect chain - against the ASDC Cumulus distribution URL, returning a dict with - keys ``accessKeyId``, ``secretAccessKey``, ``sessionToken``, and - ``expiration``. - - The credentials are valid for **1 hour** (AWS STS role-chaining - limit). For DA cycles whose download phase takes longer than 1 - hour a new call to this method will be required; callers should - catch ``botocore.exceptions.ClientError`` with error code - ``ExpiredTokenException`` and re-invoke this method. - - **In-region access only**: the ASDC S3 bucket resides in - ``us-west-2``. Direct S3 access only works from compute running - in the same AWS region. + Reads Earthdata credentials from ``~/.netrc`` for + ``urs.earthdata.nasa.gov`` and performs the four-step Cumulus OAuth + redirect chain, leaving the session's cookie jar populated with the + ``accessToken`` cookie that ASDC requires for both HTTPS file access + and the ``/s3credentials`` endpoint. Raises: ValueError: if ``~/.netrc`` has no entry for Earthdata or if the credential exchange fails. - - Reference: - https://data.asdc.earthdata.nasa.gov/s3credentialsREADME """ DISTRIBUTION_URL = 'https://data.asdc.earthdata.nasa.gov' CREDENTIALS_URL = f'{DISTRIBUTION_URL}/s3credentials' @@ -417,13 +413,9 @@ def _get_earthdata_s3_credentials(self) -> dict: auth_encoded = base64.b64encode( f'{username}:{password}'.encode()).decode() - # Use a single session so cookies are shared across all steps, - # matching the behaviour of curl's --cookie-jar (-c/-b) flags in - # the reference ewok bash script. session = requests.Session() - # Step 1 — GET the credentials URL; Cumulus redirects to the - # Earthdata URS authorization endpoint. + # Step 1 — GET the credentials URL; Cumulus redirects to URS. r1 = session.get(CREDENTIALS_URL, allow_redirects=False, timeout=30) r1.raise_for_status() authorize_url = r1.headers.get('location', '').strip() @@ -432,8 +424,7 @@ def _get_earthdata_s3_credentials(self) -> dict: 'No redirect received from Cumulus credentials endpoint. ' f'Response status: {r1.status_code}') - # Step 2 — POST base64-encoded credentials to URS to get a - # grant-code redirect. + # Step 2 — POST credentials to URS to get a grant-code redirect. r2 = session.post( authorize_url, data={'credentials': auth_encoded}, @@ -448,11 +439,46 @@ def _get_earthdata_s3_credentials(self) -> dict: 'No redirect received after posting Earthdata credentials. ' f'Response status: {r2.status_code}') - # Step 3 — Follow the full redirect chain from the grant-code URL. - # The session cookie jar captures the accessToken cookie - # regardless of which hop in the chain sets it. + # Step 3 — Follow the full redirect chain; session captures the + # accessToken cookie regardless of which hop sets it. session.get(redirect_url, allow_redirects=True, timeout=30) + self.logger.info('Obtained Earthdata session (accessToken cookie set)') + return session + + def _get_earthdata_s3_credentials(self) -> dict: + """Obtain temporary AWS credentials via the NASA Earthdata Cumulus + S3 distribution endpoint. + + Reads Earthdata username and password from ``~/.netrc`` for + ``urs.earthdata.nasa.gov`` and performs the OAuth redirect chain + against the ASDC Cumulus distribution URL, returning a dict with + keys ``accessKeyId``, ``secretAccessKey``, ``sessionToken``, and + ``expiration``. + + The credentials are valid for **1 hour** (AWS STS role-chaining + limit). For DA cycles whose download phase takes longer than 1 + hour a new call to this method will be required; callers should + catch ``botocore.exceptions.ClientError`` with error code + ``ExpiredTokenException`` and re-invoke this method. + + **In-region access only**: the ASDC S3 bucket resides in + ``us-west-2``. Direct S3 access only works from compute running + in the same AWS region. + + Raises: + ValueError: if ``~/.netrc`` has no entry for Earthdata or if + the credential exchange fails. + + Reference: + https://data.asdc.earthdata.nasa.gov/s3credentialsREADME + """ + CREDENTIALS_URL = 'https://data.asdc.earthdata.nasa.gov/s3credentials' + + # Reuse the shared OAuth exchange — session already has the + # accessToken cookie after this call. + session = self._create_earthdata_session() + # Step 4 — Re-request the credentials URL; the session now carries # the accessToken cookie and Cumulus returns JSON with # temporary AWS keys. From 1610032ed738871dc5bee9608f299c67ac6c5430 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 15:29:30 -0400 Subject: [PATCH 05/23] Try cmr method --- .../tempo_no2_tropo.yaml | 23 ++-- src/swell/tasks/download_obs.py | 127 ++++++++++++++++++ 2 files changed, 137 insertions(+), 13 deletions(-) diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml index a1bfc9c79..75cd205a5 100644 --- a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml +++ b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml @@ -1,26 +1,23 @@ # TEMPO NO2 Tropospheric Column L2 — download configuration # Instrument: TEMPO (Tropospheric Emissions: Monitoring of Pollution) # Product: TEMPO_NO2_L2 V03 -# Source: NASA ASDC Cumulus HTTPS distribution endpoint +# Source: NASA CMR search API + ASDC Cumulus HTTPS download # (Earthdata authentication required in ~/.netrc for # urs.earthdata.nasa.gov) # -# Files are named: -# TEMPO_NO2_L2_V03_YYYYMMDDTHHMMSSz_YYYYMMDDTHHMMSSz_SG.nc -# The filename_pattern uses YYYYMMDDTHH to match files by granule start hour. +# Files are discovered via the NASA CMR API and downloaded from: +# https://data.asdc.earthdata.nasa.gov/TEMPO/TEMPO_NO2_L2_V03/... # # Note: for workflows running inside AWS us-west-2, direct S3 access is also -# available (faster, no directory listing required): +# available (faster, no CMR query required): # retrieval_method: s3_secure # s3_source: "s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD" -remote_host: https://data.asdc.earthdata.nasa.gov -remote_path_template: /TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD/ -filename_pattern: "TEMPO_NO2_L2_V03_YYYYMMDDTHH*.nc" +retrieval_method: cmr +cmr_short_name: TEMPO_NO2_L2 +cmr_version: V03 -# Use the Earthdata OAuth session (accessToken cookie) instead of basic auth. -# Required for ASDC Cumulus endpoints which do not support ~/.netrc basic auth. -earthdata_auth: true - -# Maximum duration of a single TEMPO scan granule. +# Maximum duration of a single TEMPO scan granule. Used to extend the +# temporal search window backwards so granules starting before window_begin +# but containing data within the DA window are not missed. max_orbit_duration: PT2H diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index 7a433fd50..fca2785d4 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -143,6 +143,10 @@ def _download_obs( return self._download_obs_s3_secure( obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) + if retrieval_method == 'cmr': + return self._download_obs_cmr( + obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) + # ------------------------------------------------------------------ # Default: HTTPS directory listing # ------------------------------------------------------------------ @@ -379,6 +383,129 @@ def _download_obs_s3_secure( return downloaded, failed + def _download_obs_cmr( + self, + obs_config: dict, + obs_name: str, + window_begin_dto: datetime.datetime, + window_end_dto: datetime.datetime, + dry_run: bool, + ) -> tuple[int, int]: + """Download files for one observation type by querying the NASA CMR + API for granule download URLs, then fetching each file via + authenticated HTTPS. + + The ``obs_config`` dict must contain: + + - ``cmr_short_name``: CMR collection short name, e.g. ``TEMPO_NO2_L2``. + + Optional keys: + + - ``cmr_version``: collection version string, e.g. ``V03``. + - ``max_orbit_duration``: ISO-8601 duration; extends the temporal + search backwards (default ``PT0H``). + + Returns ``(n_downloaded, n_failed)``. + """ + CMR_GRANULES_URL = 'https://cmr.earthdata.nasa.gov/search/granules.json' + DATA_LINK_REL = 'http://esipfed.org/ns/fedsearch/1.1/data#' + SKIP_SUFFIXES = ('.met', '.dmrpp', '.xml', '.md5') + + cmr_short_name = obs_config['cmr_short_name'] + cmr_version = obs_config.get('cmr_version', '') + max_orbit_dur = isodate.parse_duration( + obs_config.get('max_orbit_duration', 'PT0H')) + + # Extend window backwards for instruments with long scan durations. + utc = datetime.timezone.utc + search_start = window_begin_dto - max_orbit_dur + search_end = window_end_dto + if search_start.tzinfo is None: + search_start = search_start.replace(tzinfo=utc) + if search_end.tzinfo is None: + search_end = search_end.replace(tzinfo=utc) + + temporal = (f'{search_start.strftime("%Y-%m-%dT%H:%M:%SZ")},' + f'{search_end.strftime("%Y-%m-%dT%H:%M:%SZ")}') + + if dry_run: + self.logger.info( + f' [DRY RUN] Would query CMR: short_name={cmr_short_name} ' + f'version={cmr_version} temporal={temporal}') + return 0, 0 + + # CMR search is public — no auth required. + params = {'short_name': cmr_short_name, + 'temporal[]': temporal, + 'page_size': 2000} + if cmr_version: + params['version'] = cmr_version + + self.logger.info( + f' Querying CMR: short_name={cmr_short_name} ' + f'version={cmr_version} temporal={temporal}') + + try: + cmr_resp = requests.get(CMR_GRANULES_URL, params=params, timeout=30) + cmr_resp.raise_for_status() + except requests.RequestException as exc: + self.logger.abort(f'CMR query failed: {exc}') + + granules = cmr_resp.json().get('feed', {}).get('entry', []) + self.logger.info(f' CMR returned {len(granules)} granule(s)') + + if not granules: + self.logger.info(' No granules found for this window — nothing to download') + return 0, 0 + + # Authenticated session for ASDC HTTPS downloads. + try: + session = self._create_earthdata_session() + except Exception as exc: + self.logger.abort(f'Failed to create Earthdata session: {exc}') + + dest_dir = os.path.join(self.cycle_dir(), 'download', obs_name) + os.makedirs(dest_dir, exist_ok=True) + + downloaded = 0 + failed = 0 + + for granule in granules: + # Find the primary HTTPS data link for this granule. + data_url = None + for link in granule.get('links', []): + if DATA_LINK_REL in link.get('rel', '') and \ + link.get('href', '').startswith('https://'): + data_url = link['href'] + break + + if not data_url: + self.logger.warning( + f' No HTTPS data link found for granule: ' + f'{granule.get("id", "unknown")}') + continue + + filename = os.path.basename(data_url) + + if filename.endswith(SKIP_SUFFIXES): + continue + + dest_path = os.path.join(dest_dir, filename) + if os.path.exists(dest_path): + self.logger.info(f' Already exists, skipping: {filename}') + downloaded += 1 + continue + + try: + self._download_file(session, data_url, dest_path) + self.logger.info(f' Downloaded: {filename}') + downloaded += 1 + except requests.RequestException as exc: + self.logger.error(f' Failed to download {filename}: {exc}') + failed += 1 + + return downloaded, failed + def _create_earthdata_session(self) -> requests.Session: """Return an authenticated ``requests.Session`` for NASA Earthdata protected HTTPS endpoints (e.g. ``data.asdc.earthdata.nasa.gov``). From 8ecbe429ad83b1d942e0236525030d9e15cb4b86 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 16:39:24 -0400 Subject: [PATCH 06/23] Update config for tempo --- .../geos_cf/ingest_observations/tempo_no2_tropo.yaml | 5 +++++ src/swell/suites/ingest_obs/suite_config.py | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tempo_no2_tropo.yaml diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tempo_no2_tropo.yaml new file mode 100644 index 000000000..dfa4d69f6 --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tempo_no2_tropo.yaml @@ -0,0 +1,5 @@ +# TEMPO NO2 Tropospheric Column L2 — R2D2 ingestion configuration +# Source: output of ConvertObsToIoda, relative to the cycle work directory. + +acquisition_method: local +source: ioda/tempo_no2_tropo/tempo_no2_tropo_%Y%m%d%H.nc # in CYCLE_DIR diff --git a/src/swell/suites/ingest_obs/suite_config.py b/src/swell/suites/ingest_obs/suite_config.py index ae47fbbe3..15fbc4a42 100644 --- a/src/swell/suites/ingest_obs/suite_config.py +++ b/src/swell/suites/ingest_obs/suite_config.py @@ -53,8 +53,8 @@ class SuiteConfig(QuestionContainer, Enum): ], geos_cf=[ qd.window_length("PT6H"), - qd.obs_to_download(['omps_o3_nm_total']), - qd.obs_to_ingest(['omps_o3_nm_total']), + qd.obs_to_download(['tempo_no2_tropo']), + qd.obs_to_ingest(['tempo_no2_tropo']), qd.converter_path( "/discover/nobackup/projects/jcsda/s2127/maryamao/" "jedi-bundle/build-intel-1.9/bin/" From b8a95e18dd255bcf6bfe96f9bba60ab4afd7ecc8 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 16:55:43 -0400 Subject: [PATCH 07/23] Make NO2 uppercase --- .../geos_cf/convert_observations/tempo_no2_tropo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml index 62021f37d..2a5bd4a96 100644 --- a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml +++ b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml @@ -19,4 +19,4 @@ output_filename_template: "tempo_no2_tropo_%Y%m%d%H.nc" # Additional flags passed verbatim to the converter after -i and -o. extra_flags: -c: troposphere # retrieval layer: troposphere column - -v: no2 # variable: nitrogen dioxide + -v: NO2 # variable: nitrogen dioxide From e033bf7579eec6de0c2c34577b308ad064070c7e Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 17:24:33 -0400 Subject: [PATCH 08/23] Fix abort when not in dry run --- src/swell/tasks/convert_obs_to_ioda.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/swell/tasks/convert_obs_to_ioda.py b/src/swell/tasks/convert_obs_to_ioda.py index e3ba0adcf..0f4c97910 100644 --- a/src/swell/tasks/convert_obs_to_ioda.py +++ b/src/swell/tasks/convert_obs_to_ioda.py @@ -144,11 +144,9 @@ def _run_converter( input_files = sorted(glob.glob(input_pattern)) if not input_files: - msg = f'No input files found for {obs_name} in {download_dir}' - if dry_run: - self.logger.warning(msg) - else: - self.logger.abort(msg) + self.logger.warning( + f'No input files found for {obs_name} in {download_dir} - skipping') + return self.logger.info(f'Found {len(input_files)} input file(s)') From 4fc7c103d9b767d1dbec06668f39f36bc675c17c Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 17:31:45 -0400 Subject: [PATCH 09/23] Change default dates --- src/swell/suites/ingest_obs/suite_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/swell/suites/ingest_obs/suite_config.py b/src/swell/suites/ingest_obs/suite_config.py index 15fbc4a42..16baab51f 100644 --- a/src/swell/suites/ingest_obs/suite_config.py +++ b/src/swell/suites/ingest_obs/suite_config.py @@ -44,8 +44,8 @@ class SuiteConfig(QuestionContainer, Enum): list_name="ingest_obs_cf", questions=[ ingest_obs, - qd.start_cycle_point("2023-08-10T00:00:00Z"), - qd.final_cycle_point("2023-08-11T00:00:00Z"), + qd.start_cycle_point("2024-01-01T18:00:00Z"), + qd.final_cycle_point("2024-01-01T18:00:00Z"), qd.model_components(['geos_cf']), qd.runahead_limit("P5"), qd.download_convert_pipeline(True), From cf43c31c9096ade55d1bb96bfcae38513d1c94f5 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 15 May 2026 18:04:04 -0400 Subject: [PATCH 10/23] Handle corrupt files --- src/swell/tasks/download_obs.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index fca2785d4..5e12d4301 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -701,9 +701,26 @@ def _list_remote_dir(self, session: requests.Session, url: str) -> list[str]: def _download_file( self, session: requests.Session, url: str, dest_path: str ) -> None: - """Stream a remote file to ``dest_path`` in 1 MB chunks.""" + """Stream a remote file to ``dest_path`` in 1 MB chunks. + + Verifies the number of bytes written against the Content-Length + in the response header. If they do not match (truncated download), + the partial file is deleted and a requests.RequestException is raised + so the caller can record the failure and the file will be + re-attempted on the next run. + """ with session.get(url, stream=True, timeout=(5, 30)) as response: response.raise_for_status() + expected = int(response.headers.get('Content-Length', 0)) + written = 0 with open(dest_path, 'wb') as fh: for chunk in response.iter_content(chunk_size=1024 * 1024): fh.write(chunk) + written += len(chunk) + + if expected and written != expected: + os.remove(dest_path) + raise requests.RequestException( + f'Incomplete download: got {written} of {expected} bytes for ' + f'{os.path.basename(dest_path)}' + ) From 81fb55b48391ab8e03b5f0cd567ac43d2ea6d819 Mon Sep 17 00:00:00 2001 From: jeromebarre Date: Wed, 20 May 2026 15:04:00 -0400 Subject: [PATCH 11/23] add s3 public download and yamls for tropomi no2 and co --- .../tropomi_s5p_co_total.yaml | 26 ++++ .../tropomi_s5p_no2_tropo.yaml | 26 ++++ .../tropomi_s5p_co_total.yaml | 16 +++ .../tropomi_s5p_no2_tropo.yaml | 16 +++ .../tropomi_s5p_co_total.yaml | 5 + .../tropomi_s5p_no2_tropo.yaml | 5 + .../jedi/observation_ioda_names.yaml | 4 + src/swell/suites/ingest_obs/suite_config.py | 10 +- src/swell/tasks/download_obs.py | 125 +++++++++++++++++- 9 files changed, 227 insertions(+), 6 deletions(-) create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tropomi_s5p_co_total.yaml create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tropomi_s5p_no2_tropo.yaml create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tropomi_s5p_co_total.yaml create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tropomi_s5p_no2_tropo.yaml create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tropomi_s5p_co_total.yaml create mode 100644 src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tropomi_s5p_no2_tropo.yaml diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tropomi_s5p_co_total.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tropomi_s5p_co_total.yaml new file mode 100644 index 000000000..6fb148c53 --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tropomi_s5p_co_total.yaml @@ -0,0 +1,26 @@ +# TROPOMI S5P CO Total Column L2 — IODA converter configuration +# Converter: tropomi_no2_co_nc2ioda.py (must be available in `converter_path` +# or in jedi_bundle/build/bin/) +# +# Invocation (one call per cycle, all granules passed together): +# python3 /build/bin/tropomi_no2_co_nc2ioda.py +# -i ... +# -o /ioda/tropomi_s5p_co_total/tropomi_s5p_co_total_YYYYMMDDHH.nc +# -v co +# -c total +# -n 0.0 +# -q 0.5 + +# Name of the ioda-converters Python script. +converter_script: tropomi_no2_co_nc2ioda.py + +# Output filename template (must match the source pattern in +# ingest_observations/tropomi_s5p_co_total.yaml). +output_filename_template: "tropomi_s5p_co_total_%Y%m%d%H.nc" + +# Additional flags passed verbatim to the converter after -i and -o. +extra_flags: + -v: co # variable name + -c: total # column type + -n: 0.0 # no thinning + -q: 0.5 # quality value threshold diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tropomi_s5p_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tropomi_s5p_no2_tropo.yaml new file mode 100644 index 000000000..ddc17b1f5 --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tropomi_s5p_no2_tropo.yaml @@ -0,0 +1,26 @@ +# TROPOMI S5P NO2 Tropospheric Column L2 — IODA converter configuration +# Converter: tropomi_no2_co_nc2ioda.py (must be available in `converter_path` +# or in jedi_bundle/build/bin/) +# +# Invocation (one call per cycle, all granules passed together): +# python3 /build/bin/tropomi_no2_co_nc2ioda.py +# -i ... +# -o /ioda/tropomi_s5p_no2_tropo/tropomi_s5p_no2_tropo_YYYYMMDDHH.nc +# -v no2 +# -c troposphere +# -n 0.0 +# -q 0.75 + +# Name of the ioda-converters Python script. +converter_script: tropomi_no2_co_nc2ioda.py + +# Output filename template (must match the source pattern in +# ingest_observations/tropomi_s5p_no2_tropo.yaml). +output_filename_template: "tropomi_s5p_no2_tropo_%Y%m%d%H.nc" + +# Additional flags passed verbatim to the converter after -i and -o. +extra_flags: + -v: no2 # variable name + -c: troposphere # column type + -n: 0.0 # no thinning + -q: 0.75 # quality value threshold diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tropomi_s5p_co_total.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tropomi_s5p_co_total.yaml new file mode 100644 index 000000000..1cc2215bd --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tropomi_s5p_co_total.yaml @@ -0,0 +1,16 @@ +# TROPOMI S5P CO Total Column L2 — download configuration +# Instrument: TROPOMI on Sentinel-5P +# Product: NRTI L2__CO____ (Near Real-Time total column CO) +# Source: MEEO S3 public bucket (no credentials required) +# +# Files are named: +# S5P_NRTI_L2__CO_____YYYYMMDDTHHmmss_*.nc + +retrieval_method: s3_public +s3_source: 's3://meeo-s5p/NRTI/L2__CO____/YYYY/MM/DD' + +# Maximum duration of a single TROPOMI orbit granule. Used to extend the +# file search window backwards so that orbits starting before window_begin +# but containing data within the DA window are not missed. +# TROPOMI orbits are approximately 101 minutes; PT2H provides a safe margin. +max_orbit_duration: PT2H diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tropomi_s5p_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tropomi_s5p_no2_tropo.yaml new file mode 100644 index 000000000..21fe0f6e3 --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tropomi_s5p_no2_tropo.yaml @@ -0,0 +1,16 @@ +# TROPOMI S5P NO2 Tropospheric Column L2 — download configuration +# Instrument: TROPOMI on Sentinel-5P +# Product: NRTI L2__NO2___ (Near Real-Time tropospheric NO2) +# Source: MEEO S3 public bucket (no credentials required) +# +# Files are named: +# S5P_NRTI_L2__NO2____YYYYMMDDTHHmmss_*.nc + +retrieval_method: s3_public +s3_source: 's3://meeo-s5p/NRTI/L2__NO2___/YYYY/MM/DD' + +# Maximum duration of a single TROPOMI orbit granule. Used to extend the +# file search window backwards so that orbits starting before window_begin +# but containing data within the DA window are not missed. +# TROPOMI orbits are approximately 101 minutes; PT2H provides a safe margin. +max_orbit_duration: PT2H diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tropomi_s5p_co_total.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tropomi_s5p_co_total.yaml new file mode 100644 index 000000000..77d8acdc7 --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tropomi_s5p_co_total.yaml @@ -0,0 +1,5 @@ +# TROPOMI S5P CO Total Column L2 — R2D2 ingestion configuration +# Source: output of ConvertObsToIoda, relative to the cycle work directory. + +acquisition_method: local +source: ioda/tropomi_s5p_co_total/tropomi_s5p_co_total_%Y%m%d%H.nc # in CYCLE_DIR diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tropomi_s5p_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tropomi_s5p_no2_tropo.yaml new file mode 100644 index 000000000..4dc969a6d --- /dev/null +++ b/src/swell/configuration/jedi/interfaces/geos_cf/ingest_observations/tropomi_s5p_no2_tropo.yaml @@ -0,0 +1,5 @@ +# TROPOMI S5P NO2 Tropospheric Column L2 — R2D2 ingestion configuration +# Source: output of ConvertObsToIoda, relative to the cycle work directory. + +acquisition_method: local +source: ioda/tropomi_s5p_no2_tropo/tropomi_s5p_no2_tropo_%Y%m%d%H.nc # in CYCLE_DIR diff --git a/src/swell/configuration/jedi/observation_ioda_names.yaml b/src/swell/configuration/jedi/observation_ioda_names.yaml index a2a920ee8..b6950e69c 100644 --- a/src/swell/configuration/jedi/observation_ioda_names.yaml +++ b/src/swell/configuration/jedi/observation_ioda_names.yaml @@ -358,6 +358,10 @@ ioda instrument names: full name: TROPOMI tropospheric column CO inst type: retrieval provider : esa + - ioda name: tropomi_s5p_co_total + full name: TROPOMI S5P total column CO (NRTI L2) + inst type: retrieval + provider : esa - ioda name: omps_o3_nm_total full name: OMPS total column O3 (Nadir Mapper) inst type: retrieval diff --git a/src/swell/suites/ingest_obs/suite_config.py b/src/swell/suites/ingest_obs/suite_config.py index 16baab51f..85f44bff8 100644 --- a/src/swell/suites/ingest_obs/suite_config.py +++ b/src/swell/suites/ingest_obs/suite_config.py @@ -44,8 +44,8 @@ class SuiteConfig(QuestionContainer, Enum): list_name="ingest_obs_cf", questions=[ ingest_obs, - qd.start_cycle_point("2024-01-01T18:00:00Z"), - qd.final_cycle_point("2024-01-01T18:00:00Z"), + qd.start_cycle_point("2025-08-01T18:00:00Z"), + qd.final_cycle_point("2025-08-01T18:00:00Z"), qd.model_components(['geos_cf']), qd.runahead_limit("P5"), qd.download_convert_pipeline(True), @@ -53,8 +53,10 @@ class SuiteConfig(QuestionContainer, Enum): ], geos_cf=[ qd.window_length("PT6H"), - qd.obs_to_download(['tempo_no2_tropo']), - qd.obs_to_ingest(['tempo_no2_tropo']), + qd.obs_to_download(['omps_o3_nm_total', 'tempo_no2_tropo', 'tropomi_s5p_no2_tropo', + 'tropomi_s5p_co_total']), + qd.obs_to_ingest(['omps_o3_nm_total', 'tempo_no2_tropo', 'tropomi_s5p_no2_tropo', + 'tropomi_s5p_co_total']), qd.converter_path( "/discover/nobackup/projects/jcsda/s2127/maryamao/" "jedi-bundle/build-intel-1.9/bin/" diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index 5e12d4301..548d122f6 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -8,12 +8,15 @@ Task for downloading raw observation files from remote servers. Downloads native observation files (e.g. HDF5, NetCDF) from either HTTPS -servers such as NASA GES DISC, or from S3 buckets via the NASA Earthdata -Cumulus distribution service. +servers such as NASA GES DISC, from public AWS S3 buckets, or from +protected S3 buckets via the NASA Earthdata Cumulus distribution service. HTTPS authentication is handled via ~/.netrc (same mechanism used by wget/curl). +S3 (``retrieval_method: s3_public``) uses anonymous boto3 access for +publicly readable buckets (e.g. ``s3://meeo-s5p``). Requires ``boto3``. + S3 (``retrieval_method: s3_secure``) authentication is done by exchanging Earthdata credentials (read from ``~/.netrc`` for ``urs.earthdata.nasa.gov``) for temporary AWS credentials via the NASA ASDC @@ -47,6 +50,12 @@ class DownloadObs(taskBase): ``filename_pattern``, and streams files via HTTPS. Authentication uses ``~/.netrc``. + ``s3_public`` + Downloads files from a publicly readable S3 bucket path given by + ``s3_source`` (e.g. ``s3://meeo-s5p/NRTI/L2__NO2___/YYYY/MM/DD``). + No credentials required; uses anonymous boto3 access. + Requires ``boto3``. + ``s3_secure`` Downloads files from an S3 bucket path given by ``s3_source`` (e.g. ``s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD``). @@ -139,6 +148,10 @@ def _download_obs( """ retrieval_method = obs_config.get('retrieval_method', 'https') + if retrieval_method == 's3_public': + return self._download_obs_s3_public( + obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) + if retrieval_method == 's3_secure': return self._download_obs_s3_secure( obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) @@ -227,6 +240,114 @@ def _download_obs( return downloaded, failed + def _download_obs_s3_public( + self, + obs_config: dict, + obs_name: str, + window_begin_dto: datetime.datetime, + window_end_dto: datetime.datetime, + dry_run: bool, + ) -> tuple[int, int]: + """Download files for one observation type from a publicly readable + S3 bucket using anonymous (unsigned) boto3 access. + + The ``obs_config`` dict must contain: + + - ``s3_source``: S3 URI template with ``YYYY``, ``MM``, ``DD`` + placeholders, e.g. + ``s3://meeo-s5p/NRTI/L2__NO2___/YYYY/MM/DD``. + + Optional keys: + + - ``max_orbit_duration``: ISO-8601 duration; extends the search + window backwards (default ``PT0H``). + + Returns ``(n_downloaded, n_failed)``. + """ + try: + import boto3 + from botocore import UNSIGNED + from botocore.config import Config as BotocoreConfig + from botocore.exceptions import BotoCoreError, ClientError + except ImportError: + self.logger.abort( + "boto3 is required for 's3_public' retrieval but is not installed. " + 'Install it with: pip install boto3') + + s3_source_template = obs_config['s3_source'] + max_orbit_dur = isodate.parse_duration( + obs_config.get('max_orbit_duration', 'PT0H')) + + search_start = window_begin_dto - max_orbit_dur + search_end = window_end_dto + + dest_dir = os.path.join(self.cycle_dir(), 'download', obs_name) + if not dry_run: + os.makedirs(dest_dir, exist_ok=True) + + without_scheme = s3_source_template[len('s3://'):] + bucket, _, prefix_template = without_scheme.partition('/') + + if dry_run: + for day_date in self._day_slots(search_start, search_end): + prefix = self._resolve_s3_prefix(prefix_template, day_date) + self.logger.info( + f' [DRY RUN] Would list s3://{bucket}/{prefix}') + return 0, 0 + + s3_client = boto3.client( + 's3', + config=BotocoreConfig(signature_version=UNSIGNED)) + + downloaded = 0 + failed = 0 + + for day_date in self._day_slots(search_start, search_end): + prefix = self._resolve_s3_prefix(prefix_template, day_date) + self.logger.info(f' Listing s3://{bucket}/{prefix}') + + try: + paginator = s3_client.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) + keys = [ + obj['Key'] + for page in pages + for obj in page.get('Contents', []) + ] + except (BotoCoreError, ClientError) as exc: + self.logger.error( + f' Failed to list s3://{bucket}/{prefix}: {exc}') + failed += 1 + continue + + if not keys: + self.logger.info( + f' No objects found under s3://{bucket}/{prefix}') + continue + + self.logger.info( + f' {day_date}: {len(keys)} object(s) found') + + for key in keys: + filename = os.path.basename(key) + dest_path = os.path.join(dest_dir, filename) + + if os.path.exists(dest_path): + self.logger.info(f' Already exists, skipping: {filename}') + downloaded += 1 + continue + + try: + s3_client.download_file(bucket, key, dest_path) + self.logger.info(f' Downloaded: {filename}') + downloaded += 1 + except (BotoCoreError, ClientError) as exc: + self.logger.error( + f' Failed to download {filename}: {exc}') + failed += 1 + + return downloaded, failed + def _download_obs_s3_secure( self, obs_config: dict, From f2fd0fae834ddc80760fa7df77834bf0acad3d3f Mon Sep 17 00:00:00 2001 From: jeromebarre Date: Wed, 20 May 2026 15:11:56 -0400 Subject: [PATCH 12/23] remove tempo --- src/swell/suites/ingest_obs/suite_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/swell/suites/ingest_obs/suite_config.py b/src/swell/suites/ingest_obs/suite_config.py index 85f44bff8..9c569eda8 100644 --- a/src/swell/suites/ingest_obs/suite_config.py +++ b/src/swell/suites/ingest_obs/suite_config.py @@ -53,9 +53,9 @@ class SuiteConfig(QuestionContainer, Enum): ], geos_cf=[ qd.window_length("PT6H"), - qd.obs_to_download(['omps_o3_nm_total', 'tempo_no2_tropo', 'tropomi_s5p_no2_tropo', + qd.obs_to_download(['omps_o3_nm_total', 'tropomi_s5p_no2_tropo', 'tropomi_s5p_co_total']), - qd.obs_to_ingest(['omps_o3_nm_total', 'tempo_no2_tropo', 'tropomi_s5p_no2_tropo', + qd.obs_to_ingest(['omps_o3_nm_total', 'tropomi_s5p_no2_tropo', 'tropomi_s5p_co_total']), qd.converter_path( "/discover/nobackup/projects/jcsda/s2127/maryamao/" From edd2190127ec4008c581d929a6f61c03dacf720a Mon Sep 17 00:00:00 2001 From: jeromebarre Date: Wed, 20 May 2026 16:23:56 -0400 Subject: [PATCH 13/23] limit times --- src/swell/tasks/download_obs.py | 81 ++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 17 deletions(-) diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index 548d122f6..c799670fc 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -169,10 +169,13 @@ def _download_obs( max_orbit_dur_str = obs_config.get('max_orbit_duration', 'PT0H') max_orbit_dur = isodate.parse_duration(max_orbit_dur_str) - # Extend the search window backwards so we catch orbits that started - # before window_begin but still have data inside the window. + # Extend the search window on both ends: + # - backwards so we catch orbits that started before window_begin + # but still have data inside the window; + # - forwards so we catch orbits that started after window_end + # but whose end time overlaps the tail of the window. search_start = window_begin_dto - max_orbit_dur - search_end = window_end_dto + scan_end = window_end_dto + max_orbit_dur hour_slots = self._hour_slots(search_start, search_end) @@ -261,6 +264,12 @@ def _download_obs_s3_public( - ``max_orbit_duration``: ISO-8601 duration; extends the search window backwards (default ``PT0H``). + - ``filename_datetime_field``: 0-based index of the start-time + field when the filename is split on ``_``. If omitted the + method falls back to a regex search for the first + ``YYYYMMDDTHHMMSS`` token in the filename. + - ``filename_datetime_format``: strptime format for that field + (default ``%Y%m%dT%H%M%S``). Returns ``(n_downloaded, n_failed)``. """ @@ -281,6 +290,20 @@ def _download_obs_s3_public( search_start = window_begin_dto - max_orbit_dur search_end = window_end_dto + # Make bounds UTC-aware for comparison against parsed timestamps. + utc = datetime.timezone.utc + if search_start.tzinfo is None: + search_start = search_start.replace(tzinfo=utc) + if window_end_dto.tzinfo is None: + window_end_dto = window_end_dto.replace(tzinfo=utc) + search_end = window_end_dto + + # Granule start-time parsing config (same keys as _download_obs_s3_secure). + # Falls back to a regex search when filename_datetime_field is not set. + datetime_field = obs_config.get('filename_datetime_field', None) + datetime_fmt = obs_config.get('filename_datetime_format', '%Y%m%dT%H%M%S') + _dt_re = re.compile(r'\d{8}T\d{6}') + dest_dir = os.path.join(self.cycle_dir(), 'download', obs_name) if not dry_run: os.makedirs(dest_dir, exist_ok=True) @@ -332,6 +355,37 @@ def _download_obs_s3_public( filename = os.path.basename(key) dest_path = os.path.join(dest_dir, filename) + # ---------------------------------------------------------- + # Time-filter: skip granules outside [search_start, window_end]. + # ---------------------------------------------------------- + file_dt = None + if datetime_field is not None: + try: + parts = filename.split('_') + ts_str = parts[datetime_field] + fmt = datetime_fmt + # Normalise mismatched trailing 'Z'. + if fmt.endswith('Z') and not ts_str.endswith('Z'): + fmt = fmt[:-1] + elif ts_str.endswith('Z') and not fmt.endswith('Z'): + ts_str = ts_str[:-1] + file_dt = datetime.datetime.strptime( + ts_str, fmt).replace(tzinfo=utc) + except (IndexError, ValueError): + pass + if file_dt is None: + # Fallback: find the first YYYYMMDDTHHMMSS token. + m = _dt_re.search(filename) + if m: + try: + file_dt = datetime.datetime.strptime( + m.group(), '%Y%m%dT%H%M%S').replace(tzinfo=utc) + except ValueError: + pass + if file_dt is not None and not (search_start <= file_dt <= search_end): + self.logger.info(f' Skipping (outside window): {filename}') + continue + if os.path.exists(dest_path): self.logger.info(f' Already exists, skipping: {filename}') downloaded += 1 @@ -397,19 +451,18 @@ def _download_obs_s3_secure( datetime_field = obs_config.get('filename_datetime_field', 4) datetime_fmt = obs_config.get('filename_datetime_format', '%Y%m%dT%H%M%SZ') - # Extend window backwards for long-orbit instruments. + # Extend window backwards (catch early-starting orbits) and forwards + # (catch orbits that started after window_end but still overlap it). search_start = window_begin_dto - max_orbit_dur - search_end = window_end_dto + scan_end = window_end_dto + max_orbit_dur - # Make both bounds UTC-aware for comparison against parsed timestamps. + # Make bounds UTC-aware for comparison against parsed timestamps. utc = datetime.timezone.utc if search_start.tzinfo is None: search_start = search_start.replace(tzinfo=utc) - if window_begin_dto.tzinfo is None: - window_begin_dto = window_begin_dto.replace(tzinfo=utc) if window_end_dto.tzinfo is None: window_end_dto = window_end_dto.replace(tzinfo=utc) - search_end = window_end_dto + scan_end_utc = scan_end.replace(tzinfo=utc) if scan_end.tzinfo is None else scan_end dest_dir = os.path.join(self.cycle_dir(), 'download', obs_name) if not dry_run: @@ -420,7 +473,7 @@ def _download_obs_s3_secure( bucket, _, prefix_template = without_scheme.partition('/') if dry_run: - for day_date in self._day_slots(search_start, search_end): + for day_date in self._day_slots(search_start, scan_end_utc): prefix = self._resolve_s3_prefix(prefix_template, day_date) self.logger.info( f' [DRY RUN] Would list s3://{bucket}/{prefix}') @@ -443,13 +496,7 @@ def _download_obs_s3_secure( downloaded = 0 failed = 0 - for day_date in self._day_slots(search_start, search_end): - prefix = self._resolve_s3_prefix(prefix_template, day_date) - self.logger.info( - f' Listing s3://{bucket}/{prefix}') - - try: - paginator = s3_client.get_paginator('list_objects_v2') + for day_date in self._day_slots(search_start, scan_end_utc): pages = paginator.paginate(Bucket=bucket, Prefix=prefix) except (BotoCoreError, ClientError) as exc: self.logger.error( From 4f2c05c5ab8011b13479384f0fc9bd36b837d478 Mon Sep 17 00:00:00 2001 From: jeromebarre Date: Wed, 20 May 2026 16:46:17 -0400 Subject: [PATCH 14/23] Fix IndentationError in _download_obs_s3_secure introduced in previous commit --- src/swell/tasks/download_obs.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index c799670fc..29acc4cb3 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -169,13 +169,10 @@ def _download_obs( max_orbit_dur_str = obs_config.get('max_orbit_duration', 'PT0H') max_orbit_dur = isodate.parse_duration(max_orbit_dur_str) - # Extend the search window on both ends: - # - backwards so we catch orbits that started before window_begin - # but still have data inside the window; - # - forwards so we catch orbits that started after window_end - # but whose end time overlaps the tail of the window. + # Extend the search window backwards so we catch orbits that started + # before window_begin but still have data inside the window. search_start = window_begin_dto - max_orbit_dur - scan_end = window_end_dto + max_orbit_dur + search_end = window_end_dto hour_slots = self._hour_slots(search_start, search_end) @@ -451,18 +448,19 @@ def _download_obs_s3_secure( datetime_field = obs_config.get('filename_datetime_field', 4) datetime_fmt = obs_config.get('filename_datetime_format', '%Y%m%dT%H%M%SZ') - # Extend window backwards (catch early-starting orbits) and forwards - # (catch orbits that started after window_end but still overlap it). + # Extend window backwards for long-orbit instruments. search_start = window_begin_dto - max_orbit_dur - scan_end = window_end_dto + max_orbit_dur + search_end = window_end_dto - # Make bounds UTC-aware for comparison against parsed timestamps. + # Make both bounds UTC-aware for comparison against parsed timestamps. utc = datetime.timezone.utc if search_start.tzinfo is None: search_start = search_start.replace(tzinfo=utc) + if window_begin_dto.tzinfo is None: + window_begin_dto = window_begin_dto.replace(tzinfo=utc) if window_end_dto.tzinfo is None: window_end_dto = window_end_dto.replace(tzinfo=utc) - scan_end_utc = scan_end.replace(tzinfo=utc) if scan_end.tzinfo is None else scan_end + search_end = window_end_dto dest_dir = os.path.join(self.cycle_dir(), 'download', obs_name) if not dry_run: @@ -473,7 +471,7 @@ def _download_obs_s3_secure( bucket, _, prefix_template = without_scheme.partition('/') if dry_run: - for day_date in self._day_slots(search_start, scan_end_utc): + for day_date in self._day_slots(search_start, search_end): prefix = self._resolve_s3_prefix(prefix_template, day_date) self.logger.info( f' [DRY RUN] Would list s3://{bucket}/{prefix}') @@ -496,7 +494,13 @@ def _download_obs_s3_secure( downloaded = 0 failed = 0 - for day_date in self._day_slots(search_start, scan_end_utc): + for day_date in self._day_slots(search_start, search_end): + prefix = self._resolve_s3_prefix(prefix_template, day_date) + self.logger.info( + f' Listing s3://{bucket}/{prefix}') + + try: + paginator = s3_client.get_paginator('list_objects_v2') pages = paginator.paginate(Bucket=bucket, Prefix=prefix) except (BotoCoreError, ClientError) as exc: self.logger.error( From 943a1cbead918778abb7760cd6b66981e8709a8d Mon Sep 17 00:00:00 2001 From: ftgoktas Date: Thu, 21 May 2026 01:27:56 -0400 Subject: [PATCH 15/23] fix name --- .../geos_cf/convert_observations/tempo_no2_tropo.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml index 2a5bd4a96..62021f37d 100644 --- a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml +++ b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml @@ -19,4 +19,4 @@ output_filename_template: "tempo_no2_tropo_%Y%m%d%H.nc" # Additional flags passed verbatim to the converter after -i and -o. extra_flags: -c: troposphere # retrieval layer: troposphere column - -v: NO2 # variable: nitrogen dioxide + -v: no2 # variable: nitrogen dioxide From 76f2a2203066730e56bb466b1f31b4fb07c4a396 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Thu, 21 May 2026 10:32:01 -0400 Subject: [PATCH 16/23] Remove dead code --- src/swell/tasks/download_obs.py | 268 ++------------------------------ 1 file changed, 15 insertions(+), 253 deletions(-) diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index 5e12d4301..5be19a4b3 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -7,17 +7,18 @@ """ Task for downloading raw observation files from remote servers. -Downloads native observation files (e.g. HDF5, NetCDF) from either HTTPS -servers such as NASA GES DISC, or from S3 buckets via the NASA Earthdata -Cumulus distribution service. - -HTTPS authentication is handled via ~/.netrc (same mechanism used by -wget/curl). - -S3 (``retrieval_method: s3_secure``) authentication is done by exchanging -Earthdata credentials (read from ``~/.netrc`` for -``urs.earthdata.nasa.gov``) for temporary AWS credentials via the NASA ASDC -Cumulus S3 distribution endpoint. Requires ``boto3`` to be installed. +Two retrieval methods are supported, selected by ``retrieval_method`` in +the per-obs download YAML: + +``https`` (default) + Scrapes an HTML directory listing and streams files via HTTPS. + Authentication uses ``~/.netrc``. + +``cmr`` + Queries the NASA CMR API for granule URLs, then downloads via + authenticated HTTPS (Earthdata ``~/.netrc``). Use this for NASA ASDC + datasets such as TEMPO NO2 whose S3 bucket is restricted to AWS + ``us-west-2``. """ import base64 @@ -47,13 +48,9 @@ class DownloadObs(taskBase): ``filename_pattern``, and streams files via HTTPS. Authentication uses ``~/.netrc``. - ``s3_secure`` - Downloads files from an S3 bucket path given by ``s3_source`` - (e.g. ``s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD``). - Temporary AWS credentials are obtained automatically via the NASA - Earthdata Cumulus distribution endpoint using the Earthdata login - stored in ``~/.netrc`` for ``urs.earthdata.nasa.gov``. - Requires ``boto3``. + ``cmr`` + Queries the NASA CMR API for granule download URLs and fetches + each file via authenticated HTTPS (Earthdata ``~/.netrc``). Raw obs files are placed in ``/download//``. @@ -139,10 +136,6 @@ def _download_obs( """ retrieval_method = obs_config.get('retrieval_method', 'https') - if retrieval_method == 's3_secure': - return self._download_obs_s3_secure( - obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) - if retrieval_method == 'cmr': return self._download_obs_cmr( obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) @@ -227,162 +220,6 @@ def _download_obs( return downloaded, failed - def _download_obs_s3_secure( - self, - obs_config: dict, - obs_name: str, - window_begin_dto: datetime.datetime, - window_end_dto: datetime.datetime, - dry_run: bool, - ) -> tuple[int, int]: - """Download files for one observation type from an S3 bucket using - temporary credentials obtained via the NASA Earthdata Cumulus - distribution endpoint. - - The ``obs_config`` dict must contain: - - - ``s3_source``: S3 URI template with ``YYYY``, ``MM``, ``DD`` - placeholders, e.g. - ``s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD``. - - Optional keys: - - - ``max_orbit_duration``: ISO-8601 duration; extends the search - window backwards (default ``PT0H``). - - ``filename_datetime_field``: 0-based index of the start-time - field when the filename is split on ``_`` (default ``4``, - matching the TEMPO L2 naming convention). - - ``filename_datetime_format``: strptime format string for that - field (default ``%Y%m%dT%H%M%SZ``). - - Returns ``(n_downloaded, n_failed)``. - """ - try: - import boto3 - from botocore.exceptions import BotoCoreError, ClientError - except ImportError: - self.logger.abort( - "boto3 is required for 's3_secure' retrieval but is not installed. " - "Install it with: pip install boto3") - - s3_source_template = obs_config['s3_source'] - max_orbit_dur = isodate.parse_duration( - obs_config.get('max_orbit_duration', 'PT0H')) - - # Index (0-based) of the start-time token when the filename is - # split on '_'. TEMPO L2 filenames look like: - # TEMPO_NO2_L2_V03_20231015T180000Z_20231015T190000Z_S001G01.nc - # ^^^^ field 4 = start time - datetime_field = obs_config.get('filename_datetime_field', 4) - datetime_fmt = obs_config.get('filename_datetime_format', '%Y%m%dT%H%M%SZ') - - # Extend window backwards for long-orbit instruments. - search_start = window_begin_dto - max_orbit_dur - search_end = window_end_dto - - # Make both bounds UTC-aware for comparison against parsed timestamps. - utc = datetime.timezone.utc - if search_start.tzinfo is None: - search_start = search_start.replace(tzinfo=utc) - if window_begin_dto.tzinfo is None: - window_begin_dto = window_begin_dto.replace(tzinfo=utc) - if window_end_dto.tzinfo is None: - window_end_dto = window_end_dto.replace(tzinfo=utc) - search_end = window_end_dto - - dest_dir = os.path.join(self.cycle_dir(), 'download', obs_name) - if not dry_run: - os.makedirs(dest_dir, exist_ok=True) - - # Parse the S3 URI: s3://bucket/prefix/template - without_scheme = s3_source_template[len('s3://'):] - bucket, _, prefix_template = without_scheme.partition('/') - - if dry_run: - for day_date in self._day_slots(search_start, search_end): - prefix = self._resolve_s3_prefix(prefix_template, day_date) - self.logger.info( - f' [DRY RUN] Would list s3://{bucket}/{prefix}') - return 0, 0 - - # Obtain temporary AWS credentials via Earthdata Cumulus. - try: - creds = self._get_earthdata_s3_credentials() - except Exception as exc: - self.logger.abort(f'Failed to obtain Earthdata S3 credentials: {exc}') - - s3_client = boto3.client( - 's3', - aws_access_key_id=creds['accessKeyId'], - aws_secret_access_key=creds['secretAccessKey'], - aws_session_token=creds['sessionToken'], - region_name='us-west-2', - ) - - downloaded = 0 - failed = 0 - - for day_date in self._day_slots(search_start, search_end): - prefix = self._resolve_s3_prefix(prefix_template, day_date) - self.logger.info( - f' Listing s3://{bucket}/{prefix}') - - try: - paginator = s3_client.get_paginator('list_objects_v2') - pages = paginator.paginate(Bucket=bucket, Prefix=prefix) - except (BotoCoreError, ClientError) as exc: - self.logger.error( - f' Failed to list s3://{bucket}/{prefix}: {exc}') - failed += 1 - continue - - for page in pages: - for obj in page.get('Contents', []): - key = obj['Key'] - filename = os.path.basename(key) - - # Skip sidecar/metadata files. - if filename.endswith(('.met', '.dmrpp')): - continue - - # Parse the granule start time from the filename. - try: - parts = filename.split('_') - ts_str = parts[datetime_field] - # Ensure the format string ends with Z and the - # token also ends with Z (strip trailing chars if any). - if not ts_str.endswith('Z'): - ts_str = ts_str[:15] + 'Z' - file_dt = datetime.datetime.strptime( - ts_str, datetime_fmt).replace(tzinfo=utc) - except (IndexError, ValueError) as exc: - self.logger.warning( - f' Could not parse timestamp from {filename}: {exc}') - continue - - # Include files whose granule start time falls within - # the extended search window (search_start..window_end). - if not (search_start <= file_dt <= search_end): - continue - - dest_path = os.path.join(dest_dir, filename) - if os.path.exists(dest_path): - self.logger.info( - f' Already exists, skipping: {filename}') - downloaded += 1 - continue - - try: - s3_client.download_file(bucket, key, dest_path) - self.logger.info(f' Downloaded: {filename}') - downloaded += 1 - except (BotoCoreError, ClientError) as exc: - self.logger.error( - f' Failed to download {filename}: {exc}') - failed += 1 - - return downloaded, failed - def _download_obs_cmr( self, obs_config: dict, @@ -573,56 +410,6 @@ def _create_earthdata_session(self) -> requests.Session: self.logger.info('Obtained Earthdata session (accessToken cookie set)') return session - def _get_earthdata_s3_credentials(self) -> dict: - """Obtain temporary AWS credentials via the NASA Earthdata Cumulus - S3 distribution endpoint. - - Reads Earthdata username and password from ``~/.netrc`` for - ``urs.earthdata.nasa.gov`` and performs the OAuth redirect chain - against the ASDC Cumulus distribution URL, returning a dict with - keys ``accessKeyId``, ``secretAccessKey``, ``sessionToken``, and - ``expiration``. - - The credentials are valid for **1 hour** (AWS STS role-chaining - limit). For DA cycles whose download phase takes longer than 1 - hour a new call to this method will be required; callers should - catch ``botocore.exceptions.ClientError`` with error code - ``ExpiredTokenException`` and re-invoke this method. - - **In-region access only**: the ASDC S3 bucket resides in - ``us-west-2``. Direct S3 access only works from compute running - in the same AWS region. - - Raises: - ValueError: if ``~/.netrc`` has no entry for Earthdata or if - the credential exchange fails. - - Reference: - https://data.asdc.earthdata.nasa.gov/s3credentialsREADME - """ - CREDENTIALS_URL = 'https://data.asdc.earthdata.nasa.gov/s3credentials' - - # Reuse the shared OAuth exchange — session already has the - # accessToken cookie after this call. - session = self._create_earthdata_session() - - # Step 4 — Re-request the credentials URL; the session now carries - # the accessToken cookie and Cumulus returns JSON with - # temporary AWS keys. - r4 = session.get(CREDENTIALS_URL, timeout=30) - r4.raise_for_status() - creds = r4.json() - - required_keys = {'accessKeyId', 'secretAccessKey', 'sessionToken'} - if not required_keys.issubset(creds): - raise ValueError( - f'Unexpected credentials response (missing keys): {creds}') - - self.logger.info( - f'Obtained temporary S3 credentials from Earthdata Cumulus ' - f'(expires: {creds.get("expiration", "unknown")})') - return creds - # ------------------------------------------------------------------ # Slot/date helpers # ------------------------------------------------------------------ @@ -640,20 +427,6 @@ def _hour_slots( current += datetime.timedelta(hours=1) return slots - def _day_slots( - self, - search_start: datetime.datetime, - search_end: datetime.datetime, - ) -> list[datetime.date]: - """Return a list of unique date objects from search_start to search_end.""" - days = [] - current = search_start.date() - end_date = search_end.date() - while current <= end_date: - days.append(current) - current += datetime.timedelta(days=1) - return days - # ------------------------------------------------------------------ # Template resolution helpers # ------------------------------------------------------------------ @@ -677,17 +450,6 @@ def _resolve_filename(self, template: str, date: datetime.date, hour: int) -> st .replace('JJJ', f'{day_of_year:03d}') .replace('HH', f'{hour:02d}')) - def _resolve_s3_prefix(self, template: str, date: datetime.date) -> str: - """Substitute YYYY, MM, DD in an S3 prefix template. - - Handles both slash-separated (``YYYY/MM/DD``) and dot-separated - (``YYYY.MM.DD``) date formats that appear in NASA ASDC S3 paths. - """ - return (template - .replace('YYYY', f'{date.year:04d}') - .replace('MM', f'{date.month:02d}') - .replace('DD', f'{date.day:02d}')) - # ------------------------------------------------------------------ # HTTPS helpers # ------------------------------------------------------------------ From f219411aac710f6b230e01841f77ba26a4c78fd6 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Thu, 21 May 2026 10:51:40 -0400 Subject: [PATCH 17/23] Remove boto3 --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 413d03530..8440e2cec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,6 @@ dependencies = [ "numpy<2", "pandas>=1.4.0", "requests>=2.28.0", - "boto3>=1.26.0", "isodate>=0.5.4", "f90nml>=1.4.3", "questionary>=1.10.0", From 71a31fbbef0d1d0e29626e4e539d489a71b5a184 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 22 May 2026 18:34:18 -0400 Subject: [PATCH 18/23] Add documentation (#783) --- docs/examples/r2d2/ingest_obs.md | 80 +++++++++++++++---- .../convert_observations/tempo_no2_tropo.yaml | 3 +- .../tempo_no2_tropo.yaml | 8 +- 3 files changed, 70 insertions(+), 21 deletions(-) diff --git a/docs/examples/r2d2/ingest_obs.md b/docs/examples/r2d2/ingest_obs.md index 6f9260ab0..e2a3c779e 100644 --- a/docs/examples/r2d2/ingest_obs.md +++ b/docs/examples/r2d2/ingest_obs.md @@ -99,17 +99,25 @@ Same as the direct-copy pipeline, add an entry to `observation_ioda_names.yaml`. ### Step 3: Create the download YAML -Create `src/swell/configuration/jedi/interfaces//download_observations/my_obs.yaml`: +Create `src/swell/configuration/jedi/interfaces//download_observations/my_obs.yaml`. + +The `retrieval_method` key selects how files are discovered and downloaded. +Two methods are supported: + +--- + +#### `retrieval_method: https` (default) + +Scrapes an HTML directory listing from a remote HTTPS server and downloads +matching files. Use this for datasets served via GES DISC or similar +directory-listing endpoints. ```yaml +retrieval_method: https # optional — https is the default remote_host: https://snpp-omps.gesdisc.eosdis.nasa.gov remote_path_template: /data/SNPP_OMPS_Level2/OMPS_NPP_NMTO3_L2.2/YYYY/JJJ/ filename_pattern: OMPS-NPP_NMTO3-L2_v2.1_YYYYmMMDDtHH*.h5 auth_type: earthdata_token - -# How far before window_begin to extend the file search. -# Use this to capture orbit granules that started before the DA window -# but contain data within it. Set to the maximum granule/orbit duration. max_orbit_duration: PT2H ``` @@ -117,11 +125,41 @@ Supported placeholders in `remote_path_template` and `filename_pattern`: `YYYY`, `MM`, `DD`, `JJJ` (day-of-year), `HH`. Use `*` as a wildcard in `filename_pattern` where the exact timestamp is not known in advance. -With `auth_type` set to `earthdata_token`, Authentication uses `~/.netrc` and no tokens are stored in the config. -Follow the instructions on the NASA Earthdata website [here](https://urs.earthdata.nasa.gov/documentation/for_users/data_access/create_net_rc_file) to create -an account and set up the authentication. +--- + +#### `retrieval_method: cmr` + +Queries the [NASA CMR API](https://cmr.earthdata.nasa.gov) to discover +granule download URLs, then fetches each file over authenticated HTTPS using +Earthdata credentials from `~/.netrc`. + +Use this for NASA ASDC datasets (e.g. TEMPO NO2) where direct S3 access is +restricted to AWS `us-west-2`. CMR + HTTPS works from any network including +Discover and other HPC systems. + +```yaml +retrieval_method: cmr +cmr_short_name: TEMPO_NO2_L2 # CMR collection short name +cmr_version: V03 # collection version (optional) +max_orbit_duration: PT2H # extend search window backwards (optional, default PT0H) +``` + +| Key | Required | Description | +|-----|----------|-------------| +| `cmr_short_name` | Yes | CMR collection short name (e.g. `TEMPO_NO2_L2`, `OMPS_NPP_NMTO3_L2`) | +| `cmr_version` | No | Collection version string (e.g. `V03`). Omit to match all versions. | +| `max_orbit_duration` | No | ISO-8601 duration; extends the CMR temporal query backwards so granules starting just before `window_begin` are not missed. Default `PT0H`. | + +CMR search requires no authentication, but you'll need Earthdata account. You can register at [urs.earthdata.nasa.gov](https://urs.earthdata.nasa.gov) if you don't have one. File downloads use the four-step +Earthdata OAuth flow with credentials from `~/.netrc`: + +``` +machine urs.earthdata.nasa.gov login password +``` + + -`DownloadObs` task places files in `/download//`. +`DownloadObs` places files in `/download//`. ### Step 4: Create the converter YAML @@ -210,22 +248,29 @@ acquisition_method: 'cp' cp_source: '/discover/nobackup/projects/gmao/soca/obs/ioda/ocean/adt_cryosat2n/YYYY/MM/ioda-obs-YYYYMMDDHH-adt_cryosat2n.nc' ``` -### `ingest_obs_cf` — OMPS-NM ozone from NASA GES DISC +### `ingest_obs_cf` — TEMPO NO2 and OMPS-NM ozone ```bash swell create ingest_obs_cf swell launch ``` -Downloads OMPS-NM HDF5 granules from GES DISC, converts them to IODA format, and +Downloads raw granules from remote servers, converts them to IODA format, and ingests the result into R2D2. Uses the `DownloadObs → ConvertObsToIoda → IngestObs` pipeline. Requires Earthdata credentials in `~/.netrc`. +Currently configured observations: + +| Observation | Retrieval method | Source | +|-------------|-----------------|--------| +| `tempo_no2_tropo` | `cmr` | NASA CMR + ASDC HTTPS (`TEMPO_NO2_L2 V03`) | +| `omps_o3_nm_total` | `https` | NASA GES DISC | + Suite configuration: ```python qd.download_convert_pipeline(True) -qd.obs_to_download(['omps_o3_nm_total']) -qd.obs_to_ingest(['omps_o3_nm_total']) +qd.obs_to_download(['tempo_no2_tropo']) +qd.obs_to_ingest(['tempo_no2_tropo']) qd.converter_path('/path/to/jedi-bundle/build/bin/') qd.window_length("PT6H") qd.dry_run(False) @@ -271,7 +316,9 @@ For each cycle: 1. **`DownloadObs`** reads `obs_to_download`, then for each obs: - Reads `download_observations/.yaml` - Extends the DA window backwards by `max_orbit_duration` to avoid missing partial orbits - - Walks through each hour slot, lists the remote directory, and downloads matching files to `/download//` + - **`https`**: walks through each hour slot, lists the remote directory, and downloads matching files + - **`cmr`**: queries the NASA CMR API for granule URLs covering the window, then downloads each file via authenticated HTTPS + - Files are placed in `/download//` 2. **`ConvertObsToIoda`** reads `obs_to_download`, then for each obs: - Reads `convert_observations/.yaml` @@ -312,3 +359,8 @@ in the format: ``` machine urs.earthdata.nasa.gov login password ``` + +File permissions must be `600` (`chmod 600 ~/.netrc`). For ASDC datasets +(`retrieval_method: cmr`), also confirm that the ASDC DAAC application is +approved in your Earthdata account (urs.earthdata.nasa.gov -> Applications -> +Authorized Apps). diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml index 62021f37d..d0cb30dd3 100644 --- a/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml +++ b/src/swell/configuration/jedi/interfaces/geos_cf/convert_observations/tempo_no2_tropo.yaml @@ -12,8 +12,7 @@ # Name of the ioda-converters Python script. converter_script: tempo_nc2ioda.py -# Output filename template (must match the source pattern in -# ingest_observations/tempo_no2_tropo.yaml once that is created). +# Output filename template (must match `source` in ingest_observations/tempo_no2_tropo.yaml). output_filename_template: "tempo_no2_tropo_%Y%m%d%H.nc" # Additional flags passed verbatim to the converter after -i and -o. diff --git a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml index 75cd205a5..143a46689 100644 --- a/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml +++ b/src/swell/configuration/jedi/interfaces/geos_cf/download_observations/tempo_no2_tropo.yaml @@ -5,13 +5,11 @@ # (Earthdata authentication required in ~/.netrc for # urs.earthdata.nasa.gov) # -# Files are discovered via the NASA CMR API and downloaded from: +# Granules are discovered via the NASA CMR API and downloaded from: # https://data.asdc.earthdata.nasa.gov/TEMPO/TEMPO_NO2_L2_V03/... # -# Note: for workflows running inside AWS us-west-2, direct S3 access is also -# available (faster, no CMR query required): -# retrieval_method: s3_secure -# s3_source: "s3://asdc-prod-protected/TEMPO/TEMPO_NO2_L2_V03/YYYY.MM.DD" +# Direct S3 access (s3://asdc-prod-protected/...) is blocked outside +# AWS us-west-2 by ASDC IAM policy; CMR + HTTPS works from any network. retrieval_method: cmr cmr_short_name: TEMPO_NO2_L2 From 0511f9b47cd198187e1c3ed2cbdc3556927134c3 Mon Sep 17 00:00:00 2001 From: Furkan Goktas Date: Fri, 22 May 2026 18:43:33 -0400 Subject: [PATCH 19/23] Fix pycode (#783) --- docs/examples/r2d2/ingest_obs.md | 10 +++++++++- src/swell/tasks/download_obs.py | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/docs/examples/r2d2/ingest_obs.md b/docs/examples/r2d2/ingest_obs.md index e2a3c779e..8d62649ae 100644 --- a/docs/examples/r2d2/ingest_obs.md +++ b/docs/examples/r2d2/ingest_obs.md @@ -118,6 +118,10 @@ remote_host: https://snpp-omps.gesdisc.eosdis.nasa.gov remote_path_template: /data/SNPP_OMPS_Level2/OMPS_NPP_NMTO3_L2.2/YYYY/JJJ/ filename_pattern: OMPS-NPP_NMTO3-L2_v2.1_YYYYmMMDDtHH*.h5 auth_type: earthdata_token + +# How far before window_begin to extend the file search. +# Use this to capture orbit granules that started before the DA window +# but contain data within it. Set to the maximum granule/orbit duration. max_orbit_duration: PT2H ``` @@ -125,6 +129,10 @@ Supported placeholders in `remote_path_template` and `filename_pattern`: `YYYY`, `MM`, `DD`, `JJJ` (day-of-year), `HH`. Use `*` as a wildcard in `filename_pattern` where the exact timestamp is not known in advance. +- With `auth_type` set to `earthdata_token`, authentication uses `~/.netrc` and no tokens are stored in the config. +- Follow the instructions on the NASA Earthdata website [here](https://urs.earthdata.nasa.gov/documentation/for_users/data_access/create_net_rc_file) to create +- an account and set up the authentication. + --- #### `retrieval_method: cmr` @@ -159,7 +167,7 @@ machine urs.earthdata.nasa.gov login password -`DownloadObs` places files in `/download//`. +`DownloadObs` task places files in `/download//`. ### Step 4: Create the converter YAML diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index 5be19a4b3..6f3f79b97 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -465,8 +465,8 @@ def _download_file( ) -> None: """Stream a remote file to ``dest_path`` in 1 MB chunks. - Verifies the number of bytes written against the Content-Length - in the response header. If they do not match (truncated download), + Verifies the number of bytes written against the Content-Length + in the response header. If they do not match (truncated download), the partial file is deleted and a requests.RequestException is raised so the caller can record the failure and the file will be re-attempted on the next run. From 22a90f6f07981c89fabce2e9631e286db9bad96a Mon Sep 17 00:00:00 2001 From: jeromebarre Date: Wed, 3 Jun 2026 16:50:09 -0400 Subject: [PATCH 20/23] reintroduce changes removed with merge --- src/swell/tasks/download_obs.py | 161 ++++++++++++++++++++++++++++++-- 1 file changed, 154 insertions(+), 7 deletions(-) diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index 108875394..e69e3b677 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -7,12 +7,20 @@ """ Task for downloading raw observation files from remote servers. -Two retrieval methods are supported, selected by ``retrieval_method`` in -the per-obs download YAML: +Downloads native observation files (e.g. HDF5, NetCDF) from either HTTPS +servers such as NASA GES DISC, from public AWS S3 buckets, or from +protected S3 buckets via the NASA Earthdata Cumulus distribution service. -``https`` (default) - Scrapes an HTML directory listing and streams files via HTTPS. - Authentication uses ``~/.netrc``. +HTTPS authentication is handled via ~/.netrc (same mechanism used by +wget/curl). + +S3 (``retrieval_method: s3_public``) uses anonymous boto3 access for +publicly readable buckets (e.g. ``s3://meeo-s5p``). Requires ``boto3``. + +S3 (``retrieval_method: s3_secure``) authentication is done by exchanging +Earthdata credentials (read from ``~/.netrc`` for +``urs.earthdata.nasa.gov``) for temporary AWS credentials via the NASA ASDC +Cumulus S3 distribution endpoint. Requires ``boto3`` to be installed. ``cmr`` Queries the NASA CMR API for granule URLs, then downloads via @@ -136,6 +144,14 @@ def _download_obs( """ retrieval_method = obs_config.get('retrieval_method', 'https') + if retrieval_method == 's3_public': + return self._download_obs_s3_public( + obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) + + if retrieval_method == 's3_secure': + return self._download_obs_s3_secure( + obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) + if retrieval_method == 'cmr': return self._download_obs_cmr( obs_config, obs_name, window_begin_dto, window_end_dto, dry_run) @@ -220,8 +236,114 @@ def _download_obs( return downloaded, failed -======= ->>>>>>> feature/tempo-no2-ingest + def _download_obs_s3_public( + self, + obs_config: dict, + obs_name: str, + window_begin_dto: datetime.datetime, + window_end_dto: datetime.datetime, + dry_run: bool, + ) -> tuple[int, int]: + """Download files for one observation type from a publicly readable + S3 bucket using anonymous (unsigned) boto3 access. + + The ``obs_config`` dict must contain: + + - ``s3_source``: S3 URI template with ``YYYY``, ``MM``, ``DD`` + placeholders, e.g. + ``s3://meeo-s5p/NRTI/L2__NO2___/YYYY/MM/DD``. + + Optional keys: + + - ``max_orbit_duration``: ISO-8601 duration; extends the search + window backwards (default ``PT0H``). + + Returns ``(n_downloaded, n_failed)``. + """ + try: + import boto3 + from botocore import UNSIGNED + from botocore.config import Config as BotocoreConfig + from botocore.exceptions import BotoCoreError, ClientError + except ImportError: + self.logger.abort( + "boto3 is required for 's3_public' retrieval but is not installed. " + 'Install it with: pip install boto3') + + s3_source_template = obs_config['s3_source'] + max_orbit_dur = isodate.parse_duration( + obs_config.get('max_orbit_duration', 'PT0H')) + + search_start = window_begin_dto - max_orbit_dur + search_end = window_end_dto + + dest_dir = os.path.join(self.cycle_dir(), 'download', obs_name) + if not dry_run: + os.makedirs(dest_dir, exist_ok=True) + + without_scheme = s3_source_template[len('s3://'):] + bucket, _, prefix_template = without_scheme.partition('/') + + if dry_run: + for day_date in self._day_slots(search_start, search_end): + prefix = self._resolve_s3_prefix(prefix_template, day_date) + self.logger.info( + f' [DRY RUN] Would list s3://{bucket}/{prefix}') + return 0, 0 + + s3_client = boto3.client( + 's3', + config=BotocoreConfig(signature_version=UNSIGNED)) + + downloaded = 0 + failed = 0 + + for day_date in self._day_slots(search_start, search_end): + prefix = self._resolve_s3_prefix(prefix_template, day_date) + self.logger.info(f' Listing s3://{bucket}/{prefix}') + + try: + paginator = s3_client.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) + keys = [ + obj['Key'] + for page in pages + for obj in page.get('Contents', []) + ] + except (BotoCoreError, ClientError) as exc: + self.logger.error( + f' Failed to list s3://{bucket}/{prefix}: {exc}') + failed += 1 + continue + + if not keys: + self.logger.info( + f' No objects found under s3://{bucket}/{prefix}') + continue + + self.logger.info( + f' {day_date}: {len(keys)} object(s) found') + + for key in keys: + filename = os.path.basename(key) + dest_path = os.path.join(dest_dir, filename) + + if os.path.exists(dest_path): + self.logger.info(f' Already exists, skipping: {filename}') + downloaded += 1 + continue + + try: + s3_client.download_file(bucket, key, dest_path) + self.logger.info(f' Downloaded: {filename}') + downloaded += 1 + except (BotoCoreError, ClientError) as exc: + self.logger.error( + f' Failed to download {filename}: {exc}') + failed += 1 + + return downloaded, failed + def _download_obs_cmr( self, obs_config: dict, @@ -416,6 +538,20 @@ def _create_earthdata_session(self) -> requests.Session: # Slot/date helpers # ------------------------------------------------------------------ + def _day_slots( + self, + search_start: datetime.datetime, + search_end: datetime.datetime, + ) -> list[datetime.date]: + """Return a list of unique date objects from search_start to search_end.""" + days = [] + current = search_start.date() + end_date = search_end.date() + while current <= end_date: + days.append(current) + current += datetime.timedelta(days=1) + return days + def _hour_slots( self, search_start: datetime.datetime, @@ -433,6 +569,17 @@ def _hour_slots( # Template resolution helpers # ------------------------------------------------------------------ + def _resolve_s3_prefix(self, template: str, date: datetime.date) -> str: + """Substitute YYYY, MM, DD in an S3 prefix template. + + Handles both slash-separated (``YYYY/MM/DD``) and dot-separated + (``YYYY.MM.DD``) date formats that appear in NASA ASDC S3 paths. + """ + return (template + .replace('YYYY', f'{date.year:04d}') + .replace('MM', f'{date.month:02d}') + .replace('DD', f'{date.day:02d}')) + def _resolve_path(self, template: str, date: datetime.date) -> str: """Substitute YYYY, MM, DD, JJJ placeholders in a path template.""" day_of_year = date.timetuple().tm_yday From 7c94dc4a6814a44faa22685bd67878c013f746db Mon Sep 17 00:00:00 2001 From: jeromebarre Date: Wed, 3 Jun 2026 16:55:45 -0400 Subject: [PATCH 21/23] remove useless method --- src/swell/tasks/download_obs.py | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/swell/tasks/download_obs.py b/src/swell/tasks/download_obs.py index e69e3b677..730696b75 100644 --- a/src/swell/tasks/download_obs.py +++ b/src/swell/tasks/download_obs.py @@ -286,7 +286,7 @@ def _download_obs_s3_public( if dry_run: for day_date in self._day_slots(search_start, search_end): - prefix = self._resolve_s3_prefix(prefix_template, day_date) + prefix = self._resolve_path(prefix_template, day_date) self.logger.info( f' [DRY RUN] Would list s3://{bucket}/{prefix}') return 0, 0 @@ -299,7 +299,7 @@ def _download_obs_s3_public( failed = 0 for day_date in self._day_slots(search_start, search_end): - prefix = self._resolve_s3_prefix(prefix_template, day_date) + prefix = self._resolve_path(prefix_template, day_date) self.logger.info(f' Listing s3://{bucket}/{prefix}') try: @@ -569,17 +569,6 @@ def _hour_slots( # Template resolution helpers # ------------------------------------------------------------------ - def _resolve_s3_prefix(self, template: str, date: datetime.date) -> str: - """Substitute YYYY, MM, DD in an S3 prefix template. - - Handles both slash-separated (``YYYY/MM/DD``) and dot-separated - (``YYYY.MM.DD``) date formats that appear in NASA ASDC S3 paths. - """ - return (template - .replace('YYYY', f'{date.year:04d}') - .replace('MM', f'{date.month:02d}') - .replace('DD', f'{date.day:02d}')) - def _resolve_path(self, template: str, date: datetime.date) -> str: """Substitute YYYY, MM, DD, JJJ placeholders in a path template.""" day_of_year = date.timetuple().tm_yday From 3205542f76c9c9efb626cf7b7b11750b6891c72c Mon Sep 17 00:00:00 2001 From: jeromebarre Date: Wed, 3 Jun 2026 18:24:37 -0400 Subject: [PATCH 22/23] tropomi tropo column doesnt exists --- src/swell/configuration/jedi/observation_ioda_names.yaml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/swell/configuration/jedi/observation_ioda_names.yaml b/src/swell/configuration/jedi/observation_ioda_names.yaml index b6950e69c..bb750ce4e 100644 --- a/src/swell/configuration/jedi/observation_ioda_names.yaml +++ b/src/swell/configuration/jedi/observation_ioda_names.yaml @@ -354,12 +354,8 @@ ioda instrument names: full name: TROPOMI tropospheric column NO2 inst type: retrieval provider : esa - - ioda name: tropomi_s5p_co_tropo - full name: TROPOMI tropospheric column CO - inst type: retrieval - provider : esa - ioda name: tropomi_s5p_co_total - full name: TROPOMI S5P total column CO (NRTI L2) + full name: TROPOMI S5P total column CO inst type: retrieval provider : esa - ioda name: omps_o3_nm_total From 7f2a90bf4bde503e7683f333e30e45811b36e1b0 Mon Sep 17 00:00:00 2001 From: jeromebarre Date: Wed, 3 Jun 2026 19:57:04 -0400 Subject: [PATCH 23/23] revert original date in config --- src/swell/suites/ingest_obs/suite_config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/swell/suites/ingest_obs/suite_config.py b/src/swell/suites/ingest_obs/suite_config.py index 70e9d78e2..a8f7addb4 100644 --- a/src/swell/suites/ingest_obs/suite_config.py +++ b/src/swell/suites/ingest_obs/suite_config.py @@ -44,8 +44,8 @@ class SuiteConfig(QuestionContainer, Enum): list_name="ingest_obs_cf", questions=[ ingest_obs, - qd.start_cycle_point("2025-08-01T18:00:00Z"), - qd.final_cycle_point("2025-08-01T18:00:00Z"), + qd.start_cycle_point("2024-01-01T18:00:00Z"), + qd.final_cycle_point("2024-01-01T18:00:00Z"), qd.model_components(['geos_cf']), qd.runahead_limit("P5"), qd.download_convert_pipeline(True),