Merge branch 'main' of github.com:neondatabase/neon into pg-extensions

This commit is contained in:
Alek Westover
2023-06-07 17:17:10 -04:00
38 changed files with 930 additions and 419 deletions

3
Cargo.lock generated
View File

@@ -2876,7 +2876,6 @@ dependencies = [
"serde",
"thiserror",
"utils",
"wal_craft",
"workspace_hack",
]
@@ -4896,7 +4895,9 @@ dependencies = [
"once_cell",
"postgres",
"postgres_ffi",
"regex",
"tempfile",
"utils",
"workspace_hack",
]

View File

@@ -9,7 +9,20 @@ members = [
"storage_broker",
"workspace_hack",
"trace",
"libs/*",
"libs/compute_api",
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/safekeeper_api",
"libs/utils",
"libs/consumption_metrics",
"libs/postgres_backend",
"libs/pq_proto",
"libs/tenant_size_model",
"libs/metrics",
"libs/postgres_connection",
"libs/remote_storage",
"libs/tracing-utils",
"libs/postgres_ffi/wal_craft",
]
[workspace.package]

View File

@@ -531,7 +531,7 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.0.tar.gz -
mkdir pgx_ulid-src && cd pgx_ulid-src && tar xvzf ../pgx_ulid.tar.gz --strip-components=1 -C . && \
sed -i 's/pgx = "=0.7.3"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pgx_ulid.control
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
#########################################################################################
#

View File

@@ -110,12 +110,11 @@ impl TenantState {
Self::Active => Attached,
// If the (initial or resumed) attach procedure fails, the tenant becomes Broken.
// However, it also becomes Broken if the regular load fails.
// We would need a separate TenantState variant to distinguish these cases.
// However, there's no practical difference from Console's perspective.
// It will run a Postgres-level health check as soon as it observes Attached.
// That will fail on Broken tenants.
// Console can then rollback the attach, or, wait for operator to fix the Broken tenant.
Self::Broken { .. } => Attached,
// From Console's perspective there's no practical difference
// because attachment_status is polled by console only during attach operation execution.
Self::Broken { reason, .. } => Failed {
reason: reason.to_owned(),
},
// Why is Stopping a Maybe case? Because, during pageserver shutdown,
// we set the Stopping state irrespective of whether the tenant
// has finished attaching or not.
@@ -312,10 +311,11 @@ impl std::ops::Deref for TenantAttachConfig {
/// See [`TenantState::attachment_status`] and the OpenAPI docs for context.
#[derive(Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "slug", content = "data", rename_all = "snake_case")]
pub enum TenantAttachmentStatus {
Maybe,
Attached,
Failed { reason: String },
}
#[serde_as]
@@ -809,7 +809,9 @@ mod tests {
"slug": "Active",
},
"current_physical_size": 42,
"attachment_status": "attached",
"attachment_status": {
"slug":"attached",
}
});
let original_broken = TenantInfo {
@@ -831,7 +833,9 @@ mod tests {
}
},
"current_physical_size": 42,
"attachment_status": "attached",
"attachment_status": {
"slug":"attached",
}
});
assert_eq!(

View File

@@ -24,7 +24,6 @@ workspace_hack.workspace = true
[dev-dependencies]
env_logger.workspace = true
postgres.workspace = true
wal_craft = { path = "wal_craft" }
[build-dependencies]
anyhow.workspace = true

View File

@@ -33,6 +33,7 @@ macro_rules! postgres_ffi {
}
pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod wal_craft_test_export;
pub mod waldecoder_handler;
pub mod xlog_utils;
@@ -45,8 +46,15 @@ macro_rules! postgres_ffi {
};
}
postgres_ffi!(v14);
postgres_ffi!(v15);
#[macro_export]
macro_rules! for_all_postgres_versions {
($macro:tt) => {
$macro!(v14);
$macro!(v15);
};
}
for_all_postgres_versions! { postgres_ffi }
pub mod pg_constants;
pub mod relfile_utils;

View File

@@ -0,0 +1,6 @@
//! This module is for WAL craft to test with postgres_ffi. Should not import any thing in normal usage.
pub use super::PG_MAJORVERSION;
pub use super::xlog_utils::*;
pub use super::bindings::*;
pub use crate::WAL_SEGMENT_SIZE;

View File

@@ -481,220 +481,4 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
wal
}
#[cfg(test)]
mod tests {
use super::super::PG_MAJORVERSION;
use super::*;
use regex::Regex;
use std::cmp::min;
use std::fs;
use std::{env, str::FromStr};
use utils::const_assert;
fn init_logging() {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(
format!("wal_craft=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"),
))
.is_test(true)
.try_init();
}
fn test_end_of_wal<C: wal_craft::Crafter>(test_name: &str) {
use wal_craft::*;
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
// Craft some WAL
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("..")
.join("..");
let cfg = Conf {
pg_version,
pg_distrib_dir: top_path.join("pg_install"),
datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
};
if cfg.datadir.exists() {
fs::remove_dir_all(&cfg.datadir).unwrap();
}
cfg.initdb().unwrap();
let srv = cfg.start_server().unwrap();
let (intermediate_lsns, expected_end_of_wal_partial) =
C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
let intermediate_lsns: Vec<Lsn> = intermediate_lsns
.iter()
.map(|&lsn| u64::from(lsn).into())
.collect();
let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into();
srv.kill();
// Check find_end_of_wal on the initial WAL
let last_segment = cfg
.wal_dir()
.read_dir()
.unwrap()
.map(|f| f.unwrap().file_name().into_string().unwrap())
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal);
for start_lsn in intermediate_lsns
.iter()
.chain(std::iter::once(&expected_end_of_wal))
{
// Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
// We assume that `start_lsn` is non-decreasing.
info!(
"Checking with start_lsn={}, erasing WAL before it",
start_lsn
);
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
let fname = file.file_name().into_string().unwrap();
if !IsXLogFileName(&fname) {
continue;
}
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
if seg_start_lsn > u64::from(*start_lsn) {
continue;
}
let mut f = File::options().write(true).open(file.path()).unwrap();
const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
f.write_all(
&ZEROS[0..min(
WAL_SEGMENT_SIZE,
(u64::from(*start_lsn) - seg_start_lsn) as usize,
)],
)
.unwrap();
}
check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal);
}
}
fn check_pg_waldump_end_of_wal(
cfg: &wal_craft::Conf,
last_segment: &str,
expected_end_of_wal: Lsn,
) {
// Get the actual end of WAL by pg_waldump
let waldump_output = cfg
.pg_waldump("000000010000000000000001", last_segment)
.unwrap()
.stderr;
let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
let caps = match Regex::new(r"invalid record length at (.+):")
.unwrap()
.captures(waldump_output)
{
Some(caps) => caps,
None => {
error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
panic!();
}
};
let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
info!(
"waldump erred on {}, expected wal end at {}",
waldump_wal_end, expected_end_of_wal
);
assert_eq!(waldump_wal_end, expected_end_of_wal);
}
fn check_end_of_wal(
cfg: &wal_craft::Conf,
last_segment: &str,
start_lsn: Lsn,
expected_end_of_wal: Lsn,
) {
// Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
// let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
// info!(
// "find_end_of_wal returned wal_end={} with non-partial WAL segment",
// wal_end
// );
// assert_eq!(wal_end, expected_end_of_wal_non_partial);
// Rename file to partial to actually find last valid lsn, then rename it back.
fs::rename(
cfg.wal_dir().join(last_segment),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
)
.unwrap();
let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
info!(
"find_end_of_wal returned wal_end={} with partial WAL segment",
wal_end
);
assert_eq!(wal_end, expected_end_of_wal);
fs::rename(
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir().join(last_segment),
)
.unwrap();
}
const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
#[test]
pub fn test_find_end_of_wal_simple() {
init_logging();
test_end_of_wal::<wal_craft::Simple>("test_find_end_of_wal_simple");
}
#[test]
pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
init_logging();
test_end_of_wal::<wal_craft::WalRecordCrossingSegmentFollowedBySmallOne>(
"test_find_end_of_wal_crossing_segment_followed_by_small_one",
);
}
#[test]
pub fn test_find_end_of_wal_last_crossing_segment() {
init_logging();
test_end_of_wal::<wal_craft::LastWalRecordCrossingSegment>(
"test_find_end_of_wal_last_crossing_segment",
);
}
/// Check the math in update_next_xid
///
/// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
/// currently 1024.
#[test]
pub fn test_update_next_xid() {
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
checkpoint.nextXid = FullTransactionId { value: 10 };
assert_eq!(checkpoint.nextXid.value, 10);
// The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
// boundary
checkpoint.update_next_xid(100);
assert_eq!(checkpoint.nextXid.value, 1024);
// No change
checkpoint.update_next_xid(500);
assert_eq!(checkpoint.nextXid.value, 1024);
checkpoint.update_next_xid(1023);
assert_eq!(checkpoint.nextXid.value, 1024);
// The function returns the *next* XID, given the highest XID seen so
// far. So when we pass 1024, the nextXid gets bumped up to the next
// XID_CHECKPOINT_INTERVAL boundary.
checkpoint.update_next_xid(1024);
assert_eq!(checkpoint.nextXid.value, 2048);
}
#[test]
pub fn test_encode_logical_message() {
let expected = [
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255,
38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114,
101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
];
let actual = encode_logical_message("prefix", "message");
assert_eq!(expected, actual[..]);
}
}
// If you need to craft WAL and write tests for this module, put it at wal_craft crate.

