diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 20b5e567a8..02339f752c 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -136,6 +136,10 @@ struct Cli { requires = "compute-id" )] pub control_plane_uri: Option, + + /// Interval in seconds for collecting installed extensions statistics + #[arg(long, default_value = "3600")] + pub installed_extensions_collection_interval: u64, } fn main() -> Result<()> { @@ -179,6 +183,7 @@ fn main() -> Result<()> { cgroup: cli.cgroup, #[cfg(target_os = "linux")] vm_monitor_addr: cli.vm_monitor_addr, + installed_extensions_collection_interval: cli.installed_extensions_collection_interval, }, config, )?; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index cb857e0a3e..ff49c737f0 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -97,6 +97,9 @@ pub struct ComputeNodeParams { /// the address of extension storage proxy gateway pub remote_ext_base_url: Option, + + /// Interval for installed extensions collection + pub installed_extensions_collection_interval: u64, } /// Compute node info shared across several `compute_ctl` threads. @@ -742,17 +745,7 @@ impl ComputeNode { let conf = self.get_tokio_conn_conf(None); tokio::task::spawn(async { - let res = get_installed_extensions(conf).await; - match res { - Ok(extensions) => { - info!( - "[NEON_EXT_STAT] {}", - serde_json::to_string(&extensions) - .expect("failed to serialize extensions list") - ); - } - Err(err) => error!("could not get installed extensions: {err:?}"), - } + let _ = installed_extensions(conf).await; }); } @@ -782,6 +775,9 @@ impl ComputeNode { // Log metrics so that we can search for slow operations in logs info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished"); + // Spawn the extension stats background task + self.spawn_extension_stats_task(); + if pspec.spec.prewarm_lfc_on_startup { self.prewarm_lfc(); } @@ -2192,6 +2188,41 @@ LIMIT 100", info!("Pageserver config changed"); } } + + pub fn spawn_extension_stats_task(&self) { + let conf = self.tokio_conn_conf.clone(); + let installed_extensions_collection_interval = + self.params.installed_extensions_collection_interval; + tokio::spawn(async move { + // An initial sleep is added to ensure that two collections don't happen at the same time. + // The first collection happens during compute startup. + tokio::time::sleep(tokio::time::Duration::from_secs( + installed_extensions_collection_interval, + )) + .await; + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs( + installed_extensions_collection_interval, + )); + loop { + interval.tick().await; + let _ = installed_extensions(conf.clone()).await; + } + }); + } +} + +pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> { + let res = get_installed_extensions(conf).await; + match res { + Ok(extensions) => { + info!( + "[NEON_EXT_STAT] {}", + serde_json::to_string(&extensions).expect("failed to serialize extensions list") + ); + } + Err(err) => error!("could not get installed extensions: {err:?}"), + } + Ok(()) } pub fn forward_termination_signal() { diff --git a/docker-compose/compute_wrapper/shell/compute.sh b/docker-compose/compute_wrapper/shell/compute.sh index 20a1ffb7a0..ab8d74d355 100755 --- a/docker-compose/compute_wrapper/shell/compute.sh +++ b/docker-compose/compute_wrapper/shell/compute.sh @@ -20,7 +20,7 @@ first_path="$(ldconfig --verbose 2>/dev/null \ | grep --invert-match ^$'\t' \ | cut --delimiter=: --fields=1 \ | head --lines=1)" -test "$first_path" == '/usr/local/lib' || true # Remove the || true in a follow-up PR. Needed for backwards compat. +test "$first_path" == '/usr/local/lib' echo "Waiting pageserver become ready." while ! nc -z pageserver 6400; do diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 9f3736d57a..e7d612bb7a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -354,6 +354,9 @@ pub struct ShardImportProgressV1 { pub completed: usize, /// Hash of the plan pub import_plan_hash: u64, + /// Soft limit for the job size + /// This needs to remain constant throughout the import + pub job_soft_size_limit: usize, } impl ShardImportStatus { diff --git a/libs/posthog_client_lite/src/lib.rs b/libs/posthog_client_lite/src/lib.rs index 21e978df3e..8aa8da2898 100644 --- a/libs/posthog_client_lite/src/lib.rs +++ b/libs/posthog_client_lite/src/lib.rs @@ -37,7 +37,7 @@ pub struct LocalEvaluationFlag { #[derive(Deserialize)] pub struct LocalEvaluationFlagFilters { groups: Vec, - multivariate: LocalEvaluationFlagMultivariate, + multivariate: Option, } #[derive(Deserialize)] @@ -254,7 +254,7 @@ impl FeatureStore { } } - /// Evaluate a multivariate feature flag. Returns `None` if the flag is not available or if there are errors + /// Evaluate a multivariate feature flag. Returns an error if the flag is not available or if there are errors /// during the evaluation. /// /// The parsing logic is as follows: @@ -272,6 +272,10 @@ impl FeatureStore { /// Example: we have a multivariate flag with 3 groups of the configured global rollout percentage: A (10%), B (20%), C (70%). /// There is a single group with a condition that has a rollout percentage of 10% and it does not have a variant override. /// Then, we will have 1% of the users evaluated to A, 2% to B, and 7% to C. + /// + /// Error handling: the caller should inspect the error and decide the behavior when a feature flag + /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be + /// propagated beyond where the feature flag gets resolved. pub fn evaluate_multivariate( &self, flag_key: &str, @@ -290,6 +294,35 @@ impl FeatureStore { ) } + /// Evaluate a boolean feature flag. Returns an error if the flag is not available or if there are errors + /// during the evaluation. + /// + /// The parsing logic is as follows: + /// + /// * Generate a consistent hash for the tenant-feature. + /// * Match each filter group. + /// - If a group is matched, it will first determine whether the user is in the range of the rollout + /// percentage. + /// - If the hash falls within the group's rollout percentage, return true. + /// * Otherwise, continue with the next group until all groups are evaluated and no group is within the + /// rollout percentage. + /// * If there are no matching groups, return an error. + /// + /// Returns `Ok(())` if the feature flag evaluates to true. In the future, it will return a payload. + /// + /// Error handling: the caller should inspect the error and decide the behavior when a feature flag + /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be + /// propagated beyond where the feature flag gets resolved. + pub fn evaluate_boolean( + &self, + flag_key: &str, + user_id: &str, + properties: &HashMap, + ) -> Result<(), PostHogEvaluationError> { + let hash_on_global_rollout_percentage = Self::consistent_hash(user_id, flag_key, "boolean"); + self.evaluate_boolean_inner(flag_key, hash_on_global_rollout_percentage, properties) + } + /// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID /// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests /// and avoid duplicate computations. @@ -316,6 +349,11 @@ impl FeatureStore { flag_key ))); } + let Some(ref multivariate) = flag_config.filters.multivariate else { + return Err(PostHogEvaluationError::Internal(format!( + "No multivariate available, should use evaluate_boolean?: {flag_key}" + ))); + }; // TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog // Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it // does not matter. @@ -324,7 +362,7 @@ impl FeatureStore { GroupEvaluationResult::MatchedAndOverride(variant) => return Ok(variant), GroupEvaluationResult::MatchedAndEvaluate => { let mut percentage = 0; - for variant in &flag_config.filters.multivariate.variants { + for variant in &multivariate.variants { percentage += variant.rollout_percentage; if self .evaluate_percentage(hash_on_global_rollout_percentage, percentage) @@ -352,6 +390,64 @@ impl FeatureStore { ))) } } + + /// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID + /// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests + /// and avoid duplicate computations. + /// + /// Use a different consistent hash for evaluating the group rollout percentage. + /// The behavior: if the condition is set to rolling out to 10% of the users, and + /// we set the variant A to 20% in the global config, then 2% of the total users will + /// be evaluated to variant A. + /// + /// Note that the hash to determine group rollout percentage is shared across all groups. So if we have two + /// exactly-the-same conditions with 10% and 20% rollout percentage respectively, a total of 20% of the users + /// will be evaluated (versus 30% if group evaluation is done independently). + pub(crate) fn evaluate_boolean_inner( + &self, + flag_key: &str, + hash_on_global_rollout_percentage: f64, + properties: &HashMap, + ) -> Result<(), PostHogEvaluationError> { + if let Some(flag_config) = self.flags.get(flag_key) { + if !flag_config.active { + return Err(PostHogEvaluationError::NotAvailable(format!( + "The feature flag is not active: {}", + flag_key + ))); + } + if flag_config.filters.multivariate.is_some() { + return Err(PostHogEvaluationError::Internal(format!( + "This looks like a multivariate flag, should use evaluate_multivariate?: {flag_key}" + ))); + }; + // TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog + // Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it + // does not matter. + for group in &flag_config.filters.groups { + match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? { + GroupEvaluationResult::MatchedAndOverride(_) => { + return Err(PostHogEvaluationError::Internal(format!( + "Boolean flag cannot have overrides: {}", + flag_key + ))); + } + GroupEvaluationResult::MatchedAndEvaluate => { + return Ok(()); + } + GroupEvaluationResult::Unmatched => continue, + } + } + // If no group is matched, the feature is not available, and up to the caller to decide what to do. + Err(PostHogEvaluationError::NoConditionGroupMatched) + } else { + // The feature flag is not available yet + Err(PostHogEvaluationError::NotAvailable(format!( + "Not found in the local evaluation spec: {}", + flag_key + ))) + } + } } pub struct PostHogClientConfig { @@ -469,95 +565,162 @@ mod tests { fn data() -> &'static str { r#"{ - "flags": [ - { - "id": 132794, - "team_id": 152860, - "name": "", - "key": "gc-compaction", - "filters": { - "groups": [ - { - "variant": "enabled-stage-2", - "properties": [ - { - "key": "plan_type", - "type": "person", - "value": [ - "free" - ], - "operator": "exact" - }, - { - "key": "pageserver_remote_size", - "type": "person", - "value": "10000000", - "operator": "lt" - } - ], - "rollout_percentage": 50 - }, - { - "properties": [ - { - "key": "plan_type", - "type": "person", - "value": [ - "free" - ], - "operator": "exact" - }, - { - "key": "pageserver_remote_size", - "type": "person", - "value": "10000000", - "operator": "lt" - } - ], - "rollout_percentage": 80 - } - ], - "payloads": {}, - "multivariate": { - "variants": [ - { - "key": "disabled", - "name": "", - "rollout_percentage": 90 - }, - { - "key": "enabled-stage-1", - "name": "", - "rollout_percentage": 10 - }, - { - "key": "enabled-stage-2", - "name": "", - "rollout_percentage": 0 - }, - { - "key": "enabled-stage-3", - "name": "", - "rollout_percentage": 0 - }, - { - "key": "enabled", - "name": "", - "rollout_percentage": 0 - } - ] - } - }, - "deleted": false, - "active": true, - "ensure_experience_continuity": false, - "has_encrypted_payloads": false, - "version": 6 - } + "flags": [ + { + "id": 141807, + "team_id": 152860, + "name": "", + "key": "image-compaction-boundary", + "filters": { + "groups": [ + { + "variant": null, + "properties": [ + { + "key": "plan_type", + "type": "person", + "value": [ + "free" + ], + "operator": "exact" + } ], - "group_type_mapping": {}, - "cohorts": {} - }"# + "rollout_percentage": 40 + }, + { + "variant": null, + "properties": [], + "rollout_percentage": 10 + } + ], + "payloads": {}, + "multivariate": null + }, + "deleted": false, + "active": true, + "ensure_experience_continuity": false, + "has_encrypted_payloads": false, + "version": 1 + }, + { + "id": 135586, + "team_id": 152860, + "name": "", + "key": "boolean-flag", + "filters": { + "groups": [ + { + "variant": null, + "properties": [ + { + "key": "plan_type", + "type": "person", + "value": [ + "free" + ], + "operator": "exact" + } + ], + "rollout_percentage": 47 + } + ], + "payloads": {}, + "multivariate": null + }, + "deleted": false, + "active": true, + "ensure_experience_continuity": false, + "has_encrypted_payloads": false, + "version": 1 + }, + { + "id": 132794, + "team_id": 152860, + "name": "", + "key": "gc-compaction", + "filters": { + "groups": [ + { + "variant": "enabled-stage-2", + "properties": [ + { + "key": "plan_type", + "type": "person", + "value": [ + "free" + ], + "operator": "exact" + }, + { + "key": "pageserver_remote_size", + "type": "person", + "value": "10000000", + "operator": "lt" + } + ], + "rollout_percentage": 50 + }, + { + "properties": [ + { + "key": "plan_type", + "type": "person", + "value": [ + "free" + ], + "operator": "exact" + }, + { + "key": "pageserver_remote_size", + "type": "person", + "value": "10000000", + "operator": "lt" + } + ], + "rollout_percentage": 80 + } + ], + "payloads": {}, + "multivariate": { + "variants": [ + { + "key": "disabled", + "name": "", + "rollout_percentage": 90 + }, + { + "key": "enabled-stage-1", + "name": "", + "rollout_percentage": 10 + }, + { + "key": "enabled-stage-2", + "name": "", + "rollout_percentage": 0 + }, + { + "key": "enabled-stage-3", + "name": "", + "rollout_percentage": 0 + }, + { + "key": "enabled", + "name": "", + "rollout_percentage": 0 + } + ] + } + }, + "deleted": false, + "active": true, + "ensure_experience_continuity": false, + "has_encrypted_payloads": false, + "version": 7 + } + ], + "group_type_mapping": {}, + "cohorts": {} +}"# } #[test] @@ -633,4 +796,125 @@ mod tests { Err(PostHogEvaluationError::NoConditionGroupMatched) ),); } + + #[test] + fn evaluate_boolean_1() { + // The `boolean-flag` feature flag only has one group that matches on the free user. + + let mut store = FeatureStore::new(); + let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap(); + store.set_flags(response.flags); + + // This lacks the required properties and cannot be evaluated. + let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new()); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NotAvailable(_)) + ),); + + let properties_unmatched = HashMap::from([ + ( + "plan_type".to_string(), + PostHogFlagFilterPropertyValue::String("paid".to_string()), + ), + ( + "pageserver_remote_size".to_string(), + PostHogFlagFilterPropertyValue::Number(1000.0), + ), + ]); + + // This does not match any group so there will be an error. + let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties_unmatched); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NoConditionGroupMatched) + ),); + + let properties = HashMap::from([ + ( + "plan_type".to_string(), + PostHogFlagFilterPropertyValue::String("free".to_string()), + ), + ( + "pageserver_remote_size".to_string(), + PostHogFlagFilterPropertyValue::Number(1000.0), + ), + ]); + + // It matches the first group as 0.10 <= 0.50 and the properties are matched. Then it gets evaluated to the variant override. + let variant = store.evaluate_boolean_inner("boolean-flag", 0.10, &properties); + assert!(variant.is_ok()); + + // It matches the group conditions but not the group rollout percentage. + let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NoConditionGroupMatched) + ),); + } + + #[test] + fn evaluate_boolean_2() { + // The `image-compaction-boundary` feature flag has one group that matches on the free user and a group that matches on all users. + + let mut store = FeatureStore::new(); + let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap(); + store.set_flags(response.flags); + + // This lacks the required properties and cannot be evaluated. + let variant = + store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &HashMap::new()); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NotAvailable(_)) + ),); + + let properties_unmatched = HashMap::from([ + ( + "plan_type".to_string(), + PostHogFlagFilterPropertyValue::String("paid".to_string()), + ), + ( + "pageserver_remote_size".to_string(), + PostHogFlagFilterPropertyValue::Number(1000.0), + ), + ]); + + // This does not match the filtered group but the all user group. + let variant = + store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties_unmatched); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NoConditionGroupMatched) + ),); + let variant = + store.evaluate_boolean_inner("image-compaction-boundary", 0.05, &properties_unmatched); + assert!(variant.is_ok()); + + let properties = HashMap::from([ + ( + "plan_type".to_string(), + PostHogFlagFilterPropertyValue::String("free".to_string()), + ), + ( + "pageserver_remote_size".to_string(), + PostHogFlagFilterPropertyValue::Number(1000.0), + ), + ]); + + // It matches the first group as 0.30 <= 0.40 and the properties are matched. Then it gets evaluated to the variant override. + let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.30, &properties); + assert!(variant.is_ok()); + + // It matches the group conditions but not the group rollout percentage. + let variant = store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NoConditionGroupMatched) + ),); + + // It matches the second "all" group conditions. + let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.09, &properties); + assert!(variant.is_ok()); + } } diff --git a/pageserver/src/feature_resolver.rs b/pageserver/src/feature_resolver.rs index 193fb10abc..2b0f368079 100644 --- a/pageserver/src/feature_resolver.rs +++ b/pageserver/src/feature_resolver.rs @@ -45,6 +45,10 @@ impl FeatureResolver { } /// Evaluate a multivariate feature flag. Currently, we do not support any properties. + /// + /// Error handling: the caller should inspect the error and decide the behavior when a feature flag + /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be + /// propagated beyond where the feature flag gets resolved. pub fn evaluate_multivariate( &self, flag_key: &str, @@ -62,4 +66,29 @@ impl FeatureResolver { )) } } + + /// Evaluate a boolean feature flag. Currently, we do not support any properties. + /// + /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error. + /// + /// Error handling: the caller should inspect the error and decide the behavior when a feature flag + /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be + /// propagated beyond where the feature flag gets resolved. + pub fn evaluate_boolean( + &self, + flag_key: &str, + tenant_id: TenantId, + ) -> Result<(), PostHogEvaluationError> { + if let Some(inner) = &self.inner { + inner.feature_store().evaluate_boolean( + flag_key, + &tenant_id.to_string(), + &HashMap::new(), + ) + } else { + Err(PostHogEvaluationError::NotAvailable( + "PostHog integration is not enabled".to_string(), + )) + } + } } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index cf99cb110c..e8d1367d6c 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -353,6 +353,33 @@ paths: "200": description: OK + /v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/mark_invisible: + parameters: + - name: tenant_shard_id + in: path + required: true + schema: + type: string + - name: timeline_id + in: path + required: true + schema: + type: string + format: hex + put: + requestBody: + content: + application/json: + schema: + type: object + properties: + is_visible: + type: boolean + default: false + responses: + "200": + description: OK + /v1/tenant/{tenant_shard_id}/location_config: parameters: - name: tenant_shard_id diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 06aa207f82..e96787e027 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -769,6 +769,9 @@ struct BatchedGetPageRequest { timer: SmgrOpTimer, lsn_range: LsnRange, ctx: RequestContext, + // If the request is perf enabled, this contains a context + // with a perf span tracking the time spent waiting for the executor. + batch_wait_ctx: Option, } #[cfg(feature = "testing")] @@ -781,6 +784,7 @@ struct BatchedTestRequest { /// so that we don't keep the [`Timeline::gate`] open while the batch /// is being built up inside the [`spsc_fold`] (pagestream pipelining). #[derive(IntoStaticStr)] +#[allow(clippy::large_enum_variant)] enum BatchedFeMessage { Exists { span: Span, @@ -1298,6 +1302,22 @@ impl PageServerHandler { } }; + let batch_wait_ctx = if ctx.has_perf_span() { + Some( + RequestContextBuilder::from(&ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "WAIT_EXECUTOR", + ) + }) + .attached_child(), + ) + } else { + None + }; + BatchedFeMessage::GetPage { span, shard: shard.downgrade(), @@ -1309,6 +1329,7 @@ impl PageServerHandler { request_lsn: req.hdr.request_lsn }, ctx, + batch_wait_ctx, }], // The executor grabs the batch when it becomes idle. // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the @@ -1464,7 +1485,7 @@ impl PageServerHandler { let mut flush_timers = Vec::with_capacity(handler_results.len()); for handler_result in &mut handler_results { let flush_timer = match handler_result { - Ok((_, timer)) => Some( + Ok((_response, timer, _ctx)) => Some( timer .observe_execution_end(flushing_start_time) .expect("we are the first caller"), @@ -1484,7 +1505,7 @@ impl PageServerHandler { // Some handler errors cause exit from pagestream protocol. // Other handler errors are sent back as an error message and we stay in pagestream protocol. for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) { - let response_msg = match handler_result { + let (response_msg, ctx) = match handler_result { Err(e) => match &e.err { PageStreamError::Shutdown => { // If we fail to fulfil a request during shutdown, which may be _because_ of @@ -1509,15 +1530,30 @@ impl PageServerHandler { error!("error reading relation or page version: {full:#}") }); - PagestreamBeMessage::Error(PagestreamErrorResponse { - req: e.req, - message: e.err.to_string(), - }) + ( + PagestreamBeMessage::Error(PagestreamErrorResponse { + req: e.req, + message: e.err.to_string(), + }), + None, + ) } }, - Ok((response_msg, _op_timer_already_observed)) => response_msg, + Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)), }; + let ctx = ctx.map(|req_ctx| { + RequestContextBuilder::from(&req_ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "FLUSH_RESPONSE", + ) + }) + .attached_child() + }); + // // marshal & transmit response message // @@ -1540,6 +1576,17 @@ impl PageServerHandler { )), None => futures::future::Either::Right(flush_fut), }; + + let flush_fut = if let Some(req_ctx) = ctx.as_ref() { + futures::future::Either::Left( + flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| { + current_perf_span.clone() + }), + ) + } else { + futures::future::Either::Right(flush_fut) + }; + // do it while respecting cancellation let _: () = async move { tokio::select! { @@ -1569,7 +1616,7 @@ impl PageServerHandler { ctx: &RequestContext, ) -> Result< ( - Vec>, + Vec>, Span, ), QueryError, @@ -1596,7 +1643,7 @@ impl PageServerHandler { self.handle_get_rel_exists_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) + .map(|msg| (msg, timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1615,7 +1662,7 @@ impl PageServerHandler { self.handle_get_nblocks_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) + .map(|msg| (msg, timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1662,7 +1709,7 @@ impl PageServerHandler { self.handle_db_size_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) + .map(|msg| (msg, timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1681,7 +1728,7 @@ impl PageServerHandler { self.handle_get_slru_segment_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) + .map(|msg| (msg, timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -2033,12 +2080,25 @@ impl PageServerHandler { return Ok(()); } }; - let batch = match batch { + let mut batch = match batch { Ok(batch) => batch, Err(e) => { return Err(e); } }; + + if let BatchedFeMessage::GetPage { + pages, + span: _, + shard: _, + batch_break_reason: _, + } = &mut batch + { + for req in pages { + req.batch_wait_ctx.take(); + } + } + self.pagestream_handle_batched_message( pgb_writer, batch, @@ -2351,7 +2411,8 @@ impl PageServerHandler { io_concurrency: IoConcurrency, batch_break_reason: GetPageBatchBreakReason, ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> + { debug_assert_current_span_has_tenant_and_timeline_id(); timeline @@ -2458,6 +2519,7 @@ impl PageServerHandler { page, }), req.timer, + req.ctx, ) }) .map_err(|e| BatchedPageStreamError { @@ -2502,7 +2564,8 @@ impl PageServerHandler { timeline: &Timeline, requests: Vec, _ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> + { // real requests would do something with the timeline let mut results = Vec::with_capacity(requests.len()); for _req in requests.iter() { @@ -2529,6 +2592,10 @@ impl PageServerHandler { req: req.req.clone(), }), req.timer, + RequestContext::new( + TaskKind::PageRequestHandler, + DownloadBehavior::Warn, + ), ) }) .map_err(|e| BatchedPageStreamError { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 86731fb666..58b766933d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5315,6 +5315,7 @@ impl TenantShard { l0_compaction_trigger: self.l0_compaction_trigger.clone(), l0_flush_global_state: self.l0_flush_global_state.clone(), basebackup_prepare_sender: self.basebackup_prepare_sender.clone(), + feature_resolver: self.feature_resolver.clone(), } } @@ -8359,10 +8360,24 @@ mod tests { } tline.freeze_and_flush().await?; + // Force layers to L1 + tline + .compact( + &cancel, + { + let mut flags = EnumSet::new(); + flags.insert(CompactFlags::ForceL0Compaction); + flags + }, + &ctx, + ) + .await?; if iter % 5 == 0 { + let scan_lsn = Lsn(lsn.0 + 1); + info!("scanning at {}", scan_lsn); let (_, before_delta_file_accessed) = - scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone()) + scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone()) .await?; tline .compact( @@ -8371,13 +8386,14 @@ mod tests { let mut flags = EnumSet::new(); flags.insert(CompactFlags::ForceImageLayerCreation); flags.insert(CompactFlags::ForceRepartition); + flags.insert(CompactFlags::ForceL0Compaction); flags }, &ctx, ) .await?; let (_, after_delta_file_accessed) = - scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone()) + scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone()) .await?; assert!( after_delta_file_accessed < before_delta_file_accessed, @@ -8818,6 +8834,8 @@ mod tests { let cancel = CancellationToken::new(); + // Image layer creation happens on the disk_consistent_lsn so we need to force set it now. + tline.force_set_disk_consistent_lsn(Lsn(0x40)); tline .compact( &cancel, @@ -8831,8 +8849,7 @@ mod tests { ) .await .unwrap(); - - // Image layers are created at last_record_lsn + // Image layers are created at repartition LSN let images = tline .inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone()) .await diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 54dc3b2d0b..71765b9197 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -103,6 +103,7 @@ use crate::context::{ DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, }; use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32}; +use crate::feature_resolver::FeatureResolver; use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::l0_flush::{self, L0FlushGlobalState}; use crate::metrics::{ @@ -198,6 +199,7 @@ pub struct TimelineResources { pub l0_compaction_trigger: Arc, pub l0_flush_global_state: l0_flush::L0FlushGlobalState, pub basebackup_prepare_sender: BasebackupPrepareSender, + pub feature_resolver: FeatureResolver, } pub struct Timeline { @@ -444,6 +446,8 @@ pub struct Timeline { /// A channel to send async requests to prepare a basebackup for the basebackup cache. basebackup_prepare_sender: BasebackupPrepareSender, + + feature_resolver: FeatureResolver, } pub(crate) enum PreviousHeatmap { @@ -3072,6 +3076,8 @@ impl Timeline { wait_lsn_log_slow: tokio::sync::Semaphore::new(1), basebackup_prepare_sender: resources.basebackup_prepare_sender, + + feature_resolver: resources.feature_resolver, }; result.repartition_threshold = @@ -4906,6 +4912,7 @@ impl Timeline { LastImageLayerCreationStatus::Initial, false, // don't yield for L0, we're flushing L0 ) + .instrument(info_span!("create_image_layers", mode = %ImageLayerCreationMode::Initial, partition_mode = "initial", lsn = %self.initdb_lsn)) .await?; debug_assert!( matches!(is_complete, LastImageLayerCreationStatus::Complete), @@ -5462,7 +5469,8 @@ impl Timeline { /// Returns the image layers generated and an enum indicating whether the process is fully completed. /// true = we have generate all image layers, false = we preempt the process for L0 compaction. - #[tracing::instrument(skip_all, fields(%lsn, %mode))] + /// + /// `partition_mode` is only for logging purpose and is not used anywhere in this function. async fn create_image_layers( self: &Arc, partitioning: &KeyPartitioning, diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 0e4b14c3e4..143c2e0865 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1278,11 +1278,55 @@ impl Timeline { } let gc_cutoff = *self.applied_gc_cutoff_lsn.read(); + let l0_l1_boundary_lsn = { + // We do the repartition on the L0-L1 boundary. All data below the boundary + // are compacted by L0 with low read amplification, thus making the `repartition` + // function run fast. + let guard = self.layers.read().await; + guard + .all_persistent_layers() + .iter() + .map(|x| { + // Use the end LSN of delta layers OR the start LSN of image layers. + if x.is_delta { + x.lsn_range.end + } else { + x.lsn_range.start + } + }) + .max() + }; + + let (partition_mode, partition_lsn) = if cfg!(test) + || cfg!(feature = "testing") + || self + .feature_resolver + .evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id) + .is_ok() + { + let last_repartition_lsn = self.partitioning.read().1; + let lsn = match l0_l1_boundary_lsn { + Some(boundary) => gc_cutoff + .max(boundary) + .max(last_repartition_lsn) + .max(self.initdb_lsn) + .max(self.ancestor_lsn), + None => self.get_last_record_lsn(), + }; + if lsn <= self.initdb_lsn || lsn <= self.ancestor_lsn { + // Do not attempt to create image layers below the initdb or ancestor LSN -- no data below it + ("l0_l1_boundary", self.get_last_record_lsn()) + } else { + ("l0_l1_boundary", lsn) + } + } else { + ("latest_record", self.get_last_record_lsn()) + }; // 2. Repartition and create image layers if necessary match self .repartition( - self.get_last_record_lsn(), + partition_lsn, self.get_compaction_target_size(), options.flags, ctx, @@ -1301,18 +1345,19 @@ impl Timeline { .extend(sparse_partitioning.into_dense().parts); // 3. Create new image layers for partitions that have been modified "enough". + let mode = if options + .flags + .contains(CompactFlags::ForceImageLayerCreation) + { + ImageLayerCreationMode::Force + } else { + ImageLayerCreationMode::Try + }; let (image_layers, outcome) = self .create_image_layers( &partitioning, lsn, - if options - .flags - .contains(CompactFlags::ForceImageLayerCreation) - { - ImageLayerCreationMode::Force - } else { - ImageLayerCreationMode::Try - }, + mode, &image_ctx, self.last_image_layer_creation_status .load() @@ -1320,6 +1365,7 @@ impl Timeline { .clone(), options.flags.contains(CompactFlags::YieldForL0), ) + .instrument(info_span!("create_image_layers", mode = %mode, partition_mode = %partition_mode, lsn = %lsn)) .await .inspect_err(|err| { if let CreateImageLayersError::GetVectoredError( @@ -1344,7 +1390,8 @@ impl Timeline { } Ok(_) => { - info!("skipping repartitioning due to image compaction LSN being below GC cutoff"); + // This happens very frequently so we don't want to log it. + debug!("skipping repartitioning due to image compaction LSN being below GC cutoff"); } // Suppress errors when cancelled. diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index 2ba4ca69ac..0d87a2f135 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -30,6 +30,7 @@ use std::collections::HashSet; use std::hash::{Hash, Hasher}; +use std::num::NonZeroUsize; use std::ops::Range; use std::sync::Arc; @@ -100,8 +101,24 @@ async fn run_v1( tasks: Vec::default(), }; - let import_config = &timeline.conf.timeline_import_config; - let plan = planner.plan(import_config).await?; + // Use the job size limit encoded in the progress if we are resuming an import. + // This ensures that imports have stable plans even if the pageserver config changes. + let import_config = { + match &import_progress { + Some(progress) => { + let base = &timeline.conf.timeline_import_config; + TimelineImportConfig { + import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit) + .unwrap(), + import_job_concurrency: base.import_job_concurrency, + import_job_checkpoint_threshold: base.import_job_checkpoint_threshold, + } + } + None => timeline.conf.timeline_import_config.clone(), + } + }; + + let plan = planner.plan(&import_config).await?; // Hash the plan and compare with the hash of the plan we got back from the storage controller. // If the two match, it means that the planning stage had the same output. @@ -126,7 +143,7 @@ async fn run_v1( pausable_failpoint!("import-timeline-pre-execute-pausable"); let start_from_job_idx = import_progress.map(|progress| progress.completed); - plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx) + plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx) .await } @@ -453,6 +470,7 @@ impl Plan { jobs: jobs_in_plan, completed: last_completed_job_idx, import_plan_hash, + job_soft_size_limit: import_config.import_job_soft_size_limit.into(), }; timeline.remote_client.schedule_index_upload_for_file_changes()?; diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index b4eba2779d..f3fcdb0d14 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -20,6 +20,9 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind from fixtures.utils import query_scalar, wait_until +@pytest.mark.skip( + reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548" +) @pytest.mark.parametrize( "attach_mode", ["default_generation", "same_generation"],