Merge branch 'main' into bodobolero/duckdb_static

This commit is contained in:
Peter Bendel
2025-02-21 08:49:42 +01:00
committed by GitHub
44 changed files with 767 additions and 152 deletions

1
Cargo.lock generated
View File

@@ -7616,6 +7616,7 @@ dependencies = [
"once_cell",
"pin-project-lite",
"postgres_connection",
"pprof",
"pq_proto",
"rand 0.8.5",
"regex",

View File

@@ -292,7 +292,7 @@ WORKDIR /home/nonroot
# Rust
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
ENV RUSTC_VERSION=1.84.1
ENV RUSTC_VERSION=1.85.0
ENV RUSTUP_HOME="/home/nonroot/.rustup"
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
ARG RUSTFILT_VERSION=0.2.1

View File

@@ -1669,7 +1669,11 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
# Disabled temporarily, because it clashed with pg_mooncake. pg_mooncake
# also depends on libduckdb, but a different version.
#COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/

View File

@@ -41,7 +41,6 @@ use std::process::exit;
use std::str::FromStr;
use std::sync::atomic::Ordering;
use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock};
use std::time::SystemTime;
use std::{thread, time::Duration};
use anyhow::{Context, Result};
@@ -86,19 +85,6 @@ fn parse_remote_ext_config(arg: &str) -> Result<String> {
}
}
/// Generate a compute ID if one is not supplied. This exists to keep forward
/// compatibility tests working, but will be removed in a future iteration.
fn generate_compute_id() -> String {
let now = SystemTime::now();
format!(
"compute-{}",
now.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
)
}
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
@@ -112,16 +98,13 @@ struct Cli {
/// outside the compute will talk to the compute through this port. Keep
/// the previous name for this argument around for a smoother release
/// with the control plane.
///
/// TODO: Remove the alias after the control plane release which teaches the
/// control plane about the renamed argument.
#[arg(long, alias = "http-port", default_value_t = 3080)]
#[arg(long, default_value_t = 3080)]
pub external_http_port: u16,
/// The port to bind the internal listening HTTP server to. Clients like
/// The port to bind the internal listening HTTP server to. Clients include
/// the neon extension (for installing remote extensions) and local_proxy.
#[arg(long)]
pub internal_http_port: Option<u16>,
#[arg(long, default_value_t = 3081)]
pub internal_http_port: u16,
#[arg(short = 'D', long, value_name = "DATADIR")]
pub pgdata: String,
@@ -156,7 +139,7 @@ struct Cli {
#[arg(short = 'S', long, group = "spec-path")]
pub spec_path: Option<OsString>,
#[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())]
#[arg(short = 'i', long, group = "compute-id")]
pub compute_id: String,
#[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")]
@@ -359,7 +342,7 @@ fn wait_spec(
pgbin: cli.pgbin.clone(),
pgversion: get_pg_version_string(&cli.pgbin),
external_http_port: cli.external_http_port,
internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1),
internal_http_port: cli.internal_http_port,
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
@@ -383,7 +366,7 @@ fn wait_spec(
// The internal HTTP server could be launched later, but there isn't much
// sense in waiting.
Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute);
Server::Internal(cli.internal_http_port).launch(&compute);
if !spec_set {
// No spec provided, hang waiting for it.

View File

@@ -2,6 +2,7 @@ DO $$
DECLARE
subname TEXT;
BEGIN
LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE;
FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP
EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname);
EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname);

View File

@@ -46,6 +46,8 @@ use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use anyhow::{anyhow, bail, Context, Result};
use compute_api::requests::ConfigurationRequest;
@@ -59,6 +61,7 @@ use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use tracing::debug;
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -81,8 +84,10 @@ pub struct EndpointConf {
internal_http_port: u16,
pg_version: u32,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
features: Vec<ComputeFeature>,
cluster: Option<Cluster>,
}
//
@@ -179,7 +184,9 @@ impl ComputeControlPlane {
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
});
ep.create_endpoint_dir()?;
@@ -196,7 +203,9 @@ impl ComputeControlPlane {
pg_version,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
})?,
)?;
std::fs::write(
@@ -261,8 +270,11 @@ pub struct Endpoint {
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
reconfigure_concurrency: usize,
// Feature flags
features: Vec<ComputeFeature>,
// Cluster settings
cluster: Option<Cluster>,
}
#[derive(PartialEq, Eq)]
@@ -302,6 +314,8 @@ impl Endpoint {
let conf: EndpointConf =
serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?;
debug!("serialized endpoint conf: {:?}", conf);
Ok(Endpoint {
pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port),
external_http_address: SocketAddr::new(
@@ -319,8 +333,10 @@ impl Endpoint {
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
features: conf.features,
cluster: conf.cluster,
})
}
@@ -607,7 +623,7 @@ impl Endpoint {
};
// Create spec file
let spec = ComputeSpec {
let mut spec = ComputeSpec {
skip_pg_catalog_updates: self.skip_pg_catalog_updates,
format_version: 1.0,
operation_uuid: None,
@@ -640,7 +656,7 @@ impl Endpoint {
Vec::new()
},
settings: None,
postgresql_conf: Some(postgresql_conf),
postgresql_conf: Some(postgresql_conf.clone()),
},
delta_operations: None,
tenant_id: Some(self.tenant_id),
@@ -653,9 +669,35 @@ impl Endpoint {
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: 1,
reconfigure_concurrency: self.reconfigure_concurrency,
drop_subscriptions_before_start: self.drop_subscriptions_before_start,
};
// this strange code is needed to support respec() in tests
if self.cluster.is_some() {
debug!("Cluster is already set in the endpoint spec, using it");
spec.cluster = self.cluster.clone().unwrap();
debug!("spec.cluster {:?}", spec.cluster);
// fill missing fields again
if create_test_user {
spec.cluster.roles.push(Role {
name: PgIdent::from_str("test").unwrap(),
encrypted_password: None,
options: None,
});
spec.cluster.databases.push(Database {
name: PgIdent::from_str("neondb").unwrap(),
owner: PgIdent::from_str("test").unwrap(),
options: None,
restrict_conn: false,
invalid: false,
});
}
spec.cluster.postgresql_conf = Some(postgresql_conf);
}
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -673,18 +715,14 @@ impl Endpoint {
println!("Also at '{}'", conn_str);
}
let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl"));
//cmd.args([
// "--external-http-port",
// &self.external_http_address.port().to_string(),
//])
//.args([
// "--internal-http-port",
// &self.internal_http_address.port().to_string(),
//])
cmd.args([
"--http-port",
"--external-http-port",
&self.external_http_address.port().to_string(),
])
.args([
"--internal-http-port",
&self.internal_http_address.port().to_string(),
])
.args(["--pgdata", self.pgdata().to_str().unwrap()])
.args(["--connstr", &conn_str])
.args([
@@ -701,20 +739,16 @@ impl Endpoint {
])
// TODO: It would be nice if we generated compute IDs with the same
// algorithm as the real control plane.
//
// TODO: Add this back when
// https://github.com/neondatabase/neon/pull/10747 is merged.
//
//.args([
// "--compute-id",
// &format!(
// "compute-{}",
// SystemTime::now()
// .duration_since(UNIX_EPOCH)
// .unwrap()
// .as_secs()
// ),
//])
.args([
"--compute-id",
&format!(
"compute-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs()
),
])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);

View File

@@ -47,6 +47,9 @@ enum Command {
listen_http_addr: String,
#[arg(long)]
listen_http_port: u16,
#[arg(long)]
listen_https_port: Option<u16>,
#[arg(long)]
availability_zone_id: String,
},
@@ -394,6 +397,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
listen_https_port,
availability_zone_id,
} => {
storcon_client
@@ -406,6 +410,7 @@ async fn main() -> anyhow::Result<()> {
listen_pg_port,
listen_http_addr,
listen_http_port,
listen_https_port,
availability_zone_id: AvailabilityZone(availability_zone_id),
}),
)

View File

@@ -77,4 +77,5 @@ echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
--compute-id "compute-$RANDOM" \
-S ${SPEC_FILE}

View File

@@ -252,7 +252,7 @@ pub enum ComputeMode {
Replica,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
pub struct Cluster {
pub cluster_id: Option<String>,
pub name: Option<String>,
@@ -283,7 +283,7 @@ pub struct DeltaOp {
/// Rust representation of Postgres role info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
@@ -292,7 +292,7 @@ pub struct Role {
/// Rust representation of Postgres database info with only those fields
/// that matter for us.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct Database {
pub name: PgIdent,
pub owner: PgIdent,
@@ -308,7 +308,7 @@ pub struct Database {
/// Common type representing both SQL statement params with or without value,
/// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
/// options like `wal_level = logical`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub struct GenericOption {
pub name: String,
pub value: Option<String>,

View File

@@ -122,6 +122,8 @@ pub struct ConfigToml {
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
pub enable_read_path_debugging: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub validate_wal_contiguity: Option<bool>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -521,6 +523,7 @@ impl Default for ConfigToml {
} else {
None
},
validate_wal_contiguity: None,
}
}
}

View File

@@ -57,6 +57,7 @@ pub struct NodeRegisterRequest {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
pub availability_zone_id: AvailabilityZone,
}
@@ -105,6 +106,7 @@ pub struct TenantLocateResponseShard {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
}
#[derive(Serialize, Deserialize)]
@@ -148,6 +150,7 @@ pub struct NodeDescribeResponse {
pub listen_http_addr: String,
pub listen_http_port: u16,
pub listen_https_port: Option<u16>,
pub listen_pg_addr: String,
pub listen_pg_port: u16,

View File

@@ -27,7 +27,7 @@ humantime.workspace = true
fail.workspace = true
futures = { workspace = true }
jsonwebtoken.workspace = true
nix = {workspace = true, features = [ "ioctl" ] }
nix = { workspace = true, features = ["ioctl"] }
once_cell.workspace = true
pin-project-lite.workspace = true
regex.workspace = true
@@ -61,6 +61,7 @@ bytes.workspace = true
criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
pprof.workspace = true
serde_assert.workspace = true
tokio = { workspace = true, features = ["test-util"] }

View File

@@ -0,0 +1,26 @@
## Utils Benchmarks
To run benchmarks:
```sh
# All benchmarks.
cargo bench --package utils
# Specific file.
cargo bench --package utils --bench benchmarks
# Specific benchmark.
cargo bench --package utils --bench benchmarks warn_slow/enabled=true
# List available benchmarks.
cargo bench --package utils --benches -- --list
# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds.
# Output in target/criterion/*/profile/flamegraph.svg.
cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10
```
Additional charts and statistics are available in `target/criterion/report/index.html`.
Benchmarks are automatically compared against the previous run. To compare against other runs, see
`--baseline` and `--save-baseline`.

View File

@@ -1,5 +1,18 @@
use criterion::{criterion_group, criterion_main, Criterion};
use std::time::Duration;
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use utils::id;
use utils::logging::warn_slow;
// Register benchmarks with Criterion.
criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_id_stringify,
bench_warn_slow,
);
criterion_main!(benches);
pub fn bench_id_stringify(c: &mut Criterion) {
// Can only use public methods.
@@ -16,5 +29,31 @@ pub fn bench_id_stringify(c: &mut Criterion) {
});
}
criterion_group!(benches, bench_id_stringify);
criterion_main!(benches);
pub fn bench_warn_slow(c: &mut Criterion) {
for enabled in [false, true] {
c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| {
run_bench(b, enabled).unwrap()
});
}
// The actual benchmark.
fn run_bench(b: &mut Bencher, enabled: bool) -> anyhow::Result<()> {
const THRESHOLD: Duration = Duration::from_secs(1);
// Use a multi-threaded runtime to avoid thread parking overhead when yielding.
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()?;
// Test both with and without warn_slow, since we're essentially measuring Tokio scheduling
// performance too. Use a simple noop future that yields once, to avoid any scheduler fast
// paths for a ready future.
if enabled {
b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now())));
} else {
b.iter(|| runtime.block_on(tokio::task::yield_now()));
}
Ok(())
}
}

