Skip to content
Merged
1 change: 0 additions & 1 deletion rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ thiserror.workspace = true
serde_json.workspace = true
clap.workspace = true
mimalloc = { workspace = true, optional = true }
sysinfo.workspace = true

[target.'cfg(not(windows))'.dependencies]
tikv-jemallocator = { workspace = true, optional = true }
Expand Down
61 changes: 61 additions & 0 deletions rust/otap-dataflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,67 @@ The `views` sub-module contains zero-copy machinery for:
- interpreting OTLP bytes using views to build OTAP records
- interpreting OTAP records using views to encode OTLP bytes

## Embedding in Custom Distributions

The engine crates are designed as reusable libraries. A custom binary can
link the same controller, factory, and node crates and register its own
components via `linkme` distributed slices -- exactly how `src/main.rs` works.

The `otap_df_controller::startup` module provides three helpers that
embedding binaries typically need:

- **`validate_engine_components`** -- Checks that every node URN in a
config maps to a registered component and runs per-component config
validation.
- **`apply_cli_overrides`** -- Merges core-allocation and HTTP-admin
bind overrides into an `OtelDataflowSpec`.
- **`system_info`** -- Returns a formatted string with CPU/memory info
and all registered component URNs, useful for `--help` banners or
diagnostics.

A minimal custom binary looks like this:

```rust
use otap_df_config::engine::OtelDataflowSpec;
use otap_df_controller::{Controller, startup};

// Side-effect imports to register components via linkme.
use otap_df_core_nodes as _;
// Bring your own contrib/custom nodes as needed.

// Reference the pipeline factory (or define your own).
use otap_df_otap::OTAP_PIPELINE_FACTORY;

fn main() -> Result<(), Box<dyn std::error::Error>> {
otap_df_otap::crypto::install_crypto_provider()?;

let mut cfg = OtelDataflowSpec::from_file("pipeline.yaml")?;

// Apply any CLI overrides (core count, admin bind address, etc.)
startup::apply_cli_overrides(&mut cfg, Some(4), None, None);

// Validate that every node URN in the config is registered.
startup::validate_engine_components(&cfg, &OTAP_PIPELINE_FACTORY)?;

// Print diagnostics.
// Pass "system" here for the minimal example; in practice, align this
// string with your binary's allocator feature (e.g. "jemalloc", "mimalloc").
println!("{}", startup::system_info(&OTAP_PIPELINE_FACTORY, "system"));

// Run the engine.
let controller = Controller::new(&OTAP_PIPELINE_FACTORY);
controller.run_forever(cfg)?;
Ok(())
}
```

This pattern is analogous to the builder approach used by projects like
`bindplane-otel-collector` in the Go ecosystem. The default `src/main.rs`
is itself a thin wrapper over these library calls.

For a complete runnable example, see
[`examples/custom_collector.rs`](examples/custom_collector.rs).

## Development Setup

**Requirements**:
Expand Down
1 change: 1 addition & 0 deletions rust/otap-dataflow/crates/controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ otap-df-engine = { workspace = true }
otap-df-telemetry = { workspace = true }
otap-df-admin = { workspace = true }

sysinfo = { workspace = true }
thiserror = { workspace = true }
miette = { workspace = true }
core_affinity = { workspace = true }
Expand Down
60 changes: 55 additions & 5 deletions rust/otap-dataflow/crates/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use otap_df_engine::topic::{
InMemoryBackend, PipelineTopicBinding, TopicBroker, TopicOptions, TopicPublishOutcomeConfig,
TopicSet,
};
use otap_df_state::store::ObservedStateStore;
use otap_df_state::store::{ObservedStateHandle, ObservedStateStore};
use otap_df_telemetry::event::{EngineEvent, ErrorSummary, ObservedEventReporter};
use otap_df_telemetry::registry::TelemetryRegistryHandle;
use otap_df_telemetry::reporter::MetricsReporter;
Expand All @@ -91,6 +91,8 @@ use std::thread;

/// Error types and helpers for the controller module.
pub mod error;
/// Reusable startup helpers (validation, CLI overrides, system info).
pub mod startup;
/// Utilities to spawn async tasks on dedicated threads with graceful shutdown.
pub mod thread_task;

