mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
## Problem Part of #11813. PostHog has two endpoints for retrieving feature flags: the old project-ID-based one, which uses a personal API token, and the new one, which uses a special feature flag secure token that can only retrieve feature flags. The new API added in this patch is not documented in the PostHog API docs, but it is used in their Python SDK. ## Summary of changes Add support for the "feature flag secure token API". That API has no way of specifying a project ID, so we verify that the retrieved spec is consistent with the configured project ID by comparing the `team_id` field. --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
95 lines
3.6 KiB
Rust
95 lines
3.6 KiB
Rust
//! A background loop that fetches feature flags from PostHog and updates the feature store.
|
|
|
|
use std::{sync::Arc, time::Duration};
|
|
|
|
use arc_swap::ArcSwap;
|
|
use tokio_util::sync::CancellationToken;
|
|
use tracing::{Instrument, info_span};
|
|
|
|
use crate::{CaptureEvent, FeatureStore, PostHogClient, PostHogClientConfig};
|
|
|
|
/// A background loop that fetches feature flags from PostHog and updates the feature store.
|
|
pub struct FeatureResolverBackgroundLoop {
|
|
posthog_client: PostHogClient,
|
|
feature_store: ArcSwap<FeatureStore>,
|
|
cancel: CancellationToken,
|
|
}
|
|
|
|
impl FeatureResolverBackgroundLoop {
|
|
pub fn new(config: PostHogClientConfig, shutdown_pageserver: CancellationToken) -> Self {
|
|
Self {
|
|
posthog_client: PostHogClient::new(config),
|
|
feature_store: ArcSwap::new(Arc::new(FeatureStore::new())),
|
|
cancel: shutdown_pageserver,
|
|
}
|
|
}
|
|
|
|
pub fn spawn(
|
|
self: Arc<Self>,
|
|
handle: &tokio::runtime::Handle,
|
|
refresh_period: Duration,
|
|
fake_tenants: Vec<CaptureEvent>,
|
|
) {
|
|
let this = self.clone();
|
|
let cancel = self.cancel.clone();
|
|
|
|
// Main loop of updating the feature flags.
|
|
handle.spawn(
|
|
async move {
|
|
tracing::info!("Starting PostHog feature resolver");
|
|
let mut ticker = tokio::time::interval(refresh_period);
|
|
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
|
loop {
|
|
tokio::select! {
|
|
_ = ticker.tick() => {}
|
|
_ = cancel.cancelled() => break
|
|
}
|
|
let resp = match this
|
|
.posthog_client
|
|
.get_feature_flags_local_evaluation()
|
|
.await
|
|
{
|
|
Ok(resp) => resp,
|
|
Err(e) => {
|
|
tracing::warn!("Cannot get feature flags: {}", e);
|
|
continue;
|
|
}
|
|
};
|
|
let project_id = this.posthog_client.config.project_id.parse::<u64>().ok();
|
|
match FeatureStore::new_with_flags(resp.flags, project_id) {
|
|
Ok(feature_store) => {
|
|
this.feature_store.store(Arc::new(feature_store));
|
|
tracing::info!("Feature flag updated");
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!("Cannot process feature flag spec: {}", e);
|
|
}
|
|
}
|
|
}
|
|
tracing::info!("PostHog feature resolver stopped");
|
|
}
|
|
.instrument(info_span!("posthog_feature_resolver")),
|
|
);
|
|
|
|
// Report fake tenants to PostHog so that we have the combination of all the properties in the UI.
|
|
// Do one report per pageserver restart.
|
|
let this = self.clone();
|
|
handle.spawn(
|
|
async move {
|
|
tracing::info!("Starting PostHog feature reporter");
|
|
for tenant in &fake_tenants {
|
|
tracing::info!("Reporting fake tenant: {:?}", tenant);
|
|
}
|
|
if let Err(e) = this.posthog_client.capture_event_batch(&fake_tenants).await {
|
|
tracing::warn!("Cannot report fake tenants: {}", e);
|
|
}
|
|
}
|
|
.instrument(info_span!("posthog_feature_reporter")),
|
|
);
|
|
}
|
|
|
|
pub fn feature_store(&self) -> Arc<FeatureStore> {
|
|
self.feature_store.load_full()
|
|
}
|
|
}
|