View File

@@ -1,9 +1,13 @@
use std::future::Future;
use std::str::FromStr;
use std::time::Duration;
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use tokio::time::Instant;
use tracing::warn;
/// Logs a critical error, similarly to `tracing::error!`. This will:
///
@@ -318,6 +322,41 @@ impl std::fmt::Debug for SecretString {
}
}
/// Logs a periodic warning if a future is slow to complete.
///
/// This is performance-sensitive as it's used on the GetPage read path.
#[inline]
pub async fn warn_slow<O>(name: &str, threshold: Duration, f: impl Future<Output = O>) -> O {
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
// won't fit on the stack.
let mut f = Box::pin(f);
let started = Instant::now();
let mut attempt = 1;
loop {
// NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common
// case where the timeout doesn't fire.
let deadline = started + attempt * threshold;
if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await {
// NB: we check if we exceeded the threshold even if the timeout never fired, because
// scheduling or execution delays may cause the future to succeed even if it exceeds the
// timeout. This costs an extra unconditional clock reading, but seems worth it to avoid
// false negatives.
let elapsed = started.elapsed();
if elapsed >= threshold {
warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64());
}
return output;
}
let elapsed = started.elapsed().as_secs_f64();
warn!("slow {name} still running after {elapsed:.3}s",);
attempt += 1;
}
}
#[cfg(test)]
mod tests {
use metrics::{core::Opts, IntCounterVec};

View File

@@ -5,6 +5,7 @@ package interpreted_wal;
message InterpretedWalRecords {
repeated InterpretedWalRecord records = 1;
optional uint64 next_record_lsn = 2;
optional uint64 raw_wal_start_lsn = 3;
}
message InterpretedWalRecord {

View File

@@ -60,7 +60,11 @@ pub struct InterpretedWalRecords {
pub records: Vec<InterpretedWalRecord>,
// Start LSN of the next record after the batch.
// Note that said record may not belong to the current shard.
pub next_record_lsn: Option<Lsn>,
pub next_record_lsn: Lsn,
// Inclusive start LSN of the PG WAL from which the interpreted
// WAL records were extracted. Note that this is not necessarily the
// start LSN of the first interpreted record in the batch.
pub raw_wal_start_lsn: Option<Lsn>,
}
/// An interpreted Postgres WAL record, ready to be handled by the pageserver

View File

@@ -167,7 +167,8 @@ impl TryFrom<InterpretedWalRecords> for proto::InterpretedWalRecords {
.collect::<Result<Vec<_>, _>>()?;
Ok(proto::InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(|l| l.0),
next_record_lsn: Some(value.next_record_lsn.0),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0),
})
}
}
@@ -254,7 +255,11 @@ impl TryFrom<proto::InterpretedWalRecords> for InterpretedWalRecords {
Ok(InterpretedWalRecords {
records,
next_record_lsn: value.next_record_lsn.map(Lsn::from),
next_record_lsn: value
.next_record_lsn
.map(Lsn::from)
.expect("Always provided"),
raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from),
})
}
}

