diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 373f05ab2f..959fd6c928 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -627,6 +627,16 @@ impl ComputeNode { Ok(()) } + /// Merge two branches + #[instrument(skip_all)] + pub fn merge(&self, src_constr: &str) -> Result<()> { + let compute_state = self.state.lock().unwrap().clone(); + let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec; + let mut client = Client::connect(self.connstr.as_str(), NoTls)?; + handle_merge(spec, &mut client, self.connstr.as_str(), src_constr)?; + Ok(()) + } + /// Start Postgres as a child process and manage DBs/roles. /// After that this will hang waiting on the postmaster process to exit. #[instrument(skip_all)] @@ -697,6 +707,7 @@ impl ComputeNode { handle_databases(spec, &mut client)?; handle_role_deletions(spec, self.connstr.as_str(), &mut client)?; handle_grants(spec, &mut client, self.connstr.as_str())?; + handle_replication(spec, &mut client, self.connstr.as_str())?; handle_extensions(spec, &mut client)?; create_availability_check_data(&mut client)?; diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 8851be1ec1..06ec36cbaa 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -123,6 +123,12 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving /merge POST request"); + handle_merge_request(req, compute).await + } + // download extension files from S3 on demand (&Method::POST, route) if route.starts_with("/extension_server/") => { info!("serving {:?} POST request", route); @@ -209,6 +215,18 @@ async fn routes(req: Request, compute: &Arc) -> Response, compute: &Arc) -> Response { + let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); + let connstr = String::from_utf8(body_bytes.to_vec()).unwrap(); + match compute.merge(&connstr) { + Ok(_) => Response::new(Body::from("OK")), + Err(e) => { + error!("Branch merge failed: {}", e); + Response::new(Body::from(e.to_string())) + } + } +} + async fn handle_configure_request( req: Request, compute: &Arc, diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index dc26cc63eb..dc0fd6b47f 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -85,6 +85,37 @@ paths: description: Error text or 'true' if check passed. example: "true" + /merge: + post: + tags: + - Merge + summary: Mere branches. + description: | + This is a blocking API endpoint, i.e. it blocks waiting until + compute is finished configuration and is in `Running` state. + Optional non-blocking mode could be added later. + operationId: mergeBranches + requestBody: + description: connection string of target branch + required: true + content: + text/plain: + schema: + type: string + responses: + 200: + description: Merge finished. + content: + application/json:OK + schema: + $ref: "#/components/schemas/ComputeState" + 500: + description: Merge request failed. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + /configure: post: tags: diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index a85d6287b1..b4f86c8514 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -407,6 +407,51 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent Ok(()) } +#[instrument(skip_all)] +pub fn handle_merge(spec: &ComputeSpec, client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> { + info!("Merge branch into {}", dst_connstr); + let existing_dbs = get_existing_dbs(client)?; + + for (_, db) in existing_dbs { + let mut dst_conf = Config::from_str(dst_connstr)?; + dst_conf.dbname(&db.name); + + let mut src_conf = Config::from_str(src_connstr)?; + src_conf.dbname(&db.name); + + let mut pub_client = src_conf.connect(NoTls)?; + let create_pub = format!("create publication pub_merge for tables in schema public"); + pub_client.simple_query(&create_pub)?; + + let mut sub_client = dst_conf.connect(NoTls)?; + let create_sub = format!("create subscription sub_merge connection '{}' publication pub_merge with (create_slot=false, slot_name=merge_slot_{}, copy_data=false)", str::replace(src_connstr, "'", "''"), &db.name); + sub_client.simple_query(&create_sub)?; + } + + Ok(()) +} + +#[instrument(skip_all)] +pub fn handle_replication(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> { + info!("Creating logical replication slot"); + let existing_dbs = get_existing_dbs(client)?; + + for (_, db) in existing_dbs { + let mut conf = Config::from_str(connstr)?; + conf.dbname(&db.name); + + let mut db_client = conf.connect(NoTls)?; + + let create_slot = format!( + "select pg_create_logical_replication_slot('merge_slot_{}', 'pgoutput')", + &db.name + ); + db_client.simple_query(&create_slot)?; + } + + Ok(()) +} + /// It follows mostly the same logic as `handle_roles()` excepting that we /// does not use an explicit transactions block, since major database operations /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 87ea519a9e..621d242619 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -587,6 +587,21 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - timeline_info.timeline_id ); } + Some(("merge", branch_match)) => { + let src_endpoint_id = branch_match + .get_one::("src-endpoint") + .map(|s| s.as_str()) + .ok_or(anyhow!("No source endpoint provided"))?; + let dst_endpoint_id = branch_match + .get_one::("dst-endpoint") + .map(|s| s.as_str()) + .ok_or(anyhow!("No destination endpoint provided"))?; + + let cplane = ComputeControlPlane::load(env.clone())?; + let src_endpoint = cplane.endpoints.get(src_endpoint_id).unwrap(); + let dst_endpoint = cplane.endpoints.get(dst_endpoint_id).unwrap(); + dst_endpoint.merge_from(src_endpoint)?; + } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"), None => bail!("no tenant subcommand provided"), } @@ -1305,6 +1320,11 @@ fn cli() -> Command { .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false)) .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn") .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false))) + .subcommand(Command::new("merge") + .about("Merge changes from one branch into another") + .arg(Arg::new("src-endpoint").long("src-endpoint").help("Source endpoint for merge").required(true)) + .arg(Arg::new("dst-endpoint").long("dst-endpoint").help("Destination endpoint for merge").required(true)) + ) .subcommand(Command::new("create") .about("Create a new blank timeline") .arg(tenant_id_arg.clone()) diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 4443fd8704..a94ddc92c0 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -674,6 +674,29 @@ impl Endpoint { } } + pub fn merge_from(&self, merge_from: &Arc) -> Result<()> { + let client = reqwest::blocking::Client::new(); + let response = client + .post(format!( + "http://{}:{}/merge", + self.http_address.ip(), + self.http_address.port() + )) + .body(merge_from.connstr()) + .send()?; + let status = response.status(); + if !(status.is_client_error() || status.is_server_error()) { + Ok(()) + } else { + let url = response.url().to_owned(); + let msg = match response.text() { + Ok(err_body) => format!("Error: {}", err_body), + Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), + }; + Err(anyhow::anyhow!(msg)) + } + } + pub fn stop(&self, destroy: bool) -> Result<()> { // If we are going to destroy data directory, // use immediate shutdown mode, otherwise, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6737ca5fe3..417eeba54e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1154,6 +1154,26 @@ class NeonCli(AbstractNeonCli): kwargs["local_binpath"] = True return super().raw_cli(*args, **kwargs) + + def merge( + self, + src_endpoint: Endpoint, + dst_endpoint: Endpoint): + """ + Merge two branches + """ + + args = [ + "timeline", + "merge", + "--src-endpoint", + str(src_endpoint.endpoint_id), + "--dst-endpoint", + str(dst_endpoint.endpoint_id) + ] + res = self.raw_cli(args) + res.check_returncode() + def create_tenant( self, tenant_id: Optional[TenantId] = None, diff --git a/test_runner/regress/test_merge.py b/test_runner/regress/test_merge.py new file mode 100644 index 0000000000..0613a2abec --- /dev/null +++ b/test_runner/regress/test_merge.py @@ -0,0 +1,37 @@ +import time +from fixtures.log_helper import log +from fixtures.neon_fixtures import NeonEnvBuilder +from fixtures.types import TimelineId +from fixtures.utils import query_scalar + +# +# Merge ancestor branch with the main branch. +# +def test_merge(neon_env_builder: NeonEnvBuilder): + env = neon_env_builder.init_start() + pageserver_http = env.pageserver.http_client() + + # Override defaults: 4M checkpoint_distance, disable background compaction and gc. + tenant, _ = env.neon_cli.create_tenant() + + main_branch = env.endpoints.create_start("main", tenant_id=tenant) + main_cur = main_branch.connect().cursor() + + main_cur.execute("CREATE TABLE t(x bigint primary key)") + main_cur.execute("INSERT INTO t values(generate_series(1, 10000))"); + + # Create branch ws. + env.neon_cli.create_branch("ws", "main", tenant_id=tenant) + ws_branch = env.endpoints.create_start("ws", tenant_id=tenant) + log.info("postgres is running on 'ws' branch") + + ws_cur = ws_branch.connect().cursor() + + ws_cur.execute("INSERT INTO t values(generate_series(10001, 20000))") + + env.neon_cli.merge(ws_branch, main_branch) + + # sleep for some time until changes are applied + time.sleep(2) + + assert query_scalar(main_cur, "SELECT count(*) from t") == 20000