Compare commits

...

1 Commits

Author SHA1 Message Date
Alex Chi
9479470d05 refactor: use layer cache for layer download
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-05-31 14:50:49 -04:00
10 changed files with 170 additions and 130 deletions

View File

@@ -60,7 +60,7 @@ use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::PersistentLayer, Timeline},
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -329,7 +329,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
@@ -434,7 +434,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Arc<dyn PersistentLayer>,
layer: Arc<RemoteLayerDesc>,
last_activity_ts: SystemTime,
}

View File

@@ -84,6 +84,7 @@ pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
pub mod layer_cache;
pub mod metadata;
mod par_fsync;

View File

@@ -0,0 +1,34 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
/// Map from a layer's file name to its concrete `PersistentLayer` object.
/// Interior `Mutex` makes it shareable (`&self` API); every accessor locks
/// internally, so callers never hold the guard across other work.
pub struct LayerCache {
    // NOTE(review): only `insert`/`get`/`contains` are visible in this file —
    // no removal path. Confirm eviction clears entries elsewhere.
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
}
impl LayerCache {
    /// Create an empty cache.
    pub fn new() -> Self {
        Self {
            layers: Mutex::new(HashMap::new()),
        }
    }

    /// Look up the cached layer for `layer_fname`, cloning the `Arc` if present.
    pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
        // Type of the guard is fully inferred; the original spelled out the
        // whole `MutexGuard<HashMap<…>>` type redundantly.
        let guard = self.layers.lock().unwrap();
        guard.get(layer_fname).cloned()
    }

    /// Whether a layer is cached under `layer_fname`.
    pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
        let guard = self.layers.lock().unwrap();
        guard.contains_key(layer_fname)
    }

    /// Insert (or replace) the cached layer for `layer_fname`.
    pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
        let mut guard = self.layers.lock().unwrap();
        guard.insert(layer_fname, persistent_layer);
    }
}

// A type with a parameterless `new` should also implement `Default`
// (clippy::new_without_default), so it composes with `#[derive(Default)]`
// containers and `Default::default()` call sites.
impl Default for LayerCache {
    fn default() -> Self {
        Self::new()
    }
}

View File

@@ -61,6 +61,7 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayer;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -138,24 +139,19 @@ where
self.layer_map.remove_historic_noflush(layer)
}
/// Replaces existing layer iff it is the `expected`.
/// Ensure the downloaded layer matches the existing layer.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// The returned `Replacement` describes a successful check, or the reason the check could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
pub fn ensure_consistent(
&self,
expected: &Arc<L>,
new: Arc<L>,
new: Arc<dyn PersistentLayer>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map.replace_historic_noflush(expected, new)
self.layer_map
.ensure_consistent_noflush(expected, new)
}
// We will flush on drop anyway, but this method makes it
@@ -309,16 +305,16 @@ where
}
}
pub(self) fn replace_historic_noflush(
&mut self,
pub(self) fn ensure_consistent_noflush(
&self,
expected: &Arc<L>,
new: Arc<L>,
new: Arc<dyn PersistentLayer>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = Self::is_l0(&new);
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
anyhow::ensure!(
key == other,
@@ -345,17 +341,7 @@ where
None
};
let replaced = self.historic.replace(&key, new.clone(), |existing| {
Self::compare_arced_layers(existing, expected)
});
if let Replacement::Replaced { .. } = &replaced {
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new;
}
}
Ok(replaced)
Ok(Replacement::Replaced { in_buffered: false })
}
/// Helper function for BatchedUpdates::drop.

View File

@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayer;
pub use remote_layer::RemoteLayerDesc;
use super::layer_map::BatchedUpdates;
@@ -431,14 +431,6 @@ pub trait PersistentLayer: Layer {
/// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}
/// Returns None if the layer file size is not known.
///
/// Should not change over the lifetime of the layer object because
@@ -450,16 +442,6 @@ pub trait PersistentLayer: Layer {
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
if layer.is_remote_layer() {
Arc::clone(layer).downcast_remote_layer()
} else {
None
}
}
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a

View File

@@ -30,6 +30,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -57,7 +58,7 @@ use utils::{
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf,
LayerKeyIter, PathOrConf, RemoteLayerDesc,
};
///
@@ -663,6 +664,17 @@ impl DeltaLayer {
&self.layer_name(),
)
}
/// Create a layer descriptor for this delta layer.
///
/// NOTE(review): the original comment said "image layer" — a copy/paste from
/// `ImageLayer::layer_desc`; this method is in `impl DeltaLayer` and builds
/// the descriptor via `RemoteLayerDesc::new_delta`.
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
// Descriptor carries only metadata (here: the file size), not content.
&LayerFileMetadata::new(self.file_size()),
// Per the constructor's name, the residence event is recorded later,
// once the layer is actually materialized.
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new delta layer.

View File

@@ -26,6 +26,7 @@ use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -53,7 +54,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
///
/// Header stored in the beginning of the file
@@ -464,6 +465,17 @@ impl ImageLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new image layer.

View File

