mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 04:00:38 +00:00
@@ -323,16 +323,15 @@ impl ValuesReconstructState {
|
||||
let key_done = match state.situation {
|
||||
ValueReconstructSituation::Complete => unreachable!(),
|
||||
ValueReconstructSituation::Continue => match future_value {
|
||||
FutureValue::ValueImage { rx } => {
|
||||
FutureValue::Img { rx } => {
|
||||
assert!(state.img.is_none());
|
||||
state.img = Some((lsn, rx));
|
||||
true
|
||||
}
|
||||
FutureValue::ValueWalRecord { will_init, rx } => {
|
||||
FutureValue::WalRecord { will_init, rx } => {
|
||||
state.records.push((lsn, rx));
|
||||
will_init
|
||||
}
|
||||
FutureValue::RawImage { rx } => todo!(),
|
||||
},
|
||||
};
|
||||
|
||||
@@ -368,14 +367,11 @@ impl ValuesReconstructState {
|
||||
}
|
||||
|
||||
enum FutureValue {
|
||||
RawImage {
|
||||
rx: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
},
|
||||
ValueWalRecord {
|
||||
WalRecord {
|
||||
will_init: bool,
|
||||
rx: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
},
|
||||
ValueImage {
|
||||
Img {
|
||||
rx: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -39,7 +39,7 @@ use crate::tenant::disk_btree::{
|
||||
use crate::tenant::storage_layer::layer::S3_UPLOAD_LIMIT;
|
||||
use crate::tenant::timeline::GetVectoredError;
|
||||
use crate::tenant::vectored_blob_io::{
|
||||
BlobFlag, BlobType, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
|
||||
VectoredReadCoalesceMode, VectoredReadPlanner,
|
||||
};
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
|
||||
@@ -934,17 +934,7 @@ impl DeltaLayerInner {
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(
|
||||
key,
|
||||
lsn,
|
||||
blob_ref.pos(),
|
||||
flag,
|
||||
if blob_ref.will_init() {
|
||||
BlobType::ValueImage
|
||||
} else {
|
||||
BlobType::ValueImage
|
||||
},
|
||||
);
|
||||
planner.handle(key, lsn, blob_ref.pos(), flag, blob_ref.will_init());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1012,12 +1002,9 @@ impl DeltaLayerInner {
|
||||
reconstruct_state.update_key(
|
||||
&blob_meta.key,
|
||||
blob_meta.lsn,
|
||||
match blob_meta.blob_type {
|
||||
BlobType::RawImage => unreachable!(),
|
||||
BlobType::ValueImage => super::FutureValue::ValueImage { rx },
|
||||
BlobType::ValueWalRecord { will_init } => {
|
||||
super::FutureValue::ValueWalRecord { will_init, rx }
|
||||
}
|
||||
super::FutureValue::WalRecord {
|
||||
will_init: blob_meta.will_init,
|
||||
rx,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -1192,7 +1179,7 @@ impl DeltaLayerInner {
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
blob_type: todo!(),
|
||||
will_init: false,
|
||||
},
|
||||
start_offset..end_offset,
|
||||
))
|
||||
|
||||
@@ -513,7 +513,7 @@ impl ImageLayerInner {
|
||||
range_end_handled = true;
|
||||
break;
|
||||
} else {
|
||||
planner.handle(key, self.lsn, offset, flag, crate::tenant::vectored_blob_io::BlobType::RawImage);
|
||||
planner.handle(key, self.lsn, offset, flag, true);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -591,7 +591,7 @@ impl ImageLayerInner {
|
||||
reconstruct_state.update_key(
|
||||
&blob_meta.key,
|
||||
blob_meta.lsn,
|
||||
super::FutureValue::RawImage { rx },
|
||||
super::FutureValue::Img { rx },
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -467,7 +467,11 @@ impl InMemoryLayer {
|
||||
|
||||
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||
senders.insert((key, *entry_lsn), tx);
|
||||
reconstruct_state.update_key(&key, *entry_lsn, todo!());
|
||||
reconstruct_state.update_key(
|
||||
&key,
|
||||
*entry_lsn,
|
||||
crate::tenant::storage_layer::FutureValue::WalRecord { will_init, rx },
|
||||
);
|
||||
|
||||
if will_init {
|
||||
break;
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::virtual_file::{self, VirtualFile};
|
||||
pub struct BlobMeta {
|
||||
pub key: Key,
|
||||
pub lsn: Lsn,
|
||||
pub blob_type: BlobType,
|
||||
pub will_init: bool,
|
||||
}
|
||||
|
||||
/// Blob offsets into [`VectoredBlobsBuf::buf`]
|
||||
@@ -346,13 +346,6 @@ pub enum BlobFlag {
|
||||
ReplaceAll,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub(crate) enum BlobType {
|
||||
RawImage,
|
||||
ValueImage,
|
||||
ValueWalRecord{ will_init: bool },
|
||||
}
|
||||
|
||||
/// Planner for vectored blob reads.
|
||||
///
|
||||
/// Blob offsets are received via [`VectoredReadPlanner::handle`]
|
||||
@@ -364,9 +357,9 @@ pub(crate) enum BlobType {
|
||||
pub struct VectoredReadPlanner {
|
||||
// Track all the blob offsets. Start offsets must be ordered.
|
||||
// Note: last bool is will_init
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, BlobType)>>,
|
||||
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
|
||||
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
|
||||
prev: Option<(Key, Lsn, u64, BlobFlag, BlobType)>,
|
||||
prev: Option<(Key, Lsn, u64, BlobFlag, bool)>,
|
||||
|
||||
max_read_size: usize,
|
||||
|
||||
@@ -401,12 +394,12 @@ impl VectoredReadPlanner {
|
||||
/// This is used for WAL records that `will_init`.
|
||||
/// * [`BlobFlag::Ignore`]: This blob should not be included in the read. This happens
|
||||
/// if the blob is cached.
|
||||
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag, blob_type: BlobType) {
|
||||
pub fn handle(&mut self, key: Key, lsn: Lsn, offset: u64, flag: BlobFlag, will_init: bool) {
|
||||
// Implementation note: internally lag behind by one blob such that
|
||||
// we have a start and end offset when initialising [`VectoredRead`]
|
||||
let (prev_key, prev_lsn, prev_offset, prev_flag, prev_blob_type) = match self.prev {
|
||||
let (prev_key, prev_lsn, prev_offset, prev_flag, prev_will_init) = match self.prev {
|
||||
None => {
|
||||
self.prev = Some((key, lsn, offset, flag, blob_type));
|
||||
self.prev = Some((key, lsn, offset, flag, will_init));
|
||||
return;
|
||||
}
|
||||
Some(prev) => prev,
|
||||
@@ -418,21 +411,21 @@ impl VectoredReadPlanner {
|
||||
prev_offset,
|
||||
offset,
|
||||
prev_flag,
|
||||
prev_blob_type,
|
||||
prev_will_init,
|
||||
);
|
||||
|
||||
self.prev = Some((key, lsn, offset, flag, blob_type));
|
||||
self.prev = Some((key, lsn, offset, flag, will_init));
|
||||
}
|
||||
|
||||
pub fn handle_range_end(&mut self, offset: u64) {
|
||||
if let Some((prev_key, prev_lsn, prev_offset, prev_flag, prev_blob_type)) = self.prev {
|
||||
if let Some((prev_key, prev_lsn, prev_offset, prev_flag, prev_will_init)) = self.prev {
|
||||
self.add_blob(
|
||||
prev_key,
|
||||
prev_lsn,
|
||||
prev_offset,
|
||||
offset,
|
||||
prev_flag,
|
||||
prev_blob_type,
|
||||
prev_will_init,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -446,17 +439,17 @@ impl VectoredReadPlanner {
|
||||
start_offset: u64,
|
||||
end_offset: u64,
|
||||
flag: BlobFlag,
|
||||
blob_type: BlobType,
|
||||
will_init: bool,
|
||||
) {
|
||||
match flag {
|
||||
BlobFlag::None => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, blob_type));
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, will_init));
|
||||
}
|
||||
BlobFlag::ReplaceAll => {
|
||||
let blobs_for_key = self.blobs.entry(key).or_default();
|
||||
blobs_for_key.clear();
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, blob_type));
|
||||
blobs_for_key.push((lsn, start_offset, end_offset, will_init));
|
||||
}
|
||||
BlobFlag::Ignore => {}
|
||||
}
|
||||
@@ -467,7 +460,7 @@ impl VectoredReadPlanner {
|
||||
let mut reads = Vec::new();
|
||||
|
||||
for (key, blobs_for_key) in self.blobs {
|
||||
for (lsn, start_offset, end_offset, blob_type) in blobs_for_key {
|
||||
for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
|
||||
let extended = match &mut current_read_builder {
|
||||
Some(read_builder) => read_builder.extend(
|
||||
start_offset,
|
||||
@@ -475,7 +468,7 @@ impl VectoredReadPlanner {
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
blob_type,
|
||||
will_init,
|
||||
},
|
||||
),
|
||||
None => VectoredReadExtended::No,
|
||||
@@ -488,7 +481,7 @@ impl VectoredReadPlanner {
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
blob_type,
|
||||
will_init,
|
||||
},
|
||||
self.max_read_size,
|
||||
self.mode,
|
||||
@@ -706,6 +699,7 @@ impl StreamingVectoredReadPlanner {
|
||||
start_offset: u64,
|
||||
end_offset: u64,
|
||||
is_last_blob_in_read: bool,
|
||||
// destination: oneshot::Sender<Result<Bytes, std::io::Error>>,
|
||||
) -> Option<VectoredRead> {
|
||||
match &mut self.read_builder {
|
||||
Some(read_builder) => {
|
||||
@@ -715,7 +709,7 @@ impl StreamingVectoredReadPlanner {
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
blob_type: todo!(),
|
||||
will_init: false,
|
||||
},
|
||||
);
|
||||
assert_eq!(extended, VectoredReadExtended::Yes);
|
||||
@@ -728,7 +722,7 @@ impl StreamingVectoredReadPlanner {
|
||||
BlobMeta {
|
||||
key,
|
||||
lsn,
|
||||
blob_type: todo!(),
|
||||
will_init: false,
|
||||
},
|
||||
self.mode,
|
||||
))
|
||||
|
||||
Reference in New Issue
Block a user