diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index e89f60de7e..6b3fd29a0e 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -344,6 +344,8 @@ impl Debug for S3Config { } } +/// Adds a suffix to the file (or directory) name, either appending the suffix to the end of its extension, +/// or, if there's no extension, creating one from the suffix. pub fn path_with_suffix_extension(original_path: impl AsRef, suffix: &str) -> PathBuf { let new_extension = match original_path .as_ref() @@ -468,6 +470,11 @@ mod tests { &path_with_suffix_extension(&p, ".temp").to_string_lossy(), "/foo/bar.baz..temp" ); + let p = PathBuf::from("/foo/bar/dir/"); + assert_eq!( + &path_with_suffix_extension(&p, ".temp").to_string_lossy(), + "/foo/bar/dir..temp" + ); } #[test] diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index 2561c0ca24..3ffbf3cb39 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -21,6 +21,8 @@ use crate::{path_with_suffix_extension, Download, DownloadError, RemoteObjectId} use super::{strip_path_prefix, RemoteStorage, StorageMetadata}; +const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp"; + /// Convert a Path in the remote storage into a RemoteObjectId fn remote_object_id_from_path(path: &Path) -> anyhow::Result { Ok(RemoteObjectId( @@ -143,7 +145,8 @@ impl RemoteStorage for LocalFs { // We need this dance with sort of durable rename (without fsyncs) // to prevent partial uploads. This was really hit when pageserver shutdown // cancelled the upload and partial file was left on the fs - let temp_file_path = path_with_suffix_extension(&target_file_path, "temp"); + let temp_file_path = + path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX); let mut destination = io::BufWriter::new( fs::OpenOptions::new() .write(true) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index a31c2fd2a5..59142bd9b2 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -470,7 +470,7 @@ async fn tenant_list_handler(request: Request) -> Result, A let response_data = tokio::task::spawn_blocking(move || { let _enter = info_span!("tenant_list").entered(); - crate::tenant_mgr::list_tenants(&remote_index) + crate::tenant_mgr::list_tenant_info(&remote_index) }) .await .map_err(ApiError::from_err)?; @@ -640,7 +640,8 @@ async fn tenant_config_handler(mut request: Request) -> Result Result { + let _guard = match self.file_lock.try_read() { + Ok(g) => g, + Err(_) => { + info!("File lock write acquired, shutting down GC"); + return Ok(GcResult::default()); + } + }; + let timeline_str = target_timeline_id .map(|x| x.to_string()) .unwrap_or_else(|| "-".to_string()); @@ -315,6 +323,14 @@ impl Repository { /// Also it can be explicitly requested per timeline through page server /// api's 'compact' command. pub fn compaction_iteration(&self) -> Result<()> { + let _guard = match self.file_lock.try_read() { + Ok(g) => g, + Err(_) => { + info!("File lock write acquired, shutting down compaction"); + return Ok(()); + } + }; + // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // compactions.
We don't want to block everything else while the @@ -401,10 +417,10 @@ impl Repository { pub fn init_attach_timelines( &self, - timelines: Vec<(ZTimelineId, TimelineMetadata)>, + timelines: HashMap, ) -> anyhow::Result<()> { let sorted_timelines = if timelines.len() == 1 { - timelines + timelines.into_iter().collect() } else if !timelines.is_empty() { tree_sort_timelines(timelines)? } else { @@ -442,7 +458,7 @@ impl Repository { /// perform a topological sort, so that the parent of each timeline comes /// before the children. fn tree_sort_timelines( - timelines: Vec<(ZTimelineId, TimelineMetadata)>, + timelines: HashMap, ) -> Result> { let mut result = Vec::with_capacity(timelines.len()); @@ -567,13 +583,8 @@ impl Repository { .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag) } - pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> { - let mut tenant_conf = self.tenant_conf.write().unwrap(); - - tenant_conf.update(&new_tenant_conf); - - Repository::persist_tenant_config(self.conf, self.tenant_id, *tenant_conf)?; - Ok(()) + pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) { + self.tenant_conf.write().unwrap().update(&new_tenant_conf); } fn initialize_new_timeline( @@ -648,32 +659,37 @@ impl Repository { tenant_id: ZTenantId, ) -> anyhow::Result { let target_config_path = TenantConf::path(conf, tenant_id); + let target_config_display = target_config_path.display(); - info!("load tenantconf from {}", target_config_path.display()); + info!("loading tenantconf from {target_config_display}"); // FIXME If the config file is not found, assume that we're attaching // a detached tenant and config is passed via attach command. // https://github.com/neondatabase/neon/issues/1555 if !target_config_path.exists() { - info!( - "tenant config not found in {}", - target_config_path.display() - ); - return Ok(Default::default()); + info!("tenant config not found in {target_config_display}"); + return Ok(TenantConfOpt::default()); } // load and parse file - let config = fs::read_to_string(target_config_path)?; + let config = fs::read_to_string(&target_config_path).with_context(|| { + format!("Failed to load config from path '{target_config_display}'") + })?; - let toml = config.parse::()?; + let toml = config.parse::().with_context(|| { + format!("Failed to parse config from file '{target_config_display}' as toml file") + })?; - let mut tenant_conf: TenantConfOpt = Default::default(); + let mut tenant_conf = TenantConfOpt::default(); for (key, item) in toml.iter() { match key { "tenant_config" => { - tenant_conf = PageServerConf::parse_toml_tenant_conf(item)?; + tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| { + format!("Failed to parse config from file '{target_config_display}' as pageserver config") + })?; } - _ => bail!("unrecognized pageserver option '{}'", key), + _ => bail!("config file {target_config_display} has unrecognized pageserver option '{key}'"), + } } @@ -888,26 +904,6 @@ pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> { Ok(()) } -pub fn load_metadata( - conf: &'static PageServerConf, - timeline_id: ZTimelineId, - tenant_id: ZTenantId, -) -> anyhow::Result { - let metadata_path = metadata_path(conf, timeline_id, tenant_id); - let metadata_bytes = std::fs::read(&metadata_path).with_context(|| { - format!( - "Failed to read metadata bytes from path {}", - metadata_path.display() - ) - })?; - TimelineMetadata::from_bytes(&metadata_bytes).with_context(|| { - format!( - "Failed to parse 
metadata bytes from path {}", - metadata_path.display() - ) - }) -} - #[cfg(test)] pub mod repo_harness { use bytes::{Bytes, BytesMut}; @@ -925,6 +921,7 @@ pub mod repo_harness { walredo::{WalRedoError, WalRedoManager}, }; + use super::metadata::metadata_path; use super::*; use crate::tenant_config::{TenantConf, TenantConfOpt}; use hex_literal::hex; @@ -1030,7 +1027,7 @@ pub mod repo_harness { false, ); // populate repo with locally available timelines - let mut timelines_to_load = Vec::new(); + let mut timelines_to_load = HashMap::new(); for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id)) .expect("should be able to read timelines dir") { @@ -1042,7 +1039,7 @@ pub mod repo_harness { .to_string_lossy() .parse()?; let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?; - timelines_to_load.push((timeline_id, timeline_metadata)); + timelines_to_load.insert(timeline_id, timeline_metadata); } repo.init_attach_timelines(timelines_to_load)?; @@ -1054,6 +1051,26 @@ pub mod repo_harness { } } + fn load_metadata( + conf: &'static PageServerConf, + timeline_id: ZTimelineId, + tenant_id: ZTenantId, + ) -> anyhow::Result { + let metadata_path = metadata_path(conf, timeline_id, tenant_id); + let metadata_bytes = std::fs::read(&metadata_path).with_context(|| { + format!( + "Failed to read metadata bytes from path {}", + metadata_path.display() + ) + })?; + TimelineMetadata::from_bytes(&metadata_bytes).with_context(|| { + format!( + "Failed to parse metadata bytes from path {}", + metadata_path.display() + ) + }) + } + // Mock WAL redo manager that doesn't do much pub struct TestRedoManager; diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index ce5cb57745..af02f84bc0 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -34,7 +34,7 @@ use crate::layered_repository::storage_layer::{ use crate::page_cache::{PageReadGuard, PAGE_SZ}; use crate::repository::{Key, Value, KEY_SIZE}; use crate::virtual_file::VirtualFile; -use crate::walrecord; +use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; use rand::{distributions::Alphanumeric, Rng}; @@ -447,11 +447,12 @@ impl DeltaLayer { .collect(); conf.timeline_path(&timelineid, &tenantid).join(format!( - "{}-XXX__{:016X}-{:016X}.{}.temp", + "{}-XXX__{:016X}-{:016X}.{}.{}", key_start, u64::from(lsn_range.start), u64::from(lsn_range.end), - rand_string + rand_string, + TEMP_FILE_SUFFIX, )) } diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index bb24553afd..4fe771bb3f 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -30,7 +30,7 @@ use crate::layered_repository::storage_layer::{ use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value, KEY_SIZE}; use crate::virtual_file::VirtualFile; -use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION}; +use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; use hex; @@ -255,7 +255,7 @@ impl ImageLayer { .collect(); conf.timeline_path(&timelineid, &tenantid) - .join(format!("{}.{}.temp", fname, rand_string)) + .join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}")) } /// diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 
4731179e22..86bbf25b67 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -23,7 +23,10 @@ pub mod walreceiver; pub mod walrecord; pub mod walredo; +use std::collections::HashMap; + use tracing::info; +use utils::zid::{ZTenantId, ZTimelineId}; use crate::thread_mgr::ThreadKind; @@ -100,6 +103,50 @@ fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds } } +/// A newtype to store arbitrary data grouped by tenant and timeline ids. +/// One could use [`utils::zid::ZTenantTimelineId`] for grouping, but that would +/// not include the cases where a certain tenant has zero timelines. +/// This is sometimes important: a tenant could be registered during initial load from FS, +/// even if it has no timelines on disk. +#[derive(Debug)] +pub struct TenantTimelineValues(HashMap>); + +impl TenantTimelineValues { + fn new() -> Self { + Self(HashMap::new()) + } + + fn with_capacity(capacity: usize) -> Self { + Self(HashMap::with_capacity(capacity)) + } + + /// A convenience method to map certain values and omit some of them, if needed. + /// Tenants that won't have any timeline entries due to the filtering will still be preserved + /// in the structure. + fn filter_map(self, map: F) -> TenantTimelineValues + where + F: Fn(T) -> Option, + { + let capacity = self.0.len(); + self.0.into_iter().fold( + TenantTimelineValues::::with_capacity(capacity), + |mut new_values, (tenant_id, old_values)| { + let new_timeline_values = new_values.0.entry(tenant_id).or_default(); + for (timeline_id, old_value) in old_values { + if let Some(new_value) = map(old_value) { + new_timeline_values.insert(timeline_id, new_value); + } + } + new_values + }, + ) + } +} + +/// A suffix to be used during file sync from the remote storage, +/// to ensure that we do not leave corrupted files that pretend to be layers.
+const TEMP_FILE_SUFFIX: &str = "___temp"; + #[cfg(test)] mod backoff_defaults_tests { use super::*; @@ -130,3 +177,35 @@ mod backoff_defaults_tests { ); } } + +#[cfg(test)] +mod tests { + use crate::layered_repository::repo_harness::TIMELINE_ID; + + use super::*; + + #[test] + fn tenant_timeline_value_mapping() { + let first_tenant = ZTenantId::generate(); + let second_tenant = ZTenantId::generate(); + assert_ne!(first_tenant, second_tenant); + + let mut initial = TenantTimelineValues::new(); + initial + .0 + .entry(first_tenant) + .or_default() + .insert(TIMELINE_ID, "test_value"); + let _ = initial.0.entry(second_tenant).or_default(); + assert_eq!(initial.0.len(), 2, "Should have entries for both tenants"); + + let filtered = initial.filter_map(|_| None::<&str>).0; + assert_eq!( + filtered.len(), + 2, + "Should have entries for both tenants even after filtering away all entries" + ); + assert!(filtered.contains_key(&first_tenant)); + assert!(filtered.contains_key(&second_tenant)); + } +} diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 42fd6b8ea8..57a964cb67 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -145,7 +145,6 @@ mod upload; use std::{ collections::{hash_map, HashMap, HashSet, VecDeque}, - ffi::OsStr, fmt::Debug, num::{NonZeroU32, NonZeroUsize}, ops::ControlFlow, @@ -170,244 +169,56 @@ use self::{ index::{IndexPart, RemoteTimeline, RemoteTimelineIndex}, upload::{upload_index_part, upload_timeline_layers, UploadedTimeline}, }; -use crate::metrics::{IMAGE_SYNC_TIME, REMAINING_SYNC_ITEMS, REMOTE_INDEX_UPLOAD}; use crate::{ config::PageServerConf, exponential_backoff, - layered_repository::{ - ephemeral_file::is_ephemeral_file, - metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}, - }, - storage_sync::{self, index::RemoteIndex}, - tenant_mgr::attach_downloaded_tenants, + layered_repository::metadata::{metadata_path, TimelineMetadata}, + storage_sync::index::RemoteIndex, + tenant_mgr::attach_local_tenants, thread_mgr, thread_mgr::ThreadKind, }; +use crate::{ + metrics::{IMAGE_SYNC_TIME, REMAINING_SYNC_ITEMS, REMOTE_INDEX_UPLOAD}, + TenantTimelineValues, +}; use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; use self::download::download_index_parts; pub use self::download::gather_tenant_timelines_index_parts; -pub use self::download::TEMP_DOWNLOAD_EXTENSION; static SYNC_QUEUE: OnceCell = OnceCell::new(); /// A timeline status to share with pageserver's sync counterpart, /// after comparing local and remote timeline state. -#[derive(Clone, Copy, Debug)] +#[derive(Clone)] pub enum LocalTimelineInitStatus { /// The timeline has every remote layer present locally. /// There could be some layers requiring uploading, /// but this does not block the timeline from any user interaction. - LocallyComplete, + LocallyComplete(TimelineMetadata), /// A timeline has some files remotely, that are not present locally and need downloading. /// Downloading might update timeline's metadata locally and current pageserver logic deals with local layers only, /// so the data needs to be downloaded first before the timeline can be used. 
NeedsSync, } -type LocalTimelineInitStatuses = HashMap>; +impl std::fmt::Debug for LocalTimelineInitStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::LocallyComplete(_) => write!(f, "LocallyComplete"), + Self::NeedsSync => write!(f, "NeedsSync"), + } + } +} /// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization. /// Successful initialization includes a case when sync loop is not started, in which case the startup data is returned still, /// to simplify the received code. pub struct SyncStartupData { pub remote_index: RemoteIndex, - pub local_timeline_init_statuses: LocalTimelineInitStatuses, -} - -/// Based on the config, initiates the remote storage connection and starts a separate thread -/// that ensures that pageserver and the remote storage are in sync with each other. -/// If no external configuration connection given, no thread or storage initialization is done. -/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states. -pub fn start_local_timeline_sync( - config: &'static PageServerConf, - storage: Option, -) -> anyhow::Result { - let local_timeline_files = local_tenant_timeline_files(config) - .context("Failed to collect local tenant timeline files")?; - - match storage.zip(config.remote_storage_config.as_ref()) { - Some((storage, storage_config)) => storage_sync::spawn_storage_sync_thread( - config, - local_timeline_files, - storage, - storage_config.max_concurrent_syncs, - storage_config.max_sync_errors, - ) - .context("Failed to spawn the storage sync thread"), - None => { - info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled"); - let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new(); - for ( - ZTenantTimelineId { - tenant_id, - timeline_id, - }, - _, - ) in local_timeline_files - { - local_timeline_init_statuses - .entry(tenant_id) - .or_default() - .insert(timeline_id, LocalTimelineInitStatus::LocallyComplete); - } - Ok(SyncStartupData { - local_timeline_init_statuses, - remote_index: RemoteIndex::default(), - }) - } - } -} - -fn local_tenant_timeline_files( - config: &'static PageServerConf, -) -> anyhow::Result)>> { - let mut local_tenant_timeline_files = HashMap::new(); - let tenants_dir = config.tenants_path(); - for tenants_dir_entry in std::fs::read_dir(&tenants_dir) - .with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))? 
- { - match &tenants_dir_entry { - Ok(tenants_dir_entry) => { - match collect_timelines_for_tenant(config, &tenants_dir_entry.path()) { - Ok(collected_files) => { - local_tenant_timeline_files.extend(collected_files.into_iter()) - } - Err(e) => error!( - "Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}", - tenants_dir.display(), - tenants_dir_entry, - e - ), - } - } - Err(e) => error!( - "Failed to list tenants dir entry {:?} in directory {}, reason: {:?}", - tenants_dir_entry, - tenants_dir.display(), - e - ), - } - } - - Ok(local_tenant_timeline_files) -} - -fn collect_timelines_for_tenant( - config: &'static PageServerConf, - tenant_path: &Path, -) -> anyhow::Result)>> { - let mut timelines = HashMap::new(); - let tenant_id = tenant_path - .file_name() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::() - .context("Could not parse tenant id out of the tenant dir name")?; - let timelines_dir = config.timelines_path(&tenant_id); - - for timelines_dir_entry in std::fs::read_dir(&timelines_dir).with_context(|| { - format!( - "Failed to list timelines dir entry for tenant {}", - tenant_id - ) - })? { - match timelines_dir_entry { - Ok(timelines_dir_entry) => { - let timeline_path = timelines_dir_entry.path(); - match collect_timeline_files(&timeline_path) { - Ok((timeline_id, metadata, timeline_files)) => { - timelines.insert( - ZTenantTimelineId { - tenant_id, - timeline_id, - }, - (metadata, timeline_files), - ); - } - Err(e) => error!( - "Failed to process timeline dir contents at '{}', reason: {:?}", - timeline_path.display(), - e - ), - } - } - Err(e) => error!( - "Failed to list timelines for entry tenant {}, reason: {:?}", - tenant_id, e - ), - } - } - - Ok(timelines) -} - -// discover timeline files and extract timeline metadata -// NOTE: ephemeral files are excluded from the list -fn collect_timeline_files( - timeline_dir: &Path, -) -> anyhow::Result<(ZTimelineId, TimelineMetadata, HashSet)> { - let mut timeline_files = HashSet::new(); - let mut timeline_metadata_path = None; - - let timeline_id = timeline_dir - .file_name() - .and_then(OsStr::to_str) - .unwrap_or_default() - .parse::() - .context("Could not parse timeline id out of the timeline dir name")?; - let timeline_dir_entries = - std::fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?; - for entry in timeline_dir_entries { - let entry_path = entry.context("Failed to list timeline dir entry")?.path(); - if entry_path.is_file() { - if entry_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME) { - timeline_metadata_path = Some(entry_path); - } else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) { - debug!("skipping ephemeral file {}", entry_path.display()); - continue; - } else if entry_path.extension().and_then(OsStr::to_str) - == Some(TEMP_DOWNLOAD_EXTENSION) - { - info!("removing temp download file at {}", entry_path.display()); - std::fs::remove_file(&entry_path).with_context(|| { - format!( - "failed to remove temp download file at {}", - entry_path.display() - ) - })?; - } else if entry_path.extension().and_then(OsStr::to_str) == Some("temp") { - info!("removing temp layer file at {}", entry_path.display()); - std::fs::remove_file(&entry_path).with_context(|| { - format!( - "failed to remove temp layer file at {}", - entry_path.display() - ) - })?; - } else { - timeline_files.insert(entry_path); - } - } - } - - // FIXME (rodionov) if attach call succeeded, and then pageserver is restarted before download is completed 
- // then attach is lost. There would be no retries for that, - // initial collect will fail because there is no metadata. - // We either need to start download if we see empty dir after restart or attach caller should - // be aware of that and retry attach if awaits_download for timeline switched from true to false - // but timelinne didn't appear locally. - // Check what happens with remote index in that case. - let timeline_metadata_path = match timeline_metadata_path { - Some(path) => path, - None => bail!("No metadata file found in the timeline directory"), - }; - let metadata = TimelineMetadata::from_bytes( - &std::fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?, - ) - .context("Failed to parse timeline metadata file bytes")?; - - Ok((timeline_id, metadata, timeline_files)) + pub local_timeline_init_statuses: TenantTimelineValues, } /// Global queue of sync tasks. @@ -763,9 +574,9 @@ pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) { /// Launch a thread to perform remote storage sync tasks. /// See module docs for loop step description. -pub(super) fn spawn_storage_sync_thread( +pub fn spawn_storage_sync_thread( conf: &'static PageServerConf, - local_timeline_files: HashMap)>, + local_timeline_files: TenantTimelineValues<(TimelineMetadata, HashSet)>, storage: GenericRemoteStorage, max_concurrent_timelines_sync: NonZeroUsize, max_sync_errors: NonZeroU32, @@ -784,19 +595,43 @@ pub(super) fn spawn_storage_sync_thread( .build() .context("Failed to create storage sync runtime")?; + // TODO we are able to "attach" empty tenants, but not doing it now since it might require big wait time: + // * we need to list every timeline for tenant on S3, that might be a costly operation + // * we need to download every timeline for the tenant, to activate it in memory + // + // When on-demand download gets merged, we're able to do this fast by storing timeline metadata only. 
+ let mut empty_tenants = TenantTimelineValues::::new(); + let mut keys_for_index_part_downloads = HashSet::new(); + let mut timelines_to_sync = HashMap::new(); + + for (tenant_id, timeline_data) in local_timeline_files.0 { + if timeline_data.is_empty() { + let _ = empty_tenants.0.entry(tenant_id).or_default(); + } else { + for (timeline_id, timeline_data) in timeline_data { + let id = ZTenantTimelineId::new(tenant_id, timeline_id); + keys_for_index_part_downloads.insert(id); + timelines_to_sync.insert(id, timeline_data); + } + } + } + let applicable_index_parts = runtime.block_on(download_index_parts( conf, &storage, - local_timeline_files.keys().copied().collect(), + keys_for_index_part_downloads, )); let remote_index = RemoteIndex::from_parts(conf, applicable_index_parts)?; - let local_timeline_init_statuses = schedule_first_sync_tasks( + let mut local_timeline_init_statuses = schedule_first_sync_tasks( &mut runtime.block_on(remote_index.write()), sync_queue, - local_timeline_files, + timelines_to_sync, ); + local_timeline_init_statuses + .0 + .extend(empty_tenants.0.into_iter()); let remote_index_clone = remote_index.clone(); thread_mgr::spawn( @@ -872,10 +707,7 @@ fn storage_sync_loop( "Sync loop step completed, {} new tenant state update(s)", updated_tenants.len() ); - let mut timelines_to_attach: HashMap< - ZTenantId, - Vec<(ZTimelineId, TimelineMetadata)>, - > = HashMap::new(); + let mut timelines_to_attach = TenantTimelineValues::new(); let index_accessor = runtime.block_on(index.read()); for tenant_id in updated_tenants { let tenant_entry = match index_accessor.tenant_entry(&tenant_id) { @@ -901,7 +733,7 @@ fn storage_sync_loop( // and register them all at once in a repository for download // to be submitted in a single operation to repository // so it can apply them at once to internal timeline map. - timelines_to_attach.insert( + timelines_to_attach.0.insert( tenant_id, tenant_entry .iter() @@ -912,7 +744,9 @@ fn storage_sync_loop( } drop(index_accessor); // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. - attach_downloaded_tenants(conf, &index, timelines_to_attach); + if let Err(e) = attach_local_tenants(conf, &index, timelines_to_attach) { + error!("Failed to attach new timelines: {e:?}"); + }; } } ControlFlow::Break(()) => { @@ -1443,11 +1277,10 @@ fn schedule_first_sync_tasks( index: &mut RemoteTimelineIndex, sync_queue: &SyncQueue, local_timeline_files: HashMap)>, -) -> LocalTimelineInitStatuses { - let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new(); +) -> TenantTimelineValues { + let mut local_timeline_init_statuses = TenantTimelineValues::new(); - let mut new_sync_tasks = - VecDeque::with_capacity(local_timeline_files.len().max(local_timeline_files.len())); + let mut new_sync_tasks = VecDeque::with_capacity(local_timeline_files.len()); for (sync_id, (local_metadata, local_files)) in local_timeline_files { match index.timeline_entry_mut(&sync_id) { @@ -1459,18 +1292,27 @@ fn schedule_first_sync_tasks( local_files, remote_timeline, ); - let was_there = local_timeline_init_statuses + match local_timeline_init_statuses + .0 .entry(sync_id.tenant_id) .or_default() - .insert(sync_id.timeline_id, timeline_status); - - if was_there.is_some() { - // defensive check - warn!( - "Overwriting timeline init sync status. 
Status {timeline_status:?}, timeline {}", - sync_id.timeline_id - ); + .entry(sync_id.timeline_id) + { + hash_map::Entry::Occupied(mut o) => { + { + // defensive check + warn!( + "Overwriting timeline init sync status. Status {timeline_status:?}, timeline {}", + sync_id.timeline_id + ); + } + o.insert(timeline_status); + } + hash_map::Entry::Vacant(v) => { + v.insert(timeline_status); + } } + remote_timeline.awaits_download = awaits_download; } None => { @@ -1481,15 +1323,16 @@ fn schedule_first_sync_tasks( SyncTask::upload(LayersUpload { layers_to_upload: local_files, uploaded_layers: HashSet::new(), - metadata: Some(local_metadata), + metadata: Some(local_metadata.clone()), }), )); local_timeline_init_statuses + .0 .entry(sync_id.tenant_id) .or_default() .insert( sync_id.timeline_id, - LocalTimelineInitStatus::LocallyComplete, + LocalTimelineInitStatus::LocallyComplete(local_metadata), ); } } @@ -1523,7 +1366,10 @@ fn compare_local_and_remote_timeline( // we do not need to manipulate with remote consistent lsn here // because it will be updated when sync will be completed } else { - (LocalTimelineInitStatus::LocallyComplete, false) + ( + LocalTimelineInitStatus::LocallyComplete(local_metadata.clone()), + false, + ) }; let layers_to_upload = local_files diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs index b0beb4219a..91ee557b79 100644 --- a/pageserver/src/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -18,6 +18,7 @@ use tracing::{debug, error, info, warn}; use crate::{ config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask, + TEMP_FILE_SUFFIX, }; use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; @@ -26,8 +27,6 @@ use super::{ LayersDownload, SyncData, SyncQueue, }; -pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; - // We collect timelines remotely available for each tenant // in case we failed to gather all index parts (due to an error) // Poisoned variant is returned. @@ -251,7 +250,7 @@ pub(super) async fn download_timeline_layers<'a>( // https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com // If pageserver crashes the temp file will be deleted on startup and re-downloaded. 
let temp_file_path = - path_with_suffix_extension(&layer_destination_path, TEMP_DOWNLOAD_EXTENSION); + path_with_suffix_extension(&layer_destination_path, TEMP_FILE_SUFFIX); let mut destination_file = fs::File::create(&temp_file_path).await.with_context(|| { diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 041bd50737..baa58f5eb5 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -3,24 +3,26 @@ use crate::config::PageServerConf; use crate::http::models::TenantInfo; -use crate::layered_repository::metadata::TimelineMetadata; -use crate::layered_repository::{load_metadata, Repository, Timeline}; +use crate::layered_repository::ephemeral_file::is_ephemeral_file; +use crate::layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME}; +use crate::layered_repository::{Repository, Timeline}; use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex}; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::tenant_config::TenantConfOpt; use crate::thread_mgr::ThreadKind; use crate::walredo::PostgresRedoManager; -use crate::{thread_mgr, timelines, walreceiver}; +use crate::{thread_mgr, timelines, walreceiver, TenantTimelineValues, TEMP_FILE_SUFFIX}; use anyhow::Context; use remote_storage::GenericRemoteStorage; use serde::{Deserialize, Serialize}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::hash_map::{self, Entry}; +use std::collections::{HashMap, HashSet}; +use std::ffi::OsStr; use std::fmt; +use std::path::{Path, PathBuf}; use std::sync::Arc; use tokio::sync::mpsc; use tracing::*; -use utils::lsn::Lsn; pub use tenants_state::try_send_timeline_update; use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; @@ -136,34 +138,49 @@ pub fn init_tenant_mgr( conf: &'static PageServerConf, remote_storage: Option, ) -> anyhow::Result { + let _entered = info_span!("init_tenant_mgr").entered(); let (timeline_updates_sender, timeline_updates_receiver) = mpsc::unbounded_channel::(); tenants_state::set_timeline_update_sender(timeline_updates_sender)?; walreceiver::init_wal_receiver_main_thread(conf, timeline_updates_receiver)?; - let SyncStartupData { - remote_index, - local_timeline_init_statuses, - } = storage_sync::start_local_timeline_sync(conf, remote_storage) - .context("Failed to set up local files sync with external storage")?; + let local_tenant_files = local_tenant_timeline_files(conf) + .context("Failed to collect local tenant timeline files")?; - for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses { - if let Err(err) = - init_local_repository(conf, tenant_id, local_timeline_init_statuses, &remote_index) - { - // Report the error, but continue with the startup for other tenants. An error - // loading a tenant is serious, but it's better to complete the startup and - // serve other tenants, than fail completely. 
- error!("Failed to initialize local tenant {tenant_id}: {:?}", err); + let (remote_index, tenants_to_attach) = if let Some(storage) = remote_storage { + let storage_config = conf + .remote_storage_config + .as_ref() + .expect("remote storage without config"); - if let Err(err) = set_tenant_state(tenant_id, TenantState::Broken) { - error!( - "Failed to set tenant state to broken {tenant_id}: {:?}", - err - ); - } - } - } + let SyncStartupData { + remote_index, + local_timeline_init_statuses, + } = storage_sync::spawn_storage_sync_thread( + conf, + local_tenant_files, + storage, + storage_config.max_concurrent_syncs, + storage_config.max_sync_errors, + ) + .context("Failed to spawn the storage sync thread")?; + + ( + remote_index, + local_timeline_init_statuses.filter_map(|init_status| match init_status { + LocalTimelineInitStatus::LocallyComplete(metadata) => Some(metadata), + LocalTimelineInitStatus::NeedsSync => None, + }), + ) + } else { + info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled"); + ( + RemoteIndex::default(), + local_tenant_files.filter_map(|(metadata, _)| Some(metadata)), + ) + }; + + attach_local_tenants(conf, &remote_index, tenants_to_attach)?; Ok(remote_index) } @@ -189,35 +206,69 @@ impl std::fmt::Debug for LocalTimelineUpdate { } } -/// Updates tenants' repositories, changing their timelines state in memory. -pub fn attach_downloaded_tenants( +/// Reads local files to load tenants and their timelines given into pageserver's memory. +/// Ignores other timelines that might be present for tenant, but were not passed as a parameter. +/// Attempts to load as many entites as possible: if a certain timeline fails during the load, the tenant is marked as "Broken", +/// and the load continues. 
+pub fn attach_local_tenants( conf: &'static PageServerConf, remote_index: &RemoteIndex, - sync_status_updates: HashMap>, -) { - if sync_status_updates.is_empty() { - debug!("No sync status updates to apply"); - return; - } - for (tenant_id, downloaded_timelines) in sync_status_updates { - info!( - "Registering downlloaded timelines for {tenant_id} {} timelines", - downloaded_timelines.len() - ); - debug!("Downloaded timelines: {downloaded_timelines:?}"); + tenants_to_attach: TenantTimelineValues, +) -> anyhow::Result<()> { + let _entered = info_span!("attach_local_tenants").entered(); + let number_of_tenants = tenants_to_attach.0.len(); - let repo = match load_local_repo(conf, tenant_id, remote_index) { - Ok(repo) => repo, - Err(e) => { - error!("Failed to load repo for tenant {tenant_id} Error: {e:?}"); - continue; + for (tenant_id, local_timelines) in tenants_to_attach.0 { + info!( + "Attaching {} timelines for {tenant_id}", + local_timelines.len() + ); + debug!("Timelines to attach: {local_timelines:?}"); + + let repository = load_local_repo(conf, tenant_id, remote_index) + .context("Failed to load repository for tenant")?; + + let repo = Arc::clone(&repository); + { + match tenants_state::write_tenants().entry(tenant_id) { + hash_map::Entry::Occupied(_) => { + anyhow::bail!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state"); + } + hash_map::Entry::Vacant(v) => { + v.insert(Tenant { + state: TenantState::Idle, + repo, + }); + } } - }; - match repo.init_attach_timelines(downloaded_timelines) { - Ok(()) => info!("successfully loaded local timelines for tenant {tenant_id}"), - Err(e) => error!("Failed to load local timelines for tenant {tenant_id}: {e:?}"), } + // XXX: current timeline init enables walreceiver that looks for tenant in the state, so insert the tenant entry before + repository + .init_attach_timelines(local_timelines) + .context("Failed to attach timelines for tenant")?; } + + info!("Processed {number_of_tenants} local tenants during attach"); + Ok(()) +} + +fn load_local_repo( + conf: &'static PageServerConf, + tenant_id: ZTenantId, + remote_index: &RemoteIndex, +) -> anyhow::Result> { + let repository = Repository::new( + conf, + TenantConfOpt::default(), + Arc::new(PostgresRedoManager::new(conf, tenant_id)), + tenant_id, + remote_index.clone(), + conf.remote_storage_config.is_some(), + ); + let tenant_conf = Repository::load_tenant_config(conf, tenant_id)?; + repository.update_tenant_config(tenant_conf); + + Ok(Arc::new(repository)) } /// @@ -293,13 +344,14 @@ pub fn create_tenant_repository( } pub fn update_tenant_config( + conf: &'static PageServerConf, tenant_conf: TenantConfOpt, tenant_id: ZTenantId, ) -> anyhow::Result<()> { info!("configuring tenant {tenant_id}"); - let repo = get_repository_for_tenant(tenant_id)?; + get_repository_for_tenant(tenant_id)?.update_tenant_config(tenant_conf); - repo.update_tenant_config(tenant_conf)?; + Repository::persist_tenant_config(conf, tenant_id, tenant_conf)?; Ok(()) } @@ -392,7 +444,7 @@ pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow debug!("waiting for threads to shutdown"); thread_mgr::shutdown_threads(None, None, Some(timeline_id)); debug!("thread shutdown completed"); - match tenants_state::write_tenants().get_mut(&tenant_id) { + match tenants_state::read_tenants().get(&tenant_id) { Some(tenant) => tenant.repo.delete_timeline(timeline_id)?, None => anyhow::bail!("Tenant {tenant_id} not found in local tenant state"), } @@ -428,12 +480,10 @@ pub fn 
detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any // need to use crossbeam-channel for (timeline_id, join_handle) in walreceiver_join_handles { info!("waiting for wal receiver to shutdown timeline_id {timeline_id}"); - join_handle.recv().context("failed to join walreceiver")?; + join_handle.recv().ok(); info!("wal receiver shutdown confirmed timeline_id {timeline_id}"); } - tenants_state::write_tenants().remove(&tenant_id); - // If removal fails there will be no way to successfully retry detach, // because the tenant no longer exists in the in-memory map. And it needs to be removed from it // before we remove files, because it contains references to repository @@ -443,7 +493,7 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any let local_tenant_directory = conf.tenant_path(&tenant_id); std::fs::remove_dir_all(&local_tenant_directory).with_context(|| { format!( - "Failed to remove local timeline directory '{}'", + "Failed to remove local tenant directory '{}'", local_tenant_directory.display() ) })?; @@ -454,7 +504,7 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any /// /// Get list of tenants, for the mgmt API /// -pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec { +pub fn list_tenant_info(remote_index: &RemoteTimelineIndex) -> Vec { tenants_state::read_tenants() .iter() .map(|(id, tenant)| { @@ -478,98 +528,248 @@ pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec { .collect() } -/// Check if a given timeline is "broken" \[1\]. -/// The function returns an error if the timeline is "broken". -/// -/// \[1\]: it's not clear now how should we classify a timeline as broken. -/// A timeline is categorized as broken when any of following conditions is true: -/// - failed to load the timeline's metadata -/// - the timeline's disk consistent LSN is zero -fn check_broken_timeline( - conf: &'static PageServerConf, - tenant_id: ZTenantId, - timeline_id: ZTimelineId, -) -> anyhow::Result { - let metadata = - load_metadata(conf, timeline_id, tenant_id).context("failed to load metadata")?; +/// Attempts to collect information about all tenant and timelines, existing on the local FS. +/// If finds any, deletes all temporary files and directories, created before. Also removes empty directories, +/// that may appear due to such removals. +/// Does not fail on particular timeline or tenant collection errors, rather logging them and ignoring the entities. +fn local_tenant_timeline_files( + config: &'static PageServerConf, +) -> anyhow::Result)>> { + let _entered = info_span!("local_tenant_timeline_files").entered(); - // A timeline with zero disk consistent LSN can happen when the page server - // failed to checkpoint the timeline import data when creating that timeline. - if metadata.disk_consistent_lsn() == Lsn::INVALID { - anyhow::bail!("Timeline {timeline_id} has a zero disk consistent LSN."); + let mut local_tenant_timeline_files = TenantTimelineValues::new(); + let tenants_dir = config.tenants_path(); + for tenants_dir_entry in std::fs::read_dir(&tenants_dir) + .with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))? 
+ { + match &tenants_dir_entry { + Ok(tenants_dir_entry) => { + let tenant_dir_path = tenants_dir_entry.path(); + if is_temporary(&tenant_dir_path) { + info!( + "Found temporary tenant directory, removing: {}", + tenant_dir_path.display() + ); + if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) { + error!( + "Failed to remove temporary directory '{}': {:?}", + tenant_dir_path.display(), + e + ); + } + } else { + match collect_timelines_for_tenant(config, &tenant_dir_path) { + Ok((tenant_id, collected_files)) => { + if collected_files.is_empty() { + match remove_if_empty(&tenant_dir_path) { + Ok(true) => info!("Removed empty tenant directory {}", tenant_dir_path.display()), + Ok(false) => { + // insert empty timeline entry: it has some non-temporary files inside that we cannot remove + // so make obvious for HTTP API callers, that something exists there and try to load the tenant + let _ = local_tenant_timeline_files.0.entry(tenant_id).or_default(); + }, + Err(e) => error!("Failed to remove empty tenant directory: {e:?}"), + } + } else { + local_tenant_timeline_files.0.entry(tenant_id).or_default().extend(collected_files.into_iter()) + } + }, + Err(e) => error!( + "Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}", + tenants_dir.display(), + tenants_dir_entry, + e + ), + } + } + } + Err(e) => error!( + "Failed to list tenants dir entry {:?} in directory {}, reason: {:?}", + tenants_dir_entry, + tenants_dir.display(), + e + ), + } } - Ok(metadata) + info!( + "Collected files for {} tenants", + local_tenant_timeline_files.0.len() + ); + Ok(local_tenant_timeline_files) } -/// Note: all timelines are attached at once if and only if all of them are locally complete -fn init_local_repository( - conf: &'static PageServerConf, - tenant_id: ZTenantId, - local_timeline_init_statuses: HashMap, - remote_index: &RemoteIndex, -) -> anyhow::Result<(), anyhow::Error> { - let mut timelines_to_attach = Vec::new(); - for (timeline_id, init_status) in local_timeline_init_statuses { - match init_status { - LocalTimelineInitStatus::LocallyComplete => { - debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository"); - let metadata = check_broken_timeline(conf, tenant_id, timeline_id) - .context("found broken timeline")?; - timelines_to_attach.push((timeline_id, metadata)); +fn remove_if_empty(tenant_dir_path: &Path) -> anyhow::Result { + let directory_is_empty = tenant_dir_path + .read_dir() + .with_context(|| { + format!( + "Failed to read directory '{}' contents", + tenant_dir_path.display() + ) + })? 
+ .next() + .is_none(); + + if directory_is_empty { + std::fs::remove_dir_all(&tenant_dir_path).with_context(|| { + format!( + "Failed to remove empty directory '{}'", + tenant_dir_path.display(), + ) + })?; + + Ok(true) + } else { + Ok(false) + } +} + +fn is_temporary(path: &Path) -> bool { + match path.file_name() { + Some(name) => name.to_string_lossy().ends_with(TEMP_FILE_SUFFIX), + None => false, + } +} + +#[allow(clippy::type_complexity)] +fn collect_timelines_for_tenant( + config: &'static PageServerConf, + tenant_path: &Path, +) -> anyhow::Result<( + ZTenantId, + HashMap)>, +)> { + let tenant_id = tenant_path + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .context("Could not parse tenant id out of the tenant dir name")?; + let timelines_dir = config.timelines_path(&tenant_id); + + let mut tenant_timelines = HashMap::new(); + for timelines_dir_entry in std::fs::read_dir(&timelines_dir) + .with_context(|| format!("Failed to list timelines dir entry for tenant {tenant_id}"))? + { + match timelines_dir_entry { + Ok(timelines_dir_entry) => { + let timeline_dir = timelines_dir_entry.path(); + if is_temporary(&timeline_dir) { + info!( + "Found temporary timeline directory, removing: {}", + timeline_dir.display() + ); + if let Err(e) = std::fs::remove_dir_all(&timeline_dir) { + error!( + "Failed to remove temporary directory '{}': {:?}", + timeline_dir.display(), + e + ); + } + } else { + match collect_timeline_files(&timeline_dir) { + Ok((timeline_id, metadata, timeline_files)) => { + tenant_timelines.insert(timeline_id, (metadata, timeline_files)); + } + Err(e) => { + error!( + "Failed to process timeline dir contents at '{}', reason: {:?}", + timeline_dir.display(), + e + ); + match remove_if_empty(&timeline_dir) { + Ok(true) => info!( + "Removed empty timeline directory {}", + timeline_dir.display() + ), + Ok(false) => (), + Err(e) => { + error!("Failed to remove empty timeline directory: {e:?}") + } + } + } + } + } } - LocalTimelineInitStatus::NeedsSync => { - debug!( - "timeline {tenant_id} for tenant {timeline_id} needs sync, \ - so skipped for adding into repository until sync is finished" - ); - return Ok(()); + Err(e) => { + error!("Failed to list timelines for entry tenant {tenant_id}, reason: {e:?}") } } } - // initialize local tenant - let repo = load_local_repo(conf, tenant_id, remote_index) - .with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?; - - // Lets fail here loudly to be on the safe side. - // XXX: It may be a better api to actually distinguish between repository startup - // and processing of newly downloaded timelines. - repo.init_attach_timelines(timelines_to_attach) - .with_context(|| format!("Failed to init local timelines for tenant {tenant_id}"))?; - Ok(()) -} - -// Sets up wal redo manager and repository for tenant. Reduces code duplication. -// Used during pageserver startup, or when new tenant is attached to pageserver. -fn load_local_repo( - conf: &'static PageServerConf, - tenant_id: ZTenantId, - remote_index: &RemoteIndex, -) -> anyhow::Result> { - let mut m = tenants_state::write_tenants(); - let tenant = m.entry(tenant_id).or_insert_with(|| { - // Set up a WAL redo manager, for applying WAL records. - let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); - - // Set up an object repository, for actual data storage. 
- let repo: Arc = Arc::new(Repository::new( - conf, - TenantConfOpt::default(), - Arc::new(walredo_mgr), - tenant_id, - remote_index.clone(), - conf.remote_storage_config.is_some(), - )); - Tenant { - state: TenantState::Idle, - repo, + if tenant_timelines.is_empty() { + match remove_if_empty(&timelines_dir) { + Ok(true) => info!( + "Removed empty tenant timelines directory {}", + timelines_dir.display() + ), + Ok(false) => (), + Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"), } - }); + } - // Restore tenant config - let tenant_conf = Repository::load_tenant_config(conf, tenant_id)?; - tenant.repo.update_tenant_config(tenant_conf)?; - - Ok(Arc::clone(&tenant.repo)) + Ok((tenant_id, tenant_timelines)) +} + +// discover timeline files and extract timeline metadata +// NOTE: ephemeral files are excluded from the list +fn collect_timeline_files( + timeline_dir: &Path, +) -> anyhow::Result<(ZTimelineId, TimelineMetadata, HashSet)> { + let mut timeline_files = HashSet::new(); + let mut timeline_metadata_path = None; + + let timeline_id = timeline_dir + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .parse::() + .context("Could not parse timeline id out of the timeline dir name")?; + let timeline_dir_entries = + std::fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?; + for entry in timeline_dir_entries { + let entry_path = entry.context("Failed to list timeline dir entry")?.path(); + if entry_path.is_file() { + if entry_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME) { + timeline_metadata_path = Some(entry_path); + } else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) { + debug!("skipping ephemeral file {}", entry_path.display()); + continue; + } else if is_temporary(&entry_path) { + info!("removing temp timeline file at {}", entry_path.display()); + std::fs::remove_file(&entry_path).with_context(|| { + format!( + "failed to remove temp download file at {}", + entry_path.display() + ) + })?; + } else { + timeline_files.insert(entry_path); + } + } + } + + // FIXME (rodionov) if attach call succeeded, and then pageserver is restarted before download is completed + // then attach is lost. There would be no retries for that, + // initial collect will fail because there is no metadata. + // We either need to start download if we see empty dir after restart or attach caller should + // be aware of that and retry attach if awaits_download for timeline switched from true to false + // but timelinne didn't appear locally. + // Check what happens with remote index in that case. 
+ let timeline_metadata_path = match timeline_metadata_path { + Some(path) => path, + None => anyhow::bail!("No metadata file found in the timeline directory"), + }; + let metadata = TimelineMetadata::from_bytes( + &std::fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?, + ) + .context("Failed to parse timeline metadata file bytes")?; + + anyhow::ensure!( + metadata.ancestor_timeline().is_some() || !timeline_files.is_empty(), + "Timeline has no ancestor and no layer files" + ); + + Ok((timeline_id, metadata, timeline_files)) } diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index 11be13b80c..4e9a5fc6ec 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -34,11 +34,6 @@ async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) { // Break if we're not allowed to write to disk let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - // TODO do this inside repo.compaction_iteration instead. - let _guard = match repo.file_lock.try_read() { - Ok(g) => g, - Err(_) => return Ok(ControlFlow::Break(())), - }; // Run compaction let compaction_period = repo.get_compaction_period(); @@ -233,11 +228,6 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) { // Break if we're not allowed to write to disk let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - // TODO do this inside repo.gc_iteration instead. - let _guard = match repo.file_lock.try_read() { - Ok(g) => g, - Err(_) => return Ok(ControlFlow::Break(())), - }; // Run gc let gc_period = repo.get_gc_period(); diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 936699c2ec..9356893908 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -3,6 +3,7 @@ // use anyhow::{bail, ensure, Context, Result}; +use remote_storage::path_with_suffix_extension; use std::{ fs, @@ -18,12 +19,12 @@ use utils::{ zid::{ZTenantId, ZTimelineId}, }; -use crate::import_datadir; use crate::tenant_mgr; use crate::CheckpointConfig; use crate::{ config::PageServerConf, storage_sync::index::RemoteIndex, tenant_config::TenantConfOpt, }; +use crate::{import_datadir, TEMP_FILE_SUFFIX}; use crate::{ layered_repository::{Repository, Timeline}, walredo::WalRedoManager, @@ -105,13 +106,17 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> { // fn bootstrap_timeline( conf: &'static PageServerConf, - tenantid: ZTenantId, - tli: ZTimelineId, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, repo: &Repository, ) -> Result> { - let initdb_path = conf - .tenant_path(&tenantid) - .join(format!("tmp-timeline-{}", tli)); + // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` + // temporary directory for basebackup files for the given timeline. + let initdb_path = path_with_suffix_extension( + conf.timelines_path(&tenant_id) + .join(format!("basebackup-{timeline_id}")), + TEMP_FILE_SUFFIX, + ); // Init temporarily repo to get bootstrap data run_initdb(conf, &initdb_path)?; @@ -123,7 +128,7 @@ fn bootstrap_timeline( // LSN, and any WAL after that. // Initdb lsn will be equal to last_record_lsn which will be set after import. // Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline. 
- let timeline = repo.create_empty_timeline(tli, lsn)?; + let timeline = repo.create_empty_timeline(timeline_id, lsn)?; import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?; fail::fail_point!("before-checkpoint-new-timeline", |_| { @@ -134,7 +139,7 @@ fn bootstrap_timeline( info!( "created root timeline {} timeline.lsn {}", - tli, + timeline_id, timeline.get_last_record_lsn() ); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 4e49fd9373..dd946659bb 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -21,6 +21,7 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{BufMut, Bytes, BytesMut}; use nix::poll::*; +use remote_storage::path_with_suffix_extension; use serde::Serialize; use std::fs; use std::fs::OpenOptions; @@ -37,7 +38,6 @@ use std::time::Instant; use tracing::*; use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock, zid::ZTenantId}; -use crate::config::PageServerConf; use crate::metrics::{ WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME, WAL_REDO_WAIT_TIME, }; @@ -45,6 +45,7 @@ use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block}; use crate::reltag::{RelTag, SlruKind}; use crate::repository::Key; use crate::walrecord::ZenithWalRecord; +use crate::{config::PageServerConf, TEMP_FILE_SUFFIX}; use postgres_ffi::v14::nonrelfile_utils::{ mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, transaction_id_set_status, @@ -569,20 +570,24 @@ impl PostgresRedoProcess { // // Start postgres binary in special WAL redo mode. // - fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result { + fn launch(conf: &PageServerConf, tenant_id: &ZTenantId) -> Result { // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // just create one with constant name. That fails if you try to launch more than // one WAL redo manager concurrently. - let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir"); + let datadir = path_with_suffix_extension( + conf.tenant_path(tenant_id).join("wal-redo-datadir"), + TEMP_FILE_SUFFIX, + ); // Create empty data directory for wal-redo postgres, deleting old one first. 
if datadir.exists() { - info!("directory {:?} exists, removing", &datadir); - if let Err(e) = fs::remove_dir_all(&datadir) { - error!("could not remove old wal-redo-datadir: {:#}", e); - } + info!( + "old temporary datadir {} exists, removing", + datadir.display() + ); + fs::remove_dir_all(&datadir)?; } - info!("running initdb in {:?}", datadir.display()); + info!("running initdb in {}", datadir.display()); let initdb = Command::new(conf.pg_bin_dir().join("initdb")) .args(&["-D", &datadir.to_string_lossy()]) .arg("-N") @@ -591,7 +596,7 @@ impl PostgresRedoProcess { .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir()) .close_fds() .output() - .map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {}", e)))?; + .map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?; if !initdb.status.success() { return Err(Error::new( @@ -645,7 +650,7 @@ impl PostgresRedoProcess { })?; info!( - "launched WAL redo postgres process on {:?}", + "launched WAL redo postgres process on {}", datadir.display() ); diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 4aba2494e9..1d083b3ef9 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -32,33 +32,34 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # Leave the first timeline alone, but corrupt the others in different ways (tenant0, timeline0, pg0) = tenant_timelines[0] + log.info(f"Timeline {tenant0}/{timeline0} is left intact") - # Corrupt metadata file on timeline 1 (tenant1, timeline1, pg1) = tenant_timelines[1] - metadata_path = "{}/tenants/{}/timelines/{}/metadata".format(env.repo_dir, tenant1, timeline1) - print(f"overwriting metadata file at {metadata_path}") + metadata_path = f"{env.repo_dir}/tenants/{tenant1}/timelines/{timeline1}/metadata" f = open(metadata_path, "w") f.write("overwritten with garbage!") f.close() + log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled") - # Missing layer files file on timeline 2. (This would actually work - # if we had Cloud Storage enabled in this test.) (tenant2, timeline2, pg2) = tenant_timelines[2] - timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant2, timeline2) + timeline_path = f"{env.repo_dir}/tenants/{tenant2}/timelines/{timeline2}/" for filename in os.listdir(timeline_path): if filename.startswith("00000"): # Looks like a layer file. Remove it os.remove(f"{timeline_path}/{filename}") + log.info( + f"Timeline {tenant2}/{timeline2} got its layer files removed (no remote storage enabled)" + ) - # Corrupt layer files file on timeline 3 (tenant3, timeline3, pg3) = tenant_timelines[3] - timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant3, timeline3) + timeline_path = f"{env.repo_dir}/tenants/{tenant3}/timelines/{timeline3}/" for filename in os.listdir(timeline_path): if filename.startswith("00000"): # Looks like a layer file. 
Corrupt it f = open(f"{timeline_path}/{filename}", "w") f.write("overwritten with garbage!") f.close() + log.info(f"Timeline {tenant3}/{timeline3} got its layer files spoiled") env.pageserver.start() @@ -69,20 +70,28 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # But all others are broken # First timeline would not get loaded into pageserver due to corrupt metadata file - (_tenant, _timeline, pg) = tenant_timelines[1] with pytest.raises( Exception, match=f"Could not get timeline {timeline1} in tenant {tenant1}" ) as err: - pg.start() + pg1.start() + log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") + + # Second timeline has no ancestors, only the metadata file and no layer files + # We don't have the remote storage enabled, which means timeline is in an incorrect state, + # it's not loaded at all + with pytest.raises( + Exception, match=f"Could not get timeline {timeline2} in tenant {tenant2}" + ) as err: + pg2.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") # Yet other timelines will fail when their layers will be queried during basebackup: we don't check layer file contents on startup, when loading the timeline - for n in range(2, 4): - (_tenant, _timeline, pg) = tenant_timelines[n] + for n in range(3, 4): + (bad_tenant, bad_timeline, pg) = tenant_timelines[n] with pytest.raises(Exception, match="extracting base backup failed") as err: pg.start() log.info( - f"compute startup failed lazily for timeline with corrupt layers, during basebackup preparation: {err}" + f"compute startup failed lazily for timeline {bad_tenant}/{bad_timeline} with corrupt layers, during basebackup preparation: {err}" ) @@ -107,6 +116,8 @@ def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv): tenant_id, _ = env.neon_cli.create_tenant() + old_tenant_timelines = env.neon_cli.list_timelines(tenant_id) + # Introduce failpoint when creating a new timeline env.pageserver.safe_psql("failpoints before-checkpoint-new-timeline=return") with pytest.raises(Exception, match="before-checkpoint-new-timeline"): @@ -116,6 +127,8 @@ def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv): env.neon_cli.pageserver_stop(immediate=True) env.neon_cli.pageserver_start() - # Check that tenant with "broken" timeline is not loaded. - with pytest.raises(Exception, match=f"Failed to get repo for tenant {tenant_id}"): - env.neon_cli.list_timelines(tenant_id) + # Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally. + new_tenant_timelines = env.neon_cli.list_timelines(tenant_id) + assert ( + new_tenant_timelines == old_tenant_timelines + ), f"Pageserver after restart should ignore non-initialized timelines for tenant {tenant_id}"