Compare commits

...

6 Commits

Author SHA1 Message Date
Konstantin Knizhnik
ba3480a568 Fix construction of database URL 2023-11-17 12:05:14 +01:00
Konstantin Knizhnik
71bd53c646 Add set_mergeable 2023-11-16 23:55:30 +01:00
Konstantin Knizhnik
012f22c36d Create slot in test_merge 2023-11-16 21:00:47 +01:00
Sasha Krassovsky
a1db731f4a Fix mistake in configurator 2023-11-16 21:00:27 +01:00
Sasha Krassovsky
a5d0f1a92e Make merging happen on configurator thread 2023-11-16 20:52:46 +01:00
Konstantin Knizhnik
d19860abf7 Merge branches using logical replication 2023-11-16 19:29:22 +01:00
10 changed files with 446 additions and 1 deletions

View File

@@ -83,6 +83,7 @@ pub struct ComputeState {
pub last_active: Option<DateTime<Utc>>, pub last_active: Option<DateTime<Utc>>,
pub error: Option<String>, pub error: Option<String>,
pub pspec: Option<ParsedSpec>, pub pspec: Option<ParsedSpec>,
pub merge_src_connstr: Option<String>,
pub metrics: ComputeMetrics, pub metrics: ComputeMetrics,
} }
@@ -94,6 +95,7 @@ impl ComputeState {
last_active: None, last_active: None,
error: None, error: None,
pspec: None, pspec: None,
merge_src_connstr: None,
metrics: ComputeMetrics::default(), metrics: ComputeMetrics::default(),
} }
} }
@@ -627,6 +629,22 @@ impl ComputeNode {
Ok(()) Ok(())
} }
/// Merge changes from the branch reachable at `src_connstr` into the
/// branch served by this compute node.
#[instrument(skip_all)]
pub fn merge(&self, src_connstr: &str) -> Result<()> {
    let dst_connstr = self.connstr.as_str();
    // Open an admin connection to our own Postgres and delegate the
    // per-database subscription setup to `handle_merge`.
    let mut pg_client = Client::connect(dst_connstr, NoTls)?;
    handle_merge(&mut pg_client, dst_connstr, src_connstr)?;
    Ok(())
}
/// Mark this branch as mergeable: delegates to `handle_set_mergeable`,
/// which creates logical replication slots and a publication in each
/// database so another branch can later merge from this one.
#[instrument(skip_all)]
pub fn set_mergeable(&self) -> Result<()> {
    let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
    handle_set_mergeable(&mut client, self.connstr.as_str())?;
    Ok(())
}
/// Start Postgres as a child process and manage DBs/roles. /// Start Postgres as a child process and manage DBs/roles.
/// After that this will hang waiting on the postmaster process to exit. /// After that this will hang waiting on the postmaster process to exit.
#[instrument(skip_all)] #[instrument(skip_all)]

View File

