From bca4deaaf6c87d90d005e544cda772d4b40b5fa4 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Thu, 16 Nov 2023 15:32:14 +0100 Subject: [PATCH] Add support for `neon_local merge` --- control_plane/src/bin/neon_local.rs | 28 ++++++++++++++++++++++ control_plane/src/endpoint.rs | 36 +++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 87ea519a9e..021fc6240e 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -587,6 +587,28 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - timeline_info.timeline_id ); } + Some(("merge", branch_match)) => + { + let tenant_id = get_tenant_id(branch_match, env)?; + let branch_name = branch_match + .get_one::("branch-name") + .map(|s| s.as_str()) + .unwrap_or(DEFAULT_BRANCH_NAME); + let from_branch_name = branch_match + .get_one::("from-branch-name") + .map(|s| s.as_str()) + .ok_or(anyhow!("No branch name provided"))?; + let cplane = ComputeControlPlane::load(env.clone())?; + let into_timeline = env + .get_branch_timeline_id(branch_name, tenant_id) + .ok_or(anyhow!("Timeline not found for given branch/tenant"))?; + let from_timeline = env + .get_branch_timeline_id(from_branch_name, tenant_id) + .ok_or(anyhow!("Timeline not found for given from-branch/tenant"))?; + let into_ep = cplane.endpoint_from_timeline(into_timeline)?; + let from_ep = cplane.endpoint_from_timeline(from_timeline)?; + into_ep.merge_from(from_ep)?; + } Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"), None => bail!("no tenant subcommand provided"), } @@ -1305,6 +1327,12 @@ fn cli() -> Command { .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false)) .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn") .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false))) + .subcommand(Command::new("merge") + .about("Merge changes from one branch into another") + .arg(tenant_id_arg.clone()) + .arg(branch_name_arg.clone()) + .arg(Arg::new("from-branch-name").long("from-branch-name").help("Branch to read from").required(true)) + ) .subcommand(Command::new("create") .about("Create a new blank timeline") .arg(tenant_id_arg.clone()) diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 4443fd8704..e8f5e5c783 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -99,6 +99,18 @@ impl ComputeControlPlane { }) } + pub fn endpoint_from_timeline(&self, timeline_id : TimelineId) -> Result> + { + for (_, value) in self.endpoints.iter() + { + if value.timeline_id == timeline_id && value.mode == ComputeMode::Primary + { + return Ok(value.clone()); + } + } + bail!("Endpoint not found with current timeline") + } + fn get_port(&mut self) -> u16 { 1 + self .endpoints @@ -674,6 +686,30 @@ impl Endpoint { } } + pub fn merge_from(&self, merge_from: Arc) -> Result<()> + { + let client = reqwest::blocking::Client::new(); + let response = client + .post(format!( + "http://{}:{}/merge", + self.http_address.ip(), + self.http_address.port() + )) + .body(merge_from.connstr()) + .send()?; + let status = response.status(); + if !(status.is_client_error() || status.is_server_error()) { + Ok(()) + } else { + let url = response.url().to_owned(); + let msg = match response.text() { + Ok(err_body) => format!("Error: {}", err_body), + Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url), + }; + Err(anyhow::anyhow!(msg)) + } + } + pub fn stop(&self, destroy: bool) -> Result<()> { // If we are going to destroy data directory, // use immediate shutdown mode, otherwise,