From 5c8f96e4d24241d00c989c70a3aa9d9696ae028e Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Thu, 15 Jun 2023 16:29:29 -0400 Subject: [PATCH] refactor: move evict and download to layer cache Signed-off-by: Alex Chi --- pageserver/src/disk_usage_eviction_task.rs | 5 +- pageserver/src/http/routes.rs | 2 + pageserver/src/tenant/layer_cache.rs | 257 +++++++++++++++++- pageserver/src/tenant/timeline.rs | 231 +--------------- .../src/tenant/timeline/eviction_task.rs | 1 + 5 files changed, 264 insertions(+), 232 deletions(-) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 61cbd5066f..ca2b0770d5 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -379,7 +379,10 @@ pub async fn disk_usage_eviction_task_iteration_impl( debug!(%timeline_id, "evicting batch for timeline"); async { - let results = timeline.evict_layers(storage, &batch, cancel.clone()).await; + let results = timeline + .lcache + .evict_layers(storage, &batch, cancel.clone()) + .await; match results { Err(e) => { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index fc8da70cc0..16b2574a67 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -719,6 +719,7 @@ async fn layer_download_handler( let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; let downloaded = timeline + .lcache .download_layer(layer_file_name) .await .map_err(ApiError::InternalServerError)?; @@ -744,6 +745,7 @@ async fn evict_timeline_layer_handler( let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; let evicted = timeline + .lcache .evict_layer(layer_file_name) .await .map_err(ApiError::InternalServerError)?; diff --git a/pageserver/src/tenant/layer_cache.rs b/pageserver/src/tenant/layer_cache.rs index 060cfa09d8..d16bb15815 100644 --- a/pageserver/src/tenant/layer_cache.rs +++ b/pageserver/src/tenant/layer_cache.rs @@ -1,9 +1,19 @@ +use super::remote_timeline_client::RemoteTimelineClient; use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer}; use super::Timeline; use crate::tenant::layer_map::{self, LayerMap}; -use anyhow::Result; +use crate::tenant::remote_timeline_client::index::LayerFileMetadata; +use crate::tenant::storage_layer::LayerFileName; +use anyhow::{Context, Result}; +use pageserver_api::models::TimelineState; +use remote_storage::GenericRemoteStorage; use std::sync::{Mutex, Weak}; +use std::time::SystemTime; use std::{collections::HashMap, sync::Arc}; +use tokio_util::sync::CancellationToken; +use tracing::instrument; +use tracing::{error, info, warn}; +use utils::id::{TenantId, TimelineId}; pub struct LayerCache { /// Layer removal lock. @@ -22,6 +32,9 @@ pub struct LayerCache { timeline: Weak, mapping: Mutex>>, + + tenant_id: TenantId, + timeline_id: TimelineId, } pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>); @@ -33,11 +46,14 @@ pub struct DeleteGuard(Arc>); impl LayerCache { pub fn new(timeline: Weak) -> Self { + let timeline_owned = timeline.upgrade().expect("cannot upgrade"); Self { layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())), layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())), mapping: Mutex::new(HashMap::new()), timeline, + tenant_id: timeline_owned.tenant_id, + timeline_id: timeline_owned.timeline_id, } } @@ -140,4 +156,243 @@ impl LayerCache { let mut guard = self.mapping.lock().unwrap(); guard.remove(&layer.layer_desc().key()); } + + #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] + pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { + let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; + let Some(timeline) = self.timeline.upgrade() else { return Ok(None) }; + if timeline.remote_client.is_none() { + return Ok(Some(false)); + } + + timeline.download_remote_layer(remote_layer).await?; + Ok(Some(true)) + } + + fn find_layer(&self, layer_file_name: &str) -> Option> { + let layers = self.mapping.lock().unwrap(); + for (_, historic_layer) in layers.iter() { + let historic_layer_name = historic_layer.filename().file_name(); + if layer_file_name == historic_layer_name { + return Some(historic_layer.clone()); + } + } + + None + } + + /// Like [`evict_layer_batch`], but for just one layer. + /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. + pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { + let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(timeline) = self.timeline.upgrade() else { return Ok(None) }; + let remote_client = timeline + .remote_client + .as_ref() + .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?; + + let cancel = CancellationToken::new(); + let results = self + .evict_layer_batch(remote_client, &[local_layer], cancel) + .await?; + assert_eq!(results.len(), 1); + let result: Option> = results.into_iter().next().unwrap(); + match result { + None => anyhow::bail!("task_mgr shutdown requested"), + Some(Ok(b)) => Ok(Some(b)), + Some(Err(e)) => Err(e), + } + } + + /// Evict a batch of layers. + /// + /// GenericRemoteStorage reference is required as a witness[^witness_article] for "remote storage is configured." + /// + /// [^witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html + pub async fn evict_layers( + &self, + _: &GenericRemoteStorage, + layers_to_evict: &[Arc], + cancel: CancellationToken, + ) -> anyhow::Result>>> { + let Some(timeline) = self.timeline.upgrade() else { return Ok(vec![]) }; + let remote_client = timeline.remote_client.clone().expect( + "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient", + ); + + self.evict_layer_batch(&remote_client, layers_to_evict, cancel) + .await + } + + /// Evict multiple layers at once, continuing through errors. + /// + /// Try to evict the given `layers_to_evict` by + /// + /// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object. + /// 2. Deleting the now unreferenced layer file from disk. + /// + /// The `remote_client` should be this timeline's `self.remote_client`. + /// We make the caller provide it so that they are responsible for handling the case + /// where someone wants to evict the layer but no remote storage is configured. + /// + /// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`. + /// If `Err()` is returned, no eviction was attempted. + /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`. + /// Meaning of each `result[i]`: + /// - `Some(Err(...))` if layer replacement failed for an unexpected reason + /// - `Some(Ok(true))` if everything went well. + /// - `Some(Ok(false))` if there was an expected reason why the layer could not be replaced, e.g.: + /// - evictee was not yet downloaded + /// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks) + /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`. + async fn evict_layer_batch( + &self, + remote_client: &Arc, + layers_to_evict: &[Arc], + cancel: CancellationToken, + ) -> anyhow::Result>>> { + // ensure that the layers have finished uploading + // (don't hold the layer_removal_cs while we do it, we're not removing anything yet) + remote_client + .wait_completion() + .await + .context("wait for layer upload ops to complete")?; + + // now lock out layer removal (compaction, gc, timeline deletion) + let layer_removal_guard = self.delete_guard().await; + + { + // to avoid racing with detach and delete_timeline + let Some(timeline) = self.timeline.upgrade() else { return Ok(vec![]) }; + let state = timeline.current_state(); + anyhow::ensure!( + state == TimelineState::Active, + "timeline is not active but {state:?}" + ); + } + + let mut results = Vec::with_capacity(layers_to_evict.len()); + + for l in layers_to_evict.iter() { + let res = if cancel.is_cancelled() { + None + } else { + Some(self.evict_layer_batch_impl(&layer_removal_guard, l)) + }; + results.push(res); + } + + // commit the updates & release locks + drop(layer_removal_guard); + + assert_eq!(results.len(), layers_to_evict.len()); + Ok(results) + } + + fn evict_layer_batch_impl( + &self, + _layer_removal_cs: &DeleteGuard, + local_layer: &Arc, + ) -> anyhow::Result { + if local_layer.is_remote_layer() { + // TODO(issue #3851): consider returning an err here instead of false, + // which is the same out the match later + return Ok(false); + } + + let layer_file_size = local_layer.file_size(); + + let local_layer_mtime = local_layer + .local_path() + .expect("local layer should have a local path") + .metadata() + .context("get local layer file stat")? + .modified() + .context("get mtime of layer file")?; + let local_layer_residence_duration = + match SystemTime::now().duration_since(local_layer_mtime) { + Err(e) => { + warn!("layer mtime is in the future: {}", e); + None + } + Ok(delta) => Some(delta), + }; + + let layer_metadata = LayerFileMetadata::new(layer_file_size); + + let new_remote_layer = Arc::new(match local_layer.filename() { + LayerFileName::Image(image_name) => RemoteLayer::new_img( + self.tenant_id, + self.timeline_id, + &image_name, + &layer_metadata, + local_layer + .access_stats() + .clone_for_residence_change(LayerResidenceStatus::Evicted), + ), + LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( + self.tenant_id, + self.timeline_id, + &delta_name, + &layer_metadata, + local_layer + .access_stats() + .clone_for_residence_change(LayerResidenceStatus::Evicted), + ), + }); + + assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc()); + + let succeed = match self.replace_and_verify(local_layer.clone(), new_remote_layer) { + Ok(()) => { + if let Err(e) = local_layer.delete_resident_layer_file() { + error!("failed to remove layer file on evict after replacement: {e:#?}"); + } + // Always decrement the physical size gauge, even if we failed to delete the file. + // Rationale: we already replaced the layer with a remote layer in the layer map, + // and any subsequent download_remote_layer will + // 1. overwrite the file on disk and + // 2. add the downloaded size to the resident size gauge. + // + // If there is no re-download, and we restart the pageserver, then load_layer_map + // will treat the file as a local layer again, count it towards resident size, + // and it'll be like the layer removal never happened. + // The bump in resident size is perhaps unexpected but overall a robust behavior. + + let Some(timeline) = self.timeline.upgrade() else { return Ok(false) }; + + timeline + .metrics + .resident_physical_size_gauge + .sub(layer_file_size); + + timeline.metrics.evictions.inc(); + + if let Some(delta) = local_layer_residence_duration { + timeline + .metrics + .evictions_with_low_residence_duration + .read() + .unwrap() + .observe(delta); + info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period"); + } else { + info!(layer=%local_layer.short_id(), "evicted layer after unknown residence period"); + } + + true + } + Err(err) => { + if cfg!(debug_assertions) { + error!(evicted=?local_layer, "failed to replace: {err}"); + } else { + error!(evicted=?local_layer, "failed to replace: {err}"); + } + false + } + }; + + Ok(succeed) + } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7ba5d218c1..362628875c 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -151,7 +151,7 @@ pub struct Timeline { pub(crate) layers: tokio::sync::RwLock<(LayerMap, LayerMapping)>, - pub(super) lcache: LayerCache, + pub(crate) lcache: LayerCache, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -1081,235 +1081,6 @@ impl Timeline { historic_layers, } } - - #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] - pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; - let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; - if self.remote_client.is_none() { - return Ok(Some(false)); - } - - self.download_remote_layer(remote_layer).await?; - Ok(Some(true)) - } - - /// Like [`evict_layer_batch`], but for just one layer. - /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. - pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; - let remote_client = self - .remote_client - .as_ref() - .ok_or_else(|| anyhow::anyhow!("remote storage not configured; cannot evict"))?; - - let cancel = CancellationToken::new(); - let results = self - .evict_layer_batch(remote_client, &[local_layer], cancel) - .await?; - assert_eq!(results.len(), 1); - let result: Option> = results.into_iter().next().unwrap(); - match result { - None => anyhow::bail!("task_mgr shutdown requested"), - Some(Ok(b)) => Ok(Some(b)), - Some(Err(e)) => Err(e), - } - } - - /// Evict a batch of layers. - /// - /// GenericRemoteStorage reference is required as a witness[^witness_article] for "remote storage is configured." - /// - /// [^witness_article]: https://willcrichton.net/rust-api-type-patterns/witnesses.html - pub async fn evict_layers( - &self, - _: &GenericRemoteStorage, - layers_to_evict: &[Arc], - cancel: CancellationToken, - ) -> anyhow::Result>>> { - let remote_client = self.remote_client.clone().expect( - "GenericRemoteStorage is configured, so timeline must have RemoteTimelineClient", - ); - - self.evict_layer_batch(&remote_client, layers_to_evict, cancel) - .await - } - - /// Evict multiple layers at once, continuing through errors. - /// - /// Try to evict the given `layers_to_evict` by - /// - /// 1. Replacing the given layer object in the layer map with a corresponding [`RemoteLayer`] object. - /// 2. Deleting the now unreferenced layer file from disk. - /// - /// The `remote_client` should be this timeline's `self.remote_client`. - /// We make the caller provide it so that they are responsible for handling the case - /// where someone wants to evict the layer but no remote storage is configured. - /// - /// Returns either `Err()` or `Ok(results)` where `results.len() == layers_to_evict.len()`. - /// If `Err()` is returned, no eviction was attempted. - /// Each position of `Ok(results)` corresponds to the layer in `layers_to_evict`. - /// Meaning of each `result[i]`: - /// - `Some(Err(...))` if layer replacement failed for an unexpected reason - /// - `Some(Ok(true))` if everything went well. - /// - `Some(Ok(false))` if there was an expected reason why the layer could not be replaced, e.g.: - /// - evictee was not yet downloaded - /// - replacement failed for an expectable reason (e.g., layer removed by GC before we grabbed all locks) - /// - `None` if no eviction attempt was made for the layer because `cancel.is_cancelled() == true`. - async fn evict_layer_batch( - &self, - remote_client: &Arc, - layers_to_evict: &[Arc], - cancel: CancellationToken, - ) -> anyhow::Result>>> { - // ensure that the layers have finished uploading - // (don't hold the layer_removal_cs while we do it, we're not removing anything yet) - remote_client - .wait_completion() - .await - .context("wait for layer upload ops to complete")?; - - // now lock out layer removal (compaction, gc, timeline deletion) - let layer_removal_guard = self.lcache.delete_guard().await; - - { - // to avoid racing with detach and delete_timeline - let state = self.current_state(); - anyhow::ensure!( - state == TimelineState::Active, - "timeline is not active but {state:?}" - ); - } - - // start the batch update - let mut guard = self.layers.write().await; - let (layer_map, _) = &mut *guard; - let mut batch_updates = layer_map.batch_update(); - - let mut results = Vec::with_capacity(layers_to_evict.len()); - - for l in layers_to_evict.iter() { - let res = if cancel.is_cancelled() { - None - } else { - Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates)) - }; - results.push(res); - } - - // commit the updates & release locks - batch_updates.flush(); - drop_wlock(guard); - drop(layer_removal_guard); - - assert_eq!(results.len(), layers_to_evict.len()); - Ok(results) - } - - fn evict_layer_batch_impl( - &self, - _layer_removal_cs: &DeleteGuard, - local_layer: &Arc, - batch_updates: &mut BatchedUpdates<'_>, - ) -> anyhow::Result { - if local_layer.is_remote_layer() { - // TODO(issue #3851): consider returning an err here instead of false, - // which is the same out the match later - return Ok(false); - } - - let layer_file_size = local_layer.file_size(); - - let local_layer_mtime = local_layer - .local_path() - .expect("local layer should have a local path") - .metadata() - .context("get local layer file stat")? - .modified() - .context("get mtime of layer file")?; - let local_layer_residence_duration = - match SystemTime::now().duration_since(local_layer_mtime) { - Err(e) => { - warn!("layer mtime is in the future: {}", e); - None - } - Ok(delta) => Some(delta), - }; - - let layer_metadata = LayerFileMetadata::new(layer_file_size); - - let new_remote_layer = Arc::new(match local_layer.filename() { - LayerFileName::Image(image_name) => RemoteLayer::new_img( - self.tenant_id, - self.timeline_id, - &image_name, - &layer_metadata, - local_layer - .access_stats() - .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), - ), - LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( - self.tenant_id, - self.timeline_id, - &delta_name, - &layer_metadata, - local_layer - .access_stats() - .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), - ), - }); - - assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc()); - - let succeed = match self - .lcache - .replace_and_verify(local_layer.clone(), new_remote_layer) - { - Ok(()) => { - if let Err(e) = local_layer.delete_resident_layer_file() { - error!("failed to remove layer file on evict after replacement: {e:#?}"); - } - // Always decrement the physical size gauge, even if we failed to delete the file. - // Rationale: we already replaced the layer with a remote layer in the layer map, - // and any subsequent download_remote_layer will - // 1. overwrite the file on disk and - // 2. add the downloaded size to the resident size gauge. - // - // If there is no re-download, and we restart the pageserver, then load_layer_map - // will treat the file as a local layer again, count it towards resident size, - // and it'll be like the layer removal never happened. - // The bump in resident size is perhaps unexpected but overall a robust behavior. - self.metrics - .resident_physical_size_gauge - .sub(layer_file_size); - - self.metrics.evictions.inc(); - - if let Some(delta) = local_layer_residence_duration { - self.metrics - .evictions_with_low_residence_duration - .read() - .unwrap() - .observe(delta); - info!(layer=%local_layer.short_id(), residence_millis=delta.as_millis(), "evicted layer after known residence period"); - } else { - info!(layer=%local_layer.short_id(), "evicted layer after unknown residence period"); - } - - true - } - Err(err) => { - if cfg!(debug_assertions) { - error!(evicted=?local_layer, "failed to replace: {err}"); - } else { - error!(evicted=?local_layer, "failed to replace: {err}"); - } - false - } - }; - - Ok(succeed) - } } // Private functions diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 89231a31ed..088211ab56 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -254,6 +254,7 @@ impl Timeline { }; let results = match self + .lcache .evict_layer_batch(remote_client, &candidates[..], cancel.clone()) .await {