@@ -31,6 +31,35 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
// XXX: used to test that API is blocking // XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(10000)); // std::thread::sleep(std::time::Duration::from_millis(10000));
compute.set_status(new_status);
} else if state.status == ComputeStatus::MergePending {
info!("got merge request");
state.status = ComputeStatus::Merging;
compute.state_changed.notify_all();
let connstr = state.merge_src_connstr.clone().unwrap();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.merge(&connstr) {
info!("could not merge compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("merge complete");
}
compute.set_status(new_status);
} else if state.status == ComputeStatus::SetMergeablePending {
info!("got set mergeable request");
state.status = ComputeStatus::SetMergeable;
compute.state_changed.notify_all();
drop(state);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.set_mergeable() {
info!("could not mark branch as mergeable: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("marked as mergeable");
}
compute.set_status(new_status); compute.set_status(new_status);
} else if state.status == ComputeStatus::Failed { } else if state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting"); info!("compute node is now in Failed state, exiting");

View File

@@ -123,6 +123,30 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
} }
} }
// Handle branch merge request
(&Method::POST, "/merge") => {
info!("serving /merge POST request");
match handle_merge_request(req, compute).await {
Ok(msg) => Response::new(Body::from(msg)),
Err((msg, code)) => {
error!("error handling /merge request: {msg}");
render_json_error(&msg, code)
}
}
}
// Handle branch set mergeable request
(&Method::POST, "/set_mergeable") => {
info!("serving /set_mergeable POST request");
match handle_set_mergeable_request(compute).await {
Ok(msg) => Response::new(Body::from(msg)),
Err((msg, code)) => {
error!("error handling /set_mergeable request: {msg}");
render_json_error(&msg, code)
}
}
}
// download extension files from S3 on demand // download extension files from S3 on demand
(&Method::POST, route) if route.starts_with("/extension_server/") => { (&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route); info!("serving {:?} POST request", route);
@@ -209,6 +233,103 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
} }
} }
/// Handle `POST /merge`: store the source branch connection string in the
/// shared state, flip status to `MergePending` so the configurator thread
/// picks up the merge, then block until the compute is `Running` again
/// (or report the failure).
async fn handle_merge_request(
    req: Request<Body>,
    compute: &Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
    // The request body is the plain-text connection string of the source
    // branch. It is client-supplied, so reading/decoding it must not panic.
    let body_bytes = hyper::body::to_bytes(req.into_body())
        .await
        .map_err(|e| {
            (
                format!("failed to read request body: {e}"),
                StatusCode::BAD_REQUEST,
            )
        })?;
    let connstr = String::from_utf8(body_bytes.to_vec()).map_err(|e| {
        (
            format!("request body is not valid UTF-8: {e}"),
            StatusCode::BAD_REQUEST,
        )
    })?;
    let c = compute.clone();
    {
        let mut state = compute.state.lock().unwrap();
        // A merge may only be requested on an idle compute.
        if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
            let msg = format!(
                "invalid compute status for merge request: {:?}",
                state.status
            );
            return Err((msg, StatusCode::PRECONDITION_FAILED));
        }
        state.merge_src_connstr = Some(connstr);
        state.status = ComputeStatus::MergePending;
        compute.state_changed.notify_all();
        drop(state);
        info!("set merge request and notified waiters");
    }
    // Wait on a dedicated blocking thread: the condvar wait would otherwise
    // block the async runtime worker.
    task::spawn_blocking(move || {
        let mut state = c.state.lock().unwrap();
        while state.status != ComputeStatus::Running {
            state = c.state_changed.wait(state).unwrap();
            info!(
                "waiting for compute to become Running, current status: {:?}",
                state.status
            );
            if state.status == ComputeStatus::Failed {
                let err = state.error.as_ref().map_or("unknown error", |x| x);
                let msg = format!("merge failed: {:?}", err);
                return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
            }
        }
        Ok(())
    })
    .await
    .unwrap()?;
    // Report the final state back to the caller as JSON.
    let state = compute.state.lock().unwrap().clone();
    let status_response = status_response_from_state(&state);
    Ok(serde_json::to_string(&status_response).unwrap())
}
/// Handle `POST /set_mergeable`: flip status to `SetMergeablePending` so
/// the configurator thread creates the replication slots/publication, then
/// block until the compute is `Running` again (or report the failure).
async fn handle_set_mergeable_request(
    compute: &Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
    let c = compute.clone();
    {
        let mut state = compute.state.lock().unwrap();
        // set_mergeable may only be requested on an idle compute.
        if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
            let msg = format!(
                "invalid compute status for set_mergeable request: {:?}",
                state.status
            );
            return Err((msg, StatusCode::PRECONDITION_FAILED));
        }
        state.status = ComputeStatus::SetMergeablePending;
        compute.state_changed.notify_all();
        drop(state);
        info!("requested set_mergeable and notified waiters");
    }
    // Wait on a dedicated blocking thread: the condvar wait would otherwise
    // block the async runtime worker.
    task::spawn_blocking(move || {
        let mut state = c.state.lock().unwrap();
        while state.status != ComputeStatus::Running {
            state = c.state_changed.wait(state).unwrap();
            info!(
                "waiting for compute to become Running, current status: {:?}",
                state.status
            );
            if state.status == ComputeStatus::Failed {
                let err = state.error.as_ref().map_or("unknown error", |x| x);
                let msg = format!("set_mergeable failed: {:?}", err);
                return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
            }
        }
        Ok(())
    })
    .await
    .unwrap()?;
    // Report the final state back to the caller as JSON.
    let state = compute.state.lock().unwrap().clone();
    let status_response = status_response_from_state(&state);
    Ok(serde_json::to_string(&status_response).unwrap())
}
async fn handle_configure_request( async fn handle_configure_request(
req: Request<Body>, req: Request<Body>,
compute: &Arc<ComputeNode>, compute: &Arc<ComputeNode>,

View File

@@ -85,6 +85,61 @@ paths:
description: Error text or 'true' if check passed. description: Error text or 'true' if check passed.
example: "true" example: "true"
/merge:
post:
tags:
- Merge
summary: Merge branches.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
compute is finished configuration and is in `Running` state.
Optional non-blocking mode could be added later.
operationId: mergeBranches
requestBody:
description: connection string of target branch
required: true
content:
text/plain:
schema:
type: string
responses:
200:
description: Merge finished.
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeState"
500:
description: Merge request failed.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/set_mergeable:
post:
tags:
- Set mergeable
summary: Mark branch as mergeable.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
compute is finished configuration and is in `Running` state.
Optional non-blocking mode could be added later.
operationId: markBranchMergeable
responses:
200:
description: Branch marked as mergeable
content:
application/json:
schema:
$ref: "#/components/schemas/ComputeState"
500:
description: Set mergeable request failed.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/configure: /configure:
post: post:
tags: tags:

View File

@@ -407,6 +407,58 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
Ok(()) Ok(())
} }
/// Merge a source branch into the destination branch: for every
/// non-template database, create a logical-replication subscription on the
/// destination that consumes the publication (`pub_merge`) and slot
/// (`merge_slot_<db>`) previously prepared on the source branch by
/// `handle_set_mergeable`.
#[instrument(skip_all)]
pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> {
    info!("Merge branch into {}", dst_connstr);
    let existing_dbs = get_existing_dbs(client)?;
    for (_, db) in existing_dbs {
        // Skip template databases — they hold no user data.
        if db.name.starts_with("template") {
            continue;
        }
        // Connect to the same database on the destination side; the
        // subscription must be created inside that database.
        let mut dst_conf = Config::from_str(dst_connstr)?;
        dst_conf.dbname(&db.name);
        let mut sub_client = dst_conf.connect(NoTls)?;
        // Build the source connection string for this database by replacing
        // the last path segment of `src_connstr` with the database name.
        // NOTE(review): assumes the connstr ends with "/<dbname>" and has no
        // query parameters — TODO confirm for all connstrs we produce.
        let mut connstr_parts: Vec<&str> = src_connstr.split('/').collect();
        connstr_parts.pop();
        connstr_parts.push(&db.name);
        let connstr = connstr_parts.join("/");
        // The slot already exists (created by set_mergeable), hence
        // create_slot=false; copy_data=false because the branch already
        // shares history with the source up to the branch point.
        // NOTE(review): db.name is interpolated unquoted into SQL; database
        // names with unusual characters would break this — TODO confirm.
        let create_sub = format!("create subscription sub_merge connection '{}' publication pub_merge with (create_slot=false, slot_name=merge_slot_{}, copy_data=false)", connstr, &db.name);
        sub_client.simple_query(&create_sub)?;
    }
    Ok(())
}
/// Mark a branch as mergeable: in every non-template database, create a
/// logical replication slot (`merge_slot_<db>`, pgoutput plugin) and a
/// publication (`pub_merge`) covering all tables, so a destination branch
/// can later subscribe to them during a merge.
#[instrument(skip_all)]
pub fn handle_set_mergeable(client: &mut Client, connstr: &str) -> Result<()> {
    info!("Mark branch as mergeable");
    for (_, db) in get_existing_dbs(client)? {
        // Skip template databases — they hold no user data.
        if db.name.starts_with("template") {
            continue;
        }
        // Open a per-database connection: the publication and the logical
        // slot are both bound to the database they are created in.
        let mut db_conf = Config::from_str(connstr)?;
        db_conf.dbname(&db.name);
        let mut db_client = db_conf.connect(NoTls)?;
        let create_slot = format!(
            "select pg_create_logical_replication_slot('merge_slot_{}', 'pgoutput')",
            &db.name
        );
        db_client.simple_query(&create_slot)?;
        db_client.simple_query("create publication pub_merge for all tables")?;
    }
    Ok(())
}
/// It follows mostly the same logic as `handle_roles()` excepting that we /// It follows mostly the same logic as `handle_roles()` excepting that we
/// does not use an explicit transactions block, since major database operations /// does not use an explicit transactions block, since major database operations
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level /// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level