Expand Down Expand Up @@ -264,14 +266,52 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U

/// Starts the controller with the given engine configurations.
pub fn run_forever(&self, engine_config: OtelDataflowSpec) -> Result<(), Error> {
self.run_with_mode(engine_config, RunMode::ParkMainThread)
self.run_with_mode(
engine_config,
RunMode::ParkMainThread,
None::<fn(ObservedStateHandle)>,
)
}

/// Starts the controller and invokes `observer` with an
/// [`ObservedStateHandle`] as soon as the pipeline state store is ready.
///
/// The callback fires once, before the engine blocks. Use it to obtain
/// zero-overhead, in-process access to pipeline liveness, readiness, and
/// health without going through the admin HTTP server.
///
/// The callback receives its own clone of the handle and is invoked
/// synchronously as part of startup, so it should return promptly: store the
/// handle or hand it off to another thread/task rather than polling inside
/// the callback.
///
/// Uses the same run mode as [`run_forever`](Self::run_forever)
/// (`RunMode::ParkMainThread`).
///
/// # Errors
///
/// Returns any [`Error`] produced while starting or running the engine.
/// NOTE(review): if startup fails before the state store is created, the
/// observer is presumably never invoked -- confirm against `run_with_mode`.
pub fn run_forever_with_observer<F>(
&self,
engine_config: OtelDataflowSpec,
observer: F,
) -> Result<(), Error>
where
F: FnOnce(ObservedStateHandle),
{
self.run_with_mode(engine_config, RunMode::ParkMainThread, Some(observer))
}

/// Starts the controller with the given engine configurations.
///
/// Runs until pipelines are shut down, then closes telemetry/admin services.
pub fn run_till_shutdown(&self, engine_config: OtelDataflowSpec) -> Result<(), Error> {
self.run_with_mode(engine_config, RunMode::ShutdownWhenDone)
self.run_with_mode(
engine_config,
RunMode::ShutdownWhenDone,
None::<fn(ObservedStateHandle)>,
)
}

/// Like [`run_till_shutdown`](Self::run_till_shutdown), but invokes
/// `observer` with an [`ObservedStateHandle`] before blocking.
///
/// The callback is invoked at most once, synchronously as part of startup,
/// and receives its own clone of the handle. It should return promptly:
/// store the handle or hand it off elsewhere for later use instead of
/// polling inside the callback.
///
/// Uses `RunMode::ShutdownWhenDone`, i.e. the same run mode as
/// [`run_till_shutdown`](Self::run_till_shutdown).
///
/// # Errors
///
/// Returns any [`Error`] produced while starting or running the engine.
/// NOTE(review): if startup fails before the state store is created, the
/// observer is presumably never invoked -- confirm against `run_with_mode`.
pub fn run_till_shutdown_with_observer<F>(
&self,
engine_config: OtelDataflowSpec,
observer: F,
) -> Result<(), Error>
where
F: FnOnce(ObservedStateHandle),
{
self.run_with_mode(engine_config, RunMode::ShutdownWhenDone, Some(observer))
}

fn map_topic_spec_to_options(
Expand Down Expand Up @@ -957,11 +997,15 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
Ok(set)
}

fn run_with_mode(
fn run_with_mode<F>(
&self,
engine_config: OtelDataflowSpec,
run_mode: RunMode,
) -> Result<(), Error> {
observer: Option<F>,
) -> Result<(), Error>
where
F: FnOnce(ObservedStateHandle),
{
let num_pipeline_groups = engine_config.groups.len();
let resolved_config = engine_config.resolve();
let (engine, pipelines, observability_pipeline) = resolved_config.into_parts();
Expand Down Expand Up @@ -993,6 +1037,12 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug + ReceivedAtNode + U
log_tap_handle.clone(),
);
let obs_state_handle = obs_state_store.handle();

// Notify the caller with a clone of the observed-state handle, if requested.
if let Some(observer) = observer {
observer(obs_state_handle.clone());
}

let engine_evt_reporter =
obs_state_store.engine_reporter(engine.observed_state.engine_events);
let console_async_reporter = telemetry_config
Expand Down
Loading
Loading