From 6c77638ea15be150a128ce3d09823dfaafb966fc Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Wed, 25 Jun 2025 10:58:18 -0400 Subject: [PATCH] feat(storcon): retrieve feature flag and pass to pageservers (#12324) ## Problem part of https://github.com/neondatabase/neon/issues/11813 ## Summary of changes It costs $$$ to directly retrieve the feature flags from the pageserver. Therefore, this patch adds new APIs to retrieve the spec from the storcon and updates it via pageserver. * Storcon retrieves the feature flag and send it to the pageservers. * If the feature flag gets updated outside of the normal refresh loop of the pageserver, pageserver won't fetch the flags on its own as long as the last updated time <= refresh_period. Signed-off-by: Alex Chi Z --- Cargo.lock | 2 + control_plane/src/local_env.rs | 4 + control_plane/src/storage_controller.rs | 17 ++- libs/pageserver_api/src/config.rs | 7 +- .../src/background_loop.rs | 61 ++++++--- libs/posthog_client_lite/src/lib.rs | 30 +++-- pageserver/client/src/mgmt_api.rs | 9 ++ pageserver/src/feature_resolver.rs | 7 ++ pageserver/src/http/routes.rs | 17 +++ storage_controller/Cargo.toml | 2 + storage_controller/src/main.rs | 40 ++++++ storage_controller/src/pageserver_client.rs | 9 ++ storage_controller/src/service.rs | 5 + .../src/service/feature_flag.rs | 117 ++++++++++++++++++ 14 files changed, 294 insertions(+), 33 deletions(-) create mode 100644 storage_controller/src/service/feature_flag.rs diff --git a/Cargo.lock b/Cargo.lock index 51724da061..1fee728d9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6815,6 +6815,7 @@ dependencies = [ "hex", "http-utils", "humantime", + "humantime-serde", "hyper 0.14.30", "itertools 0.10.5", "json-structural-diff", @@ -6825,6 +6826,7 @@ dependencies = [ "pageserver_api", "pageserver_client", "postgres_connection", + "posthog_client_lite", "rand 0.8.5", "regex", "reqwest", diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 370921a85c..16cd2d8c08 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -12,6 +12,7 @@ use std::{env, fs}; use anyhow::{Context, bail}; use clap::ValueEnum; +use pageserver_api::config::PostHogConfig; use pem::Pem; use postgres_backend::AuthType; use reqwest::{Certificate, Url}; @@ -213,6 +214,8 @@ pub struct NeonStorageControllerConf { pub timeline_safekeeper_count: Option, + pub posthog_config: Option, + pub kick_secondary_downloads: Option, } @@ -245,6 +248,7 @@ impl Default for NeonStorageControllerConf { use_https_safekeeper_api: false, use_local_compute_notifications: true, timeline_safekeeper_count: None, + posthog_config: None, kick_secondary_downloads: None, } } diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index 218b17d88d..dea7ae2ccf 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -642,6 +642,18 @@ impl StorageController { args.push(format!("--timeline-safekeeper-count={sk_cnt}")); } + let mut envs = vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ]; + + if let Some(posthog_config) = &self.config.posthog_config { + envs.push(( + "POSTHOG_CONFIG".to_string(), + serde_json::to_string(posthog_config)?, + )); + } + println!("Starting storage controller"); background_process::start_process( @@ -649,10 +661,7 @@ impl StorageController { &instance_dir, &self.env.storage_controller_bin(), args, - vec![ - ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ], + envs, background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)), &start_args.start_timeout, || async { diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index cfb1190a27..76730c9ee6 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -63,7 +63,8 @@ impl Display for NodeMetadata { } } -/// PostHog integration config. +/// PostHog integration config. This is used in pageserver, storcon, and neon_local. +/// Ensure backward compatibility when adding new fields. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] pub struct PostHogConfig { /// PostHog project ID @@ -76,7 +77,9 @@ pub struct PostHogConfig { pub private_api_url: String, /// Public API URL pub public_api_url: String, - /// Refresh interval for the feature flag spec + /// Refresh interval for the feature flag spec. + /// The storcon will push the feature flag spec to the pageserver. If the pageserver does not receive + /// the spec for `refresh_interval`, it will fetch the spec from the PostHog API. #[serde(skip_serializing_if = "Option::is_none")] #[serde(with = "humantime_serde")] pub refresh_interval: Option, diff --git a/libs/posthog_client_lite/src/background_loop.rs b/libs/posthog_client_lite/src/background_loop.rs index dc813ccb4a..08cb0d2264 100644 --- a/libs/posthog_client_lite/src/background_loop.rs +++ b/libs/posthog_client_lite/src/background_loop.rs @@ -1,17 +1,22 @@ //! A background loop that fetches feature flags from PostHog and updates the feature store. -use std::{sync::Arc, time::Duration}; +use std::{ + sync::Arc, + time::{Duration, SystemTime}, +}; use arc_swap::ArcSwap; use tokio_util::sync::CancellationToken; use tracing::{Instrument, info_span}; -use crate::{CaptureEvent, FeatureStore, PostHogClient, PostHogClientConfig}; +use crate::{ + CaptureEvent, FeatureStore, LocalEvaluationResponse, PostHogClient, PostHogClientConfig, +}; /// A background loop that fetches feature flags from PostHog and updates the feature store. pub struct FeatureResolverBackgroundLoop { posthog_client: PostHogClient, - feature_store: ArcSwap, + feature_store: ArcSwap<(SystemTime, Arc)>, cancel: CancellationToken, } @@ -19,11 +24,35 @@ impl FeatureResolverBackgroundLoop { pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self { Self { posthog_client: PostHogClient::new(config), - feature_store: ArcSwap::new(Arc::new(FeatureStore::new())), + feature_store: ArcSwap::new(Arc::new(( + SystemTime::UNIX_EPOCH, + Arc::new(FeatureStore::new()), + ))), cancel: shutdown_pageserver, } } + /// Update the feature store with a new feature flag spec bypassing the normal refresh loop. + pub fn update(&self, spec: String) -> anyhow::Result<()> { + let resp: LocalEvaluationResponse = serde_json::from_str(&spec)?; + self.update_feature_store_nofail(resp, "http_propagate"); + Ok(()) + } + + fn update_feature_store_nofail(&self, resp: LocalEvaluationResponse, source: &'static str) { + let project_id = self.posthog_client.config.project_id.parse::().ok(); + match FeatureStore::new_with_flags(resp.flags, project_id) { + Ok(feature_store) => { + self.feature_store + .store(Arc::new((SystemTime::now(), Arc::new(feature_store)))); + tracing::info!("Feature flag updated from {}", source); + } + Err(e) => { + tracing::warn!("Cannot process feature flag spec from {}: {}", source, e); + } + } + } + pub fn spawn( self: Arc, handle: &tokio::runtime::Handle, @@ -47,6 +76,17 @@ impl FeatureResolverBackgroundLoop { _ = ticker.tick() => {} _ = cancel.cancelled() => break } + { + let last_update = this.feature_store.load().0; + if let Ok(elapsed) = last_update.elapsed() { + if elapsed < refresh_period { + tracing::debug!( + "Skipping feature flag refresh because it's too soon" + ); + continue; + } + } + } let resp = match this .posthog_client .get_feature_flags_local_evaluation() @@ -58,16 +98,7 @@ impl FeatureResolverBackgroundLoop { continue; } }; - let project_id = this.posthog_client.config.project_id.parse::().ok(); - match FeatureStore::new_with_flags(resp.flags, project_id) { - Ok(feature_store) => { - this.feature_store.store(Arc::new(feature_store)); - tracing::info!("Feature flag updated"); - } - Err(e) => { - tracing::warn!("Cannot process feature flag spec: {}", e); - } - } + this.update_feature_store_nofail(resp, "refresh_loop"); } tracing::info!("PostHog feature resolver stopped"); } @@ -92,6 +123,6 @@ impl FeatureResolverBackgroundLoop { } pub fn feature_store(&self) -> Arc { - self.feature_store.load_full() + self.feature_store.load().1.clone() } } diff --git a/libs/posthog_client_lite/src/lib.rs b/libs/posthog_client_lite/src/lib.rs index f21047bcfc..d042ee2410 100644 --- a/libs/posthog_client_lite/src/lib.rs +++ b/libs/posthog_client_lite/src/lib.rs @@ -544,17 +544,8 @@ impl PostHogClient { self.config.server_api_key.starts_with("phs_") } - /// Fetch the feature flag specs from the server. - /// - /// This is unfortunately an undocumented API at: - /// - - /// - - /// - /// The handling logic in [`FeatureStore`] mostly follows the Python API implementation. - /// See `_compute_flag_locally` in - pub async fn get_feature_flags_local_evaluation( - &self, - ) -> anyhow::Result { + /// Get the raw JSON spec, same as `get_feature_flags_local_evaluation` but without parsing. + pub async fn get_feature_flags_local_evaluation_raw(&self) -> anyhow::Result { // BASE_URL/api/projects/:project_id/feature_flags/local_evaluation // with bearer token of self.server_api_key // OR @@ -588,7 +579,22 @@ impl PostHogClient { body )); } - Ok(serde_json::from_str(&body)?) + Ok(body) + } + + /// Fetch the feature flag specs from the server. + /// + /// This is unfortunately an undocumented API at: + /// - + /// - + /// + /// The handling logic in [`FeatureStore`] mostly follows the Python API implementation. + /// See `_compute_flag_locally` in + pub async fn get_feature_flags_local_evaluation( + &self, + ) -> Result { + let raw = self.get_feature_flags_local_evaluation_raw().await?; + Ok(serde_json::from_str(&raw)?) } /// Capture an event. This will only be used to report the feature flag usage back to PostHog, though diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index 3919a6e788..af4be23b9b 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -844,4 +844,13 @@ impl Client { .await .map_err(Error::ReceiveBody) } + + pub async fn update_feature_flag_spec(&self, spec: String) -> Result<()> { + let uri = format!("{}/v1/feature_flag_spec", self.mgmt_api_endpoint); + self.request(Method::POST, uri, spec) + .await? + .json() + .await + .map_err(Error::ReceiveBody) + } } diff --git a/pageserver/src/feature_resolver.rs b/pageserver/src/feature_resolver.rs index b0a68dfc4d..92a9ef2880 100644 --- a/pageserver/src/feature_resolver.rs +++ b/pageserver/src/feature_resolver.rs @@ -31,6 +31,13 @@ impl FeatureResolver { } } + pub fn update(&self, spec: String) -> anyhow::Result<()> { + if let Some(inner) = &self.inner { + inner.update(spec)?; + } + Ok(()) + } + pub fn spawn( conf: &PageServerConf, shutdown_pageserver: CancellationToken, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 3755cbda6a..aa9bec657c 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3743,6 +3743,20 @@ async fn force_override_feature_flag_for_testing_delete( json_response(StatusCode::OK, ()) } +async fn update_feature_flag_spec( + mut request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + check_permission(&request, None)?; + let body = json_request(&mut request).await?; + let state = get_state(&request); + state + .feature_resolver + .update(body) + .map_err(ApiError::InternalServerError)?; + json_response(StatusCode::OK, ()) +} + /// Common functionality of all the HTTP API handlers. /// /// - Adds a tracing span to each request (by `request_span`) @@ -4128,5 +4142,8 @@ pub fn make_router( .delete("/v1/feature_flag/:flag_key", |r| { testing_api_handler("force override feature flag - delete", r, force_override_feature_flag_for_testing_delete) }) + .post("/v1/feature_flag_spec", |r| { + api_handler(r, update_feature_flag_spec) + }) .any(handler_404)) } diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index c41e174d9d..3a0806b3b2 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -27,6 +27,7 @@ governor.workspace = true hex.workspace = true hyper0.workspace = true humantime.workspace = true +humantime-serde.workspace = true itertools.workspace = true json-structural-diff.workspace = true lasso.workspace = true @@ -34,6 +35,7 @@ once_cell.workspace = true pageserver_api.workspace = true pageserver_client.workspace = true postgres_connection.workspace = true +posthog_client_lite.workspace = true rand.workspace = true reqwest = { workspace = true, features = ["stream"] } routerify.workspace = true diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index ff134a4ebc..296a98e620 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -14,11 +14,13 @@ use http_utils::tls_certs::ReloadingCertificateResolver; use hyper0::Uri; use metrics::BuildInfo; use metrics::launch_timestamp::LaunchTimestamp; +use pageserver_api::config::PostHogConfig; use reqwest::Certificate; use storage_controller::http::make_router; use storage_controller::metrics::preinitialize_metrics; use storage_controller::persistence::Persistence; use storage_controller::service::chaos_injector::ChaosInjector; +use storage_controller::service::feature_flag::FeatureFlagService; use storage_controller::service::{ Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT, MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, @@ -252,6 +254,8 @@ struct Secrets { peer_jwt_token: Option, } +const POSTHOG_CONFIG_ENV: &str = "POSTHOG_CONFIG"; + impl Secrets { const DATABASE_URL_ENV: &'static str = "DATABASE_URL"; const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN"; @@ -409,6 +413,18 @@ async fn async_main() -> anyhow::Result<()> { None => Vec::new(), }; + let posthog_config = if let Ok(json) = std::env::var(POSTHOG_CONFIG_ENV) { + let res: Result = serde_json::from_str(&json); + if let Ok(config) = res { + Some(config) + } else { + tracing::warn!("Invalid posthog config: {json}"); + None + } + } else { + None + }; + let config = Config { pageserver_jwt_token: secrets.pageserver_jwt_token, safekeeper_jwt_token: secrets.safekeeper_jwt_token, @@ -455,6 +471,7 @@ async fn async_main() -> anyhow::Result<()> { timelines_onto_safekeepers: args.timelines_onto_safekeepers, use_local_compute_notifications: args.use_local_compute_notifications, timeline_safekeeper_count: args.timeline_safekeeper_count, + posthog_config: posthog_config.clone(), #[cfg(feature = "testing")] kick_secondary_downloads: args.kick_secondary_downloads, }; @@ -537,6 +554,23 @@ async fn async_main() -> anyhow::Result<()> { ) }); + let feature_flag_task = if let Some(posthog_config) = posthog_config { + let service = service.clone(); + let cancel = CancellationToken::new(); + let cancel_bg = cancel.clone(); + let task = tokio::task::spawn( + async move { + let feature_flag_service = FeatureFlagService::new(service, posthog_config); + let feature_flag_service = Arc::new(feature_flag_service); + feature_flag_service.run(cancel_bg).await + } + .instrument(tracing::info_span!("feature_flag_service")), + ); + Some((task, cancel)) + } else { + None + }; + // Wait until we receive a signal let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?; let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?; @@ -584,6 +618,12 @@ async fn async_main() -> anyhow::Result<()> { chaos_jh.await.ok(); } + // If we were running the feature flag service, stop that so that we're not calling into Service while it shuts down + if let Some((feature_flag_task, feature_flag_cancel)) = feature_flag_task { + feature_flag_cancel.cancel(); + feature_flag_task.await.ok(); + } + service.shutdown().await; tracing::info!("Service shutdown complete"); diff --git a/storage_controller/src/pageserver_client.rs b/storage_controller/src/pageserver_client.rs index 817409e112..d6fe173eb3 100644 --- a/storage_controller/src/pageserver_client.rs +++ b/storage_controller/src/pageserver_client.rs @@ -376,4 +376,13 @@ impl PageserverClient { .await ) } + + pub(crate) async fn update_feature_flag_spec(&self, spec: String) -> Result<()> { + measured_request!( + "update_feature_flag_spec", + crate::metrics::Method::Post, + &self.node_id_label, + self.inner.update_feature_flag_spec(spec).await + ) + } } diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 8424c27cf8..b4dfd01249 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -1,5 +1,6 @@ pub mod chaos_injector; mod context_iterator; +pub mod feature_flag; pub(crate) mod safekeeper_reconciler; mod safekeeper_service; @@ -25,6 +26,7 @@ use futures::stream::FuturesUnordered; use http_utils::error::ApiError; use hyper::Uri; use itertools::Itertools; +use pageserver_api::config::PostHogConfig; use pageserver_api::controller_api::{ AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy, @@ -471,6 +473,9 @@ pub struct Config { /// Safekeepers will be choosen from different availability zones. pub timeline_safekeeper_count: i64, + /// PostHog integration config + pub posthog_config: Option, + #[cfg(feature = "testing")] pub kick_secondary_downloads: bool, } diff --git a/storage_controller/src/service/feature_flag.rs b/storage_controller/src/service/feature_flag.rs new file mode 100644 index 0000000000..645eb75237 --- /dev/null +++ b/storage_controller/src/service/feature_flag.rs @@ -0,0 +1,117 @@ +use std::{sync::Arc, time::Duration}; + +use futures::StreamExt; +use pageserver_api::config::PostHogConfig; +use pageserver_client::mgmt_api; +use posthog_client_lite::{PostHogClient, PostHogClientConfig}; +use reqwest::StatusCode; +use tokio::time::MissedTickBehavior; +use tokio_util::sync::CancellationToken; + +use crate::{pageserver_client::PageserverClient, service::Service}; + +pub struct FeatureFlagService { + service: Arc, + config: PostHogConfig, + client: PostHogClient, + http_client: reqwest::Client, +} + +const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(30); + +impl FeatureFlagService { + pub fn new(service: Arc, config: PostHogConfig) -> Self { + let client = PostHogClient::new(PostHogClientConfig { + project_id: config.project_id.clone(), + server_api_key: config.server_api_key.clone(), + client_api_key: config.client_api_key.clone(), + private_api_url: config.private_api_url.clone(), + public_api_url: config.public_api_url.clone(), + }); + Self { + service, + config, + client, + http_client: reqwest::Client::new(), + } + } + + async fn refresh(self: Arc, cancel: CancellationToken) -> Result<(), anyhow::Error> { + let nodes = { + let inner = self.service.inner.read().unwrap(); + inner.nodes.clone() + }; + + let feature_flag_spec = self.client.get_feature_flags_local_evaluation_raw().await?; + let stream = futures::stream::iter(nodes.values().cloned()).map(|node| { + let this = self.clone(); + let feature_flag_spec = feature_flag_spec.clone(); + async move { + let res = async { + let client = PageserverClient::new( + node.get_id(), + this.http_client.clone(), + node.base_url(), + // TODO: what if we rotate the token during storcon lifetime? + this.service.config.pageserver_jwt_token.as_deref(), + ); + + client.update_feature_flag_spec(feature_flag_spec).await?; + tracing::info!( + "Updated {}({}) with feature flag spec", + node.get_id(), + node.base_url() + ); + Ok::<_, mgmt_api::Error>(()) + }; + + if let Err(e) = res.await { + if let mgmt_api::Error::ApiError(status, _) = e { + if status == StatusCode::NOT_FOUND { + // This is expected during deployments where the API is not available, so we can ignore it + return; + } + } + tracing::warn!( + "Failed to update feature flag spec for {}: {e}", + node.get_id() + ); + } + } + }); + let mut stream = stream.buffer_unordered(8); + + while stream.next().await.is_some() { + if cancel.is_cancelled() { + return Ok(()); + } + } + + Ok(()) + } + + pub async fn run(self: Arc, cancel: CancellationToken) { + let refresh_interval = self + .config + .refresh_interval + .unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL); + let mut interval = tokio::time::interval(refresh_interval); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + tracing::info!( + "Starting feature flag service with refresh interval: {:?}", + refresh_interval + ); + loop { + tokio::select! { + _ = interval.tick() => {} + _ = cancel.cancelled() => { + break; + } + } + let res = self.clone().refresh(cancel.clone()).await; + if let Err(e) = res { + tracing::error!("Failed to refresh feature flags: {e:#?}"); + } + } + } +}