mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-01 07:20:37 +00:00
Compare commits
3 Commits
skyzh/comp
...
skyzh/laye
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
42b1368a0f | ||
|
|
631b815433 | ||
|
|
a3909e03f8 |
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -110,6 +110,12 @@ dependencies = [
|
||||
"backtrace",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arc-swap"
|
||||
version = "1.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
|
||||
|
||||
[[package]]
|
||||
name = "archery"
|
||||
version = "0.5.0"
|
||||
@@ -2542,6 +2548,7 @@ name = "pageserver"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
|
||||
@@ -32,6 +32,7 @@ license = "Apache-2.0"
|
||||
## All dependency versions, used in the project
|
||||
[workspace.dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-stream = "0.3"
|
||||
async-trait = "0.1"
|
||||
atty = "0.2.14"
|
||||
|
||||
@@ -308,8 +308,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
|
||||
|
||||
let mut env =
|
||||
LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
|
||||
let force = init_match.get_flag("force");
|
||||
env.init(pg_version, force)
|
||||
env.init(pg_version)
|
||||
.context("Failed to initialize neon repository")?;
|
||||
|
||||
// Initialize pageserver, create initial tenant and timeline.
|
||||
@@ -1014,13 +1013,6 @@ fn cli() -> Command {
|
||||
.help("If set, the node will be a hot replica on the specified timeline")
|
||||
.required(false);
|
||||
|
||||
let force_arg = Arg::new("force")
|
||||
.value_parser(value_parser!(bool))
|
||||
.long("force")
|
||||
.action(ArgAction::SetTrue)
|
||||
.help("Force initialization even if the repository is not empty")
|
||||
.required(false);
|
||||
|
||||
Command::new("Neon CLI")
|
||||
.arg_required_else_help(true)
|
||||
.version(GIT_VERSION)
|
||||
@@ -1036,7 +1028,6 @@ fn cli() -> Command {
|
||||
.value_name("config"),
|
||||
)
|
||||
.arg(pg_version_arg.clone())
|
||||
.arg(force_arg)
|
||||
)
|
||||
.subcommand(
|
||||
Command::new("timeline")
|
||||
|
||||
@@ -364,7 +364,7 @@ impl LocalEnv {
|
||||
//
|
||||
// Initialize a new Neon repository
|
||||
//
|
||||
pub fn init(&mut self, pg_version: u32, force: bool) -> anyhow::Result<()> {
|
||||
pub fn init(&mut self, pg_version: u32) -> anyhow::Result<()> {
|
||||
// check if config already exists
|
||||
let base_path = &self.base_data_dir;
|
||||
ensure!(
|
||||
@@ -372,29 +372,11 @@ impl LocalEnv {
|
||||
"repository base path is missing"
|
||||
);
|
||||
|
||||
if base_path.exists() {
|
||||
if force {
|
||||
println!("removing all contents of '{}'", base_path.display());
|
||||
// instead of directly calling `remove_dir_all`, we keep the original dir but removing
|
||||
// all contents inside. This helps if the developer symbol links another directory (i.e.,
|
||||
// S3 local SSD) to the `.neon` base directory.
|
||||
for entry in std::fs::read_dir(base_path)? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
if path.is_dir() {
|
||||
fs::remove_dir_all(&path)?;
|
||||
} else {
|
||||
fs::remove_file(&path)?;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
bail!(
|
||||
"directory '{}' already exists. Perhaps already initialized? (Hint: use --force to remove all contents)",
|
||||
base_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
ensure!(
|
||||
!base_path.exists(),
|
||||
"directory '{}' already exists. Perhaps already initialized?",
|
||||
base_path.display()
|
||||
);
|
||||
if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
|
||||
bail!(
|
||||
"Can't find postgres binary at {}",
|
||||
@@ -410,7 +392,7 @@ impl LocalEnv {
|
||||
}
|
||||
}
|
||||
|
||||
fs::create_dir_all(base_path)?;
|
||||
fs::create_dir(base_path)?;
|
||||
|
||||
// Generate keypair for JWT.
|
||||
//
|
||||
|
||||
@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
byteorder.workspace = true
|
||||
|
||||
@@ -117,8 +117,7 @@ pub fn main() -> Result<()> {
|
||||
|
||||
let mut lsn_diff = (lsn_end - lsn_start) as f32;
|
||||
let mut fill = Fill::None;
|
||||
let mut ymargin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
|
||||
let xmargin = 0.05; // Height-dependent margin to disambiguate overlapping deltas
|
||||
let mut margin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
|
||||
let mut lsn_offset = 0.0;
|
||||
|
||||
// Fill in and thicken rectangle if it's an
|
||||
@@ -129,7 +128,7 @@ pub fn main() -> Result<()> {
|
||||
num_images += 1;
|
||||
lsn_diff = 0.3;
|
||||
lsn_offset = -lsn_diff / 2.0;
|
||||
ymargin = 0.05;
|
||||
margin = 0.05;
|
||||
fill = Fill::Color(rgb(0, 0, 0));
|
||||
}
|
||||
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
|
||||
@@ -138,10 +137,10 @@ pub fn main() -> Result<()> {
|
||||
println!(
|
||||
" {}",
|
||||
rectangle(
|
||||
key_start as f32 + stretch * xmargin,
|
||||
stretch * (lsn_max as f32 - (lsn_end as f32 - ymargin - lsn_offset)),
|
||||
key_diff as f32 - stretch * 2.0 * xmargin,
|
||||
stretch * (lsn_diff - 2.0 * ymargin)
|
||||
key_start as f32 + stretch * margin,
|
||||
stretch * (lsn_max as f32 - (lsn_end as f32 - margin - lsn_offset)),
|
||||
key_diff as f32 - stretch * 2.0 * margin,
|
||||
stretch * (lsn_diff - 2.0 * margin)
|
||||
)
|
||||
.fill(fill)
|
||||
.stroke(Stroke::Color(rgb(0, 0, 0), 0.1))
|
||||
|
||||
@@ -53,33 +53,6 @@ pub enum StorageTimeOperation {
|
||||
CreateTenant,
|
||||
}
|
||||
|
||||
pub static NUM_TIERS: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_storage_tiers_num",
|
||||
"Number of sorted runs",
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static NUM_COMPACTIONS: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_storage_compaction_num",
|
||||
"Number of ongoing compactions",
|
||||
&["tenant_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static STORAGE_PHYSICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
|
||||
register_int_gauge_vec!(
|
||||
"pageserver_storage_physical_size_sum",
|
||||
"Physical size of different types of storage files",
|
||||
&["type", "tenant_id", "timeline_id"],
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
|
||||
register_counter_vec!(
|
||||
"pageserver_storage_operations_seconds_sum",
|
||||
@@ -419,8 +392,6 @@ const STORAGE_IO_TIME_OPERATIONS: &[&str] = &[
|
||||
|
||||
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
|
||||
|
||||
pub const STORAGE_PHYSICAL_SIZE_FILE_TYPE: &[&str] = &["image", "delta", "partial-image"];
|
||||
|
||||
pub static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
|
||||
register_histogram_vec!(
|
||||
"pageserver_io_operations_seconds",
|
||||
@@ -802,8 +773,6 @@ pub struct TimelineMetrics {
|
||||
pub persistent_bytes_written: IntCounter,
|
||||
pub evictions: IntCounter,
|
||||
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
|
||||
pub num_tiers: IntGauge,
|
||||
pub num_compactions: IntGauge,
|
||||
}
|
||||
|
||||
impl TimelineMetrics {
|
||||
@@ -869,12 +838,6 @@ impl TimelineMetrics {
|
||||
.unwrap();
|
||||
let evictions_with_low_residence_duration =
|
||||
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
|
||||
let num_tiers = NUM_TIERS
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
let num_compactions = NUM_COMPACTIONS
|
||||
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
|
||||
.unwrap();
|
||||
|
||||
TimelineMetrics {
|
||||
tenant_id,
|
||||
@@ -901,8 +864,6 @@ impl TimelineMetrics {
|
||||
evictions_with_low_residence_duration,
|
||||
),
|
||||
read_num_fs_layers,
|
||||
num_tiers,
|
||||
num_compactions,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -923,7 +884,6 @@ impl Drop for TimelineMetrics {
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = READ_NUM_FS_LAYERS.remove_label_values(&[tenant_id, timeline_id]);
|
||||
let _ = STORAGE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
|
||||
|
||||
self.evictions_with_low_residence_duration
|
||||
.write()
|
||||
@@ -946,9 +906,6 @@ impl Drop for TimelineMetrics {
|
||||
for op in SMGR_QUERY_TIME_OPERATIONS {
|
||||
let _ = SMGR_QUERY_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
|
||||
}
|
||||
for ty in STORAGE_PHYSICAL_SIZE_FILE_TYPE {
|
||||
let _ = STORAGE_PHYSICAL_SIZE.remove_label_values(&[ty, tenant_id, timeline_id]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -85,8 +85,8 @@ pub mod blob_io;
|
||||
pub mod block_io;
|
||||
pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod layer_cache;
|
||||
pub mod layer_map;
|
||||
pub mod layer_map_mgr;
|
||||
pub mod manifest;
|
||||
|
||||
pub mod metadata;
|
||||
@@ -1560,7 +1560,7 @@ impl Tenant {
|
||||
// No timeout here, GC & Compaction should be responsive to the
|
||||
// `TimelineState::Stopping` change.
|
||||
info!("waiting for layer_removal_cs.lock()");
|
||||
let layer_removal_guard = timeline.lcache.delete_guard_write().await;
|
||||
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
|
||||
info!("got layer_removal_cs.lock(), deleting layer files");
|
||||
|
||||
// NB: storage_sync upload tasks that reference these layers have been cancelled
|
||||
|
||||
@@ -1,198 +0,0 @@
|
||||
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
|
||||
use super::Timeline;
|
||||
use crate::metrics::{STORAGE_PHYSICAL_SIZE, STORAGE_PHYSICAL_SIZE_FILE_TYPE};
|
||||
use crate::tenant::layer_map::{self, LayerMap};
|
||||
use anyhow::Result;
|
||||
use std::sync::{Mutex, Weak};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
pub struct LayerCache {
|
||||
/// Layer removal lock.
|
||||
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
|
||||
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
|
||||
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
|
||||
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
|
||||
pub layers_removal_lock: Arc<tokio::sync::RwLock<()>>,
|
||||
|
||||
/// We need this lock b/c we do not have any way to prevent GC/compaction from removing files in-use.
|
||||
/// We need to do reference counting on Arc to prevent this from happening, and we can safely remove this lock.
|
||||
pub layers_operation_lock: Arc<tokio::sync::RwLock<()>>,
|
||||
|
||||
/// Will be useful when we move evict / download to layer cache.
|
||||
#[allow(unused)]
|
||||
timeline: Weak<Timeline>,
|
||||
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub tenant_id_str: String,
|
||||
pub timeline_id_str: String,
|
||||
|
||||
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
|
||||
}
|
||||
|
||||
pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>);
|
||||
|
||||
pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeleteGuardRead(Arc<tokio::sync::OwnedRwLockReadGuard<()>>);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DeleteGuardWrite(Arc<tokio::sync::OwnedRwLockWriteGuard<()>>);
|
||||
|
||||
impl LayerCache {
|
||||
pub fn new(timeline: Weak<Timeline>, tenant_id: TenantId, timeline_id: TimelineId) -> Self {
|
||||
Self {
|
||||
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
|
||||
layers_removal_lock: Arc::new(tokio::sync::RwLock::new(())),
|
||||
mapping: Mutex::new(HashMap::new()),
|
||||
timeline: timeline,
|
||||
tenant_id: tenant_id,
|
||||
timeline_id: timeline_id,
|
||||
tenant_id_str: tenant_id.to_string(),
|
||||
timeline_id_str: timeline_id.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
|
||||
let guard = self.mapping.lock().unwrap();
|
||||
guard.get(&desc.key()).expect("not found").clone()
|
||||
}
|
||||
|
||||
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
|
||||
/// we won't delete files that are being read.
|
||||
pub async fn layer_in_use_write(&self) -> LayerInUseWrite {
|
||||
LayerInUseWrite(self.layers_operation_lock.clone().write_owned().await)
|
||||
}
|
||||
|
||||
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
|
||||
/// we won't delete files that are being read.
|
||||
pub async fn layer_in_use_read(&self) -> LayerInUseRead {
|
||||
LayerInUseRead(self.layers_operation_lock.clone().read_owned().await)
|
||||
}
|
||||
|
||||
/// Ensures only one of compaction / gc can happen at a time.
|
||||
pub async fn delete_guard_read(&self) -> DeleteGuardRead {
|
||||
DeleteGuardRead(Arc::new(
|
||||
self.layers_removal_lock.clone().read_owned().await,
|
||||
))
|
||||
}
|
||||
|
||||
/// Ensures only one of compaction / gc can happen at a time.
|
||||
pub async fn delete_guard_write(&self) -> DeleteGuardWrite {
|
||||
DeleteGuardWrite(Arc::new(
|
||||
self.layers_removal_lock.clone().write_owned().await,
|
||||
))
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
self.metrics_size_sub(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.remove(&layer.layer_desc().key());
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
|
||||
self.metrics_size_add(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
|
||||
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
self.metrics_size_add(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Called within read path.
|
||||
pub fn replace_and_verify(
|
||||
&self,
|
||||
expected: Arc<dyn PersistentLayer>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
) -> Result<()> {
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
|
||||
let key: PersistentLayerKey = expected.layer_desc().key();
|
||||
let other = new.layer_desc().key();
|
||||
|
||||
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
|
||||
let new_l0 = LayerMap::is_l0(new.layer_desc());
|
||||
|
||||
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
|
||||
"replacing downloaded layer into layermap failed because layer was not found"
|
||||
));
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
|
||||
);
|
||||
|
||||
anyhow::ensure!(
|
||||
expected_l0 == new_l0,
|
||||
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
|
||||
);
|
||||
|
||||
if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) {
|
||||
anyhow::ensure!(
|
||||
layer_map::compare_arced_layers(&expected, layer),
|
||||
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
|
||||
expected = Arc::as_ptr(&expected),
|
||||
new = Arc::as_ptr(layer),
|
||||
);
|
||||
*layer = new;
|
||||
Ok(())
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"replacing downloaded layer into layermap failed because layer was not found"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Called within write path. When compaction and image layer creation we will create new layers.
|
||||
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
self.metrics_size_add(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.insert(layer.layer_desc().key(), layer);
|
||||
}
|
||||
|
||||
/// Called within write path. When GC and compaction we will remove layers and delete them on disk.
|
||||
/// Will move logic to delete files here later.
|
||||
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
|
||||
self.metrics_size_sub(&*layer);
|
||||
let mut guard = self.mapping.lock().unwrap();
|
||||
guard.remove(&layer.layer_desc().key());
|
||||
}
|
||||
|
||||
fn metrics_size_add(&self, layer: &dyn PersistentLayer) {
|
||||
STORAGE_PHYSICAL_SIZE
|
||||
.with_label_values(&[
|
||||
Self::get_layer_type(layer),
|
||||
&self.tenant_id_str,
|
||||
&self.timeline_id_str,
|
||||
])
|
||||
.add(layer.file_size() as i64);
|
||||
}
|
||||
|
||||
fn metrics_size_sub(&self, layer: &dyn PersistentLayer) {
|
||||
STORAGE_PHYSICAL_SIZE
|
||||
.with_label_values(&[
|
||||
Self::get_layer_type(layer),
|
||||
&self.tenant_id_str,
|
||||
&self.timeline_id_str,
|
||||
])
|
||||
.sub(layer.file_size() as i64);
|
||||
}
|
||||
|
||||
fn get_layer_type(layer: &dyn PersistentLayer) -> &'static str {
|
||||
if layer.layer_desc().is_delta() {
|
||||
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[1]
|
||||
} else if layer.layer_desc().is_incremental() {
|
||||
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[2]
|
||||
} else {
|
||||
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -66,7 +66,7 @@ use super::storage_layer::PersistentLayerDesc;
|
||||
///
|
||||
/// LayerMap tracks what layers exist on a timeline.
|
||||
///
|
||||
#[derive(Default)]
|
||||
#[derive(Default, Clone)]
|
||||
pub struct LayerMap {
|
||||
//
|
||||
// 'open_layer' holds the current InMemoryLayer that is accepting new
|
||||
@@ -93,58 +93,6 @@ pub struct LayerMap {
|
||||
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
|
||||
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
|
||||
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
|
||||
|
||||
/// All sorted runs. For tiered compaction.
|
||||
pub sorted_runs: SortedRuns,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SortedRuns {
|
||||
pub runs: Vec<(usize, Vec<Arc<PersistentLayerDesc>>)>,
|
||||
next_tier_id: usize,
|
||||
}
|
||||
|
||||
impl SortedRuns {
|
||||
/// Create a new sorted run and insert it at the top of the LSM tree.
|
||||
pub fn create_new_run(&mut self, layers: Vec<Arc<PersistentLayerDesc>>) -> usize {
|
||||
let tier_id = self.next_tier_id();
|
||||
self.runs.insert(0, (tier_id, layers));
|
||||
tier_id
|
||||
}
|
||||
|
||||
/// Create a new sorted run and insert it at the bottom of the LSM tree.
|
||||
pub fn create_new_bottom_run(&mut self, layers: Vec<Arc<PersistentLayerDesc>>) -> usize {
|
||||
let tier_id = self.next_tier_id();
|
||||
self.runs.push((tier_id, layers));
|
||||
tier_id
|
||||
}
|
||||
|
||||
pub fn compute_tier_sizes(&self) -> Vec<(usize, u64)> {
|
||||
self.runs
|
||||
.iter()
|
||||
.map(|(tier_id, layers)| (*tier_id, layers.iter().map(|layer| layer.file_size()).sum()))
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
/// Remove a sorted run from the LSM tree.
|
||||
pub fn remove_run(&mut self, tier_id: usize) {
|
||||
self.runs.retain(|(id, _)| *id != tier_id);
|
||||
}
|
||||
|
||||
/// Remove layers and the corresponding sorted runs.
|
||||
pub fn insert_run_at(&mut self, idx: usize, layers: Vec<Arc<PersistentLayerDesc>>) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
pub fn num_of_tiers(&self) -> usize {
|
||||
self.runs.len()
|
||||
}
|
||||
|
||||
pub fn next_tier_id(&mut self) -> usize {
|
||||
let ret = self.next_tier_id;
|
||||
self.next_tier_id += 1;
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
/// The primary update API for the layer map.
|
||||
@@ -166,28 +114,15 @@ impl BatchedUpdates<'_> {
|
||||
///
|
||||
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
|
||||
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
self.insert_historic_new(layer_desc) // insert into layer map without populating tiering structure
|
||||
}
|
||||
|
||||
pub fn insert_historic_new(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
self.layer_map.insert_historic_noflush(layer_desc)
|
||||
}
|
||||
|
||||
/// Get a reference to the current sorted runs.
|
||||
pub fn sorted_runs(&mut self) -> &mut SortedRuns {
|
||||
&mut self.layer_map.sorted_runs
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove an on-disk layer from the map.
|
||||
///
|
||||
/// This should be called when the corresponding file on disk has been deleted.
|
||||
///
|
||||
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
self.remove_historic_new(layer_desc) // remove from layer map without populating tiering structure
|
||||
}
|
||||
|
||||
pub fn remove_historic_new(&mut self, layer_desc: PersistentLayerDesc) {
|
||||
self.layer_map.remove_historic_noflush(layer_desc)
|
||||
}
|
||||
|
||||
@@ -249,28 +184,14 @@ impl LayerMap {
|
||||
/// 'open' and 'frozen' layers!
|
||||
///
|
||||
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
|
||||
self.search_incremental(key, end_lsn, false)
|
||||
}
|
||||
|
||||
pub fn search_incremental(
|
||||
&self,
|
||||
key: Key,
|
||||
end_lsn: Lsn,
|
||||
exclude_image: bool,
|
||||
) -> Option<SearchResult> {
|
||||
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
|
||||
let latest_delta = version.delta_coverage.query(key.to_i128());
|
||||
let latest_image = if exclude_image {
|
||||
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 2)?;
|
||||
version.image_coverage.query(key.to_i128())
|
||||
} else {
|
||||
version.image_coverage.query(key.to_i128())
|
||||
};
|
||||
let latest_image = version.image_coverage.query(key.to_i128());
|
||||
|
||||
match (latest_delta, latest_image) {
|
||||
(None, None) => None,
|
||||
(None, Some(image)) => {
|
||||
let lsn_floor = image.get_lsn_range().end;
|
||||
let lsn_floor = image.get_lsn_range().start;
|
||||
Some(SearchResult {
|
||||
layer: image,
|
||||
lsn_floor,
|
||||
@@ -290,7 +211,7 @@ impl LayerMap {
|
||||
if image_is_newer || image_exact_match {
|
||||
Some(SearchResult {
|
||||
layer: image,
|
||||
lsn_floor: img_lsn + 1,
|
||||
lsn_floor: img_lsn,
|
||||
})
|
||||
} else {
|
||||
let lsn_floor =
|
||||
@@ -719,19 +640,10 @@ impl LayerMap {
|
||||
frozen_layer.dump(verbose, ctx)?;
|
||||
}
|
||||
|
||||
println!("l0_deltas:");
|
||||
for layer in &self.l0_delta_layers {
|
||||
println!("historic_layers:");
|
||||
for layer in self.iter_historic_layers() {
|
||||
layer.dump(verbose, ctx)?;
|
||||
}
|
||||
|
||||
println!("sorted_runs:");
|
||||
for (lvl, (tier_id, layer)) in self.sorted_runs.runs.iter().enumerate() {
|
||||
println!("tier {}", tier_id);
|
||||
for layer in layer {
|
||||
layer.dump(verbose, ctx)?;
|
||||
}
|
||||
}
|
||||
|
||||
println!("End dump LayerMap");
|
||||
Ok(())
|
||||
}
|
||||
@@ -774,12 +686,14 @@ mod tests {
|
||||
|
||||
mod l0_delta_layers_updated {
|
||||
|
||||
use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
|
||||
use crate::tenant::{
|
||||
storage_layer::{PersistentLayer, PersistentLayerDesc},
|
||||
timeline::LayerMapping,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn for_full_range_delta() {
|
||||
// l0_delta_layers are used by compaction, and should observe all buffered updates
|
||||
l0_delta_layers_updated_scenario(
|
||||
@@ -789,7 +703,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn for_non_full_range_delta() {
|
||||
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
|
||||
l0_delta_layers_updated_scenario(
|
||||
@@ -800,7 +713,6 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn for_image() {
|
||||
l0_delta_layers_updated_scenario(
|
||||
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
|
||||
@@ -810,6 +722,31 @@ mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replacing_missing_l0_is_notfound() {
|
||||
// original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
|
||||
// however only happen for precondition failures.
|
||||
|
||||
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
|
||||
let layer = LayerFileName::from_str(layer).unwrap();
|
||||
let layer = LayerDescriptor::from(layer);
|
||||
|
||||
// same skeletan construction; see scenario below
|
||||
let not_found = Arc::new(layer.clone());
|
||||
let new_version = Arc::new(layer);
|
||||
|
||||
// after the immutable storage state refactor, the replace operation
|
||||
// will not use layer map any more. We keep it here for consistency in test cases
|
||||
// and can remove it in the future.
|
||||
let _map = LayerMap::default();
|
||||
|
||||
let mut mapping = LayerMapping::new();
|
||||
|
||||
mapping
|
||||
.replace_and_verify(not_found, new_version)
|
||||
.unwrap_err();
|
||||
}
|
||||
|
||||
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
|
||||
let name = LayerFileName::from_str(layer_name).unwrap();
|
||||
let skeleton = LayerDescriptor::from(name);
|
||||
@@ -818,6 +755,7 @@ mod tests {
|
||||
let downloaded = Arc::new(skeleton);
|
||||
|
||||
let mut map = LayerMap::default();
|
||||
let mut mapping = LayerMapping::new();
|
||||
|
||||
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the
|
||||
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
|
||||
@@ -827,11 +765,20 @@ mod tests {
|
||||
|
||||
map.batch_update()
|
||||
.insert_historic(remote.layer_desc().clone());
|
||||
mapping.insert(remote.clone());
|
||||
assert_eq!(
|
||||
count_layer_in(&map, remote.layer_desc()),
|
||||
expected_in_counts
|
||||
);
|
||||
|
||||
mapping
|
||||
.replace_and_verify(remote, downloaded.clone())
|
||||
.expect("name derived attributes are the same");
|
||||
assert_eq!(
|
||||
count_layer_in(&map, downloaded.layer_desc()),
|
||||
expected_in_counts
|
||||
);
|
||||
|
||||
map.batch_update()
|
||||
.remove_historic(downloaded.layer_desc().clone());
|
||||
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
|
||||
|
||||
@@ -43,6 +43,18 @@ impl Ord for LayerKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerKey {
|
||||
fn from(layer: &'a L) -> Self {
|
||||
let kr = layer.get_key_range();
|
||||
let lr = layer.get_lsn_range();
|
||||
LayerKey {
|
||||
key: kr.start.to_i128()..kr.end.to_i128(),
|
||||
lsn: lr.start.0..lr.end.0,
|
||||
is_image: !layer.is_incremental(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&PersistentLayerDesc> for LayerKey {
|
||||
fn from(layer: &PersistentLayerDesc) -> Self {
|
||||
let kr = layer.get_key_range();
|
||||
@@ -50,7 +62,7 @@ impl From<&PersistentLayerDesc> for LayerKey {
|
||||
LayerKey {
|
||||
key: kr.start.to_i128()..kr.end.to_i128(),
|
||||
lsn: lr.start.0..lr.end.0,
|
||||
is_image: !layer.is_delta,
|
||||
is_image: !layer.is_incremental(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,6 +72,7 @@ impl From<&PersistentLayerDesc> for LayerKey {
|
||||
/// Allows answering layer map queries very efficiently,
|
||||
/// but doesn't allow retroactive insertion, which is
|
||||
/// sometimes necessary. See BufferedHistoricLayerCoverage.
|
||||
#[derive(Clone)]
|
||||
pub struct HistoricLayerCoverage<Value> {
|
||||
/// The latest state
|
||||
head: LayerCoverageTuple<Value>,
|
||||
@@ -413,6 +426,7 @@ fn test_persistent_overlapping() {
|
||||
///
|
||||
/// See this for more on persistent and retroactive techniques:
|
||||
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
|
||||
#[derive(Clone)]
|
||||
pub struct BufferedHistoricLayerCoverage<Value> {
|
||||
/// A persistent layer map that we rebuild when we need to retroactively update
|
||||
historic_coverage: HistoricLayerCoverage<Value>,
|
||||
|
||||
@@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync;
|
||||
///
|
||||
/// NOTE The struct is parameterized over Value for easier
|
||||
/// testing, but in practice it's some sort of layer.
|
||||
#[derive(Clone)]
|
||||
pub struct LayerCoverage<Value> {
|
||||
/// For every change in coverage (as we sweep the key space)
|
||||
/// we store (lsn.end, value).
|
||||
@@ -139,6 +140,7 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
}
|
||||
|
||||
/// Image and delta coverage at a specific LSN.
|
||||
#[derive(Clone)]
|
||||
pub struct LayerCoverageTuple<Value> {
|
||||
pub image_coverage: LayerCoverage<Value>,
|
||||
pub delta_coverage: LayerCoverage<Value>,
|
||||
|
||||
269
pageserver/src/tenant/layer_map_mgr.rs
Normal file
269
pageserver/src/tenant/layer_map_mgr.rs
Normal file
@@ -0,0 +1,269 @@
|
||||
//! This module implements `LayerMapMgr`, which manages a layer map object and provides lock-free access to the state.
|
||||
//!
|
||||
//! A common usage pattern is as follows:
|
||||
//!
|
||||
//! ```ignore
|
||||
//! async fn compaction(&self) {
|
||||
//! // Get the current state.
|
||||
//! let state = self.layer_map_mgr.read();
|
||||
//! // No lock held at this point. Do compaction based on the state. This part usually incurs I/O operations and may
|
||||
//! // take a long time.
|
||||
//! let compaction_result = self.do_compaction(&state).await?;
|
||||
//! // Update the state.
|
||||
//! self.layer_map_mgr.update(|mut state| async move {
|
||||
//! // do updates to the state, return it.
|
||||
//! Ok(state)
|
||||
//! }).await?;
|
||||
//! }
|
||||
//! ```
|
||||
use anyhow::Result;
|
||||
use arc_swap::ArcSwap;
|
||||
use futures::Future;
|
||||
use std::{cmp::Reverse, collections::BinaryHeap, sync::Arc};
|
||||
|
||||
use super::layer_map::LayerMap;
|
||||
|
||||
/// Manages the storage state. Provide utility functions to modify the layer map and get an immutable reference to the
|
||||
/// layer map.
|
||||
pub struct LayerMapMgr {
|
||||
layer_map: ArcSwap<LayerMapGuard>,
|
||||
state_lock: tokio::sync::Mutex<()>,
|
||||
watermark: Arc<LayerMgrWatermark>,
|
||||
}
|
||||
|
||||
/// A guard that holds a version of the layer map. When dropped, the version is released and the watermark will be updated.
|
||||
#[derive(Clone)]
|
||||
pub struct LayerMapGuard {
|
||||
version: u64,
|
||||
layer_map: LayerMap,
|
||||
watermark: Arc<LayerMgrWatermark>,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for LayerMapGuard {
|
||||
type Target = LayerMap;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.layer_map
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for LayerMapGuard {
|
||||
fn drop(&mut self) {
|
||||
self.watermark.release(self.version);
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerMapMgr {
|
||||
/// Get the current state of the layer map.
|
||||
pub fn read(&self) -> Arc<LayerMapGuard> {
|
||||
// TODO: it is possible to use `load` to reduce the overhead of cloning the Arc, but read path usually involves
|
||||
// disk reads and layer mapping fetching, and therefore it's not a big deal to use a more optimized version
|
||||
// here.
|
||||
self.layer_map.load_full()
|
||||
}
|
||||
|
||||
/// Clone the layer map for modification.
|
||||
fn clone_for_write(
|
||||
&self,
|
||||
_state_lock_witness: &tokio::sync::MutexGuard<'_, ()>,
|
||||
) -> LayerMapGuard {
|
||||
(**self.layer_map.load()).clone()
|
||||
}
|
||||
|
||||
pub fn new(layer_map: LayerMap) -> Self {
|
||||
const INITIAL_VERSION: u64 = 0;
|
||||
let watermark = Arc::new(LayerMgrWatermark::new(INITIAL_VERSION));
|
||||
Self {
|
||||
layer_map: ArcSwap::new(Arc::new(LayerMapGuard {
|
||||
version: INITIAL_VERSION,
|
||||
layer_map,
|
||||
watermark: watermark.clone(),
|
||||
})),
|
||||
watermark,
|
||||
state_lock: tokio::sync::Mutex::new(()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the layer map.
|
||||
pub async fn update<O, F>(&self, operation: O) -> Result<()>
|
||||
where
|
||||
O: FnOnce(LayerMap) -> F,
|
||||
F: Future<Output = Result<LayerMap>>,
|
||||
{
|
||||
let state_lock = self.state_lock.lock().await;
|
||||
let mut guard = self.clone_for_write(&state_lock);
|
||||
guard.version += 1;
|
||||
let layer_map = std::mem::take(&mut guard.layer_map);
|
||||
guard.layer_map = operation(layer_map).await?;
|
||||
self.layer_map.store(Arc::new(guard));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn lowest_version_in_use(&self) -> u64 {
|
||||
self.watermark.lowest_version_in_use()
|
||||
}
|
||||
}
|
||||
|
||||
struct LayerMgrWatermarkCore {
|
||||
lowest_version_in_use: u64,
|
||||
versions_in_use: BinaryHeap<Reverse<u64>>,
|
||||
}
|
||||
|
||||
/// Computes the lowest version used by any read thread. Once a version is not used any more,
|
||||
/// we can remove all layers that are intended to be removed in that version.
|
||||
struct LayerMgrWatermark {
|
||||
core: std::sync::Mutex<LayerMgrWatermarkCore>,
|
||||
}
|
||||
|
||||
impl LayerMgrWatermark {
|
||||
fn new(initial_version: u64) -> Self {
|
||||
Self {
|
||||
core: std::sync::Mutex::new(LayerMgrWatermarkCore {
|
||||
lowest_version_in_use: initial_version,
|
||||
versions_in_use: BinaryHeap::new(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn lowest_version_in_use(&self) -> u64 {
|
||||
self.core.lock().unwrap().lowest_version_in_use
|
||||
}
|
||||
|
||||
fn release(&self, version: u64) {
|
||||
let mut core = self.core.lock().unwrap();
|
||||
match version.cmp(&core.lowest_version_in_use) {
|
||||
std::cmp::Ordering::Less => {
|
||||
if cfg!(debug_assertions) {
|
||||
// TODO(chi): this panic might not be correctly handled by the panic handler
|
||||
// given this function is called in a drop handler. We can move it to a separate
|
||||
// thread if necessary.
|
||||
panic!("release a version lower than the lowest version in use.")
|
||||
}
|
||||
}
|
||||
std::cmp::Ordering::Equal => {
|
||||
// Find the next version in use.
|
||||
let mut current_version = version + 1;
|
||||
while let Some(Reverse(next_version)) = core.versions_in_use.peek() {
|
||||
if *next_version == current_version {
|
||||
current_version += 1;
|
||||
core.versions_in_use.pop();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
core.lowest_version_in_use = current_version;
|
||||
}
|
||||
std::cmp::Ordering::Greater => {
|
||||
// This version is in use. Add it to the heap.
|
||||
core.versions_in_use.push(Reverse(version));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::{repository::Key, tenant::storage_layer::PersistentLayerDesc};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_layer_map_manage() -> Result<()> {
|
||||
let mgr = LayerMapMgr::new(Default::default());
|
||||
mgr.update(|mut map| async move {
|
||||
let mut updates = map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_img(
|
||||
TenantId::generate(),
|
||||
TimelineId::generate(),
|
||||
Key::from_i128(0)..Key::from_i128(1),
|
||||
Lsn(0),
|
||||
false,
|
||||
0,
|
||||
));
|
||||
updates.flush();
|
||||
Ok(map)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let ref_1 = mgr.read();
|
||||
|
||||
mgr.update(|mut map| async move {
|
||||
let mut updates = map.batch_update();
|
||||
updates.insert_historic(PersistentLayerDesc::new_img(
|
||||
TenantId::generate(),
|
||||
TimelineId::generate(),
|
||||
Key::from_i128(1)..Key::from_i128(2),
|
||||
Lsn(0),
|
||||
false,
|
||||
0,
|
||||
));
|
||||
updates.flush();
|
||||
Ok(map)
|
||||
})
|
||||
.await?;
|
||||
|
||||
let ref_2 = mgr.read();
|
||||
|
||||
// Modification should not be visible to the old reference.
|
||||
assert_eq!(
|
||||
ref_1
|
||||
.search(Key::from_i128(0), Lsn(1))
|
||||
.unwrap()
|
||||
.layer
|
||||
.key_range,
|
||||
Key::from_i128(0)..Key::from_i128(1)
|
||||
);
|
||||
assert!(ref_1.search(Key::from_i128(1), Lsn(1)).is_none());
|
||||
|
||||
// Modification should be visible to the new reference.
|
||||
assert_eq!(
|
||||
ref_2
|
||||
.search(Key::from_i128(0), Lsn(1))
|
||||
.unwrap()
|
||||
.layer
|
||||
.key_range,
|
||||
Key::from_i128(0)..Key::from_i128(1)
|
||||
);
|
||||
assert_eq!(
|
||||
ref_2
|
||||
.search(Key::from_i128(1), Lsn(1))
|
||||
.unwrap()
|
||||
.layer
|
||||
.key_range,
|
||||
Key::from_i128(1)..Key::from_i128(2)
|
||||
);
|
||||
|
||||
assert_eq!(mgr.lowest_version_in_use(), 1);
|
||||
drop(ref_1);
|
||||
drop(ref_2);
|
||||
assert_eq!(mgr.lowest_version_in_use(), 2);
|
||||
mgr.update(|map| async move { Ok(map) }).await?;
|
||||
assert_eq!(mgr.lowest_version_in_use(), 3);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_watermark() {
|
||||
let watermark = LayerMgrWatermark::new(0);
|
||||
assert_eq!(watermark.lowest_version_in_use(), 0);
|
||||
watermark.release(0);
|
||||
assert_eq!(watermark.lowest_version_in_use(), 1);
|
||||
watermark.release(1);
|
||||
assert_eq!(watermark.lowest_version_in_use(), 2);
|
||||
watermark.release(3);
|
||||
watermark.release(4);
|
||||
watermark.release(5);
|
||||
watermark.release(7);
|
||||
assert_eq!(watermark.lowest_version_in_use(), 2);
|
||||
watermark.release(2);
|
||||
assert_eq!(watermark.lowest_version_in_use(), 6);
|
||||
watermark.release(6);
|
||||
assert_eq!(watermark.lowest_version_in_use(), 8);
|
||||
}
|
||||
}
|
||||
@@ -222,14 +222,13 @@ impl Layer for DeltaLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
|
||||
self.desc.tenant_id,
|
||||
self.desc.timeline_id,
|
||||
self.desc.key_range.start,
|
||||
self.desc.key_range.end,
|
||||
self.desc.lsn_range.start,
|
||||
self.desc.lsn_range.end,
|
||||
self.desc.file_size
|
||||
self.desc.lsn_range.end
|
||||
);
|
||||
|
||||
if !verbose {
|
||||
|
||||
@@ -153,13 +153,12 @@ impl Layer for ImageLayer {
|
||||
/// debugging function to print out the contents of the layer
|
||||
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} size {} ----",
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} ----",
|
||||
self.desc.tenant_id,
|
||||
self.desc.timeline_id,
|
||||
self.desc.key_range.start,
|
||||
self.desc.key_range.end,
|
||||
self.lsn,
|
||||
self.desc.file_size
|
||||
self.lsn
|
||||
);
|
||||
|
||||
if !verbose {
|
||||
@@ -213,11 +212,7 @@ impl Layer for ImageLayer {
|
||||
reconstruct_state.img = Some((self.lsn, value));
|
||||
Ok(ValueReconstructResult::Complete)
|
||||
} else {
|
||||
if self.desc.is_incremental {
|
||||
Ok(ValueReconstructResult::Continue)
|
||||
} else {
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
}
|
||||
Ok(ValueReconstructResult::Missing)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -409,7 +404,7 @@ impl ImageLayer {
|
||||
timeline_id,
|
||||
filename.key_range.clone(),
|
||||
filename.lsn,
|
||||
true,
|
||||
false,
|
||||
file_size,
|
||||
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
|
||||
lsn: filename.lsn,
|
||||
@@ -441,7 +436,7 @@ impl ImageLayer {
|
||||
summary.timeline_id,
|
||||
summary.key_range,
|
||||
summary.lsn,
|
||||
true,
|
||||
false,
|
||||
metadata.len(),
|
||||
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
|
||||
lsn: summary.lsn,
|
||||
@@ -486,14 +481,12 @@ struct ImageLayerWriterInner {
|
||||
path: PathBuf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
is_incremental: bool,
|
||||
|
||||
blob_writer: WriteBlobWriter<VirtualFile>,
|
||||
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
|
||||
|
||||
start_key: Key,
|
||||
last_key: Option<Key>,
|
||||
}
|
||||
|
||||
impl ImageLayerWriterInner {
|
||||
@@ -504,8 +497,8 @@ impl ImageLayerWriterInner {
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
start_key: Key,
|
||||
is_incremental: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Create the file initially with a temporary filename.
|
||||
@@ -515,7 +508,7 @@ impl ImageLayerWriterInner {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
&ImageFileName {
|
||||
key_range: start_key..start_key, // TODO(chi): use number instead of dummy range
|
||||
key_range: key_range.clone(),
|
||||
lsn,
|
||||
},
|
||||
);
|
||||
@@ -537,12 +530,11 @@ impl ImageLayerWriterInner {
|
||||
path,
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
key_range: key_range.clone(),
|
||||
lsn,
|
||||
tree: tree_builder,
|
||||
blob_writer,
|
||||
is_incremental,
|
||||
start_key,
|
||||
last_key: None,
|
||||
};
|
||||
|
||||
Ok(writer)
|
||||
@@ -554,14 +546,7 @@ impl ImageLayerWriterInner {
|
||||
/// The page versions must be appended in blknum order.
|
||||
///
|
||||
fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
|
||||
if cfg!(debug_assertions) {
|
||||
ensure!(key >= self.start_key);
|
||||
if let Some(last_key) = self.last_key.as_ref() {
|
||||
ensure!(last_key < &key);
|
||||
}
|
||||
self.last_key = Some(key.clone());
|
||||
}
|
||||
|
||||
ensure!(self.key_range.contains(&key));
|
||||
let off = self.blob_writer.write_blob(img)?;
|
||||
|
||||
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
|
||||
@@ -574,7 +559,7 @@ impl ImageLayerWriterInner {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
fn finish(self, end_key: Key) -> anyhow::Result<ImageLayer> {
|
||||
fn finish(self) -> anyhow::Result<ImageLayer> {
|
||||
let index_start_blk =
|
||||
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
|
||||
|
||||
@@ -587,15 +572,13 @@ impl ImageLayerWriterInner {
|
||||
file.write_all(buf.as_ref())?;
|
||||
}
|
||||
|
||||
let key_range = self.start_key.clone()..end_key;
|
||||
|
||||
// Fill in the summary on blk 0
|
||||
let summary = Summary {
|
||||
magic: IMAGE_FILE_MAGIC,
|
||||
format_version: STORAGE_FORMAT_VERSION,
|
||||
tenant_id: self.tenant_id,
|
||||
timeline_id: self.timeline_id,
|
||||
key_range: key_range.clone(),
|
||||
key_range: self.key_range.clone(),
|
||||
lsn: self.lsn,
|
||||
index_start_blk,
|
||||
index_root_blk,
|
||||
@@ -610,7 +593,7 @@ impl ImageLayerWriterInner {
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
key_range.clone(),
|
||||
self.key_range.clone(),
|
||||
self.lsn,
|
||||
self.is_incremental, // for now, image layer ALWAYS covers the full range
|
||||
metadata.len(),
|
||||
@@ -644,7 +627,7 @@ impl ImageLayerWriterInner {
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
&ImageFileName {
|
||||
key_range,
|
||||
key_range: self.key_range.clone(),
|
||||
lsn: self.lsn,
|
||||
},
|
||||
);
|
||||
@@ -654,10 +637,6 @@ impl ImageLayerWriterInner {
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
self.blob_writer.size() + self.tree.borrow_writer().size()
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new image layer.
|
||||
@@ -694,7 +673,7 @@ impl ImageLayerWriter {
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
start_key: Key,
|
||||
key_range: &Range<Key>,
|
||||
lsn: Lsn,
|
||||
is_incremental: bool,
|
||||
) -> anyhow::Result<ImageLayerWriter> {
|
||||
@@ -703,8 +682,8 @@ impl ImageLayerWriter {
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
key_range,
|
||||
lsn,
|
||||
start_key,
|
||||
is_incremental,
|
||||
)?),
|
||||
})
|
||||
@@ -722,12 +701,8 @@ impl ImageLayerWriter {
|
||||
///
|
||||
/// Finish writing the image layer.
|
||||
///
|
||||
pub fn finish(mut self, end_key: Key) -> anyhow::Result<ImageLayer> {
|
||||
self.inner.take().unwrap().finish(end_key)
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.inner.as_ref().unwrap().size()
|
||||
pub fn finish(mut self) -> anyhow::Result<ImageLayer> {
|
||||
self.inner.take().unwrap().finish()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter};
|
||||
use crate::tenant::block_io::BlockReader;
|
||||
use crate::tenant::ephemeral_file::EphemeralFile;
|
||||
use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::timeline::ENABLE_TIERED_COMPACTION;
|
||||
use crate::walrecord;
|
||||
use anyhow::{ensure, Result};
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
@@ -150,8 +149,8 @@ impl Layer for InMemoryLayer {
|
||||
.unwrap_or_default();
|
||||
|
||||
println!(
|
||||
"----- in-memory layer LSNs {}-{} ----",
|
||||
self.start_lsn, end_str,
|
||||
"----- in-memory layer for tli {} LSNs {}-{} ----",
|
||||
self.timeline_id, self.start_lsn, end_str,
|
||||
);
|
||||
|
||||
if !verbose {
|
||||
@@ -342,18 +341,11 @@ impl InMemoryLayer {
|
||||
// rare though, so we just accept the potential latency hit for now.
|
||||
let inner = self.inner.read().unwrap();
|
||||
|
||||
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
|
||||
keys.sort_by_key(|k| k.0);
|
||||
|
||||
let mut delta_layer_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
if ENABLE_TIERED_COMPACTION {
|
||||
keys.first().unwrap().0.clone()
|
||||
} else {
|
||||
Key::MIN
|
||||
},
|
||||
Key::MIN,
|
||||
self.start_lsn..inner.end_lsn.unwrap(),
|
||||
)?;
|
||||
|
||||
@@ -361,6 +353,9 @@ impl InMemoryLayer {
|
||||
|
||||
let mut cursor = inner.file.block_cursor();
|
||||
|
||||
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
|
||||
keys.sort_by_key(|k| k.0);
|
||||
|
||||
for (key, vec_map) in keys.iter() {
|
||||
let key = **key;
|
||||
// Write all page versions
|
||||
@@ -371,11 +366,7 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
let delta_layer = delta_layer_writer.finish(if ENABLE_TIERED_COMPACTION {
|
||||
keys.last().unwrap().0.next()
|
||||
} else {
|
||||
Key::MAX
|
||||
})?;
|
||||
let delta_layer = delta_layer_writer.finish(Key::MAX)?;
|
||||
Ok(delta_layer)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,14 +173,13 @@ impl PersistentLayerDesc {
|
||||
|
||||
pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
|
||||
println!(
|
||||
"----- layer for keys {}-{} lsn {}-{} size {} is_delta {} is_incremental {} ----",
|
||||
"----- layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
self.key_range.start,
|
||||
self.key_range.end,
|
||||
self.lsn_range.start,
|
||||
self.lsn_range.end,
|
||||
self.file_size,
|
||||
self.is_delta,
|
||||
self.is_incremental
|
||||
self.lsn_range.end
|
||||
);
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -14,43 +14,35 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::completion;
|
||||
|
||||
use super::timeline::ENABLE_TIERED_COMPACTION;
|
||||
|
||||
/// Start per tenant background loops: compaction and gc.
|
||||
pub fn start_background_loops(
|
||||
tenant: &Arc<Tenant>,
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
) {
|
||||
let tenant_id = tenant.tenant_id;
|
||||
// start two compaction threads
|
||||
let range = if ENABLE_TIERED_COMPACTION { 0..4 } else { 0..1 };
|
||||
for cpt_id in range {
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Compaction,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
&format!("compactor for tenant {tenant_id}"),
|
||||
false,
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let background_jobs_can_start = background_jobs_can_start.cloned();
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()) },
|
||||
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
|
||||
};
|
||||
compaction_loop(tenant, cancel)
|
||||
.instrument(
|
||||
info_span!("compaction_loop", tenant_id = %tenant_id, cpt_id = %cpt_id),
|
||||
)
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::Compaction,
|
||||
Some(tenant_id),
|
||||
None,
|
||||
&format!("compactor for tenant {tenant_id}"),
|
||||
false,
|
||||
{
|
||||
let tenant = Arc::clone(tenant);
|
||||
let background_jobs_can_start = background_jobs_can_start.cloned();
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return Ok(()) },
|
||||
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
|
||||
};
|
||||
compaction_loop(tenant, cancel)
|
||||
.instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
);
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::GarbageCollector,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -198,10 +198,10 @@ impl Timeline {
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, _) = &*guard;
|
||||
let (layers, mapping) = &*guard;
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
let hist_layer = self.lcache.get_from_desc(&hist_layer);
|
||||
let hist_layer = mapping.get_from_desc(&hist_layer);
|
||||
if hist_layer.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 1144aee166...a2daebc6b4
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 1984832c74...2df2ce3744
Reference in New Issue
Block a user