diff --git a/libs/posthog_client_lite/src/background_loop.rs b/libs/posthog_client_lite/src/background_loop.rs index 9ffcda3728..a05f6096b1 100644 --- a/libs/posthog_client_lite/src/background_loop.rs +++ b/libs/posthog_client_lite/src/background_loop.rs @@ -4,6 +4,7 @@ use std::{sync::Arc, time::Duration}; use arc_swap::ArcSwap; use tokio_util::sync::CancellationToken; +use tracing::{Instrument, info_span}; use crate::{FeatureStore, PostHogClient, PostHogClientConfig}; @@ -26,31 +27,35 @@ impl FeatureResolverBackgroundLoop { pub fn spawn(self: Arc, handle: &tokio::runtime::Handle, refresh_period: Duration) { let this = self.clone(); let cancel = self.cancel.clone(); - handle.spawn(async move { - tracing::info!("Starting PostHog feature resolver"); - let mut ticker = tokio::time::interval(refresh_period); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = ticker.tick() => {} - _ = cancel.cancelled() => break - } - let resp = match this - .posthog_client - .get_feature_flags_local_evaluation() - .await - { - Ok(resp) => resp, - Err(e) => { - tracing::warn!("Cannot get feature flags: {}", e); - continue; + handle.spawn( + async move { + tracing::info!("Starting PostHog feature resolver"); + let mut ticker = tokio::time::interval(refresh_period); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = ticker.tick() => {} + _ = cancel.cancelled() => break } - }; - let feature_store = FeatureStore::new_with_flags(resp.flags); - this.feature_store.store(Arc::new(feature_store)); + let resp = match this + .posthog_client + .get_feature_flags_local_evaluation() + .await + { + Ok(resp) => resp, + Err(e) => { + tracing::warn!("Cannot get feature flags: {}", e); + continue; + } + }; + let feature_store = FeatureStore::new_with_flags(resp.flags); + this.feature_store.store(Arc::new(feature_store)); + tracing::info!("Feature flag updated"); + } + tracing::info!("PostHog feature resolver stopped"); } - tracing::info!("PostHog feature resolver stopped"); - }); + .instrument(info_span!("posthog_feature_resolver")), + ); } pub fn feature_store(&self) -> Arc { diff --git a/libs/posthog_client_lite/src/lib.rs b/libs/posthog_client_lite/src/lib.rs index 8aa8da2898..ff12051196 100644 --- a/libs/posthog_client_lite/src/lib.rs +++ b/libs/posthog_client_lite/src/lib.rs @@ -448,6 +448,18 @@ impl FeatureStore { ))) } } + + /// Infer whether a feature flag is a boolean flag by checking if it has a multivariate filter. + pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result { + if let Some(flag_config) = self.flags.get(flag_key) { + Ok(flag_config.filters.multivariate.is_none()) + } else { + Err(PostHogEvaluationError::NotAvailable(format!( + "Not found in the local evaluation spec: {}", + flag_key + ))) + } + } } pub struct PostHogClientConfig { @@ -528,7 +540,15 @@ impl PostHogClient { .bearer_auth(&self.config.server_api_key) .send() .await?; + let status = response.status(); let body = response.text().await?; + if !status.is_success() { + return Err(anyhow::anyhow!( + "Failed to get feature flags: {}, {}", + status, + body + )); + } Ok(serde_json::from_str(&body)?) } diff --git a/pageserver/src/feature_resolver.rs b/pageserver/src/feature_resolver.rs index 2b0f368079..7e31b930d0 100644 --- a/pageserver/src/feature_resolver.rs +++ b/pageserver/src/feature_resolver.rs @@ -91,4 +91,14 @@ impl FeatureResolver { )) } } + + pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result { + if let Some(inner) = &self.inner { + inner.feature_store().is_feature_flag_boolean(flag_key) + } else { + Err(PostHogEvaluationError::NotAvailable( + "PostHog integration is not enabled".to_string(), + )) + } + } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index c449e3373f..1effa10404 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3663,6 +3663,46 @@ async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow Ok(()) } +async fn tenant_evaluate_feature_flag( + request: Request, + _cancel: CancellationToken, +) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + + let flag: String = must_parse_query_param(&request, "flag")?; + let as_type: String = must_parse_query_param(&request, "as")?; + + let state = get_state(&request); + + async { + let tenant = state + .tenant_manager + .get_attached_tenant_shard(tenant_shard_id)?; + if as_type == "boolean" { + let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id); + let result = result.map(|_| true).map_err(|e| e.to_string()); + json_response(StatusCode::OK, result) + } else if as_type == "multivariate" { + let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string()); + json_response(StatusCode::OK, result) + } else { + // Auto infer the type of the feature flag. + let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?; + if is_boolean { + let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id); + let result = result.map(|_| true).map_err(|e| e.to_string()); + json_response(StatusCode::OK, result) + } else { + let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string()); + json_response(StatusCode::OK, result) + } + } + } + .instrument(info_span!("tenant_evaluate_feature_flag", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug())) + .await +} + /// Common functionality of all the HTTP API handlers. /// /// - Adds a tracing span to each request (by `request_span`) @@ -4039,5 +4079,8 @@ pub fn make_router( "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import", |r| api_handler(r, activate_post_import_handler), ) + .get("/v1/tenant/:tenant_shard_id/feature_flag", |r| { + api_handler(r, tenant_evaluate_feature_flag) + }) .any(handler_404)) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 451d266bc0..3a054aff83 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -383,7 +383,7 @@ pub struct TenantShard { l0_flush_global_state: L0FlushGlobalState, - feature_resolver: FeatureResolver, + pub(crate) feature_resolver: FeatureResolver, } impl std::fmt::Debug for TenantShard { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {