From b50e0793cf482e47f28f4d34b9c959885f0a167d Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Wed, 7 Dec 2022 23:11:02 +0200 Subject: [PATCH] Rework remote_storage interface (#2993) Changes: * Remove `RemoteObjectId` concept from remote_storage. Operate directly on /-separated names instead. These names are now represented by struct `RemotePath` which was renamed from struct `RelativePath` * Require remote storage to operate on relative paths for its contents, thus simplifying the way to derive them in pageserver and safekeeper * Make `IndexPart` to use `String` instead of `RelativePath` for its entries, since those are just the layer names --- libs/remote_storage/src/lib.rs | 159 +++------ libs/remote_storage/src/local_fs.rs | 431 ++++++----------------- libs/remote_storage/src/s3_bucket.rs | 286 +++------------ pageserver/src/bin/pageserver.rs | 4 +- pageserver/src/config.rs | 24 +- pageserver/src/storage_sync2.rs | 106 +++--- pageserver/src/storage_sync2/delete.rs | 30 +- pageserver/src/storage_sync2/download.rs | 62 +--- pageserver/src/storage_sync2/index.rs | 113 +++--- pageserver/src/storage_sync2/upload.rs | 37 +- pageserver/src/tenant/timeline.rs | 186 +++++----- safekeeper/src/send_wal.rs | 1 + safekeeper/src/wal_backup.rs | 60 ++-- safekeeper/src/wal_storage.rs | 16 +- 14 files changed, 492 insertions(+), 1023 deletions(-) diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 0218fb464d..f72689884e 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -10,7 +10,7 @@ mod s3_bucket; use std::{ collections::HashMap, - fmt::{Debug, Display}, + fmt::Debug, num::{NonZeroU32, NonZeroUsize}, ops::Deref, path::{Path, PathBuf}, @@ -41,44 +41,27 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/'; -#[derive(Clone, PartialEq, Eq)] -pub struct RemoteObjectId(String); +/// Path on the remote storage, relative to some inner prefix. +/// The prefix is an implementation detail, that allows representing local paths +/// as the remote ones, stripping the local storage prefix away. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct RemotePath(PathBuf); + +impl RemotePath { + pub fn new(relative_path: &Path) -> anyhow::Result { + anyhow::ensure!( + relative_path.is_relative(), + "Path {relative_path:?} is not relative" + ); + Ok(Self(relative_path.to_path_buf())) + } + + pub fn with_base(&self, base_path: &Path) -> PathBuf { + base_path.join(&self.0) + } -/// -/// A key that refers to an object in remote storage. It works much like a Path, -/// but it's a separate datatype so that you don't accidentally mix local paths -/// and remote keys. -/// -impl RemoteObjectId { - // Needed to retrieve last component for RemoteObjectId. - // In other words a file name - /// Turn a/b/c or a/b/c/ into c pub fn object_name(&self) -> Option<&str> { - // corner case, char::to_string is not const, thats why this is more verbose than it needs to be - // see https://github.com/rust-lang/rust/issues/88674 - if self.0.len() == 1 && self.0.chars().next().unwrap() == REMOTE_STORAGE_PREFIX_SEPARATOR { - return None; - } - - if self.0.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { - self.0.rsplit(REMOTE_STORAGE_PREFIX_SEPARATOR).nth(1) - } else { - self.0 - .rsplit_once(REMOTE_STORAGE_PREFIX_SEPARATOR) - .map(|(_, last)| last) - } - } -} - -impl Debug for RemoteObjectId { - fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - Debug::fmt(&self.0, fmt) - } -} - -impl Display for RemoteObjectId { - fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Display::fmt(&self.0, fmt) + self.0.file_name().and_then(|os_str| os_str.to_str()) } } @@ -87,49 +70,40 @@ impl Display for RemoteObjectId { /// providing basic CRUD operations for storage files. #[async_trait::async_trait] pub trait RemoteStorage: Send + Sync + 'static { - /// Attempts to derive the storage path out of the local path, if the latter is correct. - fn remote_object_id(&self, local_path: &Path) -> anyhow::Result; - - /// Gets the download path of the given storage file. - fn local_path(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result; - /// Lists all items the storage has right now. - async fn list(&self) -> anyhow::Result>; + async fn list(&self) -> anyhow::Result>; /// Lists all top level subdirectories for a given prefix /// Note: here we assume that if the prefix is passed it was obtained via remote_object_id /// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS) /// so this method doesnt need to. - async fn list_prefixes( - &self, - prefix: Option<&RemoteObjectId>, - ) -> anyhow::Result>; + async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result>; /// Streams the local file contents into remote into the remote storage entry. async fn upload( &self, - from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, + data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, // S3 PUT request requires the content length to be specified, // otherwise it starts to fail with the concurrent connection count increasing. - from_size_bytes: usize, - to: &RemoteObjectId, + data_size_bytes: usize, + to: &RemotePath, metadata: Option, ) -> anyhow::Result<()>; /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer. /// Returns the metadata, if any was stored with the file previously. - async fn download(&self, from: &RemoteObjectId) -> Result; + async fn download(&self, from: &RemotePath) -> Result; /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer. /// Returns the metadata, if any was stored with the file previously. async fn download_byte_range( &self, - from: &RemoteObjectId, + from: &RemotePath, start_inclusive: u64, end_exclusive: Option, ) -> Result; - async fn delete(&self, path: &RemoteObjectId) -> anyhow::Result<()>; + async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>; /// Downcast to LocalFs implementation. For tests. fn as_local(&self) -> Option<&LocalFs> { @@ -196,18 +170,17 @@ impl Deref for GenericRemoteStorage { impl GenericRemoteStorage { pub fn from_config( - working_directory: PathBuf, storage_config: &RemoteStorageConfig, ) -> anyhow::Result { Ok(match &storage_config.storage { RemoteStorageKind::LocalFs(root) => { info!("Using fs root '{}' as a remote storage", root.display()); - GenericRemoteStorage::LocalFs(LocalFs::new(root.clone(), working_directory)?) + GenericRemoteStorage::LocalFs(LocalFs::new(root.clone())?) } RemoteStorageKind::AwsS3(s3_config) => { info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'", s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint); - GenericRemoteStorage::AwsS3(Arc::new(S3Bucket::new(s3_config, working_directory)?)) + GenericRemoteStorage::AwsS3(Arc::new(S3Bucket::new(s3_config)?)) } }) } @@ -221,23 +194,12 @@ impl GenericRemoteStorage { &self, from: Box, from_size_bytes: usize, - from_path: &Path, + to: &RemotePath, ) -> anyhow::Result<()> { - let target_storage_path = self.remote_object_id(from_path).with_context(|| { - format!( - "Failed to get the storage path for source local path '{}'", - from_path.display() - ) - })?; - - self.upload(from, from_size_bytes, &target_storage_path, None) + self.upload(from, from_size_bytes, to, None) .await .with_context(|| { - format!( - "Failed to upload from '{}' to storage path '{:?}'", - from_path.display(), - target_storage_path - ) + format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}") }) } @@ -246,24 +208,11 @@ impl GenericRemoteStorage { pub async fn download_storage_object( &self, byte_range: Option<(u64, Option)>, - to_path: &Path, + from: &RemotePath, ) -> Result { - let remote_object_path = self - .remote_object_id(to_path) - .with_context(|| { - format!( - "Failed to get the storage path for target local path '{}'", - to_path.display() - ) - }) - .map_err(DownloadError::BadInput)?; - match byte_range { - Some((start, end)) => { - self.download_byte_range(&remote_object_path, start, end) - .await - } - None => self.download(&remote_object_path).await, + Some((start, end)) => self.download_byte_range(from, start, end).await, + None => self.download(from).await, } } } @@ -273,23 +222,6 @@ impl GenericRemoteStorage { #[derive(Debug, Clone, PartialEq, Eq)] pub struct StorageMetadata(HashMap); -fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> { - if prefix == path { - anyhow::bail!( - "Prefix and the path are equal, cannot strip: '{}'", - prefix.display() - ) - } else { - path.strip_prefix(prefix).with_context(|| { - format!( - "Path '{}' is not prefixed with '{}'", - path.display(), - prefix.display(), - ) - }) - } -} - /// External backup storage configuration, enough for creating a client for that storage. #[derive(Debug, Clone, PartialEq, Eq)] pub struct RemoteStorageConfig { @@ -433,21 +365,24 @@ mod tests { use super::*; #[test] - fn object_name() { - let k = RemoteObjectId("a/b/c".to_owned()); + fn test_object_name() { + let k = RemotePath::new(Path::new("a/b/c")).unwrap(); assert_eq!(k.object_name(), Some("c")); - let k = RemoteObjectId("a/b/c/".to_owned()); + let k = RemotePath::new(Path::new("a/b/c/")).unwrap(); assert_eq!(k.object_name(), Some("c")); - let k = RemoteObjectId("a/".to_owned()); + let k = RemotePath::new(Path::new("a/")).unwrap(); assert_eq!(k.object_name(), Some("a")); // XXX is it impossible to have an empty key? - let k = RemoteObjectId("".to_owned()); - assert_eq!(k.object_name(), None); - - let k = RemoteObjectId("/".to_owned()); + let k = RemotePath::new(Path::new("")).unwrap(); assert_eq!(k.object_name(), None); } + + #[test] + fn rempte_path_cannot_be_created_from_absolute_ones() { + let err = RemotePath::new(Path::new("/")).expect_err("Should fail on absolute paths"); + assert_eq!(err.to_string(), "Path \"/\" is not relative"); + } } diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 363d47f38d..3e2bded203 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -5,6 +5,7 @@ //! volume is mounted to the local FS. use std::{ + borrow::Cow, future::Future, path::{Path, PathBuf}, pin::Pin, @@ -18,61 +19,33 @@ use tokio::{ use tracing::*; use utils::crashsafe::path_with_suffix_extension; -use crate::{Download, DownloadError, RemoteObjectId}; +use crate::{Download, DownloadError, RemotePath}; -use super::{strip_path_prefix, RemoteStorage, StorageMetadata}; +use super::{RemoteStorage, StorageMetadata}; const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp"; -/// Convert a Path in the remote storage into a RemoteObjectId -fn remote_object_id_from_path(path: &Path) -> anyhow::Result { - Ok(RemoteObjectId( - path.to_str() - .ok_or_else(|| anyhow::anyhow!("unexpected characters found in path"))? - .to_string(), - )) -} - #[derive(Debug, Clone)] pub struct LocalFs { - working_directory: PathBuf, storage_root: PathBuf, } impl LocalFs { /// Attempts to create local FS storage, along with its root directory. - pub fn new(root: PathBuf, working_directory: PathBuf) -> anyhow::Result { - if !root.exists() { - std::fs::create_dir_all(&root).with_context(|| { - format!( - "Failed to create all directories in the given root path '{}'", - root.display(), - ) + /// Storage root will be created (if does not exist) and transformed into an absolute path (if passed as relative). + pub fn new(mut storage_root: PathBuf) -> anyhow::Result { + if !storage_root.exists() { + std::fs::create_dir_all(&storage_root).with_context(|| { + format!("Failed to create all directories in the given root path {storage_root:?}") })?; } - Ok(Self { - working_directory, - storage_root: root, - }) - } - - /// - /// Get the absolute path in the local filesystem to given remote object. - /// - /// This is public so that it can be used in tests. Should not be used elsewhere. - /// - pub fn resolve_in_storage(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result { - let path = PathBuf::from(&remote_object_id.0); - if path.is_relative() { - Ok(self.storage_root.join(path)) - } else if path.starts_with(&self.storage_root) { - Ok(path) - } else { - bail!( - "Path '{}' does not belong to the current storage", - path.display() - ) + if !storage_root.is_absolute() { + storage_root = storage_root.canonicalize().with_context(|| { + format!("Failed to represent path {storage_root:?} as an absolute path") + })?; } + + Ok(Self { storage_root }) } async fn read_storage_metadata( @@ -104,45 +77,48 @@ impl LocalFs { #[async_trait::async_trait] impl RemoteStorage for LocalFs { - /// Convert a "local" path into a "remote path" - fn remote_object_id(&self, local_path: &Path) -> anyhow::Result { - let path = self.storage_root.join( - strip_path_prefix(&self.working_directory, local_path) - .context("local path does not belong to this storage")?, - ); - remote_object_id_from_path(&path) + async fn list(&self) -> anyhow::Result> { + Ok(get_all_files(&self.storage_root, true) + .await? + .into_iter() + .map(|path| { + path.strip_prefix(&self.storage_root) + .context("Failed to strip storage root prefix") + .and_then(RemotePath::new) + .expect( + "We list files for storage root, hence should be able to remote the prefix", + ) + }) + .collect()) } - fn local_path(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result { - let storage_path = PathBuf::from(&remote_object_id.0); - let relative_path = strip_path_prefix(&self.storage_root, &storage_path) - .context("local path does not belong to this storage")?; - Ok(self.working_directory.join(relative_path)) - } - - async fn list(&self) -> anyhow::Result> { - get_all_files(&self.storage_root, true).await - } - - async fn list_prefixes( - &self, - prefix: Option<&RemoteObjectId>, - ) -> anyhow::Result> { + async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result> { let path = match prefix { - Some(prefix) => Path::new(&prefix.0), - None => &self.storage_root, + Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)), + None => Cow::Borrowed(&self.storage_root), }; - get_all_files(path, false).await + Ok(get_all_files(path.as_ref(), false) + .await? + .into_iter() + .map(|path| { + path.strip_prefix(&self.storage_root) + .context("Failed to strip preifix") + .and_then(RemotePath::new) + .expect( + "We list files for storage root, hence should be able to remote the prefix", + ) + }) + .collect()) } async fn upload( &self, - from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, - from_size_bytes: usize, - to: &RemoteObjectId, + data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, + data_size_bytes: usize, + to: &RemotePath, metadata: Option, ) -> anyhow::Result<()> { - let target_file_path = self.resolve_in_storage(to)?; + let target_file_path = to.with_base(&self.storage_root); create_target_directory(&target_file_path).await?; // We need this dance with sort of durable rename (without fsyncs) // to prevent partial uploads. This was really hit when pageserver shutdown @@ -163,8 +139,8 @@ impl RemoteStorage for LocalFs { })?, ); - let from_size_bytes = from_size_bytes as u64; - let mut buffer_to_read = from.take(from_size_bytes); + let from_size_bytes = data_size_bytes as u64; + let mut buffer_to_read = data.take(from_size_bytes); let bytes_read = io::copy(&mut buffer_to_read, &mut destination) .await @@ -221,27 +197,22 @@ impl RemoteStorage for LocalFs { Ok(()) } - async fn download(&self, from: &RemoteObjectId) -> Result { - let file_path = self - .resolve_in_storage(from) - .map_err(DownloadError::BadInput)?; - if file_exists(&file_path).map_err(DownloadError::BadInput)? { + async fn download(&self, from: &RemotePath) -> Result { + let target_path = from.with_base(&self.storage_root); + if file_exists(&target_path).map_err(DownloadError::BadInput)? { let source = io::BufReader::new( fs::OpenOptions::new() .read(true) - .open(&file_path) + .open(&target_path) .await .with_context(|| { - format!( - "Failed to open source file '{}' to use in the download", - file_path.display() - ) + format!("Failed to open source file {target_path:?} to use in the download") }) .map_err(DownloadError::Other)?, ); let metadata = self - .read_storage_metadata(&file_path) + .read_storage_metadata(&target_path) .await .map_err(DownloadError::Other)?; Ok(Download { @@ -255,7 +226,7 @@ impl RemoteStorage for LocalFs { async fn download_byte_range( &self, - from: &RemoteObjectId, + from: &RemotePath, start_inclusive: u64, end_exclusive: Option, ) -> Result { @@ -267,20 +238,15 @@ impl RemoteStorage for LocalFs { return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes"))); } } - let file_path = self - .resolve_in_storage(from) - .map_err(DownloadError::BadInput)?; - if file_exists(&file_path).map_err(DownloadError::BadInput)? { + let target_path = from.with_base(&self.storage_root); + if file_exists(&target_path).map_err(DownloadError::BadInput)? { let mut source = io::BufReader::new( fs::OpenOptions::new() .read(true) - .open(&file_path) + .open(&target_path) .await .with_context(|| { - format!( - "Failed to open source file '{}' to use in the download", - file_path.display() - ) + format!("Failed to open source file {target_path:?} to use in the download") }) .map_err(DownloadError::Other)?, ); @@ -290,7 +256,7 @@ impl RemoteStorage for LocalFs { .context("Failed to seek to the range start in a local storage file") .map_err(DownloadError::Other)?; let metadata = self - .read_storage_metadata(&file_path) + .read_storage_metadata(&target_path) .await .map_err(DownloadError::Other)?; @@ -309,15 +275,12 @@ impl RemoteStorage for LocalFs { } } - async fn delete(&self, path: &RemoteObjectId) -> anyhow::Result<()> { - let file_path = self.resolve_in_storage(path)?; + async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { + let file_path = path.with_base(&self.storage_root); if file_path.exists() && file_path.is_file() { Ok(fs::remove_file(file_path).await?) } else { - bail!( - "File '{}' either does not exist or is not a file", - file_path.display() - ) + bail!("File {file_path:?} either does not exist or is not a file") } } @@ -333,7 +296,7 @@ fn storage_metadata_path(original_path: &Path) -> PathBuf { fn get_all_files<'a, P>( directory_path: P, recursive: bool, -) -> Pin>> + Send + Sync + 'a>> +) -> Pin>> + Send + Sync + 'a>> where P: AsRef + Send + Sync + 'a, { @@ -347,20 +310,20 @@ where let file_type = dir_entry.file_type().await?; let entry_path = dir_entry.path(); if file_type.is_symlink() { - debug!("{:?} us a symlink, skipping", entry_path) + debug!("{entry_path:?} us a symlink, skipping") } else if file_type.is_dir() { if recursive { paths.extend(get_all_files(&entry_path, true).await?.into_iter()) } else { - paths.push(remote_object_id_from_path(&dir_entry.path())?) + paths.push(entry_path) } } else { - paths.push(remote_object_id_from_path(&dir_entry.path())?); + paths.push(entry_path); } } Ok(paths) } else { - bail!("Path '{}' is not a directory", directory_path.display()) + bail!("Path {directory_path:?} is not a directory") } } else { Ok(Vec::new()) @@ -395,173 +358,6 @@ fn file_exists(file_path: &Path) -> anyhow::Result { } } -#[cfg(test)] -mod pure_tests { - use tempfile::tempdir; - - use super::*; - - #[test] - fn storage_path_positive() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - - let storage_root = PathBuf::from("somewhere").join("else"); - let storage = LocalFs { - working_directory: workdir.clone(), - storage_root: storage_root.clone(), - }; - - let local_path = workdir - .join("timelines") - .join("some_timeline") - .join("file_name"); - let expected_path = storage_root.join(local_path.strip_prefix(&workdir)?); - - let actual_path = PathBuf::from( - storage - .remote_object_id(&local_path) - .expect("Matching path should map to storage path normally") - .0, - ); - assert_eq!( - expected_path, - actual_path, - "File paths from workdir should be stored in local fs storage with the same path they have relative to the workdir" - ); - - Ok(()) - } - - #[test] - fn storage_path_negatives() -> anyhow::Result<()> { - #[track_caller] - fn storage_path_error(storage: &LocalFs, mismatching_path: &Path) -> String { - match storage.remote_object_id(mismatching_path) { - Ok(wrong_path) => panic!( - "Expected path '{}' to error, but got storage path: {:?}", - mismatching_path.display(), - wrong_path, - ), - Err(e) => format!("{:?}", e), - } - } - - let workdir = tempdir()?.path().to_owned(); - let storage_root = PathBuf::from("somewhere").join("else"); - let storage = LocalFs { - working_directory: workdir.clone(), - storage_root, - }; - - let error_string = storage_path_error(&storage, &workdir); - assert!(error_string.contains("does not belong to this storage")); - assert!(error_string.contains(workdir.to_str().unwrap())); - - let mismatching_path_str = "/something/else"; - let error_message = storage_path_error(&storage, Path::new(mismatching_path_str)); - assert!( - error_message.contains(mismatching_path_str), - "Error should mention wrong path" - ); - assert!( - error_message.contains(workdir.to_str().unwrap()), - "Error should mention server workdir" - ); - assert!(error_message.contains("does not belong to this storage")); - - Ok(()) - } - - #[test] - fn local_path_positive() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let storage_root = PathBuf::from("somewhere").join("else"); - let storage = LocalFs { - working_directory: workdir.clone(), - storage_root: storage_root.clone(), - }; - - let name = "not a metadata"; - let local_path = workdir.join("timelines").join("some_timeline").join(name); - assert_eq!( - local_path, - storage - .local_path(&remote_object_id_from_path( - &storage_root.join(local_path.strip_prefix(&workdir)?) - )?) - .expect("For a valid input, valid local path should be parsed"), - "Should be able to parse metadata out of the correctly named remote delta file" - ); - - let local_metadata_path = workdir - .join("timelines") - .join("some_timeline") - .join("metadata"); - let remote_metadata_path = storage.remote_object_id(&local_metadata_path)?; - assert_eq!( - local_metadata_path, - storage - .local_path(&remote_metadata_path) - .expect("For a valid input, valid local path should be parsed"), - "Should be able to parse metadata out of the correctly named remote metadata file" - ); - - Ok(()) - } - - #[test] - fn local_path_negatives() -> anyhow::Result<()> { - #[track_caller] - fn local_path_error(storage: &LocalFs, storage_path: &RemoteObjectId) -> String { - match storage.local_path(storage_path) { - Ok(wrong_path) => panic!( - "Expected local path input {:?} to cause an error, but got file path: {:?}", - storage_path, wrong_path, - ), - Err(e) => format!("{:?}", e), - } - } - - let storage_root = PathBuf::from("somewhere").join("else"); - let storage = LocalFs { - working_directory: tempdir()?.path().to_owned(), - storage_root, - }; - - let totally_wrong_path = "wrong_wrong_wrong"; - let error_message = - local_path_error(&storage, &RemoteObjectId(totally_wrong_path.to_string())); - assert!(error_message.contains(totally_wrong_path)); - - Ok(()) - } - - #[test] - fn download_destination_matches_original_path() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let original_path = workdir - .join("timelines") - .join("some_timeline") - .join("some name"); - - let storage_root = PathBuf::from("somewhere").join("else"); - let dummy_storage = LocalFs { - working_directory: workdir, - storage_root, - }; - - let storage_path = dummy_storage.remote_object_id(&original_path)?; - let download_destination = dummy_storage.local_path(&storage_path)?; - - assert_eq!( - original_path, download_destination, - "'original path -> storage path -> matching fs path' transformation should produce the same path as the input one for the correct path" - ); - - Ok(()) - } -} - #[cfg(test)] mod fs_tests { use super::*; @@ -573,7 +369,7 @@ mod fs_tests { storage: &LocalFs, #[allow(clippy::ptr_arg)] // have to use &PathBuf due to `storage.local_path` parameter requirements - remote_storage_path: &RemoteObjectId, + remote_storage_path: &RemotePath, expected_metadata: Option<&StorageMetadata>, ) -> anyhow::Result { let mut download = storage @@ -596,41 +392,16 @@ mod fs_tests { #[tokio::test] async fn upload_file() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); let storage = create_storage()?; - let (file, size) = create_file_for_upload( - &storage.working_directory.join("whatever"), - "whatever_contents", - ) - .await?; - let target_path = "/somewhere/else"; - match storage - .upload( - Box::new(file), - size, - &RemoteObjectId(target_path.to_string()), - None, - ) - .await - { - Ok(()) => panic!("Should not allow storing files with wrong target path"), - Err(e) => { - let message = format!("{:?}", e); - assert!(message.contains(target_path)); - assert!(message.contains("does not belong to the current storage")); - } - } - assert!(storage.list().await?.is_empty()); - - let target_path_1 = upload_dummy_file(&workdir, &storage, "upload_1", None).await?; + let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?; assert_eq!( storage.list().await?, vec![target_path_1.clone()], "Should list a single file after first upload" ); - let target_path_2 = upload_dummy_file(&workdir, &storage, "upload_2", None).await?; + let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?; assert_eq!( list_files_sorted(&storage).await?, vec![target_path_1.clone(), target_path_2.clone()], @@ -644,7 +415,7 @@ mod fs_tests { async fn upload_file_negatives() -> anyhow::Result<()> { let storage = create_storage()?; - let id = storage.remote_object_id(&storage.working_directory.join("dummy"))?; + let id = RemotePath::new(Path::new("dummy"))?; let content = std::io::Cursor::new(b"12345"); // Check that you get an error if the size parameter doesn't match the actual @@ -669,16 +440,14 @@ mod fs_tests { } fn create_storage() -> anyhow::Result { - LocalFs::new(tempdir()?.path().to_owned(), tempdir()?.path().to_owned()) + LocalFs::new(tempdir()?.path().to_owned()) } #[tokio::test] async fn download_file() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let storage = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None).await?; let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?; assert_eq!( @@ -688,7 +457,7 @@ mod fs_tests { ); let non_existing_path = "somewhere/else"; - match storage.download(&RemoteObjectId(non_existing_path.to_string())).await { + match storage.download(&RemotePath::new(Path::new(non_existing_path))?).await { Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"), } @@ -697,11 +466,9 @@ mod fs_tests { #[tokio::test] async fn download_file_range_positive() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let storage = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None).await?; let full_range_download_contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?; @@ -767,11 +534,9 @@ mod fs_tests { #[tokio::test] async fn download_file_range_negative() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let storage = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None).await?; let start = 1_000_000_000; let end = start + 1; @@ -813,11 +578,9 @@ mod fs_tests { #[tokio::test] async fn delete_file() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let storage = create_storage()?; let upload_name = "upload_1"; - let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?; + let upload_target = upload_dummy_file(&storage, upload_name, None).await?; storage.delete(&upload_target).await?; assert!(storage.list().await?.is_empty()); @@ -827,7 +590,8 @@ mod fs_tests { Err(e) => { let error_string = e.to_string(); assert!(error_string.contains("does not exist")); - assert!(error_string.contains(&upload_target.0)); + let expected_path = upload_target.with_base(&storage.storage_root); + assert!(error_string.contains(expected_path.to_str().unwrap())); } } Ok(()) @@ -835,8 +599,6 @@ mod fs_tests { #[tokio::test] async fn file_with_metadata() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let storage = create_storage()?; let upload_name = "upload_1"; let metadata = StorageMetadata(HashMap::from([ @@ -844,7 +606,7 @@ mod fs_tests { ("two".to_string(), "2".to_string()), ])); let upload_target = - upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?; + upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?; let full_range_download_contents = read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?; @@ -884,23 +646,32 @@ mod fs_tests { } async fn upload_dummy_file( - workdir: &Path, storage: &LocalFs, name: &str, metadata: Option, - ) -> anyhow::Result { - let timeline_path = workdir.join("timelines").join("some_timeline"); - let relative_timeline_path = timeline_path.strip_prefix(&workdir)?; - let storage_path = storage.storage_root.join(relative_timeline_path).join(name); - let remote_object_id = RemoteObjectId(storage_path.to_str().unwrap().to_string()); - - let from_path = storage.working_directory.join(name); + ) -> anyhow::Result { + let from_path = storage + .storage_root + .join("timelines") + .join("some_timeline") + .join(name); let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?; + let relative_path = from_path + .strip_prefix(&storage.storage_root) + .context("Failed to strip storage root prefix") + .and_then(RemotePath::new) + .with_context(|| { + format!( + "Failed to resolve remote part of path {:?} for base {:?}", + from_path, storage.storage_root + ) + })?; + storage - .upload(Box::new(file), size, &remote_object_id, metadata) + .upload(Box::new(file), size, &relative_path, metadata) .await?; - remote_object_id_from_path(&storage_path) + Ok(relative_path) } async fn create_file_for_upload( @@ -925,7 +696,7 @@ mod fs_tests { format!("contents for {name}") } - async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result> { + async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result> { let mut files = storage.list().await?; files.sort_by(|a, b| a.0.cmp(&b.0)); Ok(files) diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index c721560c29..ab1e5da6c5 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -5,7 +5,6 @@ //! their bucket prefixes are both specified and different. use std::env::var; -use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -29,8 +28,7 @@ use tracing::debug; use super::StorageMetadata; use crate::{ - strip_path_prefix, Download, DownloadError, RemoteObjectId, RemoteStorage, S3Config, - REMOTE_STORAGE_PREFIX_SEPARATOR, + Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR, }; const DEFAULT_IMDS_TIMEOUT: Duration = Duration::from_secs(10); @@ -100,31 +98,8 @@ pub(super) mod metrics { } } -fn download_destination( - id: &RemoteObjectId, - workdir: &Path, - prefix_to_strip: Option<&str>, -) -> PathBuf { - let path_without_prefix = match prefix_to_strip { - Some(prefix) => id.0.strip_prefix(prefix).unwrap_or_else(|| { - panic!( - "Could not strip prefix '{}' from S3 object key '{}'", - prefix, id.0 - ) - }), - None => &id.0, - }; - - workdir.join( - path_without_prefix - .split(REMOTE_STORAGE_PREFIX_SEPARATOR) - .collect::(), - ) -} - /// AWS S3 storage. pub struct S3Bucket { - workdir: PathBuf, client: Client, bucket_name: String, prefix_in_bucket: Option, @@ -142,7 +117,7 @@ struct GetObjectRequest { } impl S3Bucket { /// Creates the S3 storage, errors if incorrect AWS S3 configuration provided. - pub fn new(aws_config: &S3Config, workdir: PathBuf) -> anyhow::Result { + pub fn new(aws_config: &S3Config) -> anyhow::Result { debug!( "Creating s3 remote storage for S3 bucket {}", aws_config.bucket_name @@ -196,13 +171,39 @@ impl S3Bucket { }); Ok(Self { client, - workdir, bucket_name: aws_config.bucket_name.clone(), prefix_in_bucket, concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()), }) } + fn s3_object_to_relative_path(&self, key: &str) -> RemotePath { + let relative_path = + match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) { + Some(stripped) => stripped, + // we rely on AWS to return properly prefixed paths + // for requests with a certain prefix + None => panic!( + "Key {} does not start with bucket prefix {:?}", + key, self.prefix_in_bucket + ), + }; + RemotePath( + relative_path + .split(REMOTE_STORAGE_PREFIX_SEPARATOR) + .collect(), + ) + } + + fn relative_path_to_s3_object(&self, path: &RemotePath) -> String { + let mut full_path = self.prefix_in_bucket.clone().unwrap_or_default(); + for segment in path.0.iter() { + full_path.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + full_path.push_str(segment.to_str().unwrap_or_default()); + } + full_path + } + async fn download_object(&self, request: GetObjectRequest) -> Result { let _guard = self .concurrency_limiter @@ -252,25 +253,7 @@ impl S3Bucket { #[async_trait::async_trait] impl RemoteStorage for S3Bucket { - fn remote_object_id(&self, local_path: &Path) -> anyhow::Result { - let relative_path = strip_path_prefix(&self.workdir, local_path)?; - let mut key = self.prefix_in_bucket.clone().unwrap_or_default(); - for segment in relative_path { - key.push(REMOTE_STORAGE_PREFIX_SEPARATOR); - key.push_str(&segment.to_string_lossy()); - } - Ok(RemoteObjectId(key)) - } - - fn local_path(&self, storage_path: &RemoteObjectId) -> anyhow::Result { - Ok(download_destination( - storage_path, - &self.workdir, - self.prefix_in_bucket.as_deref(), - )) - } - - async fn list(&self) -> anyhow::Result> { + async fn list(&self) -> anyhow::Result> { let mut document_keys = Vec::new(); let mut continuation_token = None; @@ -300,7 +283,7 @@ impl RemoteStorage for S3Bucket { .contents .unwrap_or_default() .into_iter() - .filter_map(|o| Some(RemoteObjectId(o.key?))), + .filter_map(|o| Some(self.s3_object_to_relative_path(o.key()?))), ); match fetch_response.continuation_token { @@ -314,13 +297,10 @@ impl RemoteStorage for S3Bucket { /// See the doc for `RemoteStorage::list_prefixes` /// Note: it wont include empty "directories" - async fn list_prefixes( - &self, - prefix: Option<&RemoteObjectId>, - ) -> anyhow::Result> { + async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result> { // get the passed prefix or if it is not set use prefix_in_bucket value let list_prefix = prefix - .map(|p| p.0.clone()) + .map(|p| self.relative_path_to_s3_object(p)) .or_else(|| self.prefix_in_bucket.clone()) .map(|mut p| { // required to end with a separator @@ -362,7 +342,7 @@ impl RemoteStorage for S3Bucket { .common_prefixes .unwrap_or_default() .into_iter() - .filter_map(|o| Some(RemoteObjectId(o.prefix?))), + .filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))), ); match fetch_response.continuation_token { @@ -378,7 +358,7 @@ impl RemoteStorage for S3Bucket { &self, from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>, from_size_bytes: usize, - to: &RemoteObjectId, + to: &RemotePath, metadata: Option, ) -> anyhow::Result<()> { let _guard = self @@ -395,7 +375,7 @@ impl RemoteStorage for S3Bucket { self.client .put_object() .bucket(self.bucket_name.clone()) - .key(to.0.to_owned()) + .key(self.relative_path_to_s3_object(to)) .set_metadata(metadata.map(|m| m.0)) .content_length(from_size_bytes.try_into()?) .body(bytes_stream) @@ -408,10 +388,10 @@ impl RemoteStorage for S3Bucket { Ok(()) } - async fn download(&self, from: &RemoteObjectId) -> Result { + async fn download(&self, from: &RemotePath) -> Result { self.download_object(GetObjectRequest { bucket: self.bucket_name.clone(), - key: from.0.to_owned(), + key: self.relative_path_to_s3_object(from), ..GetObjectRequest::default() }) .await @@ -419,7 +399,7 @@ impl RemoteStorage for S3Bucket { async fn download_byte_range( &self, - from: &RemoteObjectId, + from: &RemotePath, start_inclusive: u64, end_exclusive: Option, ) -> Result { @@ -427,19 +407,19 @@ impl RemoteStorage for S3Bucket { // and needs both ends to be exclusive let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1)); let range = Some(match end_inclusive { - Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive), - None => format!("bytes={}-", start_inclusive), + Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"), + None => format!("bytes={start_inclusive}-"), }); self.download_object(GetObjectRequest { bucket: self.bucket_name.clone(), - key: from.0.to_owned(), + key: self.relative_path_to_s3_object(from), range, }) .await } - async fn delete(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<()> { + async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { let _guard = self .concurrency_limiter .acquire() @@ -451,7 +431,7 @@ impl RemoteStorage for S3Bucket { self.client .delete_object() .bucket(self.bucket_name.clone()) - .key(remote_object_id.0.to_owned()) + .key(self.relative_path_to_s3_object(path)) .send() .await .map_err(|e| { @@ -461,181 +441,3 @@ impl RemoteStorage for S3Bucket { Ok(()) } } - -#[cfg(test)] -mod tests { - use tempfile::tempdir; - - use super::*; - - #[test] - fn test_download_destination() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let local_path = workdir.join("one").join("two").join("test_name"); - let relative_path = local_path.strip_prefix(&workdir)?; - - let key = RemoteObjectId(format!( - "{}{}", - REMOTE_STORAGE_PREFIX_SEPARATOR, - relative_path - .iter() - .map(|segment| segment.to_str().unwrap()) - .collect::>() - .join(&REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()), - )); - - assert_eq!( - local_path, - download_destination(&key, &workdir, None), - "Download destination should consist of s3 path joined with the workdir prefix" - ); - - Ok(()) - } - - #[test] - fn storage_path_positive() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - - let segment_1 = "matching"; - let segment_2 = "file"; - let local_path = &workdir.join(segment_1).join(segment_2); - - let storage = dummy_storage(workdir); - - let expected_key = RemoteObjectId(format!( - "{}{REMOTE_STORAGE_PREFIX_SEPARATOR}{segment_1}{REMOTE_STORAGE_PREFIX_SEPARATOR}{segment_2}", - storage.prefix_in_bucket.as_deref().unwrap_or_default(), - )); - - let actual_key = storage - .remote_object_id(local_path) - .expect("Matching path should map to S3 path normally"); - assert_eq!( - expected_key, - actual_key, - "S3 key from the matching path should contain all segments after the workspace prefix, separated with S3 separator" - ); - - Ok(()) - } - - #[test] - fn storage_path_negatives() -> anyhow::Result<()> { - #[track_caller] - fn storage_path_error(storage: &S3Bucket, mismatching_path: &Path) -> String { - match storage.remote_object_id(mismatching_path) { - Ok(wrong_key) => panic!( - "Expected path '{}' to error, but got S3 key: {:?}", - mismatching_path.display(), - wrong_key, - ), - Err(e) => e.to_string(), - } - } - - let workdir = tempdir()?.path().to_owned(); - let storage = dummy_storage(workdir.clone()); - - let error_message = storage_path_error(&storage, &workdir); - assert!( - error_message.contains("Prefix and the path are equal"), - "Message '{}' does not contain the required string", - error_message - ); - - let mismatching_path = PathBuf::from("somewhere").join("else"); - let error_message = storage_path_error(&storage, &mismatching_path); - assert!( - error_message.contains(mismatching_path.to_str().unwrap()), - "Error should mention wrong path" - ); - assert!( - error_message.contains(workdir.to_str().unwrap()), - "Error should mention server workdir" - ); - assert!( - error_message.contains("is not prefixed with"), - "Message '{}' does not contain a required string", - error_message - ); - - Ok(()) - } - - #[test] - fn local_path_positive() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let storage = dummy_storage(workdir.clone()); - let timeline_dir = workdir.join("timelines").join("test_timeline"); - let relative_timeline_path = timeline_dir.strip_prefix(&workdir)?; - - let s3_key = create_s3_key( - &relative_timeline_path.join("not a metadata"), - storage.prefix_in_bucket.as_deref(), - ); - assert_eq!( - download_destination(&s3_key, &workdir, storage.prefix_in_bucket.as_deref()), - storage - .local_path(&s3_key) - .expect("For a valid input, valid S3 info should be parsed"), - "Should be able to parse metadata out of the correctly named remote delta file" - ); - - let s3_key = create_s3_key( - &relative_timeline_path.join("metadata"), - storage.prefix_in_bucket.as_deref(), - ); - assert_eq!( - download_destination(&s3_key, &workdir, storage.prefix_in_bucket.as_deref()), - storage - .local_path(&s3_key) - .expect("For a valid input, valid S3 info should be parsed"), - "Should be able to parse metadata out of the correctly named remote metadata file" - ); - - Ok(()) - } - - #[test] - fn download_destination_matches_original_path() -> anyhow::Result<()> { - let workdir = tempdir()?.path().to_owned(); - let original_path = workdir - .join("timelines") - .join("some_timeline") - .join("some name"); - - let dummy_storage = dummy_storage(workdir); - - let key = dummy_storage.remote_object_id(&original_path)?; - let download_destination = dummy_storage.local_path(&key)?; - - assert_eq!( - original_path, download_destination, - "'original path -> storage key -> matching fs path' transformation should produce the same path as the input one for the correct path" - ); - - Ok(()) - } - - fn dummy_storage(workdir: PathBuf) -> S3Bucket { - S3Bucket { - workdir, - client: Client::new(&aws_config::SdkConfig::builder().build()), - bucket_name: "dummy-bucket".to_string(), - prefix_in_bucket: Some("dummy_prefix/".to_string()), - concurrency_limiter: Semaphore::new(1), - } - } - - fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> RemoteObjectId { - RemoteObjectId(relative_file_path.iter().fold( - prefix.unwrap_or_default().to_string(), - |mut path_string, segment| { - path_string.push(REMOTE_STORAGE_PREFIX_SEPARATOR); - path_string.push_str(segment.to_str().unwrap()); - path_string - }, - )) - } -} diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 3995229e03..d70b36616b 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -280,9 +280,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> { let remote_storage = conf .remote_storage_config .as_ref() - .map(|storage_config| { - GenericRemoteStorage::from_config(conf.workdir.clone(), storage_config) - }) + .map(GenericRemoteStorage::from_config) .transpose() .context("Failed to init generic remote storage")?; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 4542afae33..86f1fcef94 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -5,7 +5,7 @@ //! See also `settings.md` for better description on every parameter. use anyhow::{anyhow, bail, ensure, Context, Result}; -use remote_storage::RemoteStorageConfig; +use remote_storage::{RemotePath, RemoteStorageConfig}; use std::env; use utils::crashsafe::path_with_suffix_extension; use utils::id::ConnectionId; @@ -454,6 +454,28 @@ impl PageServerConf { .join(METADATA_FILE_NAME) } + /// Files on the remote storage are stored with paths, relative to the workdir. + /// That path includes in itself both tenant and timeline ids, allowing to have a unique remote storage path. + /// + /// Errors if the path provided does not start from pageserver's workdir. + pub fn remote_path(&self, local_path: &Path) -> anyhow::Result { + local_path + .strip_prefix(&self.workdir) + .context("Failed to strip workdir prefix") + .and_then(RemotePath::new) + .with_context(|| { + format!( + "Failed to resolve remote part of path {:?} for base {:?}", + local_path, self.workdir + ) + }) + } + + /// Turns storage remote path of a file into its local path. + pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf { + remote_path.with_base(&self.workdir) + } + // // Postgres distribution paths // diff --git a/pageserver/src/storage_sync2.rs b/pageserver/src/storage_sync2.rs index 5b3225028f..b5c5a0d25d 100644 --- a/pageserver/src/storage_sync2.rs +++ b/pageserver/src/storage_sync2.rs @@ -202,7 +202,7 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; use anyhow::ensure; -use remote_storage::{DownloadError, GenericRemoteStorage}; +use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; use tokio::runtime::Runtime; use tracing::{info, warn}; use tracing::{info_span, Instrument}; @@ -217,7 +217,7 @@ use crate::metrics::RemoteOpKind; use crate::metrics::REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS; use crate::{ config::PageServerConf, - storage_sync::index::{LayerFileMetadata, RemotePath}, + storage_sync::index::LayerFileMetadata, task_mgr, task_mgr::TaskKind, task_mgr::BACKGROUND_RUNTIME, @@ -337,18 +337,18 @@ impl UploadQueue { let state = UploadQueueInitialized { // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead. - latest_files: Default::default(), + latest_files: HashMap::new(), latest_metadata: metadata.clone(), // We haven't uploaded anything yet, so, `last_uploaded_consistent_lsn` must be 0 to prevent // safekeepers from garbage-collecting anything. last_uploaded_consistent_lsn: Lsn(0), // what follows are boring default initializations - task_counter: Default::default(), + task_counter: 0, num_inprogress_layer_uploads: 0, num_inprogress_metadata_uploads: 0, num_inprogress_deletions: 0, - inprogress_tasks: Default::default(), - queued_operations: Default::default(), + inprogress_tasks: HashMap::new(), + queued_operations: VecDeque::new(), }; *self = UploadQueue::Initialized(state); @@ -357,6 +357,10 @@ impl UploadQueue { fn initialize_with_current_remote_index_part( &mut self, + conf: &'static PageServerConf, + tenant_id: TenantId, + timeline_id: TimelineId, + index_part: &IndexPart, ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { @@ -366,14 +370,19 @@ impl UploadQueue { } } - let mut files = HashMap::new(); - for path in &index_part.timeline_layers { + let mut files = HashMap::with_capacity(index_part.timeline_layers.len()); + let timeline_path = conf.timeline_path(&timeline_id, &tenant_id); + for timeline_name in &index_part.timeline_layers { + let local_path = timeline_path.join(timeline_name); + let remote_timeline_path = conf.remote_path(&local_path).expect( + "Remote timeline path and local timeline path were constructed form the same conf", + ); let layer_metadata = index_part .layer_metadata - .get(path) + .get(timeline_name) .map(LayerFileMetadata::from) .unwrap_or(LayerFileMetadata::MISSING); - files.insert(path.clone(), layer_metadata); + files.insert(remote_timeline_path, layer_metadata); } let index_part_metadata = index_part.parse_metadata()?; @@ -391,8 +400,8 @@ impl UploadQueue { num_inprogress_layer_uploads: 0, num_inprogress_metadata_uploads: 0, num_inprogress_deletions: 0, - inprogress_tasks: Default::default(), - queued_operations: Default::default(), + inprogress_tasks: HashMap::new(), + queued_operations: VecDeque::new(), }; *self = UploadQueue::Initialized(state); @@ -456,7 +465,12 @@ impl RemoteTimelineClient { /// The given `index_part` must be the one on the remote. pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> { let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_with_current_remote_index_part(index_part)?; + upload_queue.initialize_with_current_remote_index_part( + self.conf, + self.tenant_id, + self.timeline_id, + index_part, + )?; Ok(()) } @@ -510,15 +524,13 @@ impl RemoteTimelineClient { /// On success, returns the size of the downloaded file. pub async fn download_layer_file( &self, - path: &RemotePath, + remote_path: &RemotePath, layer_metadata: &LayerFileMetadata, ) -> anyhow::Result { let downloaded_size = download::download_layer_file( self.conf, &self.storage_impl, - self.tenant_id, - self.timeline_id, - path, + remote_path, layer_metadata, ) .measure_remote_op( @@ -536,13 +548,13 @@ impl RemoteTimelineClient { let new_metadata = LayerFileMetadata::new(downloaded_size); let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - if let Some(upgraded) = upload_queue.latest_files.get_mut(path) { + if let Some(upgraded) = upload_queue.latest_files.get_mut(remote_path) { upgraded.merge(&new_metadata); } else { // The file should exist, since we just downloaded it. warn!( "downloaded file {:?} not found in local copy of the index file", - path + remote_path ); } } @@ -612,14 +624,9 @@ impl RemoteTimelineClient { "file size not initialized in metadata" ); - let relative_path = RemotePath::strip_base_path( - &self.conf.timeline_path(&self.timeline_id, &self.tenant_id), - path, - )?; - upload_queue .latest_files - .insert(relative_path, layer_metadata.clone()); + .insert(self.conf.remote_path(path)?, layer_metadata.clone()); let op = UploadOp::UploadLayer(PathBuf::from(path), layer_metadata.clone()); self.update_upload_queue_unfinished_metric(1, &op); @@ -641,13 +648,10 @@ impl RemoteTimelineClient { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = guard.initialized_mut()?; - // Convert the paths into RelativePaths, and gather other information we need. - let mut relative_paths = Vec::with_capacity(paths.len()); + // Convert the paths into RemotePaths, and gather other information we need. + let mut remote_paths = Vec::with_capacity(paths.len()); for path in paths { - relative_paths.push(RemotePath::strip_base_path( - &self.conf.timeline_path(&self.timeline_id, &self.tenant_id), - path, - )?); + remote_paths.push(self.conf.remote_path(path)?); } // Deleting layers doesn't affect the values stored in TimelineMetadata, @@ -663,8 +667,8 @@ impl RemoteTimelineClient { // from latest_files, but not yet scheduled for deletion. Use a closure // to syntactically forbid ? or bail! calls here. let no_bail_here = || { - for relative_path in relative_paths { - upload_queue.latest_files.remove(&relative_path); + for remote_path in remote_paths { + upload_queue.latest_files.remove(&remote_path); } let index_part = IndexPart::new( @@ -838,14 +842,19 @@ impl RemoteTimelineClient { let upload_result: anyhow::Result<()> = match &task.op { UploadOp::UploadLayer(ref path, ref layer_metadata) => { - upload::upload_timeline_layer(&self.storage_impl, path, layer_metadata) - .measure_remote_op( - self.tenant_id, - self.timeline_id, - RemoteOpFileKind::Layer, - RemoteOpKind::Upload, - ) - .await + upload::upload_timeline_layer( + self.conf, + &self.storage_impl, + path, + layer_metadata, + ) + .measure_remote_op( + self.tenant_id, + self.timeline_id, + RemoteOpFileKind::Layer, + RemoteOpKind::Upload, + ) + .await } UploadOp::UploadMetadata(ref index_part, _lsn) => { upload::upload_index_part( @@ -864,7 +873,7 @@ impl RemoteTimelineClient { .await } UploadOp::Delete(metric_file_kind, ref path) => { - delete::delete_layer(&self.storage_impl, path) + delete::delete_layer(self.conf, &self.storage_impl, path) .measure_remote_op( self.tenant_id, self.timeline_id, @@ -1093,15 +1102,11 @@ mod tests { TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap() } - fn assert_file_list(a: &HashSet, b: &[&str]) { - let xx = PathBuf::from(""); - let mut avec: Vec = a - .iter() - .map(|x| x.to_local_path(&xx).to_string_lossy().into()) - .collect(); + fn assert_file_list(a: &HashSet, b: &[&str]) { + let mut avec: Vec<&str> = a.iter().map(|a| a.as_str()).collect(); avec.sort(); - let mut bvec = b.to_owned(); + let mut bvec = b.to_vec(); bvec.sort_unstable(); assert_eq!(avec, bvec); @@ -1169,8 +1174,7 @@ mod tests { println!("workdir: {}", harness.conf.workdir.display()); - let storage_impl = - GenericRemoteStorage::from_config(harness.conf.workdir.clone(), &storage_config)?; + let storage_impl = GenericRemoteStorage::from_config(&storage_config)?; let client = Arc::new(RemoteTimelineClient { conf: harness.conf, runtime, diff --git a/pageserver/src/storage_sync2/delete.rs b/pageserver/src/storage_sync2/delete.rs index f22dbdc2d8..9f6732fbff 100644 --- a/pageserver/src/storage_sync2/delete.rs +++ b/pageserver/src/storage_sync2/delete.rs @@ -5,34 +5,24 @@ use tracing::debug; use remote_storage::GenericRemoteStorage; -pub(super) async fn delete_layer( - storage: &GenericRemoteStorage, - local_layer_path: &Path, +use crate::config::PageServerConf; + +pub(super) async fn delete_layer<'a>( + conf: &'static PageServerConf, + storage: &'a GenericRemoteStorage, + local_layer_path: &'a Path, ) -> anyhow::Result<()> { fail::fail_point!("before-delete-layer", |_| { anyhow::bail!("failpoint before-delete-layer") }); - debug!( - "Deleting layer from remote storage: {:?}", - local_layer_path.display() - ); + debug!("Deleting layer from remote storage: {local_layer_path:?}",); - let storage_path = storage - .remote_object_id(local_layer_path) - .with_context(|| { - format!( - "Failed to get the layer storage path for local path '{}'", - local_layer_path.display() - ) - })?; + let path_to_delete = conf.remote_path(local_layer_path)?; // XXX: If the deletion fails because the object already didn't exist, // it would be good to just issue a warning but consider it success. // https://github.com/neondatabase/neon/issues/2934 - storage.delete(&storage_path).await.with_context(|| { - format!( - "Failed to delete remote layer from storage at '{:?}'", - storage_path - ) + storage.delete(&path_to_delete).await.with_context(|| { + format!("Failed to delete remote layer from storage at {path_to_delete:?}") }) } diff --git a/pageserver/src/storage_sync2/download.rs b/pageserver/src/storage_sync2/download.rs index d68455ea2b..18a6ac0179 100644 --- a/pageserver/src/storage_sync2/download.rs +++ b/pageserver/src/storage_sync2/download.rs @@ -10,12 +10,11 @@ use tracing::debug; use crate::config::PageServerConf; use crate::storage_sync::index::LayerFileMetadata; -use remote_storage::{DownloadError, GenericRemoteStorage}; +use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use super::index::IndexPart; -use super::RemotePath; async fn fsync_path(path: impl AsRef) -> Result<(), std::io::Error> { fs::File::open(path).await?.sync_all().await @@ -29,21 +28,10 @@ async fn fsync_path(path: impl AsRef) -> Result<(), std::io::Er pub async fn download_layer_file<'a>( conf: &'static PageServerConf, storage: &'a GenericRemoteStorage, - tenant_id: TenantId, - timeline_id: TimelineId, - path: &'a RemotePath, + remote_path: &'a RemotePath, layer_metadata: &'a LayerFileMetadata, ) -> anyhow::Result { - let timeline_path = conf.timeline_path(&timeline_id, &tenant_id); - - let local_path = path.to_local_path(&timeline_path); - - let layer_storage_path = storage.remote_object_id(&local_path).with_context(|| { - format!( - "Failed to get the layer storage path for local path '{}'", - local_path.display() - ) - })?; + let local_path = conf.local_path(remote_path); // Perform a rename inspired by durable_rename from file_utils.c. // The sequence: @@ -64,19 +52,14 @@ pub async fn download_layer_file<'a>( temp_file_path.display() ) })?; - let mut download = storage - .download(&layer_storage_path) - .await - .with_context(|| { - format!( - "Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'" - ) - })?; - let bytes_amount = tokio::io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| { + let mut download = storage.download(remote_path).await.with_context(|| { format!( - "Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display() + "Failed to open a download stream for layer with remote storage path '{remote_path:?}'" ) })?; + let bytes_amount = tokio::io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| { + format!("Failed to download layer with remote storage path '{remote_path:?}' into file {temp_file_path:?}") + })?; // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that: // A file will not be closed immediately when it goes out of scope if there are any IO operations @@ -151,12 +134,7 @@ pub async fn list_remote_timelines<'a>( tenant_id: TenantId, ) -> anyhow::Result> { let tenant_path = conf.timelines_path(&tenant_id); - let tenant_storage_path = storage.remote_object_id(&tenant_path).with_context(|| { - format!( - "Failed to get tenant storage path for local path '{}'", - tenant_path.display() - ) - })?; + let tenant_storage_path = conf.remote_path(&tenant_path)?; let timelines = storage .list_prefixes(Some(&tenant_storage_path)) @@ -218,14 +196,8 @@ pub async fn download_index_part( let index_part_path = conf .metadata_path(timeline_id, tenant_id) .with_file_name(IndexPart::FILE_NAME); - let part_storage_path = storage - .remote_object_id(&index_part_path) - .with_context(|| { - format!( - "Failed to get the index part storage path for local path '{}'", - index_part_path.display() - ) - }) + let part_storage_path = conf + .remote_path(&index_part_path) .map_err(DownloadError::BadInput)?; let mut index_part_download = storage.download(&part_storage_path).await?; @@ -236,20 +208,12 @@ pub async fn download_index_part( &mut index_part_bytes, ) .await - .with_context(|| { - format!( - "Failed to download an index part into file '{}'", - index_part_path.display() - ) - }) + .with_context(|| format!("Failed to download an index part into file {index_part_path:?}")) .map_err(DownloadError::Other)?; let index_part: IndexPart = serde_json::from_slice(&index_part_bytes) .with_context(|| { - format!( - "Failed to deserialize index part file into file '{}'", - index_part_path.display() - ) + format!("Failed to deserialize index part file into file {index_part_path:?}") }) .map_err(DownloadError::Other)?; diff --git a/pageserver/src/storage_sync2/index.rs b/pageserver/src/storage_sync2/index.rs index a1da37b826..5560712a1b 100644 --- a/pageserver/src/storage_sync2/index.rs +++ b/pageserver/src/storage_sync2/index.rs @@ -2,12 +2,9 @@ //! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about //! remote timeline layers and its metadata. -use std::{ - collections::{HashMap, HashSet}, - path::{Path, PathBuf}, -}; +use std::collections::{HashMap, HashSet}; -use anyhow::{Context, Ok}; +use remote_storage::RemotePath; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; @@ -15,34 +12,6 @@ use crate::tenant::metadata::TimelineMetadata; use utils::lsn::Lsn; -/// Path on the remote storage, relative to some inner prefix. -/// The prefix is an implementation detail, that allows representing local paths -/// as the remote ones, stripping the local storage prefix away. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] -#[serde(transparent)] -pub struct RemotePath(PathBuf); - -impl RemotePath { - pub fn new(relative_path: &Path) -> Self { - debug_assert!( - relative_path.is_relative(), - "Path {relative_path:?} is not relative" - ); - Self(relative_path.to_path_buf()) - } - - pub fn strip_base_path(base_path: &Path, full_path: &Path) -> anyhow::Result { - let relative = full_path.strip_prefix(base_path).with_context(|| { - format!("path {full_path:?} is not relative to base {base_path:?}",) - })?; - Ok(Self::new(relative)) - } - - pub fn to_local_path(&self, base_path: &Path) -> PathBuf { - base_path.join(&self.0) - } -} - /// Metadata gathered for each of the layer files. /// /// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which @@ -101,19 +70,19 @@ pub struct IndexPart { /// Layer names, which are stored on the remote storage. /// /// Additional metadata can might exist in `layer_metadata`. - pub timeline_layers: HashSet, + pub timeline_layers: HashSet, /// FIXME: unused field. This should be removed, but that changes the on-disk format, - /// so we need to make sure we're backwards- (and maybe forwards-) compatible + /// so we need to make sure we're backwards-` (and maybe forwards-) compatible /// First pass is to move it to Optional and the next would be its removal - missing_layers: Option>, + missing_layers: Option>, /// Per layer file name metadata, which can be present for a present or missing layer file. /// /// Older versions of `IndexPart` will not have this property or have only a part of metadata /// that latest version stores. #[serde(default)] - pub layer_metadata: HashMap, + pub layer_metadata: HashMap, // 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata. // It's duplicated here for convenience. @@ -135,14 +104,20 @@ impl IndexPart { disk_consistent_lsn: Lsn, metadata_bytes: Vec, ) -> Self { - let mut timeline_layers = HashSet::new(); - let mut layer_metadata = HashMap::new(); + let mut timeline_layers = HashSet::with_capacity(layers_and_metadata.len()); + let mut layer_metadata = HashMap::with_capacity(layers_and_metadata.len()); - separate_paths_and_metadata( - &layers_and_metadata, - &mut timeline_layers, - &mut layer_metadata, - ); + for (remote_path, metadata) in &layers_and_metadata { + let metadata = IndexLayerMetadata::from(metadata); + match remote_path.object_name() { + Some(layer_name) => { + timeline_layers.insert(layer_name.to_owned()); + layer_metadata.insert(layer_name.to_owned(), metadata); + } + // TODO move this on a type level: we know, that every layer entry does have a name + None => panic!("Layer {remote_path:?} has no file name, skipping"), + } + } Self { version: Self::LATEST_VERSION, @@ -173,18 +148,6 @@ impl From<&'_ LayerFileMetadata> for IndexLayerMetadata { } } -fn separate_paths_and_metadata( - input: &HashMap, - output: &mut HashSet, - layer_metadata: &mut HashMap, -) { - for (path, metadata) in input { - let metadata = IndexLayerMetadata::from(metadata); - layer_metadata.insert(path.clone(), metadata); - output.insert(path.clone()); - } -} - #[cfg(test)] mod tests { use super::*; @@ -200,8 +163,8 @@ mod tests { let expected = IndexPart { version: 0, - timeline_layers: HashSet::from([RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"))]), - missing_layers: Some(HashSet::from([RemotePath(PathBuf::from("not_a_real_layer_but_adding_coverage"))])), + timeline_layers: HashSet::from([String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")]), + missing_layers: Some(HashSet::from([String::from("not_a_real_layer_but_adding_coverage")])), layer_metadata: HashMap::default(), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(), @@ -228,13 +191,13 @@ mod tests { let expected = IndexPart { // note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead? version: 1, - timeline_layers: HashSet::from([RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"))]), - missing_layers: Some(HashSet::from([RemotePath(PathBuf::from("not_a_real_layer_but_adding_coverage"))])), + timeline_layers: HashSet::from([String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")]), + missing_layers: Some(HashSet::from([String::from("not_a_real_layer_but_adding_coverage")])), layer_metadata: HashMap::from([ - (RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")), IndexLayerMetadata { + (String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"), IndexLayerMetadata { file_size: Some(25600000), }), - (RemotePath(PathBuf::from("not_a_real_layer_but_adding_coverage")), IndexLayerMetadata { + (String::from("not_a_real_layer_but_adding_coverage"), IndexLayerMetadata { // serde_json should always parse this but this might be a double with jq for // example. file_size: Some(9007199254741001), @@ -264,20 +227,26 @@ mod tests { let expected = IndexPart { // note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead? version: 1, - timeline_layers: [RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"))].into_iter().collect(), + timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_string()]), layer_metadata: HashMap::from([ - (RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")), IndexLayerMetadata { - file_size: Some(25600000), - }), - (RemotePath(PathBuf::from("not_a_real_layer_but_adding_coverage")), IndexLayerMetadata { - // serde_json should always parse this but this might be a double with jq for - // example. - file_size: Some(9007199254741001), - }) + ( + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_string(), + IndexLayerMetadata { + file_size: Some(25600000), + } + ), + ( + "not_a_real_layer_but_adding_coverage".to_string(), + IndexLayerMetadata { + // serde_json should always parse this but this might be a double with jq for + // example. + file_size: Some(9007199254741001), + } + ) ]), disk_consistent_lsn: "0/16960E8".parse::().unwrap(), metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(), - missing_layers: None::>, + missing_layers: None, }; let part = serde_json::from_str::(example).unwrap(); diff --git a/pageserver/src/storage_sync2/upload.rs b/pageserver/src/storage_sync2/upload.rs index b03a0f6ce7..57a524a22d 100644 --- a/pageserver/src/storage_sync2/upload.rs +++ b/pageserver/src/storage_sync2/upload.rs @@ -30,12 +30,9 @@ pub(super) async fn upload_index_part<'a>( let index_part_path = conf .metadata_path(timeline_id, tenant_id) .with_file_name(IndexPart::FILE_NAME); + let storage_path = conf.remote_path(&index_part_path)?; storage - .upload_storage_object( - Box::new(index_part_bytes), - index_part_size, - &index_part_path, - ) + .upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path) .await .with_context(|| format!("Failed to upload index part for '{tenant_id} / {timeline_id}'")) } @@ -44,36 +41,26 @@ pub(super) async fn upload_index_part<'a>( /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload. /// /// On an error, bumps the retries count and reschedules the entire task. -pub(super) async fn upload_timeline_layer( - storage: &GenericRemoteStorage, - source_path: &Path, - known_metadata: &LayerFileMetadata, +pub(super) async fn upload_timeline_layer<'a>( + conf: &'static PageServerConf, + storage: &'a GenericRemoteStorage, + source_path: &'a Path, + known_metadata: &'a LayerFileMetadata, ) -> anyhow::Result<()> { fail_point!("before-upload-layer", |_| { bail!("failpoint before-upload-layer") }); - let storage_path = storage.remote_object_id(source_path).with_context(|| { - format!( - "Failed to get the layer storage path for local path '{}'", - source_path.display() - ) - })?; + let storage_path = conf.remote_path(source_path)?; - let source_file = fs::File::open(&source_path).await.with_context(|| { - format!( - "Failed to open a source file for layer '{}'", - source_path.display() - ) - })?; + let source_file = fs::File::open(&source_path) + .await + .with_context(|| format!("Failed to open a source file for layer {source_path:?}"))?; let fs_size = source_file .metadata() .await .with_context(|| { - format!( - "Failed to get the source file metadata for layer '{}'", - source_path.display() - ) + format!("Failed to get the source file metadata for layer {source_path:?}") })? .len(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 4011156ec5..3b15966352 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -19,7 +19,7 @@ use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering}; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::time::{Duration, Instant, SystemTime}; -use crate::storage_sync::index::{IndexPart, RemotePath}; +use crate::storage_sync::index::IndexPart; use crate::storage_sync::RemoteTimelineClient; use crate::tenant::{ delta_layer::{DeltaLayer, DeltaLayerWriter}, @@ -999,55 +999,9 @@ impl Timeline { &self, index_part: &IndexPart, remote_client: &RemoteTimelineClient, - mut local_filenames: HashSet, + local_layers: HashSet, up_to_date_disk_consistent_lsn: Lsn, ) -> anyhow::Result> { - let mut remote_filenames: HashSet = HashSet::new(); - for fname in index_part.timeline_layers.iter() { - remote_filenames.insert(fname.to_local_path(&PathBuf::from(""))); - } - - // Are there any local files that exist, with a size that doesn't match - // with the size stored in the remote index file? - // If so, rename_to_backup those files so that we re-download them later. - local_filenames.retain(|path| { - let layer_metadata = index_part - .layer_metadata - .get(&RemotePath::new(path)) - .map(LayerFileMetadata::from) - .unwrap_or(LayerFileMetadata::MISSING); - - if let Some(remote_size) = layer_metadata.file_size() { - let local_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id).join(&path); - match local_path.metadata() { - Ok(metadata) => { - let local_size = metadata.len(); - - if local_size != remote_size { - warn!("removing local file \"{}\" because it has unexpected length {}; length in remote index is {}", - path.display(), - local_size, - remote_size); - if let Err(err) = rename_to_backup(&local_path) { - error!("could not rename file \"{}\": {:?}", - local_path.display(), err); - } - self.metrics.current_physical_size_gauge.sub(local_size); - false - } else { - true - } - } - Err(err) => { - error!("could not get size of local file \"{}\": {:?}", path.display(), err); - true - } - } - } else { - true - } - }); - // Are we missing some files that are present in remote storage? // Download them now. // TODO Downloading many files this way is not efficient. @@ -1056,17 +1010,63 @@ impl Timeline { // b) typical case now is that there is nothing to sync, this downloads a lot // 1) if there was another pageserver that came and generated new files // 2) during attach of a timeline with big history which we currently do not do - for path in remote_filenames.difference(&local_filenames) { - let fname = path.to_str().unwrap(); - info!("remote layer file {fname} does not exist locally"); + let mut local_only_layers = local_layers; + let timeline_dir = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); + for remote_layer_name in &index_part.timeline_layers { + let local_layer_path = timeline_dir.join(remote_layer_name); + local_only_layers.remove(&local_layer_path); - let layer_metadata = index_part + let remote_layer_metadata = index_part .layer_metadata - .get(&RemotePath::new(path)) + .get(remote_layer_name) .map(LayerFileMetadata::from) .unwrap_or(LayerFileMetadata::MISSING); - if let Some(imgfilename) = ImageFileName::parse_str(fname) { + let remote_layer_path = self + .conf + .remote_path(&local_layer_path) + .expect("local_layer_path received from the same conf that provided a workdir"); + + if local_layer_path.exists() { + let mut already_downloaded = true; + // Are there any local files that exist, with a size that doesn't match + // with the size stored in the remote index file? + // If so, rename_to_backup those files so that we re-download them later. + if let Some(remote_size) = remote_layer_metadata.file_size() { + match local_layer_path.metadata() { + Ok(metadata) => { + let local_size = metadata.len(); + + if local_size != remote_size { + warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}"); + if let Err(err) = rename_to_backup(&local_layer_path) { + error!("could not rename file {local_layer_path:?}: {err:?}"); + } else { + self.metrics.current_physical_size_gauge.sub(local_size); + already_downloaded = false; + } + } + } + Err(err) => { + error!("could not get size of local file {local_layer_path:?}: {err:?}") + } + } + } + + if already_downloaded { + continue; + } + } else { + info!("remote layer {remote_layer_path:?} does not exist locally"); + } + + let layer_name = local_layer_path + .file_name() + .and_then(|os_str| os_str.to_str()) + .with_context(|| { + format!("Layer file {local_layer_path:?} has no name in unicode") + })?; + if let Some(imgfilename) = ImageFileName::parse_str(layer_name) { if imgfilename.lsn > up_to_date_disk_consistent_lsn { warn!( "found future image layer {} on timeline {} remote_consistent_lsn is {}", @@ -1075,11 +1075,13 @@ impl Timeline { continue; } - trace!("downloading image file: {path:?}"); - let sz = remote_client - .download_layer_file(&RemotePath::new(path), &layer_metadata) + trace!("downloading image file: {remote_layer_path:?}"); + let downloaded_size = remote_client + .download_layer_file(&remote_layer_path, &remote_layer_metadata) .await - .context("download image layer")?; + .with_context(|| { + format!("failed to download image layer from path {remote_layer_path:?}") + })?; trace!("done"); let image_layer = @@ -1089,8 +1091,10 @@ impl Timeline { .write() .unwrap() .insert_historic(Arc::new(image_layer)); - self.metrics.current_physical_size_gauge.add(sz); - } else if let Some(deltafilename) = DeltaFileName::parse_str(fname) { + self.metrics + .current_physical_size_gauge + .add(downloaded_size); + } else if let Some(deltafilename) = DeltaFileName::parse_str(layer_name) { // Create a DeltaLayer struct for each delta file. // The end-LSN is exclusive, while disk_consistent_lsn is // inclusive. For example, if disk_consistent_lsn is 100, it is @@ -1105,11 +1109,13 @@ impl Timeline { continue; } - trace!("downloading delta file: {path:?}"); + trace!("downloading delta file: {remote_layer_path:?}"); let sz = remote_client - .download_layer_file(&RemotePath::new(path), &layer_metadata) + .download_layer_file(&remote_layer_path, &remote_layer_metadata) .await - .context("download delta layer")?; + .with_context(|| { + format!("failed to download delta layer from path {remote_layer_path:?}") + })?; trace!("done"); let delta_layer = @@ -1121,16 +1127,11 @@ impl Timeline { .insert_historic(Arc::new(delta_layer)); self.metrics.current_physical_size_gauge.add(sz); } else { - bail!("unexpected layer filename in remote storage: {}", fname); + bail!("unexpected layer filename {layer_name} in remote storage path: {remote_layer_path:?}"); } } - // now these are local only filenames - let local_only_filenames = local_filenames - .difference(&remote_filenames) - .cloned() - .collect(); - Ok(local_only_filenames) + Ok(local_only_layers) } /// @@ -1164,47 +1165,46 @@ impl Timeline { let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn(); // Build a map of local layers for quick lookups - let mut local_filenames: HashSet = HashSet::new(); - for layer in self.layers.read().unwrap().iter_historic_layers() { - local_filenames.insert(layer.filename()); - } + let local_layers = self + .layers + .read() + .unwrap() + .iter_historic_layers() + .map(|historic_layer| { + historic_layer + .local_path() + .expect("Historic layers should have a path") + }) + .collect::>(); - let local_only_filenames = match index_part { + let local_only_layers = match index_part { Some(index_part) => { info!( "initializing upload queue from remote index with {} layer files", index_part.timeline_layers.len() ); remote_client.init_upload_queue(index_part)?; - let local_only_filenames = self - .download_missing( - index_part, - remote_client, - local_filenames, - disk_consistent_lsn, - ) - .await?; - local_only_filenames + self.download_missing(index_part, remote_client, local_layers, disk_consistent_lsn) + .await? } None => { info!("initializing upload queue as empty"); remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?; - local_filenames + local_layers } }; // Are there local files that don't exist remotely? Schedule uploads for them - let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); - for fname in &local_only_filenames { - let absolute = timeline_path.join(fname); - let sz = absolute + for layer_path in &local_only_layers { + let layer_size = layer_path .metadata() - .with_context(|| format!("failed to get file {} metadata", fname.display()))? + .with_context(|| format!("failed to get file {layer_path:?} metadata"))? .len(); - info!("scheduling {} for upload", fname.display()); - remote_client.schedule_layer_file_upload(&absolute, &LayerFileMetadata::new(sz))?; + info!("scheduling {layer_path:?} for upload"); + remote_client + .schedule_layer_file_upload(layer_path, &LayerFileMetadata::new(layer_size))?; } - if !local_only_filenames.is_empty() { + if !local_only_layers.is_empty() { remote_client.schedule_index_upload(up_to_date_metadata)?; } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 576a02c686..a3481430d0 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -226,6 +226,7 @@ impl ReplicationConn { let mut end_pos = stop_pos.unwrap_or(inmem_state.commit_lsn); let mut wal_reader = WalReader::new( + spg.conf.workdir.clone(), spg.conf.timeline_dir(&tli.ttid), &persisted_state, start_pos, diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 0a43d6085c..300e9a1cba 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -13,7 +13,7 @@ use std::time::Duration; use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr; use postgres_ffi::XLogFileName; use postgres_ffi::{XLogSegNo, PG_TLI}; -use remote_storage::GenericRemoteStorage; +use remote_storage::{GenericRemoteStorage, RemotePath}; use tokio::fs::File; use tokio::runtime::Builder; @@ -151,7 +151,7 @@ async fn update_task( let timeline_dir = conf.timeline_dir(&ttid); let handle = tokio::spawn( - backup_task_main(ttid, timeline_dir, shutdown_rx) + backup_task_main(ttid, timeline_dir, conf.workdir.clone(), shutdown_rx) .instrument(info_span!("WAL backup task", ttid = %ttid)), ); @@ -182,10 +182,10 @@ async fn wal_backup_launcher_main_loop( let conf_ = conf.clone(); REMOTE_STORAGE.get_or_init(|| { - conf_.remote_storage.as_ref().map(|c| { - GenericRemoteStorage::from_config(conf_.workdir, c) - .expect("failed to create remote storage") - }) + conf_ + .remote_storage + .as_ref() + .map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage")) }); // Presense in this map means launcher is aware s3 offloading is needed for @@ -234,6 +234,7 @@ async fn wal_backup_launcher_main_loop( struct WalBackupTask { timeline: Arc, timeline_dir: PathBuf, + workspace_dir: PathBuf, wal_seg_size: usize, commit_lsn_watch_rx: watch::Receiver, } @@ -242,6 +243,7 @@ struct WalBackupTask { async fn backup_task_main( ttid: TenantTimelineId, timeline_dir: PathBuf, + workspace_dir: PathBuf, mut shutdown_rx: Receiver<()>, ) { info!("started"); @@ -257,6 +259,7 @@ async fn backup_task_main( commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), timeline: tli, timeline_dir, + workspace_dir, }; // task is spinned up only when wal_seg_size already initialized @@ -321,6 +324,7 @@ impl WalBackupTask { commit_lsn, self.wal_seg_size, &self.timeline_dir, + &self.workspace_dir, ) .await { @@ -353,11 +357,12 @@ pub async fn backup_lsn_range( end_lsn: Lsn, wal_seg_size: usize, timeline_dir: &Path, + workspace_dir: &Path, ) -> Result { let mut res = start_lsn; let segments = get_segments(start_lsn, end_lsn, wal_seg_size); for s in &segments { - backup_single_segment(s, timeline_dir) + backup_single_segment(s, timeline_dir, workspace_dir) .await .with_context(|| format!("offloading segno {}", s.seg_no))?; @@ -372,11 +377,24 @@ pub async fn backup_lsn_range( Ok(res) } -async fn backup_single_segment(seg: &Segment, timeline_dir: &Path) -> Result<()> { - let segment_file_name = seg.file_path(timeline_dir)?; +async fn backup_single_segment( + seg: &Segment, + timeline_dir: &Path, + workspace_dir: &Path, +) -> Result<()> { + let segment_file_path = seg.file_path(timeline_dir)?; + let remote_segment_path = segment_file_path + .strip_prefix(&workspace_dir) + .context("Failed to strip workspace dir prefix") + .and_then(RemotePath::new) + .with_context(|| { + format!( + "Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}", + ) + })?; - backup_object(&segment_file_name, seg.size()).await?; - debug!("Backup of {} done", segment_file_name.display()); + backup_object(&segment_file_path, &remote_segment_path, seg.size()).await?; + debug!("Backup of {} done", segment_file_path.display()); Ok(()) } @@ -426,7 +444,7 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec { static REMOTE_STORAGE: OnceCell> = OnceCell::new(); -async fn backup_object(source_file: &Path, size: usize) -> Result<()> { +async fn backup_object(source_file: &Path, target_file: &RemotePath, size: usize) -> Result<()> { let storage = REMOTE_STORAGE .get() .expect("failed to get remote storage") @@ -441,12 +459,12 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> { })?); storage - .upload_storage_object(Box::new(file), size, source_file) + .upload_storage_object(Box::new(file), size, target_file) .await } pub async fn read_object( - file_path: PathBuf, + file_path: &RemotePath, offset: u64, ) -> anyhow::Result>> { let storage = REMOTE_STORAGE @@ -455,19 +473,13 @@ pub async fn read_object( .as_ref() .context("No remote storage configured")?; - info!( - "segment download about to start for local path {} at offset {}", - file_path.display(), - offset - ); + info!("segment download about to start from remote path {file_path:?} at offset {offset}"); + let download = storage - .download_storage_object(Some((offset, None)), &file_path) + .download_storage_object(Some((offset, None)), file_path) .await .with_context(|| { - format!( - "Failed to open WAL segment download stream for local path {}", - file_path.display() - ) + format!("Failed to open WAL segment download stream for remote path {file_path:?}") })?; Ok(download.download_stream) diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index bc5e2d7b24..52368bb719 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -8,6 +8,7 @@ //! Note that last file has `.partial` suffix, that's different from postgres. use anyhow::{bail, Context, Result}; +use remote_storage::RemotePath; use std::io::{self, Seek, SeekFrom}; use std::pin::Pin; @@ -445,6 +446,7 @@ fn remove_segments_from_disk( } pub struct WalReader { + workdir: PathBuf, timeline_dir: PathBuf, wal_seg_size: usize, pos: Lsn, @@ -459,6 +461,7 @@ pub struct WalReader { impl WalReader { pub fn new( + workdir: PathBuf, timeline_dir: PathBuf, state: &SafeKeeperState, start_pos: Lsn, @@ -478,6 +481,7 @@ impl WalReader { } Ok(Self { + workdir, timeline_dir, wal_seg_size: state.server.wal_seg_size as usize, pos: start_pos, @@ -545,7 +549,17 @@ impl WalReader { // Try to open remote file, if remote reads are enabled if self.enable_remote_read { - return read_object(wal_file_path, xlogoff as u64).await; + let remote_wal_file_path = wal_file_path + .strip_prefix(&self.workdir) + .context("Failed to strip workdir prefix") + .and_then(RemotePath::new) + .with_context(|| { + format!( + "Failed to resolve remote part of path {:?} for base {:?}", + wal_file_path, self.workdir, + ) + })?; + return read_object(&remote_wal_file_path, xlogoff as u64).await; } bail!("WAL segment is not found")