fix: EphemeralFiles can outlive their Timeline via enum LayerManager (#8229)

Ephemeral files clean up after themselves on drop, but they did not delay
shutdown, which led to problems when restarting the tenant. The solution is as
proposed:
- make ephemeral files carry a gate guard to delay the closing of
`Timeline::gate`
- flush in-memory layers, and drop the strong references to them, on
`Timeline::shutdown`

The above are realized by making `LayerManager` an `enum` with `Open` and
`Closed` variants; once `Closed`, requests to modify the `LayerMap` fail.
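
In outline, the new `LayerManager` looks roughly like this (a condensed sketch,
not the exact code from the diff below; `LayerMap`, `Layer` and
`PersistentLayerKey` are minimal stand-ins, and the real `shutdown` also takes
the timeline writer state):

use std::collections::HashMap;

// Minimal stand-ins so the sketch compiles on its own; the real types live in
// the pageserver's layer_map / storage_layer modules.
#[derive(Default)]
struct LayerMap;
struct Layer;
#[derive(Clone, Hash, PartialEq, Eq)]
struct PersistentLayerKey(String);

/// "layer manager has been shutdown"
#[derive(Debug)]
struct Shutdown;

#[derive(Default)]
struct OpenLayerManager {
    layer_map: LayerMap,
    layers: HashMap<PersistentLayerKey, Layer>,
}

enum LayerManager {
    // Not yet shut down: in-memory layers exist and the layer map may be modified.
    Open(OpenLayerManager),
    // Shut down: in-memory layers are gone and persistent layers are read-only.
    Closed {
        layers: HashMap<PersistentLayerKey, Layer>,
    },
}

impl LayerManager {
    // Read access to the layer map fails once the manager is closed ...
    fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
        match self {
            LayerManager::Open(open) => Ok(&open.layer_map),
            LayerManager::Closed { .. } => Err(Shutdown),
        }
    }

    // ... and so does any attempt to modify it.
    fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
        match self {
            LayerManager::Open(open) => Ok(open),
            LayerManager::Closed { .. } => Err(Shutdown),
        }
    }

    // Reads that only need the persistent-layer hashmap keep working in both
    // states (the real code additionally filters by residency).
    fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
        let layers = match self {
            LayerManager::Open(open) => &open.layers,
            LayerManager::Closed { layers } => layers,
        };
        layers.values()
    }

    // On `Timeline::shutdown` the manager transitions to `Closed`; dropping the
    // in-memory layers is what lets their ephemeral files release the gate.
    fn shutdown(&mut self) {
        if let LayerManager::Open(open) = self {
            let layers = std::mem::take(&mut open.layers);
            *self = LayerManager::Closed { layers };
        }
    }
}

fn main() {
    let mut mgr = LayerManager::Open(OpenLayerManager::default());
    assert!(mgr.layer_map().is_ok());
    assert!(mgr.open_mut().is_ok());

    mgr.shutdown();

    // After shutdown, both read and write access to the LayerMap fail ...
    assert!(mgr.layer_map().is_err());
    assert!(mgr.open_mut().is_err());
    // ... while persistent layers can still be listed.
    assert_eq!(mgr.likely_resident_layers().count(), 0);
}

Call sites either convert `Shutdown` into their own cancellation/shutdown error
via the `From` impls added in this commit, or `expect()` openness in contexts
(initial load, tests) where shutdown cannot have happened yet.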

Additionally:

- fix too eager anyhow conversions in compaction
- unify how we freeze layers and handle errors
- optimize likely_resident_layers to read LayerFileManager hashmap
values instead of bouncing through LayerMap

Fixes: #7830
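
For context, the gate behind the first bullet acts like a refcount on shutdown:
`Timeline::gate.close()` cannot complete while any `GateGuard` is alive, and
ephemeral files now hold one for their entire lifetime (see the
`ephemeral_file_holds_gate_open` test below). A self-contained analogue of that
behaviour, built on plain tokio channels rather than the real
`utils::sync::gate` API:

use std::time::Duration;
use tokio::sync::mpsc;

/// Illustrative stand-in for a shutdown gate: `enter()` hands out a guard and
/// `close()` only completes once every guard has been dropped. This is not the
/// real `Gate`, just an analogue of the pattern.
struct MiniGate {
    tx: mpsc::Sender<()>,
    rx: mpsc::Receiver<()>,
}

struct MiniGateGuard {
    // Dropping this clone of the sender is what "releases" the gate.
    _tx: mpsc::Sender<()>,
}

impl MiniGate {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel(1);
        MiniGate { tx, rx }
    }

    fn enter(&self) -> MiniGateGuard {
        MiniGateGuard { _tx: self.tx.clone() }
    }

    async fn close(mut self) {
        // Drop our own sender, then wait until every guard (sender clone) is gone.
        drop(self.tx);
        while self.rx.recv().await.is_some() {}
    }
}

/// Stand-in for a resource that, like `EphemeralFile` after this change, owns a
/// guard for its whole lifetime and does its cleanup (file deletion) on drop.
struct FileLikeResource {
    _guard: MiniGateGuard,
}

#[tokio::main]
async fn main() {
    let gate = MiniGate::new();
    let file = FileLikeResource { _guard: gate.enter() };

    let mut closing = tokio::spawn(gate.close());

    // While the "file" is alive, closing the gate cannot complete.
    assert!(tokio::time::timeout(Duration::from_millis(50), &mut closing)
        .await
        .is_err());

    // Dropping the resource releases its guard, and close completes promptly.
    drop(file);
    closing.await.unwrap();
}

With the guard owned by the file-like resource, dropping the resource is what
finally lets the close future resolve, which is why `Timeline::shutdown` now
drops the in-memory layers (and with them the ephemeral files) before waiting
on the gate.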
Joonas Koivunen, 2024-08-07 17:50:09 +03:00, committed by John Spray
Parent: 1f73dfb842
Commit: c0776b8724
16 changed files with 505 additions and 306 deletions

View File

@@ -78,8 +78,9 @@ impl Drop for GateGuard {
}
}
#[derive(Debug)]
#[derive(Debug, thiserror::Error)]
pub enum GateError {
#[error("gate is closed")]
GateClosed,
}

View File

@@ -61,7 +61,11 @@ async fn ingest(
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &ctx).await?;
let gate = utils::sync::gate::Gate::default();
let entered = gate.enter().unwrap();
let layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, entered, &ctx).await?;
let data = Value::Image(Bytes::from(vec![0u8; put_size])).ser()?;
let ctx = RequestContext::new(

View File

@@ -1162,7 +1162,10 @@ async fn layer_map_info_handler(
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let layer_map_info = timeline.layer_map_info(reset).await;
let layer_map_info = timeline
.layer_map_info(reset)
.await
.map_err(|_shutdown| ApiError::ShuttingDown)?;
json_response(StatusCode::OK, layer_map_info)
}

View File

@@ -601,6 +601,12 @@ impl From<PageReconstructError> for GcError {
}
}
impl From<timeline::layer_manager::Shutdown> for GcError {
fn from(_: timeline::layer_manager::Shutdown) -> Self {
GcError::TimelineCancelled
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum LoadConfigError {
#[error("TOML deserialization error: '{0}'")]
@@ -710,6 +716,7 @@ impl Tenant {
.read()
.await
.layer_map()
.expect("currently loading, layer manager cannot be shutdown already")
.iter_historic_layers()
.next()
.is_some(),
@@ -4674,10 +4681,10 @@ mod tests {
let layer_map = tline.layers.read().await;
let level0_deltas = layer_map
.layer_map()
.get_level0_deltas()
.into_iter()
.map(|desc| layer_map.get_from_desc(&desc))
.layer_map()?
.level0_deltas()
.iter()
.map(|desc| layer_map.get_from_desc(desc))
.collect::<Vec<_>>();
assert!(!level0_deltas.is_empty());
@@ -4908,11 +4915,13 @@ mod tests {
let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let guard = tline.layers.read().await;
guard.layer_map().dump(true, &ctx).await?;
let lm = guard.layer_map()?;
lm.dump(true, &ctx).await?;
let mut reads = Vec::new();
let mut prev = None;
guard.layer_map().iter_historic_layers().for_each(|desc| {
lm.iter_historic_layers().for_each(|desc| {
if !desc.is_delta() {
prev = Some(desc.clone());
return;
@@ -5918,23 +5927,12 @@ mod tests {
tline.freeze_and_flush().await?; // force create a delta layer
}
let before_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()
.len();
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();
tline.compact(&cancel, EnumSet::empty(), &ctx).await?;
let after_num_l0_delta_files = tline
.layers
.read()
.await
.layer_map()
.get_level0_deltas()
.len();
let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
assert!(after_num_l0_delta_files < before_num_l0_delta_files, "after_num_l0_delta_files={after_num_l0_delta_files}, before_num_l0_delta_files={before_num_l0_delta_files}");

View File

@@ -29,6 +29,7 @@ impl EphemeralFile {
conf: &PageServerConf,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
) -> Result<EphemeralFile, io::Error> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
@@ -51,10 +52,12 @@ impl EphemeralFile {
)
.await?;
let prewarm = conf.l0_flush.prewarm_on_write();
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()),
rw: page_caching::RW::new(file, prewarm, gate_guard),
})
}
@@ -161,7 +164,11 @@ mod tests {
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;
let gate = utils::sync::gate::Gate::default();
let entered = gate.enter().unwrap();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?;
let pos_foo = file.write_blob(b"foo", &ctx).await?;
assert_eq!(
@@ -215,4 +222,38 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn ephemeral_file_holds_gate_open() {
const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
let (conf, tenant_id, timeline_id, ctx) =
harness("ephemeral_file_holds_gate_open").unwrap();
let gate = utils::sync::gate::Gate::default();
let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
.await
.unwrap();
let mut closing = tokio::task::spawn(async move {
gate.close().await;
});
// gate is entered until the ephemeral file is dropped
// do not start paused: tokio-epoll-uring has a sleep loop
tokio::time::pause();
tokio::time::timeout(FOREVER, &mut closing)
.await
.expect_err("closing cannot complete before dropping");
// this is a requirement of the reset_tenant functionality: we have to be able to restart a
// tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate
drop(file);
tokio::time::timeout(FOREVER, &mut closing)
.await
.expect("closing completes right away")
.expect("closing does not panic");
}
}

View File

@@ -18,6 +18,8 @@ use super::zero_padded_read_write;
pub struct RW {
page_cache_file_id: page_cache::FileId,
rw: super::zero_padded_read_write::RW<PreWarmingWriter>,
/// Gate guard is held on as long as we need to do operations in the path (delete on drop).
_gate_guard: utils::sync::gate::GateGuard,
}
/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`],
@@ -29,7 +31,11 @@ pub enum PrewarmOnWrite {
}
impl RW {
pub fn new(file: VirtualFile, prewarm_on_write: PrewarmOnWrite) -> Self {
pub fn new(
file: VirtualFile,
prewarm_on_write: PrewarmOnWrite,
_gate_guard: utils::sync::gate::GateGuard,
) -> Self {
let page_cache_file_id = page_cache::next_file_id();
Self {
page_cache_file_id,
@@ -38,6 +44,7 @@ impl RW {
file,
prewarm_on_write,
)),
_gate_guard,
}
}
@@ -145,6 +152,7 @@ impl Drop for RW {
// We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
// unlink the file
// we are clear to do this, because we have entered a gate
let res = std::fs::remove_file(&self.rw.as_writer().file.path);
if let Err(e) = res {
if e.kind() != std::io::ErrorKind::NotFound {

View File

@@ -846,8 +846,8 @@ impl LayerMap {
}
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Vec<Arc<PersistentLayerDesc>> {
self.l0_delta_layers.to_vec()
pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
&self.l0_delta_layers
}
/// debugging function to print out the contents of the layer map

View File

@@ -1767,14 +1767,9 @@ impl TenantManager {
let parent_timelines = timelines.keys().cloned().collect::<Vec<_>>();
for timeline in timelines.values() {
tracing::info!(timeline_id=%timeline.timeline_id, "Loading list of layers to hardlink");
let timeline_layers = timeline
.layers
.read()
.await
.likely_resident_layers()
.collect::<Vec<_>>();
let layers = timeline.layers.read().await;
for layer in timeline_layers {
for layer in layers.likely_resident_layers() {
let relative_path = layer
.local_path()
.strip_prefix(&parent_path)

View File

@@ -1957,6 +1957,7 @@ pub(crate) mod test {
.await
.likely_resident_layers()
.next()
.cloned()
.unwrap();
{
@@ -2031,7 +2032,8 @@ pub(crate) mod test {
.read()
.await
.likely_resident_layers()
.find(|x| x != &initdb_layer)
.find(|&x| x != &initdb_layer)
.cloned()
.unwrap();
// create a copy for the timeline, so we don't overwrite the file

View File

@@ -385,11 +385,13 @@ impl InMemoryLayer {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_lsn: Lsn,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, ctx).await?;
let file =
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
Ok(InMemoryLayer {

View File

@@ -39,7 +39,7 @@ async fn smoke_test() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -176,7 +176,7 @@ async fn smoke_test() {
{
let layers = &[layer];
let mut g = timeline.layers.write().await;
g.finish_gc_timeline(layers);
g.open_mut().unwrap().finish_gc_timeline(layers);
// this just updates the remote_physical_size for demonstration purposes
rtc.schedule_gc_update(layers).unwrap();
}
@@ -216,7 +216,7 @@ async fn evict_and_wait_on_wanted_deleted() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -260,7 +260,7 @@ async fn evict_and_wait_on_wanted_deleted() {
// the deletion of the layer in remote_storage happens.
{
let mut layers = timeline.layers.write().await;
layers.finish_gc_timeline(&[layer]);
layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
}
SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
@@ -301,7 +301,7 @@ fn read_wins_pending_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -433,7 +433,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -602,7 +602,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -682,7 +682,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
layers.likely_resident_layers().collect::<Vec<_>>()
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
assert_eq!(layers.len(), 1);
@@ -801,9 +801,9 @@ async fn eviction_cancellation_on_drop() {
let (evicted_layer, not_evicted) = {
let mut layers = {
let mut guard = timeline.layers.write().await;
let layers = guard.likely_resident_layers().collect::<Vec<_>>();
let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
// remove the layers from layermap
guard.finish_gc_timeline(&layers);
guard.open_mut().unwrap().finish_gc_timeline(&layers);
layers
};

View File

@@ -527,6 +527,12 @@ pub(crate) enum PageReconstructError {
MissingKey(MissingKeyError),
}
impl From<layer_manager::Shutdown> for PageReconstructError {
fn from(_: layer_manager::Shutdown) -> Self {
PageReconstructError::Cancelled
}
}
impl GetVectoredError {
#[cfg(test)]
pub(crate) fn is_missing_key_error(&self) -> bool {
@@ -534,6 +540,12 @@ impl GetVectoredError {
}
}
impl From<layer_manager::Shutdown> for GetVectoredError {
fn from(_: layer_manager::Shutdown) -> Self {
GetVectoredError::Cancelled
}
}
pub struct MissingKeyError {
key: Key,
shard: ShardNumber,
@@ -597,6 +609,12 @@ pub(crate) enum CreateImageLayersError {
Other(#[from] anyhow::Error),
}
impl From<layer_manager::Shutdown> for CreateImageLayersError {
fn from(_: layer_manager::Shutdown) -> Self {
CreateImageLayersError::Cancelled
}
}
#[derive(thiserror::Error, Debug, Clone)]
pub(crate) enum FlushLayerError {
/// Timeline cancellation token was cancelled
@@ -634,6 +652,12 @@ impl FlushLayerError {
}
}
impl From<layer_manager::Shutdown> for FlushLayerError {
fn from(_: layer_manager::Shutdown) -> Self {
FlushLayerError::Cancelled
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetVectoredError {
#[error("timeline shutting down")]
@@ -1198,12 +1222,7 @@ impl Timeline {
/// Hence, the result **does not represent local filesystem usage**.
pub(crate) async fn layer_size_sum(&self) -> u64 {
let guard = self.layers.read().await;
let layer_map = guard.layer_map();
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size;
}
size
guard.layer_size_sum()
}
pub(crate) fn resident_physical_size(&self) -> u64 {
@@ -1370,16 +1389,15 @@ impl Timeline {
// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
let to_lsn = {
let token = {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let mut g = self.write_lock.lock().await;
let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn, &mut g).await;
to_lsn
self.freeze_inmem_layer_at(to_lsn, &mut g).await?
};
self.flush_frozen_layers_and_wait(to_lsn).await
self.wait_flush_completion(token).await
}
// Check if an open ephemeral layer should be closed: this provides
@@ -1393,12 +1411,20 @@ impl Timeline {
return;
};
// FIXME: why not early exit? because before #7927 the state would have been cleared every
// time, and this was missed.
// if write_guard.is_none() { return; }
let Ok(layers_guard) = self.layers.try_read() else {
// Don't block if the layer lock is busy
return;
};
let Some(open_layer) = &layers_guard.layer_map().open_layer else {
let Ok(lm) = layers_guard.layer_map() else {
return;
};
let Some(open_layer) = &lm.open_layer else {
// If there is no open layer, we have no layer freezing to do. However, we might need to generate
// some updates to disk_consistent_lsn and remote_consistent_lsn, in case we ingested some WAL regions
// that didn't result in writes to this shard.
@@ -1424,9 +1450,16 @@ impl Timeline {
);
// The flush loop will update remote consistent LSN as well as disk consistent LSN.
self.flush_frozen_layers_and_wait(last_record_lsn)
.await
.ok();
// We know there is no open layer, so we can request freezing without actually
// freezing anything. This is true even if we have dropped the layers_guard, we
// still hold the write_guard.
let _ = async {
let token = self
.freeze_inmem_layer_at(last_record_lsn, &mut write_guard)
.await?;
self.wait_flush_completion(token).await
}
.await;
}
}
@@ -1464,33 +1497,26 @@ impl Timeline {
self.last_freeze_at.load(),
open_layer.get_opened_at(),
) {
let at_lsn = match open_layer.info() {
match open_layer.info() {
InMemoryLayerInfo::Frozen { lsn_start, lsn_end } => {
// We may reach this point if the layer was already frozen but not yet flushed: flushing
// happens asynchronously in the background.
tracing::debug!(
"Not freezing open layer, it's already frozen ({lsn_start}..{lsn_end})"
);
None
}
InMemoryLayerInfo::Open { .. } => {
// Upgrade to a write lock and freeze the layer
drop(layers_guard);
let mut layers_guard = self.layers.write().await;
let froze = layers_guard
.try_freeze_in_memory_layer(
current_lsn,
&self.last_freeze_at,
&mut write_guard,
)
let res = self
.freeze_inmem_layer_at(current_lsn, &mut write_guard)
.await;
Some(current_lsn).filter(|_| froze)
}
};
if let Some(lsn) = at_lsn {
let res: Result<u64, _> = self.flush_frozen_layers(lsn);
if let Err(e) = res {
tracing::info!("failed to flush frozen layer after background freeze: {e:#}");
if let Err(e) = res {
tracing::info!(
"failed to flush frozen layer after background freeze: {e:#}"
);
}
}
}
}
@@ -1644,6 +1670,11 @@ impl Timeline {
// about corner cases like s3 suddenly hanging up?
self.remote_client.shutdown().await;
}
Err(FlushLayerError::Cancelled) => {
// this is likely the second shutdown, ignore silently.
// TODO: this can be removed once https://github.com/neondatabase/neon/issues/5080
debug_assert!(self.cancel.is_cancelled());
}
Err(e) => {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
@@ -1662,6 +1693,7 @@ impl Timeline {
// Transition the remote_client into a state where it's only useful for timeline deletion.
// (The deletion use case is why we can't just hook up remote_client to Self::cancel).)
self.remote_client.stop();
// As documented in remote_client.stop()'s doc comment, it's our responsibility
// to shut down the upload queue tasks.
// TODO: fix that, task management should be encapsulated inside remote_client.
@@ -1672,10 +1704,17 @@ impl Timeline {
)
.await;
// TODO: work toward making this a no-op. See this funciton's doc comment for more context.
// TODO: work toward making this a no-op. See this function's doc comment for more context.
tracing::debug!("Waiting for tasks...");
task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await;
{
// Allow any remaining in-memory layers to do cleanup -- until that, they hold the gate
// open.
let mut write_guard = self.write_lock.lock().await;
self.layers.write().await.shutdown(&mut write_guard);
}
// Finally wait until any gate-holders are complete.
//
// TODO: once above shutdown_tasks is a no-op, we can close the gate before calling shutdown_tasks
@@ -1769,9 +1808,12 @@ impl Timeline {
}
}
pub(crate) async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
pub(crate) async fn layer_map_info(
&self,
reset: LayerAccessStatsReset,
) -> Result<LayerMapInfo, layer_manager::Shutdown> {
let guard = self.layers.read().await;
let layer_map = guard.layer_map();
let layer_map = guard.layer_map()?;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1780,16 +1822,15 @@ impl Timeline {
in_memory_layers.push(frozen_layer.info());
}
let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() {
let historic_layer = guard.get_from_desc(&historic_layer);
historic_layers.push(historic_layer.info(reset));
}
let historic_layers = layer_map
.iter_historic_layers()
.map(|desc| guard.get_from_desc(&desc).info(reset))
.collect();
LayerMapInfo {
Ok(LayerMapInfo {
in_memory_layers,
historic_layers,
}
})
}
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
@@ -1797,7 +1838,7 @@ impl Timeline {
&self,
layer_file_name: &LayerName,
) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name).await else {
let Some(layer) = self.find_layer(layer_file_name).await? else {
return Ok(None);
};
@@ -1818,7 +1859,7 @@ impl Timeline {
.enter()
.map_err(|_| anyhow::anyhow!("Shutting down"))?;
let Some(local_layer) = self.find_layer(layer_file_name).await else {
let Some(local_layer) = self.find_layer(layer_file_name).await? else {
return Ok(None);
};
@@ -2304,7 +2345,10 @@ impl Timeline {
let mut layers = self.layers.try_write().expect(
"in the context where we call this function, no other task has access to the object",
);
layers.initialize_empty(Lsn(start_lsn.0));
layers
.open_mut()
.expect("in this context the LayerManager must still be open")
.initialize_empty(Lsn(start_lsn.0));
}
/// Scan the timeline directory, cleanup, populate the layer map, and schedule uploads for local-only
@@ -2436,7 +2480,10 @@ impl Timeline {
let num_layers = loaded_layers.len();
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
guard
.open_mut()
.expect("layermanager must be open during init")
.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
self.remote_client
.schedule_layer_file_deletion(&needs_cleanup)?;
@@ -2471,7 +2518,7 @@ impl Timeline {
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
drop(guard); // drop write lock, update_layer_visibility will take a read lock.
self.update_layer_visibility().await;
self.update_layer_visibility().await?;
info!(
"loaded layer map with {} layers at {}, total physical size: {}",
@@ -2893,16 +2940,17 @@ impl Timeline {
}
}
async fn find_layer(&self, layer_name: &LayerName) -> Option<Layer> {
async fn find_layer(
&self,
layer_name: &LayerName,
) -> Result<Option<Layer>, layer_manager::Shutdown> {
let guard = self.layers.read().await;
for historic_layer in guard.layer_map().iter_historic_layers() {
let historic_layer_name = historic_layer.layer_name();
if layer_name == &historic_layer_name {
return Some(guard.get_from_desc(&historic_layer));
}
}
None
let layer = guard
.layer_map()?
.iter_historic_layers()
.find(|l| &l.layer_name() == layer_name)
.map(|found| guard.get_from_desc(&found));
Ok(layer)
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
@@ -2953,6 +3001,7 @@ impl Timeline {
}
impl Timeline {
#[allow(unknown_lints)] // doc_lazy_continuation is still a new lint
#[allow(clippy::doc_lazy_continuation)]
/// Get the data needed to reconstruct all keys in the provided keyspace
///
@@ -3104,7 +3153,7 @@ impl Timeline {
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read().await;
let layers = guard.layer_map();
let layers = guard.layer_map()?;
let in_memory_layer = layers.find_in_memory_layer(|l| {
let start_lsn = l.get_lsn_range().start;
@@ -3256,22 +3305,35 @@ impl Timeline {
}
}
/// Returns a non-frozen open in-memory layer for ingestion.
///
/// Get a handle to the latest layer for appending.
///
/// Takes a witness of timeline writer state lock being held, because it makes no sense to call
/// this function without holding the mutex.
async fn get_layer_for_write(
&self,
lsn: Lsn,
_guard: &tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let gate_guard = self.gate.enter().context("enter gate for inmem layer")?;
let last_record_lsn = self.get_last_record_lsn();
ensure!(
lsn > last_record_lsn,
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
lsn,
last_record_lsn,
);
let layer = guard
.open_mut()?
.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_shard_id,
gate_guard,
ctx,
)
.await?;
@@ -3285,21 +3347,48 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
/// Freeze any existing open in-memory layer and unconditionally notify the flush loop.
///
/// Unconditional flush loop notification is given because in sharded cases we will want to
/// leave an Lsn gap. Unsharded tenants do not have Lsn gaps.
async fn freeze_inmem_layer_at(
&self,
at: Lsn,
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) {
) -> Result<u64, FlushLayerError> {
let frozen = {
let mut guard = self.layers.write().await;
guard
.open_mut()?
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock)
.await
};
if frozen {
let now = Instant::now();
*(self.last_freeze_ts.write().unwrap()) = now;
}
// Increment the flush cycle counter and wake up the flush task.
// Remember the new value, so that when we listen for the flush
// to finish, we know when the flush that we initiated has
// finished, instead of some other flush that was started earlier.
let mut my_flush_request = 0;
let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
return Err(FlushLayerError::NotRunning(flush_loop_state));
}
self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
*lsn = std::cmp::max(at, *lsn);
});
assert_ne!(my_flush_request, 0);
Ok(my_flush_request)
}
/// Layer flusher task's main loop.
@@ -3336,7 +3425,11 @@ impl Timeline {
let layer_to_flush = {
let guard = self.layers.read().await;
guard.layer_map().frozen_layers.front().cloned()
let Ok(lm) = guard.layer_map() else {
info!("dropping out of flush loop for timeline shutdown");
return;
};
lm.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
let Some(layer_to_flush) = layer_to_flush else {
@@ -3393,34 +3486,7 @@ impl Timeline {
}
}
/// Request the flush loop to write out all frozen layers up to `at_lsn` as Delta L0 files to disk.
/// The caller is responsible for the freezing, e.g., [`Self::freeze_inmem_layer_at`].
///
/// `at_lsn` may be higher than the highest LSN of a frozen layer: if this is the
/// case, it means no data will be written between the top of the highest frozen layer and
/// to_lsn, e.g. because this tenant shard has ingested up to to_lsn and not written any data
/// locally for that part of the WAL.
fn flush_frozen_layers(&self, at_lsn: Lsn) -> Result<u64, FlushLayerError> {
// Increment the flush cycle counter and wake up the flush task.
// Remember the new value, so that when we listen for the flush
// to finish, we know when the flush that we initiated has
// finished, instead of some other flush that was started earlier.
let mut my_flush_request = 0;
let flush_loop_state = { *self.flush_loop_state.lock().unwrap() };
if !matches!(flush_loop_state, FlushLoopState::Running { .. }) {
return Err(FlushLayerError::NotRunning(flush_loop_state));
}
self.layer_flush_start_tx.send_modify(|(counter, lsn)| {
my_flush_request = *counter + 1;
*counter = my_flush_request;
*lsn = std::cmp::max(at_lsn, *lsn);
});
Ok(my_flush_request)
}
/// Waits any flush request created by [`Self::freeze_inmem_layer_at`] to complete.
async fn wait_flush_completion(&self, request: u64) -> Result<(), FlushLayerError> {
let mut rx = self.layer_flush_done_tx.subscribe();
loop {
@@ -3453,11 +3519,6 @@ impl Timeline {
}
}
async fn flush_frozen_layers_and_wait(&self, at_lsn: Lsn) -> Result<(), FlushLayerError> {
let token = self.flush_frozen_layers(at_lsn)?;
self.wait_flush_completion(token).await
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
///
/// Return value is the last lsn (inclusive) of the layer that was frozen.
@@ -3594,11 +3655,11 @@ impl Timeline {
{
let mut guard = self.layers.write().await;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
}
guard.finish_flush_l0_layer(delta_layer_to_add.as_ref(), &frozen_layer, &self.metrics);
guard.open_mut()?.finish_flush_l0_layer(
delta_layer_to_add.as_ref(),
&frozen_layer,
&self.metrics,
);
if self.set_disk_consistent_lsn(disk_consistent_lsn) {
// Schedule remote uploads that will reflect our new disk_consistent_lsn
@@ -3806,7 +3867,9 @@ impl Timeline {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read().await;
let layers = guard.layer_map();
let Ok(layers) = guard.layer_map() else {
return false;
};
let mut max_deltas = 0;
for part_range in &partition.ranges {
@@ -4214,13 +4277,16 @@ impl Timeline {
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right
// now they are being scheduled outside of write lock
guard.track_new_image_layers(&image_layers, &self.metrics);
// now they are being scheduled outside of write lock; current way is inconsistent with
// compaction lock order.
guard
.open_mut()?
.track_new_image_layers(&image_layers, &self.metrics);
drop_wlock(guard);
timer.stop_and_record();
// Creating image layers may have caused some previously visible layers to be covered
self.update_layer_visibility().await;
self.update_layer_visibility().await?;
Ok(image_layers)
}
@@ -4379,6 +4445,12 @@ impl CompactionError {
}
}
impl From<layer_manager::Shutdown> for CompactionError {
fn from(_: layer_manager::Shutdown) -> Self {
CompactionError::ShuttingDown
}
}
#[serde_as]
#[derive(serde::Serialize)]
struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration);
@@ -4484,11 +4556,14 @@ impl Timeline {
.collect();
if !new_images.is_empty() {
guard.track_new_image_layers(new_images, &self.metrics);
guard
.open_mut()?
.track_new_image_layers(new_images, &self.metrics);
}
// deletion will happen later, the layer file manager calls garbage_collect_on_drop
guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
guard
.open_mut()?
.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
@@ -4502,7 +4577,7 @@ impl Timeline {
self: &Arc<Self>,
mut replace_layers: Vec<(Layer, ResidentLayer)>,
mut drop_layers: Vec<Layer>,
) -> Result<(), super::upload_queue::NotInitialized> {
) -> Result<(), CompactionError> {
let mut guard = self.layers.write().await;
// Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
@@ -4510,7 +4585,9 @@ impl Timeline {
replace_layers.retain(|(l, _)| guard.contains(l));
drop_layers.retain(|l| guard.contains(l));
guard.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);
guard
.open_mut()?
.rewrite_layers(&replace_layers, &drop_layers, &self.metrics);
let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
@@ -4799,7 +4876,7 @@ impl Timeline {
//
// TODO holding a write lock is too aggressive and avoidable
let mut guard = self.layers.write().await;
let layers = guard.layer_map();
let layers = guard.layer_map()?;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4927,7 +5004,7 @@ impl Timeline {
}
})?;
guard.finish_gc_timeline(&gc_layers);
guard.open_mut()?.finish_gc_timeline(&gc_layers);
#[cfg(feature = "testing")]
{
@@ -5083,9 +5160,13 @@ impl Timeline {
let remaining = {
let guard = self.layers.read().await;
guard
.layer_map()
.iter_historic_layers()
let Ok(lm) = guard.layer_map() else {
// technically here we could look into iterating accessible layers, but downloading
// all layers of a shutdown timeline makes no sense regardless.
tracing::info!("attempted to download all layers of shutdown timeline");
return;
};
lm.iter_historic_layers()
.map(|desc| guard.get_from_desc(&desc))
.collect::<Vec<_>>()
};
@@ -5195,7 +5276,7 @@ impl Timeline {
let last_activity_ts = layer.latest_activity();
EvictionCandidate {
layer: layer.into(),
layer: layer.to_owned().into(),
last_activity_ts,
relative_last_activity: finite_f32::FiniteF32::ZERO,
}
@@ -5280,7 +5361,7 @@ impl Timeline {
{
let mut guard = self.layers.write().await;
guard.force_insert_layer(image_layer);
guard.open_mut().unwrap().force_insert_layer(image_layer);
}
Ok(())
@@ -5324,7 +5405,7 @@ impl Timeline {
}
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
for layer in guard.layer_map()?.iter_historic_layers() {
if layer.is_delta()
&& overlaps_with(&layer.lsn_range, &deltas.lsn_range)
&& layer.lsn_range != deltas.lsn_range
@@ -5354,7 +5435,7 @@ impl Timeline {
{
let mut guard = self.layers.write().await;
guard.force_insert_layer(delta_layer);
guard.open_mut().unwrap().force_insert_layer(delta_layer);
}
Ok(())
@@ -5369,7 +5450,7 @@ impl Timeline {
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default();
@@ -5397,7 +5478,7 @@ impl Timeline {
) -> anyhow::Result<Vec<super::storage_layer::PersistentLayerKey>> {
let mut layers = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map().iter_historic_layers() {
for layer in guard.layer_map()?.iter_historic_layers() {
layers.push(layer.key());
}
Ok(layers)
@@ -5414,7 +5495,7 @@ impl Timeline {
/// Tracking writes ingestion does to a particular in-memory layer.
///
/// Cleared upon freezing a layer.
struct TimelineWriterState {
pub(crate) struct TimelineWriterState {
open_layer: Arc<InMemoryLayer>,
current_size: u64,
// Previous Lsn which passed through
@@ -5522,7 +5603,10 @@ impl<'a> TimelineWriter<'a> {
}
async fn open_layer(&mut self, at: Lsn, ctx: &RequestContext) -> anyhow::Result<()> {
let layer = self.tl.get_layer_for_write(at, ctx).await?;
let layer = self
.tl
.get_layer_for_write(at, &self.write_guard, ctx)
.await?;
let initial_size = layer.size().await?;
let last_freeze_at = self.last_freeze_at.load();
@@ -5535,15 +5619,15 @@ impl<'a> TimelineWriter<'a> {
Ok(())
}
async fn roll_layer(&mut self, freeze_at: Lsn) -> anyhow::Result<()> {
async fn roll_layer(&mut self, freeze_at: Lsn) -> Result<(), FlushLayerError> {
let current_size = self.write_guard.as_ref().unwrap().current_size;
// self.write_guard will be taken by the freezing
self.tl
.freeze_inmem_layer_at(freeze_at, &mut self.write_guard)
.await;
.await?;
self.tl.flush_frozen_layers(freeze_at)?;
assert!(self.write_guard.is_none());
if current_size >= self.get_checkpoint_distance() * 2 {
warn!("Flushed oversized open layer with size {}", current_size)
@@ -5708,6 +5792,7 @@ mod tests {
let layers = timeline.layers.read().await;
let desc = layers
.layer_map()
.unwrap()
.iter_historic_layers()
.next()
.expect("must find one layer to evict");

View File

@@ -371,7 +371,7 @@ impl Timeline {
);
let layers = self.layers.read().await;
for layer_desc in layers.layer_map().iter_historic_layers() {
for layer_desc in layers.layer_map()?.iter_historic_layers() {
let layer = layers.get_from_desc(&layer_desc);
if layer.metadata().shard.shard_count == self.shard_identity.count {
// This layer does not belong to a historic ancestor, no need to re-image it.
@@ -549,7 +549,9 @@ impl Timeline {
///
/// The result may be used as an input to eviction and secondary downloads to de-prioritize layers
/// that we know won't be needed for reads.
pub(super) async fn update_layer_visibility(&self) {
pub(super) async fn update_layer_visibility(
&self,
) -> Result<(), super::layer_manager::Shutdown> {
let head_lsn = self.get_last_record_lsn();
// We will sweep through layers in reverse-LSN order. We only do historic layers. L0 deltas
@@ -557,7 +559,7 @@ impl Timeline {
// Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
// they will be subject to L0->L1 compaction in the near future.
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map();
let layer_map = layer_manager.layer_map()?;
let readable_points = {
let children = self.gc_info.read().unwrap().retain_lsns.clone();
@@ -580,6 +582,7 @@ impl Timeline {
// TODO: publish our covered KeySpace to our parent, so that when they update their visibility, they can
// avoid assuming that everything at a branch point is visible.
drop(covered);
Ok(())
}
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
@@ -633,12 +636,8 @@ impl Timeline {
) -> Result<CompactLevel0Phase1Result, CompactionError> {
stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller
let layers = guard.layer_map();
let level0_deltas = layers.get_level0_deltas();
let mut level0_deltas = level0_deltas
.into_iter()
.map(|x| guard.get_from_desc(&x))
.collect_vec();
let layers = guard.layer_map()?;
let level0_deltas = layers.level0_deltas();
stats.level0_deltas_count = Some(level0_deltas.len());
// Only compact if enough layers have accumulated.
@@ -651,6 +650,11 @@ impl Timeline {
return Ok(CompactLevel0Phase1Result::default());
}
let mut level0_deltas = level0_deltas
.iter()
.map(|x| guard.get_from_desc(x))
.collect::<Vec<_>>();
// Gather the files to compact in this iteration.
//
// Start with the oldest Level 0 delta file, and collect any other
@@ -1407,10 +1411,9 @@ impl Timeline {
// Find the top of the historical layers
let end_lsn = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let layers = guard.layer_map()?;
let l0_deltas = layers.get_level0_deltas();
drop(guard);
let l0_deltas = layers.level0_deltas();
// As an optimization, if we find that there are too few L0 layers,
// bail out early. We know that the compaction algorithm would do
@@ -1782,7 +1785,7 @@ impl Timeline {
// 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
let (layer_selection, gc_cutoff, retain_lsns_below_horizon) = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
let layers = guard.layer_map()?;
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = gc_info.cutoffs.select_min();
@@ -2216,7 +2219,9 @@ impl Timeline {
// Step 3: Place back to the layer map.
{
let mut guard = self.layers.write().await;
guard.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
guard
.open_mut()?
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics)
};
self.remote_client
.schedule_compaction_update(&layer_selection, &compact_to)?;
@@ -2296,7 +2301,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
self.flush_updates().await?;
let guard = self.timeline.layers.read().await;
let layer_map = guard.layer_map();
let layer_map = guard.layer_map()?;
let result = layer_map
.iter_historic_layers()

View File

@@ -74,6 +74,11 @@ impl From<crate::tenant::upload_queue::NotInitialized> for Error {
Error::ShuttingDown
}
}
impl From<super::layer_manager::Shutdown> for Error {
fn from(_: super::layer_manager::Shutdown) -> Self {
Error::ShuttingDown
}
}
impl From<FlushLayerError> for Error {
fn from(value: FlushLayerError) -> Self {
@@ -277,7 +282,7 @@ pub(super) async fn prepare(
// between retries, these can change if compaction or gc ran in between. this will mean
// we have to redo work.
partition_work(ancestor_lsn, &layers)
partition_work(ancestor_lsn, &layers)?
};
// TODO: layers are already sorted by something: use that to determine how much of remote
@@ -383,14 +388,14 @@ pub(super) async fn prepare(
fn partition_work(
ancestor_lsn: Lsn,
source_layermap: &LayerManager,
) -> (usize, Vec<Layer>, Vec<Layer>) {
source: &LayerManager,
) -> Result<(usize, Vec<Layer>, Vec<Layer>), Error> {
let mut straddling_branchpoint = vec![];
let mut rest_of_historic = vec![];
let mut later_by_lsn = 0;
for desc in source_layermap.layer_map().iter_historic_layers() {
for desc in source.layer_map()?.iter_historic_layers() {
// off by one chances here:
// - start is inclusive
// - end is exclusive
@@ -409,10 +414,10 @@ fn partition_work(
&mut rest_of_historic
};
target.push(source_layermap.get_from_desc(&desc));
target.push(source.get_from_desc(&desc));
}
(later_by_lsn, straddling_branchpoint, rest_of_historic)
Ok((later_by_lsn, straddling_branchpoint, rest_of_historic))
}
async fn upload_rewritten_layer(

View File

@@ -213,51 +213,45 @@ impl Timeline {
let mut js = tokio::task::JoinSet::new();
{
let guard = self.layers.read().await;
let layers = guard.layer_map();
for layer in layers.iter_historic_layers() {
let layer = guard.get_from_desc(&layer);
// guard against eviction while we inspect it; it might be that eviction_task and
// disk_usage_eviction_task both select the same layers to be evicted, and
// seemingly free up double the space. both succeeding is of no consequence.
guard
.likely_resident_layers()
.filter(|layer| {
let last_activity_ts = layer.latest_activity();
if !layer.is_likely_resident() {
continue;
}
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,
Err(_e) => {
// We reach here if `now` < `last_activity_ts`, which can legitimately
// happen if there is an access between us getting `now`, and us getting
// the access stats from the layer.
//
// The other reason why it can happen is system clock skew because
// SystemTime::now() is not monotonic, so, even if there is no access
// to the layer after we get `now` at the beginning of this function,
// it could be that `now` < `last_activity_ts`.
//
// To distinguish the cases, we would need to record `Instant`s in the
// access stats (i.e., monotonic timestamps), but then, the timestamps
// values in the access stats would need to be `Instant`'s, and hence
// they would be meaningless outside of the pageserver process.
// At the time of writing, the trade-off is that access stats are more
// valuable than detecting clock skew.
return false;
}
};
let last_activity_ts = layer.latest_activity();
let no_activity_for = match now.duration_since(last_activity_ts) {
Ok(d) => d,
Err(_e) => {
// We reach here if `now` < `last_activity_ts`, which can legitimately
// happen if there is an access between us getting `now`, and us getting
// the access stats from the layer.
//
// The other reason why it can happen is system clock skew because
// SystemTime::now() is not monotonic, so, even if there is no access
// to the layer after we get `now` at the beginning of this function,
// it could be that `now` < `last_activity_ts`.
//
// To distinguish the cases, we would need to record `Instant`s in the
// access stats (i.e., monotonic timestamps), but then, the timestamps
// values in the access stats would need to be `Instant`'s, and hence
// they would be meaningless outside of the pageserver process.
// At the time of writing, the trade-off is that access stats are more
// valuable than detecting clock skew.
continue;
}
};
if no_activity_for > p.threshold {
no_activity_for > p.threshold
})
.cloned()
.for_each(|layer| {
js.spawn(async move {
layer
.evict_and_wait(std::time::Duration::from_secs(5))
.await
});
stats.candidates += 1;
}
}
});
};
let join_all = async move {

View File

@@ -1,4 +1,4 @@
use anyhow::{bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context};
use itertools::Itertools;
use pageserver_api::shard::TenantShardId;
use std::{collections::HashMap, sync::Arc};
@@ -24,39 +24,142 @@ use crate::{
use super::TimelineWriterState;
/// Provides semantic APIs to manipulate the layer map.
#[derive(Default)]
pub(crate) struct LayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager<Layer>,
pub(crate) enum LayerManager {
/// Open, as in not shut down: we still have in-memory layers and can manipulate
/// the layers.
Open(OpenLayerManager),
/// Shutdown layer manager where there are no more in-memory layers and persistent layers are
/// read-only.
Closed {
layers: HashMap<PersistentLayerKey, Layer>,
},
}
impl Default for LayerManager {
fn default() -> Self {
LayerManager::Open(OpenLayerManager::default())
}
}
impl LayerManager {
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.layer_fmgr.get_from_desc(desc)
pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.layers()
.get(key)
.with_context(|| format!("get layer from key: {key}"))
.expect("not found")
.clone()
}
pub(crate) fn get_from_key(&self, desc: &PersistentLayerKey) -> Layer {
self.layer_fmgr.get_from_key(desc)
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.get_from_key(&desc.key())
}
/// Get an immutable reference to the layer map.
///
/// We expect users only to be able to get an immutable layer map. If users want to make modifications,
/// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
pub(crate) fn layer_map(&self) -> &LayerMap {
&self.layer_map
pub(crate) fn layer_map(&self) -> Result<&LayerMap, Shutdown> {
use LayerManager::*;
match self {
Open(OpenLayerManager { layer_map, .. }) => Ok(layer_map),
Closed { .. } => Err(Shutdown),
}
}
pub(crate) fn open_mut(&mut self) -> Result<&mut OpenLayerManager, Shutdown> {
use LayerManager::*;
match self {
Open(open) => Ok(open),
Closed { .. } => Err(Shutdown),
}
}
/// LayerManager shutdown. The in-memory layers do cleanup on drop, so we must drop them in
/// order to allow shutdown to complete.
///
/// If the in-memory layers needed to be flushed, that must have happened earlier.
pub(crate) fn shutdown(&mut self, writer_state: &mut Option<TimelineWriterState>) {
use LayerManager::*;
match self {
Open(OpenLayerManager {
layer_map,
layer_fmgr: LayerFileManager(hashmap),
}) => {
let open = layer_map.open_layer.take();
let frozen = layer_map.frozen_layers.len();
let taken_writer_state = writer_state.take();
tracing::info!(open = open.is_some(), frozen, "dropped inmemory layers");
let layers = std::mem::take(hashmap);
*self = Closed { layers };
assert_eq!(open.is_some(), taken_writer_state.is_some());
}
Closed { .. } => {
tracing::debug!("ignoring multiple shutdowns on layer manager")
}
}
}
/// Sum up the historic layer sizes
pub(crate) fn layer_size_sum(&self) -> u64 {
self.layers()
.values()
.map(|l| l.layer_desc().file_size)
.sum()
}
pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = &'_ Layer> + '_ {
self.layers().values().filter(|l| l.is_likely_resident())
}
pub(crate) fn contains(&self, layer: &Layer) -> bool {
self.contains_key(&layer.layer_desc().key())
}
pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
self.layers().contains_key(key)
}
pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
self.layers().keys().cloned().collect_vec()
}
fn layers(&self) -> &HashMap<PersistentLayerKey, Layer> {
use LayerManager::*;
match self {
Open(OpenLayerManager { layer_fmgr, .. }) => &layer_fmgr.0,
Closed { layers } => layers,
}
}
}
#[derive(Default)]
pub(crate) struct OpenLayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager<Layer>,
}
impl std::fmt::Debug for OpenLayerManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenLayerManager")
.field("layer_count", &self.layer_fmgr.0.len())
.finish()
}
}
#[derive(Debug, thiserror::Error)]
#[error("layer manager has been shutdown")]
pub(crate) struct Shutdown;
impl OpenLayerManager {
/// Called from `load_layer_map`. Initialize the layer manager with:
/// 1. all on-disk layers
/// 2. next open layer (with disk_consistent_lsn LSN)
pub(crate) fn initialize_local_layers(
&mut self,
on_disk_layers: Vec<Layer>,
next_open_layer_at: Lsn,
) {
pub(crate) fn initialize_local_layers(&mut self, layers: Vec<Layer>, next_open_layer_at: Lsn) {
let mut updates = self.layer_map.batch_update();
for layer in on_disk_layers {
for layer in layers {
Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
}
updates.flush();
@@ -68,26 +171,19 @@ impl LayerManager {
self.layer_map.next_open_layer_at = Some(next_open_layer_at);
}
/// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
/// called within `get_layer_for_write`.
/// Open a new writable layer to append data if there is no open layer, otherwise return the
/// current open layer, called within `get_layer_for_write`.
pub(crate) async fn get_layer_for_write(
&mut self,
lsn: Lsn,
last_record_lsn: Lsn,
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
gate_guard: utils::sync::gate::GateGuard,
ctx: &RequestContext,
) -> Result<Arc<InMemoryLayer>> {
) -> anyhow::Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());
ensure!(
lsn > last_record_lsn,
"cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
lsn,
last_record_lsn,
);
// Do we have a layer open for writing already?
let layer = if let Some(open_layer) = &self.layer_map.open_layer {
if open_layer.get_lsn_range().start > lsn {
@@ -113,8 +209,15 @@ impl LayerManager {
lsn
);
let new_layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, ctx).await?;
let new_layer = InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
start_lsn,
gate_guard,
ctx,
)
.await?;
let layer = Arc::new(new_layer);
self.layer_map.open_layer = Some(layer.clone());
@@ -168,7 +271,7 @@ impl LayerManager {
froze
}
/// Add image layers to the layer map, called from `create_image_layers`.
/// Add image layers to the layer map, called from [`super::Timeline::create_image_layers`].
pub(crate) fn track_new_image_layers(
&mut self,
image_layers: &[ResidentLayer],
@@ -241,7 +344,7 @@ impl LayerManager {
self.finish_compact_l0(compact_from, compact_to, metrics)
}
/// Called when compaction is completed.
/// Called post-compaction when some previous generation image layers were trimmed.
pub(crate) fn rewrite_layers(
&mut self,
rewrite_layers: &[(Layer, ResidentLayer)],
@@ -330,31 +433,6 @@ impl LayerManager {
mapping.remove(layer);
layer.delete_on_drop();
}
pub(crate) fn likely_resident_layers(&self) -> impl Iterator<Item = Layer> + '_ {
// for small layer maps, we most likely have all resident, but for larger more are likely
// to be evicted assuming lots of layers correlated with longer lifespan.
self.layer_map().iter_historic_layers().filter_map(|desc| {
self.layer_fmgr
.0
.get(&desc.key())
.filter(|l| l.is_likely_resident())
.cloned()
})
}
pub(crate) fn contains(&self, layer: &Layer) -> bool {
self.layer_fmgr.contains(layer)
}
pub(crate) fn contains_key(&self, key: &PersistentLayerKey) -> bool {
self.layer_fmgr.contains_key(key)
}
pub(crate) fn all_persistent_layers(&self) -> Vec<PersistentLayerKey> {
self.layer_fmgr.0.keys().cloned().collect_vec()
}
}
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
@@ -366,24 +444,6 @@ impl<T> Default for LayerFileManager<T> {
}
impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
fn get_from_key(&self, key: &PersistentLayerKey) -> T {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
self.0
.get(key)
.with_context(|| format!("get layer from key: {}", key))
.expect("not found")
.clone()
}
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
self.get_from_key(&desc.key())
}
fn contains_key(&self, key: &PersistentLayerKey) -> bool {
self.0.contains_key(key)
}
pub(crate) fn insert(&mut self, layer: T) {
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
if present.is_some() && cfg!(debug_assertions) {
@@ -391,10 +451,6 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
}
}
pub(crate) fn contains(&self, layer: &T) -> bool {
self.0.contains_key(&layer.layer_desc().key())
}
pub(crate) fn remove(&mut self, layer: &T) {
let present = self.0.remove(&layer.layer_desc().key());
if present.is_none() && cfg!(debug_assertions) {