From c228cb7b3f024bbd21b382f1a9c28467105e50ad Mon Sep 17 00:00:00 2001
From: Joonas Koivunen
Date: Tue, 15 Aug 2023 18:26:35 +0300
Subject: [PATCH] remote_timeline_client: continued integration work post
 ResidentLayer

---
 .../src/tenant/remote_timeline_client.rs | 91 +++++++++++++++----
 pageserver/src/tenant/storage_layer.rs   |  6 ++
 pageserver/src/tenant/upload_queue.rs    | 12 +--
 3 files changed, 84 insertions(+), 25 deletions(-)

diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index e46205810a..256b2f1b4e 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -234,6 +234,7 @@ use crate::metrics::{
 use crate::task_mgr::shutdown_token;
 use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
+use crate::tenant::storage_layer::AsLayerDesc;
 use crate::tenant::upload_queue::Delete;
 use crate::{
     config::PageServerConf,
@@ -250,7 +251,7 @@ use utils::id::{TenantId, TimelineId};
 
 use self::index::IndexPart;
 
-use super::storage_layer::LayerFileName;
+use super::storage_layer::{LayerFileName, ResidentLayer};
 use super::upload_queue::SetDeletedFlagProgress;
 
 // Occasional network issues and such can cause remote operations to fail, and
@@ -600,23 +601,25 @@ impl RemoteTimelineClient {
     ///
     pub fn schedule_layer_file_upload(
         self: &Arc<Self>,
-        layer_file_name: &LayerFileName,
+        layer: ResidentLayer,
         layer_metadata: &LayerFileMetadata,
     ) -> anyhow::Result<()> {
         let mut guard = self.upload_queue.lock().unwrap();
         let upload_queue = guard.initialized_mut()?;
 
+        // FIXME: we might still be including no longer existing files in the index_part,
+        // because that consistency is built on strings and gentlemen's agreements, not on
+        // Weak refs which could be upgraded at the time the index_part is rendered.
         upload_queue
             .latest_files
-            .insert(layer_file_name.clone(), layer_metadata.clone());
+            .insert(layer.layer_desc().filename(), layer_metadata.clone());
         upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
 
-        let op = UploadOp::UploadLayer(layer_file_name.clone(), layer_metadata.clone());
+        info!("scheduled layer file upload {layer}");
+        let op = UploadOp::UploadLayer(layer, layer_metadata.clone());
         self.calls_unfinished_metric_begin(&op);
         upload_queue.queued_operations.push_back(op);
 
-        info!("scheduled layer file upload {layer_file_name}");
-
         // Launch the task immediately, if possible
         self.launch_queued_tasks(upload_queue);
         Ok(())
@@ -1054,11 +1057,8 @@ impl RemoteTimelineClient {
         }
 
         let upload_result: anyhow::Result<()> = match &task.op {
-            UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => {
-                let path = &self
-                    .conf
-                    .timeline_path(&self.tenant_id, &self.timeline_id)
-                    .join(layer_file_name.file_name());
+            UploadOp::UploadLayer(ref layer, ref layer_metadata) => {
+                let path = layer.local_path();
                 upload::upload_timeline_layer(
                     self.conf,
                     &self.storage_impl,
@@ -1367,6 +1367,7 @@ mod tests {
         context::RequestContext,
         tenant::{
             harness::{TenantHarness, TIMELINE_ID},
+            storage_layer::{LayerE, PersistentLayerDesc},
             Tenant, Timeline,
         },
         DEFAULT_PG_VERSION,
@@ -1507,7 +1508,7 @@ mod tests {
         let TestSetup {
             harness,
             tenant: _tenant,
-            timeline: _timeline,
+            timeline,
             tenant_ctx: _tenant_ctx,
             remote_fs_dir,
             client,
@@ -1542,15 +1543,56 @@ mod tests {
             std::fs::write(timeline_path.join(filename.file_name()), content).unwrap();
         }
 
+        let layer_file_1 = LayerE::for_written(
+            harness.conf,
+            &timeline,
+            PersistentLayerDesc::from_filename(
+                timeline.tenant_id,
+                timeline.timeline_id,
+                layer_file_name_1.clone(),
+                content_1.len() as u64,
+            ),
+        )
+        .unwrap();
+
+        // FIXME: need that api for local files
+        assert!(layer_file_1.needs_download_blocking().unwrap().is_none());
+
+        let layer_file_2 = LayerE::for_written(
+            harness.conf,
+            &timeline,
+            PersistentLayerDesc::from_filename(
+                timeline.tenant_id,
+                timeline.timeline_id,
+                layer_file_name_2.clone(),
+                content_2.len() as u64,
+            ),
+        )
+        .unwrap();
+        assert!(layer_file_2.needs_download_blocking().unwrap().is_none());
+
+        let layer_file_3 = LayerE::for_written(
+            harness.conf,
+            &timeline,
+            PersistentLayerDesc::from_filename(
+                timeline.tenant_id,
+                timeline.timeline_id,
+                layer_file_name_3.clone(),
+                content_3.len() as u64,
+            ),
+        )
+        .unwrap();
+        assert!(layer_file_3.needs_download_blocking().unwrap().is_none());
+
         client
             .schedule_layer_file_upload(
-                &layer_file_name_1,
+                layer_file_1.clone(),
                 &LayerFileMetadata::new(content_1.len() as u64),
             )
             .unwrap();
         client
             .schedule_layer_file_upload(
-                &layer_file_name_2,
+                layer_file_2.clone(),
                 &LayerFileMetadata::new(content_2.len() as u64),
             )
             .unwrap();
@@ -1614,7 +1656,7 @@ mod tests {
         // Schedule upload and then a deletion. Check that the deletion is queued
         client
             .schedule_layer_file_upload(
-                &layer_file_name_3,
+                layer_file_3.clone(),
                 &LayerFileMetadata::new(content_3.len() as u64),
            )
             .unwrap();
@@ -1661,7 +1703,7 @@ mod tests {
         let TestSetup {
             harness,
             tenant: _tenant,
-            timeline: _timeline,
+            timeline,
             client,
             ..
         } = TestSetup::new("metrics").await.unwrap();
@@ -1681,6 +1723,21 @@ mod tests {
         )
         .unwrap();
 
+        let layer_file_1 = LayerE::for_written(
+            harness.conf,
+            &timeline,
+            PersistentLayerDesc::from_filename(
+                timeline.tenant_id,
+                timeline.timeline_id,
+                layer_file_name_1.clone(),
+                content_1.len() as u64,
+            ),
+        )
+        .unwrap();
+
+        // FIXME: need that api for local files that actually exist
+        assert!(layer_file_1.needs_download_blocking().unwrap().is_none());
+
         #[derive(Debug, PartialEq)]
         struct BytesStartedFinished {
             started: Option<usize>,
@@ -1707,7 +1764,7 @@ mod tests {
 
         client
             .schedule_layer_file_upload(
-                &layer_file_name_1,
+                layer_file_1.clone(),
                 &LayerFileMetadata::new(content_1.len() as u64),
             )
             .unwrap();
diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index ef7c015419..917525dfff 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -1053,6 +1053,12 @@ impl std::fmt::Display for ResidentLayer {
     }
 }
 
+impl std::fmt::Debug for ResidentLayer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.owner)
+    }
+}
+
 impl ResidentLayer {
     pub(crate) fn local_path(&self) -> &std::path::Path {
         &self.owner.path
diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs
index 6026825b0d..9671d83b1b 100644
--- a/pageserver/src/tenant/upload_queue.rs
+++ b/pageserver/src/tenant/upload_queue.rs
@@ -1,6 +1,7 @@
 use crate::metrics::RemoteOpFileKind;
 
 use super::storage_layer::LayerFileName;
+use super::storage_layer::ResidentLayer;
 use crate::tenant::metadata::TimelineMetadata;
 use crate::tenant::remote_timeline_client::index::IndexPart;
 use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
@@ -210,7 +211,7 @@ pub(crate) struct Delete {
 #[derive(Debug)]
 pub(crate) enum UploadOp {
     /// Upload a layer file
-    UploadLayer(LayerFileName, LayerFileMetadata),
+    UploadLayer(ResidentLayer, LayerFileMetadata),
 
     /// Upload the metadata file
     UploadMetadata(IndexPart, Lsn),
@@ -225,13 +226,8 @@ pub(crate) enum UploadOp {
 impl std::fmt::Display for UploadOp {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match self {
-            UploadOp::UploadLayer(path, metadata) => {
-                write!(
-                    f,
-                    "UploadLayer({}, size={:?})",
-                    path.file_name(),
-                    metadata.file_size()
-                )
+            UploadOp::UploadLayer(layer, metadata) => {
+                write!(f, "UploadLayer({}, size={:?})", layer, metadata.file_size())
             }
             UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
             UploadOp::Delete(delete) => write!(
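
Note for readers (not part of the patch itself): the self-contained Rust sketch below mirrors the shape of this change using simplified stand-in types rather than the pageserver's real ones, so take it as an illustration under those assumptions, not the actual implementation. The idea it demonstrates: schedule_layer_file_upload now consumes a resident-layer handle by value, the queued UploadOp::UploadLayer carries that handle together with its metadata, and the upload step asks the layer for its own local path instead of re-joining a timeline directory with a file name.

// Stand-in types only: `ResidentLayer`, `LayerFileMetadata`, `UploadOp` and
// `UploadQueue` here are simplified mock-ups for illustration, not the
// pageserver's real definitions.
use std::collections::VecDeque;
use std::path::{Path, PathBuf};

#[derive(Clone, Debug)]
struct LayerFileMetadata {
    file_size: u64,
}

/// Stand-in for a layer that is known to be present on local disk.
#[derive(Clone, Debug)]
struct ResidentLayer {
    file_name: String,
    local_path: PathBuf,
}

impl ResidentLayer {
    fn local_path(&self) -> &Path {
        &self.local_path
    }
}

enum UploadOp {
    UploadLayer(ResidentLayer, LayerFileMetadata),
}

#[derive(Default)]
struct UploadQueue {
    queued_operations: VecDeque<UploadOp>,
}

impl UploadQueue {
    /// Rough shape of `schedule_layer_file_upload` after the patch: the caller
    /// hands over the layer by value instead of a `&LayerFileName`.
    fn schedule_layer_file_upload(&mut self, layer: ResidentLayer, metadata: &LayerFileMetadata) {
        println!("scheduled layer file upload {}", layer.file_name);
        self.queued_operations
            .push_back(UploadOp::UploadLayer(layer, metadata.clone()));
    }

    /// Rough shape of the upload task: the source path now comes from the layer itself.
    fn perform_next_upload(&mut self) {
        if let Some(UploadOp::UploadLayer(layer, metadata)) = self.queued_operations.pop_front() {
            println!(
                "uploading {} ({} bytes)",
                layer.local_path().display(),
                metadata.file_size
            );
        }
    }
}

fn main() {
    let mut queue = UploadQueue::default();
    let layer = ResidentLayer {
        file_name: "example-delta-layer".to_owned(),
        local_path: PathBuf::from("/tmp/example-delta-layer"),
    };
    queue.schedule_layer_file_upload(layer, &LayerFileMetadata { file_size: 3 });
    queue.perform_next_upload();
}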