Compare commits

...

60 Commits

Author SHA1 Message Date
Alex Chi Z
1ab2e4da52 fix
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-11 15:34:03 -04:00
Alex Chi Z
9cdd9a2801 fix
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-11 13:59:26 -04:00
Alex Chi Z
a265efa7a5 exact match
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-11 13:56:01 -04:00
Alex Chi Z
c0724538ef fix incremental image layer again
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-11 13:47:09 -04:00
Alex Chi Z
d558b547e8 probably better strategy?
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-10 08:04:59 -04:00
Alex Chi Z
569ed35c91 trivial move switch
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-07 15:07:33 -04:00
Alex Chi Z
f258f50b76 pagectl: separate xy margin for draw timeline
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-07 12:14:08 -04:00
Alex Chi Z
f31cc2394b compaction PoC: subcompaction (#4656)
This PR adds subcompaction support to the compaction PoC. Compaction jobs
>= 4GB are split across 4 threads.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-07 12:13:44 -04:00
Alex Chi Z
7e7cdaa3eb handle name conflict
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-05 16:26:02 -04:00
Alex Chi Z
a079d250d9 compaction PoC: trivial move compaction (#4604)
Reduces write amplification for bulk loads; might also be useful for the main branch.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-05 15:27:25 -04:00
Alex Chi Z
756319b0ce do not exclude last tier
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-07-03 14:02:53 -04:00
Alex Chi Z
8816fc98fc fix compaction algorithm
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 18:11:51 -04:00
Alex Chi Z
c3bcaa0551 rm println
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 15:02:30 -04:00
Alex Chi Z
8aede79abf fix
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 14:57:42 -04:00
Alex Chi Z
d28e309c06 fix
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 14:48:54 -04:00
Alex Chi Z
647b7a70a8 fix
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 14:31:00 -04:00
Alex Chi Z
e7955895d1 fix
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 14:18:46 -04:00
Alex Chi Z
dc9c842d21 true incremental
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 14:05:28 -04:00
Alex Chi Z
05719cb9cd debug
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 12:42:15 -04:00
Alex Chi Z
0051a6c931 debug
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 12:27:36 -04:00
Alex Chi Z
c9c40171cd fix layer map
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 11:29:41 -04:00
Alex Chi Z
376762e07e fewer logs
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 09:16:46 -04:00
Alex Chi Z
d4e262f646 delta with correct range
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-29 09:11:15 -04:00
Alex Chi Z
d279b4421e increase threshold
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-28 15:39:52 -04:00
Alex Chi Z
b1f0bbd12a add reduce num sorted run trigger
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-28 15:38:14 -04:00
Alex Chi Z
7d16a9f96f fix again
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-28 15:07:31 -04:00
Alex Chi Z
878627161c revert not
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-28 14:58:42 -04:00
Alex Chi Z
4db4f42dec correctly handle compaction
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-28 14:41:26 -04:00
Alex Chi Z
6cb149e3c3 enable tiered again
Signed-off-by: Alex Chi Z <chi@neon.tech>
2023-06-28 14:30:17 -04:00
Alex Chi
f3fdaf8ef1 parallel compaction
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 16:42:56 -04:00
Alex Chi
eb93e686ab fix deletion
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 14:47:04 -04:00
Alex Chi
2cb79ae3ff fix deletion
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 14:43:20 -04:00
Alex Chi
dfe8527806 remove assertion
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 13:52:12 -04:00
Alex Chi
335710cec6 bring back original compaction
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 13:38:02 -04:00
Alex Chi
a78008ad82 max_merge_width
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 13:30:23 -04:00
Alex Chi
30e7ffcd28 adjust compaction strategy
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-26 15:52:37 -04:00
Alex Chi
43d564ce0a incremental image layer
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-26 15:25:35 -04:00
Alex Chi
f86ff5e54b dump more
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-26 14:57:00 -04:00
Alex Chi
9ed6ad1d24 fix weak ptr
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-26 14:33:00 -04:00
Alex Chi
91f28cb516 include delta l0 in compaction, more metrics
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-26 13:56:29 -04:00
Alex Chi
0b459eb414 fix ratio compute
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 15:11:15 -04:00
Alex Chi
0865ed623c fix comment
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 15:01:25 -04:00
Alex Chi
9e0f103c7b insert at 0
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 15:00:58 -04:00
Alex Chi
9f216a78a1 print
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 15:00:10 -04:00
Alex Chi
6967b4837b fix
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 14:53:53 -04:00
Alex Chi
9b50350857 threshold = 3
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 14:37:29 -04:00
Alex Chi
8ebfa32a0c compaction l0 adds to sorted runs
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 14:24:57 -04:00
Alex Chi
9905d75715 dump file size
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 14:18:54 -04:00
Alex Chi
b0b616f3ac dump
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 14:12:33 -04:00
Alex Chi
820685fe92 remove all contents
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 13:52:34 -04:00
Alex Chi
a593d96b79 neon_local: support force init
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 13:52:28 -04:00
Alex Chi
867b656ef2 bypass ut
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-22 11:28:34 -04:00
Alex Chi
76b339b150 create partial image layers
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-21 14:38:11 -04:00
Alex Chi
9b3fa1a2e1 fix compile error
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-21 14:16:44 -04:00
Alex Chi
17781776c8 add two compaction triggers
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-21 10:27:24 -04:00
Alex Chi
5274f487e4 add tiered compaction skeleton
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-20 14:43:07 -04:00
Alex Chi
9b7747436c incremental image?
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-20 11:05:16 -04:00
Alex Chi
a2056666ae pgserver: move mapping logic to layer cache
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-14 15:07:38 -04:00
Alex Chi
fc190a2a19 resolve merge conflicts
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:56:50 -04:00
Alex Chi
faee3152f3 refactor: use LayerDesc in LayerMap (part 2)
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:54:59 -04:00
20 changed files with 1963 additions and 629 deletions

View File

@@ -308,7 +308,8 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
let mut env =
LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
env.init(pg_version)
let force = init_match.get_flag("force");
env.init(pg_version, force)
.context("Failed to initialize neon repository")?;
// Initialize pageserver, create initial tenant and timeline.
@@ -1013,6 +1014,13 @@ fn cli() -> Command {
.help("If set, the node will be a hot replica on the specified timeline")
.required(false);
let force_arg = Arg::new("force")
.value_parser(value_parser!(bool))
.long("force")
.action(ArgAction::SetTrue)
.help("Force initialization even if the repository is not empty")
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1028,6 +1036,7 @@ fn cli() -> Command {
.value_name("config"),
)
.arg(pg_version_arg.clone())
.arg(force_arg)
)
.subcommand(
Command::new("timeline")

View File

@@ -364,7 +364,7 @@ impl LocalEnv {
//
// Initialize a new Neon repository
//
pub fn init(&mut self, pg_version: u32) -> anyhow::Result<()> {
pub fn init(&mut self, pg_version: u32, force: bool) -> anyhow::Result<()> {
// check if config already exists
let base_path = &self.base_data_dir;
ensure!(
@@ -372,11 +372,29 @@ impl LocalEnv {
"repository base path is missing"
);
ensure!(
!base_path.exists(),
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
if base_path.exists() {
if force {
println!("removing all contents of '{}'", base_path.display());
// Instead of directly calling `remove_dir_all`, we keep the original dir but remove
// all of its contents. This helps if the developer has symlinked another directory (e.g.,
// S3 local SSD) to the `.neon` base directory.
for entry in std::fs::read_dir(base_path)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
fs::remove_dir_all(&path)?;
} else {
fs::remove_file(&path)?;
}
}
} else {
bail!(
"directory '{}' already exists. Perhaps already initialized? (Hint: use --force to remove all contents)",
base_path.display()
);
}
}
if !self.pg_bin_dir(pg_version)?.join("postgres").exists() {
bail!(
"Can't find postgres binary at {}",
@@ -392,7 +410,7 @@ impl LocalEnv {
}
}
fs::create_dir(base_path)?;
fs::create_dir_all(base_path)?;
// Generate keypair for JWT.
//
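For orientation, a minimal standalone sketch (simplified, not the project's actual helper) of the force-init behavior introduced above: the base directory entry itself is kept, so a symlinked `.neon` directory keeps pointing at the same target, and only its contents are removed.

use std::{fs, io, path::Path};

// Hypothetical helper illustrating the clearing loop above.
fn clear_dir_contents(base: &Path) -> io::Result<()> {
    for entry in fs::read_dir(base)? {
        let path = entry?.path();
        if path.is_dir() {
            fs::remove_dir_all(&path)?;
        } else {
            fs::remove_file(&path)?;
        }
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("neon_force_init_demo");
    fs::create_dir_all(&dir)?;
    fs::write(dir.join("config"), "stale")?;
    clear_dir_contents(&dir)?;
    assert_eq!(fs::read_dir(&dir)?.count(), 0);
    Ok(())
}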

View File

@@ -1,22 +1,23 @@
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
let mut layer_map = LayerMap::<LayerDescriptor>::default();
fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
let mut layer_map = LayerMap::default();
let mut min_lsn = Lsn(u64::MAX);
let mut max_lsn = Lsn(0);
@@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
updates.insert_historic(layer.layer_desc().clone());
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
}
/// Construct a layer map query pattern for benchmarks
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
// For each image layer we query one of the pages contained, at LSN right
// before the image layer was created. This gives us a somewhat uniform
// coverage of both the lsn and key space because image layers have
@@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn
// Construct a partitioning for testing get_difficulty map when we
// don't have an exact result of `collect_keyspace` to work with.
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
let mut parts = Vec::new();
// We add a partition boundary at the start of each image layer,
@@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
for i in 0..100_000 {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = LayerDescriptor {
key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
lsn: Lsn(i)..Lsn(i + 1),
is_incremental: false,
short_id: format!("Layer {}", i),
};
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
TenantId::generate(),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
false,
0,
));
updates.insert_historic(layer.layer_desc().clone());
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -117,7 +117,8 @@ pub fn main() -> Result<()> {
let mut lsn_diff = (lsn_end - lsn_start) as f32;
let mut fill = Fill::None;
let mut margin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
let mut ymargin = 0.05 * lsn_diff; // Height-dependent margin to disambiguate overlapping deltas
let xmargin = 0.05; // Fixed margin along the key axis to disambiguate overlapping deltas
let mut lsn_offset = 0.0;
// Fill in and thicken rectangle if it's an
@@ -128,7 +129,7 @@ pub fn main() -> Result<()> {
num_images += 1;
lsn_diff = 0.3;
lsn_offset = -lsn_diff / 2.0;
margin = 0.05;
ymargin = 0.05;
fill = Fill::Color(rgb(0, 0, 0));
}
Ordering::Greater => panic!("Invalid lsn range {}-{}", lsn_start, lsn_end),
@@ -137,10 +138,10 @@ pub fn main() -> Result<()> {
println!(
" {}",
rectangle(
key_start as f32 + stretch * margin,
stretch * (lsn_max as f32 - (lsn_end as f32 - margin - lsn_offset)),
key_diff as f32 - stretch * 2.0 * margin,
stretch * (lsn_diff - 2.0 * margin)
key_start as f32 + stretch * xmargin,
stretch * (lsn_max as f32 - (lsn_end as f32 - ymargin - lsn_offset)),
key_diff as f32 - stretch * 2.0 * xmargin,
stretch * (lsn_diff - 2.0 * ymargin)
)
.fill(fill)
.stroke(Stroke::Color(rgb(0, 0, 0), 0.1))

View File

@@ -53,6 +53,33 @@ pub enum StorageTimeOperation {
CreateTenant,
}
pub static NUM_TIERS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_storage_tiers_num",
"Number of sorted runs",
&["tenant_id", "timeline_id"],
)
.expect("failed to define a metric")
});
pub static NUM_COMPACTIONS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_storage_compaction_num",
"Number of ongoing compactions",
&["tenant_id", "timeline_id"],
)
.expect("failed to define a metric")
});
pub static STORAGE_PHYSICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_storage_physical_size_sum",
"Physical size of different types of storage files",
&["type", "tenant_id", "timeline_id"],
)
.expect("failed to define a metric")
});
pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
register_counter_vec!(
"pageserver_storage_operations_seconds_sum",
@@ -392,6 +419,8 @@ const STORAGE_IO_TIME_OPERATIONS: &[&str] = &[
const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
pub const STORAGE_PHYSICAL_SIZE_FILE_TYPE: &[&str] = &["image", "delta", "partial-image"];
pub static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_io_operations_seconds",
@@ -773,6 +802,8 @@ pub struct TimelineMetrics {
pub persistent_bytes_written: IntCounter,
pub evictions: IntCounter,
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
pub num_tiers: IntGauge,
pub num_compactions: IntGauge,
}
impl TimelineMetrics {
@@ -838,6 +869,12 @@ impl TimelineMetrics {
.unwrap();
let evictions_with_low_residence_duration =
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
let num_tiers = NUM_TIERS
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let num_compactions = NUM_COMPACTIONS
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
TimelineMetrics {
tenant_id,
@@ -864,6 +901,8 @@ impl TimelineMetrics {
evictions_with_low_residence_duration,
),
read_num_fs_layers,
num_tiers,
num_compactions,
}
}
}
@@ -884,6 +923,7 @@ impl Drop for TimelineMetrics {
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
let _ = EVICTIONS.remove_label_values(&[tenant_id, timeline_id]);
let _ = READ_NUM_FS_LAYERS.remove_label_values(&[tenant_id, timeline_id]);
let _ = STORAGE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
self.evictions_with_low_residence_duration
.write()
@@ -906,6 +946,9 @@ impl Drop for TimelineMetrics {
for op in SMGR_QUERY_TIME_OPERATIONS {
let _ = SMGR_QUERY_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
}
for ty in STORAGE_PHYSICAL_SIZE_FILE_TYPE {
let _ = STORAGE_PHYSICAL_SIZE.remove_label_values(&[ty, tenant_id, timeline_id]);
}
}
}
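For context, the gauges added in this file follow the usual register / label / cleanup pattern of the `prometheus` crate that the metrics module builds on. A minimal standalone sketch (hypothetical metric name, not one of the ones above):

use once_cell::sync::Lazy;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

// Hypothetical gauge; the real ones above are NUM_TIERS, NUM_COMPACTIONS, etc.
static DEMO_TIERS: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "demo_storage_tiers_num",
        "Number of sorted runs",
        &["tenant_id", "timeline_id"]
    )
    .expect("failed to define a metric")
});

fn main() {
    // Per-timeline handle, as TimelineMetrics keeps for each gauge.
    let tiers = DEMO_TIERS.with_label_values(&["tenant-a", "timeline-1"]);
    tiers.set(3);
    tiers.add(1);
    // On timeline drop, the label pair is removed so stale series disappear.
    let _ = DEMO_TIERS.remove_label_values(&["tenant-a", "timeline-1"]);
}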

View File

@@ -85,6 +85,7 @@ pub mod blob_io;
pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_cache;
pub mod layer_map;
pub mod manifest;
@@ -562,6 +563,7 @@ impl Tenant {
.layers
.read()
.await
.0
.iter_historic_layers()
.next()
.is_some(),
@@ -1558,7 +1560,7 @@ impl Tenant {
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
let layer_removal_guard = timeline.lcache.delete_guard_write().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled

View File

@@ -0,0 +1,198 @@
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
use super::Timeline;
use crate::metrics::{STORAGE_PHYSICAL_SIZE, STORAGE_PHYSICAL_SIZE_FILE_TYPE};
use crate::tenant::layer_map::{self, LayerMap};
use anyhow::Result;
use std::sync::{Mutex, Weak};
use std::{collections::HashMap, sync::Arc};
use utils::id::{TenantId, TimelineId};
pub struct LayerCache {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<RwLock>` because we need an owned
/// lock guard in functions that are spawned onto the tokio I/O pool (which requires `'static`).
pub layers_removal_lock: Arc<tokio::sync::RwLock<()>>,
/// We need this lock because we currently have no way to prevent GC/compaction from removing files that are in use.
/// Once we do reference counting on the Arcs to prevent that, this lock can be removed.
pub layers_operation_lock: Arc<tokio::sync::RwLock<()>>,
/// Will be useful when we move evict / download to layer cache.
#[allow(unused)]
timeline: Weak<Timeline>,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub tenant_id_str: String,
pub timeline_id_str: String,
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
}
pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>);
pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>);
#[derive(Clone)]
pub struct DeleteGuardRead(Arc<tokio::sync::OwnedRwLockReadGuard<()>>);
#[derive(Clone)]
pub struct DeleteGuardWrite(Arc<tokio::sync::OwnedRwLockWriteGuard<()>>);
impl LayerCache {
pub fn new(timeline: Weak<Timeline>, tenant_id: TenantId, timeline_id: TimelineId) -> Self {
Self {
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
layers_removal_lock: Arc::new(tokio::sync::RwLock::new(())),
mapping: Mutex::new(HashMap::new()),
timeline,
tenant_id,
timeline_id,
tenant_id_str: tenant_id.to_string(),
timeline_id_str: timeline_id.to_string(),
}
}
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
let guard = self.mapping.lock().unwrap();
guard.get(&desc.key()).expect("not found").clone()
}
/// This function mimics the original behavior of the `layers` lock in `Timeline`. It can be removed once we ensure
/// we never delete files that are still being read.
pub async fn layer_in_use_write(&self) -> LayerInUseWrite {
LayerInUseWrite(self.layers_operation_lock.clone().write_owned().await)
}
/// This function mimics the original behavior of the `layers` lock in `Timeline`. It can be removed once we ensure
/// we never delete files that are still being read.
pub async fn layer_in_use_read(&self) -> LayerInUseRead {
LayerInUseRead(self.layers_operation_lock.clone().read_owned().await)
}
/// Ensures only one of compaction / gc can happen at a time.
pub async fn delete_guard_read(&self) -> DeleteGuardRead {
DeleteGuardRead(Arc::new(
self.layers_removal_lock.clone().read_owned().await,
))
}
/// Ensures only one of compaction / gc can happen at a time.
pub async fn delete_guard_write(&self) -> DeleteGuardWrite {
DeleteGuardWrite(Arc::new(
self.layers_removal_lock.clone().write_owned().await,
))
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
self.metrics_size_sub(&*layer);
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
self.metrics_size_add(&*layer);
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
self.metrics_size_add(&*layer);
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within read path.
pub fn replace_and_verify(
&self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
let mut guard = self.mapping.lock().unwrap();
let key: PersistentLayerKey = expected.layer_desc().key();
let other = new.layer_desc().key();
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
));
anyhow::ensure!(
key == other,
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) {
anyhow::ensure!(
layer_map::compare_arced_layers(&expected, layer),
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
);
}
}
/// Called within the write path. Compaction and image layer creation use this to add new layers.
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
self.metrics_size_add(&*layer);
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within the write path. GC and compaction use this to remove layers and delete them on disk.
/// The file-deletion logic will be moved here later.
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
self.metrics_size_sub(&*layer);
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
fn metrics_size_add(&self, layer: &dyn PersistentLayer) {
STORAGE_PHYSICAL_SIZE
.with_label_values(&[
Self::get_layer_type(layer),
&self.tenant_id_str,
&self.timeline_id_str,
])
.add(layer.file_size() as i64);
}
fn metrics_size_sub(&self, layer: &dyn PersistentLayer) {
STORAGE_PHYSICAL_SIZE
.with_label_values(&[
Self::get_layer_type(layer),
&self.tenant_id_str,
&self.timeline_id_str,
])
.sub(layer.file_size() as i64);
}
fn get_layer_type(layer: &dyn PersistentLayer) -> &'static str {
if layer.layer_desc().is_delta() {
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[1]
} else if layer.layer_desc().is_incremental() {
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[2]
} else {
&STORAGE_PHYSICAL_SIZE_FILE_TYPE[0]
}
}
}
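As a rough mental model for the file above, a self-contained sketch (simplified stand-in types, not the pageserver's) of the core idea: a keyed map of layer handles whose insert and remove paths also keep a physical-size counter in sync.

use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct LayerKey(String); // stand-in for PersistentLayerKey

#[derive(Clone)]
struct LayerHandle {
    key: LayerKey,
    file_size: u64,
}

struct LayerCacheSketch {
    mapping: Mutex<HashMap<LayerKey, LayerHandle>>,
    physical_size: Mutex<i64>, // stand-in for the STORAGE_PHYSICAL_SIZE gauge
}

impl LayerCacheSketch {
    fn create_new_layer(&self, layer: LayerHandle) {
        *self.physical_size.lock().unwrap() += layer.file_size as i64;
        self.mapping.lock().unwrap().insert(layer.key.clone(), layer);
    }

    fn delete_layer(&self, key: &LayerKey) {
        if let Some(layer) = self.mapping.lock().unwrap().remove(key) {
            *self.physical_size.lock().unwrap() -= layer.file_size as i64;
        }
    }
}

fn main() {
    let cache = LayerCacheSketch {
        mapping: Mutex::new(HashMap::new()),
        physical_size: Mutex::new(0),
    };
    let key = LayerKey("image-000..010@lsn-42".into());
    cache.create_new_layer(LayerHandle { key: key.clone(), file_size: 8192 });
    assert_eq!(*cache.physical_size.lock().unwrap(), 8192);
    cache.delete_layer(&key);
    assert_eq!(*cache.physical_size.lock().unwrap(), 0);
}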

View File

@@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Context;
use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
pub use historic_layer_coverage::{LayerKey, Replacement};
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::PersistentLayerKey;
///
/// LayerMap tracks what layers exist on a timeline.
///
pub struct LayerMap<L: ?Sized> {
#[derive(Default)]
pub struct LayerMap {
//
// 'open_layer' holds the current InMemoryLayer that is accepting new
// records. If it is None, 'next_open_layer_at' will be set instead, indicating
@@ -96,22 +94,56 @@ pub struct LayerMap<L: ?Sized> {
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
/// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
/// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
/// RemoteLayer will be removed.
mapping: HashMap<PersistentLayerKey, Arc<L>>,
/// All sorted runs. For tiered compaction.
pub sorted_runs: SortedRuns,
}
impl<L: ?Sized> Default for LayerMap<L> {
fn default() -> Self {
Self {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
mapping: HashMap::default(),
}
#[derive(Default)]
pub struct SortedRuns {
pub runs: Vec<(usize, Vec<Arc<PersistentLayerDesc>>)>,
next_tier_id: usize,
}
impl SortedRuns {
/// Create a new sorted run and insert it at the top of the LSM tree.
pub fn create_new_run(&mut self, layers: Vec<Arc<PersistentLayerDesc>>) -> usize {
let tier_id = self.next_tier_id();
self.runs.insert(0, (tier_id, layers));
tier_id
}
/// Create a new sorted run and insert it at the bottom of the LSM tree.
pub fn create_new_bottom_run(&mut self, layers: Vec<Arc<PersistentLayerDesc>>) -> usize {
let tier_id = self.next_tier_id();
self.runs.push((tier_id, layers));
tier_id
}
pub fn compute_tier_sizes(&self) -> Vec<(usize, u64)> {
self.runs
.iter()
.map(|(tier_id, layers)| (*tier_id, layers.iter().map(|layer| layer.file_size()).sum()))
.collect::<Vec<_>>()
}
/// Remove a sorted run from the LSM tree.
pub fn remove_run(&mut self, tier_id: usize) {
self.runs.retain(|(id, _)| *id != tier_id);
}
/// Insert a sorted run at the given position in the LSM tree.
pub fn insert_run_at(&mut self, idx: usize, layers: Vec<Arc<PersistentLayerDesc>>) {
unimplemented!()
}
pub fn num_of_tiers(&self) -> usize {
self.runs.len()
}
pub fn next_tier_id(&mut self) -> usize {
let ret = self.next_tier_id;
self.next_tier_id += 1;
ret
}
}
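To make the intent of `SortedRuns` concrete, here is a standalone sketch of one possible "too many sorted runs" trigger (a hypothetical policy for illustration only; the PoC's actual strategy lives in the compaction code, not here):

/// A sorted run, reduced to (tier id, total size of its layers in bytes),
/// i.e. roughly what compute_tier_sizes() above returns.
type Run = (usize, u64);

/// Hypothetical trigger: once there are more than `max_tiers` runs, pick the
/// smallest runs to merge so we get back under the threshold.
fn pick_compaction(runs: &[Run], max_tiers: usize) -> Option<Vec<usize>> {
    if runs.len() <= max_tiers {
        return None; // nothing to do yet
    }
    let mut by_size: Vec<Run> = runs.to_vec();
    by_size.sort_by_key(|(_, size)| *size);
    let n_to_merge = runs.len() - max_tiers + 1;
    Some(by_size.iter().take(n_to_merge).map(|(id, _)| *id).collect())
}

fn main() {
    let runs = vec![(0, 4 << 20), (1, 64 << 20), (2, 1 << 20), (3, 2 << 20)];
    // With a threshold of 3 tiers, the two smallest runs (tiers 2 and 3) are merged.
    assert_eq!(pick_compaction(&runs, 3), Some(vec![2, 3]));
}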
@@ -120,24 +152,30 @@ impl<L: ?Sized> Default for LayerMap<L> {
/// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly.
#[must_use]
pub struct BatchedUpdates<'a, L: ?Sized + Layer> {
pub struct BatchedUpdates<'a> {
// While we hold this exclusive reference to the layer map the type checker
// will prevent us from accidentally reading any unflushed updates.
layer_map: &'a mut LayerMap<L>,
layer_map: &'a mut LayerMap,
}
/// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing.
impl<L> BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
impl BatchedUpdates<'_> {
///
/// Insert an on-disk layer.
///
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.layer_map.insert_historic_noflush(layer_desc, layer)
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.insert_historic_new(layer_desc) // insert into layer map without populating tiering structure
}
pub fn insert_historic_new(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.insert_historic_noflush(layer_desc)
}
/// Get a reference to the current sorted runs.
pub fn sorted_runs(&mut self) -> &mut SortedRuns {
&mut self.layer_map.sorted_runs
}
///
@@ -145,31 +183,12 @@ where
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.layer_map.remove_historic_noflush(layer_desc, layer)
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.remove_historic_new(layer_desc) // remove from layer map without updating tiering structure
}
/// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.replace_historic_noflush(expected_desc, expected, new_desc, new)
pub fn remove_historic_new(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.remove_historic_noflush(layer_desc)
}
// We will flush on drop anyway, but this method makes it
@@ -185,25 +204,19 @@ where
// than panic later or read without flushing.
//
// TODO maybe warn if flush hasn't explicitly been called
impl<L> Drop for BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
impl Drop for BatchedUpdates<'_> {
fn drop(&mut self) {
self.layer_map.flush_updates();
}
}
/// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> {
pub layer: Arc<L>,
pub struct SearchResult {
pub layer: Arc<PersistentLayerDesc>,
pub lsn_floor: Lsn,
}
impl<L> LayerMap<L>
where
L: ?Sized + Layer,
{
impl LayerMap {
///
/// Find the latest layer (by lsn.end) that covers the given
/// 'key', with lsn.start < 'end_lsn'.
@@ -235,16 +248,29 @@ where
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
self.search_incremental(key, end_lsn, false)
}
pub fn search_incremental(
&self,
key: Key,
end_lsn: Lsn,
exclude_image: bool,
) -> Option<SearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128());
let latest_image = if exclude_image {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 2)?;
version.image_coverage.query(key.to_i128())
} else {
version.image_coverage.query(key.to_i128())
};
match (latest_delta, latest_image) {
(None, None) => None,
(None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start;
let image = self.get_layer_from_mapping(&image.key()).clone();
let lsn_floor = image.get_lsn_range().end;
Some(SearchResult {
layer: image,
lsn_floor,
@@ -252,7 +278,6 @@ where
}
(Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start;
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult {
layer: delta,
lsn_floor,
@@ -263,15 +288,13 @@ where
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match {
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult {
layer: image,
lsn_floor: img_lsn,
lsn_floor: img_lsn + 1,
})
} else {
let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult {
layer: delta,
lsn_floor,
@@ -282,7 +305,7 @@ where
}
/// Start a batch of updates, applied on drop
pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> {
pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
BatchedUpdates { layer_map: self }
}
@@ -292,48 +315,32 @@ where
/// Helper function for BatchedUpdates::insert_historic
///
/// TODO(chi): remove L generic so that we do not need to pass layer object.
pub(self) fn insert_historic_noflush(
&mut self,
layer_desc: PersistentLayerDesc,
layer: Arc<L>,
) {
self.mapping.insert(layer_desc.key(), layer.clone());
pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
// TODO: See #3869, resulting #4088, attempted fix and repro #4094
if Self::is_l0(&layer) {
if Self::is_l0(&layer_desc) {
self.l0_delta_layers.push(layer_desc.clone().into());
}
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
historic_layer_coverage::LayerKey::from(&layer_desc),
layer_desc.into(),
);
}
fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
let layer = self
.mapping
.get(key)
.with_context(|| format!("{key:?}"))
.expect("inconsistent layer mapping");
layer
}
///
/// Remove an on-disk layer from the map.
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(&*layer));
if Self::is_l0(&layer) {
.remove(historic_layer_coverage::LayerKey::from(&layer_desc));
let layer_key = layer_desc.key();
if Self::is_l0(&layer_desc) {
let len_before = self.l0_delta_layers.len();
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| {
!Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
});
l0_delta_layers.retain(|other| other.key() != layer_key);
self.l0_delta_layers = l0_delta_layers;
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
@@ -344,69 +351,6 @@ where
"failed to locate removed historic layer from l0_delta_layers"
);
}
self.mapping.remove(&layer_desc.key());
}
pub(self) fn replace_historic_noflush(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = Self::is_l0(&new);
anyhow::ensure!(
key == other,
"expected and new must have equal LayerKeys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
);
let l0_index = if expected_l0 {
// find the index in case replace worked, we need to replace that as well
let pos = self.l0_delta_layers.iter().position(|slot| {
Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
});
if pos.is_none() {
return Ok(Replacement::NotFound);
}
pos
} else {
None
};
let new_desc = Arc::new(new_desc);
let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
**existing == expected_desc
});
if let Replacement::Replaced { .. } = &replaced {
self.mapping.remove(&expected_desc.key());
self.mapping.insert(new_desc.key(), new);
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new_desc;
}
}
let replaced = match replaced {
Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
Replacement::NotFound => Replacement::NotFound,
Replacement::RemovalBuffered => Replacement::RemovalBuffered,
Replacement::Unexpected(x) => {
Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
}
};
Ok(replaced)
}
/// Helper function for BatchedUpdates::drop.
@@ -454,10 +398,8 @@ where
Ok(true)
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
self.historic.iter()
}
///
@@ -472,7 +414,7 @@ where
&self,
key_range: &Range<Key>,
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
let version = match self.historic.get().unwrap().get_version(lsn.0) {
Some(v) => v,
None => return Ok(vec![]),
@@ -482,36 +424,26 @@ where
let end = key_range.end.to_i128();
// Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
let mut current_key = start;
let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
coverage.push((kr, current_val.take()));
current_key = change_key;
current_val = change_val.clone();
}
// Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push((
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
coverage.push((kr, current_val.take()));
Ok(coverage)
}
pub fn is_l0(layer: &L) -> bool {
pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
}
@@ -537,7 +469,7 @@ where
/// TODO The optimal number should probably be slightly higher than 1, but to
/// implement that we need to plumb a lot more context into this function
/// than just the current partition_range.
pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool {
pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
// Case 1
if !Self::is_l0(layer) {
return true;
@@ -595,9 +527,7 @@ where
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count =
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?;
@@ -620,9 +550,7 @@ where
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count =
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max(
@@ -772,12 +700,8 @@ where
}
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
Ok(self
.l0_delta_layers
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
.collect())
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
Ok(self.l0_delta_layers.to_vec())
}
/// debugging function to print out the contents of the layer map
@@ -795,104 +719,95 @@ where
frozen_layer.dump(verbose, ctx)?;
}
println!("historic_layers:");
for layer in self.iter_historic_layers() {
println!("l0_deltas:");
for layer in &self.l0_delta_layers {
layer.dump(verbose, ctx)?;
}
println!("sorted_runs:");
for (lvl, (tier_id, layer)) in self.sorted_runs.runs.iter().enumerate() {
println!("tier {}", tier_id);
for layer in layer {
layer.dump(verbose, ctx)?;
}
}
println!("End dump LayerMap");
Ok(())
}
}
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
#[inline(always)]
pub fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
// "dyn Trait" objects are "fat pointers" in that they have two components:
// - pointer to the object
// - pointer to the vtable
//
// rust does not provide a guarantee that these vtables are unique, but however
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
// pointer and the vtable need to be equal.
//
// See: https://github.com/rust-lang/rust/issues/103763
//
// A future version of rust will most likely use this form below, where we cast each
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
// not affect the comparison.
//
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
///
/// If comparing persistent layers, ALWAYS compare the layer descriptor key.
#[inline(always)]
pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
// "dyn Trait" objects are "fat pointers" in that they have two components:
// - pointer to the object
// - pointer to the vtable
//
// rust does not provide a guarantee that these vtables are unique, but however
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
// pointer and the vtable need to be equal.
//
// See: https://github.com/rust-lang/rust/issues/103763
//
// A future version of rust will most likely use this form below, where we cast each
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
// not affect the comparison.
//
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
left == right
}
left == right
}
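The fat-pointer caveat described in the comment above can be seen in a tiny standalone example (illustrative only):

use std::sync::Arc;

trait Layer {}
struct Dummy;
impl Layer for Dummy {}

// Compare only the object address, discarding the vtable pointer, exactly as
// compare_arced_layers does above.
fn same_object<T: ?Sized>(left: &Arc<T>, right: &Arc<T>) -> bool {
    Arc::as_ptr(left) as *const () == Arc::as_ptr(right) as *const ()
}

fn main() {
    let a: Arc<dyn Layer> = Arc::new(Dummy);
    let b = Arc::clone(&a);
    let c: Arc<dyn Layer> = Arc::new(Dummy);
    assert!(same_object(&a, &b));
    assert!(!same_object(&a, &c));
}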
#[cfg(test)]
mod tests {
use super::{LayerMap, Replacement};
use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
use super::LayerMap;
use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
use std::str::FromStr;
use std::sync::Arc;
mod l0_delta_layers_updated {
use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use super::*;
#[test]
#[ignore]
fn for_full_range_delta() {
// l0_delta_layers are used by compaction, and should observe all buffered updates
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true
)
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true
)
}
#[test]
#[ignore]
fn for_non_full_range_delta() {
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range
false
)
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range
false
)
}
#[test]
#[ignore]
fn for_image() {
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images
false
)
}
#[test]
fn replacing_missing_l0_is_notfound() {
// original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
// however only happen for precondition failures.
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
let layer = LayerFileName::from_str(layer).unwrap();
let layer = LayerDescriptor::from(layer);
// same skeleton construction; see scenario below
let not_found = Arc::new(layer.clone());
let new_version = Arc::new(layer);
let mut map = LayerMap::default();
let res = map.batch_update().replace_historic(
not_found.get_persistent_layer_desc(),
&not_found,
new_version.get_persistent_layer_desc(),
new_version,
);
assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images
false
)
}
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
@@ -906,46 +821,31 @@ mod tests {
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
assert!(!LayerMap::compare_arced_layers(&remote, &downloaded));
assert_eq!(remote.layer_desc(), downloaded.layer_desc());
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update()
.insert_historic(remote.get_persistent_layer_desc(), remote.clone());
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map
.batch_update()
.replace_historic(
remote.get_persistent_layer_desc(),
&remote,
downloaded.get_persistent_layer_desc(),
downloaded.clone(),
)
.expect("name derived attributes are the same");
assert!(
matches!(replaced, Replacement::Replaced { .. }),
"{replaced:?}"
.insert_historic(remote.layer_desc().clone());
assert_eq!(
count_layer_in(&map, remote.layer_desc()),
expected_in_counts
);
assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);
map.batch_update()
.remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone());
assert_eq!(count_layer_in(&map, &downloaded), (0, 0));
.remove_historic(downloaded.layer_desc().clone());
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
}
fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) {
fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
let historic = map
.iter_historic_layers()
.filter(|x| LayerMap::compare_arced_layers(x, layer))
.filter(|x| x.key() == layer.key())
.count();
let l0s = map
.get_level0_deltas()
.expect("why does this return a result");
let l0 = l0s
.iter()
.filter(|x| LayerMap::compare_arced_layers(x, layer))
.count();
let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
(historic, l0)
}

View File

@@ -3,6 +3,8 @@ use std::ops::Range;
use tracing::info;
use crate::tenant::storage_layer::PersistentLayerDesc;
use super::layer_coverage::LayerCoverageTuple;
/// Layers in this module are identified and indexed by this data.
@@ -41,14 +43,14 @@ impl Ord for LayerKey {
}
}
impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerKey {
fn from(layer: &'a L) -> Self {
impl From<&PersistentLayerDesc> for LayerKey {
fn from(layer: &PersistentLayerDesc) -> Self {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
is_image: !layer.is_delta,
}
}
}
@@ -467,6 +469,11 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
///
/// Returns a `Replacement` value describing the outcome; only the case of
/// `Replacement::Replaced` modifies the map and requires a rebuild.
///
/// This function is unlikely to be used in the future because LayerMap now only records
/// layer descriptors; entries in the layer map are only ever added or removed, never replaced.
#[allow(dead_code)]
pub fn replace<F>(
&mut self,
layer_key: &LayerKey,

View File

@@ -176,13 +176,10 @@ impl LayerAccessStats {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer<L>(
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
pub(crate) fn for_loading_layer(
layer_map_lock_held_witness: &BatchedUpdates<'_>,
status: LayerResidenceStatus,
) -> Self
where
L: ?Sized + Layer,
{
) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
layer_map_lock_held_witness,
@@ -197,14 +194,11 @@ impl LayerAccessStats {
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change<L>(
pub(crate) fn clone_for_residence_change(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
layer_map_lock_held_witness: &BatchedUpdates<'_>,
new_status: LayerResidenceStatus,
) -> LayerAccessStats
where
L: ?Sized + Layer,
{
) -> LayerAccessStats {
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
@@ -232,14 +226,12 @@ impl LayerAccessStats {
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
///
pub(crate) fn record_residence_event<L>(
pub(crate) fn record_residence_event(
&self,
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
_layer_map_lock_held_witness: &BatchedUpdates<'_>,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) where
L: ?Sized + Layer,
{
) {
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner
@@ -473,94 +465,125 @@ pub fn downcast_remote_layer(
}
}
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
pub key: Range<Key>,
pub lsn: Range<Lsn>,
pub is_incremental: bool,
pub short_id: String,
}
pub mod tests {
use super::*;
impl LayerDescriptor {
/// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta,
/// and the tenant / timeline id does not matter.
pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc {
PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
self.key.clone(),
self.lsn.clone(),
233,
)
}
}
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
base: PersistentLayerDesc,
}
fn get_lsn_range(&self) -> Range<Lsn> {
self.lsn.clone()
}
fn is_incremental(&self) -> bool {
self.is_incremental
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn short_id(&self) -> String {
self.short_id.clone()
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
let short_id = value.to_string();
LayerDescriptor {
key: value.key_range,
lsn: value.lsn_range,
is_incremental: true,
short_id,
impl From<PersistentLayerDesc> for LayerDescriptor {
fn from(base: PersistentLayerDesc) -> Self {
Self { base }
}
}
}
impl From<ImageFileName> for LayerDescriptor {
fn from(value: ImageFileName) -> Self {
let short_id = value.to_string();
let lsn = value.lsn_as_range();
LayerDescriptor {
key: value.key_range,
lsn,
is_incremental: false,
short_id,
impl Layer for LayerDescriptor {
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_key_range(&self) -> Range<Key> {
self.layer_desc().key_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_lsn_range(&self) -> Range<Lsn> {
self.layer_desc().lsn_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
}
}
}
impl From<LayerFileName> for LayerDescriptor {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),
LayerFileName::Image(i) => Self::from(i),
impl PersistentLayer for LayerDescriptor {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.base
}
fn local_path(&self) -> Option<PathBuf> {
unimplemented!()
}
fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
unimplemented!()
}
fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
unimplemented!()
}
fn delete_resident_layer_file(&self) -> Result<()> {
unimplemented!()
}
fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
unimplemented!()
}
fn access_stats(&self) -> &LayerAccessStats {
unimplemented!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn_range,
233,
),
}
}
}
impl From<ImageFileName> for LayerDescriptor {
fn from(value: ImageFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_img(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn,
false,
233,
),
}
}
}
impl From<LayerFileName> for LayerDescriptor {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),
LayerFileName::Image(i) => Self::from(i),
}
}
}
}

View File

@@ -222,13 +222,14 @@ impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.desc.lsn_range.start,
self.desc.lsn_range.end
self.desc.lsn_range.end,
self.desc.file_size
);
if !verbose {

View File

@@ -153,12 +153,13 @@ impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
println!(
"----- image layer for ten {} tli {} key {}-{} at {} ----",
"----- image layer for ten {} tli {} key {}-{} at {} size {} ----",
self.desc.tenant_id,
self.desc.timeline_id,
self.desc.key_range.start,
self.desc.key_range.end,
self.lsn
self.lsn,
self.desc.file_size
);
if !verbose {
@@ -212,7 +213,11 @@ impl Layer for ImageLayer {
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
} else {
Ok(ValueReconstructResult::Missing)
if self.desc.is_incremental {
Ok(ValueReconstructResult::Continue)
} else {
Ok(ValueReconstructResult::Missing)
}
}
}
@@ -404,7 +409,7 @@ impl ImageLayer {
timeline_id,
filename.key_range.clone(),
filename.lsn,
false,
true,
file_size,
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: filename.lsn,
@@ -436,7 +441,7 @@ impl ImageLayer {
summary.timeline_id,
summary.key_range,
summary.lsn,
false,
true,
metadata.len(),
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
lsn: summary.lsn,
@@ -481,12 +486,14 @@ struct ImageLayerWriterInner {
path: PathBuf,
timeline_id: TimelineId,
tenant_id: TenantId,
key_range: Range<Key>,
lsn: Lsn,
is_incremental: bool,
blob_writer: WriteBlobWriter<VirtualFile>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
start_key: Key,
last_key: Option<Key>,
}
impl ImageLayerWriterInner {
@@ -497,8 +504,8 @@ impl ImageLayerWriterInner {
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_id: TenantId,
-        key_range: &Range<Key>,
         lsn: Lsn,
+        start_key: Key,
         is_incremental: bool,
     ) -> anyhow::Result<Self> {
         // Create the file initially with a temporary filename.
@@ -508,7 +515,7 @@ impl ImageLayerWriterInner {
             timeline_id,
             tenant_id,
             &ImageFileName {
-                key_range: key_range.clone(),
+                key_range: start_key..start_key, // TODO(chi): use number instead of dummy range
                 lsn,
             },
         );
@@ -530,11 +537,12 @@ impl ImageLayerWriterInner {
             path,
             timeline_id,
             tenant_id,
-            key_range: key_range.clone(),
             lsn,
             tree: tree_builder,
             blob_writer,
             is_incremental,
+            start_key,
+            last_key: None,
         };
         Ok(writer)
@@ -546,7 +554,14 @@ impl ImageLayerWriterInner {
     /// The page versions must be appended in blknum order.
     ///
     fn put_image(&mut self, key: Key, img: &[u8]) -> anyhow::Result<()> {
-        ensure!(self.key_range.contains(&key));
+        if cfg!(debug_assertions) {
+            ensure!(key >= self.start_key);
+            if let Some(last_key) = self.last_key.as_ref() {
+                ensure!(last_key < &key);
+            }
+            self.last_key = Some(key.clone());
+        }
         let off = self.blob_writer.write_blob(img)?;

         let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
@@ -559,7 +574,7 @@ impl ImageLayerWriterInner {
     ///
     /// Finish writing the image layer.
     ///
-    fn finish(self) -> anyhow::Result<ImageLayer> {
+    fn finish(self, end_key: Key) -> anyhow::Result<ImageLayer> {
         let index_start_blk =
             ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
@@ -572,13 +587,15 @@ impl ImageLayerWriterInner {
             file.write_all(buf.as_ref())?;
         }

+        let key_range = self.start_key.clone()..end_key;
+
         // Fill in the summary on blk 0
         let summary = Summary {
             magic: IMAGE_FILE_MAGIC,
             format_version: STORAGE_FORMAT_VERSION,
             tenant_id: self.tenant_id,
             timeline_id: self.timeline_id,
-            key_range: self.key_range.clone(),
+            key_range: key_range.clone(),
             lsn: self.lsn,
             index_start_blk,
             index_root_blk,
@@ -593,7 +610,7 @@ impl ImageLayerWriterInner {
         let desc = PersistentLayerDesc::new_img(
             self.tenant_id,
             self.timeline_id,
-            self.key_range.clone(),
+            key_range.clone(),
             self.lsn,
             self.is_incremental, // for now, image layer ALWAYS covers the full range
             metadata.len(),
@@ -627,7 +644,7 @@ impl ImageLayerWriterInner {
             self.timeline_id,
             self.tenant_id,
             &ImageFileName {
-                key_range: self.key_range.clone(),
+                key_range,
                 lsn: self.lsn,
             },
         );
@@ -637,6 +654,10 @@ impl ImageLayerWriterInner {
         Ok(layer)
     }

+    fn size(&self) -> u64 {
+        self.blob_writer.size() + self.tree.borrow_writer().size()
+    }
 }

 /// A builder object for constructing a new image layer.
@@ -673,7 +694,7 @@ impl ImageLayerWriter {
         conf: &'static PageServerConf,
         timeline_id: TimelineId,
         tenant_id: TenantId,
-        key_range: &Range<Key>,
+        start_key: Key,
         lsn: Lsn,
         is_incremental: bool,
     ) -> anyhow::Result<ImageLayerWriter> {
@@ -682,8 +703,8 @@ impl ImageLayerWriter {
             conf,
             timeline_id,
             tenant_id,
-            key_range,
             lsn,
+            start_key,
             is_incremental,
         )?),
         })
@@ -701,8 +722,12 @@ impl ImageLayerWriter {
     ///
     /// Finish writing the image layer.
     ///
-    pub fn finish(mut self) -> anyhow::Result<ImageLayer> {
-        self.inner.take().unwrap().finish()
+    pub fn finish(mut self, end_key: Key) -> anyhow::Result<ImageLayer> {
+        self.inner.take().unwrap().finish(end_key)
     }
+
+    pub fn size(&self) -> u64 {
+        self.inner.as_ref().unwrap().size()
+    }
 }
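Taken together, the writer changes replace the up-front key_range with a start_key at construction time and an end_key supplied only at finish(); put_image now checks (in debug builds) that keys arrive in strictly increasing order, and size() reports the bytes written so far so a caller can decide when to cut a layer. A self-contained sketch of that calling pattern with a toy writer (the struct below is an illustrative stand-in, not the real ImageLayerWriter):

use std::ops::Range;

/// Toy stand-in for an image-layer writer whose key range is only known at the end.
struct SketchWriter {
    start_key: u64,
    last_key: Option<u64>,
    bytes_written: u64,
}

impl SketchWriter {
    fn new(start_key: u64) -> Self {
        SketchWriter { start_key, last_key: None, bytes_written: 0 }
    }

    /// Keys must be appended in strictly increasing order, starting at `start_key`.
    fn put_image(&mut self, key: u64, img: &[u8]) {
        assert!(key >= self.start_key, "key below start_key");
        if let Some(last) = self.last_key {
            assert!(last < key, "keys must be strictly increasing");
        }
        self.last_key = Some(key);
        self.bytes_written += img.len() as u64;
    }

    /// Bytes written so far; a caller could cut a new layer once this exceeds a target size.
    fn size(&self) -> u64 {
        self.bytes_written
    }

    /// The end key is only decided when the layer is finished.
    fn finish(self, end_key: u64) -> Range<u64> {
        self.start_key..end_key
    }
}

fn main() {
    let mut w = SketchWriter::new(10);
    for key in [10u64, 11, 42] {
        w.put_image(key, b"page image");
    }
    let written = w.size();
    let key_range = w.finish(43); // end key is exclusive: last key + 1
    println!("layer covers {key_range:?}, {written} bytes");
}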


@@ -11,6 +11,7 @@ use crate::tenant::blob_io::{BlobCursor, BlobWriter};
 use crate::tenant::block_io::BlockReader;
 use crate::tenant::ephemeral_file::EphemeralFile;
 use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState};
+use crate::tenant::timeline::ENABLE_TIERED_COMPACTION;
 use crate::walrecord;
 use anyhow::{ensure, Result};
 use pageserver_api::models::InMemoryLayerInfo;
@@ -149,8 +150,8 @@ impl Layer for InMemoryLayer {
             .unwrap_or_default();

         println!(
-            "----- in-memory layer for tli {} LSNs {}-{} ----",
-            self.timeline_id, self.start_lsn, end_str,
+            "----- in-memory layer LSNs {}-{} ----",
+            self.start_lsn, end_str,
         );

         if !verbose {
@@ -341,11 +342,18 @@ impl InMemoryLayer {
         // rare though, so we just accept the potential latency hit for now.
         let inner = self.inner.read().unwrap();

+        let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
+        keys.sort_by_key(|k| k.0);
+
         let mut delta_layer_writer = DeltaLayerWriter::new(
             self.conf,
             self.timeline_id,
             self.tenant_id,
-            Key::MIN,
+            if ENABLE_TIERED_COMPACTION {
+                keys.first().unwrap().0.clone()
+            } else {
+                Key::MIN
+            },
             self.start_lsn..inner.end_lsn.unwrap(),
         )?;
@@ -353,9 +361,6 @@ impl InMemoryLayer {
         let mut cursor = inner.file.block_cursor();

-        let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
-        keys.sort_by_key(|k| k.0);
-
         for (key, vec_map) in keys.iter() {
             let key = **key;
             // Write all page versions
@@ -366,7 +371,11 @@ impl InMemoryLayer {
             }
         }

-        let delta_layer = delta_layer_writer.finish(Key::MAX)?;
+        let delta_layer = delta_layer_writer.finish(if ENABLE_TIERED_COMPACTION {
+            keys.last().unwrap().0.next()
+        } else {
+            Key::MAX
+        })?;
         Ok(delta_layer)
     }
 }
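With tiered compaction enabled, the frozen in-memory layer is now written out with the exact key range it contains, from the first key to one past the last key, instead of the catch-all Key::MIN..Key::MAX. A small sketch of the range computation over sorted keys, with plain integers standing in for the real Key type and next() modeled as + 1:

use std::ops::Range;

/// Compute the key range to record for a flushed layer.
/// `keys` must be sorted and non-empty when `exact` is true.
fn layer_key_range(keys: &[u64], exact: bool) -> Range<u64> {
    if exact {
        // Exact bounds: first key .. one past the last key (end is exclusive).
        let first = *keys.first().expect("layer must contain at least one key");
        let last = *keys.last().expect("layer must contain at least one key");
        first..last + 1
    } else {
        // Legacy behaviour: claim the whole key space.
        u64::MIN..u64::MAX
    }
}

fn main() {
    let keys = vec![100u64, 105, 230];
    assert_eq!(layer_key_range(&keys, true), 100..231);
    assert_eq!(layer_key_range(&keys, false), u64::MIN..u64::MAX);
    println!("exact range: {:?}", layer_key_range(&keys, true));
}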


@@ -173,13 +173,14 @@ impl PersistentLayerDesc {
     pub fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
         println!(
-            "----- layer for ten {} tli {} keys {}-{} lsn {}-{} ----",
-            self.tenant_id,
-            self.timeline_id,
+            "----- layer for keys {}-{} lsn {}-{} size {} is_delta {} is_incremental {} ----",
             self.key_range.start,
             self.key_range.end,
             self.lsn_range.start,
-            self.lsn_range.end
+            self.lsn_range.end,
+            self.file_size,
+            self.is_delta,
+            self.is_incremental
         );

         Ok(())


@@ -218,15 +218,12 @@ impl RemoteLayer {
     }

     /// Create a Layer struct representing this layer, after it has been downloaded.
-    pub fn create_downloaded_layer<L>(
+    pub fn create_downloaded_layer(
         &self,
-        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
+        layer_map_lock_held_witness: &BatchedUpdates<'_>,
         conf: &'static PageServerConf,
         file_size: u64,
-    ) -> Arc<dyn PersistentLayer>
-    where
-        L: ?Sized + Layer,
-    {
+    ) -> Arc<dyn PersistentLayer> {
         if self.desc.is_delta {
             let fname = self.desc.delta_file_name();
             Arc::new(DeltaLayer::new(


@@ -14,35 +14,43 @@ use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::completion;

+use super::timeline::ENABLE_TIERED_COMPACTION;
+
 /// Start per tenant background loops: compaction and gc.
 pub fn start_background_loops(
     tenant: &Arc<Tenant>,
     background_jobs_can_start: Option<&completion::Barrier>,
 ) {
     let tenant_id = tenant.tenant_id;
-    task_mgr::spawn(
-        BACKGROUND_RUNTIME.handle(),
-        TaskKind::Compaction,
-        Some(tenant_id),
-        None,
-        &format!("compactor for tenant {tenant_id}"),
-        false,
-        {
-            let tenant = Arc::clone(tenant);
-            let background_jobs_can_start = background_jobs_can_start.cloned();
-            async move {
-                let cancel = task_mgr::shutdown_token();
-                tokio::select! {
-                    _ = cancel.cancelled() => { return Ok(()) },
-                    _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
-                };
-                compaction_loop(tenant, cancel)
-                    .instrument(info_span!("compaction_loop", tenant_id = %tenant_id))
-                    .await;
-                Ok(())
-            }
-        },
-    );
+    // start two compaction threads
+    let range = if ENABLE_TIERED_COMPACTION { 0..4 } else { 0..1 };
+    for cpt_id in range {
+        task_mgr::spawn(
+            BACKGROUND_RUNTIME.handle(),
+            TaskKind::Compaction,
+            Some(tenant_id),
+            None,
+            &format!("compactor for tenant {tenant_id}"),
+            false,
+            {
+                let tenant = Arc::clone(tenant);
+                let background_jobs_can_start = background_jobs_can_start.cloned();
+                async move {
+                    let cancel = task_mgr::shutdown_token();
+                    tokio::select! {
+                        _ = cancel.cancelled() => { return Ok(()) },
+                        _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
+                    };
+                    compaction_loop(tenant, cancel)
+                        .instrument(
+                            info_span!("compaction_loop", tenant_id = %tenant_id, cpt_id = %cpt_id),
+                        )
+                        .await;
+                    Ok(())
+                }
+            },
+        );
+    }
     task_mgr::spawn(
         BACKGROUND_RUNTIME.handle(),
         TaskKind::GarbageCollector,
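The loop above spawns several compaction tasks per tenant (four when tiered compaction is enabled, otherwise one), each carrying its cpt_id in the tracing span. This hunk does not show how work is divided between them; one simple scheme, given here purely as an illustrative assumption, is to split a large job's key range into one contiguous sub-range per worker:

use std::ops::Range;

/// Split a key range into `parts` contiguous sub-ranges of roughly equal width.
/// This is an illustrative partitioning scheme, not the PoC's actual job assignment.
fn split_key_range(range: Range<u64>, parts: u64) -> Vec<Range<u64>> {
    assert!(parts > 0);
    let width = range.end - range.start;
    let chunk = (width + parts - 1) / parts; // ceiling division so nothing is dropped
    (0..parts)
        .map(|i| {
            let start = range.start + i * chunk;
            let end = (start + chunk).min(range.end);
            start..end
        })
        .filter(|r| r.start < r.end)
        .collect()
}

fn main() {
    // e.g. hand one sub-range to each of the four compaction workers
    for (cpt_id, sub) in split_key_range(0..1000, 4).into_iter().enumerate() {
        println!("worker {cpt_id} compacts keys {sub:?}");
    }
}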

File diff suppressed because it is too large


@@ -197,9 +197,11 @@ impl Timeline {
         // We don't want to hold the layer map lock during eviction.
         // So, we just need to deal with this.
         let candidates: Vec<Arc<dyn PersistentLayer>> = {
-            let layers = self.layers.read().await;
+            let guard = self.layers.read().await;
+            let (layers, _) = &*guard;
             let mut candidates = Vec::new();
             for hist_layer in layers.iter_historic_layers() {
+                let hist_layer = self.lcache.get_from_desc(&hist_layer);
                 if hist_layer.is_remote_layer() {
                     continue;