diff --git a/Cargo.lock b/Cargo.lock index 12232eaece..f62026696e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7616,6 +7616,7 @@ dependencies = [ "once_cell", "pin-project-lite", "postgres_connection", + "pprof", "pq_proto", "rand 0.8.5", "regex", diff --git a/build-tools.Dockerfile b/build-tools.Dockerfile index 317eded26e..c103ceaea5 100644 --- a/build-tools.Dockerfile +++ b/build-tools.Dockerfile @@ -292,7 +292,7 @@ WORKDIR /home/nonroot # Rust # Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`) -ENV RUSTC_VERSION=1.84.1 +ENV RUSTC_VERSION=1.85.0 ENV RUSTUP_HOME="/home/nonroot/.rustup" ENV PATH="/home/nonroot/.cargo/bin:${PATH}" ARG RUSTFILT_VERSION=0.2.1 diff --git a/compute/compute-node.Dockerfile b/compute/compute-node.Dockerfile index a96d2aa70f..f60a1f594f 100644 --- a/compute/compute-node.Dockerfile +++ b/compute/compute-node.Dockerfile @@ -1669,7 +1669,11 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/ -COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/ + +# Disabled temporarily, because it clashed with pg_mooncake. pg_mooncake +# also depends on libduckdb, but a different version. +#COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/ + COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pgaudit-build /usr/local/pgsql/ /usr/local/pgsql/ COPY --from=pgauditlogtofile-build /usr/local/pgsql/ /usr/local/pgsql/ diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index a8803ec793..1cdae718fe 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -41,7 +41,6 @@ use std::process::exit; use std::str::FromStr; use std::sync::atomic::Ordering; use std::sync::{mpsc, Arc, Condvar, Mutex, RwLock}; -use std::time::SystemTime; use std::{thread, time::Duration}; use anyhow::{Context, Result}; @@ -86,19 +85,6 @@ fn parse_remote_ext_config(arg: &str) -> Result { } } -/// Generate a compute ID if one is not supplied. This exists to keep forward -/// compatibility tests working, but will be removed in a future iteration. -fn generate_compute_id() -> String { - let now = SystemTime::now(); - - format!( - "compute-{}", - now.duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs() - ) -} - #[derive(Parser)] #[command(rename_all = "kebab-case")] struct Cli { @@ -112,16 +98,13 @@ struct Cli { /// outside the compute will talk to the compute through this port. Keep /// the previous name for this argument around for a smoother release /// with the control plane. - /// - /// TODO: Remove the alias after the control plane release which teaches the - /// control plane about the renamed argument. - #[arg(long, alias = "http-port", default_value_t = 3080)] + #[arg(long, default_value_t = 3080)] pub external_http_port: u16, - /// The port to bind the internal listening HTTP server to. Clients like + /// The port to bind the internal listening HTTP server to. Clients include /// the neon extension (for installing remote extensions) and local_proxy. - #[arg(long)] - pub internal_http_port: Option, + #[arg(long, default_value_t = 3081)] + pub internal_http_port: u16, #[arg(short = 'D', long, value_name = "DATADIR")] pub pgdata: String, @@ -156,7 +139,7 @@ struct Cli { #[arg(short = 'S', long, group = "spec-path")] pub spec_path: Option, - #[arg(short = 'i', long, group = "compute-id", default_value = generate_compute_id())] + #[arg(short = 'i', long, group = "compute-id")] pub compute_id: String, #[arg(short = 'p', long, conflicts_with_all = ["spec", "spec-path"], value_name = "CONTROL_PLANE_API_BASE_URL")] @@ -359,7 +342,7 @@ fn wait_spec( pgbin: cli.pgbin.clone(), pgversion: get_pg_version_string(&cli.pgbin), external_http_port: cli.external_http_port, - internal_http_port: cli.internal_http_port.unwrap_or(cli.external_http_port + 1), + internal_http_port: cli.internal_http_port, live_config_allowed, state: Mutex::new(new_state), state_changed: Condvar::new(), @@ -383,7 +366,7 @@ fn wait_spec( // The internal HTTP server could be launched later, but there isn't much // sense in waiting. - Server::Internal(cli.internal_http_port.unwrap_or(cli.external_http_port + 1)).launch(&compute); + Server::Internal(cli.internal_http_port).launch(&compute); if !spec_set { // No spec provided, hang waiting for it. diff --git a/compute_tools/src/sql/drop_subscriptions.sql b/compute_tools/src/sql/drop_subscriptions.sql index dfb925e48e..03e8e158fa 100644 --- a/compute_tools/src/sql/drop_subscriptions.sql +++ b/compute_tools/src/sql/drop_subscriptions.sql @@ -2,6 +2,7 @@ DO $$ DECLARE subname TEXT; BEGIN + LOCK TABLE pg_subscription IN ACCESS EXCLUSIVE MODE; FOR subname IN SELECT pg_subscription.subname FROM pg_subscription WHERE subdbid = (SELECT oid FROM pg_database WHERE datname = {datname_str}) LOOP EXECUTE format('ALTER SUBSCRIPTION %I DISABLE;', subname); EXECUTE format('ALTER SUBSCRIPTION %I SET (slot_name = NONE);', subname); diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index c3c8229c38..407578abb8 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -46,6 +46,8 @@ use std::process::Command; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; use anyhow::{anyhow, bail, Context, Result}; use compute_api::requests::ConfigurationRequest; @@ -59,6 +61,7 @@ use nix::sys::signal::Signal; use pageserver_api::shard::ShardStripeSize; use reqwest::header::CONTENT_TYPE; use serde::{Deserialize, Serialize}; +use tracing::debug; use url::Host; use utils::id::{NodeId, TenantId, TimelineId}; @@ -81,8 +84,10 @@ pub struct EndpointConf { internal_http_port: u16, pg_version: u32, skip_pg_catalog_updates: bool, + reconfigure_concurrency: usize, drop_subscriptions_before_start: bool, features: Vec, + cluster: Option, } // @@ -179,7 +184,9 @@ impl ComputeControlPlane { // we also skip catalog updates in the cloud. skip_pg_catalog_updates, drop_subscriptions_before_start, + reconfigure_concurrency: 1, features: vec![], + cluster: None, }); ep.create_endpoint_dir()?; @@ -196,7 +203,9 @@ impl ComputeControlPlane { pg_version, skip_pg_catalog_updates, drop_subscriptions_before_start, + reconfigure_concurrency: 1, features: vec![], + cluster: None, })?, )?; std::fs::write( @@ -261,8 +270,11 @@ pub struct Endpoint { skip_pg_catalog_updates: bool, drop_subscriptions_before_start: bool, + reconfigure_concurrency: usize, // Feature flags features: Vec, + // Cluster settings + cluster: Option, } #[derive(PartialEq, Eq)] @@ -302,6 +314,8 @@ impl Endpoint { let conf: EndpointConf = serde_json::from_slice(&std::fs::read(entry.path().join("endpoint.json"))?)?; + debug!("serialized endpoint conf: {:?}", conf); + Ok(Endpoint { pg_address: SocketAddr::new(IpAddr::from(Ipv4Addr::LOCALHOST), conf.pg_port), external_http_address: SocketAddr::new( @@ -319,8 +333,10 @@ impl Endpoint { tenant_id: conf.tenant_id, pg_version: conf.pg_version, skip_pg_catalog_updates: conf.skip_pg_catalog_updates, + reconfigure_concurrency: conf.reconfigure_concurrency, drop_subscriptions_before_start: conf.drop_subscriptions_before_start, features: conf.features, + cluster: conf.cluster, }) } @@ -607,7 +623,7 @@ impl Endpoint { }; // Create spec file - let spec = ComputeSpec { + let mut spec = ComputeSpec { skip_pg_catalog_updates: self.skip_pg_catalog_updates, format_version: 1.0, operation_uuid: None, @@ -640,7 +656,7 @@ impl Endpoint { Vec::new() }, settings: None, - postgresql_conf: Some(postgresql_conf), + postgresql_conf: Some(postgresql_conf.clone()), }, delta_operations: None, tenant_id: Some(self.tenant_id), @@ -653,9 +669,35 @@ impl Endpoint { pgbouncer_settings: None, shard_stripe_size: Some(shard_stripe_size), local_proxy_config: None, - reconfigure_concurrency: 1, + reconfigure_concurrency: self.reconfigure_concurrency, drop_subscriptions_before_start: self.drop_subscriptions_before_start, }; + + // this strange code is needed to support respec() in tests + if self.cluster.is_some() { + debug!("Cluster is already set in the endpoint spec, using it"); + spec.cluster = self.cluster.clone().unwrap(); + + debug!("spec.cluster {:?}", spec.cluster); + + // fill missing fields again + if create_test_user { + spec.cluster.roles.push(Role { + name: PgIdent::from_str("test").unwrap(), + encrypted_password: None, + options: None, + }); + spec.cluster.databases.push(Database { + name: PgIdent::from_str("neondb").unwrap(), + owner: PgIdent::from_str("test").unwrap(), + options: None, + restrict_conn: false, + invalid: false, + }); + } + spec.cluster.postgresql_conf = Some(postgresql_conf); + } + let spec_path = self.endpoint_path().join("spec.json"); std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?; @@ -673,18 +715,14 @@ impl Endpoint { println!("Also at '{}'", conn_str); } let mut cmd = Command::new(self.env.neon_distrib_dir.join("compute_ctl")); - //cmd.args([ - // "--external-http-port", - // &self.external_http_address.port().to_string(), - //]) - //.args([ - // "--internal-http-port", - // &self.internal_http_address.port().to_string(), - //]) cmd.args([ - "--http-port", + "--external-http-port", &self.external_http_address.port().to_string(), ]) + .args([ + "--internal-http-port", + &self.internal_http_address.port().to_string(), + ]) .args(["--pgdata", self.pgdata().to_str().unwrap()]) .args(["--connstr", &conn_str]) .args([ @@ -701,20 +739,16 @@ impl Endpoint { ]) // TODO: It would be nice if we generated compute IDs with the same // algorithm as the real control plane. - // - // TODO: Add this back when - // https://github.com/neondatabase/neon/pull/10747 is merged. - // - //.args([ - // "--compute-id", - // &format!( - // "compute-{}", - // SystemTime::now() - // .duration_since(UNIX_EPOCH) - // .unwrap() - // .as_secs() - // ), - //]) + .args([ + "--compute-id", + &format!( + "compute-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + ), + ]) .stdin(std::process::Stdio::null()) .stderr(logfile.try_clone()?) .stdout(logfile); diff --git a/control_plane/storcon_cli/src/main.rs b/control_plane/storcon_cli/src/main.rs index 3c574efc63..953ade83ad 100644 --- a/control_plane/storcon_cli/src/main.rs +++ b/control_plane/storcon_cli/src/main.rs @@ -47,6 +47,9 @@ enum Command { listen_http_addr: String, #[arg(long)] listen_http_port: u16, + #[arg(long)] + listen_https_port: Option, + #[arg(long)] availability_zone_id: String, }, @@ -394,6 +397,7 @@ async fn main() -> anyhow::Result<()> { listen_pg_port, listen_http_addr, listen_http_port, + listen_https_port, availability_zone_id, } => { storcon_client @@ -406,6 +410,7 @@ async fn main() -> anyhow::Result<()> { listen_pg_port, listen_http_addr, listen_http_port, + listen_https_port, availability_zone_id: AvailabilityZone(availability_zone_id), }), ) diff --git a/docker-compose/compute_wrapper/shell/compute.sh b/docker-compose/compute_wrapper/shell/compute.sh index b4f8d3d66a..9dbdcce69f 100755 --- a/docker-compose/compute_wrapper/shell/compute.sh +++ b/docker-compose/compute_wrapper/shell/compute.sh @@ -77,4 +77,5 @@ echo "Start compute node" /usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \ -C "postgresql://cloud_admin@localhost:55433/postgres" \ -b /usr/local/bin/postgres \ + --compute-id "compute-$RANDOM" \ -S ${SPEC_FILE} diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 767a34bcbc..8fffae92fb 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -252,7 +252,7 @@ pub enum ComputeMode { Replica, } -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)] pub struct Cluster { pub cluster_id: Option, pub name: Option, @@ -283,7 +283,7 @@ pub struct DeltaOp { /// Rust representation of Postgres role info with only those fields /// that matter for us. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct Role { pub name: PgIdent, pub encrypted_password: Option, @@ -292,7 +292,7 @@ pub struct Role { /// Rust representation of Postgres database info with only those fields /// that matter for us. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct Database { pub name: PgIdent, pub owner: PgIdent, @@ -308,7 +308,7 @@ pub struct Database { /// Common type representing both SQL statement params with or without value, /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config /// options like `wal_level = logical`. -#[derive(Clone, Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)] pub struct GenericOption { pub name: String, pub value: Option, diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 0f33bcf45b..1aff5a7012 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -122,6 +122,8 @@ pub struct ConfigToml { pub page_service_pipelining: PageServicePipeliningConfig, pub get_vectored_concurrent_io: GetVectoredConcurrentIo, pub enable_read_path_debugging: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub validate_wal_contiguity: Option, } #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -521,6 +523,7 @@ impl Default for ConfigToml { } else { None }, + validate_wal_contiguity: None, } } } diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index 42f6e47e63..f94bfab581 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -57,6 +57,7 @@ pub struct NodeRegisterRequest { pub listen_http_addr: String, pub listen_http_port: u16, + pub listen_https_port: Option, pub availability_zone_id: AvailabilityZone, } @@ -105,6 +106,7 @@ pub struct TenantLocateResponseShard { pub listen_http_addr: String, pub listen_http_port: u16, + pub listen_https_port: Option, } #[derive(Serialize, Deserialize)] @@ -148,6 +150,7 @@ pub struct NodeDescribeResponse { pub listen_http_addr: String, pub listen_http_port: u16, + pub listen_https_port: Option, pub listen_pg_addr: String, pub listen_pg_port: u16, diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 62e0f4cfba..5020d82adf 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -27,7 +27,7 @@ humantime.workspace = true fail.workspace = true futures = { workspace = true } jsonwebtoken.workspace = true -nix = {workspace = true, features = [ "ioctl" ] } +nix = { workspace = true, features = ["ioctl"] } once_cell.workspace = true pin-project-lite.workspace = true regex.workspace = true @@ -61,6 +61,7 @@ bytes.workspace = true criterion.workspace = true hex-literal.workspace = true camino-tempfile.workspace = true +pprof.workspace = true serde_assert.workspace = true tokio = { workspace = true, features = ["test-util"] } diff --git a/libs/utils/benches/README.md b/libs/utils/benches/README.md new file mode 100644 index 0000000000..e23ec268c2 --- /dev/null +++ b/libs/utils/benches/README.md @@ -0,0 +1,26 @@ +## Utils Benchmarks + +To run benchmarks: + +```sh +# All benchmarks. +cargo bench --package utils + +# Specific file. +cargo bench --package utils --bench benchmarks + +# Specific benchmark. +cargo bench --package utils --bench benchmarks warn_slow/enabled=true + +# List available benchmarks. +cargo bench --package utils --benches -- --list + +# Generate flamegraph profiles using pprof-rs, profiling for 10 seconds. +# Output in target/criterion/*/profile/flamegraph.svg. +cargo bench --package utils --bench benchmarks warn_slow/enabled=true --profile-time 10 +``` + +Additional charts and statistics are available in `target/criterion/report/index.html`. + +Benchmarks are automatically compared against the previous run. To compare against other runs, see +`--baseline` and `--save-baseline`. \ No newline at end of file diff --git a/libs/utils/benches/benchmarks.rs b/libs/utils/benches/benchmarks.rs index 44eb36387c..cff3792f3a 100644 --- a/libs/utils/benches/benchmarks.rs +++ b/libs/utils/benches/benchmarks.rs @@ -1,5 +1,18 @@ -use criterion::{criterion_group, criterion_main, Criterion}; +use std::time::Duration; + +use criterion::{criterion_group, criterion_main, Bencher, Criterion}; +use pprof::criterion::{Output, PProfProfiler}; use utils::id; +use utils::logging::warn_slow; + +// Register benchmarks with Criterion. +criterion_group!( + name = benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = bench_id_stringify, + bench_warn_slow, +); +criterion_main!(benches); pub fn bench_id_stringify(c: &mut Criterion) { // Can only use public methods. @@ -16,5 +29,31 @@ pub fn bench_id_stringify(c: &mut Criterion) { }); } -criterion_group!(benches, bench_id_stringify); -criterion_main!(benches); +pub fn bench_warn_slow(c: &mut Criterion) { + for enabled in [false, true] { + c.bench_function(&format!("warn_slow/enabled={enabled}"), |b| { + run_bench(b, enabled).unwrap() + }); + } + + // The actual benchmark. + fn run_bench(b: &mut Bencher, enabled: bool) -> anyhow::Result<()> { + const THRESHOLD: Duration = Duration::from_secs(1); + + // Use a multi-threaded runtime to avoid thread parking overhead when yielding. + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + + // Test both with and without warn_slow, since we're essentially measuring Tokio scheduling + // performance too. Use a simple noop future that yields once, to avoid any scheduler fast + // paths for a ready future. + if enabled { + b.iter(|| runtime.block_on(warn_slow("ready", THRESHOLD, tokio::task::yield_now()))); + } else { + b.iter(|| runtime.block_on(tokio::task::yield_now())); + } + + Ok(()) + } +} diff --git a/libs/utils/src/logging.rs b/libs/utils/src/logging.rs index 4a6069294d..95c69ac8ba 100644 --- a/libs/utils/src/logging.rs +++ b/libs/utils/src/logging.rs @@ -1,9 +1,13 @@ +use std::future::Future; use std::str::FromStr; +use std::time::Duration; use anyhow::Context; use metrics::{IntCounter, IntCounterVec}; use once_cell::sync::Lazy; use strum_macros::{EnumString, VariantNames}; +use tokio::time::Instant; +use tracing::warn; /// Logs a critical error, similarly to `tracing::error!`. This will: /// @@ -318,6 +322,41 @@ impl std::fmt::Debug for SecretString { } } +/// Logs a periodic warning if a future is slow to complete. +/// +/// This is performance-sensitive as it's used on the GetPage read path. +#[inline] +pub async fn warn_slow(name: &str, threshold: Duration, f: impl Future) -> O { + // TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and + // won't fit on the stack. + let mut f = Box::pin(f); + + let started = Instant::now(); + let mut attempt = 1; + + loop { + // NB: use timeout_at() instead of timeout() to avoid an extra clock reading in the common + // case where the timeout doesn't fire. + let deadline = started + attempt * threshold; + if let Ok(output) = tokio::time::timeout_at(deadline, &mut f).await { + // NB: we check if we exceeded the threshold even if the timeout never fired, because + // scheduling or execution delays may cause the future to succeed even if it exceeds the + // timeout. This costs an extra unconditional clock reading, but seems worth it to avoid + // false negatives. + let elapsed = started.elapsed(); + if elapsed >= threshold { + warn!("slow {name} completed after {:.3}s", elapsed.as_secs_f64()); + } + return output; + } + + let elapsed = started.elapsed().as_secs_f64(); + warn!("slow {name} still running after {elapsed:.3}s",); + + attempt += 1; + } +} + #[cfg(test)] mod tests { use metrics::{core::Opts, IntCounterVec}; diff --git a/libs/wal_decoder/proto/interpreted_wal.proto b/libs/wal_decoder/proto/interpreted_wal.proto index d68484d30f..7b40201a75 100644 --- a/libs/wal_decoder/proto/interpreted_wal.proto +++ b/libs/wal_decoder/proto/interpreted_wal.proto @@ -5,6 +5,7 @@ package interpreted_wal; message InterpretedWalRecords { repeated InterpretedWalRecord records = 1; optional uint64 next_record_lsn = 2; + optional uint64 raw_wal_start_lsn = 3; } message InterpretedWalRecord { diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 51bf7e44ab..7e1934c6c3 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -60,7 +60,11 @@ pub struct InterpretedWalRecords { pub records: Vec, // Start LSN of the next record after the batch. // Note that said record may not belong to the current shard. - pub next_record_lsn: Option, + pub next_record_lsn: Lsn, + // Inclusive start LSN of the PG WAL from which the interpreted + // WAL records were extracted. Note that this is not necessarily the + // start LSN of the first interpreted record in the batch. + pub raw_wal_start_lsn: Option, } /// An interpreted Postgres WAL record, ready to be handled by the pageserver diff --git a/libs/wal_decoder/src/wire_format.rs b/libs/wal_decoder/src/wire_format.rs index 944ee5c919..52ed5c70b5 100644 --- a/libs/wal_decoder/src/wire_format.rs +++ b/libs/wal_decoder/src/wire_format.rs @@ -167,7 +167,8 @@ impl TryFrom for proto::InterpretedWalRecords { .collect::, _>>()?; Ok(proto::InterpretedWalRecords { records, - next_record_lsn: value.next_record_lsn.map(|l| l.0), + next_record_lsn: Some(value.next_record_lsn.0), + raw_wal_start_lsn: value.raw_wal_start_lsn.map(|l| l.0), }) } } @@ -254,7 +255,11 @@ impl TryFrom for InterpretedWalRecords { Ok(InterpretedWalRecords { records, - next_record_lsn: value.next_record_lsn.map(Lsn::from), + next_record_lsn: value + .next_record_lsn + .map(Lsn::from) + .expect("Always provided"), + raw_wal_start_lsn: value.raw_wal_start_lsn.map(Lsn::from), }) } } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index fa098e9364..e2b9a7f073 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -134,6 +134,7 @@ fn main() -> anyhow::Result<()> { info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine"); info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode"); info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol"); + info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation"); info!(?conf.page_service_pipelining, "starting with page service pipelining config"); info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config"); diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index c5368f6806..09d9444dd5 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -197,6 +197,10 @@ pub struct PageServerConf { /// Enable read path debugging. If enabled, read key errors will print a backtrace of the layer /// files read. pub enable_read_path_debugging: bool, + + /// Interpreted protocol feature: if enabled, validate that the logical WAL received from + /// safekeepers does not have gaps. + pub validate_wal_contiguity: bool, } /// Token for authentication to safekeepers @@ -360,6 +364,7 @@ impl PageServerConf { page_service_pipelining, get_vectored_concurrent_io, enable_read_path_debugging, + validate_wal_contiguity, } = config_toml; let mut conf = PageServerConf { @@ -446,6 +451,7 @@ impl PageServerConf { virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()), no_sync: no_sync.unwrap_or(false), enable_read_path_debugging: enable_read_path_debugging.unwrap_or(false), + validate_wal_contiguity: validate_wal_contiguity.unwrap_or(false), }; // ------------------------------------------------------------ diff --git a/pageserver/src/controller_upcall_client.rs b/pageserver/src/controller_upcall_client.rs index d41bfd9021..4990f17b40 100644 --- a/pageserver/src/controller_upcall_client.rs +++ b/pageserver/src/controller_upcall_client.rs @@ -173,6 +173,7 @@ impl ControlPlaneGenerationsApi for ControllerUpcallClient { listen_pg_port: m.postgres_port, listen_http_addr: m.http_host, listen_http_port: m.http_port, + listen_https_port: None, // TODO: Support https. availability_zone_id: az_id.expect("Checked above"), }) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0c8da6f2a8..7285697040 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -34,11 +34,13 @@ use std::str::FromStr; use std::sync::Arc; use std::time::SystemTime; use std::time::{Duration, Instant}; +use strum_macros::IntoStaticStr; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::*; +use utils::logging::warn_slow; use utils::sync::gate::{Gate, GateGuard}; use utils::sync::spsc_fold; use utils::{ @@ -81,6 +83,9 @@ use std::os::fd::AsRawFd; /// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`]. const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000); +/// Threshold at which to log a warning about slow GetPage requests. +const WARN_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30); + /////////////////////////////////////////////////////////////////////////////// pub struct Listener { @@ -594,6 +599,7 @@ struct BatchedTestRequest { /// NB: we only hold [`timeline::handle::WeakHandle`] inside this enum, /// so that we don't keep the [`Timeline::gate`] open while the batch /// is being built up inside the [`spsc_fold`] (pagestream pipelining). +#[derive(IntoStaticStr)] enum BatchedFeMessage { Exists { span: Span, @@ -638,6 +644,10 @@ enum BatchedFeMessage { } impl BatchedFeMessage { + fn as_static_str(&self) -> &'static str { + self.into() + } + fn observe_execution_start(&mut self, at: Instant) { match self { BatchedFeMessage::Exists { timer, .. } @@ -1463,17 +1473,20 @@ impl PageServerHandler { } }; - let err = self - .pagesteam_handle_batched_message( + let result = warn_slow( + msg.as_static_str(), + WARN_SLOW_GETPAGE_THRESHOLD, + self.pagesteam_handle_batched_message( pgb_writer, msg, io_concurrency.clone(), &cancel, protocol_version, ctx, - ) - .await; - match err { + ), + ) + .await; + match result { Ok(()) => {} Err(e) => break e, } @@ -1636,13 +1649,17 @@ impl PageServerHandler { return Err(e); } }; - self.pagesteam_handle_batched_message( - pgb_writer, - batch, - io_concurrency.clone(), - &cancel, - protocol_version, - &ctx, + warn_slow( + batch.as_static_str(), + WARN_SLOW_GETPAGE_THRESHOLD, + self.pagesteam_handle_batched_message( + pgb_writer, + batch, + io_concurrency.clone(), + &cancel, + protocol_version, + &ctx, + ), ) .await?; } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b9425d2777..30de4d90dc 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2874,6 +2874,7 @@ impl Timeline { auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(), availability_zone: self.conf.availability_zone.clone(), ingest_batch_size: self.conf.ingest_batch_size, + validate_wal_contiguity: self.conf.validate_wal_contiguity, }, broker_client, ctx, diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 58a87dbd5f..0361ce8cd1 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -2212,7 +2212,7 @@ impl Timeline { let sub_compaction_max_job_size_mb = sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB); - let mut compact_jobs = Vec::new(); + let mut compact_jobs = Vec::::new(); // For now, we simply use the key partitioning information; we should do a more fine-grained partitioning // by estimating the amount of files read for a compaction job. We should also partition on LSN. let ((dense_ks, sparse_ks), _) = self.partitioning.read().as_ref().clone(); @@ -2299,16 +2299,25 @@ impl Timeline { } else { end }; - info!( - "splitting compaction job: {}..{}, estimated_size={}", - start, end, total_size - ); - compact_jobs.push(GcCompactJob { - dry_run: job.dry_run, - compact_key_range: start..end, - compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn, - }); - current_start = Some(end); + if total_size == 0 && !compact_jobs.is_empty() { + info!( + "splitting compaction job: {}..{}, estimated_size={}, extending the previous job", + start, end, total_size + ); + compact_jobs.last_mut().unwrap().compact_key_range.end = end; + current_start = Some(end); + } else { + info!( + "splitting compaction job: {}..{}, estimated_size={}", + start, end, total_size + ); + compact_jobs.push(GcCompactJob { + dry_run: job.dry_run, + compact_key_range: start..end, + compact_lsn_range: job.compact_lsn_range.start..compact_below_lsn, + }); + current_start = Some(end); + } } } Ok(compact_jobs) diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index f831f5e48a..67429bff98 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -56,6 +56,7 @@ pub struct WalReceiverConf { pub auth_token: Option>, pub availability_zone: Option, pub ingest_batch_size: u64, + pub validate_wal_contiguity: bool, } pub struct WalReceiver { diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 65f9d39078..1955345315 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -537,6 +537,7 @@ impl ConnectionManagerState { let connect_timeout = self.conf.wal_connect_timeout; let ingest_batch_size = self.conf.ingest_batch_size; let protocol = self.conf.protocol; + let validate_wal_contiguity = self.conf.validate_wal_contiguity; let timeline = Arc::clone(&self.timeline); let ctx = ctx.detached_child( TaskKind::WalReceiverConnectionHandler, @@ -558,6 +559,7 @@ impl ConnectionManagerState { ctx, node_id, ingest_batch_size, + validate_wal_contiguity, ) .await; @@ -1563,6 +1565,7 @@ mod tests { auth_token: None, availability_zone: None, ingest_batch_size: 1, + validate_wal_contiguity: false, }, wal_connection: None, wal_stream_candidates: HashMap::new(), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 23db4f88d2..ff05a8f902 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -120,6 +120,7 @@ pub(super) async fn handle_walreceiver_connection( ctx: RequestContext, safekeeper_node: NodeId, ingest_batch_size: u64, + validate_wal_contiguity: bool, ) -> Result<(), WalReceiverError> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -274,6 +275,7 @@ pub(super) async fn handle_walreceiver_connection( } => Some((format, compression)), }; + let mut expected_wal_start = startpoint; while let Some(replication_message) = { select! { _ = cancellation.cancelled() => { @@ -340,13 +342,49 @@ pub(super) async fn handle_walreceiver_connection( ) })?; + // Guard against WAL gaps. If the start LSN of the PG WAL section + // from which the interpreted records were extracted, doesn't match + // the end of the previous batch (or the starting point for the first batch), + // then kill this WAL receiver connection and start a new one. + if validate_wal_contiguity { + if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn { + match raw_wal_start_lsn.cmp(&expected_wal_start) { + std::cmp::Ordering::Greater => { + let msg = format!( + "Gap in streamed WAL: [{}, {})", + expected_wal_start, raw_wal_start_lsn + ); + critical!("{msg}"); + return Err(WalReceiverError::Other(anyhow!(msg))); + } + std::cmp::Ordering::Less => { + // Other shards are reading WAL behind us. + // This is valid, but check that we received records + // that we haven't seen before. + if let Some(first_rec) = batch.records.first() { + if first_rec.next_record_lsn < last_rec_lsn { + let msg = format!( + "Received record with next_record_lsn multiple times ({} < {})", + first_rec.next_record_lsn, expected_wal_start + ); + critical!("{msg}"); + return Err(WalReceiverError::Other(anyhow!(msg))); + } + } + } + std::cmp::Ordering::Equal => {} + } + } + } + let InterpretedWalRecords { records, next_record_lsn, + raw_wal_start_lsn: _, } = batch; tracing::debug!( - "Received WAL up to {} with next_record_lsn={:?}", + "Received WAL up to {} with next_record_lsn={}", streaming_lsn, next_record_lsn ); @@ -423,12 +461,11 @@ pub(super) async fn handle_walreceiver_connection( // need to advance last record LSN on all shards. If we've not ingested the latest // record, then set the LSN of the modification past it. This way all shards // advance their last record LSN at the same time. - let needs_last_record_lsn_advance = match next_record_lsn { - Some(lsn) if lsn > modification.get_lsn() => { - modification.set_lsn(lsn).unwrap(); - true - } - _ => false, + let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() { + modification.set_lsn(next_record_lsn).unwrap(); + true + } else { + false }; if uncommitted_records > 0 || needs_last_record_lsn_advance { @@ -446,9 +483,8 @@ pub(super) async fn handle_walreceiver_connection( timeline.get_last_record_lsn() ); - if let Some(lsn) = next_record_lsn { - last_rec_lsn = lsn; - } + last_rec_lsn = next_record_lsn; + expected_wal_start = streaming_lsn; Some(streaming_lsn) } diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index f1087a8ccb..6c812f347f 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -474,8 +474,7 @@ readahead_buffer_resize(int newsize, void *extra) */ if (MyPState->n_requests_inflight > newsize) { - Assert(MyPState->ring_unused >= MyPState->n_requests_inflight - newsize); - prefetch_wait_for(MyPState->ring_unused - (MyPState->n_requests_inflight - newsize)); + prefetch_wait_for(MyPState->ring_unused - newsize - 1); Assert(MyPState->n_requests_inflight <= newsize); } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 38a7f202ba..591d60ea79 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.84.1" +channel = "1.85.0" profile = "default" # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy. # https://rust-lang.github.io/rustup/concepts/profiles.html diff --git a/safekeeper/src/send_interpreted_wal.rs b/safekeeper/src/send_interpreted_wal.rs index 5916675c3f..fb06339604 100644 --- a/safekeeper/src/send_interpreted_wal.rs +++ b/safekeeper/src/send_interpreted_wal.rs @@ -295,6 +295,10 @@ impl InterpretedWalReader { let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version); + // Tracks the start of the PG WAL LSN from which the current batch of + // interpreted records originated. + let mut current_batch_wal_start_lsn: Option = None; + loop { tokio::select! { // Main branch for reading WAL and forwarding it @@ -302,7 +306,7 @@ impl InterpretedWalReader { let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below")); let WalBytes { wal, - wal_start_lsn: _, + wal_start_lsn, wal_end_lsn, available_wal_end_lsn, } = match wal { @@ -315,6 +319,12 @@ impl InterpretedWalReader { } }; + // We will already have a value if the previous chunks of WAL + // did not decode into anything useful. + if current_batch_wal_start_lsn.is_none() { + current_batch_wal_start_lsn = Some(wal_start_lsn); + } + wal_decoder.feed_bytes(&wal); // Deserialize and interpret WAL records from this batch of WAL. @@ -363,7 +373,9 @@ impl InterpretedWalReader { let max_next_record_lsn = match max_next_record_lsn { Some(lsn) => lsn, - None => { continue; } + None => { + continue; + } }; // Update the current position such that new receivers can decide @@ -377,21 +389,38 @@ impl InterpretedWalReader { } } + let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap(); + // Send interpreted records downstream. Anything that has already been seen // by a shard is filtered out. let mut shard_senders_to_remove = Vec::new(); for (shard, states) in &mut self.shard_senders { for state in states { - if max_next_record_lsn <= state.next_record_lsn { - continue; - } - let shard_sender_id = ShardSenderId::new(*shard, state.sender_id); - let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default(); - let batch = InterpretedWalRecords { - records, - next_record_lsn: Some(max_next_record_lsn), + let batch = if max_next_record_lsn > state.next_record_lsn { + // This batch contains at least one record that this shard has not + // seen yet. + let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default(); + + InterpretedWalRecords { + records, + next_record_lsn: max_next_record_lsn, + raw_wal_start_lsn: Some(batch_wal_start_lsn), + } + } else if wal_end_lsn > state.next_record_lsn { + // All the records in this batch were seen by the shard + // However, the batch maps to a chunk of WAL that the + // shard has not yet seen. Notify it of the start LSN + // of the PG WAL chunk such that it doesn't look like a gap. + InterpretedWalRecords { + records: Vec::default(), + next_record_lsn: state.next_record_lsn, + raw_wal_start_lsn: Some(batch_wal_start_lsn), + } + } else { + // The shard has seen this chunk of WAL before. Skip it. + continue; }; let res = state.tx.send(Batch { @@ -403,7 +432,7 @@ impl InterpretedWalReader { if res.is_err() { shard_senders_to_remove.push(shard_sender_id); } else { - state.next_record_lsn = max_next_record_lsn; + state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn); } } } diff --git a/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/down.sql b/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/down.sql new file mode 100644 index 0000000000..0f051d3ac3 --- /dev/null +++ b/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/down.sql @@ -0,0 +1 @@ +ALTER TABLE nodes DROP listen_https_port; diff --git a/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/up.sql b/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/up.sql new file mode 100644 index 0000000000..172237d477 --- /dev/null +++ b/storage_controller/migrations/2025-02-11-144848_pageserver_use_https/up.sql @@ -0,0 +1 @@ +ALTER TABLE nodes ADD listen_https_port INTEGER; diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 9a9958f7a6..be074d269d 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -126,6 +126,10 @@ struct Cli { #[arg(long)] long_reconcile_threshold: Option, + + // Flag to use https for requests to pageserver API. + #[arg(long, default_value = "false")] + use_https_pageserver_api: bool, } enum StrictMode { @@ -321,6 +325,7 @@ async fn async_main() -> anyhow::Result<()> { address_for_peers: args.address_for_peers, start_as_candidate: args.start_as_candidate, http_service_port: args.listen.port() as i32, + use_https_pageserver_api: args.use_https_pageserver_api, }; // Validate that we can connect to the database diff --git a/storage_controller/src/node.rs b/storage_controller/src/node.rs index f5c2d329e0..3762d13c10 100644 --- a/storage_controller/src/node.rs +++ b/storage_controller/src/node.rs @@ -1,5 +1,6 @@ use std::{str::FromStr, time::Duration}; +use anyhow::anyhow; use pageserver_api::{ controller_api::{ AvailabilityZone, NodeAvailability, NodeDescribeResponse, NodeRegisterRequest, @@ -32,12 +33,16 @@ pub(crate) struct Node { listen_http_addr: String, listen_http_port: u16, + listen_https_port: Option, listen_pg_addr: String, listen_pg_port: u16, availability_zone_id: AvailabilityZone, + // Flag from storcon's config to use https for pageserver admin API. + // Invariant: if |true|, listen_https_port should contain a value. + use_https: bool, // This cancellation token means "stop any RPCs in flight to this node, and don't start // any more". It is not related to process shutdown. #[serde(skip)] @@ -56,7 +61,16 @@ pub(crate) enum AvailabilityTransition { impl Node { pub(crate) fn base_url(&self) -> String { - format!("http://{}:{}", self.listen_http_addr, self.listen_http_port) + if self.use_https { + format!( + "https://{}:{}", + self.listen_http_addr, + self.listen_https_port + .expect("https port should be specified if use_https is on") + ) + } else { + format!("http://{}:{}", self.listen_http_addr, self.listen_http_port) + } } pub(crate) fn get_id(&self) -> NodeId { @@ -82,11 +96,20 @@ impl Node { self.id == register_req.node_id && self.listen_http_addr == register_req.listen_http_addr && self.listen_http_port == register_req.listen_http_port + // Note: listen_https_port may change. See [`Self::need_update`] for mode details. + // && self.listen_https_port == register_req.listen_https_port && self.listen_pg_addr == register_req.listen_pg_addr && self.listen_pg_port == register_req.listen_pg_port && self.availability_zone_id == register_req.availability_zone_id } + // Do we need to update an existing record in DB on this registration request? + pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool { + // listen_https_port is checked here because it may change during migration to https. + // After migration, this check may be moved to registration_match. + self.listen_https_port != register_req.listen_https_port + } + /// For a shard located on this node, populate a response object /// with this node's address information. pub(crate) fn shard_location(&self, shard_id: TenantShardId) -> TenantLocateResponseShard { @@ -95,6 +118,7 @@ impl Node { node_id: self.id, listen_http_addr: self.listen_http_addr.clone(), listen_http_port: self.listen_http_port, + listen_https_port: self.listen_https_port, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port, } @@ -175,25 +199,34 @@ impl Node { } } + #[allow(clippy::too_many_arguments)] pub(crate) fn new( id: NodeId, listen_http_addr: String, listen_http_port: u16, + listen_https_port: Option, listen_pg_addr: String, listen_pg_port: u16, availability_zone_id: AvailabilityZone, - ) -> Self { - Self { + use_https: bool, + ) -> anyhow::Result { + if use_https && listen_https_port.is_none() { + return Err(anyhow!("https is enabled, but node has no https port")); + } + + Ok(Self { id, listen_http_addr, listen_http_port, + listen_https_port, listen_pg_addr, listen_pg_port, scheduling: NodeSchedulingPolicy::Active, availability: NodeAvailability::Offline, availability_zone_id, + use_https, cancel: CancellationToken::new(), - } + }) } pub(crate) fn to_persistent(&self) -> NodePersistence { @@ -202,14 +235,19 @@ impl Node { scheduling_policy: self.scheduling.into(), listen_http_addr: self.listen_http_addr.clone(), listen_http_port: self.listen_http_port as i32, + listen_https_port: self.listen_https_port.map(|x| x as i32), listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port as i32, availability_zone_id: self.availability_zone_id.0.clone(), } } - pub(crate) fn from_persistent(np: NodePersistence) -> Self { - Self { + pub(crate) fn from_persistent(np: NodePersistence, use_https: bool) -> anyhow::Result { + if use_https && np.listen_https_port.is_none() { + return Err(anyhow!("https is enabled, but node has no https port")); + } + + Ok(Self { id: NodeId(np.node_id as u64), // At startup we consider a node offline until proven otherwise. availability: NodeAvailability::Offline, @@ -217,11 +255,13 @@ impl Node { .expect("Bad scheduling policy in DB"), listen_http_addr: np.listen_http_addr, listen_http_port: np.listen_http_port as u16, + listen_https_port: np.listen_https_port.map(|x| x as u16), listen_pg_addr: np.listen_pg_addr, listen_pg_port: np.listen_pg_port as u16, availability_zone_id: AvailabilityZone(np.availability_zone_id), + use_https, cancel: CancellationToken::new(), - } + }) } /// Wrapper for issuing requests to pageserver management API: takes care of generic @@ -285,8 +325,9 @@ impl Node { warn_threshold, max_retries, &format!( - "Call to node {} ({}:{}) management API", - self.id, self.listen_http_addr, self.listen_http_port + "Call to node {} ({}) management API", + self.id, + self.base_url(), ), cancel, ) @@ -302,6 +343,7 @@ impl Node { availability_zone_id: self.availability_zone_id.0.clone(), listen_http_addr: self.listen_http_addr.clone(), listen_http_port: self.listen_http_port, + listen_https_port: self.listen_https_port, listen_pg_addr: self.listen_pg_addr.clone(), listen_pg_port: self.listen_pg_port, } diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs index 67b60eadf3..459c11add9 100644 --- a/storage_controller/src/persistence.rs +++ b/storage_controller/src/persistence.rs @@ -375,18 +375,23 @@ impl Persistence { Ok(nodes) } - pub(crate) async fn update_node( + pub(crate) async fn update_node( &self, input_node_id: NodeId, - input_scheduling: NodeSchedulingPolicy, - ) -> DatabaseResult<()> { + values: V, + ) -> DatabaseResult<()> + where + V: diesel::AsChangeset + Clone + Send + Sync, + V::Changeset: diesel::query_builder::QueryFragment + Send, // valid Postgres SQL + { use crate::schema::nodes::dsl::*; let updated = self .with_measured_conn(DatabaseOperation::UpdateNode, move |conn| { + let values = values.clone(); Box::pin(async move { let updated = diesel::update(nodes) .filter(node_id.eq(input_node_id.0 as i64)) - .set((scheduling_policy.eq(String::from(input_scheduling)),)) + .set(values) .execute(conn) .await?; Ok(updated) @@ -403,6 +408,32 @@ impl Persistence { } } + pub(crate) async fn update_node_scheduling_policy( + &self, + input_node_id: NodeId, + input_scheduling: NodeSchedulingPolicy, + ) -> DatabaseResult<()> { + use crate::schema::nodes::dsl::*; + self.update_node( + input_node_id, + scheduling_policy.eq(String::from(input_scheduling)), + ) + .await + } + + pub(crate) async fn update_node_on_registration( + &self, + input_node_id: NodeId, + input_https_port: Option, + ) -> DatabaseResult<()> { + use crate::schema::nodes::dsl::*; + self.update_node( + input_node_id, + listen_https_port.eq(input_https_port.map(|x| x as i32)), + ) + .await + } + /// At startup, load the high level state for shards, such as their config + policy. This will /// be enriched at runtime with state discovered on pageservers. /// @@ -1452,6 +1483,7 @@ pub(crate) struct NodePersistence { pub(crate) listen_pg_addr: String, pub(crate) listen_pg_port: i32, pub(crate) availability_zone_id: String, + pub(crate) listen_https_port: Option, } /// Tenant metadata health status that are stored durably. diff --git a/storage_controller/src/scheduler.rs b/storage_controller/src/scheduler.rs index 106a7b2699..44936d018a 100644 --- a/storage_controller/src/scheduler.rs +++ b/storage_controller/src/scheduler.rs @@ -930,13 +930,16 @@ pub(crate) mod test_utils { NodeId(i), format!("httphost-{i}"), 80 + i as u16, + None, format!("pghost-{i}"), 5432 + i as u16, az_iter .next() .cloned() .unwrap_or(AvailabilityZone("test-az".to_string())), - ); + false, + ) + .unwrap(); node.set_availability(NodeAvailability::Active(test_utilization::simple(0, 0))); assert!(node.is_available()); node diff --git a/storage_controller/src/schema.rs b/storage_controller/src/schema.rs index 14c30c296d..361253bd19 100644 --- a/storage_controller/src/schema.rs +++ b/storage_controller/src/schema.rs @@ -26,6 +26,7 @@ diesel::table! { listen_pg_addr -> Varchar, listen_pg_port -> Int4, availability_zone_id -> Varchar, + listen_https_port -> Nullable, } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index fc6d2f3d29..25a1cb4252 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -399,6 +399,8 @@ pub struct Config { pub http_service_port: i32, pub long_reconcile_threshold: Duration, + + pub use_https_pageserver_api: bool, } impl From for ApiError { @@ -1401,8 +1403,8 @@ impl Service { .list_nodes() .await? .into_iter() - .map(Node::from_persistent) - .collect::>(); + .map(|x| Node::from_persistent(x, config.use_https_pageserver_api)) + .collect::>>()?; let nodes: HashMap = nodes.into_iter().map(|n| (n.get_id(), n)).collect(); tracing::info!("Loaded {} nodes from database.", nodes.len()); metrics::METRICS_REGISTRY @@ -1501,10 +1503,13 @@ impl Service { NodeId(node_id as u64), "".to_string(), 123, + None, "".to_string(), 123, AvailabilityZone("test_az".to_string()), - ); + false, + ) + .unwrap(); scheduler.node_upsert(&node); } @@ -5907,8 +5912,10 @@ impl Service { ) .await; + #[derive(PartialEq)] enum RegistrationStatus { - Matched, + UpToDate, + NeedUpdate, Mismatched, New, } @@ -5917,7 +5924,11 @@ impl Service { let locked = self.inner.read().unwrap(); if let Some(node) = locked.nodes.get(®ister_req.node_id) { if node.registration_match(®ister_req) { - RegistrationStatus::Matched + if node.need_update(®ister_req) { + RegistrationStatus::NeedUpdate + } else { + RegistrationStatus::UpToDate + } } else { RegistrationStatus::Mismatched } @@ -5927,9 +5938,9 @@ impl Service { }; match registration_status { - RegistrationStatus::Matched => { + RegistrationStatus::UpToDate => { tracing::info!( - "Node {} re-registered with matching address", + "Node {} re-registered with matching address and is up to date", register_req.node_id ); @@ -5947,7 +5958,7 @@ impl Service { "Node is already registered with different address".to_string(), )); } - RegistrationStatus::New => { + RegistrationStatus::New | RegistrationStatus::NeedUpdate => { // fallthrough } } @@ -5976,6 +5987,16 @@ impl Service { )); } + if self.config.use_https_pageserver_api && register_req.listen_https_port.is_none() { + return Err(ApiError::PreconditionFailed( + format!( + "Node {} has no https port, but use_https is enabled", + register_req.node_id + ) + .into(), + )); + } + // Ordering: we must persist the new node _before_ adding it to in-memory state. // This ensures that before we use it for anything or expose it via any external // API, it is guaranteed to be available after a restart. @@ -5983,13 +6004,29 @@ impl Service { register_req.node_id, register_req.listen_http_addr, register_req.listen_http_port, + register_req.listen_https_port, register_req.listen_pg_addr, register_req.listen_pg_port, register_req.availability_zone_id.clone(), + self.config.use_https_pageserver_api, ); + let new_node = match new_node { + Ok(new_node) => new_node, + Err(error) => return Err(ApiError::InternalServerError(error)), + }; - // TODO: idempotency if the node already exists in the database - self.persistence.insert_node(&new_node).await?; + match registration_status { + RegistrationStatus::New => self.persistence.insert_node(&new_node).await?, + RegistrationStatus::NeedUpdate => { + self.persistence + .update_node_on_registration( + register_req.node_id, + register_req.listen_https_port, + ) + .await? + } + _ => unreachable!("Other statuses have been processed earlier"), + } let mut locked = self.inner.write().unwrap(); let mut new_nodes = (*locked.nodes).clone(); @@ -6004,12 +6041,24 @@ impl Service { .storage_controller_pageserver_nodes .set(locked.nodes.len() as i64); - tracing::info!( - "Registered pageserver {} ({}), now have {} pageservers", - register_req.node_id, - register_req.availability_zone_id, - locked.nodes.len() - ); + match registration_status { + RegistrationStatus::New => { + tracing::info!( + "Registered pageserver {} ({}), now have {} pageservers", + register_req.node_id, + register_req.availability_zone_id, + locked.nodes.len() + ); + } + RegistrationStatus::NeedUpdate => { + tracing::info!( + "Re-registered and updated node {} ({})", + register_req.node_id, + register_req.availability_zone_id, + ); + } + _ => unreachable!("Other statuses have been processed earlier"), + } Ok(()) } @@ -6027,7 +6076,9 @@ impl Service { if let Some(scheduling) = scheduling { // Scheduling is a persistent part of Node: we must write updates to the database before // applying them in memory - self.persistence.update_node(node_id, scheduling).await?; + self.persistence + .update_node_scheduling_policy(node_id, scheduling) + .await?; } // If we're activating a node, then before setting it active we must reconcile any shard locations diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 58c5dbfd29..1d282971b1 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1167,15 +1167,15 @@ class NeonEnv: "max_batch_size": 32, } - # Concurrent IO (https://github.com/neondatabase/neon/issues/9378): - # enable concurrent IO by default in tests and benchmarks. - # Compat tests are exempt because old versions fail to parse the new config. - get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io if config.test_may_use_compatibility_snapshot_binaries: log.info( - "Forcing use of binary-built-in default to avoid forward-compatibility related test failures" + "Skipping WAL contiguity validation to avoid forward-compatibility related test failures" ) - get_vectored_concurrent_io = None + else: + # Look for gaps in WAL received from safekeepeers + ps_cfg["validate_wal_contiguity"] = True + + get_vectored_concurrent_io = self.pageserver_get_vectored_concurrent_io if get_vectored_concurrent_io is not None: ps_cfg["get_vectored_concurrent_io"] = { "mode": self.pageserver_get_vectored_concurrent_io, @@ -1630,6 +1630,7 @@ def neon_env_builder( class PageserverPort: pg: int http: int + https: int | None = None class LogUtils: @@ -1886,6 +1887,7 @@ class NeonStorageController(MetricsGetter, LogUtils): "node_id": int(node.id), "listen_http_addr": "localhost", "listen_http_port": node.service_port.http, + "listen_https_port": node.service_port.https, "listen_pg_addr": "localhost", "listen_pg_port": node.service_port.pg, "availability_zone_id": node.az_id, diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 1d95312140..7e895422d2 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -3764,3 +3764,56 @@ def test_storage_controller_node_flap_detach_race( assert len(locs) == 1, f"{shard} has {len(locs)} attached locations" wait_until(validate_locations, timeout=10) + + +def test_update_node_on_registration(neon_env_builder: NeonEnvBuilder): + """ + Check that storage controller handles node_register requests with updated fields correctly. + 1. Run storage controller and register 1 pageserver without https port. + 2. Register the same pageserver with https port. Check that port has been updated. + 3. Restart the storage controller. Check that https port is persistent. + 4. Register the same pageserver without https port again (rollback). Check that port has been removed. + """ + neon_env_builder.num_pageservers = 1 + env = neon_env_builder.init_configs() + + env.storage_controller.start() + env.storage_controller.wait_until_ready() + + pageserver = env.pageservers[0] + + # Step 1. Register pageserver without https port. + env.storage_controller.node_register(pageserver) + env.storage_controller.consistency_check() + + nodes = env.storage_controller.node_list() + assert len(nodes) == 1 + assert nodes[0]["listen_https_port"] is None + + # Step 2. Register pageserver with https port. + pageserver.service_port.https = 1234 + env.storage_controller.node_register(pageserver) + env.storage_controller.consistency_check() + + nodes = env.storage_controller.node_list() + assert len(nodes) == 1 + assert nodes[0]["listen_https_port"] == 1234 + + # Step 3. Restart storage controller. + env.storage_controller.stop() + env.storage_controller.start() + env.storage_controller.wait_until_ready() + env.storage_controller.consistency_check() + + nodes = env.storage_controller.node_list() + assert len(nodes) == 1 + assert nodes[0]["listen_https_port"] == 1234 + + # Step 4. Register pageserver with no https port again. + pageserver.service_port.https = None + env.storage_controller.node_register(pageserver) + env.storage_controller.consistency_check() + + nodes = env.storage_controller.node_list() + assert len(nodes) == 1 + assert nodes[0]["listen_https_port"] is None diff --git a/test_runner/regress/test_subscriber_branching.py b/test_runner/regress/test_subscriber_branching.py index 849d4f024d..6175643389 100644 --- a/test_runner/regress/test_subscriber_branching.py +++ b/test_runner/regress/test_subscriber_branching.py @@ -1,9 +1,10 @@ from __future__ import annotations +import threading import time from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv +from fixtures.neon_fixtures import NeonEnv, logical_replication_sync from fixtures.utils import query_scalar, wait_until @@ -239,3 +240,173 @@ def test_subscriber_branching(neon_simple_env: NeonEnv): res = scur_postgres.fetchall() assert len(res) == 1 assert str(sub_child_2_timeline_id) == res[0][0] + + +def test_multiple_subscription_branching(neon_simple_env: NeonEnv): + """ + Test that compute_ctl can handle concurrent deletion of subscriptions in a multiple databases + """ + env = neon_simple_env + + NUMBER_OF_DBS = 5 + + # Create and start endpoint so that neon_local put all the generated + # stuff into the spec.json file. + endpoint = env.endpoints.create_start( + "main", + config_lines=[ + "max_replication_slots = 10", + "max_logical_replication_workers=10", + "max_worker_processes=10", + ], + ) + + TEST_DB_NAMES = [ + { + "name": "neondb", + "owner": "cloud_admin", + }, + { + "name": "publisher_db", + "owner": "cloud_admin", + }, + ] + + for i in range(NUMBER_OF_DBS): + TEST_DB_NAMES.append( + { + "name": f"db{i}", + "owner": "cloud_admin", + } + ) + + # Update the spec.json file to create the databases + # and reconfigure the endpoint to apply the changes. + endpoint.respec_deep( + **{ + "skip_pg_catalog_updates": False, + "cluster": { + "databases": TEST_DB_NAMES, + }, + } + ) + endpoint.reconfigure() + + connstr = endpoint.connstr(dbname="publisher_db").replace("'", "''") + + # create table, replication and subscription for each of the databases + with endpoint.cursor(dbname="publisher_db") as publisher_cursor: + for i in range(NUMBER_OF_DBS): + publisher_cursor.execute(f"CREATE TABLE t{i}(a int)") + publisher_cursor.execute(f"CREATE PUBLICATION mypub{i} FOR TABLE t{i}") + publisher_cursor.execute( + f"select pg_catalog.pg_create_logical_replication_slot('mysub{i}', 'pgoutput');" + ) + publisher_cursor.execute(f"INSERT INTO t{i} VALUES ({i})") + + with endpoint.cursor(dbname=f"db{i}") as cursor: + cursor.execute(f"CREATE TABLE t{i}(a int)") + cursor.execute( + f"CREATE SUBSCRIPTION mysub{i} CONNECTION '{connstr}' PUBLICATION mypub{i} WITH (create_slot = false) " + ) + + # wait for the subscription to be active + for i in range(NUMBER_OF_DBS): + logical_replication_sync( + endpoint, + endpoint, + f"mysub{i}", + sub_dbname=f"db{i}", + pub_dbname="publisher_db", + ) + + # Check that replication is working + for i in range(NUMBER_OF_DBS): + with endpoint.cursor(dbname=f"db{i}") as cursor: + cursor.execute(f"SELECT * FROM t{i}") + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][0] == i + + last_insert_lsn = query_scalar(cursor, "select pg_current_wal_insert_lsn();") + + def start_publisher_workload(table_num: int, duration: int): + start = time.time() + with endpoint.cursor(dbname="publisher_db") as cur: + while time.time() - start < duration: + cur.execute(f"INSERT INTO t{i} SELECT FROM generate_series(1,1000)") + + LOAD_DURATION = 5 + threads = [ + threading.Thread(target=start_publisher_workload, args=(i, LOAD_DURATION)) + for i in range(NUMBER_OF_DBS) + ] + + for thread in threads: + thread.start() + + sub_child_1_timeline_id = env.create_branch( + "subscriber_child_1", + ancestor_branch_name="main", + ancestor_start_lsn=last_insert_lsn, + ) + + sub_child_1 = env.endpoints.create("subscriber_child_1") + + sub_child_1.respec( + skip_pg_catalog_updates=False, + reconfigure_concurrency=5, + drop_subscriptions_before_start=True, + cluster={ + "databases": TEST_DB_NAMES, + "roles": [], + }, + ) + + sub_child_1.start() + + # ensure that subscription deletion happened on this timeline + with sub_child_1.cursor() as scur_postgres: + scur_postgres.execute("SELECT timeline_id from neon.drop_subscriptions_done") + res = scur_postgres.fetchall() + log.info(f"res = {res}") + assert len(res) == 1 + assert str(sub_child_1_timeline_id) == res[0][0] + + # ensure that there are no subscriptions in the databases + for i in range(NUMBER_OF_DBS): + with sub_child_1.cursor(dbname=f"db{i}") as cursor: + cursor.execute("SELECT * FROM pg_catalog.pg_subscription") + res = cursor.fetchall() + assert len(res) == 0 + + # ensure that there are no unexpected rows in the tables + cursor.execute(f"SELECT * FROM t{i}") + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][0] == i + + for thread in threads: + thread.join() + + # ensure that logical replication is still working in main endpoint + # wait for it to catch up + for i in range(NUMBER_OF_DBS): + logical_replication_sync( + endpoint, + endpoint, + f"mysub{i}", + sub_dbname=f"db{i}", + pub_dbname="publisher_db", + ) + + # verify that the data is the same in publisher and subscriber tables + with endpoint.cursor(dbname="publisher_db") as publisher_cursor: + for i in range(NUMBER_OF_DBS): + with endpoint.cursor(dbname=f"db{i}") as cursor: + publisher_cursor.execute(f"SELECT count(*) FROM t{i}") + cursor.execute(f"SELECT count(*) FROM t{i}") + pub_res = publisher_cursor.fetchone() + sub_res = cursor.fetchone() + log.info(f"for table t{i}: pub_res = {pub_res}, sub_res = {sub_res}") + assert pub_res == sub_res diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 023f1020ec..6ff5044377 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 023f1020ecb07af3bb0ddbf4622e1a3c3fa276a4 +Subproject commit 6ff50443773b69749e16da6db9d4f4b19064b4b7 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 6cb8d22079..261ed10e9b 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 6cb8d22079570b50fcaff29124d40807c1e63a82 +Subproject commit 261ed10e9b8c8dda01ad7aefb18e944e30aa161d diff --git a/vendor/revisions.json b/vendor/revisions.json index 3379cf1ba8..f85cec3a0b 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -5,11 +5,11 @@ ], "v16": [ "16.8", - "6cb8d22079570b50fcaff29124d40807c1e63a82" + "261ed10e9b8c8dda01ad7aefb18e944e30aa161d" ], "v15": [ "15.12", - "023f1020ecb07af3bb0ddbf4622e1a3c3fa276a4" + "6ff50443773b69749e16da6db9d4f4b19064b4b7" ], "v14": [ "14.17",