feat(pageserver): integrate PostHog with gc-compaction rollout (#11917)

## Problem

part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

* Integrate feature store with tenant structure.
* gc-compaction picks up the current strategy from the feature store.
* We only log them for now for testing purpose. They will not be used
until we have more patches to support different strategies defined in
PostHog.
* We don't support property-based evaulation for now; it will be
implemented later.
* Evaluating result of the feature flag is not cached -- it's not
efficient and cannot be used on hot path right now.
* We don't report the evaluation result back to PostHog right now.

I plan to enable it in staging once we get the patch merged.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-05-26 21:09:37 +08:00
committed by GitHub
parent 841517ee37
commit dc953de85d
13 changed files with 267 additions and 79 deletions

View File

@@ -45,6 +45,21 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
/// PostHog integration config.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
/// PostHog project ID
pub project_id: String,
/// Server-side (private) API key
pub server_api_key: String,
/// Client-side (public) API key
pub client_api_key: String,
/// Private API URL
pub private_api_url: String,
/// Public API URL
pub public_api_url: String,
}
/// `pageserver.toml`
///
/// We use serde derive with `#[serde(default)]` to generate a deserializer
@@ -186,6 +201,8 @@ pub struct ConfigToml {
pub tracing: Option<Tracing>,
pub enable_tls_page_service_api: bool,
pub dev_mode: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub posthog_config: Option<PostHogConfig>,
pub timeline_import_config: TimelineImportConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
@@ -701,6 +718,7 @@ impl Default for ConfigToml {
import_job_checkpoint_threshold: NonZeroUsize::new(128).unwrap(),
},
basebackup_cache_config: None,
posthog_config: None,
}
}
}

View File

@@ -6,9 +6,14 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
serde.workspace = true
sha2.workspace = true
workspace_hack.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-util.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
workspace_hack.workspace = true

View File

@@ -0,0 +1,59 @@
//! A background loop that fetches feature flags from PostHog and updates the feature store.
use std::{sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use tokio_util::sync::CancellationToken;
use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
/// A background loop that fetches feature flags from PostHog and updates the feature store.
pub struct FeatureResolverBackgroundLoop {
posthog_client: PostHogClient,
feature_store: ArcSwap<FeatureStore>,
cancel: CancellationToken,
}
impl FeatureResolverBackgroundLoop {
pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
Self {
posthog_client: PostHogClient::new(config),
feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
cancel: shutdown_pageserver,
}
}
pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
let this = self.clone();
let cancel = self.cancel.clone();
handle.spawn(async move {
tracing::info!("Starting PostHog feature resolver");
let mut ticker = tokio::time::interval(refresh_period);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = ticker.tick() => {}
_ = cancel.cancelled() => break
}
let resp = match this
.posthog_client
.get_feature_flags_local_evaluation()
.await
{
Ok(resp) => resp,
Err(e) => {
tracing::warn!("Cannot get feature flags: {}", e);
continue;
}
};
let feature_store = FeatureStore::new_with_flags(resp.flags);
this.feature_store.store(Arc::new(feature_store));
}
tracing::info!("PostHog feature resolver stopped");
});
}
pub fn feature_store(&self) -> Arc<FeatureStore> {
self.feature_store.load_full()
}
}

View File

@@ -1,5 +1,9 @@
//! A lite version of the PostHog client that only supports local evaluation of feature flags.
mod background_loop;
pub use background_loop::FeatureResolverBackgroundLoop;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
@@ -20,8 +24,7 @@ pub enum PostHogEvaluationError {
#[derive(Deserialize)]
pub struct LocalEvaluationResponse {
#[allow(dead_code)]
flags: Vec<LocalEvaluationFlag>,
pub flags: Vec<LocalEvaluationFlag>,
}
#[derive(Deserialize)]
@@ -94,6 +97,12 @@ impl FeatureStore {
}
}
pub fn new_with_flags(flags: Vec<LocalEvaluationFlag>) -> Self {
let mut store = Self::new();
store.set_flags(flags);
store
}
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
self.flags.clear();
for flag in flags {
@@ -267,6 +276,7 @@ impl FeatureStore {
&self,
flag_key: &str,
user_id: &str,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<String, PostHogEvaluationError> {
let hash_on_global_rollout_percentage =
Self::consistent_hash(user_id, flag_key, "multivariate");
@@ -276,7 +286,7 @@ impl FeatureStore {
flag_key,
hash_on_global_rollout_percentage,
hash_on_group_rollout_percentage,
&HashMap::new(),
properties,
)
}
@@ -344,6 +354,19 @@ impl FeatureStore {
}
}
pub struct PostHogClientConfig {
/// The server API key.
pub server_api_key: String,
/// The client API key.
pub client_api_key: String,
/// The project ID.
pub project_id: String,
/// The private API URL.
pub private_api_url: String,
/// The public API URL.
pub public_api_url: String,
}
/// A lite PostHog client.
///
/// At the point of writing this code, PostHog does not have a functional Rust client with feature flag support.
@@ -360,37 +383,16 @@ impl FeatureStore {
/// want to report the feature flag usage back to PostHog. The current plan is to use PostHog only as an UI to
/// configure feature flags so it is very likely that the client API will not be used.
pub struct PostHogClient {
/// The server API key.
server_api_key: String,
/// The client API key.
client_api_key: String,
/// The project ID.
project_id: String,
/// The private API URL.
private_api_url: String,
/// The public API URL.
public_api_url: String,
/// The config.
config: PostHogClientConfig,
/// The HTTP client.
client: reqwest::Client,
}
impl PostHogClient {
pub fn new(
server_api_key: String,
client_api_key: String,
project_id: String,
private_api_url: String,
public_api_url: String,
) -> Self {
pub fn new(config: PostHogClientConfig) -> Self {
let client = reqwest::Client::new();
Self {
server_api_key,
client_api_key,
project_id,
private_api_url,
public_api_url,
client,
}
Self { config, client }
}
pub fn new_with_us_region(
@@ -398,13 +400,13 @@ impl PostHogClient {
client_api_key: String,
project_id: String,
) -> Self {
Self::new(
Self::new(PostHogClientConfig {
server_api_key,
client_api_key,
project_id,
"https://us.posthog.com".to_string(),
"https://us.i.posthog.com".to_string(),
)
private_api_url: "https://us.posthog.com".to_string(),
public_api_url: "https://us.i.posthog.com".to_string(),
})
}
/// Fetch the feature flag specs from the server.
@@ -422,12 +424,12 @@ impl PostHogClient {
// with bearer token of self.server_api_key
let url = format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.private_api_url, self.project_id
self.config.private_api_url, self.config.project_id
);
let response = self
.client
.get(url)
.bearer_auth(&self.server_api_key)
.bearer_auth(&self.config.server_api_key)
.send()
.await?;
let body = response.text().await?;
@@ -446,11 +448,11 @@ impl PostHogClient {
) -> anyhow::Result<()> {
// PUBLIC_URL/capture/
// with bearer token of self.client_api_key
let url = format!("{}/capture/", self.public_api_url);
let url = format!("{}/capture/", self.config.public_api_url);
self.client
.post(url)
.body(serde_json::to_string(&json!({
"api_key": self.client_api_key,
"api_key": self.config.client_api_key,
"distinct_id": distinct_id,
"event": event,
"properties": properties,