unimpl the parts that support !generation.is_none()

This commit is contained in:
Christian Schwarz
2023-09-13 18:15:15 +02:00
parent 0ba442f1e0
commit d62723ea57
10 changed files with 66 additions and 2133 deletions

View File

@@ -8,7 +8,6 @@ use anyhow::{anyhow, Context};
use clap::{Arg, ArgAction, Command};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
@@ -355,11 +354,7 @@ fn start_pageserver(
let remote_storage = create_remote_storage_client(conf)?;
// Set up deletion queue
let (deletion_queue, deletion_workers) = DeletionQueue::new(
remote_storage.clone(),
ControlPlaneClient::new(conf, &shutdown_pageserver),
conf,
);
let (deletion_queue, deletion_workers) = DeletionQueue::new(remote_storage.clone(), conf);
if let Some(deletion_workers) = deletion_workers {
deletion_workers.spawn_with(BACKGROUND_RUNTIME.handle());
}

View File

@@ -1,164 +0,0 @@
use std::collections::HashMap;
use pageserver_api::control_api::{
ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio_util::sync::CancellationToken;
use url::Url;
use utils::{
backoff,
generation::Generation,
id::{NodeId, TenantId},
};
use crate::config::PageServerConf;
/// The Pageserver's client for using the control plane API: this is a small subset
/// of the overall control plane API, for dealing with generations (see docs/rfcs/025-generation-numbers.md)
pub struct ControlPlaneClient {
http_client: reqwest::Client,
base_url: Url,
node_id: NodeId,
cancel: CancellationToken,
}
#[async_trait::async_trait]
pub trait ControlPlaneGenerationsApi {
async fn re_attach(&self) -> anyhow::Result<HashMap<TenantId, Generation>>;
async fn validate(
&self,
tenants: Vec<(TenantId, Generation)>,
) -> anyhow::Result<HashMap<TenantId, bool>>;
}
impl ControlPlaneClient {
/// A None return value indicates that the input `conf` object does not have control
/// plane API enabled.
pub fn new(conf: &'static PageServerConf, cancel: &CancellationToken) -> Option<Self> {
let mut url = match conf.control_plane_api.as_ref() {
Some(u) => u.clone(),
None => return None,
};
if let Ok(mut segs) = url.path_segments_mut() {
// This ensures that `url` ends with a slash if it doesn't already.
// That way, we can subsequently use join() to safely attach extra path elements.
segs.pop_if_empty().push("");
}
let client = reqwest::ClientBuilder::new()
.build()
.expect("Failed to construct http client");
Some(Self {
http_client: client,
base_url: url,
node_id: conf.id,
cancel: cancel.clone(),
})
}
async fn retry_http_forever<R, T>(&self, url: &url::Url, request: R) -> Result<T, anyhow::Error>
where
R: Serialize,
T: DeserializeOwned,
{
#[derive(thiserror::Error, Debug)]
enum RemoteAttemptError {
#[error("shutdown")]
Shutdown,
#[error("remote: {0}")]
Remote(reqwest::Error),
}
match backoff::retry(
|| async {
let response = self
.http_client
.post(url.clone())
.json(&request)
.send()
.await
.map_err(RemoteAttemptError::Remote)?;
response
.error_for_status_ref()
.map_err(RemoteAttemptError::Remote)?;
response
.json::<T>()
.await
.map_err(RemoteAttemptError::Remote)
},
|_| false,
3,
u32::MAX,
"calling control plane generation validation API",
backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown),
)
.await
{
Err(RemoteAttemptError::Shutdown) => Err(anyhow::anyhow!("Shutting down")),
Err(RemoteAttemptError::Remote(_)) => {
panic!("We retry forever, this should never be reached");
}
Ok(r) => Ok(r),
}
}
}
#[async_trait::async_trait]
impl ControlPlaneGenerationsApi for ControlPlaneClient {
/// Block until we get a successful response
async fn re_attach(&self) -> anyhow::Result<HashMap<TenantId, Generation>> {
let re_attach_path = self
.base_url
.join("re-attach")
.expect("Failed to build re-attach path");
let request = ReAttachRequest {
node_id: self.node_id,
};
let response: ReAttachResponse = self.retry_http_forever(&re_attach_path, request).await?;
tracing::info!(
"Received re-attach response with {} tenants",
response.tenants.len()
);
Ok(response
.tenants
.into_iter()
.map(|t| (t.id, Generation::new(t.generation)))
.collect::<HashMap<_, _>>())
}
async fn validate(
&self,
tenants: Vec<(TenantId, Generation)>,
) -> anyhow::Result<HashMap<TenantId, bool>> {
let re_attach_path = self
.base_url
.join("validate")
.expect("Failed to build validate path");
let request = ValidateRequest {
tenants: tenants
.into_iter()
.map(|(id, gen)| ValidateRequestTenant {
id,
gen: gen
.into()
.expect("Generation should always be valid for a Tenant doing deletions"),
})
.collect(),
};
let response: ValidateResponse = self.retry_http_forever(&re_attach_path, request).await?;
Ok(response
.tenants
.into_iter()
.map(|rt| (rt.id, rt.valid))
.collect())
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,349 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::metrics::DELETION_QUEUE_DROPPED;
use crate::metrics::DELETION_QUEUE_ERRORS;
use super::executor::ExecutorMessage;
use super::DeletionHeader;
use super::DeletionList;
use super::DeletionQueueError;
use super::FlushOp;
use super::VisibleLsnUpdates;
// After this length of time, do any validation work that is pending,
// even if we haven't accumulated many keys to delete.
//
// This also causes updates to remote_consistent_lsn to be validated, even
// if there were no deletions enqueued.
const AUTOFLUSH_INTERVAL: Duration = Duration::from_secs(10);
// If we have received this number of keys, proceed with attempting to execute
const AUTOFLUSH_KEY_COUNT: usize = 16384;
#[derive(Debug)]
pub(super) enum BackendQueueMessage {
Delete(DeletionList),
Flush(FlushOp),
}
pub(super) struct BackendQueueWorker<C>
where
C: ControlPlaneGenerationsApi,
{
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
// Client for calling into control plane API for validation of deletes
control_plane_client: Option<C>,
// DeletionLists which are waiting generation validation. Not safe to
// execute until [`validate`] has processed them.
pending_lists: Vec<DeletionList>,
// DeletionLists which have passed validation and are ready to execute.
validated_lists: Vec<DeletionList>,
// Sum of all the lengths of lists in pending_lists
pending_key_count: usize,
// Lsn validation state: we read projected LSNs and write back visible LSNs
// after validation. This is the LSN equivalent of `pending_validation_lists`:
// it is drained in [`validate`]
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken,
}
impl<C> BackendQueueWorker<C>
where
C: ControlPlaneGenerationsApi,
{
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
tx: tokio::sync::mpsc::Sender<ExecutorMessage>,
control_plane_client: Option<C>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
cancel: CancellationToken,
) -> Self {
Self {
conf,
rx,
tx,
control_plane_client,
lsn_table,
pending_lists: Vec::new(),
validated_lists: Vec::new(),
pending_key_count: 0,
cancel,
}
}
async fn cleanup_lists(&mut self, lists: Vec<DeletionList>) {
for list in lists {
let list_path = self.conf.deletion_list_path(list.sequence);
debug!("Removing deletion list {list} at {}", list_path.display());
if let Err(e) = tokio::fs::remove_file(&list_path).await {
// Unexpected: we should have permissions and nothing else should
// be touching these files. We will leave the file behind. Subsequent
// pageservers will try and load it again: hopefully whatever storage
// issue (probably permissions) has been fixed by then.
tracing::error!("Failed to delete {}: {e:#}", list_path.display());
break;
}
}
}
/// Process any outstanding validations of generations of pending LSN updates or pending
/// DeletionLists.
///
/// Valid LSN updates propagate back to their result channel immediately, valid DeletionLists
/// go into the queue of ready-to-execute lists.
async fn validate(&mut self) -> Result<(), DeletionQueueError> {
let mut tenant_generations = HashMap::new();
for list in &self.pending_lists {
for (tenant_id, tenant_list) in &list.tenants {
// Note: DeletionLists are in logical time order, so generation always
// goes up. By doing a simple insert() we will always end up with
// the latest generation seen for a tenant.
tenant_generations.insert(*tenant_id, tenant_list.generation);
}
}
let pending_lsn_updates = {
let mut lsn_table = self.lsn_table.write().expect("Lock should not be poisoned");
let mut pending_updates = VisibleLsnUpdates::new();
std::mem::swap(&mut pending_updates, &mut lsn_table);
pending_updates
};
for (tenant_id, update) in &pending_lsn_updates.tenants {
let entry = tenant_generations
.entry(*tenant_id)
.or_insert(update.generation);
if update.generation > *entry {
*entry = update.generation;
}
}
if tenant_generations.is_empty() {
// No work to do
return Ok(());
}
let tenants_valid = if let Some(control_plane_client) = &self.control_plane_client {
control_plane_client
.validate(tenant_generations.iter().map(|(k, v)| (*k, *v)).collect())
.await
// The only wait a validation call returns an error is when the cancellation token fires
.map_err(|_| DeletionQueueError::ShuttingDown)?
} else {
// Control plane API disabled. In legacy mode we consider everything valid.
tenant_generations.keys().map(|k| (*k, true)).collect()
};
let mut validated_sequence: Option<u64> = None;
// Apply the validation results to the pending LSN updates
for (tenant_id, tenant_lsn_state) in pending_lsn_updates.tenants {
let validated_generation = tenant_generations
.get(&tenant_id)
.expect("Map was built from the same keys we're reading");
// If the tenant was missing from the validation response, it has been deleted. We may treat
// deletions as valid as the tenant's remote storage is all to be wiped anyway.
let valid = tenants_valid.get(&tenant_id).copied().unwrap_or(true);
if valid && *validated_generation == tenant_lsn_state.generation {
for (_timeline_id, pending_lsn) in tenant_lsn_state.timelines {
// Drop result of send: it is legal for the Timeline to have been dropped along
// with its queue receiver while we were doing validation.
pending_lsn.result_slot.store(pending_lsn.projected);
}
} else {
// If we failed validation, then do not apply any of the projected updates
warn!("Dropped remote consistent LSN updates for tenant {tenant_id} in stale generation {0:?}", tenant_lsn_state.generation);
}
}
// Apply the validation results to the pending deletion lists
for list in &mut self.pending_lists {
// Filter the list based on whether the server responded valid: true.
// If a tenant is omitted in the response, it has been deleted, and we should
// proceed with deletion.
let mut mutated = false;
list.tenants.retain(|tenant_id, tenant| {
let validated_generation = tenant_generations
.get(tenant_id)
.expect("Map was built from the same keys we're reading");
// If the tenant was missing from the validation response, it has been deleted. We may treat
// deletions as valid as the tenant's remote storage is all to be wiped anyway.
let valid = tenants_valid.get(tenant_id).copied().unwrap_or(true);
// A list is valid if it comes from the current _or previous_ generation.
// The previous generation case is due to how we store deletion lists locally:
// if we see the immediately previous generation in a locally stored deletion list,
// it proves that this node's disk was used for both current & previous generations,
// and therefore no other node was involved in between: the two generations may be
// logically treated as the same.
let this_list_valid = valid
&& (tenant.generation == *validated_generation);
if !this_list_valid {
warn!("Dropping stale deletions for tenant {tenant_id} in generation {:?}, objects may be leaked", tenant.generation);
DELETION_QUEUE_DROPPED.inc_by(tenant.len() as u64);
mutated = true;
}
this_list_valid
});
list.validated = true;
if mutated {
// Save the deletion list if we had to make changes due to stale generations. The
// saved list is valid for execution.
if let Err(e) = list.save(self.conf).await {
// Highly unexpected. Could happen if e.g. disk full.
// If we didn't save the trimmed list, it is _not_ valid to execute.
warn!("Failed to save modified deletion list {list}: {e:#}");
// Rather than have a complex retry process, just drop it and leak the objects,
// scrubber will clean up eventually.
list.tenants.clear(); // Result is a valid-but-empty list, which is a no-op for execution.
}
}
validated_sequence = Some(list.sequence);
}
if let Some(validated_sequence) = validated_sequence {
// Write the queue header to record how far validation progressed. This avoids having
// to rewrite each DeletionList to set validated=true in it.
let header = DeletionHeader::new(validated_sequence);
// Drop result because the validated_sequence is an optimization. If we fail to save it,
// then restart, we will drop some deletion lists, creating work for scrubber.
// The save() function logs a warning on error.
if let Err(e) = header.save(self.conf).await {
warn!("Failed to write deletion queue header: {e:#}");
DELETION_QUEUE_ERRORS
.with_label_values(&["put_header"])
.inc();
}
}
// Transfer the validated lists to the validated queue, for eventual execution
self.validated_lists.append(&mut self.pending_lists);
Ok(())
}
async fn flush(&mut self) -> Result<(), DeletionQueueError> {
tracing::debug!("Flushing with {} pending lists", self.pending_lists.len());
// Issue any required generation validation calls to the control plane
self.validate().await?;
// After successful validation, nothing is pending: any lists that
// made it through validation will be in validated_lists.
assert!(self.pending_lists.is_empty());
self.pending_key_count = 0;
tracing::debug!(
"Validation complete, have {} validated lists",
self.validated_lists.len()
);
// Return quickly if we have no validated lists to execute. This avoids flushing the
// executor when an idle backend hits its autoflush interval
if self.validated_lists.is_empty() {
return Ok(());
}
// Drain `validated_lists` into the executor
let mut executing_lists = Vec::new();
for mut list in self.validated_lists.drain(..) {
let objects = list.drain_paths();
self.tx
.send(ExecutorMessage::Delete(objects))
.await
.map_err(|_| DeletionQueueError::ShuttingDown)?;
executing_lists.push(list);
}
self.flush_executor().await?;
// Erase the deletion lists whose keys have all be deleted from remote storage
self.cleanup_lists(executing_lists).await;
Ok(())
}
async fn flush_executor(&mut self) -> Result<(), DeletionQueueError> {
// Flush the executor, so that all the keys referenced by these deletion lists
// are actually removed from remote storage. This is a precondition to deleting
// the deletion lists themselves.
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let flush_op = FlushOp { tx };
self.tx
.send(ExecutorMessage::Flush(flush_op))
.await
.map_err(|_| DeletionQueueError::ShuttingDown)?;
rx.await.map_err(|_| DeletionQueueError::ShuttingDown)
}
pub(super) async fn background(&mut self) {
tracing::info!("Started deletion backend worker");
while !self.cancel.is_cancelled() {
let msg = match tokio::time::timeout(AUTOFLUSH_INTERVAL, self.rx.recv()).await {
Ok(Some(m)) => m,
Ok(None) => {
// All queue senders closed
info!("Shutting down");
break;
}
Err(_) => {
// Timeout, we hit deadline to execute whatever we have in hand. These functions will
// return immediately if no work is pending.
// Drop result, because it' a background flush and we don't care whether it really worked.
drop(self.flush().await);
continue;
}
};
match msg {
BackendQueueMessage::Delete(list) => {
if list.validated {
self.validated_lists.push(list)
} else {
self.pending_key_count += list.len();
self.pending_lists.push(list);
}
if self.pending_key_count > AUTOFLUSH_KEY_COUNT {
// Drop possible shutdown error, because we will just fall out of loop if that happens
drop(self.flush().await);
}
}
BackendQueueMessage::Flush(op) => {
if let Ok(()) = self.flush().await {
// If we fail due to shutting down, we will just drop `op` to propagate that status.
op.fire();
}
}
}
}
}
}

View File

@@ -1,430 +0,0 @@
use super::BackendQueueMessage;
use super::DeletionHeader;
use super::DeletionList;
use super::FlushOp;
use std::collections::HashMap;
use std::fs::create_dir_all;
use std::time::Duration;
use regex::Regex;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
use tracing::warn;
use utils::generation::Generation;
use utils::id::TenantId;
use utils::id::TimelineId;
use crate::config::PageServerConf;
use crate::metrics::DELETION_QUEUE_ERRORS;
use crate::metrics::DELETION_QUEUE_SUBMITTED;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
// of magnitude 1MB when we are under heavy delete load.
const DELETION_LIST_TARGET_SIZE: usize = 16384;
// Ordinarily, we only flush to DeletionList periodically, to bound the window during
// which we might leak objects from not flushing a DeletionList after
// the objects are already unlinked from timeline metadata.
const FRONTEND_DEFAULT_TIMEOUT: Duration = Duration::from_millis(10000);
// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
// more objects before doing the flush.
const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
#[derive(Debug)]
pub(super) struct DeletionOp {
pub(super) tenant_id: TenantId,
pub(super) timeline_id: TimelineId,
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
// have a config object handy to project it to a remote key, and need the consuming worker
// to do it for you.
pub(super) layers: Vec<(LayerFileName, Generation)>,
pub(super) objects: Vec<RemotePath>,
/// The _current_ generation of the Tenant attachment in which we are enqueuing
/// this deletion.
pub(super) generation: Generation,
}
#[derive(Debug)]
pub(super) struct RecoverOp {
pub(super) attached_tenants: HashMap<TenantId, Generation>,
}
#[derive(Debug)]
pub(super) enum FrontendQueueMessage {
Delete(DeletionOp),
// Wait until all prior deletions make it into a persistent DeletionList
Flush(FlushOp),
// Wait until all prior deletions have been executed (i.e. objects are actually deleted)
FlushExecute(FlushOp),
// Call once after re-attaching to control plane, to notify the deletion queue about
// latest attached generations & load any saved deletion lists from disk.
Recover(RecoverOp),
}
pub(super) struct FrontendQueueWorker {
conf: &'static PageServerConf,
// Incoming frontend requests to delete some keys
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
// Outbound requests to the backend to execute deletion lists we have composed.
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
// The list we are currently building, contains a buffer of keys to delete
// and our next sequence number
pending: DeletionList,
// These FlushOps should fire the next time we flush
pending_flushes: Vec<FlushOp>,
// Worker loop is torn down when this fires.
cancel: CancellationToken,
}
impl FrontendQueueWorker {
// Initially DeletionHeader.validated_sequence is zero. The place we start our
// sequence numbers must be higher than that.
const BASE_SEQUENCE: u64 = 1;
pub(super) fn new(
conf: &'static PageServerConf,
rx: tokio::sync::mpsc::Receiver<FrontendQueueMessage>,
tx: tokio::sync::mpsc::Sender<BackendQueueMessage>,
cancel: CancellationToken,
) -> Self {
Self {
pending: DeletionList::new(Self::BASE_SEQUENCE),
conf,
rx,
tx,
pending_flushes: Vec::new(),
cancel,
}
}
/// Try to flush `list` to persistent storage
///
/// This does not return errors, because on failure to flush we do not lose
/// any state: flushing will be retried implicitly on the next deadline
async fn flush(&mut self) {
if self.pending.is_empty() {
for f in self.pending_flushes.drain(..) {
f.fire();
}
return;
}
match self.pending.save(self.conf).await {
Ok(_) => {
info!(sequence = self.pending.sequence, "Stored deletion list");
for f in self.pending_flushes.drain(..) {
f.fire();
}
let onward_list = self.pending.drain();
// We have consumed out of pending: reset it for the next incoming deletions to accumulate there
self.pending = DeletionList::new(self.pending.sequence + 1);
if let Err(e) = self.tx.send(BackendQueueMessage::Delete(onward_list)).await {
// This is allowed to fail: it will only happen if the backend worker is shut down,
// so we can just drop this on the floor.
info!("Deletion list dropped, this is normal during shutdown ({e:#})");
}
}
Err(e) => {
DELETION_QUEUE_ERRORS.with_label_values(&["put_list"]).inc();
warn!(
sequence = self.pending.sequence,
"Failed to write deletion list, will retry later ({e:#})"
);
}
}
}
/// Load the header, to learn the sequence number up to which deletions
/// have been validated. We will apply validated=true to DeletionLists
/// <= this sequence when loading them.
///
/// It is not an error for the header to not exist: we return None, and
/// the caller should act as if validated_sequence is 0
async fn load_validated_sequence(&self) -> Result<Option<u64>, anyhow::Error> {
let header_path = self.conf.deletion_header_path();
match tokio::fs::read(&header_path).await {
Ok(header_bytes) => {
match serde_json::from_slice::<DeletionHeader>(&header_bytes) {
Ok(h) => Ok(Some(h.validated_sequence)),
Err(e) => {
warn!(
"Failed to deserialize deletion header, ignoring {}: {e:#}",
header_path.display()
);
// This should never happen unless we make a mistake with our serialization.
// Ignoring a deletion header is not consequential for correctnes because all deletions
// are ultimately allowed to fail: worst case we leak some objects for the scrubber to clean up.
Ok(None)
}
}
}
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
debug!(
"Deletion header {} not found, first start?",
header_path.display()
);
Ok(None)
} else {
Err(anyhow::anyhow!(e))
}
}
}
}
async fn recover(
&mut self,
attached_tenants: HashMap<TenantId, Generation>,
) -> Result<(), anyhow::Error> {
debug!(
"recovering with {} attached tenants",
attached_tenants.len()
);
// Load the header
let validated_sequence = self.load_validated_sequence().await?.unwrap_or(0);
// Start our next deletion list from after the last location validated by
// previous process lifetime, or after the last location found (it is updated
// below after enumerating the deletion lists)
self.pending.sequence = std::cmp::max(self.pending.sequence, validated_sequence + 1);
let deletion_directory = self.conf.deletion_prefix();
let mut dir = match tokio::fs::read_dir(&deletion_directory).await {
Ok(d) => d,
Err(e) => {
warn!(
"Failed to open deletion list directory {}: {e:#}",
deletion_directory.display(),
);
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
}
};
let list_name_pattern = Regex::new("([a-zA-Z0-9]{16})-([a-zA-Z0-9]{2}).list").unwrap();
let header_path = self.conf.deletion_header_path();
let mut seqs: Vec<u64> = Vec::new();
while let Some(dentry) = dir.next_entry().await? {
if Some(dentry.file_name().as_os_str()) == header_path.file_name() {
// Don't try and parse the header's name like a list
continue;
}
let file_name = dentry.file_name().to_owned();
let basename = file_name.to_string_lossy();
let seq_part = if let Some(m) = list_name_pattern.captures(&basename) {
m.get(1)
.expect("Non optional group should be present")
.as_str()
} else {
warn!("Unexpected key in deletion queue: {basename}");
continue;
};
let seq: u64 = match u64::from_str_radix(seq_part, 16) {
Ok(s) => s,
Err(e) => {
warn!("Malformed key '{basename}': {e}");
continue;
}
};
seqs.push(seq);
}
seqs.sort();
// Initialize the next sequence number in the frontend based on the maximum of the highest list we see,
// and the last list that was deleted according to the header. Combined with writing out the header
// prior to deletions, this guarnatees no re-use of sequence numbers.
if let Some(max_list_seq) = seqs.last() {
self.pending.sequence = std::cmp::max(self.pending.sequence, max_list_seq + 1);
}
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = tokio::fs::read(&list_path).await?;
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,
Err(e) => {
// Drop the list on the floor: any objects it referenced will be left behind
// for scrubbing to clean up. This should never happen unless we have a serialization bug.
warn!(sequence = s, "Failed to deserialize deletion list: {e}");
continue;
}
};
if deletion_list.sequence <= validated_sequence {
// If the deletion list falls below valid_seq, we may assume that it was
// already validated the last time this pageserver ran. Otherwise, we still
// load it, as it may still contain content valid in this generation.
deletion_list.validated = true;
} else {
// Special case optimization: if a tenant is still attached, and no other
// generation was issued to another node in the interval while we restarted,
// then we may treat deletion lists from the previous generation as if they
// belong to our currently attached generation, and proceed to validate & execute.
for (tenant_id, tenant_list) in &mut deletion_list.tenants {
if let Some(attached_gen) = attached_tenants.get(tenant_id) {
if attached_gen.previous() == tenant_list.generation {
tenant_list.generation = *attached_gen;
}
}
}
}
info!(
validated = deletion_list.validated,
sequence = deletion_list.sequence,
"Recovered deletion list"
);
// We will drop out of recovery if this fails: it indicates that we are shutting down
// or the backend has panicked
DELETION_QUEUE_SUBMITTED.inc_by(deletion_list.len() as u64);
self.tx
.send(BackendQueueMessage::Delete(deletion_list))
.await?;
}
info!(next_sequence = self.pending.sequence, "Replay complete");
Ok(())
}
/// This is the front-end ingest, where we bundle up deletion requests into DeletionList
/// and write them out, for later validation by the backend and execution by the executor.
pub(super) async fn background(&mut self) {
info!("Started deletion frontend worker");
// Synchronous, but we only do it once per process lifetime so it's tolerable
if let Err(e) = create_dir_all(&self.conf.deletion_prefix()) {
tracing::error!(
"Failed to create deletion list directory {}, deletions will not be executed ({e})",
self.conf.deletion_prefix().display()
);
return;
}
while !self.cancel.is_cancelled() {
let timeout = if self.pending_flushes.is_empty() {
FRONTEND_DEFAULT_TIMEOUT
} else {
FRONTEND_FLUSHING_TIMEOUT
};
let msg = match tokio::time::timeout(timeout, self.rx.recv()).await {
Ok(Some(msg)) => msg,
Ok(None) => {
// Queue sender destroyed, shutting down
break;
}
Err(_) => {
// Hit deadline, flush.
self.flush().await;
continue;
}
};
match msg {
FrontendQueueMessage::Delete(op) => {
debug!(
"Delete: ingesting {} layers, {} other objects",
op.layers.len(),
op.objects.len()
);
let mut layer_paths = Vec::new();
for (layer, generation) in op.layers {
layer_paths.push(remote_layer_path(
&op.tenant_id,
&op.timeline_id,
&layer,
generation,
));
}
layer_paths.extend(op.objects);
if !self.pending.push(
&op.tenant_id,
&op.timeline_id,
op.generation,
&mut layer_paths,
) {
self.flush().await;
let retry_succeeded = self.pending.push(
&op.tenant_id,
&op.timeline_id,
op.generation,
&mut layer_paths,
);
if !retry_succeeded {
// Unexpected: after we flush, we should have
// drained self.pending, so a conflict on
// generation numbers should be impossible.
tracing::error!(
"Failed to enqueue deletions, leaking objects. This is a bug."
);
}
}
}
FrontendQueueMessage::Flush(op) => {
if self.pending.is_empty() {
// Execute immediately
debug!("Flush: No pending objects, flushing immediately");
op.fire()
} else {
// Execute next time we flush
debug!("Flush: adding to pending flush list for next deadline flush");
self.pending_flushes.push(op);
}
}
FrontendQueueMessage::FlushExecute(op) => {
debug!("FlushExecute: passing through to backend");
// We do not flush to a deletion list here: the client sends a Flush before the FlushExecute
if let Err(e) = self.tx.send(BackendQueueMessage::Flush(op)).await {
info!("Can't flush, shutting down ({e})");
// Caller will get error when their oneshot sender was dropped.
}
}
FrontendQueueMessage::Recover(op) => {
if let Err(e) = self.recover(op.attached_tenants).await {
// This should only happen in truly unrecoverable cases, like the recovery finding that the backend
// queue receiver has been dropped, or something is critically broken with
// the local filesystem holding deletion lists.
info!(
"Deletion queue recover aborted, deletion queue will not proceed ({e})"
);
return;
}
}
}
if self.pending.len() > DELETION_LIST_TARGET_SIZE || !self.pending_flushes.is_empty() {
self.flush().await;
}
}
info!("Deletion queue shut down.");
}
}

