feature-gate compute-side code

This commit is contained in:
Christian Schwarz
2025-07-26 15:01:16 +02:00
committed by Christian Schwarz
parent 3365c8c648
commit fc7267a760
6 changed files with 66 additions and 31 deletions

View File

@@ -345,37 +345,42 @@ impl ComputeMonitor {
}
}
let mode: ComputeMode = self
if self
.compute
.state
.lock()
.unwrap()
.pspec
.as_ref()
.expect("we launch ComputeMonitor only after we received a spec")
.spec
.mode;
match mode {
// TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary)
// then we should make sure that lsn_lease_state transitions back to None so we stop renewing it.
ComputeMode::Primary => (),
ComputeMode::Static(_) => (),
ComputeMode::Replica => {
// TODO: instead of apply_lsn, use min inflight request LSN
match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) {
Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") {
Ok(apply_lsn) => {
let apply_lsn = Lsn(apply_lsn.into());
self.compute
.ro_replica
.update_min_inflight_request_lsn(apply_lsn);
}
.has_feature(ComputeFeature::StandbyHorizonLeasesExperimental)
{
let mode: ComputeMode = self
.compute
.state
.lock()
.unwrap()
.pspec
.as_ref()
.expect("we launch ComputeMonitor only after we received a spec")
.spec
.mode;
match mode {
// TODO: can the .spec.mode ever change? if it can (e.g. secondary promote to primary)
// then we should make sure that lsn_lease_state transitions back to None so we stop renewing it.
ComputeMode::Primary => (),
ComputeMode::Static(_) => (),
ComputeMode::Replica => {
// TODO: instead of apply_lsn, use min inflight request LSN
match cli.query_one("SELECT pg_last_wal_replay_lsn() as apply_lsn", &[]) {
Ok(r) => match r.try_get::<&str, postgres_types::PgLsn>("apply_lsn") {
Ok(apply_lsn) => {
let apply_lsn = Lsn(apply_lsn.into());
self.compute
.ro_replica
.update_min_inflight_request_lsn(apply_lsn);
}
Err(e) => {
anyhow::bail!("parse apply_lsn: {e}");
}
},
Err(e) => {
anyhow::bail!("parse apply_lsn: {e}");
anyhow::bail!("query apply_lsn: {e}");
}
},
Err(e) => {
anyhow::bail!("query apply_lsn: {e}");
}
}
}

View File

@@ -16,7 +16,7 @@ use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::{ComputeMode, PageserverProtocol};
use compute_api::spec::{ComputeFeature, ComputeMode, PageserverProtocol};
use control_plane::broker::StorageBroker;
use control_plane::endpoint::{ComputeControlPlane, EndpointTerminateMode};
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
@@ -595,6 +595,9 @@ struct EndpointCreateCmdArgs {
#[clap(long = "pageserver-id")]
endpoint_pageserver_id: Option<NodeId>,
#[clap(long, value_parser = parse_compute_features)]
features: Option<ComputeFeatures>,
#[clap(
long,
help = "Don't do basebackup, create endpoint directory with only config files",
@@ -633,6 +636,13 @@ struct EndpointCreateCmdArgs {
allow_multiple: bool,
}
/// Newtype wrapper over a list of [`ComputeFeature`] flags, used as the value
/// type for the `--features` CLI argument.
///
/// `#[serde(transparent)]` makes it deserialize directly from a JSON array
/// (e.g. `["StandbyHorizonLeasesExperimental"]`) rather than from an object
/// with a named field.
#[derive(Clone, serde::Deserialize)]
#[serde(transparent)]
struct ComputeFeatures(Vec<ComputeFeature>);
/// clap value parser for the `--features` argument: deserializes a JSON array
/// of feature-flag names (e.g. `["TlsExperimental"]`) into [`ComputeFeatures`].
///
/// Returns an error (with context attached) if the string is not valid JSON
/// or does not match the expected array-of-flags shape.
fn parse_compute_features(s: &str) -> anyhow::Result<ComputeFeatures> {
    let features: ComputeFeatures =
        serde_json::from_str(s).context("parse compute features arg as JSON array")?;
    Ok(features)
}
#[derive(clap::Args)]
#[clap(about = "Start postgres. If the endpoint doesn't exist yet, it is created.")]
struct EndpointStartCmdArgs {
@@ -1480,6 +1490,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
args.grpc,
!args.update_catalog,
false,
args.features
.as_ref()
.map(|x| x.0.clone())
.unwrap_or(vec![]),
)?;
}
EndpointCmd::Start(args) => {

View File

@@ -198,6 +198,7 @@ impl ComputeControlPlane {
grpc: bool,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
features: Vec<ComputeFeature>,
) -> Result<Arc<Endpoint>> {
let pg_port = pg_port.unwrap_or_else(|| self.get_port());
let external_http_port = external_http_port.unwrap_or_else(|| self.get_port() + 1);
@@ -232,7 +233,7 @@ impl ComputeControlPlane {
drop_subscriptions_before_start,
grpc,
reconfigure_concurrency: 1,
features: vec![],
features: features.clone(),
cluster: None,
compute_ctl_config: compute_ctl_config.clone(),
});
@@ -253,7 +254,7 @@ impl ComputeControlPlane {
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
features: vec![],
features,
cluster: None,
compute_ctl_config,
})?,

View File

@@ -207,6 +207,8 @@ pub enum ComputeFeature {
/// Enable TLS functionality.
TlsExperimental,
StandbyHorizonLeasesExperimental,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.

View File

@@ -503,6 +503,7 @@ class NeonLocalCli(AbstractNeonCli):
pageserver_id: int | None = None,
allow_multiple=False,
update_catalog: bool = False,
features: list[str] | None = None,
) -> subprocess.CompletedProcess[str]:
args = [
"endpoint",
@@ -534,6 +535,8 @@ class NeonLocalCli(AbstractNeonCli):
args.extend(["--allow-multiple"])
if update_catalog:
args.extend(["--update-catalog"])
if features is not None:
args.extend(["--features", json.dumps(features)])
res = self.raw_cli(args)
res.check_returncode()

View File

@@ -4285,6 +4285,7 @@ class Endpoint(PgProtocol, LogUtils):
pageserver_id: int | None = None,
allow_multiple: bool = False,
update_catalog: bool = False,
features: list[str] | None = None,
) -> Self:
"""
Create a new Postgres endpoint.
@@ -4312,6 +4313,7 @@ class Endpoint(PgProtocol, LogUtils):
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
update_catalog=update_catalog,
features=features,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = self.env.repo_dir / path
@@ -4617,6 +4619,7 @@ class Endpoint(PgProtocol, LogUtils):
basebackup_request_tries: int | None = None,
autoprewarm: bool = False,
offload_lfc_interval_seconds: int | None = None,
features: list[str] | None = None,
) -> Self:
"""
Create an endpoint, apply config, and start Postgres.
@@ -4632,6 +4635,7 @@ class Endpoint(PgProtocol, LogUtils):
lsn=lsn,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
features=features,
).start(
remote_ext_base_url=remote_ext_base_url,
pageserver_id=pageserver_id,
@@ -4725,6 +4729,7 @@ class EndpointFactory:
basebackup_request_tries: int | None = None,
autoprewarm: bool = False,
offload_lfc_interval_seconds: int | None = None,
features: list[str] | None = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -4748,6 +4753,7 @@ class EndpointFactory:
basebackup_request_tries=basebackup_request_tries,
autoprewarm=autoprewarm,
offload_lfc_interval_seconds=offload_lfc_interval_seconds,
features=features,
)
def create(
@@ -4761,6 +4767,7 @@ class EndpointFactory:
config_lines: list[str] | None = None,
pageserver_id: int | None = None,
update_catalog: bool = False,
features: list[str] | None = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -4784,6 +4791,7 @@ class EndpointFactory:
config_lines=config_lines,
pageserver_id=pageserver_id,
update_catalog=update_catalog,
features=features,
)
def stop_all(self, fail_on_error=True) -> Self:
@@ -4827,6 +4835,7 @@ class EndpointFactory:
endpoint_id: str | None = None,
grpc: bool | None = None,
config_lines: list[str] | None = None,
features: list[str] | None = None,
) -> Endpoint:
branch_name = origin.branch_name
assert origin in self.endpoints
@@ -4840,6 +4849,7 @@ class EndpointFactory:
grpc=grpc,
hot_standby=True,
config_lines=config_lines,
features=features,
)