View File

@@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> {
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");

View File

@@ -197,6 +197,10 @@ pub struct PageServerConf {
/// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer
/// files read.
pub enable_read_path_debugging: bool,
/// Interpreted protocol feature: if enabled, validate that the logical WAL received from
/// safekeepers does not have gaps.
pub validate_wal_contiguity: bool,
}
/// Token for authentication to safekeepers
@@ -360,6 +364,7 @@ impl PageServerConf {
page_service_pipelining,
get_vectored_concurrent_io,
enable_read_path_debugging,
validate_wal_contiguity,
} = config_toml;
let mut conf = PageServerConf {
@@ -446,6 +451,7 @@ impl PageServerConf {
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
no_sync: no_sync.unwrap_or(false),
enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false),
validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false),
};
// ------------------------------------------------------------

View File

@@ -173,6 +173,7 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient {
listen_pg_port: m.postgres_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
listen_https_port: None, // TODO: Support https.
availability_zone_id: az_id.expect("Checked above"),
})
}

View File

@@ -34,11 +34,13 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::{Duration, Instant};
use strum_macros::IntoStaticStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::logging::warn_slow;
use utils::sync::gate::{Gate, GateGuard};
use utils::sync::spsc_fold;
use utils::{
@@ -81,6 +83,9 @@ use std::os::fd::AsRawFd;
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log a warning about slow GetPage requests.
const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
///////////////////////////////////////////////////////////////////////////////
pub struct Listener {
@@ -594,6 +599,7 @@ struct BatchedTestRequest {
/// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum,
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
#[derive(IntoStaticStr)]
enum BatchedFeMessage {
Exists {
span: Span,
@@ -638,6 +644,10 @@ enum BatchedFeMessage {
}
impl BatchedFeMessage {
fn as_static_str(&self) -> &'static str {
self.into()
}
fn observe_execution_start(&mut self, at: Instant) {
match self {
BatchedFeMessage::Exists { timer, .. }
@@ -1463,17 +1473,20 @@ impl PageServerHandler {
}
};
let err = self
.pagesteam_handle_batched_message(
let result = warn_slow(
msg.as_static_str(),
WARN_SLOW_GETPAGE_THRESHOLD,
self.pagesteam_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
&cancel,
protocol_version,
ctx,
)
.await;
match err {
),
)
.await;
match result {
Ok(()) => {}
Err(e) => break e,
}
@@ -1636,13 +1649,17 @@ impl PageServerHandler {
return Err(e);
}
};
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
warn_slow(
batch.as_static_str(),
WARN_SLOW_GETPAGE_THRESHOLD,
self.pagesteam_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),
&cancel,
protocol_version,
&ctx,
),
)
.await?;
}

View File

@@ -2874,6 +2874,7 @@ impl Timeline {
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
ingest_batch_size: self.conf.ingest_batch_size,
validate_wal_contiguity: self.conf.validate_wal_contiguity,
},
broker_client,
ctx,

View File

@@ -2212,7 +2212,7 @@ impl Timeline {
let sub_compaction_max_job_size_mb =
sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);
let mut compact_jobs = Vec::new();
let mut compact_jobs = Vec::<GcCompactJob>::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone();
@@ -2299,16 +2299,25 @@ impl Timeline {
} else {
end
};
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_jobs.push(GcCompactJob {
dry_run: job.dry_run,
compact_key_range: start..end,
compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
});
current_start = Some(end);
if total_size == 0 && !compact_jobs.is_empty() {
info!(
"splitting compaction job: {}..{}, estimated_size={}, extending the previous job",
start, end, total_size
);
compact_jobs.last_mut().unwrap().compact_key_range.end = end;
current_start = Some(end);
} else {
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_jobs.push(GcCompactJob {
dry_run: job.dry_run,
compact_key_range: start..end,
compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn,
});
current_start = Some(end);
}
}
}
Ok(compact_jobs)

