diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 0586ec38c8..f09617849c 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -292,7 +292,7 @@ pub struct Timeline {
     pub initdb_lsn: Lsn,
 
     /// When did we last calculate the partitioning?
-    partitioning: Mutex<(KeyPartitioning, Lsn)>,
+    partitioning: tokio::sync::Mutex<(KeyPartitioning, Lsn)>,
 
     /// Configuration: how often should the partitioning be recalculated.
     repartition_threshold: u64,
@@ -1640,7 +1640,7 @@ impl Timeline {
                 // initial logical size is 0.
                 LogicalSize::empty_initial()
             },
-            partitioning: Mutex::new((KeyPartitioning::new(), Lsn(0))),
+            partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
             repartition_threshold: 0,
 
             last_received_wal: Mutex::new(None),
@@ -3354,30 +3354,34 @@ impl Timeline {
         flags: EnumSet<CompactFlags>,
         ctx: &RequestContext,
     ) -> anyhow::Result<(KeyPartitioning, Lsn)> {
-        {
-            let partitioning_guard = self.partitioning.lock().unwrap();
-            let distance = lsn.0 - partitioning_guard.1 .0;
-            if partitioning_guard.1 != Lsn(0)
-                && distance <= self.repartition_threshold
-                && !flags.contains(CompactFlags::ForceRepartition)
-            {
-                debug!(
-                    distance,
-                    threshold = self.repartition_threshold,
-                    "no repartitioning needed"
-                );
-                return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
-            }
+        let Ok(mut partitioning_guard) = self.partitioning.try_lock() else {
+            // NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
+            // The other is the initdb optimization in flush_frozen_layer, used by `bootstrap_timeline`, which runs before `.activate()`
+            // and hence before the compaction task starts.
+            anyhow::bail!("repartition() called concurrently, this should not happen");
+        };
+        if lsn < partitioning_guard.1 {
+            anyhow::bail!("repartition() called with LSN going backwards, this should not happen");
         }
+
+        let distance = lsn.0 - partitioning_guard.1 .0;
+        if partitioning_guard.1 != Lsn(0)
+            && distance <= self.repartition_threshold
+            && !flags.contains(CompactFlags::ForceRepartition)
+        {
+            debug!(
+                distance,
+                threshold = self.repartition_threshold,
+                "no repartitioning needed"
+            );
+            return Ok((partitioning_guard.0.clone(), partitioning_guard.1));
+        }
+
         let keyspace = self.collect_keyspace(lsn, ctx).await?;
         let partitioning = keyspace.partition(partition_size);
-        let mut partitioning_guard = self.partitioning.lock().unwrap();
-        if lsn > partitioning_guard.1 {
-            *partitioning_guard = (partitioning, lsn);
-        } else {
-            warn!("Concurrent repartitioning of keyspace. This unexpected, but probably harmless");
-        }
+        *partitioning_guard = (partitioning, lsn);
+
         Ok((partitioning_guard.0.clone(), partitioning_guard.1))