Compare commits

..

5 Commits

Author SHA1 Message Date
Heikki Linnakangas
2efad33ab9 Temporarily disable submodules check
It failed on a PR that looks totally fine:
https://github.com/neondatabase/neon/actions/runs/10993839972/job/30526595474?pr=9084

Disable it to unblock that work and other submodule changes, while we
investigate.
2024-09-23 18:52:32 +03:00
Nikita Kalyanov
f446e08fb8 change HTTP method to comply with spec (#9100)
There is discrepancy with the spec, it has PUT
2024-09-23 15:53:06 +02:00
Christian Schwarz
4d5add9ca0 compact_level0_phase1: remove final traces of value access mode config (#8935)
refs https://github.com/neondatabase/neon/issues/8184
stacked atop https://github.com/neondatabase/neon/pull/8934

This PR changes from ignoring the config field to rejecting configs that
contain it.

PR https://github.com/neondatabase/infra/pull/1903 removes the field
usage from `pageserver.toml`.

It rolls into prod sooner or in the same release as this PR.
2024-09-23 15:05:22 +02:00
Christian Schwarz
59b4c2eaf9 walredo: add a ping method (#8952)
Not used in production, but in benchmarks, to demonstrate minimal RTT.
(It would be nice to not have to copy the 8KiB of zeroes, but, that
would require larger protocol changes).

Found this useful in investigation
https://github.com/neondatabase/neon/pull/8952.
2024-09-23 10:19:37 +00:00
Vlad Lazar
5432155b0d storcon: update compute hook state on detach (#9045)
## Problem

Previously, the storage controller may send compute notifications
containing stale pageservers (i.e. pageserver serving the shard was
detached). This happened because detaches did not update the compute
hook state.

## Summary of Changes

Update compute hook state on shard detach.

Fixes #8928
2024-09-23 10:05:02 +01:00
22 changed files with 273 additions and 222 deletions

View File

@@ -1207,7 +1207,8 @@ jobs:
# Usually we do `needs: [...]`
needs:
- build-and-test-locally
- check-submodules
# XXX: Temporarily disabled, while we investigate an unexpected failure with it
#- check-submodules
- check-codestyle-python
- check-codestyle-rust
- promote-images

View File

@@ -104,9 +104,6 @@ pub struct ConfigToml {
pub image_compression: ImageCompressionAlgorithm,
pub ephemeral_bytes_per_memory_kb: usize,
pub l0_flush: Option<crate::models::L0FlushConfig>,
#[serde(skip_serializing)]
// TODO(https://github.com/neondatabase/neon/issues/8184): remove after this field is removed from all pageserver.toml's
pub compact_level0_phase1_value_access: serde::de::IgnoredAny,
pub virtual_file_direct_io: crate::models::virtual_file::DirectIoMode,
pub io_buffer_alignment: usize,
}
@@ -384,7 +381,6 @@ impl Default for ConfigToml {
image_compression: (DEFAULT_IMAGE_COMPRESSION),
ephemeral_bytes_per_memory_kb: (DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: None,
compact_level0_phase1_value_access: Default::default(),
virtual_file_direct_io: crate::models::virtual_file::DirectIoMode::default(),
io_buffer_alignment: DEFAULT_IO_BUFFER_ALIGNMENT,

View File

@@ -1,7 +1,7 @@
//! Quantify a single walredo manager's throughput under N concurrent callers.
//!
//! The benchmark implementation ([`bench_impl`]) is parametrized by
//! - `redo_work` => [`Request::short_request`] or [`Request::medium_request`]
//! - `redo_work` => an async closure that takes a `PostgresRedoManager` and performs one redo
//! - `n_redos` => number of times the benchmark shell execute the `redo_work`
//! - `nclients` => number of clients (more on this shortly).
//!
@@ -10,7 +10,7 @@
//! Each task executes the `redo_work` `n_redos/nclients` times.
//!
//! We exercise the following combinations:
//! - `redo_work = short / medium``
//! - `redo_work = ping / short / medium``
//! - `nclients = [1, 2, 4, 8, 16, 32, 64, 128]`
//!
//! We let `criterion` determine the `n_redos` using `iter_custom`.
@@ -27,33 +27,43 @@
//!
//! # Reference Numbers
//!
//! 2024-04-15 on i3en.3xlarge
//! 2024-09-18 on im4gn.2xlarge
//!
//! ```text
//! short/1 time: [24.584 µs 24.737 µs 24.922 µs]
//! short/2 time: [33.479 µs 33.660 µs 33.888 µs]
//! short/4 time: [42.713 µs 43.046 µs 43.440 µs]
//! short/8 time: [71.814 µs 72.478 µs 73.240 µs]
//! short/16 time: [132.73 µs 134.45 µs 136.22 µs]
//! short/32 time: [258.31 µs 260.73 µs 263.27 µs]
//! short/64 time: [511.61 µs 514.44 µs 517.51 µs]
//! short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
//! medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
//! medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
//! medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
//! medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
//! medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
//! medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
//! medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
//! medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
//! ping/1 time: [21.789 µs 21.918 µs 22.078 µs]
//! ping/2 time: [27.686 µs 27.812 µs 27.970 µs]
//! ping/4 time: [35.468 µs 35.671 µs 35.926 µs]
//! ping/8 time: [59.682 µs 59.987 µs 60.363 µs]
//! ping/16 time: [101.79 µs 102.37 µs 103.08 µs]
//! ping/32 time: [184.18 µs 185.15 µs 186.36 µs]
//! ping/64 time: [349.86 µs 351.45 µs 353.47 µs]
//! ping/128 time: [684.53 µs 687.98 µs 692.17 µs]
//! short/1 time: [31.833 µs 32.126 µs 32.428 µs]
//! short/2 time: [35.558 µs 35.756 µs 35.992 µs]
//! short/4 time: [44.850 µs 45.138 µs 45.484 µs]
//! short/8 time: [65.985 µs 66.379 µs 66.853 µs]
//! short/16 time: [127.06 µs 127.90 µs 128.87 µs]
//! short/32 time: [252.98 µs 254.70 µs 256.73 µs]
//! short/64 time: [497.13 µs 499.86 µs 503.26 µs]
//! short/128 time: [987.46 µs 993.45 µs 1.0004 ms]
//! medium/1 time: [137.91 µs 138.55 µs 139.35 µs]
//! medium/2 time: [192.00 µs 192.91 µs 194.07 µs]
//! medium/4 time: [389.62 µs 391.55 µs 394.01 µs]
//! medium/8 time: [776.80 µs 780.33 µs 784.77 µs]
//! medium/16 time: [1.5323 ms 1.5383 ms 1.5459 ms]
//! medium/32 time: [3.0120 ms 3.0226 ms 3.0350 ms]
//! medium/64 time: [5.7405 ms 5.7787 ms 5.8166 ms]
//! medium/128 time: [10.412 ms 10.574 ms 10.718 ms]
//! ```
use anyhow::Context;
use bytes::{Buf, Bytes};
use criterion::{BenchmarkId, Criterion};
use once_cell::sync::Lazy;
use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
use pageserver_api::{key::Key, shard::TenantShardId};
use std::{
future::Future,
sync::Arc,
time::{Duration, Instant},
};
@@ -61,40 +71,59 @@ use tokio::{sync::Barrier, task::JoinSet};
use utils::{id::TenantId, lsn::Lsn};
fn bench(c: &mut Criterion) {
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("short");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::short_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
}
}
{
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group("medium");
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
let redo_work = Arc::new(Request::medium_input());
b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
},
);
}
macro_rules! bench_group {
($name:expr, $redo_work:expr) => {{
let name: &str = $name;
let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
for nclients in nclients {
let mut group = c.benchmark_group(name);
group.bench_with_input(
BenchmarkId::from_parameter(nclients),
&nclients,
|b, nclients| {
b.iter_custom(|iters| bench_impl($redo_work, iters, *nclients));
},
);
}
}};
}
//
// benchmark the protocol implementation
//
let pg_version = 14;
bench_group!(
"ping",
Arc::new(move |mgr: Arc<PostgresRedoManager>| async move {
let _: () = mgr.ping(pg_version).await.unwrap();
})
);
//
// benchmarks with actual record redo
//
let make_redo_work = |req: &'static Request| {
Arc::new(move |mgr: Arc<PostgresRedoManager>| async move {
let page = req.execute(&mgr).await.unwrap();
assert_eq!(page.remaining(), 8192);
})
};
bench_group!("short", {
static REQUEST: Lazy<Request> = Lazy::new(Request::short_input);
make_redo_work(&REQUEST)
});
bench_group!("medium", {
static REQUEST: Lazy<Request> = Lazy::new(Request::medium_input);
make_redo_work(&REQUEST)
});
}
criterion::criterion_group!(benches, bench);
criterion::criterion_main!(benches);
// Returns the sum of each client's wall-clock time spent executing their share of the n_redos.
fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration {
fn bench_impl<F, Fut>(redo_work: Arc<F>, n_redos: u64, nclients: u64) -> Duration
where
F: Fn(Arc<PostgresRedoManager>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
@@ -135,17 +164,20 @@ fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration
})
}
async fn client(
async fn client<F, Fut>(
mgr: Arc<PostgresRedoManager>,
start: Arc<Barrier>,
redo_work: Arc<Request>,
redo_work: Arc<F>,
n_redos: u64,
) -> Duration {
) -> Duration
where
F: Fn(Arc<PostgresRedoManager>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
start.wait().await;
let start = Instant::now();
for _ in 0..n_redos {
let page = redo_work.execute(&mgr).await.unwrap();
assert_eq!(page.remaining(), 8192);
redo_work(Arc::clone(&mgr)).await;
// The real pageserver will rarely if ever do 2 walredos in a row without
// yielding to the executor.
tokio::task::yield_now().await;

View File

@@ -432,7 +432,7 @@ impl Client {
self.mgmt_api_endpoint
);
self.request(Method::POST, &uri, req)
self.request(Method::PUT, &uri, req)
.await?
.json()
.await

View File

@@ -324,7 +324,6 @@ impl PageServerConf {
max_vectored_read_bytes,
image_compression,
ephemeral_bytes_per_memory_kb,
compact_level0_phase1_value_access: _,
l0_flush,
virtual_file_direct_io,
concurrent_tenant_warmup,
@@ -535,16 +534,6 @@ mod tests {
.expect("parse_and_validate");
}
#[test]
fn test_compactl0_phase1_access_mode_is_ignored_silently() {
let input = indoc::indoc! {r#"
[compact_level0_phase1_value_access]
mode = "streaming-kmerge"
validate = "key-lsn-value"
"#};
toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input).unwrap();
}
/// If there's a typo in the pageserver config, we'd rather catch that typo
/// and fail pageserver startup than silently ignoring the typo, leaving whoever
/// made it in the believe that their config change is effective.

View File

@@ -2955,7 +2955,7 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/preserve_initdb_archive",
|r| api_handler(r, timeline_preserve_initdb_handler),
)
.post(
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/archival_config",
|r| api_handler(r, timeline_archival_config_handler),
)

View File

@@ -205,6 +205,22 @@ impl PostgresRedoManager {
}
}
/// Do a ping request-response roundtrip.
///
/// Not used in production, but by Rust benchmarks.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
pub async fn ping(&self, pg_version: u32) -> Result<(), Error> {
self.do_with_walredo_process(pg_version, |proc| async move {
proc.ping(Duration::from_secs(1))
.await
.map_err(Error::Other)
})
.await
}
pub fn status(&self) -> WalRedoManagerStatus {
WalRedoManagerStatus {
last_redo_at: {
@@ -297,6 +313,9 @@ impl PostgresRedoManager {
}
}
/// # Cancel-Safety
///
/// This method is cancel-safe iff `closure` is cancel-safe.
async fn do_with_walredo_process<
F: FnOnce(Arc<Process>) -> Fut,
Fut: Future<Output = Result<O, Error>>,
@@ -537,6 +556,17 @@ mod tests {
use tracing::Instrument;
use utils::{id::TenantId, lsn::Lsn};
#[tokio::test]
async fn test_ping() {
let h = RedoHarness::new().unwrap();
h.manager
.ping(14)
.instrument(h.span())
.await
.expect("ping should work");
}
#[tokio::test]
async fn short_v14_redo() {
let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();

View File

@@ -6,6 +6,7 @@ use self::no_leak_child::NoLeakChild;
use crate::{
config::PageServerConf,
metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
page_cache::PAGE_SZ,
span::debug_assert_current_span_has_tenant_id,
walrecord::NeonWalRecord,
};
@@ -237,6 +238,26 @@ impl WalRedoProcess {
res
}
/// Do a ping request-response roundtrip.
///
/// Not used in production, but by Rust benchmarks.
pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
let mut writebuf: Vec<u8> = Vec::with_capacity(4);
protocol::build_ping_msg(&mut writebuf);
let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
else {
anyhow::bail!("WAL redo ping timed out");
};
let response = res?;
if response.len() != PAGE_SZ {
anyhow::bail!(
"WAL redo ping response should respond with page-sized response: {}",
response.len()
);
}
Ok(())
}
/// # Cancel-Safety
///
/// When not polled to completion (e.g. because in `tokio::select!` another

View File

@@ -55,3 +55,8 @@ pub(crate) fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
tag.ser_into(buf)
.expect("serialize BufferTag should always succeed");
}
pub(crate) fn build_ping_msg(buf: &mut Vec<u8>) {
buf.put_u8(b'H');
buf.put_u32(4);
}

View File

@@ -23,7 +23,7 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql neon--1.4--1.5.sql neon--1.5--1.4.sql
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \

View File

@@ -1263,7 +1263,7 @@ approximate_working_set_size_seconds(PG_FUNCTION_ARGS)
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration, 1.0);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
@@ -1280,7 +1280,7 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
int32 dc;
bool reset = PG_GETARG_BOOL(0);
LWLockAcquire(lfc_lock, reset ? LW_EXCLUSIVE : LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1, 1.0);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, (time_t)-1);
if (reset)
memset(lfc_ctl->wss_estimation.regs, 0, sizeof lfc_ctl->wss_estimation.regs);
LWLockRelease(lfc_lock);
@@ -1288,21 +1288,3 @@ approximate_working_set_size(PG_FUNCTION_ARGS)
}
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(approximate_optimal_cache_size);
Datum
approximate_optimal_cache_size(PG_FUNCTION_ARGS)
{
if (lfc_size_limit != 0)
{
int32 dc;
time_t duration = PG_ARGISNULL(0) ? (time_t)-1 : PG_GETARG_INT32(0);
double min_hit_ratio = PG_ARGISNULL(1) ? 1.0 : PG_GETARG_FLOAT8(1);
LWLockAcquire(lfc_lock, LW_SHARED);
dc = (int32) estimateSHLL(&lfc_ctl->wss_estimation, duration, min_hit_ratio);
LWLockRelease(lfc_lock);
PG_RETURN_INT32(dc);
}
PG_RETURN_NULL();
}

View File

@@ -6,7 +6,7 @@
* Portions Copyright (c) 2014-2023, PostgreSQL Global Development Group
*
* Implements https://hal.science/hal-00465313/document
*
*
* Based on Hideaki Ohno's C++ implementation. This is probably not ideally
* suited to estimating the cardinality of very large sets; in particular, we
* have not attempted to further optimize the implementation as described in
@@ -126,69 +126,22 @@ addSHLL(HyperLogLogState *cState, uint32 hash)
/* Compute the rank of the remaining 32 - "k" (registerWidth) bits */
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS);
if (cState->regs[index][count].ts)
{
/* update histgoram */
int64_t delta = (now - cState->regs[index][count].ts)/USECS_PER_SEC;
uint32_t new_histogram[HIST_SIZE] = {0};
for (int i = 0; i < HIST_SIZE; i++) {
/* Use middle point of interval */
uint32 interval_log2 = pg_ceil_log2_32((delta + (HIST_MIN_INTERVAL*((1<<i) + ((1<<i)/2))/2)) / HIST_MIN_INTERVAL);
uint32 cell = Min(interval_log2, HIST_SIZE-1);
new_histogram[cell] += cState->regs[index][count].histogram[i];
}
memcpy(cState->regs[index][count].histogram, new_histogram, sizeof new_histogram);
}
cState->regs[index][count].ts = now;
cState->regs[index][count].histogram[0] += 1; // most recent access always goes to first histogram backet
}
static uint32_t
getAccessCount(const HyperLogLogRegister* reg, time_t duration)
{
uint32_t count = 0;
/* Simplest solution is to take in account all points fro overlapped interval */
for (size_t i = 0; i < HIST_SIZE && HIST_MIN_INTERVAL*((1 << i)/2) <= duration; i++) {
count += reg->histogram[i];
}
return count;
cState->regs[index][count] = now;
}
static uint8
getMaximum(const HyperLogLogRegister* reg, TimestampTz since, time_t duration, double min_hit_ratio)
getMaximum(const TimestampTz* reg, TimestampTz since)
{
uint8 max = 0;
size_t i, j;
if (min_hit_ratio == 1.0)
for (size_t i = 0; i < HLL_C_BITS + 1; i++)
{
for (i = 0; i < HLL_C_BITS + 1; i++)
if (reg[i] >= since)
{
if (reg[i].ts >= since)
{
max = i;
}
}
}
else
{
uint32_t total_count = 0;
for (i = 0; i < HLL_C_BITS + 1; i++)
{
total_count += getAccessCount(&reg[i], duration);
}
if (total_count != 0)
{
const double threshold = total_count * (1 - min_hit_ratio);
for (i = 0; i < HLL_C_BITS + 1; i++)
{
// Take in account only bits with access frequncy exceeding maximal miss rate (1 - hit rate)
if (reg[i].ts >= since && getAccessCount(&reg[i], duration) >= threshold)
{
max = i;
}
}
max = i;
}
}
return max;
}
@@ -197,7 +150,7 @@ getMaximum(const HyperLogLogRegister* reg, TimestampTz since, time_t duration, d
* Estimates cardinality, based on elements added so far
*/
double
estimateSHLL(HyperLogLogState *cState, time_t duration, double min_hit_ratio)
estimateSHLL(HyperLogLogState *cState, time_t duration)
{
double result;
double sum = 0.0;
@@ -208,7 +161,7 @@ estimateSHLL(HyperLogLogState *cState, time_t duration, double min_hit_ratio)
for (i = 0; i < HLL_N_REGISTERS; i++)
{
R[i] = getMaximum(cState->regs[i], since, duration, min_hit_ratio);
R[i] = getMaximum(cState->regs[i], since);
sum += 1.0 / pow(2.0, R[i]);
}

View File

@@ -53,14 +53,6 @@
#define HLL_C_BITS (32 - HLL_BIT_WIDTH)
#define HLL_N_REGISTERS (1 << HLL_BIT_WIDTH)
/*
* Number of histogram cells. We use exponential histogram with first interval
* equals to one minutes. Autoscaler request LFC statistic with intervals 1,2,...,60 minutes
* so 2^8=64 seems to be enough for our needs.
*/
#define HIST_SIZE 8
#define HIST_MIN_INTERVAL 60 /* seconds */
/*
* HyperLogLog is an approximate technique for computing the number of distinct
* entries in a set. Importantly, it does this by using a fixed amount of
@@ -77,21 +69,18 @@
* modified timestamp >= the query timestamp. This value is the number of bits
* for this register in the normal HLL calculation.
*
* The memory usage is 2^B * (C + 1) * sizeof(HyperLogLogRegister), or 920kiB.
* The memory usage is 2^B * (C + 1) * sizeof(TimetampTz), or 184kiB.
* Usage could be halved if we decide to reduce the required time dimension
* precision; as 32 bits in second precision should be enough for statistics.
* However, that is not yet implemented.
*/
typedef struct
{
TimestampTz ts; /* last access timestamp */
uint32_t histogram[HIST_SIZE]; /* access counter exponential histogram */
} HyperLogLogRegister;
typedef struct HyperLogLogState
{
HyperLogLogRegister regs[HLL_N_REGISTERS][HLL_C_BITS + 1];
TimestampTz regs[HLL_N_REGISTERS][HLL_C_BITS + 1];
} HyperLogLogState;
extern void initSHLL(HyperLogLogState *cState);
extern void addSHLL(HyperLogLogState *cState, uint32 hash);
extern double estimateSHLL(HyperLogLogState *cState, time_t dutration, double min_hit_ratio);
extern double estimateSHLL(HyperLogLogState *cState, time_t dutration);
#endif

View File

@@ -1,10 +0,0 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.5'" to load this file. \quit
-- returns minimal LFC cache size (in 8kb pages) provided specified hit rate
CREATE FUNCTION approximate_optimal_cache_size(duration_sec integer default null, min_hit_ration float8 default null)
RETURNS integer
AS 'MODULE_PATHNAME', 'approximate_optimal_cache_size'
LANGUAGE C PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION approximate_optimal_cache_size(integer,float8) TO pg_monitor;

View File

@@ -1 +0,0 @@
DROP FUNCTION IF EXISTS approximate_optimal_cache_size(integer,float8) CASCADE;

View File

@@ -24,6 +24,7 @@
* PushPage ('P'): Copy a page image (in the payload) to buffer cache
* ApplyRecord ('A'): Apply a WAL record (in the payload)
* GetPage ('G'): Return a page image from buffer cache.
* Ping ('H'): Return the input message.
*
* Currently, you only get a response to GetPage requests; the response is
* simply a 8k page, without any headers. Errors are logged to stderr.
@@ -133,6 +134,7 @@ static void ApplyRecord(StringInfo input_message);
static void apply_error_callback(void *arg);
static bool redo_block_filter(XLogReaderState *record, uint8 block_id);
static void GetPage(StringInfo input_message);
static void Ping(StringInfo input_message);
static ssize_t buffered_read(void *buf, size_t count);
static void CreateFakeSharedMemoryAndSemaphores();
@@ -394,6 +396,10 @@ WalRedoMain(int argc, char *argv[])
GetPage(&input_message);
break;
case 'H': /* Ping */
Ping(&input_message);
break;
/*
* EOF means we're done. Perform normal shutdown.
*/
@@ -1057,6 +1063,36 @@ GetPage(StringInfo input_message)
}
static void
Ping(StringInfo input_message)
{
int tot_written;
/* Response: the input message */
tot_written = 0;
do {
ssize_t rc;
/* We don't need alignment, but it's bad practice to use char[BLCKSZ] */
#if PG_VERSION_NUM >= 160000
static const PGIOAlignedBlock response;
#else
static const PGAlignedBlock response;
#endif
rc = write(STDOUT_FILENO, &response.data[tot_written], BLCKSZ - tot_written);
if (rc < 0) {
/* If interrupted by signal, just retry */
if (errno == EINTR)
continue;
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to stdout: %m")));
}
tot_written += rc;
} while (tot_written < BLCKSZ);
elog(TRACE, "Page sent back for ping");
}
/* Buffer used by buffered_read() */
static char stdin_buf[16 * 1024];
static size_t stdin_len = 0; /* # of bytes in buffer */

View File

@@ -71,6 +71,37 @@ impl ComputeHookTenant {
}
}
fn is_sharded(&self) -> bool {
matches!(self, ComputeHookTenant::Sharded(_))
}
/// Clear compute hook state for the specified shard.
/// Only valid for [`ComputeHookTenant::Sharded`] instances.
fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
match self {
ComputeHookTenant::Sharded(sharded) => {
if sharded.stripe_size != stripe_size
|| sharded.shard_count != tenant_shard_id.shard_count
{
tracing::warn!("Shard split detected while handling detach")
}
let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
*shard_number == tenant_shard_id.shard_number
});
if let Some(shard_idx) = shard_idx {
sharded.shards.remove(shard_idx);
} else {
tracing::warn!("Shard not found while handling detach")
}
}
ComputeHookTenant::Unsharded(_) => {
unreachable!("Detach of unsharded tenants is handled externally");
}
}
}
/// Set one shard's location. If stripe size or shard count have changed, Self is reset
/// and drops existing content.
fn update(
@@ -614,6 +645,36 @@ impl ComputeHook {
self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
.await
}
/// Reflect a detach for a particular shard in the compute hook state.
///
/// The goal is to avoid sending compute notifications with stale information (i.e.
/// including detach pageservers).
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
pub(super) fn handle_detach(
&self,
tenant_shard_id: TenantShardId,
stripe_size: ShardStripeSize,
) {
use std::collections::hash_map::Entry;
let mut state_locked = self.state.lock().unwrap();
match state_locked.entry(tenant_shard_id.tenant_id) {
Entry::Vacant(_) => {
tracing::warn!("Compute hook tenant not found for detach");
}
Entry::Occupied(mut e) => {
let sharded = e.get().is_sharded();
if !sharded {
e.remove();
} else {
e.get_mut().remove_shard(tenant_shard_id, stripe_size);
}
tracing::debug!("Compute hook handled shard detach");
}
}
}
}
#[cfg(test)]

