Skip to content

Commit 155c113

Browse files
Frando and claude
committed
fix: nftables rule generation and masquerade for L4 load balancer
Add meta l4proto prefix to DNAT rules so nftables has transport protocol context for the inet_service (port) mapping. Add a postrouting masquerade chain with ct status dnat to handle same-subnet backends where the response would otherwise bypass conntrack and miss the reverse DNAT. Also fix test compilation issues and adjust unit test assertions for the updated rule format. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent c2635ea commit 155c113

2 files changed

Lines changed: 55 additions & 17 deletions

File tree

patchbay/src/balancer.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -324,12 +324,23 @@ pub(crate) fn generate_lb_rules(balancers: &[ResolvedBalancer]) -> String {
324324
}
325325

326326
// Distribution via numgen.
327+
// The protocol match is required because the DNAT map includes a port
328+
// (inet_service), and nftables needs L4 protocol context.
327329
let numgen = match balancer.algorithm {
328330
LbAlgorithm::RoundRobin => "numgen inc",
329331
LbAlgorithm::Random => "numgen random",
330332
};
333+
let proto_prefix = match balancer.protocol {
334+
LbProtocol::Tcp => "meta l4proto tcp ",
335+
LbProtocol::Udp => "meta l4proto udp ",
336+
LbProtocol::Both => "meta l4proto { tcp, udp } ",
337+
};
331338
let backend_count = balancer.backends.len();
332-
writeln!(rules, " dnat to {numgen} mod {backend_count} map {{").unwrap();
339+
writeln!(
340+
rules,
341+
" {proto_prefix}dnat to {numgen} mod {backend_count} map {{"
342+
)
343+
.unwrap();
333344
for (index, backend) in balancer.backends.iter().enumerate() {
334345
let comma = if index + 1 < backend_count { "," } else { "" };
335346
writeln!(
@@ -355,6 +366,23 @@ pub(crate) fn generate_lb_rules(balancers: &[ResolvedBalancer]) -> String {
355366
writeln!(rules, " }}").unwrap();
356367
}
357368

369+
// Postrouting masquerade chain.
370+
// When the VIP and backends share a subnet, the backend would respond
371+
// directly to the client (L2), bypassing the router's conntrack. The
372+
// reverse DNAT never fires and the client sees a reply from the
373+
// backend IP instead of the VIP. Masquerading DNAT'd traffic forces
374+
// the backend to reply via the router so conntrack restores the VIP.
375+
// `ct status dnat` matches only packets whose conntrack entry records a
376+
// destination NAT (by any NAT rule in this netns); other traffic is untouched.
377+
writeln!(rules, " chain postrouting {{").unwrap();
378+
writeln!(
379+
rules,
380+
" type nat hook postrouting priority srcnat; policy accept;"
381+
)
382+
.unwrap();
383+
writeln!(rules, " ct status dnat masquerade").unwrap();
384+
writeln!(rules, " }}").unwrap();
385+
358386
writeln!(rules, "}}").unwrap();
359387
rules
360388
}
@@ -705,7 +733,7 @@ mod tests {
705733
protocol: LbProtocol::Tcp,
706734
}];
707735
let rules = generate_lb_rules(&balancers);
708-
assert!(rules.contains("numgen inc mod 3"));
736+
assert!(rules.contains("meta l4proto tcp dnat to numgen inc mod 3"));
709737
assert!(rules.contains("10.0.1.1 . 8080"));
710738
assert!(rules.contains("10.0.1.2 . 8080"));
711739
assert!(rules.contains("10.0.1.3 . 8080"));
@@ -738,7 +766,7 @@ mod tests {
738766
let rules = generate_lb_rules(&balancers);
739767
assert!(rules.contains("map sticky_affinity"));
740768
assert!(rules.contains("timeout 3600s"));
741-
assert!(rules.contains("numgen random mod 2"));
769+
assert!(rules.contains("meta l4proto tcp dnat to numgen random mod 2"));
742770
assert!(rules.contains("update @sticky_affinity"));
743771
}
744772

patchbay/src/tests/balancer.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,31 @@ async fn spawn_ident_server(device: &Device, port: u16, ident: &str) -> Result<(
1212
let ident = ident.to_string();
1313
device
1414
.spawn(move |_| async move {
15-
let listener = tokio::net::TcpListener::bind(bind).await?;
16-
loop {
17-
let Ok((mut stream, _peer)) = listener.accept().await else {
18-
break;
19-
};
20-
let msg = ident.clone();
21-
tokio::spawn(async move {
22-
let _ = stream.write_all(msg.as_bytes()).await;
23-
});
24-
}
25-
anyhow::Ok(())
15+
let (ready_tx, ready_rx) = oneshot::channel::<Result<()>>();
16+
tokio::spawn(async move {
17+
match tokio::net::TcpListener::bind(bind).await {
18+
Ok(listener) => {
19+
let _ = ready_tx.send(Ok(()));
20+
loop {
21+
let Ok((mut stream, _peer)) = listener.accept().await else {
22+
break;
23+
};
24+
let msg = ident.clone();
25+
tokio::spawn(async move {
26+
let _ = stream.write_all(msg.as_bytes()).await;
27+
});
28+
}
29+
}
30+
Err(err) => {
31+
let _ = ready_tx.send(Err(anyhow!("ident server bind {bind}: {err}")));
32+
}
33+
}
34+
});
35+
ready_rx
36+
.await
37+
.map_err(|_| anyhow!("ident server dropped before ready"))?
2638
})?
2739
.await??;
28-
// Small delay for the listener to be ready.
29-
tokio::time::sleep(Duration::from_millis(100)).await;
3040
Ok(())
3141
}
3242

@@ -288,7 +298,7 @@ async fn udp_balancing() -> Result<()> {
288298
.context("udp recv timeout")??;
289299
let reply = String::from_utf8_lossy(&buf[..len]).to_string();
290300
let ident = reply.split(':').next().unwrap_or("").to_string();
291-
Ok(ident)
301+
anyhow::Ok(ident)
292302
})?
293303
.await??;
294304
seen.insert(ident);

0 commit comments

Comments
 (0)