View File

@@ -56,6 +56,7 @@ pub struct WalReceiverConf {
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
pub ingest_batch_size: u64,
pub validate_wal_contiguity: bool,
}
pub struct WalReceiver {

View File

@@ -537,6 +537,7 @@ impl ConnectionManagerState {
let connect_timeout = self.conf.wal_connect_timeout;
let ingest_batch_size = self.conf.ingest_batch_size;
let protocol = self.conf.protocol;
let validate_wal_contiguity = self.conf.validate_wal_contiguity;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -558,6 +559,7 @@ impl ConnectionManagerState {
ctx,
node_id,
ingest_batch_size,
validate_wal_contiguity,
)
.await;
@@ -1563,6 +1565,7 @@ mod tests {
auth_token: None,
availability_zone: None,
ingest_batch_size: 1,
validate_wal_contiguity: false,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),

View File

@@ -120,6 +120,7 @@ pub(super) async fn handle_walreceiver_connection(
ctx: RequestContext,
safekeeper_node: NodeId,
ingest_batch_size: u64,
validate_wal_contiguity: bool,
) -> Result<(), WalReceiverError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -274,6 +275,7 @@ pub(super) async fn handle_walreceiver_connection(
} => Some((format, compression)),
};
let mut expected_wal_start = startpoint;
while let Some(replication_message) = {
select! {
_ = cancellation.cancelled() => {
@@ -340,13 +342,49 @@ pub(super) async fn handle_walreceiver_connection(
)
})?;
// Guard against WAL gaps. If the start LSN of the PG WAL section
// from which the interpreted records were extracted, doesn't match
// the end of the previous batch (or the starting point for the first batch),
// then kill this WAL receiver connection and start a new one.
if validate_wal_contiguity {
if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
match raw_wal_start_lsn.cmp(&expected_wal_start) {
std::cmp::Ordering::Greater => {
let msg = format!(
"Gap in streamed WAL: [{}, {})",
expected_wal_start, raw_wal_start_lsn
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
std::cmp::Ordering::Less => {
// Other shards are reading WAL behind us.
// This is valid, but check that we received records
// that we haven't seen before.
if let Some(first_rec) = batch.records.first() {
if first_rec.next_record_lsn < last_rec_lsn {
let msg = format!(
"Received record with next_record_lsn multiple times ({} < {})",
first_rec.next_record_lsn, expected_wal_start
);
critical!("{msg}");
return Err(WalReceiverError::Other(anyhow!(msg)));
}
}
}
std::cmp::Ordering::Equal => {}
}
}
}
let InterpretedWalRecords {
records,
next_record_lsn,
raw_wal_start_lsn: _,
} = batch;
tracing::debug!(
"Received WAL up to {} with next_record_lsn={:?}",
"Received WAL up to {} with next_record_lsn={}",
streaming_lsn,
next_record_lsn
);
@@ -423,12 +461,11 @@ pub(super) async fn handle_walreceiver_connection(
// need to advance last record LSN on all shards. If we've not ingested the latest
// record, then set the LSN of the modification past it. This way all shards
// advance their last record LSN at the same time.
let needs_last_record_lsn_advance = match next_record_lsn {
Some(lsn) if lsn > modification.get_lsn() => {
modification.set_lsn(lsn).unwrap();
true
}
_ => false,
let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() {
modification.set_lsn(next_record_lsn).unwrap();
true
} else {
false
};
if uncommitted_records > 0 || needs_last_record_lsn_advance {
@@ -446,9 +483,8 @@ pub(super) async fn handle_walreceiver_connection(
timeline.get_last_record_lsn()
);
if let Some(lsn) = next_record_lsn {
last_rec_lsn = lsn;
}
last_rec_lsn = next_record_lsn;
expected_wal_start = streaming_lsn;
Some(streaming_lsn)
}

View File

@@ -474,8 +474,7 @@ readahead_buffer_resize(int newsize, void *extra)
*/
if (MyPState->n_requests_inflight > newsize)
{
Assert(MyPState->ring_unused >= MyPState->n_requests_inflight - newsize);
prefetch_wait_for(MyPState->ring_unused - (MyPState->n_requests_inflight - newsize));
prefetch_wait_for(MyPState->ring_unused - newsize - 1);
Assert(MyPState->n_requests_inflight <= newsize);
}

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.84.1"
channel = "1.85.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -295,6 +295,10 @@ impl InterpretedWalReader {
let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
// Tracks the start of the PG WAL LSN from which the current batch of
// interpreted records originated.
let mut current_batch_wal_start_lsn: Option<Lsn> = None;
loop {
tokio::select! {
// Main branch for reading WAL and forwarding it
@@ -302,7 +306,7 @@ impl InterpretedWalReader {
let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
let WalBytes {
wal,
wal_start_lsn: _,
wal_start_lsn,
wal_end_lsn,
available_wal_end_lsn,
} = match wal {
@@ -315,6 +319,12 @@ impl InterpretedWalReader {
}
};
// We will already have a value if the previous chunks of WAL
// did not decode into anything useful.
if current_batch_wal_start_lsn.is_none() {
current_batch_wal_start_lsn = Some(wal_start_lsn);
}
wal_decoder.feed_bytes(&wal);
// Deserialize and interpret WAL records from this batch of WAL.
@@ -363,7 +373,9 @@ impl InterpretedWalReader {
let max_next_record_lsn = match max_next_record_lsn {
Some(lsn) => lsn,
None => { continue; }
None => {
continue;
}
};
// Update the current position such that new receivers can decide
@@ -377,21 +389,38 @@ impl InterpretedWalReader {
}
}
let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap();
// Send interpreted records downstream. Anything that has already been seen
// by a shard is filtered out.
let mut shard_senders_to_remove = Vec::new();
for (shard, states) in &mut self.shard_senders {
for state in states {
if max_next_record_lsn <= state.next_record_lsn {
continue;
}
let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
let batch = InterpretedWalRecords {
records,
next_record_lsn: Some(max_next_record_lsn),
let batch = if max_next_record_lsn > state.next_record_lsn {
// This batch contains at least one record that this shard has not
// seen yet.
let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
InterpretedWalRecords {
records,
next_record_lsn: max_next_record_lsn,
raw_wal_start_lsn: Some(batch_wal_start_lsn),
}
} else if wal_end_lsn > state.next_record_lsn {
// All the records in this batch were seen by the shard
// However, the batch maps to a chunk of WAL that the
// shard has not yet seen. Notify it of the start LSN
// of the PG WAL chunk such that it doesn't look like a gap.
InterpretedWalRecords {
records: Vec::default(),
next_record_lsn: state.next_record_lsn,
raw_wal_start_lsn: Some(batch_wal_start_lsn),
}
} else {
// The shard has seen this chunk of WAL before. Skip it.
continue;
};
let res = state.tx.send(Batch {
@@ -403,7 +432,7 @@ impl InterpretedWalReader {
if res.is_err() {
shard_senders_to_remove.push(shard_sender_id);
} else {
state.next_record_lsn = max_next_record_lsn;
state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn);
}
}
}

View File

@@ -0,0 +1 @@
ALTER TABLE nodes DROP listen_https_port;

View File

@@ -0,0 +1 @@
ALTER TABLE nodes ADD listen_https_port INTEGER;

View File

@@ -126,6 +126,10 @@ struct Cli {
#[arg(long)]
long_reconcile_threshold: Option<humantime::Duration>,
// Flag to use https for requests to pageserver API.
#[arg(long, default_value = "false")]
use_https_pageserver_api: bool,
}
enum StrictMode {
@@ -321,6 +325,7 @@ async fn async_main() -> anyhow::Result<()> {
address_for_peers: args.address_for_peers,
start_as_candidate: args.start_as_candidate,
http_service_port: args.listen.port() as i32,
use_https_pageserver_api: args.use_https_pageserver_api,
};
// Validate that we can connect to the database

View File

@@ -1,5 +1,6 @@
use std::{str::FromStr, time::Duration};
use anyhow::anyhow;
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest,
@@ -32,12 +33,16 @@ pub(crate) struct Node {
listen_http_addr: String,
listen_http_port: u16,
listen_https_port: Option<u16>,
listen_pg_addr: String,
listen_pg_port: u16,
availability_zone_id: AvailabilityZone,
// Flag from storcon's config to use https for pageserver admin API.
// Invariant: if |true|, listen_https_port should contain a value.
use_https: bool,
// This cancellation token means "stop any RPCs in flight to this node, and don't start
// any more". It is not related to process shutdown.
#[serde(skip)]
@@ -56,7 +61,16 @@ pub(crate) enum AvailabilityTransition {
impl Node {
pub(crate) fn base_url(&self) -> String {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
if self.use_https {
format!(
"https://{}:{}",
self.listen_http_addr,
self.listen_https_port
.expect("https port should be specified if use_https is on")
)
} else {
format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
}
}
pub(crate) fn get_id(&self) -> NodeId {
@@ -82,11 +96,20 @@ impl Node {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
// Note: listen_https_port may change. See [`Self::need_update`] for mode details.
// && self.listen_https_port == register_req.listen_https_port
&& self.listen_pg_addr == register_req.listen_pg_addr
&& self.listen_pg_port == register_req.listen_pg_port
&& self.availability_zone_id == register_req.availability_zone_id
}
// Do we need to update an existing record in DB on this registration request?
pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
// listen_https_port is checked here because it may change during migration to https.
// After migration, this check may be moved to registration_match.
self.listen_https_port != register_req.listen_https_port
}
/// For a shard located on this node, populate a response object
/// with this node's address information.
pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard {
@@ -95,6 +118,7 @@ impl Node {
node_id: self.id,
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port,
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
}
@@ -175,25 +199,34 @@ impl Node {
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
id: NodeId,
listen_http_addr: String,
listen_http_port: u16,
listen_https_port: Option<u16>,
listen_pg_addr: String,
listen_pg_port: u16,
availability_zone_id: AvailabilityZone,
) -> Self {
Self {
use_https: bool,
) -> anyhow::Result<Self> {
if use_https && listen_https_port.is_none() {
return Err(anyhow!("https is enabled, but node has no https port"));
}
Ok(Self {
id,
listen_http_addr,
listen_http_port,
listen_https_port,
listen_pg_addr,
listen_pg_port,
scheduling: NodeSchedulingPolicy::Active,
availability: NodeAvailability::Offline,
availability_zone_id,
use_https,
cancel: CancellationToken::new(),
}
})
}
pub(crate) fn to_persistent(&self) -> NodePersistence {
@@ -202,14 +235,19 @@ impl Node {
scheduling_policy: self.scheduling.into(),
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port as i32,
listen_https_port: self.listen_https_port.map(|x| x as i32),
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
availability_zone_id: self.availability_zone_id.0.clone(),
}
}
pub(crate) fn from_persistent(np: NodePersistence) -> Self {
Self {
pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result<Self> {
if use_https && np.listen_https_port.is_none() {
return Err(anyhow!("https is enabled, but node has no https port"));
}
Ok(Self {
id: NodeId(np.node_id as u64),
// At startup we consider a node offline until proven otherwise.
availability: NodeAvailability::Offline,
@@ -217,11 +255,13 @@ impl Node {
.expect("Bad scheduling policy in DB"),
listen_http_addr: np.listen_http_addr,
listen_http_port: np.listen_http_port as u16,
listen_https_port: np.listen_https_port.map(|x| x as u16),
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
availability_zone_id: AvailabilityZone(np.availability_zone_id),
use_https,
cancel: CancellationToken::new(),
}
})
}
/// Wrapper for issuing requests to pageserver management API: takes care of generic
@@ -285,8 +325,9 @@ impl Node {
warn_threshold,
max_retries,
&format!(
"Call to node {} ({}:{}) management API",
self.id, self.listen_http_addr, self.listen_http_port
"Call to node {} ({}) management API",
self.id,
self.base_url(),
),
cancel,
)
@@ -302,6 +343,7 @@ impl Node {
availability_zone_id: self.availability_zone_id.0.clone(),
listen_http_addr: self.listen_http_addr.clone(),
listen_http_port: self.listen_http_port,
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
}

View File

@@ -375,18 +375,23 @@ impl Persistence {
Ok(nodes)
}
pub(crate) async fn update_node(
pub(crate) async fn update_node<V>(
&self,
input_node_id: NodeId,
input_scheduling: NodeSchedulingPolicy,
) -> DatabaseResult<()> {
values: V,
) -> DatabaseResult<()>
where
V: diesel::AsChangeset<Target = crate::schema::nodes::table> + Clone + Send + Sync,
V::Changeset: diesel::query_builder::QueryFragment<diesel::pg::Pg> + Send, // valid Postgres SQL
{
use crate::schema::nodes::dsl::*;
let updated = self
.with_measured_conn(DatabaseOperation::UpdateNode, move |conn| {
let values = values.clone();
Box::pin(async move {
let updated = diesel::update(nodes)
.filter(node_id.eq(input_node_id.0 as i64))
.set((scheduling_policy.eq(String::from(input_scheduling)),))
.set(values)
.execute(conn)
.await?;
Ok(updated)
@@ -403,6 +408,32 @@ impl Persistence {
}
}
pub(crate) async fn update_node_scheduling_policy(
&self,
input_node_id: NodeId,
input_scheduling: NodeSchedulingPolicy,
) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.update_node(
input_node_id,
scheduling_policy.eq(String::from(input_scheduling)),
)
.await
}
pub(crate) async fn update_node_on_registration(
&self,
input_node_id: NodeId,
input_https_port: Option<u16>,
) -> DatabaseResult<()> {
use crate::schema::nodes::dsl::*;
self.update_node(
input_node_id,
listen_https_port.eq(input_https_port.map(|x| x as i32)),
)
.await
}
/// At startup, load the high level state for shards, such as their config + policy. This will
/// be enriched at runtime with state discovered on pageservers.
///
@@ -1452,6 +1483,7 @@ pub(crate) struct NodePersistence {
pub(crate) listen_pg_addr: String,
pub(crate) listen_pg_port: i32,
pub(crate) availability_zone_id: String,
pub(crate) listen_https_port: Option<i32>,
}
/// Tenant metadata health status that are stored durably.

View File

@@ -930,13 +930,16 @@ pub(crate) mod test_utils {
NodeId(i),
format!("httphost-{i}"),
80 + i as u16,
None,
format!("pghost-{i}"),
5432 + i as u16,
az_iter
.next()
.cloned()
.unwrap_or(AvailabilityZone("test-az".to_string())),
);
false,
)
.unwrap();
node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0)));
assert!(node.is_available());
node

View File

@@ -26,6 +26,7 @@ diesel::table! {
listen_pg_addr -> Varchar,
listen_pg_port -> Int4,
availability_zone_id -> Varchar,
listen_https_port -> Nullable<Int4>,
}
}

View File

@@ -399,6 +399,8 @@ pub struct Config {
pub http_service_port: i32,
pub long_reconcile_threshold: Duration,
pub use_https_pageserver_api: bool,
}
impl From<DatabaseError> for ApiError {
@@ -1401,8 +1403,8 @@ impl Service {
.list_nodes()
.await?
.into_iter()
.map(Node::from_persistent)
.collect::<Vec<_>>();
.map(|x| Node::from_persistent(x, config.use_https_pageserver_api))
.collect::<anyhow::Result<Vec<Node>>>()?;
let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} nodes from database.", nodes.len());
metrics::METRICS_REGISTRY
@@ -1501,10 +1503,13 @@ impl Service {
NodeId(node_id as u64),
"".to_string(),
123,
None,
"".to_string(),
123,
AvailabilityZone("test_az".to_string()),
);
false,
)
.unwrap();
scheduler.node_upsert(&node);
}
@@ -5907,8 +5912,10 @@ impl Service {
)
.await;
#[derive(PartialEq)]
enum RegistrationStatus {
Matched,
UpToDate,
NeedUpdate,
Mismatched,
New,
}
@@ -5917,7 +5924,11 @@ impl Service {
let locked = self.inner.read().unwrap();
if let Some(node) = locked.nodes.get(&register_req.node_id) {
if node.registration_match(&register_req) {
RegistrationStatus::Matched
if node.need_update(&register_req) {
RegistrationStatus::NeedUpdate
} else {
RegistrationStatus::UpToDate
}
} else {
RegistrationStatus::Mismatched
}
@@ -5927,9 +5938,9 @@ impl Service {
};
match registration_status {
RegistrationStatus::Matched => {
RegistrationStatus::UpToDate => {
tracing::info!(
"Node {} re-registered with matching address",
"Node {} re-registered with matching address and is up to date",
register_req.node_id
);
@@ -5947,7 +5958,7 @@ impl Service {
"Node is already registered with different address".to_string(),
));
}
RegistrationStatus::New => {
RegistrationStatus::New | RegistrationStatus::NeedUpdate => {
// fallthrough
}
}
@@ -5976,6 +5987,16 @@ impl Service {
));
}
if self.config.use_https_pageserver_api && register_req.listen_https_port.is_none() {
return Err(ApiError::PreconditionFailed(
format!(
"Node {} has no https port, but use_https is enabled",
register_req.node_id
)
.into(),
));
}
// Ordering: we must persist the new node _before_ adding it to in-memory state.
// This ensures that before we use it for anything or expose it via any external
// API, it is guaranteed to be available after a restart.
@@ -5983,13 +6004,29 @@ impl Service {
register_req.node_id,
register_req.listen_http_addr,
register_req.listen_http_port,
register_req.listen_https_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.availability_zone_id.clone(),
self.config.use_https_pageserver_api,
);
let new_node = match new_node {
Ok(new_node) => new_node,
Err(error) => return Err(ApiError::InternalServerError(error)),
};
// TODO: idempotency if the node already exists in the database
self.persistence.insert_node(&new_node).await?;
match registration_status {
RegistrationStatus::New => self.persistence.insert_node(&new_node).await?,
RegistrationStatus::NeedUpdate => {
self.persistence
.update_node_on_registration(
register_req.node_id,
register_req.listen_https_port,
)
.await?
}
_ => unreachable!("Other statuses have been processed earlier"),
}
let mut locked = self.inner.write().unwrap();
let mut new_nodes = (*locked.nodes).clone();
@@ -6004,12 +6041,24 @@ impl Service {
.storage_controller_pageserver_nodes
.set(locked.nodes.len() as i64);
tracing::info!(
"Registered pageserver {} ({}), now have {} pageservers",
register_req.node_id,
register_req.availability_zone_id,
locked.nodes.len()
);
match registration_status {
RegistrationStatus::New => {
tracing::info!(
"Registered pageserver {} ({}), now have {} pageservers",
register_req.node_id,
register_req.availability_zone_id,
locked.nodes.len()
);
}
RegistrationStatus::NeedUpdate => {
tracing::info!(
"Re-registered and updated node {} ({})",
register_req.node_id,
register_req.availability_zone_id,
);
}
_ => unreachable!("Other statuses have been processed earlier"),
}
Ok(())
}
@@ -6027,7 +6076,9 @@ impl Service {
if let Some(scheduling) = scheduling {
// Scheduling is a persistent part of Node: we must write updates to the database before
// applying them in memory
self.persistence.update_node(node_id, scheduling).await?;
self.persistence
.update_node_scheduling_policy(node_id, scheduling)
.await?;
}
// If we're activating a node, then before setting it active we must reconcile any shard locations

View File

@@ -1167,15 +1167,15 @@ class NeonEnv:
"max_batch_size": 32,
}
# Concurrent IO (https://github.com/neondatabase/neon/issues/9378):
# enable concurrent IO by default in tests and benchmarks.
# Compat tests are exempt because old versions fail to parse the new config.
get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
if config.test_may_use_compatibility_snapshot_binaries:
log.info(
"Forcing use of binary-built-in default to avoid forward-compatibility related test failures"
"Skipping WAL contiguity validation to avoid forward-compatibility related test failures"
)
get_vectored_concurrent_io = None
else:
# Look for gaps in WAL received from safekeepeers
ps_cfg["validate_wal_contiguity"] = True
get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io
if get_vectored_concurrent_io is not None:
ps_cfg["get_vectored_concurrent_io"] = {
"mode": self.pageserver_get_vectored_concurrent_io,
@@ -1630,6 +1630,7 @@ def neon_env_builder(
class PageserverPort:
pg: int
http: int
https: int | None = None
class LogUtils:
@@ -1886,6 +1887,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
"node_id": int(node.id),
"listen_http_addr": "localhost",
"listen_http_port": node.service_port.http,
"listen_https_port": node.service_port.https,
"listen_pg_addr": "localhost",
"listen_pg_port": node.service_port.pg,
"availability_zone_id": node.az_id,

View File

@@ -3764,3 +3764,56 @@ def test_storage_controller_node_flap_detach_race(
assert len(locs) == 1, f"{shard} has {len(locs)} attached locations"
wait_until(validate_locations, timeout=10)
def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder):
"""
Check that storage controller handles node_register requests with updated fields correctly.
1. Run storage controller and register 1 pageserver without https port.
2. Register the same pageserver with https port. Check that port has been updated.
3. Restart the storage controller. Check that https port is persistent.
4. Register the same pageserver without https port again (rollback). Check that port has been removed.
"""
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_configs()
env.storage_controller.start()
env.storage_controller.wait_until_ready()
pageserver = env.pageservers[0]
# Step 1. Register pageserver without https port.
env.storage_controller.node_register(pageserver)
env.storage_controller.consistency_check()
nodes = env.storage_controller.node_list()
assert len(nodes) == 1
assert nodes[0]["listen_https_port"] is None
# Step 2. Register pageserver with https port.
pageserver.service_port.https = 1234
env.storage_controller.node_register(pageserver)
env.storage_controller.consistency_check()
nodes = env.storage_controller.node_list()
assert len(nodes) == 1
assert nodes[0]["listen_https_port"] == 1234
# Step 3. Restart storage controller.
env.storage_controller.stop()
env.storage_controller.start()
env.storage_controller.wait_until_ready()
env.storage_controller.consistency_check()
nodes = env.storage_controller.node_list()
assert len(nodes) == 1
assert nodes[0]["listen_https_port"] == 1234
# Step 4. Register pageserver with no https port again.
pageserver.service_port.https = None
env.storage_controller.node_register(pageserver)
env.storage_controller.consistency_check()
nodes = env.storage_controller.node_list()
assert len(nodes) == 1
assert nodes[0]["listen_https_port"] is None

View File

@@ -1,9 +1,10 @@
from __future__ import annotations
import threading
import time
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
from fixtures.utils import query_scalar, wait_until
@@ -239,3 +240,173 @@ def test_subscriber_branching(neon_simple_env: NeonEnv):
res = scur_postgres.fetchall()
assert len(res) == 1
assert str(sub_child_2_timeline_id) == res[0][0]
def test_multiple_subscription_branching(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can handle concurrent deletion of subscriptions in a multiple databases
"""
env = neon_simple_env
NUMBER_OF_DBS = 5
# Create and start endpoint so that neon_local put all the generated
# stuff into the spec.json file.
endpoint = env.endpoints.create_start(
"main",
config_lines=[
"max_replication_slots = 10",
"max_logical_replication_workers=10",
"max_worker_processes=10",
],
)
TEST_DB_NAMES = [
{
"name": "neondb",
"owner": "cloud_admin",
},
{
"name": "publisher_db",
"owner": "cloud_admin",
},
]
for i in range(NUMBER_OF_DBS):
TEST_DB_NAMES.append(
{
"name": f"db{i}",
"owner": "cloud_admin",
}
)
# Update the spec.json file to create the databases
# and reconfigure the endpoint to apply the changes.
endpoint.respec_deep(
**{
"skip_pg_catalog_updates": False,
"cluster": {
"databases": TEST_DB_NAMES,
},
}
)
endpoint.reconfigure()
connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''")
# create table, replication and subscription for each of the databases
with endpoint.cursor(dbname="publisher_db") as publisher_cursor:
for i in range(NUMBER_OF_DBS):
publisher_cursor.execute(f"CREATE TABLE t{i}(a int)")
publisher_cursor.execute(f"CREATE PUBLICATION mypub{i} FOR TABLE t{i}")
publisher_cursor.execute(
f"select pg_catalog.pg_create_logical_replication_slot('mysub{i}', 'pgoutput');"
)
publisher_cursor.execute(f"INSERT INTO t{i} VALUES ({i})")
with endpoint.cursor(dbname=f"db{i}") as cursor:
cursor.execute(f"CREATE TABLE t{i}(a int)")
cursor.execute(
f"CREATE SUBSCRIPTION mysub{i} CONNECTION '{connstr}' PUBLICATION mypub{i} WITH (create_slot = false) "
)
# wait for the subscription to be active
for i in range(NUMBER_OF_DBS):
logical_replication_sync(
endpoint,
endpoint,
f"mysub{i}",
sub_dbname=f"db{i}",
pub_dbname="publisher_db",
)
# Check that replication is working
for i in range(NUMBER_OF_DBS):
with endpoint.cursor(dbname=f"db{i}") as cursor:
cursor.execute(f"SELECT * FROM t{i}")
rows = cursor.fetchall()
assert len(rows) == 1
assert rows[0][0] == i
last_insert_lsn = query_scalar(cursor, "select pg_current_wal_insert_lsn();")
def start_publisher_workload(table_num: int, duration: int):
start = time.time()
with endpoint.cursor(dbname="publisher_db") as cur:
while time.time() - start < duration:
cur.execute(f"INSERT INTO t{i} SELECT FROM generate_series(1,1000)")
LOAD_DURATION = 5
threads = [
threading.Thread(target=start_publisher_workload, args=(i, LOAD_DURATION))
for i in range(NUMBER_OF_DBS)
]
for thread in threads:
thread.start()
sub_child_1_timeline_id = env.create_branch(
"subscriber_child_1",
ancestor_branch_name="main",
ancestor_start_lsn=last_insert_lsn,
)
sub_child_1 = env.endpoints.create("subscriber_child_1")
sub_child_1.respec(
skip_pg_catalog_updates=False,
reconfigure_concurrency=5,
drop_subscriptions_before_start=True,
cluster={
"databases": TEST_DB_NAMES,
"roles": [],
},
)
sub_child_1.start()
# ensure that subscription deletion happened on this timeline
with sub_child_1.cursor() as scur_postgres:
scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done")
res = scur_postgres.fetchall()
log.info(f"res = {res}")
assert len(res) == 1
assert str(sub_child_1_timeline_id) == res[0][0]
# ensure that there are no subscriptions in the databases
for i in range(NUMBER_OF_DBS):
with sub_child_1.cursor(dbname=f"db{i}") as cursor:
cursor.execute("SELECT * FROM pg_catalog.pg_subscription")
res = cursor.fetchall()
assert len(res) == 0
# ensure that there are no unexpected rows in the tables
cursor.execute(f"SELECT * FROM t{i}")
rows = cursor.fetchall()
assert len(rows) == 1
assert rows[0][0] == i
for thread in threads:
thread.join()
# ensure that logical replication is still working in main endpoint
# wait for it to catch up
for i in range(NUMBER_OF_DBS):
logical_replication_sync(
endpoint,
endpoint,
f"mysub{i}",
sub_dbname=f"db{i}",
pub_dbname="publisher_db",
)
# verify that the data is the same in publisher and subscriber tables
with endpoint.cursor(dbname="publisher_db") as publisher_cursor:
for i in range(NUMBER_OF_DBS):
with endpoint.cursor(dbname=f"db{i}") as cursor:
publisher_cursor.execute(f"SELECT count(*) FROM t{i}")
cursor.execute(f"SELECT count(*) FROM t{i}")
pub_res = publisher_cursor.fetchone()
sub_res = cursor.fetchone()
log.info(f"for table t{i}: pub_res = {pub_res}, sub_res = {sub_res}")
assert pub_res == sub_res

View File

@@ -5,11 +5,11 @@
],
"v16": [
"16.8",
"6cb8d22079570b50fcaff29124d40807c1e63a82"
"261ed10e9b8c8dda01ad7aefb18e944e30aa161d"
],
"v15": [
"15.12",
"023f1020ecb07af3bb0ddbf4622e1a3c3fa276a4"
"6ff50443773b69749e16da6db9d4f4b19064b4b7"
],
"v14": [
"14.17",