From 36bee2fb472baa6cb7850a0ff790e51df7f6925b Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Fri, 17 Apr 2026 13:37:02 -0300 Subject: [PATCH 1/3] Reopen Signed-off-by: Laura Couto From ddbe23ed9f3f54e8e18a60bfcbf1da25fb988858 Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Fri, 17 Apr 2026 13:45:15 -0300 Subject: [PATCH 2/3] Add conf file Signed-off-by: Laura Couto --- .../conf/base/catalog_evaluation_opik.yml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 kedro-agentic-workflows/conf/base/catalog_evaluation_opik.yml diff --git a/kedro-agentic-workflows/conf/base/catalog_evaluation_opik.yml b/kedro-agentic-workflows/conf/base/catalog_evaluation_opik.yml new file mode 100644 index 0000000..053e329 --- /dev/null +++ b/kedro-agentic-workflows/conf/base/catalog_evaluation_opik.yml @@ -0,0 +1,27 @@ +opik_intent_evaluation_data: + type: kedro_agentic_workflows.datasets.opik_evaluation_dataset.OpikEvaluationDataset + dataset_name: evaluations/intent_agent_evaluation + filepath: data/intent_detection/evaluation/intent_evaluation.json + sync_policy: local + credentials: opik_credentials + +opik_intent_judge_llm: + type: langchain.ChatOpenAIDataset + kwargs: + model: "gpt-4o" + temperature: 0.0 + credentials: openai + +opik_judge_llm_prompt: + type: kedro_datasets_experimental.opik.OpikPromptDataset + filepath: data/intent_detection/evaluation/prompts/intent_judge_llm_prompt_opik.json + prompt_name: "opik_judge_llm_prompt" + prompt_type: "chat" + credentials: opik_credentials + sync_policy: local + mode: langchain + +opik_client: + type: kedro_datasets_experimental.opik.OpikTraceDataset + credentials: opik_credentials + mode: langchain From 3a39a3aa02a2d91909a0d7daa7f7c90798c95a90 Mon Sep 17 00:00:00 2001 From: Laura Couto Date: Fri, 17 Apr 2026 13:46:08 -0300 Subject: [PATCH 3/3] Reapply "OpikEvaluationDataset test branch" This reverts commit dec5228fd114f6fad66ac50afa804858c42f73e7. 
Signed-off-by: Laura Couto --- kedro-agentic-workflows/README.md | 34 +- .../conf/base/catalog_evaluation_opik.yml | 2 +- .../conf/base/catalog_genai_config.yml | 30 +- .../conf/base/parameters.yml | 2 +- .../evaluation/intent_evaluation.json | 2 +- .../prompts/intent_judge_llm_prompt_opik.json | 10 + .../e2e_opik_evaluation_dataset.ipynb | 974 ++++++++++++++++++ .../datasets/opik_evaluation_dataset.py | 604 +++++++++++ .../pipeline_registry.py | 2 + .../pipelines/__init__.py | 0 .../pipelines/intent_detection/agent.py | 22 +- .../__init__.py | 1 + .../intent_detection_evaluation_opik/nodes.py | 147 +++ .../pipeline.py | 58 ++ .../src/kedro_agentic_workflows/settings.py | 1 - 15 files changed, 1865 insertions(+), 24 deletions(-) create mode 100644 kedro-agentic-workflows/data/intent_detection/evaluation/prompts/intent_judge_llm_prompt_opik.json create mode 100644 kedro-agentic-workflows/notebooks/e2e_opik_evaluation_dataset.ipynb create mode 100644 kedro-agentic-workflows/src/kedro_agentic_workflows/datasets/opik_evaluation_dataset.py create mode 100644 kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/__init__.py create mode 100644 kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/__init__.py create mode 100644 kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/nodes.py create mode 100644 kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/pipeline.py diff --git a/kedro-agentic-workflows/README.md b/kedro-agentic-workflows/README.md index 9c22f56..a8920e0 100644 --- a/kedro-agentic-workflows/README.md +++ b/kedro-agentic-workflows/README.md @@ -199,17 +199,17 @@ For more details see `conf/base/catalog_genai_config.yml` and [docs for `Langfus ## ๐Ÿงช Evaluation -The project includes an **intent detection evaluation pipeline** that runs the intent classification agent against a labeled dataset and scores results using two 
evaluators. It integrates with [Langfuse](https://langfuse.com/) so results, traces, and scores are visible in the Langfuse UI. +The project includes an **intent detection evaluation pipeline** that runs the intent classification agent against a labeled dataset and scores results using two evaluators. It supports two observability backends โ€” [Langfuse](https://langfuse.com/) and [Opik](https://www.comet.com/opik) โ€” so results, traces, and scores are visible in either platform's UI. ### How it works The pipeline: -1. Loads the **evaluation dataset** (labeled question/intent pairs) from a local JSON file and syncs it to Langfuse. -2. Runs the **Intent Detection Agent** on each item, recording traces as Langfuse observations linked to the prompt and model. +1. Loads the **evaluation dataset** (labeled question/intent pairs) from a local JSON file and syncs it to the remote platform. +2. Runs the **Intent Detection Agent** on each item, recording traces linked to the prompt version and model. 3. Scores each result with two evaluators: - **Intent accuracy** โ€” binary match between predicted and expected intent. - **Reason quality** โ€” LLM-as-a-judge score (1โ€“5) evaluating the reasoning behind the prediction. -4. Publishes the experiment to Langfuse with all scores, traces, and metadata. +4. Publishes the experiment with all scores, traces, and metadata. ### `LangfuseEvaluationDataset` @@ -237,14 +237,40 @@ intent_evaluation_data: created_by: kedro ``` +### `OpikEvaluationDataset` + +Alternatively, the evaluation dataset can be managed by `OpikEvaluationDataset` from `kedro-datasets-experimental`, which bridges a local JSON/YAML file with a remote Opik dataset. It supports the same two sync policies as the Langfuse variant: + +- **`local`** โ€” the local file is the source of truth; `load()` re-inserts all local items to remote on every sync via Opik's upsert-by-ID API. 
Items with a UUID v7 `id` update the existing remote row in-place; items without a UUID v7 `id` create a new remote row on every sync. +- **`remote`** โ€” the remote Opik dataset is the source of truth. `load()` fetches remote as-is with no local file interaction; `save()` inserts items to remote only. + +Catalog entry (`conf/base/catalog_evaluation_opik.yml`): + +```yaml +opik_intent_evaluation_data: + type: kedro_datasets_experimental.opik.OpikEvaluationDataset + dataset_name: evaluations/intent_agent_evaluation + filepath: data/intent_detection/evaluation/intent_evaluation.json + sync_policy: local + credentials: opik_credentials +``` + ### Running the evaluation pipeline +**With Langfuse:** ```bash kedro run -p intent_detection_evaluation --params intent_prompt_version=1,model_name=gpt-4o ``` The `intent_prompt_version` and `model_name` parameters are used to name the experiment in Langfuse (e.g., `intent_prompt_v1_model_gpt-4o`), making it easy to compare runs across prompt iterations and models. +**With Opik:** +```bash +kedro run -p intent_detection_evaluation_opik --params model_name=gpt-4o +``` + +The experiment name is derived automatically from the active prompt name, its commit hash, and the model (e.g., `intent_eval_prompt_intent-classifier_abc12345_model_gpt-4o`), making it easy to compare runs across prompt versions and models in the Opik UI. 
+ ### Evaluation data Stored at `data/intent_detection/evaluation/intent_evaluation.json` โ€” a JSON array of labeled items: diff --git a/kedro-agentic-workflows/conf/base/catalog_evaluation_opik.yml b/kedro-agentic-workflows/conf/base/catalog_evaluation_opik.yml index 053e329..63c4621 100644 --- a/kedro-agentic-workflows/conf/base/catalog_evaluation_opik.yml +++ b/kedro-agentic-workflows/conf/base/catalog_evaluation_opik.yml @@ -1,5 +1,5 @@ opik_intent_evaluation_data: - type: kedro_agentic_workflows.datasets.opik_evaluation_dataset.OpikEvaluationDataset + type: kedro_datasets_experimental.opik.OpikEvaluationDataset dataset_name: evaluations/intent_agent_evaluation filepath: data/intent_detection/evaluation/intent_evaluation.json sync_policy: local diff --git a/kedro-agentic-workflows/conf/base/catalog_genai_config.yml b/kedro-agentic-workflows/conf/base/catalog_genai_config.yml index 5fb13b3..06dce73 100644 --- a/kedro-agentic-workflows/conf/base/catalog_genai_config.yml +++ b/kedro-agentic-workflows/conf/base/catalog_genai_config.yml @@ -35,23 +35,25 @@ response_prompt: dataset: type: yaml.YAMLDataset -intent_prompt: - type: kedro_datasets_experimental.langfuse.LangfusePromptDataset - filepath: data/intent_detection/prompts/intent_prompt_langfuse.json - prompt_name: "intent-classifier" - prompt_type: "chat" - credentials: langfuse_credentials - sync_policy: local # local|remote|strict - mode: sdk # langchain|sdk - load_args: - version: ${runtime_params:intent_prompt_version, 1} - # intent_prompt: -# type: kedro_datasets_experimental.opik.OpikPromptDataset -# filepath: data/intent_detection/prompts/intent_prompt_opik.json +# type: kedro_datasets_experimental.langfuse.LangfusePromptDataset +# filepath: data/intent_detection/prompts/intent_prompt_langfuse.json # prompt_name: "intent-classifier" # prompt_type: "chat" -# credentials: opik_credentials +# credentials: langfuse_credentials +# sync_policy: local # local|remote|strict +# mode: sdk # langchain|sdk +# 
load_args: +# version: ${runtime_params:intent_prompt_version, 1} + +intent_prompt: + type: kedro_datasets_experimental.opik.OpikPromptDataset + filepath: data/intent_detection/prompts/intent_prompt_opik.json + prompt_name: "intent-classifier" + prompt_type: "chat" + credentials: opik_credentials + sync_policy: local + mode: sdk # --- Tracing --- intent_tracer_langfuse: diff --git a/kedro-agentic-workflows/conf/base/parameters.yml b/kedro-agentic-workflows/conf/base/parameters.yml index ddebb34..85a768b 100644 --- a/kedro-agentic-workflows/conf/base/parameters.yml +++ b/kedro-agentic-workflows/conf/base/parameters.yml @@ -1,4 +1,4 @@ user_id: 1 docs_matches: 3 intent_prompt_version: 1 -model_name: "gpt-4o" \ No newline at end of file +model_name: "gpt-4o" diff --git a/kedro-agentic-workflows/data/intent_detection/evaluation/intent_evaluation.json b/kedro-agentic-workflows/data/intent_detection/evaluation/intent_evaluation.json index 397f8af..e5eb1f1 100644 --- a/kedro-agentic-workflows/data/intent_detection/evaluation/intent_evaluation.json +++ b/kedro-agentic-workflows/data/intent_detection/evaluation/intent_evaluation.json @@ -109,4 +109,4 @@ "reason": "While the first part is a general question, the second part reveals the user has a pending claim they are asking about." 
} } -] \ No newline at end of file +] diff --git a/kedro-agentic-workflows/data/intent_detection/evaluation/prompts/intent_judge_llm_prompt_opik.json b/kedro-agentic-workflows/data/intent_detection/evaluation/prompts/intent_judge_llm_prompt_opik.json new file mode 100644 index 0000000..6b853cb --- /dev/null +++ b/kedro-agentic-workflows/data/intent_detection/evaluation/prompts/intent_judge_llm_prompt_opik.json @@ -0,0 +1,10 @@ +[ + { + "role": "system", + "content": "You are an expert evaluator for an insurance customer support intent classification system.\n\nYour task is to evaluate the QUALITY OF THE MODEL'S REASONING for the predicted intent.\n\nYou will receive:\n- The user's question\n- The model's predicted intent\n- The model's explanation (reason)\n- The expected intent\n- The reference explanation\n\nYour job is NOT to rewrite the explanation. Instead, you must score how well the model's explanation justifies the predicted intent and whether it aligns with the user question.\n\nEvaluation Criteria:\n\n5 \u2014 Excellent\nThe predicted intent is correct and the reasoning clearly and accurately explains why the intent was chosen.\n\n4 \u2014 Good\nThe predicted intent is correct and the reasoning is mostly correct, but slightly incomplete or generic.\n\n3 \u2014 Acceptable\nThe reasoning partially explains the intent but is vague or lacks clarity.\n\n2 \u2014 Poor\nThe reasoning is weak, incorrect, or does not clearly justify the predicted intent.\n\n1 \u2014 Incorrect\nThe predicted intent is wrong OR the reasoning does not match the user question.\n\nImportant Rules:\n- Focus on whether the reasoning logically explains the predicted intent.\n- If the predicted intent is incorrect, the score should be 1.\n- Minor wording differences from the reference explanation are acceptable.\n- Return only the score according to the schema." 
+ }, + { + "role": "human", + "content": "User Question:\n{question}\n\nPredicted Intent:\n{predicted_intent}\n\nModel Explanation:\n{predicted_reason}\n\nExpected Intent:\n{expected_intent}\n\nReference Explanation:\n{expected_reason}" + } +] \ No newline at end of file diff --git a/kedro-agentic-workflows/notebooks/e2e_opik_evaluation_dataset.ipynb b/kedro-agentic-workflows/notebooks/e2e_opik_evaluation_dataset.ipynb new file mode 100644 index 0000000..ad62df3 --- /dev/null +++ b/kedro-agentic-workflows/notebooks/e2e_opik_evaluation_dataset.ipynb @@ -0,0 +1,974 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# OpikEvaluationDataset \u2014 E2E Tests\n", + "\n", + "Manual end-to-end scenarios for `OpikEvaluationDataset`. Run cells top-to-bottom within each section and verify results in the **Opik UI** after each scenario.\n", + "\n", + "| Section | Scenarios | What it covers |\n", + "|---------|-----------|----------------|\n", + "| **1. Local mode** | 1\u20134 | Fresh start, idempotent reload, incremental sync, `save()` with local merge |\n", + "| **2. Remote mode** | 5\u20136 | Remote load, `save()` without local file interaction |\n", + "| **3. UUID id behaviour** | 7\u20138 | UUID ids forwarded to Opik; non-UUID ids stripped |\n", + "| **4. Edge cases** | 9\u201310 | Items without id, invalid credentials |\n", + "| **5. Lifecycle & integration** | 11\u201312 | Native Opik API operations, running an experiment |" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "# 1. 
Local Mode (Scenarios 1\u20134)\n", + "\n", + "## Setup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import json\nimport logging\nimport os\nimport tempfile\nimport time\nfrom datetime import datetime\nfrom pathlib import Path\n\nimport yaml\nfrom kedro.io import DatasetError\nfrom opik.evaluation import evaluate\nfrom opik.evaluation.metrics.score_result import ScoreResult\n\nfrom kedro_datasets_experimental.opik import OpikEvaluationDataset\n\nlogging.basicConfig(\n level=logging.INFO,\n format=\"%(name)s \u2014 %(levelname)s \u2014 %(message)s\",\n)\n\n# Load credentials from the Kedro local config\nCREDENTIALS_FILE = Path(\"../conf/local/credentials.yml\")\nwith open(CREDENTIALS_FILE) as f:\n all_credentials = yaml.safe_load(f)\n\nCREDENTIALS = {\n \"api_key\": all_credentials[\"opik_credentials\"][\"api_key\"],\n \"workspace\": all_credentials[\"opik_credentials\"].get(\"workspace\", \"\"),\n}\n\n# Unique name per run \u2014 avoids cleanup issues between runs\nRUN_ID = datetime.now().strftime(\"%Y%m%d-%H%M%S\")\nDATASET_NAME = f\"e2e-intent-eval-{RUN_ID}\"\n\nTMP_DIR = Path(tempfile.mkdtemp(prefix=\"e2e_opik_\"))\nLOCAL_FILE = TMP_DIR / \"intent_eval_items.json\"\n\nprint(f\"Dataset name : {DATASET_NAME}\")\nprint(f\"Local file : {LOCAL_FILE}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## UUID helper\n", + "\n", + "Opik requires UUID v7 for stable dataset item IDs. Python < 3.13 does not have `uuid.uuid7()`, so we generate them manually." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def _uuid7() -> str:\n", + " \"\"\"Generate a UUID version 7 (time-ordered random).\"\"\"\n", + " timestamp_ms = int(time.time() * 1000)\n", + " rand = int.from_bytes(os.urandom(10), \"big\")\n", + " rand_a = (rand >> 62) & 0xFFF\n", + " rand_b = rand & 0x3FFFFFFFFFFFFFFF\n", + " uuid_int = (\n", + " (timestamp_ms & 0xFFFFFFFFFFFF) << 80\n", + " | 0x7 << 76\n", + " | rand_a << 64\n", + " | 0b10 << 62\n", + " | rand_b\n", + " )\n", + " h = f\"{uuid_int:032x}\"\n", + " return f\"{h[:8]}-{h[8:12]}-{h[12:16]}-{h[16:20]}-{h[20:]}\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Test data\n", + "\n", + "Stable UUID v7 values generated once per run. Items with human-readable ids are used only in Scenarios 7\u20138 to exercise the id-stripping path." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "_UUID_1 = _uuid7()\n", + "_UUID_2 = _uuid7()\n", + "_UUID_3 = _uuid7()\n", + "_UUID_4 = _uuid7()\n", + "_UUID_5 = _uuid7()\n", + "_UUID_6 = _uuid7()\n", + "\n", + "INITIAL_ITEMS = [\n", + " {\n", + " \"id\": _UUID_1,\n", + " \"input\": {\"question\": \"How do I submit a claim for a car accident?\"},\n", + " \"expected_output\": {\"intent\": \"general_question\"},\n", + " },\n", + " {\n", + " \"id\": _UUID_2,\n", + " \"input\": {\"question\": \"My car was hit in the parking lot, the door is damaged.\"},\n", + " \"expected_output\": {\"intent\": \"claim_new\"},\n", + " },\n", + " {\n", + " \"id\": _UUID_3,\n", + " \"input\": {\"question\": \"What's the status of my claim 48392?\"},\n", + " \"expected_output\": {\"intent\": \"existing_claim_question\"},\n", + " },\n", + "]\n", + "\n", + "# New item added in Scenario 3\n", + "NEW_ITEM = {\n", + " \"id\": _UUID_4,\n", + " \"input\": {\"question\": \"I need to upload more documents to my claim.\"},\n", + " 
\"expected_output\": {\"intent\": \"existing_claim_question\"},\n", + "}\n", + "\n", + "# Items passed to save() in Scenario 4:\n", + "# _UUID_5, _UUID_6 \u2014 new\n", + "# _UUID_1 \u2014 same UUID, updated expected_output (updated in-place on remote)\n", + "SAVE_ITEMS = [\n", + " {\n", + " \"id\": _UUID_5,\n", + " \"input\": {\"question\": \"I lost my phone during my trip yesterday.\"},\n", + " \"expected_output\": {\"intent\": \"claim_new\"},\n", + " },\n", + " {\n", + " \"id\": _UUID_6,\n", + " \"input\": {\"question\": \"I need help.\"},\n", + " \"expected_output\": {\"intent\": \"clarification_needed\"},\n", + " },\n", + " {\n", + " \"id\": _UUID_1,\n", + " \"input\": {\"question\": \"How do I submit a claim for a car accident?\"},\n", + " \"expected_output\": {\"intent\": \"general_question_updated\"},\n", + " },\n", + "]\n", + "\n", + "print(f\"UUID_1: {_UUID_1}\")\n", + "print(f\"UUID_2: {_UUID_2}\")\n", + "print(f\"UUID_3: {_UUID_3}\")\n", + "print(f\"UUID_4: {_UUID_4}\")\n", + "print(f\"UUID_5: {_UUID_5}\")\n", + "print(f\"UUID_6: {_UUID_6}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Helpers" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def write_local(items: list[dict]) -> None:\n", + " \"\"\"Write items to the local JSON test file.\"\"\"\n", + " LOCAL_FILE.write_text(json.dumps(items, indent=2))\n", + " print(f\" Wrote {len(items)} item(s) to local file\")\n", + "\n", + "\n", + "def read_local() -> list[dict]:\n", + " \"\"\"Read items from the local JSON test file.\"\"\"\n", + " return json.loads(LOCAL_FILE.read_text())\n", + "\n", + "\n", + "def make_dataset(sync_policy: str = \"local\", filepath: str | None = None) -> OpikEvaluationDataset:\n", + " \"\"\"Create an OpikEvaluationDataset instance with the shared test config.\"\"\"\n", + " return OpikEvaluationDataset(\n", + " dataset_name=DATASET_NAME,\n", + " credentials=CREDENTIALS,\n", + " 
filepath=filepath if filepath is not None else str(LOCAL_FILE),\n", + " sync_policy=sync_policy,\n", + " )\n", + "\n", + "\n", + "def summarise_remote(opik_dataset) -> dict:\n", + " \"\"\"Return a summary dict of the remote dataset for inspection.\"\"\"\n", + " items = opik_dataset.get_items()\n", + " return {\n", + " \"count\": len(items),\n", + " \"ids\": sorted(item[\"id\"] for item in items),\n", + " }" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 1: Fresh start \u2014 create remote dataset + sync local items\n", + "\n", + "**Steps:**\n", + "1. Write 3 items with UUID ids to the local file.\n", + "2. Call `load()` with `sync_policy='local'`.\n", + "\n", + "**Expected behaviour:**\n", + "- Remote dataset is created (it did not exist before).\n", + "- All 3 local items are inserted to remote.\n", + "- `load()` returns an `opik.Dataset` with 3 items.\n", + "\n", + "**Expected logs:**\n", + "```\n", + "Dataset '...' not found on Opik, creating it.\n", + "Syncing 3 item(s) from '...' to remote dataset '...'.\n", + "Loaded dataset '...' (sync_policy='local').\n", + "```\n", + "\n", + "**Verify in Opik UI:**\n", + "- Dataset `e2e-intent-eval-` exists under Datasets.\n", + "- Contains exactly 3 items with the correct `input` and `expected_output`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 1: Fresh start\")\n", + "print(\"=\" * 60)\n", + "\n", + "write_local(INITIAL_ITEMS)\n", + "\n", + "ds1 = make_dataset()\n", + "result1 = ds1.load()\n", + "\n", + "summary1 = summarise_remote(result1)\n", + "assert summary1[\"count\"] == 3, f\"Expected 3 items, got {summary1['count']}\"\n", + "assert _UUID_1 in summary1[\"ids\"], f\"UUID_1 not found in remote: {summary1['ids']}\"\n", + "assert _UUID_2 in summary1[\"ids\"], f\"UUID_2 not found in remote: {summary1['ids']}\"\n", + "assert _UUID_3 in summary1[\"ids\"], f\"UUID_3 not found in remote: {summary1['ids']}\"\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 Remote has {summary1['count']} item(s): {summary1['ids']}\")\n", + "print(\" \u2192 Check Opik UI: dataset should exist with 3 items\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n## Scenario 2: Idempotent reload \u2014 no duplicates on repeated `load()`\n\n**Steps:**\n1. Load the same dataset again (new instance, same config).\n2. Local file has not changed \u2014 same UUID v7 IDs, same content.\n\n**Expected behaviour:**\n- All items carry valid UUID v7 IDs, so Opik upserts by item ID. 
Same content means a no-op \u2014 no new remote rows.\n- Remote item count remains 3.\n\n**Verify in Opik UI:**\n- Dataset still shows 3 items (not 6).\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 2: Idempotent reload\")\n", + "print(\"=\" * 60)\n", + "\n", + "ds2 = make_dataset()\n", + "result2 = ds2.load()\n", + "\n", + "summary2 = summarise_remote(result2)\n", + "assert summary2[\"count\"] == 3, f\"Expected 3 items (no duplicates), got {summary2['count']}\"\n", + "assert summary2[\"ids\"] == summary1[\"ids\"], (\n", + " f\"Ids changed between loads: before={summary1['ids']}, after={summary2['ids']}\"\n", + ")\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 Still {summary2['count']} item(s) (no duplicates)\")\n", + "print(\" \u2192 Check Opik UI: dataset should still show 3 items\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n## Scenario 3: Incremental sync \u2014 one new local item\n\n**Steps:**\n1. Append a new item (UUID_4) to the local file.\n2. 
Call `load()`.\n\n**Expected behaviour:**\n- Remote gains exactly 1 new item (UUID_4).\n- Existing 3 items carry their original UUID v7 IDs \u2014 Opik upserts by ID, unchanged content is a no-op, so no new rows are created.\n\n**Verify in Opik UI:**\n- Dataset now shows 4 items.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 3: Incremental sync\")\n", + "print(\"=\" * 60)\n", + "\n", + "write_local(INITIAL_ITEMS + [NEW_ITEM])\n", + "\n", + "ds3 = make_dataset()\n", + "result3 = ds3.load()\n", + "\n", + "summary3 = summarise_remote(result3)\n", + "assert summary3[\"count\"] == 4, f\"Expected 4 items, got {summary3['count']}\"\n", + "assert _UUID_4 in summary3[\"ids\"], f\"UUID_4 not found on remote: {summary3['ids']}\"\n", + "\n", + "items3 = result3.get_items()\n", + "item_uuid1 = next((i for i in items3 if i[\"id\"] == _UUID_1), None)\n", + "assert item_uuid1 is not None, \"UUID_1 not found in remote items\"\n", + "assert item_uuid1[\"input\"][\"question\"] == INITIAL_ITEMS[0][\"input\"][\"question\"], (\n", + " f\"UUID_1 content changed unexpectedly: {item_uuid1['input']}\"\n", + ")\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 {summary3['count']} item(s): {summary3['ids']}\")\n", + "print(\" \u2192 Check Opik UI: dataset should show 4 items\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 4: `save()` \u2014 insert to remote + merge into local file\n", + "\n", + "**Steps:**\n", + "1. 
Call `save()` with 3 items: UUID_5 (new), UUID_6 (new), UUID_1 (same UUID, updated `expected_output`).\n", + "\n", + "**Expected behaviour (remote):**\n", + "- 2 new rows for UUID_5 and UUID_6.\n", + "- UUID_1 is a valid UUID v7 \u2014 Opik upserts by ID, so the existing row is updated in-place (no new row created).\n", + "- Remote total: 6 items (4 existing + UUID_5 + UUID_6; UUID_1 updated in place).\n", + "\n", + "**Expected behaviour (local file):**\n", + "- UUID_5 and UUID_6 appended.\n", + "- UUID_1 entry updated in place (local merge by id).\n", + "- Local total: 6 items.\n", + "\n", + "**Verify in Opik UI:**\n", + "- Dataset has 6 items.\n", + "- UUID_1's `expected_output` is `general_question_updated`.\n", + "- UUID_5 and UUID_6 are present." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 4: save() with local merge\")\n", + "print(\"=\" * 60)\n", + "\n", + "ds4 = make_dataset()\n", + "ds4.save(SAVE_ITEMS)\n", + "\n", + "# Verify local file\n", + "local_items4 = read_local()\n", + "local_ids4 = [i.get(\"id\") for i in local_items4]\n", + "\n", + "assert len(local_items4) == 6, f\"Expected 6 local items, got {len(local_items4)}\"\n", + "assert _UUID_5 in local_ids4, \"UUID_5 not found in local file\"\n", + "assert _UUID_6 in local_ids4, \"UUID_6 not found in local file\"\n", + "\n", + "item_uuid1_local = next((i for i in local_items4 if i.get(\"id\") == _UUID_1), None)\n", + "assert item_uuid1_local is not None, \"UUID_1 not found in local file after save()\"\n", + "assert item_uuid1_local[\"expected_output\"][\"intent\"] == \"general_question_updated\", (\n", + " f\"UUID_1 expected_output not updated locally: {item_uuid1_local['expected_output']}\"\n", + ")\n", + "\n", + "# Verify remote state\n", + "ds4_remote = make_dataset(sync_policy=\"remote\")\n", + "result4_remote = ds4_remote.load()\n", + "summary4 = summarise_remote(result4_remote)\n", + 
"items4_remote = result4_remote.get_items()\n", + "\n", + "assert summary4[\"count\"] == 6, f\"Expected 6 remote items after save(), got {summary4['count']}\"\n", + "assert _UUID_5 in summary4[\"ids\"], f\"UUID_5 not found on remote: {summary4['ids']}\"\n", + "assert _UUID_6 in summary4[\"ids\"], f\"UUID_6 not found on remote: {summary4['ids']}\"\n", + "\n", + "item_uuid1_remote = next((i for i in items4_remote if i[\"id\"] == _UUID_1), None)\n", + "assert item_uuid1_remote is not None, \"UUID_1 not found on remote after save()\"\n", + "assert item_uuid1_remote[\"expected_output\"][\"intent\"] == \"general_question_updated\", (\n", + " f\"UUID_1 expected_output not updated on remote: {item_uuid1_remote['expected_output']}\"\n", + ")\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 Local file has {len(local_items4)} item(s); UUID_1 updated in place\")\n", + "print(f\" Remote has {summary4['count']} item(s); UUID_1 updated in-place, UUID_5/UUID_6 added\")\n", + "print(\" \u2192 Check Opik UI: dataset should show 6 items\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Section 1 checklist\n", + "\n", + "| Check | Expected |\n", + "|-------|----------|\n", + "| Dataset `e2e-intent-eval-` exists | Yes |\n", + "| Total items after Scenario 1 | 3 |\n", + "| Total items after Scenario 2 (reload) | 3 (no duplicates) |\n", + "| Total items after Scenario 3 | 4 |\n", + "| Total items after Scenario 4 | 6 |\n", + "| UUID_1 `expected_output` after Scenario 4 | `general_question_updated` |\n", + "| UUID_5 and UUID_6 present | Yes |" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "# 2. Remote Mode (Scenarios 5\u20136)\n", + "\n", + "These scenarios reuse the dataset created by Section 1. Run Section 1 first." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 5: Remote mode \u2014 load from existing dataset\n", + "\n", + "**Steps:**\n", + "1. 
Instantiate `OpikEvaluationDataset` with `sync_policy='remote'`.\n", + "2. Call `load()`.\n", + "\n", + "**Expected behaviour:**\n", + "- Remote dataset is fetched as-is.\n", + "- Local file is NOT read or modified.\n", + "- `load()` returns the `opik.Dataset` with all remote items.\n", + "\n", + "**Verify in Opik UI:**\n", + "- No new items created; dataset unchanged." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 5: Remote mode \u2014 load from existing dataset\")\n", + "print(\"=\" * 60)\n", + "\n", + "ds5 = make_dataset(sync_policy=\"remote\")\n", + "result5 = ds5.load()\n", + "\n", + "summary5 = summarise_remote(result5)\n", + "assert summary5[\"count\"] >= 4, (\n", + " f\"Expected at least 4 items from prior scenarios, got {summary5['count']}\"\n", + ")\n", + "assert _UUID_2 in summary5[\"ids\"], f\"UUID_2 not found in remote after remote load: {summary5['ids']}\"\n", + "\n", + "items5 = result5.get_items()\n", + "item_uuid2 = next((i for i in items5 if i[\"id\"] == _UUID_2), None)\n", + "assert item_uuid2 is not None, \"UUID_2 not found in remote items\"\n", + "assert item_uuid2[\"input\"][\"question\"] == INITIAL_ITEMS[1][\"input\"][\"question\"], (\n", + " f\"UUID_2 content changed unexpectedly: {item_uuid2['input']}\"\n", + ")\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 Remote mode loaded {summary5['count']} item(s)\")\n", + "print(\" \u2192 Check Opik UI: dataset unchanged\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 6: Remote mode \u2014 `save()` uploads to remote, local file untouched\n", + "\n", + "**Steps:**\n", + "1. Record local file item count before `save()`.\n", + "2. 
Call `save()` with a new item using `sync_policy='remote'`.\n", + "\n", + "**Expected behaviour:**\n", + "- New item is uploaded to remote.\n", + "- Local file is NOT modified.\n", + "\n", + "**Verify in Opik UI:**\n", + "- Remote dataset has one additional item." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 6: Remote mode \u2014 save() uploads to remote only\")\n", + "print(\"=\" * 60)\n", + "\n", + "local_items_before = read_local()\n", + "local_count_before = len(local_items_before)\n", + "\n", + "remote_save_item = {\n", + " \"id\": _uuid7(),\n", + " \"input\": {\"question\": \"Can I update my contact address in my insurance profile?\"},\n", + " \"expected_output\": {\"intent\": \"general_question\"},\n", + "}\n", + "\n", + "ds6 = make_dataset(sync_policy=\"remote\")\n", + "ds6.save([remote_save_item])\n", + "\n", + "local_items_after = read_local()\n", + "assert len(local_items_after) == local_count_before, (\n", + " f\"Local file was modified in remote mode: before={local_count_before}, \"\n", + " f\"after={len(local_items_after)}\"\n", + ")\n", + "assert not any(i.get(\"id\") == remote_save_item[\"id\"] for i in local_items_after), (\n", + " \"Remote-only save() item found in local file \u2014 should not be written locally\"\n", + ")\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 Local file unchanged ({local_count_before} items); remote received new item\")\n", + "print(\" \u2192 Check Opik UI: dataset should have one additional item\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Section 2 checklist\n", + "\n", + "| Check | Expected |\n", + "|-------|----------|\n", + "| Scenario 5: item count | Same as end of Section 1 (no change) |\n", + "| Scenario 5: UUID_2 content | Unchanged |\n", + "| Scenario 6: local file item count | Unchanged |\n", + "| Scenario 6: new item on remote | Yes (1 additional row) |" + ] + 
}, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "# 3. UUID ID Behaviour (Scenarios 7\u20138)\n", + "\n", + "These scenarios use a **separate dataset** (`-sc7`) to avoid interfering with Section 1\u20132 data." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 7: UUID ids forwarded; human-readable ids stripped\n", + "\n", + "**Steps:**\n", + "1. Create a separate dataset with one UUID-id item and one human-readable-id item.\n", + "2. Call `load()` with `sync_policy='local'`.\n", + "\n", + "**Expected behaviour:**\n", + "- UUID-id item appears on remote with its original UUID.\n", + "- Human-readable-id item (`human_001`) appears with an auto-generated UUID \u2014 the original id is not present in any remote item.\n", + "\n", + "**Verify in Opik UI:**\n", + "- One remote item has id == UUID-id item's UUID.\n", + "- The other remote item has a system-generated UUID (not `human_001`)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 7: UUID ids forwarded; human-readable ids stripped\")\n", + "print(\"=\" * 60)\n", + "\n", + "uuid_id_item = {\n", + " \"id\": _uuid7(),\n", + " \"input\": {\"question\": \"UUID-id item \u2014 stable remote identity.\"},\n", + " \"expected_output\": {\"intent\": \"test\"},\n", + "}\n", + "non_uuid_id_item = {\n", + " \"id\": \"human_001\",\n", + " \"input\": {\"question\": \"Human-readable id item \u2014 id will be stripped.\"},\n", + " \"expected_output\": {\"intent\": \"test\"},\n", + "}\n", + "\n", + "tmp_file7 = TMP_DIR / \"scenario7.json\"\n", + "tmp_file7.write_text(json.dumps([uuid_id_item, non_uuid_id_item], indent=2))\n", + "\n", + "ds7 = OpikEvaluationDataset(\n", + " dataset_name=f\"{DATASET_NAME}-sc7\",\n", + " credentials=CREDENTIALS,\n", + " filepath=str(tmp_file7),\n", + " sync_policy=\"local\",\n", + ")\n", + "result7 = ds7.load()\n", + 
"\n", + "items7 = result7.get_items()\n", + "assert len(items7) == 2, f\"Expected 2 remote items, got {len(items7)}\"\n", + "\n", + "remote_ids7 = [i[\"id\"] for i in items7]\n", + "assert uuid_id_item[\"id\"] in remote_ids7, f\"UUID id not found in remote: {remote_ids7}\"\n", + "assert \"human_001\" not in remote_ids7, f\"Human-readable id leaked to remote: {remote_ids7}\"\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 UUID id preserved; 'human_001' stripped (remote ids: {remote_ids7})\")\n", + "print(\" \u2192 Check Opik UI: one item has the original UUID, the other has a generated UUID\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 8: Same UUID id with changed content \u2014 row updated in-place\n", + "\n", + "**Steps:**\n", + "1. Update the UUID-id item's `expected_output` in the local file.\n", + "2. Call `load()` again.\n", + "\n", + "**Expected behaviour:**\n", + "- The UUID is still forwarded in the insert call (not stripped).\n", + "- Opik's REST endpoint upserts by item ID \u2014 the existing remote row is updated in-place with the new content. No additional row is created.\n", + "\n", + "**Verify in Opik UI:**\n", + "- Dataset still has 2 items.\n", + "- UUID-id item now has `expected_output.intent == test_updated`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\nprint(\"SCENARIO 8: Same UUID id with changed content \u2014 updated in-place\")\nprint(\"=\" * 60)\n\n# Self-contained: own dataset and file\nuuid_id_item_sc8 = {\n \"id\": _uuid7(),\n \"input\": {\"question\": \"UUID-id item \u2014 stable remote identity.\"},\n \"expected_output\": {\"intent\": \"test\"},\n}\nnon_uuid_id_item_sc8 = {\n \"id\": \"human_001\",\n \"input\": {\"question\": \"Human-readable id item \u2014 id will be stripped.\"},\n \"expected_output\": {\"intent\": \"test\"},\n}\ntmp_file8 = TMP_DIR / \"scenario8.json\"\ntmp_file8.write_text(json.dumps([uuid_id_item_sc8, non_uuid_id_item_sc8], indent=2))\n\n# First load \u2014 establishes the remote item with uuid_id_item_sc8[\"id\"]\nds8_init = OpikEvaluationDataset(\n dataset_name=f\"{DATASET_NAME}-sc8\",\n credentials=CREDENTIALS,\n filepath=str(tmp_file8),\n sync_policy=\"local\",\n)\nds8_init.load()\n\n# Now update the content for the UUID item and reload\nupdated_uuid_item_sc8 = {\n **uuid_id_item_sc8,\n \"expected_output\": {\"intent\": \"test_updated\"},\n}\ntmp_file8.write_text(json.dumps([updated_uuid_item_sc8, non_uuid_id_item_sc8], indent=2))\n\nds8 = OpikEvaluationDataset(\n dataset_name=f\"{DATASET_NAME}-sc8\",\n credentials=CREDENTIALS,\n filepath=str(tmp_file8),\n sync_policy=\"local\",\n)\nresult8 = ds8.load()\n\nitems8 = result8.get_items()\nremote_ids8 = [i[\"id\"] for i in items8]\nassert uuid_id_item_sc8[\"id\"] in remote_ids8, (\n f\"UUID id no longer in remote after content change: {remote_ids8}\"\n)\n\nprint(\"\")\nprint(f\"\u2713 PASSED \u2014 UUID id still present after content change (remote ids: {remote_ids8})\")\nprint(\" \u2192 Check Opik UI: UUID-id item should show updated expected_output\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Section 3 checklist\n", + "\n", + "| Check | Expected |\n", + 
"|-------|----------|\n", + "| Dataset `-sc7` item count | 2 |\n", + "| UUID-id item remote id | Same as local UUID |\n", + "| `human_001` id on remote | Not present (stripped) |\n", + "| After Scenario 8: UUID-id item `expected_output` | `test_updated` |\n", + "| After Scenario 8: total item count | Still 2 (no new row created) |" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "# 4. Edge Cases (Scenarios 9\u201310)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 9: Items without `id` \u2014 warning emitted, content-hash dedup on reload\n", + "\n", + "**Steps:**\n", + "1. Write 2 items with no `id` field to a local file.\n", + "2. Call `load()` twice.\n", + "\n", + "**Expected behaviour:**\n", + "- Warning is logged on each load about missing/empty `id` fields.\n", + "- Each `load()` strips the missing id and Opik auto-generates a new UUID v7.\n", + "- Because content is unchanged between the two loads, Opik's content-hash dedup prevents duplicates \u2014 remote count stays at 2 after the second load.\n", + "- Items without an `id` whose content changes between loads will accumulate new remote rows on every sync.\n", + "\n", + "**Verify in Opik UI:**\n", + "- Dataset `-sc9` has 2 items after both loads." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 9: Items without id \u2014 content-hash dedup on reload\")\n", + "print(\"=\" * 60)\n", + "\n", + "no_id_items = [\n", + " {\"input\": {\"question\": \"What happens without an id?\"}},\n", + " {\"input\": {\"question\": \"Another item with no id.\"}},\n", + "]\n", + "tmp_file9 = TMP_DIR / \"scenario9.json\"\n", + "tmp_file9.write_text(json.dumps(no_id_items, indent=2))\n", + "\n", + "ds9a = OpikEvaluationDataset(\n", + " dataset_name=f\"{DATASET_NAME}-sc9\",\n", + " credentials=CREDENTIALS,\n", + " filepath=str(tmp_file9),\n", + " sync_policy=\"local\",\n", + ")\n", + "result9a = ds9a.load()\n", + "count9a = len(result9a.get_items())\n", + "assert count9a == 2, f\"Expected 2 items after first load, got {count9a}\"\n", + "\n", + "ds9b = OpikEvaluationDataset(\n", + " dataset_name=f\"{DATASET_NAME}-sc9\",\n", + " credentials=CREDENTIALS,\n", + " filepath=str(tmp_file9),\n", + " sync_policy=\"local\",\n", + ")\n", + "result9b = ds9b.load()\n", + "count9b = len(result9b.get_items())\n", + "assert count9b == 2, (\n", + " f\"Expected 2 items after second load (content-hash dedup), got {count9b}\"\n", + ")\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 {count9b} item(s) on remote; content-hash dedup prevented duplicates\")\n", + "print(\" \u2192 Check logs above for 'missing, None, or empty id' warning\")\n", + "print(\" \u2192 Check Opik UI: dataset should show 2 items\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 10: Invalid credentials \u2014 `DatasetError` raised at `__init__`\n", + "\n", + "**Steps:**\n", + "1. Attempt to create instances with missing and empty `api_key`.\n", + "\n", + "**Expected behaviour:**\n", + "- Both cases raise `DatasetError` at construction time (not at `load()`).\n", + "- Error message mentions `api_key`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 10: Invalid credentials \u2014 DatasetError raised\")\n", + "print(\"=\" * 60)\n", + "\n", + "test_cases_10 = [\n", + " {\n", + " \"label\": \"Missing api_key\",\n", + " \"credentials\": {\"workspace\": \"x\"},\n", + " \"expect_in_error\": \"api_key\",\n", + " },\n", + " {\n", + " \"label\": \"Empty api_key\",\n", + " \"credentials\": {\"api_key\": \" \"},\n", + " \"expect_in_error\": \"api_key\",\n", + " },\n", + "]\n", + "\n", + "for tc in test_cases_10:\n", + " try:\n", + " OpikEvaluationDataset(\n", + " dataset_name=\"dummy\",\n", + " credentials=tc[\"credentials\"],\n", + " )\n", + " assert False, f\"Expected DatasetError for: {tc['label']}\"\n", + " except DatasetError as e:\n", + " assert tc[\"expect_in_error\"] in str(e), (\n", + " f\"Error message for '{tc['label']}' should mention '{tc['expect_in_error']}': {e}\"\n", + " )\n", + " print(f\" \u2713 {tc['label']}: DatasetError raised as expected\")\n", + "\n", + "print(\"\\n\u2713 PASSED \u2014 All invalid config cases raise DatasetError\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Section 4 checklist\n", + "\n", + "| Check | Expected |\n", + "|-------|----------|\n", + "| Dataset `-sc9` item count after both loads | 2 (content-hash dedup) |\n", + "| Missing `api_key` | `DatasetError` at `__init__` |\n", + "| Empty `api_key` | `DatasetError` at `__init__` |" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "# 5. Lifecycle & Integration (Scenarios 11\u201312)\n", + "\n", + "These scenarios reuse the main dataset from Section 1. Run Section 1 first." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 11: Lifecycle via native Opik API \u2014 update and delete\n", + "\n", + "**Steps:**\n", + "1. 
Load the main dataset (built by Scenarios 1\u20134).\n", + "2. Update the first item's `expected_output` via `dataset.update()`.\n", + "3. Delete the last item via `dataset.delete()`.\n", + "\n", + "**Expected behaviour:**\n", + "- Updated item reflects new `expected_output`.\n", + "- Deleted item no longer present in `get_items()`.\n", + "\n", + "**Verify in Opik UI:**\n", + "- Correct item has updated `expected_output`.\n", + "- Deleted item is gone." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 11: Lifecycle via native Opik API \u2014 update and delete\")\n", + "print(\"=\" * 60)\n", + "\n", + "ds11 = make_dataset(sync_policy=\"remote\")\n", + "result11 = ds11.load()\n", + "items11_before = result11.get_items()\n", + "count11_before = len(items11_before)\n", + "\n", + "assert count11_before >= 2, f\"Need at least 2 items for update/delete demo, got {count11_before}\"\n", + "\n", + "# Update first item\n", + "to_update = {**items11_before[0], \"expected_output\": {\"intent\": \"lifecycle_updated\"}}\n", + "result11.update([to_update])\n", + "\n", + "refreshed11 = result11.get_items()\n", + "updated_item = next((i for i in refreshed11 if i[\"id\"] == items11_before[0][\"id\"]), None)\n", + "assert updated_item is not None, \"Updated item not found after update()\"\n", + "assert updated_item[\"expected_output\"][\"intent\"] == \"lifecycle_updated\", (\n", + " f\"Update did not take effect: {updated_item['expected_output']}\"\n", + ")\n", + "\n", + "# Delete last item\n", + "to_delete_id = items11_before[-1][\"id\"]\n", + "result11.delete([to_delete_id])\n", + "\n", + "refreshed11_after_delete = result11.get_items()\n", + "deleted_ids = [i[\"id\"] for i in refreshed11_after_delete]\n", + "assert to_delete_id not in deleted_ids, (\n", + " f\"Deleted item {to_delete_id} still present in remote\"\n", + ")\n", + "\n", + "print(f\"\\n\u2713 PASSED \u2014 Item 
{items11_before[0]['id']} updated; item {to_delete_id} deleted\")\n", + "print(f\" Items remaining: {len(refreshed11_after_delete)}\")\n", + "print(\" \u2192 Check Opik UI: first item updated, last item gone\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "## Scenario 12: Running an experiment with `opik.evaluation.evaluate()`\n", + "\n", + "**Steps:**\n", + "1. Load the main dataset.\n", + "2. Define a simple echo task and an exact-match scorer.\n", + "3. Run `opik.evaluation.evaluate()`.\n", + "\n", + "**Expected behaviour:**\n", + "- Experiment is created in Opik.\n", + "- Scores are computed for each item.\n", + "- `experiment_url` is printed.\n", + "\n", + "**Verify in Opik UI:**\n", + "- Experiment appears under the dataset.\n", + "- Each item has a score." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(\"=\" * 60)\n", + "print(\"SCENARIO 12: Running an experiment with evaluate()\")\n", + "print(\"=\" * 60)\n", + "\n", + "\n", + "def echo_task(dataset_item: dict) -> dict:\n", + " \"\"\"Return the expected_output as the task output (trivial echo).\"\"\"\n", + " return {\"output\": dataset_item.get(\"expected_output\", {})}\n", + "\n", + "\n", + "def exact_match_scorer(dataset_item: dict, task_outputs: dict, **kwargs) -> ScoreResult:\n", + " \"\"\"Score 1.0 if task output matches expected_output, else 0.0.\"\"\"\n", + " expected = dataset_item.get(\"expected_output\", {})\n", + " actual = task_outputs.get(\"output\", {})\n", + " score = 1.0 if actual == expected else 0.0\n", + " return ScoreResult(\n", + " name=\"exact_match\",\n", + " value=score,\n", + " reason=f\"expected={expected}, actual={actual}\",\n", + " )\n", + "\n", + "\n", + "ds12 = make_dataset(sync_policy=\"remote\")\n", + "eval_dataset = ds12.load()\n", + "\n", + "experiment_name = f\"e2e-echo-experiment-{RUN_ID}\"\n", + "result12 = evaluate(\n", + " dataset=eval_dataset,\n", + " 
task=echo_task,\n", + " scoring_functions=[exact_match_scorer],\n", + " experiment_name=experiment_name,\n", + " experiment_config={\"task\": \"echo\", \"scorer\": \"exact_match\"},\n", + ")\n", + "\n", + "scores12 = [\n", + " sr.value\n", + " for tr in result12.test_results\n", + " for sr in tr.score_results\n", + " if not sr.scoring_failed\n", + "]\n", + "assert len(scores12) > 0, \"No scores recorded \u2014 check that the dataset has items\"\n", + "\n", + "avg12 = sum(scores12) / len(scores12)\n", + "print(f\"\\n\u2713 PASSED \u2014 Experiment '{experiment_name}' completed\")\n", + "print(f\" Items scored: {len(scores12)}\")\n", + "print(f\" Average exact_match score: {avg12:.2f}\")\n", + "print(f\" Results: {result12.experiment_url}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Section 5 checklist\n", + "\n", + "| Check | Expected |\n", + "|-------|----------|\n", + "| Scenario 11: first item `expected_output` | `lifecycle_updated` |\n", + "| Scenario 11: deleted item | Gone from dataset |\n", + "| Scenario 12: experiment created | Yes |\n", + "| Scenario 12: scores recorded | > 0 |\n", + "| Scenario 12: `experiment_url` printed | Yes |\n", + "\n", + "---\n", + "## Clean up\n", + "\n", + "Delete the datasets created by this run from the Opik UI:\n", + "- `e2e-intent-eval-`\n", + "- `e2e-intent-eval--sc7`\n", + "- `e2e-intent-eval--sc9`" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.10.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/kedro-agentic-workflows/src/kedro_agentic_workflows/datasets/opik_evaluation_dataset.py b/kedro-agentic-workflows/src/kedro_agentic_workflows/datasets/opik_evaluation_dataset.py new file mode 100644 index 0000000..d569bc8 --- /dev/null +++ 
import json
import logging
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

from kedro.io import AbstractDataset, DatasetError
from opik import Opik
from opik.api_objects.dataset.dataset import Dataset
from opik.rest_api.core.api_error import ApiError

from kedro_datasets._typing import JSONPreview

if TYPE_CHECKING:
    from kedro_datasets.json import JSONDataset
    from kedro_datasets.yaml import YAMLDataset

logger = logging.getLogger(__name__)

SUPPORTED_FILE_EXTENSIONS = {".json", ".yaml", ".yml"}
REQUIRED_OPIK_CREDENTIALS = {"api_key"}
OPTIONAL_OPIK_CREDENTIALS = {"workspace", "host", "project_name"}
VALID_SYNC_POLICIES = {"local", "remote"}
HTTP_NOT_FOUND = 404
REQUIRED_UUID_VERSION = 7


class OpikEvaluationDataset(AbstractDataset):
    """Kedro dataset wrapping an Opik evaluation dataset.

    ``load()`` returns an ``opik.Dataset`` suitable for passing to
    ``opik.evaluation.evaluate()``; ``save()`` inserts items via Opik's
    upsert-by-ID API. An optional local JSON/YAML file can serve as the
    authoring surface for evaluation items.

    **Item format.** Each item is a dict with a required ``input`` key and
    optional ``id``, ``expected_output`` and ``metadata`` keys:

    ```json
    [
        {
            "id": "q1",
            "input": {"text": "cancel my order"},
            "expected_output": "cancel_order",
            "metadata": {"source": "production"}
        }
    ]
    ```

    **``id`` semantics.** A valid UUID v7 ``id`` is forwarded to Opik, so
    the same remote row is updated in-place across syncs. Any other ``id``
    (human-readable string such as ``"q1"`` above, a UUID of another
    version, ``None``, empty string, or no ``id`` key at all) is stripped
    before upload and Opik auto-generates a UUID v7; unchanged content is
    then deduplicated by content hash (no-op), while changed content
    creates a **new remote row** and the previous one remains — rows
    accumulate over time. Non-UUID ids are still used for local
    deduplication.

    **Sync policies:**

    - ``"local"`` (default): the local file is the source of truth. On
      ``load()`` every local item is re-inserted to remote (upsert by item
      ID, with the per-item behaviour above). ``save()`` inserts to remote
      and merges into the local file, new data taking precedence.
    - ``"remote"``: the remote Opik dataset is the sole source of truth.
      ``load()`` fetches it as-is; ``save()`` never touches a local file.
      A missing remote dataset is created empty — seed it by running once
      with ``sync_policy="local"`` or by populating it in the Opik UI.

    Examples:
        Catalog YAML:

        ```yaml
        evaluation_dataset:
          type: kedro_datasets_experimental.opik.OpikEvaluationDataset
          dataset_name: intent-detection-eval
          filepath: data/evaluation/intent_items.json
          sync_policy: local
          credentials: opik_credentials
        ```

        Python API:

        ```python
        dataset = OpikEvaluationDataset(
            dataset_name="intent-detection-eval",
            credentials={"api_key": "..."},  # pragma: allowlist secret
            filepath="data/evaluation/intent_items.json",
        )
        eval_dataset = dataset.load()  # opik.Dataset, ready for evaluate()
        ```
    """

    def __init__(
        self,
        dataset_name: str,
        credentials: dict[str, str],
        filepath: str | None = None,
        sync_policy: Literal["local", "remote"] = "local",
        metadata: dict[str, Any] | None = None,
    ):
        """Initialise ``OpikEvaluationDataset``.

        Args:
            dataset_name: Name of the evaluation dataset in Opik.
            credentials: Opik credentials; ``api_key`` is required, while
                ``workspace``, ``host`` and ``project_name`` are optional.
            filepath: Optional path to a local ``.json``/``.yaml``/``.yml``
                file for authoring evaluation items. ``None`` disables all
                local file interaction.
            sync_policy: ``"local"`` (default) or ``"remote"`` — see the
                class docstring for the exact semantics of each.
            metadata: Arbitrary metadata stored locally and returned by
                ``_describe()``. Opik's ``create_dataset()`` takes no
                metadata argument, so this is never sent to the remote.

        Raises:
            DatasetError: On invalid credentials, sync policy or file
                extension, or when the Opik client cannot be created.
        """
        # Validate eagerly so misconfiguration fails at catalog construction
        # time rather than on the first load()/save() call.
        self._validate_init_params(credentials, filepath, sync_policy)

        self._dataset_name = dataset_name
        self._filepath = Path(filepath) if filepath else None
        self._sync_policy = sync_policy
        self._metadata = metadata
        self._file_dataset = None  # lazily constructed JSON/YAML dataset

        try:
            self._client = Opik(**credentials)
        except Exception as e:
            raise DatasetError(f"Failed to initialise Opik client: {e}") from e

    @staticmethod
    def _validate_init_params(
        credentials: dict[str, str],
        filepath: str | None,
        sync_policy: str,
    ) -> None:
        """Run every constructor-argument validation in one place."""
        OpikEvaluationDataset._validate_credentials(credentials)
        OpikEvaluationDataset._validate_sync_policy(sync_policy)
        OpikEvaluationDataset._validate_filepath(filepath)

    @staticmethod
    def _validate_credentials(credentials: dict[str, str]) -> None:
        """Check required credentials are present and no provided value is blank."""

        def _blank(value: Any) -> bool:
            # Treat None, empty, and whitespace-only values as blank.
            return not value or not str(value).strip()

        for name in REQUIRED_OPIK_CREDENTIALS:
            if name not in credentials:
                raise DatasetError(
                    f"Missing required Opik credential: '{name}'."
                )
            if _blank(credentials[name]):
                raise DatasetError(
                    f"Opik credential '{name}' cannot be empty."
                )

        for name in OPTIONAL_OPIK_CREDENTIALS:
            if name in credentials and _blank(credentials[name]):
                raise DatasetError(
                    f"Opik credential '{name}' cannot be empty if provided."
                )

    @staticmethod
    def _validate_sync_policy(sync_policy: str) -> None:
        """Reject any sync policy outside ``VALID_SYNC_POLICIES``."""
        if sync_policy in VALID_SYNC_POLICIES:
            return
        raise DatasetError(
            f"Invalid sync_policy '{sync_policy}'. "
            f"Must be one of: {', '.join(sorted(VALID_SYNC_POLICIES))}."
        )

    @staticmethod
    def _validate_filepath(filepath: str | None) -> None:
        """Reject file extensions other than JSON/YAML; ``None`` is allowed."""
        if filepath is None:
            return
        suffix = Path(filepath).suffix.lower()
        if suffix in SUPPORTED_FILE_EXTENSIONS:
            return
        raise DatasetError(
            f"Unsupported file extension '{suffix}'. "
            f"Supported formats: {', '.join(sorted(SUPPORTED_FILE_EXTENSIONS))}."
        )
+ ) + + @property + def file_dataset(self) -> "JSONDataset | YAMLDataset": + """Return a JSON or YAML file dataset based on the filepath extension.""" + if not self._filepath: + raise DatasetError("filepath must be provided for file dataset operations.") + if self._file_dataset is None: + suffix = self._filepath.suffix.lower() + if suffix in (".yaml", ".yml"): + from kedro_datasets.yaml import YAMLDataset # noqa: PLC0415 + self._file_dataset = YAMLDataset(filepath=str(self._filepath)) + else: + from kedro_datasets.json import JSONDataset # noqa: PLC0415 + self._file_dataset = JSONDataset(filepath=str(self._filepath)) + return self._file_dataset + + def _get_or_create_remote_dataset(self) -> Dataset: + """Ensure the remote Opik dataset exists, creating it if not found. + + Returns the latest ``Dataset`` object. + + Raises: + DatasetError: If the Opik API returns an unexpected error or is + unreachable. + """ + try: + return self._client.get_dataset(name=self._dataset_name) + except ApiError as e: + if e.status_code != HTTP_NOT_FOUND: + raise DatasetError( + f"Opik API error while fetching dataset '{self._dataset_name}': {e}" + ) from e + except Exception as e: + raise DatasetError( + f"Failed to connect to Opik while fetching dataset " + f"'{self._dataset_name}': {e}" + ) from e + + try: + logger.info( + "Dataset '%s' not found on Opik, creating it.", + self._dataset_name, + ) + return self._client.create_dataset( + name=self._dataset_name, + description=f"Created by Kedro (sync_policy={self._sync_policy})", + ) + except ApiError as e: + raise DatasetError( + f"Opik API error while creating dataset '{self._dataset_name}': {e}" + ) from e + except Exception as e: + raise DatasetError( + f"Failed to connect to Opik while creating dataset " + f"'{self._dataset_name}': {e}" + ) from e + + @staticmethod + def _strip_id(item: dict[str, Any]) -> dict[str, Any]: + return {k: v for k, v in item.items() if k != "id"} + + @staticmethod + def _validate_items(items: 
list[dict[str, Any]]) -> None: + """Validate that all items contain the required ``input`` key. + + Raises: + DatasetError: If any item is missing the ``input`` key. + """ + for i, item in enumerate(items): + if "input" not in item: + raise DatasetError( + f"Dataset item at index {i} is missing required 'input' key." + ) + + def _upload_items(self, dataset: Dataset, items: list[dict[str, Any]]) -> None: + """Insert items into the remote Opik dataset. + + Upload behaviour depends on whether an item carries a UUID v7 ``id``: + + - **Valid UUID v7**: forwarded to Opik. Opik's REST API calls + ``create_or_update`` by item ID โ€” the first call creates the + remote row; subsequent calls update that same row in-place if + the content has changed. Whenever content changes, the existing + remote row is updated in-place, while no new row is created. + - **All other values** (human-readable strings, UUIDs of other + versions, ``None``, empty string, or no ``id`` key): stripped + before upload. Opik auto-generates a new UUID v7. Unchanged + content is deduplicated by content hash (no-op), but changed + content creates a **new remote row** while the previous one + remains. + + Callers are responsible for validating items before calling this method. + + Raises: + DatasetError: If the Opik API returns an error or the server is + unreachable during insert. 
+ """ + items_to_insert = [] + for item in items: + if "id" not in item: + items_to_insert.append(item) + elif not item["id"]: + items_to_insert.append(self._strip_id(item)) + else: + try: + parsed = uuid.UUID(str(item["id"])) + if parsed.version == REQUIRED_UUID_VERSION: + items_to_insert.append(item) # valid UUID v7 โ€” preserve id + else: + items_to_insert.append(self._strip_id(item)) + except ValueError: + items_to_insert.append(self._strip_id(item)) + try: + dataset.insert(items_to_insert) + except ApiError as e: + raise DatasetError( + f"Opik API error while inserting items into dataset " + f"'{self._dataset_name}': {e}" + ) from e + except Exception as e: + raise DatasetError( + f"Failed to insert items into Opik dataset '{self._dataset_name}': {e}" + ) from e + + def _sync_local_to_remote(self, dataset: Dataset) -> Dataset: + """Insert all local items into the remote dataset. + + Reads the local file and inserts all items into the remote dataset. + The Opik SDK deduplicates by content hash, so re-inserting unchanged + items is a no-op. Returns a refreshed ``Dataset`` object. If the dataset's + id is a valid UUID v7, the same remote row is updated in-place on every sync. + Otherwise, a new remote row will be created. + """ + if not self._filepath or not self._filepath.exists(): + return dataset + + local_items = self.file_dataset.load() + self._validate_items(local_items) + + if not local_items: + return dataset + + items_without_stable_id = [ + item for item in local_items + if "id" not in item or not item.get("id") + ] + if items_without_stable_id: + logger.warning( + "Found %d item(s) with a missing, None, or empty 'id' field in '%s'. 
" + "These cannot be tracked across syncs and will create new remote " + "rows on every load.", + len(items_without_stable_id), + self._filepath, + ) + + items_with_non_uuid_v7_id = [] + for item in local_items: + if item.get("id"): # present and non-empty/non-None + try: + parsed = uuid.UUID(str(item["id"])) + if parsed.version != REQUIRED_UUID_VERSION: + items_with_non_uuid_v7_id.append(item) + except ValueError: + items_with_non_uuid_v7_id.append(item) + if items_with_non_uuid_v7_id: + logger.warning( + "Found %d item(s) with non-UUID-v7 'id' values in '%s' " + "(e.g. '%s'). Opik requires UUID v7 for item IDs โ€” these " + "will be stripped before upload and Opik will auto-generate " + "UUID v7 values. Remote rows will not have stable identities.", + len(items_with_non_uuid_v7_id), + self._filepath, + items_with_non_uuid_v7_id[0]["id"], + ) + + logger.info( + "Syncing %d item(s) from '%s' to remote dataset '%s'.", + len(local_items), + self._filepath, + self._dataset_name, + ) + self._upload_items(dataset, local_items) + try: + self._client.flush() + except Exception as e: + raise DatasetError( + f"Failed to flush items to Opik dataset '{self._dataset_name}': {e}" + ) from e + + try: + return self._client.get_dataset(name=self._dataset_name) + except ApiError as e: + raise DatasetError( + f"Opik API error while refreshing dataset '{self._dataset_name}' after sync: {e}" + ) from e + except Exception as e: + raise DatasetError( + f"Failed to refresh dataset '{self._dataset_name}' after sync: {e}" + ) from e + + @staticmethod + def _merge_items( + existing: list[dict[str, Any]], + new: list[dict[str, Any]], + ) -> list[dict[str, Any]]: + """Merge new items into an existing list, deduplicating by ``id``. + + Items without an ``id`` key are always appended. For items with an + ``id``, new items take precedence โ€” existing entries with the same + ``id`` are replaced in place. 
+ """ + new_by_id: dict[str, dict[str, Any]] = { + item["id"]: item for item in new if "id" in item + } + + seen_ids: set[str] = set() + merged: list[dict[str, Any]] = [] + + for item in existing: + item_id = item.get("id") + if item_id is not None and item_id in new_by_id: + merged.append(new_by_id[item_id]) + seen_ids.add(item_id) + else: + merged.append(item) + if item_id is not None: + seen_ids.add(item_id) + + for item in new: + item_id = item.get("id") + if item_id is not None and item_id in seen_ids: + continue + if item_id is not None: + seen_ids.add(item_id) + merged.append(item) + + return merged + + def load(self) -> Dataset: + """Load the Opik dataset, syncing local items to remote if sync_policy is ``local``. + + Creates the remote dataset if it does not exist. In ``local`` mode, all + local items are re-inserted to remote on every load via Opik's + ``create_or_update`` API (upsert by item ID). On items with a valid UUID v7 + ``id``, update the existing remote row in-place, and no new row is created. + On items where the ``id`` is not a valid UUID v7 (including missing, ``None``, or empty), + the ``id`` is stripped before upload and Opik auto-generates a new UUID v7. + Unchanged content is deduplicated (no-op), but changed content creates a + new remote row while the previous one remains. + + Returns: + Dataset: The Opik dataset ready for use in experiments. + + Raises: + DatasetError: If the Opik API returns an unexpected error or the + server is unreachable. + """ + dataset = self._get_or_create_remote_dataset() + + if self._sync_policy == "local": + dataset = self._sync_local_to_remote(dataset) + + logger.info( + "Loaded dataset '%s' (sync_policy='%s').", + self._dataset_name, + self._sync_policy, + ) + return dataset + + def save(self, data: list[dict[str, Any]]) -> None: + """Insert items into the Opik dataset and optionally update the local file. + + In ``remote`` mode, only the remote upload occurs. 
In ``local`` mode, + items are also merged into the local file. + + Args: + data: List of dicts, each containing at least an ``input`` key. + + Raises: + DatasetError: If the Opik API call fails or any item is missing ``input``. + """ + if self._sync_policy == "remote": + logger.warning( + "sync_policy='remote': save() uploads to remote only, " + "local file '%s' will not be updated.", + self._filepath, + ) + + self._validate_items(data) + + dataset = self._get_or_create_remote_dataset() + self._upload_items(dataset, data) + try: + self._client.flush() + except Exception as e: + raise DatasetError( + f"Failed to flush items to Opik dataset '{self._dataset_name}': {e}" + ) from e + + if self._sync_policy == "local" and self._filepath: + existing: list[dict] = [] + if self._filepath.exists(): + existing = self.file_dataset.load() + self.file_dataset.save(self._merge_items(existing, data)) + + def _exists(self) -> bool: + try: + self._client.get_dataset(name=self._dataset_name) + return True + except ApiError as e: + if e.status_code == HTTP_NOT_FOUND: + return False + raise DatasetError( + f"Opik API error while checking dataset '{self._dataset_name}': {e}" + ) from e + except Exception as e: + raise DatasetError( + f"Failed to connect to Opik while checking dataset " + f"'{self._dataset_name}': {e}" + ) from e + + def _describe(self) -> dict[str, Any]: + return { + "dataset_name": self._dataset_name, + "filepath": str(self._filepath) if self._filepath else None, + "sync_policy": self._sync_policy, + "metadata": self._metadata, + } + + def preview(self) -> JSONPreview: + """Generate a JSON-compatible preview of the local evaluation data for Kedro-Viz. + + Returns: + JSONPreview: A Kedro-Viz-compatible object containing a serialized JSON string. + Returns a descriptive message if filepath is not configured or does not exist. 
+ """ + if not self._filepath: + return JSONPreview("No filepath configured.") + + if not self._filepath.exists(): + return JSONPreview("Local evaluation dataset does not exist.") + + local_data = self.file_dataset.load() + + if isinstance(local_data, str): + local_data = {"content": local_data} + + try: + return JSONPreview(json.dumps(local_data)) + except (TypeError, ValueError) as e: + return JSONPreview(f"Could not serialise local data to JSON: {e}") diff --git a/kedro-agentic-workflows/src/kedro_agentic_workflows/pipeline_registry.py b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipeline_registry.py index 185e1a4..85ce5c8 100644 --- a/kedro-agentic-workflows/src/kedro_agentic_workflows/pipeline_registry.py +++ b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipeline_registry.py @@ -17,4 +17,6 @@ def register_pipelines() -> dict[str, Pipeline]: pipelines["__default__"] = pipelines["intent_detection"] + pipelines["response_generation"] pipelines["openai"] = pipelines["intent_detection"] + pipelines["response_generation_openai"] pipelines["autogen"] = pipelines["intent_detection"] + pipelines["response_generation_autogen"] + pipelines["intent_detection_evaluation"] = pipelines["intent_detection_evaluation"] + pipelines["intent_detection_evaluation_opik"] = pipelines["intent_detection_evaluation_opik"] return pipelines diff --git a/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/__init__.py b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection/agent.py b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection/agent.py index 75ec94b..c0f2917 100644 --- a/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection/agent.py +++ b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection/agent.py @@ -1,11 +1,14 @@ from 
functools import partial from typing import TypedDict, Literal +import json + from kedro.pipeline import LLMContext from langchain_core.messages import AnyMessage, AIMessage from langchain_core.runnables import Runnable from langchain_core.prompts import ChatPromptTemplate from langfuse.model import ChatPromptClient +from opik.api_objects.prompt.text.prompt import Prompt as OpikPrompt from langgraph.graph import StateGraph, START, END from langgraph.checkpoint.memory import MemorySaver from langgraph.graph.state import CompiledStateGraph @@ -66,13 +69,28 @@ def __init__(self, context: LLMContext): self.compiled_graph: CompiledStateGraph | None = None self.memory: MemorySaver | None = None - # Preserve Langfuse prompt for tracing - # But convert to the Langchain prompt for execution + # The catalog can deliver intent_prompt as a Langfuse + # (mode: langchain/sdk) or an Opik Prompt SDK object (mode: langchain/sdk). + # Both carry the same template content but expose different APIs. + # We normalize to ChatPromptTemplate here, so the rest of the agent can use a single + # interface regardless of which provider is active. 
prompt = self.context.prompts["intent_prompt"] if isinstance(prompt, ChatPromptClient): prompt = ChatPromptTemplate.from_messages( prompt.get_langchain_prompt() ) + elif isinstance(prompt, OpikPrompt): + data = prompt.prompt + if isinstance(data, str): + try: + data = json.loads(data) + except json.JSONDecodeError: + prompt = ChatPromptTemplate.from_template(data) + data = None + if data is not None: + prompt = ChatPromptTemplate.from_messages( + [(m["role"], m["content"]) for m in data] + ) # LLM bound to structured intent output structured_llm = self.context.llm.with_structured_output(IntentOutput) diff --git a/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/__init__.py b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/__init__.py new file mode 100644 index 0000000..0e63d7f --- /dev/null +++ b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/__init__.py @@ -0,0 +1 @@ +from .pipeline import create_pipeline # NOQA diff --git a/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/nodes.py b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/nodes.py new file mode 100644 index 0000000..0ad288c --- /dev/null +++ b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/nodes.py @@ -0,0 +1,147 @@ +import logging +from typing import Any, Callable + +from kedro.pipeline import LLMContext +from langchain_core.messages import BaseMessage, HumanMessage +from langchain_core.prompts import ChatPromptTemplate +from langchain_openai import ChatOpenAI +from opik.api_objects.dataset.dataset import Dataset +from opik.api_objects.prompt.text.prompt import Prompt +from opik.evaluation import evaluate +from opik.evaluation.metrics.score_result import ScoreResult +from pydantic import BaseModel, Field + +from 
from ..intent_detection.agent import IntentDetectionAgent

logger = logging.getLogger(__name__)


class JudgeScore(BaseModel):
    """Structured output schema the judge LLM must return."""

    score: int = Field(description="Integer score between 1 and 5 inclusive.")


def init_reason_judge_evaluator(
    intent_judge_llm: ChatOpenAI,
    judge_llm_prompt: ChatPromptTemplate,
) -> Callable:
    """Creates LLM-as-a-Judge scorer compatible with opik.evaluation.evaluate()."""
    model_name = getattr(intent_judge_llm, "model_name", None)
    metadata = {"judge_model": model_name} if model_name else None
    structured_judge_llm = intent_judge_llm.with_structured_output(JudgeScore)

    def reason_judge_evaluator(
        dataset_item: dict[str, Any],
        task_outputs: dict[str, Any],
        **kwargs,
    ) -> ScoreResult:
        item_input = dataset_item.get("input", {})
        expected = dataset_item.get("expected_output", {})

        judge_messages: list[BaseMessage] = judge_llm_prompt.format_messages(
            question=item_input.get("question", ""),
            predicted_intent=task_outputs.get("intent", ""),
            predicted_reason=task_outputs.get("reason", ""),
            expected_intent=expected.get("intent", ""),
            expected_reason=expected.get("reason", ""),
        )

        try:
            judged: JudgeScore = structured_judge_llm.invoke(judge_messages)
        except Exception as e:
            # Best-effort: a failing judge must not abort the experiment.
            score = 0
            reason = f"Evaluator failed: {str(e)}"
        else:
            score = judged.score
            reason = "LLM judge evaluation of reasoning quality"

        return ScoreResult(
            name="reason_quality",
            value=float(score),
            reason=reason,
            metadata=metadata,
        )

    return reason_judge_evaluator


def init_intent_accuracy_evaluator() -> Callable:
    """Creates exact-match intent accuracy scorer compatible with opik.evaluation.evaluate()."""

    def intent_accuracy_evaluator(
        dataset_item: dict[str, Any],
        task_outputs: dict[str, Any],
        **kwargs,
    ) -> ScoreResult:
        predicted = task_outputs.get("intent", "")
        expected = dataset_item.get("expected_output", {}).get("intent", "")

        # float(bool) yields 1.0 on exact match, 0.0 otherwise.
        return ScoreResult(
            name="intent_accuracy",
            value=float(predicted == expected),
            reason=f"predicted={predicted}, expected={expected}",
        )

    return intent_accuracy_evaluator


def make_intent_detection_task(
    intent_detection_context: LLMContext,
    opik_tracer: Any,
) -> Callable[[dict[str, Any]], dict[str, Any]]:
    """Creates intent detection task callable compatible with opik.evaluation.evaluate().

    The returned callable follows the LLMTask type expected by Opik: receives a
    dataset item dict and returns a dict with the task outputs.
    """
    agent = IntentDetectionAgent(context=intent_detection_context)
    agent.compile()

    def intent_agent_task(dataset_item: dict[str, Any]) -> dict[str, Any]:
        question = dataset_item.get("input", {}).get("question", "")
        item_id = dataset_item.get("id", "unknown")

        # thread_id keys the agent's checkpointer; the Opik tracer captures spans.
        final_state = agent.invoke(
            {
                "messages": [HumanMessage(content=question)],
                "user_context": {},
            },
            {
                "configurable": {"thread_id": item_id},
                "callbacks": [opik_tracer],
            },
        )

        return {
            "intent": final_state.get("intent", ""),
            "reason": final_state.get("reason", ""),
        }

    return intent_agent_task


def run_experiment(
    intent_evaluation_data: Dataset,
    intent_agent_task: Callable,
    intent_accuracy_evaluator: Callable,
    reason_judge_evaluator: Callable,
    intent_prompt: Prompt,
    model_name: str,
) -> None:
    """Run an Opik evaluation experiment over the intent detection dataset."""
    # NOTE(review): assumes intent_prompt.commit is a non-None string — confirm
    # against the Opik Prompt SDK before relying on the [:8] slice.
    intent_prompt_commit = intent_prompt.commit
    prompt_name = intent_prompt.name
    experiment_name = (
        f"intent_eval_prompt_{prompt_name}_{intent_prompt_commit[:8]}_model_{model_name}"
    )

    evaluate(
        dataset=intent_evaluation_data,
        task=intent_agent_task,
        scoring_functions=[intent_accuracy_evaluator, reason_judge_evaluator],
        experiment_name=experiment_name,
        experiment_config={
            "intent_prompt_commit": intent_prompt_commit,
            "prompt_name": prompt_name,
            "model_name": model_name,
        },
    )
a/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/pipeline.py b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/pipeline.py new file mode 100644 index 0000000..7a9721a --- /dev/null +++ b/kedro-agentic-workflows/src/kedro_agentic_workflows/pipelines/intent_detection_evaluation_opik/pipeline.py @@ -0,0 +1,58 @@ +from kedro.pipeline import llm_context_node, node, pipeline, Pipeline + +from .nodes import ( + init_intent_accuracy_evaluator, + init_reason_judge_evaluator, + make_intent_detection_task, + run_experiment, +) + + +def create_pipeline(**kwargs) -> Pipeline: + return pipeline( + [ + llm_context_node( + name="opik_intent_agent_context_node", + outputs="opik_intent_detection_context", + llm="llm", + prompts=["intent_prompt"], + ), + node( + func=init_intent_accuracy_evaluator, + inputs=None, + outputs="opik_intent_accuracy_evaluator", + name="opik_init_intent_accuracy_evaluator_node", + ), + node( + func=init_reason_judge_evaluator, + inputs=[ + "opik_intent_judge_llm", + "opik_judge_llm_prompt", + ], + outputs="opik_reason_judge_evaluator", + name="opik_init_reason_judge_evaluator_node", + ), + node( + func=make_intent_detection_task, + inputs=[ + "opik_intent_detection_context", + "opik_client", + ], + outputs="opik_intent_agent_task", + name="opik_make_intent_detection_task_node", + ), + node( + func=run_experiment, + inputs=[ + "opik_intent_evaluation_data", + "opik_intent_agent_task", + "opik_intent_accuracy_evaluator", + "opik_reason_judge_evaluator", + "intent_prompt", + "params:model_name", + ], + outputs=None, + name="opik_run_intent_experiment_node", + ), + ] + ) diff --git a/kedro-agentic-workflows/src/kedro_agentic_workflows/settings.py b/kedro-agentic-workflows/src/kedro_agentic_workflows/settings.py index 2ad016b..bf4d065 100644 --- a/kedro-agentic-workflows/src/kedro_agentic_workflows/settings.py +++ 
b/kedro-agentic-workflows/src/kedro_agentic_workflows/settings.py @@ -47,7 +47,6 @@ import warnings from kedro.utils import KedroExperimentalWarning - # Suppresses all subsequent KedroExperimentalWarning warnings warnings.filterwarnings("ignore", category=KedroExperimentalWarning) warnings.filterwarnings("ignore", message="Pydantic serializer warnings")