mirror of https://github.com/neondatabase/neon.git
remove_remote_layer: uninteresting refactorings (#4936)
In the quest to solve #4745 by moving download/evictedness to be an internally mutable property of a Layer and getting rid of `trait PersistentLayer` (at least for prod usage) and of `layer_removal_cs`, we present some misc cleanups.

---------

Co-authored-by: Dmitry Rodionov <dmitry@neon.tech>
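The recurring pattern in the hunks below replaces defaulted `PersistentLayer` trait methods (`file_size()`, `get_tenant_id()`, `get_timeline_id()`, `get_lsn_range()`) with direct field reads off the `PersistentLayerDesc` returned by `layer_desc()`. A minimal sketch of that shape, using hypothetical stand-in types rather than the pageserver's real definitions:

    // Hypothetical stand-ins, not the pageserver's real definitions.
    struct PersistentLayerDesc {
        tenant_id: u32,
        timeline_id: u32,
        file_size: u64,
    }

    struct Layer {
        desc: PersistentLayerDesc,
    }

    impl Layer {
        // The one accessor that stays; everything else reads the descriptor.
        fn layer_desc(&self) -> &PersistentLayerDesc {
            &self.desc
        }
    }

    fn main() {
        let layer = Layer {
            desc: PersistentLayerDesc { tenant_id: 1, timeline_id: 2, file_size: 8192 },
        };
        // Before: layer.file_size(), layer.get_tenant_id(), layer.get_timeline_id()
        let desc = layer.layer_desc();
        assert_eq!((desc.tenant_id, desc.timeline_id, desc.file_size), (1, 2, 8192));
    }

With the descriptor as the single source of truth, the defaulted trait methods become pure boilerplate, which is why later hunks delete them from `trait PersistentLayer`.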
@@ -304,17 +304,18 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
     // Debug-log the list of candidates
     let now = SystemTime::now();
     for (i, (partition, candidate)) in candidates.iter().enumerate() {
+        let desc = candidate.layer.layer_desc();
         debug!(
             "cand {}/{}: size={}, no_access_for={}us, partition={:?}, {}/{}/{}",
             i + 1,
             candidates.len(),
-            candidate.layer.file_size(),
+            desc.file_size,
             now.duration_since(candidate.last_activity_ts)
                 .unwrap()
                 .as_micros(),
             partition,
-            candidate.layer.get_tenant_id(),
-            candidate.layer.get_timeline_id(),
+            desc.tenant_id,
+            desc.timeline_id,
             candidate.layer,
         );
     }
@@ -346,7 +347,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
             warned = Some(usage_planned);
         }

-        usage_planned.add_available_bytes(candidate.layer.file_size());
+        usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size);

         batched
             .entry(TimelineKey(candidate.timeline))
@@ -389,15 +390,16 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
         Ok(results) => {
             assert_eq!(results.len(), batch.len());
             for (result, layer) in results.into_iter().zip(batch.iter()) {
+                let file_size = layer.layer_desc().file_size;
                 match result {
                     Some(Ok(())) => {
-                        usage_assumed.add_available_bytes(layer.file_size());
+                        usage_assumed.add_available_bytes(file_size);
                     }
                     Some(Err(EvictionError::CannotEvictRemoteLayer)) => {
                         unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers")
                     }
                     Some(Err(EvictionError::FileNotFound)) => {
-                        evictions_failed.file_sizes += layer.file_size();
+                        evictions_failed.file_sizes += file_size;
                         evictions_failed.count += 1;
                     }
                     Some(Err(
@@ -406,7 +408,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
                     )) => {
                         let e = utils::error::report_compact_sources(&e);
                         warn!(%layer, "failed to evict layer: {e}");
-                        evictions_failed.file_sizes += layer.file_size();
+                        evictions_failed.file_sizes += file_size;
                         evictions_failed.count += 1;
                     }
                     None => {
@@ -121,7 +121,7 @@ impl BatchedUpdates<'_> {
     ///
     /// This should be called when the corresponding file on disk has been deleted.
     ///
-    pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
+    pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) {
         self.layer_map.remove_historic_noflush(layer_desc)
     }

@@ -253,11 +253,11 @@ impl LayerMap {
     ///
     /// Helper function for BatchedUpdates::remove_historic
     ///
-    pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
+    pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
         self.historic
-            .remove(historic_layer_coverage::LayerKey::from(&layer_desc));
+            .remove(historic_layer_coverage::LayerKey::from(layer_desc));
         let layer_key = layer_desc.key();
-        if Self::is_l0(&layer_desc) {
+        if Self::is_l0(layer_desc) {
             let len_before = self.l0_delta_layers.len();
             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
             l0_delta_layers.retain(|other| other.key() != layer_key);
@@ -766,8 +766,7 @@ mod tests {
             expected_in_counts
         );

-        map.batch_update()
-            .remove_historic(downloaded.layer_desc().clone());
+        map.batch_update().remove_historic(downloaded.layer_desc());
         assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
     }

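Beyond the signature change itself, the payoff shows up at call sites: `remove_historic(downloaded.layer_desc().clone())` becomes `remove_historic(downloaded.layer_desc())`, since a borrowed descriptor suffices. A sketch of that caller-side effect, again with hypothetical stand-ins rather than the real `LayerMap`:

    // Hypothetical stand-ins, not the pageserver's real definitions.
    #[derive(Clone, PartialEq)]
    struct PersistentLayerDesc {
        key: u64,
    }

    struct LayerMap {
        historic: Vec<PersistentLayerDesc>,
    }

    impl LayerMap {
        // By-reference: callers holding a borrowed descriptor no longer clone it.
        fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) {
            self.historic.retain(|d| d != layer_desc);
        }
    }

    fn main() {
        let desc = PersistentLayerDesc { key: 1 };
        let mut map = LayerMap { historic: vec![desc.clone()] };
        // Before: map.remove_historic_noflush(desc.clone());
        map.remove_historic_noflush(&desc);
        assert!(map.historic.is_empty());
    }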
@@ -401,16 +401,6 @@ pub trait AsLayerDesc {
 /// An image layer is a snapshot of all the data in a key-range, at a single
 /// LSN.
 pub trait PersistentLayer: Layer + AsLayerDesc {
-    /// Identify the tenant this layer belongs to
-    fn get_tenant_id(&self) -> TenantId {
-        self.layer_desc().tenant_id
-    }
-
-    /// Identify the timeline this layer belongs to
-    fn get_timeline_id(&self) -> TimelineId {
-        self.layer_desc().timeline_id
-    }
-
     /// File name used for this layer, both in the pageserver's local filesystem
     /// state as well as in the remote storage.
     fn filename(&self) -> LayerFileName {
@@ -436,14 +426,6 @@ pub trait PersistentLayer: Layer + AsLayerDesc {
         false
     }

-    /// Returns None if the layer file size is not known.
-    ///
-    /// Should not change over the lifetime of the layer object because
-    /// current_physical_size is computed as the sum of this value.
-    fn file_size(&self) -> u64 {
-        self.layer_desc().file_size
-    }
-
     fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo;

     fn access_stats(&self) -> &LayerAccessStats;
@@ -108,12 +108,10 @@ impl From<&DeltaLayer> for Summary {
 // Flag indicating that this version initialize the page
 const WILL_INIT: u64 = 1;

-///
 /// Struct representing reference to BLOB in layers. Reference contains BLOB
 /// offset, and for WAL records it also contains `will_init` flag. The flag
 /// helps to determine the range of records that needs to be applied, without
 /// reading/deserializing records themselves.
-///
 #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
 pub struct BlobRef(pub u64);

@@ -138,10 +136,8 @@ impl BlobRef {
 pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
 struct DeltaKey([u8; DELTA_KEY_SIZE]);

-///
 /// This is the key of the B-tree index stored in the delta layer. It consists
 /// of the serialized representation of a Key and LSN.
-///
 impl DeltaKey {
     fn from_slice(buf: &[u8]) -> Self {
         let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
@@ -323,15 +323,10 @@ impl ImageLayer {
     ) -> Result<&ImageLayerInner> {
         self.access_stats
             .record_access(access_kind, ctx.task_kind());
-        loop {
-            if let Some(inner) = self.inner.get() {
-                return Ok(inner);
-            }
-            self.inner
-                .get_or_try_init(|| self.load_inner())
-                .await
-                .with_context(|| format!("Failed to load image layer {}", self.path().display()))?;
-        }
+        self.inner
+            .get_or_try_init(|| self.load_inner())
+            .await
+            .with_context(|| format!("Failed to load image layer {}", self.path().display()))
     }

     async fn load_inner(&self) -> Result<ImageLayerInner> {
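The deleted `loop` and `get()` pre-check were redundant: an async-aware once-cell's `get_or_try_init` already returns the cached value on every call after initialization. A sketch of that behavior, assuming a cell like `tokio::sync::OnceCell` (the concrete type of `self.inner` is not shown in this diff):

    use tokio::sync::OnceCell;

    #[tokio::main]
    async fn main() -> Result<(), std::io::Error> {
        let inner: OnceCell<u64> = OnceCell::new();

        // First call runs the initializer.
        let v = inner
            .get_or_try_init(|| async { Ok::<_, std::io::Error>(42) })
            .await?;
        assert_eq!(*v, 42);

        // Later calls return the cached value without re-running the
        // initializer, so the removed `if let Some(inner) = self.inner.get()`
        // fast path (and the loop around it) bought nothing. This initializer
        // would fail, but it never runs:
        let again = inner
            .get_or_try_init(|| async {
                Err::<u64, std::io::Error>(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "never reached",
                ))
            })
            .await?;
        assert_eq!(*again, 42);
        Ok(())
    }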
@@ -1160,7 +1160,7 @@ impl Timeline {
             return Err(EvictionError::CannotEvictRemoteLayer);
         }

-        let layer_file_size = local_layer.file_size();
+        let layer_file_size = local_layer.layer_desc().file_size;

         let local_layer_mtime = local_layer
             .local_path()
@@ -1590,7 +1590,6 @@ impl Timeline {
     ///
     pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
         let mut guard = self.layers.write().await;
-        let mut num_layers = 0;

         let timer = self.metrics.load_layer_map_histo.start_timer();

@@ -1608,12 +1607,12 @@ impl Timeline {
             let fname = direntry.file_name();
             let fname = fname.to_string_lossy();

-            if let Some(imgfilename) = ImageFileName::parse_str(&fname) {
+            if let Some(filename) = ImageFileName::parse_str(&fname) {
                 // create an ImageLayer struct for each image file.
-                if imgfilename.lsn > disk_consistent_lsn {
+                if filename.lsn > disk_consistent_lsn {
                     info!(
                         "found future image layer {} on timeline {} disk_consistent_lsn is {}",
-                        imgfilename, self.timeline_id, disk_consistent_lsn
+                        filename, self.timeline_id, disk_consistent_lsn
                     );

                     rename_to_backup(&direntry_path)?;
@@ -1621,31 +1620,31 @@ impl Timeline {
                 }

                 let file_size = direntry_path.metadata()?.len();
+                let stats =
+                    LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident);

                 let layer = ImageLayer::new(
                     self.conf,
                     self.timeline_id,
                     self.tenant_id,
-                    &imgfilename,
+                    &filename,
                     file_size,
-                    LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident),
+                    stats,
                 );

                 trace!("found layer {}", layer.path().display());
                 total_physical_size += file_size;
                 loaded_layers.push(Arc::new(layer));
-                num_layers += 1;
-            } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
+            } else if let Some(filename) = DeltaFileName::parse_str(&fname) {
                 // Create a DeltaLayer struct for each delta file.
                 // The end-LSN is exclusive, while disk_consistent_lsn is
                 // inclusive. For example, if disk_consistent_lsn is 100, it is
                 // OK for a delta layer to have end LSN 101, but if the end LSN
                 // is 102, then it might not have been fully flushed to disk
                 // before crash.
-                if deltafilename.lsn_range.end > disk_consistent_lsn + 1 {
+                if filename.lsn_range.end > disk_consistent_lsn + 1 {
                     info!(
                         "found future delta layer {} on timeline {} disk_consistent_lsn is {}",
-                        deltafilename, self.timeline_id, disk_consistent_lsn
+                        filename, self.timeline_id, disk_consistent_lsn
                     );

                     rename_to_backup(&direntry_path)?;
@@ -1653,20 +1652,20 @@ impl Timeline {
                 }

                 let file_size = direntry_path.metadata()?.len();
+                let stats =
+                    LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident);

                 let layer = DeltaLayer::new(
                     self.conf,
                     self.timeline_id,
                     self.tenant_id,
-                    &deltafilename,
+                    &filename,
                     file_size,
-                    LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident),
+                    stats,
                 );

                 trace!("found layer {}", layer.path().display());
                 total_physical_size += file_size;
                 loaded_layers.push(Arc::new(layer));
-                num_layers += 1;
             } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
                 // ignore these
             } else if remote_timeline_client::is_temp_download_file(&direntry_path) {
@@ -1691,6 +1690,7 @@ impl Timeline {
             }
         }

+        let num_layers = loaded_layers.len();
         guard.initialize_local_layers(loaded_layers, Lsn(disk_consistent_lsn.0) + 1);

         info!(
@@ -1791,13 +1791,15 @@ impl Timeline {
                     );
                     continue;
                 }
+                let stats =
+                    LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted);

                 let remote_layer = RemoteLayer::new_img(
                     self.tenant_id,
                     self.timeline_id,
                     imgfilename,
                     &remote_layer_metadata,
-                    LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted),
+                    stats,
                 );
                 let remote_layer = Arc::new(remote_layer);
                 added_remote_layers.push(remote_layer);
@@ -1816,12 +1818,15 @@ impl Timeline {
                     );
                     continue;
                 }
+                let stats =
+                    LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted);
+
                 let remote_layer = RemoteLayer::new_delta(
                     self.tenant_id,
                     self.timeline_id,
                     deltafilename,
                     &remote_layer_metadata,
-                    LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted),
+                    stats,
                 );
                 let remote_layer = Arc::new(remote_layer);
                 added_remote_layers.push(remote_layer);
@@ -2269,15 +2274,16 @@ trait TraversalLayerExt {

 impl TraversalLayerExt for Arc<dyn PersistentLayer> {
     fn traversal_id(&self) -> TraversalId {
+        let timeline_id = self.layer_desc().timeline_id;
         match self.local_path() {
             Some(local_path) => {
-                debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", self.get_timeline_id())),
+                debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", timeline_id)),
                     "need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary",
                 );
                 format!("{}", local_path.display())
             }
             None => {
-                format!("remote {}/{self}", self.get_timeline_id())
+                format!("remote {}/{self}", timeline_id)
             }
         }
     }
@@ -2813,7 +2819,10 @@ impl Timeline {
             // We will remove frozen layer and add delta layer in one atomic operation later.
             let layer = self.create_delta_layer(&frozen_layer).await?;
             (
-                HashMap::from([(layer.filename(), LayerFileMetadata::new(layer.file_size()))]),
+                HashMap::from([(
+                    layer.filename(),
+                    LayerFileMetadata::new(layer.layer_desc().file_size),
+                )]),
                 Some(layer),
             )
         };
@@ -2833,7 +2842,7 @@ impl Timeline {
         );

         // update metrics
-        let sz = l.file_size();
+        let sz = l.layer_desc().file_size;
         self.metrics.resident_physical_size_gauge.add(sz);
         self.metrics.num_persistent_files_created.inc_by(1);
         self.metrics.persistent_bytes_written.inc_by(sz);
@@ -3452,14 +3461,14 @@ impl Timeline {
         // "gaps" in the sequence of level 0 files should only happen in case
         // of a crash, partial download from cloud storage, or something like
         // that, so it's not a big deal in practice.
-        level0_deltas.sort_by_key(|l| l.get_lsn_range().start);
+        level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start);
         let mut level0_deltas_iter = level0_deltas.iter();

         let first_level0_delta = level0_deltas_iter.next().unwrap();
-        let mut prev_lsn_end = first_level0_delta.get_lsn_range().end;
+        let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end;
         let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)];
         for l in level0_deltas_iter {
-            let lsn_range = l.get_lsn_range();
+            let lsn_range = &l.layer_desc().lsn_range;

             if lsn_range.start != prev_lsn_end {
                 break;
@@ -3468,8 +3477,13 @@ impl Timeline {
             prev_lsn_end = lsn_range.end;
         }
         let lsn_range = Range {
-            start: deltas_to_compact.first().unwrap().get_lsn_range().start,
-            end: deltas_to_compact.last().unwrap().get_lsn_range().end,
+            start: deltas_to_compact
+                .first()
+                .unwrap()
+                .layer_desc()
+                .lsn_range
+                .start,
+            end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end,
         };

         let remotes = deltas_to_compact
@@ -3521,33 +3535,22 @@ impl Timeline {
         let mut prev: Option<Key> = None;

         let mut all_value_refs = Vec::new();
+        let mut all_keys = Vec::new();

         for l in deltas_to_compact.iter() {
             // TODO: replace this with an await once we fully go async
-            all_value_refs.extend(
-                Handle::current().block_on(
-                    l.clone()
-                        .downcast_delta_layer()
-                        .expect("delta layer")
-                        .load_val_refs(ctx),
-                )?,
-            );
+            let delta = l.clone().downcast_delta_layer().expect("delta layer");
+            Handle::current().block_on(async {
+                all_value_refs.extend(delta.load_val_refs(ctx).await?);
+                all_keys.extend(delta.load_keys(ctx).await?);
+                anyhow::Ok(())
+            })?;
         }

         // The current stdlib sorting implementation is designed in a way where it is
         // particularly fast where the slice is made up of sorted sub-ranges.
         all_value_refs.sort_by_key(|(key, lsn, _value_ref)| (*key, *lsn));

-        let mut all_keys = Vec::new();
-        for l in deltas_to_compact.iter() {
-            // TODO: replace this with an await once we fully go async
-            all_keys.extend(
-                Handle::current().block_on(
-                    l.clone()
-                        .downcast_delta_layer()
-                        .expect("delta layer")
-                        .load_keys(ctx),
-                )?,
-            );
-        }
         // The current stdlib sorting implementation is designed in a way where it is
         // particularly fast where the slice is made up of sorted sub-ranges.
         all_keys.sort_by_key(|(key, lsn, _size)| (*key, *lsn));
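Merging the two per-layer passes into one `block_on` halves the blocking round-trips into the runtime, and `anyhow::Ok(())` pins down a concrete error type for `?` inside the async block. A runnable sketch of the same shape, with hypothetical `load_val_refs`/`load_keys` stand-ins driven by a locally built runtime instead of `Handle::current()`:

    use anyhow::Result;

    // Hypothetical stand-ins for the delta layer's async loaders.
    async fn load_val_refs() -> Result<Vec<u64>> {
        Ok(vec![1, 2])
    }
    async fn load_keys() -> Result<Vec<u64>> {
        Ok(vec![3, 4])
    }

    fn main() -> Result<()> {
        let rt = tokio::runtime::Runtime::new()?;
        let mut all_value_refs = Vec::new();
        let mut all_keys = Vec::new();
        // One blocking call drives both loads; anyhow::Ok(()) gives `?`
        // a concrete error type inside the async block.
        rt.block_on(async {
            all_value_refs.extend(load_val_refs().await?);
            all_keys.extend(load_keys().await?);
            anyhow::Ok(())
        })?;
        assert_eq!((all_value_refs, all_keys), (vec![1, 2], vec![3, 4]));
        Ok(())
    }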
@@ -4656,7 +4659,7 @@ impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction {

 impl LocalLayerInfoForDiskUsageEviction {
     pub fn file_size(&self) -> u64 {
-        self.layer.file_size()
+        self.layer.layer_desc().file_size
     }
 }

@@ -120,10 +120,9 @@ impl LayerManager {

         ensure!(
             lsn > last_record_lsn,
-            "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}",
+            "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
             lsn,
             last_record_lsn,
-            std::backtrace::Backtrace::force_capture(),
         );

         // Do we have a layer open for writing already?
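This hunk drops the `\n{}` placeholder and the explicit `std::backtrace::Backtrace::force_capture()` argument from the failure message. Assuming this is `anyhow::ensure!` (the surrounding code in this diff uses `anyhow::Result` and `anyhow::Ok`), a backtrace is still available from the returned error when `RUST_BACKTRACE=1` is set, so capturing one by hand is unnecessary. A sketch:

    use anyhow::{ensure, Result};

    fn check(lsn: u64, last_record_lsn: u64) -> Result<()> {
        // ensure! returns early with the formatted error when the condition
        // fails; anyhow attaches a backtrace itself under RUST_BACKTRACE=1.
        ensure!(
            lsn > last_record_lsn,
            "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})",
            lsn,
            last_record_lsn
        );
        Ok(())
    }

    fn main() {
        assert!(check(10, 5).is_ok());
        assert!(check(3, 5).is_err());
    }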
@@ -278,7 +277,7 @@ impl LayerManager {
         updates: &mut BatchedUpdates<'_>,
         mapping: &mut LayerFileManager,
     ) {
-        updates.remove_historic(layer.layer_desc().clone());
+        updates.remove_historic(layer.layer_desc());
         mapping.remove(layer);
     }

@@ -292,10 +291,10 @@ impl LayerManager {
         metrics: &TimelineMetrics,
         mapping: &mut LayerFileManager,
     ) -> anyhow::Result<()> {
+        let desc = layer.layer_desc();
         if !layer.is_remote_layer() {
             layer.delete_resident_layer_file()?;
-            let layer_file_size = layer.file_size();
-            metrics.resident_physical_size_gauge.sub(layer_file_size);
+            metrics.resident_physical_size_gauge.sub(desc.file_size);
         }

         // TODO Removing from the bottom of the layer map is expensive.
@@ -303,7 +302,7 @@ impl LayerManager {
         // won't be needed for page reconstruction for this timeline,
         // and mark what we can't delete yet as deleted from the layer
         // map index without actually rebuilding the index.
-        updates.remove_historic(layer.layer_desc().clone());
+        updates.remove_historic(desc);
         mapping.remove(layer);

         Ok(())