Compare commits

...

8 Commits

Author SHA1 Message Date
John Spray
2625f21751 fuckery 2024-07-26 21:25:11 +00:00
John Spray
7e655d5e72 Optimization: btreemap instead of vec sort 2024-07-26 16:21:41 +00:00
John Spray
61bf7f046e Test & bench for layer visibility 2024-07-26 16:21:41 +00:00
John Spray
23ba598970 pageserver: remove spurious fsync after image layer creation 2024-07-26 16:21:41 +00:00
John Spray
e00e498817 pageserver: remove unused inplace flag to timeline deletion 2024-07-26 16:21:41 +00:00
John Spray
ceaac67347 pageserver: implementation of update_layer_visibility 2024-07-26 16:21:41 +00:00
John Spray
826e604772 make iter_historic_layers return references 2024-07-26 15:18:22 +00:00
John Spray
97416d5e6c cargo: add range-set-blaze dependency 2024-07-26 13:26:59 +01:00
13 changed files with 462 additions and 59 deletions

56
Cargo.lock generated
View File

@@ -1418,7 +1418,7 @@ dependencies = [
"clap",
"criterion-plot",
"is-terminal",
"itertools",
"itertools 0.10.5",
"num-traits",
"once_cell",
"oorandom",
@@ -1439,7 +1439,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
dependencies = [
"cast",
"itertools",
"itertools 0.10.5",
]
[[package]]
@@ -2133,6 +2133,12 @@ dependencies = [
"slab",
]
[[package]]
name = "gen_ops"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "304de19db7028420975a296ab0fcbbc8e69438c4ed254a1e41e2a7f37d5f0e0a"
[[package]]
name = "generic-array"
version = "0.14.7"
@@ -2757,6 +2763,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.6"
@@ -3574,7 +3589,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.26",
"itertools",
"itertools 0.10.5",
"leaky-bucket",
"md5",
"metrics",
@@ -3594,6 +3609,7 @@ dependencies = [
"pq_proto",
"procfs 0.14.2",
"rand 0.8.5",
"range-set-blaze",
"regex",
"remote_storage",
"reqwest 0.12.4",
@@ -3644,7 +3660,7 @@ dependencies = [
"hex",
"humantime",
"humantime-serde",
"itertools",
"itertools 0.10.5",
"postgres_ffi",
"rand 0.8.5",
"serde",
@@ -3702,7 +3718,7 @@ dependencies = [
"hex-literal",
"humantime",
"humantime-serde",
"itertools",
"itertools 0.10.5",
"metrics",
"once_cell",
"pageserver_api",
@@ -4034,7 +4050,7 @@ name = "postgres_connection"
version = "0.1.0"
dependencies = [
"anyhow",
"itertools",
"itertools 0.10.5",
"once_cell",
"postgres",
"tokio-postgres",
@@ -4092,7 +4108,7 @@ version = "0.1.0"
dependencies = [
"byteorder",
"bytes",
"itertools",
"itertools 0.10.5",
"pin-project-lite",
"postgres-protocol",
"rand 0.8.5",
@@ -4210,7 +4226,7 @@ checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270"
dependencies = [
"bytes",
"heck 0.4.1",
"itertools",
"itertools 0.10.5",
"lazy_static",
"log",
"multimap",
@@ -4231,7 +4247,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
dependencies = [
"anyhow",
"itertools",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 1.0.109",
@@ -4288,7 +4304,7 @@ dependencies = [
"hyper-util",
"indexmap 2.0.1",
"ipnet",
"itertools",
"itertools 0.10.5",
"lasso",
"md5",
"measured",
@@ -4464,6 +4480,18 @@ dependencies = [
"rand_core 0.5.1",
]
[[package]]
name = "range-set-blaze"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8421b5d459262eabbe49048d362897ff3e3830b44eac6cfe341d6acb2f0f13d2"
dependencies = [
"gen_ops",
"itertools 0.12.1",
"num-integer",
"num-traits",
]
[[package]]
name = "rayon"
version = "1.7.0"
@@ -4632,7 +4660,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.26",
"itertools",
"itertools 0.10.5",
"metrics",
"once_cell",
"pin-project-lite",
@@ -5728,7 +5756,7 @@ dependencies = [
"hex",
"humantime",
"hyper 0.14.26",
"itertools",
"itertools 0.10.5",
"lasso",
"measured",
"metrics",
@@ -5794,7 +5822,7 @@ dependencies = [
"futures-util",
"hex",
"humantime",
"itertools",
"itertools 0.10.5",
"once_cell",
"pageserver",
"pageserver_api",
@@ -7447,7 +7475,7 @@ dependencies = [
"hmac",
"hyper 0.14.26",
"indexmap 1.9.3",
"itertools",
"itertools 0.10.5",
"libc",
"log",
"memchr",

View File

@@ -49,6 +49,7 @@ postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
rand.workspace = true
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
regex.workspace = true
scopeguard.workspace = true
serde.workspace = true

View File

@@ -1,3 +1,4 @@
use criterion::measurement::WallTime;
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
@@ -15,7 +16,11 @@ use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion};
fn fixture_path(relative: &str) -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(relative)
}
fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
let mut layer_map = LayerMap::default();
@@ -109,7 +114,7 @@ fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning
// between each test run.
fn bench_from_captest_env(c: &mut Criterion) {
// TODO consider compressing this file
let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt"));
let layer_map = build_layer_map(fixture_path("benches/odd-brook-layernames.txt"));
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map);
// Test with uniform query pattern
@@ -139,7 +144,7 @@ fn bench_from_captest_env(c: &mut Criterion) {
fn bench_from_real_project(c: &mut Criterion) {
// Init layer map
let now = Instant::now();
let layer_map = build_layer_map(PathBuf::from("benches/odd-brook-layernames.txt"));
let layer_map = build_layer_map(fixture_path("benches/odd-brook-layernames.txt"));
println!("Finished layer map init in {:?}", now.elapsed());
// Choose uniformly distributed queries
@@ -242,7 +247,72 @@ fn bench_sequential(c: &mut Criterion) {
group.finish();
}
fn bench_visibility_with_map(
group: &mut BenchmarkGroup<WallTime>,
layer_map: LayerMap,
read_points: Vec<Lsn>,
bench_name: &str,
) {
group.bench_function(bench_name, |b| {
b.iter(|| black_box(layer_map.get_visibility(read_points.clone())));
});
}
// Benchmark using synthetic data. Arrange image layers on stacked diagonal lines.
fn bench_visibility(c: &mut Criterion) {
let mut group = c.benchmark_group("visibility");
{
// Init layer map. Create 100_000 layers arranged in 1000 diagonal lines.
let now = Instant::now();
let mut layer_map = LayerMap::default();
let mut updates = layer_map.batch_update();
for i in 0..100_000 {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = PersistentLayerDesc::new_img(
TenantShardId::unsharded(TenantId::generate()),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
0,
);
updates.insert_historic(layer);
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());
let mut read_points = Vec::new();
for i in (0..100_000).step_by(1000) {
read_points.push(Lsn(i));
}
bench_visibility_with_map(&mut group, layer_map, read_points, "sequential");
}
{
let layer_map = build_layer_map(fixture_path("benches/odd-brook-layernames.txt"));
let read_points = vec![Lsn(0x1C760FA190)];
bench_visibility_with_map(&mut group, layer_map, read_points, "real_map");
let layer_map = build_layer_map(fixture_path("benches/odd-brook-layernames.txt"));
let read_points = vec![
Lsn(0x1C760FA190),
Lsn(0x000000931BEAD539),
Lsn(0x000000931BF63011),
Lsn(0x000000931B33AE68),
Lsn(0x00000038E67ABFA0),
Lsn(0x000000931B33AE68),
Lsn(0x000000914E3F38F0),
Lsn(0x000000931B33AE68),
];
bench_visibility_with_map(&mut group, layer_map, read_points, "real_map_many_branches");
}
group.finish();
}
criterion_group!(group_1, bench_from_captest_env);
criterion_group!(group_2, bench_from_real_project);
criterion_group!(group_3, bench_sequential);
criterion_main!(group_1, group_2, group_3);
criterion_group!(group_4, bench_visibility);
criterion_main!(group_1, group_2, group_3, group_4);

View File

@@ -525,6 +525,15 @@ static RESIDENT_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static VISIBLE_PHYSICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_visible_physical_size",
"The size of the layer files present in the pageserver's filesystem.",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static RESIDENT_PHYSICAL_SIZE_GLOBAL: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_resident_physical_size_global",
@@ -2188,6 +2197,7 @@ pub(crate) struct TimelineMetrics {
pub(crate) layer_count_delta: UIntGauge,
pub standby_horizon_gauge: IntGauge,
pub resident_physical_size_gauge: UIntGauge,
pub visible_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub aux_file_size_gauge: IntGauge,
@@ -2310,6 +2320,9 @@ impl TimelineMetrics {
let resident_physical_size_gauge = RESIDENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let visible_physical_size_gauge = VISIBLE_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
// TODO: we shouldn't expose this metric
let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
@@ -2364,6 +2377,7 @@ impl TimelineMetrics {
layer_count_delta,
standby_horizon_gauge,
resident_physical_size_gauge,
visible_physical_size_gauge,
current_logical_size_gauge,
aux_file_size_gauge,
directory_entries_count_gauge,
@@ -2415,6 +2429,7 @@ impl TimelineMetrics {
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
let _ = RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
let _ = VISIBLE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
let _ = metric.remove_label_values(&[tenant_id, shard_id, timeline_id]);

View File

@@ -1563,7 +1563,7 @@ impl Tenant {
self: Arc<Self>,
timeline_id: TimelineId,
) -> Result<(), DeleteTimelineError> {
DeleteTimelineFlow::run(&self, timeline_id, false).await?;
DeleteTimelineFlow::run(&self, timeline_id).await?;
Ok(())
}

View File

@@ -51,8 +51,9 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use anyhow::Result;
use pageserver_api::keyspace::KeySpaceAccum;
use std::collections::{HashMap, VecDeque};
use pageserver_api::keyspace::{KeySpace, KeySpaceAccum};
use range_set_blaze::{CheckSortedDisjoint, RangeSetBlaze};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
@@ -61,7 +62,7 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
///
/// LayerMap tracks what layers exist on a timeline.
@@ -543,7 +544,7 @@ impl LayerMap {
true
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = &Arc<PersistentLayerDesc>> {
self.historic.iter()
}
@@ -871,11 +872,193 @@ impl LayerMap {
println!("End dump LayerMap");
Ok(())
}
/// `read_points` represent the tip of a timeline and any branch points, i.e. the places
/// where we expect to serve reads.
///
/// This function is O(N) and should be called infrequently. The caller is responsible for
/// looking up and updating the Layer objects for these layer descriptors.
pub fn get_visibility<'a>(
&'a self,
mut read_points: Vec<Lsn>,
) -> (
Vec<(&'a Arc<PersistentLayerDesc>, LayerVisibilityHint)>,
KeySpace,
) {
// This is like a KeySpace, but this type is intended for efficient unions with image layer ranges, whereas
// KeySpace is intended to be composed statically and iterated over.
struct KeyShadow {
// Map of range start to range end
inner: RangeSetBlaze<i128>,
}
impl KeyShadow {
fn new() -> Self {
Self {
inner: Default::default(),
}
}
fn contains(&self, range: &Range<Key>) -> bool {
let range_incl = range.start.to_i128()..=range.end.to_i128() - 1;
self.inner.is_superset(&RangeSetBlaze::from_sorted_disjoint(
CheckSortedDisjoint::from([range_incl]),
))
}
/// Add the input range to the keys covered by self.
///
/// Return true if inserting this range covered some keys that were previously not covered
fn cover(&mut self, insert: Range<Key>) -> bool {
let range_incl = insert.start.to_i128()..=insert.end.to_i128() - 1;
self.inner.ranges_insert(range_incl)
}
fn reset(&mut self) {
self.inner = Default::default();
}
fn to_keyspace(&self) -> KeySpace {
let mut accum = KeySpaceAccum::new();
for range_incl in self.inner.ranges() {
let range = Range {
start: Key::from_i128(*range_incl.start()),
end: Key::from_i128(range_incl.end() + 1),
};
accum.add_range(range)
}
accum.to_keyspace()
}
}
// The 'shadow' will be updated as we sweep through the layers: an image layer subtracts from the shadow,
// and a ReadPoint
read_points.sort_by_key(|rp| rp.0);
let mut shadow = KeyShadow::new();
// We will interleave all our read points and layers into a sorted collection
enum Item<'a> {
ReadPoint { lsn: Lsn },
Layer(&'a Arc<PersistentLayerDesc>),
}
let mut items: BTreeMap<std::cmp::Reverse<_>, Item<'a>> = BTreeMap::new();
//let mut items: Vec<Item<'a>> = Vec::with_capacity(self.historic.len() + read_points.len());
// Ordering: we want to iterate like this:
// 1. Highest LSNs first
// 2. Consider ReadPoints before image layers if they're at the same LSN
items.extend(self.iter_historic_layers().map(|l| {
(
std::cmp::Reverse({
if l.is_delta() {
(l.get_lsn_range().end, 1)
} else {
(l.image_layer_lsn(), 2)
}
}),
Item::Layer(l),
)
}));
items.extend(
read_points
.into_iter()
.map(|lsn| (std::cmp::Reverse((lsn, 0)), Item::ReadPoint { lsn })),
);
let mut results: Vec<(&'a Arc<PersistentLayerDesc>, LayerVisibilityHint)> =
Vec::with_capacity(self.historic.len());
let mut maybe_covered_deltas: Vec<&'a Arc<PersistentLayerDesc>> = Vec::new();
for item in items.values() {
let (reached_lsn, is_readpoint) = match &item {
Item::ReadPoint { lsn } => (lsn, true),
Item::Layer(layer) => (&layer.lsn_range.start, false),
};
if !maybe_covered_deltas.is_empty() {
maybe_covered_deltas.retain(|d| {
if *reached_lsn >= d.lsn_range.start && is_readpoint {
// We encountered a readpoint within the delta layer: it is visible
results.push((d, LayerVisibilityHint::Visible));
false
} else if *reached_lsn < d.lsn_range.start {
// We passed the layer's range without encountering a read point: it is not visible
eprintln!("covered maybe @ {}", reached_lsn);
results.push((d, LayerVisibilityHint::Covered));
false
} else {
// We're still in the delta layer: continue iterating
true
}
});
}
match item {
Item::ReadPoint { lsn } => {
// TODO: propagate the child timeline's shadow from their own run of this function, so that we don't have
// to assume that the whole key range is visible at the branch point.
eprintln!("reset @ {lsn}");
shadow.reset();
}
Item::Layer(layer) => {
let visibility = if layer.is_delta() {
if !shadow.contains(&layer.key_range) {
eprintln!("shadow doesn't cover {}..{} @ {}",
layer.get_key_range().start,
layer.get_key_range().end,
layer.get_lsn_range().end);
LayerVisibilityHint::Visible
} else {
// If a layer isn't visible based on current state, we must defer deciding whether
// it is truly not visible until we have advanced past the delta's range: we might
// encounter another branch point within this delta layer's LSN range.
maybe_covered_deltas.push(layer);
continue;
}
} else if shadow.cover(layer.get_key_range()) {
// An image layer in a region which wasn't fully covered yet: this layer is visible, but layers below it will be covered
eprintln!("inserted coverage {}..{} @ {}",
layer.get_key_range().start,
layer.get_key_range().end,
layer.image_layer_lsn());
LayerVisibilityHint::Visible
} else {
// An image layer in a region that was already covered
eprintln!("covered image @ {}", layer.image_layer_lsn());
LayerVisibilityHint::Covered
};
results.push((layer, visibility));
}
}
}
// Drain any remaining maybe_covered deltas
results.extend(
maybe_covered_deltas
.into_iter()
.map(|d| (d, LayerVisibilityHint::Covered)),
);
eprintln!("shadow: {:?}", shadow.inner);
(results, shadow.to_keyspace())
}
}
#[cfg(test)]
mod tests {
use pageserver_api::keyspace::KeySpace;
use crate::tenant::{storage_layer::LayerName, IndexPart};
use pageserver_api::{key::DBDIR_KEY, keyspace::KeySpace};
use std::collections::HashMap;
use utils::{
id::{TenantId, TimelineId},
shard::TenantShardId,
};
use super::*;
@@ -1002,4 +1185,68 @@ mod tests {
}
}
}
#[test]
fn layer_visibility() {
// Load a large example layermap
let index_raw =
std::fs::read_to_string("test_data/indices/mixed_workload/index_part.json").unwrap();
let index: IndexPart = serde_json::from_str::<IndexPart>(&index_raw).unwrap();
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let timeline_id = TimelineId::generate();
let mut layer_map = LayerMap::default();
let mut updates = layer_map.batch_update();
for (layer_name, layer_metadata) in index.layer_metadata {
let layer_desc = match layer_name {
LayerName::Image(layer_name) => PersistentLayerDesc {
key_range: layer_name.key_range.clone(),
lsn_range: layer_name.lsn_as_range(),
tenant_shard_id,
timeline_id,
is_delta: false,
file_size: layer_metadata.file_size,
},
LayerName::Delta(layer_name) => PersistentLayerDesc {
key_range: layer_name.key_range,
lsn_range: layer_name.lsn_range,
tenant_shard_id,
timeline_id,
is_delta: true,
file_size: layer_metadata.file_size,
},
};
updates.insert_historic(layer_desc);
}
updates.flush();
let read_points = vec![index.metadata.disk_consistent_lsn()];
let (layer_visibilities, shadow) = layer_map.get_visibility(read_points);
for (layer_desc, visibility) in &layer_visibilities {
tracing::info!("{layer_desc:?}: {visibility:?}");
eprintln!("{layer_desc:?}: {visibility:?}");
}
// At least some layers should be marked covered
assert!(layer_visibilities
.iter()
.any(|i| matches!(i.1, LayerVisibilityHint::Covered)));
// The shadow should be non-empty, since there were some image layers
assert!(!shadow.ranges.is_empty());
let layer_visibilities = layer_visibilities.into_iter().collect::<HashMap<_, _>>();
// Sanity: the layer that holds latest data for the DBDIR key should always be visible
// (just using this key as a key that will always exist for any layermap fixture)
let dbdir_layer = layer_map
.search(DBDIR_KEY, index.metadata.disk_consistent_lsn())
.unwrap();
assert!(matches!(
layer_visibilities.get(&dbdir_layer.layer).unwrap(),
LayerVisibilityHint::Visible
));
}
}

View File

@@ -499,14 +499,14 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
}
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
pub fn iter(&self) -> impl '_ + Iterator<Item = &Value> {
// NOTE we can actually perform this without rebuilding,
// but it's not necessary for now.
if !self.buffer.is_empty() {
panic!("rebuild pls")
}
self.layers.values().cloned()
self.layers.values()
}
/// Return a reference to a queryable map, assuming all updates
@@ -521,6 +521,10 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
Ok(&self.historic_coverage)
}
pub(crate) fn len(&self) -> usize {
self.layers.len()
}
}
#[test]

View File

@@ -452,7 +452,7 @@ pub enum ValueReconstructResult {
/// of layers (for example when creating a branch that makes some previously covered layers visible). It should
/// be used for cache management but not for correctness-critical checks.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub(crate) enum LayerVisibilityHint {
pub enum LayerVisibilityHint {
/// A Visible layer might be read while serving a read, because there is not an image layer between it
/// and a readable LSN (the tip of the branch or a child's branch point)
Visible,

View File

@@ -1808,9 +1808,11 @@ impl Timeline {
}
match self.get_compaction_algorithm_settings().kind {
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await?,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await?,
}
Ok(())
}
/// Mutate the timeline with a [`TimelineWriter`].
@@ -2728,6 +2730,9 @@ impl Timeline {
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
self.update_layer_visibility().await;
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
num_layers, disk_consistent_lsn, total_physical_size
@@ -4658,27 +4663,6 @@ impl Timeline {
}
}
// The writer.finish() above already did the fsync of the inodes.
// We just need to fsync the directory in which these inodes are linked,
// which we know to be the timeline directory.
if !image_layers.is_empty() {
// We use fatal_err() below because the after writer.finish() returns with success,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let timeline_dir = VirtualFile::open(
&self
.conf
.timeline_path(&self.tenant_shard_id, &self.timeline_id),
ctx,
)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
}
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right
@@ -4687,6 +4671,9 @@ impl Timeline {
drop_wlock(guard);
timer.stop_and_record();
// Creating image layers may have caused some previously visible layers to be covered
self.update_layer_visibility().await;
Ok(image_layers)
}

View File

@@ -29,7 +29,9 @@ use crate::page_cache;
use crate::tenant::config::defaults::{DEFAULT_CHECKPOINT_DISTANCE, DEFAULT_COMPACTION_THRESHOLD};
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::merge_iterator::MergeIterator;
use crate::tenant::storage_layer::{AsLayerDesc, PersistentLayerDesc, ValueReconstructState};
use crate::tenant::storage_layer::{
AsLayerDesc, LayerVisibilityHint, PersistentLayerDesc, ValueReconstructState,
};
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
use crate::tenant::timeline::{Layer, ResidentLayer};
@@ -431,6 +433,52 @@ impl Timeline {
Ok(())
}
/// Update the LayerVisibilityHint of layers covered by image layers, based on whether there is
/// an image layer between them and the most recent readable LSN (branch point or tip of timeline). The
/// purpose of the visibility hint is to record which layers need to be available to service reads.
///
/// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
/// that we know won't be needed for reads.
pub(super) async fn update_layer_visibility(&self) {
let head_lsn = self.get_last_record_lsn();
// We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
// are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
// Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
// they will be subject to L0->L1 compaction in the near future.
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
let readable_points = {
let children = self.gc_info.read().unwrap().retain_lsns.clone();
let mut readable_points = Vec::with_capacity(children.len() + 1);
for (child_lsn, _child_timeline_id) in &children {
readable_points.push(*child_lsn);
}
readable_points.push(head_lsn);
readable_points
};
let mut visible_size = 0;
let (layer_visibility, covered) = layer_map.get_visibility(readable_points);
for (layer_desc, visibility) in layer_visibility {
// FIXME: a more efficiency bulk zip() through the layers rather than NlogN getting each one
let layer = layer_manager.get_from_desc(&layer_desc);
if matches!(visibility, LayerVisibilityHint::Visible) {
visible_size += layer.metadata().file_size;
}
layer.access_stats().set_visibility(visibility);
}
// TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
// avoid assuming that everything at a branch point is visible.
drop(covered);
self.metrics.visible_physical_size_gauge.set(visible_size);
}
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files.
async fn compact_level0(
@@ -1596,7 +1644,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
.filter(|l| {
overlaps_with(&l.lsn_range, lsn_range) && overlaps_with(&l.key_range, key_range)
})
.map(OwnArc)
.map(|l| OwnArc(l.clone()))
.collect();
Ok(result)
}

View File

@@ -206,11 +206,10 @@ impl DeleteTimelineFlow {
// NB: If this fails half-way through, and is retried, the retry will go through
// all the same steps again. Make sure the code here is idempotent, and don't
// error out if some of the shutdown tasks have already been completed!
#[instrument(skip_all, fields(%inplace))]
#[instrument(skip_all)]
pub async fn run(
tenant: &Arc<Tenant>,
timeline_id: TimelineId,
inplace: bool,
) -> Result<(), DeleteTimelineError> {
super::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -235,11 +234,7 @@ impl DeleteTimelineFlow {
))?
});
if inplace {
Self::background(guard, tenant.conf, tenant, &timeline).await?
} else {
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
}
Self::schedule_background(guard, tenant.conf, Arc::clone(tenant), timeline);
Ok(())
}

View File

@@ -0,0 +1,7 @@
# This was captured from one shard of a large tenant in staging.
# It has a mixture of deltas and image layers, >1000 layers in total.
# This is suitable for general smoke tests that want an index which is not
# trivially small, but doesn't contain weird/pathological cases.

File diff suppressed because one or more lines are too long