fix(pageserver): make posthog config parsing more robust (#12356)

## Problem

In our infra config, we have to split server_api_key and other fields in
two files: the former one in the sops file, and the latter one in the
normal config. It creates the situation that we might misconfigure some
regions that it only has part of the fields available, causing
storcon/pageserver refuse to start.

## Summary of changes

Allow PostHog config to have part of the fields available. Parse it
later.

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-06-26 11:49:08 -04:00
committed by GitHub
parent 605fb04f89
commit 33c0d5e2f4
7 changed files with 95 additions and 30 deletions

1
Cargo.lock generated
View File

@@ -4408,6 +4408,7 @@ dependencies = [
"postgres_backend",
"postgres_ffi_types",
"postgres_versioninfo",
"posthog_client_lite",
"rand 0.8.5",
"remote_storage",
"reqwest",

View File

@@ -19,6 +19,7 @@ byteorder.workspace = true
utils.workspace = true
postgres_ffi_types.workspace = true
postgres_versioninfo.workspace = true
posthog_client_lite.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true

View File

@@ -4,6 +4,7 @@ use camino::Utf8PathBuf;
mod tests;
use const_format::formatcp;
use posthog_client_lite::PostHogClientConfig;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
@@ -68,15 +69,25 @@ impl Display for NodeMetadata {
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
pub project_id: String,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub project_id: Option<String>,
/// Server-side (private) API key
pub server_api_key: String,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub server_api_key: Option<String>,
/// Client-side (public) API key
pub client_api_key: String,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub client_api_key: Option<String>,
/// Private API URL
pub private_api_url: String,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub private_api_url: Option<String>,
/// Public API URL
pub public_api_url: String,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub public_api_url: Option<String>,
/// Refresh interval for the feature flag spec.
/// The storcon will push the feature flag spec to the pageserver. If the pageserver does not receive
/// the spec for `refresh_interval`, it will fetch the spec from the PostHog API.
@@ -86,6 +97,33 @@ pub struct PostHogConfig {
pub refresh_interval: Option<Duration>,
}
impl PostHogConfig {
pub fn try_into_posthog_config(self) -> Result<PostHogClientConfig, &'static str> {
let Some(project_id) = self.project_id else {
return Err("project_id is required");
};
let Some(server_api_key) = self.server_api_key else {
return Err("server_api_key is required");
};
let Some(client_api_key) = self.client_api_key else {
return Err("client_api_key is required");
};
let Some(private_api_url) = self.private_api_url else {
return Err("private_api_url is required");
};
let Some(public_api_url) = self.public_api_url else {
return Err("public_api_url is required");
};
Ok(PostHogClientConfig {
project_id,
server_api_key,
client_api_key,
private_api_url,
public_api_url,
})
}
}
/// `pageserver.toml`
///
/// We use serde derive with `#[serde(default)]` to generate a deserializer

View File

@@ -781,4 +781,21 @@ mod tests {
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
#[test]
fn test_config_posthog_incomplete_config_is_valid() {
let input = r#"
control_plane_api = "http://localhost:6666"
[posthog_config]
server_api_key = "phs_AAA"
private_api_url = "https://us.posthog.com"
public_api_url = "https://us.i.posthog.com"
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("posthogconfig is valid");
let workdir = Utf8PathBuf::from("/nonexistent");
PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir)
.expect("parse_and_validate");
}
}

View File

@@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use pageserver_api::config::NodeMetadata;
use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError,
PostHogFlagFilterPropertyValue,
};
use remote_storage::RemoteStorageKind;
@@ -45,16 +45,24 @@ impl FeatureResolver {
) -> anyhow::Result<Self> {
// DO NOT block in this function: make it return as fast as possible to avoid startup delays.
if let Some(posthog_config) = &conf.posthog_config {
let inner = FeatureResolverBackgroundLoop::new(
PostHogClientConfig {
server_api_key: posthog_config.server_api_key.clone(),
client_api_key: posthog_config.client_api_key.clone(),
project_id: posthog_config.project_id.clone(),
private_api_url: posthog_config.private_api_url.clone(),
public_api_url: posthog_config.public_api_url.clone(),
},
shutdown_pageserver,
);
let posthog_client_config = match posthog_config.clone().try_into_posthog_config() {
Ok(config) => config,
Err(e) => {
tracing::warn!(
"invalid posthog config, skipping posthog integration: {}",
e
);
return Ok(FeatureResolver {
inner: None,
internal_properties: None,
force_overrides_for_testing: Arc::new(ArcSwap::new(Arc::new(
HashMap::new(),
))),
});
}
};
let inner =
FeatureResolverBackgroundLoop::new(posthog_client_config, shutdown_pageserver);
let inner = Arc::new(inner);
// The properties shared by all tenants on this pageserver.

View File

@@ -560,9 +560,15 @@ async fn async_main() -> anyhow::Result<()> {
let cancel_bg = cancel.clone();
let task = tokio::task::spawn(
async move {
let feature_flag_service = FeatureFlagService::new(service, posthog_config);
let feature_flag_service = Arc::new(feature_flag_service);
feature_flag_service.run(cancel_bg).await
match FeatureFlagService::new(service, posthog_config) {
Ok(feature_flag_service) => {
let feature_flag_service = Arc::new(feature_flag_service);
feature_flag_service.run(cancel_bg).await
}
Err(e) => {
tracing::warn!("Failed to create feature flag service: {}", e);
}
};
}
.instrument(tracing::info_span!("feature_flag_service")),
);

View File

@@ -3,7 +3,7 @@ use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use pageserver_api::config::PostHogConfig;
use pageserver_client::mgmt_api;
use posthog_client_lite::{PostHogClient, PostHogClientConfig};
use posthog_client_lite::PostHogClient;
use reqwest::StatusCode;
use tokio::time::MissedTickBehavior;
use tokio_util::sync::CancellationToken;
@@ -20,20 +20,14 @@ pub struct FeatureFlagService {
const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(30);
impl FeatureFlagService {
pub fn new(service: Arc<Service>, config: PostHogConfig) -> Self {
let client = PostHogClient::new(PostHogClientConfig {
project_id: config.project_id.clone(),
server_api_key: config.server_api_key.clone(),
client_api_key: config.client_api_key.clone(),
private_api_url: config.private_api_url.clone(),
public_api_url: config.public_api_url.clone(),
});
Self {
pub fn new(service: Arc<Service>, config: PostHogConfig) -> Result<Self, &'static str> {
let client = PostHogClient::new(config.clone().try_into_posthog_config()?);
Ok(Self {
service,
config,
client,
http_client: reqwest::Client::new(),
}
})
}
async fn refresh(self: Arc<Self>, cancel: CancellationToken) -> Result<(), anyhow::Error> {