From 14d495ae141fbde57146e8e5afad696565e1cbbf Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Jun 2023 13:23:55 +0200 Subject: [PATCH 01/10] create_delta_layer: improve misleading TODO comment (#4488) Context: https://github.com/neondatabase/neon/pull/4441#discussion_r1228086608 --- pageserver/src/tenant/timeline.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d42fdf5e55..13705d8b85 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3015,16 +3015,19 @@ impl Timeline { // Sync it to disk. // // We must also fsync the timeline dir to ensure the directory entries for - // new layer files are durable + // new layer files are durable. + // + // NB: timeline dir must be synced _after_ the file contents are durable. + // So, two separate fsyncs are required, they mustn't be batched. // // TODO: If we're running inside 'flush_frozen_layers' and there are multiple - // files to flush, it might be better to first write them all, and then fsync - // them all in parallel. - - // First sync the delta layer. We still use par_fsync here to keep everything consistent. Feel free to replace - // this with a single fsync in future refactors. + // files to flush, the fsync overhead can be reduces as follows: + // 1. write them all to temporary file names + // 2. fsync them + // 3. rename to the final name + // 4. fsync the parent directory. + // Note that (1),(2),(3) today happen inside write_to_disk(). par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; - // Then sync the parent directory. par_fsync::par_fsync(&[self_clone .conf .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) From 190c3ba6109710f53f70d36c1cdae5891b007f7a Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 16 Jun 2023 14:17:37 +0100 Subject: [PATCH 02/10] Add tags for releases (#4524) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem It's not a trivial task to find corresponding changes for a particular release (for example, for 3371 — 🤷) Ref: https://neondb.slack.com/archives/C04BLQ4LW7K/p1686761537607649?thread_ts=1686736854.174559&cid=C04BLQ4LW7K ## Summary of changes - Tag releases - Add a manual trigger for the release workflow --- .github/workflows/build_and_test.yml | 14 ++++++++++++++ .github/workflows/release.yml | 1 + 2 files changed, 15 insertions(+) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 471dc68df9..5f82ab7aca 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -914,6 +914,20 @@ jobs: exit 1 fi + - name: Create tag "release-${{ needs.tag.outputs.build-tag }}" + if: github.ref_name == 'release' + uses: actions/github-script@v6 + with: + # Retry script for 5XX server errors: https://github.com/actions/github-script#retries + retries: 5 + script: | + github.rest.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}", + sha: context.sha, + }) + promote-compatibility-data: runs-on: [ self-hosted, gen3, small ] container: diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4bce9cdd1e..595ee05514 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -3,6 +3,7 @@ name: Create Release Branch on: schedule: - cron: '0 10 * * 2' + 
workflow_dispatch: jobs: create_release_branch: From 78082d0b9fdfdb8a4c328ad1b2082f840dc0968f Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 16 Jun 2023 16:54:41 +0200 Subject: [PATCH 03/10] create_delta_layer: avoid needless `stat` (#4489) We already do it inside `frozen_layer.write_to_disk()`. Context: https://github.com/neondatabase/neon/pull/4441#discussion_r1228083959 --- pageserver/src/tenant/timeline.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 13705d8b85..ef7474cb8b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3003,7 +3003,7 @@ impl Timeline { frozen_layer: &Arc, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { let span = tracing::info_span!("blocking"); - let (new_delta, sz): (DeltaLayer, _) = tokio::task::spawn_blocking({ + let new_delta: DeltaLayer = tokio::task::spawn_blocking({ let _g = span.entered(); let self_clone = Arc::clone(self); let frozen_layer = Arc::clone(frozen_layer); @@ -3027,20 +3027,19 @@ impl Timeline { // 3. rename to the final name // 4. fsync the parent directory. // Note that (1),(2),(3) today happen inside write_to_disk(). - par_fsync::par_fsync(&[new_delta_path.clone()]).context("fsync of delta layer")?; + par_fsync::par_fsync(&[new_delta_path]).context("fsync of delta layer")?; par_fsync::par_fsync(&[self_clone .conf .timeline_path(&self_clone.timeline_id, &self_clone.tenant_id)]) .context("fsync of timeline dir")?; - let sz = new_delta_path.metadata()?.len(); - - anyhow::Ok((new_delta, sz)) + anyhow::Ok(new_delta) } }) .await .context("spawn_blocking")??; let new_delta_name = new_delta.filename(); + let sz = new_delta.desc.file_size; // Add it to the layer map let l = Arc::new(new_delta); @@ -3054,9 +3053,8 @@ impl Timeline { batch_updates.insert_historic(l.layer_desc().clone(), l); batch_updates.flush(); - // update the timeline's physical size - self.metrics.resident_physical_size_gauge.add(sz); // update metrics + self.metrics.resident_physical_size_gauge.add(sz); self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); From 1b947fc8aff5b3cad6f8a057372860491ec58ab6 Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Fri, 16 Jun 2023 18:08:11 +0100 Subject: [PATCH 04/10] test_runner: workaround rerunfailures and timeout incompatibility (#4469) ## Problem `pytest-timeout` and `pytest-rerunfailures` are incompatible (or rather not fully compatible). Timeouts aren't set for reruns. Ref https://github.com/pytest-dev/pytest-rerunfailures/issues/99 ## Summary of changes - Dynamically make timeouts `func_only` for tests that we're going to retry. It applies timeouts for reruns as well. --- test_runner/fixtures/flaky.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/test_runner/fixtures/flaky.py b/test_runner/fixtures/flaky.py index 9d7f8ead9a..d13f3318b0 100644 --- a/test_runner/fixtures/flaky.py +++ b/test_runner/fixtures/flaky.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import List +from typing import Any, List, MutableMapping, cast import pytest from _pytest.config import Config @@ -56,3 +56,15 @@ def pytest_collection_modifyitems(config: Config, items: List[pytest.Item]): # Rerun 3 times = 1 original run + 2 reruns log.info(f"Marking {item.nodeid} as flaky. 
It will be rerun up to 3 times") item.add_marker(pytest.mark.flaky(reruns=2)) + + # pytest-rerunfailures is not compatible with pytest-timeout (timeout is not set for reruns), + # we can workaround it by setting `timeout_func_only` to True[1]. + # Unfortunately, setting `timeout_func_only = True` globally in pytest.ini is broken[2], + # but we still can do it using pytest marker. + # + # - [1] https://github.com/pytest-dev/pytest-rerunfailures/issues/99 + # - [2] https://github.com/pytest-dev/pytest-timeout/issues/142 + timeout_marker = item.get_closest_marker("timeout") + if timeout_marker is not None: + kwargs = cast(MutableMapping[str, Any], timeout_marker.kwargs) + kwargs["func_only"] = True From 3b06a5bc54a2ef7b5ec8f3fee24556547310586f Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 19 Jun 2023 14:04:16 +0400 Subject: [PATCH 05/10] Raise pageserver walreceiver timeouts. I observe sporadic reconnections with ~10k idle computes. It looks like a separate issue, probably walreceiver runtime gets blocked somewhere, but in any case 2-3 seconds is too small. --- pageserver/src/tenant/config.rs | 4 ++-- test_runner/regress/test_wal_receiver.py | 9 ++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index 80d153661a..ffe2c5eab6 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -38,8 +38,8 @@ pub mod defaults { pub const DEFAULT_GC_PERIOD: &str = "1 hr"; pub const DEFAULT_IMAGE_CREATION_THRESHOLD: usize = 3; pub const DEFAULT_PITR_INTERVAL: &str = "7 days"; - pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds"; - pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "3 seconds"; + pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "10 seconds"; + pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds"; pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024; pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour"; } diff --git a/test_runner/regress/test_wal_receiver.py b/test_runner/regress/test_wal_receiver.py index 515d47c079..7ac6e6332c 100644 --- a/test_runner/regress/test_wal_receiver.py +++ b/test_runner/regress/test_wal_receiver.py @@ -1,3 +1,5 @@ +import time + from fixtures.log_helper import log from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder from fixtures.types import Lsn, TenantId @@ -40,7 +42,10 @@ def test_pageserver_lsn_wait_error_start(neon_env_builder: NeonEnvBuilder): # Kills one of the safekeepers and ensures that only the active ones are printed in the state. 
def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuilder): # Trigger WAL wait timeout faster - neon_env_builder.pageserver_config_override = "wait_lsn_timeout = '1s'" + neon_env_builder.pageserver_config_override = """ + wait_lsn_timeout = "1s" + tenant_config={walreceiver_connect_timeout = "2s", lagging_wal_timeout = "2s"} + """ # Have notable SK ids to ensure we check logs for their presence, not some other random numbers neon_env_builder.safekeepers_id_start = 12345 neon_env_builder.num_safekeepers = 3 @@ -70,6 +75,8 @@ def test_pageserver_lsn_wait_error_safekeeper_stop(neon_env_builder: NeonEnvBuil stopped_safekeeper_id = stopped_safekeeper.id log.info(f"Stopping safekeeper {stopped_safekeeper.id}") stopped_safekeeper.stop() + # sleep until stopped safekeeper is removed from candidates + time.sleep(2) # Spend some more time inserting, to ensure SKs report updated statuses and walreceiver in PS have time to update its connection stats. insert_test_elements(env, tenant_id, start=elements_to_insert + 1, count=elements_to_insert) From 557abc18f36506c001c62c22ffa13ee11610d6c7 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Mon, 19 Jun 2023 13:27:14 +0400 Subject: [PATCH 06/10] Fix test_s3_wal_replay assertion flakiness. Supposedly fixes https://github.com/neondatabase/neon/issues/4277 --- test_runner/regress/test_wal_acceptor.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index a837501678..994858edf7 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -575,11 +575,21 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Re pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version + # Terminate first all safekeepers to prevent communication unexpectantly + # advancing peer_horizon_lsn. for sk in env.safekeepers: cli = sk.http_client() cli.timeline_delete_force(tenant_id, timeline_id) # restart safekeeper to clear its in-memory state - sk.stop().start() + sk.stop() + # wait all potenital in flight pushes to broker arrive before starting + # safekeepers (even without sleep, it is very unlikely they are not + # delivered yet). + time.sleep(1) + + for sk in env.safekeepers: + sk.start() + cli = sk.http_client() cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn) f_partial_path = ( Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name From 036fda392ff9e864aeb2d5a9528d85a8c388d590 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 19 Jun 2023 16:25:57 +0200 Subject: [PATCH 07/10] log timings for compact_level0_phase1 (#4527) The data will help decide whether it's ok to keep holding Timeline::layers in shared mode until after we've calculated the holes. Other timings are to understand the general breakdown of timings in that function. 
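As a rough illustration of the measurement pattern used in the diff below (names are
illustrative, not the exact DurationRecorder/stats types added there): each phase is
timed from the end of the previous phase, so the recorded durations cover the whole
function end to end.

```rust
use std::time::{Duration, Instant};

// One recorded phase: how long it took, and when it ended.
struct Recorded {
    elapsed: Duration,
    ended_at: Instant,
}

// Measure the time elapsed since the previous phase ended.
fn record_since(prev: &Recorded) -> Recorded {
    let now = Instant::now();
    Recorded {
        elapsed: now - prev.ended_at,
        ended_at: now,
    }
}

fn main() {
    let begin = Instant::now();
    // Phase 1: e.g. acquiring the first read lock.
    let now = Instant::now();
    let phase1 = Recorded {
        elapsed: now - begin,
        ended_at: now,
    };
    // Phase 2: e.g. collecting the level-0 deltas, measured from the end of phase 1.
    let phase2 = record_since(&phase1);
    println!("phase1={:?} phase2={:?}", phase1.elapsed, phase2.elapsed);
}
```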
Context: https://github.com/neondatabase/neon/issues/4492 --- pageserver/src/tenant/timeline.rs | 161 ++++++++++++++++++++++++++++++ 1 file changed, 161 insertions(+) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ef7474cb8b..de786da322 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -15,6 +15,7 @@ use pageserver_api::models::{ TimelineState, }; use remote_storage::GenericRemoteStorage; +use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError}; use tokio_util::sync::CancellationToken; @@ -3333,6 +3334,130 @@ impl From for CompactionError { } } +#[serde_as] +#[derive(serde::Serialize)] +struct RecordedDuration(#[serde_as(as = "serde_with::DurationMicroSeconds")] Duration); + +#[derive(Default)] +enum DurationRecorder { + #[default] + NotStarted, + Recorded(RecordedDuration, tokio::time::Instant), +} + +impl DurationRecorder { + pub fn till_now(&self) -> DurationRecorder { + match self { + DurationRecorder::NotStarted => { + panic!("must only call on recorded measurements") + } + DurationRecorder::Recorded(_, ended) => { + let now = tokio::time::Instant::now(); + DurationRecorder::Recorded(RecordedDuration(now - *ended), now) + } + } + } + pub fn into_recorded(self) -> Option { + match self { + DurationRecorder::NotStarted => None, + DurationRecorder::Recorded(recorded, _) => Some(recorded), + } + } +} + +#[derive(Default)] +struct CompactLevel0Phase1StatsBuilder { + version: Option, + tenant_id: Option, + timeline_id: Option, + first_read_lock_acquisition_micros: DurationRecorder, + get_level0_deltas_plus_drop_lock_micros: DurationRecorder, + level0_deltas_count: Option, + time_spent_between_locks: DurationRecorder, + second_read_lock_acquisition_micros: DurationRecorder, + second_read_lock_held_micros: DurationRecorder, + sort_holes_micros: DurationRecorder, + write_layer_files_micros: DurationRecorder, + new_deltas_count: Option, + new_deltas_size: Option, +} + +#[serde_as] +#[derive(serde::Serialize)] +struct CompactLevel0Phase1Stats { + version: u64, + #[serde_as(as = "serde_with::DisplayFromStr")] + tenant_id: TenantId, + #[serde_as(as = "serde_with::DisplayFromStr")] + timeline_id: TimelineId, + first_read_lock_acquisition_micros: RecordedDuration, + get_level0_deltas_plus_drop_lock_micros: RecordedDuration, + level0_deltas_count: usize, + time_spent_between_locks: RecordedDuration, + second_read_lock_acquisition_micros: RecordedDuration, + second_read_lock_held_micros: RecordedDuration, + sort_holes_micros: RecordedDuration, + write_layer_files_micros: RecordedDuration, + new_deltas_count: usize, + new_deltas_size: u64, +} + +impl TryFrom for CompactLevel0Phase1Stats { + type Error = anyhow::Error; + + fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result { + let CompactLevel0Phase1StatsBuilder { + version, + tenant_id, + timeline_id, + first_read_lock_acquisition_micros, + get_level0_deltas_plus_drop_lock_micros, + level0_deltas_count, + time_spent_between_locks, + second_read_lock_acquisition_micros, + second_read_lock_held_micros, + sort_holes_micros, + write_layer_files_micros, + new_deltas_count, + new_deltas_size, + } = value; + Ok(CompactLevel0Phase1Stats { + version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?, + tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?, + timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?, + 
first_read_lock_acquisition_micros: first_read_lock_acquisition_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?, + get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros + .into_recorded() + .ok_or_else(|| { + anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set") + })?, + level0_deltas_count: level0_deltas_count + .ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?, + time_spent_between_locks: time_spent_between_locks + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?, + second_read_lock_acquisition_micros: second_read_lock_acquisition_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?, + second_read_lock_held_micros: second_read_lock_held_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?, + sort_holes_micros: sort_holes_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?, + write_layer_files_micros: write_layer_files_micros + .into_recorded() + .ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?, + new_deltas_count: new_deltas_count + .ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?, + new_deltas_size: new_deltas_size + .ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?, + }) + } +} + impl Timeline { /// Level0 files first phase of compaction, explained in the [`compact_inner`] comment. /// @@ -3345,9 +3470,23 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { + let mut stats = CompactLevel0Phase1StatsBuilder { + version: Some(1), + tenant_id: Some(self.tenant_id), + timeline_id: Some(self.timeline_id), + ..Default::default() + }; + + let begin = tokio::time::Instant::now(); let layers = self.layers.read().await; + let now = tokio::time::Instant::now(); + stats.first_read_lock_acquisition_micros = + DurationRecorder::Recorded(RecordedDuration(now - begin), now); let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); + stats.level0_deltas_count = Some(level0_deltas.len()); + stats.get_level0_deltas_plus_drop_lock_micros = + stats.first_read_lock_acquisition_micros.till_now(); // Only compact if enough layers have accumulated. let threshold = self.get_compaction_threshold(); @@ -3468,7 +3607,9 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); + stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now(); let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? + stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now(); let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? @@ -3502,9 +3643,11 @@ impl Timeline { prev = Some(next_key.next()); } drop(layers); + stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now(); let mut holes = heap.into_vec(); holes.sort_unstable_by_key(|hole| hole.key_range.start); let mut next_hole = 0; // index of next hole in holes vector + stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now(); // Merge the contents of all the input delta layers into a new set // of delta layers, based on the current partitioning. 
@@ -3664,8 +3807,26 @@ impl Timeline { layer_paths.pop().unwrap(); } + stats.write_layer_files_micros = stats.sort_holes_micros.till_now(); + stats.new_deltas_count = Some(new_layers.len()); + stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum()); + drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed + match TryInto::::try_into(stats) + .and_then(|stats| serde_json::to_string(&stats).context("serde_json::to_string")) + { + Ok(stats_json) => { + info!( + stats_json = stats_json.as_str(), + "compact_level0_phase1 stats available" + ) + } + Err(e) => { + warn!("compact_level0_phase1 stats failed to serialize: {:#}", e); + } + } + Ok(CompactLevel0Phase1Result { new_layers, deltas_to_compact, From 2023e22ed3620b050473b5171c064df5bc1ba7aa Mon Sep 17 00:00:00 2001 From: Alek Westover Date: Mon, 19 Jun 2023 13:14:20 -0400 Subject: [PATCH 08/10] Add `RelationError` error type to pageserver rather than string parsing error messages (#4508) --- pageserver/src/import_datadir.rs | 12 +++--- pageserver/src/pgdatadir_mapping.rs | 58 ++++++++++++++++++----------- pageserver/src/walingest.rs | 7 +++- 3 files changed, 48 insertions(+), 29 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 9ad0124a80..5bff5337bd 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -148,17 +148,17 @@ async fn import_rel( // because there is no guarantee about the order in which we are processing segments. // ignore "relation already exists" error // - // FIXME: use proper error type for this, instead of parsing the error message. - // Or better yet, keep track of which relations we've already created + // FIXME: Keep track of which relations we've already created? // https://github.com/neondatabase/neon/issues/3309 if let Err(e) = modification .put_rel_creation(rel, nblocks as u32, ctx) .await { - if e.to_string().contains("already exists") { - debug!("relation {} already exists. we must be extending it", rel); - } else { - return Err(e); + match e { + RelationError::AlreadyExists => { + debug!("Relation {} already exist. We must be extending it.", rel) + } + _ => return Err(e.into()), } } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 86c84ec82f..998c199ba6 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -43,6 +43,16 @@ pub enum CalculateLogicalSizeError { Other(#[from] anyhow::Error), } +#[derive(Debug, thiserror::Error)] +pub enum RelationError { + #[error("Relation Already Exists")] + AlreadyExists, + #[error("invalid relnode")] + InvalidRelnode, + #[error(transparent)] + Other(#[from] anyhow::Error), +} + /// /// This impl provides all the functionality to store PostgreSQL relations, SLRUs, /// and other special kinds of files, in a versioned key-value store. 
The @@ -101,9 +111,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } let nblocks = self.get_rel_size(tag, lsn, latest, ctx).await?; @@ -148,9 +158,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } if let Some(nblocks) = self.get_cached_rel_size(&tag, lsn) { @@ -193,9 +203,9 @@ impl Timeline { ctx: &RequestContext, ) -> Result { if tag.relnode == 0 { - return Err(PageReconstructError::Other(anyhow::anyhow!( - "invalid relnode" - ))); + return Err(PageReconstructError::Other( + RelationError::InvalidRelnode.into(), + )); } // first try to lookup relation in cache @@ -724,7 +734,7 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, rec: NeonWalRecord, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); self.put(rel_block_to_key(rel, blknum), Value::WalRecord(rec)); Ok(()) } @@ -751,7 +761,7 @@ impl<'a> DatadirModification<'a> { blknum: BlockNumber, img: Bytes, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); self.put(rel_block_to_key(rel, blknum), Value::Image(img)); Ok(()) } @@ -875,32 +885,38 @@ impl<'a> DatadirModification<'a> { rel: RelTag, nblocks: BlockNumber, ctx: &RequestContext, - ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + ) -> Result<(), RelationError> { + if rel.relnode == 0 { + return Err(RelationError::AlreadyExists); + } // It's possible that this is the first rel for this db in this // tablespace. Create the reldir entry for it if so. - let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?; + let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?) + .context("deserialize db")?; let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); let mut rel_dir = if dbdir.dbdirs.get(&(rel.spcnode, rel.dbnode)).is_none() { // Didn't exist. Update dbdir dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false); - let buf = DbDirectory::ser(&dbdir)?; + let buf = DbDirectory::ser(&dbdir).context("serialize db")?; self.put(DBDIR_KEY, Value::Image(buf.into())); // and create the RelDirectory RelDirectory::default() } else { // reldir already exists, fetch it - RelDirectory::des(&self.get(rel_dir_key, ctx).await?)? + RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?) + .context("deserialize db")? 
}; // Add the new relation to the rel directory entry, and write it back if !rel_dir.rels.insert((rel.relnode, rel.forknum)) { - anyhow::bail!("rel {rel} already exists"); + return Err(RelationError::AlreadyExists); } self.put( rel_dir_key, - Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)), + Value::Image(Bytes::from( + RelDirectory::ser(&rel_dir).context("serialize")?, + )), ); // Put size @@ -925,7 +941,7 @@ impl<'a> DatadirModification<'a> { nblocks: BlockNumber, ctx: &RequestContext, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); let last_lsn = self.tline.get_last_record_lsn(); if self.tline.get_rel_exists(rel, last_lsn, true, ctx).await? { let size_key = rel_size_to_key(rel); @@ -956,7 +972,7 @@ impl<'a> DatadirModification<'a> { nblocks: BlockNumber, ctx: &RequestContext, ) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); // Put size let size_key = rel_size_to_key(rel); @@ -977,7 +993,7 @@ impl<'a> DatadirModification<'a> { /// Drop a relation. pub async fn put_rel_drop(&mut self, rel: RelTag, ctx: &RequestContext) -> anyhow::Result<()> { - anyhow::ensure!(rel.relnode != 0, "invalid relnode"); + anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode); // Remove it from the directory entry let dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 68cf2a4645..8d4c1842bd 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -25,7 +25,7 @@ use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::{Buf, Bytes, BytesMut}; use tracing::*; @@ -1082,7 +1082,10 @@ impl<'a> WalIngest<'a> { .await? { // create it with 0 size initially, the logic below will extend it - modification.put_rel_creation(rel, 0, ctx).await?; + modification + .put_rel_creation(rel, 0, ctx) + .await + .context("Relation Error")?; 0 } else { self.timeline.get_rel_size(rel, last_lsn, true, ctx).await? From 90e1f629e8ab6780ff3e2c25d03f822dd35062b6 Mon Sep 17 00:00:00 2001 From: bojanserafimov Date: Tue, 20 Jun 2023 11:38:59 -0400 Subject: [PATCH 09/10] Add test for `skip_pg_catalog_updates` (#4530) --- control_plane/src/endpoint.rs | 9 ++++++++- test_runner/fixtures/neon_fixtures.py | 11 +++++++++++ test_runner/performance/test_startup.py | 10 +++++++++- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index d3131ac476..52683ff1c3 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -67,6 +67,7 @@ pub struct EndpointConf { pg_port: u16, http_port: u16, pg_version: u32, + skip_pg_catalog_updates: bool, } // @@ -135,6 +136,7 @@ impl ComputeControlPlane { mode, tenant_id, pg_version, + skip_pg_catalog_updates: false, }); ep.create_endpoint_dir()?; @@ -148,6 +150,7 @@ impl ComputeControlPlane { http_port, pg_port, pg_version, + skip_pg_catalog_updates: false, })?, )?; std::fs::write( @@ -183,6 +186,9 @@ pub struct Endpoint { // the endpoint runs in. 
pub env: LocalEnv, pageserver: Arc, + + // Optimizations + skip_pg_catalog_updates: bool, } impl Endpoint { @@ -216,6 +222,7 @@ impl Endpoint { mode: conf.mode, tenant_id: conf.tenant_id, pg_version: conf.pg_version, + skip_pg_catalog_updates: conf.skip_pg_catalog_updates, }) } @@ -450,7 +457,7 @@ impl Endpoint { // Create spec file let spec = ComputeSpec { - skip_pg_catalog_updates: false, + skip_pg_catalog_updates: self.skip_pg_catalog_updates, format_version: 1.0, operation_uuid: None, cluster: Cluster { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 64c71d2a59..e56bf78019 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -2415,6 +2415,17 @@ class Endpoint(PgProtocol): return self + def respec(self, **kwargs): + """Update the endpoint.json file used by control_plane.""" + # Read config + config_path = os.path.join(self.endpoint_path(), "endpoint.json") + with open(config_path, "r") as f: + data_dict = json.load(f) + + # Write it back updated + with open(config_path, "w") as file: + json.dump(dict(data_dict, **kwargs), file, indent=4) + def stop(self) -> "Endpoint": """ Stop the Postgres instance if it's running. diff --git a/test_runner/performance/test_startup.py b/test_runner/performance/test_startup.py index 9c45088d62..8babbbe132 100644 --- a/test_runner/performance/test_startup.py +++ b/test_runner/performance/test_startup.py @@ -32,13 +32,18 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc env.neon_cli.create_branch("test_startup") + endpoint = None + # We do two iterations so we can see if the second startup is faster. It should # be because the compute node should already be configured with roles, databases, # extensions, etc from the first run. for i in range(2): # Start with zenbenchmark.record_duration(f"{i}_start_and_select"): - endpoint = env.endpoints.create_start("test_startup") + if endpoint: + endpoint.start() + else: + endpoint = env.endpoints.create_start("test_startup") endpoint.safe_psql("select 1;") # Get metrics @@ -57,6 +62,9 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc # Stop so we can restart endpoint.stop() + # Imitate optimizations that console would do for the second start + endpoint.respec(skip_pg_catalog_updates=True) + # This test sometimes runs for longer than the global 5 minute timeout. @pytest.mark.timeout(600) From b4c5beff9fee5980379b67607e2e3e3d6b6058b5 Mon Sep 17 00:00:00 2001 From: Alek Westover Date: Tue, 20 Jun 2023 15:36:28 -0400 Subject: [PATCH 10/10] `list_files` function in `remote_storage` (#4522) --- libs/remote_storage/src/lib.rs | 29 ++++ libs/remote_storage/src/local_fs.rs | 36 ++++ libs/remote_storage/src/s3_bucket.rs | 45 +++++ libs/remote_storage/src/simulate_failures.rs | 5 + libs/remote_storage/tests/test_real_s3.rs | 163 ++++++++++++++++++- 5 files changed, 277 insertions(+), 1 deletion(-) diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index ac1f8a357e..0e9c237e1e 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -70,6 +70,14 @@ impl RemotePath { pub fn join(&self, segment: &Path) -> Self { Self(self.0.join(segment)) } + + pub fn get_path(&self) -> &PathBuf { + &self.0 + } + + pub fn extension(&self) -> Option<&str> { + self.0.extension()?.to_str() + } } /// Storage (potentially remote) API to manage its state. 
@@ -86,6 +94,19 @@ pub trait RemoteStorage: Send + Sync + 'static { prefix: Option<&RemotePath>, ) -> Result, DownloadError>; + /// Lists all files in directory "recursively" + /// (not really recursively, because AWS has a flat namespace) + /// Note: This is subtely different than list_prefixes, + /// because it is for listing files instead of listing + /// names sharing common prefixes. + /// For example, + /// list_files("foo/bar") = ["foo/bar/cat123.txt", + /// "foo/bar/cat567.txt", "foo/bar/dog123.txt", "foo/bar/dog456.txt"] + /// whereas, + /// list_prefixes("foo/bar/") = ["cat", "dog"] + /// See `test_real_s3.rs` for more details. + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result>; + /// Streams the local file contents into remote into the remote storage entry. async fn upload( &self, @@ -174,6 +195,14 @@ impl GenericRemoteStorage { } } + pub async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + match self { + Self::LocalFs(s) => s.list_files(folder).await, + Self::AwsS3(s) => s.list_files(folder).await, + Self::Unreliable(s) => s.list_files(folder).await, + } + } + pub async fn upload( &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 59304c2481..ca5fbd5de5 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -48,6 +48,14 @@ impl LocalFs { Ok(Self { storage_root }) } + // mirrors S3Bucket::s3_object_to_relative_path + fn local_file_to_relative_path(&self, key: PathBuf) -> RemotePath { + let relative_path = key + .strip_prefix(&self.storage_root) + .expect("relative path must contain storage_root as prefix"); + RemotePath(relative_path.into()) + } + async fn read_storage_metadata( &self, file_path: &Path, @@ -132,6 +140,34 @@ impl RemoteStorage for LocalFs { Ok(prefixes) } + // recursively lists all files in a directory, + // mirroring the `list_files` for `s3_bucket` + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + let full_path = match folder { + Some(folder) => folder.with_base(&self.storage_root), + None => self.storage_root.clone(), + }; + let mut files = vec![]; + let mut directory_queue = vec![full_path.clone()]; + + while !directory_queue.is_empty() { + let cur_folder = directory_queue + .pop() + .expect("queue cannot be empty: we just checked"); + let mut entries = fs::read_dir(cur_folder.clone()).await?; + while let Some(entry) = entries.next_entry().await? 
{ + let file_name: PathBuf = entry.file_name().into(); + let full_file_name = cur_folder.clone().join(&file_name); + let file_remote_path = self.local_file_to_relative_path(full_file_name.clone()); + files.push(file_remote_path.clone()); + if full_file_name.is_dir() { + directory_queue.push(full_file_name); + } + } + } + Ok(files) + } + async fn upload( &self, data: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index dafb6dcb45..43d818dfb9 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -347,6 +347,51 @@ impl RemoteStorage for S3Bucket { Ok(document_keys) } + /// See the doc for `RemoteStorage::list_files` + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + let folder_name = folder + .map(|p| self.relative_path_to_s3_object(p)) + .or_else(|| self.prefix_in_bucket.clone()); + + // AWS may need to break the response into several parts + let mut continuation_token = None; + let mut all_files = vec![]; + loop { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 list_files")?; + metrics::inc_list_objects(); + + let response = self + .client + .list_objects_v2() + .bucket(self.bucket_name.clone()) + .set_prefix(folder_name.clone()) + .set_continuation_token(continuation_token) + .set_max_keys(self.max_keys_per_list_response) + .send() + .await + .map_err(|e| { + metrics::inc_list_objects_fail(); + e + }) + .context("Failed to list files in S3 bucket")?; + + for object in response.contents().unwrap_or_default() { + let object_path = object.key().expect("response does not contain a key"); + let remote_path = self.s3_object_to_relative_path(object_path); + all_files.push(remote_path); + } + match response.next_continuation_token { + Some(new_token) => continuation_token = Some(new_token), + None => break, + } + } + Ok(all_files) + } + async fn upload( &self, from: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index 741c18bf6f..c46ca14ace 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -83,6 +83,11 @@ impl RemoteStorage for UnreliableWrapper { self.inner.list_prefixes(prefix).await } + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?; + self.inner.list_files(folder).await + } + async fn upload( &self, data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 5f52b0754c..6fe65a0362 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -88,6 +88,58 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> any Ok(()) } +/// Tests that S3 client can list all files in a folder, even if the response comes paginated and requirees multiple S3 queries. +/// Uses real S3 and requires [`ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME`] and related S3 cred env vars specified. Test will skip real code and pass if env vars not set. +/// See `s3_pagination_should_work` for more information. 
+/// +/// First, create a set of S3 objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_s3_data`] +/// Then performs the following queries: +/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt` +/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt` +#[test_context(MaybeEnabledS3WithSimpleTestBlobs)] +#[tokio::test] +async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledS3WithSimpleTestBlobs::Enabled(ctx) => ctx, + MaybeEnabledS3WithSimpleTestBlobs::Disabled => return Ok(()), + MaybeEnabledS3WithSimpleTestBlobs::UploadsFailed(e, _) => { + anyhow::bail!("S3 init failed: {e:?}") + } + }; + let test_client = Arc::clone(&ctx.enabled.client); + let base_prefix = + RemotePath::new(Path::new("folder1")).context("common_prefix construction")?; + let root_files = test_client + .list_files(None) + .await + .context("client list root files failure")? + .into_iter() + .collect::>(); + assert_eq!( + root_files, + ctx.remote_blobs.clone(), + "remote storage list_files on root mismatches with the uploads." + ); + let nested_remote_files = test_client + .list_files(Some(&base_prefix)) + .await + .context("client list nested files failure")? + .into_iter() + .collect::>(); + let trim_remote_blobs: HashSet<_> = ctx + .remote_blobs + .iter() + .map(|x| x.get_path().to_str().expect("must be valid name")) + .filter(|x| x.starts_with("folder1")) + .map(|x| RemotePath::new(Path::new(x)).expect("must be valid name")) + .collect(); + assert_eq!( + nested_remote_files, trim_remote_blobs, + "remote storage list_files on subdirrectory mismatches with the uploads." + ); + Ok(()) +} + #[test_context(MaybeEnabledS3)] #[tokio::test] async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()> { @@ -248,6 +300,66 @@ impl AsyncTestContext for MaybeEnabledS3WithTestBlobs { } } +// NOTE: the setups for the list_prefixes test and the list_files test are very similar +// However, they are not idential. The list_prefixes function is concerned with listing prefixes, +// whereas the list_files function is concerned with listing files. 
+// See `RemoteStorage::list_files` documentation for more details +enum MaybeEnabledS3WithSimpleTestBlobs { + Enabled(S3WithSimpleTestBlobs), + Disabled, + UploadsFailed(anyhow::Error, S3WithSimpleTestBlobs), +} +struct S3WithSimpleTestBlobs { + enabled: EnabledS3, + remote_blobs: HashSet, +} + +#[async_trait::async_trait] +impl AsyncTestContext for MaybeEnabledS3WithSimpleTestBlobs { + async fn setup() -> Self { + ensure_logging_ready(); + if env::var(ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { + info!( + "`{}` env variable is not set, skipping the test", + ENABLE_REAL_S3_REMOTE_STORAGE_ENV_VAR_NAME + ); + return Self::Disabled; + } + + let max_keys_in_list_response = 10; + let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap()); + + let enabled = EnabledS3::setup(Some(max_keys_in_list_response)).await; + + match upload_simple_s3_data(&enabled.client, upload_tasks_count).await { + ControlFlow::Continue(uploads) => { + info!("Remote objects created successfully"); + + Self::Enabled(S3WithSimpleTestBlobs { + enabled, + remote_blobs: uploads, + }) + } + ControlFlow::Break(uploads) => Self::UploadsFailed( + anyhow::anyhow!("One or multiple blobs failed to upload to S3"), + S3WithSimpleTestBlobs { + enabled, + remote_blobs: uploads, + }, + ), + } + } + + async fn teardown(self) { + match self { + Self::Disabled => {} + Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => { + cleanup(&ctx.enabled.client, ctx.remote_blobs).await; + } + } + } +} + fn create_s3_client( max_keys_per_list_response: Option, ) -> anyhow::Result> { @@ -258,7 +370,7 @@ fn create_s3_client( let random_prefix_part = std::time::SystemTime::now() .duration_since(UNIX_EPOCH) .context("random s3 test prefix part calculation")? - .as_millis(); + .as_nanos(); let remote_storage_config = RemoteStorageConfig { max_concurrent_syncs: NonZeroUsize::new(100).unwrap(), max_sync_errors: NonZeroU32::new(5).unwrap(), @@ -364,3 +476,52 @@ async fn cleanup(client: &Arc, objects_to_delete: HashSet< } } } + +// Uploads files `folder{j}/blob{i}.txt`. See test description for more details. +async fn upload_simple_s3_data( + client: &Arc, + upload_tasks_count: usize, +) -> ControlFlow, HashSet> { + info!("Creating {upload_tasks_count} S3 files"); + let mut upload_tasks = JoinSet::new(); + for i in 1..upload_tasks_count + 1 { + let task_client = Arc::clone(client); + upload_tasks.spawn(async move { + let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i)); + let blob_path = RemotePath::new(&blob_path) + .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?; + debug!("Creating remote item {i} at path {blob_path:?}"); + + let data = format!("remote blob data {i}").into_bytes(); + let data_len = data.len(); + task_client + .upload(std::io::Cursor::new(data), data_len, &blob_path, None) + .await?; + + Ok::<_, anyhow::Error>(blob_path) + }); + } + + let mut upload_tasks_failed = false; + let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count); + while let Some(task_run_result) = upload_tasks.join_next().await { + match task_run_result + .context("task join failed") + .and_then(|task_result| task_result.context("upload task failed")) + { + Ok(upload_path) => { + uploaded_blobs.insert(upload_path); + } + Err(e) => { + error!("Upload task failed: {e:?}"); + upload_tasks_failed = true; + } + } + } + + if upload_tasks_failed { + ControlFlow::Break(uploaded_blobs) + } else { + ControlFlow::Continue(uploaded_blobs) + } +}
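For illustration, a minimal sketch of how a caller might use the new `list_files`
API, assuming an already-configured `GenericRemoteStorage` handle (the function
name, the "folder1" prefix, and the extension filter are illustrative, not part of
this patch):

```rust
use std::path::Path;

use anyhow::Context;
use remote_storage::{GenericRemoteStorage, RemotePath};

// List every object under "folder1" (relative to the configured prefix) and
// print the ones with a ".txt" extension, using the new RemotePath accessors.
async fn print_txt_objects(storage: &GenericRemoteStorage) -> anyhow::Result<()> {
    let folder = RemotePath::new(Path::new("folder1")).context("folder path construction")?;
    let files = storage.list_files(Some(&folder)).await?;
    for file in files {
        if file.extension() == Some("txt") {
            println!("{}", file.get_path().display());
        }
    }
    Ok(())
}
```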