Compare commits

...

1 Commits

Author SHA1 Message Date
Alex Chi
9479470d05 refactor: use layer cache for layer download
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-05-31 14:50:49 -04:00
10 changed files with 170 additions and 130 deletions

View File

@@ -60,7 +60,7 @@ use utils::serde_percent::Percent;
use crate::{ use crate::{
config::PageServerConf, config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::PersistentLayer, Timeline}, tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
}; };
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -329,7 +329,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below // If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk // the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'. // usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new(); let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
let mut warned = None; let mut warned = None;
let mut usage_planned = usage_pre; let mut usage_planned = usage_pre;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() { for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
@@ -434,7 +434,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)] #[derive(Clone)]
struct EvictionCandidate { struct EvictionCandidate {
timeline: Arc<Timeline>, timeline: Arc<Timeline>,
layer: Arc<dyn PersistentLayer>, layer: Arc<RemoteLayerDesc>,
last_activity_ts: SystemTime, last_activity_ts: SystemTime,
} }

View File

@@ -84,6 +84,7 @@ pub mod block_io;
pub mod disk_btree; pub mod disk_btree;
pub(crate) mod ephemeral_file; pub(crate) mod ephemeral_file;
pub mod layer_map; pub mod layer_map;
pub mod layer_cache;
pub mod metadata; pub mod metadata;
mod par_fsync; mod par_fsync;

View File

@@ -0,0 +1,34 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
pub struct LayerCache {
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
}
impl LayerCache {
pub fn new() -> Self {
Self {
layers: Mutex::new(HashMap::new()),
}
}
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
self.layers.lock().unwrap();
guard.get(layer_fname).cloned()
}
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
let guard = self.layers.lock().unwrap();
guard.contains_key(layer_fname)
}
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
let mut guard = self.layers.lock().unwrap();
guard.insert(layer_fname, persistent_layer);
}
}

View File

@@ -61,6 +61,7 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement; pub use historic_layer_coverage::Replacement;
use super::storage_layer::range_eq; use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayer;
/// ///
/// LayerMap tracks what layers exist on a timeline. /// LayerMap tracks what layers exist on a timeline.
@@ -138,24 +139,19 @@ where
self.layer_map.remove_historic_noflush(layer) self.layer_map.remove_historic_noflush(layer)
} }
/// Replaces existing layer iff it is the `expected`. /// Ensure the downloaded layer matches existing layer.
/// ///
/// If the expected layer has been removed it will not be inserted by this function. /// Returned `Replacement` describes succeeding in checking or the reason why it could not
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done. /// be done.
/// pub fn ensure_consistent(
/// TODO replacement can be done without buffering and rebuilding layer map updates. &self,
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected: &Arc<L>, expected: &Arc<L>,
new: Arc<L>, new: Arc<dyn PersistentLayer>,
) -> anyhow::Result<Replacement<Arc<L>>> { ) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound)); fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map.replace_historic_noflush(expected, new) self.layer_map
.ensure_consistent_noflush(expected, new)
} }
// We will flush on drop anyway, but this method makes it // We will flush on drop anyway, but this method makes it
@@ -309,16 +305,16 @@ where
} }
} }
pub(self) fn replace_historic_noflush( pub(self) fn ensure_consistent_noflush(
&mut self, &self,
expected: &Arc<L>, expected: &Arc<L>,
new: Arc<L>, new: Arc<dyn PersistentLayer>,
) -> anyhow::Result<Replacement<Arc<L>>> { ) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected); let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new); let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected); let expected_l0 = Self::is_l0(expected);
let new_l0 = Self::is_l0(&new); let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
anyhow::ensure!( anyhow::ensure!(
key == other, key == other,
@@ -345,17 +341,7 @@ where
None None
}; };
let replaced = self.historic.replace(&key, new.clone(), |existing| { Ok(Replacement::Replaced { in_buffered: false })
Self::compare_arced_layers(existing, expected)
});
if let Replacement::Replaced { .. } = &replaced {
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new;
}
}
Ok(replaced)
} }
/// Helper function for BatchedUpdates::drop. /// Helper function for BatchedUpdates::drop.

View File

