From ca60561a01f4e23f130dbbdb1915630e59fd2863 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Tue, 14 Dec 2021 00:46:36 +0200 Subject: [PATCH] Propagate disk consistent lsn in timeline sync statuses --- pageserver/src/branches.rs | 2 +- pageserver/src/layered_repository.rs | 70 +++++++---- pageserver/src/remote_storage.rs | 9 +- pageserver/src/remote_storage/storage_sync.rs | 91 ++++++++++---- .../remote_storage/storage_sync/download.rs | 116 ++++++++++++------ .../src/remote_storage/storage_sync/index.rs | 18 ++- pageserver/src/repository.rs | 32 ++++- pageserver/src/walreceiver.rs | 3 + 8 files changed, 246 insertions(+), 95 deletions(-) diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 2ba11cdec1..eb716a6501 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -56,7 +56,7 @@ impl BranchInfo { let timeline = match repo.get_timeline(timeline_id)? { RepositoryTimeline::Local(local_entry) => local_entry, - RepositoryTimeline::Remote(_) => { + RepositoryTimeline::Remote { .. } => { bail!("Timeline {} is remote, no branches to display", timeline_id) } }; diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 8d5c9acf70..0533d6265b 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -139,9 +139,13 @@ impl Repository for LayeredRepository { Ok( match self.get_or_init_timeline(timelineid, &mut timelines)? { LayeredTimelineEntry::Local(local) => RepositoryTimeline::Local(local), - LayeredTimelineEntry::Remote(remote_timeline_id) => { - RepositoryTimeline::Remote(remote_timeline_id) - } + LayeredTimelineEntry::Remote { + id, + disk_consistent_lsn, + } => RepositoryTimeline::Remote { + id, + disk_consistent_lsn, + }, }, ) } @@ -184,7 +188,7 @@ impl Repository for LayeredRepository { let mut timelines = self.timelines.lock().unwrap(); let src_timeline = match self.get_or_init_timeline(src, &mut timelines)? 
{ LayeredTimelineEntry::Local(timeline) => timeline, - LayeredTimelineEntry::Remote(_) => { + LayeredTimelineEntry::Remote { .. } => { bail!("Cannot branch off the timeline {} that's not local", src) } }; @@ -262,7 +266,7 @@ impl Repository for LayeredRepository { info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered(); match timeline { LayeredTimelineEntry::Local(timeline) => timeline.checkpoint(cconf)?, - LayeredTimelineEntry::Remote(_) => debug!( + LayeredTimelineEntry::Remote { .. } => debug!( "Cannot run the checkpoint for remote timeline {}", timelineid ), @@ -296,17 +300,22 @@ impl Repository for LayeredRepository { let mut timelines_accessor = self.timelines.lock().unwrap(); let timeline_to_shutdown = match new_state { - TimelineSyncState::Ready => { + TimelineSyncState::Ready(_) => { let reloaded_timeline = self.init_local_timeline(timeline_id, &mut timelines_accessor)?; timelines_accessor .insert(timeline_id, LayeredTimelineEntry::Local(reloaded_timeline)); None } - TimelineSyncState::Evicted => timelines_accessor.remove(&timeline_id), - TimelineSyncState::AwaitsDownload | TimelineSyncState::CloudOnly => { - timelines_accessor.insert(timeline_id, LayeredTimelineEntry::Remote(timeline_id)) - } + TimelineSyncState::Evicted(_) => timelines_accessor.remove(&timeline_id), + TimelineSyncState::AwaitsDownload(disk_consistent_lsn) + | TimelineSyncState::CloudOnly(disk_consistent_lsn) => timelines_accessor.insert( + timeline_id, + LayeredTimelineEntry::Remote { + id: timeline_id, + disk_consistent_lsn, + }, + ), }; drop(timelines_accessor); @@ -324,18 +333,16 @@ impl Repository for LayeredRepository { /// [`TimelineSyncState::Evicted`] and other non-local and non-remote states are not stored in the layered repo at all, /// hence their statuses cannot be returned by the repo. 
fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option { + let timelines_accessor = self.timelines.lock().unwrap(); + let timeline_entry = timelines_accessor.get(&timeline_id)?; Some( - if self - .timelines - .lock() - .unwrap() - .get(&timeline_id)? + if timeline_entry .local_or_schedule_download(self.tenantid) .is_some() { - TimelineSyncState::Ready + TimelineSyncState::Ready(timeline_entry.disk_consistent_lsn()) } else { - TimelineSyncState::CloudOnly + TimelineSyncState::CloudOnly(timeline_entry.disk_consistent_lsn()) }, ) } @@ -356,7 +363,7 @@ fn shutdown_timeline( timeline.checkpoint(CheckpointConfig::Forced)?; //TODO Wait for walredo process to shutdown too } - LayeredTimelineEntry::Remote(_) => warn!( + LayeredTimelineEntry::Remote { .. } => warn!( "Skipping shutdown of a remote timeline {} for tenant {}", timeline_id, tenant_id ), @@ -367,14 +374,18 @@ fn shutdown_timeline( #[derive(Clone)] enum LayeredTimelineEntry { Local(Arc), - Remote(ZTimelineId), + Remote { + id: ZTimelineId, + /// metadata contents of the latest successfully uploaded checkpoint + disk_consistent_lsn: Lsn, + }, } impl LayeredTimelineEntry { fn timeline_id(&self) -> ZTimelineId { match self { LayeredTimelineEntry::Local(timeline) => timeline.timelineid, - LayeredTimelineEntry::Remote(timeline_id) => *timeline_id, + LayeredTimelineEntry::Remote { id, .. } => *id, } } @@ -382,7 +393,9 @@ impl LayeredTimelineEntry { fn local_or_schedule_download(&self, tenant_id: ZTenantId) -> Option<&LayeredTimeline> { match self { Self::Local(local) => Some(local.as_ref()), - Self::Remote(timeline_id) => { + Self::Remote { + id: timeline_id, .. + } => { debug!( "Accessed a remote timeline {} for tenant {}, scheduling a timeline download", timeline_id, tenant_id @@ -392,6 +405,17 @@ impl LayeredTimelineEntry { } } } + + /// Gets a current (latest for the remote case) disk consistent Lsn for the timeline. 
+ fn disk_consistent_lsn(&self) -> Lsn { + match self { + Self::Local(local) => local.disk_consistent_lsn.load(), + Self::Remote { + disk_consistent_lsn, + .. + } => *disk_consistent_lsn, + } + } } /// Private functions @@ -576,7 +600,7 @@ impl LayeredRepository { for &timelineid in &timelineids { let timeline = match self.get_or_init_timeline(timelineid, &mut timelines)? { LayeredTimelineEntry::Local(timeline) => timeline, - LayeredTimelineEntry::Remote(_) => { + LayeredTimelineEntry::Remote { .. } => { warn!( "Timeline {} is not local, cannot proceed with gc", timelineid @@ -623,7 +647,7 @@ impl LayeredRepository { // so this operation is just a quick map lookup. let timeline = match self.get_or_init_timeline(timelineid, &mut *timelines)? { LayeredTimelineEntry::Local(timeline) => timeline, - LayeredTimelineEntry::Remote(_) => { + LayeredTimelineEntry::Remote { .. } => { debug!("Skipping GC for non-local timeline {}", timelineid); continue; } diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index 5e3e987186..d6f449c4dc 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -163,11 +163,16 @@ pub fn start_local_timeline_sync( ZTenantId, HashMap, > = HashMap::new(); - for TimelineSyncId(tenant_id, timeline_id) in local_timeline_files.into_keys() { + for (TimelineSyncId(tenant_id, timeline_id), (timeline_metadata, _)) in + local_timeline_files + { initial_timeline_states .entry(tenant_id) .or_default() - .insert(timeline_id, TimelineSyncState::Ready); + .insert( + timeline_id, + TimelineSyncState::Ready(timeline_metadata.disk_consistent_lsn()), + ); } Ok(SyncStartupData { initial_timeline_states, diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index b1c878e760..a8f46ef511 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -95,7 +95,7 @@ use tracing::*; use self::{ 
compression::ArchiveHeader, - download::download_timeline, + download::{download_timeline, DownloadedTimeline}, index::{ ArchiveDescription, ArchiveId, RelativePath, RemoteTimeline, RemoteTimelineIndex, TimelineIndexEntry, @@ -522,7 +522,15 @@ async fn process_task< "Evicting task {:?} that failed {} times, exceeding the error threshold", task.kind, task.retries ); - return Some(TimelineSyncState::Evicted); + return Some(TimelineSyncState::Evicted( + remote_assets + .as_ref() + .1 + .read() + .await + .timeline_entry(&task.sync_id) + .and_then(TimelineIndexEntry::disk_consistent_lsn), + )); } if task.retries > 0 { @@ -538,7 +546,7 @@ async fn process_task< let sync_name = task.kind.sync_name(); match task.kind { SyncKind::Download(download_data) => { - let sync_status = download_timeline( + let download_result = download_timeline( conf, remote_assets, task.sync_id, @@ -546,12 +554,24 @@ async fn process_task< task.retries + 1, ) .await; - register_sync_status(sync_start, sync_name, sync_status); - if sync_status? 
{ - Some(TimelineSyncState::Ready) - } else { - Some(TimelineSyncState::AwaitsDownload) + match download_result { + DownloadedTimeline::Abort => { + register_sync_status(sync_start, sync_name, None); + None + } + DownloadedTimeline::FailedAndRescheduled { + disk_consistent_lsn, + } => { + register_sync_status(sync_start, sync_name, Some(false)); + Some(TimelineSyncState::AwaitsDownload(disk_consistent_lsn)) + } + DownloadedTimeline::Successful { + disk_consistent_lsn, + } => { + register_sync_status(sync_start, sync_name, Some(true)); + Some(TimelineSyncState::Ready(disk_consistent_lsn)) + } } } SyncKind::Upload(layer_upload) => { @@ -580,6 +600,8 @@ fn schedule_first_sync_tasks( VecDeque::with_capacity(local_timeline_files.len().max(local_timeline_files.len())); for (sync_id, (local_metadata, local_files)) in local_timeline_files { + let local_disk_consistent_lsn = local_metadata.disk_consistent_lsn(); + let TimelineSyncId(tenant_id, timeline_id) = sync_id; match index.timeline_entry(&sync_id) { Some(index_entry) => { @@ -590,10 +612,18 @@ fn schedule_first_sync_tasks( local_files, index_entry, ); - initial_timeline_statuses - .entry(tenant_id) - .or_default() - .insert(timeline_id, timeline_status); + match timeline_status { + Some(timeline_status) => { + initial_timeline_statuses + .entry(tenant_id) + .or_default() + .insert(timeline_id, timeline_status); + } + None => error!( + "Failed to compare local and remote timeline for task {}", + sync_id + ), + } } None => { new_sync_tasks.push_back(SyncTask::new( @@ -607,7 +637,10 @@ fn schedule_first_sync_tasks( initial_timeline_statuses .entry(tenant_id) .or_default() - .insert(timeline_id, TimelineSyncState::Ready); + .insert( + timeline_id, + TimelineSyncState::Ready(local_disk_consistent_lsn), + ); } } } @@ -624,10 +657,24 @@ fn schedule_first_sync_tasks( .collect::>() { let TimelineSyncId(cloud_only_tenant_id, cloud_only_timeline_id) = unprocessed_remote_id; - initial_timeline_statuses - 
.entry(cloud_only_tenant_id) - .or_default() - .insert(cloud_only_timeline_id, TimelineSyncState::CloudOnly); + match index + .timeline_entry(&unprocessed_remote_id) + .and_then(TimelineIndexEntry::disk_consistent_lsn) + { + Some(remote_disk_consistent_lsn) => { + initial_timeline_statuses + .entry(cloud_only_tenant_id) + .or_default() + .insert( + cloud_only_timeline_id, + TimelineSyncState::CloudOnly(remote_disk_consistent_lsn), + ); + } + None => error!( + "Failed to find disk consistent LSN for remote timeline {}", + unprocessed_remote_id + ), + } } new_sync_tasks.into_iter().for_each(|task| { @@ -642,7 +689,7 @@ fn compare_local_and_remote_timeline( local_metadata: TimelineMetadata, local_files: Vec, remote_entry: &TimelineIndexEntry, -) -> TimelineSyncState { +) -> Option { let local_lsn = local_metadata.disk_consistent_lsn(); let uploads = remote_entry.uploaded_checkpoints(); @@ -663,7 +710,7 @@ fn compare_local_and_remote_timeline( .filter(|upload_lsn| upload_lsn <= &local_lsn) .map(ArchiveId) .collect(); - if archives_to_skip.len() != uploads_count { + Some(if archives_to_skip.len() != uploads_count { new_sync_tasks.push_back(SyncTask::new( sync_id, 0, @@ -672,10 +719,10 @@ fn compare_local_and_remote_timeline( archives_to_skip, }), )); - TimelineSyncState::AwaitsDownload + TimelineSyncState::AwaitsDownload(remote_entry.disk_consistent_lsn()?) 
} else { - TimelineSyncState::Ready - } + TimelineSyncState::Ready(remote_entry.disk_consistent_lsn().unwrap_or(local_lsn)) + }) } fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option) { diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs index 8b7af821ed..3494138cb5 100644 --- a/pageserver/src/remote_storage/storage_sync/download.rs +++ b/pageserver/src/remote_storage/storage_sync/download.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, ensure, Context}; use futures::{stream::FuturesUnordered, StreamExt}; use tokio::{fs, sync::RwLock}; use tracing::{debug, error, trace, warn}; -use zenith_utils::zid::ZTenantId; +use zenith_utils::{lsn::Lsn, zid::ZTenantId}; use crate::{ layered_repository::metadata::{metadata_path, TimelineMetadata}, @@ -26,6 +26,18 @@ use super::{ TimelineDownload, }; +/// Timeline download result, with extra data, needed for downloading. +pub(super) enum DownloadedTimeline { + /// Remote timeline data is either absent or corrupt, no download possible. + Abort, + /// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known. + /// Initial download failed due to some error, the download task is rescheduled for another retry. + FailedAndRescheduled { disk_consistent_lsn: Lsn }, + /// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known. + /// Initial download successful. + Successful { disk_consistent_lsn: Lsn }, +} + /// Attempts to download and uncompress files from all remote archives for the timeline given. /// Timeline files that already exist locally are skipped during the download, but the local metadata file is /// updated in the end of every checkpoint archive extraction. 
@@ -43,8 +55,61 @@ pub(super) async fn download_timeline< sync_id: TimelineSyncId, mut download: TimelineDownload, retries: u32, -) -> Option { +) -> DownloadedTimeline { debug!("Downloading layers for sync id {}", sync_id); + + let TimelineSyncId(tenant_id, timeline_id) = sync_id; + let index_read = remote_assets.1.read().await; + let remote_timeline = match index_read.timeline_entry(&sync_id) { + None => { + error!("Cannot download: no timeline is present in the index for given ids"); + return DownloadedTimeline::Abort; + } + Some(index_entry) => match index_entry { + TimelineIndexEntry::Full(remote_timeline) => Cow::Borrowed(remote_timeline), + TimelineIndexEntry::Description(_) => { + let remote_disk_consistent_lsn = index_entry.disk_consistent_lsn(); + drop(index_read); + debug!("Found timeline description for the given ids, downloading the full index"); + match update_index_description( + remote_assets.as_ref(), + &conf.timeline_path(&timeline_id, &tenant_id), + sync_id, + ) + .await + { + Ok(remote_timeline) => Cow::Owned(remote_timeline), + Err(e) => { + error!("Failed to download full timeline index: {:#}", e); + return match remote_disk_consistent_lsn { + Some(disk_consistent_lsn) => { + sync_queue::push(SyncTask::new( + sync_id, + retries, + SyncKind::Download(download), + )); + DownloadedTimeline::FailedAndRescheduled { + disk_consistent_lsn, + } + } + None => { + error!("Cannot download: no disk consistent Lsn is present for the index entry"); + DownloadedTimeline::Abort + } + }; + } + } + } + }, + }; + let disk_consistent_lsn = match remote_timeline.checkpoints().max() { + Some(lsn) => lsn, + None => { + debug!("Cannot download: no disk consistent Lsn is present for the remote timeline"); + return DownloadedTimeline::Abort; + } + }; + if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.0).await { error!( "Failed to download missing branches for sync id {}: {:#}", @@ -55,42 +120,11 @@ pub(super) async fn download_timeline< 
retries, SyncKind::Download(download), )); - return Some(false); + return DownloadedTimeline::FailedAndRescheduled { + disk_consistent_lsn, + }; } - let TimelineSyncId(tenant_id, timeline_id) = sync_id; - - let index_read = remote_assets.1.read().await; - let remote_timeline = match index_read.timeline_entry(&sync_id) { - None => { - error!("Cannot download: no timeline is present in the index for given ids"); - return None; - } - Some(TimelineIndexEntry::Full(remote_timeline)) => Cow::Borrowed(remote_timeline), - Some(TimelineIndexEntry::Description(_)) => { - drop(index_read); - debug!("Found timeline description for the given ids, downloading the full index"); - match update_index_description( - remote_assets.as_ref(), - &conf.timeline_path(&timeline_id, &tenant_id), - sync_id, - ) - .await - { - Ok(remote_timeline) => Cow::Owned(remote_timeline), - Err(e) => { - error!("Failed to download full timeline index: {:#}", e); - sync_queue::push(SyncTask::new( - sync_id, - retries, - SyncKind::Download(download), - )); - return Some(false); - } - } - } - }; - debug!("Downloading timeline archives"); let archives_to_download = remote_timeline .checkpoints() @@ -124,7 +158,9 @@ pub(super) async fn download_timeline< retries, SyncKind::Download(download), )); - return Some(false); + return DownloadedTimeline::FailedAndRescheduled { + disk_consistent_lsn, + }; } Ok(()) => { debug!("Successfully downloaded archive {:?}", archive_id); @@ -134,7 +170,9 @@ pub(super) async fn download_timeline< } debug!("Finished downloading all timeline's archives"); - Some(true) + DownloadedTimeline::Successful { + disk_consistent_lsn, + } } async fn try_download_archive< @@ -160,7 +198,7 @@ async fn try_download_archive< Ok(local_metadata) => ensure!( // need to allow `<=` instead of `<` due to cases when a failed archive can be redownloaded local_metadata.disk_consistent_lsn() <= archive_to_download.disk_consistent_lsn(), - "Cannot download archive with LSN {} since it's earlier than 
local LSN {}", + "Cannot download archive with Lsn {} since it's earlier than local Lsn {}", archive_to_download.disk_consistent_lsn(), local_metadata.disk_consistent_lsn() ), diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/remote_storage/storage_sync/index.rs index 7e07817026..e5906264af 100644 --- a/pageserver/src/remote_storage/storage_sync/index.rs +++ b/pageserver/src/remote_storage/storage_sync/index.rs @@ -119,16 +119,30 @@ pub enum TimelineIndexEntry { impl TimelineIndexEntry { pub fn uploaded_checkpoints(&self) -> BTreeSet { match self { - TimelineIndexEntry::Description(description) => { + Self::Description(description) => { description.keys().map(|archive_id| archive_id.0).collect() } - TimelineIndexEntry::Full(remote_timeline) => remote_timeline + Self::Full(remote_timeline) => remote_timeline .checkpoint_archives .keys() .map(|archive_id| archive_id.0) .collect(), } } + + /// Gets latest uploaded checkpoint's disk consistent Lsn for the corresponding timeline. + pub fn disk_consistent_lsn(&self) -> Option { + match self { + Self::Description(description) => { + description.keys().map(|archive_id| archive_id.0).max() + } + Self::Full(remote_timeline) => remote_timeline + .checkpoint_archives + .keys() + .map(|archive_id| archive_id.0) + .max(), + } + } } /// Checkpoint archive's id, corresponding to the `disk_consistent_lsn` from the timeline's metadata file during checkpointing. diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 8b31547781..077969131f 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -69,7 +69,11 @@ pub enum RepositoryTimeline { /// Loaded into pageserver's memory and ready to be used. Local(Arc), /// Timeline, found on the pageserver's remote storage, but not yet downloaded locally.
- Remote(ZTimelineId), + Remote { + id: ZTimelineId, + /// metadata contents of the latest successfully uploaded checkpoint + disk_consistent_lsn: Lsn, + }, } impl RepositoryTimeline { @@ -82,22 +86,38 @@ impl RepositoryTimeline { } } +/// A state of the timeline synchronization with the remote storage. +/// Contains `disk_consistent_lsn` of the corresponding remote timeline (latest checkpoint's disk_consistent_lsn). #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] pub enum TimelineSyncState { /// No further downloads from the remote storage are needed. /// The timeline state is up-to-date or ahead of the remote storage one, /// ready to be used in any pageserver operation. - Ready, + Ready(Lsn), /// Timeline is scheduled for downloading, but its current local state is not up to date with the remote storage. /// The timeline is not ready to be used in any pageserver operations, otherwise it might diverge its local state from the remote version, /// making it impossible to sync it further. - AwaitsDownload, + AwaitsDownload(Lsn), /// Timeline was not in the pageserver's local working directory, but was found on the remote storage, ready to be downloaded. /// Cannot be used in any pageserver operations due to complete absence locally. - CloudOnly, + CloudOnly(Lsn), /// Timeline was evicted from the pageserver's local working directory due to conflicting remote and local states or too many errors during the synchronization. - /// Such timelines cannot have their state synchronized further. - Evicted, + /// Such timelines cannot have their state synchronized further and may not have the data about remote timeline's disk_consistent_lsn, since eviction may happen + /// due to errors before the remote timeline contents is known. 
+ Evicted(Option), +} + +impl TimelineSyncState { + pub fn remote_disk_consistent_lsn(&self) -> Option { + Some(match self { + TimelineSyncState::Evicted(None) => return None, + TimelineSyncState::Ready(lsn) => lsn, + TimelineSyncState::AwaitsDownload(lsn) => lsn, + TimelineSyncState::CloudOnly(lsn) => lsn, + TimelineSyncState::Evicted(Some(lsn)) => lsn, + }) + .copied() + } } /// diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 12de49286f..cf97e72de0 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -209,6 +209,9 @@ fn walreceiver_main( tenantid, timelineid, ) })?; + let _timeline_synced_disk_consistent_lsn = tenant_mgr::get_repository_for_tenant(tenantid)? + .get_timeline_state(timelineid) + .and_then(|state| state.remote_disk_consistent_lsn()); // // Start streaming the WAL, from where we left off previously.