mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-05 17:30:38 +00:00
Compare commits
6 Commits
split-prox
...
merge_bran
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ba3480a568 | ||
|
|
71bd53c646 | ||
|
|
012f22c36d | ||
|
|
a1db731f4a | ||
|
|
a5d0f1a92e | ||
|
|
d19860abf7 |
@@ -83,6 +83,7 @@ pub struct ComputeState {
|
|||||||
pub last_active: Option<DateTime<Utc>>,
|
pub last_active: Option<DateTime<Utc>>,
|
||||||
pub error: Option<String>,
|
pub error: Option<String>,
|
||||||
pub pspec: Option<ParsedSpec>,
|
pub pspec: Option<ParsedSpec>,
|
||||||
|
pub merge_src_connstr: Option<String>,
|
||||||
pub metrics: ComputeMetrics,
|
pub metrics: ComputeMetrics,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,6 +95,7 @@ impl ComputeState {
|
|||||||
last_active: None,
|
last_active: None,
|
||||||
error: None,
|
error: None,
|
||||||
pspec: None,
|
pspec: None,
|
||||||
|
merge_src_connstr: None,
|
||||||
metrics: ComputeMetrics::default(),
|
metrics: ComputeMetrics::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -627,6 +629,22 @@ impl ComputeNode {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Merge two branches
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub fn merge(&self, src_connstr: &str) -> Result<()> {
|
||||||
|
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||||
|
handle_merge(&mut client, self.connstr.as_str(), &src_connstr)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark brnach as mergeable
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub fn set_mergeable(&self) -> Result<()> {
|
||||||
|
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
|
||||||
|
handle_set_mergeable(&mut client, self.connstr.as_str())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Start Postgres as a child process and manage DBs/roles.
|
/// Start Postgres as a child process and manage DBs/roles.
|
||||||
/// After that this will hang waiting on the postmaster process to exit.
|
/// After that this will hang waiting on the postmaster process to exit.
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
|
|||||||
@@ -31,6 +31,35 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
|
|||||||
// XXX: used to test that API is blocking
|
// XXX: used to test that API is blocking
|
||||||
// std::thread::sleep(std::time::Duration::from_millis(10000));
|
// std::thread::sleep(std::time::Duration::from_millis(10000));
|
||||||
|
|
||||||
|
compute.set_status(new_status);
|
||||||
|
} else if state.status == ComputeStatus::MergePending {
|
||||||
|
info!("got merge request");
|
||||||
|
state.status = ComputeStatus::Merging;
|
||||||
|
compute.state_changed.notify_all();
|
||||||
|
let connstr = state.merge_src_connstr.clone().unwrap();
|
||||||
|
drop(state);
|
||||||
|
|
||||||
|
let mut new_status = ComputeStatus::Failed;
|
||||||
|
if let Err(e) = compute.merge(&connstr) {
|
||||||
|
info!("could not merge compute node: {}", e);
|
||||||
|
} else {
|
||||||
|
new_status = ComputeStatus::Running;
|
||||||
|
info!("merge complete");
|
||||||
|
}
|
||||||
|
compute.set_status(new_status);
|
||||||
|
} else if state.status == ComputeStatus::SetMergeablePending {
|
||||||
|
info!("got set mergeable request");
|
||||||
|
state.status = ComputeStatus::SetMergeable;
|
||||||
|
compute.state_changed.notify_all();
|
||||||
|
drop(state);
|
||||||
|
|
||||||
|
let mut new_status = ComputeStatus::Failed;
|
||||||
|
if let Err(e) = compute.set_mergeable() {
|
||||||
|
info!("could not mark branch as mergeable: {}", e);
|
||||||
|
} else {
|
||||||
|
new_status = ComputeStatus::Running;
|
||||||
|
info!("marked as mergeable");
|
||||||
|
}
|
||||||
compute.set_status(new_status);
|
compute.set_status(new_status);
|
||||||
} else if state.status == ComputeStatus::Failed {
|
} else if state.status == ComputeStatus::Failed {
|
||||||
info!("compute node is now in Failed state, exiting");
|
info!("compute node is now in Failed state, exiting");
|
||||||
|
|||||||
@@ -123,6 +123,30 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle branch merge request
|
||||||
|
(&Method::POST, "/merge") => {
|
||||||
|
info!("serving /merge POST request");
|
||||||
|
match handle_merge_request(req, compute).await {
|
||||||
|
Ok(msg) => Response::new(Body::from(msg)),
|
||||||
|
Err((msg, code)) => {
|
||||||
|
error!("error handling /merge request: {msg}");
|
||||||
|
render_json_error(&msg, code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle branch set mergeable request
|
||||||
|
(&Method::POST, "/set_mergeable") => {
|
||||||
|
info!("serving /set_mergeable POST request");
|
||||||
|
match handle_set_mergeable_request(compute).await {
|
||||||
|
Ok(msg) => Response::new(Body::from(msg)),
|
||||||
|
Err((msg, code)) => {
|
||||||
|
error!("error handling /set_mergeable request: {msg}");
|
||||||
|
render_json_error(&msg, code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// download extension files from S3 on demand
|
// download extension files from S3 on demand
|
||||||
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
(&Method::POST, route) if route.starts_with("/extension_server/") => {
|
||||||
info!("serving {:?} POST request", route);
|
info!("serving {:?} POST request", route);
|
||||||
@@ -209,6 +233,103 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn handle_merge_request(
|
||||||
|
req: Request<Body>,
|
||||||
|
compute: &Arc<ComputeNode>,
|
||||||
|
) -> Result<String, (String, StatusCode)> {
|
||||||
|
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
||||||
|
let connstr = String::from_utf8(body_bytes.to_vec()).unwrap();
|
||||||
|
|
||||||
|
let c = compute.clone();
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut state = compute.state.lock().unwrap();
|
||||||
|
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
||||||
|
let msg = format!(
|
||||||
|
"invalid compute status for merge request: {:?}",
|
||||||
|
state.status.clone()
|
||||||
|
);
|
||||||
|
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
||||||
|
}
|
||||||
|
state.merge_src_connstr = Some(connstr);
|
||||||
|
state.status = ComputeStatus::MergePending;
|
||||||
|
compute.state_changed.notify_all();
|
||||||
|
drop(state);
|
||||||
|
info!("set new spec and notified waiters");
|
||||||
|
}
|
||||||
|
|
||||||
|
task::spawn_blocking(move || {
|
||||||
|
let mut state = c.state.lock().unwrap();
|
||||||
|
while state.status != ComputeStatus::Running {
|
||||||
|
state = c.state_changed.wait(state).unwrap();
|
||||||
|
info!(
|
||||||
|
"waiting for compute to become Running, current status: {:?}",
|
||||||
|
state.status
|
||||||
|
);
|
||||||
|
|
||||||
|
if state.status == ComputeStatus::Failed {
|
||||||
|
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||||
|
let msg = format!("compute configuration failed: {:?}", err);
|
||||||
|
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()?;
|
||||||
|
|
||||||
|
let state = compute.state.lock().unwrap().clone();
|
||||||
|
let status_response = status_response_from_state(&state);
|
||||||
|
Ok(serde_json::to_string(&status_response).unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_set_mergeable_request(
|
||||||
|
compute: &Arc<ComputeNode>,
|
||||||
|
) -> Result<String, (String, StatusCode)> {
|
||||||
|
let c = compute.clone();
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut state = compute.state.lock().unwrap();
|
||||||
|
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
||||||
|
let msg = format!(
|
||||||
|
"invalid compute status for merge request: {:?}",
|
||||||
|
state.status.clone()
|
||||||
|
);
|
||||||
|
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
||||||
|
}
|
||||||
|
state.status = ComputeStatus::SetMergeablePending;
|
||||||
|
compute.state_changed.notify_all();
|
||||||
|
drop(state);
|
||||||
|
info!("set new spec and notified waiters");
|
||||||
|
}
|
||||||
|
|
||||||
|
task::spawn_blocking(move || {
|
||||||
|
let mut state = c.state.lock().unwrap();
|
||||||
|
while state.status != ComputeStatus::Running {
|
||||||
|
state = c.state_changed.wait(state).unwrap();
|
||||||
|
info!(
|
||||||
|
"waiting for compute to become Running, current status: {:?}",
|
||||||
|
state.status
|
||||||
|
);
|
||||||
|
|
||||||
|
if state.status == ComputeStatus::Failed {
|
||||||
|
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||||
|
let msg = format!("compute configuration failed: {:?}", err);
|
||||||
|
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()?;
|
||||||
|
|
||||||
|
let state = compute.state.lock().unwrap().clone();
|
||||||
|
let status_response = status_response_from_state(&state);
|
||||||
|
Ok(serde_json::to_string(&status_response).unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_configure_request(
|
async fn handle_configure_request(
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
compute: &Arc<ComputeNode>,
|
compute: &Arc<ComputeNode>,
|
||||||
|
|||||||
@@ -85,6 +85,61 @@ paths:
|
|||||||
description: Error text or 'true' if check passed.
|
description: Error text or 'true' if check passed.
|
||||||
example: "true"
|
example: "true"
|
||||||
|
|
||||||
|
/merge:
|
||||||
|
post:
|
||||||
|
tags:
|
||||||
|
- Merge
|
||||||
|
summary: Merge branches.
|
||||||
|
description: |
|
||||||
|
This is a blocking API endpoint, i.e. it blocks waiting until
|
||||||
|
compute is finished configuration and is in `Running` state.
|
||||||
|
Optional non-blocking mode could be added later.
|
||||||
|
operationId: mergeBranches
|
||||||
|
requestBody:
|
||||||
|
description: connection string of target branch
|
||||||
|
required: true
|
||||||
|
content:
|
||||||
|
text/plain:
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
responses:
|
||||||
|
200:
|
||||||
|
description: Merge finished.
|
||||||
|
content:
|
||||||
|
application/json:OK
|
||||||
|
schema:
|
||||||
|
$ref: "#/components/schemas/ComputeState"
|
||||||
|
500:
|
||||||
|
description: Merge request failed.
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: "#/components/schemas/GenericError"
|
||||||
|
|
||||||
|
/set_mergeable:
|
||||||
|
post:
|
||||||
|
tags:
|
||||||
|
- Set mergeable
|
||||||
|
summary: Mark branch as mergeable.
|
||||||
|
description: |
|
||||||
|
This is a blocking API endpoint, i.e. it blocks waiting until
|
||||||
|
compute is finished configuration and is in `Running` state.
|
||||||
|
Optional non-blocking mode could be added later.
|
||||||
|
operationId: markBranchMergeable
|
||||||
|
responses:
|
||||||
|
200:
|
||||||
|
description: Branch marked as mergeable
|
||||||
|
content:
|
||||||
|
application/json:OK
|
||||||
|
schema:
|
||||||
|
$ref: "#/components/schemas/ComputeState"
|
||||||
|
500:
|
||||||
|
description: Set mergeable request failed.
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: "#/components/schemas/GenericError"
|
||||||
|
|
||||||
/configure:
|
/configure:
|
||||||
post:
|
post:
|
||||||
tags:
|
tags:
|
||||||
|
|||||||
@@ -407,6 +407,58 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> {
|
||||||
|
info!("Merge branch into {}", dst_connstr);
|
||||||
|
let existing_dbs = get_existing_dbs(client)?;
|
||||||
|
|
||||||
|
for (_, db) in existing_dbs {
|
||||||
|
if db.name.starts_with("template") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let mut dst_conf = Config::from_str(dst_connstr)?;
|
||||||
|
dst_conf.dbname(&db.name);
|
||||||
|
|
||||||
|
let mut src_conf = Config::from_str(src_connstr)?;
|
||||||
|
src_conf.dbname(&db.name);
|
||||||
|
|
||||||
|
let mut sub_client = dst_conf.connect(NoTls)?;
|
||||||
|
let mut connstr_parts: Vec<&str> = src_connstr.split('/').collect();
|
||||||
|
connstr_parts.pop();
|
||||||
|
connstr_parts.push(&db.name);
|
||||||
|
let connstr = connstr_parts.join("/");
|
||||||
|
let create_sub = format!("create subscription sub_merge connection '{}' publication pub_merge with (create_slot=false, slot_name=merge_slot_{}, copy_data=false)", connstr, &db.name);
|
||||||
|
sub_client.simple_query(&create_sub)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
|
pub fn handle_set_mergeable(client: &mut Client, connstr: &str) -> Result<()> {
|
||||||
|
info!("Mark branch as mergeable");
|
||||||
|
let existing_dbs = get_existing_dbs(client)?;
|
||||||
|
|
||||||
|
for (_, db) in existing_dbs {
|
||||||
|
if db.name.starts_with("template") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let mut conf = Config::from_str(connstr)?;
|
||||||
|
conf.dbname(&db.name);
|
||||||
|
|
||||||
|
let mut db_client = conf.connect(NoTls)?;
|
||||||
|
|
||||||
|
let create_slot = format!(
|
||||||
|
"select pg_create_logical_replication_slot('merge_slot_{}', 'pgoutput')",
|
||||||
|
&db.name
|
||||||
|
);
|
||||||
|
db_client.simple_query(&create_slot)?;
|
||||||
|
db_client.simple_query("create publication pub_merge for all tables")?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// It follows mostly the same logic as `handle_roles()` excepting that we
|
/// It follows mostly the same logic as `handle_roles()` excepting that we
|
||||||
/// does not use an explicit transactions block, since major database operations
|
/// does not use an explicit transactions block, since major database operations
|
||||||
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
|
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
|
||||||
|
|||||||
@@ -587,6 +587,31 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
|||||||
timeline_info.timeline_id
|
timeline_info.timeline_id
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
Some(("merge", branch_match)) => {
|
||||||
|
let src_endpoint_id = branch_match
|
||||||
|
.get_one::<String>("src-endpoint")
|
||||||
|
.map(|s| s.as_str())
|
||||||
|
.ok_or(anyhow!("No source endpoint provided"))?;
|
||||||
|
let dst_endpoint_id = branch_match
|
||||||
|
.get_one::<String>("dst-endpoint")
|
||||||
|
.map(|s| s.as_str())
|
||||||
|
.ok_or(anyhow!("No destination endpoint provided"))?;
|
||||||
|
|
||||||
|
let cplane = ComputeControlPlane::load(env.clone())?;
|
||||||
|
let src_endpoint = cplane.endpoints.get(src_endpoint_id).unwrap();
|
||||||
|
let dst_endpoint = cplane.endpoints.get(dst_endpoint_id).unwrap();
|
||||||
|
dst_endpoint.merge_from(src_endpoint)?;
|
||||||
|
}
|
||||||
|
Some(("set_mergeable", branch_match)) => {
|
||||||
|
let endpoint_id = branch_match
|
||||||
|
.get_one::<String>("endpoint")
|
||||||
|
.map(|s| s.as_str())
|
||||||
|
.ok_or(anyhow!("No endpoint provided"))?;
|
||||||
|
|
||||||
|
let cplane = ComputeControlPlane::load(env.clone())?;
|
||||||
|
let endpoint = cplane.endpoints.get(endpoint_id).unwrap();
|
||||||
|
endpoint.set_mergeable()?;
|
||||||
|
}
|
||||||
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
|
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
|
||||||
None => bail!("no tenant subcommand provided"),
|
None => bail!("no tenant subcommand provided"),
|
||||||
}
|
}
|
||||||
@@ -1305,6 +1330,15 @@ fn cli() -> Command {
|
|||||||
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
|
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
|
||||||
.arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
|
.arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
|
||||||
.help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
|
.help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
|
||||||
|
.subcommand(Command::new("merge")
|
||||||
|
.about("Merge changes from one branch into another")
|
||||||
|
.arg(Arg::new("src-endpoint").long("src-endpoint").help("Source endpoint for merge").required(true))
|
||||||
|
.arg(Arg::new("dst-endpoint").long("dst-endpoint").help("Destination endpoint for merge").required(true))
|
||||||
|
)
|
||||||
|
.subcommand(Command::new("set_mergeable")
|
||||||
|
.about("Mark branch as mergeable")
|
||||||
|
.arg(Arg::new("endpoint").long("endpoint").help("Enpoint to be marked as mergeable").required(true))
|
||||||
|
)
|
||||||
.subcommand(Command::new("create")
|
.subcommand(Command::new("create")
|
||||||
.about("Create a new blank timeline")
|
.about("Create a new blank timeline")
|
||||||
.arg(tenant_id_arg.clone())
|
.arg(tenant_id_arg.clone())
|
||||||
|
|||||||
@@ -572,7 +572,11 @@ impl Endpoint {
|
|||||||
}
|
}
|
||||||
ComputeStatus::Empty
|
ComputeStatus::Empty
|
||||||
| ComputeStatus::ConfigurationPending
|
| ComputeStatus::ConfigurationPending
|
||||||
| ComputeStatus::Configuration => {
|
| ComputeStatus::Configuration
|
||||||
|
| ComputeStatus::MergePending
|
||||||
|
| ComputeStatus::Merging
|
||||||
|
| ComputeStatus::SetMergeablePending
|
||||||
|
| ComputeStatus::SetMergeable => {
|
||||||
bail!("unexpected compute status: {:?}", state.status)
|
bail!("unexpected compute status: {:?}", state.status)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -674,6 +678,51 @@ impl Endpoint {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn merge_from(&self, merge_from: &Arc<Endpoint>) -> Result<()> {
|
||||||
|
let client = reqwest::blocking::Client::new();
|
||||||
|
let response = client
|
||||||
|
.post(format!(
|
||||||
|
"http://{}:{}/merge",
|
||||||
|
self.http_address.ip(),
|
||||||
|
self.http_address.port()
|
||||||
|
))
|
||||||
|
.body(merge_from.connstr())
|
||||||
|
.send()?;
|
||||||
|
let status = response.status();
|
||||||
|
if !(status.is_client_error() || status.is_server_error()) {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
let url = response.url().to_owned();
|
||||||
|
let msg = match response.text() {
|
||||||
|
Ok(err_body) => format!("Error: {}", err_body),
|
||||||
|
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||||
|
};
|
||||||
|
Err(anyhow::anyhow!(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_mergeable(&self) -> Result<()> {
|
||||||
|
let client = reqwest::blocking::Client::new();
|
||||||
|
let response = client
|
||||||
|
.post(format!(
|
||||||
|
"http://{}:{}/set_mergeable",
|
||||||
|
self.http_address.ip(),
|
||||||
|
self.http_address.port()
|
||||||
|
))
|
||||||
|
.send()?;
|
||||||
|
let status = response.status();
|
||||||
|
if !(status.is_client_error() || status.is_server_error()) {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
let url = response.url().to_owned();
|
||||||
|
let msg = match response.text() {
|
||||||
|
Ok(err_body) => format!("Error: {}", err_body),
|
||||||
|
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
|
||||||
|
};
|
||||||
|
Err(anyhow::anyhow!(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn stop(&self, destroy: bool) -> Result<()> {
|
pub fn stop(&self, destroy: bool) -> Result<()> {
|
||||||
// If we are going to destroy data directory,
|
// If we are going to destroy data directory,
|
||||||
// use immediate shutdown mode, otherwise,
|
// use immediate shutdown mode, otherwise,
|
||||||
|
|||||||
@@ -48,6 +48,14 @@ pub enum ComputeStatus {
|
|||||||
Running,
|
Running,
|
||||||
// New spec is being applied.
|
// New spec is being applied.
|
||||||
Configuration,
|
Configuration,
|
||||||
|
// Merge requested
|
||||||
|
MergePending,
|
||||||
|
// Set mergeable requested
|
||||||
|
SetMergeablePending,
|
||||||
|
// Merge in progress
|
||||||
|
Merging,
|
||||||
|
// Set mergeable in progress
|
||||||
|
SetMergeable,
|
||||||
// Either startup or configuration failed,
|
// Either startup or configuration failed,
|
||||||
// compute will exit soon or is waiting for
|
// compute will exit soon or is waiting for
|
||||||
// control-plane to terminate it.
|
// control-plane to terminate it.
|
||||||
|
|||||||
@@ -1154,6 +1154,42 @@ class NeonCli(AbstractNeonCli):
|
|||||||
kwargs["local_binpath"] = True
|
kwargs["local_binpath"] = True
|
||||||
return super().raw_cli(*args, **kwargs)
|
return super().raw_cli(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
def merge(
|
||||||
|
self,
|
||||||
|
src_endpoint: Endpoint,
|
||||||
|
dst_endpoint: Endpoint):
|
||||||
|
"""
|
||||||
|
Merge two branches
|
||||||
|
"""
|
||||||
|
|
||||||
|
args = [
|
||||||
|
"timeline",
|
||||||
|
"merge",
|
||||||
|
"--src-endpoint",
|
||||||
|
str(src_endpoint.endpoint_id),
|
||||||
|
"--dst-endpoint",
|
||||||
|
str(dst_endpoint.endpoint_id)
|
||||||
|
]
|
||||||
|
res = self.raw_cli(args)
|
||||||
|
res.check_returncode()
|
||||||
|
|
||||||
|
def set_mergeable(
|
||||||
|
self,
|
||||||
|
endpoint: Endpoint):
|
||||||
|
"""
|
||||||
|
Merge two branches
|
||||||
|
"""
|
||||||
|
|
||||||
|
args = [
|
||||||
|
"timeline",
|
||||||
|
"set_mergeable",
|
||||||
|
"--endpoint",
|
||||||
|
str(endpoint.endpoint_id),
|
||||||
|
]
|
||||||
|
res = self.raw_cli(args)
|
||||||
|
res.check_returncode()
|
||||||
|
|
||||||
def create_tenant(
|
def create_tenant(
|
||||||
self,
|
self,
|
||||||
tenant_id: Optional[TenantId] = None,
|
tenant_id: Optional[TenantId] = None,
|
||||||
|
|||||||
43
test_runner/regress/test_merge.py
Normal file
43
test_runner/regress/test_merge.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import time
|
||||||
|
from fixtures.log_helper import log
|
||||||
|
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||||
|
from fixtures.types import TimelineId
|
||||||
|
from fixtures.utils import query_scalar
|
||||||
|
|
||||||
|
#
|
||||||
|
# Merge ancestor branch with the main branch.
|
||||||
|
#
|
||||||
|
def test_merge(neon_env_builder: NeonEnvBuilder):
|
||||||
|
env = neon_env_builder.init_start()
|
||||||
|
pageserver_http = env.pageserver.http_client()
|
||||||
|
|
||||||
|
# Override defaults: 4M checkpoint_distance, disable background compaction and gc.
|
||||||
|
tenant, _ = env.neon_cli.create_tenant()
|
||||||
|
|
||||||
|
main_branch = env.endpoints.create_start("main", tenant_id=tenant)
|
||||||
|
main_cur = main_branch.connect().cursor()
|
||||||
|
|
||||||
|
# Create table and insert some data
|
||||||
|
main_cur.execute("CREATE TABLE t(x bigint primary key)")
|
||||||
|
main_cur.execute("INSERT INTO t values(generate_series(1, 10000))");
|
||||||
|
|
||||||
|
# Create branch ws.
|
||||||
|
env.neon_cli.create_branch("ws", "main", tenant_id=tenant)
|
||||||
|
ws_branch = env.endpoints.create_start("ws", tenant_id=tenant)
|
||||||
|
log.info("postgres is running on 'ws' branch")
|
||||||
|
|
||||||
|
# Merge brnach ws as mergeable:it create logical replication slots and pins WAL
|
||||||
|
env.neon_cli.set_mergeable(ws_branch)
|
||||||
|
|
||||||
|
# Insert more data in the branch
|
||||||
|
ws_cur = ws_branch.connect().cursor()
|
||||||
|
ws_cur.execute("INSERT INTO t values(generate_series(10001, 20000))")
|
||||||
|
|
||||||
|
# Merge ws brnach intp main
|
||||||
|
env.neon_cli.merge(ws_branch, main_branch)
|
||||||
|
|
||||||
|
# sleep for some time until changes are applied
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
# Check that changes are merged
|
||||||
|
assert query_scalar(main_cur, "SELECT count(*) from t") == 20000
|
||||||
Reference in New Issue
Block a user