mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 22:42:57 +00:00
Add timeline offload mechanism (#8907)
Implements an initial mechanism for offloading of archived timelines. Offloading is implemented as specified in the RFC. For now, there is no persistence, so a restart of the pageserver will retrigger downloads until the timeline is offloaded again. We trigger offloading in the compaction loop because we need the signal for whether compaction is done and everything has been uploaded or not. Part of #8088
This commit is contained in:
@@ -703,6 +703,8 @@ async fn timeline_archival_config_handler(
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
|
||||
let request_data: TimelineArchivalConfigRequest = json_request(&mut request).await?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
let state = get_state(&request);
|
||||
@@ -713,7 +715,7 @@ async fn timeline_archival_config_handler(
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
|
||||
tenant
|
||||
.apply_timeline_archival_config(timeline_id, request_data.state)
|
||||
.apply_timeline_archival_config(timeline_id, request_data.state, ctx)
|
||||
.await?;
|
||||
Ok::<_, ApiError>(())
|
||||
}
|
||||
|
||||
@@ -38,6 +38,7 @@ use std::future::Future;
|
||||
use std::sync::Weak;
|
||||
use std::time::SystemTime;
|
||||
use storage_broker::BrokerClientChannel;
|
||||
use timeline::offload::offload_timeline;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
@@ -287,9 +288,13 @@ pub struct Tenant {
|
||||
|
||||
/// During timeline creation, we first insert the TimelineId to the
|
||||
/// creating map, then `timelines`, then remove it from the creating map.
|
||||
/// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
|
||||
/// **Lock order**: if acquiring both, acquire`timelines` before `timelines_creating`
|
||||
timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
|
||||
|
||||
/// Possibly offloaded and archived timelines
|
||||
/// **Lock order**: if acquiring both, acquire`timelines` before `timelines_offloaded`
|
||||
timelines_offloaded: Mutex<HashMap<TimelineId, Arc<OffloadedTimeline>>>,
|
||||
|
||||
// This mutex prevents creation of new timelines during GC.
|
||||
// Adding yet another mutex (in addition to `timelines`) is needed because holding
|
||||
// `timelines` mutex during all GC iteration
|
||||
@@ -484,6 +489,65 @@ impl WalRedoManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OffloadedTimeline {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub ancestor_timeline_id: Option<TimelineId>,
|
||||
|
||||
// TODO: once we persist offloaded state, make this lazily constructed
|
||||
pub remote_client: Arc<RemoteTimelineClient>,
|
||||
|
||||
/// Prevent two tasks from deleting the timeline at the same time. If held, the
|
||||
/// timeline is being deleted. If 'true', the timeline has already been deleted.
|
||||
pub delete_progress: Arc<tokio::sync::Mutex<DeleteTimelineFlow>>,
|
||||
}
|
||||
|
||||
impl OffloadedTimeline {
|
||||
fn from_timeline(timeline: &Timeline) -> Self {
|
||||
Self {
|
||||
tenant_shard_id: timeline.tenant_shard_id,
|
||||
timeline_id: timeline.timeline_id,
|
||||
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
|
||||
|
||||
remote_client: timeline.remote_client.clone(),
|
||||
delete_progress: timeline.delete_progress.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub enum TimelineOrOffloaded {
|
||||
Timeline(Arc<Timeline>),
|
||||
Offloaded(Arc<OffloadedTimeline>),
|
||||
}
|
||||
|
||||
impl TimelineOrOffloaded {
|
||||
pub fn tenant_shard_id(&self) -> TenantShardId {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => timeline.tenant_shard_id,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.tenant_shard_id,
|
||||
}
|
||||
}
|
||||
pub fn timeline_id(&self) -> TimelineId {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => timeline.timeline_id,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => offloaded.timeline_id,
|
||||
}
|
||||
}
|
||||
pub fn delete_progress(&self) -> &Arc<tokio::sync::Mutex<DeleteTimelineFlow>> {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => &timeline.delete_progress,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
|
||||
}
|
||||
}
|
||||
pub fn remote_client(&self) -> &Arc<RemoteTimelineClient> {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => &timeline.remote_client,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.remote_client,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
|
||||
pub enum GetTimelineError {
|
||||
#[error("Timeline is shutting down")]
|
||||
@@ -1406,52 +1470,192 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn apply_timeline_archival_config(
|
||||
&self,
|
||||
fn check_to_be_archived_has_no_unarchived_children(
|
||||
timeline_id: TimelineId,
|
||||
state: TimelineArchivalState,
|
||||
timelines: &std::sync::MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>>,
|
||||
) -> Result<(), TimelineArchivalError> {
|
||||
let children: Vec<TimelineId> = timelines
|
||||
.iter()
|
||||
.filter_map(|(id, entry)| {
|
||||
if entry.get_ancestor_timeline_id() != Some(timeline_id) {
|
||||
return None;
|
||||
}
|
||||
if entry.is_archived() == Some(true) {
|
||||
return None;
|
||||
}
|
||||
Some(*id)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !children.is_empty() {
|
||||
return Err(TimelineArchivalError::HasUnarchivedChildren(children));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_ancestor_of_to_be_unarchived_is_not_archived(
|
||||
ancestor_timeline_id: TimelineId,
|
||||
timelines: &std::sync::MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>>,
|
||||
offloaded_timelines: &std::sync::MutexGuard<
|
||||
'_,
|
||||
HashMap<TimelineId, Arc<OffloadedTimeline>>,
|
||||
>,
|
||||
) -> Result<(), TimelineArchivalError> {
|
||||
let has_archived_parent =
|
||||
if let Some(ancestor_timeline) = timelines.get(&ancestor_timeline_id) {
|
||||
ancestor_timeline.is_archived() == Some(true)
|
||||
} else if offloaded_timelines.contains_key(&ancestor_timeline_id) {
|
||||
true
|
||||
} else {
|
||||
error!("ancestor timeline {ancestor_timeline_id} not found");
|
||||
if cfg!(debug_assertions) {
|
||||
panic!("ancestor timeline {ancestor_timeline_id} not found");
|
||||
}
|
||||
return Err(TimelineArchivalError::NotFound);
|
||||
};
|
||||
if has_archived_parent {
|
||||
return Err(TimelineArchivalError::HasArchivedParent(
|
||||
ancestor_timeline_id,
|
||||
));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_to_be_unarchived_timeline_has_no_archived_parent(
|
||||
timeline: &Arc<Timeline>,
|
||||
) -> Result<(), TimelineArchivalError> {
|
||||
if let Some(ancestor_timeline) = timeline.ancestor_timeline() {
|
||||
if ancestor_timeline.is_archived() == Some(true) {
|
||||
return Err(TimelineArchivalError::HasArchivedParent(
|
||||
ancestor_timeline.timeline_id,
|
||||
));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Loads the specified (offloaded) timeline from S3 and attaches it as a loaded timeline
|
||||
async fn unoffload_timeline(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
ctx: RequestContext,
|
||||
) -> Result<Arc<Timeline>, TimelineArchivalError> {
|
||||
let cancel = self.cancel.clone();
|
||||
let timeline_preload = self
|
||||
.load_timeline_metadata(timeline_id, self.remote_storage.clone(), cancel)
|
||||
.await;
|
||||
|
||||
let index_part = match timeline_preload.index_part {
|
||||
Ok(index_part) => {
|
||||
debug!("remote index part exists for timeline {timeline_id}");
|
||||
index_part
|
||||
}
|
||||
Err(DownloadError::NotFound) => {
|
||||
error!(%timeline_id, "index_part not found on remote");
|
||||
return Err(TimelineArchivalError::NotFound);
|
||||
}
|
||||
Err(e) => {
|
||||
// Some (possibly ephemeral) error happened during index_part download.
|
||||
warn!(%timeline_id, "Failed to load index_part from remote storage, failed creation? ({e})");
|
||||
return Err(TimelineArchivalError::Other(
|
||||
anyhow::Error::new(e).context("downloading index_part from remote storage"),
|
||||
));
|
||||
}
|
||||
};
|
||||
let index_part = match index_part {
|
||||
MaybeDeletedIndexPart::IndexPart(index_part) => index_part,
|
||||
MaybeDeletedIndexPart::Deleted(_index_part) => {
|
||||
info!("timeline is deleted according to index_part.json");
|
||||
return Err(TimelineArchivalError::NotFound);
|
||||
}
|
||||
};
|
||||
let remote_metadata = index_part.metadata.clone();
|
||||
let timeline_resources = self.build_timeline_resources(timeline_id);
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
timeline_resources,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_shard_id
|
||||
)
|
||||
})?;
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
if let Some(timeline) = timelines.get(&timeline_id) {
|
||||
let mut offloaded_timelines = self.timelines_offloaded.lock().unwrap();
|
||||
if offloaded_timelines.remove(&timeline_id).is_none() {
|
||||
warn!("timeline already removed from offloaded timelines");
|
||||
}
|
||||
Ok(Arc::clone(timeline))
|
||||
} else {
|
||||
warn!("timeline not available directly after attach");
|
||||
Err(TimelineArchivalError::Other(anyhow::anyhow!(
|
||||
"timeline not available directly after attach"
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn apply_timeline_archival_config(
|
||||
self: &Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
new_state: TimelineArchivalState,
|
||||
ctx: RequestContext,
|
||||
) -> Result<(), TimelineArchivalError> {
|
||||
info!("setting timeline archival config");
|
||||
let timeline = {
|
||||
// First part: figure out what is needed to do, and do validation
|
||||
let timeline_or_unarchive_offloaded = 'outer: {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
|
||||
let Some(timeline) = timelines.get(&timeline_id) else {
|
||||
return Err(TimelineArchivalError::NotFound);
|
||||
let offloaded_timelines = self.timelines_offloaded.lock().unwrap();
|
||||
let Some(offloaded) = offloaded_timelines.get(&timeline_id) else {
|
||||
return Err(TimelineArchivalError::NotFound);
|
||||
};
|
||||
if new_state == TimelineArchivalState::Archived {
|
||||
// It's offloaded already, so nothing to do
|
||||
return Ok(());
|
||||
}
|
||||
if let Some(ancestor_timeline_id) = offloaded.ancestor_timeline_id {
|
||||
Self::check_ancestor_of_to_be_unarchived_is_not_archived(
|
||||
ancestor_timeline_id,
|
||||
&timelines,
|
||||
&offloaded_timelines,
|
||||
)?;
|
||||
}
|
||||
break 'outer None;
|
||||
};
|
||||
|
||||
if state == TimelineArchivalState::Unarchived {
|
||||
if let Some(ancestor_timeline) = timeline.ancestor_timeline() {
|
||||
if ancestor_timeline.is_archived() == Some(true) {
|
||||
return Err(TimelineArchivalError::HasArchivedParent(
|
||||
ancestor_timeline.timeline_id,
|
||||
));
|
||||
}
|
||||
// Do some validation. We release the timelines lock below, so there is potential
|
||||
// for race conditions: these checks are more present to prevent misunderstandings of
|
||||
// the API's capabilities, instead of serving as the sole way to defend their invariants.
|
||||
match new_state {
|
||||
TimelineArchivalState::Unarchived => {
|
||||
Self::check_to_be_unarchived_timeline_has_no_archived_parent(timeline)?
|
||||
}
|
||||
TimelineArchivalState::Archived => {
|
||||
Self::check_to_be_archived_has_no_unarchived_children(timeline_id, &timelines)?
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that there are no non-archived child timelines
|
||||
let children: Vec<TimelineId> = timelines
|
||||
.iter()
|
||||
.filter_map(|(id, entry)| {
|
||||
if entry.get_ancestor_timeline_id() != Some(timeline_id) {
|
||||
return None;
|
||||
}
|
||||
if entry.is_archived() == Some(true) {
|
||||
return None;
|
||||
}
|
||||
Some(*id)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !children.is_empty() && state == TimelineArchivalState::Archived {
|
||||
return Err(TimelineArchivalError::HasUnarchivedChildren(children));
|
||||
}
|
||||
Arc::clone(timeline)
|
||||
Some(Arc::clone(timeline))
|
||||
};
|
||||
|
||||
// Second part: unarchive timeline (if needed)
|
||||
let timeline = if let Some(timeline) = timeline_or_unarchive_offloaded {
|
||||
timeline
|
||||
} else {
|
||||
// Turn offloaded timeline into a non-offloaded one
|
||||
self.unoffload_timeline(timeline_id, ctx).await?
|
||||
};
|
||||
|
||||
// Third part: upload new timeline archival state and block until it is present in S3
|
||||
let upload_needed = timeline
|
||||
.remote_client
|
||||
.schedule_index_upload_for_timeline_archival_state(state)?;
|
||||
.schedule_index_upload_for_timeline_archival_state(new_state)?;
|
||||
|
||||
if upload_needed {
|
||||
info!("Uploading new state");
|
||||
@@ -1884,7 +2088,7 @@ impl Tenant {
|
||||
///
|
||||
/// Returns whether we have pending compaction task.
|
||||
async fn compaction_iteration(
|
||||
&self,
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<bool, timeline::CompactionError> {
|
||||
@@ -1905,21 +2109,28 @@ impl Tenant {
|
||||
// while holding the lock. Then drop the lock and actually perform the
|
||||
// compactions. We don't want to block everything else while the
|
||||
// compaction runs.
|
||||
let timelines_to_compact = {
|
||||
let timelines_to_compact_or_offload;
|
||||
{
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
let timelines_to_compact = timelines
|
||||
timelines_to_compact_or_offload = timelines
|
||||
.iter()
|
||||
.filter_map(|(timeline_id, timeline)| {
|
||||
if timeline.is_active() {
|
||||
Some((*timeline_id, timeline.clone()))
|
||||
} else {
|
||||
let (is_active, can_offload) = (timeline.is_active(), timeline.can_offload());
|
||||
let has_no_unoffloaded_children = {
|
||||
!timelines
|
||||
.iter()
|
||||
.any(|(_id, tl)| tl.get_ancestor_timeline_id() == Some(*timeline_id))
|
||||
};
|
||||
let can_offload = can_offload && has_no_unoffloaded_children;
|
||||
if (is_active, can_offload) == (false, false) {
|
||||
None
|
||||
} else {
|
||||
Some((*timeline_id, timeline.clone(), (is_active, can_offload)))
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
drop(timelines);
|
||||
timelines_to_compact
|
||||
};
|
||||
}
|
||||
|
||||
// Before doing any I/O work, check our circuit breaker
|
||||
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
|
||||
@@ -1929,20 +2140,34 @@ impl Tenant {
|
||||
|
||||
let mut has_pending_task = false;
|
||||
|
||||
for (timeline_id, timeline) in &timelines_to_compact {
|
||||
has_pending_task |= timeline
|
||||
.compact(cancel, EnumSet::empty(), ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await
|
||||
.inspect_err(|e| match e {
|
||||
timeline::CompactionError::ShuttingDown => (),
|
||||
timeline::CompactionError::Other(e) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
|
||||
}
|
||||
})?;
|
||||
for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
|
||||
{
|
||||
let pending_task_left = if *can_compact {
|
||||
Some(
|
||||
timeline
|
||||
.compact(cancel, EnumSet::empty(), ctx)
|
||||
.instrument(info_span!("compact_timeline", %timeline_id))
|
||||
.await
|
||||
.inspect_err(|e| match e {
|
||||
timeline::CompactionError::ShuttingDown => (),
|
||||
timeline::CompactionError::Other(e) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
|
||||
}
|
||||
})?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
has_pending_task |= pending_task_left.unwrap_or(false);
|
||||
if pending_task_left == Some(false) && *can_offload {
|
||||
offload_timeline(self, timeline)
|
||||
.instrument(info_span!("offload_timeline", %timeline_id))
|
||||
.await
|
||||
.map_err(timeline::CompactionError::Other)?;
|
||||
}
|
||||
}
|
||||
|
||||
self.compaction_circuit_breaker
|
||||
@@ -2852,6 +3077,7 @@ impl Tenant {
|
||||
constructed_at: Instant::now(),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
timelines_creating: Mutex::new(HashSet::new()),
|
||||
timelines_offloaded: Mutex::new(HashMap::new()),
|
||||
gc_cs: tokio::sync::Mutex::new(()),
|
||||
walredo_mgr,
|
||||
remote_storage,
|
||||
|
||||
@@ -141,14 +141,14 @@ impl GcBlock {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
|
||||
pub(crate) fn before_delete(&self, timeline_id: &super::TimelineId) {
|
||||
let unblocked = {
|
||||
let mut g = self.reasons.lock().unwrap();
|
||||
if g.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
g.remove(&timeline.timeline_id);
|
||||
g.remove(timeline_id);
|
||||
|
||||
BlockingReasons::clean_and_summarize(g).is_none()
|
||||
};
|
||||
|
||||
@@ -7,6 +7,7 @@ pub(crate) mod handle;
|
||||
mod init;
|
||||
pub mod layer_manager;
|
||||
pub(crate) mod logical_size;
|
||||
pub mod offload;
|
||||
pub mod span;
|
||||
pub mod uninit;
|
||||
mod walreceiver;
|
||||
@@ -1556,6 +1557,17 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Checks if the internal state of the timeline is consistent with it being able to be offloaded.
|
||||
/// This is neccessary but not sufficient for offloading of the timeline as it might have
|
||||
/// child timelines that are not offloaded yet.
|
||||
pub(crate) fn can_offload(&self) -> bool {
|
||||
if self.remote_client.is_archived() != Some(true) {
|
||||
return false;
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending
|
||||
/// compaction tasks.
|
||||
pub(crate) async fn compact(
|
||||
@@ -1818,7 +1830,6 @@ impl Timeline {
|
||||
self.current_state() == TimelineState::Active
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn is_archived(&self) -> Option<bool> {
|
||||
self.remote_client.is_archived()
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::{
|
||||
tenant::{
|
||||
metadata::TimelineMetadata,
|
||||
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
|
||||
CreateTimelineCause, DeleteTimelineError, Tenant,
|
||||
CreateTimelineCause, DeleteTimelineError, Tenant, TimelineOrOffloaded,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -24,12 +24,14 @@ use super::{Timeline, TimelineResources};
|
||||
/// Mark timeline as deleted in S3 so we won't pick it up next time
|
||||
/// during attach or pageserver restart.
|
||||
/// See comment in persist_index_part_with_deleted_flag.
|
||||
async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
|
||||
match timeline
|
||||
.remote_client
|
||||
async fn set_deleted_in_remote_index(
|
||||
timeline: &TimelineOrOffloaded,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
let res = timeline
|
||||
.remote_client()
|
||||
.persist_index_part_with_deleted_flag()
|
||||
.await
|
||||
{
|
||||
.await;
|
||||
match res {
|
||||
// If we (now, or already) marked it successfully as deleted, we can proceed
|
||||
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
|
||||
// Bail out otherwise
|
||||
@@ -127,9 +129,9 @@ pub(super) async fn delete_local_timeline_directory(
|
||||
}
|
||||
|
||||
/// Removes remote layers and an index file after them.
|
||||
async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> {
|
||||
async fn delete_remote_layers_and_index(timeline: &TimelineOrOffloaded) -> anyhow::Result<()> {
|
||||
timeline
|
||||
.remote_client
|
||||
.remote_client()
|
||||
.delete_all()
|
||||
.await
|
||||
.context("delete_all")
|
||||
@@ -137,27 +139,41 @@ async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<(
|
||||
|
||||
/// It is important that this gets called when DeletionGuard is being held.
|
||||
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
|
||||
async fn remove_timeline_from_tenant(
|
||||
async fn remove_maybe_offloaded_timeline_from_tenant(
|
||||
tenant: &Tenant,
|
||||
timeline: &Timeline,
|
||||
timeline: &TimelineOrOffloaded,
|
||||
_: &DeletionGuard, // using it as a witness
|
||||
) -> anyhow::Result<()> {
|
||||
// Remove the timeline from the map.
|
||||
// This observes the locking order between timelines and timelines_offloaded
|
||||
let mut timelines = tenant.timelines.lock().unwrap();
|
||||
let mut timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
|
||||
let offloaded_children_exist = timelines_offloaded
|
||||
.iter()
|
||||
.any(|(_, entry)| entry.ancestor_timeline_id == Some(timeline.timeline_id()));
|
||||
let children_exist = timelines
|
||||
.iter()
|
||||
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id));
|
||||
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
|
||||
// We already deleted the layer files, so it's probably best to panic.
|
||||
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
|
||||
if children_exist {
|
||||
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id()));
|
||||
// XXX this can happen because of race conditions with branch creation.
|
||||
// We already deleted the remote layer files, so it's probably best to panic.
|
||||
if children_exist || offloaded_children_exist {
|
||||
panic!("Timeline grew children while we removed layer files");
|
||||
}
|
||||
|
||||
timelines
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("timeline that we were deleting was concurrently removed from 'timelines' map");
|
||||
match timeline {
|
||||
TimelineOrOffloaded::Timeline(timeline) => {
|
||||
timelines.remove(&timeline.timeline_id).expect(
|
||||
"timeline that we were deleting was concurrently removed from 'timelines' map",
|
||||
);
|
||||
}
|
||||
TimelineOrOffloaded::Offloaded(timeline) => {
|
||||
timelines_offloaded
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map");
|
||||
}
|
||||
}
|
||||
|
||||
drop(timelines_offloaded);
|
||||
drop(timelines);
|
||||
|
||||
Ok(())
|
||||
@@ -207,9 +223,11 @@ impl DeleteTimelineFlow {
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
|
||||
timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
}
|
||||
|
||||
tenant.gc_block.before_delete(&timeline);
|
||||
tenant.gc_block.before_delete(&timeline.timeline_id());
|
||||
|
||||
fail::fail_point!("timeline-delete-before-index-deleted-at", |_| {
|
||||
Err(anyhow::anyhow!(
|
||||
@@ -285,15 +303,16 @@ impl DeleteTimelineFlow {
|
||||
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
let timeline = TimelineOrOffloaded::Timeline(timeline);
|
||||
Self::schedule_background(guard, tenant.conf, tenant, timeline);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare(
|
||||
pub(super) fn prepare(
|
||||
tenant: &Tenant,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<(Arc<Timeline>, DeletionGuard), DeleteTimelineError> {
|
||||
) -> Result<(TimelineOrOffloaded, DeletionGuard), DeleteTimelineError> {
|
||||
// Note the interaction between this guard and deletion guard.
|
||||
// Here we attempt to lock deletion guard when we're holding a lock on timelines.
|
||||
// This is important because when you take into account `remove_timeline_from_tenant`
|
||||
@@ -307,8 +326,14 @@ impl DeleteTimelineFlow {
|
||||
let timelines = tenant.timelines.lock().unwrap();
|
||||
|
||||
let timeline = match timelines.get(&timeline_id) {
|
||||
Some(t) => t,
|
||||
None => return Err(DeleteTimelineError::NotFound),
|
||||
Some(t) => TimelineOrOffloaded::Timeline(Arc::clone(t)),
|
||||
None => {
|
||||
let offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
|
||||
match offloaded_timelines.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
|
||||
None => return Err(DeleteTimelineError::NotFound),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Ensure that there are no child timelines **attached to that pageserver**,
|
||||
@@ -334,30 +359,32 @@ impl DeleteTimelineFlow {
|
||||
// to remove the timeline from it.
|
||||
// Always if you have two locks that are taken in different order this can result in a deadlock.
|
||||
|
||||
let delete_progress = Arc::clone(&timeline.delete_progress);
|
||||
let delete_progress = Arc::clone(timeline.delete_progress());
|
||||
let delete_lock_guard = match delete_progress.try_lock_owned() {
|
||||
Ok(guard) => DeletionGuard(guard),
|
||||
Err(_) => {
|
||||
// Unfortunately if lock fails arc is consumed.
|
||||
return Err(DeleteTimelineError::AlreadyInProgress(Arc::clone(
|
||||
&timeline.delete_progress,
|
||||
timeline.delete_progress(),
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
timeline.set_state(TimelineState::Stopping);
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
|
||||
timeline.set_state(TimelineState::Stopping);
|
||||
}
|
||||
|
||||
Ok((Arc::clone(timeline), delete_lock_guard))
|
||||
Ok((timeline, delete_lock_guard))
|
||||
}
|
||||
|
||||
fn schedule_background(
|
||||
guard: DeletionGuard,
|
||||
conf: &'static PageServerConf,
|
||||
tenant: Arc<Tenant>,
|
||||
timeline: Arc<Timeline>,
|
||||
timeline: TimelineOrOffloaded,
|
||||
) {
|
||||
let tenant_shard_id = timeline.tenant_shard_id;
|
||||
let timeline_id = timeline.timeline_id;
|
||||
let tenant_shard_id = timeline.tenant_shard_id();
|
||||
let timeline_id = timeline.timeline_id();
|
||||
|
||||
task_mgr::spawn(
|
||||
task_mgr::BACKGROUND_RUNTIME.handle(),
|
||||
@@ -368,7 +395,9 @@ impl DeleteTimelineFlow {
|
||||
async move {
|
||||
if let Err(err) = Self::background(guard, conf, &tenant, &timeline).await {
|
||||
error!("Error: {err:#}");
|
||||
timeline.set_broken(format!("{err:#}"))
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = timeline {
|
||||
timeline.set_broken(format!("{err:#}"))
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
@@ -380,15 +409,19 @@ impl DeleteTimelineFlow {
|
||||
mut guard: DeletionGuard,
|
||||
conf: &PageServerConf,
|
||||
tenant: &Tenant,
|
||||
timeline: &Timeline,
|
||||
timeline: &TimelineOrOffloaded,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
|
||||
// Offloaded timelines have no local state
|
||||
// TODO: once we persist offloaded information, delete the timeline from there, too
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = timeline {
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await?;
|
||||
}
|
||||
|
||||
delete_remote_layers_and_index(timeline).await?;
|
||||
|
||||
pausable_failpoint!("in_progress_delete");
|
||||
|
||||
remove_timeline_from_tenant(tenant, timeline, &guard).await?;
|
||||
remove_maybe_offloaded_timeline_from_tenant(tenant, timeline, &guard).await?;
|
||||
|
||||
*guard = Self::Finished;
|
||||
|
||||
@@ -400,7 +433,7 @@ impl DeleteTimelineFlow {
|
||||
}
|
||||
}
|
||||
|
||||
struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);
|
||||
pub(super) struct DeletionGuard(OwnedMutexGuard<DeleteTimelineFlow>);
|
||||
|
||||
impl Deref for DeletionGuard {
|
||||
type Target = DeleteTimelineFlow;
|
||||
|
||||
69
pageserver/src/tenant/timeline/offload.rs
Normal file
69
pageserver/src/tenant/timeline/offload.rs
Normal file
@@ -0,0 +1,69 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::tenant::{OffloadedTimeline, Tenant, TimelineOrOffloaded};
|
||||
|
||||
use super::{
|
||||
delete::{delete_local_timeline_directory, DeleteTimelineFlow, DeletionGuard},
|
||||
Timeline,
|
||||
};
|
||||
|
||||
pub(crate) async fn offload_timeline(
|
||||
tenant: &Tenant,
|
||||
timeline: &Arc<Timeline>,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::info!("offloading archived timeline");
|
||||
let (timeline, guard) = DeleteTimelineFlow::prepare(tenant, timeline.timeline_id)?;
|
||||
|
||||
let TimelineOrOffloaded::Timeline(timeline) = timeline else {
|
||||
tracing::error!("timeline already offloaded, but given timeline object");
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// TODO extend guard mechanism above with method
|
||||
// to make deletions possible while offloading is in progress
|
||||
|
||||
// TODO mark timeline as offloaded in S3
|
||||
|
||||
let conf = &tenant.conf;
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &timeline).await?;
|
||||
|
||||
remove_timeline_from_tenant(tenant, &timeline, &guard).await?;
|
||||
|
||||
{
|
||||
let mut offloaded_timelines = tenant.timelines_offloaded.lock().unwrap();
|
||||
offloaded_timelines.insert(
|
||||
timeline.timeline_id,
|
||||
Arc::new(OffloadedTimeline::from_timeline(&timeline)),
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// It is important that this gets called when DeletionGuard is being held.
|
||||
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
|
||||
async fn remove_timeline_from_tenant(
|
||||
tenant: &Tenant,
|
||||
timeline: &Timeline,
|
||||
_: &DeletionGuard, // using it as a witness
|
||||
) -> anyhow::Result<()> {
|
||||
// Remove the timeline from the map.
|
||||
let mut timelines = tenant.timelines.lock().unwrap();
|
||||
let children_exist = timelines
|
||||
.iter()
|
||||
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline.timeline_id));
|
||||
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
|
||||
// We already deleted the layer files, so it's probably best to panic.
|
||||
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
|
||||
if children_exist {
|
||||
panic!("Timeline grew children while we removed layer files");
|
||||
}
|
||||
|
||||
timelines
|
||||
.remove(&timeline.timeline_id)
|
||||
.expect("timeline that we were deleting was concurrently removed from 'timelines' map");
|
||||
|
||||
drop(timelines);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user