blanket rename

Joonas Koivunen
2023-08-24 20:06:05 +03:00
parent fb4d404553
commit ecf34bb3e4
7 changed files with 350 additions and 304 deletions

View File

@@ -62,7 +62,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
storage_layer::{AsLayerDesc, LayerE},
storage_layer::{AsLayerDesc, Layer},
timeline::EvictionError,
Timeline,
},
@@ -481,7 +481,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Arc<LayerE>,
layer: Layer,
last_activity_ts: SystemTime,
}
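An aside on what the new handle buys at call sites like `EvictionCandidate`: `Layer` carries its `Arc` internally, so holders store and clone the handle itself instead of an explicit `Arc<LayerE>`. A minimal sketch of the pattern, with an invented `short_id` field standing in for the real `LayerInner`:

use std::sync::Arc;

// Stand-in for the real LayerInner; the field is invented for illustration.
struct LayerInner {
    short_id: String,
}

// Newtype over Arc<LayerInner>: cloning bumps a refcount, so structs like
// EvictionCandidate can hold a Layer by value and simply derive Clone.
#[derive(Clone)]
struct Layer(Arc<LayerInner>);

impl std::fmt::Display for Layer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0.short_id)
    }
}

fn main() {
    let layer = Layer(Arc::new(LayerInner {
        short_id: "delta-000-0AF".into(),
    }));
    let candidate = layer.clone(); // cheap handle copy, no deep clone
    println!("{layer} and {candidate} share one LayerInner");
}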

View File

@@ -606,7 +606,7 @@ impl RemoteTimelineClient {
let upload_queue = guard.initialized_mut()?;
// FIXME: we might still be including files that no longer exist in the index_part because
// that consistency is built on strings and gentlemen's agreements, not Weak<LayerE> which
// that consistency is built on strings and gentlemen's agreements, not WeakLayer which
// could be upgraded at the time of rendering of index_part.
upload_queue
.latest_files
@@ -1365,7 +1365,7 @@ mod tests {
context::RequestContext,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
storage_layer::{LayerE, PersistentLayerDesc},
storage_layer::{Layer, PersistentLayerDesc},
Tenant, Timeline,
},
DEFAULT_PG_VERSION,
@@ -1541,7 +1541,7 @@ mod tests {
std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
}
let layer_file_1 = LayerE::for_written(
let layer_file_1 = Layer::for_written(
harness.conf,
&timeline,
PersistentLayerDesc::from_filename(
@@ -1556,7 +1556,7 @@ mod tests {
// FIXME: need that api for local files
assert!(layer_file_1.needs_download_blocking().unwrap().is_none());
let layer_file_2 = LayerE::for_written(
let layer_file_2 = Layer::for_written(
harness.conf,
&timeline,
PersistentLayerDesc::from_filename(
@@ -1569,7 +1569,7 @@ mod tests {
.unwrap();
assert!(layer_file_2.needs_download_blocking().unwrap().is_none());
let layer_file_3 = LayerE::for_written(
let layer_file_3 = Layer::for_written(
harness.conf,
&timeline,
PersistentLayerDesc::from_filename(
@@ -1721,7 +1721,7 @@ mod tests {
)
.unwrap();
let layer_file_1 = LayerE::for_written(
let layer_file_1 = Layer::for_written(
harness.conf,
&timeline,
PersistentLayerDesc::from_filename(

View File

@@ -81,7 +81,7 @@ pub struct ValueReconstructState {
pub img: Option<(Lsn, Bytes)>,
}
/// Return value from [`LayerE::get_value_reconstruct_data`]
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {
/// Got all the data needed to reconstruct the requested page
@@ -312,7 +312,7 @@ impl LayerAccessStats {
///
However, when we want something evicted, we cannot evict it right away, as there might be current
reads happening on it. It may, for example, have been found via [`LayerMap`] but not yet read with
/// [`LayerE::get_value_reconstruct_data`].
/// [`Layer::get_value_reconstruct_data`].
///
/// [`LayerMap`]: crate::tenant::layer_map::LayerMap
enum ResidentOrWantedEvicted {
@@ -363,9 +363,266 @@ impl ResidentOrWantedEvicted {
/// LSN.
///
/// This type models the on-disk layers, which can be evicted and on-demand downloaded.
#[derive(Clone)]
pub(crate) struct Layer(Arc<LayerInner>);
impl std::fmt::Display for Layer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}
impl std::fmt::Debug for Layer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
impl AsLayerDesc for Layer {
fn layer_desc(&self) -> &PersistentLayerDesc {
self.0.layer_desc()
}
}
impl Layer {
pub(crate) fn for_evicted(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> Self {
let path = conf
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
.join(file_name.file_name());
let desc = PersistentLayerDesc::from_filename(
timeline.tenant_id,
timeline.timeline_id,
file_name,
metadata.file_size(),
);
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
let outer = Arc::new(LayerInner {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
access_stats,
inner: heavier_once_cell::OnceCell::default(),
wanted_garbage_collected: AtomicBool::default(),
wanted_evicted: AtomicBool::default(),
version: AtomicUsize::default(),
have_remote_client: timeline.remote_client.is_some(),
status: tokio::sync::broadcast::channel(1).0,
});
debug_assert!(outer.needs_download_blocking().unwrap().is_some());
Layer(outer)
}
pub(crate) fn for_resident(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> ResidentLayer {
let path = conf
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
.join(file_name.file_name());
let desc = PersistentLayerDesc::from_filename(
timeline.tenant_id,
timeline.timeline_id,
file_name,
metadata.file_size(),
);
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
let mut resident = None;
let outer = Arc::new_cyclic(|owner| {
let inner = Arc::new(DownloadedLayer {
owner: owner.clone(),
kind: tokio::sync::OnceCell::default(),
});
resident = Some(inner.clone());
LayerInner {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats,
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)),
version: AtomicUsize::new(0),
status: tokio::sync::broadcast::channel(1).0,
}
});
debug_assert!(outer.needs_download_blocking().unwrap().is_none());
let downloaded = resident.expect("just initialized");
ResidentLayer {
downloaded,
owner: Layer(outer),
}
}
pub(crate) fn for_written(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
desc: PersistentLayerDesc,
) -> anyhow::Result<ResidentLayer> {
let path = conf
.timeline_path(&desc.tenant_id, &desc.timeline_id)
.join(desc.filename().to_string());
let mut resident = None;
let outer = Arc::new_cyclic(|owner| {
let inner = Arc::new(DownloadedLayer {
owner: owner.clone(),
kind: tokio::sync::OnceCell::default(),
});
resident = Some(inner.clone());
LayerInner {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)),
version: AtomicUsize::new(0),
status: tokio::sync::broadcast::channel(1).0,
}
});
// FIXME: ugly, but if we don't do this check here, any error will pop up at read time;
// we cannot check it here because DeltaLayerWriter and ImageLayerWriter create the
// instances *before* renaming the file to its final destination
// anyhow::ensure!(
// outer.needs_download_blocking()?.is_none(),
// "should not need downloading if it was just written"
// );
// FIXME: because we can now do garbage collection on drop, should we mark these files as
// garbage collected until they really get added to LayerMap? Consider that files are
// written out to disk, fsynced, renamed by `{Delta,Image}LayerWriter`, then waiting for
// remaining files to be generated (compaction, create_image_layers) before being added to
// LayerMap. We could panic or just error out during that time, even for unrelated reasons,
// but the files would be left.
Ok(ResidentLayer {
downloaded: resident.expect("just wrote Some"),
owner: Layer(outer),
})
}
pub(crate) async fn evict_and_wait(
&self,
rtc: &RemoteTimelineClient,
) -> Result<(), super::timeline::EvictionError> {
self.0.evict_and_wait(rtc).await
}
/// Delete the layer file when `self` gets dropped, and perhaps also schedule a remote
/// index upload afterwards.
pub(crate) fn garbage_collect(&self) {
self.0.garbage_collect();
}
/// Return data needed to reconstruct given page at LSN.
///
/// It is up to the caller to collect more data from previous layer and
/// perform WAL redo, if necessary.
///
/// See [`ValueReconstructResult`] for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call, or a struct with a cached older image of the page if one
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
use anyhow::ensure;
let layer = self.0.get_or_maybe_download(true, Some(ctx)).await?;
self.0
.access_stats
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
if self.layer_desc().is_delta {
ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
ensure!(self.layer_desc().key_range.contains(&key));
} else {
ensure!(self.layer_desc().key_range.contains(&key));
ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
}
layer
.get_value_reconstruct_data(key, lsn_range, reconstruct_data, &self.0)
.await
}
/// Download the layer if evicted.
///
/// Will not error when it is already downloaded.
pub(crate) async fn get_or_download(&self) -> anyhow::Result<()> {
self.0.get_or_maybe_download(true, None).await?;
Ok(())
}
/// Creates a guard object which prohibits evicting this layer as long as the value is kept
/// around.
pub(crate) async fn guard_against_eviction(
&self,
allow_download: bool,
) -> anyhow::Result<ResidentLayer> {
let downloaded = self.0.get_or_maybe_download(allow_download, None).await?;
Ok(ResidentLayer {
downloaded,
owner: self.clone(),
})
}
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
self.0.info(reset)
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.0.access_stats
}
pub(crate) fn local_path(&self) -> &std::path::Path {
&self.0.path
}
#[cfg(test)]
pub(crate) fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
self.0.needs_download_blocking()
}
}
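Both constructors above hinge on `Arc::new_cyclic`: the `DownloadedLayer` needs a `Weak` back-pointer to the `LayerInner` that owns it, and the closure receives that `Weak` before the owning `Arc` exists. A self-contained sketch of the construction, with simplified stand-in types:

use std::sync::{Arc, Weak};

struct Downloaded {
    owner: Weak<Inner>, // back-pointer, upgraded only on drop in the real code
}

struct Inner {
    resident: Arc<Downloaded>,
}

fn main() {
    // new_cyclic hands the closure a Weak to the Arc being built, letting
    // owner and child reference each other without a strong Arc cycle.
    let inner = Arc::new_cyclic(|weak: &Weak<Inner>| Inner {
        resident: Arc::new(Downloaded { owner: weak.clone() }),
    });
    assert!(inner.resident.owner.upgrade().is_some());
}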
// TODO:
// - internal arc, because I've now worked away the majority of the external wrapping
pub(crate) struct LayerE {
struct LayerInner {
// only needed to check ondemand_download_behavior_treat_error_as_warn
conf: &'static PageServerConf,
path: PathBuf,
@@ -380,7 +637,7 @@ pub(crate) struct LayerE {
/// Initialization and deinitialization is done while holding a permit.
inner: heavier_once_cell::OnceCell<ResidentOrWantedEvicted>,
/// Do we want to garbage collect this when `LayerE` is dropped, where garbage collection
/// Do we want to garbage collect this when `LayerInner` is dropped, where garbage collection
/// means:
/// - schedule remote deletion
/// - instant local deletion
@@ -407,31 +664,25 @@ pub(crate) struct LayerE {
status: tokio::sync::broadcast::Sender<Status>,
}
impl std::fmt::Display for LayerInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}
impl AsLayerDesc for LayerInner {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
#[derive(Debug, Clone, Copy)]
enum Status {
Evicted,
Downloaded,
}
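The `status` broadcast sender above is how eviction waiters learn the outcome: `evict_and_wait` subscribes and then waits for the next `Status` message. A hedged sketch of that handshake, simplified from the diff (the real code maps `RecvError` variants to `EvictionError` instead of printing):

use tokio::sync::broadcast;

#[derive(Debug, Clone, Copy)]
enum Status {
    Evicted,
    Downloaded,
}

#[tokio::main]
async fn main() {
    // capacity 1: waiters only care about the most recent transition
    let (tx, _) = broadcast::channel::<Status>(1);

    let mut rx = tx.subscribe();
    let waiter = tokio::spawn(async move {
        match rx.recv().await {
            Ok(Status::Evicted) => println!("eviction confirmed"),
            Ok(Status::Downloaded) => println!("layer was re-downloaded instead"),
            Err(_) => println!("sender dropped; the layer is gone"),
        }
    });

    tx.send(Status::Evicted).unwrap();
    waiter.await.unwrap();
}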
impl std::fmt::Display for LayerE {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.layer_desc().short_id())
}
}
impl std::fmt::Debug for LayerE {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
impl AsLayerDesc for LayerE {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.desc
}
}
impl Drop for LayerE {
impl Drop for LayerInner {
fn drop(&mut self) {
if !*self.wanted_garbage_collected.get_mut() {
// should we try to evict if the last wish was for eviction?
@@ -488,152 +739,13 @@ impl Drop for LayerE {
}
}
impl LayerE {
pub(crate) fn for_evicted(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> Arc<LayerE> {
let path = conf
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
.join(file_name.file_name());
let desc = PersistentLayerDesc::from_filename(
timeline.tenant_id,
timeline.timeline_id,
file_name,
metadata.file_size(),
);
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
let outer = Arc::new(LayerE {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
access_stats,
inner: heavier_once_cell::OnceCell::default(),
wanted_garbage_collected: AtomicBool::default(),
wanted_evicted: AtomicBool::default(),
version: AtomicUsize::default(),
have_remote_client: timeline.remote_client.is_some(),
status: tokio::sync::broadcast::channel(1).0,
});
debug_assert!(outer.needs_download_blocking().unwrap().is_some());
outer
}
pub(crate) fn for_resident(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> ResidentLayer {
let path = conf
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
.join(file_name.file_name());
let desc = PersistentLayerDesc::from_filename(
timeline.tenant_id,
timeline.timeline_id,
file_name,
metadata.file_size(),
);
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
let mut resident = None;
let outer = Arc::new_cyclic(|owner| {
let inner = Arc::new(DownloadedLayer {
owner: owner.clone(),
kind: tokio::sync::OnceCell::default(),
});
resident = Some(inner.clone());
LayerE {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats,
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)),
version: AtomicUsize::new(0),
status: tokio::sync::broadcast::channel(1).0,
}
});
debug_assert!(outer.needs_download_blocking().unwrap().is_none());
let downloaded = resident.expect("just initialized");
ResidentLayer {
downloaded,
owner: outer,
}
}
pub(crate) fn for_written(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
desc: PersistentLayerDesc,
) -> anyhow::Result<ResidentLayer> {
let path = conf
.timeline_path(&desc.tenant_id, &desc.timeline_id)
.join(desc.filename().to_string());
let mut resident = None;
let outer = Arc::new_cyclic(|owner| {
let inner = Arc::new(DownloadedLayer {
owner: owner.clone(),
kind: tokio::sync::OnceCell::default(),
});
resident = Some(inner.clone());
LayerE {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),
inner: heavier_once_cell::OnceCell::new(ResidentOrWantedEvicted::Resident(inner)),
version: AtomicUsize::new(0),
status: tokio::sync::broadcast::channel(1).0,
}
});
// FIXME: ugly, but if we don't do this check here, any error will pop up at read time
// but we cannot check it because DeltaLayerWriter and ImageLayerWriter create the
// instances *before* renaming the file to final destination
// anyhow::ensure!(
// outer.needs_download_blocking()?.is_none(),
// "should not need downloading if it was just written"
// );
// FIXME: because we can now do garbage collection on drop, should we mark these files as
// garbage collected until they get really get added to LayerMap? consider that files are
// written out to disk, fsynced, renamed by `{Delta,Image}LayerWriter`, then waiting for
// remaining files to be generated (compaction, create_image_layers) before being added to
// LayerMap. We could panic or just error out during that time, even for unrelated reasons,
// but the files would be left.
Ok(ResidentLayer {
downloaded: resident.expect("just wrote Some"),
owner: outer,
})
impl LayerInner {
fn garbage_collect(&self) {
self.wanted_garbage_collected.store(true, Ordering::Release);
}
pub(crate) async fn evict_and_wait(
self: &Arc<Self>,
&self,
_: &RemoteTimelineClient,
) -> Result<(), super::timeline::EvictionError> {
use tokio::sync::broadcast::error::RecvError;
@@ -682,69 +794,6 @@ impl LayerE {
Self::get_or_apply_evictedness(locked, &self.wanted_evicted)
}
/// Delete the layer file when the `self` gets dropped, also schedule a remote index upload
/// then perhaps.
pub(crate) fn garbage_collect(&self) {
self.wanted_garbage_collected.store(true, Ordering::Release);
}
/// Return data needed to reconstruct given page at LSN.
///
/// It is up to the caller to collect more data from previous layer and
/// perform WAL redo, if necessary.
///
/// See PageReconstructResult for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call, or a struct with a cached older image of the page if one
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
pub(crate) async fn get_value_reconstruct_data(
self: &Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
use anyhow::ensure;
let layer = self.get_or_maybe_download(true, Some(ctx)).await?;
self.access_stats
.record_access(LayerAccessKind::GetValueReconstructData, ctx);
if self.layer_desc().is_delta {
ensure!(lsn_range.start >= self.layer_desc().lsn_range.start);
ensure!(self.layer_desc().key_range.contains(&key));
} else {
ensure!(self.layer_desc().key_range.contains(&key));
ensure!(lsn_range.start >= self.layer_desc().image_layer_lsn());
ensure!(lsn_range.end >= self.layer_desc().image_layer_lsn());
}
layer
.get_value_reconstruct_data(key, lsn_range, reconstruct_data, self)
.await
}
/// Creates a guard object which prohibit evicting this layer as long as the value is kept
/// around.
pub(crate) async fn guard_against_eviction(
self: &Arc<Self>,
allow_download: bool,
) -> anyhow::Result<ResidentLayer> {
let downloaded = self.get_or_maybe_download(allow_download, None).await?;
Ok(ResidentLayer {
downloaded,
owner: self.clone(),
})
}
pub(crate) async fn get_or_download(self: &Arc<Self>) -> anyhow::Result<()> {
self.get_or_maybe_download(true, None).await?;
Ok(())
}
fn get_or_apply_evictedness(
guard: Option<heavier_once_cell::Guard<'_, ResidentOrWantedEvicted>>,
wanted_evicted: &AtomicBool,
@@ -834,7 +883,7 @@ impl LayerE {
}
if !allow_download {
// this does look weird, but for LayerE the "downloading" means also changing
// this does look weird, but for LayerInner the "downloading" means also changing
// internal once related state ...
return Err(DownloadError::DownloadRequired);
}
@@ -845,7 +894,7 @@ impl LayerE {
// this is sadly needed because of task_mgr::shutdown_tasks, otherwise we cannot
// block tenant::mgr::remove_tenant_from_memory.
let this = self.clone();
let this: Arc<Self> = self.clone();
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::RemoteDownloadTask,
@@ -955,21 +1004,16 @@ impl LayerE {
)
}
pub(crate) fn local_path(&self) -> &std::path::Path {
// maybe it does make sense to have this or maybe not
&self.path
}
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match tokio::fs::metadata(self.local_path()).await {
match tokio::fs::metadata(&self.path).await {
Ok(m) => Ok(self.is_file_present_and_good_size(&m)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
Err(e) => Err(e),
}
}
pub(crate) fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match self.local_path().metadata() {
fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
match self.path.metadata() {
Ok(m) => Ok(self.is_file_present_and_good_size(&m)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
Err(e) => Err(e),
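Both variants of the probe reduce to a stat of the local path, treating `NotFound` as "needs download". A freestanding sketch of the async version, with the return type simplified to a bool (the real code also validates the on-disk size against the expected metadata):

use std::io;
use std::path::Path;

// Ok(true): file is absent and must be downloaded; other I/O errors propagate.
async fn needs_download(path: &Path) -> io::Result<bool> {
    match tokio::fs::metadata(path).await {
        Ok(_m) => Ok(false), // present; real code checks the size here too
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(true),
        Err(e) => Err(e),
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    println!("{}", needs_download(Path::new("does-not-exist")).await?);
    Ok(())
}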
@@ -991,13 +1035,14 @@ impl LayerE {
}
}
pub(crate) fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo {
let layer_file_name = self.desc.filename().file_name();
let remote = self
.needs_download_blocking()
.map(|maybe| maybe.is_some())
.unwrap_or(false);
.unwrap_or(true);
let access_stats = self.access_stats.as_api_model(reset);
if self.desc.is_delta {
@@ -1024,12 +1069,8 @@ impl LayerE {
}
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
&self.access_stats
}
/// Our resident layer has been dropped; we might hold the lock elsewhere.
fn on_drop(self: Arc<LayerE>) {
fn on_drop(self: Arc<LayerInner>) {
let gc = self.wanted_garbage_collected.load(Ordering::Acquire);
let evict = self.wanted_evicted.load(Ordering::Acquire);
let can_evict = self.have_remote_client;
@@ -1053,7 +1094,7 @@ impl LayerE {
let eviction = {
let span = tracing::info_span!(parent: span.clone(), "blocking");
async move {
// the layer is already gone, don't do anything. LayerE drop has already run.
// the layer is already gone, don't do anything. LayerInner drop has already run.
let Some(this) = this.upgrade() else { return; };
// deleted or detached timeline, don't do anything.
@@ -1206,7 +1247,7 @@ impl std::fmt::Display for NeedsDownload {
/// or garbage collection happens.
#[derive(Clone)]
pub(crate) struct ResidentLayer {
owner: Arc<LayerE>,
owner: Layer,
downloaded: Arc<DownloadedLayer>,
}
@@ -1223,7 +1264,7 @@ impl std::fmt::Debug for ResidentLayer {
}
impl ResidentLayer {
pub(crate) fn drop_eviction_guard(self) -> Arc<LayerE> {
pub(crate) fn drop_eviction_guard(self) -> Layer {
self.into()
}
@@ -1234,9 +1275,11 @@ impl ResidentLayer {
) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
use LayerKind::*;
match self.downloaded.get(&self.owner).await? {
let inner = &self.owner.0;
match self.downloaded.get(inner).await? {
Delta(d) => {
self.owner
inner
.access_stats
.record_access(LayerAccessKind::KeyIter, ctx);
@@ -1248,6 +1291,19 @@ impl ResidentLayer {
Image(_) => anyhow::bail!("cannot load_keys on an image layer"),
}
}
pub(crate) fn local_path(&self) -> &std::path::Path {
&self.owner.0.path
}
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
self.owner.access_stats()
}
#[cfg(test)]
pub(crate) fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
self.owner.needs_download_blocking()
}
}
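`ResidentLayer` acts as a keep-alive guard: it pairs the `Layer` handle with an `Arc<DownloadedLayer>`, and the file cannot be evicted while any such strong reference is alive. A minimal sketch of the idiom, not the actual implementation (the real `DownloadedLayer` drop notifies its owner rather than printing):

use std::sync::Arc;

struct DownloadedLayer; // stand-in for the downloaded file state

impl Drop for DownloadedLayer {
    fn drop(&mut self) {
        // real code: tell the owning LayerInner that eviction may proceed
        println!("last resident guard dropped");
    }
}

// While any ResidentGuard clone is alive, the strong count stays above
// zero and the downloaded state cannot be reclaimed.
#[derive(Clone)]
struct ResidentGuard {
    downloaded: Arc<DownloadedLayer>,
}

fn main() {
    let guard = ResidentGuard {
        downloaded: Arc::new(DownloadedLayer),
    };
    let second = guard.clone();
    drop(guard); // still resident: `second` keeps it alive
    drop(second); // now the Drop impl runs
}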
impl AsLayerDesc for ResidentLayer {
@@ -1256,34 +1312,26 @@ impl AsLayerDesc for ResidentLayer {
}
}
impl AsRef<Arc<LayerE>> for ResidentLayer {
fn as_ref(&self) -> &Arc<LayerE> {
impl AsRef<Layer> for ResidentLayer {
fn as_ref(&self) -> &Layer {
&self.owner
}
}
/// Allow slimming down if we don't want the `2*usize` with eviction candidates?
impl From<ResidentLayer> for Arc<LayerE> {
impl From<ResidentLayer> for Layer {
fn from(value: ResidentLayer) -> Self {
value.owner
}
}
impl std::ops::Deref for ResidentLayer {
type Target = LayerE;
fn deref(&self) -> &Self::Target {
&self.owner
}
}
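With the `Deref` impl gone (removed just above), the guard-to-handle transition is explicit: borrowing goes through `AsRef`, consuming goes through `From`/`Into`, which is exactly what `drop_eviction_guard` does. A compilable sketch of that surface, with stand-in types:

use std::sync::Arc;

#[derive(Clone)]
struct Layer(Arc<()>); // stand-in for Layer(Arc<LayerInner>)

struct ResidentLayer {
    owner: Layer,
}

impl AsRef<Layer> for ResidentLayer {
    fn as_ref(&self) -> &Layer {
        &self.owner
    }
}

impl From<ResidentLayer> for Layer {
    fn from(value: ResidentLayer) -> Self {
        value.owner
    }
}

fn main() {
    let resident = ResidentLayer {
        owner: Layer(Arc::new(())),
    };
    // borrow the handle without giving up the guard:
    let _layer_ref: &Layer = resident.as_ref();
    // consume the guard, keep the handle (what drop_eviction_guard does):
    let _layer: Layer = resident.into();
}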
#[derive(Debug, thiserror::Error)]
#[error("Layer has been removed from LayerMap already")]
pub(crate) struct RemovedFromLayerMap;
/// Holds the actual downloaded layer, and handles evicting the file on drop.
pub(crate) struct DownloadedLayer {
owner: Weak<LayerE>,
owner: Weak<LayerInner>,
kind: tokio::sync::OnceCell<anyhow::Result<LayerKind>>,
}
@@ -1308,7 +1356,7 @@ impl Drop for DownloadedLayer {
}
impl DownloadedLayer {
async fn get(&self, owner: &LayerE) -> anyhow::Result<&LayerKind> {
async fn get(&self, owner: &LayerInner) -> anyhow::Result<&LayerKind> {
// the owner is required so that we don't have to upgrade self.owner, which will only
// be used on drop. This way, initializing a DownloadedLayer without an owner is statically
// impossible, so we can just not worry about it.
@@ -1347,7 +1395,7 @@ impl DownloadedLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
owner: &LayerE,
owner: &LayerInner,
) -> anyhow::Result<ValueReconstructResult> {
use LayerKind::*;
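Threading `owner: &LayerInner` through `get` and `get_value_reconstruct_data` means the hot read path never upgrades the stored `Weak`: the caller proves the owner is alive by lending a reference, and the weak pointer is reserved for drop handling. A small sketch of that borrow discipline, with invented stand-in types:

use std::sync::{Arc, Weak};

struct Owner {
    name: &'static str,
}

struct Child {
    owner: Weak<Owner>, // retained only for use on drop
}

impl Child {
    // Hot path: the &Owner argument proves liveness, so no fallible
    // Weak::upgrade is needed here.
    fn get(&self, owner: &Owner) -> &'static str {
        owner.name
    }
}

fn main() {
    let owner = Arc::new(Owner { name: "layer" });
    let child = Child { owner: Arc::downgrade(&owner) };
    assert_eq!(child.get(&owner), "layer");
    assert!(child.owner.upgrade().is_some());
}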

View File

@@ -34,7 +34,7 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::storage_layer::{LayerE, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::Timeline;
use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
@@ -561,7 +561,7 @@ impl DeltaLayerWriterInner {
// fsync the file
file.sync_all()?;
let layer = LayerE::for_written(self.conf, timeline, desc)?;
let layer = Layer::for_written(self.conf, timeline, desc)?;
// Rename the file to its final name
//
// Note: This overwrites any existing file. There shouldn't be any.

View File

@@ -59,7 +59,7 @@ use utils::{
};
use super::filename::ImageFileName;
use super::{AsLayerDesc, LayerE, PersistentLayerDesc, ResidentLayer};
use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
///
/// Header stored in the beginning of the file
@@ -495,7 +495,7 @@ impl ImageLayerWriterInner {
// fsync the file
file.sync_all()?;
let layer = LayerE::for_written(self.conf, timeline, desc)?;
let layer = Layer::for_written(self.conf, timeline, desc)?;
// Rename the file to its final name
//

View File

@@ -36,7 +36,7 @@ use crate::context::{
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, LayerAccessStatsReset, LayerE,
AsLayerDesc, DeltaLayerWriter, ImageLayerWriter, InMemoryLayer, Layer, LayerAccessStatsReset,
LayerFileName, ResidentLayer, ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
@@ -978,7 +978,7 @@ impl Timeline {
let Ok(local_layer) = local_layer.guard_against_eviction(false).await else { return Ok(Some(false)); };
let local_layer: Arc<LayerE> = local_layer.into();
let local_layer: Layer = local_layer.into();
let remote_client = self
.remote_client
@@ -1001,7 +1001,7 @@ impl Timeline {
/// Evict a batch of layers.
pub(crate) async fn evict_layers(
&self,
layers_to_evict: &[Arc<LayerE>],
layers_to_evict: &[Layer],
cancel: &CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
let remote_client = self
@@ -1030,7 +1030,7 @@ impl Timeline {
async fn evict_layer_batch(
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<LayerE>],
layers_to_evict: &[Layer],
cancel: &CancellationToken,
) -> anyhow::Result<Vec<Option<Result<(), EvictionError>>>> {
// ensure that the layers have finished uploading
@@ -1510,17 +1510,17 @@ impl Timeline {
let layer = match decision {
NeedsUpload(m) => {
total_physical_size += m.file_size();
let resident = LayerE::for_resident(conf, &this, name, m.clone());
let resident = Layer::for_resident(conf, &this, name, m.clone());
let layer = resident.as_ref().clone();
needs_upload.push((resident, m));
layer
}
UseLocal(m) => {
total_physical_size += m.file_size();
LayerE::for_resident(conf, &this, name, m).drop_eviction_guard()
Layer::for_resident(conf, &this, name, m).drop_eviction_guard()
}
Evicted(remote) | UseRemote { remote, .. } => {
LayerE::for_evicted(conf, &this, name, remote)
Layer::for_evicted(conf, &this, name, remote)
}
};
@@ -1882,7 +1882,7 @@ impl Timeline {
}
}
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<LayerE>> {
async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
let guard = self.layers.read().await;
for historic_layer in guard.layer_map().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
@@ -1901,7 +1901,7 @@ trait TraversalLayerExt {
fn traversal_id(&self) -> TraversalId;
}
impl TraversalLayerExt for Arc<LayerE> {
impl TraversalLayerExt for Layer {
fn traversal_id(&self) -> TraversalId {
self.local_path().display().to_string()
}
@@ -2140,7 +2140,7 @@ impl Timeline {
result,
cont_lsn,
Box::new({
let layer = Arc::clone(&layer);
let layer = layer.to_owned();
move || layer.traversal_id()
}),
));
@@ -2759,7 +2759,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<ResidentLayer>,
deltas_to_compact: Vec<Arc<LayerE>>,
deltas_to_compact: Vec<Layer>,
}
/// Top-level failure to compact.
@@ -3738,7 +3738,7 @@ impl Timeline {
let gc_layers = layers_to_remove
.iter()
.map(|x| guard.get_from_desc(x))
.collect::<Vec<Arc<LayerE>>>();
.collect::<Vec<Layer>>();
result.layers_removed = gc_layers.len() as u64;
@@ -4014,7 +4014,7 @@ pub(crate) struct DiskUsageEvictionInfo {
}
pub(crate) struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<LayerE>,
pub layer: Layer,
pub last_activity_ts: SystemTime,
}
@@ -4189,12 +4189,10 @@ fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use utils::{id::TimelineId, lsn::Lsn};
use crate::tenant::{
harness::TenantHarness, storage_layer::LayerE, timeline::EvictionError, Timeline,
harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline,
};
#[tokio::test]
@@ -4287,7 +4285,7 @@ mod tests {
.expect("no cancellation")
}
async fn find_some_layer(timeline: &Timeline) -> Arc<LayerE> {
async fn find_some_layer(timeline: &Timeline) -> Layer {
let layers = timeline.layers.read().await;
let desc = layers
.layer_map()

View File

@@ -12,7 +12,7 @@ use crate::{
tenant::{
layer_map::{BatchedUpdates, LayerMap},
storage_layer::{
AsLayerDesc, InMemoryLayer, LayerE, PersistentLayerDesc, PersistentLayerKey,
AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey,
ResidentLayer,
},
},
@@ -21,7 +21,7 @@ use crate::{
/// Provides semantic APIs to manipulate the layer map.
pub(crate) struct LayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager<LayerE>,
layer_fmgr: LayerFileManager<Layer>,
}
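`LayerManager` now parameterizes its file manager with the new handle type. Conceptually, `LayerFileManager<T>` is a map from a layer's persistent key to the handle; a hypothetical minimal version, with key and method shapes assumed from this diff rather than taken from the actual implementation:

use std::collections::HashMap;

// Stand-ins; the map-based shape is an assumption for illustration.
type PersistentLayerKey = String;

#[derive(Clone)]
struct PersistentLayerDesc {
    key: PersistentLayerKey,
}

trait AsLayerDesc {
    fn layer_desc(&self) -> &PersistentLayerDesc;
}

struct LayerFileManager<T: AsLayerDesc + Clone>(HashMap<PersistentLayerKey, T>);

impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
    fn insert(&mut self, layer: T) {
        self.0.insert(layer.layer_desc().key.clone(), layer);
    }

    // Cheap because the stored T is a refcounted handle like Layer.
    fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
        self.0
            .get(&desc.key)
            .expect("layer must have been inserted")
            .clone()
    }
}

fn main() {
    #[derive(Clone)]
    struct Handle(PersistentLayerDesc);
    impl AsLayerDesc for Handle {
        fn layer_desc(&self) -> &PersistentLayerDesc {
            &self.0
        }
    }

    let mut fmgr = LayerFileManager(HashMap::new());
    let desc = PersistentLayerDesc { key: "delta-1".into() };
    fmgr.insert(Handle(desc.clone()));
    let _handle = fmgr.get_from_desc(&desc); // clones the stored handle out
}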
/// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
@@ -42,7 +42,7 @@ impl LayerManager {
}
}
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<LayerE> {
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.layer_fmgr.get_from_desc(desc)
}
@@ -59,7 +59,7 @@ impl LayerManager {
/// 2. next open layer (with disk_consistent_lsn LSN)
pub(crate) fn initialize_local_layers(
&mut self,
on_disk_layers: Vec<Arc<LayerE>>,
on_disk_layers: Vec<Layer>,
next_open_layer_at: Lsn,
) {
let mut updates = self.layer_map.batch_update();
@@ -211,7 +211,7 @@ impl LayerManager {
pub(crate) fn finish_compact_l0(
&mut self,
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: Vec<Arc<LayerE>>,
compact_from: Vec<Layer>,
compact_to: &[ResidentLayer],
metrics: &crate::metrics::TimelineMetrics,
) -> Result<()> {
@@ -241,7 +241,7 @@ impl LayerManager {
pub(crate) fn finish_gc_timeline(
&mut self,
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
gc_layers: Vec<Arc<LayerE>>,
gc_layers: Vec<Layer>,
) -> Result<ApplyGcResultGuard> {
let mut updates = self.layer_map.batch_update();
for doomed_layer in gc_layers {
@@ -257,9 +257,9 @@ impl LayerManager {
/// Helper function to insert a layer into the layer map and file manager.
fn insert_historic_layer(
layer: Arc<LayerE>,
layer: Layer,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager<LayerE>,
mapping: &mut LayerFileManager<Layer>,
) {
updates.insert_historic(layer.layer_desc().clone());
mapping.insert(layer);
@@ -270,9 +270,9 @@ impl LayerManager {
fn delete_historic_layer(
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<LayerE>,
layer: Layer,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager<LayerE>,
mapping: &mut LayerFileManager<Layer>,
) -> anyhow::Result<()> {
let desc = layer.layer_desc();