mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-19 03:12:55 +00:00
Compare commits
1 Commits
hackathon/
...
skyzh/refa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9479470d05 |
@@ -60,7 +60,7 @@ use utils::serde_percent::Percent;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{self, storage_layer::PersistentLayer, Timeline},
|
||||
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -329,7 +329,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
// If we get far enough in the list that we start to evict layers that are below
|
||||
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
||||
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
||||
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
|
||||
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
|
||||
let mut warned = None;
|
||||
let mut usage_planned = usage_pre;
|
||||
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
|
||||
@@ -434,7 +434,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
#[derive(Clone)]
|
||||
struct EvictionCandidate {
|
||||
timeline: Arc<Timeline>,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
layer: Arc<RemoteLayerDesc>,
|
||||
last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
|
||||
@@ -84,6 +84,7 @@ pub mod block_io;
|
||||
pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
pub mod layer_cache;
|
||||
|
||||
pub mod metadata;
|
||||
mod par_fsync;
|
||||
|
||||
34
pageserver/src/tenant/layer_cache.rs
Normal file
34
pageserver/src/tenant/layer_cache.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
|
||||
|
||||
pub struct LayerCache {
|
||||
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
|
||||
}
|
||||
|
||||
impl LayerCache {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
layers: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
|
||||
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
|
||||
self.layers.lock().unwrap();
|
||||
guard.get(layer_fname).cloned()
|
||||
}
|
||||
|
||||
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
|
||||
let guard = self.layers.lock().unwrap();
|
||||
guard.contains_key(layer_fname)
|
||||
}
|
||||
|
||||
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.layers.lock().unwrap();
|
||||
guard.insert(layer_fname, persistent_layer);
|
||||
}
|
||||
}
|
||||
@@ -61,6 +61,7 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
pub use historic_layer_coverage::Replacement;
|
||||
|
||||
use super::storage_layer::range_eq;
|
||||
use super::storage_layer::PersistentLayer;
|
||||
|
||||
///
|
||||
/// LayerMap tracks what layers exist on a timeline.
|
||||
@@ -138,24 +139,19 @@ where
|
||||
self.layer_map.remove_historic_noflush(layer)
|
||||
}
|
||||
|
||||
/// Replaces existing layer iff it is the `expected`.
|
||||
/// Ensure the downloaded layer matches existing layer.
|
||||
///
|
||||
/// If the expected layer has been removed it will not be inserted by this function.
|
||||
///
|
||||
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
|
||||
/// Returned `Replacement` describes succeeding in checking or the reason why it could not
|
||||
/// be done.
|
||||
///
|
||||
/// TODO replacement can be done without buffering and rebuilding layer map updates.
|
||||
/// One way to do that is to add a layer of indirection for returned values, so
|
||||
/// that we can replace values only by updating a hashmap.
|
||||
pub fn replace_historic(
|
||||
&mut self,
|
||||
pub fn ensure_consistent(
|
||||
&self,
|
||||
expected: &Arc<L>,
|
||||
new: Arc<L>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
|
||||
|
||||
self.layer_map.replace_historic_noflush(expected, new)
|
||||
self.layer_map
|
||||
.ensure_consistent_noflush(expected, new)
|
||||
}
|
||||
|
||||
// We will flush on drop anyway, but this method makes it
|
||||
@@ -309,16 +305,16 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(self) fn replace_historic_noflush(
|
||||
&mut self,
|
||||
pub(self) fn ensure_consistent_noflush(
|
||||
&self,
|
||||
expected: &Arc<L>,
|
||||
new: Arc<L>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
let key = historic_layer_coverage::LayerKey::from(&**expected);
|
||||
let other = historic_layer_coverage::LayerKey::from(&*new);
|
||||
|
||||
let expected_l0 = Self::is_l0(expected);
|
||||
let new_l0 = Self::is_l0(&new);
|
||||
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
@@ -345,17 +341,7 @@ where
|
||||
None
|
||||
};
|
||||
|
||||
let replaced = self.historic.replace(&key, new.clone(), |existing| {
|
||||
Self::compare_arced_layers(existing, expected)
|
||||
});
|
||||
|
||||
if let Replacement::Replaced { .. } = &replaced {
|
||||
if let Some(index) = l0_index {
|
||||
self.l0_delta_layers[index] = new;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(replaced)
|
||||
Ok(Replacement::Replaced { in_buffered: false })
|
||||
}
|
||||
|
||||
/// Helper function for BatchedUpdates::drop.
|
||||
|
||||
@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
|
||||
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
pub use image_layer::{ImageLayer, ImageLayerWriter};
|
||||
pub use inmemory_layer::InMemoryLayer;
|
||||
pub use remote_layer::RemoteLayer;
|
||||
pub use remote_layer::RemoteLayerDesc;
|
||||
|
||||
use super::layer_map::BatchedUpdates;
|
||||
|
||||
@@ -431,14 +431,6 @@ pub trait PersistentLayer: Layer {
|
||||
/// Permanently remove this layer from disk.
|
||||
fn delete_resident_layer_file(&self) -> Result<()>;
|
||||
|
||||
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
None
|
||||
}
|
||||
|
||||
fn is_remote_layer(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns None if the layer file size is not known.
|
||||
///
|
||||
/// Should not change over the lifetime of the layer object because
|
||||
@@ -450,16 +442,6 @@ pub trait PersistentLayer: Layer {
|
||||
fn access_stats(&self) -> &LayerAccessStats;
|
||||
}
|
||||
|
||||
pub fn downcast_remote_layer(
|
||||
layer: &Arc<dyn PersistentLayer>,
|
||||
) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
if layer.is_remote_layer() {
|
||||
Arc::clone(layer).downcast_remote_layer()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Holds metadata about a layer without any content. Used mostly for testing.
|
||||
///
|
||||
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{
|
||||
PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
@@ -57,7 +58,7 @@ use utils::{
|
||||
|
||||
use super::{
|
||||
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
|
||||
LayerKeyIter, PathOrConf,
|
||||
LayerKeyIter, PathOrConf, RemoteLayerDesc,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -663,6 +664,17 @@ impl DeltaLayer {
|
||||
&self.layer_name(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create layer descriptor for this image layer
|
||||
pub fn layer_desc(&self) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&self.layer_name(),
|
||||
&LayerFileMetadata::new(self.file_size()),
|
||||
LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new delta layer.
|
||||
|
||||
@@ -26,6 +26,7 @@ use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
@@ -53,7 +54,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::filename::{ImageFileName, LayerFileName};
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -464,6 +465,17 @@ impl ImageLayer {
|
||||
&self.layer_name(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create layer descriptor for this image layer
|
||||
pub fn layer_desc(&self) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&self.layer_name(),
|
||||
&LayerFileMetadata::new(self.file_size()),
|
||||
LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new image layer.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
|
||||
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
|
||||
//! in remote storage.
|
||||
//!
|
||||
use crate::config::PageServerConf;
|
||||
@@ -25,19 +25,19 @@ use super::{
|
||||
LayerResidenceStatus, PersistentLayer,
|
||||
};
|
||||
|
||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
||||
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
|
||||
/// [`crate::storage_layer::DeltaLayer`].
|
||||
///
|
||||
/// RemoteLayer might be downloaded on-demand during operations which are
|
||||
/// RemoteLayerDesc might be downloaded on-demand during operations which are
|
||||
/// allowed download remote layers and during which, it gets replaced with a
|
||||
/// concrete `DeltaLayer` or `ImageLayer`.
|
||||
///
|
||||
/// See: [`crate::context::RequestContext`] for authorization to download
|
||||
pub struct RemoteLayer {
|
||||
tenantid: TenantId,
|
||||
timelineid: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
pub struct RemoteLayerDesc {
|
||||
pub(crate) tenantid: TenantId,
|
||||
pub(crate) timelineid: TimelineId,
|
||||
pub(crate) key_range: Range<Key>,
|
||||
pub(crate) lsn_range: Range<Lsn>,
|
||||
|
||||
pub file_name: LayerFileName,
|
||||
|
||||
@@ -54,7 +54,7 @@ pub struct RemoteLayer {
|
||||
/// Has `LayerMap::replace` failed for this (true) or not (false).
|
||||
///
|
||||
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
|
||||
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
|
||||
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
|
||||
/// unprocessable, because a LayerMap::replace failed.
|
||||
///
|
||||
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
|
||||
@@ -63,9 +63,9 @@ pub struct RemoteLayer {
|
||||
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RemoteLayer {
|
||||
impl std::fmt::Debug for RemoteLayerDesc {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RemoteLayer")
|
||||
f.debug_struct("RemoteLayerDesc")
|
||||
.field("file_name", &self.file_name)
|
||||
.field("layer_metadata", &self.layer_metadata)
|
||||
.field("is_incremental", &self.is_incremental)
|
||||
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer for RemoteLayer {
|
||||
impl Layer for RemoteLayerDesc {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.key_range.clone()
|
||||
}
|
||||
@@ -119,7 +119,7 @@ impl Layer for RemoteLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for RemoteLayer {
|
||||
impl PersistentLayer for RemoteLayerDesc {
|
||||
fn get_tenant_id(&self) -> TenantId {
|
||||
self.tenantid
|
||||
}
|
||||
@@ -160,14 +160,6 @@ impl PersistentLayer for RemoteLayer {
|
||||
bail!("remote layer has no layer file");
|
||||
}
|
||||
|
||||
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
Some(self)
|
||||
}
|
||||
|
||||
fn is_remote_layer(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn file_size(&self) -> u64 {
|
||||
self.layer_metadata.file_size()
|
||||
}
|
||||
@@ -201,15 +193,15 @@ impl PersistentLayer for RemoteLayer {
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteLayer {
|
||||
impl RemoteLayerDesc {
|
||||
pub fn new_img(
|
||||
tenantid: TenantId,
|
||||
timelineid: TimelineId,
|
||||
fname: &ImageFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc {
|
||||
tenantid,
|
||||
timelineid,
|
||||
key_range: fname.key_range.clone(),
|
||||
@@ -230,8 +222,8 @@ impl RemoteLayer {
|
||||
fname: &DeltaFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc {
|
||||
tenantid,
|
||||
timelineid,
|
||||
key_range: fname.key_range.clone(),
|
||||
|
||||
@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
|
||||
use crate::tenant::storage_layer::{
|
||||
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
|
||||
LayerAccessStats, LayerFileName, RemoteLayer,
|
||||
LayerAccessStats, LayerFileName, RemoteLayerDesc,
|
||||
};
|
||||
use crate::tenant::{
|
||||
ephemeral_file::is_ephemeral_file,
|
||||
@@ -77,6 +77,7 @@ use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::layer_cache::LayerCache;
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -119,7 +120,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -241,6 +242,8 @@ pub struct Timeline {
|
||||
pub delete_lock: tokio::sync::Mutex<bool>,
|
||||
|
||||
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
||||
|
||||
layer_cache: Arc<LayerCache>,
|
||||
}
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
@@ -1007,20 +1010,22 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
|
||||
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
|
||||
if self.layer_cache.contains(&remote_layer_desc.filename()) {
|
||||
return Ok(Some(false));
|
||||
}
|
||||
if self.remote_client.is_none() {
|
||||
return Ok(Some(false));
|
||||
}
|
||||
|
||||
self.download_remote_layer(remote_layer).await?;
|
||||
self.download_remote_layer(remote_layer_desc).await?;
|
||||
Ok(Some(true))
|
||||
}
|
||||
|
||||
/// Like [`evict_layer_batch`], but for just one layer.
|
||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
|
||||
let remote_client = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
@@ -1047,7 +1052,7 @@ impl Timeline {
|
||||
pub async fn evict_layers(
|
||||
&self,
|
||||
_: &GenericRemoteStorage,
|
||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
||||
layers_to_evict: &[Arc<RemoteLayerDesc>],
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||
let remote_client = self.remote_client.clone().expect(
|
||||
@@ -1082,7 +1087,7 @@ impl Timeline {
|
||||
async fn evict_layer_batch(
|
||||
&self,
|
||||
remote_client: &Arc<RemoteTimelineClient>,
|
||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
||||
layers_to_evict: &[Arc<RemoteLayerDesc>],
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||
// ensure that the layers have finished uploading
|
||||
@@ -1131,12 +1136,12 @@ impl Timeline {
|
||||
fn evict_layer_batch_impl(
|
||||
&self,
|
||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
local_layer: &Arc<dyn PersistentLayer>,
|
||||
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
||||
local_layer: &Arc<RemoteLayerDesc>,
|
||||
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
|
||||
) -> anyhow::Result<bool> {
|
||||
use super::layer_map::Replacement;
|
||||
|
||||
if local_layer.is_remote_layer() {
|
||||
if !self.layer_cache.contains(&local_layer.filename()) {
|
||||
// TODO(issue #3851): consider returning an err here instead of false,
|
||||
// which is the same out the match later
|
||||
return Ok(false);
|
||||
@@ -1163,7 +1168,7 @@ impl Timeline {
|
||||
let layer_metadata = LayerFileMetadata::new(layer_file_size);
|
||||
|
||||
let new_remote_layer = Arc::new(match local_layer.filename() {
|
||||
LayerFileName::Image(image_name) => RemoteLayer::new_img(
|
||||
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&image_name,
|
||||
@@ -1172,7 +1177,7 @@ impl Timeline {
|
||||
.access_stats()
|
||||
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
|
||||
),
|
||||
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
|
||||
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&delta_name,
|
||||
@@ -1183,6 +1188,7 @@ impl Timeline {
|
||||
),
|
||||
});
|
||||
|
||||
/*
|
||||
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
|
||||
Replacement::Replaced { .. } => {
|
||||
if let Err(e) = local_layer.delete_resident_layer_file() {
|
||||
@@ -1233,8 +1239,10 @@ impl Timeline {
|
||||
false
|
||||
}
|
||||
};
|
||||
*/
|
||||
|
||||
Ok(replaced)
|
||||
// Ok(replaced)
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1419,6 +1427,8 @@ impl Timeline {
|
||||
EvictionTaskTimelineState::default(),
|
||||
),
|
||||
delete_lock: tokio::sync::Mutex::new(false),
|
||||
|
||||
layer_cache: Arc::new(LayerCache::new()),
|
||||
};
|
||||
result.repartition_threshold = result.get_checkpoint_distance() / 10;
|
||||
result
|
||||
@@ -1565,9 +1575,12 @@ impl Timeline {
|
||||
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
|
||||
);
|
||||
|
||||
let remote_desc = layer.layer_desc();
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
self.layer_cache.insert(layer.filename(), Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
num_layers += 1;
|
||||
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
||||
// Create a DeltaLayer struct for each delta file.
|
||||
@@ -1599,7 +1612,9 @@ impl Timeline {
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
let remote_desc = layer.layer_desc();
|
||||
self.layer_cache.insert(layer.filename(), Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
num_layers += 1;
|
||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||
// ignore these
|
||||
@@ -1644,9 +1659,9 @@ impl Timeline {
|
||||
async fn create_remote_layers(
|
||||
&self,
|
||||
index_part: &IndexPart,
|
||||
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
|
||||
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
|
||||
up_to_date_disk_consistent_lsn: Lsn,
|
||||
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
|
||||
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
|
||||
// Are we missing some files that are present in remote storage?
|
||||
// Create RemoteLayer instances for them.
|
||||
let mut local_only_layers = local_layers;
|
||||
@@ -1725,7 +1740,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let remote_layer = RemoteLayer::new_img(
|
||||
let remote_layer = RemoteLayerDesc::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
imgfilename,
|
||||
@@ -1753,7 +1768,7 @@ impl Timeline {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let remote_layer = RemoteLayer::new_delta(
|
||||
let remote_layer = RemoteLayerDesc::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
deltafilename,
|
||||
@@ -2159,7 +2174,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
|
||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
@@ -2176,10 +2191,10 @@ impl Timeline {
|
||||
&self,
|
||||
// we cannot remove layers otherwise, since gc and compaction will race
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
||||
layer: Arc<RemoteLayerDesc>,
|
||||
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
|
||||
) -> anyhow::Result<()> {
|
||||
if !layer.is_remote_layer() {
|
||||
if self.layer_cache.contains(&layer.filename()) {
|
||||
layer.delete_resident_layer_file()?;
|
||||
let layer_file_size = layer.file_size();
|
||||
self.metrics
|
||||
@@ -2428,13 +2443,7 @@ impl Timeline {
|
||||
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
// If it's a remote layer, download it and retry.
|
||||
if let Some(remote_layer) =
|
||||
super::storage_layer::downcast_remote_layer(&layer)
|
||||
{
|
||||
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
||||
// we downloaded / would need to download this layer.
|
||||
remote_layer // download happens outside the scope of `layers` guard object
|
||||
} else {
|
||||
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
@@ -2457,6 +2466,10 @@ impl Timeline {
|
||||
}),
|
||||
));
|
||||
continue 'outer;
|
||||
} else {
|
||||
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
||||
// we downloaded / would need to download this layer.
|
||||
layer // download happens outside the scope of `layers` guard object
|
||||
}
|
||||
} else if timeline.ancestor_timeline.is_some() {
|
||||
// Nothing on this timeline. Traverse to parent
|
||||
@@ -2896,7 +2909,8 @@ impl Timeline {
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
batch_updates.insert_historic(l);
|
||||
batch_updates.insert_historic(Arc::new(l.layer_desc()));
|
||||
self.layer_cache.insert(l.filename(), l);
|
||||
batch_updates.flush();
|
||||
|
||||
// update the timeline's physical size
|
||||
@@ -3142,7 +3156,9 @@ impl Timeline {
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
updates.insert_historic(l);
|
||||
updates.insert_historic(Arc::new(l.layer_desc()));
|
||||
let x: Arc<dyn PersistentLayer> = l;
|
||||
self.layer_cache.insert(x.filename(), x)
|
||||
}
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
@@ -3155,7 +3171,7 @@ impl Timeline {
|
||||
#[derive(Default)]
|
||||
struct CompactLevel0Phase1Result {
|
||||
new_layers: Vec<DeltaLayer>,
|
||||
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
|
||||
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
|
||||
}
|
||||
|
||||
/// Top-level failure to compact.
|
||||
@@ -3165,7 +3181,7 @@ enum CompactionError {
|
||||
///
|
||||
/// This should not happen repeatedly, but will be retried once by top-level
|
||||
/// `Timeline::compact`.
|
||||
DownloadRequired(Vec<Arc<RemoteLayer>>),
|
||||
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -3237,13 +3253,9 @@ impl Timeline {
|
||||
|
||||
let remotes = deltas_to_compact
|
||||
.iter()
|
||||
.filter(|l| l.is_remote_layer())
|
||||
.filter(|l| !self.layer_cache.contains(&l.filename()))
|
||||
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
|
||||
.map(|l| {
|
||||
l.clone()
|
||||
.downcast_remote_layer()
|
||||
.expect("just checked it is remote layer")
|
||||
})
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if !remotes.is_empty() {
|
||||
@@ -3583,13 +3595,15 @@ impl Timeline {
|
||||
.add(metadata.len());
|
||||
|
||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||
let remote_desc = l.layer_desc();
|
||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||
x.access_stats().record_residence_event(
|
||||
&updates,
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
updates.insert_historic(x);
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
self.layer_cache.insert(x.filename(), x)
|
||||
}
|
||||
|
||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||
@@ -4062,7 +4076,7 @@ impl Timeline {
|
||||
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
|
||||
pub async fn download_remote_layer(
|
||||
&self,
|
||||
remote_layer: Arc<RemoteLayer>,
|
||||
remote_layer: Arc<RemoteLayerDesc>,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -4119,10 +4133,12 @@ impl Timeline {
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut updates = layers.batch_update();
|
||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
let new_layer =
|
||||
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
{
|
||||
use crate::tenant::layer_map::Replacement;
|
||||
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
|
||||
/*
|
||||
let failure = match updates.replace_historic(&l, new_layer) {
|
||||
Ok(Replacement::Replaced { .. }) => false,
|
||||
Ok(Replacement::NotFound) => {
|
||||
@@ -4177,8 +4193,9 @@ impl Timeline {
|
||||
remote_layer
|
||||
.download_replacement_failure
|
||||
.store(true, Relaxed);
|
||||
}
|
||||
} */
|
||||
}
|
||||
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
|
||||
@@ -4191,7 +4208,10 @@ impl Timeline {
|
||||
remote_layer.ongoing_download.close();
|
||||
} else {
|
||||
// Keep semaphore open. We'll drop the permit at the end of the function.
|
||||
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
|
||||
error!(
|
||||
"layer file download failed: {:?}",
|
||||
result.as_ref().unwrap_err()
|
||||
);
|
||||
}
|
||||
|
||||
// Don't treat it as an error if the task that triggered the download
|
||||
@@ -4205,7 +4225,8 @@ impl Timeline {
|
||||
drop(permit);
|
||||
|
||||
Ok(())
|
||||
}.in_current_span(),
|
||||
}
|
||||
.in_current_span(),
|
||||
);
|
||||
|
||||
receiver.await.context("download task cancelled")?
|
||||
@@ -4278,7 +4299,7 @@ impl Timeline {
|
||||
let layers = self.layers.read().unwrap();
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
.filter(|l| !self.layer_cache.contains(&l.filename()))
|
||||
.map(|l| self.download_remote_layer(l))
|
||||
.for_each(|dl| downloads.push(dl))
|
||||
}
|
||||
@@ -4353,7 +4374,7 @@ pub struct DiskUsageEvictionInfo {
|
||||
}
|
||||
|
||||
pub struct LocalLayerInfoForDiskUsageEviction {
|
||||
pub layer: Arc<dyn PersistentLayer>,
|
||||
pub layer: Arc<RemoteLayerDesc>,
|
||||
pub last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
@@ -4387,7 +4408,7 @@ impl Timeline {
|
||||
let file_size = l.file_size();
|
||||
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
||||
|
||||
if l.is_remote_layer() {
|
||||
if !self.layer_cache.contains(&l.filename()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::{
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
storage_layer::PersistentLayer,
|
||||
storage_layer::{PersistentLayer, RemoteLayerDesc},
|
||||
LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
};
|
||||
@@ -184,11 +184,11 @@ impl Timeline {
|
||||
// NB: all the checks can be invalidated as soon as we release the layer map lock.
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let candidates: Vec<Arc<RemoteLayerDesc>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
if hist_layer.is_remote_layer() {
|
||||
if !self.layer_cache.contains(&hist_layer.filename()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user