mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
@@ -10,12 +10,12 @@ use remote_storage::GenericRemoteStorage;
|
||||
use remote_storage::RemotePath;
|
||||
use remote_storage::TimeoutOrCancel;
|
||||
use remote_storage::MAX_KEYS_PER_DELETE;
|
||||
use utils::pausable_failpoint;
|
||||
use std::time::Duration;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use utils::backoff;
|
||||
use utils::pausable_failpoint;
|
||||
|
||||
use crate::metrics;
|
||||
|
||||
|
||||
@@ -822,7 +822,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
Some(self.schedule_barrier0(upload_queue, false))
|
||||
}
|
||||
};
|
||||
|
||||
@@ -861,7 +861,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
Some(self.schedule_barrier0(upload_queue, false))
|
||||
}
|
||||
};
|
||||
|
||||
@@ -899,7 +899,9 @@ impl RemoteTimelineClient {
|
||||
|
||||
match (current, uploaded) {
|
||||
(x, y) if wanted(x) && wanted(y) => None,
|
||||
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
|
||||
(x, y) if wanted(x) && !wanted(y) => {
|
||||
Some(self.schedule_barrier0(upload_queue, false))
|
||||
}
|
||||
// Usual case: !wanted(x) && !wanted(y)
|
||||
//
|
||||
// Unusual: !wanted(x) && wanted(y) which means we have two processes waiting to
|
||||
@@ -916,7 +918,7 @@ impl RemoteTimelineClient {
|
||||
.map(|x| x.with_reason(reason))
|
||||
.or_else(|| Some(index::GcBlocking::started_now_for(reason)));
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
Some(self.schedule_barrier0(upload_queue, false))
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -959,7 +961,9 @@ impl RemoteTimelineClient {
|
||||
|
||||
match (current, uploaded) {
|
||||
(x, y) if wanted(x) && wanted(y) => None,
|
||||
(x, y) if wanted(x) && !wanted(y) => Some(self.schedule_barrier0(upload_queue)),
|
||||
(x, y) if wanted(x) && !wanted(y) => {
|
||||
Some(self.schedule_barrier0(upload_queue, false))
|
||||
}
|
||||
(x, y) => {
|
||||
if !wanted(x) && wanted(y) {
|
||||
warn!(?reason, op="remove", "unexpected: two racing processes to enable and disable a gc blocking reason (remove)");
|
||||
@@ -970,7 +974,7 @@ impl RemoteTimelineClient {
|
||||
assert!(wanted(upload_queue.dirty.gc_blocking.as_ref()));
|
||||
// FIXME: bogus ?
|
||||
self.schedule_index_upload(upload_queue)?;
|
||||
Some(self.schedule_barrier0(upload_queue))
|
||||
Some(self.schedule_barrier0(upload_queue, false))
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -1226,7 +1230,7 @@ impl RemoteTimelineClient {
|
||||
let upload_queue = guard
|
||||
.initialized_mut()
|
||||
.map_err(WaitCompletionError::NotInitialized)?;
|
||||
self.schedule_barrier0(upload_queue)
|
||||
self.schedule_barrier0(upload_queue, false)
|
||||
};
|
||||
|
||||
Self::wait_completion0(receiver).await
|
||||
@@ -1242,19 +1246,20 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn schedule_barrier(self: &Arc<Self>) -> anyhow::Result<()> {
|
||||
pub(crate) fn schedule_barrier(self: &Arc<Self>, initial_barrier: bool) -> anyhow::Result<()> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
self.schedule_barrier0(upload_queue);
|
||||
self.schedule_barrier0(upload_queue, initial_barrier);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn schedule_barrier0(
|
||||
self: &Arc<Self>,
|
||||
upload_queue: &mut UploadQueueInitialized,
|
||||
initial_barrier: bool,
|
||||
) -> tokio::sync::watch::Receiver<()> {
|
||||
let (sender, receiver) = tokio::sync::watch::channel(());
|
||||
let barrier_op = UploadOp::Barrier(sender);
|
||||
let barrier_op = UploadOp::Barrier(sender, initial_barrier);
|
||||
|
||||
upload_queue.queued_operations.push_back(barrier_op);
|
||||
// Don't count this kind of operation!
|
||||
@@ -1723,8 +1728,8 @@ impl RemoteTimelineClient {
|
||||
// Can we run this task now?
|
||||
let can_run_now = match next_op {
|
||||
UploadOp::UploadLayer(..) => {
|
||||
// Can always be scheduled.
|
||||
true
|
||||
// Can always be scheduled except when there's a barrier
|
||||
upload_queue.num_inprogress_barriers == 0
|
||||
}
|
||||
UploadOp::UploadMetadata { .. } => {
|
||||
// These can only be performed after all the preceding operations
|
||||
@@ -1736,7 +1741,7 @@ impl RemoteTimelineClient {
|
||||
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
|
||||
}
|
||||
|
||||
UploadOp::Barrier(_) | UploadOp::Shutdown => {
|
||||
UploadOp::Barrier(_, _) | UploadOp::Shutdown => {
|
||||
upload_queue.inprogress_tasks.is_empty()
|
||||
}
|
||||
};
|
||||
@@ -1774,10 +1779,15 @@ impl RemoteTimelineClient {
|
||||
UploadOp::Delete(_) => {
|
||||
upload_queue.num_inprogress_deletions += 1;
|
||||
}
|
||||
UploadOp::Barrier(sender) => {
|
||||
UploadOp::Barrier(sender, false) => {
|
||||
// For other barriers, simply send back the ack.
|
||||
sender.send_replace(());
|
||||
continue;
|
||||
}
|
||||
UploadOp::Barrier(_, true) => {
|
||||
// For initial barrier, we need to wait for deletions.
|
||||
upload_queue.num_inprogress_barriers += 1;
|
||||
}
|
||||
UploadOp::Shutdown => unreachable!("shutdown is intentionally never popped off"),
|
||||
};
|
||||
|
||||
@@ -1911,19 +1921,25 @@ impl RemoteTimelineClient {
|
||||
}
|
||||
res
|
||||
}
|
||||
UploadOp::Delete(delete) => {
|
||||
pausable_failpoint!("before-delete-layer-pausable");
|
||||
self.deletion_queue_client
|
||||
.push_layers(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
delete.layers.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
}
|
||||
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
|
||||
UploadOp::Delete(delete) => self
|
||||
.deletion_queue_client
|
||||
.push_layers(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
delete.layers.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e)),
|
||||
// An initial barrier flushes the deletion queue. Usually, we don't wait until deletion
|
||||
// completes before returning from `deletion_queue.push_layers` due to generation
|
||||
// optimizations, but in case of a barrier scheduled, we need to wait.
|
||||
UploadOp::Barrier(_, _) => self
|
||||
.deletion_queue_client
|
||||
.flush_execute()
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e)),
|
||||
unexpected @ UploadOp::Shutdown => {
|
||||
// unreachable. Shutdown is intentionally never popped off the queue in
|
||||
// launch_queued_tasks
|
||||
warn!("unexpected {unexpected:?} operation in perform_upload_task");
|
||||
@@ -1933,6 +1949,10 @@ impl RemoteTimelineClient {
|
||||
|
||||
match upload_result {
|
||||
Ok(()) => {
|
||||
if let UploadOp::Barrier(sender, _) = &task.op {
|
||||
// Notify the caller that the barrier has been reached.
|
||||
sender.send(()).ok();
|
||||
}
|
||||
break;
|
||||
}
|
||||
Err(e) if TimeoutOrCancel::caused_by_cancel(&e) => {
|
||||
@@ -2036,7 +2056,11 @@ impl RemoteTimelineClient {
|
||||
upload_queue.num_inprogress_deletions -= 1;
|
||||
None
|
||||
}
|
||||
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
|
||||
UploadOp::Barrier(..) => {
|
||||
upload_queue.num_inprogress_barriers -= 1;
|
||||
None
|
||||
}
|
||||
UploadOp::Shutdown => unreachable!(),
|
||||
};
|
||||
|
||||
// Launch any queued tasks that were unblocked by this one.
|
||||
@@ -2162,6 +2186,7 @@ impl RemoteTimelineClient {
|
||||
num_inprogress_layer_uploads: 0,
|
||||
num_inprogress_metadata_uploads: 0,
|
||||
num_inprogress_deletions: 0,
|
||||
num_inprogress_barriers: 0,
|
||||
inprogress_tasks: HashMap::default(),
|
||||
queued_operations: VecDeque::default(),
|
||||
#[cfg(feature = "testing")]
|
||||
|
||||
@@ -2611,7 +2611,7 @@ impl Timeline {
|
||||
// (1) and (4) ONLY IF generation number gets bumped. There are some cases where
|
||||
// we load a tenant without bumping the generation number (i.e., detach ancestor
|
||||
// and timeline offload/un-offload). In those cases, we need to rely on the barrier.
|
||||
self.remote_client.schedule_barrier()?;
|
||||
self.remote_client.schedule_barrier(true)?;
|
||||
// Tenant::create_timeline will wait for these uploads to happen before returning, or
|
||||
// on retry.
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ pub(crate) struct UploadQueueInitialized {
|
||||
pub(crate) num_inprogress_layer_uploads: usize,
|
||||
pub(crate) num_inprogress_metadata_uploads: usize,
|
||||
pub(crate) num_inprogress_deletions: usize,
|
||||
pub(crate) num_inprogress_barriers: usize,
|
||||
|
||||
/// Tasks that are currently in-progress. In-progress means that a tokio Task
|
||||
/// has been launched for it. An in-progress task can be busy uploading, but it can
|
||||
@@ -176,6 +177,7 @@ impl UploadQueue {
|
||||
num_inprogress_layer_uploads: 0,
|
||||
num_inprogress_metadata_uploads: 0,
|
||||
num_inprogress_deletions: 0,
|
||||
num_inprogress_barriers: 0,
|
||||
inprogress_tasks: HashMap::new(),
|
||||
queued_operations: VecDeque::new(),
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -216,6 +218,7 @@ impl UploadQueue {
|
||||
num_inprogress_layer_uploads: 0,
|
||||
num_inprogress_metadata_uploads: 0,
|
||||
num_inprogress_deletions: 0,
|
||||
num_inprogress_barriers: 0,
|
||||
inprogress_tasks: HashMap::new(),
|
||||
queued_operations: VecDeque::new(),
|
||||
#[cfg(feature = "testing")]
|
||||
@@ -290,7 +293,10 @@ pub(crate) enum UploadOp {
|
||||
Delete(Delete),
|
||||
|
||||
/// Barrier. When the barrier operation is reached, the channel is closed.
|
||||
Barrier(tokio::sync::watch::Sender<()>),
|
||||
/// The boolean value indicates whether the barrier is an initial barrier scheduled
|
||||
/// at timeline load -- if yes, we will need to wait for all deletions to be completed
|
||||
/// before the next upload.
|
||||
Barrier(tokio::sync::watch::Sender<()>, bool),
|
||||
|
||||
/// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
|
||||
/// this is the same as a Barrier.
|
||||
@@ -317,7 +323,8 @@ impl std::fmt::Display for UploadOp {
|
||||
UploadOp::Delete(delete) => {
|
||||
write!(f, "Delete({} layers)", delete.layers.len())
|
||||
}
|
||||
UploadOp::Barrier(_) => write!(f, "Barrier"),
|
||||
UploadOp::Barrier(_, false) => write!(f, "Barrier"),
|
||||
UploadOp::Barrier(_, true) => write!(f, "Barrier (initial)"),
|
||||
UploadOp::Shutdown => write!(f, "Shutdown"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -174,10 +174,8 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str):
|
||||
generation_before_detach = get_generation_number()
|
||||
env.pageserver.tenant_detach(tenant_id)
|
||||
failpoint_deletion_queue = "deletion-queue-before-execute-pause"
|
||||
failpoint_upload_queue = "before-delete-layer-pausable"
|
||||
|
||||
ps_http.configure_failpoints((failpoint_deletion_queue, "pause"))
|
||||
ps_http.configure_failpoints((failpoint_upload_queue, "off"))
|
||||
|
||||
if attach_mode == "default_generation":
|
||||
env.pageserver.tenant_attach(tenant_id, tenant_conf.tenant_specific_overrides)
|
||||
@@ -270,3 +268,6 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder, attach_mode: str):
|
||||
final_stat = future_layer_path.stat()
|
||||
log.info(f"future layer path: {future_layer_path}")
|
||||
assert final_stat.st_mtime != pre_stat.st_mtime
|
||||
|
||||
# Ensure no weird errors in the end...
|
||||
wait_for_upload_queue_empty(ps_http, tenant_id, timeline_id)
|
||||
|
||||
Reference in New Issue
Block a user