mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
The `test_storcon_create_delete_sk_down` test is still flaky. This test addresses two possible causes for flakiness. both causes are related to deletion racing with `pull_timeline` which hasn't finished yet. * the first cause is timeline deletion racing with `pull_timeline`: * the first deletion attempt doesn't contain the line because the timeline doesn't exist yet * the subsequent deletion attempts don't contain it either, only a note that the timeline is already deleted. * so this patch adds the note that the timeline is already deleted to the regex * the second cause is about tenant deletion racing with `pull_timeline`: * there were no tenant specific tombstones so if a tenant was deleted, we only added tombstones for the specific timelines being deleted, not for the tenant itself. * This patch changes this, so we now have tenant specific tombstones as well as timeline specific ones, and creation of a timeline checks both. * we also don't see any retries of the tenant deletion in the logs. once it's done it's done. so extend the regex to contain the tenant deletion message as well. One could wonder why the regex and why not using the API to check whether the timeline is just "gone". The issue with the API is that it doesn't allow one to distinguish between "deleted" and "has never existed", and latter case might race with `pull_timeline`. I.e. the second case flakiness helped in the discovery of a real bug (no tenant tombstones), so the more precise check was helpful. Before, I could easily reproduce 2-9 occurences of flakiness when running the test with an additional `range(128)` parameter (i.e. 218 times 4 times). With this patch, I ran it three times, not a single failure. Fixes #11838
685 lines
26 KiB
Rust
685 lines
26 KiB
Rust
//! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
|
|
//! All timelines should always be present in this map, this is done by loading them
|
|
//! all from the disk on startup and keeping them in memory.
|
|
|
|
use std::collections::HashMap;
|
|
use std::str::FromStr;
|
|
use std::sync::{Arc, Mutex};
|
|
use std::time::{Duration, Instant};
|
|
|
|
use anyhow::{Context, Result, bail};
|
|
use camino::Utf8PathBuf;
|
|
use camino_tempfile::Utf8TempDir;
|
|
use safekeeper_api::membership::Configuration;
|
|
use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
|
|
use safekeeper_api::{ServerInfo, membership};
|
|
use tokio::fs;
|
|
use tracing::*;
|
|
use utils::crashsafe::{durable_rename, fsync_async_opt};
|
|
use utils::id::{TenantId, TenantTimelineId, TimelineId};
|
|
use utils::lsn::Lsn;
|
|
|
|
use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
|
|
use crate::http::routes::DeleteOrExcludeError;
|
|
use crate::rate_limit::RateLimiter;
|
|
use crate::state::TimelinePersistentState;
|
|
use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
|
|
use crate::timelines_set::TimelinesSet;
|
|
use crate::wal_backup::WalBackup;
|
|
use crate::wal_storage::Storage;
|
|
use crate::{SafeKeeperConf, control_file, wal_storage};
|
|
|
|
// Timeline entry in the global map: either a ready timeline, or mark that it is
|
|
// being created.
|
|
#[derive(Clone)]
|
|
enum GlobalMapTimeline {
|
|
CreationInProgress,
|
|
Timeline(Arc<Timeline>),
|
|
}
|
|
|
|
struct GlobalTimelinesState {
|
|
timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
|
|
|
|
// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
|
|
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
|
// this map is dropped on restart.
|
|
tombstones: HashMap<TenantTimelineId, Instant>,
|
|
tenant_tombstones: HashMap<TenantId, Instant>,
|
|
|
|
conf: Arc<SafeKeeperConf>,
|
|
broker_active_set: Arc<TimelinesSet>,
|
|
global_rate_limiter: RateLimiter,
|
|
wal_backup: Arc<WalBackup>,
|
|
}
|
|
|
|
impl GlobalTimelinesState {
|
|
/// Get dependencies for a timeline constructor.
|
|
fn get_dependencies(
|
|
&self,
|
|
) -> (
|
|
Arc<SafeKeeperConf>,
|
|
Arc<TimelinesSet>,
|
|
RateLimiter,
|
|
Arc<WalBackup>,
|
|
) {
|
|
(
|
|
self.conf.clone(),
|
|
self.broker_active_set.clone(),
|
|
self.global_rate_limiter.clone(),
|
|
self.wal_backup.clone(),
|
|
)
|
|
}
|
|
|
|
/// Get timeline from the map. Returns error if timeline doesn't exist or
|
|
/// creation is in progress.
|
|
fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
|
match self.timelines.get(ttid).cloned() {
|
|
Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
|
|
Some(GlobalMapTimeline::CreationInProgress) => {
|
|
Err(TimelineError::CreationInProgress(*ttid))
|
|
}
|
|
None => Err(TimelineError::NotFound(*ttid)),
|
|
}
|
|
}
|
|
|
|
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
|
|
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
|
|
}
|
|
|
|
/// Removes all blocking tombstones for the given timeline ID.
|
|
/// Returns `true` if there have been actual changes.
|
|
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
|
|
self.tombstones.remove(ttid).is_some()
|
|
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
|
|
}
|
|
|
|
fn delete(&mut self, ttid: TenantTimelineId) {
|
|
self.timelines.remove(&ttid);
|
|
self.tombstones.insert(ttid, Instant::now());
|
|
}
|
|
|
|
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
|
|
self.tenant_tombstones.insert(tenant_id, Instant::now());
|
|
}
|
|
}
|
|
|
|
/// A struct used to manage access to the global timelines map.
|
|
pub struct GlobalTimelines {
|
|
state: Mutex<GlobalTimelinesState>,
|
|
}
|
|
|
|
impl GlobalTimelines {
|
|
/// Create a new instance of the global timelines map.
|
|
pub fn new(conf: Arc<SafeKeeperConf>, wal_backup: Arc<WalBackup>) -> Self {
|
|
Self {
|
|
state: Mutex::new(GlobalTimelinesState {
|
|
timelines: HashMap::new(),
|
|
tombstones: HashMap::new(),
|
|
tenant_tombstones: HashMap::new(),
|
|
conf,
|
|
broker_active_set: Arc::new(TimelinesSet::default()),
|
|
global_rate_limiter: RateLimiter::new(1, 1),
|
|
wal_backup,
|
|
}),
|
|
}
|
|
}
|
|
|
|
/// Inject dependencies needed for the timeline constructors and load all timelines to memory.
|
|
pub async fn init(&self) -> Result<()> {
|
|
// clippy isn't smart enough to understand that drop(state) releases the
|
|
// lock, so use explicit block
|
|
let tenants_dir = {
|
|
let mut state = self.state.lock().unwrap();
|
|
state.global_rate_limiter = RateLimiter::new(
|
|
state.conf.partial_backup_concurrency,
|
|
DEFAULT_EVICTION_CONCURRENCY,
|
|
);
|
|
|
|
// Iterate through all directories and load tenants for all directories
|
|
// named as a valid tenant_id.
|
|
state.conf.workdir.clone()
|
|
};
|
|
let mut tenant_count = 0;
|
|
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
|
|
.with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
|
|
{
|
|
match &tenants_dir_entry {
|
|
Ok(tenants_dir_entry) => {
|
|
if let Ok(tenant_id) =
|
|
TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
|
|
{
|
|
tenant_count += 1;
|
|
self.load_tenant_timelines(tenant_id).await?;
|
|
}
|
|
}
|
|
Err(e) => error!(
|
|
"failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
|
|
tenants_dir_entry, tenants_dir, e
|
|
),
|
|
}
|
|
}
|
|
|
|
info!(
|
|
"found {} tenants directories, successfully loaded {} timelines",
|
|
tenant_count,
|
|
self.state.lock().unwrap().timelines.len()
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Loads all timelines for the given tenant to memory. Returns fs::read_dir
|
|
/// errors if any.
|
|
///
|
|
/// It is async, but self.state lock is sync and there is no important
|
|
/// reason to make it async (it is always held for a short while), so we
|
|
/// just lock and unlock it for each timeline -- this function is called
|
|
/// during init when nothing else is running, so this is fine.
|
|
async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
|
|
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
|
|
let state = self.state.lock().unwrap();
|
|
state.get_dependencies()
|
|
};
|
|
|
|
let timelines_dir = get_tenant_dir(&conf, &tenant_id);
|
|
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
|
|
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
|
|
{
|
|
match &timelines_dir_entry {
|
|
Ok(timeline_dir_entry) => {
|
|
if let Ok(timeline_id) =
|
|
TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
|
|
{
|
|
let ttid = TenantTimelineId::new(tenant_id, timeline_id);
|
|
match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) {
|
|
Ok(tli) => {
|
|
let mut shared_state = tli.write_shared_state().await;
|
|
self.state
|
|
.lock()
|
|
.unwrap()
|
|
.timelines
|
|
.insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
|
|
tli.bootstrap(
|
|
&mut shared_state,
|
|
&conf,
|
|
broker_active_set.clone(),
|
|
partial_backup_rate_limiter.clone(),
|
|
wal_backup.clone(),
|
|
);
|
|
}
|
|
// If we can't load a timeline, it's most likely because of a corrupted
|
|
// directory. We will log an error and won't allow to delete/recreate
|
|
// this timeline. The only way to fix this timeline is to repair manually
|
|
// and restart the safekeeper.
|
|
Err(e) => error!(
|
|
"failed to load timeline {} for tenant {}, reason: {:?}",
|
|
timeline_id, tenant_id, e
|
|
),
|
|
}
|
|
}
|
|
}
|
|
Err(e) => error!(
|
|
"failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
|
|
timelines_dir_entry, timelines_dir, e
|
|
),
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Get the number of timelines in the map.
|
|
pub fn timelines_count(&self) -> usize {
|
|
self.state.lock().unwrap().timelines.len()
|
|
}
|
|
|
|
/// Get the global safekeeper config.
|
|
pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
|
|
self.state.lock().unwrap().conf.clone()
|
|
}
|
|
|
|
pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
|
|
self.state.lock().unwrap().broker_active_set.clone()
|
|
}
|
|
|
|
pub fn get_wal_backup(&self) -> Arc<WalBackup> {
|
|
self.state.lock().unwrap().wal_backup.clone()
|
|
}
|
|
|
|
/// Create a new timeline with the given id. If the timeline already exists, returns
|
|
/// an existing timeline.
|
|
pub(crate) async fn create(
|
|
&self,
|
|
ttid: TenantTimelineId,
|
|
mconf: Configuration,
|
|
server_info: ServerInfo,
|
|
start_lsn: Lsn,
|
|
commit_lsn: Lsn,
|
|
) -> Result<Arc<Timeline>> {
|
|
let (conf, _, _, _) = {
|
|
let state = self.state.lock().unwrap();
|
|
if let Ok(timeline) = state.get(&ttid) {
|
|
// Timeline already exists, return it.
|
|
return Ok(timeline);
|
|
}
|
|
|
|
if state.has_tombstone(&ttid) {
|
|
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
|
|
}
|
|
|
|
state.get_dependencies()
|
|
};
|
|
|
|
info!("creating new timeline {}", ttid);
|
|
|
|
// Do on disk initialization in tmp dir.
|
|
let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
|
|
|
|
// TODO: currently we create only cfile. It would be reasonable to
|
|
// immediately initialize first WAL segment as well.
|
|
let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
|
|
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
|
|
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
|
|
Ok(timeline)
|
|
}
|
|
|
|
/// Move timeline from a temp directory to the main storage, and load it to
|
|
/// the global map. Creating timeline in this way ensures atomicity: rename
|
|
/// is atomic, so either move of the whole datadir succeeds or it doesn't,
|
|
/// but corrupted data dir shouldn't be possible.
|
|
///
|
|
/// We'd like to avoid holding map lock while doing IO, so it's a 3 step
|
|
/// process:
|
|
/// 1) check the global map that timeline doesn't exist and mark that we're
|
|
/// creating it;
|
|
/// 2) move the directory and load the timeline
|
|
/// 3) take lock again and insert the timeline into the global map.
|
|
pub async fn load_temp_timeline(
|
|
&self,
|
|
ttid: TenantTimelineId,
|
|
tmp_path: &Utf8PathBuf,
|
|
check_tombstone: bool,
|
|
) -> Result<Arc<Timeline>> {
|
|
// Check for existence and mark that we're creating it.
|
|
let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
|
|
let mut state = self.state.lock().unwrap();
|
|
match state.timelines.get(&ttid) {
|
|
Some(GlobalMapTimeline::CreationInProgress) => {
|
|
bail!(TimelineError::CreationInProgress(ttid));
|
|
}
|
|
Some(GlobalMapTimeline::Timeline(_)) => {
|
|
bail!(TimelineError::AlreadyExists(ttid));
|
|
}
|
|
_ => {}
|
|
}
|
|
if check_tombstone {
|
|
if state.has_tombstone(&ttid) {
|
|
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
|
|
}
|
|
} else {
|
|
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
|
|
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
|
|
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
|
|
if state.remove_tombstone(&ttid) {
|
|
warn!("un-deleted timeline {ttid}");
|
|
}
|
|
}
|
|
state
|
|
.timelines
|
|
.insert(ttid, GlobalMapTimeline::CreationInProgress);
|
|
state.get_dependencies()
|
|
};
|
|
|
|
// Do the actual move and reflect the result in the map.
|
|
match GlobalTimelines::install_temp_timeline(
|
|
ttid,
|
|
tmp_path,
|
|
conf.clone(),
|
|
wal_backup.clone(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(timeline) => {
|
|
let mut timeline_shared_state = timeline.write_shared_state().await;
|
|
let mut state = self.state.lock().unwrap();
|
|
assert!(matches!(
|
|
state.timelines.get(&ttid),
|
|
Some(GlobalMapTimeline::CreationInProgress)
|
|
));
|
|
|
|
state
|
|
.timelines
|
|
.insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
|
|
drop(state);
|
|
timeline.bootstrap(
|
|
&mut timeline_shared_state,
|
|
&conf,
|
|
broker_active_set,
|
|
partial_backup_rate_limiter,
|
|
wal_backup,
|
|
);
|
|
drop(timeline_shared_state);
|
|
Ok(timeline)
|
|
}
|
|
Err(e) => {
|
|
// Init failed, remove the marker from the map
|
|
let mut state = self.state.lock().unwrap();
|
|
assert!(matches!(
|
|
state.timelines.get(&ttid),
|
|
Some(GlobalMapTimeline::CreationInProgress)
|
|
));
|
|
state.timelines.remove(&ttid);
|
|
Err(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Main part of load_temp_timeline: do the move and load.
|
|
async fn install_temp_timeline(
|
|
ttid: TenantTimelineId,
|
|
tmp_path: &Utf8PathBuf,
|
|
conf: Arc<SafeKeeperConf>,
|
|
wal_backup: Arc<WalBackup>,
|
|
) -> Result<Arc<Timeline>> {
|
|
let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
|
|
let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
|
|
|
|
// We must have already checked that timeline doesn't exist in the map,
|
|
// but there might be existing datadir: if timeline is corrupted it is
|
|
// not loaded. We don't want to overwrite such a dir, so check for its
|
|
// existence.
|
|
match fs::metadata(&timeline_path).await {
|
|
Ok(_) => {
|
|
// Timeline directory exists on disk, we should leave state unchanged
|
|
// and return error.
|
|
bail!(TimelineError::Invalid(ttid));
|
|
}
|
|
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
|
Err(e) => {
|
|
return Err(e.into());
|
|
}
|
|
}
|
|
|
|
info!(
|
|
"moving timeline {} from {} to {}",
|
|
ttid, tmp_path, timeline_path
|
|
);
|
|
|
|
// Now it is safe to move the timeline directory to the correct
|
|
// location. First, create tenant directory. Ignore error if it already
|
|
// exists.
|
|
if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
|
|
if e.kind() != std::io::ErrorKind::AlreadyExists {
|
|
return Err(e.into());
|
|
}
|
|
}
|
|
// fsync it
|
|
fsync_async_opt(&tenant_path, !conf.no_sync).await?;
|
|
// and its creation
|
|
fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
|
|
|
|
// Do the move.
|
|
durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
|
|
|
|
Timeline::load_timeline(conf, ttid, wal_backup)
|
|
}
|
|
|
|
/// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
|
|
/// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
|
|
/// i.e. loaded in memory and not cancelled.
|
|
pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
|
|
let tli_res = {
|
|
let state = self.state.lock().unwrap();
|
|
state.get(&ttid)
|
|
};
|
|
match tli_res {
|
|
Ok(tli) => {
|
|
if tli.is_cancelled() {
|
|
return Err(TimelineError::Cancelled(ttid));
|
|
}
|
|
Ok(tli)
|
|
}
|
|
_ => tli_res,
|
|
}
|
|
}
|
|
|
|
/// Returns all timelines. This is used for background timeline processes.
|
|
pub fn get_all(&self) -> Vec<Arc<Timeline>> {
|
|
let global_lock = self.state.lock().unwrap();
|
|
global_lock
|
|
.timelines
|
|
.values()
|
|
.filter_map(|t| match t {
|
|
GlobalMapTimeline::Timeline(t) => {
|
|
if t.is_cancelled() {
|
|
None
|
|
} else {
|
|
Some(t.clone())
|
|
}
|
|
}
|
|
_ => None,
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Returns statistics about timeline counts
|
|
pub fn get_timeline_counts(&self) -> SafekeeperUtilization {
|
|
let global_lock = self.state.lock().unwrap();
|
|
let timeline_count = global_lock
|
|
.timelines
|
|
.values()
|
|
.filter(|t| match t {
|
|
GlobalMapTimeline::CreationInProgress => false,
|
|
GlobalMapTimeline::Timeline(t) => !t.is_cancelled(),
|
|
})
|
|
.count() as u64;
|
|
SafekeeperUtilization { timeline_count }
|
|
}
|
|
|
|
/// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
|
|
/// and that's why it can return cancelled timelines, to retry deleting them.
|
|
fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
|
|
let global_lock = self.state.lock().unwrap();
|
|
global_lock
|
|
.timelines
|
|
.values()
|
|
.filter_map(|t| match t {
|
|
GlobalMapTimeline::Timeline(t) => Some(t.clone()),
|
|
_ => None,
|
|
})
|
|
.filter(|t| t.ttid.tenant_id == tenant_id)
|
|
.collect()
|
|
}
|
|
|
|
/// Delete timeline, only locally on this node or globally (also cleaning
|
|
/// remote storage WAL), depending on `action` value.
|
|
pub(crate) async fn delete_or_exclude(
|
|
&self,
|
|
ttid: &TenantTimelineId,
|
|
action: DeleteOrExclude,
|
|
) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
|
|
let tli_res = {
|
|
let state = self.state.lock().unwrap();
|
|
|
|
// Do NOT check tenant tombstones here: those were set earlier
|
|
if state.tombstones.contains_key(ttid) {
|
|
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
|
info!("Timeline {ttid} was already deleted");
|
|
return Ok(TimelineDeleteResult { dir_existed: false });
|
|
}
|
|
|
|
state.get(ttid)
|
|
};
|
|
|
|
let result = match tli_res {
|
|
Ok(timeline) => {
|
|
info!("deleting timeline {}, action={:?}", ttid, action);
|
|
|
|
// If node is getting excluded, check the generation first.
|
|
// Then, while holding the lock cancel the timeline; it will be
|
|
// unusable after this point, and if node is added back first
|
|
// deletion must be completed and node seeded anew.
|
|
//
|
|
// We would like to avoid holding the lock while waiting for the
|
|
// gate to finish as this is deadlock prone, so for actual
|
|
// deletion will take it second time.
|
|
if let DeleteOrExclude::Exclude(ref mconf) = action {
|
|
let shared_state = timeline.read_shared_state().await;
|
|
if shared_state.sk.state().mconf.generation > mconf.generation {
|
|
return Err(DeleteOrExcludeError::Conflict {
|
|
requested: mconf.clone(),
|
|
current: shared_state.sk.state().mconf.clone(),
|
|
});
|
|
}
|
|
timeline.cancel().await;
|
|
} else {
|
|
timeline.cancel().await;
|
|
}
|
|
|
|
timeline.close().await;
|
|
|
|
info!("timeline {ttid} shut down for deletion");
|
|
|
|
// Take a lock and finish the deletion holding this mutex.
|
|
let mut shared_state = timeline.write_shared_state().await;
|
|
|
|
let only_local = !matches!(action, DeleteOrExclude::Delete);
|
|
let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
|
|
|
|
Ok(TimelineDeleteResult { dir_existed })
|
|
}
|
|
Err(_) => {
|
|
// Timeline is not memory, but it may still exist on disk in broken state.
|
|
let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
|
|
let dir_existed = delete_dir(&dir_path).await?;
|
|
|
|
Ok(TimelineDeleteResult { dir_existed })
|
|
}
|
|
};
|
|
|
|
// Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
|
|
// are used to prevent still-running computes from re-creating the same timeline when they send data,
|
|
// and to speed up repeated deletion calls by avoiding re-listing objects.
|
|
self.state.lock().unwrap().delete(*ttid);
|
|
|
|
result
|
|
}
|
|
|
|
/// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
|
|
/// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
|
|
/// created simultaneously. In that case the function will return error and the caller should
|
|
/// retry tenant deletion again later.
|
|
///
|
|
/// If only_local, doesn't remove WAL segments in remote storage.
|
|
pub async fn delete_all_for_tenant(
|
|
&self,
|
|
tenant_id: &TenantId,
|
|
action: DeleteOrExclude,
|
|
) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
|
|
info!("deleting all timelines for tenant {}", tenant_id);
|
|
|
|
// Adding a tombstone before getting the timelines to prevent new timeline additions
|
|
self.state.lock().unwrap().add_tenant_tombstone(*tenant_id);
|
|
|
|
let to_delete = self.get_all_for_tenant(*tenant_id);
|
|
|
|
let mut err = None;
|
|
|
|
let mut deleted = HashMap::new();
|
|
for tli in &to_delete {
|
|
match self.delete_or_exclude(&tli.ttid, action.clone()).await {
|
|
Ok(result) => {
|
|
deleted.insert(tli.ttid, result);
|
|
}
|
|
Err(e) => {
|
|
error!("failed to delete timeline {}: {}", tli.ttid, e);
|
|
// Save error to return later.
|
|
err = Some(e);
|
|
}
|
|
}
|
|
}
|
|
|
|
// If there was an error, return it.
|
|
if let Some(e) = err {
|
|
return Err(anyhow::Error::from(e));
|
|
}
|
|
|
|
// There may be broken timelines on disk, so delete the whole tenant dir as well.
|
|
// Note that we could concurrently create new timelines while we were deleting them,
|
|
// so the directory may be not empty. In this case timelines will have bad state
|
|
// and timeline background jobs can panic.
|
|
let tenant_dir = get_tenant_dir(self.state.lock().unwrap().conf.as_ref(), tenant_id);
|
|
delete_dir(&tenant_dir).await?;
|
|
|
|
Ok(deleted)
|
|
}
|
|
|
|
pub fn housekeeping(&self, tombstone_ttl: &Duration) {
|
|
let mut state = self.state.lock().unwrap();
|
|
|
|
// We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
|
|
// timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
|
|
// may recreate a deleted timeline.
|
|
let now = Instant::now();
|
|
state
|
|
.tombstones
|
|
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
|
state
|
|
.tenant_tombstones
|
|
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
|
}
|
|
}
|
|
|
|
/// Action for delete_or_exclude.
|
|
#[derive(Clone, Debug)]
|
|
pub enum DeleteOrExclude {
|
|
/// Delete timeline globally.
|
|
Delete,
|
|
/// Legacy mode until we fully migrate to generations: like exclude deletes
|
|
/// timeline only locally, but ignores generation number.
|
|
DeleteLocal,
|
|
/// This node is getting excluded, delete timeline locally.
|
|
Exclude(membership::Configuration),
|
|
}
|
|
|
|
/// Create temp directory for a new timeline. It needs to be located on the same
|
|
/// filesystem as the rest of the timelines. It will be automatically deleted when
|
|
/// Utf8TempDir goes out of scope.
|
|
pub async fn create_temp_timeline_dir(
|
|
conf: &SafeKeeperConf,
|
|
ttid: TenantTimelineId,
|
|
) -> Result<(Utf8TempDir, Utf8PathBuf)> {
|
|
let temp_base = conf.workdir.join("tmp");
|
|
|
|
tokio::fs::create_dir_all(&temp_base).await?;
|
|
|
|
let tli_dir = camino_tempfile::Builder::new()
|
|
.suffix("_temptli")
|
|
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
|
|
.tempdir_in(temp_base)?;
|
|
|
|
let tli_dir_path = tli_dir.path().to_path_buf();
|
|
|
|
Ok((tli_dir, tli_dir_path))
|
|
}
|
|
|
|
/// Do basic validation of a temp timeline, before moving it to the global map.
|
|
pub async fn validate_temp_timeline(
|
|
conf: &SafeKeeperConf,
|
|
ttid: TenantTimelineId,
|
|
path: &Utf8PathBuf,
|
|
) -> Result<(Lsn, Lsn)> {
|
|
let control_path = path.join("safekeeper.control");
|
|
|
|
let control_store = control_file::FileStorage::load_control_file(control_path)?;
|
|
if control_store.server.wal_seg_size == 0 {
|
|
bail!("wal_seg_size is not set");
|
|
}
|
|
|
|
let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
|
|
|
|
let commit_lsn = control_store.commit_lsn;
|
|
let flush_lsn = wal_store.flush_lsn();
|
|
|
|
Ok((commit_lsn, flush_lsn))
|
|
}
|