From 0e44887929daa9851fb0c6239d1011c41cde04b8 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Mon, 28 Mar 2022 22:33:05 +0300 Subject: [PATCH] Show more S3 logs and less verbose WAL logs --- pageserver/src/config.rs | 2 +- pageserver/src/layered_repository.rs | 2 +- pageserver/src/remote_storage/storage_sync.rs | 47 ++++++++++++------- pageserver/src/walreceiver.rs | 2 +- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 0fdfb4ceed..9f7cd34a7a 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -41,7 +41,7 @@ pub mod defaults { pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s"; pub const DEFAULT_SUPERUSER: &str = "zenith_admin"; - pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 100; + pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 10; pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192; diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index a0f1f2d830..56d14fd4e9 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -1594,7 +1594,7 @@ impl LayeredTimeline { self.compact_level0(target_file_size)?; timer.stop_and_record(); } else { - info!("Could not compact because no partitioning specified yet"); + debug!("Could not compact because no partitioning specified yet"); } // Call unload() on all frozen layers, to release memory. 
diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index ddd47ea981..cd6c40b46f 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -443,30 +443,38 @@ fn storage_sync_loop< max_sync_errors: NonZeroU32, ) { let remote_assets = Arc::new((storage, index.clone())); + info!("Starting remote storage sync loop"); loop { let index = index.clone(); let loop_step = runtime.block_on(async { tokio::select! { - new_timeline_states = loop_step( + step = loop_step( conf, &mut receiver, Arc::clone(&remote_assets), max_concurrent_sync, max_sync_errors, ) - .instrument(debug_span!("storage_sync_loop_step")) => LoopStep::SyncStatusUpdates(new_timeline_states), + .instrument(debug_span!("storage_sync_loop_step")) => step, _ = thread_mgr::shutdown_watcher() => LoopStep::Shutdown, } }); match loop_step { LoopStep::SyncStatusUpdates(new_timeline_states) => { - // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. - apply_timeline_sync_status_updates(conf, index, new_timeline_states); - debug!("Sync loop step completed"); + if new_timeline_states.is_empty() { + debug!("Sync loop step completed, no new timeline states"); + } else { + info!( + "Sync loop step completed, {} new timeline state update(s)", + new_timeline_states.len() + ); + // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. 
+ apply_timeline_sync_status_updates(conf, index, new_timeline_states); + } } LoopStep::Shutdown => { - debug!("Shutdown requested, stopping"); + info!("Shutdown requested, stopping"); break; } } @@ -482,7 +490,7 @@ async fn loop_step< remote_assets: Arc<(S, RemoteIndex)>, max_concurrent_sync: NonZeroUsize, max_sync_errors: NonZeroU32, -) -> HashMap> { +) -> LoopStep { let max_concurrent_sync = max_concurrent_sync.get(); let mut next_tasks = Vec::new(); @@ -490,8 +498,7 @@ async fn loop_step< if let Some(first_task) = sync_queue::next_task(receiver).await { next_tasks.push(first_task); } else { - debug!("Shutdown requested, stopping"); - return HashMap::new(); + return LoopStep::Shutdown; }; next_tasks.extend( sync_queue::next_task_batch(receiver, max_concurrent_sync - 1) @@ -500,12 +507,17 @@ async fn loop_step< ); let remaining_queue_length = sync_queue::len(); - debug!( - "Processing {} tasks in batch, more tasks left to process: {}", - next_tasks.len(), - remaining_queue_length - ); REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64); + if remaining_queue_length > 0 || !next_tasks.is_empty() { + info!( + "Processing {} tasks in batch, more tasks left to process: {}", + next_tasks.len(), + remaining_queue_length + ); + } else { + debug!("No tasks to process"); + return LoopStep::SyncStatusUpdates(HashMap::new()); + } let mut task_batch = next_tasks .into_iter() @@ -515,8 +527,9 @@ async fn loop_step< let sync_name = task.kind.sync_name(); let extra_step = match tokio::spawn( - process_task(conf, Arc::clone(&remote_assets), task, max_sync_errors) - .instrument(debug_span!("", sync_id = %sync_id, attempt, sync_name)), + process_task(conf, Arc::clone(&remote_assets), task, max_sync_errors).instrument( + debug_span!("process_sync_task", sync_id = %sync_id, attempt, sync_name), + ), ) .await { @@ -551,7 +564,7 @@ async fn loop_step< } } - new_timeline_states + LoopStep::SyncStatusUpdates(new_timeline_states) } async fn process_task< diff --git 
a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index e382475627..6de0b87478 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -70,7 +70,7 @@ pub fn launch_wal_receiver( match receivers.get_mut(&(tenantid, timelineid)) { Some(receiver) => { - info!("wal receiver already running, updating connection string"); + debug!("wal receiver already running, updating connection string"); receiver.wal_producer_connstr = wal_producer_connstr.into(); } None => {