From 30adf8e2bd8ccabd90764158effb9145590e1fd6 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Tue, 27 May 2025 14:57:53 +0100 Subject: [PATCH 1/7] pageserver: add tracing spans for time spent in batch and flushing (#12012) ## Problem We have some gaps in our traces. This indicates missing spans. ## Summary of changes This PR adds two new spans: * WAIT_EXECUTOR: time a batched request spends in the batch waiting to be picked up * FLUSH_RESPONSE: time a get page request spends flushing the response to the compute ![image](https://github.com/user-attachments/assets/41b3ddb8-438d-4375-9da3-da341fc0916a) --- pageserver/src/page_service.rs | 97 ++++++++++++++++++++++++++++------ 1 file changed, 82 insertions(+), 15 deletions(-) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 06aa207f82..e96787e027 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -769,6 +769,9 @@ struct BatchedGetPageRequest { timer: SmgrOpTimer, lsn_range: LsnRange, ctx: RequestContext, + // If the request is perf enabled, this contains a context + // with a perf span tracking the time spent waiting for the executor. + batch_wait_ctx: Option, } #[cfg(feature = "testing")] @@ -781,6 +784,7 @@ struct BatchedTestRequest { /// so that we don't keep the [`Timeline::gate`] open while the batch /// is being built up inside the [`spsc_fold`] (pagestream pipelining). #[derive(IntoStaticStr)] +#[allow(clippy::large_enum_variant)] enum BatchedFeMessage { Exists { span: Span, @@ -1298,6 +1302,22 @@ impl PageServerHandler { } }; + let batch_wait_ctx = if ctx.has_perf_span() { + Some( + RequestContextBuilder::from(&ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "WAIT_EXECUTOR", + ) + }) + .attached_child(), + ) + } else { + None + }; + BatchedFeMessage::GetPage { span, shard: shard.downgrade(), @@ -1309,6 +1329,7 @@ impl PageServerHandler { request_lsn: req.hdr.request_lsn }, ctx, + batch_wait_ctx, }], // The executor grabs the batch when it becomes idle. // Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the @@ -1464,7 +1485,7 @@ impl PageServerHandler { let mut flush_timers = Vec::with_capacity(handler_results.len()); for handler_result in &mut handler_results { let flush_timer = match handler_result { - Ok((_, timer)) => Some( + Ok((_response, timer, _ctx)) => Some( timer .observe_execution_end(flushing_start_time) .expect("we are the first caller"), @@ -1484,7 +1505,7 @@ impl PageServerHandler { // Some handler errors cause exit from pagestream protocol. // Other handler errors are sent back as an error message and we stay in pagestream protocol. 
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) { - let response_msg = match handler_result { + let (response_msg, ctx) = match handler_result { Err(e) => match &e.err { PageStreamError::Shutdown => { // If we fail to fulfil a request during shutdown, which may be _because_ of @@ -1509,15 +1530,30 @@ impl PageServerHandler { error!("error reading relation or page version: {full:#}") }); - PagestreamBeMessage::Error(PagestreamErrorResponse { - req: e.req, - message: e.err.to_string(), - }) + ( + PagestreamBeMessage::Error(PagestreamErrorResponse { + req: e.req, + message: e.err.to_string(), + }), + None, + ) } }, - Ok((response_msg, _op_timer_already_observed)) => response_msg, + Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)), }; + let ctx = ctx.map(|req_ctx| { + RequestContextBuilder::from(&req_ctx) + .perf_span(|crnt_perf_span| { + info_span!( + target: PERF_TRACE_TARGET, + parent: crnt_perf_span, + "FLUSH_RESPONSE", + ) + }) + .attached_child() + }); + // // marshal & transmit response message // @@ -1540,6 +1576,17 @@ impl PageServerHandler { )), None => futures::future::Either::Right(flush_fut), }; + + let flush_fut = if let Some(req_ctx) = ctx.as_ref() { + futures::future::Either::Left( + flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| { + current_perf_span.clone() + }), + ) + } else { + futures::future::Either::Right(flush_fut) + }; + // do it while respecting cancellation let _: () = async move { tokio::select! { @@ -1569,7 +1616,7 @@ impl PageServerHandler { ctx: &RequestContext, ) -> Result< ( - Vec>, + Vec>, Span, ), QueryError, @@ -1596,7 +1643,7 @@ impl PageServerHandler { self.handle_get_rel_exists_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) + .map(|msg| (msg, timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1615,7 +1662,7 @@ impl PageServerHandler { self.handle_get_nblocks_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) + .map(|msg| (msg, timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1662,7 +1709,7 @@ impl PageServerHandler { self.handle_db_size_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) + .map(|msg| (msg, timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -1681,7 +1728,7 @@ impl PageServerHandler { self.handle_get_slru_segment_request(&shard, &req, &ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) + .map(|msg| (msg, timer, ctx)) .map_err(|err| BatchedPageStreamError { err, req: req.hdr }), ], span, @@ -2033,12 +2080,25 @@ impl PageServerHandler { return Ok(()); } }; - let batch = match batch { + let mut batch = match batch { Ok(batch) => batch, Err(e) => { return Err(e); } }; + + if let BatchedFeMessage::GetPage { + pages, + span: _, + shard: _, + batch_break_reason: _, + } = &mut batch + { + for req in pages { + req.batch_wait_ctx.take(); + } + } + self.pagestream_handle_batched_message( pgb_writer, batch, @@ -2351,7 +2411,8 @@ impl PageServerHandler { io_concurrency: IoConcurrency, batch_break_reason: GetPageBatchBreakReason, ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> + { debug_assert_current_span_has_tenant_and_timeline_id(); timeline @@ -2458,6 +2519,7 @@ impl PageServerHandler { page, }), req.timer, + req.ctx, ) }) .map_err(|e| BatchedPageStreamError { @@ -2502,7 +2564,8 @@ impl PageServerHandler { timeline: &Timeline, 
requests: Vec, _ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> + { // real requests would do something with the timeline let mut results = Vec::with_capacity(requests.len()); for _req in requests.iter() { @@ -2529,6 +2592,10 @@ impl PageServerHandler { req: req.req.clone(), }), req.timer, + RequestContext::new( + TaskKind::PageRequestHandler, + DownloadBehavior::Warn, + ), ) }) .map_err(|e| BatchedPageStreamError { From f0bb93a9c9322d78deb21c6b4ee2dff59eda26b2 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 27 May 2025 22:29:15 +0800 Subject: [PATCH 2/7] feat(pageserver): support evaluate boolean flags (#12024) ## Problem Part of https://github.com/neondatabase/neon/issues/11813 ## Summary of changes * Support evaluate boolean flags. * Add docs on how to handle errors. * Add test cases based on real PostHog config. Signed-off-by: Alex Chi Z --- libs/posthog_client_lite/src/lib.rs | 466 ++++++++++++++++++++++------ pageserver/src/feature_resolver.rs | 29 ++ 2 files changed, 404 insertions(+), 91 deletions(-) diff --git a/libs/posthog_client_lite/src/lib.rs b/libs/posthog_client_lite/src/lib.rs index 21e978df3e..8aa8da2898 100644 --- a/libs/posthog_client_lite/src/lib.rs +++ b/libs/posthog_client_lite/src/lib.rs @@ -37,7 +37,7 @@ pub struct LocalEvaluationFlag { #[derive(Deserialize)] pub struct LocalEvaluationFlagFilters { groups: Vec, - multivariate: LocalEvaluationFlagMultivariate, + multivariate: Option, } #[derive(Deserialize)] @@ -254,7 +254,7 @@ impl FeatureStore { } } - /// Evaluate a multivariate feature flag. Returns `None` if the flag is not available or if there are errors + /// Evaluate a multivariate feature flag. Returns an error if the flag is not available or if there are errors /// during the evaluation. /// /// The parsing logic is as follows: @@ -272,6 +272,10 @@ impl FeatureStore { /// Example: we have a multivariate flag with 3 groups of the configured global rollout percentage: A (10%), B (20%), C (70%). /// There is a single group with a condition that has a rollout percentage of 10% and it does not have a variant override. /// Then, we will have 1% of the users evaluated to A, 2% to B, and 7% to C. + /// + /// Error handling: the caller should inspect the error and decide the behavior when a feature flag + /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be + /// propagated beyond where the feature flag gets resolved. pub fn evaluate_multivariate( &self, flag_key: &str, @@ -290,6 +294,35 @@ impl FeatureStore { ) } + /// Evaluate a boolean feature flag. Returns an error if the flag is not available or if there are errors + /// during the evaluation. + /// + /// The parsing logic is as follows: + /// + /// * Generate a consistent hash for the tenant-feature. + /// * Match each filter group. + /// - If a group is matched, it will first determine whether the user is in the range of the rollout + /// percentage. + /// - If the hash falls within the group's rollout percentage, return true. + /// * Otherwise, continue with the next group until all groups are evaluated and no group is within the + /// rollout percentage. + /// * If there are no matching groups, return an error. + /// + /// Returns `Ok(())` if the feature flag evaluates to true. In the future, it will return a payload. + /// + /// Error handling: the caller should inspect the error and decide the behavior when a feature flag + /// cannot be evaluated (i.e., default to false if it cannot be resolved). 
The error should *not* be + /// propagated beyond where the feature flag gets resolved. + pub fn evaluate_boolean( + &self, + flag_key: &str, + user_id: &str, + properties: &HashMap, + ) -> Result<(), PostHogEvaluationError> { + let hash_on_global_rollout_percentage = Self::consistent_hash(user_id, flag_key, "boolean"); + self.evaluate_boolean_inner(flag_key, hash_on_global_rollout_percentage, properties) + } + /// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID /// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests /// and avoid duplicate computations. @@ -316,6 +349,11 @@ impl FeatureStore { flag_key ))); } + let Some(ref multivariate) = flag_config.filters.multivariate else { + return Err(PostHogEvaluationError::Internal(format!( + "No multivariate available, should use evaluate_boolean?: {flag_key}" + ))); + }; // TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog // Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it // does not matter. @@ -324,7 +362,7 @@ impl FeatureStore { GroupEvaluationResult::MatchedAndOverride(variant) => return Ok(variant), GroupEvaluationResult::MatchedAndEvaluate => { let mut percentage = 0; - for variant in &flag_config.filters.multivariate.variants { + for variant in &multivariate.variants { percentage += variant.rollout_percentage; if self .evaluate_percentage(hash_on_global_rollout_percentage, percentage) @@ -352,6 +390,64 @@ impl FeatureStore { ))) } } + + /// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID + /// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests + /// and avoid duplicate computations. + /// + /// Use a different consistent hash for evaluating the group rollout percentage. + /// The behavior: if the condition is set to rolling out to 10% of the users, and + /// we set the variant A to 20% in the global config, then 2% of the total users will + /// be evaluated to variant A. + /// + /// Note that the hash to determine group rollout percentage is shared across all groups. So if we have two + /// exactly-the-same conditions with 10% and 20% rollout percentage respectively, a total of 20% of the users + /// will be evaluated (versus 30% if group evaluation is done independently). + pub(crate) fn evaluate_boolean_inner( + &self, + flag_key: &str, + hash_on_global_rollout_percentage: f64, + properties: &HashMap, + ) -> Result<(), PostHogEvaluationError> { + if let Some(flag_config) = self.flags.get(flag_key) { + if !flag_config.active { + return Err(PostHogEvaluationError::NotAvailable(format!( + "The feature flag is not active: {}", + flag_key + ))); + } + if flag_config.filters.multivariate.is_some() { + return Err(PostHogEvaluationError::Internal(format!( + "This looks like a multivariate flag, should use evaluate_multivariate?: {flag_key}" + ))); + }; + // TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog + // Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it + // does not matter. + for group in &flag_config.filters.groups { + match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? 
{ + GroupEvaluationResult::MatchedAndOverride(_) => { + return Err(PostHogEvaluationError::Internal(format!( + "Boolean flag cannot have overrides: {}", + flag_key + ))); + } + GroupEvaluationResult::MatchedAndEvaluate => { + return Ok(()); + } + GroupEvaluationResult::Unmatched => continue, + } + } + // If no group is matched, the feature is not available, and up to the caller to decide what to do. + Err(PostHogEvaluationError::NoConditionGroupMatched) + } else { + // The feature flag is not available yet + Err(PostHogEvaluationError::NotAvailable(format!( + "Not found in the local evaluation spec: {}", + flag_key + ))) + } + } } pub struct PostHogClientConfig { @@ -469,95 +565,162 @@ mod tests { fn data() -> &'static str { r#"{ - "flags": [ - { - "id": 132794, - "team_id": 152860, - "name": "", - "key": "gc-compaction", - "filters": { - "groups": [ - { - "variant": "enabled-stage-2", - "properties": [ - { - "key": "plan_type", - "type": "person", - "value": [ - "free" - ], - "operator": "exact" - }, - { - "key": "pageserver_remote_size", - "type": "person", - "value": "10000000", - "operator": "lt" - } - ], - "rollout_percentage": 50 - }, - { - "properties": [ - { - "key": "plan_type", - "type": "person", - "value": [ - "free" - ], - "operator": "exact" - }, - { - "key": "pageserver_remote_size", - "type": "person", - "value": "10000000", - "operator": "lt" - } - ], - "rollout_percentage": 80 - } - ], - "payloads": {}, - "multivariate": { - "variants": [ - { - "key": "disabled", - "name": "", - "rollout_percentage": 90 - }, - { - "key": "enabled-stage-1", - "name": "", - "rollout_percentage": 10 - }, - { - "key": "enabled-stage-2", - "name": "", - "rollout_percentage": 0 - }, - { - "key": "enabled-stage-3", - "name": "", - "rollout_percentage": 0 - }, - { - "key": "enabled", - "name": "", - "rollout_percentage": 0 - } - ] - } - }, - "deleted": false, - "active": true, - "ensure_experience_continuity": false, - "has_encrypted_payloads": false, - "version": 6 - } + "flags": [ + { + "id": 141807, + "team_id": 152860, + "name": "", + "key": "image-compaction-boundary", + "filters": { + "groups": [ + { + "variant": null, + "properties": [ + { + "key": "plan_type", + "type": "person", + "value": [ + "free" + ], + "operator": "exact" + } ], - "group_type_mapping": {}, - "cohorts": {} - }"# + "rollout_percentage": 40 + }, + { + "variant": null, + "properties": [], + "rollout_percentage": 10 + } + ], + "payloads": {}, + "multivariate": null + }, + "deleted": false, + "active": true, + "ensure_experience_continuity": false, + "has_encrypted_payloads": false, + "version": 1 + }, + { + "id": 135586, + "team_id": 152860, + "name": "", + "key": "boolean-flag", + "filters": { + "groups": [ + { + "variant": null, + "properties": [ + { + "key": "plan_type", + "type": "person", + "value": [ + "free" + ], + "operator": "exact" + } + ], + "rollout_percentage": 47 + } + ], + "payloads": {}, + "multivariate": null + }, + "deleted": false, + "active": true, + "ensure_experience_continuity": false, + "has_encrypted_payloads": false, + "version": 1 + }, + { + "id": 132794, + "team_id": 152860, + "name": "", + "key": "gc-compaction", + "filters": { + "groups": [ + { + "variant": "enabled-stage-2", + "properties": [ + { + "key": "plan_type", + "type": "person", + "value": [ + "free" + ], + "operator": "exact" + }, + { + "key": "pageserver_remote_size", + "type": "person", + "value": "10000000", + "operator": "lt" + } + ], + "rollout_percentage": 50 + }, + { + "properties": [ + { + "key": "plan_type", + "type": 
"person", + "value": [ + "free" + ], + "operator": "exact" + }, + { + "key": "pageserver_remote_size", + "type": "person", + "value": "10000000", + "operator": "lt" + } + ], + "rollout_percentage": 80 + } + ], + "payloads": {}, + "multivariate": { + "variants": [ + { + "key": "disabled", + "name": "", + "rollout_percentage": 90 + }, + { + "key": "enabled-stage-1", + "name": "", + "rollout_percentage": 10 + }, + { + "key": "enabled-stage-2", + "name": "", + "rollout_percentage": 0 + }, + { + "key": "enabled-stage-3", + "name": "", + "rollout_percentage": 0 + }, + { + "key": "enabled", + "name": "", + "rollout_percentage": 0 + } + ] + } + }, + "deleted": false, + "active": true, + "ensure_experience_continuity": false, + "has_encrypted_payloads": false, + "version": 7 + } + ], + "group_type_mapping": {}, + "cohorts": {} +}"# } #[test] @@ -633,4 +796,125 @@ mod tests { Err(PostHogEvaluationError::NoConditionGroupMatched) ),); } + + #[test] + fn evaluate_boolean_1() { + // The `boolean-flag` feature flag only has one group that matches on the free user. + + let mut store = FeatureStore::new(); + let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap(); + store.set_flags(response.flags); + + // This lacks the required properties and cannot be evaluated. + let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new()); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NotAvailable(_)) + ),); + + let properties_unmatched = HashMap::from([ + ( + "plan_type".to_string(), + PostHogFlagFilterPropertyValue::String("paid".to_string()), + ), + ( + "pageserver_remote_size".to_string(), + PostHogFlagFilterPropertyValue::Number(1000.0), + ), + ]); + + // This does not match any group so there will be an error. + let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties_unmatched); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NoConditionGroupMatched) + ),); + + let properties = HashMap::from([ + ( + "plan_type".to_string(), + PostHogFlagFilterPropertyValue::String("free".to_string()), + ), + ( + "pageserver_remote_size".to_string(), + PostHogFlagFilterPropertyValue::Number(1000.0), + ), + ]); + + // It matches the first group as 0.10 <= 0.50 and the properties are matched. Then it gets evaluated to the variant override. + let variant = store.evaluate_boolean_inner("boolean-flag", 0.10, &properties); + assert!(variant.is_ok()); + + // It matches the group conditions but not the group rollout percentage. + let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NoConditionGroupMatched) + ),); + } + + #[test] + fn evaluate_boolean_2() { + // The `image-compaction-boundary` feature flag has one group that matches on the free user and a group that matches on all users. + + let mut store = FeatureStore::new(); + let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap(); + store.set_flags(response.flags); + + // This lacks the required properties and cannot be evaluated. 
+ let variant = + store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &HashMap::new()); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NotAvailable(_)) + ),); + + let properties_unmatched = HashMap::from([ + ( + "plan_type".to_string(), + PostHogFlagFilterPropertyValue::String("paid".to_string()), + ), + ( + "pageserver_remote_size".to_string(), + PostHogFlagFilterPropertyValue::Number(1000.0), + ), + ]); + + // This does not match the filtered group but the all user group. + let variant = + store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties_unmatched); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NoConditionGroupMatched) + ),); + let variant = + store.evaluate_boolean_inner("image-compaction-boundary", 0.05, &properties_unmatched); + assert!(variant.is_ok()); + + let properties = HashMap::from([ + ( + "plan_type".to_string(), + PostHogFlagFilterPropertyValue::String("free".to_string()), + ), + ( + "pageserver_remote_size".to_string(), + PostHogFlagFilterPropertyValue::Number(1000.0), + ), + ]); + + // It matches the first group as 0.30 <= 0.40 and the properties are matched. Then it gets evaluated to the variant override. + let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.30, &properties); + assert!(variant.is_ok()); + + // It matches the group conditions but not the group rollout percentage. + let variant = store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties); + assert!(matches!( + variant, + Err(PostHogEvaluationError::NoConditionGroupMatched) + ),); + + // It matches the second "all" group conditions. + let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.09, &properties); + assert!(variant.is_ok()); + } } diff --git a/pageserver/src/feature_resolver.rs b/pageserver/src/feature_resolver.rs index 193fb10abc..2b0f368079 100644 --- a/pageserver/src/feature_resolver.rs +++ b/pageserver/src/feature_resolver.rs @@ -45,6 +45,10 @@ impl FeatureResolver { } /// Evaluate a multivariate feature flag. Currently, we do not support any properties. + /// + /// Error handling: the caller should inspect the error and decide the behavior when a feature flag + /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be + /// propagated beyond where the feature flag gets resolved. pub fn evaluate_multivariate( &self, flag_key: &str, @@ -62,4 +66,29 @@ impl FeatureResolver { )) } } + + /// Evaluate a boolean feature flag. Currently, we do not support any properties. + /// + /// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error. + /// + /// Error handling: the caller should inspect the error and decide the behavior when a feature flag + /// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be + /// propagated beyond where the feature flag gets resolved. 
+ pub fn evaluate_boolean( + &self, + flag_key: &str, + tenant_id: TenantId, + ) -> Result<(), PostHogEvaluationError> { + if let Some(inner) = &self.inner { + inner.feature_store().evaluate_boolean( + flag_key, + &tenant_id.to_string(), + &HashMap::new(), + ) + } else { + Err(PostHogEvaluationError::NotAvailable( + "PostHog integration is not enabled".to_string(), + )) + } + } } From cdfa06caad553234594ff99e703f0c1bd1a4dae6 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Tue, 27 May 2025 12:33:16 -0500 Subject: [PATCH 3/7] Remove test-images compatibility hack for confirming library load paths (#11927) This hack was needed for compatibility tests, but after the compute release it is no longer needed. Signed-off-by: Tristan Partin --- docker-compose/compute_wrapper/shell/compute.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose/compute_wrapper/shell/compute.sh b/docker-compose/compute_wrapper/shell/compute.sh index 20a1ffb7a0..ab8d74d355 100755 --- a/docker-compose/compute_wrapper/shell/compute.sh +++ b/docker-compose/compute_wrapper/shell/compute.sh @@ -20,7 +20,7 @@ first_path="$(ldconfig --verbose 2>/dev/null \ | grep --invert-match ^$'\t' \ | cut --delimiter=: --fields=1 \ | head --lines=1)" -test "$first_path" == '/usr/local/lib' || true # Remove the || true in a follow-up PR. Needed for backwards compat. +test "$first_path" == '/usr/local/lib' echo "Waiting pageserver become ready." while ! nc -z pageserver 6400; do From e77961c1c6f5b242a47f923299df2237d1ee1649 Mon Sep 17 00:00:00 2001 From: Suhas Thalanki <54014218+thesuhas@users.noreply.github.com> Date: Tue, 27 May 2025 15:40:51 -0400 Subject: [PATCH 4/7] background worker that collects installed extensions (#11939) ## Problem Currently, we collect metrics of what extensions are installed on computes at startup time. We do not have a mechanism that does this at runtime. ## Summary of changes Added a background thread that queries all DBs at regular intervals and collects a list of installed extensions. --- compute_tools/src/bin/compute_ctl.rs | 5 +++ compute_tools/src/compute.rs | 53 ++++++++++++++++++++++------ 2 files changed, 47 insertions(+), 11 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 20b5e567a8..02339f752c 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -136,6 +136,10 @@ struct Cli { requires = "compute-id" )] pub control_plane_uri: Option, + + /// Interval in seconds for collecting installed extensions statistics + #[arg(long, default_value = "3600")] + pub installed_extensions_collection_interval: u64, } fn main() -> Result<()> { @@ -179,6 +183,7 @@ fn main() -> Result<()> { cgroup: cli.cgroup, #[cfg(target_os = "linux")] vm_monitor_addr: cli.vm_monitor_addr, + installed_extensions_collection_interval: cli.installed_extensions_collection_interval, }, config, )?; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index cb857e0a3e..ff49c737f0 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -97,6 +97,9 @@ pub struct ComputeNodeParams { /// the address of extension storage proxy gateway pub remote_ext_base_url: Option, + + /// Interval for installed extensions collection + pub installed_extensions_collection_interval: u64, } /// Compute node info shared across several `compute_ctl` threads.
@@ -742,17 +745,7 @@ impl ComputeNode { let conf = self.get_tokio_conn_conf(None); tokio::task::spawn(async { - let res = get_installed_extensions(conf).await; - match res { - Ok(extensions) => { - info!( - "[NEON_EXT_STAT] {}", - serde_json::to_string(&extensions) - .expect("failed to serialize extensions list") - ); - } - Err(err) => error!("could not get installed extensions: {err:?}"), - } + let _ = installed_extensions(conf).await; }); } @@ -782,6 +775,9 @@ impl ComputeNode { // Log metrics so that we can search for slow operations in logs info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished"); + // Spawn the extension stats background task + self.spawn_extension_stats_task(); + if pspec.spec.prewarm_lfc_on_startup { self.prewarm_lfc(); } @@ -2192,6 +2188,41 @@ LIMIT 100", info!("Pageserver config changed"); } } + + pub fn spawn_extension_stats_task(&self) { + let conf = self.tokio_conn_conf.clone(); + let installed_extensions_collection_interval = + self.params.installed_extensions_collection_interval; + tokio::spawn(async move { + // An initial sleep is added to ensure that two collections don't happen at the same time. + // The first collection happens during compute startup. + tokio::time::sleep(tokio::time::Duration::from_secs( + installed_extensions_collection_interval, + )) + .await; + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs( + installed_extensions_collection_interval, + )); + loop { + interval.tick().await; + let _ = installed_extensions(conf.clone()).await; + } + }); + } +} + +pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> { + let res = get_installed_extensions(conf).await; + match res { + Ok(extensions) => { + info!( + "[NEON_EXT_STAT] {}", + serde_json::to_string(&extensions).expect("failed to serialize extensions list") + ); + } + Err(err) => error!("could not get installed extensions: {err:?}"), + } + Ok(()) } pub fn forward_termination_signal() { From 541fcd8d2fb5091ee4e8103cfd4cb19ea1ee39fd Mon Sep 17 00:00:00 2001 From: Nikita Kalyanov <44959448+nikitakalyanov@users.noreply.github.com> Date: Wed, 28 May 2025 06:39:59 +0300 Subject: [PATCH 5/7] chore: expose new mark_invisible API in openAPI spec for use in cplane (#12032) ## Problem There is a new API that I plan to use. We generate client from the spec so it should be in the spec ## Summary of changes Document the existing API in openAPI format --- pageserver/src/http/openapi_spec.yml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index cf99cb110c..e8d1367d6c 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -353,6 +353,33 @@ paths: "200": description: OK + /v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/mark_invisible: + parameters: + - name: tenant_shard_id + in: path + required: true + schema: + type: string + - name: timeline_id + in: path + required: true + schema: + type: string + format: hex + put: + requestBody: + content: + application/json: + schema: + type: object + properties: + is_visible: + type: boolean + default: false + responses: + "200": + description: OK + /v1/tenant/{tenant_shard_id}/location_config: parameters: - name: tenant_shard_id From 67ddf1de28e5d79157cd096d04dce0010d8df2cd Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." 
<4198311+skyzh@users.noreply.github.com> Date: Wed, 28 May 2025 15:00:52 +0800 Subject: [PATCH 6/7] feat(pageserver): create image layers at L0-L1 boundary (#12023) ## Problem Previous attempt https://github.com/neondatabase/neon/pull/10548 caused some issues in staging and we reverted it. This is a re-attempt to address https://github.com/neondatabase/neon/issues/11063. Currently we create image layers at latest record LSN. We would create "future image layers" (i.e., image layers with LSN larger than disk consistent LSN) that need special handling at startup. We also waste a lot of read operations to reconstruct from L0 layers while we could have compacted all of the L0 layers and operate on a flat level of historic layers. ## Summary of changes * Run repartition at L0-L1 boundary. * Roll out with feature flags. * Piggyback a change that downgrades "image layer creating below gc_cutoff" to debug level. --------- Signed-off-by: Alex Chi Z --- pageserver/src/tenant.rs | 25 +++++-- pageserver/src/tenant/timeline.rs | 10 ++- pageserver/src/tenant/timeline/compaction.rs | 67 ++++++++++++++++--- .../regress/test_layers_from_future.py | 3 + 4 files changed, 90 insertions(+), 15 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 86731fb666..58b766933d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5315,6 +5315,7 @@ impl TenantShard { l0_compaction_trigger: self.l0_compaction_trigger.clone(), l0_flush_global_state: self.l0_flush_global_state.clone(), basebackup_prepare_sender: self.basebackup_prepare_sender.clone(), + feature_resolver: self.feature_resolver.clone(), } } @@ -8359,10 +8360,24 @@ mod tests { } tline.freeze_and_flush().await?; + // Force layers to L1 + tline + .compact( + &cancel, + { + let mut flags = EnumSet::new(); + flags.insert(CompactFlags::ForceL0Compaction); + flags + }, + &ctx, + ) + .await?; if iter % 5 == 0 { + let scan_lsn = Lsn(lsn.0 + 1); + info!("scanning at {}", scan_lsn); let (_, before_delta_file_accessed) = - scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone()) + scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone()) .await?; tline .compact( @@ -8371,13 +8386,14 @@ mod tests { let mut flags = EnumSet::new(); flags.insert(CompactFlags::ForceImageLayerCreation); flags.insert(CompactFlags::ForceRepartition); + flags.insert(CompactFlags::ForceL0Compaction); flags }, &ctx, ) .await?; let (_, after_delta_file_accessed) = - scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone()) + scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone()) .await?; assert!( after_delta_file_accessed < before_delta_file_accessed, @@ -8818,6 +8834,8 @@ mod tests { let cancel = CancellationToken::new(); + // Image layer creation happens on the disk_consistent_lsn so we need to force set it now. 
+ tline.force_set_disk_consistent_lsn(Lsn(0x40)); tline .compact( &cancel, @@ -8831,8 +8849,7 @@ mod tests { ) .await .unwrap(); - - // Image layers are created at last_record_lsn + // Image layers are created at repartition LSN let images = tline .inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone()) .await diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 54dc3b2d0b..71765b9197 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -103,6 +103,7 @@ use crate::context::{ DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder, }; use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32}; +use crate::feature_resolver::FeatureResolver; use crate::keyspace::{KeyPartitioning, KeySpace}; use crate::l0_flush::{self, L0FlushGlobalState}; use crate::metrics::{ @@ -198,6 +199,7 @@ pub struct TimelineResources { pub l0_compaction_trigger: Arc, pub l0_flush_global_state: l0_flush::L0FlushGlobalState, pub basebackup_prepare_sender: BasebackupPrepareSender, + pub feature_resolver: FeatureResolver, } pub struct Timeline { @@ -444,6 +446,8 @@ pub struct Timeline { /// A channel to send async requests to prepare a basebackup for the basebackup cache. basebackup_prepare_sender: BasebackupPrepareSender, + + feature_resolver: FeatureResolver, } pub(crate) enum PreviousHeatmap { @@ -3072,6 +3076,8 @@ impl Timeline { wait_lsn_log_slow: tokio::sync::Semaphore::new(1), basebackup_prepare_sender: resources.basebackup_prepare_sender, + + feature_resolver: resources.feature_resolver, }; result.repartition_threshold = @@ -4906,6 +4912,7 @@ impl Timeline { LastImageLayerCreationStatus::Initial, false, // don't yield for L0, we're flushing L0 ) + .instrument(info_span!("create_image_layers", mode = %ImageLayerCreationMode::Initial, partition_mode = "initial", lsn = %self.initdb_lsn)) .await?; debug_assert!( matches!(is_complete, LastImageLayerCreationStatus::Complete), @@ -5462,7 +5469,8 @@ impl Timeline { /// Returns the image layers generated and an enum indicating whether the process is fully completed. /// true = we have generate all image layers, false = we preempt the process for L0 compaction. - #[tracing::instrument(skip_all, fields(%lsn, %mode))] + /// + /// `partition_mode` is only for logging purpose and is not used anywhere in this function. async fn create_image_layers( self: &Arc, partitioning: &KeyPartitioning, diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 0e4b14c3e4..143c2e0865 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1278,11 +1278,55 @@ impl Timeline { } let gc_cutoff = *self.applied_gc_cutoff_lsn.read(); + let l0_l1_boundary_lsn = { + // We do the repartition on the L0-L1 boundary. All data below the boundary + // are compacted by L0 with low read amplification, thus making the `repartition` + // function run fast. + let guard = self.layers.read().await; + guard + .all_persistent_layers() + .iter() + .map(|x| { + // Use the end LSN of delta layers OR the start LSN of image layers. 
+ if x.is_delta { + x.lsn_range.end + } else { + x.lsn_range.start + } + }) + .max() + }; + + let (partition_mode, partition_lsn) = if cfg!(test) + || cfg!(feature = "testing") + || self + .feature_resolver + .evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id) + .is_ok() + { + let last_repartition_lsn = self.partitioning.read().1; + let lsn = match l0_l1_boundary_lsn { + Some(boundary) => gc_cutoff + .max(boundary) + .max(last_repartition_lsn) + .max(self.initdb_lsn) + .max(self.ancestor_lsn), + None => self.get_last_record_lsn(), + }; + if lsn <= self.initdb_lsn || lsn <= self.ancestor_lsn { + // Do not attempt to create image layers below the initdb or ancestor LSN -- no data below it + ("l0_l1_boundary", self.get_last_record_lsn()) + } else { + ("l0_l1_boundary", lsn) + } + } else { + ("latest_record", self.get_last_record_lsn()) + }; // 2. Repartition and create image layers if necessary match self .repartition( - self.get_last_record_lsn(), + partition_lsn, self.get_compaction_target_size(), options.flags, ctx, @@ -1301,18 +1345,19 @@ impl Timeline { .extend(sparse_partitioning.into_dense().parts); // 3. Create new image layers for partitions that have been modified "enough". + let mode = if options + .flags + .contains(CompactFlags::ForceImageLayerCreation) + { + ImageLayerCreationMode::Force + } else { + ImageLayerCreationMode::Try + }; let (image_layers, outcome) = self .create_image_layers( &partitioning, lsn, - if options - .flags - .contains(CompactFlags::ForceImageLayerCreation) - { - ImageLayerCreationMode::Force - } else { - ImageLayerCreationMode::Try - }, + mode, &image_ctx, self.last_image_layer_creation_status .load() @@ -1320,6 +1365,7 @@ impl Timeline { .clone(), options.flags.contains(CompactFlags::YieldForL0), ) + .instrument(info_span!("create_image_layers", mode = %mode, partition_mode = %partition_mode, lsn = %lsn)) .await .inspect_err(|err| { if let CreateImageLayersError::GetVectoredError( @@ -1344,7 +1390,8 @@ impl Timeline { } Ok(_) => { - info!("skipping repartitioning due to image compaction LSN being below GC cutoff"); + // This happens very frequently so we don't want to log it. + debug!("skipping repartitioning due to image compaction LSN being below GC cutoff"); } // Suppress errors when cancelled. diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py index b4eba2779d..f3fcdb0d14 100644 --- a/test_runner/regress/test_layers_from_future.py +++ b/test_runner/regress/test_layers_from_future.py @@ -20,6 +20,9 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind from fixtures.utils import query_scalar, wait_until +@pytest.mark.skip( + reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548" +) @pytest.mark.parametrize( "attach_mode", ["default_generation", "same_generation"], From eadabeddb892b330a9aa65034adb05141ec64035 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Wed, 28 May 2025 16:19:41 +0100 Subject: [PATCH 7/7] pageserver: use the same job size throughout the import lifetime (#12026) ## Problem Import planning takes a job size limit as its input. Previously, the job size came from a pageserver config field. This field may change while imports are in progress. If this happens, plans will no longer be identical and the import would fail permanently. ## Summary of Changes Bake the job size into the import progress reported to the storage controller. 
For new imports, use the value from the pageserver config, and for existing imports, use the value present in the shard progress. This value is identical for all shards, but we want it to be versioned since future versions of the planner might split the jobs up differently. Hence, it ends up in `ShardImportProgress`. Closes https://github.com/neondatabase/neon/issues/11983 --- libs/pageserver_api/src/models.rs | 3 +++ .../src/tenant/timeline/import_pgdata/flow.rs | 24 ++++++++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 9f3736d57a..e7d612bb7a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -354,6 +354,9 @@ pub struct ShardImportProgressV1 { pub completed: usize, /// Hash of the plan pub import_plan_hash: u64, + /// Soft limit for the job size + /// This needs to remain constant throughout the import + pub job_soft_size_limit: usize, } impl ShardImportStatus { diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs index 2ba4ca69ac..0d87a2f135 100644 --- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs +++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs @@ -30,6 +30,7 @@ use std::collections::HashSet; use std::hash::{Hash, Hasher}; +use std::num::NonZeroUsize; use std::ops::Range; use std::sync::Arc; @@ -100,8 +101,24 @@ async fn run_v1( tasks: Vec::default(), }; - let import_config = &timeline.conf.timeline_import_config; - let plan = planner.plan(import_config).await?; + // Use the job size limit encoded in the progress if we are resuming an import. + // This ensures that imports have stable plans even if the pageserver config changes. + let import_config = { + match &import_progress { + Some(progress) => { + let base = &timeline.conf.timeline_import_config; + TimelineImportConfig { + import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit) + .unwrap(), + import_job_concurrency: base.import_job_concurrency, + import_job_checkpoint_threshold: base.import_job_checkpoint_threshold, + } + } + None => timeline.conf.timeline_import_config.clone(), + } + }; + + let plan = planner.plan(&import_config).await?; // Hash the plan and compare with the hash of the plan we got back from the storage controller. // If the two match, it means that the planning stage had the same output. @@ -126,7 +143,7 @@ async fn run_v1( pausable_failpoint!("import-timeline-pre-execute-pausable"); let start_from_job_idx = import_progress.map(|progress| progress.completed); - plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx) + plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx) .await } @@ -453,6 +470,7 @@ impl Plan { jobs: jobs_in_plan, completed: last_completed_job_idx, import_plan_hash, + job_soft_size_limit: import_config.import_job_soft_size_limit.into(), }; timeline.remote_client.schedule_index_upload_for_file_changes()?;