feat(pageserver): support key range for manual compaction trigger (#9723)

part of https://github.com/neondatabase/neon/issues/9114, we want to be
able to run partial gc-compaction in tests. In the future, we can also
expand this functionality to legacy compaction, so that we can trigger
compaction for a specific key range.

## Summary of changes

* Support passing compaction key range through pageserver routes.
* Refactor input parameters of compact related function to take the new
`CompactOptions`.
* Add tests for partial compaction. Note that the test may or may not
trigger compaction based on GC horizon. We need to improve the test case
to ensure things always get below the gc_horizon and the gc-compaction
can be triggered.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z.
2024-11-19 14:38:41 -05:00
committed by GitHub
parent b092126c94
commit b22a84a7bf
7 changed files with 170 additions and 29 deletions

View File

@@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
use super::error::ApiError;
/// Parse a json request body and deserialize it to the type `T`.
pub async fn json_request<T: for<'de> Deserialize<'de>>(
request: &mut Request<Body>,
) -> Result<T, ApiError> {
@@ -27,6 +28,27 @@ pub async fn json_request<T: for<'de> Deserialize<'de>>(
.map_err(ApiError::BadRequest)
}
/// Parse a json request body and deserialize it to the type `T`. If the body is empty, return `T::default`.
pub async fn json_request_maybe<T: for<'de> Deserialize<'de> + Default>(
request: &mut Request<Body>,
) -> Result<T, ApiError> {
let body = hyper::body::aggregate(request.body_mut())
.await
.context("Failed to read request body")
.map_err(ApiError::BadRequest)?;
if body.remaining() == 0 {
return Ok(T::default());
}
let mut deser = serde_json::de::Deserializer::from_reader(body.reader());
serde_path_to_error::deserialize(&mut deser)
// intentionally stringify because the debug version is not helpful in python logs
.map_err(|e| anyhow::anyhow!("Failed to parse json request: {e}"))
.map_err(ApiError::BadRequest)
}
pub fn json_response<T: Serialize>(
status: StatusCode,
data: T,

View File

@@ -83,6 +83,8 @@ use crate::tenant::storage_layer::LayerName;
use crate::tenant::timeline::offload::offload_timeline;
use crate::tenant::timeline::offload::OffloadError;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRange;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
@@ -100,7 +102,7 @@ use utils::{
http::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
error::{ApiError, HttpErrorBody},
json::{json_request, json_response},
json::{json_request, json_request_maybe, json_response},
request::parse_request_param,
RequestExt, RouterBuilder,
},
@@ -1927,13 +1929,15 @@ async fn timeline_gc_handler(
// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
request: Request<Body>,
mut request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let compact_range = json_request_maybe::<Option<CompactRange>>(&mut request).await?;
let state = get_state(&request);
let mut flags = EnumSet::empty();
@@ -1957,11 +1961,16 @@ async fn timeline_compact_handler(
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
let options = CompactOptions {
compact_range,
flags,
};
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
timeline
.compact(&cancel, flags, &ctx)
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {

View File

@@ -5254,7 +5254,7 @@ mod tests {
use storage_layer::PersistentLayerKey;
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::DeltaLayerTestDesc;
use timeline::{CompactOptions, DeltaLayerTestDesc};
use utils::id::TenantId;
#[cfg(feature = "testing")]
@@ -7728,7 +7728,7 @@ mod tests {
let cancel = CancellationToken::new();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
@@ -7805,7 +7805,7 @@ mod tests {
guard.cutoffs.space = Lsn(0x40);
}
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
@@ -8237,7 +8237,7 @@ mod tests {
let cancel = CancellationToken::new();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
@@ -8266,7 +8266,7 @@ mod tests {
guard.cutoffs.space = Lsn(0x40);
}
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
@@ -8819,7 +8819,14 @@ mod tests {
dryrun_flags.insert(CompactFlags::DryRun);
tline
.compact_with_gc(&cancel, dryrun_flags, &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: dryrun_flags,
compact_range: None,
},
&ctx,
)
.await
.unwrap();
// We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
@@ -8827,14 +8834,14 @@ mod tests {
verify_result().await;
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;
// compact again
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;
@@ -8847,14 +8854,14 @@ mod tests {
guard.cutoffs.space = Lsn(0x38);
}
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await; // no wals between 0x30 and 0x38, so we should obtain the same result
// not increasing the GC horizon and compact again
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;
@@ -9048,7 +9055,14 @@ mod tests {
dryrun_flags.insert(CompactFlags::DryRun);
tline
.compact_with_gc(&cancel, dryrun_flags, &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: dryrun_flags,
compact_range: None,
},
&ctx,
)
.await
.unwrap();
// We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
@@ -9056,14 +9070,14 @@ mod tests {
verify_result().await;
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;
// compact again
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;
@@ -9248,7 +9262,7 @@ mod tests {
let cancel = CancellationToken::new();
branch_tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();

View File

@@ -774,6 +774,21 @@ pub(crate) enum CompactFlags {
DryRun,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}
#[derive(Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,
pub compact_range: Option<CompactRange>,
}
impl std::fmt::Debug for Timeline {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Timeline<{}>", self.timeline_id)
@@ -1612,6 +1627,25 @@ impl Timeline {
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
self.compact_with_options(
cancel,
CompactOptions {
flags,
compact_range: None,
},
ctx,
)
.await
}
/// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending
/// compaction tasks.
pub(crate) async fn compact_with_options(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
// most likely the cancellation token is from background task, but in tests it could be the
// request task as well.
@@ -1649,7 +1683,7 @@ impl Timeline {
self.compact_tiered(cancel, ctx).await?;
Ok(false)
}
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, options, ctx).await,
}
}

View File

@@ -10,7 +10,7 @@ use std::sync::Arc;
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
RecordedDuration, Timeline,
};
@@ -273,22 +273,32 @@ impl Timeline {
pub(crate) async fn compact_legacy(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
options: CompactOptions,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
self.compact_with_gc(cancel, flags, ctx)
if options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
self.compact_with_gc(cancel, options, ctx)
.await
.map_err(CompactionError::Other)?;
return Ok(false);
}
if flags.contains(CompactFlags::DryRun) {
if options.flags.contains(CompactFlags::DryRun) {
return Err(CompactionError::Other(anyhow!(
"dry-run mode is not supported for legacy compaction for now"
)));
}
if options.compact_range.is_some() {
// maybe useful in the future? could implement this at some point
return Err(CompactionError::Other(anyhow!(
"compaction range is not supported for legacy compaction for now"
)));
}
// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
@@ -338,7 +348,7 @@ impl Timeline {
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
flags,
options.flags,
ctx,
)
.await
@@ -354,7 +364,7 @@ impl Timeline {
let fully_compacted = self
.compact_level0(
target_file_size,
flags.contains(CompactFlags::ForceL0Compaction),
options.flags.contains(CompactFlags::ForceL0Compaction),
ctx,
)
.await?;
@@ -372,7 +382,10 @@ impl Timeline {
.create_image_layers(
&partitioning,
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
@@ -1736,11 +1749,19 @@ impl Timeline {
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.partial_compact_with_gc(Key::MIN..Key::MAX, cancel, flags, ctx)
.await
self.partial_compact_with_gc(
options
.compact_range
.map(|range| range.start..range.end)
.unwrap_or_else(|| Key::MIN..Key::MAX),
cancel,
options.flags,
ctx,
)
.await
}
/// An experimental compaction building block that combines compaction with garbage collection.

View File

@@ -665,6 +665,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
force_l0_compaction=False,
wait_until_uploaded=False,
enhanced_gc_bottom_most_compaction=False,
body: Optional[dict[str, Any]] = None,
):
self.is_testing_enabled_or_skip()
query = {}
@@ -683,6 +684,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
params=query,
json=body,
)
log.info(f"Got compact request response code: {res.status_code}")
self.verbose_error(res)

View File

@@ -116,6 +116,45 @@ page_cache_size=10
assert vectored_average < 8
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 1000
churn_rounds = 10
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
for i in range(1, churn_rounds + 1):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id)
# Force L0 compaction to ensure the number of layers is within bounds, so that gc-compaction can run.
ps_http.timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
assert ps_http.perf_info(tenant_id, timeline_id)[0]["num_of_l0"] <= 1
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
# Stripe sizes in number of pages.
TINY_STRIPES = 16
LARGE_STRIPES = 32768