mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 18:10:37 +00:00
semantic layer map operations (#4618)
## Problem ref https://github.com/neondatabase/neon/issues/4373 ## Summary of changes A step towards immutable layer map. I decided to finish the refactor with this new approach and apply https://github.com/neondatabase/neon/pull/4455 on this patch later. In this PR, we moved all modifications of the layer map to one place with semantic operations like `initialize_local_layers`, `finish_compact_l0`, `finish_gc_timeline`, etc, which is now part of `LayerManager`. This makes it easier to build new features upon this PR: * For immutable storage state refactor, we can simply replace the layer map with `ArcSwap<LayerMap>` and remove the `layers` lock. Moving towards it requires us to put all layer map changes in a single place as in https://github.com/neondatabase/neon/pull/4455. * For manifest, we can write to manifest in each of the semantic functions. --------- Signed-off-by: Alex Chi Z <chi@neon.tech> Co-authored-by: Christian Schwarz <christian@neon.tech>
This commit is contained in:
@@ -429,7 +429,7 @@ impl Tenant {
|
||||
.layers
|
||||
.read()
|
||||
.await
|
||||
.0
|
||||
.layer_map()
|
||||
.iter_historic_layers()
|
||||
.next()
|
||||
.is_some(),
|
||||
|
||||
@@ -659,7 +659,7 @@ mod tests {
|
||||
|
||||
use crate::tenant::{
|
||||
storage_layer::{AsLayerDesc, PersistentLayerDesc},
|
||||
timeline::LayerFileManager,
|
||||
timeline::layer_manager::LayerFileManager,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -41,7 +41,7 @@ pub use inmemory_layer::InMemoryLayer;
|
||||
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
|
||||
pub use remote_layer::RemoteLayer;
|
||||
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::timeline::layer_manager::LayerManager;
|
||||
|
||||
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
|
||||
where
|
||||
@@ -170,7 +170,7 @@ impl LayerAccessStats {
|
||||
///
|
||||
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
|
||||
pub(crate) fn for_loading_layer(
|
||||
layer_map_lock_held_witness: &BatchedUpdates<'_>,
|
||||
layer_map_lock_held_witness: &LayerManager,
|
||||
status: LayerResidenceStatus,
|
||||
) -> Self {
|
||||
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
|
||||
@@ -189,7 +189,7 @@ impl LayerAccessStats {
|
||||
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
|
||||
pub(crate) fn clone_for_residence_change(
|
||||
&self,
|
||||
layer_map_lock_held_witness: &BatchedUpdates<'_>,
|
||||
layer_map_lock_held_witness: &LayerManager,
|
||||
new_status: LayerResidenceStatus,
|
||||
) -> LayerAccessStats {
|
||||
let clone = {
|
||||
@@ -221,7 +221,7 @@ impl LayerAccessStats {
|
||||
///
|
||||
pub(crate) fn record_residence_event(
|
||||
&self,
|
||||
_layer_map_lock_held_witness: &BatchedUpdates<'_>,
|
||||
_layer_map_lock_held_witness: &LayerManager,
|
||||
status: LayerResidenceStatus,
|
||||
reason: LayerResidenceEventReason,
|
||||
) {
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::RequestContext;
|
||||
use crate::repository::Key;
|
||||
use crate::tenant::layer_map::BatchedUpdates;
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
|
||||
use crate::tenant::timeline::layer_manager::LayerManager;
|
||||
use anyhow::{bail, Result};
|
||||
use pageserver_api::models::HistoricLayerInfo;
|
||||
use std::ops::Range;
|
||||
@@ -224,7 +224,7 @@ impl RemoteLayer {
|
||||
/// Create a Layer struct representing this layer, after it has been downloaded.
|
||||
pub fn create_downloaded_layer(
|
||||
&self,
|
||||
layer_map_lock_held_witness: &BatchedUpdates<'_>,
|
||||
layer_map_lock_held_witness: &LayerManager,
|
||||
conf: &'static PageServerConf,
|
||||
file_size: u64,
|
||||
) -> Arc<dyn PersistentLayer> {
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
//!
|
||||
|
||||
mod eviction_task;
|
||||
pub mod layer_manager;
|
||||
mod logical_size;
|
||||
pub mod span;
|
||||
pub mod uninit;
|
||||
@@ -82,16 +81,15 @@ use crate::{is_temporary, task_mgr};
|
||||
|
||||
pub(super) use self::eviction_task::EvictionTaskTenantState;
|
||||
use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::layer_manager::LayerManager;
|
||||
use self::logical_size::LogicalSize;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
use super::storage_layer::{
|
||||
AsLayerDesc, DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc,
|
||||
PersistentLayerKey,
|
||||
};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
@@ -125,78 +123,6 @@ impl PartialOrd for Hole {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
|
||||
HashMap<PersistentLayerKey, Arc<T>>,
|
||||
);
|
||||
|
||||
impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
|
||||
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
|
||||
// The assumption for the `expect()` is that all code maintains the following invariant:
|
||||
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
|
||||
self.0
|
||||
.get(&desc.key())
|
||||
.with_context(|| format!("get layer from desc: {}", desc.filename()))
|
||||
.expect("not found")
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn insert(&mut self, layer: Arc<T>) {
|
||||
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
|
||||
if present.is_some() && cfg!(debug_assertions) {
|
||||
panic!("overwriting a layer: {:?}", layer.layer_desc())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new() -> Self {
|
||||
Self(HashMap::new())
|
||||
}
|
||||
|
||||
pub(crate) fn remove(&mut self, layer: Arc<T>) {
|
||||
let present = self.0.remove(&layer.layer_desc().key());
|
||||
if present.is_none() && cfg!(debug_assertions) {
|
||||
panic!(
|
||||
"removing layer that is not present in layer mapping: {:?}",
|
||||
layer.layer_desc()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
|
||||
let key = expected.layer_desc().key();
|
||||
let other = new.layer_desc().key();
|
||||
|
||||
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
|
||||
let new_l0 = LayerMap::is_l0(new.layer_desc());
|
||||
|
||||
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
|
||||
"layermap-replace-notfound"
|
||||
));
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
"expected and new layer have different keys: {key:?} != {other:?}"
|
||||
);
|
||||
|
||||
anyhow::ensure!(
|
||||
expected_l0 == new_l0,
|
||||
"one layer is l0 while the other is not: {expected_l0} != {new_l0}"
|
||||
);
|
||||
|
||||
if let Some(layer) = self.0.get_mut(&expected.layer_desc().key()) {
|
||||
anyhow::ensure!(
|
||||
compare_arced_layers(&expected, layer),
|
||||
"another layer was found instead of expected, expected={expected:?}, new={new:?}",
|
||||
expected = Arc::as_ptr(&expected),
|
||||
new = Arc::as_ptr(layer),
|
||||
);
|
||||
*layer = new;
|
||||
Ok(())
|
||||
} else {
|
||||
anyhow::bail!("layer was not found");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
|
||||
/// Can be removed after all refactors are done.
|
||||
fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
|
||||
@@ -236,7 +162,7 @@ pub struct Timeline {
|
||||
///
|
||||
/// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
|
||||
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
|
||||
pub(crate) layers: Arc<tokio::sync::RwLock<(LayerMap, LayerFileManager)>>,
|
||||
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -587,7 +513,7 @@ impl Timeline {
|
||||
/// Hence, the result **does not represent local filesystem usage**.
|
||||
pub async fn layer_size_sum(&self) -> u64 {
|
||||
let guard = self.layers.read().await;
|
||||
let (layer_map, _) = &*guard;
|
||||
let layer_map = guard.layer_map();
|
||||
let mut size = 0;
|
||||
for l in layer_map.iter_historic_layers() {
|
||||
size += l.file_size();
|
||||
@@ -898,7 +824,7 @@ impl Timeline {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let open_layer_size = {
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, _) = &*guard;
|
||||
let layers = guard.layer_map();
|
||||
let Some(open_layer) = layers.open_layer.as_ref() else {
|
||||
return Ok(());
|
||||
};
|
||||
@@ -1030,7 +956,7 @@ impl Timeline {
|
||||
|
||||
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let guard = self.layers.read().await;
|
||||
let (layer_map, mapping) = &*guard;
|
||||
let layer_map = guard.layer_map();
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
if let Some(open_layer) = &layer_map.open_layer {
|
||||
in_memory_layers.push(open_layer.info());
|
||||
@@ -1041,7 +967,7 @@ impl Timeline {
|
||||
|
||||
let mut historic_layers = Vec::new();
|
||||
for historic_layer in layer_map.iter_historic_layers() {
|
||||
let historic_layer = mapping.get_from_desc(&historic_layer);
|
||||
let historic_layer = guard.get_from_desc(&historic_layer);
|
||||
historic_layers.push(historic_layer.info(reset));
|
||||
}
|
||||
|
||||
@@ -1152,27 +1078,18 @@ impl Timeline {
|
||||
|
||||
// start the batch update
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layer_map, mapping) = &mut *guard;
|
||||
let mut batch_updates = layer_map.batch_update();
|
||||
|
||||
let mut results = Vec::with_capacity(layers_to_evict.len());
|
||||
|
||||
for l in layers_to_evict.iter() {
|
||||
let res = if cancel.is_cancelled() {
|
||||
None
|
||||
} else {
|
||||
Some(self.evict_layer_batch_impl(
|
||||
&layer_removal_guard,
|
||||
l,
|
||||
&mut batch_updates,
|
||||
mapping,
|
||||
))
|
||||
Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut guard))
|
||||
};
|
||||
results.push(res);
|
||||
}
|
||||
|
||||
// commit the updates & release locks
|
||||
batch_updates.flush();
|
||||
drop_wlock(guard);
|
||||
drop(layer_removal_guard);
|
||||
|
||||
@@ -1184,8 +1101,7 @@ impl Timeline {
|
||||
&self,
|
||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
local_layer: &Arc<dyn PersistentLayer>,
|
||||
batch_updates: &mut BatchedUpdates<'_>,
|
||||
mapping: &mut LayerFileManager,
|
||||
layer_mgr: &mut LayerManager,
|
||||
) -> anyhow::Result<bool> {
|
||||
if local_layer.is_remote_layer() {
|
||||
// TODO(issue #3851): consider returning an err here instead of false,
|
||||
@@ -1221,7 +1137,7 @@ impl Timeline {
|
||||
&layer_metadata,
|
||||
local_layer
|
||||
.access_stats()
|
||||
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
|
||||
.clone_for_residence_change(layer_mgr, LayerResidenceStatus::Evicted),
|
||||
),
|
||||
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
|
||||
self.tenant_id,
|
||||
@@ -1230,13 +1146,13 @@ impl Timeline {
|
||||
&layer_metadata,
|
||||
local_layer
|
||||
.access_stats()
|
||||
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
|
||||
.clone_for_residence_change(layer_mgr, LayerResidenceStatus::Evicted),
|
||||
),
|
||||
});
|
||||
|
||||
assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
|
||||
|
||||
let succeed = match mapping.replace_and_verify(local_layer.clone(), new_remote_layer) {
|
||||
let succeed = match layer_mgr.replace_and_verify(local_layer.clone(), new_remote_layer) {
|
||||
Ok(()) => {
|
||||
if let Err(e) = local_layer.delete_resident_layer_file() {
|
||||
error!("failed to remove layer file on evict after replacement: {e:#?}");
|
||||
@@ -1407,10 +1323,7 @@ impl Timeline {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: Arc::new(tokio::sync::RwLock::new((
|
||||
LayerMap::default(),
|
||||
LayerFileManager::new(),
|
||||
))),
|
||||
layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
|
||||
walredo_mgr,
|
||||
@@ -1595,7 +1508,7 @@ impl Timeline {
|
||||
let mut layers = self.layers.try_write().expect(
|
||||
"in the context where we call this function, no other task has access to the object",
|
||||
);
|
||||
layers.0.next_open_layer_at = Some(Lsn(start_lsn.0));
|
||||
layers.initialize_empty(Lsn(start_lsn.0));
|
||||
}
|
||||
|
||||
///
|
||||
@@ -1603,8 +1516,6 @@ impl Timeline {
|
||||
///
|
||||
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layers, mapping) = &mut *guard;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut num_layers = 0;
|
||||
|
||||
let timer = self.metrics.load_layer_map_histo.start_timer();
|
||||
@@ -1615,6 +1526,8 @@ impl Timeline {
|
||||
// total size of layer files in the current timeline directory
|
||||
let mut total_physical_size = 0;
|
||||
|
||||
let mut loaded_layers = Vec::<Arc<dyn PersistentLayer>>::new();
|
||||
|
||||
for direntry in fs::read_dir(timeline_path)? {
|
||||
let direntry = direntry?;
|
||||
let direntry_path = direntry.path();
|
||||
@@ -1641,12 +1554,12 @@ impl Timeline {
|
||||
self.tenant_id,
|
||||
&imgfilename,
|
||||
file_size,
|
||||
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident),
|
||||
);
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
self.insert_historic_layer(Arc::new(layer), &mut updates, mapping);
|
||||
loaded_layers.push(Arc::new(layer));
|
||||
num_layers += 1;
|
||||
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
||||
// Create a DeltaLayer struct for each delta file.
|
||||
@@ -1673,12 +1586,12 @@ impl Timeline {
|
||||
self.tenant_id,
|
||||
&deltafilename,
|
||||
file_size,
|
||||
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident),
|
||||
);
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
self.insert_historic_layer(Arc::new(layer), &mut updates, mapping);
|
||||
loaded_layers.push(Arc::new(layer));
|
||||
num_layers += 1;
|
||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||
// ignore these
|
||||
@@ -1704,8 +1617,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
updates.flush();
|
||||
layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1);
|
||||
guard.initialize_local_layers(loaded_layers, Lsn(disk_consistent_lsn.0) + 1);
|
||||
|
||||
info!(
|
||||
"loaded layer map with {} layers at {}, total physical size: {}",
|
||||
@@ -1733,8 +1645,9 @@ impl Timeline {
|
||||
// We're holding a layer map lock for a while but this
|
||||
// method is only called during init so it's fine.
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layer_map, mapping) = &mut *guard;
|
||||
let mut updates = layer_map.batch_update();
|
||||
|
||||
let mut corrupted_local_layers = Vec::new();
|
||||
let mut added_remote_layers = Vec::new();
|
||||
for remote_layer_name in &index_part.timeline_layers {
|
||||
let local_layer = local_only_layers.remove(remote_layer_name);
|
||||
|
||||
@@ -1778,7 +1691,7 @@ impl Timeline {
|
||||
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
|
||||
} else {
|
||||
self.metrics.resident_physical_size_gauge.sub(local_size);
|
||||
self.remove_historic_layer(local_layer, &mut updates, mapping);
|
||||
corrupted_local_layers.push(local_layer);
|
||||
// fall-through to adding the remote layer
|
||||
}
|
||||
} else {
|
||||
@@ -1810,14 +1723,10 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
imgfilename,
|
||||
&remote_layer_metadata,
|
||||
LayerAccessStats::for_loading_layer(
|
||||
&updates,
|
||||
LayerResidenceStatus::Evicted,
|
||||
),
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted),
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
|
||||
self.insert_historic_layer(remote_layer, &mut updates, mapping);
|
||||
added_remote_layers.push(remote_layer);
|
||||
}
|
||||
LayerFileName::Delta(deltafilename) => {
|
||||
// Create a RemoteLayer for the delta file.
|
||||
@@ -1838,18 +1747,14 @@ impl Timeline {
|
||||
self.timeline_id,
|
||||
deltafilename,
|
||||
&remote_layer_metadata,
|
||||
LayerAccessStats::for_loading_layer(
|
||||
&updates,
|
||||
LayerResidenceStatus::Evicted,
|
||||
),
|
||||
LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted),
|
||||
);
|
||||
let remote_layer = Arc::new(remote_layer);
|
||||
self.insert_historic_layer(remote_layer, &mut updates, mapping);
|
||||
added_remote_layers.push(remote_layer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
updates.flush();
|
||||
guard.initialize_remote_layers(corrupted_local_layers, added_remote_layers);
|
||||
Ok(local_only_layers)
|
||||
}
|
||||
|
||||
@@ -1885,10 +1790,10 @@ impl Timeline {
|
||||
|
||||
let local_layers = {
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, mapping) = &*guard;
|
||||
let layers = guard.layer_map();
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.map(|l| (l.filename(), mapping.get_from_desc(&l)))
|
||||
.map(|l| (l.filename(), guard.get_from_desc(&l)))
|
||||
.collect::<HashMap<_, _>>()
|
||||
};
|
||||
|
||||
@@ -2262,70 +2167,15 @@ impl Timeline {
|
||||
|
||||
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, mapping) = &*guard;
|
||||
for historic_layer in layers.iter_historic_layers() {
|
||||
for historic_layer in guard.layer_map().iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
return Some(mapping.get_from_desc(&historic_layer));
|
||||
return Some(guard.get_from_desc(&historic_layer));
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Helper function to insert a layer from both layer map and layer file manager. Will be removed in the future
|
||||
/// after we introduce `LayerMapManager`.
|
||||
fn insert_historic_layer(
|
||||
&self,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
updates: &mut BatchedUpdates<'_>,
|
||||
mapping: &mut LayerFileManager,
|
||||
) {
|
||||
updates.insert_historic(layer.layer_desc().clone());
|
||||
mapping.insert(layer);
|
||||
}
|
||||
|
||||
/// Helper function to remove a layer from both layer map and layer file manager. Will be removed in the future
|
||||
/// after we introduce `LayerMapManager`.
|
||||
fn remove_historic_layer(
|
||||
&self,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
updates: &mut BatchedUpdates<'_>,
|
||||
mapping: &mut LayerFileManager,
|
||||
) {
|
||||
updates.remove_historic(layer.layer_desc().clone());
|
||||
mapping.remove(layer);
|
||||
}
|
||||
|
||||
/// Removes the layer from local FS (if present) and from memory.
|
||||
/// Remote storage is not affected by this operation.
|
||||
fn delete_historic_layer(
|
||||
&self,
|
||||
// we cannot remove layers otherwise, since gc and compaction will race
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layer: Arc<PersistentLayerDesc>,
|
||||
updates: &mut BatchedUpdates<'_>,
|
||||
mapping: &mut LayerFileManager,
|
||||
) -> anyhow::Result<()> {
|
||||
let layer = mapping.get_from_desc(&layer);
|
||||
if !layer.is_remote_layer() {
|
||||
layer.delete_resident_layer_file()?;
|
||||
let layer_file_size = layer.file_size();
|
||||
self.metrics
|
||||
.resident_physical_size_gauge
|
||||
.sub(layer_file_size);
|
||||
}
|
||||
|
||||
// TODO Removing from the bottom of the layer map is expensive.
|
||||
// Maybe instead discard all layer map historic versions that
|
||||
// won't be needed for page reconstruction for this timeline,
|
||||
// and mark what we can't delete yet as deleted from the layer
|
||||
// map index without actually rebuilding the index.
|
||||
updates.remove_historic(layer.layer_desc().clone());
|
||||
mapping.remove(layer);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalId = String;
|
||||
@@ -2500,7 +2350,7 @@ impl Timeline {
|
||||
'layer_map_search: loop {
|
||||
let remote_layer = {
|
||||
let guard = timeline.layers.read().await;
|
||||
let (layers, mapping) = &*guard;
|
||||
let layers = guard.layer_map();
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
@@ -2562,7 +2412,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
let layer = mapping.get_from_desc(&layer);
|
||||
let layer = guard.get_from_desc(&layer);
|
||||
// If it's a remote layer, download it and retry.
|
||||
if let Some(remote_layer) =
|
||||
super::storage_layer::downcast_remote_layer(&layer)
|
||||
@@ -2685,52 +2535,13 @@ impl Timeline {
|
||||
///
|
||||
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layers, _) = &mut *guard;
|
||||
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
ensure!(
|
||||
lsn > last_record_lsn,
|
||||
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}",
|
||||
let layer = guard.get_layer_for_write(
|
||||
lsn,
|
||||
last_record_lsn,
|
||||
std::backtrace::Backtrace::force_capture(),
|
||||
);
|
||||
|
||||
// Do we have a layer open for writing already?
|
||||
let layer;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
if open_layer.get_lsn_range().start > lsn {
|
||||
bail!(
|
||||
"unexpected open layer in the future: open layers starts at {}, write lsn {}",
|
||||
open_layer.get_lsn_range().start,
|
||||
lsn
|
||||
);
|
||||
}
|
||||
|
||||
layer = Arc::clone(open_layer);
|
||||
} else {
|
||||
// No writeable layer yet. Create one.
|
||||
let start_lsn = layers
|
||||
.next_open_layer_at
|
||||
.context("No next open layer found")?;
|
||||
|
||||
trace!(
|
||||
"creating layer for write at {}/{} for record at {}",
|
||||
self.timeline_id,
|
||||
start_lsn,
|
||||
lsn
|
||||
);
|
||||
let new_layer =
|
||||
InMemoryLayer::create(self.conf, self.timeline_id, self.tenant_id, start_lsn)?;
|
||||
let layer_rc = Arc::new(new_layer);
|
||||
|
||||
layers.open_layer = Some(Arc::clone(&layer_rc));
|
||||
layers.next_open_layer_at = None;
|
||||
|
||||
layer = layer_rc;
|
||||
}
|
||||
self.get_last_record_lsn(),
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_id,
|
||||
)?;
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
@@ -2763,21 +2574,7 @@ impl Timeline {
|
||||
Some(self.write_lock.lock().await)
|
||||
};
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layers, _) = &mut *guard;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
let end_lsn = Lsn(self.get_last_record_lsn().0 + 1);
|
||||
open_layer.freeze(end_lsn);
|
||||
|
||||
// The layer is no longer open, update the layer map to reflect this.
|
||||
// We will replace it with on-disk historics below.
|
||||
layers.frozen_layers.push_back(open_layer_rc);
|
||||
layers.open_layer = None;
|
||||
layers.next_open_layer_at = Some(end_lsn);
|
||||
self.last_freeze_at.store(end_lsn);
|
||||
}
|
||||
drop_wlock(guard);
|
||||
guard.try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at);
|
||||
}
|
||||
|
||||
/// Layer flusher task's main loop.
|
||||
@@ -2802,8 +2599,7 @@ impl Timeline {
|
||||
let result = loop {
|
||||
let layer_to_flush = {
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, _) = &*guard;
|
||||
layers.frozen_layers.front().cloned()
|
||||
guard.layer_map().frozen_layers.front().cloned()
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
let Some(layer_to_flush) = layer_to_flush else { break Ok(()) };
|
||||
@@ -2921,12 +2717,11 @@ impl Timeline {
|
||||
pausable_failpoint!("flush-frozen-before-sync");
|
||||
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now. We do not modify `LayerFileManager` because
|
||||
// it only contains persistent layers. The flushed layer is stored in
|
||||
// in-memory layer from the map now. The flushed layer is stored in
|
||||
// the mapping in `create_delta_layer`.
|
||||
{
|
||||
let mut layers = self.layers.write().await;
|
||||
let l = layers.0.frozen_layers.pop_front();
|
||||
let mut guard = self.layers.write().await;
|
||||
let l = guard.layer_map_mut().frozen_layers.pop_front();
|
||||
|
||||
// Only one thread may call this function at a time (for this
|
||||
// timeline). If two threads tried to flush the same frozen
|
||||
@@ -3065,15 +2860,12 @@ impl Timeline {
|
||||
// Add it to the layer map
|
||||
let l = Arc::new(new_delta);
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layers, mapping) = &mut *guard;
|
||||
let mut batch_updates = layers.batch_update();
|
||||
l.access_stats().record_residence_event(
|
||||
&batch_updates,
|
||||
&guard,
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
self.insert_historic_layer(l, &mut batch_updates, mapping);
|
||||
batch_updates.flush();
|
||||
guard.track_new_l0_delta_layer(l);
|
||||
|
||||
// update metrics
|
||||
self.metrics.resident_physical_size_gauge.add(sz);
|
||||
@@ -3122,7 +2914,7 @@ impl Timeline {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, _) = &*guard;
|
||||
let layers = guard.layer_map();
|
||||
|
||||
let mut max_deltas = 0;
|
||||
{
|
||||
@@ -3301,11 +3093,9 @@ impl Timeline {
|
||||
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layers, mapping) = &mut *guard;
|
||||
let mut updates = layers.batch_update();
|
||||
let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
|
||||
|
||||
for l in image_layers {
|
||||
for l in &image_layers {
|
||||
let path = l.filename();
|
||||
let metadata = timeline_path
|
||||
.join(path.file_name())
|
||||
@@ -3319,13 +3109,12 @@ impl Timeline {
|
||||
.add(metadata.len());
|
||||
let l = Arc::new(l);
|
||||
l.access_stats().record_residence_event(
|
||||
&updates,
|
||||
&guard,
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
self.insert_historic_layer(l, &mut updates, mapping);
|
||||
}
|
||||
updates.flush();
|
||||
guard.track_new_image_layers(image_layers);
|
||||
drop_wlock(guard);
|
||||
timer.stop_and_record();
|
||||
|
||||
@@ -3487,18 +3276,18 @@ impl Timeline {
|
||||
fn compact_level0_phase1(
|
||||
self: Arc<Self>,
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
guard: tokio::sync::OwnedRwLockReadGuard<(LayerMap, LayerFileManager)>,
|
||||
guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
|
||||
mut stats: CompactLevel0Phase1StatsBuilder,
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
stats.read_lock_held_spawn_blocking_startup_micros =
|
||||
stats.read_lock_acquisition_micros.till_now(); // set by caller
|
||||
let (layers, mapping) = &*guard;
|
||||
let layers = guard.layer_map();
|
||||
let level0_deltas = layers.get_level0_deltas()?;
|
||||
let mut level0_deltas = level0_deltas
|
||||
.into_iter()
|
||||
.map(|x| mapping.get_from_desc(&x))
|
||||
.map(|x| guard.get_from_desc(&x))
|
||||
.collect_vec();
|
||||
stats.level0_deltas_count = Some(level0_deltas.len());
|
||||
// Only compact if enough layers have accumulated.
|
||||
@@ -3914,9 +3703,11 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layers, mapping) = &mut *guard;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
|
||||
|
||||
let mut insert_layers = Vec::new();
|
||||
let mut remove_layers = Vec::new();
|
||||
|
||||
for l in new_layers {
|
||||
let new_delta_path = l.path();
|
||||
|
||||
@@ -3942,11 +3733,11 @@ impl Timeline {
|
||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||
x.access_stats().record_residence_event(
|
||||
&updates,
|
||||
&guard,
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
self.insert_historic_layer(x, &mut updates, mapping);
|
||||
insert_layers.push(x);
|
||||
}
|
||||
|
||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||
@@ -3954,12 +3745,16 @@ impl Timeline {
|
||||
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
|
||||
for l in deltas_to_compact {
|
||||
layer_names_to_delete.push(l.filename());
|
||||
// NB: the layer file identified by descriptor `l` is guaranteed to be present
|
||||
// in the LayerFileManager because we kept holding `layer_removal_cs` the entire
|
||||
// time, even though we dropped `Timeline::layers` inbetween.
|
||||
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates, mapping)?;
|
||||
remove_layers.push(guard.get_from_desc(&l));
|
||||
}
|
||||
updates.flush();
|
||||
|
||||
guard.finish_compact_l0(
|
||||
layer_removal_cs,
|
||||
remove_layers,
|
||||
insert_layers,
|
||||
&self.metrics,
|
||||
)?;
|
||||
|
||||
drop_wlock(guard);
|
||||
|
||||
// Also schedule the deletions in remote storage
|
||||
@@ -4178,7 +3973,7 @@ impl Timeline {
|
||||
//
|
||||
// TODO holding a write lock is too agressive and avoidable
|
||||
let mut guard = self.layers.write().await;
|
||||
let (layers, mapping) = &mut *guard;
|
||||
let layers = guard.layer_map();
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
|
||||
@@ -4274,7 +4069,6 @@ impl Timeline {
|
||||
.unwrap()
|
||||
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
|
||||
|
||||
let mut updates = layers.batch_update();
|
||||
if !layers_to_remove.is_empty() {
|
||||
// Persist the new GC cutoff value in the metadata file, before
|
||||
// we actually remove anything.
|
||||
@@ -4284,18 +4078,15 @@ impl Timeline {
|
||||
// (couldn't do this in the loop above, because you cannot modify a collection
|
||||
// while iterating it. BTreeMap::retain() would be another option)
|
||||
let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len());
|
||||
{
|
||||
for doomed_layer in layers_to_remove {
|
||||
layer_names_to_delete.push(doomed_layer.filename());
|
||||
self.delete_historic_layer(
|
||||
layer_removal_cs.clone(),
|
||||
doomed_layer,
|
||||
&mut updates,
|
||||
mapping,
|
||||
)?; // FIXME: schedule succeeded deletions before returning?
|
||||
result.layers_removed += 1;
|
||||
}
|
||||
let gc_layers = layers_to_remove
|
||||
.iter()
|
||||
.map(|x| guard.get_from_desc(x))
|
||||
.collect();
|
||||
for doomed_layer in layers_to_remove {
|
||||
layer_names_to_delete.push(doomed_layer.filename());
|
||||
result.layers_removed += 1;
|
||||
}
|
||||
let apply = guard.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)?;
|
||||
|
||||
if result.layers_removed != 0 {
|
||||
fail_point!("after-timeline-gc-removed-layers");
|
||||
@@ -4304,8 +4095,9 @@ impl Timeline {
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
|
||||
}
|
||||
|
||||
apply.flush();
|
||||
}
|
||||
updates.flush();
|
||||
|
||||
info!(
|
||||
"GC completed removing {} layers, cutoff {}",
|
||||
@@ -4477,13 +4269,11 @@ impl Timeline {
|
||||
// Download complete. Replace the RemoteLayer with the corresponding
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut guard = self_clone.layers.write().await;
|
||||
let (layers, mapping) = &mut *guard;
|
||||
let updates = layers.batch_update();
|
||||
let new_layer =
|
||||
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
remote_layer.create_downloaded_layer(&guard, self_clone.conf, *size);
|
||||
{
|
||||
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
|
||||
let failure = match mapping.replace_and_verify(l, new_layer) {
|
||||
let failure = match guard.replace_and_verify(l, new_layer) {
|
||||
Ok(()) => false,
|
||||
Err(e) => {
|
||||
// this is a precondition failure, the layer filename derived
|
||||
@@ -4511,7 +4301,6 @@ impl Timeline {
|
||||
.store(true, Relaxed);
|
||||
}
|
||||
}
|
||||
updates.flush();
|
||||
drop_wlock(guard);
|
||||
|
||||
info!("on-demand download successful");
|
||||
@@ -4612,10 +4401,10 @@ impl Timeline {
|
||||
let mut downloads = Vec::new();
|
||||
{
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, mapping) = &*guard;
|
||||
let layers = guard.layer_map();
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.map(|l| mapping.get_from_desc(&l))
|
||||
.map(|l| guard.get_from_desc(&l))
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
.map(|l| self.download_remote_layer(l))
|
||||
.for_each(|dl| downloads.push(dl))
|
||||
@@ -4717,7 +4506,7 @@ impl LocalLayerInfoForDiskUsageEviction {
|
||||
impl Timeline {
|
||||
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, mapping) = &*guard;
|
||||
let layers = guard.layer_map();
|
||||
|
||||
let mut max_layer_size: Option<u64> = None;
|
||||
let mut resident_layers = Vec::new();
|
||||
@@ -4726,7 +4515,7 @@ impl Timeline {
|
||||
let file_size = l.file_size();
|
||||
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
||||
|
||||
let l = mapping.get_from_desc(&l);
|
||||
let l = guard.get_from_desc(&l);
|
||||
|
||||
if l.is_remote_layer() {
|
||||
continue;
|
||||
|
||||
@@ -198,10 +198,10 @@ impl Timeline {
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let guard = self.layers.read().await;
|
||||
let (layers, mapping) = &*guard;
|
||||
let layers = guard.layer_map();
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
let hist_layer = mapping.get_from_desc(&hist_layer);
|
||||
let hist_layer = guard.get_from_desc(&hist_layer);
|
||||
if hist_layer.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
|
||||
370
pageserver/src/tenant/timeline/layer_manager.rs
Normal file
370
pageserver/src/tenant/timeline/layer_manager.rs
Normal file
@@ -0,0 +1,370 @@
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tracing::trace;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::{AtomicLsn, Lsn},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
metrics::TimelineMetrics,
|
||||
tenant::{
|
||||
layer_map::{BatchedUpdates, LayerMap},
|
||||
storage_layer::{
|
||||
AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, Layer, PersistentLayer,
|
||||
PersistentLayerDesc, PersistentLayerKey, RemoteLayer,
|
||||
},
|
||||
timeline::compare_arced_layers,
|
||||
},
|
||||
};
|
||||
|
||||
/// Provides semantic APIs to manipulate the layer map.
|
||||
pub struct LayerManager {
|
||||
layer_map: LayerMap,
|
||||
layer_fmgr: LayerFileManager,
|
||||
}
|
||||
|
||||
/// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
|
||||
/// scheduling deletes in remote client.
|
||||
pub struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>);
|
||||
|
||||
impl ApplyGcResultGuard<'_> {
|
||||
pub fn flush(self) {
|
||||
self.0.flush();
|
||||
}
|
||||
}
|
||||
|
||||
impl LayerManager {
|
||||
pub fn create() -> Self {
|
||||
Self {
|
||||
layer_map: LayerMap::default(),
|
||||
layer_fmgr: LayerFileManager::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
|
||||
self.layer_fmgr.get_from_desc(desc)
|
||||
}
|
||||
|
||||
/// Get an immutable reference to the layer map.
|
||||
///
|
||||
/// We expect users only to be able to get an immutable layer map. If users want to make modifications,
|
||||
/// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
|
||||
pub fn layer_map(&self) -> &LayerMap {
|
||||
&self.layer_map
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the layer map. This function will be removed once `flush_frozen_layer`
|
||||
/// gets a refactor.
|
||||
pub fn layer_map_mut(&mut self) -> &mut LayerMap {
|
||||
&mut self.layer_map
|
||||
}
|
||||
|
||||
/// Replace layers in the layer file manager, used in evictions and layer downloads.
|
||||
pub fn replace_and_verify(
|
||||
&mut self,
|
||||
expected: Arc<dyn PersistentLayer>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
) -> Result<()> {
|
||||
self.layer_fmgr.replace_and_verify(expected, new)
|
||||
}
|
||||
|
||||
/// Called from `load_layer_map`. Initialize the layer manager with:
|
||||
/// 1. all on-disk layers
|
||||
/// 2. next open layer (with disk disk_consistent_lsn LSN)
|
||||
pub fn initialize_local_layers(
|
||||
&mut self,
|
||||
on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
|
||||
next_open_layer_at: Lsn,
|
||||
) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for layer in on_disk_layers {
|
||||
Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
updates.flush();
|
||||
self.layer_map.next_open_layer_at = Some(next_open_layer_at);
|
||||
}
|
||||
|
||||
/// Initialize when creating a new timeline, called in `init_empty_layer_map`.
|
||||
pub fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
|
||||
self.layer_map.next_open_layer_at = Some(next_open_layer_at);
|
||||
}
|
||||
|
||||
pub fn initialize_remote_layers(
|
||||
&mut self,
|
||||
corrupted_local_layers: Vec<Arc<dyn PersistentLayer>>,
|
||||
remote_layers: Vec<Arc<RemoteLayer>>,
|
||||
) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for layer in corrupted_local_layers {
|
||||
Self::remove_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
for layer in remote_layers {
|
||||
Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
|
||||
/// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
|
||||
/// called within `get_layer_for_write`.
|
||||
pub fn get_layer_for_write(
|
||||
&mut self,
|
||||
lsn: Lsn,
|
||||
last_record_lsn: Lsn,
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
) -> Result<Arc<InMemoryLayer>> {
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
ensure!(
|
||||
lsn > last_record_lsn,
|
||||
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}",
|
||||
lsn,
|
||||
last_record_lsn,
|
||||
std::backtrace::Backtrace::force_capture(),
|
||||
);
|
||||
|
||||
// Do we have a layer open for writing already?
|
||||
let layer = if let Some(open_layer) = &self.layer_map.open_layer {
|
||||
if open_layer.get_lsn_range().start > lsn {
|
||||
bail!(
|
||||
"unexpected open layer in the future: open layers starts at {}, write lsn {}",
|
||||
open_layer.get_lsn_range().start,
|
||||
lsn
|
||||
);
|
||||
}
|
||||
|
||||
Arc::clone(open_layer)
|
||||
} else {
|
||||
// No writeable layer yet. Create one.
|
||||
let start_lsn = self
|
||||
.layer_map
|
||||
.next_open_layer_at
|
||||
.context("No next open layer found")?;
|
||||
|
||||
trace!(
|
||||
"creating in-memory layer at {}/{} for record at {}",
|
||||
timeline_id,
|
||||
start_lsn,
|
||||
lsn
|
||||
);
|
||||
|
||||
let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
|
||||
let layer = Arc::new(new_layer);
|
||||
|
||||
self.layer_map.open_layer = Some(layer.clone());
|
||||
self.layer_map.next_open_layer_at = None;
|
||||
|
||||
layer
|
||||
};
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
/// Called from `freeze_inmem_layer`, returns true if successfully frozen.
|
||||
pub fn try_freeze_in_memory_layer(
|
||||
&mut self,
|
||||
Lsn(last_record_lsn): Lsn,
|
||||
last_freeze_at: &AtomicLsn,
|
||||
) {
|
||||
let end_lsn = Lsn(last_record_lsn + 1);
|
||||
|
||||
if let Some(open_layer) = &self.layer_map.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
open_layer.freeze(end_lsn);
|
||||
|
||||
// The layer is no longer open, update the layer map to reflect this.
|
||||
// We will replace it with on-disk historics below.
|
||||
self.layer_map.frozen_layers.push_back(open_layer_rc);
|
||||
self.layer_map.open_layer = None;
|
||||
self.layer_map.next_open_layer_at = Some(end_lsn);
|
||||
last_freeze_at.store(end_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
/// Add image layers to the layer map, called from `create_image_layers`.
|
||||
pub fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for layer in image_layers {
|
||||
Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
updates.flush();
|
||||
}
|
||||
|
||||
/// Insert into the layer map when a new delta layer is created, called from `create_delta_layer`.
|
||||
pub fn track_new_l0_delta_layer(&mut self, delta_layer: Arc<DeltaLayer>) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
Self::insert_historic_layer(delta_layer, &mut updates, &mut self.layer_fmgr);
|
||||
updates.flush();
|
||||
}
|
||||
|
||||
/// Called when compaction is completed.
|
||||
pub fn finish_compact_l0(
|
||||
&mut self,
|
||||
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
compact_from: Vec<Arc<dyn PersistentLayer>>,
|
||||
compact_to: Vec<Arc<dyn PersistentLayer>>,
|
||||
metrics: &TimelineMetrics,
|
||||
) -> Result<()> {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for l in compact_to {
|
||||
Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
|
||||
}
|
||||
for l in compact_from {
|
||||
// NB: the layer file identified by descriptor `l` is guaranteed to be present
|
||||
// in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
|
||||
// time, even though we dropped `Timeline::layers` inbetween.
|
||||
Self::delete_historic_layer(
|
||||
layer_removal_cs.clone(),
|
||||
l,
|
||||
&mut updates,
|
||||
metrics,
|
||||
&mut self.layer_fmgr,
|
||||
)?;
|
||||
}
|
||||
updates.flush();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map.
|
||||
pub fn finish_gc_timeline(
|
||||
&mut self,
|
||||
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
gc_layers: Vec<Arc<dyn PersistentLayer>>,
|
||||
metrics: &TimelineMetrics,
|
||||
) -> Result<ApplyGcResultGuard> {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
for doomed_layer in gc_layers {
|
||||
Self::delete_historic_layer(
|
||||
layer_removal_cs.clone(),
|
||||
doomed_layer,
|
||||
&mut updates,
|
||||
metrics,
|
||||
&mut self.layer_fmgr,
|
||||
)?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
|
||||
}
|
||||
Ok(ApplyGcResultGuard(updates))
|
||||
}
|
||||
|
||||
/// Helper function to insert a layer into the layer map and file manager.
|
||||
fn insert_historic_layer(
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
updates: &mut BatchedUpdates<'_>,
|
||||
mapping: &mut LayerFileManager,
|
||||
) {
|
||||
updates.insert_historic(layer.layer_desc().clone());
|
||||
mapping.insert(layer);
|
||||
}
|
||||
|
||||
/// Helper function to remove a layer into the layer map and file manager
|
||||
fn remove_historic_layer(
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
updates: &mut BatchedUpdates<'_>,
|
||||
mapping: &mut LayerFileManager,
|
||||
) {
|
||||
updates.remove_historic(layer.layer_desc().clone());
|
||||
mapping.remove(layer);
|
||||
}
|
||||
|
||||
/// Removes the layer from local FS (if present) and from memory.
|
||||
/// Remote storage is not affected by this operation.
|
||||
fn delete_historic_layer(
|
||||
// we cannot remove layers otherwise, since gc and compaction will race
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
updates: &mut BatchedUpdates<'_>,
|
||||
metrics: &TimelineMetrics,
|
||||
mapping: &mut LayerFileManager,
|
||||
) -> anyhow::Result<()> {
|
||||
if !layer.is_remote_layer() {
|
||||
layer.delete_resident_layer_file()?;
|
||||
let layer_file_size = layer.file_size();
|
||||
metrics.resident_physical_size_gauge.sub(layer_file_size);
|
||||
}
|
||||
|
||||
// TODO Removing from the bottom of the layer map is expensive.
|
||||
// Maybe instead discard all layer map historic versions that
|
||||
// won't be needed for page reconstruction for this timeline,
|
||||
// and mark what we can't delete yet as deleted from the layer
|
||||
// map index without actually rebuilding the index.
|
||||
updates.remove_historic(layer.layer_desc().clone());
|
||||
mapping.remove(layer);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
|
||||
HashMap<PersistentLayerKey, Arc<T>>,
|
||||
);
|
||||
|
||||
impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
|
||||
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<T> {
|
||||
// The assumption for the `expect()` is that all code maintains the following invariant:
|
||||
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
|
||||
self.0
|
||||
.get(&desc.key())
|
||||
.with_context(|| format!("get layer from desc: {}", desc.filename()))
|
||||
.expect("not found")
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn insert(&mut self, layer: Arc<T>) {
|
||||
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
|
||||
if present.is_some() && cfg!(debug_assertions) {
|
||||
panic!("overwriting a layer: {:?}", layer.layer_desc())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new() -> Self {
|
||||
Self(HashMap::new())
|
||||
}
|
||||
|
||||
pub(crate) fn remove(&mut self, layer: Arc<T>) {
|
||||
let present = self.0.remove(&layer.layer_desc().key());
|
||||
if present.is_none() && cfg!(debug_assertions) {
|
||||
panic!(
|
||||
"removing layer that is not present in layer mapping: {:?}",
|
||||
layer.layer_desc()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
|
||||
let key = expected.layer_desc().key();
|
||||
let other = new.layer_desc().key();
|
||||
|
||||
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
|
||||
let new_l0 = LayerMap::is_l0(new.layer_desc());
|
||||
|
||||
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
|
||||
"layermap-replace-notfound"
|
||||
));
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
"expected and new layer have different keys: {key:?} != {other:?}"
|
||||
);
|
||||
|
||||
anyhow::ensure!(
|
||||
expected_l0 == new_l0,
|
||||
"one layer is l0 while the other is not: {expected_l0} != {new_l0}"
|
||||
);
|
||||
|
||||
if let Some(layer) = self.0.get_mut(&key) {
|
||||
anyhow::ensure!(
|
||||
compare_arced_layers(&expected, layer),
|
||||
"another layer was found instead of expected, expected={expected:?}, new={new:?}",
|
||||
expected = Arc::as_ptr(&expected),
|
||||
new = Arc::as_ptr(layer),
|
||||
);
|
||||
*layer = new;
|
||||
Ok(())
|
||||
} else {
|
||||
anyhow::bail!("layer was not found");
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user