feat: added automated ingestion workflow #113
Merged
45 commits
a6f5c9c Merge branch 'release/0.1.7'
3d74559 Merge branch 'release/0.1.8'
187d54e Merge branch 'release/0.1.9'
b2d8c1a Merge branch 'release/0.1.10'
0b24b2c feat: added automated ingestion workflow
ea9ea9d Merge branch 'develop' into Automated_Ingestion_Workflow
Mesh-ach 7b5ce17 added pdp ingestion files
Mesh-ach 34297b2 Merge branch 'develop' into Automated_Ingestion_Workflow
Mesh-ach 649ef40 feat: moved reusueable components into helper.py
Mesh-ach 716ac97 fix: initialized spark
Mesh-ach d3e0f74 fix: initialized spark
Mesh-ach 0a3ae3a fix: notebook docs
Mesh-ach 42357b7 fix: notebook docs
Mesh-ach 9b137e9 feat: refactor
ca4ef23 refactor: moved helpers to src code
8ef2dea refactor: putting hardcoded constants in constants file
318c65b refactor: moved functions from notebook 1 into modules
953d350 Add comprehensive test class for databricksify_inst_name
87089b1 fix: import
fd557e9 fix: tests & style
f6197da fix: ruff
180e4e9 fix: style
0b2d742 fix: type check with overrides for paramiko
4c061a7 fix: type check
e64280c chore: move test file from notebooks/ to ingestion/
1d30428 Move databricksify_inst_name and reverse_databricksify_inst_name to u…
a97fdbb Move databricksify tests to test_databricks.py
c83c2bd fix: style
d3fee8e style
2973bab fix: tests
54c979b fix: added env differentiation
Mesh-ach dc68cf9 fix: env path
Mesh-ach 1e81e88 fix: mandatory databricks parameters
Mesh-ach 7af1c5b fix: claude review
Mesh-ach 92c78ba fix: claude review
Mesh-ach c929ff1 fix: claude review
Mesh-ach 616cee4 fix: added edvise imports
Mesh-ach e35ceba fix: added verify parameter to download_sftp_atomic function
Mesh-ach d5c1e6e fix: issues with snakecase normalizations that claude flagged
Mesh-ach 12c5287 fix: resolved dbutils issues
Mesh-ach e17d074 fix: resolved dbutils issues
Mesh-ach 271de56 fix: resolved gcp_config.ysml
Mesh-ach bd8f442 fix: resolved ruff issues
Mesh-ach 44c77bf fix: resolved ruff issues
Mesh-ach 25af159 fix: added valuable output statements for workflow
notebooks/nsc_sftp_automated_data_ingestion/01_sftp_receive_scan.ipynb (278 additions, 0 deletions)
@@ -0,0 +1,278 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "7dc0a9a7-1db8-42b9-b0c4-07946f392d5e",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "# 1. Connect to SFTP and scan the receive folder for files.\n",
    "# 2. Upsert unseen files into `ingestion_manifest` with status=NEW.\n",
    "# 3. Download and stage NEW + unqueued files locally and upsert them into `pending_ingest_queue`.\n",
    "\n",
    "# Recent refactor:\n",
    "# - SFTP helpers moved to `helper.py` (`connect_sftp`, `list_receive_files`, `download_sftp_atomic`).\n",
    "# - `list_receive_files` now takes `source_system` explicitly (no hidden notebook globals).\n",
    "\n",
    "# Constraints:\n",
    "# - SFTP connection required\n",
    "# - NO API calls\n",
    "# - Stages files locally (TMP_DIR) + writes to Delta tables only\n",
    "\n",
    "# Inputs:\n",
    "# - SFTP folder: `./receive`\n",
    "\n",
    "# Outputs:\n",
    "# - `staging_sst_01.default.ingestion_manifest`\n",
    "# - `staging_sst_01.default.pending_ingest_queue`\n",
    "# - Staged files written to: `/tmp/pdp_sftp_stage`\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "cbd7694b-4b30-41bf-9371-259479726010",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "%pip install paramiko python-box pyyaml\n",
    "%pip install git+https://github.com/datakind/edvise.git@Automated_Ingestion_Workflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "b9ae88af-ade1-4df0-86a0-34d6d492383a",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "%restart_python"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "5888f9b8-bda7-4586-9f9f-ed1243d878de",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "import logging\n",
    "import yaml\n",
    "from box import Box\n",
    "from databricks.connect import DatabricksSession\n",
    "\n",
    "from edvise.utils.sftp import connect_sftp, list_receive_files\n",
    "from edvise.ingestion.constants import (\n",
    "    QUEUE_TABLE_PATH,\n",
    "    SFTP_REMOTE_FOLDER,\n",
    "    SFTP_SOURCE_SYSTEM,\n",
    ")\n",
    "from edvise.ingestion.nsc_sftp_helpers import (\n",
    "    build_listing_df,\n",
    "    download_new_files_and_queue,\n",
    "    ensure_manifest_and_queue_tables,\n",
    "    get_files_to_queue,\n",
    "    upsert_new_to_manifest,\n",
    ")\n",
    "\n",
    "try:\n",
    "    dbutils  # noqa: F821\n",
    "except NameError:\n",
    "    from unittest.mock import MagicMock\n",
    "\n",
    "    dbutils = MagicMock()\n",
    "spark = DatabricksSession.builder.getOrCreate()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "61b348b8-aa62-4b5a-9442-d48d52e1a862",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "logging.basicConfig(\n",
    "    level=logging.INFO,\n",
    "    format=\"%(asctime)s - %(name)s - %(levelname)s - %(message)s\",\n",
    ")\n",
    "logger = logging.getLogger(__name__)\n",
    "\n",
    "asset_scope = \"nsc-sftp-asset\"\n",
    "\n",
    "host = dbutils.secrets.get(scope=asset_scope, key=\"nsc-sftp-host\")\n",
    "user = dbutils.secrets.get(scope=asset_scope, key=\"nsc-sftp-user\")\n",
    "password = dbutils.secrets.get(scope=asset_scope, key=\"nsc-sftp-password\")\n",
    "\n",
    "logger.info(\"SFTP secured assets loaded successfully.\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {
      "byteLimit": 2048000,
      "rowLimit": 10000
     },
     "inputWidgets": {},
     "nuid": "80968f66-5082-49ca-b03f-b3a1ef0bb908",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [],
   "source": [
    "transport = None\n",
    "sftp = None\n",
    "\n",
    "try:\n",
    "    ensure_manifest_and_queue_tables(spark)\n",
    "\n",
    "    transport, sftp = connect_sftp(host, user, password)\n",
    "    logger.info(\n",
    "        f\"Connected to SFTP host={host} and scanning folder={SFTP_REMOTE_FOLDER}\"\n",
    "    )\n",
    "\n",
    "    file_rows = list_receive_files(sftp, SFTP_REMOTE_FOLDER, SFTP_SOURCE_SYSTEM)\n",
    "    if not file_rows:\n",
    "        logger.info(\n",
    "            f\"No files found in SFTP folder: {SFTP_REMOTE_FOLDER}. Exiting (no-op).\"\n",
    "        )\n",
    "        dbutils.notebook.exit(\"NO_FILES\")\n",
    "\n",
    "    df_listing = build_listing_df(spark, file_rows)\n",
    "\n",
    "    # 1) Ensure everything on SFTP is at least represented in manifest as NEW\n",
    "    upsert_new_to_manifest(spark, df_listing)\n",
    "\n",
    "    # 2) Queue anything that is still NEW and not already queued\n",
    "    df_to_queue = get_files_to_queue(spark, df_listing)\n",
    "\n",
    "    to_queue_count = df_to_queue.count()\n",
    "    if to_queue_count == 0:\n",
    "        logger.info(\n",
    "            \"No files to queue: either nothing is NEW, or NEW files are already queued. Exiting (no-op).\"\n",
    "        )\n",
    "        dbutils.notebook.exit(\"QUEUED_FILES=0\")\n",
    "\n",
    "    logger.info(\n",
    "        f\"Queuing {to_queue_count} NEW-unqueued file(s) to {QUEUE_TABLE_PATH} and staging locally.\"\n",
    "    )\n",
    "    queued_count = download_new_files_and_queue(spark, sftp, df_to_queue, logger)\n",
    "\n",
    "    logger.info(\n",
    "        f\"Queued {queued_count} file(s) for downstream processing in {QUEUE_TABLE_PATH}.\"\n",
    "    )\n",
    "    dbutils.notebook.exit(f\"QUEUED_FILES={queued_count}\")\n",
    "\n",
    "finally:\n",
    "    try:\n",
    "        if sftp is not None:\n",
    "            sftp.close()\n",
    "    except Exception:\n",
    "        pass\n",
    "    try:\n",
    "        if transport is not None:\n",
    "            transport.close()\n",
    "    except Exception:\n",
    "        pass"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 0,
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "cellMetadata": {},
     "inputWidgets": {},
     "nuid": "edff98e1-0862-4e41-8c35-bd5fb6647136",
     "showTitle": false,
     "tableResultSettingsMap": {},
     "title": ""
    }
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "computePreferences": null,
   "dashboards": [],
   "environmentMetadata": {
    "base_environment": "",
    "environment_version": "4"
   },
   "inputWidgetPreferences": null,
   "language": "python",
   "notebookMetadata": {
    "pythonIndentUnit": 4
   },
   "notebookName": "01_sftp_receive_scan",
   "widgets": {}
  },
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
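The notebook's scan-and-queue flow (upsert unseen files into the manifest as NEW, then queue files that are NEW and not yet queued) can be sketched with plain dictionaries. These `upsert_new_to_manifest` and `get_files_to_queue` stand-ins are assumptions for illustration only; the real edvise helpers operate on Spark DataFrames and Delta tables, not in-memory structures.

```python
# In-memory sketch of the manifest/queue logic; NOT the actual edvise
# implementation, which works against Delta tables via Spark.

def upsert_new_to_manifest(manifest: dict, listing: list) -> None:
    """Add unseen files to the manifest with status=NEW; never overwrite
    an existing status (setdefault makes the upsert idempotent)."""
    for path in listing:
        manifest.setdefault(path, "NEW")

def get_files_to_queue(manifest: dict, queue: set, listing: list) -> list:
    """Return files that are still NEW and not already queued."""
    return [p for p in listing if manifest.get(p) == "NEW" and p not in queue]

manifest = {"a.csv": "PROCESSED"}   # a.csv was handled on a previous run
queue = {"b.csv"}                   # b.csv is already awaiting ingestion
listing = ["a.csv", "b.csv", "c.csv"]

upsert_new_to_manifest(manifest, listing)        # b.csv, c.csv become NEW
to_queue = get_files_to_queue(manifest, queue, listing)
print(to_queue)  # ['c.csv'] -- a.csv is processed, b.csv already queued
```

Because the upsert never downgrades an existing status, re-running the scan against the same SFTP listing is a no-op, which is what lets the notebook exit early with `QUEUED_FILES=0`.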
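The commit log mentions a `download_sftp_atomic` helper (later given a `verify` parameter), but its body is not shown in this diff. The following is therefore only a hypothetical sketch of the usual atomic-staging pattern: write to a temporary file in the destination directory, then rename into place, so a crashed download never leaves a partial file under the final name.

```python
import os
import tempfile

def download_atomic(read_remote, dest_path: str) -> None:
    """Hypothetical atomic-download pattern (not the edvise implementation):
    stage to a temp file next to the destination, then os.replace() into
    place, which is an atomic rename on POSIX filesystems."""
    dest_dir = os.path.dirname(dest_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=dest_dir, suffix=".part")
    try:
        with os.fdopen(fd, "wb") as tmp:
            for chunk in read_remote():  # read_remote yields bytes chunks
                tmp.write(chunk)
        os.replace(tmp_path, dest_path)  # atomic; no partial file is visible
    except BaseException:
        os.unlink(tmp_path)              # clean up the staging file on failure
        raise

# Usage with a fake "remote" source standing in for an SFTP read:
def fake_remote():
    yield b"hello, "
    yield b"sftp"

dest = os.path.join(tempfile.gettempdir(), "demo_staged.csv")
download_atomic(fake_remote, dest)
with open(dest, "rb") as f:
    print(f.read())  # b'hello, sftp'
```

Staging into the same directory as the destination matters: `os.replace` is only atomic within a single filesystem, so a temp file on a different mount would fall back to a non-atomic copy.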
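The notebook's final cell closes the SFTP channel and transport in nested try/finally blocks so that neither close can mask an earlier error. The same guarantee can be packaged as a context manager; this is a sketch that assumes, as the notebook does, that `connect_sftp` returns a `(transport, sftp)` pair.

```python
from contextlib import contextmanager

@contextmanager
def sftp_session(connect):
    """Yield an SFTP client, closing channel and transport even on error.
    `connect` is assumed to return a (transport, sftp) pair, as the
    notebook's connect_sftp helper does."""
    transport, sftp = connect()
    try:
        yield sftp
    finally:
        for closeable in (sftp, transport):
            try:
                if closeable is not None:
                    closeable.close()
            except Exception:
                pass  # best-effort cleanup, mirroring the notebook's finally

# Demonstration with stub objects instead of a live paramiko connection:
class Stub:
    closed = False
    def close(self):
        self.closed = True

t, s = Stub(), Stub()
with sftp_session(lambda: (t, s)) as client:
    assert client is s
print(t.closed, s.closed)  # True True
```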