Merge branches using logical replication

This commit is contained in:
Konstantin Knizhnik
2023-11-16 19:29:22 +01:00
parent ab631e6792
commit d19860abf7
8 changed files with 205 additions and 0 deletions

View File

@@ -627,6 +627,16 @@ impl ComputeNode {
Ok(())
}
/// Merge changes from another branch into the branch served by this compute.
///
/// `src_constr` is the connection string of the source branch. It is handed
/// to `handle_merge` together with this node's own connection string.
///
/// Panics if the compute spec has not been set.
#[instrument(skip_all)]
pub fn merge(&self, src_constr: &str) -> Result<()> {
    // Take a snapshot of the state; the lock guard is dropped at the end of
    // this statement.
    let state_snapshot = self.state.lock().unwrap().clone();
    let pspec = state_snapshot.pspec.as_ref().expect("spec must be set");
    let dst_connstr = self.connstr.as_str();

    let mut pg_client = Client::connect(dst_connstr, NoTls)?;
    handle_merge(&pspec.spec, &mut pg_client, dst_connstr, src_constr)?;
    Ok(())
}
/// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip_all)]
@@ -697,6 +707,7 @@ impl ComputeNode {
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, &mut client, self.connstr.as_str())?;
handle_replication(spec, &mut client, self.connstr.as_str())?;
handle_extensions(spec, &mut client)?;
create_availability_check_data(&mut client)?;

View File

