refactor(rtc): schedule compaction update (#5649)

Schedule the compaction update as a single operation instead of scheduling N layer uploads and one deletion separately, with the write(layer_map) lock being released in between. A dedicated compaction update is also a much better place to evolve this operation in the future than the more general file-based scheduling methods. (A rough sketch of the call-site change follows the commit metadata below.)

Builds upon #5645. Solves the problem of hard-to-verify correctness with respect to other operations that modify `index_part.json`.

Co-authored-by: Shany Pozin <shany@neon.tech>
Author: Joonas Koivunen
Date: 2023-10-26 00:25:43 +03:00 (committed via GitHub)
Parent: 325258413a
Commit: f70019797c
2 changed files with 60 additions and 33 deletions
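
To make the difference concrete, here is a minimal, self-contained sketch of the call-site change. Everything in it (`RemoteTimelineClientSketch`, the simplified `LayerFileName`/`LayerFileMetadata`, the println! bodies) is an illustrative stub rather than the pageserver's real API; only the shape of the calls mirrors the diff below.

```rust
// Illustrative stubs only -- not the pageserver's real types or locking.
#[derive(Clone, Debug)]
struct LayerFileName(String);

#[derive(Clone, Debug)]
struct LayerFileMetadata {
    file_size: u64,
}

struct RemoteTimelineClientSketch;

impl RemoteTimelineClientSketch {
    fn schedule_layer_file_upload(&self, name: &LayerFileName, _meta: &LayerFileMetadata) {
        println!("queued upload of {}", name.0);
    }

    fn schedule_layer_file_deletion(&self, names: Vec<LayerFileName>) {
        println!("queued unlink + deletion of {} layers", names.len());
    }

    /// The new entry point: the uploads, the index_part.json update and the deletion
    /// are queued back to back, conceptually under a single lock acquisition.
    fn schedule_compaction_update(
        &self,
        compacted_from: &[LayerFileName],
        compacted_to: &[(LayerFileName, LayerFileMetadata)],
    ) {
        for (name, meta) in compacted_to {
            self.schedule_layer_file_upload(name, meta);
        }
        self.schedule_layer_file_deletion(compacted_from.to_vec());
    }
}

fn main() {
    let client = RemoteTimelineClientSketch;
    let l0s = vec![LayerFileName("l0-a".into()), LayerFileName("l0-b".into())];
    let l1s = vec![(
        LayerFileName("l1-ab".into()),
        LayerFileMetadata { file_size: 1024 },
    )];

    // Before: N separate upload calls followed by one deletion call, each taking and
    // releasing the relevant locks on its own.
    for (name, meta) in &l1s {
        client.schedule_layer_file_upload(name, meta);
    }
    client.schedule_layer_file_deletion(l0s.clone());

    // After: one compaction update covering the whole batch.
    client.schedule_compaction_update(&l0s, &l1s);
}
```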

View File

@@ -627,7 +627,7 @@ impl RemoteTimelineClient {
///
/// Launch an upload operation in the background.
///
pub fn schedule_layer_file_upload(
pub(crate) fn schedule_layer_file_upload(
self: &Arc<Self>,
layer_file_name: &LayerFileName,
layer_metadata: &LayerFileMetadata,
@@ -635,6 +635,17 @@ impl RemoteTimelineClient {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_layer_file_upload0(upload_queue, layer_file_name, layer_metadata);
self.launch_queued_tasks(upload_queue);
Ok(())
}
fn schedule_layer_file_upload0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
layer_file_name: &LayerFileName,
layer_metadata: &LayerFileMetadata,
) {
upload_queue
.latest_files
.insert(layer_file_name.clone(), layer_metadata.clone());
@@ -643,12 +654,7 @@ impl RemoteTimelineClient {
let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
info!("scheduled layer file upload {layer_file_name}");
// Launch the task immediately, if possible
self.launch_queued_tasks(upload_queue);
Ok(())
}
/// Launch a delete operation in the background.
@@ -666,16 +672,13 @@ impl RemoteTimelineClient {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
let no_bail_here = || {
let with_generations =
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
let with_generations =
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, &names);
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
// Launch the tasks immediately, if possible
self.launch_queued_tasks(upload_queue);
};
no_bail_here();
// Launch the tasks immediately, if possible
self.launch_queued_tasks(upload_queue);
Ok(())
}
@@ -694,7 +697,7 @@ impl RemoteTimelineClient {
// just forget the return value; after uploading the next index_part.json, we can consider
// the layer files as "dangling". this is fine however.
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, &names);
self.launch_queued_tasks(upload_queue);
@@ -706,7 +709,7 @@ impl RemoteTimelineClient {
fn schedule_unlinking_of_layers_from_index_part0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
names: Vec<LayerFileName>,
names: &[LayerFileName],
) -> Vec<(LayerFileName, Generation)> {
// Deleting layers doesn't affect the values stored in TimelineMetadata,
// so we don't need to update it. Just serialize it.
@@ -715,14 +718,14 @@ impl RemoteTimelineClient {
// Decorate our list of names with each name's generation, dropping
// names that are unexpectedly missing from our metadata.
let with_generations: Vec<_> = names
.into_iter()
.iter()
.filter_map(|name| {
// Remove from latest_files, learning the file's remote generation in the process
let meta = upload_queue.latest_files.remove(&name);
let meta = upload_queue.latest_files.remove(name);
if let Some(meta) = meta {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
Some((name, meta.generation))
Some((name.to_owned(), meta.generation))
} else {
// This can only happen if we forgot to schedule the file upload
// before scheduling the delete. Log it because it is a rare/strange
@@ -734,6 +737,9 @@ impl RemoteTimelineClient {
})
.collect();
// after unlinking files from the upload_queue.latest_files we must always schedule an
// index_part update, because that needs to be uploaded before we can actually delete the
// files.
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
self.schedule_index_upload(upload_queue, metadata);
}
@@ -742,7 +748,7 @@ impl RemoteTimelineClient {
}
/// Schedules deletion for layer files which have previously been unlinked from the
/// `index_part.json`.
/// `index_part.json` with [`Self::schedule_unlinking_of_layers_from_index_part`].
#[allow(unused)] // will be used by Layer::drop in PR#4938
pub(crate) fn schedule_deletion_of_unlinked(
self: &Arc<Self>,
@@ -773,6 +779,29 @@ impl RemoteTimelineClient {
upload_queue.queued_operations.push_back(op);
}
/// Schedules a compaction update to the remote `index_part.json`.
///
/// `compacted_from` are the L0 layer names which have been compacted into the `compacted_to` L1 layers.
pub(crate) fn schedule_compaction_update(
self: &Arc<Self>,
compacted_from: &[LayerFileName],
compacted_to: &[(LayerFileName, LayerFileMetadata)],
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
for (name, m) in compacted_to {
self.schedule_layer_file_upload0(upload_queue, name, m);
}
let with_generations =
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, compacted_from);
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
self.launch_queued_tasks(upload_queue);
Ok(())
}
///
/// Wait for all previously scheduled uploads/deletions to complete
///
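
The correctness benefit comes from the ordering of the queued operations: the new L1 uploads are queued first, then exactly one `index_part.json` upload is forced by the unlinking step, and only then is the deletion of the now-unreferenced L0s queued, all scheduled without dropping the upload queue lock in between. A toy model of that ordering, assuming a made-up `Op` enum in place of the real `UploadOp` and plain strings in place of layer names:

```rust
use std::collections::VecDeque;

// Toy stand-in for the real UploadOp; illustrative only.
#[derive(Debug)]
enum Op {
    UploadLayer(String),
    UploadIndexPart,
    DeleteLayers(Vec<String>),
}

// Mirrors the order in which the compaction update queues work.
fn schedule_compaction_update_toy(
    queue: &mut VecDeque<Op>,
    compacted_from: &[String],
    compacted_to: &[String],
) {
    // 1. Upload the new L1 layers.
    for l1 in compacted_to {
        queue.push_back(Op::UploadLayer(l1.clone()));
    }
    // 2. Unlinking the L0s from the in-memory index always forces an index upload,
    //    because the remote index must stop referencing the files before they may be
    //    deleted.
    queue.push_back(Op::UploadIndexPart);
    // 3. Only now is it safe to queue the deletion of the old L0s.
    queue.push_back(Op::DeleteLayers(compacted_from.to_vec()));
}

fn main() {
    let mut queue = VecDeque::new();
    schedule_compaction_update_toy(
        &mut queue,
        &["l0-a".to_string(), "l0-b".to_string()],
        &["l1-ab".to_string()],
    );
    for op in &queue {
        println!("{op:?}");
    }
}
```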

View File

@@ -3870,22 +3870,21 @@ impl Timeline {
// now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map.
let mut duplicated_layers = HashSet::new();
let mut uploaded_layers = Vec::with_capacity(new_layers.len());
let mut insert_layers = Vec::new();
let mut remove_layers = Vec::new();
for l in new_layers {
for l in &new_layers {
let new_delta_path = l.path();
let metadata = new_delta_path.metadata().with_context(|| {
format!("read file metadata for new created layer {new_delta_path}")
})?;
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_upload(
&l.filename(),
&LayerFileMetadata::new(metadata.len(), self.generation),
)?;
}
uploaded_layers.push((
l.filename(),
LayerFileMetadata::new(metadata.len(), self.generation),
));
// update metrics, including the timeline's physical size
self.metrics.record_new_file_metrics(metadata.len());
@@ -3898,7 +3897,7 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
let l = l as Arc<dyn PersistentLayer>;
let l = l.to_owned() as Arc<dyn PersistentLayer>;
if guard.contains(&l) {
tracing::error!(layer=%l, "duplicated L1 layer");
duplicated_layers.insert(l.layer_desc().key());
@@ -3930,13 +3929,12 @@ impl Timeline {
&self.metrics,
)?;
drop_wlock(guard);
// Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(layer_names_to_delete)?;
if let Some(remote_client) = self.remote_client.as_ref() {
remote_client.schedule_compaction_update(&layer_names_to_delete, &uploaded_layers)?;
}
drop_wlock(guard);
Ok(())
}