mirror of
https://github.com/neondatabase/neon.git
synced 2026-04-24 18:00:37 +00:00
Compare commits
7 Commits
jcsp/issue
...
jcsp/layer
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
93e069ceae | ||
|
|
90e27f1800 | ||
|
|
0f533194d2 | ||
|
|
5b181443b3 | ||
|
|
41401ea2b8 | ||
|
|
2a8197b7ce | ||
|
|
d07bc7ba01 |
@@ -3,7 +3,6 @@ use pageserver::repository::Key;
|
||||
use pageserver::tenant::layer_map::LayerMap;
|
||||
use pageserver::tenant::storage_layer::LayerFileName;
|
||||
use pageserver::tenant::storage_layer::PersistentLayerDesc;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
|
||||
use std::cmp::{max, min};
|
||||
use std::fs::File;
|
||||
@@ -11,7 +10,6 @@ use std::io::{BufRead, BufReader};
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::time::Instant;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -211,13 +209,8 @@ fn bench_sequential(c: &mut Criterion) {
|
||||
for i in 0..100_000 {
|
||||
let i32 = (i as u32) % 100;
|
||||
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
|
||||
let layer = PersistentLayerDesc::new_img(
|
||||
TenantShardId::unsharded(TenantId::generate()),
|
||||
TimelineId::generate(),
|
||||
zero.add(10 * i32)..zero.add(10 * i32 + 1),
|
||||
Lsn(i),
|
||||
0,
|
||||
);
|
||||
let layer =
|
||||
PersistentLayerDesc::new_img(zero.add(10 * i32)..zero.add(10 * i32 + 1), Lsn(i), 0);
|
||||
updates.insert_historic(layer);
|
||||
}
|
||||
updates.flush();
|
||||
|
||||
@@ -310,8 +310,8 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
.unwrap()
|
||||
.as_micros(),
|
||||
partition,
|
||||
desc.tenant_shard_id,
|
||||
desc.timeline_id,
|
||||
candidate.timeline.tenant_shard_id,
|
||||
candidate.timeline.timeline_id,
|
||||
candidate.layer,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1271,11 +1271,12 @@ impl RemoteTimelineClient {
|
||||
|
||||
let upload_result: anyhow::Result<()> = match &task.op {
|
||||
UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
|
||||
let path = layer.local_path();
|
||||
let path = layer.local_path_from_id(&self.tenant_shard_id, &self.timeline_id);
|
||||
|
||||
upload::upload_timeline_layer(
|
||||
self.conf,
|
||||
&self.storage_impl,
|
||||
path,
|
||||
&path,
|
||||
layer_metadata,
|
||||
self.generation,
|
||||
)
|
||||
|
||||
@@ -24,7 +24,7 @@ use tracing::warn;
|
||||
use utils::history_buffer::HistoryBufferWithDropCounter;
|
||||
use utils::rate_limit::RateLimit;
|
||||
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
|
||||
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
@@ -301,31 +301,17 @@ pub trait AsLayerDesc {
|
||||
}
|
||||
|
||||
pub mod tests {
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
|
||||
use super::*;
|
||||
|
||||
impl From<DeltaFileName> for PersistentLayerDesc {
|
||||
fn from(value: DeltaFileName) -> Self {
|
||||
PersistentLayerDesc::new_delta(
|
||||
TenantShardId::from([0; 18]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn_range,
|
||||
233,
|
||||
)
|
||||
PersistentLayerDesc::new_delta(value.key_range, value.lsn_range, 233)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ImageFileName> for PersistentLayerDesc {
|
||||
fn from(value: ImageFileName) -> Self {
|
||||
PersistentLayerDesc::new_img(
|
||||
TenantShardId::from([0; 18]),
|
||||
TimelineId::from_array([0; 16]),
|
||||
value.key_range,
|
||||
value.lsn,
|
||||
233,
|
||||
)
|
||||
PersistentLayerDesc::new_img(value.key_range, value.lsn, 233)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -84,17 +84,6 @@ pub struct Summary {
|
||||
pub index_root_blk: u32,
|
||||
}
|
||||
|
||||
impl From<&DeltaLayer> for Summary {
|
||||
fn from(layer: &DeltaLayer) -> Self {
|
||||
Self::expected(
|
||||
layer.desc.tenant_shard_id.tenant_id,
|
||||
layer.desc.timeline_id,
|
||||
layer.desc.key_range.clone(),
|
||||
layer.desc.lsn_range.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
pub(super) fn expected(
|
||||
tenant_id: TenantId,
|
||||
@@ -320,15 +309,9 @@ impl DeltaLayer {
|
||||
.metadata()
|
||||
.context("get file metadata to determine size")?;
|
||||
|
||||
// TODO(sharding): we must get the TenantShardId from the path instead of reading the Summary.
|
||||
// we should also validate the path against the Summary, as both should contain the same tenant, timeline, key, lsn.
|
||||
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
|
||||
|
||||
Ok(DeltaLayer {
|
||||
path: path.to_path_buf(),
|
||||
desc: PersistentLayerDesc::new_delta(
|
||||
tenant_shard_id,
|
||||
summary.timeline_id,
|
||||
summary.key_range,
|
||||
summary.lsn_range,
|
||||
metadata.len(),
|
||||
@@ -505,8 +488,6 @@ impl DeltaLayerWriterInner {
|
||||
// set inner.file here. The first read will have to re-open it.
|
||||
|
||||
let desc = PersistentLayerDesc::new_delta(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.key_start..key_end,
|
||||
self.lsn_range.clone(),
|
||||
metadata.len(),
|
||||
@@ -517,7 +498,7 @@ impl DeltaLayerWriterInner {
|
||||
|
||||
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
|
||||
|
||||
trace!("created delta layer {}", layer.local_path());
|
||||
trace!("created delta layer {}", self.path);
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
@@ -85,17 +85,6 @@ pub struct Summary {
|
||||
// the 'values' part starts after the summary header, on block 1.
|
||||
}
|
||||
|
||||
impl From<&ImageLayer> for Summary {
|
||||
fn from(layer: &ImageLayer) -> Self {
|
||||
Self::expected(
|
||||
layer.desc.tenant_shard_id.tenant_id,
|
||||
layer.desc.timeline_id,
|
||||
layer.desc.key_range.clone(),
|
||||
layer.lsn,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl Summary {
|
||||
pub(super) fn expected(
|
||||
tenant_id: TenantId,
|
||||
@@ -278,19 +267,9 @@ impl ImageLayer {
|
||||
.metadata()
|
||||
.context("get file metadata to determine size")?;
|
||||
|
||||
// TODO(sharding): we should get TenantShardId from path.
|
||||
// OR, not at all: any layer we load from disk should also get reconciled with remote IndexPart.
|
||||
let tenant_shard_id = TenantShardId::unsharded(summary.tenant_id);
|
||||
|
||||
Ok(ImageLayer {
|
||||
path: path.to_path_buf(),
|
||||
desc: PersistentLayerDesc::new_img(
|
||||
tenant_shard_id,
|
||||
summary.timeline_id,
|
||||
summary.key_range,
|
||||
summary.lsn,
|
||||
metadata.len(),
|
||||
), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
|
||||
desc: PersistentLayerDesc::new_img(summary.key_range, summary.lsn, metadata.len()), // Now we assume image layer ALWAYS covers the full range. This may change in the future.
|
||||
lsn: summary.lsn,
|
||||
access_stats: LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
inner: OnceCell::new(),
|
||||
@@ -581,13 +560,7 @@ impl ImageLayerWriterInner {
|
||||
.await
|
||||
.context("get metadata to determine file size")?;
|
||||
|
||||
let desc = PersistentLayerDesc::new_img(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.key_range.clone(),
|
||||
self.lsn,
|
||||
metadata.len(),
|
||||
);
|
||||
let desc = PersistentLayerDesc::new_img(self.key_range.clone(), self.lsn, metadata.len());
|
||||
|
||||
// Note: Because we open the file in write-only mode, we cannot
|
||||
// reuse the same VirtualFile for reading later. That's why we don't
|
||||
@@ -599,7 +572,7 @@ impl ImageLayerWriterInner {
|
||||
// FIXME: why not carry the virtualfile here, it supports renaming?
|
||||
let layer = Layer::finish_creating(self.conf, timeline, desc, &self.path)?;
|
||||
|
||||
trace!("created image layer {}", layer.local_path());
|
||||
trace!("created image layer {}", self.path);
|
||||
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
@@ -3,13 +3,15 @@ use camino::{Utf8Path, Utf8PathBuf};
|
||||
use pageserver_api::models::{
|
||||
HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus,
|
||||
};
|
||||
use pageserver_api::shard::ShardIndex;
|
||||
use pageserver_api::shard::{ShardIndex, TenantShardId};
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Weak};
|
||||
use std::time::SystemTime;
|
||||
use tracing::Instrument;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::sync::gate::GateError;
|
||||
use utils::sync::heavier_once_cell;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
@@ -81,12 +83,7 @@ impl Layer {
|
||||
file_name: LayerFileName,
|
||||
metadata: LayerFileMetadata,
|
||||
) -> Self {
|
||||
let desc = PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
file_name,
|
||||
metadata.file_size(),
|
||||
);
|
||||
let desc = PersistentLayerDesc::from_filename(file_name, metadata.file_size());
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
|
||||
|
||||
@@ -100,7 +97,7 @@ impl Layer {
|
||||
metadata.shard,
|
||||
)));
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
|
||||
debug_assert!(owner.0.needs_download_blocking(timeline).unwrap().is_some());
|
||||
|
||||
owner
|
||||
}
|
||||
@@ -112,12 +109,7 @@ impl Layer {
|
||||
file_name: LayerFileName,
|
||||
metadata: LayerFileMetadata,
|
||||
) -> ResidentLayer {
|
||||
let desc = PersistentLayerDesc::from_filename(
|
||||
timeline.tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
file_name,
|
||||
metadata.file_size(),
|
||||
);
|
||||
let desc = PersistentLayerDesc::from_filename(file_name, metadata.file_size());
|
||||
|
||||
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident);
|
||||
|
||||
@@ -144,7 +136,7 @@ impl Layer {
|
||||
|
||||
let downloaded = resident.expect("just initialized");
|
||||
|
||||
debug_assert!(owner.0.needs_download_blocking().unwrap().is_none());
|
||||
debug_assert!(owner.0.needs_download_blocking(timeline).unwrap().is_none());
|
||||
|
||||
timeline
|
||||
.metrics
|
||||
@@ -189,7 +181,7 @@ impl Layer {
|
||||
let downloaded = resident.expect("just initialized");
|
||||
|
||||
// if the rename works, the path is as expected
|
||||
std::fs::rename(temp_path, owner.local_path())
|
||||
std::fs::rename(temp_path, owner.local_path(timeline))
|
||||
.with_context(|| format!("rename temporary file as correct path for {owner}"))?;
|
||||
|
||||
Ok(ResidentLayer { downloaded, owner })
|
||||
@@ -309,8 +301,12 @@ impl Layer {
|
||||
&self.0.access_stats
|
||||
}
|
||||
|
||||
pub(crate) fn local_path(&self) -> &Utf8Path {
|
||||
&self.0.path
|
||||
fn local_path(&self, timeline: &Timeline) -> Utf8PathBuf {
|
||||
self.0.local_path(timeline)
|
||||
}
|
||||
|
||||
pub(crate) fn filename(&self) -> LayerFileName {
|
||||
self.0.desc.filename()
|
||||
}
|
||||
|
||||
pub(crate) fn metadata(&self) -> LayerFileMetadata {
|
||||
@@ -402,13 +398,9 @@ impl ResidentOrWantedEvicted {
|
||||
}
|
||||
|
||||
struct LayerInner {
|
||||
/// Only needed to check ondemand_download_behavior_treat_error_as_warn and creation of
|
||||
/// [`Self::path`].
|
||||
/// Only needed to check ondemand_download_behavior_treat_error_as_warn and in [`Self::local_path_from_id`]
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
/// Full path to the file; unclear if this should exist anymore.
|
||||
path: Utf8PathBuf,
|
||||
|
||||
desc: PersistentLayerDesc,
|
||||
|
||||
/// Timeline access is needed for remote timeline client and metrics.
|
||||
@@ -486,12 +478,32 @@ impl Drop for LayerInner {
|
||||
return;
|
||||
}
|
||||
|
||||
let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %self.layer_desc().tenant_shard_id.tenant_id, shard_id=%self.layer_desc().tenant_shard_id.shard_slug(), timeline_id = %self.layer_desc().timeline_id);
|
||||
// We will only do I/O on drop if our Timeline still exists. Otherwise, we may safely
|
||||
// leave garbage layers behind to be cleaned up the next time this Timeline is instantiated.
|
||||
let Some(timeline) = self.timeline.upgrade() else {
|
||||
// no need to nag that timeline is gone: under normal situation on
|
||||
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
return;
|
||||
};
|
||||
|
||||
// We will only do I/O during drop if our Timeline's layer_gate is open: this avoids
|
||||
// the risk that we would race with Timeline::shutdown and end up doing I/O to a timeline
|
||||
// path for which the Timeline object has been torn down already.
|
||||
let _gate_guard = match timeline.layer_gate.enter() {
|
||||
Ok(g) => g,
|
||||
Err(GateError::GateClosed) => {
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// If timeline is alive, we can construct a span with IDs for this function.
|
||||
let span = tracing::info_span!(parent: None, "layer_delete", tenant_id = %timeline.tenant_shard_id.tenant_id, shard_id=%timeline.tenant_shard_id.shard_slug(), timeline_id = %timeline.timeline_id);
|
||||
let path = self.local_path(&timeline);
|
||||
|
||||
let path = std::mem::take(&mut self.path);
|
||||
let file_name = self.layer_desc().filename();
|
||||
let file_size = self.layer_desc().file_size;
|
||||
let timeline = self.timeline.clone();
|
||||
let meta = self.metadata();
|
||||
let status = self.status.clone();
|
||||
|
||||
@@ -519,32 +531,26 @@ impl Drop for LayerInner {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(timeline) = timeline.upgrade() {
|
||||
if removed {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
if removed {
|
||||
timeline.metrics.resident_physical_size_sub(file_size);
|
||||
}
|
||||
if let Some(remote_client) = timeline.remote_client.as_ref() {
|
||||
let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
|
||||
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
// demonstrating this deadlock (without spawn_blocking): stop will drop
|
||||
// queued items, which will have ResidentLayer's, and those drops would try
|
||||
// to re-entrantly lock the RemoteTimelineClient inner state.
|
||||
if !timeline.is_active() {
|
||||
tracing::info!("scheduling deletion on drop failed: {e:#}");
|
||||
} else {
|
||||
tracing::warn!("scheduling deletion on drop failed: {e:#}");
|
||||
}
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
|
||||
if let Err(e) = res {
|
||||
// test_timeline_deletion_with_files_stuck_in_upload_queue is good at
|
||||
// demonstrating this deadlock (without spawn_blocking): stop will drop
|
||||
// queued items, which will have ResidentLayer's, and those drops would try
|
||||
// to re-entrantly lock the RemoteTimelineClient inner state.
|
||||
if !timeline.is_active() {
|
||||
tracing::info!("scheduling deletion on drop failed: {e:#}");
|
||||
} else {
|
||||
LAYER_IMPL_METRICS.inc_completed_deletes();
|
||||
tracing::warn!("scheduling deletion on drop failed: {e:#}");
|
||||
}
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
|
||||
} else {
|
||||
LAYER_IMPL_METRICS.inc_completed_deletes();
|
||||
}
|
||||
} else {
|
||||
// no need to nag that timeline is gone: under normal situation on
|
||||
// task_mgr::remove_tenant_from_memory the timeline is gone before we get dropped.
|
||||
LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::TimelineGone);
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -560,10 +566,6 @@ impl LayerInner {
|
||||
generation: Generation,
|
||||
shard: ShardIndex,
|
||||
) -> Self {
|
||||
let path = conf
|
||||
.timeline_path(&timeline.tenant_shard_id, &timeline.timeline_id)
|
||||
.join(desc.filename().to_string());
|
||||
|
||||
let (inner, version) = if let Some(inner) = downloaded {
|
||||
let version = inner.version;
|
||||
let resident = ResidentOrWantedEvicted::Resident(inner);
|
||||
@@ -574,7 +576,6 @@ impl LayerInner {
|
||||
|
||||
LayerInner {
|
||||
conf,
|
||||
path,
|
||||
desc,
|
||||
timeline: Arc::downgrade(timeline),
|
||||
have_remote_client: timeline.remote_client.is_some(),
|
||||
@@ -590,6 +591,25 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
/// All call sites that need this function should already have a Timeline (e.g. from
|
||||
/// upgrading the Self::timeline weak pointer) -- it doesn't make sense to try and
|
||||
/// do anything with the local file if the Timeline isn't still alive.
|
||||
fn local_path(&self, timeline: &Timeline) -> Utf8PathBuf {
|
||||
self.local_path_from_id(&timeline.tenant_shard_id, &timeline.timeline_id)
|
||||
}
|
||||
|
||||
/// Use this instead of `local_path` if you don't have a Timeline but do have its ID: this
|
||||
/// is used by external callers such as [`crate::tenant::RemoteTimelineClient`]
|
||||
pub(crate) fn local_path_from_id(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Utf8PathBuf {
|
||||
self.conf
|
||||
.timeline_path(tenant_shard_id, timeline_id)
|
||||
.join(self.desc.filename().to_string())
|
||||
}
|
||||
|
||||
fn delete_on_drop(&self) {
|
||||
let res =
|
||||
self.wanted_deleted
|
||||
@@ -683,7 +703,7 @@ impl LayerInner {
|
||||
// check if we really need to be downloaded; could have been already downloaded by a
|
||||
// cancelled previous attempt.
|
||||
let needs_download = self
|
||||
.needs_download()
|
||||
.needs_download(&timeline)
|
||||
.await
|
||||
.map_err(DownloadError::PreStatFailed)?;
|
||||
|
||||
@@ -833,12 +853,13 @@ impl LayerInner {
|
||||
// block tenant::mgr::remove_tenant_from_memory.
|
||||
|
||||
let this: Arc<Self> = self.clone();
|
||||
let timeline_clone = timeline.clone();
|
||||
|
||||
crate::task_mgr::spawn(
|
||||
&tokio::runtime::Handle::current(),
|
||||
crate::task_mgr::TaskKind::RemoteDownloadTask,
|
||||
Some(self.desc.tenant_shard_id.tenant_id),
|
||||
Some(self.desc.timeline_id),
|
||||
Some(timeline.tenant_shard_id.tenant_id),
|
||||
Some(timeline.timeline_id),
|
||||
&task_name,
|
||||
false,
|
||||
async move {
|
||||
@@ -894,7 +915,7 @@ impl LayerInner {
|
||||
match rx.await {
|
||||
Ok((Ok(()), permit)) => {
|
||||
if let Some(reason) = self
|
||||
.needs_download()
|
||||
.needs_download(&timeline_clone)
|
||||
.await
|
||||
.map_err(DownloadError::PostStatFailed)?
|
||||
{
|
||||
@@ -929,16 +950,26 @@ impl LayerInner {
|
||||
}
|
||||
}
|
||||
|
||||
async fn needs_download(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
match tokio::fs::metadata(&self.path).await {
|
||||
async fn needs_download(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
let path = self.local_path(timeline);
|
||||
|
||||
match tokio::fs::metadata(path).await {
|
||||
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
fn needs_download_blocking(&self) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
match self.path.metadata() {
|
||||
fn needs_download_blocking(
|
||||
&self,
|
||||
timeline: &Timeline,
|
||||
) -> Result<Option<NeedsDownload>, std::io::Error> {
|
||||
let path = self.local_path(timeline);
|
||||
|
||||
match path.metadata() {
|
||||
Ok(m) => Ok(self.is_file_present_and_good_size(&m).err()),
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Some(NeedsDownload::NotFound)),
|
||||
Err(e) => Err(e),
|
||||
@@ -1004,7 +1035,10 @@ impl LayerInner {
|
||||
//
|
||||
// FIXME: this is not true anymore, we can safely evict wanted deleted files.
|
||||
} else if can_evict && evict {
|
||||
let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, %version);
|
||||
// If timeline is alive, we can construct a span with IDs for this function.
|
||||
let span = self.timeline.upgrade().map(|timeline| {
|
||||
tracing::info_span!(parent: None, "layer_evict", tenant_id = %timeline.tenant_shard_id.tenant_id, shard_id=%timeline.tenant_shard_id.shard_slug(), timeline_id = %timeline.timeline_id)
|
||||
});
|
||||
|
||||
// downgrade for queueing, in case there's a tear down already ongoing we should not
|
||||
// hold it alive.
|
||||
@@ -1015,7 +1049,7 @@ impl LayerInner {
|
||||
// drop while the `self.inner` is being locked, leading to a deadlock.
|
||||
|
||||
crate::task_mgr::BACKGROUND_RUNTIME.spawn_blocking(move || {
|
||||
let _g = span.entered();
|
||||
let _g = span.map(|s| s.entered());
|
||||
|
||||
// if LayerInner is already dropped here, do nothing because the delete on drop
|
||||
// has already ran while we were in queue
|
||||
@@ -1075,7 +1109,9 @@ impl LayerInner {
|
||||
LayerResidenceEventReason::ResidenceChange,
|
||||
);
|
||||
|
||||
let res = match capture_mtime_and_remove(&self.path) {
|
||||
let local_path = self.local_path(&timeline);
|
||||
|
||||
let res = match capture_mtime_and_remove(&local_path) {
|
||||
Ok(local_layer_mtime) => {
|
||||
let duration = SystemTime::now().duration_since(local_layer_mtime);
|
||||
match duration {
|
||||
@@ -1227,6 +1263,11 @@ impl DownloadedLayer {
|
||||
owner: &Arc<LayerInner>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<&'a LayerKind> {
|
||||
let timeline = owner
|
||||
.timeline
|
||||
.upgrade()
|
||||
.ok_or(DownloadError::TimelineShutdown)?;
|
||||
|
||||
let init = || async {
|
||||
assert_eq!(
|
||||
Weak::as_ptr(&self.owner),
|
||||
@@ -1236,23 +1277,23 @@ impl DownloadedLayer {
|
||||
|
||||
let res = if owner.desc.is_delta {
|
||||
let summary = Some(delta_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
timeline.tenant_shard_id.tenant_id,
|
||||
timeline.timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
owner.desc.lsn_range.clone(),
|
||||
));
|
||||
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
|
||||
delta_layer::DeltaLayerInner::load(&owner.local_path(&timeline), summary, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Delta))
|
||||
} else {
|
||||
let lsn = owner.desc.image_layer_lsn();
|
||||
let summary = Some(image_layer::Summary::expected(
|
||||
owner.desc.tenant_shard_id.tenant_id,
|
||||
owner.desc.timeline_id,
|
||||
timeline.tenant_shard_id.tenant_id,
|
||||
timeline.timeline_id,
|
||||
owner.desc.key_range.clone(),
|
||||
lsn,
|
||||
));
|
||||
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
|
||||
image_layer::ImageLayerInner::load(&owner.local_path(&timeline), lsn, summary, ctx)
|
||||
.await
|
||||
.map(|res| res.map(LayerKind::Image))
|
||||
};
|
||||
@@ -1376,8 +1417,14 @@ impl ResidentLayer {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn local_path(&self) -> &Utf8Path {
|
||||
&self.owner.0.path
|
||||
pub(crate) fn local_path_from_id(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Utf8PathBuf {
|
||||
self.owner
|
||||
.0
|
||||
.local_path_from_id(tenant_shard_id, timeline_id)
|
||||
}
|
||||
|
||||
pub(crate) fn access_stats(&self) -> &LayerAccessStats {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use core::fmt::Display;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::ops::Range;
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::repository::Key;
|
||||
|
||||
@@ -9,16 +8,11 @@ use super::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[cfg(test)]
|
||||
use utils::id::TenantId;
|
||||
|
||||
/// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the
|
||||
/// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides
|
||||
/// a unified way to generate layer information like file name.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct PersistentLayerDesc {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub timeline_id: TimelineId,
|
||||
/// Range of keys that this layer covers
|
||||
pub key_range: Range<Key>,
|
||||
/// Inclusive start, exclusive end of the LSN range that this layer holds.
|
||||
@@ -57,8 +51,6 @@ impl PersistentLayerDesc {
|
||||
#[cfg(test)]
|
||||
pub fn new_test(key_range: Range<Key>) -> Self {
|
||||
Self {
|
||||
tenant_shard_id: TenantShardId::unsharded(TenantId::generate()),
|
||||
timeline_id: TimelineId::generate(),
|
||||
key_range,
|
||||
lsn_range: Lsn(0)..Lsn(1),
|
||||
is_delta: false,
|
||||
@@ -66,16 +58,8 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_img(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
file_size: u64,
|
||||
) -> Self {
|
||||
pub fn new_img(key_range: Range<Key>, lsn: Lsn, file_size: u64) -> Self {
|
||||
Self {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key_range,
|
||||
lsn_range: Self::image_layer_lsn_range(lsn),
|
||||
is_delta: false,
|
||||
@@ -83,16 +67,8 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_delta(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
file_size: u64,
|
||||
) -> Self {
|
||||
pub fn new_delta(key_range: Range<Key>, lsn_range: Range<Lsn>, file_size: u64) -> Self {
|
||||
Self {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key_range,
|
||||
lsn_range,
|
||||
is_delta: true,
|
||||
@@ -100,23 +76,10 @@ impl PersistentLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_filename(
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
filename: LayerFileName,
|
||||
file_size: u64,
|
||||
) -> Self {
|
||||
pub fn from_filename(filename: LayerFileName, file_size: u64) -> Self {
|
||||
match filename {
|
||||
LayerFileName::Image(i) => {
|
||||
Self::new_img(tenant_shard_id, timeline_id, i.key_range, i.lsn, file_size)
|
||||
}
|
||||
LayerFileName::Delta(d) => Self::new_delta(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
d.key_range,
|
||||
d.lsn_range,
|
||||
file_size,
|
||||
),
|
||||
LayerFileName::Image(i) => Self::new_img(i.key_range, i.lsn, file_size),
|
||||
LayerFileName::Delta(d) => Self::new_delta(d.key_range, d.lsn_range, file_size),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,10 +136,6 @@ impl PersistentLayerDesc {
|
||||
self.key_range.clone()
|
||||
}
|
||||
|
||||
pub fn get_timeline_id(&self) -> TimelineId {
|
||||
self.timeline_id
|
||||
}
|
||||
|
||||
/// Does this layer only contain some data for the key-range (incremental),
|
||||
/// or does it contain a version of every page? This is important to know
|
||||
/// for garbage collecting old layers: an incremental layer depends on
|
||||
@@ -192,9 +151,7 @@ impl PersistentLayerDesc {
|
||||
pub fn dump(&self) {
|
||||
if self.is_delta {
|
||||
println!(
|
||||
"----- delta layer for ten {} tli {} keys {}-{} lsn {}-{} is_incremental {} size {} ----",
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"----- delta layer keys {}-{} lsn {}-{} is_incremental {} size {} ----",
|
||||
self.key_range.start,
|
||||
self.key_range.end,
|
||||
self.lsn_range.start,
|
||||
@@ -204,9 +161,7 @@ impl PersistentLayerDesc {
|
||||
);
|
||||
} else {
|
||||
println!(
|
||||
"----- image layer for ten {} tli {} key {}-{} at {} is_incremental {} size {} ----",
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
"----- image layer key {}-{} at {} is_incremental {} size {} ----",
|
||||
self.key_range.start,
|
||||
self.key_range.end,
|
||||
self.image_layer_lsn(),
|
||||
|
||||
@@ -313,6 +313,10 @@ pub struct Timeline {
|
||||
/// Gate to prevent shutdown completing while I/O is still happening to this timeline's data
|
||||
pub(crate) gate: Gate,
|
||||
|
||||
/// Gate to prevent shutdown completing until all Layers for this Timeline have finished
|
||||
/// doing any background I/O such as deleting files on drop.
|
||||
pub(crate) layer_gate: Gate,
|
||||
|
||||
/// Cancellation token scoped to this timeline: anything doing long-running work relating
|
||||
/// to the timeline should drop out when this token fires.
|
||||
pub(crate) cancel: CancellationToken,
|
||||
@@ -1002,8 +1006,15 @@ impl Timeline {
|
||||
)
|
||||
.await;
|
||||
|
||||
// Finally wait until any gate-holders are complete
|
||||
// Wait until any normal gate-holders such as page_service requests are complete
|
||||
self.gate.close().await;
|
||||
|
||||
// Drop our references to layers: this should permit all layers to be dropped, and any I/O
|
||||
// in their drop() method to complete.
|
||||
self.layers.write().await.clear();
|
||||
|
||||
// Wait until any Layer gate holders such as LayerInner::drop are complete
|
||||
self.layer_gate.close().await;
|
||||
}
|
||||
|
||||
pub fn set_state(&self, new_state: TimelineState) {
|
||||
@@ -1445,6 +1456,7 @@ impl Timeline {
|
||||
|
||||
cancel,
|
||||
gate: Gate::new(format!("Timeline<{tenant_shard_id}/{timeline_id}>")),
|
||||
layer_gate: Gate::new(format!("TimelineLayers<{tenant_shard_id}/{timeline_id}>")),
|
||||
|
||||
compaction_lock: tokio::sync::Mutex::default(),
|
||||
gc_lock: tokio::sync::Mutex::default(),
|
||||
@@ -2176,7 +2188,7 @@ trait TraversalLayerExt {
|
||||
|
||||
impl TraversalLayerExt for Layer {
|
||||
fn traversal_id(&self) -> TraversalId {
|
||||
self.local_path().to_string()
|
||||
self.filename().to_string()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2890,7 +2902,8 @@ impl Timeline {
|
||||
let _g = span.entered();
|
||||
let new_delta =
|
||||
Handle::current().block_on(frozen_layer.write_to_disk(&self_clone, &ctx))?;
|
||||
let new_delta_path = new_delta.local_path().to_owned();
|
||||
let new_delta_path = new_delta
|
||||
.local_path_from_id(&self_clone.tenant_shard_id, &self_clone.timeline_id);
|
||||
|
||||
// Sync it to disk.
|
||||
//
|
||||
@@ -3134,7 +3147,7 @@ impl Timeline {
|
||||
// and fsync them all in parallel.
|
||||
let all_paths = image_layers
|
||||
.iter()
|
||||
.map(|layer| layer.local_path().to_owned())
|
||||
.map(|layer| layer.local_path_from_id(&self.tenant_shard_id, &self.timeline_id))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
par_fsync::par_fsync_async(&all_paths)
|
||||
@@ -3683,7 +3696,7 @@ impl Timeline {
|
||||
// FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
|
||||
let layer_paths: Vec<Utf8PathBuf> = new_layers
|
||||
.iter()
|
||||
.map(|l| l.local_path().to_owned())
|
||||
.map(|l| l.local_path_from_id(&self.tenant_shard_id, &self.timeline_id))
|
||||
.collect();
|
||||
|
||||
// Fsync all the layer files and directory using multiple threads to
|
||||
|
||||
@@ -33,6 +33,11 @@ impl LayerManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self) {
|
||||
self.layer_map = LayerMap::default();
|
||||
self.layer_fmgr.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
|
||||
self.layer_fmgr.get_from_desc(desc)
|
||||
}
|
||||
@@ -271,6 +276,10 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clear(&mut self) {
|
||||
self.0.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn contains(&self, layer: &T) -> bool {
|
||||
self.0.contains_key(&layer.layer_desc().key())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user