Pattern for dynamically adding new listeners/services #641
Right now we don't have a supported way of doing this, and there are no plans for it in the immediate term. I don't recall an existing issue for this, so feel free to open a feature request for the longer term.
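I needed this as well, to be able to dynamically start / stop […]
The bonus is: you control your […]
Here's an example of creating a […]:

// Import paths assume the `pingora` crate with the "lb" and "proxy" features; adjust for your version.
use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use pingora::lb::{
    discovery::ServiceDiscovery, health_check::TcpHealthCheck, selection::RoundRobin, Backend,
    Backends, LoadBalancer,
};
use pingora::proxy::{http_proxy_service, ProxyHttp, Session};
use pingora::server::configuration::ServerConf;
use pingora::services::background::{background_service, BackgroundService, GenBackgroundService};
use pingora::services::Service;
use pingora::upstreams::peer::HttpPeer;

struct Proxy;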
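
// Our own proxying struct
#[async_trait]
impl ProxyHttp for Proxy {
    // `ProxyRequestContext` is this example's own per-request context type (definition not shown).
    type CTX = ProxyRequestContext;

    fn new_ctx(&self) -> Self::CTX {
        todo!()
    }

    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctxt: &mut ProxyRequestContext,
    ) -> pingora::Result<Box<HttpPeer>> {
        todo!()
    }
}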

struct ResourceDiscovery;

#[async_trait]
impl ServiceDiscovery for ResourceDiscovery {
    async fn discover(&self) -> pingora::Result<(BTreeSet<Backend>, HashMap<u64, bool>)> {
        todo!()
    }
}

fn make_load_balancer() -> GenBackgroundService<LoadBalancer<RoundRobin>> {
    let backends = Backends::new(Box::new(ResourceDiscovery));
    let mut load_balancer = LoadBalancer::from_backends(backends);
    let health_check = TcpHealthCheck::new();
    load_balancer.set_health_check(health_check);
    load_balancer.health_check_frequency = Some(Duration::from_secs(5));
    load_balancer.update_frequency = Some(Duration::from_secs(30));
    background_service("health_check", load_balancer)
}

fn main() -> Result<(), anyhow::Error> {
    // Manage our own tokio Runtime
    let runtime = tokio::runtime::Runtime::new().expect("Could not start tokio runtime");
    // Create a Vec of tokio task handles
    let mut tasks = Vec::new();
    // Each service watches this common channel to trigger clean shutdown.
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

    let load_balancer = make_load_balancer();
    // Start a load balancer on the tokio runtime ourselves.
    // Each task gets its own receiver clone, made before the `async move` block takes ownership.
    let lb_shutdown = shutdown_rx.clone();
    tasks.push(runtime.spawn(async move {
        load_balancer.task().start(lb_shutdown).await
    }));

    // With no Server, we have to manage our own ServerConf
    let server_config: Arc<ServerConf> = Arc::new(Default::default());
    let mut proxy_service = http_proxy_service(&server_config, Proxy);
    proxy_service.add_tcp("0.0.0.0:80");
    // Start the http proxy on the tokio Runtime
    let proxy_shutdown = shutdown_rx.clone();
    tasks.push(runtime.spawn(async move {
        proxy_service.start_service(None, proxy_shutdown, 1).await
    }));

    runtime.block_on(async {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                // Wait for shutdown. Normally the Server blocks on this for you.
                _ = tokio::signal::ctrl_c() => {
                    tracing::info!("Got shutdown signal. Stopping");
                    // Trigger shutdown to all services
                    shutdown_tx.send(true)?;
                    // Join / wait for all tasks to stop
                    for task in tasks.drain(..) {
                        if let Err(err) = task.await {
                            tracing::error!("Join error during task shutdown: {:?}", err);
                        }
                    }
                    break;
                }
                // Contrived example: Making a LoadBalancer on a timer. In practice, you'd probably stash
                // your LoadBalancer instances in a shared data structure, and start / stop them on whatever
                // signal is meaningful for your service
                _ = interval.tick() => {
                    let lb_shutdown = shutdown_rx.clone();
                    tasks.push(runtime.spawn(async move {
                        make_load_balancer().task().start(lb_shutdown).await;
                    }));
                }
            }
        }
        Ok::<(), anyhow::Error>(())
    })
}
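For the "stash them in a shared data structure" part, here is a minimal sketch of what such a registry could look like. To keep it runnable without pingora or tokio, it swaps in plain std threads and an `AtomicBool` stop flag where the example above uses tokio tasks and a watch channel. `ServiceRegistry`, `Running`, `start`, `stop`, `running` and `shutdown_all` are names made up for illustration, not pingora APIs, and the worker body is only a stand-in for a real service.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical handle for one dynamically started service.
struct Running {
    stop: Arc<AtomicBool>,
    handle: JoinHandle<()>,
}

/// Hypothetical registry mapping a service name to its running worker.
#[derive(Default)]
struct ServiceRegistry {
    services: HashMap<String, Running>,
}

impl ServiceRegistry {
    /// Start a worker under `name`. Returns false if the name is already in use.
    fn start(&mut self, name: &str) -> bool {
        if self.services.contains_key(name) {
            return false;
        }
        let stop = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&stop);
        let handle = thread::spawn(move || {
            // Stand-in for real work (health checks, accepting connections, ...).
            while !flag.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(1));
            }
        });
        self.services.insert(name.to_string(), Running { stop, handle });
        true
    }

    /// Stop one worker and wait for it to exit.
    /// Returns false if the name is unknown or the worker panicked.
    fn stop(&mut self, name: &str) -> bool {
        match self.services.remove(name) {
            Some(running) => {
                running.stop.store(true, Ordering::Release);
                running.handle.join().is_ok()
            }
            None => false,
        }
    }

    /// Names of the currently registered workers, sorted for stable output.
    fn running(&self) -> Vec<String> {
        let mut names: Vec<String> = self.services.keys().cloned().collect();
        names.sort();
        names
    }

    /// Signal every worker first so they wind down in parallel, then join them.
    /// Returns how many workers exited cleanly.
    fn shutdown_all(&mut self) -> usize {
        for running in self.services.values() {
            running.stop.store(true, Ordering::Release);
        }
        let mut stopped = 0;
        for (_, running) in self.services.drain() {
            if running.handle.join().is_ok() {
                stopped += 1;
            }
        }
        stopped
    }
}
```

With tokio, the same shape would presumably hold a per-service `tokio::sync::watch::Sender<bool>` and a task `JoinHandle` in each entry, so a single service can be stopped without signalling the shared shutdown channel.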
Hi there,
Is there a supported way in Pingora to add new ports/listeners dynamically? Basically, I want to start services on new ports some time after I initially start my server (based on inbound data to one of my services), ideally without restarting the process. All the examples I've seen specify the services/listeners up front and then block on the run_forever call.
Any suggestions here, or am I just going down a bad path? Thank you in advance. Btw, Pingora is an incredible tool and I've already gotten so much value out of it!