mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 04:00:38 +00:00
pageserver: handle reading up to different floor LSNs in deltas
Problem Different keyspaces may require different floor LSNs in vectored delta layer visits. This patch adds support for such cases. Summary of Changes * Rework layer visit collection. Each layer type has a separate visit type which is aware of the requirements. For delta layers we track the floor LSN of keyspaces and merge only when that's matching. Other layer types do not have this requirement so they merge everything. * Thread the new visit types into the `get_values_reconstruct_data` calls. For delta layers, the code was adapted such that it may merge reads across keyspaces with different LSN floor requirements. * Tweak the fringe update code in `get_vectored_reconstruct_data_timeline` to handle different cont LSNs for different keyspaces. In practice, we will only update the fringe from one keyspace, since this "keyspace split" only happens when an image layer overlaps a delta layer (and image layers always complete all their keys). * Update tests with the new interfaces
This commit is contained in:
@@ -287,7 +287,7 @@ pub(crate) enum ReadableLayer {
|
||||
|
||||
/// A partial description of a read to be done.
|
||||
#[derive(Debug, Clone)]
|
||||
struct ReadDesc {
|
||||
struct VisitLocation {
|
||||
/// An id used to resolve the readable layer within the fringe
|
||||
layer_id: LayerId,
|
||||
/// Lsn range for the read, used for selecting the next read
|
||||
@@ -303,46 +303,442 @@ struct ReadDesc {
|
||||
/// a two layer indexing scheme.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct LayerFringe {
|
||||
planned_reads_by_lsn: BinaryHeap<ReadDesc>,
|
||||
layers: HashMap<LayerId, LayerKeyspace>,
|
||||
visits_by_lsn_index: BinaryHeap<VisitLocation>,
|
||||
layer_visits: HashMap<LayerId, LayerVisitBuilder>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct LayerKeyspace {
|
||||
layer: ReadableLayer,
|
||||
target_keyspace: KeySpaceRandomAccum,
|
||||
pub(crate) enum LayerVisitBuilder {
|
||||
InMemoryLayer(InMemoryLayerVisitBuilder),
|
||||
PersistentLayer(PersistentLayerVisitBuilder),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum LayerVisit {
|
||||
InMemoryLayer(InMemoryLayerVisit),
|
||||
PersistentLayer(PersistentLayerVisit),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum PersistentLayerVisitBuilder {
|
||||
DeltaLayer(DeltaLayerVisitBuilder),
|
||||
ImageLayer(ImageLayerVisitBuilder),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum PersistentLayerVisit {
|
||||
DeltaLayer(DeltaLayerVisit),
|
||||
ImageLayer(ImageLayerVisit),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct InMemoryLayerVisitBuilder {
|
||||
/// Key space accumulator which will define which keys we are
|
||||
/// interested in for this layer visit.
|
||||
keyspace_accum: KeySpaceRandomAccum,
|
||||
/// Ignore any keys with an LSN greater or equal
|
||||
/// than the specified one.
|
||||
lsn_ceil: Lsn,
|
||||
/// Only consider keys at LSN greater or equal than the specified one
|
||||
lsn_floor: Lsn,
|
||||
// The in-memory layer to visit
|
||||
layer: Arc<InMemoryLayer>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct InMemoryLayerVisit {
|
||||
/// Key space of keys considered by the visit
|
||||
keyspace: KeySpace,
|
||||
/// Ignore any keys with an LSN greater or equal
|
||||
/// than the specified one.
|
||||
lsn_ceil: Lsn,
|
||||
/// Only consider keys at LSN greater or equal than the specified one
|
||||
lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DeltaLayerVisitBuilder {
|
||||
/// List of key spaces accumulators which will define what deltas are read.
|
||||
/// Each accumulator has an associated LSN which specifies
|
||||
/// the LSN floor for it (i.e. do not read below this LSN).
|
||||
keyspace_accums: HashMap<Lsn, KeySpaceRandomAccum>,
|
||||
/// Ignore any keys with an LSN greater or equal
|
||||
/// than the specified one.
|
||||
lsn_ceil: Lsn,
|
||||
/// Handle for the layer to visit (guaranteed to be a delta layer)
|
||||
layer: Layer,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DeltaLayerVisit {
|
||||
/// List of key spaces considered during the visit.
|
||||
/// Each keyspace has an associated LSN which specifies
|
||||
/// the LSN floor for it (i.e. do not read below this LSN).
|
||||
keyspaces: Vec<(Lsn, KeySpace)>,
|
||||
/// Ignore any keys with an LSN greater or equal
|
||||
/// than the specified one.
|
||||
lsn_ceil: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ImageLayerVisitBuilder {
|
||||
/// Key space which defines which keys we are
|
||||
/// interested in for this layer visit.
|
||||
keyspace_accum: KeySpaceRandomAccum,
|
||||
/// Handle for the layer to visit (guaranteed to be an image layer)
|
||||
layer: Layer,
|
||||
/// Only consider keys at LSN greater or equal than the specified one
|
||||
lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ImageLayerVisit {
|
||||
/// Key space which defines which keys we are
|
||||
/// interested in for this layer visit.
|
||||
keyspace: KeySpace,
|
||||
/// Only consider keys at LSN greater or equal than the specified one
|
||||
lsn_floor: Lsn,
|
||||
}
|
||||
|
||||
impl LayerVisitBuilder {
|
||||
pub(crate) fn new(layer: ReadableLayer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
|
||||
match layer {
|
||||
ReadableLayer::InMemoryLayer(in_mem) => {
|
||||
Self::InMemoryLayer(InMemoryLayerVisitBuilder::new(in_mem, keyspace, lsn_range))
|
||||
}
|
||||
ReadableLayer::PersistentLayer(persistent) => Self::PersistentLayer(
|
||||
PersistentLayerVisitBuilder::new(persistent, keyspace, lsn_range),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryLayerVisitBuilder {
|
||||
fn new(layer: Arc<InMemoryLayer>, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
|
||||
assert_eq!(lsn_range.start, layer.get_lsn_range().start);
|
||||
|
||||
let mut keyspace_accum = KeySpaceRandomAccum::new();
|
||||
keyspace_accum.add_keyspace(keyspace);
|
||||
|
||||
Self {
|
||||
keyspace_accum,
|
||||
lsn_floor: lsn_range.start,
|
||||
lsn_ceil: lsn_range.end,
|
||||
layer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayerVisitBuilder {
|
||||
fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
|
||||
let is_delta = layer.layer_desc().is_delta;
|
||||
if is_delta {
|
||||
Self::DeltaLayer(DeltaLayerVisitBuilder::new(layer, keyspace, lsn_range))
|
||||
} else {
|
||||
Self::ImageLayer(ImageLayerVisitBuilder::new(layer, keyspace, lsn_range))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayerVisitBuilder {
|
||||
fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
|
||||
assert!(layer.layer_desc().is_delta);
|
||||
|
||||
let mut keyspace_accum = KeySpaceRandomAccum::new();
|
||||
keyspace_accum.add_keyspace(keyspace);
|
||||
|
||||
Self {
|
||||
keyspace_accums: HashMap::from([(lsn_range.start, keyspace_accum)]),
|
||||
lsn_ceil: lsn_range.end,
|
||||
layer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayerVisitBuilder {
|
||||
fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
|
||||
assert!(!layer.layer_desc().is_delta);
|
||||
assert_eq!(lsn_range.start, layer.layer_desc().lsn_range.start);
|
||||
|
||||
let mut keyspace_accum = KeySpaceRandomAccum::new();
|
||||
keyspace_accum.add_keyspace(keyspace);
|
||||
|
||||
Self {
|
||||
keyspace_accum,
|
||||
lsn_floor: lsn_range.start,
|
||||
layer,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait LayerVisitBuilderUpdate {
|
||||
type L;
|
||||
type LV;
|
||||
|
||||
/// Extend an already planned layer visit to also include the keys
|
||||
/// in the provided keyspace and LSN range.
|
||||
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>);
|
||||
|
||||
/// Build the visit!
|
||||
fn build(self) -> (Self::L, Self::LV);
|
||||
}
|
||||
|
||||
impl LayerVisitBuilderUpdate for LayerVisitBuilder {
|
||||
type L = ReadableLayer;
|
||||
type LV = LayerVisit;
|
||||
|
||||
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
|
||||
match self {
|
||||
LayerVisitBuilder::InMemoryLayer(v) => v.update(keyspace, lsn_range),
|
||||
LayerVisitBuilder::PersistentLayer(v) => v.update(keyspace, lsn_range),
|
||||
}
|
||||
}
|
||||
|
||||
fn build(self) -> (Self::L, Self::LV) {
|
||||
match self {
|
||||
LayerVisitBuilder::InMemoryLayer(in_mem) => (
|
||||
ReadableLayer::InMemoryLayer(in_mem.layer),
|
||||
LayerVisit::InMemoryLayer(InMemoryLayerVisit {
|
||||
keyspace: in_mem.keyspace_accum.to_keyspace(),
|
||||
lsn_ceil: in_mem.lsn_ceil,
|
||||
lsn_floor: in_mem.lsn_floor,
|
||||
}),
|
||||
),
|
||||
LayerVisitBuilder::PersistentLayer(pers) => {
|
||||
let (layer, visit) = pers.build();
|
||||
(
|
||||
ReadableLayer::PersistentLayer(layer),
|
||||
LayerVisit::PersistentLayer(visit),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisitBuilderUpdate for PersistentLayerVisitBuilder {
|
||||
type L = Layer;
|
||||
type LV = PersistentLayerVisit;
|
||||
|
||||
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
|
||||
match self {
|
||||
PersistentLayerVisitBuilder::DeltaLayer(v) => v.update(keyspace, lsn_range),
|
||||
PersistentLayerVisitBuilder::ImageLayer(v) => v.update(keyspace, lsn_range),
|
||||
}
|
||||
}
|
||||
|
||||
fn build(self) -> (Self::L, Self::LV) {
|
||||
match self {
|
||||
PersistentLayerVisitBuilder::DeltaLayer(delta) => {
|
||||
let (layer, visit) = delta.build();
|
||||
(layer, PersistentLayerVisit::DeltaLayer(visit))
|
||||
}
|
||||
PersistentLayerVisitBuilder::ImageLayer(img) => {
|
||||
let (layer, visit) = img.build();
|
||||
(layer, PersistentLayerVisit::ImageLayer(visit))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisitBuilderUpdate for InMemoryLayerVisitBuilder {
|
||||
type L = Arc<InMemoryLayer>;
|
||||
type LV = InMemoryLayerVisit;
|
||||
|
||||
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
|
||||
// Note: I cannot think of any cases when this update should happen,
|
||||
// since in memory layers span the entire key range.
|
||||
assert_eq!(lsn_range.end, self.lsn_ceil);
|
||||
assert_eq!(lsn_range.start, self.lsn_floor);
|
||||
self.keyspace_accum.add_keyspace(keyspace);
|
||||
}
|
||||
|
||||
fn build(self) -> (Self::L, Self::LV) {
|
||||
(
|
||||
self.layer,
|
||||
InMemoryLayerVisit {
|
||||
keyspace: self.keyspace_accum.to_keyspace(),
|
||||
lsn_floor: self.lsn_floor,
|
||||
lsn_ceil: self.lsn_ceil,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisitBuilderUpdate for DeltaLayerVisitBuilder {
|
||||
type L = Layer;
|
||||
type LV = DeltaLayerVisit;
|
||||
|
||||
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
|
||||
assert_eq!(lsn_range.end, self.lsn_ceil);
|
||||
self.keyspace_accums
|
||||
.entry(lsn_range.start)
|
||||
.or_default()
|
||||
.add_keyspace(keyspace);
|
||||
}
|
||||
|
||||
fn build(self) -> (Self::L, Self::LV) {
|
||||
use itertools::Itertools;
|
||||
|
||||
let keyspaces = self
|
||||
.keyspace_accums
|
||||
.into_iter()
|
||||
.filter_map(|(lsn_floor, accum)| {
|
||||
let keyspace = accum.to_keyspace();
|
||||
if keyspace.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some((lsn_floor, keyspace))
|
||||
}
|
||||
})
|
||||
.sorted_by_key(|(_lsn_floor, keyspace)| keyspace.start().unwrap())
|
||||
.collect::<Vec<(Lsn, KeySpace)>>();
|
||||
|
||||
if cfg!(debug_assertions) {
|
||||
// Check that the keyspaces we are going to read from
|
||||
// a layer are non-overlapping.
|
||||
//
|
||||
// The keyspaces provided to vectored read initially are non-overlapping.
|
||||
// We may split keyspaces at each step and keyspaces resulting from a split
|
||||
// are non-overlapping as well. One can prove that the property holds by
|
||||
// induction.
|
||||
let mut prev_end: Option<Key> = None;
|
||||
for (_lsn_floor, crnt) in keyspaces.iter() {
|
||||
if let Some(prev_end) = prev_end {
|
||||
debug_assert!(prev_end <= crnt.start().unwrap())
|
||||
}
|
||||
|
||||
prev_end = Some(crnt.end().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
(
|
||||
self.layer,
|
||||
DeltaLayerVisit {
|
||||
keyspaces,
|
||||
lsn_ceil: self.lsn_ceil,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisitBuilderUpdate for ImageLayerVisitBuilder {
|
||||
type L = Layer;
|
||||
type LV = ImageLayerVisit;
|
||||
|
||||
fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
|
||||
assert_eq!(lsn_range.start, self.lsn_floor);
|
||||
self.keyspace_accum.add_keyspace(keyspace);
|
||||
}
|
||||
|
||||
fn build(self) -> (Self::L, Self::LV) {
|
||||
(
|
||||
self.layer,
|
||||
ImageLayerVisit {
|
||||
keyspace: self.keyspace_accum.to_keyspace(),
|
||||
lsn_floor: self.lsn_floor,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) trait LayerVisitDetails {
|
||||
/// Returns the key spaces planned for the visit
|
||||
/// and their associated floor LSNs.
|
||||
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)>;
|
||||
}
|
||||
|
||||
impl LayerVisitDetails for LayerVisit {
|
||||
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
|
||||
match self {
|
||||
LayerVisit::PersistentLayer(pers) => pers.keyspaces(),
|
||||
LayerVisit::InMemoryLayer(in_mem) => in_mem.keyspaces(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisitDetails for PersistentLayerVisit {
|
||||
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
|
||||
match self {
|
||||
PersistentLayerVisit::DeltaLayer(delta) => delta.keyspaces(),
|
||||
PersistentLayerVisit::ImageLayer(image) => image.keyspaces(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisitDetails for DeltaLayerVisit {
|
||||
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
|
||||
self.keyspaces.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisitDetails for ImageLayerVisit {
|
||||
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
|
||||
vec![(self.lsn_floor, self.keyspace.clone())]
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisitDetails for InMemoryLayerVisit {
|
||||
fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
|
||||
vec![(self.lsn_floor, self.keyspace.clone())]
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerVisit {
|
||||
fn into_persistent_layer_visit(self) -> PersistentLayerVisit {
|
||||
match self {
|
||||
LayerVisit::PersistentLayer(visit) => visit,
|
||||
LayerVisit::InMemoryLayer(visit) => {
|
||||
panic!("Invalid attempt to cast to PersistentLayerVisit: {visit:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn into_in_memory_layer_visit(self) -> InMemoryLayerVisit {
|
||||
match self {
|
||||
LayerVisit::InMemoryLayer(visit) => visit,
|
||||
LayerVisit::PersistentLayer(visit) => {
|
||||
panic!("Invalid attempt to cast to InMemoryLayerVisit: {visit:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayerVisit {
|
||||
fn into_delta_layer_visit(self) -> DeltaLayerVisit {
|
||||
match self {
|
||||
PersistentLayerVisit::DeltaLayer(visit) => visit,
|
||||
PersistentLayerVisit::ImageLayer(visit) => {
|
||||
panic!("Invalid attempt to cast to DeltaLayerVisit: {visit:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn into_image_layer_visit(self) -> ImageLayerVisit {
|
||||
match self {
|
||||
PersistentLayerVisit::ImageLayer(visit) => visit,
|
||||
PersistentLayerVisit::DeltaLayer(visit) => {
|
||||
panic!("Invalid attempt to cast to ImageLayerVisit: {visit:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerFringe {
|
||||
pub(crate) fn new() -> Self {
|
||||
LayerFringe {
|
||||
planned_reads_by_lsn: BinaryHeap::new(),
|
||||
layers: HashMap::new(),
|
||||
visits_by_lsn_index: BinaryHeap::new(),
|
||||
layer_visits: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
|
||||
let read_desc = match self.planned_reads_by_lsn.pop() {
|
||||
pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, LayerVisit)> {
|
||||
let read_desc = match self.visits_by_lsn_index.pop() {
|
||||
Some(desc) => desc,
|
||||
None => return None,
|
||||
};
|
||||
|
||||
let removed = self.layers.remove_entry(&read_desc.layer_id);
|
||||
|
||||
match removed {
|
||||
Some((
|
||||
_,
|
||||
LayerKeyspace {
|
||||
layer,
|
||||
mut target_keyspace,
|
||||
},
|
||||
)) => Some((
|
||||
layer,
|
||||
target_keyspace.consume_keyspace(),
|
||||
read_desc.lsn_range,
|
||||
)),
|
||||
None => unreachable!("fringe internals are always consistent"),
|
||||
}
|
||||
let removed = self.layer_visits.remove(&read_desc.layer_id)?;
|
||||
Some(removed.build())
|
||||
}
|
||||
|
||||
pub(crate) fn update(
|
||||
@@ -352,22 +748,18 @@ impl LayerFringe {
|
||||
lsn_range: Range<Lsn>,
|
||||
) {
|
||||
let layer_id = layer.id();
|
||||
let entry = self.layers.entry(layer_id.clone());
|
||||
let entry = self.layer_visits.entry(layer_id.clone());
|
||||
match entry {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().target_keyspace.add_keyspace(keyspace);
|
||||
entry.get_mut().update(keyspace, lsn_range);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
self.planned_reads_by_lsn.push(ReadDesc {
|
||||
lsn_range,
|
||||
self.visits_by_lsn_index.push(VisitLocation {
|
||||
lsn_range: lsn_range.clone(),
|
||||
layer_id: layer_id.clone(),
|
||||
});
|
||||
let mut accum = KeySpaceRandomAccum::new();
|
||||
accum.add_keyspace(keyspace);
|
||||
entry.insert(LayerKeyspace {
|
||||
layer,
|
||||
target_keyspace: accum,
|
||||
});
|
||||
|
||||
entry.insert(LayerVisitBuilder::new(layer, keyspace, lsn_range));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -379,7 +771,7 @@ impl Default for LayerFringe {
|
||||
}
|
||||
}
|
||||
|
||||
impl Ord for ReadDesc {
|
||||
impl Ord for VisitLocation {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
|
||||
if ord == std::cmp::Ordering::Equal {
|
||||
@@ -390,19 +782,19 @@ impl Ord for ReadDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for ReadDesc {
|
||||
impl PartialOrd for VisitLocation {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for ReadDesc {
|
||||
impl PartialEq for VisitLocation {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.lsn_range == other.lsn_range
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for ReadDesc {}
|
||||
impl Eq for VisitLocation {}
|
||||
|
||||
impl ReadableLayer {
|
||||
pub(crate) fn id(&self) -> LayerId {
|
||||
@@ -414,20 +806,27 @@ impl ReadableLayer {
|
||||
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
visit: LayerVisit,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
match self {
|
||||
ReadableLayer::PersistentLayer(layer) => {
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(
|
||||
visit.into_persistent_layer_visit(),
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
ReadableLayer::InMemoryLayer(layer) => {
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
|
||||
.get_values_reconstruct_data(
|
||||
visit.into_in_memory_layer_visit(),
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ use utils::{
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
|
||||
use super::{AsLayerDesc, DeltaLayerVisit, LayerName, PersistentLayerDesc, ValuesReconstructState};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -841,8 +841,7 @@ impl DeltaLayerInner {
|
||||
// can be further optimised to visit the index only once.
|
||||
pub(super) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
visit: DeltaLayerVisit,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
@@ -863,8 +862,8 @@ impl DeltaLayerInner {
|
||||
let data_end_offset = self.index_start_offset();
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
&keyspace,
|
||||
lsn_range.clone(),
|
||||
&visit.keyspaces,
|
||||
visit.lsn_ceil,
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
planner,
|
||||
@@ -877,14 +876,16 @@ impl DeltaLayerInner {
|
||||
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
|
||||
.await;
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
|
||||
for (lsn_floor, keyspace) in visit.keyspaces {
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, lsn_floor);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn plan_reads<Reader>(
|
||||
keyspace: &KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
keyspaces: &Vec<(Lsn, KeySpace)>,
|
||||
lsn_ceil: Lsn,
|
||||
data_end_offset: u64,
|
||||
index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
|
||||
mut planner: VectoredReadPlanner,
|
||||
@@ -898,48 +899,52 @@ impl DeltaLayerInner {
|
||||
.page_content_kind(PageContentKind::DeltaLayerBtreeNode)
|
||||
.build();
|
||||
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
for (lsn_floor, keyspace) in keyspaces {
|
||||
let lsn_range = *lsn_floor..lsn_ceil;
|
||||
|
||||
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
|
||||
let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
|
||||
let mut index_stream = std::pin::pin!(index_stream);
|
||||
for range in keyspace.ranges.iter() {
|
||||
let mut range_end_handled = false;
|
||||
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, value) = index_entry?;
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
|
||||
let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
|
||||
let mut index_stream = std::pin::pin!(index_stream);
|
||||
|
||||
// Lsns are not monotonically increasing across keys, so we don't assert on them.
|
||||
assert!(key >= range.start);
|
||||
while let Some(index_entry) = index_stream.next().await {
|
||||
let (raw_key, value) = index_entry?;
|
||||
let key = Key::from_slice(&raw_key[..KEY_SIZE]);
|
||||
let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
|
||||
let blob_ref = BlobRef(value);
|
||||
|
||||
let outside_lsn_range = !lsn_range.contains(&lsn);
|
||||
let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
|
||||
// Lsns are not monotonically increasing across keys, so we don't assert on them.
|
||||
assert!(key >= range.start);
|
||||
|
||||
let flag = {
|
||||
if outside_lsn_range || below_cached_lsn {
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
BlobFlag::ReplaceAll
|
||||
let outside_lsn_range = !lsn_range.contains(&lsn);
|
||||
let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
|
||||
|
||||
let flag = {
|
||||
if outside_lsn_range || below_cached_lsn {
|
||||
BlobFlag::Ignore
|
||||
} else if blob_ref.will_init() {
|
||||
BlobFlag::ReplaceAll
|
||||
} else {
|
||||
// Usual path: add blob to the read
|
||||
BlobFlag::None
|
||||
}
|
||||
};
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
// Usual path: add blob to the read
|
||||
BlobFlag::None
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
}
|
||||
};
|
||||
|
||||
if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
|
||||
planner.handle_range_end(blob_ref.pos());
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag);
|
||||
}
|
||||
}
|
||||
|
||||
if !range_end_handled {
|
||||
tracing::debug!("Handling range end fallback at {}", data_end_offset);
|
||||
planner.handle_range_end(data_end_offset);
|
||||
if !range_end_handled {
|
||||
tracing::debug!("Handling range end fallback at {}", data_end_offset);
|
||||
planner.handle_range_end(data_end_offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1641,8 +1646,8 @@ pub(crate) mod test {
|
||||
|
||||
// Plan and validate
|
||||
let vectored_reads = DeltaLayerInner::plan_reads(
|
||||
&keyspace,
|
||||
lsn_range.clone(),
|
||||
&vec![(lsn_range.start, keyspace.clone())],
|
||||
lsn_range.end,
|
||||
disk_offset,
|
||||
reader,
|
||||
planner,
|
||||
@@ -1895,8 +1900,8 @@ pub(crate) mod test {
|
||||
let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
|
||||
|
||||
let vectored_reads = DeltaLayerInner::plan_reads(
|
||||
&keyspace,
|
||||
entries_meta.lsn_range.clone(),
|
||||
&vec![(entries_meta.lsn_range.start, keyspace)],
|
||||
entries_meta.lsn_range.end,
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
planner,
|
||||
|
||||
@@ -69,7 +69,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::layer_name::ImageLayerName;
|
||||
use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
|
||||
use super::{AsLayerDesc, ImageLayerVisit, LayerName, PersistentLayerDesc, ValuesReconstructState};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -435,12 +435,12 @@ impl ImageLayerInner {
|
||||
// the reconstruct state with whatever is found.
|
||||
pub(super) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
visit: ImageLayerVisit,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let reads = self
|
||||
.plan_reads(keyspace, None, ctx)
|
||||
.plan_reads(visit.keyspace, None, ctx)
|
||||
.await
|
||||
.map_err(GetVectoredError::Other)?;
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ use anyhow::{anyhow, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::CompactKey;
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::InMemoryLayerInfo;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
@@ -36,7 +35,8 @@ use std::sync::atomic::{AtomicU64, AtomicUsize};
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::{
|
||||
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
|
||||
DeltaLayerWriter, InMemoryLayerVisit, PersistentLayerDesc, ValueReconstructSituation,
|
||||
ValuesReconstructState,
|
||||
};
|
||||
|
||||
pub(crate) mod vectored_dio_read;
|
||||
@@ -416,11 +416,14 @@ impl InMemoryLayer {
|
||||
// If the key is cached, go no further than the cached Lsn.
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
end_lsn: Lsn,
|
||||
visit: InMemoryLayerVisit,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
let InMemoryLayerVisit {
|
||||
keyspace, lsn_ceil, ..
|
||||
} = visit;
|
||||
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_content_kind(PageContentKind::InMemoryLayer)
|
||||
.build();
|
||||
@@ -440,8 +443,8 @@ impl InMemoryLayer {
|
||||
{
|
||||
let key = Key::from_compact(*key);
|
||||
let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
|
||||
Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
|
||||
None => self.start_lsn..end_lsn,
|
||||
Some(cached_lsn) => (cached_lsn + 1)..lsn_ceil,
|
||||
None => self.start_lsn..lsn_ceil,
|
||||
};
|
||||
|
||||
let slice = vec_map.slice_range(lsn_range);
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use anyhow::Context;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::{Duration, SystemTime};
|
||||
@@ -23,7 +21,7 @@ use super::delta_layer::{self, DeltaEntry};
|
||||
use super::image_layer::{self};
|
||||
use super::{
|
||||
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
|
||||
LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
|
||||
LayerVisibilityHint, PersistentLayerDesc, PersistentLayerVisit, ValuesReconstructState,
|
||||
};
|
||||
|
||||
use utils::generation::Generation;
|
||||
@@ -303,8 +301,7 @@ impl Layer {
|
||||
|
||||
pub(crate) async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
visit: PersistentLayerVisit,
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), GetVectoredError> {
|
||||
@@ -322,7 +319,7 @@ impl Layer {
|
||||
self.record_access(ctx);
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
|
||||
.get_values_reconstruct_data(visit, reconstruct_data, &self.0, ctx)
|
||||
.instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
@@ -1741,8 +1738,7 @@ impl DownloadedLayer {
|
||||
|
||||
async fn get_values_reconstruct_data(
|
||||
&self,
|
||||
keyspace: KeySpace,
|
||||
lsn_range: Range<Lsn>,
|
||||
visit: PersistentLayerVisit,
|
||||
reconstruct_data: &mut ValuesReconstructState,
|
||||
owner: &Arc<LayerInner>,
|
||||
ctx: &RequestContext,
|
||||
@@ -1755,11 +1751,11 @@ impl DownloadedLayer {
|
||||
.map_err(GetVectoredError::Other)?
|
||||
{
|
||||
Delta(d) => {
|
||||
d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
|
||||
d.get_values_reconstruct_data(visit.into_delta_layer_visit(), reconstruct_data, ctx)
|
||||
.await
|
||||
}
|
||||
Image(i) => {
|
||||
i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
|
||||
i.get_values_reconstruct_data(visit.into_image_layer_visit(), reconstruct_data, ctx)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
use pageserver_api::key::CONTROLFILE_KEY;
|
||||
use pageserver_api::{key::CONTROLFILE_KEY, keyspace::KeySpace};
|
||||
use tokio::task::JoinSet;
|
||||
use utils::{
|
||||
completion::{self, Completion},
|
||||
@@ -9,7 +9,10 @@ use utils::{
|
||||
|
||||
use super::failpoints::{Failpoint, FailpointKind};
|
||||
use super::*;
|
||||
use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
|
||||
use crate::{
|
||||
context::DownloadBehavior,
|
||||
tenant::storage_layer::{ImageLayerVisit, LayerVisibilityHint},
|
||||
};
|
||||
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
|
||||
|
||||
/// Used in tests to advance a future to wanted await point, and not futher.
|
||||
@@ -56,13 +59,14 @@ async fn smoke_test() {
|
||||
|
||||
let img_before = {
|
||||
let mut data = ValuesReconstructState::default();
|
||||
let visit = ImageLayerVisit {
|
||||
keyspace: controlfile_keyspace.clone(),
|
||||
lsn_floor: Lsn(0x10),
|
||||
};
|
||||
let visit = PersistentLayerVisit::ImageLayer(visit);
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
controlfile_keyspace.clone(),
|
||||
Lsn(0x10)..Lsn(0x11),
|
||||
&mut data,
|
||||
&ctx,
|
||||
)
|
||||
.get_values_reconstruct_data(visit, &mut data, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
data.keys
|
||||
@@ -88,13 +92,14 @@ async fn smoke_test() {
|
||||
// on accesses when the layer is evicted, it will automatically be downloaded.
|
||||
let img_after = {
|
||||
let mut data = ValuesReconstructState::default();
|
||||
let visit = ImageLayerVisit {
|
||||
keyspace: controlfile_keyspace.clone(),
|
||||
lsn_floor: Lsn(0x10),
|
||||
};
|
||||
let visit = PersistentLayerVisit::ImageLayer(visit);
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
controlfile_keyspace.clone(),
|
||||
Lsn(0x10)..Lsn(0x11),
|
||||
&mut data,
|
||||
&ctx,
|
||||
)
|
||||
.get_values_reconstruct_data(visit, &mut data, &ctx)
|
||||
.instrument(download_span.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -136,7 +136,8 @@ use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::{
|
||||
config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
|
||||
config::TenantConf,
|
||||
storage_layer::{inmemory_layer, LayerVisibilityHint, LayerVisitDetails},
|
||||
upload_queue::NotInitialized,
|
||||
};
|
||||
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
||||
@@ -3153,12 +3154,12 @@ impl Timeline {
|
||||
async fn get_vectored_reconstruct_data_timeline(
|
||||
timeline: &Timeline,
|
||||
keyspace: KeySpace,
|
||||
mut cont_lsn: Lsn,
|
||||
cont_lsn: Lsn,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<TimelineVisitOutcome, GetVectoredError> {
|
||||
let mut unmapped_keyspace = keyspace.clone();
|
||||
let mut unmapped_keyspaces = vec![(cont_lsn, keyspace.clone())];
|
||||
let mut fringe = LayerFringe::new();
|
||||
|
||||
let mut completed_keyspace = KeySpace::default();
|
||||
@@ -3169,86 +3170,83 @@ impl Timeline {
|
||||
return Err(GetVectoredError::Cancelled);
|
||||
}
|
||||
|
||||
let (keys_done_last_step, keys_with_image_coverage) =
|
||||
reconstruct_state.consume_done_keys();
|
||||
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
|
||||
completed_keyspace.merge(&keys_done_last_step);
|
||||
if let Some(keys_with_image_coverage) = keys_with_image_coverage {
|
||||
unmapped_keyspace
|
||||
.remove_overlapping_with(&KeySpace::single(keys_with_image_coverage.clone()));
|
||||
image_covered_keyspace.add_range(keys_with_image_coverage);
|
||||
}
|
||||
|
||||
// Do not descent any further if the last layer we visited
|
||||
// completed all keys in the keyspace it inspected. This is not
|
||||
// required for correctness, but avoids visiting extra layers
|
||||
// which turns out to be a perf bottleneck in some cases.
|
||||
if !unmapped_keyspace.is_empty() {
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map()?;
|
||||
|
||||
let in_memory_layer = layers.find_in_memory_layer(|l| {
|
||||
let start_lsn = l.get_lsn_range().start;
|
||||
cont_lsn > start_lsn
|
||||
});
|
||||
|
||||
match in_memory_layer {
|
||||
Some(l) => {
|
||||
let lsn_range = l.get_lsn_range().start..cont_lsn;
|
||||
fringe.update(
|
||||
ReadableLayer::InMemoryLayer(l),
|
||||
unmapped_keyspace.clone(),
|
||||
lsn_range,
|
||||
);
|
||||
}
|
||||
None => {
|
||||
for range in unmapped_keyspace.ranges.iter() {
|
||||
let results = layers.range_search(range.clone(), cont_lsn);
|
||||
|
||||
results
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
|
||||
(
|
||||
ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
|
||||
keyspace_accum.to_keyspace(),
|
||||
lsn_floor..cont_lsn,
|
||||
)
|
||||
})
|
||||
.for_each(|(layer, keyspace, lsn_range)| {
|
||||
fringe.update(layer, keyspace, lsn_range)
|
||||
});
|
||||
}
|
||||
}
|
||||
for (cont_lsn, unmapped_keyspace) in unmapped_keyspaces.iter_mut() {
|
||||
let (keys_done_last_step, keys_with_image_coverage) =
|
||||
reconstruct_state.consume_done_keys();
|
||||
unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
|
||||
completed_keyspace.merge(&keys_done_last_step);
|
||||
if let Some(keys_with_image_coverage) = keys_with_image_coverage {
|
||||
unmapped_keyspace.remove_overlapping_with(&KeySpace::single(
|
||||
keys_with_image_coverage.clone(),
|
||||
));
|
||||
image_covered_keyspace.add_range(keys_with_image_coverage);
|
||||
}
|
||||
|
||||
// It's safe to drop the layer map lock after planning the next round of reads.
|
||||
// The fringe keeps readable handles for the layers which are safe to read even
|
||||
// if layers were compacted or flushed.
|
||||
//
|
||||
// The more interesting consideration is: "Why is the read algorithm still correct
|
||||
// if the layer map changes while it is operating?". Doing a vectored read on a
|
||||
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
|
||||
// covered by the read. The layer map tells us how to move the lsn downwards for a
|
||||
// range at *a particular point in time*. It is fine for the answer to be different
|
||||
// at two different time points.
|
||||
drop(guard);
|
||||
// Do not descent any further if the last layer we visited
|
||||
// completed all keys in the keyspace it inspected. This is not
|
||||
// required for correctness, but avoids visiting extra layers
|
||||
// which turns out to be a perf bottleneck in some cases.
|
||||
if !unmapped_keyspace.is_empty() {
|
||||
let guard = timeline.layers.read().await;
|
||||
let layers = guard.layer_map()?;
|
||||
|
||||
let in_memory_layer = layers.find_in_memory_layer(|l| {
|
||||
let start_lsn = l.get_lsn_range().start;
|
||||
*cont_lsn > start_lsn
|
||||
});
|
||||
|
||||
match in_memory_layer {
|
||||
Some(l) => {
|
||||
let lsn_range = l.get_lsn_range().start..*cont_lsn;
|
||||
fringe.update(
|
||||
ReadableLayer::InMemoryLayer(l),
|
||||
unmapped_keyspace.clone(),
|
||||
lsn_range,
|
||||
);
|
||||
}
|
||||
None => {
|
||||
for range in unmapped_keyspace.ranges.iter() {
|
||||
let results = layers.range_search(range.clone(), *cont_lsn);
|
||||
|
||||
results
|
||||
.found
|
||||
.into_iter()
|
||||
.map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
|
||||
(
|
||||
ReadableLayer::PersistentLayer(
|
||||
guard.get_from_desc(&layer),
|
||||
),
|
||||
keyspace_accum.to_keyspace(),
|
||||
lsn_floor..*cont_lsn,
|
||||
)
|
||||
})
|
||||
.for_each(|(layer, keyspace, lsn_range)| {
|
||||
fringe.update(layer, keyspace, lsn_range)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// It's safe to drop the layer map lock after planning the next round of reads.
|
||||
// The fringe keeps readable handles for the layers which are safe to read even
|
||||
// if layers were compacted or flushed.
|
||||
//
|
||||
// The more interesting consideration is: "Why is the read algorithm still correct
|
||||
// if the layer map changes while it is operating?". Doing a vectored read on a
|
||||
// timeline boils down to pushing an imaginary lsn boundary downwards for each range
|
||||
// covered by the read. The layer map tells us how to move the lsn downwards for a
|
||||
// range at *a particular point in time*. It is fine for the answer to be different
|
||||
// at two different time points.
|
||||
drop(guard);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
|
||||
let next_cont_lsn = lsn_range.start;
|
||||
if let Some((layer_to_read, layer_visit)) = fringe.next_layer() {
|
||||
unmapped_keyspaces = layer_visit.keyspaces();
|
||||
layer_to_read
|
||||
.get_values_reconstruct_data(
|
||||
keyspace_to_read.clone(),
|
||||
lsn_range,
|
||||
reconstruct_state,
|
||||
ctx,
|
||||
)
|
||||
.get_values_reconstruct_data(layer_visit, reconstruct_state, ctx)
|
||||
.await?;
|
||||
|
||||
unmapped_keyspace = keyspace_to_read;
|
||||
cont_lsn = next_cont_lsn;
|
||||
|
||||
reconstruct_state.on_layer_visited(&layer_to_read);
|
||||
} else {
|
||||
break;
|
||||
@@ -5502,19 +5500,24 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Vec<(Key, Bytes)>> {
|
||||
use super::storage_layer::{LayerVisitBuilder, LayerVisitBuilderUpdate};
|
||||
|
||||
let mut all_data = Vec::new();
|
||||
let guard = self.layers.read().await;
|
||||
for layer in guard.layer_map()?.iter_historic_layers() {
|
||||
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
|
||||
let layer = guard.get_from_desc(&layer);
|
||||
let mut reconstruct_data = ValuesReconstructState::default();
|
||||
|
||||
let (layer, visit) = LayerVisitBuilder::new(
|
||||
ReadableLayer::PersistentLayer(layer),
|
||||
KeySpace::single(Key::MIN..Key::MAX),
|
||||
lsn..Lsn(lsn.0 + 1),
|
||||
)
|
||||
.build();
|
||||
|
||||
layer
|
||||
.get_values_reconstruct_data(
|
||||
KeySpace::single(Key::MIN..Key::MAX),
|
||||
lsn..Lsn(lsn.0 + 1),
|
||||
&mut reconstruct_data,
|
||||
ctx,
|
||||
)
|
||||
.get_values_reconstruct_data(visit, &mut reconstruct_data, ctx)
|
||||
.await?;
|
||||
for (k, v) in reconstruct_data.keys {
|
||||
all_data.push((k, v?.img.unwrap().1));
|
||||
|
||||
Reference in New Issue
Block a user