|
|
|
|
@@ -111,10 +111,10 @@ use self::layer_manager::LayerManager;
|
|
|
|
|
use self::logical_size::LogicalSize;
|
|
|
|
|
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
|
|
|
|
|
|
|
|
|
use super::remote_timeline_client::RemoteTimelineClient;
|
|
|
|
|
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
|
|
|
|
|
use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
|
|
|
|
|
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
|
|
|
|
|
use super::{layer_map, remote_timeline_client::RemoteTimelineClient};
|
|
|
|
|
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
|
|
|
|
@@ -210,6 +210,10 @@ pub struct Timeline {
|
|
|
|
|
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
|
|
|
|
|
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
|
|
|
|
|
|
|
|
|
|
/// State that [`Self::create_image_layers`] keeps across invocations to determine if it can
|
|
|
|
|
/// skip an invocation becaucse nothing changed.
|
|
|
|
|
create_image_layers_skipper_state: std::sync::Mutex<Option<CreateImageLayersParams>>,
|
|
|
|
|
|
|
|
|
|
/// Set of key ranges which should be covered by image layers to
|
|
|
|
|
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
|
|
|
|
/// It is used by compaction task when it checks if new image layer should be created.
|
|
|
|
|
@@ -302,8 +306,9 @@ pub struct Timeline {
|
|
|
|
|
// though let's keep them both for better error visibility.
|
|
|
|
|
pub initdb_lsn: Lsn,
|
|
|
|
|
|
|
|
|
|
/// When did we last calculate the partitioning?
|
|
|
|
|
partitioning: Mutex<(KeyPartitioning, Lsn)>,
|
|
|
|
|
/// Used by [`Timline::repartition`] to avoid re-computing partitioning on every iteration.
|
|
|
|
|
/// Must bump [`CompactionKeyspacePartitioning::version`] when changing.
|
|
|
|
|
partitioning: tokio::sync::Mutex<CompactionKeyspacePartitioning>,
|
|
|
|
|
|
|
|
|
|
/// Configuration: how often should the partitioning be recalculated.
|
|
|
|
|
repartition_threshold: u64,
|
|
|
|
|
@@ -400,6 +405,57 @@ pub struct GcInfo {
|
|
|
|
|
pub pitr_cutoff: Lsn,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
|
|
|
|
struct CreateImageLayersParams {
|
|
|
|
|
/// From [`layer_map::LayerMap::get_rebuild_version`].
|
|
|
|
|
layer_map_version: layer_map::RebuildVersion,
|
|
|
|
|
/// From [`Timeline::partitioning`].
|
|
|
|
|
partitioning_version: CompactionKeyspacePartitioningVersion,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// The version of the keyspace partitioning maintained in [`Timeline::partitioning`].
|
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
|
|
|
struct CompactionKeyspacePartitioningVersion(u64);
|
|
|
|
|
|
|
|
|
|
impl CompactionKeyspacePartitioningVersion {
|
|
|
|
|
fn inc(&mut self) {
|
|
|
|
|
self.0
|
|
|
|
|
.checked_add(1)
|
|
|
|
|
.expect("at current clock cycles, we won't hit this");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
struct CompactionKeyspacePartitioning {
|
|
|
|
|
version: CompactionKeyspacePartitioningVersion,
|
|
|
|
|
partitioning: KeyPartitioning,
|
|
|
|
|
lsn: Lsn,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct CompactionKeyspacePartition<'a> {
|
|
|
|
|
key_space: &'a KeySpace,
|
|
|
|
|
lsn: Lsn,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl CompactionKeyspacePartitioning {
|
|
|
|
|
pub fn update(&mut self, partitioning: KeyPartitioning, lsn: Lsn) {
|
|
|
|
|
assert!(self.lsn <= lsn);
|
|
|
|
|
self.version.inc();
|
|
|
|
|
self.partitioning = partitioning;
|
|
|
|
|
self.lsn = lsn;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn iter(&self) -> impl Iterator<Item = CompactionKeyspacePartition<'_>> {
|
|
|
|
|
self.partitioning
|
|
|
|
|
.parts
|
|
|
|
|
.iter()
|
|
|
|
|
.map(|key_space| CompactionKeyspacePartition {
|
|
|
|
|
key_space,
|
|
|
|
|
lsn: self.lsn,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// An error happened in a get() operation.
|
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
|
|
|
pub(crate) enum PageReconstructError {
|
|
|
|
|
@@ -1154,7 +1210,7 @@ impl Timeline {
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok((partitioning, lsn)) => {
|
|
|
|
|
Ok(partitioning) => {
|
|
|
|
|
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
|
|
|
|
|
let image_ctx = RequestContextBuilder::extend(ctx)
|
|
|
|
|
.access_stats_behavior(AccessStatsBehavior::Skip)
|
|
|
|
|
@@ -1168,7 +1224,7 @@ impl Timeline {
|
|
|
|
|
// 3. Create new image layers for partitions that have been modified
|
|
|
|
|
// "enough".
|
|
|
|
|
let layers = self
|
|
|
|
|
.create_image_layers(&partitioning, lsn, false, &image_ctx)
|
|
|
|
|
.create_image_layers(&partitioning, false, &image_ctx)
|
|
|
|
|
.await
|
|
|
|
|
.map_err(anyhow::Error::from)?;
|
|
|
|
|
if let Some(remote_client) = &self.remote_client {
|
|
|
|
|
@@ -1659,8 +1715,15 @@ impl Timeline {
|
|
|
|
|
// initial logical size is 0.
|
|
|
|
|
LogicalSize::empty_initial()
|
|
|
|
|
},
|
|
|
|
|
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
|
|
|
|
|
partitioning: tokio::sync::Mutex::new(CompactionKeyspacePartitioning {
|
|
|
|
|
// The other fields don't matter because this is 0 and hence repartition(),
|
|
|
|
|
// will calculate a new one
|
|
|
|
|
lsn: Lsn(0),
|
|
|
|
|
version: CompactionKeyspacePartitioningVersion(0),
|
|
|
|
|
partitioning: KeyPartitioning::new(),
|
|
|
|
|
}),
|
|
|
|
|
repartition_threshold: 0,
|
|
|
|
|
create_image_layers_skipper_state: std::sync::Mutex::new(None),
|
|
|
|
|
|
|
|
|
|
last_received_wal: Mutex::new(None),
|
|
|
|
|
rel_size_cache: RwLock::new(HashMap::new()),
|
|
|
|
|
@@ -3151,7 +3214,7 @@ impl Timeline {
|
|
|
|
|
}
|
|
|
|
|
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
|
|
|
|
|
// require downloading anything during initial import.
|
|
|
|
|
let (partitioning, _lsn) = self
|
|
|
|
|
let partitioning = self
|
|
|
|
|
.repartition(
|
|
|
|
|
self.initdb_lsn,
|
|
|
|
|
self.get_compaction_target_size(),
|
|
|
|
|
@@ -3166,8 +3229,7 @@ impl Timeline {
|
|
|
|
|
|
|
|
|
|
// For image layers, we add them immediately into the layer map.
|
|
|
|
|
(
|
|
|
|
|
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
|
|
|
|
|
.await?,
|
|
|
|
|
self.create_image_layers(&partitioning, true, ctx).await?,
|
|
|
|
|
None,
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
@@ -3405,36 +3467,46 @@ impl Timeline {
|
|
|
|
|
partition_size: u64,
|
|
|
|
|
flags: EnumSet<CompactFlags>,
|
|
|
|
|
ctx: &RequestContext,
|
|
|
|
|
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
|
|
|
|
|
{
|
|
|
|
|
let partitioning_guard = self.partitioning.lock().unwrap();
|
|
|
|
|
let distance = lsn.0 - partitioning_guard.1 .0;
|
|
|
|
|
if partitioning_guard.1 != Lsn(0)
|
|
|
|
|
&& distance <= self.repartition_threshold
|
|
|
|
|
&& !flags.contains(CompactFlags::ForceRepartition)
|
|
|
|
|
{
|
|
|
|
|
debug!(
|
|
|
|
|
distance,
|
|
|
|
|
threshold = self.repartition_threshold,
|
|
|
|
|
"no repartitioning needed"
|
|
|
|
|
);
|
|
|
|
|
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
|
|
|
|
|
}
|
|
|
|
|
) -> anyhow::Result<CompactionKeyspacePartitioning> {
|
|
|
|
|
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
|
|
|
|
|
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timelien.
|
|
|
|
|
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
|
|
|
|
|
// and hence before the compaction task starts.
|
|
|
|
|
anyhow::bail!("repartition() called concurrently, this should not happen");
|
|
|
|
|
};
|
|
|
|
|
if lsn < partitioning_guard.lsn {
|
|
|
|
|
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let distance = lsn.0 - partitioning_guard.lsn.0;
|
|
|
|
|
if partitioning_guard.lsn != Lsn(0)
|
|
|
|
|
&& distance <= self.repartition_threshold
|
|
|
|
|
&& !flags.contains(CompactFlags::ForceRepartition)
|
|
|
|
|
{
|
|
|
|
|
debug!(
|
|
|
|
|
distance,
|
|
|
|
|
threshold = self.repartition_threshold,
|
|
|
|
|
"no repartitioning needed"
|
|
|
|
|
);
|
|
|
|
|
return Ok(partitioning_guard.clone());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let keyspace = self.collect_keyspace(lsn, ctx).await?;
|
|
|
|
|
let partitioning = keyspace.partition(partition_size);
|
|
|
|
|
|
|
|
|
|
let mut partitioning_guard = self.partitioning.lock().unwrap();
|
|
|
|
|
if lsn > partitioning_guard.1 {
|
|
|
|
|
*partitioning_guard = (partitioning, lsn);
|
|
|
|
|
} else {
|
|
|
|
|
warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
|
|
|
|
|
}
|
|
|
|
|
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
|
|
|
|
|
partitioning_guard.update(partitioning, lsn);
|
|
|
|
|
|
|
|
|
|
Ok(partitioning_guard.clone())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Is it time to create a new image layer for the given partition?
|
|
|
|
|
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
|
|
|
|
async fn time_for_new_image_layer(
|
|
|
|
|
&self,
|
|
|
|
|
&CompactionKeyspacePartition {
|
|
|
|
|
key_space: partition,
|
|
|
|
|
lsn,
|
|
|
|
|
}: &CompactionKeyspacePartition<'_>,
|
|
|
|
|
) -> bool {
|
|
|
|
|
let threshold = self.get_image_creation_threshold();
|
|
|
|
|
|
|
|
|
|
let guard = self.layers.read().await;
|
|
|
|
|
@@ -3509,14 +3581,14 @@ impl Timeline {
|
|
|
|
|
false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tracing::instrument(skip_all, fields(%lsn, %force))]
|
|
|
|
|
#[tracing::instrument(skip_all, fields(lsn=%partitioning.lsn, %force))]
|
|
|
|
|
async fn create_image_layers(
|
|
|
|
|
self: &Arc<Timeline>,
|
|
|
|
|
partitioning: &KeyPartitioning,
|
|
|
|
|
lsn: Lsn,
|
|
|
|
|
partitioning: &CompactionKeyspacePartitioning,
|
|
|
|
|
force: bool,
|
|
|
|
|
ctx: &RequestContext,
|
|
|
|
|
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
|
|
|
|
|
// REVIEW: should we not even bump this timer if we're taking short exit?
|
|
|
|
|
let timer = self.metrics.create_images_time_histo.start_timer();
|
|
|
|
|
let mut image_layers = Vec::new();
|
|
|
|
|
|
|
|
|
|
@@ -3531,9 +3603,30 @@ impl Timeline {
|
|
|
|
|
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
|
|
|
|
|
let mut start = Key::MIN;
|
|
|
|
|
|
|
|
|
|
for partition in partitioning.parts.iter() {
|
|
|
|
|
let img_range = start..partition.ranges.last().unwrap().end;
|
|
|
|
|
if !force && !self.time_for_new_image_layer(partition, lsn).await {
|
|
|
|
|
let new_inputs = {
|
|
|
|
|
let layer_manager = self.layers.read().await;
|
|
|
|
|
let layer_map = layer_manager.layer_map();
|
|
|
|
|
CreateImageLayersParams {
|
|
|
|
|
layer_map_version: layer_map.get_rebuild_version(),
|
|
|
|
|
partitioning_version: partitioning.version,
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
let last_inputs = {
|
|
|
|
|
// bail out early if nothing changed
|
|
|
|
|
let last = self.create_image_layers_skipper_state.lock().unwrap();
|
|
|
|
|
match (force, &*last, &new_inputs) {
|
|
|
|
|
(false, Some(last), new_inputs) if last == new_inputs => {
|
|
|
|
|
debug!(?last, ?new_inputs, "inputs to image layer creation algorithm have not changed since last invocation");
|
|
|
|
|
return Ok(image_layers);
|
|
|
|
|
}
|
|
|
|
|
_ => (), // we'll update .last once we finish with success
|
|
|
|
|
}
|
|
|
|
|
*last
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
for partition in partitioning.iter() {
|
|
|
|
|
let img_range = start..partition.key_space.ranges.last().unwrap().end;
|
|
|
|
|
if !force && !self.time_for_new_image_layer(&partition).await {
|
|
|
|
|
start = img_range.end;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
@@ -3543,7 +3636,7 @@ impl Timeline {
|
|
|
|
|
self.timeline_id,
|
|
|
|
|
self.tenant_shard_id,
|
|
|
|
|
&img_range,
|
|
|
|
|
lsn,
|
|
|
|
|
partitioning.lsn,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
@@ -3556,7 +3649,7 @@ impl Timeline {
|
|
|
|
|
let mut wrote_keys = false;
|
|
|
|
|
|
|
|
|
|
let mut key_request_accum = KeySpaceAccum::new();
|
|
|
|
|
for range in &partition.ranges {
|
|
|
|
|
for range in &partition.key_space.ranges {
|
|
|
|
|
let mut key = range.start;
|
|
|
|
|
while key < range.end {
|
|
|
|
|
// Decide whether to retain this key: usually we do, but sharded tenants may
|
|
|
|
|
@@ -3580,7 +3673,11 @@ impl Timeline {
|
|
|
|
|
|| last_key_in_range
|
|
|
|
|
{
|
|
|
|
|
let results = self
|
|
|
|
|
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
|
|
|
|
|
.get_vectored(
|
|
|
|
|
key_request_accum.consume_keyspace(),
|
|
|
|
|
partitioning.lsn,
|
|
|
|
|
ctx,
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
for (img_key, img) in results {
|
|
|
|
|
@@ -3669,6 +3766,25 @@ impl Timeline {
|
|
|
|
|
.context("fsync of timeline dir")?;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Remember the inputs so that we take the early exit next compaction iteration
|
|
|
|
|
// if nothing changed in the meantime.
|
|
|
|
|
// NB: since we created `new_inputs`, the layer map might have changed and hence
|
|
|
|
|
// `get_rebuild_version()` might have advanced already, e.g., by a
|
|
|
|
|
// concurrent garbage collection iteration that removed layers. That's ok, next
|
|
|
|
|
// `new_inputs` will observe an increased `get_rebuild_version()` and run again.
|
|
|
|
|
// Same goes for the modifications that we're doing below: if `image_layers` is
|
|
|
|
|
// not empty, we'll insert them into the layer map in the code below, which
|
|
|
|
|
// will bump `get_rebuild_version()`, which will make us re-run once more.
|
|
|
|
|
//
|
|
|
|
|
{
|
|
|
|
|
let mut state = self.create_image_layers_skipper_state.lock().unwrap();
|
|
|
|
|
if *state != last_inputs {
|
|
|
|
|
warn!(?last_inputs, ?new_inputs, observed=?*state, "unexpected: create_image_layers called concurrently? not caching inputs");
|
|
|
|
|
} else {
|
|
|
|
|
*state = Some(new_inputs);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut guard = self.layers.write().await;
|
|
|
|
|
|
|
|
|
|
// FIXME: we could add the images to be uploaded *before* returning from here, but right
|
|
|
|
|
|