pgserver: move mapping logic to layer cache

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2023-06-14 14:58:50 -04:00
parent fc190a2a19
commit a2056666ae
5 changed files with 204 additions and 161 deletions

View File

@@ -85,6 +85,7 @@ pub mod blob_io;
pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_cache;
pub mod layer_map;
pub mod manifest;
@@ -1559,7 +1560,7 @@ impl Tenant {
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
let layer_removal_guard = timeline.lcache.delete_guard().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled

View File

@@ -0,0 +1,143 @@
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
use super::Timeline;
use crate::tenant::layer_map::{self, LayerMap};
use anyhow::Result;
use std::sync::{Mutex, Weak};
use std::{collections::HashMap, sync::Arc};
pub struct LayerCache {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub layers_removal_lock: Arc<tokio::sync::Mutex<()>>,
/// We need this lock b/c we do not have any way to prevent GC/compaction from removing files in-use.
/// We need to do reference counting on Arc to prevent this from happening, and we can safely remove this lock.
pub layers_operation_lock: Arc<tokio::sync::RwLock<()>>,
/// Will be useful when we move evict / download to layer cache.
#[allow(unused)]
timeline: Weak<Timeline>,
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
}
pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>);
pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>);
#[derive(Clone)]
pub struct DeleteGuard(Arc<tokio::sync::OwnedMutexGuard<()>>);
impl LayerCache {
pub fn new(timeline: Weak<Timeline>) -> Self {
Self {
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
mapping: Mutex::new(HashMap::new()),
timeline,
}
}
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
let guard = self.mapping.lock().unwrap();
guard.get(&desc.key()).expect("not found").clone()
}
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
/// we won't delete files that are being read.
pub async fn layer_in_use_write(&self) -> LayerInUseWrite {
LayerInUseWrite(self.layers_operation_lock.clone().write_owned().await)
}
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
/// we won't delete files that are being read.
pub async fn layer_in_use_read(&self) -> LayerInUseRead {
LayerInUseRead(self.layers_operation_lock.clone().read_owned().await)
}
/// Ensures only one of compaction / gc can happen at a time.
pub async fn delete_guard(&self) -> DeleteGuard {
DeleteGuard(Arc::new(
self.layers_removal_lock.clone().lock_owned().await,
))
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within read path.
pub fn replace_and_verify(
&self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
let mut guard = self.mapping.lock().unwrap();
use super::layer_map::LayerKey;
let key = LayerKey::from(&*expected);
let other = LayerKey::from(&*new);
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
));
anyhow::ensure!(
key == other,
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) {
anyhow::ensure!(
layer_map::compare_arced_layers(&expected, layer),
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
);
}
}
/// Called within write path. When compaction and image layer creation we will create new layers.
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within write path. When GC and compaction we will remove layers and delete them on disk.
/// Will move logic to delete files here later.
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
}

View File

