passthrough wait_for_upload, better upload scheduling

Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
Alex Chi Z
2024-11-21 14:11:46 -05:00
parent 95474cfbe0
commit 499105da6d
7 changed files with 85 additions and 27 deletions

View File

@@ -3,6 +3,7 @@ mod list_writer;
mod validator;
use std::collections::HashMap;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Duration;
@@ -157,6 +158,7 @@ pub struct DeletionQueueClient {
executor_tx: tokio::sync::mpsc::Sender<DeleterMessage>,
lsn_table: Arc<std::sync::RwLock<VisibleLsnUpdates>>,
max_layer_generation_in_queue: Arc<AtomicU32>,
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
@@ -382,6 +384,17 @@ pub enum DeletionQueueError {
}
impl DeletionQueueClient {
/// Reports whether the deletion queue may have been handed a layer at generation `gen` or newer.
///
/// The answer is derived from `max_layer_generation_in_queue`, the highest layer generation
/// recorded so far: the result is `true` when that recorded maximum is `>= gen`.
/// When `gen` carries no numeric value (`gen.into()` yields `None`) the result is always `false`.
///
/// NOTE(review): the recorded maximum is only ever raised in the code visible here, so a
/// `true` result means "maybe", not "definitely still queued".
pub(crate) fn maybe_processing_generation(&self, gen: Generation) -> bool {
    let requested: Option<u32> = gen.into();
    match requested {
        None => false,
        Some(wanted) => {
            let highest_seen = self
                .max_layer_generation_in_queue
                .load(std::sync::atomic::Ordering::SeqCst);
            highest_seen >= wanted
        }
    }
}
/// This is cancel-safe. If you drop the future before it completes, the message
/// is not pushed, although in the context of the deletion queue it doesn't matter: once
/// we decide to do a deletion the decision is always final.
@@ -505,6 +518,13 @@ impl DeletionQueueClient {
metrics::DELETION_QUEUE
.keys_submitted
.inc_by(layers.len() as u64);
for (_, meta) in &layers {
let Some(gen) = meta.generation.into() else {
continue;
};
self.max_layer_generation_in_queue
.fetch_max(gen, std::sync::atomic::Ordering::SeqCst);
}
self.do_push(
&self.tx,
ListWriterQueueMessage::Delete(DeletionOp {
@@ -651,6 +671,7 @@ impl DeletionQueue {
tx,
executor_tx: executor_tx.clone(),
lsn_table: lsn_table.clone(),
max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)),
},
cancel: cancel.clone(),
},
@@ -1265,6 +1286,7 @@ pub(crate) mod mock {
tx: self.tx.clone(),
executor_tx: self.executor_tx.clone(),
lsn_table: self.lsn_table.clone(),
max_layer_generation_in_queue: Arc::new(AtomicU32::new(0)),
}
}
}

View File

@@ -244,7 +244,7 @@ use self::index::IndexPart;
use super::config::AttachedLocationConfig;
use super::metadata::MetadataUpdate;
use super::storage_layer::{Layer, LayerName, ResidentLayer};
use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
use super::upload_queue::{BarrierType, NotInitialized, SetDeletedFlagProgress};
use super::{DeleteTimelineError, Generation};
pub(crate) use download::{
@@ -896,7 +896,7 @@ impl RemoteTimelineClient {
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue, false))
Some(self.schedule_wait_barrier0(upload_queue))
}
};
@@ -935,7 +935,7 @@ impl RemoteTimelineClient {
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue, false))
Some(self.schedule_wait_barrier0(upload_queue))
}
};
@@ -974,7 +974,7 @@ impl RemoteTimelineClient {
match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => {
Some(self.schedule_barrier0(upload_queue, false))
Some(self.schedule_wait_barrier0(upload_queue))
}
// Usual case: !wanted(x) && !wanted(y)
//
@@ -992,7 +992,7 @@ impl RemoteTimelineClient {
.map(|x| x.with_reason(reason))
.or_else(|| Some(index::GcBlocking::started_now_for(reason)));
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue, false))
Some(self.schedule_wait_barrier0(upload_queue))
}
}
};
@@ -1036,7 +1036,7 @@ impl RemoteTimelineClient {
match (current, uploaded) {
(x, y) if wanted(x) && wanted(y) => None,
(x, y) if wanted(x) && !wanted(y) => {
Some(self.schedule_barrier0(upload_queue, false))
Some(self.schedule_wait_barrier0(upload_queue))
}
(x, y) => {
if !wanted(x) && wanted(y) {
@@ -1048,7 +1048,7 @@ impl RemoteTimelineClient {
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
// FIXME: bogus ?
self.schedule_index_upload(upload_queue)?;
Some(self.schedule_barrier0(upload_queue, false))
Some(self.schedule_wait_barrier0(upload_queue))
}
}
};
@@ -1304,7 +1304,7 @@ impl RemoteTimelineClient {
let upload_queue = guard
.initialized_mut()
.map_err(WaitCompletionError::NotInitialized)?;
self.schedule_barrier0(upload_queue, false)
self.schedule_wait_barrier0(upload_queue)
};
Self::wait_completion0(receiver).await
@@ -1320,20 +1320,32 @@ impl RemoteTimelineClient {
Ok(())
}
pub(crate) fn schedule_barrier(self: &Arc<Self>, initial_barrier: bool) -> anyhow::Result<()> {
/// Schedules a [`BarrierType::Initial`] barrier on the upload queue.
///
/// Thin wrapper over `schedule_barrier`; whatever result that call produces is returned
/// unchanged.
pub fn schedule_initial_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
self.schedule_barrier(BarrierType::Initial)
}
fn schedule_barrier(self: &Arc<Self>, barrier: BarrierType) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_barrier0(upload_queue, initial_barrier);
self.schedule_barrier0(upload_queue, barrier);
Ok(())
}
/// Schedule a barrier to wait for all previously scheduled operations to complete.
///
/// Enqueues a [`BarrierType::Normal`] barrier through `schedule_barrier0` and returns the
/// watch receiver that call hands back, so the caller can wait on it.
fn schedule_wait_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
) -> tokio::sync::watch::Receiver<()> {
self.schedule_barrier0(upload_queue, BarrierType::Normal)
}
fn schedule_barrier0(
self: &Arc<Self>,
upload_queue: &mut UploadQueueInitialized,
initial_barrier: bool,
barrier: BarrierType,
) -> tokio::sync::watch::Receiver<()> {
let (sender, receiver) = tokio::sync::watch::channel(());
let barrier_op = UploadOp::Barrier(sender, initial_barrier);
let barrier_op = UploadOp::Barrier(sender, barrier);
upload_queue.queued_operations.push_back(barrier_op);
// Don't count this kind of operation!
@@ -1801,9 +1813,12 @@ impl RemoteTimelineClient {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
UploadOp::UploadLayer(..) => {
// Can always be scheduled except when there's a barrier
UploadOp::UploadLayer(_, meta) => {
// Can be scheduled when no barrier is in progress, or -- even with a barrier in progress -- when the deletion queue is not possibly processing this layer's generation.
upload_queue.num_inprogress_barriers == 0
|| !self
.deletion_queue_client
.maybe_processing_generation(meta.generation)
}
UploadOp::UploadMetadata { .. } => {
// These can only be performed after all the preceding operations
@@ -1853,12 +1868,12 @@ impl RemoteTimelineClient {
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender, false) => {
UploadOp::Barrier(sender, BarrierType::Normal) => {
// For other barriers, simply send back the ack.
sender.send_replace(());
continue;
}
UploadOp::Barrier(_, true) => {
UploadOp::Barrier(_, BarrierType::Initial) => {
// For initial barrier, we need to wait for deletions.
upload_queue.num_inprogress_barriers += 1;
}

View File

@@ -2649,7 +2649,7 @@ impl Timeline {
// (1) and (4) ONLY IF generation number gets bumped. There are some cases where
// we load a tenant without bumping the generation number (i.e., detach ancestor
// and timeline offload/un-offload). In those cases, we need to rely on the barrier.
self.remote_client.schedule_barrier(true)?;
self.remote_client.schedule_initial_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// on retry.

View File

@@ -283,6 +283,14 @@ pub(crate) struct Delete {
pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
}
/// Distinguishes the kinds of barrier that can be placed in the upload queue.
///
/// The type is a plain fieldless tag, so it is cheap to copy and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BarrierType {
    /// A regular barrier: once it reaches the front of the queue and is allowed to run,
    /// the waiter is simply sent the acknowledgement.
    Normal,
    /// An initial barrier, scheduled at timeline load. It is counted as an in-progress
    /// barrier because deletions must be waited for before the next upload.
    Initial,
}
#[derive(Debug)]
pub(crate) enum UploadOp {
/// Upload a layer file
@@ -301,7 +309,7 @@ pub(crate) enum UploadOp {
/// The [`BarrierType`] indicates whether the barrier is an initial barrier scheduled
/// at timeline load -- if yes, we will need to wait for all deletions to be completed
/// before the next upload.
Barrier(tokio::sync::watch::Sender<()>, bool),
Barrier(tokio::sync::watch::Sender<()>, BarrierType),
/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
/// this is the same as a Barrier.
@@ -328,8 +336,8 @@ impl std::fmt::Display for UploadOp {
UploadOp::Delete(delete) => {
write!(f, "Delete({} layers)", delete.layers.len())
}
UploadOp::Barrier(_, false) => write!(f, "Barrier"),
UploadOp::Barrier(_, true) => write!(f, "Barrier (initial)"),
UploadOp::Barrier(_, BarrierType::Normal) => write!(f, "Barrier"),
UploadOp::Barrier(_, BarrierType::Initial) => write!(f, "Barrier (initial)"),
UploadOp::Shutdown => write!(f, "Shutdown"),
}
}

View File

@@ -4947,6 +4947,7 @@ def last_flush_lsn_upload(
timeline_id: TimelineId,
pageserver_id: int | None = None,
auth_token: str | None = None,
wait_until_uploaded: bool = True,
) -> Lsn:
"""
Wait for pageserver to catch to the latest flush LSN of given endpoint,
@@ -4960,7 +4961,9 @@ def last_flush_lsn_upload(
for tenant_shard_id, pageserver in shards:
ps_http = pageserver.http_client(auth_token=auth_token)
wait_for_last_record_lsn(ps_http, tenant_shard_id, timeline_id, last_flush_lsn)
ps_http.timeline_checkpoint(tenant_shard_id, timeline_id, wait_until_uploaded=True)
ps_http.timeline_checkpoint(
tenant_shard_id, timeline_id, wait_until_uploaded=wait_until_uploaded
)
return last_flush_lsn
@@ -4985,7 +4988,7 @@ def generate_uploads_and_deletions(
timeline_id: TimelineId | None = None,
data: str | None = None,
pageserver: NeonPageserver,
wait_for_upload: bool = True,
wait_until_uploaded: bool = True,
):
"""
Using the environment's default tenant + timeline, generate a load pattern
@@ -5008,7 +5011,12 @@ def generate_uploads_and_deletions(
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
env,
endpoint,
tenant_id,
timeline_id,
pageserver_id=pageserver.id,
wait_until_uploaded=wait_until_uploaded,
)
def churn(data):
@@ -5031,7 +5039,12 @@ def generate_uploads_and_deletions(
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
env,
endpoint,
tenant_id,
timeline_id,
pageserver_id=pageserver.id,
wait_until_uploaded=wait_until_uploaded,
)
# Compaction should generate some GC-eligible layers
@@ -5047,4 +5060,4 @@ def generate_uploads_and_deletions(
# background ingest, no more uploads pending, and therefore no non-determinism
# in subsequent actions like pageserver restarts.
flush_ep_to_pageserver(env, endpoint, tenant_id, timeline_id, pageserver.id)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_for_upload)
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=wait_until_uploaded)

View File

@@ -794,7 +794,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
if compact is not None:
query["compact"] = "true" if compact else "false"
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
params=query,

View File

@@ -462,7 +462,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# Because it is in emergency mode, it will not attempt to validate deletions required by the initial barrier, and therefore
# other files cannot be uploaded b/c it's waiting for the initial barrier to be validated.
generate_uploads_and_deletions(
env, init=False, pageserver=env.pageserver, wait_for_upload=False
env, init=False, pageserver=env.pageserver, wait_until_uploaded=False
)
# The pageserver should neither validate nor execute any deletions, it should have