Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["examples", "jito_protos", "proxy"]
resolver = "2"

[workspace.package]
version = "0.2.12"
version = "0.2.13"
description = "Fast path to receive shreds from Jito, forwarding to local consumers. See https://docs.jito.wtf/lowlatencytxnfeed/ for details."
authors = ["Jito Team <team@jito.wtf>"]
homepage = "https://jito.wtf/"
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ mod tests {
thread::spawn(move || listen_and_collect(socket, to_receive));
});

let (reconstruct_tx, _reconstruct_rx) = crossbeam_channel::bounded(1_024);
let (reconstruct_tx, _reconstruct_rx) = crossbeam_channel::bounded(10_240);
// send packets
recv_from_channel_and_send_multiple_dest(
packet_receiver.recv(),
Expand Down
113 changes: 113 additions & 0 deletions proxy/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,119 @@ pub fn heartbeat_loop_thread(
}).unwrap()
}

/// Simplified heartbeat loop that only performs authentication and sends heartbeats.
/// Does not check for received shreds or track metrics. Used for subscribe-only mode.
#[allow(clippy::too_many_arguments)]
pub fn heartbeat_loop_subscribe_only(
block_engine_url: String,
auth_url: String,
auth_keypair: Arc<Keypair>,
desired_regions: Vec<String>,
recv_socket: SocketAddr,
runtime: Runtime,
service_name: String,
shutdown_receiver: Receiver<()>,
exit: Arc<AtomicBool>,
) -> JoinHandle<()> {
Builder::new()
.name("ssPxyHbeatSub".to_string())
.spawn(move || {
let heartbeat_socket = jito_protos::shared::Socket {
ip: recv_socket.ip().to_string(),
port: recv_socket.port() as i64,
};
let mut heartbeat_interval = Duration::from_secs(1);
let mut heartbeat_tick = crossbeam_channel::tick(heartbeat_interval);
let mut client_restart_count_cumulative = 0u64;
let mut successful_heartbeat_count_cumulative = 0u64;
let mut failed_heartbeat_count_cumulative = 0u64;

while !exit.load(Ordering::Relaxed) {
let per_con_exit = ScopedAtomicBool::default();
info!("Starting heartbeat client (subscribe-only mode)");
let shredstream_client_res = runtime.block_on(get_grpc_client(
block_engine_url.clone(),
auth_url.clone(),
auth_keypair.clone(),
service_name.clone(),
per_con_exit.get_inner_clone(),
));

let (mut shredstream_client, _refresh_thread_hdl) = match shredstream_client_res {
Ok(c) => c,
Err(e) => {
warn!("Failed to connect to block engine, retrying. Error: {e}");
client_restart_count_cumulative += 1;
datapoint_warn!(
"shredstream_proxy-heartbeat_client_error",
"block_engine_url" => block_engine_url,
("errors", 1, i64),
("error_str", e.to_string(), String),
);
sleep(Duration::from_secs(5));
continue;
}
};

let mut successful_heartbeat_count = 0u64;
let mut failed_heartbeat_count = 0u64;

while !exit.load(Ordering::Relaxed) {
crossbeam_channel::select! {
recv(heartbeat_tick) -> _ => {
let heartbeat_result = runtime.block_on(
shredstream_client.send_heartbeat(Heartbeat {
socket: Some(heartbeat_socket.clone()),
regions: desired_regions.clone(),
})
);

match heartbeat_result {
Ok(hb) => {
let new_interval =
Duration::from_millis((hb.get_ref().ttl_ms / 3) as u64);
if heartbeat_interval != new_interval {
info!("Sending heartbeat every {new_interval:?}.");
heartbeat_interval = new_interval;
heartbeat_tick = crossbeam_channel::tick(new_interval);
}
successful_heartbeat_count += 1;
}
Err(err) => {
if err.code() == Code::InvalidArgument {
panic!("Invalid arguments: {err}.");
};
warn!("Error sending heartbeat: {err}");
failed_heartbeat_count += 1;
// Restart client on repeated failures
if failed_heartbeat_count > 5 {
warn!("Too many heartbeat failures, restarting client.");
break;
}
}
}
}

recv(shutdown_receiver) -> _ => {
break;
}
}
}

successful_heartbeat_count_cumulative += successful_heartbeat_count;
failed_heartbeat_count_cumulative += failed_heartbeat_count;
}

info!(
"Exiting heartbeat thread (subscribe-only), sent {} successful, {} failed heartbeats. Client restarted {} times.",
successful_heartbeat_count_cumulative,
failed_heartbeat_count_cumulative,
client_restart_count_cumulative
);
})
.unwrap()
}

