Add background worker that periodically spawns

synthetic size calculation.
Add new pageserver config param calculate_synthetic_size_interval
This commit is contained in:
Anastasia Lubennikova
2023-01-11 21:30:17 +02:00
parent ba0190e3e8
commit 0675859bb0
5 changed files with 115 additions and 2 deletions

View File

@@ -336,6 +336,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
)
.instrument(info_span!("metrics_collection"))

View File

@@ -59,6 +59,8 @@ pub mod defaults {
pub const DEFAULT_METRIC_COLLECTION_INTERVAL: &str = "10 min";
pub const DEFAULT_METRIC_COLLECTION_ENDPOINT: Option<reqwest::Url> = None;
pub const DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL: &str = "10 min";
///
/// Default built-in configuration file.
///
@@ -83,6 +85,7 @@ pub mod defaults {
#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}'
#metric_collection_interval = '{DEFAULT_METRIC_COLLECTION_INTERVAL}'
#synthetic_size_calculation_interval = '{DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL}'
# [tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
@@ -152,6 +155,7 @@ pub struct PageServerConf {
// How often to collect metrics and send them to the metrics endpoint.
pub metric_collection_interval: Duration,
pub metric_collection_endpoint: Option<Url>,
pub synthetic_size_calculation_interval: Duration,
pub test_remote_failures: u64,
}
@@ -215,6 +219,7 @@ struct PageServerConfigBuilder {
metric_collection_interval: BuilderValue<Duration>,
metric_collection_endpoint: BuilderValue<Option<Url>>,
synthetic_size_calculation_interval: BuilderValue<Duration>,
test_remote_failures: BuilderValue<u64>,
}
@@ -255,6 +260,10 @@ impl Default for PageServerConfigBuilder {
DEFAULT_METRIC_COLLECTION_INTERVAL,
)
.expect("cannot parse default metric collection interval")),
synthetic_size_calculation_interval: Set(humantime::parse_duration(
DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL,
)
.expect("cannot parse default synthetic size calculation interval")),
metric_collection_endpoint: Set(DEFAULT_METRIC_COLLECTION_ENDPOINT),
test_remote_failures: Set(0),
@@ -342,6 +351,14 @@ impl PageServerConfigBuilder {
self.metric_collection_endpoint = BuilderValue::Set(metric_collection_endpoint)
}
pub fn synthetic_size_calculation_interval(
&mut self,
synthetic_size_calculation_interval: Duration,
) {
self.synthetic_size_calculation_interval =
BuilderValue::Set(synthetic_size_calculation_interval)
}
pub fn test_remote_failures(&mut self, fail_first: u64) {
self.test_remote_failures = BuilderValue::Set(fail_first);
}
@@ -399,6 +416,9 @@ impl PageServerConfigBuilder {
metric_collection_endpoint: self
.metric_collection_endpoint
.ok_or(anyhow!("missing metric_collection_endpoint"))?,
synthetic_size_calculation_interval: self
.synthetic_size_calculation_interval
.ok_or(anyhow!("missing synthetic_size_calculation_interval"))?,
test_remote_failures: self
.test_remote_failures
.ok_or(anyhow!("missing test_remote_failuers"))?,
@@ -577,7 +597,8 @@ impl PageServerConf {
let endpoint = parse_toml_string(key, item)?.parse().context("failed to parse metric_collection_endpoint")?;
builder.metric_collection_endpoint(Some(endpoint));
},
"synthetic_size_calculation_interval" =>
builder.synthetic_size_calculation_interval(parse_toml_duration(key, item)?),
"test_remote_failures" => builder.test_remote_failures(parse_toml_u64(key, item)?),
_ => bail!("unrecognized pageserver option '{key}'"),
}
@@ -701,6 +722,7 @@ impl PageServerConf {
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(60),
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: Duration::from_secs(60),
test_remote_failures: 0,
}
}
@@ -834,6 +856,7 @@ id = 10
metric_collection_interval = '222 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
synthetic_size_calculation_interval = '333 s'
log_format = 'json'
"#;
@@ -880,6 +903,9 @@ log_format = 'json'
defaults::DEFAULT_METRIC_COLLECTION_INTERVAL
)?,
metric_collection_endpoint: defaults::DEFAULT_METRIC_COLLECTION_ENDPOINT,
synthetic_size_calculation_interval: humantime::parse_duration(
defaults::DEFAULT_SYNTHETIC_SIZE_CALCULATION_INTERVAL
)?,
test_remote_failures: 0,
},
"Correct defaults should be used when no config values are provided"
@@ -926,6 +952,7 @@ log_format = 'json'
concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
metric_collection_interval: Duration::from_secs(222),
metric_collection_endpoint: Some(Url::parse("http://localhost:80/metrics")?),
synthetic_size_calculation_interval: Duration::from_secs(333),
test_remote_failures: 0,
},
"Should be able to parse all basic config values correctly"

