feat(pageserver): report tenant properties to posthog (#12113)

## Problem

Part of https://github.com/neondatabase/neon/issues/11813

In PostHog UI, we need to create the properties before using them as a
filter. We report all variants automatically when we start the
pageserver. In the future, we can report all real tenants instead of
fake tenants (we do that now to save money + we don't need real tenants
in the UI).

## Summary of changes

* Collect `region`, `availability_zone`, `pageserver_id` properties and
use them in the feature evaluation.
* Report 10 fake tenants on each pageserver startup.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2025-06-05 15:48:36 +08:00
committed by GitHub
parent c8a96cf722
commit d8ebd1d771
4 changed files with 190 additions and 18 deletions

View File

@@ -6,7 +6,7 @@ use arc_swap::ArcSwap;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span};
use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
use crate::{CaptureEvent, FeatureStore, PostHogClient, PostHogClientConfig};
/// A background loop that fetches feature flags from PostHog and updates the feature store.
pub struct FeatureResolverBackgroundLoop {
@@ -24,9 +24,16 @@ impl FeatureResolverBackgroundLoop {
}
}
pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
pub fn spawn(
self: Arc<Self>,
handle: &tokio::runtime::Handle,
refresh_period: Duration,
fake_tenants: Vec<CaptureEvent>,
) {
let this = self.clone();
let cancel = self.cancel.clone();
// Main loop of updating the feature flags.
handle.spawn(
async move {
tracing::info!("Starting PostHog feature resolver");
@@ -56,6 +63,22 @@ impl FeatureResolverBackgroundLoop {
}
.instrument(info_span!("posthog_feature_resolver")),
);
// Report fake tenants to PostHog so that we have the combination of all the properties in the UI.
// Do one report per pageserver restart.
let this = self.clone();
handle.spawn(
async move {
tracing::info!("Starting PostHog feature reporter");
for tenant in &fake_tenants {
tracing::info!("Reporting fake tenant: {:?}", tenant);
}
if let Err(e) = this.posthog_client.capture_event_batch(&fake_tenants).await {
tracing::warn!("Cannot report fake tenants: {}", e);
}
}
.instrument(info_span!("posthog_feature_reporter")),
);
}
pub fn feature_store(&self) -> Arc<FeatureStore> {

View File

@@ -64,7 +64,7 @@ pub struct LocalEvaluationFlagFilterProperty {
operator: String,
}
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(untagged)]
pub enum PostHogFlagFilterPropertyValue {
String(String),
@@ -507,6 +507,13 @@ pub struct PostHogClient {
client: reqwest::Client,
}
#[derive(Serialize, Debug)]
pub struct CaptureEvent {
pub event: String,
pub distinct_id: String,
pub properties: serde_json::Value,
}
impl PostHogClient {
pub fn new(config: PostHogClientConfig) -> Self {
let client = reqwest::Client::new();
@@ -570,12 +577,12 @@ impl PostHogClient {
&self,
event: &str,
distinct_id: &str,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
properties: &serde_json::Value,
) -> anyhow::Result<()> {
// PUBLIC_URL/capture/
// with bearer token of self.client_api_key
let url = format!("{}/capture/", self.config.public_api_url);
self.client
let response = self
.client
.post(url)
.body(serde_json::to_string(&json!({
"api_key": self.config.client_api_key,
@@ -585,6 +592,39 @@ impl PostHogClient {
}))?)
.send()
.await?;
let status = response.status();
let body = response.text().await?;
if !status.is_success() {
return Err(anyhow::anyhow!(
"Failed to capture events: {}, {}",
status,
body
));
}
Ok(())
}
pub async fn capture_event_batch(&self, events: &[CaptureEvent]) -> anyhow::Result<()> {
// PUBLIC_URL/batch/
let url = format!("{}/batch/", self.config.public_api_url);
let response = self
.client
.post(url)
.body(serde_json::to_string(&json!({
"api_key": self.config.client_api_key,
"batch": events,
}))?)
.send()
.await?;
let status = response.status();
let body = response.text().await?;
if !status.is_success() {
return Err(anyhow::anyhow!(
"Failed to capture events: {}, {}",
status,
body
));
}
Ok(())
}
}

View File

@@ -1,8 +1,11 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use posthog_client_lite::{
FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
PostHogFlagFilterPropertyValue,
};
use remote_storage::RemoteStorageKind;
use serde_json::json;
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
@@ -11,11 +14,15 @@ use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
#[derive(Clone)]
pub struct FeatureResolver {
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
}
impl FeatureResolver {
pub fn new_disabled() -> Self {
Self { inner: None }
Self {
inner: None,
internal_properties: None,
}
}
pub fn spawn(
@@ -36,14 +43,114 @@ impl FeatureResolver {
shutdown_pageserver,
);
let inner = Arc::new(inner);
// TODO: make this configurable
inner.clone().spawn(handle, Duration::from_secs(60));
Ok(FeatureResolver { inner: Some(inner) })
// The properties shared by all tenants on this pageserver.
let internal_properties = {
let mut properties = HashMap::new();
properties.insert(
"pageserver_id".to_string(),
PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
);
if let Some(availability_zone) = &conf.availability_zone {
properties.insert(
"availability_zone".to_string(),
PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
);
}
// Infer region based on the remote storage config.
if let Some(remote_storage) = &conf.remote_storage_config {
match &remote_storage.storage {
RemoteStorageKind::AwsS3(config) => {
properties.insert(
"region".to_string(),
PostHogFlagFilterPropertyValue::String(format!(
"aws-{}",
config.bucket_region
)),
);
}
RemoteStorageKind::AzureContainer(config) => {
properties.insert(
"region".to_string(),
PostHogFlagFilterPropertyValue::String(format!(
"azure-{}",
config.container_region
)),
);
}
RemoteStorageKind::LocalFs { .. } => {
properties.insert(
"region".to_string(),
PostHogFlagFilterPropertyValue::String("local".to_string()),
);
}
}
}
// TODO: add pageserver URL.
Arc::new(properties)
};
let fake_tenants = {
let mut tenants = Vec::new();
for i in 0..10 {
let distinct_id = format!(
"fake_tenant_{}_{}_{}",
conf.availability_zone.as_deref().unwrap_or_default(),
conf.id,
i
);
let properties = Self::collect_properties_inner(
distinct_id.clone(),
Some(&internal_properties),
);
tenants.push(CaptureEvent {
event: "initial_tenant_report".to_string(),
distinct_id,
properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
});
}
tenants
};
// TODO: make refresh period configurable
inner
.clone()
.spawn(handle, Duration::from_secs(60), fake_tenants);
Ok(FeatureResolver {
inner: Some(inner),
internal_properties: Some(internal_properties),
})
} else {
Ok(FeatureResolver { inner: None })
Ok(FeatureResolver {
inner: None,
internal_properties: None,
})
}
}
fn collect_properties_inner(
tenant_id: String,
internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
let mut properties = HashMap::new();
if let Some(internal_properties) = internal_properties {
for (key, value) in internal_properties.iter() {
properties.insert(key.clone(), value.clone());
}
}
properties.insert(
"tenant_id".to_string(),
PostHogFlagFilterPropertyValue::String(tenant_id),
);
properties
}
/// Collect all properties availble for the feature flag evaluation.
pub(crate) fn collect_properties(
&self,
tenant_id: TenantId,
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref())
}
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
@@ -58,7 +165,7 @@ impl FeatureResolver {
let res = inner.feature_store().evaluate_multivariate(
flag_key,
&tenant_id.to_string(),
&HashMap::new(),
&self.collect_properties(tenant_id),
);
match &res {
Ok(value) => {
@@ -96,7 +203,7 @@ impl FeatureResolver {
let res = inner.feature_store().evaluate_boolean(
flag_key,
&tenant_id.to_string(),
&HashMap::new(),
&self.collect_properties(tenant_id),
);
match &res {
Ok(()) => {

View File

@@ -43,6 +43,7 @@ use pageserver_api::models::{
use pageserver_api::shard::{ShardCount, TenantShardId};
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
use scopeguard::defer;
use serde_json::json;
use tenant_size_model::svg::SvgBranchKind;
use tenant_size_model::{SizeResult, StorageModel};
use tokio::time::Instant;
@@ -3679,23 +3680,24 @@ async fn tenant_evaluate_feature_flag(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
if as_type == "boolean" {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, result)
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else if as_type == "multivariate" {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, result)
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
// Auto infer the type of the feature flag.
let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?;
if is_boolean {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, result)
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
} else {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, result)
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
}
}
}