diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 07f8cb08aa..d5ad2f8633 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -42,19 +42,13 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/ pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; -pub trait RemoteObjectName { - // Needed to retrieve last component for RemoteObjectId. - // In other words a file name - fn object_name(&self) -> Option<&str>; -} - /// Storage (potentially remote) API to manage its state. /// This storage tries to be unaware of any layered repository context, /// providing basic CRUD operations for storage files. #[async_trait::async_trait] pub trait RemoteStorage: Send + Sync { /// A way to uniquely reference a file in the remote storage. - type RemoteObjectId: RemoteObjectName; + type RemoteObjectId; /// Attempts to derive the storage path out of the local path, if the latter is correct. fn remote_object_id(&self, local_path: &Path) -> anyhow::Result; @@ -71,7 +65,7 @@ pub trait RemoteStorage: Send + Sync { /// so this method doesnt need to. async fn list_prefixes( &self, - prefix: Option, + prefix: Option<&Self::RemoteObjectId>, ) -> anyhow::Result>; /// Streams the local file contents into remote into the remote storage entry. @@ -163,6 +157,13 @@ impl GenericRemoteStorage { } } } + + pub fn as_local(&self) -> Option<&LocalFs> { + match self { + Self::Local(local_fs) => Some(local_fs), + _ => None, + } + } } /// Extra set of key-value pairs that contain arbitrary metadata about the storage entry. diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index a65d0887af..ddf6c01759 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -5,7 +5,6 @@ //! volume is mounted to the local FS. use std::{ - borrow::Cow, future::Future, path::{Path, PathBuf}, pin::Pin, @@ -18,16 +17,10 @@ use tokio::{ }; use tracing::*; -use crate::{path_with_suffix_extension, Download, DownloadError, RemoteObjectName}; +use crate::{path_with_suffix_extension, Download, DownloadError}; use super::{strip_path_prefix, RemoteStorage, StorageMetadata}; -impl RemoteObjectName for PathBuf { - fn object_name(&self) -> Option<&str> { - self.file_stem().and_then(|n| n.to_str()) - } -} - pub struct LocalFs { working_directory: PathBuf, storage_root: PathBuf, @@ -113,13 +106,10 @@ impl RemoteStorage for LocalFs { async fn list_prefixes( &self, - prefix: Option, + prefix: Option<&Self::RemoteObjectId>, ) -> anyhow::Result> { - let path = match prefix { - Some(prefix) => Cow::Owned(prefix), - None => Cow::Borrowed(&self.storage_root), - }; - get_all_files(path.as_ref(), false).await + let path = prefix.unwrap_or(&self.storage_root); + get_all_files(path, false).await } async fn upload( diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 1b241fe4ed..db31200c36 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -19,9 +19,7 @@ use tokio::{io, sync::Semaphore}; use tokio_util::io::ReaderStream; use tracing::debug; -use crate::{ - strip_path_prefix, Download, DownloadError, RemoteObjectName, RemoteStorage, S3Config, -}; +use crate::{strip_path_prefix, Download, DownloadError, RemoteStorage, S3Config}; use super::StorageMetadata; @@ -96,6 +94,23 @@ const S3_PREFIX_SEPARATOR: char = '/'; pub struct S3ObjectKey(String); impl S3ObjectKey { + /// Turn a/b/c or a/b/c/ into c + pub fn object_name(&self) -> Option<&str> { + // corner case, char::to_string is not const, thats why this is more verbose than it needs to be + // see https://github.com/rust-lang/rust/issues/88674 + if self.0.len() == 1 && self.0.chars().next().unwrap() == S3_PREFIX_SEPARATOR { + return None; + } + + if self.0.ends_with(S3_PREFIX_SEPARATOR) { + self.0.rsplit(S3_PREFIX_SEPARATOR).nth(1) + } else { + self.0 + .rsplit_once(S3_PREFIX_SEPARATOR) + .map(|(_, last)| last) + } + } + fn key(&self) -> &str { &self.0 } @@ -119,25 +134,6 @@ impl S3ObjectKey { } } -impl RemoteObjectName for S3ObjectKey { - /// Turn a/b/c or a/b/c/ into c - fn object_name(&self) -> Option<&str> { - // corner case, char::to_string is not const, thats why this is more verbose than it needs to be - // see https://github.com/rust-lang/rust/issues/88674 - if self.0.len() == 1 && self.0.chars().next().unwrap() == S3_PREFIX_SEPARATOR { - return None; - } - - if self.0.ends_with(S3_PREFIX_SEPARATOR) { - self.0.rsplit(S3_PREFIX_SEPARATOR).nth(1) - } else { - self.0 - .rsplit_once(S3_PREFIX_SEPARATOR) - .map(|(_, last)| last) - } - } -} - /// AWS S3 storage. pub struct S3Bucket { workdir: PathBuf, @@ -316,11 +312,11 @@ impl RemoteStorage for S3Bucket { /// Note: it wont include empty "directories" async fn list_prefixes( &self, - prefix: Option, + prefix: Option<&Self::RemoteObjectId>, ) -> anyhow::Result> { // get the passed prefix or if it is not set use prefix_in_bucket value let list_prefix = prefix - .map(|p| p.0) + .map(|p| p.0.clone()) .or_else(|| self.prefix_in_bucket.clone()) .map(|mut p| { // required to end with a separator diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 1a13147f42..7a33a548e7 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -1,6 +1,7 @@ //! Main entry point for the Page Server executable. -use std::{env, ops::ControlFlow, path::Path, str::FromStr}; +use remote_storage::GenericRemoteStorage; +use std::{env, ops::ControlFlow, path::Path, str::FromStr, sync::Arc}; use tracing::*; use anyhow::{bail, Context, Result}; @@ -298,7 +299,14 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() }; info!("Using auth: {:#?}", conf.auth_type); - let remote_index = tenant_mgr::init_tenant_mgr(conf)?; + let remote_storage = conf + .remote_storage_config + .as_ref() + .map(|storage_config| GenericRemoteStorage::new(conf.workdir.clone(), storage_config)) + .transpose() + .context("Failed to init generic remote storage")? + .map(Arc::new); + let remote_index = tenant_mgr::init_tenant_mgr(conf, remote_storage.as_ref().map(Arc::clone))?; // Spawn a new thread for the http endpoint // bind before launching separate thread so the error reported before startup exits @@ -310,7 +318,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() "http_endpoint_thread", true, move || { - let router = http::make_router(conf, auth_cloned, remote_index)?; + let router = http::make_router(conf, auth_cloned, remote_index, remote_storage)?; endpoint::serve_thread_main(router, http_listener, thread_mgr::shutdown_watcher()) }, )?; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 2bb181dd9a..ef18129504 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -35,7 +35,7 @@ struct State { auth: Option>, remote_index: RemoteIndex, allowlist_routes: Vec, - remote_storage: Option, + remote_storage: Option>, } impl State { @@ -43,20 +43,12 @@ impl State { conf: &'static PageServerConf, auth: Option>, remote_index: RemoteIndex, + remote_storage: Option>, ) -> anyhow::Result { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] .iter() .map(|v| v.parse().unwrap()) .collect::>(); - // Note that this remote storage is created separately from the main one in the sync_loop. - // It's fine since it's stateless and some code duplication saves us from bloating the code around with generics. - let remote_storage = conf - .remote_storage_config - .as_ref() - .map(|storage_config| GenericRemoteStorage::new(conf.workdir.clone(), storage_config)) - .transpose() - .context("Failed to init generic remote storage")?; - Ok(Self { conf, auth, @@ -448,16 +440,8 @@ async fn gather_tenant_timelines_index_parts( tenant_id: ZTenantId, ) -> anyhow::Result>> { let index_parts = match state.remote_storage.as_ref() { - Some(GenericRemoteStorage::Local(local_storage)) => { - storage_sync::gather_tenant_timelines_index_parts(state.conf, local_storage, tenant_id) - .await - } - // FIXME here s3 storage contains its own limits, that are separate from sync storage thread ones - // because it is a different instance. We can move this limit to some global static - // or use one instance everywhere. - Some(GenericRemoteStorage::S3(s3_storage)) => { - storage_sync::gather_tenant_timelines_index_parts(state.conf, s3_storage, tenant_id) - .await + Some(storage) => { + storage_sync::gather_tenant_timelines_index_parts(state.conf, storage, tenant_id).await } None => return Ok(None), } @@ -714,6 +698,7 @@ pub fn make_router( conf: &'static PageServerConf, auth: Option>, remote_index: RemoteIndex, + remote_storage: Option>, ) -> anyhow::Result> { let spec = include_bytes!("openapi_spec.yml"); let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc"); @@ -730,7 +715,8 @@ pub fn make_router( Ok(router .data(Arc::new( - State::new(conf, auth, remote_index).context("Failed to initialize router state")?, + State::new(conf, auth, remote_index, remote_storage) + .context("Failed to initialize router state")?, )) .get("/v1/status", status_handler) .get("/v1/tenant", tenant_list_handler) diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 52d544b28c..a52cde7286 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -156,7 +156,7 @@ use std::{ use anyhow::{anyhow, bail, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use once_cell::sync::{Lazy, OnceCell}; -use remote_storage::{GenericRemoteStorage, RemoteStorage}; +use remote_storage::GenericRemoteStorage; use tokio::{ fs, runtime::Runtime, @@ -253,36 +253,20 @@ pub struct SyncStartupData { /// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states. pub fn start_local_timeline_sync( config: &'static PageServerConf, + storage: Option>, ) -> anyhow::Result { let local_timeline_files = local_tenant_timeline_files(config) .context("Failed to collect local tenant timeline files")?; - match config.remote_storage_config.as_ref() { - Some(storage_config) => { - match GenericRemoteStorage::new(config.workdir.clone(), storage_config) - .context("Failed to init the generic remote storage")? - { - GenericRemoteStorage::Local(local_fs_storage) => { - storage_sync::spawn_storage_sync_thread( - config, - local_timeline_files, - local_fs_storage, - storage_config.max_concurrent_syncs, - storage_config.max_sync_errors, - ) - } - GenericRemoteStorage::S3(s3_bucket_storage) => { - storage_sync::spawn_storage_sync_thread( - config, - local_timeline_files, - s3_bucket_storage, - storage_config.max_concurrent_syncs, - storage_config.max_sync_errors, - ) - } - } - .context("Failed to spawn the storage sync thread") - } + match storage.zip(config.remote_storage_config.as_ref()) { + Some((storage, storage_config)) => storage_sync::spawn_storage_sync_thread( + config, + local_timeline_files, + storage, + storage_config.max_concurrent_syncs, + storage_config.max_sync_errors, + ) + .context("Failed to spawn the storage sync thread"), None => { info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled"); let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new(); @@ -810,17 +794,13 @@ pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) { /// Launch a thread to perform remote storage sync tasks. /// See module docs for loop step description. -pub(super) fn spawn_storage_sync_thread( +pub(super) fn spawn_storage_sync_thread( conf: &'static PageServerConf, local_timeline_files: HashMap)>, - storage: S, + storage: Arc, max_concurrent_timelines_sync: NonZeroUsize, max_sync_errors: NonZeroU32, -) -> anyhow::Result -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> anyhow::Result { let sync_queue = SyncQueue::new(max_concurrent_timelines_sync); SYNC_QUEUE .set(sync_queue) @@ -860,7 +840,7 @@ where storage_sync_loop( runtime, conf, - (Arc::new(storage), remote_index_clone, sync_queue), + (storage, remote_index_clone, sync_queue), max_sync_errors, ); Ok(()) @@ -873,15 +853,12 @@ where }) } -fn storage_sync_loop( +fn storage_sync_loop( runtime: Runtime, conf: &'static PageServerConf, - (storage, index, sync_queue): (Arc, RemoteIndex, &SyncQueue), + (storage, index, sync_queue): (Arc, RemoteIndex, &SyncQueue), max_sync_errors: NonZeroU32, -) where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) { info!("Starting remote storage sync loop"); loop { let loop_storage = Arc::clone(&storage); @@ -983,18 +960,14 @@ enum UploadStatus { Nothing, } -async fn process_batches( +async fn process_batches( conf: &'static PageServerConf, max_sync_errors: NonZeroU32, - storage: Arc, + storage: Arc, index: &RemoteIndex, batched_tasks: HashMap, sync_queue: &SyncQueue, -) -> HashSet -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> HashSet { let mut sync_results = batched_tasks .into_iter() .map(|(sync_id, batch)| { @@ -1030,17 +1003,13 @@ where downloaded_timelines } -async fn process_sync_task_batch( +async fn process_sync_task_batch( conf: &'static PageServerConf, - (storage, index, sync_queue): (Arc, RemoteIndex, &SyncQueue), + (storage, index, sync_queue): (Arc, RemoteIndex, &SyncQueue), max_sync_errors: NonZeroU32, sync_id: ZTenantTimelineId, batch: SyncTaskBatch, -) -> DownloadStatus -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> DownloadStatus { let sync_start = Instant::now(); let current_remote_timeline = { index.read().await.timeline_entry(&sync_id).cloned() }; @@ -1175,19 +1144,15 @@ where download_status } -async fn download_timeline_data( +async fn download_timeline_data( conf: &'static PageServerConf, - (storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue), + (storage, index, sync_queue): (&GenericRemoteStorage, &RemoteIndex, &SyncQueue), current_remote_timeline: Option<&RemoteTimeline>, sync_id: ZTenantTimelineId, new_download_data: SyncData, sync_start: Instant, task_name: &str, -) -> DownloadStatus -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> DownloadStatus { match download_timeline_layers( conf, storage, @@ -1298,17 +1263,14 @@ async fn update_local_metadata( Ok(()) } -async fn delete_timeline_data( +async fn delete_timeline_data( conf: &'static PageServerConf, - (storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue), + (storage, index, sync_queue): (&GenericRemoteStorage, &RemoteIndex, &SyncQueue), sync_id: ZTenantTimelineId, mut new_delete_data: SyncData, sync_start: Instant, task_name: &str, -) where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) { let timeline_delete = &mut new_delete_data.data; if !timeline_delete.deletion_registered { @@ -1343,19 +1305,15 @@ async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result( +async fn upload_timeline_data( conf: &'static PageServerConf, - (storage, index, sync_queue): (&S, &RemoteIndex, &SyncQueue), + (storage, index, sync_queue): (&GenericRemoteStorage, &RemoteIndex, &SyncQueue), current_remote_timeline: Option<&RemoteTimeline>, sync_id: ZTenantTimelineId, new_upload_data: SyncData, sync_start: Instant, task_name: &str, -) -> UploadStatus -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> UploadStatus { let mut uploaded_data = match upload_timeline_layers( storage, sync_queue, @@ -1406,17 +1364,13 @@ enum RemoteDataUpdate<'a> { Delete(&'a HashSet), } -async fn update_remote_data( +async fn update_remote_data( conf: &'static PageServerConf, - storage: &S, + storage: &GenericRemoteStorage, index: &RemoteIndex, sync_id: ZTenantTimelineId, update: RemoteDataUpdate<'_>, -) -> anyhow::Result<()> -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> anyhow::Result<()> { let updated_remote_timeline = { let mut index_accessor = index.write().await; diff --git a/pageserver/src/storage_sync/delete.rs b/pageserver/src/storage_sync/delete.rs index 2e39ed073f..d80a082d0c 100644 --- a/pageserver/src/storage_sync/delete.rs +++ b/pageserver/src/storage_sync/delete.rs @@ -1,27 +1,25 @@ //! Timeline synchronization logic to delete a bulk of timeline's remote files from the remote storage. +use std::path::Path; + use anyhow::Context; use futures::stream::{FuturesUnordered, StreamExt}; use tracing::{debug, error, info}; use crate::storage_sync::{SyncQueue, SyncTask}; -use remote_storage::RemoteStorage; +use remote_storage::{GenericRemoteStorage, RemoteStorage}; use utils::zid::ZTenantTimelineId; use super::{LayersDeletion, SyncData}; /// Attempts to remove the timleline layers from the remote storage. /// If the task had not adjusted the metadata before, the deletion will fail. -pub(super) async fn delete_timeline_layers<'a, P, S>( - storage: &'a S, +pub(super) async fn delete_timeline_layers<'a>( + storage: &'a GenericRemoteStorage, sync_queue: &SyncQueue, sync_id: ZTenantTimelineId, mut delete_data: SyncData, -) -> bool -where - P: std::fmt::Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> bool { if !delete_data.data.deletion_registered { error!("Cannot delete timeline layers before the deletion metadata is not registered, reenqueueing"); delete_data.retries += 1; @@ -45,25 +43,14 @@ where let mut delete_tasks = layers_to_delete .into_iter() .map(|local_layer_path| async { - let storage_path = - match storage - .remote_object_id(&local_layer_path) - .with_context(|| { - format!( - "Failed to get the layer storage path for local path '{}'", - local_layer_path.display() - ) - }) { - Ok(path) => path, - Err(e) => return Err((e, local_layer_path)), - }; - - match storage.delete(&storage_path).await.with_context(|| { - format!( - "Failed to delete remote layer from storage at '{:?}'", - storage_path - ) - }) { + match match storage { + GenericRemoteStorage::Local(storage) => { + remove_storage_object(storage, &local_layer_path).await + } + GenericRemoteStorage::S3(storage) => { + remove_storage_object(storage, &local_layer_path).await + } + } { Ok(()) => Ok(local_layer_path), Err(e) => Err((e, local_layer_path)), } @@ -101,6 +88,28 @@ where errored } +async fn remove_storage_object(storage: &S, local_layer_path: &Path) -> anyhow::Result<()> +where + P: std::fmt::Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +{ + let storage_path = storage + .remote_object_id(local_layer_path) + .with_context(|| { + format!( + "Failed to get the layer storage path for local path '{}'", + local_layer_path.display() + ) + })?; + + storage.delete(&storage_path).await.with_context(|| { + format!( + "Failed to delete remote layer from storage at '{:?}'", + storage_path + ) + }) +} + #[cfg(test)] mod tests { use std::{collections::HashSet, num::NonZeroUsize}; @@ -114,7 +123,7 @@ mod tests { layered_repository::repo_harness::{RepoHarness, TIMELINE_ID}, storage_sync::test_utils::{create_local_timeline, dummy_metadata}, }; - use remote_storage::LocalFs; + use remote_storage::{LocalFs, RemoteStorage}; use super::*; @@ -123,10 +132,10 @@ mod tests { let harness = RepoHarness::create("delete_timeline_negative")?; let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new( + let storage = GenericRemoteStorage::Local(LocalFs::new( tempdir()?.path().to_path_buf(), harness.conf.workdir.clone(), - )?; + )?); let deleted = delete_timeline_layers( &storage, @@ -158,17 +167,20 @@ mod tests { let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a", "b", "c", "d"]; - let storage = LocalFs::new( + let storage = GenericRemoteStorage::Local(LocalFs::new( tempdir()?.path().to_path_buf(), harness.conf.workdir.clone(), - )?; + )?); + + let local_storage = storage.as_local().unwrap(); + let current_retries = 3; let metadata = dummy_metadata(Lsn(0x30)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); let timeline_upload = create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?; for local_path in timeline_upload.layers_to_upload { - let remote_path = storage.remote_object_id(&local_path)?; + let remote_path = local_storage.remote_object_id(&local_path)?; let remote_parent_dir = remote_path.parent().unwrap(); if !remote_parent_dir.exists() { fs::create_dir_all(&remote_parent_dir).await?; @@ -176,11 +188,11 @@ mod tests { fs::copy(&local_path, &remote_path).await?; } assert_eq!( - storage + local_storage .list() .await? .into_iter() - .map(|remote_path| storage.local_path(&remote_path).unwrap()) + .map(|remote_path| local_storage.local_path(&remote_path).unwrap()) .filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) }) .sorted() .collect::>(), @@ -213,11 +225,11 @@ mod tests { assert!(deleted, "Should be able to delete timeline files"); assert_eq!( - storage + local_storage .list() .await? .into_iter() - .map(|remote_path| storage.local_path(&remote_path).unwrap()) + .map(|remote_path| local_storage.local_path(&remote_path).unwrap()) .filter_map(|local_path| { Some(local_path.file_name()?.to_str()?.to_owned()) }) .sorted() .collect::>(), diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs index 98c45bf9af..8e6aa47c88 100644 --- a/pageserver/src/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -9,7 +9,9 @@ use std::{ use anyhow::Context; use futures::stream::{FuturesUnordered, StreamExt}; -use remote_storage::{path_with_suffix_extension, DownloadError, RemoteObjectName, RemoteStorage}; +use remote_storage::{ + path_with_suffix_extension, Download, DownloadError, GenericRemoteStorage, RemoteStorage, +}; use tokio::{ fs, io::{self, AsyncWriteExt}, @@ -62,15 +64,11 @@ impl Default for TenantIndexParts { } } -pub async fn download_index_parts( +pub async fn download_index_parts( conf: &'static PageServerConf, - storage: &S, + storage: &GenericRemoteStorage, keys: HashSet, -) -> HashMap -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> HashMap { let mut index_parts: HashMap = HashMap::new(); let mut part_downloads = keys @@ -114,60 +112,17 @@ where /// Note: The function is rather expensive from s3 access point of view, it will execute ceil(N/1000) + N requests. /// At least one request to obtain a list of tenant timelines (more requests is there are more than 1000 timelines). /// And then will attempt to download all index files that belong to these timelines. -pub async fn gather_tenant_timelines_index_parts( +pub async fn gather_tenant_timelines_index_parts( conf: &'static PageServerConf, - storage: &S, + storage: &GenericRemoteStorage, tenant_id: ZTenantId, -) -> anyhow::Result> -where - P: RemoteObjectName + Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> anyhow::Result> { let tenant_path = conf.timelines_path(&tenant_id); - let tenant_storage_path = storage.remote_object_id(&tenant_path).with_context(|| { - format!( - "Failed to get tenant storage path for local path '{}'", - tenant_path.display() - ) - })?; - - let timelines = storage - .list_prefixes(Some(tenant_storage_path)) + let timeline_sync_ids = get_timeline_sync_ids(storage, &tenant_path, tenant_id) .await - .with_context(|| { - format!( - "Failed to list tenant storage path to get remote timelines to download: {}", - tenant_id - ) - })?; + .with_context(|| format!("Failed to list timeline sync ids for tenat {tenant_id}"))?; - if timelines.is_empty() { - anyhow::bail!( - "no timelines found on the remote storage for tenant {}", - tenant_id - ) - } - - let mut sync_ids = HashSet::new(); - - for timeline_remote_storage_key in timelines { - let object_name = timeline_remote_storage_key.object_name().ok_or_else(|| { - anyhow::anyhow!("failed to get timeline id for remote tenant {tenant_id}") - })?; - - let timeline_id: ZTimelineId = object_name - .parse() - .with_context(|| { - format!("failed to parse object name into timeline id for tenant {tenant_id} '{object_name}'") - })?; - - sync_ids.insert(ZTenantTimelineId { - tenant_id, - timeline_id, - }); - } - - match download_index_parts(conf, storage, sync_ids) + match download_index_parts(conf, storage, timeline_sync_ids) .await .remove(&tenant_id) .ok_or_else(|| anyhow::anyhow!("Missing tenant index parts. This is a bug."))? @@ -180,29 +135,15 @@ where } /// Retrieves index data from the remote storage for a given timeline. -async fn download_index_part( +async fn download_index_part( conf: &'static PageServerConf, - storage: &S, + storage: &GenericRemoteStorage, sync_id: ZTenantTimelineId, -) -> Result -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> Result { let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id) .with_file_name(IndexPart::FILE_NAME) .with_extension(IndexPart::FILE_EXTENSION); - let part_storage_path = storage - .remote_object_id(&index_part_path) - .with_context(|| { - format!( - "Failed to get the index part storage path for local path '{}'", - index_part_path.display() - ) - }) - .map_err(DownloadError::BadInput)?; - - let mut index_part_download = storage.download(&part_storage_path).await?; + let mut index_part_download = download_storage_object(storage, &index_part_path).await?; let mut index_part_bytes = Vec::new(); io::copy( @@ -211,14 +152,18 @@ where ) .await .with_context(|| { - format!("Failed to download an index part from storage path {part_storage_path:?}") + format!( + "Failed to download an index part into file '{}'", + index_part_path.display() + ) }) .map_err(DownloadError::Other)?; let index_part: IndexPart = serde_json::from_slice(&index_part_bytes) .with_context(|| { format!( - "Failed to deserialize index part file from storage path '{part_storage_path:?}'" + "Failed to deserialize index part file into file '{}'", + index_part_path.display() ) }) .map_err(DownloadError::Other)?; @@ -249,18 +194,14 @@ pub(super) enum DownloadedTimeline { /// updated in the end, if the remote one contains a newer disk_consistent_lsn. /// /// On an error, bumps the retries count and updates the files to skip with successful downloads, rescheduling the task. -pub(super) async fn download_timeline_layers<'a, P, S>( +pub(super) async fn download_timeline_layers<'a>( conf: &'static PageServerConf, - storage: &'a S, + storage: &'a GenericRemoteStorage, sync_queue: &'a SyncQueue, remote_timeline: Option<&'a RemoteTimeline>, sync_id: ZTenantTimelineId, mut download_data: SyncData, -) -> DownloadedTimeline -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> DownloadedTimeline { let remote_timeline = match remote_timeline { Some(remote_timeline) => { if !remote_timeline.awaits_download { @@ -300,15 +241,6 @@ where layer_desination_path.display() ); } else { - let layer_storage_path = storage - .remote_object_id(&layer_desination_path) - .with_context(|| { - format!( - "Failed to get the layer storage path for local path '{}'", - layer_desination_path.display() - ) - })?; - // Perform a rename inspired by durable_rename from file_utils.c. // The sequence: // write(tmp) @@ -329,19 +261,23 @@ where temp_file_path.display() ) })?; - let mut download = storage - .download(&layer_storage_path) + + let mut layer_download = download_storage_object(storage, &layer_desination_path) .await .with_context(|| { format!( - "Failed to open a download stream for layer with remote storage path '{layer_storage_path:?}'" + "Failed to initiate the download the layer for {sync_id} into file '{}'", + temp_file_path.display() + ) + })?; + io::copy(&mut layer_download.download_stream, &mut destination_file) + .await + .with_context(|| { + format!( + "Failed to download the layer for {sync_id} into file '{}'", + temp_file_path.display() ) })?; - io::copy(&mut download.download_stream, &mut destination_file).await.with_context(|| { - format!( - "Failed to download layer with remote storage path '{layer_storage_path:?}' into file '{}'", temp_file_path.display() - ) - })?; // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that: // A file will not be closed immediately when it goes out of scope if there are any IO operations @@ -429,6 +365,121 @@ where } } +async fn download_storage_object( + storage: &GenericRemoteStorage, + to_path: &Path, +) -> Result { + async fn do_download_storage_object( + storage: &S, + to_path: &Path, + ) -> Result + where + P: std::fmt::Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, + { + let remote_object_path = storage + .remote_object_id(to_path) + .with_context(|| { + format!( + "Failed to get the storage path for target local path '{}'", + to_path.display() + ) + }) + .map_err(DownloadError::BadInput)?; + + storage.download(&remote_object_path).await + } + + match storage { + GenericRemoteStorage::Local(storage) => do_download_storage_object(storage, to_path).await, + GenericRemoteStorage::S3(storage) => do_download_storage_object(storage, to_path).await, + } +} + +async fn get_timeline_sync_ids( + storage: &GenericRemoteStorage, + tenant_path: &Path, + tenant_id: ZTenantId, +) -> anyhow::Result> { + let timeline_ids: Vec = match storage { + GenericRemoteStorage::Local(storage) => list_prefixes(storage, tenant_path) + .await? + .into_iter() + .map(|timeline_directory_path| { + timeline_directory_path + .file_stem() + .with_context(|| { + format!( + "Failed to get timeline id string from file '{}'", + timeline_directory_path.display() + ) + })? + .to_string_lossy() + .as_ref() + .parse() + .with_context(|| { + format!( + "failed to parse directory name '{}' as timeline id", + timeline_directory_path.display() + ) + }) + }) + .collect::>(), + GenericRemoteStorage::S3(storage) => list_prefixes(storage, tenant_path) + .await? + .into_iter() + .map(|s3_path| { + s3_path + .object_name() + .with_context(|| { + format!("Failed to get object name out of S3 path {s3_path:?}") + })? + .parse() + .with_context(|| { + format!("failed to parse object name '{s3_path:?}' as timeline id") + }) + }) + .collect::>(), + } + .with_context(|| { + format!("Tenant {tenant_id} has at least one incorrect timeline subdirectory") + })?; + + if timeline_ids.is_empty() { + anyhow::bail!("no timelines found on the remote storage for tenant {tenant_id}") + } + + Ok(timeline_ids + .into_iter() + .map(|timeline_id| ZTenantTimelineId { + tenant_id, + timeline_id, + }) + .collect()) +} + +async fn list_prefixes(storage: &S, tenant_path: &Path) -> anyhow::Result> +where + P: std::fmt::Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +{ + let tenant_storage_path = storage.remote_object_id(tenant_path).with_context(|| { + format!( + "Failed to get tenant storage path for local path '{}'", + tenant_path.display() + ) + })?; + + storage + .list_prefixes(Some(&tenant_storage_path)) + .await + .with_context(|| { + format!( + "Failed to list tenant storage path {tenant_storage_path:?} to get remote timelines to download" + ) + }) +} + async fn fsync_path(path: impl AsRef) -> Result<(), io::Error> { fs::File::open(path).await?.sync_all().await } @@ -461,10 +512,11 @@ mod tests { let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a", "b", "layer_to_skip", "layer_to_keep_locally"]; - let storage = LocalFs::new( - tempdir()?.path().to_path_buf(), + let storage = GenericRemoteStorage::Local(LocalFs::new( + tempdir()?.path().to_owned(), harness.conf.workdir.clone(), - )?; + )?); + let local_storage = storage.as_local().unwrap(); let current_retries = 3; let metadata = dummy_metadata(Lsn(0x30)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); @@ -472,7 +524,7 @@ mod tests { create_local_timeline(&harness, TIMELINE_ID, &layer_files, metadata.clone()).await?; for local_path in timeline_upload.layers_to_upload { - let remote_path = storage.remote_object_id(&local_path)?; + let remote_path = local_storage.remote_object_id(&local_path)?; let remote_parent_dir = remote_path.parent().unwrap(); if !remote_parent_dir.exists() { fs::create_dir_all(&remote_parent_dir).await?; @@ -558,7 +610,10 @@ mod tests { let harness = RepoHarness::create("download_timeline_negatives")?; let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?; + let storage = GenericRemoteStorage::Local(LocalFs::new( + tempdir()?.path().to_owned(), + harness.conf.workdir.clone(), + )?); let empty_remote_timeline_download = download_timeline_layers( harness.conf, @@ -614,10 +669,11 @@ mod tests { let harness = RepoHarness::create("test_download_index_part")?; let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new( - tempdir()?.path().to_path_buf(), + let storage = GenericRemoteStorage::Local(LocalFs::new( + tempdir()?.path().to_owned(), harness.conf.workdir.clone(), - )?; + )?); + let local_storage = storage.as_local().unwrap(); let metadata = dummy_metadata(Lsn(0x30)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); @@ -638,7 +694,7 @@ mod tests { metadata_path(harness.conf, sync_id.timeline_id, sync_id.tenant_id) .with_file_name(IndexPart::FILE_NAME) .with_extension(IndexPart::FILE_EXTENSION); - let storage_path = storage.remote_object_id(&local_index_part_path)?; + let storage_path = local_storage.remote_object_id(&local_index_part_path)?; fs::create_dir_all(storage_path.parent().unwrap()).await?; fs::write(&storage_path, serde_json::to_vec(&index_part)?).await?; diff --git a/pageserver/src/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs index 2acc935537..a8c768e0ae 100644 --- a/pageserver/src/storage_sync/upload.rs +++ b/pageserver/src/storage_sync/upload.rs @@ -1,11 +1,14 @@ //! Timeline synchronization logic to compress and upload to the remote storage all new timeline files from the checkpoints. -use std::{fmt::Debug, path::PathBuf}; +use std::{ + fmt::Debug, + path::{Path, PathBuf}, +}; use anyhow::Context; use futures::stream::{FuturesUnordered, StreamExt}; use once_cell::sync::Lazy; -use remote_storage::RemoteStorage; +use remote_storage::{GenericRemoteStorage, RemoteStorage}; use tokio::fs; use tracing::{debug, error, info, warn}; @@ -30,16 +33,12 @@ static NO_LAYERS_UPLOAD: Lazy = Lazy::new(|| { }); /// Serializes and uploads the given index part data to the remote storage. -pub(super) async fn upload_index_part( +pub(super) async fn upload_index_part( conf: &'static PageServerConf, - storage: &S, + storage: &GenericRemoteStorage, sync_id: ZTenantTimelineId, index_part: IndexPart, -) -> anyhow::Result<()> -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> anyhow::Result<()> { let index_part_bytes = serde_json::to_vec(&index_part) .context("Failed to serialize index part file into bytes")?; let index_part_size = index_part_bytes.len(); @@ -48,27 +47,9 @@ where let index_part_path = metadata_path(conf, sync_id.timeline_id, sync_id.tenant_id) .with_file_name(IndexPart::FILE_NAME) .with_extension(IndexPart::FILE_EXTENSION); - let index_part_storage_path = - storage - .remote_object_id(&index_part_path) - .with_context(|| { - format!( - "Failed to get the index part storage path for local path '{}'", - index_part_path.display() - ) - })?; - - storage - .upload( - index_part_bytes, - index_part_size, - &index_part_storage_path, - None, - ) + upload_storage_object(storage, index_part_bytes, index_part_size, &index_part_path) .await - .with_context(|| { - format!("Failed to upload index part to the storage path '{index_part_storage_path:?}'") - }) + .with_context(|| format!("Failed to upload index part for '{sync_id}'")) } /// Timeline upload result, with extra data, needed for uploading. @@ -84,17 +65,13 @@ pub(super) enum UploadedTimeline { /// No extra checks for overlapping files is made and any files that are already present remotely will be overwritten, if submitted during the upload. /// /// On an error, bumps the retries count and reschedules the entire task. -pub(super) async fn upload_timeline_layers<'a, P, S>( - storage: &'a S, +pub(super) async fn upload_timeline_layers<'a>( + storage: &'a GenericRemoteStorage, sync_queue: &SyncQueue, remote_timeline: Option<&'a RemoteTimeline>, sync_id: ZTenantTimelineId, mut upload_data: SyncData, -) -> UploadedTimeline -where - P: Debug + Send + Sync + 'static, - S: RemoteStorage + Send + Sync + 'static, -{ +) -> UploadedTimeline { let upload = &mut upload_data.data; let new_upload_lsn = upload .metadata @@ -132,16 +109,6 @@ where let mut upload_tasks = layers_to_upload .into_iter() .map(|source_path| async move { - let storage_path = storage - .remote_object_id(&source_path) - .with_context(|| { - format!( - "Failed to get the layer storage path for local path '{}'", - source_path.display() - ) - }) - .map_err(UploadError::Other)?; - let source_file = match fs::File::open(&source_path).await.with_context(|| { format!( "Failed to upen a source file for layer '{}'", @@ -164,15 +131,10 @@ where .map_err(UploadError::Other)? .len() as usize; - match storage - .upload(source_file, source_size, &storage_path, None) + match upload_storage_object(storage, source_file, source_size, &source_path) .await - .with_context(|| { - format!( - "Failed to upload a layer from local path '{}'", - source_path.display() - ) - }) { + .with_context(|| format!("Failed to upload layer file for {sync_id}")) + { Ok(()) => Ok(source_path), Err(e) => Err(UploadError::MissingLocalFile(source_path, e)), } @@ -231,6 +193,51 @@ where } } +async fn upload_storage_object( + storage: &GenericRemoteStorage, + from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, + from_size_bytes: usize, + from_path: &Path, +) -> anyhow::Result<()> { + async fn do_upload_storage_object( + storage: &S, + from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static, + from_size_bytes: usize, + from_path: &Path, + ) -> anyhow::Result<()> + where + P: std::fmt::Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, + { + let target_storage_path = storage.remote_object_id(from_path).with_context(|| { + format!( + "Failed to get the storage path for source local path '{}'", + from_path.display() + ) + })?; + + storage + .upload(from, from_size_bytes, &target_storage_path, None) + .await + .with_context(|| { + format!( + "Failed to upload from '{}' to storage path '{:?}'", + from_path.display(), + target_storage_path + ) + }) + } + + match storage { + GenericRemoteStorage::Local(storage) => { + do_upload_storage_object(storage, from, from_size_bytes, from_path).await + } + GenericRemoteStorage::S3(storage) => { + do_upload_storage_object(storage, from, from_size_bytes, from_path).await + } + } +} + enum UploadError { MissingLocalFile(PathBuf, anyhow::Error), Other(anyhow::Error), @@ -243,7 +250,7 @@ mod tests { num::NonZeroUsize, }; - use remote_storage::LocalFs; + use remote_storage::{LocalFs, RemoteStorage}; use tempfile::tempdir; use utils::lsn::Lsn; @@ -264,10 +271,11 @@ mod tests { let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a", "b"]; - let storage = LocalFs::new( - tempdir()?.path().to_path_buf(), + let storage = GenericRemoteStorage::Local(LocalFs::new( + tempdir()?.path().to_owned(), harness.conf.workdir.clone(), - )?; + )?); + let local_storage = storage.as_local().unwrap(); let current_retries = 3; let metadata = dummy_metadata(Lsn(0x30)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); @@ -276,7 +284,7 @@ mod tests { timeline_upload.metadata = None; assert!( - storage.list().await?.is_empty(), + local_storage.list().await?.is_empty(), "Storage should be empty before any uploads are made" ); @@ -322,7 +330,7 @@ mod tests { "Successful upload without metadata should not have it returned either" ); - let storage_files = storage.list().await?; + let storage_files = local_storage.list().await?; assert_eq!( storage_files.len(), layer_files.len(), @@ -331,7 +339,7 @@ mod tests { assert_eq!( storage_files .into_iter() - .map(|storage_path| storage.local_path(&storage_path)) + .map(|storage_path| local_storage.local_path(&storage_path)) .collect::>>()?, layer_files .into_iter() @@ -351,7 +359,11 @@ mod tests { let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let layer_files = ["a1", "b1"]; - let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?; + let storage = GenericRemoteStorage::Local(LocalFs::new( + tempdir()?.path().to_owned(), + harness.conf.workdir.clone(), + )?); + let local_storage = storage.as_local().unwrap(); let current_retries = 5; let metadata = dummy_metadata(Lsn(0x40)); @@ -365,7 +377,7 @@ mod tests { create_local_timeline(&harness, TIMELINE_ID, &layers_to_upload, metadata.clone()) .await?; assert!( - storage.list().await?.is_empty(), + local_storage.list().await?.is_empty(), "Storage should be empty before any uploads are made" ); @@ -414,7 +426,7 @@ mod tests { "Successful upload should not change its metadata" ); - let storage_files = storage.list().await?; + let storage_files = local_storage.list().await?; assert_eq!( storage_files.len(), layer_files.len(), @@ -423,7 +435,7 @@ mod tests { assert_eq!( storage_files .into_iter() - .map(|storage_path| storage.local_path(&storage_path)) + .map(|storage_path| local_storage.local_path(&storage_path)) .collect::>>()?, layer_files .into_iter() @@ -440,7 +452,11 @@ mod tests { let harness = RepoHarness::create("test_upload_index_part")?; let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); - let storage = LocalFs::new(tempdir()?.path().to_owned(), harness.conf.workdir.clone())?; + let storage = GenericRemoteStorage::Local(LocalFs::new( + tempdir()?.path().to_owned(), + harness.conf.workdir.clone(), + )?); + let local_storage = storage.as_local().unwrap(); let metadata = dummy_metadata(Lsn(0x40)); let local_timeline_path = harness.timeline_path(&TIMELINE_ID); @@ -458,12 +474,12 @@ mod tests { ); assert!( - storage.list().await?.is_empty(), + local_storage.list().await?.is_empty(), "Storage should be empty before any uploads are made" ); upload_index_part(harness.conf, &storage, sync_id, index_part.clone()).await?; - let storage_files = storage.list().await?; + let storage_files = local_storage.list().await?; assert_eq!( storage_files.len(), 1, diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 921d973a41..4a907ac0e1 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -12,6 +12,7 @@ use crate::thread_mgr::ThreadKind; use crate::walredo::PostgresRedoManager; use crate::{thread_mgr, timelines, walreceiver}; use anyhow::Context; +use remote_storage::GenericRemoteStorage; use serde::{Deserialize, Serialize}; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; @@ -131,7 +132,10 @@ impl fmt::Display for TenantState { /// Initialize repositories with locally available timelines. /// Timelines that are only partially available locally (remote storage has more data than this pageserver) /// are scheduled for download and added to the repository once download is completed. -pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result { +pub fn init_tenant_mgr( + conf: &'static PageServerConf, + remote_storage: Option>, +) -> anyhow::Result { let (timeline_updates_sender, timeline_updates_receiver) = mpsc::unbounded_channel::(); tenants_state::set_timeline_update_sender(timeline_updates_sender)?; @@ -140,7 +144,7 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result