Simplify pageserver_physical_gc function (#10104)

This simplifies the code in `pageserver_physical_gc` a little bit after
the feedback in #10007 that the code is too complicated.

Most importantly, we don't pass around `GcSummary` any more in a
complicated fashion, and we save on async stream-combinator-inception in
one place in favour of `try_stream!{}`.

Follow-up of #10007
This commit is contained in:
Arpad Müller
2025-01-20 22:57:15 +01:00
committed by GitHub
parent 2de2b26c62
commit 2ab9f69825

View File

@@ -8,6 +8,8 @@ use crate::checks::{
}; };
use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; use crate::metadata_stream::{stream_tenant_timelines, stream_tenants};
use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES}; use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES};
use async_stream::try_stream;
use futures::future::Either;
use futures_util::{StreamExt, TryStreamExt}; use futures_util::{StreamExt, TryStreamExt};
use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata;
use pageserver::tenant::remote_timeline_client::manifest::OffloadedTimelineManifest; use pageserver::tenant::remote_timeline_client::manifest::OffloadedTimelineManifest;
@@ -578,7 +580,7 @@ async fn gc_timeline(
target: &RootTarget, target: &RootTarget,
mode: GcMode, mode: GcMode,
ttid: TenantShardTimelineId, ttid: TenantShardTimelineId,
accumulator: &Arc<std::sync::Mutex<TenantRefAccumulator>>, accumulator: &std::sync::Mutex<TenantRefAccumulator>,
tenant_manifest_info: Arc<Option<RemoteTenantManifestInfo>>, tenant_manifest_info: Arc<Option<RemoteTenantManifestInfo>>,
) -> anyhow::Result<GcSummary> { ) -> anyhow::Result<GcSummary> {
let mut summary = GcSummary::default(); let mut summary = GcSummary::default();
@@ -721,9 +723,9 @@ pub async fn pageserver_physical_gc(
let remote_client = Arc::new(remote_client); let remote_client = Arc::new(remote_client);
let tenants = if tenant_shard_ids.is_empty() { let tenants = if tenant_shard_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&remote_client, &target)) Either::Left(stream_tenants(&remote_client, &target))
} else { } else {
futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok))) Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok)))
}; };
// How many tenants to process in parallel. We need to be mindful of pageservers // How many tenants to process in parallel. We need to be mindful of pageservers
@@ -731,16 +733,16 @@ pub async fn pageserver_physical_gc(
const CONCURRENCY: usize = 32; const CONCURRENCY: usize = 32;
// Accumulate information about each tenant for cross-shard GC step we'll do at the end // Accumulate information about each tenant for cross-shard GC step we'll do at the end
let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default())); let accumulator = std::sync::Mutex::new(TenantRefAccumulator::default());
// Accumulate information about how many manifests we have GCd
let manifest_gc_summary = std::sync::Mutex::new(GcSummary::default());
// Generate a stream of TenantTimelineId // Generate a stream of TenantTimelineId
enum GcSummaryOrContent<T> {
Content(T),
GcSummary(GcSummary),
}
let timelines = tenants.map_ok(|tenant_shard_id| { let timelines = tenants.map_ok(|tenant_shard_id| {
let target_ref = &target; let target_ref = &target;
let remote_client_ref = &remote_client; let remote_client_ref = &remote_client;
let manifest_gc_summary_ref = &manifest_gc_summary;
async move { async move {
let gc_manifest_result = gc_tenant_manifests( let gc_manifest_result = gc_tenant_manifests(
remote_client_ref, remote_client_ref,
@@ -757,55 +759,48 @@ pub async fn pageserver_physical_gc(
(GcSummary::default(), None) (GcSummary::default(), None)
} }
}; };
manifest_gc_summary_ref
.lock()
.unwrap()
.merge(summary_from_manifest);
let tenant_manifest_arc = Arc::new(tenant_manifest_opt); let tenant_manifest_arc = Arc::new(tenant_manifest_opt);
let summary_from_manifest = Ok(GcSummaryOrContent::<(_, _)>::GcSummary( let mut timelines = Box::pin(
summary_from_manifest, stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?,
)); );
stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id) Ok(try_stream! {
.await while let Some(ttid_res) = timelines.next().await {
.map(|stream| { let ttid = ttid_res?;
stream yield (ttid, tenant_manifest_arc.clone());
.zip(futures::stream::iter(std::iter::repeat( }
tenant_manifest_arc, })
)))
.map(|(ttid_res, tenant_manifest_arc)| {
ttid_res.map(move |ttid| {
GcSummaryOrContent::Content((ttid, tenant_manifest_arc))
})
})
.chain(futures::stream::iter([summary_from_manifest].into_iter()))
})
} }
}); });
let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
let timelines = timelines.try_flatten();
let mut summary = GcSummary::default(); let mut summary = GcSummary::default();
// Drain futures for per-shard GC, populating accumulator as a side effect
{ {
let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid { let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
GcSummaryOrContent::Content((ttid, tenant_manifest_arc)) => { let timelines = timelines.try_flatten();
futures::future::Either::Left(gc_timeline(
&remote_client, let timelines = timelines.map_ok(|(ttid, tenant_manifest_arc)| {
&min_age, gc_timeline(
&target, &remote_client,
mode, &min_age,
ttid, &target,
&accumulator, mode,
tenant_manifest_arc, ttid,
)) &accumulator,
} tenant_manifest_arc,
GcSummaryOrContent::GcSummary(gc_summary) => { )
futures::future::Either::Right(futures::future::ok(gc_summary))
}
}); });
let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY));
// Drain futures for per-shard GC, populating accumulator as a side effect
while let Some(i) = timelines.next().await { while let Some(i) = timelines.next().await {
summary.merge(i?); summary.merge(i?);
} }
} }
// Streams are lazily evaluated, so only now do we have access to the inner object
summary.merge(manifest_gc_summary.into_inner().unwrap());
// Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC // Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC
let Some(client) = controller_client else { let Some(client) = controller_client else {
@@ -813,8 +808,7 @@ pub async fn pageserver_physical_gc(
return Ok(summary); return Ok(summary);
}; };
let (ancestor_shards, ancestor_refs) = Arc::into_inner(accumulator) let (ancestor_shards, ancestor_refs) = accumulator
.unwrap()
.into_inner() .into_inner()
.unwrap() .unwrap()
.into_gc_ancestors(client, &mut summary) .into_gc_ancestors(client, &mut summary)