From e288cd2198b759041d684c64410f4234a981bd04 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 16 Jun 2025 22:43:33 +0100 Subject: [PATCH] fix concurrent reconfigure while TLS configuration is taking place --- compute_tools/src/http/routes/configure.rs | 44 +++++++++------------- 1 file changed, 18 insertions(+), 26 deletions(-) diff --git a/compute_tools/src/http/routes/configure.rs b/compute_tools/src/http/routes/configure.rs index 943ff45357..89db0e8364 100644 --- a/compute_tools/src/http/routes/configure.rs +++ b/compute_tools/src/http/routes/configure.rs @@ -27,32 +27,6 @@ pub(in crate::http) async fn configure( Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e), }; - // XXX: wrap state update under lock in a code block. Otherwise, we will try - // to `Send` `mut state` into the spawned thread bellow, which will cause - // the following rustc error: - // - // error: future cannot be sent between threads safely - { - let mut state = compute.state.lock().unwrap(); - if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) { - return JsonResponse::invalid_status(state.status); - } - - // Pass the tracing span to the main thread that performs the startup, - // so that the start_compute operation is considered a child of this - // configure request for tracing purposes. - state.startup_span = Some(tracing::Span::current()); - - if compute.params.lakebase_mode { - ComputeNode::set_spec(&compute.params, &mut state, pspec); - } else { - state.pspec = Some(pspec); - } - - state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed); - drop(state); - } - // Spawn a blocking thread to wait for compute to become Running. This is // needed to not block the main pool of workers and to be able to serve // other requests while some particular request is waiting for compute to @@ -60,6 +34,24 @@ pub(in crate::http) async fn configure( let c = compute.clone(); let completed = task::spawn_blocking(move || { let mut state = c.state.lock().unwrap(); + while !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) { + // wait until we are not concurrently configuring + state = c.state_changed.wait(state).unwrap(); + } + + // Pass the tracing span to the main thread that performs the startup, + // so that the start_compute operation is considered a child of this + // configure request for tracing purposes. + state.startup_span = Some(tracing::Span::current()); + + if c.params.lakebase_mode { + ComputeNode::set_spec(&c.params, &mut state, pspec); + } else { + state.pspec = Some(pspec); + } + + state.set_status(ComputeStatus::ConfigurationPending, &c.state_changed); + while state.status != ComputeStatus::Running { state = c.state_changed.wait(state).unwrap(); info!(