View File

@@ -5,7 +5,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use metrics::launch_timestamp::LaunchTimestamp;
@@ -1147,39 +1146,6 @@ async fn timeline_download_remote_layers_handler_get(
json_response(StatusCode::OK, info)
}
async fn deletion_queue_flush(
r: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&r);
if state.remote_storage.is_none() {
// Nothing to do if remote storage is disabled.
return json_response(StatusCode::OK, ());
}
let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
let flush = async {
if execute {
state.deletion_queue_client.flush_execute().await
} else {
state.deletion_queue_client.flush().await
}
}
// DeletionQueueError's only case is shutting down.
.map_err(|_| ApiError::ShuttingDown);
tokio::select! {
res = flush => {
res.map(|()| json_response(StatusCode::OK, ()))?
}
_ = cancel.cancelled() => {
Err(ApiError::ShuttingDown)
}
}
}
async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
@@ -1514,9 +1480,6 @@ pub fn make_router(
.put("/v1/disk_usage_eviction/run", |r| {
api_handler(r, disk_usage_eviction_run)
})
.put("/v1/deletion_queue/flush", |r| {
api_handler(r, deletion_queue_flush)
})
.put("/v1/tenant/:tenant_id/break", |r| {
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
})

View File

@@ -3,7 +3,6 @@ pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod control_plane_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
@@ -51,7 +50,7 @@ static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);
pub use crate::metrics::preinitialize_metrics;
#[tracing::instrument(skip_all, fields(%exit_code))]
pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_code: i32) {
pub async fn shutdown_pageserver(_deletion_queue: Option<DeletionQueue>, exit_code: i32) {
use std::time::Duration;
// Shut down the libpq endpoint task. This prevents new connections from
// being accepted.
@@ -79,11 +78,6 @@ pub async fn shutdown_pageserver(deletion_queue: Option<DeletionQueue>, exit_cod
)
.await;
// Best effort to persist any outstanding deletions, to avoid leaking objects
if let Some(mut deletion_queue) = deletion_queue {
deletion_queue.shutdown(Duration::from_secs(5)).await;
}
// Shut down the HTTP endpoint last, so that you can still check the server's
// status while it's shutting down.
// FIXME: We should probably stop accepting commands like attach/detach earlier.

View File

@@ -874,14 +874,6 @@ pub(crate) static DELETION_QUEUE_SUBMITTED: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static DELETION_QUEUE_DROPPED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_deletion_queue_dropped_total",
"Number of object deletions dropped due to stale generation."
)
.expect("failed to define a metric")
});
pub(crate) static DELETION_QUEUE_EXECUTED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_deletion_queue_executed_total",

