Compare commits

...

65 Commits

Author SHA1 Message Date
John Spray
35fa75699b switch deletion queue to local storage 2023-08-30 12:21:29 +01:00
John Spray
f77aa463c6 clippy 2023-08-30 10:37:06 +01:00
John Spray
4492d40c37 Merge remote-tracking branch 'upstream/main' into jcsp/deletion-queue 2023-08-30 10:34:16 +01:00
John Spray
2f58f39648 Revert "libs: make backoff::retry() take a cancellation token"
This reverts commit 8c2ff87f1a.
2023-08-30 10:26:15 +01:00
John Spray
5a217791fd libs: give TenantTimelineId a compact string serialization
The existing derive'd Serialize/Deserialize were not used anywhere.

To enable using TenantTimelineId as a key in JSON maps, serialize
as a slash-separated string. This is also a more compact
representation.
2023-08-23 10:33:44 +01:00
John Spray
c9a007d05b deletion queue: future-proof DeletionList format
It needs places to put generation numbers
2023-08-23 10:33:44 +01:00
John Spray
696b49eeba Update deletion list doc comment for Executor 2023-08-23 09:35:10 +01:00
John Spray
206420d96a deletion queue: refactor coalescing into Executor 2023-08-23 09:16:55 +01:00
John Spray
416026381f deletion queue: refactor into frontend/backend modules 2023-08-22 16:38:13 +01:00
John Spray
d9755becab Update RemoteTimelineClient doc comment 2023-08-22 14:36:57 +01:00
John Spray
9cb255be97 Update pageserver/src/deletion_queue.rs
Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-08-22 14:10:11 +01:00
John Spray
57a44dcc01 Update pageserver/src/deletion_queue.rs
Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-08-22 14:10:06 +01:00
John Spray
1afc6337fb Remove unused num_inprogress_deletions 2023-08-22 14:06:15 +01:00
John Spray
74058e196a remote_storage: defensively handle 404 on deletions
S3 implementations are _meant_ to return 200 on deleting
a nonexistent object, but S3 is not a standard and
some implementations have their own ideas.
2023-08-22 13:52:58 +01:00
John Spray
a116f6656f deletion queue: more consistent use of backoff::retry 2023-08-22 13:38:31 +01:00
John Spray
2c7b97245a tweak test_remote_storage_upload_queue_retries 2023-08-22 13:34:12 +01:00
John Spray
6efddbf526 flush tweaks 2023-08-22 13:17:57 +01:00
John Spray
7c4d79f4db deletion queue: cancellable retries 2023-08-22 13:05:04 +01:00
John Spray
8c2ff87f1a libs: make backoff::retry() take a cancellation token 2023-08-22 12:39:19 +01:00
John Spray
23fc247a03 remove redundant spans 2023-08-22 11:22:51 +01:00
John Spray
d8dc4425f8 Merge remote-tracking branch 'upstream/main' into jcsp/deletion-queue 2023-08-22 10:09:23 +01:00
John Spray
18159b7695 deletion queue: expose errors from push/flush 2023-08-22 10:01:10 +01:00
John Spray
c1bc9c0f70 Various test fixes + tweaks to flushing 2023-08-18 12:44:35 +01:00
John Spray
2de5efa208 Fix broken wait_untils in test_remote_storage_upload_queue_retries 2023-08-18 12:44:35 +01:00
John Spray
d330eac4bc clippy 2023-08-18 12:44:35 +01:00
John Spray
3ebceeda71 pageserver: refactor timeline args into TimelineResources
This sidesteps clippy complaining about function arg counts,
and will enable introducing more shared structures in future
without the noise of adding extra args to all the functions
involved in timeline setup.
2023-08-18 12:44:35 +01:00
John Spray
31729d6f4d pageserver: refactor tenant args into a structure
This way, when we add some new shared structure that the
tenants need a reference to, we do not have to add it
individually as an extra argument to the various functions.
2023-08-18 12:44:35 +01:00
John Spray
7e0e3517c1 clippy 2023-08-18 12:44:35 +01:00
John Spray
c4fc6e433d tests: add e2e deletion queue recovery test 2023-08-18 12:44:35 +01:00
John Spray
c36cba28d6 pageserver: generalize flush API 2023-08-18 12:44:35 +01:00
John Spray
8eaa4015de deletion queue: versions in keys 2023-08-18 12:44:35 +01:00
John Spray
10e927ee3e Add encoding versions to deletion queue structs 2023-08-18 12:44:35 +01:00
John Spray
bb3a59f275 clippy 2023-08-18 12:44:35 +01:00
John Spray
a0ed43cc12 deletion queue: add DeletionHeader for sequence numbers 2023-08-18 12:44:35 +01:00
John Spray
99dc5a5c27 Deletion queue: implement recovery on startup 2023-08-18 12:44:35 +01:00
John Spray
54db1f5d8a remote_storage: add a helper for downloading full objects
This is only for use with small objects that we will
deserialize in a non-streaming way.

Also add a strip_prefix method to RemotePath.
2023-08-18 12:44:35 +01:00
John Spray
404b25e45f Remove vestigial remote_timeline_client deletion paths 2023-08-18 12:44:35 +01:00
John Spray
f4dba9f907 tests: update tenant deletion tests for deletion queue 2023-08-18 12:44:35 +01:00
John Spray
4ec45bc7dc tests: update tenant deletion tests for deletion queue 2023-08-18 12:44:35 +01:00
John Spray
a00d4a8d8c tests: update test_remote_timeline_client_calls_started_metric for deletion queue 2023-08-18 12:44:35 +01:00
John Spray
43c9a09d8f tests: update remote storage test for deletion queue 2023-08-18 12:44:35 +01:00
John Spray
3edd7ece40 deletion queue: improve frontend retry 2023-08-18 12:44:35 +01:00
John Spray
504fe9c2b0 pageserver: send timeline deletions through the deletion queue 2023-08-18 12:44:35 +01:00
John Spray
10df237a81 deletion queue: add push for generic objects (layers and garbage) 2023-08-18 12:44:35 +01:00
John Spray
d40f8475a5 Error metric and retries 2023-08-18 12:44:35 +01:00
John Spray
164f916a40 Spawn deletion workers with info spans 2023-08-18 12:44:35 +01:00
John Spray
4ebc29768c Add failpoint for deletion execution 2023-08-18 12:44:35 +01:00
John Spray
bae62916dc pageserver/http: add /v1/deletion_queue/flush_execute
This is principally for testing, but might be useful in
the field if we want to e.g. flush a deletion queue
before running an external scrub tool
2023-08-18 12:44:35 +01:00
John Spray
5e2b8b376c utils: add ApiError::ShuttingDown
So that handlers that check their CancellationToken
explicitly can map it to a specific HTTP status.
2023-08-18 12:44:35 +01:00
John Spray
54ec7919b8 pageserver: add deletion queue submitted/executed metrics 2023-08-18 12:44:35 +01:00
John Spray
e0bed0732c Tweak deletion queue constants 2023-08-18 12:44:35 +01:00
John Spray
9e92121cc3 pageserver: flush deletion queue on clean shutdown 2023-08-18 12:44:35 +01:00
John Spray
50a9508f4f clippy 2023-08-18 12:44:35 +01:00
John Spray
f61402be24 pageserver: testing for deletion queue 2023-08-18 12:44:35 +01:00
John Spray
975e4f2235 Refactor deletion worker construction 2023-08-18 12:44:35 +01:00
John Spray
537eca489e Implement flush_execute() in deletion queue 2023-08-18 12:44:35 +01:00
John Spray
de4882886e pageserver: implement batching in deletion queue 2023-08-18 12:44:35 +01:00
John Spray
6982288426 pageserver: implement frontend of deletion queue 2023-08-18 12:44:35 +01:00
John Spray
ccfcfa1098 remote_storage: implement Serialize/Deserialize for RemotePath 2023-08-18 12:44:35 +01:00
John Spray
e2c793c897 Use deletion queue in schedule_layer_file_deletion 2023-08-18 12:44:33 +01:00
John Spray
0fdc492aa4 Add MockDeletionQueue for unit tests 2023-08-18 11:25:40 +01:00
John Spray
787b099541 wire deletion queue into timeline 2023-08-18 11:25:40 +01:00
John Spray
3af693749d pageserver: wire deletion queue through to Tenant 2023-08-18 11:25:40 +01:00
John Spray
6f9ae6bb5f pageserver: instantiate deletion queue at process scope 2023-08-18 11:25:40 +01:00
John Spray
16d77dcb73 Initial stub implementation of deletion queue 2023-08-18 11:25:40 +01:00
26 changed files with 2280 additions and 299 deletions

View File

@@ -13,13 +13,14 @@ use std::{
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
path::{Path, PathBuf, StripPrefixError},
pin::Pin,
sync::Arc,
};
use anyhow::{bail, Context};
use serde::{Deserialize, Serialize};
use tokio::io;
use toml_edit::Item;
use tracing::info;
@@ -44,12 +45,34 @@ pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option<i32> = None;
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
// From the S3 spec
pub const MAX_KEYS_PER_DELETE: usize = 1000;
/// Path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail, that allows representing local paths
/// as the remote ones, stripping the local storage prefix away.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemotePath(PathBuf);
impl Serialize for RemotePath {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(self)
}
}
impl<'de> Deserialize<'de> for RemotePath {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let str = String::deserialize(deserializer)?;
Ok(Self(PathBuf::from(&str)))
}
}
impl std::fmt::Display for RemotePath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.display())
@@ -88,6 +111,10 @@ impl RemotePath {
pub fn extension(&self) -> Option<&str> {
self.0.extension()?.to_str()
}
pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Path, StripPrefixError> {
self.0.strip_prefix(&p.0)
}
}
/// Storage (potentially remote) API to manage its state.
@@ -166,6 +193,8 @@ pub enum DownloadError {
BadInput(anyhow::Error),
/// The file was not found in the remote storage.
NotFound,
/// The client was shut down
Shutdown,
/// The file was found in the remote storage, but the download failed.
Other(anyhow::Error),
}
@@ -177,6 +206,7 @@ impl std::fmt::Display for DownloadError {
write!(f, "Failed to download a remote file due to user input: {e}")
}
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
DownloadError::Shutdown => write!(f, "Client shutting down"),
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
}
}
@@ -241,6 +271,18 @@ impl GenericRemoteStorage {
}
}
/// For small, simple downloads where caller doesn't want to handle the streaming: return the full body
pub async fn download_all(&self, from: &RemotePath) -> Result<Vec<u8>, DownloadError> {
let mut download = self.download(from).await?;
let mut bytes = Vec::new();
tokio::io::copy(&mut download.download_stream, &mut bytes)
.await
.with_context(|| format!("Failed to download body from {from}"))
.map_err(DownloadError::Other)?;
Ok(bytes)
}
pub async fn download_byte_range(
&self,
from: &RemotePath,
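
A minimal usage sketch of the new download_all helper (not part of this diff): fetch a small object in one call and deserialize it with serde_json. The function name and the generic JSON payload are illustrative assumptions.

use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};

async fn read_small_json<T: serde::de::DeserializeOwned>(
    storage: &GenericRemoteStorage,
    path: &RemotePath,
) -> Result<T, DownloadError> {
    // download_all buffers the whole body in memory, so only use it for small objects.
    let body = storage.download_all(path).await?;
    serde_json::from_slice(&body).map_err(|e| DownloadError::Other(anyhow::anyhow!(e)))
}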

View File

@@ -22,7 +22,7 @@ use aws_sdk_s3::{
Client,
};
use aws_smithy_http::body::SdkBody;
use hyper::Body;
use hyper::{Body, StatusCode};
use scopeguard::ScopeGuard;
use tokio::{
io::{self, AsyncRead},
@@ -529,7 +529,16 @@ impl RemoteStorage for S3Bucket {
}
}
Err(e) => {
return Err(e.into());
if let Some(r) = e.raw_response() {
if r.http().status() == StatusCode::NOT_FOUND {
// 404 is acceptable for deletions. AWS S3 does not return this, but
// some other implementations might (e.g. GCS XML API returns 404 on DeleteObject
// to a missing key)
continue;
} else {
return Err(anyhow::format_err!("DeleteObjects response error: {e}"));
}
}
}
}
}

View File

@@ -24,6 +24,9 @@ pub enum ApiError {
#[error("Precondition failed: {0}")]
PreconditionFailed(Box<str>),
#[error("Shutting down")]
ShuttingDown,
#[error(transparent)]
InternalServerError(anyhow::Error),
}
@@ -52,6 +55,10 @@ impl ApiError {
self.to_string(),
StatusCode::PRECONDITION_FAILED,
),
ApiError::ShuttingDown => HttpErrorBody::response_from_msg_and_status(
"Shutting down".to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
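
A hedged sketch of how a handler might use the new variant (the handler name and signature are hypothetical): checking a CancellationToken and returning ApiError::ShuttingDown surfaces shutdown as a 503 via the response mapping above.

use tokio_util::sync::CancellationToken;

async fn flush_deletions(cancel: &CancellationToken) -> Result<(), ApiError> {
    if cancel.is_cancelled() {
        // Rendered as StatusCode::SERVICE_UNAVAILABLE by the mapping above.
        return Err(ApiError::ShuttingDown);
    }
    // ... handler body elided ...
    Ok(())
}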

View File

@@ -244,13 +244,13 @@ id_newtype!(TenantId);
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// See [`Id`] for alternative ways to serialize it.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ConnectionId(Id);
id_newtype!(ConnectionId);
// A pair uniquely identifying Neon instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct TenantTimelineId {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
@@ -273,6 +273,36 @@ impl TenantTimelineId {
}
}
impl Serialize for TenantTimelineId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(self)
}
}
impl<'de> Deserialize<'de> for TenantTimelineId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let str = String::deserialize(deserializer)?;
if let Some((tenant_part, timeline_part)) = str.split_once('/') {
Ok(Self {
tenant_id: TenantId(Id::from_hex(tenant_part).map_err(|e| {
serde::de::Error::custom(format!("Malformed tenant in TenantTimelineId: {e}"))
})?),
timeline_id: TimelineId(Id::from_hex(timeline_part).map_err(|e| {
serde::de::Error::custom(format!("Malformed timeline in TenantTimelineId {e}"))
})?),
})
} else {
Err(serde::de::Error::custom("Malformed TenantTimelineId"))
}
}
}
impl fmt::Display for TenantTimelineId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
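
A round-trip sketch of the new serialization (assumes TenantId/TimelineId implement FromStr for 32-character hex strings, which is not shown in this hunk): the value serializes through Display, so it can be used directly as a JSON map key.

use std::collections::HashMap;
use std::str::FromStr;
use utils::id::{TenantId, TenantTimelineId, TimelineId};

fn roundtrip_example() {
    let ttid = TenantTimelineId::new(
        TenantId::from_str("11223344556677881122334455667788").unwrap(),
        TimelineId::from_str("aabbccddeeff00112233445566778899").unwrap(),
    );
    let mut map = HashMap::new();
    map.insert(ttid, vec!["layer-1".to_string()]);
    // Produces {"1122…7788/aabb…8899":["layer-1"]} rather than an array of hex bytes.
    let json = serde_json::to_string(&map).unwrap();
    let back: HashMap<TenantTimelineId, Vec<String>> = serde_json::from_str(&json).unwrap();
    assert_eq!(map, back);
}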

View File

@@ -2,12 +2,14 @@
use std::env::{var, VarError};
use std::sync::Arc;
use std::time::Duration;
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::deletion_queue::{DeletionQueue, DeletionQueueError};
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
@@ -349,6 +351,35 @@ fn start_pageserver(
// Set up remote storage client
let remote_storage = create_remote_storage_client(conf)?;
// Set up deletion queue
let deletion_queue_cancel = tokio_util::sync::CancellationToken::new();
let (deletion_queue, deletion_frontend, deletion_backend, deletion_executor) =
DeletionQueue::new(remote_storage.clone(), conf, deletion_queue_cancel.clone());
if let Some(mut deletion_frontend) = deletion_frontend {
BACKGROUND_RUNTIME.spawn(async move {
deletion_frontend
.background()
.instrument(info_span!(parent:None, "deletion frontend"))
.await
});
}
if let Some(mut deletion_backend) = deletion_backend {
BACKGROUND_RUNTIME.spawn(async move {
deletion_backend
.background()
.instrument(info_span!(parent: None, "deletion backend"))
.await
});
}
if let Some(mut deletion_executor) = deletion_executor {
BACKGROUND_RUNTIME.spawn(async move {
deletion_executor
.background()
.instrument(info_span!(parent: None, "deletion executor"))
.await
});
}
// Up to this point no significant I/O has been done: this should have been fast. Record
// duration prior to starting I/O intensive phase of startup.
startup_checkpoint("initial", "Starting loading tenants");
@@ -386,6 +417,7 @@ fn start_pageserver(
TenantSharedResources {
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client: deletion_queue.new_client(),
},
order,
))?;
@@ -482,6 +514,7 @@ fn start_pageserver(
http_auth,
broker_client.clone(),
remote_storage,
deletion_queue.clone(),
disk_usage_eviction_state,
)?
.build()
@@ -604,6 +637,36 @@ fn start_pageserver(
// The plan is to change that over time.
shutdown_pageserver.take();
BACKGROUND_RUNTIME.block_on(pageserver::shutdown_pageserver(0));
// Best effort to persist any outstanding deletions, to avoid leaking objects
let dq = deletion_queue.clone();
BACKGROUND_RUNTIME.block_on(async move {
match tokio::time::timeout(Duration::from_secs(5), dq.new_client().flush()).await {
Ok(flush_r) => {
match flush_r {
Ok(()) => {
info!("Deletion queue flushed successfully on shutdown")
}
Err(e) => {
match e {
DeletionQueueError::ShuttingDown => {
// This is not harmful for correctness, but is unexpected: the deletion
// queue's workers should stay alive as long as there are any client handles instantiated.
warn!("Deletion queue stopped prematurely");
}
}
}
}
}
Err(e) => {
warn!("Timed out flushing deletion queue on shutdown ({e})")
}
}
});
// Clean shutdown of deletion queue workers
deletion_queue_cancel.cancel();
unreachable!()
}
})

View File

@@ -566,6 +566,27 @@ impl PageServerConf {
self.workdir.join("tenants")
}
pub fn deletion_prefix(&self) -> PathBuf {
self.workdir.join("deletion")
}
pub fn deletion_list_path(&self, sequence: u64) -> PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
const VERSION: u8 = 1;
self.deletion_prefix()
.join(format!("{sequence:016x}-{VERSION:02x}.list"))
}
pub fn deletion_header_path(&self) -> PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
const VERSION: u8 = 1;
self.deletion_prefix().join(format!("header-{VERSION:02x}"))
}
pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenants_path().join(tenant_id.to_string())
}
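
Illustrative values for the deletion path helpers above, assuming a placeholder workdir of /data/pageserver:

// deletion_prefix()        -> /data/pageserver/deletion
// deletion_list_path(1)    -> /data/pageserver/deletion/0000000000000001-01.list
// deletion_list_path(0x2a) -> /data/pageserver/deletion/000000000000002a-01.list
// deletion_header_path()   -> /data/pageserver/deletion/header-01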

View File

@@ -0,0 +1,782 @@
mod backend;
mod executor;
mod frontend;
use std::collections::HashMap;
use crate::metrics::DELETION_QUEUE_SUBMITTED;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
use serde_with::serde_as;
use thiserror::Error;
use tokio;
use tokio_util::sync::CancellationToken;
use tracing::{self, debug, error};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
pub(crate) use self::backend::BackendQueueWorker;
use self::executor::ExecutorWorker;
use self::frontend::DeletionOp;
pub(crate) use self::frontend::FrontendQueueWorker;
use backend::BackendQueueMessage;
use executor::ExecutorMessage;
use frontend::FrontendQueueMessage;
use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
// TODO: administrative "panic button" config property to disable all deletions
// TODO: configurable for how long to wait before executing deletions
/// We aggregate object deletions from many tenants in one place, for several reasons:
/// - Coalesce deletions into fewer DeleteObjects calls
/// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
/// to flush any outstanding deletions.
/// - Globally control throughput of deletions, as these are a low priority task: do
/// not compete with the same S3 clients/connections used for higher priority uploads.
/// - Future: enable validating that we may do deletions in a multi-attached scenario,
/// via generation numbers (see https://github.com/neondatabase/neon/pull/4919)
///
/// There are two kinds of deletion: deferred and immediate. A deferred deletion
/// may be intentionally delayed to protect passive readers of S3 data, and may
/// be subject to a generation number validation step. An immediate deletion is
/// ready to execute immediately, and is only queued up so that it can be coalesced
/// with other deletions in flight.
///
/// Deferred deletions pass through three steps:
/// - Frontend: accumulate deletion requests from Timelines, and batch them up into
/// DeletionLists, which are persisted to S3.
/// - Backend: accumulate deletion lists, and validate them en-masse prior to passing
/// the keys in the list onward for actual deletion
/// - Executor: accumulate object keys that the backend has validated for immediate
/// deletion, and execute them in batches of 1000 keys via DeleteObjects.
///
/// Non-deferred deletions, such as during timeline deletion, bypass the first
/// two stages and are passed straight into the Executor.
///
/// Internally, each stage is joined by a channel to the next. In S3, there is only
/// one queue (of DeletionLists), which is written by the frontend and consumed
/// by the backend.
#[derive(Clone)]
pub struct DeletionQueue {
client: DeletionQueueClient,
}
#[derive(Debug)]
struct FlushOp {
tx: tokio::sync::oneshot::Sender<()>,
}
impl FlushOp {
fn fire(self) {
if self.tx.send(()).is_err() {
// oneshot channel closed. This is legal: a client could be destroyed while waiting for a flush.
debug!("deletion queue flush from dropped client");
};
}
}
#[derive(Clone)]
pub struct DeletionQueueClient {
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
}
#[derive(Debug, Serialize, Deserialize)]
struct TimelineDeletionList {
objects: Vec<RemotePath>,
// TODO: Tenant attachment generation will go here
// (see https://github.com/neondatabase/neon/pull/4919)
// attach_gen: u32,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionList {
/// Serialization version, for future use
version: u8,
/// Used for constructing a unique key for each deletion list we write out.
sequence: u64,
/// To avoid repeating tenant/timeline IDs in every key, we store keys in
/// nested HashMaps by TenantTimelineID
objects: HashMap<TenantTimelineId, TimelineDeletionList>,
// TODO: Node generation will go here
// (see https://github.com/neondatabase/neon/pull/4919)
// node_gen: u32,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
struct DeletionHeader {
/// Serialization version, for future use
version: u8,
/// Enable determining the next sequence number even if there are no deletion lists present.
/// If there _are_ deletion lists present, then their sequence numbers take precedence over
/// this.
last_deleted_list_seq: u64,
// TODO: this is where we will track a 'clean' sequence number that indicates all deletion
// lists <= that sequence have had their generations validated with the control plane
// and are OK to execute.
}
impl DeletionHeader {
const VERSION_LATEST: u8 = 1;
fn new(last_deleted_list_seq: u64) -> Self {
Self {
version: Self::VERSION_LATEST,
last_deleted_list_seq,
}
}
}
impl DeletionList {
const VERSION_LATEST: u8 = 1;
fn new(sequence: u64) -> Self {
Self {
version: Self::VERSION_LATEST,
sequence,
objects: HashMap::new(),
}
}
fn is_empty(&self) -> bool {
self.objects.is_empty()
}
fn len(&self) -> usize {
self.objects.values().map(|v| v.objects.len()).sum()
}
fn push(&mut self, tenant: &TenantId, timeline: &TimelineId, mut objects: Vec<RemotePath>) {
if objects.is_empty() {
// Avoid inserting an empty TimelineDeletionList: this preserves the property
// that if we have no keys, then self.objects is empty (used in Self::is_empty)
return;
}
let key = TenantTimelineId::new(*tenant, *timeline);
let entry = self
.objects
.entry(key)
.or_insert_with(|| TimelineDeletionList {
objects: Vec::new(),
});
entry.objects.append(&mut objects)
}
fn take_paths(&mut self) -> Vec<RemotePath> {
self.objects
.drain()
.flat_map(|(_k, v)| v.objects.into_iter())
.collect()
}
}
#[derive(Error, Debug)]
pub enum DeletionQueueError {
#[error("Deletion queue unavailable during shutdown")]
ShuttingDown,
}
impl DeletionQueueClient {
async fn do_push(&self, msg: FrontendQueueMessage) -> Result<(), DeletionQueueError> {
match self.tx.send(msg).await {
Ok(_) => Ok(()),
Err(e) => {
// This shouldn't happen, we should shut down all tenants before
// we shut down the global delete queue. If we encounter a bug like this,
// we may leak objects as deletions won't be processed.
error!("Deletion queue closed while pushing, shutting down? ({e})");
Err(DeletionQueueError::ShuttingDown)
}
}
}
/// Submit a list of layers for deletion: this function will return before the deletion is
/// persistent, but it may be executed at any time after this function enters: do not push
/// layers until you're sure they can be deleted safely (i.e. remote metadata no longer
/// references them).
pub(crate) async fn push_layers(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
layers: Vec<LayerFileName>,
) -> Result<(), DeletionQueueError> {
DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64);
self.do_push(FrontendQueueMessage::Delete(DeletionOp {
tenant_id,
timeline_id,
layers,
objects: Vec::new(),
}))
.await
}
async fn do_flush(
&self,
msg: FrontendQueueMessage,
rx: tokio::sync::oneshot::Receiver<()>,
) -> Result<(), DeletionQueueError> {
self.do_push(msg).await?;
if rx.await.is_err() {
// This shouldn't happen if tenants are shut down before deletion queue. If we
// encounter a bug like this, then a flusher will incorrectly believe it has flushed
// when it hasn't, possibly leading to leaking objects.
error!("Deletion queue dropped flush op while client was still waiting");
Err(DeletionQueueError::ShuttingDown)
} else {
Ok(())
}
}
/// Wait until all previous deletions are persistent (either executed, or written to a DeletionList)
pub async fn flush(&self) -> Result<(), DeletionQueueError> {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
self.do_flush(FrontendQueueMessage::Flush(FlushOp { tx }), rx)
.await
}
// Wait until all previous deletions are executed
pub(crate) async fn flush_execute(&self) -> Result<(), DeletionQueueError> {
debug!("flush_execute: flushing to deletion lists...");
// Flush any buffered work to deletion lists
self.flush().await?;
// Flush execution of deletion lists
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
debug!("flush_execute: flushing execution...");
self.do_flush(FrontendQueueMessage::FlushExecute(FlushOp { tx }), rx)
.await?;
debug!("flush_execute: finished flushing execution...");
Ok(())
}
/// This interface bypasses the persistent deletion queue, and any validation
/// that this pageserver is still eligible to execute the deletions. It is for
/// use in timeline deletions, where the control plane is telling us we may
/// delete everything in the timeline.
///
/// DO NOT USE THIS FROM GC OR COMPACTION CODE. Use the regular `push_layers`.
pub(crate) async fn push_immediate(
&self,
objects: Vec<RemotePath>,
) -> Result<(), DeletionQueueError> {
self.executor_tx
.send(ExecutorMessage::Delete(objects))
.await
.map_err(|_| DeletionQueueError::ShuttingDown)
}
/// Companion to push_immediate. When this returns Ok, all prior objects sent
/// into push_immediate have been deleted from remote storage.
pub(crate) async fn flush_immediate(&self) -> Result<(), DeletionQueueError> {
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
self.executor_tx
.send(ExecutorMessage::Flush(FlushOp { tx }))
.await
.map_err(|_| DeletionQueueError::ShuttingDown)?;
rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
}
}
impl DeletionQueue {
pub fn new_client(&self) -> DeletionQueueClient {
self.client.clone()
}
/// Caller may use the returned object to construct clients with new_client.
/// Caller should tokio::spawn the background() members of the three worker objects returned:
/// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
///
/// If remote_storage is None, then the returned workers will also be None.
pub fn new(
remote_storage: Option<GenericRemoteStorage>,
conf: &'static PageServerConf,
cancel: CancellationToken,
) -> (
Self,
Option<FrontendQueueWorker>,
Option<BackendQueueWorker>,
Option<ExecutorWorker>,
) {
// Deep channel: it consumes deletions from all timelines and we do not want to block them
let (tx, rx) = tokio::sync::mpsc::channel(16384);
// Shallow channel: it carries DeletionLists which each contain up to thousands of deletions
let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16);
// Shallow channel: it carries lists of paths, and we expect the main queueing to
// happen in the backend (persistent), not in this queue.
let (executor_tx, executor_rx) = tokio::sync::mpsc::channel(16);
let remote_storage = match remote_storage {
None => {
return (
Self {
client: DeletionQueueClient { tx, executor_tx },
},
None,
None,
None,
)
}
Some(r) => r,
};
(
Self {
client: DeletionQueueClient {
tx,
executor_tx: executor_tx.clone(),
},
},
Some(FrontendQueueWorker::new(
conf,
rx,
backend_tx,
cancel.clone(),
)),
Some(BackendQueueWorker::new(conf, backend_rx, executor_tx)),
Some(ExecutorWorker::new(
remote_storage,
executor_rx,
cancel.clone(),
)),
)
}
}
#[cfg(test)]
mod test {
use hex_literal::hex;
use std::{
io::ErrorKind,
path::{Path, PathBuf},
};
use tracing::info;
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
use tokio::{runtime::EnterGuard, task::JoinHandle};
use crate::tenant::harness::TenantHarness;
use super::*;
pub const TIMELINE_ID: TimelineId =
TimelineId::from_array(hex!("11223344556677881122334455667788"));
struct TestSetup {
runtime: &'static tokio::runtime::Runtime,
_entered_runtime: EnterGuard<'static>,
harness: TenantHarness,
remote_fs_dir: PathBuf,
storage: GenericRemoteStorage,
deletion_queue: DeletionQueue,
fe_worker: JoinHandle<()>,
be_worker: JoinHandle<()>,
ex_worker: JoinHandle<()>,
}
impl TestSetup {
/// Simulate a pageserver restart by destroying and recreating the deletion queue
fn restart(&mut self) {
let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new(
Some(self.storage.clone()),
self.harness.conf,
CancellationToken::new(),
);
self.deletion_queue = deletion_queue;
let mut fe_worker = fe_worker.unwrap();
let mut be_worker = be_worker.unwrap();
let mut ex_worker = ex_worker.unwrap();
let mut fe_worker = self
.runtime
.spawn(async move { fe_worker.background().await });
let mut be_worker = self
.runtime
.spawn(async move { be_worker.background().await });
let mut ex_worker = self.runtime.spawn(async move {
drop(ex_worker.background().await);
});
std::mem::swap(&mut self.fe_worker, &mut fe_worker);
std::mem::swap(&mut self.be_worker, &mut be_worker);
std::mem::swap(&mut self.ex_worker, &mut ex_worker);
// Join the old workers
self.runtime.block_on(fe_worker).unwrap();
self.runtime.block_on(be_worker).unwrap();
self.runtime.block_on(ex_worker).unwrap();
}
}
fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
let harness = TenantHarness::create(test_name)?;
// We do not load() the harness: we only need its config and remote_storage
// Set up a GenericRemoteStorage targeting a directory
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
)
.unwrap(),
max_sync_errors: std::num::NonZeroU32::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS,
)
.unwrap(),
storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()),
};
let storage = GenericRemoteStorage::from_config(&storage_config).unwrap();
let runtime = Box::leak(Box::new(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?,
));
let entered_runtime = runtime.enter();
let (deletion_queue, fe_worker, be_worker, ex_worker) = DeletionQueue::new(
Some(storage.clone()),
harness.conf,
CancellationToken::new(),
);
let mut fe_worker = fe_worker.unwrap();
let mut be_worker = be_worker.unwrap();
let mut ex_worker = ex_worker.unwrap();
let fe_worker_join = runtime.spawn(async move { fe_worker.background().await });
let be_worker_join = runtime.spawn(async move { be_worker.background().await });
let ex_worker_join = runtime.spawn(async move {
drop(ex_worker.background().await);
});
Ok(TestSetup {
runtime,
_entered_runtime: entered_runtime,
harness,
remote_fs_dir,
storage,
deletion_queue,
fe_worker: fe_worker_join,
be_worker: be_worker_join,
ex_worker: ex_worker_join,
})
}
// TODO: put this in a common location so that we can share with remote_timeline_client's tests
fn assert_remote_files(expected: &[&str], remote_path: &Path) {
let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
expected.sort();
let mut found: Vec<String> = Vec::new();
let dir = match std::fs::read_dir(remote_path) {
Ok(d) => d,
Err(e) => {
if e.kind() == ErrorKind::NotFound {
if expected.is_empty() {
// We are asserting prefix is empty: it is expected that the dir is missing
return;
} else {
assert_eq!(expected, Vec::<String>::new());
unreachable!();
}
} else {
panic!(
"Unexpected error listing {0}: {e}",
remote_path.to_string_lossy()
);
}
}
};
for entry in dir.flatten() {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
found.push(String::from(fname));
}
found.sort();
assert_eq!(expected, found);
}
fn assert_local_files(expected: &[&str], directory: &Path) {
let mut dir = match std::fs::read_dir(directory) {
Ok(d) => d,
Err(_) => {
assert_eq!(expected, &Vec::<String>::new());
return;
}
};
let mut found = Vec::new();
while let Some(dentry) = dir.next() {
let dentry = dentry.unwrap();
let file_name = dentry.file_name();
let file_name_str = file_name.to_string_lossy();
found.push(file_name_str.to_string());
}
found.sort();
assert_eq!(expected, found);
}
#[test]
fn deletion_queue_smoke() -> anyhow::Result<()> {
// Basic test that the deletion queue processes the deletions we pass into it
let ctx = setup("deletion_queue_smoke").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let tenant_id = ctx.harness.tenant_id;
let content: Vec<u8> = "victim1 contents".into();
let relative_remote_path = ctx
.harness
.conf
.remote_path(&ctx.harness.timeline_path(&TIMELINE_ID))
.expect("Failed to construct remote path");
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
let deletion_prefix = ctx.harness.conf.deletion_prefix();
// Inject a victim file to remote storage
info!("Writing");
std::fs::create_dir_all(&remote_timeline_path)?;
std::fs::write(
remote_timeline_path.join(layer_file_name_1.to_string()),
content,
)?;
assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
info!("Pushing");
ctx.runtime.block_on(client.push_layers(
tenant_id,
TIMELINE_ID,
[layer_file_name_1.clone()].to_vec(),
))?;
assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
assert_local_files(&[], &deletion_prefix);
// File should still be there after we write a deletion list (we haven't pushed enough to execute anything)
info!("Flushing");
ctx.runtime.block_on(client.flush())?;
assert_remote_files(&[&layer_file_name_1.file_name()], &remote_timeline_path);
assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
// File should go away when we execute
info!("Flush-executing");
ctx.runtime.block_on(client.flush_execute())?;
assert_remote_files(&[], &remote_timeline_path);
assert_local_files(&["header-01"], &deletion_prefix);
// Flushing on an empty queue should succeed immediately, and not write any lists
info!("Flush-executing on empty");
ctx.runtime.block_on(client.flush_execute())?;
assert_local_files(&["header-01"], &deletion_prefix);
Ok(())
}
#[test]
fn deletion_queue_recovery() -> anyhow::Result<()> {
// Basic test that the deletion queue recovers and executes persisted deletion lists after a restart
let mut ctx = setup("deletion_queue_recovery").expect("Failed test setup");
let client = ctx.deletion_queue.new_client();
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
let tenant_id = ctx.harness.tenant_id;
let content: Vec<u8> = "victim1 contents".into();
let relative_remote_path = ctx
.harness
.conf
.remote_path(&ctx.harness.timeline_path(&TIMELINE_ID))
.expect("Failed to construct remote path");
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
let deletion_prefix = ctx.harness.conf.deletion_prefix();
// Inject a file, delete it, and flush to a deletion list
std::fs::create_dir_all(&remote_timeline_path)?;
std::fs::write(
remote_timeline_path.join(layer_file_name_1.to_string()),
content,
)?;
ctx.runtime.block_on(client.push_layers(
tenant_id,
TIMELINE_ID,
[layer_file_name_1.clone()].to_vec(),
))?;
ctx.runtime.block_on(client.flush())?;
assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);
// Restart the deletion queue
drop(client);
ctx.restart();
let client = ctx.deletion_queue.new_client();
// If we have recovered the deletion list properly, then executing after restart should purge it
info!("Flush-executing");
ctx.runtime.block_on(client.flush_execute())?;
assert_remote_files(&[], &remote_timeline_path);
assert_local_files(&["header-01"], &deletion_prefix);
Ok(())
}
}
/// A lightweight queue which can issue ordinary DeletionQueueClient objects, but doesn't do any persistence
/// or coalescing, and doesn't actually execute any deletions unless you call pump() to kick it.
#[cfg(test)]
pub mod mock {
use tracing::info;
use super::*;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
pub struct MockDeletionQueue {
tx: tokio::sync::mpsc::Sender<FrontendQueueMessage>,
executor_tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
tx_pump: tokio::sync::mpsc::Sender<FlushOp>,
executed: Arc<AtomicUsize>,
}
impl MockDeletionQueue {
pub fn new(
remote_storage: Option<GenericRemoteStorage>,
conf: &'static PageServerConf,
) -> Self {
let (tx, mut rx) = tokio::sync::mpsc::channel(16384);
let (tx_pump, mut rx_pump) = tokio::sync::mpsc::channel::<FlushOp>(1);
let (executor_tx, mut executor_rx) = tokio::sync::mpsc::channel(16384);
let executed = Arc::new(AtomicUsize::new(0));
let executed_bg = executed.clone();
tokio::spawn(async move {
let remote_storage = match &remote_storage {
Some(rs) => rs,
None => {
info!("No remote storage configured, deletion queue will not run");
return;
}
};
info!("Running mock deletion queue");
// Each time we are asked to pump, drain the queue of deletions
while let Some(flush_op) = rx_pump.recv().await {
info!("Executing all pending deletions");
// Transform all executor messages to generic frontend messages
while let Ok(msg) = executor_rx.try_recv() {
match msg {
ExecutorMessage::Delete(objects) => {
for path in objects {
match remote_storage.delete(&path).await {
Ok(_) => {
debug!("Deleted {path}");
}
Err(e) => {
error!(
"Failed to delete {path}, leaking object! ({e})"
);
}
}
executed_bg.fetch_add(1, Ordering::Relaxed);
}
}
ExecutorMessage::Flush(flush_op) => {
flush_op.fire();
}
}
}
while let Ok(msg) = rx.try_recv() {
match msg {
FrontendQueueMessage::Delete(op) => {
let timeline_path =
conf.timeline_path(&op.tenant_id, &op.timeline_id);
let mut objects = op.objects;
for layer in op.layers {
let local_path = timeline_path.join(layer.file_name());
let path = match conf.remote_path(&local_path) {
Ok(p) => p,
Err(e) => {
panic!("Can't make a timeline path! {e}");
}
};
objects.push(path);
}
for path in objects {
info!("Executing deletion {path}");
match remote_storage.delete(&path).await {
Ok(_) => {
debug!("Deleted {path}");
}
Err(e) => {
error!(
"Failed to delete {path}, leaking object! ({e})"
);
}
}
executed_bg.fetch_add(1, Ordering::Relaxed);
}
}
FrontendQueueMessage::Flush(op) => {
op.fire();
}
FrontendQueueMessage::FlushExecute(op) => {
// We have already executed all prior deletions because mock does them inline
op.fire();
}
}
info!("All pending deletions have been executed");
}
flush_op
.tx
.send(())
.expect("Test called flush but dropped before finishing");
}
});
Self {
tx,
tx_pump,
executor_tx,
executed,
}
}
pub fn get_executed(&self) -> usize {
self.executed.load(Ordering::Relaxed)
}
pub async fn pump(&self) {
let (tx, rx) = tokio::sync::oneshot::channel();
self.tx_pump
.send(FlushOp { tx })
.await
.expect("pump called after deletion queue loop stopped");
rx.await
.expect("Mock delete queue shutdown while waiting to pump");
}
pub(crate) fn new_client(&self) -> DeletionQueueClient {
DeletionQueueClient {
tx: self.tx.clone(),
executor_tx: self.executor_tx.clone(),
}
}
}
}
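
A condensed, crate-internal usage sketch of the client API defined above (the function and its arguments are illustrative; compare the smoke test):

async fn enqueue_and_persist(
    queue: &DeletionQueue,
    tenant_id: TenantId,
    timeline_id: TimelineId,
    layers: Vec<LayerFileName>,
) -> Result<(), DeletionQueueError> {
    let client = queue.new_client();
    // Deletions may execute at any point after this returns: only push layers that
    // are no longer referenced by remote metadata.
    client.push_layers(tenant_id, timeline_id, layers).await?;
    // Wait until the deletions are at least persisted to a DeletionList...
    client.flush().await?;
    // ...or force them all the way through to DeleteObjects.
    client.flush_execute().await?;
    Ok(())
}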

View File

@@ -0,0 +1,181 @@
use std::time::Duration;
use tracing::debug;
use tracing::info;
use tracing::warn;
use crate::config::PageServerConf;
use crate::metrics::DELETION_QUEUE_ERRORS;
use super::executor::ExecutorMessage;
use super::DeletionHeader;
use super::DeletionList;
use super::FlushOp;
// After this length of time, execute deletions which are eligible to run,
// even if we haven't accumulated enough for a full-sized DeleteObjects
const EXECUTE_IDLE_DEADLINE: Duration = Duration::from_secs(60);
// If we have received this number of keys, proceed with attempting to execute
const AUTOFLUSH_KEY_COUNT: usize = 16384;
#[derive(Debug)]
pub(super) enum BackendQueueMessage {
Delete(DeletionList),
Flush(FlushOp),
}
pub struct BackendQueueWorker {
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
// Accumulate some lists to execute in a batch.
// The purpose of this accumulation is to implement batched validation of
// attachment generations, when split-brain protection is implemented.
// (see https://github.com/neondatabase/neon/pull/4919)
pending_lists: Vec<DeletionList>,
// Sum of all the lengths of lists in pending_lists
pending_key_count: usize,
// DeletionLists we have fully executed, which may be deleted
// from remote storage.
executed_lists: Vec<DeletionList>,
}
impl BackendQueueWorker {
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
) -> Self {
Self {
conf,
rx,
tx,
pending_lists: Vec::new(),
pending_key_count: 0,
executed_lists: Vec::new(),
}
}
async fn cleanup_lists(&mut self) {
debug!(
"cleanup_lists: {0} executed lists, {1} pending lists",
self.executed_lists.len(),
self.pending_lists.len()
);
// Lists are always pushed into the queues + executed list in sequence order, so
// no sort is required: can find the highest sequence number by peeking at last element
let max_executed_seq = match self.executed_lists.last() {
Some(v) => v.sequence,
None => {
// No executed lists, nothing to clean up.
return;
}
};
// In case this is the last list, write a header out first so that
// we don't risk losing our knowledge of the sequence number (on replay, our
// next sequence number is the highest list seen + 1, or read from the header
// if there are no lists)
let header = DeletionHeader::new(max_executed_seq);
debug!("Writing header {:?}", header);
let header_bytes =
serde_json::to_vec(&header).expect("Failed to serialize deletion header");
let header_path = self.conf.deletion_header_path();
if let Err(e) = tokio::fs::write(&header_path, header_bytes).await {
warn!("Failed to upload deletion queue header: {e:#}");
DELETION_QUEUE_ERRORS
.with_label_values(&["put_header"])
.inc();
return;
}
while let Some(list) = self.executed_lists.pop() {
let list_path = self.conf.deletion_list_path(list.sequence);
if let Err(e) = tokio::fs::remove_file(&list_path).await {
// Unexpected: we should have permissions and nothing else should
// be touching these files
tracing::error!("Failed to delete {0}: {e:#}", list_path.display());
self.executed_lists.push(list);
break;
}
}
}
pub async fn flush(&mut self) {
self.pending_key_count = 0;
// Submit all keys from pending DeletionLists into the executor
for list in &mut self.pending_lists {
let objects = list.take_paths();
if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await {
warn!("Shutting down");
return;
};
}
// Flush the executor to ensure all the operations we just submitted have been executed
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let flush_op = FlushOp { tx };
if let Err(_e) = self.tx.send(ExecutorMessage::Flush(flush_op)).await {
warn!("Shutting down");
return;
};
if rx.await.is_err() {
warn!("Shutting down");
return;
}
// After flush, we are assured that all contents of the pending lists
// are executed
self.executed_lists.append(&mut self.pending_lists);
// Erase the lists we executed
self.cleanup_lists().await;
}
pub async fn background(&mut self) {
// TODO: if we would like to be able to defer deletions while a Layer still has
// refs (but it will be eligible for deletion after process ends), then we may
// add an ephemeral part to BackendQueueMessage::Delete that tracks which keys
// in the deletion list may not be deleted yet, with guards to block on while
// we wait to proceed.
loop {
let msg = match tokio::time::timeout(EXECUTE_IDLE_DEADLINE, self.rx.recv()).await {
Ok(Some(m)) => m,
Ok(None) => {
// All queue senders closed
info!("Shutting down");
break;
}
Err(_) => {
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
// return immediately if no work is pending
self.flush().await;
continue;
}
};
match msg {
BackendQueueMessage::Delete(list) => {
self.pending_key_count += list.objects.len();
self.pending_lists.push(list);
if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
self.flush().await;
}
}
BackendQueueMessage::Flush(op) => {
self.flush().await;
op.fire();
}
}
}
}
}
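
For reference, an illustrative header file as written by cleanup_lists() above (the field values are made up):

// {"version":1,"last_deleted_list_seq":5}
//
// On restart, the frontend's recover() resumes at
// max(last_deleted_list_seq + 1, highest list sequence on disk + 1),
// so sequence numbers are never reused.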

View File

@@ -0,0 +1,143 @@
use remote_storage::GenericRemoteStorage;
use remote_storage::RemotePath;
use remote_storage::MAX_KEYS_PER_DELETE;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
use crate::metrics::DELETION_QUEUE_ERRORS;
use crate::metrics::DELETION_QUEUE_EXECUTED;
use super::DeletionQueueError;
use super::FlushOp;
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
pub(super) enum ExecutorMessage {
Delete(Vec<RemotePath>),
Flush(FlushOp),
}
/// Non-persistent deletion queue, for coalescing multiple object deletes into
/// larger DeleteObjects requests.
pub struct ExecutorWorker {
// Accumulate up to 1000 keys for the next deletion operation
accumulator: Vec<RemotePath>,
rx: tokio::sync::mpsc::Receiver<ExecutorMessage>,
cancel: CancellationToken,
remote_storage: GenericRemoteStorage,
}
impl ExecutorWorker {
pub(super) fn new(
remote_storage: GenericRemoteStorage,
rx: tokio::sync::mpsc::Receiver<ExecutorMessage>,
cancel: CancellationToken,
) -> Self {
Self {
remote_storage,
rx,
cancel,
accumulator: Vec::new(),
}
}
/// Wrap the remote `delete_objects` with a failpoint
pub async fn remote_delete(&self) -> Result<(), anyhow::Error> {
fail::fail_point!("deletion-queue-before-execute", |_| {
info!("Skipping execution, failpoint set");
DELETION_QUEUE_ERRORS
.with_label_values(&["failpoint"])
.inc();
Err(anyhow::anyhow!("failpoint hit"))
});
self.remote_storage.delete_objects(&self.accumulator).await
}
/// Block until everything in accumulator has been executed
pub async fn flush(&mut self) -> Result<(), DeletionQueueError> {
while !self.accumulator.is_empty() && !self.cancel.is_cancelled() {
match self.remote_delete().await {
Ok(()) => {
// Note: we assume that the remote storage layer returns Ok(()) if some
// or all of the deleted objects were already gone.
DELETION_QUEUE_EXECUTED.inc_by(self.accumulator.len() as u64);
info!(
"Executed deletion batch {}..{}",
self.accumulator
.first()
.expect("accumulator should be non-empty"),
self.accumulator
.last()
.expect("accumulator should be non-empty"),
);
self.accumulator.clear();
}
Err(e) => {
warn!("DeleteObjects request failed: {e:#}, will retry");
DELETION_QUEUE_ERRORS.with_label_values(&["execute"]).inc();
}
};
}
if self.cancel.is_cancelled() {
// Expose an error because we may not have actually flushed everything
Err(DeletionQueueError::ShuttingDown)
} else {
Ok(())
}
}
pub async fn background(&mut self) -> Result<(), DeletionQueueError> {
self.accumulator.reserve(MAX_KEYS_PER_DELETE);
loop {
if self.cancel.is_cancelled() {
return Err(DeletionQueueError::ShuttingDown);
}
let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
Ok(Some(m)) => m,
Ok(None) => {
// All queue senders closed
info!("Shutting down");
return Err(DeletionQueueError::ShuttingDown);
}
Err(_) => {
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
// return immediately if no work is pending
self.flush().await?;
continue;
}
};
match msg {
ExecutorMessage::Delete(mut list) => {
while !list.is_empty() || self.accumulator.len() == MAX_KEYS_PER_DELETE {
if self.accumulator.len() == MAX_KEYS_PER_DELETE {
self.flush().await?;
// After a successful flush the accumulator is empty again
assert_eq!(self.accumulator.len(), 0);
}
let available_slots = MAX_KEYS_PER_DELETE - self.accumulator.len();
let take_count = std::cmp::min(available_slots, list.len());
for path in list.drain(list.len() - take_count..) {
self.accumulator.push(path);
}
}
}
ExecutorMessage::Flush(flush_op) => {
// If flush() errors, we drop the flush_op and the caller will get
// an error recv()'ing their oneshot channel.
self.flush().await?;
flush_op.fire();
}
}
}
}
}
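
A self-contained sketch of the accumulation policy in the Delete arm above, using plain strings instead of RemotePath (names are illustrative): fill the batch up to MAX_KEYS_PER_DELETE, flush it, and repeat until the incoming list is drained.

const MAX_KEYS_PER_DELETE: usize = 1000;

fn batch_keys(mut incoming: Vec<String>, accumulator: &mut Vec<String>) -> Vec<Vec<String>> {
    let mut full_batches = Vec::new();
    while !incoming.is_empty() {
        let available = MAX_KEYS_PER_DELETE - accumulator.len();
        let take = available.min(incoming.len());
        // Mirror the worker: move `take` keys from the incoming list into the accumulator.
        accumulator.extend(incoming.drain(incoming.len() - take..));
        if accumulator.len() == MAX_KEYS_PER_DELETE {
            // Equivalent to the worker calling flush() when the batch is full.
            full_batches.push(std::mem::take(accumulator));
        }
    }
    full_batches
}

// With 2500 incoming keys and an empty accumulator this yields two full batches of 1000
// and leaves 500 keys buffered for the next flush deadline.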

View File

@@ -0,0 +1,357 @@
use super::BackendQueueMessage;
use super::DeletionHeader;
use super::DeletionList;
use super::FlushOp;
use std::fs::create_dir_all;
use std::time::Duration;
use regex::Regex;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use utils::id::TenantId;
use utils::id::TimelineId;
use crate::config::PageServerConf;
use crate::metrics::DELETION_QUEUE_ERRORS;
use crate::metrics::DELETION_QUEUE_SUBMITTED;
use crate::tenant::storage_layer::LayerFileName;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects on the order
// of 1MB in size when we are under heavy delete load.
const DELETION_LIST_TARGET_SIZE: usize = 16384;
// Ordinarily, we only flush to DeletionList periodically, to bound the window during
// which we might leak objects from not flushing a DeletionList after
// the objects are already unlinked from timeline metadata.
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
#[derive(Debug)]
pub(super) struct DeletionOp {
pub(super) tenant_id: TenantId,
pub(super) timeline_id: TimelineId,
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
// have a config object handy to project it to a remote key, and need the consuming worker
// to do it for you.
pub(super) layers: Vec<LayerFileName>,
pub(super) objects: Vec<RemotePath>,
}
#[derive(Debug)]
pub(super) enum FrontendQueueMessage {
Delete(DeletionOp),
// Wait until all prior deletions make it into a persistent DeletionList
Flush(FlushOp),
// Wait until all prior deletions have been executed (i.e. objects are actually deleted)
FlushExecute(FlushOp),
}
pub struct FrontendQueueWorker {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
// The list we are currently building; it contains a buffer of keys to delete
// and our next sequence number
pending: DeletionList,
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
// Worker loop is torn down when this fires.
cancel: CancellationToken,
}
impl FrontendQueueWorker {
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
cancel: CancellationToken,
) -> Self {
Self {
pending: DeletionList::new(1),
conf,
rx,
tx,
pending_flushes: Vec::new(),
cancel,
}
}
async fn upload_pending_list(&mut self) -> anyhow::Result<()> {
let path = self.conf.deletion_list_path(self.pending.sequence);
let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
tokio::fs::write(&path, &bytes).await?;
tokio::fs::File::open(&path).await?.sync_all().await?;
Ok(())
}
/// Try to flush `list` to persistent storage
///
/// This does not return errors, because on failure to flush we do not lose
/// any state: flushing will be retried implicitly on the next deadline
async fn flush(&mut self) {
if self.pending.is_empty() {
for f in self.pending_flushes.drain(..) {
f.fire();
}
return;
}
match self.upload_pending_list().await {
Ok(_) => {
info!(sequence = self.pending.sequence, "Stored deletion list");
for f in self.pending_flushes.drain(..) {
f.fire();
}
let mut onward_list = DeletionList::new(self.pending.sequence);
std::mem::swap(&mut onward_list.objects, &mut self.pending.objects);
// We have consumed out of pending: reset it for the next incoming deletions to accumulate there
self.pending = DeletionList::new(self.pending.sequence + 1);
if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await {
// This is allowed to fail: it will only happen if the backend worker is shut down,
// so we can just drop this on the floor.
info!("Deletion list dropped, this is normal during shutdown ({e:#})");
}
}
Err(e) => {
DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc();
warn!(
sequence = self.pending.sequence,
"Failed to write deletion list to remote storage, will retry later ({e:#})"
);
}
}
}
async fn recover(&mut self) -> Result<(), anyhow::Error> {
// Load header: this is not required to be present, e.g. when a pageserver first runs
let header_path = self.conf.deletion_header_path();
// Synchronous, but we only do it once per process lifetime so it's tolerable
create_dir_all(&self.conf.deletion_prefix())?;
let header_bytes = match tokio::fs::read(&header_path).await {
Ok(h) => Ok(Some(h)),
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
debug!(
"Deletion header {0} not found, first start?",
header_path.display()
);
Ok(None)
} else {
Err(e)
}
}
}?;
if let Some(header_bytes) = header_bytes {
if let Some(header) = match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
Ok(h) => Some(h),
Err(e) => {
warn!(
"Failed to deserialize deletion header, ignoring {0}: {e:#}",
header_path.display()
);
// This should never happen unless we make a mistake with our serialization.
// Ignoring a deletion header is not consequential for correctness because all deletions
// are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
None
}
} {
self.pending.sequence =
std::cmp::max(self.pending.sequence, header.last_deleted_list_seq + 1);
};
};
let mut dir = match tokio::fs::read_dir(&self.conf.deletion_prefix()).await {
Ok(d) => d,
Err(e) => {
warn!(
"Failed to open deletion list directory {0}: {e:#}",
header_path.display()
);
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
}
};
let list_name_pattern = Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{2}).list").unwrap();
let mut seqs: Vec<u64> = Vec::new();
while let Some(dentry) = dir.next_entry().await? {
let file_name = dentry.file_name().to_owned();
let basename = file_name.to_string_lossy();
let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
m.get(1)
.expect("Non optional group should be present")
.as_str()
} else {
warn!("Unexpected key in deletion queue: {basename}");
continue;
};
let seq: u64 = match u64::from_str_radix(seq_part, 16) {
Ok(s) => s,
Err(e) => {
warn!("Malformed key '{basename}': {e}");
continue;
}
};
seqs.push(seq);
}
seqs.sort();
// Initialize the next sequence number in the frontend based on the maximum of the highest list we see,
// and the last list that was deleted according to the header. Combined with writing out the header
// prior to deletions, this guarantees no re-use of sequence numbers.
if let Some(max_list_seq) = seqs.last() {
self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
}
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = tokio::fs::read(&list_path).await?;
let deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,
Err(e) => {
// Drop the list on the floor: any objects it referenced will be left behind
// for scrubbing to clean up. This should never happen unless we have a serialization bug.
warn!(sequence = s, "Failed to deserialize deletion list: {e}");
continue;
}
};
// We will drop out of recovery if this fails: it indicates that we are shutting down
// or the backend has panicked
DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64);
self.tx
.send(BackendQueueMessage::Delete(deletion_list))
.await?;
}
info!(next_sequence = self.pending.sequence, "Replay complete");
Ok(())
}
/// This is the front-end ingest, where we bundle up deletion requests into DeletionLists
/// and write them out for later execution
pub async fn background(&mut self) {
info!("Started deletion frontend worker");
let mut recovered: bool = false;
while !self.cancel.is_cancelled() {
let timeout = if self.pending_flushes.is_empty() {
FRONTEND_DEFAULT_TIMEOUT
} else {
FRONTEND_FLUSHING_TIMEOUT
};
let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => {
// Queue sender destroyed, shutting down
break;
}
Err(_) => {
// Hit deadline, flush.
self.flush().await;
continue;
}
};
// On the first message, do recovery. This avoids unnecessary recovery very
// early in startup, and simplifies testing by avoiding a 404 from reading the
// header on every fresh pageserver startup.
if !recovered {
// Before accepting any input from this pageserver lifetime, recover all deletion lists that are in S3
if let Err(e) = self.recover().await {
// This should only happen in truly unrecoverable cases, like the recovery finding that the backend
// queue receiver has been dropped.
info!("Deletion queue recover aborted, deletion queue will not proceed ({e})");
return;
} else {
recovered = true;
}
}
match msg {
FrontendQueueMessage::Delete(op) => {
debug!(
"Delete: ingesting {0} layers, {1} other objects",
op.layers.len(),
op.objects.len()
);
let timeline_path = self.conf.timeline_path(&op.tenant_id, &op.timeline_id);
let mut layer_paths = Vec::new();
for layer in op.layers {
// TODO go directly to remote path without composing local path
let local_path = timeline_path.join(layer.file_name());
let path = match self.conf.remote_path(&local_path) {
Ok(p) => p,
Err(e) => {
panic!("Can't make a timeline path! {e}");
}
};
layer_paths.push(path);
}
self.pending
.push(&op.tenant_id, &op.timeline_id, layer_paths);
self.pending
.push(&op.tenant_id, &op.timeline_id, op.objects);
}
FrontendQueueMessage::Flush(op) => {
if self.pending.objects.is_empty() {
// Execute immediately
debug!("Flush: No pending objects, flushing immediately");
op.fire()
} else {
// Execute next time we flush
debug!("Flush: adding to pending flush list for next deadline flush");
self.pending_flushes.push(op);
}
}
FrontendQueueMessage::FlushExecute(op) => {
debug!("FlushExecute: passing through to backend");
// We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await {
info!("Can't flush, shutting down ({e})");
// Caller will get an error when their oneshot sender is dropped.
}
}
}
if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE
|| !self.pending_flushes.is_empty()
{
self.flush().await;
}
}
info!("Deletion queue shut down.");
}
}

View File

@@ -52,6 +52,29 @@ paths:
schema:
type: object
/v1/deletion_queue/flush:
parameters:
- name: execute
in: query
required: false
schema:
type: boolean
description:
If true, attempt to execute deletions. If false, just flush deletions to persistent deletion lists.
put:
description: Execute any deletions currently enqueued
security: []
responses:
"200":
description: |
Flush completed: if execute was true, then enqueued deletions have been completed. If execute was false,
then enqueued deletions have been persisted to deletion lists, and may have been completed.
content:
application/json:
schema:
type: object
/v1/tenant/{tenant_id}:
parameters:
- name: tenant_id

View File

@@ -23,6 +23,7 @@ use super::models::{
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::{DeletionQueue, DeletionQueueError};
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
@@ -56,6 +57,7 @@ struct State {
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: DeletionQueue,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
}
@@ -65,6 +67,7 @@ impl State {
conf: &'static PageServerConf,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: DeletionQueue,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<Self> {
@@ -78,6 +81,7 @@ impl State {
allowlist_routes,
remote_storage,
broker_client,
deletion_queue,
disk_usage_eviction_state,
})
}
@@ -490,6 +494,7 @@ async fn tenant_attach_handler(
tenant_conf,
state.broker_client.clone(),
remote_storage.clone(),
&state.deletion_queue,
&ctx,
)
.instrument(info_span!("tenant_attach", %tenant_id))
@@ -552,6 +557,7 @@ async fn tenant_load_handler(
tenant_id,
state.broker_client.clone(),
state.remote_storage.clone(),
&state.deletion_queue,
&ctx,
)
.instrument(info_span!("load", %tenant_id))
@@ -877,6 +883,7 @@ async fn tenant_create_handler(
target_tenant_id,
state.broker_client.clone(),
state.remote_storage.clone(),
&state.deletion_queue,
&ctx,
)
.instrument(info_span!("tenant_create", tenant_id = %target_tenant_id))
@@ -1117,6 +1124,48 @@ async fn always_panic_handler(
json_response(StatusCode::NO_CONTENT, ())
}
async fn deletion_queue_flush(
r: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&r);
if state.remote_storage.is_none() {
// Nothing to do if remote storage is disabled.
return json_response(StatusCode::OK, ());
}
let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
let queue_client = state.deletion_queue.new_client();
tokio::select! {
flush_result = async {
if execute {
queue_client.flush_execute().await
} else {
queue_client.flush().await
}
} => {
match flush_result {
Ok(())=> {
json_response(StatusCode::OK, ())
},
Err(e) => {
match e {
DeletionQueueError::ShuttingDown => {
Err(ApiError::ShuttingDown)
}
}
}
}
},
_ = cancel.cancelled() => {
Err(ApiError::ShuttingDown)
}
}
}
async fn disk_usage_eviction_run(
mut r: Request<Body>,
_cancel: CancellationToken,
@@ -1326,6 +1375,7 @@ pub fn make_router(
auth: Option<Arc<JwtAuth>>,
broker_client: BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: DeletionQueue,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
) -> anyhow::Result<RouterBuilder<hyper::Body, ApiError>> {
let spec = include_bytes!("openapi_spec.yml");
@@ -1355,6 +1405,7 @@ pub fn make_router(
conf,
auth,
remote_storage,
deletion_queue,
broker_client,
disk_usage_eviction_state,
)
@@ -1439,6 +1490,9 @@ pub fn make_router(
.put("/v1/disk_usage_eviction/run", |r| {
api_handler(r, disk_usage_eviction_run)
})
.put("/v1/deletion_queue/flush", |r| {
api_handler(r, deletion_queue_flush)
})
.put("/v1/tenant/:tenant_id/break", |r| {
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
})

View File

@@ -3,6 +3,7 @@ pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;

View File

@@ -795,6 +795,31 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy<HistogramVec> = Lazy::new
.expect("failed to define a metric")
});
pub(crate) static DELETION_QUEUE_SUBMITTED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_deletion_queue_submitted_total",
"Number of objects submitted for deletion"
)
.expect("failed to define a metric")
});
pub(crate) static DELETION_QUEUE_EXECUTED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_deletion_queue_executed_total",
"Number of objects deleted"
)
.expect("failed to define a metric")
});
pub(crate) static DELETION_QUEUE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_deletion_queue_errors_total",
"Incremented on retryable remote I/O errors writing deletion lists or executing deletions.",
&["op_kind"],
)
.expect("failed to define a metric")
});
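The test helpers added further down derive a deletion queue depth from these counters (submitted minus executed). A minimal sketch of that arithmetic, with the scraped counter values as assumed inputs:

```rust
// Sketch only: `submitted` and `executed` stand in for scraped values of
// pageserver_deletion_queue_submitted_total and pageserver_deletion_queue_executed_total.
fn deletion_queue_depth(submitted: Option<u64>, executed: Option<u64>) -> u64 {
    // A missing counter means it has not been incremented yet.
    let submitted = submitted.unwrap_or(0);
    let executed = executed.unwrap_or(0);
    // Executed should never exceed submitted; saturate defensively anyway.
    submitted.saturating_sub(executed)
}

fn main() {
    assert_eq!(deletion_queue_depth(Some(10), Some(4)), 6);
    assert_eq!(deletion_queue_depth(None, None), 0);
}
```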
static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_timeline_client_bytes_started",

View File

@@ -59,6 +59,7 @@ use self::timeline::EvictionTaskTenantState;
use self::timeline::TimelineResources;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::metrics::TENANT_ACTIVATION;
@@ -156,6 +157,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: Option<GenericRemoteStorage>,
pub deletion_queue_client: DeletionQueueClient,
}
///
@@ -191,6 +193,9 @@ pub struct Tenant {
// provides access to timeline data sitting in the remote storage
remote_storage: Option<GenericRemoteStorage>,
// Access to global deletion queue for when this tenant wants to schedule a deletion
deletion_queue_client: Option<DeletionQueueClient>,
/// Cached logical sizes updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -525,6 +530,7 @@ impl Tenant {
broker_client: storage_broker::BrokerClientChannel,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
// TODO dedup with spawn_load
@@ -539,6 +545,7 @@ impl Tenant {
wal_redo_manager,
tenant_id,
Some(remote_storage.clone()),
Some(deletion_queue_client),
));
// Do all the hard work in the background
@@ -727,6 +734,7 @@ impl Tenant {
remote_metadata,
TimelineResources {
remote_client: Some(remote_client),
deletion_queue_client: self.deletion_queue_client.clone(),
},
ctx,
)
@@ -751,6 +759,7 @@ impl Tenant {
timeline_id,
&index_part.metadata,
Some(remote_timeline_client),
self.deletion_queue_client.clone(),
None,
)
.await
@@ -852,6 +861,7 @@ impl Tenant {
wal_redo_manager,
tenant_id,
None,
None,
))
}
@@ -885,6 +895,7 @@ impl Tenant {
let broker_client = resources.broker_client;
let remote_storage = resources.remote_storage;
let deletion_queue_client = resources.deletion_queue_client;
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
let tenant = Tenant::new(
@@ -894,6 +905,7 @@ impl Tenant {
wal_redo_manager,
tenant_id,
remote_storage.clone(),
Some(deletion_queue_client),
);
let tenant = Arc::new(tenant);
@@ -1301,6 +1313,7 @@ impl Tenant {
timeline_id,
&local_metadata,
Some(remote_client),
self.deletion_queue_client.clone(),
init_order,
)
.await
@@ -1350,6 +1363,7 @@ impl Tenant {
timeline_id,
&local_metadata,
None,
None,
init_order,
)
.await
@@ -2292,7 +2306,16 @@ impl Tenant {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
tenant_id: TenantId,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: Option<DeletionQueueClient>,
) -> Tenant {
#[cfg(not(test))]
match state {
TenantState::Broken { .. } => {}
_ => {
// Non-broken tenants must be constructed with a deletion queue
assert!(deletion_queue_client.is_some());
}
}
let (state, mut rx) = watch::channel(state);
tokio::spawn(async move {
@@ -2358,6 +2381,7 @@ impl Tenant {
gc_cs: tokio::sync::Mutex::new(()),
walredo_mgr,
remote_storage,
deletion_queue_client,
state,
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
@@ -2937,7 +2961,10 @@ impl Tenant {
None
};
TimelineResources { remote_client }
TimelineResources {
remote_client,
deletion_queue_client: self.deletion_queue_client.clone(),
}
}
/// Creates intermediate timeline structure and its files.
@@ -3501,7 +3528,7 @@ pub mod harness {
pub async fn load(&self) -> (Arc<Tenant>, RequestContext) {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
(
self.try_load(&ctx, None)
self.try_load(&ctx, None, None)
.await
.expect("failed to load test tenant"),
ctx,
@@ -3512,6 +3539,7 @@ pub mod harness {
&self,
ctx: &RequestContext,
remote_storage: Option<remote_storage::GenericRemoteStorage>,
deletion_queue_client: Option<DeletionQueueClient>,
) -> anyhow::Result<Arc<Tenant>> {
let walredo_mgr = Arc::new(TestRedoManager);
@@ -3522,6 +3550,7 @@ pub mod harness {
walredo_mgr,
self.tenant_id,
remote_storage,
deletion_queue_client,
));
tenant
.load(None, ctx)
@@ -4086,7 +4115,7 @@ mod tests {
std::fs::write(metadata_path, metadata_bytes)?;
let err = harness
.try_load(&ctx, None)
.try_load(&ctx, None, None)
.await
.err()
.expect("should fail");

View File

@@ -18,6 +18,7 @@ use utils::crashsafe;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueue;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
use crate::tenant::delete::DeleteTenantFlow;
@@ -205,6 +206,7 @@ pub(crate) fn schedule_local_tenant_processing(
resources.broker_client,
tenants,
remote_storage,
resources.deletion_queue_client,
ctx,
) {
Ok(tenant) => tenant,
@@ -349,6 +351,7 @@ pub async fn create_tenant(
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: &DeletionQueue,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
tenant_map_insert(tenant_id, || {
@@ -362,6 +365,7 @@ pub async fn create_tenant(
let tenant_resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client: deletion_queue.new_client(),
};
let created_tenant =
schedule_local_tenant_processing(conf, &tenant_directory, tenant_resources, None, &TENANTS, ctx)?;
@@ -513,6 +517,7 @@ pub async fn load_tenant(
tenant_id: TenantId,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue: &DeletionQueue,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || {
@@ -526,6 +531,7 @@ pub async fn load_tenant(
let resources = TenantSharedResources {
broker_client,
remote_storage,
deletion_queue_client: deletion_queue.new_client(),
};
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, resources, None, &TENANTS, ctx)
.with_context(|| {
@@ -594,6 +600,7 @@ pub async fn attach_tenant(
tenant_conf: TenantConfOpt,
broker_client: storage_broker::BrokerClientChannel,
remote_storage: GenericRemoteStorage,
deletion_queue: &DeletionQueue,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
tenant_map_insert(tenant_id, || {
@@ -611,6 +618,7 @@ pub async fn attach_tenant(
let resources = TenantSharedResources {
broker_client,
remote_storage: Some(remote_storage),
deletion_queue_client: deletion_queue.new_client(),
};
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, resources, None, &TENANTS, ctx)?;
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.

View File

@@ -56,9 +56,11 @@
//! # Consistency
//!
//! To have a consistent remote structure, it's important that uploads and
//! deletions are performed in the right order. For example, the index file
//! contains a list of layer files, so it must not be uploaded until all the
//! layer files that are in its list have been successfully uploaded.
//! deletions are performed in the right order. For example:
//! - the index file contains a list of layer files, so it must not be uploaded
//! until all the layer files that are in its list have been successfully uploaded.
//! - objects must be removed from the index before being deleted, and that updated
//! index must be written to remote storage before deleting the objects from remote storage.
//!
//! The contract between client and its user is that the user is responsible of
//! scheduling operations in an order that keeps the remote consistent as
@@ -70,10 +72,12 @@
//! correct order, and the client will parallelize the operations in a way that
//! is safe.
//!
//! The caller should be careful with deletion, though. They should not delete
//! local files that have been scheduled for upload but not yet finished uploading.
//! Otherwise the upload will fail. To wait for an upload to finish, use
//! the 'wait_completion' function (more on that later.)
//! The caller should be careful with deletion, though:
//! - they should not delete local files that have been scheduled for upload but
//! not yet finished uploading. Otherwise the upload will fail. To wait for an
//! upload to finish, use the 'wait_completion' function (more on that later.)
//! - they should not do remote deletions via DeletionQueue without waiting for
//! the latest metadata to upload via RemoteTimelineClient.
//!
//! All of this relies on the following invariants:
//!
@@ -200,12 +204,11 @@
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
mod delete;
mod download;
pub mod index;
mod upload;
use anyhow::Context;
use anyhow::{bail, Context};
use chrono::{NaiveDateTime, Utc};
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
@@ -226,6 +229,7 @@ use tracing::{debug, error, info, instrument, warn};
use tracing::{info_span, Instrument};
use utils::lsn::Lsn;
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{
MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics,
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
@@ -234,7 +238,6 @@ use crate::metrics::{
use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::upload_queue::Delete;
use crate::{
config::PageServerConf,
task_mgr,
@@ -631,25 +634,22 @@ impl RemoteTimelineClient {
/// deletion won't actually be performed, until any previously scheduled
/// upload operations, and the index file upload, have completed
/// successfully.
pub fn schedule_layer_file_deletion(
pub async fn schedule_layer_file_deletion(
self: &Arc<Self>,
names: &[LayerFileName],
deletion_queue_client: &DeletionQueueClient,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
// Synchronous update of upload queues under mutex
{
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
// Deleting layers doesn't affect the values stored in TimelineMetadata,
// so we don't need update it. Just serialize it.
let metadata = upload_queue.latest_metadata.clone();
// Deleting layers doesn't affect the values stored in TimelineMetadata,
// so we don't need update it. Just serialize it.
let metadata = upload_queue.latest_metadata.clone();
// Update the remote index file, removing the to-be-deleted files from the index,
// before deleting the actual files.
//
// Once we start removing files from upload_queue.latest_files, there's
// no going back! Otherwise, some of the files would already be removed
// from latest_files, but not yet scheduled for deletion. Use a closure
// to syntactically forbid ? or bail! calls here.
let no_bail_here = || {
// Update the remote index file, removing the to-be-deleted files from the index,
// before deleting the actual files.
for name in names {
if upload_queue.latest_files.remove(name).is_some() {
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
@@ -659,23 +659,21 @@ impl RemoteTimelineClient {
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
self.schedule_index_upload(upload_queue, metadata);
}
}
// schedule the actual deletions
for name in names {
let op = UploadOp::Delete(Delete {
file_kind: RemoteOpFileKind::Layer,
layer_file_name: name.clone(),
scheduled_from_timeline_delete: false,
});
self.calls_unfinished_metric_begin(&op);
upload_queue.queued_operations.push_back(op);
info!("scheduled layer file deletion {name}");
}
// Barrier: we must ensure all prior uploads and index writes have landed in S3
// before emitting deletions.
if let Err(e) = self.wait_completion().await {
// This can only fail if upload queue is shut down: if this happens, we do
// not emit any deletions. In this condition (remote client is shut down
// during compaction or GC) we may leak some objects.
bail!("Cannot complete layer file deletions during shutdown ({e})");
}
// Launch the tasks immediately, if possible
self.launch_queued_tasks(upload_queue);
};
no_bail_here();
// Enqueue deletions
deletion_queue_client
.push_layers(self.tenant_id, self.timeline_id, names.to_vec())
.await?;
Ok(())
}
@@ -801,12 +799,13 @@ impl RemoteTimelineClient {
/// Prerequisites: UploadQueue should be in stopped state and deleted_at should be successfully set.
/// The function deletes layer files one by one, then lists the prefix to see if we leaked something,
/// deletes leaked files if any, and proceeds with deletion of the index file at the end.
pub(crate) async fn delete_all(self: &Arc<Self>) -> anyhow::Result<()> {
pub(crate) async fn delete_all(
self: &Arc<Self>,
deletion_queue: &DeletionQueueClient,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_and_timeline_id();
let (mut receiver, deletions_queued) = {
let mut deletions_queued = 0;
let layers: Vec<LayerFileName> = {
let mut locked = self.upload_queue.lock().unwrap();
let stopped = locked.stopped_mut()?;
@@ -818,41 +817,35 @@ impl RemoteTimelineClient {
stopped
.upload_queue_for_deletion
.queued_operations
.reserve(stopped.upload_queue_for_deletion.latest_files.len());
// schedule the actual deletions
for name in stopped.upload_queue_for_deletion.latest_files.keys() {
let op = UploadOp::Delete(Delete {
file_kind: RemoteOpFileKind::Layer,
layer_file_name: name.clone(),
scheduled_from_timeline_delete: true,
});
self.calls_unfinished_metric_begin(&op);
stopped
.upload_queue_for_deletion
.queued_operations
.push_back(op);
info!("scheduled layer file deletion {name}");
deletions_queued += 1;
}
self.launch_queued_tasks(&mut stopped.upload_queue_for_deletion);
(
self.schedule_barrier(&mut stopped.upload_queue_for_deletion),
deletions_queued,
)
.latest_files
.drain()
.map(|kv| kv.0)
.collect()
};
receiver.changed().await.context("upload queue shut down")?;
let layer_deletion_count = layers.len();
let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
let layer_paths = layers
.into_iter()
.map(|l| {
let local_path = timeline_path.join(l.file_name());
self.conf
.remote_path(&local_path)
.expect("Timeline path should always convert to remote")
})
.collect();
deletion_queue.push_immediate(layer_paths).await?;
// Do not delete index part yet, it is needed for possible retry. If we remove it first
// and retry will arrive to different pageserver there wont be any traces of it on remote storage
let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
let timeline_storage_path = self.conf.remote_path(&timeline_path)?;
// Execute all pending deletions, so that when we proceed to do a list_prefixes below, we aren't
// taking the burden of listing all the layers that we already know we should delete.
deletion_queue.flush_immediate().await?;
let remaining = backoff::retry(
|| async {
self.storage_impl
@@ -880,17 +873,9 @@ impl RemoteTimelineClient {
})
.collect();
let not_referenced_count = remaining.len();
if !remaining.is_empty() {
backoff::retry(
|| async { self.storage_impl.delete_objects(&remaining).await },
|_e| false,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"delete_objects",
backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
)
.await
.context("delete_objects")?;
deletion_queue.push_immediate(remaining).await?;
}
fail::fail_point!("timeline-delete-before-index-delete", |_| {
@@ -901,18 +886,14 @@ impl RemoteTimelineClient {
let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
debug!("deleting index part");
debug!("enqueuing index part deletion");
deletion_queue
.push_immediate([index_file_path].to_vec())
.await?;
backoff::retry(
|| async { self.storage_impl.delete(&index_file_path).await },
|_e| false,
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"delete_index",
backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled")),
)
.await
.context("delete_index")?;
// Timeline deletion is rare and we have probably emitted a reasonable number of objects: wait
// for a flush to a persistent deletion list so that we may be sure deletion will occur.
deletion_queue.flush_immediate().await?;
fail::fail_point!("timeline-delete-after-index-delete", |_| {
Err(anyhow::anyhow!(
@@ -920,7 +901,7 @@ impl RemoteTimelineClient {
))?
});
info!(prefix=%timeline_storage_path, referenced=deletions_queued, not_referenced=%remaining.len(), "done deleting in timeline prefix, including index_part.json");
info!(prefix=%timeline_storage_path, referenced=layer_deletion_count, not_referenced=%not_referenced_count, "done deleting in timeline prefix, including index_part.json");
Ok(())
}
@@ -943,10 +924,6 @@ impl RemoteTimelineClient {
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(_) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}
UploadOp::Barrier(_) => upload_queue.inprogress_tasks.is_empty(),
};
@@ -974,9 +951,6 @@ impl RemoteTimelineClient {
UploadOp::UploadMetadata(_, _) => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
sender.send_replace(());
continue;
@@ -1108,21 +1082,6 @@ impl RemoteTimelineClient {
}
res
}
UploadOp::Delete(delete) => {
let path = &self
.conf
.timeline_path(&self.tenant_id, &self.timeline_id)
.join(delete.layer_file_name.file_name());
delete::delete_layer(self.conf, &self.storage_impl, path)
.measure_remote_op(
self.tenant_id,
self.timeline_id,
delete.file_kind,
RemoteOpKind::Delete,
Arc::clone(&self.metrics),
)
.await
}
UploadOp::Barrier(_) => {
// unreachable. Barrier operations are handled synchronously in
// launch_queued_tasks
@@ -1182,15 +1141,7 @@ impl RemoteTimelineClient {
let mut upload_queue_guard = self.upload_queue.lock().unwrap();
let upload_queue = match upload_queue_guard.deref_mut() {
UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
UploadQueue::Stopped(stopped) => {
// Special care is needed for deletions, if it was an earlier deletion (not scheduled from deletion)
// then stop() took care of it so we just return.
// For deletions that come from delete_all we still want to maintain metrics, launch following tasks, etc.
match &task.op {
UploadOp::Delete(delete) if delete.scheduled_from_timeline_delete => Some(&mut stopped.upload_queue_for_deletion),
_ => None
}
},
UploadQueue::Stopped(_) => { None }
UploadQueue::Initialized(qi) => { Some(qi) }
};
@@ -1212,9 +1163,6 @@ impl RemoteTimelineClient {
upload_queue.num_inprogress_metadata_uploads -= 1;
upload_queue.last_uploaded_consistent_lsn = lsn; // XXX monotonicity check?
}
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions -= 1;
}
UploadOp::Barrier(_) => unreachable!(),
};
@@ -1246,13 +1194,6 @@ impl RemoteTimelineClient {
reason: "metadata uploads are tiny",
},
),
UploadOp::Delete(delete) => (
delete.file_kind,
RemoteOpKind::Delete,
DontTrackSize {
reason: "should we track deletes? positive or negative sign?",
},
),
UploadOp::Barrier(_) => {
// we do not account these
return None;
@@ -1312,7 +1253,6 @@ impl RemoteTimelineClient {
last_uploaded_consistent_lsn: initialized.last_uploaded_consistent_lsn,
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
};
@@ -1333,9 +1273,7 @@ impl RemoteTimelineClient {
// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
qi.num_inprogress_layer_uploads + qi.num_inprogress_metadata_uploads,
qi.inprogress_tasks.len()
);
@@ -1365,6 +1303,7 @@ mod tests {
use super::*;
use crate::{
context::RequestContext,
deletion_queue::mock::MockDeletionQueue,
tenant::{
harness::{TenantHarness, TIMELINE_ID},
Tenant, Timeline,
@@ -1431,6 +1370,7 @@ mod tests {
tenant_ctx: RequestContext,
remote_fs_dir: PathBuf,
client: Arc<RemoteTimelineClient>,
deletion_queue: MockDeletionQueue,
}
impl TestSetup {
@@ -1468,7 +1408,7 @@ mod tests {
runtime: tokio::runtime::Handle::current(),
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
storage_impl: storage,
storage_impl: storage.clone(),
upload_queue: Mutex::new(UploadQueue::Uninitialized),
metrics: Arc::new(RemoteTimelineClientMetrics::new(
&harness.tenant_id,
@@ -1476,6 +1416,8 @@ mod tests {
)),
});
let deletion_queue = MockDeletionQueue::new(Some(storage), harness.conf);
Ok(Self {
harness,
tenant,
@@ -1483,6 +1425,7 @@ mod tests {
tenant_ctx: ctx,
remote_fs_dir,
client,
deletion_queue,
})
}
}
@@ -1511,6 +1454,7 @@ mod tests {
tenant_ctx: _tenant_ctx,
remote_fs_dir,
client,
deletion_queue,
} = TestSetup::new("upload_scheduling").await.unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
@@ -1618,20 +1562,14 @@ mod tests {
&LayerFileMetadata::new(content_3.len() as u64),
)
.unwrap();
client
.schedule_layer_file_deletion(&[layer_file_name_1.clone()])
.unwrap();
{
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
// Deletion schedules upload of the index file, and the file deletion itself
assert!(upload_queue.queued_operations.len() == 2);
assert!(upload_queue.inprogress_tasks.len() == 1);
assert!(upload_queue.num_inprogress_layer_uploads == 1);
assert!(upload_queue.num_inprogress_deletions == 0);
assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 0);
assert_eq!(upload_queue.queued_operations.len(), 0);
assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
}
assert_remote_files(
&[
&layer_file_name_1.file_name(),
@@ -1641,8 +1579,46 @@ mod tests {
&remote_timeline_dir,
);
// Finish them
client
.schedule_layer_file_deletion(
&[layer_file_name_1.clone()],
&deletion_queue.new_client(),
)
.await
.unwrap();
{
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
// Deletion schedules upload of the index file via RemoteTimelineClient, and
// deletion of layer files via DeletionQueue. The uploads have all been flushed
// because schedule_layer_file_deletion does a wait_completion before pushing
// to the deletion_queue
assert_eq!(upload_queue.queued_operations.len(), 0);
assert_eq!(upload_queue.inprogress_tasks.len(), 0);
assert_eq!(upload_queue.num_inprogress_layer_uploads, 0);
assert_eq!(
upload_queue.latest_files_changes_since_metadata_upload_scheduled,
0
);
}
assert_remote_files(
&[
&layer_file_name_1.file_name(),
&layer_file_name_2.file_name(),
&layer_file_name_3.file_name(),
"index_part.json",
],
&remote_timeline_dir,
);
// Finish uploads and deletions
client.wait_completion().await.unwrap();
deletion_queue.pump().await;
// 1 layer was deleted
assert_eq!(deletion_queue.get_executed(), 1);
assert_remote_files(
&[

View File

@@ -1,29 +0,0 @@
//! Helper functions to delete files from remote storage with a RemoteStorage
use anyhow::Context;
use std::path::Path;
use tracing::debug;
use remote_storage::GenericRemoteStorage;
use crate::config::PageServerConf;
pub(super) async fn delete_layer<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
local_layer_path: &'a Path,
) -> anyhow::Result<()> {
fail::fail_point!("before-delete-layer", |_| {
anyhow::bail!("failpoint before-delete-layer")
});
debug!("Deleting layer from remote storage: {local_layer_path:?}",);
let path_to_delete = conf.remote_path(local_layer_path)?;
// We don't want to print an error if the delete failed if the file has
// already been deleted. Thankfully, in this situation S3 already
// does not yield an error. While OS-provided local file system APIs do yield
// errors, we avoid them in the `LocalFs` wrapper.
storage.delete(&path_to_delete).await.with_context(|| {
format!("Failed to delete remote layer from storage at {path_to_delete:?}")
})
}

View File

@@ -235,21 +235,7 @@ pub(super) async fn download_index_part(
.map_err(DownloadError::BadInput)?;
let index_part_bytes = download_retry(
|| async {
let mut index_part_download = storage.download(&part_storage_path).await?;
let mut index_part_bytes = Vec::new();
tokio::io::copy(
&mut index_part_download.download_stream,
&mut index_part_bytes,
)
.await
.with_context(|| {
format!("Failed to download an index part into file {index_part_path:?}")
})
.map_err(DownloadError::Other)?;
Ok(index_part_bytes)
},
|| storage.download_all(&part_storage_path),
&format!("download {part_storage_path:?}"),
)
.await?;

View File

@@ -38,6 +38,7 @@ use std::time::{Duration, Instant, SystemTime};
use crate::context::{
AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
@@ -141,6 +142,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
/// The outward-facing resources required to build a Timeline
pub struct TimelineResources {
pub remote_client: Option<RemoteTimelineClient>,
pub deletion_queue_client: Option<DeletionQueueClient>,
}
pub struct Timeline {
@@ -195,6 +197,9 @@ pub struct Timeline {
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
pub remote_client: Option<Arc<RemoteTimelineClient>>,
/// Deletion queue: a global queue, separate from the remote storage client's upload queue
deletion_queue_client: Option<Arc<DeletionQueueClient>>,
// What page versions do we hold in the repository? If we get a
// request > last_record_lsn, we need to wait until we receive all
// the WAL up to the request. The SeqWait provides functions for
@@ -1262,6 +1267,18 @@ impl Timeline {
Ok(())
}
async fn delete_all_remote(&self) -> anyhow::Result<()> {
if let Some(remote_client) = &self.remote_client {
if let Some(deletion_queue_client) = &self.deletion_queue_client {
remote_client.delete_all(deletion_queue_client).await
} else {
Ok(())
}
} else {
Ok(())
}
}
}
#[derive(Debug, thiserror::Error)]
@@ -1414,6 +1431,7 @@ impl Timeline {
walreceiver: Mutex::new(None),
remote_client: resources.remote_client.map(Arc::new),
deletion_queue_client: resources.deletion_queue_client.map(Arc::new),
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
last_record_lsn: SeqWait::new(RecordLsn {
@@ -1750,11 +1768,15 @@ impl Timeline {
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
if let Some(rtc) = self.remote_client.as_ref() {
// Deletion queue client is always Some if remote_client is Some
let deletion_queue_client = self.deletion_queue_client.as_ref().unwrap();
let (needs_upload, needs_cleanup) = to_sync;
for (layer, m) in needs_upload {
rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?;
}
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
rtc.schedule_layer_file_deletion(&needs_cleanup, deletion_queue_client)
.await?;
rtc.schedule_index_upload_for_file_changes()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.
@@ -3789,7 +3811,13 @@ impl Timeline {
// Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
let deletion_queue = self
.deletion_queue_client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Remote storage enabled without deletion queue"))?;
remote_client
.schedule_layer_file_deletion(&layer_names_to_delete, deletion_queue)
.await?;
}
Ok(())
@@ -4123,7 +4151,15 @@ impl Timeline {
}
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
// Remote metadata upload was scheduled in `update_metadata_file`: wait
// for completion before scheduling any deletions.
remote_client.wait_completion().await?;
let deletion_queue = self.deletion_queue_client.as_ref().ok_or_else(|| {
anyhow::anyhow!("Remote storage enabled without deletion queue")
})?;
remote_client
.schedule_layer_file_deletion(&layer_names_to_delete, deletion_queue)
.await?;
}
apply.flush();
@@ -4713,6 +4749,7 @@ mod tests {
use utils::{id::TimelineId, lsn::Lsn};
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::tenant::{harness::TenantHarness, storage_layer::PersistentLayer};
use super::{EvictionError, Timeline};
@@ -4735,9 +4772,17 @@ mod tests {
};
GenericRemoteStorage::from_config(&config).unwrap()
};
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()), harness.conf);
let ctx = any_context();
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
let tenant = harness
.try_load(
&ctx,
Some(remote_storage),
Some(deletion_queue.new_client()),
)
.await
.unwrap();
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await
@@ -4800,9 +4845,17 @@ mod tests {
};
GenericRemoteStorage::from_config(&config).unwrap()
};
let deletion_queue = MockDeletionQueue::new(Some(remote_storage.clone()), harness.conf);
let ctx = any_context();
let tenant = harness.try_load(&ctx, Some(remote_storage)).await.unwrap();
let tenant = harness
.try_load(
&ctx,
Some(remote_storage),
Some(deletion_queue.new_client()),
)
.await
.unwrap();
let timeline = tenant
.create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
.await

View File

@@ -14,6 +14,7 @@ use utils::{
use crate::{
config::PageServerConf,
deletion_queue::DeletionQueueClient,
task_mgr::{self, TaskKind},
tenant::{
metadata::TimelineMetadata,
@@ -238,15 +239,6 @@ async fn delete_local_layer_files(
Ok(())
}
/// Removes remote layers and an index file after them.
async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> {
if let Some(remote_client) = &timeline.remote_client {
remote_client.delete_all().await.context("delete_all")?
};
Ok(())
}
// This function removes remaining traces of a timeline on disk.
// Namely: metadata file, timeline directory, delete mark.
// Note: io::ErrorKind::NotFound is ignored for metadata and timeline dir.
@@ -407,6 +399,7 @@ impl DeleteTimelineFlow {
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: Option<RemoteTimelineClient>,
deletion_queue_client: Option<DeletionQueueClient>,
init_order: Option<&InitializationOrder>,
) -> anyhow::Result<()> {
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
@@ -416,7 +409,10 @@ impl DeleteTimelineFlow {
timeline_id,
local_metadata,
None, // Ancestor is not needed for deletion.
TimelineResources { remote_client },
TimelineResources {
remote_client,
deletion_queue_client,
},
init_order,
// Important. We dont pass ancestor above because it can be missing.
// Thus we need to skip the validation here.
@@ -559,7 +555,7 @@ impl DeleteTimelineFlow {
) -> Result<(), DeleteTimelineError> {
delete_local_layer_files(conf, tenant.tenant_id, timeline).await?;
delete_remote_layers_and_index(timeline).await?;
timeline.delete_all_remote().await?;
pausable_failpoint!("in_progress_delete");

View File

@@ -1,5 +1,3 @@
use crate::metrics::RemoteOpFileKind;
use super::storage_layer::LayerFileName;
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::remote_timeline_client::index::IndexPart;
@@ -62,7 +60,6 @@ pub(crate) struct UploadQueueInitialized {
// Breakdown of different kinds of tasks currently in-progress
pub(crate) num_inprogress_layer_uploads: usize,
pub(crate) num_inprogress_metadata_uploads: usize,
pub(crate) num_inprogress_deletions: usize,
/// Tasks that are currently in-progress. In-progress means that a tokio Task
/// has been launched for it. An in-progress task can be busy uploading, but it can
@@ -120,7 +117,6 @@ impl UploadQueue {
task_counter: 0,
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
};
@@ -162,7 +158,6 @@ impl UploadQueue {
task_counter: 0,
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::new(),
queued_operations: VecDeque::new(),
};
@@ -200,13 +195,6 @@ pub(crate) struct UploadTask {
pub(crate) op: UploadOp,
}
#[derive(Debug)]
pub(crate) struct Delete {
pub(crate) file_kind: RemoteOpFileKind,
pub(crate) layer_file_name: LayerFileName,
pub(crate) scheduled_from_timeline_delete: bool,
}
#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file
@@ -215,9 +203,6 @@ pub(crate) enum UploadOp {
/// Upload the metadata file
UploadMetadata(IndexPart, Lsn),
/// Delete a layer file
Delete(Delete),
/// Barrier. When the barrier operation is reached,
Barrier(tokio::sync::watch::Sender<()>),
}
@@ -234,12 +219,6 @@ impl std::fmt::Display for UploadOp {
)
}
UploadOp::UploadMetadata(_, lsn) => write!(f, "UploadMetadata(lsn: {})", lsn),
UploadOp::Delete(delete) => write!(
f,
"Delete(path: {}, scheduled_from_timeline_delete: {})",
delete.layer_file_name.file_name(),
delete.scheduled_from_timeline_delete
),
UploadOp::Barrier(_) => write!(f, "Barrier"),
}
}

View File

@@ -613,3 +613,8 @@ class PageserverHttpClient(requests.Session):
},
)
self.verbose_error(res)
def deletion_queue_flush(self, execute: bool = False):
self.put(
f"http://localhost:{self.port}/v1/deletion_queue/flush?execute={'true' if execute else 'false'}"
).raise_for_status()

View File

@@ -12,7 +12,10 @@ from typing import Dict, List, Optional, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import PageserverApiException, PageserverHttpClient
@@ -250,35 +253,20 @@ def test_remote_storage_upload_queue_retries(
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
def configure_storage_sync_failpoints(action):
def configure_storage_write_failpoints(action):
client.configure_failpoints(
[
("before-upload-layer", action),
("before-upload-index", action),
("before-delete-layer", action),
]
)
def overwrite_data_and_wait_for_it_to_arrive_at_pageserver(data):
# create initial set of layers & upload them with failpoints configured
endpoint.safe_psql_many(
def configure_storage_delete_failpoints(action):
client.configure_failpoints(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 20000) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
("deletion-queue-before-execute", action),
]
)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
def get_queued_count(file_kind, op_kind):
val = client.get_remote_timeline_client_metric(
@@ -291,23 +279,52 @@ def test_remote_storage_upload_queue_retries(
assert val is not None, "expecting metric to be present"
return int(val)
# create some layers & wait for uploads to finish
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("a")
client.timeline_checkpoint(tenant_id, timeline_id)
client.timeline_compact(tenant_id, timeline_id)
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("b")
client.timeline_checkpoint(tenant_id, timeline_id)
client.timeline_compact(tenant_id, timeline_id)
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0
def get_deletions_executed() -> int:
executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
if executed is None:
return 0
else:
return int(executed)
wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
wait_until(2, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
wait_until(2, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
def get_deletion_errors(op_type) -> int:
errors = client.get_metric_value(
"pageserver_deletion_queue_errors_total", {"op_kind": op_type}
)
if errors is None:
return 0
else:
return int(errors)
def assert_queued_count(file_kind: str, op_kind: str, fn):
v = get_queued_count(file_kind=file_kind, op_kind=op_kind)
log.info(f"queue count: {file_kind} {op_kind} {v}")
assert fn(v)
# Push some uploads into the remote_timeline_client queues, before failpoints
# are enabled: these should execute and the queue should revert to zero depth
generate_uploads_and_deletions(env, tenant_id=tenant_id, timeline_id=timeline_id)
wait_until(2, 1, lambda: assert_queued_count("layer", "upload", lambda v: v == 0))
wait_until(2, 1, lambda: assert_queued_count("index", "upload", lambda v: v == 0))
# Wait for some deletions to happen in the above compactions, assert that
# our metrics of interest exist
wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v is not None))
# Before enabling failpoints, flushing deletions through should work
client.deletion_queue_flush(execute=True)
executed = client.get_metric_value("pageserver_deletion_queue_executed_total")
assert executed is not None
assert executed > 0
# let all future operations queue up
configure_storage_sync_failpoints("return")
configure_storage_write_failpoints("return")
configure_storage_delete_failpoints("return")
# Snapshot of executed deletions: should not increment while failpoint is enabled
deletions_executed_pre_failpoint = client.get_metric_value(
"pageserver_deletion_queue_executed_total"
)
# Create more churn to generate all upload ops.
# The checkpoint / compact / gc ops will block because they call remote_client.wait_completion().
@@ -315,38 +332,77 @@ def test_remote_storage_upload_queue_retries(
churn_thread_result = [False]
def churn_while_failpoints_active(result):
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("c")
client.timeline_checkpoint(tenant_id, timeline_id)
client.timeline_compact(tenant_id, timeline_id)
overwrite_data_and_wait_for_it_to_arrive_at_pageserver("d")
client.timeline_checkpoint(tenant_id, timeline_id)
client.timeline_compact(tenant_id, timeline_id)
gc_result = client.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0
generate_uploads_and_deletions(
env, init=False, tenant_id=tenant_id, timeline_id=timeline_id, data="d"
)
result[0] = True
churn_while_failpoints_active_thread = threading.Thread(
target=churn_while_failpoints_active, args=[churn_thread_result]
)
log.info("Entered churn phase")
churn_while_failpoints_active_thread.start()
# wait for churn thread's data to get stuck in the upload queue
wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="upload") > 0)
wait_until(10, 0.1, lambda: get_queued_count(file_kind="index", op_kind="upload") >= 2)
wait_until(10, 0.1, lambda: get_queued_count(file_kind="layer", op_kind="delete") > 0)
try:
# wait for churn thread's data to get stuck in the upload queue
wait_until(10, 0.1, lambda: assert_queued_count("layer", "upload", lambda v: v > 0))
wait_until(10, 0.1, lambda: assert_queued_count("index", "upload", lambda v: v >= 2))
# unblock churn operations
configure_storage_sync_failpoints("off")
# Deletion queue should not grow, because deletions wait for upload of
# metadata, and we blocked that upload.
wait_until(10, 0.5, lambda: assert_deletion_queue(client, lambda v: v == 0))
# ... and wait for them to finish. Exponential back-off in upload queue, so, gracious timeouts.
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="upload") == 0)
wait_until(30, 1, lambda: get_queued_count(file_kind="index", op_kind="upload") == 0)
wait_until(30, 1, lambda: get_queued_count(file_kind="layer", op_kind="delete") == 0)
# No more deletions should have executed
assert get_deletions_executed() == deletions_executed_pre_failpoint
# unblock write operations
log.info("Unblocking remote writes")
configure_storage_write_failpoints("off")
# ... and wait for them to finish. Exponential back-off in the upload queue, so use generous timeouts.
wait_until(30, 1, lambda: assert_queued_count("layer", "upload", lambda v: v == 0))
wait_until(30, 1, lambda: assert_queued_count("index", "upload", lambda v: v == 0))
# Deletions should have been enqueued now that index uploads proceeded
log.info("Waiting to see deletions enqueued")
wait_until(10, 1, lambda: assert_deletion_queue(client, lambda v: v > 0))
# Run flush in the background because it will block on the failpoint
class background_flush(threading.Thread):
def run(self):
client.deletion_queue_flush(execute=True)
flusher = background_flush()
flusher.start()
def assert_failpoint_hit():
assert get_deletion_errors("failpoint") > 0
# Our background flush thread should induce us to hit the failpoint
wait_until(20, 0.25, assert_failpoint_hit)
# Deletions should not have been executed while failpoint is still active.
assert get_deletion_queue_depth(client) is not None
assert get_deletion_queue_depth(client) > 0
assert get_deletions_executed() == deletions_executed_pre_failpoint
log.info("Unblocking remote deletes")
configure_storage_delete_failpoints("off")
# An API flush should now complete
flusher.join()
# Queue should drain, which should involve executing some deletions
wait_until(2, 1, lambda: assert_deletion_queue(client, lambda v: v == 0))
assert get_deletions_executed() > deletions_executed_pre_failpoint
finally:
# The churn thread doesn't make progress once it blocks on the first wait_completion() call,
# so, give it some time to wrap up.
log.info("Joining churn workload")
churn_while_failpoints_active_thread.join(30)
log.info("Joined churn workload")
# The churn thread doesn't make progress once it blocks on the first wait_completion() call,
# so, give it some time to wrap up.
churn_while_failpoints_active_thread.join(30)
assert not churn_while_failpoints_active_thread.is_alive()
assert churn_thread_result[0]
@@ -432,7 +488,6 @@ def test_remote_timeline_client_calls_started_metric(
calls_started: Dict[Tuple[str, str], List[int]] = {
("layer", "upload"): [0],
("index", "upload"): [0],
("layer", "delete"): [0],
}
def fetch_calls_started():
@@ -930,4 +985,154 @@ def assert_nothing_to_upload(
assert Lsn(detail["last_record_lsn"]) == Lsn(detail["remote_consistent_lsn"])
def get_deletion_queue_depth(ps_http) -> int:
"""
Deletion queue depth: deletions submitted minus deletions executed (0 if nothing has been submitted yet).
"""
submitted = ps_http.get_metric_value("pageserver_deletion_queue_submitted_total")
if submitted is None:
return 0
executed = ps_http.get_metric_value("pageserver_deletion_queue_executed_total")
executed = 0 if executed is None else executed
depth = submitted - executed
assert depth >= 0
log.info(f"get_deletion_queue_depth: {depth} ({submitted} - {executed})")
return int(depth)
def assert_deletion_queue(ps_http, size_fn) -> None:
v = get_deletion_queue_depth(ps_http)
assert v is not None
assert size_fn(v) is True
# TODO Test that we correctly handle GC of files that are stuck in upload queue.
def generate_uploads_and_deletions(
env: NeonEnv,
*,
init: bool = True,
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
data: Optional[str] = None,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
that results in some uploads and some deletions to remote storage.
"""
if tenant_id is None:
tenant_id = env.initial_tenant
assert tenant_id is not None
if timeline_id is None:
timeline_id = env.initial_timeline
assert timeline_id is not None
ps_http = env.pageserver.http_client()
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
def churn(data):
endpoint.safe_psql_many(
[
f"""
INSERT INTO foo (id, val)
SELECT g, '{data}'
FROM generate_series(1, 20000) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
# to ensure that GC can actually remove some layers
"VACUUM foo",
]
)
assert tenant_id is not None
assert timeline_id is not None
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Compaction should generate some GC-eligible layers
for i in range(0, 2):
churn(f"{i if data is None else data}")
gc_result = ps_http.timeline_gc(tenant_id, timeline_id, 0)
print_gc_result(gc_result)
assert gc_result["layers_removed"] > 0
@pytest.mark.parametrize("remote_storage_kind", [RemoteStorageKind.LOCAL_FS])
def test_deletion_queue_recovery(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storage_kind,
test_name="test_deletion_queue_recovery",
)
env = neon_env_builder.init_start(
initial_tenant_conf={
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_threshold": "1",
"compaction_target_size": f"{128 * 1024}",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
}
)
ps_http = env.pageserver.http_client()
# Prevent deletion lists from being executed, to build up some backlog of deletions
ps_http.configure_failpoints(
[
("deletion-queue-before-execute", "return"),
]
)
generate_uploads_and_deletions(env)
# There should be entries in the deletion queue
assert_deletion_queue(ps_http, lambda n: n > 0)
ps_http.deletion_queue_flush()
before_restart_depth = get_deletion_queue_depth(ps_http)
log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
env.pageserver.stop(immediate=True)
env.pageserver.start()
def assert_deletions_submitted(n: int):
assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n
# After restart, issue a flush to kick the deletion frontend to do recovery.
# It should recover all the operations we submitted before the restart.
ps_http.deletion_queue_flush(execute=False)
wait_until(20, 0.25, lambda: assert_deletions_submitted(before_restart_depth))
# The queue should drain through completely if we flush it
ps_http.deletion_queue_flush(execute=True)
wait_until(10, 1, lambda: assert_deletion_queue(ps_http, lambda n: n == 0))
# Restart again
env.pageserver.stop(immediate=True)
env.pageserver.start()
# No deletion lists should be recovered: this demonstrates that deletion lists
# were cleaned up after being executed.
time.sleep(1)
assert_deletion_queue(ps_http, lambda n: n == 0)

View File

@@ -47,6 +47,15 @@ def test_tenant_delete_smoke(
)
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
# The deletion queue will complain when it encounters simulated S3 errors
".*deletion frontend: Failed to write deletion list.*",
".*deletion backend: Failed to delete deletion list.*",
".*deletion executor: DeleteObjects request failed.*",
".*deletion backend: Failed to upload deletion queue header.*",
]
)
# lucky race with stopping from flushing a layer we fail to schedule any uploads
env.pageserver.allowed_errors.append(
@@ -91,7 +100,9 @@ def test_tenant_delete_smoke(
iterations = poll_for_remote_storage_iterations(remote_storage_kind)
tenant_delete_wait_completed(ps_http, tenant_id, iterations)
# We are running with failures enabled, so this may take some time to make
# it through all the remote storage operations required to complete
tenant_delete_wait_completed(ps_http, tenant_id, iterations * 10)
tenant_path = env.tenant_dir(tenant_id=tenant_id)
assert not tenant_path.exists()
@@ -201,6 +212,17 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
]
)
if simulate_failures:
env.pageserver.allowed_errors.extend(
[
# The deletion queue will complain when it encounters simulated S3 errors
".*deletion frontend: Failed to write deletion list.*",
".*deletion backend: Failed to delete deletion list.*",
".*deletion executor: DeleteObjects request failed.*",
".*deletion backend: Failed to upload deletion queue header.*",
]
)
ps_http = env.pageserver.http_client()
timeline_id = env.neon_cli.create_timeline("delete", tenant_id=tenant_id)

View File

@@ -488,7 +488,14 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
# Wait for tenant to finish loading.
wait_until_tenant_active(ps_http, tenant_id=env.initial_tenant, iterations=10, period=1)
wait_timeline_detail_404(ps_http, env.initial_tenant, leaf_timeline_id, iterations=4)
# Timeline deletion takes some finite time after startup
wait_timeline_detail_404(
ps_http,
tenant_id=env.initial_tenant,
timeline_id=leaf_timeline_id,
iterations=20,
interval=0.5,
)
assert (
not leaf_timeline_path.exists()
@@ -534,7 +541,7 @@ def test_timeline_delete_fail_before_local_delete(neon_env_builder: NeonEnvBuild
wait_until(
2,
0.5,
lambda: assert_prefix_empty(neon_env_builder),
lambda: assert_prefix_empty(neon_env_builder, prefix="/tenants"),
)
@@ -688,7 +695,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, first_request_finished)
# check that the timeline is gone
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2)
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=4)
@pytest.mark.parametrize(
@@ -772,7 +779,11 @@ def test_timeline_delete_works_for_remote_smoke(
# for some reason the check above doesn't immediately take effect for the below.
# Assume it is mock server inconsistency and check twice.
wait_until(2, 0.5, lambda: assert_prefix_empty(neon_env_builder))
wait_until(
2,
0.5,
lambda: assert_prefix_empty(neon_env_builder, "/tenants"),
)
def test_delete_orphaned_objects(
@@ -827,6 +838,8 @@ def test_delete_orphaned_objects(
reason = timeline_info["state"]["Broken"]["reason"]
assert reason.endswith(f"failpoint: {failpoint}"), reason
ps_http.deletion_queue_flush(execute=True)
for orphan in orphans:
assert not orphan.exists()
assert env.pageserver.log_contains(