mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-16 06:40:38 +00:00
Compare commits
1 Commits
rustls
...
skyzh/refa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9479470d05 |
@@ -60,7 +60,7 @@ use utils::serde_percent::Percent;
|
|||||||
use crate::{
|
use crate::{
|
||||||
config::PageServerConf,
|
config::PageServerConf,
|
||||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||||
tenant::{self, storage_layer::PersistentLayer, Timeline},
|
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
@@ -329,7 +329,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
|||||||
// If we get far enough in the list that we start to evict layers that are below
|
// If we get far enough in the list that we start to evict layers that are below
|
||||||
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
||||||
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
||||||
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
|
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
|
||||||
let mut warned = None;
|
let mut warned = None;
|
||||||
let mut usage_planned = usage_pre;
|
let mut usage_planned = usage_pre;
|
||||||
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
|
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
|
||||||
@@ -434,7 +434,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct EvictionCandidate {
|
struct EvictionCandidate {
|
||||||
timeline: Arc<Timeline>,
|
timeline: Arc<Timeline>,
|
||||||
layer: Arc<dyn PersistentLayer>,
|
layer: Arc<RemoteLayerDesc>,
|
||||||
last_activity_ts: SystemTime,
|
last_activity_ts: SystemTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -84,6 +84,7 @@ pub mod block_io;
|
|||||||
pub mod disk_btree;
|
pub mod disk_btree;
|
||||||
pub(crate) mod ephemeral_file;
|
pub(crate) mod ephemeral_file;
|
||||||
pub mod layer_map;
|
pub mod layer_map;
|
||||||
|
pub mod layer_cache;
|
||||||
|
|
||||||
pub mod metadata;
|
pub mod metadata;
|
||||||
mod par_fsync;
|
mod par_fsync;
|
||||||
|
|||||||
34
pageserver/src/tenant/layer_cache.rs
Normal file
34
pageserver/src/tenant/layer_cache.rs
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
sync::{Arc, Mutex},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
|
||||||
|
|
||||||
|
pub struct LayerCache {
|
||||||
|
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LayerCache {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
layers: Mutex::new(HashMap::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
|
||||||
|
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
|
||||||
|
self.layers.lock().unwrap();
|
||||||
|
guard.get(layer_fname).cloned()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
|
||||||
|
let guard = self.layers.lock().unwrap();
|
||||||
|
guard.contains_key(layer_fname)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
|
||||||
|
let mut guard = self.layers.lock().unwrap();
|
||||||
|
guard.insert(layer_fname, persistent_layer);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -61,6 +61,7 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
|||||||
pub use historic_layer_coverage::Replacement;
|
pub use historic_layer_coverage::Replacement;
|
||||||
|
|
||||||
use super::storage_layer::range_eq;
|
use super::storage_layer::range_eq;
|
||||||
|
use super::storage_layer::PersistentLayer;
|
||||||
|
|
||||||
///
|
///
|
||||||
/// LayerMap tracks what layers exist on a timeline.
|
/// LayerMap tracks what layers exist on a timeline.
|
||||||
@@ -138,24 +139,19 @@ where
|
|||||||
self.layer_map.remove_historic_noflush(layer)
|
self.layer_map.remove_historic_noflush(layer)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Replaces existing layer iff it is the `expected`.
|
/// Ensure the downloaded layer matches existing layer.
|
||||||
///
|
///
|
||||||
/// If the expected layer has been removed it will not be inserted by this function.
|
/// Returned `Replacement` describes succeeding in checking or the reason why it could not
|
||||||
///
|
|
||||||
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
|
|
||||||
/// be done.
|
/// be done.
|
||||||
///
|
pub fn ensure_consistent(
|
||||||
/// TODO replacement can be done without buffering and rebuilding layer map updates.
|
&self,
|
||||||
/// One way to do that is to add a layer of indirection for returned values, so
|
|
||||||
/// that we can replace values only by updating a hashmap.
|
|
||||||
pub fn replace_historic(
|
|
||||||
&mut self,
|
|
||||||
expected: &Arc<L>,
|
expected: &Arc<L>,
|
||||||
new: Arc<L>,
|
new: Arc<dyn PersistentLayer>,
|
||||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||||
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
|
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
|
||||||
|
|
||||||
self.layer_map.replace_historic_noflush(expected, new)
|
self.layer_map
|
||||||
|
.ensure_consistent_noflush(expected, new)
|
||||||
}
|
}
|
||||||
|
|
||||||
// We will flush on drop anyway, but this method makes it
|
// We will flush on drop anyway, but this method makes it
|
||||||
@@ -309,16 +305,16 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(self) fn replace_historic_noflush(
|
pub(self) fn ensure_consistent_noflush(
|
||||||
&mut self,
|
&self,
|
||||||
expected: &Arc<L>,
|
expected: &Arc<L>,
|
||||||
new: Arc<L>,
|
new: Arc<dyn PersistentLayer>,
|
||||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||||
let key = historic_layer_coverage::LayerKey::from(&**expected);
|
let key = historic_layer_coverage::LayerKey::from(&**expected);
|
||||||
let other = historic_layer_coverage::LayerKey::from(&*new);
|
let other = historic_layer_coverage::LayerKey::from(&*new);
|
||||||
|
|
||||||
let expected_l0 = Self::is_l0(expected);
|
let expected_l0 = Self::is_l0(expected);
|
||||||
let new_l0 = Self::is_l0(&new);
|
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
|
||||||
|
|
||||||
anyhow::ensure!(
|
anyhow::ensure!(
|
||||||
key == other,
|
key == other,
|
||||||
@@ -345,17 +341,7 @@ where
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let replaced = self.historic.replace(&key, new.clone(), |existing| {
|
Ok(Replacement::Replaced { in_buffered: false })
|
||||||
Self::compare_arced_layers(existing, expected)
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Replacement::Replaced { .. } = &replaced {
|
|
||||||
if let Some(index) = l0_index {
|
|
||||||
self.l0_delta_layers[index] = new;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(replaced)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Helper function for BatchedUpdates::drop.
|
/// Helper function for BatchedUpdates::drop.
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
|
|||||||
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
|
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||||
pub use image_layer::{ImageLayer, ImageLayerWriter};
|
pub use image_layer::{ImageLayer, ImageLayerWriter};
|
||||||
pub use inmemory_layer::InMemoryLayer;
|
pub use inmemory_layer::InMemoryLayer;
|
||||||
pub use remote_layer::RemoteLayer;
|
pub use remote_layer::RemoteLayerDesc;
|
||||||
|
|
||||||
use super::layer_map::BatchedUpdates;
|
use super::layer_map::BatchedUpdates;
|
||||||
|
|
||||||
@@ -431,14 +431,6 @@ pub trait PersistentLayer: Layer {
|
|||||||
/// Permanently remove this layer from disk.
|
/// Permanently remove this layer from disk.
|
||||||
fn delete_resident_layer_file(&self) -> Result<()>;
|
fn delete_resident_layer_file(&self) -> Result<()>;
|
||||||
|
|
||||||
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_remote_layer(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns None if the layer file size is not known.
|
/// Returns None if the layer file size is not known.
|
||||||
///
|
///
|
||||||
/// Should not change over the lifetime of the layer object because
|
/// Should not change over the lifetime of the layer object because
|
||||||
@@ -450,16 +442,6 @@ pub trait PersistentLayer: Layer {
|
|||||||
fn access_stats(&self) -> &LayerAccessStats;
|
fn access_stats(&self) -> &LayerAccessStats;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn downcast_remote_layer(
|
|
||||||
layer: &Arc<dyn PersistentLayer>,
|
|
||||||
) -> Option<std::sync::Arc<RemoteLayer>> {
|
|
||||||
if layer.is_remote_layer() {
|
|
||||||
Arc::clone(layer).downcast_remote_layer()
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Holds metadata about a layer without any content. Used mostly for testing.
|
/// Holds metadata about a layer without any content. Used mostly for testing.
|
||||||
///
|
///
|
||||||
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
|
|||||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
||||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||||
|
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||||
use crate::tenant::storage_layer::{
|
use crate::tenant::storage_layer::{
|
||||||
PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||||
};
|
};
|
||||||
@@ -57,7 +58,7 @@ use utils::{
|
|||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
|
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
|
||||||
LayerKeyIter, PathOrConf,
|
LayerKeyIter, PathOrConf, RemoteLayerDesc,
|
||||||
};
|
};
|
||||||
|
|
||||||
///
|
///
|
||||||
@@ -663,6 +664,17 @@ impl DeltaLayer {
|
|||||||
&self.layer_name(),
|
&self.layer_name(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create layer descriptor for this image layer
|
||||||
|
pub fn layer_desc(&self) -> RemoteLayerDesc {
|
||||||
|
RemoteLayerDesc::new_delta(
|
||||||
|
self.tenant_id,
|
||||||
|
self.timeline_id,
|
||||||
|
&self.layer_name(),
|
||||||
|
&LayerFileMetadata::new(self.file_size()),
|
||||||
|
LayerAccessStats::empty_will_record_residence_event_later(),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A builder object for constructing a new delta layer.
|
/// A builder object for constructing a new delta layer.
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ use crate::repository::{Key, KEY_SIZE};
|
|||||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||||
|
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||||
use crate::tenant::storage_layer::{
|
use crate::tenant::storage_layer::{
|
||||||
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||||
};
|
};
|
||||||
@@ -53,7 +54,7 @@ use utils::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use super::filename::{ImageFileName, LayerFileName};
|
use super::filename::{ImageFileName, LayerFileName};
|
||||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
|
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
|
||||||
|
|
||||||
///
|
///
|
||||||
/// Header stored in the beginning of the file
|
/// Header stored in the beginning of the file
|
||||||
@@ -464,6 +465,17 @@ impl ImageLayer {
|
|||||||
&self.layer_name(),
|
&self.layer_name(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create layer descriptor for this image layer
|
||||||
|
pub fn layer_desc(&self) -> RemoteLayerDesc {
|
||||||
|
RemoteLayerDesc::new_img(
|
||||||
|
self.tenant_id,
|
||||||
|
self.timeline_id,
|
||||||
|
&self.layer_name(),
|
||||||
|
&LayerFileMetadata::new(self.file_size()),
|
||||||
|
LayerAccessStats::empty_will_record_residence_event_later(),
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A builder object for constructing a new image layer.
|
/// A builder object for constructing a new image layer.
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
|
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
|
||||||
//! in remote storage.
|
//! in remote storage.
|
||||||
//!
|
//!
|
||||||
use crate::config::PageServerConf;
|
use crate::config::PageServerConf;
|
||||||
@@ -25,19 +25,19 @@ use super::{
|
|||||||
LayerResidenceStatus, PersistentLayer,
|
LayerResidenceStatus, PersistentLayer,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
|
||||||
/// [`crate::storage_layer::DeltaLayer`].
|
/// [`crate::storage_layer::DeltaLayer`].
|
||||||
///
|
///
|
||||||
/// RemoteLayer might be downloaded on-demand during operations which are
|
/// RemoteLayerDesc might be downloaded on-demand during operations which are
|
||||||
/// allowed download remote layers and during which, it gets replaced with a
|
/// allowed download remote layers and during which, it gets replaced with a
|
||||||
/// concrete `DeltaLayer` or `ImageLayer`.
|
/// concrete `DeltaLayer` or `ImageLayer`.
|
||||||
///
|
///
|
||||||
/// See: [`crate::context::RequestContext`] for authorization to download
|
/// See: [`crate::context::RequestContext`] for authorization to download
|
||||||
pub struct RemoteLayer {
|
pub struct RemoteLayerDesc {
|
||||||
tenantid: TenantId,
|
pub(crate) tenantid: TenantId,
|
||||||
timelineid: TimelineId,
|
pub(crate) timelineid: TimelineId,
|
||||||
key_range: Range<Key>,
|
pub(crate) key_range: Range<Key>,
|
||||||
lsn_range: Range<Lsn>,
|
pub(crate) lsn_range: Range<Lsn>,
|
||||||
|
|
||||||
pub file_name: LayerFileName,
|
pub file_name: LayerFileName,
|
||||||
|
|
||||||
@@ -54,7 +54,7 @@ pub struct RemoteLayer {
|
|||||||
/// Has `LayerMap::replace` failed for this (true) or not (false).
|
/// Has `LayerMap::replace` failed for this (true) or not (false).
|
||||||
///
|
///
|
||||||
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
|
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
|
||||||
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
|
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
|
||||||
/// unprocessable, because a LayerMap::replace failed.
|
/// unprocessable, because a LayerMap::replace failed.
|
||||||
///
|
///
|
||||||
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
|
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
|
||||||
@@ -63,9 +63,9 @@ pub struct RemoteLayer {
|
|||||||
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
|
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for RemoteLayer {
|
impl std::fmt::Debug for RemoteLayerDesc {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("RemoteLayer")
|
f.debug_struct("RemoteLayerDesc")
|
||||||
.field("file_name", &self.file_name)
|
.field("file_name", &self.file_name)
|
||||||
.field("layer_metadata", &self.layer_metadata)
|
.field("layer_metadata", &self.layer_metadata)
|
||||||
.field("is_incremental", &self.is_incremental)
|
.field("is_incremental", &self.is_incremental)
|
||||||
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Layer for RemoteLayer {
|
impl Layer for RemoteLayerDesc {
|
||||||
fn get_key_range(&self) -> Range<Key> {
|
fn get_key_range(&self) -> Range<Key> {
|
||||||
self.key_range.clone()
|
self.key_range.clone()
|
||||||
}
|
}
|
||||||
@@ -119,7 +119,7 @@ impl Layer for RemoteLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PersistentLayer for RemoteLayer {
|
impl PersistentLayer for RemoteLayerDesc {
|
||||||
fn get_tenant_id(&self) -> TenantId {
|
fn get_tenant_id(&self) -> TenantId {
|
||||||
self.tenantid
|
self.tenantid
|
||||||
}
|
}
|
||||||
@@ -160,14 +160,6 @@ impl PersistentLayer for RemoteLayer {
|
|||||||
bail!("remote layer has no layer file");
|
bail!("remote layer has no layer file");
|
||||||
}
|
}
|
||||||
|
|
||||||
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
|
|
||||||
Some(self)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_remote_layer(&self) -> bool {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
fn file_size(&self) -> u64 {
|
fn file_size(&self) -> u64 {
|
||||||
self.layer_metadata.file_size()
|
self.layer_metadata.file_size()
|
||||||
}
|
}
|
||||||
@@ -201,15 +193,15 @@ impl PersistentLayer for RemoteLayer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RemoteLayer {
|
impl RemoteLayerDesc {
|
||||||
pub fn new_img(
|
pub fn new_img(
|
||||||
tenantid: TenantId,
|
tenantid: TenantId,
|
||||||
timelineid: TimelineId,
|
timelineid: TimelineId,
|
||||||
fname: &ImageFileName,
|
fname: &ImageFileName,
|
||||||
layer_metadata: &LayerFileMetadata,
|
layer_metadata: &LayerFileMetadata,
|
||||||
access_stats: LayerAccessStats,
|
access_stats: LayerAccessStats,
|
||||||
) -> RemoteLayer {
|
) -> RemoteLayerDesc {
|
||||||
RemoteLayer {
|
RemoteLayerDesc {
|
||||||
tenantid,
|
tenantid,
|
||||||
timelineid,
|
timelineid,
|
||||||
key_range: fname.key_range.clone(),
|
key_range: fname.key_range.clone(),
|
||||||
@@ -230,8 +222,8 @@ impl RemoteLayer {
|
|||||||
fname: &DeltaFileName,
|
fname: &DeltaFileName,
|
||||||
layer_metadata: &LayerFileMetadata,
|
layer_metadata: &LayerFileMetadata,
|
||||||
access_stats: LayerAccessStats,
|
access_stats: LayerAccessStats,
|
||||||
) -> RemoteLayer {
|
) -> RemoteLayerDesc {
|
||||||
RemoteLayer {
|
RemoteLayerDesc {
|
||||||
tenantid,
|
tenantid,
|
||||||
timelineid,
|
timelineid,
|
||||||
key_range: fname.key_range.clone(),
|
key_range: fname.key_range.clone(),
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
|||||||
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
|
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
|
||||||
use crate::tenant::storage_layer::{
|
use crate::tenant::storage_layer::{
|
||||||
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
|
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
|
||||||
LayerAccessStats, LayerFileName, RemoteLayer,
|
LayerAccessStats, LayerFileName, RemoteLayerDesc,
|
||||||
};
|
};
|
||||||
use crate::tenant::{
|
use crate::tenant::{
|
||||||
ephemeral_file::is_ephemeral_file,
|
ephemeral_file::is_ephemeral_file,
|
||||||
@@ -77,6 +77,7 @@ use self::eviction_task::EvictionTaskTimelineState;
|
|||||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||||
|
|
||||||
use super::config::TenantConf;
|
use super::config::TenantConf;
|
||||||
|
use super::layer_cache::LayerCache;
|
||||||
use super::layer_map::BatchedUpdates;
|
use super::layer_map::BatchedUpdates;
|
||||||
use super::remote_timeline_client::index::IndexPart;
|
use super::remote_timeline_client::index::IndexPart;
|
||||||
use super::remote_timeline_client::RemoteTimelineClient;
|
use super::remote_timeline_client::RemoteTimelineClient;
|
||||||
@@ -119,7 +120,7 @@ pub struct Timeline {
|
|||||||
|
|
||||||
pub pg_version: u32,
|
pub pg_version: u32,
|
||||||
|
|
||||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
|
||||||
|
|
||||||
/// Set of key ranges which should be covered by image layers to
|
/// Set of key ranges which should be covered by image layers to
|
||||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||||
@@ -241,6 +242,8 @@ pub struct Timeline {
|
|||||||
pub delete_lock: tokio::sync::Mutex<bool>,
|
pub delete_lock: tokio::sync::Mutex<bool>,
|
||||||
|
|
||||||
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
||||||
|
|
||||||
|
layer_cache: Arc<LayerCache>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Internal structure to hold all data needed for logical size calculation.
|
/// Internal structure to hold all data needed for logical size calculation.
|
||||||
@@ -1007,20 +1010,22 @@ impl Timeline {
|
|||||||
|
|
||||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||||
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
|
||||||
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
|
if self.layer_cache.contains(&remote_layer_desc.filename()) {
|
||||||
|
return Ok(Some(false));
|
||||||
|
}
|
||||||
if self.remote_client.is_none() {
|
if self.remote_client.is_none() {
|
||||||
return Ok(Some(false));
|
return Ok(Some(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
self.download_remote_layer(remote_layer).await?;
|
self.download_remote_layer(remote_layer_desc).await?;
|
||||||
Ok(Some(true))
|
Ok(Some(true))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Like [`evict_layer_batch`], but for just one layer.
|
/// Like [`evict_layer_batch`], but for just one layer.
|
||||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
|
||||||
let remote_client = self
|
let remote_client = self
|
||||||
.remote_client
|
.remote_client
|
||||||
.as_ref()
|
.as_ref()
|
||||||
@@ -1047,7 +1052,7 @@ impl Timeline {
|
|||||||
pub async fn evict_layers(
|
pub async fn evict_layers(
|
||||||
&self,
|
&self,
|
||||||
_: &GenericRemoteStorage,
|
_: &GenericRemoteStorage,
|
||||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
layers_to_evict: &[Arc<RemoteLayerDesc>],
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||||
let remote_client = self.remote_client.clone().expect(
|
let remote_client = self.remote_client.clone().expect(
|
||||||
@@ -1082,7 +1087,7 @@ impl Timeline {
|
|||||||
async fn evict_layer_batch(
|
async fn evict_layer_batch(
|
||||||
&self,
|
&self,
|
||||||
remote_client: &Arc<RemoteTimelineClient>,
|
remote_client: &Arc<RemoteTimelineClient>,
|
||||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
layers_to_evict: &[Arc<RemoteLayerDesc>],
|
||||||
cancel: CancellationToken,
|
cancel: CancellationToken,
|
||||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||||
// ensure that the layers have finished uploading
|
// ensure that the layers have finished uploading
|
||||||
@@ -1131,12 +1136,12 @@ impl Timeline {
|
|||||||
fn evict_layer_batch_impl(
|
fn evict_layer_batch_impl(
|
||||||
&self,
|
&self,
|
||||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||||
local_layer: &Arc<dyn PersistentLayer>,
|
local_layer: &Arc<RemoteLayerDesc>,
|
||||||
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
|
||||||
) -> anyhow::Result<bool> {
|
) -> anyhow::Result<bool> {
|
||||||
use super::layer_map::Replacement;
|
use super::layer_map::Replacement;
|
||||||
|
|
||||||
if local_layer.is_remote_layer() {
|
if !self.layer_cache.contains(&local_layer.filename()) {
|
||||||
// TODO(issue #3851): consider returning an err here instead of false,
|
// TODO(issue #3851): consider returning an err here instead of false,
|
||||||
// which is the same out the match later
|
// which is the same out the match later
|
||||||
return Ok(false);
|
return Ok(false);
|
||||||
@@ -1163,7 +1168,7 @@ impl Timeline {
|
|||||||
let layer_metadata = LayerFileMetadata::new(layer_file_size);
|
let layer_metadata = LayerFileMetadata::new(layer_file_size);
|
||||||
|
|
||||||
let new_remote_layer = Arc::new(match local_layer.filename() {
|
let new_remote_layer = Arc::new(match local_layer.filename() {
|
||||||
LayerFileName::Image(image_name) => RemoteLayer::new_img(
|
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
|
||||||
self.tenant_id,
|
self.tenant_id,
|
||||||
self.timeline_id,
|
self.timeline_id,
|
||||||
&image_name,
|
&image_name,
|
||||||
@@ -1172,7 +1177,7 @@ impl Timeline {
|
|||||||
.access_stats()
|
.access_stats()
|
||||||
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
|
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
|
||||||
),
|
),
|
||||||
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
|
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
|
||||||
self.tenant_id,
|
self.tenant_id,
|
||||||
self.timeline_id,
|
self.timeline_id,
|
||||||
&delta_name,
|
&delta_name,
|
||||||
@@ -1183,6 +1188,7 @@ impl Timeline {
|
|||||||
),
|
),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
/*
|
||||||
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
|
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
|
||||||
Replacement::Replaced { .. } => {
|
Replacement::Replaced { .. } => {
|
||||||
if let Err(e) = local_layer.delete_resident_layer_file() {
|
if let Err(e) = local_layer.delete_resident_layer_file() {
|
||||||
@@ -1233,8 +1239,10 @@ impl Timeline {
|
|||||||
false
|
false
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
*/
|
||||||
|
|
||||||
Ok(replaced)
|
// Ok(replaced)
|
||||||
|
Ok(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1419,6 +1427,8 @@ impl Timeline {
|
|||||||
EvictionTaskTimelineState::default(),
|
EvictionTaskTimelineState::default(),
|
||||||
),
|
),
|
||||||
delete_lock: tokio::sync::Mutex::new(false),
|
delete_lock: tokio::sync::Mutex::new(false),
|
||||||
|
|
||||||
|
layer_cache: Arc::new(LayerCache::new()),
|
||||||
};
|
};
|
||||||
result.repartition_threshold = result.get_checkpoint_distance() / 10;
|
result.repartition_threshold = result.get_checkpoint_distance() / 10;
|
||||||
result
|
result
|
||||||
@@ -1565,9 +1575,12 @@ impl Timeline {
|
|||||||
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
|
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let remote_desc = layer.layer_desc();
|
||||||
|
|
||||||
trace!("found layer {}", layer.path().display());
|
trace!("found layer {}", layer.path().display());
|
||||||
total_physical_size += file_size;
|
total_physical_size += file_size;
|
||||||
updates.insert_historic(Arc::new(layer));
|
self.layer_cache.insert(layer.filename(), Arc::new(layer));
|
||||||
|
updates.insert_historic(Arc::new(remote_desc));
|
||||||
num_layers += 1;
|
num_layers += 1;
|
||||||
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
||||||
// Create a DeltaLayer struct for each delta file.
|
// Create a DeltaLayer struct for each delta file.
|
||||||
@@ -1599,7 +1612,9 @@ impl Timeline {
|
|||||||
|
|
||||||
trace!("found layer {}", layer.path().display());
|
trace!("found layer {}", layer.path().display());
|
||||||
total_physical_size += file_size;
|
total_physical_size += file_size;
|
||||||
updates.insert_historic(Arc::new(layer));
|
let remote_desc = layer.layer_desc();
|
||||||
|
self.layer_cache.insert(layer.filename(), Arc::new(layer));
|
||||||
|
updates.insert_historic(Arc::new(remote_desc));
|
||||||
num_layers += 1;
|
num_layers += 1;
|
||||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||||
// ignore these
|
// ignore these
|
||||||
@@ -1644,9 +1659,9 @@ impl Timeline {
|
|||||||
async fn create_remote_layers(
|
async fn create_remote_layers(
|
||||||
&self,
|
&self,
|
||||||
index_part: &IndexPart,
|
index_part: &IndexPart,
|
||||||
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
|
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
|
||||||
up_to_date_disk_consistent_lsn: Lsn,
|
up_to_date_disk_consistent_lsn: Lsn,
|
||||||
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
|
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
|
||||||
// Are we missing some files that are present in remote storage?
|
// Are we missing some files that are present in remote storage?
|
||||||
// Create RemoteLayer instances for them.
|
// Create RemoteLayer instances for them.
|
||||||
let mut local_only_layers = local_layers;
|
let mut local_only_layers = local_layers;
|
||||||
@@ -1725,7 +1740,7 @@ impl Timeline {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
let remote_layer = RemoteLayer::new_img(
|
let remote_layer = RemoteLayerDesc::new_img(
|
||||||
self.tenant_id,
|
self.tenant_id,
|
||||||
self.timeline_id,
|
self.timeline_id,
|
||||||
imgfilename,
|
imgfilename,
|
||||||
@@ -1753,7 +1768,7 @@ impl Timeline {
|
|||||||
);
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let remote_layer = RemoteLayer::new_delta(
|
let remote_layer = RemoteLayerDesc::new_delta(
|
||||||
self.tenant_id,
|
self.tenant_id,
|
||||||
self.timeline_id,
|
self.timeline_id,
|
||||||
deltafilename,
|
deltafilename,
|
||||||
@@ -2159,7 +2174,7 @@ impl Timeline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
|
||||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||||
let historic_layer_name = historic_layer.filename().file_name();
|
let historic_layer_name = historic_layer.filename().file_name();
|
||||||
if layer_file_name == historic_layer_name {
|
if layer_file_name == historic_layer_name {
|
||||||
@@ -2176,10 +2191,10 @@ impl Timeline {
|
|||||||
&self,
|
&self,
|
||||||
// we cannot remove layers otherwise, since gc and compaction will race
|
// we cannot remove layers otherwise, since gc and compaction will race
|
||||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||||
layer: Arc<dyn PersistentLayer>,
|
layer: Arc<RemoteLayerDesc>,
|
||||||
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
if !layer.is_remote_layer() {
|
if self.layer_cache.contains(&layer.filename()) {
|
||||||
layer.delete_resident_layer_file()?;
|
layer.delete_resident_layer_file()?;
|
||||||
let layer_file_size = layer.file_size();
|
let layer_file_size = layer.file_size();
|
||||||
self.metrics
|
self.metrics
|
||||||
@@ -2428,13 +2443,7 @@ impl Timeline {
|
|||||||
|
|
||||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||||
// If it's a remote layer, download it and retry.
|
// If it's a remote layer, download it and retry.
|
||||||
if let Some(remote_layer) =
|
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
|
||||||
super::storage_layer::downcast_remote_layer(&layer)
|
|
||||||
{
|
|
||||||
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
|
||||||
// we downloaded / would need to download this layer.
|
|
||||||
remote_layer // download happens outside the scope of `layers` guard object
|
|
||||||
} else {
|
|
||||||
// Get all the data needed to reconstruct the page version from this layer.
|
// Get all the data needed to reconstruct the page version from this layer.
|
||||||
// But if we have an older cached page image, no need to go past that.
|
// But if we have an older cached page image, no need to go past that.
|
||||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||||
@@ -2457,6 +2466,10 @@ impl Timeline {
|
|||||||
}),
|
}),
|
||||||
));
|
));
|
||||||
continue 'outer;
|
continue 'outer;
|
||||||
|
} else {
|
||||||
|
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
||||||
|
// we downloaded / would need to download this layer.
|
||||||
|
layer // download happens outside the scope of `layers` guard object
|
||||||
}
|
}
|
||||||
} else if timeline.ancestor_timeline.is_some() {
|
} else if timeline.ancestor_timeline.is_some() {
|
||||||
// Nothing on this timeline. Traverse to parent
|
// Nothing on this timeline. Traverse to parent
|
||||||
@@ -2896,7 +2909,8 @@ impl Timeline {
|
|||||||
LayerResidenceStatus::Resident,
|
LayerResidenceStatus::Resident,
|
||||||
LayerResidenceEventReason::LayerCreate,
|
LayerResidenceEventReason::LayerCreate,
|
||||||
);
|
);
|
||||||
batch_updates.insert_historic(l);
|
batch_updates.insert_historic(Arc::new(l.layer_desc()));
|
||||||
|
self.layer_cache.insert(l.filename(), l);
|
||||||
batch_updates.flush();
|
batch_updates.flush();
|
||||||
|
|
||||||
// update the timeline's physical size
|
// update the timeline's physical size
|
||||||
@@ -3142,7 +3156,9 @@ impl Timeline {
|
|||||||
LayerResidenceStatus::Resident,
|
LayerResidenceStatus::Resident,
|
||||||
LayerResidenceEventReason::LayerCreate,
|
LayerResidenceEventReason::LayerCreate,
|
||||||
);
|
);
|
||||||
updates.insert_historic(l);
|
updates.insert_historic(Arc::new(l.layer_desc()));
|
||||||
|
let x: Arc<dyn PersistentLayer> = l;
|
||||||
|
self.layer_cache.insert(x.filename(), x)
|
||||||
}
|
}
|
||||||
updates.flush();
|
updates.flush();
|
||||||
drop(layers);
|
drop(layers);
|
||||||
@@ -3155,7 +3171,7 @@ impl Timeline {
|
|||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct CompactLevel0Phase1Result {
|
struct CompactLevel0Phase1Result {
|
||||||
new_layers: Vec<DeltaLayer>,
|
new_layers: Vec<DeltaLayer>,
|
||||||
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
|
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Top-level failure to compact.
|
/// Top-level failure to compact.
|
||||||
@@ -3165,7 +3181,7 @@ enum CompactionError {
|
|||||||
///
|
///
|
||||||
/// This should not happen repeatedly, but will be retried once by top-level
|
/// This should not happen repeatedly, but will be retried once by top-level
|
||||||
/// `Timeline::compact`.
|
/// `Timeline::compact`.
|
||||||
DownloadRequired(Vec<Arc<RemoteLayer>>),
|
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
|
||||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||||
Other(anyhow::Error),
|
Other(anyhow::Error),
|
||||||
}
|
}
|
||||||
@@ -3237,13 +3253,9 @@ impl Timeline {
|
|||||||
|
|
||||||
let remotes = deltas_to_compact
|
let remotes = deltas_to_compact
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|l| l.is_remote_layer())
|
.filter(|l| !self.layer_cache.contains(&l.filename()))
|
||||||
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
|
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
|
||||||
.map(|l| {
|
.cloned()
|
||||||
l.clone()
|
|
||||||
.downcast_remote_layer()
|
|
||||||
.expect("just checked it is remote layer")
|
|
||||||
})
|
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
if !remotes.is_empty() {
|
if !remotes.is_empty() {
|
||||||
@@ -3583,13 +3595,15 @@ impl Timeline {
|
|||||||
.add(metadata.len());
|
.add(metadata.len());
|
||||||
|
|
||||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||||
|
let remote_desc = l.layer_desc();
|
||||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||||
x.access_stats().record_residence_event(
|
x.access_stats().record_residence_event(
|
||||||
&updates,
|
&updates,
|
||||||
LayerResidenceStatus::Resident,
|
LayerResidenceStatus::Resident,
|
||||||
LayerResidenceEventReason::LayerCreate,
|
LayerResidenceEventReason::LayerCreate,
|
||||||
);
|
);
|
||||||
updates.insert_historic(x);
|
updates.insert_historic(Arc::new(remote_desc));
|
||||||
|
self.layer_cache.insert(x.filename(), x)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||||
@@ -4062,7 +4076,7 @@ impl Timeline {
|
|||||||
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
|
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
|
||||||
pub async fn download_remote_layer(
|
pub async fn download_remote_layer(
|
||||||
&self,
|
&self,
|
||||||
remote_layer: Arc<RemoteLayer>,
|
remote_layer: Arc<RemoteLayerDesc>,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||||
|
|
||||||
@@ -4119,10 +4133,12 @@ impl Timeline {
|
|||||||
// Delta- or ImageLayer in the layer map.
|
// Delta- or ImageLayer in the layer map.
|
||||||
let mut layers = self_clone.layers.write().unwrap();
|
let mut layers = self_clone.layers.write().unwrap();
|
||||||
let mut updates = layers.batch_update();
|
let mut updates = layers.batch_update();
|
||||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
let new_layer =
|
||||||
|
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||||
{
|
{
|
||||||
use crate::tenant::layer_map::Replacement;
|
use crate::tenant::layer_map::Replacement;
|
||||||
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
|
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
|
||||||
|
/*
|
||||||
let failure = match updates.replace_historic(&l, new_layer) {
|
let failure = match updates.replace_historic(&l, new_layer) {
|
||||||
Ok(Replacement::Replaced { .. }) => false,
|
Ok(Replacement::Replaced { .. }) => false,
|
||||||
Ok(Replacement::NotFound) => {
|
Ok(Replacement::NotFound) => {
|
||||||
@@ -4177,8 +4193,9 @@ impl Timeline {
|
|||||||
remote_layer
|
remote_layer
|
||||||
.download_replacement_failure
|
.download_replacement_failure
|
||||||
.store(true, Relaxed);
|
.store(true, Relaxed);
|
||||||
}
|
} */
|
||||||
}
|
}
|
||||||
|
|
||||||
updates.flush();
|
updates.flush();
|
||||||
drop(layers);
|
drop(layers);
|
||||||
|
|
||||||
@@ -4191,7 +4208,10 @@ impl Timeline {
|
|||||||
remote_layer.ongoing_download.close();
|
remote_layer.ongoing_download.close();
|
||||||
} else {
|
} else {
|
||||||
// Keep semaphore open. We'll drop the permit at the end of the function.
|
// Keep semaphore open. We'll drop the permit at the end of the function.
|
||||||
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
|
error!(
|
||||||
|
"layer file download failed: {:?}",
|
||||||
|
result.as_ref().unwrap_err()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Don't treat it as an error if the task that triggered the download
|
// Don't treat it as an error if the task that triggered the download
|
||||||
@@ -4205,7 +4225,8 @@ impl Timeline {
|
|||||||
drop(permit);
|
drop(permit);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}.in_current_span(),
|
}
|
||||||
|
.in_current_span(),
|
||||||
);
|
);
|
||||||
|
|
||||||
receiver.await.context("download task cancelled")?
|
receiver.await.context("download task cancelled")?
|
||||||
@@ -4278,7 +4299,7 @@ impl Timeline {
|
|||||||
let layers = self.layers.read().unwrap();
|
let layers = self.layers.read().unwrap();
|
||||||
layers
|
layers
|
||||||
.iter_historic_layers()
|
.iter_historic_layers()
|
||||||
.filter_map(|l| l.downcast_remote_layer())
|
.filter(|l| !self.layer_cache.contains(&l.filename()))
|
||||||
.map(|l| self.download_remote_layer(l))
|
.map(|l| self.download_remote_layer(l))
|
||||||
.for_each(|dl| downloads.push(dl))
|
.for_each(|dl| downloads.push(dl))
|
||||||
}
|
}
|
||||||
@@ -4353,7 +4374,7 @@ pub struct DiskUsageEvictionInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct LocalLayerInfoForDiskUsageEviction {
|
pub struct LocalLayerInfoForDiskUsageEviction {
|
||||||
pub layer: Arc<dyn PersistentLayer>,
|
pub layer: Arc<RemoteLayerDesc>,
|
||||||
pub last_activity_ts: SystemTime,
|
pub last_activity_ts: SystemTime,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -4387,7 +4408,7 @@ impl Timeline {
|
|||||||
let file_size = l.file_size();
|
let file_size = l.file_size();
|
||||||
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
||||||
|
|
||||||
if l.is_remote_layer() {
|
if !self.layer_cache.contains(&l.filename()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ use crate::{
|
|||||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||||
tenant::{
|
tenant::{
|
||||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||||
storage_layer::PersistentLayer,
|
storage_layer::{PersistentLayer, RemoteLayerDesc},
|
||||||
LogicalSizeCalculationCause, Tenant,
|
LogicalSizeCalculationCause, Tenant,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
@@ -184,11 +184,11 @@ impl Timeline {
|
|||||||
// NB: all the checks can be invalidated as soon as we release the layer map lock.
|
// NB: all the checks can be invalidated as soon as we release the layer map lock.
|
||||||
// We don't want to hold the layer map lock during eviction.
|
// We don't want to hold the layer map lock during eviction.
|
||||||
// So, we just need to deal with this.
|
// So, we just need to deal with this.
|
||||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
let candidates: Vec<Arc<RemoteLayerDesc>> = {
|
||||||
let layers = self.layers.read().unwrap();
|
let layers = self.layers.read().unwrap();
|
||||||
let mut candidates = Vec::new();
|
let mut candidates = Vec::new();
|
||||||
for hist_layer in layers.iter_historic_layers() {
|
for hist_layer in layers.iter_historic_layers() {
|
||||||
if hist_layer.is_remote_layer() {
|
if !self.layer_cache.contains(&hist_layer.filename()) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user