mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 18:10:37 +00:00
Co-authored-by: andres <andres.rodriguez@outlook.es>
This commit is contained in:
@@ -119,32 +119,6 @@ impl<T> TenantTimelineValues<T> {
|
||||
fn new() -> Self {
|
||||
Self(HashMap::new())
|
||||
}
|
||||
|
||||
fn with_capacity(capacity: usize) -> Self {
|
||||
Self(HashMap::with_capacity(capacity))
|
||||
}
|
||||
|
||||
/// A convenience method to map certain values and omit some of them, if needed.
|
||||
/// Tenants that won't have any timeline entries due to the filtering, will still be preserved
|
||||
/// in the structure.
|
||||
fn filter_map<F, NewT>(self, map: F) -> TenantTimelineValues<NewT>
|
||||
where
|
||||
F: Fn(T) -> Option<NewT>,
|
||||
{
|
||||
let capacity = self.0.len();
|
||||
self.0.into_iter().fold(
|
||||
TenantTimelineValues::<NewT>::with_capacity(capacity),
|
||||
|mut new_values, (tenant_id, old_values)| {
|
||||
let new_timeline_values = new_values.0.entry(tenant_id).or_default();
|
||||
for (timeline_id, old_value) in old_values {
|
||||
if let Some(new_value) = map(old_value) {
|
||||
new_timeline_values.insert(timeline_id, new_value);
|
||||
}
|
||||
}
|
||||
new_values
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// A suffix to be used during file sync from the remote storage,
|
||||
@@ -181,35 +155,3 @@ mod backoff_defaults_tests {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::tenant::harness::TIMELINE_ID;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn tenant_timeline_value_mapping() {
|
||||
let first_tenant = TenantId::generate();
|
||||
let second_tenant = TenantId::generate();
|
||||
assert_ne!(first_tenant, second_tenant);
|
||||
|
||||
let mut initial = TenantTimelineValues::new();
|
||||
initial
|
||||
.0
|
||||
.entry(first_tenant)
|
||||
.or_default()
|
||||
.insert(TIMELINE_ID, "test_value");
|
||||
let _ = initial.0.entry(second_tenant).or_default();
|
||||
assert_eq!(initial.0.len(), 2, "Should have entries for both tenants");
|
||||
|
||||
let filtered = initial.filter_map(|_| None::<&str>).0;
|
||||
assert_eq!(
|
||||
filtered.len(),
|
||||
2,
|
||||
"Should have entries for both tenants even after filtering away all entries"
|
||||
);
|
||||
assert!(filtered.contains_key(&first_tenant));
|
||||
assert!(filtered.contains_key(&second_tenant));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -169,9 +169,14 @@ use self::{
|
||||
upload::{upload_index_part, upload_timeline_layers, UploadedTimeline},
|
||||
};
|
||||
use crate::{
|
||||
config::PageServerConf, exponential_backoff, storage_sync::index::RemoteIndex, task_mgr,
|
||||
task_mgr::TaskKind, task_mgr::BACKGROUND_RUNTIME, tenant::metadata::TimelineMetadata,
|
||||
tenant_mgr::attach_local_tenants,
|
||||
config::PageServerConf,
|
||||
exponential_backoff,
|
||||
storage_sync::index::RemoteIndex,
|
||||
task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
task_mgr::BACKGROUND_RUNTIME,
|
||||
tenant::metadata::TimelineMetadata,
|
||||
tenant_mgr::{attach_local_tenants, TenantAttachData},
|
||||
};
|
||||
use crate::{
|
||||
metrics::{IMAGE_SYNC_TIME, REMAINING_SYNC_ITEMS, REMOTE_INDEX_UPLOAD},
|
||||
@@ -572,7 +577,10 @@ pub fn schedule_layer_download(tenant_id: TenantId, timeline_id: TimelineId) {
|
||||
/// See module docs for loop step description.
|
||||
pub fn spawn_storage_sync_task(
|
||||
conf: &'static PageServerConf,
|
||||
local_timeline_files: TenantTimelineValues<(TimelineMetadata, HashSet<PathBuf>)>,
|
||||
local_timeline_files: HashMap<
|
||||
TenantId,
|
||||
HashMap<TimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
|
||||
>,
|
||||
storage: GenericRemoteStorage,
|
||||
max_concurrent_timelines_sync: NonZeroUsize,
|
||||
max_sync_errors: NonZeroU32,
|
||||
@@ -595,7 +603,7 @@ pub fn spawn_storage_sync_task(
|
||||
let mut keys_for_index_part_downloads = HashSet::new();
|
||||
let mut timelines_to_sync = HashMap::new();
|
||||
|
||||
for (tenant_id, timeline_data) in local_timeline_files.0 {
|
||||
for (tenant_id, timeline_data) in local_timeline_files {
|
||||
if timeline_data.is_empty() {
|
||||
info!("got empty tenant {}", tenant_id);
|
||||
let _ = empty_tenants.0.entry(tenant_id).or_default();
|
||||
@@ -698,7 +706,7 @@ async fn storage_sync_loop(
|
||||
"Sync loop step completed, {} new tenant state update(s)",
|
||||
updated_tenants.len()
|
||||
);
|
||||
let mut timelines_to_attach = TenantTimelineValues::new();
|
||||
let mut timelines_to_attach = HashMap::new();
|
||||
let index_accessor = index.read().await;
|
||||
for tenant_id in updated_tenants {
|
||||
let tenant_entry = match index_accessor.tenant_entry(&tenant_id) {
|
||||
@@ -724,12 +732,16 @@ async fn storage_sync_loop(
|
||||
// and register them all at once in a tenant for download
|
||||
// to be submitted in a single operation to tenant
|
||||
// so it can apply them at once to internal timeline map.
|
||||
timelines_to_attach.0.insert(
|
||||
timelines_to_attach.insert(
|
||||
tenant_id,
|
||||
tenant_entry
|
||||
.iter()
|
||||
.map(|(&id, entry)| (id, entry.metadata.clone()))
|
||||
.collect(),
|
||||
TenantAttachData::Ready(
|
||||
tenant_entry
|
||||
.iter()
|
||||
.map(|(&id, entry)| {
|
||||
(id, (entry.metadata.clone(), HashSet::new()))
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,7 +22,7 @@ use crate::tenant::{
|
||||
};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::{TenantTimelineValues, TEMP_FILE_SUFFIX};
|
||||
use crate::TEMP_FILE_SUFFIX;
|
||||
|
||||
use utils::crashsafe_dir::{self, path_with_suffix_extension};
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -70,34 +70,54 @@ pub fn init_tenant_mgr(
|
||||
.remote_storage_config
|
||||
.as_ref()
|
||||
.expect("remote storage without config");
|
||||
|
||||
let mut broken_tenants = HashMap::new();
|
||||
let mut ready_tenants = HashMap::new();
|
||||
for (tenant_id, tenant_attach_data) in local_tenant_files.into_iter() {
|
||||
match tenant_attach_data {
|
||||
TenantAttachData::Ready(t) => {
|
||||
ready_tenants.insert(tenant_id, t);
|
||||
}
|
||||
TenantAttachData::Broken(e) => {
|
||||
broken_tenants.insert(tenant_id, TenantAttachData::Broken(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
let SyncStartupData {
|
||||
remote_index,
|
||||
local_timeline_init_statuses,
|
||||
} = storage_sync::spawn_storage_sync_task(
|
||||
conf,
|
||||
local_tenant_files,
|
||||
ready_tenants,
|
||||
storage,
|
||||
storage_config.max_concurrent_syncs,
|
||||
storage_config.max_sync_errors,
|
||||
)
|
||||
.context("Failed to spawn the storage sync thread")?;
|
||||
|
||||
(
|
||||
remote_index,
|
||||
local_timeline_init_statuses.filter_map(|init_status| match init_status {
|
||||
LocalTimelineInitStatus::LocallyComplete(metadata) => Some(metadata),
|
||||
LocalTimelineInitStatus::NeedsSync => None,
|
||||
}),
|
||||
)
|
||||
let n = local_timeline_init_statuses.0.len();
|
||||
let mut synced_timelines = local_timeline_init_statuses.0.into_iter().fold(
|
||||
HashMap::<TenantId, TenantAttachData>::with_capacity(n),
|
||||
|mut new_values, (tenant_id, old_values)| {
|
||||
let new_timeline_values = new_values
|
||||
.entry(tenant_id)
|
||||
.or_insert_with(|| TenantAttachData::Ready(HashMap::new()));
|
||||
if let TenantAttachData::Ready(t) = new_timeline_values {
|
||||
for (timeline_id, old_value) in old_values {
|
||||
if let LocalTimelineInitStatus::LocallyComplete(metadata) = old_value {
|
||||
t.insert(timeline_id, (metadata, HashSet::new()));
|
||||
}
|
||||
}
|
||||
}
|
||||
new_values
|
||||
},
|
||||
);
|
||||
synced_timelines.extend(broken_tenants);
|
||||
|
||||
(remote_index, synced_timelines)
|
||||
} else {
|
||||
info!("No remote storage configured, skipping storage sync, considering all local timelines with correct metadata files enabled");
|
||||
(
|
||||
RemoteIndex::default(),
|
||||
local_tenant_files.filter_map(|(metadata, _)| Some(metadata)),
|
||||
)
|
||||
(RemoteIndex::default(), local_tenant_files)
|
||||
};
|
||||
|
||||
attach_local_tenants(conf, &remote_index, tenants_to_attach);
|
||||
|
||||
Ok(remote_index)
|
||||
@@ -117,18 +137,12 @@ pub fn init_tenant_mgr(
|
||||
pub fn attach_local_tenants(
|
||||
conf: &'static PageServerConf,
|
||||
remote_index: &RemoteIndex,
|
||||
tenants_to_attach: TenantTimelineValues<TimelineMetadata>,
|
||||
tenants_to_attach: HashMap<TenantId, TenantAttachData>,
|
||||
) {
|
||||
let _entered = info_span!("attach_local_tenants").entered();
|
||||
let number_of_tenants = tenants_to_attach.0.len();
|
||||
|
||||
for (tenant_id, local_timelines) in tenants_to_attach.0 {
|
||||
info!(
|
||||
"Attaching {} timelines for {tenant_id}",
|
||||
local_timelines.len()
|
||||
);
|
||||
debug!("Timelines to attach: {local_timelines:?}");
|
||||
let number_of_tenants = tenants_to_attach.len();
|
||||
|
||||
for (tenant_id, local_timelines) in tenants_to_attach {
|
||||
let mut tenants_accessor = tenants_state::write_tenants();
|
||||
let tenant = match tenants_accessor.entry(tenant_id) {
|
||||
hash_map::Entry::Occupied(o) => {
|
||||
@@ -137,25 +151,55 @@ pub fn attach_local_tenants(
|
||||
}
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
info!("Tenant {tenant_id} was not found in pageserver's memory, loading it");
|
||||
let tenant = load_local_tenant(conf, tenant_id, remote_index);
|
||||
let tenant = Arc::new(Tenant::new(
|
||||
conf,
|
||||
TenantConfOpt::default(),
|
||||
Arc::new(PostgresRedoManager::new(conf, tenant_id)),
|
||||
tenant_id,
|
||||
remote_index.clone(),
|
||||
conf.remote_storage_config.is_some(),
|
||||
));
|
||||
match local_timelines {
|
||||
TenantAttachData::Broken(_) => {
|
||||
tenant.set_state(TenantState::Broken);
|
||||
}
|
||||
TenantAttachData::Ready(_) => {
|
||||
match Tenant::load_tenant_config(conf, tenant_id) {
|
||||
Ok(tenant_conf) => {
|
||||
tenant.update_tenant_config(tenant_conf);
|
||||
tenant.activate(false);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
|
||||
tenant.set_state(TenantState::Broken);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
v.insert(Arc::clone(&tenant));
|
||||
tenant
|
||||
}
|
||||
};
|
||||
drop(tenants_accessor);
|
||||
|
||||
if tenant.current_state() == TenantState::Broken {
|
||||
warn!("Skipping timeline load for broken tenant {tenant_id}")
|
||||
} else {
|
||||
let has_timelines = !local_timelines.is_empty();
|
||||
match tenant.init_attach_timelines(local_timelines) {
|
||||
Ok(()) => {
|
||||
info!("successfully loaded local timelines for tenant {tenant_id}");
|
||||
tenant.activate(has_timelines);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to attach tenant timelines: {e:?}");
|
||||
tenant.set_state(TenantState::Broken);
|
||||
match local_timelines {
|
||||
TenantAttachData::Broken(e) => warn!("{}", e),
|
||||
TenantAttachData::Ready(ref timelines) => {
|
||||
info!("Attaching {} timelines for {tenant_id}", timelines.len());
|
||||
debug!("Timelines to attach: {local_timelines:?}");
|
||||
let has_timelines = !timelines.is_empty();
|
||||
let timelines_to_attach = timelines
|
||||
.iter()
|
||||
.map(|(&k, (v, _))| (k, v.clone()))
|
||||
.collect();
|
||||
match tenant.init_attach_timelines(timelines_to_attach) {
|
||||
Ok(()) => {
|
||||
info!("successfully loaded local timelines for tenant {tenant_id}");
|
||||
tenant.activate(has_timelines);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to attach tenant timelines: {e:?}");
|
||||
tenant.set_state(TenantState::Broken);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -164,44 +208,6 @@ pub fn attach_local_tenants(
|
||||
info!("Processed {number_of_tenants} local tenants during attach")
|
||||
}
|
||||
|
||||
fn load_local_tenant(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
remote_index: &RemoteIndex,
|
||||
) -> Arc<Tenant> {
|
||||
let tenant = Arc::new(Tenant::new(
|
||||
conf,
|
||||
TenantConfOpt::default(),
|
||||
Arc::new(PostgresRedoManager::new(conf, tenant_id)),
|
||||
tenant_id,
|
||||
remote_index.clone(),
|
||||
conf.remote_storage_config.is_some(),
|
||||
));
|
||||
|
||||
let tenant_timelines_dir = conf.timelines_path(&tenant_id);
|
||||
if !tenant_timelines_dir.is_dir() {
|
||||
error!(
|
||||
"Tenant {} has no timelines directory at {}",
|
||||
tenant_id,
|
||||
tenant_timelines_dir.display()
|
||||
);
|
||||
tenant.set_state(TenantState::Broken);
|
||||
} else {
|
||||
match Tenant::load_tenant_config(conf, tenant_id) {
|
||||
Ok(tenant_conf) => {
|
||||
tenant.update_tenant_config(tenant_conf);
|
||||
tenant.activate(false);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
|
||||
tenant.set_state(TenantState::Broken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tenant
|
||||
}
|
||||
|
||||
///
|
||||
/// Shut down all tenants. This runs as part of pageserver shutdown.
|
||||
///
|
||||
@@ -475,16 +481,21 @@ pub fn list_tenant_info(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum TenantAttachData {
|
||||
Ready(HashMap<TimelineId, (TimelineMetadata, HashSet<PathBuf>)>),
|
||||
Broken(anyhow::Error),
|
||||
}
|
||||
/// Attempts to collect information about all tenant and timelines, existing on the local FS.
|
||||
/// If finds any, deletes all temporary files and directories, created before. Also removes empty directories,
|
||||
/// that may appear due to such removals.
|
||||
/// Does not fail on particular timeline or tenant collection errors, rather logging them and ignoring the entities.
|
||||
fn local_tenant_timeline_files(
|
||||
config: &'static PageServerConf,
|
||||
) -> anyhow::Result<TenantTimelineValues<(TimelineMetadata, HashSet<PathBuf>)>> {
|
||||
) -> anyhow::Result<HashMap<TenantId, TenantAttachData>> {
|
||||
let _entered = info_span!("local_tenant_timeline_files").entered();
|
||||
|
||||
let mut local_tenant_timeline_files = TenantTimelineValues::new();
|
||||
let mut local_tenant_timeline_files = HashMap::new();
|
||||
let tenants_dir = config.tenants_path();
|
||||
for tenants_dir_entry in fs::read_dir(&tenants_dir)
|
||||
.with_context(|| format!("Failed to list tenants dir {}", tenants_dir.display()))?
|
||||
@@ -506,19 +517,31 @@ fn local_tenant_timeline_files(
|
||||
}
|
||||
} else {
|
||||
match collect_timelines_for_tenant(config, &tenant_dir_path) {
|
||||
Ok((tenant_id, collected_files)) => {
|
||||
Ok((tenant_id, TenantAttachData::Broken(e))) => {
|
||||
local_tenant_timeline_files.entry(tenant_id).or_insert(TenantAttachData::Broken(e));
|
||||
},
|
||||
Ok((tenant_id, TenantAttachData::Ready(collected_files))) => {
|
||||
if collected_files.is_empty() {
|
||||
match remove_if_empty(&tenant_dir_path) {
|
||||
Ok(true) => info!("Removed empty tenant directory {}", tenant_dir_path.display()),
|
||||
Ok(false) => {
|
||||
// insert empty timeline entry: it has some non-temporary files inside that we cannot remove
|
||||
// so make obvious for HTTP API callers, that something exists there and try to load the tenant
|
||||
let _ = local_tenant_timeline_files.0.entry(tenant_id).or_default();
|
||||
let _ = local_tenant_timeline_files.entry(tenant_id).or_insert_with(|| TenantAttachData::Ready(HashMap::new()));
|
||||
},
|
||||
Err(e) => error!("Failed to remove empty tenant directory: {e:?}"),
|
||||
}
|
||||
} else {
|
||||
local_tenant_timeline_files.0.entry(tenant_id).or_default().extend(collected_files.into_iter())
|
||||
match local_tenant_timeline_files.entry(tenant_id) {
|
||||
hash_map::Entry::Vacant(entry) => {
|
||||
entry.insert(TenantAttachData::Ready(collected_files));
|
||||
}
|
||||
hash_map::Entry::Occupied(entry) =>{
|
||||
if let TenantAttachData::Ready(old_timelines) = entry.into_mut() {
|
||||
old_timelines.extend(collected_files);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => error!(
|
||||
@@ -541,7 +564,7 @@ fn local_tenant_timeline_files(
|
||||
|
||||
info!(
|
||||
"Collected files for {} tenants",
|
||||
local_tenant_timeline_files.0.len()
|
||||
local_tenant_timeline_files.len(),
|
||||
);
|
||||
Ok(local_tenant_timeline_files)
|
||||
}
|
||||
@@ -583,10 +606,7 @@ fn is_temporary(path: &Path) -> bool {
|
||||
fn collect_timelines_for_tenant(
|
||||
config: &'static PageServerConf,
|
||||
tenant_path: &Path,
|
||||
) -> anyhow::Result<(
|
||||
TenantId,
|
||||
HashMap<TimelineId, (TimelineMetadata, HashSet<PathBuf>)>,
|
||||
)> {
|
||||
) -> anyhow::Result<(TenantId, TenantAttachData)> {
|
||||
let tenant_id = tenant_path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
@@ -595,6 +615,17 @@ fn collect_timelines_for_tenant(
|
||||
.context("Could not parse tenant id out of the tenant dir name")?;
|
||||
let timelines_dir = config.timelines_path(&tenant_id);
|
||||
|
||||
if !timelines_dir.as_path().is_dir() {
|
||||
return Ok((
|
||||
tenant_id,
|
||||
TenantAttachData::Broken(anyhow::anyhow!(
|
||||
"Tenant {} has no timelines directory at {}",
|
||||
tenant_id,
|
||||
timelines_dir.display()
|
||||
)),
|
||||
));
|
||||
}
|
||||
|
||||
let mut tenant_timelines = HashMap::new();
|
||||
for timelines_dir_entry in fs::read_dir(&timelines_dir)
|
||||
.with_context(|| format!("Failed to list timelines dir entry for tenant {tenant_id}"))?
|
||||
@@ -652,7 +683,7 @@ fn collect_timelines_for_tenant(
|
||||
debug!("Tenant {tenant_id} has no timelines loaded");
|
||||
}
|
||||
|
||||
Ok((tenant_id, tenant_timelines))
|
||||
Ok((tenant_id, TenantAttachData::Ready(tenant_timelines)))
|
||||
}
|
||||
|
||||
// discover timeline files and extract timeline metadata
|
||||
|
||||
@@ -258,11 +258,20 @@ def test_pageserver_with_empty_tenants(
|
||||
tenants = client.tenant_list()
|
||||
|
||||
assert (
|
||||
len(tenants) == 1
|
||||
), "Pageserver should attach only tenants with empty timelines/ dir on restart"
|
||||
loaded_tenant = tenants[0]
|
||||
assert loaded_tenant["id"] == str(
|
||||
tenant_with_empty_timelines_dir
|
||||
len(tenants) == 2
|
||||
), "Pageserver should attach only tenants with empty or not existing timelines/ dir on restart"
|
||||
|
||||
[broken_tenant] = [t for t in tenants if t["id"] == str(tenant_without_timelines_dir)]
|
||||
assert (
|
||||
broken_tenant
|
||||
), f"A broken tenant {tenant_without_timelines_dir} should exists in the tenant list"
|
||||
assert (
|
||||
broken_tenant["state"] == "Broken"
|
||||
), f"Tenant {tenant_without_timelines_dir} without timelines dir should be broken"
|
||||
|
||||
[loaded_tenant] = [t for t in tenants if t["id"] == str(tenant_with_empty_timelines_dir)]
|
||||
assert (
|
||||
loaded_tenant
|
||||
), f"Tenant {tenant_with_empty_timelines_dir} should be loaded as the only one with tenants/ directory"
|
||||
assert loaded_tenant["state"] == {
|
||||
"Active": {"background_jobs_running": False}
|
||||
|
||||
Reference in New Issue
Block a user