mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
feat(storcon): retrieve feature flag and pass to pageservers (#12324)
## Problem part of https://github.com/neondatabase/neon/issues/11813 ## Summary of changes It costs $$$ to directly retrieve the feature flags from the pageserver. Therefore, this patch adds new APIs to retrieve the spec from the storcon and updates it via pageserver. * Storcon retrieves the feature flag and send it to the pageservers. * If the feature flag gets updated outside of the normal refresh loop of the pageserver, pageserver won't fetch the flags on its own as long as the last updated time <= refresh_period. Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -27,6 +27,7 @@ governor.workspace = true
|
||||
hex.workspace = true
|
||||
hyper0.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
itertools.workspace = true
|
||||
json-structural-diff.workspace = true
|
||||
lasso.workspace = true
|
||||
@@ -34,6 +35,7 @@ once_cell.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
postgres_connection.workspace = true
|
||||
posthog_client_lite.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest = { workspace = true, features = ["stream"] }
|
||||
routerify.workspace = true
|
||||
|
||||
@@ -14,11 +14,13 @@ use http_utils::tls_certs::ReloadingCertificateResolver;
|
||||
use hyper0::Uri;
|
||||
use metrics::BuildInfo;
|
||||
use metrics::launch_timestamp::LaunchTimestamp;
|
||||
use pageserver_api::config::PostHogConfig;
|
||||
use reqwest::Certificate;
|
||||
use storage_controller::http::make_router;
|
||||
use storage_controller::metrics::preinitialize_metrics;
|
||||
use storage_controller::persistence::Persistence;
|
||||
use storage_controller::service::chaos_injector::ChaosInjector;
|
||||
use storage_controller::service::feature_flag::FeatureFlagService;
|
||||
use storage_controller::service::{
|
||||
Config, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
|
||||
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
|
||||
@@ -252,6 +254,8 @@ struct Secrets {
|
||||
peer_jwt_token: Option<String>,
|
||||
}
|
||||
|
||||
const POSTHOG_CONFIG_ENV: &str = "POSTHOG_CONFIG";
|
||||
|
||||
impl Secrets {
|
||||
const DATABASE_URL_ENV: &'static str = "DATABASE_URL";
|
||||
const PAGESERVER_JWT_TOKEN_ENV: &'static str = "PAGESERVER_JWT_TOKEN";
|
||||
@@ -409,6 +413,18 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
None => Vec::new(),
|
||||
};
|
||||
|
||||
let posthog_config = if let Ok(json) = std::env::var(POSTHOG_CONFIG_ENV) {
|
||||
let res: Result<PostHogConfig, _> = serde_json::from_str(&json);
|
||||
if let Ok(config) = res {
|
||||
Some(config)
|
||||
} else {
|
||||
tracing::warn!("Invalid posthog config: {json}");
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let config = Config {
|
||||
pageserver_jwt_token: secrets.pageserver_jwt_token,
|
||||
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
|
||||
@@ -455,6 +471,7 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
timelines_onto_safekeepers: args.timelines_onto_safekeepers,
|
||||
use_local_compute_notifications: args.use_local_compute_notifications,
|
||||
timeline_safekeeper_count: args.timeline_safekeeper_count,
|
||||
posthog_config: posthog_config.clone(),
|
||||
#[cfg(feature = "testing")]
|
||||
kick_secondary_downloads: args.kick_secondary_downloads,
|
||||
};
|
||||
@@ -537,6 +554,23 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
)
|
||||
});
|
||||
|
||||
let feature_flag_task = if let Some(posthog_config) = posthog_config {
|
||||
let service = service.clone();
|
||||
let cancel = CancellationToken::new();
|
||||
let cancel_bg = cancel.clone();
|
||||
let task = tokio::task::spawn(
|
||||
async move {
|
||||
let feature_flag_service = FeatureFlagService::new(service, posthog_config);
|
||||
let feature_flag_service = Arc::new(feature_flag_service);
|
||||
feature_flag_service.run(cancel_bg).await
|
||||
}
|
||||
.instrument(tracing::info_span!("feature_flag_service")),
|
||||
);
|
||||
Some((task, cancel))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Wait until we receive a signal
|
||||
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
|
||||
let mut sigquit = tokio::signal::unix::signal(SignalKind::quit())?;
|
||||
@@ -584,6 +618,12 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
chaos_jh.await.ok();
|
||||
}
|
||||
|
||||
// If we were running the feature flag service, stop that so that we're not calling into Service while it shuts down
|
||||
if let Some((feature_flag_task, feature_flag_cancel)) = feature_flag_task {
|
||||
feature_flag_cancel.cancel();
|
||||
feature_flag_task.await.ok();
|
||||
}
|
||||
|
||||
service.shutdown().await;
|
||||
tracing::info!("Service shutdown complete");
|
||||
|
||||
|
||||
@@ -376,4 +376,13 @@ impl PageserverClient {
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) async fn update_feature_flag_spec(&self, spec: String) -> Result<()> {
|
||||
measured_request!(
|
||||
"update_feature_flag_spec",
|
||||
crate::metrics::Method::Post,
|
||||
&self.node_id_label,
|
||||
self.inner.update_feature_flag_spec(spec).await
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
pub mod chaos_injector;
|
||||
mod context_iterator;
|
||||
pub mod feature_flag;
|
||||
pub(crate) mod safekeeper_reconciler;
|
||||
mod safekeeper_service;
|
||||
|
||||
@@ -25,6 +26,7 @@ use futures::stream::FuturesUnordered;
|
||||
use http_utils::error::ApiError;
|
||||
use hyper::Uri;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::config::PostHogConfig;
|
||||
use pageserver_api::controller_api::{
|
||||
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
|
||||
NodeRegisterRequest, NodeSchedulingPolicy, NodeShard, NodeShardResponse, PlacementPolicy,
|
||||
@@ -471,6 +473,9 @@ pub struct Config {
|
||||
/// Safekeepers will be choosen from different availability zones.
|
||||
pub timeline_safekeeper_count: i64,
|
||||
|
||||
/// PostHog integration config
|
||||
pub posthog_config: Option<PostHogConfig>,
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
pub kick_secondary_downloads: bool,
|
||||
}
|
||||
|
||||
117
storage_controller/src/service/feature_flag.rs
Normal file
117
storage_controller/src/service/feature_flag.rs
Normal file
@@ -0,0 +1,117 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use futures::StreamExt;
|
||||
use pageserver_api::config::PostHogConfig;
|
||||
use pageserver_client::mgmt_api;
|
||||
use posthog_client_lite::{PostHogClient, PostHogClientConfig};
|
||||
use reqwest::StatusCode;
|
||||
use tokio::time::MissedTickBehavior;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{pageserver_client::PageserverClient, service::Service};
|
||||
|
||||
pub struct FeatureFlagService {
|
||||
service: Arc<Service>,
|
||||
config: PostHogConfig,
|
||||
client: PostHogClient,
|
||||
http_client: reqwest::Client,
|
||||
}
|
||||
|
||||
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
impl FeatureFlagService {
|
||||
pub fn new(service: Arc<Service>, config: PostHogConfig) -> Self {
|
||||
let client = PostHogClient::new(PostHogClientConfig {
|
||||
project_id: config.project_id.clone(),
|
||||
server_api_key: config.server_api_key.clone(),
|
||||
client_api_key: config.client_api_key.clone(),
|
||||
private_api_url: config.private_api_url.clone(),
|
||||
public_api_url: config.public_api_url.clone(),
|
||||
});
|
||||
Self {
|
||||
service,
|
||||
config,
|
||||
client,
|
||||
http_client: reqwest::Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn refresh(self: Arc<Self>, cancel: CancellationToken) -> Result<(), anyhow::Error> {
|
||||
let nodes = {
|
||||
let inner = self.service.inner.read().unwrap();
|
||||
inner.nodes.clone()
|
||||
};
|
||||
|
||||
let feature_flag_spec = self.client.get_feature_flags_local_evaluation_raw().await?;
|
||||
let stream = futures::stream::iter(nodes.values().cloned()).map(|node| {
|
||||
let this = self.clone();
|
||||
let feature_flag_spec = feature_flag_spec.clone();
|
||||
async move {
|
||||
let res = async {
|
||||
let client = PageserverClient::new(
|
||||
node.get_id(),
|
||||
this.http_client.clone(),
|
||||
node.base_url(),
|
||||
// TODO: what if we rotate the token during storcon lifetime?
|
||||
this.service.config.pageserver_jwt_token.as_deref(),
|
||||
);
|
||||
|
||||
client.update_feature_flag_spec(feature_flag_spec).await?;
|
||||
tracing::info!(
|
||||
"Updated {}({}) with feature flag spec",
|
||||
node.get_id(),
|
||||
node.base_url()
|
||||
);
|
||||
Ok::<_, mgmt_api::Error>(())
|
||||
};
|
||||
|
||||
if let Err(e) = res.await {
|
||||
if let mgmt_api::Error::ApiError(status, _) = e {
|
||||
if status == StatusCode::NOT_FOUND {
|
||||
// This is expected during deployments where the API is not available, so we can ignore it
|
||||
return;
|
||||
}
|
||||
}
|
||||
tracing::warn!(
|
||||
"Failed to update feature flag spec for {}: {e}",
|
||||
node.get_id()
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
let mut stream = stream.buffer_unordered(8);
|
||||
|
||||
while stream.next().await.is_some() {
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run(self: Arc<Self>, cancel: CancellationToken) {
|
||||
let refresh_interval = self
|
||||
.config
|
||||
.refresh_interval
|
||||
.unwrap_or(DEFAULT_POSTHOG_REFRESH_INTERVAL);
|
||||
let mut interval = tokio::time::interval(refresh_interval);
|
||||
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
tracing::info!(
|
||||
"Starting feature flag service with refresh interval: {:?}",
|
||||
refresh_interval
|
||||
);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {}
|
||||
_ = cancel.cancelled() => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let res = self.clone().refresh(cancel.clone()).await;
|
||||
if let Err(e) = res {
|
||||
tracing::error!("Failed to refresh feature flags: {e:#?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user