mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 02:20:42 +00:00
pageserver: handle in-memory layer overlaps with persistent layers (#11000)
## Problem Image layers may be nested inside in-memory layers as diagnosed [here](https://github.com/neondatabase/neon/issues/10720#issuecomment-2649419252). The read path doesn't support this and may skip over the image layer, resulting in a failure to reconstruct the page. ## Summary of changes We already support nesting of image layers inside delta layers. The logic lives in `LayerMap::select_layer`. The main goal of this PR is to propagate the candidate in-memory layer down to that point and update the selection logic. Important changes are: 1. Support partial reads for the in-memory layer. Previously, we could only specify the start LSN of the read. We need to control the end LSN too. 2. `LayerMap::ranged_search` considers in-memory layers too. Previously, the search for in-memory layers was done explicitly in `Timeline::get_reconstruct_data_timeline`. Note that `LayerMap::ranged_search` now returns a weak readable layer which the `LayerManager` can upgrade. This dance is such that we can unit test the layer selection logic. 3. Update `LayerMap::select_layer` to consider the candidate in-memory layer too Loosely related drive bys: 1. Remove the "keys not found" tracking in the ranged search. This wasn't used anywhere and it just complicates things. 2. Remove the difficulty map stuff from the layer map. Again, not used anywhere. Closes https://github.com/neondatabase/neon/issues/9185 Closes https://github.com/neondatabase/neon/issues/10720
This commit is contained in:
@@ -7,7 +7,6 @@ use std::time::Instant;
|
||||
|
||||
use criterion::measurement::WallTime;
|
||||
use criterion::{BenchmarkGroup, Criterion, black_box, criterion_group, criterion_main};
|
||||
use pageserver::keyspace::{KeyPartitioning, KeySpace};
|
||||
use pageserver::tenant::layer_map::LayerMap;
|
||||
use pageserver::tenant::storage_layer::{LayerName, PersistentLayerDesc};
|
||||
use pageserver_api::key::Key;
|
||||
@@ -72,41 +71,6 @@ fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
|
||||
.collect()
|
||||
}
|
||||
|
||||
// Construct a partitioning for testing get_difficulty map when we
|
||||
// don't have an exact result of `collect_keyspace` to work with.
|
||||
fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
|
||||
let mut parts = Vec::new();
|
||||
|
||||
// We add a partition boundary at the start of each image layer,
|
||||
// no matter what lsn range it covers. This is just the easiest
|
||||
// thing to do. A better thing to do would be to get a real
|
||||
// partitioning from some database. Even better, remove the need
|
||||
// for key partitions by deciding where to create image layers
|
||||
// directly based on a coverage-based difficulty map.
|
||||
let mut keys: Vec<_> = layer_map
|
||||
.iter_historic_layers()
|
||||
.filter_map(|l| {
|
||||
if l.is_incremental() {
|
||||
None
|
||||
} else {
|
||||
let kr = l.get_key_range();
|
||||
Some(kr.start.next())
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
keys.sort();
|
||||
|
||||
let mut current_key = Key::from_hex("000000000000000000000000000000000000").unwrap();
|
||||
for key in keys {
|
||||
parts.push(KeySpace {
|
||||
ranges: vec![current_key..key],
|
||||
});
|
||||
current_key = key;
|
||||
}
|
||||
|
||||
KeyPartitioning { parts }
|
||||
}
|
||||
|
||||
// Benchmark using metadata extracted from our performance test environment, from
|
||||
// a project where we have run pgbench many timmes. The pgbench database was initialized
|
||||
// between each test run.
|
||||
@@ -148,41 +112,6 @@ fn bench_from_real_project(c: &mut Criterion) {
|
||||
// Choose uniformly distributed queries
|
||||
let queries: Vec<(Key, Lsn)> = uniform_query_pattern(&layer_map);
|
||||
|
||||
// Choose inputs for get_difficulty_map
|
||||
let latest_lsn = layer_map
|
||||
.iter_historic_layers()
|
||||
.map(|l| l.get_lsn_range().end)
|
||||
.max()
|
||||
.unwrap();
|
||||
let partitioning = uniform_key_partitioning(&layer_map, latest_lsn);
|
||||
|
||||
// Check correctness of get_difficulty_map
|
||||
// TODO put this in a dedicated test outside of this mod
|
||||
{
|
||||
println!("running correctness check");
|
||||
|
||||
let now = Instant::now();
|
||||
let result_bruteforce = layer_map.get_difficulty_map_bruteforce(latest_lsn, &partitioning);
|
||||
assert!(result_bruteforce.len() == partitioning.parts.len());
|
||||
println!("Finished bruteforce in {:?}", now.elapsed());
|
||||
|
||||
let now = Instant::now();
|
||||
let result_fast = layer_map.get_difficulty_map(latest_lsn, &partitioning, None);
|
||||
assert!(result_fast.len() == partitioning.parts.len());
|
||||
println!("Finished fast in {:?}", now.elapsed());
|
||||
|
||||
// Assert results are equal. Manually iterate for easier debugging.
|
||||
let zip = std::iter::zip(
|
||||
&partitioning.parts,
|
||||
std::iter::zip(result_bruteforce, result_fast),
|
||||
);
|
||||
for (_part, (bruteforce, fast)) in zip {
|
||||
assert_eq!(bruteforce, fast);
|
||||
}
|
||||
|
||||
println!("No issues found");
|
||||
}
|
||||
|
||||
// Define and name the benchmark function
|
||||
let mut group = c.benchmark_group("real_map");
|
||||
group.bench_function("uniform_queries", |b| {
|
||||
@@ -192,11 +121,6 @@ fn bench_from_real_project(c: &mut Criterion) {
|
||||
}
|
||||
});
|
||||
});
|
||||
group.bench_function("get_difficulty_map", |b| {
|
||||
b.iter(|| {
|
||||
layer_map.get_difficulty_map(latest_lsn, &partitioning, Some(3));
|
||||
});
|
||||
});
|
||||
group.finish();
|
||||
}
|
||||
|
||||
|
||||
@@ -2501,6 +2501,7 @@ impl Tenant {
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
in_memory_layer_desc: Vec<timeline::InMemoryLayerTestDesc>,
|
||||
delta_layer_desc: Vec<timeline::DeltaLayerTestDesc>,
|
||||
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
|
||||
end_lsn: Lsn,
|
||||
@@ -2522,6 +2523,11 @@ impl Tenant {
|
||||
.force_create_image_layer(lsn, images, Some(initdb_lsn), ctx)
|
||||
.await?;
|
||||
}
|
||||
for in_memory in in_memory_layer_desc {
|
||||
tline
|
||||
.force_create_in_memory_layer(in_memory, Some(initdb_lsn), ctx)
|
||||
.await?;
|
||||
}
|
||||
let layer_names = tline
|
||||
.layers
|
||||
.read()
|
||||
@@ -5913,6 +5919,8 @@ mod tests {
|
||||
#[cfg(feature = "testing")]
|
||||
use timeline::GcInfo;
|
||||
#[cfg(feature = "testing")]
|
||||
use timeline::InMemoryLayerTestDesc;
|
||||
#[cfg(feature = "testing")]
|
||||
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
|
||||
use timeline::{CompactOptions, DeltaLayerTestDesc};
|
||||
use utils::id::TenantId;
|
||||
@@ -7925,6 +7933,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
Vec::new(), // delta layers
|
||||
vec![(Lsn(0x20), vec![(base_key, test_img("data key 1"))])], // image layers
|
||||
Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN
|
||||
@@ -8012,6 +8021,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
Vec::new(), // delta layers
|
||||
vec![(
|
||||
Lsn(0x20),
|
||||
@@ -8227,6 +8237,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
// delta layers
|
||||
vec![
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
@@ -8307,6 +8318,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
// delta layers
|
||||
vec![
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
@@ -8380,6 +8392,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
// delta layers
|
||||
vec![
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
@@ -8512,6 +8525,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
vec![
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1),
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2),
|
||||
@@ -8705,6 +8719,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
Lsn(0x10)..Lsn(0x40),
|
||||
delta1,
|
||||
@@ -8761,6 +8776,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
Vec::new(),
|
||||
image_layers,
|
||||
end_lsn,
|
||||
@@ -8967,6 +8983,7 @@ mod tests {
|
||||
Lsn(0x08),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
vec![
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
Lsn(0x08)..Lsn(0x10),
|
||||
@@ -8985,7 +9002,7 @@ mod tests {
|
||||
delta3,
|
||||
),
|
||||
], // delta layers
|
||||
vec![], // image layers
|
||||
vec![], // image layers
|
||||
Lsn(0x50),
|
||||
)
|
||||
.await?
|
||||
@@ -8996,6 +9013,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
vec![
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
Lsn(0x10)..Lsn(0x48),
|
||||
@@ -9546,6 +9564,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
vec![
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta1),
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x48), delta2),
|
||||
@@ -9793,6 +9812,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
vec![
|
||||
// delta1 and delta 2 only contain a single key but multiple updates
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x10)..Lsn(0x30), delta1),
|
||||
@@ -10028,6 +10048,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![], // in-memory layers
|
||||
vec![], // delta layers
|
||||
vec![(Lsn(0x18), img_layer)], // image layers
|
||||
Lsn(0x18),
|
||||
@@ -10274,6 +10295,7 @@ mod tests {
|
||||
baseline_image_layer_lsn,
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![], // in-memory layers
|
||||
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
|
||||
delta_layer_start_lsn..delta_layer_end_lsn,
|
||||
delta_layer_spec,
|
||||
@@ -10305,6 +10327,158 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[tokio::test]
|
||||
async fn test_vectored_read_with_image_layer_inside_inmem() -> anyhow::Result<()> {
|
||||
let harness =
|
||||
TenantHarness::create("test_vectored_read_with_image_layer_inside_inmem").await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let will_init_keys = [2, 6];
|
||||
fn get_key(id: u32) -> Key {
|
||||
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
|
||||
key.field6 = id;
|
||||
key
|
||||
}
|
||||
|
||||
let mut expected_key_values = HashMap::new();
|
||||
|
||||
let baseline_image_layer_lsn = Lsn(0x10);
|
||||
let mut baseline_img_layer = Vec::new();
|
||||
for i in 0..5 {
|
||||
let key = get_key(i);
|
||||
let value = format!("value {i}@{baseline_image_layer_lsn}");
|
||||
|
||||
let removed = expected_key_values.insert(key, value.clone());
|
||||
assert!(removed.is_none());
|
||||
|
||||
baseline_img_layer.push((key, Bytes::from(value)));
|
||||
}
|
||||
|
||||
let nested_image_layer_lsn = Lsn(0x50);
|
||||
let mut nested_img_layer = Vec::new();
|
||||
for i in 5..10 {
|
||||
let key = get_key(i);
|
||||
let value = format!("value {i}@{nested_image_layer_lsn}");
|
||||
|
||||
let removed = expected_key_values.insert(key, value.clone());
|
||||
assert!(removed.is_none());
|
||||
|
||||
nested_img_layer.push((key, Bytes::from(value)));
|
||||
}
|
||||
|
||||
let frozen_layer = {
|
||||
let lsn_range = Lsn(0x40)..Lsn(0x60);
|
||||
let mut data = Vec::new();
|
||||
for i in 0..10 {
|
||||
let key = get_key(i);
|
||||
let key_in_nested = nested_img_layer
|
||||
.iter()
|
||||
.any(|(key_with_img, _)| *key_with_img == key);
|
||||
let lsn = {
|
||||
if key_in_nested {
|
||||
Lsn(nested_image_layer_lsn.0 + 5)
|
||||
} else {
|
||||
lsn_range.start
|
||||
}
|
||||
};
|
||||
|
||||
let will_init = will_init_keys.contains(&i);
|
||||
if will_init {
|
||||
data.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init(""))));
|
||||
|
||||
expected_key_values.insert(key, "".to_string());
|
||||
} else {
|
||||
let delta = format!("@{lsn}");
|
||||
data.push((
|
||||
key,
|
||||
lsn,
|
||||
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
|
||||
));
|
||||
|
||||
expected_key_values
|
||||
.get_mut(&key)
|
||||
.expect("An image exists for each key")
|
||||
.push_str(delta.as_str());
|
||||
}
|
||||
}
|
||||
|
||||
InMemoryLayerTestDesc {
|
||||
lsn_range,
|
||||
is_open: false,
|
||||
data,
|
||||
}
|
||||
};
|
||||
|
||||
let (open_layer, last_record_lsn) = {
|
||||
let start_lsn = Lsn(0x70);
|
||||
let mut data = Vec::new();
|
||||
let mut end_lsn = Lsn(0);
|
||||
for i in 0..10 {
|
||||
let key = get_key(i);
|
||||
let lsn = Lsn(start_lsn.0 + i as u64);
|
||||
let delta = format!("@{lsn}");
|
||||
data.push((
|
||||
key,
|
||||
lsn,
|
||||
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
|
||||
));
|
||||
|
||||
expected_key_values
|
||||
.get_mut(&key)
|
||||
.expect("An image exists for each key")
|
||||
.push_str(delta.as_str());
|
||||
|
||||
end_lsn = std::cmp::max(end_lsn, lsn);
|
||||
}
|
||||
|
||||
(
|
||||
InMemoryLayerTestDesc {
|
||||
lsn_range: start_lsn..Lsn::MAX,
|
||||
is_open: true,
|
||||
data,
|
||||
},
|
||||
end_lsn,
|
||||
)
|
||||
};
|
||||
|
||||
assert!(
|
||||
nested_image_layer_lsn > frozen_layer.lsn_range.start
|
||||
&& nested_image_layer_lsn < frozen_layer.lsn_range.end
|
||||
);
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
baseline_image_layer_lsn,
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![open_layer, frozen_layer], // in-memory layers
|
||||
Vec::new(), // delta layers
|
||||
vec![
|
||||
(baseline_image_layer_lsn, baseline_img_layer),
|
||||
(nested_image_layer_lsn, nested_img_layer),
|
||||
], // image layers
|
||||
last_record_lsn,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let keyspace = KeySpace::single(get_key(0)..get_key(10));
|
||||
let results = tline
|
||||
.get_vectored(keyspace, last_record_lsn, IoConcurrency::sequential(), &ctx)
|
||||
.await
|
||||
.expect("No vectored errors");
|
||||
for (key, res) in results {
|
||||
let value = res.expect("No key errors");
|
||||
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
|
||||
assert_eq!(value, Bytes::from(expected_value.clone()));
|
||||
|
||||
tracing::info!("key={key} value={expected_value}");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sort_layer_key(k1: &PersistentLayerKey, k2: &PersistentLayerKey) -> std::cmp::Ordering {
|
||||
(
|
||||
k1.is_delta,
|
||||
@@ -10420,6 +10594,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![], // in-memory layers
|
||||
vec![
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta1),
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x48), delta2),
|
||||
@@ -10804,6 +10979,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![], // in-memory layers
|
||||
vec![
|
||||
// delta1/2/4 only contain a single key but multiple updates
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x28), delta1),
|
||||
@@ -11055,6 +11231,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![], // in-memory layers
|
||||
vec![
|
||||
// delta1/2/4 only contain a single key but multiple updates
|
||||
DeltaLayerTestDesc::new_with_inferred_key_range(Lsn(0x20)..Lsn(0x28), delta1),
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -63,6 +63,8 @@ pub struct HistoricLayerCoverage<Value> {
|
||||
/// The latest state
|
||||
head: LayerCoverageTuple<Value>,
|
||||
|
||||
/// TODO: this could be an ordered vec using binary search.
|
||||
/// We push into this map everytime we add a layer, so might see some benefit
|
||||
/// All previous states
|
||||
historic: BTreeMap<u64, LayerCoverageTuple<Value>>,
|
||||
}
|
||||
@@ -419,6 +421,10 @@ pub struct BufferedHistoricLayerCoverage<Value> {
|
||||
buffer: BTreeMap<LayerKey, Option<Value>>,
|
||||
|
||||
/// All current layers. This is not used for search. Only to make rebuilds easier.
|
||||
// TODO: This map is never cleared. Rebuilds could use the post-trim last entry of
|
||||
// [`Self::historic_coverage`] instead of doubling memory usage.
|
||||
// [`Self::len`]: can require rebuild and serve from latest historic
|
||||
// [`Self::iter`]: already requires rebuild => can serve from latest historic
|
||||
layers: BTreeMap<LayerKey, Value>,
|
||||
}
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ use utils::sync::gate::GateGuard;
|
||||
|
||||
use self::inmemory_layer::InMemoryLayerFileId;
|
||||
use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
@@ -721,6 +722,12 @@ struct LayerToVisitId {
|
||||
lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash)]
|
||||
pub enum ReadableLayerWeak {
|
||||
PersistentLayer(Arc<PersistentLayerDesc>),
|
||||
InMemoryLayer(InMemoryLayerDesc),
|
||||
}
|
||||
|
||||
/// Layer wrapper for the read path. Note that it is valid
|
||||
/// to use these layers even after external operations have
|
||||
/// been performed on them (compaction, freeze, etc.).
|
||||
@@ -873,7 +880,7 @@ impl ReadableLayer {
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -416,7 +416,7 @@ impl InMemoryLayer {
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
self: &Arc<InMemoryLayer>,
|
||||
keyspace: KeySpace,
|
||||
end_lsn: Lsn,
|
||||
lsn_range: Range<Lsn>,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
@@ -433,8 +433,6 @@ impl InMemoryLayer {
|
||||
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
|
||||
let mut ios: HashMap<(Key, Lsn), OnDiskValueIo> = Default::default();
|
||||
|
||||
let lsn_range = self.start_lsn..end_lsn;
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
for (key, vec_map) in inner
|
||||
.index
|
||||
|
||||
@@ -49,6 +49,7 @@ async fn smoke_test() {
|
||||
Lsn(0x10),
|
||||
14,
|
||||
&ctx,
|
||||
Default::default(), // in-memory layers
|
||||
Default::default(),
|
||||
image_layers,
|
||||
Lsn(0x100),
|
||||
|
||||
@@ -3914,39 +3914,22 @@ impl Timeline {
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map()?;
|
||||
|
||||
let in_memory_layer = layers.find_in_memory_layer(|l| {
|
||||
let start_lsn = l.get_lsn_range().start;
|
||||
cont_lsn > start_lsn
|
||||
});
|
||||
for range in unmapped_keyspace.ranges.iter() {
|
||||
let results = layers.range_search(range.clone(), cont_lsn);
|
||||
|
||||
match in_memory_layer {
|
||||
Some(l) => {
|
||||
let lsn_range = l.get_lsn_range().start..cont_lsn;
|
||||
fringe.update(
|
||||
ReadableLayer::InMemoryLayer(l),
|
||||
unmapped_keyspace.clone(),
|
||||
lsn_range,
|
||||
);
|
||||
}
|
||||
None => {
|
||||
for range in unmapped_keyspace.ranges.iter() {
|
||||
let results = layers.range_search(range.clone(), cont_lsn);
|
||||
|
||||
results
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
|
||||
(
|
||||
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
|
||||
keyspace_accum.to_keyspace(),
|
||||
lsn_floor..cont_lsn,
|
||||
)
|
||||
})
|
||||
.for_each(|(layer, keyspace, lsn_range)| {
|
||||
fringe.update(layer, keyspace, lsn_range)
|
||||
});
|
||||
}
|
||||
}
|
||||
results
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
|
||||
(
|
||||
guard.upgrade(layer),
|
||||
keyspace_accum.to_keyspace(),
|
||||
lsn_floor..cont_lsn,
|
||||
)
|
||||
})
|
||||
.for_each(|(layer, keyspace, lsn_range)| {
|
||||
fringe.update(layer, keyspace, lsn_range)
|
||||
});
|
||||
}
|
||||
|
||||
// It's safe to drop the layer map lock after planning the next round of reads.
|
||||
@@ -5555,6 +5538,14 @@ pub struct DeltaLayerTestDesc {
|
||||
pub data: Vec<(Key, Lsn, Value)>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[derive(Clone)]
|
||||
pub struct InMemoryLayerTestDesc {
|
||||
pub lsn_range: Range<Lsn>,
|
||||
pub data: Vec<(Key, Lsn, Value)>,
|
||||
pub is_open: bool,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl DeltaLayerTestDesc {
|
||||
pub fn new(lsn_range: Range<Lsn>, key_range: Range<Key>, data: Vec<(Key, Lsn, Value)>) -> Self {
|
||||
@@ -6567,6 +6558,92 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Force create an in-memory layer and place them into the layer map.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn force_create_in_memory_layer(
|
||||
self: &Arc<Timeline>,
|
||||
mut in_memory: InMemoryLayerTestDesc,
|
||||
check_start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
use utils::bin_ser::BeSer;
|
||||
|
||||
// Validate LSNs
|
||||
if let Some(check_start_lsn) = check_start_lsn {
|
||||
assert!(in_memory.lsn_range.start >= check_start_lsn);
|
||||
}
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let layer_end_lsn = if in_memory.is_open {
|
||||
in_memory
|
||||
.data
|
||||
.iter()
|
||||
.map(|(_key, lsn, _value)| lsn)
|
||||
.max()
|
||||
.cloned()
|
||||
} else {
|
||||
Some(in_memory.lsn_range.end)
|
||||
};
|
||||
|
||||
if let Some(end) = layer_end_lsn {
|
||||
assert!(
|
||||
end <= last_record_lsn,
|
||||
"advance last record lsn before inserting a layer, end_lsn={}, last_record_lsn={}",
|
||||
end,
|
||||
last_record_lsn,
|
||||
);
|
||||
}
|
||||
|
||||
in_memory.data.iter().for_each(|(_key, lsn, _value)| {
|
||||
assert!(*lsn >= in_memory.lsn_range.start);
|
||||
assert!(*lsn < in_memory.lsn_range.end);
|
||||
});
|
||||
|
||||
// Build the batch
|
||||
in_memory
|
||||
.data
|
||||
.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
|
||||
|
||||
let data = in_memory
|
||||
.data
|
||||
.into_iter()
|
||||
.map(|(key, lsn, value)| {
|
||||
let value_size = value.serialized_size().unwrap() as usize;
|
||||
(key.to_compact(), lsn, value_size, value)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let batch = SerializedValueBatch::from_values(data);
|
||||
|
||||
// Create the in-memory layer and write the batch into it
|
||||
let layer = InMemoryLayer::create(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
in_memory.lsn_range.start,
|
||||
&self.gate,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
layer.put_batch(batch, ctx).await.unwrap();
|
||||
if !in_memory.is_open {
|
||||
layer.freeze(in_memory.lsn_range.end).await;
|
||||
}
|
||||
|
||||
info!("force created in-memory layer {:?}", in_memory.lsn_range);
|
||||
|
||||
// Link the layer to the layer map
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
let layer_map = guard.open_mut().unwrap();
|
||||
layer_map.force_insert_in_memory_layer(Arc::new(layer));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return all keys at the LSN in the image layers
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn inspect_image_layers(
|
||||
@@ -6999,6 +7076,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
14,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
delta_layers,
|
||||
image_layers,
|
||||
Lsn(0x100),
|
||||
@@ -7132,6 +7210,7 @@ mod tests {
|
||||
Lsn(0x10),
|
||||
14,
|
||||
&ctx,
|
||||
Vec::new(), // in-memory layers
|
||||
delta_layers,
|
||||
image_layers,
|
||||
Lsn(0x100),
|
||||
|
||||
@@ -8,14 +8,14 @@ use tracing::trace;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::{AtomicLsn, Lsn};
|
||||
|
||||
use super::TimelineWriterState;
|
||||
use super::{ReadableLayer, TimelineWriterState};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::TimelineMetrics;
|
||||
use crate::tenant::layer_map::{BatchedUpdates, LayerMap};
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc,
|
||||
PersistentLayerKey, ResidentLayer,
|
||||
PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
|
||||
};
|
||||
|
||||
/// Provides semantic APIs to manipulate the layer map.
|
||||
@@ -37,6 +37,21 @@ impl Default for LayerManager {
|
||||
}
|
||||
|
||||
impl LayerManager {
|
||||
pub(crate) fn upgrade(&self, weak: ReadableLayerWeak) -> ReadableLayer {
|
||||
match weak {
|
||||
ReadableLayerWeak::PersistentLayer(desc) => {
|
||||
ReadableLayer::PersistentLayer(self.get_from_desc(&desc))
|
||||
}
|
||||
ReadableLayerWeak::InMemoryLayer(desc) => {
|
||||
let inmem = self
|
||||
.layer_map()
|
||||
.expect("no concurrent shutdown")
|
||||
.in_memory_layer(&desc);
|
||||
ReadableLayer::InMemoryLayer(inmem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
|
||||
// The assumption for the `expect()` is that all code maintains the following invariant:
|
||||
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
|
||||
@@ -470,6 +485,25 @@ impl OpenLayerManager {
|
||||
mapping.remove(layer);
|
||||
layer.delete_on_drop();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn force_insert_in_memory_layer(&mut self, layer: Arc<InMemoryLayer>) {
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
|
||||
match layer.info() {
|
||||
InMemoryLayerInfo::Open { .. } => {
|
||||
assert!(self.layer_map.open_layer.is_none());
|
||||
self.layer_map.open_layer = Some(layer);
|
||||
}
|
||||
InMemoryLayerInfo::Frozen { lsn_start, .. } => {
|
||||
if let Some(last) = self.layer_map.frozen_layers.back() {
|
||||
assert!(last.get_lsn_range().end <= lsn_start);
|
||||
}
|
||||
|
||||
self.layer_map.frozen_layers.push_back(layer);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
|
||||
|
||||
Reference in New Issue
Block a user