Commit 32bc2ca

lalitb and jmacd authored
feat(otap-dataflow): expose startup helpers as a public library module (#2505)
Fixes: #2462, #2463

### Summary

- Moves reusable startup functions (`validate_engine_components`, `apply_cli_overrides`, `system_info`) from `src/main.rs` into a new `otap_df_controller::startup` module. This lets custom distribution binaries share the same validation, override, and diagnostics logic without copying code that may drift across releases.
- Adds `run_forever_with_observer` and `run_till_shutdown_with_observer` to `Controller`, exposing the `ObservedStateHandle` via a callback so that in-process integrations can read pipeline liveness and health with zero HTTP overhead.
- `src/main.rs` becomes a thin wrapper over these library calls - no behavioral change for existing users.

A new "Embedding in Custom Distributions" section is added to the README, and a runnable `examples/custom_collector.rs` demonstrates the full library embedding pattern end-to-end, including the observer API with an opt-in `--poll-status` flag for periodic health snapshots.

---------

Co-authored-by: Joshua MacDonald <jmacd@users.noreply.github.com>
1 parent a1fa843 commit 32bc2ca
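
The observer hook described in the summary hands a cloneable state handle to a callback exactly once, before the run call blocks. The sketch below illustrates that shape using only the standard library. `StateHandle`, `is_ready`, `run_with_observer`, and `demo` are hypothetical stand-ins, not the real `ObservedStateHandle` or `Controller` API, and the polling code in `examples/custom_collector.rs` is not reproduced here.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

/// Hypothetical stand-in for `ObservedStateHandle`: a cheap, cloneable view
/// over shared state. The real handle's methods are not part of this sketch.
#[derive(Clone, Default)]
pub struct StateHandle {
    ready: Arc<AtomicBool>,
}

impl StateHandle {
    pub fn is_ready(&self) -> bool {
        self.ready.load(Ordering::Acquire)
    }
}

/// Hypothetical stand-in for the `run_*_with_observer` methods: builds the
/// state, passes a clone of the handle to `observer` exactly once, then
/// performs the work that would normally block.
pub fn run_with_observer<F>(observer: Option<F>) -> StateHandle
where
    F: FnOnce(StateHandle),
{
    let handle = StateHandle::default();
    if let Some(observer) = observer {
        observer(handle.clone());
    }
    // Stand-in for the engine coming up after the callback has returned.
    handle.ready.store(true, Ordering::Release);
    handle
}

/// Moves the handle into a background thread that takes one status snapshot
/// once the engine stand-in has started.
/// Returns (readiness seen in the callback, readiness seen by the poller).
pub fn demo() -> (bool, bool) {
    let (handle_tx, handle_rx) = mpsc::channel::<StateHandle>();
    let (go_tx, go_rx) = mpsc::channel::<()>();

    let poller = thread::spawn(move || {
        let handle = handle_rx.recv().expect("observer never fired");
        go_rx.recv().expect("engine never signalled");
        handle.is_ready()
    });

    let mut seen_in_callback = true;
    let _ = run_with_observer(Some(|h: StateHandle| {
        seen_in_callback = h.is_ready();
        handle_tx.send(h).expect("poller gone");
    }));
    go_tx.send(()).expect("poller gone");

    (seen_in_callback, poller.join().expect("poller panicked"))
}

fn main() {
    let (in_callback, in_poller) = demo();
    println!("ready in callback: {in_callback}, ready in poller: {in_poller}");
}
```

Because the callback runs before the engine starts, state read inside it reflects a not-yet-started pipeline; the useful pattern is to move the handle somewhere that outlives the callback, such as a polling thread.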

7 files changed, +761 -413 lines changed

rust/otap-dataflow/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -40,7 +40,6 @@ thiserror.workspace = true
 serde_json.workspace = true
 clap.workspace = true
 mimalloc = { workspace = true, optional = true }
-sysinfo.workspace = true

 [target.'cfg(not(windows))'.dependencies]
 tikv-jemallocator = { workspace = true, optional = true }

rust/otap-dataflow/README.md

Lines changed: 61 additions & 0 deletions
@@ -351,6 +351,67 @@ The `views` sub-module contains zero-copy machinery for:
 - interpreting OTLP bytes using views to build OTAP records
 - interpreting OTAP records using views to encode OTLP bytes

+## Embedding in Custom Distributions
+
+The engine crates are designed as reusable libraries. A custom binary can
+link the same controller, factory, and node crates and register its own
+components via `linkme` distributed slices -- exactly how `src/main.rs` works.
+
+The `otap_df_controller::startup` module provides three helpers that every
+embedding binary typically needs:
+
+- **`validate_engine_components`** -- Checks that every node URN in a
+  config maps to a registered component and runs per-component config
+  validation.
+- **`apply_cli_overrides`** -- Merges core-allocation and HTTP-admin
+  bind overrides into an `OtelDataflowSpec`.
+- **`system_info`** -- Returns a formatted string with CPU/memory info
+  and all registered component URNs, useful for `--help` banners or
+  diagnostics.
+
+A minimal custom binary looks like this:
+
+```rust
+use otap_df_config::engine::OtelDataflowSpec;
+use otap_df_controller::{Controller, startup};
+
+// Side-effect imports to register components via linkme.
+use otap_df_core_nodes as _;
+// Bring your own contrib/custom nodes as needed.
+
+// Reference the pipeline factory (or define your own).
+use otap_df_otap::OTAP_PIPELINE_FACTORY;
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    otap_df_otap::crypto::install_crypto_provider()?;
+
+    let mut cfg = OtelDataflowSpec::from_file("pipeline.yaml")?;
+
+    // Apply any CLI overrides (core count, admin bind address, etc.)
+    startup::apply_cli_overrides(&mut cfg, Some(4), None, None);
+
+    // Validate that every node URN in the config is registered.
+    startup::validate_engine_components(&cfg, &OTAP_PIPELINE_FACTORY)?;
+
+    // Print diagnostics.
+    // Pass "system" here for the minimal example; in practice, align this
+    // string with your binary's allocator feature (e.g. "jemalloc", "mimalloc").
+    println!("{}", startup::system_info(&OTAP_PIPELINE_FACTORY, "system"));
+
+    // Run the engine.
+    let controller = Controller::new(&OTAP_PIPELINE_FACTORY);
+    controller.run_forever(cfg)?;
+    Ok(())
+}
+```
+
+This pattern is analogous to the builder approach used by projects like
+`bindplane-otel-collector` in the Go ecosystem. The default `src/main.rs`
+is itself a thin wrapper over these library calls.
+
+For a complete runnable example, see
+[`examples/custom_collector.rs`](examples/custom_collector.rs).
+
 ## Development Setup

 **Requirements**:
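
The README text above describes `validate_engine_components` as checking that every node URN in a config maps to a registered component. The sketch below shows only that lookup step, under stated assumptions: `PipelineConfig`, `unknown_urns`, `validate`, and the example URNs are hypothetical, and the per-component config validation that the real helper also runs is not modelled.

```rust
use std::collections::BTreeSet;

/// Hypothetical, simplified config: just the node URNs a pipeline references.
pub struct PipelineConfig {
    pub node_urns: Vec<String>,
}

/// Returns the URNs in `config` that have no registered component, sorted
/// and de-duplicated. An empty result means every URN is known.
pub fn unknown_urns(config: &PipelineConfig, registered: &[&str]) -> Vec<String> {
    let known: BTreeSet<&str> = registered.iter().copied().collect();
    let missing: BTreeSet<&str> = config
        .node_urns
        .iter()
        .map(String::as_str)
        .filter(|urn| !known.contains(urn))
        .collect();
    missing.into_iter().map(str::to_owned).collect()
}

/// Fails with a single error message that lists every unregistered URN.
pub fn validate(config: &PipelineConfig, registered: &[&str]) -> Result<(), String> {
    let missing = unknown_urns(config, registered);
    if missing.is_empty() {
        Ok(())
    } else {
        Err(format!("unregistered node URNs: {}", missing.join(", ")))
    }
}

fn main() {
    // Made-up URNs for illustration only.
    let registered = ["urn:example:receiver:a", "urn:example:exporter:b"];
    let config = PipelineConfig {
        node_urns: vec![
            "urn:example:receiver:a".to_owned(),
            "urn:example:processor:missing".to_owned(),
        ],
    };
    match validate(&config, &registered) {
        Ok(()) => println!("config ok"),
        Err(e) => eprintln!("{e}"),
    }
}
```

Collecting every missing URN before failing, rather than stopping at the first, lets a user fix a config in one pass.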

rust/otap-dataflow/crates/controller/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ otap-df-engine = { workspace = true }
 otap-df-telemetry = { workspace = true }
 otap-df-admin = { workspace = true }

+sysinfo = { workspace = true }
 thiserror = { workspace = true }
 miette = { workspace = true }
 core_affinity = { workspace = true }

rust/otap-dataflow/crates/controller/src/lib.rs

Lines changed: 55 additions & 5 deletions
@@ -75,7 +75,7 @@ use otap_df_engine::topic::{
     InMemoryBackend, PipelineTopicBinding, TopicBroker, TopicOptions, TopicPublishOutcomeConfig,
     TopicSet,
 };
-use otap_df_state::store::ObservedStateStore;
+use otap_df_state::store::{ObservedStateHandle, ObservedStateStore};
 use otap_df_telemetry::event::{EngineEvent, ErrorSummary, ObservedEventReporter};
 use otap_df_telemetry::registry::TelemetryRegistryHandle;
 use otap_df_telemetry::reporter::MetricsReporter;
@@ -91,6 +91,8 @@ use std::thread;

 /// Error types and helpers for the controller module.
 pub mod error;
+/// Reusable startup helpers (validation, CLI overrides, system info).
+pub mod startup;
 /// Utilities to spawn async tasks on dedicated threads with graceful shutdown.
 pub mod thread_task;

@@ -264,14 +266,52 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U

     /// Starts the controller with the given engine configurations.
     pub fn run_forever(&self, engine_config: OtelDataflowSpec) -> Result<(), Error> {
-        self.run_with_mode(engine_config, RunMode::ParkMainThread)
+        self.run_with_mode(
+            engine_config,
+            RunMode::ParkMainThread,
+            None::<fn(ObservedStateHandle)>,
+        )
+    }
+
+    /// Starts the controller and invokes `observer` with an
+    /// [`ObservedStateHandle`] as soon as the pipeline state store is ready.
+    ///
+    /// The callback fires once, before the engine blocks. Use it to obtain
+    /// zero-overhead, in-process access to pipeline liveness, readiness, and
+    /// health without going through the admin HTTP server.
+    pub fn run_forever_with_observer<F>(
+        &self,
+        engine_config: OtelDataflowSpec,
+        observer: F,
+    ) -> Result<(), Error>
+    where
+        F: FnOnce(ObservedStateHandle),
+    {
+        self.run_with_mode(engine_config, RunMode::ParkMainThread, Some(observer))
     }

     /// Starts the controller with the given engine configurations.
     ///
     /// Runs until pipelines are shut down, then closes telemetry/admin services.
     pub fn run_till_shutdown(&self, engine_config: OtelDataflowSpec) -> Result<(), Error> {
-        self.run_with_mode(engine_config, RunMode::ShutdownWhenDone)
+        self.run_with_mode(
+            engine_config,
+            RunMode::ShutdownWhenDone,
+            None::<fn(ObservedStateHandle)>,
+        )
+    }
+
+    /// Like [`run_till_shutdown`](Self::run_till_shutdown), but invokes
+    /// `observer` with an [`ObservedStateHandle`] before blocking.
+    pub fn run_till_shutdown_with_observer<F>(
+        &self,
+        engine_config: OtelDataflowSpec,
+        observer: F,
+    ) -> Result<(), Error>
+    where
+        F: FnOnce(ObservedStateHandle),
+    {
+        self.run_with_mode(engine_config, RunMode::ShutdownWhenDone, Some(observer))
     }

     fn map_topic_spec_to_options(
@@ -957,11 +997,15 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
         Ok(set)
     }

-    fn run_with_mode(
+    fn run_with_mode<F>(
         &self,
         engine_config: OtelDataflowSpec,
         run_mode: RunMode,
-    ) -> Result<(), Error> {
+        observer: Option<F>,
+    ) -> Result<(), Error>
+    where
+        F: FnOnce(ObservedStateHandle),
+    {
         let num_pipeline_groups = engine_config.groups.len();
         let resolved_config = engine_config.resolve();
         let (engine, pipelines, observability_pipeline) = resolved_config.into_parts();
@@ -993,6 +1037,12 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
             log_tap_handle.clone(),
         );
         let obs_state_handle = obs_state_store.handle();
+
+        // Notify the caller with a clone of the observed-state handle, if requested.
+        if let Some(observer) = observer {
+            observer(obs_state_handle.clone());
+        }
+
         let engine_evt_reporter =
             obs_state_store.engine_reporter(engine.observed_state.engine_events);
         let console_async_reporter = telemetry_config
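
The `None::<fn(ObservedStateHandle)>` argument that `run_forever` and `run_till_shutdown` now pass deserves a note. Because `run_with_mode` is generic over the callback type `F`, a bare `None` gives the compiler nothing to infer `F` from, so the call sites name a function-pointer type explicitly. The standalone sketch below reproduces that situation; `Handle`, `run`, `run_plain`, and `run_observed` are hypothetical names chosen for illustration.

```rust
/// Hypothetical stand-in for the handle passed to the observer.
#[derive(Clone, Debug, PartialEq)]
pub struct Handle(pub u32);

/// Generic over the callback type, like `run_with_mode` in the diff.
/// Returns whether the observer was invoked.
pub fn run<F>(observer: Option<F>) -> bool
where
    F: FnOnce(Handle),
{
    match observer {
        Some(observer) => {
            observer(Handle(7));
            true
        }
        None => false,
    }
}

/// No-observer entry point. A bare `run(None)` does not compile because the
/// compiler cannot infer `F`, so `F` is pinned to a fn-pointer type here.
pub fn run_plain() -> bool {
    run(None::<fn(Handle)>)
}

/// Observer entry point: `F` is inferred from the closure.
pub fn run_observed() -> Option<Handle> {
    let mut seen = None;
    let fired = run(Some(|h: Handle| seen = Some(h)));
    if fired { seen } else { None }
}

fn main() {
    println!("plain fired: {}", run_plain());
    println!("observed: {:?}", run_observed());
}
```

Any type that satisfies the `FnOnce` bound would work in the turbofish; a plain `fn` pointer is a convenient choice because it needs no extra definitions.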