View File

@@ -15,3 +15,7 @@ postgres_ffi.workspace = true
tempfile.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
regex.workspace = true
utils.workspace = true

View File

@@ -10,6 +10,20 @@ use std::process::Command;
use std::time::{Duration, Instant};
use tempfile::{tempdir, TempDir};
macro_rules! xlog_utils_test {
($version:ident) => {
#[path = "."]
mod $version {
pub use postgres_ffi::$version::wal_craft_test_export::*;
#[allow(clippy::duplicate_mod)]
#[cfg(test)]
mod xlog_utils_test;
}
};
}
postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf {
pub pg_version: u32,

View File

@@ -0,0 +1,219 @@
//! Tests for postgres_ffi xlog_utils module. Put it here to break cyclic dependency.
use super::*;
use crate::{error, info};
use regex::Regex;
use std::cmp::min;
use std::fs::{self, File};
use std::io::Write;
use std::{env, str::FromStr};
use utils::const_assert;
use utils::lsn::Lsn;
fn init_logging() {
let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(
format!("crate=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"),
))
.is_test(true)
.try_init();
}
fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
use crate::*;
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
// Craft some WAL
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("..")
.join("..")
.join("..");
let cfg = Conf {
pg_version,
pg_distrib_dir: top_path.join("pg_install"),
datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
};
if cfg.datadir.exists() {
fs::remove_dir_all(&cfg.datadir).unwrap();
}
cfg.initdb().unwrap();
let srv = cfg.start_server().unwrap();
let (intermediate_lsns, expected_end_of_wal_partial) =
C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
let intermediate_lsns: Vec<Lsn> = intermediate_lsns
.iter()
.map(|&lsn| u64::from(lsn).into())
.collect();
let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into();
srv.kill();
// Check find_end_of_wal on the initial WAL
let last_segment = cfg
.wal_dir()
.read_dir()
.unwrap()
.map(|f| f.unwrap().file_name().into_string().unwrap())
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal);
for start_lsn in intermediate_lsns
.iter()
.chain(std::iter::once(&expected_end_of_wal))
{
// Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
// We assume that `start_lsn` is non-decreasing.
info!(
"Checking with start_lsn={}, erasing WAL before it",
start_lsn
);
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
let fname = file.file_name().into_string().unwrap();
if !IsXLogFileName(&fname) {
continue;
}
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
if seg_start_lsn > u64::from(*start_lsn) {
continue;
}
let mut f = File::options().write(true).open(file.path()).unwrap();
const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
f.write_all(
&ZEROS[0..min(
WAL_SEGMENT_SIZE,
(u64::from(*start_lsn) - seg_start_lsn) as usize,
)],
)
.unwrap();
}
check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal);
}
}
fn check_pg_waldump_end_of_wal(
cfg: &crate::Conf,
last_segment: &str,
expected_end_of_wal: Lsn,
) {
// Get the actual end of WAL by pg_waldump
let waldump_output = cfg
.pg_waldump("000000010000000000000001", last_segment)
.unwrap()
.stderr;
let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
let caps = match Regex::new(r"invalid record length at (.+):")
.unwrap()
.captures(waldump_output)
{
Some(caps) => caps,
None => {
error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output);
panic!();
}
};
let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
info!(
"waldump erred on {}, expected wal end at {}",
waldump_wal_end, expected_end_of_wal
);
assert_eq!(waldump_wal_end, expected_end_of_wal);
}
fn check_end_of_wal(
cfg: &crate::Conf,
last_segment: &str,
start_lsn: Lsn,
expected_end_of_wal: Lsn,
) {
// Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
// let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
// info!(
// "find_end_of_wal returned wal_end={} with non-partial WAL segment",
// wal_end
// );
// assert_eq!(wal_end, expected_end_of_wal_non_partial);
// Rename file to partial to actually find last valid lsn, then rename it back.
fs::rename(
cfg.wal_dir().join(last_segment),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
)
.unwrap();
let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
info!(
"find_end_of_wal returned wal_end={} with partial WAL segment",
wal_end
);
assert_eq!(wal_end, expected_end_of_wal);
fs::rename(
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir().join(last_segment),
)
.unwrap();
}
const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
#[test]
pub fn test_find_end_of_wal_simple() {
init_logging();
test_end_of_wal::<crate::Simple>("test_find_end_of_wal_simple");
}
#[test]
pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() {
init_logging();
test_end_of_wal::<crate::WalRecordCrossingSegmentFollowedBySmallOne>(
"test_find_end_of_wal_crossing_segment_followed_by_small_one",
);
}
#[test]
pub fn test_find_end_of_wal_last_crossing_segment() {
init_logging();
test_end_of_wal::<crate::LastWalRecordCrossingSegment>(
"test_find_end_of_wal_last_crossing_segment",
);
}
/// Check the math in update_next_xid
///
/// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL,
/// currently 1024.
#[test]
pub fn test_update_next_xid() {
let checkpoint_buf = [0u8; std::mem::size_of::<CheckPoint>()];
let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap();
checkpoint.nextXid = FullTransactionId { value: 10 };
assert_eq!(checkpoint.nextXid.value, 10);
// The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL
// boundary
checkpoint.update_next_xid(100);
assert_eq!(checkpoint.nextXid.value, 1024);
// No change
checkpoint.update_next_xid(500);
assert_eq!(checkpoint.nextXid.value, 1024);
checkpoint.update_next_xid(1023);
assert_eq!(checkpoint.nextXid.value, 1024);
// The function returns the *next* XID, given the highest XID seen so
// far. So when we pass 1024, the nextXid gets bumped up to the next
// XID_CHECKPOINT_INTERVAL boundary.
checkpoint.update_next_xid(1024);
assert_eq!(checkpoint.nextXid.value, 2048);
}
#[test]
pub fn test_encode_logical_message() {
let expected = [
64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255,
38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114,
101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
];
let actual = encode_logical_message("prefix", "message");
assert_eq!(expected, actual[..]);
}

View File

