From 8437fc056e9c95c3a925df4dd4317f4454b8198c Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 23 Mar 2022 22:03:12 +0400 Subject: [PATCH] some follow ups after s3 integration was enabled on staging * do not error out when upload file list is empty * ignore ephemeral files during sync initialization --- pageserver/src/layered_repository.rs | 2 +- pageserver/src/remote_storage.rs | 8 ++++- .../src/remote_storage/storage_sync/upload.rs | 29 ++++++++++--------- 3 files changed, 24 insertions(+), 15 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 2c4393481d..9cb0a17e66 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -54,7 +54,7 @@ use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn}; use zenith_utils::seqwait::SeqWait; mod delta_layer; -mod ephemeral_file; +pub(crate) mod ephemeral_file; mod filename; mod global_layer_map; mod image_layer; diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index 08fb16a679..6eb7bd910b 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -94,12 +94,13 @@ use std::{ use anyhow::{bail, Context}; use tokio::{io, sync::RwLock}; -use tracing::{error, info}; +use tracing::{debug, error, info}; use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; pub use self::storage_sync::index::{RemoteTimelineIndex, TimelineIndexEntry}; pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download}; use self::{local_fs::LocalFs, rust_s3::S3}; +use crate::layered_repository::ephemeral_file::is_ephemeral_file; use crate::{ config::{PageServerConf, RemoteStorageKind}, layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME}, @@ -261,6 +262,8 @@ fn collect_timelines_for_tenant( Ok(timelines) } +// discover timeline files and extract timeline metadata +// NOTE: ephemeral files are excluded from the list fn collect_timeline_files( timeline_dir: &Path, ) -> anyhow::Result<(ZTimelineId, TimelineMetadata, Vec)> { @@ -280,6 +283,9 @@ fn collect_timeline_files( if entry_path.is_file() { if entry_path.file_name().and_then(ffi::OsStr::to_str) == Some(METADATA_FILE_NAME) { timeline_metadata_path = Some(entry_path); + } else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) { + debug!("skipping ephemeral file {}", entry_path.display()); + continue; } else { timeline_files.push(entry_path); } diff --git a/pageserver/src/remote_storage/storage_sync/upload.rs b/pageserver/src/remote_storage/storage_sync/upload.rs index 431b5ec484..dfc4433694 100644 --- a/pageserver/src/remote_storage/storage_sync/upload.rs +++ b/pageserver/src/remote_storage/storage_sync/upload.rs @@ -2,7 +2,6 @@ use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc}; -use anyhow::ensure; use tokio::sync::RwLock; use tracing::{debug, error, warn}; @@ -95,7 +94,7 @@ pub(super) async fn upload_timeline_checkpoint< ) .await { - Ok((archive_header, header_size)) => { + Some(Ok((archive_header, header_size))) => { let mut index_write = index.write().await; match index_write .timeline_entry_mut(&sync_id) @@ -136,7 +135,7 @@ pub(super) async fn upload_timeline_checkpoint< debug!("Checkpoint uploaded successfully"); Some(true) } - Err(e) => { + Some(Err(e)) => { error!( "Failed to upload checkpoint: {:?}, requeueing the upload", e @@ -148,6 +147,7 @@ pub(super) async fn upload_timeline_checkpoint< )); Some(false) } + None => Some(true), } } @@ -160,7 +160,7 @@ async fn try_upload_checkpoint< sync_id: ZTenantTimelineId, new_checkpoint: &NewCheckpoint, files_to_skip: BTreeSet, -) -> anyhow::Result<(ArchiveHeader, u64)> { +) -> Option> { let ZTenantTimelineId { tenant_id, timeline_id, @@ -172,7 +172,7 @@ async fn try_upload_checkpoint< .iter() .filter(|&path_to_upload| { if files_to_skip.contains(path_to_upload) { - error!( + warn!( "Skipping file upload '{}', since it was already uploaded", path_to_upload.display() ); @@ -183,14 +183,15 @@ async fn try_upload_checkpoint< }) .collect::>(); - ensure!( - !files_to_upload.is_empty(), - "No files to upload. Upload request was: {:?}, already uploaded files: {:?}", - new_checkpoint.layers, - files_to_skip, - ); + if files_to_upload.is_empty() { + warn!( + "No files to upload. Upload request was: {:?}, already uploaded files: {:?}", + new_checkpoint.layers, files_to_skip + ); + return None; + } - compression::archive_files_as_stream( + let upload_result = compression::archive_files_as_stream( &timeline_dir, files_to_upload.into_iter(), &new_checkpoint.metadata, @@ -206,7 +207,9 @@ async fn try_upload_checkpoint< }, ) .await - .map(|(header, header_size, _)| (header, header_size)) + .map(|(header, header_size, _)| (header, header_size)); + + Some(upload_result) } #[cfg(test)]