@@ -123,6 +123,12 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
// Handle branch merge request
(&Method::POST, "/merge") => {
info!("serving /merge POST request");
handle_merge_request(req, compute).await
}
// download extension files from S3 on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
@@ -209,6 +215,18 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
async fn handle_merge_request(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> {
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let connstr = String::from_utf8(body_bytes.to_vec()).unwrap();
match compute.merge(&connstr) {
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("Branch merge failed: {}", e);
Response::new(Body::from(e.to_string()))
}
}
}
async fn handle_configure_request(
req: Request<Body>,
compute: &Arc<ComputeNode>,

View File

@@ -85,6 +85,37 @@ paths:
description: Error text or 'true' if check passed.
example: "true"
/merge:
  post:
    tags:
    - Merge
    summary: Merge branches.
    description: |
      Merge changes from a source branch into the branch served by this
      compute, using logical replication.
      This is a blocking API endpoint, i.e. it blocks until the publication
      on the source branch and the subscription on this compute have been
      created for every database. The changes themselves are applied
      asynchronously after the call returns.
      Optional non-blocking mode could be added later.
    operationId: mergeBranches
    requestBody:
      # The caller sends the connection string of the branch to merge FROM.
      description: Connection string of the source branch.
      required: true
      content:
        text/plain:
          schema:
            type: string
    responses:
      200:
        description: Merge finished.
        content:
          text/plain:
            schema:
              type: string
              example: "OK"
      500:
        description: Merge request failed.
        content:
          text/plain:
            schema:
              type: string
              description: Error text.
/configure:
post:
tags:

View File

@@ -407,6 +407,51 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
Ok(())
}
/// Set up logical replication from the source branch into the destination
/// branch for every existing database.
///
/// For each database returned by `get_existing_dbs(client)`:
/// 1. connect to that database on the source (`src_connstr`) and create the
///    publication `pub_merge` for all tables in schema `public`;
/// 2. connect to that database on the destination (`dst_connstr`) and create
///    the subscription `sub_merge`, attached to the already existing slot
///    `merge_slot_<dbname>` (`create_slot=false`) and without an initial
///    data copy (`copy_data=false`).
///
/// Returns an error if a connection string cannot be parsed, a connection
/// fails, or any of the statements fails (for example because the
/// publication or subscription already exists).
#[instrument(skip_all)]
pub fn handle_merge(
    spec: &ComputeSpec,
    client: &mut Client,
    dst_connstr: &str,
    src_connstr: &str,
) -> Result<()> {
    // `spec` is not consulted yet; the parameter mirrors the other handlers.
    let _ = spec;

    info!("Merge branch into {}", dst_connstr);
    let existing_dbs = get_existing_dbs(client)?;

    // Parse both connection strings once; only the database name changes
    // from one iteration to the next.
    let dst_base = Config::from_str(dst_connstr)?;
    let src_base = Config::from_str(src_connstr)?;
    // Double single quotes so the value can sit inside a SQL string literal.
    let src_connstr_lit = src_connstr.replace('\'', "''");

    for (_, db) in existing_dbs {
        let mut src_conf = src_base.clone();
        src_conf.dbname(&db.name);
        let mut pub_client = src_conf.connect(NoTls)?;
        pub_client.simple_query("create publication pub_merge for tables in schema public")?;

        let mut dst_conf = dst_base.clone();
        dst_conf.dbname(&db.name);
        let mut sub_client = dst_conf.connect(NoTls)?;

        // Pass the slot name as an escaped string literal so a database name
        // containing quotes or other special characters cannot alter the
        // statement.
        let slot_lit = format!("merge_slot_{}", db.name).replace('\'', "''");
        // NOTE(review): the subscription connects with `src_connstr` as
        // given, i.e. to whatever database that string names, which is not
        // necessarily `db.name` — confirm this is intended when more than one
        // database exists.
        let create_sub = format!(
            "create subscription sub_merge connection '{}' publication pub_merge with (create_slot=false, slot_name='{}', copy_data=false)",
            src_connstr_lit, slot_lit
        );
        sub_client.simple_query(&create_sub)?;
    }
    Ok(())
}
/// Make sure every existing database has a logical replication slot named
/// `merge_slot_<dbname>` using the `pgoutput` plugin.
///
/// The slot is created only when no slot with that name exists yet, so the
/// function can be run again (it is part of the configuration-apply
/// sequence) without failing on an already existing slot.
///
/// Returns an error if the connection string cannot be parsed, a connection
/// fails, or the slot cannot be created.
#[instrument(skip_all)]
pub fn handle_replication(spec: &ComputeSpec, client: &mut Client, connstr: &str) -> Result<()> {
    // `spec` is not consulted yet; the parameter mirrors the other handlers.
    let _ = spec;

    info!("Creating logical replication slot");
    let existing_dbs = get_existing_dbs(client)?;

    // Parse once; only the database name changes per iteration.
    let base_conf = Config::from_str(connstr)?;

    for (_, db) in existing_dbs {
        let mut conf = base_conf.clone();
        conf.dbname(&db.name);
        let mut db_client = conf.connect(NoTls)?;

        // Double single quotes so the name can sit inside a SQL string literal.
        // NOTE(review): PostgreSQL restricts the characters allowed in slot
        // names, so database names outside that set will still be rejected
        // by the server — confirm whether such names need to be supported.
        let slot_lit = format!("merge_slot_{}", db.name).replace('\'', "''");
        let create_slot = format!(
            "select pg_create_logical_replication_slot('{slot}', 'pgoutput') \
             where not exists (select 1 from pg_replication_slots where slot_name = '{slot}')",
            slot = slot_lit
        );
        db_client.simple_query(&create_slot)?;
    }
    Ok(())
}
/// It follows mostly the same logic as `handle_roles()` except that we
/// do not use an explicit transaction block, since major database operations
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level

View File

