Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-17 10:22:56 +00:00

Compare commits: release-pr...vlad/fix-v (4 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 84d1af736e | |
| | 3a218d7525 | |
| | f0ad90f3ee | |
| | a8ad678574 | |
@@ -4164,9 +4164,18 @@ pub(crate) mod harness {
         let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
         if records_neon {
             // For Neon wal records, we can decode without spawning postgres, so do so.
-            let base_img = base_img.expect("Neon WAL redo requires base image").1;
-            let mut page = BytesMut::new();
-            page.extend_from_slice(&base_img);
+            let mut page = match (base_img, records.first()) {
+                (Some((_lsn, img)), _) => {
+                    let mut page = BytesMut::new();
+                    page.extend_from_slice(&img);
+                    page
+                }
+                (_, Some((_lsn, rec))) if rec.will_init() => BytesMut::new(),
+                _ => {
+                    panic!("Neon WAL redo requires base image or will init record");
+                }
+            };
 
             for (record_lsn, record) in records {
                 apply_neon::apply_in_neon(&record, record_lsn, key, &mut page)?;
             }
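
Note on the hunk above: the old code unconditionally required a base image for
Neon WAL redo; the new match also accepts an empty starting page when the first
record is a will-init record. A minimal standalone sketch of just that selection
rule (simplified stand-in types, not the pageserver's actual ones):

// Sketch of the page-initialization rule, with simplified types.
struct Record {
    will_init: bool,
}

fn initial_page(base_img: Option<Vec<u8>>, records: &[Record]) -> Vec<u8> {
    match (base_img, records.first()) {
        // A base image exists: redo starts from a copy of it.
        (Some(img), _) => img,
        // No base image, but the first record rebuilds the page from
        // scratch: redo starts from an empty buffer.
        (None, Some(rec)) if rec.will_init => Vec::new(),
        // Neither: redo cannot proceed.
        (None, _) => panic!("Neon WAL redo requires base image or will init record"),
    }
}

fn main() {
    assert!(initial_page(None, &[Record { will_init: true }]).is_empty());
    assert_eq!(initial_page(Some(vec![1, 2, 3]), &[]), vec![1, 2, 3]);
}
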
@@ -8470,4 +8479,135 @@ mod tests {
 
         Ok(())
     }
 
+    // Regression test for https://github.com/neondatabase/neon/issues/9012
+    // Create an image arrangement where we have to read at different LSN ranges
+    // from a delta layer. This is achieved by overlapping an image layer on top of
+    // a delta layer. Like so:
+    //
+    //      A                                B
+    //  +------------------+  -> delta_layer
+    //  |                  |                        ^ lsn
+    //  |       =========|-> nested_image_layer    |
+    //  |       C          |                       |
+    //  +------------------+                       |
+    //  ========  -> baseline_image_layer          +-------> key
+    //
+    //
+    // When querying the key range [A, B) we need to read at different LSN ranges
+    // for [A, C) and [C, B). This test checks that the described edge case is handled correctly.
+    #[tokio::test]
+    async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
+        let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
+        let (tenant, ctx) = harness.load().await;
+
+        let will_init_keys = [2, 6];
+        fn get_key(id: u32) -> Key {
+            let mut key = Key::from_hex("110000000033333333444444445500000000").unwrap();
+            key.field6 = id;
+            key
+        }
+
+        let mut expected_key_values = HashMap::new();
+
+        let baseline_image_layer_lsn = Lsn(0x10);
+        let mut baseline_img_layer = Vec::new();
+        for i in 0..5 {
+            let key = get_key(i);
+            let value = format!("value {i}@{baseline_image_layer_lsn}");
+
+            let removed = expected_key_values.insert(key, value.clone());
+            assert!(removed.is_none());
+
+            baseline_img_layer.push((key, Bytes::from(value)));
+        }
+
+        let nested_image_layer_lsn = Lsn(0x50);
+        let mut nested_img_layer = Vec::new();
+        for i in 5..10 {
+            let key = get_key(i);
+            let value = format!("value {i}@{nested_image_layer_lsn}");
+
+            let removed = expected_key_values.insert(key, value.clone());
+            assert!(removed.is_none());
+
+            nested_img_layer.push((key, Bytes::from(value)));
+        }
+
+        let mut delta_layer_spec = Vec::default();
+        let delta_layer_start_lsn = Lsn(0x20);
+        let mut delta_layer_end_lsn = delta_layer_start_lsn;
+
+        for i in 0..10 {
+            let key = get_key(i);
+            let key_in_nested = nested_img_layer
+                .iter()
+                .any(|(key_with_img, _)| *key_with_img == key);
+            let lsn = {
+                if key_in_nested {
+                    Lsn(nested_image_layer_lsn.0 + 0x10)
+                } else {
+                    delta_layer_start_lsn
+                }
+            };
+
+            let will_init = will_init_keys.contains(&i);
+            if will_init {
+                delta_layer_spec.push((key, lsn, Value::WalRecord(NeonWalRecord::wal_init())));
+
+                expected_key_values.insert(key, "".to_string());
+            } else {
+                let delta = format!("@{lsn}");
+                delta_layer_spec.push((
+                    key,
+                    lsn,
+                    Value::WalRecord(NeonWalRecord::wal_append(&delta)),
+                ));
+
+                expected_key_values
+                    .get_mut(&key)
+                    .expect("An image exists for each key")
+                    .push_str(delta.as_str());
+            }
+            delta_layer_end_lsn = std::cmp::max(delta_layer_start_lsn, lsn);
+        }
+
+        delta_layer_end_lsn = Lsn(delta_layer_end_lsn.0 + 1);
+
+        assert!(
+            nested_image_layer_lsn > delta_layer_start_lsn
+                && nested_image_layer_lsn < delta_layer_end_lsn
+        );
+
+        let tline = tenant
+            .create_test_timeline_with_layers(
+                TIMELINE_ID,
+                baseline_image_layer_lsn,
+                DEFAULT_PG_VERSION,
+                &ctx,
+                vec![DeltaLayerTestDesc::new_with_inferred_key_range(
+                    delta_layer_start_lsn..delta_layer_end_lsn,
+                    delta_layer_spec,
+                )], // delta layers
+                vec![
+                    (baseline_image_layer_lsn, baseline_img_layer),
+                    (nested_image_layer_lsn, nested_img_layer),
+                ], // image layers
+                delta_layer_end_lsn,
+            )
+            .await?;
+
+        let keyspace = KeySpace::single(get_key(0)..get_key(10));
+        let results = tline
+            .get_vectored(keyspace, delta_layer_end_lsn, &ctx)
+            .await
+            .expect("No vectored errors");
+        for (key, res) in results {
+            let value = res.expect("No key errors");
+            let expected_value = expected_key_values.remove(&key).expect("No unknown keys");
+            assert_eq!(value, Bytes::from(expected_value));
+        }
+
+        Ok(())
+    }
 }
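
For orientation, the layout above forces two different LSN floors within a
single delta layer visit. The illustration below is hypothetical (u64/u32
stand-ins for Lsn and Key; the real read path tracks these as (Lsn, KeySpace)
pairs): keys [A, C) descend to the baseline image, while keys [C, B) must stop
at the nested image.

fn main() {
    let baseline_image_layer_lsn: u64 = 0x10;
    let nested_image_layer_lsn: u64 = 0x50;
    let groups = [
        (baseline_image_layer_lsn, 0u32..5), // [A, C): floor = baseline image
        (nested_image_layer_lsn, 5..10),     // [C, B): floor = nested image
    ];
    for (lsn_floor, keys) in groups {
        println!("read keys {keys:?} down to lsn {lsn_floor:#x}");
    }
}
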
@@ -287,7 +287,7 @@ pub(crate) enum ReadableLayer {
 
 /// A partial description of a read to be done.
 #[derive(Debug, Clone)]
-struct ReadDesc {
+struct VisitLocation {
     /// An id used to resolve the readable layer within the fringe
     layer_id: LayerId,
     /// Lsn range for the read, used for selecting the next read
@@ -303,46 +303,442 @@ struct ReadDesc {
 /// a two layer indexing scheme.
 #[derive(Debug)]
 pub(crate) struct LayerFringe {
-    planned_reads_by_lsn: BinaryHeap<ReadDesc>,
-    layers: HashMap<LayerId, LayerKeyspace>,
+    visits_by_lsn_index: BinaryHeap<VisitLocation>,
+    layer_visits: HashMap<LayerId, LayerVisitBuilder>,
 }
 
 #[derive(Debug)]
-struct LayerKeyspace {
-    layer: ReadableLayer,
-    target_keyspace: KeySpaceRandomAccum,
+pub(crate) enum LayerVisitBuilder {
+    InMemoryLayer(InMemoryLayerVisitBuilder),
+    PersistentLayer(PersistentLayerVisitBuilder),
 }
 
+#[derive(Debug)]
+pub(crate) enum LayerVisit {
+    InMemoryLayer(InMemoryLayerVisit),
+    PersistentLayer(PersistentLayerVisit),
+}
+
+#[derive(Debug)]
+pub(crate) enum PersistentLayerVisitBuilder {
+    DeltaLayer(DeltaLayerVisitBuilder),
+    ImageLayer(ImageLayerVisitBuilder),
+}
+
+#[derive(Debug)]
+pub(crate) enum PersistentLayerVisit {
+    DeltaLayer(DeltaLayerVisit),
+    ImageLayer(ImageLayerVisit),
+}
+
+#[derive(Debug)]
+pub(crate) struct InMemoryLayerVisitBuilder {
+    /// Key space accumulator which will define which keys we are
+    /// interested in for this layer visit.
+    keyspace_accum: KeySpaceRandomAccum,
+    /// Ignore any keys with an LSN greater or equal
+    /// than the specified one.
+    lsn_ceil: Lsn,
+    /// Only consider keys at LSN greater or equal than the specified one
+    lsn_floor: Lsn,
+    // The in-memory layer to visit
+    layer: Arc<InMemoryLayer>,
+}
+
+#[derive(Debug)]
+pub(crate) struct InMemoryLayerVisit {
+    /// Key space of keys considered by the visit
+    keyspace: KeySpace,
+    /// Ignore any keys with an LSN greater or equal
+    /// than the specified one.
+    lsn_ceil: Lsn,
+    /// Only consider keys at LSN greater or equal than the specified one
+    lsn_floor: Lsn,
+}
+
+#[derive(Debug)]
+pub(crate) struct DeltaLayerVisitBuilder {
+    /// List of key space accumulators which will define what deltas are read.
+    /// Each accumulator has an associated LSN which specifies
+    /// the LSN floor for it (i.e. do not read below this LSN).
+    keyspace_accums: HashMap<Lsn, KeySpaceRandomAccum>,
+    /// Ignore any keys with an LSN greater or equal
+    /// than the specified one.
+    lsn_ceil: Lsn,
+    /// Handle for the layer to visit (guaranteed to be a delta layer)
+    layer: Layer,
+}
+
+#[derive(Debug)]
+pub(crate) struct DeltaLayerVisit {
+    /// List of key spaces considered during the visit.
+    /// Each keyspace has an associated LSN which specifies
+    /// the LSN floor for it (i.e. do not read below this LSN).
+    keyspaces: Vec<(Lsn, KeySpace)>,
+    /// Ignore any keys with an LSN greater or equal
+    /// than the specified one.
+    lsn_ceil: Lsn,
+}
+
+#[derive(Debug)]
+pub(crate) struct ImageLayerVisitBuilder {
+    /// Key space which defines which keys we are
+    /// interested in for this layer visit.
+    keyspace_accum: KeySpaceRandomAccum,
+    /// Handle for the layer to visit (guaranteed to be an image layer)
+    layer: Layer,
+    /// Only consider keys at LSN greater or equal than the specified one
+    lsn_floor: Lsn,
+}
+
+#[derive(Debug)]
+pub(crate) struct ImageLayerVisit {
+    /// Key space which defines which keys we are
+    /// interested in for this layer visit.
+    keyspace: KeySpace,
+    /// Only consider keys at LSN greater or equal than the specified one
+    lsn_floor: Lsn,
+}
+
+impl LayerVisitBuilder {
+    pub(crate) fn new(layer: ReadableLayer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
+        match layer {
+            ReadableLayer::InMemoryLayer(in_mem) => {
+                Self::InMemoryLayer(InMemoryLayerVisitBuilder::new(in_mem, keyspace, lsn_range))
+            }
+            ReadableLayer::PersistentLayer(persistent) => Self::PersistentLayer(
+                PersistentLayerVisitBuilder::new(persistent, keyspace, lsn_range),
+            ),
+        }
+    }
+}
+
+impl InMemoryLayerVisitBuilder {
+    fn new(layer: Arc<InMemoryLayer>, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
+        assert_eq!(lsn_range.start, layer.get_lsn_range().start);
+
+        let mut keyspace_accum = KeySpaceRandomAccum::new();
+        keyspace_accum.add_keyspace(keyspace);
+
+        Self {
+            keyspace_accum,
+            lsn_floor: lsn_range.start,
+            lsn_ceil: lsn_range.end,
+            layer,
+        }
+    }
+}
+
+impl PersistentLayerVisitBuilder {
+    fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
+        let is_delta = layer.layer_desc().is_delta;
+        if is_delta {
+            Self::DeltaLayer(DeltaLayerVisitBuilder::new(layer, keyspace, lsn_range))
+        } else {
+            Self::ImageLayer(ImageLayerVisitBuilder::new(layer, keyspace, lsn_range))
+        }
+    }
+}
+
+impl DeltaLayerVisitBuilder {
+    fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
+        assert!(layer.layer_desc().is_delta);
+
+        let mut keyspace_accum = KeySpaceRandomAccum::new();
+        keyspace_accum.add_keyspace(keyspace);
+
+        Self {
+            keyspace_accums: HashMap::from([(lsn_range.start, keyspace_accum)]),
+            lsn_ceil: lsn_range.end,
+            layer,
+        }
+    }
+}
+
+impl ImageLayerVisitBuilder {
+    fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range<Lsn>) -> Self {
+        assert!(!layer.layer_desc().is_delta);
+        assert_eq!(lsn_range.start, layer.layer_desc().lsn_range.start);
+
+        let mut keyspace_accum = KeySpaceRandomAccum::new();
+        keyspace_accum.add_keyspace(keyspace);
+
+        Self {
+            keyspace_accum,
+            lsn_floor: lsn_range.start,
+            layer,
+        }
+    }
+}
+
+pub(crate) trait LayerVisitBuilderUpdate {
+    type L;
+    type LV;
+
+    /// Extend an already planned layer visit to also include the keys
+    /// in the provided keyspace and LSN range.
+    fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>);
+
+    /// Build the visit!
+    fn build(self) -> (Self::L, Self::LV);
+}
+
+impl LayerVisitBuilderUpdate for LayerVisitBuilder {
+    type L = ReadableLayer;
+    type LV = LayerVisit;
+
+    fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
+        match self {
+            LayerVisitBuilder::InMemoryLayer(v) => v.update(keyspace, lsn_range),
+            LayerVisitBuilder::PersistentLayer(v) => v.update(keyspace, lsn_range),
+        }
+    }
+
+    fn build(self) -> (Self::L, Self::LV) {
+        match self {
+            LayerVisitBuilder::InMemoryLayer(in_mem) => (
+                ReadableLayer::InMemoryLayer(in_mem.layer),
+                LayerVisit::InMemoryLayer(InMemoryLayerVisit {
+                    keyspace: in_mem.keyspace_accum.to_keyspace(),
+                    lsn_ceil: in_mem.lsn_ceil,
+                    lsn_floor: in_mem.lsn_floor,
+                }),
+            ),
+            LayerVisitBuilder::PersistentLayer(pers) => {
+                let (layer, visit) = pers.build();
+                (
+                    ReadableLayer::PersistentLayer(layer),
+                    LayerVisit::PersistentLayer(visit),
+                )
+            }
+        }
+    }
+}
+
+impl LayerVisitBuilderUpdate for PersistentLayerVisitBuilder {
+    type L = Layer;
+    type LV = PersistentLayerVisit;
+
+    fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
+        match self {
+            PersistentLayerVisitBuilder::DeltaLayer(v) => v.update(keyspace, lsn_range),
+            PersistentLayerVisitBuilder::ImageLayer(v) => v.update(keyspace, lsn_range),
+        }
+    }
+
+    fn build(self) -> (Self::L, Self::LV) {
+        match self {
+            PersistentLayerVisitBuilder::DeltaLayer(delta) => {
+                let (layer, visit) = delta.build();
+                (layer, PersistentLayerVisit::DeltaLayer(visit))
+            }
+            PersistentLayerVisitBuilder::ImageLayer(img) => {
+                let (layer, visit) = img.build();
+                (layer, PersistentLayerVisit::ImageLayer(visit))
+            }
+        }
+    }
+}
+
+impl LayerVisitBuilderUpdate for InMemoryLayerVisitBuilder {
+    type L = Arc<InMemoryLayer>;
+    type LV = InMemoryLayerVisit;
+
+    fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
+        // Note: I cannot think of any cases when this update should happen,
+        // since in memory layers span the entire key range.
+        assert_eq!(lsn_range.end, self.lsn_ceil);
+        assert_eq!(lsn_range.start, self.lsn_floor);
+        self.keyspace_accum.add_keyspace(keyspace);
+    }
+
+    fn build(self) -> (Self::L, Self::LV) {
+        (
+            self.layer,
+            InMemoryLayerVisit {
+                keyspace: self.keyspace_accum.to_keyspace(),
+                lsn_floor: self.lsn_floor,
+                lsn_ceil: self.lsn_ceil,
+            },
+        )
+    }
+}
+
+impl LayerVisitBuilderUpdate for DeltaLayerVisitBuilder {
+    type L = Layer;
+    type LV = DeltaLayerVisit;
+
+    fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
+        assert_eq!(lsn_range.end, self.lsn_ceil);
+        self.keyspace_accums
+            .entry(lsn_range.start)
+            .or_default()
+            .add_keyspace(keyspace);
+    }
+
+    fn build(self) -> (Self::L, Self::LV) {
+        use itertools::Itertools;
+
+        let keyspaces = self
+            .keyspace_accums
+            .into_iter()
+            .filter_map(|(lsn_floor, accum)| {
+                let keyspace = accum.to_keyspace();
+                if keyspace.is_empty() {
+                    None
+                } else {
+                    Some((lsn_floor, keyspace))
+                }
+            })
+            .sorted_by_key(|(_lsn_floor, keyspace)| keyspace.start().unwrap())
+            .collect::<Vec<(Lsn, KeySpace)>>();
+
+        if cfg!(debug_assertions) {
+            // Check that the keyspaces we are going to read from
+            // a layer are non-overlapping.
+            //
+            // The keyspaces provided to vectored read initially are non-overlapping.
+            // We may split keyspaces at each step and keyspaces resulting from a split
+            // are non-overlapping as well. One can prove that the property holds by
+            // induction.
+            let mut prev_end: Option<Key> = None;
+            for (_lsn_floor, crnt) in keyspaces.iter() {
+                if let Some(prev_end) = prev_end {
+                    debug_assert!(prev_end <= crnt.start().unwrap())
+                }
+
+                prev_end = Some(crnt.end().unwrap());
+            }
+        }
+
+        (
+            self.layer,
+            DeltaLayerVisit {
+                keyspaces,
+                lsn_ceil: self.lsn_ceil,
+            },
+        )
+    }
+}
+
+impl LayerVisitBuilderUpdate for ImageLayerVisitBuilder {
+    type L = Layer;
+    type LV = ImageLayerVisit;
+
+    fn update(&mut self, keyspace: KeySpace, lsn_range: Range<Lsn>) {
+        assert_eq!(lsn_range.start, self.lsn_floor);
+        self.keyspace_accum.add_keyspace(keyspace);
+    }
+
+    fn build(self) -> (Self::L, Self::LV) {
+        (
+            self.layer,
+            ImageLayerVisit {
+                keyspace: self.keyspace_accum.to_keyspace(),
+                lsn_floor: self.lsn_floor,
+            },
+        )
+    }
+}
+
+pub(crate) trait LayerVisitDetails {
+    /// Returns the key spaces planned for the visit
+    /// and their associated floor LSNs.
+    fn keyspaces(&self) -> Vec<(Lsn, KeySpace)>;
+}
+
+impl LayerVisitDetails for LayerVisit {
+    fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
+        match self {
+            LayerVisit::PersistentLayer(pers) => pers.keyspaces(),
+            LayerVisit::InMemoryLayer(in_mem) => in_mem.keyspaces(),
+        }
+    }
+}
+
+impl LayerVisitDetails for PersistentLayerVisit {
+    fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
+        match self {
+            PersistentLayerVisit::DeltaLayer(delta) => delta.keyspaces(),
+            PersistentLayerVisit::ImageLayer(image) => image.keyspaces(),
+        }
+    }
+}
+
+impl LayerVisitDetails for DeltaLayerVisit {
+    fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
+        self.keyspaces.clone()
+    }
+}
+
+impl LayerVisitDetails for ImageLayerVisit {
+    fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
+        vec![(self.lsn_floor, self.keyspace.clone())]
+    }
+}
+
+impl LayerVisitDetails for InMemoryLayerVisit {
+    fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> {
+        vec![(self.lsn_floor, self.keyspace.clone())]
+    }
+}
+
+impl LayerVisit {
+    fn into_persistent_layer_visit(self) -> PersistentLayerVisit {
+        match self {
+            LayerVisit::PersistentLayer(visit) => visit,
+            LayerVisit::InMemoryLayer(visit) => {
+                panic!("Invalid attempt to cast to PersistentLayerVisit: {visit:?}");
+            }
+        }
+    }
+
+    fn into_in_memory_layer_visit(self) -> InMemoryLayerVisit {
+        match self {
+            LayerVisit::InMemoryLayer(visit) => visit,
+            LayerVisit::PersistentLayer(visit) => {
+                panic!("Invalid attempt to cast to InMemoryLayerVisit: {visit:?}");
+            }
+        }
+    }
+}
+
+impl PersistentLayerVisit {
+    fn into_delta_layer_visit(self) -> DeltaLayerVisit {
+        match self {
+            PersistentLayerVisit::DeltaLayer(visit) => visit,
+            PersistentLayerVisit::ImageLayer(visit) => {
+                panic!("Invalid attempt to cast to DeltaLayerVisit: {visit:?}");
+            }
+        }
+    }
+
+    fn into_image_layer_visit(self) -> ImageLayerVisit {
+        match self {
+            PersistentLayerVisit::ImageLayer(visit) => visit,
+            PersistentLayerVisit::DeltaLayer(visit) => {
+                panic!("Invalid attempt to cast to ImageLayerVisit: {visit:?}");
+            }
+        }
+    }
+}
+
 impl LayerFringe {
     pub(crate) fn new() -> Self {
         LayerFringe {
-            planned_reads_by_lsn: BinaryHeap::new(),
-            layers: HashMap::new(),
+            visits_by_lsn_index: BinaryHeap::new(),
+            layer_visits: HashMap::new(),
         }
     }
 
-    pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
-        let read_desc = match self.planned_reads_by_lsn.pop() {
+    pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, LayerVisit)> {
+        let read_desc = match self.visits_by_lsn_index.pop() {
             Some(desc) => desc,
            None => return None,
         };
 
-        let removed = self.layers.remove_entry(&read_desc.layer_id);
-
-        match removed {
-            Some((
-                _,
-                LayerKeyspace {
-                    layer,
-                    mut target_keyspace,
-                },
-            )) => Some((
-                layer,
-                target_keyspace.consume_keyspace(),
-                read_desc.lsn_range,
-            )),
-            None => unreachable!("fringe internals are always consistent"),
-        }
+        let removed = self.layer_visits.remove(&read_desc.layer_id)?;
+        Some(removed.build())
     }
 
     pub(crate) fn update(
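
The builder machinery above is easiest to see in miniature. Below is a
self-contained toy of the DeltaLayerVisitBuilder idea only (u64 stand-ins for
Key and Lsn, plain Vec<Range> instead of KeySpaceRandomAccum): repeated updates
against the same delta layer accumulate per LSN floor, and build() freezes them
into a key-sorted visit.

use std::collections::HashMap;
use std::ops::Range;

#[derive(Default)]
struct MiniDeltaVisitBuilder {
    per_floor: HashMap<u64, Vec<Range<u64>>>, // lsn_floor -> key ranges
    lsn_ceil: u64,
}

impl MiniDeltaVisitBuilder {
    fn update(&mut self, keys: Range<u64>, lsn_floor: u64) {
        self.per_floor.entry(lsn_floor).or_default().push(keys);
    }

    fn build(self) -> (u64, Vec<(u64, Range<u64>)>) {
        let mut groups: Vec<_> = self
            .per_floor
            .into_iter()
            .flat_map(|(floor, ranges)| ranges.into_iter().map(move |r| (floor, r)))
            .collect();
        // Sort by key so a single index pass can walk the layer in key order.
        groups.sort_by_key(|(_floor, r)| r.start);
        (self.lsn_ceil, groups)
    }
}

fn main() {
    let mut b = MiniDeltaVisitBuilder {
        lsn_ceil: 0x70,
        ..Default::default()
    };
    b.update(5..10, 0x50); // keys under the nested image: higher floor
    b.update(0..5, 0x10); // keys under the baseline image: lower floor
    let (ceil, groups) = b.build();
    assert_eq!(ceil, 0x70);
    assert_eq!(groups, vec![(0x10, 0..5), (0x50, 5..10)]);
}
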
@@ -352,22 +748,18 @@ impl LayerFringe {
         lsn_range: Range<Lsn>,
     ) {
         let layer_id = layer.id();
-        let entry = self.layers.entry(layer_id.clone());
+        let entry = self.layer_visits.entry(layer_id.clone());
         match entry {
             Entry::Occupied(mut entry) => {
-                entry.get_mut().target_keyspace.add_keyspace(keyspace);
+                entry.get_mut().update(keyspace, lsn_range);
             }
             Entry::Vacant(entry) => {
-                self.planned_reads_by_lsn.push(ReadDesc {
-                    lsn_range,
+                self.visits_by_lsn_index.push(VisitLocation {
+                    lsn_range: lsn_range.clone(),
                     layer_id: layer_id.clone(),
                 });
-                let mut accum = KeySpaceRandomAccum::new();
-                accum.add_keyspace(keyspace);
-                entry.insert(LayerKeyspace {
-                    layer,
-                    target_keyspace: accum,
-                });
+
+                entry.insert(LayerVisitBuilder::new(layer, keyspace, lsn_range));
             }
         }
     }
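
The update above leans on the entry API to keep the heap and the builder map
consistent: a layer gets exactly one heap location (pushed when first seen),
while later updates only extend its builder, so each layer is still visited at
most once. A self-contained miniature of that pattern (simplified ids, plain
ranges instead of keyspaces):

use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;

fn update(
    heap: &mut BinaryHeap<(u64, u32)>,            // (lsn_range.end, layer_id)
    builders: &mut HashMap<u32, Vec<Range<u32>>>, // layer_id -> accumulated ranges
    layer_id: u32,
    keyspace: Range<u32>,
    lsn_end: u64,
) {
    match builders.entry(layer_id) {
        Entry::Occupied(mut e) => e.get_mut().push(keyspace),
        Entry::Vacant(e) => {
            heap.push((lsn_end, layer_id)); // heap location only on first sighting
            e.insert(vec![keyspace]);
        }
    }
}

fn main() {
    let (mut heap, mut builders) = (BinaryHeap::new(), HashMap::new());
    update(&mut heap, &mut builders, 7, 0..5, 0x60);
    update(&mut heap, &mut builders, 7, 5..10, 0x60); // same layer: builder only
    assert_eq!(heap.len(), 1);
    assert_eq!(builders[&7].len(), 2);
}
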
@@ -379,7 +771,7 @@ impl Default for LayerFringe {
     }
 }
 
-impl Ord for ReadDesc {
+impl Ord for VisitLocation {
     fn cmp(&self, other: &Self) -> Ordering {
         let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
         if ord == std::cmp::Ordering::Equal {
@@ -390,19 +782,19 @@ impl Ord for ReadDesc {
     }
 }
 
-impl PartialOrd for ReadDesc {
+impl PartialOrd for VisitLocation {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
 
-impl PartialEq for ReadDesc {
+impl PartialEq for VisitLocation {
     fn eq(&self, other: &Self) -> bool {
         self.lsn_range == other.lsn_range
     }
 }
 
-impl Eq for ReadDesc {}
+impl Eq for VisitLocation {}
 
 impl ReadableLayer {
     pub(crate) fn id(&self) -> LayerId {
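
A smaller point from the Ord impl above: BinaryHeap is a max-heap, so ordering
VisitLocation by lsn_range.end means next_layer always pops the visit that
reaches highest into history, keeping the read strictly top-down. A small
sketch of that ordering (simplified struct, tiebreak omitted):

use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(Eq, PartialEq)]
struct Loc {
    lsn_end: u64,
}

impl Ord for Loc {
    fn cmp(&self, other: &Self) -> Ordering {
        self.lsn_end.cmp(&other.lsn_end)
    }
}

impl PartialOrd for Loc {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::from([
        Loc { lsn_end: 0x20 },
        Loc { lsn_end: 0x60 },
        Loc { lsn_end: 0x40 },
    ]);
    assert_eq!(heap.pop().unwrap().lsn_end, 0x60); // highest LSN first
}
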
@@ -414,20 +806,27 @@ impl ReadableLayer {
 
     pub(crate) async fn get_values_reconstruct_data(
         &self,
-        keyspace: KeySpace,
-        lsn_range: Range<Lsn>,
+        visit: LayerVisit,
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
         match self {
             ReadableLayer::PersistentLayer(layer) => {
                 layer
-                    .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx)
+                    .get_values_reconstruct_data(
+                        visit.into_persistent_layer_visit(),
+                        reconstruct_state,
+                        ctx,
+                    )
                     .await
             }
             ReadableLayer::InMemoryLayer(layer) => {
                 layer
-                    .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx)
+                    .get_values_reconstruct_data(
+                        visit.into_in_memory_layer_visit(),
+                        reconstruct_state,
+                        ctx,
+                    )
                     .await
             }
         }
@@ -75,7 +75,7 @@ use utils::{
     lsn::Lsn,
 };
 
-use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
+use super::{AsLayerDesc, DeltaLayerVisit, LayerName, PersistentLayerDesc, ValuesReconstructState};
 
 ///
 /// Header stored in the beginning of the file
@@ -841,8 +841,7 @@ impl DeltaLayerInner {
     // can be further optimised to visit the index only once.
     pub(super) async fn get_values_reconstruct_data(
         &self,
-        keyspace: KeySpace,
-        lsn_range: Range<Lsn>,
+        visit: DeltaLayerVisit,
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
@@ -863,8 +862,8 @@ impl DeltaLayerInner {
         let data_end_offset = self.index_start_offset();
 
         let reads = Self::plan_reads(
-            &keyspace,
-            lsn_range.clone(),
+            &visit.keyspaces,
+            visit.lsn_ceil,
             data_end_offset,
             index_reader,
             planner,
@@ -877,14 +876,16 @@ impl DeltaLayerInner {
         self.do_reads_and_update_state(reads, reconstruct_state, ctx)
             .await;
 
-        reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
+        for (lsn_floor, keyspace) in visit.keyspaces {
+            reconstruct_state.on_lsn_advanced(&keyspace, lsn_floor);
+        }
 
         Ok(())
     }
 
     async fn plan_reads<Reader>(
-        keyspace: &KeySpace,
-        lsn_range: Range<Lsn>,
+        keyspaces: &Vec<(Lsn, KeySpace)>,
+        lsn_ceil: Lsn,
         data_end_offset: u64,
         index_reader: DiskBtreeReader<Reader, DELTA_KEY_SIZE>,
         mut planner: VectoredReadPlanner,
@@ -898,48 +899,52 @@ impl DeltaLayerInner {
             .page_content_kind(PageContentKind::DeltaLayerBtreeNode)
             .build();
 
-        for range in keyspace.ranges.iter() {
-            let mut range_end_handled = false;
-
-            let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
-            let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
-            let mut index_stream = std::pin::pin!(index_stream);
-
-            while let Some(index_entry) = index_stream.next().await {
-                let (raw_key, value) = index_entry?;
-                let key = Key::from_slice(&raw_key[..KEY_SIZE]);
-                let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
-                let blob_ref = BlobRef(value);
-
-                // Lsns are not monotonically increasing across keys, so we don't assert on them.
-                assert!(key >= range.start);
-
-                let outside_lsn_range = !lsn_range.contains(&lsn);
-                let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
-
-                let flag = {
-                    if outside_lsn_range || below_cached_lsn {
-                        BlobFlag::Ignore
-                    } else if blob_ref.will_init() {
-                        BlobFlag::ReplaceAll
-                    } else {
-                        // Usual path: add blob to the read
-                        BlobFlag::None
-                    }
-                };
-
-                if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
-                    planner.handle_range_end(blob_ref.pos());
-                    range_end_handled = true;
-                    break;
-                } else {
-                    planner.handle(key, lsn, blob_ref.pos(), flag);
-                }
-            }
-
-            if !range_end_handled {
-                tracing::debug!("Handling range end fallback at {}", data_end_offset);
-                planner.handle_range_end(data_end_offset);
+        for (lsn_floor, keyspace) in keyspaces {
+            let lsn_range = *lsn_floor..lsn_ceil;
+
+            for range in keyspace.ranges.iter() {
+                let mut range_end_handled = false;
+
+                let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start);
+                let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx);
+                let mut index_stream = std::pin::pin!(index_stream);
+
+                while let Some(index_entry) = index_stream.next().await {
+                    let (raw_key, value) = index_entry?;
+                    let key = Key::from_slice(&raw_key[..KEY_SIZE]);
+                    let lsn = DeltaKey::extract_lsn_from_buf(&raw_key);
+                    let blob_ref = BlobRef(value);
+
+                    // Lsns are not monotonically increasing across keys, so we don't assert on them.
+                    assert!(key >= range.start);
+
+                    let outside_lsn_range = !lsn_range.contains(&lsn);
+                    let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn);
+
+                    let flag = {
+                        if outside_lsn_range || below_cached_lsn {
+                            BlobFlag::Ignore
+                        } else if blob_ref.will_init() {
+                            BlobFlag::ReplaceAll
+                        } else {
+                            // Usual path: add blob to the read
+                            BlobFlag::None
+                        }
+                    };
+
+                    if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) {
+                        planner.handle_range_end(blob_ref.pos());
+                        range_end_handled = true;
+                        break;
+                    } else {
+                        planner.handle(key, lsn, blob_ref.pos(), flag);
+                    }
+                }
+
+                if !range_end_handled {
+                    tracing::debug!("Handling range end fallback at {}", data_end_offset);
+                    planner.handle_range_end(data_end_offset);
+                }
             }
         }
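
With the visit now carrying several (lsn_floor, keyspace) groups, the
restructured loop above filters index entries against each group's own LSN
range. A worked example of the flag selection under the floors from the
nested-image test (hypothetical values; BlobFlag here mirrors the real enum's
variants):

#[derive(Debug, PartialEq)]
enum BlobFlag {
    None,
    ReplaceAll,
    Ignore,
}

fn flag_for(lsn: u64, lsn_floor: u64, lsn_ceil: u64, will_init: bool) -> BlobFlag {
    let lsn_range = lsn_floor..lsn_ceil;
    if !lsn_range.contains(&lsn) {
        BlobFlag::Ignore // outside this group's LSN range
    } else if will_init {
        BlobFlag::ReplaceAll // record rebuilds the page; older reads are dropped
    } else {
        BlobFlag::None // usual path: add blob to the read
    }
}

fn main() {
    // Group (floor 0x50, keys 5..10): the delta written at 0x60 is read...
    assert_eq!(flag_for(0x60, 0x50, 0x70, false), BlobFlag::None);
    // ...but anything below the nested image's LSN is ignored.
    assert_eq!(flag_for(0x20, 0x50, 0x70, false), BlobFlag::Ignore);
}
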
@@ -1641,8 +1646,8 @@ pub(crate) mod test {
 
         // Plan and validate
         let vectored_reads = DeltaLayerInner::plan_reads(
-            &keyspace,
-            lsn_range.clone(),
+            &vec![(lsn_range.start, keyspace.clone())],
+            lsn_range.end,
             disk_offset,
             reader,
             planner,
@@ -1895,8 +1900,8 @@ pub(crate) mod test {
         let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
 
         let vectored_reads = DeltaLayerInner::plan_reads(
-            &keyspace,
-            entries_meta.lsn_range.clone(),
+            &vec![(entries_meta.lsn_range.start, keyspace)],
+            entries_meta.lsn_range.end,
             data_end_offset,
             index_reader,
             planner,
@@ -69,7 +69,7 @@ use utils::{
 };
 
 use super::layer_name::ImageLayerName;
-use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState};
+use super::{AsLayerDesc, ImageLayerVisit, LayerName, PersistentLayerDesc, ValuesReconstructState};
 
 ///
 /// Header stored in the beginning of the file
@@ -435,12 +435,12 @@ impl ImageLayerInner {
     // the reconstruct state with whatever is found.
     pub(super) async fn get_values_reconstruct_data(
         &self,
-        keyspace: KeySpace,
+        visit: ImageLayerVisit,
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
         let reads = self
-            .plan_reads(keyspace, None, ctx)
+            .plan_reads(visit.keyspace, None, ctx)
             .await
             .map_err(GetVectoredError::Other)?;
 
@@ -17,7 +17,6 @@ use anyhow::{anyhow, Context, Result};
 use bytes::Bytes;
 use camino::Utf8PathBuf;
 use pageserver_api::key::CompactKey;
-use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::InMemoryLayerInfo;
 use pageserver_api::shard::TenantShardId;
 use std::collections::{BTreeMap, HashMap};
@@ -36,7 +35,8 @@ use std::sync::atomic::{AtomicU64, AtomicUsize};
 use tokio::sync::RwLock;
 
 use super::{
-    DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
+    DeltaLayerWriter, InMemoryLayerVisit, PersistentLayerDesc, ValueReconstructSituation,
+    ValuesReconstructState,
 };
 
 pub(crate) mod vectored_dio_read;
@@ -416,11 +416,14 @@ impl InMemoryLayer {
     // If the key is cached, go no further than the cached Lsn.
     pub(crate) async fn get_values_reconstruct_data(
         &self,
-        keyspace: KeySpace,
-        end_lsn: Lsn,
+        visit: InMemoryLayerVisit,
         reconstruct_state: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
+        let InMemoryLayerVisit {
+            keyspace, lsn_ceil, ..
+        } = visit;
+
         let ctx = RequestContextBuilder::extend(ctx)
             .page_content_kind(PageContentKind::InMemoryLayer)
             .build();
@@ -440,8 +443,8 @@ impl InMemoryLayer {
         {
             let key = Key::from_compact(*key);
             let lsn_range = match reconstruct_state.get_cached_lsn(&key) {
-                Some(cached_lsn) => (cached_lsn + 1)..end_lsn,
-                None => self.start_lsn..end_lsn,
+                Some(cached_lsn) => (cached_lsn + 1)..lsn_ceil,
+                None => self.start_lsn..lsn_ceil,
             };
 
             let slice = vec_map.slice_range(lsn_range);
 
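
The renamed lsn_ceil bound behaves exactly like the old end_lsn here: if a
key's image is already cached at some LSN, the scan starts just above it,
otherwise at the layer's start LSN. A tiny self-contained check of that clamp:

fn scan_range(cached: Option<u64>, start_lsn: u64, lsn_ceil: u64) -> std::ops::Range<u64> {
    match cached {
        // Cached image at `cached_lsn`: only newer records are needed.
        Some(cached_lsn) => (cached_lsn + 1)..lsn_ceil,
        // No cached image: scan from the layer's start.
        None => start_lsn..lsn_ceil,
    }
}

fn main() {
    assert_eq!(scan_range(Some(0x30), 0x20, 0x60), 0x31..0x60);
    assert_eq!(scan_range(None, 0x20, 0x60), 0x20..0x60);
}
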
@@ -1,9 +1,7 @@
 use anyhow::Context;
 use camino::{Utf8Path, Utf8PathBuf};
-use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::HistoricLayerInfo;
 use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId};
-use std::ops::Range;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, Weak};
 use std::time::{Duration, SystemTime};
@@ -23,7 +21,7 @@ use super::delta_layer::{self, DeltaEntry};
 use super::image_layer::{self};
 use super::{
     AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
-    LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState,
+    LayerVisibilityHint, PersistentLayerDesc, PersistentLayerVisit, ValuesReconstructState,
 };
 
 use utils::generation::Generation;
@@ -303,8 +301,7 @@ impl Layer {
 
     pub(crate) async fn get_values_reconstruct_data(
         &self,
-        keyspace: KeySpace,
-        lsn_range: Range<Lsn>,
+        visit: PersistentLayerVisit,
         reconstruct_data: &mut ValuesReconstructState,
         ctx: &RequestContext,
     ) -> Result<(), GetVectoredError> {
@@ -322,7 +319,7 @@ impl Layer {
         self.record_access(ctx);
 
         layer
-            .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx)
+            .get_values_reconstruct_data(visit, reconstruct_data, &self.0, ctx)
             .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self))
             .await
             .map_err(|err| match err {
@@ -1741,8 +1738,7 @@ impl DownloadedLayer {
 
     async fn get_values_reconstruct_data(
         &self,
-        keyspace: KeySpace,
-        lsn_range: Range<Lsn>,
+        visit: PersistentLayerVisit,
         reconstruct_data: &mut ValuesReconstructState,
         owner: &Arc<LayerInner>,
         ctx: &RequestContext,
@@ -1755,11 +1751,11 @@ impl DownloadedLayer {
             .map_err(GetVectoredError::Other)?
         {
             Delta(d) => {
-                d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
+                d.get_values_reconstruct_data(visit.into_delta_layer_visit(), reconstruct_data, ctx)
                     .await
             }
             Image(i) => {
-                i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx)
+                i.get_values_reconstruct_data(visit.into_image_layer_visit(), reconstruct_data, ctx)
                     .await
             }
         }
@@ -1,6 +1,6 @@
 use std::time::UNIX_EPOCH;
 
-use pageserver_api::key::CONTROLFILE_KEY;
+use pageserver_api::{key::CONTROLFILE_KEY, keyspace::KeySpace};
 use tokio::task::JoinSet;
 use utils::{
     completion::{self, Completion},
@@ -9,7 +9,10 @@ use utils::{
 
 use super::failpoints::{Failpoint, FailpointKind};
 use super::*;
-use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
+use crate::{
+    context::DownloadBehavior,
+    tenant::storage_layer::{ImageLayerVisit, LayerVisibilityHint},
+};
 use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
 
 /// Used in tests to advance a future to a wanted await point, and not further.
@@ -56,13 +59,14 @@ async fn smoke_test() {
 
     let img_before = {
         let mut data = ValuesReconstructState::default();
+        let visit = ImageLayerVisit {
+            keyspace: controlfile_keyspace.clone(),
+            lsn_floor: Lsn(0x10),
+        };
+        let visit = PersistentLayerVisit::ImageLayer(visit);
+
         layer
-            .get_values_reconstruct_data(
-                controlfile_keyspace.clone(),
-                Lsn(0x10)..Lsn(0x11),
-                &mut data,
-                &ctx,
-            )
+            .get_values_reconstruct_data(visit, &mut data, &ctx)
             .await
             .unwrap();
         data.keys
@@ -88,13 +92,14 @@ async fn smoke_test() {
     // on accesses when the layer is evicted, it will automatically be downloaded.
     let img_after = {
         let mut data = ValuesReconstructState::default();
+        let visit = ImageLayerVisit {
+            keyspace: controlfile_keyspace.clone(),
+            lsn_floor: Lsn(0x10),
+        };
+        let visit = PersistentLayerVisit::ImageLayer(visit);
+
         layer
-            .get_values_reconstruct_data(
-                controlfile_keyspace.clone(),
-                Lsn(0x10)..Lsn(0x11),
-                &mut data,
-                &ctx,
-            )
+            .get_values_reconstruct_data(visit, &mut data, &ctx)
             .instrument(download_span.clone())
             .await
             .unwrap();
 
@@ -136,7 +136,8 @@ use self::logical_size::LogicalSize;
 use self::walreceiver::{WalReceiver, WalReceiverConf};
 
 use super::{
-    config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint,
+    config::TenantConf,
+    storage_layer::{inmemory_layer, LayerVisibilityHint, LayerVisitDetails},
     upload_queue::NotInitialized,
 };
 use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
@@ -3153,12 +3154,12 @@ impl Timeline {
     async fn get_vectored_reconstruct_data_timeline(
         timeline: &Timeline,
         keyspace: KeySpace,
-        mut cont_lsn: Lsn,
+        cont_lsn: Lsn,
         reconstruct_state: &mut ValuesReconstructState,
         cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> Result<TimelineVisitOutcome, GetVectoredError> {
-        let mut unmapped_keyspace = keyspace.clone();
+        let mut unmapped_keyspaces = vec![(cont_lsn, keyspace.clone())];
         let mut fringe = LayerFringe::new();
 
         let mut completed_keyspace = KeySpace::default();
@@ -3171,84 +3172,86 @@ impl Timeline {
 
             let (keys_done_last_step, keys_with_image_coverage) =
                 reconstruct_state.consume_done_keys();
-            unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
+
+            // Update state that is external to the loop.
             completed_keyspace.merge(&keys_done_last_step);
-            if let Some(keys_with_image_coverage) = keys_with_image_coverage {
-                unmapped_keyspace
-                    .remove_overlapping_with(&KeySpace::single(keys_with_image_coverage.clone()));
+            if let Some(keys_with_image_coverage) = keys_with_image_coverage.clone() {
                 image_covered_keyspace.add_range(keys_with_image_coverage);
             }
 
-            // Do not descend any further if the last layer we visited
-            // completed all keys in the keyspace it inspected. This is not
-            // required for correctness, but avoids visiting extra layers
-            // which turns out to be a perf bottleneck in some cases.
-            if !unmapped_keyspace.is_empty() {
-                let guard = timeline.layers.read().await;
-                let layers = guard.layer_map()?;
-
-                let in_memory_layer = layers.find_in_memory_layer(|l| {
-                    let start_lsn = l.get_lsn_range().start;
-                    cont_lsn > start_lsn
-                });
-
-                match in_memory_layer {
-                    Some(l) => {
-                        let lsn_range = l.get_lsn_range().start..cont_lsn;
-                        fringe.update(
-                            ReadableLayer::InMemoryLayer(l),
-                            unmapped_keyspace.clone(),
-                            lsn_range,
-                        );
-                    }
-                    None => {
-                        for range in unmapped_keyspace.ranges.iter() {
-                            let results = layers.range_search(range.clone(), cont_lsn);
-
-                            results
-                                .found
-                                .into_iter()
-                                .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
-                                    (
-                                        ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)),
-                                        keyspace_accum.to_keyspace(),
-                                        lsn_floor..cont_lsn,
-                                    )
-                                })
-                                .for_each(|(layer, keyspace, lsn_range)| {
-                                    fringe.update(layer, keyspace, lsn_range)
-                                });
-                        }
-                    }
-                }
-
-                // It's safe to drop the layer map lock after planning the next round of reads.
-                // The fringe keeps readable handles for the layers which are safe to read even
-                // if layers were compacted or flushed.
-                //
-                // The more interesting consideration is: "Why is the read algorithm still correct
-                // if the layer map changes while it is operating?". Doing a vectored read on a
-                // timeline boils down to pushing an imaginary lsn boundary downwards for each range
-                // covered by the read. The layer map tells us how to move the lsn downwards for a
-                // range at *a particular point in time*. It is fine for the answer to be different
-                // at two different time points.
-                drop(guard);
+            for (cont_lsn, unmapped_keyspace) in unmapped_keyspaces.iter_mut() {
+                // Remove any completed keys from the currently inspected keyspace.
+                unmapped_keyspace.remove_overlapping_with(&keys_done_last_step);
+                if let Some(keys_with_image_coverage) = keys_with_image_coverage.clone() {
+                    unmapped_keyspace
+                        .remove_overlapping_with(&KeySpace::single(keys_with_image_coverage));
+                }
+
+                // Do not descend any further if the last layer we visited
+                // completed all keys in the keyspace it inspected. This is not
+                // required for correctness, but avoids visiting extra layers
+                // which turns out to be a perf bottleneck in some cases.
+                if !unmapped_keyspace.is_empty() {
+                    let guard = timeline.layers.read().await;
+                    let layers = guard.layer_map()?;
+
+                    let in_memory_layer = layers.find_in_memory_layer(|l| {
+                        let start_lsn = l.get_lsn_range().start;
+                        *cont_lsn > start_lsn
+                    });
+
+                    match in_memory_layer {
+                        Some(l) => {
+                            let lsn_range = l.get_lsn_range().start..*cont_lsn;
+                            fringe.update(
+                                ReadableLayer::InMemoryLayer(l),
+                                unmapped_keyspace.clone(),
+                                lsn_range,
+                            );
+                        }
+                        None => {
+                            for range in unmapped_keyspace.ranges.iter() {
+                                let results = layers.range_search(range.clone(), *cont_lsn);
+
+                                results
+                                    .found
+                                    .into_iter()
+                                    .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| {
+                                        (
+                                            ReadableLayer::PersistentLayer(
+                                                guard.get_from_desc(&layer),
+                                            ),
+                                            keyspace_accum.to_keyspace(),
+                                            lsn_floor..*cont_lsn,
+                                        )
+                                    })
+                                    .for_each(|(layer, keyspace, lsn_range)| {
+                                        fringe.update(layer, keyspace, lsn_range)
+                                    });
+                            }
+                        }
+                    }
+
+                    // It's safe to drop the layer map lock after planning the next round of reads.
+                    // The fringe keeps readable handles for the layers which are safe to read even
+                    // if layers were compacted or flushed.
+                    //
+                    // The more interesting consideration is: "Why is the read algorithm still correct
+                    // if the layer map changes while it is operating?". Doing a vectored read on a
+                    // timeline boils down to pushing an imaginary lsn boundary downwards for each range
+                    // covered by the read. The layer map tells us how to move the lsn downwards for a
+                    // range at *a particular point in time*. It is fine for the answer to be different
+                    // at two different time points.
+                    drop(guard);
+                }
             }
 
-            if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() {
-                let next_cont_lsn = lsn_range.start;
+            if let Some((layer_to_read, layer_visit)) = fringe.next_layer() {
+                unmapped_keyspaces = layer_visit.keyspaces();
                 layer_to_read
-                    .get_values_reconstruct_data(
-                        keyspace_to_read.clone(),
-                        lsn_range,
-                        reconstruct_state,
-                        ctx,
-                    )
+                    .get_values_reconstruct_data(layer_visit, reconstruct_state, ctx)
                     .await?;
 
-                unmapped_keyspace = keyspace_to_read;
-                cont_lsn = next_cont_lsn;
-
                 reconstruct_state.on_layer_visited(&layer_to_read);
             } else {
                 break;
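
The rewritten loop above is the heart of the fix for #9012: instead of a single
shared cont_lsn, every key range now carries its own continuation LSN, and one
layer visit may split a range into several groups via layer_visit.keyspaces().
A self-contained toy of that data flow (u64/u32 stand-ins; the real types are
Lsn and KeySpace):

use std::ops::Range;

// A planned visit: (lsn_floor, key range) groups read from one layer.
struct Visit {
    groups: Vec<(u64, Range<u32>)>,
}

fn main() {
    // Start: one range, one continuation LSN, as in `unmapped_keyspaces`.
    let mut unmapped: Vec<(u64, Range<u32>)> = vec![(0x70, 0..10)];
    println!("initial: {unmapped:?}");

    // Visiting the delta layer from the nested-image test splits the read:
    // keys 0..5 continue from the baseline image, keys 5..10 from the nested
    // image. This mirrors what `layer_visit.keyspaces()` returns.
    let delta_visit = Visit {
        groups: vec![(0x10, 0..5), (0x50, 5..10)],
    };
    unmapped = delta_visit.groups;

    // Each group now descends independently in the next iteration.
    for (cont_lsn, keys) in &unmapped {
        println!("continue keys {keys:?} from lsn {cont_lsn:#x}");
    }
}
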
@@ -5502,19 +5505,24 @@ impl Timeline {
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> anyhow::Result<Vec<(Key, Bytes)>> {
+        use super::storage_layer::{LayerVisitBuilder, LayerVisitBuilderUpdate};
+
         let mut all_data = Vec::new();
         let guard = self.layers.read().await;
         for layer in guard.layer_map()?.iter_historic_layers() {
             if !layer.is_delta() && layer.image_layer_lsn() == lsn {
                 let layer = guard.get_from_desc(&layer);
                 let mut reconstruct_data = ValuesReconstructState::default();
+
+                let (layer, visit) = LayerVisitBuilder::new(
+                    ReadableLayer::PersistentLayer(layer),
+                    KeySpace::single(Key::MIN..Key::MAX),
+                    lsn..Lsn(lsn.0 + 1),
+                )
+                .build();
+
                 layer
-                    .get_values_reconstruct_data(
-                        KeySpace::single(Key::MIN..Key::MAX),
-                        lsn..Lsn(lsn.0 + 1),
-                        &mut reconstruct_data,
-                        ctx,
-                    )
+                    .get_values_reconstruct_data(visit, &mut reconstruct_data, ctx)
                     .await?;
                 for (k, v) in reconstruct_data.keys {
                     all_data.push((k, v?.img.unwrap().1));