@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName}; pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter}; pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer; pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayer; pub use remote_layer::RemoteLayerDesc;
use super::layer_map::BatchedUpdates; use super::layer_map::BatchedUpdates;
@@ -431,14 +431,6 @@ pub trait PersistentLayer: Layer {
/// Permanently remove this layer from disk. /// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>; fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}
/// Returns None if the layer file size is not known. /// Returns None if the layer file size is not known.
/// ///
/// Should not change over the lifetime of the layer object because /// Should not change over the lifetime of the layer object because
@@ -450,16 +442,6 @@ pub trait PersistentLayer: Layer {
fn access_stats(&self) -> &LayerAccessStats; fn access_stats(&self) -> &LayerAccessStats;
} }
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
if layer.is_remote_layer() {
Arc::clone(layer).downcast_remote_layer()
} else {
None
}
}
/// Holds metadata about a layer without any content. Used mostly for testing. /// Holds metadata about a layer without any content. Used mostly for testing.
/// ///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a /// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a

View File

@@ -30,6 +30,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{ use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState, PersistentLayer, ValueReconstructResult, ValueReconstructState,
}; };
@@ -57,7 +58,7 @@ use utils::{
use super::{ use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf, LayerKeyIter, PathOrConf, RemoteLayerDesc,
}; };
/// ///
@@ -663,6 +664,17 @@ impl DeltaLayer {
&self.layer_name(), &self.layer_name(),
) )
} }
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
} }
/// A builder object for constructing a new delta layer. /// A builder object for constructing a new delta layer.

View File

@@ -26,6 +26,7 @@ use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{ use crate::tenant::storage_layer::{
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState, LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
}; };
@@ -53,7 +54,7 @@ use utils::{
}; };
use super::filename::{ImageFileName, LayerFileName}; use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf}; use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
/// ///
/// Header stored in the beginning of the file /// Header stored in the beginning of the file
@@ -464,6 +465,17 @@ impl ImageLayer {
&self.layer_name(), &self.layer_name(),
) )
} }
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
} }
/// A builder object for constructing a new image layer. /// A builder object for constructing a new image layer.

View File