pub async fn get_grpc_client(
block_engine_url: String,
auth_url: String,
Expand Down
103 changes: 103 additions & 0 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ enum ProxySubcommands {

/// Does not request shreds from Jito. Sends anything received on `src-bind-addr`:`src-bind-port` to all destinations.
ForwardOnly(CommonArgs),

/// Requests shreds from Jito to be sent to a specific port without binding a socket or forwarding.
/// Only performs authentication and heartbeat. Useful when another process is listening on the destination port.
SubscribeOnly(SubscribeOnlyArgs),
}

#[derive(clap::Args, Clone, Debug)]
Expand All @@ -82,6 +86,36 @@ struct ShredstreamArgs {
common_args: CommonArgs,
}

#[derive(clap::Args, Clone, Debug)]
struct SubscribeOnlyArgs {
/// Address for Jito Block Engine.
/// See https://jito-labs.gitbook.io/mev/searcher-resources/block-engine#connection-details
#[arg(long, env)]
block_engine_url: String,

/// Manual override for auth service address. For internal use.
#[arg(long, env)]
auth_url: Option<String>,

/// Path to keypair file used to authenticate with the backend.
#[arg(long, env)]
auth_keypair: PathBuf,

/// Desired regions to receive heartbeats from.
/// Receives `n` different streams. Requires at least 1 region, comma separated.
#[arg(long, env, value_delimiter = ',', required(true))]
desired_regions: Vec<String>,

/// Port to tell Jito to send shreds to. This process will NOT listen on this port.
#[arg(long, env)]
dest_port: u16,

/// Public IP address to use.
/// Overrides value fetched from `ifconfig.me`.
#[arg(long, env)]
public_ip: Option<IpAddr>,
}

#[derive(clap::Args, Clone, Debug)]
struct CommonArgs {
/// Address where Shredstream proxy listens.
Expand Down Expand Up @@ -221,10 +255,17 @@ fn main() -> Result<(), ShredstreamProxyError> {
let all_args: Args = Args::parse();

let shredstream_args = all_args.shredstream_args.clone();

// Handle SubscribeOnly mode separately - it only needs heartbeat
if let ProxySubcommands::SubscribeOnly(ref sub_args) = shredstream_args {
return run_subscribe_only_mode(sub_args.clone());
}

// common args
let args = match all_args.shredstream_args {
ProxySubcommands::Shredstream(x) => x.common_args,
ProxySubcommands::ForwardOnly(x) => x,
ProxySubcommands::SubscribeOnly(_) => unreachable!(),
};
set_host_id(hostname::get()?.into_string().unwrap());
if (args.endpoint_discovery_url.is_none() && args.discovered_endpoints_port.is_some())
Expand Down Expand Up @@ -376,6 +417,68 @@ fn main() -> Result<(), ShredstreamProxyError> {
Ok(())
}

fn run_subscribe_only_mode(args: SubscribeOnlyArgs) -> Result<(), ShredstreamProxyError> {
set_host_id(hostname::get()?.into_string().unwrap());

let exit = Arc::new(AtomicBool::new(false));
let (shutdown_sender, shutdown_receiver) =
shutdown_notifier(exit.clone()).expect("Failed to set up signal handler");
let panic_hook = panic::take_hook();
{
let exit = exit.clone();
panic::set_hook(Box::new(move |panic_info| {
exit.store(true, Ordering::SeqCst);
let _ = shutdown_sender.send(());
error!("exiting process");
sleep(Duration::from_secs(1));
panic_hook(panic_info);
}));
}

let runtime = Runtime::new()?;

if args.desired_regions.len() > 2 {
warn!(
"Too many regions requested, only regions: {:?} will be used",
&args.desired_regions[..2]
);
}

let auth_keypair = Arc::new(
read_keypair_file(Path::new(&args.auth_keypair)).unwrap_or_else(|e| {
panic!(
"Unable to parse keypair file. Ensure that file {:?} is readable. Error: {e}",
args.auth_keypair
)
}),
);

let public_ip = args.public_ip.unwrap_or_else(|| get_public_ip().unwrap());
let recv_socket = SocketAddr::new(public_ip, args.dest_port);

info!(
"Starting subscribe-only mode. Requesting shreds to be sent to {}",
recv_socket
);

let heartbeat_hdl = heartbeat::heartbeat_loop_subscribe_only(
args.block_engine_url.clone(),
args.auth_url.unwrap_or(args.block_engine_url),
auth_keypair,
args.desired_regions,
recv_socket,
runtime,
"shredstream_proxy_subscribe".to_string(),
shutdown_receiver,
exit,
);

heartbeat_hdl.join().expect("heartbeat thread panicked");

info!("Exiting subscribe-only mode.");
Ok(())
}

fn start_heartbeat(
args: ShredstreamArgs,
exit: &Arc<AtomicBool>,
Expand Down
Loading