Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-05 20:42:54 +00:00)
Turn GenericRemoteStorage into just a newtype around 'Arc<dyn RemoteStorage>'
We had a pattern like this:

    match remote_storage {
        GenericRemoteStorage::Local(storage) => {
            let source = storage.remote_object_id(&file_path)?;
            ...
            storage
                .function(&source, ...)
                .await
        },
        GenericRemoteStorage::S3(storage) => {
            ... exact same code as for the Local case ...
        },
    }
This removes the code duplication by allowing you to call the functions
directly on GenericRemoteStorage.
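The mechanics behind that simplification are the standard newtype-around-`Arc<dyn Trait>`
pattern: wrap the trait object once, implement Deref, and every call site gets the trait's
methods without matching on the concrete backend. A minimal, self-contained sketch of the
pattern (the trait, backends, and method below are invented for illustration; they are not
this crate's real API):

    use std::{ops::Deref, sync::Arc};

    // Hypothetical stand-in for the real `RemoteStorage` trait.
    trait Storage: Send + Sync {
        fn describe(&self) -> String;
    }

    struct LocalBackend;
    struct S3Backend;

    impl Storage for LocalBackend {
        fn describe(&self) -> String {
            "local filesystem".to_string()
        }
    }

    impl Storage for S3Backend {
        fn describe(&self) -> String {
            "s3 bucket".to_string()
        }
    }

    // The newtype: cheap to clone, and it hides which backend is inside.
    #[derive(Clone)]
    struct GenericStorage(Arc<dyn Storage>);

    impl Deref for GenericStorage {
        type Target = dyn Storage;

        fn deref(&self) -> &Self::Target {
            self.0.as_ref()
        }
    }

    fn main() {
        // Call sites invoke trait methods directly; no per-variant match needed.
        for storage in [
            GenericStorage(Arc::new(LocalBackend)),
            GenericStorage(Arc::new(S3Backend)),
        ] {
            println!("backend: {}", storage.describe());
        }
    }

The Deref impl is what lets the wrapper forward every trait method without writing
delegating wrapper functions.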
Also change RemoteObjectId to be a simple newtype around String. Now that
the callers of the GenericRemoteStorage functions don't know whether they're
dealing with the LocalFs or S3 implementation, RemoteObjectId must be the
same type for both.
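For contrast with the "before" snippet above, here is a rough sketch of what a call site
looks like after this change. The method names (`remote_object_id`, `download`) come from
the diff below, but this helper function itself is hypothetical and the error handling is
illustrative only:

    use std::path::Path;

    use remote_storage::GenericRemoteStorage;

    // Hypothetical caller: works the same whether the storage is LocalFs or S3,
    // because GenericRemoteStorage derefs to `dyn RemoteStorage`.
    async fn fetch_object(storage: &GenericRemoteStorage, file_path: &Path) -> anyhow::Result<()> {
        let source = storage.remote_object_id(file_path)?;
        let _download = storage
            .download(&source)
            .await
            .map_err(|e| anyhow::anyhow!("download failed: {e}"))?;
        Ok(())
    }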
Committed by: Dmitry Rodionov
Parent: 171385ac14
Commit: 35b4816f09
@@ -14,8 +14,10 @@ use std::{
    ffi::OsStr,
    fmt::Debug,
    num::{NonZeroU32, NonZeroUsize},
    ops::Deref,
    path::{Path, PathBuf},
    pin::Pin,
    sync::Arc,
};

use anyhow::{bail, Context};
@@ -24,10 +26,7 @@ use tokio::io;
use toml_edit::Item;
use tracing::info;

pub use self::{
    local_fs::LocalFs,
    s3_bucket::{S3Bucket, S3ObjectKey},
};
pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket};

/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
@@ -42,22 +41,62 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;

const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';

#[derive(Clone, PartialEq, Eq)]
pub struct RemoteObjectId(String);

impl From<RemoteObjectId> for String {
    fn from(id: RemoteObjectId) -> Self {
        id.0
    }
}

///
/// A key that refers to an object in remote storage. It works much like a Path,
/// but it's a separate datatype so that you don't accidentally mix local paths
/// and remote keys.
///
impl RemoteObjectId {
    // Needed to retrieve the last component of a RemoteObjectId, in other words a file name.
    /// Turn a/b/c or a/b/c/ into c
    pub fn object_name(&self) -> Option<&str> {
        // Corner case: char::to_string is not const, that's why this is more verbose than it needs to be.
        // See https://github.com/rust-lang/rust/issues/88674
        if self.0.len() == 1 && self.0.chars().next().unwrap() == REMOTE_STORAGE_PREFIX_SEPARATOR {
            return None;
        }

        if self.0.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
            self.0.rsplit(REMOTE_STORAGE_PREFIX_SEPARATOR).nth(1)
        } else {
            self.0
                .rsplit_once(REMOTE_STORAGE_PREFIX_SEPARATOR)
                .map(|(_, last)| last)
        }
    }
}

impl Debug for RemoteObjectId {
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        self.0.fmt(fmt)
    }
}

/// Storage (potentially remote) API to manage its state.
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations for storage files.
#[async_trait::async_trait]
pub trait RemoteStorage: Send + Sync {
    /// A way to uniquely reference a file in the remote storage.
    type RemoteObjectId;

pub trait RemoteStorage: Send + Sync + 'static {
    /// Attempts to derive the storage path out of the local path, if the latter is correct.
    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<Self::RemoteObjectId>;
    fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId>;

    /// Gets the download path of the given storage file.
    fn local_path(&self, remote_object_id: &Self::RemoteObjectId) -> anyhow::Result<PathBuf>;
    fn local_path(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf>;

    /// Lists all items the storage has right now.
    async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>>;
    async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>>;

    /// Lists all top level subdirectories for a given prefix
    /// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
@@ -65,34 +104,39 @@ pub trait RemoteStorage: Send + Sync {
    /// so this method doesn't need to.
    async fn list_prefixes(
        &self,
        prefix: Option<&Self::RemoteObjectId>,
    ) -> anyhow::Result<Vec<Self::RemoteObjectId>>;
        prefix: Option<&RemoteObjectId>,
    ) -> anyhow::Result<Vec<RemoteObjectId>>;

    /// Streams the local file contents into the remote storage entry.
    async fn upload(
        &self,
        from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
        from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
        // S3 PUT request requires the content length to be specified,
        // otherwise it starts to fail with the concurrent connection count increasing.
        from_size_bytes: usize,
        to: &Self::RemoteObjectId,
        to: &RemoteObjectId,
        metadata: Option<StorageMetadata>,
    ) -> anyhow::Result<()>;

    /// Streams the remote storage entry contents into the buffered writer given, returns the filled writer.
    /// Returns the metadata, if any was stored with the file previously.
    async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError>;
    async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError>;

    /// Streams a given byte range of the remote storage entry contents into the buffered writer given, returns the filled writer.
    /// Returns the metadata, if any was stored with the file previously.
    async fn download_byte_range(
        &self,
        from: &Self::RemoteObjectId,
        from: &RemoteObjectId,
        start_inclusive: u64,
        end_exclusive: Option<u64>,
    ) -> Result<Download, DownloadError>;

    async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()>;
    async fn delete(&self, path: &RemoteObjectId) -> anyhow::Result<()>;

    /// Downcast to LocalFs implementation. For tests.
    fn as_local(&self) -> Option<&LocalFs> {
        None
    }
}

pub struct Download {
@@ -135,34 +179,37 @@ impl std::error::Error for DownloadError {}

/// Every storage, currently supported.
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
pub enum GenericRemoteStorage {
    Local(LocalFs),
    S3(S3Bucket),
#[derive(Clone)]
pub struct GenericRemoteStorage(Arc<dyn RemoteStorage>);

impl Deref for GenericRemoteStorage {
    type Target = dyn RemoteStorage;

    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}

impl GenericRemoteStorage {
    pub fn new(
    pub fn new(storage: impl RemoteStorage) -> Self {
        Self(Arc::new(storage))
    }

    pub fn from_config(
        working_directory: PathBuf,
        storage_config: &RemoteStorageConfig,
    ) -> anyhow::Result<Self> {
        match &storage_config.storage {
    ) -> anyhow::Result<GenericRemoteStorage> {
        Ok(match &storage_config.storage {
            RemoteStorageKind::LocalFs(root) => {
                info!("Using fs root '{}' as a remote storage", root.display());
                LocalFs::new(root.clone(), working_directory).map(GenericRemoteStorage::Local)
                GenericRemoteStorage::new(LocalFs::new(root.clone(), working_directory)?)
            }
            RemoteStorageKind::AwsS3(s3_config) => {
                info!("Using s3 bucket '{}' in region '{}' as a remote storage, prefix in bucket: '{:?}', bucket endpoint: '{:?}'",
                    s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
                S3Bucket::new(s3_config, working_directory).map(GenericRemoteStorage::S3)
                    s3_config.bucket_name, s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint);
                GenericRemoteStorage::new(S3Bucket::new(s3_config, working_directory)?)
            }
        }
    }

    pub fn as_local(&self) -> Option<&LocalFs> {
        match self {
            Self::Local(local_fs) => Some(local_fs),
            _ => None,
        }
        })
    }

/// Takes storage object contents and its size and uploads to remote storage,
@@ -172,47 +219,26 @@ impl GenericRemoteStorage {
    /// this path is used for the remote object id conversion only.
    pub async fn upload_storage_object(
        &self,
        from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
        from: Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static>,
        from_size_bytes: usize,
        from_path: &Path,
    ) -> anyhow::Result<()> {
        async fn do_upload_storage_object<P, S>(
            storage: &S,
            from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
            from_size_bytes: usize,
            from_path: &Path,
        ) -> anyhow::Result<()>
        where
            P: std::fmt::Debug + Send + Sync + 'static,
            S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
        {
            let target_storage_path = storage.remote_object_id(from_path).with_context(|| {
        let target_storage_path = self.remote_object_id(from_path).with_context(|| {
            format!(
                "Failed to get the storage path for source local path '{}'",
                from_path.display()
            )
        })?;

        self.upload(from, from_size_bytes, &target_storage_path, None)
            .await
            .with_context(|| {
                format!(
                    "Failed to get the storage path for source local path '{}'",
                    from_path.display()
                    "Failed to upload from '{}' to storage path '{:?}'",
                    from_path.display(),
                    target_storage_path
                )
            })?;

            storage
                .upload(from, from_size_bytes, &target_storage_path, None)
                .await
                .with_context(|| {
                    format!(
                        "Failed to upload from '{}' to storage path '{:?}'",
                        from_path.display(),
                        target_storage_path
                    )
                })
        }

        match self {
            GenericRemoteStorage::Local(storage) => {
                do_upload_storage_object(storage, from, from_size_bytes, from_path).await
            }
            GenericRemoteStorage::S3(storage) => {
                do_upload_storage_object(storage, from, from_size_bytes, from_path).await
            }
        }
        })
    }

/// Downloads the storage object into the `to_path` provided.
@@ -222,42 +248,22 @@ impl GenericRemoteStorage {
        byte_range: Option<(u64, Option<u64>)>,
        to_path: &Path,
    ) -> Result<Download, DownloadError> {
        async fn do_download_storage_object<P, S>(
            storage: &S,
            byte_range: Option<(u64, Option<u64>)>,
            to_path: &Path,
        ) -> Result<Download, DownloadError>
        where
            P: std::fmt::Debug + Send + Sync + 'static,
            S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
        {
            let remote_object_path = storage
                .remote_object_id(to_path)
                .with_context(|| {
                    format!(
                        "Failed to get the storage path for target local path '{}'",
                        to_path.display()
                    )
                })
                .map_err(DownloadError::BadInput)?;
        let remote_object_path = self
            .remote_object_id(to_path)
            .with_context(|| {
                format!(
                    "Failed to get the storage path for target local path '{}'",
                    to_path.display()
                )
            })
            .map_err(DownloadError::BadInput)?;

            match byte_range {
                Some((start, end)) => {
                    storage
                        .download_byte_range(&remote_object_path, start, end)
                        .await
                }
                None => storage.download(&remote_object_path).await,
            }
        }

        match self {
            GenericRemoteStorage::Local(storage) => {
                do_download_storage_object(storage, byte_range, to_path).await
            }
            GenericRemoteStorage::S3(storage) => {
                do_download_storage_object(storage, byte_range, to_path).await
        match byte_range {
            Some((start, end)) => {
                self.download_byte_range(&remote_object_path, start, end)
                    .await
            }
            None => self.download(&remote_object_path).await,
        }
    }
}
@@ -463,4 +469,23 @@ mod tests {
            "/foo/bar.baz..temp"
        );
    }

    #[test]
    fn object_name() {
        let k = RemoteObjectId("a/b/c".to_owned());
        assert_eq!(k.object_name(), Some("c"));

        let k = RemoteObjectId("a/b/c/".to_owned());
        assert_eq!(k.object_name(), Some("c"));

        let k = RemoteObjectId("a/".to_owned());
        assert_eq!(k.object_name(), Some("a"));

        // XXX is it impossible to have an empty key?
        let k = RemoteObjectId("".to_owned());
        assert_eq!(k.object_name(), None);

        let k = RemoteObjectId("/".to_owned());
        assert_eq!(k.object_name(), None);
    }
}

@@ -17,10 +17,19 @@ use tokio::{
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
use crate::{path_with_suffix_extension, Download, DownloadError};
|
||||
use crate::{path_with_suffix_extension, Download, DownloadError, RemoteObjectId};
|
||||
|
||||
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
|
||||
|
||||
/// Convert a Path in the remote storage into a RemoteObjectId
|
||||
fn remote_object_id_from_path(path: &Path) -> anyhow::Result<RemoteObjectId> {
|
||||
Ok(RemoteObjectId(
|
||||
path.to_str()
|
||||
.ok_or_else(|| anyhow::anyhow!("unexpected characters found in path"))?
|
||||
.to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
pub struct LocalFs {
|
||||
working_directory: PathBuf,
|
||||
storage_root: PathBuf,
|
||||
@@ -43,11 +52,17 @@ impl LocalFs {
|
||||
})
|
||||
}
|
||||
|
||||
fn resolve_in_storage(&self, path: &Path) -> anyhow::Result<PathBuf> {
|
||||
///
|
||||
/// Get the absolute path in the local filesystem to given remote object.
|
||||
///
|
||||
/// This is public so that it can be used in tests. Should not be used elsewhere.
|
||||
///
|
||||
pub fn resolve_in_storage(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf> {
|
||||
let path = PathBuf::from(&remote_object_id.0);
|
||||
if path.is_relative() {
|
||||
Ok(self.storage_root.join(path))
|
||||
} else if path.starts_with(&self.storage_root) {
|
||||
Ok(path.to_path_buf())
|
||||
Ok(path)
|
||||
} else {
|
||||
bail!(
|
||||
"Path '{}' does not belong to the current storage",
|
||||
@@ -85,38 +100,42 @@ impl LocalFs {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RemoteStorage for LocalFs {
|
||||
type RemoteObjectId = PathBuf;
|
||||
|
||||
fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<Self::RemoteObjectId> {
|
||||
Ok(self.storage_root.join(
|
||||
/// Convert a "local" path into a "remote path"
|
||||
fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId> {
|
||||
let path = self.storage_root.join(
|
||||
strip_path_prefix(&self.working_directory, local_path)
|
||||
.context("local path does not belong to this storage")?,
|
||||
))
|
||||
);
|
||||
remote_object_id_from_path(&path)
|
||||
}
|
||||
|
||||
fn local_path(&self, storage_path: &Self::RemoteObjectId) -> anyhow::Result<PathBuf> {
|
||||
let relative_path = strip_path_prefix(&self.storage_root, storage_path)
|
||||
fn local_path(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<PathBuf> {
|
||||
let storage_path = PathBuf::from(&remote_object_id.0);
|
||||
let relative_path = strip_path_prefix(&self.storage_root, &storage_path)
|
||||
.context("local path does not belong to this storage")?;
|
||||
Ok(self.working_directory.join(relative_path))
|
||||
}
|
||||
|
||||
async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>> {
|
||||
get_all_files(&self.storage_root, true).await
|
||||
}
|
||||
|
||||
async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<&Self::RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
let path = prefix.unwrap_or(&self.storage_root);
|
||||
prefix: Option<&RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<RemoteObjectId>> {
|
||||
let path = match prefix {
|
||||
Some(prefix) => Path::new(&prefix.0),
|
||||
None => &self.storage_root,
|
||||
};
|
||||
get_all_files(path, false).await
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
|
||||
from_size_bytes: usize,
|
||||
to: &Self::RemoteObjectId,
|
||||
to: &RemoteObjectId,
|
||||
metadata: Option<StorageMetadata>,
|
||||
) -> anyhow::Result<()> {
|
||||
let target_file_path = self.resolve_in_storage(to)?;
|
||||
@@ -197,7 +216,7 @@ impl RemoteStorage for LocalFs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
|
||||
async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError> {
|
||||
let file_path = self
|
||||
.resolve_in_storage(from)
|
||||
.map_err(DownloadError::BadInput)?;
|
||||
@@ -231,7 +250,7 @@ impl RemoteStorage for LocalFs {
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &Self::RemoteObjectId,
|
||||
from: &RemoteObjectId,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
) -> Result<Download, DownloadError> {
|
||||
@@ -285,7 +304,7 @@ impl RemoteStorage for LocalFs {
|
||||
}
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {
|
||||
async fn delete(&self, path: &RemoteObjectId) -> anyhow::Result<()> {
|
||||
let file_path = self.resolve_in_storage(path)?;
|
||||
if file_path.exists() && file_path.is_file() {
|
||||
Ok(fs::remove_file(file_path).await?)
|
||||
@@ -296,6 +315,10 @@ impl RemoteStorage for LocalFs {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn as_local(&self) -> Option<&LocalFs> {
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
|
||||
fn storage_metadata_path(original_path: &Path) -> PathBuf {
|
||||
@@ -305,7 +328,7 @@ fn storage_metadata_path(original_path: &Path) -> PathBuf {
|
||||
fn get_all_files<'a, P>(
|
||||
directory_path: P,
|
||||
recursive: bool,
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<RemoteObjectId>>> + Send + Sync + 'a>>
|
||||
where
|
||||
P: AsRef<Path> + Send + Sync + 'a,
|
||||
{
|
||||
@@ -322,12 +345,12 @@ where
|
||||
debug!("{:?} us a symlink, skipping", entry_path)
|
||||
} else if file_type.is_dir() {
|
||||
if recursive {
|
||||
paths.extend(get_all_files(entry_path, true).await?.into_iter())
|
||||
paths.extend(get_all_files(&entry_path, true).await?.into_iter())
|
||||
} else {
|
||||
paths.push(dir_entry.path())
|
||||
paths.push(remote_object_id_from_path(&dir_entry.path())?)
|
||||
}
|
||||
} else {
|
||||
paths.push(dir_entry.path());
|
||||
paths.push(remote_object_id_from_path(&dir_entry.path())?);
|
||||
}
|
||||
}
|
||||
Ok(paths)
|
||||
@@ -389,9 +412,15 @@ mod pure_tests {
|
||||
.join("file_name");
|
||||
let expected_path = storage_root.join(local_path.strip_prefix(&workdir)?);
|
||||
|
||||
let actual_path = PathBuf::from(
|
||||
storage
|
||||
.remote_object_id(&local_path)
|
||||
.expect("Matching path should map to storage path normally")
|
||||
.0,
|
||||
);
|
||||
assert_eq!(
|
||||
expected_path,
|
||||
storage.remote_object_id(&local_path).expect("Matching path should map to storage path normally"),
|
||||
actual_path,
|
||||
"File paths from workdir should be stored in local fs storage with the same path they have relative to the workdir"
|
||||
);
|
||||
|
||||
@@ -452,7 +481,9 @@ mod pure_tests {
|
||||
assert_eq!(
|
||||
local_path,
|
||||
storage
|
||||
.local_path(&storage_root.join(local_path.strip_prefix(&workdir)?))
|
||||
.local_path(&remote_object_id_from_path(
|
||||
&storage_root.join(local_path.strip_prefix(&workdir)?)
|
||||
)?)
|
||||
.expect("For a valid input, valid local path should be parsed"),
|
||||
"Should be able to parse metadata out of the correctly named remote delta file"
|
||||
);
|
||||
@@ -476,8 +507,7 @@ mod pure_tests {
|
||||
#[test]
|
||||
fn local_path_negatives() -> anyhow::Result<()> {
|
||||
#[track_caller]
|
||||
#[allow(clippy::ptr_arg)] // have to use &PathBuf due to `storage.local_path` parameter requirements
|
||||
fn local_path_error(storage: &LocalFs, storage_path: &PathBuf) -> String {
|
||||
fn local_path_error(storage: &LocalFs, storage_path: &RemoteObjectId) -> String {
|
||||
match storage.local_path(storage_path) {
|
||||
Ok(wrong_path) => panic!(
|
||||
"Expected local path input {:?} to cause an error, but got file path: {:?}",
|
||||
@@ -494,7 +524,8 @@ mod pure_tests {
|
||||
};
|
||||
|
||||
let totally_wrong_path = "wrong_wrong_wrong";
|
||||
let error_message = local_path_error(&storage, &PathBuf::from(totally_wrong_path));
|
||||
let error_message =
|
||||
local_path_error(&storage, &RemoteObjectId(totally_wrong_path.to_string()));
|
||||
assert!(error_message.contains(totally_wrong_path));
|
||||
|
||||
Ok(())
|
||||
@@ -537,7 +568,7 @@ mod fs_tests {
|
||||
storage: &LocalFs,
|
||||
#[allow(clippy::ptr_arg)]
|
||||
// have to use &PathBuf due to `storage.local_path` parameter requirements
|
||||
remote_storage_path: &PathBuf,
|
||||
remote_storage_path: &RemoteObjectId,
|
||||
expected_metadata: Option<&StorageMetadata>,
|
||||
) -> anyhow::Result<String> {
|
||||
let mut download = storage
|
||||
@@ -568,12 +599,20 @@ mod fs_tests {
|
||||
"whatever_contents",
|
||||
)
|
||||
.await?;
|
||||
let target_path = PathBuf::from("/").join("somewhere").join("else");
|
||||
match storage.upload(file, size, &target_path, None).await {
|
||||
let target_path = "/somewhere/else";
|
||||
match storage
|
||||
.upload(
|
||||
Box::new(file),
|
||||
size,
|
||||
&RemoteObjectId(target_path.to_string()),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(()) => panic!("Should not allow storing files with wrong target path"),
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
assert!(message.contains(&target_path.display().to_string()));
|
||||
assert!(message.contains(target_path));
|
||||
assert!(message.contains("does not belong to the current storage"));
|
||||
}
|
||||
}
|
||||
@@ -606,20 +645,20 @@ mod fs_tests {
|
||||
// Check that you get an error if the size parameter doesn't match the actual
|
||||
// size of the stream.
|
||||
storage
|
||||
.upload(content.clone(), 0, &id, None)
|
||||
.upload(Box::new(content.clone()), 0, &id, None)
|
||||
.await
|
||||
.expect_err("upload with zero size succeeded");
|
||||
storage
|
||||
.upload(content.clone(), 4, &id, None)
|
||||
.upload(Box::new(content.clone()), 4, &id, None)
|
||||
.await
|
||||
.expect_err("upload with too short size succeeded");
|
||||
storage
|
||||
.upload(content.clone(), 6, &id, None)
|
||||
.upload(Box::new(content.clone()), 6, &id, None)
|
||||
.await
|
||||
.expect_err("upload with too large size succeeded");
|
||||
|
||||
// Correct size is 5, this should succeed.
|
||||
storage.upload(content, 5, &id, None).await?;
|
||||
storage.upload(Box::new(content), 5, &id, None).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -643,8 +682,8 @@ mod fs_tests {
|
||||
"We should upload and download the same contents"
|
||||
);
|
||||
|
||||
let non_existing_path = PathBuf::from("somewhere").join("else");
|
||||
match storage.download(&non_existing_path).await {
|
||||
let non_existing_path = "somewhere/else";
|
||||
match storage.download(&RemoteObjectId(non_existing_path.to_string())).await {
|
||||
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
|
||||
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
|
||||
}
|
||||
@@ -783,7 +822,7 @@ mod fs_tests {
|
||||
Err(e) => {
|
||||
let error_string = e.to_string();
|
||||
assert!(error_string.contains("does not exist"));
|
||||
assert!(error_string.contains(&upload_target.display().to_string()));
|
||||
assert!(error_string.contains(&upload_target.0));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -844,15 +883,19 @@ mod fs_tests {
|
||||
storage: &LocalFs,
|
||||
name: &str,
|
||||
metadata: Option<StorageMetadata>,
|
||||
) -> anyhow::Result<PathBuf> {
|
||||
) -> anyhow::Result<RemoteObjectId> {
|
||||
let timeline_path = workdir.join("timelines").join("some_timeline");
|
||||
let relative_timeline_path = timeline_path.strip_prefix(&workdir)?;
|
||||
let storage_path = storage.storage_root.join(relative_timeline_path).join(name);
|
||||
let remote_object_id = RemoteObjectId(storage_path.to_str().unwrap().to_string());
|
||||
|
||||
let from_path = storage.working_directory.join(name);
|
||||
let (file, size) = create_file_for_upload(&from_path, &dummy_contents(name)).await?;
|
||||
storage.upload(file, size, &storage_path, metadata).await?;
|
||||
Ok(storage_path)
|
||||
|
||||
storage
|
||||
.upload(Box::new(file), size, &remote_object_id, metadata)
|
||||
.await?;
|
||||
remote_object_id_from_path(&storage_path)
|
||||
}
|
||||
|
||||
async fn create_file_for_upload(
|
||||
@@ -877,9 +920,9 @@ mod fs_tests {
|
||||
format!("contents for {name}")
|
||||
}
|
||||
|
||||
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {
|
||||
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<RemoteObjectId>> {
|
||||
let mut files = storage.list().await?;
|
||||
files.sort();
|
||||
files.sort_by(|a, b| a.0.cmp(&b.0));
|
||||
Ok(files)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,10 @@ use tokio::{io, sync::Semaphore};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};
|
||||
use crate::{
|
||||
strip_path_prefix, Download, DownloadError, RemoteObjectId, RemoteStorage, S3Config,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
};
|
||||
|
||||
use super::StorageMetadata;
|
||||
|
||||
@@ -88,50 +91,26 @@ pub(super) mod metrics {
|
||||
}
|
||||
}
|
||||
|
||||
const S3_PREFIX_SEPARATOR: char = '/';
|
||||
fn download_destination(
|
||||
id: &RemoteObjectId,
|
||||
workdir: &Path,
|
||||
prefix_to_strip: Option<&str>,
|
||||
) -> PathBuf {
|
||||
let path_without_prefix = match prefix_to_strip {
|
||||
Some(prefix) => id.0.strip_prefix(prefix).unwrap_or_else(|| {
|
||||
panic!(
|
||||
"Could not strip prefix '{}' from S3 object key '{}'",
|
||||
prefix, id.0
|
||||
)
|
||||
}),
|
||||
None => &id.0,
|
||||
};
|
||||
|
||||
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
|
||||
pub struct S3ObjectKey(String);
|
||||
|
||||
impl S3ObjectKey {
|
||||
/// Turn a/b/c or a/b/c/ into c
|
||||
pub fn object_name(&self) -> Option<&str> {
|
||||
// corner case, char::to_string is not const, thats why this is more verbose than it needs to be
|
||||
// see https://github.com/rust-lang/rust/issues/88674
|
||||
if self.0.len() == 1 && self.0.chars().next().unwrap() == S3_PREFIX_SEPARATOR {
|
||||
return None;
|
||||
}
|
||||
|
||||
if self.0.ends_with(S3_PREFIX_SEPARATOR) {
|
||||
self.0.rsplit(S3_PREFIX_SEPARATOR).nth(1)
|
||||
} else {
|
||||
self.0
|
||||
.rsplit_once(S3_PREFIX_SEPARATOR)
|
||||
.map(|(_, last)| last)
|
||||
}
|
||||
}
|
||||
|
||||
fn key(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
|
||||
fn download_destination(&self, workdir: &Path, prefix_to_strip: Option<&str>) -> PathBuf {
|
||||
let path_without_prefix = match prefix_to_strip {
|
||||
Some(prefix) => self.0.strip_prefix(prefix).unwrap_or_else(|| {
|
||||
panic!(
|
||||
"Could not strip prefix '{}' from S3 object key '{}'",
|
||||
prefix, self.0
|
||||
)
|
||||
}),
|
||||
None => &self.0,
|
||||
};
|
||||
|
||||
workdir.join(
|
||||
path_without_prefix
|
||||
.split(S3_PREFIX_SEPARATOR)
|
||||
.collect::<PathBuf>(),
|
||||
)
|
||||
}
|
||||
workdir.join(
|
||||
path_without_prefix
|
||||
.split(REMOTE_STORAGE_PREFIX_SEPARATOR)
|
||||
.collect::<PathBuf>(),
|
||||
)
|
||||
}
|
||||
|
||||
/// AWS S3 storage.
|
||||
@@ -193,12 +172,12 @@ impl S3Bucket {
|
||||
|
||||
let prefix_in_bucket = aws_config.prefix_in_bucket.as_deref().map(|prefix| {
|
||||
let mut prefix = prefix;
|
||||
while prefix.starts_with(S3_PREFIX_SEPARATOR) {
|
||||
while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
prefix = &prefix[1..]
|
||||
}
|
||||
|
||||
let mut prefix = prefix.to_string();
|
||||
while prefix.ends_with(S3_PREFIX_SEPARATOR) {
|
||||
while prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
prefix.pop();
|
||||
}
|
||||
prefix
|
||||
@@ -249,23 +228,25 @@ impl S3Bucket {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl RemoteStorage for S3Bucket {
|
||||
type RemoteObjectId = S3ObjectKey;
|
||||
|
||||
fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<Self::RemoteObjectId> {
|
||||
fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<RemoteObjectId> {
|
||||
let relative_path = strip_path_prefix(&self.workdir, local_path)?;
|
||||
let mut key = self.prefix_in_bucket.clone().unwrap_or_default();
|
||||
for segment in relative_path {
|
||||
key.push(S3_PREFIX_SEPARATOR);
|
||||
key.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
key.push_str(&segment.to_string_lossy());
|
||||
}
|
||||
Ok(S3ObjectKey(key))
|
||||
Ok(RemoteObjectId(key))
|
||||
}
|
||||
|
||||
fn local_path(&self, storage_path: &Self::RemoteObjectId) -> anyhow::Result<PathBuf> {
|
||||
Ok(storage_path.download_destination(&self.workdir, self.prefix_in_bucket.as_deref()))
|
||||
fn local_path(&self, storage_path: &RemoteObjectId) -> anyhow::Result<PathBuf> {
|
||||
Ok(download_destination(
|
||||
storage_path,
|
||||
&self.workdir,
|
||||
self.prefix_in_bucket.as_deref(),
|
||||
))
|
||||
}
|
||||
|
||||
async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
async fn list(&self) -> anyhow::Result<Vec<RemoteObjectId>> {
|
||||
let mut document_keys = Vec::new();
|
||||
|
||||
let mut continuation_token = None;
|
||||
@@ -296,7 +277,7 @@ impl RemoteStorage for S3Bucket {
|
||||
.contents
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter_map(|o| Some(S3ObjectKey(o.key?))),
|
||||
.filter_map(|o| Some(RemoteObjectId(o.key?))),
|
||||
);
|
||||
|
||||
match fetch_response.continuation_token {
|
||||
@@ -312,8 +293,8 @@ impl RemoteStorage for S3Bucket {
|
||||
/// Note: it wont include empty "directories"
|
||||
async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<&Self::RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
prefix: Option<&RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<RemoteObjectId>> {
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let list_prefix = prefix
|
||||
.map(|p| p.0.clone())
|
||||
@@ -321,8 +302,8 @@ impl RemoteStorage for S3Bucket {
|
||||
.map(|mut p| {
|
||||
// required to end with a separator
|
||||
// otherwise request will return only the entry of a prefix
|
||||
if !p.ends_with(S3_PREFIX_SEPARATOR) {
|
||||
p.push(S3_PREFIX_SEPARATOR);
|
||||
if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
|
||||
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
}
|
||||
p
|
||||
});
|
||||
@@ -345,7 +326,7 @@ impl RemoteStorage for S3Bucket {
|
||||
bucket: self.bucket_name.clone(),
|
||||
prefix: list_prefix.clone(),
|
||||
continuation_token,
|
||||
delimiter: Some(S3_PREFIX_SEPARATOR.to_string()),
|
||||
delimiter: Some(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()),
|
||||
..ListObjectsV2Request::default()
|
||||
})
|
||||
.await
|
||||
@@ -359,7 +340,7 @@ impl RemoteStorage for S3Bucket {
|
||||
.common_prefixes
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.filter_map(|o| Some(S3ObjectKey(o.prefix?))),
|
||||
.filter_map(|o| Some(RemoteObjectId(o.prefix?))),
|
||||
);
|
||||
|
||||
match fetch_response.continuation_token {
|
||||
@@ -373,9 +354,9 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
async fn upload(
|
||||
&self,
|
||||
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
|
||||
from_size_bytes: usize,
|
||||
to: &Self::RemoteObjectId,
|
||||
to: &RemoteObjectId,
|
||||
metadata: Option<StorageMetadata>,
|
||||
) -> anyhow::Result<()> {
|
||||
let _guard = self
|
||||
@@ -392,7 +373,7 @@ impl RemoteStorage for S3Bucket {
|
||||
from_size_bytes,
|
||||
)),
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: to.key().to_owned(),
|
||||
key: to.0.to_owned(),
|
||||
metadata: metadata.map(|m| m.0),
|
||||
..PutObjectRequest::default()
|
||||
})
|
||||
@@ -404,10 +385,10 @@ impl RemoteStorage for S3Bucket {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download(&self, from: &Self::RemoteObjectId) -> Result<Download, DownloadError> {
|
||||
async fn download(&self, from: &RemoteObjectId) -> Result<Download, DownloadError> {
|
||||
self.download_object(GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: from.key().to_owned(),
|
||||
key: from.0.to_owned(),
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await
|
||||
@@ -415,7 +396,7 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
async fn download_byte_range(
|
||||
&self,
|
||||
from: &Self::RemoteObjectId,
|
||||
from: &RemoteObjectId,
|
||||
start_inclusive: u64,
|
||||
end_exclusive: Option<u64>,
|
||||
) -> Result<Download, DownloadError> {
|
||||
@@ -429,14 +410,14 @@ impl RemoteStorage for S3Bucket {
|
||||
|
||||
self.download_object(GetObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: from.key().to_owned(),
|
||||
key: from.0.to_owned(),
|
||||
range,
|
||||
..GetObjectRequest::default()
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn delete(&self, path: &Self::RemoteObjectId) -> anyhow::Result<()> {
|
||||
async fn delete(&self, remote_object_id: &RemoteObjectId) -> anyhow::Result<()> {
|
||||
let _guard = self
|
||||
.concurrency_limiter
|
||||
.acquire()
|
||||
@@ -448,7 +429,7 @@ impl RemoteStorage for S3Bucket {
|
||||
self.client
|
||||
.delete_object(DeleteObjectRequest {
|
||||
bucket: self.bucket_name.clone(),
|
||||
key: path.key().to_owned(),
|
||||
key: remote_object_id.0.to_owned(),
|
||||
..DeleteObjectRequest::default()
|
||||
})
|
||||
.await
|
||||
@@ -467,43 +448,24 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn object_name() {
|
||||
let k = S3ObjectKey("a/b/c".to_owned());
|
||||
assert_eq!(k.object_name(), Some("c"));
|
||||
|
||||
let k = S3ObjectKey("a/b/c/".to_owned());
|
||||
assert_eq!(k.object_name(), Some("c"));
|
||||
|
||||
let k = S3ObjectKey("a/".to_owned());
|
||||
assert_eq!(k.object_name(), Some("a"));
|
||||
|
||||
// XXX is it impossible to have an empty key?
|
||||
let k = S3ObjectKey("".to_owned());
|
||||
assert_eq!(k.object_name(), None);
|
||||
|
||||
let k = S3ObjectKey("/".to_owned());
|
||||
assert_eq!(k.object_name(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn download_destination() -> anyhow::Result<()> {
|
||||
fn test_download_destination() -> anyhow::Result<()> {
|
||||
let workdir = tempdir()?.path().to_owned();
|
||||
let local_path = workdir.join("one").join("two").join("test_name");
|
||||
let relative_path = local_path.strip_prefix(&workdir)?;
|
||||
|
||||
let key = S3ObjectKey(format!(
|
||||
let key = RemoteObjectId(format!(
|
||||
"{}{}",
|
||||
S3_PREFIX_SEPARATOR,
|
||||
REMOTE_STORAGE_PREFIX_SEPARATOR,
|
||||
relative_path
|
||||
.iter()
|
||||
.map(|segment| segment.to_str().unwrap())
|
||||
.collect::<Vec<_>>()
|
||||
.join(&S3_PREFIX_SEPARATOR.to_string()),
|
||||
.join(&REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()),
|
||||
));
|
||||
|
||||
assert_eq!(
|
||||
local_path,
|
||||
key.download_destination(&workdir, None),
|
||||
download_destination(&key, &workdir, None),
|
||||
"Download destination should consist of s3 path joined with the workdir prefix"
|
||||
);
|
||||
|
||||
@@ -520,8 +482,8 @@ mod tests {
|
||||
|
||||
let storage = dummy_storage(workdir);
|
||||
|
||||
let expected_key = S3ObjectKey(format!(
|
||||
"{}{S3_PREFIX_SEPARATOR}{segment_1}{S3_PREFIX_SEPARATOR}{segment_2}",
|
||||
let expected_key = RemoteObjectId(format!(
|
||||
"{}{REMOTE_STORAGE_PREFIX_SEPARATOR}{segment_1}{REMOTE_STORAGE_PREFIX_SEPARATOR}{segment_2}",
|
||||
storage.prefix_in_bucket.as_deref().unwrap_or_default(),
|
||||
));
|
||||
|
||||
@@ -592,7 +554,7 @@ mod tests {
|
||||
storage.prefix_in_bucket.as_deref(),
|
||||
);
|
||||
assert_eq!(
|
||||
s3_key.download_destination(&workdir, storage.prefix_in_bucket.as_deref()),
|
||||
download_destination(&s3_key, &workdir, storage.prefix_in_bucket.as_deref()),
|
||||
storage
|
||||
.local_path(&s3_key)
|
||||
.expect("For a valid input, valid S3 info should be parsed"),
|
||||
@@ -604,7 +566,7 @@ mod tests {
|
||||
storage.prefix_in_bucket.as_deref(),
|
||||
);
|
||||
assert_eq!(
|
||||
s3_key.download_destination(&workdir, storage.prefix_in_bucket.as_deref()),
|
||||
download_destination(&s3_key, &workdir, storage.prefix_in_bucket.as_deref()),
|
||||
storage
|
||||
.local_path(&s3_key)
|
||||
.expect("For a valid input, valid S3 info should be parsed"),
|
||||
@@ -645,11 +607,11 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> S3ObjectKey {
|
||||
S3ObjectKey(relative_file_path.iter().fold(
|
||||
fn create_s3_key(relative_file_path: &Path, prefix: Option<&str>) -> RemoteObjectId {
|
||||
RemoteObjectId(relative_file_path.iter().fold(
|
||||
prefix.unwrap_or_default().to_string(),
|
||||
|mut path_string, segment| {
|
||||
path_string.push(S3_PREFIX_SEPARATOR);
|
||||
path_string.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
|
||||
path_string.push_str(segment.to_str().unwrap());
|
||||
path_string
|
||||
},
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Main entry point for the Page Server executable.
|
||||
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use std::{env, ops::ControlFlow, path::Path, str::FromStr, sync::Arc};
|
||||
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
|
||||
use tracing::*;
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
@@ -302,11 +302,13 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
let remote_storage = conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.map(|storage_config| GenericRemoteStorage::new(conf.workdir.clone(), storage_config))
|
||||
.map(|storage_config| {
|
||||
GenericRemoteStorage::from_config(conf.workdir.clone(), storage_config)
|
||||
})
|
||||
.transpose()
|
||||
.context("Failed to init generic remote storage")?
|
||||
.map(Arc::new);
|
||||
let remote_index = tenant_mgr::init_tenant_mgr(conf, remote_storage.as_ref().map(Arc::clone))?;
|
||||
.context("Failed to init generic remote storage")?;
|
||||
|
||||
let remote_index = tenant_mgr::init_tenant_mgr(conf, remote_storage.clone())?;
|
||||
|
||||
// Spawn a new thread for the http endpoint
|
||||
// bind before launching separate thread so the error reported before startup exits
|
||||
|
||||
@@ -34,7 +34,7 @@ struct State {
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: RemoteIndex,
|
||||
allowlist_routes: Vec<Uri>,
|
||||
remote_storage: Option<Arc<GenericRemoteStorage>>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
@@ -42,7 +42,7 @@ impl State {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: RemoteIndex,
|
||||
remote_storage: Option<Arc<GenericRemoteStorage>>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
|
||||
.iter()
|
||||
@@ -659,7 +659,7 @@ pub fn make_router(
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: RemoteIndex,
|
||||
remote_storage: Option<Arc<GenericRemoteStorage>>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
|
||||
let spec = include_bytes!("openapi_spec.yml");
|
||||
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
|
||||
|
||||
@@ -150,7 +150,7 @@ use std::{
|
||||
num::{NonZeroU32, NonZeroUsize},
|
||||
ops::ControlFlow,
|
||||
path::{Path, PathBuf},
|
||||
sync::{Arc, Condvar, Mutex},
|
||||
sync::{Condvar, Mutex},
|
||||
};
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
@@ -222,7 +222,7 @@ pub struct SyncStartupData {
|
||||
/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states.
|
||||
pub fn start_local_timeline_sync(
|
||||
config: &'static PageServerConf,
|
||||
storage: Option<Arc<GenericRemoteStorage>>,
|
||||
storage: Option<GenericRemoteStorage>,
|
||||
) -> anyhow::Result<SyncStartupData> {
|
||||
let local_timeline_files = local_tenant_timeline_files(config)
|
||||
.context("Failed to collect local tenant timeline files")?;
|
||||
@@ -766,7 +766,7 @@ pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) {
|
||||
pub(super) fn spawn_storage_sync_thread(
|
||||
conf: &'static PageServerConf,
|
||||
local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
storage: GenericRemoteStorage,
|
||||
max_concurrent_timelines_sync: NonZeroUsize,
|
||||
max_sync_errors: NonZeroU32,
|
||||
) -> anyhow::Result<SyncStartupData> {
|
||||
@@ -825,12 +825,12 @@ pub(super) fn spawn_storage_sync_thread(
|
||||
fn storage_sync_loop(
|
||||
runtime: Runtime,
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue): (Arc<GenericRemoteStorage>, RemoteIndex, &SyncQueue),
|
||||
(storage, index, sync_queue): (GenericRemoteStorage, RemoteIndex, &SyncQueue),
|
||||
max_sync_errors: NonZeroU32,
|
||||
) {
|
||||
info!("Starting remote storage sync loop");
|
||||
loop {
|
||||
let loop_storage = Arc::clone(&storage);
|
||||
let loop_storage = storage.clone();
|
||||
|
||||
let (batched_tasks, remaining_queue_length) = sync_queue.next_task_batch();
|
||||
|
||||
@@ -939,7 +939,7 @@ enum UploadStatus {
|
||||
async fn process_batches(
|
||||
conf: &'static PageServerConf,
|
||||
max_sync_errors: NonZeroU32,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
storage: GenericRemoteStorage,
|
||||
index: &RemoteIndex,
|
||||
batched_tasks: HashMap<ZTenantTimelineId, SyncTaskBatch>,
|
||||
sync_queue: &SyncQueue,
|
||||
@@ -947,7 +947,7 @@ async fn process_batches(
|
||||
let mut sync_results = batched_tasks
|
||||
.into_iter()
|
||||
.map(|(sync_id, batch)| {
|
||||
let storage = Arc::clone(&storage);
|
||||
let storage = storage.clone();
|
||||
let index = index.clone();
|
||||
async move {
|
||||
let state_update = process_sync_task_batch(
|
||||
@@ -981,7 +981,7 @@ async fn process_batches(
|
||||
|
||||
async fn process_sync_task_batch(
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue): (Arc<GenericRemoteStorage>, RemoteIndex, &SyncQueue),
|
||||
(storage, index, sync_queue): (GenericRemoteStorage, RemoteIndex, &SyncQueue),
|
||||
max_sync_errors: NonZeroU32,
|
||||
sync_id: ZTenantTimelineId,
|
||||
batch: SyncTaskBatch,
|
||||
@@ -1009,7 +1009,7 @@ async fn process_sync_task_batch(
|
||||
ControlFlow::Continue(()) => {
|
||||
upload_timeline_data(
|
||||
conf,
|
||||
(storage.as_ref(), &index, sync_queue),
|
||||
(&storage, &index, sync_queue),
|
||||
current_remote_timeline.as_ref(),
|
||||
sync_id,
|
||||
upload_data,
|
||||
@@ -1020,7 +1020,7 @@ async fn process_sync_task_batch(
|
||||
}
|
||||
ControlFlow::Break(()) => match update_remote_data(
|
||||
conf,
|
||||
storage.as_ref(),
|
||||
&storage,
|
||||
&index,
|
||||
sync_id,
|
||||
RemoteDataUpdate::Upload {
|
||||
@@ -1053,7 +1053,7 @@ async fn process_sync_task_batch(
|
||||
ControlFlow::Continue(()) => {
|
||||
return download_timeline_data(
|
||||
conf,
|
||||
(storage.as_ref(), &index, sync_queue),
|
||||
(&storage, &index, sync_queue),
|
||||
current_remote_timeline.as_ref(),
|
||||
sync_id,
|
||||
download_data,
|
||||
@@ -1086,7 +1086,7 @@ async fn process_sync_task_batch(
|
||||
ControlFlow::Continue(()) => {
|
||||
delete_timeline_data(
|
||||
conf,
|
||||
(storage.as_ref(), &index, sync_queue),
|
||||
(&storage, &index, sync_queue),
|
||||
sync_id,
|
||||
delete_data,
|
||||
sync_start,
|
||||
@@ -1098,7 +1098,7 @@ async fn process_sync_task_batch(
|
||||
ControlFlow::Break(()) => {
|
||||
if let Err(e) = update_remote_data(
|
||||
conf,
|
||||
storage.as_ref(),
|
||||
&storage,
|
||||
&index,
|
||||
sync_id,
|
||||
RemoteDataUpdate::Delete(&delete_data.data.deleted_layers),
|
||||
|
||||
@@ -7,15 +7,15 @@ use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::storage_sync::{SyncQueue, SyncTask};
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorage};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use super::{LayersDeletion, SyncData};
|
||||
|
||||
/// Attempts to remove the timleline layers from the remote storage.
|
||||
/// If the task had not adjusted the metadata before, the deletion will fail.
|
||||
pub(super) async fn delete_timeline_layers<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
pub(super) async fn delete_timeline_layers(
|
||||
storage: &GenericRemoteStorage,
|
||||
sync_queue: &SyncQueue,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut delete_data: SyncData<LayersDeletion>,
|
||||
@@ -43,14 +43,7 @@ pub(super) async fn delete_timeline_layers<'a>(
|
||||
let mut delete_tasks = layers_to_delete
|
||||
.into_iter()
|
||||
.map(|local_layer_path| async {
|
||||
match match storage {
|
||||
GenericRemoteStorage::Local(storage) => {
|
||||
remove_storage_object(storage, &local_layer_path).await
|
||||
}
|
||||
GenericRemoteStorage::S3(storage) => {
|
||||
remove_storage_object(storage, &local_layer_path).await
|
||||
}
|
||||
} {
|
||||
match remove_storage_object(storage, &local_layer_path).await {
|
||||
Ok(()) => Ok(local_layer_path),
|
||||
Err(e) => Err((e, local_layer_path)),
|
||||
}
|
||||
@@ -88,11 +81,10 @@ pub(super) async fn delete_timeline_layers<'a>(
|
||||
errored
|
||||
}
|
||||
|
||||
async fn remove_storage_object<P, S>(storage: &S, local_layer_path: &Path) -> anyhow::Result<()>
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
async fn remove_storage_object(
|
||||
storage: &GenericRemoteStorage,
|
||||
local_layer_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
let storage_path = storage
|
||||
.remote_object_id(local_layer_path)
|
||||
.with_context(|| {
|
||||
@@ -132,7 +124,7 @@ mod tests {
|
||||
let harness = RepoHarness::create("delete_timeline_negative")?;
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
let storage = GenericRemoteStorage::new(LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
@@ -167,7 +159,7 @@ mod tests {
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "c", "d"];
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
let storage = GenericRemoteStorage::new(LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
@@ -180,7 +172,8 @@ mod tests {
|
||||
let timeline_upload =
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
|
||||
for local_path in timeline_upload.layers_to_upload {
|
||||
let remote_path = local_storage.remote_object_id(&local_path)?;
|
||||
let remote_path =
|
||||
local_storage.resolve_in_storage(&local_storage.remote_object_id(&local_path)?)?;
|
||||
let remote_parent_dir = remote_path.parent().unwrap();
|
||||
if !remote_parent_dir.exists() {
|
||||
fs::create_dir_all(&remote_parent_dir).await?;
|
||||
|
||||
@@ -9,9 +9,7 @@ use std::{
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use remote_storage::{
|
||||
path_with_suffix_extension, DownloadError, GenericRemoteStorage, RemoteStorage,
|
||||
};
|
||||
use remote_storage::{path_with_suffix_extension, DownloadError, GenericRemoteStorage};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{self, AsyncWriteExt},
|
||||
@@ -371,68 +369,6 @@ async fn get_timeline_sync_ids(
|
||||
tenant_path: &Path,
|
||||
tenant_id: ZTenantId,
|
||||
) -> anyhow::Result<HashSet<ZTenantTimelineId>> {
|
||||
let timeline_ids: Vec<ZTimelineId> = match storage {
|
||||
GenericRemoteStorage::Local(storage) => list_prefixes(storage, tenant_path)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|timeline_directory_path| {
|
||||
timeline_directory_path
|
||||
.file_stem()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get timeline id string from file '{}'",
|
||||
timeline_directory_path.display()
|
||||
)
|
||||
})?
|
||||
.to_string_lossy()
|
||||
.as_ref()
|
||||
.parse()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to parse directory name '{}' as timeline id",
|
||||
timeline_directory_path.display()
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect::<anyhow::Result<_>>(),
|
||||
GenericRemoteStorage::S3(storage) => list_prefixes(storage, tenant_path)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|s3_path| {
|
||||
s3_path
|
||||
.object_name()
|
||||
.with_context(|| {
|
||||
format!("Failed to get object name out of S3 path {s3_path:?}")
|
||||
})?
|
||||
.parse()
|
||||
.with_context(|| {
|
||||
format!("failed to parse object name '{s3_path:?}' as timeline id")
|
||||
})
|
||||
})
|
||||
.collect::<anyhow::Result<_>>(),
|
||||
}
|
||||
.with_context(|| {
|
||||
format!("Tenant {tenant_id} has at least one incorrect timeline subdirectory")
|
||||
})?;
|
||||
|
||||
if timeline_ids.is_empty() {
|
||||
anyhow::bail!("no timelines found on the remote storage for tenant {tenant_id}")
|
||||
}
|
||||
|
||||
Ok(timeline_ids
|
||||
.into_iter()
|
||||
.map(|timeline_id| ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn list_prefixes<P, S>(storage: &S, tenant_path: &Path) -> anyhow::Result<Vec<P>>
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let tenant_storage_path = storage.remote_object_id(tenant_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to get tenant storage path for local path '{}'",
|
||||
@@ -440,14 +376,37 @@ where
|
||||
)
|
||||
})?;
|
||||
|
||||
storage
|
||||
let timelines = storage
|
||||
.list_prefixes(Some(&tenant_storage_path))
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to list tenant storage path {tenant_storage_path:?} to get remote timelines to download"
|
||||
)
|
||||
})
|
||||
})?;
|
||||
|
||||
if timelines.is_empty() {
|
||||
anyhow::bail!("no timelines found on the remote storage")
|
||||
}
|
||||
|
||||
let mut sync_ids = HashSet::new();
|
||||
|
||||
for timeline_remote_storage_key in timelines {
|
||||
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
|
||||
})?;
|
||||
|
||||
let timeline_id: ZTimelineId = object_name.parse().with_context(|| {
|
||||
format!("failed to parse object name into timeline id '{object_name}'")
|
||||
})?;
|
||||
|
||||
sync_ids.insert(ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(sync_ids)
|
||||
}
|
||||
|
||||
async fn fsync_path(path: impl AsRef<Path>) -> Result<(), io::Error> {
|
||||
@@ -459,6 +418,7 @@ mod tests {
|
||||
use std::{
|
||||
collections::{BTreeSet, HashSet},
|
||||
num::NonZeroUsize,
|
||||
path::PathBuf,
|
||||
};
|
||||
|
||||
use remote_storage::{LocalFs, RemoteStorage};
|
||||
@@ -482,7 +442,7 @@ mod tests {
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"];
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
let storage = GenericRemoteStorage::new(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
@@ -494,7 +454,8 @@ mod tests {
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
|
||||
|
||||
for local_path in timeline_upload.layers_to_upload {
|
||||
let remote_path = local_storage.remote_object_id(&local_path)?;
|
||||
let remote_path =
|
||||
local_storage.resolve_in_storage(&storage.remote_object_id(&local_path)?)?;
|
||||
let remote_parent_dir = remote_path.parent().unwrap();
|
||||
if !remote_parent_dir.exists() {
|
||||
fs::create_dir_all(&remote_parent_dir).await?;
|
||||
@@ -580,7 +541,7 @@ mod tests {
|
||||
let harness = RepoHarness::create("download_timeline_negatives")?;
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
let storage = GenericRemoteStorage::new(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
@@ -639,7 +600,7 @@ mod tests {
|
||||
let harness = RepoHarness::create("test_download_index_part")?;
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
let storage = GenericRemoteStorage::new(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
@@ -663,9 +624,10 @@ mod tests {
|
||||
let local_index_part_path =
|
||||
metadata_path(harness.conf, sync_id.timeline_id, sync_id.tenant_id)
|
||||
.with_file_name(IndexPart::FILE_NAME);
|
||||
let storage_path = local_storage.remote_object_id(&local_index_part_path)?;
|
||||
fs::create_dir_all(storage_path.parent().unwrap()).await?;
|
||||
fs::write(&storage_path, serde_json::to_vec(&index_part)?).await?;
|
||||
let index_part_remote_id = local_storage.remote_object_id(&local_index_part_path)?;
|
||||
let index_part_local_path = PathBuf::from(String::from(index_part_remote_id));
|
||||
fs::create_dir_all(index_part_local_path.parent().unwrap()).await?;
|
||||
fs::write(&index_part_local_path, serde_json::to_vec(&index_part)?).await?;
|
||||
|
||||
let downloaded_index_part = download_index_part(harness.conf, &storage, sync_id).await?;
|
||||
|
||||
|
||||
@@ -34,7 +34,11 @@ pub(super) async fn upload_index_part(
|
||||
let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
|
||||
.with_file_name(IndexPart::FILE_NAME);
|
||||
storage
|
||||
.upload_storage_object(index_part_bytes, index_part_size, &index_part_path)
|
||||
.upload_storage_object(
|
||||
Box::new(index_part_bytes),
|
||||
index_part_size,
|
||||
&index_part_path,
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to upload index part for '{sync_id}'"))
|
||||
}
|
||||
@@ -119,7 +123,7 @@ pub(super) async fn upload_timeline_layers<'a>(
|
||||
.len() as usize;
|
||||
|
||||
match storage
|
||||
.upload_storage_object(source_file, source_size, &source_path)
|
||||
.upload_storage_object(Box::new(source_file), source_size, &source_path)
|
||||
.await
|
||||
.with_context(|| format!("Failed to upload layer file for {sync_id}"))
|
||||
{
|
||||
@@ -214,8 +218,8 @@ mod tests {
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let layer_files = ["a", "b"];
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
let storage = GenericRemoteStorage::new(LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
let local_storage = storage.as_local().unwrap();
|
||||
@@ -302,7 +306,7 @@ mod tests {
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let layer_files = ["a1", "b1"];
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
let storage = GenericRemoteStorage::new(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
@@ -395,7 +399,7 @@ mod tests {
|
||||
let harness = RepoHarness::create("test_upload_index_part")?;
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
let storage = GenericRemoteStorage::new(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
@@ -431,13 +435,13 @@ mod tests {
|
||||
|
||||
let index_part_path = storage_files.first().unwrap();
|
||||
assert_eq!(
|
||||
index_part_path.file_name().and_then(|name| name.to_str()),
|
||||
index_part_path.object_name(),
|
||||
Some(IndexPart::FILE_NAME),
|
||||
"Remote index part should have the correct name"
|
||||
);
|
||||
|
||||
let remote_index_part: IndexPart =
|
||||
serde_json::from_slice(&fs::read(&index_part_path).await?)?;
|
||||
let remote_index_part: IndexPart = serde_json::from_slice(
|
||||
&fs::read(local_storage.resolve_in_storage(index_part_path)?).await?,
|
||||
)?;
|
||||
assert_eq!(
|
||||
index_part, remote_index_part,
|
||||
"Remote index part should match the local one"
|
||||
|
||||
@@ -134,7 +134,7 @@ impl fmt::Display for TenantState {
|
||||
/// are scheduled for download and added to the repository once download is completed.
|
||||
pub fn init_tenant_mgr(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<Arc<GenericRemoteStorage>>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
) -> anyhow::Result<RemoteIndex> {
|
||||
let (timeline_updates_sender, timeline_updates_receiver) =
|
||||
mpsc::unbounded_channel::<LocalTimelineUpdate>();
|
||||
|
||||
@@ -127,7 +127,8 @@ async fn wal_backup_launcher_main_loop(
|
||||
let conf_ = conf.clone();
|
||||
REMOTE_STORAGE.get_or_init(|| {
|
||||
conf_.remote_storage.as_ref().map(|c| {
|
||||
GenericRemoteStorage::new(conf_.workdir, c).expect("failed to create remote storage")
|
||||
GenericRemoteStorage::from_config(conf_.workdir, c)
|
||||
.expect("failed to create remote storage")
|
||||
})
|
||||
});
|
||||
|
||||
@@ -417,7 +418,11 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
|
||||
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
|
||||
|
||||
async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
|
||||
let storage = REMOTE_STORAGE.get().expect("failed to get remote storage");
|
||||
let storage = REMOTE_STORAGE
|
||||
.get()
|
||||
.expect("failed to get remote storage")
|
||||
.as_ref()
|
||||
.unwrap();
|
||||
|
||||
let file = tokio::io::BufReader::new(File::open(&source_file).await.with_context(|| {
|
||||
format!(
|
||||
@@ -427,9 +432,7 @@ async fn backup_object(source_file: &Path, size: usize) -> Result<()> {
|
||||
})?);
|
||||
|
||||
storage
|
||||
.as_ref()
|
||||
.expect("Storage should be initialized by launcher at this point.")
|
||||
.upload_storage_object(file, size, source_file)
|
||||
.upload_storage_object(Box::new(file), size, source_file)
|
||||
.await
|
||||
}