diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 8684927554..d37f62185c 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -1068,6 +1068,12 @@ pub mod virtual_file {
}
}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ScanDisposableKeysResponse {
+ pub disposable_count: usize,
+ pub not_disposable_count: usize,
+}
+
// Wrapped in libpq CopyData
#[derive(PartialEq, Eq, Debug)]
pub enum PagestreamFeMessage {
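With serde's default field naming, a successful scan returns a flat JSON object mirroring the struct above. A minimal sketch of a payload (the counts are illustrative only):

```python
# Hypothetical response body for the scan_disposable_keys endpoint; field
# names come from ScanDisposableKeysResponse, the values are made up.
example_response = {"disposable_count": 0, "not_disposable_count": 81920}
assert set(example_response) == {"disposable_count", "not_disposable_count"}
```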
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index bc03df9ad2..3943f62ac0 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -1293,6 +1293,99 @@ async fn layer_map_info_handler(
json_response(StatusCode::OK, layer_map_info)
}
+#[instrument(skip_all, fields(tenant_id, shard_id, timeline_id, layer_name))]
+async fn timeline_layer_scan_disposable_keys(
+ request: Request<Body>,
+ cancel: CancellationToken,
+) -> Result<Response<Body>, ApiError> {
+ let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
+ let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
+ let layer_name: LayerName = parse_request_param(&request, "layer_name")?;
+
+ tracing::Span::current().record(
+ "tenant_id",
+ tracing::field::display(&tenant_shard_id.tenant_id),
+ );
+ tracing::Span::current().record(
+ "shard_id",
+ tracing::field::display(tenant_shard_id.shard_slug()),
+ );
+ tracing::Span::current().record("timeline_id", tracing::field::display(&timeline_id));
+ tracing::Span::current().record("layer_name", tracing::field::display(&layer_name));
+
+ let state = get_state(&request);
+
+ check_permission(&request, Some(tenant_shard_id.tenant_id))?;
+
+ // technically the timeline need not be active for this scan to complete
+ let timeline =
+ active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
+ .await?;
+
+ let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+
+ let guard = timeline.layers.read().await;
+ let Some(layer) = guard.try_get_from_key(&layer_name.clone().into()) else {
+ return Err(ApiError::NotFound(
+ anyhow::anyhow!("Layer {tenant_shard_id}/{timeline_id}/{layer_name} not found").into(),
+ ));
+ };
+
+ let resident_layer = layer
+ .download_and_keep_resident()
+ .await
+ .map_err(|err| match err {
+ tenant::storage_layer::layer::DownloadError::TimelineShutdown
+ | tenant::storage_layer::layer::DownloadError::DownloadCancelled => {
+ ApiError::ShuttingDown
+ }
+ tenant::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
+ | tenant::storage_layer::layer::DownloadError::DownloadRequired
+ | tenant::storage_layer::layer::DownloadError::NotFile(_)
+ | tenant::storage_layer::layer::DownloadError::DownloadFailed
+ | tenant::storage_layer::layer::DownloadError::PreStatFailed(_) => {
+ ApiError::InternalServerError(err.into())
+ }
+ #[cfg(test)]
+ tenant::storage_layer::layer::DownloadError::Failpoint(_) => {
+ ApiError::InternalServerError(err.into())
+ }
+ })?;
+
+ let keys = resident_layer
+ .load_keys(&ctx)
+ .await
+ .map_err(ApiError::InternalServerError)?;
+
+ let shard_identity = timeline.get_shard_identity();
+
+ let mut disposable_count = 0;
+ let mut not_disposable_count = 0;
+ let cancel = cancel.clone();
+ for (i, key) in keys.into_iter().enumerate() {
+ if shard_identity.is_key_disposable(&key) {
+ disposable_count += 1;
+ tracing::debug!(key = %key, key.dbg=?key, "disposable key");
+ } else {
+ not_disposable_count += 1;
+ }
+ #[allow(clippy::collapsible_if)]
+ if i % 10000 == 0 {
+ if cancel.is_cancelled() || timeline.cancel.is_cancelled() || timeline.is_stopping() {
+ return Err(ApiError::ShuttingDown);
+ }
+ }
+ }
+
+ json_response(
+ StatusCode::OK,
+ pageserver_api::models::ScanDisposableKeysResponse {
+ disposable_count,
+ not_disposable_count,
+ },
+ )
+}
+
async fn layer_download_handler(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3155,6 +3248,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name",
|r| api_handler(r, evict_timeline_layer_handler),
)
+ .post(
+ "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_name/scan_disposable_keys",
+ |r| testing_api_handler("timeline_layer_scan_disposable_keys", r, timeline_layer_scan_disposable_keys),
+ )
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/block_gc",
|r| api_handler(r, timeline_gc_blocking_handler),
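Because the route is registered through `testing_api_handler`, it is only reachable when the pageserver runs with testing APIs enabled. A hypothetical client call, with all identifiers and the port as placeholders:

```python
import requests

# Placeholder values; substitute real IDs and the management port of a
# running pageserver built with testing APIs enabled.
base = "http://localhost:9898"
tenant_shard_id = "<tenant_shard_id>"
timeline_id = "<timeline_id>"
layer_name = "<layer_file_name>"

res = requests.post(
    f"{base}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}"
    f"/layer/{layer_name}/scan_disposable_keys"
)
res.raise_for_status()
print(res.json())  # e.g. {"disposable_count": 0, "not_disposable_count": 81920}
```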
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index ceae1d4b1a..641729d681 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -1084,7 +1084,7 @@ impl DeltaLayerInner {
}
}
- pub(super) async fn load_keys<'a>(
+ pub(crate) async fn index_entries<'a>(
&'a self,
ctx: &RequestContext,
) -> Result<Vec<DeltaEntry<'a>>> {
@@ -1346,7 +1346,7 @@ impl DeltaLayerInner {
tree_reader.dump().await?;
- let keys = self.load_keys(ctx).await?;
+ let keys = self.index_entries(ctx).await?;
async fn dump_blob(val: &ValueRef<'_>, ctx: &RequestContext) -> anyhow::Result<String> {
let buf = val.load_raw(ctx).await?;
@@ -1453,6 +1453,16 @@ impl DeltaLayerInner {
),
}
}
+
+ /// NB: not super efficient, but not terrible either. Should probably be an iterator.
+ //
+ // We're reusing the index traversal logic in plan_reads; would be nice to
+ // factor that out.
+ pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
+ self.index_entries(ctx)
+ .await
+ .map(|entries| entries.into_iter().map(|entry| entry.key).collect())
+ }
}
/// A set of data associated with a delta layer key and its value
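The rename keeps the full-entry reader available as `index_entries`, and the delta-side `load_keys` becomes a thin projection onto the key column. In rough Python terms (the types are hypothetical stand-ins):

```python
from dataclasses import dataclass

@dataclass
class DeltaEntry:
    # Hypothetical stand-in for the Rust DeltaEntry<'a>.
    key: str
    lsn: int

def load_keys(index_entries: list[DeltaEntry]) -> list[str]:
    # Projection mirroring DeltaLayerInner::load_keys above.
    return [entry.key for entry in index_entries]
```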
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index ff2be1780e..3f90df312d 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -673,6 +673,21 @@ impl ImageLayerInner {
),
}
}
+
+ /// NB: not super efficient, but not terrible either. Should probably be an iterator.
+ //
+ // We're reusing the index traversal logic in plan_reads; would be nice to
+ // factor that out.
+ pub(crate) async fn load_keys(&self, ctx: &RequestContext) -> anyhow::Result<Vec<Key>> {
+ let plan = self
+ .plan_reads(KeySpace::single(self.key_range.clone()), None, ctx)
+ .await?;
+ Ok(plan
+ .into_iter()
+ .flat_map(|read| read.blobs_at)
+ .map(|(_, blob_meta)| blob_meta.key)
+ .collect())
+ }
}
/// A builder object for constructing a new image layer.
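The image side has no index-entry type to reuse, so its `load_keys` plans a vectored read over the layer's entire key range and keeps only the key of every planned blob. Roughly, in Python (types are hypothetical stand-ins):

```python
from dataclasses import dataclass

@dataclass
class BlobMeta:
    # Hypothetical stand-in for the Rust BlobMeta.
    key: str

@dataclass
class VectoredRead:
    # Hypothetical stand-in for one planned read: (offset, meta) pairs.
    blobs_at: list[tuple[int, BlobMeta]]

def keys_from_plan(plan: list[VectoredRead]) -> list[str]:
    # Mirrors the flat_map in ImageLayerInner::load_keys: keep only keys.
    return [meta.key for read in plan for _offset, meta in read.blobs_at]
```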
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index 38a7cd09af..a9f1189b41 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -19,7 +19,7 @@ use crate::task_mgr::TaskKind;
use crate::tenant::timeline::{CompactionError, GetVectoredError};
use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline};
-use super::delta_layer::{self, DeltaEntry};
+use super::delta_layer::{self};
use super::image_layer::{self};
use super::{
AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName,
@@ -1841,23 +1841,22 @@ impl ResidentLayer {
pub(crate) async fn load_keys<'a>(
&'a self,
ctx: &RequestContext,
- ) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
+ ) -> anyhow::Result<Vec<Key>> {
use LayerKind::*;
let owner = &self.owner.0;
- match self.downloaded.get(owner, ctx).await? {
- Delta(ref d) => {
- // this is valid because the DownloadedLayer::kind is a OnceCell, not a
- // Mutex, so we cannot go and deinitialize the value with OnceCell::take
- // while it's being held.
- self.owner.record_access(ctx);
+ let inner = self.downloaded.get(owner, ctx).await?;
- delta_layer::DeltaLayerInner::load_keys(d, ctx)
- .await
- .with_context(|| format!("Layer index is corrupted for {self}"))
- }
- Image(_) => anyhow::bail!(format!("cannot load_keys on a image layer {self}")),
- }
+ // this is valid because the DownloadedLayer::kind is a OnceCell, not a
+ // Mutex, so we cannot go and deinitialize the value with OnceCell::take
+ // while it's being held.
+ self.owner.record_access(ctx);
+
+ let res = match inner {
+ Delta(ref d) => delta_layer::DeltaLayerInner::load_keys(d, ctx).await,
+ Image(ref i) => image_layer::ImageLayerInner::load_keys(i, ctx).await,
+ };
+ res.with_context(|| format!("Layer index is corrupted for {self}"))
}
/// Read all the keys in this layer which match the ShardIdentity, and write them all to
diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs
index e90ff3c4b2..a30c25d780 100644
--- a/pageserver/src/tenant/storage_layer/layer_desc.rs
+++ b/pageserver/src/tenant/storage_layer/layer_desc.rs
@@ -57,6 +57,34 @@ impl std::fmt::Display for PersistentLayerKey {
}
}
+impl From<ImageLayerName> for PersistentLayerKey {
+ fn from(image_layer_name: ImageLayerName) -> Self {
+ Self {
+ key_range: image_layer_name.key_range,
+ lsn_range: PersistentLayerDesc::image_layer_lsn_range(image_layer_name.lsn),
+ is_delta: false,
+ }
+ }
+}
+
+impl From<DeltaLayerName> for PersistentLayerKey {
+ fn from(delta_layer_name: DeltaLayerName) -> Self {
+ Self {
+ key_range: delta_layer_name.key_range,
+ lsn_range: delta_layer_name.lsn_range,
+ is_delta: true,
+ }
+ }
+}
+
+impl From<LayerName> for PersistentLayerKey {
+ fn from(layer_name: LayerName) -> Self {
+ match layer_name {
+ LayerName::Image(i) => i.into(),
+ LayerName::Delta(d) => d.into(),
+ }
+ }
+}
impl PersistentLayerDesc {
pub fn key(&self) -> PersistentLayerKey {
PersistentLayerKey {
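These `From` impls let the HTTP handler turn the `LayerName` parsed from the URL directly into the `PersistentLayerKey` used for the layer-map lookup (`layer_name.clone().into()` in routes.rs above). A hypothetical Python rendering of the dispatch:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class PersistentLayerKey:
    key_range: tuple[str, str]
    lsn_range: tuple[int, int]
    is_delta: bool

def image_layer_lsn_range(lsn: int) -> tuple[int, int]:
    # Assumed to mirror PersistentLayerDesc::image_layer_lsn_range: a
    # single-LSN range lsn..lsn+1.
    return (lsn, lsn + 1)

def persistent_layer_key(name) -> PersistentLayerKey:
    # Dispatch mirrors From<LayerName>: image names synthesize a one-LSN
    # range, delta names already carry an lsn_range.
    if name.is_image:
        return PersistentLayerKey(name.key_range, image_layer_lsn_range(name.lsn), False)
    return PersistentLayerKey(name.key_range, name.lsn_range, True)
```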
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 6aa5b30f07..73e4f0e87c 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -834,7 +834,12 @@ impl Timeline {
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
}
- all_keys.extend(l.load_keys(ctx).await.map_err(CompactionError::Other)?);
+ let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
+ let keys = delta
+ .index_entries(ctx)
+ .await
+ .map_err(CompactionError::Other)?;
+ all_keys.extend(keys);
}
// The current stdlib sorting implementation is designed in a way where it is
// particularly fast where the slice is made up of sorted sub-ranges.
@@ -2438,7 +2443,7 @@ impl CompactionDeltaLayer for ResidentDeltaLayer {
type DeltaEntry<'a> = DeltaEntry<'a>;
async fn load_keys<'a>(&self, ctx: &RequestContext) -> anyhow::Result<Vec<DeltaEntry<'a>>> {
- self.0.load_keys(ctx).await
+ self.0.get_as_delta(ctx).await?.index_entries(ctx).await
}
}
diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs
index 8f20d84401..4293a44dca 100644
--- a/pageserver/src/tenant/timeline/layer_manager.rs
+++ b/pageserver/src/tenant/timeline/layer_manager.rs
@@ -45,13 +45,16 @@ impl LayerManager {
pub(crate) fn get_from_key(&self, key: &PersistentLayerKey) -> Layer {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
- self.layers()
- .get(key)
+ self.try_get_from_key(key)
.with_context(|| format!("get layer from key: {key}"))
.expect("not found")
.clone()
}
+ pub(crate) fn try_get_from_key(&self, key: &PersistentLayerKey) -> Option<&Layer> {
+ self.layers().get(key)
+ }
+
pub(crate) fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Layer {
self.get_from_key(&desc.key())
}
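The split keeps the panicking `get_from_key` for internal callers that hold the layer-map invariant, while `try_get_from_key` gives the HTTP handler a non-panicking path so an unknown layer name becomes a 404 instead of a crash. The same split, sketched in Python:

```python
# Hypothetical sketch: an Optional-returning lookup for untrusted input
# (URL parameters), and a strict one for invariant-holding internal code.
def try_get_from_key(layers: dict, key):
    return layers.get(key)

def get_from_key(layers: dict, key):
    layer = try_get_from_key(layers, key)
    assert layer is not None, f"layer not found: {key}"
    return layer
```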
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index a1ea056213..6491069f20 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -61,7 +61,11 @@ from fixtures.pageserver.allowed_errors import (
DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS,
)
from fixtures.pageserver.common_types import LayerName, parse_layer_file_name
-from fixtures.pageserver.http import PageserverHttpClient
+from fixtures.pageserver.http import (
+ HistoricLayerInfo,
+ PageserverHttpClient,
+ ScanDisposableKeysResponse,
+)
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
)
@@ -2670,6 +2674,51 @@ class NeonPageserver(PgProtocol, LogUtils):
layers = self.list_layers(tenant_id, timeline_id)
return layer_name in [parse_layer_file_name(p.name) for p in layers]
+ def timeline_scan_no_disposable_keys(
+ self, tenant_shard_id: TenantShardId, timeline_id: TimelineId
+ ) -> TimelineAssertNoDisposableKeysResult:
+ """
+ Scan all keys in all layers of the tenant/timeline for disposable keys.
+ Disposable keys are keys that are present in a layer referenced by the shard
+ but are not going to be accessed by the shard.
+ For example, after shard split, the child shards will reference the parent's layer
+ files until new data is ingested and/or compaction rewrites the layers.
+ """
+
+ ps_http = self.http_client()
+ tally = ScanDisposableKeysResponse(0, 0)
+ per_layer = []
+ with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
+ futs = []
+ shard_layer_map = ps_http.layer_map_info(tenant_shard_id, timeline_id)
+ for layer in shard_layer_map.historic_layers:
+
+ def do_layer(
+ shard_ps_http: PageserverHttpClient,
+ tenant_shard_id: TenantShardId,
+ timeline_id: TimelineId,
+ layer: HistoricLayerInfo,
+ ) -> tuple[HistoricLayerInfo, ScanDisposableKeysResponse]:
+ return (
+ layer,
+ shard_ps_http.timeline_layer_scan_disposable_keys(
+ tenant_shard_id, timeline_id, layer.layer_file_name
+ ),
+ )
+
+ futs.append(executor.submit(do_layer, ps_http, tenant_shard_id, timeline_id, layer))
+ for fut in futs:
+ layer, result = fut.result()
+ tally += result
+ per_layer.append((layer, result))
+ return TimelineAssertNoDisposableKeysResult(tally, per_layer)
+
+
+@dataclass
+class TimelineAssertNoDisposableKeysResult:
+ tally: ScanDisposableKeysResponse
+ per_layer: list[tuple[HistoricLayerInfo, ScanDisposableKeysResponse]]
+
class PgBin:
"""A helper class for executing postgres binaries"""
diff --git a/test_runner/fixtures/pageserver/http.py b/test_runner/fixtures/pageserver/http.py
index 706bc550e5..175a1870d4 100644
--- a/test_runner/fixtures/pageserver/http.py
+++ b/test_runner/fixtures/pageserver/http.py
@@ -129,6 +129,26 @@ class LayerMapInfo:
return set(x.layer_file_name for x in self.historic_layers)
+@dataclass
+class ScanDisposableKeysResponse:
+ disposable_count: int
+ not_disposable_count: int
+
+ def __add__(self, b):
+ a = self
+ assert isinstance(a, ScanDisposableKeysResponse)
+ assert isinstance(b, ScanDisposableKeysResponse)
+ return ScanDisposableKeysResponse(
+ a.disposable_count + b.disposable_count, a.not_disposable_count + b.not_disposable_count
+ )
+
+ @classmethod
+ def from_json(cls, d: dict[str, Any]) -> ScanDisposableKeysResponse:
+ disposable_count = d["disposable_count"]
+ not_disposable_count = d["not_disposable_count"]
+ return ScanDisposableKeysResponse(disposable_count, not_disposable_count)
+
+
@dataclass
class TenantConfig:
tenant_specific_overrides: dict[str, Any]
@@ -905,6 +925,16 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
self.verbose_error(res)
return LayerMapInfo.from_json(res.json())
+ def timeline_layer_scan_disposable_keys(
+ self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
+ ) -> ScanDisposableKeysResponse:
+ res = self.post(
+ f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}/scan_disposable_keys",
+ )
+ self.verbose_error(res)
+ assert res.status_code == 200
+ return ScanDisposableKeysResponse.from_json(res.json())
+
def download_layer(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId, layer_name: str
):
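The `__add__` overload above is what lets the fixture helper fold per-layer responses into a single tally:

```python
# Folding per-layer results into one tally via the dataclass's __add__.
a = ScanDisposableKeysResponse(disposable_count=3, not_disposable_count=97)
b = ScanDisposableKeysResponse(disposable_count=0, not_disposable_count=50)
total = a + b
assert (total.disposable_count, total.not_disposable_count) == (3, 147)
```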
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index 6c2a059098..3a249bbdb4 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -188,7 +188,9 @@ def test_sharding_split_unsharded(
"compact-shard-ancestors-persistent",
],
)
-def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint: Optional[str]):
+def test_sharding_split_compaction(
+ neon_env_builder: NeonEnvBuilder, failpoint: Optional[str], build_type: str
+):
"""
Test that after a split, we clean up parent layer data in the child shards via compaction.
"""
@@ -322,9 +324,19 @@ def test_sharding_split_compaction(neon_env_builder: NeonEnvBuilder, failpoint:
# Physical size should shrink because layers are smaller
assert detail_after["current_physical_size"] < detail_before["current_physical_size"]
- # Validate size statistics
+ # Validate filtering compaction actually happened
for shard in shards:
ps = env.get_tenant_pageserver(shard)
+
+ log.info("scan all layer files for disposable keys, there shouldn't be any")
+ result = ps.timeline_scan_no_disposable_keys(shard, timeline_id)
+ tally = result.tally
+ raw_page_count = tally.not_disposable_count + tally.disposable_count
+ assert tally.not_disposable_count > (
+ raw_page_count // 2
+ ), "compaction doesn't rewrite layers that are >=50pct local"
+
+ log.info("check sizes")
timeline_info = ps.http_client().timeline_detail(shard, timeline_id)
reported_size = timeline_info["current_physical_size"]
layer_paths = ps.list_layers(shard, timeline_id)
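The threshold matches the assert message: compaction is allowed to skip rewriting layers that are at least half local, so some disposable keys may survive, but strictly fewer than half of all scanned keys. A quick worked check with hypothetical counts:

```python
# Hypothetical tally: 700 local pages out of 1000 scanned.
not_disposable_count, disposable_count = 700, 300
raw_page_count = not_disposable_count + disposable_count
assert not_disposable_count > raw_page_count // 2  # holds: 700 > 500
```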