From 6f5c2626844e984abebb8704928ecdcbc87ad49e Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Fri, 25 Oct 2024 14:16:45 +0200
Subject: [PATCH] pageserver: add testing API to scan layers for disposable keys (#9393)

This PR adds a pageserver mgmt API to scan a layer file for disposable keys.
It hooks it up to the sharding compaction test, demonstrating that we're not
filtering out all disposable keys.

This is extracted from PGDATA import
(https://github.com/neondatabase/neon/pull/9218), where I filter layer files
based on `is_key_disposable`.
---
 libs/pageserver_api/src/models.rs             |  6 ++
 pageserver/src/http/routes.rs                 | 97 +++++++++++++++++++
 .../src/tenant/storage_layer/delta_layer.rs   | 14 ++-
 .../src/tenant/storage_layer/image_layer.rs   | 15 +++
 pageserver/src/tenant/storage_layer/layer.rs  | 27 +++---
 .../src/tenant/storage_layer/layer_desc.rs    | 28 ++++++
 pageserver/src/tenant/timeline/compaction.rs  |  9 +-
 .../src/tenant/timeline/layer_manager.rs      |  7 +-
 test_runner/fixtures/neon_fixtures.py         | 51 +++++++++-
 test_runner/fixtures/pageserver/http.py       | 30 ++++++
 test_runner/regress/test_sharding.py          | 16 ++-
 11 files changed, 277 insertions(+), 23 deletions(-)

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 8684927554..d37f62185c 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -1068,6 +1068,12 @@ pub mod virtual_file {
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ScanDisposableKeysResponse {
+    pub disposable_count: usize,
+    pub not_disposable_count: usize,
+}
+
 // Wrapped in libpq CopyData
 #[derive(PartialEq, Eq, Debug)]
 pub enum PagestreamFeMessage {
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index bc03df9ad2..3943f62ac0 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1293,6 +1293,99 @@ async fn layer_map_info_handler(
     json_response(StatusCode::OK, layer_map_info)
 }
 
+#[instrument(skip_all, fields(tenant_id, shard_id, timeline_id, layer_name))]
+async fn timeline_layer_scan_disposable_keys(
+    request: Request<Body>,
+    cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+    let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+    let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+    let layer_name: LayerName = parse_request_param(&request, "layer_name")?;
+
+    tracing::Span::current().record(
+        "tenant_id",
+        tracing::field::display(&tenant_shard_id.tenant_id),
+    );
+    tracing::Span::current().record(
+        "shard_id",
+        tracing::field::display(tenant_shard_id.shard_slug()),
+    );
+    tracing::Span::current().record("timeline_id", tracing::field::display(&timeline_id));
+    tracing::Span::current().record("layer_name", tracing::field::display(&layer_name));
+
+    let state = get_state(&request);
+
+    check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+
+    // technically the timeline need not be active for this scan to complete
+    let timeline =
+        active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
+            .await?;
+
+    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+
+    let guard = timeline.layers.read().await;
+    let Some(layer) = guard.try_get_from_key(&layer_name.clone().into()) else {
+        return Err(ApiError::NotFound(
+            anyhow::anyhow!("Layer {tenant_shard_id}/{timeline_id}/{layer_name} not found").into(),
+        ));
+    };
+
+    let resident_layer = layer
+        .download_and_keep_resident()
+        .await
+        .map_err(|err| 
match err {
+            tenant::storage_layer::layer::DownloadError::TimelineShutdown
+            | tenant::storage_layer::layer::DownloadError::DownloadCancelled => {
+                ApiError::ShuttingDown
+            }
+            tenant::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
+            | tenant::storage_layer::layer::DownloadError::DownloadRequired
+            | tenant::storage_layer::layer::DownloadError::NotFile(_)
+            | tenant::storage_layer::layer::DownloadError::DownloadFailed
+            | tenant::storage_layer::layer::DownloadError::PreStatFailed(_) => {
+                ApiError::InternalServerError(err.into())
+            }
+            #[cfg(test)]
+            tenant::storage_layer::layer::DownloadError::Failpoint(_) => {
+                ApiError::InternalServerError(err.into())
+            }
+        })?;
+
+    let keys = resident_layer
+        .load_keys(&ctx)
+        .await
+        .map_err(ApiError::InternalServerError)?;
+
+    let shard_identity = timeline.get_shard_identity();
+
+    let mut disposable_count = 0;
+    let mut not_disposable_count = 0;
+    let cancel = cancel.clone();
+    for (i, key) in keys.into_iter().enumerate() {
+        if shard_identity.is_key_disposable(&key) {
+            disposable_count += 1;
+            tracing::debug!(key = %key, key.dbg=?key, "disposable key");
+        } else {
+            not_disposable_count += 1;
+        }
+        #[allow(clippy::collapsible_if)]
+        if i % 10000 == 0 {
+            if cancel.is_cancelled() || timeline.cancel.is_cancelled() || timeline.is_stopping() {
+                return Err(ApiError::ShuttingDown);
+            }
+        }
+    }
+
+    json_response(
+        StatusCode::OK,
+        pageserver_api::models::ScanDisposableKeysResponse {
+            disposable_count,
+            not_disposable_count,
+        },
+    )
+}
+
 async fn layer_download_handler(
     request: Request<Body>,
     _cancel: CancellationToken,
@@ -3155,6 +3248,10 @@ pub fn make_router(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
             |r| api_handler(r, evict_timeline_layer_handler),
         )
+        .post(
+            "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_name/scan_disposable_keys",
+            |r| testing_api_handler("timeline_layer_scan_disposable_keys", r, timeline_layer_scan_disposable_keys),
+        )
         .post(
             "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/block_gc",
             |r| api_handler(r, timeline_gc_blocking_handler),
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index ceae1d4b1a..641729d681 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -1084,7 +1084,7 @@ impl DeltaLayerInner {
         }
     }
 
-    pub(super) async fn load_keys<'a>(
+    pub(crate) async fn index_entries<'a>(
         &'a self,
         ctx: &RequestContext,
     ) -> Result<Vec<DeltaEntry<'a>>> {
@@ -1346,7 +1346,7 @@ impl DeltaLayerInner {
 
         tree_reader.dump().await?;
 
-        let keys = self.load_keys(ctx).await?;
+        let keys = self.index_entries(ctx).await?;
 
         async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
             let buf = val.load_raw(ctx).await?;
@@ -1453,6 +1453,16 @@ impl DeltaLayerInner {
             ),
         }
     }
+
+    /// NB: not super efficient, but not terrible either. Should probably be an iterator.
+    //
+    // We're reusing the index traversal logic in plan_reads; would be nice to
+    // factor that out.
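+    //
+    // A hedged usage sketch (not part of this change; `inner` and `shard_identity`
+    // are hypothetical bindings): the mgmt handler added in this patch combines
+    // this key listing with `ShardIdentity::is_key_disposable` roughly like so:
+    //
+    //     let keys = inner.load_keys(ctx).await?;
+    //     let n_disposable = keys.iter().filter(|key| shard_identity.is_key_disposable(key)).count();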
+    pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
+        self.index_entries(ctx)
+            .await
+            .map(|entries| entries.into_iter().map(|entry| entry.key).collect())
+    }
 }
 
 /// A set of data associated with a delta layer key and its value
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index ff2be1780e..3f90df312d 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -673,6 +673,21 @@ impl ImageLayerInner {
             ),
         }
     }
+
+    /// NB: not super efficient, but not terrible either. Should probably be an iterator.
+    //
+    // We're reusing the index traversal logic in plan_reads; would be nice to
+    // factor that out.
+    pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
+        let plan = self
+            .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
+            .await?;
+        Ok(plan
+            .into_iter()
+            .flat_map(|read| read.blobs_at)
+            .map(|(_, blob_meta)| blob_meta.key)
+            .collect())
+    }
 }
 
 /// A builder object for constructing a new image layer.
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index 38a7cd09af..a9f1189b41 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -19,7 +19,7 @@ use crate::task_mgr::TaskKind;
 use crate::tenant::timeline::{CompactionError, GetVectoredError};
 use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
 
-use super::delta_layer::{self, DeltaEntry};
+use super::delta_layer::{self};
 use super::image_layer::{self};
 use super::{
     AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
@@ -1841,23 +1841,22 @@ impl ResidentLayer {
     pub(crate) async fn load_keys<'a>(
         &'a self,
         ctx: &RequestContext,
-    ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
+    ) -> anyhow::Result<Vec<Key>> {
         use LayerKind::*;
 
         let owner = &self.owner.0;
-        match self.downloaded.get(owner, ctx).await? {
-            Delta(ref d) => {
-                // this is valid because the DownloadedLayer::kind is a OnceCell, not a
-                // Mutex, so we cannot go and deinitialize the value with OnceCell::take
-                // while it's being held.
-                self.owner.record_access(ctx);
+        let inner = self.downloaded.get(owner, ctx).await?;
 
-                delta_layer::DeltaLayerInner::load_keys(d, ctx)
-                    .await
-                    .with_context(|| format!("Layer index is corrupted for {self}"))
-            }
-            Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
-        }
+        // this is valid because the DownloadedLayer::kind is a OnceCell, not a
+        // Mutex, so we cannot go and deinitialize the value with OnceCell::take
+        // while it's being held.
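+        //
+        // Note: record_access is hoisted out of the old Delta-only match arm, so
+        // image layers now also get an access recorded when their keys are loaded.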
+        self.owner.record_access(ctx);
+
+        let res = match inner {
+            Delta(ref d) => delta_layer::DeltaLayerInner::load_keys(d, ctx).await,
+            Image(ref i) => image_layer::ImageLayerInner::load_keys(i, ctx).await,
+        };
+        res.with_context(|| format!("Layer index is corrupted for {self}"))
     }
 
     /// Read all the keys in this layer which match the ShardIdentity, and write them all to
diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs
index e90ff3c4b2..a30c25d780 100644
--- a/pageserver/src/tenant/storage_layer/layer_desc.rs
+++ b/pageserver/src/tenant/storage_layer/layer_desc.rs
@@ -57,6 +57,34 @@ impl std::fmt::Display for PersistentLayerKey {
     }
 }
 
+impl From<ImageLayerName> for PersistentLayerKey {
+    fn from(image_layer_name: ImageLayerName) -> Self {
+        Self {
+            key_range: image_layer_name.key_range,
+            lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer_name.lsn),
+            is_delta: false,
+        }
+    }
+}
+
+impl From<DeltaLayerName> for PersistentLayerKey {
+    fn from(delta_layer_name: DeltaLayerName) -> Self {
+        Self {
+            key_range: delta_layer_name.key_range,
+            lsn_range: delta_layer_name.lsn_range,
+            is_delta: true,
+        }
+    }
+}
+
+impl From<LayerName> for PersistentLayerKey {
+    fn from(layer_name: LayerName) -> Self {
+        match layer_name {
+            LayerName::Image(i) => i.into(),
+            LayerName::Delta(d) => d.into(),
+        }
+    }
+}
 impl PersistentLayerDesc {
     pub fn key(&self) -> PersistentLayerKey {
         PersistentLayerKey {
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 6aa5b30f07..73e4f0e87c 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -834,7 +834,12 @@ impl Timeline {
             if self.cancel.is_cancelled() {
                 return Err(CompactionError::ShuttingDown);
             }
-            all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
+            let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
+            let keys = delta
+                .index_entries(ctx)
+                .await
+                .map_err(CompactionError::Other)?;
+            all_keys.extend(keys);
         }
         // The current stdlib sorting implementation is designed in a way where it is
         // particularly fast where the slice is made up of sorted sub-ranges.
@@ -2438,7 +2443,7 @@ impl CompactionDeltaLayer<TimelineAdaptor> for ResidentDeltaLayer {
     type DeltaEntry<'a> = DeltaEntry<'a>;
 
     async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'_>>> {
-        self.0.load_keys(ctx).await
+        self.0.get_as_delta(ctx).await?.index_entries(ctx).await
     }
 }
 
diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs
index 8f20d84401..4293a44dca 100644
--- a/pageserver/src/tenant/timeline/layer_manager.rs
+++ b/pageserver/src/tenant/timeline/layer_manager.rs
@@ -45,13 +45,16 @@ impl LayerManager {
     pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
         // The assumption for the `expect()` is that all code maintains the following invariant:
        // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
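+        // Callers that can tolerate a missing layer can use try_get_from_key
+        // (added below), which surfaces the miss as None instead of panicking.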
- self.layers() - .get(key) + self.try_get_from_key(key) .with_context(|| format!("get layer from key: {key}")) .expect("not found") .clone() } + pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> { + self.layers().get(key) + } + pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer { self.get_from_key(&desc.key()) } diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index a1ea056213..6491069f20 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -61,7 +61,11 @@ from fixtures.pageserver.allowed_errors import ( DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS, ) from fixtures.pageserver.common_types import LayerName, parse_layer_file_name -from fixtures.pageserver.http import PageserverHttpClient +from fixtures.pageserver.http import ( + HistoricLayerInfo, + PageserverHttpClient, + ScanDisposableKeysResponse, +) from fixtures.pageserver.utils import ( wait_for_last_record_lsn, ) @@ -2670,6 +2674,51 @@ class NeonPageserver(PgProtocol, LogUtils): layers = self.list_layers(tenant_id, timeline_id) return layer_name in [parse_layer_file_name(p.name) for p in layers] + def timeline_scan_no_disposable_keys( + self, tenant_shard_id: TenantShardId, timeline_id: TimelineId + ) -> TimelineAssertNoDisposableKeysResult: + """ + Scan all keys in all layers of the tenant/timeline for disposable keys. + Disposable keys are keys that are present in a layer referenced by the shard + but are not going to be accessed by the shard. + For example, after shard split, the child shards will reference the parent's layer + files until new data is ingested and/or compaction rewrites the layers. + """ + + ps_http = self.http_client() + tally = ScanDisposableKeysResponse(0, 0) + per_layer = [] + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futs = [] + shard_layer_map = ps_http.layer_map_info(tenant_shard_id, timeline_id) + for layer in shard_layer_map.historic_layers: + + def do_layer( + shard_ps_http: PageserverHttpClient, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + layer: HistoricLayerInfo, + ) -> tuple[HistoricLayerInfo, ScanDisposableKeysResponse]: + return ( + layer, + shard_ps_http.timeline_layer_scan_disposable_keys( + tenant_shard_id, timeline_id, layer.layer_file_name + ), + ) + + futs.append(executor.submit(do_layer, ps_http, tenant_shard_id, timeline_id, layer)) + for fut in futs: + layer, result = fut.result() + tally += result + per_layer.append((layer, result)) + return TimelineAssertNoDisposableKeysResult(tally, per_layer) + + +@dataclass +class TimelineAssertNoDisposableKeysResult: + tally: ScanDisposableKeysResponse + per_layer: list[tuple[HistoricLayerInfo, ScanDisposableKeysResponse]] + class PgBin: """A helper class for executing postgres binaries""" diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py index 706bc550e5..175a1870d4 100644 --- a/test_runner/fixtures/pageserver/http.py +++ b/test_runner/fixtures/pageserver/http.py @@ -129,6 +129,26 @@ class LayerMapInfo: return set(x.layer_file_name for x in self.historic_layers) +@dataclass +class ScanDisposableKeysResponse: + disposable_count: int + not_disposable_count: int + + def __add__(self, b): + a = self + assert isinstance(a, ScanDisposableKeysResponse) + assert isinstance(b, ScanDisposableKeysResponse) + return ScanDisposableKeysResponse( + a.disposable_count + b.disposable_count, a.not_disposable_count + b.not_disposable_count + 
) + + @classmethod + def from_json(cls, d: dict[str, Any]) -> ScanDisposableKeysResponse: + disposable_count = d["disposable_count"] + not_disposable_count = d["not_disposable_count"] + return ScanDisposableKeysResponse(disposable_count, not_disposable_count) + + @dataclass class TenantConfig: tenant_specific_overrides: dict[str, Any] @@ -905,6 +925,16 @@ class PageserverHttpClient(requests.Session, MetricsGetter): self.verbose_error(res) return LayerMapInfo.from_json(res.json()) + def timeline_layer_scan_disposable_keys( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str + ) -> ScanDisposableKeysResponse: + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}/scan_disposable_keys", + ) + self.verbose_error(res) + assert res.status_code == 200 + return ScanDisposableKeysResponse.from_json(res.json()) + def download_layer( self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str ): diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 6c2a059098..3a249bbdb4 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -188,7 +188,9 @@ def test_sharding_split_unsharded( "compact-shard-ancestors-persistent", ], ) -def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: Optional[str]): +def test_sharding_split_compaction( + neon_env_builder: NeonEnvBuilder, failpoint: Optional[str], build_type: str +): """ Test that after a split, we clean up parent layer data in the child shards via compaction. """ @@ -322,9 +324,19 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: # Physical size should shrink because layers are smaller assert detail_after["current_physical_size"] < detail_before["current_physical_size"] - # Validate size statistics + # Validate filtering compaction actually happened for shard in shards: ps = env.get_tenant_pageserver(shard) + + log.info("scan all layer files for disposable keys, there shouldn't be any") + result = ps.timeline_scan_no_disposable_keys(shard, timeline_id) + tally = result.tally + raw_page_count = tally.not_disposable_count + tally.disposable_count + assert tally.not_disposable_count > ( + raw_page_count // 2 + ), "compaction doesn't rewrite layers that are >=50pct local" + + log.info("check sizes") timeline_info = ps.http_client().timeline_detail(shard, timeline_id) reported_size = timeline_info["current_physical_size"] layer_paths = ps.list_layers(shard, timeline_id)