Refactor deletion worker construction

Author: John Spray
Date: 2023-08-10 13:50:28 +01:00
parent 537eca489e
commit 975e4f2235
2 changed files with 37 additions and 36 deletions
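Review note: the refactor moves the "is remote storage configured?" check from inside each worker to construction time. `DeletionQueue::new` now returns `Option`-wrapped workers, so a worker that exists always has storage, and the repeated match-on-`Option` blocks deleted below become unnecessary. A minimal standalone sketch of the pattern, with illustrative names only (`Storage`, `Worker`, `new_queue` are not from this commit):

    struct Storage;

    struct Worker {
        // The dependency is now unconditional; the Option lives at the call site.
        storage: Storage,
    }

    fn new_queue(storage: Option<Storage>) -> Option<Worker> {
        // No storage means no worker: callers simply skip the spawn.
        storage.map(|storage| Worker { storage })
    }

    fn main() {
        assert!(new_queue(None).is_none());
        assert!(new_queue(Some(Storage)).is_some());
    }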


@@ -350,10 +350,14 @@ fn start_pageserver(
     let remote_storage = create_remote_storage_client(conf)?;
 
     // Set up deletion queue
-    let (deletion_queue, mut deletion_frontend, mut deletion_backend) =
+    let (deletion_queue, deletion_frontend, deletion_backend) =
         DeletionQueue::new(remote_storage.clone(), conf);
-    BACKGROUND_RUNTIME.spawn(async move { deletion_frontend.background().await });
-    BACKGROUND_RUNTIME.spawn(async move { deletion_backend.background().await });
+    if let Some(mut deletion_frontend) = deletion_frontend {
+        BACKGROUND_RUNTIME.spawn(async move { deletion_frontend.background().await });
+    }
+    if let Some(mut deletion_backend) = deletion_backend {
+        BACKGROUND_RUNTIME.spawn(async move { deletion_backend.background().await });
+    }
 
     // Up to this point no significant I/O has been done: this should have been fast. Record
     // duration prior to starting I/O intensive phase of startup.


@@ -12,12 +12,12 @@ use crate::{config::PageServerConf, tenant::storage_layer::LayerFileName};
 
 // TODO: small value is just for testing, make this bigger
 const DELETION_LIST_TARGET_SIZE: usize = 16;
 
-// Ordinarily, we only flush periodically, to bound the window during
+// Ordinarily, we only flush to DeletionList periodically, to bound the window during
 // which we might leak objects from not flushing a DeletionList after
 // the objects are already unlinked from timeline metadata.
 const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_millis(10000);
 
-// If someone is waiting for a flush, only delay a little to accumulate
+// If someone is waiting for a flush to DeletionList, only delay a little to accumulate
 // more objects before doing the flush.
 const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);
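Note on the two constants above: they encode the flush policy, a 10-second standing deadline in the ordinary case, shortened to 100 ms of extra batching when a caller is explicitly waiting on a flush. A hedged sketch of how a worker might pick its next delay under that policy (`next_flush_delay` is illustrative; the real loop is only partly visible in this diff):

    use std::time::{Duration, Instant};

    const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_millis(10000);
    const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);

    // Choose how long to keep batching before the next flush.
    fn next_flush_delay(deadline: Instant, explicit_flush_pending: bool) -> Duration {
        if explicit_flush_pending {
            // Someone is blocked on the flush: batch only briefly.
            FLUSH_EXPLICIT_DEADLINE
        } else {
            // Bound the leak window by flushing at the standing deadline.
            deadline.saturating_duration_since(Instant::now())
        }
    }

    fn main() {
        let deadline = Instant::now() + FLUSH_DEFAULT_DEADLINE;
        assert_eq!(next_flush_delay(deadline, true), FLUSH_EXPLICIT_DEADLINE);
        assert!(next_flush_delay(deadline, false) <= FLUSH_DEFAULT_DEADLINE);
    }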
@@ -28,6 +28,8 @@ const FLUSH_EXPLICIT_DEADLINE: Duration = Duration::from_millis(100);
 
+// TODO: implement admin API hook to flush deletion queue, for use in integration tests
+// that would like to assert deleted objects are gone
 // TODO: configurable for how long to wait before executing deletions
 
 /// We aggregate object deletions from many tenants in one place, for several reasons:
 /// - Coalesce deletions into fewer DeleteObjects calls
 /// - Enable Tenant/Timeline lifetimes to be shorter than the time it takes
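The first rationale in that doc comment, coalescing into fewer DeleteObjects calls, implies batching: S3's DeleteObjects accepts at most 1000 keys per request, so an accumulator is naturally drained in chunks. A tiny sketch under that assumption (`delete_batches` is illustrative, not from this commit):

    // Drain accumulated keys in S3-sized batches (the 1000-key cap is S3's).
    fn delete_batches(keys: Vec<String>) -> Vec<Vec<String>> {
        keys.chunks(1000).map(|c| c.to_vec()).collect()
    }

    fn main() {
        let keys: Vec<String> = (0..2500).map(|i| format!("key-{i}")).collect();
        assert_eq!(delete_batches(keys).len(), 3); // 1000 + 1000 + 500
    }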
@@ -167,7 +169,7 @@ impl DeletionQueueClient {
 }
 
 pub struct BackendQueueWorker {
-    remote_storage: Option<GenericRemoteStorage>,
+    remote_storage: GenericRemoteStorage,
     conf: &'static PageServerConf,
 
     rx: tokio::sync::mpsc::Receiver<BackendQueueMessage>,
@@ -185,16 +187,7 @@ pub struct BackendQueueWorker {
 
 impl BackendQueueWorker {
     async fn maybe_execute(&mut self) {
-        // TODO: refactor so that worker is just not constructed if there is no remote
-        let remote_storage = match &self.remote_storage {
-            Some(rs) => rs,
-            None => {
-                info!("No remote storage configured, deletion queue will not run");
-                return;
-            }
-        };
-        match remote_storage.delete_objects(&self.accumulator).await {
+        match self.remote_storage.delete_objects(&self.accumulator).await {
             Ok(()) => {
                 self.accumulator.clear();
                 self.executed_lists.append(&mut self.pending_lists);
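For orientation, the Ok(()) arm kept above is the batch bookkeeping: a successful delete drops the accumulated keys and promotes every pending deletion list to executed. A self-contained model of that bookkeeping (`Backend` and its field types are placeholders; the real code calls GenericRemoteStorage::delete_objects):

    struct Backend {
        accumulator: Vec<String>,
        pending_lists: Vec<u64>,
        executed_lists: Vec<u64>,
    }

    impl Backend {
        fn on_batch_result(&mut self, result: Result<(), ()>) {
            match result {
                Ok(()) => {
                    // Batch deleted: drop the keys and promote every list
                    // whose keys were in it from pending to executed.
                    self.accumulator.clear();
                    self.executed_lists.append(&mut self.pending_lists);
                }
                // On failure keep everything; the same batch is retried later.
                Err(()) => {}
            }
        }
    }

    fn main() {
        let mut b = Backend {
            accumulator: vec!["k".into()],
            pending_lists: vec![1, 2],
            executed_lists: Vec::new(),
        };
        b.on_batch_result(Ok(()));
        assert!(b.accumulator.is_empty());
        assert_eq!(b.executed_lists, vec![1, 2]);
    }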
@@ -272,15 +265,7 @@ impl BackendQueueWorker {
             })
             .collect();
 
-        // TODO: refactor so that worker is just not constructed if there is no remote
-        let remote_storage = match &mut self.remote_storage {
-            Some(rs) => rs,
-            None => {
-                info!("No remote storage configured, deletion queue will not run");
-                return;
-            }
-        };
-        match remote_storage.delete_objects(&executed_keys).await {
+        match self.remote_storage.delete_objects(&executed_keys).await {
             Ok(()) => {
                 // Retain any lists that couldn't be deleted in that request
                 self.executed_lists =
@@ -312,7 +297,7 @@ enum BackendQueueMessage {
 }
 
 pub struct FrontendQueueWorker {
-    remote_storage: Option<GenericRemoteStorage>,
+    remote_storage: GenericRemoteStorage,
     conf: &'static PageServerConf,
 
     // Incoming frontend requests to delete some keys
@@ -342,14 +327,11 @@ impl FrontendQueueWorker {
         let key = RemotePath::new(&self.conf.remote_deletion_list_path(self.pending.sequence))
             .expect("Failed to compose deletion list path");
 
-        // We don't run this worker unless there is remote storage available.
-        let remote_storage = self.remote_storage.as_ref().unwrap();
-
         let bytes = serde_json::to_vec(&self.pending).expect("Failed to serialize deletion list");
         let size = bytes.len();
         let source = tokio::io::BufReader::new(std::io::Cursor::new(bytes));
 
-        match remote_storage.upload(source, size, &key, None).await {
+        match self.remote_storage.upload(source, size, &key, None).await {
             Ok(_) => {
                 for f in self.pending_flushes.drain(..) {
                     f.fire();
@@ -394,6 +376,7 @@ impl FrontendQueueWorker {
                 }
             },
             _ = tokio::time::sleep(flush_delay) => {
+                self.deadline = Instant::now() + FLUSH_DEFAULT_DEADLINE;
                 if !self.pending.objects.is_empty() {
                     debug!("Flushing for deadline");
                     self.flush().await;
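The single added line above re-arms the deadline every time the sleep fires, not only when a flush happens. Without it, an empty pending list would likely leave self.deadline in the past, making every subsequent flush_delay zero and turning the select! loop into a busy spin. A simplified model, assuming tokio as the diff suggests (`run` is illustrative and omits the channel arm):

    use std::time::{Duration, Instant};

    const FLUSH_DEFAULT_DEADLINE: Duration = Duration::from_millis(10000);

    async fn run(mut pending: Vec<String>) {
        let mut deadline = Instant::now() + FLUSH_DEFAULT_DEADLINE;
        loop {
            let flush_delay = deadline.saturating_duration_since(Instant::now());
            tokio::time::sleep(flush_delay).await;
            // The added line: re-arm whether or not anything is flushed, so
            // an empty `pending` cannot leave `deadline` in the past and make
            // every subsequent sleep zero-length.
            deadline = Instant::now() + FLUSH_DEFAULT_DEADLINE;
            if !pending.is_empty() {
                pending.clear(); // stand-in for self.flush().await
            }
        }
    }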
@@ -463,35 +446,49 @@ impl DeletionQueue {
         }
     }
 
     /// Caller may use the returned object to construct clients with new_client.
     /// Caller should tokio::spawn the background() members of the two worker objects returned:
     /// we don't spawn those inside new() so that the caller can use their runtime/spans of choice.
+    ///
+    /// If remote_storage is None, then the returned workers will also be None.
     pub fn new(
         remote_storage: Option<GenericRemoteStorage>,
         conf: &'static PageServerConf,
-    ) -> (Self, FrontendQueueWorker, BackendQueueWorker) {
+    ) -> (
+        Self,
+        Option<FrontendQueueWorker>,
+        Option<BackendQueueWorker>,
+    ) {
         let (tx, rx) = tokio::sync::mpsc::channel(16384);
 
+        let remote_storage = match remote_storage {
+            None => return (Self { tx }, None, None),
+            Some(r) => r,
+        };
+
         let (backend_tx, backend_rx) = tokio::sync::mpsc::channel(16384);
 
         (
             Self { tx },
-            FrontendQueueWorker {
+            Some(FrontendQueueWorker {
                 // TODO: on startup, recover sequence number by listing persistent list objects,
                 // *or* if we implement generation numbers, we may start from 0 every time
                 pending: DeletionList::new(0xdeadbeef),
-                remote_storage: remote_storage.as_ref().map(|s| s.clone()),
+                remote_storage: remote_storage.clone(),
                 conf,
                 rx,
                 tx: backend_tx,
                 deadline: Instant::now() + FLUSH_DEFAULT_DEADLINE,
                 pending_flushes: Vec::new(),
-            },
-            BackendQueueWorker {
+            }),
+            Some(BackendQueueWorker {
                 remote_storage,
                 conf,
                 rx: backend_rx,
                 accumulator: Vec::new(),
                 pending_lists: Vec::new(),
                 executed_lists: Vec::new(),
-            },
+            }),
         )
     }
 }