mirror of https://github.com/neondatabase/neon.git (synced 2026-05-16 12:40:36 +00:00)

Compare commits: release-79...heikki/use (10 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | a9edb99749 |  |
|  | df264380b9 |  |
|  | 4bbe75de8c |  |
|  | c0c3ed94a9 |  |
|  | b1d8771d5f |  |
|  | 3e82addd64 |  |
|  | 5e3c234edc |  |
|  | ff3819efc7 |  |
|  | f927ae6e15 |  |
|  | 61d385caea |  |
Cargo.lock (generated; 20 lines changed)
@@ -1316,6 +1316,8 @@ dependencies = [
 "flate2",
 "futures",
 "http 1.1.0",
 "hyper 1.4.1",
 "hyper-util",
 "metrics",
 "nix 0.27.1",
 "notify",
@@ -1874,6 +1876,12 @@ dependencies = [
 "syn 2.0.90",
]

[[package]]
name = "difflib"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8"

[[package]]
name = "digest"
version = "0.10.7"
@@ -3331,6 +3339,17 @@ dependencies = [
 "wasm-bindgen",
]

[[package]]
name = "json-structural-diff"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e878e36a8a44c158505c2c818abdc1350413ad83dcb774a0459f6a7ef2b65cbf"
dependencies = [
 "difflib",
 "regex",
 "serde_json",
]

[[package]]
name = "jsonwebtoken"
version = "9.2.0"
@@ -6443,6 +6462,7 @@ dependencies = [
 "humantime",
 "hyper 0.14.30",
 "itertools 0.10.5",
 "json-structural-diff",
 "lasso",
 "measured",
 "metrics",

@@ -210,6 +210,7 @@ rustls-native-certs = "0.8"
x509-parser = "0.16"
whoami = "1.5.1"
zerocopy = { version = "0.7", features = ["derive"] }
json-structural-diff = { version = "0.2.0" }

## TODO replace this with tracing
env_logger = "0.10"
@@ -395,15 +395,22 @@ RUN case "${PG_VERSION:?}" in \
    cd plv8-src && \
    if [[ "${PG_VERSION:?}" < "v17" ]]; then patch -p1 < /ext-src/plv8-3.1.10.patch; fi

FROM pg-build AS plv8-build
# Step 1: Build the vendored V8 engine. It doesn't depend on PostgreSQL, so use
# 'build-deps' as the base. This enables caching and avoids unnecessary rebuilds.
# (The V8 engine takes a very long time to build)
FROM build-deps AS plv8-build
ARG PG_VERSION
WORKDIR /ext-src/plv8-src
RUN apt update && \
    apt install --no-install-recommends --no-install-suggests -y \
    ninja-build python3-dev libncurses5 binutils clang \
    && apt clean && rm -rf /var/lib/apt/lists/*

COPY --from=plv8-src /ext-src/ /ext-src/
WORKDIR /ext-src/plv8-src
RUN make DOCKER=1 -j $(getconf _NPROCESSORS_ONLN) v8

# Step 2: Build the PostgreSQL-dependent parts
COPY --from=pg-build /usr/local/pgsql /usr/local/pgsql
ENV PATH="/usr/local/pgsql/bin:$PATH"
RUN \
    # generate and copy upgrade scripts
    make generate_upgrades && \
@@ -5,16 +5,6 @@ commands:
    user: root
    sysvInitAction: sysinit
    shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
  # restrict permissions on /neonvm/bin/resize-swap, because we grant access to compute_ctl for
  # running it as root.
  - name: chmod-resize-swap
    user: root
    sysvInitAction: sysinit
    shell: 'chmod 711 /neonvm/bin/resize-swap'
  - name: chmod-set-disk-quota
    user: root
    sysvInitAction: sysinit
    shell: 'chmod 711 /neonvm/bin/set-disk-quota'
  - name: pgbouncer
    user: postgres
    sysvInitAction: respawn
@@ -44,12 +34,9 @@ shutdownHook: |
files:
  - filename: compute_ctl-sudoers
    content: |
      # Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
      # and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
      # regardless of hostname (ALL)
      #
      # Also allow it to shut down the VM. The fast_import job does that when it's finished.
      postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
      # Allow postgres user to shut down the VM. The fast_import job does that
      # when it's finished.
      postgres ALL=(root) NOPASSWD: /neonvm/bin/poweroff
  - filename: cgconfig.conf
    content: |
      # Configuration for cgroups in VM compute nodes
@@ -111,10 +98,7 @@ merge: |
  && echo 'root - nofile 1048576' >>/etc/security/limits.conf \
  )

  # Allow postgres user (compute_ctl) to run swap resizer.
  # Need to install sudo in order to allow this.
  #
  # Also, remove the 'read' permission from group/other on /neonvm/bin/resize-swap, just to be safe.
  # Install sudo so that the fast_import can do "sudo poweroff"
  RUN set -e \
  && apt update \
  && apt install --no-install-recommends -y \
@@ -5,16 +5,6 @@ commands:
    user: root
    sysvInitAction: sysinit
    shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
  # restrict permissions on /neonvm/bin/resize-swap, because we grant access to compute_ctl for
  # running it as root.
  - name: chmod-resize-swap
    user: root
    sysvInitAction: sysinit
    shell: 'chmod 711 /neonvm/bin/resize-swap'
  - name: chmod-set-disk-quota
    user: root
    sysvInitAction: sysinit
    shell: 'chmod 711 /neonvm/bin/set-disk-quota'
  - name: pgbouncer
    user: postgres
    sysvInitAction: respawn
@@ -44,12 +34,9 @@ shutdownHook: |
files:
  - filename: compute_ctl-sudoers
    content: |
      # Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
      # and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
      # regardless of hostname (ALL)
      #
      # Also allow it to shut down the VM. The fast_import job does that when it's finished.
      postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
      # Allow postgres user to shut down the VM. The fast_import job does that
      # when it's finished.
      postgres ALL=(root) NOPASSWD: /neonvm/bin/poweroff
  - filename: cgconfig.conf
    content: |
      # Configuration for cgroups in VM compute nodes
@@ -107,10 +94,7 @@ merge: |
  && echo 'root - nofile 1048576' >>/etc/security/limits.conf \
  )

  # Allow postgres user (compute_ctl) to run swap resizer.
  # Need to install sudo in order to allow this.
  #
  # Also, remove the 'read' permission from group/other on /neonvm/bin/resize-swap, just to be safe.
  # Install sudo so that the fast_import can do "sudo poweroff"
  RUN set -e \
  && apt update \
  && apt install --no-install-recommends -y \
@@ -25,6 +25,8 @@ fail.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true
hyper-util.workspace = true
hyper.workspace = true
metrics.workspace = true
nix.workspace = true
notify.workspace = true
@@ -46,9 +46,9 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Parser;
use compute_tools::disk_quota::set_disk_quota;
use compute_tools::http::server::Server;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use compute_tools::neonvmd_client::{resize_swap, set_disk_quota};
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
use tracing::{error, info, warn};
@@ -66,7 +66,6 @@ use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::swap::resize_swap;
use rlimit::{setrlimit, Resource};
use utils::failpoint_support;

@@ -130,6 +129,7 @@ struct Cli {
    #[arg(long, action = clap::ArgAction::SetTrue)]
    pub resize_swap_on_bind: bool,

    /// This is no longer used for anything. It's kept for now just for backwards-compatibility.
    #[arg(long)]
    pub set_disk_quota_for_fs: Option<String>,

@@ -457,10 +457,8 @@ fn start_postgres(
    }

    // Set disk quota if the compute spec says so
    if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) =
        (disk_quota_bytes, cli.set_disk_quota_for_fs.as_ref())
    {
        match set_disk_quota(disk_quota_bytes, disk_quota_fs_mountpoint) {
    if let Some(disk_quota_bytes) = disk_quota_bytes {
        match set_disk_quota(disk_quota_bytes) {
            Ok(()) => {
                let size_mib = disk_quota_bytes as f32 / (1 << 20) as f32; // just for more coherent display.
                info!(%disk_quota_bytes, %size_mib, "set disk quota");
@@ -1,25 +0,0 @@
use anyhow::Context;

pub const DISK_QUOTA_BIN: &str = "/neonvm/bin/set-disk-quota";

/// If size_bytes is 0, it disables the quota. Otherwise, it sets filesystem quota to size_bytes.
/// `fs_mountpoint` should point to the mountpoint of the filesystem where the quota should be set.
pub fn set_disk_quota(size_bytes: u64, fs_mountpoint: &str) -> anyhow::Result<()> {
    let size_kb = size_bytes / 1024;
    // run `/neonvm/bin/set-disk-quota {size_kb} {mountpoint}`
    let child_result = std::process::Command::new("/usr/bin/sudo")
        .arg(DISK_QUOTA_BIN)
        .arg(size_kb.to_string())
        .arg(fs_mountpoint)
        .spawn();

    child_result
        .context("spawn() failed")
        .and_then(|mut child| child.wait().context("wait() failed"))
        .and_then(|status| match status.success() {
            true => Ok(()),
            false => Err(anyhow::anyhow!("process exited with {status}")),
        })
        // wrap any prior error with the overall context that we couldn't run the command
        .with_context(|| format!("could not run `/usr/bin/sudo {DISK_QUOTA_BIN}`"))
}
@@ -11,7 +11,6 @@ pub mod http;
pub mod logger;
pub mod catalog;
pub mod compute;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;
pub mod local_proxy;
@@ -19,9 +18,9 @@ pub mod lsn_lease;
pub mod metrics;
mod migration;
pub mod monitor;
pub mod neonvmd_client;
pub mod params;
pub mod pg_helpers;
pub mod spec;
mod spec_apply;
pub mod swap;
pub mod sync_sk;
compute_tools/src/neonvmd_client.rs (new file; 102 lines)
@@ -0,0 +1,102 @@
use anyhow::Context;
use hyper::client::conn;
use hyper::client::conn::http1::SendRequest;
use hyper::{Request, StatusCode};
use hyper_util::rt::TokioIo;
use tracing::warn;

const NEONVM_DAEMON_CONTROL_SOCKET_PATH: &str = "/run/neonvm-daemon-socket";

/// Open a connection to neonvm-daemon's control socket, prepare to send
/// requests to it with hyper.
async fn connect_neonvm_daemon<B>() -> anyhow::Result<SendRequest<B>>
where
    B: hyper::body::Body + 'static + Send,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let mut attempts = 0;
    let stream = loop {
        match tokio::net::UnixStream::connect(NEONVM_DAEMON_CONTROL_SOCKET_PATH).await {
            Ok(stream) => break stream,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound && attempts < 50 => {
                // Retry
                warn!("neonvm-daemon control socket not found, retrying...");
                attempts += 1;
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
            Err(err) => Err(err).context("opening neonvm-daemon control socket")?,
        }
    };
    let io = TokioIo::new(stream);
    let (request_sender, connection) = conn::http1::handshake(io).await.unwrap();

    // spawn a task to poll the connection and drive the HTTP state
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("Error in connection: {}", e);
        }
    });

    Ok(request_sender)
}

pub fn resize_swap(size_bytes: u64) -> anyhow::Result<()> {
    let rt = tokio::runtime::Handle::current();
    rt.block_on(resize_swap_async(size_bytes))
}

pub async fn resize_swap_async(size_bytes: u64) -> anyhow::Result<()> {
    let mut neonvmd = connect_neonvm_daemon().await?;

    // Passing 'once' causes neonvm-daemon to reject any future resize requests
    let request = Request::builder()
        .method("POST")
        .uri("/resize-swap-once")
        .header("Host", "localhost") // hyper requires Host, even though the server won't care
        .body(format!("{}", size_bytes))
        .unwrap();

    let resp = neonvmd.send_request(request).await?;
    let status = resp.status();
    match status {
        StatusCode::OK => Ok(()),
        StatusCode::CONFLICT => {
            // 409 Conflict means that the swap was already resized. That happens if the
            // compute_ctl restarts within the VM. That's considered OK.
            warn!("Swap was already resized");
            Ok(())
        }
        _ => Err(anyhow::anyhow!(
            "error resizing swap: {}",
            status.to_string()
        )),
    }
}

pub fn set_disk_quota(size_bytes: u64) -> anyhow::Result<()> {
    let rt = tokio::runtime::Handle::current();
    rt.block_on(set_disk_quota_async(size_bytes))
}

/// If size_bytes is 0, it disables the quota. Otherwise, it sets filesystem quota to size_bytes.
pub async fn set_disk_quota_async(size_bytes: u64) -> anyhow::Result<()> {
    let mut neonvmd = connect_neonvm_daemon().await?;

    let request = Request::builder()
        .method("POST")
        .uri("/set-disk-quota")
        .header("Host", "localhost") // hyper requires Host, even though the server won't care
        .body(format!("{}", size_bytes))
        .unwrap();

    let resp = neonvmd.send_request(request).await?;
    let status = resp.status();
    match status {
        StatusCode::OK => Ok(()),
        _ => Err(anyhow::anyhow!(
            "error setting disk quota: {}",
            status.to_string()
        )),
    }
}
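For orientation, here is a minimal usage sketch of the new client module from async code. This is a hypothetical caller, not part of the diff; the sizes are made-up values, and the `#[tokio::main]` wrapper stands in for however compute_ctl actually embeds these calls:

```rust
// Hypothetical usage sketch: resize swap to 1 GiB and set a 10 GiB disk
// quota through neonvm-daemon's control socket, using the async entry
// points from the new compute_tools::neonvmd_client module above.
use compute_tools::neonvmd_client::{resize_swap_async, set_disk_quota_async};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    resize_swap_async(1 << 30).await?;      // 1 GiB of swap
    set_disk_quota_async(10 << 30).await?;  // 10 GiB quota; 0 would disable it
    Ok(())
}
```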
@@ -7,12 +7,12 @@ use std::sync::Arc;

use crate::compute::construct_superuser_query;
use crate::pg_helpers::{escape_literal, DatabaseExt, Escaping, GenericOptionsSearch, RoleExt};
use anyhow::{bail, Result};
use anyhow::Result;
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use tracing::{debug, info_span, Instrument};
use tracing::{debug, info_span, warn, Instrument};

#[derive(Clone)]
pub enum DB {
@@ -47,6 +47,11 @@ pub enum PerDatabasePhase {
    DeleteDBRoleReferences,
    ChangeSchemaPerms,
    HandleAnonExtension,
    /// This is a shared phase, used for both i) dropping dangling LR subscriptions
    /// before dropping the DB, and ii) dropping all subscriptions after creating
    /// a fresh branch.
    /// N.B. we will skip all DBs that are not present in Postgres, invalid, or
    /// have `datallowconn = false` (`restrict_conn`).
    DropLogicalSubscriptions,
}

@@ -168,7 +173,7 @@ where
///
/// In the future we may generate a single stream of changes and then
/// sort/merge/batch execution, but for now this is a nice way to improve
/// batching behaviour of the commands.
/// batching behavior of the commands.
async fn get_operations<'a>(
    spec: &'a ComputeSpec,
    ctx: &'a RwLock<MutableApplyContext>,
@@ -451,6 +456,38 @@ async fn get_operations<'a>(
    )),
}))),
ApplySpecPhase::RunInEachDatabase { db, subphase } => {
    // Check that the user DB exists and that we can access it.
    //
    // During phases like DropLogicalSubscriptions and DeleteDBRoleReferences,
    // which happen before dropping the DB, the current run could be a retry,
    // so it's a valid case when the DB is absent already. The case of
    // `pg_database.datallowconn = false`/`restrict_conn` is a bit tricky, as
    // in theory the user can have some dangling objects there, so we will fail
    // at the actual drop later. Yet, to fix that in the current code we would
    // need to ALTER DATABASE, and then check back, but that is even more
    // invasive, so that's not what we really want to do here.
    //
    // For ChangeSchemaPerms, skipping DBs we cannot access is totally fine.
    if let DB::UserDB(db) = db {
        let databases = &ctx.read().await.dbs;

        let edb = match databases.get(&db.name) {
            Some(edb) => edb,
            None => {
                warn!("skipping RunInEachDatabase phase {:?}, database {} doesn't exist in PostgreSQL", subphase, db.name);
                return Ok(Box::new(empty()));
            }
        };

        if edb.restrict_conn || edb.invalid {
            warn!(
                "skipping RunInEachDatabase phase {:?}, database {} is (restrict_conn={}, invalid={})",
                subphase, db.name, edb.restrict_conn, edb.invalid
            );
            return Ok(Box::new(empty()));
        }
    }

    match subphase {
        PerDatabasePhase::DropLogicalSubscriptions => {
            match &db {
@@ -530,25 +567,12 @@ async fn get_operations<'a>(
            Ok(Box::new(operations))
        }
        PerDatabasePhase::ChangeSchemaPerms => {
            let ctx = ctx.read().await;
            let databases = &ctx.dbs;

            let db = match &db {
                // ignore schema permissions on the system database
                DB::SystemDB => return Ok(Box::new(empty())),
                DB::UserDB(db) => db,
            };

            if databases.get(&db.name).is_none() {
                bail!("database {} doesn't exist in PostgreSQL", db.name);
            }

            let edb = databases.get(&db.name).unwrap();

            if edb.restrict_conn || edb.invalid {
                return Ok(Box::new(empty()));
            }

            let operations = vec![
                Operation {
                    query: format!(
@@ -566,6 +590,7 @@ async fn get_operations<'a>(

            Ok(Box::new(operations))
        }
        // TODO: remove this completely https://github.com/neondatabase/cloud/issues/22663
        PerDatabasePhase::HandleAnonExtension => {
            // Only install Anon into user databases
            let db = match &db {
@@ -1,45 +0,0 @@
use std::path::Path;

use anyhow::{anyhow, Context};
use tracing::warn;

pub const RESIZE_SWAP_BIN: &str = "/neonvm/bin/resize-swap";

pub fn resize_swap(size_bytes: u64) -> anyhow::Result<()> {
    // run `/neonvm/bin/resize-swap --once {size_bytes}`
    //
    // Passing '--once' causes resize-swap to delete itself after successful completion, which
    // means that if compute_ctl restarts later, we won't end up calling 'swapoff' while
    // postgres is running.
    //
    // NOTE: resize-swap is not very clever. If present, --once MUST be the first arg.
    let child_result = std::process::Command::new("/usr/bin/sudo")
        .arg(RESIZE_SWAP_BIN)
        .arg("--once")
        .arg(size_bytes.to_string())
        .spawn();

    child_result
        .context("spawn() failed")
        .and_then(|mut child| child.wait().context("wait() failed"))
        .and_then(|status| match status.success() {
            true => Ok(()),
            false => {
                // The command failed. Maybe it was because the resize-swap file doesn't exist?
                // The --once flag causes it to delete itself on success so we don't disable swap
                // while postgres is running; maybe this is fine.
                match Path::new(RESIZE_SWAP_BIN).try_exists() {
                    Err(_) | Ok(true) => Err(anyhow!("process exited with {status}")),
                    // The path doesn't exist; we're actually ok
                    Ok(false) => {
                        warn!("ignoring \"not found\" error from resize-swap to avoid swapoff while compute is running");
                        Ok(())
                    },
                }
            }
        })
        // wrap any prior error with the overall context that we couldn't run the command
        .with_context(|| {
            format!("could not run `/usr/bin/sudo {RESIZE_SWAP_BIN} --once {size_bytes}`")
        })
}
@@ -335,13 +335,21 @@ impl PageServerNode {
    .map(|x| x.parse::<u64>())
    .transpose()
    .context("Failed to parse 'checkpoint_distance' as an integer")?,
checkpoint_timeout: settings.remove("checkpoint_timeout").map(|x| x.to_string()),
checkpoint_timeout: settings
    .remove("checkpoint_timeout")
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'checkpoint_timeout' as duration")?,
compaction_target_size: settings
    .remove("compaction_target_size")
    .map(|x| x.parse::<u64>())
    .transpose()
    .context("Failed to parse 'compaction_target_size' as an integer")?,
compaction_period: settings.remove("compaction_period").map(|x| x.to_string()),
compaction_period: settings
    .remove("compaction_period")
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'compaction_period' as duration")?,
compaction_threshold: settings
    .remove("compaction_threshold")
    .map(|x| x.parse::<usize>())
@@ -387,7 +395,10 @@ impl PageServerNode {
    .map(|x| x.parse::<u64>())
    .transpose()
    .context("Failed to parse 'gc_horizon' as an integer")?,
gc_period: settings.remove("gc_period").map(|x| x.to_string()),
gc_period: settings.remove("gc_period")
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'gc_period' as duration")?,
image_creation_threshold: settings
    .remove("image_creation_threshold")
    .map(|x| x.parse::<usize>())
@@ -403,13 +414,20 @@ impl PageServerNode {
    .map(|x| x.parse::<usize>())
    .transpose()
    .context("Failed to parse 'image_creation_preempt_threshold' as integer")?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
pitr_interval: settings.remove("pitr_interval")
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'pitr_interval' as duration")?,
walreceiver_connect_timeout: settings
    .remove("walreceiver_connect_timeout")
    .map(|x| x.to_string()),
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'walreceiver_connect_timeout' as duration")?,
lagging_wal_timeout: settings
    .remove("lagging_wal_timeout")
    .map(|x| x.to_string()),
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'lagging_wal_timeout' as duration")?,
max_lsn_wal_lag: settings
    .remove("max_lsn_wal_lag")
    .map(|x| x.parse::<NonZeroU64>())
@@ -427,8 +445,14 @@ impl PageServerNode {
    .context("Failed to parse 'min_resident_size_override' as integer")?,
evictions_low_residence_duration_metric_threshold: settings
    .remove("evictions_low_residence_duration_metric_threshold")
    .map(|x| x.to_string()),
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'evictions_low_residence_duration_metric_threshold' as duration")?,
heatmap_period: settings
    .remove("heatmap_period")
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'heatmap_period' as duration")?,
lazy_slru_download: settings
    .remove("lazy_slru_download")
    .map(|x| x.parse::<bool>())
@@ -439,10 +463,15 @@ impl PageServerNode {
    .map(serde_json::from_str)
    .transpose()
    .context("parse `timeline_get_throttle` from json")?,
lsn_lease_length: settings.remove("lsn_lease_length").map(|x| x.to_string()),
lsn_lease_length: settings.remove("lsn_lease_length")
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'lsn_lease_length' as duration")?,
lsn_lease_length_for_ts: settings
    .remove("lsn_lease_length_for_ts")
    .map(|x| x.to_string()),
    .map(humantime::parse_duration)
    .transpose()
    .context("Failed to parse 'lsn_lease_length_for_ts' as duration")?,
timeline_offloading: settings
    .remove("timeline_offloading")
    .map(|x| x.parse::<bool>())
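The recurring shape in this hunk is `remove(..) -> Option<String>`, parse each present value, then `transpose()` so that `?` surfaces a parse error while an absent setting stays `None`. A self-contained sketch of that pattern, assuming a plain `HashMap<String, String>` for the settings (the real type may differ):

```rust
use std::collections::HashMap;
use std::time::Duration;

use anyhow::Context;

// Sketch of the Option<String> -> Option<Duration> parsing used above.
fn take_duration(
    settings: &mut HashMap<String, String>,
    key: &str,
) -> anyhow::Result<Option<Duration>> {
    settings
        .remove(key)                             // Option<String>
        .map(|v| humantime::parse_duration(&v))  // Option<Result<Duration, _>>
        .transpose()                             // Result<Option<Duration>, _>
        .with_context(|| format!("Failed to parse '{key}' as duration"))
}

fn main() -> anyhow::Result<()> {
    let mut settings = HashMap::from([("gc_period".to_string(), "15m".to_string())]);
    assert_eq!(
        take_duration(&mut settings, "gc_period")?,
        Some(Duration::from_secs(900))
    );
    // A key that is absent (here: already consumed) parses to None, not an error.
    assert_eq!(take_duration(&mut settings, "gc_period")?, None);
    Ok(())
}
```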
@@ -959,7 +959,7 @@ async fn main() -> anyhow::Result<()> {
            threshold: threshold.into(),
        },
    )),
    heatmap_period: Some("300s".to_string()),
    heatmap_period: Some(Duration::from_secs(300)),
    ..Default::default()
},
})
@@ -526,9 +526,13 @@ pub struct TenantConfigPatch {
#[derive(Serialize, Deserialize, Debug, Default, Clone, Eq, PartialEq)]
pub struct TenantConfig {
    pub checkpoint_distance: Option<u64>,
    pub checkpoint_timeout: Option<String>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub checkpoint_timeout: Option<Duration>,
    pub compaction_target_size: Option<u64>,
    pub compaction_period: Option<String>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub compaction_period: Option<Duration>,
    pub compaction_threshold: Option<usize>,
    pub compaction_upper_limit: Option<usize>,
    // defer parsing compaction_algorithm, like eviction_policy
@@ -539,22 +543,38 @@ pub struct TenantConfig {
    pub l0_flush_stall_threshold: Option<usize>,
    pub l0_flush_wait_upload: Option<bool>,
    pub gc_horizon: Option<u64>,
    pub gc_period: Option<String>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub gc_period: Option<Duration>,
    pub image_creation_threshold: Option<usize>,
    pub pitr_interval: Option<String>,
    pub walreceiver_connect_timeout: Option<String>,
    pub lagging_wal_timeout: Option<String>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub pitr_interval: Option<Duration>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub walreceiver_connect_timeout: Option<Duration>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub lagging_wal_timeout: Option<Duration>,
    pub max_lsn_wal_lag: Option<NonZeroU64>,
    pub eviction_policy: Option<EvictionPolicy>,
    pub min_resident_size_override: Option<u64>,
    pub evictions_low_residence_duration_metric_threshold: Option<String>,
    pub heatmap_period: Option<String>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub evictions_low_residence_duration_metric_threshold: Option<Duration>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub heatmap_period: Option<Duration>,
    pub lazy_slru_download: Option<bool>,
    pub timeline_get_throttle: Option<ThrottleConfig>,
    pub image_layer_creation_check_threshold: Option<u8>,
    pub image_creation_preempt_threshold: Option<usize>,
    pub lsn_lease_length: Option<String>,
    pub lsn_lease_length_for_ts: Option<String>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub lsn_lease_length: Option<Duration>,
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    pub lsn_lease_length_for_ts: Option<Duration>,
    pub timeline_offloading: Option<bool>,
    pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
    pub rel_size_v2_enabled: Option<bool>,
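The `#[serde(default)]` plus `#[serde(with = "humantime_serde")]` pairing keeps the wire format a human-readable string while the struct holds a typed `Duration`; `default` is what lets a missing field deserialize as `None`. A standalone round-trip sketch of that attribute combination (an illustrative struct, not the real `TenantConfig`):

```rust
use std::time::Duration;

use serde::{Deserialize, Serialize};

// Standalone illustration of the attribute pattern used on TenantConfig.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Timeouts {
    #[serde(default)]
    #[serde(with = "humantime_serde")]
    checkpoint_timeout: Option<Duration>,
}

fn main() {
    // "10m" on the wire becomes a typed Duration in the struct...
    let t: Timeouts = serde_json::from_str(r#"{"checkpoint_timeout":"10m"}"#).unwrap();
    assert_eq!(t.checkpoint_timeout, Some(Duration::from_secs(600)));

    // ...and a missing field falls back to None via #[serde(default)].
    let missing: Timeouts = serde_json::from_str("{}").unwrap();
    assert_eq!(missing.checkpoint_timeout, None);

    // Serialization goes back to the humantime string form.
    assert_eq!(
        serde_json::to_string(&t).unwrap(),
        r#"{"checkpoint_timeout":"10m"}"#
    );
}
```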
@@ -564,7 +584,10 @@ pub struct TenantConfig {
}

impl TenantConfig {
    pub fn apply_patch(self, patch: TenantConfigPatch) -> TenantConfig {
    pub fn apply_patch(
        self,
        patch: TenantConfigPatch,
    ) -> Result<TenantConfig, humantime::DurationError> {
        let Self {
            mut checkpoint_distance,
            mut checkpoint_timeout,
@@ -604,11 +627,17 @@ impl TenantConfig {
        } = self;

        patch.checkpoint_distance.apply(&mut checkpoint_distance);
        patch.checkpoint_timeout.apply(&mut checkpoint_timeout);
        patch
            .checkpoint_timeout
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut checkpoint_timeout);
        patch
            .compaction_target_size
            .apply(&mut compaction_target_size);
        patch.compaction_period.apply(&mut compaction_period);
        patch
            .compaction_period
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut compaction_period);
        patch.compaction_threshold.apply(&mut compaction_threshold);
        patch
            .compaction_upper_limit
@@ -626,15 +655,25 @@ impl TenantConfig {
            .apply(&mut l0_flush_stall_threshold);
        patch.l0_flush_wait_upload.apply(&mut l0_flush_wait_upload);
        patch.gc_horizon.apply(&mut gc_horizon);
        patch.gc_period.apply(&mut gc_period);
        patch
            .gc_period
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut gc_period);
        patch
            .image_creation_threshold
            .apply(&mut image_creation_threshold);
        patch.pitr_interval.apply(&mut pitr_interval);
        patch
            .pitr_interval
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut pitr_interval);
        patch
            .walreceiver_connect_timeout
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut walreceiver_connect_timeout);
        patch.lagging_wal_timeout.apply(&mut lagging_wal_timeout);
        patch
            .lagging_wal_timeout
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut lagging_wal_timeout);
        patch.max_lsn_wal_lag.apply(&mut max_lsn_wal_lag);
        patch.eviction_policy.apply(&mut eviction_policy);
        patch
@@ -642,8 +681,12 @@ impl TenantConfig {
            .apply(&mut min_resident_size_override);
        patch
            .evictions_low_residence_duration_metric_threshold
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut evictions_low_residence_duration_metric_threshold);
        patch.heatmap_period.apply(&mut heatmap_period);
        patch
            .heatmap_period
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut heatmap_period);
        patch.lazy_slru_download.apply(&mut lazy_slru_download);
        patch
            .timeline_get_throttle
@@ -654,9 +697,13 @@ impl TenantConfig {
        patch
            .image_creation_preempt_threshold
            .apply(&mut image_creation_preempt_threshold);
        patch.lsn_lease_length.apply(&mut lsn_lease_length);
        patch
            .lsn_lease_length
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut lsn_lease_length);
        patch
            .lsn_lease_length_for_ts
            .map(|v| humantime::parse_duration(&v))?
            .apply(&mut lsn_lease_length_for_ts);
        patch.timeline_offloading.apply(&mut timeline_offloading);
        patch
@@ -673,7 +720,7 @@ impl TenantConfig {
            .gc_compaction_ratio_percent
            .apply(&mut gc_compaction_ratio_percent);

        Self {
        Ok(Self {
            checkpoint_distance,
            checkpoint_timeout,
            compaction_target_size,
@@ -709,7 +756,7 @@ impl TenantConfig {
            gc_compaction_enabled,
            gc_compaction_initial_threshold_kb,
            gc_compaction_ratio_percent,
        }
        })
    }
}

@@ -2503,7 +2550,7 @@ mod tests {
        ..base.clone()
    };

    let patched = base.apply_patch(decoded.config);
    let patched = base.apply_patch(decoded.config).unwrap();

    assert_eq!(patched, expected);
}
@@ -1,7 +1,7 @@
[package]
name = "vm_monitor"
version = "0.1.0"
edition.workspace = true
edition = "2024"
license.workspace = true

[[bin]]
@@ -1,12 +1,10 @@
use std::fmt::{self, Debug, Formatter};
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context};
use cgroups_rs::{
    hierarchies::{self, is_cgroup2_unified_mode},
    memory::MemController,
    Subsystem,
};
use anyhow::{Context, anyhow};
use cgroups_rs::Subsystem;
use cgroups_rs::hierarchies::{self, is_cgroup2_unified_mode};
use cgroups_rs::memory::MemController;
use tokio::sync::watch;
use tracing::{info, warn};
@@ -6,17 +6,15 @@
//! the cgroup (requesting upscale), and the signals that go to the cgroup
//! (notifying it of upscale).

use anyhow::{bail, Context};
use anyhow::{Context, bail};
use axum::extract::ws::{Message, Utf8Bytes, WebSocket};
use futures::{
    stream::{SplitSink, SplitStream},
    SinkExt, StreamExt,
};
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use tracing::{debug, info};

use crate::protocol::{
    OutboundMsg, OutboundMsgKind, ProtocolRange, ProtocolResponse, ProtocolVersion,
    PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION,
    OutboundMsg, OutboundMsgKind, PROTOCOL_MAX_VERSION, PROTOCOL_MIN_VERSION, ProtocolRange,
    ProtocolResponse, ProtocolVersion,
};

/// The central handler for all communications in the monitor.
@@ -2,12 +2,14 @@

use std::num::NonZeroU64;

use crate::MiB;
use anyhow::{anyhow, Context};
use tokio_postgres::{types::ToSql, Client, NoTls, Row};
use anyhow::{Context, anyhow};
use tokio_postgres::types::ToSql;
use tokio_postgres::{Client, NoTls, Row};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use crate::MiB;

/// Manages Postgres' file cache by keeping a connection open.
#[derive(Debug)]
pub struct FileCacheState {
@@ -2,24 +2,26 @@
#![deny(clippy::undocumented_unsafe_blocks)]
#![cfg(target_os = "linux")]

use std::fmt::Debug;
use std::net::SocketAddr;
use std::time::Duration;

use anyhow::Context;
use axum::{
    extract::{ws::WebSocket, State, WebSocketUpgrade},
    response::Response,
};
use axum::{routing::get, Router};
use axum::Router;
use axum::extract::ws::WebSocket;
use axum::extract::{State, WebSocketUpgrade};
use axum::response::Response;
use axum::routing::get;
use clap::Parser;
use futures::Future;
use std::net::SocketAddr;
use std::{fmt::Debug, time::Duration};
use runner::Runner;
use sysinfo::{RefreshKind, System, SystemExt};
use tokio::net::TcpListener;
use tokio::{sync::broadcast, task::JoinHandle};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use runner::Runner;

// Code that interfaces with agent
pub mod dispatcher;
pub mod protocol;
@@ -35,7 +35,8 @@
use core::fmt;
use std::cmp;

use serde::{de::Error, Deserialize, Serialize};
use serde::de::Error;
use serde::{Deserialize, Serialize};

/// A Message we send to the agent.
#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -7,7 +7,7 @@
use std::fmt::Debug;
use std::time::{Duration, Instant};

use anyhow::{bail, Context};
use anyhow::{Context, bail};
use axum::extract::ws::{Message, WebSocket};
use futures::StreamExt;
use tokio::sync::{broadcast, watch};
@@ -18,7 +18,7 @@ use crate::cgroup::{self, CgroupWatcher};
use crate::dispatcher::Dispatcher;
use crate::filecache::{FileCacheConfig, FileCacheState};
use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
use crate::{Args, MiB, bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel};

/// Central struct that interacts with agent, dispatcher, and cgroup to handle
/// signals from the agent.
@@ -233,7 +233,9 @@ impl Runner {
    //
    // TODO: make the duration here configurable.
    if last_time.elapsed() > Duration::from_secs(5) {
        bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
        bail!(
            "haven't gotten cgroup memory stats recently enough to determine downscaling information"
        );
    } else if last_history.samples_count <= 1 {
        let status = "haven't received enough cgroup memory stats yet";
        info!(status, "discontinuing downscale");
@@ -693,16 +693,15 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
/// This is a conversion from our internal tenant config object to the one used
/// in external APIs.
impl From<TenantConfOpt> for models::TenantConfig {
    // TODO(vlad): These are now the same, but they have different serialization logic.
    // Can we merge them?
    fn from(value: TenantConfOpt) -> Self {
        fn humantime(d: Duration) -> String {
            format!("{}s", d.as_secs())
        }
        Self {
            checkpoint_distance: value.checkpoint_distance,
            checkpoint_timeout: value.checkpoint_timeout.map(humantime),
            checkpoint_timeout: value.checkpoint_timeout,
            compaction_algorithm: value.compaction_algorithm,
            compaction_target_size: value.compaction_target_size,
            compaction_period: value.compaction_period.map(humantime),
            compaction_period: value.compaction_period,
            compaction_threshold: value.compaction_threshold,
            compaction_upper_limit: value.compaction_upper_limit,
            compaction_l0_first: value.compaction_l0_first,
@@ -711,24 +710,23 @@ impl From<TenantConfOpt> for models::TenantConfig {
            l0_flush_stall_threshold: value.l0_flush_stall_threshold,
            l0_flush_wait_upload: value.l0_flush_wait_upload,
            gc_horizon: value.gc_horizon,
            gc_period: value.gc_period.map(humantime),
            gc_period: value.gc_period,
            image_creation_threshold: value.image_creation_threshold,
            pitr_interval: value.pitr_interval.map(humantime),
            walreceiver_connect_timeout: value.walreceiver_connect_timeout.map(humantime),
            lagging_wal_timeout: value.lagging_wal_timeout.map(humantime),
            pitr_interval: value.pitr_interval,
            walreceiver_connect_timeout: value.walreceiver_connect_timeout,
            lagging_wal_timeout: value.lagging_wal_timeout,
            max_lsn_wal_lag: value.max_lsn_wal_lag,
            eviction_policy: value.eviction_policy,
            min_resident_size_override: value.min_resident_size_override,
            evictions_low_residence_duration_metric_threshold: value
                .evictions_low_residence_duration_metric_threshold
                .map(humantime),
            heatmap_period: value.heatmap_period.map(humantime),
                .evictions_low_residence_duration_metric_threshold,
            heatmap_period: value.heatmap_period,
            lazy_slru_download: value.lazy_slru_download,
            timeline_get_throttle: value.timeline_get_throttle,
            image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
            image_creation_preempt_threshold: value.image_creation_preempt_threshold,
            lsn_lease_length: value.lsn_lease_length.map(humantime),
            lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
            lsn_lease_length: value.lsn_lease_length,
            lsn_lease_length_for_ts: value.lsn_lease_length_for_ts,
            timeline_offloading: value.timeline_offloading,
            wal_receiver_protocol_override: value.wal_receiver_protocol_override,
            rel_size_v2_enabled: value.rel_size_v2_enabled,
@@ -760,29 +758,10 @@ mod tests {
        assert_eq!(small_conf, serde_json::from_str(&json_form).unwrap());
    }

    #[test]
    fn test_try_from_models_tenant_config_err() {
        let tenant_config = models::TenantConfig {
            lagging_wal_timeout: Some("5a".to_string()),
            ..TenantConfig::default()
        };

        let tenant_conf_opt = TenantConfOpt::try_from(&tenant_config);

        assert!(
            tenant_conf_opt.is_err(),
            "Suceeded to convert TenantConfig to TenantConfOpt"
        );

        let expected_error_str =
            "lagging_wal_timeout: invalid value: string \"5a\", expected a duration";
        assert_eq!(tenant_conf_opt.unwrap_err().to_string(), expected_error_str);
    }

    #[test]
    fn test_try_from_models_tenant_config_success() {
        let tenant_config = models::TenantConfig {
            lagging_wal_timeout: Some("5s".to_string()),
            lagging_wal_timeout: Some(Duration::from_secs(5)),
            ..TenantConfig::default()
        };

(File diff suppressed because it is too large.)
@@ -56,6 +56,7 @@ uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
uint32 WAIT_EVENT_NEON_LFC_READ;
uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
uint32 WAIT_EVENT_NEON_LFC_WRITE;
uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
uint32 WAIT_EVENT_NEON_PS_STARTING;
uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -538,6 +539,7 @@ neon_shmem_startup_hook(void)
    WAIT_EVENT_NEON_LFC_READ = WaitEventExtensionNew("Neon/FileCache_Read");
    WAIT_EVENT_NEON_LFC_TRUNCATE = WaitEventExtensionNew("Neon/FileCache_Truncate");
    WAIT_EVENT_NEON_LFC_WRITE = WaitEventExtensionNew("Neon/FileCache_Write");
    WAIT_EVENT_NEON_LFC_CV_WAIT = WaitEventExtensionNew("Neon/FileCache_CvWait");
    WAIT_EVENT_NEON_PS_STARTING = WaitEventExtensionNew("Neon/PS_Starting");
    WAIT_EVENT_NEON_PS_CONFIGURING = WaitEventExtensionNew("Neon/PS_Configuring");
    WAIT_EVENT_NEON_PS_SEND = WaitEventExtensionNew("Neon/PS_SendIO");

@@ -28,6 +28,7 @@ extern uint32 WAIT_EVENT_NEON_LFC_MAINTENANCE;
extern uint32 WAIT_EVENT_NEON_LFC_READ;
extern uint32 WAIT_EVENT_NEON_LFC_TRUNCATE;
extern uint32 WAIT_EVENT_NEON_LFC_WRITE;
extern uint32 WAIT_EVENT_NEON_LFC_CV_WAIT;
extern uint32 WAIT_EVENT_NEON_PS_STARTING;
extern uint32 WAIT_EVENT_NEON_PS_CONFIGURING;
extern uint32 WAIT_EVENT_NEON_PS_SEND;
@@ -38,6 +39,7 @@ extern uint32 WAIT_EVENT_NEON_WAL_DL;
#define WAIT_EVENT_NEON_LFC_READ WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_LFC_TRUNCATE WAIT_EVENT_BUFFILE_TRUNCATE
#define WAIT_EVENT_NEON_LFC_WRITE WAIT_EVENT_BUFFILE_WRITE
#define WAIT_EVENT_NEON_LFC_CV_WAIT WAIT_EVENT_BUFFILE_READ
#define WAIT_EVENT_NEON_PS_STARTING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_CONFIGURING PG_WAIT_EXTENSION
#define WAIT_EVENT_NEON_PS_SEND PG_WAIT_EXTENSION

@@ -233,6 +233,7 @@ extern char *neon_timeline;
extern char *neon_tenant;
extern int32 max_cluster_size;
extern int neon_protocol_version;
extern bool lfc_store_prefetch_result;

extern shardno_t get_shard_number(BufferTag* tag);

@@ -301,14 +302,16 @@ extern bool lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum,
                               BlockNumber blkno);
extern int lfc_cache_containsv(NRelFileInfo rinfo, ForkNumber forkNum,
                               BlockNumber blkno, int nblocks, bits8 *bitmap);
extern void lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);
extern bool lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
                         const void* buffer, XLogRecPtr lsn);

static inline bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
         void *buffer)
{
    bits8 rv = 0;
    bits8 rv = 1;
    return lfc_readv_select(rinfo, forkNum, blkno, &buffer, 1, &rv) == 1;
}

@@ -162,7 +162,7 @@ static uint32 local_request_counter;
 * UNUSED ------> REQUESTED --> RECEIVED
 *   ^         :      |            |
 *   |         :      v            |
 *   |         : TAG_UNUSED        |
 *   |         : TAG_REMAINS       |
 *   |         :      |            |
 *   +----------------+------------+
 *             :
@@ -181,7 +181,7 @@ typedef enum PrefetchStatus
/* must fit in uint8; bits 0x1 are used */
typedef enum {
    PRFSF_NONE = 0x0,
    PRFSF_SEQ = 0x1,
    PRFSF_LFC = 0x1 /* received prefetch result is stored in LFC */
} PrefetchRequestFlags;

typedef struct PrefetchRequest
@@ -305,7 +305,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
static void
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum,
                      BlockNumber blkno, neon_request_lsns *output,
                      BlockNumber nblocks, const bits8 *mask);
                      BlockNumber nblocks);
static bool neon_prefetch_response_usable(neon_request_lsns *request_lsns,
                                          PrefetchRequest *slot);

@@ -363,6 +363,7 @@ compact_prefetch_buffers(void)
    target_slot->buftag = source_slot->buftag;
    target_slot->shard_no = source_slot->shard_no;
    target_slot->status = source_slot->status;
    target_slot->flags = source_slot->flags;
    target_slot->response = source_slot->response;
    target_slot->reqid = source_slot->reqid;
    target_slot->request_lsns = source_slot->request_lsns;
@@ -452,6 +453,18 @@ prefetch_pump_state(void)
    /* update slot state */
    slot->status = PRFS_RECEIVED;
    slot->response = response;

    if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
    {
        /*
         * Store prefetched result in LFC (please read comments to lfc_prefetch
         * explaining why it can be done without holding shared buffer lock)
         */
        if (lfc_prefetch(BufTagGetNRelFileInfo(slot->buftag), slot->buftag.forkNum, slot->buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
        {
            slot->flags |= PRFSF_LFC;
        }
    }
}
}
}
@@ -713,6 +726,18 @@ prefetch_read(PrefetchRequest *slot)
    /* update slot state */
    slot->status = PRFS_RECEIVED;
    slot->response = response;

    if (response->tag == T_NeonGetPageResponse && !(slot->flags & PRFSF_LFC) && lfc_store_prefetch_result)
    {
        /*
         * Store prefetched result in LFC (please read comments to lfc_prefetch
         * explaining why it can be done without holding shared buffer lock)
         */
        if (lfc_prefetch(BufTagGetNRelFileInfo(buftag), buftag.forkNum, buftag.blockNum, ((NeonGetPageResponse*)response)->page, slot->request_lsns.not_modified_since))
        {
            slot->flags |= PRFSF_LFC;
        }
    }
    return true;
}
else
@@ -864,7 +889,7 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
    else
        neon_get_request_lsns(BufTagGetNRelFileInfo(slot->buftag),
                              slot->buftag.forkNum, slot->buftag.blockNum,
                              &slot->request_lsns, 1, NULL);
                              &slot->request_lsns, 1);
    request.hdr.lsn = slot->request_lsns.request_lsn;
    request.hdr.not_modified_since = slot->request_lsns.not_modified_since;

@@ -890,6 +915,73 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns
    Assert(!found);
}

/*
 * Look up already-received prefetch requests. Only responses that match the
 * required LSNs are accepted. Present pages are marked in the "mask" bitmap,
 * and the total number of such pages is returned.
 */
static int
prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blocknum, neon_request_lsns *lsns,
                 BlockNumber nblocks, void **buffers, bits8 *mask)
{
    int hits = 0;
    PrefetchRequest hashkey;

    /*
     * Use an intermediate PrefetchRequest struct as the hash key to ensure
     * correct alignment and that the padding bytes are cleared.
     */
    memset(&hashkey.buftag, 0, sizeof(BufferTag));
    CopyNRelFileInfoToBufTag(hashkey.buftag, rinfo);
    hashkey.buftag.forkNum = forknum;

    for (int i = 0; i < nblocks; i++)
    {
        PrfHashEntry *entry;

        hashkey.buftag.blockNum = blocknum + i;
        entry = prfh_lookup(MyPState->prf_hash, &hashkey);

        if (entry != NULL)
        {
            PrefetchRequest *slot = entry->slot;
            uint64 ring_index = slot->my_ring_index;
            Assert(slot == GetPrfSlot(ring_index));

            Assert(slot->status != PRFS_UNUSED);
            Assert(MyPState->ring_last <= ring_index &&
                   ring_index < MyPState->ring_unused);
            Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));

            if (slot->status != PRFS_RECEIVED)
                continue;

            /*
             * If the caller specified a request LSN to use, only accept
             * prefetch responses that satisfy that request.
             */
            if (!neon_prefetch_response_usable(&lsns[i], slot))
                continue;

            memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
            prefetch_set_unused(ring_index);
            BITMAP_SET(mask, i);

            hits += 1;
        }
    }
    pgBufferUsage.prefetch.hits += hits;
    return hits;
}

#if PG_MAJORVERSION_NUM < 17
static bool
prefetch_lookup(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkn, neon_request_lsns *lsns, void *buffer)
{
    bits8 present = 0;
    return prefetch_lookupv(rinfo, forkNum, blkn, lsns, 1, &buffer, &present) != 0;
}
#endif

/*
 * prefetch_register_bufferv() - register and prefetch buffers
 *
@@ -1013,8 +1105,6 @@ Retry:
    /* The buffered request is good enough, return that index */
    if (is_prefetch)
        pgBufferUsage.prefetch.duplicates++;
    else
        pgBufferUsage.prefetch.hits++;
    continue;
}
}
@@ -1116,6 +1206,7 @@ Retry:
    slot->buftag = hashkey.buftag;
    slot->shard_no = get_shard_number(&tag);
    slot->my_ring_index = ring_index;
    slot->flags = 0;

    min_ring_index = Min(min_ring_index, ring_index);

@@ -2056,8 +2147,7 @@ GetLastWrittenLSNv(NRelFileInfo relfilenode, ForkNumber forknum,
|
||||
*/
|
||||
static void
|
||||
neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *output, BlockNumber nblocks,
|
||||
const bits8 *mask)
|
||||
neon_request_lsns *output, BlockNumber nblocks)
|
||||
{
|
||||
XLogRecPtr last_written_lsns[PG_IOV_MAX];
|
||||
|
||||
@@ -2145,9 +2235,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *result = &output[i];
|
||||
XLogRecPtr last_written_lsn = last_written_lsns[i];
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
|
||||
if (last_written_lsn > replay_lsn)
|
||||
{
|
||||
/* GetCurrentReplayRecPtr was introduced in v15 */
|
||||
@@ -2190,8 +2277,6 @@ neon_get_request_lsns(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
neon_request_lsns *result = &output[i];
|
||||
XLogRecPtr last_written_lsn = last_written_lsns[i];
|
||||
|
||||
if (PointerIsValid(mask) && !BITMAP_ISSET(mask, i))
|
||||
continue;
|
||||
/*
|
||||
* Use the latest LSN that was evicted from the buffer cache as the
|
||||
* 'not_modified_since' hint. Any pages modified by later WAL records
|
||||
@@ -2413,7 +2498,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum,
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
|
||||
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);
|
||||
{
|
||||
NeonExistsRequest request = {
|
||||
.hdr.tag = T_NeonExistsRequest,
|
||||
@@ -2832,8 +2917,7 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
while (nblocks > 0)
|
||||
{
|
||||
int iterblocks = Min(nblocks, PG_IOV_MAX);
|
||||
bits8 lfc_present[PG_IOV_MAX / 8];
|
||||
memset(lfc_present, 0, sizeof(lfc_present));
|
||||
bits8 lfc_present[PG_IOV_MAX / 8] = {0};
|
||||
|
||||
if (lfc_cache_containsv(InfoFromSMgrRel(reln), forknum, blocknum,
|
||||
iterblocks, lfc_present) == iterblocks)
|
||||
@@ -2844,12 +2928,13 @@ neon_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
}
|
||||
|
||||
tag.blockNum = blocknum;
|
||||
|
||||
|
||||
for (int i = 0; i < PG_IOV_MAX / 8; i++)
|
||||
lfc_present[i] = ~(lfc_present[i]);
|
||||
|
||||
ring_index = prefetch_register_bufferv(tag, NULL, iterblocks,
|
||||
lfc_present, true);
|
||||
|
||||
nblocks -= iterblocks;
|
||||
blocknum += iterblocks;
|
||||
|
||||
@@ -3105,7 +3190,8 @@ Retry:
|
||||
}
|
||||
}
|
||||
memcpy(buffer, getpage_resp->page, BLCKSZ);
|
||||
lfc_write(rinfo, forkNum, blockno, buffer);
|
||||
if (!lfc_store_prefetch_result)
|
||||
lfc_write(rinfo, forkNum, blockno, buffer);
|
||||
break;
|
||||
}
|
||||
case T_NeonErrorResponse:
|
||||
@@ -3190,6 +3276,17 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
}
|
||||
|
||||
/* Try to read PS results if they are available */
|
||||
prefetch_pump_state();
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1);
|
||||
|
||||
if (prefetch_lookup(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, buffer))
|
||||
{
|
||||
/* Prefetch hit */
|
||||
return;
|
||||
}
|
||||
|
||||
/* Try to read from local file cache */
|
||||
if (lfc_read(InfoFromSMgrRel(reln), forkNum, blkno, buffer))
|
||||
{
|
||||
@@ -3197,9 +3294,11 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
return;
|
||||
}
|
||||
|
||||
neon_get_request_lsns(InfoFromSMgrRel(reln), forkNum, blkno, &request_lsns, 1, NULL);
|
||||
neon_read_at_lsn(InfoFromSMgrRel(reln), forkNum, blkno, request_lsns, buffer);
|
||||
|
||||
/*
|
||||
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
|
||||
*/
|
||||
prefetch_pump_state();
|
||||
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
@@ -3280,11 +3379,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
#if PG_MAJORVERSION_NUM >= 17
static void
neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
void **buffers, BlockNumber nblocks)
void **buffers, BlockNumber nblocks)
{
bits8 prefetch_hits[PG_IOV_MAX / 8] = {0};
bits8 lfc_hits[PG_IOV_MAX / 8];
bits8 read[PG_IOV_MAX / 8];
neon_request_lsns request_lsns[PG_IOV_MAX];
int lfc_result;
int prefetch_result;

switch (reln->smgr_relpersistence)
{
@@ -3307,38 +3409,52 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
neon_log(ERROR, "Read request too large: %d is larger than max %d",
nblocks, PG_IOV_MAX);

memset(read, 0, sizeof(read));
/* Try to read PS results if they are available */
prefetch_pump_state();

neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks);

prefetch_result = prefetch_lookupv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns, nblocks, buffers, prefetch_hits);

if (prefetch_result == nblocks)
return;

/* invert the result: exclude prefetched blocks */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
lfc_hits[i] = ~prefetch_hits[i];

/* Try to read from local file cache */
lfc_result = lfc_readv_select(InfoFromSMgrRel(reln), forknum, blocknum, buffers,
nblocks, read);
nblocks, lfc_hits);

if (lfc_result > 0)
MyNeonCounters->file_cache_hits_total += lfc_result;

/* Read all blocks from LFC, so we're done */
if (lfc_result == nblocks)
if (prefetch_result + lfc_result == nblocks)
return;

if (lfc_result == -1)
if (lfc_result <= 0)
{
/* can't use the LFC result, so read all blocks from PS */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = 0xFF;
read[i] = ~prefetch_hits[i];
}
else
{
/* invert the result: exclude blocks read from lfc */
for (int i = 0; i < PG_IOV_MAX / 8; i++)
read[i] = ~(read[i]);
read[i] = ~(prefetch_hits[i] | lfc_hits[i]);
}

neon_get_request_lsns(InfoFromSMgrRel(reln), forknum, blocknum,
request_lsns, nblocks, read);

neon_read_at_lsnv(InfoFromSMgrRel(reln), forknum, blocknum, request_lsns,
buffers, nblocks, read);

/*
* Try to receive prefetch results once again just to make sure we don't leave the smgr code while the OS might still have buffered bytes.
*/
prefetch_pump_state();

#ifdef DEBUG_COMPARE_LOCAL
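
Note: the vectored path now tracks two hit bitmaps and sends the pageserver only the leftovers, read[i] = ~(prefetch_hits[i] | lfc_hits[i]). A small Rust sketch of that combination (MASK_BYTES stands in for PG_IOV_MAX / 8; this is an illustration, not the extension's code):

const MASK_BYTES: usize = 4; // stands in for PG_IOV_MAX / 8

/// Blocks that still need a pageserver read are exactly those satisfied by
/// neither the prefetch ring nor the local file cache.
fn remaining_reads(prefetch_hits: &[u8; MASK_BYTES], lfc_hits: &[u8; MASK_BYTES]) -> [u8; MASK_BYTES] {
    let mut read = [0u8; MASK_BYTES];
    for i in 0..MASK_BYTES {
        read[i] = !(prefetch_hits[i] | lfc_hits[i]);
    }
    read
}

fn main() {
    let prefetch_hits = [0b0000_0001u8, 0, 0, 0]; // block 0 came from prefetch
    let lfc_hits = [0b0000_0110u8, 0, 0, 0];      // blocks 1 and 2 came from the LFC
    assert_eq!(remaining_reads(&prefetch_hits, &lfc_hits)[0], 0b1111_1000);
}
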
@@ -3610,7 +3726,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
}

neon_get_request_lsns(InfoFromSMgrRel(reln), forknum,
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);

{
NeonNblocksRequest request = {
@@ -3695,7 +3811,7 @@ neon_dbsize(Oid dbNode)
NRelFileInfo dummy_node = {0};

neon_get_request_lsns(dummy_node, MAIN_FORKNUM,
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1, NULL);
REL_METADATA_PSEUDO_BLOCKNO, &request_lsns, 1);

{
NeonDbSizeRequest request = {
@@ -4430,7 +4546,12 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
lfc_evict(rinfo, forknum, blkno);
/*
* Redo changes if page exists in LFC.
* We should perform this check after assigning LwLSN to prevent
* prefetching of some older version of the page by some other backend.
*/
no_redo_needed = !lfc_cache_contains(rinfo, forknum, blkno);
}

LWLockRelease(partitionLock);
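
Note: per the comment in the hunk, the LFC check must happen only after the last-written LSN has been assigned, so a concurrent backend cannot prefetch an older version of the page; redo is still required when the page is cached. A schematic Rust sketch of that ordering (types are stand-ins, and the real code relies on PostgreSQL locking rather than this atomic):

use std::sync::atomic::{AtomicU64, Ordering};

struct BlockState {
    last_written_lsn: AtomicU64,
}

fn no_redo_needed(state: &BlockState, end_recptr: u64, cache_contains: impl Fn() -> bool) -> bool {
    // Publish the new last-written LSN first ...
    state.last_written_lsn.store(end_recptr, Ordering::SeqCst);
    // ... and only then check the cache: a prefetch starting after this point
    // requests the page at (or above) the new LSN, so it cannot install a
    // stale version. Redo is still needed when the page is cached, because
    // the cached copy must be brought up to date.
    !cache_contains()
}

fn main() {
    let state = BlockState { last_written_lsn: AtomicU64::new(0) };
    assert!(no_redo_needed(&state, 42, || false)); // not cached: skip redo
    assert!(!no_redo_needed(&state, 43, || true)); // cached: redo the page
}
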
@@ -24,6 +24,7 @@ hex.workspace = true
hyper0.workspace = true
humantime.workspace = true
itertools.workspace = true
json-structural-diff.workspace = true
lasso.workspace = true
once_cell.workspace = true
pageserver_api.workspace = true

@@ -598,7 +598,10 @@ async fn handle_tenant_timeline_passthrough(

let _timer = latency.start_timer(labels.clone());

let client = mgmt_api::Client::new(node.base_url(), service.get_config().jwt_token.as_deref());
let client = mgmt_api::Client::new(
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
);
let resp = client.get_raw(path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
@@ -1354,10 +1357,7 @@ async fn handle_safekeeper_scheduling_policy(
.set_safekeeper_scheduling_policy(id, body.scheduling_policy)
.await?;

Ok(Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.unwrap())
json_response(StatusCode::OK, ())
}

/// Common wrapper for request handlers that call into Service and will operate on tenants: they must only
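
Note: the handler now funnels its result through the shared json_response helper instead of hand-building a Response. A minimal sketch of what such a helper can look like, assuming hyper 0.14, serde_json, and anyhow; the helper actually used by this codebase may differ in error type and headers:

use hyper::{Body, Response, StatusCode};
use serde::Serialize;

fn json_response<T: Serialize>(status: StatusCode, value: T) -> anyhow::Result<Response<Body>> {
    // Serialize the payload and attach the JSON content type.
    let body = serde_json::to_string(&value)?;
    Ok(Response::builder()
        .status(status)
        .header("Content-Type", "application/json")
        .body(Body::from(body))?)
}

fn main() -> anyhow::Result<()> {
    let resp = json_response(StatusCode::OK, ())?;
    assert_eq!(resp.status(), StatusCode::OK);
    Ok(())
}
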
@@ -53,6 +53,10 @@ struct Cli {
#[arg(long)]
jwt_token: Option<String>,

/// Token for authenticating this service with the safekeepers it controls
#[arg(long)]
safekeeper_jwt_token: Option<String>,

/// Token for authenticating this service with the control plane, when calling
/// the compute notification endpoint
#[arg(long)]
@@ -153,7 +157,8 @@ impl Default for StrictMode {
struct Secrets {
database_url: String,
public_key: Option<JwtAuth>,
jwt_token: Option<String>,
pageserver_jwt_token: Option<String>,
safekeeper_jwt_token: Option<String>,
control_plane_jwt_token: Option<String>,
peer_jwt_token: Option<String>,
}
@@ -161,6 +166,7 @@ struct Secrets {
impl Secrets {
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
const SAFEKEEPER_JWT_TOKEN_ENV: &'static str = "SAFEKEEPER_JWT_TOKEN";
const CONTROL_PLANE_JWT_TOKEN_ENV: &'static str = "CONTROL_PLANE_JWT_TOKEN";
const PEER_JWT_TOKEN_ENV: &'static str = "PEER_JWT_TOKEN";
const PUBLIC_KEY_ENV: &'static str = "PUBLIC_KEY";
@@ -184,7 +190,14 @@ impl Secrets {
let this = Self {
database_url,
public_key,
jwt_token: Self::load_secret(&args.jwt_token, Self::PAGESERVER_JWT_TOKEN_ENV),
pageserver_jwt_token: Self::load_secret(
&args.jwt_token,
Self::PAGESERVER_JWT_TOKEN_ENV,
),
safekeeper_jwt_token: Self::load_secret(
&args.safekeeper_jwt_token,
Self::SAFEKEEPER_JWT_TOKEN_ENV,
),
control_plane_jwt_token: Self::load_secret(
&args.control_plane_jwt_token,
Self::CONTROL_PLANE_JWT_TOKEN_ENV,
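
Note: each token is resolved from its CLI flag first, falling back to the corresponding environment variable (PAGESERVER_JWT_TOKEN, SAFEKEEPER_JWT_TOKEN, and so on). A sketch of that fallback, under the assumption that load_secret has roughly this shape; the real helper's signature may differ:

use std::env;

fn load_secret(arg: &Option<String>, env_name: &str) -> Option<String> {
    // Prefer the explicitly passed CLI value, then the environment.
    arg.clone().or_else(|| env::var(env_name).ok())
}

fn main() {
    let from_flag = Some("token-from-flag".to_string());
    assert_eq!(
        load_secret(&from_flag, "SAFEKEEPER_JWT_TOKEN").as_deref(),
        Some("token-from-flag")
    );
    assert_eq!(load_secret(&None, "SOME_UNSET_VARIABLE"), None);
}
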
@@ -264,11 +277,17 @@ async fn async_main() -> anyhow::Result<()> {

let secrets = Secrets::load(&args).await?;

// TODO: once we've rolled out the safekeeper JWT token everywhere, put it into the validation code below
tracing::info!(
"safekeeper_jwt_token set: {:?}",
secrets.safekeeper_jwt_token.is_some()
);

// Validate required secrets and arguments are provided in strict mode
match strict_mode {
StrictMode::Strict
if (secrets.public_key.is_none()
|| secrets.jwt_token.is_none()
|| secrets.pageserver_jwt_token.is_none()
|| secrets.control_plane_jwt_token.is_none()) =>
{
// Production systems should always have secrets configured: if public_key was not set
@@ -293,7 +312,8 @@ async fn async_main() -> anyhow::Result<()> {
}

let config = Config {
jwt_token: secrets.jwt_token,
pageserver_jwt_token: secrets.pageserver_jwt_token,
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
control_plane_jwt_token: secrets.control_plane_jwt_token,
peer_jwt_token: secrets.peer_jwt_token,
compute_hook_url: args.compute_hook_url,

@@ -1,6 +1,7 @@
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::{compute_hook, service};
use json_structural_diff::JsonDiff;
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
use pageserver_api::models::{
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
@@ -24,7 +25,7 @@ use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};

const DEFAULT_HEATMAP_PERIOD: &str = "60s";
const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);

/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
@@ -296,7 +297,7 @@ impl Reconciler {
.location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
.await
},
&self.service_config.jwt_token,
&self.service_config.pageserver_jwt_token,
1,
3,
timeout,
@@ -417,7 +418,7 @@ impl Reconciler {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.service_config.jwt_token.as_deref(),
self.service_config.pageserver_jwt_token.as_deref(),
);

client
@@ -440,7 +441,7 @@ impl Reconciler {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.service_config.jwt_token.as_deref(),
self.service_config.pageserver_jwt_token.as_deref(),
);

let timelines = client.timeline_list(&tenant_shard_id).await?;
@@ -478,7 +479,7 @@ impl Reconciler {
)
.await
},
&self.service_config.jwt_token,
&self.service_config.pageserver_jwt_token,
1,
3,
request_download_timeout * 2,
@@ -771,7 +772,7 @@ impl Reconciler {
let observed_conf = match attached_node
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.service_config.jwt_token,
&self.service_config.pageserver_jwt_token,
1,
1,
Duration::from_secs(5),
@@ -880,7 +881,27 @@ impl Reconciler {
self.generation = Some(generation);
wanted_conf.generation = generation.into();
}
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");

let diff = match observed {
Some(ObservedStateLocation {
conf: Some(observed),
}) => {
let diff = JsonDiff::diff(
&serde_json::to_value(observed.clone()).unwrap(),
&serde_json::to_value(wanted_conf.clone()).unwrap(),
false,
);

if let Some(json_diff) = diff.diff {
serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
} else {
"unknown".to_string()
}
}
_ => "full".to_string(),
};

tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");

// Because `node` comes from a ref to &self, clone it before calling into a &mut self
// function: this could be avoided by refactoring the state mutated by location_config into
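
Note: the reconciler now logs a structural JSON diff between the observed and wanted location configs instead of a bare "requires update" line. A self-contained example of the json-structural-diff call shape used in the hunk (the boolean flag is passed through exactly as above; consult the crate docs for its meaning):

use json_structural_diff::JsonDiff;
use serde_json::json;

fn main() {
    let observed = json!({ "mode": "AttachedSingle", "generation": 3 });
    let wanted = json!({ "mode": "AttachedSingle", "generation": 4 });

    // Same call shape as the hunk above; `diff.diff` is None when the two
    // values are structurally identical.
    let diff = JsonDiff::diff(&observed, &wanted, false);
    match diff.diff {
        Some(d) => println!("{}", serde_json::to_string(&d).unwrap()),
        None => println!("configs are structurally identical"),
    }
}
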
@@ -1099,7 +1120,7 @@ impl Reconciler {
match origin
.with_client_retries(
|client| async move { client.get_location_config(tenant_shard_id).await },
&self.service_config.jwt_token,
&self.service_config.pageserver_jwt_token,
1,
3,
Duration::from_secs(5),
@@ -1180,7 +1201,7 @@ fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig
let mut config = config.clone();
if has_secondaries {
if config.heatmap_period.is_none() {
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
}
} else {
config.heatmap_period = None;
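
Note: with DEFAULT_HEATMAP_PERIOD now a Duration rather than a string, ha_aware_config() fills in the default only when secondaries exist (heatmap uploads serve no purpose without a secondary to download them) and clears the period otherwise. A runnable mirror of that logic; TenantConfig here is a trimmed-down stand-in for the real struct:

use std::time::Duration;

const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);

#[derive(Clone)]
struct TenantConfig {
    heatmap_period: Option<Duration>,
}

fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
    let mut config = config.clone();
    if has_secondaries {
        // Respect an explicit tenant setting; only fill in the default.
        if config.heatmap_period.is_none() {
            config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
        }
    } else {
        config.heatmap_period = None;
    }
    config
}

fn main() {
    let base = TenantConfig { heatmap_period: None };
    assert_eq!(ha_aware_config(&base, true).heatmap_period, Some(DEFAULT_HEATMAP_PERIOD));
    assert_eq!(ha_aware_config(&base, false).heatmap_period, None);
}
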
@@ -348,7 +348,12 @@ pub struct Config {
// All pageservers managed by one instance of this service must have
// the same public key. This JWT token will be used to authenticate
// this service to the pageservers it manages.
pub jwt_token: Option<String>,
pub pageserver_jwt_token: Option<String>,

// All safekeepers managed by one instance of this service must have
// the same public key. This JWT token will be used to authenticate
// this service to the safekeepers it manages.
pub safekeeper_jwt_token: Option<String>,

// This JWT token will be used to authenticate this service to the control plane.
pub control_plane_jwt_token: Option<String>,
@@ -882,7 +887,7 @@ impl Service {
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
1,
5,
timeout,
@@ -983,7 +988,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
self.config.pageserver_jwt_token.as_deref(),
);
match client
.location_config(
@@ -1553,14 +1558,14 @@ impl Service {
let reconcilers_cancel = cancel.child_token();

let heartbeater_ps = Heartbeater::new(
config.jwt_token.clone(),
config.pageserver_jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
);

let heartbeater_sk = Heartbeater::new(
config.jwt_token.clone(),
config.safekeeper_jwt_token.clone(),
config.max_offline_interval,
config.max_warming_up_interval,
cancel.clone(),
@@ -1907,7 +1912,7 @@ impl Service {
let configs = match node
.with_client_retries(
|client| async move { client.list_location_config().await },
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -1965,7 +1970,7 @@ impl Service {
.location_config(tenant_shard_id, config, None, false)
.await
},
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
1,
5,
SHORT_RECONCILE_TIMEOUT,
@@ -2921,7 +2926,9 @@ impl Service {
first
};

let updated_config = base.apply_patch(patch);
let updated_config = base
.apply_patch(patch)
.map_err(|err| ApiError::BadRequest(anyhow::anyhow!(err)))?;
self.set_tenant_config_and_reconcile(tenant_id, updated_config)
.await
}
@@ -3100,7 +3107,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
self.config.pageserver_jwt_token.as_deref(),
);

tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
@@ -3161,7 +3168,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
self.config.pageserver_jwt_token.as_deref(),
);
futs.push(async move {
let result = client
@@ -3284,7 +3291,7 @@ impl Service {
.tenant_delete(TenantShardId::unsharded(tenant_id))
.await
},
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
1,
3,
RECONCILE_TIMEOUT,
@@ -3503,7 +3510,7 @@ impl Service {
let timeline_info = create_one(
shard_zero_tid,
shard_zero_locations,
self.config.jwt_token.clone(),
self.config.pageserver_jwt_token.clone(),
create_req.clone(),
)
.await?;
@@ -3519,7 +3526,7 @@ impl Service {
// Create timeline on remaining shards with number >0
if !targets.0.is_empty() {
// If we had multiple shards, issue requests for the remainder now.
let jwt = &self.config.jwt_token;
let jwt = &self.config.pageserver_jwt_token;
self.tenant_for_shards(
targets
.0
@@ -3602,7 +3609,7 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.config.jwt_token.clone(),
self.config.pageserver_jwt_token.clone(),
req.clone(),
))
})
@@ -3683,7 +3690,7 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.config.jwt_token.clone(),
self.config.pageserver_jwt_token.clone(),
))
})
.await?;
@@ -3757,7 +3764,7 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.config.jwt_token.clone(),
self.config.pageserver_jwt_token.clone(),
dir,
))
})
@@ -3872,7 +3879,7 @@ impl Service {
futs.push(async move {
node.with_client_retries(
|client| op(tenant_shard_id, client),
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
warn_threshold,
max_retries,
timeout,
@@ -4121,7 +4128,7 @@ impl Service {
tenant_shard_id,
timeline_id,
node,
self.config.jwt_token.clone(),
self.config.pageserver_jwt_token.clone(),
))
})
.await?;
@@ -4143,7 +4150,7 @@ impl Service {
shard_zero_tid,
timeline_id,
shard_zero_locations.latest.node,
self.config.jwt_token.clone(),
self.config.pageserver_jwt_token.clone(),
)
.await?;
Ok(shard_zero_status)
@@ -4542,7 +4549,7 @@ impl Service {

client.location_config(child_id, config, None, false).await
},
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
1,
10,
Duration::from_secs(5),
@@ -5142,7 +5149,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
self.config.pageserver_jwt_token.as_deref(),
);
let response = client
.tenant_shard_split(
@@ -5468,7 +5475,7 @@ impl Service {
let client = PageserverClient::new(
node.get_id(),
node.base_url(),
self.config.jwt_token.as_deref(),
self.config.pageserver_jwt_token.as_deref(),
);

let scan_result = client
@@ -6649,11 +6656,12 @@ impl Service {
) -> Option<ReconcilerWaiter> {
let reconcile_needed = shard.get_reconcile_needed(nodes);

match reconcile_needed {
let reconcile_reason = match reconcile_needed {
ReconcileNeeded::No => return None,
ReconcileNeeded::WaitExisting(waiter) => return Some(waiter),
ReconcileNeeded::Yes => {
ReconcileNeeded::Yes(reason) => {
// Fall through to try and acquire units for spawning reconciler
reason
}
};

@@ -6692,6 +6700,7 @@ impl Service {
};

shard.spawn_reconciler(
reconcile_reason,
&self.result_tx,
nodes,
&self.compute_hook,
@@ -6816,7 +6825,7 @@ impl Service {
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
// trickle of optimizations in the background, rather than executing a large number in parallel
// when a change occurs.
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 2;
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 16;

// Synchronous prepare: scan shards for possible scheduling optimizations
let candidate_work = self.optimize_all_plan();
@@ -6867,7 +6876,7 @@ impl Service {
// How many candidate optimizations we will generate, before evaluating them for readiness: setting
// this higher than the execution limit gives us a chance to execute some work even if the first
// few optimizations we find are not ready.
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 8;
const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 64;

let mut work = Vec::new();
let mut locked = self.inner.write().unwrap();
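
Note: raising the two limits preserves the same throttling shape: up to 64 candidates are planned per pass but at most 16 executed, which together with the background call frequency acts as an implicit rate limit. A schematic Rust sketch of that plan/execute split (is_ready and execute are stand-ins for illustration):

const MAX_OPTIMIZATIONS_PLAN_PER_PASS: usize = 64;
const MAX_OPTIMIZATIONS_EXEC_PER_PASS: usize = 16;

fn optimize_pass(candidates: &[u32], is_ready: impl Fn(u32) -> bool, mut execute: impl FnMut(u32)) -> usize {
    let mut executed = 0;
    // Plan more candidates than we will run, so the pass still makes
    // progress when the first few candidates are not ready yet.
    for &c in candidates.iter().take(MAX_OPTIMIZATIONS_PLAN_PER_PASS) {
        if executed == MAX_OPTIMIZATIONS_EXEC_PER_PASS {
            break; // implicit rate limit: leave the rest for the next pass
        }
        if is_ready(c) {
            execute(c);
            executed += 1;
        }
    }
    executed
}

fn main() {
    let candidates: Vec<u32> = (0..100).collect();
    // Only even-numbered candidates are "ready" in this toy run.
    let done = optimize_pass(&candidates, |c| c % 2 == 0, |_| {});
    assert_eq!(done, MAX_OPTIMIZATIONS_EXEC_PER_PASS);
}
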
@@ -7094,7 +7103,7 @@ impl Service {
match attached_node
.with_client_retries(
|client| async move { client.tenant_heatmap_upload(tenant_shard_id).await },
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7130,7 +7139,7 @@ impl Service {
)
.await
},
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
3,
10,
SHORT_RECONCILE_TIMEOUT,
@@ -7185,7 +7194,7 @@ impl Service {
let request = request_ref.clone();
client.top_tenant_shards(request.clone()).await
},
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
3,
3,
Duration::from_secs(5),
@@ -7358,7 +7367,7 @@ impl Service {
match node
.with_client_retries(
|client| async move { client.tenant_secondary_status(tenant_shard_id).await },
&self.config.jwt_token,
&self.config.pageserver_jwt_token,
1,
3,
Duration::from_millis(250),

@@ -481,7 +481,14 @@ pub(crate) enum ReconcileNeeded {
/// spawned: wait for the existing reconciler rather than spawning a new one.
WaitExisting(ReconcilerWaiter),
/// shard needs reconciliation: call into [`TenantShard::spawn_reconciler`]
Yes,
Yes(ReconcileReason),
}

#[derive(Debug)]
pub(crate) enum ReconcileReason {
ActiveNodesDirty,
UnknownLocation,
PendingComputeNotification,
}

/// Pending modification to the observed state of a tenant shard.
@@ -1341,12 +1348,18 @@ impl TenantShard {

let active_nodes_dirty = self.dirty(pageservers);

// Even if there is no pageserver work to be done, if we have a pending notification to computes,
// wake up a reconciler to send it.
let do_reconcile =
active_nodes_dirty || dirty_observed || self.pending_compute_notification;
let reconcile_needed = match (
active_nodes_dirty,
dirty_observed,
self.pending_compute_notification,
) {
(true, _, _) => ReconcileNeeded::Yes(ReconcileReason::ActiveNodesDirty),
(_, true, _) => ReconcileNeeded::Yes(ReconcileReason::UnknownLocation),
(_, _, true) => ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification),
_ => ReconcileNeeded::No,
};

if !do_reconcile {
if matches!(reconcile_needed, ReconcileNeeded::No) {
tracing::debug!("Not dirty, no reconciliation needed.");
return ReconcileNeeded::No;
}
@@ -1389,7 +1402,7 @@ impl TenantShard {
}
}

ReconcileNeeded::Yes
reconcile_needed
}

/// Ensure the sequence number is set to a value where waiting for this value will make us wait
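
Note: ReconcileNeeded::Yes now carries a ReconcileReason, and the service-side caller (see the @@ -6649 hunk above) extracts it to thread into spawn_reconciler for logging. A trimmed-down illustration of that caller-side match; these enums are stand-ins for the real ones:

#[derive(Debug)]
enum ReconcileReason {
    ActiveNodesDirty,
    UnknownLocation,
    PendingComputeNotification,
}

enum ReconcileNeeded {
    No,
    WaitExisting, // the real variant carries a ReconcilerWaiter
    Yes(ReconcileReason),
}

fn maybe_reconcile(needed: ReconcileNeeded) {
    let reason = match needed {
        ReconcileNeeded::No => return,
        ReconcileNeeded::WaitExisting => return,
        ReconcileNeeded::Yes(reason) => reason, // fall through to spawn
    };
    println!("Spawning Reconciler ({reason:?})");
}

fn main() {
    maybe_reconcile(ReconcileNeeded::Yes(ReconcileReason::PendingComputeNotification));
    maybe_reconcile(ReconcileNeeded::No);
}
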
@@ -1479,6 +1492,7 @@ impl TenantShard {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn spawn_reconciler(
&mut self,
reason: ReconcileReason,
result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
pageservers: &Arc<HashMap<NodeId, Node>>,
compute_hook: &Arc<ComputeHook>,
@@ -1538,7 +1552,7 @@ impl TenantShard {
let reconcile_seq = self.sequence;
let long_reconcile_threshold = service_config.long_reconcile_threshold;

tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler ({reason:?})");
let must_notify = self.pending_compute_notification;
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
tenant_id=%reconciler.tenant_shard_id.tenant_id,

@@ -501,19 +501,31 @@ def test_compute_installed_extensions_metric(neon_simple_env: NeonEnv):
"""
Test that the compute_installed_extensions properly reports accurate
results. Important to note that currently this metric is only gathered on
compute start.
compute start. We install the neon extension into a database other than
postgres because compute_ctl will run `ALTER EXTENSION neon UPDATE` during
Postgres startup in the postgres database, creating a race condition.
"""
DB_NAME = "test"

env = neon_simple_env

endpoint = env.endpoints.create_start("main")
endpoint.safe_psql(f"CREATE DATABASE {DB_NAME}")

# The metric is only gathered on compute start, so restart to check that
# plpgsql is now in 3 databases, instead of its regular 2, template1 and
# postgres.
endpoint.stop()
endpoint.start()

client = endpoint.http_client()

def __has_plpgsql(samples: list[Sample]) -> bool:
"""
Check that plpgsql is installed in the template1 and postgres databases
Check that plpgsql is installed in the template1, postgres, and test
databases
"""
return len(samples) == 1 and samples[0].value == 2
return len(samples) == 1 and samples[0].value == 3

wait_until(
collect_metric(
@@ -525,8 +537,8 @@ def test_compute_installed_extensions_metric(neon_simple_env: NeonEnv):
name="compute_installed_extensions",
)

# Install the neon extension, so we can check for it on the restart
endpoint.safe_psql("CREATE EXTENSION neon VERSION '1.0'")
# Install the neon extension, so we can check for it on the restart.
endpoint.safe_psql("CREATE EXTENSION neon VERSION '1.0'", dbname=DB_NAME)

# The metric is only gathered on compute start, so restart to check if the
# neon extension will now be there.
101
test_runner/regress/test_lfc_prefetch.py
Normal file
@@ -0,0 +1,101 @@
from __future__ import annotations

import time

import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import USE_LFC


@pytest.mark.timeout(600)
@pytest.mark.skipif(not USE_LFC, reason="LFC is disabled, skipping")
def test_lfc_prefetch(neon_simple_env: NeonEnv):
"""
Test storing prefetch results in the Local File Cache
"""
env = neon_simple_env
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
"effective_io_concurrency=100",
"shared_buffers=1MB",
"enable_bitmapscan=off",
"enable_seqscan=off",
"autovacuum=off",
],
)
conn = endpoint.connect()
cur = conn.cursor()
cur.execute("create extension neon")
cur.execute("create table t(pk integer, sk integer, filler text default repeat('x',200))")
cur.execute("set statement_timeout=0")
cur.execute("select setseed(0.5)")
cur.execute("insert into t values (generate_series(1,1000000),random()*1000000)")
cur.execute("create index on t(sk)")
cur.execute("vacuum t")

# reset LFC
cur.execute("alter system set neon.file_cache_size_limit=0")
cur.execute("select pg_reload_conf()")
time.sleep(1)
cur.execute("alter system set neon.file_cache_size_limit='1GB'")
cur.execute("select pg_reload_conf()")

cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 100000 and 200000 limit 100) s1"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")

cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 200000 and 300000 limit 100) s2"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")

cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 300000 and 400000 limit 100) s3"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")

cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 100000 and 200000 limit 100) s4"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")

# if prefetch results are not stored in LFC, we keep sending unused prefetch requests to the PS
assert prefetch_expired > 0

cur.execute("set neon.store_prefetch_result_in_lfc=on")

cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 500000 and 600000 limit 100) s5"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")

cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 600000 and 700000 limit 100) s6"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")

cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 700000 and 800000 limit 100) s7"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")

cur.execute(
"explain (analyze,prefetch,format json) select sum(pk) from (select pk from t where sk between 500000 and 600000 limit 100) s8"
)
prefetch_expired = cur.fetchall()[0][0][0]["Plan"]["Prefetch Expired Requests"]
log.info(f"Unused prefetches: {prefetch_expired}")

# no redundant prefetch requests if prefetch results are stored in LFC
assert prefetch_expired == 0
@@ -3817,3 +3817,43 @@ def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder):
nodes = env.storage_controller.node_list()
assert len(nodes) == 1
assert nodes[0]["listen_https_port"] is None


def test_storage_controller_location_conf_equivalence(neon_env_builder: NeonEnvBuilder):
"""
Validate that a storage controller restart with no shards in a transient state
performs zero reconciliations at start-up. Implicitly, this means that the location
configs returned by the pageserver are identical to the persisted state in the
storage controller database.
"""
neon_env_builder.num_pageservers = 1
neon_env_builder.storage_controller_config = {
"start_as_candidate": False,
}

env = neon_env_builder.init_configs()
env.start()

tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id, shard_count=2, tenant_config={"pitr_interval": "1h2m3s"}
)

env.storage_controller.reconcile_until_idle()

reconciles_before_restart = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)

assert reconciles_before_restart != 0

env.storage_controller.stop()
env.storage_controller.start()

env.storage_controller.reconcile_until_idle()

reconciles_after_restart = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)

assert reconciles_after_restart == 0