Show more logs during S3 sync

This commit is contained in:
Kirill Bulatov
2022-04-20 00:46:29 +03:00
committed by Kirill Bulatov
parent 3e6087a12f
commit 91fb21225a
3 changed files with 83 additions and 128 deletions

View File

@@ -165,10 +165,7 @@ mod sync_queue {
if let Some(sender) = SENDER.get() {
match sender.send((sync_id, new_task)) {
Err(e) => {
warn!(
"Failed to enqueue a sync task: the receiver is dropped: {}",
e
);
warn!("Failed to enqueue a sync task: the receiver is dropped: {e}");
false
}
Ok(()) => {
@@ -429,15 +426,9 @@ pub fn schedule_timeline_checkpoint_upload(
metadata,
}),
) {
warn!(
"Could not send an upload task for tenant {}, timeline {}",
tenant_id, timeline_id
)
warn!("Could not send an upload task for tenant {tenant_id}, timeline {timeline_id}",)
} else {
debug!(
"Upload task for tenant {}, timeline {} sent",
tenant_id, timeline_id
)
debug!("Upload task for tenant {tenant_id}, timeline {timeline_id} sent")
}
}
@@ -449,10 +440,7 @@ pub fn schedule_timeline_checkpoint_upload(
///
/// Ensure that the loop is started otherwise the task is never processed.
pub fn schedule_timeline_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) {
debug!(
"Scheduling timeline download for tenant {}, timeline {}",
tenant_id, timeline_id
);
debug!("Scheduling timeline download for tenant {tenant_id}, timeline {timeline_id}");
sync_queue::push(
ZTenantTimelineId {
tenant_id,
@@ -614,11 +602,7 @@ where
let remaining_queue_length = sync_queue::len();
REMAINING_SYNC_ITEMS.set(remaining_queue_length as i64);
if remaining_queue_length > 0 || !batched_tasks.is_empty() {
info!(
"Processing tasks for {} timelines in batch, more tasks left to process: {}",
batched_tasks.len(),
remaining_queue_length
);
info!("Processing tasks for {} timelines in batch, more tasks left to process: {remaining_queue_length}", batched_tasks.len());
} else {
debug!("No tasks to process");
return ControlFlow::Continue(HashMap::new());
@@ -644,7 +628,7 @@ where
HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
> = HashMap::with_capacity(max_concurrent_sync);
while let Some((sync_id, state_update)) = sync_results.next().await {
debug!("Finished storage sync task for sync id {}", sync_id);
debug!("Finished storage sync task for sync id {sync_id}");
if let Some(state_update) = state_update {
new_timeline_states
.entry(sync_id.tenant_id)
@@ -693,7 +677,7 @@ where
)
.await
{
error!("Failed to update remote timeline {}: {:?}", sync_id, e);
error!("Failed to update remote timeline {sync_id}: {e:?}");
}
}
SyncTask::DownloadAndUpload(_, failed_upload_data) => {
@@ -712,7 +696,7 @@ where
)
.await
{
error!("Failed to update remote timeline {}: {:?}", sync_id, e);
error!("Failed to update remote timeline {sync_id}: {e:?}");
}
}
}
@@ -720,18 +704,17 @@ where
}
};
let task_name = task.name();
let current_task_attempt = task.retries();
info!("Sync task '{task_name}' processing started, attempt #{current_task_attempt}");
if current_task_attempt > 0 {
let seconds_to_wait = 2.0_f64.powf(current_task_attempt as f64 - 1.0).min(30.0);
debug!(
"Waiting {} seconds before starting the task",
seconds_to_wait
);
info!("Waiting {seconds_to_wait} seconds before starting the '{task_name}' task");
tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await;
}
let task_name = task.name();
match task {
let status_update = match task {
SyncTask::Download(new_download_data) => {
download_timeline(
conf,
@@ -782,7 +765,11 @@ where
status_update
}
}
};
info!("Finished processing the task");
status_update
}
async fn download_timeline<P, S>(
@@ -804,10 +791,7 @@ where
DownloadedTimeline::Abort => {
register_sync_status(sync_start, task_name, None);
if let Err(e) = index.write().await.set_awaits_download(&sync_id, false) {
error!(
"Timeline {} was expected to be in the remote index after a download attempt, but it's absent: {:?}",
sync_id, e
);
error!("Timeline {sync_id} was expected to be in the remote index after a download attempt, but it's absent: {e:?}");
}
None
}
@@ -823,15 +807,12 @@ where
Some(TimelineSyncStatusUpdate::Downloaded)
}
Err(e) => {
error!(
"Timeline {} was expected to be in the remote index after a sucessful download, but it's absent: {:?}",
sync_id, e
);
error!("Timeline {sync_id} was expected to be in the remote index after a sucessful download, but it's absent: {e:?}");
None
}
},
Err(e) => {
error!("Failed to update local timeline metadata: {:?}", e);
error!("Failed to update local timeline metadata: {e:?}");
download_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Download(download_data));
register_sync_status(sync_start, task_name, Some(false));
@@ -873,10 +854,7 @@ async fn update_local_metadata(
};
if local_lsn < Some(remote_lsn) {
info!(
"Updating local timeline metadata from remote timeline: local disk_consistent_lsn={:?}, remote disk_consistent_lsn={}",
local_lsn, remote_lsn
);
info!("Updating local timeline metadata from remote timeline: local disk_consistent_lsn={local_lsn:?}, remote disk_consistent_lsn={remote_lsn}");
let remote_metadata_bytes = remote_metadata
.to_bytes()
@@ -890,7 +868,7 @@ async fn update_local_metadata(
)
})?;
} else {
info!("Local metadata at path '{}' has later disk consistent Lsn ({:?}) than the remote one ({}), skipping the update", local_metadata_path.display(), local_lsn, remote_lsn);
info!("Local metadata at path '{}' has later disk consistent Lsn ({local_lsn:?}) than the remote one ({remote_lsn}), skipping the update", local_metadata_path.display());
}
Ok(())
@@ -933,9 +911,8 @@ async fn upload_timeline<P, S>(
Ok(metadata) => metadata,
Err(e) => {
error!(
"Failed to load local metadata from path '{}': {:?}",
local_metadata_path.display(),
e
"Failed to load local metadata from path '{}': {e:?}",
local_metadata_path.display()
);
outdated_upload_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Upload(outdated_upload_data));
@@ -952,7 +929,7 @@ async fn upload_timeline<P, S>(
match update_remote_data(conf, storage, index, sync_id, &uploaded_data.data, false).await {
Ok(()) => register_sync_status(sync_start, task_name, Some(true)),
Err(e) => {
error!("Failed to update remote timeline {}: {:?}", sync_id, e);
error!("Failed to update remote timeline {sync_id}: {e:?}");
uploaded_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Upload(uploaded_data));
register_sync_status(sync_start, task_name, Some(false));
@@ -972,6 +949,7 @@ where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
{
info!("Updating remote index for the timeline");
let updated_remote_timeline = {
let mut index_accessor = index.write().await;
@@ -1012,6 +990,7 @@ where
IndexPart::from_remote_timeline(&timeline_path, updated_remote_timeline)
.context("Failed to create an index part from the updated remote timeline")?;
info!("Uploading remote data for the timeline");
upload_index_part(conf, storage, sync_id, new_index_part)
.await
.context("Failed to upload new index part")
@@ -1031,8 +1010,8 @@ fn validate_task_retries(
if download_data.retries > max_sync_errors =>
{
error!(
"Evicting download task for timeline {} that failed {} times, exceeding the error threshold {}",
sync_id, download_data.retries, max_sync_errors
"Evicting download task for timeline {sync_id} that failed {} times, exceeding the error threshold {max_sync_errors}",
download_data.retries
);
skip_download = true;
}
@@ -1040,9 +1019,9 @@ fn validate_task_retries(
if upload_data.retries > max_sync_errors =>
{
error!(
"Evicting upload task for timeline {} that failed {} times, exceeding the error threshold {}",
sync_id, upload_data.retries, max_sync_errors
);
"Evicting upload task for timeline {sync_id} that failed {} times, exceeding the error threshold {max_sync_errors}",
upload_data.retries,
);
skip_upload = true;
}
_ => {}
@@ -1083,10 +1062,10 @@ where
while let Some((id, part_upload_result)) = part_downloads.next().await {
match part_upload_result {
Ok(index_part) => {
debug!("Successfully fetched index part for {}", id);
debug!("Successfully fetched index part for {id}");
index_parts.insert(id, index_part);
}
Err(e) => warn!("Failed to fetch index part for {}: {:?}", id, e),
Err(e) => warn!("Failed to fetch index part for {id}: {e:?}"),
}
}
@@ -1120,8 +1099,8 @@ fn schedule_first_sync_tasks(
if was_there.is_some() {
// defensive check
warn!(
"Overwriting timeline init sync status. Status {:?} Timeline {}",
timeline_status, sync_id.timeline_id
"Overwriting timeline init sync status. Status {timeline_status:?}, timeline {}",
sync_id.timeline_id
);
}
remote_timeline.awaits_download = awaits_download;
@@ -1207,7 +1186,7 @@ fn compare_local_and_remote_timeline(
fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option<bool>) {
let secs_elapsed = sync_start.elapsed().as_secs_f64();
debug!("Processed a sync task in {} seconds", secs_elapsed);
info!("Processed a sync task in {secs_elapsed:.2} seconds");
match sync_status {
Some(true) => IMAGE_SYNC_TIME.with_label_values(&[sync_name, "success"]),
Some(false) => IMAGE_SYNC_TIME.with_label_values(&[sync_name, "failure"]),
@@ -1254,7 +1233,7 @@ mod test_utils {
}
pub fn dummy_contents(name: &str) -> String {
format!("contents for {}", name)
format!("contents for {name}")
}
pub fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
@@ -1286,7 +1265,7 @@ mod tests {
let merged_download = match download_1.merge(download_2) {
SyncTask::Download(merged_download) => merged_download,
wrong_merge_result => panic!("Unexpected merge result: {:?}", wrong_merge_result),
wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"),
};
assert_eq!(
@@ -1334,7 +1313,7 @@ mod tests {
let merged_upload = match upload_1.merge(upload_2) {
SyncTask::Upload(merged_upload) => merged_upload,
wrong_merge_result => panic!("Unexpected merge result: {:?}", wrong_merge_result),
wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"),
};
assert_eq!(
@@ -1389,7 +1368,7 @@ mod tests {
SyncTask::DownloadAndUpload(merged_download, merged_upload) => {
(merged_download, merged_upload)
}
wrong_merge_result => panic!("Unexpected merge result: {:?}", wrong_merge_result),
wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"),
};
assert_eq!(
@@ -1440,7 +1419,7 @@ mod tests {
SyncTask::DownloadAndUpload(merged_download, merged_upload) => {
(merged_download, merged_upload)
}
wrong_merge_result => panic!("Unexpected merge result: {:?}", wrong_merge_result),
wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"),
};
assert_eq!(
@@ -1507,7 +1486,7 @@ mod tests {
SyncTask::DownloadAndUpload(merged_download, merged_upload) => {
(merged_download, merged_upload)
}
wrong_merge_result => panic!("Unexpected merge result: {:?}", wrong_merge_result),
wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"),
};
assert_eq!(
@@ -1577,7 +1556,7 @@ mod tests {
SyncTask::DownloadAndUpload(merged_download, merged_upload) => {
(merged_download, merged_upload)
}
wrong_merge_result => panic!("Unexpected merge result: {:?}", wrong_merge_result),
wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"),
};
assert_eq!(

View File

@@ -5,7 +5,7 @@ use std::fmt::Debug;
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::fs;
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, info, warn};
use crate::{
config::PageServerConf,
@@ -45,25 +45,16 @@ where
.download(&part_storage_path, &mut index_part_bytes)
.await
.with_context(|| {
format!(
"Failed to download an index part from storage path '{:?}'",
part_storage_path
)
format!("Failed to download an index part from storage path '{part_storage_path:?}'")
})?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes).with_context(|| {
format!(
"Failed to deserialize index part file from storage path '{:?}'",
part_storage_path
)
format!("Failed to deserialize index part file from storage path '{part_storage_path:?}'")
})?;
let missing_files = index_part.missing_files();
if !missing_files.is_empty() {
warn!(
"Found missing layers in index part for timeline {}: {:?}",
sync_id, missing_files
);
warn!("Found missing layers in index part for timeline {sync_id}: {missing_files:?}");
}
Ok(index_part)
@@ -100,21 +91,17 @@ where
let remote_timeline = match remote_timeline {
Some(remote_timeline) => {
if !remote_timeline.awaits_download {
error!("Timeline with sync id {} is not awaiting download", sync_id);
error!("Timeline with sync id {sync_id} is not awaiting download");
return DownloadedTimeline::Abort;
}
remote_timeline
}
None => {
error!(
"Timeline with sync id {} is not present in the remote index",
sync_id
);
error!("Timeline with sync id {sync_id} is not present in the remote index");
return DownloadedTimeline::Abort;
}
};
debug!("Downloading timeline layers for sync id {}", sync_id);
let download = &mut download_data.data;
let layers_to_download = remote_timeline
@@ -123,7 +110,8 @@ where
.cloned()
.collect::<Vec<_>>();
trace!("Layers to download: {:?}", layers_to_download);
debug!("Layers to download: {layers_to_download:?}");
info!("Downloading {} timeline layers", layers_to_download.len());
let mut download_tasks = layers_to_download
.into_iter()
@@ -157,8 +145,7 @@ where
.await
.with_context(|| {
format!(
"Failed to download a layer from storage path '{:?}'",
layer_storage_path
"Failed to download a layer from storage path '{layer_storage_path:?}'"
)
})?;
}
@@ -166,8 +153,6 @@ where
})
.collect::<FuturesUnordered<_>>();
debug!("Downloading {} layers of a timeline", download_tasks.len());
let mut errors_happened = false;
while let Some(download_result) = download_tasks.next().await {
match download_result {
@@ -176,21 +161,18 @@ where
}
Err(e) => {
errors_happened = true;
error!(
"Failed to download a layer for timeline {}: {:?}",
sync_id, e
);
error!("Failed to download a layer for timeline {sync_id}: {e:?}");
}
}
}
if errors_happened {
debug!("Reenqueuing failed download task for timeline {}", sync_id);
debug!("Reenqueuing failed download task for timeline {sync_id}");
download_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Download(download_data));
DownloadedTimeline::FailedAndRescheduled
} else {
debug!("Finished downloading all timeline's layers");
info!("Successfully downloaded all layers");
DownloadedTimeline::Successful(download_data)
}
}
@@ -266,10 +248,9 @@ mod tests {
.await
{
DownloadedTimeline::Successful(data) => data,
wrong_result => panic!(
"Expected a successful download for timeline, but got: {:?}",
wrong_result
),
wrong_result => {
panic!("Expected a successful download for timeline, but got: {wrong_result:?}")
}
};
assert_eq!(

View File

@@ -5,7 +5,7 @@ use std::{fmt::Debug, path::PathBuf};
use anyhow::Context;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::fs;
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, info, warn};
use crate::{
config::PageServerConf,
@@ -53,10 +53,7 @@ where
)
.await
.with_context(|| {
format!(
"Failed to upload index part to the storage path '{:?}'",
index_part_storage_path
)
format!("Failed to upload index part to the storage path '{index_part_storage_path:?}'")
})
}
@@ -89,10 +86,6 @@ where
{
let upload = &mut upload_data.data;
let new_upload_lsn = upload.metadata.disk_consistent_lsn();
debug!(
"Uploading timeline layers for sync id {}, new lsn: {}",
sync_id, new_upload_lsn
);
let already_uploaded_layers = remote_timeline
.map(|timeline| timeline.stored_files())
@@ -105,7 +98,11 @@ where
.cloned()
.collect::<Vec<_>>();
trace!("Layers to upload: {:?}", layers_to_upload);
debug!("Layers to upload: {layers_to_upload:?}");
info!(
"Uploading {} timeline layers, new lsn: {new_upload_lsn}",
layers_to_upload.len(),
);
let mut upload_tasks = layers_to_upload
.into_iter()
@@ -157,8 +154,6 @@ where
})
.collect::<FuturesUnordered<_>>();
debug!("uploading {} layers of a timeline", upload_tasks.len());
let mut errors_happened = false;
let mut local_fs_updated = false;
while let Some(upload_result) = upload_tasks.next().await {
@@ -170,16 +165,19 @@ where
Err(e) => match e {
UploadError::Other(e) => {
errors_happened = true;
error!("Failed to upload a layer for timeline {}: {:?}", sync_id, e);
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
}
UploadError::MissingLocalFile(source_path, e) => {
if source_path.exists() {
errors_happened = true;
error!("Failed to upload a layer for timeline {}: {:?}", sync_id, e);
error!("Failed to upload a layer for timeline {sync_id}: {e:?}");
} else {
local_fs_updated = true;
upload.layers_to_upload.remove(&source_path);
warn!("Missing locally a layer file scheduled for upload, skipping");
warn!(
"Missing locally a layer file {} scheduled for upload, skipping",
source_path.display()
);
}
}
},
@@ -187,17 +185,16 @@ where
}
if errors_happened {
debug!("Reenqueuing failed upload task for timeline {}", sync_id);
debug!("Reenqueuing failed upload task for timeline {sync_id}");
upload_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Upload(upload_data));
UploadedTimeline::FailedAndRescheduled
} else if local_fs_updated {
info!("Successfully uploaded all layers, some local layers were removed during the upload");
UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data)
} else {
debug!("Finished uploading all timeline's layers");
if local_fs_updated {
UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data)
} else {
UploadedTimeline::Successful(upload_data)
}
info!("Successfully uploaded all layers");
UploadedTimeline::Successful(upload_data)
}
}
@@ -253,10 +250,9 @@ mod tests {
let upload_data = match upload_result {
UploadedTimeline::Successful(upload_data) => upload_data,
wrong_result => panic!(
"Expected a successful upload for timeline, but got: {:?}",
wrong_result
),
wrong_result => {
panic!("Expected a successful upload for timeline, but got: {wrong_result:?}")
}
};
assert_eq!(
@@ -344,8 +340,7 @@ mod tests {
let upload_data = match upload_result {
UploadedTimeline::SuccessfulAfterLocalFsUpdate(upload_data) => upload_data,
wrong_result => panic!(
"Expected a successful after local fs upload for timeline, but got: {:?}",
wrong_result
"Expected a successful after local fs upload for timeline, but got: {wrong_result:?}"
),
};