mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
Minor improvements to tiered compaction (#7020)
Minor non-functional improvements to tiered compaction, mostly consisting of comment fixes. Followup of #6830, part of #6768
This commit is contained in:
@@ -63,7 +63,7 @@ pub async fn compact_tiered<E: CompactionJobExecutor>(
|
||||
);
|
||||
|
||||
// Identify the range of LSNs that belong to this level. We assume that
|
||||
// each file in this level span an LSN range up to 1.75x target file
|
||||
// each file in this level spans an LSN range up to 1.75x target file
|
||||
// size. That should give us enough slop that if we created a slightly
|
||||
// oversized L0 layer, e.g. because flushing the in-memory layer was
|
||||
// delayed for some reason, we don't consider the oversized layer to
|
||||
@@ -248,7 +248,6 @@ enum CompactionStrategy {
|
||||
CreateImage,
|
||||
}
|
||||
|
||||
#[allow(dead_code)] // Todo
|
||||
struct CompactionJob<E: CompactionJobExecutor> {
|
||||
key_range: Range<E::Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
@@ -345,7 +344,7 @@ where
|
||||
///
|
||||
/// TODO: Currently, this is called exactly once for the level, and we
|
||||
/// decide whether to create new image layers to cover the whole level, or
|
||||
/// write a new set of delta. In the future, this should try to partition
|
||||
/// write a new set of deltas. In the future, this should try to partition
|
||||
/// the key space, and make the decision separately for each partition.
|
||||
async fn divide_job(&mut self, job_id: JobId, ctx: &E::RequestContext) -> anyhow::Result<()> {
|
||||
let job = &self.jobs[job_id.0];
|
||||
@@ -709,18 +708,6 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Sliding window through keyspace and values
|
||||
//
|
||||
// This is used to decide what layer to write next, from the beginning of the window.
|
||||
//
|
||||
// Candidates:
|
||||
//
|
||||
// 1. Create an image layer, snapping to previous images
|
||||
// 2. Create a delta layer, snapping to previous images
|
||||
// 3. Create an image layer, snapping to
|
||||
//
|
||||
//
|
||||
|
||||
// Take previous partitioning, based on the image layers below.
|
||||
//
|
||||
// Candidate is at the front:
|
||||
@@ -739,6 +726,10 @@ struct WindowElement<K> {
|
||||
last_key: K, // inclusive
|
||||
accum_size: u64,
|
||||
}
|
||||
|
||||
// Sliding window through keyspace and values
|
||||
//
|
||||
// This is used to decide what layer to write next, from the beginning of the window.
|
||||
struct Window<K> {
|
||||
elems: VecDeque<WindowElement<K>>,
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
//! An LSM tree consists of multiple levels, each exponential larger than the
|
||||
//! previous level. And each level consists of be multiple "tiers". With tiered
|
||||
//! An LSM tree consists of multiple levels, each exponentially larger than the
|
||||
//! previous level. And each level consists of multiple "tiers". With tiered
|
||||
//! compaction, a level is compacted when it has accumulated more than N tiers,
|
||||
//! forming one tier on the next level.
|
||||
//!
|
||||
@@ -170,13 +170,6 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
// helper struct used in depth()
|
||||
struct Event<K> {
|
||||
key: K,
|
||||
layer_idx: usize,
|
||||
start: bool,
|
||||
}
|
||||
|
||||
impl<L> Level<L> {
|
||||
/// Count the number of deltas stacked on each other.
|
||||
pub fn depth<K>(&self) -> u64
|
||||
@@ -184,6 +177,11 @@ impl<L> Level<L> {
|
||||
K: CompactionKey,
|
||||
L: CompactionLayer<K>,
|
||||
{
|
||||
struct Event<K> {
|
||||
key: K,
|
||||
layer_idx: usize,
|
||||
start: bool,
|
||||
}
|
||||
let mut events: Vec<Event<K>> = Vec::new();
|
||||
for (idx, l) in self.layers.iter().enumerate() {
|
||||
events.push(Event {
|
||||
@@ -202,7 +200,7 @@ impl<L> Level<L> {
|
||||
// Sweep the key space left to right. Stop at each distinct key, and
|
||||
// count the number of deltas on top of the highest image at that key.
|
||||
//
|
||||
// This is a little enefficient, as we walk through the active_set on
|
||||
// This is a little inefficient, as we walk through the active_set on
|
||||
// every key. We could increment/decrement a counter on each step
|
||||
// instead, but that'd require a bit more complex bookkeeping.
|
||||
let mut active_set: BTreeSet<(Lsn, bool, usize)> = BTreeSet::new();
|
||||
@@ -236,6 +234,7 @@ impl<L> Level<L> {
|
||||
}
|
||||
}
|
||||
}
|
||||
debug_assert_eq!(active_set, BTreeSet::new());
|
||||
max_depth
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,12 +4,12 @@
|
||||
//! All the heavy lifting is done by the create_image and create_delta
|
||||
//! functions that the implementor provides.
|
||||
use async_trait::async_trait;
|
||||
use futures::Future;
|
||||
use pageserver_api::{key::Key, keyspace::key_range_size};
|
||||
use std::ops::Range;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
/// Public interface. This is the main thing that the implementor needs to provide
|
||||
#[async_trait]
|
||||
pub trait CompactionJobExecutor {
|
||||
// Type system.
|
||||
//
|
||||
@@ -17,8 +17,7 @@ pub trait CompactionJobExecutor {
|
||||
// compaction doesn't distinguish whether they are stored locally or
|
||||
// remotely.
|
||||
//
|
||||
// The keyspace is defined by CompactionKey trait.
|
||||
//
|
||||
// The keyspace is defined by the CompactionKey trait.
|
||||
type Key: CompactionKey;
|
||||
|
||||
type Layer: CompactionLayer<Self::Key> + Clone;
|
||||
@@ -35,27 +34,27 @@ pub trait CompactionJobExecutor {
|
||||
// ----
|
||||
|
||||
/// Return all layers that overlap the given bounding box.
|
||||
async fn get_layers(
|
||||
fn get_layers(
|
||||
&mut self,
|
||||
key_range: &Range<Self::Key>,
|
||||
lsn_range: &Range<Lsn>,
|
||||
ctx: &Self::RequestContext,
|
||||
) -> anyhow::Result<Vec<Self::Layer>>;
|
||||
) -> impl Future<Output = anyhow::Result<Vec<Self::Layer>>> + Send;
|
||||
|
||||
async fn get_keyspace(
|
||||
fn get_keyspace(
|
||||
&mut self,
|
||||
key_range: &Range<Self::Key>,
|
||||
lsn: Lsn,
|
||||
ctx: &Self::RequestContext,
|
||||
) -> anyhow::Result<CompactionKeySpace<Self::Key>>;
|
||||
) -> impl Future<Output = anyhow::Result<CompactionKeySpace<Self::Key>>> + Send;
|
||||
|
||||
/// NB: This is a pretty expensive operation. In the real pageserver
|
||||
/// implementation, it downloads the layer, and keeps it resident
|
||||
/// until the DeltaLayer is dropped.
|
||||
async fn downcast_delta_layer(
|
||||
fn downcast_delta_layer(
|
||||
&self,
|
||||
layer: &Self::Layer,
|
||||
) -> anyhow::Result<Option<Self::DeltaLayer>>;
|
||||
) -> impl Future<Output = anyhow::Result<Option<Self::DeltaLayer>>> + Send;
|
||||
|
||||
// ----
|
||||
// Functions to execute the plan
|
||||
@@ -63,33 +62,33 @@ pub trait CompactionJobExecutor {
|
||||
|
||||
/// Create a new image layer, materializing all the values in the key range,
|
||||
/// at given 'lsn'.
|
||||
async fn create_image(
|
||||
fn create_image(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
key_range: &Range<Self::Key>,
|
||||
ctx: &Self::RequestContext,
|
||||
) -> anyhow::Result<()>;
|
||||
) -> impl Future<Output = anyhow::Result<()>> + Send;
|
||||
|
||||
/// Create a new delta layer, containing all the values from 'input_layers'
|
||||
/// in the given key and LSN range.
|
||||
async fn create_delta(
|
||||
fn create_delta(
|
||||
&mut self,
|
||||
lsn_range: &Range<Lsn>,
|
||||
key_range: &Range<Self::Key>,
|
||||
input_layers: &[Self::DeltaLayer],
|
||||
ctx: &Self::RequestContext,
|
||||
) -> anyhow::Result<()>;
|
||||
) -> impl Future<Output = anyhow::Result<()>> + Send;
|
||||
|
||||
/// Delete a layer. The compaction implementation will call this only after
|
||||
/// all the create_image() or create_delta() calls that deletion of this
|
||||
/// layer depends on have finished. But if the implementor has extra lazy
|
||||
/// background tasks, like uploading the index json file to remote storage,
|
||||
/// background tasks, like uploading the index json file to remote storage.
|
||||
/// it is the implementation's responsibility to track those.
|
||||
async fn delete_layer(
|
||||
fn delete_layer(
|
||||
&mut self,
|
||||
layer: &Self::Layer,
|
||||
ctx: &Self::RequestContext,
|
||||
) -> anyhow::Result<()>;
|
||||
) -> impl Future<Output = anyhow::Result<()>> + Send;
|
||||
}
|
||||
|
||||
pub trait CompactionKey: std::cmp::Ord + Clone + Copy + std::fmt::Display {
|
||||
|
||||
@@ -429,7 +429,6 @@ impl From<&Arc<MockImageLayer>> for MockLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl interface::CompactionJobExecutor for MockTimeline {
|
||||
type Key = Key;
|
||||
type Layer = MockLayer;
|
||||
|
||||
Reference in New Issue
Block a user