View File

@@ -1849,7 +1849,7 @@ pub fn make_router(
RequestName("v1_tenant_timeline"),
)
})
.post(
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/archival_config",
|r| {
tenant_service_handler(

View File

@@ -238,7 +238,7 @@ impl PageserverClient {
) -> Result<()> {
measured_request!(
"timeline_archival_config",
crate::metrics::Method::Post,
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.timeline_archival_config(tenant_shard_id, timeline_id, req)

View File

@@ -820,6 +820,16 @@ impl Reconciler {
self.location_config(&node, conf, None, false).await?;
}
// The condition below identifies a detach. We must have no attached intent and
// must have been attached to something previously. Pass this information to
// the [`ComputeHook`] such that it can update its tenant-wide state.
if self.intent.attached.is_none() && !self.detach.is_empty() {
// TODO: Consider notifying control plane about detaches. This would avoid situations
// where the compute tries to start-up with a stale set of pageservers.
self.compute_hook
.handle_detach(self.tenant_shard_id, self.shard.stripe_size);
}
failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
Ok(())

View File

@@ -631,7 +631,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
log.info(
f"requesting timeline archival config {config} for tenant {tenant_id} and timeline {timeline_id}"
)
res = self.post(
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/archival_config",
json=config,
)

View File

@@ -114,46 +114,3 @@ def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
assert estimation_1k >= 20 and estimation_1k <= 40
assert estimation_10k >= 200 and estimation_10k <= 400
def test_optimal_cache_size_approximation(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start(
branch_name="main",
config_lines=[
"autovacuum = off",
"shared_buffers=1MB",
"neon.max_file_cache_size=256MB",
"neon.file_cache_size_limit=245MB",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon version '1.5'")
cur.execute(
"create table t_huge(pk integer primary key, count integer default 0, payload text default repeat('?', 128))"
)
cur.execute(
"create table t_small(pk integer primary key, count integer default 0, payload text default repeat('?', 128))"
)
cur.execute(
"insert into t_huge(pk) values (generate_series(1,1000000))"
) # table size is 21277 pages
cur.execute(
"insert into t_small(pk) values (generate_series(1,100000))"
) # table size is 2128 pages
time.sleep(2)
before = time.monotonic()
for _ in range(100):
cur.execute("select sum(count) from t_small")
cur.execute("select sum(count) from t_huge")
after = time.monotonic()
cur.execute(f"select approximate_working_set_size_seconds({int(after - before + 1)})")
ws_estimation = cur.fetchall()[0][0]
log.info(f"Working set size estimaton {ws_estimation}")
cur.execute(f"select approximate_optimal_cache_size({int(after - before + 1)}, 0.99)")
optimal_cache_size = cur.fetchall()[0][0]
log.info(f"Optimal cache size for 99% hit rate {optimal_cache_size}")
assert ws_estimation >= 20000 and ws_estimation <= 30000
assert optimal_cache_size >= 2000 and optimal_cache_size <= 3000