refactor: simplify schedule upload and tests

This commit is contained in:
Joonas Koivunen
2023-08-28 16:33:01 +03:00
parent d5ac61d566
commit df328758f0
2 changed files with 36 additions and 70 deletions

View File

@@ -600,18 +600,19 @@ impl RemoteTimelineClient {
pub(crate) fn schedule_layer_file_upload(
self: &Arc<Self>,
layer: ResidentLayer,
layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let metadata = LayerFileMetadata::new(layer.layer_desc().file_size);
upload_queue
.latest_files
.insert(layer.layer_desc().filename(), layer_metadata.clone());
.insert(layer.layer_desc().filename(), metadata.clone());
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
info!("scheduled layer file upload {layer}");
let op = UploadOp::UploadLayer(layer, layer_metadata.clone());
let op = UploadOp::UploadLayer(layer, metadata);
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
@@ -1523,53 +1524,29 @@ mod tests {
.unwrap();
// Create a couple of dummy files, schedule upload for them
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap();
let layer_file_name_3: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap();
let content_1 = dummy_contents("foo");
let content_2 = dummy_contents("bar");
let content_3 = dummy_contents("baz");
for (filename, content) in [
(&layer_file_name_1, &content_1),
(&layer_file_name_2, &content_2),
(&layer_file_name_3, &content_3),
] {
std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
}
let layers = [
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), dummy_contents("foo")),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(), dummy_contents("bar")),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59DA-00000000016B5A53".parse().unwrap(), dummy_contents("baz"))
]
.into_iter()
.map(|(name, contents): (LayerFileName, Vec<u8>)| {
std::fs::write(timeline_path.join(name.file_name()), &contents).unwrap();
let layer_file_1 = Layer::for_resident(
harness.conf,
&timeline,
layer_file_name_1.clone(),
LayerFileMetadata::new(content_1.len() as u64),
);
let layer_file_2 = Layer::for_resident(
harness.conf,
&timeline,
layer_file_name_2.clone(),
LayerFileMetadata::new(content_2.len() as u64),
);
let layer_file_3 = Layer::for_resident(
harness.conf,
&timeline,
layer_file_name_3.clone(),
LayerFileMetadata::new(content_3.len() as u64),
);
Layer::for_resident(
harness.conf,
&timeline,
name,
LayerFileMetadata::new(contents.len() as u64),
)
}).collect::<Vec<_>>();
client
.schedule_layer_file_upload(
layer_file_1.clone(),
&LayerFileMetadata::new(content_1.len() as u64),
)
.schedule_layer_file_upload(layers[0].clone())
.unwrap();
client
.schedule_layer_file_upload(
layer_file_2.clone(),
&LayerFileMetadata::new(content_2.len() as u64),
)
.schedule_layer_file_upload(layers[1].clone())
.unwrap();
// Check that they are started immediately, not queued
@@ -1622,21 +1599,18 @@ mod tests {
.map(|f| f.to_owned())
.collect(),
&[
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),
&layers[0].layer_desc().filename().file_name(),
&layers[1].layer_desc().filename().file_name(),
],
);
assert_eq!(index_part.metadata, metadata);
// Schedule upload and then a deletion. Check that the deletion is queued
client
.schedule_layer_file_upload(
layer_file_3.clone(),
&LayerFileMetadata::new(content_3.len() as u64),
)
.schedule_layer_file_upload(layers[2].clone())
.unwrap();
client
.schedule_layer_file_deletion(&[layer_file_name_1.clone()])
.schedule_layer_file_deletion(&[layers[0].layer_desc().filename()])
.unwrap();
{
let mut guard = client.upload_queue.lock().unwrap();
@@ -1651,8 +1625,8 @@ mod tests {
}
assert_remote_files(
&[
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),
&layers[0].layer_desc().filename().file_name(),
&layers[1].layer_desc().filename().file_name(),
"index_part.json",
],
&remote_timeline_dir,
@@ -1663,8 +1637,8 @@ mod tests {
assert_remote_files(
&[
&layer_file_name_2.file_name(),
&layer_file_name_3.file_name(),
&layers[1].layer_desc().filename().file_name(),
&layers[2].layer_desc().filename().file_name(),
"index_part.json",
],
&remote_timeline_dir,
@@ -1730,10 +1704,7 @@ mod tests {
let init = get_bytes_started_stopped();
client
.schedule_layer_file_upload(
layer_file_1.clone(),
&LayerFileMetadata::new(content_1.len() as u64),
)
.schedule_layer_file_upload(layer_file_1.clone())
.unwrap();
let pre = get_bytes_started_stopped();

View File

@@ -33,7 +33,6 @@ use std::time::{Duration, Instant, SystemTime};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
};
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
@@ -729,9 +728,7 @@ impl Timeline {
.map_err(anyhow::Error::from)?;
if let Some(remote_client) = &self.remote_client {
for layer in layers {
let m = LayerFileMetadata::new(layer.layer_desc().file_size);
remote_client.schedule_layer_file_upload(layer, &m)?;
remote_client.schedule_layer_file_upload(layer)?;
}
}
@@ -1503,7 +1500,7 @@ impl Timeline {
total_physical_size += m.file_size();
let resident = Layer::for_resident(conf, &this, name, m.clone());
let layer = resident.as_ref().clone();
needs_upload.push((resident, m));
needs_upload.push(resident);
layer
}
UseLocal(m) => {
@@ -1534,8 +1531,8 @@ impl Timeline {
if let Some(rtc) = self.remote_client.as_ref() {
let (needs_upload, needs_cleanup) = to_sync;
for (layer, m) in needs_upload {
rtc.schedule_layer_file_upload(layer, &m)?;
for layer in needs_upload {
rtc.schedule_layer_file_upload(layer)?;
}
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
rtc.schedule_index_upload_for_file_changes()?;
@@ -2460,8 +2457,7 @@ impl Timeline {
if let Some(remote_client) = &self.remote_client {
for layer in layer_paths_to_upload {
let m = LayerFileMetadata::new(layer.layer_desc().file_size);
remote_client.schedule_layer_file_upload(layer, &m)?;
remote_client.schedule_layer_file_upload(layer)?;
}
remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
}
@@ -3382,9 +3378,8 @@ impl Timeline {
for l in new_layers {
if let Some(remote_client) = &self.remote_client {
let m = LayerFileMetadata::new(l.layer_desc().file_size);
// upload even if duplicated, because we may have changed the contents
remote_client.schedule_layer_file_upload(l.clone(), &m)?;
remote_client.schedule_layer_file_upload(l.clone())?;
}
if guard.contains(l.as_ref()) {
duplicated_layers.insert(l.layer_desc().key());