@@ -33,7 +33,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(Arc::new(layer));
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -215,7 +215,7 @@ fn bench_sequential(c: &mut Criterion) {
is_incremental: false,
short_id: format!("Layer {}", i),
};
updates.insert_historic(Arc::new(layer));
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -337,33 +337,114 @@ fn start_pageserver(
// Startup staging or optimizing:
//
// (init_done_tx, init_done_rx) are used to control when do background loops start. This is to
// avoid starving out the BACKGROUND_RUNTIME async worker threads doing heavy work, like
// initial repartitioning while we still have Loading tenants.
// We want to minimize downtime for `page_service` connections, and trying not to overload
// BACKGROUND_RUNTIME by doing initial compactions and initial logical sizes at the same time.
//
// init_done_rx is a barrier which stops waiting once all init_done_tx clones are dropped.
// init_done_rx will notify when all initial load operations have completed.
//
// background_jobs_can_start (same name used to hold off background jobs from starting at
// consumer side) will be dropped once we can start the background jobs. Currently it is behind
// completing all initial logical size calculations (init_logical_size_done_rx) and a timeout
// (background_task_maximum_delay).
let (init_done_tx, init_done_rx) = utils::completion::channel();
let (init_logical_size_done_tx, init_logical_size_done_rx) = utils::completion::channel();
let (background_jobs_can_start, background_jobs_barrier) = utils::completion::channel();
let order = pageserver::InitializationOrder {
initial_tenant_load: Some(init_done_tx),
initial_logical_size_can_start: init_done_rx.clone(),
initial_logical_size_attempt: init_logical_size_done_tx,
background_jobs_can_start: background_jobs_barrier.clone(),
};
// Scan the local 'tenants/' directory and start loading the tenants
let init_started_at = std::time::Instant::now();
let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
broker_client.clone(),
remote_storage.clone(),
(init_done_tx, init_done_rx.clone()),
order,
))?;
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx.clone();
async move {
init_done_rx.wait().await;
let init_done_rx = init_done_rx;
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial load completed"));
let elapsed = init_started_at.elapsed();
init_done_rx.wait().await;
// initial logical sizes can now start, as they were waiting on init_done_rx.
scopeguard::ScopeGuard::into_inner(guard);
let init_done = std::time::Instant::now();
let elapsed = init_done - init_started_at;
tracing::info!(
elapsed_millis = elapsed.as_millis(),
"Initial load completed."
"Initial load completed"
);
let mut init_sizes_done = std::pin::pin!(init_logical_size_done_rx.wait());
let timeout = conf.background_task_maximum_delay;
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
let init_sizes_done = tokio::select! {
_ = &mut init_sizes_done => {
let now = std::time::Instant::now();
tracing::info!(
from_init_done_millis = (now - init_done).as_millis(),
from_init_millis = (now - init_started_at).as_millis(),
"Initial logical sizes completed"
);
None
}
_ = tokio::time::sleep(timeout) => {
tracing::info!(
timeout_millis = timeout.as_millis(),
"Initial logical size timeout elapsed; starting background jobs"
);
Some(init_sizes_done)
}
};
scopeguard::ScopeGuard::into_inner(guard);
// allow background jobs to start
drop(background_jobs_can_start);
if let Some(init_sizes_done) = init_sizes_done {
// ending up here is not a bug; at the latest logical sizes will be queried by
// consumption metrics.
let guard = scopeguard::guard_on_success((), |_| tracing::info!("Cancelled before initial logical sizes completed"));
init_sizes_done.await;
scopeguard::ScopeGuard::into_inner(guard);
let now = std::time::Instant::now();
tracing::info!(
from_init_done_millis = (now - init_done).as_millis(),
from_init_millis = (now - init_started_at).as_millis(),
"Initial logical sizes completed after timeout (background jobs already started)"
);
}
};
async move {
let mut drive_init = std::pin::pin!(drive_init);
// just race these tasks
tokio::select! {
_ = shutdown_pageserver.cancelled() => {},
_ = &mut drive_init => {},
}
}
});
@@ -378,7 +459,7 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
init_done_rx.clone(),
background_jobs_barrier.clone(),
)?;
}
@@ -416,7 +497,7 @@ fn start_pageserver(
);
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let init_done_rx = init_done_rx;
let background_jobs_barrier = background_jobs_barrier;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
@@ -432,12 +513,17 @@ fn start_pageserver(
"consumption metrics collection",
true,
async move {
// first wait for initial load to complete before first iteration.
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
init_done_rx.wait().await;
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
@@ -487,6 +573,8 @@ fn start_pageserver(
);
}
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
// All started up! Now just sit and wait for shutdown signal.
ShutdownSignals::handle(|signal| match signal {
Signal::Quit => {
@@ -502,6 +590,11 @@ fn start_pageserver(
"Got {}. Terminating gracefully in fast shutdown mode",
signal.name()
);
// This cancels the `shutdown_pageserver` cancellation tree.
// Right now that tree doesn't reach very far, and `task_mgr` is used instead.
// The plan is to change that over time.
shutdown_pageserver.take();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
unreachable!()
}

View File

@@ -63,6 +63,7 @@ pub mod defaults {
pub const DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL: &str = "1 hour";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
///
/// Default built-in configuration file.
@@ -91,9 +92,10 @@ pub mod defaults {
#cached_metric_collection_interval = '{DEFAULT_CACHED_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
#disk_usage_based_eviction = {{ max_usage_pct = .., min_avail_bytes = .., period = "10s"}}
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
@@ -187,6 +189,15 @@ pub struct PageServerConf {
pub test_remote_failures: u64,
pub ondemand_download_behavior_treat_error_as_warn: bool,
/// How long will background tasks be delayed at most after initial load of tenants.
///
/// Our largest initialization completions are in the range of 100-200s, so perhaps 10s works
/// as we now isolate initial loading, initial logical size calculation and background tasks.
/// Smaller nodes will have background tasks "not running" for this long unless every timeline
/// has it's initial logical size calculated. Not running background tasks for some seconds is
/// not terrible.
pub background_task_maximum_delay: Duration,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -259,6 +270,8 @@ struct PageServerConfigBuilder {
test_remote_failures: BuilderValue<u64>,
ondemand_download_behavior_treat_error_as_warn: BuilderValue<bool>,
background_task_maximum_delay: BuilderValue<Duration>,
}
impl Default for PageServerConfigBuilder {
@@ -316,6 +329,11 @@ impl Default for PageServerConfigBuilder {
test_remote_failures: Set(0),
ondemand_download_behavior_treat_error_as_warn: Set(false),
background_task_maximum_delay: Set(humantime::parse_duration(
DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY,
)
.unwrap()),
}
}
}
@@ -440,6 +458,10 @@ impl PageServerConfigBuilder {
BuilderValue::Set(ondemand_download_behavior_treat_error_as_warn);
}
pub fn background_task_maximum_delay(&mut self, delay: Duration) {
self.background_task_maximum_delay = BuilderValue::Set(delay);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
@@ -522,6 +544,9 @@ impl PageServerConfigBuilder {
.ok_or(anyhow!(
"missing ondemand_download_behavior_treat_error_as_warn"
))?,
background_task_maximum_delay: self
.background_task_maximum_delay
.ok_or(anyhow!("missing background_task_maximum_delay"))?,
})
}
}
@@ -710,6 +735,7 @@ impl PageServerConf {
)
},
"ondemand_download_behavior_treat_error_as_warn" => builder.ondemand_download_behavior_treat_error_as_warn(parse_toml_bool(key, item)?),
"background_task_maximum_delay" => builder.background_task_maximum_delay(parse_toml_duration(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -877,6 +903,7 @@ impl PageServerConf {
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::ZERO,
}
}
}
@@ -1036,6 +1063,7 @@ metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'
log_format = 'json'
background_task_maximum_delay = '334 s'
"#;
@@ -1094,6 +1122,9 @@ log_format = 'json'
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: humantime::parse_duration(
defaults::DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY
)?,
},
"Correct defaults should be used when no config values are provided"
);
@@ -1148,6 +1179,7 @@ log_format = 'json'
disk_usage_based_eviction: None,
test_remote_failures: 0,
ondemand_download_behavior_treat_error_as_warn: false,
background_task_maximum_delay: Duration::from_secs(334),
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -83,7 +83,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
init_done: completion::Barrier,
background_jobs_barrier: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
@@ -100,17 +100,16 @@ pub fn launch_disk_usage_global_eviction_task(
"disk usage based eviction",
false,
async move {
// wait until initial load is complete, because we cannot evict from loading tenants.
init_done.wait().await;
let cancel = task_mgr::shutdown_token();
disk_usage_eviction_task(
&state,
task_config,
storage,
&conf.tenants_path(),
task_mgr::shutdown_token(),
)
.await;
// wait until initial load is complete, because we cannot evict from loading tenants.
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => { }
};
disk_usage_eviction_task(&state, task_config, storage, &conf.tenants_path(), cancel)
.await;
info!("disk usage based eviction task finishing");
Ok(())
},

View File

