Compare commits

..

13 Commits

Author SHA1 Message Date
Konstantin Knizhnik
f992cfe86f Call AtEOXact_SMgr at end of page redo 2025-03-02 17:01:40 +02:00
Konstantin Knizhnik
6d82eddecb Close relation in walredo 2025-03-02 09:17:28 +02:00
Konstantin Knizhnik
a25813b8a6 Reset local relation cache in walredo 2025-03-01 22:05:03 +02:00
Konstantin Knizhnik
3a8f50c6c8 Fix indentation 2025-02-27 07:50:44 +02:00
Konstantin Knizhnik
51f16af1ac Ignore LOG message of walredo process at PS 2025-02-26 20:55:02 +02:00
Konstantin Knizhnik
9e4a223413 Set log_min_messages to WARNING for walredo process 2025-02-26 16:18:54 +02:00
Konstantin Knizhnik
ef55ee0e6c Set log_min_messages to WARNING for walredo process 2025-02-26 16:18:30 +02:00
Konstantin Knizhnik
adfd4b1836 Print stack trace when number of written pages during walredo exceeds 32 2025-02-25 13:40:44 +02:00
Konstantin Knizhnik
0ee2b646ab Allow futex syscall to enable errbacktrace 2025-02-22 21:06:49 +02:00
Konstantin Knizhnik
212d27f24e Increase inmem SMGR size for walredo process to 100 pages 2025-02-22 16:00:55 +02:00
Konstantin Knizhnik
b1d8771d5f Store prefetch results in LFC cache once as soon as they are received (#10442)
## Problem

Prefetch is performed locally, so different backends can request the same
pages from PS.
Such duplicated requests increase the load on the page server and network
traffic.
Making prefetch global seems to be very difficult and undesirable,
because different queries can access chunks at different speeds. Storing
prefetch chunks in LFC will not completely eliminate duplicates, but can
minimise such requests.

The problem with storing prefetch results in LFC is that in this case
the page is not protected by a shared buffer lock.
So we will have to perform extra synchronisation at the LFC side.

See:

https://neondb.slack.com/archives/C0875PUD0LC/p1736772890602029?thread_ts=1736762541.116949&cid=C0875PUD0LC

@MMeent implementation of prewarm:
See https://github.com/neondatabase/neon/pull/10312/


## Summary of changes

Use condition variables to synchronize access to LFC entries.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-02-21 16:56:16 +00:00
Vlad Lazar
3e82addd64 storcon: use Duration for durations in the storage controller tenant config (#10928)
## Problem

The storage controller treats durations in the tenant config as strings.
These are loaded from the db.
The pageserver maps these durations to a seconds-only format, so we
always get a mismatch compared
to what's in the db.

## Summary of changes

Treat durations as durations inside the storage controller and not as
strings.
Nothing changes in the cross service API's themselves or the way things
are stored in the db.

I also added some logging which would have made the investigation a
10-minute job:
1. Reason for why the reconciliation was spawned
2. Location config diff between the observed and wanted states
2025-02-21 15:45:00 +00:00
John Spray
5e3c234edc storcon: do more concurrent optimisations (#10929)
## Problem

Now that we rely on the optimisation logic to handle fixing things up
after some tenants are in the wrong AZ (e.g. after node failure), it's
no longer appropriate to treat optimisations as an ultra-low-priority
task. We used to reflect that low priority with a very low limit on
concurrent execution, such that we would only migrate 2 things every 20
seconds.

## Summary of changes

- Increase MAX_OPTIMIZATIONS_EXEC_PER_PASS from 2 to 16
- Increase MAX_OPTIMIZATIONS_PLAN_PER_PASS from 8 to 64.

Since we recently gave user-initiated actions their own semaphore, this
should not risk starving out API requests.
2025-02-21 14:58:49 +00:00
22 changed files with 1046 additions and 412 deletions

28
Cargo.lock generated
View File

@@ -1874,6 +1874,12 @@ dependencies = [
"syn 2.0.90",
]
[[package]]
name = "difflib"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"
[[package]]
name = "digest"
version = "0.10.7"
@@ -3263,7 +3269,8 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "jemalloc_pprof"
version = "0.6.0"
source = "git+https://github.com/erikgrinaker/rust-jemalloc-pprof?branch=symbolize#0b146a1e2013bbc7fc8dc45f208f868c0b8ed193"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a883828bd6a4b957cd9f618886ff19e5f3ebd34e06ba0e855849e049fef32fb"
dependencies = [
"anyhow",
"libc",
@@ -3330,6 +3337,17 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json-structural-diff"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e878e36a8a44c158505c2c818abdc1350413ad83dcb774a0459f6a7ef2b65cbf"
dependencies = [
"difflib",
"regex",
"serde_json",
]
[[package]]
name = "jsonwebtoken"
version = "9.2.0"
@@ -3452,7 +3470,8 @@ dependencies = [
[[package]]
name = "mappings"
version = "0.6.0"
source = "git+https://github.com/erikgrinaker/rust-jemalloc-pprof?branch=symbolize#0b146a1e2013bbc7fc8dc45f208f868c0b8ed193"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce9229c438fbf1c333926e2053c4c091feabbd40a1b590ec62710fea2384af9e"
dependencies = [
"anyhow",
"libc",
@@ -4727,10 +4746,10 @@ dependencies = [
[[package]]
name = "pprof_util"
version = "0.6.0"
source = "git+https://github.com/erikgrinaker/rust-jemalloc-pprof?branch=symbolize#0b146a1e2013bbc7fc8dc45f208f868c0b8ed193"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65c568b3f8c1c37886ae07459b1946249e725c315306b03be5632f84c239f781"
dependencies = [
"anyhow",
"backtrace",
"flate2",
"num",
"paste",
@@ -6441,6 +6460,7 @@ dependencies = [
"humantime",
"hyper 0.14.30",
"itertools 0.10.5",
"json-structural-diff",
"lasso",
"measured",
"metrics",

View File

@@ -116,7 +116,7 @@ inferno = "0.12.0"
ipnet = "2.10.0"
itertools = "0.10"
itoa = "1.0.11"
jemalloc_pprof = { git = "https://github.com/erikgrinaker/rust-jemalloc-pprof", branch = "symbolize", version = "0.6", features = ["symbolize"] }
jemalloc_pprof = "0.6"
jsonwebtoken = "9"
lasso = "0.7"
libc = "0.2"
@@ -210,6 +210,7 @@ rustls-native-certs = "0.8"
x509-parser = "0.16"
whoami = "1.5.1"
zerocopy = { version = "0.7", features = ["derive"] }
json-structural-diff = { version = "0.2.0" }
## TODO replace this with tracing
env_logger = "0.10"

View File

@@ -335,13 +335,21 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'checkpoint_distance' as an integer")?,
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
checkpoint_timeout: settings
.remove("checkpoint_timeout")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'checkpoint_timeout' as duration")?,
compaction_target_size: settings
.remove("compaction_target_size")
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_period: settings
.remove("compaction_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'compaction_period' as duration")?,
compaction_threshold: settings
.remove("compaction_threshold")
.map(|x| x.parse::<usize>())
@@ -387,7 +395,10 @@ impl PageServerNode {
.map(|x| x.parse::<u64>())
.transpose()
.context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
gc_period: settings.remove("gc_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'gc_period' as duration")?,
image_creation_threshold: settings
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
@@ -403,13 +414,20 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
pitr_interval: settings.remove("pitr_interval")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'pitr_interval' as duration")?,
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
.map(|x| x.to_string()),
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'walreceiver_connect_timeout' as duration")?,
lagging_wal_timeout: settings
.remove("lagging_wal_timeout")
.map(|x| x.to_string()),
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lagging_wal_timeout' as duration")?,
max_lsn_wal_lag: settings
.remove("max_lsn_wal_lag")
.map(|x| x.parse::<NonZeroU64>())
@@ -427,8 +445,14 @@ impl PageServerNode {
.context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
.remove("evictions_low_residence_duration_metric_threshold")
.map(|x| x.to_string()),
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'evictions_low_residence_duration_metric_threshold' as duration")?,
heatmap_period: settings
.remove("heatmap_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'heatmap_period' as duration")?,
lazy_slru_download: settings
.remove("lazy_slru_download")
.map(|x| x.parse::<bool>())
@@ -439,10 +463,15 @@ impl PageServerNode {
.map(serde_json::from_str)
.transpose()
.context("parse `timeline_get_throttle` from json")?,
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
lsn_lease_length: settings.remove("lsn_lease_length")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lsn_lease_length' as duration")?,
lsn_lease_length_for_ts: settings
.remove("lsn_lease_length_for_ts")
.map(|x| x.to_string()),
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'lsn_lease_length_for_ts' as duration")?,
timeline_offloading: settings
.remove("timeline_offloading")
.map(|x| x.parse::<bool>())

View File

@@ -959,7 +959,7 @@ async fn main() -> anyhow::Result<()> {
threshold: threshold.into(),
},
)),
heatmap_period: Some("300s".to_string()),
heatmap_period: Some(Duration::from_secs(300)),
..Default::default()
},
})

View File

@@ -495,10 +495,19 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
}
Format::Pprof => {
let data = tokio::task::spawn_blocking(move || prof_ctl.dump_pprof())
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
let data = tokio::task::spawn_blocking(move || {
let bytes = prof_ctl.dump_pprof()?;
// Symbolize the profile.
// TODO: consider moving this upstream to jemalloc_pprof and avoiding the
// serialization roundtrip.
let profile = pprof::decode(&bytes)?;
let profile = pprof::symbolize(profile)?;
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
pprof::encode(&profile)
})
.await
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
.map_err(ApiError::InternalServerError)?;
Response::builder()
.status(200)
.header(CONTENT_TYPE, "application/octet-stream")
@@ -511,6 +520,8 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
let body = tokio::task::spawn_blocking(move || {
let bytes = prof_ctl.dump_pprof()?;
let profile = pprof::decode(&bytes)?;
let profile = pprof::symbolize(profile)?;
let profile = pprof::strip_locations(profile, STRIP_MAPPINGS, &STRIP_FUNCTIONS);
let mut opts = inferno::flamegraph::Options::default();
opts.title = "Heap inuse".to_string();
opts.count_name = "bytes".to_string();

View File

@@ -526,9 +526,13 @@ pub struct TenantConfigPatch {
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
pub struct TenantConfig {
pub checkpoint_distance: Option<u64>,
pub checkpoint_timeout: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub checkpoint_timeout: Option<Duration>,
pub compaction_target_size: Option<u64>,
pub compaction_period: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub compaction_period: Option<Duration>,
pub compaction_threshold: Option<usize>,
pub compaction_upper_limit: Option<usize>,
// defer parsing compaction_algorithm, like eviction_policy
@@ -539,22 +543,38 @@ pub struct TenantConfig {
pub l0_flush_stall_threshold: Option<usize>,
pub l0_flush_wait_upload: Option<bool>,
pub gc_horizon: Option<u64>,
pub gc_period: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub gc_period: Option<Duration>,
pub image_creation_threshold: Option<usize>,
pub pitr_interval: Option<String>,
pub walreceiver_connect_timeout: Option<String>,
pub lagging_wal_timeout: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub pitr_interval: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub walreceiver_connect_timeout: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lagging_wal_timeout: Option<Duration>,
pub max_lsn_wal_lag: Option<NonZeroU64>,
pub eviction_policy: Option<EvictionPolicy>,
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub heatmap_period: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub heatmap_period: Option<Duration>,
pub lazy_slru_download: Option<bool>,
pub timeline_get_throttle: Option<ThrottleConfig>,
pub image_layer_creation_check_threshold: Option<u8>,
pub image_creation_preempt_threshold: Option<usize>,
pub lsn_lease_length: Option<String>,
pub lsn_lease_length_for_ts: Option<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lsn_lease_length: Option<Duration>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lsn_lease_length_for_ts: Option<Duration>,
pub timeline_offloading: Option<bool>,
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
pub rel_size_v2_enabled: Option<bool>,
@@ -564,7 +584,10 @@ pub struct TenantConfig {
}
impl TenantConfig {
pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
pub fn apply_patch(
self,
patch: TenantConfigPatch,
) -> Result<TenantConfig, humantime::DurationError> {
let Self {
mut checkpoint_distance,
mut checkpoint_timeout,
@@ -604,11 +627,17 @@ impl TenantConfig {
} = self;
patch.checkpoint_distance.apply(&mut checkpoint_distance);
patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
patch
.checkpoint_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut checkpoint_timeout);
patch
.compaction_target_size
.apply(&mut compaction_target_size);
patch.compaction_period.apply(&mut compaction_period);
patch
.compaction_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut compaction_period);
patch.compaction_threshold.apply(&mut compaction_threshold);
patch
.compaction_upper_limit
@@ -626,15 +655,25 @@ impl TenantConfig {
.apply(&mut l0_flush_stall_threshold);
patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
patch.gc_horizon.apply(&mut gc_horizon);
patch.gc_period.apply(&mut gc_period);
patch
.gc_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut gc_period);
patch
.image_creation_threshold
.apply(&mut image_creation_threshold);
patch.pitr_interval.apply(&mut pitr_interval);
patch
.pitr_interval
.map(|v| humantime::parse_duration(&v))?
.apply(&mut pitr_interval);
patch
.walreceiver_connect_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut walreceiver_connect_timeout);
patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
patch
.lagging_wal_timeout
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lagging_wal_timeout);
patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
patch.eviction_policy.apply(&mut eviction_policy);
patch
@@ -642,8 +681,12 @@ impl TenantConfig {
.apply(&mut min_resident_size_override);
patch
.evictions_low_residence_duration_metric_threshold
.map(|v| humantime::parse_duration(&v))?
.apply(&mut evictions_low_residence_duration_metric_threshold);
patch.heatmap_period.apply(&mut heatmap_period);
patch
.heatmap_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut heatmap_period);
patch.lazy_slru_download.apply(&mut lazy_slru_download);
patch
.timeline_get_throttle
@@ -654,9 +697,13 @@ impl TenantConfig {
patch
.image_creation_preempt_threshold
.apply(&mut image_creation_preempt_threshold);
patch.lsn_lease_length.apply(&mut lsn_lease_length);
patch
.lsn_lease_length
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lsn_lease_length);
patch
.lsn_lease_length_for_ts
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lsn_lease_length_for_ts);
patch.timeline_offloading.apply(&mut timeline_offloading);
patch
@@ -673,7 +720,7 @@ impl TenantConfig {
.gc_compaction_ratio_percent
.apply(&mut gc_compaction_ratio_percent);
Self {
Ok(Self {
checkpoint_distance,
checkpoint_timeout,
compaction_target_size,
@@ -709,7 +756,7 @@ impl TenantConfig {
gc_compaction_enabled,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
}
})
}
}
@@ -2503,7 +2550,7 @@ mod tests {
..base.clone()
};
let patched = base.apply_patch(decoded.config);
let patched = base.apply_patch(decoded.config).unwrap();
assert_eq!(patched, expected);
}

View File

@@ -672,10 +672,6 @@ fn start_pageserver(
}
});
// Allocate a bunch of memory.
let alloc = allocate(256 * 1024 * 1024);
println!("allocated {}b", alloc.len());
// Wait for cancellation signal and shut down the pageserver.
//
// This cancels the `shutdown_pageserver` cancellation tree. Right now that tree doesn't
@@ -699,17 +695,6 @@ fn start_pageserver(
})
}
#[inline(never)]
fn allocate(size: usize) -> Vec<u8> {
allocate_inline(size)
}
#[inline(always)]
fn allocate_inline(size: usize) -> Vec<u8> {
println!("allocating {size}b");
vec![9; size]
}
async fn create_remote_storage_client(
conf: &'static PageServerConf,
) -> anyhow::Result<GenericRemoteStorage> {

View File

@@ -693,16 +693,15 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
/// This is a conversion from our internal tenant config object to the one used
/// in external APIs.
impl From<TenantConfOpt> for models::TenantConfig {
// TODO(vlad): These are now the same, but they have different serialization logic.
// Can we merge them?
fn from(value: TenantConfOpt) -> Self {
fn humantime(d: Duration) -> String {
format!("{}s", d.as_secs())
}
Self {
checkpoint_distance: value.checkpoint_distance,
checkpoint_timeout: value.checkpoint_timeout.map(humantime),
checkpoint_timeout: value.checkpoint_timeout,
compaction_algorithm: value.compaction_algorithm,
compaction_target_size: value.compaction_target_size,
compaction_period: value.compaction_period.map(humantime),
compaction_period: value.compaction_period,
compaction_threshold: value.compaction_threshold,
compaction_upper_limit: value.compaction_upper_limit,
compaction_l0_first: value.compaction_l0_first,
@@ -711,24 +710,23 @@ impl From<TenantConfOpt> for models::TenantConfig {
l0_flush_stall_threshold: value.l0_flush_stall_threshold,
l0_flush_wait_upload: value.l0_flush_wait_upload,
gc_horizon: value.gc_horizon,
gc_period: value.gc_period.map(humantime),
gc_period: value.gc_period,
image_creation_threshold: value.image_creation_threshold,
pitr_interval: value.pitr_interval.map(humantime),
walreceiver_connect_timeout: value.walreceiver_connect_timeout.map(humantime),
lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
pitr_interval: value.pitr_interval,
walreceiver_connect_timeout: value.walreceiver_connect_timeout,
lagging_wal_timeout: value.lagging_wal_timeout,
max_lsn_wal_lag: value.max_lsn_wal_lag,
eviction_policy: value.eviction_policy,
min_resident_size_override: value.min_resident_size_override,
evictions_low_residence_duration_metric_threshold: value
.evictions_low_residence_duration_metric_threshold
.map(humantime),
heatmap_period: value.heatmap_period.map(humantime),
.evictions_low_residence_duration_metric_threshold,
heatmap_period: value.heatmap_period,
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle,
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
image_creation_preempt_threshold: value.image_creation_preempt_threshold,
lsn_lease_length: value.lsn_lease_length.map(humantime),
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
lsn_lease_length: value.lsn_lease_length,
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts,
timeline_offloading: value.timeline_offloading,
wal_receiver_protocol_override: value.wal_receiver_protocol_override,
rel_size_v2_enabled: value.rel_size_v2_enabled,
@@ -760,29 +758,10 @@ mod tests {
assert_eq!(small_conf, serde_json::from_str(&json_form).unwrap());
}
#[test]
fn test_try_from_models_tenant_config_err() {
let tenant_config = models::TenantConfig {
lagging_wal_timeout: Some("5a".to_string()),
..TenantConfig::default()
};
let tenant_conf_opt = TenantConfOpt::try_from(&tenant_config);
assert!(
tenant_conf_opt.is_err(),
"Suceeded to convert TenantConfig to TenantConfOpt"
);
let expected_error_str =
"lagging_wal_timeout: invalid value: string \"5a\", expected a duration";
assert_eq!(tenant_conf_opt.unwrap_err().to_string(), expected_error_str);
}
#[test]
fn test_try_from_models_tenant_config_success() {
let tenant_config = models::TenantConfig {
lagging_wal_timeout: Some("5s".to_string()),
lagging_wal_timeout: Some(Duration::from_secs(5)),
..TenantConfig::default()
};

View File

@@ -136,7 +136,9 @@ impl WalRedoProcess {
Ok(0) => break Ok(()), // eof
Ok(num_bytes) => {
let output = String::from_utf8_lossy(&buf[..num_bytes]);
error!(%output, "received output");
if !output.contains("LOG:") {
error!(%output, "received output");
}
}
Err(e) => {
break Err(e);

File diff suppressed because it is too large Load Diff

View File

@@ -56,6 +56,7 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
uint32 WAIT_EVENT_NEON_LFC_READ;
uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
uint32 WAIT_EVENT_NEON_LFC_WRITE;
uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
uint32 WAIT_EVENT_NEON_PS_STARTING;
uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -538,6 +539,7 @@ neon_shmem_startup_hook(void)
WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write");
WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait");
WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting");
WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring");
WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO");

View File

@@ -28,6 +28,7 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
extern uint32 WAIT_EVENT_NEON_LFC_READ;
extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
extern uint32 WAIT_EVENT_NEON_LFC_WRITE;
extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
extern uint32 WAIT_EVENT_NEON_PS_STARTING;
extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
extern uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -38,6 +39,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
#define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE
#define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE
#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION

View File

@@ -233,6 +233,7 @@ extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern bool lfc_store_prefetch_result;
extern shardno_t get_shard_number(BufferTag* tag);
@@ -301,14 +302,16 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
const void* buffer, XLogRecPtr lsn);
static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
void *buffer)
{
bits8 rv = 0;
bits8 rv = 1;
return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}

View File

@@ -162,7 +162,7 @@ static uint32 local_request_counter;
* UNUSED ------> REQUESTED --> RECEIVED
* ^ : | |
* | : v |
* | : TAG_UNUSED |
* | : TAG_REMAINS |
* | : | |
* +----------------+------------+
* :
@@ -181,7 +181,7 @@ typedef enum PrefetchStatus
/* must fit in uint8; bits 0x1 are used */
typedef enum {
PRFSF_NONE = 0x0,
PRFSF_SEQ = 0x1,
PRFSF_LFC = 0x1 /* received prefetch result is stored in LFC */
} PrefetchRequestFlags;
typedef struct PrefetchRequest
@@ -305,7 +305,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
static void
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum,
BlockNumber blkno, neon_request_lsns *output,
BlockNumber nblocks, const bits8 *mask);
BlockNumber nblocks);
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
PrefetchRequest *slot);
@@ -363,6 +363,7 @@ compact_prefetch_buffers(void)
target_slot->buftag = source_slot->buftag;
target_slot->shard_no = source_slot->shard_no;
target_slot->status = source_slot->status;
target_slot->flags = source_slot->flags;
target_slot->response = source_slot->response;
target_slot->reqid = source_slot->reqid;
target_slot->request_lsns = source_slot->request_lsns;
@@ -452,6 +453,18 @@ prefetch_pump_state(void)
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
{
/*
* Store prefetched result in LFC (please read comments to lfc_prefetch
* explaining why it can be done without holding shared buffer lock
*/
if (lfc_prefetch(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
{
slot->flags |= PRFSF_LFC;
}
}
}
}
@@ -713,6 +726,18 @@ prefetch_read(PrefetchRequest *slot)
/* update slot state */
slot->status = PRFS_RECEIVED;
slot->response = response;
if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
{
/*
* Store prefetched result in LFC (please read comments to lfc_prefetch
* explaining why it can be done without holding shared buffer lock
*/
if (lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
{
slot->flags |= PRFSF_LFC;
}
}
return true;
}
else
@@ -864,7 +889,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
else
neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
slot->buftag.forkNum, slot->buftag.blockNum,
&slot->request_lsns, 1, NULL);
&slot->request_lsns, 1);
request.hdr.lsn = slot->request_lsns.request_lsn;
request.hdr.not_modified_since = slot->request_lsns.not_modified_since;
@@ -890,6 +915,73 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
Assert(!found);
}
/*
* Lookup of already received prefetch requests. Only already received responses matching required LSNs are accepted.
* Present pages are marked in "mask" bitmap and total number of such pages is returned.
*/
static int
prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, neon_request_lsns *lsns,
BlockNumber nblocks, void **buffers, bits8 *mask)
{
int hits = 0;
PrefetchRequest hashkey;
/*
* Use an intermediate PrefetchRequest struct as the hash key to ensure
* correct alignment and that the padding bytes are cleared.
*/
memset(&hashkey.buftag, 0, sizeof(BufferTag));
CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo);
hashkey.buftag.forkNum = forknum;
for (int i = 0; i < nblocks; i++)
{
PrfHashEntry *entry;
hashkey.buftag.blockNum = blocknum + i;
entry = prfh_lookup(MyPState->prf_hash, &hashkey);
if (entry != NULL)
{
PrefetchRequest *slot = entry->slot;
uint64 ring_index = slot->my_ring_index;
Assert(slot == GetPrfSlot(ring_index));
Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
if (slot->status != PRFS_RECEIVED)
continue;
/*
* If the caller specified a request LSN to use, only accept
* prefetch responses that satisfy that request.
*/
if (!neon_prefetch_response_usable(&lsns[i], slot))
continue;
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
prefetch_set_unused(ring_index);
BITMAP_SET(mask, i);
hits += 1;
}
}
pgBufferUsage.prefetch.hits += hits;
return hits;
}
#if PG_MAJORVERSION_NUM < 17
static bool
prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_request_lsns *lsns, void *buffer)
{
bits8 present = 0;
return prefetch_lookupv(rinfo, forkNum, blkn, lsns, 1, &buffer, &present) != 0;
}
#endif
/*
* prefetch_register_bufferv() - register and prefetch buffers
*
@@ -1013,8 +1105,6 @@ Retry:
/* The buffered request is good enough, return that index */
if (is_prefetch)
pgBufferUsage.prefetch.duplicates++;
else
pgBufferUsage.prefetch.hits++;
continue;
}
}
@@ -1116,6 +1206,7 @@ Retry:
slot->buftag = hashkey.buftag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = ring_index;
slot->flags = 0;
min_ring_index = Min(min_ring_index, ring_index);
@@ -2056,8 +2147,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
*/
static void
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
neon_request_lsns *output, BlockNumber nblocks,
const bits8 *mask)
neon_request_lsns *output, BlockNumber nblocks)
{
XLogRecPtr last_written_lsns[PG_IOV_MAX];
@@ -2145,9 +2235,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
neon_request_lsns *result = &output[i];
XLogRecPtr last_written_lsn = last_written_lsns[i];
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
continue;
if (last_written_lsn > replay_lsn)
{
/* GetCurrentReplayRecPtr was introduced in v15 */
@@ -2190,8 +2277,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
neon_request_lsns *result = &output[i];
XLogRecPtr last_written_lsn = last_written_lsns[i];
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
continue;
/*
* Use the latest LSN that was evicted from the buffer cache as the
* 'not_modified_since' hint. Any pages modified by later WAL records
@@ -2413,7 +2498,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
}
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum,
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
{
NeonExistsRequest request = {
.hdr.tag = T_NeonExistsRequest,
@@ -2832,8 +2917,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
while (nblocks > 0)
{
int iterblocks = Min(nblocks, PG_IOV_MAX);
bits8 lfc_present[PG_IOV_MAX / 8];
memset(lfc_present, 0, sizeof(lfc_present));
bits8 lfc_present[PG_IOV_MAX / 8] = {0};
if (lfc_cache_containsv(InfoFromSMgrRel(reln), forknum, blocknum,
iterblocks, lfc_present) == iterblocks)
@@ -2844,12 +2928,13 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
}
tag.blockNum = blocknum;
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_present[i] = ~(lfc_present[i]);
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
lfc_present, true);
nblocks -= iterblocks;
blocknum += iterblocks;
@@ -3105,7 +3190,8 @@ Retry:
}
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
lfc_write(rinfo, forkNum, blockno, buffer);
if (!lfc_store_prefetch_result)
lfc_write(rinfo, forkNum, blockno, buffer);
break;
}
case T_NeonErrorResponse:
@@ -3190,6 +3276,17 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
}
/* Try to read PS results if they are available */
prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
if (prefetch_lookup(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, buffer))
{
/* Prefetch hit */
return;
}
/* Try to read from local file cache */
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
{
@@ -3197,9 +3294,11 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
return;
}
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL);
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
@@ -3280,11 +3379,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
#if PG_MAJORVERSION_NUM >= 17
static void
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
void **buffers, BlockNumber nblocks)
{
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
bits8 lfc_hits[PG_IOV_MAX / 8];
bits8 read[PG_IOV_MAX / 8];
neon_request_lsns request_lsns[PG_IOV_MAX];
int lfc_result;
int prefetch_result;
switch (reln->smgr_relpersistence)
{
@@ -3307,38 +3409,52 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
neon_log(ERROR, "Read request too large: %d is larger than max %d",
nblocks, PG_IOV_MAX);
memset(read, 0, sizeof(read));
/* Try to read PS results if they are available */
prefetch_pump_state();
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);
prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);
if (prefetch_result == nblocks)
return;
/* invert the result: exclude prefetched blocks */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_hits[i] = ~prefetch_hits[i];
/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
nblocks, read);
nblocks, lfc_hits);
if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;
/* Read all blocks from LFC, so we're done */
if (lfc_result == nblocks)
if (prefetch_result + lfc_result == nblocks)
return;
if (lfc_result == -1)
if (lfc_result <= 0)
{
/* can't use the LFC result, so read all blocks from PS */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = 0xFF;
read[i] = ~prefetch_hits[i];
}
else
{
/* invert the result: exclude blocks read from lfc */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~(read[i]);
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
}
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks, read);
neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read);
/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
prefetch_pump_state();
#ifdef DEBUG_COMPARE_LOCAL
@@ -3610,7 +3726,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
}
neon_get_request_lsns(InfoFromSMgrRel(reln), forknum,
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
{
NeonNblocksRequest request = {
@@ -3695,7 +3811,7 @@ neon_dbsize(Oid dbNode)
NRelFileInfo dummy_node = {0};
neon_get_request_lsns(dummy_node, MAIN_FORKNUM,
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
{
NeonDbSizeRequest request = {
@@ -4430,7 +4546,12 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
lfc_evict(rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
}
LWLockRelease(partitionLock);

View File

@@ -32,8 +32,8 @@
#include "inmem_smgr.h"
/* Size of the in-memory smgr */
#define MAX_PAGES 64
/* Size of the in-memory smgr: XLR_MAX_BLOCK_ID is 32, but we can update up to 3 forks for each block */
#define MAX_PAGES 100
/* If more than WARN_PAGES are used, print a warning in the log */
#define WARN_PAGES 32
@@ -285,12 +285,12 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
* WARN_PAGES, print a warning so that we get alerted and get to
* investigate why we're accessing so many buffers.
*/
elog(used_pages >= WARN_PAGES ? WARNING : DEBUG1,
"inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
blocknum,
used_pages);
if (used_pages >= WARN_PAGES)
ereport(WARNING, (errmsg("inmem_write() called for %u/%u/%u.%u blk %u: used_pages %u",
RelFileInfoFmt(InfoFromSMgrRel(reln)),
forknum,
blocknum,
used_pages), errbacktrace()));
if (used_pages == MAX_PAGES)
elog(ERROR, "Inmem storage overflow");

View File

@@ -142,7 +142,7 @@ static BufferTag target_redo_tag;
static XLogReaderState *reader_state;
#define TRACE LOG
#define TRACE DEBUG1
#ifdef HAVE_LIBSECCOMP
@@ -194,6 +194,7 @@ static PgSeccompRule allowed_syscalls[] =
* is stored in MyProcPid anyway.
*/
PG_SCMP_ALLOW(getpid),
PG_SCMP_ALLOW(futex), /* needed for errbacktrace */
/* Enable those for a proper shutdown. */
#if 0
@@ -253,7 +254,7 @@ WalRedoMain(int argc, char *argv[])
* which is super strange but that's not something we can solve
* for here. ¯\_(-_-)_/¯
*/
SetConfigOption("log_min_messages", "FATAL", PGC_SUSET, PGC_S_OVERRIDE);
SetConfigOption("log_min_messages", "WARNING", PGC_SUSET, PGC_S_OVERRIDE);
SetConfigOption("client_min_messages", "ERROR", PGC_SUSET,
PGC_S_OVERRIDE);
@@ -758,6 +759,11 @@ BeginRedoForBlock(StringInfo input_message)
{
reln->smgr_cached_nblocks[forknum] = blknum + 1;
}
if (target_redo_tag.forkNum == MAIN_FORKNUM)
{
reln->smgr_cached_nblocks[FSM_FORKNUM] = MaxBlockNumber;
reln->smgr_cached_nblocks[VISIBILITYMAP_FORKNUM] = MaxBlockNumber;
}
}
/*
@@ -1053,6 +1059,9 @@ GetPage(StringInfo input_message)
DropRelationAllLocalBuffers(rinfo);
wal_redo_buffer = InvalidBuffer;
/* Remove relation from SMGR relastion cache */
AtEOXact_SMgr();
elog(TRACE, "Page sent back for block %u", blknum);
}

View File

@@ -24,6 +24,7 @@ hex.workspace = true
hyper0.workspace = true
humantime.workspace = true
itertools.workspace = true
json-structural-diff.workspace = true
lasso.workspace = true
once_cell.workspace = true
pageserver_api.workspace = true

View File

@@ -1,6 +1,7 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::{compute_hook, service};
use json_structural_diff::JsonDiff;
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
@@ -24,7 +25,7 @@ use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
const DEFAULT_HEATMAP_PERIOD: &str = "60s";
const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);
/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
@@ -880,7 +881,27 @@ impl Reconciler {
self.generation = Some(generation);
wanted_conf.generation = generation.into();
}
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
let diff = match observed {
Some(ObservedStateLocation {
conf: Some(observed),
}) => {
let diff = JsonDiff::diff(
&serde_json::to_value(observed.clone()).unwrap(),
&serde_json::to_value(wanted_conf.clone()).unwrap(),
false,
);
if let Some(json_diff) = diff.diff {
serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
} else {
"unknown".to_string()
}
}
_ => "full".to_string(),
};
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");
// Because `node` comes from a ref to &self, clone it before calling into a &mut self
// function: this could be avoided by refactoring the state mutated by location_config into
@@ -1180,7 +1201,7 @@ fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig
let mut config = config.clone();
if has_secondaries {
if config.heatmap_period.is_none() {
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
}
} else {
config.heatmap_period = None;

View File

@@ -2926,7 +2926,9 @@ impl Service {
first
};
let updated_config = base.apply_patch(patch);
let updated_config = base
.apply_patch(patch)
.map_err(|err| ApiError::BadRequest(anyhow::anyhow!(err)))?;
self.set_tenant_config_and_reconcile(tenant_id, updated_config)
.await
}
@@ -6654,11 +6656,12 @@ impl Service {
) -> Option<ReconcilerWaiter> {
let reconcile_needed = shard.get_reconcile_needed(nodes);
match reconcile_needed {
let reconcile_reason = match reconcile_needed {
ReconcileNeeded::No => return None,
ReconcileNeeded::WaitExisting(waiter) => return Some(waiter),
ReconcileNeeded::Yes => {
ReconcileNeeded::Yes(reason) => {
// Fall through to try and acquire units for spawning reconciler
reason
}
};
@@ -6697,6 +6700,7 @@ impl Service {
};
shard.spawn_reconciler(
reconcile_reason,
&self.result_tx,
nodes,
&self.compute_hook,
@@ -6821,7 +6825,7 @@ impl Service {
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
// trickle of optimizations in the background, rather than executing a large number in parallel
// when a change occurs.
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 2;
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 16;
// Synchronous prepare: scan shards for possible scheduling optimizations
let candidate_work = self.optimize_all_plan();
@@ -6872,7 +6876,7 @@ impl Service {
// How many candidate optimizations we will generate, before evaluating them for readiness: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 64;
let mut work = Vec::new();
let mut locked = self.inner.write().unwrap();

View File

@@ -481,7 +481,14 @@ pub(crate) enum ReconcileNeeded {
/// spawned: wait for the existing reconciler rather than spawning a new one.
WaitExisting(ReconcilerWaiter),
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
Yes,
Yes(ReconcileReason),
}
#[derive(Debug)]
pub(crate) enum ReconcileReason {
ActiveNodesDirty,
UnknownLocation,
PendingComputeNotification,
}
/// Pending modification to the observed state of a tenant shard.
@@ -1341,12 +1348,18 @@ impl TenantShard {
let active_nodes_dirty = self.dirty(pageservers);
// Even if there is no pageserver work to be done, if we have a pending notification to computes,
// wake up a reconciler to send it.
let do_reconcile =
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
let reconcile_needed = match (
active_nodes_dirty,
dirty_observed,
self.pending_compute_notification,
) {
(true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
(_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
(_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
_ => ReconcileNeeded::No,
};
if !do_reconcile {
if matches!(reconcile_needed, ReconcileNeeded::No) {
tracing::debug!("Not dirty, no reconciliation needed.");
return ReconcileNeeded::No;
}
@@ -1389,7 +1402,7 @@ impl TenantShard {
}
}
ReconcileNeeded::Yes
reconcile_needed
}
/// Ensure the sequence number is set to a value where waiting for this value will make us wait
@@ -1479,6 +1492,7 @@ impl TenantShard {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn spawn_reconciler(
&mut self,
reason: ReconcileReason,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
@@ -1538,7 +1552,7 @@ impl TenantShard {
let reconcile_seq = self.sequence;
let long_reconcile_threshold = service_config.long_reconcile_threshold;
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
let must_notify = self.pending_compute_notification;
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
tenant_id=%reconciler.tenant_shard_id.tenant_id,

View File

@@ -0,0 +1,101 @@
from __future__ import annotations
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC
@pytest.mark.timeout(600)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prefetch(neon_simple_env: NeonEnv):
    """
    Check that prefetch results can be stored in the Local File Cache (LFC).

    Without neon.store_prefetch_result_in_lfc, repeating a prefetching query
    re-sends expired (unused) prefetch requests to the pageserver.  With the
    GUC enabled, prefetched pages are cached in the LFC, so a repeated query
    reports zero expired prefetch requests.
    """
    env = neon_simple_env
    endpoint = env.endpoints.create_start(
        "main",
        config_lines=[
            "neon.max_file_cache_size=1GB",
            "neon.file_cache_size_limit=1GB",
            "effective_io_concurrency=100",
            # Tiny shared_buffers forces reads (and thus prefetch) through LFC/PS.
            "shared_buffers=1MB",
            # Force index scans so the prefetch machinery is exercised.
            "enable_bitmapscan=off",
            "enable_seqscan=off",
            "autovacuum=off",
        ],
    )
    conn = endpoint.connect()
    cur = conn.cursor()
    cur.execute("create extension neon")
    cur.execute("create table t(pk integer, sk integer, filler text default repeat('x',200))")
    cur.execute("set statement_timeout=0")
    # Fixed seed keeps the generated data (and thus access pattern) reproducible.
    cur.execute("select setseed(0.5)")
    cur.execute("insert into t values (generate_series(1,1000000),random()*1000000)")
    cur.execute("create index on t(sk)")
    cur.execute("vacuum t")

    # Reset the LFC by shrinking it to zero and growing it back.
    cur.execute("alter system set neon.file_cache_size_limit=0")
    cur.execute("select pg_reload_conf()")
    time.sleep(1)
    cur.execute("alter system set neon.file_cache_size_limit='1GB'")
    cur.execute("select pg_reload_conf()")

    def prefetch_expired_for_range(lo: int, hi: int, alias: str) -> int:
        """Run an index-range query over sk in [lo, hi] and return the number
        of expired (unused) prefetch requests reported by EXPLAIN ANALYZE."""
        cur.execute(
            "explain (analyze,prefetch,format json) "
            f"select sum(pk) from (select pk from t where sk between {lo} and {hi} limit 100) {alias}"
        )
        expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
        log.info(f"Unused prefetches: {expired}")
        return expired

    # Warm-up queries over distinct key ranges.
    prefetch_expired_for_range(100000, 200000, "s1")
    prefetch_expired_for_range(200000, 300000, "s2")
    prefetch_expired_for_range(300000, 400000, "s3")
    # Repeat the first query: if prefetch results are not stored in LFC, we
    # continue to send unused prefetch requests to the pageserver.
    prefetch_expired = prefetch_expired_for_range(100000, 200000, "s4")
    assert prefetch_expired > 0

    cur.execute("set neon.store_prefetch_result_in_lfc=on")

    prefetch_expired_for_range(500000, 600000, "s5")
    prefetch_expired_for_range(600000, 700000, "s6")
    prefetch_expired_for_range(700000, 800000, "s7")
    # No redundant prefetch requests when prefetch results are stored in LFC.
    prefetch_expired = prefetch_expired_for_range(500000, 600000, "s8")
    assert prefetch_expired == 0

View File

@@ -3817,3 +3817,43 @@ def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder):
nodes = env.storage_controller.node_list()
assert len(nodes) == 1
assert nodes[0]["listen_https_port"] is None
def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvBuilder):
    """
    Validate that a storage controller restart with no shards in a transient state
    performs zero reconciliations at start-up. Implicitly, this means that the location
    configs returned by the pageserver are identical to the persisted state in the
    storage controller database.
    """
    neon_env_builder.num_pageservers = 1
    neon_env_builder.storage_controller_config = {"start_as_candidate": False}

    env = neon_env_builder.init_configs()
    env.start()

    controller = env.storage_controller
    controller.tenant_create(
        TenantId.generate(), shard_count=2, tenant_config={"pitr_interval": "1h2m3s"}
    )
    controller.reconcile_until_idle()

    def ok_reconcile_count():
        # Number of reconciliations that completed successfully so far.
        return controller.get_metric_value(
            "storage_controller_reconcile_complete_total", filter={"status": "ok"}
        )

    # Initial tenant creation must have triggered at least one reconciliation.
    assert ok_reconcile_count() != 0

    controller.stop()
    controller.start()
    controller.reconcile_until_idle()

    # A clean restart should not need to reconcile anything.
    assert ok_reconcile_count() == 0