Make merging happen on configurator thread

This commit is contained in:
Sasha Krassovsky
2023-11-16 20:52:46 +01:00
parent d19860abf7
commit a5d0f1a92e
6 changed files with 79 additions and 13 deletions

View File

@@ -83,6 +83,7 @@ pub struct ComputeState {
pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>,
pub pspec: Option<ParsedSpec>,
pub merge_src_connstr: Option<String>,
pub metrics: ComputeMetrics,
}
@@ -94,6 +95,7 @@ impl ComputeState {
last_active: None,
error: None,
pspec: None,
merge_src_connstr: None,
metrics: ComputeMetrics::default(),
}
}
@@ -629,11 +631,9 @@ impl ComputeNode {
/// Merge two branches
#[instrument(skip_all)]
pub fn merge(&self, src_constr: &str) -> Result<()> {
let compute_state = self.state.lock().unwrap().clone();
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
pub fn merge(&self, src_connstr: &str) -> Result<()> {
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
handle_merge(spec, &mut client, self.connstr.as_str(), src_constr)?;
handle_merge(&mut client, self.connstr.as_str(), &src_connstr)?;
Ok(())
}

View File

@@ -31,6 +31,22 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::MergePending {
info!("got merge request");
state.status = ComputeStatus::Merging;
compute.state_changed.notify_all();
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.merge(&state.merge_src_connstr.clone().unwrap())
{
drop(state);
info!("could not merge compute node: {}", e);
}
else
{
new_status = ComputeStatus::Running;
info!("merge complete");
}
compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");

View File

@@ -126,7 +126,13 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// Handle branch merge request
(&Method::POST, "/merge") => {
info!("serving /merge POST request");
handle_merge_request(req, compute).await
match handle_merge_request(req, compute).await {
Ok(msg) => Response::new(Body::from(msg)),
Err((msg, code)) => {
error!("error handling /merge request: {msg}");
render_json_error(&msg, code)
}
}
}
// download extension files from S3 on demand
@@ -215,16 +221,54 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
async fn handle_merge_request(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
async fn handle_merge_request(req: Request<Body>, compute: &Arc<ComputeNode>)
-> Result<String, (String, StatusCode)>
{
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let connstr = String::from_utf8(body_bytes.to_vec()).unwrap();
match compute.merge(&connstr) {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("Branch merge failed: {}", e);
Response::new(Body::from(e.to_string()))
let c = compute.clone();
{
let mut state = compute.state.lock().unwrap();
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
let msg = format!(
"invalid compute status for merge request: {:?}",
state.status.clone()
);
return Err((msg, StatusCode::PRECONDITION_FAILED));
}
state.merge_src_connstr = Some(connstr);
state.status = ComputeStatus::MergePending;
compute.state_changed.notify_all();
drop(state);
info!("set new spec and notified waiters");
}
task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while state.status != ComputeStatus::Running {
state = c.state_changed.wait(state).unwrap();
info!(
"waiting for compute to become Running, current status: {:?}",
state.status
);
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
}
}
Ok(())
})
.await
.unwrap()?;
let state = compute.state.lock().unwrap().clone();
let status_response = status_response_from_state(&state);
Ok(serde_json::to_string(&status_response).unwrap())
}
async fn handle_configure_request(

View File

@@ -408,7 +408,7 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
}
#[instrument(skip_all)]
pub fn handle_merge(spec: &ComputeSpec, client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> {
pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> {
info!("Merge branch into {}", dst_connstr);
let existing_dbs = get_existing_dbs(client)?;

View File

@@ -572,7 +572,9 @@ impl Endpoint {
}
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration => {
| ComputeStatus::Configuration
| ComputeStatus::MergePending
| ComputeStatus::Merging => {
bail!("unexpected compute status: {:?}", state.status)
}
}

View File

@@ -48,6 +48,10 @@ pub enum ComputeStatus {
Running,
// New spec is being applied.
Configuration,
// Merge requested
MergePending,
// Merge in progress
Merging,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.