From 4f172e7612870909613eb7c8f9c3d3a41a426618 Mon Sep 17 00:00:00 2001
From: Kirill Bulatov
Date: Sat, 9 Apr 2022 01:15:20 +0300
Subject: [PATCH] Replicate S3 blob metadata in the remote storage

---
 pageserver/src/remote_storage.rs              |  12 +-
 pageserver/src/remote_storage/local_fs.rs     | 188 +++++++++++++++---
 pageserver/src/remote_storage/s3_bucket.rs    |  12 +-
 .../src/remote_storage/storage_sync/upload.rs |   1 +
 4 files changed, 179 insertions(+), 34 deletions(-)

diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs
index 02d37af5de..aebd74af5a 100644
--- a/pageserver/src/remote_storage.rs
+++ b/pageserver/src/remote_storage.rs
@@ -325,27 +325,35 @@ trait RemoteStorage: Send + Sync {
         &self,
         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
         to: &Self::StoragePath,
+        metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()>;
 
     /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
+    /// Returns the metadata, if any was stored with the file previously.
     async fn download(
         &self,
         from: &Self::StoragePath,
         to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<()>;
+    ) -> anyhow::Result<Option<StorageMetadata>>;
 
     /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
+    /// Returns the metadata, if any was stored with the file previously.
     async fn download_range(
         &self,
         from: &Self::StoragePath,
         start_inclusive: u64,
         end_exclusive: Option<u64>,
         to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<()>;
+    ) -> anyhow::Result<Option<StorageMetadata>>;
 
     async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>;
 }
 
+/// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
+/// Immutable, cannot be changed once the file is created.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct StorageMetadata(HashMap<String, String>);
+
 fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> {
     if prefix == path {
         anyhow::bail!(
diff --git a/pageserver/src/remote_storage/local_fs.rs b/pageserver/src/remote_storage/local_fs.rs
index bac693c8d0..846adf8e9b 100644
--- a/pageserver/src/remote_storage/local_fs.rs
+++ b/pageserver/src/remote_storage/local_fs.rs
@@ -5,7 +5,6 @@
 //! volume is mounted to the local FS.
 
 use std::{
-    ffi::OsString,
     future::Future,
     path::{Path, PathBuf},
     pin::Pin,
@@ -18,7 +17,7 @@ use tokio::{
 };
 use tracing::*;
 
-use super::{strip_path_prefix, RemoteStorage};
+use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
 
 pub struct LocalFs {
     pageserver_workdir: &'static Path,
@@ -54,6 +53,32 @@ impl LocalFs {
             )
         }
     }
+
+    async fn read_storage_metadata(
+        &self,
+        file_path: &Path,
+    ) -> anyhow::Result<Option<StorageMetadata>> {
+        let metadata_path = storage_metadata_path(&file_path);
+        if metadata_path.exists() && metadata_path.is_file() {
+            let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
+                format!(
+                    "Failed to read metadata from the local storage at '{}'",
+                    metadata_path.display()
+                )
+            })?;
+
+            serde_json::from_str(&metadata_string)
+                .with_context(|| {
+                    format!(
+                        "Failed to deserialize metadata from the local storage at '{}'",
+                        metadata_path.display()
+                    )
+                })
+                .map(|metadata| Some(StorageMetadata(metadata)))
+        } else {
+            Ok(None)
+        }
+    }
 }
 
 #[async_trait::async_trait]
@@ -81,19 +106,14 @@ impl RemoteStorage for LocalFs {
         &self,
         mut from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
         to: &Self::StoragePath,
+        metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
         let target_file_path = self.resolve_in_storage(to)?;
         create_target_directory(&target_file_path).await?;
         // We need this dance with sort of durable rename (without fsyncs)
         // to prevent partial uploads. This was really hit when pageserver shutdown
         // cancelled the upload and partial file was left on the fs
-        let mut temp_extension = target_file_path
-            .extension()
-            .unwrap_or_default()
-            .to_os_string();
-
-        temp_extension.push(OsString::from(".temp"));
-        let temp_file_path = target_file_path.with_extension(temp_extension);
+        let temp_file_path = path_with_suffix_extension(&target_file_path, ".temp");
         let mut destination = io::BufWriter::new(
             fs::OpenOptions::new()
                 .write(true)
@@ -132,6 +152,23 @@ impl RemoteStorage for LocalFs {
                 target_file_path.display()
             )
         })?;
+
+        if let Some(storage_metadata) = metadata {
+            let storage_metadata_path = storage_metadata_path(&target_file_path);
+            fs::write(
+                &storage_metadata_path,
+                serde_json::to_string(&storage_metadata.0)
+                    .context("Failed to serialize storage metadata as json")?,
+            )
+            .await
+            .with_context(|| {
+                format!(
+                    "Failed to write metadata to the local storage at '{}'",
+                    storage_metadata_path.display()
+                )
+            })?;
+        }
+
         Ok(())
     }
 
@@ -139,7 +176,7 @@ impl RemoteStorage for LocalFs {
         &self,
         from: &Self::StoragePath,
         to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<Option<StorageMetadata>> {
        let file_path = self.resolve_in_storage(from)?;

        if file_path.exists() && file_path.is_file() {
@@ -162,7 +199,8 @@ impl RemoteStorage for LocalFs {
                 )
             })?;
             source.flush().await?;
-            Ok(())
+
+            self.read_storage_metadata(&file_path).await
         } else {
             bail!(
                 "File '{}' either does not exist or is not a file",
@@ -177,7 +215,7 @@ impl RemoteStorage for LocalFs {
         start_inclusive: u64,
         end_exclusive: Option<u64>,
         to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<Option<StorageMetadata>> {
         if let Some(end_exclusive) = end_exclusive {
             ensure!(
                 end_exclusive > start_inclusive,
@@ -186,7 +224,7 @@ impl RemoteStorage for LocalFs {
                 end_exclusive
             );
             if start_inclusive == end_exclusive.saturating_sub(1) {
-                return Ok(());
+                return Ok(None);
             }
         }
         let file_path = self.resolve_in_storage(from)?;
@@ -220,7 +258,8 @@ impl RemoteStorage for LocalFs {
                     file_path.display()
                 )
             })?;
-            Ok(())
+
+            self.read_storage_metadata(&file_path).await
         } else {
             bail!(
                 "File '{}' either does not exist or is not a file",
@@ -242,6 +281,17 @@ impl RemoteStorage for LocalFs {
     }
 }
 
+fn path_with_suffix_extension(original_path: &Path, suffix: &str) -> PathBuf {
+    let mut extension_with_suffix = original_path.extension().unwrap_or_default().to_os_string();
+    extension_with_suffix.push(suffix);
+
+    original_path.with_extension(extension_with_suffix)
+}
+
+fn storage_metadata_path(original_path: &Path) -> PathBuf {
+    path_with_suffix_extension(original_path, ".metadata")
+}
+
 fn get_all_files<'a, P>(
     directory_path: P,
 ) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
@@ -451,7 +501,7 @@
 mod fs_tests {
     use super::*;
     use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID};
-    use std::io::Write;
+    use std::{collections::HashMap, io::Write};
     use tempfile::tempdir;
 
     #[tokio::test]
@@ -465,7 +515,7 @@ mod fs_tests {
         )
         .await?;
         let target_path = PathBuf::from("/").join("somewhere").join("else");
-        match storage.upload(source, &target_path).await {
+        match storage.upload(source, &target_path, None).await {
             Ok(()) => panic!("Should not allow storing files with wrong target path"),
             Err(e) => {
                 let message = format!("{:?}", e);
@@ -475,14 +525,14 @@ mod fs_tests {
         }
         assert!(storage.list().await?.is_empty());
 
-        let target_path_1 = upload_dummy_file(&repo_harness, &storage, "upload_1").await?;
+        let target_path_1 = upload_dummy_file(&repo_harness, &storage, "upload_1", None).await?;
         assert_eq!(
             storage.list().await?,
             vec![target_path_1.clone()],
             "Should list a single file after first upload"
         );
 
-        let target_path_2 = upload_dummy_file(&repo_harness, &storage, "upload_2").await?;
+        let target_path_2 = upload_dummy_file(&repo_harness, &storage, "upload_2", None).await?;
         assert_eq!(
             list_files_sorted(&storage).await?,
             vec![target_path_1.clone(), target_path_2.clone()],
@@ -503,12 +553,16 @@ mod fs_tests {
         let repo_harness = RepoHarness::create("download_file")?;
         let storage = create_storage()?;
         let upload_name = "upload_1";
-        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
+        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?;
 
         let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        storage.download(&upload_target, &mut content_bytes).await?;
-        content_bytes.flush().await?;
+        let metadata = storage.download(&upload_target, &mut content_bytes).await?;
+        assert!(
+            metadata.is_none(),
+            "No metadata should be returned for no metadata upload"
+        );
 
+        content_bytes.flush().await?;
         let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
         assert_eq!(
             dummy_contents(upload_name),
@@ -533,12 +587,16 @@ mod fs_tests {
         let repo_harness = RepoHarness::create("download_file_range_positive")?;
         let storage = create_storage()?;
         let upload_name = "upload_1";
-        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
+        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?;
 
         let mut full_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        storage
+        let metadata = storage
             .download_range(&upload_target, 0, None, &mut full_range_bytes)
             .await?;
+        assert!(
+            metadata.is_none(),
+            "No metadata should be returned for no metadata upload"
+        );
         full_range_bytes.flush().await?;
         assert_eq!(
             dummy_contents(upload_name),
@@ -548,7 +606,7 @@ mod fs_tests {
 
         let mut zero_range_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
         let same_byte = 1_000_000_000;
-        storage
+        let metadata = storage
             .download_range(
                 &upload_target,
                 same_byte,
@@ -556,6 +614,10 @@ mod fs_tests {
                 &mut zero_range_bytes,
             )
             .await?;
+        assert!(
+            metadata.is_none(),
+            "No metadata should be returned for no metadata upload"
+        );
         zero_range_bytes.flush().await?;
         assert!(
             zero_range_bytes.into_inner().into_inner().is_empty(),
@@ -566,7 +628,7 @@ mod fs_tests {
         let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);
 
         let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        storage
+        let metadata = storage
             .download_range(
                 &upload_target,
                 0,
@@ -574,6 +636,11 @@ mod fs_tests {
                 &mut first_part_remote,
             )
             .await?;
+        assert!(
+            metadata.is_none(),
+            "No metadata should be returned for no metadata upload"
+        );
+
         first_part_remote.flush().await?;
         let first_part_remote = first_part_remote.into_inner().into_inner();
         assert_eq!(
@@ -583,7 +650,7 @@ mod fs_tests {
         );
 
         let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        storage
+        let metadata = storage
             .download_range(
                 &upload_target,
                 first_part_local.len() as u64,
@@ -591,6 +658,11 @@ mod fs_tests {
                 &mut second_part_remote,
             )
             .await?;
+        assert!(
+            metadata.is_none(),
+            "No metadata should be returned for no metadata upload"
+        );
+
         second_part_remote.flush().await?;
         let second_part_remote = second_part_remote.into_inner().into_inner();
         assert_eq!(
@@ -607,7 +679,7 @@ mod fs_tests {
         let repo_harness = RepoHarness::create("download_file_range_negative")?;
         let storage = create_storage()?;
         let upload_name = "upload_1";
-        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
+        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?;
 
         let start = 10000;
         let end = 234;
@@ -645,7 +717,7 @@ mod fs_tests {
         let repo_harness = RepoHarness::create("delete_file")?;
         let storage = create_storage()?;
         let upload_name = "upload_1";
-        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
+        let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name, None).await?;
 
         storage.delete(&upload_target).await?;
         assert!(storage.list().await?.is_empty());
@@ -661,10 +733,69 @@ mod fs_tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn file_with_metadata() -> anyhow::Result<()> {
+        let repo_harness = RepoHarness::create("file_with_metadata")?;
+        let storage = create_storage()?;
+        let upload_name = "upload_1";
+        let metadata = StorageMetadata(HashMap::from([
+            ("one".to_string(), "1".to_string()),
+            ("two".to_string(), "2".to_string()),
+        ]));
+        let upload_target =
+            upload_dummy_file(&repo_harness, &storage, upload_name, Some(metadata.clone())).await?;
+
+        let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        let full_download_metadata = storage.download(&upload_target, &mut content_bytes).await?;
+
+        content_bytes.flush().await?;
+        let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
+        assert_eq!(
+            dummy_contents(upload_name),
+            contents,
+            "We should upload and download the same contents"
+        );
+
+        assert_eq!(
+            full_download_metadata.as_ref(),
+            Some(&metadata),
+            "We should get the same metadata back for full download"
+        );
+
+        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
+        let (first_part_local, _) = uploaded_bytes.split_at(3);
+
+        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
+        let partial_download_metadata = storage
+            .download_range(
+                &upload_target,
+                0,
+                Some(first_part_local.len() as u64),
+                &mut first_part_remote,
+            )
+            .await?;
+        first_part_remote.flush().await?;
+        let first_part_remote = first_part_remote.into_inner().into_inner();
+        assert_eq!(
+            first_part_local,
+            first_part_remote.as_slice(),
+            "First part bytes should be returned when requested"
+        );
+
+        assert_eq!(
+            partial_download_metadata.as_ref(),
+            Some(&metadata),
+            "We should get the same metadata back for partial download"
+        );
+
+        Ok(())
+    }
+
     async fn upload_dummy_file(
         harness: &RepoHarness<'_>,
         storage: &LocalFs,
         name: &str,
+        metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<PathBuf> {
         let timeline_path = harness.timeline_path(&TIMELINE_ID);
         let relative_timeline_path = timeline_path.strip_prefix(&harness.conf.workdir)?;
@@ -677,6 +808,7 @@ mod fs_tests {
             )
             .await?,
             &storage_path,
+            metadata,
         )
         .await?;
         Ok(storage_path)
diff --git a/pageserver/src/remote_storage/s3_bucket.rs b/pageserver/src/remote_storage/s3_bucket.rs
index 92b3b0cce8..bfd28168f4 100644
--- a/pageserver/src/remote_storage/s3_bucket.rs
+++ b/pageserver/src/remote_storage/s3_bucket.rs
@@ -24,6 +24,8 @@ use crate::{
     remote_storage::{strip_path_prefix, RemoteStorage},
 };
 
+use super::StorageMetadata;
+
 const S3_FILE_SEPARATOR: char = '/';
 
 #[derive(Debug, Eq, PartialEq)]
@@ -179,12 +181,14 @@ impl RemoteStorage for S3Bucket {
         &self,
         from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
         to: &Self::StoragePath,
+        metadata: Option<StorageMetadata>,
     ) -> anyhow::Result<()> {
         self.client
             .put_object(PutObjectRequest {
                 body: Some(StreamingBody::new(ReaderStream::new(from))),
                 bucket: self.bucket_name.clone(),
                 key: to.key().to_owned(),
+                metadata: metadata.map(|m| m.0),
                 ..PutObjectRequest::default()
             })
             .await?;
@@ -195,7 +199,7 @@ impl RemoteStorage for S3Bucket {
         &self,
         from: &Self::StoragePath,
         to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<Option<StorageMetadata>> {
         let object_output = self
             .client
             .get_object(GetObjectRequest {
@@ -210,7 +214,7 @@ impl RemoteStorage for S3Bucket {
             io::copy(&mut from, to).await?;
         }
 
-        Ok(())
+        Ok(object_output.metadata.map(StorageMetadata))
     }
 
     async fn download_range(
@@ -219,7 +223,7 @@ impl RemoteStorage for S3Bucket {
         start_inclusive: u64,
         end_exclusive: Option<u64>,
         to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
-    ) -> anyhow::Result<()> {
+    ) -> anyhow::Result<Option<StorageMetadata>> {
         // S3 accepts ranges as https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
         // and needs both ends to be inclusive
         let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
@@ -242,7 +246,7 @@ impl RemoteStorage for S3Bucket {
             io::copy(&mut from, to).await?;
         }
 
-        Ok(())
+        Ok(object_output.metadata.map(StorageMetadata))
     }
 
     async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
diff --git a/pageserver/src/remote_storage/storage_sync/upload.rs b/pageserver/src/remote_storage/storage_sync/upload.rs
index 76e92c2781..f955e04474 100644
--- a/pageserver/src/remote_storage/storage_sync/upload.rs
+++ b/pageserver/src/remote_storage/storage_sync/upload.rs
@@ -201,6 +201,7 @@ async fn try_upload_checkpoint<
             .upload(
                 archive_streamer,
                 &remote_storage.storage_path(&timeline_dir.join(&archive_name))?,
+                None,
             )
             .await
         },
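
A usage sketch for reviewers, not part of the patch: `storage`, `source_file`, `remote_path`, `destination_buffer`, and the metadata key shown are illustrative placeholders; the calls follow the `RemoteStorage` trait as changed above.

    use std::collections::HashMap;

    // Attach immutable key-value metadata to the uploaded blob. LocalFs persists
    // it as JSON in a "<file>.metadata" sidecar file; S3Bucket maps it onto the
    // S3 object's user-defined metadata.
    let metadata = StorageMetadata(HashMap::from([(
        "disk_consistent_lsn".to_string(), // illustrative key, any strings work
        "0/16960E8".to_string(),
    )]));
    storage.upload(source_file, &remote_path, Some(metadata)).await?;

    // Full and ranged downloads now both return Option<StorageMetadata>;
    // None means the blob was uploaded without metadata.
    let stored: Option<StorageMetadata> = storage
        .download(&remote_path, &mut destination_buffer)
        .await?;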