@@ -928,12 +928,28 @@ components:
writing to the tenant's S3 state, so, DO NOT ATTACH the
tenant to any other pageserver, or we risk split-brain.
- `attached` means that the attach operation has completed,
maybe successfully, maybe not. Perform a health check at
the Postgres level to determine healthiness of the tenant.
successfully
- `failed` means that attach has failed. For reason check corresponding `reason` failed.
`failed` is the terminal state, retrying attach call wont resolve the issue.
For example this can be caused by s3 being unreachable. The retry may be implemented
with call to detach, though it would be better to not automate it and inspec failed state
manually before proceeding with a retry.
See the tenant `/attach` endpoint for more information.
type: string
enum: [ "maybe", "attached" ]
type: object
required:
- slug
- data
properties:
slug:
type: string
enum: [ "maybe", "attached", "failed" ]
data:
- type: object
properties:
reason:
type: string
TenantCreateRequest:
allOf:
- $ref: '#/components/schemas/TenantConfig'

View File

@@ -132,6 +132,29 @@ pub fn is_uninit_mark(path: &Path) -> bool {
}
}
/// During pageserver startup, we need to order operations not to exhaust tokio worker threads by
/// blocking.
///
/// The instances of this value exist only during startup, otherwise `None` is provided, meaning no
/// delaying is needed.
#[derive(Clone)]
pub struct InitializationOrder {
/// Each initial tenant load task carries this until completion.
pub initial_tenant_load: Option<utils::completion::Completion>,
/// Barrier for when we can start initial logical size calculations.
pub initial_logical_size_can_start: utils::completion::Barrier,
/// Each timeline owns a clone of this to be consumed on the initial logical size calculation
/// attempt. It is important to drop this once the attempt has completed.
pub initial_logical_size_attempt: utils::completion::Completion,
/// Barrier for when we can start any background jobs.
///
/// This can be broken up later on, but right now there is just one class of a background job.
pub background_jobs_can_start: utils::completion::Barrier,
}
#[cfg(test)]
mod backoff_defaults_tests {
use super::*;

View File

@@ -65,6 +65,7 @@ use crate::tenant::remote_timeline_client::PersistIndexPartWithDeletedFlagError;
use crate::tenant::storage_layer::DeltaLayer;
use crate::tenant::storage_layer::ImageLayer;
use crate::tenant::storage_layer::Layer;
use crate::InitializationOrder;
use crate::virtual_file::VirtualFile;
use crate::walredo::PostgresRedoManager;
@@ -510,6 +511,7 @@ impl Tenant {
local_metadata: Option<TimelineMetadata>,
ancestor: Option<Arc<Timeline>>,
first_save: bool,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_id;
@@ -535,6 +537,7 @@ impl Tenant {
up_to_date_metadata,
ancestor.clone(),
remote_client,
init_order,
)?;
let timeline = UninitializedTimeline {
@@ -560,6 +563,7 @@ impl Tenant {
up_to_date_metadata,
ancestor.clone(),
None,
None,
)
.with_context(|| {
format!("creating broken timeline data for {tenant_id}/{timeline_id}")
@@ -858,6 +862,7 @@ impl Tenant {
local_metadata,
ancestor,
true,
None,
ctx,
)
.await
@@ -892,16 +897,13 @@ impl Tenant {
///
/// If the loading fails for some reason, the Tenant will go into Broken
/// state.
///
/// `init_done` is an optional channel used during initial load to delay background task
/// start. It is not used later.
#[instrument(skip_all, fields(tenant_id=%tenant_id))]
pub fn spawn_load(
conf: &'static PageServerConf,
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> Arc<Tenant> {
debug_assert_current_span_has_tenant_id();
@@ -937,17 +939,17 @@ impl Tenant {
"initial tenant load",
false,
async move {
// keep the sender alive as long as we have the initial load ongoing; it will be
// None for loads spawned after init_tenant_mgr.
let (_tx, rx) = if let Some((tx, rx)) = init_done {
(Some(tx), Some(rx))
} else {
(None, None)
};
match tenant_clone.load(&ctx).await {
let mut init_order = init_order;
// take the completion because initial tenant loading will complete when all of
// these tasks complete.
let _completion = init_order.as_mut().and_then(|x| x.initial_tenant_load.take());
match tenant_clone.load(init_order.as_ref(), &ctx).await {
Ok(()) => {
debug!("load finished, activating");
tenant_clone.activate(broker_client, rx.as_ref(), &ctx);
let background_jobs_can_start = init_order.as_ref().map(|x| &x.background_jobs_can_start);
tenant_clone.activate(broker_client, background_jobs_can_start, &ctx);
}
Err(err) => {
error!("load failed, setting tenant state to Broken: {err:?}");
@@ -974,7 +976,11 @@ impl Tenant {
/// files on disk. Used at pageserver startup.
///
/// No background tasks are started as part of this routine.
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
async fn load(
self: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
debug!("loading tenant task");
@@ -1094,7 +1100,7 @@ impl Tenant {
// 1. "Timeline has no ancestor and no layer files"
for (timeline_id, local_metadata) in sorted_timelines {
self.load_local_timeline(timeline_id, local_metadata, ctx)
self.load_local_timeline(timeline_id, local_metadata, init_order, ctx)
.await
.with_context(|| format!("load local timeline {timeline_id}"))?;
}
@@ -1112,6 +1118,7 @@ impl Tenant {
&self,
timeline_id: TimelineId,
local_metadata: TimelineMetadata,
init_order: Option<&InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
@@ -1181,6 +1188,7 @@ impl Tenant {
Some(local_metadata),
ancestor,
false,
init_order,
ctx,
)
.await
@@ -1724,12 +1732,12 @@ impl Tenant {
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `init_done` is an optional channel used during initial load to delay background task
/// start. It is not used later.
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
/// to delay background jobs. Background jobs can be started right away when None is given.
fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
init_done: Option<&completion::Barrier>,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
debug_assert_current_span_has_tenant_id();
@@ -1762,12 +1770,12 @@ impl Tenant {
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
tasks::start_background_loops(self, init_done);
tasks::start_background_loops(self, background_jobs_can_start);
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), init_done, ctx);
timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
activated_timelines += 1;
}
@@ -2158,6 +2166,7 @@ impl Tenant {
new_metadata: &TimelineMetadata,
ancestor: Option<Arc<Timeline>>,
remote_client: Option<RemoteTimelineClient>,
init_order: Option<&InitializationOrder>,
) -> anyhow::Result<Arc<Timeline>> {
if let Some(ancestor_timeline_id) = new_metadata.ancestor_timeline() {
anyhow::ensure!(
@@ -2166,6 +2175,9 @@ impl Tenant {
)
}
let initial_logical_size_can_start = init_order.map(|x| &x.initial_logical_size_can_start);
let initial_logical_size_attempt = init_order.map(|x| &x.initial_logical_size_attempt);
let pg_version = new_metadata.pg_version();
Ok(Timeline::new(
self.conf,
@@ -2177,6 +2189,8 @@ impl Tenant {
Arc::clone(&self.walredo_mgr),
remote_client,
pg_version,
initial_logical_size_can_start.cloned(),
initial_logical_size_attempt.cloned(),
))
}
@@ -2852,7 +2866,7 @@ impl Tenant {
remote_client: Option<RemoteTimelineClient>,
) -> anyhow::Result<Arc<Timeline>> {
let timeline_data = self
.create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client)
.create_timeline_data(new_timeline_id, new_metadata, ancestor, remote_client, None)
.context("Failed to create timeline data structure")?;
crashsafe::create_dir_all(timeline_path).context("Failed to create timeline directory")?;
@@ -3420,7 +3434,7 @@ pub mod harness {
timelines_to_load.insert(timeline_id, timeline_metadata);
}
tenant
.load(ctx)
.load(None, ctx)
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
.await?;
tenant.state.send_replace(TenantState::Active);

View File

@@ -51,7 +51,9 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Context;
use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
@@ -61,6 +63,8 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::PersistentLayerKey;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -86,11 +90,16 @@ pub struct LayerMap<L: ?Sized> {
pub frozen_layers: VecDeque<Arc<InMemoryLayer>>,
/// Index of the historic layers optimized for search
historic: BufferedHistoricLayerCoverage<Arc<L>>,
historic: BufferedHistoricLayerCoverage<Arc<PersistentLayerDesc>>,
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<L>>,
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
/// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
/// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
/// RemoteLayer will be removed.
mapping: HashMap<PersistentLayerKey, Arc<L>>,
}
impl<L: ?Sized> Default for LayerMap<L> {
@@ -101,6 +110,7 @@ impl<L: ?Sized> Default for LayerMap<L> {
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
mapping: HashMap::default(),
}
}
}
@@ -125,8 +135,9 @@ where
///
/// Insert an on-disk layer.
///
pub fn insert_historic(&mut self, layer: Arc<L>) {
self.layer_map.insert_historic_noflush(layer)
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.layer_map.insert_historic_noflush(layer_desc, layer)
}
///
@@ -134,8 +145,8 @@ where
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer: Arc<L>) {
self.layer_map.remove_historic_noflush(layer)
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.layer_map.remove_historic_noflush(layer_desc, layer)
}
/// Replaces existing layer iff it is the `expected`.
@@ -150,12 +161,15 @@ where
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map.replace_historic_noflush(expected, new)
self.layer_map
.replace_historic_noflush(expected_desc, expected, new_desc, new)
}
// We will flush on drop anyway, but this method makes it
@@ -230,6 +244,7 @@ where
(None, None) => None,
(None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start;
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult {
layer: image,
lsn_floor,
@@ -237,6 +252,7 @@ where
}
(Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start;
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult {
layer: delta,
lsn_floor,
@@ -247,6 +263,7 @@ where
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match {
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult {
layer: image,
lsn_floor: img_lsn,
@@ -254,6 +271,7 @@ where
} else {
let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult {
layer: delta,
lsn_floor,
@@ -273,16 +291,33 @@ where
///
/// Helper function for BatchedUpdates::insert_historic
///
pub(self) fn insert_historic_noflush(&mut self, layer: Arc<L>) {
/// TODO(chi): remove L generic so that we do not need to pass layer object.
pub(self) fn insert_historic_noflush(
&mut self,
layer_desc: PersistentLayerDesc,
layer: Arc<L>,
) {
self.mapping.insert(layer_desc.key(), layer.clone());
// TODO: See #3869, resulting #4088, attempted fix and repro #4094
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
Arc::clone(&layer),
);
if Self::is_l0(&layer) {
self.l0_delta_layers.push(layer);
self.l0_delta_layers.push(layer_desc.clone().into());
}
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
layer_desc.into(),
);
}
fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
let layer = self
.mapping
.get(key)
.with_context(|| format!("{key:?}"))
.expect("inconsistent layer mapping");
layer
}
///
@@ -290,14 +325,16 @@ where
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer: Arc<L>) {
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(&*layer));
if Self::is_l0(&layer) {
let len_before = self.l0_delta_layers.len();
self.l0_delta_layers
.retain(|other| !Self::compare_arced_layers(other, &layer));
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| {
!Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
});
self.l0_delta_layers = l0_delta_layers;
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
// vtable) pairs.
@@ -307,11 +344,14 @@ where
"failed to locate removed historic layer from l0_delta_layers"
);
}
self.mapping.remove(&layer_desc.key());
}
pub(self) fn replace_historic_noflush(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
@@ -332,10 +372,9 @@ where
let l0_index = if expected_l0 {
// find the index in case replace worked, we need to replace that as well
let pos = self
.l0_delta_layers
.iter()
.position(|slot| Self::compare_arced_layers(slot, expected));
let pos = self.l0_delta_layers.iter().position(|slot| {
Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
});
if pos.is_none() {
return Ok(Replacement::NotFound);
@@ -345,16 +384,28 @@ where
None
};
let replaced = self.historic.replace(&key, new.clone(), |existing| {
Self::compare_arced_layers(existing, expected)
let new_desc = Arc::new(new_desc);
let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
**existing == expected_desc
});
if let Replacement::Replaced { .. } = &replaced {
self.mapping.remove(&expected_desc.key());
self.mapping.insert(new_desc.key(), new);
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new;
self.l0_delta_layers[index] = new_desc;
}
}
let replaced = match replaced {
Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
Replacement::NotFound => Replacement::NotFound,
Replacement::RemovalBuffered => Replacement::RemovalBuffered,
Replacement::Unexpected(x) => {
Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
}
};
Ok(replaced)
}
@@ -383,7 +434,7 @@ where
let start = key.start.to_i128();
let end = key.end.to_i128();
let layer_covers = |layer: Option<Arc<L>>| match layer {
let layer_covers = |layer: Option<Arc<PersistentLayerDesc>>| match layer {
Some(layer) => layer.get_lsn_range().start >= lsn.start,
None => false,
};
@@ -404,7 +455,9 @@ where
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic.iter()
self.historic
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
}
///
@@ -436,14 +489,24 @@ where
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((kr, current_val.take()));
coverage.push((
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
current_key = change_key;
current_val = change_val.clone();
}
// Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push((kr, current_val.take()));
coverage.push((
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
Ok(coverage)
}
@@ -532,7 +595,9 @@ where
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let base_count =
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?;
@@ -555,7 +620,9 @@ where
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let base_count =
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max(
@@ -706,7 +773,11 @@ where
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
Ok(self.l0_delta_layers.clone())
Ok(self
.l0_delta_layers
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
.collect())
}
/// debugging function to print out the contents of the layer map
@@ -809,12 +880,17 @@ mod tests {
let layer = LayerDescriptor::from(layer);
// same skeletan construction; see scenario below
let not_found: Arc<dyn Layer> = Arc::new(layer.clone());
let new_version: Arc<dyn Layer> = Arc::new(layer);
let not_found = Arc::new(layer.clone());
let new_version = Arc::new(layer);
let mut map = LayerMap::default();
let res = map.batch_update().replace_historic(&not_found, new_version);
let res = map.batch_update().replace_historic(
not_found.get_persistent_layer_desc(),
&not_found,
new_version.get_persistent_layer_desc(),
new_version,
);
assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
}
@@ -823,8 +899,8 @@ mod tests {
let name = LayerFileName::from_str(layer_name).unwrap();
let skeleton = LayerDescriptor::from(name);
let remote: Arc<dyn Layer> = Arc::new(skeleton.clone());
let downloaded: Arc<dyn Layer> = Arc::new(skeleton);
let remote = Arc::new(skeleton.clone());
let downloaded = Arc::new(skeleton);
let mut map = LayerMap::default();
@@ -834,12 +910,18 @@ mod tests {
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update().insert_historic(remote.clone());
map.batch_update()
.insert_historic(remote.get_persistent_layer_desc(), remote.clone());
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map
.batch_update()
.replace_historic(&remote, downloaded.clone())
.replace_historic(
remote.get_persistent_layer_desc(),
&remote,
downloaded.get_persistent_layer_desc(),
downloaded.clone(),
)
.expect("name derived attributes are the same");
assert!(
matches!(replaced, Replacement::Replaced { .. }),
@@ -847,11 +929,12 @@ mod tests {
);
assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);
map.batch_update().remove_historic(downloaded.clone());
map.batch_update()
.remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone());
assert_eq!(count_layer_in(&map, &downloaded), (0, 0));
}
fn count_layer_in(map: &LayerMap<dyn Layer>, layer: &Arc<dyn Layer>) -> (usize, usize) {
fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) {
let historic = map
.iter_historic_layers()
.filter(|x| LayerMap::compare_arced_layers(x, layer))

View File

@@ -21,9 +21,8 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::{create_tenant_files, CreateTenantFilesMode, Tenant, TenantState};
use crate::IGNORED_TENANT_FILE_NAME;
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME};
use utils::completion;
use utils::fs_ext::PathExt;
use utils::id::{TenantId, TimelineId};
@@ -65,7 +64,7 @@ pub async fn init_tenant_mgr(
conf: &'static PageServerConf,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: (completion::Completion, completion::Barrier),
init_order: InitializationOrder,
) -> anyhow::Result<()> {
// Scan local filesystem for attached tenants
let tenants_dir = conf.tenants_path();
@@ -122,7 +121,7 @@ pub async fn init_tenant_mgr(
&tenant_dir_path,
broker_client.clone(),
remote_storage.clone(),
Some(init_done.clone()),
Some(init_order.clone()),
&ctx,
) {
Ok(tenant) => {
@@ -153,14 +152,12 @@ pub async fn init_tenant_mgr(
Ok(())
}
/// `init_done` is an optional channel used during initial load to delay background task
/// start. It is not used later.
pub fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_path: &Path,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
init_done: Option<(completion::Completion, completion::Barrier)>,
init_order: Option<InitializationOrder>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
anyhow::ensure!(
@@ -219,7 +216,7 @@ pub fn schedule_local_tenant_processing(
tenant_id,
broker_client,
remote_storage,
init_done,
init_order,
ctx,
)
};

View File

@@ -38,7 +38,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::PersistentLayerDesc;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
@@ -454,7 +454,9 @@ pub trait PersistentLayer: Layer {
///
/// Should not change over the lifetime of the layer object because
/// current_physical_size is computed as the som of this value.
fn file_size(&self) -> u64;
fn file_size(&self) -> u64 {
self.layer_desc().file_size
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;
@@ -483,6 +485,20 @@ pub struct LayerDescriptor {
pub short_id: String,
}
impl LayerDescriptor {
/// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta,
/// and the tenant / timeline id does not matter.
pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc {
PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
self.key.clone(),
self.lsn.clone(),
233,
)
}
}
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()

View File

@@ -182,8 +182,6 @@ pub struct DeltaLayer {
pub desc: PersistentLayerDesc,
pub file_size: u64,
access_stats: LayerAccessStats,
inner: RwLock<DeltaLayerInner>,
@@ -196,7 +194,7 @@ impl std::fmt::Debug for DeltaLayer {
f.debug_struct("DeltaLayer")
.field("key_range", &RangeDisplayDebug(&self.desc.key_range))
.field("lsn_range", &self.desc.lsn_range)
.field("file_size", &self.file_size)
.field("file_size", &self.desc.file_size)
.field("inner", &self.inner)
.finish()
}
@@ -439,10 +437,6 @@ impl PersistentLayer for DeltaLayer {
Ok(())
}
fn file_size(&self) -> u64 {
self.file_size
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
@@ -451,7 +445,7 @@ impl PersistentLayer for DeltaLayer {
HistoricLayerInfo::Delta {
layer_file_name,
layer_file_size: self.file_size,
layer_file_size: self.desc.file_size,
lsn_start: lsn_range.start,
lsn_end: lsn_range.end,
remote: false,
@@ -602,8 +596,8 @@ impl DeltaLayer {
timeline_id,
filename.key_range.clone(),
filename.lsn_range.clone(),
file_size,
),
file_size,
access_stats,
inner: RwLock::new(DeltaLayerInner {
loaded: false,
@@ -634,8 +628,8 @@ impl DeltaLayer {
summary.timeline_id,
summary.key_range,
summary.lsn_range,
metadata.len(),
),
file_size: metadata.len(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
@@ -803,8 +797,8 @@ impl DeltaLayerWriterInner {
self.timeline_id,
self.key_start..key_end,
self.lsn_range.clone(),
metadata.len(),
),
file_size: metadata.len(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,

View File

@@ -109,8 +109,6 @@ pub struct ImageLayer {
// This entry contains an image of all pages as of this LSN, should be the same as desc.lsn
pub lsn: Lsn,
pub file_size: u64,
access_stats: LayerAccessStats,
inner: RwLock<ImageLayerInner>,
@@ -122,7 +120,7 @@ impl std::fmt::Debug for ImageLayer {
f.debug_struct("ImageLayer")
.field("key_range", &RangeDisplayDebug(&self.desc.key_range))
.field("file_size", &self.file_size)
.field("file_size", &self.desc.file_size)
.field("lsn", &self.lsn)
.field("inner", &self.inner)
.finish()
@@ -258,17 +256,13 @@ impl PersistentLayer for ImageLayer {
Ok(())
}
fn file_size(&self) -> u64 {
self.file_size
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
HistoricLayerInfo::Image {
layer_file_name,
layer_file_size: self.file_size,
layer_file_size: self.desc.file_size,
lsn_start: lsn_range.start,
remote: false,
access_stats: self.access_stats.as_api_model(reset),
@@ -411,9 +405,9 @@ impl ImageLayer {
filename.key_range.clone(),
filename.lsn,
false,
file_size,
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: filename.lsn,
file_size,
access_stats,
inner: RwLock::new(ImageLayerInner {
loaded: false,
@@ -443,9 +437,9 @@ impl ImageLayer {
summary.key_range,
summary.lsn,
false,
metadata.len(),
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: summary.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(ImageLayerInner {
file: None,
@@ -578,14 +572,6 @@ impl ImageLayerWriterInner {
file.write_all(buf.as_ref())?;
}
let desc = PersistentLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
self.key_range.clone(),
self.lsn,
self.is_incremental, // for now, image layer ALWAYS covers the full range
);
// Fill in the summary on blk 0
let summary = Summary {
magic: IMAGE_FILE_MAGIC,
@@ -604,6 +590,15 @@ impl ImageLayerWriterInner {
.metadata()
.context("get metadata to determine file size")?;
let desc = PersistentLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
self.key_range.clone(),
self.lsn,
self.is_incremental, // for now, image layer ALWAYS covers the full range
metadata.len(),
);
// Note: Because we open the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.file here. The first read will have to re-open it.
@@ -611,7 +606,6 @@ impl ImageLayerWriterInner {
path_or_conf: PathOrConf::Conf(self.conf),
desc,
lsn: self.lsn,
file_size: metadata.len(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
inner: RwLock::new(ImageLayerInner {
loaded: false,

View File

@@ -1,10 +1,11 @@
use anyhow::Result;
use std::ops::Range;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::repository::Key;
use crate::{context::RequestContext, repository::Key};
use super::{DeltaFileName, ImageFileName, LayerFileName};
@@ -24,9 +25,27 @@ pub struct PersistentLayerDesc {
/// always be equal to `is_delta`. If we land the partial image layer PR someday, image layer could also be
/// incremental.
pub is_incremental: bool,
/// File size
pub file_size: u64,
}
/// A unique identifier of a persistent layer within the context of one timeline.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
pub struct PersistentLayerKey {
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
pub is_delta: bool,
}
impl PersistentLayerDesc {
pub fn key(&self) -> PersistentLayerKey {
PersistentLayerKey {
key_range: self.key_range.clone(),
lsn_range: self.lsn_range.clone(),
is_delta: self.is_delta,
}
}
pub fn short_id(&self) -> String {
self.filename().file_name()
}
@@ -37,6 +56,7 @@ impl PersistentLayerDesc {
key_range: Range<Key>,
lsn: Lsn,
is_incremental: bool,
file_size: u64,
) -> Self {
Self {
tenant_id,
@@ -45,6 +65,7 @@ impl PersistentLayerDesc {
lsn_range: Self::image_layer_lsn_range(lsn),
is_delta: false,
is_incremental,
file_size,
}
}
@@ -53,6 +74,7 @@ impl PersistentLayerDesc {
timeline_id: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
file_size: u64,
) -> Self {
Self {
tenant_id,
@@ -61,6 +83,7 @@ impl PersistentLayerDesc {
lsn_range,
is_delta: true,
is_incremental: true,
file_size,
}
}
@@ -106,4 +129,48 @@ impl PersistentLayerDesc {
self.image_file_name().into()
}
}
// TODO: remove this in the future once we refactor timeline APIs.
pub fn get_lsn_range(&self) -> Range<Lsn> {
self.lsn_range.clone()
}
pub fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
pub fn get_timeline_id(&self) -> TimelineId {
self.timeline_id
}
pub fn get_tenant_id(&self) -> TenantId {
self.tenant_id
}
pub fn is_incremental(&self) -> bool {
self.is_incremental
}
pub fn is_delta(&self) -> bool {
self.is_delta
}
pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
println!(
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
self.tenant_id,
self.timeline_id,
self.key_range.start,
self.key_range.end,
self.lsn_range.start,
self.lsn_range.end
);
Ok(())
}
pub fn file_size(&self) -> u64 {
self.file_size
}
}

View File

@@ -142,10 +142,6 @@ impl PersistentLayer for RemoteLayer {
true
}
fn file_size(&self) -> u64 {
self.layer_metadata.file_size()
}
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.filename().file_name();
let lsn_range = self.get_lsn_range();
@@ -190,6 +186,7 @@ impl RemoteLayer {
fname.key_range.clone(),
fname.lsn,
false,
layer_metadata.file_size(),
),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),
@@ -211,6 +208,7 @@ impl RemoteLayer {
timelineid,
fname.key_range.clone(),
fname.lsn_range.clone(),
layer_metadata.file_size(),
),
layer_metadata: layer_metadata.clone(),
ongoing_download: Arc::new(tokio::sync::Semaphore::new(1)),

View File

@@ -15,10 +15,10 @@ use tracing::*;
use utils::completion;
/// Start per tenant background loops: compaction and gc.
///
/// `init_done` is an optional channel used during initial load to delay background task
/// start. It is not used later.
pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completion::Barrier>) {
pub fn start_background_loops(
tenant: &Arc<Tenant>,
background_jobs_can_start: Option<&completion::Barrier>,
) {
let tenant_id = tenant.tenant_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -29,10 +29,14 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
compaction_loop(tenant)
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
compaction_loop(tenant, cancel)
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
.await;
Ok(())
@@ -48,10 +52,14 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
false,
{
let tenant = Arc::clone(tenant);
let init_done = init_done.cloned();
let background_jobs_can_start = background_jobs_can_start.cloned();
async move {
completion::Barrier::maybe_wait(init_done).await;
gc_loop(tenant)
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
};
gc_loop(tenant, cancel)
.instrument(info_span!("gc_loop", tenant_id = %tenant_id))
.await;
Ok(())
@@ -63,12 +71,11 @@ pub fn start_background_loops(tenant: &Arc<Tenant>, init_done: Option<&completio
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>) {
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
@@ -133,12 +140,11 @@ async fn compaction_loop(tenant: Arc<Tenant>) {
///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>) {
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let wait_duration = Duration::from_secs(2);
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx =

View File

@@ -242,6 +242,13 @@ pub struct Timeline {
pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
/// Barrier to wait before doing initial logical size calculation. Used only during startup.
initial_logical_size_can_start: Option<completion::Barrier>,
/// Completion shared between all timelines loaded during startup; used to delay heavier
/// background tasks until some logical sizes have been calculated.
initial_logical_size_attempt: Mutex<Option<completion::Completion>>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -932,12 +939,12 @@ impl Timeline {
pub fn activate(
self: &Arc<Self>,
broker_client: BrokerClientChannel,
init_done: Option<&completion::Barrier>,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
) {
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.launch_eviction_task(init_done);
self.launch_eviction_task(background_jobs_can_start);
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -955,6 +962,14 @@ impl Timeline {
error!("Not activating a Stopping timeline");
}
(_, new_state) => {
if matches!(new_state, TimelineState::Stopping | TimelineState::Broken) {
// drop the copmletion guard, if any; it might be holding off the completion
// forever needlessly
self.initial_logical_size_attempt
.lock()
.unwrap_or_else(|e| e.into_inner())
.take();
}
self.state.send_replace(new_state);
}
}
@@ -1196,7 +1211,12 @@ impl Timeline {
),
});
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
let replaced = match batch_updates.replace_historic(
local_layer.layer_desc().clone(),
local_layer,
new_remote_layer.layer_desc().clone(),
new_remote_layer,
)? {
Replacement::Replaced { .. } => {
if let Err(e) = local_layer.delete_resident_layer_file() {
error!("failed to remove layer file on evict after replacement: {e:#?}");
@@ -1345,6 +1365,8 @@ impl Timeline {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
remote_client: Option<RemoteTimelineClient>,
pg_version: u32,
initial_logical_size_can_start: Option<completion::Barrier>,
initial_logical_size_attempt: Option<completion::Completion>,
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
let (state, _) = watch::channel(TimelineState::Loading);
@@ -1439,6 +1461,9 @@ impl Timeline {
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
initial_logical_size_can_start,
initial_logical_size_attempt: Mutex::new(initial_logical_size_attempt),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1587,7 +1612,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1619,7 +1644,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1718,7 +1743,7 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer);
updates.remove_historic(local_layer.layer_desc().clone(), local_layer);
// fall-through to adding the remote layer
}
} else {
@@ -1757,7 +1782,7 @@ impl Timeline {
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer);
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1784,7 +1809,7 @@ impl Timeline {
),
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer);
}
}
}
@@ -1927,7 +1952,27 @@ impl Timeline {
false,
// NB: don't log errors here, task_mgr will do that.
async move {
// no cancellation here, because nothing really waits for this to complete compared
let cancel = task_mgr::shutdown_token();
// in case we were created during pageserver initialization, wait for
// initialization to complete before proceeding. startup time init runs on the same
// runtime.
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = completion::Barrier::maybe_wait(self_clone.initial_logical_size_can_start.clone()) => {}
};
// hold off background tasks from starting until all timelines get to try at least
// once initial logical size calculation; though retry will rarely be useful.
// holding off is done because heavier tasks execute blockingly on the same
// runtime.
//
// dropping this at every outcome is probably better than trying to cling on to it,
// delay will be terminated by a timeout regardless.
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
// no extra cancellation here, because nothing really waits for this to complete compared
// to spawn_ondemand_logical_size_calculation.
let cancel = CancellationToken::new();
@@ -2212,7 +2257,7 @@ impl Timeline {
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(layer);
updates.remove_historic(layer.layer_desc().clone(), layer);
Ok(())
}
@@ -2922,7 +2967,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(l);
batch_updates.insert_historic(l.layer_desc().clone(), l);
batch_updates.flush();
// update the timeline's physical size
@@ -3170,7 +3215,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(l);
updates.insert_historic(l.layer_desc().clone(), l);
}
updates.flush();
drop(layers);
@@ -3617,7 +3662,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(x);
updates.insert_historic(x.layer_desc().clone(), x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -4152,7 +4197,7 @@ impl Timeline {
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
let failure = match updates.replace_historic(&l, new_layer) {
let failure = match updates.replace_historic(l.layer_desc().clone(), &l, new_layer.layer_desc().clone(), new_layer) {
Ok(Replacement::Replaced { .. }) => false,
Ok(Replacement::NotFound) => {
// TODO: the downloaded file should probably be removed, otherwise

View File

@@ -49,9 +49,12 @@ pub struct EvictionTaskTenantState {
}
impl Timeline {
pub(super) fn launch_eviction_task(self: &Arc<Self>, init_done: Option<&completion::Barrier>) {
pub(super) fn launch_eviction_task(
self: &Arc<Self>,
background_tasks_can_start: Option<&completion::Barrier>,
) {
let self_clone = Arc::clone(self);
let init_done = init_done.cloned();
let background_tasks_can_start = background_tasks_can_start.cloned();
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Eviction,
@@ -60,8 +63,13 @@ impl Timeline {
&format!("layer eviction for {}/{}", self.tenant_id, self.timeline_id),
false,
async move {
completion::Barrier::maybe_wait(init_done).await;
self_clone.eviction_task(task_mgr::shutdown_token()).await;
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); }
_ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
};
self_clone.eviction_task(cancel).await;
info!("eviction task finishing");
Ok(())
},

View File

@@ -254,20 +254,20 @@ nwp_register_gucs(void)
DefineCustomIntVariable(
"neon.safekeeper_reconnect_timeout",
"Timeout for reconnecting to offline wal acceptor.",
"Walproposer reconnects to offline safekeepers once in this interval.",
NULL,
&wal_acceptor_reconnect_timeout,
1000, 0, INT_MAX, /* default, min, max */
5000, 0, INT_MAX, /* default, min, max */
PGC_SIGHUP, /* context */
GUC_UNIT_MS, /* flags */
NULL, NULL, NULL);
DefineCustomIntVariable(
"neon.safekeeper_connect_timeout",
"Timeout for connection establishement and it's maintenance against safekeeper",
"Connection or connection attempt to safekeeper is terminated if no message is received (or connection attempt doesn't finish) within this period.",
NULL,
&wal_acceptor_connection_timeout,
5000, 0, INT_MAX,
10000, 0, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MS,
NULL, NULL, NULL);
@@ -441,7 +441,7 @@ WalProposerPoll(void)
if (TimestampDifferenceExceeds(sk->latestMsgReceivedAt, now,
wal_acceptor_connection_timeout))
{
elog(WARNING, "failed to connect to node '%s:%s' in '%s' state: exceeded connection timeout %dms",
elog(WARNING, "terminating connection to safekeeper '%s:%s' in '%s' state: no messages received during the last %dms or connection attempt took longer than that",
sk->host, sk->port, FormatSafekeeperState(sk->state), wal_acceptor_connection_timeout);
ShutdownConnection(sk);
}
@@ -1035,9 +1035,16 @@ RecvAcceptorGreeting(Safekeeper *sk)
if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) & sk->greetResponse))
return;
elog(LOG, "received AcceptorGreeting from safekeeper %s:%s", sk->host, sk->port);
/* Protocol is all good, move to voting. */
sk->state = SS_VOTING;
/*
* Note: it would be better to track the counter on per safekeeper basis,
* but at worst walproposer would restart with 'term rejected', so leave as
* is for now.
*/
++n_connected;
if (n_connected <= quorum)
{

View File

@@ -7,6 +7,7 @@ use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::ops::Deref;
use std::path::{Path, PathBuf};
use std::time::Instant;
use crate::control_file_upgrade::upgrade_control_file;
use crate::metrics::PERSIST_CONTROL_FILE_SECONDS;
@@ -28,6 +29,9 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
pub trait Storage: Deref<Target = SafeKeeperState> {
/// Persist safekeeper state on disk and update internal state.
fn persist(&mut self, s: &SafeKeeperState) -> Result<()>;
/// Timestamp of last persist.
fn last_persist_at(&self) -> Instant;
}
#[derive(Debug)]
@@ -38,6 +42,8 @@ pub struct FileStorage {
/// Last state persisted to disk.
state: SafeKeeperState,
/// Not preserved across restarts.
last_persist_at: Instant,
}
impl FileStorage {
@@ -51,6 +57,7 @@ impl FileStorage {
timeline_dir,
conf: conf.clone(),
state,
last_persist_at: Instant::now(),
})
}
@@ -66,6 +73,7 @@ impl FileStorage {
timeline_dir,
conf: conf.clone(),
state,
last_persist_at: Instant::now(),
};
Ok(store)
@@ -216,6 +224,10 @@ impl Storage for FileStorage {
self.state = s.clone();
Ok(())
}
fn last_persist_at(&self) -> Instant {
self.last_persist_at
}
}
#[cfg(test)]

View File

@@ -17,6 +17,9 @@ pub fn thread_main(conf: SafeKeeperConf) {
let ttid = tli.ttid;
let _enter =
info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered();
if let Err(e) = tli.maybe_pesist_control_file() {
warn!("failed to persist control file: {e}");
}
if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) {
warn!("failed to remove WAL: {}", e);
}

View File

@@ -10,6 +10,7 @@ use std::cmp::max;
use std::cmp::min;
use std::fmt;
use std::io::Read;
use std::time::Duration;
use storage_broker::proto::SafekeeperTimelineInfo;
use tracing::*;
@@ -634,7 +635,8 @@ where
}
// system_id will be updated on mismatch
if self.state.server.system_id != msg.system_id {
// sync-safekeepers doesn't know sysid and sends 0, ignore it
if self.state.server.system_id != msg.system_id && msg.system_id != 0 {
if self.state.server.system_id != 0 {
warn!(
"unexpected system ID arrived, got {}, expected {}",
@@ -836,6 +838,26 @@ where
self.state.persist(&state)
}
/// Persist control file if there is something to save and enough time
/// passed after the last save.
pub fn maybe_persist_control_file(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> {
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
return Ok(());
}
let need_persist = self.inmem.commit_lsn > self.state.commit_lsn
|| self.inmem.backup_lsn > self.state.backup_lsn
|| self.inmem.peer_horizon_lsn > self.state.peer_horizon_lsn
|| inmem_remote_consistent_lsn > self.state.remote_consistent_lsn;
if need_persist {
let mut state = self.state.clone();
state.remote_consistent_lsn = inmem_remote_consistent_lsn;
self.persist_control_file(state)?;
trace!("saved control file: {CF_SAVE_INTERVAL:?} passed");
}
Ok(())
}
/// Handle request to append WAL.
#[allow(clippy::comparison_chain)]
fn handle_append_request(
@@ -948,9 +970,8 @@ where
if sync_control_file {
let mut state = self.state.clone();
// Note: we do not persist remote_consistent_lsn in other paths of
// persisting cf -- that is not much needed currently. We could do
// that by storing Arc to walsenders in Safekeeper.
// Note: we could make remote_consistent_lsn update in cf common by
// storing Arc to walsenders in Safekeeper.
state.remote_consistent_lsn = new_remote_consistent_lsn;
self.persist_control_file(state)?;
}
@@ -980,7 +1001,7 @@ mod tests {
use super::*;
use crate::wal_storage::Storage;
use std::ops::Deref;
use std::{ops::Deref, time::Instant};
// fake storage for tests
struct InMemoryState {
@@ -992,6 +1013,10 @@ mod tests {
self.persisted_state = s.clone();
Ok(())
}
fn last_persist_at(&self) -> Instant {
Instant::now()
}
}
impl Deref for InMemoryState {

View File

@@ -234,7 +234,6 @@ impl SharedState {
flush_lsn: self.sk.wal_store.flush_lsn().0,
// note: this value is not flushed to control file yet and can be lost
commit_lsn: self.sk.inmem.commit_lsn.0,
// TODO: rework feedbacks to avoid max here
remote_consistent_lsn: remote_consistent_lsn.0,
peer_horizon_lsn: self.sk.inmem.peer_horizon_lsn.0,
safekeeper_connstr: conf.listen_pg_addr.clone(),
@@ -673,6 +672,17 @@ impl Timeline {
Ok(())
}
/// Persist control file if there is something to save and enough time
/// passed after the last save. This helps to keep remote_consistent_lsn up
/// to date so that storage nodes restart doesn't cause many pageserver ->
/// safekeeper reconnections.
pub fn maybe_pesist_control_file(&self) -> Result<()> {
let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn();
self.write_shared_state()
.sk
.maybe_persist_control_file(remote_consistent_lsn)
}
/// Returns full timeline info, required for the metrics. If the timeline is
/// not active, returns None instead.
pub fn info_for_metrics(&self) -> Option<FullTimelineInfo> {

View File

@@ -379,6 +379,12 @@ impl Storage for PhysicalStorage {
);
}
// Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
// disk (this happens on each connect).
if end_pos == self.write_lsn {
return Ok(());
}
// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;

View File

@@ -1,5 +1,5 @@
import time
from typing import Optional
from typing import Any, Dict, Optional
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
@@ -72,7 +72,7 @@ def wait_until_tenant_state(
expected_state: str,
iterations: int,
period: float = 1.0,
) -> bool:
) -> Dict[str, Any]:
"""
Does not use `wait_until` for debugging purposes
"""
@@ -81,7 +81,7 @@ def wait_until_tenant_state(
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"]["slug"] == expected_state:
return True
return tenant
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")

View File

@@ -110,6 +110,12 @@ class EvictionEnv:
overrides=(
"--pageserver-config-override=disk_usage_based_eviction="
+ enc.dump_inline_table(disk_usage_config).replace("\n", " "),
# Disk usage based eviction runs as a background task.
# But pageserver startup delays launch of background tasks for some time, to prioritize initial logical size calculations during startup.
# But, initial logical size calculation may not be triggered if safekeepers don't publish new broker messages.
# But, we only have a 10-second-timeout in this test.
# So, disable the delay for this test.
"--pageserver-config-override=background_task_maximum_delay='0s'",
),
)

View File

@@ -147,7 +147,12 @@ def test_remote_storage_backup_and_restore(
# listing the remote timelines will fail because of the failpoint,
# and the tenant will be marked as Broken.
client.tenant_attach(tenant_id)
wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
tenant_info = wait_until_tenant_state(pageserver_http, tenant_id, "Broken", 15)
assert tenant_info["attachment_status"] == {
"slug": "failed",
"data": {"reason": "storage-sync-list-remote-timelines"},
}
# Ensure that even though the tenant is broken, we can't attach it again.
with pytest.raises(Exception, match=f"tenant {tenant_id} already exists, state: Broken"):

View File

@@ -532,7 +532,7 @@ def test_ignored_tenant_reattach(
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_remote_storage_backup_and_restore",
test_name="test_ignored_tenant_reattach",
)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()