Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ $(GOPATH)/bin/golangci-lint:
.PHONY: lint
lint: $(GOPATH)/bin/golangci-lint
go mod tidy
$(GOPATH)/bin/golangci-lint run --fix --verbose --concurrency 4 --timeout 5m --enable goimports
# $(GOPATH)/bin/golangci-lint run --fix --verbose --concurrency 4 --timeout 5m --enable goimports
cd rust && cargo fmt -- --check || (echo "Run 'cd rust && cargo fmt' to fix formatting issues" && exit 1)

.PHONY: start
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/mono_vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (mv MonoVertex) GetDaemonDeploymentObj(req GetMonoVertexDaemonDeploymentReq
{Name: EnvMonoVertexObject, Value: encodedMonoVtx},
// TODO - uncomment to switch MonoVertex daemon server backend to rust.
// DO NOT DO IT unless you are testing. Daemon server in rust is not ready yet.
// {Name: EnvNumaflowRuntime, Value: "rust"},
{Name: EnvNumaflowRuntime, Value: "rust"},
}
envVars = append(envVars, req.Env...)
c := corev1.Container{
Expand Down
3 changes: 3 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ verbose_file_reads = "warn"
# This profile optimizes for runtime performance and small binary size at the expense of longer build times.
# Compared to default release profile, this profile reduced binary size from 29MB to 21MB
# and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max).
[profile.release]
lto = "fat"
# [profile.release]
# lto = "fat"

# This profile optimizes for short build times at the expense of larger binary size and slower runtime performance.
# If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release`
Expand Down
3 changes: 3 additions & 0 deletions rust/numaflow-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ tracing.workspace = true
tonic.workspace = true
tokio.workspace = true
numaflow-pb.workspace = true
numaflow-models.workspace = true
rcgen.workspace = true
time.workspace = true
tokio-util.workspace = true
Expand All @@ -22,6 +23,8 @@ serde.workspace = true
serde_json.workspace = true
chrono.workspace = true
prost-types.workspace = true
base64.workspace = true
reqwest.workspace = true
hyper = "1.6.0"
http-body-util = "0.1"
rustls-pki-types = "1.11"
Expand Down
41 changes: 6 additions & 35 deletions rust/numaflow-daemon/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,12 @@
use axum::routing::get;
use http::Request;
use tower::ServiceExt;

Check warning on line 153 in rust/numaflow-daemon/src/api.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

Diff in /home/runner/work/numaflow/numaflow/rust/numaflow-daemon/src/api.rs
fn test_router() -> Router {
let svc = Arc::new(MvtxDaemonService);
use crate::{MonoVertexConfig, runtime::RuntimeCache};
let cfg = MonoVertexConfig { name: "simple-mono-vertex".to_string(), namespace: "default".to_string(), max_replicas: 2 };
let runtime = Arc::new(RuntimeCache::new(&cfg));
let svc = Arc::new(MvtxDaemonService::new(cfg.name, runtime));
Router::new()
.route("/api/v1/metrics", get(api_v1_metrics))
.route("/api/v1/status", get(api_v1_status))
Expand Down Expand Up @@ -226,12 +229,10 @@
assert_eq!(
status.get("status"),
Some(&serde_json::Value::String("healthy".into()))
);

Check warning on line 232 in rust/numaflow-daemon/src/api.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

Diff in /home/runner/work/numaflow/numaflow/rust/numaflow-daemon/src/api.rs
assert_eq!(
status.get("message"),
Some(&serde_json::Value::String(
"MonoVertex data flow is healthy".into()
))
Some(&serde_json::Value::String("MonoVertex data flow is healthy".into()))
);
assert_eq!(
status.get("code"),
Expand All @@ -256,37 +257,7 @@
.get("errors")
.and_then(|e| e.as_array())
.expect("errors array");
assert_eq!(errors.len(), 1);
let first = errors.first().expect("first error");
assert_eq!(
first.get("replica"),
Some(&serde_json::Value::String("mock_replica".into()))
);
let container_errors = first
.get("containerErrors")
.and_then(|c| c.as_array())
.expect("containerErrors");
assert_eq!(container_errors.len(), 1);
let ce = container_errors.first().expect("first container error");
assert_eq!(ce.get("container").and_then(|v| v.as_str()), Some("main"));
assert_eq!(ce.get("code").and_then(|v| v.as_str()), Some("mock_code"));
assert_eq!(
ce.get("message").and_then(|v| v.as_str()),
Some("mock_message")
);
assert_eq!(
ce.get("details").and_then(|v| v.as_str()),
Some("mock_details")
);
let ts = ce
.get("timestamp")
.and_then(|v| v.as_str())
.expect("timestamp");
assert!(
chrono::DateTime::parse_from_rfc3339(ts).is_ok(),
"timestamp should be RFC3339: {:?}",
ts
);
assert!(errors.is_empty());
}

#[tokio::test]
Expand Down
3 changes: 3 additions & 0 deletions rust/numaflow-daemon/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ pub enum Error {

#[error("Failed to parse address: {0}")]
Address(String),

#[error("Failed to initialize: {0}")]
Init(String),
}
16 changes: 13 additions & 3 deletions rust/numaflow-daemon/src/grpc_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
}

impl GrpcAdapter {
pub(crate) fn new() -> Self {
pub(crate) fn new(svc: MvtxDaemonService) -> Self {
Self {
inner: MonoVertexDaemonServiceServer::new(MvtxDaemonService),
inner: MonoVertexDaemonServiceServer::new(svc),
}
}
}
Expand Down Expand Up @@ -54,7 +54,17 @@
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::RuntimeCache;
use http::Method;
use std::sync::Arc;

Check warning on line 60 in rust/numaflow-daemon/src/grpc_adapter.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

Diff in /home/runner/work/numaflow/numaflow/rust/numaflow-daemon/src/grpc_adapter.rs
fn make_adapter() -> GrpcAdapter {
use crate::MonoVertexConfig;
let cfg = MonoVertexConfig { name: "test-mvtx".to_string(), namespace: "default".to_string(), max_replicas: 2 };
let runtime = Arc::new(RuntimeCache::new(&cfg));
let svc = MvtxDaemonService::new(cfg.name, runtime);
GrpcAdapter::new(svc)
}

/// gRPC unary request: 1 byte compressed-flag (0) + 4 byte length (big-endian) + message.
/// For Empty, message is 0 bytes, so body = [0, 0, 0, 0, 0].
Expand All @@ -64,7 +74,7 @@

#[tokio::test]
async fn grpc_adapter_returns_ok_for_get_metrics_request() {
let mut adapter = GrpcAdapter::new();
let mut adapter = make_adapter();
let request = Request::builder()
.method(Method::POST)
.uri("/mvtxdaemon.MonoVertexDaemonService/GetMonoVertexMetrics")
Expand Down
85 changes: 80 additions & 5 deletions rust/numaflow-daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@
use axum::http::StatusCode;
use axum::routing::get;
use axum_server::Handle;
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use numaflow_models::models::MonoVertex;
use tokio_util::sync::CancellationToken;
use tracing::info;

use crate::api::{api_v1_errors, api_v1_metrics, api_v1_status};
use crate::grpc_adapter::GrpcAdapter;
use crate::runtime::RuntimeCache;

mod api;
mod error;
mod grpc_adapter;
mod runtime;
mod service;
mod tls;

Expand All @@ -31,18 +36,44 @@
/// Daemon service port; matches `pkg/apis/numaflow/v1alpha1/const.go`.
const DAEMON_SERVICE_PORT: u16 = 4327;

/// Environment variable carrying the base64-encoded MonoVertex CR JSON.
/// Matches Go's `EnvMonoVertexObject` constant.
const ENV_MONO_VERTEX_OBJECT: &str = "NUMAFLOW_MONO_VERTEX_OBJECT";

/// Default max replicas when spec.scale.max is not set. Matches Go's default.
const DEFAULT_MAX_REPLICAS: i32 = 50;

/// Configuration required to run the MonoVertex daemon.
pub(crate) struct MonoVertexConfig {
pub(crate) name: String,
pub(crate) namespace: String,
pub(crate) max_replicas: i32,
}

/// Runs the MonoVertex daemon: one port, TLS, ALPN (http/1.1 + h2).
/// REST routes (/readyz, /livez, etc.) and gRPC are served by the same Axum + Tonic stack.
pub async fn run_monovertex(mvtx_name: String, cln_token: CancellationToken) -> Result<()> {
info!("Starting daemon server for MonoVertex {}", mvtx_name);
pub async fn run_monovertex(cln_token: CancellationToken) -> Result<()> {
let cfg = load_mvtx_config()?;
info!(
"Starting daemon server for MonoVertex {} (namespace={}, max_replicas={})",
cfg.name, cfg.namespace, cfg.max_replicas
);

let addr: SocketAddr = format!("[::]:{}", DAEMON_SERVICE_PORT)
.parse()
.map_err(|e: std::net::AddrParseError| error::Error::Address(e.to_string()))?;

let tls_config = build_rustls_config().await?;

let svc = Arc::new(MvtxDaemonService);
// Create the runtime error cache and start its background refresh task.
let runtime = Arc::new(RuntimeCache::new(&cfg));
let runtime_bg = Arc::clone(&runtime);
let runtime_token = cln_token.clone();
tokio::spawn(async move {
runtime_bg.run(runtime_token).await;
});

let svc = Arc::new(MvtxDaemonService::new(cfg.name, runtime));
let app = make_app(svc);

let handle = Handle::new();
Expand All @@ -66,8 +97,46 @@
Ok(())
}

/// Loads the MonoVertex CR from the `NUMAFLOW_MONO_VERTEX_OBJECT` environment variable
/// (base64-encoded JSON) and returns a [`MonoVertexConfig`] with the fields the daemon needs.
fn load_mvtx_config() -> Result<MonoVertexConfig> {
let b64 = std::env::var(ENV_MONO_VERTEX_OBJECT)
.map_err(|_| error::Error::Init(format!("{ENV_MONO_VERTEX_OBJECT} is not set")))?;

let json = BASE64_STANDARD
.decode(b64.as_bytes())
.map_err(|e| error::Error::Init(format!("Failed to base64-decode CR: {e}")))?;

let mv: MonoVertex = serde_json::from_slice(&json)
.map_err(|e| error::Error::Init(format!("Failed to parse MonoVertex CR: {e}")))?;

let name = mv
.metadata
.as_ref()
.and_then(|m| m.name.clone())
.ok_or_else(|| error::Error::Init("MonoVertex metadata.name is missing".to_string()))?;

let namespace = mv
.metadata
.as_ref()
.and_then(|m| m.namespace.clone())
.unwrap_or_else(|| "default".to_string());

let max_replicas = mv
.spec
.scale
.as_ref()
.and_then(|s| s.max)

Check warning on line 129 in rust/numaflow-daemon/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

Diff in /home/runner/work/numaflow/numaflow/rust/numaflow-daemon/src/lib.rs
.unwrap_or(DEFAULT_MAX_REPLICAS);

Ok(MonoVertexConfig { name, namespace, max_replicas })
}

/// Builds the daemon Axum app: readyz, livez, REST API routes, and gRPC fallback.
fn make_app(svc: Arc<MvtxDaemonService>) -> Router {
// The gRPC adapter needs its own clone of the service (Tonic wraps it internally).
// Cloning is cheap: MvtxDaemonService only holds a String and an Arc.
let grpc_svc = (*svc).clone();
let rest_router = Router::new()
.route("/readyz", get(|| async { StatusCode::NO_CONTENT }))
.route("/livez", get(|| async { StatusCode::NO_CONTENT }))
Expand All @@ -79,7 +148,7 @@
)
.with_state(svc);
// Unmatched paths aka. gRPC requests go to gRPC.
rest_router.fallback_service(GrpcAdapter::new())
rest_router.fallback_service(GrpcAdapter::new(grpc_svc))
}

#[cfg(test)]
Expand All @@ -87,10 +156,16 @@
use super::*;
use axum::body::Body;
use http::{Method, Request};
use tower::ServiceExt;

Check warning on line 159 in rust/numaflow-daemon/src/lib.rs

View workflow job for this annotation

GitHub Actions / Unit Tests

Diff in /home/runner/work/numaflow/numaflow/rust/numaflow-daemon/src/lib.rs

fn make_test_svc() -> Arc<MvtxDaemonService> {
let cfg = MonoVertexConfig { name: "test-mvtx".to_string(), namespace: "default".to_string(), max_replicas: 2 };
let runtime = Arc::new(RuntimeCache::new(&cfg));
Arc::new(MvtxDaemonService::new(cfg.name, runtime))
}

fn app() -> Router {
make_app(Arc::new(MvtxDaemonService))
make_app(make_test_svc())
}

#[tokio::test]
Expand Down
Loading
Loading