Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a6f5c9c
Merge branch 'release/0.1.7'
Dec 3, 2025
3d74559
Merge branch 'release/0.1.8'
Dec 11, 2025
187d54e
Merge branch 'release/0.1.9'
Jan 20, 2026
b2d8c1a
Merge branch 'release/0.1.10'
Feb 2, 2026
0b24b2c
feat: added automated ingestion workflow
Feb 19, 2026
ea9ea9d
Merge branch 'develop' into Automated_Ingestion_Workflow
Mesh-ach Feb 19, 2026
7b5ce17
added pdp ingestion files
Mesh-ach Feb 19, 2026
34297b2
Merge branch 'develop' into Automated_Ingestion_Workflow
Mesh-ach Feb 23, 2026
649ef40
feat: moved reusable components into helper.py
Mesh-ach Feb 23, 2026
716ac97
fix: initialized spark
Mesh-ach Feb 23, 2026
d3e0f74
fix: initialized spark
Mesh-ach Feb 24, 2026
0a3ae3a
fix: notebook docs
Mesh-ach Feb 24, 2026
42357b7
fix: notebook docs
Mesh-ach Feb 24, 2026
9b137e9
feat: refactor
Feb 24, 2026
ca4ef23
refactor: moved helpers to src code
Feb 24, 2026
8ef2dea
refactor: putting hardcoded constants in constants file
Feb 24, 2026
318c65b
refactor: moved functions from notebook 1 into modules
Feb 24, 2026
953d350
Add comprehensive test class for databricksify_inst_name
Feb 24, 2026
87089b1
fix: import
Feb 24, 2026
fd557e9
fix: tests & style
Feb 24, 2026
f6197da
fix: ruff
Feb 24, 2026
180e4e9
fix: style
Feb 24, 2026
0b2d742
fix: type check with overrides for paramiko
Feb 24, 2026
4c061a7
fix: type check
Feb 24, 2026
e64280c
chore: move test file from notebooks/ to ingestion/
Feb 24, 2026
1d30428
Move databricksify_inst_name and reverse_databricksify_inst_name to u…
Feb 24, 2026
a97fdbb
Move databricksify tests to test_databricks.py
Feb 24, 2026
c83c2bd
fix: style
Feb 24, 2026
d3fee8e
style
Feb 24, 2026
2973bab
fix: tests
Feb 24, 2026
54c979b
fix: added env differentiation
Mesh-ach Feb 26, 2026
dc68cf9
fix: env path
Mesh-ach Feb 26, 2026
1e81e88
fix: mandatory databricks parameters
Mesh-ach Feb 26, 2026
7af1c5b
fix: claude review
Mesh-ach Feb 26, 2026
92c78ba
fix: claude review
Mesh-ach Feb 26, 2026
c929ff1
fix: claude review
Mesh-ach Feb 26, 2026
616cee4
fix: added edvise imports
Mesh-ach Feb 26, 2026
e35ceba
fix: added verify parameter to download_sftp_atomic function
Mesh-ach Feb 26, 2026
d5c1e6e
fix: issues with snakecase normalizations that claude flagged
Mesh-ach Feb 26, 2026
12c5287
fix: resolved dbutils issues
Mesh-ach Feb 26, 2026
e17d074
fix: resolved dbutils issues
Mesh-ach Feb 26, 2026
271de56
fix: resolved gcp_config.yaml
Mesh-ach Feb 26, 2026
bd8f442
fix: resolved ruff issues
Mesh-ach Feb 26, 2026
44c77bf
fix: resolved ruff issues
Mesh-ach Feb 26, 2026
25af159
fix: added valuable output statements for workflow
Mesh-ach Feb 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -212,3 +212,5 @@ __marimo__/

