Rework remote_storage interface (#2993)

Changes:

* Remove the `RemoteObjectId` concept from remote_storage.
Operate directly on `/`-separated names instead.
These names are now represented by the struct `RemotePath`, which was renamed from the struct `RelativePath`.

* Require remote storage to operate on relative paths for its contents, thus simplifying how those paths are derived in the pageserver and safekeeper.

* Make `IndexPart` use `String` instead of `RelativePath` for its entries, since those are just the layer names.
This commit is contained in:
Kirill Bulatov
2022-12-07 23:11:02 +02:00
committed by GitHub
parent ac0c167a85
commit b50e0793cf
14 changed files with 492 additions and 1023 deletions

View File

@@ -10,7 +10,7 @@ mod s3_bucket;
use std::{
collections::HashMap,
fmt::{Debug, Display},
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
ops::Deref,
path::{Path, PathBuf},
@@ -41,44 +41,27 @@ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
#[derive(Clone, PartialEq, Eq)]
pub struct RemoteObjectId(String);
/// Path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail, that allows representing local paths
/// as the remote ones, stripping the local storage prefix away.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemotePath(PathBuf);
impl RemotePath {
pub fn new(relative_path: &Path) -> anyhow::Result<Self> {
anyhow::ensure!(
relative_path.is_relative(),
"Path {relative_path:?} is not relative"
);
Ok(Self(relative_path.to_path_buf()))
}
pub fn with_base(&self, base_path: &Path) -> PathBuf {
base_path.join(&self.0)
}
///
/// A key that refers to an object in remote storage. It works much like a Path,
/// but it's a separate datatype so that you don't accidentally mix local paths
/// and remote keys.
///
impl RemoteObjectId {
// Needed to retrieve last component for RemoteObjectId.
// In other words a file name
/// Turn a/b/c or a/b/c/ into c
pub fn object_name(&self) -> Option<&str> {
// corner case, char::to_string is not const, thats why this is more verbose than it needs to be
// see https://github.com/rust-lang/rust/issues/88674
if self.0.len() == 1 && self.0.chars().next().unwrap() == REMOTE_STORAGE_PREFIX_SEPARATOR {
return None;
}
if self.0.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
self.0.rsplit(REMOTE_STORAGE_PREFIX_SEPARATOR).nth(1)
} else {
self.0
.rsplit_once(REMOTE_STORAGE_PREFIX_SEPARATOR)
.map(|(_, last)| last)
}
}
}
impl Debug for RemoteObjectId {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
Debug::fmt(&self.0, fmt)
}
}
impl Display for RemoteObjectId {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(&self.0, fmt)
self.0.file_name().and_then(|os_str| os_str.to_str())
}
}
@@ -87,49 +70,40 @@ impl Display for RemoteObjectId {
/// providing basic CRUD operations for storage files.
#[async_trait::async_trait]
pub trait RemoteStorage: Send + Sync + 'static {
/// Attempts to derive the storage path out of the local path, if the latter is correct.
fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId>;
/// Gets the download path of the given storage file.
fn local_path(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf>;
/// Lists all items the storage has right now.
async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>>;
async fn list(&self) -> anyhow::Result<Vec<RemotePath>>;
/// Lists all top level subdirectories for a given prefix
/// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
/// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS)
/// so this method doesn't need to.
async fn list_prefixes(
&self,
prefix: Option<&RemoteObjectId>,
) -> anyhow::Result<Vec<RemoteObjectId>>;
async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>>;
/// Streams the local file contents into the remote storage entry.
async fn upload(
&self,
from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
from_size_bytes: usize,
to: &RemoteObjectId,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()>;
/// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError>;
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError>;
/// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
/// Returns the metadata, if any was stored with the file previously.
async fn download_byte_range(
&self,
from: &RemoteObjectId,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError>;
async fn delete(&self, path: &RemoteObjectId) -> anyhow::Result<()>;
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()>;
/// Downcast to LocalFs implementation. For tests.
fn as_local(&self) -> Option<&LocalFs> {
@@ -196,18 +170,17 @@ impl Deref for GenericRemoteStorage {
impl GenericRemoteStorage {
pub fn from_config(
working_directory: PathBuf,
storage_config: &RemoteStorageConfig,
) -> anyhow::Result<GenericRemoteStorage> {
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => {
info!("Using fs root '{}' as a remote storage", root.display());
GenericRemoteStorage::LocalFs(LocalFs::new(root.clone(), working_directory)?)
GenericRemoteStorage::LocalFs(LocalFs::new(root.clone())?)
}
RemoteStorageKind::AwsS3(s3_config) => {
info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'",
s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
GenericRemoteStorage::AwsS3(Arc::new(S3Bucket::new(s3_config, working_directory)?))
GenericRemoteStorage::AwsS3(Arc::new(S3Bucket::new(s3_config)?))
}
})
}
@@ -221,23 +194,12 @@ impl GenericRemoteStorage {
&self,
from: Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static>,
from_size_bytes: usize,
from_path: &Path,
to: &RemotePath,
) -> anyhow::Result<()> {
let target_storage_path = self.remote_object_id(from_path).with_context(|| {
format!(
"Failed to get the storage path for source local path '{}'",
from_path.display()
)
})?;
self.upload(from, from_size_bytes, &target_storage_path, None)
self.upload(from, from_size_bytes, to, None)
.await
.with_context(|| {
format!(
"Failed to upload from '{}' to storage path '{:?}'",
from_path.display(),
target_storage_path
)
format!("Failed to upload data of length {from_size_bytes} to storage path {to:?}")
})
}
@@ -246,24 +208,11 @@ impl GenericRemoteStorage {
pub async fn download_storage_object(
&self,
byte_range: Option<(u64, Option<u64>)>,
to_path: &Path,
from: &RemotePath,
) -> Result<Download, DownloadError> {
let remote_object_path = self
.remote_object_id(to_path)
.with_context(|| {
format!(
"Failed to get the storage path for target local path '{}'",
to_path.display()
)
})
.map_err(DownloadError::BadInput)?;
match byte_range {
Some((start, end)) => {
self.download_byte_range(&remote_object_path, start, end)
.await
}
None => self.download(&remote_object_path).await,
Some((start, end)) => self.download_byte_range(from, start, end).await,
None => self.download(from).await,
}
}
}
@@ -273,23 +222,6 @@ impl GenericRemoteStorage {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StorageMetadata(HashMap<String, String>);
fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> {
if prefix == path {
anyhow::bail!(
"Prefix and the path are equal, cannot strip: '{}'",
prefix.display()
)
} else {
path.strip_prefix(prefix).with_context(|| {
format!(
"Path '{}' is not prefixed with '{}'",
path.display(),
prefix.display(),
)
})
}
}
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteStorageConfig {
@@ -433,21 +365,24 @@ mod tests {
use super::*;
#[test]
fn object_name() {
let k = RemoteObjectId("a/b/c".to_owned());
fn test_object_name() {
let k = RemotePath::new(Path::new("a/b/c")).unwrap();
assert_eq!(k.object_name(), Some("c"));
let k = RemoteObjectId("a/b/c/".to_owned());
let k = RemotePath::new(Path::new("a/b/c/")).unwrap();
assert_eq!(k.object_name(), Some("c"));
let k = RemoteObjectId("a/".to_owned());
let k = RemotePath::new(Path::new("a/")).unwrap();
assert_eq!(k.object_name(), Some("a"));
// XXX is it impossible to have an empty key?
let k = RemoteObjectId("".to_owned());
assert_eq!(k.object_name(), None);
let k = RemoteObjectId("/".to_owned());
let k = RemotePath::new(Path::new("")).unwrap();
assert_eq!(k.object_name(), None);
}
#[test]
fn rempte_path_cannot_be_created_from_absolute_ones() {
let err = RemotePath::new(Path::new("/")).expect_err("Should fail on absolute paths");
assert_eq!(err.to_string(), "Path \"/\" is not relative");
}
}

View File

@@ -5,6 +5,7 @@
//! volume is mounted to the local FS.
use std::{
borrow::Cow,
future::Future,
path::{Path, PathBuf},
pin::Pin,
@@ -18,61 +19,33 @@ use tokio::{
use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
use crate::{Download, DownloadError, RemoteObjectId};
use crate::{Download, DownloadError, RemotePath};
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
use super::{RemoteStorage, StorageMetadata};
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
/// Convert a Path in the remote storage into a RemoteObjectId
fn remote_object_id_from_path(path: &Path) -> anyhow::Result<RemoteObjectId> {
Ok(RemoteObjectId(
path.to_str()
.ok_or_else(|| anyhow::anyhow!("unexpected characters found in path"))?
.to_string(),
))
}
#[derive(Debug, Clone)]
pub struct LocalFs {
working_directory: PathBuf,
storage_root: PathBuf,
}
impl LocalFs {
/// Attempts to create local FS storage, along with its root directory.
pub fn new(root: PathBuf, working_directory: PathBuf) -> anyhow::Result<Self> {
if !root.exists() {
std::fs::create_dir_all(&root).with_context(|| {
format!(
"Failed to create all directories in the given root path '{}'",
root.display(),
)
/// The storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
pub fn new(mut storage_root: PathBuf) -> anyhow::Result<Self> {
if !storage_root.exists() {
std::fs::create_dir_all(&storage_root).with_context(|| {
format!("Failed to create all directories in the given root path {storage_root:?}")
})?;
}
Ok(Self {
working_directory,
storage_root: root,
})
}
///
/// Get the absolute path in the local filesystem to given remote object.
///
/// This is public so that it can be used in tests. Should not be used elsewhere.
///
pub fn resolve_in_storage(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf> {
let path = PathBuf::from(&remote_object_id.0);
if path.is_relative() {
Ok(self.storage_root.join(path))
} else if path.starts_with(&self.storage_root) {
Ok(path)
} else {
bail!(
"Path '{}' does not belong to the current storage",
path.display()
)
if !storage_root.is_absolute() {
storage_root = storage_root.canonicalize().with_context(|| {
format!("Failed to represent path {storage_root:?} as an absolute path")
})?;
}
Ok(Self { storage_root })
}
async fn read_storage_metadata(
@@ -104,45 +77,48 @@ impl LocalFs {
#[async_trait::async_trait]
impl RemoteStorage for LocalFs {
/// Convert a "local" path into a "remote path"
fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId> {
let path = self.storage_root.join(
strip_path_prefix(&self.working_directory, local_path)
.context("local path does not belong to this storage")?,
);
remote_object_id_from_path(&path)
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
Ok(get_all_files(&self.storage_root, true)
.await?
.into_iter()
.map(|path| {
path.strip_prefix(&self.storage_root)
.context("Failed to strip storage root prefix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
)
})
.collect())
}
fn local_path(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf> {
let storage_path = PathBuf::from(&remote_object_id.0);
let relative_path = strip_path_prefix(&self.storage_root, &storage_path)
.context("local path does not belong to this storage")?;
Ok(self.working_directory.join(relative_path))
}
async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>> {
get_all_files(&self.storage_root, true).await
}
async fn list_prefixes(
&self,
prefix: Option<&RemoteObjectId>,
) -> anyhow::Result<Vec<RemoteObjectId>> {
async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
let path = match prefix {
Some(prefix) => Path::new(&prefix.0),
None => &self.storage_root,
Some(prefix) => Cow::Owned(prefix.with_base(&self.storage_root)),
None => Cow::Borrowed(&self.storage_root),
};
get_all_files(path, false).await
Ok(get_all_files(path.as_ref(), false)
.await?
.into_iter()
.map(|path| {
path.strip_prefix(&self.storage_root)
.context("Failed to strip preifix")
.and_then(RemotePath::new)
.expect(
"We list files for storage root, hence should be able to remote the prefix",
)
})
.collect())
}
async fn upload(
&self,
from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
from_size_bytes: usize,
to: &RemoteObjectId,
data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
let target_file_path = self.resolve_in_storage(to)?;
let target_file_path = to.with_base(&self.storage_root);
create_target_directory(&target_file_path).await?;
// We need this dance with sort of durable rename (without fsyncs)
// to prevent partial uploads. This was really hit when pageserver shutdown
@@ -163,8 +139,8 @@ impl RemoteStorage for LocalFs {
})?,
);
let from_size_bytes = from_size_bytes as u64;
let mut buffer_to_read = from.take(from_size_bytes);
let from_size_bytes = data_size_bytes as u64;
let mut buffer_to_read = data.take(from_size_bytes);
let bytes_read = io::copy(&mut buffer_to_read, &mut destination)
.await
@@ -221,27 +197,22 @@ impl RemoteStorage for LocalFs {
Ok(())
}
async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError> {
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
let target_path = from.with_base(&self.storage_root);
if file_exists(&target_path).map_err(DownloadError::BadInput)? {
let source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
.open(&file_path)
.open(&target_path)
.await
.with_context(|| {
format!(
"Failed to open source file '{}' to use in the download",
file_path.display()
)
format!("Failed to open source file {target_path:?} to use in the download")
})
.map_err(DownloadError::Other)?,
);
let metadata = self
.read_storage_metadata(&file_path)
.read_storage_metadata(&target_path)
.await
.map_err(DownloadError::Other)?;
Ok(Download {
@@ -255,7 +226,7 @@ impl RemoteStorage for LocalFs {
async fn download_byte_range(
&self,
from: &RemoteObjectId,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
@@ -267,20 +238,15 @@ impl RemoteStorage for LocalFs {
return Err(DownloadError::Other(anyhow::anyhow!("Invalid range, start ({start_inclusive}) and end_exclusive ({end_exclusive:?}) difference is zero bytes")));
}
}
let file_path = self
.resolve_in_storage(from)
.map_err(DownloadError::BadInput)?;
if file_exists(&file_path).map_err(DownloadError::BadInput)? {
let target_path = from.with_base(&self.storage_root);
if file_exists(&target_path).map_err(DownloadError::BadInput)? {
let mut source = io::BufReader::new(
fs::OpenOptions::new()
.read(true)
.open(&file_path)
.open(&target_path)
.await
.with_context(|| {
format!(
"Failed to open source file '{}' to use in the download",
file_path.display()
)
format!("Failed to open source file {target_path:?} to use in the download")
})
.map_err(DownloadError::Other)?,
);
@@ -290,7 +256,7 @@ impl RemoteStorage for LocalFs {
.context("Failed to seek to the range start in a local storage file")
.map_err(DownloadError::Other)?;
let metadata = self
.read_storage_metadata(&file_path)
.read_storage_metadata(&target_path)
.await
.map_err(DownloadError::Other)?;
@@ -309,15 +275,12 @@ impl RemoteStorage for LocalFs {
}
}
async fn delete(&self, path: &RemoteObjectId) -> anyhow::Result<()> {
let file_path = self.resolve_in_storage(path)?;
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
let file_path = path.with_base(&self.storage_root);
if file_path.exists() && file_path.is_file() {
Ok(fs::remove_file(file_path).await?)
} else {
bail!(
"File '{}' either does not exist or is not a file",
file_path.display()
)
bail!("File {file_path:?} either does not exist or is not a file")
}
}
@@ -333,7 +296,7 @@ fn storage_metadata_path(original_path: &Path) -> PathBuf {
fn get_all_files<'a, P>(
directory_path: P,
recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<RemoteObjectId>>> + Send + Sync + 'a>>
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
where
P: AsRef<Path> + Send + Sync + 'a,
{
@@ -347,20 +310,20 @@ where
let file_type = dir_entry.file_type().await?;
let entry_path = dir_entry.path();
if file_type.is_symlink() {
debug!("{:?} us a symlink, skipping", entry_path)
debug!("{entry_path:?} us a symlink, skipping")
} else if file_type.is_dir() {
if recursive {
paths.extend(get_all_files(&entry_path, true).await?.into_iter())
} else {
paths.push(remote_object_id_from_path(&dir_entry.path())?)
paths.push(entry_path)
}
} else {
paths.push(remote_object_id_from_path(&dir_entry.path())?);
paths.push(entry_path);
}
}
Ok(paths)
} else {
bail!("Path '{}' is not a directory", directory_path.display())
bail!("Path {directory_path:?} is not a directory")
}
} else {
Ok(Vec::new())
@@ -395,173 +358,6 @@ fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
}
}
#[cfg(test)]
mod pure_tests {
use tempfile::tempdir;
use super::*;
#[test]
fn storage_path_positive() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage_root = PathBuf::from("somewhere").join("else");
let storage = LocalFs {
working_directory: workdir.clone(),
storage_root: storage_root.clone(),
};
let local_path = workdir
.join("timelines")
.join("some_timeline")
.join("file_name");
let expected_path = storage_root.join(local_path.strip_prefix(&workdir)?);
let actual_path = PathBuf::from(
storage
.remote_object_id(&local_path)
.expect("Matching path should map to storage path normally")
.0,
);
assert_eq!(
expected_path,
actual_path,
"File paths from workdir should be stored in local fs storage with the same path they have relative to the workdir"
);
Ok(())
}
#[test]
fn storage_path_negatives() -> anyhow::Result<()> {
#[track_caller]
fn storage_path_error(storage: &LocalFs, mismatching_path: &Path) -> String {
match storage.remote_object_id(mismatching_path) {
Ok(wrong_path) => panic!(
"Expected path '{}' to error, but got storage path: {:?}",
mismatching_path.display(),
wrong_path,
),
Err(e) => format!("{:?}", e),
}
}
let workdir = tempdir()?.path().to_owned();
let storage_root = PathBuf::from("somewhere").join("else");
let storage = LocalFs {
working_directory: workdir.clone(),
storage_root,
};
let error_string = storage_path_error(&storage, &workdir);
assert!(error_string.contains("does not belong to this storage"));
assert!(error_string.contains(workdir.to_str().unwrap()));
let mismatching_path_str = "/something/else";
let error_message = storage_path_error(&storage, Path::new(mismatching_path_str));
assert!(
error_message.contains(mismatching_path_str),
"Error should mention wrong path"
);
assert!(
error_message.contains(workdir.to_str().unwrap()),
"Error should mention server workdir"
);
assert!(error_message.contains("does not belong to this storage"));
Ok(())
}
#[test]
fn local_path_positive() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage_root = PathBuf::from("somewhere").join("else");
let storage = LocalFs {
working_directory: workdir.clone(),
storage_root: storage_root.clone(),
};
let name = "not a metadata";
let local_path = workdir.join("timelines").join("some_timeline").join(name);
assert_eq!(
local_path,
storage
.local_path(&remote_object_id_from_path(
&storage_root.join(local_path.strip_prefix(&workdir)?)
)?)
.expect("For a valid input, valid local path should be parsed"),
"Should be able to parse metadata out of the correctly named remote delta file"
);
let local_metadata_path = workdir
.join("timelines")
.join("some_timeline")
.join("metadata");
let remote_metadata_path = storage.remote_object_id(&local_metadata_path)?;
assert_eq!(
local_metadata_path,
storage
.local_path(&remote_metadata_path)
.expect("For a valid input, valid local path should be parsed"),
"Should be able to parse metadata out of the correctly named remote metadata file"
);
Ok(())
}
#[test]
fn local_path_negatives() -> anyhow::Result<()> {
#[track_caller]
fn local_path_error(storage: &LocalFs, storage_path: &RemoteObjectId) -> String {
match storage.local_path(storage_path) {
Ok(wrong_path) => panic!(
"Expected local path input {:?} to cause an error, but got file path: {:?}",
storage_path, wrong_path,
),
Err(e) => format!("{:?}", e),
}
}
let storage_root = PathBuf::from("somewhere").join("else");
let storage = LocalFs {
working_directory: tempdir()?.path().to_owned(),
storage_root,
};
let totally_wrong_path = "wrong_wrong_wrong";
let error_message =
local_path_error(&storage, &RemoteObjectId(totally_wrong_path.to_string()));
assert!(error_message.contains(totally_wrong_path));
Ok(())
}
#[test]
fn download_destination_matches_original_path() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let original_path = workdir
.join("timelines")
.join("some_timeline")
.join("some name");
let storage_root = PathBuf::from("somewhere").join("else");
let dummy_storage = LocalFs {
working_directory: workdir,
storage_root,
};
let storage_path = dummy_storage.remote_object_id(&original_path)?;
let download_destination = dummy_storage.local_path(&storage_path)?;
assert_eq!(
original_path, download_destination,
"'original path -> storage path -> matching fs path' transformation should produce the same path as the input one for the correct path"
);
Ok(())
}
}
#[cfg(test)]
mod fs_tests {
use super::*;
@@ -573,7 +369,7 @@ mod fs_tests {
storage: &LocalFs,
#[allow(clippy::ptr_arg)]
// have to use &PathBuf due to `storage.local_path` parameter requirements
remote_storage_path: &RemoteObjectId,
remote_storage_path: &RemotePath,
expected_metadata: Option<&StorageMetadata>,
) -> anyhow::Result<String> {
let mut download = storage
@@ -596,41 +392,16 @@ mod fs_tests {
#[tokio::test]
async fn upload_file() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage = create_storage()?;
let (file, size) = create_file_for_upload(
&storage.working_directory.join("whatever"),
"whatever_contents",
)
.await?;
let target_path = "/somewhere/else";
match storage
.upload(
Box::new(file),
size,
&RemoteObjectId(target_path.to_string()),
None,
)
.await
{
Ok(()) => panic!("Should not allow storing files with wrong target path"),
Err(e) => {
let message = format!("{:?}", e);
assert!(message.contains(target_path));
assert!(message.contains("does not belong to the current storage"));
}
}
assert!(storage.list().await?.is_empty());
let target_path_1 = upload_dummy_file(&workdir, &storage, "upload_1", None).await?;
let target_path_1 = upload_dummy_file(&storage, "upload_1", None).await?;
assert_eq!(
storage.list().await?,
vec![target_path_1.clone()],
"Should list a single file after first upload"
);
let target_path_2 = upload_dummy_file(&workdir, &storage, "upload_2", None).await?;
let target_path_2 = upload_dummy_file(&storage, "upload_2", None).await?;
assert_eq!(
list_files_sorted(&storage).await?,
vec![target_path_1.clone(), target_path_2.clone()],
@@ -644,7 +415,7 @@ mod fs_tests {
async fn upload_file_negatives() -> anyhow::Result<()> {
let storage = create_storage()?;
let id = storage.remote_object_id(&storage.working_directory.join("dummy"))?;
let id = RemotePath::new(Path::new("dummy"))?;
let content = std::io::Cursor::new(b"12345");
// Check that you get an error if the size parameter doesn't match the actual
@@ -669,16 +440,14 @@ mod fs_tests {
}
fn create_storage() -> anyhow::Result<LocalFs> {
LocalFs::new(tempdir()?.path().to_owned(), tempdir()?.path().to_owned())
LocalFs::new(tempdir()?.path().to_owned())
}
#[tokio::test]
async fn download_file() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
let contents = read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
assert_eq!(
@@ -688,7 +457,7 @@ mod fs_tests {
);
let non_existing_path = "somewhere/else";
match storage.download(&RemoteObjectId(non_existing_path.to_string())).await {
match storage.download(&RemotePath::new(Path::new(non_existing_path))?).await {
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
}
@@ -697,11 +466,9 @@ mod fs_tests {
#[tokio::test]
async fn download_file_range_positive() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, None).await?;
@@ -767,11 +534,9 @@ mod fs_tests {
#[tokio::test]
async fn download_file_range_negative() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
let start = 1_000_000_000;
let end = start + 1;
@@ -813,11 +578,9 @@ mod fs_tests {
#[tokio::test]
async fn delete_file() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&workdir, &storage, upload_name, None).await?;
let upload_target = upload_dummy_file(&storage, upload_name, None).await?;
storage.delete(&upload_target).await?;
assert!(storage.list().await?.is_empty());
@@ -827,7 +590,8 @@ mod fs_tests {
Err(e) => {
let error_string = e.to_string();
assert!(error_string.contains("does not exist"));
assert!(error_string.contains(&upload_target.0));
let expected_path = upload_target.with_base(&storage.storage_root);
assert!(error_string.contains(expected_path.to_str().unwrap()));
}
}
Ok(())
@@ -835,8 +599,6 @@ mod fs_tests {
#[tokio::test]
async fn file_with_metadata() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage = create_storage()?;
let upload_name = "upload_1";
let metadata = StorageMetadata(HashMap::from([
@@ -844,7 +606,7 @@ mod fs_tests {
("two".to_string(), "2".to_string()),
]));
let upload_target =
upload_dummy_file(&workdir, &storage, upload_name, Some(metadata.clone())).await?;
upload_dummy_file(&storage, upload_name, Some(metadata.clone())).await?;
let full_range_download_contents =
read_and_assert_remote_file_contents(&storage, &upload_target, Some(&metadata)).await?;
@@ -884,23 +646,32 @@ mod fs_tests {
}
async fn upload_dummy_file(
workdir: &Path,
storage: &LocalFs,
name: &str,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<RemoteObjectId> {
let timeline_path = workdir.join("timelines").join("some_timeline");
let relative_timeline_path = timeline_path.strip_prefix(&workdir)?;
let storage_path = storage.storage_root.join(relative_timeline_path).join(name);
let remote_object_id = RemoteObjectId(storage_path.to_str().unwrap().to_string());
let from_path = storage.working_directory.join(name);
) -> anyhow::Result<RemotePath> {
let from_path = storage
.storage_root
.join("timelines")
.join("some_timeline")
.join(name);
let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
let relative_path = from_path
.strip_prefix(&storage.storage_root)
.context("Failed to strip storage root prefix")
.and_then(RemotePath::new)
.with_context(|| {
format!(
"Failed to resolve remote part of path {:?} for base {:?}",
from_path, storage.storage_root
)
})?;
storage
.upload(Box::new(file), size, &remote_object_id, metadata)
.upload(Box::new(file), size, &relative_path, metadata)
.await?;
remote_object_id_from_path(&storage_path)
Ok(relative_path)
}
async fn create_file_for_upload(
@@ -925,7 +696,7 @@ mod fs_tests {
format!("contents for {name}")
}
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemoteObjectId>> {
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemotePath>> {
let mut files = storage.list().await?;
files.sort_by(|a, b| a.0.cmp(&b.0));
Ok(files)

View File

@@ -5,7 +5,6 @@
//! their bucket prefixes are both specified and different.
use std::env::var;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
@@ -29,8 +28,7 @@ use tracing::debug;
use super::StorageMetadata;
use crate::{
strip_path_prefix, Download, DownloadError, RemoteObjectId, RemoteStorage, S3Config,
REMOTE_STORAGE_PREFIX_SEPARATOR,
Download, DownloadError, RemotePath, RemoteStorage, S3Config, REMOTE_STORAGE_PREFIX_SEPARATOR,
};
const DEFAULT_IMDS_TIMEOUT: Duration = Duration::from_secs(10);
@@ -100,31 +98,8 @@ pub(super) mod metrics {
}
}
fn download_destination(
id: &RemoteObjectId,
workdir: &Path,
prefix_to_strip: Option<&str>,
) -> PathBuf {
let path_without_prefix = match prefix_to_strip {
Some(prefix) => id.0.strip_prefix(prefix).unwrap_or_else(|| {
panic!(
"Could not strip prefix '{}' from S3 object key '{}'",
prefix, id.0
)
}),
None => &id.0,
};
workdir.join(
path_without_prefix
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
.collect::<PathBuf>(),
)
}
/// AWS S3 storage.
pub struct S3Bucket {
workdir: PathBuf,
client: Client,
bucket_name: String,
prefix_in_bucket: Option<String>,
@@ -142,7 +117,7 @@ struct GetObjectRequest {
}
impl S3Bucket {
/// Creates the S3 storage, errors if incorrect AWS S3 configuration provided.
pub fn new(aws_config: &S3Config, workdir: PathBuf) -> anyhow::Result<Self> {
pub fn new(aws_config: &S3Config) -> anyhow::Result<Self> {
debug!(
"Creating s3 remote storage for S3 bucket {}",
aws_config.bucket_name
@@ -196,13 +171,39 @@ impl S3Bucket {
});
Ok(Self {
client,
workdir,
bucket_name: aws_config.bucket_name.clone(),
prefix_in_bucket,
concurrency_limiter: Semaphore::new(aws_config.concurrency_limit.get()),
})
}
/// Converts a full S3 object key into a `RemotePath`.
///
/// The configured `prefix_in_bucket` is stripped from the start of `key` (an empty
/// prefix is used when none is configured), and the remainder is rebuilt as a path
/// from its `/`-separated segments.
///
/// # Panics
///
/// Panics if `key` does not start with the bucket prefix.
fn s3_object_to_relative_path(&self, key: &str) -> RemotePath {
let relative_path =
match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) {
Some(stripped) => stripped,
// we rely on AWS to return properly prefixed paths
// for requests with a certain prefix
None => panic!(
"Key {} does not start with bucket prefix {:?}",
key, self.prefix_in_bucket
),
};
// The tuple struct is built directly, so the relative-path check performed by
// `RemotePath::new` is not run here.
RemotePath(
relative_path
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
.collect(),
)
}
/// Builds the full S3 object key for the given relative path.
///
/// Starts from `prefix_in_bucket` (an empty string, if none is set) and appends every
/// path segment, each preceded by [`REMOTE_STORAGE_PREFIX_SEPARATOR`].
/// A segment that is not valid UTF-8 contributes an empty string after its separator.
fn relative_path_to_s3_object(&self, path: &RemotePath) -> String {
    let bucket_prefix = self.prefix_in_bucket.clone().unwrap_or_default();
    path.0.iter().fold(bucket_prefix, |mut object_key, segment| {
        object_key.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
        object_key.push_str(segment.to_str().unwrap_or_default());
        object_key
    })
}
async fn download_object(&self, request: GetObjectRequest) -> Result<Download, DownloadError> {
let _guard = self
.concurrency_limiter
@@ -252,25 +253,7 @@ impl S3Bucket {
#[async_trait::async_trait]
impl RemoteStorage for S3Bucket {
/// Maps a local path inside the workdir onto its S3 object key.
///
/// The workdir prefix is stripped from `local_path` (errors if that fails), then every
/// remaining segment is appended to `prefix_in_bucket`, each one preceded by
/// [`REMOTE_STORAGE_PREFIX_SEPARATOR`]. Segments are converted to strings lossily.
fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId> {
    let path_in_workdir = strip_path_prefix(&self.workdir, local_path)?;
    let bucket_prefix = self.prefix_in_bucket.clone().unwrap_or_default();
    let object_key = path_in_workdir
        .into_iter()
        .fold(bucket_prefix, |mut object_key, segment| {
            object_key.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
            object_key.push_str(&segment.to_string_lossy());
            object_key
        });
    Ok(RemoteObjectId(object_key))
}
/// Maps an S3 object key back onto a local path inside the workdir.
///
/// Delegates to `download_destination`, stripping `prefix_in_bucket` from the key when
/// it is set. Always returns `Ok`; the `Result` is part of the trait signature.
fn local_path(&self, storage_path: &RemoteObjectId) -> anyhow::Result<PathBuf> {
    let prefix_to_strip = self.prefix_in_bucket.as_deref();
    let destination = download_destination(storage_path, &self.workdir, prefix_to_strip);
    Ok(destination)
}
async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>> {
async fn list(&self) -> anyhow::Result<Vec<RemotePath>> {
let mut document_keys = Vec::new();
let mut continuation_token = None;
@@ -300,7 +283,7 @@ impl RemoteStorage for S3Bucket {
.contents
.unwrap_or_default()
.into_iter()
.filter_map(|o| Some(RemoteObjectId(o.key?))),
.filter_map(|o| Some(self.s3_object_to_relative_path(o.key()?))),
);
match fetch_response.continuation_token {
@@ -314,13 +297,10 @@ impl RemoteStorage for S3Bucket {
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it won't include empty "directories"
async fn list_prefixes(
&self,
prefix: Option<&RemoteObjectId>,
) -> anyhow::Result<Vec<RemoteObjectId>> {
async fn list_prefixes(&self, prefix: Option<&RemotePath>) -> anyhow::Result<Vec<RemotePath>> {
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| p.0.clone())
.map(|p| self.relative_path_to_s3_object(p))
.or_else(|| self.prefix_in_bucket.clone())
.map(|mut p| {
// required to end with a separator
@@ -362,7 +342,7 @@ impl RemoteStorage for S3Bucket {
.common_prefixes
.unwrap_or_default()
.into_iter()
.filter_map(|o| Some(RemoteObjectId(o.prefix?))),
.filter_map(|o| Some(self.s3_object_to_relative_path(o.prefix()?))),
);
match fetch_response.continuation_token {
@@ -378,7 +358,7 @@ impl RemoteStorage for S3Bucket {
&self,
from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
from_size_bytes: usize,
to: &RemoteObjectId,
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
let _guard = self
@@ -395,7 +375,7 @@ impl RemoteStorage for S3Bucket {
self.client
.put_object()
.bucket(self.bucket_name.clone())
.key(to.0.to_owned())
.key(self.relative_path_to_s3_object(to))
.set_metadata(metadata.map(|m| m.0))
.content_length(from_size_bytes.try_into()?)
.body(bytes_stream)
@@ -408,10 +388,10 @@ impl RemoteStorage for S3Bucket {
Ok(())
}
async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError> {
async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.0.to_owned(),
key: self.relative_path_to_s3_object(from),
..GetObjectRequest::default()
})
.await
@@ -419,7 +399,7 @@ impl RemoteStorage for S3Bucket {
async fn download_byte_range(
&self,
from: &RemoteObjectId,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
@@ -427,19 +407,19 @@ impl RemoteStorage for S3Bucket {
// and needs both ends to be exclusive
let end_inclusive = end_exclusive.map(|end| end.saturating_sub(1));
let range = Some(match end_inclusive {
Some(end_inclusive) => format!("bytes={}-{}", start_inclusive, end_inclusive),
None => format!("bytes={}-", start_inclusive),
Some(end_inclusive) => format!("bytes={start_inclusive}-{end_inclusive}"),
None => format!("bytes={start_inclusive}-"),
});
self.download_object(GetObjectRequest {
bucket: self.bucket_name.clone(),
key: from.0.to_owned(),
key: self.relative_path_to_s3_object(from),
range,
})
.await
}
async fn delete(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<()> {
async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
let _guard = self
.concurrency_limiter
.acquire()
@@ -451,7 +431,7 @@ impl RemoteStorage for S3Bucket {
self.client
.delete_object()
.bucket(self.bucket_name.clone())
.key(remote_object_id.0.to_owned())
.key(self.relative_path_to_s3_object(path))
.send()
.await
.map_err(|e| {
@@ -461,181 +441,3 @@ impl RemoteStorage for S3Bucket {
Ok(())
}
}
// Unit tests for the conversions between local paths and S3 object keys.
// They only build keys and paths; no storage request is issued by any of them.
#[cfg(test)]
mod tests {
use tempfile::tempdir;
use super::*;
// A key without a bucket prefix should be mapped onto `workdir` joined with the key segments.
#[test]
fn test_download_destination() -> anyhow::Result<()> {
// Only the path value is kept: the `tempdir()` guard is a temporary that is dropped
// at the end of the statement.
let workdir = tempdir()?.path().to_owned();
let local_path = workdir.join("one").join("two").join("test_name");
let relative_path = local_path.strip_prefix(&workdir)?;
// The key is the separator followed by the separator-joined relative path segments.
let key = RemoteObjectId(format!(
"{}{}",
REMOTE_STORAGE_PREFIX_SEPARATOR,
relative_path
.iter()
.map(|segment| segment.to_str().unwrap())
.collect::<Vec<_>>()
.join(&REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()),
));
assert_eq!(
local_path,
download_destination(&key, &workdir, None),
"Download destination should consist of s3 path joined with the workdir prefix"
);
Ok(())
}
// A path inside the workdir should turn into `<prefix>/<segment_1>/<segment_2>`.
#[test]
fn storage_path_positive() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let segment_1 = "matching";
let segment_2 = "file";
let local_path = &workdir.join(segment_1).join(segment_2);
let storage = dummy_storage(workdir);
let expected_key = RemoteObjectId(format!(
"{}{REMOTE_STORAGE_PREFIX_SEPARATOR}{segment_1}{REMOTE_STORAGE_PREFIX_SEPARATOR}{segment_2}",
storage.prefix_in_bucket.as_deref().unwrap_or_default(),
));
let actual_key = storage
.remote_object_id(local_path)
.expect("Matching path should map to S3 path normally");
assert_eq!(
expected_key,
actual_key,
"S3 key from the matching path should contain all segments after the workspace prefix, separated with S3 separator"
);
Ok(())
}
// Paths that cannot be made relative to the workdir should produce descriptive errors.
#[test]
fn storage_path_negatives() -> anyhow::Result<()> {
// Returns the error message for a path that is expected to be rejected,
// panics if the path gets converted into a key instead.
#[track_caller]
fn storage_path_error(storage: &S3Bucket, mismatching_path: &Path) -> String {
match storage.remote_object_id(mismatching_path) {
Ok(wrong_key) => panic!(
"Expected path '{}' to error, but got S3 key: {:?}",
mismatching_path.display(),
wrong_key,
),
Err(e) => e.to_string(),
}
}
let workdir = tempdir()?.path().to_owned();
let storage = dummy_storage(workdir.clone());
// Case 1: the workdir itself is passed, so nothing is left after stripping the prefix.
let error_message = storage_path_error(&storage, &workdir);
assert!(
error_message.contains("Prefix and the path are equal"),
"Message '{}' does not contain the required string",
error_message
);
// Case 2: a path that is not located under the workdir at all.
let mismatching_path = PathBuf::from("somewhere").join("else");
let error_message = storage_path_error(&storage, &mismatching_path);
assert!(
error_message.contains(mismatching_path.to_str().unwrap()),
"Error should mention wrong path"
);
assert!(
error_message.contains(workdir.to_str().unwrap()),
"Error should mention server workdir"
);
assert!(
error_message.contains("is not prefixed with"),
"Message '{}' does not contain a required string",
error_message
);
Ok(())
}
// `S3Bucket::local_path` should agree with `download_destination` called with the bucket prefix.
#[test]
fn local_path_positive() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let storage = dummy_storage(workdir.clone());
let timeline_dir = workdir.join("timelines").join("test_timeline");
let relative_timeline_path = timeline_dir.strip_prefix(&workdir)?;
// First key: a file that is not named "metadata".
let s3_key = create_s3_key(
&relative_timeline_path.join("not a metadata"),
storage.prefix_in_bucket.as_deref(),
);
assert_eq!(
download_destination(&s3_key, &workdir, storage.prefix_in_bucket.as_deref()),
storage
.local_path(&s3_key)
.expect("For a valid input, valid S3 info should be parsed"),
"Should be able to parse metadata out of the correctly named remote delta file"
);
// Second key: a file named "metadata".
let s3_key = create_s3_key(
&relative_timeline_path.join("metadata"),
storage.prefix_in_bucket.as_deref(),
);
assert_eq!(
download_destination(&s3_key, &workdir, storage.prefix_in_bucket.as_deref()),
storage
.local_path(&s3_key)
.expect("For a valid input, valid S3 info should be parsed"),
"Should be able to parse metadata out of the correctly named remote metadata file"
);
Ok(())
}
// Round trip: local path -> S3 key -> local path should return the original path.
#[test]
fn download_destination_matches_original_path() -> anyhow::Result<()> {
let workdir = tempdir()?.path().to_owned();
let original_path = workdir
.join("timelines")
.join("some_timeline")
.join("some name");
let dummy_storage = dummy_storage(workdir);
let key = dummy_storage.remote_object_id(&original_path)?;
let download_destination = dummy_storage.local_path(&key)?;
assert_eq!(
original_path, download_destination,
"'original path -> storage key -> matching fs path' transformation should produce the same path as the input one for the correct path"
);
Ok(())
}
// Creates an `S3Bucket` over the given workdir with a fixed bucket name, the "dummy_prefix/"
// bucket prefix and a concurrency limit of 1.
fn dummy_storage(workdir: PathBuf) -> S3Bucket {
S3Bucket {
workdir,
client: Client::new(&aws_config::SdkConfig::builder().build()),
bucket_name: "dummy-bucket".to_string(),
prefix_in_bucket: Some("dummy_prefix/".to_string()),
concurrency_limiter: Semaphore::new(1),
}
}
// Builds an S3 key out of the optional prefix followed by every segment of the relative path,
// each segment preceded by the separator. Panics on segments that are not valid UTF-8.
fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> RemoteObjectId {
RemoteObjectId(relative_file_path.iter().fold(
prefix.unwrap_or_default().to_string(),
|mut path_string, segment| {
path_string.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
path_string.push_str(segment.to_str().unwrap());
path_string
},
))
}
}

View File

@@ -280,9 +280,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> anyhow::Result<()> {
let remote_storage = conf
.remote_storage_config
.as_ref()
.map(|storage_config| {
GenericRemoteStorage::from_config(conf.workdir.clone(), storage_config)
})
.map(GenericRemoteStorage::from_config)
.transpose()
.context("Failed to init generic remote storage")?;

View File

@@ -5,7 +5,7 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
use remote_storage::RemoteStorageConfig;
use remote_storage::{RemotePath, RemoteStorageConfig};
use std::env;
use utils::crashsafe::path_with_suffix_extension;
use utils::id::ConnectionId;
@@ -454,6 +454,28 @@ impl PageServerConf {
.join(METADATA_FILE_NAME)
}
/// Resolves the remote storage path for a local file.
///
/// Files on the remote storage are stored with paths relative to pageserver's workdir.
/// Such a path contains both the tenant and the timeline ids, which makes it unique
/// on the remote storage.
///
/// Errors if `local_path` does not start with pageserver's workdir, or if the stripped
/// path is rejected by [`RemotePath::new`].
pub fn remote_path(&self, local_path: &Path) -> anyhow::Result<RemotePath> {
    let resolve = || -> anyhow::Result<RemotePath> {
        let path_in_workdir = local_path
            .strip_prefix(&self.workdir)
            .context("Failed to strip workdir prefix")?;
        RemotePath::new(path_in_workdir)
    };

    // Both failure kinds get the same outer context that mentions the path and the base.
    resolve().with_context(|| {
        format!(
            "Failed to resolve remote part of path {:?} for base {:?}",
            local_path, self.workdir
        )
    })
}
/// Turns storage remote path of a file into its local path.
pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf {
remote_path.with_base(&self.workdir)
}
//
// Postgres distribution paths
//

View File

@@ -202,7 +202,7 @@ use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use anyhow::ensure;
use remote_storage::{DownloadError, GenericRemoteStorage};
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use tokio::runtime::Runtime;
use tracing::{info, warn};
use tracing::{info_span, Instrument};
@@ -217,7 +217,7 @@ use crate::metrics::RemoteOpKind;
use crate::metrics::REMOTE_UPLOAD_QUEUE_UNFINISHED_TASKS;
use crate::{
config::PageServerConf,
storage_sync::index::{LayerFileMetadata, RemotePath},
storage_sync::index::LayerFileMetadata,
task_mgr,
task_mgr::TaskKind,
task_mgr::BACKGROUND_RUNTIME,
@@ -337,18 +337,18 @@ impl UploadQueue {
let state = UploadQueueInitialized {
// As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
latest_files: Default::default(),
latest_files: HashMap::new(),
latest_metadata: metadata.clone(),
// We haven't uploaded anything yet, so, `last_uploaded_consistent_lsn` must be 0 to prevent
// safekeepers from garbage-collecting anything.
last_uploaded_consistent_lsn: Lsn(0),
// what follows are boring default initializations
task_counter: Default::default(),
task_counter: 0,
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: Default::default(),
queued_operations: Default::default(),
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
};
*self = UploadQueue::Initialized(state);
@@ -357,6 +357,10 @@ impl UploadQueue {
fn initialize_with_current_remote_index_part(
&mut self,
conf: &'static PageServerConf,
tenant_id: TenantId,
timeline_id: TimelineId,
index_part: &IndexPart,
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
@@ -366,14 +370,19 @@ impl UploadQueue {
}
}
let mut files = HashMap::new();
for path in &index_part.timeline_layers {
let mut files = HashMap::with_capacity(index_part.timeline_layers.len());
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
for timeline_name in &index_part.timeline_layers {
let local_path = timeline_path.join(timeline_name);
let remote_timeline_path = conf.remote_path(&local_path).expect(
"Remote timeline path and local timeline path were constructed form the same conf",
);
let layer_metadata = index_part
.layer_metadata
.get(path)
.get(timeline_name)
.map(LayerFileMetadata::from)
.unwrap_or(LayerFileMetadata::MISSING);
files.insert(path.clone(), layer_metadata);
files.insert(remote_timeline_path, layer_metadata);
}
let index_part_metadata = index_part.parse_metadata()?;
@@ -391,8 +400,8 @@ impl UploadQueue {
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: Default::default(),
queued_operations: Default::default(),
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
};
*self = UploadQueue::Initialized(state);
@@ -456,7 +465,12 @@ impl RemoteTimelineClient {
/// The given `index_part` must be the one on the remote.
pub fn init_upload_queue(&self, index_part: &IndexPart) -> anyhow::Result<()> {
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
upload_queue.initialize_with_current_remote_index_part(
self.conf,
self.tenant_id,
self.timeline_id,
index_part,
)?;
Ok(())
}
@@ -510,15 +524,13 @@ impl RemoteTimelineClient {
/// On success, returns the size of the downloaded file.
pub async fn download_layer_file(
&self,
path: &RemotePath,
remote_path: &RemotePath,
layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<u64> {
let downloaded_size = download::download_layer_file(
self.conf,
&self.storage_impl,
self.tenant_id,
self.timeline_id,
path,
remote_path,
layer_metadata,
)
.measure_remote_op(
@@ -536,13 +548,13 @@ impl RemoteTimelineClient {
let new_metadata = LayerFileMetadata::new(downloaded_size);
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
if let Some(upgraded) = upload_queue.latest_files.get_mut(path) {
if let Some(upgraded) = upload_queue.latest_files.get_mut(remote_path) {
upgraded.merge(&new_metadata);
} else {
// The file should exist, since we just downloaded it.
warn!(
"downloaded file {:?} not found in local copy of the index file",
path
remote_path
);
}
}
@@ -612,14 +624,9 @@ impl RemoteTimelineClient {
"file size not initialized in metadata"
);
let relative_path = RemotePath::strip_base_path(
&self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
path,
)?;
upload_queue
.latest_files
.insert(relative_path, layer_metadata.clone());
.insert(self.conf.remote_path(path)?, layer_metadata.clone());
let op = UploadOp::UploadLayer(PathBuf::from(path), layer_metadata.clone());
self.update_upload_queue_unfinished_metric(1, &op);
@@ -641,13 +648,10 @@ impl RemoteTimelineClient {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
// Convert the paths into RelativePaths, and gather other information we need.
let mut relative_paths = Vec::with_capacity(paths.len());
// Convert the paths into RemotePaths, and gather other information we need.
let mut remote_paths = Vec::with_capacity(paths.len());
for path in paths {
relative_paths.push(RemotePath::strip_base_path(
&self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
path,
)?);
remote_paths.push(self.conf.remote_path(path)?);
}
// Deleting layers doesn't affect the values stored in TimelineMetadata,
@@ -663,8 +667,8 @@ impl RemoteTimelineClient {
// from latest_files, but not yet scheduled for deletion. Use a closure
// to syntactically forbid ? or bail! calls here.
let no_bail_here = || {
for relative_path in relative_paths {
upload_queue.latest_files.remove(&relative_path);
for remote_path in remote_paths {
upload_queue.latest_files.remove(&remote_path);
}
let index_part = IndexPart::new(
@@ -838,14 +842,19 @@ impl RemoteTimelineClient {
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref path, ref layer_metadata) => {
upload::upload_timeline_layer(&self.storage_impl, path, layer_metadata)
.measure_remote_op(
self.tenant_id,
self.timeline_id,
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
)
.await
upload::upload_timeline_layer(
self.conf,
&self.storage_impl,
path,
layer_metadata,
)
.measure_remote_op(
self.tenant_id,
self.timeline_id,
RemoteOpFileKind::Layer,
RemoteOpKind::Upload,
)
.await
}
UploadOp::UploadMetadata(ref index_part, _lsn) => {
upload::upload_index_part(
@@ -864,7 +873,7 @@ impl RemoteTimelineClient {
.await
}
UploadOp::Delete(metric_file_kind, ref path) => {
delete::delete_layer(&self.storage_impl, path)
delete::delete_layer(self.conf, &self.storage_impl, path)
.measure_remote_op(
self.tenant_id,
self.timeline_id,
@@ -1093,15 +1102,11 @@ mod tests {
TimelineMetadata::from_bytes(&metadata.to_bytes().unwrap()).unwrap()
}
fn assert_file_list(a: &HashSet<RemotePath>, b: &[&str]) {
let xx = PathBuf::from("");
let mut avec: Vec<String> = a
.iter()
.map(|x| x.to_local_path(&xx).to_string_lossy().into())
.collect();
fn assert_file_list(a: &HashSet<String>, b: &[&str]) {
let mut avec: Vec<&str> = a.iter().map(|a| a.as_str()).collect();
avec.sort();
let mut bvec = b.to_owned();
let mut bvec = b.to_vec();
bvec.sort_unstable();
assert_eq!(avec, bvec);
@@ -1169,8 +1174,7 @@ mod tests {
println!("workdir: {}", harness.conf.workdir.display());
let storage_impl =
GenericRemoteStorage::from_config(harness.conf.workdir.clone(), &storage_config)?;
let storage_impl = GenericRemoteStorage::from_config(&storage_config)?;
let client = Arc::new(RemoteTimelineClient {
conf: harness.conf,
runtime,

View File

@@ -5,34 +5,24 @@ use tracing::debug;
use remote_storage::GenericRemoteStorage;
pub(super) async fn delete_layer(
storage: &GenericRemoteStorage,
local_layer_path: &Path,
use crate::config::PageServerConf;
pub(super) async fn delete_layer<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
local_layer_path: &'a Path,
) -> anyhow::Result<()> {
fail::fail_point!("before-delete-layer", |_| {
anyhow::bail!("failpoint before-delete-layer")
});
debug!(
"Deleting layer from remote storage: {:?}",
local_layer_path.display()
);
debug!("Deleting layer from remote storage: {local_layer_path:?}",);
let storage_path = storage
.remote_object_id(local_layer_path)
.with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
local_layer_path.display()
)
})?;
let path_to_delete = conf.remote_path(local_layer_path)?;
// XXX: If the deletion fails because the object already didn't exist,
// it would be good to just issue a warning but consider it success.
// https://github.com/neondatabase/neon/issues/2934
storage.delete(&storage_path).await.with_context(|| {
format!(
"Failed to delete remote layer from storage at '{:?}'",
storage_path
)
storage.delete(&path_to_delete).await.with_context(|| {
format!("Failed to delete remote layer from storage at {path_to_delete:?}")
})
}

View File

@@ -10,12 +10,11 @@ use tracing::debug;
use crate::config::PageServerConf;
use crate::storage_sync::index::LayerFileMetadata;
use remote_storage::{DownloadError, GenericRemoteStorage};
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use utils::crashsafe::path_with_suffix_extension;
use utils::id::{TenantId, TimelineId};
use super::index::IndexPart;
use super::RemotePath;
async fn fsync_path(path: impl AsRef<std::path::Path>) -> Result<(), std::io::Error> {
fs::File::open(path).await?.sync_all().await
@@ -29,21 +28,10 @@ async fn fsync_path(path: impl AsRef<std::path::Path>) -> Result<(), std::io::Er
pub async fn download_layer_file<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
tenant_id: TenantId,
timeline_id: TimelineId,
path: &'a RemotePath,
remote_path: &'a RemotePath,
layer_metadata: &'a LayerFileMetadata,
) -> anyhow::Result<u64> {
let timeline_path = conf.timeline_path(&timeline_id, &tenant_id);
let local_path = path.to_local_path(&timeline_path);
let layer_storage_path = storage.remote_object_id(&local_path).with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
local_path.display()
)
})?;
let local_path = conf.local_path(remote_path);
// Perform a rename inspired by durable_rename from file_utils.c.
// The sequence:
@@ -64,19 +52,14 @@ pub async fn download_layer_file<'a>(
temp_file_path.display()
)
})?;
let mut download = storage
.download(&layer_storage_path)
.await
.with_context(|| {
format!(
"Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
)
})?;
let bytes_amount = tokio::io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
let mut download = storage.download(remote_path).await.with_context(|| {
format!(
"Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
"Failed to open a download stream for layer with remote storage path '{remote_path:?}'"
)
})?;
let bytes_amount = tokio::io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
format!("Failed to download layer with remote storage path '{remote_path:?}' into file {temp_file_path:?}")
})?;
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
// A file will not be closed immediately when it goes out of scope if there are any IO operations
@@ -151,12 +134,7 @@ pub async fn list_remote_timelines<'a>(
tenant_id: TenantId,
) -> anyhow::Result<Vec<(TimelineId, IndexPart)>> {
let tenant_path = conf.timelines_path(&tenant_id);
let tenant_storage_path = storage.remote_object_id(&tenant_path).with_context(|| {
format!(
"Failed to get tenant storage path for local path '{}'",
tenant_path.display()
)
})?;
let tenant_storage_path = conf.remote_path(&tenant_path)?;
let timelines = storage
.list_prefixes(Some(&tenant_storage_path))
@@ -218,14 +196,8 @@ pub async fn download_index_part(
let index_part_path = conf
.metadata_path(timeline_id, tenant_id)
.with_file_name(IndexPart::FILE_NAME);
let part_storage_path = storage
.remote_object_id(&index_part_path)
.with_context(|| {
format!(
"Failed to get the index part storage path for local path '{}'",
index_part_path.display()
)
})
let part_storage_path = conf
.remote_path(&index_part_path)
.map_err(DownloadError::BadInput)?;
let mut index_part_download = storage.download(&part_storage_path).await?;
@@ -236,20 +208,12 @@ pub async fn download_index_part(
&mut index_part_bytes,
)
.await
.with_context(|| {
format!(
"Failed to download an index part into file '{}'",
index_part_path.display()
)
})
.with_context(|| format!("Failed to download an index part into file {index_part_path:?}"))
.map_err(DownloadError::Other)?;
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
.with_context(|| {
format!(
"Failed to deserialize index part file into file '{}'",
index_part_path.display()
)
format!("Failed to deserialize index part file into file {index_part_path:?}")
})
.map_err(DownloadError::Other)?;

View File

@@ -2,12 +2,9 @@
//! Able to restore itself from the storage index parts, that are located in every timeline's remote directory and contain all data about
//! remote timeline layers and its metadata.
use std::{
collections::{HashMap, HashSet},
path::{Path, PathBuf},
};
use std::collections::{HashMap, HashSet};
use anyhow::{Context, Ok};
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
@@ -15,34 +12,6 @@ use crate::tenant::metadata::TimelineMetadata;
use utils::lsn::Lsn;
/// Path on the remote storage, relative to some inner prefix.
///
/// The prefix is an implementation detail that allows representing local paths
/// as remote ones by stripping the local storage prefix away.
///
/// The type is (de)serialized transparently, i.e. as the wrapped path value itself.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct RemotePath(PathBuf);
impl RemotePath {
    /// Wraps a relative path.
    ///
    /// In debug builds, panics if `relative_path` is not relative;
    /// release builds do not check that.
    pub fn new(relative_path: &Path) -> Self {
        debug_assert!(
            relative_path.is_relative(),
            "Path {relative_path:?} is not relative"
        );
        Self(PathBuf::from(relative_path))
    }

    /// Creates a path out of `full_path` by stripping `base_path` from its beginning.
    ///
    /// Errors if `full_path` does not start with `base_path`.
    pub fn strip_base_path(base_path: &Path, full_path: &Path) -> anyhow::Result<Self> {
        full_path
            .strip_prefix(base_path)
            .map(Self::new)
            .with_context(|| format!("path {full_path:?} is not relative to base {base_path:?}",))
    }

    /// Returns the local path: the wrapped relative path appended to `base_path`.
    pub fn to_local_path(&self, base_path: &Path) -> PathBuf {
        let mut local_path = base_path.to_path_buf();
        local_path.push(&self.0);
        local_path
    }
}
/// Metadata gathered for each of the layer files.
///
/// Fields have to be `Option`s because remote [`IndexPart`]'s can be from different version, which
@@ -101,19 +70,19 @@ pub struct IndexPart {
/// Layer names, which are stored on the remote storage.
///
/// Additional metadata might exist in `layer_metadata`.
pub timeline_layers: HashSet<RemotePath>,
pub timeline_layers: HashSet<String>,
/// FIXME: unused field. This should be removed, but that changes the on-disk format,
/// so we need to make sure we're backwards- (and maybe forwards-) compatible
/// so we need to make sure we're backwards- (and maybe forwards-) compatible
/// First pass is to move it to Optional and the next would be its removal
missing_layers: Option<HashSet<RemotePath>>,
missing_layers: Option<HashSet<String>>,
/// Per layer file name metadata, which can be present for a present or missing layer file.
///
/// Older versions of `IndexPart` will not have this property or have only a part of metadata
/// that latest version stores.
#[serde(default)]
pub layer_metadata: HashMap<RemotePath, IndexLayerMetadata>,
pub layer_metadata: HashMap<String, IndexLayerMetadata>,
// 'disk_consistent_lsn' is a copy of the 'disk_consistent_lsn' in the metadata.
// It's duplicated here for convenience.
@@ -135,14 +104,20 @@ impl IndexPart {
disk_consistent_lsn: Lsn,
metadata_bytes: Vec<u8>,
) -> Self {
let mut timeline_layers = HashSet::new();
let mut layer_metadata = HashMap::new();
let mut timeline_layers = HashSet::with_capacity(layers_and_metadata.len());
let mut layer_metadata = HashMap::with_capacity(layers_and_metadata.len());
separate_paths_and_metadata(
&layers_and_metadata,
&mut timeline_layers,
&mut layer_metadata,
);
for (remote_path, metadata) in &layers_and_metadata {
let metadata = IndexLayerMetadata::from(metadata);
match remote_path.object_name() {
Some(layer_name) => {
timeline_layers.insert(layer_name.to_owned());
layer_metadata.insert(layer_name.to_owned(), metadata);
}
// TODO move this on a type level: we know, that every layer entry does have a name
None => panic!("Layer {remote_path:?} has no file name, skipping"),
}
}
Self {
version: Self::LATEST_VERSION,
@@ -173,18 +148,6 @@ impl From<&'_ LayerFileMetadata> for IndexLayerMetadata {
}
}
/// Fills both output collections from the `input` map of layer paths and their metadata.
///
/// Every path from `input` is cloned into `output`; the same path, together with its
/// metadata converted into [`IndexLayerMetadata`], is inserted into `layer_metadata`.
/// Entries that already exist under the same path in `layer_metadata` are overwritten.
fn separate_paths_and_metadata(
    input: &HashMap<RemotePath, LayerFileMetadata>,
    output: &mut HashSet<RemotePath>,
    layer_metadata: &mut HashMap<RemotePath, IndexLayerMetadata>,
) {
    output.extend(input.keys().cloned());
    layer_metadata.extend(
        input
            .iter()
            .map(|(path, metadata)| (path.clone(), IndexLayerMetadata::from(metadata))),
    );
}
#[cfg(test)]
mod tests {
use super::*;
@@ -200,8 +163,8 @@ mod tests {
let expected = IndexPart {
version: 0,
timeline_layers: HashSet::from([RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"))]),
missing_layers: Some(HashSet::from([RemotePath(PathBuf::from("not_a_real_layer_but_adding_coverage"))])),
timeline_layers: HashSet::from([String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")]),
missing_layers: Some(HashSet::from([String::from("not_a_real_layer_but_adding_coverage")])),
layer_metadata: HashMap::default(),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
@@ -228,13 +191,13 @@ mod tests {
let expected = IndexPart {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
timeline_layers: HashSet::from([RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"))]),
missing_layers: Some(HashSet::from([RemotePath(PathBuf::from("not_a_real_layer_but_adding_coverage"))])),
timeline_layers: HashSet::from([String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")]),
missing_layers: Some(HashSet::from([String::from("not_a_real_layer_but_adding_coverage")])),
layer_metadata: HashMap::from([
(RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")), IndexLayerMetadata {
(String::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"), IndexLayerMetadata {
file_size: Some(25600000),
}),
(RemotePath(PathBuf::from("not_a_real_layer_but_adding_coverage")), IndexLayerMetadata {
(String::from("not_a_real_layer_but_adding_coverage"), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: Some(9007199254741001),
@@ -264,20 +227,26 @@ mod tests {
let expected = IndexPart {
// note this is not verified, could be anything, but exists for humans debugging.. could be the git version instead?
version: 1,
timeline_layers: [RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9"))].into_iter().collect(),
timeline_layers: HashSet::from(["000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_string()]),
layer_metadata: HashMap::from([
(RemotePath(PathBuf::from("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9")), IndexLayerMetadata {
file_size: Some(25600000),
}),
(RemotePath(PathBuf::from("not_a_real_layer_but_adding_coverage")), IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: Some(9007199254741001),
})
(
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".to_string(),
IndexLayerMetadata {
file_size: Some(25600000),
}
),
(
"not_a_real_layer_but_adding_coverage".to_string(),
IndexLayerMetadata {
// serde_json should always parse this but this might be a double with jq for
// example.
file_size: Some(9007199254741001),
}
)
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata_bytes: [112,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0].to_vec(),
missing_layers: None::<HashSet<RemotePath>>,
missing_layers: None,
};
let part = serde_json::from_str::<IndexPart>(example).unwrap();

View File

@@ -30,12 +30,9 @@ pub(super) async fn upload_index_part<'a>(
let index_part_path = conf
.metadata_path(timeline_id, tenant_id)
.with_file_name(IndexPart::FILE_NAME);
let storage_path = conf.remote_path(&index_part_path)?;
storage
.upload_storage_object(
Box::new(index_part_bytes),
index_part_size,
&index_part_path,
)
.upload_storage_object(Box::new(index_part_bytes), index_part_size, &storage_path)
.await
.with_context(|| format!("Failed to upload index part for '{tenant_id} / {timeline_id}'"))
}
@@ -44,36 +41,26 @@ pub(super) async fn upload_index_part<'a>(
/// No extra checks for overlapping files are made, and any files that are already present remotely will be overwritten if submitted during the upload.
///
/// On an error, bumps the retries count and reschedules the entire task.
pub(super) async fn upload_timeline_layer(
storage: &GenericRemoteStorage,
source_path: &Path,
known_metadata: &LayerFileMetadata,
pub(super) async fn upload_timeline_layer<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
source_path: &'a Path,
known_metadata: &'a LayerFileMetadata,
) -> anyhow::Result<()> {
fail_point!("before-upload-layer", |_| {
bail!("failpoint before-upload-layer")
});
let storage_path = storage.remote_object_id(source_path).with_context(|| {
format!(
"Failed to get the layer storage path for local path '{}'",
source_path.display()
)
})?;
let storage_path = conf.remote_path(source_path)?;
let source_file = fs::File::open(&source_path).await.with_context(|| {
format!(
"Failed to open a source file for layer '{}'",
source_path.display()
)
})?;
let source_file = fs::File::open(&source_path)
.await
.with_context(|| format!("Failed to open a source file for layer {source_path:?}"))?;
let fs_size = source_file
.metadata()
.await
.with_context(|| {
format!(
"Failed to get the source file metadata for layer '{}'",
source_path.display()
)
format!("Failed to get the source file metadata for layer {source_path:?}")
})?
.len();

View File

@@ -19,7 +19,7 @@ use std::sync::atomic::{AtomicBool, AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::{Duration, Instant, SystemTime};
use crate::storage_sync::index::{IndexPart, RemotePath};
use crate::storage_sync::index::IndexPart;
use crate::storage_sync::RemoteTimelineClient;
use crate::tenant::{
delta_layer::{DeltaLayer, DeltaLayerWriter},
@@ -999,55 +999,9 @@ impl Timeline {
&self,
index_part: &IndexPart,
remote_client: &RemoteTimelineClient,
mut local_filenames: HashSet<PathBuf>,
local_layers: HashSet<PathBuf>,
up_to_date_disk_consistent_lsn: Lsn,
) -> anyhow::Result<HashSet<PathBuf>> {
let mut remote_filenames: HashSet<PathBuf> = HashSet::new();
for fname in index_part.timeline_layers.iter() {
remote_filenames.insert(fname.to_local_path(&PathBuf::from("")));
}
// Are there any local files that exist, with a size that doesn't match
// with the size stored in the remote index file?
// If so, rename_to_backup those files so that we re-download them later.
local_filenames.retain(|path| {
let layer_metadata = index_part
.layer_metadata
.get(&RemotePath::new(path))
.map(LayerFileMetadata::from)
.unwrap_or(LayerFileMetadata::MISSING);
if let Some(remote_size) = layer_metadata.file_size() {
let local_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id).join(&path);
match local_path.metadata() {
Ok(metadata) => {
let local_size = metadata.len();
if local_size != remote_size {
warn!("removing local file \"{}\" because it has unexpected length {}; length in remote index is {}",
path.display(),
local_size,
remote_size);
if let Err(err) = rename_to_backup(&local_path) {
error!("could not rename file \"{}\": {:?}",
local_path.display(), err);
}
self.metrics.current_physical_size_gauge.sub(local_size);
false
} else {
true
}
}
Err(err) => {
error!("could not get size of local file \"{}\": {:?}", path.display(), err);
true
}
}
} else {
true
}
});
// Are we missing some files that are present in remote storage?
// Download them now.
// TODO Downloading many files this way is not efficient.
@@ -1056,17 +1010,63 @@ impl Timeline {
// b) typical case now is that there is nothing to sync, this downloads a lot
// 1) if there was another pageserver that came and generated new files
// 2) during attach of a timeline with big history which we currently do not do
for path in remote_filenames.difference(&local_filenames) {
let fname = path.to_str().unwrap();
info!("remote layer file {fname} does not exist locally");
let mut local_only_layers = local_layers;
let timeline_dir = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for remote_layer_name in &index_part.timeline_layers {
let local_layer_path = timeline_dir.join(remote_layer_name);
local_only_layers.remove(&local_layer_path);
let layer_metadata = index_part
let remote_layer_metadata = index_part
.layer_metadata
.get(&RemotePath::new(path))
.get(remote_layer_name)
.map(LayerFileMetadata::from)
.unwrap_or(LayerFileMetadata::MISSING);
if let Some(imgfilename) = ImageFileName::parse_str(fname) {
let remote_layer_path = self
.conf
.remote_path(&local_layer_path)
.expect("local_layer_path received from the same conf that provided a workdir");
if local_layer_path.exists() {
let mut already_downloaded = true;
// Are there any local files that exist, with a size that doesn't match
// with the size stored in the remote index file?
// If so, rename_to_backup those files so that we re-download them later.
if let Some(remote_size) = remote_layer_metadata.file_size() {
match local_layer_path.metadata() {
Ok(metadata) => {
let local_size = metadata.len();
if local_size != remote_size {
warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
if let Err(err) = rename_to_backup(&local_layer_path) {
error!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.current_physical_size_gauge.sub(local_size);
already_downloaded = false;
}
}
}
Err(err) => {
error!("could not get size of local file {local_layer_path:?}: {err:?}")
}
}
}
if already_downloaded {
continue;
}
} else {
info!("remote layer {remote_layer_path:?} does not exist locally");
}
let layer_name = local_layer_path
.file_name()
.and_then(|os_str| os_str.to_str())
.with_context(|| {
format!("Layer file {local_layer_path:?} has no name in unicode")
})?;
if let Some(imgfilename) = ImageFileName::parse_str(layer_name) {
if imgfilename.lsn > up_to_date_disk_consistent_lsn {
warn!(
"found future image layer {} on timeline {} remote_consistent_lsn is {}",
@@ -1075,11 +1075,13 @@ impl Timeline {
continue;
}
trace!("downloading image file: {path:?}");
let sz = remote_client
.download_layer_file(&RemotePath::new(path), &layer_metadata)
trace!("downloading image file: {remote_layer_path:?}");
let downloaded_size = remote_client
.download_layer_file(&remote_layer_path, &remote_layer_metadata)
.await
.context("download image layer")?;
.with_context(|| {
format!("failed to download image layer from path {remote_layer_path:?}")
})?;
trace!("done");
let image_layer =
@@ -1089,8 +1091,10 @@ impl Timeline {
.write()
.unwrap()
.insert_historic(Arc::new(image_layer));
self.metrics.current_physical_size_gauge.add(sz);
} else if let Some(deltafilename) = DeltaFileName::parse_str(fname) {
self.metrics
.current_physical_size_gauge
.add(downloaded_size);
} else if let Some(deltafilename) = DeltaFileName::parse_str(layer_name) {
// Create a DeltaLayer struct for each delta file.
// The end-LSN is exclusive, while disk_consistent_lsn is
// inclusive. For example, if disk_consistent_lsn is 100, it is
@@ -1105,11 +1109,13 @@ impl Timeline {
continue;
}
trace!("downloading delta file: {path:?}");
trace!("downloading delta file: {remote_layer_path:?}");
let sz = remote_client
.download_layer_file(&RemotePath::new(path), &layer_metadata)
.download_layer_file(&remote_layer_path, &remote_layer_metadata)
.await
.context("download delta layer")?;
.with_context(|| {
format!("failed to download delta layer from path {remote_layer_path:?}")
})?;
trace!("done");
let delta_layer =
@@ -1121,16 +1127,11 @@ impl Timeline {
.insert_historic(Arc::new(delta_layer));
self.metrics.current_physical_size_gauge.add(sz);
} else {
bail!("unexpected layer filename in remote storage: {}", fname);
bail!("unexpected layer filename {layer_name} in remote storage path: {remote_layer_path:?}");
}
}
// now these are local only filenames
let local_only_filenames = local_filenames
.difference(&remote_filenames)
.cloned()
.collect();
Ok(local_only_filenames)
Ok(local_only_layers)
}
///
@@ -1164,47 +1165,46 @@ impl Timeline {
let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn();
// Build a map of local layers for quick lookups
let mut local_filenames: HashSet<PathBuf> = HashSet::new();
for layer in self.layers.read().unwrap().iter_historic_layers() {
local_filenames.insert(layer.filename());
}
let local_layers = self
.layers
.read()
.unwrap()
.iter_historic_layers()
.map(|historic_layer| {
historic_layer
.local_path()
.expect("Historic layers should have a path")
})
.collect::<HashSet<_>>();
let local_only_filenames = match index_part {
let local_only_layers = match index_part {
Some(index_part) => {
info!(
"initializing upload queue from remote index with {} layer files",
index_part.timeline_layers.len()
);
remote_client.init_upload_queue(index_part)?;
let local_only_filenames = self
.download_missing(
index_part,
remote_client,
local_filenames,
disk_consistent_lsn,
)
.await?;
local_only_filenames
self.download_missing(index_part, remote_client, local_layers, disk_consistent_lsn)
.await?
}
None => {
info!("initializing upload queue as empty");
remote_client.init_upload_queue_for_empty_remote(up_to_date_metadata)?;
local_filenames
local_layers
}
};
// Are there local files that don't exist remotely? Schedule uploads for them
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for fname in &local_only_filenames {
let absolute = timeline_path.join(fname);
let sz = absolute
for layer_path in &local_only_layers {
let layer_size = layer_path
.metadata()
.with_context(|| format!("failed to get file {} metadata", fname.display()))?
.with_context(|| format!("failed to get file {layer_path:?} metadata"))?
.len();
info!("scheduling {} for upload", fname.display());
remote_client.schedule_layer_file_upload(&absolute, &LayerFileMetadata::new(sz))?;
info!("scheduling {layer_path:?} for upload");
remote_client
.schedule_layer_file_upload(layer_path, &LayerFileMetadata::new(layer_size))?;
}
if !local_only_filenames.is_empty() {
if !local_only_layers.is_empty() {
remote_client.schedule_index_upload(up_to_date_metadata)?;
}

View File

@@ -226,6 +226,7 @@ impl ReplicationConn {
let mut end_pos = stop_pos.unwrap_or(inmem_state.commit_lsn);
let mut wal_reader = WalReader::new(
spg.conf.workdir.clone(),
spg.conf.timeline_dir(&tli.ttid),
&persisted_state,
start_pos,

View File

@@ -13,7 +13,7 @@ use std::time::Duration;
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::GenericRemoteStorage;
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::fs::File;
use tokio::runtime::Builder;
@@ -151,7 +151,7 @@ async fn update_task(
let timeline_dir = conf.timeline_dir(&ttid);
let handle = tokio::spawn(
backup_task_main(ttid, timeline_dir, shutdown_rx)
backup_task_main(ttid, timeline_dir, conf.workdir.clone(), shutdown_rx)
.instrument(info_span!("WAL backup task", ttid = %ttid)),
);
@@ -182,10 +182,10 @@ async fn wal_backup_launcher_main_loop(
let conf_ = conf.clone();
REMOTE_STORAGE.get_or_init(|| {
conf_.remote_storage.as_ref().map(|c| {
GenericRemoteStorage::from_config(conf_.workdir, c)
.expect("failed to create remote storage")
})
conf_
.remote_storage
.as_ref()
.map(|c| GenericRemoteStorage::from_config(c).expect("failed to create remote storage"))
});
// Presence in this map means launcher is aware s3 offloading is needed for
@@ -234,6 +234,7 @@ async fn wal_backup_launcher_main_loop(
/// State carried by a single WAL backup task.
///
/// The fields are filled in by `backup_task_main` and later handed to
/// `backup_lsn_range` when a range of WAL has to be offloaded.
struct WalBackupTask {
/// Timeline this task operates on; `commit_lsn_watch_rx` is obtained from it.
timeline: Arc<Timeline>,
/// Local directory of the timeline (`conf.timeline_dir(&ttid)`); WAL segment
/// file paths are resolved against it via `Segment::file_path`.
timeline_dir: PathBuf,
/// Safekeeper working directory (`conf.workdir`). It is stripped as a prefix
/// from local segment paths to derive the relative `RemotePath` used for upload.
workspace_dir: PathBuf,
/// WAL segment size, used when splitting an LSN range into segments to back up.
wal_seg_size: usize,
/// Receiver returned by `Timeline::get_commit_lsn_watch_rx()`.
/// NOTE(review): presumably notifies about commit LSN advances — confirm against `Timeline`.
commit_lsn_watch_rx: watch::Receiver<Lsn>,
}
@@ -242,6 +243,7 @@ struct WalBackupTask {
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: PathBuf,
workspace_dir: PathBuf,
mut shutdown_rx: Receiver<()>,
) {
info!("started");
@@ -257,6 +259,7 @@ async fn backup_task_main(
commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
timeline: tli,
timeline_dir,
workspace_dir,
};
// task is spun up only when wal_seg_size is already initialized
@@ -321,6 +324,7 @@ impl WalBackupTask {
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
&self.workspace_dir,
)
.await
{
@@ -353,11 +357,12 @@ pub async fn backup_lsn_range(
end_lsn: Lsn,
wal_seg_size: usize,
timeline_dir: &Path,
workspace_dir: &Path,
) -> Result<Lsn> {
let mut res = start_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
for s in &segments {
backup_single_segment(s, timeline_dir)
backup_single_segment(s, timeline_dir, workspace_dir)
.await
.with_context(|| format!("offloading segno {}", s.seg_no))?;
@@ -372,11 +377,24 @@ pub async fn backup_lsn_range(
Ok(res)
}
async fn backup_single_segment(seg: &Segment, timeline_dir: &Path) -> Result<()> {
let segment_file_name = seg.file_path(timeline_dir)?;
async fn backup_single_segment(
seg: &Segment,
timeline_dir: &Path,
workspace_dir: &Path,
) -> Result<()> {
let segment_file_path = seg.file_path(timeline_dir)?;
let remote_segment_path = segment_file_path
.strip_prefix(&workspace_dir)
.context("Failed to strip workspace dir prefix")
.and_then(RemotePath::new)
.with_context(|| {
format!(
"Failed to resolve remote part of path {segment_file_path:?} for base {workspace_dir:?}",
)
})?;
backup_object(&segment_file_name, seg.size()).await?;
debug!("Backup of {} done", segment_file_name.display());
backup_object(&segment_file_path, &remote_segment_path, seg.size()).await?;
debug!("Backup of {} done", segment_file_path.display());
Ok(())
}
@@ -426,7 +444,7 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
async fn backup_object(source_file: &Path, target_file: &RemotePath, size: usize) -> Result<()> {
let storage = REMOTE_STORAGE
.get()
.expect("failed to get remote storage")
@@ -441,12 +459,12 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
})?);
storage
.upload_storage_object(Box::new(file), size, source_file)
.upload_storage_object(Box::new(file), size, target_file)
.await
}
pub async fn read_object(
file_path: PathBuf,
file_path: &RemotePath,
offset: u64,
) -> anyhow::Result<Pin<Box<dyn tokio::io::AsyncRead>>> {
let storage = REMOTE_STORAGE
@@ -455,19 +473,13 @@ pub async fn read_object(
.as_ref()
.context("No remote storage configured")?;
info!(
"segment download about to start for local path {} at offset {}",
file_path.display(),
offset
);
info!("segment download about to start from remote path {file_path:?} at offset {offset}");
let download = storage
.download_storage_object(Some((offset, None)), &file_path)
.download_storage_object(Some((offset, None)), file_path)
.await
.with_context(|| {
format!(
"Failed to open WAL segment download stream for local path {}",
file_path.display()
)
format!("Failed to open WAL segment download stream for remote path {file_path:?}")
})?;
Ok(download.download_stream)

View File

@@ -8,6 +8,7 @@
//! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{bail, Context, Result};
use remote_storage::RemotePath;
use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
@@ -445,6 +446,7 @@ fn remove_segments_from_disk(
}
pub struct WalReader {
workdir: PathBuf,
timeline_dir: PathBuf,
wal_seg_size: usize,
pos: Lsn,
@@ -459,6 +461,7 @@ pub struct WalReader {
impl WalReader {
pub fn new(
workdir: PathBuf,
timeline_dir: PathBuf,
state: &SafeKeeperState,
start_pos: Lsn,
@@ -478,6 +481,7 @@ impl WalReader {
}
Ok(Self {
workdir,
timeline_dir,
wal_seg_size: state.server.wal_seg_size as usize,
pos: start_pos,
@@ -545,7 +549,17 @@ impl WalReader {
// Try to open remote file, if remote reads are enabled
if self.enable_remote_read {
return read_object(wal_file_path, xlogoff as u64).await;
let remote_wal_file_path = wal_file_path
.strip_prefix(&self.workdir)
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.with_context(|| {
format!(
"Failed to resolve remote part of path {:?} for base {:?}",
wal_file_path, self.workdir,
)
})?;
return read_object(&remote_wal_file_path, xlogoff as u64).await;
}
bail!("WAL segment is not found")