Files
neon/libs/remote_storage/src/local_fs.rs
Kirill Bulatov dec58092e8 Replace Box<dyn> with impl in RemoteStorage upload (#3984)
Replaces `Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>` with
`impl io::AsyncRead + Unpin + Send + Sync + 'static` usages in the
`RemoteStorage` interface, to make it closer to
[`#![feature(async_fn_in_trait)]`](https://blog.rust-lang.org/inside-rust/2022/11/17/async-fn-in-trait-nightly.html)

For `GenericRemoteStorage`, replaces `type Target = dyn RemoteStorage`
with another impl with `RemoteStorage` methods inside it.
We could reuse the trait, but that would require importing the trait in every
file where it's used and would move us farther from the unstable feature.
After this PR, I've managed to create a patch with the changes:

https://github.com/neondatabase/neon/compare/kb/less-dyn-storage...kb/nightly-async-trait?expand=1

Current rust implementation does not like recursive async trait calls,
so `UnreliableWrapper` was removed: it contained a
`GenericRemoteStorage` that implemented the `RemoteStorage` trait, and
itself implemented the trait, which nightly rustc did not like and
proposed to box the future.
Similarly, `GenericRemoteStorage` cannot implement `RemoteStorage` for
nightly rustc to work, since it calls various remote storages' methods from
inside.

I've compiled current `main` and the nightly branch both with `time env
RUSTC_WRAPPER="" cargo +nightly build --all --timings` command, and got
```
    Finished dev [optimized + debuginfo] target(s) in 2m 04s
env RUSTC_WRAPPER="" cargo +nightly build --all --timings  1283.19s user 50.40s system 1074% cpu 2:04.15 total

for the new feature tried and

    Finished dev [optimized + debuginfo] target(s) in 2m 40s
env RUSTC_WRAPPER="" cargo +nightly build --all --timings  1288.59s user 52.06s system 834% cpu 2:40.71 total

for the old async_trait approach.
```

On my machine, the `remote_storage` lib compilation takes ~10 less time
with the nightly feature (left) than the regular main (right).


![image](https://user-images.githubusercontent.com/2690773/230620797-163d8b89-dac8-4366-bcf6-cd1cdddcd22c.png)

Full cargo reports are available at
[timings.zip](https://github.com/neondatabase/neon/files/11179369/timings.zip)
2023-04-07 21:39:49 +03:00

706 lines
24 KiB
Rust

//! Local filesystem acting as a remote storage.
//! Multiple API users can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in tests, but can also be used in cases when a certain persistent
//! volume is mounted to the local FS.
use std::{
borrow::Cow,
future::Future,
path::{Path, PathBuf},
pin::Pin,
};
use anyhow::{bail, ensure, Context};
use tokio::{
fs,
io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
use crate::{Download, DownloadError, RemotePath};
use super::{RemoteStorage, StorageMetadata};
/// Suffix passed to `path_with_suffix_extension` to name the temporary file that
/// `upload` writes first and then renames onto the real target path.
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
/// Storage backend that keeps every stored file under one directory of the local filesystem.
#[derive(Debug, Clone)]
pub struct LocalFs {
// Directory that all `RemotePath`s are resolved against (via `RemotePath::with_base`).
storage_root: PathBuf,
}
impl LocalFs {
    /// Builds a local FS storage rooted at `storage_root`.
    ///
    /// A missing root directory is created together with its missing ancestors, and a
    /// relative root is canonicalized so that the stored root is an absolute path.
    ///
    /// # Errors
    /// Fails when the directories cannot be created or the relative path cannot be
    /// canonicalized.
    pub fn new(mut storage_root: PathBuf) -> anyhow::Result<Self> {
        let root_is_missing = !storage_root.exists();
        if root_is_missing {
            std::fs::create_dir_all(&storage_root).with_context(|| {
                format!("Failed to create all directories in the given root path {storage_root:?}")
            })?;
        }

        if storage_root.is_relative() {
            let absolute_root = storage_root.canonicalize().with_context(|| {
                format!("Failed to represent path {storage_root:?} as an absolute path")
            })?;
            storage_root = absolute_root;
        }

        Ok(Self { storage_root })
    }

    /// Loads the metadata stored alongside `file_path`, if there is any.
    ///
    /// Returns `Ok(None)` when the companion metadata path (see `storage_metadata_path`)
    /// does not point at a regular file. Otherwise the file is read as a string and
    /// deserialized from JSON into a `StorageMetadata`.
    ///
    /// # Errors
    /// Fails when the metadata file cannot be read or its contents cannot be deserialized.
    async fn read_storage_metadata(
        &self,
        file_path: &Path,
    ) -> anyhow::Result<Option<StorageMetadata>> {
        let metadata_path = storage_metadata_path(file_path);
        let has_metadata_file = metadata_path.exists() && metadata_path.is_file();
        if !has_metadata_file {
            return Ok(None);
        }

        let raw_metadata = fs::read_to_string(&metadata_path).await.with_context(|| {
            format!(
                "Failed to read metadata from the local storage at '{}'",
                metadata_path.display()
            )
        })?;
        let parsed_metadata = serde_json::from_str(&raw_metadata).with_context(|| {
            format!(
                "Failed to deserialize metadata from the local storage at '{}'",
                metadata_path.display()
            )
        })?;

        Ok(Some(StorageMetadata(parsed_metadata)))
    }

    /// Test-only helper: recursively lists everything `get_all_files` finds under the
    /// storage root, as paths relative to that root.
    ///
    /// # Panics
    /// Panics when a listed path cannot be turned into a `RemotePath` relative to the root.
    #[cfg(test)]
    async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
        let absolute_paths = get_all_files(&self.storage_root, true).await?;

        let mut remote_paths = Vec::with_capacity(absolute_paths.len());
        for path in absolute_paths {
            let remote_path = path
                .strip_prefix(&self.storage_root)
                .context("Failed to strip storage root prefix")
                .and_then(RemotePath::new)
                .expect(
                    "We list files for storage root, hence should be able to remote the prefix",
                );
            remote_paths.push(remote_path);
        }

        Ok(remote_paths)
    }
}
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
    /// Lists the direct entries of the directory that `prefix` resolves to under the storage
    /// root, or of the storage root itself when `prefix` is `None`.
    ///
    /// The listing is not recursive: subdirectories are returned as entries and symlinks are
    /// skipped (see `get_all_files`). A directory that does not exist yields an empty list.
    /// Returned paths are relative to the storage root.
    ///
    /// # Errors
    /// Returns `DownloadError::Other` when the directory cannot be read or the resolved path
    /// is not a directory.
    ///
    /// # Panics
    /// Panics when a listed path cannot be converted into a `RemotePath` relative to the root.
    async fn list_prefixes(
        &self,
        prefix: Option<&RemotePath>,
    ) -> Result<Vec<RemotePath>, DownloadError> {
        let path = match prefix {
            Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
            None => Cow::Borrowed(&self.storage_root),
        };
        Ok(get_all_files(path.as_ref(), false)
            .await
            .map_err(DownloadError::Other)?
            .into_iter()
            .map(|path| {
                path.strip_prefix(&self.storage_root)
                    .context("Failed to strip preifix")
                    .and_then(RemotePath::new)
                    .expect(
                        "We list files for storage root, hence should be able to remote the prefix",
                    )
            })
            .collect())
    }

    /// Stores exactly `data_size_bytes` bytes read from `data` at `to` (resolved under the
    /// storage root), optionally writing `metadata` as JSON into a companion file.
    ///
    /// The payload is first written to a temporary sibling file and then renamed onto the
    /// target, so that an interrupted upload does not leave a partial file at the target path.
    ///
    /// # Errors
    /// Fails when the stream is shorter or longer than `data_size_bytes`, or when any of the
    /// filesystem operations (directory creation, open, write, flush, rename, metadata write)
    /// fails.
    async fn upload(
        &self,
        data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
        data_size_bytes: usize,
        to: &RemotePath,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()> {
        let target_file_path = to.with_base(&self.storage_root);
        create_target_directory(&target_file_path).await?;
        // We need this dance with sort of durable rename (without fsyncs)
        // to prevent partial uploads. This was really hit when pageserver shutdown
        // cancelled the upload and partial file was left on the fs
        let temp_file_path =
            path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
        let mut destination = io::BufWriter::new(
            fs::OpenOptions::new()
                .write(true)
                .create(true)
                // A temp file left behind by an earlier failed or cancelled upload may be
                // longer than the new payload; without truncation its stale tail would
                // survive the write and end up in the renamed target.
                .truncate(true)
                .open(&temp_file_path)
                .await
                .with_context(|| {
                    format!(
                        "Failed to open target fs destination at '{}'",
                        target_file_path.display()
                    )
                })?,
        );

        let from_size_bytes = data_size_bytes as u64;
        // Cap the read at the declared size; anything beyond it is detected separately below.
        let mut buffer_to_read = data.take(from_size_bytes);

        let bytes_read = io::copy(&mut buffer_to_read, &mut destination)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload file (write temp) to the local storage at '{}'",
                    temp_file_path.display()
                )
            })?;

        if bytes_read < from_size_bytes {
            bail!("Provided stream was shorter than expected: {bytes_read} vs {from_size_bytes} bytes");
        }
        // Check if there is any extra data after the given size.
        let mut from = buffer_to_read.into_inner();
        let extra_read = from.read(&mut [1]).await?;
        ensure!(
            extra_read == 0,
            "Provided stream was larger than expected: expected {from_size_bytes} bytes",
        );

        destination.flush().await.with_context(|| {
            format!(
                "Failed to upload (flush temp) file to the local storage at '{}'",
                temp_file_path.display()
            )
        })?;

        fs::rename(temp_file_path, &target_file_path)
            .await
            .with_context(|| {
                format!(
                    "Failed to upload (rename) file to the local storage at '{}'",
                    target_file_path.display()
                )
            })?;

        if let Some(storage_metadata) = metadata {
            let storage_metadata_path = storage_metadata_path(&target_file_path);
            fs::write(
                &storage_metadata_path,
                serde_json::to_string(&storage_metadata.0)
                    .context("Failed to serialize storage metadata as json")?,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to write metadata to the local storage at '{}'",
                    storage_metadata_path.display()
                )
            })?;
        }

        Ok(())
    }

    /// Opens the file stored at `from` for reading and returns it as a stream together with
    /// its stored metadata, if any.
    ///
    /// # Errors
    /// - `DownloadError::NotFound` when nothing exists at the resolved path.
    /// - `DownloadError::BadInput` when the resolved path exists but is not a file.
    /// - `DownloadError::Other` when the file cannot be opened or its metadata cannot be read.
    async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let source = io::BufReader::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );

            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;
            Ok(Download {
                metadata,
                download_stream: Box::pin(source),
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    /// Like `download`, but the returned stream starts at byte `start_inclusive` and, when
    /// `end_exclusive` is given, yields at most `end_exclusive - start_inclusive` bytes.
    ///
    /// # Errors
    /// - `DownloadError::Other` when `end_exclusive <= start_inclusive`, when the range check
    ///   below rejects the range, or when opening, seeking or metadata reading fails.
    /// - `DownloadError::NotFound` when nothing exists at the resolved path.
    /// - `DownloadError::BadInput` when the resolved path exists but is not a file.
    async fn download_byte_range(
        &self,
        from: &RemotePath,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError> {
        if let Some(end_exclusive) = end_exclusive {
            if end_exclusive <= start_inclusive {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) is not less than end_exclusive ({end_exclusive:?})")));
            };
            // NOTE(review): this rejects `end_exclusive == start_inclusive + 1`, which with an
            // exclusive end looks like a one-byte range rather than a zero-byte one. The tests
            // in this file assert the current behavior, so it is kept as is — confirm intent.
            if start_inclusive == end_exclusive.saturating_sub(1) {
                return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
            }
        }
        let target_path = from.with_base(&self.storage_root);
        if file_exists(&target_path).map_err(DownloadError::BadInput)? {
            let mut source = io::BufReader::new(
                fs::OpenOptions::new()
                    .read(true)
                    .open(&target_path)
                    .await
                    .with_context(|| {
                        format!("Failed to open source file {target_path:?} to use in the download")
                    })
                    .map_err(DownloadError::Other)?,
            );
            source
                .seek(io::SeekFrom::Start(start_inclusive))
                .await
                .context("Failed to seek to the range start in a local storage file")
                .map_err(DownloadError::Other)?;
            let metadata = self
                .read_storage_metadata(&target_path)
                .await
                .map_err(DownloadError::Other)?;

            Ok(match end_exclusive {
                Some(end_exclusive) => Download {
                    metadata,
                    download_stream: Box::pin(source.take(end_exclusive - start_inclusive)),
                },
                None => Download {
                    metadata,
                    download_stream: Box::pin(source),
                },
            })
        } else {
            Err(DownloadError::NotFound)
        }
    }

    /// Removes the file stored at `path`.
    ///
    /// # Errors
    /// Fails when the resolved path does not exist, is not a regular file, or cannot be
    /// removed.
    async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
        let file_path = path.with_base(&self.storage_root);
        if file_path.exists() && file_path.is_file() {
            Ok(fs::remove_file(file_path).await?)
        } else {
            bail!("File {file_path:?} either does not exist or is not a file")
        }
    }
}
/// Builds the path of the companion file that holds the metadata for `original_path`,
/// by passing the `"metadata"` suffix to `path_with_suffix_extension`.
fn storage_metadata_path(original_path: &Path) -> PathBuf {
    let metadata_suffix = "metadata";
    path_with_suffix_extension(original_path, metadata_suffix)
}
/// Collects the paths of the entries found inside `directory_path`.
///
/// * A path that does not exist produces an empty list.
/// * A path that exists but is not a directory produces an error.
/// * Symlink entries are skipped (a debug message is logged for each one).
/// * Directory entries are descended into when `recursive` is set; otherwise the directory
///   path itself is reported as an entry.
/// * Every other entry is reported as is.
///
/// The future is boxed because the function calls itself for nested directories.
fn get_all_files<'a, P>(
    directory_path: P,
    recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
where
    P: AsRef<Path> + Send + Sync + 'a,
{
    Box::pin(async move {
        let directory_path = directory_path.as_ref();

        // Guard clauses first: nothing to list, or not something that can be listed.
        if !directory_path.exists() {
            return Ok(Vec::new());
        }
        if !directory_path.is_dir() {
            bail!("Path {directory_path:?} is not a directory");
        }

        let mut collected = Vec::new();
        let mut entries = fs::read_dir(directory_path).await?;
        while let Some(entry) = entries.next_entry().await? {
            let entry_kind = entry.file_type().await?;
            let entry_path = entry.path();

            if entry_kind.is_symlink() {
                debug!("{entry_path:?} us a symlink, skipping");
                continue;
            }

            if entry_kind.is_dir() && recursive {
                let nested = get_all_files(&entry_path, true).await?;
                collected.extend(nested.into_iter());
            } else {
                // Plain files, and directories in non-recursive mode, are listed directly.
                collected.push(entry_path);
            }
        }

        Ok(collected)
    })
}
/// Makes sure the parent directory of `target_file_path` exists, creating it (and any
/// missing ancestors) when it does not.
///
/// # Errors
/// Fails when the path has no parent directory or the directories cannot be created.
async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()> {
    if let Some(parent_dir) = target_file_path.parent() {
        if !parent_dir.exists() {
            fs::create_dir_all(parent_dir).await?;
        }
        Ok(())
    } else {
        bail!(
            "File path '{}' has no parent directory",
            target_file_path.display()
        )
    }
}
/// Tells whether a file is present at `file_path`.
///
/// Returns `Ok(false)` when nothing exists at the path and `Ok(true)` when a file does.
///
/// # Errors
/// Fails when the path exists but is not a file.
fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
    if !file_path.exists() {
        return Ok(false);
    }

    ensure!(
        file_path.is_file(),
        "file path '{}' is not a file",
        file_path.display()
    );
    Ok(true)
}
#[cfg(test)]
mod fs_tests {
    use super::*;

    use std::{collections::HashMap, io::Write};
    use tempfile::tempdir;

    /// Downloads `remote_storage_path` from `storage`, checks that the returned metadata
    /// equals `expected_metadata` and returns the downloaded contents as a string.
    async fn read_and_assert_remote_file_contents(
        storage: &LocalFs,
        remote_storage_path: &RemotePath,
        expected_metadata: Option<&StorageMetadata>,
    ) -> anyhow::Result<String> {
        let mut download = storage
            .download(remote_storage_path)
            .await
            .map_err(|e| anyhow::anyhow!("Download failed: {e}"))?;
        ensure!(
            download.metadata.as_ref() == expected_metadata,
            "Unexpected metadata returned for the downloaded file"
        );
        let mut contents = String::new();
        download
            .download_stream
            .read_to_string(&mut contents)
            .await
            .context("Failed to read remote file contents into string")?;
        Ok(contents)
    }

    /// Uploaded files show up in the storage listing.
    #[tokio::test]
    async fn upload_file() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
        assert_eq!(
            storage.list().await?,
            vec![target_path_1.clone()],
            "Should list a single file after first upload"
        );

        let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
        assert_eq!(
            list_files_sorted(&storage).await?,
            vec![target_path_1.clone(), target_path_2.clone()],
            "Should list a two different files after second upload"
        );

        Ok(())
    }

    /// Uploads whose declared size does not match the stream length are rejected.
    #[tokio::test]
    async fn upload_file_negatives() -> anyhow::Result<()> {
        let storage = create_storage()?;

        let id = RemotePath::new(Path::new("dummy"))?;
        let content = std::io::Cursor::new(b"12345");

        // Check that you get an error if the size parameter doesn't match the actual
        // size of the stream.
        // `upload` accepts any `impl AsyncRead`, so the cursor is passed without boxing.
        storage
            .upload(content.clone(), 0, &id, None)
            .await
            .expect_err("upload with zero size succeeded");
        storage
            .upload(content.clone(), 4, &id, None)
            .await
            .expect_err("upload with too short size succeeded");
        storage
            .upload(content.clone(), 6, &id, None)
            .await
            .expect_err("upload with too large size succeeded");

        // Correct size is 5, this should succeed.
        storage.upload(content, 5, &id, None).await?;

        Ok(())
    }

    /// Creates a `LocalFs` rooted at a freshly generated temporary directory path.
    ///
    /// NOTE(review): the `tempdir()` guard is a temporary here, so it does not outlive this
    /// function; presumably the directory is removed again on return and later re-created
    /// by the upload helpers — confirm against the `tempfile` documentation.
    fn create_storage() -> anyhow::Result<LocalFs> {
        LocalFs::new(tempdir()?.path().to_owned())
    }

    /// A downloaded file has the uploaded contents; a missing file yields `NotFound`.
    #[tokio::test]
    async fn download_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            contents,
            "We should upload and download the same contents"
        );

        let non_existing_path = "somewhere/else";
        match storage.download(&RemotePath::new(Path::new(non_existing_path))?).await {
            Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
            other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
        }
        Ok(())
    }

    /// Two adjacent byte ranges together reproduce the uploaded contents.
    #[tokio::test]
    async fn download_file_range_positive() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "Download full range should return the whole upload"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, second_part_local) = uploaded_bytes.split_at(3);

        let mut first_part_download = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        assert!(
            first_part_download.metadata.is_none(),
            "No metadata should be returned for no metadata upload"
        );

        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut first_part_download.download_stream,
            &mut first_part_remote,
        )
        .await?;
        first_part_remote.flush().await?;
        let first_part_remote = first_part_remote.into_inner().into_inner();
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        let mut second_part_download = storage
            .download_byte_range(
                &upload_target,
                first_part_local.len() as u64,
                Some((first_part_local.len() + second_part_local.len()) as u64),
            )
            .await?;
        assert!(
            second_part_download.metadata.is_none(),
            "No metadata should be returned for no metadata upload"
        );

        let mut second_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut second_part_download.download_stream,
            &mut second_part_remote,
        )
        .await?;
        second_part_remote.flush().await?;
        let second_part_remote = second_part_remote.into_inner().into_inner();
        assert_eq!(
            second_part_local,
            second_part_remote.as_slice(),
            "Second part bytes should be returned when requested"
        );

        Ok(())
    }

    /// Ranges that `download_byte_range` rejects produce errors naming both bounds.
    #[tokio::test]
    async fn download_file_range_negative() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        let start = 1_000_000_000;
        let end = start + 1;
        match storage
            .download_byte_range(
                &upload_target,
                start,
                Some(end), // exclusive end
            )
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("zero bytes"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        let start = 10000;
        let end = 234;
        assert!(start > end, "Should test an incorrect range");
        match storage
            .download_byte_range(&upload_target, start, Some(end))
            .await
        {
            Ok(_) => panic!("Should not allow downloading wrong ranges"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("Invalid range"));
                assert!(error_string.contains(&start.to_string()));
                assert!(error_string.contains(&end.to_string()));
            }
        }

        Ok(())
    }

    /// Deleting removes the file from the listing; deleting it again is an error.
    #[tokio::test]
    async fn delete_file() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let upload_target = upload_dummy_file(&storage, upload_name, None).await?;

        storage.delete(&upload_target).await?;
        assert!(storage.list().await?.is_empty());

        match storage.delete(&upload_target).await {
            Ok(()) => panic!("Should not allow deleting non-existing storage files"),
            Err(e) => {
                let error_string = e.to_string();
                assert!(error_string.contains("does not exist"));
                let expected_path = upload_target.with_base(&storage.storage_root);
                assert!(error_string.contains(expected_path.to_str().unwrap()));
            }
        }
        Ok(())
    }

    /// Metadata given at upload time is returned by both full and ranged downloads.
    #[tokio::test]
    async fn file_with_metadata() -> anyhow::Result<()> {
        let storage = create_storage()?;
        let upload_name = "upload_1";
        let metadata = StorageMetadata(HashMap::from([
            ("one".to_string(), "1".to_string()),
            ("two".to_string(), "2".to_string()),
        ]));
        let upload_target =
            upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;

        let full_range_download_contents =
            read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
        assert_eq!(
            dummy_contents(upload_name),
            full_range_download_contents,
            "We should upload and download the same contents"
        );

        let uploaded_bytes = dummy_contents(upload_name).into_bytes();
        let (first_part_local, _) = uploaded_bytes.split_at(3);

        let mut partial_download_with_metadata = storage
            .download_byte_range(&upload_target, 0, Some(first_part_local.len() as u64))
            .await?;
        let mut first_part_remote = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
        io::copy(
            &mut partial_download_with_metadata.download_stream,
            &mut first_part_remote,
        )
        .await?;
        first_part_remote.flush().await?;
        let first_part_remote = first_part_remote.into_inner().into_inner();
        assert_eq!(
            first_part_local,
            first_part_remote.as_slice(),
            "First part bytes should be returned when requested"
        );

        assert_eq!(
            partial_download_with_metadata.metadata,
            Some(metadata),
            "We should get the same metadata back for partial download"
        );

        Ok(())
    }

    /// Writes a file with `dummy_contents(name)` at `timelines/some_timeline/<name>` under
    /// the storage root, uploads it to the same relative path with the given `metadata`,
    /// and returns that relative path.
    async fn upload_dummy_file(
        storage: &LocalFs,
        name: &str,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<RemotePath> {
        let from_path = storage
            .storage_root
            .join("timelines")
            .join("some_timeline")
            .join(name);
        let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;

        let relative_path = from_path
            .strip_prefix(&storage.storage_root)
            .context("Failed to strip storage root prefix")
            .and_then(RemotePath::new)
            .with_context(|| {
                format!(
                    "Failed to resolve remote part of path {:?} for base {:?}",
                    from_path, storage.storage_root
                )
            })?;

        // `upload` accepts any `impl AsyncRead`, so the reader is passed without boxing.
        storage.upload(file, size, &relative_path, metadata).await?;
        Ok(relative_path)
    }

    /// Creates a new file at `path` (with its parent directories) holding `contents`, and
    /// returns an async buffered reader over it together with its size in bytes.
    async fn create_file_for_upload(
        path: &Path,
        contents: &str,
    ) -> anyhow::Result<(io::BufReader<fs::File>, usize)> {
        std::fs::create_dir_all(path.parent().unwrap())?;
        let mut file_for_writing = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(path)?;
        write!(file_for_writing, "{}", contents)?;
        // Close the write handle before the file is measured and reopened for reading.
        drop(file_for_writing);
        let file_size = path.metadata()?.len() as usize;
        Ok((
            io::BufReader::new(fs::OpenOptions::new().read(true).open(&path).await?),
            file_size,
        ))
    }

    /// Deterministic file contents derived from the upload name.
    fn dummy_contents(name: &str) -> String {
        format!("contents for {name}")
    }

    /// Lists the storage contents, sorted by path so that assertions are order-independent.
    async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
        let mut files = storage.list().await?;
        files.sort_by(|a, b| a.0.cmp(&b.0));
        Ok(files)
    }
}