From 5cff7d1de9862d8a826c4454d997a193ffa26ba6 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Mon, 13 Dec 2021 23:36:03 +0200 Subject: [PATCH] Use proper download order --- Cargo.lock | 31 ++++++++++ pageserver/Cargo.toml | 1 + pageserver/src/layered_repository.rs | 26 +++++---- pageserver/src/remote_storage.rs | 2 +- pageserver/src/remote_storage/storage_sync.rs | 56 ++++++++++++------- .../remote_storage/storage_sync/download.rs | 35 ++++++++---- 6 files changed, 107 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0df9930b80..c4e130c3ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1164,6 +1164,7 @@ dependencies = [ "tokio", "toml", "tracing", + "tracing-futures", "url", "workspace_hack", "zenith_metrics", @@ -1236,6 +1237,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -2113,6 +2134,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.1.2" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 64f825ded5..571d8938a2 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -35,6 +35,7 @@ scopeguard = "1.1.0" async-trait = "0.1" const_format = "0.2.21" tracing = "0.1.27" 
+tracing-futures = "0.2" signal-hook = "0.3.10" url = "2" nix = "0.23" diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index e6b72bcd0b..8d5c9acf70 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -343,19 +343,23 @@ impl Repository for LayeredRepository { fn shutdown_timeline( tenant_id: ZTenantId, - timelineid: ZTimelineId, + timeline_id: ZTimelineId, timeline: &LayeredTimelineEntry, ) -> Result<(), anyhow::Error> { - if let Some(timeline) = timeline.local_or_schedule_download(tenant_id) { - timeline - .upload_relishes - .store(false, atomic::Ordering::Relaxed); - walreceiver::stop_wal_receiver(timelineid); - trace!("repo shutdown. checkpoint timeline {}", timelineid); - timeline.checkpoint(CheckpointConfig::Forced)?; - //TODO Wait for walredo process to shutdown too - } else { - warn!("Skpping shutdown of a remote timeline"); + match timeline { + LayeredTimelineEntry::Local(timeline) => { + timeline + .upload_relishes + .store(false, atomic::Ordering::Relaxed); + walreceiver::stop_wal_receiver(timeline_id); + trace!("repo shutdown. 
checkpoint timeline {}", timeline_id); + timeline.checkpoint(CheckpointConfig::Forced)?; + //TODO Wait for walredo process to shutdown too + } + LayeredTimelineEntry::Remote(_) => warn!( + "Skipping shutdown of a remote timeline {} for tenant {}", + timeline_id, tenant_id + ), } Ok(()) } diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index 54661b3f00..5e3e987186 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -112,7 +112,7 @@ pub struct TimelineSyncId(ZTenantId, ZTimelineId); impl std::fmt::Display for TimelineSyncId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "(tenant id: {}, timeline id: {})", self.0, self.1) + write!(f, "(tenant: {}, timeline: {})", self.0, self.1) } } diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index 5ddf872b82..b1c878e760 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -135,7 +135,7 @@ lazy_static! { /// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning. 
mod sync_queue { use std::{ - collections::BTreeSet, + collections::{BTreeSet, HashMap}, sync::atomic::{AtomicUsize, Ordering}, }; @@ -199,17 +199,16 @@ mod sync_queue { receiver: &mut UnboundedReceiver, mut max_batch_size: usize, ) -> BTreeSet { - let mut tasks = BTreeSet::new(); - if max_batch_size == 0 { - return tasks; + return BTreeSet::new(); } + let mut tasks = HashMap::with_capacity(max_batch_size); loop { match receiver.try_recv() { Ok(new_task) => { LENGTH.fetch_sub(1, Ordering::Relaxed); - if tasks.insert(new_task) { + if tasks.insert(new_task.sync_id, new_task).is_none() { max_batch_size -= 1; if max_batch_size == 0 { break; @@ -227,7 +226,7 @@ mod sync_queue { } } - tasks + tasks.into_values().collect() } /// Length of the queue, assuming that all receiver counterparts were only called using the queue api. @@ -264,6 +263,15 @@ enum SyncKind { Upload(NewCheckpoint), } +impl SyncKind { + fn sync_name(&self) -> &'static str { + match self { + Self::Download(_) => "download", + Self::Upload(_) => "upload", + } + } +} + /// Local timeline files for upload, appeared after the new checkpoint. /// Current checkpoint design assumes new files are added only, no deletions or amendment happens. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] @@ -405,15 +413,19 @@ fn storage_sync_loop< ) -> anyhow::Result<()> { let remote_assets = Arc::new((storage, RwLock::new(index))); while !crate::tenant_mgr::shutdown_requested() { - let new_timeline_states = runtime.block_on(loop_step( - conf, - &mut receiver, - Arc::clone(&remote_assets), - max_concurrent_sync, - max_sync_errors, - )); + let new_timeline_states = runtime.block_on( + loop_step( + conf, + &mut receiver, + Arc::clone(&remote_assets), + max_concurrent_sync, + max_sync_errors, + ) + .instrument(debug_span!("storage_sync_loop_step")), + ); // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. 
set_timeline_states(conf, new_timeline_states); + debug!("Sync loop step completed"); } debug!("Shutdown requested, stopping"); @@ -458,12 +470,13 @@ async fn loop_step< .into_iter() .map(|task| async { let sync_id = task.sync_id; - let extra_step = match tokio::spawn(process_task( - conf, - Arc::clone(&remote_assets), - task, - max_sync_errors, - )) + let attempt = task.retries; + let sync_name = task.kind.sync_name(); + + let extra_step = match tokio::spawn( + process_task(conf, Arc::clone(&remote_assets), task, max_sync_errors) + .instrument(debug_span!("", sync_id = %sync_id, attempt, sync_name)), + ) .await { Ok(extra_step) => extra_step, @@ -522,6 +535,7 @@ async fn process_task< } let sync_start = Instant::now(); + let sync_name = task.kind.sync_name(); match task.kind { SyncKind::Download(download_data) => { let sync_status = download_timeline( @@ -532,7 +546,7 @@ async fn process_task< task.retries + 1, ) .await; - register_sync_status(sync_start, "download", sync_status); + register_sync_status(sync_start, sync_name, sync_status); if sync_status? 
{ Some(TimelineSyncState::Ready) @@ -549,7 +563,7 @@ async fn process_task< task.retries + 1, ) .await; - register_sync_status(sync_start, "upload", sync_status); + register_sync_status(sync_start, sync_name, sync_status); None } } diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs index c8fd2ee2d1..8b7af821ed 100644 --- a/pageserver/src/remote_storage/storage_sync/download.rs +++ b/pageserver/src/remote_storage/storage_sync/download.rs @@ -6,7 +6,7 @@ use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc}; use anyhow::{anyhow, ensure, Context}; use futures::{stream::FuturesUnordered, StreamExt}; use tokio::{fs, sync::RwLock}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, trace, warn}; use zenith_utils::zid::ZTenantId; use crate::{ @@ -91,7 +91,8 @@ pub(super) async fn download_timeline< } }; - let mut archives_to_download = remote_timeline + debug!("Downloading timeline archives"); + let archives_to_download = remote_timeline .checkpoints() .map(ArchiveId) .filter(|remote_archive| !download.archives_to_skip.contains(remote_archive)) @@ -99,8 +100,9 @@ pub(super) async fn download_timeline< let archives_total = archives_to_download.len(); debug!("Downloading {} archives of a timeline", archives_total); + trace!("Archives to download: {:?}", archives_to_download); - while let Some(archive_id) = archives_to_download.pop() { + for (archives_downloaded, archive_id) in archives_to_download.into_iter().enumerate() { match try_download_archive( conf, sync_id, @@ -112,10 +114,10 @@ pub(super) async fn download_timeline< .await { Err(e) => { - let archives_left = archives_to_download.len(); + let archives_left = archives_total - archives_downloaded; error!( - "Failed to download archive {:?} for tenant {} timeline {} : {:#}, requeueing the download ({} archives left out of {})", - archive_id, tenant_id, timeline_id, e, archives_left, archives_total + "Failed to 
download archive {:?} (archives downloaded: {}; archives left: {}) for tenant {} timeline {}, requeueing the download: {:#}", + archive_id, archives_downloaded, archives_left, tenant_id, timeline_id, e ); sync_queue::push(SyncTask::new( sync_id, @@ -302,13 +304,9 @@ mod tests { #[tokio::test] async fn test_download_timeline() -> anyhow::Result<()> { - let tempdir = tempdir()?; - let tempdir_path = tempdir.path(); - let _ = zenith_utils::logging::init(tempdir_path.join("log.log"), false); - let repo_harness = RepoHarness::create("test_download_timeline")?; let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir_path.to_owned(), &repo_harness.conf.workdir)?; + let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths( repo_harness.conf, storage @@ -335,6 +333,21 @@ mod tests { regular_timeline, ) .await; + // upload multiple checkpoints for the same timeline + let regular_timeline = create_local_timeline( + &repo_harness, + TIMELINE_ID, + &["c", "d"], + dummy_metadata(Lsn(0x40)), + )?; + ensure_correct_timeline_upload( + &repo_harness, + Arc::clone(&remote_assets), + TIMELINE_ID, + regular_timeline, + ) + .await; + fs::remove_dir_all(&regular_timeline_path).await?; let remote_regular_timeline = expect_timeline(index, sync_id).await;