mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Make ingest_batch_size a pageserver config
This commit is contained in:
@@ -408,11 +408,6 @@ impl PageServerNode {
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
ingest_batch_size: settings
|
||||
.remove("ingest_batch_size")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'ingest_batch_size' as integer")?,
|
||||
};
|
||||
|
||||
let request = models::TenantCreateRequest {
|
||||
@@ -511,11 +506,6 @@ impl PageServerNode {
|
||||
.transpose()
|
||||
.context("Failed to parse 'gc_feedback' as bool")?,
|
||||
heatmap_period: settings.remove("heatmap_period").map(|x| x.to_string()),
|
||||
ingest_batch_size: settings
|
||||
.remove("ingest_batch_size")
|
||||
.map(|x| x.parse::<NonZeroU64>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'ingest_batch_size' as integer")?,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -238,7 +238,6 @@ pub struct TenantConfig {
|
||||
pub evictions_low_residence_duration_metric_threshold: Option<String>,
|
||||
pub gc_feedback: Option<bool>,
|
||||
pub heatmap_period: Option<String>,
|
||||
pub ingest_batch_size: Option<NonZeroU64>,
|
||||
}
|
||||
|
||||
/// A flattened analog of a `pagesever::tenant::LocationMode`, which
|
||||
|
||||
@@ -72,6 +72,8 @@ pub mod defaults {
|
||||
|
||||
pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
///
|
||||
/// Default built-in configuration file.
|
||||
///
|
||||
@@ -84,6 +86,7 @@ pub mod defaults {
|
||||
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
|
||||
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
|
||||
|
||||
#page_cache_size = {DEFAULT_PAGE_CACHE_SIZE}
|
||||
#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
|
||||
|
||||
# initial superuser role name to use when creating a new tenant
|
||||
@@ -103,6 +106,8 @@ pub mod defaults {
|
||||
|
||||
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
|
||||
|
||||
#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
|
||||
|
||||
[tenant_config]
|
||||
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
|
||||
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
|
||||
@@ -118,7 +123,6 @@ pub mod defaults {
|
||||
#min_resident_size_override = .. # in bytes
|
||||
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
|
||||
#gc_feedback = false
|
||||
#ingest_batch_size = {DEFAULT_INGEST_BATCH_SIZE}
|
||||
|
||||
#heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
|
||||
|
||||
@@ -224,6 +228,9 @@ pub struct PageServerConf {
|
||||
/// How many heatmap uploads may be done concurrency: lower values implicitly deprioritize
|
||||
/// heatmap uploads vs. other remote storage operations.
|
||||
pub heatmap_upload_concurrency: usize,
|
||||
|
||||
/// Maximum number of WAL records to be ingested and committed at the same time
|
||||
pub ingest_batch_size: u64,
|
||||
}
|
||||
|
||||
/// We do not want to store this in a PageServerConf because the latter may be logged
|
||||
@@ -304,6 +311,8 @@ struct PageServerConfigBuilder {
|
||||
control_plane_emergency_mode: BuilderValue<bool>,
|
||||
|
||||
heatmap_upload_concurrency: BuilderValue<usize>,
|
||||
|
||||
ingest_batch_size: BuilderValue<u64>,
|
||||
}
|
||||
|
||||
impl Default for PageServerConfigBuilder {
|
||||
@@ -374,6 +383,8 @@ impl Default for PageServerConfigBuilder {
|
||||
control_plane_emergency_mode: Set(false),
|
||||
|
||||
heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
|
||||
|
||||
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -518,6 +529,10 @@ impl PageServerConfigBuilder {
|
||||
self.heatmap_upload_concurrency = BuilderValue::Set(value)
|
||||
}
|
||||
|
||||
pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) {
|
||||
self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
|
||||
}
|
||||
|
||||
pub fn build(self) -> anyhow::Result<PageServerConf> {
|
||||
let concurrent_tenant_size_logical_size_queries = self
|
||||
.concurrent_tenant_size_logical_size_queries
|
||||
@@ -612,10 +627,12 @@ impl PageServerConfigBuilder {
|
||||
control_plane_emergency_mode: self
|
||||
.control_plane_emergency_mode
|
||||
.ok_or(anyhow!("missing control_plane_emergency_mode"))?,
|
||||
|
||||
heatmap_upload_concurrency: self
|
||||
.heatmap_upload_concurrency
|
||||
.ok_or(anyhow!("missing heatmap_upload_concurrency"))?,
|
||||
ingest_batch_size: self
|
||||
.ingest_batch_size
|
||||
.ok_or(anyhow!("missing ingest_batch_size"))?,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -853,6 +870,7 @@ impl PageServerConf {
|
||||
"heatmap_upload_concurrency" => {
|
||||
builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize)
|
||||
},
|
||||
"ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
}
|
||||
@@ -920,6 +938,7 @@ impl PageServerConf {
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1145,7 +1164,8 @@ background_task_maximum_delay = '334 s'
|
||||
control_plane_api: None,
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
);
|
||||
@@ -1203,7 +1223,8 @@ background_task_maximum_delay = '334 s'
|
||||
control_plane_api: None,
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
ingest_batch_size: 1,
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
);
|
||||
|
||||
@@ -1397,8 +1397,6 @@ components:
|
||||
type: string
|
||||
image_creation_threshold:
|
||||
type: integer
|
||||
ingest_batch_size:
|
||||
type: integer
|
||||
walreceiver_connect_timeout:
|
||||
type: string
|
||||
lagging_wal_timeout:
|
||||
|
||||
@@ -3716,7 +3716,6 @@ pub(crate) mod harness {
|
||||
),
|
||||
gc_feedback: Some(tenant_conf.gc_feedback),
|
||||
heatmap_period: Some(tenant_conf.heatmap_period),
|
||||
ingest_batch_size: Some(tenant_conf.ingest_batch_size),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,8 +341,6 @@ pub struct TenantConf {
|
||||
/// may be disabled if a Tenant will not have secondary locations: only secondary
|
||||
/// locations will use the heatmap uploaded by attached locations.
|
||||
pub heatmap_period: Duration,
|
||||
|
||||
pub ingest_batch_size: NonZeroU64,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -428,10 +426,6 @@ pub struct TenantConfOpt {
|
||||
#[serde(with = "humantime_serde")]
|
||||
#[serde(default)]
|
||||
pub heatmap_period: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub ingest_batch_size: Option<NonZeroU64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
@@ -501,9 +495,6 @@ impl TenantConfOpt {
|
||||
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
|
||||
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
|
||||
heatmap_period: self.heatmap_period.unwrap_or(global_conf.heatmap_period),
|
||||
ingest_batch_size: self
|
||||
.ingest_batch_size
|
||||
.unwrap_or(global_conf.ingest_batch_size),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -542,8 +533,6 @@ impl Default for TenantConf {
|
||||
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
|
||||
gc_feedback: false,
|
||||
heatmap_period: Duration::ZERO,
|
||||
ingest_batch_size: NonZeroU64::new(DEFAULT_INGEST_BATCH_SIZE)
|
||||
.expect("cannot parse default ingest_batch_size"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1430,10 +1430,7 @@ impl Timeline {
|
||||
.tenant_conf
|
||||
.max_lsn_wal_lag
|
||||
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
|
||||
let ingest_batch_size = tenant_conf_guard
|
||||
.tenant_conf
|
||||
.ingest_batch_size
|
||||
.unwrap_or(self.conf.default_tenant_conf.ingest_batch_size);
|
||||
let ingest_batch_size = self.conf.ingest_batch_size;
|
||||
drop(tenant_conf_guard);
|
||||
|
||||
let mut guard = self.walreceiver.lock().unwrap();
|
||||
|
||||
@@ -58,7 +58,7 @@ pub struct WalReceiverConf {
|
||||
pub max_lsn_wal_lag: NonZeroU64,
|
||||
pub auth_token: Option<Arc<String>>,
|
||||
pub availability_zone: Option<String>,
|
||||
pub ingest_batch_size: NonZeroU64,
|
||||
pub ingest_batch_size: u64,
|
||||
}
|
||||
|
||||
pub struct WalReceiver {
|
||||
|
||||
@@ -1347,7 +1347,7 @@ mod tests {
|
||||
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
|
||||
auth_token: None,
|
||||
availability_zone: None,
|
||||
ingest_batch_size: NonZeroU64::new(1).unwrap(),
|
||||
ingest_batch_size: 1,
|
||||
},
|
||||
wal_connection: None,
|
||||
wal_stream_candidates: HashMap::new(),
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
use std::{
|
||||
error::Error,
|
||||
num::NonZeroU64,
|
||||
pin::pin,
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
@@ -116,7 +115,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
connect_timeout: Duration,
|
||||
ctx: RequestContext,
|
||||
node: NodeId,
|
||||
ingest_batch_size: NonZeroU64,
|
||||
ingest_batch_size: u64,
|
||||
) -> Result<(), WalReceiverError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
@@ -337,7 +336,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
uncommitted_records += 1;
|
||||
// Commit every ingest_batch_size records.
|
||||
if uncommitted_records >= ingest_batch_size.get() {
|
||||
if uncommitted_records >= ingest_batch_size {
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user