Use proper download order

Kirill Bulatov authored on 2021-12-13 23:36:03 +02:00
committed by Kirill Bulatov
parent 8f0cafd508
commit 5cff7d1de9
6 changed files with 107 additions and 44 deletions

Cargo.lock (generated)

@@ -1164,6 +1164,7 @@ dependencies = [
  "tokio",
  "toml",
  "tracing",
+ "tracing-futures",
  "url",
  "workspace_hack",
  "zenith_metrics",
@@ -1236,6 +1237,26 @@ dependencies = [
  "siphasher",
 ]
 
+[[package]]
+name = "pin-project"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "pin-project-lite"
 version = "0.2.7"
@@ -2113,6 +2134,16 @@ dependencies = [
  "lazy_static",
 ]
 
+[[package]]
+name = "tracing-futures"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
+dependencies = [
+ "pin-project",
+ "tracing",
+]
+
 [[package]]
 name = "tracing-log"
 version = "0.1.2"


@@ -35,6 +35,7 @@ scopeguard = "1.1.0"
 async-trait = "0.1"
 const_format = "0.2.21"
 tracing = "0.1.27"
+tracing-futures = "0.2"
 signal-hook = "0.3.10"
 url = "2"
 nix = "0.23"
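
The only manifest change is the new `tracing-futures` dependency; it provides the `Instrument` extension trait that the sync-loop hunks below use to attach tracing spans to futures. A minimal sketch of the pattern (crate features and the span name here are illustrative assumptions, not part of this commit):

```rust
// Sketch: attaching a span to a future with tracing-futures 0.2.
// Assumes tokio with the "macros" and "rt" features enabled.
use tracing::debug_span;
use tracing_futures::Instrument;

async fn step() {
    // Recorded inside the attached span, including its fields.
    tracing::debug!("doing one sync step");
}

#[tokio::main]
async fn main() {
    // The span is entered every time the instrumented future is polled,
    // even after it migrates between runtime worker threads.
    step()
        .instrument(debug_span!("sync_loop_step", attempt = 1))
        .await;
}
```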


@@ -343,19 +343,23 @@ impl Repository for LayeredRepository {
     fn shutdown_timeline(
         tenant_id: ZTenantId,
-        timelineid: ZTimelineId,
+        timeline_id: ZTimelineId,
         timeline: &LayeredTimelineEntry,
     ) -> Result<(), anyhow::Error> {
-        if let Some(timeline) = timeline.local_or_schedule_download(tenant_id) {
-            timeline
-                .upload_relishes
-                .store(false, atomic::Ordering::Relaxed);
-            walreceiver::stop_wal_receiver(timelineid);
-            trace!("repo shutdown. checkpoint timeline {}", timelineid);
-            timeline.checkpoint(CheckpointConfig::Forced)?;
-            //TODO Wait for walredo process to shutdown too
-        } else {
-            warn!("Skpping shutdown of a remote timeline");
+        match timeline {
+            LayeredTimelineEntry::Local(timeline) => {
+                timeline
+                    .upload_relishes
+                    .store(false, atomic::Ordering::Relaxed);
+                walreceiver::stop_wal_receiver(timeline_id);
+                trace!("repo shutdown. checkpoint timeline {}", timeline_id);
+                timeline.checkpoint(CheckpointConfig::Forced)?;
+                //TODO Wait for walredo process to shutdown too
+            }
+            LayeredTimelineEntry::Remote(_) => warn!(
+                "Skipping shutdown of a remote timeline {} for tenant {}",
+                timeline_id, tenant_id
+            ),
         }
         Ok(())
     }
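
The old shutdown path went through `local_or_schedule_download`, which could queue a fresh download as a side effect while the pageserver was shutting down; matching on the entry directly makes the remote case explicit and exhaustive. A simplified model of the refactor (the types below are stand-ins, not the real definitions):

```rust
// Minimal model: an exhaustive match on the timeline entry makes the remote
// case explicit instead of hiding it behind a helper with side effects.
enum TimelineEntry {
    Local(String), // stands in for the Arc<LayeredTimeline> payload
    Remote,        // stands in for the remote-only variant
}

fn shutdown(entry: &TimelineEntry) {
    match entry {
        TimelineEntry::Local(name) => println!("checkpointing timeline {name}"),
        // A third variant added later becomes a compile error here,
        // not a silently taken `else` branch.
        TimelineEntry::Remote => eprintln!("skipping shutdown of a remote timeline"),
    }
}

fn main() {
    shutdown(&TimelineEntry::Local("main".into()));
    shutdown(&TimelineEntry::Remote);
}
```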


@@ -112,7 +112,7 @@ pub struct TimelineSyncId(ZTenantId, ZTimelineId);
 impl std::fmt::Display for TimelineSyncId {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "(tenant id: {}, timeline id: {})", self.0, self.1)
+        write!(f, "(tenant: {}, timeline: {})", self.0, self.1)
     }
 }
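
The shorter `Display` output is not only cosmetic: `loop_step` below records the id as a span field via `sync_id = %sync_id`, so this string is repeated on every log line emitted inside the task. A small sketch of how a `%`-recorded field renders (the id type here is a simplified assumption):

```rust
use std::fmt;

// Stand-in for TimelineSyncId(ZTenantId, ZTimelineId).
struct TimelineSyncId(u32, u32);

impl fmt::Display for TimelineSyncId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "(tenant: {}, timeline: {})", self.0, self.1)
    }
}

fn main() {
    let sync_id = TimelineSyncId(1, 2);
    // `%value` in tracing macros records the field with its Display impl.
    let span = tracing::debug_span!("task", sync_id = %sync_id);
    let _guard = span.enter();
    tracing::debug!("event"); // carries sync_id=(tenant: 1, timeline: 2)
}
```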


@@ -135,7 +135,7 @@ lazy_static! {
 /// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning.
 mod sync_queue {
     use std::{
-        collections::BTreeSet,
+        collections::{BTreeSet, HashMap},
         sync::atomic::{AtomicUsize, Ordering},
     };
@@ -199,17 +199,16 @@ mod sync_queue {
         receiver: &mut UnboundedReceiver<SyncTask>,
         mut max_batch_size: usize,
     ) -> BTreeSet<SyncTask> {
-        let mut tasks = BTreeSet::new();
         if max_batch_size == 0 {
-            return tasks;
+            return BTreeSet::new();
         }
 
+        let mut tasks = HashMap::with_capacity(max_batch_size);
         loop {
             match receiver.try_recv() {
                 Ok(new_task) => {
                     LENGTH.fetch_sub(1, Ordering::Relaxed);
-                    if tasks.insert(new_task) {
+                    if tasks.insert(new_task.sync_id, new_task).is_none() {
                         max_batch_size -= 1;
                         if max_batch_size == 0 {
                             break;
@@ -227,7 +226,7 @@ mod sync_queue {
             }
         }
 
-        tasks
+        tasks.into_values().collect()
     }
 
     /// Length of the queue, assuming that all receiver counterparts were only called using the queue api.
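
Keying the batch by `sync_id` changes two things at once: the batch-size budget now counts distinct timelines rather than distinct tasks, and a later task for a timeline replaces the earlier one instead of accumulating next to it. A self-contained model of that logic (simplified types, `Vec` in place of the real `BTreeSet` return):

```rust
// Model of the batching change: key the batch by sync id so at most one
// task per timeline survives a batch, and the newest task wins.
use std::collections::HashMap;

#[derive(Debug, Clone)]
struct SyncTask {
    sync_id: u32, // stands in for TimelineSyncId
    retries: u32,
}

fn batch(incoming: Vec<SyncTask>, mut budget: usize) -> Vec<SyncTask> {
    if budget == 0 {
        return Vec::new();
    }
    let mut tasks = HashMap::with_capacity(budget);
    for task in incoming {
        // `insert` returns None for a new key: only distinct timelines
        // consume the batch budget; a repeat replaces the stored task.
        if tasks.insert(task.sync_id, task).is_none() {
            budget -= 1;
            if budget == 0 {
                break;
            }
        }
    }
    tasks.into_values().collect()
}

fn main() {
    let tasks = batch(
        vec![
            SyncTask { sync_id: 7, retries: 0 },
            SyncTask { sync_id: 7, retries: 1 }, // replaces the first one
            SyncTask { sync_id: 9, retries: 0 },
        ],
        10,
    );
    for task in &tasks {
        println!("batched sync_id {} (retries {})", task.sync_id, task.retries);
    }
    assert_eq!(tasks.len(), 2);
}
```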
@@ -264,6 +263,15 @@ enum SyncKind {
     Upload(NewCheckpoint),
 }
 
+impl SyncKind {
+    fn sync_name(&self) -> &'static str {
+        match self {
+            Self::Download(_) => "download",
+            Self::Upload(_) => "upload",
+        }
+    }
+}
+
 /// Local timeline files for upload, appeared after the new checkpoint.
 /// Current checkpoint design assumes new files are added only, no deletions or amendment happens.
 #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
@@ -405,15 +413,19 @@ fn storage_sync_loop<
 ) -> anyhow::Result<()> {
     let remote_assets = Arc::new((storage, RwLock::new(index)));
     while !crate::tenant_mgr::shutdown_requested() {
-        let new_timeline_states = runtime.block_on(loop_step(
-            conf,
-            &mut receiver,
-            Arc::clone(&remote_assets),
-            max_concurrent_sync,
-            max_sync_errors,
-        ));
+        let new_timeline_states = runtime.block_on(
+            loop_step(
+                conf,
+                &mut receiver,
+                Arc::clone(&remote_assets),
+                max_concurrent_sync,
+                max_sync_errors,
+            )
+            .instrument(debug_span!("storage_sync_loop_step")),
+        );
 
         // Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
         set_timeline_states(conf, new_timeline_states);
+        debug!("Sync loop step completed");
     }
 
     debug!("Shutdown requested, stopping");
@@ -458,12 +470,13 @@ async fn loop_step<
         .into_iter()
         .map(|task| async {
             let sync_id = task.sync_id;
-            let extra_step = match tokio::spawn(process_task(
-                conf,
-                Arc::clone(&remote_assets),
-                task,
-                max_sync_errors,
-            ))
+            let attempt = task.retries;
+            let sync_name = task.kind.sync_name();
+            let extra_step = match tokio::spawn(
+                process_task(conf, Arc::clone(&remote_assets), task, max_sync_errors)
+                    .instrument(debug_span!("", sync_id = %sync_id, attempt, sync_name)),
+            )
             .await
             {
                 Ok(extra_step) => extra_step,
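
Both instrumentation hunks use `.instrument(...)` rather than entering a span manually, because a span guard held across `.await` or `tokio::spawn` does not follow the future onto other worker threads; an instrumented future re-enters its span on every poll. Note also the field shorthand: in `debug_span!("", sync_id = %sync_id, attempt, sync_name)`, a bare name records the local variable of the same name. A reduced sketch of the spawn pattern (names here are illustrative):

```rust
use tracing::{debug, debug_span};
use tracing_futures::Instrument;

async fn process_task() {
    debug!("processing"); // emitted inside the span attached below
}

#[tokio::main]
async fn main() {
    let attempt = 1;
    // An empty span name is allowed; the fields still identify the task.
    // `attempt` alone is shorthand for `attempt = attempt`.
    let handle = tokio::spawn(
        process_task().instrument(debug_span!("", task = "download", attempt)),
    );
    handle.await.unwrap();
}
```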
@@ -522,6 +535,7 @@ async fn process_task<
     }
 
     let sync_start = Instant::now();
+    let sync_name = task.kind.sync_name();
     match task.kind {
         SyncKind::Download(download_data) => {
             let sync_status = download_timeline(
@@ -532,7 +546,7 @@ async fn process_task<
                 task.retries + 1,
             )
             .await;
-            register_sync_status(sync_start, "download", sync_status);
+            register_sync_status(sync_start, sync_name, sync_status);
 
             if sync_status? {
                 Some(TimelineSyncState::Ready)
@@ -549,7 +563,7 @@ async fn process_task<
                 task.retries + 1,
             )
             .await;
-            register_sync_status(sync_start, "upload", sync_status);
+            register_sync_status(sync_start, sync_name, sync_status);
             None
         }
     }
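
`sync_name` has to be read before `match task.kind`, since the match moves the payload out of the task; computing the label up front also guarantees both metric branches use the same string as the one recorded in the span. A compact model of that borrow-then-move ordering (simplified types):

```rust
// Model: derive the label while `task.kind` is still borrowable, then let
// the match consume the payload.
enum SyncKind {
    Download(Vec<u8>),
    Upload(Vec<u8>),
}

impl SyncKind {
    fn sync_name(&self) -> &'static str {
        match self {
            Self::Download(_) => "download",
            Self::Upload(_) => "upload",
        }
    }
}

struct SyncTask {
    kind: SyncKind,
}

fn process(task: SyncTask) {
    let sync_name = task.kind.sync_name(); // immutable borrow ends here
    match task.kind {
        // the payload is moved out of `task` below
        SyncKind::Download(data) => println!("{sync_name}: {} bytes", data.len()),
        SyncKind::Upload(data) => println!("{sync_name}: {} bytes", data.len()),
    }
}

fn main() {
    process(SyncTask { kind: SyncKind::Download(vec![1, 2, 3]) });
}
```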


@@ -6,7 +6,7 @@ use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc};
 use anyhow::{anyhow, ensure, Context};
 use futures::{stream::FuturesUnordered, StreamExt};
 use tokio::{fs, sync::RwLock};
-use tracing::{debug, error, warn};
+use tracing::{debug, error, trace, warn};
 use zenith_utils::zid::ZTenantId;
 
 use crate::{
@@ -91,7 +91,8 @@ pub(super) async fn download_timeline<
         }
     };
 
-    let mut archives_to_download = remote_timeline
+    debug!("Downloading timeline archives");
+    let archives_to_download = remote_timeline
         .checkpoints()
         .map(ArchiveId)
         .filter(|remote_archive| !download.archives_to_skip.contains(remote_archive))
@@ -99,8 +100,9 @@ pub(super) async fn download_timeline<
     let archives_total = archives_to_download.len();
     debug!("Downloading {} archives of a timeline", archives_total);
+    trace!("Archives to download: {:?}", archives_to_download);
 
-    while let Some(archive_id) = archives_to_download.pop() {
+    for (archives_downloaded, archive_id) in archives_to_download.into_iter().enumerate() {
         match try_download_archive(
             conf,
             sync_id,
@@ -112,10 +114,10 @@ pub(super) async fn download_timeline<
         .await
         {
             Err(e) => {
-                let archives_left = archives_to_download.len();
+                let archives_left = archives_total - archives_downloaded;
                 error!(
-                    "Failed to download archive {:?} for tenant {} timeline {} : {:#}, requeueing the download ({} archives left out of {})",
-                    archive_id, tenant_id, timeline_id, e, archives_left, archives_total
+                    "Failed to download archive {:?} (archives downloaded: {}; archives left: {}) for tenant {} timeline {}, requeueing the download: {:#}",
+                    archive_id, archives_downloaded, archives_left, tenant_id, timeline_id, e
                 );
                 sync_queue::push(SyncTask::new(
                     sync_id,
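
This hunk is the one the commit title is about: `Vec::pop` drains the collected archive list from the back, so checkpoints were presumably fetched newest-first; iterating front to back downloads them in their natural order, and `enumerate` doubles as the progress counter used in the error message. A toy illustration of the two drain orders (assumes the collection is ordered oldest-to-newest, as the commit title suggests):

```rust
// Model of the ordering fix: LIFO pop versus in-order iteration.
fn main() {
    let archives = vec!["lsn_10", "lsn_20", "lsn_30"];

    // Old pattern: `pop` drains from the back, reversing the order.
    let mut stack = archives.clone();
    let mut popped = Vec::new();
    while let Some(a) = stack.pop() {
        popped.push(a);
    }
    assert_eq!(popped, vec!["lsn_30", "lsn_20", "lsn_10"]);

    // New pattern: in-order iteration with a running count.
    for (downloaded, a) in archives.into_iter().enumerate() {
        println!("downloading {a} ({downloaded} already done)");
    }
}
```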
@@ -302,13 +304,9 @@ mod tests {
     #[tokio::test]
     async fn test_download_timeline() -> anyhow::Result<()> {
-        let tempdir = tempdir()?;
-        let tempdir_path = tempdir.path();
-        let _ = zenith_utils::logging::init(tempdir_path.join("log.log"), false);
         let repo_harness = RepoHarness::create("test_download_timeline")?;
         let sync_id = TimelineSyncId(repo_harness.tenant_id, TIMELINE_ID);
-        let storage = LocalFs::new(tempdir_path.to_owned(), &repo_harness.conf.workdir)?;
+        let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
         let index = RwLock::new(RemoteTimelineIndex::try_parse_descriptions_from_paths(
             repo_harness.conf,
             storage
@@ -335,6 +333,21 @@ mod tests {
             regular_timeline,
         )
         .await;
+
+        // upload multiple checkpoints for the same timeline
+        let regular_timeline = create_local_timeline(
+            &repo_harness,
+            TIMELINE_ID,
+            &["c", "d"],
+            dummy_metadata(Lsn(0x40)),
+        )?;
+        ensure_correct_timeline_upload(
+            &repo_harness,
+            Arc::clone(&remote_assets),
+            TIMELINE_ID,
+            regular_timeline,
+        )
+        .await;
+
         fs::remove_dir_all(&regular_timeline_path).await?;
         let remote_regular_timeline = expect_timeline(index, sync_id).await;