From 67b17034abe811c36207103da8c4b15d014886b1 Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 29 Aug 2023 14:59:40 +0100 Subject: [PATCH] pageserver: use generation in keys when writing --- pageserver/src/config.rs | 17 --- pageserver/src/tenant.rs | 10 +- .../src/tenant/remote_timeline_client.rs | 110 ++++++++++++++++-- .../tenant/remote_timeline_client/delete.rs | 9 +- .../tenant/remote_timeline_client/download.rs | 42 +++---- .../tenant/remote_timeline_client/index.rs | 8 +- .../tenant/remote_timeline_client/upload.rs | 19 +-- pageserver/src/tenant/upload_queue.rs | 4 +- 8 files changed, 147 insertions(+), 72 deletions(-) diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index f2aa2f365e..5394f17398 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -643,23 +643,6 @@ impl PageServerConf { .join(METADATA_FILE_NAME) } - /// Files on the remote storage are stored with paths, relative to the workdir. - /// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path. - /// - /// Errors if the path provided does not start from pageserver's workdir. - pub fn remote_path(&self, local_path: &Path) -> anyhow::Result { - local_path - .strip_prefix(&self.workdir) - .context("Failed to strip workdir prefix") - .and_then(RemotePath::new) - .with_context(|| { - format!( - "Failed to resolve remote part of path {:?} for base {:?}", - local_path, self.workdir - ) - }) - } - /// Turns storage remote path of a file into its local path. pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf { remote_path.with_base(&self.workdir) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 792ab3c30b..4a2d0103aa 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -655,12 +655,8 @@ impl Tenant { .as_ref() .ok_or_else(|| anyhow::anyhow!("cannot attach without remote storage"))?; - let remote_timeline_ids = remote_timeline_client::list_remote_timelines( - remote_storage, - self.conf, - self.tenant_id, - ) - .await?; + let remote_timeline_ids = + remote_timeline_client::list_remote_timelines(remote_storage, self.tenant_id).await?; info!("found {} timelines", remote_timeline_ids.len()); @@ -672,6 +668,7 @@ impl Tenant { self.conf, self.tenant_id, timeline_id, + self.generation, ); part_downloads.spawn( async move { @@ -2944,6 +2941,7 @@ impl Tenant { self.conf, self.tenant_id, timeline_id, + self.generation, ); Some(remote_client) } else { diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 5e33c5a74b..6f34696724 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -216,7 +216,7 @@ use utils::backoff::{ }; use std::collections::{HashMap, VecDeque}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; @@ -235,6 +235,7 @@ use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::upload_queue::Delete; +use crate::tenant::TIMELINES_SEGMENT_NAME; use crate::{ config::PageServerConf, task_mgr, @@ -252,6 +253,7 @@ use self::index::IndexPart; use super::storage_layer::LayerFileName; use super::upload_queue::SetDeletedFlagProgress; +use super::Generation; // Occasional network issues and such can cause remote operations to fail, and // that's expected. If a download fails, we log it at info-level, and retry. @@ -315,6 +317,7 @@ pub struct RemoteTimelineClient { tenant_id: TenantId, timeline_id: TimelineId, + generation: Generation, upload_queue: Mutex, @@ -335,12 +338,14 @@ impl RemoteTimelineClient { conf: &'static PageServerConf, tenant_id: TenantId, timeline_id: TimelineId, + generation: Generation, ) -> RemoteTimelineClient { RemoteTimelineClient { conf, runtime: BACKGROUND_RUNTIME.handle().to_owned(), tenant_id, timeline_id, + generation, storage_impl: remote_storage, upload_queue: Mutex::new(UploadQueue::Uninitialized), metrics: Arc::new(RemoteTimelineClientMetrics::new(&tenant_id, &timeline_id)), @@ -453,6 +458,7 @@ impl RemoteTimelineClient { &self.storage_impl, &self.tenant_id, &self.timeline_id, + self.generation, ) .measure_remote_op( self.tenant_id, @@ -761,10 +767,10 @@ impl RemoteTimelineClient { backoff::retry( || { upload::upload_index_part( - self.conf, &self.storage_impl, &self.tenant_id, &self.timeline_id, + self.generation, &index_part_with_deleted_at, ) }, @@ -850,8 +856,7 @@ impl RemoteTimelineClient { // Do not delete index part yet, it is needed for possible retry. If we remove it first // and retry will arrive to different pageserver there wont be any traces of it on remote storage - let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id); - let timeline_storage_path = self.conf.remote_path(&timeline_path)?; + let timeline_storage_path = remote_timeline_path(&self.tenant_id, &self.timeline_id); let remaining = backoff::retry( || async { @@ -1055,15 +1060,17 @@ impl RemoteTimelineClient { let upload_result: anyhow::Result<()> = match &task.op { UploadOp::UploadLayer(ref layer_file_name, ref layer_metadata) => { - let path = &self + let mut path = self .conf .timeline_path(&self.tenant_id, &self.timeline_id) .join(layer_file_name.file_name()); + upload::upload_timeline_layer( self.conf, &self.storage_impl, - path, + &path, layer_metadata, + self.generation, ) .measure_remote_op( self.tenant_id, @@ -1085,10 +1092,10 @@ impl RemoteTimelineClient { }; let res = upload::upload_index_part( - self.conf, &self.storage_impl, &self.tenant_id, &self.timeline_id, + self.generation, index_part, ) .measure_remote_op( @@ -1360,6 +1367,79 @@ impl RemoteTimelineClient { } } +pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath { + let path = format!("tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}"); + RemotePath::from_string(&path).expect("Failed to construct path") +} + +pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath { + remote_timelines_path(tenant_id).join(&PathBuf::from(timeline_id.to_string())) +} + +pub fn remote_layer_path( + tenant_id: &TenantId, + timeline_id: &TimelineId, + layer_file_name: &LayerFileName, + layer_meta: &LayerFileMetadata, +) -> RemotePath { + let path = if let Some(generation) = layer_meta.generation { + // Generation-aware key format + format!( + "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}", + layer_file_name.file_name(), + generation.get_suffix() + ) + } else { + // Pre-generation key format + format!( + "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}", + layer_file_name.file_name(), + ) + }; + + RemotePath::from_string(&path).expect("Failed to construct path") +} + +pub fn remote_index_path( + tenant_id: &TenantId, + timeline_id: &TimelineId, + generation: Generation, +) -> RemotePath { + RemotePath::from_string(&format!( + "tenants/{tenant_id}/{TIMELINES_SEGMENT_NAME}/{timeline_id}/{0}{1}", + IndexPart::FILE_NAME, + generation.get_suffix() + )) + .expect("Failed to construct path") +} + +/// Files on the remote storage are stored with paths, relative to the workdir. +/// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path. +/// +/// Errors if the path provided does not start from pageserver's workdir. +pub fn remote_path( + conf: &PageServerConf, + local_path: &Path, + generation: Generation, +) -> anyhow::Result { + let stripped = local_path + .strip_prefix(&conf.workdir) + .context("Failed to strip workdir prefix")?; + + let suffixed = format!( + "{0}{1}", + stripped.to_string_lossy(), + generation.get_suffix() + ); + + RemotePath::new(&PathBuf::from(suffixed)).with_context(|| { + format!( + "Failed to resolve remote part of path {:?} for base {:?}", + local_path, conf.workdir + ) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -1409,8 +1489,11 @@ mod tests { assert_eq!(avec, bvec); } - fn assert_remote_files(expected: &[&str], remote_path: &Path) { - let mut expected: Vec = expected.iter().map(|x| String::from(*x)).collect(); + fn assert_remote_files(expected: &[&str], remote_path: &Path, generation: Generation) { + let mut expected: Vec = expected + .iter() + .map(|x| format!("{}{}", x, generation.get_suffix())) + .collect(); expected.sort(); let mut found: Vec = Vec::new(); @@ -1461,6 +1544,8 @@ mod tests { storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), }; + let generation = Generation::new(0xdeadbeef); + let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); let client = Arc::new(RemoteTimelineClient { @@ -1468,6 +1553,7 @@ mod tests { runtime: tokio::runtime::Handle::current(), tenant_id: harness.tenant_id, timeline_id: TIMELINE_ID, + generation: generation, storage_impl: storage, upload_queue: Mutex::new(UploadQueue::Uninitialized), metrics: Arc::new(RemoteTimelineClientMetrics::new( @@ -1526,6 +1612,8 @@ mod tests { .init_upload_queue_for_empty_remote(&metadata) .unwrap(); + let generation = Generation::new(0xdeadbeef); + // Create a couple of dummy files, schedule upload for them let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); let layer_file_name_2: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D9-00000000016B5A52".parse().unwrap(); @@ -1542,8 +1630,6 @@ mod tests { std::fs::write(timeline_path.join(filename.file_name()), content).unwrap(); } - let generation = Generation::new(0xdeadbeef); - client .schedule_layer_file_upload( &layer_file_name_1, @@ -1641,6 +1727,7 @@ mod tests { "index_part.json", ], &remote_timeline_dir, + generation, ); // Finish them @@ -1653,6 +1740,7 @@ mod tests { "index_part.json", ], &remote_timeline_dir, + generation, ); } diff --git a/pageserver/src/tenant/remote_timeline_client/delete.rs b/pageserver/src/tenant/remote_timeline_client/delete.rs index 3f505d45ab..912f213f15 100644 --- a/pageserver/src/tenant/remote_timeline_client/delete.rs +++ b/pageserver/src/tenant/remote_timeline_client/delete.rs @@ -5,7 +5,10 @@ use tracing::debug; use remote_storage::GenericRemoteStorage; -use crate::config::PageServerConf; +use crate::{ + config::PageServerConf, + tenant::{remote_timeline_client::remote_path, Generation}, +}; pub(super) async fn delete_layer<'a>( conf: &'static PageServerConf, @@ -17,7 +20,9 @@ pub(super) async fn delete_layer<'a>( }); debug!("Deleting layer from remote storage: {local_layer_path:?}",); - let path_to_delete = conf.remote_path(local_layer_path)?; + // FIXME: once we start writing out keys with generations, this will + // need updating (or, in the deletion queue branch, it is already gone) + let path_to_delete = remote_path(conf, local_layer_path, Generation::placeholder())?; // We don't want to print an error if the delete failed if the file has // already been deleted. Thankfully, in this situation S3 already diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 2cb33f07c9..087f6be58c 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -15,14 +15,16 @@ use tokio_util::sync::CancellationToken; use utils::{backoff, crashsafe}; use crate::config::PageServerConf; +use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerFileName; use crate::tenant::timeline::span::debug_assert_current_span_has_tenant_and_timeline_id; +use crate::tenant::Generation; use remote_storage::{DownloadError, GenericRemoteStorage}; use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use super::index::{IndexPart, LayerFileMetadata}; -use super::{FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES}; +use super::{remote_index_path, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES}; static MAX_DOWNLOAD_DURATION: Duration = Duration::from_secs(120); @@ -41,13 +43,11 @@ pub async fn download_layer_file<'a>( ) -> Result { debug_assert_current_span_has_tenant_and_timeline_id(); - let timeline_path = conf.timeline_path(&tenant_id, &timeline_id); + let local_path = conf + .timeline_path(&tenant_id, &timeline_id) + .join(layer_file_name.file_name()); - let local_path = timeline_path.join(layer_file_name.file_name()); - - let remote_path = conf - .remote_path(&local_path) - .map_err(DownloadError::Other)?; + let remote_path = remote_layer_path(&tenant_id, &timeline_id, layer_file_name, layer_metadata); // Perform a rename inspired by durable_rename from file_utils.c. // The sequence: @@ -175,19 +175,17 @@ pub fn is_temp_download_file(path: &Path) -> bool { /// List timelines of given tenant in remote storage pub async fn list_remote_timelines<'a>( storage: &'a GenericRemoteStorage, - conf: &'static PageServerConf, tenant_id: TenantId, ) -> anyhow::Result> { - let tenant_path = conf.timelines_path(&tenant_id); - let tenant_storage_path = conf.remote_path(&tenant_path)?; + let remote_path = remote_timelines_path(&tenant_id); fail::fail_point!("storage-sync-list-remote-timelines", |_| { anyhow::bail!("storage-sync-list-remote-timelines"); }); let timelines = download_retry( - || storage.list_prefixes(Some(&tenant_storage_path)), - &format!("list prefixes for {tenant_path:?}"), + || storage.list_prefixes(Some(&remote_path)), + &format!("list prefixes for {tenant_id}"), ) .await?; @@ -226,17 +224,17 @@ pub(super) async fn download_index_part( storage: &GenericRemoteStorage, tenant_id: &TenantId, timeline_id: &TimelineId, + generation: Generation, ) -> Result { - let index_part_path = conf + let local_path = conf .metadata_path(tenant_id, timeline_id) .with_file_name(IndexPart::FILE_NAME); - let part_storage_path = conf - .remote_path(&index_part_path) - .map_err(DownloadError::BadInput)?; + + let remote_path = remote_index_path(tenant_id, timeline_id, generation); let index_part_bytes = download_retry( || async { - let mut index_part_download = storage.download(&part_storage_path).await?; + let mut index_part_download = storage.download(&remote_path).await?; let mut index_part_bytes = Vec::new(); tokio::io::copy( @@ -244,20 +242,16 @@ pub(super) async fn download_index_part( &mut index_part_bytes, ) .await - .with_context(|| { - format!("Failed to download an index part into file {index_part_path:?}") - }) + .with_context(|| format!("Failed to download an index part into file {local_path:?}")) .map_err(DownloadError::Other)?; Ok(index_part_bytes) }, - &format!("download {part_storage_path:?}"), + &format!("download {remote_path:?}"), ) .await?; let index_part: IndexPart = serde_json::from_slice(&index_part_bytes) - .with_context(|| { - format!("Failed to deserialize index part file into file {index_part_path:?}") - }) + .with_context(|| format!("Failed to deserialize index part file into file {local_path:?}")) .map_err(DownloadError::Other)?; Ok(index_part) diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 4a6d0815ca..fb0cc413d1 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -26,7 +26,7 @@ pub struct LayerFileMetadata { file_size: u64, // Optional for backward compatibility: older data will not have a generation set - generation: Option, + pub(crate) generation: Option, } impl From<&'_ IndexLayerMetadata> for LayerFileMetadata { @@ -142,13 +142,17 @@ impl TryFrom<&UploadQueueInitialized> for IndexPart { } } +fn generation_is_none(g: &Option) -> bool { + g.map(|g| g.is_none()).unwrap_or(true) +} + /// Serialized form of [`LayerFileMetadata`]. #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)] pub struct IndexLayerMetadata { pub(super) file_size: u64, #[serde(default)] - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(skip_serializing_if = "generation_is_none")] pub(super) generation: Option, } diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index a805e9bd60..3a51ea035d 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -5,7 +5,11 @@ use fail::fail_point; use std::{io::ErrorKind, path::Path}; use tokio::fs; -use crate::{config::PageServerConf, tenant::remote_timeline_client::index::IndexPart}; +use super::Generation; +use crate::{ + config::PageServerConf, + tenant::remote_timeline_client::{index::IndexPart, remote_index_path, remote_path}, +}; use remote_storage::GenericRemoteStorage; use utils::id::{TenantId, TimelineId}; @@ -15,10 +19,10 @@ use tracing::info; /// Serializes and uploads the given index part data to the remote storage. pub(super) async fn upload_index_part<'a>( - conf: &'static PageServerConf, storage: &'a GenericRemoteStorage, tenant_id: &TenantId, timeline_id: &TimelineId, + generation: Generation, index_part: &'a IndexPart, ) -> anyhow::Result<()> { tracing::trace!("uploading new index part"); @@ -32,13 +36,9 @@ pub(super) async fn upload_index_part<'a>( let index_part_size = index_part_bytes.len(); let index_part_bytes = tokio::io::BufReader::new(std::io::Cursor::new(index_part_bytes)); - let index_part_path = conf - .metadata_path(tenant_id, timeline_id) - .with_file_name(IndexPart::FILE_NAME); - let storage_path = conf.remote_path(&index_part_path)?; - + let remote_path = remote_index_path(tenant_id, timeline_id, generation); storage - .upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path) + .upload_storage_object(Box::new(index_part_bytes), index_part_size, &remote_path) .await .with_context(|| format!("Failed to upload index part for '{tenant_id} / {timeline_id}'")) } @@ -52,12 +52,13 @@ pub(super) async fn upload_timeline_layer<'a>( storage: &'a GenericRemoteStorage, source_path: &'a Path, known_metadata: &'a LayerFileMetadata, + generation: Generation, ) -> anyhow::Result<()> { fail_point!("before-upload-layer", |_| { bail!("failpoint before-upload-layer") }); - let storage_path = conf.remote_path(source_path)?; + let storage_path = remote_path(conf, source_path, generation)?; let source_file_res = fs::File::open(&source_path).await; let source_file = match source_file_res { Ok(source_file) => source_file, diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 6026825b0d..92202cb3eb 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -233,7 +233,9 @@ impl std::fmt::Display for UploadOp { metadata.file_size() ) } - UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn), + UploadOp::UploadMetadata(_, lsn) => { + write!(f, "UploadMetadata(lsn: {})", lsn) + } UploadOp::Delete(delete) => write!( f, "Delete(path: {}, scheduled_from_timeline_delete: {})",