Compare commits

...

7 Commits

Author SHA1 Message Date
Christian Schwarz
98d0bca8d1 Merge branch 'problame/repartition-bail-on-concurrent-call' into problame/avoid-count-deltas-if-no-changes 2024-02-21 18:05:29 +00:00
Christian Schwarz
bafa6d37d9 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 18:05:28 +00:00
Christian Schwarz
ad23c945df WIP 2024-02-21 18:04:04 +00:00
Christian Schwarz
4536caa253 LayerMap: keep track of rebuilds 2024-02-21 17:02:39 +00:00
Christian Schwarz
d7920a2a57 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 16:01:29 +00:00
Christian Schwarz
f990e07581 Merge remote-tracking branch 'origin/main' into problame/repartition-bail-on-concurrent-call 2024-02-21 15:56:07 +00:00
Christian Schwarz
1af2f3caf1 Timeline::repartition: enforce no concurrent callers & that lsn never moves backwards
This PR enforces two aspects of `Timeline::repartition` that were already
true at runtime (both guards are sketched in the code below the list):

- it's not called concurrently, so bail out if it is anyway (see the
  code comment for why it's not called concurrently)
- the `lsn` must never move backwards over the lifetime of a
  Timeline object, because `last_record_lsn()` can only move forwards
  over that lifetime
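A minimal, self-contained sketch of the two guards (the types here are
hypothetical simplifications: `Lsn` is a bare wrapper, and `std::sync::Mutex`
stands in for the `tokio::sync::Mutex` the real code uses):

use std::sync::Mutex;

// Hypothetical stand-in for utils::lsn::Lsn.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct Lsn(u64);

struct Timeline {
    partitioning: Mutex<Lsn>,
}

impl Timeline {
    fn repartition(&self, lsn: Lsn) -> Result<(), String> {
        // Guard 1: never wait for the lock; a contended lock means a
        // concurrent caller, which should not happen.
        let Ok(mut guard) = self.partitioning.try_lock() else {
            return Err("repartition() called concurrently".to_string());
        };
        // Guard 2: the partitioning LSN is monotonic over the Timeline's lifetime.
        if lsn < *guard {
            return Err("repartition() called with LSN going backwards".to_string());
        }
        *guard = lsn;
        Ok(())
    }
}

fn main() {
    let t = Timeline { partitioning: Mutex::new(Lsn(0)) };
    assert!(t.repartition(Lsn(10)).is_ok());
    assert!(t.repartition(Lsn(5)).is_err()); // backwards => error
}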

part of #6861
2024-02-21 15:52:28 +00:00
3 changed files with 199 additions and 43 deletions

View File

@@ -61,6 +61,8 @@ use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::LayerKey;
pub(crate) use self::historic_layer_coverage::RebuildVersion;
use super::storage_layer::PersistentLayerDesc;
///
@@ -500,7 +502,7 @@ impl LayerMap {
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
pub(self) fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(layer_desc));
let layer_key = layer_desc.key();
@@ -525,6 +527,10 @@ impl LayerMap {
self.historic.rebuild();
}
pub fn get_rebuild_version(&self) -> RebuildVersion {
self.historic.get_rebuild_version()
}
/// Is there a newer image layer for given key- and LSN-range? Or a set
/// of image layers within the specified lsn range that cover the entire
/// specified key range?

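The `get_rebuild_version` accessor added above gives callers a cheap way to
detect whether the layer map was rebuilt between two observations: equal
versions mean no rebuild happened in between. A minimal sketch of that
pattern, with simplified stand-in types rather than the real pageserver ones:

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct LayerMap {
    rebuild_version: u64,
}

impl LayerMap {
    fn rebuild(&mut self) {
        // Every rebuild bumps the version, so two equal observed versions
        // imply the map was not rebuilt in between.
        self.rebuild_version = self
            .rebuild_version
            .checked_add(1)
            .expect("version overflow");
    }

    fn get_rebuild_version(&self) -> u64 {
        self.rebuild_version
    }
}

fn main() {
    let mut map = LayerMap { rebuild_version: 1 };
    let seen = map.get_rebuild_version();
    map.rebuild();
    // A caller that cached `seen` can now tell that the map changed.
    assert_ne!(seen, map.get_rebuild_version());
}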
View File

@@ -413,6 +413,8 @@ fn test_persistent_overlapping() {
/// See this for more on persistent and retroactive techniques:
/// <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
pub struct BufferedHistoricLayerCoverage<Value> {
rebuild_version: RebuildVersion,
/// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>,
@@ -438,9 +440,33 @@ impl<T: Clone> Default for BufferedHistoricLayerCoverage<T> {
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RebuildVersion(u64);
impl RebuildVersion {
fn inc(&mut self) {
self.0 = self
.0
.checked_add(1)
.expect("at current clock cycles, we won't hit this");
}
}
impl Default for RebuildVersion {
fn default() -> Self {
RebuildVersion(1)
}
}
impl std::fmt::Display for RebuildVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
pub fn new() -> Self {
Self {
rebuild_version: RebuildVersion::default(),
historic_coverage: HistoricLayerCoverage::<Value>::new(),
buffer: BTreeMap::new(),
layers: BTreeMap::new(),
@@ -462,6 +488,8 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
None => return, // No need to rebuild if buffer is empty
};
self.rebuild_version.inc();
// Apply buffered updates to self.layers
let num_updates = self.buffer.len();
self.buffer.retain(|layer_key, layer| {
@@ -493,11 +521,17 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
// TODO maybe only warn if ratio is at least 10
info!(
version = %self.rebuild_version,
"Rebuilt layer map. Did {} insertions to process a batch of {} updates.",
num_inserted, num_updates,
num_inserted,
num_updates,
)
}
pub fn get_rebuild_version(&self) -> RebuildVersion {
self.rebuild_version
}
/// Iterate all the layers
pub fn iter(&self) -> impl '_ + Iterator<Item = Value> {
// NOTE we can actually perform this without rebuilding,

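Note that the version is bumped only when `rebuild` actually applies buffered
updates; the empty-buffer early return leaves it untouched, which is what lets
consumers treat version equality as a "nothing changed" signal. A minimal
sketch of that flow, with a plain `Vec` buffer standing in for the real
coverage structures:

use std::collections::BTreeMap;

struct Buffered {
    rebuild_version: u64,
    buffer: Vec<(u64, &'static str)>,
    layers: BTreeMap<u64, &'static str>,
}

impl Buffered {
    fn insert(&mut self, key: u64, value: &'static str) {
        // Cheap path: updates only land in the buffer, no version bump yet.
        self.buffer.push((key, value));
    }

    fn rebuild(&mut self) {
        if self.buffer.is_empty() {
            return; // nothing to apply; observers keep seeing the same version
        }
        self.rebuild_version = self.rebuild_version.checked_add(1).expect("version overflow");
        for (key, value) in self.buffer.drain(..) {
            self.layers.insert(key, value);
        }
    }
}

fn main() {
    let mut c = Buffered {
        rebuild_version: 1,
        buffer: Vec::new(),
        layers: BTreeMap::new(),
    };
    c.rebuild();
    assert_eq!(c.rebuild_version, 1); // empty buffer: no bump
    c.insert(7, "layer");
    c.rebuild();
    assert_eq!(c.rebuild_version, 2); // one batch of updates applied
}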
View File

@@ -111,10 +111,10 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{config::TenantConf, storage_layer::ReadableLayerDesc};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{layer_map, remote_timeline_client::RemoteTimelineClient};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -210,6 +210,10 @@ pub struct Timeline {
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
/// State that [`Self::create_image_layers`] keeps across invocations to determine if it can
/// skip an invocation because nothing changed.
create_image_layers_skipper_state: std::sync::Mutex<Option<CreateImageLayersParams>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
/// It is used by compaction task when it checks if new image layer should be created.
@@ -302,8 +306,9 @@ pub struct Timeline {
// though let's keep them both for better error visibility.
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning?
partitioning: Mutex<(KeyPartitioning, Lsn)>,
/// Used by [`Timeline::repartition`] to avoid re-computing partitioning on every iteration.
/// Must bump [`CompactionKeyspacePartitioning::version`] when changing.
partitioning: tokio::sync::Mutex<CompactionKeyspacePartitioning>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -400,6 +405,57 @@ pub struct GcInfo {
pub pitr_cutoff: Lsn,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct CreateImageLayersParams {
/// From [`layer_map::LayerMap::get_rebuild_version`].
layer_map_version: layer_map::RebuildVersion,
/// From [`Timeline::partitioning`].
partitioning_version: CompactionKeyspacePartitioningVersion,
}
/// The version of the keyspace partitioning maintained in [`Timeline::partitioning`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct CompactionKeyspacePartitioningVersion(u64);
impl CompactionKeyspacePartitioningVersion {
fn inc(&mut self) {
self.0 = self
.0
.checked_add(1)
.expect("at current clock cycles, we won't hit this");
}
}
#[derive(Clone)]
struct CompactionKeyspacePartitioning {
version: CompactionKeyspacePartitioningVersion,
partitioning: KeyPartitioning,
lsn: Lsn,
}
struct CompactionKeyspacePartition<'a> {
key_space: &'a KeySpace,
lsn: Lsn,
}
impl CompactionKeyspacePartitioning {
pub fn update(&mut self, partitioning: KeyPartitioning, lsn: Lsn) {
assert!(self.lsn <= lsn);
self.version.inc();
self.partitioning = partitioning;
self.lsn = lsn;
}
pub fn iter(&self) -> impl Iterator<Item = CompactionKeyspacePartition<'_>> {
self.partitioning
.parts
.iter()
.map(|key_space| CompactionKeyspacePartition {
key_space,
lsn: self.lsn,
})
}
}
/// An error happened in a get() operation.
#[derive(thiserror::Error, Debug)]
pub(crate) enum PageReconstructError {
@@ -1154,7 +1210,7 @@ impl Timeline {
)
.await
{
Ok((partitioning, lsn)) => {
Ok(partitioning) => {
// Disables access_stats updates, so that the files we read remain candidates for eviction after we're done with them
let image_ctx = RequestContextBuilder::extend(ctx)
.access_stats_behavior(AccessStatsBehavior::Skip)
@@ -1168,7 +1224,7 @@ impl Timeline {
// 3. Create new image layers for partitions that have been modified
// "enough".
let layers = self
.create_image_layers(&partitioning, lsn, false, &image_ctx)
.create_image_layers(&partitioning, false, &image_ctx)
.await
.map_err(anyhow::Error::from)?;
if let Some(remote_client) = &self.remote_client {
@@ -1659,8 +1715,15 @@ impl Timeline {
// initial logical size is 0.
LogicalSize::empty_initial()
},
partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
partitioning: tokio::sync::Mutex::new(CompactionKeyspacePartitioning {
// The other fields don't matter because this is 0, and hence repartition()
// will calculate a new one.
lsn: Lsn(0),
version: CompactionKeyspacePartitioningVersion(0),
partitioning: KeyPartitioning::new(),
}),
repartition_threshold: 0,
create_image_layers_skipper_state: std::sync::Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
@@ -3151,7 +3214,7 @@ impl Timeline {
}
// Note: The 'ctx' in use here has DownloadBehavior::Error. We should not
// require downloading anything during initial import.
let (partitioning, _lsn) = self
let partitioning = self
.repartition(
self.initdb_lsn,
self.get_compaction_target_size(),
@@ -3166,8 +3229,7 @@ impl Timeline {
// For image layers, we add them immediately into the layer map.
(
self.create_image_layers(&partitioning, self.initdb_lsn, true, ctx)
.await?,
self.create_image_layers(&partitioning, true, ctx).await?,
None,
)
} else {
@@ -3405,36 +3467,46 @@ impl Timeline {
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> anyhow::Result<(KeyPartitioning, Lsn)> {
{
let partitioning_guard = self.partitioning.lock().unwrap();
let distance = lsn.0 - partitioning_guard.1 .0;
if partitioning_guard.1 != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
debug!(
distance,
threshold = self.repartition_threshold,
"no repartitioning needed"
);
return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
}
) -> anyhow::Result<CompactionKeyspacePartitioning> {
let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
// NB: there are two callers: one is the compaction task, of which there is only one per struct Tenant and hence per Timeline.
// The other is the initdb optimization in flush_frozen_layer, used by `bootstrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
anyhow::bail!("repartition() called concurrently, this should not happen");
};
if lsn < partitioning_guard.lsn {
anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
}
let distance = lsn.0 - partitioning_guard.lsn.0;
if partitioning_guard.lsn != Lsn(0)
&& distance <= self.repartition_threshold
&& !flags.contains(CompactFlags::ForceRepartition)
{
debug!(
distance,
threshold = self.repartition_threshold,
"no repartitioning needed"
);
return Ok(partitioning_guard.clone());
}
let keyspace = self.collect_keyspace(lsn, ctx).await?;
let partitioning = keyspace.partition(partition_size);
let mut partitioning_guard = self.partitioning.lock().unwrap();
if lsn > partitioning_guard.1 {
*partitioning_guard = (partitioning, lsn);
} else {
warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
}
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
partitioning_guard.update(partitioning, lsn);
Ok(partitioning_guard.clone())
}
// Is it time to create a new image layer for the given partition?
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
async fn time_for_new_image_layer(
&self,
&CompactionKeyspacePartition {
key_space: partition,
lsn,
}: &CompactionKeyspacePartition<'_>,
) -> bool {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read().await;
@@ -3509,14 +3581,14 @@ impl Timeline {
false
}
#[tracing::instrument(skip_all, fields(%lsn, %force))]
#[tracing::instrument(skip_all, fields(lsn=%partitioning.lsn, %force))]
async fn create_image_layers(
self: &Arc<Timeline>,
partitioning: &KeyPartitioning,
lsn: Lsn,
partitioning: &CompactionKeyspacePartitioning,
force: bool,
ctx: &RequestContext,
) -> Result<Vec<ResidentLayer>, CreateImageLayersError> {
// REVIEW: should we not even bump this timer if we're taking the short exit?
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers = Vec::new();
@@ -3531,9 +3603,30 @@ impl Timeline {
// image layers <100000000..100000099> and <200000000..200000199> are not completely covering it.
let mut start = Key::MIN;
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
if !force && !self.time_for_new_image_layer(partition, lsn).await {
let new_inputs = {
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
CreateImageLayersParams {
layer_map_version: layer_map.get_rebuild_version(),
partitioning_version: partitioning.version,
}
};
let last_inputs = {
// bail out early if nothing changed
let last = self.create_image_layers_skipper_state.lock().unwrap();
match (force, &*last, &new_inputs) {
(false, Some(last), new_inputs) if last == new_inputs => {
debug!(?last, ?new_inputs, "inputs to image layer creation algorithm have not changed since last invocation");
return Ok(image_layers);
}
_ => (), // we'll update .last once we finish with success
}
*last
};
for partition in partitioning.iter() {
let img_range = start..partition.key_space.ranges.last().unwrap().end;
if !force && !self.time_for_new_image_layer(&partition).await {
start = img_range.end;
continue;
}
@@ -3543,7 +3636,7 @@ impl Timeline {
self.timeline_id,
self.tenant_shard_id,
&img_range,
lsn,
partitioning.lsn,
)
.await?;
@@ -3556,7 +3649,7 @@ impl Timeline {
let mut wrote_keys = false;
let mut key_request_accum = KeySpaceAccum::new();
for range in &partition.ranges {
for range in &partition.key_space.ranges {
let mut key = range.start;
while key < range.end {
// Decide whether to retain this key: usually we do, but sharded tenants may
@@ -3580,7 +3673,11 @@ impl Timeline {
|| last_key_in_range
{
let results = self
.get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
.get_vectored(
key_request_accum.consume_keyspace(),
partitioning.lsn,
ctx,
)
.await?;
for (img_key, img) in results {
@@ -3669,6 +3766,25 @@ impl Timeline {
.context("fsync of timeline dir")?;
}
// Remember the inputs so that we take the early exit in the next compaction iteration
// if nothing changed in the meantime.
// NB: since we created `new_inputs`, the layer map might have changed and hence
// `get_rebuild_version()` might have advanced already, e.g., by a
// concurrent garbage collection iteration that removed layers. That's OK: the next
// `new_inputs` will observe an increased `get_rebuild_version()` and run again.
// Same goes for the modifications that we're doing below: if `image_layers` is
// not empty, we'll insert them into the layer map in the code below, which
// will bump `get_rebuild_version()`, which will make us re-run once more.
//
{
let mut state = self.create_image_layers_skipper_state.lock().unwrap();
if *state != last_inputs {
warn!(?last_inputs, ?new_inputs, observed=?*state, "unexpected: create_image_layers called concurrently? not caching inputs");
} else {
*state = Some(new_inputs);
}
}
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right