View File

@@ -587,6 +587,31 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
timeline_info.timeline_id timeline_info.timeline_id
); );
} }
Some(("merge", branch_match)) => {
let src_endpoint_id = branch_match
.get_one::<String>("src-endpoint")
.map(|s| s.as_str())
.ok_or(anyhow!("No source endpoint provided"))?;
let dst_endpoint_id = branch_match
.get_one::<String>("dst-endpoint")
.map(|s| s.as_str())
.ok_or(anyhow!("No destination endpoint provided"))?;
let cplane = ComputeControlPlane::load(env.clone())?;
let src_endpoint = cplane.endpoints.get(src_endpoint_id).unwrap();
let dst_endpoint = cplane.endpoints.get(dst_endpoint_id).unwrap();
dst_endpoint.merge_from(src_endpoint)?;
}
Some(("set_mergeable", branch_match)) => {
let endpoint_id = branch_match
.get_one::<String>("endpoint")
.map(|s| s.as_str())
.ok_or(anyhow!("No endpoint provided"))?;
let cplane = ComputeControlPlane::load(env.clone())?;
let endpoint = cplane.endpoints.get(endpoint_id).unwrap();
endpoint.set_mergeable()?;
}
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"), Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
None => bail!("no tenant subcommand provided"), None => bail!("no tenant subcommand provided"),
} }
@@ -1305,6 +1330,15 @@ fn cli() -> Command {
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false)) .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
.arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn") .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
.help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false))) .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
.subcommand(Command::new("merge")
.about("Merge changes from one branch into another")
.arg(Arg::new("src-endpoint").long("src-endpoint").help("Source endpoint for merge").required(true))
.arg(Arg::new("dst-endpoint").long("dst-endpoint").help("Destination endpoint for merge").required(true))
)
.subcommand(Command::new("set_mergeable")
.about("Mark branch as mergeable")
.arg(Arg::new("endpoint").long("endpoint").help("Enpoint to be marked as mergeable").required(true))
)
.subcommand(Command::new("create") .subcommand(Command::new("create")
.about("Create a new blank timeline") .about("Create a new blank timeline")
.arg(tenant_id_arg.clone()) .arg(tenant_id_arg.clone())

View File

@@ -572,7 +572,11 @@ impl Endpoint {
} }
ComputeStatus::Empty ComputeStatus::Empty
| ComputeStatus::ConfigurationPending | ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration => { | ComputeStatus::Configuration
| ComputeStatus::MergePending
| ComputeStatus::Merging
| ComputeStatus::SetMergeablePending
| ComputeStatus::SetMergeable => {
bail!("unexpected compute status: {:?}", state.status) bail!("unexpected compute status: {:?}", state.status)
} }
} }
@@ -674,6 +678,51 @@ impl Endpoint {
} }
} }
/// Ask this endpoint's compute to merge in the changes from the
/// `merge_from` endpoint's branch by POSTing its connection string to the
/// compute's blocking `/merge` HTTP API.
pub fn merge_from(&self, merge_from: &Arc<Endpoint>) -> Result<()> {
    let merge_url = format!(
        "http://{}:{}/merge",
        self.http_address.ip(),
        self.http_address.port()
    );
    let response = reqwest::blocking::Client::new()
        .post(merge_url)
        .body(merge_from.connstr())
        .send()?;
    let status = response.status();
    if status.is_client_error() || status.is_server_error() {
        // Prefer the error body from the compute; fall back to the HTTP
        // status if the body cannot be read.
        let url = response.url().to_owned();
        let msg = match response.text() {
            Ok(err_body) => format!("Error: {}", err_body),
            Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
        };
        Err(anyhow::anyhow!(msg))
    } else {
        Ok(())
    }
}
/// Ask this endpoint's compute to mark its branch as mergeable via the
/// compute's blocking `/set_mergeable` HTTP API.
pub fn set_mergeable(&self) -> Result<()> {
    let set_mergeable_url = format!(
        "http://{}:{}/set_mergeable",
        self.http_address.ip(),
        self.http_address.port()
    );
    let response = reqwest::blocking::Client::new()
        .post(set_mergeable_url)
        .send()?;
    let status = response.status();
    if status.is_client_error() || status.is_server_error() {
        // Prefer the error body from the compute; fall back to the HTTP
        // status if the body cannot be read.
        let url = response.url().to_owned();
        let msg = match response.text() {
            Ok(err_body) => format!("Error: {}", err_body),
            Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
        };
        Err(anyhow::anyhow!(msg))
    } else {
        Ok(())
    }
}
pub fn stop(&self, destroy: bool) -> Result<()> { pub fn stop(&self, destroy: bool) -> Result<()> {
// If we are going to destroy data directory, // If we are going to destroy data directory,
// use immediate shutdown mode, otherwise, // use immediate shutdown mode, otherwise,

View File

@@ -48,6 +48,14 @@ pub enum ComputeStatus {
Running, Running,
// New spec is being applied. // New spec is being applied.
Configuration, Configuration,
// Merge requested
MergePending,
// Set mergeable requested
SetMergeablePending,
// Merge in progress
Merging,
// Set mergeable in progress
SetMergeable,
// Either startup or configuration failed, // Either startup or configuration failed,
// compute will exit soon or is waiting for // compute will exit soon or is waiting for
// control-plane to terminate it. // control-plane to terminate it.

View File

@@ -1154,6 +1154,42 @@ class NeonCli(AbstractNeonCli):
kwargs["local_binpath"] = True kwargs["local_binpath"] = True
return super().raw_cli(*args, **kwargs) return super().raw_cli(*args, **kwargs)
def merge(
    self,
    src_endpoint: Endpoint,
    dst_endpoint: Endpoint):
    """
    Merge changes from the branch served by `src_endpoint` into the
    branch served by `dst_endpoint` via `neon_local timeline merge`.

    Raises CalledProcessError if the CLI command fails.
    """
    args = [
        "timeline",
        "merge",
        "--src-endpoint",
        str(src_endpoint.endpoint_id),
        "--dst-endpoint",
        str(dst_endpoint.endpoint_id)
    ]
    res = self.raw_cli(args)
    res.check_returncode()
def set_mergeable(
    self,
    endpoint: Endpoint):
    """
    Mark the branch served by `endpoint` as mergeable via
    `neon_local timeline set_mergeable` (prepares replication slots
    and a publication so the branch can later be merged).

    Raises CalledProcessError if the CLI command fails.
    """
    args = [
        "timeline",
        "set_mergeable",
        "--endpoint",
        str(endpoint.endpoint_id),
    ]
    res = self.raw_cli(args)
    res.check_returncode()
def create_tenant( def create_tenant(
self, self,
tenant_id: Optional[TenantId] = None, tenant_id: Optional[TenantId] = None,

View File

@@ -0,0 +1,43 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import query_scalar
#
# Merge ancestor branch with the main branch.
#
def test_merge(neon_env_builder: NeonEnvBuilder):
    """Merge changes made on a child branch ('ws') back into its parent ('main')."""
    env = neon_env_builder.init_start()
    tenant, _ = env.neon_cli.create_tenant()

    main_branch = env.endpoints.create_start("main", tenant_id=tenant)
    main_cur = main_branch.connect().cursor()

    # Create a table and insert some data on the main branch.
    main_cur.execute("CREATE TABLE t(x bigint primary key)")
    main_cur.execute("INSERT INTO t values(generate_series(1, 10000))")

    # Create branch ws.
    env.neon_cli.create_branch("ws", "main", tenant_id=tenant)
    ws_branch = env.endpoints.create_start("ws", tenant_id=tenant)
    log.info("postgres is running on 'ws' branch")

    # Mark branch ws as mergeable: this creates logical replication
    # slots (pinning WAL) and a publication on the branch.
    env.neon_cli.set_mergeable(ws_branch)

    # Insert more data on the branch.
    ws_cur = ws_branch.connect().cursor()
    ws_cur.execute("INSERT INTO t values(generate_series(10001, 20000))")

    # Merge branch ws into main.
    env.neon_cli.merge(ws_branch, main_branch)

    # Replication is asynchronous: poll (with a deadline) until the merged
    # rows appear, instead of relying on a fixed sleep.
    deadline = time.time() + 60
    while query_scalar(main_cur, "SELECT count(*) from t") != 20000:
        assert time.time() < deadline, "timed out waiting for merged changes to apply"
        time.sleep(0.5)

    # Check that the changes are merged.
    assert query_scalar(main_cur, "SELECT count(*) from t") == 20000