Use less pageserver-specific method in RemoteStorage trait
commit b32da3b42e
parent 0ccfc62e88
committed by Kirill Bulatov
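
The `RemoteStorage` trait previously made every storage backend answer a pageserver-specific question: `info` returned a `RemoteFileInfo` carrying the tenant id, timeline id, download destination, and an `is_metadata` flag. This commit narrows the trait to a backend-agnostic `local_path` that only maps a storage path back to a local download path; tenant/timeline id parsing and metadata detection move into the `storage_sync` consumer. Condensed from the hunks below:

    // before: each backend parses pageserver layout details out of its paths
    fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo>;

    // after: backends only translate a storage path into a local download path
    fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf>;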
@@ -78,16 +78,12 @@ use std::{
     thread,
 };
 
-use anyhow::{anyhow, ensure, Context};
+use anyhow::Context;
 use tokio::io;
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 pub use self::storage_sync::schedule_timeline_upload;
 use self::{local_fs::LocalFs, rust_s3::S3};
-use crate::{
-    layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME},
-    PageServerConf, RemoteStorageKind,
-};
+use crate::{PageServerConf, RemoteStorageKind};
 
 /// Based on the config, initiates the remote storage connection and starts a separate thread
 /// that ensures that pageserver and the remote storage are in sync with each other.
@@ -127,8 +123,8 @@ trait RemoteStorage: Send + Sync {
     /// Attempts to derive the storage path out of the local path, if the latter is correct.
     fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath>;
 
-    /// Gets the layered storage information about the given entry.
-    fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo>;
+    /// Gets the download path of the given storage file.
+    fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf>;
 
     /// Lists all items the storage has right now.
     async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>>;
@@ -159,16 +155,6 @@ trait RemoteStorage: Send + Sync {
     async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>;
 }
 
-/// Information about a certain remote storage entry.
-#[derive(Debug, PartialEq, Eq)]
-struct RemoteFileInfo {
-    tenant_id: ZTenantId,
-    timeline_id: ZTimelineId,
-    /// Path in the pageserver workdir where the file should go to.
-    download_destination: PathBuf,
-    is_metadata: bool,
-}
-
 fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> {
     if prefix == path {
         anyhow::bail!(
@@ -185,147 +171,3 @@ fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a
         })
     }
 }
-
-fn parse_ids_from_path<'a, R: std::fmt::Display>(
-    path_segments: impl Iterator<Item = &'a str>,
-    path_log_representation: &R,
-) -> anyhow::Result<(ZTenantId, ZTimelineId)> {
-    let mut segments = path_segments.skip_while(|&segment| segment != TENANTS_SEGMENT_NAME);
-    let tenants_segment = segments.next().ok_or_else(|| {
-        anyhow!(
-            "Found no '{}' segment in the storage path '{}'",
-            TENANTS_SEGMENT_NAME,
-            path_log_representation
-        )
-    })?;
-    ensure!(
-        tenants_segment == TENANTS_SEGMENT_NAME,
-        "Failed to extract '{}' segment from storage path '{}'",
-        TENANTS_SEGMENT_NAME,
-        path_log_representation
-    );
-    let tenant_id = segments
-        .next()
-        .ok_or_else(|| {
-            anyhow!(
-                "Found no tenant id in the storage path '{}'",
-                path_log_representation
-            )
-        })?
-        .parse::<ZTenantId>()
-        .with_context(|| {
-            format!(
-                "Failed to parse tenant id from storage path '{}'",
-                path_log_representation
-            )
-        })?;
-
-    let timelines_segment = segments.next().ok_or_else(|| {
-        anyhow!(
-            "Found no '{}' segment in the storage path '{}'",
-            TIMELINES_SEGMENT_NAME,
-            path_log_representation
-        )
-    })?;
-    ensure!(
-        timelines_segment == TIMELINES_SEGMENT_NAME,
-        "Failed to extract '{}' segment from storage path '{}'",
-        TIMELINES_SEGMENT_NAME,
-        path_log_representation
-    );
-    let timeline_id = segments
-        .next()
-        .ok_or_else(|| {
-            anyhow!(
-                "Found no timeline id in the storage path '{}'",
-                path_log_representation
-            )
-        })?
-        .parse::<ZTimelineId>()
-        .with_context(|| {
-            format!(
-                "Failed to parse timeline id from storage path '{}'",
-                path_log_representation
-            )
-        })?;
-
-    Ok((tenant_id, timeline_id))
-}
-
-/// A set of common test utils to share in unit tests inside the module tree.
-#[cfg(test)]
-mod test_utils {
-    use std::path::{Path, PathBuf};
-
-    use anyhow::ensure;
-
-    use crate::{
-        layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME},
-        repository::repo_harness::{RepoHarness, TIMELINE_ID},
-    };
-
-    /// Gives a timeline path with pageserver workdir stripped off.
-    pub fn relative_timeline_path(harness: &RepoHarness) -> anyhow::Result<PathBuf> {
-        let timeline_path = harness.timeline_path(&TIMELINE_ID);
-        Ok(timeline_path
-            .strip_prefix(&harness.conf.workdir)?
-            .to_path_buf())
-    }
-
-    /// Creates a path with custom tenant id in one of its segments.
-    /// Useful for emulating paths with wrong ids.
-    pub fn custom_tenant_id_path(
-        path_with_tenant_id: &Path,
-        new_tenant_id: &str,
-    ) -> anyhow::Result<PathBuf> {
-        let mut new_path = PathBuf::new();
-        let mut is_tenant_id = false;
-        let mut tenant_id_replaced = false;
-        for segment in path_with_tenant_id {
-            match segment.to_str() {
-                Some(TENANTS_SEGMENT_NAME) => is_tenant_id = true,
-                Some(_tenant_id_str) if is_tenant_id => {
-                    is_tenant_id = false;
-                    new_path.push(new_tenant_id);
-                    tenant_id_replaced = true;
-                    continue;
-                }
-                _ => {}
-            }
-            new_path.push(segment)
-        }
-
-        ensure!(tenant_id_replaced, "Found no tenant id segment to replace");
-        Ok(new_path)
-    }
-
-    /// Creates a path with custom timeline id in one of its segments.
-    /// Useful for emulating paths with wrong ids.
-    pub fn custom_timeline_id_path(
-        path_with_timeline_id: &Path,
-        new_timeline_id: &str,
-    ) -> anyhow::Result<PathBuf> {
-        let mut new_path = PathBuf::new();
-        let mut is_timeline_id = false;
-        let mut timeline_id_replaced = false;
-        for segment in path_with_timeline_id {
-            match segment.to_str() {
-                Some(TIMELINES_SEGMENT_NAME) => is_timeline_id = true,
-                Some(_timeline_id_str) if is_timeline_id => {
-                    is_timeline_id = false;
-                    new_path.push(new_timeline_id);
-                    timeline_id_replaced = true;
-                    continue;
-                }
-                _ => {}
-            }
-            new_path.push(segment)
-        }
-
-        ensure!(
-            timeline_id_replaced,
-            "Found no timeline id segment to replace"
-        );
-        Ok(new_path)
-    }
-}
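
(The hunks above touch the remote_storage module root; the following hunks appear to belong to its local_fs backend. This is inferred from the `impl RemoteStorage for LocalFs` hunk contexts, since the mirrored page drops the per-file diff headers.)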
@@ -5,7 +5,6 @@
 //! volume is mounted to the local FS.
 
 use std::{
-    ffi::OsStr,
     future::Future,
     path::{Path, PathBuf},
     pin::Pin,
@@ -18,8 +17,7 @@ use tokio::{
 };
 use tracing::*;
 
-use super::{parse_ids_from_path, strip_path_prefix, RemoteFileInfo, RemoteStorage};
-use crate::layered_repository::metadata::METADATA_FILE_NAME;
+use super::{strip_path_prefix, RemoteStorage};
 
 pub struct LocalFs {
     pageserver_workdir: &'static Path,
@@ -68,22 +66,10 @@ impl RemoteStorage for LocalFs {
         ))
     }
 
-    fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo> {
-        let is_metadata =
-            storage_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME);
+    fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf> {
         let relative_path = strip_path_prefix(&self.root, storage_path)
             .context("local path does not belong to this storage")?;
-        let download_destination = self.pageserver_workdir.join(relative_path);
-        let (tenant_id, timeline_id) = parse_ids_from_path(
-            relative_path.iter().filter_map(|segment| segment.to_str()),
-            &relative_path.display(),
-        )?;
-        Ok(RemoteFileInfo {
-            tenant_id,
-            timeline_id,
-            download_destination,
-            is_metadata,
-        })
+        Ok(self.pageserver_workdir.join(relative_path))
     }
 
     async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
@@ -113,11 +99,18 @@ impl RemoteStorage for LocalFs {
 
         io::copy(&mut from, &mut destination)
             .await
-            .context("Failed to upload a file to the local storage")?;
-        destination
-            .flush()
-            .await
-            .context("Failed to upload a file to the local storage")?;
+            .with_context(|| {
+                format!(
+                    "Failed to upload file to the local storage at '{}'",
+                    target_file_path.display()
+                )
+            })?;
+        destination.flush().await.with_context(|| {
+            format!(
+                "Failed to upload file to the local storage at '{}'",
+                target_file_path.display()
+            )
+        })?;
         Ok(())
     }
 
@@ -141,9 +134,13 @@ impl RemoteStorage for LocalFs {
                 )
             })?,
         );
-        io::copy(&mut source, to)
-            .await
-            .context("Failed to download a file from the local storage")?;
+        io::copy(&mut source, to).await.with_context(|| {
+            format!(
+                "Failed to download file '{}' from the local storage",
+                file_path.display()
+            )
+        })?;
+        source.flush().await?;
         Ok(())
     } else {
         bail!(
@@ -275,9 +272,6 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
 mod pure_tests {
     use crate::{
         layered_repository::metadata::METADATA_FILE_NAME,
-        remote_storage::test_utils::{
-            custom_tenant_id_path, custom_timeline_id_path, relative_timeline_path,
-        },
         repository::repo_harness::{RepoHarness, TIMELINE_ID},
     };
 
@@ -345,8 +339,8 @@ mod pure_tests {
     }
 
     #[test]
-    fn info_positive() -> anyhow::Result<()> {
-        let repo_harness = RepoHarness::create("info_positive")?;
+    fn local_path_positive() -> anyhow::Result<()> {
+        let repo_harness = RepoHarness::create("local_path_positive")?;
         let storage_root = PathBuf::from("somewhere").join("else");
         let storage = LocalFs {
             pageserver_workdir: &repo_harness.conf.workdir,
@@ -356,14 +350,11 @@ mod pure_tests {
         let name = "not a metadata";
         let local_path = repo_harness.timeline_path(&TIMELINE_ID).join(name);
         assert_eq!(
-            RemoteFileInfo {
-                tenant_id: repo_harness.tenant_id,
-                timeline_id: TIMELINE_ID,
-                download_destination: local_path.clone(),
-                is_metadata: false,
-            },
+            local_path,
             storage
-                .info(&storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?))
+                .local_path(
+                    &storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?)
+                )
                 .expect("For a valid input, valid S3 info should be parsed"),
             "Should be able to parse metadata out of the correctly named remote delta file"
         );
@@ -373,15 +364,10 @@ mod pure_tests {
             .join(METADATA_FILE_NAME);
         let remote_metadata_path = storage.storage_path(&local_metadata_path)?;
         assert_eq!(
-            RemoteFileInfo {
-                tenant_id: repo_harness.tenant_id,
-                timeline_id: TIMELINE_ID,
-                download_destination: local_metadata_path,
-                is_metadata: true,
-            },
+            local_metadata_path,
             storage
-                .info(&remote_metadata_path)
-                .expect("For a valid input, valid S3 info should be parsed"),
+                .local_path(&remote_metadata_path)
+                .expect("For a valid input, valid local path should be parsed"),
             "Should be able to parse metadata out of the correctly named remote metadata file"
         );
 
@@ -389,53 +375,30 @@ mod pure_tests {
     }
 
     #[test]
-    fn info_negatives() -> anyhow::Result<()> {
+    fn local_path_negatives() -> anyhow::Result<()> {
        #[track_caller]
-        #[allow(clippy::ptr_arg)] // have to use &PathBuf due to `storage.info` parameter requirements
-        fn storage_info_error(storage: &LocalFs, storage_path: &PathBuf) -> String {
-            match storage.info(storage_path) {
-                Ok(wrong_info) => panic!(
-                    "Expected storage path input {:?} to cause an error, but got file info: {:?}",
-                    storage_path, wrong_info,
+        #[allow(clippy::ptr_arg)] // have to use &PathBuf due to `storage.local_path` parameter requirements
+        fn local_path_error(storage: &LocalFs, storage_path: &PathBuf) -> String {
+            match storage.local_path(storage_path) {
+                Ok(wrong_path) => panic!(
+                    "Expected local path input {:?} to cause an error, but got file path: {:?}",
+                    storage_path, wrong_path,
                 ),
                 Err(e) => format!("{:?}", e),
            }
        }
 
-        let repo_harness = RepoHarness::create("info_negatives")?;
+        let repo_harness = RepoHarness::create("local_path_negatives")?;
         let storage_root = PathBuf::from("somewhere").join("else");
         let storage = LocalFs {
             pageserver_workdir: &repo_harness.conf.workdir,
-            root: storage_root.clone(),
+            root: storage_root,
        };
 
         let totally_wrong_path = "wrong_wrong_wrong";
-        let error_message = storage_info_error(&storage, &PathBuf::from(totally_wrong_path));
+        let error_message = local_path_error(&storage, &PathBuf::from(totally_wrong_path));
         assert!(error_message.contains(totally_wrong_path));
 
-        let relative_timeline_path = relative_timeline_path(&repo_harness)?;
-
-        let relative_file_path = custom_tenant_id_path(&relative_timeline_path, "wrong_tenant_id")?
-            .join("wrong_tenant_id_name");
-        let wrong_tenant_id_path = storage_root.join(&relative_file_path);
-        let error_message = storage_info_error(&storage, &wrong_tenant_id_path);
-        assert!(
-            error_message.contains(relative_file_path.to_str().unwrap()),
-            "Error message '{}' does not contain the expected substring",
-            error_message
-        );
-
-        let relative_file_path =
-            custom_timeline_id_path(&relative_timeline_path, "wrong_timeline_id")?
-                .join("wrong_timeline_id_name");
-        let wrong_timeline_id_path = storage_root.join(&relative_file_path);
-        let error_message = storage_info_error(&storage, &wrong_timeline_id_path);
-        assert!(
-            error_message.contains(relative_file_path.to_str().unwrap()),
-            "Error message '{}' does not contain the expected substring",
-            error_message
-        );
-
         Ok(())
     }
 
@@ -451,7 +414,7 @@ mod pure_tests {
        };
 
        let storage_path = dummy_storage.storage_path(&original_path)?;
-        let download_destination = dummy_storage.info(&storage_path)?.download_destination;
+        let download_destination = dummy_storage.local_path(&storage_path)?;
 
        assert_eq!(
            original_path, download_destination,
@@ -465,9 +428,7 @@ mod pure_tests {
 #[cfg(test)]
 mod fs_tests {
     use super::*;
-    use crate::{
-        remote_storage::test_utils::relative_timeline_path, repository::repo_harness::RepoHarness,
-    };
+    use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID};
 
     use std::io::Write;
     use tempfile::tempdir;
@@ -684,10 +645,9 @@ mod fs_tests {
     storage: &LocalFs,
     name: &str,
 ) -> anyhow::Result<PathBuf> {
-    let storage_path = storage
-        .root
-        .join(relative_timeline_path(harness)?)
-        .join(name);
+    let timeline_path = harness.timeline_path(&TIMELINE_ID);
+    let relative_timeline_path = timeline_path.strip_prefix(&harness.conf.workdir)?;
+    let storage_path = storage.root.join(relative_timeline_path).join(name);
     storage
         .upload(
             create_file_for_upload(
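
(The next hunks appear to belong to the rust_s3 backend, inferred from the `use s3::...` and `impl RemoteStorage for S3` contexts.)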
@@ -9,8 +9,7 @@ use s3::{bucket::Bucket, creds::Credentials, region::Region};
 use tokio::io::{self, AsyncWriteExt};
 
 use crate::{
-    layered_repository::metadata::METADATA_FILE_NAME,
-    remote_storage::{parse_ids_from_path, strip_path_prefix, RemoteFileInfo, RemoteStorage},
+    remote_storage::{strip_path_prefix, RemoteStorage},
     S3Config,
 };
 
@@ -76,19 +75,8 @@ impl RemoteStorage for S3 {
         Ok(S3ObjectKey(key))
     }
 
-    fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo> {
-        let storage_path_key = &storage_path.0;
-        let is_metadata =
-            storage_path_key.ends_with(&format!("{}{}", S3_FILE_SEPARATOR, METADATA_FILE_NAME));
-        let download_destination = storage_path.download_destination(self.pageserver_workdir);
-        let (tenant_id, timeline_id) =
-            parse_ids_from_path(storage_path_key.split(S3_FILE_SEPARATOR), storage_path_key)?;
-        Ok(RemoteFileInfo {
-            tenant_id,
-            timeline_id,
-            download_destination,
-            is_metadata,
-        })
+    fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf> {
+        Ok(storage_path.download_destination(self.pageserver_workdir))
     }
 
     async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
@@ -212,9 +200,7 @@ impl RemoteStorage for S3 {
 #[cfg(test)]
 mod tests {
     use crate::{
-        remote_storage::test_utils::{
-            custom_tenant_id_path, custom_timeline_id_path, relative_timeline_path,
-        },
+        layered_repository::metadata::METADATA_FILE_NAME,
         repository::repo_harness::{RepoHarness, TIMELINE_ID},
     };
 
@@ -316,35 +302,26 @@ mod tests {
     }
 
     #[test]
-    fn info_positive() -> anyhow::Result<()> {
-        let repo_harness = RepoHarness::create("info_positive")?;
+    fn local_path_positive() -> anyhow::Result<()> {
+        let repo_harness = RepoHarness::create("local_path_positive")?;
         let storage = dummy_storage(&repo_harness.conf.workdir);
-        let relative_timeline_path = relative_timeline_path(&repo_harness)?;
+        let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID);
+        let relative_timeline_path = timeline_dir.strip_prefix(&repo_harness.conf.workdir)?;
 
         let s3_key = create_s3_key(&relative_timeline_path.join("not a metadata"));
         assert_eq!(
-            RemoteFileInfo {
-                tenant_id: repo_harness.tenant_id,
-                timeline_id: TIMELINE_ID,
-                download_destination: s3_key.download_destination(&repo_harness.conf.workdir),
-                is_metadata: false,
-            },
+            s3_key.download_destination(&repo_harness.conf.workdir),
             storage
-                .info(&s3_key)
+                .local_path(&s3_key)
                 .expect("For a valid input, valid S3 info should be parsed"),
             "Should be able to parse metadata out of the correctly named remote delta file"
         );
 
         let s3_key = create_s3_key(&relative_timeline_path.join(METADATA_FILE_NAME));
         assert_eq!(
-            RemoteFileInfo {
-                tenant_id: repo_harness.tenant_id,
-                timeline_id: TIMELINE_ID,
-                download_destination: s3_key.download_destination(&repo_harness.conf.workdir),
-                is_metadata: true,
-            },
+            s3_key.download_destination(&repo_harness.conf.workdir),
             storage
-                .info(&s3_key)
+                .local_path(&s3_key)
                 .expect("For a valid input, valid S3 info should be parsed"),
             "Should be able to parse metadata out of the correctly named remote metadata file"
         );
@@ -352,43 +329,6 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn info_negatives() -> anyhow::Result<()> {
-        #[track_caller]
-        fn storage_info_error(storage: &S3, s3_key: &S3ObjectKey) -> String {
-            match storage.info(s3_key) {
-                Ok(wrong_info) => panic!(
-                    "Expected key {:?} to error, but got file info: {:?}",
-                    s3_key, wrong_info,
-                ),
-                Err(e) => e.to_string(),
-            }
-        }
-
-        let repo_harness = RepoHarness::create("info_negatives")?;
-        let storage = dummy_storage(&repo_harness.conf.workdir);
-        let relative_timeline_path = relative_timeline_path(&repo_harness)?;
-
-        let totally_wrong_path = "wrong_wrong_wrong";
-        let error_message =
-            storage_info_error(&storage, &S3ObjectKey(totally_wrong_path.to_string()));
-        assert!(error_message.contains(totally_wrong_path));
-
-        let wrong_tenant_id = create_s3_key(
-            &custom_tenant_id_path(&relative_timeline_path, "wrong_tenant_id")?.join("name"),
-        );
-        let error_message = storage_info_error(&storage, &wrong_tenant_id);
-        assert!(error_message.contains(&wrong_tenant_id.0));
-
-        let wrong_timeline_id = create_s3_key(
-            &custom_timeline_id_path(&relative_timeline_path, "wrong_timeline_id")?.join("name"),
-        );
-        let error_message = storage_info_error(&storage, &wrong_timeline_id);
-        assert!(error_message.contains(&wrong_timeline_id.0));
-
-        Ok(())
-    }
-
     #[test]
     fn download_destination_matches_original_path() -> anyhow::Result<()> {
         let repo_harness = RepoHarness::create("download_destination_matches_original_path")?;
@@ -397,7 +337,7 @@ mod tests {
         let dummy_storage = dummy_storage(&repo_harness.conf.workdir);
 
         let key = dummy_storage.storage_path(&original_path)?;
-        let download_destination = dummy_storage.info(&key)?.download_destination;
+        let download_destination = dummy_storage.local_path(&key)?;
 
         assert_eq!(
             original_path, download_destination,
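
(The remaining hunks appear to belong to the storage_sync submodule, inferred from the `fetch_existing_uploads` contexts; this is where the id parsing and metadata detection dropped from the trait now live.)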
@@ -67,7 +67,7 @@ use std::{
     time::Duration,
 };
 
-use anyhow::{ensure, Context};
+use anyhow::{anyhow, ensure, Context};
 use futures::stream::{FuturesUnordered, StreamExt};
 use lazy_static::lazy_static;
 use tokio::{
@@ -78,9 +78,12 @@ use tokio::{
 };
 use tracing::*;
 
-use super::{RemoteFileInfo, RemoteStorage};
+use super::RemoteStorage;
 use crate::{
-    layered_repository::metadata::{metadata_path, TimelineMetadata},
+    layered_repository::{
+        metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
+        TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME,
+    },
     tenant_mgr::register_timeline_download,
     PageServerConf,
 };
@@ -365,19 +368,41 @@ async fn fetch_existing_uploads<P: std::fmt::Debug, S: 'static + RemoteStorage<S
     let mut data_fetches = uploaded_files
         .into_iter()
         .map(|remote_path| async {
-            (
-                remote_file_info(remote_storage, &remote_path).await,
-                remote_path,
-            )
+            let local_path = match remote_storage.local_path(&remote_path) {
+                Ok(path) => path,
+                Err(e) => return Err((e, remote_path)),
+            };
+            let metadata = if local_path
+                .file_name()
+                .and_then(|os_str| os_str.to_str())
+                .unwrap_or_default()
+                == METADATA_FILE_NAME
+            {
+                let mut metadata_bytes = Vec::new();
+                if let Err(e) = remote_storage
+                    .download(&remote_path, &mut metadata_bytes)
+                    .await
+                {
+                    return Err((e, remote_path));
+                };
+                let metadata = match TimelineMetadata::from_bytes(&metadata_bytes) {
+                    Ok(metadata) => metadata,
+                    Err(e) => return Err((e, remote_path)),
+                };
+                Some(metadata)
+            } else {
+                None
+            };
+            let (tenant_id, timeline_id) =
+                parse_ids_from_path(&local_path).map_err(|e| (e, remote_path))?;
+            Ok::<_, (anyhow::Error, P)>((local_path, tenant_id, timeline_id, metadata))
         })
         .collect::<FuturesUnordered<_>>();
 
     let mut fetched = HashMap::new();
-    while let Some((fetch_result, remote_path)) = data_fetches.next().await {
+    while let Some(fetch_result) = data_fetches.next().await {
         match fetch_result {
-            Ok((file_info, remote_metadata)) => {
-                let tenant_id = file_info.tenant_id;
-                let timeline_id = file_info.timeline_id;
+            Ok((local_path, tenant_id, timeline_id, remote_metadata)) => {
                 let remote_timeline =
                     fetched
                         .entry((tenant_id, timeline_id))
@@ -390,10 +415,10 @@ async fn fetch_existing_uploads<P: std::fmt::Debug, S: 'static + RemoteStorage<S
                 if remote_metadata.is_some() {
                     remote_timeline.metadata = remote_metadata;
                 } else {
-                    remote_timeline.layers.push(file_info.download_destination);
+                    remote_timeline.layers.push(local_path);
                 }
             }
-            Err(e) => {
+            Err((e, remote_path)) => {
                 warn!(
                     "Failed to fetch file info for path {:?}, reason: {:#}",
                     remote_path, e
@@ -406,34 +431,70 @@ async fn fetch_existing_uploads<P: std::fmt::Debug, S: 'static + RemoteStorage<S
     Ok(fetched)
 }
 
-async fn remote_file_info<P, S: 'static + RemoteStorage<StoragePath = P>>(
-    remote_storage: &S,
-    remote_path: &P,
-) -> anyhow::Result<(RemoteFileInfo, Option<TimelineMetadata>)> {
-    let info = remote_storage.info(remote_path)?;
-    let metadata = if info.is_metadata {
-        let mut metadata_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        remote_storage
-            .download(remote_path, &mut metadata_bytes)
-            .await
-            .with_context(|| {
-                format!(
-                    "Failed to download metadata file contents for tenant {}, timeline {}",
-                    info.tenant_id, info.timeline_id
-                )
-            })?;
-        metadata_bytes.flush().await.with_context(|| {
-            format!(
-                "Failed to download metadata file contents for tenant {}, timeline {}",
-                info.tenant_id, info.timeline_id
-            )
-        })?;
-        let metadata_bytes = metadata_bytes.into_inner().into_inner();
-        Some(TimelineMetadata::from_bytes(&metadata_bytes)?)
-    } else {
-        None
-    };
-    Ok((info, metadata))
-}
+fn parse_ids_from_path(path: &Path) -> anyhow::Result<(ZTenantId, ZTimelineId)> {
+    let mut segments = path
+        .iter()
+        .flat_map(|segment| segment.to_str())
+        .skip_while(|&segment| segment != TENANTS_SEGMENT_NAME);
+    let tenants_segment = segments.next().ok_or_else(|| {
+        anyhow!(
+            "Found no '{}' segment in the storage path '{}'",
+            TENANTS_SEGMENT_NAME,
+            path.display()
+        )
+    })?;
+    ensure!(
+        tenants_segment == TENANTS_SEGMENT_NAME,
+        "Failed to extract '{}' segment from storage path '{}'",
+        TENANTS_SEGMENT_NAME,
+        path.display()
+    );
+    let tenant_id = segments
+        .next()
+        .ok_or_else(|| {
+            anyhow!(
+                "Found no tenant id in the storage path '{}'",
+                path.display()
+            )
+        })?
+        .parse::<ZTenantId>()
+        .with_context(|| {
+            format!(
+                "Failed to parse tenant id from storage path '{}'",
+                path.display()
+            )
+        })?;
+
+    let timelines_segment = segments.next().ok_or_else(|| {
+        anyhow!(
+            "Found no '{}' segment in the storage path '{}'",
+            TIMELINES_SEGMENT_NAME,
+            path.display()
+        )
+    })?;
+    ensure!(
+        timelines_segment == TIMELINES_SEGMENT_NAME,
+        "Failed to extract '{}' segment from storage path '{}'",
+        TIMELINES_SEGMENT_NAME,
+        path.display()
+    );
+    let timeline_id = segments
+        .next()
+        .ok_or_else(|| {
+            anyhow!(
+                "Found no timeline id in the storage path '{}'",
+                path.display()
+            )
+        })?
+        .parse::<ZTimelineId>()
+        .with_context(|| {
+            format!(
+                "Failed to parse timeline id from storage path '{}'",
+                path.display()
+            )
+        })?;
+
+    Ok((tenant_id, timeline_id))
+}
 
 async fn download_timeline<'a, P, S: 'static + RemoteStorage<StoragePath = P>>(
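
Net effect on the sync loop, shown as a minimal sketch (the names are taken from the hunks above, but the error plumbing and the exact `unwrap_or_default()` comparison are simplified here, so treat it as illustrative rather than the literal committed code):

    // storage_sync now derives everything from the backend-agnostic local path:
    let local_path = remote_storage.local_path(&remote_path)?;
    // metadata detection moved out of the backends...
    let is_metadata =
        local_path.file_name().and_then(|n| n.to_str()) == Some(METADATA_FILE_NAME);
    // ...and so did tenant/timeline id parsing:
    let (tenant_id, timeline_id) = parse_ids_from_path(&local_path)?;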