diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs
index 21ab65a896..6e960ed1b7 100644
--- a/pageserver/src/remote_storage.rs
+++ b/pageserver/src/remote_storage.rs
@@ -78,16 +78,12 @@ use std::{
     thread,
 };
 
-use anyhow::{anyhow, ensure, Context};
+use anyhow::Context;
 use tokio::io;
-use zenith_utils::zid::{ZTenantId, ZTimelineId};
 
 pub use self::storage_sync::schedule_timeline_upload;
 use self::{local_fs::LocalFs, rust_s3::S3};
-use crate::{
-    layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME},
-    PageServerConf, RemoteStorageKind,
-};
+use crate::{PageServerConf, RemoteStorageKind};
 
 /// Based on the config, initiates the remote storage connection and starts a separate thread
 /// that ensures that pageserver and the remote storage are in sync with each other.
@@ -127,8 +123,8 @@ trait RemoteStorage: Send + Sync {
     /// Attempts to derive the storage path out of the local path, if the latter is correct.
     fn storage_path(&self, local_path: &Path) -> anyhow::Result<Self::StoragePath>;
 
-    /// Gets the layered storage information about the given entry.
-    fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo>;
+    /// Gets the download path of the given storage file.
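+    /// Expected to be the inverse of `storage_path`; the
+    /// `download_destination_matches_original_path` tests rely on this round-trip.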
+    fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf>;
 
     /// Lists all items the storage has right now.
     async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>>;
@@ -159,16 +155,6 @@ trait RemoteStorage: Send + Sync {
     async fn delete(&self, path: &Self::StoragePath) -> anyhow::Result<()>;
 }
 
-/// Information about a certain remote storage entry.
-#[derive(Debug, PartialEq, Eq)]
-struct RemoteFileInfo {
-    tenant_id: ZTenantId,
-    timeline_id: ZTimelineId,
-    /// Path in the pageserver workdir where the file should go to.
-    download_destination: PathBuf,
-    is_metadata: bool,
-}
-
 fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> {
     if prefix == path {
         anyhow::bail!(
@@ -185,147 +171,3 @@ fn strip_path_prefix<'a>(prefix: &'a Path, path: &'a Path) -> anyhow::Result<&'a Path> {
         })
     }
 }
-
-fn parse_ids_from_path<'a, R: std::fmt::Display>(
-    path_segments: impl Iterator<Item = &'a str>,
-    path_log_representation: &R,
-) -> anyhow::Result<(ZTenantId, ZTimelineId)> {
-    let mut segments = path_segments.skip_while(|&segment| segment != TENANTS_SEGMENT_NAME);
-    let tenants_segment = segments.next().ok_or_else(|| {
-        anyhow!(
-            "Found no '{}' segment in the storage path '{}'",
-            TENANTS_SEGMENT_NAME,
-            path_log_representation
-        )
-    })?;
-    ensure!(
-        tenants_segment == TENANTS_SEGMENT_NAME,
-        "Failed to extract '{}' segment from storage path '{}'",
-        TENANTS_SEGMENT_NAME,
-        path_log_representation
-    );
-    let tenant_id = segments
-        .next()
-        .ok_or_else(|| {
-            anyhow!(
-                "Found no tenant id in the storage path '{}'",
-                path_log_representation
-            )
-        })?
-        .parse::<ZTenantId>()
-        .with_context(|| {
-            format!(
-                "Failed to parse tenant id from storage path '{}'",
-                path_log_representation
-            )
-        })?;
-
-    let timelines_segment = segments.next().ok_or_else(|| {
-        anyhow!(
-            "Found no '{}' segment in the storage path '{}'",
-            TIMELINES_SEGMENT_NAME,
-            path_log_representation
-        )
-    })?;
-    ensure!(
-        timelines_segment == TIMELINES_SEGMENT_NAME,
-        "Failed to extract '{}' segment from storage path '{}'",
-        TIMELINES_SEGMENT_NAME,
-        path_log_representation
-    );
-    let timeline_id = segments
-        .next()
-        .ok_or_else(|| {
-            anyhow!(
-                "Found no timeline id in the storage path '{}'",
-                path_log_representation
-            )
-        })?
-        .parse::<ZTimelineId>()
-        .with_context(|| {
-            format!(
-                "Failed to parse timeline id from storage path '{}'",
-                path_log_representation
-            )
-        })?;
-
-    Ok((tenant_id, timeline_id))
-}
-
-/// A set of common test utils to share in unit tests inside the module tree.
-#[cfg(test)]
-mod test_utils {
-    use std::path::{Path, PathBuf};
-
-    use anyhow::ensure;
-
-    use crate::{
-        layered_repository::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME},
-        repository::repo_harness::{RepoHarness, TIMELINE_ID},
-    };
-
-    /// Gives a timeline path with pageserver workdir stripped off.
-    pub fn relative_timeline_path(harness: &RepoHarness) -> anyhow::Result<PathBuf> {
-        let timeline_path = harness.timeline_path(&TIMELINE_ID);
-        Ok(timeline_path
-            .strip_prefix(&harness.conf.workdir)?
-            .to_path_buf())
-    }
-
-    /// Creates a path with custom tenant id in one of its segments.
-    /// Useful for emulating paths with wrong ids.
-    pub fn custom_tenant_id_path(
-        path_with_tenant_id: &Path,
-        new_tenant_id: &str,
-    ) -> anyhow::Result<PathBuf> {
-        let mut new_path = PathBuf::new();
-        let mut is_tenant_id = false;
-        let mut tenant_id_replaced = false;
-        for segment in path_with_tenant_id {
-            match segment.to_str() {
-                Some(TENANTS_SEGMENT_NAME) => is_tenant_id = true,
-                Some(_tenant_id_str) if is_tenant_id => {
-                    is_tenant_id = false;
-                    new_path.push(new_tenant_id);
-                    tenant_id_replaced = true;
-                    continue;
-                }
-                _ => {}
-            }
-            new_path.push(segment)
-        }
-
-        ensure!(tenant_id_replaced, "Found no tenant id segment to replace");
-        Ok(new_path)
-    }
-
-    /// Creates a path with custom timeline id in one of its segments.
-    /// Useful for emulating paths with wrong ids.
-    pub fn custom_timeline_id_path(
-        path_with_timeline_id: &Path,
-        new_timeline_id: &str,
-    ) -> anyhow::Result<PathBuf> {
-        let mut new_path = PathBuf::new();
-        let mut is_timeline_id = false;
-        let mut timeline_id_replaced = false;
-        for segment in path_with_timeline_id {
-            match segment.to_str() {
-                Some(TIMELINES_SEGMENT_NAME) => is_timeline_id = true,
-                Some(_timeline_id_str) if is_timeline_id => {
-                    is_timeline_id = false;
-                    new_path.push(new_timeline_id);
-                    timeline_id_replaced = true;
-                    continue;
-                }
-                _ => {}
-            }
-            new_path.push(segment)
-        }
-
-        ensure!(
-            timeline_id_replaced,
-            "Found no timeline id segment to replace"
-        );
-        Ok(new_path)
-    }
-}
diff --git a/pageserver/src/remote_storage/local_fs.rs b/pageserver/src/remote_storage/local_fs.rs
index d7cdcacb98..e9651988da 100644
--- a/pageserver/src/remote_storage/local_fs.rs
+++ b/pageserver/src/remote_storage/local_fs.rs
@@ -5,7 +5,6 @@
 //! volume is mounted to the local FS.
 
 use std::{
-    ffi::OsStr,
     future::Future,
     path::{Path, PathBuf},
     pin::Pin,
@@ -18,8 +17,7 @@ use tokio::{
 };
 use tracing::*;
 
-use super::{parse_ids_from_path, strip_path_prefix, RemoteFileInfo, RemoteStorage};
-use crate::layered_repository::metadata::METADATA_FILE_NAME;
+use super::{strip_path_prefix, RemoteStorage};
 
 pub struct LocalFs {
     pageserver_workdir: &'static Path,
@@ -68,22 +66,10 @@ impl RemoteStorage for LocalFs {
         ))
     }
 
-    fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo> {
-        let is_metadata =
-            storage_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME);
+    fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf> {
         let relative_path = strip_path_prefix(&self.root, storage_path)
             .context("local path does not belong to this storage")?;
-        let download_destination = self.pageserver_workdir.join(relative_path);
-        let (tenant_id, timeline_id) = parse_ids_from_path(
-            relative_path.iter().filter_map(|segment| segment.to_str()),
-            &relative_path.display(),
-        )?;
-        Ok(RemoteFileInfo {
-            tenant_id,
-            timeline_id,
-            download_destination,
-            is_metadata,
-        })
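+        // E.g., with a hypothetical storage root `/srv/remote` and workdir
+        // `/data/pageserver`, storage path `/srv/remote/tenants/<id>/timelines/<id>/f`
+        // maps back to `/data/pageserver/tenants/<id>/timelines/<id>/f`.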
+        Ok(self.pageserver_workdir.join(relative_path))
     }
 
     async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
@@ -113,11 +99,18 @@ impl RemoteStorage for LocalFs {
 
         io::copy(&mut from, &mut destination)
             .await
-            .context("Failed to upload a file to the local storage")?;
-        destination
-            .flush()
-            .await
-            .context("Failed to upload a file to the local storage")?;
+            .with_context(|| {
+                format!(
+                    "Failed to upload file to the local storage at '{}'",
+                    target_file_path.display()
+                )
+            })?;
+        destination.flush().await.with_context(|| {
+            format!(
+                "Failed to upload file to the local storage at '{}'",
+                target_file_path.display()
+            )
+        })?;
 
         Ok(())
     }
@@ -141,9 +134,13 @@ impl RemoteStorage for LocalFs {
                 )
             })?,
         );
-        io::copy(&mut source, to)
-            .await
-            .context("Failed to download a file from the local storage")?;
+        io::copy(&mut source, to).await.with_context(|| {
+            format!(
+                "Failed to download file '{}' from the local storage",
+                file_path.display()
+            )
+        })?;
+        to.flush().await?;
         Ok(())
     } else {
         bail!(
@@ -275,9 +272,6 @@ async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()>
 mod pure_tests {
     use crate::{
         layered_repository::metadata::METADATA_FILE_NAME,
-        remote_storage::test_utils::{
-            custom_tenant_id_path, custom_timeline_id_path, relative_timeline_path,
-        },
         repository::repo_harness::{RepoHarness, TIMELINE_ID},
     };
 
@@ -345,8 +339,8 @@ mod pure_tests {
     }
 
     #[test]
-    fn info_positive() -> anyhow::Result<()> {
-        let repo_harness = RepoHarness::create("info_positive")?;
+    fn local_path_positive() -> anyhow::Result<()> {
+        let repo_harness = RepoHarness::create("local_path_positive")?;
         let storage_root = PathBuf::from("somewhere").join("else");
         let storage = LocalFs {
             pageserver_workdir: &repo_harness.conf.workdir,
@@ -356,14 +350,11 @@ mod pure_tests {
         let name = "not a metadata";
         let local_path = repo_harness.timeline_path(&TIMELINE_ID).join(name);
         assert_eq!(
-            RemoteFileInfo {
-                tenant_id: repo_harness.tenant_id,
-                timeline_id: TIMELINE_ID,
-                download_destination: local_path.clone(),
-                is_metadata: false,
-            },
+            local_path,
             storage
-                .info(&storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?))
+                .local_path(
+                    &storage_root.join(local_path.strip_prefix(&repo_harness.conf.workdir)?)
+ ) .expect("For a valid input, valid S3 info should be parsed"), "Should be able to parse metadata out of the correctly named remote delta file" ); @@ -373,15 +364,10 @@ mod pure_tests { .join(METADATA_FILE_NAME); let remote_metadata_path = storage.storage_path(&local_metadata_path)?; assert_eq!( - RemoteFileInfo { - tenant_id: repo_harness.tenant_id, - timeline_id: TIMELINE_ID, - download_destination: local_metadata_path, - is_metadata: true, - }, + local_metadata_path, storage - .info(&remote_metadata_path) - .expect("For a valid input, valid S3 info should be parsed"), + .local_path(&remote_metadata_path) + .expect("For a valid input, valid local path should be parsed"), "Should be able to parse metadata out of the correctly named remote metadata file" ); @@ -389,53 +375,30 @@ mod pure_tests { } #[test] - fn info_negatives() -> anyhow::Result<()> { + fn local_path_negatives() -> anyhow::Result<()> { #[track_caller] - #[allow(clippy::ptr_arg)] // have to use &PathBuf due to `storage.info` parameter requirements - fn storage_info_error(storage: &LocalFs, storage_path: &PathBuf) -> String { - match storage.info(storage_path) { - Ok(wrong_info) => panic!( - "Expected storage path input {:?} to cause an error, but got file info: {:?}", - storage_path, wrong_info, + #[allow(clippy::ptr_arg)] // have to use &PathBuf due to `storage.local_path` parameter requirements + fn local_path_error(storage: &LocalFs, storage_path: &PathBuf) -> String { + match storage.local_path(storage_path) { + Ok(wrong_path) => panic!( + "Expected local path input {:?} to cause an error, but got file path: {:?}", + storage_path, wrong_path, ), Err(e) => format!("{:?}", e), } } - let repo_harness = RepoHarness::create("info_negatives")?; + let repo_harness = RepoHarness::create("local_path_negatives")?; let storage_root = PathBuf::from("somewhere").join("else"); let storage = LocalFs { pageserver_workdir: &repo_harness.conf.workdir, - root: storage_root.clone(), + root: storage_root, }; let totally_wrong_path = "wrong_wrong_wrong"; - let error_message = storage_info_error(&storage, &PathBuf::from(totally_wrong_path)); + let error_message = local_path_error(&storage, &PathBuf::from(totally_wrong_path)); assert!(error_message.contains(totally_wrong_path)); - let relative_timeline_path = relative_timeline_path(&repo_harness)?; - - let relative_file_path = custom_tenant_id_path(&relative_timeline_path, "wrong_tenant_id")? - .join("wrong_tenant_id_name"); - let wrong_tenant_id_path = storage_root.join(&relative_file_path); - let error_message = storage_info_error(&storage, &wrong_tenant_id_path); - assert!( - error_message.contains(relative_file_path.to_str().unwrap()), - "Error message '{}' does not contain the expected substring", - error_message - ); - - let relative_file_path = - custom_timeline_id_path(&relative_timeline_path, "wrong_timeline_id")? 
-                .join("wrong_timeline_id_name");
-        let wrong_timeline_id_path = storage_root.join(&relative_file_path);
-        let error_message = storage_info_error(&storage, &wrong_timeline_id_path);
-        assert!(
-            error_message.contains(relative_file_path.to_str().unwrap()),
-            "Error message '{}' does not contain the expected substring",
-            error_message
-        );
-
         Ok(())
     }
 
@@ -451,7 +414,7 @@ mod pure_tests {
         };
 
         let storage_path = dummy_storage.storage_path(&original_path)?;
-        let download_destination = dummy_storage.info(&storage_path)?.download_destination;
+        let download_destination = dummy_storage.local_path(&storage_path)?;
 
         assert_eq!(
             original_path, download_destination,
@@ -465,9 +428,7 @@ mod pure_tests {
 
 #[cfg(test)]
 mod fs_tests {
     use super::*;
-    use crate::{
-        remote_storage::test_utils::relative_timeline_path, repository::repo_harness::RepoHarness,
-    };
+    use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID};
 
     use std::io::Write;
     use tempfile::tempdir;
@@ -684,10 +645,9 @@ mod fs_tests {
         storage: &LocalFs,
         name: &str,
     ) -> anyhow::Result<PathBuf> {
-        let storage_path = storage
-            .root
-            .join(relative_timeline_path(harness)?)
-            .join(name);
+        let timeline_path = harness.timeline_path(&TIMELINE_ID);
+        let relative_timeline_path = timeline_path.strip_prefix(&harness.conf.workdir)?;
+        let storage_path = storage.root.join(relative_timeline_path).join(name);
         storage
             .upload(
                 create_file_for_upload(
diff --git a/pageserver/src/remote_storage/rust_s3.rs b/pageserver/src/remote_storage/rust_s3.rs
index 05e8617b82..a494d187e6 100644
--- a/pageserver/src/remote_storage/rust_s3.rs
+++ b/pageserver/src/remote_storage/rust_s3.rs
@@ -9,8 +9,7 @@ use s3::{bucket::Bucket, creds::Credentials, region::Region};
 use tokio::io::{self, AsyncWriteExt};
 
 use crate::{
-    layered_repository::metadata::METADATA_FILE_NAME,
-    remote_storage::{parse_ids_from_path, strip_path_prefix, RemoteFileInfo, RemoteStorage},
+    remote_storage::{strip_path_prefix, RemoteStorage},
     S3Config,
 };
 
@@ -76,19 +75,8 @@ impl RemoteStorage for S3 {
         Ok(S3ObjectKey(key))
     }
 
-    fn info(&self, storage_path: &Self::StoragePath) -> anyhow::Result<RemoteFileInfo> {
-        let storage_path_key = &storage_path.0;
-        let is_metadata =
-            storage_path_key.ends_with(&format!("{}{}", S3_FILE_SEPARATOR, METADATA_FILE_NAME));
-        let download_destination = storage_path.download_destination(self.pageserver_workdir);
-        let (tenant_id, timeline_id) =
-            parse_ids_from_path(storage_path_key.split(S3_FILE_SEPARATOR), storage_path_key)?;
-        Ok(RemoteFileInfo {
-            tenant_id,
-            timeline_id,
-            download_destination,
-            is_metadata,
-        })
+    fn local_path(&self, storage_path: &Self::StoragePath) -> anyhow::Result<PathBuf> {
+        Ok(storage_path.download_destination(self.pageserver_workdir))
     }
 
     async fn list(&self) -> anyhow::Result<Vec<Self::StoragePath>> {
@@ -212,9 +200,7 @@ impl RemoteStorage for S3 {
 #[cfg(test)]
 mod tests {
     use crate::{
-        remote_storage::test_utils::{
-            custom_tenant_id_path, custom_timeline_id_path, relative_timeline_path,
-        },
+        layered_repository::metadata::METADATA_FILE_NAME,
         repository::repo_harness::{RepoHarness, TIMELINE_ID},
     };
 
@@ -316,35 +302,26 @@ mod tests {
     }
 
     #[test]
-    fn info_positive() -> anyhow::Result<()> {
-        let repo_harness = RepoHarness::create("info_positive")?;
+    fn local_path_positive() -> anyhow::Result<()> {
+        let repo_harness = RepoHarness::create("local_path_positive")?;
         let storage = dummy_storage(&repo_harness.conf.workdir);
-        let relative_timeline_path = relative_timeline_path(&repo_harness)?;
+        let timeline_dir = repo_harness.timeline_path(&TIMELINE_ID);
+        let relative_timeline_path =
timeline_dir.strip_prefix(&repo_harness.conf.workdir)?; let s3_key = create_s3_key(&relative_timeline_path.join("not a metadata")); assert_eq!( - RemoteFileInfo { - tenant_id: repo_harness.tenant_id, - timeline_id: TIMELINE_ID, - download_destination: s3_key.download_destination(&repo_harness.conf.workdir), - is_metadata: false, - }, + s3_key.download_destination(&repo_harness.conf.workdir), storage - .info(&s3_key) + .local_path(&s3_key) .expect("For a valid input, valid S3 info should be parsed"), "Should be able to parse metadata out of the correctly named remote delta file" ); let s3_key = create_s3_key(&relative_timeline_path.join(METADATA_FILE_NAME)); assert_eq!( - RemoteFileInfo { - tenant_id: repo_harness.tenant_id, - timeline_id: TIMELINE_ID, - download_destination: s3_key.download_destination(&repo_harness.conf.workdir), - is_metadata: true, - }, + s3_key.download_destination(&repo_harness.conf.workdir), storage - .info(&s3_key) + .local_path(&s3_key) .expect("For a valid input, valid S3 info should be parsed"), "Should be able to parse metadata out of the correctly named remote metadata file" ); @@ -352,43 +329,6 @@ mod tests { Ok(()) } - #[test] - fn info_negatives() -> anyhow::Result<()> { - #[track_caller] - fn storage_info_error(storage: &S3, s3_key: &S3ObjectKey) -> String { - match storage.info(s3_key) { - Ok(wrong_info) => panic!( - "Expected key {:?} to error, but got file info: {:?}", - s3_key, wrong_info, - ), - Err(e) => e.to_string(), - } - } - - let repo_harness = RepoHarness::create("info_negatives")?; - let storage = dummy_storage(&repo_harness.conf.workdir); - let relative_timeline_path = relative_timeline_path(&repo_harness)?; - - let totally_wrong_path = "wrong_wrong_wrong"; - let error_message = - storage_info_error(&storage, &S3ObjectKey(totally_wrong_path.to_string())); - assert!(error_message.contains(totally_wrong_path)); - - let wrong_tenant_id = create_s3_key( - &custom_tenant_id_path(&relative_timeline_path, "wrong_tenant_id")?.join("name"), - ); - let error_message = storage_info_error(&storage, &wrong_tenant_id); - assert!(error_message.contains(&wrong_tenant_id.0)); - - let wrong_timeline_id = create_s3_key( - &custom_timeline_id_path(&relative_timeline_path, "wrong_timeline_id")?.join("name"), - ); - let error_message = storage_info_error(&storage, &wrong_timeline_id); - assert!(error_message.contains(&wrong_timeline_id.0)); - - Ok(()) - } - #[test] fn download_destination_matches_original_path() -> anyhow::Result<()> { let repo_harness = RepoHarness::create("download_destination_matches_original_path")?; @@ -397,7 +337,7 @@ mod tests { let dummy_storage = dummy_storage(&repo_harness.conf.workdir); let key = dummy_storage.storage_path(&original_path)?; - let download_destination = dummy_storage.info(&key)?.download_destination; + let download_destination = dummy_storage.local_path(&key)?; assert_eq!( original_path, download_destination, diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index 96d5d2698f..732e8e505c 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -67,7 +67,7 @@ use std::{ time::Duration, }; -use anyhow::{ensure, Context}; +use anyhow::{anyhow, ensure, Context}; use futures::stream::{FuturesUnordered, StreamExt}; use lazy_static::lazy_static; use tokio::{ @@ -78,9 +78,12 @@ use tokio::{ }; use tracing::*; -use super::{RemoteFileInfo, RemoteStorage}; +use super::RemoteStorage; use crate::{ - 
layered_repository::metadata::{metadata_path, TimelineMetadata},
+    layered_repository::{
+        metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
+        TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME,
+    },
     tenant_mgr::register_timeline_download,
     PageServerConf,
 };
@@ -365,19 +368,41 @@ async fn fetch_existing_uploads<P, S: 'static + RemoteStorage<StoragePath = P>>(
     let mut data_fetches = uploaded_files
         .into_iter()
         .map(|remote_path| async {
-            (fetch_upload_data(remote_storage, &remote_path).await, remote_path)
+            let local_path = match remote_storage.local_path(&remote_path) {
+                Ok(path) => path,
+                Err(e) => return Err((e, remote_path)),
+            };
+            let metadata = if local_path
+                .file_name()
+                .and_then(|os_str| os_str.to_str())
+                .unwrap_or_default()
+                == METADATA_FILE_NAME
+            {
+                let mut metadata_bytes = Vec::new();
+                if let Err(e) = remote_storage
+                    .download(&remote_path, &mut metadata_bytes)
+                    .await
+                {
+                    return Err((e, remote_path));
+                };
+                let metadata = match TimelineMetadata::from_bytes(&metadata_bytes) {
+                    Ok(metadata) => metadata,
+                    Err(e) => return Err((e, remote_path)),
+                };
+                Some(metadata)
+            } else {
+                None
+            };
+            let (tenant_id, timeline_id) =
+                parse_ids_from_path(&local_path).map_err(|e| (e, remote_path))?;
+            Ok::<_, (anyhow::Error, P)>((local_path, tenant_id, timeline_id, metadata))
         })
         .collect::<FuturesUnordered<_>>();
 
     let mut fetched = HashMap::new();
-    while let Some((fetch_result, remote_path)) = data_fetches.next().await {
+    while let Some(fetch_result) = data_fetches.next().await {
         match fetch_result {
-            Ok((file_info, remote_metadata)) => {
-                let tenant_id = file_info.tenant_id;
-                let timeline_id = file_info.timeline_id;
+            Ok((local_path, tenant_id, timeline_id, remote_metadata)) => {
                 let remote_timeline = fetched
                     .entry((tenant_id, timeline_id))
@@ -390,10 +415,10 @@ async fn fetch_existing_uploads<P, S: 'static + RemoteStorage<StoragePath = P>>(
-            Err(e) => {
+            Err((e, remote_path)) => {
                 warn!(
                     "Failed to fetch file info for path {:?}, reason: {:#}",
                     remote_path, e
@@ -406,34 +431,70 @@ async fn fetch_existing_uploads<P, S: 'static + RemoteStorage<StoragePath = P>>(
 
-async fn fetch_upload_data<P, S: 'static + RemoteStorage<StoragePath = P>>(
-    remote_storage: &S,
-    remote_path: &P,
-) -> anyhow::Result<(RemoteFileInfo, Option<TimelineMetadata>)> {
-    let info = remote_storage.info(remote_path)?;
-    let metadata = if info.is_metadata {
-        let mut metadata_bytes = io::BufWriter::new(std::io::Cursor::new(Vec::new()));
-        remote_storage
-            .download(remote_path, &mut metadata_bytes)
-            .await
-            .with_context(|| {
-                format!(
-                    "Failed to download metadata file contents for tenant {}, timeline {}",
-                    info.tenant_id, info.timeline_id
-                )
-            })?;
-        metadata_bytes.flush().await.with_context(|| {
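+/// Extracts tenant and timeline ids from a path that is expected to contain
+/// `<TENANTS_SEGMENT_NAME>/<tenant id>/<TIMELINES_SEGMENT_NAME>/<timeline id>`
+/// segments, in that order.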
+fn parse_ids_from_path(path: &Path) -> anyhow::Result<(ZTenantId, ZTimelineId)> {
+    let mut segments = path
+        .iter()
+        .flat_map(|segment| segment.to_str())
+        .skip_while(|&segment| segment != TENANTS_SEGMENT_NAME);
+    let tenants_segment = segments.next().ok_or_else(|| {
+        anyhow!(
+            "Found no '{}' segment in the storage path '{}'",
+            TENANTS_SEGMENT_NAME,
+            path.display()
+        )
+    })?;
+    ensure!(
+        tenants_segment == TENANTS_SEGMENT_NAME,
+        "Failed to extract '{}' segment from storage path '{}'",
+        TENANTS_SEGMENT_NAME,
+        path.display()
+    );
+    let tenant_id = segments
+        .next()
+        .ok_or_else(|| {
+            anyhow!(
+                "Found no tenant id in the storage path '{}'",
+                path.display()
+            )
+        })?
+        .parse::<ZTenantId>()
+        .with_context(|| {
             format!(
-                "Failed to download metadata file contents for tenant {}, timeline {}",
-                info.tenant_id, info.timeline_id
+                "Failed to parse tenant id from storage path '{}'",
+                path.display()
             )
         })?;
-        let metadata_bytes = metadata_bytes.into_inner().into_inner();
-        Some(TimelineMetadata::from_bytes(&metadata_bytes)?)
-    } else {
-        None
-    };
-    Ok((info, metadata))
+
+    let timelines_segment = segments.next().ok_or_else(|| {
+        anyhow!(
+            "Found no '{}' segment in the storage path '{}'",
+            TIMELINES_SEGMENT_NAME,
+            path.display()
+        )
+    })?;
+    ensure!(
+        timelines_segment == TIMELINES_SEGMENT_NAME,
+        "Failed to extract '{}' segment from storage path '{}'",
+        TIMELINES_SEGMENT_NAME,
+        path.display()
+    );
+    let timeline_id = segments
+        .next()
+        .ok_or_else(|| {
+            anyhow!(
+                "Found no timeline id in the storage path '{}'",
+                path.display()
+            )
+        })?
+        .parse::<ZTimelineId>()
+        .with_context(|| {
+            format!(
+                "Failed to parse timeline id from storage path '{}'",
+                path.display()
+            )
+        })?;
+
+    Ok((tenant_id, timeline_id))
 }
 
 async fn download_timeline<'a, P, S: 'static + RemoteStorage<StoragePath = P>>(