mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
Add set_mergeable
This commit is contained in:
@@ -637,6 +637,14 @@ impl ComputeNode {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Mark brnach as mergeable
|
||||
#[instrument(skip_all)]
|
||||
pub fn set_mergeable(&self) -> Result<()> {
|
||||
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||
handle_set_mergeable(&mut client, self.connstr.as_str())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Start Postgres as a child process and manage DBs/roles.
|
||||
/// After that this will hang waiting on the postmaster process to exit.
|
||||
#[instrument(skip_all)]
|
||||
@@ -707,7 +715,6 @@ impl ComputeNode {
|
||||
handle_databases(spec, &mut client)?;
|
||||
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
|
||||
handle_grants(spec, &mut client, self.connstr.as_str())?;
|
||||
handle_replication(spec, &mut client, self.connstr.as_str())?;
|
||||
handle_extensions(spec, &mut client)?;
|
||||
create_availability_check_data(&mut client)?;
|
||||
|
||||
|
||||
@@ -40,16 +40,27 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
||||
drop(state);
|
||||
|
||||
let mut new_status = ComputeStatus::Failed;
|
||||
if let Err(e) = compute.merge(&connstr)
|
||||
{
|
||||
if let Err(e) = compute.merge(&connstr) {
|
||||
info!("could not merge compute node: {}", e);
|
||||
}
|
||||
else
|
||||
{
|
||||
} else {
|
||||
new_status = ComputeStatus::Running;
|
||||
info!("merge complete");
|
||||
}
|
||||
compute.set_status(new_status);
|
||||
} else if state.status == ComputeStatus::SetMergeablePending {
|
||||
info!("got set mergeable request");
|
||||
state.status = ComputeStatus::SetMergeable;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
|
||||
let mut new_status = ComputeStatus::Failed;
|
||||
if let Err(e) = compute.set_mergeable() {
|
||||
info!("could not mark branch as mergeable: {}", e);
|
||||
} else {
|
||||
new_status = ComputeStatus::Running;
|
||||
info!("marked as mergeable");
|
||||
}
|
||||
compute.set_status(new_status);
|
||||
} else if state.status == ComputeStatus::Failed {
|
||||
info!("compute node is now in Failed state, exiting");
|
||||
break;
|
||||
|
||||
@@ -135,6 +135,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
// Handle branch set mergeable request
|
||||
(&Method::POST, "/set_mergeable") => {
|
||||
info!("serving /set_mergeable POST request");
|
||||
match handle_set_mergeable_request(compute).await {
|
||||
Ok(msg) => Response::new(Body::from(msg)),
|
||||
Err((msg, code)) => {
|
||||
error!("error handling /set_mergeable request: {msg}");
|
||||
render_json_error(&msg, code)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// download extension files from S3 on demand
|
||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||
info!("serving {:?} POST request", route);
|
||||
@@ -221,9 +233,10 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_merge_request(req: Request<Body>, compute: &Arc<ComputeNode>)
|
||||
-> Result<String, (String, StatusCode)>
|
||||
{
|
||||
async fn handle_merge_request(
|
||||
req: Request<Body>,
|
||||
compute: &Arc<ComputeNode>,
|
||||
) -> Result<String, (String, StatusCode)> {
|
||||
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
||||
let connstr = String::from_utf8(body_bytes.to_vec()).unwrap();
|
||||
|
||||
@@ -260,12 +273,58 @@ async fn handle_merge_request(req: Request<Body>, compute: &Arc<ComputeNode>)
|
||||
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
let state = compute.state.lock().unwrap().clone();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Ok(serde_json::to_string(&status_response).unwrap())
|
||||
}
|
||||
|
||||
async fn handle_set_mergeable_request(
|
||||
compute: &Arc<ComputeNode>,
|
||||
) -> Result<String, (String, StatusCode)> {
|
||||
let c = compute.clone();
|
||||
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for merge request: {:?}",
|
||||
state.status.clone()
|
||||
);
|
||||
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
||||
}
|
||||
state.status = ComputeStatus::SetMergeablePending;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
info!("set new spec and notified waiters");
|
||||
}
|
||||
|
||||
task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::Running {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
info!(
|
||||
"waiting for compute to become Running, current status: {:?}",
|
||||
state.status
|
||||
);
|
||||
|
||||
if state.status == ComputeStatus::Failed {
|
||||
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||
let msg = format!("compute configuration failed: {:?}", err);
|
||||
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
let state = compute.state.lock().unwrap().clone();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Ok(serde_json::to_string(&status_response).unwrap())
|
||||
|
||||
@@ -89,7 +89,7 @@ paths:
|
||||
post:
|
||||
tags:
|
||||
- Merge
|
||||
summary: Mere branches.
|
||||
summary: Merge branches.
|
||||
description: |
|
||||
This is a blocking API endpoint, i.e. it blocks waiting until
|
||||
compute is finished configuration and is in `Running` state.
|
||||
@@ -116,6 +116,30 @@ paths:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
|
||||
/set_mergeable:
|
||||
post:
|
||||
tags:
|
||||
- Set mergeable
|
||||
summary: Mark branch as mergeable.
|
||||
description: |
|
||||
This is a blocking API endpoint, i.e. it blocks waiting until
|
||||
compute is finished configuration and is in `Running` state.
|
||||
Optional non-blocking mode could be added later.
|
||||
operationId: markBranchMergeable
|
||||
responses:
|
||||
200:
|
||||
description: Branch marked as mergeable
|
||||
content:
|
||||
application/json:OK
|
||||
schema:
|
||||
$ref: "#/components/schemas/ComputeState"
|
||||
500:
|
||||
description: Set mergeable request failed.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/GenericError"
|
||||
|
||||
/configure:
|
||||
post:
|
||||
tags:
|
||||
|
||||
@@ -413,16 +413,15 @@ pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) -
|
||||
let existing_dbs = get_existing_dbs(client)?;
|
||||
|
||||
for (_, db) in existing_dbs {
|
||||
if db.name.starts_with("template") {
|
||||
continue;
|
||||
}
|
||||
let mut dst_conf = Config::from_str(dst_connstr)?;
|
||||
dst_conf.dbname(&db.name);
|
||||
|
||||
let mut src_conf = Config::from_str(src_connstr)?;
|
||||
src_conf.dbname(&db.name);
|
||||
|
||||
let mut pub_client = src_conf.connect(NoTls)?;
|
||||
let create_pub = format!("create publication pub_merge for tables in schema public");
|
||||
pub_client.simple_query(&create_pub)?;
|
||||
|
||||
let mut sub_client = dst_conf.connect(NoTls)?;
|
||||
let create_sub = format!("create subscription sub_merge connection '{}' publication pub_merge with (create_slot=false, slot_name=merge_slot_{}, copy_data=false)", str::replace(src_connstr, "'", "''"), &db.name);
|
||||
sub_client.simple_query(&create_sub)?;
|
||||
@@ -432,11 +431,14 @@ pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) -
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub fn handle_replication(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
|
||||
info!("Creating logical replication slot");
|
||||
pub fn handle_set_mergeable(client: &mut Client, connstr: &str) -> Result<()> {
|
||||
info!("Mark branch as mergeable");
|
||||
let existing_dbs = get_existing_dbs(client)?;
|
||||
|
||||
for (_, db) in existing_dbs {
|
||||
if db.name.starts_with("template") {
|
||||
continue;
|
||||
}
|
||||
let mut conf = Config::from_str(connstr)?;
|
||||
conf.dbname(&db.name);
|
||||
|
||||
@@ -447,6 +449,7 @@ pub fn handle_replication(spec: &ComputeSpec, client: &mut Client, connstr: &str
|
||||
&db.name
|
||||
);
|
||||
db_client.simple_query(&create_slot)?;
|
||||
db_client.simple_query("create publication pub_merge for all tables")?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user