From 2ab9f6982590cd8570f505df23c134cc71a1a576 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 20 Jan 2025 22:57:15 +0100 Subject: [PATCH] Simplify pageserver_physical_gc function (#10104) This simplifies the code in `pageserver_physical_gc` a little bit after the feedback in #10007 that the code is too complicated. Most importantly, we don't pass around `GcSummary` any more in a complicated fashion, and we save on async stream-combinator-inception in one place in favour of `try_stream!{}`. Follow-up of #10007 --- .../src/pageserver_physical_gc.rs | 86 +++++++++---------- 1 file changed, 40 insertions(+), 46 deletions(-) diff --git a/storage_scrubber/src/pageserver_physical_gc.rs b/storage_scrubber/src/pageserver_physical_gc.rs index a997373375..063c6bcfb9 100644 --- a/storage_scrubber/src/pageserver_physical_gc.rs +++ b/storage_scrubber/src/pageserver_physical_gc.rs @@ -8,6 +8,8 @@ use crate::checks::{ }; use crate::metadata_stream::{stream_tenant_timelines, stream_tenants}; use crate::{init_remote, BucketConfig, NodeKind, RootTarget, TenantShardTimelineId, MAX_RETRIES}; +use async_stream::try_stream; +use futures::future::Either; use futures_util::{StreamExt, TryStreamExt}; use pageserver::tenant::remote_timeline_client::index::LayerFileMetadata; use pageserver::tenant::remote_timeline_client::manifest::OffloadedTimelineManifest; @@ -578,7 +580,7 @@ async fn gc_timeline( target: &RootTarget, mode: GcMode, ttid: TenantShardTimelineId, - accumulator: &Arc>, + accumulator: &std::sync::Mutex, tenant_manifest_info: Arc>, ) -> anyhow::Result { let mut summary = GcSummary::default(); @@ -721,9 +723,9 @@ pub async fn pageserver_physical_gc( let remote_client = Arc::new(remote_client); let tenants = if tenant_shard_ids.is_empty() { - futures::future::Either::Left(stream_tenants(&remote_client, &target)) + Either::Left(stream_tenants(&remote_client, &target)) } else { - futures::future::Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok))) + Either::Right(futures::stream::iter(tenant_shard_ids.into_iter().map(Ok))) }; // How many tenants to process in parallel. We need to be mindful of pageservers @@ -731,16 +733,16 @@ pub async fn pageserver_physical_gc( const CONCURRENCY: usize = 32; // Accumulate information about each tenant for cross-shard GC step we'll do at the end - let accumulator = Arc::new(std::sync::Mutex::new(TenantRefAccumulator::default())); + let accumulator = std::sync::Mutex::new(TenantRefAccumulator::default()); + + // Accumulate information about how many manifests we have GCd + let manifest_gc_summary = std::sync::Mutex::new(GcSummary::default()); // Generate a stream of TenantTimelineId - enum GcSummaryOrContent { - Content(T), - GcSummary(GcSummary), - } let timelines = tenants.map_ok(|tenant_shard_id| { let target_ref = ⌖ let remote_client_ref = &remote_client; + let manifest_gc_summary_ref = &manifest_gc_summary; async move { let gc_manifest_result = gc_tenant_manifests( remote_client_ref, @@ -757,55 +759,48 @@ pub async fn pageserver_physical_gc( (GcSummary::default(), None) } }; + manifest_gc_summary_ref + .lock() + .unwrap() + .merge(summary_from_manifest); let tenant_manifest_arc = Arc::new(tenant_manifest_opt); - let summary_from_manifest = Ok(GcSummaryOrContent::<(_, _)>::GcSummary( - summary_from_manifest, - )); - stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id) - .await - .map(|stream| { - stream - .zip(futures::stream::iter(std::iter::repeat( - tenant_manifest_arc, - ))) - .map(|(ttid_res, tenant_manifest_arc)| { - ttid_res.map(move |ttid| { - GcSummaryOrContent::Content((ttid, tenant_manifest_arc)) - }) - }) - .chain(futures::stream::iter([summary_from_manifest].into_iter())) - }) + let mut timelines = Box::pin( + stream_tenant_timelines(remote_client_ref, target_ref, tenant_shard_id).await?, + ); + Ok(try_stream! { + while let Some(ttid_res) = timelines.next().await { + let ttid = ttid_res?; + yield (ttid, tenant_manifest_arc.clone()); + } + }) } }); - let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); - let timelines = timelines.try_flatten(); let mut summary = GcSummary::default(); - - // Drain futures for per-shard GC, populating accumulator as a side effect { - let timelines = timelines.map_ok(|summary_or_ttid| match summary_or_ttid { - GcSummaryOrContent::Content((ttid, tenant_manifest_arc)) => { - futures::future::Either::Left(gc_timeline( - &remote_client, - &min_age, - &target, - mode, - ttid, - &accumulator, - tenant_manifest_arc, - )) - } - GcSummaryOrContent::GcSummary(gc_summary) => { - futures::future::Either::Right(futures::future::ok(gc_summary)) - } + let timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); + let timelines = timelines.try_flatten(); + + let timelines = timelines.map_ok(|(ttid, tenant_manifest_arc)| { + gc_timeline( + &remote_client, + &min_age, + &target, + mode, + ttid, + &accumulator, + tenant_manifest_arc, + ) }); let mut timelines = std::pin::pin!(timelines.try_buffered(CONCURRENCY)); + // Drain futures for per-shard GC, populating accumulator as a side effect while let Some(i) = timelines.next().await { summary.merge(i?); } } + // Streams are lazily evaluated, so only now do we have access to the inner object + summary.merge(manifest_gc_summary.into_inner().unwrap()); // Execute cross-shard GC, using the accumulator's full view of all the shards built in the per-shard GC let Some(client) = controller_client else { @@ -813,8 +808,7 @@ pub async fn pageserver_physical_gc( return Ok(summary); }; - let (ancestor_shards, ancestor_refs) = Arc::into_inner(accumulator) - .unwrap() + let (ancestor_shards, ancestor_refs) = accumulator .into_inner() .unwrap() .into_gc_ancestors(client, &mut summary)