From 44300adeb764a1808f154cfff5bd5ca65b8e8b65 Mon Sep 17 00:00:00 2001 From: Cuong Nguyen Date: Wed, 29 Nov 2023 22:46:17 -0500 Subject: [PATCH] Make ingest_batch_size a pageserver config --- control_plane/src/pageserver.rs | 10 ------- libs/pageserver_api/src/models.rs | 1 - pageserver/src/config.rs | 29 ++++++++++++++++--- pageserver/src/http/openapi_spec.yml | 2 -- pageserver/src/tenant.rs | 1 - pageserver/src/tenant/config.rs | 11 ------- pageserver/src/tenant/timeline.rs | 5 +--- pageserver/src/tenant/timeline/walreceiver.rs | 2 +- .../walreceiver/connection_manager.rs | 2 +- .../walreceiver/walreceiver_connection.rs | 5 ++-- 10 files changed, 30 insertions(+), 38 deletions(-) diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index 5f56c8fc7f..981ddd3e98 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -408,11 +408,6 @@ impl PageServerNode { .transpose() .context("Failed to parse 'gc_feedback' as bool")?, heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), - ingest_batch_size: settings - .remove("ingest_batch_size") - .map(|x| x.parse::()) - .transpose() - .context("Failed to parse 'ingest_batch_size' as integer")?, }; let request = models::TenantCreateRequest { @@ -511,11 +506,6 @@ impl PageServerNode { .transpose() .context("Failed to parse 'gc_feedback' as bool")?, heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()), - ingest_batch_size: settings - .remove("ingest_batch_size") - .map(|x| x.parse::()) - .transpose() - .context("Failed to parse 'ingest_batch_size' as integer")?, } }; diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index dad168cdb2..fbc7d73235 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -238,7 +238,6 @@ pub struct TenantConfig { pub evictions_low_residence_duration_metric_threshold: Option, pub gc_feedback: Option, pub heatmap_period: Option, - pub ingest_batch_size: Option, } /// A flattened analog of a `pagesever::tenant::LocationMode`, which diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 817bc8b6c1..a6a11910c9 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -72,6 +72,8 @@ pub mod defaults { pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8; + pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100; + /// /// Default built-in configuration file. /// @@ -84,6 +86,7 @@ pub mod defaults { #wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}' #wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}' +#page_cache_size = {DEFAULT_PAGE_CACHE_SIZE} #max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS} # initial superuser role name to use when creating a new tenant @@ -103,6 +106,8 @@ pub mod defaults { #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}' +#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE} + [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -118,7 +123,6 @@ pub mod defaults { #min_resident_size_override = .. # in bytes #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}' #gc_feedback = false -#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE} #heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY} @@ -224,6 +228,9 @@ pub struct PageServerConf { /// How many heatmap uploads may be done concurrency: lower values implicitly deprioritize /// heatmap uploads vs. other remote storage operations. pub heatmap_upload_concurrency: usize, + + /// Maximum number of WAL records to be ingested and committed at the same time + pub ingest_batch_size: u64, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -304,6 +311,8 @@ struct PageServerConfigBuilder { control_plane_emergency_mode: BuilderValue, heatmap_upload_concurrency: BuilderValue, + + ingest_batch_size: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -374,6 +383,8 @@ impl Default for PageServerConfigBuilder { control_plane_emergency_mode: Set(false), heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY), + + ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE), } } } @@ -518,6 +529,10 @@ impl PageServerConfigBuilder { self.heatmap_upload_concurrency = BuilderValue::Set(value) } + pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) { + self.ingest_batch_size = BuilderValue::Set(ingest_batch_size) + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_size_logical_size_queries = self .concurrent_tenant_size_logical_size_queries @@ -612,10 +627,12 @@ impl PageServerConfigBuilder { control_plane_emergency_mode: self .control_plane_emergency_mode .ok_or(anyhow!("missing control_plane_emergency_mode"))?, - heatmap_upload_concurrency: self .heatmap_upload_concurrency .ok_or(anyhow!("missing heatmap_upload_concurrency"))?, + ingest_batch_size: self + .ingest_batch_size + .ok_or(anyhow!("missing ingest_batch_size"))?, }) } } @@ -853,6 +870,7 @@ impl PageServerConf { "heatmap_upload_concurrency" => { builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize) }, + "ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?), _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -920,6 +938,7 @@ impl PageServerConf { control_plane_api_token: None, control_plane_emergency_mode: false, heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, } } } @@ -1145,7 +1164,8 @@ background_task_maximum_delay = '334 s' control_plane_api: None, control_plane_api_token: None, control_plane_emergency_mode: false, - heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY + heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, }, "Correct defaults should be used when no config values are provided" ); @@ -1203,7 +1223,8 @@ background_task_maximum_delay = '334 s' control_plane_api: None, control_plane_api_token: None, control_plane_emergency_mode: false, - heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY + heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY, + ingest_batch_size: 1, }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 642b1f848b..fbb059c17b 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -1397,8 +1397,6 @@ components: type: string image_creation_threshold: type: integer - ingest_batch_size: - type: integer walreceiver_connect_timeout: type: string lagging_wal_timeout: diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 85289facf7..89d332f178 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3716,7 +3716,6 @@ pub(crate) mod harness { ), gc_feedback: Some(tenant_conf.gc_feedback), heatmap_period: Some(tenant_conf.heatmap_period), - ingest_batch_size: Some(tenant_conf.ingest_batch_size), } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 485d354ba0..2d4cd350d7 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -341,8 +341,6 @@ pub struct TenantConf { /// may be disabled if a Tenant will not have secondary locations: only secondary /// locations will use the heatmap uploaded by attached locations. pub heatmap_period: Duration, - - pub ingest_batch_size: NonZeroU64, } /// Same as TenantConf, but this struct preserves the information about @@ -428,10 +426,6 @@ pub struct TenantConfOpt { #[serde(with = "humantime_serde")] #[serde(default)] pub heatmap_period: Option, - - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(default)] - pub ingest_batch_size: Option, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -501,9 +495,6 @@ impl TenantConfOpt { .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold), gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback), heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period), - ingest_batch_size: self - .ingest_batch_size - .unwrap_or(global_conf.ingest_batch_size), } } } @@ -542,8 +533,6 @@ impl Default for TenantConf { .expect("cannot parse default evictions_low_residence_duration_metric_threshold"), gc_feedback: false, heatmap_period: Duration::ZERO, - ingest_batch_size: NonZeroU64::new(DEFAULT_INGEST_BATCH_SIZE) - .expect("cannot parse default ingest_batch_size"), } } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index fcdbe67b36..35d7d5040b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1430,10 +1430,7 @@ impl Timeline { .tenant_conf .max_lsn_wal_lag .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag); - let ingest_batch_size = tenant_conf_guard - .tenant_conf - .ingest_batch_size - .unwrap_or(self.conf.default_tenant_conf.ingest_batch_size); + let ingest_batch_size = self.conf.ingest_batch_size; drop(tenant_conf_guard); let mut guard = self.walreceiver.lock().unwrap(); diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index d2dc82ed76..2fab6722b8 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -58,7 +58,7 @@ pub struct WalReceiverConf { pub max_lsn_wal_lag: NonZeroU64, pub auth_token: Option>, pub availability_zone: Option, - pub ingest_batch_size: NonZeroU64, + pub ingest_batch_size: u64, } pub struct WalReceiver { diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 0b0e1167c7..1b145aafba 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -1347,7 +1347,7 @@ mod tests { max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(), auth_token: None, availability_zone: None, - ingest_batch_size: NonZeroU64::new(1).unwrap(), + ingest_batch_size: 1, }, wal_connection: None, wal_stream_candidates: HashMap::new(), diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index d4234c22f7..88e83fd8c4 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -2,7 +2,6 @@ use std::{ error::Error, - num::NonZeroU64, pin::pin, str::FromStr, sync::Arc, @@ -116,7 +115,7 @@ pub(super) async fn handle_walreceiver_connection( connect_timeout: Duration, ctx: RequestContext, node: NodeId, - ingest_batch_size: NonZeroU64, + ingest_batch_size: u64, ) -> Result<(), WalReceiverError> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -337,7 +336,7 @@ pub(super) async fn handle_walreceiver_connection( uncommitted_records += 1; // Commit every ingest_batch_size records. - if uncommitted_records >= ingest_batch_size.get() { + if uncommitted_records >= ingest_batch_size { modification.commit(&ctx).await?; uncommitted_records = 0; }