View File

@@ -20,7 +20,6 @@ use utils::crashsafe;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::control_plane_client::{ControlPlaneClient, ControlPlaneGenerationsApi};
use crate::deletion_queue::DeletionQueueClient;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::TenantConfOpt;
@@ -65,17 +64,6 @@ impl TenantsMap {
}
}
/// This is "safe" in that that it won't leave behind a partially deleted directory
/// at the original path, because we rename with TEMP_FILE_SUFFIX before starting deleting
/// the contents.
///
/// This is pageserver-specific, as it relies on future processes after a crash to check
/// for TEMP_FILE_SUFFIX when loading things.
async fn safe_remove_tenant_dir_all(path: impl AsRef<Path>) -> std::io::Result<()> {
let tmp_path = safe_rename_tenant_dir(path).await?;
fs::remove_dir_all(tmp_path).await
}
async fn safe_rename_tenant_dir(path: impl AsRef<Path>) -> std::io::Result<PathBuf> {
let parent = path
.as_ref()
@@ -115,22 +103,6 @@ pub async fn init_tenant_mgr(
let mut tenants = HashMap::new();
// If we are configured to use the control plane API, then it is the source of truth for what tenants to load.
let tenant_generations = if let Some(client) = ControlPlaneClient::new(conf, &cancel) {
let result = client.re_attach().await?;
// Tip off the deletion queue about latest attached generations before starting any Tenants
resources
.deletion_queue_client
.recover(result.clone())
.await?;
Some(result)
} else {
info!("Control plane API not configured, tenant generations are disabled");
None
};
let mut dir_entries = fs::read_dir(&tenants_dir)
.await
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
@@ -196,31 +168,7 @@ pub async fn init_tenant_mgr(
}
};
let generation = if let Some(generations) = &tenant_generations {
// We have a generation map: treat it as the authority for whether
// this tenant is really attached.
if let Some(gen) = generations.get(&tenant_id) {
*gen
} else {
info!("Detaching tenant {tenant_id}, control plane omitted it in re-attach response");
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path.display(),
e
);
}
continue;
}
} else {
// Legacy mode: no generation information, any tenant present
// on local disk may activate
info!(
"Starting tenant {} in legacy mode, no generation",
tenant_dir_path.display()
);
Generation::none()
};
let generation = Generation::none();
match schedule_local_tenant_processing(
conf,

View File

@@ -1196,67 +1196,49 @@ impl RemoteTimelineClient {
}
// The task has completed successfully. Remove it from the in-progress list.
let lsn_update = {
let mut upload_queue_guard = self.upload_queue.lock().unwrap();
let upload_queue = match upload_queue_guard.deref_mut() {
UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
UploadQueue::Stopped(_stopped) => {
None
},
UploadQueue::Initialized(qi) => { Some(qi) }
};
let upload_queue = match upload_queue {
Some(upload_queue) => upload_queue,
None => {
info!("another concurrent task already stopped the queue");
return;
}
};
upload_queue.inprogress_tasks.remove(&task.task_id);
let lsn_update = match task.op {
UploadOp::UploadLayer(_, _) => {
upload_queue.num_inprogress_layer_uploads -= 1;
None
}
UploadOp::UploadMetadata(_, lsn) => {
upload_queue.num_inprogress_metadata_uploads -= 1;
// XXX monotonicity check?
upload_queue.projected_remote_consistent_lsn = Some(lsn);
if self.generation.is_none() {
// Legacy mode: skip validating generation
upload_queue.visible_remote_consistent_lsn.store(lsn);
None
} else {
Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
}
}
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Barrier(_) => unreachable!(),
};
// Launch any queued tasks that were unblocked by this one.
self.launch_queued_tasks(upload_queue);
lsn_update
let mut upload_queue_guard = self.upload_queue.lock().unwrap();
let upload_queue = match upload_queue_guard.deref_mut() {
UploadQueue::Uninitialized => panic!(
"callers are responsible for ensuring this is only called on an initialized queue"
),
UploadQueue::Stopped(_stopped) => None,
UploadQueue::Initialized(qi) => Some(qi),
};
if let Some((lsn, slot)) = lsn_update {
self.deletion_queue_client
.update_remote_consistent_lsn(
self.tenant_id,
self.timeline_id,
self.generation,
lsn,
slot,
)
.await;
}
let upload_queue = match upload_queue {
Some(upload_queue) => upload_queue,
None => {
info!("another concurrent task already stopped the queue");
return;
}
};
upload_queue.inprogress_tasks.remove(&task.task_id);
match task.op {
UploadOp::UploadLayer(_, _) => {
upload_queue.num_inprogress_layer_uploads -= 1;
}
UploadOp::UploadMetadata(_, lsn) => {
upload_queue.num_inprogress_metadata_uploads -= 1;
// XXX monotonicity check?
upload_queue.projected_remote_consistent_lsn = Some(lsn);
if self.generation.is_none() {
// Legacy mode: skip validating generation
upload_queue.visible_remote_consistent_lsn.store(lsn);
} else {
unimplemented!("no support for generations yet")
}
}
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions -= 1;
}
UploadOp::Barrier(_) => unreachable!(),
};
// Launch any queued tasks that were unblocked by this one.
self.launch_queued_tasks(upload_queue);
self.calls_unfinished_metric_end(&task.op);
}