review adjustments, bring back timeline_detach and rename it to timeline_delete

This commit is contained in:
Dmitry Rodionov
2022-06-24 18:52:44 +03:00
committed by Dmitry Rodionov
parent 4c54e4b37d
commit e1e24336b7
14 changed files with 401 additions and 209 deletions

View File

@@ -122,8 +122,9 @@ impl S3ObjectKey {
impl RemoteObjectName for S3ObjectKey {
/// Turn a/b/c or a/b/c/ into c
fn object_name(&self) -> Option<&str> {
// corner case
if &self.0 == "/" {
// corner case: char::to_string is not const, that's why this is more verbose than it needs to be
// see https://github.com/rust-lang/rust/issues/88674
if self.0.len() == 1 && self.0.chars().next().unwrap() == S3_PREFIX_SEPARATOR {
return None;
}
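
A minimal standalone sketch of the corner case above, assuming S3_PREFIX_SEPARATOR is '/' and that object_name() returns the last non-empty path segment; the helper below is illustrative, not the real S3ObjectKey implementation:

// Illustration only: the constant and the splitting logic are assumptions.
const S3_PREFIX_SEPARATOR: char = '/';

fn object_name(key: &str) -> Option<&str> {
    // A key consisting of the separator alone has no object name.
    if key.len() == 1 && key.chars().next().unwrap() == S3_PREFIX_SEPARATOR {
        return None;
    }
    key.trim_end_matches(S3_PREFIX_SEPARATOR)
        .rsplit(S3_PREFIX_SEPARATOR)
        .next()
}

fn main() {
    assert_eq!(object_name("a/b/c"), Some("c"));
    assert_eq!(object_name("a/b/c/"), Some("c"));
    assert_eq!(object_name("/"), None);
}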

View File

@@ -122,6 +122,35 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
delete:
description: "Attempts to delete specified timeline. On 500 errors should be retried"
responses:
"200":
description: Ok
"400":
description: Error when no tenant id or timeline id is found in the path
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"401":
description: Unauthorized Error
content:
application/json:
schema:
$ref: "#/components/schemas/UnauthorizedError"
"403":
description: Forbidden Error
content:
application/json:
schema:
$ref: "#/components/schemas/ForbiddenError"
"500":
description: Generic operation error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/{timeline_id}/wal_receiver:
parameters:
@@ -190,7 +219,6 @@ paths:
"410":
description: GONE
/v1/tenant/{tenant_id}/attach:
parameters:
- name: tenant_id
@@ -200,7 +228,7 @@ paths:
type: string
format: hex
post:
description: Deprecated
description: Schedules an attach operation to happen in the background for the given tenant
responses:
"202":
description: Tenant attaching scheduled
@@ -299,7 +327,6 @@ paths:
schema:
$ref: "#/components/schemas/Error"
/v1/tenant/{tenant_id}/timeline/:
parameters:
- name: tenant_id

View File

@@ -353,22 +353,45 @@ async fn try_download_tenant_index(
Ok(Some(remote_timelines))
}
async fn timeline_detach_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::GONE, ())
async fn timeline_delete_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let timeline_id: ZTimelineId = parse_request_param(&request, "timeline_id")?;
let state = get_state(&request);
tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_detach_handler", tenant = %tenant_id).entered();
tenant_mgr::delete_timeline(tenant_id, timeline_id)
})
.await
.map_err(ApiError::from_err)??;
let mut remote_index = state.remote_index.write().await;
remote_index.remove_timeline_entry(ZTenantTimelineId {
tenant_id,
timeline_id,
});
json_response(StatusCode::OK, ())
}
async fn tenant_detach_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let state = get_state(&request);
let conf = state.conf;
tokio::task::spawn_blocking(move || {
let _enter = info_span!("tenant_detach_handler", tenant = %tenant_id).entered();
let state = get_state(&request);
tenant_mgr::detach_tenant(state.conf, tenant_id)
tenant_mgr::detach_tenant(conf, tenant_id)
})
.await
.map_err(ApiError::from_err)??;
let mut remote_index = state.remote_index.write().await;
remote_index.remove_tenant_entry(&tenant_id);
json_response(StatusCode::OK, ())
}
@@ -540,6 +563,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_detail_handler,
)
.delete(
"/v1/tenant/:tenant_id/timeline/:timeline_id",
timeline_delete_handler,
)
.get(
"/v1/tenant/:tenant_id/timeline/:timeline_id/wal_receiver",
wal_receiver_get_handler,
@@ -550,7 +577,7 @@ pub fn make_router(
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/detach",
timeline_detach_handler,
timeline_delete_handler,
)
.any(handler_404))
}

View File

@@ -38,9 +38,7 @@ use crate::keyspace::KeySpace;
use crate::storage_sync::index::RemoteIndex;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::repository::{
GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter,
};
use crate::repository::{GcResult, Repository, RepositoryTimeline, Timeline, TimelineWriter};
use crate::repository::{Key, Value};
use crate::tenant_mgr;
use crate::thread_mgr;
@@ -410,28 +408,61 @@ impl Repository for LayeredRepository {
Ok(())
}
fn apply_timeline_remote_sync_status_update(
&self,
timeline_id: ZTimelineId,
timeline_sync_status_update: TimelineSyncStatusUpdate,
) -> Result<()> {
debug!(
"apply_timeline_remote_sync_status_update timeline_id: {} update: {:?}",
timeline_id, timeline_sync_status_update
// In order to be retriable, detach needs to be idempotent.
fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()> {
let mut timelines = self.timelines.lock().unwrap();
// Ensure that there are no child timelines **attached to that pageserver**,
// because detach removes files, which will break child branches
let num_children = timelines
.iter()
.filter(|(_, entry)| entry.ancestor_timeline_id() == Some(timeline_id))
.count();
ensure!(
num_children == 0,
"Cannot detach timeline which has child timelines"
);
match timeline_sync_status_update {
TimelineSyncStatusUpdate::Downloaded => {
match self.timelines.lock().unwrap().entry(timeline_id) {
Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."),
Entry::Vacant(entry) => {
// we need to get metadata of a timeline, another option is to pass it along with Downloaded status
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?;
// finally we make newly downloaded timeline visible to repository
entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, })
},
};
}
}
let timeline_entry = match timelines.entry(timeline_id) {
Entry::Occupied(e) => e,
Entry::Vacant(_) => bail!("timeline not found"),
};
// try to acquire gc and compaction locks to prevent errors from missing files
let _gc_guard = self
.gc_cs
.try_lock()
.map_err(|e| anyhow::anyhow!("cannot acquire gc lock {e}"))?;
let compaction_guard = timeline_entry.get().compaction_guard()?;
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
)
})?;
info!("detach removed files");
drop(compaction_guard);
timeline_entry.remove();
Ok(())
}
fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> {
debug!("attach timeline_id: {}", timeline_id,);
match self.timelines.lock().unwrap().entry(timeline_id) {
Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."),
Entry::Vacant(entry) => {
// we need to get metadata of a timeline, another option is to pass it along with Downloaded status
let metadata = load_metadata(self.conf, timeline_id, self.tenant_id).context("failed to load local metadata")?;
// finally we make newly downloaded timeline visible to repository
entry.insert(LayeredTimelineEntry::Unloaded { id: timeline_id, metadata, })
},
};
Ok(())
}
@@ -481,6 +512,18 @@ impl LayeredTimelineEntry {
}
}
}
fn compaction_guard(&self) -> Result<Option<MutexGuard<()>>, anyhow::Error> {
match self {
LayeredTimelineEntry::Loaded(timeline) => timeline
.compaction_cs
.try_lock()
.map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}"))
.map(Some),
LayeredTimelineEntry::Unloaded { .. } => Ok(None),
}
}
}
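
A short standalone sketch of the fail-fast locking choice above, assuming std::sync::Mutex semantics and hypothetical names: try_lock errors out immediately while gc or compaction holds the lock, so the delete request fails (and can be retried later) instead of blocking behind a long-running task.

use std::sync::{Mutex, MutexGuard};

// Illustration of the try_lock pattern; the field name is hypothetical.
fn try_enter_compaction_cs(compaction_cs: &Mutex<()>) -> anyhow::Result<MutexGuard<'_, ()>> {
    compaction_cs
        .try_lock()
        .map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}"))
}

fn main() -> anyhow::Result<()> {
    let compaction_cs = Mutex::new(());
    {
        let _running_compaction = compaction_cs.lock().unwrap();
        // While a "compaction" holds the lock, a delete attempt fails fast instead of waiting.
        assert!(try_enter_compaction_cs(&compaction_cs).is_err());
    }
    // Once the lock is free, the delete path can take the guard and remove files safely.
    let _guard = try_enter_compaction_cs(&compaction_cs)?;
    Ok(())
}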
impl From<LayeredTimelineEntry> for RepositoryTimeline<LayeredTimeline> {

View File

@@ -7,7 +7,6 @@ use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Display;
use std::ops::{AddAssign, Range};
use std::sync::{Arc, RwLockReadGuard};
use std::time::Duration;
@@ -182,20 +181,6 @@ impl Value {
}
}
#[derive(Clone, Copy, Debug)]
pub enum TimelineSyncStatusUpdate {
Downloaded,
}
impl Display for TimelineSyncStatusUpdate {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
TimelineSyncStatusUpdate::Downloaded => "Downloaded",
};
f.write_str(s)
}
}
///
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
@@ -204,11 +189,7 @@ pub trait Repository: Send + Sync {
/// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn apply_timeline_remote_sync_status_update(
&self,
timeline_id: ZTimelineId,
timeline_sync_status_update: TimelineSyncStatusUpdate,
) -> Result<()>;
fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
/// Get Timeline handle for given zenith timeline ID.
/// This function is idempotent. It doesn't change internal state in any way.
@@ -260,7 +241,10 @@ pub trait Repository: Send + Sync {
/// api's 'compact' command.
fn compaction_iteration(&self) -> Result<()>;
// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
/// Removes timeline-related in-memory data and deletes the timeline's local files.
fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()>;
/// Retrieves the remote timeline index from the repo. Used in walreceiver to grab the remote consistent lsn.
fn get_remote_index(&self) -> &RemoteIndex;
}
@@ -550,10 +534,7 @@ pub mod repo_harness {
.parse()
.unwrap();
repo.apply_timeline_remote_sync_status_update(
timeline_id,
TimelineSyncStatusUpdate::Downloaded,
)?;
repo.attach_timeline(timeline_id)?;
}
Ok(repo)

View File

@@ -178,9 +178,8 @@ use crate::{
metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME},
LayeredRepository,
},
repository::TimelineSyncStatusUpdate,
storage_sync::{self, index::RemoteIndex},
tenant_mgr::apply_timeline_sync_status_updates,
tenant_mgr::attach_downloaded_tenants,
thread_mgr,
thread_mgr::ThreadKind,
};
@@ -191,9 +190,8 @@ use metrics::{
};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
pub use self::download::download_index_part;
use self::download::download_index_parts;
pub use self::download::download_tenant_index_parts;
pub use self::download::try_download_index_parts;
pub use self::download::TEMP_DOWNLOAD_EXTENSION;
lazy_static! {
@@ -837,7 +835,7 @@ where
.build()
.context("Failed to create storage sync runtime")?;
let applicable_index_parts = runtime.block_on(try_download_index_parts(
let applicable_index_parts = runtime.block_on(download_index_parts(
conf,
&storage,
local_timeline_files.keys().copied().collect(),
@@ -928,6 +926,8 @@ fn storage_sync_loop<P, S>(
"Sync loop step completed, {} new tenant state update(s)",
updated_tenants.len()
);
let mut sync_status_updates: HashMap<ZTenantId, HashSet<ZTimelineId>> =
HashMap::new();
let index_accessor = runtime.block_on(index.write());
for tenant_id in updated_tenants {
let tenant_entry = match index_accessor.tenant_entry(&tenant_id) {
@@ -945,7 +945,7 @@ fn storage_sync_loop<P, S>(
continue;
} else {
info!(
"Tenant {tenant_id} download completed. Registering in repository"
"Tenant {tenant_id} download completed. Picking to register in repository"
);
// Here we assume that if tenant has no in-progress downloads that
// means that it is the last completed timeline download that triggered
@@ -953,26 +953,13 @@ fn storage_sync_loop<P, S>(
// and register them all at once in a repository for download
// to be submitted in a single operation to repository
// so it can apply them at once to internal timeline map.
let sync_status_updates: HashMap<
ZTimelineId,
TimelineSyncStatusUpdate,
> = tenant_entry
.keys()
.copied()
.map(|timeline_id| {
(timeline_id, TimelineSyncStatusUpdate::Downloaded)
})
.collect();
// Batch timeline download registration to ensure that the external registration code won't block any running tasks before.
apply_timeline_sync_status_updates(
conf,
&index,
tenant_id,
sync_status_updates,
);
sync_status_updates
.insert(tenant_id, tenant_entry.keys().copied().collect());
}
}
drop(index_accessor);
// Batch the timeline download registrations so that the external registration code won't block any currently running tasks.
attach_downloaded_tenants(conf, &index, sync_status_updates);
}
}
ControlFlow::Break(()) => {
@@ -983,6 +970,14 @@ fn storage_sync_loop<P, S>(
}
}
// Needed to check whether a download happened; more informative than a plain bool.
#[derive(Debug)]
enum DownloadMarker {
Downloaded,
Nothing,
}
async fn process_batches<P, S>(
conf: &'static PageServerConf,
max_sync_errors: NonZeroU32,
@@ -1015,17 +1010,19 @@ where
})
.collect::<FuturesUnordered<_>>();
let mut new_timeline_states = HashSet::new();
let mut downloaded_timelines = HashSet::new();
// we purposely ignore actual state update, because we're waiting for last timeline download to happen
while let Some((sync_id, state_update)) = sync_results.next().await {
debug!("Finished storage sync task for sync id {sync_id}");
if state_update.is_some() {
new_timeline_states.insert(sync_id.tenant_id);
while let Some((sync_id, download_marker)) = sync_results.next().await {
debug!(
"Finished storage sync task for sync id {sync_id} download marker {:?}",
download_marker
);
if matches!(download_marker, DownloadMarker::Downloaded) {
downloaded_timelines.insert(sync_id.tenant_id);
}
}
new_timeline_states
downloaded_timelines
}
async fn process_sync_task_batch<P, S>(
@@ -1034,7 +1031,7 @@ async fn process_sync_task_batch<P, S>(
max_sync_errors: NonZeroU32,
sync_id: ZTenantTimelineId,
batch: SyncTaskBatch,
) -> Option<TimelineSyncStatusUpdate>
) -> DownloadMarker
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
@@ -1119,7 +1116,7 @@ where
}
}
}
None
DownloadMarker::Nothing
}
.instrument(info_span!("download_timeline_data")),
);
@@ -1173,7 +1170,7 @@ async fn download_timeline_data<P, S>(
new_download_data: SyncData<LayersDownload>,
sync_start: Instant,
task_name: &str,
) -> Option<TimelineSyncStatusUpdate>
) -> DownloadMarker
where
P: Debug + Send + Sync + 'static,
S: RemoteStorage<RemoteObjectId = P> + Send + Sync + 'static,
@@ -1202,7 +1199,7 @@ where
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
Ok(()) => {
register_sync_status(sync_id, sync_start, task_name, Some(true));
return Some(TimelineSyncStatusUpdate::Downloaded);
return DownloadMarker::Downloaded;
}
Err(e) => {
error!("Timeline {sync_id} was expected to be in the remote index after a successful download, but it's absent: {e:?}");
@@ -1218,7 +1215,7 @@ where
}
}
None
DownloadMarker::Nothing
}
async fn update_local_metadata(

View File

@@ -39,7 +39,7 @@ pub const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
/// So there are two requirements: keep everything in one futures unordered
/// to allow higher concurrency. Mark tenants as failed independently.
/// That requires some bookkeeping.
pub async fn try_download_index_parts<P, S>(
pub async fn download_index_parts<P, S>(
conf: &'static PageServerConf,
storage: &S,
keys: HashSet<ZTenantTimelineId>,
@@ -116,7 +116,7 @@ where
});
}
let index_parts = try_download_index_parts(conf, storage, sync_ids)
let index_parts = download_index_parts(conf, storage, sync_ids)
.await
.remove(&tenant_id)
.ok_or(anyhow::anyhow!(
@@ -127,7 +127,7 @@ where
}
/// Retrieves index data from the remote storage for a given timeline.
pub async fn download_index_part<P, S>(
async fn download_index_part<P, S>(
conf: &'static PageServerConf,
storage: &S,
sync_id: ZTenantTimelineId,

View File

@@ -159,6 +159,19 @@ impl RemoteTimelineIndex {
.insert(timeline_id, entry);
}
pub fn remove_timeline_entry(
&mut self,
ZTenantTimelineId {
tenant_id,
timeline_id,
}: ZTenantTimelineId,
) -> Option<RemoteTimeline> {
self.entries
.entry(tenant_id)
.or_default()
.remove(&timeline_id)
}
pub fn tenant_entry(&self, tenant_id: &ZTenantId) -> Option<&TenantEntry> {
self.entries.get(tenant_id)
}
@@ -171,6 +184,10 @@ impl RemoteTimelineIndex {
self.entries.entry(tenant_id).or_default()
}
pub fn remove_tenant_entry(&mut self, tenant_id: &ZTenantId) -> Option<TenantEntry> {
self.entries.remove(tenant_id)
}
pub fn set_awaits_download(
&mut self,
id: &ZTenantTimelineId,

View File

@@ -4,7 +4,7 @@
use crate::config::PageServerConf;
use crate::layered_repository::{load_metadata, LayeredRepository};
use crate::pgdatadir_mapping::DatadirTimeline;
use crate::repository::{Repository, TimelineSyncStatusUpdate};
use crate::repository::Repository;
use crate::storage_sync::index::RemoteIndex;
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt;
@@ -17,7 +17,7 @@ use anyhow::{bail, Context};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::sync::Arc;
use tokio::sync::mpsc;
@@ -157,7 +157,13 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
// Failing to load a tenant is serious, but it's better to complete the startup and
// serve other tenants than to fail completely.
error!("Failed to initialize local tenant {tenant_id}: {:?}", err);
set_tenant_state(tenant_id, TenantState::Broken)?;
if let Err(err) = set_tenant_state(tenant_id, TenantState::Broken) {
error!(
"Failed to set tenant state to broken {tenant_id}: {:?}",
err
);
}
}
}
@@ -165,48 +171,56 @@ pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIn
}
pub enum LocalTimelineUpdate {
Detach(ZTenantTimelineId, std::sync::mpsc::Sender<()>),
Attach(ZTenantTimelineId, Arc<DatadirTimelineImpl>),
Detach {
id: ZTenantTimelineId,
// used to signal to the detach caller that the walreceiver successfully terminated for the specified id
join_confirmation_sender: std::sync::mpsc::Sender<()>,
},
Attach {
id: ZTenantTimelineId,
datadir: Arc<DatadirTimelineImpl>,
},
}
impl std::fmt::Debug for LocalTimelineUpdate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Detach(ttid, _) => f.debug_tuple("Remove").field(ttid).finish(),
Self::Attach(ttid, _) => f.debug_tuple("Add").field(ttid).finish(),
Self::Detach { id, .. } => f.debug_tuple("Remove").field(id).finish(),
Self::Attach { id, .. } => f.debug_tuple("Add").field(id).finish(),
}
}
}
/// Attaches downloaded timelines to the tenants' repositories, updating their in-memory timeline state.
pub fn apply_timeline_sync_status_updates(
pub fn attach_downloaded_tenants(
conf: &'static PageServerConf,
remote_index: &RemoteIndex,
tenant_id: ZTenantId,
sync_status_updates: HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
sync_status_updates: HashMap<ZTenantId, HashSet<ZTimelineId>>,
) {
if sync_status_updates.is_empty() {
debug!("no sync status updates to apply");
debug!("No sync status updates to apply");
return;
}
info!(
"Applying sync status updates for tenant {tenant_id} {} timelines",
sync_status_updates.len()
);
debug!("Sync status updates: {sync_status_updates:?}");
for (tenant_id, downloaded_timelines) in sync_status_updates {
info!(
"Registering downlloaded timelines for {tenant_id} {} timelines",
downloaded_timelines.len()
);
debug!("Downloaded timelines: {downloaded_timelines:?}");
let repo = match load_local_repo(conf, tenant_id, remote_index) {
Ok(repo) => repo,
Err(e) => {
error!("Failed to load repo for tenant {tenant_id} Error: {e:?}");
return;
let repo = match load_local_repo(conf, tenant_id, remote_index) {
Ok(repo) => repo,
Err(e) => {
error!("Failed to load repo for tenant {tenant_id} Error: {e:?}");
continue;
}
};
match attach_downloaded_tenant(&repo, downloaded_timelines) {
Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"),
Err(e) => error!(
"Failed to apply timeline sync timeline status updates for tenant {tenant_id}: {e:?}"
),
}
};
match apply_timeline_remote_sync_status_updates(&repo, sync_status_updates) {
Ok(()) => info!("successfully applied sync status updates for tenant {tenant_id}"),
Err(e) => error!(
"Failed to apply timeline sync timeline status updates for tenant {tenant_id}: {e:?}"
),
}
}
@@ -386,6 +400,59 @@ pub fn get_local_timeline_with_load(
}
}
pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow::Result<()> {
// shutdown the timeline threads (this shuts down the walreceiver)
// FIXME it does not shut down wal receiver
// Things that need to be done:
// 1. check no ancestors
// 2. remove from repo map
// 3. remove from global tenant timelines map
// -- no new connections can see the timeline
// 4. shutdown threads
// 5. join walreceiver (any flushing thread?)
// 6. delete files while ensuring that no gc or compaction is in progress
// 7. should we checkpoint before detach? That can be harmful during relocation,
// because it will upload to s3 something that the other pageserver didn't see
// TODO put failpoints at every step. Iterate over failpoints
// in the detach test and check that the timeline is either attached or detached;
// verify by trying to start a compute
// TODO adjust remote_index
// what is harder: writing whole-tenant detach correctly, or fixing the timeline-based one?
// TODO bail on active page_service threads?
// TODO what about in-progress downloads or uploads?
// can it be idempotent?
// FAILPOINTS: broken repo.detach_timeline
// broken wal_receiver
// broken rmdir
let (sender, receiver) = std::sync::mpsc::channel::<()>();
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach {
id: ZTenantTimelineId::new(tenant_id, timeline_id),
join_confirmation_sender: sender,
});
info!("waiting for wal receiver to shutdown");
let _ = receiver.recv();
info!("wal receiver shutdown confirmed");
info!("waiting for threads to shutdown");
thread_mgr::shutdown_threads(None, None, Some(timeline_id));
info!("thread shutdown completed");
match tenants_state::write_tenants().get_mut(&tenant_id) {
Some(tenant) => {
tenant
.repo
.delete_timeline(timeline_id)
.context("Failed to delete tenant timeline from repo")?;
tenant.local_timelines.remove(&timeline_id);
}
None => warn!("Tenant {tenant_id} not found in local tenant state"),
}
Ok(())
}
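
A hedged sketch of the failpoint idea from the TODO comment in delete_timeline above, assuming the fail crate is available; the failpoint name and the helper function are illustrative, not what the pageserver actually registers:

use std::path::Path;

// Hypothetical failpoint placement before the destructive step.
// A test could enable it (e.g. fail::cfg("timeline-delete-before-rm", "return")) and then
// verify that the timeline is still either fully attached or fully deleted.
fn remove_timeline_dir(dir: &Path) -> anyhow::Result<()> {
    fail::fail_point!("timeline-delete-before-rm", |_| {
        anyhow::bail!("failpoint: simulated failure before removing timeline files")
    });
    std::fs::remove_dir_all(dir)?;
    Ok(())
}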
pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> anyhow::Result<()> {
set_tenant_state(tenant_id, TenantState::Stopping)?;
// shutdown the tenant and timeline threads: gc, compaction, page service threads)
@@ -399,10 +466,10 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
let mut walreceiver_join_handles = Vec::with_capacity(tenant.local_timelines.len());
for timeline_id in tenant.local_timelines.keys() {
let (sender, receiver) = std::sync::mpsc::channel::<()>();
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach(
ZTenantTimelineId::new(tenant_id, *timeline_id),
sender,
));
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach {
id: ZTenantTimelineId::new(tenant_id, *timeline_id),
join_confirmation_sender: sender,
});
walreceiver_join_handles.push((*timeline_id, receiver));
}
// drop the tenants lock
@@ -428,11 +495,11 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
// which references ephemeral files which are deleted on drop. So if we keep these references
// code will attempt to remove files which no longer exist. This can be fixed by adding a shutdown
// mechanism to the repository that cleans up temporary data, avoiding any references to ephemeral files
let local_timeline_directory = conf.tenant_path(&tenant_id);
std::fs::remove_dir_all(&local_timeline_directory).with_context(|| {
let local_tenant_directory = conf.tenant_path(&tenant_id);
std::fs::remove_dir_all(&local_tenant_directory).with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
local_tenant_directory.display()
)
})?;
@@ -453,10 +520,10 @@ fn load_local_timeline(
));
page_tline.init_logical_size()?;
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach(
ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
Arc::clone(&page_tline),
));
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Attach {
id: ZTenantTimelineId::new(repo.tenant_id(), timeline_id),
datadir: Arc::clone(&page_tline),
});
Ok(page_tline)
}
@@ -486,9 +553,13 @@ pub fn list_tenants() -> Vec<TenantInfo> {
/// A timeline is categorized as broken when any of following conditions is true:
/// - failed to load the timeline's metadata
/// - the timeline's disk consistent LSN is zero
fn check_broken_timeline(repo: &LayeredRepository, timeline_id: ZTimelineId) -> anyhow::Result<()> {
let metadata = load_metadata(repo.conf, timeline_id, repo.tenant_id())
.context("failed to load metadata")?;
fn check_broken_timeline(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
timeline_id: ZTimelineId,
) -> anyhow::Result<()> {
let metadata =
load_metadata(conf, timeline_id, tenant_id).context("failed to load metadata")?;
// A timeline with zero disk consistent LSN can happen when the page server
// failed to checkpoint the timeline import data when creating that timeline.
@@ -499,61 +570,56 @@ fn check_broken_timeline(repo: &LayeredRepository, timeline_id: ZTimelineId) ->
Ok(())
}
/// Note: all timelines are attached at once if and only if all of them are locally complete
fn init_local_repository(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
local_timeline_init_statuses: HashMap<ZTimelineId, LocalTimelineInitStatus>,
remote_index: &RemoteIndex,
) -> anyhow::Result<(), anyhow::Error> {
// initialize local tenant
let repo = load_local_repo(conf, tenant_id, remote_index)
.with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?;
let mut status_updates = HashMap::with_capacity(local_timeline_init_statuses.len());
let mut timelines_to_attach = HashSet::new();
for (timeline_id, init_status) in local_timeline_init_statuses {
match init_status {
LocalTimelineInitStatus::LocallyComplete => {
debug!("timeline {timeline_id} for tenant {tenant_id} is locally complete, registering it in repository");
if let Err(err) = check_broken_timeline(&repo, timeline_id) {
info!(
"Found a broken timeline {timeline_id} (err={err:?}), skip registering it in repository"
);
} else {
status_updates.insert(timeline_id, TimelineSyncStatusUpdate::Downloaded);
}
check_broken_timeline(conf, tenant_id, timeline_id)
.context("found broken timeline")?;
timelines_to_attach.insert(timeline_id);
}
LocalTimelineInitStatus::NeedsSync => {
debug!(
"timeline {tenant_id} for tenant {timeline_id} needs sync, \
so skipped for adding into repository until sync is finished"
);
return Ok(());
}
}
}
// initialize local tenant
let repo = load_local_repo(conf, tenant_id, remote_index)
.with_context(|| format!("Failed to load repo for tenant {tenant_id}"))?;
// Let's fail here loudly to be on the safe side.
// XXX: It may be a better api to actually distinguish between repository startup
// and processing of newly downloaded timelines.
apply_timeline_remote_sync_status_updates(&repo, status_updates)
attach_downloaded_tenant(&repo, timelines_to_attach)
.with_context(|| format!("Failed to bootstrap timelines for tenant {tenant_id}"))?;
Ok(())
}
fn apply_timeline_remote_sync_status_updates(
fn attach_downloaded_tenant(
repo: &LayeredRepository,
status_updates: HashMap<ZTimelineId, TimelineSyncStatusUpdate>,
downloaded_timelines: HashSet<ZTimelineId>,
) -> anyhow::Result<()> {
let mut registration_queue = Vec::with_capacity(status_updates.len());
let mut registration_queue = Vec::with_capacity(downloaded_timelines.len());
// first need to register the in-mem representations, to avoid missing ancestors during the local disk data registration
for (timeline_id, status_update) in status_updates {
repo.apply_timeline_remote_sync_status_update(timeline_id, status_update)
.with_context(|| {
format!("Failed to load timeline {timeline_id} into in-memory repository")
})?;
match status_update {
TimelineSyncStatusUpdate::Downloaded => registration_queue.push(timeline_id),
}
for timeline_id in downloaded_timelines {
repo.attach_timeline(timeline_id).with_context(|| {
format!("Failed to load timeline {timeline_id} into in-memory repository")
})?;
registration_queue.push(timeline_id);
}
for timeline_id in registration_queue {

View File

@@ -264,7 +264,10 @@ async fn wal_receiver_main_thread_loop_step<'a>(
info!("Processing timeline update: {update:?}");
match update {
// Timeline got detached, stop all related tasks and remove public timeline data.
LocalTimelineUpdate::Detach(id, join_sender) => {
LocalTimelineUpdate::Detach {
id,
join_confirmation_sender,
} => {
match local_timeline_wal_receivers.get_mut(&id.tenant_id) {
Some(wal_receivers) => {
if let hash_map::Entry::Occupied(o) = wal_receivers.entry(id.timeline_id) {
@@ -280,7 +283,7 @@ async fn wal_receiver_main_thread_loop_step<'a>(
};
{
WAL_RECEIVER_ENTRIES.write().await.remove(&id);
if let Err(e) = join_sender.send(()) {
if let Err(e) = join_confirmation_sender.send(()) {
warn!("cannot send wal_receiver shutdown confirmation {e}")
} else {
info!("confirm walreceiver shutdown for {id}");
@@ -288,41 +291,40 @@ async fn wal_receiver_main_thread_loop_step<'a>(
}
}
// Timeline got attached, retrieve all necessary information to start its broker loop and maintain this loop endlessly.
LocalTimelineUpdate::Attach(new_id, new_timeline) => {
LocalTimelineUpdate::Attach { id, datadir } => {
let timeline_connection_managers = local_timeline_wal_receivers
.entry(new_id.tenant_id)
.entry(id.tenant_id)
.or_default();
if timeline_connection_managers.is_empty() {
if let Err(e) =
change_tenant_state(new_id.tenant_id, TenantState::Active).await
if let Err(e) = change_tenant_state(id.tenant_id, TenantState::Active).await
{
error!("Failed to make tenant active for id {new_id}: {e:#}");
error!("Failed to make tenant active for id {id}: {e:#}");
return;
}
}
let vacant_connection_manager_entry =
match timeline_connection_managers.entry(new_id.timeline_id) {
match timeline_connection_managers.entry(id.timeline_id) {
hash_map::Entry::Occupied(_) => {
debug!("Attepted to readd an existing timeline {new_id}, ignoring");
debug!("Attepted to readd an existing timeline {id}, ignoring");
return;
}
hash_map::Entry::Vacant(v) => v,
};
let (wal_connect_timeout, lagging_wal_timeout, max_lsn_wal_lag) =
match fetch_tenant_settings(new_id.tenant_id).await {
match fetch_tenant_settings(id.tenant_id).await {
Ok(settings) => settings,
Err(e) => {
error!("Failed to fetch tenant settings for id {new_id}: {e:#}");
error!("Failed to fetch tenant settings for id {id}: {e:#}");
return;
}
};
{
WAL_RECEIVER_ENTRIES.write().await.insert(
new_id,
id,
WalReceiverEntry {
wal_producer_connstr: None,
last_received_msg_lsn: None,
@@ -333,10 +335,10 @@ async fn wal_receiver_main_thread_loop_step<'a>(
vacant_connection_manager_entry.insert(
connection_manager::spawn_connection_manager_task(
new_id,
id,
broker_prefix.to_owned(),
etcd_client.clone(),
new_timeline,
datadir,
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,

View File

@@ -105,3 +105,26 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
branch2_cur.execute('SELECT count(*) FROM foo')
assert branch2_cur.fetchone() == (300000, )
def test_ancestor_branch_delete(neon_simple_env: NeonEnv):
env = neon_simple_env
parent_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_delete_parent", "empty")
leaf_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_delete_branch1",
"test_ancestor_branch_delete_parent")
ps_http = env.pageserver.http_client()
with pytest.raises(NeonPageserverApiException,
match="Failed to delete tenant timeline from repo"):
ps_http.timeline_delete(env.initial_tenant, parent_timeline_id)
ps_http.timeline_delete(env.initial_tenant, leaf_timeline_id)
# check 404
with pytest.raises(NeonPageserverApiException,
match="is not found neither locally nor remotely"):
ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
# FIXME leaves tenant without timelines, should we prevent deletion of root timeline?
ps_http.timeline_delete(env.initial_tenant, parent_timeline_id)

View File

@@ -110,6 +110,6 @@ def test_fix_broken_timelines_on_startup(neon_simple_env: NeonEnv):
env.neon_cli.pageserver_stop(immediate=True)
env.neon_cli.pageserver_start()
# Check that the "broken" timeline is not loaded
timelines = env.neon_cli.list_timelines(tenant_id)
assert len(timelines) == 1
# Check that tenant with "broken" timeline is not loaded.
with pytest.raises(Exception, match=f"Failed to get repo for tenant {tenant_id.hex}"):
env.neon_cli.list_timelines(tenant_id)

View File

@@ -90,7 +90,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
# Clean up
# TODO it should clean itself
client = env.pageserver.http_client()
client.timeline_detach(tenant, timeline)
client.timeline_delete(tenant, timeline)
# Importing correct backup works
import_tar(base_tar, wal_tar)

View File

@@ -795,6 +795,27 @@ class NeonPageserverHttpClient(requests.Session):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
'new_tenant_id': new_tenant_id.hex if new_tenant_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f'could not create tenant: already exists for id {new_tenant_id}')
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return uuid.UUID(new_tenant_id)
def tenant_attach(self, tenant_id: uuid.UUID):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/attach")
self.verbose_error(res)
@@ -803,6 +824,13 @@ class NeonPageserverHttpClient(requests.Session):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/detach")
self.verbose_error(res)
def timeline_list(self, tenant_id: uuid.UUID) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def timeline_create(
self,
tenant_id: uuid.UUID,
@@ -827,34 +855,6 @@ class NeonPageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: Optional[uuid.UUID] = None) -> uuid.UUID:
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
'new_tenant_id': new_tenant_id.hex if new_tenant_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f'could not create tenant: already exists for id {new_tenant_id}')
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return uuid.UUID(new_tenant_id)
def timeline_list(self, tenant_id: uuid.UUID) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def timeline_detail(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}?include-non-incremental-logical-size=1"
@@ -864,6 +864,14 @@ class NeonPageserverHttpClient(requests.Session):
assert isinstance(res_json, dict)
return res_json
def timeline_delete(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
def wal_receiver_get(self, tenant_id: uuid.UUID, timeline_id: uuid.UUID) -> Dict[Any, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id.hex}/timeline/{timeline_id.hex}/wal_receiver"