added pipeline sub command and update docs #1874
Greptile Summary
This PR promotes

| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/pipeline/__main__.py | New 1,228-line CLI module exposing `retriever pipeline run`; contains resource-management issues (Ray not shut down on exception paths), `_TeeStream.fileno()` can raise `AttributeError`, and `_configure_logging` mutates streams before the `try/finally` guard. |
| nemo_retriever/src/nemo_retriever/pipeline/__init__.py | New package init with lazy `__getattr__` forwarding `app` and `run` from `__main__`; clean pattern with SPDX header and correct `__all__`. |
| nemo_retriever/src/nemo_retriever/examples/graph_pipeline.py | Correctly converted to a backward-compat shim that re-exports `app`, `_ensure_lancedb_table`, and `_resolve_file_patterns` from the new pipeline module. |
| nemo_retriever/src/nemo_retriever/evaluation/cli.py | Extracted `run_qa_sweep_from_config_dict` so the QA sweep logic can be called from both `retriever eval run` and `retriever pipeline run --evaluation-mode=qa`; the internal `raise typer.Exit` is replaced by `return 1`, which is correct for library use. |
| nemo_retriever/tests/test_pipeline_helpers.py | Comprehensive unit tests for private helper functions with good happy-path and error-path coverage. |
| nemo_retriever/tests/test_batch_pipeline.py | Integration-style CLI tests using `CliRunner` with proper monkeypatching of Ray, LanceDB, and `GraphIngestor`; covers multimodal, audio, BEIR, and runtime-metrics flags. |
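
The lazy `__getattr__` forwarding mentioned for the package init can be illustrated with a small standalone sketch. This is not the PR's code: `_LAZY_EXPORTS`, `lazy_getattr`, and `demo_pkg` are hypothetical names, and stdlib modules stand in for the real `__main__` target so the example runs anywhere.

```python
import importlib
import types

# Hypothetical export table: public name -> (module to import, attribute to fetch).
# Stdlib targets are used only so this sketch is self-contained.
_LAZY_EXPORTS = {
    "dumps": ("json", "dumps"),
    "sqrt": ("math", "sqrt"),
}


def lazy_getattr(name: str):
    """Resolve `name` on first access by importing its backing module."""
    try:
        module_name, attr = _LAZY_EXPORTS[name]
    except KeyError:
        raise AttributeError(f"module has no attribute {name!r}") from None
    return getattr(importlib.import_module(module_name), attr)


# Attach the hook to a throwaway module object to mimic a package __init__.py
# that defines a module-level __getattr__ and __all__.
demo_pkg = types.ModuleType("demo_pkg")
demo_pkg.__getattr__ = lazy_getattr
demo_pkg.__all__ = list(_LAZY_EXPORTS)
```

The point of the pattern is that importing the package stays cheap; the backing module is only imported when one of the forwarded names is actually accessed.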
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A["retriever pipeline run INPUT_PATH"] --> B["_configure_logging()"]
B --> C{"log_file set?"}
C -- yes --> D["Open file handle\nReplace sys.stdout/stderr\nwith _TeeStream"]
C -- no --> E["basicConfig only"]
D --> F["try block begins"]
E --> F
F --> G["Validate run_mode, evaluation_mode, etc."]
G --> H["_ensure_lancedb_table()"]
H --> I["_resolve_file_patterns()"]
I --> J["_build_extract_params()\n_build_embed_params()"]
J --> K["_build_ingestor() / GraphIngestor chain"]
K --> L["ingestor.ingest()"]
L --> M["_collect_results()"]
M --> P["handle_lancedb() overwrite"]
P --> Q{"table.count_rows()==0?"}
Q -- yes --> R["Skip eval\n⚠ ray.shutdown() here"]
Q -- no --> S{"evaluation_mode?"}
S -- qa --> T["run_qa_sweep_from_config_dict()\n⚠ ray.shutdown() here"]
S -- recall/beir --> U["_run_evaluation()"]
U --> W{"ran?"}
W -- no --> X["⚠ ray.shutdown() here"]
W -- yes --> Y["_write_runtime_summary()\n⚠ ray.shutdown() here"]
R --> Z["finally: restore streams only\n❌ NO ray.shutdown() on exception"]
T --> Z
X --> Z
Y --> Z
style Z fill:#ffcccc
style R fill:#ffe0cc
style T fill:#ffe0cc
style X fill:#ffe0cc
style Y fill:#ffe0cc
Reviews (8): Last reviewed commit: "fix format on file for pre-commit hooks"
@jperez999 looks good, but there are some merge conflicts. If you resolve those, we can get this merged.
from nemo_retriever.evaluation.cli import run_qa_sweep_from_config_dict
from nemo_retriever.evaluation.config import load_eval_config

assert eval_config is not None
**`assert` used for mandatory runtime validation**
Path: nemo_retriever/src/nemo_retriever/pipeline/__main__.py
Line: 1051
`assert eval_config is not None` is stripped when Python is run with `-O` (optimized mode) or compiled to `.pyc` with optimizations. Without the check, a `None` value for `eval_config` passes through silently and the subsequent `load_eval_config(str(None))` call fails with a confusing error. The guard on lines 842–846 already guarantees this invariant before line 1051 is reached, so the assert is redundant today, but relying on it becomes dangerous if the early guard is ever refactored away.
Replace with an explicit raise so the protection works under all Python execution modes:
```suggestion
if eval_config is None:
    raise typer.BadParameter("--evaluation-mode=qa requires --eval-config.")
```
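
The difference between the two guards can be checked directly. The sketch below is a minimal standalone illustration, not code from the PR: the helper names are hypothetical, and `ValueError` stands in for `typer.BadParameter` to avoid the dependency. It runs each guard in a fresh interpreter, with and without `-O`.

```python
import subprocess
import sys

# Hypothetical stand-ins for the guarded code path: the assert-based guard
# and the explicit-raise guard, each followed by a marker print.
ASSERT_GUARD = (
    "eval_config = None\n"
    "assert eval_config is not None\n"
    "print('guard passed')"
)
RAISE_GUARD = (
    "eval_config = None\n"
    "if eval_config is None:\n"
    "    raise ValueError('--evaluation-mode=qa requires --eval-config.')\n"
    "print('guard passed')"
)


def run_snippet(code: str, optimized: bool) -> subprocess.CompletedProcess:
    """Run `code` in a fresh interpreter, optionally with -O."""
    cmd = [sys.executable]
    if optimized:
        cmd.append("-O")
    cmd += ["-c", code]
    return subprocess.run(cmd, capture_output=True, text=True)


def guard_fired(code: str, optimized: bool) -> bool:
    """True when the guard stopped execution (non-zero exit status)."""
    return run_snippet(code, optimized).returncode != 0
```

With this setup, `guard_fired(ASSERT_GUARD, optimized=True)` reports that the assert no longer stops execution, while the explicit raise fires in both modes.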
log_handle, original_stdout, original_stderr = _configure_logging(log_file, debug=bool(debug))
try:
**`_configure_logging` called outside the `try/finally` guard: resource leak if it raises after mutating stdout/stderr**
Path: nemo_retriever/src/nemo_retriever/pipeline/__main__.py
Line: 832-833
`_configure_logging` first opens a file handle, then replaces `os.sys.stdout` and `os.sys.stderr` with `_TeeStream` instances. If anything after the `open()` call raises (e.g., `logging.basicConfig` or the `logger.info` write), those stream replacements are permanent for the process and the open file handle is never closed, because the `finally` block only runs once the `try` on the next line has been entered.
Move the call inside the `try` block to fix both the leak and the unreachable `finally`.
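
One detail when moving the call: every name the `finally` block uses has to be bound before the `try` is entered, otherwise a failure inside the configure step would surface as a `NameError` during cleanup. The sketch below shows one way this could look, under stated assumptions: `configure_logging_stub` and `run_with_guard` are hypothetical stand-ins (not the PR's functions), an `io.StringIO` plays the role of the log file, and the caller owns the handle.

```python
import io
import sys


def configure_logging_stub(log_handle: io.StringIO) -> None:
    """Hypothetical stand-in for `_configure_logging` that fails half-way."""
    sys.stdout = log_handle  # the stream is swapped first ...
    raise RuntimeError("logging setup failed")  # ... then a later step raises


def run_with_guard(log_handle: io.StringIO) -> None:
    # Bind everything `finally` needs *before* entering the try, so cleanup
    # works even when the configure step itself raises.
    original_stdout = sys.stdout
    try:
        configure_logging_stub(log_handle)  # now covered by the guard
        print("pipeline body")  # not reached in this sketch
    finally:
        sys.stdout = original_stdout
        log_handle.close()
```

After `run_with_guard` propagates the `RuntimeError`, `sys.stdout` is back to its original object and the handle is closed.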
finally:
    os.sys.stdout = original_stdout
    os.sys.stderr = original_stderr
    if log_handle is not None:
        log_handle.close()
**Ray cluster not shut down on exception in batch mode**
Path: nemo_retriever/src/nemo_retriever/pipeline/__main__.py
Line: 1215-1219
`ray.shutdown()` is called at four separate early-return points (lines 1041–1044, 1091–1094, 1162–1165, 1193–1196), but the `finally` block only restores log streams. If any exception propagates out of the `try` body after Ray has been initialized (e.g. from `ingestor.ingest()`, `_collect_results()`, or `handle_lancedb()`), Ray worker processes and GPU allocations are never released.
Moving `ray.shutdown()` into the `finally` block eliminates all four duplicate call-sites and closes the leak:
```python
finally:
    if run_mode == "batch":
        import ray
        if ray.is_initialized():
            ray.shutdown()
    os.sys.stdout = original_stdout
    os.sys.stderr = original_stderr
    if log_handle is not None:
        log_handle.close()
```
Each early-return guard (`if run_mode == "batch": import ray; ray.shutdown(); return`) can then be replaced with a bare `return`.
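
The single-cleanup-point idea can be exercised without Ray installed. The following is a minimal sketch under stated assumptions: `FakeCluster` and `run_pipeline` are hypothetical stand-ins for the Ray runtime and the CLI command body, and no real Ray calls are made. It shows that an early `return`, normal completion, and an exception all pass through the same `finally`.

```python
class FakeCluster:
    """Hypothetical stand-in for the Ray runtime (no real Ray calls are made)."""

    def __init__(self) -> None:
        self.initialized = False
        self.shutdown_calls = 0

    def init(self) -> None:
        self.initialized = True

    def shutdown(self) -> None:
        self.initialized = False
        self.shutdown_calls += 1


def run_pipeline(cluster: FakeCluster, rows: int, fail: bool = False) -> str:
    """Single cleanup point: every exit path goes through `finally`."""
    try:
        cluster.init()
        if fail:
            raise RuntimeError("ingest failed")  # exception path
        if rows == 0:
            return "skipped evaluation"  # early return, no explicit shutdown
        return "evaluated"  # normal completion
    finally:
        # Mirrors the `ray.is_initialized()` check in the suggestion above.
        if cluster.initialized:
            cluster.shutdown()
```

Because the cleanup lives in one place, adding another early return later cannot reintroduce the leak.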
Description
This PR turns the `graph_pipeline.py` file into the `retriever` subcommand `pipeline`. Now you can use `retriever pipeline` to run the exact same logic as `graph_pipeline.py`.
Checklist