Propagate disk consistent lsn in timeline sync statuses

This commit is contained in:
Kirill Bulatov
2021-12-14 00:46:36 +02:00
committed by Kirill Bulatov
parent 86a409a174
commit ca60561a01
8 changed files with 246 additions and 95 deletions

View File

@@ -56,7 +56,7 @@ impl BranchInfo {
let timeline = match repo.get_timeline(timeline_id)? {
RepositoryTimeline::Local(local_entry) => local_entry,
RepositoryTimeline::Remote(_) => {
RepositoryTimeline::Remote { .. } => {
bail!("Timeline {} is remote, no branches to display", timeline_id)
}
};

View File

@@ -139,9 +139,13 @@ impl Repository for LayeredRepository {
Ok(
match self.get_or_init_timeline(timelineid, &mut timelines)? {
LayeredTimelineEntry::Local(local) => RepositoryTimeline::Local(local),
LayeredTimelineEntry::Remote(remote_timeline_id) => {
RepositoryTimeline::Remote(remote_timeline_id)
}
LayeredTimelineEntry::Remote {
id,
disk_consistent_lsn,
} => RepositoryTimeline::Remote {
id,
disk_consistent_lsn,
},
},
)
}
@@ -184,7 +188,7 @@ impl Repository for LayeredRepository {
let mut timelines = self.timelines.lock().unwrap();
let src_timeline = match self.get_or_init_timeline(src, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote(_) => {
LayeredTimelineEntry::Remote { .. } => {
bail!("Cannot branch off the timeline {} that's not local", src)
}
};
@@ -262,7 +266,7 @@ impl Repository for LayeredRepository {
info_span!("checkpoint", timeline = %timelineid, tenant = %self.tenantid).entered();
match timeline {
LayeredTimelineEntry::Local(timeline) => timeline.checkpoint(cconf)?,
LayeredTimelineEntry::Remote(_) => debug!(
LayeredTimelineEntry::Remote { .. } => debug!(
"Cannot run the checkpoint for remote timeline {}",
timelineid
),
@@ -296,17 +300,22 @@ impl Repository for LayeredRepository {
let mut timelines_accessor = self.timelines.lock().unwrap();
let timeline_to_shutdown = match new_state {
TimelineSyncState::Ready => {
TimelineSyncState::Ready(_) => {
let reloaded_timeline =
self.init_local_timeline(timeline_id, &mut timelines_accessor)?;
timelines_accessor
.insert(timeline_id, LayeredTimelineEntry::Local(reloaded_timeline));
None
}
TimelineSyncState::Evicted => timelines_accessor.remove(&timeline_id),
TimelineSyncState::AwaitsDownload | TimelineSyncState::CloudOnly => {
timelines_accessor.insert(timeline_id, LayeredTimelineEntry::Remote(timeline_id))
}
TimelineSyncState::Evicted(_) => timelines_accessor.remove(&timeline_id),
TimelineSyncState::AwaitsDownload(disk_consistent_lsn)
| TimelineSyncState::CloudOnly(disk_consistent_lsn) => timelines_accessor.insert(
timeline_id,
LayeredTimelineEntry::Remote {
id: timeline_id,
disk_consistent_lsn,
},
),
};
drop(timelines_accessor);
@@ -324,18 +333,16 @@ impl Repository for LayeredRepository {
/// [`TimelineSyncState::Evicted`] and other non-local and non-remote states are not stored in the layered repo at all,
/// hence their statuses cannot be returned by the repo.
fn get_timeline_state(&self, timeline_id: ZTimelineId) -> Option<TimelineSyncState> {
let timelines_accessor = self.timelines.lock().unwrap();
let timeline_entry = timelines_accessor.get(&timeline_id)?;
Some(
if self
.timelines
.lock()
.unwrap()
.get(&timeline_id)?
if timeline_entry
.local_or_schedule_download(self.tenantid)
.is_some()
{
TimelineSyncState::Ready
TimelineSyncState::Ready(timeline_entry.disk_consistent_lsn())
} else {
TimelineSyncState::CloudOnly
TimelineSyncState::CloudOnly(timeline_entry.disk_consistent_lsn())
},
)
}
@@ -356,7 +363,7 @@ fn shutdown_timeline(
timeline.checkpoint(CheckpointConfig::Forced)?;
//TODO Wait for walredo process to shutdown too
}
LayeredTimelineEntry::Remote(_) => warn!(
LayeredTimelineEntry::Remote { .. } => warn!(
"Skipping shutdown of a remote timeline {} for tenant {}",
timeline_id, tenant_id
),
@@ -367,14 +374,18 @@ fn shutdown_timeline(
#[derive(Clone)]
enum LayeredTimelineEntry {
Local(Arc<LayeredTimeline>),
Remote(ZTimelineId),
Remote {
id: ZTimelineId,
/// metadata contents of the latest successfully uploaded checkpoint
disk_consistent_lsn: Lsn,
},
}
impl LayeredTimelineEntry {
fn timeline_id(&self) -> ZTimelineId {
match self {
LayeredTimelineEntry::Local(timeline) => timeline.timelineid,
LayeredTimelineEntry::Remote(timeline_id) => *timeline_id,
LayeredTimelineEntry::Remote { id, .. } => *id,
}
}
@@ -382,7 +393,9 @@ impl LayeredTimelineEntry {
fn local_or_schedule_download(&self, tenant_id: ZTenantId) -> Option<&LayeredTimeline> {
match self {
Self::Local(local) => Some(local.as_ref()),
Self::Remote(timeline_id) => {
Self::Remote {
id: timeline_id, ..
} => {
debug!(
"Accessed a remote timeline {} for tenant {}, scheduling a timeline download",
timeline_id, tenant_id
@@ -392,6 +405,17 @@ impl LayeredTimelineEntry {
}
}
}
/// Returns the disk consistent Lsn known for this timeline entry.
///
/// For a local timeline the value is loaded from the timeline itself;
/// for a remote one, the Lsn stored in the entry is returned as is.
fn disk_consistent_lsn(&self) -> Lsn {
    match self {
        Self::Remote {
            disk_consistent_lsn: remote_lsn,
            ..
        } => *remote_lsn,
        Self::Local(local_timeline) => local_timeline.disk_consistent_lsn.load(),
    }
}
}
/// Private functions
@@ -576,7 +600,7 @@ impl LayeredRepository {
for &timelineid in &timelineids {
let timeline = match self.get_or_init_timeline(timelineid, &mut timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote(_) => {
LayeredTimelineEntry::Remote { .. } => {
warn!(
"Timeline {} is not local, cannot proceed with gc",
timelineid
@@ -623,7 +647,7 @@ impl LayeredRepository {
// so this operation is just a quick map lookup.
let timeline = match self.get_or_init_timeline(timelineid, &mut *timelines)? {
LayeredTimelineEntry::Local(timeline) => timeline,
LayeredTimelineEntry::Remote(_) => {
LayeredTimelineEntry::Remote { .. } => {
debug!("Skipping GC for non-local timeline {}", timelineid);
continue;
}

View File

@@ -163,11 +163,16 @@ pub fn start_local_timeline_sync(
ZTenantId,
HashMap<ZTimelineId, TimelineSyncState>,
> = HashMap::new();
for TimelineSyncId(tenant_id, timeline_id) in local_timeline_files.into_keys() {
for (TimelineSyncId(tenant_id, timeline_id), (timeline_metadata, _)) in
local_timeline_files
{
initial_timeline_states
.entry(tenant_id)
.or_default()
.insert(timeline_id, TimelineSyncState::Ready);
.insert(
timeline_id,
TimelineSyncState::Ready(timeline_metadata.disk_consistent_lsn()),
);
}
Ok(SyncStartupData {
initial_timeline_states,

View File

@@ -95,7 +95,7 @@ use tracing::*;
use self::{
compression::ArchiveHeader,
download::download_timeline,
download::{download_timeline, DownloadedTimeline},
index::{
ArchiveDescription, ArchiveId, RelativePath, RemoteTimeline, RemoteTimelineIndex,
TimelineIndexEntry,
@@ -522,7 +522,15 @@ async fn process_task<
"Evicting task {:?} that failed {} times, exceeding the error threshold",
task.kind, task.retries
);
return Some(TimelineSyncState::Evicted);
return Some(TimelineSyncState::Evicted(
remote_assets
.as_ref()
.1
.read()
.await
.timeline_entry(&task.sync_id)
.and_then(TimelineIndexEntry::disk_consistent_lsn),
));
}
if task.retries > 0 {
@@ -538,7 +546,7 @@ async fn process_task<
let sync_name = task.kind.sync_name();
match task.kind {
SyncKind::Download(download_data) => {
let sync_status = download_timeline(
let download_result = download_timeline(
conf,
remote_assets,
task.sync_id,
@@ -546,12 +554,24 @@ async fn process_task<
task.retries + 1,
)
.await;
register_sync_status(sync_start, sync_name, sync_status);
if sync_status? {
Some(TimelineSyncState::Ready)
} else {
Some(TimelineSyncState::AwaitsDownload)
match download_result {
DownloadedTimeline::Abort => {
register_sync_status(sync_start, sync_name, None);
None
}
DownloadedTimeline::FailedAndRescheduled {
disk_consistent_lsn,
} => {
register_sync_status(sync_start, sync_name, Some(false));
Some(TimelineSyncState::AwaitsDownload(disk_consistent_lsn))
}
DownloadedTimeline::Successful {
disk_consistent_lsn,
} => {
register_sync_status(sync_start, sync_name, Some(true));
Some(TimelineSyncState::Ready(disk_consistent_lsn))
}
}
}
SyncKind::Upload(layer_upload) => {
@@ -580,6 +600,8 @@ fn schedule_first_sync_tasks(
VecDeque::with_capacity(local_timeline_files.len().max(local_timeline_files.len()));
for (sync_id, (local_metadata, local_files)) in local_timeline_files {
let local_disk_consistent_lsn = local_metadata.disk_consistent_lsn();
let TimelineSyncId(tenant_id, timeline_id) = sync_id;
match index.timeline_entry(&sync_id) {
Some(index_entry) => {
@@ -590,10 +612,18 @@ fn schedule_first_sync_tasks(
local_files,
index_entry,
);
initial_timeline_statuses
.entry(tenant_id)
.or_default()
.insert(timeline_id, timeline_status);
match timeline_status {
Some(timeline_status) => {
initial_timeline_statuses
.entry(tenant_id)
.or_default()
.insert(timeline_id, timeline_status);
}
None => error!(
"Failed to compare local and remote timeline for task {}",
sync_id
),
}
}
None => {
new_sync_tasks.push_back(SyncTask::new(
@@ -607,7 +637,10 @@ fn schedule_first_sync_tasks(
initial_timeline_statuses
.entry(tenant_id)
.or_default()
.insert(timeline_id, TimelineSyncState::Ready);
.insert(
timeline_id,
TimelineSyncState::Ready(local_disk_consistent_lsn),
);
}
}
}
@@ -624,10 +657,24 @@ fn schedule_first_sync_tasks(
.collect::<Vec<_>>()
{
let TimelineSyncId(cloud_only_tenant_id, cloud_only_timeline_id) = unprocessed_remote_id;
initial_timeline_statuses
.entry(cloud_only_tenant_id)
.or_default()
.insert(cloud_only_timeline_id, TimelineSyncState::CloudOnly);
match index
.timeline_entry(&unprocessed_remote_id)
.and_then(TimelineIndexEntry::disk_consistent_lsn)
{
Some(remote_disk_consistent_lsn) => {
initial_timeline_statuses
.entry(cloud_only_tenant_id)
.or_default()
.insert(
cloud_only_timeline_id,
TimelineSyncState::CloudOnly(remote_disk_consistent_lsn),
);
}
None => error!(
"Failed to find disk consistent LSN for remote timeline {}",
unprocessed_remote_id
),
}
}
new_sync_tasks.into_iter().for_each(|task| {
@@ -642,7 +689,7 @@ fn compare_local_and_remote_timeline(
local_metadata: TimelineMetadata,
local_files: Vec<PathBuf>,
remote_entry: &TimelineIndexEntry,
) -> TimelineSyncState {
) -> Option<TimelineSyncState> {
let local_lsn = local_metadata.disk_consistent_lsn();
let uploads = remote_entry.uploaded_checkpoints();
@@ -663,7 +710,7 @@ fn compare_local_and_remote_timeline(
.filter(|upload_lsn| upload_lsn <= &local_lsn)
.map(ArchiveId)
.collect();
if archives_to_skip.len() != uploads_count {
Some(if archives_to_skip.len() != uploads_count {
new_sync_tasks.push_back(SyncTask::new(
sync_id,
0,
@@ -672,10 +719,10 @@ fn compare_local_and_remote_timeline(
archives_to_skip,
}),
));
TimelineSyncState::AwaitsDownload
TimelineSyncState::AwaitsDownload(remote_entry.disk_consistent_lsn()?)
} else {
TimelineSyncState::Ready
}
TimelineSyncState::Ready(remote_entry.disk_consistent_lsn().unwrap_or(local_lsn))
})
}
fn register_sync_status(sync_start: Instant, sync_name: &str, sync_status: Option<bool>) {

View File

@@ -7,7 +7,7 @@ use anyhow::{anyhow, ensure, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use tokio::{fs, sync::RwLock};
use tracing::{debug, error, trace, warn};
use zenith_utils::zid::ZTenantId;
use zenith_utils::{lsn::Lsn, zid::ZTenantId};
use crate::{
layered_repository::metadata::{metadata_path, TimelineMetadata},
@@ -26,6 +26,18 @@ use super::{
TimelineDownload,
};
/// Outcome of a single timeline download attempt.
/// Non-abort variants carry the `disk_consistent_lsn` of the latest remote checkpoint,
/// so the caller can put it into the resulting timeline sync state.
pub(super) enum DownloadedTimeline {
/// Remote timeline data is either absent or corrupt, no download possible:
/// the timeline has no entry in the remote index or no disk consistent Lsn could be determined for it.
/// The download task is not rescheduled.
Abort,
/// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known.
/// The download failed due to some error, the download task is rescheduled for another retry.
FailedAndRescheduled { disk_consistent_lsn: Lsn },
/// Remote timeline data is found, its latest checkpoint's metadata contents (disk_consistent_lsn) is known.
/// All timeline's archives were downloaded successfully.
Successful { disk_consistent_lsn: Lsn },
}
/// Attempts to download and uncompress files from all remote archives for the timeline given.
/// Timeline files that already exist locally are skipped during the download, but the local metadata file is
/// updated in the end of every checkpoint archive extraction.
@@ -43,8 +55,61 @@ pub(super) async fn download_timeline<
sync_id: TimelineSyncId,
mut download: TimelineDownload,
retries: u32,
) -> Option<bool> {
) -> DownloadedTimeline {
debug!("Downloading layers for sync id {}", sync_id);
let TimelineSyncId(tenant_id, timeline_id) = sync_id;
let index_read = remote_assets.1.read().await;
let remote_timeline = match index_read.timeline_entry(&sync_id) {
None => {
error!("Cannot download: no timeline is present in the index for given ids");
return DownloadedTimeline::Abort;
}
Some(index_entry) => match index_entry {
TimelineIndexEntry::Full(remote_timeline) => Cow::Borrowed(remote_timeline),
TimelineIndexEntry::Description(_) => {
let remote_disk_consistent_lsn = index_entry.disk_consistent_lsn();
drop(index_read);
debug!("Found timeline description for the given ids, downloading the full index");
match update_index_description(
remote_assets.as_ref(),
&conf.timeline_path(&timeline_id, &tenant_id),
sync_id,
)
.await
{
Ok(remote_timeline) => Cow::Owned(remote_timeline),
Err(e) => {
error!("Failed to download full timeline index: {:#}", e);
return match remote_disk_consistent_lsn {
Some(disk_consistent_lsn) => {
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Download(download),
));
DownloadedTimeline::FailedAndRescheduled {
disk_consistent_lsn,
}
}
None => {
error!("Cannot download: no disk consistent Lsn is present for the index entry");
DownloadedTimeline::Abort
}
};
}
}
}
},
};
let disk_consistent_lsn = match remote_timeline.checkpoints().max() {
Some(lsn) => lsn,
None => {
debug!("Cannot download: no disk consistent Lsn is present for the remote timeline");
return DownloadedTimeline::Abort;
}
};
if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.0).await {
error!(
"Failed to download missing branches for sync id {}: {:#}",
@@ -55,42 +120,11 @@ pub(super) async fn download_timeline<
retries,
SyncKind::Download(download),
));
return Some(false);
return DownloadedTimeline::FailedAndRescheduled {
disk_consistent_lsn,
};
}
let TimelineSyncId(tenant_id, timeline_id) = sync_id;
let index_read = remote_assets.1.read().await;
let remote_timeline = match index_read.timeline_entry(&sync_id) {
None => {
error!("Cannot download: no timeline is present in the index for given ids");
return None;
}
Some(TimelineIndexEntry::Full(remote_timeline)) => Cow::Borrowed(remote_timeline),
Some(TimelineIndexEntry::Description(_)) => {
drop(index_read);
debug!("Found timeline description for the given ids, downloading the full index");
match update_index_description(
remote_assets.as_ref(),
&conf.timeline_path(&timeline_id, &tenant_id),
sync_id,
)
.await
{
Ok(remote_timeline) => Cow::Owned(remote_timeline),
Err(e) => {
error!("Failed to download full timeline index: {:#}", e);
sync_queue::push(SyncTask::new(
sync_id,
retries,
SyncKind::Download(download),
));
return Some(false);
}
}
}
};
debug!("Downloading timeline archives");
let archives_to_download = remote_timeline
.checkpoints()
@@ -124,7 +158,9 @@ pub(super) async fn download_timeline<
retries,
SyncKind::Download(download),
));
return Some(false);
return DownloadedTimeline::FailedAndRescheduled {
disk_consistent_lsn,
};
}
Ok(()) => {
debug!("Successfully downloaded archive {:?}", archive_id);
@@ -134,7 +170,9 @@ pub(super) async fn download_timeline<
}
debug!("Finished downloading all timeline's archives");
Some(true)
DownloadedTimeline::Successful {
disk_consistent_lsn,
}
}
async fn try_download_archive<
@@ -160,7 +198,7 @@ async fn try_download_archive<
Ok(local_metadata) => ensure!(
// need to allow `<=` instead of `<` due to cases when a failed archive can be redownloaded
local_metadata.disk_consistent_lsn() <= archive_to_download.disk_consistent_lsn(),
"Cannot download archive with LSN {} since it's earlier than local LSN {}",
"Cannot download archive with Lsn {} since it's earlier than local Lsn {}",
archive_to_download.disk_consistent_lsn(),
local_metadata.disk_consistent_lsn()
),

View File

@@ -119,16 +119,30 @@ pub enum TimelineIndexEntry {
impl TimelineIndexEntry {
pub fn uploaded_checkpoints(&self) -> BTreeSet<Lsn> {
match self {
TimelineIndexEntry::Description(description) => {
Self::Description(description) => {
description.keys().map(|archive_id| archive_id.0).collect()
}
TimelineIndexEntry::Full(remote_timeline) => remote_timeline
Self::Full(remote_timeline) => remote_timeline
.checkpoint_archives
.keys()
.map(|archive_id| archive_id.0)
.collect(),
}
}
/// Gets the latest uploaded checkpoint's disk consistent Lsn for the corresponding timeline.
///
/// Returns `None` when the entry contains no checkpoint archives at all.
pub fn disk_consistent_lsn(&self) -> Option<Lsn> {
    match self {
        Self::Full(full_timeline) => {
            let archive_lsns = full_timeline.checkpoint_archives.keys().map(|id| id.0);
            archive_lsns.max()
        }
        Self::Description(archive_descriptions) => {
            let archive_lsns = archive_descriptions.keys().map(|id| id.0);
            archive_lsns.max()
        }
    }
}
}
/// Checkpoint archive's id, corresponding to the `disk_consistent_lsn` from the timeline's metadata file during checkpointing.

View File

@@ -69,7 +69,11 @@ pub enum RepositoryTimeline {
/// Loaded into pageserver's memory and ready to be used.
Local(Arc<dyn Timeline>),
/// Timeline, found on the pageserver's remote storage, but not yet downloaded locally.
Remote(ZTimelineId),
Remote {
id: ZTimelineId,
/// metadata contents of the latest successfully uploaded checkpoint
disk_consistent_lsn: Lsn,
},
}
impl RepositoryTimeline {
@@ -82,22 +86,38 @@ impl RepositoryTimeline {
}
}
/// A state of the timeline synchronization with the remote storage.
/// Contains `disk_consistent_lsn` of the corresponding remote timeline (latest checkpoint's disk_consistent_lsn).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum TimelineSyncState {
/// No further downloads from the remote storage are needed.
/// The timeline state is up-to-date or ahead of the remote storage one,
/// ready to be used in any pageserver operation.
Ready,
Ready(Lsn),
/// Timeline is scheduled for downloading, but its current local state is not up to date with the remote storage.
/// The timeline is not ready to be used in any pageserver operations, otherwise it might diverge its local state from the remote version,
/// making it impossible to sync it further.
AwaitsDownload,
AwaitsDownload(Lsn),
/// Timeline was not in the pageserver's local working directory, but was found on the remote storage, ready to be downloaded.
/// Cannot be used in any pageserver operations due to complete absence locally.
CloudOnly,
CloudOnly(Lsn),
/// Timeline was evicted from the pageserver's local working directory due to conflicting remote and local states or too many errors during the synchronization.
/// Such timelines cannot have their state synchronized further.
Evicted,
/// Such timelines cannot have their state synchronized further and may not have the data about remote timeline's disk_consistent_lsn, since eviction may happen
/// due to errors before the remote timeline contents is known.
Evicted(Option<Lsn>),
}
impl TimelineSyncState {
pub fn remote_disk_consistent_lsn(&self) -> Option<Lsn> {
Some(match self {
TimelineSyncState::Evicted(None) => return None,
TimelineSyncState::Ready(lsn) => lsn,
TimelineSyncState::AwaitsDownload(lsn) => lsn,
TimelineSyncState::CloudOnly(lsn) => lsn,
TimelineSyncState::Evicted(Some(lsn)) => lsn,
})
.copied()
}
}
///

View File

@@ -209,6 +209,9 @@ fn walreceiver_main(
tenantid, timelineid,
)
})?;
let _timeline_synced_disk_consistent_lsn = tenant_mgr::get_repository_for_tenant(tenantid)?
.get_timeline_state(timelineid)
.and_then(|state| state.remote_disk_consistent_lsn());
//
// Start streaming the WAL, from where we left off previously.