diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c2fd1b5a87..c9d3fbfc6d 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -637,6 +637,14 @@ impl ComputeNode { Ok(()) } + /// Mark brnach as mergeable + #[instrument(skip_all)] + pub fn set_mergeable(&self) -> Result<()> { + let mut client = Client::connect(self.connstr.as_str(), NoTls)?; + handle_set_mergeable(&mut client, self.connstr.as_str())?; + Ok(()) + } + /// Start Postgres as a child process and manage DBs/roles. /// After that this will hang waiting on the postmaster process to exit. #[instrument(skip_all)] @@ -707,7 +715,6 @@ impl ComputeNode { handle_databases(spec, &mut client)?; handle_role_deletions(spec, self.connstr.as_str(), &mut client)?; handle_grants(spec, &mut client, self.connstr.as_str())?; - handle_replication(spec, &mut client, self.connstr.as_str())?; handle_extensions(spec, &mut client)?; create_availability_check_data(&mut client)?; diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs index 81d0be9283..5f471d695f 100644 --- a/compute_tools/src/configurator.rs +++ b/compute_tools/src/configurator.rs @@ -40,16 +40,27 @@ fn configurator_main_loop(compute: &Arc) { drop(state); let mut new_status = ComputeStatus::Failed; - if let Err(e) = compute.merge(&connstr) - { + if let Err(e) = compute.merge(&connstr) { info!("could not merge compute node: {}", e); - } - else - { + } else { new_status = ComputeStatus::Running; info!("merge complete"); } compute.set_status(new_status); + } else if state.status == ComputeStatus::SetMergeablePending { + info!("got set mergeable request"); + state.status = ComputeStatus::SetMergeable; + compute.state_changed.notify_all(); + drop(state); + + let mut new_status = ComputeStatus::Failed; + if let Err(e) = compute.set_mergeable() { + info!("could not mark branch as mergeable: {}", e); + } else { + new_status = ComputeStatus::Running; + info!("marked as mergeable"); + } + compute.set_status(new_status); } else if state.status == ComputeStatus::Failed { info!("compute node is now in Failed state, exiting"); break; diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 64d2ef4529..3051ef7bb3 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -135,6 +135,18 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving /set_mergeable POST request"); + match handle_set_mergeable_request(compute).await { + Ok(msg) => Response::new(Body::from(msg)), + Err((msg, code)) => { + error!("error handling /set_mergeable request: {msg}"); + render_json_error(&msg, code) + } + } + } + // download extension files from S3 on demand (&Method::POST, route) if route.starts_with("/extension_server/") => { info!("serving {:?} POST request", route); @@ -221,9 +233,10 @@ async fn routes(req: Request, compute: &Arc) -> Response, compute: &Arc) - -> Result -{ +async fn handle_merge_request( + req: Request, + compute: &Arc, +) -> Result { let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); let connstr = String::from_utf8(body_bytes.to_vec()).unwrap(); @@ -260,12 +273,58 @@ async fn handle_merge_request(req: Request, compute: &Arc) return Err((msg, StatusCode::INTERNAL_SERVER_ERROR)); } } - + Ok(()) }) - .await - .unwrap()?; - + .await + .unwrap()?; + + let state = compute.state.lock().unwrap().clone(); + let status_response = status_response_from_state(&state); + Ok(serde_json::to_string(&status_response).unwrap()) +} + +async fn handle_set_mergeable_request( + compute: &Arc, +) -> Result { + let c = compute.clone(); + + { + let mut state = compute.state.lock().unwrap(); + if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running { + let msg = format!( + "invalid compute status for merge request: {:?}", + state.status.clone() + ); + return Err((msg, StatusCode::PRECONDITION_FAILED)); + } + state.status = ComputeStatus::SetMergeablePending; + compute.state_changed.notify_all(); + drop(state); + info!("set new spec and notified waiters"); + } + + task::spawn_blocking(move || { + let mut state = c.state.lock().unwrap(); + while state.status != ComputeStatus::Running { + state = c.state_changed.wait(state).unwrap(); + info!( + "waiting for compute to become Running, current status: {:?}", + state.status + ); + + if state.status == ComputeStatus::Failed { + let err = state.error.as_ref().map_or("unknown error", |x| x); + let msg = format!("compute configuration failed: {:?}", err); + return Err((msg, StatusCode::INTERNAL_SERVER_ERROR)); + } + } + + Ok(()) + }) + .await + .unwrap()?; + let state = compute.state.lock().unwrap().clone(); let status_response = status_response_from_state(&state); Ok(serde_json::to_string(&status_response).unwrap()) diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index dc0fd6b47f..0c7e437584 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -89,7 +89,7 @@ paths: post: tags: - Merge - summary: Mere branches. + summary: Merge branches. description: | This is a blocking API endpoint, i.e. it blocks waiting until compute is finished configuration and is in `Running` state. @@ -116,6 +116,30 @@ paths: schema: $ref: "#/components/schemas/GenericError" + /set_mergeable: + post: + tags: + - Set mergeable + summary: Mark branch as mergeable. + description: | + This is a blocking API endpoint, i.e. it blocks waiting until + compute is finished configuration and is in `Running` state. + Optional non-blocking mode could be added later. + operationId: markBranchMergeable + responses: + 200: + description: Branch marked as mergeable + content: + application/json:OK + schema: + $ref: "#/components/schemas/ComputeState" + 500: + description: Set mergeable request failed. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + /configure: post: tags: diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index d83cf27567..34b057e270 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -413,16 +413,15 @@ pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) - let existing_dbs = get_existing_dbs(client)?; for (_, db) in existing_dbs { + if db.name.starts_with("template") { + continue; + } let mut dst_conf = Config::from_str(dst_connstr)?; dst_conf.dbname(&db.name); let mut src_conf = Config::from_str(src_connstr)?; src_conf.dbname(&db.name); - let mut pub_client = src_conf.connect(NoTls)?; - let create_pub = format!("create publication pub_merge for tables in schema public"); - pub_client.simple_query(&create_pub)?; - let mut sub_client = dst_conf.connect(NoTls)?; let create_sub = format!("create subscription sub_merge connection '{}' publication pub_merge with (create_slot=false, slot_name=merge_slot_{}, copy_data=false)", str::replace(src_connstr, "'", "''"), &db.name); sub_client.simple_query(&create_sub)?; @@ -432,11 +431,14 @@ pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) - } #[instrument(skip_all)] -pub fn handle_replication(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> { - info!("Creating logical replication slot"); +pub fn handle_set_mergeable(client: &mut Client, connstr: &str) -> Result<()> { + info!("Mark branch as mergeable"); let existing_dbs = get_existing_dbs(client)?; for (_, db) in existing_dbs { + if db.name.starts_with("template") { + continue; + } let mut conf = Config::from_str(connstr)?; conf.dbname(&db.name); @@ -447,6 +449,7 @@ pub fn handle_replication(spec: &ComputeSpec, client: &mut Client, connstr: &str &db.name ); db_client.simple_query(&create_slot)?; + db_client.simple_query("create publication pub_merge for all tables")?; } Ok(()) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 621d242619..47f9332b84 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -602,6 +602,16 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - let dst_endpoint = cplane.endpoints.get(dst_endpoint_id).unwrap(); dst_endpoint.merge_from(src_endpoint)?; } + Some(("set_mergeable", branch_match)) => { + let endpoint_id = branch_match + .get_one::("endpoint") + .map(|s| s.as_str()) + .ok_or(anyhow!("No endpoint provided"))?; + + let cplane = ComputeControlPlane::load(env.clone())?; + let endpoint = cplane.endpoints.get(endpoint_id).unwrap(); + endpoint.set_mergeable()?; + } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"), None => bail!("no tenant subcommand provided"), } @@ -1325,6 +1335,10 @@ fn cli() -> Command { .arg(Arg::new("src-endpoint").long("src-endpoint").help("Source endpoint for merge").required(true)) .arg(Arg::new("dst-endpoint").long("dst-endpoint").help("Destination endpoint for merge").required(true)) ) + .subcommand(Command::new("set_mergeable") + .about("Mark branch as mergeable") + .arg(Arg::new("endpoint").long("endpoint").help("Enpoint to be marked as mergeable").required(true)) + ) .subcommand(Command::new("create") .about("Create a new blank timeline") .arg(tenant_id_arg.clone()) diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index dd449e95e5..8e1bee7fb6 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -572,9 +572,11 @@ impl Endpoint { } ComputeStatus::Empty | ComputeStatus::ConfigurationPending - | ComputeStatus::Configuration - | ComputeStatus::MergePending - | ComputeStatus::Merging => { + | ComputeStatus::Configuration + | ComputeStatus::MergePending + | ComputeStatus::Merging + | ComputeStatus::SetMergeablePending + | ComputeStatus::SetMergeable => { bail!("unexpected compute status: {:?}", state.status) } } @@ -699,6 +701,28 @@ impl Endpoint { } } + pub fn set_mergeable(&self) -> Result<()> { + let client = reqwest::blocking::Client::new(); + let response = client + .post(format!( + "http://{}:{}/set_mergeable", + self.http_address.ip(), + self.http_address.port() + )) + .send()?; + let status = response.status(); + if !(status.is_client_error() || status.is_server_error()) { + Ok(()) + } else { + let url = response.url().to_owned(); + let msg = match response.text() { + Ok(err_body) => format!("Error: {}", err_body), + Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), + }; + Err(anyhow::anyhow!(msg)) + } + } + pub fn stop(&self, destroy: bool) -> Result<()> { // If we are going to destroy data directory, // use immediate shutdown mode, otherwise, diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 2d56073772..2d5888bab4 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -50,8 +50,12 @@ pub enum ComputeStatus { Configuration, // Merge requested MergePending, + // Set mergeable requested + SetMergeablePending, // Merge in progress Merging, + // Set mergeable in progress + SetMergeable, // Either startup or configuration failed, // compute will exit soon or is waiting for // control-plane to terminate it. diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 417eeba54e..99c5637bd0 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1174,6 +1174,22 @@ class NeonCli(AbstractNeonCli): res = self.raw_cli(args) res.check_returncode() + def set_mergeable( + self, + endpoint: Endpoint): + """ + Merge two branches + """ + + args = [ + "timeline", + "set_mergeable", + "--endpoint", + str(endpoint.endpoint_id), + ] + res = self.raw_cli(args) + res.check_returncode() + def create_tenant( self, tenant_id: Optional[TenantId] = None, diff --git a/test_runner/regress/test_merge.py b/test_runner/regress/test_merge.py index 1d0ae04b17..995789f6c1 100644 --- a/test_runner/regress/test_merge.py +++ b/test_runner/regress/test_merge.py @@ -17,6 +17,7 @@ def test_merge(neon_env_builder: NeonEnvBuilder): main_branch = env.endpoints.create_start("main", tenant_id=tenant) main_cur = main_branch.connect().cursor() + # Create table and insert some data main_cur.execute("CREATE TABLE t(x bigint primary key)") main_cur.execute("INSERT INTO t values(generate_series(1, 10000))"); @@ -25,13 +26,18 @@ def test_merge(neon_env_builder: NeonEnvBuilder): ws_branch = env.endpoints.create_start("ws", tenant_id=tenant) log.info("postgres is running on 'ws' branch") + # Merge brnach ws as mergeable:it create logical replication slots and pins WAL + env.neon_cli.set_mergeable(ws_branch) + + # Insert more data in the branch ws_cur = ws_branch.connect().cursor() - ws_cur.execute("select pg_create_logical_replication_slot('merge_slot_postgres', 'pgoutput')") ws_cur.execute("INSERT INTO t values(generate_series(10001, 20000))") + # Merge ws brnach intp main env.neon_cli.merge(ws_branch, main_branch) # sleep for some time until changes are applied time.sleep(2) + # Check that changes are merged assert query_scalar(main_cur, "SELECT count(*) from t") == 20000