From 23cf2fa9848436ded35eee5ca8e31f691fd9903e Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 31 Dec 2021 00:45:08 +0200 Subject: [PATCH] Properly shutdown storage sync loop --- pageserver/src/bin/pageserver.rs | 4 +- pageserver/src/branches.rs | 18 +++--- pageserver/src/remote_storage.rs | 5 +- pageserver/src/remote_storage/storage_sync.rs | 59 +++++++++++++------ .../remote_storage/storage_sync/download.rs | 2 +- pageserver/src/tenant_mgr.rs | 2 +- 6 files changed, 60 insertions(+), 30 deletions(-) diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index d3a78b0954..e9f8092a29 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -195,9 +195,10 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() } let signals = signals::install_shutdown_handlers()?; + let (async_shutdown_tx, async_shutdown_rx) = tokio::sync::watch::channel(()); let mut threads = Vec::new(); - let sync_startup = remote_storage::start_local_timeline_sync(conf) + let sync_startup = remote_storage::start_local_timeline_sync(conf, async_shutdown_rx) .context("Failed to set up local files sync with external storage")?; if let Some(handle) = sync_startup.sync_loop_handle { @@ -255,6 +256,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() signal.name() ); + async_shutdown_tx.send(())?; postgres_backend::set_pgbackend_shutdown_requested(); tenant_mgr::shutdown_all_tenants()?; endpoint::shutdown(); diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index f757431bed..32652f3b02 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -45,14 +45,16 @@ impl BranchInfo { repo: &Arc, include_non_incremental_logical_size: bool, ) -> Result { - let name = path - .as_ref() - .file_name() - .unwrap() - .to_str() - .unwrap() - .to_string(); - let timeline_id = std::fs::read_to_string(path)?.parse::()?; + let path = path.as_ref(); + let name = path.file_name().unwrap().to_string_lossy().to_string(); + let timeline_id = std::fs::read_to_string(path) + .with_context(|| { + format!( + "Failed to read branch file contents at path '{}'", + path.display() + ) + })? + .parse::()?; let timeline = match repo.get_timeline(timeline_id)? { RepositoryTimeline::Local(local_entry) => local_entry, diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index 366ed16761..e56ea789ae 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -93,7 +93,7 @@ use std::{ }; use anyhow::{bail, Context}; -use tokio::io; +use tokio::{io, sync}; use tracing::{error, info}; use zenith_utils::zid::{ZTenantId, ZTimelineId}; @@ -135,6 +135,7 @@ pub struct SyncStartupData { /// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states. pub fn start_local_timeline_sync( config: &'static PageServerConf, + shutdown_hook: sync::watch::Receiver<()>, ) -> anyhow::Result { let local_timeline_files = local_tenant_timeline_files(config) .context("Failed to collect local tenant timeline files")?; @@ -142,6 +143,7 @@ pub fn start_local_timeline_sync( match &config.remote_storage_config { Some(storage_config) => match &storage_config.storage { RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread( + shutdown_hook, config, local_timeline_files, LocalFs::new(root.clone(), &config.workdir)?, @@ -149,6 +151,7 @@ pub fn start_local_timeline_sync( storage_config.max_sync_errors, ), RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread( + shutdown_hook, config, local_timeline_files, S3::new(s3_config, &config.workdir)?, diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index 5a52eaf29b..f89964e4bf 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -86,10 +86,15 @@ use std::{ use anyhow::{bail, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use lazy_static::lazy_static; -use tokio::{fs, sync::RwLock}; use tokio::{ - sync::mpsc::{self, UnboundedReceiver}, - time::Instant, + fs, + runtime::Runtime, + sync::{ + mpsc::{self, UnboundedReceiver}, + watch::Receiver, + RwLock, + }, + time::{Duration, Instant}, }; use tracing::*; @@ -346,6 +351,7 @@ pub(super) fn spawn_storage_sync_thread< P: std::fmt::Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, >( + shutdown_hook: Receiver<()>, conf: &'static PageServerConf, local_timeline_files: HashMap)>, storage: S, @@ -384,6 +390,7 @@ pub(super) fn spawn_storage_sync_thread< .spawn(move || { storage_sync_loop( runtime, + shutdown_hook, conf, receiver, remote_index, @@ -399,11 +406,18 @@ pub(super) fn spawn_storage_sync_thread< }) } +enum LoopStep { + NewStates(HashMap>), + Shutdown, +} + +#[allow(clippy::too_many_arguments)] fn storage_sync_loop< P: std::fmt::Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, >( - runtime: tokio::runtime::Runtime, + runtime: Runtime, + mut shutdown_hook: Receiver<()>, conf: &'static PageServerConf, mut receiver: UnboundedReceiver, index: RemoteTimelineIndex, @@ -413,19 +427,28 @@ fn storage_sync_loop< ) -> anyhow::Result<()> { let remote_assets = Arc::new((storage, RwLock::new(index))); while !crate::tenant_mgr::shutdown_requested() { - let new_timeline_states = runtime.block_on( - loop_step( - conf, - &mut receiver, - Arc::clone(&remote_assets), - max_concurrent_sync, - max_sync_errors, - ) - .instrument(debug_span!("storage_sync_loop_step")), - ); - // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. - set_timeline_states(conf, new_timeline_states); - debug!("Sync loop step completed"); + let loop_step = runtime.block_on(async { + tokio::select! { + new_timeline_states = loop_step( + conf, + &mut receiver, + Arc::clone(&remote_assets), + max_concurrent_sync, + max_sync_errors, + ) + .instrument(debug_span!("storage_sync_loop_step")) => LoopStep::NewStates(new_timeline_states), + _ = shutdown_hook.changed() => LoopStep::Shutdown, + } + }); + + match loop_step { + LoopStep::NewStates(new_timeline_states) => { + // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. + set_timeline_states(conf, new_timeline_states); + debug!("Sync loop step completed"); + } + LoopStep::Shutdown => {} + } } debug!("Shutdown requested, stopping"); @@ -539,7 +562,7 @@ async fn process_task< "Waiting {} seconds before starting the task", seconds_to_wait ); - tokio::time::sleep(tokio::time::Duration::from_secs_f64(seconds_to_wait)).await; + tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; } let sync_start = Instant::now(); diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs index a067bddd23..a2d3f7e4ad 100644 --- a/pageserver/src/remote_storage/storage_sync/download.rs +++ b/pageserver/src/remote_storage/storage_sync/download.rs @@ -202,7 +202,7 @@ async fn try_download_archive< archive_to_download.disk_consistent_lsn(), local_metadata.disk_consistent_lsn() ), - Err(e) => warn!("Failed to read local metadata file, assuing it's safe to override its with the download. Read: {:#}", e), + Err(e) => warn!("Failed to read local metadata file, assuming it's safe to override its with the download. Read: {:#}", e), } compression::uncompress_file_stream_with_index( conf.timeline_path(&timeline_id, &tenant_id), diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 884ef09886..42a97cfe84 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -198,7 +198,7 @@ pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result Ok(Arc::clone(repo)), - None => anyhow::bail!("Repository for tenant {} is not yet valid", tenantid), + None => bail!("Repository for tenant {} is not yet valid", tenantid), } }