Reuse tenant-timeline id struct from utils

Authored by Kirill Bulatov on 2022-02-14 15:05:52 +02:00
Committed by Kirill Bulatov
parent 0a557b2fa9
commit 5563ff123f
7 changed files with 91 additions and 62 deletions
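For reference, the reused struct from zenith_utils::zid is not itself shown in this diff. Based on how it is used below (named tenant_id/timeline_id fields, a new() constructor, use as a HashMap key, .copied() over keys, and direct {} formatting in log messages), it presumably looks roughly like the following sketch; the exact derive list and the Display separator are inferences, not the verbatim source:

    // Sketch only: inferred from usage in this diff, not copied from zenith_utils.
    // ZTenantId and ZTimelineId are the existing id types from zenith_utils::zid.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
    pub struct ZTenantTimelineId {
        pub tenant_id: ZTenantId,
        pub timeline_id: ZTimelineId,
    }

    impl ZTenantTimelineId {
        pub fn new(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Self {
            ZTenantTimelineId {
                tenant_id,
                timeline_id,
            }
        }
    }

    impl std::fmt::Display for ZTenantTimelineId {
        // The separator is an assumption; the diff only shows that `{}` works.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}-{}", self.tenant_id, self.timeline_id)
        }
    }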

View File

@@ -94,7 +94,7 @@ use std::{
 use anyhow::{bail, Context};
 use tokio::io;
 use tracing::{error, info};
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
+use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
 
 pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download};
 use self::{local_fs::LocalFs, rust_s3::S3};
@@ -106,17 +106,6 @@ use crate::{
 pub use storage_sync::compression;
 
-/// Any timeline has its own id and its own tenant it belongs to,
-/// the sync processes group timelines by both for simplicity.
-#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)]
-pub struct TimelineSyncId(ZTenantId, ZTimelineId);
-
-impl std::fmt::Display for TimelineSyncId {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "(tenant: {}, timeline: {})", self.0, self.1)
-    }
-}
 /// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization.
 /// Successful initialization includes the case when the sync loop is not started; the startup data is still returned then,
 /// to simplify the receiving code.
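Note: the change swaps a local tuple struct for the shared named-field struct, so positional access and tuple destructuring give way to field names throughout the rest of this diff. In short (identifiers from this commit; the snippet itself is illustrative):

    // Before: positional fields, easy to mix up tenant and timeline.
    let id = TimelineSyncId(tenant_id, timeline_id);
    let (tenant, timeline) = (id.0, id.1);

    // After: named fields, field-init shorthand, explicit destructuring.
    let id = ZTenantTimelineId {
        tenant_id,
        timeline_id,
    };
    let ZTenantTimelineId {
        tenant_id,
        timeline_id,
    } = id;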
@@ -169,7 +158,7 @@ pub fn start_local_timeline_sync(
         ZTenantId,
         HashMap<ZTimelineId, TimelineSyncState>,
     > = HashMap::new();
-    for (TimelineSyncId(tenant_id, timeline_id), (timeline_metadata, _)) in
+    for (ZTenantTimelineId { tenant_id, timeline_id }, (timeline_metadata, _)) in
         local_timeline_files
     {
         initial_timeline_states
@@ -189,7 +178,7 @@ pub fn start_local_timeline_sync(
 fn local_tenant_timeline_files(
     config: &'static PageServerConf,
-) -> anyhow::Result<HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>> {
+) -> anyhow::Result<HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>> {
     let mut local_tenant_timeline_files = HashMap::new();
     let tenants_dir = config.tenants_path();
     for tenants_dir_entry in fs::read_dir(&tenants_dir)
@@ -224,8 +213,9 @@ fn local_tenant_timeline_files(
 fn collect_timelines_for_tenant(
     config: &'static PageServerConf,
     tenant_path: &Path,
-) -> anyhow::Result<HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>> {
-    let mut timelines: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)> = HashMap::new();
+) -> anyhow::Result<HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>> {
+    let mut timelines: HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)> =
+        HashMap::new();
     let tenant_id = tenant_path
         .file_name()
         .and_then(ffi::OsStr::to_str)
@@ -246,7 +236,10 @@ fn collect_timelines_for_tenant(
         match collect_timeline_files(&timeline_path) {
             Ok((timeline_id, metadata, timeline_files)) => {
                 timelines.insert(
-                    TimelineSyncId(tenant_id, timeline_id),
+                    ZTenantTimelineId {
+                        tenant_id,
+                        timeline_id,
+                    },
                     (metadata, timeline_files),
                 );
             }
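The tenant id above is taken from the tenant directory's name, and collect_timeline_files presumably derives the timeline id the same way from the tenants/<tenant_id>/timelines/<timeline_id> layout implied by tenants_path() and timeline_path(). A minimal sketch of that parsing step (the helper name and error handling are illustrative, not from this commit; it assumes both id types implement FromStr):

    use std::{ffi, path::Path, str::FromStr};

    // Illustrative helper: parse a ZTenantId or ZTimelineId out of the final
    // path component, mirroring the .file_name()/.and_then(ffi::OsStr::to_str)
    // chain shown above.
    fn parse_dir_id<T: FromStr>(dir: &Path) -> Option<T> {
        dir.file_name()
            .and_then(ffi::OsStr::to_str)
            .and_then(|name| name.parse::<T>().ok())
    }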

View File

@@ -106,7 +106,7 @@ use self::{
     },
     upload::upload_timeline_checkpoint,
 };
-use super::{RemoteStorage, SyncStartupData, TimelineSyncId};
+use super::{RemoteStorage, SyncStartupData, ZTenantTimelineId};
 use crate::{
     config::PageServerConf, layered_repository::metadata::TimelineMetadata,
     remote_storage::storage_sync::compression::read_archive_header, repository::TimelineSyncState,
@@ -243,13 +243,13 @@ mod sync_queue {
 /// Limited by the number of retries; after a certain threshold, the failing task gets evicted and the timeline disabled.
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
 pub struct SyncTask {
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     retries: u32,
     kind: SyncKind,
 }
 
 impl SyncTask {
-    fn new(sync_id: TimelineSyncId, retries: u32, kind: SyncKind) -> Self {
+    fn new(sync_id: ZTenantTimelineId, retries: u32, kind: SyncKind) -> Self {
         Self {
             sync_id,
             retries,
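The doc comment's retry-and-evict behavior is driven by the retries counter above together with the max_sync_errors limit passed to spawn_storage_sync_thread further down. A hypothetical sketch of that flow (the function and the exact eviction action are illustrative, not this module's real code):

    use std::num::NonZeroU32;

    // Illustrative only: requeue a failed task until the retry budget is spent.
    fn requeue_or_evict(mut task: SyncTask, max_sync_errors: NonZeroU32) {
        task.retries += 1;
        if task.retries > max_sync_errors.get() {
            // Evict: drop the task and leave the timeline in a disabled
            // sync state instead of retrying forever.
        } else {
            sync_queue::push(task);
        }
    }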
@@ -308,7 +308,10 @@ pub fn schedule_timeline_checkpoint_upload(
     }
     if !sync_queue::push(SyncTask::new(
-        TimelineSyncId(tenant_id, timeline_id),
+        ZTenantTimelineId {
+            tenant_id,
+            timeline_id,
+        },
         0,
         SyncKind::Upload(NewCheckpoint { layers, metadata }),
     )) {
@@ -339,7 +342,10 @@ pub fn schedule_timeline_download(tenant_id: ZTenantId, timeline_id: ZTimelineId
         tenant_id, timeline_id
     );
     sync_queue::push(SyncTask::new(
-        TimelineSyncId(tenant_id, timeline_id),
+        ZTenantTimelineId {
+            tenant_id,
+            timeline_id,
+        },
         0,
         SyncKind::Download(TimelineDownload {
             files_to_skip: Arc::new(BTreeSet::new()),
@@ -355,7 +361,7 @@ pub(super) fn spawn_storage_sync_thread<
     S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
 >(
     conf: &'static PageServerConf,
-    local_timeline_files: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>,
+    local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>,
     storage: S,
     max_concurrent_sync: NonZeroUsize,
     max_sync_errors: NonZeroU32,
@@ -511,7 +517,7 @@ async fn loop_step<
             Err(e) => {
                 error!(
                     "Failed to process storage sync task for tenant {}, timeline {}: {:?}",
-                    sync_id.0, sync_id.1, e
+                    sync_id.tenant_id, sync_id.timeline_id, e
                 );
                 None
             }
@@ -525,7 +531,10 @@
     while let Some((sync_id, state_update)) = task_batch.next().await {
         debug!("Finished storage sync task for sync id {}", sync_id);
         if let Some(state_update) = state_update {
-            let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+            let ZTenantTimelineId {
+                tenant_id,
+                timeline_id,
+            } = sync_id;
             new_timeline_states
                 .entry(tenant_id)
                 .or_default()
@@ -619,7 +628,7 @@ async fn process_task<
 fn schedule_first_sync_tasks(
     index: &RemoteTimelineIndex,
-    local_timeline_files: HashMap<TimelineSyncId, (TimelineMetadata, Vec<PathBuf>)>,
+    local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, Vec<PathBuf>)>,
 ) -> HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>> {
     let mut initial_timeline_statuses: HashMap<ZTenantId, HashMap<ZTimelineId, TimelineSyncState>> =
         HashMap::new();
@@ -630,7 +639,10 @@ fn schedule_first_sync_tasks(
     for (sync_id, (local_metadata, local_files)) in local_timeline_files {
         let local_disk_consistent_lsn = local_metadata.disk_consistent_lsn();
-        let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+        let ZTenantTimelineId {
+            tenant_id,
+            timeline_id,
+        } = sync_id;
         match index.timeline_entry(&sync_id) {
             Some(index_entry) => {
                 let timeline_status = compare_local_and_remote_timeline(
@@ -673,10 +685,10 @@
         }
     }
-    let unprocessed_remote_ids = |remote_id: &TimelineSyncId| {
+    let unprocessed_remote_ids = |remote_id: &ZTenantTimelineId| {
         initial_timeline_statuses
-            .get(&remote_id.0)
-            .and_then(|timelines| timelines.get(&remote_id.1))
+            .get(&remote_id.tenant_id)
+            .and_then(|timelines| timelines.get(&remote_id.timeline_id))
             .is_none()
     };
     for unprocessed_remote_id in index
@@ -684,7 +696,10 @@ fn schedule_first_sync_tasks(
         .filter(unprocessed_remote_ids)
         .collect::<Vec<_>>()
     {
-        let TimelineSyncId(cloud_only_tenant_id, cloud_only_timeline_id) = unprocessed_remote_id;
+        let ZTenantTimelineId {
+            tenant_id: cloud_only_tenant_id,
+            timeline_id: cloud_only_timeline_id,
+        } = unprocessed_remote_id;
         match index
             .timeline_entry(&unprocessed_remote_id)
             .and_then(TimelineIndexEntry::disk_consistent_lsn)
@@ -713,7 +728,7 @@
 fn compare_local_and_remote_timeline(
     new_sync_tasks: &mut VecDeque<SyncTask>,
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     local_metadata: TimelineMetadata,
     local_files: Vec<PathBuf>,
     remote_entry: &TimelineIndexEntry,
@@ -770,7 +785,7 @@ async fn update_index_description<
 >(
     (storage, index): &(S, RwLock<RemoteTimelineIndex>),
     timeline_dir: &Path,
-    id: TimelineSyncId,
+    id: ZTenantTimelineId,
 ) -> anyhow::Result<RemoteTimeline> {
     let mut index_write = index.write().await;
     let full_index = match index_write.timeline_entry(&id) {
@@ -793,7 +808,7 @@
         Ok((archive_id, header_size, header)) => full_index.update_archive_contents(archive_id.0, header, header_size),
         Err((e, archive_id)) => bail!(
             "Failed to download archive header for tenant {}, timeline {}, archive for Lsn {}: {}",
-            id.tenant_id, id.timeline_id, archive_id.0,
+            id.tenant_id, id.timeline_id, archive_id.0,
             e
         ),
     }
@@ -871,7 +886,7 @@ mod test_utils {
         timeline_id: ZTimelineId,
         new_upload: NewCheckpoint,
     ) {
-        let sync_id = TimelineSyncId(harness.tenant_id, timeline_id);
+        let sync_id = ZTenantTimelineId::new(harness.tenant_id, timeline_id);
         upload_timeline_checkpoint(
             harness.conf,
             Arc::clone(&remote_assets),
@@ -927,7 +942,7 @@ mod test_utils {
     pub async fn expect_timeline(
         index: &RwLock<RemoteTimelineIndex>,
-        sync_id: TimelineSyncId,
+        sync_id: ZTenantTimelineId,
     ) -> RemoteTimeline {
         if let Some(TimelineIndexEntry::Full(remote_timeline)) =
             index.read().await.timeline_entry(&sync_id)
@@ -962,18 +977,18 @@ mod test_utils {
         let mut expected_timeline_entries = BTreeMap::new();
         for sync_id in actual_sync_ids {
             actual_branches.insert(
-                sync_id.1,
+                sync_id.tenant_id,
                 index_read
-                    .branch_files(sync_id.0)
+                    .branch_files(sync_id.tenant_id)
                     .into_iter()
                     .flat_map(|branch_paths| branch_paths.iter())
                     .cloned()
                     .collect::<BTreeSet<_>>(),
             );
             expected_branches.insert(
-                sync_id.1,
+                sync_id.tenant_id,
                 expected_index_with_descriptions
-                    .branch_files(sync_id.0)
+                    .branch_files(sync_id.tenant_id)
                     .into_iter()
                     .flat_map(|branch_paths| branch_paths.iter())
                     .cloned()

View File

@@ -17,7 +17,7 @@ use crate::{
             compression, index::TimelineIndexEntry, sync_queue, tenant_branch_files,
             update_index_description, SyncKind, SyncTask,
         },
-        RemoteStorage, TimelineSyncId,
+        RemoteStorage, ZTenantTimelineId,
     },
 };
@@ -52,13 +52,16 @@ pub(super) async fn download_timeline<
 >(
     conf: &'static PageServerConf,
     remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     mut download: TimelineDownload,
     retries: u32,
 ) -> DownloadedTimeline {
     debug!("Downloading layers for sync id {}", sync_id);
-    let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+    let ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    } = sync_id;
     let index_read = remote_assets.1.read().await;
     let remote_timeline = match index_read.timeline_entry(&sync_id) {
         None => {
@@ -110,7 +113,8 @@
         }
     };
-    if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.0).await {
+    if let Err(e) = download_missing_branches(conf, remote_assets.as_ref(), sync_id.tenant_id).await
+    {
         error!(
             "Failed to download missing branches for sync id {}: {:?}",
             sync_id, e
@@ -180,7 +184,10 @@ async fn try_download_archive<
     S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
 >(
     conf: &'static PageServerConf,
-    TimelineSyncId(tenant_id, timeline_id): TimelineSyncId,
+    ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    }: ZTenantTimelineId,
     remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
     remote_timeline: &RemoteTimeline,
     archive_id: ArchiveId,
@@ -343,7 +350,7 @@ mod tests {
     #[tokio::test]
     async fn test_download_timeline() -> anyhow::Result<()> {
         let repo_harness = RepoHarness::create("test_download_timeline")?;
-        let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
+        let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
         let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
         let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
             repo_harness.conf,

View File

@@ -22,7 +22,7 @@ use crate::{
     layered_repository::TIMELINES_SEGMENT_NAME,
     remote_storage::{
         storage_sync::compression::{parse_archive_name, FileEntry},
-        TimelineSyncId,
+        ZTenantTimelineId,
     },
 };
@@ -53,7 +53,7 @@ impl RelativePath {
 #[derive(Debug, Clone)]
 pub struct RemoteTimelineIndex {
     branch_files: HashMap<ZTenantId, HashSet<RelativePath>>,
-    timeline_files: HashMap<TimelineSyncId, TimelineIndexEntry>,
+    timeline_files: HashMap<ZTenantTimelineId, TimelineIndexEntry>,
 }
 
 impl RemoteTimelineIndex {
@@ -80,19 +80,22 @@ impl RemoteTimelineIndex {
         index
     }
 
-    pub fn timeline_entry(&self, id: &TimelineSyncId) -> Option<&TimelineIndexEntry> {
+    pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&TimelineIndexEntry> {
         self.timeline_files.get(id)
     }
 
-    pub fn timeline_entry_mut(&mut self, id: &TimelineSyncId) -> Option<&mut TimelineIndexEntry> {
+    pub fn timeline_entry_mut(
+        &mut self,
+        id: &ZTenantTimelineId,
+    ) -> Option<&mut TimelineIndexEntry> {
         self.timeline_files.get_mut(id)
     }
 
-    pub fn add_timeline_entry(&mut self, id: TimelineSyncId, entry: TimelineIndexEntry) {
+    pub fn add_timeline_entry(&mut self, id: ZTenantTimelineId, entry: TimelineIndexEntry) {
         self.timeline_files.insert(id, entry);
     }
 
-    pub fn all_sync_ids(&self) -> impl Iterator<Item = TimelineSyncId> + '_ {
+    pub fn all_sync_ids(&self) -> impl Iterator<Item = ZTenantTimelineId> + '_ {
         self.timeline_files.keys().copied()
     }
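Taken together, the index API now keys everything by the shared id. A short usage sketch under the signatures above (index, tenant_id, timeline_id, and entry are placeholders from surrounding code, not new API):

    // Hypothetical usage of RemoteTimelineIndex after this change.
    let id = ZTenantTimelineId::new(tenant_id, timeline_id);
    index.add_timeline_entry(id, entry);
    assert!(index.timeline_entry(&id).is_some());
    // all_sync_ids() yields keys by value via `.keys().copied()`, which is
    // why the shared id struct must be Copy.
    let all: Vec<ZTenantTimelineId> = index.all_sync_ids().collect();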
@@ -348,7 +351,10 @@ fn try_parse_index_entry(
         .to_string_lossy()
         .to_string();
-    let sync_id = TimelineSyncId(tenant_id, timeline_id);
+    let sync_id = ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    };
     let timeline_index_entry = index
         .timeline_files
         .entry(sync_id)

View File

@@ -17,7 +17,7 @@ use crate::{
             index::{RemoteTimeline, TimelineIndexEntry},
             sync_queue, tenant_branch_files, update_index_description, SyncKind, SyncTask,
         },
-        RemoteStorage, TimelineSyncId,
+        RemoteStorage, ZTenantTimelineId,
     },
 };
@@ -36,12 +36,13 @@ pub(super) async fn upload_timeline_checkpoint<
 >(
     config: &'static PageServerConf,
     remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     new_checkpoint: NewCheckpoint,
     retries: u32,
 ) -> Option<bool> {
     debug!("Uploading checkpoint for sync id {}", sync_id);
-    if let Err(e) = upload_missing_branches(config, remote_assets.as_ref(), sync_id.0).await {
+    if let Err(e) = upload_missing_branches(config, remote_assets.as_ref(), sync_id.tenant_id).await
+    {
         error!(
             "Failed to upload missing branches for sync id {}: {:?}",
             sync_id, e
@@ -57,7 +58,10 @@
     let index = &remote_assets.1;
-    let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+    let ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    } = sync_id;
     let timeline_dir = config.timeline_path(&timeline_id, &tenant_id);
     let index_read = index.read().await;
@@ -151,11 +155,14 @@ async fn try_upload_checkpoint<
 >(
     config: &'static PageServerConf,
     remote_assets: Arc<(S, RwLock<RemoteTimelineIndex>)>,
-    sync_id: TimelineSyncId,
+    sync_id: ZTenantTimelineId,
     new_checkpoint: &NewCheckpoint,
     files_to_skip: BTreeSet<PathBuf>,
 ) -> anyhow::Result<(ArchiveHeader, u64)> {
-    let TimelineSyncId(tenant_id, timeline_id) = sync_id;
+    let ZTenantTimelineId {
+        tenant_id,
+        timeline_id,
+    } = sync_id;
     let timeline_dir = config.timeline_path(&timeline_id, &tenant_id);
     let files_to_upload = new_checkpoint
@@ -288,7 +295,7 @@ mod tests {
    #[tokio::test]
    async fn reupload_timeline() -> anyhow::Result<()> {
        let repo_harness = RepoHarness::create("reupload_timeline")?;
-        let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
+        let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
        let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
        let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
            repo_harness.conf,
@@ -484,7 +491,7 @@ mod tests {
    #[tokio::test]
    async fn reupload_timeline_rejected() -> anyhow::Result<()> {
        let repo_harness = RepoHarness::create("reupload_timeline_rejected")?;
-        let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
+        let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID);
        let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
        let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
            repo_harness.conf,