Compare commits

...

4 Commits

Author SHA1 Message Date
Vlad Lazar
84d1af736e fixup: update external loop state before handling keyspaces
`ValuesReconstructState::consume_done_keys` may only be called once
after a layer visit. The code in the previous commit called it once
for each keyspace, so keys were not marked done in the scenario where
the keyspace is split by floor LSN.
2024-09-19 10:44:20 +01:00
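For illustration, a minimal standalone sketch of the corrected structure, using simplified
stand-in types (the real `ValuesReconstructState` and keyspace types are more involved):
the done keys are consumed exactly once per layer visit, and every keyspace is then
updated from that single snapshot.

use std::collections::BTreeSet;

/// Stand-in for `ValuesReconstructState`: remembers which keys were
/// completed by the most recent layer visit and hands them out once.
struct ReconstructStateSketch {
    done_last_visit: Option<BTreeSet<u32>>,
}

impl ReconstructStateSketch {
    /// May only be called once per layer visit: the done set is moved out.
    fn consume_done_keys(&mut self) -> BTreeSet<u32> {
        self.done_last_visit.take().unwrap_or_default()
    }
}

fn main() {
    let mut state = ReconstructStateSketch {
        done_last_visit: Some(BTreeSet::from([1, 2, 3])),
    };

    // Keyspaces left over from the previous layer visit, each with its own
    // floor LSN (represented here as a plain number).
    let mut keyspaces: Vec<(u64, BTreeSet<u32>)> = vec![
        (0x20, BTreeSet::from([1, 2])),
        (0x60, BTreeSet::from([3, 4])),
    ];

    // Correct: consume the done keys exactly once per layer visit ...
    let keys_done_last_step = state.consume_done_keys();

    // ... then update every keyspace from that single snapshot. Calling
    // `consume_done_keys` inside this loop would return an empty set for
    // every keyspace after the first, leaving keys incorrectly "not done".
    for (_floor_lsn, keyspace) in keyspaces.iter_mut() {
        keyspace.retain(|key| !keys_done_last_step.contains(key));
    }

    assert_eq!(keyspaces[0].1, BTreeSet::new());
    assert_eq!(keyspaces[1].1, BTreeSet::from([4]));
}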
Vlad Lazar
3a218d7525 pageserver: extend nested image test with will init keys 2024-09-19 10:43:55 +01:00
Vlad Lazar
f0ad90f3ee pageserver: handle reading up to different floor LSNs in deltas
Problem

Different keyspaces may require different floor LSNs within the same
vectored delta layer visit. This patch adds support for such cases.

Summary of Changes

* Rework layer visit collection. Each layer type has a separate
visit type which is aware of its requirements. For delta layers
we track the floor LSN of each keyspace and merge keyspaces only
when their floor LSNs match; other layer types do not have this
requirement, so they merge everything (a simplified sketch follows
this commit entry).
* Thread the new visit types into the `get_values_reconstruct_data`
calls. For delta layers, the code was adapted so that it can merge
reads across keyspaces with different LSN floor requirements.
* Tweak the fringe update code in `get_vectored_reconstruct_data_timeline`
to handle different cont LSNs for different keyspaces. In practice,
we will only update the fringe from one keyspace, since this "keyspace
split" only happens when an image layer overlaps a delta layer (and
image layers always complete all their keys).
* Update tests to use the new interfaces.
2024-09-18 13:35:19 +01:00
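A minimal standalone sketch of the delta-layer bookkeeping described above, using
simplified stand-in types (`DeltaVisitSketch` is hypothetical; the real builder works
on `KeySpaceRandomAccum` and `Lsn`): key ranges are grouped by their floor LSN, so
reads are merged only when the floor LSNs match.

use std::collections::HashMap;
use std::ops::Range;

/// Simplified stand-in for the delta layer visit builder: key ranges to
/// read are grouped by their floor LSN, so ranges are only merged into
/// the same read when their floor LSNs match.
#[derive(Default, Debug)]
struct DeltaVisitSketch {
    ranges_by_floor_lsn: HashMap<u64, Vec<Range<u32>>>,
}

impl DeltaVisitSketch {
    /// Extend the planned visit with another keyspace and its floor LSN.
    fn update(&mut self, keyspace: Range<u32>, floor_lsn: u64) {
        self.ranges_by_floor_lsn
            .entry(floor_lsn)
            .or_default()
            .push(keyspace);
    }

    /// Build the final visit: one (floor LSN, keyspace list) entry per
    /// distinct floor LSN, sorted for deterministic output.
    fn build(self) -> Vec<(u64, Vec<Range<u32>>)> {
        let mut visits: Vec<_> = self.ranges_by_floor_lsn.into_iter().collect();
        visits.sort_by_key(|(floor_lsn, _)| *floor_lsn);
        visits
    }
}

fn main() {
    let mut visit = DeltaVisitSketch::default();

    // [A, C) has no image coverage above the delta layer: read from the
    // delta layer's own start LSN.
    visit.update(0..5, 0x20);
    // [C, B) is covered by a nested image layer at 0x50: only deltas at or
    // above that LSN are needed.
    visit.update(5..10, 0x50);
    // A keyspace with a matching floor LSN merges into the first group.
    visit.update(20..25, 0x20);

    for (floor_lsn, keyspaces) in visit.build() {
        println!("read deltas at lsn >= {floor_lsn:#x} for {keyspaces:?}");
    }
}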
Vlad Lazar
a8ad678574 tests: add unit test for vec read with overlapped images 2024-09-18 13:33:51 +01:00
8 changed files with 757 additions and 201 deletions

View File

@@ -4164,9 +4164,18 @@ pub(crate) mod harness {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.
let base_img = base_img.expect("Neon WAL redo requires base image").1;
let mut page = BytesMut::new();
page.extend_from_slice(&base_img);
let mut page = match (base_img, records.first()) {
(Some((_lsn, img)), _) => {
let mut page = BytesMut::new();
page.extend_from_slice(&img);
page
}
(_, Some((_lsn, rec))) if rec.will_init() => BytesMut::new(),
_ => {
panic!("Neon WAL redo requires base image or will init record");
}
};
for (record_lsn, record) in records {
apply_neon::apply_in_neon(&record, record_lsn, key, &mut page)?;
}
@@ -8470,4 +8479,135 @@ mod tests {
Ok(())
}
// Regression test for https://github.com/neondatabase/neon/issues/9012
// Create an image arrangement where we have to read at different LSN ranges
// from a delta layer. This is achieved by overlapping an image layer on top of
// a delta layer. Like so:
//
// A B
// +----------------+ -> delta_layer
// | | ^ lsn
// | =========|-> nested_image_layer |
// | C | |
// +----------------+ |
// ======== -> baseline_image_layer +-------> key
//
//
// When querying the key range [A, B) we need to read at different LSN ranges
// for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
#[tokio::test]
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
let (tenant, ctx) = harness.load().await;
let will_init_keys = [2, 6];
fn get_key(id: u32) -> Key {
let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
key.field6 = id;
key
}
let mut expected_key_values = HashMap::new();
let baseline_image_layer_lsn = Lsn(0x10);
let mut baseline_img_layer = Vec::new();
for i in 0..5 {
let key = get_key(i);
let value = format!("value {i}@{baseline_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
baseline_img_layer.push((key, Bytes::from(value)));
}
let nested_image_layer_lsn = Lsn(0x50);
let mut nested_img_layer = Vec::new();
for i in 5..10 {
let key = get_key(i);
let value = format!("value {i}@{nested_image_layer_lsn}");
let removed = expected_key_values.insert(key, value.clone());
assert!(removed.is_none());
nested_img_layer.push((key, Bytes::from(value)));
}
let mut delta_layer_spec = Vec::default();
let delta_layer_start_lsn = Lsn(0x20);
let mut delta_layer_end_lsn = delta_layer_start_lsn;
for i in 0..10 {
let key = get_key(i);
let key_in_nested = nested_img_layer
.iter()
.any(|(key_with_img, _)| *key_with_img == key);
let lsn = {
if key_in_nested {
Lsn(nested_image_layer_lsn.0 + 0x10)
} else {
delta_layer_start_lsn
}
};
let will_init = will_init_keys.contains(&i);
if will_init {
delta_layer_spec.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init())));
expected_key_values.insert(key, "".to_string());
} else {
let delta = format!("@{lsn}");
delta_layer_spec.push((
key,
lsn,
Value::WalRecord(NeonWalRecord::wal_append(&delta)),
));
expected_key_values
.get_mut(&key)
.expect("An image exists for each key")
.push_str(delta.as_str());
}
delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
}
delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);
assert!(
nested_image_layer_lsn > delta_layer_start_lsn
&& nested_image_layer_lsn < delta_layer_end_lsn
);
let tline = tenant
.create_test_timeline_with_layers(
TIMELINE_ID,
baseline_image_layer_lsn,
DEFAULT_PG_VERSION,
&ctx,
vec![DeltaLayerTestDesc::new_with_inferred_key_range(
delta_layer_start_lsn..delta_layer_end_lsn,
delta_layer_spec,
)], // delta layers
vec![
(baseline_image_layer_lsn, baseline_img_layer),
(nested_image_layer_lsn, nested_img_layer),
], // image layers
delta_layer_end_lsn,
)
.await?;
let keyspace = KeySpace::single(get_key(0)..get_key(10));
let results = tline
.get_vectored(keyspace, delta_layer_end_lsn, &ctx)
.await
.expect("No vectored errors");
for (key, res) in results {
let value = res.expect("No key errors");
let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
assert_eq!(value, Bytes::from(expected_value));
}
Ok(())
}
}

View File

@@ -287,7 +287,7 @@ pub(crate) enum ReadableLayer {
/// A partial description of a read to be done.
#[derive(Debug, Clone)]
struct ReadDesc {
struct VisitLocation {
/// An id used to resolve the readable layer within the fringe
layer_id: LayerId,
/// Lsn range for the read, used for selecting the next read
@@ -303,46 +303,442 @@ struct ReadDesc {
/// a two layer indexing scheme.
#[derive(Debug)]
pub(crate) struct LayerFringe {
planned_reads_by_lsn: BinaryHeap<ReadDesc>,
layers: HashMap<LayerId, LayerKeyspace>,
visits_by_lsn_index: BinaryHeap<VisitLocation>,
layer_visits: HashMap<LayerId, LayerVisitBuilder>,
}
#[derive(Debug)]
struct LayerKeyspace {
layer: ReadableLayer,
target_keyspace: KeySpaceRandomAccum,
pub(crate) enum LayerVisitBuilder {
InMemoryLayer(InMemoryLayerVisitBuilder),
PersistentLayer(PersistentLayerVisitBuilder),
}
#[derive(Debug)]
pub(crate) enum LayerVisit {
InMemoryLayer(InMemoryLayerVisit),
PersistentLayer(PersistentLayerVisit),
}
#[derive(Debug)]
pub(crate) enum PersistentLayerVisitBuilder {
DeltaLayer(DeltaLayerVisitBuilder),
ImageLayer(ImageLayerVisitBuilder),
}
#[derive(Debug)]
pub(crate) enum PersistentLayerVisit {
DeltaLayer(DeltaLayerVisit),
ImageLayer(ImageLayerVisit),
}
#[derive(Debug)]
pub(crate) struct InMemoryLayerVisitBuilder {
/// Key space accumulator which will define which keys we are
/// interested in for this layer visit.
keyspace_accum: KeySpaceRandomAccum,
/// Ignore any keys with an LSN greater or equal
/// than the specified one.
lsn_ceil: Lsn,
/// Only consider keys at LSN greater or equal than the specified one
lsn_floor: Lsn,
// The in-memory layer to visit
layer: Arc<InMemoryLayer>,
}
#[derive(Debug)]
pub(crate) struct InMemoryLayerVisit {
/// Key space of keys considered by the visit
keyspace: KeySpace,
/// Ignore any keys with an LSN greater or equal
/// than the specified one.
lsn_ceil: Lsn,
/// Only consider keys at LSN greater or equal than the specified one
lsn_floor: Lsn,
}
#[derive(Debug)]
pub(crate) struct DeltaLayerVisitBuilder {
/// List of key space accumulators which will define what deltas are read.
/// Each accumulator has an associated LSN which specifies
/// the LSN floor for it (i.e. do not read below this LSN).
keyspace_accums: HashMap<Lsn, KeySpaceRandomAccum>,
/// Ignore any keys with an LSN greater or equal
/// than the specified one.
lsn_ceil: Lsn,
/// Handle for the layer to visit (guaranteed to be a delta layer)
layer: Layer,
}
#[derive(Debug)]
pub(crate) struct DeltaLayerVisit {
/// List of key spaces considered during the visit.
/// Each keyspace has an associated LSN which specifies
/// the LSN floor for it (i.e. do not read below this LSN).
keyspaces: Vec<(Lsn, KeySpace)>,
/// Ignore any keys with an LSN greater or equal
/// than the specified one.
lsn_ceil: Lsn,
}
#[derive(Debug)]
pub(crate) struct ImageLayerVisitBuilder {
/// Key space which defines which keys we are
/// interested in for this layer visit.
keyspace_accum: KeySpaceRandomAccum,
/// Handle for the layer to visit (guaranteed to be an image layer)
layer: Layer,
/// Only consider keys at LSN greater or equal than the specified one
lsn_floor: Lsn,
}
#[derive(Debug)]
pub(crate) struct ImageLayerVisit {
/// Key space which defines which keys we are
/// interested in for this layer visit.
keyspace: KeySpace,
/// Only consider keys at LSN greater or equal than the specified one
lsn_floor: Lsn,
}
impl LayerVisitBuilder {
pub(crate) fn new(layer: ReadableLayer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
match layer {
ReadableLayer::InMemoryLayer(in_mem) => {
Self::InMemoryLayer(InMemoryLayerVisitBuilder::new(in_mem, keyspace, lsn_range))
}
ReadableLayer::PersistentLayer(persistent) => Self::PersistentLayer(
PersistentLayerVisitBuilder::new(persistent, keyspace, lsn_range),
),
}
}
}
impl InMemoryLayerVisitBuilder {
fn new(layer: Arc<InMemoryLayer>, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
assert_eq!(lsn_range.start, layer.get_lsn_range().start);
let mut keyspace_accum = KeySpaceRandomAccum::new();
keyspace_accum.add_keyspace(keyspace);
Self {
keyspace_accum,
lsn_floor: lsn_range.start,
lsn_ceil: lsn_range.end,
layer,
}
}
}
impl PersistentLayerVisitBuilder {
fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
let is_delta = layer.layer_desc().is_delta;
if is_delta {
Self::DeltaLayer(DeltaLayerVisitBuilder::new(layer, keyspace, lsn_range))
} else {
Self::ImageLayer(ImageLayerVisitBuilder::new(layer, keyspace, lsn_range))
}
}
}
impl DeltaLayerVisitBuilder {
fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
assert!(layer.layer_desc().is_delta);
let mut keyspace_accum = KeySpaceRandomAccum::new();
keyspace_accum.add_keyspace(keyspace);
Self {
keyspace_accums: HashMap::from([(lsn_range.start, keyspace_accum)]),
lsn_ceil: lsn_range.end,
layer,
}
}
}
impl ImageLayerVisitBuilder {
fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
assert!(!layer.layer_desc().is_delta);
assert_eq!(lsn_range.start, layer.layer_desc().lsn_range.start);
let mut keyspace_accum = KeySpaceRandomAccum::new();
keyspace_accum.add_keyspace(keyspace);
Self {
keyspace_accum,
lsn_floor: lsn_range.start,
layer,
}
}
}
pub(crate) trait LayerVisitBuilderUpdate {
type L;
type LV;
/// Extend an already planned layer visit to also include the keys
/// in the provided keyspace and LSN range.
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>);
/// Build the visit!
fn build(self) -> (Self::L, Self::LV);
}
impl LayerVisitBuilderUpdate for LayerVisitBuilder {
type L = ReadableLayer;
type LV = LayerVisit;
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
match self {
LayerVisitBuilder::InMemoryLayer(v) => v.update(keyspace, lsn_range),
LayerVisitBuilder::PersistentLayer(v) => v.update(keyspace, lsn_range),
}
}
fn build(self) -> (Self::L, Self::LV) {
match self {
LayerVisitBuilder::InMemoryLayer(in_mem) => (
ReadableLayer::InMemoryLayer(in_mem.layer),
LayerVisit::InMemoryLayer(InMemoryLayerVisit {
keyspace: in_mem.keyspace_accum.to_keyspace(),
lsn_ceil: in_mem.lsn_ceil,
lsn_floor: in_mem.lsn_floor,
}),
),
LayerVisitBuilder::PersistentLayer(pers) => {
let (layer, visit) = pers.build();
(
ReadableLayer::PersistentLayer(layer),
LayerVisit::PersistentLayer(visit),
)
}
}
}
}
impl LayerVisitBuilderUpdate for PersistentLayerVisitBuilder {
type L = Layer;
type LV = PersistentLayerVisit;
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
match self {
PersistentLayerVisitBuilder::DeltaLayer(v) => v.update(keyspace, lsn_range),
PersistentLayerVisitBuilder::ImageLayer(v) => v.update(keyspace, lsn_range),
}
}
fn build(self) -> (Self::L, Self::LV) {
match self {
PersistentLayerVisitBuilder::DeltaLayer(delta) => {
let (layer, visit) = delta.build();
(layer, PersistentLayerVisit::DeltaLayer(visit))
}
PersistentLayerVisitBuilder::ImageLayer(img) => {
let (layer, visit) = img.build();
(layer, PersistentLayerVisit::ImageLayer(visit))
}
}
}
}
impl LayerVisitBuilderUpdate for InMemoryLayerVisitBuilder {
type L = Arc<InMemoryLayer>;
type LV = InMemoryLayerVisit;
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
// Note: I cannot think of any cases when this update should happen,
// since in memory layers span the entire key range.
assert_eq!(lsn_range.end, self.lsn_ceil);
assert_eq!(lsn_range.start, self.lsn_floor);
self.keyspace_accum.add_keyspace(keyspace);
}
fn build(self) -> (Self::L, Self::LV) {
(
self.layer,
InMemoryLayerVisit {
keyspace: self.keyspace_accum.to_keyspace(),
lsn_floor: self.lsn_floor,
lsn_ceil: self.lsn_ceil,
},
)
}
}
impl LayerVisitBuilderUpdate for DeltaLayerVisitBuilder {
type L = Layer;
type LV = DeltaLayerVisit;
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
assert_eq!(lsn_range.end, self.lsn_ceil);
self.keyspace_accums
.entry(lsn_range.start)
.or_default()
.add_keyspace(keyspace);
}
fn build(self) -> (Self::L, Self::LV) {
use itertools::Itertools;
let keyspaces = self
.keyspace_accums
.into_iter()
.filter_map(|(lsn_floor, accum)| {
let keyspace = accum.to_keyspace();
if keyspace.is_empty() {
None
} else {
Some((lsn_floor, keyspace))
}
})
.sorted_by_key(|(_lsn_floor, keyspace)| keyspace.start().unwrap())
.collect::<Vec<(Lsn, KeySpace)>>();
if cfg!(debug_assertions) {
// Check that the keyspaces we are going to read from
// a layer are non-overlapping.
//
// The keyspaces provided to vectored read initially are non-overlapping.
// We may split keyspaces at each step and keyspaces resulting from a split
// are non-overlapping as well. One can prove that the property holds by
// induction.
let mut prev_end: Option<Key> = None;
for (_lsn_floor, crnt) in keyspaces.iter() {
if let Some(prev_end) = prev_end {
debug_assert!(prev_end <= crnt.start().unwrap())
}
prev_end = Some(crnt.end().unwrap());
}
}
(
self.layer,
DeltaLayerVisit {
keyspaces,
lsn_ceil: self.lsn_ceil,
},
)
}
}
impl LayerVisitBuilderUpdate for ImageLayerVisitBuilder {
type L = Layer;
type LV = ImageLayerVisit;
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
assert_eq!(lsn_range.start, self.lsn_floor);
self.keyspace_accum.add_keyspace(keyspace);
}
fn build(self) -> (Self::L, Self::LV) {
(
self.layer,
ImageLayerVisit {
keyspace: self.keyspace_accum.to_keyspace(),
lsn_floor: self.lsn_floor,
},
)
}
}
pub(crate) trait LayerVisitDetails {
/// Returns the key spaces planned for the visit
/// and their associated floor LSNs.
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)>;
}
impl LayerVisitDetails for LayerVisit {
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
match self {
LayerVisit::PersistentLayer(pers) => pers.keyspaces(),
LayerVisit::InMemoryLayer(in_mem) => in_mem.keyspaces(),
}
}
}
impl LayerVisitDetails for PersistentLayerVisit {
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
match self {
PersistentLayerVisit::DeltaLayer(delta) => delta.keyspaces(),
PersistentLayerVisit::ImageLayer(image) => image.keyspaces(),
}
}
}
impl LayerVisitDetails for DeltaLayerVisit {
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
self.keyspaces.clone()
}
}
impl LayerVisitDetails for ImageLayerVisit {
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
vec![(self.lsn_floor, self.keyspace.clone())]
}
}
impl LayerVisitDetails for InMemoryLayerVisit {
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
vec![(self.lsn_floor, self.keyspace.clone())]
}
}
impl LayerVisit {
fn into_persistent_layer_visit(self) -> PersistentLayerVisit {
match self {
LayerVisit::PersistentLayer(visit) => visit,
LayerVisit::InMemoryLayer(visit) => {
panic!("Invalid attempt to cast to PersistentLayerVisit: {visit:?}");
}
}
}
fn into_in_memory_layer_visit(self) -> InMemoryLayerVisit {
match self {
LayerVisit::InMemoryLayer(visit) => visit,
LayerVisit::PersistentLayer(visit) => {
panic!("Invalid attempt to cast to InMemoryLayerVisit: {visit:?}");
}
}
}
}
impl PersistentLayerVisit {
fn into_delta_layer_visit(self) -> DeltaLayerVisit {
match self {
PersistentLayerVisit::DeltaLayer(visit) => visit,
PersistentLayerVisit::ImageLayer(visit) => {
panic!("Invalid attempt to cast to DeltaLayerVisit: {visit:?}");
}
}
}
fn into_image_layer_visit(self) -> ImageLayerVisit {
match self {
PersistentLayerVisit::ImageLayer(visit) => visit,
PersistentLayerVisit::DeltaLayer(visit) => {
panic!("Invalid attempt to cast to ImageLayerVisit: {visit:?}");
}
}
}
}
impl LayerFringe {
pub(crate) fn new() -> Self {
LayerFringe {
planned_reads_by_lsn: BinaryHeap::new(),
layers: HashMap::new(),
visits_by_lsn_index: BinaryHeap::new(),
layer_visits: HashMap::new(),
}
}
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
let read_desc = match self.planned_reads_by_lsn.pop() {
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, LayerVisit)> {
let read_desc = match self.visits_by_lsn_index.pop() {
Some(desc) => desc,
None => return None,
};
let removed = self.layers.remove_entry(&read_desc.layer_id);
match removed {
Some((
_,
LayerKeyspace {
layer,
mut target_keyspace,
},
)) => Some((
layer,
target_keyspace.consume_keyspace(),
read_desc.lsn_range,
)),
None => unreachable!("fringe internals are always consistent"),
}
let removed = self.layer_visits.remove(&read_desc.layer_id)?;
Some(removed.build())
}
pub(crate) fn update(
@@ -352,22 +748,18 @@ impl LayerFringe {
lsn_range: Range<Lsn>,
) {
let layer_id = layer.id();
let entry = self.layers.entry(layer_id.clone());
let entry = self.layer_visits.entry(layer_id.clone());
match entry {
Entry::Occupied(mut entry) => {
entry.get_mut().target_keyspace.add_keyspace(keyspace);
entry.get_mut().update(keyspace, lsn_range);
}
Entry::Vacant(entry) => {
self.planned_reads_by_lsn.push(ReadDesc {
lsn_range,
self.visits_by_lsn_index.push(VisitLocation {
lsn_range: lsn_range.clone(),
layer_id: layer_id.clone(),
});
let mut accum = KeySpaceRandomAccum::new();
accum.add_keyspace(keyspace);
entry.insert(LayerKeyspace {
layer,
target_keyspace: accum,
});
entry.insert(LayerVisitBuilder::new(layer, keyspace, lsn_range));
}
}
}
@@ -379,7 +771,7 @@ impl Default for LayerFringe {
}
}
impl Ord for ReadDesc {
impl Ord for VisitLocation {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
if ord == std::cmp::Ordering::Equal {
@@ -390,19 +782,19 @@ impl Ord for ReadDesc {
}
}
impl PartialOrd for ReadDesc {
impl PartialOrd for VisitLocation {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl PartialEq for ReadDesc {
impl PartialEq for VisitLocation {
fn eq(&self, other: &Self) -> bool {
self.lsn_range == other.lsn_range
}
}
impl Eq for ReadDesc {}
impl Eq for VisitLocation {}
impl ReadableLayer {
pub(crate) fn id(&self) -> LayerId {
@@ -414,20 +806,27 @@ impl ReadableLayer {
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
visit: LayerVisit,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
match self {
ReadableLayer::PersistentLayer(layer) => {
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
.get_values_reconstruct_data(
visit.into_persistent_layer_visit(),
reconstruct_state,
ctx,
)
.await
}
ReadableLayer::InMemoryLayer(layer) => {
layer
.get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
.get_values_reconstruct_data(
visit.into_in_memory_layer_visit(),
reconstruct_state,
ctx,
)
.await
}
}

View File

@@ -75,7 +75,7 @@ use utils::{
lsn::Lsn,
};
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
use super::{AsLayerDesc, DeltaLayerVisit, LayerName, PersistentLayerDesc, ValuesReconstructState};
///
/// Header stored in the beginning of the file
@@ -841,8 +841,7 @@ impl DeltaLayerInner {
// can be further optimised to visit the index only once.
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
visit: DeltaLayerVisit,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
@@ -863,8 +862,8 @@ impl DeltaLayerInner {
let data_end_offset = self.index_start_offset();
let reads = Self::plan_reads(
&keyspace,
lsn_range.clone(),
&visit.keyspaces,
visit.lsn_ceil,
data_end_offset,
index_reader,
planner,
@@ -877,14 +876,16 @@ impl DeltaLayerInner {
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
.await;
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
for (lsn_floor, keyspace) in visit.keyspaces {
reconstruct_state.on_lsn_advanced(&keyspace, lsn_floor);
}
Ok(())
}
async fn plan_reads<Reader>(
keyspace: &KeySpace,
lsn_range: Range<Lsn>,
keyspaces: &Vec<(Lsn, KeySpace)>,
lsn_ceil: Lsn,
data_end_offset: u64,
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
mut planner: VectoredReadPlanner,
@@ -898,48 +899,52 @@ impl DeltaLayerInner {
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
.build();
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
for (lsn_floor, keyspace) in keyspaces {
let lsn_range = *lsn_floor..lsn_ceil;
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
let mut index_stream = std::pin::pin!(index_stream);
for range in keyspace.ranges.iter() {
let mut range_end_handled = false;
while let Some(index_entry) = index_stream.next().await {
let (raw_key, value) = index_entry?;
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
let mut index_stream = std::pin::pin!(index_stream);
// Lsns are not monotonically increasing across keys, so we don't assert on them.
assert!(key >= range.start);
while let Some(index_entry) = index_stream.next().await {
let (raw_key, value) = index_entry?;
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
let blob_ref = BlobRef(value);
let outside_lsn_range = !lsn_range.contains(&lsn);
let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
// Lsns are not monotonically increasing across keys, so we don't assert on them.
assert!(key >= range.start);
let flag = {
if outside_lsn_range || below_cached_lsn {
BlobFlag::Ignore
} else if blob_ref.will_init() {
BlobFlag::ReplaceAll
let outside_lsn_range = !lsn_range.contains(&lsn);
let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
let flag = {
if outside_lsn_range || below_cached_lsn {
BlobFlag::Ignore
} else if blob_ref.will_init() {
BlobFlag::ReplaceAll
} else {
// Usual path: add blob to the read
BlobFlag::None
}
};
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
planner.handle_range_end(blob_ref.pos());
range_end_handled = true;
break;
} else {
// Usual path: add blob to the read
BlobFlag::None
planner.handle(key, lsn, blob_ref.pos(), flag);
}
};
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
planner.handle_range_end(blob_ref.pos());
range_end_handled = true;
break;
} else {
planner.handle(key, lsn, blob_ref.pos(), flag);
}
}
if !range_end_handled {
tracing::debug!("Handling range end fallback at {}", data_end_offset);
planner.handle_range_end(data_end_offset);
if !range_end_handled {
tracing::debug!("Handling range end fallback at {}", data_end_offset);
planner.handle_range_end(data_end_offset);
}
}
}
@@ -1641,8 +1646,8 @@ pub(crate) mod test {
// Plan and validate
let vectored_reads = DeltaLayerInner::plan_reads(
&keyspace,
lsn_range.clone(),
&vec![(lsn_range.start, keyspace.clone())],
lsn_range.end,
disk_offset,
reader,
planner,
@@ -1895,8 +1900,8 @@ pub(crate) mod test {
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
let vectored_reads = DeltaLayerInner::plan_reads(
&keyspace,
entries_meta.lsn_range.clone(),
&vec![(entries_meta.lsn_range.start, keyspace)],
entries_meta.lsn_range.end,
data_end_offset,
index_reader,
planner,

View File

@@ -69,7 +69,7 @@ use utils::{
};
use super::layer_name::ImageLayerName;
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
use super::{AsLayerDesc, ImageLayerVisit, LayerName, PersistentLayerDesc, ValuesReconstructState};
///
/// Header stored in the beginning of the file
@@ -435,12 +435,12 @@ impl ImageLayerInner {
// the reconstruct state with whatever is found.
pub(super) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
visit: ImageLayerVisit,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let reads = self
.plan_reads(keyspace, None, ctx)
.plan_reads(visit.keyspace, None, ctx)
.await
.map_err(GetVectoredError::Other)?;

View File

@@ -17,7 +17,6 @@ use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use std::collections::{BTreeMap, HashMap};
@@ -36,7 +35,8 @@ use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::RwLock;
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
DeltaLayerWriter, InMemoryLayerVisit, PersistentLayerDesc, ValueReconstructSituation,
ValuesReconstructState,
};
pub(crate) mod vectored_dio_read;
@@ -416,11 +416,14 @@ impl InMemoryLayer {
// If the key is cached, go no further than the cached Lsn.
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
end_lsn: Lsn,
visit: InMemoryLayerVisit,
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let InMemoryLayerVisit {
keyspace, lsn_ceil, ..
} = visit;
let ctx = RequestContextBuilder::extend(ctx)
.page_content_kind(PageContentKind::InMemoryLayer)
.build();
@@ -440,8 +443,8 @@ impl InMemoryLayer {
{
let key = Key::from_compact(*key);
let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
None => self.start_lsn..end_lsn,
Some(cached_lsn) => (cached_lsn + 1)..lsn_ceil,
None => self.start_lsn..lsn_ceil,
};
let slice = vec_map.slice_range(lsn_range);

View File

@@ -1,9 +1,7 @@
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::HistoricLayerInfo;
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
@@ -23,7 +21,7 @@ use super::delta_layer::{self, DeltaEntry};
use super::image_layer::{self};
use super::{
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
LayerVisibilityHint, PersistentLayerDesc, PersistentLayerVisit, ValuesReconstructState,
};
use utils::generation::Generation;
@@ -303,8 +301,7 @@ impl Layer {
pub(crate) async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
visit: PersistentLayerVisit,
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
@@ -322,7 +319,7 @@ impl Layer {
self.record_access(ctx);
layer
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
.get_values_reconstruct_data(visit, reconstruct_data, &self.0, ctx)
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
.await
.map_err(|err| match err {
@@ -1741,8 +1738,7 @@ impl DownloadedLayer {
async fn get_values_reconstruct_data(
&self,
keyspace: KeySpace,
lsn_range: Range<Lsn>,
visit: PersistentLayerVisit,
reconstruct_data: &mut ValuesReconstructState,
owner: &Arc<LayerInner>,
ctx: &RequestContext,
@@ -1755,11 +1751,11 @@ impl DownloadedLayer {
.map_err(GetVectoredError::Other)?
{
Delta(d) => {
d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
d.get_values_reconstruct_data(visit.into_delta_layer_visit(), reconstruct_data, ctx)
.await
}
Image(i) => {
i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
i.get_values_reconstruct_data(visit.into_image_layer_visit(), reconstruct_data, ctx)
.await
}
}

View File

@@ -1,6 +1,6 @@
use std::time::UNIX_EPOCH;
use pageserver_api::key::CONTROLFILE_KEY;
use pageserver_api::{key::CONTROLFILE_KEY, keyspace::KeySpace};
use tokio::task::JoinSet;
use utils::{
completion::{self, Completion},
@@ -9,7 +9,10 @@ use utils::{
use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
use crate::{
context::DownloadBehavior,
tenant::storage_layer::{ImageLayerVisit, LayerVisibilityHint},
};
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
/// Used in tests to advance a future to a wanted await point, and not further.
@@ -56,13 +59,14 @@ async fn smoke_test() {
let img_before = {
let mut data = ValuesReconstructState::default();
let visit = ImageLayerVisit {
keyspace: controlfile_keyspace.clone(),
lsn_floor: Lsn(0x10),
};
let visit = PersistentLayerVisit::ImageLayer(visit);
layer
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&ctx,
)
.get_values_reconstruct_data(visit, &mut data, &ctx)
.await
.unwrap();
data.keys
@@ -88,13 +92,14 @@ async fn smoke_test() {
// on accesses when the layer is evicted, it will automatically be downloaded.
let img_after = {
let mut data = ValuesReconstructState::default();
let visit = ImageLayerVisit {
keyspace: controlfile_keyspace.clone(),
lsn_floor: Lsn(0x10),
};
let visit = PersistentLayerVisit::ImageLayer(visit);
layer
.get_values_reconstruct_data(
controlfile_keyspace.clone(),
Lsn(0x10)..Lsn(0x11),
&mut data,
&ctx,
)
.get_values_reconstruct_data(visit, &mut data, &ctx)
.instrument(download_span.clone())
.await
.unwrap();

View File

@@ -136,7 +136,8 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::{
config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
config::TenantConf,
storage_layer::{inmemory_layer, LayerVisibilityHint, LayerVisitDetails},
upload_queue::NotInitialized,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
@@ -3153,12 +3154,12 @@ impl Timeline {
async fn get_vectored_reconstruct_data_timeline(
timeline: &Timeline,
keyspace: KeySpace,
mut cont_lsn: Lsn,
cont_lsn: Lsn,
reconstruct_state: &mut ValuesReconstructState,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<TimelineVisitOutcome, GetVectoredError> {
let mut unmapped_keyspace = keyspace.clone();
let mut unmapped_keyspaces = vec![(cont_lsn, keyspace.clone())];
let mut fringe = LayerFringe::new();
let mut completed_keyspace = KeySpace::default();
@@ -3171,84 +3172,86 @@ impl Timeline {
let (keys_done_last_step, keys_with_image_coverage) =
reconstruct_state.consume_done_keys();
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
// Update state that is external to the loop.
completed_keyspace.merge(&keys_done_last_step);
if let Some(keys_with_image_coverage) = keys_with_image_coverage {
unmapped_keyspace
.remove_overlapping_with(&KeySpace::single(keys_with_image_coverage.clone()));
if let Some(keys_with_image_coverage) = keys_with_image_coverage.clone() {
image_covered_keyspace.add_range(keys_with_image_coverage);
}
// Do not descend any further if the last layer we visited
// completed all keys in the keyspace it inspected. This is not
// required for correctness, but avoids visiting extra layers
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read().await;
let layers = guard.layer_map()?;
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
cont_lsn > start_lsn
});
match in_memory_layer {
Some(l) => {
let lsn_range = l.get_lsn_range().start..cont_lsn;
fringe.update(
ReadableLayer::InMemoryLayer(l),
unmapped_keyspace.clone(),
lsn_range,
);
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let results = layers.range_search(range.clone(), cont_lsn);
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
keyspace_accum.to_keyspace(),
lsn_floor..cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| {
fringe.update(layer, keyspace, lsn_range)
});
}
}
for (cont_lsn, unmapped_keyspace) in unmapped_keyspaces.iter_mut() {
// Remove any completed keys from the currently inspected keyspace.
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
if let Some(keys_with_image_coverage) = keys_with_image_coverage.clone() {
unmapped_keyspace
.remove_overlapping_with(&KeySpace::single(keys_with_image_coverage));
}
// It's safe to drop the layer map lock after planning the next round of reads.
// The fringe keeps readable handles for the layers which are safe to read even
// if layers were compacted or flushed.
//
// The more interesting consideration is: "Why is the read algorithm still correct
// if the layer map changes while it is operating?". Doing a vectored read on a
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
// covered by the read. The layer map tells us how to move the lsn downwards for a
// range at *a particular point in time*. It is fine for the answer to be different
// at two different time points.
drop(guard);
// Do not descend any further if the last layer we visited
// completed all keys in the keyspace it inspected. This is not
// required for correctness, but avoids visiting extra layers
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read().await;
let layers = guard.layer_map()?;
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
*cont_lsn > start_lsn
});
match in_memory_layer {
Some(l) => {
let lsn_range = l.get_lsn_range().start..*cont_lsn;
fringe.update(
ReadableLayer::InMemoryLayer(l),
unmapped_keyspace.clone(),
lsn_range,
);
}
None => {
for range in unmapped_keyspace.ranges.iter() {
let results = layers.range_search(range.clone(), *cont_lsn);
results
.found
.into_iter()
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
(
ReadableLayer::PersistentLayer(
guard.get_from_desc(&layer),
),
keyspace_accum.to_keyspace(),
lsn_floor..*cont_lsn,
)
})
.for_each(|(layer, keyspace, lsn_range)| {
fringe.update(layer, keyspace, lsn_range)
});
}
}
}
// It's safe to drop the layer map lock after planning the next round of reads.
// The fringe keeps readable handles for the layers which are safe to read even
// if layers were compacted or flushed.
//
// The more interesting consideration is: "Why is the read algorithm still correct
// if the layer map changes while it is operating?". Doing a vectored read on a
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
// covered by the read. The layer map tells us how to move the lsn downwards for a
// range at *a particular point in time*. It is fine for the answer to be different
// at two different time points.
drop(guard);
}
}
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
let next_cont_lsn = lsn_range.start;
if let Some((layer_to_read, layer_visit)) = fringe.next_layer() {
unmapped_keyspaces = layer_visit.keyspaces();
layer_to_read
.get_values_reconstruct_data(
keyspace_to_read.clone(),
lsn_range,
reconstruct_state,
ctx,
)
.get_values_reconstruct_data(layer_visit, reconstruct_state, ctx)
.await?;
unmapped_keyspace = keyspace_to_read;
cont_lsn = next_cont_lsn;
reconstruct_state.on_layer_visited(&layer_to_read);
} else {
break;
@@ -5502,19 +5505,24 @@ impl Timeline {
lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
use super::storage_layer::{LayerVisitBuilder, LayerVisitBuilderUpdate};
let mut all_data = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default();
let (layer, visit) = LayerVisitBuilder::new(
ReadableLayer::PersistentLayer(layer),
KeySpace::single(Key::MIN..Key::MAX),
lsn..Lsn(lsn.0 + 1),
)
.build();
layer
.get_values_reconstruct_data(
KeySpace::single(Key::MIN..Key::MAX),
lsn..Lsn(lsn.0 + 1),
&mut reconstruct_data,
ctx,
)
.get_values_reconstruct_data(visit, &mut reconstruct_data, ctx)
.await?;
for (k, v) in reconstruct_data.keys {
all_data.push((k, v?.img.unwrap().1));