Blake Smith

Dynamic Provisioned Load Balancers with Pingora

NOTE: This is a longer version of an answer I gave to a question on GitHub about dynamically adding listeners / services to Pingora.

I wanted to show the technique I’m using to dynamically manage Pingora LoadBalancer instances inside a general proxy / load balancer service I’m building. Pingora is a Rust-based proxy library from Cloudflare that can be used to build high-performance HTTP load balancers and proxy services.

Pingora’s design prompts you to set up your process using a static Service graph that you build during process startup. This is similar to patterns found in Guava’s Service interface for managing several logical asynchronous services inside a single process. You configure your various services (load balancers, proxy services, background health checks, etc.), add them to a Server instance, and then start the Server, which takes over the lifecycle of your services.

From Pingora’s getting started guide:

fn main() {
    let mut server = Server::new(None).unwrap();
    server.bootstrap();

    let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap();
    let mut lb = http_proxy_service(&server.configuration, LB(Arc::new(upstreams)));
    lb.add_tcp("0.0.0.0:6188");

    server.add_service(lb);
    server.run_forever();
}

The Server struct normally does a lot of heavy lifting for you:

  1. Clean process startup / shutdown, including handling the correct Service dependency ordering.
  2. Managing each Service’s tokio Runtime. The Server instance creates an individual Runtime instance for each Service that it manages.
  3. Zero downtime listener socket handoff: The Server handles handing off listener file descriptors over a unix socket to a new process to support zero downtime upgrades. This is very similar to how something like Envoy proxy does online hot restarts.

One major problem: After setting up services, the API to start the Server returns a Rust “Never type”. Once your program hands off control to Server#run_forever, you’re never getting it back.

impl Server {
    /// Start the server using Self::run and default RunArgs.
    /// This function will block forever until the server needs to quit. So this would be the last function to call for this object.
    /// Note: this function may fork the process for daemonization, so any additional threads created before this function will be lost to any service logic once this function is called.
    pub fn run_forever(self) -> !
}

It’s quite easy to forgo the Server type entirely, still use all the Pingora services, and retain full control of your process. This gives you the ability to dynamically start / stop services (including LoadBalancer instances), or do whatever else you want. In exchange, it’s on you to manage clean shutdown and provision tokio runtimes. You also lose out on some of the other built-ins like zero downtime hot restarts, but in my case, it’s worth it.

Starting a LoadBalancer without a Server is pretty straightforward:

fn make_load_balancer() -> GenBackgroundService<LoadBalancer<RoundRobin>> {
    let backends = Backends::new(Box::new(ResourceDiscovery));
    let mut load_balancer = LoadBalancer::from_backends(backends);
    let health_check = TcpHealthCheck::new();
    load_balancer.set_health_check(health_check);
    load_balancer.health_check_frequency = Some(Duration::from_secs(5));
    load_balancer.update_frequency = Some(Duration::from_secs(30));
    let load_balancer_service = background_service("health_check", load_balancer);
    load_balancer_service
}

fn main() -> Result<(), anyhow::Error> {
    // Manage our own tokio Runtime
    let runtime = tokio::runtime::Runtime::new()
        .expect("Could not start tokio runtime");
    // Create a Vec of tokio task handles, so we can wait for
    // them to finish during process shutdown
    let mut tasks = Vec::new();
    
    // Each service watches this common channel to trigger clean shutdown.
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

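    let load_balancer = make_load_balancer();
    // Clone the receiver before moving it into the task, so `shutdown_rx`
    // stays available for any other services we start later.
    let shutdown = shutdown_rx.clone();
    // Start a load balancer on the tokio runtime ourselves
    tasks.push(runtime.spawn(async move {
        load_balancer.task()
            .start(shutdown)
            .await
    }));

    // The control loop that keeps the process alive is shown further down.
    Ok(())
}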

Starting a proxy service is also straightforward:

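fn main() -> Result<(), anyhow::Error> {
    // `runtime`, `tasks`, and `shutdown_rx` are created as in the previous example.

    // With no Server, we have to manage our own ServerConf
    let server_config: Arc<ServerConf> = Arc::new(Default::default());
    let mut proxy_service = http_proxy_service(&server_config, Proxy);
    proxy_service.add_tcp("0.0.0.0:80");

    // Start the http proxy on the tokio Runtime, giving the task its own
    // clone of the shutdown receiver
    let shutdown = shutdown_rx.clone();
    tasks.push(runtime.spawn(async move {
        proxy_service.start_service(None, shutdown, 1)
            .await
    }));

    // The control loop that keeps the process alive is shown in the next example.
    Ok(())
}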

Here’s an example where we start a new LoadBalancer service whenever a tokio event fires (in this case, a timer tick). This is the control loop running on main that we use in place of Server#run_forever:

fn main() -> Result<(), anyhow::Error> {
    let mut tasks = Vec::new();
    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

    // Setup services, like above (load balancers, proxies, etc.)
    // then proceed to main control loop.

    runtime.block_on(async {
        let mut interval = tokio::time::interval(Duration::from_secs(30));
        loop {
            tokio::select! {
                // Wait for shutdown. Normally the Server handles all external signal handling for you.
                _ = tokio::signal::ctrl_c() => {
                    tracing::info!("Got shutdown signal. Stopping");
                    // Trigger shutdown to all services
                    shutdown_tx.send(true)?;
                    // Join / wait for all tasks to stop
                    for task in tasks {
                        if let Err(err) = task.await {
                            tracing::error!("Join error during task shutdown: {:?}", err);
                        }
                    }
                    break;
                }

                // Contrived example: Making a LoadBalancer on a timer. In practice, you'd probably stash
                // your LoadBalancer instances in a shared data structure, and start / stop them on whatever
                // signal is meaningful for your service.
                //
                // This example uses a timer, but you can manage this any way you want.
                _ = interval.tick() => {
                    // Clone the receiver outside the task. Moving `shutdown_rx` itself into
                    // the `async move` block would make it unusable on the next tick.
                    let shutdown = shutdown_rx.clone();
                    tasks.push(runtime.spawn(async move {
                        make_load_balancer().task()
                            .start(shutdown)
                            .await;
                    }));
                }
            }
        }
        Ok(())
    })
}
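The comment in the control loop suggests stashing LoadBalancer instances in a shared data structure so they can be started and stopped individually. Here's a rough sketch of what such a registry could look like. To keep it compilable without extra crates, it uses std threads and an AtomicBool per service as stand-ins for tokio tasks and a per-service watch channel. The `Registry` and `Running` names are hypothetical, not part of Pingora.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Handle to one running service: its stop flag plus the thread to join.
struct Running {
    stop: Arc<AtomicBool>,
    handle: JoinHandle<()>,
}

/// Hypothetical registry of running services, keyed by name.
#[derive(Default)]
pub struct Registry {
    running: HashMap<String, Running>,
}

impl Registry {
    /// Start a service under `name`. Returns false if it is already running.
    pub fn start(&mut self, name: &str) -> bool {
        if self.running.contains_key(name) {
            return false;
        }
        let stop = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&stop);
        // Stand-in for `load_balancer.task().start(shutdown).await`:
        // the worker loops until this service's own stop flag is set.
        let handle = thread::spawn(move || {
            while !flag.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(5));
            }
        });
        self.running.insert(name.to_string(), Running { stop, handle });
        true
    }

    /// Stop one service and wait for it to finish.
    /// Returns false if the name is unknown or the worker panicked.
    pub fn stop(&mut self, name: &str) -> bool {
        match self.running.remove(name) {
            Some(svc) => {
                svc.stop.store(true, Ordering::Release);
                svc.handle.join().is_ok()
            }
            None => false,
        }
    }

    /// Names of the services currently running, sorted for stable output.
    pub fn names(&self) -> Vec<String> {
        let mut names: Vec<String> = self.running.keys().cloned().collect();
        names.sort();
        names
    }
}
```

Giving each service its own stop signal is the important part: a single shared shutdown channel can only stop everything at once, which is fine for process exit but not for removing one load balancer while the rest keep serving.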

In real code, I run through a full reconciliation process inside the proxy process. The proxy calls out to the control plane to fetch the list of configured load balancers, and starts / stops them when they’re created or destroyed. This design allows me to host multiple LoadBalancer instances inside the same process, while still maintaining separate backend configurations for each distinct load balancer. The control tasks run on a separate tokio Runtime from the proxy and load balancer services that handle requests / responses.
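The core of a reconciliation pass like this is a set difference between what the control plane wants and what the process is currently running. A minimal sketch, assuming load balancers are identified by a string name (the `Plan` type and `reconcile` function are made up for illustration, and the real process also has to handle configuration changes to existing load balancers, which this ignores):

```rust
use std::collections::BTreeSet;

/// What the proxy should do to move from `running` to `desired`.
#[derive(Debug, PartialEq, Eq)]
pub struct Plan {
    pub to_start: Vec<String>,
    pub to_stop: Vec<String>,
}

/// Compute the difference between the load balancers the control plane
/// wants (`desired`) and the ones this process is running (`running`).
/// BTreeSet keeps both result lists in sorted order.
pub fn reconcile(desired: &BTreeSet<String>, running: &BTreeSet<String>) -> Plan {
    Plan {
        // Wanted but not yet running.
        to_start: desired.difference(running).cloned().collect(),
        // Running but no longer wanted.
        to_stop: running.difference(desired).cloned().collect(),
    }
}
```

The control loop would then spawn a service for each name in `to_start` and signal shutdown for each name in `to_stop`. Running the same pass again with no changes yields an empty plan, so it is safe to run on every tick.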

Check out Pingora if you’re interested in a Rust-based load balancer library.


about the author

Blake Smith is a Principal Software Engineer at Sprout Social.