@@ -1,4 +1,4 @@
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
//! in remote storage.
//!
use crate::config::PageServerConf;
@@ -25,19 +25,19 @@ use super::{
LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
/// [`crate::storage_layer::DeltaLayer`].
///
/// RemoteLayer might be downloaded on-demand during operations which are
/// RemoteLayerDesc might be downloaded on-demand during operations which are
/// allowed to download remote layers and during which it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`.
///
/// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayer {
tenantid: TenantId,
timelineid: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub struct RemoteLayerDesc {
pub(crate) tenantid: TenantId,
pub(crate) timelineid: TimelineId,
pub(crate) key_range: Range<Key>,
pub(crate) lsn_range: Range<Lsn>,
pub file_name: LayerFileName,
@@ -54,7 +54,7 @@ pub struct RemoteLayer {
/// Has `LayerMap::replace` failed for this (true) or not (false).
///
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
/// unprocessable, because a LayerMap::replace failed.
///
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
@@ -63,9 +63,9 @@ pub struct RemoteLayer {
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for RemoteLayer {
impl std::fmt::Debug for RemoteLayerDesc {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteLayer")
f.debug_struct("RemoteLayerDesc")
.field("file_name", &self.file_name)
.field("layer_metadata", &self.layer_metadata)
.field("is_incremental", &self.is_incremental)
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayer {
}
}
impl Layer for RemoteLayer {
impl Layer for RemoteLayerDesc {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
@@ -119,7 +119,7 @@ impl Layer for RemoteLayer {
}
}
impl PersistentLayer for RemoteLayer {
impl PersistentLayer for RemoteLayerDesc {
fn get_tenant_id(&self) -> TenantId {
self.tenantid
}
@@ -160,14 +160,6 @@ impl PersistentLayer for RemoteLayer {
bail!("remote layer has no layer file");
}
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
Some(self)
}
fn is_remote_layer(&self) -> bool {
true
}
fn file_size(&self) -> u64 {
self.layer_metadata.file_size()
}
@@ -201,15 +193,15 @@ impl PersistentLayer for RemoteLayer {
}
}
impl RemoteLayer {
impl RemoteLayerDesc {
pub fn new_img(
tenantid: TenantId,
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
) -> RemoteLayerDesc {
RemoteLayerDesc {
tenantid,
timelineid,
key_range: fname.key_range.clone(),
@@ -230,8 +222,8 @@ impl RemoteLayer {
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayer {
RemoteLayer {
) -> RemoteLayerDesc {
RemoteLayerDesc {
tenantid,
timelineid,
key_range: fname.key_range.clone(),

View File

@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayer,
LayerAccessStats, LayerFileName, RemoteLayerDesc,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -77,6 +77,7 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_cache::LayerCache;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -119,7 +120,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -241,6 +242,8 @@ pub struct Timeline {
pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
layer_cache: Arc<LayerCache>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -1007,20 +1010,22 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
if self.layer_cache.contains(&remote_layer_desc.filename()) {
return Ok(Some(false));
}
if self.remote_client.is_none() {
return Ok(Some(false));
}
self.download_remote_layer(remote_layer).await?;
self.download_remote_layer(remote_layer_desc).await?;
Ok(Some(true))
}
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1047,7 +1052,7 @@ impl Timeline {
pub async fn evict_layers(
&self,
_: &GenericRemoteStorage,
layers_to_evict: &[Arc<dyn PersistentLayer>],
layers_to_evict: &[Arc<RemoteLayerDesc>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
let remote_client = self.remote_client.clone().expect(
@@ -1082,7 +1087,7 @@ impl Timeline {
async fn evict_layer_batch(
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<dyn PersistentLayer>],
layers_to_evict: &[Arc<RemoteLayerDesc>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
// ensure that the layers have finished uploading
@@ -1131,12 +1136,12 @@ impl Timeline {
fn evict_layer_batch_impl(
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
local_layer: &Arc<RemoteLayerDesc>,
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
) -> anyhow::Result<bool> {
use super::layer_map::Replacement;
if local_layer.is_remote_layer() {
if !self.layer_cache.contains(&local_layer.filename()) {
// TODO(issue #3851): consider returning an err here instead of false,
// which is the same out the match later
return Ok(false);
@@ -1163,7 +1168,7 @@ impl Timeline {
let layer_metadata = LayerFileMetadata::new(layer_file_size);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayer::new_img(
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
&image_name,
@@ -1172,7 +1177,7 @@ impl Timeline {
.access_stats()
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
@@ -1183,6 +1188,7 @@ impl Timeline {
),
});
/*
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
Replacement::Replaced { .. } => {
if let Err(e) = local_layer.delete_resident_layer_file() {
@@ -1233,8 +1239,10 @@ impl Timeline {
false
}
};
*/
Ok(replaced)
// Ok(replaced)
Ok(true)
}
}
@@ -1419,6 +1427,8 @@ impl Timeline {
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
layer_cache: Arc::new(LayerCache::new()),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1565,9 +1575,12 @@ impl Timeline {
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
let remote_desc = layer.layer_desc();
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1599,7 +1612,9 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(Arc::new(layer));
let remote_desc = layer.layer_desc();
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1644,9 +1659,9 @@ impl Timeline {
async fn create_remote_layers(
&self,
index_part: &IndexPart,
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
up_to_date_disk_consistent_lsn: Lsn,
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
// Are we missing some files that are present in remote storage?
// Create RemoteLayer instances for them.
let mut local_only_layers = local_layers;
@@ -1725,7 +1740,7 @@ impl Timeline {
continue;
}
let remote_layer = RemoteLayer::new_img(
let remote_layer = RemoteLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
imgfilename,
@@ -1753,7 +1768,7 @@ impl Timeline {
);
continue;
}
let remote_layer = RemoteLayer::new_delta(
let remote_layer = RemoteLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
deltafilename,
@@ -2159,7 +2174,7 @@ impl Timeline {
}
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
@@ -2176,10 +2191,10 @@ impl Timeline {
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
layer: Arc<RemoteLayerDesc>,
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
) -> anyhow::Result<()> {
if !layer.is_remote_layer() {
if self.layer_cache.contains(&layer.filename()) {
layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size();
self.metrics
@@ -2428,13 +2443,7 @@ impl Timeline {
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
// If it's a remote layer, download it and retry.
if let Some(remote_layer) =
super::storage_layer::downcast_remote_layer(&layer)
{
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
remote_layer // download happens outside the scope of `layers` guard object
} else {
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
@@ -2457,6 +2466,10 @@ impl Timeline {
}),
));
continue 'outer;
} else {
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
layer // download happens outside the scope of `layers` guard object
}
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
@@ -2896,7 +2909,8 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(l);
batch_updates.insert_historic(Arc::new(l.layer_desc()));
self.layer_cache.insert(l.filename(), l);
batch_updates.flush();
// update the timeline's physical size
@@ -3142,7 +3156,9 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(l);
updates.insert_historic(Arc::new(l.layer_desc()));
let x: Arc<dyn PersistentLayer> = l;
self.layer_cache.insert(x.filename(), x)
}
updates.flush();
drop(layers);
@@ -3155,7 +3171,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
}
/// Top-level failure to compact.
@@ -3165,7 +3181,7 @@ enum CompactionError {
///
/// This should not happen repeatedly, but will be retried once by top-level
/// `Timeline::compact`.
DownloadRequired(Vec<Arc<RemoteLayer>>),
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
/// Compaction cannot be done right now; page reconstruction and so on.
Other(anyhow::Error),
}
@@ -3237,13 +3253,9 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.filter(|l| l.is_remote_layer())
.filter(|l| !self.layer_cache.contains(&l.filename()))
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.map(|l| {
l.clone()
.downcast_remote_layer()
.expect("just checked it is remote layer")
})
.cloned()
.collect::<Vec<_>>();
if !remotes.is_empty() {
@@ -3583,13 +3595,15 @@ impl Timeline {
.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let remote_desc = l.layer_desc();
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(x);
updates.insert_historic(Arc::new(remote_desc));
self.layer_cache.insert(x.filename(), x)
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -4062,7 +4076,7 @@ impl Timeline {
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayer>,
remote_layer: Arc<RemoteLayerDesc>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -4119,10 +4133,12 @@ impl Timeline {
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
/*
let failure = match updates.replace_historic(&l, new_layer) {
Ok(Replacement::Replaced { .. }) => false,
Ok(Replacement::NotFound) => {
@@ -4177,8 +4193,9 @@ impl Timeline {
remote_layer
.download_replacement_failure
.store(true, Relaxed);
}
} */
}
updates.flush();
drop(layers);
@@ -4191,7 +4208,10 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
}
// Don't treat it as an error if the task that triggered the download
@@ -4205,7 +4225,8 @@ impl Timeline {
drop(permit);
Ok(())
}.in_current_span(),
}
.in_current_span(),
);
receiver.await.context("download task cancelled")?
@@ -4278,7 +4299,7 @@ impl Timeline {
let layers = self.layers.read().unwrap();
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
.filter(|l| !self.layer_cache.contains(&l.filename()))
.map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl))
}
@@ -4353,7 +4374,7 @@ pub struct DiskUsageEvictionInfo {
}
pub struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<dyn PersistentLayer>,
pub layer: Arc<RemoteLayerDesc>,
pub last_activity_ts: SystemTime,
}
@@ -4387,7 +4408,7 @@ impl Timeline {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
if l.is_remote_layer() {
if !self.layer_cache.contains(&l.filename()) {
continue;
}

View File

@@ -29,7 +29,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::PersistentLayer,
storage_layer::{PersistentLayer, RemoteLayerDesc},
LogicalSizeCalculationCause, Tenant,
},
};
@@ -184,11 +184,11 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let candidates: Vec<Arc<RemoteLayerDesc>> = {
let layers = self.layers.read().unwrap();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() {
if !self.layer_cache.contains(&hist_layer.filename()) {
continue;
}