Ensure all temporary and empty directories and files are cleansed on pageserver startup

This commit is contained in:
Kirill Bulatov
2022-09-07 17:03:20 +03:00
committed by Kirill Bulatov
parent d3f83eda52
commit c9e7c2f014
14 changed files with 639 additions and 473 deletions
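
For orientation before the per-file hunks: the startup cleanup this commit adds boils down to two small helpers introduced in tenant_mgr.rs further down, is_temporary and remove_if_empty. Below is a condensed, simplified sketch of how they combine; the cleanup_dir driver here is hypothetical, the real traversal lives in local_tenant_timeline_files and collect_timelines_for_tenant.

use std::path::Path;

const TEMP_FILE_SUFFIX: &str = "___temp";

// Temporary entries are recognized purely by the shared name suffix.
fn is_temporary(path: &Path) -> bool {
    match path.file_name() {
        Some(name) => name.to_string_lossy().ends_with(TEMP_FILE_SUFFIX),
        None => false,
    }
}

// Removes the directory only if it contains no entries at all.
fn remove_if_empty(dir: &Path) -> std::io::Result<bool> {
    if dir.read_dir()?.next().is_none() {
        std::fs::remove_dir_all(dir)?;
        Ok(true)
    } else {
        Ok(false)
    }
}

// Hypothetical driver: delete temporary entries, then drop the directory
// itself if the cleanup left it empty.
fn cleanup_dir(dir: &Path) -> std::io::Result<()> {
    for entry in dir.read_dir()? {
        let path = entry?.path();
        if is_temporary(&path) {
            if path.is_dir() {
                std::fs::remove_dir_all(&path)?;
            } else {
                std::fs::remove_file(&path)?;
            }
        }
    }
    remove_if_empty(dir)?;
    Ok(())
}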

View File

@@ -344,6 +344,8 @@ impl Debug for S3Config {
}
}
/// Adds a suffix to the file (or directory) name, either appending the suffix to the end of its extension,
/// or, if there's no extension, creating one and putting the suffix there.
pub fn path_with_suffix_extension(original_path: impl AsRef<Path>, suffix: &str) -> PathBuf {
let new_extension = match original_path
.as_ref()
@@ -468,6 +470,11 @@ mod tests {
&path_with_suffix_extension(&p, ".temp").to_string_lossy(),
"/foo/bar.baz..temp"
);
let p = PathBuf::from("/foo/bar/dir/");
assert_eq!(
&path_with_suffix_extension(&p, ".temp").to_string_lossy(),
"/foo/bar/dir..temp"
);
}
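
For readers skimming this hunk: a minimal, illustrative reimplementation that matches the assertions above (not the crate's actual code), to make the naming rule explicit.

use std::path::{Path, PathBuf};

// Append the suffix to the existing extension, or use the suffix alone when
// the path has no extension at all.
fn with_suffix(original: impl AsRef<Path>, suffix: &str) -> PathBuf {
    let path = original.as_ref();
    let new_extension = match path.extension() {
        Some(ext) => format!("{}.{}", ext.to_string_lossy(), suffix),
        None => suffix.to_string(),
    };
    path.with_extension(new_extension)
}

// with_suffix("/foo/bar.baz", ".temp") == "/foo/bar.baz..temp"
// with_suffix("/foo/bar/dir/", ".temp") == "/foo/bar/dir..temp"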
#[test]

View File

@@ -21,6 +21,8 @@ use crate::{path_with_suffix_extension, Download, DownloadError, RemoteObjectId}
use super::{strip_path_prefix, RemoteStorage, StorageMetadata};
const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
/// Convert a Path in the remote storage into a RemoteObjectId
fn remote_object_id_from_path(path: &Path) -> anyhow::Result<RemoteObjectId> {
Ok(RemoteObjectId(
@@ -143,7 +145,8 @@ impl RemoteStorage for LocalFs {
// We need this dance with sort of durable rename (without fsyncs)
// to prevent partial uploads. This was really hit when pageserver shutdown
// cancelled the upload and partial file was left on the fs
let temp_file_path = path_with_suffix_extension(&target_file_path, "temp");
let temp_file_path =
path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX);
let mut destination = io::BufWriter::new(
fs::OpenOptions::new()
.write(true)
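
The "dance" in the comment above is the usual write-to-a-temporary-name-then-rename pattern. A minimal sketch under the same assumptions (no fsync; leftover temp files are swept up by the startup cleanup this commit adds); the function and naming here are hypothetical, not the LocalFs code itself.

use std::{fs, io::Write, path::Path};

fn write_then_rename(target: &Path, payload: &[u8]) -> std::io::Result<()> {
    // Simplified: the real code builds the name via
    // path_with_suffix_extension(&target_file_path, LOCAL_FS_TEMP_FILE_SUFFIX).
    let temp = target.with_extension("___temp");
    {
        let mut destination = fs::File::create(&temp)?;
        destination.write_all(payload)?;
    }
    // Only after the full payload has landed on disk does the file get its
    // final name, so a cancelled upload never leaves a partial file there.
    fs::rename(&temp, target)
}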

View File

@@ -470,7 +470,7 @@ async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, A
let response_data = tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_list").entered();
crate::tenant_mgr::list_tenants(&remote_index)
crate::tenant_mgr::list_tenant_info(&remote_index)
})
.await
.map_err(ApiError::from_err)?;
@@ -640,7 +640,8 @@ async fn tenant_config_handler(mut request: Request<Body>) -> Result<Response<Bo
tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_config", tenant = ?tenant_id).entered();
tenant_mgr::update_tenant_config(tenant_conf, tenant_id)
let state = get_state(&request);
tenant_mgr::update_tenant_config(state.conf, tenant_conf, tenant_id)
})
.await
.map_err(ApiError::from_err)??;

View File

@@ -30,7 +30,7 @@ use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
use std::time::{Duration, Instant};
use self::metadata::{metadata_path, TimelineMetadata};
use self::metadata::TimelineMetadata;
use crate::config::PageServerConf;
use crate::metrics::remove_tenant_metrics;
use crate::storage_sync::index::RemoteIndex;
@@ -299,6 +299,14 @@ impl Repository {
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult> {
let _guard = match self.file_lock.try_read() {
Ok(g) => g,
Err(_) => {
info!("File lock write acquired, shutting down GC");
return Ok(GcResult::default());
}
};
let timeline_str = target_timeline_id
.map(|x| x.to_string())
.unwrap_or_else(|| "-".to_string());
@@ -315,6 +323,14 @@ impl Repository {
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub fn compaction_iteration(&self) -> Result<()> {
let _guard = match self.file_lock.try_read() {
Ok(g) => g,
Err(_) => {
info!("File lock write acquired, shutting down compaction");
return Ok(());
}
};
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -401,10 +417,10 @@ impl Repository {
pub fn init_attach_timelines(
&self,
timelines: Vec<(ZTimelineId, TimelineMetadata)>,
timelines: HashMap<ZTimelineId, TimelineMetadata>,
) -> anyhow::Result<()> {
let sorted_timelines = if timelines.len() == 1 {
timelines
timelines.into_iter().collect()
} else if !timelines.is_empty() {
tree_sort_timelines(timelines)?
} else {
@@ -442,7 +458,7 @@ impl Repository {
/// perform a topological sort, so that the parent of each timeline comes
/// before the children.
fn tree_sort_timelines(
timelines: Vec<(ZTimelineId, TimelineMetadata)>,
timelines: HashMap<ZTimelineId, TimelineMetadata>,
) -> Result<Vec<(ZTimelineId, TimelineMetadata)>> {
let mut result = Vec::with_capacity(timelines.len());
@@ -567,13 +583,8 @@ impl Repository {
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag)
}
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) -> Result<()> {
let mut tenant_conf = self.tenant_conf.write().unwrap();
tenant_conf.update(&new_tenant_conf);
Repository::persist_tenant_config(self.conf, self.tenant_id, *tenant_conf)?;
Ok(())
pub fn update_tenant_config(&self, new_tenant_conf: TenantConfOpt) {
self.tenant_conf.write().unwrap().update(&new_tenant_conf);
}
fn initialize_new_timeline(
@@ -648,32 +659,37 @@ impl Repository {
tenant_id: ZTenantId,
) -> anyhow::Result<TenantConfOpt> {
let target_config_path = TenantConf::path(conf, tenant_id);
let target_config_display = target_config_path.display();
info!("load tenantconf from {}", target_config_path.display());
info!("loading tenantconf from {target_config_display}");
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
if !target_config_path.exists() {
info!(
"tenant config not found in {}",
target_config_path.display()
);
return Ok(Default::default());
info!("tenant config not found in {target_config_display}");
return Ok(TenantConfOpt::default());
}
// load and parse file
let config = fs::read_to_string(target_config_path)?;
let config = fs::read_to_string(&target_config_path).with_context(|| {
format!("Failed to load config from path '{target_config_display}'")
})?;
let toml = config.parse::<toml_edit::Document>()?;
let toml = config.parse::<toml_edit::Document>().with_context(|| {
format!("Failed to parse config from file '{target_config_display}' as toml file")
})?;
let mut tenant_conf: TenantConfOpt = Default::default();
let mut tenant_conf = TenantConfOpt::default();
for (key, item) in toml.iter() {
match key {
"tenant_config" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item)?;
tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
format!("Failed to parse config from file '{target_config_display}' as pageserver config")
})?;
}
_ => bail!("unrecognized pageserver option '{}'", key),
_ => bail!("config file {target_config_display} has unrecognized pageserver option '{key}'"),
}
}
@@ -888,26 +904,6 @@ pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
Ok(())
}
pub fn load_metadata(
conf: &'static PageServerConf,
timeline_id: ZTimelineId,
tenant_id: ZTenantId,
) -> anyhow::Result<TimelineMetadata> {
let metadata_path = metadata_path(conf, timeline_id, tenant_id);
let metadata_bytes = std::fs::read(&metadata_path).with_context(|| {
format!(
"Failed to read metadata bytes from path {}",
metadata_path.display()
)
})?;
TimelineMetadata::from_bytes(&metadata_bytes).with_context(|| {
format!(
"Failed to parse metadata bytes from path {}",
metadata_path.display()
)
})
}
#[cfg(test)]
pub mod repo_harness {
use bytes::{Bytes, BytesMut};
@@ -925,6 +921,7 @@ pub mod repo_harness {
walredo::{WalRedoError, WalRedoManager},
};
use super::metadata::metadata_path;
use super::*;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use hex_literal::hex;
@@ -1030,7 +1027,7 @@ pub mod repo_harness {
false,
);
// populate repo with locally available timelines
let mut timelines_to_load = Vec::new();
let mut timelines_to_load = HashMap::new();
for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id))
.expect("should be able to read timelines dir")
{
@@ -1042,7 +1039,7 @@ pub mod repo_harness {
.to_string_lossy()
.parse()?;
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
timelines_to_load.push((timeline_id, timeline_metadata));
timelines_to_load.insert(timeline_id, timeline_metadata);
}
repo.init_attach_timelines(timelines_to_load)?;
@@ -1054,6 +1051,26 @@ pub mod repo_harness {
}
}
fn load_metadata(
conf: &'static PageServerConf,
timeline_id: ZTimelineId,
tenant_id: ZTenantId,
) -> anyhow::Result<TimelineMetadata> {
let metadata_path = metadata_path(conf, timeline_id, tenant_id);
let metadata_bytes = std::fs::read(&metadata_path).with_context(|| {
format!(
"Failed to read metadata bytes from path {}",
metadata_path.display()
)
})?;
TimelineMetadata::from_bytes(&metadata_bytes).with_context(|| {
format!(
"Failed to parse metadata bytes from path {}",
metadata_path.display()
)
})
}
// Mock WAL redo manager that doesn't do much
pub struct TestRedoManager;

View File

@@ -34,7 +34,7 @@ use crate::layered_repository::storage_layer::{
use crate::page_cache::{PageReadGuard, PAGE_SZ};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::virtual_file::VirtualFile;
use crate::walrecord;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use rand::{distributions::Alphanumeric, Rng};
@@ -447,11 +447,12 @@ impl DeltaLayer {
.collect();
conf.timeline_path(&timelineid, &tenantid).join(format!(
"{}-XXX__{:016X}-{:016X}.{}.temp",
"{}-XXX__{:016X}-{:016X}.{}.{}",
key_start,
u64::from(lsn_range.start),
u64::from(lsn_range.end),
rand_string
rand_string,
TEMP_FILE_SUFFIX,
))
}

View File

@@ -30,7 +30,7 @@ use crate::layered_repository::storage_layer::{
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value, KEY_SIZE};
use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use hex;
@@ -255,7 +255,7 @@ impl ImageLayer {
.collect();
conf.timeline_path(&timelineid, &tenantid)
.join(format!("{}.{}.temp", fname, rand_string))
.join(format!("{fname}.{rand_string}.{TEMP_FILE_SUFFIX}"))
}
///

View File

@@ -23,7 +23,10 @@ pub mod walreceiver;
pub mod walrecord;
pub mod walredo;
use std::collections::HashMap;
use tracing::info;
use utils::zid::{ZTenantId, ZTimelineId};
use crate::thread_mgr::ThreadKind;
@@ -100,6 +103,50 @@ fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds
}
}
/// A newtype to store arbitrary data grouped by tenant and timeline ids.
/// One could use [`utils::zid::ZTenantTimelineId`] for grouping, but that would
/// not include the cases where a certain tenant has zero timelines.
/// This is sometimes important: a tenant could be registered during initial load from FS,
/// even if it has no timelines on disk.
#[derive(Debug)]
pub struct TenantTimelineValues<T>(HashMap<ZTenantId, HashMap<ZTimelineId, T>>);
impl<T> TenantTimelineValues<T> {
fn new() -> Self {
Self(HashMap::new())
}
fn with_capacity(capacity: usize) -> Self {
Self(HashMap::with_capacity(capacity))
}
/// A convenience method to map certain values and omit some of them, if needed.
/// Tenants that won't have any timeline entries due to the filtering will still be preserved
/// in the structure.
fn filter_map<F, NewT>(self, map: F) -> TenantTimelineValues<NewT>
where
F: Fn(T) -> Option<NewT>,
{
let capacity = self.0.len();
self.0.into_iter().fold(
TenantTimelineValues::<NewT>::with_capacity(capacity),
|mut new_values, (tenant_id, old_values)| {
let new_timeline_values = new_values.0.entry(tenant_id).or_default();
for (timeline_id, old_value) in old_values {
if let Some(new_value) = map(old_value) {
new_timeline_values.insert(timeline_id, new_value);
}
}
new_values
},
)
}
}
/// A suffix to be used during file sync from the remote storage,
/// to ensure that we do not leave corrupted files that pretend to be layers.
const TEMP_FILE_SUFFIX: &str = "___temp";
#[cfg(test)]
mod backoff_defaults_tests {
use super::*;
@@ -130,3 +177,35 @@ mod backoff_defaults_tests {
);
}
}
#[cfg(test)]
mod tests {
use crate::layered_repository::repo_harness::TIMELINE_ID;
use super::*;
#[test]
fn tenant_timeline_value_mapping() {
let first_tenant = ZTenantId::generate();
let second_tenant = ZTenantId::generate();
assert_ne!(first_tenant, second_tenant);
let mut initial = TenantTimelineValues::new();
initial
.0
.entry(first_tenant)
.or_default()
.insert(TIMELINE_ID, "test_value");
let _ = initial.0.entry(second_tenant).or_default();
assert_eq!(initial.0.len(), 2, "Should have entries for both tenants");
let filtered = initial.filter_map(|_| None::<&str>).0;
assert_eq!(
filtered.len(),
2,
"Should have entries for both tenants even after filtering away all entries"
);
assert!(filtered.contains_key(&first_tenant));
assert!(filtered.contains_key(&second_tenant));
}
}

View File

@@ -145,7 +145,6 @@ mod upload;
use std::{
collections::{hash_map, HashMap, HashSet, VecDeque},
ffi::OsStr,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
ops::ControlFlow,
@@ -170,244 +169,56 @@ use self::{
index::{IndexPart, RemoteTimeline, RemoteTimelineIndex},
upload::{upload_index_part, upload_timeline_layers, UploadedTimeline},
};
use crate::metrics::{IMAGE_SYNC_TIME, REMAINING_SYNC_ITEMS, REMOTE_INDEX_UPLOAD};
use crate::{
config::PageServerConf,
exponential_backoff,
layered_repository::{
ephemeral_file::is_ephemeral_file,
metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
},
storage_sync::{self, index::RemoteIndex},
tenant_mgr::attach_downloaded_tenants,
layered_repository::metadata::{metadata_path, TimelineMetadata},
storage_sync::index::RemoteIndex,
tenant_mgr::attach_local_tenants,
thread_mgr,
thread_mgr::ThreadKind,
};
use crate::{
metrics::{IMAGE_SYNC_TIME, REMAINING_SYNC_ITEMS, REMOTE_INDEX_UPLOAD},
TenantTimelineValues,
};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
use self::download::download_index_parts;
pub use self::download::gather_tenant_timelines_index_parts;
pub use self::download::TEMP_DOWNLOAD_EXTENSION;
static SYNC_QUEUE: OnceCell<SyncQueue> = OnceCell::new();
/// A timeline status to share with pageserver's sync counterpart,
/// after comparing local and remote timeline state.
#[derive(Clone, Copy, Debug)]
#[derive(Clone)]
pub enum LocalTimelineInitStatus {
/// The timeline has every remote layer present locally.
/// There could be some layers requiring uploading,
/// but this does not block the timeline from any user interaction.
LocallyComplete,
LocallyComplete(TimelineMetadata),
/// A timeline has some files remotely that are not present locally and need downloading.
/// Downloading might update timeline's metadata locally and current pageserver logic deals with local layers only,
/// so the data needs to be downloaded first before the timeline can be used.
NeedsSync,
}
type LocalTimelineInitStatuses = HashMap<ZTenantId, HashMap<ZTimelineId, LocalTimelineInitStatus>>;
impl std::fmt::Debug for LocalTimelineInitStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::LocallyComplete(_) => write!(f, "LocallyComplete"),
Self::NeedsSync => write!(f, "NeedsSync"),
}
}
}
/// A structure to combine all synchronization data to share with pageserver after a successful sync loop initialization.
/// Successful initialization includes a case when sync loop is not started, in which case the startup data is returned still,
/// to simplify the received code.
pub struct SyncStartupData {
pub remote_index: RemoteIndex,
pub local_timeline_init_statuses: LocalTimelineInitStatuses,
}
/// Based on the config, initiates the remote storage connection and starts a separate thread
/// that ensures that pageserver and the remote storage are in sync with each other.
/// If no external configuration connection given, no thread or storage initialization is done.
/// Along with that, scans tenant files local and remote (if the sync gets enabled) to check the initial timeline states.
pub fn start_local_timeline_sync(
config: &'static PageServerConf,
storage: Option<GenericRemoteStorage>,
) -> anyhow::Result<SyncStartupData> {
let local_timeline_files = local_tenant_timeline_files(config)
.context("Failed to collect local tenant timeline files")?;
match storage.zip(config.remote_storage_config.as_ref()) {
Some((storage, storage_config)) => storage_sync::spawn_storage_sync_thread(
config,
local_timeline_files,
storage,
storage_config.max_concurrent_syncs,
storage_config.max_sync_errors,
)
.context("Failed to spawn the storage sync thread"),
None => {
info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled");
let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new();
for (
ZTenantTimelineId {
tenant_id,
timeline_id,
},
_,
) in local_timeline_files
{
local_timeline_init_statuses
.entry(tenant_id)
.or_default()
.insert(timeline_id, LocalTimelineInitStatus::LocallyComplete);
}
Ok(SyncStartupData {
local_timeline_init_statuses,
remote_index: RemoteIndex::default(),
})
}
}
}
fn local_tenant_timeline_files(
config: &'static PageServerConf,
) -> anyhow::Result<HashMap<ZTenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>> {
let mut local_tenant_timeline_files = HashMap::new();
let tenants_dir = config.tenants_path();
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
.with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))?
{
match &tenants_dir_entry {
Ok(tenants_dir_entry) => {
match collect_timelines_for_tenant(config, &tenants_dir_entry.path()) {
Ok(collected_files) => {
local_tenant_timeline_files.extend(collected_files.into_iter())
}
Err(e) => error!(
"Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}",
tenants_dir.display(),
tenants_dir_entry,
e
),
}
}
Err(e) => error!(
"Failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
tenants_dir_entry,
tenants_dir.display(),
e
),
}
}
Ok(local_tenant_timeline_files)
}
fn collect_timelines_for_tenant(
config: &'static PageServerConf,
tenant_path: &Path,
) -> anyhow::Result<HashMap<ZTenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>> {
let mut timelines = HashMap::new();
let tenant_id = tenant_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<ZTenantId>()
.context("Could not parse tenant id out of the tenant dir name")?;
let timelines_dir = config.timelines_path(&tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir).with_context(|| {
format!(
"Failed to list timelines dir entry for tenant {}",
tenant_id
)
})? {
match timelines_dir_entry {
Ok(timelines_dir_entry) => {
let timeline_path = timelines_dir_entry.path();
match collect_timeline_files(&timeline_path) {
Ok((timeline_id, metadata, timeline_files)) => {
timelines.insert(
ZTenantTimelineId {
tenant_id,
timeline_id,
},
(metadata, timeline_files),
);
}
Err(e) => error!(
"Failed to process timeline dir contents at '{}', reason: {:?}",
timeline_path.display(),
e
),
}
}
Err(e) => error!(
"Failed to list timelines for entry tenant {}, reason: {:?}",
tenant_id, e
),
}
}
Ok(timelines)
}
// discover timeline files and extract timeline metadata
// NOTE: ephemeral files are excluded from the list
fn collect_timeline_files(
timeline_dir: &Path,
) -> anyhow::Result<(ZTimelineId, TimelineMetadata, HashSet<PathBuf>)> {
let mut timeline_files = HashSet::new();
let mut timeline_metadata_path = None;
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<ZTimelineId>()
.context("Could not parse timeline id out of the timeline dir name")?;
let timeline_dir_entries =
std::fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?;
for entry in timeline_dir_entries {
let entry_path = entry.context("Failed to list timeline dir entry")?.path();
if entry_path.is_file() {
if entry_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME) {
timeline_metadata_path = Some(entry_path);
} else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) {
debug!("skipping ephemeral file {}", entry_path.display());
continue;
} else if entry_path.extension().and_then(OsStr::to_str)
== Some(TEMP_DOWNLOAD_EXTENSION)
{
info!("removing temp download file at {}", entry_path.display());
std::fs::remove_file(&entry_path).with_context(|| {
format!(
"failed to remove temp download file at {}",
entry_path.display()
)
})?;
} else if entry_path.extension().and_then(OsStr::to_str) == Some("temp") {
info!("removing temp layer file at {}", entry_path.display());
std::fs::remove_file(&entry_path).with_context(|| {
format!(
"failed to remove temp layer file at {}",
entry_path.display()
)
})?;
} else {
timeline_files.insert(entry_path);
}
}
}
// FIXME (rodionov) if attach call succeeded, and then pageserver is restarted before download is completed
// then attach is lost. There would be no retries for that,
// initial collect will fail because there is no metadata.
// We either need to start download if we see empty dir after restart or attach caller should
// be aware of that and retry attach if awaits_download for timeline switched from true to false
// but timelinne didn't appear locally.
// Check what happens with remote index in that case.
let timeline_metadata_path = match timeline_metadata_path {
Some(path) => path,
None => bail!("No metadata file found in the timeline directory"),
};
let metadata = TimelineMetadata::from_bytes(
&std::fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?,
)
.context("Failed to parse timeline metadata file bytes")?;
Ok((timeline_id, metadata, timeline_files))
pub local_timeline_init_statuses: TenantTimelineValues<LocalTimelineInitStatus>,
}
/// Global queue of sync tasks.
@@ -763,9 +574,9 @@ pub fn schedule_layer_download(tenant_id: ZTenantId, timeline_id: ZTimelineId) {
/// Launch a thread to perform remote storage sync tasks.
/// See module docs for loop step description.
pub(super) fn spawn_storage_sync_thread(
pub fn spawn_storage_sync_thread(
conf: &'static PageServerConf,
local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
local_timeline_files: TenantTimelineValues<(TimelineMetadata, HashSet<PathBuf>)>,
storage: GenericRemoteStorage,
max_concurrent_timelines_sync: NonZeroUsize,
max_sync_errors: NonZeroU32,
@@ -784,19 +595,43 @@ pub(super) fn spawn_storage_sync_thread(
.build()
.context("Failed to create storage sync runtime")?;
// TODO we are able to "attach" empty tenants, but not doing it now since it might require big wait time:
// * we need to list every timeline for tenant on S3, that might be a costly operation
// * we need to download every timeline for the tenant, to activate it in memory
//
// When on-demand download gets merged, we're able to do this fast by storing timeline metadata only.
let mut empty_tenants = TenantTimelineValues::<LocalTimelineInitStatus>::new();
let mut keys_for_index_part_downloads = HashSet::new();
let mut timelines_to_sync = HashMap::new();
for (tenant_id, timeline_data) in local_timeline_files.0 {
if timeline_data.is_empty() {
let _ = empty_tenants.0.entry(tenant_id).or_default();
} else {
for (timeline_id, timeline_data) in timeline_data {
let id = ZTenantTimelineId::new(tenant_id, timeline_id);
keys_for_index_part_downloads.insert(id);
timelines_to_sync.insert(id, timeline_data);
}
}
}
let applicable_index_parts = runtime.block_on(download_index_parts(
conf,
&storage,
local_timeline_files.keys().copied().collect(),
keys_for_index_part_downloads,
));
let remote_index = RemoteIndex::from_parts(conf, applicable_index_parts)?;
let local_timeline_init_statuses = schedule_first_sync_tasks(
let mut local_timeline_init_statuses = schedule_first_sync_tasks(
&mut runtime.block_on(remote_index.write()),
sync_queue,
local_timeline_files,
timelines_to_sync,
);
local_timeline_init_statuses
.0
.extend(empty_tenants.0.into_iter());
let remote_index_clone = remote_index.clone();
thread_mgr::spawn(
@@ -872,10 +707,7 @@ fn storage_sync_loop(
"Sync loop step completed, {} new tenant state update(s)",
updated_tenants.len()
);
let mut timelines_to_attach: HashMap<
ZTenantId,
Vec<(ZTimelineId, TimelineMetadata)>,
> = HashMap::new();
let mut timelines_to_attach = TenantTimelineValues::new();
let index_accessor = runtime.block_on(index.read());
for tenant_id in updated_tenants {
let tenant_entry = match index_accessor.tenant_entry(&tenant_id) {
@@ -901,7 +733,7 @@ fn storage_sync_loop(
// and register them all at once in a repository for download
// to be submitted in a single operation to repository
// so it can apply them at once to internal timeline map.
timelines_to_attach.insert(
timelines_to_attach.0.insert(
tenant_id,
tenant_entry
.iter()
@@ -912,7 +744,9 @@ fn storage_sync_loop(
}
drop(index_accessor);
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
attach_downloaded_tenants(conf, &index, timelines_to_attach);
if let Err(e) = attach_local_tenants(conf, &index, timelines_to_attach) {
error!("Failed to attach new timelines: {e:?}");
};
}
}
ControlFlow::Break(()) => {
@@ -1443,11 +1277,10 @@ fn schedule_first_sync_tasks(
index: &mut RemoteTimelineIndex,
sync_queue: &SyncQueue,
local_timeline_files: HashMap<ZTenantTimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
) -> LocalTimelineInitStatuses {
let mut local_timeline_init_statuses = LocalTimelineInitStatuses::new();
) -> TenantTimelineValues<LocalTimelineInitStatus> {
let mut local_timeline_init_statuses = TenantTimelineValues::new();
let mut new_sync_tasks =
VecDeque::with_capacity(local_timeline_files.len().max(local_timeline_files.len()));
let mut new_sync_tasks = VecDeque::with_capacity(local_timeline_files.len());
for (sync_id, (local_metadata, local_files)) in local_timeline_files {
match index.timeline_entry_mut(&sync_id) {
@@ -1459,18 +1292,27 @@ fn schedule_first_sync_tasks(
local_files,
remote_timeline,
);
let was_there = local_timeline_init_statuses
match local_timeline_init_statuses
.0
.entry(sync_id.tenant_id)
.or_default()
.insert(sync_id.timeline_id, timeline_status);
if was_there.is_some() {
// defensive check
warn!(
"Overwriting timeline init sync status. Status {timeline_status:?}, timeline {}",
sync_id.timeline_id
);
.entry(sync_id.timeline_id)
{
hash_map::Entry::Occupied(mut o) => {
{
// defensive check
warn!(
"Overwriting timeline init sync status. Status {timeline_status:?}, timeline {}",
sync_id.timeline_id
);
}
o.insert(timeline_status);
}
hash_map::Entry::Vacant(v) => {
v.insert(timeline_status);
}
}
remote_timeline.awaits_download = awaits_download;
}
None => {
@@ -1481,15 +1323,16 @@ fn schedule_first_sync_tasks(
SyncTask::upload(LayersUpload {
layers_to_upload: local_files,
uploaded_layers: HashSet::new(),
metadata: Some(local_metadata),
metadata: Some(local_metadata.clone()),
}),
));
local_timeline_init_statuses
.0
.entry(sync_id.tenant_id)
.or_default()
.insert(
sync_id.timeline_id,
LocalTimelineInitStatus::LocallyComplete,
LocalTimelineInitStatus::LocallyComplete(local_metadata),
);
}
}
@@ -1523,7 +1366,10 @@ fn compare_local_and_remote_timeline(
// we do not need to manipulate with remote consistent lsn here
// because it will be updated when sync will be completed
} else {
(LocalTimelineInitStatus::LocallyComplete, false)
(
LocalTimelineInitStatus::LocallyComplete(local_metadata.clone()),
false,
)
};
let layers_to_upload = local_files

View File

@@ -18,6 +18,7 @@ use tracing::{debug, error, info, warn};
use crate::{
config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask,
TEMP_FILE_SUFFIX,
};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -26,8 +27,6 @@ use super::{
LayersDownload, SyncData, SyncQueue,
};
pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
// We collect the timelines remotely available for each tenant;
// in case we failed to gather all index parts (due to an error),
// the Poisoned variant is returned.
@@ -251,7 +250,7 @@ pub(super) async fn download_timeline_layers<'a>(
// https://www.postgresql.org/message-id/56583BDD.9060302@2ndquadrant.com
// If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path =
path_with_suffix_extension(&layer_destination_path, TEMP_DOWNLOAD_EXTENSION);
path_with_suffix_extension(&layer_destination_path, TEMP_FILE_SUFFIX);
let mut destination_file =
fs::File::create(&temp_file_path).await.with_context(|| {

View File

@@ -3,24 +3,26 @@
use crate::config::PageServerConf;
use crate::http::models::TenantInfo;
use crate::layered_repository::metadata::TimelineMetadata;
use crate::layered_repository::{load_metadata, Repository, Timeline};
use crate::layered_repository::ephemeral_file::is_ephemeral_file;
use crate::layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME};
use crate::layered_repository::{Repository, Timeline};
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt;
use crate::thread_mgr::ThreadKind;
use crate::walredo::PostgresRedoManager;
use crate::{thread_mgr, timelines, walreceiver};
use crate::{thread_mgr, timelines, walreceiver, TenantTimelineValues, TEMP_FILE_SUFFIX};
use anyhow::Context;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::hash_map::{self, Entry};
use std::collections::{HashMap, HashSet};
use std::ffi::OsStr;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::*;
use utils::lsn::Lsn;
pub use tenants_state::try_send_timeline_update;
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
@@ -136,34 +138,49 @@ pub fn init_tenant_mgr(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
) -> anyhow::Result<RemoteIndex> {
let _entered = info_span!("init_tenant_mgr").entered();
let (timeline_updates_sender, timeline_updates_receiver) =
mpsc::unbounded_channel::<LocalTimelineUpdate>();
tenants_state::set_timeline_update_sender(timeline_updates_sender)?;
walreceiver::init_wal_receiver_main_thread(conf, timeline_updates_receiver)?;
let SyncStartupData {
remote_index,
local_timeline_init_statuses,
} = storage_sync::start_local_timeline_sync(conf, remote_storage)
.context("Failed to set up local files sync with external storage")?;
let local_tenant_files = local_tenant_timeline_files(conf)
.context("Failed to collect local tenant timeline files")?;
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
if let Err(err) =
init_local_repository(conf, tenant_id, local_timeline_init_statuses, &remote_index)
{
// Report the error, but continue with the startup for other tenants. An error
// loading a tenant is serious, but it's better to complete the startup and
// serve other tenants, than fail completely.
error!("Failed to initialize local tenant {tenant_id}: {:?}", err);
let (remote_index, tenants_to_attach) = if let Some(storage) = remote_storage {
let storage_config = conf
.remote_storage_config
.as_ref()
.expect("remote storage without config");
if let Err(err) = set_tenant_state(tenant_id, TenantState::Broken) {
error!(
"Failed to set tenant state to broken {tenant_id}: {:?}",
err
);
}
}
}
let SyncStartupData {
remote_index,
local_timeline_init_statuses,
} = storage_sync::spawn_storage_sync_thread(
conf,
local_tenant_files,
storage,
storage_config.max_concurrent_syncs,
storage_config.max_sync_errors,
)
.context("Failed to spawn the storage sync thread")?;
(
remote_index,
local_timeline_init_statuses.filter_map(|init_status| match init_status {
LocalTimelineInitStatus::LocallyComplete(metadata) => Some(metadata),
LocalTimelineInitStatus::NeedsSync => None,
}),
)
} else {
info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled");
(
RemoteIndex::default(),
local_tenant_files.filter_map(|(metadata, _)| Some(metadata)),
)
};
attach_local_tenants(conf, &remote_index, tenants_to_attach)?;
Ok(remote_index)
}
@@ -189,35 +206,69 @@ impl std::fmt::Debug for LocalTimelineUpdate {
}
}
/// Updates tenants' repositories, changing their timelines state in memory.
pub fn attach_downloaded_tenants(
/// Reads local files to load the given tenants and their timelines into pageserver's memory.
/// Ignores other timelines that might be present for the tenant but were not passed as a parameter.
/// Attempts to load as many entities as possible: if a certain timeline fails during the load, the tenant is marked as "Broken",
/// and the load continues.
pub fn attach_local_tenants(
conf: &'static PageServerConf,
remote_index: &RemoteIndex,
sync_status_updates: HashMap<ZTenantId, Vec<(ZTimelineId, TimelineMetadata)>>,
) {
if sync_status_updates.is_empty() {
debug!("No sync status updates to apply");
return;
}
for (tenant_id, downloaded_timelines) in sync_status_updates {
info!(
"Registering downlloaded timelines for {tenant_id} {} timelines",
downloaded_timelines.len()
);
debug!("Downloaded timelines: {downloaded_timelines:?}");
tenants_to_attach: TenantTimelineValues<TimelineMetadata>,
) -> anyhow::Result<()> {
let _entered = info_span!("attach_local_tenants").entered();
let number_of_tenants = tenants_to_attach.0.len();
let repo = match load_local_repo(conf, tenant_id, remote_index) {
Ok(repo) => repo,
Err(e) => {
error!("Failed to load repo for tenant {tenant_id} Error: {e:?}");
continue;
for (tenant_id, local_timelines) in tenants_to_attach.0 {
info!(
"Attaching {} timelines for {tenant_id}",
local_timelines.len()
);
debug!("Timelines to attach: {local_timelines:?}");
let repository = load_local_repo(conf, tenant_id, remote_index)
.context("Failed to load repository for tenant")?;
let repo = Arc::clone(&repository);
{
match tenants_state::write_tenants().entry(tenant_id) {
hash_map::Entry::Occupied(_) => {
anyhow::bail!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
}
hash_map::Entry::Vacant(v) => {
v.insert(Tenant {
state: TenantState::Idle,
repo,
});
}
}
};
match repo.init_attach_timelines(downloaded_timelines) {
Ok(()) => info!("successfully loaded local timelines for tenant {tenant_id}"),
Err(e) => error!("Failed to load local timelines for tenant {tenant_id}: {e:?}"),
}
// XXX: current timeline init enables walreceiver that looks for tenant in the state, so insert the tenant entry before
repository
.init_attach_timelines(local_timelines)
.context("Failed to attach timelines for tenant")?;
}
info!("Processed {number_of_tenants} local tenants during attach");
Ok(())
}
fn load_local_repo(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<Repository>> {
let repository = Repository::new(
conf,
TenantConfOpt::default(),
Arc::new(PostgresRedoManager::new(conf, tenant_id)),
tenant_id,
remote_index.clone(),
conf.remote_storage_config.is_some(),
);
let tenant_conf = Repository::load_tenant_config(conf, tenant_id)?;
repository.update_tenant_config(tenant_conf);
Ok(Arc::new(repository))
}
///
@@ -293,13 +344,14 @@ pub fn create_tenant_repository(
}
pub fn update_tenant_config(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: ZTenantId,
) -> anyhow::Result<()> {
info!("configuring tenant {tenant_id}");
let repo = get_repository_for_tenant(tenant_id)?;
get_repository_for_tenant(tenant_id)?.update_tenant_config(tenant_conf);
repo.update_tenant_config(tenant_conf)?;
Repository::persist_tenant_config(conf, tenant_id, tenant_conf)?;
Ok(())
}
@@ -392,7 +444,7 @@ pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow
debug!("waiting for threads to shutdown");
thread_mgr::shutdown_threads(None, None, Some(timeline_id));
debug!("thread shutdown completed");
match tenants_state::write_tenants().get_mut(&tenant_id) {
match tenants_state::read_tenants().get(&tenant_id) {
Some(tenant) => tenant.repo.delete_timeline(timeline_id)?,
None => anyhow::bail!("Tenant {tenant_id} not found in local tenant state"),
}
@@ -428,12 +480,10 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
// need to use crossbeam-channel
for (timeline_id, join_handle) in walreceiver_join_handles {
info!("waiting for wal receiver to shutdown timeline_id {timeline_id}");
join_handle.recv().context("failed to join walreceiver")?;
join_handle.recv().ok();
info!("wal receiver shutdown confirmed timeline_id {timeline_id}");
}
tenants_state::write_tenants().remove(&tenant_id);
// If removal fails there will be no way to successfully retry detach,
// because the tenant no longer exists in the in-memory map. And it needs to be removed from it
// before we remove files, because it contains references to repository
@@ -443,7 +493,7 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
let local_tenant_directory = conf.tenant_path(&tenant_id);
std::fs::remove_dir_all(&local_tenant_directory).with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
"Failed to remove local tenant directory '{}'",
local_tenant_directory.display()
)
})?;
@@ -454,7 +504,7 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
///
/// Get list of tenants, for the mgmt API
///
pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
pub fn list_tenant_info(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
tenants_state::read_tenants()
.iter()
.map(|(id, tenant)| {
@@ -478,98 +528,248 @@ pub fn list_tenants(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
.collect()
}
/// Check if a given timeline is "broken" \[1\].
/// The function returns an error if the timeline is "broken".
///
/// \[1\]: it's not clear now how should we classify a timeline as broken.
/// A timeline is categorized as broken when any of following conditions is true:
/// - failed to load the timeline's metadata
/// - the timeline's disk consistent LSN is zero
fn check_broken_timeline(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> anyhow::Result<TimelineMetadata> {
let metadata =
load_metadata(conf, timeline_id, tenant_id).context("failed to load metadata")?;
/// Attempts to collect information about all tenants and timelines existing on the local FS.
/// While doing so, deletes all temporary files and directories created earlier, and also removes empty directories
/// that may appear due to such removals.
/// Does not fail on particular timeline or tenant collection errors; instead, logs them and skips those entities.
fn local_tenant_timeline_files(
config: &'static PageServerConf,
) -> anyhow::Result<TenantTimelineValues<(TimelineMetadata, HashSet<PathBuf>)>> {
let _entered = info_span!("local_tenant_timeline_files").entered();
// A timeline with zero disk consistent LSN can happen when the page server
// failed to checkpoint the timeline import data when creating that timeline.
if metadata.disk_consistent_lsn() == Lsn::INVALID {
anyhow::bail!("Timeline {timeline_id} has a zero disk consistent LSN.");
let mut local_tenant_timeline_files = TenantTimelineValues::new();
let tenants_dir = config.tenants_path();
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
.with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))?
{
match &tenants_dir_entry {
Ok(tenants_dir_entry) => {
let tenant_dir_path = tenants_dir_entry.path();
if is_temporary(&tenant_dir_path) {
info!(
"Found temporary tenant directory, removing: {}",
tenant_dir_path.display()
);
if let Err(e) = std::fs::remove_dir_all(&tenant_dir_path) {
error!(
"Failed to remove temporary directory '{}': {:?}",
tenant_dir_path.display(),
e
);
}
} else {
match collect_timelines_for_tenant(config, &tenant_dir_path) {
Ok((tenant_id, collected_files)) => {
if collected_files.is_empty() {
match remove_if_empty(&tenant_dir_path) {
Ok(true) => info!("Removed empty tenant directory {}", tenant_dir_path.display()),
Ok(false) => {
// insert an empty timeline entry: it has some non-temporary files inside that we cannot remove,
// so make it obvious to HTTP API callers that something exists there, and try to load the tenant
let _ = local_tenant_timeline_files.0.entry(tenant_id).or_default();
},
Err(e) => error!("Failed to remove empty tenant directory: {e:?}"),
}
} else {
local_tenant_timeline_files.0.entry(tenant_id).or_default().extend(collected_files.into_iter())
}
},
Err(e) => error!(
"Failed to collect tenant files from dir '{}' for entry {:?}, reason: {:#}",
tenants_dir.display(),
tenants_dir_entry,
e
),
}
}
}
Err(e) => error!(
"Failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
tenants_dir_entry,
tenants_dir.display(),
e
),
}
}
Ok(metadata)
info!(
"Collected files for {} tenants",
local_tenant_timeline_files.0.len()
);
Ok(local_tenant_timeline_files)
}
/// Note: all timelines are attached at once if and only if all of them are locally complete
fn init_local_repository(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
local_timeline_init_statuses: HashMap<ZTimelineId, LocalTimelineInitStatus>,
remote_index: &RemoteIndex,
) -> anyhow::Result<(), anyhow::Error> {
let mut timelines_to_attach = Vec::new();
for (timeline_id, init_status) in local_timeline_init_statuses {
match init_status {
LocalTimelineInitStatus::LocallyComplete => {
debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository");
let metadata = check_broken_timeline(conf, tenant_id, timeline_id)
.context("found broken timeline")?;
timelines_to_attach.push((timeline_id, metadata));
fn remove_if_empty(tenant_dir_path: &Path) -> anyhow::Result<bool> {
let directory_is_empty = tenant_dir_path
.read_dir()
.with_context(|| {
format!(
"Failed to read directory '{}' contents",
tenant_dir_path.display()
)
})?
.next()
.is_none();
if directory_is_empty {
std::fs::remove_dir_all(&tenant_dir_path).with_context(|| {
format!(
"Failed to remove empty directory '{}'",
tenant_dir_path.display(),
)
})?;
Ok(true)
} else {
Ok(false)
}
}
fn is_temporary(path: &Path) -> bool {
match path.file_name() {
Some(name) => name.to_string_lossy().ends_with(TEMP_FILE_SUFFIX),
None => false,
}
}
#[allow(clippy::type_complexity)]
fn collect_timelines_for_tenant(
config: &'static PageServerConf,
tenant_path: &Path,
) -> anyhow::Result<(
ZTenantId,
HashMap<ZTimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
)> {
let tenant_id = tenant_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<ZTenantId>()
.context("Could not parse tenant id out of the tenant dir name")?;
let timelines_dir = config.timelines_path(&tenant_id);
let mut tenant_timelines = HashMap::new();
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("Failed to list timelines dir entry for tenant {tenant_id}"))?
{
match timelines_dir_entry {
Ok(timelines_dir_entry) => {
let timeline_dir = timelines_dir_entry.path();
if is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
}
} else {
match collect_timeline_files(&timeline_dir) {
Ok((timeline_id, metadata, timeline_files)) => {
tenant_timelines.insert(timeline_id, (metadata, timeline_files));
}
Err(e) => {
error!(
"Failed to process timeline dir contents at '{}', reason: {:?}",
timeline_dir.display(),
e
);
match remove_if_empty(&timeline_dir) {
Ok(true) => info!(
"Removed empty timeline directory {}",
timeline_dir.display()
),
Ok(false) => (),
Err(e) => {
error!("Failed to remove empty timeline directory: {e:?}")
}
}
}
}
}
}
LocalTimelineInitStatus::NeedsSync => {
debug!(
"timeline {tenant_id} for tenant {timeline_id} needs sync, \
so skipped for adding into repository until sync is finished"
);
return Ok(());
Err(e) => {
error!("Failed to list timelines for entry tenant {tenant_id}, reason: {e:?}")
}
}
}
// initialize local tenant
let repo = load_local_repo(conf, tenant_id, remote_index)
.with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?;
// Lets fail here loudly to be on the safe side.
// XXX: It may be a better api to actually distinguish between repository startup
// and processing of newly downloaded timelines.
repo.init_attach_timelines(timelines_to_attach)
.with_context(|| format!("Failed to init local timelines for tenant {tenant_id}"))?;
Ok(())
}
// Sets up wal redo manager and repository for tenant. Reduces code duplication.
// Used during pageserver startup, or when new tenant is attached to pageserver.
fn load_local_repo(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<Repository>> {
let mut m = tenants_state::write_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| {
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo: Arc<Repository> = Arc::new(Repository::new(
conf,
TenantConfOpt::default(),
Arc::new(walredo_mgr),
tenant_id,
remote_index.clone(),
conf.remote_storage_config.is_some(),
));
Tenant {
state: TenantState::Idle,
repo,
if tenant_timelines.is_empty() {
match remove_if_empty(&timelines_dir) {
Ok(true) => info!(
"Removed empty tenant timelines directory {}",
timelines_dir.display()
),
Ok(false) => (),
Err(e) => error!("Failed to remove empty tenant timelines directory: {e:?}"),
}
});
}
// Restore tenant config
let tenant_conf = Repository::load_tenant_config(conf, tenant_id)?;
tenant.repo.update_tenant_config(tenant_conf)?;
Ok(Arc::clone(&tenant.repo))
Ok((tenant_id, tenant_timelines))
}
// discover timeline files and extract timeline metadata
// NOTE: ephemeral files are excluded from the list
fn collect_timeline_files(
timeline_dir: &Path,
) -> anyhow::Result<(ZTimelineId, TimelineMetadata, HashSet<PathBuf>)> {
let mut timeline_files = HashSet::new();
let mut timeline_metadata_path = None;
let timeline_id = timeline_dir
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<ZTimelineId>()
.context("Could not parse timeline id out of the timeline dir name")?;
let timeline_dir_entries =
std::fs::read_dir(&timeline_dir).context("Failed to list timeline dir contents")?;
for entry in timeline_dir_entries {
let entry_path = entry.context("Failed to list timeline dir entry")?.path();
if entry_path.is_file() {
if entry_path.file_name().and_then(OsStr::to_str) == Some(METADATA_FILE_NAME) {
timeline_metadata_path = Some(entry_path);
} else if is_ephemeral_file(&entry_path.file_name().unwrap().to_string_lossy()) {
debug!("skipping ephemeral file {}", entry_path.display());
continue;
} else if is_temporary(&entry_path) {
info!("removing temp timeline file at {}", entry_path.display());
std::fs::remove_file(&entry_path).with_context(|| {
format!(
"failed to remove temp download file at {}",
entry_path.display()
)
})?;
} else {
timeline_files.insert(entry_path);
}
}
}
// FIXME (rodionov) if attach call succeeded, and then pageserver is restarted before download is completed
// then attach is lost. There would be no retries for that,
// initial collect will fail because there is no metadata.
// We either need to start download if we see empty dir after restart or attach caller should
// be aware of that and retry attach if awaits_download for timeline switched from true to false
// but timeline didn't appear locally.
// Check what happens with remote index in that case.
let timeline_metadata_path = match timeline_metadata_path {
Some(path) => path,
None => anyhow::bail!("No metadata file found in the timeline directory"),
};
let metadata = TimelineMetadata::from_bytes(
&std::fs::read(&timeline_metadata_path).context("Failed to read timeline metadata file")?,
)
.context("Failed to parse timeline metadata file bytes")?;
anyhow::ensure!(
metadata.ancestor_timeline().is_some() || !timeline_files.is_empty(),
"Timeline has no ancestor and no layer files"
);
Ok((timeline_id, metadata, timeline_files))
}

View File

@@ -34,11 +34,6 @@ async fn compaction_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
// Break if we're not allowed to write to disk
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
// TODO do this inside repo.compaction_iteration instead.
let _guard = match repo.file_lock.try_read() {
Ok(g) => g,
Err(_) => return Ok(ControlFlow::Break(())),
};
// Run compaction
let compaction_period = repo.get_compaction_period();
@@ -233,11 +228,6 @@ async fn gc_loop(tenantid: ZTenantId, mut cancel: watch::Receiver<()>) {
// Break if we're not allowed to write to disk
let repo = tenant_mgr::get_repository_for_tenant(tenantid)?;
// TODO do this inside repo.gc_iteration instead.
let _guard = match repo.file_lock.try_read() {
Ok(g) => g,
Err(_) => return Ok(ControlFlow::Break(())),
};
// Run gc
let gc_period = repo.get_gc_period();

View File

@@ -3,6 +3,7 @@
//
use anyhow::{bail, ensure, Context, Result};
use remote_storage::path_with_suffix_extension;
use std::{
fs,
@@ -18,12 +19,12 @@ use utils::{
zid::{ZTenantId, ZTimelineId},
};
use crate::import_datadir;
use crate::tenant_mgr;
use crate::CheckpointConfig;
use crate::{
config::PageServerConf, storage_sync::index::RemoteIndex, tenant_config::TenantConfOpt,
};
use crate::{import_datadir, TEMP_FILE_SUFFIX};
use crate::{
layered_repository::{Repository, Timeline},
walredo::WalRedoManager,
@@ -105,13 +106,17 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
//
fn bootstrap_timeline(
conf: &'static PageServerConf,
tenantid: ZTenantId,
tli: ZTimelineId,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
repo: &Repository,
) -> Result<Arc<Timeline>> {
let initdb_path = conf
.tenant_path(&tenantid)
.join(format!("tmp-timeline-{}", tli));
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
let initdb_path = path_with_suffix_extension(
conf.timelines_path(&tenant_id)
.join(format!("basebackup-{timeline_id}")),
TEMP_FILE_SUFFIX,
);
// Init temporarily repo to get bootstrap data
run_initdb(conf, &initdb_path)?;
@@ -123,7 +128,7 @@ fn bootstrap_timeline(
// LSN, and any WAL after that.
// Initdb lsn will be equal to last_record_lsn which will be set after import.
// Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline.
let timeline = repo.create_empty_timeline(tli, lsn)?;
let timeline = repo.create_empty_timeline(timeline_id, lsn)?;
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -134,7 +139,7 @@ fn bootstrap_timeline(
info!(
"created root timeline {} timeline.lsn {}",
tli,
timeline_id,
timeline.get_last_record_lsn()
);

View File

@@ -21,6 +21,7 @@
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use remote_storage::path_with_suffix_extension;
use serde::Serialize;
use std::fs;
use std::fs::OpenOptions;
@@ -37,7 +38,6 @@ use std::time::Instant;
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock, zid::ZTenantId};
use crate::config::PageServerConf;
use crate::metrics::{
WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME, WAL_REDO_WAIT_TIME,
};
@@ -45,6 +45,7 @@ use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Key;
use crate::walrecord::ZenithWalRecord;
use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
use postgres_ffi::v14::nonrelfile_utils::{
mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
transaction_id_set_status,
@@ -569,20 +570,24 @@ impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
fn launch(conf: &PageServerConf, tenantid: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
fn launch(conf: &PageServerConf, tenant_id: &ZTenantId) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
let datadir = conf.tenant_path(tenantid).join("wal-redo-datadir");
let datadir = path_with_suffix_extension(
conf.tenant_path(tenant_id).join("wal-redo-datadir"),
TEMP_FILE_SUFFIX,
);
// Create empty data directory for wal-redo postgres, deleting old one first.
if datadir.exists() {
info!("directory {:?} exists, removing", &datadir);
if let Err(e) = fs::remove_dir_all(&datadir) {
error!("could not remove old wal-redo-datadir: {:#}", e);
}
info!(
"old temporary datadir {} exists, removing",
datadir.display()
);
fs::remove_dir_all(&datadir)?;
}
info!("running initdb in {:?}", datadir.display());
info!("running initdb in {}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir().join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("-N")
@@ -591,7 +596,7 @@ impl PostgresRedoProcess {
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir())
.close_fds()
.output()
.map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {}", e)))?;
.map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?;
if !initdb.status.success() {
return Err(Error::new(
@@ -645,7 +650,7 @@ impl PostgresRedoProcess {
})?;
info!(
"launched WAL redo postgres process on {:?}",
"launched WAL redo postgres process on {}",
datadir.display()
);

View File

@@ -32,33 +32,34 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# Leave the first timeline alone, but corrupt the others in different ways
(tenant0, timeline0, pg0) = tenant_timelines[0]
log.info(f"Timeline {tenant0}/{timeline0} is left intact")
# Corrupt metadata file on timeline 1
(tenant1, timeline1, pg1) = tenant_timelines[1]
metadata_path = "{}/tenants/{}/timelines/{}/metadata".format(env.repo_dir, tenant1, timeline1)
print(f"overwriting metadata file at {metadata_path}")
metadata_path = f"{env.repo_dir}/tenants/{tenant1}/timelines/{timeline1}/metadata"
f = open(metadata_path, "w")
f.write("overwritten with garbage!")
f.close()
log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled")
# Missing layer files file on timeline 2. (This would actually work
# if we had Cloud Storage enabled in this test.)
(tenant2, timeline2, pg2) = tenant_timelines[2]
timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant2, timeline2)
timeline_path = f"{env.repo_dir}/tenants/{tenant2}/timelines/{timeline2}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Remove it
os.remove(f"{timeline_path}/{filename}")
log.info(
f"Timeline {tenant2}/{timeline2} got its layer files removed (no remote storage enabled)"
)
# Corrupt layer files file on timeline 3
(tenant3, timeline3, pg3) = tenant_timelines[3]
timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant3, timeline3)
timeline_path = f"{env.repo_dir}/tenants/{tenant3}/timelines/{timeline3}/"
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):
# Looks like a layer file. Corrupt it
f = open(f"{timeline_path}/{filename}", "w")
f.write("overwritten with garbage!")
f.close()
log.info(f"Timeline {tenant3}/{timeline3} got its layer files spoiled")
env.pageserver.start()
@@ -69,20 +70,28 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
# But all others are broken
# First timeline would not get loaded into pageserver due to corrupt metadata file
(_tenant, _timeline, pg) = tenant_timelines[1]
with pytest.raises(
Exception, match=f"Could not get timeline {timeline1} in tenant {tenant1}"
) as err:
pg.start()
pg1.start()
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")
# Second timeline has no ancestors, only the metadata file and no layer files
# We don't have the remote storage enabled, which means timeline is in an incorrect state,
# it's not loaded at all
with pytest.raises(
Exception, match=f"Could not get timeline {timeline2} in tenant {tenant2}"
) as err:
pg2.start()
log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}")
# Yet other timelines will fail when their layers will be queried during basebackup: we don't check layer file contents on startup, when loading the timeline
for n in range(2, 4):
(_tenant, _timeline, pg) = tenant_timelines[n]
for n in range(3, 4):
(bad_tenant, bad_timeline, pg) = tenant_timelines[n]
with pytest.raises(Exception, match="extracting base backup failed") as err:
pg.start()
log.info(
f"compute startup failed lazily for timeline with corrupt layers, during basebackup preparation: {err}"
f"compute startup failed lazily for timeline {bad_tenant}/{bad_timeline} with corrupt layers, during basebackup preparation: {err}"
)
@@ -107,6 +116,8 @@ def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv):
tenant_id, _ = env.neon_cli.create_tenant()
old_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
# Introduce failpoint when creating a new timeline
env.pageserver.safe_psql("failpoints before-checkpoint-new-timeline=return")
with pytest.raises(Exception, match="before-checkpoint-new-timeline"):
@@ -116,6 +127,8 @@ def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv):
env.neon_cli.pageserver_stop(immediate=True)
env.neon_cli.pageserver_start()
# Check that tenant with "broken" timeline is not loaded.
with pytest.raises(Exception, match=f"Failed to get repo for tenant {tenant_id}"):
env.neon_cli.list_timelines(tenant_id)
# Creating the timeline didn't finish. The other timelines on tenant should still be present and work normally.
new_tenant_timelines = env.neon_cli.list_timelines(tenant_id)
assert (
new_tenant_timelines == old_tenant_timelines
), f"Pageserver after restart should ignore non-initialized timelines for tenant {tenant_id}"