From b22a84a7bf2ccae30243be81439cc284835a37f1 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Tue, 19 Nov 2024 14:38:41 -0500 Subject: [PATCH] feat(pageserver): support key range for manual compaction trigger (#9723) part of https://github.com/neondatabase/neon/issues/9114, we want to be able to run partial gc-compaction in tests. In the future, we can also expand this functionality to legacy compaction, so that we can trigger compaction for a specific key range. ## Summary of changes * Support passing compaction key range through pageserver routes. * Refactor input parameters of compact related function to take the new `CompactOptions`. * Add tests for partial compaction. Note that the test may or may not trigger compaction based on GC horizon. We need to improve the test case to ensure things always get below the gc_horizon and the gc-compaction can be triggered. --------- Signed-off-by: Alex Chi Z --- libs/utils/src/http/json.rs | 22 ++++++++++ pageserver/src/http/routes.rs | 15 +++++-- pageserver/src/tenant.rs | 42 ++++++++++++------- pageserver/src/tenant/timeline.rs | 36 +++++++++++++++- pageserver/src/tenant/timeline/compaction.rs | 43 +++++++++++++++----- test_runner/fixtures/pageserver/http.py | 2 + test_runner/regress/test_compaction.py | 39 ++++++++++++++++++ 7 files changed, 170 insertions(+), 29 deletions(-) diff --git a/libs/utils/src/http/json.rs b/libs/utils/src/http/json.rs index 6c25440b42..e53231f313 100644 --- a/libs/utils/src/http/json.rs +++ b/libs/utils/src/http/json.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use super::error::ApiError; +/// Parse a json request body and deserialize it to the type `T`. pub async fn json_request Deserialize<'de>>( request: &mut Request, ) -> Result { @@ -27,6 +28,27 @@ pub async fn json_request Deserialize<'de>>( .map_err(ApiError::BadRequest) } +/// Parse a json request body and deserialize it to the type `T`. If the body is empty, return `T::default`. +pub async fn json_request_maybe Deserialize<'de> + Default>( + request: &mut Request, +) -> Result { + let body = hyper::body::aggregate(request.body_mut()) + .await + .context("Failed to read request body") + .map_err(ApiError::BadRequest)?; + + if body.remaining() == 0 { + return Ok(T::default()); + } + + let mut deser = serde_json::de::Deserializer::from_reader(body.reader()); + + serde_path_to_error::deserialize(&mut deser) + // intentionally stringify because the debug version is not helpful in python logs + .map_err(|e| anyhow::anyhow!("Failed to parse json request: {e}")) + .map_err(ApiError::BadRequest) +} + pub fn json_response( status: StatusCode, data: T, diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ab170679ba..306b0f35ab 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -83,6 +83,8 @@ use crate::tenant::storage_layer::LayerName; use crate::tenant::timeline::offload::offload_timeline; use crate::tenant::timeline::offload::OffloadError; use crate::tenant::timeline::CompactFlags; +use crate::tenant::timeline::CompactOptions; +use crate::tenant::timeline::CompactRange; use crate::tenant::timeline::CompactionError; use crate::tenant::timeline::Timeline; use crate::tenant::GetTimelineError; @@ -100,7 +102,7 @@ use utils::{ http::{ endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with}, error::{ApiError, HttpErrorBody}, - json::{json_request, json_response}, + json::{json_request, json_request_maybe, json_response}, request::parse_request_param, RequestExt, RouterBuilder, }, @@ -1927,13 +1929,15 @@ async fn timeline_gc_handler( // Run compaction immediately on given timeline. async fn timeline_compact_handler( - request: Request, + mut request: Request, cancel: CancellationToken, ) -> Result, ApiError> { let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; check_permission(&request, Some(tenant_shard_id.tenant_id))?; + let compact_range = json_request_maybe::>(&mut request).await?; + let state = get_state(&request); let mut flags = EnumSet::empty(); @@ -1957,11 +1961,16 @@ async fn timeline_compact_handler( let wait_until_uploaded = parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); + let options = CompactOptions { + compact_range, + flags, + }; + async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; timeline - .compact(&cancel, flags, &ctx) + .compact_with_options(&cancel, options, &ctx) .await .map_err(|e| ApiError::InternalServerError(e.into()))?; if wait_until_uploaded { diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 46317e93ee..37bf83c984 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -5254,7 +5254,7 @@ mod tests { use storage_layer::PersistentLayerKey; use tests::storage_layer::ValuesReconstructState; use tests::timeline::{GetVectoredError, ShutdownMode}; - use timeline::DeltaLayerTestDesc; + use timeline::{CompactOptions, DeltaLayerTestDesc}; use utils::id::TenantId; #[cfg(feature = "testing")] @@ -7728,7 +7728,7 @@ mod tests { let cancel = CancellationToken::new(); tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); @@ -7805,7 +7805,7 @@ mod tests { guard.cutoffs.space = Lsn(0x40); } tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); @@ -8237,7 +8237,7 @@ mod tests { let cancel = CancellationToken::new(); tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); @@ -8266,7 +8266,7 @@ mod tests { guard.cutoffs.space = Lsn(0x40); } tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); @@ -8819,7 +8819,14 @@ mod tests { dryrun_flags.insert(CompactFlags::DryRun); tline - .compact_with_gc(&cancel, dryrun_flags, &ctx) + .compact_with_gc( + &cancel, + CompactOptions { + flags: dryrun_flags, + compact_range: None, + }, + &ctx, + ) .await .unwrap(); // We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs @@ -8827,14 +8834,14 @@ mod tests { verify_result().await; tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); verify_result().await; // compact again tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); verify_result().await; @@ -8847,14 +8854,14 @@ mod tests { guard.cutoffs.space = Lsn(0x38); } tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); verify_result().await; // no wals between 0x30 and 0x38, so we should obtain the same result // not increasing the GC horizon and compact again tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); verify_result().await; @@ -9048,7 +9055,14 @@ mod tests { dryrun_flags.insert(CompactFlags::DryRun); tline - .compact_with_gc(&cancel, dryrun_flags, &ctx) + .compact_with_gc( + &cancel, + CompactOptions { + flags: dryrun_flags, + compact_range: None, + }, + &ctx, + ) .await .unwrap(); // We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs @@ -9056,14 +9070,14 @@ mod tests { verify_result().await; tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); verify_result().await; // compact again tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); verify_result().await; @@ -9248,7 +9262,7 @@ mod tests { let cancel = CancellationToken::new(); branch_tline - .compact_with_gc(&cancel, EnumSet::new(), &ctx) + .compact_with_gc(&cancel, CompactOptions::default(), &ctx) .await .unwrap(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 5547bc2c7a..0eb3de21e9 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -774,6 +774,21 @@ pub(crate) enum CompactFlags { DryRun, } +#[serde_with::serde_as] +#[derive(Debug, Clone, serde::Deserialize)] +pub(crate) struct CompactRange { + #[serde_as(as = "serde_with::DisplayFromStr")] + pub start: Key, + #[serde_as(as = "serde_with::DisplayFromStr")] + pub end: Key, +} + +#[derive(Clone, Default)] +pub(crate) struct CompactOptions { + pub flags: EnumSet, + pub compact_range: Option, +} + impl std::fmt::Debug for Timeline { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "Timeline<{}>", self.timeline_id) @@ -1612,6 +1627,25 @@ impl Timeline { cancel: &CancellationToken, flags: EnumSet, ctx: &RequestContext, + ) -> Result { + self.compact_with_options( + cancel, + CompactOptions { + flags, + compact_range: None, + }, + ctx, + ) + .await + } + + /// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending + /// compaction tasks. + pub(crate) async fn compact_with_options( + self: &Arc, + cancel: &CancellationToken, + options: CompactOptions, + ctx: &RequestContext, ) -> Result { // most likely the cancellation token is from background task, but in tests it could be the // request task as well. @@ -1649,7 +1683,7 @@ impl Timeline { self.compact_tiered(cancel, ctx).await?; Ok(false) } - CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await, + CompactionAlgorithm::Legacy => self.compact_legacy(cancel, options, ctx).await, } } diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index b30e380de5..ecd68ba55e 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use super::layer_manager::LayerManager; use super::{ - CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode, + CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode, RecordedDuration, Timeline, }; @@ -273,22 +273,32 @@ impl Timeline { pub(crate) async fn compact_legacy( self: &Arc, cancel: &CancellationToken, - flags: EnumSet, + options: CompactOptions, ctx: &RequestContext, ) -> Result { - if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) { - self.compact_with_gc(cancel, flags, ctx) + if options + .flags + .contains(CompactFlags::EnhancedGcBottomMostCompaction) + { + self.compact_with_gc(cancel, options, ctx) .await .map_err(CompactionError::Other)?; return Ok(false); } - if flags.contains(CompactFlags::DryRun) { + if options.flags.contains(CompactFlags::DryRun) { return Err(CompactionError::Other(anyhow!( "dry-run mode is not supported for legacy compaction for now" ))); } + if options.compact_range.is_some() { + // maybe useful in the future? could implement this at some point + return Err(CompactionError::Other(anyhow!( + "compaction range is not supported for legacy compaction for now" + ))); + } + // High level strategy for compaction / image creation: // // 1. First, calculate the desired "partitioning" of the @@ -338,7 +348,7 @@ impl Timeline { .repartition( self.get_last_record_lsn(), self.get_compaction_target_size(), - flags, + options.flags, ctx, ) .await @@ -354,7 +364,7 @@ impl Timeline { let fully_compacted = self .compact_level0( target_file_size, - flags.contains(CompactFlags::ForceL0Compaction), + options.flags.contains(CompactFlags::ForceL0Compaction), ctx, ) .await?; @@ -372,7 +382,10 @@ impl Timeline { .create_image_layers( &partitioning, lsn, - if flags.contains(CompactFlags::ForceImageLayerCreation) { + if options + .flags + .contains(CompactFlags::ForceImageLayerCreation) + { ImageLayerCreationMode::Force } else { ImageLayerCreationMode::Try @@ -1736,11 +1749,19 @@ impl Timeline { pub(crate) async fn compact_with_gc( self: &Arc, cancel: &CancellationToken, - flags: EnumSet, + options: CompactOptions, ctx: &RequestContext, ) -> anyhow::Result<()> { - self.partial_compact_with_gc(Key::MIN..Key::MAX, cancel, flags, ctx) - .await + self.partial_compact_with_gc( + options + .compact_range + .map(|range| range.start..range.end) + .unwrap_or_else(|| Key::MIN..Key::MAX), + cancel, + options.flags, + ctx, + ) + .await } /// An experimental compaction building block that combines compaction with garbage collection. diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index d1a9b5921a..01583757fa 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -665,6 +665,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): force_l0_compaction=False, wait_until_uploaded=False, enhanced_gc_bottom_most_compaction=False, + body: Optional[dict[str, Any]] = None, ): self.is_testing_enabled_or_skip() query = {} @@ -683,6 +684,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter): res = self.put( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact", params=query, + json=body, ) log.info(f"Got compact request response code: {res.status_code}") self.verbose_error(res) diff --git a/test_runner/regress/test_compaction.py b/test_runner/regress/test_compaction.py index a02d0f6b98..48950a5a50 100644 --- a/test_runner/regress/test_compaction.py +++ b/test_runner/regress/test_compaction.py @@ -116,6 +116,45 @@ page_cache_size=10 assert vectored_average < 8 +def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder): + env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + row_count = 1000 + churn_rounds = 10 + + ps_http = env.pageserver.http_client() + + workload = Workload(env, tenant_id, timeline_id) + workload.init(env.pageserver.id) + + log.info("Writing initial data ...") + workload.write_rows(row_count, env.pageserver.id) + + for i in range(1, churn_rounds + 1): + if i % 10 == 0: + log.info(f"Running churn round {i}/{churn_rounds} ...") + + workload.churn_rows(row_count, env.pageserver.id) + # Force L0 compaction to ensure the number of layers is within bounds, so that gc-compaction can run. + ps_http.timeline_compact(tenant_id, timeline_id, force_l0_compaction=True) + assert ps_http.perf_info(tenant_id, timeline_id)[0]["num_of_l0"] <= 1 + ps_http.timeline_compact( + tenant_id, + timeline_id, + enhanced_gc_bottom_most_compaction=True, + body={ + "start": "000000000000000000000000000000000000", + "end": "030000000000000000000000000000000000", + }, + ) + + log.info("Validating at workload end ...") + workload.validate(env.pageserver.id) + + # Stripe sizes in number of pages. TINY_STRIPES = 16 LARGE_STRIPES = 32768