feat(pageserver): support force overriding feature flags (#12233)

## Problem

Part of #11813 

## Summary of changes

Add a test API to make it easier to manipulate the feature flags within
tests.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-06-23 13:31:53 -04:00
committed by GitHub
parent 6c3aba7c44
commit 85164422d0
5 changed files with 159 additions and 7 deletions

View File

@@ -583,7 +583,7 @@ fn start_pageserver(
deletion_queue_client,
l0_flush_global_state,
basebackup_prepare_sender,
feature_resolver,
feature_resolver: feature_resolver.clone(),
},
shutdown_pageserver.clone(),
);
@@ -715,6 +715,7 @@ fn start_pageserver(
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
feature_resolver,
)
.context("Failed to initialize router state")?,
);

View File

@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use pageserver_api::config::NodeMetadata;
use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
@@ -18,6 +19,7 @@ const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600);
pub struct FeatureResolver {
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
force_overrides_for_testing: Arc<ArcSwap<HashMap<String, String>>>,
}
impl FeatureResolver {
@@ -25,6 +27,7 @@ impl FeatureResolver {
Self {
inner: None,
internal_properties: None,
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
}
}
@@ -151,11 +154,13 @@ impl FeatureResolver {
Ok(FeatureResolver {
inner: Some(inner),
internal_properties: Some(internal_properties),
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
})
} else {
Ok(FeatureResolver {
inner: None,
internal_properties: None,
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
})
}
}
@@ -195,6 +200,11 @@ impl FeatureResolver {
flag_key: &str,
tenant_id: TenantId,
) -> Result<String, PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
return Ok(value.clone());
}
if let Some(inner) = &self.inner {
let res = inner.feature_store().evaluate_multivariate(
flag_key,
@@ -233,6 +243,15 @@ impl FeatureResolver {
flag_key: &str,
tenant_id: TenantId,
) -> Result<(), PostHogEvaluationError> {
let force_overrides = self.force_overrides_for_testing.load();
if let Some(value) = force_overrides.get(flag_key) {
return if value == "true" {
Ok(())
} else {
Err(PostHogEvaluationError::NoConditionGroupMatched)
};
}
if let Some(inner) = &self.inner {
let res = inner.feature_store().evaluate_boolean(
flag_key,
@@ -264,8 +283,22 @@ impl FeatureResolver {
inner.feature_store().is_feature_flag_boolean(flag_key)
} else {
Err(PostHogEvaluationError::NotAvailable(
"PostHog integration is not enabled".to_string(),
"PostHog integration is not enabled, cannot auto-determine the flag type"
.to_string(),
))
}
}
/// Force override a feature flag for testing. This is only for testing purposes. Assume the caller only call it
/// from a single thread so it won't race.
pub fn force_override_for_testing(&self, flag_key: &str, value: Option<&str>) {
let mut force_overrides = self.force_overrides_for_testing.load().as_ref().clone();
if let Some(value) = value {
force_overrides.insert(flag_key.to_string(), value.to_string());
} else {
force_overrides.remove(flag_key);
}
self.force_overrides_for_testing
.store(Arc::new(force_overrides));
}
}

View File

@@ -59,6 +59,7 @@ use crate::config::PageServerConf;
use crate::context;
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::deletion_queue::DeletionQueueClient;
use crate::feature_resolver::FeatureResolver;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationConf;
@@ -107,6 +108,7 @@ pub struct State {
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
latest_utilization: tokio::sync::Mutex<Option<(std::time::Instant, bytes::Bytes)>>,
feature_resolver: FeatureResolver,
}
impl State {
@@ -120,6 +122,7 @@ impl State {
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
feature_resolver: FeatureResolver,
) -> anyhow::Result<Self> {
let allowlist_routes = &[
"/v1/status",
@@ -140,6 +143,7 @@ impl State {
deletion_queue_client,
secondary_controller,
latest_utilization: Default::default(),
feature_resolver,
})
}
}
@@ -3675,8 +3679,8 @@ async fn tenant_evaluate_feature_flag(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let flag: String = must_parse_query_param(&request, "flag")?;
let as_type: String = must_parse_query_param(&request, "as")?;
let flag: String = parse_request_param(&request, "flag_key")?;
let as_type: Option<String> = parse_query_param(&request, "as")?;
let state = get_state(&request);
@@ -3685,11 +3689,11 @@ async fn tenant_evaluate_feature_flag(
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
if as_type == "boolean" {
if as_type.as_deref() == Some("boolean") {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else if as_type == "multivariate" {
} else if as_type.as_deref() == Some("multivariate") {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
@@ -3709,6 +3713,35 @@ async fn tenant_evaluate_feature_flag(
.await
}
async fn force_override_feature_flag_for_testing_put(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let flag: String = parse_request_param(&request, "flag_key")?;
let value: String = must_parse_query_param(&request, "value")?;
let state = get_state(&request);
state
.feature_resolver
.force_override_for_testing(&flag, Some(&value));
json_response(StatusCode::OK, ())
}
async fn force_override_feature_flag_for_testing_delete(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let flag: String = parse_request_param(&request, "flag_key")?;
let state = get_state(&request);
state
.feature_resolver
.force_override_for_testing(&flag, None);
json_response(StatusCode::OK, ())
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -4085,8 +4118,14 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
|r| api_handler(r, activate_post_import_handler),
)
.get("/v1/tenant/:tenant_shard_id/feature_flag", |r| {
.get("/v1/tenant/:tenant_shard_id/feature_flag/:flag_key", |r| {
api_handler(r, tenant_evaluate_feature_flag)
})
.put("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - put", r, force_override_feature_flag_for_testing_put)
})
.delete("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - delete", r, force_override_feature_flag_for_testing_delete)
})
.any(handler_404))
}

View File

@@ -1219,3 +1219,31 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
return res.json()
def force_override_feature_flag(self, flag: str, value: str | None = None):
if value is None:
res = self.delete(
f"http://localhost:{self.port}/v1/feature_flag/{flag}",
)
else:
res = self.put(
f"http://localhost:{self.port}/v1/feature_flag/{flag}",
params={"value": value},
)
self.verbose_error(res)
def evaluate_feature_flag_boolean(self, tenant_id: TenantId, flag: str) -> Any:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/feature_flag/{flag}",
params={"as": "boolean"},
)
self.verbose_error(res)
return res.json()
def evaluate_feature_flag_multivariate(self, tenant_id: TenantId, flag: str) -> Any:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/feature_flag/{flag}",
params={"as": "multivariate"},
)
self.verbose_error(res)
return res.json()

View File

@@ -0,0 +1,51 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fixtures.utils import run_only_on_default_postgres
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnvBuilder
@run_only_on_default_postgres("Pageserver-only test only needs to run on one version")
def test_feature_flag(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.pageserver.http_client().force_override_feature_flag("test-feature-flag", "true")
assert env.pageserver.http_client().evaluate_feature_flag_boolean(
env.initial_tenant, "test-feature-flag"
)["result"]["Ok"]
assert (
env.pageserver.http_client().evaluate_feature_flag_multivariate(
env.initial_tenant, "test-feature-flag"
)["result"]["Ok"]
== "true"
)
env.pageserver.http_client().force_override_feature_flag("test-feature-flag", "false")
assert (
env.pageserver.http_client().evaluate_feature_flag_boolean(
env.initial_tenant, "test-feature-flag"
)["result"]["Err"]
== "No condition group is matched"
)
assert (
env.pageserver.http_client().evaluate_feature_flag_multivariate(
env.initial_tenant, "test-feature-flag"
)["result"]["Ok"]
== "false"
)
env.pageserver.http_client().force_override_feature_flag("test-feature-flag", None)
assert (
"Err"
in env.pageserver.http_client().evaluate_feature_flag_boolean(
env.initial_tenant, "test-feature-flag"
)["result"]
)
assert (
"Err"
in env.pageserver.http_client().evaluate_feature_flag_multivariate(
env.initial_tenant, "test-feature-flag"
)["result"]
)