mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-10 06:00:38 +00:00
Compare commits
3 Commits
conrad/ref
...
skyzh/reve
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d33feb0c8e | ||
|
|
674bdcb7c6 | ||
|
|
a3b6c4b0c0 |
@@ -125,7 +125,6 @@ fn main() -> anyhow::Result<()> {
|
||||
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
|
||||
info!(?conf.get_impl, "starting with get page implementation");
|
||||
info!(?conf.get_vectored_impl, "starting with vectored get page implementation");
|
||||
info!(?conf.compact_level0_phase1_value_access, "starting with setting for compact_level0_phase1_value_access");
|
||||
|
||||
let tenants_path = conf.tenants_path();
|
||||
if !tenants_path.exists() {
|
||||
|
||||
@@ -29,7 +29,6 @@ use utils::{
|
||||
logging::LogFormat,
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::compaction::CompactL0Phase1ValueAccess;
|
||||
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
|
||||
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
|
||||
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
@@ -296,10 +295,6 @@ pub struct PageServerConf {
|
||||
pub ephemeral_bytes_per_memory_kb: usize,
|
||||
|
||||
pub l0_flush: L0FlushConfig,
|
||||
|
||||
/// This flag is temporary and will be removed after gradual rollout.
|
||||
/// See <https://github.com/neondatabase/neon/issues/8184>.
|
||||
pub compact_level0_phase1_value_access: CompactL0Phase1ValueAccess,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -406,8 +401,6 @@ struct PageServerConfigBuilder {
|
||||
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
|
||||
|
||||
l0_flush: BuilderValue<L0FlushConfig>,
|
||||
|
||||
compact_level0_phase1_value_access: BuilderValue<CompactL0Phase1ValueAccess>,
|
||||
}
|
||||
|
||||
impl PageServerConfigBuilder {
|
||||
@@ -497,7 +490,6 @@ impl PageServerConfigBuilder {
|
||||
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
|
||||
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
|
||||
l0_flush: Set(L0FlushConfig::default()),
|
||||
compact_level0_phase1_value_access: Set(CompactL0Phase1ValueAccess::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -681,10 +673,6 @@ impl PageServerConfigBuilder {
|
||||
self.l0_flush = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn compact_level0_phase1_value_access(&mut self, value: CompactL0Phase1ValueAccess) {
|
||||
self.compact_level0_phase1_value_access = BuilderValue::Set(value);
|
||||
}
|
||||
|
||||
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
|
||||
let default = Self::default_values();
|
||||
|
||||
@@ -742,7 +730,6 @@ impl PageServerConfigBuilder {
|
||||
image_compression,
|
||||
ephemeral_bytes_per_memory_kb,
|
||||
l0_flush,
|
||||
compact_level0_phase1_value_access,
|
||||
}
|
||||
CUSTOM LOGIC
|
||||
{
|
||||
@@ -1015,9 +1002,6 @@ impl PageServerConf {
|
||||
"l0_flush" => {
|
||||
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
|
||||
}
|
||||
"compact_level0_phase1_value_access" => {
|
||||
builder.compact_level0_phase1_value_access(utils::toml_edit_ext::deserialize_item(item).context("compact_level0_phase1_value_access")?)
|
||||
}
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -1102,7 +1086,6 @@ impl PageServerConf {
|
||||
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
l0_flush: L0FlushConfig::default(),
|
||||
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1344,7 +1327,6 @@ background_task_maximum_delay = '334 s'
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
l0_flush: L0FlushConfig::default(),
|
||||
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1419,7 +1401,6 @@ background_task_maximum_delay = '334 s'
|
||||
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
|
||||
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
|
||||
l0_flush: L0FlushConfig::default(),
|
||||
compact_level0_phase1_value_access: CompactL0Phase1ValueAccess::default(),
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
|
||||
@@ -8,7 +8,8 @@ use std::time::Duration;
|
||||
pub use pageserver_api::key::{Key, KEY_SIZE};
|
||||
|
||||
/// A 'value' stored for a one Key.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[cfg_attr(test, derive(PartialEq))]
|
||||
pub enum Value {
|
||||
/// An Image value contains a full copy of the value
|
||||
Image(Bytes),
|
||||
|
||||
@@ -296,19 +296,13 @@ where
|
||||
let mut stack = Vec::new();
|
||||
stack.push((self.root_blk, None));
|
||||
let block_cursor = self.reader.block_cursor();
|
||||
let mut node_buf = [0_u8; PAGE_SZ];
|
||||
while let Some((node_blknum, opt_iter)) = stack.pop() {
|
||||
// Read the node, through the PS PageCache, into local variable `node_buf`.
|
||||
// We could keep the page cache read guard alive, but, at the time of writing,
|
||||
// we run quite small PS PageCache s => can't risk running out of
|
||||
// PageCache space because this stream isn't consumed fast enough.
|
||||
let page_read_guard = block_cursor
|
||||
// Locate the node.
|
||||
let node_buf = block_cursor
|
||||
.read_blk(self.start_blk + node_blknum, ctx)
|
||||
.await?;
|
||||
node_buf.copy_from_slice(page_read_guard.as_ref());
|
||||
drop(page_read_guard); // drop page cache read guard early
|
||||
|
||||
let node = OnDiskNode::deparse(&node_buf)?;
|
||||
let node = OnDiskNode::deparse(node_buf.as_ref())?;
|
||||
let prefix_len = node.prefix_len as usize;
|
||||
let suffix_len = node.suffix_len as usize;
|
||||
|
||||
@@ -351,7 +345,6 @@ where
|
||||
Either::Left(idx..node.num_children.into())
|
||||
};
|
||||
|
||||
|
||||
// idx points to the first match now. Keep going from there
|
||||
while let Some(idx) = iter.next() {
|
||||
let key_off = idx * suffix_len;
|
||||
|
||||
@@ -59,7 +59,7 @@ use std::{
|
||||
collections::{BTreeMap, HashMap, HashSet},
|
||||
sync::atomic::AtomicU64,
|
||||
};
|
||||
use std::{cmp::min, ops::ControlFlow};
|
||||
use std::{cmp::min, cmp::Ordering, ops::ControlFlow};
|
||||
use std::{
|
||||
collections::btree_map::Entry,
|
||||
ops::{Deref, Range},
|
||||
@@ -180,6 +180,25 @@ impl std::fmt::Display for ImageLayerCreationMode {
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for key range to provide reverse ordering by range length for BinaryHeap
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct Hole {
|
||||
key_range: Range<Key>,
|
||||
coverage_size: usize,
|
||||
}
|
||||
|
||||
impl Ord for Hole {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
other.coverage_size.cmp(&self.coverage_size) // inverse order
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for Hole {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
|
||||
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
|
||||
/// Can be removed after all refactors are done.
|
||||
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<T>) {
|
||||
|
||||
@@ -35,8 +35,8 @@ use crate::tenant::storage_layer::merge_iterator::MergeIterator;
|
||||
use crate::tenant::storage_layer::{
|
||||
AsLayerDesc, PersistentLayerDesc, PersistentLayerKey, ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::timeline::ImageLayerCreationOutcome;
|
||||
use crate::tenant::timeline::{drop_rlock, DeltaLayerWriter, ImageLayerWriter};
|
||||
use crate::tenant::timeline::{Hole, ImageLayerCreationOutcome};
|
||||
use crate::tenant::timeline::{Layer, ResidentLayer};
|
||||
use crate::tenant::DeltaLayer;
|
||||
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
|
||||
@@ -752,230 +752,66 @@ impl Timeline {
|
||||
.read_lock_held_spawn_blocking_startup_micros
|
||||
.till_now();
|
||||
|
||||
// TODO: replace with streaming k-merge
|
||||
let all_keys = {
|
||||
let mut all_keys = Vec::new();
|
||||
for l in deltas_to_compact.iter() {
|
||||
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
|
||||
}
|
||||
// The current stdlib sorting implementation is designed in a way where it is
|
||||
// particularly fast where the slice is made up of sorted sub-ranges.
|
||||
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
|
||||
all_keys
|
||||
};
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
|
||||
let mut all_keys = Vec::new();
|
||||
|
||||
for l in deltas_to_compact.iter() {
|
||||
all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
|
||||
}
|
||||
|
||||
// FIXME: should spawn_blocking the rest of this function
|
||||
|
||||
// The current stdlib sorting implementation is designed in a way where it is
|
||||
// particularly fast where the slice is made up of sorted sub-ranges.
|
||||
all_keys.sort_by_key(|DeltaEntry { key, lsn, .. }| (*key, *lsn));
|
||||
|
||||
stats.read_lock_held_key_sort_micros = stats.read_lock_held_prerequisites_micros.till_now();
|
||||
|
||||
// Determine N largest holes where N is number of compacted layers. The vec is sorted by key range start.
|
||||
//
|
||||
// A hole is a key range for which this compaction doesn't have any WAL records.
|
||||
// Our goal in this compaction iteration is to avoid creating L1s that, in terms of their key range,
|
||||
// cover the hole, but actually don't contain any WAL records for that key range.
|
||||
// The reason is that the mere stack of L1s (`count_deltas`) triggers image layer creation (`create_image_layers`).
|
||||
// That image layer creation would be useless for a hole range covered by L1s that don't contain any WAL records.
|
||||
//
|
||||
// The algorithm chooses holes as follows.
|
||||
// - Slide a 2-window over the keys in key orde to get the hole range (=distance between two keys).
|
||||
// - Filter: min threshold on range length
|
||||
// - Rank: by coverage size (=number of image layers required to reconstruct each key in the range for which we have any data)
|
||||
//
|
||||
// For more details, intuition, and some ASCII art see https://github.com/neondatabase/neon/pull/3597#discussion_r1112704451
|
||||
#[derive(PartialEq, Eq)]
|
||||
struct Hole {
|
||||
key_range: Range<Key>,
|
||||
coverage_size: usize,
|
||||
}
|
||||
let holes: Vec<Hole> = {
|
||||
use std::cmp::Ordering;
|
||||
impl Ord for Hole {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.coverage_size.cmp(&other.coverage_size).reverse()
|
||||
}
|
||||
}
|
||||
impl PartialOrd for Hole {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
|
||||
Some(self.cmp(other))
|
||||
}
|
||||
}
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
// min-heap (reserve space for one more element added before eviction)
|
||||
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
|
||||
let mut prev: Option<Key> = None;
|
||||
|
||||
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
|
||||
// compaction is the gap between data key and metadata keys.
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
|
||||
&& !Key::is_metadata_key(&prev_key)
|
||||
{
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size =
|
||||
layers.image_coverage(&key_range, last_record_lsn).len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
for &DeltaEntry { key: next_key, .. } in all_keys.iter() {
|
||||
if let Some(prev_key) = prev {
|
||||
// just first fast filter, do not create hole entries for metadata keys. The last hole in the
|
||||
// compaction is the gap between data key and metadata keys.
|
||||
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range
|
||||
&& !Key::is_metadata_key(&prev_key)
|
||||
{
|
||||
let key_range = prev_key..next_key;
|
||||
// Measuring hole by just subtraction of i128 representation of key range boundaries
|
||||
// has not so much sense, because largest holes will corresponds field1/field2 changes.
|
||||
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
|
||||
// That is why it is better to measure size of hole as number of covering image layers.
|
||||
let coverage_size = layers.image_coverage(&key_range, last_record_lsn).len();
|
||||
if coverage_size >= min_hole_coverage_size {
|
||||
heap.push(Hole {
|
||||
key_range,
|
||||
coverage_size,
|
||||
});
|
||||
if heap.len() > max_holes {
|
||||
heap.pop(); // remove smallest hole
|
||||
}
|
||||
}
|
||||
}
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
holes
|
||||
};
|
||||
prev = Some(next_key.next());
|
||||
}
|
||||
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
|
||||
drop_rlock(guard);
|
||||
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
|
||||
let mut holes = heap.into_vec();
|
||||
holes.sort_unstable_by_key(|hole| hole.key_range.start);
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
|
||||
// This iterator walks through all key-value pairs from all the layers
|
||||
// we're compacting, in key, LSN order.
|
||||
// If there's both a Value::Image and Value::WalRecord for the same (key,lsn),
|
||||
// then the Value::Image is ordered before Value::WalRecord.
|
||||
//
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): remove the page cached blob_io
|
||||
// option and validation code once we've reached confidence.
|
||||
enum AllValuesIter<'a> {
|
||||
PageCachedBlobIo {
|
||||
all_keys_iter: VecIter<'a>,
|
||||
},
|
||||
StreamingKmergeBypassingPageCache {
|
||||
merge_iter: MergeIterator<'a>,
|
||||
},
|
||||
ValidatingStreamingKmergeBypassingPageCache {
|
||||
mode: CompactL0BypassPageCacheValidation,
|
||||
merge_iter: MergeIterator<'a>,
|
||||
all_keys_iter: VecIter<'a>,
|
||||
},
|
||||
}
|
||||
type VecIter<'a> = std::slice::Iter<'a, DeltaEntry<'a>>; // TODO: distinguished lifetimes
|
||||
impl AllValuesIter<'_> {
|
||||
async fn next_all_keys_iter(
|
||||
iter: &mut VecIter<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
|
||||
let Some(DeltaEntry {
|
||||
key,
|
||||
lsn,
|
||||
val: value_ref,
|
||||
..
|
||||
}) = iter.next()
|
||||
else {
|
||||
return Ok(None);
|
||||
};
|
||||
let value = value_ref.load(ctx).await?;
|
||||
Ok(Some((*key, *lsn, value)))
|
||||
}
|
||||
async fn next(
|
||||
&mut self,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<(Key, Lsn, Value)>> {
|
||||
match self {
|
||||
AllValuesIter::PageCachedBlobIo { all_keys_iter: iter } => {
|
||||
Self::next_all_keys_iter(iter, ctx).await
|
||||
}
|
||||
AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter } => merge_iter.next().await,
|
||||
AllValuesIter::ValidatingStreamingKmergeBypassingPageCache { mode, merge_iter, all_keys_iter } => async {
|
||||
// advance both iterators
|
||||
let all_keys_iter_item = Self::next_all_keys_iter(all_keys_iter, ctx).await;
|
||||
let merge_iter_item = merge_iter.next().await;
|
||||
// compare results & log warnings as needed
|
||||
macro_rules! rate_limited_warn {
|
||||
($($arg:tt)*) => {{
|
||||
if cfg!(debug_assertions) || cfg!(feature = "testing") {
|
||||
warn!($($arg)*);
|
||||
panic!("CompactL0BypassPageCacheValidation failure, check logs");
|
||||
}
|
||||
use once_cell::sync::Lazy;
|
||||
use utils::rate_limit::RateLimit;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
static LOGGED: Lazy<Mutex<RateLimit>> =
|
||||
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(10))));
|
||||
let mut rate_limit = LOGGED.lock().unwrap();
|
||||
rate_limit.call(|| {
|
||||
warn!($($arg)*);
|
||||
});
|
||||
}}
|
||||
}
|
||||
match (&all_keys_iter_item, &merge_iter_item) {
|
||||
(Err(_), Err(_)) => {
|
||||
// don't bother asserting equivality of the errors
|
||||
}
|
||||
(Err(all_keys), Ok(merge)) => {
|
||||
rate_limited_warn!(?merge, "all_keys_iter returned an error where merge did not: {all_keys:?}");
|
||||
},
|
||||
(Ok(all_keys), Err(merge)) => {
|
||||
rate_limited_warn!(?all_keys, "merge returned an error where all_keys_iter did not: {merge:?}");
|
||||
},
|
||||
(Ok(None), Ok(None)) => { }
|
||||
(Ok(Some(all_keys)), Ok(None)) => {
|
||||
rate_limited_warn!(?all_keys, "merge returned None where all_keys_iter returned Some");
|
||||
}
|
||||
(Ok(None), Ok(Some(merge))) => {
|
||||
rate_limited_warn!(?merge, "all_keys_iter returned None where merge returned Some");
|
||||
}
|
||||
(Ok(Some((all_keys_key, all_keys_lsn, all_keys_value))), Ok(Some((merge_key, merge_lsn, merge_value)))) => {
|
||||
match mode {
|
||||
// TODO: in this mode, we still load the value from disk for both iterators, even though we only need the all_keys_iter one
|
||||
CompactL0BypassPageCacheValidation::KeyLsn => {
|
||||
let all_keys = (all_keys_key, all_keys_lsn);
|
||||
let merge = (merge_key, merge_lsn);
|
||||
if all_keys != merge {
|
||||
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN) than all_keys_iter");
|
||||
}
|
||||
}
|
||||
CompactL0BypassPageCacheValidation::KeyLsnValue => {
|
||||
let all_keys = (all_keys_key, all_keys_lsn, all_keys_value);
|
||||
let merge = (merge_key, merge_lsn, merge_value);
|
||||
if all_keys != merge {
|
||||
rate_limited_warn!(?all_keys, ?merge, "merge returned a different (Key,LSN,Value) than all_keys_iter");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// in case of mismatch, trust the legacy all_keys_iter_item
|
||||
all_keys_iter_item
|
||||
}.instrument(info_span!("next")).await
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut all_values_iter = match &self.conf.compact_level0_phase1_value_access {
|
||||
CompactL0Phase1ValueAccess::PageCachedBlobIo => AllValuesIter::PageCachedBlobIo {
|
||||
all_keys_iter: all_keys.iter(),
|
||||
},
|
||||
CompactL0Phase1ValueAccess::StreamingKmerge { validate } => {
|
||||
let merge_iter = {
|
||||
let mut deltas = Vec::with_capacity(deltas_to_compact.len());
|
||||
for l in deltas_to_compact.iter() {
|
||||
let l = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
deltas.push(l);
|
||||
}
|
||||
MergeIterator::create(&deltas, &[], ctx)
|
||||
};
|
||||
match validate {
|
||||
None => AllValuesIter::StreamingKmergeBypassingPageCache { merge_iter },
|
||||
Some(validate) => AllValuesIter::ValidatingStreamingKmergeBypassingPageCache {
|
||||
mode: validate.clone(),
|
||||
merge_iter,
|
||||
all_keys_iter: all_keys.iter(),
|
||||
},
|
||||
}
|
||||
}
|
||||
};
|
||||
let all_values_iter = all_keys.iter();
|
||||
|
||||
// This iterator walks through all keys and is needed to calculate size used by each key
|
||||
let mut all_keys_iter = all_keys
|
||||
@@ -1046,13 +882,12 @@ impl Timeline {
|
||||
let mut key_values_total_size = 0u64;
|
||||
let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key
|
||||
let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key
|
||||
let mut next_hole = 0; // index of next hole in holes vector
|
||||
|
||||
while let Some((key, lsn, value)) = all_values_iter
|
||||
.next(ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?
|
||||
for &DeltaEntry {
|
||||
key, lsn, ref val, ..
|
||||
} in all_values_iter
|
||||
{
|
||||
let value = val.load(ctx).await.map_err(CompactionError::Other)?;
|
||||
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
|
||||
// We need to check key boundaries once we reach next key or end of layer with the same key
|
||||
if !same_key || lsn == dup_end_lsn {
|
||||
@@ -1240,10 +1075,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
// Without this, rustc complains about deltas_to_compact still
|
||||
// being borrowed when we `.into_iter()` below.
|
||||
drop(all_values_iter);
|
||||
|
||||
Ok(CompactLevel0Phase1Result {
|
||||
new_layers,
|
||||
deltas_to_compact: deltas_to_compact
|
||||
@@ -1351,43 +1182,6 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
|
||||
pub enum CompactL0Phase1ValueAccess {
|
||||
/// The old way.
|
||||
PageCachedBlobIo,
|
||||
/// The new way.
|
||||
StreamingKmerge {
|
||||
/// If set, we run both the old way and the new way, validate that
|
||||
/// they are identical (=> [`CompactL0BypassPageCacheValidation`]),
|
||||
/// and if the validation fails,
|
||||
/// - in tests: fail them with a panic or
|
||||
/// - in prod, log a rate-limited warning and use the old way's results.
|
||||
///
|
||||
/// If not set, we only run the new way and trust its results.
|
||||
validate: Option<CompactL0BypassPageCacheValidation>,
|
||||
},
|
||||
}
|
||||
|
||||
/// See [`CompactL0Phase1ValueAccess::StreamingKmerge`].
|
||||
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum CompactL0BypassPageCacheValidation {
|
||||
/// Validate that the series of (key, lsn) pairs are the same.
|
||||
KeyLsn,
|
||||
/// Validate that the entire output of old and new way is identical.
|
||||
KeyLsnValue,
|
||||
}
|
||||
|
||||
impl Default for CompactL0Phase1ValueAccess {
|
||||
fn default() -> Self {
|
||||
CompactL0Phase1ValueAccess::StreamingKmerge {
|
||||
// TODO(https://github.com/neondatabase/neon/issues/8184): change to None once confident
|
||||
validate: Some(CompactL0BypassPageCacheValidation::KeyLsnValue),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
/// Entry point for new tiered compaction algorithm.
|
||||
///
|
||||
|
||||
Reference in New Issue
Block a user