mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-02 13:00:37 +00:00
Remove RemoteObjectName and many remote storage generics in pageserver (#2360)
This commit is contained in:
@@ -42,19 +42,13 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
|
||||
/// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/
|
||||
pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100;
|
||||
|
||||
pub trait RemoteObjectName {
|
||||
// Needed to retrieve last component for RemoteObjectId.
|
||||
// In other words a file name
|
||||
fn object_name(&self) -> Option<&str>;
|
||||
}
|
||||
|
||||
/// Storage (potentially remote) API to manage its state.
|
||||
/// This storage tries to be unaware of any layered repository context,
|
||||
/// providing basic CRUD operations for storage files.
|
||||
#[async_trait::async_trait]
|
||||
pub trait RemoteStorage: Send + Sync {
|
||||
/// A way to uniquely reference a file in the remote storage.
|
||||
type RemoteObjectId: RemoteObjectName;
|
||||
type RemoteObjectId;
|
||||
|
||||
/// Attempts to derive the storage path out of the local path, if the latter is correct.
|
||||
fn remote_object_id(&self, local_path: &Path) -> anyhow::Result<Self::RemoteObjectId>;
|
||||
@@ -71,7 +65,7 @@ pub trait RemoteStorage: Send + Sync {
|
||||
/// so this method doesnt need to.
|
||||
async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<Self::RemoteObjectId>,
|
||||
prefix: Option<&Self::RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<Self::RemoteObjectId>>;
|
||||
|
||||
/// Streams the local file contents into remote into the remote storage entry.
|
||||
@@ -163,6 +157,13 @@ impl GenericRemoteStorage {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_local(&self) -> Option<&LocalFs> {
|
||||
match self {
|
||||
Self::Local(local_fs) => Some(local_fs),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extra set of key-value pairs that contain arbitrary metadata about the storage entry.
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
//! volume is mounted to the local FS.
|
||||
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
future::Future,
|
||||
path::{Path, PathBuf},
|
||||
pin::Pin,
|
||||
@@ -18,16 +17,10 @@ use tokio::{
|
||||
};
|
||||
use tracing::*;
|
||||
|
||||
use crate::{path_with_suffix_extension, Download, DownloadError, RemoteObjectName};
|
||||
use crate::{path_with_suffix_extension, Download, DownloadError};
|
||||
|
||||
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
|
||||
|
||||
impl RemoteObjectName for PathBuf {
|
||||
fn object_name(&self) -> Option<&str> {
|
||||
self.file_stem().and_then(|n| n.to_str())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LocalFs {
|
||||
working_directory: PathBuf,
|
||||
storage_root: PathBuf,
|
||||
@@ -113,13 +106,10 @@ impl RemoteStorage for LocalFs {
|
||||
|
||||
async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<Self::RemoteObjectId>,
|
||||
prefix: Option<&Self::RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
let path = match prefix {
|
||||
Some(prefix) => Cow::Owned(prefix),
|
||||
None => Cow::Borrowed(&self.storage_root),
|
||||
};
|
||||
get_all_files(path.as_ref(), false).await
|
||||
let path = prefix.unwrap_or(&self.storage_root);
|
||||
get_all_files(path, false).await
|
||||
}
|
||||
|
||||
async fn upload(
|
||||
|
||||
@@ -19,9 +19,7 @@ use tokio::{io, sync::Semaphore};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::{
|
||||
strip_path_prefix, Download, DownloadError, RemoteObjectName, RemoteStorage, S3Config,
|
||||
};
|
||||
use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config};
|
||||
|
||||
use super::StorageMetadata;
|
||||
|
||||
@@ -96,6 +94,23 @@ const S3_PREFIX_SEPARATOR: char = '/';
|
||||
pub struct S3ObjectKey(String);
|
||||
|
||||
impl S3ObjectKey {
|
||||
/// Turn a/b/c or a/b/c/ into c
|
||||
pub fn object_name(&self) -> Option<&str> {
|
||||
// corner case, char::to_string is not const, thats why this is more verbose than it needs to be
|
||||
// see https://github.com/rust-lang/rust/issues/88674
|
||||
if self.0.len() == 1 && self.0.chars().next().unwrap() == S3_PREFIX_SEPARATOR {
|
||||
return None;
|
||||
}
|
||||
|
||||
if self.0.ends_with(S3_PREFIX_SEPARATOR) {
|
||||
self.0.rsplit(S3_PREFIX_SEPARATOR).nth(1)
|
||||
} else {
|
||||
self.0
|
||||
.rsplit_once(S3_PREFIX_SEPARATOR)
|
||||
.map(|(_, last)| last)
|
||||
}
|
||||
}
|
||||
|
||||
fn key(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
@@ -119,25 +134,6 @@ impl S3ObjectKey {
|
||||
}
|
||||
}
|
||||
|
||||
impl RemoteObjectName for S3ObjectKey {
|
||||
/// Turn a/b/c or a/b/c/ into c
|
||||
fn object_name(&self) -> Option<&str> {
|
||||
// corner case, char::to_string is not const, thats why this is more verbose than it needs to be
|
||||
// see https://github.com/rust-lang/rust/issues/88674
|
||||
if self.0.len() == 1 && self.0.chars().next().unwrap() == S3_PREFIX_SEPARATOR {
|
||||
return None;
|
||||
}
|
||||
|
||||
if self.0.ends_with(S3_PREFIX_SEPARATOR) {
|
||||
self.0.rsplit(S3_PREFIX_SEPARATOR).nth(1)
|
||||
} else {
|
||||
self.0
|
||||
.rsplit_once(S3_PREFIX_SEPARATOR)
|
||||
.map(|(_, last)| last)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// AWS S3 storage.
|
||||
pub struct S3Bucket {
|
||||
workdir: PathBuf,
|
||||
@@ -316,11 +312,11 @@ impl RemoteStorage for S3Bucket {
|
||||
/// Note: it wont include empty "directories"
|
||||
async fn list_prefixes(
|
||||
&self,
|
||||
prefix: Option<Self::RemoteObjectId>,
|
||||
prefix: Option<&Self::RemoteObjectId>,
|
||||
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
|
||||
// get the passed prefix or if it is not set use prefix_in_bucket value
|
||||
let list_prefix = prefix
|
||||
.map(|p| p.0)
|
||||
.map(|p| p.0.clone())
|
||||
.or_else(|| self.prefix_in_bucket.clone())
|
||||
.map(|mut p| {
|
||||
// required to end with a separator
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! Main entry point for the Page Server executable.
|
||||
|
||||
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use std::{env, ops::ControlFlow, path::Path, str::FromStr, sync::Arc};
|
||||
use tracing::*;
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
@@ -298,7 +299,14 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
};
|
||||
info!("Using auth: {:#?}", conf.auth_type);
|
||||
|
||||
let remote_index = tenant_mgr::init_tenant_mgr(conf)?;
|
||||
let remote_storage = conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.map(|storage_config| GenericRemoteStorage::new(conf.workdir.clone(), storage_config))
|
||||
.transpose()
|
||||
.context("Failed to init generic remote storage")?
|
||||
.map(Arc::new);
|
||||
let remote_index = tenant_mgr::init_tenant_mgr(conf, remote_storage.as_ref().map(Arc::clone))?;
|
||||
|
||||
// Spawn a new thread for the http endpoint
|
||||
// bind before launching separate thread so the error reported before startup exits
|
||||
@@ -310,7 +318,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
"http_endpoint_thread",
|
||||
true,
|
||||
move || {
|
||||
let router = http::make_router(conf, auth_cloned, remote_index)?;
|
||||
let router = http::make_router(conf, auth_cloned, remote_index, remote_storage)?;
|
||||
endpoint::serve_thread_main(router, http_listener, thread_mgr::shutdown_watcher())
|
||||
},
|
||||
)?;
|
||||
|
||||
@@ -35,7 +35,7 @@ struct State {
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: RemoteIndex,
|
||||
allowlist_routes: Vec<Uri>,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
remote_storage: Option<Arc<GenericRemoteStorage>>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
@@ -43,20 +43,12 @@ impl State {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: RemoteIndex,
|
||||
remote_storage: Option<Arc<GenericRemoteStorage>>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"]
|
||||
.iter()
|
||||
.map(|v| v.parse().unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
// Note that this remote storage is created separately from the main one in the sync_loop.
|
||||
// It's fine since it's stateless and some code duplication saves us from bloating the code around with generics.
|
||||
let remote_storage = conf
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.map(|storage_config| GenericRemoteStorage::new(conf.workdir.clone(), storage_config))
|
||||
.transpose()
|
||||
.context("Failed to init generic remote storage")?;
|
||||
|
||||
Ok(Self {
|
||||
conf,
|
||||
auth,
|
||||
@@ -448,16 +440,8 @@ async fn gather_tenant_timelines_index_parts(
|
||||
tenant_id: ZTenantId,
|
||||
) -> anyhow::Result<Option<Vec<(ZTimelineId, RemoteTimeline)>>> {
|
||||
let index_parts = match state.remote_storage.as_ref() {
|
||||
Some(GenericRemoteStorage::Local(local_storage)) => {
|
||||
storage_sync::gather_tenant_timelines_index_parts(state.conf, local_storage, tenant_id)
|
||||
.await
|
||||
}
|
||||
// FIXME here s3 storage contains its own limits, that are separate from sync storage thread ones
|
||||
// because it is a different instance. We can move this limit to some global static
|
||||
// or use one instance everywhere.
|
||||
Some(GenericRemoteStorage::S3(s3_storage)) => {
|
||||
storage_sync::gather_tenant_timelines_index_parts(state.conf, s3_storage, tenant_id)
|
||||
.await
|
||||
Some(storage) => {
|
||||
storage_sync::gather_tenant_timelines_index_parts(state.conf, storage, tenant_id).await
|
||||
}
|
||||
None => return Ok(None),
|
||||
}
|
||||
@@ -714,6 +698,7 @@ pub fn make_router(
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<JwtAuth>>,
|
||||
remote_index: RemoteIndex,
|
||||
remote_storage: Option<Arc<GenericRemoteStorage>>,
|
||||
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
|
||||
let spec = include_bytes!("openapi_spec.yml");
|
||||
let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc");
|
||||
@@ -730,7 +715,8 @@ pub fn make_router(
|
||||
|
||||
Ok(router
|
||||
.data(Arc::new(
|
||||
State::new(conf, auth, remote_index).context("Failed to initialize router state")?,
|
||||
State::new(conf, auth, remote_index, remote_storage)
|
||||
.context("Failed to initialize router state")?,
|
||||
))
|
||||
.get("/v1/status", status_handler)
|
||||
.get("/v1/tenant", tenant_list_handler)
|
||||
|
||||
@@ -156,7 +156,7 @@ use std::{
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use once_cell::sync::{Lazy, OnceCell};
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorage};
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio::{
|
||||
fs,
|
||||
runtime::Runtime,
|
||||
@@ -253,36 +253,20 @@ pub struct SyncStartupData {
|
||||
/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states.
|
||||
pub fn start_local_timeline_sync(
|
||||
config: &'static PageServerConf,
|
||||
storage: Option<Arc<GenericRemoteStorage>>,
|
||||
) -> anyhow::Result<SyncStartupData> {
|
||||
let local_timeline_files = local_tenant_timeline_files(config)
|
||||
.context("Failed to collect local tenant timeline files")?;
|
||||
|
||||
match config.remote_storage_config.as_ref() {
|
||||
Some(storage_config) => {
|
||||
match GenericRemoteStorage::new(config.workdir.clone(), storage_config)
|
||||
.context("Failed to init the generic remote storage")?
|
||||
{
|
||||
GenericRemoteStorage::Local(local_fs_storage) => {
|
||||
storage_sync::spawn_storage_sync_thread(
|
||||
config,
|
||||
local_timeline_files,
|
||||
local_fs_storage,
|
||||
storage_config.max_concurrent_syncs,
|
||||
storage_config.max_sync_errors,
|
||||
)
|
||||
}
|
||||
GenericRemoteStorage::S3(s3_bucket_storage) => {
|
||||
storage_sync::spawn_storage_sync_thread(
|
||||
config,
|
||||
local_timeline_files,
|
||||
s3_bucket_storage,
|
||||
storage_config.max_concurrent_syncs,
|
||||
storage_config.max_sync_errors,
|
||||
)
|
||||
}
|
||||
}
|
||||
.context("Failed to spawn the storage sync thread")
|
||||
}
|
||||
match storage.zip(config.remote_storage_config.as_ref()) {
|
||||
Some((storage, storage_config)) => storage_sync::spawn_storage_sync_thread(
|
||||
config,
|
||||
local_timeline_files,
|
||||
storage,
|
||||
storage_config.max_concurrent_syncs,
|
||||
storage_config.max_sync_errors,
|
||||
)
|
||||
.context("Failed to spawn the storage sync thread"),
|
||||
None => {
|
||||
info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled");
|
||||
let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new();
|
||||
@@ -810,17 +794,13 @@ pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) {
|
||||
|
||||
/// Launch a thread to perform remote storage sync tasks.
|
||||
/// See module docs for loop step description.
|
||||
pub(super) fn spawn_storage_sync_thread<P, S>(
|
||||
pub(super) fn spawn_storage_sync_thread(
|
||||
conf: &'static PageServerConf,
|
||||
local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
|
||||
storage: S,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
max_concurrent_timelines_sync: NonZeroUsize,
|
||||
max_sync_errors: NonZeroU32,
|
||||
) -> anyhow::Result<SyncStartupData>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> anyhow::Result<SyncStartupData> {
|
||||
let sync_queue = SyncQueue::new(max_concurrent_timelines_sync);
|
||||
SYNC_QUEUE
|
||||
.set(sync_queue)
|
||||
@@ -860,7 +840,7 @@ where
|
||||
storage_sync_loop(
|
||||
runtime,
|
||||
conf,
|
||||
(Arc::new(storage), remote_index_clone, sync_queue),
|
||||
(storage, remote_index_clone, sync_queue),
|
||||
max_sync_errors,
|
||||
);
|
||||
Ok(())
|
||||
@@ -873,15 +853,12 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
fn storage_sync_loop<P, S>(
|
||||
fn storage_sync_loop(
|
||||
runtime: Runtime,
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue): (Arc<S>, RemoteIndex, &SyncQueue),
|
||||
(storage, index, sync_queue): (Arc<GenericRemoteStorage>, RemoteIndex, &SyncQueue),
|
||||
max_sync_errors: NonZeroU32,
|
||||
) where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) {
|
||||
info!("Starting remote storage sync loop");
|
||||
loop {
|
||||
let loop_storage = Arc::clone(&storage);
|
||||
@@ -983,18 +960,14 @@ enum UploadStatus {
|
||||
Nothing,
|
||||
}
|
||||
|
||||
async fn process_batches<P, S>(
|
||||
async fn process_batches(
|
||||
conf: &'static PageServerConf,
|
||||
max_sync_errors: NonZeroU32,
|
||||
storage: Arc<S>,
|
||||
storage: Arc<GenericRemoteStorage>,
|
||||
index: &RemoteIndex,
|
||||
batched_tasks: HashMap<ZTenantTimelineId, SyncTaskBatch>,
|
||||
sync_queue: &SyncQueue,
|
||||
) -> HashSet<ZTenantId>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> HashSet<ZTenantId> {
|
||||
let mut sync_results = batched_tasks
|
||||
.into_iter()
|
||||
.map(|(sync_id, batch)| {
|
||||
@@ -1030,17 +1003,13 @@ where
|
||||
downloaded_timelines
|
||||
}
|
||||
|
||||
async fn process_sync_task_batch<P, S>(
|
||||
async fn process_sync_task_batch(
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue): (Arc<S>, RemoteIndex, &SyncQueue),
|
||||
(storage, index, sync_queue): (Arc<GenericRemoteStorage>, RemoteIndex, &SyncQueue),
|
||||
max_sync_errors: NonZeroU32,
|
||||
sync_id: ZTenantTimelineId,
|
||||
batch: SyncTaskBatch,
|
||||
) -> DownloadStatus
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> DownloadStatus {
|
||||
let sync_start = Instant::now();
|
||||
let current_remote_timeline = { index.read().await.timeline_entry(&sync_id).cloned() };
|
||||
|
||||
@@ -1175,19 +1144,15 @@ where
|
||||
download_status
|
||||
}
|
||||
|
||||
async fn download_timeline_data<P, S>(
|
||||
async fn download_timeline_data(
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue),
|
||||
(storage, index, sync_queue): (&GenericRemoteStorage, &RemoteIndex, &SyncQueue),
|
||||
current_remote_timeline: Option<&RemoteTimeline>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
new_download_data: SyncData<LayersDownload>,
|
||||
sync_start: Instant,
|
||||
task_name: &str,
|
||||
) -> DownloadStatus
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> DownloadStatus {
|
||||
match download_timeline_layers(
|
||||
conf,
|
||||
storage,
|
||||
@@ -1298,17 +1263,14 @@ async fn update_local_metadata(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn delete_timeline_data<P, S>(
|
||||
async fn delete_timeline_data(
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue),
|
||||
(storage, index, sync_queue): (&GenericRemoteStorage, &RemoteIndex, &SyncQueue),
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut new_delete_data: SyncData<LayersDeletion>,
|
||||
sync_start: Instant,
|
||||
task_name: &str,
|
||||
) where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) {
|
||||
let timeline_delete = &mut new_delete_data.data;
|
||||
|
||||
if !timeline_delete.deletion_registered {
|
||||
@@ -1343,19 +1305,15 @@ async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result<TimelineMeta
|
||||
.context("Failed to parse metadata bytes")
|
||||
}
|
||||
|
||||
async fn upload_timeline_data<P, S>(
|
||||
async fn upload_timeline_data(
|
||||
conf: &'static PageServerConf,
|
||||
(storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue),
|
||||
(storage, index, sync_queue): (&GenericRemoteStorage, &RemoteIndex, &SyncQueue),
|
||||
current_remote_timeline: Option<&RemoteTimeline>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
new_upload_data: SyncData<LayersUpload>,
|
||||
sync_start: Instant,
|
||||
task_name: &str,
|
||||
) -> UploadStatus
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> UploadStatus {
|
||||
let mut uploaded_data = match upload_timeline_layers(
|
||||
storage,
|
||||
sync_queue,
|
||||
@@ -1406,17 +1364,13 @@ enum RemoteDataUpdate<'a> {
|
||||
Delete(&'a HashSet<PathBuf>),
|
||||
}
|
||||
|
||||
async fn update_remote_data<P, S>(
|
||||
async fn update_remote_data(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
storage: &GenericRemoteStorage,
|
||||
index: &RemoteIndex,
|
||||
sync_id: ZTenantTimelineId,
|
||||
update: RemoteDataUpdate<'_>,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> anyhow::Result<()> {
|
||||
let updated_remote_timeline = {
|
||||
let mut index_accessor = index.write().await;
|
||||
|
||||
|
||||
@@ -1,27 +1,25 @@
|
||||
//! Timeline synchronization logic to delete a bulk of timeline's remote files from the remote storage.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::storage_sync::{SyncQueue, SyncTask};
|
||||
use remote_storage::RemoteStorage;
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorage};
|
||||
use utils::zid::ZTenantTimelineId;
|
||||
|
||||
use super::{LayersDeletion, SyncData};
|
||||
|
||||
/// Attempts to remove the timleline layers from the remote storage.
|
||||
/// If the task had not adjusted the metadata before, the deletion will fail.
|
||||
pub(super) async fn delete_timeline_layers<'a, P, S>(
|
||||
storage: &'a S,
|
||||
pub(super) async fn delete_timeline_layers<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
sync_queue: &SyncQueue,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut delete_data: SyncData<LayersDeletion>,
|
||||
) -> bool
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> bool {
|
||||
if !delete_data.data.deletion_registered {
|
||||
error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing");
|
||||
delete_data.retries += 1;
|
||||
@@ -45,25 +43,14 @@ where
|
||||
let mut delete_tasks = layers_to_delete
|
||||
.into_iter()
|
||||
.map(|local_layer_path| async {
|
||||
let storage_path =
|
||||
match storage
|
||||
.remote_object_id(&local_layer_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get the layer storage path for local path '{}'",
|
||||
local_layer_path.display()
|
||||
)
|
||||
}) {
|
||||
Ok(path) => path,
|
||||
Err(e) => return Err((e, local_layer_path)),
|
||||
};
|
||||
|
||||
match storage.delete(&storage_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to delete remote layer from storage at '{:?}'",
|
||||
storage_path
|
||||
)
|
||||
}) {
|
||||
match match storage {
|
||||
GenericRemoteStorage::Local(storage) => {
|
||||
remove_storage_object(storage, &local_layer_path).await
|
||||
}
|
||||
GenericRemoteStorage::S3(storage) => {
|
||||
remove_storage_object(storage, &local_layer_path).await
|
||||
}
|
||||
} {
|
||||
Ok(()) => Ok(local_layer_path),
|
||||
Err(e) => Err((e, local_layer_path)),
|
||||
}
|
||||
@@ -101,6 +88,28 @@ where
|
||||
errored
|
||||
}
|
||||
|
||||
async fn remove_storage_object<P, S>(storage: &S, local_layer_path: &Path) -> anyhow::Result<()>
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let storage_path = storage
|
||||
.remote_object_id(local_layer_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get the layer storage path for local path '{}'",
|
||||
local_layer_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
storage.delete(&storage_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to delete remote layer from storage at '{:?}'",
|
||||
storage_path
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashSet, num::NonZeroUsize};
|
||||
@@ -114,7 +123,7 @@ mod tests {
|
||||
layered_repository::repo_harness::{RepoHarness, TIMELINE_ID},
|
||||
storage_sync::test_utils::{create_local_timeline, dummy_metadata},
|
||||
};
|
||||
use remote_storage::LocalFs;
|
||||
use remote_storage::{LocalFs, RemoteStorage};
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -123,10 +132,10 @@ mod tests {
|
||||
let harness = RepoHarness::create("delete_timeline_negative")?;
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?;
|
||||
)?);
|
||||
|
||||
let deleted = delete_timeline_layers(
|
||||
&storage,
|
||||
@@ -158,17 +167,20 @@ mod tests {
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "c", "d"];
|
||||
let storage = LocalFs::new(
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?;
|
||||
)?);
|
||||
|
||||
let local_storage = storage.as_local().unwrap();
|
||||
|
||||
let current_retries = 3;
|
||||
let metadata = dummy_metadata(Lsn(0x30));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
let timeline_upload =
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
|
||||
for local_path in timeline_upload.layers_to_upload {
|
||||
let remote_path = storage.remote_object_id(&local_path)?;
|
||||
let remote_path = local_storage.remote_object_id(&local_path)?;
|
||||
let remote_parent_dir = remote_path.parent().unwrap();
|
||||
if !remote_parent_dir.exists() {
|
||||
fs::create_dir_all(&remote_parent_dir).await?;
|
||||
@@ -176,11 +188,11 @@ mod tests {
|
||||
fs::copy(&local_path, &remote_path).await?;
|
||||
}
|
||||
assert_eq!(
|
||||
storage
|
||||
local_storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.map(|remote_path| local_storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
@@ -213,11 +225,11 @@ mod tests {
|
||||
assert!(deleted, "Should be able to delete timeline files");
|
||||
|
||||
assert_eq!(
|
||||
storage
|
||||
local_storage
|
||||
.list()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|remote_path| storage.local_path(&remote_path).unwrap())
|
||||
.map(|remote_path| local_storage.local_path(&remote_path).unwrap())
|
||||
.filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) })
|
||||
.sorted()
|
||||
.collect::<Vec<_>>(),
|
||||
|
||||
@@ -9,7 +9,9 @@ use std::{
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use remote_storage::{path_with_suffix_extension, DownloadError, RemoteObjectName, RemoteStorage};
|
||||
use remote_storage::{
|
||||
path_with_suffix_extension, Download, DownloadError, GenericRemoteStorage, RemoteStorage,
|
||||
};
|
||||
use tokio::{
|
||||
fs,
|
||||
io::{self, AsyncWriteExt},
|
||||
@@ -62,15 +64,11 @@ impl Default for TenantIndexParts {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn download_index_parts<P, S>(
|
||||
pub async fn download_index_parts(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
storage: &GenericRemoteStorage,
|
||||
keys: HashSet<ZTenantTimelineId>,
|
||||
) -> HashMap<ZTenantId, TenantIndexParts>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> HashMap<ZTenantId, TenantIndexParts> {
|
||||
let mut index_parts: HashMap<ZTenantId, TenantIndexParts> = HashMap::new();
|
||||
|
||||
let mut part_downloads = keys
|
||||
@@ -114,60 +112,17 @@ where
|
||||
/// Note: The function is rather expensive from s3 access point of view, it will execute ceil(N/1000) + N requests.
|
||||
/// At least one request to obtain a list of tenant timelines (more requests is there are more than 1000 timelines).
|
||||
/// And then will attempt to download all index files that belong to these timelines.
|
||||
pub async fn gather_tenant_timelines_index_parts<P, S>(
|
||||
pub async fn gather_tenant_timelines_index_parts(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: ZTenantId,
|
||||
) -> anyhow::Result<HashMap<ZTimelineId, IndexPart>>
|
||||
where
|
||||
P: RemoteObjectName + Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> anyhow::Result<HashMap<ZTimelineId, IndexPart>> {
|
||||
let tenant_path = conf.timelines_path(&tenant_id);
|
||||
let tenant_storage_path = storage.remote_object_id(&tenant_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to get tenant storage path for local path '{}'",
|
||||
tenant_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
let timelines = storage
|
||||
.list_prefixes(Some(tenant_storage_path))
|
||||
let timeline_sync_ids = get_timeline_sync_ids(storage, &tenant_path, tenant_id)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to list tenant storage path to get remote timelines to download: {}",
|
||||
tenant_id
|
||||
)
|
||||
})?;
|
||||
.with_context(|| format!("Failed to list timeline sync ids for tenat {tenant_id}"))?;
|
||||
|
||||
if timelines.is_empty() {
|
||||
anyhow::bail!(
|
||||
"no timelines found on the remote storage for tenant {}",
|
||||
tenant_id
|
||||
)
|
||||
}
|
||||
|
||||
let mut sync_ids = HashSet::new();
|
||||
|
||||
for timeline_remote_storage_key in timelines {
|
||||
let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| {
|
||||
anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}")
|
||||
})?;
|
||||
|
||||
let timeline_id: ZTimelineId = object_name
|
||||
.parse()
|
||||
.with_context(|| {
|
||||
format!("failed to parse object name into timeline id for tenant {tenant_id} '{object_name}'")
|
||||
})?;
|
||||
|
||||
sync_ids.insert(ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
});
|
||||
}
|
||||
|
||||
match download_index_parts(conf, storage, sync_ids)
|
||||
match download_index_parts(conf, storage, timeline_sync_ids)
|
||||
.await
|
||||
.remove(&tenant_id)
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing tenant index parts. This is a bug."))?
|
||||
@@ -180,29 +135,15 @@ where
|
||||
}
|
||||
|
||||
/// Retrieves index data from the remote storage for a given timeline.
|
||||
async fn download_index_part<P, S>(
|
||||
async fn download_index_part(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
storage: &GenericRemoteStorage,
|
||||
sync_id: ZTenantTimelineId,
|
||||
) -> Result<IndexPart, DownloadError>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> Result<IndexPart, DownloadError> {
|
||||
let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
|
||||
.with_file_name(IndexPart::FILE_NAME)
|
||||
.with_extension(IndexPart::FILE_EXTENSION);
|
||||
let part_storage_path = storage
|
||||
.remote_object_id(&index_part_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get the index part storage path for local path '{}'",
|
||||
index_part_path.display()
|
||||
)
|
||||
})
|
||||
.map_err(DownloadError::BadInput)?;
|
||||
|
||||
let mut index_part_download = storage.download(&part_storage_path).await?;
|
||||
let mut index_part_download = download_storage_object(storage, &index_part_path).await?;
|
||||
|
||||
let mut index_part_bytes = Vec::new();
|
||||
io::copy(
|
||||
@@ -211,14 +152,18 @@ where
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to download an index part from storage path {part_storage_path:?}")
|
||||
format!(
|
||||
"Failed to download an index part into file '{}'",
|
||||
index_part_path.display()
|
||||
)
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
|
||||
let index_part: IndexPart = serde_json::from_slice(&index_part_bytes)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize index part file from storage path '{part_storage_path:?}'"
|
||||
"Failed to deserialize index part file into file '{}'",
|
||||
index_part_path.display()
|
||||
)
|
||||
})
|
||||
.map_err(DownloadError::Other)?;
|
||||
@@ -249,18 +194,14 @@ pub(super) enum DownloadedTimeline {
|
||||
/// updated in the end, if the remote one contains a newer disk_consistent_lsn.
|
||||
///
|
||||
/// On an error, bumps the retries count and updates the files to skip with successful downloads, rescheduling the task.
|
||||
pub(super) async fn download_timeline_layers<'a, P, S>(
|
||||
pub(super) async fn download_timeline_layers<'a>(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &'a S,
|
||||
storage: &'a GenericRemoteStorage,
|
||||
sync_queue: &'a SyncQueue,
|
||||
remote_timeline: Option<&'a RemoteTimeline>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut download_data: SyncData<LayersDownload>,
|
||||
) -> DownloadedTimeline
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> DownloadedTimeline {
|
||||
let remote_timeline = match remote_timeline {
|
||||
Some(remote_timeline) => {
|
||||
if !remote_timeline.awaits_download {
|
||||
@@ -300,15 +241,6 @@ where
|
||||
layer_desination_path.display()
|
||||
);
|
||||
} else {
|
||||
let layer_storage_path = storage
|
||||
.remote_object_id(&layer_desination_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get the layer storage path for local path '{}'",
|
||||
layer_desination_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
// Perform a rename inspired by durable_rename from file_utils.c.
|
||||
// The sequence:
|
||||
// write(tmp)
|
||||
@@ -329,19 +261,23 @@ where
|
||||
temp_file_path.display()
|
||||
)
|
||||
})?;
|
||||
let mut download = storage
|
||||
.download(&layer_storage_path)
|
||||
|
||||
let mut layer_download = download_storage_object(storage, &layer_desination_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'"
|
||||
"Failed to initiate the download the layer for {sync_id} into file '{}'",
|
||||
temp_file_path.display()
|
||||
)
|
||||
})?;
|
||||
io::copy(&mut layer_download.download_stream, &mut destination_file)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to download the layer for {sync_id} into file '{}'",
|
||||
temp_file_path.display()
|
||||
)
|
||||
})?;
|
||||
io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
// Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that:
|
||||
// A file will not be closed immediately when it goes out of scope if there are any IO operations
|
||||
@@ -429,6 +365,121 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn download_storage_object(
|
||||
storage: &GenericRemoteStorage,
|
||||
to_path: &Path,
|
||||
) -> Result<Download, DownloadError> {
|
||||
async fn do_download_storage_object<P, S>(
|
||||
storage: &S,
|
||||
to_path: &Path,
|
||||
) -> Result<Download, DownloadError>
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let remote_object_path = storage
|
||||
.remote_object_id(to_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get the storage path for target local path '{}'",
|
||||
to_path.display()
|
||||
)
|
||||
})
|
||||
.map_err(DownloadError::BadInput)?;
|
||||
|
||||
storage.download(&remote_object_path).await
|
||||
}
|
||||
|
||||
match storage {
|
||||
GenericRemoteStorage::Local(storage) => do_download_storage_object(storage, to_path).await,
|
||||
GenericRemoteStorage::S3(storage) => do_download_storage_object(storage, to_path).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_timeline_sync_ids(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_path: &Path,
|
||||
tenant_id: ZTenantId,
|
||||
) -> anyhow::Result<HashSet<ZTenantTimelineId>> {
|
||||
let timeline_ids: Vec<ZTimelineId> = match storage {
|
||||
GenericRemoteStorage::Local(storage) => list_prefixes(storage, tenant_path)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|timeline_directory_path| {
|
||||
timeline_directory_path
|
||||
.file_stem()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get timeline id string from file '{}'",
|
||||
timeline_directory_path.display()
|
||||
)
|
||||
})?
|
||||
.to_string_lossy()
|
||||
.as_ref()
|
||||
.parse()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to parse directory name '{}' as timeline id",
|
||||
timeline_directory_path.display()
|
||||
)
|
||||
})
|
||||
})
|
||||
.collect::<anyhow::Result<_>>(),
|
||||
GenericRemoteStorage::S3(storage) => list_prefixes(storage, tenant_path)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|s3_path| {
|
||||
s3_path
|
||||
.object_name()
|
||||
.with_context(|| {
|
||||
format!("Failed to get object name out of S3 path {s3_path:?}")
|
||||
})?
|
||||
.parse()
|
||||
.with_context(|| {
|
||||
format!("failed to parse object name '{s3_path:?}' as timeline id")
|
||||
})
|
||||
})
|
||||
.collect::<anyhow::Result<_>>(),
|
||||
}
|
||||
.with_context(|| {
|
||||
format!("Tenant {tenant_id} has at least one incorrect timeline subdirectory")
|
||||
})?;
|
||||
|
||||
if timeline_ids.is_empty() {
|
||||
anyhow::bail!("no timelines found on the remote storage for tenant {tenant_id}")
|
||||
}
|
||||
|
||||
Ok(timeline_ids
|
||||
.into_iter()
|
||||
.map(|timeline_id| ZTenantTimelineId {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn list_prefixes<P, S>(storage: &S, tenant_path: &Path) -> anyhow::Result<Vec<P>>
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let tenant_storage_path = storage.remote_object_id(tenant_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to get tenant storage path for local path '{}'",
|
||||
tenant_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
storage
|
||||
.list_prefixes(Some(&tenant_storage_path))
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to list tenant storage path {tenant_storage_path:?} to get remote timelines to download"
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
async fn fsync_path(path: impl AsRef<Path>) -> Result<(), io::Error> {
|
||||
fs::File::open(path).await?.sync_all().await
|
||||
}
|
||||
@@ -461,10 +512,11 @@ mod tests {
|
||||
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"];
|
||||
let storage = LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?;
|
||||
)?);
|
||||
let local_storage = storage.as_local().unwrap();
|
||||
let current_retries = 3;
|
||||
let metadata = dummy_metadata(Lsn(0x30));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
@@ -472,7 +524,7 @@ mod tests {
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?;
|
||||
|
||||
for local_path in timeline_upload.layers_to_upload {
|
||||
let remote_path = storage.remote_object_id(&local_path)?;
|
||||
let remote_path = local_storage.remote_object_id(&local_path)?;
|
||||
let remote_parent_dir = remote_path.parent().unwrap();
|
||||
if !remote_parent_dir.exists() {
|
||||
fs::create_dir_all(&remote_parent_dir).await?;
|
||||
@@ -558,7 +610,10 @@ mod tests {
|
||||
let harness = RepoHarness::create("download_timeline_negatives")?;
|
||||
let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap());
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
|
||||
let empty_remote_timeline_download = download_timeline_layers(
|
||||
harness.conf,
|
||||
@@ -614,10 +669,11 @@ mod tests {
|
||||
let harness = RepoHarness::create("test_download_index_part")?;
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let storage = LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?;
|
||||
)?);
|
||||
let local_storage = storage.as_local().unwrap();
|
||||
let metadata = dummy_metadata(Lsn(0x30));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
@@ -638,7 +694,7 @@ mod tests {
|
||||
metadata_path(harness.conf, sync_id.timeline_id, sync_id.tenant_id)
|
||||
.with_file_name(IndexPart::FILE_NAME)
|
||||
.with_extension(IndexPart::FILE_EXTENSION);
|
||||
let storage_path = storage.remote_object_id(&local_index_part_path)?;
|
||||
let storage_path = local_storage.remote_object_id(&local_index_part_path)?;
|
||||
fs::create_dir_all(storage_path.parent().unwrap()).await?;
|
||||
fs::write(&storage_path, serde_json::to_vec(&index_part)?).await?;
|
||||
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
//! Timeline synchronization logic to compress and upload to the remote storage all new timeline files from the checkpoints.
|
||||
|
||||
use std::{fmt::Debug, path::PathBuf};
|
||||
use std::{
|
||||
fmt::Debug,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use futures::stream::{FuturesUnordered, StreamExt};
|
||||
use once_cell::sync::Lazy;
|
||||
use remote_storage::RemoteStorage;
|
||||
use remote_storage::{GenericRemoteStorage, RemoteStorage};
|
||||
use tokio::fs;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
@@ -30,16 +33,12 @@ static NO_LAYERS_UPLOAD: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
});
|
||||
|
||||
/// Serializes and uploads the given index part data to the remote storage.
|
||||
pub(super) async fn upload_index_part<P, S>(
|
||||
pub(super) async fn upload_index_part(
|
||||
conf: &'static PageServerConf,
|
||||
storage: &S,
|
||||
storage: &GenericRemoteStorage,
|
||||
sync_id: ZTenantTimelineId,
|
||||
index_part: IndexPart,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> anyhow::Result<()> {
|
||||
let index_part_bytes = serde_json::to_vec(&index_part)
|
||||
.context("Failed to serialize index part file into bytes")?;
|
||||
let index_part_size = index_part_bytes.len();
|
||||
@@ -48,27 +47,9 @@ where
|
||||
let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id)
|
||||
.with_file_name(IndexPart::FILE_NAME)
|
||||
.with_extension(IndexPart::FILE_EXTENSION);
|
||||
let index_part_storage_path =
|
||||
storage
|
||||
.remote_object_id(&index_part_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get the index part storage path for local path '{}'",
|
||||
index_part_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
storage
|
||||
.upload(
|
||||
index_part_bytes,
|
||||
index_part_size,
|
||||
&index_part_storage_path,
|
||||
None,
|
||||
)
|
||||
upload_storage_object(storage, index_part_bytes, index_part_size, &index_part_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to upload index part to the storage path '{index_part_storage_path:?}'")
|
||||
})
|
||||
.with_context(|| format!("Failed to upload index part for '{sync_id}'"))
|
||||
}
|
||||
|
||||
/// Timeline upload result, with extra data, needed for uploading.
|
||||
@@ -84,17 +65,13 @@ pub(super) enum UploadedTimeline {
|
||||
/// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload.
|
||||
///
|
||||
/// On an error, bumps the retries count and reschedules the entire task.
|
||||
pub(super) async fn upload_timeline_layers<'a, P, S>(
|
||||
storage: &'a S,
|
||||
pub(super) async fn upload_timeline_layers<'a>(
|
||||
storage: &'a GenericRemoteStorage,
|
||||
sync_queue: &SyncQueue,
|
||||
remote_timeline: Option<&'a RemoteTimeline>,
|
||||
sync_id: ZTenantTimelineId,
|
||||
mut upload_data: SyncData<LayersUpload>,
|
||||
) -> UploadedTimeline
|
||||
where
|
||||
P: Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
) -> UploadedTimeline {
|
||||
let upload = &mut upload_data.data;
|
||||
let new_upload_lsn = upload
|
||||
.metadata
|
||||
@@ -132,16 +109,6 @@ where
|
||||
let mut upload_tasks = layers_to_upload
|
||||
.into_iter()
|
||||
.map(|source_path| async move {
|
||||
let storage_path = storage
|
||||
.remote_object_id(&source_path)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to get the layer storage path for local path '{}'",
|
||||
source_path.display()
|
||||
)
|
||||
})
|
||||
.map_err(UploadError::Other)?;
|
||||
|
||||
let source_file = match fs::File::open(&source_path).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to upen a source file for layer '{}'",
|
||||
@@ -164,15 +131,10 @@ where
|
||||
.map_err(UploadError::Other)?
|
||||
.len() as usize;
|
||||
|
||||
match storage
|
||||
.upload(source_file, source_size, &storage_path, None)
|
||||
match upload_storage_object(storage, source_file, source_size, &source_path)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to upload a layer from local path '{}'",
|
||||
source_path.display()
|
||||
)
|
||||
}) {
|
||||
.with_context(|| format!("Failed to upload layer file for {sync_id}"))
|
||||
{
|
||||
Ok(()) => Ok(source_path),
|
||||
Err(e) => Err(UploadError::MissingLocalFile(source_path, e)),
|
||||
}
|
||||
@@ -231,6 +193,51 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
async fn upload_storage_object(
|
||||
storage: &GenericRemoteStorage,
|
||||
from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
from_size_bytes: usize,
|
||||
from_path: &Path,
|
||||
) -> anyhow::Result<()> {
|
||||
async fn do_upload_storage_object<P, S>(
|
||||
storage: &S,
|
||||
from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
|
||||
from_size_bytes: usize,
|
||||
from_path: &Path,
|
||||
) -> anyhow::Result<()>
|
||||
where
|
||||
P: std::fmt::Debug + Send + Sync + 'static,
|
||||
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
|
||||
{
|
||||
let target_storage_path = storage.remote_object_id(from_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to get the storage path for source local path '{}'",
|
||||
from_path.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
storage
|
||||
.upload(from, from_size_bytes, &target_storage_path, None)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to upload from '{}' to storage path '{:?}'",
|
||||
from_path.display(),
|
||||
target_storage_path
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
match storage {
|
||||
GenericRemoteStorage::Local(storage) => {
|
||||
do_upload_storage_object(storage, from, from_size_bytes, from_path).await
|
||||
}
|
||||
GenericRemoteStorage::S3(storage) => {
|
||||
do_upload_storage_object(storage, from, from_size_bytes, from_path).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum UploadError {
|
||||
MissingLocalFile(PathBuf, anyhow::Error),
|
||||
Other(anyhow::Error),
|
||||
@@ -243,7 +250,7 @@ mod tests {
|
||||
num::NonZeroUsize,
|
||||
};
|
||||
|
||||
use remote_storage::LocalFs;
|
||||
use remote_storage::{LocalFs, RemoteStorage};
|
||||
use tempfile::tempdir;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
@@ -264,10 +271,11 @@ mod tests {
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let layer_files = ["a", "b"];
|
||||
let storage = LocalFs::new(
|
||||
tempdir()?.path().to_path_buf(),
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?;
|
||||
)?);
|
||||
let local_storage = storage.as_local().unwrap();
|
||||
let current_retries = 3;
|
||||
let metadata = dummy_metadata(Lsn(0x30));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
@@ -276,7 +284,7 @@ mod tests {
|
||||
timeline_upload.metadata = None;
|
||||
|
||||
assert!(
|
||||
storage.list().await?.is_empty(),
|
||||
local_storage.list().await?.is_empty(),
|
||||
"Storage should be empty before any uploads are made"
|
||||
);
|
||||
|
||||
@@ -322,7 +330,7 @@ mod tests {
|
||||
"Successful upload without metadata should not have it returned either"
|
||||
);
|
||||
|
||||
let storage_files = storage.list().await?;
|
||||
let storage_files = local_storage.list().await?;
|
||||
assert_eq!(
|
||||
storage_files.len(),
|
||||
layer_files.len(),
|
||||
@@ -331,7 +339,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
storage_files
|
||||
.into_iter()
|
||||
.map(|storage_path| storage.local_path(&storage_path))
|
||||
.map(|storage_path| local_storage.local_path(&storage_path))
|
||||
.collect::<anyhow::Result<BTreeSet<_>>>()?,
|
||||
layer_files
|
||||
.into_iter()
|
||||
@@ -351,7 +359,11 @@ mod tests {
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let layer_files = ["a1", "b1"];
|
||||
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
let local_storage = storage.as_local().unwrap();
|
||||
let current_retries = 5;
|
||||
let metadata = dummy_metadata(Lsn(0x40));
|
||||
|
||||
@@ -365,7 +377,7 @@ mod tests {
|
||||
create_local_timeline(&harness, TIMELINE_ID, &layers_to_upload, metadata.clone())
|
||||
.await?;
|
||||
assert!(
|
||||
storage.list().await?.is_empty(),
|
||||
local_storage.list().await?.is_empty(),
|
||||
"Storage should be empty before any uploads are made"
|
||||
);
|
||||
|
||||
@@ -414,7 +426,7 @@ mod tests {
|
||||
"Successful upload should not change its metadata"
|
||||
);
|
||||
|
||||
let storage_files = storage.list().await?;
|
||||
let storage_files = local_storage.list().await?;
|
||||
assert_eq!(
|
||||
storage_files.len(),
|
||||
layer_files.len(),
|
||||
@@ -423,7 +435,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
storage_files
|
||||
.into_iter()
|
||||
.map(|storage_path| storage.local_path(&storage_path))
|
||||
.map(|storage_path| local_storage.local_path(&storage_path))
|
||||
.collect::<anyhow::Result<BTreeSet<_>>>()?,
|
||||
layer_files
|
||||
.into_iter()
|
||||
@@ -440,7 +452,11 @@ mod tests {
|
||||
let harness = RepoHarness::create("test_upload_index_part")?;
|
||||
let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID);
|
||||
|
||||
let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?;
|
||||
let storage = GenericRemoteStorage::Local(LocalFs::new(
|
||||
tempdir()?.path().to_owned(),
|
||||
harness.conf.workdir.clone(),
|
||||
)?);
|
||||
let local_storage = storage.as_local().unwrap();
|
||||
let metadata = dummy_metadata(Lsn(0x40));
|
||||
let local_timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
@@ -458,12 +474,12 @@ mod tests {
|
||||
);
|
||||
|
||||
assert!(
|
||||
storage.list().await?.is_empty(),
|
||||
local_storage.list().await?.is_empty(),
|
||||
"Storage should be empty before any uploads are made"
|
||||
);
|
||||
upload_index_part(harness.conf, &storage, sync_id, index_part.clone()).await?;
|
||||
|
||||
let storage_files = storage.list().await?;
|
||||
let storage_files = local_storage.list().await?;
|
||||
assert_eq!(
|
||||
storage_files.len(),
|
||||
1,
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::thread_mgr::ThreadKind;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::{thread_mgr, timelines, walreceiver};
|
||||
use anyhow::Context;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
@@ -131,7 +132,10 @@ impl fmt::Display for TenantState {
|
||||
/// Initialize repositories with locally available timelines.
|
||||
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
|
||||
/// are scheduled for download and added to the repository once download is completed.
|
||||
pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIndex> {
|
||||
pub fn init_tenant_mgr(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: Option<Arc<GenericRemoteStorage>>,
|
||||
) -> anyhow::Result<RemoteIndex> {
|
||||
let (timeline_updates_sender, timeline_updates_receiver) =
|
||||
mpsc::unbounded_channel::<LocalTimelineUpdate>();
|
||||
tenants_state::set_timeline_update_sender(timeline_updates_sender)?;
|
||||
@@ -140,7 +144,7 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
|
||||
let SyncStartupData {
|
||||
remote_index,
|
||||
local_timeline_init_statuses,
|
||||
} = storage_sync::start_local_timeline_sync(conf)
|
||||
} = storage_sync::start_local_timeline_sync(conf, remote_storage)
|
||||
.context("Failed to set up local files sync with external storage")?;
|
||||
|
||||
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
|
||||
|
||||
Reference in New Issue
Block a user