@@ -1,4 +1,4 @@
//! A RemoteLayer is an in-memory placeholder for a layer file that exists //! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
//! in remote storage. //! in remote storage.
//! //!
use crate::config::PageServerConf; use crate::config::PageServerConf;
@@ -25,19 +25,19 @@ use super::{
LayerResidenceStatus, PersistentLayer, LayerResidenceStatus, PersistentLayer,
}; };
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or /// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
/// [`crate::storage_layer::DeltaLayer`]. /// [`crate::storage_layer::DeltaLayer`].
/// ///
/// RemoteLayer might be downloaded on-demand during operations which are /// RemoteLayerDesc might be downloaded on-demand during operations which are
/// allowed download remote layers and during which, it gets replaced with a /// allowed download remote layers and during which, it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`. /// concrete `DeltaLayer` or `ImageLayer`.
/// ///
/// See: [`crate::context::RequestContext`] for authorization to download /// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayer { pub struct RemoteLayerDesc {
tenantid: TenantId, pub(crate) tenantid: TenantId,
timelineid: TimelineId, pub(crate) timelineid: TimelineId,
key_range: Range<Key>, pub(crate) key_range: Range<Key>,
lsn_range: Range<Lsn>, pub(crate) lsn_range: Range<Lsn>,
pub file_name: LayerFileName, pub file_name: LayerFileName,
@@ -54,7 +54,7 @@ pub struct RemoteLayer {
/// Has `LayerMap::replace` failed for this (true) or not (false). /// Has `LayerMap::replace` failed for this (true) or not (false).
/// ///
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`. /// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load) /// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
/// unprocessable, because a LayerMap::replace failed. /// unprocessable, because a LayerMap::replace failed.
/// ///
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids /// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
@@ -63,9 +63,9 @@ pub struct RemoteLayer {
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool, pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
} }
impl std::fmt::Debug for RemoteLayer { impl std::fmt::Debug for RemoteLayerDesc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteLayer") f.debug_struct("RemoteLayerDesc")
.field("file_name", &self.file_name) .field("file_name", &self.file_name)
.field("layer_metadata", &self.layer_metadata) .field("layer_metadata", &self.layer_metadata)
.field("is_incremental", &self.is_incremental) .field("is_incremental", &self.is_incremental)
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayer {
} }
} }
impl Layer for RemoteLayer { impl Layer for RemoteLayerDesc {
fn get_key_range(&self) -> Range<Key> { fn get_key_range(&self) -> Range<Key> {
self.key_range.clone() self.key_range.clone()
} }
@@ -119,7 +119,7 @@ impl Layer for RemoteLayer {
} }
} }
impl PersistentLayer for RemoteLayer { impl PersistentLayer for RemoteLayerDesc {
fn get_tenant_id(&self) -> TenantId { fn get_tenant_id(&self) -> TenantId {
self.tenantid self.tenantid
} }
@@ -160,14 +160,6 @@ impl PersistentLayer for RemoteLayer {
bail!("remote layer has no layer file"); bail!("remote layer has no layer file");
} }
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
Some(self)
}
fn is_remote_layer(&self) -> bool {
true
}
fn file_size(&self) -> u64 { fn file_size(&self) -> u64 {
self.layer_metadata.file_size() self.layer_metadata.file_size()
} }
@@ -201,15 +193,15 @@ impl PersistentLayer for RemoteLayer {
} }
} }
impl RemoteLayer { impl RemoteLayerDesc {
pub fn new_img( pub fn new_img(
tenantid: TenantId, tenantid: TenantId,
timelineid: TimelineId, timelineid: TimelineId,
fname: &ImageFileName, fname: &ImageFileName,
layer_metadata: &LayerFileMetadata, layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats, access_stats: LayerAccessStats,
) -> RemoteLayer { ) -> RemoteLayerDesc {
RemoteLayer { RemoteLayerDesc {
tenantid, tenantid,
timelineid, timelineid,
key_range: fname.key_range.clone(), key_range: fname.key_range.clone(),
@@ -230,8 +222,8 @@ impl RemoteLayer {
fname: &DeltaFileName, fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata, layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats, access_stats: LayerAccessStats,
) -> RemoteLayer { ) -> RemoteLayerDesc {
RemoteLayer { RemoteLayerDesc {
tenantid, tenantid,
timelineid, timelineid,
key_range: fname.key_range.clone(), key_range: fname.key_range.clone(),

View File

@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata}; use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{ use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer, DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayer, LayerAccessStats, LayerFileName, RemoteLayerDesc,
}; };
use crate::tenant::{ use crate::tenant::{
ephemeral_file::is_ephemeral_file, ephemeral_file::is_ephemeral_file,
@@ -77,6 +77,7 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf}; use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf; use super::config::TenantConf;
use super::layer_cache::LayerCache;
use super::layer_map::BatchedUpdates; use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient; use super::remote_timeline_client::RemoteTimelineClient;
@@ -119,7 +120,7 @@ pub struct Timeline {
pub pg_version: u32, pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>, pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
/// Set of key ranges which should be covered by image layers to /// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -241,6 +242,8 @@ pub struct Timeline {
pub delete_lock: tokio::sync::Mutex<bool>, pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>, eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
layer_cache: Arc<LayerCache>,
} }
/// Internal structure to hold all data needed for logical size calculation. /// Internal structure to hold all data needed for logical size calculation.
@@ -1007,20 +1010,22 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> { pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) }; let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; if self.layer_cache.contains(&remote_layer_desc.filename()) {
return Ok(Some(false));
}
if self.remote_client.is_none() { if self.remote_client.is_none() {
return Ok(Some(false)); return Ok(Some(false));
} }
self.download_remote_layer(remote_layer).await?; self.download_remote_layer(remote_layer_desc).await?;
Ok(Some(true)) Ok(Some(true))
} }
/// Like [`evict_layer_batch`], but for just one layer. /// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> { pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
let remote_client = self let remote_client = self
.remote_client .remote_client
.as_ref() .as_ref()
@@ -1047,7 +1052,7 @@ impl Timeline {
pub async fn evict_layers( pub async fn evict_layers(
&self, &self,
_: &GenericRemoteStorage, _: &GenericRemoteStorage,
layers_to_evict: &[Arc<dyn PersistentLayer>], layers_to_evict: &[Arc<RemoteLayerDesc>],
cancel: CancellationToken, cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> { ) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
let remote_client = self.remote_client.clone().expect( let remote_client = self.remote_client.clone().expect(
@@ -1082,7 +1087,7 @@ impl Timeline {
async fn evict_layer_batch( async fn evict_layer_batch(
&self, &self,
remote_client: &Arc<RemoteTimelineClient>, remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<dyn PersistentLayer>], layers_to_evict: &[Arc<RemoteLayerDesc>],
cancel: CancellationToken, cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> { ) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
// ensure that the layers have finished uploading // ensure that the layers have finished uploading
@@ -1131,12 +1136,12 @@ impl Timeline {
fn evict_layer_batch_impl( fn evict_layer_batch_impl(
&self, &self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<dyn PersistentLayer>, local_layer: &Arc<RemoteLayerDesc>,
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
) -> anyhow::Result<bool> { ) -> anyhow::Result<bool> {
use super::layer_map::Replacement; use super::layer_map::Replacement;
if local_layer.is_remote_layer() { if !self.layer_cache.contains(&local_layer.filename()) {
// TODO(issue #3851): consider returning an err here instead of false, // TODO(issue #3851): consider returning an err here instead of false,
// which is the same out the match later // which is the same out the match later
return Ok(false); return Ok(false);
@@ -1163,7 +1168,7 @@ impl Timeline {
let layer_metadata = LayerFileMetadata::new(layer_file_size); let layer_metadata = LayerFileMetadata::new(layer_file_size);
let new_remote_layer = Arc::new(match local_layer.filename() { let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayer::new_img( LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
self.tenant_id, self.tenant_id,
self.timeline_id, self.timeline_id,
&image_name, &image_name,
@@ -1172,7 +1177,7 @@ impl Timeline {
.access_stats() .access_stats()
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
), ),
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
self.tenant_id, self.tenant_id,
self.timeline_id, self.timeline_id,
&delta_name, &delta_name,
@@ -1183,6 +1188,7 @@ impl Timeline {
), ),
}); });
/*
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? { let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
Replacement::Replaced { .. } => { Replacement::Replaced { .. } => {
if let Err(e) = local_layer.delete_resident_layer_file() { if let Err(e) = local_layer.delete_resident_layer_file() {
@@ -1233,8 +1239,10 @@ impl Timeline {
false false
} }
}; };
*/
Ok(replaced) // Ok(replaced)
Ok(true)
} }
} }
@@ -1419,6 +1427,8 @@ impl Timeline {
EvictionTaskTimelineState::default(), EvictionTaskTimelineState::default(),
), ),
delete_lock: tokio::sync::Mutex::new(false), delete_lock: tokio::sync::Mutex::new(false),
layer_cache: Arc::new(LayerCache::new()),
}; };
result.repartition_threshold = result.get_checkpoint_distance() / 10; result.repartition_threshold = result.get_checkpoint_distance() / 10;
result result
@@ -1565,9 +1575,12 @@ impl Timeline {
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident), LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
); );
let remote_desc = layer.layer_desc();
trace!("found layer {}", layer.path().display()); trace!("found layer {}", layer.path().display());
total_physical_size += file_size; total_physical_size += file_size;
updates.insert_historic(Arc::new(layer)); self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
num_layers += 1; num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file. // Create a DeltaLayer struct for each delta file.
@@ -1599,7 +1612,9 @@ impl Timeline {
trace!("found layer {}", layer.path().display()); trace!("found layer {}", layer.path().display());
total_physical_size += file_size; total_physical_size += file_size;
updates.insert_historic(Arc::new(layer)); let remote_desc = layer.layer_desc();
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
num_layers += 1; num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these // ignore these
@@ -1644,9 +1659,9 @@ impl Timeline {
async fn create_remote_layers( async fn create_remote_layers(
&self, &self,
index_part: &IndexPart, index_part: &IndexPart,
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>, local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
up_to_date_disk_consistent_lsn: Lsn, up_to_date_disk_consistent_lsn: Lsn,
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> { ) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
// Are we missing some files that are present in remote storage? // Are we missing some files that are present in remote storage?
// Create RemoteLayer instances for them. // Create RemoteLayer instances for them.
let mut local_only_layers = local_layers; let mut local_only_layers = local_layers;
@@ -1725,7 +1740,7 @@ impl Timeline {
continue; continue;
} }
let remote_layer = RemoteLayer::new_img( let remote_layer = RemoteLayerDesc::new_img(
self.tenant_id, self.tenant_id,
self.timeline_id, self.timeline_id,
imgfilename, imgfilename,
@@ -1753,7 +1768,7 @@ impl Timeline {
); );
continue; continue;
} }
let remote_layer = RemoteLayer::new_delta( let remote_layer = RemoteLayerDesc::new_delta(
self.tenant_id, self.tenant_id,
self.timeline_id, self.timeline_id,
deltafilename, deltafilename,
@@ -2159,7 +2174,7 @@ impl Timeline {
} }
} }
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> { fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() { for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name(); let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name { if layer_file_name == historic_layer_name {
@@ -2176,10 +2191,10 @@ impl Timeline {
&self, &self,
// we cannot remove layers otherwise, since gc and compaction will race // we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>, _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<dyn PersistentLayer>, layer: Arc<RemoteLayerDesc>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
if !layer.is_remote_layer() { if self.layer_cache.contains(&layer.filename()) {
layer.delete_resident_layer_file()?; layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size(); let layer_file_size = layer.file_size();
self.metrics self.metrics
@@ -2428,13 +2443,7 @@ impl Timeline {
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) { if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
// If it's a remote layer, download it and retry. // If it's a remote layer, download it and retry.
if let Some(remote_layer) = if let Some(layer) = self.layer_cache.get(&layer.filename()) {
super::storage_layer::downcast_remote_layer(&layer)
{
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
remote_layer // download happens outside the scope of `layers` guard object
} else {
// Get all the data needed to reconstruct the page version from this layer. // Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that. // But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor); let lsn_floor = max(cached_lsn + 1, lsn_floor);
@@ -2457,6 +2466,10 @@ impl Timeline {
}), }),
)); ));
continue 'outer; continue 'outer;
} else {
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
layer // download happens outside the scope of `layers` guard object
} }
} else if timeline.ancestor_timeline.is_some() { } else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent // Nothing on this timeline. Traverse to parent
@@ -2896,7 +2909,8 @@ impl Timeline {
LayerResidenceStatus::Resident, LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate, LayerResidenceEventReason::LayerCreate,
); );
batch_updates.insert_historic(l); batch_updates.insert_historic(Arc::new(l.layer_desc()));
self.layer_cache.insert(l.filename(), l);
batch_updates.flush(); batch_updates.flush();
// update the timeline's physical size // update the timeline's physical size
@@ -3142,7 +3156,9 @@ impl Timeline {
LayerResidenceStatus::Resident, LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate, LayerResidenceEventReason::LayerCreate,
); );
updates.insert_historic(l); updates.insert_historic(Arc::new(l.layer_desc()));
let x: Arc<dyn PersistentLayer> = l;
self.layer_cache.insert(x.filename(), x)
} }
updates.flush(); updates.flush();
drop(layers); drop(layers);
@@ -3155,7 +3171,7 @@ impl Timeline {
#[derive(Default)] #[derive(Default)]
struct CompactLevel0Phase1Result { struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>, new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>, deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
} }
/// Top-level failure to compact. /// Top-level failure to compact.
@@ -3165,7 +3181,7 @@ enum CompactionError {
/// ///
/// This should not happen repeatedly, but will be retried once by top-level /// This should not happen repeatedly, but will be retried once by top-level
/// `Timeline::compact`. /// `Timeline::compact`.
DownloadRequired(Vec<Arc<RemoteLayer>>), DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
/// Compaction cannot be done right now; page reconstruction and so on. /// Compaction cannot be done right now; page reconstruction and so on.
Other(anyhow::Error), Other(anyhow::Error),
} }
@@ -3237,13 +3253,9 @@ impl Timeline {
let remotes = deltas_to_compact let remotes = deltas_to_compact
.iter() .iter()
.filter(|l| l.is_remote_layer()) .filter(|l| !self.layer_cache.contains(&l.filename()))
.inspect(|l| info!("compact requires download of {}", l.filename().file_name())) .inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.map(|l| { .cloned()
l.clone()
.downcast_remote_layer()
.expect("just checked it is remote layer")
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if !remotes.is_empty() { if !remotes.is_empty() {
@@ -3583,13 +3595,15 @@ impl Timeline {
.add(metadata.len()); .add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let remote_desc = l.layer_desc();
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l); let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event( x.access_stats().record_residence_event(
&updates, &updates,
LayerResidenceStatus::Resident, LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate, LayerResidenceEventReason::LayerCreate,
); );
updates.insert_historic(x); updates.insert_historic(Arc::new(remote_desc));
self.layer_cache.insert(x.filename(), x)
} }
// Now that we have reshuffled the data to set of new delta layers, we can // Now that we have reshuffled the data to set of new delta layers, we can
@@ -4062,7 +4076,7 @@ impl Timeline {
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))] #[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
pub async fn download_remote_layer( pub async fn download_remote_layer(
&self, &self,
remote_layer: Arc<RemoteLayer>, remote_layer: Arc<RemoteLayerDesc>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id(); debug_assert_current_span_has_tenant_and_timeline_id();
@@ -4119,10 +4133,12 @@ impl Timeline {
// Delta- or ImageLayer in the layer map. // Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap(); let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update(); let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{ {
use crate::tenant::layer_map::Replacement; use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone(); let l: Arc<dyn PersistentLayer> = remote_layer.clone();
/*
let failure = match updates.replace_historic(&l, new_layer) { let failure = match updates.replace_historic(&l, new_layer) {
Ok(Replacement::Replaced { .. }) => false, Ok(Replacement::Replaced { .. }) => false,
Ok(Replacement::NotFound) => { Ok(Replacement::NotFound) => {
@@ -4177,8 +4193,9 @@ impl Timeline {
remote_layer remote_layer
.download_replacement_failure .download_replacement_failure
.store(true, Relaxed); .store(true, Relaxed);
} } */
} }
updates.flush(); updates.flush();
drop(layers); drop(layers);
@@ -4191,7 +4208,10 @@ impl Timeline {
remote_layer.ongoing_download.close(); remote_layer.ongoing_download.close();
} else { } else {
// Keep semaphore open. We'll drop the permit at the end of the function. // Keep semaphore open. We'll drop the permit at the end of the function.
error!("layer file download failed: {:?}", result.as_ref().unwrap_err()); error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
} }
// Don't treat it as an error if the task that triggered the download // Don't treat it as an error if the task that triggered the download
@@ -4205,7 +4225,8 @@ impl Timeline {
drop(permit); drop(permit);
Ok(()) Ok(())
}.in_current_span(), }
.in_current_span(),
); );
receiver.await.context("download task cancelled")? receiver.await.context("download task cancelled")?
@@ -4278,7 +4299,7 @@ impl Timeline {
let layers = self.layers.read().unwrap(); let layers = self.layers.read().unwrap();
layers layers
.iter_historic_layers() .iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer()) .filter(|l| !self.layer_cache.contains(&l.filename()))
.map(|l| self.download_remote_layer(l)) .map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl)) .for_each(|dl| downloads.push(dl))
} }
@@ -4353,7 +4374,7 @@ pub struct DiskUsageEvictionInfo {
} }
pub struct LocalLayerInfoForDiskUsageEviction { pub struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<dyn PersistentLayer>, pub layer: Arc<RemoteLayerDesc>,
pub last_activity_ts: SystemTime, pub last_activity_ts: SystemTime,
} }
@@ -4387,7 +4408,7 @@ impl Timeline {
let file_size = l.file_size(); let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
if l.is_remote_layer() { if !self.layer_cache.contains(&l.filename()) {
continue; continue;
} }

View File

@@ -29,7 +29,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{ tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold}, config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer, storage_layer::{PersistentLayer, RemoteLayerDesc},
LogicalSizeCalculationCause, Tenant, LogicalSizeCalculationCause, Tenant,
}, },
}; };
@@ -184,11 +184,11 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock. // NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction. // We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this. // So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = { let candidates: Vec<Arc<RemoteLayerDesc>> = {
let layers = self.layers.read().unwrap(); let layers = self.layers.read().unwrap();
let mut candidates = Vec::new(); let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() { for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() { if !self.layer_cache.contains(&hist_layer.filename()) {
continue; continue;
} }