From 5563ff123f93cd13fe4fab68b77bd328b1e78a6a Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Mon, 14 Feb 2022 15:05:52 +0200
Subject: [PATCH] Reuse tenant-timeline id struct from utils

---
 pageserver/src/remote_storage.rs              | 27 +++-----
 pageserver/src/remote_storage/storage_sync.rs | 61 ++++++++++++-------
 .../remote_storage/storage_sync/download.rs   | 19 ++++--
 .../src/remote_storage/storage_sync/index.rs  | 20 +++---
 .../src/remote_storage/storage_sync/upload.rs | 23 ++++---
 test_runner/batch_others/test_zenith_cli.py   |  1 +
 zenith_utils/src/zid.rs                       |  2 +-
 7 files changed, 91 insertions(+), 62 deletions(-)

diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs
index 12ae0b5c22..4af1f8ed56 100644
--- a/pageserver/src/remote_storage.rs
+++ b/pageserver/src/remote_storage.rs
@@ -94,7 +94,7 @@ use std::{
 use anyhow::{bail, Context};
 use tokio::io;
 use tracing::{error, info};
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
+use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
 
 pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download};
 use self::{local_fs::LocalFs, rust_s3::S3};
@@ -106,17 +106,6 @@ use crate::{
 
 pub use storage_sync::compression;
 
-/// Any timeline has its own id and its own tenant it belongs to,
-/// the sync processes group timelines by both for simplicity.
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
-pub struct TimelineSyncId(ZTenantId, ZTimelineId);
-
-impl std::fmt::Display for TimelineSyncId {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "(tenant: {}, timeline: {})", self.0, self.1)
-    }
-}
-
 /// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization.
 /// Successful initialization includes a case when sync loop is not started, in which case the startup data is returned still,
 /// to simplify the received code.
@@ -169,7 +158,7 @@ pub fn start_local_timeline_sync(
         ZTenantId,
         HashMap<ZTimelineId, TimelineSyncState>,
     > = HashMap::new();
-    for (TimelineSyncId(tenant_id, timeline_id), (timeline_metadata, _)) in
+    for (ZTenantTimelineId{tenant_id, timeline_id}, (timeline_metadata, _)) in
         local_timeline_files
     {
         initial_timeline_states
@@ -189,7 +178,7 @@ fn local_tenant_timeline_files(
     config: &'static PageServerConf,
-) -> anyhow::Result<HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>> {
+) -> anyhow::Result<HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>> {
     let mut local_tenant_timeline_files = HashMap::new();
     let tenants_dir = config.tenants_path();
     for tenants_dir_entry in fs::read_dir(&tenants_dir)
@@ -224,8 +213,9 @@ fn collect_timelines_for_tenant(
     config: &'static PageServerConf,
     tenant_path: &Path,
-) -> anyhow::Result<HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>> {
-    let mut timelines: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)> = HashMap::new();
+) -> anyhow::Result<HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>> {
+    let mut timelines: HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)> =
+        HashMap::new();
     let tenant_id = tenant_path
         .file_name()
         .and_then(ffi::OsStr::to_str)
@@ -246,7 +236,10 @@ fn collect_timelines_for_tenant(
         match collect_timeline_files(&timeline_path) {
             Ok((timeline_id, metadata, timeline_files)) => {
                 timelines.insert(
-                    TimelineSyncId(tenant_id, timeline_id),
+                    ZTenantTimelineId {
+                        tenant_id,
+                        timeline_id,
+                    },
                     (metadata, timeline_files),
                 );
             }
diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs
index 6f5072affc..6b588c8e5f 100644
--- a/pageserver/src/remote_storage/storage_sync.rs
+++ b/pageserver/src/remote_storage/storage_sync.rs
@@ -106,7 +106,7 @@ use self::{
     },
     upload::upload_timeline_checkpoint,
 };
-use super::{RemoteStorage, SyncStartupData, TimelineSyncId};
+use super::{RemoteStorage, SyncStartupData, ZTenantTimelineId};
 use crate::{
     config::PageServerConf, layered_repository::metadata::TimelineMetadata,
     remote_storage::storage_sync::compression::read_archive_header, repository::TimelineSyncState,
@@ -243,13 +243,13 @@ mod sync_queue {
 
 /// Limited by the number of retries, after certain threshold the failing task gets evicted and the timeline disabled.
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
 pub struct SyncTask {
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     retries: u32,
     kind: SyncKind,
 }
 
 impl SyncTask {
-    fn new(sync_id: TimelineSyncId, retries: u32, kind: SyncKind) -> Self {
+    fn new(sync_id: ZTenantTimelineId, retries: u32, kind: SyncKind) -> Self {
         Self {
             sync_id,
             retries,
@@ -308,7 +308,10 @@ pub fn schedule_timeline_checkpoint_upload(
     }
 
     if !sync_queue::push(SyncTask::new(
-        TimelineSyncId(tenant_id, timeline_id),
+        ZTenantTimelineId {
+            tenant_id,
+            timeline_id,
+        },
         0,
         SyncKind::Upload(NewCheckpoint { layers, metadata }),
     )) {
@@ -339,7 +342,10 @@ pub fn schedule_timeline_download(tenant_id: ZTenantId, timeline_id: ZTimelineId
         tenant_id, timeline_id
     );
     sync_queue::push(SyncTask::new(
-        TimelineSyncId(tenant_id, timeline_id),
+        ZTenantTimelineId {
+            tenant_id,
+            timeline_id,
+        },
         0,
         SyncKind::Download(TimelineDownload {
             files_to_skip: Arc::new(BTreeSet::new()),
@@ -355,7 +361,7 @@ pub(super) fn spawn_storage_sync_thread<
     S: RemoteStorage + Send + Sync + 'static,
 >(
     conf: &'static PageServerConf,
-    local_timeline_files: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>,
+    local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>,
     storage: S,
     max_concurrent_sync: NonZeroUsize,
     max_sync_errors: NonZeroU32,
@@ -511,7 +517,7 @@ async fn loop_step<
             Err(e) => {
                 error!(
                     "Failed to process storage sync task for tenant {}, timeline {}: {:?}",
-                    sync_id.0, sync_id.1, e
+                    sync_id.tenant_id, sync_id.timeline_id, e
                 );
                 None
             }
@@ -525,7 +531,10 @@ async fn loop_step<
     while let Some((sync_id, state_update)) = task_batch.next().await {
         debug!("Finished storage sync task for sync id {}", sync_id);
         if let Some(state_update) = state_update {
-            let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+            let ZTenantTimelineId {
+                tenant_id,
+                timeline_id,
+            } = sync_id;
             new_timeline_states
                 .entry(tenant_id)
                 .or_default()
@@ -619,7 +628,7 @@ async fn process_task<
 
 fn schedule_first_sync_tasks(
     index: &RemoteTimelineIndex,
-    local_timeline_files: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>,
+    local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>,
 ) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>> {
     let mut initial_timeline_statuses: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>> =
         HashMap::new();
@@ -630,7 +639,10 @@ fn schedule_first_sync_tasks(
     for (sync_id, (local_metadata, local_files)) in local_timeline_files {
         let local_disk_consistent_lsn = local_metadata.disk_consistent_lsn();
 
-        let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+        let ZTenantTimelineId {
+            tenant_id,
+            timeline_id,
+        } = sync_id;
         match index.timeline_entry(&sync_id) {
             Some(index_entry) => {
                 let timeline_status = compare_local_and_remote_timeline(
@@ -673,10 +685,10 @@ fn schedule_first_sync_tasks(
         }
     }
 
-    let unprocessed_remote_ids = |remote_id: &TimelineSyncId| {
+    let unprocessed_remote_ids = |remote_id: &ZTenantTimelineId| {
         initial_timeline_statuses
-            .get(&remote_id.0)
-            .and_then(|timelines| timelines.get(&remote_id.1))
+            .get(&remote_id.tenant_id)
+            .and_then(|timelines| timelines.get(&remote_id.timeline_id))
             .is_none()
     };
     for unprocessed_remote_id in index
@@ -684,7 +696,10 @@ fn schedule_first_sync_tasks(
         .filter(unprocessed_remote_ids)
        .collect::<Vec<_>>()
     {
-        let TimelineSyncId(cloud_only_tenant_id, cloud_only_timeline_id) = unprocessed_remote_id;
+        let ZTenantTimelineId {
+            tenant_id: cloud_only_tenant_id,
+            timeline_id: cloud_only_timeline_id,
+        } = unprocessed_remote_id;
         match index
             .timeline_entry(&unprocessed_remote_id)
             .and_then(TimelineIndexEntry::disk_consistent_lsn)
@@ -713,7 +728,7 @@ fn schedule_first_sync_tasks(
 
 fn compare_local_and_remote_timeline(
     new_sync_tasks: &mut VecDeque<SyncTask>,
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     local_metadata: TimelineMetadata,
     local_files: Vec<PathBuf>,
     remote_entry: &TimelineIndexEntry,
@@ -770,7 +785,7 @@ async fn update_index_description<
 >(
     (storage, index): &(S, RwLock<RemoteTimelineIndex>),
     timeline_dir: &Path,
-    id: TimelineSyncId,
+    id: ZTenantTimelineId,
 ) -> anyhow::Result<RemoteTimeline> {
     let mut index_write = index.write().await;
     let full_index = match index_write.timeline_entry(&id) {
@@ -793,7 +808,7 @@ async fn update_index_description<
         Ok((archive_id, header_size, header)) => full_index.update_archive_contents(archive_id.0, header, header_size),
         Err((e, archive_id)) => bail!(
             "Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}",
-            id.0, id.1, archive_id.0,
+            id.tenant_id, id.timeline_id, archive_id.0,
             e
         ),
     }
@@ -871,7 +886,7 @@ mod test_utils {
         timeline_id: ZTimelineId,
         new_upload: NewCheckpoint,
     ) {
-        let sync_id = TimelineSyncId(harness.tenant_id, timeline_id);
+        let sync_id = ZTenantTimelineId::new(harness.tenant_id, timeline_id);
         upload_timeline_checkpoint(
             harness.conf,
             Arc::clone(&remote_assets),
@@ -927,7 +942,7 @@ mod test_utils {
 
     pub async fn expect_timeline(
         index: &RwLock<RemoteTimelineIndex>,
-        sync_id: TimelineSyncId,
+        sync_id: ZTenantTimelineId,
     ) -> RemoteTimeline {
         if let Some(TimelineIndexEntry::Full(remote_timeline)) =
             index.read().await.timeline_entry(&sync_id)
@@ -962,18 +977,18 @@ mod test_utils {
         let mut expected_timeline_entries = BTreeMap::new();
         for sync_id in actual_sync_ids {
             actual_branches.insert(
-                sync_id.1,
+                sync_id.tenant_id,
                 index_read
-                    .branch_files(sync_id.0)
+                    .branch_files(sync_id.tenant_id)
                     .into_iter()
                     .flat_map(|branch_paths| branch_paths.iter())
                     .cloned()
                     .collect::<BTreeSet<_>>(),
             );
             expected_branches.insert(
-                sync_id.1,
+                sync_id.tenant_id,
                 expected_index_with_descriptions
-                    .branch_files(sync_id.0)
+                    .branch_files(sync_id.tenant_id)
                     .into_iter()
                     .flat_map(|branch_paths| branch_paths.iter())
                     .cloned()
diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs
index b00b746522..f268fc442a 100644
--- a/pageserver/src/remote_storage/storage_sync/download.rs
+++ b/pageserver/src/remote_storage/storage_sync/download.rs
@@ -17,7 +17,7 @@ use crate::{
             compression, index::TimelineIndexEntry, sync_queue, tenant_branch_files,
             update_index_description, SyncKind, SyncTask,
         },
-        RemoteStorage, TimelineSyncId,
+        RemoteStorage, ZTenantTimelineId,
     },
 };
@@ -52,13 +52,16 @@ pub(super) async fn download_timeline<
 >(
     conf: &'static PageServerConf,
     remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     mut download: TimelineDownload,
     retries: u32,
 ) -> DownloadedTimeline {
     debug!("Downloading layers for sync id {}", sync_id);
 
-    let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+    let ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    } = sync_id;
     let index_read = remote_assets.1.read().await;
     let remote_timeline = match index_read.timeline_entry(&sync_id) {
         None => {
@@ -110,7 +113,8 @@ pub(super) async fn download_timeline<
         }
     };
 
-    if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.0).await {
+    if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.tenant_id).await
+    {
         error!(
             "Failed to download missing branches for sync id {}: {:?}",
             sync_id, e
@@ -180,7 +184,10 @@ async fn try_download_archive<
     S: RemoteStorage + Send + Sync + 'static,
 >(
     conf: &'static PageServerConf,
-    TimelineSyncId(tenant_id, timeline_id): TimelineSyncId,
+    ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    }: ZTenantTimelineId,
     remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
     remote_timeline: &RemoteTimeline,
     archive_id: ArchiveId,
@@ -343,7 +350,7 @@ mod tests {
     #[tokio::test]
     async fn test_download_timeline() -> anyhow::Result<()> {
         let repo_harness = RepoHarness::create("test_download_timeline")?;
-        let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
+        let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
         let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
         let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
             repo_harness.conf,
diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/remote_storage/storage_sync/index.rs
index 039ccf8ea0..3d2680948d 100644
--- a/pageserver/src/remote_storage/storage_sync/index.rs
+++ b/pageserver/src/remote_storage/storage_sync/index.rs
@@ -22,7 +22,7 @@ use crate::{
     layered_repository::TIMELINES_SEGMENT_NAME,
     remote_storage::{
         storage_sync::compression::{parse_archive_name, FileEntry},
-        TimelineSyncId,
+        ZTenantTimelineId,
     },
 };
@@ -53,7 +53,7 @@ impl RelativePath {
 #[derive(Debug, Clone)]
 pub struct RemoteTimelineIndex {
     branch_files: HashMap<ZTenantId, BTreeSet<RelativePath>>,
-    timeline_files: HashMap<TimelineSyncId, TimelineIndexEntry>,
+    timeline_files: HashMap<ZTenantTimelineId, TimelineIndexEntry>,
 }
 
 impl RemoteTimelineIndex {
@@ -80,19 +80,22 @@ impl RemoteTimelineIndex {
         index
     }
 
-    pub fn timeline_entry(&self, id: &TimelineSyncId) -> Option<&TimelineIndexEntry> {
+    pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&TimelineIndexEntry> {
         self.timeline_files.get(id)
     }
 
-    pub fn timeline_entry_mut(&mut self, id: &TimelineSyncId) -> Option<&mut TimelineIndexEntry> {
+    pub fn timeline_entry_mut(
+        &mut self,
+        id: &ZTenantTimelineId,
+    ) -> Option<&mut TimelineIndexEntry> {
         self.timeline_files.get_mut(id)
     }
 
-    pub fn add_timeline_entry(&mut self, id: TimelineSyncId, entry: TimelineIndexEntry) {
+    pub fn add_timeline_entry(&mut self, id: ZTenantTimelineId, entry: TimelineIndexEntry) {
         self.timeline_files.insert(id, entry);
     }
 
-    pub fn all_sync_ids(&self) -> impl Iterator<Item = TimelineSyncId> + '_ {
+    pub fn all_sync_ids(&self) -> impl Iterator<Item = ZTenantTimelineId> + '_ {
         self.timeline_files.keys().copied()
     }
 
@@ -348,7 +351,10 @@ fn try_parse_index_entry(
         .to_string_lossy()
         .to_string();
 
-    let sync_id = TimelineSyncId(tenant_id, timeline_id);
+    let sync_id = ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    };
     let timeline_index_entry = index
         .timeline_files
         .entry(sync_id)
diff --git a/pageserver/src/remote_storage/storage_sync/upload.rs b/pageserver/src/remote_storage/storage_sync/upload.rs
index 89e5bc241b..0f57d714dd 100644
--- a/pageserver/src/remote_storage/storage_sync/upload.rs
+++ b/pageserver/src/remote_storage/storage_sync/upload.rs
@@ -17,7 +17,7 @@ use crate::{
         index::{RemoteTimeline, TimelineIndexEntry},
         sync_queue, tenant_branch_files, update_index_description, SyncKind, SyncTask,
     },
-    RemoteStorage, TimelineSyncId,
+    RemoteStorage, ZTenantTimelineId,
 },
 };
@@ -36,12 +36,13 @@ pub(super) async fn upload_timeline_checkpoint<
 >(
     config: &'static PageServerConf,
     remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     new_checkpoint: NewCheckpoint,
     retries: u32,
 ) -> Option<TimelineSyncState> {
     debug!("Uploading checkpoint for sync id {}", sync_id);
-    if let Err(e) = upload_missing_branches(config, remote_assets.as_ref(), sync_id.0).await {
+    if let Err(e) = upload_missing_branches(config, remote_assets.as_ref(), sync_id.tenant_id).await
+    {
         error!(
             "Failed to upload missing branches for sync id {}: {:?}",
             sync_id, e
         );
@@ -57,7 +58,10 @@
     let index = &remote_assets.1;
 
-    let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+    let ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    } = sync_id;
     let timeline_dir = config.timeline_path(&timeline_id, &tenant_id);
 
     let index_read = index.read().await;
@@ -151,11 +155,14 @@ async fn try_upload_checkpoint<
 >(
     config: &'static PageServerConf,
     remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     new_checkpoint: &NewCheckpoint,
     files_to_skip: BTreeSet<PathBuf>,
 ) -> anyhow::Result<(ArchiveHeader, u64)> {
-    let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+    let ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    } = sync_id;
     let timeline_dir = config.timeline_path(&timeline_id, &tenant_id);
 
     let files_to_upload = new_checkpoint
@@ -288,7 +295,7 @@ mod tests {
     #[tokio::test]
     async fn reupload_timeline() -> anyhow::Result<()> {
         let repo_harness = RepoHarness::create("reupload_timeline")?;
-        let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
+        let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
         let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
         let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
             repo_harness.conf,
@@ -484,7 +491,7 @@ mod tests {
     #[tokio::test]
     async fn reupload_timeline_rejected() -> anyhow::Result<()> {
         let repo_harness = RepoHarness::create("reupload_timeline_rejected")?;
-        let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
+        let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
         let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
         let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
             repo_harness.conf,
diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py
index 65fef79d30..872821abfc 100644
--- a/test_runner/batch_others/test_zenith_cli.py
+++ b/test_runner/batch_others/test_zenith_cli.py
@@ -107,6 +107,7 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv):
     assert tenant1 in tenants
     assert tenant2 in tenants
 
+
 def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder):
     # Start with single sk
     zenith_env_builder.num_safekeepers = 1
diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs
index 7ff3c0cffd..2e93ab596c 100644
--- a/zenith_utils/src/zid.rs
+++ b/zenith_utils/src/zid.rs
@@ -196,7 +196,7 @@ pub mod opt_display_serde {
 }
 
 // A pair uniquely identifying Zenith instance.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
 pub struct ZTenantTimelineId {
     pub tenant_id: ZTenantId,
     pub timeline_id: ZTimelineId,
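
For reference, the pattern the patch converges on can be shown in isolation. The sketch below is illustrative only: ZTenantId and ZTimelineId are stand-in u128 wrappers, and the Display impl for the pair is an assumption (the sync code only requires that a sync id prints with {} in debug!/error! messages; the format here mirrors the one removed with TimelineSyncId). The real definitions live in zenith_utils/src/zid.rs. It demonstrates the three things the diff relies on: named fields replacing positional sync_id.0/sync_id.1 access, struct-pattern destructuring, and the newly derived PartialOrd/Ord that lets the id, and types embedding it such as SyncTask, live in ordered collections.

// Illustrative sketch only -- not the zenith_utils source.
use std::collections::BTreeSet;
use std::fmt;

#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ZTenantId(u128); // stand-in for the real id type

#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ZTimelineId(u128); // stand-in for the real id type

impl fmt::Display for ZTenantId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:032x}", self.0)
    }
}

impl fmt::Display for ZTimelineId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:032x}", self.0)
    }
}

// The reused pair. PartialOrd/Ord are derived, as in the zid.rs hunk above,
// so values can be stored in BTreeSet/BTreeMap.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ZTenantTimelineId {
    pub tenant_id: ZTenantId,
    pub timeline_id: ZTimelineId,
}

impl ZTenantTimelineId {
    // Constructor used by the test hunks (`ZTenantTimelineId::new(...)`).
    pub fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
        Self { tenant_id, timeline_id }
    }
}

// Assumed Display impl, mirroring the one removed with TimelineSyncId.
impl fmt::Display for ZTenantTimelineId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "(tenant: {}, timeline: {})", self.tenant_id, self.timeline_id)
    }
}

fn main() {
    let sync_id = ZTenantTimelineId::new(ZTenantId(1), ZTimelineId(2));

    // Named fields replace the old positional access (`sync_id.0`, `sync_id.1`).
    println!("tenant {}, timeline {}", sync_id.tenant_id, sync_id.timeline_id);

    // Struct-pattern destructuring, as used throughout the patch.
    let ZTenantTimelineId { tenant_id, timeline_id } = sync_id;
    let _ = (tenant_id, timeline_id);

    // The Ord derive makes ordered collections of ids (or of SyncTask) compile.
    let mut ids = BTreeSet::new();
    ids.insert(sync_id);
    assert!(ids.contains(&sync_id));
    println!("{}", sync_id);
}

The named fields also make call sites self-documenting: with the old tuple struct, .0 and .1 gave no hint which half was which, and the test hunk where sync_id.1 (a timeline id) becomes sync_id.tenant_id as a branch-map key suggests exactly the kind of mix-up the rename makes visible.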