Do not require metadata in every upload sync task

Author: Kirill Bulatov
Date: 2022-05-04 22:33:53 +03:00
Committed by: Kirill Bulatov
Parent: 52a7e3155e
Commit: 2ef0e5c6ed
4 changed files with 91 additions and 65 deletions
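
In short, a layer upload task no longer has to carry timeline metadata. A condensed, hedged sketch of the interface change (the signature matches the patch below; the caller comments paraphrase the touched call sites):

pub fn schedule_layer_upload(
    tenant_id: ZTenantId,
    timeline_id: ZTimelineId,
    layers_to_upload: HashSet<PathBuf>,
    metadata: Option<TimelineMetadata>, // was: TimelineMetadata
) { /* push the upload task onto the sync queue */ }

// The call site that has just written a new delta layer still attaches its metadata:
//     schedule_layer_upload(tenant_id, timeline_id, layer_paths, Some(metadata));
// The other call sites in this patch stop reloading local metadata and pass None:
//     schedule_layer_upload(tenant_id, timeline_id, layer_paths, None);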

View File

@@ -1789,7 +1789,7 @@ impl LayeredTimeline {
self.tenantid,
self.timelineid,
HashSet::from([new_delta_path]),
metadata,
Some(metadata),
);
}
@@ -1857,13 +1857,11 @@ impl LayeredTimeline {
}
}
if self.upload_layers.load(atomic::Ordering::Relaxed) {
let metadata = load_metadata(self.conf, self.timelineid, self.tenantid)
.context("failed to load local metadata")?;
remote_storage::schedule_layer_upload(
self.tenantid,
self.timelineid,
layer_paths_to_upload,
metadata,
None,
);
}
timer.stop_and_record();
@@ -2045,17 +2043,6 @@ impl LayeredTimeline {
layers.insert_historic(Arc::new(l));
}
if self.upload_layers.load(atomic::Ordering::Relaxed) {
let metadata = load_metadata(self.conf, self.timelineid, self.tenantid)
.context("failed to load local metadata")?;
remote_storage::schedule_layer_upload(
self.tenantid,
self.timelineid,
new_layer_paths,
metadata,
);
}
// Now that we have reshuffled the data to set of new delta layers, we can
// delete the old ones
let mut layer_paths_do_delete = HashSet::with_capacity(level0_deltas.len());
@@ -2069,6 +2056,12 @@ impl LayeredTimeline {
drop(layers);
if self.upload_layers.load(atomic::Ordering::Relaxed) {
remote_storage::schedule_layer_upload(
self.tenantid,
self.timelineid,
new_layer_paths,
None,
);
remote_storage::schedule_layer_delete(
self.tenantid,
self.timelineid,

View File

@@ -105,7 +105,7 @@ pub trait Layer: Send + Sync {
/// log messages, even though they're never on disk.)
fn filename(&self) -> PathBuf;
/// If a layer has a corresponding file on a local filesystem, return its path.
/// If a layer has a corresponding file on a local filesystem, return its absolute path.
fn local_path(&self) -> Option<PathBuf>;
///

View File

@@ -72,7 +72,7 @@ use std::{
sync::Arc,
};
use anyhow::Context;
use anyhow::{bail, Context};
use futures::stream::{FuturesUnordered, StreamExt};
use lazy_static::lazy_static;
use tokio::{
@@ -341,8 +341,16 @@ impl SyncTask {
.extend(new_upload_data.data.uploaded_layers.into_iter());
upload_data.retries = 0;
if new_upload_data.data.metadata.disk_consistent_lsn()
> upload_data.data.metadata.disk_consistent_lsn()
if new_upload_data
.data
.metadata
.as_ref()
.map(|meta| meta.disk_consistent_lsn())
> upload_data
.data
.metadata
.as_ref()
.map(|meta| meta.disk_consistent_lsn())
{
upload_data.data.metadata = new_upload_data.data.metadata;
}
@@ -371,8 +379,16 @@ impl SyncTask {
.extend(new_upload_data.data.uploaded_layers.into_iter());
upload_data.retries = 0;
if new_upload_data.data.metadata.disk_consistent_lsn()
> upload_data.data.metadata.disk_consistent_lsn()
if new_upload_data
.data
.metadata
.as_ref()
.map(|meta| meta.disk_consistent_lsn())
> upload_data
.data
.metadata
.as_ref()
.map(|meta| meta.disk_consistent_lsn())
{
upload_data.data.metadata = new_upload_data.data.metadata;
}
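
Both merge branches above rely on Rust's derived ordering for Option, where None sorts below any Some value, so a queued task that carries metadata is never overwritten by one that does not. A standalone sketch (plain u64 values stand in for Lsn, an assumption made only for illustration):

fn main() {
    let without_metadata: Option<u64> = None;
    let with_metadata: Option<u64> = Some(0x40);
    // A metadata-less task never wins the comparison...
    assert!(!(without_metadata > with_metadata));
    // ...while genuinely newer metadata still does.
    assert!(Some(0x50_u64) > Some(0x40_u64));
}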
@@ -410,7 +426,7 @@ pub struct TimelineUpload {
/// Already uploaded layers. Used to store the data about the uploads between task retries
/// and to record the data into the remote index after the task got completed or evicted.
uploaded_layers: HashSet<PathBuf>,
metadata: TimelineMetadata,
metadata: Option<TimelineMetadata>,
}
/// A timeline download task.
@@ -431,7 +447,7 @@ pub fn schedule_layer_upload(
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
layers_to_upload: HashSet<PathBuf>,
metadata: TimelineMetadata,
metadata: Option<TimelineMetadata>,
) {
if !sync_queue::push(
ZTenantTimelineId {
@@ -932,23 +948,24 @@ async fn upload_timeline<P, S>(
}
UploadedTimeline::Successful(upload_data) => upload_data,
UploadedTimeline::SuccessfulAfterLocalFsUpdate(mut outdated_upload_data) => {
let local_metadata_path =
metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id);
let local_metadata = match read_metadata_file(&local_metadata_path).await {
Ok(metadata) => metadata,
Err(e) => {
error!(
"Failed to load local metadata from path '{}': {e:?}",
local_metadata_path.display()
);
outdated_upload_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Upload(outdated_upload_data));
register_sync_status(sync_start, task_name, Some(false));
return;
}
};
outdated_upload_data.data.metadata = local_metadata;
if outdated_upload_data.data.metadata.is_some() {
let local_metadata_path =
metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id);
let local_metadata = match read_metadata_file(&local_metadata_path).await {
Ok(metadata) => metadata,
Err(e) => {
error!(
"Failed to load local metadata from path '{}': {e:?}",
local_metadata_path.display()
);
outdated_upload_data.retries += 1;
sync_queue::push(sync_id, SyncTask::Upload(outdated_upload_data));
register_sync_status(sync_start, task_name, Some(false));
return;
}
};
outdated_upload_data.data.metadata = Some(local_metadata);
}
outdated_upload_data
}
};
@@ -982,11 +999,14 @@ where
match index_accessor.timeline_entry_mut(&sync_id) {
Some(existing_entry) => {
if existing_entry.metadata.disk_consistent_lsn()
< uploaded_data.metadata.disk_consistent_lsn()
{
existing_entry.metadata = uploaded_data.metadata.clone();
if let Some(new_metadata) = uploaded_data.metadata.as_ref() {
if existing_entry.metadata.disk_consistent_lsn()
< new_metadata.disk_consistent_lsn()
{
existing_entry.metadata = new_metadata.clone();
}
}
if upload_failed {
existing_entry
.add_upload_failures(uploaded_data.layers_to_upload.iter().cloned());
@@ -997,7 +1017,11 @@ where
existing_entry.clone()
}
None => {
let mut new_remote_timeline = RemoteTimeline::new(uploaded_data.metadata.clone());
let new_metadata = match uploaded_data.metadata.as_ref() {
Some(new_metadata) => new_metadata,
None => bail!("For timeline {sync_id} upload, there's no upload metadata and no remote index entry, cannot create a new one"),
};
let mut new_remote_timeline = RemoteTimeline::new(new_metadata.clone());
if upload_failed {
new_remote_timeline
.add_upload_failures(uploaded_data.layers_to_upload.iter().cloned());
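
Taken together, the hunks above implement a simple rule: an existing remote index entry only advances its metadata when the upload actually carried metadata with a newer disk_consistent_lsn, and a brand-new entry cannot be created without metadata at all. A self-contained sketch of that rule (u64 stands in for Lsn and the entry type is simplified, both assumptions for illustration):

use anyhow::bail;

struct RemoteEntry {
    disk_consistent_lsn: u64,
}

fn apply_uploaded_metadata(
    existing: Option<&mut RemoteEntry>,
    uploaded_lsn: Option<u64>,
) -> anyhow::Result<()> {
    match (existing, uploaded_lsn) {
        // Newer uploaded metadata advances the stored disk_consistent_lsn.
        (Some(entry), Some(new_lsn)) if entry.disk_consistent_lsn < new_lsn => {
            entry.disk_consistent_lsn = new_lsn;
        }
        // An upload without (or with older) metadata leaves an existing entry untouched.
        (Some(_), _) => {}
        // A new remote entry can only be created from uploaded metadata...
        (None, Some(new_lsn)) => {
            let _new_entry = RemoteEntry { disk_consistent_lsn: new_lsn };
        }
        // ...so no metadata and no existing entry is an error, as in the patch.
        (None, None) => bail!("no upload metadata and no remote index entry"),
    }
    Ok(())
}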
@@ -1140,7 +1164,7 @@ fn schedule_first_sync_tasks(
SyncTask::upload(TimelineUpload {
layers_to_upload: local_files,
uploaded_layers: HashSet::new(),
metadata: local_metadata,
metadata: Some(local_metadata),
}),
));
local_timeline_init_statuses
@@ -1202,7 +1226,7 @@ fn compare_local_and_remote_timeline(
SyncTask::upload(TimelineUpload {
layers_to_upload,
uploaded_layers: HashSet::new(),
metadata: local_metadata,
metadata: Some(local_metadata),
}),
));
// Note that status here doesn't change.
@@ -1269,7 +1293,7 @@ mod test_utils {
Ok(TimelineUpload {
layers_to_upload,
uploaded_layers: HashSet::new(),
metadata,
metadata: Some(metadata),
})
}
@@ -1340,7 +1364,7 @@ mod tests {
TimelineUpload {
layers_to_upload: HashSet::from([PathBuf::from("one")]),
uploaded_layers: HashSet::from([PathBuf::from("u_one")]),
metadata: metadata_1,
metadata: Some(metadata_1),
},
));
let upload_2 = SyncTask::Upload(SyncData::new(
@@ -1348,7 +1372,7 @@ mod tests {
TimelineUpload {
layers_to_upload: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]),
uploaded_layers: HashSet::from([PathBuf::from("u_two")]),
metadata: metadata_2.clone(),
metadata: Some(metadata_2.clone()),
},
));
@@ -1380,7 +1404,8 @@ mod tests {
);
assert_eq!(
upload.metadata, metadata_2,
upload.metadata,
Some(metadata_2),
"Merged upload tasks should have a metadata with biggest disk_consistent_lsn"
);
}
@@ -1399,7 +1424,7 @@ mod tests {
TimelineUpload {
layers_to_upload: HashSet::from([PathBuf::from("u_one")]),
uploaded_layers: HashSet::from([PathBuf::from("u_one_2")]),
metadata: dummy_metadata(Lsn(1)),
metadata: Some(dummy_metadata(Lsn(1))),
},
);
@@ -1442,7 +1467,7 @@ mod tests {
TimelineUpload {
layers_to_upload: HashSet::from([PathBuf::from("one")]),
uploaded_layers: HashSet::from([PathBuf::from("u_one")]),
metadata: metadata_1.clone(),
metadata: Some(metadata_1.clone()),
},
),
);
@@ -1452,7 +1477,7 @@ mod tests {
TimelineUpload {
layers_to_upload: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]),
uploaded_layers: HashSet::from([PathBuf::from("u_two")]),
metadata: metadata_2,
metadata: Some(metadata_2),
},
));
@@ -1490,7 +1515,8 @@ mod tests {
);
assert_eq!(
upload.metadata, metadata_1,
upload.metadata,
Some(metadata_1),
"Merged upload tasks should have a metadata with biggest disk_consistent_lsn"
);
}
@@ -1502,7 +1528,7 @@ mod tests {
TimelineUpload {
layers_to_upload: HashSet::from([PathBuf::from("one")]),
uploaded_layers: HashSet::from([PathBuf::from("u_one")]),
metadata: dummy_metadata(Lsn(22)),
metadata: Some(dummy_metadata(Lsn(22))),
},
);
@@ -1572,7 +1598,7 @@ mod tests {
TimelineUpload {
layers_to_upload: HashSet::from([PathBuf::from("one")]),
uploaded_layers: HashSet::from([PathBuf::from("u_one")]),
metadata: metadata_1,
metadata: Some(metadata_1),
},
),
);
@@ -1588,7 +1614,7 @@ mod tests {
TimelineUpload {
layers_to_upload: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]),
uploaded_layers: HashSet::from([PathBuf::from("u_two")]),
metadata: metadata_2.clone(),
metadata: Some(metadata_2.clone()),
},
),
);
@@ -1640,7 +1666,8 @@ mod tests {
);
assert_eq!(
upload.metadata, metadata_2,
upload.metadata,
Some(metadata_2),
"Merged upload tasks should have a metadata with biggest disk_consistent_lsn"
);
}

View File

@@ -86,7 +86,10 @@ where
S: RemoteStorage<StoragePath = P> + Send + Sync + 'static,
{
let upload = &mut upload_data.data;
let new_upload_lsn = upload.metadata.disk_consistent_lsn();
let new_upload_lsn = upload
.metadata
.as_ref()
.map(|meta| meta.disk_consistent_lsn());
let already_uploaded_layers = remote_timeline
.map(|timeline| timeline.stored_files())
@@ -101,7 +104,7 @@ where
debug!("Layers to upload: {layers_to_upload:?}");
info!(
"Uploading {} timeline layers, new lsn: {new_upload_lsn}",
"Uploading {} timeline layers, new lsn: {new_upload_lsn:?}",
layers_to_upload.len(),
);
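
The switch to {new_upload_lsn:?} above is needed because the LSN is now wrapped in an Option, and Option<T> implements Debug but not Display. A minimal illustration (a plain u64 stands in for Lsn):

fn main() {
    let new_upload_lsn: Option<u64> = Some(0x30);
    // println!("new lsn: {new_upload_lsn}");  // would not compile: Option<u64> is not Display
    println!("new lsn: {new_upload_lsn:?}");   // prints: new lsn: Some(48)
}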
@@ -234,8 +237,10 @@ mod tests {
let current_retries = 3;
let metadata = dummy_metadata(Lsn(0x30));
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
let timeline_upload =
let mut timeline_upload =
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
timeline_upload.metadata = None;
assert!(
storage.list().await?.is_empty(),
"Storage should be empty before any uploads are made"
@@ -278,8 +283,8 @@ mod tests {
"Successful upload should have all layers uploaded"
);
assert_eq!(
upload.metadata, metadata,
"Successful upload should not chage its metadata"
upload.metadata, None,
"Successful upload without metadata should not have it returned either"
);
let storage_files = storage.list().await?;
@@ -367,7 +372,8 @@ mod tests {
"Successful upload should have all layers uploaded"
);
assert_eq!(
upload.metadata, metadata,
upload.metadata,
Some(metadata),
"Successful upload should not chage its metadata"
);