Compare commits

..

11 Commits

Author SHA1 Message Date
Alex Chi
5da45b182d add comments 2023-06-05 13:27:00 -04:00
Alex Chi
b23e085efb fix test
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-01 14:28:52 -04:00
Alex Chi
e4c2713dc6 fix eviction policy parsing
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-01 13:56:44 -04:00
Alex Chi
d70fcbd007 Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/cli-parse-reject 2023-06-01 13:24:15 -04:00
Joonas Koivunen
36fee50f4d compute_ctl: enable tracing panic hook (#4375)
compute_ctl can panic, but `tracing` is used for logging, so panic output
on stderr can interleave with messages from normal logging. The fix is to
use the established approach (pageserver, safekeeper, storage_broker) of
reporting panics through `tracing`.
2023-06-01 20:12:07 +03:00
bojanserafimov
330083638f Fix stale and misleading comment in LayerMap (#4297) 2023-06-01 05:04:46 +03:00
Konstantin Knizhnik
952d6e43a2 Add pageserver parameter forced_image_creation_limit which can be used… (#4353)
This parameter can be used to restrict the number of image layers generated
because of GC requests (wanted image layers).
When set to zero, it completely eliminates creation of such image layers,
which avoids the extra storage consumption introduced by merging #3673.

## Problem
PR #3673 forces generation of missing image layers, so in the short term it
can increase storage size (in the worst case up to two times). It was
intended (by me) that the GC period would be comparable with the PiTR
interval, but that does not seem to be the case now - GC runs much more
frequently. This can cause space exhaustion: GC forces new image creation
while a large PiTR window still prevents GC from collecting old layers.

## Summary of changes

Add a new pageserver parameter `forced_image_creation_limit` which
restricts the number of image layers created at GC's request.

2023-05-31 21:37:20 +03:00
bojanserafimov
b6447462dc Fix layer map correctness bug (#4342) 2023-05-31 12:23:00 -04:00
Alex Chi
b68c22be81 value parser
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-05-26 11:03:49 -04:00
Alex Chi
5e92758d20 use clap instead
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-05-26 10:17:05 -04:00
Alex Chi
9073db2622 use serde for cli config parse
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-05-26 09:51:25 -04:00
20 changed files with 285 additions and 326 deletions

Cargo.lock generated
View File

@@ -2617,6 +2617,7 @@ dependencies = [
"anyhow",
"byteorder",
"bytes",
"clap 4.3.0",
"const_format",
"enum-map",
"postgres_ffi",

View File

@@ -33,5 +33,7 @@ pub fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
.init();
tracing::info!("logging and tracing started");
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
Ok(())
}
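For reference, the helper called above swaps std's default panic hook for one that reports through `tracing`. A minimal sketch of that technique (a hypothetical `install_tracing_panic_hook`, not the actual `utils::logging` implementation; assumes `tracing` and `tracing-subscriber` as dependencies):

```rust
use std::panic;

// Report panics as tracing events so they cannot interleave with
// structured log output on stderr.
fn install_tracing_panic_hook() {
    panic::set_hook(Box::new(|info| {
        let backtrace = std::backtrace::Backtrace::force_capture();
        tracing::error!("panic: {}\n{}", info, backtrace);
    }));
}

fn main() {
    tracing_subscriber::fmt().init();
    install_tracing_panic_hook();
    panic!("demo"); // logged via tracing before the process unwinds
}
```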

View File

@@ -2,12 +2,11 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::io::{BufReader, Write};
use std::num::NonZeroU64;
use std::path::PathBuf;
use std::process::{Child, Command};
use std::{io, result};
use anyhow::{bail, Context};
use anyhow::Context;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
@@ -313,63 +312,8 @@ impl PageServerNode {
new_tenant_id: Option<TenantId>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<TenantId> {
let mut settings = settings.clone();
let config = models::TenantConfig {
checkpoint_distance: settings
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
.transpose()?,
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
compaction_target_size: settings
.remove("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()?,
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
.transpose()?,
gc_horizon: settings
.remove("gc_horizon")
.map(|x| x.parse::<u64>())
.transpose()?,
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
image_creation_threshold: settings
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
.transpose()?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings
.remove("lagging_wal_timeout")
.map(|x| x.to_string()),
max_lsn_wal_lag: settings
.remove("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.remove("eviction_policy")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
min_resident_size_override: settings
.remove("min_resident_size_override")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
};
let settings = settings.clone();
let config = models::TenantConfig::deserialize_from_settings(settings)?;
// If tenant ID was not specified, generate one
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
@@ -378,9 +322,6 @@ impl PageServerNode {
new_tenant_id,
config,
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
}
self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
.json(&request)
.send()?
@@ -400,76 +341,9 @@ impl PageServerNode {
pub fn tenant_config(
&self,
tenant_id: TenantId,
mut settings: HashMap<&str, &str>,
settings: HashMap<&str, &str>,
) -> anyhow::Result<()> {
let config = {
// Braces to make the diff easier to read
models::TenantConfig {
checkpoint_distance: settings
.remove("checkpoint_distance")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
compaction_target_size: settings
.remove("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_threshold: settings
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'compaction_threshold' as an integer")?,
gc_horizon: settings
.remove("gc_horizon")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
image_creation_threshold: settings
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
.map(|x| x.to_string()),
lagging_wal_timeout: settings
.remove("lagging_wal_timeout")
.map(|x| x.to_string()),
max_lsn_wal_lag: settings
.remove("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
.transpose()
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
trace_read_requests: settings
.remove("trace_read_requests")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'trace_read_requests' as bool")?,
eviction_policy: settings
.remove("eviction_policy")
.map(serde_json::from_str)
.transpose()
.context("Failed to parse 'eviction_policy' json")?,
min_resident_size_override: settings
.remove("min_resident_size_override")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'min_resident_size_override' as an integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
}
};
if !settings.is_empty() {
bail!("Unrecognized tenant settings: {settings:?}")
}
let config = models::TenantConfig::deserialize_from_settings(settings)?;
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
.json(&models::TenantConfigRequest { tenant_id, config })
.send()?

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
clap.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true

View File

@@ -5,6 +5,7 @@ use std::{
};
use byteorder::{BigEndian, ReadBytesExt};
use clap::Parser;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use strum_macros;
@@ -201,28 +202,72 @@ impl std::ops::Deref for TenantCreateRequest {
}
}
#[derive(Serialize, Deserialize, Debug, Default)]
#[derive(Serialize, Deserialize, Debug, Default, clap::Parser)]
#[clap(rename_all = "snake_case")]
pub struct TenantConfig {
#[clap(long)]
pub checkpoint_distance: Option<u64>,
#[clap(long)]
pub checkpoint_timeout: Option<String>,
#[clap(long)]
pub compaction_target_size: Option<u64>,
#[clap(long)]
pub compaction_period: Option<String>,
#[clap(long)]
pub compaction_threshold: Option<usize>,
#[clap(long)]
pub gc_horizon: Option<u64>,
#[clap(long)]
pub gc_period: Option<String>,
#[clap(long)]
pub image_creation_threshold: Option<usize>,
#[clap(long)]
pub pitr_interval: Option<String>,
#[clap(long)]
pub walreceiver_connect_timeout: Option<String>,
#[clap(long)]
pub lagging_wal_timeout: Option<String>,
#[clap(long)]
pub max_lsn_wal_lag: Option<NonZeroU64>,
#[clap(long)]
pub trace_read_requests: Option<bool>,
// We defer the parsing of the eviction_policy field to the request handler.
// Otherwise we'd have to move the types for eviction policy into this package.
// We might do that once the eviction feature has stabilized.
// For now, this field is not even documented in the openapi_spec.yml.
#[clap(long, value_parser = parse_json)]
pub eviction_policy: Option<serde_json::Value>,
#[clap(long)]
pub min_resident_size_override: Option<u64>,
#[clap(long)]
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>,
}
fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
serde_json::from_str(s)
}
impl TenantConfig {
pub fn deserialize_from_settings(settings: HashMap<&str, &str>) -> Result<Self, anyhow::Error> {
// Here we are using `clap` to parse the settings. This is not ideal, but it's the easiest
// way to simplify the code. To convert the settings into a list of command line arguments,
// we need the program name argv0, then each key as a long-form option with its value following it.
let config = TenantConfig::try_parse_from(
std::iter::once("argv0".to_string()).chain(
settings
.iter()
.flat_map(|(k, v)| [format!("--{k}"), v.to_string()]),
),
)
.map_err(|e| {
anyhow::anyhow!(
"failed to parse: {}",
e.to_string().split('\n').next().unwrap_or_default()
) // only get the first line as other lines in clap errors are not useful
})?;
Ok(config)
}
}
#[serde_as]
@@ -281,6 +326,7 @@ impl TenantConfigRequest {
eviction_policy: None,
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
gc_feedback: None,
};
TenantConfigRequest { tenant_id, config }
}
@@ -871,17 +917,28 @@ mod tests {
err
);
let attach_request = json!({
"config": {
"unknown_field": "unknown_value".to_string(),
},
});
let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
let config = HashMap::from_iter(std::iter::once(("unknown_field", "unknown_value")));
let err = TenantConfig::deserialize_from_settings(config).unwrap_err();
assert!(
err.to_string().contains("unknown field `unknown_field`"),
err.to_string()
.contains("unexpected argument '--unknown_field' found"),
"expect unknown field `unknown_field` error, got: {}",
err
);
let config = HashMap::from_iter(std::iter::once(("checkpoint_distance", "not_a_number")));
let err = TenantConfig::deserialize_from_settings(config).unwrap_err();
assert!(
err.to_string().contains("invalid digit found in string") && err.to_string().contains("checkpoint_distance"),
"expect error to contain both 'invalid digit found in string' and the field 'checkpoint_distance', got: {}",
err
);
}
#[test]
fn test_parse_json_field() {
let config = vec![("eviction_policy", "{\"kind\": \"NoEviction\"}")];
TenantConfig::deserialize_from_settings(config.into_iter().collect()).unwrap();
}
#[test]

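As a standalone illustration of the approach above (replaying the settings map to clap as a synthetic argv), here is a compilable sketch with a reduced field set. `TenantConfigSketch` is ours, not the real `TenantConfig`; it assumes `clap` with the `derive` feature, `serde_json`, and `anyhow` as dependencies:

```rust
use std::collections::HashMap;

use clap::Parser;

#[derive(Parser, Debug, Default)]
#[clap(rename_all = "snake_case")]
struct TenantConfigSketch {
    #[clap(long)]
    checkpoint_distance: Option<u64>,
    // JSON-valued option, handled by a custom value parser as in the diff.
    #[clap(long, value_parser = parse_json)]
    eviction_policy: Option<serde_json::Value>,
}

fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
    serde_json::from_str(s)
}

fn from_settings(settings: HashMap<&str, &str>) -> anyhow::Result<TenantConfigSketch> {
    TenantConfigSketch::try_parse_from(
        // A fake argv0 first, then each key as "--key" followed by its value.
        std::iter::once("argv0".to_string()).chain(
            settings
                .iter()
                .flat_map(|(k, v)| [format!("--{k}"), v.to_string()]),
        ),
    )
    // clap appends usage/help lines to its errors; keep only the first line.
    .map_err(|e| {
        anyhow::anyhow!(
            "failed to parse: {}",
            e.to_string().split('\n').next().unwrap_or_default()
        )
    })
}

fn main() -> anyhow::Result<()> {
    let settings = HashMap::from([
        ("checkpoint_distance", "262144"),
        ("eviction_policy", r#"{"kind": "NoEviction"}"#),
    ]);
    println!("{:?}", from_settings(settings)?);
    Ok(())
}
```

Unknown keys fail with "unexpected argument", and bad values fail with the field name in the message, which is exactly the behavior the new tests assert.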
View File

@@ -108,7 +108,7 @@ pub mod defaults {
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#gc_feedback = false
# [remote_storage]
"###
@@ -828,6 +828,14 @@ impl PageServerConf {
)?);
}
if let Some(gc_feedback) = item.get("gc_feedback") {
t_conf.gc_feedback = Some(
gc_feedback
.as_bool()
.with_context(|| "configure option gc_feedback is not a bool".to_string())?,
);
}
Ok(t_conf)
}
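With this parsing in place, the option can be overridden in `pageserver.toml`. A hypothetical fragment (the `[tenant_config]` section name is assumed; only `gc_feedback` itself comes from this diff):

```toml
[tenant_config]
# Allow GC to request forced image creation (the default above is false).
gc_feedback = true
```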

View File

@@ -60,7 +60,7 @@ use utils::serde_percent::Percent;
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
tenant::{self, storage_layer::PersistentLayer, Timeline},
};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -329,7 +329,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
@@ -434,7 +434,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
layer: Arc<RemoteLayerDesc>,
layer: Arc<dyn PersistentLayer>,
last_activity_ts: SystemTime,
}

View File

@@ -84,7 +84,6 @@ pub mod block_io;
pub mod disk_btree;
pub(crate) mod ephemeral_file;
pub mod layer_map;
pub mod layer_cache;
pub mod metadata;
mod par_fsync;
@@ -3210,6 +3209,7 @@ pub mod harness {
evictions_low_residence_duration_metric_threshold: Some(
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
gc_feedback: Some(tenant_conf.gc_feedback),
}
}
}

View File

@@ -99,6 +99,7 @@ pub struct TenantConf {
// See the corresponding metric's help string.
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
pub gc_feedback: bool,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -175,6 +176,10 @@ pub struct TenantConfOpt {
#[serde(with = "humantime_serde")]
#[serde(default)]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub gc_feedback: Option<bool>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -242,6 +247,7 @@ impl TenantConfOpt {
evictions_low_residence_duration_metric_threshold: self
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
}
}
}
@@ -278,6 +284,7 @@ impl Default for TenantConf {
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
gc_feedback: false,
}
}
}
@@ -372,6 +379,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
))?,
);
}
tenant_conf.gc_feedback = request_data.gc_feedback;
Ok(tenant_conf)
}
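The `unwrap_or(global_conf...)` lines above follow a simple overlay pattern: every per-tenant field is an `Option` that falls back to the global default at read time. A toy sketch with our own types:

```rust
#[derive(Clone, Copy)]
struct Conf {
    gc_feedback: bool,
}

#[derive(Default, Clone, Copy)]
struct ConfOpt {
    gc_feedback: Option<bool>,
}

impl ConfOpt {
    // Per-tenant value wins; otherwise use the global default.
    fn merge(&self, global: Conf) -> Conf {
        Conf {
            gc_feedback: self.gc_feedback.unwrap_or(global.gc_feedback),
        }
    }
}

fn main() {
    let global = Conf { gc_feedback: false };
    assert!(ConfOpt { gc_feedback: Some(true) }.merge(global).gc_feedback);
    assert!(!ConfOpt::default().merge(global).gc_feedback);
}
```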

View File

@@ -1,34 +0,0 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
pub struct LayerCache {
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
}
impl LayerCache {
pub fn new() -> Self {
Self {
layers: Mutex::new(HashMap::new()),
}
}
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
self.layers.lock().unwrap();
guard.get(layer_fname).cloned()
}
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
let guard = self.layers.lock().unwrap();
guard.contains_key(layer_fname)
}
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
let mut guard = self.layers.lock().unwrap();
guard.insert(layer_fname, persistent_layer);
}
}

View File

@@ -61,7 +61,6 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayer;
///
/// LayerMap tracks what layers exist on a timeline.
@@ -139,19 +138,24 @@ where
self.layer_map.remove_historic_noflush(layer)
}
/// Ensure the downloaded layer matches existing layer.
/// Replaces existing layer iff it is the `expected`.
///
/// Returned `Replacement` describes succeeding in checking or the reason why it could not
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
pub fn ensure_consistent(
&self,
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected: &Arc<L>,
new: Arc<dyn PersistentLayer>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.ensure_consistent_noflush(expected, new)
self.layer_map.replace_historic_noflush(expected, new)
}
// We will flush on drop anyway, but this method makes it
@@ -305,16 +309,16 @@ where
}
}
pub(self) fn ensure_consistent_noflush(
&self,
pub(self) fn replace_historic_noflush(
&mut self,
expected: &Arc<L>,
new: Arc<dyn PersistentLayer>,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
let new_l0 = Self::is_l0(&new);
anyhow::ensure!(
key == other,
@@ -341,7 +345,17 @@ where
None
};
Ok(Replacement::Replaced { in_buffered: false })
let replaced = self.historic.replace(&key, new.clone(), |existing| {
Self::compare_arced_layers(existing, expected)
});
if let Replacement::Replaced { .. } = &replaced {
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new;
}
}
Ok(replaced)
}
/// Helper function for BatchedUpdates::drop.

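The `fail::fail_point!` call in `replace_historic` lets tests force the `NotFound` path. A self-contained sketch of how such a fail point behaves (assumes the `fail` crate with its `failpoints` feature enabled, plus `anyhow`):

```rust
fn replace() -> anyhow::Result<&'static str> {
    // Returns early with the closure's value when the point is activated.
    fail::fail_point!("layermap-replace-notfound", |_| Ok("NotFound"));
    Ok("Replaced")
}

fn main() -> anyhow::Result<()> {
    assert_eq!(replace()?, "Replaced");
    // Activate the fail point, as a test would:
    fail::cfg("layermap-replace-notfound", "return").unwrap();
    assert_eq!(replace()?, "NotFound");
    Ok(())
}
```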
View File

@@ -204,6 +204,35 @@ fn test_off_by_one() {
assert_eq!(version.image_coverage.query(5), None);
}
/// White-box regression test, checking for incorrect removal of node at key.end
#[test]
fn test_regression() {
let mut map = HistoricLayerCoverage::<String>::new();
map.insert(
LayerKey {
key: 0..5,
lsn: 0..5,
is_image: false,
},
"Layer 1".to_string(),
);
map.insert(
LayerKey {
key: 0..5,
lsn: 1..2,
is_image: false,
},
"Layer 2".to_string(),
);
// If an insertion operation improperly deletes the endpoint of a previous layer
// (which is more likely to happen with layers that collide on key.end), we will
// end up with an infinite layer, covering the entire keyspace. Here we assert
// that there's no layer at key 100 because we didn't insert any layer there.
let version = map.get_version(100).unwrap();
assert_eq!(version.delta_coverage.query(100), None);
}
/// Cover edge cases where layers begin or end on the same key
#[test]
fn test_key_collision() {

View File

@@ -1,8 +1,8 @@
use std::ops::Range;
// TODO the `im` crate has 20x more downloads and also has
// persistent/immutable BTree. It also runs a bit faster but
// results are not the same on some tests.
// NOTE the `im` crate has 20x more downloads and also has
// persistent/immutable BTree. But it's bugged so rpds is a
// better choice https://github.com/neondatabase/neon/issues/3395
use rpds::RedBlackTreeMapSync;
/// Data structure that can efficiently:
@@ -10,19 +10,22 @@ use rpds::RedBlackTreeMapSync;
/// - iterate the latest layers in a key range
/// - insert layers in non-decreasing lsn.start order
///
/// The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
/// For a detailed explanation and justification of this approach, see:
/// https://neon.tech/blog/persistent-structures-in-neons-wal-indexing
///
/// NOTE The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value).
///
/// We use an immutable/persistent tree so that we can keep historic
/// versions of this coverage without cloning the whole thing and
/// incurring quadratic memory cost. See HistoricLayerCoverage.
/// NOTE We use an immutable/persistent tree so that we can keep historic
/// versions of this coverage without cloning the whole thing and
/// incurring quadratic memory cost. See HistoricLayerCoverage.
///
/// We use the Sync version of the map because we want Self to
/// be Sync. Using nonsync might be faster, if we can work with
/// that.
/// NOTE We use the Sync version of the map because we want Self to
/// be Sync. Using nonsync might be faster, if we can work with
/// that.
nodes: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,
}
@@ -41,6 +44,13 @@ impl<Value: Clone> LayerCoverage<Value> {
/// Helper function to subdivide the key range without changing any values
///
/// This operation has no semantic effect by itself. It only helps us pin in
/// place the part of the coverage we don't want to change when inserting.
///
/// As an analogy, think of a polygon. If you add a vertex along one of the
/// segments, the polygon is still the same, but it behaves differently when
/// we move or delete one of the other points.
///
/// Complexity: O(log N)
fn add_node(&mut self, key: i128) {
let value = match self.nodes.range(..=key).last() {
@@ -74,7 +84,7 @@ impl<Value: Clone> LayerCoverage<Value> {
let mut to_update = Vec::new();
let mut to_remove = Vec::new();
let mut prev_covered = false;
for (k, node) in self.nodes.range(key.clone()) {
for (k, node) in self.nodes.range(key) {
let needs_cover = match node {
None => true,
Some((h, _)) => h < &lsn.end,
@@ -87,9 +97,8 @@ impl<Value: Clone> LayerCoverage<Value> {
}
prev_covered = needs_cover;
}
if !prev_covered {
to_remove.push(key.end);
}
// TODO check if the nodes inserted at key.start and key.end are safe
// to remove. It's fine to keep them but they could be redundant.
for k in to_update {
self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
}
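To make the NOTE above concrete, here is a tiny example of the persistent-map behavior that `HistoricLayerCoverage` relies on (assumes `rpds` as a dependency):

```rust
use rpds::RedBlackTreeMapSync;

fn main() {
    let v0: RedBlackTreeMapSync<i128, &str> = RedBlackTreeMapSync::new_sync();
    let v1 = v0.insert(0, "layer 1"); // O(log N); v0 is untouched
    let v2 = v1.insert(5, "layer 2");

    // Every version stays readable: keeping the whole history costs
    // O(log N) per change instead of a full clone per version.
    assert!(v0.get(&0).is_none());
    assert_eq!(v1.get(&0), Some(&"layer 1"));
    assert_eq!(v2.get(&5), Some(&"layer 2"));
}
```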

View File

@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
pub use image_layer::{ImageLayer, ImageLayerWriter};
pub use inmemory_layer::InMemoryLayer;
pub use remote_layer::RemoteLayerDesc;
pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
@@ -431,6 +431,14 @@ pub trait PersistentLayer: Layer {
/// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>;
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
None
}
fn is_remote_layer(&self) -> bool {
false
}
/// Returns None if the layer file size is not known.
///
/// Should not change over the lifetime of the layer object because
@@ -442,6 +450,16 @@ pub trait PersistentLayer: Layer {
fn access_stats(&self) -> &LayerAccessStats;
}
pub fn downcast_remote_layer(
layer: &Arc<dyn PersistentLayer>,
) -> Option<std::sync::Arc<RemoteLayer>> {
if layer.is_remote_layer() {
Arc::clone(layer).downcast_remote_layer()
} else {
None
}
}
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a

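The `is_remote_layer` / `downcast_remote_layer` pair above is a hand-rolled downcast for trait objects, avoiding `std::any`. A self-contained sketch of the pattern with toy types (not the pageserver ones):

```rust
use std::sync::Arc;

trait Layer {
    // Overridden only by the remote implementation.
    fn downcast_remote(self: Arc<Self>) -> Option<Arc<Remote>> {
        None
    }
    fn is_remote(&self) -> bool {
        false
    }
}

struct Remote;
struct Local;

impl Layer for Remote {
    fn downcast_remote(self: Arc<Self>) -> Option<Arc<Remote>> {
        Some(self)
    }
    fn is_remote(&self) -> bool {
        true
    }
}

impl Layer for Local {}

// Mirror of the free function above: cheap tag check first, then clone the
// Arc because the trait method consumes its receiver.
fn downcast(layer: &Arc<dyn Layer>) -> Option<Arc<Remote>> {
    if layer.is_remote() {
        Arc::clone(layer).downcast_remote()
    } else {
        None
    }
}

fn main() {
    let remote: Arc<dyn Layer> = Arc::new(Remote);
    let local: Arc<dyn Layer> = Arc::new(Local);
    assert!(downcast(&remote).is_some());
    assert!(downcast(&local).is_none());
}
```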
View File

@@ -30,7 +30,6 @@ use crate::repository::{Key, Value, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -58,7 +57,7 @@ use utils::{
use super::{
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
LayerKeyIter, PathOrConf, RemoteLayerDesc,
LayerKeyIter, PathOrConf,
};
///
@@ -664,17 +663,6 @@ impl DeltaLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_delta(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new delta layer.

View File

@@ -26,7 +26,6 @@ use crate::repository::{Key, KEY_SIZE};
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
};
@@ -54,7 +53,7 @@ use utils::{
};
use super::filename::{ImageFileName, LayerFileName};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
///
/// Header stored in the beginning of the file
@@ -465,17 +464,6 @@ impl ImageLayer {
&self.layer_name(),
)
}
/// Create layer descriptor for this image layer
pub fn layer_desc(&self) -> RemoteLayerDesc {
RemoteLayerDesc::new_img(
self.tenant_id,
self.timeline_id,
&self.layer_name(),
&LayerFileMetadata::new(self.file_size()),
LayerAccessStats::empty_will_record_residence_event_later(),
)
}
}
/// A builder object for constructing a new image layer.

View File

@@ -1,4 +1,4 @@
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
//! in remote storage.
//!
use crate::config::PageServerConf;
@@ -25,19 +25,19 @@ use super::{
LayerResidenceStatus, PersistentLayer,
};
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
/// [`crate::storage_layer::DeltaLayer`].
///
/// RemoteLayerDesc might be downloaded on-demand during operations which are
/// RemoteLayer might be downloaded on-demand during operations which are
/// allowed to download remote layers, during which it gets replaced with a
/// concrete `DeltaLayer` or `ImageLayer`.
///
/// See: [`crate::context::RequestContext`] for authorization to download
pub struct RemoteLayerDesc {
pub(crate) tenantid: TenantId,
pub(crate) timelineid: TimelineId,
pub(crate) key_range: Range<Key>,
pub(crate) lsn_range: Range<Lsn>,
pub struct RemoteLayer {
tenantid: TenantId,
timelineid: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub file_name: LayerFileName,
@@ -54,7 +54,7 @@ pub struct RemoteLayerDesc {
/// Has `LayerMap::replace` failed for this (true) or not (false).
///
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
/// unprocessable, because a LayerMap::replace failed.
///
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
@@ -63,9 +63,9 @@ pub struct RemoteLayerDesc {
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
}
impl std::fmt::Debug for RemoteLayerDesc {
impl std::fmt::Debug for RemoteLayer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RemoteLayerDesc")
f.debug_struct("RemoteLayer")
.field("file_name", &self.file_name)
.field("layer_metadata", &self.layer_metadata)
.field("is_incremental", &self.is_incremental)
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayerDesc {
}
}
impl Layer for RemoteLayerDesc {
impl Layer for RemoteLayer {
fn get_key_range(&self) -> Range<Key> {
self.key_range.clone()
}
@@ -119,7 +119,7 @@ impl Layer for RemoteLayerDesc {
}
}
impl PersistentLayer for RemoteLayerDesc {
impl PersistentLayer for RemoteLayer {
fn get_tenant_id(&self) -> TenantId {
self.tenantid
}
@@ -160,6 +160,14 @@ impl PersistentLayer for RemoteLayerDesc {
bail!("remote layer has no layer file");
}
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
Some(self)
}
fn is_remote_layer(&self) -> bool {
true
}
fn file_size(&self) -> u64 {
self.layer_metadata.file_size()
}
@@ -193,15 +201,15 @@ impl PersistentLayer for RemoteLayerDesc {
}
}
impl RemoteLayerDesc {
impl RemoteLayer {
pub fn new_img(
tenantid: TenantId,
timelineid: TimelineId,
fname: &ImageFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayerDesc {
RemoteLayerDesc {
) -> RemoteLayer {
RemoteLayer {
tenantid,
timelineid,
key_range: fname.key_range.clone(),
@@ -222,8 +230,8 @@ impl RemoteLayerDesc {
fname: &DeltaFileName,
layer_metadata: &LayerFileMetadata,
access_stats: LayerAccessStats,
) -> RemoteLayerDesc {
RemoteLayerDesc {
) -> RemoteLayer {
RemoteLayer {
tenantid,
timelineid,
key_range: fname.key_range.clone(),

View File

@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
LayerAccessStats, LayerFileName, RemoteLayerDesc,
LayerAccessStats, LayerFileName, RemoteLayer,
};
use crate::tenant::{
ephemeral_file::is_ephemeral_file,
@@ -77,7 +77,6 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::layer_cache::LayerCache;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
@@ -120,7 +119,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -242,8 +241,6 @@ pub struct Timeline {
pub delete_lock: tokio::sync::Mutex<bool>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
layer_cache: Arc<LayerCache>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -1010,22 +1007,20 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
if self.layer_cache.contains(&remote_layer_desc.filename()) {
return Ok(Some(false));
}
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
}
self.download_remote_layer(remote_layer_desc).await?;
self.download_remote_layer(remote_layer).await?;
Ok(Some(true))
}
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1052,7 +1047,7 @@ impl Timeline {
pub async fn evict_layers(
&self,
_: &GenericRemoteStorage,
layers_to_evict: &[Arc<RemoteLayerDesc>],
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
let remote_client = self.remote_client.clone().expect(
@@ -1087,7 +1082,7 @@ impl Timeline {
async fn evict_layer_batch(
&self,
remote_client: &Arc<RemoteTimelineClient>,
layers_to_evict: &[Arc<RemoteLayerDesc>],
layers_to_evict: &[Arc<dyn PersistentLayer>],
cancel: CancellationToken,
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
// ensure that the layers have finished uploading
@@ -1136,12 +1131,12 @@ impl Timeline {
fn evict_layer_batch_impl(
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<RemoteLayerDesc>,
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<bool> {
use super::layer_map::Replacement;
if !self.layer_cache.contains(&local_layer.filename()) {
if local_layer.is_remote_layer() {
// TODO(issue #3851): consider returning an err here instead of false,
// which is the same outcome as in the match later
return Ok(false);
@@ -1168,7 +1163,7 @@ impl Timeline {
let layer_metadata = LayerFileMetadata::new(layer_file_size);
let new_remote_layer = Arc::new(match local_layer.filename() {
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
LayerFileName::Image(image_name) => RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
&image_name,
@@ -1177,7 +1172,7 @@ impl Timeline {
.access_stats()
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
),
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
&delta_name,
@@ -1188,7 +1183,6 @@ impl Timeline {
),
});
/*
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
Replacement::Replaced { .. } => {
if let Err(e) = local_layer.delete_resident_layer_file() {
@@ -1239,10 +1233,8 @@ impl Timeline {
false
}
};
*/
// Ok(replaced)
Ok(true)
Ok(replaced)
}
}
@@ -1299,6 +1291,13 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
fn get_gc_feedback(&self) -> bool {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.gc_feedback
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
}
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
@@ -1427,8 +1426,6 @@ impl Timeline {
EvictionTaskTimelineState::default(),
),
delete_lock: tokio::sync::Mutex::new(false),
layer_cache: Arc::new(LayerCache::new()),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result
@@ -1575,12 +1572,9 @@ impl Timeline {
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
);
let remote_desc = layer.layer_desc();
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1612,9 +1606,7 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
let remote_desc = layer.layer_desc();
self.layer_cache.insert(layer.filename(), Arc::new(layer));
updates.insert_historic(Arc::new(remote_desc));
updates.insert_historic(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1659,9 +1651,9 @@ impl Timeline {
async fn create_remote_layers(
&self,
index_part: &IndexPart,
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
up_to_date_disk_consistent_lsn: Lsn,
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
// Are we missing some files that are present in remote storage?
// Create RemoteLayer instances for them.
let mut local_only_layers = local_layers;
@@ -1740,7 +1732,7 @@ impl Timeline {
continue;
}
let remote_layer = RemoteLayerDesc::new_img(
let remote_layer = RemoteLayer::new_img(
self.tenant_id,
self.timeline_id,
imgfilename,
@@ -1768,7 +1760,7 @@ impl Timeline {
);
continue;
}
let remote_layer = RemoteLayerDesc::new_delta(
let remote_layer = RemoteLayer::new_delta(
self.tenant_id,
self.timeline_id,
deltafilename,
@@ -2174,7 +2166,7 @@ impl Timeline {
}
}
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
@@ -2191,10 +2183,10 @@ impl Timeline {
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<RemoteLayerDesc>,
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
) -> anyhow::Result<()> {
if self.layer_cache.contains(&layer.filename()) {
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size();
self.metrics
@@ -2443,7 +2435,13 @@ impl Timeline {
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
// If it's a remote layer, download it and retry.
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
if let Some(remote_layer) =
super::storage_layer::downcast_remote_layer(&layer)
{
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
remote_layer // download happens outside the scope of `layers` guard object
} else {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
@@ -2466,10 +2464,6 @@ impl Timeline {
}),
));
continue 'outer;
} else {
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
// we downloaded / would need to download this layer.
layer // download happens outside the scope of `layers` guard object
}
} else if timeline.ancestor_timeline.is_some() {
// Nothing on this timeline. Traverse to parent
@@ -2909,8 +2903,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(Arc::new(l.layer_desc()));
self.layer_cache.insert(l.filename(), l);
batch_updates.insert_historic(l);
batch_updates.flush();
// update the timeline's physical size
@@ -3138,6 +3131,7 @@ impl Timeline {
let mut layers = self.layers.write().unwrap();
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
let path = l.filename();
let metadata = timeline_path
@@ -3156,9 +3150,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(Arc::new(l.layer_desc()));
let x: Arc<dyn PersistentLayer> = l;
self.layer_cache.insert(x.filename(), x)
updates.insert_historic(l);
}
updates.flush();
drop(layers);
@@ -3171,7 +3163,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
}
/// Top-level failure to compact.
@@ -3181,7 +3173,7 @@ enum CompactionError {
///
/// This should not happen repeatedly, but will be retried once by top-level
/// `Timeline::compact`.
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
DownloadRequired(Vec<Arc<RemoteLayer>>),
/// Compaction cannot be done right now; page reconstruction and so on.
Other(anyhow::Error),
}
@@ -3253,9 +3245,13 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.filter(|l| !self.layer_cache.contains(&l.filename()))
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.cloned()
.map(|l| {
l.clone()
.downcast_remote_layer()
.expect("just checked it is remote layer")
})
.collect::<Vec<_>>();
if !remotes.is_empty() {
@@ -3595,15 +3591,13 @@ impl Timeline {
.add(metadata.len());
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
let remote_desc = l.layer_desc();
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
x.access_stats().record_residence_event(
&updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(Arc::new(remote_desc));
self.layer_cache.insert(x.filename(), x)
updates.insert_historic(x);
}
// Now that we have reshuffled the data to a set of new delta layers, we can
@@ -3910,7 +3904,7 @@ impl Timeline {
// delta layers. Image layers can form "stairs" preventing old images from being deleted.
// But image layers are in any case less sparse than delta layers. Also we need some
// protection from replacing recent image layers with new ones after each GC iteration.
if l.is_incremental() && !LayerMap::is_l0(&*l) {
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) {
wanted_image_layers.add_range(l.get_key_range());
}
result.layers_not_updated += 1;
@@ -4076,7 +4070,7 @@ impl Timeline {
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
pub async fn download_remote_layer(
&self,
remote_layer: Arc<RemoteLayerDesc>,
remote_layer: Arc<RemoteLayer>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -4133,12 +4127,10 @@ impl Timeline {
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut updates = layers.batch_update();
let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
/*
let failure = match updates.replace_historic(&l, new_layer) {
Ok(Replacement::Replaced { .. }) => false,
Ok(Replacement::NotFound) => {
@@ -4193,9 +4185,8 @@ impl Timeline {
remote_layer
.download_replacement_failure
.store(true, Relaxed);
} */
}
}
updates.flush();
drop(layers);
@@ -4208,10 +4199,7 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
}
// Don't treat it as an error if the task that triggered the download
@@ -4225,8 +4213,7 @@ impl Timeline {
drop(permit);
Ok(())
}
.in_current_span(),
}.in_current_span(),
);
receiver.await.context("download task cancelled")?
@@ -4299,7 +4286,7 @@ impl Timeline {
let layers = self.layers.read().unwrap();
layers
.iter_historic_layers()
.filter(|l| !self.layer_cache.contains(&l.filename()))
.filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl))
}
@@ -4374,7 +4361,7 @@ pub struct DiskUsageEvictionInfo {
}
pub struct LocalLayerInfoForDiskUsageEviction {
pub layer: Arc<RemoteLayerDesc>,
pub layer: Arc<dyn PersistentLayer>,
pub last_activity_ts: SystemTime,
}
@@ -4408,7 +4395,7 @@ impl Timeline {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
if !self.layer_cache.contains(&l.filename()) {
if l.is_remote_layer() {
continue;
}

View File

@@ -29,7 +29,7 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
storage_layer::{PersistentLayer, RemoteLayerDesc},
storage_layer::PersistentLayer,
LogicalSizeCalculationCause, Tenant,
},
};
@@ -184,11 +184,11 @@ impl Timeline {
// NB: all the checks can be invalidated as soon as we release the layer map lock.
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<RemoteLayerDesc>> = {
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if !self.layer_cache.contains(&hist_layer.filename()) {
if hist_layer.is_remote_layer() {
continue;
}

View File

@@ -158,6 +158,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
"threshold": "23h",
},
"evictions_low_residence_duration_metric_threshold": "2days",
"gc_feedback": True,
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"image_creation_threshold": 7,