# Claude
.claude/
*notebooks/nsc_sftp_automated_data_ingestion/tmp/
*notebooks/nsc_sftp_automated_data_ingestion/gcp_config.yaml
377 changes: 377 additions & 0 deletions notebooks/nsc_sftp_automated_data_ingestion/01_sftp_receive_scan.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,377 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "7dc0a9a7-1db8-42b9-b0c4-07946f392d5e",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"# 1. Connect to SFTP and scan the receive folder for files.\n",
"# 2. Upsert unseen files into `ingestion_manifest` with status=NEW.\n",
"# 3. Download and stage NEW + unqueued files locally and upsert them into `pending_ingest_queue`.\n",
"\n",
"# Recent refactor:\n",
"# - SFTP helpers moved to `helper.py` (`connect_sftp`, `list_receive_files`, `download_sftp_atomic`).\n",
"# - `list_receive_files` now takes `source_system` explicitly (no hidden notebook globals).\n",
"\n",
"# Constraints:\n",
"# - SFTP connection required\n",
"# - NO API calls\n",
"# - Stages files to UC volume (CATALOG.default.tmp) + writes to Delta tables only\n",
"\n",
"# Inputs:\n",
"# - SFTP folder: `./receive`\n",
"# - Required workflow parameters (exact SFTP file names):\n",
"# - `cohort_file_name`\n",
"# - `course_file_name`\n",
"# - Both file names must end with the same 14-digit file stamp (e.g. `..._YYYYMMDDHHMMSS.csv`).\n",
"\n",
"# Outputs:\n",
"# - `CATALOG.default.ingestion_manifest`\n",
"# - `CATALOG.default.pending_ingest_queue`\n",
"# - Staged files written to UC Volume: `CATALOG.default.tmp` (path `/Volumes/<CATALOG>/default/tmp`)\n"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "cbd7694b-4b30-41bf-9371-259479726010",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"%pip install paramiko python-box pyyaml\n",
"%pip install git+https://github.com/datakind/edvise.git@Automated_Ingestion_Workflow"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "b9ae88af-ade1-4df0-86a0-34d6d492383a",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"%restart_python"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "5888f9b8-bda7-4586-9f9f-ed1243d878de",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"import logging\n",
"import os\n",
"import re\n",
"from databricks.connect import DatabricksSession\n",
"from pyspark.sql import functions as F\n",
"\n",
"from edvise.utils.sftp import connect_sftp, list_receive_files\n",
"from edvise.ingestion.constants import (\n",
" MANIFEST_TABLE_PATH,\n",
" QUEUE_TABLE_PATH,\n",
" SFTP_REMOTE_FOLDER,\n",
" SFTP_SOURCE_SYSTEM,\n",
" SFTP_TMP_DIR,\n",
")\n",
"from edvise.ingestion.nsc_sftp_helpers import (\n",
" build_listing_df,\n",
" download_new_files_and_queue,\n",
" ensure_manifest_and_queue_tables,\n",
" get_files_to_queue,\n",
" upsert_new_to_manifest,\n",
")\n",
"from edvise import utils\n",
"\n",
"try:\n",
" dbutils # noqa: F821\n",
"except NameError:\n",
" from unittest.mock import MagicMock\n",
"\n",
" dbutils = MagicMock()\n",
"spark = DatabricksSession.builder.getOrCreate()"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "61b348b8-aa62-4b5a-9442-d48d52e1a862",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"logging.basicConfig(\n",
" level=logging.INFO,\n",
" format=\"%(asctime)s - %(name)s - %(levelname)s - %(message)s\",\n",
")\n",
"logger = logging.getLogger(__name__)\n",
"\n",
"asset_scope = \"nsc-sftp-asset\"\n",
"\n",
"host = dbutils.secrets.get(scope=asset_scope, key=\"nsc-sftp-host\")\n",
"user = dbutils.secrets.get(scope=asset_scope, key=\"nsc-sftp-user\")\n",
"password = dbutils.secrets.get(scope=asset_scope, key=\"nsc-sftp-password\")\n",
"\n",
"cohort_file_name = utils.databricks.get_db_widget_param(\"cohort_file_name\", default=\"\")\n",
"course_file_name = utils.databricks.get_db_widget_param(\"course_file_name\", default=\"\")\n",
"cohort_file_name = str(cohort_file_name).strip()\n",
"course_file_name = str(course_file_name).strip()\n",
"if not cohort_file_name or not course_file_name:\n",
" raise ValueError(\n",
" \"Missing required workflow parameters: cohort_file_name and course_file_name. \"\n",
" \"Pass them as Databricks job base parameters.\"\n",
" )\n",
"\n",
"\n",
"def _extract_file_stamp(file_name: str) -> str:\n",
" base = os.path.basename(file_name)\n",
" m = re.search(r\"_(\\d{14})(?:\\.[^.]+)?$\", base)\n",
" if not m:\n",
" raise ValueError(\n",
" \"Expected file name to end with a 14-digit file stamp, e.g. \"\n",
" \"'..._YYYYMMDDHHMMSS.csv'. Got: \"\n",
" f\"{file_name}\"\n",
" )\n",
" return m.group(1)\n",
"\n",
"\n",
"cohort_stamp = _extract_file_stamp(cohort_file_name)\n",
"course_stamp = _extract_file_stamp(course_file_name)\n",
"if cohort_stamp != course_stamp:\n",
" raise ValueError(\n",
" \"cohort_file_name and course_file_name must end with the same file stamp. \"\n",
" f\"Got cohort stamp={cohort_stamp}, course stamp={course_stamp}.\"\n",
" )\n",
"logger.info(f\"Validated file stamp: {cohort_stamp}\")\n",
"logger.info(f\"Staging to UC volume path: {SFTP_TMP_DIR}\")\n",
"logger.info(\n",
" \"Manual file selection enabled: \"\n",
" f\"cohort_file_name={cohort_file_name}, course_file_name={course_file_name}\"\n",
")\n",
"\n",
"logger.info(\"SFTP secured assets loaded successfully.\")"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "80968f66-5082-49ca-b03f-b3a1ef0bb908",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"transport = None\n",
"sftp = None\n",
"\n",
"try:\n",
" ensure_manifest_and_queue_tables(spark)\n",
"\n",
" transport, sftp = connect_sftp(host, user, password)\n",
" logger.info(\n",
" f\"Connected to SFTP host={host} and scanning folder={SFTP_REMOTE_FOLDER}\"\n",
" )\n",
"\n",
" file_rows_all = list_receive_files(sftp, SFTP_REMOTE_FOLDER, SFTP_SOURCE_SYSTEM)\n",
" if not file_rows_all:\n",
" logger.info(\n",
" f\"No files found in SFTP folder: {SFTP_REMOTE_FOLDER}. Exiting (no-op).\"\n",
" )\n",
" dbutils.notebook.exit(\"NO_FILES\")\n",
"\n",
" requested_names = {cohort_file_name, course_file_name}\n",
" logger.info(\n",
" f\"Found {len(file_rows_all)} file(s) on SFTP in folder={SFTP_REMOTE_FOLDER}; \"\n",
" f\"requested={sorted(requested_names)}\"\n",
" )\n",
" file_rows = [r for r in file_rows_all if r.get(\"file_name\") in requested_names]\n",
"\n",
" found_names = {r.get(\"file_name\") for r in file_rows}\n",
" missing_names = sorted(requested_names - found_names)\n",
" if missing_names:\n",
" available = sorted({r.get(\"file_name\") for r in file_rows_all})\n",
" preview = available[:25]\n",
" raise FileNotFoundError(\n",
" f\"Requested file(s) not found on SFTP in folder '{SFTP_REMOTE_FOLDER}': {missing_names}. \"\n",
" f\"Available file count={len(available)}; first 25={preview}\"\n",
" )\n",
"\n",
" for r in file_rows:\n",
" logger.info(\n",
" f\"Selected SFTP file: name={r.get('file_name')} size={r.get('file_size')} \"\n",
" f\"modified={r.get('file_modified_time')}\"\n",
" )\n",
"\n",
" df_listing = build_listing_df(spark, file_rows)\n",
" fingerprints = [\n",
" r[\"file_fingerprint\"] for r in df_listing.select(\"file_fingerprint\").collect()\n",
" ]\n",
"\n",
" logger.info(\"SFTP listing (selected files):\")\n",
" df_listing.select(\n",
" \"file_name\", \"file_size\", \"file_modified_time\", \"file_fingerprint\"\n",
" ).show(truncate=False)\n",
"\n",
" # 1) Ensure everything on SFTP is at least represented in manifest as NEW\n",
" upsert_new_to_manifest(spark, df_listing)\n",
"\n",
" logger.info(\"Manifest rows (selected files):\")\n",
" spark.table(MANIFEST_TABLE_PATH).where(\n",
" F.col(\"file_fingerprint\").isin(fingerprints)\n",
" ).select(\n",
" \"file_name\",\n",
" \"file_fingerprint\",\n",
" \"status\",\n",
" \"processed_at\",\n",
" \"error_message\",\n",
" ).show(truncate=False)\n",
"\n",
" # 2) Queue anything that is still NEW and not already queued\n",
" df_to_queue = get_files_to_queue(spark, df_listing)\n",
"\n",
" to_queue_count = df_to_queue.count()\n",
" if to_queue_count == 0:\n",
" logger.info(\n",
" \"No files to queue: either nothing is NEW, or NEW files are already queued. Exiting (no-op).\"\n",
" )\n",
" dbutils.notebook.exit(\"QUEUED_FILES=0\")\n",
"\n",
" logger.info(\"Files eligible to queue:\")\n",
" df_to_queue.select(\n",
" \"file_name\", \"file_size\", \"file_modified_time\", \"file_fingerprint\"\n",
" ).show(truncate=False)\n",
"\n",
" logger.info(\n",
" f\"Queuing {to_queue_count} NEW-unqueued file(s) to {QUEUE_TABLE_PATH} and staging to UC volume.\"\n",
" )\n",
" queued_count = download_new_files_and_queue(spark, sftp, df_to_queue, logger)\n",
"\n",
" logger.info(\"Queue rows (selected files):\")\n",
" spark.table(QUEUE_TABLE_PATH).where(\n",
" F.col(\"file_fingerprint\").isin(fingerprints)\n",
" ).select(\"file_name\", \"file_fingerprint\", \"local_tmp_path\", \"queued_at\").show(\n",
" truncate=False\n",
" )\n",
"\n",
" logger.info(\n",
" f\"Queued {queued_count} file(s) for downstream processing in {QUEUE_TABLE_PATH}.\"\n",
" )\n",
" dbutils.notebook.exit(f\"QUEUED_FILES={queued_count}\")\n",
"\n",
"finally:\n",
" try:\n",
" if sftp is not None:\n",
" sftp.close()\n",
" except Exception:\n",
" pass\n",
" try:\n",
" if transport is not None:\n",
" transport.close()\n",
" except Exception:\n",
" pass"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "edff98e1-0862-4e41-8c35-bd5fb6647136",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": []
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"computePreferences": null,
"dashboards": [],
"environmentMetadata": {
"base_environment": "",
"environment_version": "4"
},
"inputWidgetPreferences": null,
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 4
},
"notebookName": "01_sftp_receive_scan",
"widgets": {}
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Loading
Loading