diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 959fd6c928..c2fd1b5a87 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -83,6 +83,7 @@ pub struct ComputeState { pub last_active: Option>, pub error: Option, pub pspec: Option, + pub merge_src_connstr: Option, pub metrics: ComputeMetrics, } @@ -94,6 +95,7 @@ impl ComputeState { last_active: None, error: None, pspec: None, + merge_src_connstr: None, metrics: ComputeMetrics::default(), } } @@ -629,11 +631,9 @@ impl ComputeNode { /// Merge two branches #[instrument(skip_all)] - pub fn merge(&self, src_constr: &str) -> Result<()> { - let compute_state = self.state.lock().unwrap().clone(); - let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec; + pub fn merge(&self, src_connstr: &str) -> Result<()> { let mut client = Client::connect(self.connstr.as_str(), NoTls)?; - handle_merge(spec, &mut client, self.connstr.as_str(), src_constr)?; + handle_merge(&mut client, self.connstr.as_str(), &src_connstr)?; Ok(()) } diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs index 274a221ac7..448c59c355 100644 --- a/compute_tools/src/configurator.rs +++ b/compute_tools/src/configurator.rs @@ -31,6 +31,22 @@ fn configurator_main_loop(compute: &Arc) { // XXX: used to test that API is blocking // std::thread::sleep(std::time::Duration::from_millis(10000)); + compute.set_status(new_status); + } else if state.status == ComputeStatus::MergePending { + info!("got merge request"); + state.status = ComputeStatus::Merging; + compute.state_changed.notify_all(); + let mut new_status = ComputeStatus::Failed; + if let Err(e) = compute.merge(&state.merge_src_connstr.clone().unwrap()) + { + drop(state); + info!("could not merge compute node: {}", e); + } + else + { + new_status = ComputeStatus::Running; + info!("merge complete"); + } compute.set_status(new_status); } else if state.status == ComputeStatus::Failed { info!("compute node is now in Failed state, exiting"); diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 06ec36cbaa..64d2ef4529 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -126,7 +126,13 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving /merge POST request"); - handle_merge_request(req, compute).await + match handle_merge_request(req, compute).await { + Ok(msg) => Response::new(Body::from(msg)), + Err((msg, code)) => { + error!("error handling /merge request: {msg}"); + render_json_error(&msg, code) + } + } } // download extension files from S3 on demand @@ -215,16 +221,54 @@ async fn routes(req: Request, compute: &Arc) -> Response, compute: &Arc) -> Response { +async fn handle_merge_request(req: Request, compute: &Arc) + -> Result +{ let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); let connstr = String::from_utf8(body_bytes.to_vec()).unwrap(); - match compute.merge(&connstr) { - Ok(_) => Response::new(Body::from("OK")), - Err(e) => { - error!("Branch merge failed: {}", e); - Response::new(Body::from(e.to_string())) + + let c = compute.clone(); + + { + let mut state = compute.state.lock().unwrap(); + if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running { + let msg = format!( + "invalid compute status for merge request: {:?}", + state.status.clone() + ); + return Err((msg, StatusCode::PRECONDITION_FAILED)); } + state.merge_src_connstr = Some(connstr); + state.status = ComputeStatus::MergePending; + compute.state_changed.notify_all(); + drop(state); + info!("set new spec and notified waiters"); } + + task::spawn_blocking(move || { + let mut state = c.state.lock().unwrap(); + while state.status != ComputeStatus::Running { + state = c.state_changed.wait(state).unwrap(); + info!( + "waiting for compute to become Running, current status: {:?}", + state.status + ); + + if state.status == ComputeStatus::Failed { + let err = state.error.as_ref().map_or("unknown error", |x| x); + let msg = format!("compute configuration failed: {:?}", err); + return Err((msg, StatusCode::INTERNAL_SERVER_ERROR)); + } + } + + Ok(()) + }) + .await + .unwrap()?; + + let state = compute.state.lock().unwrap().clone(); + let status_response = status_response_from_state(&state); + Ok(serde_json::to_string(&status_response).unwrap()) } async fn handle_configure_request( diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index b4f86c8514..d83cf27567 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -408,7 +408,7 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent } #[instrument(skip_all)] -pub fn handle_merge(spec: &ComputeSpec, client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> { +pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> { info!("Merge branch into {}", dst_connstr); let existing_dbs = get_existing_dbs(client)?; diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index a94ddc92c0..dd449e95e5 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -572,7 +572,9 @@ impl Endpoint { } ComputeStatus::Empty | ComputeStatus::ConfigurationPending - | ComputeStatus::Configuration => { + | ComputeStatus::Configuration + | ComputeStatus::MergePending + | ComputeStatus::Merging => { bail!("unexpected compute status: {:?}", state.status) } } diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 92bbf79cd4..2d56073772 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -48,6 +48,10 @@ pub enum ComputeStatus { Running, // New spec is being applied. Configuration, + // Merge requested + MergePending, + // Merge in progress + Merging, // Either startup or configuration failed, // compute will exit soon or is waiting for // control-plane to terminate it.