@@ -686,10 +686,7 @@ mod tests {
mod l0_delta_layers_updated {
use crate::tenant::{
storage_layer::{PersistentLayer, PersistentLayerDesc},
timeline::LayerMapping,
};
use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use super::*;
@@ -722,31 +719,6 @@ mod tests {
)
}
#[test]
fn replacing_missing_l0_is_notfound() {
// original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
// however only happen for precondition failures.
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
let layer = LayerFileName::from_str(layer).unwrap();
let layer = LayerDescriptor::from(layer);
// same skeletan construction; see scenario below
let not_found = Arc::new(layer.clone());
let new_version = Arc::new(layer);
// after the immutable storage state refactor, the replace operation
// will not use layer map any more. We keep it here for consistency in test cases
// and can remove it in the future.
let _map = LayerMap::default();
let mut mapping = LayerMapping::new();
mapping
.replace_and_verify(not_found, new_version)
.unwrap_err();
}
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
let name = LayerFileName::from_str(layer_name).unwrap();
let skeleton = LayerDescriptor::from(name);
@@ -755,7 +727,6 @@ mod tests {
let downloaded = Arc::new(skeleton);
let mut map = LayerMap::default();
let mut mapping = LayerMapping::new();
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
@@ -765,20 +736,11 @@ mod tests {
map.batch_update()
.insert_historic(remote.layer_desc().clone());
mapping.insert(remote.clone());
assert_eq!(
count_layer_in(&map, remote.layer_desc()),
expected_in_counts
);
mapping
.replace_and_verify(remote, downloaded.clone())
.expect("name derived attributes are the same");
assert_eq!(
count_layer_in(&map, downloaded.layer_desc()),
expected_in_counts
);
map.batch_update()
.remove_historic(downloaded.layer_desc().clone());
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));

View File

@@ -79,11 +79,12 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_cache::{DeleteGuard, LayerCache};
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, PersistentLayerKey,
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -117,65 +118,11 @@ impl PartialOrd for Hole {
}
}
pub struct LayerMapping(HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>);
pub struct LayerMapping(());
impl LayerMapping {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
self.0.get(&desc.key()).expect("not found").clone()
}
pub(crate) fn insert(&mut self, layer: Arc<dyn PersistentLayer>) {
self.0.insert(layer.layer_desc().key(), layer);
}
pub(crate) fn new() -> Self {
Self(HashMap::new())
}
pub(crate) fn remove(&mut self, layer: Arc<dyn PersistentLayer>) {
self.0.remove(&layer.layer_desc().key());
}
pub(crate) fn replace_and_verify(
&mut self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
use super::layer_map::LayerKey;
let key = LayerKey::from(&*expected);
let other = LayerKey::from(&*new);
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
));
anyhow::ensure!(
key == other,
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = self.0.get_mut(&expected.layer_desc().key()) {
anyhow::ensure!(
layer_map::compare_arced_layers(&expected, layer),
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
);
}
Self(())
}
}
@@ -192,7 +139,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
}
pub struct Timeline {
conf: &'static PageServerConf,
pub(super) conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
myself: Weak<Self>,
@@ -204,6 +151,8 @@ pub struct Timeline {
pub(crate) layers: tokio::sync::RwLock<(LayerMap, LayerMapping)>,
pub(super) lcache: LayerCache,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
/// It is used by compaction task when it checks if new image layer should be created.
@@ -275,13 +224,6 @@ pub struct Timeline {
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>,
@@ -896,7 +838,7 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
let layer_removal_cs = self.lcache.delete_guard().await;
// Is the timeline being deleted?
if self.is_stopping() {
return Err(anyhow::anyhow!("timeline is Stopping").into());
@@ -1119,7 +1061,7 @@ impl Timeline {
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let guard = self.layers.read().await;
let (layer_map, mapping) = &*guard;
let (layer_map, _) = &*guard;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1130,7 +1072,7 @@ impl Timeline {
let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() {
let historic_layer = mapping.get_from_desc(&historic_layer);
let historic_layer = self.lcache.get_from_desc(&historic_layer);
historic_layers.push(historic_layer.info(reset));
}
@@ -1228,7 +1170,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
// now lock out layer removal (compaction, gc, timeline deletion)
let layer_removal_guard = self.layer_removal_cs.lock().await;
let layer_removal_guard = self.lcache.delete_guard().await;
{
// to avoid racing with detach and delete_timeline
@@ -1241,7 +1183,7 @@ impl Timeline {
// start the batch update
let mut guard = self.layers.write().await;
let (layer_map, mapping) = &mut *guard;
let (layer_map, _) = &mut *guard;
let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1250,12 +1192,7 @@ impl Timeline {
let res = if cancel.is_cancelled() {
None
} else {
Some(self.evict_layer_batch_impl(
&layer_removal_guard,
l,
&mut batch_updates,
mapping,
))
Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates))
};
results.push(res);
}
@@ -1271,10 +1208,9 @@ impl Timeline {
fn evict_layer_batch_impl(
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
_layer_removal_cs: &DeleteGuard,
local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerMapping,
) -> anyhow::Result<bool> {
if local_layer.is_remote_layer() {
// TODO(issue #3851): consider returning an err here instead of false,
@@ -1325,7 +1261,10 @@ impl Timeline {
assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
let succeed = match mapping.replace_and_verify(local_layer.clone(), new_remote_layer) {
let succeed = match self
.lcache
.replace_and_verify(local_layer.clone(), new_remote_layer)
{
Ok(()) => {
if let Err(e) = local_layer.delete_resident_layer_file() {
error!("failed to remove layer file on evict after replacement: {e:#?}");
@@ -1494,6 +1433,7 @@ impl Timeline {
tenant_id,
pg_version,
layers: tokio::sync::RwLock::new((LayerMap::default(), LayerMapping::new())),
lcache: LayerCache::new(myself.clone()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -1529,7 +1469,6 @@ impl Timeline {
layer_flush_done_tx,
write_lock: tokio::sync::Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: std::sync::RwLock::new(GcInfo {
retain_lsns: Vec::new(),
@@ -1685,7 +1624,7 @@ impl Timeline {
///
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update();
let mut num_layers = 0;
@@ -1729,7 +1668,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(layer.layer_desc().clone());
mapping.insert(Arc::new(layer));
self.lcache.populate_local_when_init(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1762,7 +1701,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(layer.layer_desc().clone());
mapping.insert(Arc::new(layer));
self.lcache.populate_local_when_init(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1817,7 +1756,7 @@ impl Timeline {
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut guard = self.layers.write().await;
let (layer_map, mapping) = &mut *guard;
let (layer_map, _) = &mut *guard;
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1863,7 +1802,7 @@ impl Timeline {
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer.layer_desc().clone());
mapping.remove(local_layer);
self.lcache.remove_local_when_init(local_layer);
// fall-through to adding the remote layer
}
} else {
@@ -1903,7 +1842,7 @@ impl Timeline {
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone());
mapping.insert(remote_layer);
self.lcache.populate_remote_when_init(remote_layer);
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1931,7 +1870,7 @@ impl Timeline {
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone());
mapping.insert(remote_layer);
self.lcache.populate_remote_when_init(remote_layer);
}
}
}
@@ -1972,10 +1911,10 @@ impl Timeline {
let local_layers = {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let (layers, _) = &*guard;
layers
.iter_historic_layers()
.map(|l| (l.filename(), mapping.get_from_desc(&l)))
.map(|l| (l.filename(), self.lcache.get_from_desc(&l)))
.collect::<HashMap<_, _>>()
};
@@ -2349,11 +2288,11 @@ impl Timeline {
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let (layers, _) = &*guard;
for historic_layer in layers.iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(mapping.get_from_desc(&historic_layer));
return Some(self.lcache.get_from_desc(&historic_layer));
}
}
@@ -2365,12 +2304,11 @@ impl Timeline {
fn delete_historic_layer(
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
_layer_removal_cs: DeleteGuard,
layer: Arc<PersistentLayerDesc>,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerMapping,
) -> anyhow::Result<()> {
let layer = mapping.get_from_desc(&layer);
let layer = self.lcache.get_from_desc(&layer);
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size();
@@ -2385,7 +2323,7 @@ impl Timeline {
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(layer.layer_desc().clone());
mapping.remove(layer);
self.lcache.delete_layer(layer);
Ok(())
}
@@ -2571,7 +2509,7 @@ impl Timeline {
'layer_map_search: loop {
let remote_layer = {
let guard = timeline.layers.read().await;
let (layers, mapping) = &*guard;
let (layers, _) = &*guard;
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
@@ -2633,7 +2571,7 @@ impl Timeline {
}
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
let layer = mapping.get_from_desc(&layer);
let layer = timeline.lcache.get_from_desc(&layer);
// If it's a remote layer, download it and retry.
if let Some(remote_layer) =
super::storage_layer::downcast_remote_layer(&layer)
@@ -3136,7 +3074,7 @@ impl Timeline {
// Add it to the layer map
let l = Arc::new(new_delta);
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let (layers, _) = &mut *guard;
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
@@ -3144,7 +3082,7 @@ impl Timeline {
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(l.layer_desc().clone());
mapping.insert(l);
self.lcache.create_new_layer(l);
batch_updates.flush();
// update the timeline's physical size
@@ -3374,7 +3312,7 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
@@ -3397,7 +3335,7 @@ impl Timeline {
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(l.layer_desc().clone());
mapping.insert(l);
self.lcache.create_new_layer(l);
}
updates.flush();
drop_wlock(guard);
@@ -3439,12 +3377,12 @@ impl Timeline {
/// start of level0 files compaction, the on-demand download should be revisited as well.
async fn compact_level0_phase1(
&self,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
_layer_removal_cs: DeleteGuard,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let (layers, _) = &*guard;
let mut level0_deltas = layers.get_level0_deltas()?;
// Only compact if enough layers have accumulated.
@@ -3492,7 +3430,7 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.map(|l| mapping.get_from_desc(l))
.map(|l| self.lcache.get_from_desc(l))
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.map(|l| {
@@ -3504,7 +3442,7 @@ impl Timeline {
let deltas_to_compact_layers = deltas_to_compact
.iter()
.map(|l| mapping.get_from_desc(l))
.map(|l| self.lcache.get_from_desc(l))
.collect_vec();
drop_rlock(guard);
@@ -3785,7 +3723,7 @@ impl Timeline {
///
async fn compact_level0(
self: &Arc<Self>,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer_removal_cs: DeleteGuard,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<(), CompactionError> {
@@ -3813,7 +3751,7 @@ impl Timeline {
}
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
@@ -3846,7 +3784,7 @@ impl Timeline {
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(x.layer_desc().clone());
mapping.insert(x);
self.lcache.create_new_layer(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -3854,7 +3792,7 @@ impl Timeline {
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
layer_names_to_delete.push(l.filename());
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates, mapping)?;
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
}
updates.flush();
drop_wlock(guard);
@@ -3974,7 +3912,7 @@ impl Timeline {
fail_point!("before-timeline-gc");
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await);
let layer_removal_cs = self.lcache.delete_guard().await;
// Is the timeline being deleted?
if self.is_stopping() {
anyhow::bail!("timeline is Stopping");
@@ -4012,7 +3950,7 @@ impl Timeline {
async fn gc_timeline(
&self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer_removal_cs: DeleteGuard,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>,
@@ -4074,7 +4012,7 @@ impl Timeline {
//
// TODO holding a write lock is too agressive and avoidable
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let (layers, _) = &mut *guard;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4190,7 +4128,6 @@ impl Timeline {
layer_removal_cs.clone(),
doomed_layer,
&mut updates,
mapping,
)?; // FIXME: schedule succeeded deletions before returning?
result.layers_removed += 1;
}
@@ -4376,13 +4313,13 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let mut guard = self_clone.layers.write().await;
let (layers, mapping) = &mut *guard;
let (layers, _) = &mut *guard;
let updates = layers.batch_update();
let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
let failure = match mapping.replace_and_verify(l, new_layer) {
let failure = match self_clone.lcache.replace_and_verify(l, new_layer) {
Ok(()) => false,
Err(e) => {
// this is a precondition failure, the layer filename derived
@@ -4511,10 +4448,10 @@ impl Timeline {
let mut downloads = Vec::new();
{
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let (layers, _) = &*guard;
layers
.iter_historic_layers()
.map(|l| mapping.get_from_desc(&l))
.map(|l| self.lcache.get_from_desc(&l))
.filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl))
@@ -4616,7 +4553,7 @@ impl LocalLayerInfoForDiskUsageEviction {
impl Timeline {
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let (layers, _) = &*guard;
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
@@ -4625,7 +4562,7 @@ impl Timeline {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
let l = mapping.get_from_desc(&l);
let l = self.lcache.get_from_desc(&l);
if l.is_remote_layer() {
continue;

View File

@@ -198,10 +198,10 @@ impl Timeline {
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let (layers, _) = &*guard;
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = mapping.get_from_desc(&hist_layer);
let hist_layer = self.lcache.get_from_desc(&hist_layer);
if hist_layer.is_remote_layer() {
continue;
}