Show more S3 logs and less verbose WAL logs

This commit is contained in:
Kirill Bulatov
2022-03-28 22:33:05 +03:00
committed by Kirill Bulatov
parent 1aa57fc262
commit 0e44887929
4 changed files with 33 additions and 20 deletions

View File

@@ -41,7 +41,7 @@ pub mod defaults {
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 100;
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 10;
pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;

View File

@@ -1594,7 +1594,7 @@ impl LayeredTimeline {
self.compact_level0(target_file_size)?;
timer.stop_and_record();
} else {
info!("Could not compact because no partitioning specified yet");
debug!("Could not compact because no partitioning specified yet");
}
// Call unload() on all frozen layers, to release memory.

View File

@@ -443,30 +443,38 @@ fn storage_sync_loop<
max_sync_errors: NonZeroU32,
) {
let remote_assets = Arc::new((storage, index.clone()));
info!("Starting remote storage sync loop");
loop {
let index = index.clone();
let loop_step = runtime.block_on(async {
tokio::select! {
new_timeline_states = loop_step(
step = loop_step(
conf,
&mut receiver,
Arc::clone(&remote_assets),
max_concurrent_sync,
max_sync_errors,
)
.instrument(debug_span!("storage_sync_loop_step")) => LoopStep::SyncStatusUpdates(new_timeline_states),
.instrument(debug_span!("storage_sync_loop_step")) => step,
_ = thread_mgr::shutdown_watcher() => LoopStep::Shutdown,
}
});
match loop_step {
LoopStep::SyncStatusUpdates(new_timeline_states) => {
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
apply_timeline_sync_status_updates(conf, index, new_timeline_states);
debug!("Sync loop step completed");
if new_timeline_states.is_empty() {
debug!("Sync loop step completed, no new timeline states");
} else {
info!(
"Sync loop step completed, {} new timeline state update(s)",
new_timeline_states.len()
);
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
apply_timeline_sync_status_updates(conf, index, new_timeline_states);
}
}
LoopStep::Shutdown => {
debug!("Shutdown requested, stopping");
info!("Shutdown requested, stopping");
break;
}
}
@@ -482,7 +490,7 @@ async fn loop_step<
remote_assets: Arc<(S, RemoteIndex)>,
max_concurrent_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncStatusUpdate>> {
) -> LoopStep {
let max_concurrent_sync = max_concurrent_sync.get();
let mut next_tasks = Vec::new();
@@ -490,8 +498,7 @@ async fn loop_step<
if let Some(first_task) = sync_queue::next_task(receiver).await {
next_tasks.push(first_task);
} else {
debug!("Shutdown requested, stopping");
return HashMap::new();
return LoopStep::Shutdown;
};
next_tasks.extend(
sync_queue::next_task_batch(receiver, max_concurrent_sync - 1)
@@ -500,12 +507,17 @@ async fn loop_step<
);
let remaining_queue_length = sync_queue::len();
debug!(
"Processing {} tasks in batch, more tasks left to process: {}",
next_tasks.len(),
remaining_queue_length
);
REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64);
if remaining_queue_length > 0 || !next_tasks.is_empty() {
info!(
"Processing {} tasks in batch, more tasks left to process: {}",
next_tasks.len(),
remaining_queue_length
);
} else {
debug!("No tasks to process");
return LoopStep::SyncStatusUpdates(HashMap::new());
}
let mut task_batch = next_tasks
.into_iter()
@@ -515,8 +527,9 @@ async fn loop_step<
let sync_name = task.kind.sync_name();
let extra_step = match tokio::spawn(
process_task(conf, Arc::clone(&remote_assets), task, max_sync_errors)
.instrument(debug_span!("", sync_id = %sync_id, attempt, sync_name)),
process_task(conf, Arc::clone(&remote_assets), task, max_sync_errors).instrument(
debug_span!("process_sync_task", sync_id = %sync_id, attempt, sync_name),
),
)
.await
{
@@ -551,7 +564,7 @@ async fn loop_step<
}
}
new_timeline_states
LoopStep::SyncStatusUpdates(new_timeline_states)
}
async fn process_task<

View File

@@ -70,7 +70,7 @@ pub fn launch_wal_receiver(
match receivers.get_mut(&(tenantid, timelineid)) {
Some(receiver) => {
info!("wal receiver already running, updating connection string");
debug!("wal receiver already running, updating connection string");
receiver.wal_producer_connstr = wal_producer_connstr.into();
}
None => {