Compare commits

...

3 Commits

Author       SHA1         Message                         Date
Alex Chi Z   84ab5e4101   fix clippy warnings             2023-08-04 15:19:41 -04:00
                          Signed-off-by: Alex Chi Z <chi@neon.tech>
Alex Chi Z   607f341e24   more assertions                 2023-08-04 14:55:25 -04:00
                          Signed-off-by: Alex Chi Z <chi@neon.tech>
Alex Chi Z   2275b74a75   use read lock for compaction    2023-08-04 14:50:55 -04:00
                          Signed-off-by: Alex Chi Z <chi@neon.tech>
2 changed files with 90 additions and 32 deletions

View File

@@ -3780,7 +3780,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
}
-let mut guard = self.layers.write().await;
+let guard = self.layers.modify().await;
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
// In some rare cases, we may generate a file with exactly the same key range / LSN as before the compaction.
@@ -3844,7 +3844,7 @@ impl Timeline {
}
guard
-.finish_compact_l0(
+.finish_compact_l0_consume_guard(
layer_removal_cs,
remove_layers,
insert_layers,
@@ -3852,8 +3852,6 @@ impl Timeline {
)
.await?;
-drop_wlock(guard);
// Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
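Taken together, the hunks above change the call site so that compaction takes a read-style guard and the finishing call consumes it, which is why the explicit `drop_wlock(guard)` disappears. A minimal sketch of that shape, with toy `Layers`/`LayerGuard` types invented for illustration (not the pageserver API) and `tokio` assumed as a dependency:

    // Sketch of the new call-site shape in `compact`: the guard is taken once and
    // then consumed by the finishing call, so there is nothing left to unlock.
    // Toy types only; `Layers`, `LayerGuard` and `finish_compact` are invented here.
    use std::sync::Arc;

    use tokio::sync::{OwnedRwLockReadGuard, RwLock};

    struct Layers {
        pseudo_lock: Arc<RwLock<()>>,
    }

    struct LayerGuard {
        // Held only so readers of the current layer map stay accounted for.
        _shared: OwnedRwLockReadGuard<()>,
    }

    impl Layers {
        /// Shared lock, in the spirit of `modify()`: reads proceed during compaction.
        async fn modify(&self) -> LayerGuard {
            LayerGuard {
                _shared: self.pseudo_lock.clone().read_owned().await,
            }
        }
    }

    impl LayerGuard {
        /// In the spirit of `finish_compact_l0_consume_guard`: takes `self` by value,
        /// so the guard cannot be reused (or forgotten) after the layer map is updated.
        async fn finish_compact(self, new_layers: Vec<String>, old_layers: Vec<String>) {
            // ... swap in the new layer map, barrier, delete old files ...
            let _ = (new_layers, old_layers);
        }
    }

    async fn compact(layers: &Layers) {
        // before: `let mut guard = self.layers.write().await;`
        let guard = layers.modify().await;
        let new_layers = vec!["new-image-layer".to_string()];
        let old_layers = vec!["old-delta-1".to_string(), "old-delta-2".to_string()];
        guard.finish_compact(new_layers, old_layers).await;
        // before: `drop_wlock(guard);` -- now the guard was already consumed above
    }

    #[tokio::main]
    async fn main() {
        let layers = Layers {
            pseudo_lock: Arc::new(RwLock::new(())),
        };
        compact(&layers).await;
    }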

View File

@@ -1,8 +1,9 @@
use anyhow::{bail, ensure, Context, Result};
use arc_swap::ArcSwap;
use dashmap::DashMap;
+use either::Either;
use std::sync::Arc;
-use tracing::trace;
+use tracing::{log::warn, trace};
use utils::{
id::{TenantId, TimelineId},
lsn::{AtomicLsn, Lsn},
@@ -55,7 +56,8 @@ pub struct LayerManagerWriteGuard {
layer_manager: Arc<LayerManager>,
/// Mock the behavior of the layer map lock.
#[allow(dead_code)]
-pseudo_lock: tokio::sync::OwnedRwLockWriteGuard<()>,
+pseudo_lock:
+Either<tokio::sync::OwnedRwLockWriteGuard<()>, tokio::sync::OwnedRwLockReadGuard<()>>,
}
impl LayerManager {
@@ -92,7 +94,22 @@ impl LayerManager {
layer_map: self.layer_map.load_full(),
layer_fmgr: Arc::clone(&self.layer_fmgr),
},
-pseudo_lock,
+pseudo_lock: Either::Left(pseudo_lock),
layer_manager: self.clone(),
}
}
+/// Take the snapshot of the layer map and return a write guard. With the `modify` call, the guard
+/// will only hold a read lock instead of write lock.
+pub async fn modify(self: &Arc<Self>) -> LayerManagerWriteGuard {
+// take the lock before taking snapshot
+let pseudo_lock = self.pseudo_lock.clone().read_owned().await;
+LayerManagerWriteGuard {
+snapshot: LayerSnapshot {
+layer_map: self.layer_map.load_full(),
+layer_fmgr: Arc::clone(&self.layer_fmgr),
+},
+pseudo_lock: Either::Right(pseudo_lock),
+layer_manager: self.clone(),
+}
+}
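
The guard now records which flavor of the pseudo lock it holds, and later hunks assert on it. A small self-contained sketch of the same pattern, with made-up `Manager`/`PseudoGuard` names and the `tokio` and `either` crates assumed:

    // Sketch of the dual guard: either an owned write lock or an owned read lock
    // on a shared "pseudo lock", as in the Either<...> field above.
    use std::sync::Arc;

    use either::Either;
    use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};

    struct Manager {
        pseudo_lock: Arc<RwLock<()>>,
    }

    struct PseudoGuard {
        // Kept purely for its locking effect, like the #[allow(dead_code)] field above.
        pseudo_lock: Either<OwnedRwLockWriteGuard<()>, OwnedRwLockReadGuard<()>>,
    }

    impl Manager {
        /// Exclusive guard: the counterpart of `write()`.
        async fn write(&self) -> PseudoGuard {
            PseudoGuard {
                pseudo_lock: Either::Left(self.pseudo_lock.clone().write_owned().await),
            }
        }

        /// Shared guard: the counterpart of `modify()`; other readers keep going.
        async fn modify(&self) -> PseudoGuard {
            PseudoGuard {
                pseudo_lock: Either::Right(self.pseudo_lock.clone().read_owned().await),
            }
        }
    }

    impl PseudoGuard {
        /// The shape of the `is_left()` assertions added in the hunks below.
        fn assert_exclusive(&self) {
            assert!(self.pseudo_lock.is_left(), "should use `write` guard here");
        }
    }

    #[tokio::main]
    async fn main() {
        let mgr = Manager {
            pseudo_lock: Arc::new(RwLock::new(())),
        };
        let exclusive = mgr.write().await;
        exclusive.assert_exclusive();
        drop(exclusive);
        let _shared = mgr.modify().await; // what compaction now takes
    }
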
@@ -108,7 +125,7 @@ impl LayerManager {
layer_map: self.layer_map.load_full(),
layer_fmgr: Arc::clone(&self.layer_fmgr),
},
-pseudo_lock,
+pseudo_lock: Either::Left(pseudo_lock),
layer_manager: self.clone(),
})
}
@@ -207,6 +224,11 @@ impl LayerManagerWriteGuard {
on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
next_open_layer_at: Lsn,
) {
+assert!(
+self.pseudo_lock.is_left(),
+"should use `write` guard for this function."
+);
self.layer_manager
.initialize_update(|mut layer_map| {
let mut updates = layer_map.batch_update();
@@ -235,6 +257,11 @@ impl LayerManagerWriteGuard {
corrupted_local_layers: Vec<Arc<dyn PersistentLayer>>,
remote_layers: Vec<Arc<RemoteLayer>>,
) {
+assert!(
+self.pseudo_lock.is_left(),
+"should use `write` guard for this function."
+);
self.layer_manager
.initialize_update(|mut layer_map| {
let mut updates = layer_map.batch_update();
@@ -261,6 +288,11 @@ impl LayerManagerWriteGuard {
timeline_id: TimelineId,
tenant_id: TenantId,
) -> Result<Arc<InMemoryLayer>> {
+assert!(
+self.pseudo_lock.is_left(),
+"should use `write` guard for this function."
+);
ensure!(lsn.is_aligned());
ensure!(
@@ -317,6 +349,11 @@ impl LayerManagerWriteGuard {
Lsn(last_record_lsn): Lsn,
last_freeze_at: &AtomicLsn,
) {
+assert!(
+self.pseudo_lock.is_left(),
+"should use `write` guard for this function."
+);
let end_lsn = Lsn(last_record_lsn + 1);
if let Some(open_layer) = &self.snapshot.layer_map.open_layer {
@@ -342,6 +379,10 @@ impl LayerManagerWriteGuard {
/// Add image layers to the layer map, called from `create_image_layers`.
pub(crate) async fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
+assert!(
+self.pseudo_lock.is_left(),
+"should use `write` guard for this function."
+);
self.layer_manager
.update(|mut layer_map| {
let mut updates: BatchedUpdates<'_> = layer_map.batch_update();
@@ -365,6 +406,10 @@ impl LayerManagerWriteGuard {
delta_layer: Option<DeltaLayer>,
frozen_layer_for_check: &Arc<InMemoryLayer>,
) {
+assert!(
+self.pseudo_lock.is_left(),
+"should use `write` guard for this function."
+);
self.layer_manager
.update(|mut layer_map| {
let l = layer_map.frozen_layers.pop_front();
@@ -390,42 +435,53 @@ impl LayerManagerWriteGuard {
}
/// Called when compaction is completed.
-pub(crate) async fn finish_compact_l0(
-&mut self,
-layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
+pub(crate) async fn finish_compact_l0_consume_guard(
+self,
+_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: Vec<Arc<dyn PersistentLayer>>,
compact_to: Vec<Arc<dyn PersistentLayer>>,
metrics: &TimelineMetrics,
) -> Result<()> {
-self.layer_manager
+assert!(
+self.pseudo_lock.is_right(),
+"should use `modify` guard for this function."
+);
+let compact_from = self
+.layer_manager
.update(|mut layer_map| {
let mut updates = layer_map.batch_update();
for l in compact_to {
Self::insert_historic_layer(l, &mut updates, &self.snapshot.layer_fmgr);
}
-for l in compact_from {
-// NB: the layer file identified by descriptor `l` is guaranteed to be present
-// in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
-// time, even though we dropped `Timeline::layers` inbetween.
-if let Err(e) = Self::delete_historic_layer(
-layer_removal_cs.clone(),
-l,
-&mut updates,
-metrics,
-&self.snapshot.layer_fmgr,
-) {
-// If this fails, we will need to return the "partially" modified layer map
-// now. Eventually, we should decouple file deletion and layer map updates, so
-// that this part can be moved out of the `update` section.
-updates.flush();
-return Ok((layer_map, Err(e)));
-}
+for l in &compact_from {
+// only remove from the layer map, not from file manager
+updates.remove_historic(l.layer_desc().clone());
}
updates.flush();
-Ok((layer_map, Ok(())))
+Ok((layer_map, Ok::<_, anyhow::Error>(compact_from)))
})
-.await
-.unwrap() // unwrap the first level error, which is always Ok.
+.await??;
+drop(self.pseudo_lock);
+// acquire the write lock so that all read threads are blocked, and once this lock is acquired,
+// new reads will be based on the updated layer map.
+let guard = self.layer_manager.pseudo_lock.write().await;
+drop(guard);
+// now that no one has access to the old layer map, we can safely remove the layers from disk.
+for layer in compact_from {
+self.snapshot.layer_fmgr.remove(layer.clone());
+// NB: the layer file identified by descriptor `l` is guaranteed to be present
+// in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
+// time, even though we dropped `Timeline::layers` inbetween.
+if !layer.is_remote_layer() {
+if let Err(e) = layer.delete_resident_layer_file() {
+warn!("Failed to delete resident layer file: {}", e);
+} else {
+let layer_file_size = layer.file_size();
+metrics.resident_physical_size_gauge.sub(layer_file_size);
+}
+}
+}
+Ok(())
}
/// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map.
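
The comments in this function carry the core safety argument: publish the new layer map while holding only the shared pseudo lock, take and immediately drop the write lock as a barrier so every reader that might still reference the old layers has finished, and only then delete the files. A rough self-contained sketch of that ordering, with toy types (not the pageserver code) and the `tokio` and `arc-swap` crates assumed:

    // Sketch of the "publish, barrier, then delete" ordering from the function above.
    use std::{collections::HashSet, sync::Arc};

    use arc_swap::ArcSwap;
    use tokio::sync::RwLock;

    #[derive(Default)]
    struct LayerMap {
        layers: HashSet<String>, // stand-in for layer descriptors
    }

    struct Manager {
        layer_map: ArcSwap<LayerMap>,
        pseudo_lock: Arc<RwLock<()>>,
    }

    impl Manager {
        /// Readers snapshot the map under the shared lock and keep the lock for as
        /// long as they might touch the files the snapshot points at.
        async fn with_snapshot<R>(&self, f: impl FnOnce(&LayerMap) -> R) -> R {
            let _shared = self.pseudo_lock.read().await;
            let snapshot = self.layer_map.load_full();
            f(&snapshot)
        }

        /// Compaction path: swap the map while holding only the shared lock, then
        /// use an acquire-and-drop of the write lock as a barrier before deleting.
        async fn finish_compaction(&self, keep: HashSet<String>, delete: Vec<String>) {
            let shared = self.pseudo_lock.read().await;
            // 1. Publish the updated snapshot; new readers see it from now on.
            self.layer_map.store(Arc::new(LayerMap { layers: keep }));
            drop(shared);
            // 2. Barrier: the write lock is only granted once every reader that took
            //    the shared lock against the *old* snapshot has released it.
            drop(self.pseudo_lock.write().await);
            // 3. Only now is it safe to remove the files backing the dropped layers.
            for name in delete {
                println!("deleting layer file {name}"); // e.g. std::fs::remove_file
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let mgr = Manager {
            layer_map: ArcSwap::from_pointee(LayerMap::default()),
            pseudo_lock: Arc::new(RwLock::new(())),
        };
        let keep: HashSet<String> = HashSet::from(["new-image".to_string()]);
        mgr.finish_compaction(keep, vec!["old-delta".to_string()]).await;
        assert!(mgr.with_snapshot(|m| m.layers.contains("new-image")).await);
    }
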
@@ -435,6 +491,10 @@ impl LayerManagerWriteGuard {
gc_layers: Vec<Arc<dyn PersistentLayer>>,
metrics: &TimelineMetrics,
) -> Result<()> {
+assert!(
+self.pseudo_lock.is_left(),
+"should use `write` guard for this function."
+);
self.layer_manager
.update(|mut layer_map| {
let mut updates = layer_map.batch_update();