Reduce "relish" word usages in remote storage

Kirill Bulatov
2021-11-04 01:43:22 +02:00
committed by Kirill Bulatov
parent 956fc3dec9
commit f36acf00de
11 changed files with 261 additions and 274 deletions

View File

@@ -129,7 +129,7 @@ There are the following implementations present:
* local filesystem — to use in tests mainly
* AWS S3 - to use in production
Implementation details are covered in the [storage readme](./src/relish_storage/README.md) and corresponding Rust file docs.
Implementation details are covered in the [backup readme](./src/remote_storage/README.md) and corresponding Rust file docs.
The backup service is disabled by default and can be enabled to interact with a single remote storage.
@@ -140,18 +140,18 @@ CLI examples:
For Amazon AWS S3, the key id and secret access key can be found in `~/.aws/credentials` (if awscli was ever configured to work with the desired bucket) or on the AWS Settings page for a certain user. Also note that bucket names do not contain any protocols when used on AWS.
For local S3 installations, refer to their documentation for the name format and credentials.
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup backup targets.
Similar to other pageserver settings, toml config file can be used to configure either of the storages as backup targets.
Required sections are:
```toml
[relish_storage]
[remote_storage]
local_path = '/Users/someonetoignore/Downloads/tmp_dir/'
```
or
```toml
[relish_storage]
[remote_storage]
bucket_name = 'some-sample-bucket'
bucket_region = 'eu-north-1'
access_key_id = 'SOMEKEYAAAAASADSAH*#'

View File

@@ -26,8 +26,8 @@ use clap::{App, Arg, ArgMatches};
use daemonize::Daemonize;
use pageserver::{
branches, defaults::*, http, page_service, relish_storage, tenant_mgr, PageServerConf,
RelishStorageConfig, RelishStorageKind, S3Config, LOG_FILE_NAME,
branches, defaults::*, http, page_service, remote_storage, tenant_mgr, PageServerConf,
RemoteStorageConfig, RemoteStorageKind, S3Config, LOG_FILE_NAME,
};
use zenith_utils::http::endpoint;
use zenith_utils::postgres_backend;
@@ -47,22 +47,22 @@ struct CfgFileParams {
pg_distrib_dir: Option<String>,
auth_validation_public_key_path: Option<String>,
auth_type: Option<String>,
relish_storage_max_concurrent_sync: Option<String>,
remote_storage_max_concurrent_sync: Option<String>,
/////////////////////////////////
//// Don't put `Option<String>` and other "simple" values below.
////
/// `Option<RelishStorage>` is a <a href='https://toml.io/en/v1.0.0#table'>table</a> in TOML.
/// `Option<RemoteStorage>` is a <a href='https://toml.io/en/v1.0.0#table'>table</a> in TOML.
/// Values in TOML cannot be defined after tables (other tables can),
/// and the [`toml`] crate serializes all fields in the order of their appearance (see the sketch after this struct).
////////////////////////////////
relish_storage: Option<RelishStorage>,
remote_storage: Option<RemoteStorage>,
}
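The ordering constraint described in the comment above can be demonstrated with a tiny standalone sketch (illustrative only, not part of this codebase: `BadOrder` and its fields are made up, and the behavior assumed is that of the toml 0.5-era serializer used at the time):

```rust
use serde::Serialize;

#[derive(Serialize)]
struct RemoteStorage {
    local_path: String,
}

#[derive(Serialize)]
struct BadOrder {
    // Serialized as a `[remote_storage]` table...
    remote_storage: RemoteStorage,
    // ...so this plain value would have to be emitted after a table,
    // which TOML forbids: serialization fails with a `ValueAfterTable` error.
    listen_addr: String,
}

fn main() {
    let params = BadOrder {
        remote_storage: RemoteStorage { local_path: "/tmp/storage".to_string() },
        listen_addr: "127.0.0.1:64000".to_string(),
    };
    assert!(toml::to_string(&params).is_err());
    // Declaring `listen_addr` before `remote_storage` makes serialization succeed,
    // which is why the simple fields stay above the table-like ones in `CfgFileParams`.
}
```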
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone)]
// Without this attribute, enums with values won't be serialized by the `toml` library (but can be deserialized nonetheless!).
// See https://github.com/alexcrichton/toml-rs/blob/6c162e6562c3e432bf04c82a3d1d789d80761a86/examples/enum_external.rs for the examples
#[serde(untagged)]
enum RelishStorage {
enum RemoteStorage {
Local {
local_path: String,
},
@@ -83,12 +83,12 @@ impl CfgFileParams {
arg_matches.value_of(arg_name).map(str::to_owned)
};
let relish_storage = if let Some(local_path) = get_arg("relish-storage-local-path") {
Some(RelishStorage::Local { local_path })
let remote_storage = if let Some(local_path) = get_arg("relish-storage-local-path") {
Some(RemoteStorage::Local { local_path })
} else if let Some((bucket_name, bucket_region)) =
get_arg("relish-storage-s3-bucket").zip(get_arg("relish-storage-region"))
{
Some(RelishStorage::AwsS3 {
Some(RemoteStorage::AwsS3 {
bucket_name,
bucket_region,
access_key_id: get_arg("relish-storage-access-key"),
@@ -109,8 +109,8 @@ impl CfgFileParams {
pg_distrib_dir: get_arg("postgres-distrib"),
auth_validation_public_key_path: get_arg("auth-validation-public-key-path"),
auth_type: get_arg("auth-type"),
relish_storage,
relish_storage_max_concurrent_sync: get_arg("relish-storage-max-concurrent-sync"),
remote_storage,
remote_storage_max_concurrent_sync: get_arg("relish-storage-max-concurrent-sync"),
}
}
@@ -130,10 +130,10 @@ impl CfgFileParams {
.auth_validation_public_key_path
.or(other.auth_validation_public_key_path),
auth_type: self.auth_type.or(other.auth_type),
relish_storage: self.relish_storage.or(other.relish_storage),
relish_storage_max_concurrent_sync: self
.relish_storage_max_concurrent_sync
.or(other.relish_storage_max_concurrent_sync),
remote_storage: self.remote_storage.or(other.remote_storage),
remote_storage_max_concurrent_sync: self
.remote_storage_max_concurrent_sync
.or(other.remote_storage_max_concurrent_sync),
}
}
@@ -207,30 +207,28 @@ impl CfgFileParams {
);
}
let max_concurrent_sync = match self.relish_storage_max_concurrent_sync.as_deref() {
Some(relish_storage_max_concurrent_sync) => {
relish_storage_max_concurrent_sync.parse()?
}
None => DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS,
let max_concurrent_sync = match self.remote_storage_max_concurrent_sync.as_deref() {
Some(number_str) => number_str.parse()?,
None => DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC_LIMITS,
};
let relish_storage_config = self.relish_storage.as_ref().map(|storage_params| {
let remote_storage_config = self.remote_storage.as_ref().map(|storage_params| {
let storage = match storage_params.clone() {
RelishStorage::Local { local_path } => {
RelishStorageKind::LocalFs(PathBuf::from(local_path))
RemoteStorage::Local { local_path } => {
RemoteStorageKind::LocalFs(PathBuf::from(local_path))
}
RelishStorage::AwsS3 {
RemoteStorage::AwsS3 {
bucket_name,
bucket_region,
access_key_id,
secret_access_key,
} => RelishStorageKind::AwsS3(S3Config {
} => RemoteStorageKind::AwsS3(S3Config {
bucket_name,
bucket_region,
access_key_id,
secret_access_key,
}),
};
RelishStorageConfig {
RemoteStorageConfig {
max_concurrent_sync,
storage,
}
@@ -255,7 +253,7 @@ impl CfgFileParams {
auth_validation_public_key_path,
auth_type,
relish_storage_config,
remote_storage_config,
})
}
}
@@ -526,7 +524,7 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> {
// don't spawn threads before daemonizing
let mut join_handles = Vec::new();
if let Some(handle) = relish_storage::run_storage_sync_thread(conf)? {
if let Some(handle) = remote_storage::run_storage_sync_thread(conf)? {
join_handles.push(handle);
}
// Initialize tenant manager.
@@ -622,11 +620,11 @@ mod tests {
"auth_validation_public_key_path_VALUE".to_string(),
),
auth_type: Some("auth_type_VALUE".to_string()),
relish_storage: Some(RelishStorage::Local {
local_path: "relish_storage_local_VALUE".to_string(),
remote_storage: Some(RemoteStorage::Local {
local_path: "remote_storage_local_VALUE".to_string(),
}),
relish_storage_max_concurrent_sync: Some(
"relish_storage_max_concurrent_sync_VALUE".to_string(),
remote_storage_max_concurrent_sync: Some(
"remote_storage_max_concurrent_sync_VALUE".to_string(),
),
};
@@ -644,10 +642,10 @@ open_mem_limit = 'open_mem_limit_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'
auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE'
auth_type = 'auth_type_VALUE'
relish_storage_max_concurrent_sync = 'relish_storage_max_concurrent_sync_VALUE'
remote_storage_max_concurrent_sync = 'remote_storage_max_concurrent_sync_VALUE'
[relish_storage]
local_path = 'relish_storage_local_VALUE'
[remote_storage]
local_path = 'remote_storage_local_VALUE'
"#,
toml_pretty_string
);
@@ -681,14 +679,14 @@ local_path = 'relish_storage_local_VALUE'
"auth_validation_public_key_path_VALUE".to_string(),
),
auth_type: Some("auth_type_VALUE".to_string()),
relish_storage: Some(RelishStorage::AwsS3 {
remote_storage: Some(RemoteStorage::AwsS3 {
bucket_name: "bucket_name_VALUE".to_string(),
bucket_region: "bucket_region_VALUE".to_string(),
access_key_id: Some("access_key_id_VALUE".to_string()),
secret_access_key: Some("secret_access_key_VALUE".to_string()),
}),
relish_storage_max_concurrent_sync: Some(
"relish_storage_max_concurrent_sync_VALUE".to_string(),
remote_storage_max_concurrent_sync: Some(
"remote_storage_max_concurrent_sync_VALUE".to_string(),
),
};
@@ -706,9 +704,9 @@ open_mem_limit = 'open_mem_limit_VALUE'
pg_distrib_dir = 'pg_distrib_dir_VALUE'
auth_validation_public_key_path = 'auth_validation_public_key_path_VALUE'
auth_type = 'auth_type_VALUE'
relish_storage_max_concurrent_sync = 'relish_storage_max_concurrent_sync_VALUE'
remote_storage_max_concurrent_sync = 'remote_storage_max_concurrent_sync_VALUE'
[relish_storage]
[remote_storage]
bucket_name = 'bucket_name_VALUE'
bucket_region = 'bucket_region_VALUE'
"#,
@@ -721,7 +719,7 @@ bucket_region = 'bucket_region_VALUE'
.expect("Failed to deserialize the prettified serialization result of the config");
let mut expected_params = params;
expected_params.relish_storage = Some(RelishStorage::AwsS3 {
expected_params.remote_storage = Some(RemoteStorage::AwsS3 {
bucket_name: "bucket_name_VALUE".to_string(),
bucket_region: "bucket_region_VALUE".to_string(),
access_key_id: None,

View File

@@ -32,7 +32,7 @@ use std::time::{Duration, Instant};
use self::metadata::{metadata_path, TimelineMetadata};
use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload;
use crate::remote_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
use crate::tenant_mgr;
use crate::walreceiver;

View File

@@ -1,7 +1,7 @@
//! Every image of a certain timeline from [`crate::layered_repository::LayeredRepository`]
//! has a metadata that needs to be stored persistently.
//!
//! Later, the file gets is used in [`crate::relish_storage::storage_sync`] as a part of
//! Later, the file is used in [`crate::remote_storage::storage_sync`] as a part of
//! external storage import and export operations.
//!
//! The module contains all structs and related helper methods related to timeline metadata.

View File

@@ -14,7 +14,7 @@ pub mod http;
pub mod layered_repository;
pub mod page_service;
pub mod relish;
pub mod relish_storage;
pub mod remote_storage;
pub mod repository;
pub mod restore_local_repo;
pub mod tenant_mgr;
@@ -42,7 +42,7 @@ pub mod defaults {
pub const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100);
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC_LIMITS: usize = 100;
pub const DEFAULT_OPEN_MEM_LIMIT: usize = 128 * 1024 * 1024;
}
@@ -88,7 +88,7 @@ pub struct PageServerConf {
pub auth_type: AuthType,
pub auth_validation_public_key_path: Option<PathBuf>,
pub relish_storage_config: Option<RelishStorageConfig>,
pub remote_storage_config: Option<RemoteStorageConfig>,
}
impl PageServerConf {
@@ -165,7 +165,7 @@ impl PageServerConf {
pg_distrib_dir: "".into(),
auth_type: AuthType::Trust,
auth_validation_public_key_path: None,
relish_storage_config: None,
remote_storage_config: None,
}
}
}
@@ -179,18 +179,18 @@ pub enum CheckpointConfig {
Forced,
}
/// External relish storage configuration, enough for creating a client for that storage.
/// External backup storage configuration, enough for creating a client for that storage.
#[derive(Debug, Clone)]
pub struct RelishStorageConfig {
/// Limits the number of concurrent sync operations between pageserver and relish storage.
pub struct RemoteStorageConfig {
/// Limits the number of concurrent sync operations between pageserver and the remote storage.
pub max_concurrent_sync: usize,
/// The storage connection configuration.
pub storage: RelishStorageKind,
pub storage: RemoteStorageKind,
}
/// A kind of a relish storage to connect to, with its connection configuration.
/// A kind of a remote storage to connect to, with its connection configuration.
#[derive(Debug, Clone)]
pub enum RelishStorageKind {
pub enum RemoteStorageKind {
/// Storage based on local file system.
/// Specify a root folder to place all stored relish data into.
LocalFs(PathBuf),
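As a quick illustration of the renamed configuration types (a hedged sketch: the import path mirrors the binary's own `use pageserver::{..}` shown earlier, while the helper name and the path value are made up), a local-fs backed config can be built like this:

```rust
use std::path::PathBuf;

use pageserver::{RemoteStorageConfig, RemoteStorageKind};

fn local_backup_config() -> RemoteStorageConfig {
    RemoteStorageConfig {
        // Cap on concurrent sync operations between pageserver and the remote storage.
        max_concurrent_sync: 100,
        // "Remote" files are kept under this local root; handy for tests.
        storage: RemoteStorageKind::LocalFs(PathBuf::from("/tmp/pageserver_backup")),
    }
}
```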

View File

@@ -3,7 +3,7 @@
//! No other modules from this tree are supposed to be used directly by the external code.
//!
//! There are a few components the storage machinery consists of:
//! * [`RelishStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations:
//! * [`RemoteStorage`] trait, a CRUD-like generic abstraction used to adapt external storages, with a few implementations:
//! * [`local_fs`] allows using the local file system as an external storage
//! * [`rust_s3`] uses an AWS S3 bucket entirely as an external storage
//!
@@ -11,7 +11,7 @@
//!
//! * public API to interact with the external world: [`run_storage_sync_thread`] and [`schedule_timeline_upload`]
//!
//! Here's a schematic overview of all interactions relish storage and the rest of the pageserver perform:
//! Here's a schematic overview of all interactions the backup machinery and the rest of the pageserver perform:
//!
//! +------------------------+ +--------->-------+
//! | | - - - (init async loop) - - - -> | |
@@ -29,7 +29,7 @@
//! V
//! +------------------------+
//! | |
//! | [`RelishStorage`] impl |
//! | [`RemoteStorage`] impl |
//! | |
//! | pageserver assumes it |
//! | owns exclusive write |
@@ -56,7 +56,7 @@
//! When the pageserver terminates, the upload loop finishes a current image sync task (if any) and exits.
//!
//! NOTES:
//! * pageserver assumes it has exclusive write access to the relish storage. If supported, the way multiple pageservers can be separated in the same storage
//! * pageserver assumes it has exclusive write access to the remote storage. If supported, multiple pageservers can be separated in the same storage
//! (i.e. by using different directories in the local filesystem external storage), but that is totally up to the storage implementation and not covered by the trait API.
//!
//! * the uploads do not happen right after pageserver startup, they are registered when
@@ -65,7 +65,7 @@
//!
//! * the uploads do not happen right after the upload registration: the sync loop might be occupied with other tasks, or tasks with bigger priority could be waiting already
//!
//! * all synchronization tasks (including the public API to register uploads and downloads and the sync queue management) happens on an image scale: a big set of relish files,
//! * all synchronization tasks (including the public API to register uploads and downloads and the sync queue management) happen on an image scale: a big set of remote files,
//! enough to represent (and recover, if needed) a certain timeline state. On the contrary, all internal storage CRUD calls are made per file from those images.
//! This way, the synchronization is able to download the image partially, if some state was synced before, but exposes correctly synced images only.
@@ -86,7 +86,7 @@ pub use self::storage_sync::schedule_timeline_upload;
use self::{local_fs::LocalFs, rust_s3::S3};
use crate::{
layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME},
PageServerConf, RelishStorageKind,
PageServerConf, RemoteStorageKind,
};
/// Based on the config, initiates the remote storage connection and starts a separate thread
@@ -95,16 +95,16 @@ use crate::{
pub fn run_storage_sync_thread(
config: &'static PageServerConf,
) -> anyhow::Result<Option<thread::JoinHandle<anyhow::Result<()>>>> {
match &config.relish_storage_config {
Some(relish_storage_config) => {
let max_concurrent_sync = relish_storage_config.max_concurrent_sync;
let handle = match &relish_storage_config.storage {
RelishStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
match &config.remote_storage_config {
Some(storage_config) => {
let max_concurrent_sync = storage_config.max_concurrent_sync;
let handle = match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => storage_sync::spawn_storage_sync_thread(
config,
LocalFs::new(root.clone(), &config.workdir)?,
max_concurrent_sync,
),
RelishStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
RemoteStorageKind::AwsS3(s3_config) => storage_sync::spawn_storage_sync_thread(
config,
S3::new(s3_config, &config.workdir)?,
max_concurrent_sync,
@@ -120,39 +120,39 @@ pub fn run_storage_sync_thread(
/// This storage tries to be unaware of any layered repository context,
/// providing basic CRUD operations with storage files.
#[async_trait::async_trait]
trait RelishStorage: Send + Sync {
/// A way to uniquely reference relish in the remote storage.
type RelishStoragePath;
trait RemoteStorage: Send + Sync {
/// A way to uniquely reference a file in the remote storage.
type StoragePath;
/// Attempts to derive the storage path out of the local path, if the latter is correct.
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::RelishStoragePath>;
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath>;
/// Gets the layered storage information about the given entry.
fn info(&self, storage_path: &Self::RelishStoragePath) -> anyhow::Result<RemoteRelishInfo>;
fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo>;
/// Lists all items the storage has right now.
async fn list_relishes(&self) -> anyhow::Result<Vec<Self::RelishStoragePath>>;
async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>>;
/// Streams the local file contents into the remote storage entry.
async fn upload_relish(
async fn upload(
&self,
from: &mut (impl io::AsyncRead + Unpin + Send),
to: &Self::RelishStoragePath,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
to: &Self::StoragePath,
) -> anyhow::Result<()>;
/// Streams the remote storage entry contents into the given writer.
async fn download_relish(
async fn download(
&self,
from: &Self::RelishStoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send),
from: &Self::StoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()>;
async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()>;
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>;
}
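To show how module-internal code is meant to drive the renamed trait, here is an illustrative-only helper (not part of this commit; `mirror_file` is hypothetical) that derives the remote path for a local file and streams it up:

```rust
use std::path::Path;

// Hypothetical helper, generic over any `RemoteStorage` implementation.
async fn mirror_file<S: RemoteStorage>(storage: &S, local_path: &Path) -> anyhow::Result<()> {
    // Map a path under the pageserver workdir onto the storage's own key type.
    let remote_path = storage.storage_path(local_path)?;
    // `upload` takes ownership of any `'static` async reader.
    let source = tokio::io::BufReader::new(tokio::fs::File::open(local_path).await?);
    storage.upload(source, &remote_path).await
}
```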
/// Information about a certain remote storage entry.
#[derive(Debug, PartialEq, Eq)]
struct RemoteRelishInfo {
struct RemoteFileInfo {
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
/// Path in the pageserver workdir where the file should go to.

View File

@@ -1,7 +1,7 @@
# Non-implementation details
This document describes the current state of the backup system in pageserver, existing limitations and concerns, why some things are done the way they are, and the future development plans.
Detailed description on how the synchronization works and how it fits into the rest of the pageserver can be found in the [storage module](./../relish_storage.rs) and its submodules.
Detailed description on how the synchronization works and how it fits into the rest of the pageserver can be found in the [storage module](./../remote_storage.rs) and its submodules.
Ideally, this document should disappear after current implementation concerns are mitigated, with the remaining useful knowledge bits moved into rustdocs.
## Approach
@@ -28,7 +28,7 @@ As mentioned, the backup component is rather new and under development currently
Here's the list of known compromises with comments:
* Remote storage model is the same as the `tenants/` directory contents of the pageserver's local workdir storage.
This is relatively simple to implement, but may be costly to use in AWS S3: an initial data image contains ~782 relish file and a metadata file, ~31 MB combined.
This is relatively simple to implement, but may be costly to use in AWS S3: an initial data image contains ~782 relish files and a metadata file, ~31 MB combined.
AWS charges both per API call and for traffic, and layers are expected to be updated frequently, so this model is most probably not cost-effective (see the rough estimate below).
Additionally, pageservers might need to migrate images between tenants, which does not improve the situation.
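As a rough, hedged estimate (assuming ballpark S3 standard-tier pricing of about $0.005 per 1,000 PUT requests), uploading one initial image (~782 relish files plus a metadata file) costs about 783 × $0.005 / 1,000 ≈ $0.004 in request charges alone, before any traffic is billed, and every re-upload of frequently changing layers pays that again.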

View File

@@ -1,4 +1,4 @@
//! Local filesystem relish storage.
//! Local filesystem acting as a remote storage.
//! Multiple pageservers can use the same "storage" of this kind by using different storage roots.
//!
//! This storage is used in pageserver tests, but can also be used in cases when a certain persistent
@@ -20,7 +20,7 @@ use tracing::*;
use crate::layered_repository::metadata::METADATA_FILE_NAME;
use super::{parse_ids_from_path, strip_path_prefix, RelishStorage, RemoteRelishInfo};
use super::{parse_ids_from_path, strip_path_prefix, RemoteFileInfo, RemoteStorage};
pub struct LocalFs {
pageserver_workdir: &'static Path,
@@ -28,7 +28,7 @@ pub struct LocalFs {
}
impl LocalFs {
/// Attempts to create local FS relish storage, along with the storage root directory.
/// Attempts to create local FS storage, along with its root directory.
pub fn new(root: PathBuf, pageserver_workdir: &'static Path) -> anyhow::Result<Self> {
if !root.exists() {
std::fs::create_dir_all(&root).with_context(|| {
@@ -59,17 +59,17 @@ impl LocalFs {
}
#[async_trait::async_trait]
impl RelishStorage for LocalFs {
type RelishStoragePath = PathBuf;
impl RemoteStorage for LocalFs {
type StoragePath = PathBuf;
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::RelishStoragePath> {
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath> {
Ok(self.root.join(
strip_path_prefix(self.pageserver_workdir, local_path)
.context("local path does not belong to this storage")?,
))
}
fn info(&self, storage_path: &Self::RelishStoragePath) -> anyhow::Result<RemoteRelishInfo> {
fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo> {
let is_metadata =
storage_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME);
let relative_path = strip_path_prefix(&self.root, storage_path)
@@ -79,7 +79,7 @@ impl RelishStorage for LocalFs {
relative_path.iter().filter_map(|segment| segment.to_str()),
&relative_path.display(),
)?;
Ok(RemoteRelishInfo {
Ok(RemoteFileInfo {
tenant_id,
timeline_id,
download_destination,
@@ -87,14 +87,14 @@ impl RelishStorage for LocalFs {
})
}
async fn list_relishes(&self) -> anyhow::Result<Vec<Self::RelishStoragePath>> {
async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
Ok(get_all_files(&self.root).await?.into_iter().collect())
}
async fn upload_relish(
async fn upload(
&self,
from: &mut (impl io::AsyncRead + Unpin + Send),
to: &Self::RelishStoragePath,
mut from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
to: &Self::StoragePath,
) -> anyhow::Result<()> {
let target_file_path = self.resolve_in_storage(to)?;
create_target_directory(&target_file_path).await?;
@@ -112,20 +112,20 @@ impl RelishStorage for LocalFs {
})?,
);
io::copy(from, &mut destination)
io::copy(&mut from, &mut destination)
.await
.context("Failed to upload relish to local storage")?;
.context("Failed to upload a file to the local storage")?;
destination
.flush()
.await
.context("Failed to upload relish to local storage")?;
.context("Failed to upload a file to the local storage")?;
Ok(())
}
async fn download_relish(
async fn download(
&self,
from: &Self::RelishStoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send),
from: &Self::StoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
let file_path = self.resolve_in_storage(from)?;
@@ -144,7 +144,7 @@ impl RelishStorage for LocalFs {
);
io::copy(&mut source, to)
.await
.context("Failed to download the relish file")?;
.context("Failed to download a file from the local storage")?;
Ok(())
} else {
bail!(
@@ -154,7 +154,7 @@ impl RelishStorage for LocalFs {
}
}
async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()> {
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
let file_path = self.resolve_in_storage(path)?;
if file_path.exists() && file_path.is_file() {
Ok(fs::remove_file(file_path).await?)
@@ -204,7 +204,7 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
let target_dir = match target_file_path.parent() {
Some(parent_dir) => parent_dir,
None => bail!(
"Relish path '{}' has no parent directory",
"File path '{}' has no parent directory",
target_file_path.display()
),
};
@@ -218,7 +218,7 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
mod pure_tests {
use crate::{
layered_repository::metadata::METADATA_FILE_NAME,
relish_storage::test_utils::{
remote_storage::test_utils::{
custom_tenant_id_path, custom_timeline_id_path, relative_timeline_path,
},
repository::repo_harness::{RepoHarness, TIMELINE_ID},
@@ -235,13 +235,13 @@ mod pure_tests {
root: storage_root.clone(),
};
let local_path = repo_harness.timeline_path(&TIMELINE_ID).join("relish_name");
let local_path = repo_harness.timeline_path(&TIMELINE_ID).join("file_name");
let expected_path = storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?);
assert_eq!(
expected_path,
storage.storage_path(&local_path).expect("Matching path should map to storage path normally"),
"Relish paths from pageserver workdir should be stored in local fs storage with the same path they have relative to the workdir"
"File paths from pageserver workdir should be stored in local fs storage with the same path they have relative to the workdir"
);
Ok(())
@@ -299,7 +299,7 @@ mod pure_tests {
let name = "not a metadata";
let local_path = repo_harness.timeline_path(&TIMELINE_ID).join(name);
assert_eq!(
RemoteRelishInfo {
RemoteFileInfo {
tenant_id: repo_harness.tenant_id,
timeline_id: TIMELINE_ID,
download_destination: local_path.clone(),
@@ -308,7 +308,7 @@ mod pure_tests {
storage
.info(&storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?))
.expect("For a valid input, valid S3 info should be parsed"),
"Should be able to parse metadata out of the correctly named remote delta relish"
"Should be able to parse metadata out of the correctly named remote delta file"
);
let local_metadata_path = repo_harness
@@ -316,7 +316,7 @@ mod pure_tests {
.join(METADATA_FILE_NAME);
let remote_metadata_path = storage.storage_path(&local_metadata_path)?;
assert_eq!(
RemoteRelishInfo {
RemoteFileInfo {
tenant_id: repo_harness.tenant_id,
timeline_id: TIMELINE_ID,
download_destination: local_metadata_path,
@@ -338,7 +338,7 @@ mod pure_tests {
fn storage_info_error(storage: &LocalFs, storage_path: &PathBuf) -> String {
match storage.info(storage_path) {
Ok(wrong_info) => panic!(
"Expected storage path input {:?} to cause an error, but got relish info: {:?}",
"Expected storage path input {:?} to cause an error, but got file info: {:?}",
storage_path, wrong_info,
),
Err(e) => format!("{:?}", e),
@@ -358,24 +358,23 @@ mod pure_tests {
let relative_timeline_path = relative_timeline_path(&repo_harness)?;
let relative_relish_path =
custom_tenant_id_path(&relative_timeline_path, "wrong_tenant_id")?
.join("wrong_tenant_id_name");
let wrong_tenant_id_path = storage_root.join(&relative_relish_path);
let relative_file_path = custom_tenant_id_path(&relative_timeline_path, "wrong_tenant_id")?
.join("wrong_tenant_id_name");
let wrong_tenant_id_path = storage_root.join(&relative_file_path);
let error_message = storage_info_error(&storage, &wrong_tenant_id_path);
assert!(
error_message.contains(relative_relish_path.to_str().unwrap()),
error_message.contains(relative_file_path.to_str().unwrap()),
"Error message '{}' does not contain the expected substring",
error_message
);
let relative_relish_path =
let relative_file_path =
custom_timeline_id_path(&relative_timeline_path, "wrong_timeline_id")?
.join("wrong_timeline_id_name");
let wrong_timeline_id_path = storage_root.join(&relative_relish_path);
let wrong_timeline_id_path = storage_root.join(&relative_file_path);
let error_message = storage_info_error(&storage, &wrong_timeline_id_path);
assert!(
error_message.contains(relative_relish_path.to_str().unwrap()),
error_message.contains(relative_file_path.to_str().unwrap()),
"Error message '{}' does not contain the expected substring",
error_message
);
@@ -410,24 +409,24 @@ mod pure_tests {
mod fs_tests {
use super::*;
use crate::{
relish_storage::test_utils::relative_timeline_path, repository::repo_harness::RepoHarness,
remote_storage::test_utils::relative_timeline_path, repository::repo_harness::RepoHarness,
};
use std::io::Write;
use tempfile::tempdir;
#[tokio::test]
async fn upload_relish() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("upload_relish")?;
async fn upload_file() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("upload_file")?;
let storage = create_storage()?;
let mut source = create_file_for_upload(
let source = create_file_for_upload(
&storage.pageserver_workdir.join("whatever"),
"whatever_contents",
)
.await?;
let target_path = PathBuf::from("/").join("somewhere").join("else");
match storage.upload_relish(&mut source, &target_path).await {
match storage.upload(source, &target_path).await {
Ok(()) => panic!("Should not allow storing files with wrong target path"),
Err(e) => {
let message = format!("{:?}", e);
@@ -435,23 +434,23 @@ mod fs_tests {
assert!(message.contains("does not belong to the current storage"));
}
}
assert!(storage.list_relishes().await?.is_empty());
assert!(storage.list().await?.is_empty());
let target_path_1 = upload_dummy_file(&repo_harness, &storage, "upload_1").await?;
assert_eq!(
storage.list_relishes().await?,
storage.list().await?,
vec![target_path_1.clone()],
"Should list a single file after first upload"
);
let target_path_2 = upload_dummy_file(&repo_harness, &storage, "upload_2").await?;
assert_eq!(
list_relishes_sorted(&storage).await?,
list_files_sorted(&storage).await?,
vec![target_path_1.clone(), target_path_2.clone()],
"Should list a two different files after second upload"
);
// match storage.upload_relish(&mut source, &target_path_1).await {
// match storage.upload_file(&mut source, &target_path_1).await {
// Ok(()) => panic!("Should not allow reuploading storage files"),
// Err(e) => {
// let message = format!("{:?}", e);
@@ -460,7 +459,7 @@ mod fs_tests {
// }
// }
assert_eq!(
list_relishes_sorted(&storage).await?,
list_files_sorted(&storage).await?,
vec![target_path_1, target_path_2],
"Should list a two different files after all upload attempts"
);
@@ -475,16 +474,14 @@ mod fs_tests {
}
#[tokio::test]
async fn download_relish() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("download_relish")?;
async fn download_file() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("download_file")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
let mut content_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
storage
.download_relish(&upload_target, &mut content_bytes)
.await?;
storage.download(&upload_target, &mut content_bytes).await?;
content_bytes.flush().await?;
let contents = String::from_utf8(content_bytes.into_inner().into_inner())?;
@@ -495,10 +492,7 @@ mod fs_tests {
);
let non_existing_path = PathBuf::from("somewhere").join("else");
match storage
.download_relish(&non_existing_path, &mut io::sink())
.await
{
match storage.download(&non_existing_path, &mut io::sink()).await {
Ok(_) => panic!("Should not allow downloading non-existing storage files"),
Err(e) => {
let error_string = e.to_string();
@@ -510,16 +504,16 @@ mod fs_tests {
}
#[tokio::test]
async fn delete_relish() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("delete_relish")?;
async fn delete_file() -> anyhow::Result<()> {
let repo_harness = RepoHarness::create("delete_file")?;
let storage = create_storage()?;
let upload_name = "upload_1";
let upload_target = upload_dummy_file(&repo_harness, &storage, upload_name).await?;
storage.delete_relish(&upload_target).await?;
assert!(storage.list_relishes().await?.is_empty());
storage.delete(&upload_target).await?;
assert!(storage.list().await?.is_empty());
match storage.delete_relish(&upload_target).await {
match storage.delete(&upload_target).await {
Ok(()) => panic!("Should not allow deleting non-existing storage files"),
Err(e) => {
let error_string = e.to_string();
@@ -540,8 +534,8 @@ mod fs_tests {
.join(relative_timeline_path(harness)?)
.join(name);
storage
.upload_relish(
&mut create_file_for_upload(
.upload(
create_file_for_upload(
&storage.pageserver_workdir.join(name),
&dummy_contents(name),
)
@@ -572,9 +566,9 @@ mod fs_tests {
format!("contents for {}", name)
}
async fn list_relishes_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {
let mut relishes = storage.list_relishes().await?;
relishes.sort();
Ok(relishes)
async fn list_files_sorted(storage: &LocalFs) -> anyhow::Result<Vec<PathBuf>> {
let mut files = storage.list().await?;
files.sort();
Ok(files)
}
}

View File

@@ -1,5 +1,5 @@
//! AWS S3 relish storage wrapper around `rust_s3` library.
//! Currently does not allow multiple pageservers to use the same bucket concurrently: relishes are
//! AWS S3 storage wrapper around `rust_s3` library.
//! Currently does not allow multiple pageservers to use the same bucket concurrently: objects are
//! placed in the root of the bucket.
use std::path::{Path, PathBuf};
@@ -10,7 +10,7 @@ use tokio::io::{self, AsyncWriteExt};
use crate::{
layered_repository::metadata::METADATA_FILE_NAME,
relish_storage::{parse_ids_from_path, strip_path_prefix, RelishStorage, RemoteRelishInfo},
remote_storage::{parse_ids_from_path, strip_path_prefix, RemoteFileInfo, RemoteStorage},
S3Config,
};
@@ -29,14 +29,14 @@ impl S3ObjectKey {
}
}
/// AWS S3 relish storage.
/// AWS S3 storage.
pub struct S3 {
pageserver_workdir: &'static Path,
bucket: Bucket,
}
impl S3 {
/// Creates the relish storage, errors if incorrect AWS S3 configuration provided.
/// Creates the storage, errors if incorrect AWS S3 configuration provided.
pub fn new(aws_config: &S3Config, pageserver_workdir: &'static Path) -> anyhow::Result<Self> {
let region = aws_config
.bucket_region
@@ -63,10 +63,10 @@ impl S3 {
}
#[async_trait::async_trait]
impl RelishStorage for S3 {
type RelishStoragePath = S3ObjectKey;
impl RemoteStorage for S3 {
type StoragePath = S3ObjectKey;
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::RelishStoragePath> {
fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath> {
let relative_path = strip_path_prefix(self.pageserver_workdir, local_path)?;
let mut key = String::new();
for segment in relative_path {
@@ -76,14 +76,14 @@ impl RelishStorage for S3 {
Ok(S3ObjectKey(key))
}
fn info(&self, storage_path: &Self::RelishStoragePath) -> anyhow::Result<RemoteRelishInfo> {
fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo> {
let storage_path_key = &storage_path.0;
let is_metadata =
storage_path_key.ends_with(&format!("{}{}", S3_FILE_SEPARATOR, METADATA_FILE_NAME));
let download_destination = storage_path.download_destination(self.pageserver_workdir);
let (tenant_id, timeline_id) =
parse_ids_from_path(storage_path_key.split(S3_FILE_SEPARATOR), storage_path_key)?;
Ok(RemoteRelishInfo {
Ok(RemoteFileInfo {
tenant_id,
timeline_id,
download_destination,
@@ -91,7 +91,7 @@ impl RelishStorage for S3 {
})
}
async fn list_relishes(&self) -> anyhow::Result<Vec<Self::RelishStoragePath>> {
async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
let list_response = self
.bucket
.list(String::new(), None)
@@ -105,13 +105,13 @@ impl RelishStorage for S3 {
.collect())
}
async fn upload_relish(
async fn upload(
&self,
from: &mut (impl io::AsyncRead + Unpin + Send),
to: &Self::RelishStoragePath,
mut from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
to: &Self::StoragePath,
) -> anyhow::Result<()> {
let mut upload_contents = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
io::copy(from, &mut upload_contents)
io::copy(&mut from, &mut upload_contents)
.await
.context("Failed to read the upload contents")?;
upload_contents
@@ -136,10 +136,10 @@ impl RelishStorage for S3 {
}
}
async fn download_relish(
async fn download(
&self,
from: &Self::RelishStoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send),
from: &Self::StoragePath,
to: &mut (impl io::AsyncWrite + Unpin + Send + Sync),
) -> anyhow::Result<()> {
let (data, code) = self
.bucket
@@ -159,7 +159,7 @@ impl RelishStorage for S3 {
}
}
async fn delete_relish(&self, path: &Self::RelishStoragePath) -> anyhow::Result<()> {
async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()> {
let (_, code) = self
.bucket
.delete_object(path.key())
@@ -180,7 +180,7 @@ impl RelishStorage for S3 {
#[cfg(test)]
mod tests {
use crate::{
relish_storage::test_utils::{
remote_storage::test_utils::{
custom_tenant_id_path, custom_timeline_id_path, relative_timeline_path,
},
repository::repo_harness::{RepoHarness, TIMELINE_ID},
@@ -219,7 +219,7 @@ mod tests {
let repo_harness = RepoHarness::create("storage_path_positive")?;
let segment_1 = "matching";
let segment_2 = "relish";
let segment_2 = "file";
let local_path = &repo_harness.conf.workdir.join(segment_1).join(segment_2);
let expected_key = S3ObjectKey(format!(
"{SEPARATOR}{}{SEPARATOR}{}",
@@ -291,7 +291,7 @@ mod tests {
let s3_key = create_s3_key(&relative_timeline_path.join("not a metadata"));
assert_eq!(
RemoteRelishInfo {
RemoteFileInfo {
tenant_id: repo_harness.tenant_id,
timeline_id: TIMELINE_ID,
download_destination: s3_key.download_destination(&repo_harness.conf.workdir),
@@ -300,12 +300,12 @@ mod tests {
storage
.info(&s3_key)
.expect("For a valid input, valid S3 info should be parsed"),
"Should be able to parse metadata out of the correctly named remote delta relish"
"Should be able to parse metadata out of the correctly named remote delta file"
);
let s3_key = create_s3_key(&relative_timeline_path.join(METADATA_FILE_NAME));
assert_eq!(
RemoteRelishInfo {
RemoteFileInfo {
tenant_id: repo_harness.tenant_id,
timeline_id: TIMELINE_ID,
download_destination: s3_key.download_destination(&repo_harness.conf.workdir),
@@ -326,7 +326,7 @@ mod tests {
fn storage_info_error(storage: &S3, s3_key: &S3ObjectKey) -> String {
match storage.info(s3_key) {
Ok(wrong_info) => panic!(
"Expected key {:?} to error, but got relish info: {:?}",
"Expected key {:?} to error, but got file info: {:?}",
s3_key, wrong_info,
),
Err(e) => e.to_string(),
@@ -387,9 +387,9 @@ mod tests {
}
}
fn create_s3_key(relative_relish_path: &Path) -> S3ObjectKey {
fn create_s3_key(relative_file_path: &Path) -> S3ObjectKey {
S3ObjectKey(
relative_relish_path
relative_file_path
.iter()
.fold(String::new(), |mut path_string, segment| {
path_string.push(S3_FILE_SEPARATOR);

View File

@@ -1,4 +1,4 @@
//! A synchronization logic for the [`RelishStorage`] and the state to ensure the correct synchronizations.
//! Synchronization logic for the [`RemoteStorage`] and the state needed to ensure correct synchronization.
//!
//! The synchronization does not aim to be immediate, instead
//! doing all the job in a separate thread asynchronously, attempting to fully replicate the
@@ -9,12 +9,12 @@
//!
//! During the loop startup, an initial loop state is constructed from all remote storage entries.
//! It's enough to poll the remote state once on startup only, due to agreement that the pageserver has
//! an exclusive write access to the relish storage: new files appear in the storage only after the same
//! an exclusive write access to the remote storage: new files appear in the storage only after the same
//! pageserver writes them.
//!
//! The list construction is currently the only place where the storage sync can return an [`Err`] to the user.
//! New upload tasks are accepted via the [`schedule_timeline_upload`] function regardless of the corresponding loop startup,
//! it's up to the caller to avoid uploading of the new relishes, if that caller did not enable the loop.
//! it's up to the caller to avoid uploading new files if that caller did not enable the loop.
//! After the initial state is loaded into memory and the loop starts, any further [`Err`] results do not stop the loop, but rather
//! reschedules the same task, with possibly fewer files to sync in it.
//!
@@ -51,7 +51,7 @@
//! and two files are considered "equal", if their paths match. Such files are uploaded/downloaded over, no real contents checks are done.
//! NOTE: No real contents or checksum check happens right now and is a subject to improve later.
//!
//! After the whole is downloaded, [`crate::tenant_mgr::register_relish_download`] function is used to register the image in pageserver.
//! After the whole timeline is downloaded, [`crate::tenant_mgr::register_timeline_download`] function is used to register the image in pageserver.
//!
//! When pageserver signals shutdown, the current sync task gets finished and the loop exits.
//!
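The control flow described above can be condensed into a deliberately simplified, purely illustrative sketch (none of these names exist in the pageserver; concurrency, metrics and real error handling are omitted):

```rust
use std::collections::VecDeque;

// Hypothetical task: the files still to be synced for one timeline image.
struct SyncTask {
    files_left: Vec<String>,
}

fn sync_loop(
    initial_remote_listing: Result<Vec<String>, String>,
    mut queue: VecDeque<SyncTask>,
    shutdown_requested: impl Fn() -> bool,
) -> Result<(), String> {
    // Startup is the only point where an Err is returned to the caller.
    let _remote_files = initial_remote_listing?;
    while !shutdown_requested() {
        let mut task = match queue.pop_front() {
            Some(task) => task,
            None => break, // nothing queued; a real loop would wait for new tasks
        };
        // Pretend the first half (rounded up) of the files synced; anything that
        // "failed" is rescheduled as a smaller task instead of terminating the loop.
        let failed = task.files_left.split_off((task.files_left.len() + 1) / 2);
        if !failed.is_empty() {
            queue.push_back(SyncTask { files_left: failed });
        }
    }
    Ok(())
}
```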
@@ -78,10 +78,10 @@ use tokio::{
};
use tracing::*;
use super::{RelishStorage, RemoteRelishInfo};
use super::{RemoteFileInfo, RemoteStorage};
use crate::{
layered_repository::metadata::{metadata_path, TimelineMetadata},
tenant_mgr::register_relish_download,
tenant_mgr::register_timeline_download,
PageServerConf,
};
use zenith_metrics::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge};
@@ -92,12 +92,12 @@ use zenith_utils::{
lazy_static! {
static ref REMAINING_SYNC_ITEMS: IntGauge = register_int_gauge!(
"pageserver_backup_remaining_sync_items",
"pageserver_remote_storage_remaining_sync_items",
"Number of storage sync items left in the queue"
)
.expect("failed to register pageserver backup remaining sync items int gauge");
.expect("failed to register pageserver remote storage remaining sync items int gauge");
static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!(
"pageserver_backup_image_sync_time",
"pageserver_remote_storage_image_sync_time",
"Time took to synchronize (download or upload) a whole pageserver image. \
Grouped by `operation_kind` (upload|download) and `status` (success|failure)",
&["operation_kind", "status"],
@@ -181,14 +181,14 @@ pub fn schedule_timeline_upload(
}))
}
/// Uses a relish storage given to start the storage sync loop.
/// Uses the given remote storage to start the storage sync loop.
/// See module docs for loop step description.
pub(super) fn spawn_storage_sync_thread<
P: std::fmt::Debug,
S: 'static + RelishStorage<RelishStoragePath = P>,
S: 'static + RemoteStorage<StoragePath = P>,
>(
config: &'static PageServerConf,
relish_storage: S,
remote_storage: S,
max_concurrent_sync: usize,
) -> anyhow::Result<thread::JoinHandle<anyhow::Result<()>>> {
ensure!(
@@ -197,10 +197,10 @@ pub(super) fn spawn_storage_sync_thread<
);
let handle = thread::Builder::new()
.name("Queue based relish storage sync".to_string())
.name("Queue based remote storage sync".to_string())
.spawn(move || {
let concurrent_sync_limit = Semaphore::new(max_concurrent_sync);
let thread_result = storage_sync_loop(config, relish_storage, &concurrent_sync_limit);
let thread_result = storage_sync_loop(config, remote_storage, &concurrent_sync_limit);
concurrent_sync_limit.close();
if let Err(e) = &thread_result {
error!("Failed to run storage sync thread: {:#}", e);
@@ -210,16 +210,16 @@ pub(super) fn spawn_storage_sync_thread<
Ok(handle)
}
fn storage_sync_loop<P: std::fmt::Debug, S: 'static + RelishStorage<RelishStoragePath = P>>(
fn storage_sync_loop<P: std::fmt::Debug, S: 'static + RemoteStorage<StoragePath = P>>(
config: &'static PageServerConf,
relish_storage: S,
remote_storage: S,
concurrent_sync_limit: &Semaphore,
) -> anyhow::Result<()> {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let mut remote_timelines = runtime
.block_on(fetch_existing_uploads(&relish_storage))
.block_on(fetch_existing_uploads(&remote_storage))
.context("Failed to determine previously uploaded timelines")?;
let urgent_downloads = latest_timelines(&remote_timelines)
@@ -231,7 +231,7 @@ fn storage_sync_loop<P: std::fmt::Debug, S: 'static + RelishStorage<RelishStorag
let exists_locally = config.timeline_path(&timeline_id, &tenant_id).exists();
if exists_locally {
debug!(
"Timeline with tenant id {}, relish id {} exists locally, not downloading",
"Timeline with tenant id {}, timeline id {} exists locally, not downloading",
tenant_id, timeline_id
);
false
@@ -271,7 +271,7 @@ fn storage_sync_loop<P: std::fmt::Debug, S: 'static + RelishStorage<RelishStorag
let sync_status = download_timeline(
config,
concurrent_sync_limit,
&relish_storage,
&remote_storage,
download_data,
false,
)
@@ -282,7 +282,7 @@ fn storage_sync_loop<P: std::fmt::Debug, S: 'static + RelishStorage<RelishStorag
let sync_status = download_timeline(
config,
concurrent_sync_limit,
&relish_storage,
&remote_storage,
download_data,
true,
)
@@ -294,7 +294,7 @@ fn storage_sync_loop<P: std::fmt::Debug, S: 'static + RelishStorage<RelishStorag
config,
concurrent_sync_limit,
&mut remote_timelines,
&relish_storage,
&remote_storage,
layer_upload,
)
.await;
@@ -310,7 +310,7 @@ fn storage_sync_loop<P: std::fmt::Debug, S: 'static + RelishStorage<RelishStorag
}
};
}
debug!("Queue based relish storage sync thread shut down");
debug!("Queue based remote storage sync thread shut down");
Ok(())
}
@@ -354,33 +354,30 @@ fn latest_timelines(
.collect()
}
async fn fetch_existing_uploads<
P: std::fmt::Debug,
S: 'static + RelishStorage<RelishStoragePath = P>,
>(
relish_storage: &S,
async fn fetch_existing_uploads<P: std::fmt::Debug, S: 'static + RemoteStorage<StoragePath = P>>(
remote_storage: &S,
) -> anyhow::Result<HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>> {
let uploaded_relishes = relish_storage
.list_relishes()
let uploaded_files = remote_storage
.list()
.await
.context("Failed to list relish uploads")?;
.context("Failed to list the uploads")?;
let mut relish_data_fetches = uploaded_relishes
let mut data_fetches = uploaded_files
.into_iter()
.map(|remote_path| async {
(
remote_relish_info(relish_storage, &remote_path).await,
remote_file_info(remote_storage, &remote_path).await,
remote_path,
)
})
.collect::<FuturesUnordered<_>>();
let mut fetched = HashMap::new();
while let Some((fetch_result, remote_path)) = relish_data_fetches.next().await {
while let Some((fetch_result, remote_path)) = data_fetches.next().await {
match fetch_result {
Ok((relish_info, remote_metadata)) => {
let tenant_id = relish_info.tenant_id;
let timeline_id = relish_info.timeline_id;
Ok((file_info, remote_metadata)) => {
let tenant_id = file_info.tenant_id;
let timeline_id = file_info.timeline_id;
let remote_timeline =
fetched
.entry((tenant_id, timeline_id))
@@ -393,14 +390,12 @@ async fn fetch_existing_uploads<
if remote_metadata.is_some() {
remote_timeline.metadata = remote_metadata;
} else {
remote_timeline
.layers
.push(relish_info.download_destination);
remote_timeline.layers.push(file_info.download_destination);
}
}
Err(e) => {
warn!(
"Failed to fetch relish info for path {:?}, reason: {:#}",
"Failed to fetch file info for path {:?}, reason: {:#}",
remote_path, e
);
continue;
@@ -411,15 +406,15 @@ async fn fetch_existing_uploads<
Ok(fetched)
}
async fn remote_relish_info<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
relish_storage: &S,
async fn remote_file_info<P, S: 'static + RemoteStorage<StoragePath = P>>(
remote_storage: &S,
remote_path: &P,
) -> anyhow::Result<(RemoteRelishInfo, Option<TimelineMetadata>)> {
let info = relish_storage.info(remote_path)?;
) -> anyhow::Result<(RemoteFileInfo, Option<TimelineMetadata>)> {
let info = remote_storage.info(remote_path)?;
let metadata = if info.is_metadata {
let mut metadata_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
relish_storage
.download_relish(remote_path, &mut metadata_bytes)
remote_storage
.download(remote_path, &mut metadata_bytes)
.await
.with_context(|| {
format!(
@@ -441,10 +436,10 @@ async fn remote_relish_info<P, S: 'static + RelishStorage<RelishStoragePath = P>
Ok((info, metadata))
}
async fn download_timeline<'a, P, S: 'static + RelishStorage<RelishStoragePath = P>>(
async fn download_timeline<'a, P, S: 'static + RemoteStorage<StoragePath = P>>(
config: &'static PageServerConf,
concurrent_sync_limit: &'a Semaphore,
relish_storage: &'a S,
remote_storage: &'a S,
remote_timeline: RemoteTimeline,
urgent: bool,
) -> Option<bool> {
@@ -463,7 +458,7 @@ async fn download_timeline<'a, P, S: 'static + RelishStorage<RelishStoragePath =
let sync_result = synchronize_layers(
config,
concurrent_sync_limit,
relish_storage,
remote_storage,
remote_timeline.layers.into_iter(),
SyncOperation::Download,
&new_metadata,
@@ -474,7 +469,7 @@ async fn download_timeline<'a, P, S: 'static + RelishStorage<RelishStoragePath =
match sync_result {
SyncResult::Success { .. } => {
register_relish_download(config, tenant_id, timeline_id);
register_timeline_download(config, tenant_id, timeline_id);
Some(true)
}
SyncResult::MetadataSyncError { .. } => {
@@ -509,11 +504,11 @@ async fn download_timeline<'a, P, S: 'static + RelishStorage<RelishStoragePath =
}
#[allow(clippy::unnecessary_filter_map)]
async fn upload_timeline<'a, P, S: 'static + RelishStorage<RelishStoragePath = P>>(
async fn upload_timeline<'a, P, S: 'static + RemoteStorage<StoragePath = P>>(
config: &'static PageServerConf,
concurrent_sync_limit: &'a Semaphore,
remote_timelines: &'a mut HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>,
relish_storage: &'a S,
remote_storage: &'a S,
mut new_upload: LocalTimeline,
) -> Option<bool> {
let tenant_id = new_upload.tenant_id;
@@ -559,7 +554,7 @@ async fn upload_timeline<'a, P, S: 'static + RelishStorage<RelishStoragePath = P
let sync_result = synchronize_layers(
config,
concurrent_sync_limit,
relish_storage,
remote_storage,
new_layers.into_iter(),
SyncOperation::Upload,
&new_metadata,
@@ -607,9 +602,9 @@ async fn upload_timeline<'a, P, S: 'static + RelishStorage<RelishStoragePath = P
/// Layer sync operation kind.
///
/// This enum allows to unify the logic for image relish uploads and downloads.
/// This enum allows to unify the logic for image uploads and downloads.
/// When image's layers are synchronized, the only difference
/// between downloads and uploads is the [`RelishStorage`] method we need to call.
/// between downloads and uploads is the [`RemoteStorage`] method we need to call.
#[derive(Debug, Copy, Clone)]
enum SyncOperation {
Download,
@@ -619,13 +614,13 @@ enum SyncOperation {
/// Image sync result.
#[derive(Debug)]
enum SyncResult {
/// All relish files are synced (their paths returned).
/// All regular files are synced (their paths returned).
/// Metadata file is synced too (path not returned).
Success { synced: Vec<PathBuf> },
/// All relish files are synced (their paths returned).
/// All regular files are synced (their paths returned).
/// Metadata file is not synced (path not returned).
MetadataSyncError { synced: Vec<PathBuf> },
/// Some relish files are not synced, some are (paths returned).
/// Some regular files are not synced, some are (paths returned).
/// Metadata file is not synced (path not returned).
LayerSyncError {
synced: Vec<PathBuf>,
@@ -634,13 +629,13 @@ enum SyncResult {
}
/// Synchronizes given layers and metadata contents of a certain image.
/// Relishes are always synced before metadata files are, the latter gets synced only if
/// Regular files are always synced before metadata files are, the latter gets synced only if
/// the rest of the files are successfully processed.
#[allow(clippy::too_many_arguments)]
async fn synchronize_layers<'a, P, S: 'static + RelishStorage<RelishStoragePath = P>>(
async fn synchronize_layers<'a, P, S: 'static + RemoteStorage<StoragePath = P>>(
config: &'static PageServerConf,
concurrent_sync_limit: &'a Semaphore,
relish_storage: &'a S,
remote_storage: &'a S,
layers: impl Iterator<Item = PathBuf>,
sync_operation: SyncOperation,
new_metadata: &'a TimelineMetadata,
@@ -655,8 +650,8 @@ async fn synchronize_layers<'a, P, S: 'static + RelishStorage<RelishStoragePath
.await
.expect("Semaphore should not be closed yet");
let sync_result = match sync_operation {
SyncOperation::Download => download(relish_storage, &layer_path).await,
SyncOperation::Upload => upload(relish_storage, &layer_path).await,
SyncOperation::Download => download(remote_storage, &layer_path).await,
SyncOperation::Upload => upload(remote_storage, &layer_path).await,
};
drop(permit);
(layer_path, sync_result)
@@ -689,7 +684,7 @@ async fn synchronize_layers<'a, P, S: 'static + RelishStorage<RelishStoragePath
trace!("Synced layers: {:?}", synced);
match sync_metadata(
config,
relish_storage,
remote_storage,
sync_operation,
new_metadata,
tenant_id,
@@ -714,9 +709,9 @@ async fn synchronize_layers<'a, P, S: 'static + RelishStorage<RelishStoragePath
}
}
async fn sync_metadata<'a, P, S: 'static + RelishStorage<RelishStoragePath = P>>(
async fn sync_metadata<'a, P, S: 'static + RemoteStorage<StoragePath = P>>(
config: &'static PageServerConf,
relish_storage: &'a S,
remote_storage: &'a S,
sync_operation: SyncOperation,
new_metadata: &'a TimelineMetadata,
tenant_id: ZTenantId,
@@ -739,7 +734,7 @@ async fn sync_metadata<'a, P, S: 'static + RelishStorage<RelishStoragePath = P>>
.await?;
}
SyncOperation::Upload => {
let remote_path = relish_storage
let remote_path = remote_storage
.storage_path(&local_metadata_path)
.with_context(|| {
format!(
@@ -747,26 +742,28 @@ async fn sync_metadata<'a, P, S: 'static + RelishStorage<RelishStoragePath = P>>
local_metadata_path.display()
)
})?;
let mut bytes = io::BufReader::new(new_metadata_bytes.as_slice());
relish_storage
.upload_relish(&mut bytes, &remote_path)
remote_storage
.upload(
io::BufReader::new(std::io::Cursor::new(new_metadata_bytes)),
&remote_path,
)
.await?;
}
}
Ok(())
}
async fn upload<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
relish_storage: &S,
async fn upload<P, S: 'static + RemoteStorage<StoragePath = P>>(
remote_storage: &S,
source: &Path,
) -> anyhow::Result<()> {
let destination = relish_storage.storage_path(source).with_context(|| {
let destination = remote_storage.storage_path(source).with_context(|| {
format!(
"Failed to derive storage destination out of upload path {}",
source.display()
)
})?;
let mut source_file = io::BufReader::new(
let source_file = io::BufReader::new(
tokio::fs::OpenOptions::new()
.read(true)
.open(source)
@@ -778,19 +775,17 @@ async fn upload<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
)
})?,
);
relish_storage
.upload_relish(&mut source_file, &destination)
.await
remote_storage.upload(source_file, &destination).await
}
async fn download<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
relish_storage: &S,
async fn download<P, S: 'static + RemoteStorage<StoragePath = P>>(
remote_storage: &S,
destination: &Path,
) -> anyhow::Result<()> {
if destination.exists() {
Ok(())
} else {
let source = relish_storage.storage_path(destination).with_context(|| {
let source = remote_storage.storage_path(destination).with_context(|| {
format!(
"Failed to derive storage source out of download destination '{}'",
destination.display()
@@ -823,8 +818,8 @@ async fn download<P, S: 'static + RelishStorage<RelishStoragePath = P>>(
})?,
);
relish_storage
.download_relish(&source, &mut destination_file)
remote_storage
.download(&source, &mut destination_file)
.await?;
destination_file.flush().await?;
Ok(())
@@ -841,7 +836,7 @@ mod tests {
use super::*;
use crate::{
relish_storage::local_fs::LocalFs,
remote_storage::local_fs::LocalFs,
repository::repo_harness::{RepoHarness, TIMELINE_ID},
};
use hex_literal::hex;
@@ -1015,7 +1010,7 @@ mod tests {
let repo_harness = RepoHarness::create("reupload_missing_metadata")?;
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
let mut remote_timelines =
store_incorrect_metadata_relishes(&repo_harness, &storage).await?;
store_timelines_with_incorrect_metadata(&repo_harness, &storage).await?;
assert_timelines_equal(
remote_timelines.clone(),
fetch_existing_uploads(&storage).await?,
@@ -1094,7 +1089,7 @@ mod tests {
let repo_harness = RepoHarness::create("test_download_timeline")?;
let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?;
let mut remote_timelines =
store_incorrect_metadata_relishes(&repo_harness, &storage).await?;
store_timelines_with_incorrect_metadata(&repo_harness, &storage).await?;
fs::remove_dir_all(repo_harness.timeline_path(&NO_METADATA_TIMELINE_ID))?;
fs::remove_dir_all(repo_harness.timeline_path(&CORRUPT_METADATA_TIMELINE_ID))?;
@@ -1328,20 +1323,20 @@ mod tests {
async fn ensure_correct_timeline_upload<'a>(
harness: &RepoHarness,
remote_timelines: &'a mut HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>,
relish_storage: &'a LocalFs,
remote_storage: &'a LocalFs,
new_upload: LocalTimeline,
) {
upload_timeline(
harness.conf,
&LIMIT,
remote_timelines,
relish_storage,
remote_storage,
new_upload.clone(),
)
.await;
assert_timelines_equal(
remote_timelines.clone(),
fetch_existing_uploads(relish_storage).await.unwrap(),
fetch_existing_uploads(remote_storage).await.unwrap(),
);
let new_remote_files = remote_timelines
@@ -1451,7 +1446,7 @@ mod tests {
let actual_remote_paths = fs::read_dir(
remote_file
.parent()
.expect("Remote relishes are expected to have their timeline dir as parent"),
.expect("Remote files are expected to have their timeline dir as parent"),
)
.unwrap()
.map(|dir| dir.unwrap().path())
@@ -1468,7 +1463,7 @@ mod tests {
}
}
async fn store_incorrect_metadata_relishes(
async fn store_timelines_with_incorrect_metadata(
harness: &RepoHarness,
storage: &LocalFs,
) -> anyhow::Result<HashMap<(ZTenantId, ZTimelineId), RemoteTimeline>> {
@@ -1500,15 +1495,15 @@ mod tests {
.await;
storage
.delete_relish(&storage.storage_path(&metadata_path(
.delete(&storage.storage_path(&metadata_path(
harness.conf,
NO_METADATA_TIMELINE_ID,
harness.tenant_id,
))?)
.await?;
storage
.upload_relish(
&mut BufReader::new(Cursor::new("corrupt meta".to_string().into_bytes())),
.upload(
BufReader::new(Cursor::new("corrupt meta".to_string().into_bytes())),
&storage.storage_path(&metadata_path(
harness.conf,
CORRUPT_METADATA_TIMELINE_ID,
@@ -1517,8 +1512,8 @@ mod tests {
)
.await?;
for remote_relish in remote_timelines.values_mut() {
remote_relish.metadata = None;
for remote_file in remote_timelines.values_mut() {
remote_file.metadata = None;
}
Ok(remote_timelines)

View File

@@ -104,7 +104,7 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
tenant.state = TenantState::Idle;
}
pub fn register_relish_download(
pub fn register_timeline_download(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,