From 5ec8881c0bda42db3a735d832171e500dfb11ad8 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 1 Jul 2025 11:11:24 -0700 Subject: [PATCH] feat(pageserver): resolve feature flag based on remote size (#12400) ## Problem Part of #11813 ## Summary of changes * Compute tenant remote size in the housekeeping loop. * Add a new `TenantFeatureResolver` struct to cache the tenant-specific properties. * Evaluate feature flag based on the remote size. --------- Signed-off-by: Alex Chi Z --- pageserver/src/feature_resolver.rs | 103 ++++++++++++++++++- pageserver/src/http/routes.rs | 12 ++- pageserver/src/tenant.rs | 14 ++- pageserver/src/tenant/timeline.rs | 6 +- pageserver/src/tenant/timeline/compaction.rs | 2 +- 5 files changed, 120 insertions(+), 17 deletions(-) diff --git a/pageserver/src/feature_resolver.rs b/pageserver/src/feature_resolver.rs index 3080b0db34..6ce4522080 100644 --- a/pageserver/src/feature_resolver.rs +++ b/pageserver/src/feature_resolver.rs @@ -6,12 +6,13 @@ use posthog_client_lite::{ CaptureEvent, FeatureResolverBackgroundLoop, PostHogEvaluationError, PostHogFlagFilterPropertyValue, }; +use rand::Rng; use remote_storage::RemoteStorageKind; use serde_json::json; use tokio_util::sync::CancellationToken; use utils::id::TenantId; -use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION}; +use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION, tenant::TenantShard}; const DEFAULT_POSTHOG_REFRESH_INTERVAL: Duration = Duration::from_secs(600); @@ -138,6 +139,7 @@ impl FeatureResolver { } Arc::new(properties) }; + let fake_tenants = { let mut tenants = Vec::new(); for i in 0..10 { @@ -147,9 +149,16 @@ impl FeatureResolver { conf.id, i ); + + let tenant_properties = PerTenantProperties { + remote_size_mb: Some(rand::thread_rng().gen_range(100.0..1000000.00)), + } + .into_posthog_properties(); + let properties = Self::collect_properties_inner( distinct_id.clone(), Some(&internal_properties), + &tenant_properties, ); tenants.push(CaptureEvent { event: "initial_tenant_report".to_string(), @@ -183,6 +192,7 @@ impl FeatureResolver { fn collect_properties_inner( tenant_id: String, internal_properties: Option<&HashMap>, + tenant_properties: &HashMap, ) -> HashMap { let mut properties = HashMap::new(); if let Some(internal_properties) = internal_properties { @@ -194,6 +204,9 @@ impl FeatureResolver { "tenant_id".to_string(), PostHogFlagFilterPropertyValue::String(tenant_id), ); + for (key, value) in tenant_properties.iter() { + properties.insert(key.clone(), value.clone()); + } properties } @@ -201,8 +214,13 @@ impl FeatureResolver { pub(crate) fn collect_properties( &self, tenant_id: TenantId, + tenant_properties: &HashMap, ) -> HashMap { - Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref()) + Self::collect_properties_inner( + tenant_id.to_string(), + self.internal_properties.as_deref(), + tenant_properties, + ) } /// Evaluate a multivariate feature flag. Currently, we do not support any properties. @@ -214,6 +232,7 @@ impl FeatureResolver { &self, flag_key: &str, tenant_id: TenantId, + tenant_properties: &HashMap, ) -> Result { let force_overrides = self.force_overrides_for_testing.load(); if let Some(value) = force_overrides.get(flag_key) { @@ -224,7 +243,7 @@ impl FeatureResolver { let res = inner.feature_store().evaluate_multivariate( flag_key, &tenant_id.to_string(), - &self.collect_properties(tenant_id), + &self.collect_properties(tenant_id, tenant_properties), ); match &res { Ok(value) => { @@ -257,6 +276,7 @@ impl FeatureResolver { &self, flag_key: &str, tenant_id: TenantId, + tenant_properties: &HashMap, ) -> Result<(), PostHogEvaluationError> { let force_overrides = self.force_overrides_for_testing.load(); if let Some(value) = force_overrides.get(flag_key) { @@ -271,7 +291,7 @@ impl FeatureResolver { let res = inner.feature_store().evaluate_boolean( flag_key, &tenant_id.to_string(), - &self.collect_properties(tenant_id), + &self.collect_properties(tenant_id, tenant_properties), ); match &res { Ok(()) => { @@ -317,3 +337,78 @@ impl FeatureResolver { .store(Arc::new(force_overrides)); } } + +struct PerTenantProperties { + pub remote_size_mb: Option, +} + +impl PerTenantProperties { + pub fn into_posthog_properties(self) -> HashMap { + let mut properties = HashMap::new(); + if let Some(remote_size_mb) = self.remote_size_mb { + properties.insert( + "tenant_remote_size_mb".to_string(), + PostHogFlagFilterPropertyValue::Number(remote_size_mb), + ); + } + properties + } +} + +#[derive(Clone)] +pub struct TenantFeatureResolver { + inner: FeatureResolver, + tenant_id: TenantId, + cached_tenant_properties: Arc>>, +} + +impl TenantFeatureResolver { + pub fn new(inner: FeatureResolver, tenant_id: TenantId) -> Self { + Self { + inner, + tenant_id, + cached_tenant_properties: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))), + } + } + + pub fn evaluate_multivariate(&self, flag_key: &str) -> Result { + self.inner.evaluate_multivariate( + flag_key, + self.tenant_id, + &self.cached_tenant_properties.load(), + ) + } + + pub fn evaluate_boolean(&self, flag_key: &str) -> Result<(), PostHogEvaluationError> { + self.inner.evaluate_boolean( + flag_key, + self.tenant_id, + &self.cached_tenant_properties.load(), + ) + } + + pub fn collect_properties(&self) -> HashMap { + self.inner + .collect_properties(self.tenant_id, &self.cached_tenant_properties.load()) + } + + pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result { + self.inner.is_feature_flag_boolean(flag_key) + } + + pub fn update_cached_tenant_properties(&self, tenant_shard: &TenantShard) { + let mut remote_size_mb = None; + for timeline in tenant_shard.list_timelines() { + let size = timeline.metrics.resident_physical_size_get(); + if size == 0 { + remote_size_mb = None; + } + if let Some(ref mut remote_size_mb) = remote_size_mb { + *remote_size_mb += size as f64 / 1024.0 / 1024.0; + } + } + self.cached_tenant_properties.store(Arc::new( + PerTenantProperties { remote_size_mb }.into_posthog_properties(), + )); + } +} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 119275f885..b18b7d6bcd 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3697,23 +3697,25 @@ async fn tenant_evaluate_feature_flag( let tenant = state .tenant_manager .get_attached_tenant_shard(tenant_shard_id)?; - let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id); + // TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s) + // and we don't need to worry about it for now. + let properties = tenant.feature_resolver.collect_properties(); if as_type.as_deref() == Some("boolean") { - let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id); + let result = tenant.feature_resolver.evaluate_boolean(&flag); let result = result.map(|_| true).map_err(|e| e.to_string()); json_response(StatusCode::OK, json!({ "result": result, "properties": properties })) } else if as_type.as_deref() == Some("multivariate") { - let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string()); + let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string()); json_response(StatusCode::OK, json!({ "result": result, "properties": properties })) } else { // Auto infer the type of the feature flag. let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?; if is_boolean { - let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id); + let result = tenant.feature_resolver.evaluate_boolean(&flag); let result = result.map(|_| true).map_err(|e| e.to_string()); json_response(StatusCode::OK, json!({ "result": result, "properties": properties })) } else { - let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string()); + let result = tenant.feature_resolver.evaluate_multivariate(&flag).map_err(|e| e.to_string()); json_response(StatusCode::OK, json!({ "result": result, "properties": properties })) } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index fcb18e8553..3756ebfad9 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -86,7 +86,7 @@ use crate::context; use crate::context::RequestContextBuilder; use crate::context::{DownloadBehavior, RequestContext}; use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError}; -use crate::feature_resolver::FeatureResolver; +use crate::feature_resolver::{FeatureResolver, TenantFeatureResolver}; use crate::l0_flush::L0FlushGlobalState; use crate::metrics::{ BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS, @@ -386,7 +386,7 @@ pub struct TenantShard { l0_flush_global_state: L0FlushGlobalState, - pub(crate) feature_resolver: FeatureResolver, + pub(crate) feature_resolver: TenantFeatureResolver, } impl std::fmt::Debug for TenantShard { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -3263,7 +3263,7 @@ impl TenantShard { }; let gc_compaction_strategy = self .feature_resolver - .evaluate_multivariate("gc-comapction-strategy", self.tenant_shard_id.tenant_id) + .evaluate_multivariate("gc-comapction-strategy") .ok(); let span = if let Some(gc_compaction_strategy) = gc_compaction_strategy { info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id, strategy = %gc_compaction_strategy) @@ -3408,6 +3408,9 @@ impl TenantShard { if let Some(ref walredo_mgr) = self.walredo_mgr { walredo_mgr.maybe_quiesce(WALREDO_IDLE_TIMEOUT); } + + // Update the feature resolver with the latest tenant-spcific data. + self.feature_resolver.update_cached_tenant_properties(self); } pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool { @@ -4490,7 +4493,10 @@ impl TenantShard { gc_block: Default::default(), l0_flush_global_state, basebackup_cache, - feature_resolver, + feature_resolver: TenantFeatureResolver::new( + feature_resolver, + tenant_shard_id.tenant_id, + ), } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index aca44718fa..443fb7fafb 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -106,7 +106,7 @@ use crate::context::{ DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, }; use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32}; -use crate::feature_resolver::FeatureResolver; +use crate::feature_resolver::TenantFeatureResolver; use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::l0_flush::{self, L0FlushGlobalState}; use crate::metrics::{ @@ -202,7 +202,7 @@ pub struct TimelineResources { pub l0_compaction_trigger: Arc, pub l0_flush_global_state: l0_flush::L0FlushGlobalState, pub basebackup_cache: Arc, - pub feature_resolver: FeatureResolver, + pub feature_resolver: TenantFeatureResolver, } pub struct Timeline { @@ -450,7 +450,7 @@ pub struct Timeline { /// A channel to send async requests to prepare a basebackup for the basebackup cache. basebackup_cache: Arc, - feature_resolver: FeatureResolver, + feature_resolver: TenantFeatureResolver, } pub(crate) enum PreviousHeatmap { diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 43573c28a2..9b64938b3e 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1319,7 +1319,7 @@ impl Timeline { || cfg!(feature = "testing") || self .feature_resolver - .evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id) + .evaluate_boolean("image-compaction-boundary") .is_ok() { let last_repartition_lsn = self.partitioning.read().1;