View File

@@ -10,6 +10,9 @@ use utils::id::NodeId;
use utils::id::TimelineId;
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::task_mgr::BACKGROUND_RUNTIME;
use crate::tenant::mgr;
use pageserver_api::models::TenantState;
use utils::id::TenantId;
@@ -142,12 +145,29 @@ struct EventChunk<'a> {
pub async fn collect_metrics(
metric_collection_endpoint: &Url,
metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration,
node_id: NodeId,
) -> anyhow::Result<()> {
let mut ticker = tokio::time::interval(metric_collection_interval);
info!("starting collect_metrics");
// spin up background worker that caclulates tenant sizes
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::CalculateSyntheticSize,
None,
None,
"synthetic size calculation",
true,
async move {
calculate_synthetic_size_worker(synthetic_size_calculation_interval)
.instrument(info_span!("synthetic_size_worker"))
.await?;
Ok(())
},
);
// define client here to reuse it for all requests
let client = reqwest::Client::new();
let mut cached_metrics: HashMap<ConsumptionMetricsKey, u64> = HashMap::new();
@@ -252,6 +272,16 @@ pub async fn collect_metrics_task(
));
// TODO add SyntheticStorageSize metric
let tenant_synthetic_size = tenant.calculate_synthetic_size().await?;
info!("tenant_synthetic_size: {}", tenant_synthetic_size);
current_metrics.push((
ConsumptionMetricsKey {
tenant_id,
timeline_id: None,
metric: ConsumptionMetricKind::SyntheticStorageSize,
},
tenant_synthetic_size,
));
}
// Filter metrics
@@ -261,7 +291,7 @@ pub async fn collect_metrics_task(
});
if current_metrics.is_empty() {
trace!("no new metrics to send");
info!("no new metrics to send");
return Ok(());
}
@@ -322,3 +352,37 @@ pub async fn collect_metrics_task(
Ok(())
}
/// Caclculate synthetic size for each active tenant
pub async fn calculate_synthetic_size_worker(
synthetic_size_calculation_interval: Duration,
) -> anyhow::Result<()> {
info!("starting calculate_synthetic_size_worker");
let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("calculate_synthetic_size_worker received cancellation request");
return Ok(());
},
_ = ticker.tick() => {
let tenants = mgr::list_tenants().await;
// iterate through list of Active tenants and collect metrics
for (tenant_id, tenant_state) in tenants {
if tenant_state != TenantState::Active {
continue;
}
let tenant = mgr::get_tenant(tenant_id, true).await?;
info!("spawn calculate_synthetic_size for tenant {}", tenant_id);
tenant.calculate_synthetic_size().await?;
}
}
}
}
}

View File

@@ -220,6 +220,8 @@ pub enum TaskKind {
// task that drives downloading layers
DownloadAllRemoteLayers,
// Task that calculates synthetis size for all active tenants
CalculateSyntheticSize,
}
#[derive(Default)]

View File

@@ -2359,6 +2359,25 @@ impl Tenant {
size::gather_inputs(self, logical_sizes_at_once, &mut shared_cache).await
}
/// Calculate synthetic tenant size
/// This is periodically called by background worker
///
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
pub async fn calculate_synthetic_size(&self) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs().await?;
let size = inputs
.calculate()
.unwrap_or_else(|e| panic!("err {}, inputs {:?}", e, inputs)); // FIXME this panic is debug only.
info!(
"calculate_synthetic_size for tenant {} size: {}",
self.tenant_id, size,
);
Ok(size)
}
}
fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> anyhow::Result<()> {