mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-26 06:40:38 +00:00
Compare commits
11 Commits
skyzh/refa
...
skyzh/cli-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5da45b182d | ||
|
|
b23e085efb | ||
|
|
e4c2713dc6 | ||
|
|
d70fcbd007 | ||
|
|
36fee50f4d | ||
|
|
330083638f | ||
|
|
952d6e43a2 | ||
|
|
b6447462dc | ||
|
|
b68c22be81 | ||
|
|
5e92758d20 | ||
|
|
9073db2622 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -2617,6 +2617,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"byteorder",
|
||||
"bytes",
|
||||
"clap 4.3.0",
|
||||
"const_format",
|
||||
"enum-map",
|
||||
"postgres_ffi",
|
||||
|
||||
@@ -33,5 +33,7 @@ pub fn init_tracing_and_logging(default_log_level: &str) -> anyhow::Result<()> {
|
||||
.init();
|
||||
tracing::info!("logging and tracing started");
|
||||
|
||||
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -2,12 +2,11 @@ use std::borrow::Cow;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::io::{BufReader, Write};
|
||||
use std::num::NonZeroU64;
|
||||
use std::path::PathBuf;
|
||||
use std::process::{Child, Command};
|
||||
use std::{io, result};
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use anyhow::Context;
|
||||
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
|
||||
use postgres_backend::AuthType;
|
||||
use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
@@ -313,63 +312,8 @@ impl PageServerNode {
|
||||
new_tenant_id: Option<TenantId>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<TenantId> {
|
||||
let mut settings = settings.clone();
|
||||
|
||||
let config = models::TenantConfig {
|
||||
checkpoint_distance: settings
|
||||
.remove("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()?,
|
||||
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
|
||||
compaction_target_size: settings
|
||||
.remove("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()?,
|
||||
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
|
||||
compaction_threshold: settings
|
||||
.remove("compaction_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()?,
|
||||
gc_horizon: settings
|
||||
.remove("gc_horizon")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()?,
|
||||
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
|
||||
image_creation_threshold: settings
|
||||
.remove("image_creation_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()?,
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
lagging_wal_timeout: settings
|
||||
.remove("lagging_wal_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
max_lsn_wal_lag: settings
|
||||
.remove("max_lsn_wal_lag")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'eviction_policy' json")?,
|
||||
min_resident_size_override: settings
|
||||
.remove("min_resident_size_override")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
};
|
||||
let settings = settings.clone();
|
||||
let config = models::TenantConfig::deserialize_from_settings(settings)?;
|
||||
|
||||
// If tenant ID was not specified, generate one
|
||||
let new_tenant_id = new_tenant_id.unwrap_or(TenantId::generate());
|
||||
@@ -378,9 +322,6 @@ impl PageServerNode {
|
||||
new_tenant_id,
|
||||
config,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
}
|
||||
self.http_request(Method::POST, format!("{}/tenant", self.http_base_url))?
|
||||
.json(&request)
|
||||
.send()?
|
||||
@@ -400,76 +341,9 @@ impl PageServerNode {
|
||||
pub fn tenant_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
mut settings: HashMap<&str, &str>,
|
||||
settings: HashMap<&str, &str>,
|
||||
) -> anyhow::Result<()> {
|
||||
let config = {
|
||||
// Braces to make the diff easier to read
|
||||
models::TenantConfig {
|
||||
checkpoint_distance: settings
|
||||
.remove("checkpoint_distance")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'checkpoint_distance' as an integer")?,
|
||||
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
|
||||
compaction_target_size: settings
|
||||
.remove("compaction_target_size")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_target_size' as an integer")?,
|
||||
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
|
||||
compaction_threshold: settings
|
||||
.remove("compaction_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'compaction_threshold' as an integer")?,
|
||||
gc_horizon: settings
|
||||
.remove("gc_horizon")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_horizon' as an integer")?,
|
||||
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
|
||||
image_creation_threshold: settings
|
||||
.remove("image_creation_threshold")
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
|
||||
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
|
||||
walreceiver_connect_timeout: settings
|
||||
.remove("walreceiver_connect_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
lagging_wal_timeout: settings
|
||||
.remove("lagging_wal_timeout")
|
||||
.map(|x| x.to_string()),
|
||||
max_lsn_wal_lag: settings
|
||||
.remove("max_lsn_wal_lag")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'max_lsn_wal_lag' as non zero integer")?,
|
||||
trace_read_requests: settings
|
||||
.remove("trace_read_requests")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'trace_read_requests' as bool")?,
|
||||
eviction_policy: settings
|
||||
.remove("eviction_policy")
|
||||
.map(serde_json::from_str)
|
||||
.transpose()
|
||||
.context("Failed to parse 'eviction_policy' json")?,
|
||||
min_resident_size_override: settings
|
||||
.remove("min_resident_size_override")
|
||||
.map(|x| x.parse::<u64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'min_resident_size_override' as an integer")?,
|
||||
evictions_low_residence_duration_metric_threshold: settings
|
||||
.remove("evictions_low_residence_duration_metric_threshold")
|
||||
.map(|x| x.to_string()),
|
||||
}
|
||||
};
|
||||
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
}
|
||||
|
||||
let config = models::TenantConfig::deserialize_from_settings(settings)?;
|
||||
self.http_request(Method::PUT, format!("{}/tenant/config", self.http_base_url))?
|
||||
.json(&models::TenantConfigRequest { tenant_id, config })
|
||||
.send()?
|
||||
|
||||
@@ -5,6 +5,7 @@ edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
clap.workspace = true
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::{
|
||||
};
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use clap::Parser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use strum_macros;
|
||||
@@ -201,28 +202,72 @@ impl std::ops::Deref for TenantCreateRequest {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
#[derive(Serialize, Deserialize, Debug, Default, clap::Parser)]
|
||||
#[clap(rename_all = "snake_case")]
|
||||
pub struct TenantConfig {
|
||||
#[clap(long)]
|
||||
pub checkpoint_distance: Option<u64>,
|
||||
#[clap(long)]
|
||||
pub checkpoint_timeout: Option<String>,
|
||||
#[clap(long)]
|
||||
pub compaction_target_size: Option<u64>,
|
||||
#[clap(long)]
|
||||
pub compaction_period: Option<String>,
|
||||
#[clap(long)]
|
||||
pub compaction_threshold: Option<usize>,
|
||||
#[clap(long)]
|
||||
pub gc_horizon: Option<u64>,
|
||||
#[clap(long)]
|
||||
pub gc_period: Option<String>,
|
||||
#[clap(long)]
|
||||
pub image_creation_threshold: Option<usize>,
|
||||
#[clap(long)]
|
||||
pub pitr_interval: Option<String>,
|
||||
#[clap(long)]
|
||||
pub walreceiver_connect_timeout: Option<String>,
|
||||
#[clap(long)]
|
||||
pub lagging_wal_timeout: Option<String>,
|
||||
#[clap(long)]
|
||||
pub max_lsn_wal_lag: Option<NonZeroU64>,
|
||||
#[clap(long)]
|
||||
pub trace_read_requests: Option<bool>,
|
||||
// We defer the parsing of the eviction_policy field to the request handler.
|
||||
// Otherwise we'd have to move the types for eviction policy into this package.
|
||||
// We might do that once the eviction feature has stabilizied.
|
||||
// For now, this field is not even documented in the openapi_spec.yml.
|
||||
#[clap(long, value_parser = parse_json)]
|
||||
pub eviction_policy: Option<serde_json::Value>,
|
||||
#[clap(long)]
|
||||
pub min_resident_size_override: Option<u64>,
|
||||
#[clap(long)]
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
pub gc_feedback: Option<bool>,
|
||||
}
|
||||
|
||||
fn parse_json(s: &str) -> Result<serde_json::Value, serde_json::Error> {
|
||||
serde_json::from_str(s)
|
||||
}
|
||||
|
||||
impl TenantConfig {
|
||||
pub fn deserialize_from_settings(settings: HashMap<&str, &str>) -> Result<Self, anyhow::Error> {
|
||||
// Here we are using `clap` to parse the settings. This is not ideal, but it's the easiest
|
||||
// way to simplify th code. To convert settings into a list of command line arguments, we
|
||||
// need the program name argv0, each key into a long-form option, and each value proceeding it.
|
||||
let config = TenantConfig::try_parse_from(
|
||||
std::iter::once("argv0".to_string()).chain(
|
||||
settings
|
||||
.iter()
|
||||
.flat_map(|(k, v)| [format!("--{k}"), v.to_string()]),
|
||||
),
|
||||
)
|
||||
.map_err(|e| {
|
||||
anyhow::anyhow!(
|
||||
"failed to parse: {}",
|
||||
e.to_string().split('\n').next().unwrap_or_default()
|
||||
) // only get the first line as other lines in clap errors are not useful
|
||||
})?;
|
||||
Ok(config)
|
||||
}
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
@@ -281,6 +326,7 @@ impl TenantConfigRequest {
|
||||
eviction_policy: None,
|
||||
min_resident_size_override: None,
|
||||
evictions_low_residence_duration_metric_threshold: None,
|
||||
gc_feedback: None,
|
||||
};
|
||||
TenantConfigRequest { tenant_id, config }
|
||||
}
|
||||
@@ -871,17 +917,28 @@ mod tests {
|
||||
err
|
||||
);
|
||||
|
||||
let attach_request = json!({
|
||||
"config": {
|
||||
"unknown_field": "unknown_value".to_string(),
|
||||
},
|
||||
});
|
||||
let err = serde_json::from_value::<TenantAttachRequest>(attach_request).unwrap_err();
|
||||
let config = HashMap::from_iter(std::iter::once(("unknown_field", "unknown_value")));
|
||||
let err = TenantConfig::deserialize_from_settings(config).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("unknown field `unknown_field`"),
|
||||
err.to_string()
|
||||
.contains("unexpected argument '--unknown_field' found"),
|
||||
"expect unknown field `unknown_field` error, got: {}",
|
||||
err
|
||||
);
|
||||
|
||||
let config = HashMap::from_iter(std::iter::once(("checkpoint_distance", "not_a_number")));
|
||||
let err = TenantConfig::deserialize_from_settings(config).unwrap_err();
|
||||
assert!(
|
||||
err.to_string().contains("invalid digit found in string") && err.to_string().contains("checkpoint_distance"),
|
||||
"expect error to contain both 'invalid digit found in string' and the field 'checkpoint_distance', got: {}",
|
||||
err
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_json_field() {
|
||||
let config = vec![("eviction_policy", "{\"kind\": \"NoEviction\"}")];
|
||||
TenantConfig::deserialize_from_settings(config.into_iter().collect()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -108,7 +108,7 @@ pub mod defaults {
|
||||
|
||||
#min_resident_size_override = .. # in bytes
|
||||
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
|
||||
|
||||
#gc_feedback = false
|
||||
# [remote_storage]
|
||||
|
||||
"###
|
||||
@@ -828,6 +828,14 @@ impl PageServerConf {
|
||||
)?);
|
||||
}
|
||||
|
||||
if let Some(gc_feedback) = item.get("gc_feedback") {
|
||||
t_conf.gc_feedback = Some(
|
||||
gc_feedback
|
||||
.as_bool()
|
||||
.with_context(|| "configure option gc_feedback is not a bool".to_string())?,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(t_conf)
|
||||
}
|
||||
|
||||
|
||||
@@ -60,7 +60,7 @@ use utils::serde_percent::Percent;
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{self, storage_layer::{PersistentLayer, RemoteLayerDesc}, Timeline},
|
||||
tenant::{self, storage_layer::PersistentLayer, Timeline},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -329,7 +329,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
// If we get far enough in the list that we start to evict layers that are below
|
||||
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
|
||||
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
|
||||
let mut batched: HashMap<_, Vec<Arc<RemoteLayerDesc>>> = HashMap::new();
|
||||
let mut batched: HashMap<_, Vec<Arc<dyn PersistentLayer>>> = HashMap::new();
|
||||
let mut warned = None;
|
||||
let mut usage_planned = usage_pre;
|
||||
for (i, (partition, candidate)) in candidates.into_iter().enumerate() {
|
||||
@@ -434,7 +434,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
|
||||
#[derive(Clone)]
|
||||
struct EvictionCandidate {
|
||||
timeline: Arc<Timeline>,
|
||||
layer: Arc<RemoteLayerDesc>,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
|
||||
@@ -84,7 +84,6 @@ pub mod block_io;
|
||||
pub mod disk_btree;
|
||||
pub(crate) mod ephemeral_file;
|
||||
pub mod layer_map;
|
||||
pub mod layer_cache;
|
||||
|
||||
pub mod metadata;
|
||||
mod par_fsync;
|
||||
@@ -3210,6 +3209,7 @@ pub mod harness {
|
||||
evictions_low_residence_duration_metric_threshold: Some(
|
||||
tenant_conf.evictions_low_residence_duration_metric_threshold,
|
||||
),
|
||||
gc_feedback: Some(tenant_conf.gc_feedback),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,6 +99,7 @@ pub struct TenantConf {
|
||||
// See the corresponding metric's help string.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub evictions_low_residence_duration_metric_threshold: Duration,
|
||||
pub gc_feedback: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -175,6 +176,10 @@ pub struct TenantConfOpt {
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub gc_feedback: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -242,6 +247,7 @@ impl TenantConfOpt {
|
||||
evictions_low_residence_duration_metric_threshold: self
|
||||
.evictions_low_residence_duration_metric_threshold
|
||||
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
|
||||
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -278,6 +284,7 @@ impl Default for TenantConf {
|
||||
DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD,
|
||||
)
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
gc_feedback: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -372,6 +379,7 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
|
||||
))?,
|
||||
);
|
||||
}
|
||||
tenant_conf.gc_feedback = request_data.gc_feedback;
|
||||
|
||||
Ok(tenant_conf)
|
||||
}
|
||||
|
||||
@@ -1,34 +0,0 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use super::storage_layer::{LayerFileName, PersistentLayer, RemoteLayerDesc};
|
||||
|
||||
pub struct LayerCache {
|
||||
layers: Mutex<HashMap<LayerFileName, Arc<dyn PersistentLayer>>>,
|
||||
}
|
||||
|
||||
impl LayerCache {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
layers: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get(&self, layer_fname: &LayerFileName) -> Option<Arc<dyn PersistentLayer>> {
|
||||
let guard: std::sync::MutexGuard<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> =
|
||||
self.layers.lock().unwrap();
|
||||
guard.get(layer_fname).cloned()
|
||||
}
|
||||
|
||||
pub fn contains(&self, layer_fname: &LayerFileName) -> bool {
|
||||
let guard = self.layers.lock().unwrap();
|
||||
guard.contains_key(layer_fname)
|
||||
}
|
||||
|
||||
pub fn insert(&self, layer_fname: LayerFileName, persistent_layer: Arc<dyn PersistentLayer>) {
|
||||
let mut guard = self.layers.lock().unwrap();
|
||||
guard.insert(layer_fname, persistent_layer);
|
||||
}
|
||||
}
|
||||
@@ -61,7 +61,6 @@ use historic_layer_coverage::BufferedHistoricLayerCoverage;
|
||||
pub use historic_layer_coverage::Replacement;
|
||||
|
||||
use super::storage_layer::range_eq;
|
||||
use super::storage_layer::PersistentLayer;
|
||||
|
||||
///
|
||||
/// LayerMap tracks what layers exist on a timeline.
|
||||
@@ -139,19 +138,24 @@ where
|
||||
self.layer_map.remove_historic_noflush(layer)
|
||||
}
|
||||
|
||||
/// Ensure the downloaded layer matches existing layer.
|
||||
/// Replaces existing layer iff it is the `expected`.
|
||||
///
|
||||
/// Returned `Replacement` describes succeeding in checking or the reason why it could not
|
||||
/// If the expected layer has been removed it will not be inserted by this function.
|
||||
///
|
||||
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
|
||||
/// be done.
|
||||
pub fn ensure_consistent(
|
||||
&self,
|
||||
///
|
||||
/// TODO replacement can be done without buffering and rebuilding layer map updates.
|
||||
/// One way to do that is to add a layer of indirection for returned values, so
|
||||
/// that we can replace values only by updating a hashmap.
|
||||
pub fn replace_historic(
|
||||
&mut self,
|
||||
expected: &Arc<L>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
new: Arc<L>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
|
||||
|
||||
self.layer_map
|
||||
.ensure_consistent_noflush(expected, new)
|
||||
self.layer_map.replace_historic_noflush(expected, new)
|
||||
}
|
||||
|
||||
// We will flush on drop anyway, but this method makes it
|
||||
@@ -305,16 +309,16 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
pub(self) fn ensure_consistent_noflush(
|
||||
&self,
|
||||
pub(self) fn replace_historic_noflush(
|
||||
&mut self,
|
||||
expected: &Arc<L>,
|
||||
new: Arc<dyn PersistentLayer>,
|
||||
new: Arc<L>,
|
||||
) -> anyhow::Result<Replacement<Arc<L>>> {
|
||||
let key = historic_layer_coverage::LayerKey::from(&**expected);
|
||||
let other = historic_layer_coverage::LayerKey::from(&*new);
|
||||
|
||||
let expected_l0 = Self::is_l0(expected);
|
||||
let new_l0 = LayerMap::<dyn PersistentLayer>::is_l0(&*new);
|
||||
let new_l0 = Self::is_l0(&new);
|
||||
|
||||
anyhow::ensure!(
|
||||
key == other,
|
||||
@@ -341,7 +345,17 @@ where
|
||||
None
|
||||
};
|
||||
|
||||
Ok(Replacement::Replaced { in_buffered: false })
|
||||
let replaced = self.historic.replace(&key, new.clone(), |existing| {
|
||||
Self::compare_arced_layers(existing, expected)
|
||||
});
|
||||
|
||||
if let Replacement::Replaced { .. } = &replaced {
|
||||
if let Some(index) = l0_index {
|
||||
self.l0_delta_layers[index] = new;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(replaced)
|
||||
}
|
||||
|
||||
/// Helper function for BatchedUpdates::drop.
|
||||
|
||||
@@ -204,6 +204,35 @@ fn test_off_by_one() {
|
||||
assert_eq!(version.image_coverage.query(5), None);
|
||||
}
|
||||
|
||||
/// White-box regression test, checking for incorrect removal of node at key.end
|
||||
#[test]
|
||||
fn test_regression() {
|
||||
let mut map = HistoricLayerCoverage::<String>::new();
|
||||
map.insert(
|
||||
LayerKey {
|
||||
key: 0..5,
|
||||
lsn: 0..5,
|
||||
is_image: false,
|
||||
},
|
||||
"Layer 1".to_string(),
|
||||
);
|
||||
map.insert(
|
||||
LayerKey {
|
||||
key: 0..5,
|
||||
lsn: 1..2,
|
||||
is_image: false,
|
||||
},
|
||||
"Layer 2".to_string(),
|
||||
);
|
||||
|
||||
// If an insertion operation improperly deletes the endpoint of a previous layer
|
||||
// (which is more likely to happen with layers that collide on key.end), we will
|
||||
// end up with an infinite layer, covering the entire keyspace. Here we assert
|
||||
// that there's no layer at key 100 because we didn't insert any layer there.
|
||||
let version = map.get_version(100).unwrap();
|
||||
assert_eq!(version.delta_coverage.query(100), None);
|
||||
}
|
||||
|
||||
/// Cover edge cases where layers begin or end on the same key
|
||||
#[test]
|
||||
fn test_key_collision() {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use std::ops::Range;
|
||||
|
||||
// TODO the `im` crate has 20x more downloads and also has
|
||||
// persistent/immutable BTree. It also runs a bit faster but
|
||||
// results are not the same on some tests.
|
||||
// NOTE the `im` crate has 20x more downloads and also has
|
||||
// persistent/immutable BTree. But it's bugged so rpds is a
|
||||
// better choice https://github.com/neondatabase/neon/issues/3395
|
||||
use rpds::RedBlackTreeMapSync;
|
||||
|
||||
/// Data structure that can efficiently:
|
||||
@@ -10,19 +10,22 @@ use rpds::RedBlackTreeMapSync;
|
||||
/// - iterate the latest layers in a key range
|
||||
/// - insert layers in non-decreasing lsn.start order
|
||||
///
|
||||
/// The struct is parameterized over Value for easier
|
||||
/// testing, but in practice it's some sort of layer.
|
||||
/// For a detailed explanation and justification of this approach, see:
|
||||
/// https://neon.tech/blog/persistent-structures-in-neons-wal-indexing
|
||||
///
|
||||
/// NOTE The struct is parameterized over Value for easier
|
||||
/// testing, but in practice it's some sort of layer.
|
||||
pub struct LayerCoverage<Value> {
|
||||
/// For every change in coverage (as we sweep the key space)
|
||||
/// we store (lsn.end, value).
|
||||
///
|
||||
/// We use an immutable/persistent tree so that we can keep historic
|
||||
/// versions of this coverage without cloning the whole thing and
|
||||
/// incurring quadratic memory cost. See HistoricLayerCoverage.
|
||||
/// NOTE We use an immutable/persistent tree so that we can keep historic
|
||||
/// versions of this coverage without cloning the whole thing and
|
||||
/// incurring quadratic memory cost. See HistoricLayerCoverage.
|
||||
///
|
||||
/// We use the Sync version of the map because we want Self to
|
||||
/// be Sync. Using nonsync might be faster, if we can work with
|
||||
/// that.
|
||||
/// NOTE We use the Sync version of the map because we want Self to
|
||||
/// be Sync. Using nonsync might be faster, if we can work with
|
||||
/// that.
|
||||
nodes: RedBlackTreeMapSync<i128, Option<(u64, Value)>>,
|
||||
}
|
||||
|
||||
@@ -41,6 +44,13 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
|
||||
/// Helper function to subdivide the key range without changing any values
|
||||
///
|
||||
/// This operation has no semantic effect by itself. It only helps us pin in
|
||||
/// place the part of the coverage we don't want to change when inserting.
|
||||
///
|
||||
/// As an analogy, think of a polygon. If you add a vertex along one of the
|
||||
/// segments, the polygon is still the same, but it behaves differently when
|
||||
/// we move or delete one of the other points.
|
||||
///
|
||||
/// Complexity: O(log N)
|
||||
fn add_node(&mut self, key: i128) {
|
||||
let value = match self.nodes.range(..=key).last() {
|
||||
@@ -74,7 +84,7 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
let mut to_update = Vec::new();
|
||||
let mut to_remove = Vec::new();
|
||||
let mut prev_covered = false;
|
||||
for (k, node) in self.nodes.range(key.clone()) {
|
||||
for (k, node) in self.nodes.range(key) {
|
||||
let needs_cover = match node {
|
||||
None => true,
|
||||
Some((h, _)) => h < &lsn.end,
|
||||
@@ -87,9 +97,8 @@ impl<Value: Clone> LayerCoverage<Value> {
|
||||
}
|
||||
prev_covered = needs_cover;
|
||||
}
|
||||
if !prev_covered {
|
||||
to_remove.push(key.end);
|
||||
}
|
||||
// TODO check if the nodes inserted at key.start and key.end are safe
|
||||
// to remove. It's fine to keep them but they could be redundant.
|
||||
for k in to_update {
|
||||
self.nodes.insert_mut(k, Some((lsn.end, value.clone())));
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ pub use delta_layer::{DeltaLayer, DeltaLayerWriter};
|
||||
pub use filename::{DeltaFileName, ImageFileName, LayerFileName};
|
||||
pub use image_layer::{ImageLayer, ImageLayerWriter};
|
||||
pub use inmemory_layer::InMemoryLayer;
|
||||
pub use remote_layer::RemoteLayerDesc;
|
||||
pub use remote_layer::RemoteLayer;
|
||||
|
||||
use super::layer_map::BatchedUpdates;
|
||||
|
||||
@@ -431,6 +431,14 @@ pub trait PersistentLayer: Layer {
|
||||
/// Permanently remove this layer from disk.
|
||||
fn delete_resident_layer_file(&self) -> Result<()>;
|
||||
|
||||
fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
None
|
||||
}
|
||||
|
||||
fn is_remote_layer(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns None if the layer file size is not known.
|
||||
///
|
||||
/// Should not change over the lifetime of the layer object because
|
||||
@@ -442,6 +450,16 @@ pub trait PersistentLayer: Layer {
|
||||
fn access_stats(&self) -> &LayerAccessStats;
|
||||
}
|
||||
|
||||
pub fn downcast_remote_layer(
|
||||
layer: &Arc<dyn PersistentLayer>,
|
||||
) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
if layer.is_remote_layer() {
|
||||
Arc::clone(layer).downcast_remote_layer()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Holds metadata about a layer without any content. Used mostly for testing.
|
||||
///
|
||||
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
|
||||
|
||||
@@ -30,7 +30,6 @@ use crate::repository::{Key, Value, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{
|
||||
PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
@@ -58,7 +57,7 @@ use utils::{
|
||||
|
||||
use super::{
|
||||
DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter,
|
||||
LayerKeyIter, PathOrConf, RemoteLayerDesc,
|
||||
LayerKeyIter, PathOrConf,
|
||||
};
|
||||
|
||||
///
|
||||
@@ -664,17 +663,6 @@ impl DeltaLayer {
|
||||
&self.layer_name(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create layer descriptor for this image layer
|
||||
pub fn layer_desc(&self) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&self.layer_name(),
|
||||
&LayerFileMetadata::new(self.file_size()),
|
||||
LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new delta layer.
|
||||
|
||||
@@ -26,7 +26,6 @@ use crate::repository::{Key, KEY_SIZE};
|
||||
use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
|
||||
use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader};
|
||||
use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
|
||||
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
|
||||
use crate::tenant::storage_layer::{
|
||||
LayerAccessStats, PersistentLayer, ValueReconstructResult, ValueReconstructState,
|
||||
};
|
||||
@@ -54,7 +53,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use super::filename::{ImageFileName, LayerFileName};
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf, RemoteLayerDesc};
|
||||
use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf};
|
||||
|
||||
///
|
||||
/// Header stored in the beginning of the file
|
||||
@@ -465,17 +464,6 @@ impl ImageLayer {
|
||||
&self.layer_name(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Create layer descriptor for this image layer
|
||||
pub fn layer_desc(&self) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&self.layer_name(),
|
||||
&LayerFileMetadata::new(self.file_size()),
|
||||
LayerAccessStats::empty_will_record_residence_event_later(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A builder object for constructing a new image layer.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
//! A RemoteLayerDesc is an in-memory placeholder for a layer file that exists
|
||||
//! A RemoteLayer is an in-memory placeholder for a layer file that exists
|
||||
//! in remote storage.
|
||||
//!
|
||||
use crate::config::PageServerConf;
|
||||
@@ -25,19 +25,19 @@ use super::{
|
||||
LayerResidenceStatus, PersistentLayer,
|
||||
};
|
||||
|
||||
/// RemoteLayerDesc is a not yet downloaded [`ImageLayer`] or
|
||||
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
|
||||
/// [`crate::storage_layer::DeltaLayer`].
|
||||
///
|
||||
/// RemoteLayerDesc might be downloaded on-demand during operations which are
|
||||
/// RemoteLayer might be downloaded on-demand during operations which are
|
||||
/// allowed download remote layers and during which, it gets replaced with a
|
||||
/// concrete `DeltaLayer` or `ImageLayer`.
|
||||
///
|
||||
/// See: [`crate::context::RequestContext`] for authorization to download
|
||||
pub struct RemoteLayerDesc {
|
||||
pub(crate) tenantid: TenantId,
|
||||
pub(crate) timelineid: TimelineId,
|
||||
pub(crate) key_range: Range<Key>,
|
||||
pub(crate) lsn_range: Range<Lsn>,
|
||||
pub struct RemoteLayer {
|
||||
tenantid: TenantId,
|
||||
timelineid: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
pub file_name: LayerFileName,
|
||||
|
||||
@@ -54,7 +54,7 @@ pub struct RemoteLayerDesc {
|
||||
/// Has `LayerMap::replace` failed for this (true) or not (false).
|
||||
///
|
||||
/// Used together with [`ongoing_download`] semaphore in `Timeline::download_remote_layer`.
|
||||
/// The field is used to mark a RemoteLayerDesc permanently (until restart or ignore+load)
|
||||
/// The field is used to mark a RemoteLayer permanently (until restart or ignore+load)
|
||||
/// unprocessable, because a LayerMap::replace failed.
|
||||
///
|
||||
/// It is very unlikely to accumulate these in the Timeline's LayerMap, but having this avoids
|
||||
@@ -63,9 +63,9 @@ pub struct RemoteLayerDesc {
|
||||
pub(crate) download_replacement_failure: std::sync::atomic::AtomicBool,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for RemoteLayerDesc {
|
||||
impl std::fmt::Debug for RemoteLayer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RemoteLayerDesc")
|
||||
f.debug_struct("RemoteLayer")
|
||||
.field("file_name", &self.file_name)
|
||||
.field("layer_metadata", &self.layer_metadata)
|
||||
.field("is_incremental", &self.is_incremental)
|
||||
@@ -73,7 +73,7 @@ impl std::fmt::Debug for RemoteLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl Layer for RemoteLayerDesc {
|
||||
impl Layer for RemoteLayer {
|
||||
fn get_key_range(&self) -> Range<Key> {
|
||||
self.key_range.clone()
|
||||
}
|
||||
@@ -119,7 +119,7 @@ impl Layer for RemoteLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl PersistentLayer for RemoteLayerDesc {
|
||||
impl PersistentLayer for RemoteLayer {
|
||||
fn get_tenant_id(&self) -> TenantId {
|
||||
self.tenantid
|
||||
}
|
||||
@@ -160,6 +160,14 @@ impl PersistentLayer for RemoteLayerDesc {
|
||||
bail!("remote layer has no layer file");
|
||||
}
|
||||
|
||||
fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
|
||||
Some(self)
|
||||
}
|
||||
|
||||
fn is_remote_layer(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn file_size(&self) -> u64 {
|
||||
self.layer_metadata.file_size()
|
||||
}
|
||||
@@ -193,15 +201,15 @@ impl PersistentLayer for RemoteLayerDesc {
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteLayerDesc {
|
||||
impl RemoteLayer {
|
||||
pub fn new_img(
|
||||
tenantid: TenantId,
|
||||
timelineid: TimelineId,
|
||||
fname: &ImageFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc {
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
tenantid,
|
||||
timelineid,
|
||||
key_range: fname.key_range.clone(),
|
||||
@@ -222,8 +230,8 @@ impl RemoteLayerDesc {
|
||||
fname: &DeltaFileName,
|
||||
layer_metadata: &LayerFileMetadata,
|
||||
access_stats: LayerAccessStats,
|
||||
) -> RemoteLayerDesc {
|
||||
RemoteLayerDesc {
|
||||
) -> RemoteLayer {
|
||||
RemoteLayer {
|
||||
tenantid,
|
||||
timelineid,
|
||||
key_range: fname.key_range.clone(),
|
||||
|
||||
@@ -35,7 +35,7 @@ use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
|
||||
use crate::tenant::storage_layer::{
|
||||
DeltaFileName, DeltaLayerWriter, ImageFileName, ImageLayerWriter, InMemoryLayer,
|
||||
LayerAccessStats, LayerFileName, RemoteLayerDesc,
|
||||
LayerAccessStats, LayerFileName, RemoteLayer,
|
||||
};
|
||||
use crate::tenant::{
|
||||
ephemeral_file::is_ephemeral_file,
|
||||
@@ -77,7 +77,6 @@ use self::eviction_task::EvictionTaskTimelineState;
|
||||
use self::walreceiver::{WalReceiver, WalReceiverConf};
|
||||
|
||||
use super::config::TenantConf;
|
||||
use super::layer_cache::LayerCache;
|
||||
use super::layer_map::BatchedUpdates;
|
||||
use super::remote_timeline_client::index::IndexPart;
|
||||
use super::remote_timeline_client::RemoteTimelineClient;
|
||||
@@ -120,7 +119,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(super) layers: RwLock<LayerMap<RemoteLayerDesc>>,
|
||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -242,8 +241,6 @@ pub struct Timeline {
|
||||
pub delete_lock: tokio::sync::Mutex<bool>,
|
||||
|
||||
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
||||
|
||||
layer_cache: Arc<LayerCache>,
|
||||
}
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
@@ -1010,22 +1007,20 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(remote_layer_desc) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
|
||||
if self.layer_cache.contains(&remote_layer_desc.filename()) {
|
||||
return Ok(Some(false));
|
||||
}
|
||||
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
|
||||
if self.remote_client.is_none() {
|
||||
return Ok(Some(false));
|
||||
}
|
||||
|
||||
self.download_remote_layer(remote_layer_desc).await?;
|
||||
self.download_remote_layer(remote_layer).await?;
|
||||
Ok(Some(true))
|
||||
}
|
||||
|
||||
/// Like [`evict_layer_batch`], but for just one layer.
|
||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(local_layer) = self.find_layer_desc(layer_file_name) else { return Ok(None) };
|
||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let remote_client = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
@@ -1052,7 +1047,7 @@ impl Timeline {
|
||||
pub async fn evict_layers(
|
||||
&self,
|
||||
_: &GenericRemoteStorage,
|
||||
layers_to_evict: &[Arc<RemoteLayerDesc>],
|
||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||
let remote_client = self.remote_client.clone().expect(
|
||||
@@ -1087,7 +1082,7 @@ impl Timeline {
|
||||
async fn evict_layer_batch(
|
||||
&self,
|
||||
remote_client: &Arc<RemoteTimelineClient>,
|
||||
layers_to_evict: &[Arc<RemoteLayerDesc>],
|
||||
layers_to_evict: &[Arc<dyn PersistentLayer>],
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<Vec<Option<anyhow::Result<bool>>>> {
|
||||
// ensure that the layers have finished uploading
|
||||
@@ -1136,12 +1131,12 @@ impl Timeline {
|
||||
fn evict_layer_batch_impl(
|
||||
&self,
|
||||
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
|
||||
local_layer: &Arc<RemoteLayerDesc>,
|
||||
batch_updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
|
||||
local_layer: &Arc<dyn PersistentLayer>,
|
||||
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
||||
) -> anyhow::Result<bool> {
|
||||
use super::layer_map::Replacement;
|
||||
|
||||
if !self.layer_cache.contains(&local_layer.filename()) {
|
||||
if local_layer.is_remote_layer() {
|
||||
// TODO(issue #3851): consider returning an err here instead of false,
|
||||
// which is the same out the match later
|
||||
return Ok(false);
|
||||
@@ -1168,7 +1163,7 @@ impl Timeline {
|
||||
let layer_metadata = LayerFileMetadata::new(layer_file_size);
|
||||
|
||||
let new_remote_layer = Arc::new(match local_layer.filename() {
|
||||
LayerFileName::Image(image_name) => RemoteLayerDesc::new_img(
|
||||
LayerFileName::Image(image_name) => RemoteLayer::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&image_name,
|
||||
@@ -1177,7 +1172,7 @@ impl Timeline {
|
||||
.access_stats()
|
||||
.clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted),
|
||||
),
|
||||
LayerFileName::Delta(delta_name) => RemoteLayerDesc::new_delta(
|
||||
LayerFileName::Delta(delta_name) => RemoteLayer::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
&delta_name,
|
||||
@@ -1188,7 +1183,6 @@ impl Timeline {
|
||||
),
|
||||
});
|
||||
|
||||
/*
|
||||
let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
|
||||
Replacement::Replaced { .. } => {
|
||||
if let Err(e) = local_layer.delete_resident_layer_file() {
|
||||
@@ -1239,10 +1233,8 @@ impl Timeline {
|
||||
false
|
||||
}
|
||||
};
|
||||
*/
|
||||
|
||||
// Ok(replaced)
|
||||
Ok(true)
|
||||
Ok(replaced)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1299,6 +1291,13 @@ impl Timeline {
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
fn get_gc_feedback(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.read().unwrap();
|
||||
tenant_conf
|
||||
.gc_feedback
|
||||
.unwrap_or(self.conf.default_tenant_conf.gc_feedback)
|
||||
}
|
||||
|
||||
pub(super) fn tenant_conf_updated(&self) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
@@ -1427,8 +1426,6 @@ impl Timeline {
|
||||
EvictionTaskTimelineState::default(),
|
||||
),
|
||||
delete_lock: tokio::sync::Mutex::new(false),
|
||||
|
||||
layer_cache: Arc::new(LayerCache::new()),
|
||||
};
|
||||
result.repartition_threshold = result.get_checkpoint_distance() / 10;
|
||||
result
|
||||
@@ -1575,12 +1572,9 @@ impl Timeline {
|
||||
LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident),
|
||||
);
|
||||
|
||||
let remote_desc = layer.layer_desc();
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
self.layer_cache.insert(layer.filename(), Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
num_layers += 1;
|
||||
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
|
||||
// Create a DeltaLayer struct for each delta file.
|
||||
@@ -1612,9 +1606,7 @@ impl Timeline {
|
||||
|
||||
trace!("found layer {}", layer.path().display());
|
||||
total_physical_size += file_size;
|
||||
let remote_desc = layer.layer_desc();
|
||||
self.layer_cache.insert(layer.filename(), Arc::new(layer));
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
updates.insert_historic(Arc::new(layer));
|
||||
num_layers += 1;
|
||||
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
|
||||
// ignore these
|
||||
@@ -1659,9 +1651,9 @@ impl Timeline {
|
||||
async fn create_remote_layers(
|
||||
&self,
|
||||
index_part: &IndexPart,
|
||||
local_layers: HashMap<LayerFileName, Arc<RemoteLayerDesc>>,
|
||||
local_layers: HashMap<LayerFileName, Arc<dyn PersistentLayer>>,
|
||||
up_to_date_disk_consistent_lsn: Lsn,
|
||||
) -> anyhow::Result<HashMap<LayerFileName, Arc<RemoteLayerDesc>>> {
|
||||
) -> anyhow::Result<HashMap<LayerFileName, Arc<dyn PersistentLayer>>> {
|
||||
// Are we missing some files that are present in remote storage?
|
||||
// Create RemoteLayer instances for them.
|
||||
let mut local_only_layers = local_layers;
|
||||
@@ -1740,7 +1732,7 @@ impl Timeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let remote_layer = RemoteLayerDesc::new_img(
|
||||
let remote_layer = RemoteLayer::new_img(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
imgfilename,
|
||||
@@ -1768,7 +1760,7 @@ impl Timeline {
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let remote_layer = RemoteLayerDesc::new_delta(
|
||||
let remote_layer = RemoteLayer::new_delta(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
deltafilename,
|
||||
@@ -2174,7 +2166,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_layer_desc(&self, layer_file_name: &str) -> Option<Arc<RemoteLayerDesc>> {
|
||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
@@ -2191,10 +2183,10 @@ impl Timeline {
|
||||
&self,
|
||||
// we cannot remove layers otherwise, since gc and compaction will race
|
||||
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
|
||||
layer: Arc<RemoteLayerDesc>,
|
||||
updates: &mut BatchedUpdates<'_, RemoteLayerDesc>,
|
||||
layer: Arc<dyn PersistentLayer>,
|
||||
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
|
||||
) -> anyhow::Result<()> {
|
||||
if self.layer_cache.contains(&layer.filename()) {
|
||||
if !layer.is_remote_layer() {
|
||||
layer.delete_resident_layer_file()?;
|
||||
let layer_file_size = layer.file_size();
|
||||
self.metrics
|
||||
@@ -2443,7 +2435,13 @@ impl Timeline {
|
||||
|
||||
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
|
||||
// If it's a remote layer, download it and retry.
|
||||
if let Some(layer) = self.layer_cache.get(&layer.filename()) {
|
||||
if let Some(remote_layer) =
|
||||
super::storage_layer::downcast_remote_layer(&layer)
|
||||
{
|
||||
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
||||
// we downloaded / would need to download this layer.
|
||||
remote_layer // download happens outside the scope of `layers` guard object
|
||||
} else {
|
||||
// Get all the data needed to reconstruct the page version from this layer.
|
||||
// But if we have an older cached page image, no need to go past that.
|
||||
let lsn_floor = max(cached_lsn + 1, lsn_floor);
|
||||
@@ -2466,10 +2464,6 @@ impl Timeline {
|
||||
}),
|
||||
));
|
||||
continue 'outer;
|
||||
} else {
|
||||
// TODO: push a breadcrumb to 'traversal_path' to record the fact that
|
||||
// we downloaded / would need to download this layer.
|
||||
layer // download happens outside the scope of `layers` guard object
|
||||
}
|
||||
} else if timeline.ancestor_timeline.is_some() {
|
||||
// Nothing on this timeline. Traverse to parent
|
||||
@@ -2909,8 +2903,7 @@ impl Timeline {
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
batch_updates.insert_historic(Arc::new(l.layer_desc()));
|
||||
self.layer_cache.insert(l.filename(), l);
|
||||
batch_updates.insert_historic(l);
|
||||
batch_updates.flush();
|
||||
|
||||
// update the timeline's physical size
|
||||
@@ -3138,6 +3131,7 @@ impl Timeline {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut updates = layers.batch_update();
|
||||
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
|
||||
|
||||
for l in image_layers {
|
||||
let path = l.filename();
|
||||
let metadata = timeline_path
|
||||
@@ -3156,9 +3150,7 @@ impl Timeline {
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
updates.insert_historic(Arc::new(l.layer_desc()));
|
||||
let x: Arc<dyn PersistentLayer> = l;
|
||||
self.layer_cache.insert(x.filename(), x)
|
||||
updates.insert_historic(l);
|
||||
}
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
@@ -3171,7 +3163,7 @@ impl Timeline {
|
||||
#[derive(Default)]
|
||||
struct CompactLevel0Phase1Result {
|
||||
new_layers: Vec<DeltaLayer>,
|
||||
deltas_to_compact: Vec<Arc<RemoteLayerDesc>>,
|
||||
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
|
||||
}
|
||||
|
||||
/// Top-level failure to compact.
|
||||
@@ -3181,7 +3173,7 @@ enum CompactionError {
|
||||
///
|
||||
/// This should not happen repeatedly, but will be retried once by top-level
|
||||
/// `Timeline::compact`.
|
||||
DownloadRequired(Vec<Arc<RemoteLayerDesc>>),
|
||||
DownloadRequired(Vec<Arc<RemoteLayer>>),
|
||||
/// Compaction cannot be done right now; page reconstruction and so on.
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
@@ -3253,9 +3245,13 @@ impl Timeline {
|
||||
|
||||
let remotes = deltas_to_compact
|
||||
.iter()
|
||||
.filter(|l| !self.layer_cache.contains(&l.filename()))
|
||||
.filter(|l| l.is_remote_layer())
|
||||
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
|
||||
.cloned()
|
||||
.map(|l| {
|
||||
l.clone()
|
||||
.downcast_remote_layer()
|
||||
.expect("just checked it is remote layer")
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if !remotes.is_empty() {
|
||||
@@ -3595,15 +3591,13 @@ impl Timeline {
|
||||
.add(metadata.len());
|
||||
|
||||
new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
|
||||
let remote_desc = l.layer_desc();
|
||||
let x: Arc<dyn PersistentLayer + 'static> = Arc::new(l);
|
||||
x.access_stats().record_residence_event(
|
||||
&updates,
|
||||
LayerResidenceStatus::Resident,
|
||||
LayerResidenceEventReason::LayerCreate,
|
||||
);
|
||||
updates.insert_historic(Arc::new(remote_desc));
|
||||
self.layer_cache.insert(x.filename(), x)
|
||||
updates.insert_historic(x);
|
||||
}
|
||||
|
||||
// Now that we have reshuffled the data to set of new delta layers, we can
|
||||
@@ -3910,7 +3904,7 @@ impl Timeline {
|
||||
// delta layers. Image layers can form "stairs" preventing old image from been deleted.
|
||||
// But image layers are in any case less sparse than delta layers. Also we need some
|
||||
// protection from replacing recent image layers with new one after each GC iteration.
|
||||
if l.is_incremental() && !LayerMap::is_l0(&*l) {
|
||||
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) {
|
||||
wanted_image_layers.add_range(l.get_key_range());
|
||||
}
|
||||
result.layers_not_updated += 1;
|
||||
@@ -4076,7 +4070,7 @@ impl Timeline {
|
||||
#[instrument(skip_all, fields(layer=%remote_layer.short_id()))]
|
||||
pub async fn download_remote_layer(
|
||||
&self,
|
||||
remote_layer: Arc<RemoteLayerDesc>,
|
||||
remote_layer: Arc<RemoteLayer>,
|
||||
) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -4133,12 +4127,10 @@ impl Timeline {
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut updates = layers.batch_update();
|
||||
let new_layer =
|
||||
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
{
|
||||
use crate::tenant::layer_map::Replacement;
|
||||
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
|
||||
/*
|
||||
let failure = match updates.replace_historic(&l, new_layer) {
|
||||
Ok(Replacement::Replaced { .. }) => false,
|
||||
Ok(Replacement::NotFound) => {
|
||||
@@ -4193,9 +4185,8 @@ impl Timeline {
|
||||
remote_layer
|
||||
.download_replacement_failure
|
||||
.store(true, Relaxed);
|
||||
} */
|
||||
}
|
||||
}
|
||||
|
||||
updates.flush();
|
||||
drop(layers);
|
||||
|
||||
@@ -4208,10 +4199,7 @@ impl Timeline {
|
||||
remote_layer.ongoing_download.close();
|
||||
} else {
|
||||
// Keep semaphore open. We'll drop the permit at the end of the function.
|
||||
error!(
|
||||
"layer file download failed: {:?}",
|
||||
result.as_ref().unwrap_err()
|
||||
);
|
||||
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
|
||||
}
|
||||
|
||||
// Don't treat it as an error if the task that triggered the download
|
||||
@@ -4225,8 +4213,7 @@ impl Timeline {
|
||||
drop(permit);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.in_current_span(),
|
||||
}.in_current_span(),
|
||||
);
|
||||
|
||||
receiver.await.context("download task cancelled")?
|
||||
@@ -4299,7 +4286,7 @@ impl Timeline {
|
||||
let layers = self.layers.read().unwrap();
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.filter(|l| !self.layer_cache.contains(&l.filename()))
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
.map(|l| self.download_remote_layer(l))
|
||||
.for_each(|dl| downloads.push(dl))
|
||||
}
|
||||
@@ -4374,7 +4361,7 @@ pub struct DiskUsageEvictionInfo {
|
||||
}
|
||||
|
||||
pub struct LocalLayerInfoForDiskUsageEviction {
|
||||
pub layer: Arc<RemoteLayerDesc>,
|
||||
pub layer: Arc<dyn PersistentLayer>,
|
||||
pub last_activity_ts: SystemTime,
|
||||
}
|
||||
|
||||
@@ -4408,7 +4395,7 @@ impl Timeline {
|
||||
let file_size = l.file_size();
|
||||
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
|
||||
|
||||
if !self.layer_cache.contains(&l.filename()) {
|
||||
if l.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::{
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
storage_layer::{PersistentLayer, RemoteLayerDesc},
|
||||
storage_layer::PersistentLayer,
|
||||
LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
};
|
||||
@@ -184,11 +184,11 @@ impl Timeline {
|
||||
// NB: all the checks can be invalidated as soon as we release the layer map lock.
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<RemoteLayerDesc>> = {
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
if !self.layer_cache.contains(&hist_layer.filename()) {
|
||||
if hist_layer.is_remote_layer() {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -158,6 +158,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"threshold": "23h",
|
||||
},
|
||||
"evictions_low_residence_duration_metric_threshold": "2days",
|
||||
"gc_feedback": True,
|
||||
"gc_horizon": 23 * (1024 * 1024),
|
||||
"gc_period": "2h 13m",
|
||||
"image_creation_threshold": 7,
|
||||
|
||||
Reference in New Issue
Block a user