From fc7267a760f9fbe8848580f53b508fd307c59098 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Sat, 26 Jul 2025 15:01:16 +0200 Subject: [PATCH] feature-gate compute side code --- compute_tools/src/monitor.rs | 61 +++++++++++++++------------ control_plane/src/bin/neon_local.rs | 16 ++++++- control_plane/src/endpoint.rs | 5 ++- libs/compute_api/src/spec.rs | 2 + test_runner/fixtures/neon_cli.py | 3 ++ test_runner/fixtures/neon_fixtures.py | 10 +++++ 6 files changed, 66 insertions(+), 31 deletions(-) diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index 390158078a..2fc0f20a75 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -345,37 +345,42 @@ impl ComputeMonitor { } } - let mode: ComputeMode = self + if self .compute - .state - .lock() - .unwrap() - .pspec - .as_ref() - .expect("we launch ComputeMonitor only after we received a spec") - .spec - .mode; - match mode { - // TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary) - // then we should make sure that lsn_lease_state transitions back to None so we stop renewing it. - ComputeMode::Primary => (), - ComputeMode::Static(_) => (), - ComputeMode::Replica => { - // TODO: instead of apply_lsn, use min inflight request LSN - match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) { - Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") { - Ok(apply_lsn) => { - let apply_lsn = Lsn(apply_lsn.into()); - self.compute - .ro_replica - .update_min_inflight_request_lsn(apply_lsn); - } + .has_feature(ComputeFeature::StandbyHorizonLeasesExperimental) + { + let mode: ComputeMode = self + .compute + .state + .lock() + .unwrap() + .pspec + .as_ref() + .expect("we launch ComputeMonitor only after we received a spec") + .spec + .mode; + match mode { + // TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary) + // then we should make sure that lsn_lease_state transitions back to None so we stop renewing it. + ComputeMode::Primary => (), + ComputeMode::Static(_) => (), + ComputeMode::Replica => { + // TODO: instead of apply_lsn, use min inflight request LSN + match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) { + Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") { + Ok(apply_lsn) => { + let apply_lsn = Lsn(apply_lsn.into()); + self.compute + .ro_replica + .update_min_inflight_request_lsn(apply_lsn); + } + Err(e) => { + anyhow::bail!("parse apply_lsn: {e}"); + } + }, Err(e) => { - anyhow::bail!("parse apply_lsn: {e}"); + anyhow::bail!("query apply_lsn: {e}"); } - }, - Err(e) => { - anyhow::bail!("query apply_lsn: {e}"); } } } diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 6021933d6a..b1cc770b88 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -16,7 +16,7 @@ use std::time::Duration; use anyhow::{Context, Result, anyhow, bail}; use clap::Parser; use compute_api::requests::ComputeClaimsScope; -use compute_api::spec::{ComputeMode, PageserverProtocol}; +use compute_api::spec::{ComputeFeature, ComputeMode, PageserverProtocol}; use control_plane::broker::StorageBroker; use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode}; use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage}; @@ -595,6 +595,9 @@ struct EndpointCreateCmdArgs { #[clap(long = "pageserver-id")] endpoint_pageserver_id: Option, + #[clap(long, value_parser = parse_compute_features)] + features: Option, + #[clap( long, help = "Don't do basebackup, create endpoint directory with only config files", @@ -633,6 +636,13 @@ struct EndpointCreateCmdArgs { allow_multiple: bool, } +#[derive(Clone, serde::Deserialize)] +#[serde(transparent)] +struct ComputeFeatures(Vec); +fn parse_compute_features(s: &str) -> anyhow::Result { + serde_json::from_str(s).context("parse compute features arg as JSON array") +} + #[derive(clap::Args)] #[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")] struct EndpointStartCmdArgs { @@ -1480,6 +1490,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res args.grpc, !args.update_catalog, false, + args.features + .as_ref() + .map(|x| x.0.clone()) + .unwrap_or(vec![]), )?; } EndpointCmd::Start(args) => { diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 74ab15dc97..c5c63da8e5 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -198,6 +198,7 @@ impl ComputeControlPlane { grpc: bool, skip_pg_catalog_updates: bool, drop_subscriptions_before_start: bool, + features: Vec, ) -> Result> { let pg_port = pg_port.unwrap_or_else(|| self.get_port()); let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1); @@ -232,7 +233,7 @@ impl ComputeControlPlane { drop_subscriptions_before_start, grpc, reconfigure_concurrency: 1, - features: vec![], + features: features.clone(), cluster: None, compute_ctl_config: compute_ctl_config.clone(), }); @@ -253,7 +254,7 @@ impl ComputeControlPlane { skip_pg_catalog_updates, drop_subscriptions_before_start, reconfigure_concurrency: 1, - features: vec![], + features, cluster: None, compute_ctl_config, })?, diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 60311aa3e6..202834dbb4 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -207,6 +207,8 @@ pub enum ComputeFeature { /// Enable TLS functionality. TlsExperimental, + StandbyHorizonLeasesExperimental, + /// This is a special feature flag that is used to represent unknown feature flags. /// Basically all unknown to enum flags are represented as this one. See unit test /// `parse_unknown_features()` for more details. diff --git a/test_runner/fixtures/neon_cli.py b/test_runner/fixtures/neon_cli.py index 1abd3396e4..7643ad6e1f 100644 --- a/test_runner/fixtures/neon_cli.py +++ b/test_runner/fixtures/neon_cli.py @@ -503,6 +503,7 @@ class NeonLocalCli(AbstractNeonCli): pageserver_id: int | None = None, allow_multiple=False, update_catalog: bool = False, + features: list[str] | None = None, ) -> subprocess.CompletedProcess[str]: args = [ "endpoint", @@ -534,6 +535,8 @@ class NeonLocalCli(AbstractNeonCli): args.extend(["--allow-multiple"]) if update_catalog: args.extend(["--update-catalog"]) + if features is not None: + args.extend(["--features", json.dumps(features)]) res = self.raw_cli(args) res.check_returncode() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index d922be55d7..f87182846e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4285,6 +4285,7 @@ class Endpoint(PgProtocol, LogUtils): pageserver_id: int | None = None, allow_multiple: bool = False, update_catalog: bool = False, + features: list[str] | None = None, ) -> Self: """ Create a new Postgres endpoint. @@ -4312,6 +4313,7 @@ class Endpoint(PgProtocol, LogUtils): pageserver_id=pageserver_id, allow_multiple=allow_multiple, update_catalog=update_catalog, + features=features, ) path = Path("endpoints") / self.endpoint_id / "pgdata" self.pgdata_dir = self.env.repo_dir / path @@ -4617,6 +4619,7 @@ class Endpoint(PgProtocol, LogUtils): basebackup_request_tries: int | None = None, autoprewarm: bool = False, offload_lfc_interval_seconds: int | None = None, + features: list[str] | None = None, ) -> Self: """ Create an endpoint, apply config, and start Postgres. @@ -4632,6 +4635,7 @@ class Endpoint(PgProtocol, LogUtils): lsn=lsn, pageserver_id=pageserver_id, allow_multiple=allow_multiple, + features=features, ).start( remote_ext_base_url=remote_ext_base_url, pageserver_id=pageserver_id, @@ -4725,6 +4729,7 @@ class EndpointFactory: basebackup_request_tries: int | None = None, autoprewarm: bool = False, offload_lfc_interval_seconds: int | None = None, + features: list[str] | None = None, ) -> Endpoint: ep = Endpoint( self.env, @@ -4748,6 +4753,7 @@ class EndpointFactory: basebackup_request_tries=basebackup_request_tries, autoprewarm=autoprewarm, offload_lfc_interval_seconds=offload_lfc_interval_seconds, + features=features, ) def create( @@ -4761,6 +4767,7 @@ class EndpointFactory: config_lines: list[str] | None = None, pageserver_id: int | None = None, update_catalog: bool = False, + features: list[str] | None = None, ) -> Endpoint: ep = Endpoint( self.env, @@ -4784,6 +4791,7 @@ class EndpointFactory: config_lines=config_lines, pageserver_id=pageserver_id, update_catalog=update_catalog, + features=features, ) def stop_all(self, fail_on_error=True) -> Self: @@ -4827,6 +4835,7 @@ class EndpointFactory: endpoint_id: str | None = None, grpc: bool | None = None, config_lines: list[str] | None = None, + features: list[str] | None = None, ) -> Endpoint: branch_name = origin.branch_name assert origin in self.endpoints @@ -4840,6 +4849,7 @@ class EndpointFactory: grpc=grpc, hot_standby=True, config_lines=config_lines, + features=features, )