@@ -587,6 +587,21 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
timeline_info.timeline_id
);
}
Some(("merge", branch_match)) => {
let src_endpoint_id = branch_match
.get_one::<String>("src-endpoint")
.map(|s| s.as_str())
.ok_or(anyhow!("No source endpoint provided"))?;
let dst_endpoint_id = branch_match
.get_one::<String>("dst-endpoint")
.map(|s| s.as_str())
.ok_or(anyhow!("No destination endpoint provided"))?;
let cplane = ComputeControlPlane::load(env.clone())?;
let src_endpoint = cplane.endpoints.get(src_endpoint_id).unwrap();
let dst_endpoint = cplane.endpoints.get(dst_endpoint_id).unwrap();
dst_endpoint.merge_from(src_endpoint)?;
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
None => bail!("no tenant subcommand provided"),
}
@@ -1305,6 +1320,11 @@ fn cli() -> Command {
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
.arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
.help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
.subcommand(Command::new("merge")
.about("Merge changes from one branch into another")
.arg(Arg::new("src-endpoint").long("src-endpoint").help("Source endpoint for merge").required(true))
.arg(Arg::new("dst-endpoint").long("dst-endpoint").help("Destination endpoint for merge").required(true))
)
.subcommand(Command::new("create")
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())

View File

@@ -674,6 +674,29 @@ impl Endpoint {
}
}
/// Ask this endpoint's compute to merge changes from `merge_from`.
///
/// Sends a `POST /merge` to this endpoint's HTTP address with the source
/// endpoint's connection string as the body. Any 4xx/5xx status is turned
/// into an error carrying the response body, or the status and URL when the
/// body cannot be read.
pub fn merge_from(&self, merge_from: &Arc<Endpoint>) -> Result<()> {
    let http = reqwest::blocking::Client::new();
    let merge_url = format!(
        "http://{}:{}/merge",
        self.http_address.ip(),
        self.http_address.port()
    );
    let resp = http.post(merge_url).body(merge_from.connstr()).send()?;

    let status = resp.status();
    let failed = status.is_client_error() || status.is_server_error();
    if !failed {
        return Ok(());
    }

    // Reading the body consumes the response, so grab the URL first.
    let url = resp.url().to_owned();
    let msg = resp
        .text()
        .map(|err_body| format!("Error: {}", err_body))
        .unwrap_or_else(|_| format!("Http error ({}) at {}.", status.as_u16(), url));
    Err(anyhow::anyhow!(msg))
}
pub fn stop(&self, destroy: bool) -> Result<()> {
// If we are going to destroy data directory,
// use immediate shutdown mode, otherwise,

View File

@@ -1154,6 +1154,26 @@ class NeonCli(AbstractNeonCli):
kwargs["local_binpath"] = True
return super().raw_cli(*args, **kwargs)
def merge(self, src_endpoint: Endpoint, dst_endpoint: Endpoint):
    """
    Run `timeline merge` through the CLI to merge the branch behind
    `src_endpoint` into the branch behind `dst_endpoint`.

    Raises if the CLI exits with a non-zero return code.
    """
    src_id = str(src_endpoint.endpoint_id)
    dst_id = str(dst_endpoint.endpoint_id)
    result = self.raw_cli(
        ["timeline", "merge", "--src-endpoint", src_id, "--dst-endpoint", dst_id]
    )
    result.check_returncode()
def create_tenant(
self,
tenant_id: Optional[TenantId] = None,

View File

@@ -0,0 +1,37 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import query_scalar
#
# Merge changes made on a child branch (`ws`) back into `main`.
#
def test_merge(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()

    tenant, _ = env.neon_cli.create_tenant()

    main_branch = env.endpoints.create_start("main", tenant_id=tenant)
    main_cur = main_branch.connect().cursor()

    main_cur.execute("CREATE TABLE t(x bigint primary key)")
    main_cur.execute("INSERT INTO t values(generate_series(1, 10000))")

    # Create branch ws.
    env.neon_cli.create_branch("ws", "main", tenant_id=tenant)
    ws_branch = env.endpoints.create_start("ws", tenant_id=tenant)
    log.info("postgres is running on 'ws' branch")

    ws_cur = ws_branch.connect().cursor()
    ws_cur.execute("INSERT INTO t values(generate_series(10001, 20000))")

    env.neon_cli.merge(ws_branch, main_branch)

    # The merged rows are applied asynchronously, so poll with a deadline
    # instead of sleeping for a fixed time and hoping it was long enough.
    deadline = time.monotonic() + 30
    while True:
        count = query_scalar(main_cur, "SELECT count(*) from t")
        if count == 20000 or time.monotonic() >= deadline:
            break
        time.sleep(0.5)
    assert count == 20000