Mirror of https://github.com/neondatabase/neon.git (synced 2026-02-20 11:00:38 +00:00)

Compare commits (6 commits): jcsp/relat... → merge_bran...
Commits:

- ba3480a568
- 71bd53c646
- 012f22c36d
- a1db731f4a
- a5d0f1a92e
- d19860abf7
@@ -1,3 +1,17 @@
# The binaries are really slow if you compile them in 'dev' mode with the defaults.
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
#
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
#
[profile.dev.package."*"]
# Set the default for dependencies in Development mode.
opt-level = 3

[profile.dev]
# Turn on a small amount of optimization in Development mode.
opt-level = 1

[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.
@@ -83,6 +83,7 @@ pub struct ComputeState {
    pub last_active: Option<DateTime<Utc>>,
    pub error: Option<String>,
    pub pspec: Option<ParsedSpec>,
    pub merge_src_connstr: Option<String>,
    pub metrics: ComputeMetrics,
}

@@ -94,6 +95,7 @@ impl ComputeState {
            last_active: None,
            error: None,
            pspec: None,
            merge_src_connstr: None,
            metrics: ComputeMetrics::default(),
        }
    }
@@ -627,6 +629,22 @@ impl ComputeNode {
        Ok(())
    }

    /// Merge two branches
    #[instrument(skip_all)]
    pub fn merge(&self, src_connstr: &str) -> Result<()> {
        let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
        handle_merge(&mut client, self.connstr.as_str(), &src_connstr)?;
        Ok(())
    }

    /// Mark branch as mergeable
    #[instrument(skip_all)]
    pub fn set_mergeable(&self) -> Result<()> {
        let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
        handle_set_mergeable(&mut client, self.connstr.as_str())?;
        Ok(())
    }

    /// Start Postgres as a child process and manage DBs/roles.
    /// After that this will hang waiting on the postmaster process to exit.
    #[instrument(skip_all)]
@@ -31,6 +31,35 @@ fn configurator_main_loop(compute: &Arc<ComputeNode>) {
            // XXX: used to test that API is blocking
            // std::thread::sleep(std::time::Duration::from_millis(10000));

            compute.set_status(new_status);
        } else if state.status == ComputeStatus::MergePending {
            info!("got merge request");
            state.status = ComputeStatus::Merging;
            compute.state_changed.notify_all();
            let connstr = state.merge_src_connstr.clone().unwrap();
            drop(state);

            let mut new_status = ComputeStatus::Failed;
            if let Err(e) = compute.merge(&connstr) {
                info!("could not merge compute node: {}", e);
            } else {
                new_status = ComputeStatus::Running;
                info!("merge complete");
            }
            compute.set_status(new_status);
        } else if state.status == ComputeStatus::SetMergeablePending {
            info!("got set mergeable request");
            state.status = ComputeStatus::SetMergeable;
            compute.state_changed.notify_all();
            drop(state);

            let mut new_status = ComputeStatus::Failed;
            if let Err(e) = compute.set_mergeable() {
                info!("could not mark branch as mergeable: {}", e);
            } else {
                new_status = ComputeStatus::Running;
                info!("marked as mergeable");
            }
            compute.set_status(new_status);
        } else if state.status == ComputeStatus::Failed {
            info!("compute node is now in Failed state, exiting");
@@ -123,6 +123,30 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
            }
        }

        // Handle branch merge request
        (&Method::POST, "/merge") => {
            info!("serving /merge POST request");
            match handle_merge_request(req, compute).await {
                Ok(msg) => Response::new(Body::from(msg)),
                Err((msg, code)) => {
                    error!("error handling /merge request: {msg}");
                    render_json_error(&msg, code)
                }
            }
        }

        // Handle branch set mergeable request
        (&Method::POST, "/set_mergeable") => {
            info!("serving /set_mergeable POST request");
            match handle_set_mergeable_request(compute).await {
                Ok(msg) => Response::new(Body::from(msg)),
                Err((msg, code)) => {
                    error!("error handling /set_mergeable request: {msg}");
                    render_json_error(&msg, code)
                }
            }
        }

        // download extension files from S3 on demand
        (&Method::POST, route) if route.starts_with("/extension_server/") => {
            info!("serving {:?} POST request", route);
@@ -209,6 +233,103 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
    }
}

async fn handle_merge_request(
    req: Request<Body>,
    compute: &Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
    let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
    let connstr = String::from_utf8(body_bytes.to_vec()).unwrap();

    let c = compute.clone();

    {
        let mut state = compute.state.lock().unwrap();
        if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
            let msg = format!(
                "invalid compute status for merge request: {:?}",
                state.status.clone()
            );
            return Err((msg, StatusCode::PRECONDITION_FAILED));
        }
        state.merge_src_connstr = Some(connstr);
        state.status = ComputeStatus::MergePending;
        compute.state_changed.notify_all();
        drop(state);
        info!("set new spec and notified waiters");
    }

    task::spawn_blocking(move || {
        let mut state = c.state.lock().unwrap();
        while state.status != ComputeStatus::Running {
            state = c.state_changed.wait(state).unwrap();
            info!(
                "waiting for compute to become Running, current status: {:?}",
                state.status
            );

            if state.status == ComputeStatus::Failed {
                let err = state.error.as_ref().map_or("unknown error", |x| x);
                let msg = format!("compute configuration failed: {:?}", err);
                return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
            }
        }

        Ok(())
    })
    .await
    .unwrap()?;

    let state = compute.state.lock().unwrap().clone();
    let status_response = status_response_from_state(&state);
    Ok(serde_json::to_string(&status_response).unwrap())
}

async fn handle_set_mergeable_request(
    compute: &Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
    let c = compute.clone();

    {
        let mut state = compute.state.lock().unwrap();
        if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
            let msg = format!(
                "invalid compute status for merge request: {:?}",
                state.status.clone()
            );
            return Err((msg, StatusCode::PRECONDITION_FAILED));
        }
        state.status = ComputeStatus::SetMergeablePending;
        compute.state_changed.notify_all();
        drop(state);
        info!("set new spec and notified waiters");
    }

    task::spawn_blocking(move || {
        let mut state = c.state.lock().unwrap();
        while state.status != ComputeStatus::Running {
            state = c.state_changed.wait(state).unwrap();
            info!(
                "waiting for compute to become Running, current status: {:?}",
                state.status
            );

            if state.status == ComputeStatus::Failed {
                let err = state.error.as_ref().map_or("unknown error", |x| x);
                let msg = format!("compute configuration failed: {:?}", err);
                return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
            }
        }

        Ok(())
    })
    .await
    .unwrap()?;

    let state = compute.state.lock().unwrap().clone();
    let status_response = status_response_from_state(&state);
    Ok(serde_json::to_string(&status_response).unwrap())
}

async fn handle_configure_request(
    req: Request<Body>,
    compute: &Arc<ComputeNode>,
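For reference, a minimal sketch of how a client would drive the two new endpoints over HTTP. The hosts, ports, and connection string below are placeholders, not values from the patch; `/set_mergeable` is posted to the source branch's compute, `/merge` to the destination's, with the body being the source connection string as plain text. Both calls block until the compute is back in `Running`:

```python
import requests

SRC_COMPUTE_HTTP = "http://src-compute:3080"   # hypothetical compute HTTP addresses
DST_COMPUTE_HTTP = "http://dst-compute:3080"

# Mark the source branch as mergeable (creates replication slots + publication).
requests.post(f"{SRC_COMPUTE_HTTP}/set_mergeable").raise_for_status()

# Ask the destination compute to merge from the source branch.
src_connstr = "postgresql://cloud_admin@src-compute:55432/postgres"  # placeholder
resp = requests.post(f"{DST_COMPUTE_HTTP}/merge", data=src_connstr)
resp.raise_for_status()
print(resp.json())  # serialized ComputeState status response
```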
@@ -85,6 +85,61 @@ paths:
          description: Error text or 'true' if check passed.
          example: "true"

  /merge:
    post:
      tags:
      - Merge
      summary: Merge branches.
      description: |
        This is a blocking API endpoint, i.e. it blocks waiting until
        the compute has finished configuration and is in `Running` state.
        Optional non-blocking mode could be added later.
      operationId: mergeBranches
      requestBody:
        description: connection string of the source branch to merge from
        required: true
        content:
          text/plain:
            schema:
              type: string
      responses:
        200:
          description: Merge finished.
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ComputeState"
        500:
          description: Merge request failed.
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/GenericError"

  /set_mergeable:
    post:
      tags:
      - Set mergeable
      summary: Mark branch as mergeable.
      description: |
        This is a blocking API endpoint, i.e. it blocks waiting until
        the compute has finished configuration and is in `Running` state.
        Optional non-blocking mode could be added later.
      operationId: markBranchMergeable
      responses:
        200:
          description: Branch marked as mergeable
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/ComputeState"
        500:
          description: Set mergeable request failed.
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/GenericError"

  /configure:
    post:
      tags:
@@ -407,6 +407,58 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
    Ok(())
}

#[instrument(skip_all)]
pub fn handle_merge(client: &mut Client, dst_connstr: &str, src_connstr: &str) -> Result<()> {
    info!("Merge branch into {}", dst_connstr);
    let existing_dbs = get_existing_dbs(client)?;

    for (_, db) in existing_dbs {
        if db.name.starts_with("template") {
            continue;
        }
        let mut dst_conf = Config::from_str(dst_connstr)?;
        dst_conf.dbname(&db.name);

        let mut src_conf = Config::from_str(src_connstr)?;
        src_conf.dbname(&db.name);

        let mut sub_client = dst_conf.connect(NoTls)?;
        let mut connstr_parts: Vec<&str> = src_connstr.split('/').collect();
        connstr_parts.pop();
        connstr_parts.push(&db.name);
        let connstr = connstr_parts.join("/");
        let create_sub = format!("create subscription sub_merge connection '{}' publication pub_merge with (create_slot=false, slot_name=merge_slot_{}, copy_data=false)", connstr, &db.name);
        sub_client.simple_query(&create_sub)?;
    }

    Ok(())
}

#[instrument(skip_all)]
pub fn handle_set_mergeable(client: &mut Client, connstr: &str) -> Result<()> {
    info!("Mark branch as mergeable");
    let existing_dbs = get_existing_dbs(client)?;

    for (_, db) in existing_dbs {
        if db.name.starts_with("template") {
            continue;
        }
        let mut conf = Config::from_str(connstr)?;
        conf.dbname(&db.name);

        let mut db_client = conf.connect(NoTls)?;

        let create_slot = format!(
            "select pg_create_logical_replication_slot('merge_slot_{}', 'pgoutput')",
            &db.name
        );
        db_client.simple_query(&create_slot)?;
        db_client.simple_query("create publication pub_merge for all tables")?;
    }

    Ok(())
}

/// It follows mostly the same logic as `handle_roles()`, except that we
/// do not use an explicit transaction block, since major database operations
/// like `CREATE DATABASE` and `DROP DATABASE` do not support it. Statement-level
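To make the division of labour between the two handlers explicit, here is a sketch of the per-database SQL they end up issuing. The database name `mydb` and the rewritten source connection string are placeholders, not values from the patch:

```python
# SQL issued by handle_set_mergeable on the branch being marked mergeable
# (the source side): pin WAL with a logical slot and publish all tables.
set_mergeable_statements = [
    "select pg_create_logical_replication_slot('merge_slot_mydb', 'pgoutput')",
    "create publication pub_merge for all tables",
]

# SQL issued by handle_merge on the destination side, per database: subscribe
# to the source branch's publication, reusing the pre-created slot and with
# copy_data=false (no initial table copy).
merge_statement = (
    "create subscription sub_merge "
    "connection 'postgresql://cloud_admin@src-host:55432/mydb' "  # placeholder connstr
    "publication pub_merge "
    "with (create_slot=false, slot_name=merge_slot_mydb, copy_data=false)"
)
```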
@@ -587,6 +587,31 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
                timeline_info.timeline_id
            );
        }
        Some(("merge", branch_match)) => {
            let src_endpoint_id = branch_match
                .get_one::<String>("src-endpoint")
                .map(|s| s.as_str())
                .ok_or(anyhow!("No source endpoint provided"))?;
            let dst_endpoint_id = branch_match
                .get_one::<String>("dst-endpoint")
                .map(|s| s.as_str())
                .ok_or(anyhow!("No destination endpoint provided"))?;

            let cplane = ComputeControlPlane::load(env.clone())?;
            let src_endpoint = cplane.endpoints.get(src_endpoint_id).unwrap();
            let dst_endpoint = cplane.endpoints.get(dst_endpoint_id).unwrap();
            dst_endpoint.merge_from(src_endpoint)?;
        }
        Some(("set_mergeable", branch_match)) => {
            let endpoint_id = branch_match
                .get_one::<String>("endpoint")
                .map(|s| s.as_str())
                .ok_or(anyhow!("No endpoint provided"))?;

            let cplane = ComputeControlPlane::load(env.clone())?;
            let endpoint = cplane.endpoints.get(endpoint_id).unwrap();
            endpoint.set_mergeable()?;
        }
        Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{sub_name}'"),
        None => bail!("no tenant subcommand provided"),
    }
@@ -1305,6 +1330,15 @@ fn cli() -> Command {
            .help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))
        .arg(Arg::new("ancestor-start-lsn").long("ancestor-start-lsn")
            .help("When using another timeline as base, use a specific Lsn in it instead of the latest one").required(false)))
    .subcommand(Command::new("merge")
        .about("Merge changes from one branch into another")
        .arg(Arg::new("src-endpoint").long("src-endpoint").help("Source endpoint for merge").required(true))
        .arg(Arg::new("dst-endpoint").long("dst-endpoint").help("Destination endpoint for merge").required(true))
    )
    .subcommand(Command::new("set_mergeable")
        .about("Mark branch as mergeable")
        .arg(Arg::new("endpoint").long("endpoint").help("Endpoint to be marked as mergeable").required(true))
    )
    .subcommand(Command::new("create")
        .about("Create a new blank timeline")
        .arg(tenant_id_arg.clone())
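A sketch of driving the two new subcommands end to end from a script, assuming the CLI binary is `neon_local` and that endpoints named `ws` and `main` exist (all names here are placeholders); the Python fixture further below wraps the same invocations via `raw_cli()`:

```python
import subprocess

# 1. Mark the branch to merge from ('ws') as mergeable: this creates the
#    per-database replication slots and the pub_merge publication.
subprocess.run(
    ["neon_local", "timeline", "set_mergeable", "--endpoint", "ws"],
    check=True,
)

# 2. Merge 'ws' into 'main': the destination endpoint subscribes to pub_merge.
subprocess.run(
    ["neon_local", "timeline", "merge", "--src-endpoint", "ws", "--dst-endpoint", "main"],
    check=True,
)
```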
@@ -572,7 +572,11 @@ impl Endpoint {
            }
            ComputeStatus::Empty
            | ComputeStatus::ConfigurationPending
            | ComputeStatus::Configuration => {
            | ComputeStatus::Configuration
            | ComputeStatus::MergePending
            | ComputeStatus::Merging
            | ComputeStatus::SetMergeablePending
            | ComputeStatus::SetMergeable => {
                bail!("unexpected compute status: {:?}", state.status)
            }
        }
@@ -674,6 +678,51 @@ impl Endpoint {
        }
    }

    pub fn merge_from(&self, merge_from: &Arc<Endpoint>) -> Result<()> {
        let client = reqwest::blocking::Client::new();
        let response = client
            .post(format!(
                "http://{}:{}/merge",
                self.http_address.ip(),
                self.http_address.port()
            ))
            .body(merge_from.connstr())
            .send()?;
        let status = response.status();
        if !(status.is_client_error() || status.is_server_error()) {
            Ok(())
        } else {
            let url = response.url().to_owned();
            let msg = match response.text() {
                Ok(err_body) => format!("Error: {}", err_body),
                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
            };
            Err(anyhow::anyhow!(msg))
        }
    }

    pub fn set_mergeable(&self) -> Result<()> {
        let client = reqwest::blocking::Client::new();
        let response = client
            .post(format!(
                "http://{}:{}/set_mergeable",
                self.http_address.ip(),
                self.http_address.port()
            ))
            .send()?;
        let status = response.status();
        if !(status.is_client_error() || status.is_server_error()) {
            Ok(())
        } else {
            let url = response.url().to_owned();
            let msg = match response.text() {
                Ok(err_body) => format!("Error: {}", err_body),
                Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
            };
            Err(anyhow::anyhow!(msg))
        }
    }

    pub fn stop(&self, destroy: bool) -> Result<()> {
        // If we are going to destroy data directory,
        // use immediate shutdown mode, otherwise,
@@ -48,6 +48,14 @@ pub enum ComputeStatus {
    Running,
    // New spec is being applied.
    Configuration,
    // Merge requested
    MergePending,
    // Set mergeable requested
    SetMergeablePending,
    // Merge in progress
    Merging,
    // Set mergeable in progress
    SetMergeable,
    // Either startup or configuration failed,
    // compute will exit soon or is waiting for
    // control-plane to terminate it.
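Putting the new variants together with the HTTP handlers and the configurator loop shown earlier, the intended lifecycle looks roughly like this (a descriptive sketch, not code from the patch; both requests are only accepted while the compute is `Empty` or `Running`):

```python
# (current status, next status) pairs driven by the merge feature.
MERGE_TRANSITIONS = [
    ("Running", "MergePending"),             # POST /merge accepted
    ("MergePending", "Merging"),             # configurator picks up the request
    ("Merging", "Running"),                  # merge succeeded
    ("Merging", "Failed"),                   # merge failed
]

SET_MERGEABLE_TRANSITIONS = [
    ("Running", "SetMergeablePending"),      # POST /set_mergeable accepted
    ("SetMergeablePending", "SetMergeable"), # configurator picks up the request
    ("SetMergeable", "Running"),             # slots and publication created
    ("SetMergeable", "Failed"),              # marking the branch failed
]
```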
@@ -1,7 +1,4 @@
use crate::{
    pgdatadir_mapping::{BASEBACKUP_CUT, METADATA_CUT},
    repository::{key_range_size, singleton_range, Key},
};
use crate::repository::{key_range_size, singleton_range, Key};
use postgres_ffi::BLCKSZ;
use std::ops::Range;

@@ -25,22 +22,13 @@ impl KeySpace {
        let target_nblocks = (target_size / BLCKSZ as u64) as usize;

        let mut parts = Vec::new();
        let mut current_part: Vec<Range<Key>> = Vec::new();
        let mut current_part = Vec::new();
        let mut current_part_size: usize = 0;
        for range in &self.ranges {
            let last = current_part
                .last()
                .map(|r| r.end)
                .unwrap_or(Key::from_i128(0));
            let cut_here = (range.start >= METADATA_CUT && last < METADATA_CUT)
                || (range.start >= BASEBACKUP_CUT && last < BASEBACKUP_CUT);

            // If appending the next contiguous range in the keyspace to the current
            // partition would cause it to be too large, start a new partition.
            let this_size = key_range_size(range) as usize;
            if cut_here
                || current_part_size + this_size > target_nblocks && !current_part.is_empty()
            {
            if current_part_size + this_size > target_nblocks && !current_part.is_empty() {
                parts.push(KeySpace {
                    ranges: current_part,
                });
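For readers less familiar with this code, the comment above describes a simple greedy packing loop; a minimal Python sketch of that logic (without the `cut_here` special case this hunk removes) looks like:

```python
def partition(ranges, range_sizes, target_nblocks):
    """Greedily pack contiguous key ranges into partitions of at most
    target_nblocks blocks, starting a new partition when the next range
    would push the current one past the target."""
    parts, current, current_size = [], [], 0
    for rng, size in zip(ranges, range_sizes):
        if current and current_size + size > target_nblocks:
            parts.append(current)
            current, current_size = [], 0
        current.append(rng)
        current_size += size
    if current:
        parts.append(current)
    return parts
```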
@@ -40,6 +40,9 @@ pub enum StorageTimeOperation {
    #[strum(serialize = "logical size")]
    LogicalSize,

    #[strum(serialize = "imitate logical size")]
    ImitateLogicalSize,

    #[strum(serialize = "load layer map")]
    LoadLayerMap,

@@ -1361,6 +1364,7 @@ pub struct TimelineMetrics {
    pub compact_time_histo: StorageTimeMetrics,
    pub create_images_time_histo: StorageTimeMetrics,
    pub logical_size_histo: StorageTimeMetrics,
    pub imitate_logical_size_histo: StorageTimeMetrics,
    pub load_layer_map_histo: StorageTimeMetrics,
    pub garbage_collect_histo: StorageTimeMetrics,
    pub last_record_gauge: IntGauge,
@@ -1389,6 +1393,11 @@ impl TimelineMetrics {
            StorageTimeMetrics::new(StorageTimeOperation::CreateImages, &tenant_id, &timeline_id);
        let logical_size_histo =
            StorageTimeMetrics::new(StorageTimeOperation::LogicalSize, &tenant_id, &timeline_id);
        let imitate_logical_size_histo = StorageTimeMetrics::new(
            StorageTimeOperation::ImitateLogicalSize,
            &tenant_id,
            &timeline_id,
        );
        let load_layer_map_histo =
            StorageTimeMetrics::new(StorageTimeOperation::LoadLayerMap, &tenant_id, &timeline_id);
        let garbage_collect_histo =
@@ -1421,6 +1430,7 @@ impl TimelineMetrics {
            compact_time_histo,
            create_images_time_histo,
            logical_size_histo,
            imitate_logical_size_histo,
            garbage_collect_histo,
            load_layer_map_histo,
            last_record_gauge,
@@ -662,21 +662,20 @@ impl Timeline {
        ctx: &RequestContext,
    ) -> Result<KeySpace, CollectKeySpaceError> {
        // Iterate through key ranges, greedily packing them into partitions
        // This function is responsible for appending keys in order, using implicit
        // knowledge of how keys are defined.
        let mut result = KeySpaceAccum::new();

        // The dbdir metadata always exists
        result.add_key(DBDIR_KEY);

        // Fetch list of database dirs and iterate them
        let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
        let dbdir = DbDirectory::des(&buf)?;

        let mut metadata_keys = Vec::new();

        let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
        dbs.sort_unstable();
        for (spcnode, dbnode) in dbs {
            metadata_keys.push(relmap_file_key(spcnode, dbnode));
            metadata_keys.push(rel_dir_to_key(spcnode, dbnode));
            result.add_key(relmap_file_key(spcnode, dbnode));
            result.add_key(rel_dir_to_key(spcnode, dbnode));

            let mut rels: Vec<RelTag> = self
                .list_rels(spcnode, dbnode, lsn, ctx)
@@ -690,7 +689,7 @@ impl Timeline {
                let relsize = buf.get_u32_le();

                result.add_range(rel_block_to_key(rel, 0)..rel_block_to_key(rel, relsize));
                metadata_keys.push(relsize_key);
                result.add_key(relsize_key);
            }
        }

@@ -733,13 +732,6 @@ impl Timeline {
        if self.get(AUX_FILES_KEY, lsn, ctx).await.is_ok() {
            result.add_key(AUX_FILES_KEY);
        }

        // The dbdir metadata always exists
        result.add_key(DBDIR_KEY);
        for key in metadata_keys {
            result.add_key(key);
        }

        Ok(result.to_keyspace())
    }
@@ -1482,11 +1474,21 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
//
// Below is a full list of the keyspace allocation:
//

// DbDir:
// 00 00000000 00000000 00000000 00 00000000
//
// Filenodemap:
// 00 SPCNODE DBNODE 00000000 00 00000000
//
// RelDir:
// 00 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
//
// RelBlock:
// 00 SPCNODE DBNODE RELNODE FORK BLKNUM
//
// RelSize:
// 00 SPCNODE DBNODE RELNODE FORK FFFFFFFF
//
// SlruDir:
// 01 kind 00000000 00000000 00 00000000
//
@@ -1511,31 +1513,11 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// AuxFiles:
// 03 00000000 00000000 00000000 00 00000002
//
// DbDir:
// 04 00000000 00000000 00000000 00 00000000
//
// Filenodemap:
// 04 SPCNODE DBNODE 00000000 00 00000000
//
// RelDir:
// 04 SPCNODE DBNODE 00000000 00 00000001 (Postgres never uses relfilenode 0)
//
// RelSize:
// 04 SPCNODE DBNODE RELNODE FORK FFFFFFFF

//-- Section 01: relation data and metadata

/// Keys above this Key are required to serve a basebackup request
pub(crate) const BASEBACKUP_CUT: Key = slru_dir_to_key(SlruKind::Clog);

/// Keys above this Key are needed to make a logical size calculation
///
/// Ensuring that such keys are stored above the main range of user relation
/// blocks enables much more efficient space management.
pub(crate) const METADATA_CUT: Key = CONTROLFILE_KEY;

const DBDIR_KEY: Key = Key {
    field1: 0x04,
    field1: 0x00,
    field2: 0,
    field3: 0,
    field4: 0,
@@ -1545,14 +1527,14 @@ const DBDIR_KEY: Key = Key {

fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
    Key {
        field1: 0x04,
        field1: 0x00,
        field2: spcnode,
        field3: dbnode,
        field4: 0,
        field5: 0,
        field6: 0,
    }..Key {
        field1: 0x04,
        field1: 0x00,
        field2: spcnode,
        field3: dbnode,
        field4: 0xffffffff,
@@ -1563,7 +1545,7 @@ fn dbdir_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {

fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {
    Key {
        field1: 0x04,
        field1: 0x00,
        field2: spcnode,
        field3: dbnode,
        field4: 0,
@@ -1574,7 +1556,7 @@ fn relmap_file_key(spcnode: Oid, dbnode: Oid) -> Key {

fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
    Key {
        field1: 0x04,
        field1: 0x00,
        field2: spcnode,
        field3: dbnode,
        field4: 0,
@@ -1596,7 +1578,7 @@ fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {

fn rel_size_to_key(rel: RelTag) -> Key {
    Key {
        field1: 0x04,
        field1: 0x00,
        field2: rel.spcnode,
        field3: rel.dbnode,
        field4: rel.relnode,
@@ -1625,7 +1607,7 @@ fn rel_key_range(rel: RelTag) -> Range<Key> {

//-- Section 02: SLRUs

const fn slru_dir_to_key(kind: SlruKind) -> Key {
fn slru_dir_to_key(kind: SlruKind) -> Key {
    Key {
        field1: 0x01,
        field2: match kind {
@@ -61,6 +61,7 @@ use self::mgr::TenantsMap;
use self::remote_timeline_client::RemoteTimelineClient;
use self::timeline::uninit::TimelineUninitMark;
use self::timeline::uninit::UninitializedTimeline;
use self::timeline::EvictionTaskTenantState;
use self::timeline::TimelineResources;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
@@ -251,6 +252,8 @@ pub struct Tenant {
    cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
    cached_synthetic_tenant_size: Arc<AtomicU64>,

    eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,

    pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,

    // Cancellation token fires when we have entered shutdown(). This is a parent of
@@ -2364,6 +2367,7 @@ impl Tenant {
            state,
            cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
            cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
            eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
            delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
            cancel: CancellationToken::default(),
            gate: Gate::new(format!("Tenant<{tenant_id}>")),
@@ -5,7 +5,7 @@ use utils::{
    lsn::Lsn,
};

use crate::{pgdatadir_mapping::METADATA_CUT, repository::Key};
use crate::repository::Key;

use super::{DeltaFileName, ImageFileName, LayerFileName};

@@ -49,20 +49,6 @@ impl PersistentLayerDesc {
        }
    }

    /// Does this layer consist exclusively of metadata
    /// content such as dbdir & relation sizes? This is a
    /// hint that the layer is likely to be small and should
    /// not be a candidate for eviction under normal circumstances.
    pub fn is_metadata_pages(&self) -> bool {
        self.key_range.start >= METADATA_CUT
    }

    /// Does this layer consist exclusively of content
    /// required to serve a basebackup request?
    pub fn is_basebackup_pages(&self) -> bool {
        self.key_range.start >= METADATA_CUT
    }

    pub fn short_id(&self) -> impl Display {
        self.filename()
    }
@@ -81,6 +81,8 @@ use crate::task_mgr::TaskKind;
use crate::ZERO_PAGE;

use self::delete::DeleteTimelineFlow;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
@@ -296,6 +298,8 @@ pub struct Timeline {
    /// timeline is being deleted. If 'true', the timeline has already been deleted.
    pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,

    eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,

    /// Barrier to wait before doing initial logical size calculation. Used only during startup.
    initial_logical_size_can_start: Option<completion::Barrier>,

@@ -429,6 +433,7 @@ impl std::fmt::Display for PageReconstructError {
pub enum LogicalSizeCalculationCause {
    Initial,
    ConsumptionMetricsSyntheticSize,
    EvictionTaskImitation,
    TenantSizeHandler,
}

@@ -1437,6 +1442,9 @@ impl Timeline {

            state,

            eviction_task_timeline_state: tokio::sync::Mutex::new(
                EvictionTaskTimelineState::default(),
            ),
            delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTimelineFlow::default())),

            initial_logical_size_can_start,
@@ -1959,6 +1967,9 @@ impl Timeline {
            LogicalSizeCalculationCause::Initial
            | LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize
            | LogicalSizeCalculationCause::TenantSizeHandler => &self.metrics.logical_size_histo,
            LogicalSizeCalculationCause::EvictionTaskImitation => {
                &self.metrics.imitate_logical_size_histo
            }
        };
        let timer = storage_time_metrics.start_timer();
        let logical_size = self
@@ -2735,18 +2746,18 @@ impl Timeline {
        partition_size: u64,
        ctx: &RequestContext,
    ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
        // {
        //     let partitioning_guard = self.partitioning.lock().unwrap();
        //     let distance = lsn.0 - partitioning_guard.1 .0;
        //     if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
        //         debug!(
        //             distance,
        //             threshold = self.repartition_threshold,
        //             "no repartitioning needed"
        //         );
        //         return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
        //     }
        // }
        {
            let partitioning_guard = self.partitioning.lock().unwrap();
            let distance = lsn.0 - partitioning_guard.1 .0;
            if partitioning_guard.1 != Lsn(0) && distance <= self.repartition_threshold {
                debug!(
                    distance,
                    threshold = self.repartition_threshold,
                    "no repartitioning needed"
                );
                return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
            }
        }
        let keyspace = self.collect_keyspace(lsn, ctx).await?;
        let partitioning = keyspace.partition(partition_size);

@@ -4274,11 +4285,6 @@ impl Timeline {
            let file_size = l.file_size();
            max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));

            // Don't evict small layers required to serve a basebackup
            if l.is_basebackup_pages() {
                continue;
            }

            let l = guard.get_from_desc(&l);

            let l = match l.keep_resident().await {
@@ -14,6 +14,7 @@
//!
//! See write-up on restart on-demand download spike: <https://gist.github.com/problame/2265bf7b8dc398be834abfead36c76b5>
use std::{
    collections::HashMap,
    ops::ControlFlow,
    sync::Arc,
    time::{Duration, SystemTime},
@@ -21,15 +22,17 @@ use std::{

use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};

use crate::{
    context::{DownloadBehavior, RequestContext},
    pgdatadir_mapping::CollectKeySpaceError,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
    tenant::{
        config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
        tasks::{BackgroundLoopKind, RateLimitError},
        timeline::EvictionError,
        LogicalSizeCalculationCause, Tenant,
    },
};

@@ -37,6 +40,16 @@ use utils::completion;

use super::Timeline;

#[derive(Default)]
pub struct EvictionTaskTimelineState {
    last_layer_access_imitation: Option<tokio::time::Instant>,
}

#[derive(Default)]
pub struct EvictionTaskTenantState {
    last_layer_access_imitation: Option<Instant>,
}

impl Timeline {
    pub(super) fn launch_eviction_task(
        self: &Arc<Self>,
@@ -165,6 +178,7 @@ impl Timeline {
        //    that were accessed to compute the value in the first place.
        // 3. Invalidate the caches at a period of < p.threshold/2, so that the values
        //    get re-computed from layers, thereby counting towards layer access stats.
        // 4. Make the eviction task imitate the layer accesses that typically hit caches.
        //
        // We follow approach (4) here because in Neon prod deployment:
        // - page cache is quite small => high churn => low hit rate
@@ -176,6 +190,10 @@ impl Timeline {
        //
        // We should probably move to persistent caches in the future, or avoid
        // having inactive tenants attached to pageserver in the first place.
        match self.imitate_layer_accesses(p, cancel, ctx).await {
            ControlFlow::Break(()) => return ControlFlow::Break(()),
            ControlFlow::Continue(()) => (),
        }

    #[allow(dead_code)]
    #[derive(Debug, Default)]
@@ -197,11 +215,6 @@ impl Timeline {
        let layers = guard.layer_map();
        let mut candidates = Vec::new();
        for hist_layer in layers.iter_historic_layers() {
            // Don't evict the small layers needed to serve a basebackup request.
            if hist_layer.is_basebackup_pages() {
                continue;
            }

            let hist_layer = guard.get_from_desc(&hist_layer);

            // guard against eviction while we inspect it; it might be that eviction_task and
@@ -297,4 +310,170 @@ impl Timeline {
        }
        ControlFlow::Continue(())
    }

    #[instrument(skip_all)]
    async fn imitate_layer_accesses(
        &self,
        p: &EvictionPolicyLayerAccessThreshold,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> ControlFlow<()> {
        let mut state = self.eviction_task_timeline_state.lock().await;

        // Only do the imitate_layer accesses approximately as often as the threshold. A little
        // more frequently, to avoid this period racing with the threshold/period-th eviction iteration.
        let inter_imitate_period = p.threshold.checked_sub(p.period).unwrap_or(p.threshold);

        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_timeline_cached_layer_accesses(ctx).await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now())
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        // This task is timeline-scoped, but the synthetic size calculation is tenant-scoped.
        // Make one of the tenant's timelines draw the short straw and run the calculation.
        // The others wait until the calculation is done so that they take into account the
        // imitated accesses that the winner made.
        let tenant = match crate::tenant::mgr::get_tenant(self.tenant_id, true) {
            Ok(t) => t,
            Err(_) => {
                return ControlFlow::Break(());
            }
        };
        let mut state = tenant.eviction_task_tenant_state.lock().await;
        match state.last_layer_access_imitation {
            Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
            _ => {
                self.imitate_synthetic_size_calculation_worker(&tenant, ctx, cancel)
                    .await;
                state.last_layer_access_imitation = Some(tokio::time::Instant::now());
            }
        }
        drop(state);

        if cancel.is_cancelled() {
            return ControlFlow::Break(());
        }

        ControlFlow::Continue(())
    }

    /// Recompute the values which would cause on-demand downloads during restart.
    #[instrument(skip_all)]
    async fn imitate_timeline_cached_layer_accesses(&self, ctx: &RequestContext) {
        let lsn = self.get_last_record_lsn();

        // imitate on-restart initial logical size
        let size = self
            .calculate_logical_size(lsn, LogicalSizeCalculationCause::EvictionTaskImitation, ctx)
            .instrument(info_span!("calculate_logical_size"))
            .await;

        match &size {
            Ok(_size) => {
                // good, don't log it to avoid confusion
            }
            Err(_) => {
                // we have known issues for which we already log this on consumption metrics,
                // gc, and compaction. leave logging out for now.
                //
                // https://github.com/neondatabase/neon/issues/2539
            }
        }

        // imitate repartitioning on first compaction
        if let Err(e) = self
            .collect_keyspace(lsn, ctx)
            .instrument(info_span!("collect_keyspace"))
            .await
        {
            // if this failed, we probably failed logical size because these use the same keys
            if size.is_err() {
                // ignore, see above comment
            } else {
                match e {
                    CollectKeySpaceError::Cancelled => {
                        // Shutting down, ignore
                    }
                    err => {
                        warn!(
                            "failed to collect keyspace but succeeded in calculating logical size: {err:#}"
                        );
                    }
                }
            }
        }
    }

    // Imitate the synthetic size calculation done by the consumption_metrics module.
    #[instrument(skip_all)]
    async fn imitate_synthetic_size_calculation_worker(
        &self,
        tenant: &Arc<Tenant>,
        ctx: &RequestContext,
        cancel: &CancellationToken,
    ) {
        if self.conf.metric_collection_endpoint.is_none() {
            // We don't start the consumption metrics task if this is not set in the config.
            // So, no need to imitate the accesses in that case.
            return;
        }

        // The consumption metrics are collected on a per-tenant basis, by a single
        // global background loop.
        // It limits the number of synthetic size calculations using the global
        // `concurrent_tenant_size_logical_size_queries` semaphore to not overload
        // the pageserver. (size calculation is somewhat expensive in terms of CPU and IOs).
        //
        // If we used that same semaphore here, then we'd compete for the
        // same permits, which may impact timeliness of consumption metrics.
        // That is a no-go, as consumption metrics are much more important
        // than what we do here.
        //
        // So, we have a separate semaphore, initialized to the same
        // number of permits as the `concurrent_tenant_size_logical_size_queries`.
        // In the worst case, we would have twice the amount of concurrent size calculations.
        // But in practice, the `p.threshold` >> `consumption metric interval`, and
        // we spread out the eviction task using `random_init_delay`.
        // So, the chance of the worst case is quite low in practice.
        // It runs as a per-tenant task, but the eviction_task.rs is per-timeline.
        // So, we must coordinate with the other eviction tasks of this tenant.
        let limit = self
            .conf
            .eviction_task_immitated_concurrent_logical_size_queries
            .inner();

        let mut throwaway_cache = HashMap::new();
        let gather = crate::tenant::size::gather_inputs(
            tenant,
            limit,
            None,
            &mut throwaway_cache,
            LogicalSizeCalculationCause::EvictionTaskImitation,
            ctx,
        )
        .instrument(info_span!("gather_inputs"));

        tokio::select! {
            _ = cancel.cancelled() => {}
            gather_result = gather => {
                match gather_result {
                    Ok(_) => {},
                    Err(e) => {
                        // We don't care about the result, but, if it failed, we should log it,
                        // since consumption metric might be hitting the cached value and
                        // thus not encountering this error.
                        warn!("failed to imitate synthetic size calculation accesses: {e:#}")
                    }
                }
            }
        }
    }
}
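The scheduling rule buried in `imitate_layer_accesses` can be summarized compactly; a small Python sketch of the same decision (names are illustrative, not from the patch):

```python
from datetime import datetime, timedelta
from typing import Optional

def should_imitate(
    last_imitation: Optional[datetime],
    now: datetime,
    threshold: timedelta,
    period: timedelta,
) -> bool:
    """Run the imitation roughly once per `threshold`, slightly more often so it
    never races the threshold/period-th eviction iteration; this mirrors
    `p.threshold.checked_sub(p.period).unwrap_or(p.threshold)` above."""
    inter_imitate_period = threshold - period if threshold > period else threshold
    return last_imitation is None or (now - last_imitation) >= inter_imitate_period
```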
@@ -1154,6 +1154,42 @@ class NeonCli(AbstractNeonCli):
            kwargs["local_binpath"] = True
        return super().raw_cli(*args, **kwargs)

    def merge(
        self,
        src_endpoint: Endpoint,
        dst_endpoint: Endpoint):
        """
        Merge two branches
        """

        args = [
            "timeline",
            "merge",
            "--src-endpoint",
            str(src_endpoint.endpoint_id),
            "--dst-endpoint",
            str(dst_endpoint.endpoint_id)
        ]
        res = self.raw_cli(args)
        res.check_returncode()

    def set_mergeable(
        self,
        endpoint: Endpoint):
        """
        Mark a branch as mergeable
        """

        args = [
            "timeline",
            "set_mergeable",
            "--endpoint",
            str(endpoint.endpoint_id),
        ]
        res = self.raw_cli(args)
        res.check_returncode()

    def create_tenant(
        self,
        tenant_id: Optional[TenantId] = None,
test_runner/regress/test_merge.py (new file, 43 lines)
@@ -0,0 +1,43 @@
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.types import TimelineId
from fixtures.utils import query_scalar

#
# Merge ancestor branch with the main branch.
#
def test_merge(neon_env_builder: NeonEnvBuilder):
    env = neon_env_builder.init_start()
    pageserver_http = env.pageserver.http_client()

    # Override defaults: 4M checkpoint_distance, disable background compaction and gc.
    tenant, _ = env.neon_cli.create_tenant()

    main_branch = env.endpoints.create_start("main", tenant_id=tenant)
    main_cur = main_branch.connect().cursor()

    # Create table and insert some data
    main_cur.execute("CREATE TABLE t(x bigint primary key)")
    main_cur.execute("INSERT INTO t values(generate_series(1, 10000))")

    # Create branch ws.
    env.neon_cli.create_branch("ws", "main", tenant_id=tenant)
    ws_branch = env.endpoints.create_start("ws", tenant_id=tenant)
    log.info("postgres is running on 'ws' branch")

    # Mark branch ws as mergeable: it creates logical replication slots and pins WAL
    env.neon_cli.set_mergeable(ws_branch)

    # Insert more data in the branch
    ws_cur = ws_branch.connect().cursor()
    ws_cur.execute("INSERT INTO t values(generate_series(10001, 20000))")

    # Merge ws branch into main
    env.neon_cli.merge(ws_branch, main_branch)

    # sleep for some time until changes are applied
    time.sleep(2)

    # Check that changes are merged
    assert query_scalar(main_cur, "SELECT count(*) from t") == 20000
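Since logical replication applies asynchronously, the fixed two-second sleep in the test above can be flaky on slow runners. A hedged alternative (not part of the patch) is to poll the destination until the expected row count appears, reusing the helpers already imported in the test:

```python
def wait_for_rows(cur, expected_rows: int, timeout_s: float = 30.0):
    """Poll until the table has the expected number of rows, or time out."""
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if query_scalar(cur, "SELECT count(*) FROM t") == expected_rows:
            return
        time.sleep(0.5)
    raise AssertionError(f"merge did not replicate {expected_rows} rows in time")

# Usage inside the test, replacing the sleep + assert:
#     wait_for_rows(main_cur, 20000)
```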