From 0675859bb0991da1eb8ed6c54a6a0fc1849e1b22 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Wed, 11 Jan 2023 21:30:17 +0200 Subject: [PATCH] Add background worker that periodically spawns synthetic size calculation. Add new pageserver config param calculate_synthetic_size_interval --- pageserver/src/bin/pageserver.rs | 1 + pageserver/src/config.rs | 29 +++++++++++- pageserver/src/consumption_metrics.rs | 66 ++++++++++++++++++++++++++- pageserver/src/task_mgr.rs | 2 + pageserver/src/tenant.rs | 19 ++++++++ 5 files changed, 115 insertions(+), 2 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 18ec1ac68b..5de6e4def5 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -336,6 +336,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> { pageserver::consumption_metrics::collect_metrics( metric_collection_endpoint, conf.metric_collection_interval, + conf.synthetic_size_calculation_interval, conf.id, ) .instrument(info_span!("metrics_collection")) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 7b99d98581..51d1664e52 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -59,6 +59,8 @@ pub mod defaults { pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min"; pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option = None; + pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min"; + /// /// Default built-in configuration file. /// @@ -83,6 +85,7 @@ pub mod defaults { #concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}' #metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}' +#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}' # [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes @@ -152,6 +155,7 @@ pub struct PageServerConf { // How often to collect metrics and send them to the metrics endpoint. pub metric_collection_interval: Duration, pub metric_collection_endpoint: Option, + pub synthetic_size_calculation_interval: Duration, pub test_remote_failures: u64, } @@ -215,6 +219,7 @@ struct PageServerConfigBuilder { metric_collection_interval: BuilderValue, metric_collection_endpoint: BuilderValue>, + synthetic_size_calculation_interval: BuilderValue, test_remote_failures: BuilderValue, } @@ -255,6 +260,10 @@ impl Default for PageServerConfigBuilder { DEFAULT_METRIC_COLLECTION_INTERVAL, ) .expect("cannot parse default metric collection interval")), + synthetic_size_calculation_interval: Set(humantime::parse_duration( + DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL, + ) + .expect("cannot parse default synthetic size calculation interval")), metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT), test_remote_failures: Set(0), @@ -342,6 +351,14 @@ impl PageServerConfigBuilder { self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint) } + pub fn synthetic_size_calculation_interval( + &mut self, + synthetic_size_calculation_interval: Duration, + ) { + self.synthetic_size_calculation_interval = + BuilderValue::Set(synthetic_size_calculation_interval) + } + pub fn test_remote_failures(&mut self, fail_first: u64) { self.test_remote_failures = BuilderValue::Set(fail_first); } @@ -399,6 +416,9 @@ impl PageServerConfigBuilder { metric_collection_endpoint: self .metric_collection_endpoint .ok_or(anyhow!("missing metric_collection_endpoint"))?, + synthetic_size_calculation_interval: self + .synthetic_size_calculation_interval + .ok_or(anyhow!("missing synthetic_size_calculation_interval"))?, test_remote_failures: self .test_remote_failures .ok_or(anyhow!("missing test_remote_failuers"))?, @@ -577,7 +597,8 @@ impl PageServerConf { let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?; builder.metric_collection_endpoint(Some(endpoint)); }, - + "synthetic_size_calculation_interval" => + builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?), "test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } @@ -701,6 +722,7 @@ impl PageServerConf { concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), metric_collection_interval: Duration::from_secs(60), metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT, + synthetic_size_calculation_interval: Duration::from_secs(60), test_remote_failures: 0, } } @@ -834,6 +856,7 @@ id = 10 metric_collection_interval = '222 s' metric_collection_endpoint = 'http://localhost:80/metrics' +synthetic_size_calculation_interval = '333 s' log_format = 'json' "#; @@ -880,6 +903,9 @@ log_format = 'json' defaults::DEFAULT_METRIC_COLLECTION_INTERVAL )?, metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT, + synthetic_size_calculation_interval: humantime::parse_duration( + defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL + )?, test_remote_failures: 0, }, "Correct defaults should be used when no config values are provided" @@ -926,6 +952,7 @@ log_format = 'json' concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), metric_collection_interval: Duration::from_secs(222), metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?), + synthetic_size_calculation_interval: Duration::from_secs(333), test_remote_failures: 0, }, "Should be able to parse all basic config values correctly" diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index c411a9e025..95b2825cc7 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -10,6 +10,9 @@ use utils::id::NodeId; use utils::id::TimelineId; use crate::task_mgr; +use crate::task_mgr::TaskKind; +use crate::task_mgr::BACKGROUND_RUNTIME; + use crate::tenant::mgr; use pageserver_api::models::TenantState; use utils::id::TenantId; @@ -142,12 +145,29 @@ struct EventChunk<'a> { pub async fn collect_metrics( metric_collection_endpoint: &Url, metric_collection_interval: Duration, + synthetic_size_calculation_interval: Duration, node_id: NodeId, ) -> anyhow::Result<()> { let mut ticker = tokio::time::interval(metric_collection_interval); info!("starting collect_metrics"); + // spin up background worker that caclulates tenant sizes + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::CalculateSyntheticSize, + None, + None, + "synthetic size calculation", + true, + async move { + calculate_synthetic_size_worker(synthetic_size_calculation_interval) + .instrument(info_span!("synthetic_size_worker")) + .await?; + Ok(()) + }, + ); + // define client here to reuse it for all requests let client = reqwest::Client::new(); let mut cached_metrics: HashMap = HashMap::new(); @@ -252,6 +272,16 @@ pub async fn collect_metrics_task( )); // TODO add SyntheticStorageSize metric + let tenant_synthetic_size = tenant.calculate_synthetic_size().await?; + info!("tenant_synthetic_size: {}", tenant_synthetic_size); + current_metrics.push(( + ConsumptionMetricsKey { + tenant_id, + timeline_id: None, + metric: ConsumptionMetricKind::SyntheticStorageSize, + }, + tenant_synthetic_size, + )); } // Filter metrics @@ -261,7 +291,7 @@ pub async fn collect_metrics_task( }); if current_metrics.is_empty() { - trace!("no new metrics to send"); + info!("no new metrics to send"); return Ok(()); } @@ -322,3 +352,37 @@ pub async fn collect_metrics_task( Ok(()) } + +/// Caclculate synthetic size for each active tenant +pub async fn calculate_synthetic_size_worker( + synthetic_size_calculation_interval: Duration, +) -> anyhow::Result<()> { + info!("starting calculate_synthetic_size_worker"); + + let mut ticker = tokio::time::interval(synthetic_size_calculation_interval); + + loop { + tokio::select! { + _ = task_mgr::shutdown_watcher() => { + info!("calculate_synthetic_size_worker received cancellation request"); + return Ok(()); + }, + _ = ticker.tick() => { + + let tenants = mgr::list_tenants().await; + // iterate through list of Active tenants and collect metrics + for (tenant_id, tenant_state) in tenants { + + if tenant_state != TenantState::Active { + continue; + } + + let tenant = mgr::get_tenant(tenant_id, true).await?; + info!("spawn calculate_synthetic_size for tenant {}", tenant_id); + tenant.calculate_synthetic_size().await?; + + } + } + } + } +} diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index a1b3ad26b0..02e2e2ee14 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -220,6 +220,8 @@ pub enum TaskKind { // task that drives downloading layers DownloadAllRemoteLayers, + // Task that calculates synthetis size for all active tenants + CalculateSyntheticSize, } #[derive(Default)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 66f7e79877..1ff3d88c40 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2359,6 +2359,25 @@ impl Tenant { size::gather_inputs(self, logical_sizes_at_once, &mut shared_cache).await } + + /// Calculate synthetic tenant size + /// This is periodically called by background worker + /// + #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] + pub async fn calculate_synthetic_size(&self) -> anyhow::Result { + let inputs = self.gather_size_inputs().await?; + + let size = inputs + .calculate() + .unwrap_or_else(|e| panic!("err {}, inputs {:?}", e, inputs)); // FIXME this panic is debug only. + + info!( + "calculate_synthetic_size for tenant {} size: {}", + self.tenant_id, size, + ); + + Ok(size) + } } fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> anyhow::Result<()> {