From d8ebd1d7717a4871cca5dda1b60ae4167e4bf514 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Thu, 5 Jun 2025 15:48:36 +0800 Subject: [PATCH] feat(pageserver): report tenant properties to posthog (#12113) ## Problem Part of https://github.com/neondatabase/neon/issues/11813 In PostHog UI, we need to create the properties before using them as a filter. We report all variants automatically when we start the pageserver. In the future, we can report all real tenants instead of fake tenants (we do that now to save money + we don't need real tenants in the UI). ## Summary of changes * Collect `region`, `availability_zone`, `pageserver_id` properties and use them in the feature evaluation. * Report 10 fake tenants on each pageserver startup. --------- Signed-off-by: Alex Chi Z --- .../src/background_loop.rs | 27 +++- libs/posthog_client_lite/src/lib.rs | 48 ++++++- pageserver/src/feature_resolver.rs | 123 ++++++++++++++++-- pageserver/src/http/routes.rs | 10 +- 4 files changed, 190 insertions(+), 18 deletions(-) diff --git a/libs/posthog_client_lite/src/background_loop.rs b/libs/posthog_client_lite/src/background_loop.rs index a05f6096b1..a404c76da9 100644 --- a/libs/posthog_client_lite/src/background_loop.rs +++ b/libs/posthog_client_lite/src/background_loop.rs @@ -6,7 +6,7 @@ use arc_swap::ArcSwap; use tokio_util::sync::CancellationToken; use tracing::{Instrument, info_span}; -use crate::{FeatureStore, PostHogClient, PostHogClientConfig}; +use crate::{CaptureEvent, FeatureStore, PostHogClient, PostHogClientConfig}; /// A background loop that fetches feature flags from PostHog and updates the feature store. 
pub struct FeatureResolverBackgroundLoop { @@ -24,9 +24,16 @@ impl FeatureResolverBackgroundLoop { } } - pub fn spawn(self: Arc, handle: &tokio::runtime::Handle, refresh_period: Duration) { + pub fn spawn( + self: Arc, + handle: &tokio::runtime::Handle, + refresh_period: Duration, + fake_tenants: Vec, + ) { let this = self.clone(); let cancel = self.cancel.clone(); + + // Main loop of updating the feature flags. handle.spawn( async move { tracing::info!("Starting PostHog feature resolver"); @@ -56,6 +63,22 @@ impl FeatureResolverBackgroundLoop { } .instrument(info_span!("posthog_feature_resolver")), ); + + // Report fake tenants to PostHog so that we have the combination of all the properties in the UI. + // Do one report per pageserver restart. + let this = self.clone(); + handle.spawn( + async move { + tracing::info!("Starting PostHog feature reporter"); + for tenant in &fake_tenants { + tracing::info!("Reporting fake tenant: {:?}", tenant); + } + if let Err(e) = this.posthog_client.capture_event_batch(&fake_tenants).await { + tracing::warn!("Cannot report fake tenants: {}", e); + } + } + .instrument(info_span!("posthog_feature_reporter")), + ); } pub fn feature_store(&self) -> Arc { diff --git a/libs/posthog_client_lite/src/lib.rs b/libs/posthog_client_lite/src/lib.rs index 281a8f8011..f607b1be0a 100644 --- a/libs/posthog_client_lite/src/lib.rs +++ b/libs/posthog_client_lite/src/lib.rs @@ -64,7 +64,7 @@ pub struct LocalEvaluationFlagFilterProperty { operator: String, } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(untagged)] pub enum PostHogFlagFilterPropertyValue { String(String), @@ -507,6 +507,13 @@ pub struct PostHogClient { client: reqwest::Client, } +#[derive(Serialize, Debug)] +pub struct CaptureEvent { + pub event: String, + pub distinct_id: String, + pub properties: serde_json::Value, +} + impl PostHogClient { pub fn new(config: PostHogClientConfig) -> Self { let client = reqwest::Client::new(); @@ 
-570,12 +577,12 @@ impl PostHogClient { &self, event: &str, distinct_id: &str, - properties: &HashMap, + properties: &serde_json::Value, ) -> anyhow::Result<()> { // PUBLIC_URL/capture/ - // with bearer token of self.client_api_key let url = format!("{}/capture/", self.config.public_api_url); - self.client + let response = self + .client .post(url) .body(serde_json::to_string(&json!({ "api_key": self.config.client_api_key, @@ -585,6 +592,39 @@ impl PostHogClient { }))?) .send() .await?; + let status = response.status(); + let body = response.text().await?; + if !status.is_success() { + return Err(anyhow::anyhow!( + "Failed to capture events: {}, {}", + status, + body + )); + } + Ok(()) + } + + pub async fn capture_event_batch(&self, events: &[CaptureEvent]) -> anyhow::Result<()> { + // PUBLIC_URL/batch/ + let url = format!("{}/batch/", self.config.public_api_url); + let response = self + .client + .post(url) + .body(serde_json::to_string(&json!({ + "api_key": self.config.client_api_key, + "batch": events, + }))?) 
+ .send() + .await?; + let status = response.status(); + let body = response.text().await?; + if !status.is_success() { + return Err(anyhow::anyhow!( + "Failed to capture events: {}, {}", + status, + body + )); + } Ok(()) } } diff --git a/pageserver/src/feature_resolver.rs b/pageserver/src/feature_resolver.rs index 7463186c06..50de3b691c 100644 --- a/pageserver/src/feature_resolver.rs +++ b/pageserver/src/feature_resolver.rs @@ -1,8 +1,11 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; use posthog_client_lite::{ - FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError, + CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError, + PostHogFlagFilterPropertyValue, }; +use remote_storage::RemoteStorageKind; +use serde_json::json; use tokio_util::sync::CancellationToken; use utils::id::TenantId; @@ -11,11 +14,15 @@ use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION}; #[derive(Clone)] pub struct FeatureResolver { inner: Option>, + internal_properties: Option>>, } impl FeatureResolver { pub fn new_disabled() -> Self { - Self { inner: None } + Self { + inner: None, + internal_properties: None, + } } pub fn spawn( @@ -36,14 +43,114 @@ impl FeatureResolver { shutdown_pageserver, ); let inner = Arc::new(inner); - // TODO: make this configurable - inner.clone().spawn(handle, Duration::from_secs(60)); - Ok(FeatureResolver { inner: Some(inner) }) + + // The properties shared by all tenants on this pageserver. + let internal_properties = { + let mut properties = HashMap::new(); + properties.insert( + "pageserver_id".to_string(), + PostHogFlagFilterPropertyValue::String(conf.id.to_string()), + ); + if let Some(availability_zone) = &conf.availability_zone { + properties.insert( + "availability_zone".to_string(), + PostHogFlagFilterPropertyValue::String(availability_zone.clone()), + ); + } + // Infer region based on the remote storage config. 
+ if let Some(remote_storage) = &conf.remote_storage_config { + match &remote_storage.storage { + RemoteStorageKind::AwsS3(config) => { + properties.insert( + "region".to_string(), + PostHogFlagFilterPropertyValue::String(format!( + "aws-{}", + config.bucket_region + )), + ); + } + RemoteStorageKind::AzureContainer(config) => { + properties.insert( + "region".to_string(), + PostHogFlagFilterPropertyValue::String(format!( + "azure-{}", + config.container_region + )), + ); + } + RemoteStorageKind::LocalFs { .. } => { + properties.insert( + "region".to_string(), + PostHogFlagFilterPropertyValue::String("local".to_string()), + ); + } + } + } + // TODO: add pageserver URL. + Arc::new(properties) + }; + let fake_tenants = { + let mut tenants = Vec::new(); + for i in 0..10 { + let distinct_id = format!( + "fake_tenant_{}_{}_{}", + conf.availability_zone.as_deref().unwrap_or_default(), + conf.id, + i + ); + let properties = Self::collect_properties_inner( + distinct_id.clone(), + Some(&internal_properties), + ); + tenants.push(CaptureEvent { + event: "initial_tenant_report".to_string(), + distinct_id, + properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties + }); + } + tenants + }; + // TODO: make refresh period configurable + inner + .clone() + .spawn(handle, Duration::from_secs(60), fake_tenants); + Ok(FeatureResolver { + inner: Some(inner), + internal_properties: Some(internal_properties), + }) } else { - Ok(FeatureResolver { inner: None }) + Ok(FeatureResolver { + inner: None, + internal_properties: None, + }) } } + fn collect_properties_inner( + tenant_id: String, + internal_properties: Option<&HashMap>, + ) -> HashMap { + let mut properties = HashMap::new(); + if let Some(internal_properties) = internal_properties { + for (key, value) in internal_properties.iter() { + properties.insert(key.clone(), value.clone()); + } + } + properties.insert( + "tenant_id".to_string(), + 
PostHogFlagFilterPropertyValue::String(tenant_id), + ); + properties + } + + /// Collect all properties available for the feature flag evaluation. + pub(crate) fn collect_properties( + &self, + tenant_id: TenantId, + ) -> HashMap { + Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref()) + } + /// Evaluate a multivariate feature flag. Currently, we do not support any properties. /// /// Error handling: the caller should inspect the error and decide the behavior when a feature flag @@ -58,7 +165,7 @@ impl FeatureResolver { let res = inner.feature_store().evaluate_multivariate( flag_key, &tenant_id.to_string(), - &HashMap::new(), + &self.collect_properties(tenant_id), ); match &res { Ok(value) => { @@ -96,7 +203,7 @@ impl FeatureResolver { let res = inner.feature_store().evaluate_boolean( flag_key, &tenant_id.to_string(), - &HashMap::new(), + &self.collect_properties(tenant_id), ); match &res { Ok(()) => { diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 1effa10404..c8a2a0209f 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -43,6 +43,7 @@ use pageserver_api::models::{ use pageserver_api::shard::{ShardCount, TenantShardId}; use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError}; use scopeguard::defer; +use serde_json::json; use tenant_size_model::svg::SvgBranchKind; use tenant_size_model::{SizeResult, StorageModel}; use tokio::time::Instant; @@ -3679,23 +3680,24 @@ async fn tenant_evaluate_feature_flag( let tenant = state .tenant_manager .get_attached_tenant_shard(tenant_shard_id)?; + let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id); if as_type == "boolean" { let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id); let result = result.map(|_| true).map_err(|e| e.to_string()); - json_response(StatusCode::OK, result) + json_response(StatusCode::OK, json!({ "result": result, 
"properties": properties })) } else if as_type == "multivariate" { let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string()); - json_response(StatusCode::OK, result) + json_response(StatusCode::OK, json!({ "result": result, "properties": properties })) } else { // Auto infer the type of the feature flag. let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?; if is_boolean { let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id); let result = result.map(|_| true).map_err(|e| e.to_string()); - json_response(StatusCode::OK, result) + json_response(StatusCode::OK, json!({ "result": result, "properties": properties })) } else { let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string()); - json_response(StatusCode::OK, result) + json_response(StatusCode::OK, json!({ "result": result, "properties": properties })) } } }