Implement validation of generations before delete

This commit is contained in:
John Spray
2023-08-30 17:44:10 +01:00
parent 35e4b43531
commit c63a952b78
5 changed files with 279 additions and 50 deletions

View File

@@ -67,6 +67,14 @@ impl Generation {
Self::none()
}
}
pub fn into(self) -> Option<u32> {
if let Self::Valid(v) = self {
Some(v)
} else {
None
}
}
}
impl Serialize for Generation {

View File

@@ -3,8 +3,10 @@ mod executor;
mod frontend;
use std::collections::HashMap;
use std::path::PathBuf;
use crate::metrics::DELETION_QUEUE_SUBMITTED;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
@@ -14,7 +16,7 @@ use tokio;
use tokio_util::sync::CancellationToken;
use tracing::{self, debug, error};
use utils::generation::Generation;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::id::{TenantId, TimelineId};
pub(crate) use self::backend::BackendQueueWorker;
use self::executor::ExecutorWorker;
@@ -84,11 +86,15 @@ pub struct DeletionQueueClient {
}
#[derive(Debug, Serialize, Deserialize)]
struct TimelineDeletionList {
objects: Vec<RemotePath>,
// TODO: Tenant attachment generation will go here
// (see https://github.com/neondatabase/neon/pull/4919)
// attach_gen: u32,
struct TenantDeletionList {
/// For each Timeline, a list of key fragments to append to the timeline remote path
/// when reconstructing a full key
timelines: HashMap<TimelineId, Vec<String>>,
/// The generation in which this deletion was emitted: note that this may not be the
/// same as the generation of any layers being deleted. The generation of the layer
/// has already been absorbed into the keys in `objects`
generation: Generation,
}
#[serde_as]
@@ -101,11 +107,13 @@ struct DeletionList {
sequence: u64,
/// To avoid repeating tenant/timeline IDs in every key, we store keys in
/// nested HashMaps by TenantTimelineID
objects: HashMap<TenantTimelineId, TimelineDeletionList>,
// TODO: Node generation will go here
// (see https://github.com/neondatabase/neon/pull/4919)
// node_gen: u32,
/// nested HashMaps by TenantTimelineID. Each Tenant only appears once
/// with one unique generation ID: if someone tries to push a second generation
/// ID for the same tenant, we will start a new DeletionList.
tenants: HashMap<TenantId, TenantDeletionList>,
/// Avoid having to walk `tenants` to calculate size
size: usize,
}
#[serde_as]
@@ -140,40 +148,92 @@ impl DeletionList {
Self {
version: Self::VERSION_LATEST,
sequence,
objects: HashMap::new(),
tenants: HashMap::new(),
size: 0,
}
}
fn drain(&mut self) -> Self {
let mut tenants = HashMap::new();
std::mem::swap(&mut self.tenants, &mut tenants);
let other = Self {
version: Self::VERSION_LATEST,
sequence: self.sequence,
tenants,
size: self.size,
};
self.size = 0;
other
}
fn is_empty(&self) -> bool {
self.objects.is_empty()
self.tenants.is_empty()
}
fn len(&self) -> usize {
self.objects.values().map(|v| v.objects.len()).sum()
self.size
}
fn push(&mut self, tenant: &TenantId, timeline: &TimelineId, mut objects: Vec<RemotePath>) {
/// Returns true if the push was accepted, false if the caller must start a new
/// deletion list.
fn push(
&mut self,
tenant: &TenantId,
timeline: &TimelineId,
generation: Generation,
objects: &mut Vec<RemotePath>,
) -> bool {
if objects.is_empty() {
// Avoid inserting an empty TimelineDeletionList: this preserves the property
// that if we have no keys, then self.objects is empty (used in Self::is_empty)
return;
return true;
}
let key = TenantTimelineId::new(*tenant, *timeline);
let entry = self
.objects
.entry(key)
.or_insert_with(|| TimelineDeletionList {
objects: Vec::new(),
let tenant_entry = self
.tenants
.entry(*tenant)
.or_insert_with(|| TenantDeletionList {
timelines: HashMap::new(),
generation: generation,
});
entry.objects.append(&mut objects)
if tenant_entry.generation != generation {
// Only one generation per tenant per list: signal to
// caller to start a new list.
return false;
}
let timeline_entry = tenant_entry
.timelines
.entry(*timeline)
.or_insert_with(|| Vec::new());
let timeline_remote_path = remote_timeline_path(tenant, timeline);
self.size += objects.len();
timeline_entry.extend(objects.drain(..).map(|p| {
p.strip_prefix(&timeline_remote_path)
.expect("Timeline paths always start with the timeline prefix")
.to_string_lossy()
.to_string()
}));
true
}
fn take_paths(&mut self) -> Vec<RemotePath> {
self.objects
.drain()
.flat_map(|(_k, v)| v.objects.into_iter())
.collect()
fn take_paths(self) -> Vec<RemotePath> {
let mut result = Vec::new();
for (tenant, tenant_deletions) in self.tenants.into_iter() {
for (timeline, timeline_layers) in tenant_deletions.timelines.into_iter() {
let timeline_remote_path = remote_timeline_path(&tenant, &timeline);
result.extend(
timeline_layers
.into_iter()
.map(|l| timeline_remote_path.join(&PathBuf::from(l))),
);
}
}
result
}
}
@@ -205,6 +265,7 @@ impl DeletionQueueClient {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
generation: Generation,
layers: Vec<(LayerFileName, Generation)>,
) -> Result<(), DeletionQueueError> {
DELETION_QUEUE_SUBMITTED.inc_by(layers.len() as u64);
@@ -212,6 +273,7 @@ impl DeletionQueueClient {
tenant_id,
timeline_id,
layers,
generation,
objects: Vec::new(),
}))
.await
@@ -342,7 +404,12 @@ impl DeletionQueue {
backend_tx,
cancel.clone(),
)),
Some(BackendQueueWorker::new(conf, backend_rx, executor_tx)),
Some(BackendQueueWorker::new(
conf,
backend_rx,
executor_tx,
cancel.clone(),
)),
Some(ExecutorWorker::new(
remote_storage,
executor_rx,
@@ -543,8 +610,13 @@ mod test {
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
let deletion_prefix = ctx.harness.conf.deletion_prefix();
let generation = Generation::new(0xdeadbeef);
let remote_layer_file_name_1 = format!("{}{}", layer_file_name_1, generation.get_suffix());
// Exercise the distinction between the generation of the layers
// we delete, and the generation of the running Tenant.
let layer_generation = Generation::new(0xdeadbeef);
let now_generation = Generation::new(0xfeedbeef);
let remote_layer_file_name_1 =
format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
// Inject a victim file to remote storage
info!("Writing");
@@ -560,7 +632,8 @@ mod test {
ctx.runtime.block_on(client.push_layers(
tenant_id,
TIMELINE_ID,
[(layer_file_name_1.clone(), generation)].to_vec(),
now_generation,
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
))?;
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
@@ -599,8 +672,10 @@ mod test {
let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
let deletion_prefix = ctx.harness.conf.deletion_prefix();
let generation = Generation::new(0xdeadbeef);
let remote_layer_file_name_1 = format!("{}{}", layer_file_name_1, generation.get_suffix());
let layer_generation = Generation::new(0xdeadbeef);
let now_generation = Generation::new(0xfeedbeef);
let remote_layer_file_name_1 =
format!("{}{}", layer_file_name_1, layer_generation.get_suffix());
// Inject a file, delete it, and flush to a deletion list
std::fs::create_dir_all(&remote_timeline_path)?;
@@ -611,7 +686,8 @@ mod test {
ctx.runtime.block_on(client.push_layers(
tenant_id,
TIMELINE_ID,
[(layer_file_name_1.clone(), generation)].to_vec(),
now_generation,
[(layer_file_name_1.clone(), layer_generation)].to_vec(),
))?;
ctx.runtime.block_on(client.flush())?;
assert_local_files(&["0000000000000001-01.list"], &deletion_prefix);

View File

@@ -1,8 +1,15 @@
use std::collections::HashMap;
use std::time::Duration;
use futures::future::TryFutureExt;
use pageserver_api::control_api::HexTenantId;
use pageserver_api::control_api::{ValidateRequest, ValidateRequestTenant, ValidateResponse};
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use utils::backoff;
use crate::config::PageServerConf;
use crate::metrics::DELETION_QUEUE_ERRORS;
@@ -10,6 +17,7 @@ use crate::metrics::DELETION_QUEUE_ERRORS;
use super::executor::ExecutorMessage;
use super::DeletionHeader;
use super::DeletionList;
use super::DeletionQueueError;
use super::FlushOp;
// After this length of time, execute deletions which are elegible to run,
@@ -41,6 +49,58 @@ pub struct BackendQueueWorker {
// DeletionLists we have fully executed, which may be deleted
// from remote storage.
executed_lists: Vec<DeletionList>,
cancel: CancellationToken,
}
#[derive(thiserror::Error, Debug)]
enum ValidateCallError {
#[error("shutdown")]
Shutdown,
#[error("remote: {0}")]
Remote(reqwest::Error),
}
async fn retry_http_forever<T>(
url: &url::Url,
request: ValidateRequest,
cancel: CancellationToken,
) -> Result<T, DeletionQueueError>
where
T: DeserializeOwned,
{
let client = reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
let response = match backoff::retry(
|| {
client
.post(url.clone())
.json(&request)
.send()
.map_err(|e| ValidateCallError::Remote(e))
},
|_| false,
3,
u32::MAX,
"calling control plane generation validation API",
backoff::Cancel::new(cancel.clone(), || ValidateCallError::Shutdown),
)
.await
{
Err(ValidateCallError::Shutdown) => {
return Err(DeletionQueueError::ShuttingDown);
}
Err(ValidateCallError::Remote(_)) => {
panic!("We retry forever");
}
Ok(r) => r,
};
// TODO: handle non-200 response
// TODO: handle decode error
Ok(response.json::<T>().await.unwrap())
}
impl BackendQueueWorker {
@@ -48,6 +108,7 @@ impl BackendQueueWorker {
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
cancel: CancellationToken,
) -> Self {
Self {
conf,
@@ -56,6 +117,7 @@ impl BackendQueueWorker {
pending_lists: Vec::new(),
pending_key_count: 0,
executed_lists: Vec::new(),
cancel,
}
}
@@ -106,11 +168,67 @@ impl BackendQueueWorker {
}
}
pub async fn validate_lists(&mut self) -> Result<(), DeletionQueueError> {
let control_plane_api = match &self.conf.control_plane_api {
None => {
// Generations are not switched on yet.
return Ok(());
}
Some(api) => api,
};
let validate_path = control_plane_api
.join("validate")
.expect("Failed to build validate path");
for list in &mut self.pending_lists {
let request = ValidateRequest {
tenants: list
.tenants
.iter()
.map(|(tid, tdl)| ValidateRequestTenant {
id: HexTenantId::new(*tid),
gen: tdl.generation.into().expect(
"Generation should always be valid for a Tenant doing deletions",
),
})
.collect(),
};
// Retry forever, we cannot make progress until we get a response
let response: ValidateResponse =
retry_http_forever(&validate_path, request, self.cancel.clone()).await?;
let tenants_valid: HashMap<_, _> = response
.tenants
.into_iter()
.map(|t| (t.id.take(), t.valid))
.collect();
// Filter the list based on whether the server responded valid: true.
// If a tenant is omitted in the response, it has been deleted, and we should
// proceed with deletion.
list.tenants.retain(|tenant_id, _tenant| {
let r = tenants_valid.get(tenant_id).map(|v| *v).unwrap_or(true);
if !r {
warn!("Dropping stale deletions for tenant {tenant_id}, objects may be leaked");
}
r
});
}
Ok(())
}
pub async fn flush(&mut self) {
self.pending_key_count = 0;
// Issue any required generation validation calls to the control plane
if let Err(DeletionQueueError::ShuttingDown) = self.validate_lists().await {
warn!("Shutting down");
return;
}
// Submit all keys from pending DeletionLists into the executor
for list in &mut self.pending_lists {
for list in self.pending_lists.drain(..) {
let objects = list.take_paths();
if let Err(_e) = self.tx.send(ExecutorMessage::Delete(objects)).await {
warn!("Shutting down");
@@ -132,6 +250,7 @@ impl BackendQueueWorker {
// After flush, we are assured that all contents of the pending lists
// are executed
self.pending_key_count = 0;
self.executed_lists.append(&mut self.pending_lists);
// Erase the lists we executed
@@ -164,7 +283,7 @@ impl BackendQueueWorker {
match msg {
BackendQueueMessage::Delete(list) => {
self.pending_key_count += list.objects.len();
self.pending_key_count += list.len();
self.pending_lists.push(list);
if self.pending_key_count > AUTOFLUSH_KEY_COUNT {

View File

@@ -45,6 +45,10 @@ pub(super) struct DeletionOp {
// to do it for you.
pub(super) layers: Vec<(LayerFileName, Generation)>,
pub(super) objects: Vec<RemotePath>,
/// The _current_ generation of the Tenant attachment in which we are enqueuing
/// this deletion.
pub(super) generation: Generation,
}
#[derive(Debug)]
@@ -121,8 +125,7 @@ impl FrontendQueueWorker {
f.fire();
}
let mut onward_list = DeletionList::new(self.pending.sequence);
std::mem::swap(&mut onward_list.objects, &mut self.pending.objects);
let onward_list = self.pending.drain();
// We have consumed out of pending: reset it for the next incoming deletions to accumulate there
self.pending = DeletionList::new(self.pending.sequence + 1);
@@ -317,14 +320,34 @@ impl FrontendQueueWorker {
generation,
));
}
layer_paths.extend(op.objects);
self.pending
.push(&op.tenant_id, &op.timeline_id, layer_paths);
self.pending
.push(&op.tenant_id, &op.timeline_id, op.objects);
if self.pending.push(
&op.tenant_id,
&op.timeline_id,
op.generation,
&mut layer_paths,
) == false
{
self.flush().await;
let retry = self.pending.push(
&op.tenant_id,
&op.timeline_id,
op.generation,
&mut layer_paths,
);
if retry != true {
// Unexpeted: after we flush, we should have
// drained self.pending, so a conflict on
// generation numbers should be impossible.
tracing::error!(
"Failed to enqueue deletions, leaking objects. This is a bug."
);
}
}
}
FrontendQueueMessage::Flush(op) => {
if self.pending.objects.is_empty() {
if self.pending.is_empty() {
// Execute immediately
debug!("Flush: No pending objects, flushing immediately");
op.fire()
@@ -344,9 +367,7 @@ impl FrontendQueueWorker {
}
}
if self.pending.objects.len() > DELETION_LIST_TARGET_SIZE
|| !self.pending_flushes.is_empty()
{
if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
self.flush().await;
}
}

View File

@@ -693,7 +693,12 @@ impl RemoteTimelineClient {
// Enqueue deletions
deletion_queue_client
.push_layers(self.tenant_id, self.timeline_id, with_generations)
.push_layers(
self.tenant_id,
self.timeline_id,
self.generation,
with_generations,
)
.await?;
Ok(())
}
@@ -1509,7 +1514,7 @@ mod tests {
)),
});
let deletion_queue = MockDeletionQueue::new(Some(storage), harness.conf);
let deletion_queue = MockDeletionQueue::new(Some(storage));
Ok(Self {
harness,