mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-01 20:40:37 +00:00
test: do graceful shutdown by default (#8655)
It should give us all possible allowed_errors more consistently. While getting the workflows to pass on https://github.com/neondatabase/neon/pull/8632 it was noticed that allowed_errors are rarely hit (1/4). This made me realize that we always do an immediate stop by default. Doing a graceful shutdown would had made the draining more apparent and likely we would not have needed the #8632 hotfix. Downside of doing this is that we will see more timeouts if tests are randomly leaving pause failpoints which fail the shutdown. The net outcome should however be positive, we could even detect too slow shutdowns caused by a bug or deadlock.
This commit is contained in:
@@ -41,6 +41,7 @@ use tokio::sync::watch;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use upload_queue::NotInitialized;
|
||||
use utils::backoff;
|
||||
use utils::circuit_breaker::CircuitBreaker;
|
||||
use utils::completion;
|
||||
@@ -601,6 +602,15 @@ impl From<PageReconstructError> for GcError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<NotInitialized> for GcError {
|
||||
fn from(value: NotInitialized) -> Self {
|
||||
match value {
|
||||
NotInitialized::Uninitialized => GcError::Remote(value.into()),
|
||||
NotInitialized::Stopped | NotInitialized::ShuttingDown => GcError::TimelineCancelled,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<timeline::layer_manager::Shutdown> for GcError {
|
||||
fn from(_: timeline::layer_manager::Shutdown) -> Self {
|
||||
GcError::TimelineCancelled
|
||||
|
||||
@@ -985,7 +985,10 @@ impl RemoteTimelineClient {
|
||||
///
|
||||
/// The files will be leaked in remote storage unless [`Self::schedule_deletion_of_unlinked`]
|
||||
/// is invoked on them.
|
||||
pub(crate) fn schedule_gc_update(self: &Arc<Self>, gc_layers: &[Layer]) -> anyhow::Result<()> {
|
||||
pub(crate) fn schedule_gc_update(
|
||||
self: &Arc<Self>,
|
||||
gc_layers: &[Layer],
|
||||
) -> Result<(), NotInitialized> {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
|
||||
@@ -369,9 +369,6 @@ impl ImageLayerInner {
|
||||
self.lsn
|
||||
}
|
||||
|
||||
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
|
||||
/// - inner has the success or transient failure
|
||||
/// - outer has the permanent failure
|
||||
pub(super) async fn load(
|
||||
path: &Utf8Path,
|
||||
lsn: Lsn,
|
||||
|
||||
@@ -1848,8 +1848,8 @@ impl ResidentLayer {
|
||||
/// Read all they keys in this layer which match the ShardIdentity, and write them all to
|
||||
/// the provided writer. Return the number of keys written.
|
||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all, fields(layer=%self))]
|
||||
pub(crate) async fn filter<'a>(
|
||||
&'a self,
|
||||
pub(crate) async fn filter(
|
||||
&self,
|
||||
shard_identity: &ShardIdentity,
|
||||
writer: &mut ImageLayerWriter,
|
||||
ctx: &RequestContext,
|
||||
|
||||
@@ -211,6 +211,11 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
} else {
|
||||
// Run compaction
|
||||
match tenant.compaction_iteration(&cancel, &ctx).await {
|
||||
Ok(has_pending_task) => {
|
||||
error_run_count = 0;
|
||||
// schedule the next compaction immediately in case there is a pending compaction task
|
||||
if has_pending_task { Duration::ZERO } else { period }
|
||||
}
|
||||
Err(e) => {
|
||||
let wait_duration = backoff::exponential_backoff_duration_seconds(
|
||||
error_run_count + 1,
|
||||
@@ -227,11 +232,6 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
);
|
||||
wait_duration
|
||||
}
|
||||
Ok(has_pending_task) => {
|
||||
error_run_count = 0;
|
||||
// schedule the next compaction immediately in case there is a pending compaction task
|
||||
if has_pending_task { Duration::from_secs(0) } else { period }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -265,7 +265,8 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
count_throttled,
|
||||
sum_throttled_usecs,
|
||||
allowed_rps=%format_args!("{allowed_rps:.0}"),
|
||||
"shard was throttled in the last n_seconds")
|
||||
"shard was throttled in the last n_seconds"
|
||||
);
|
||||
});
|
||||
|
||||
// Sleep
|
||||
|
||||
@@ -4421,22 +4421,24 @@ impl From<super::upload_queue::NotInitialized> for CompactionError {
|
||||
}
|
||||
}
|
||||
|
||||
impl CompactionError {
|
||||
/// We cannot do compaction because we could not download a layer that is input to the compaction.
|
||||
pub(crate) fn input_layer_download_failed(
|
||||
e: super::storage_layer::layer::DownloadError,
|
||||
) -> Self {
|
||||
impl From<super::storage_layer::layer::DownloadError> for CompactionError {
|
||||
fn from(e: super::storage_layer::layer::DownloadError) -> Self {
|
||||
match e {
|
||||
super::storage_layer::layer::DownloadError::TimelineShutdown |
|
||||
/* TODO DownloadCancelled correct here? */
|
||||
super::storage_layer::layer::DownloadError::DownloadCancelled => CompactionError::ShuttingDown,
|
||||
super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads |
|
||||
super::storage_layer::layer::DownloadError::DownloadRequired |
|
||||
super::storage_layer::layer::DownloadError::NotFile(_) |
|
||||
super::storage_layer::layer::DownloadError::DownloadFailed |
|
||||
super::storage_layer::layer::DownloadError::PreStatFailed(_)=>CompactionError::Other(anyhow::anyhow!(e)),
|
||||
super::storage_layer::layer::DownloadError::TimelineShutdown
|
||||
| super::storage_layer::layer::DownloadError::DownloadCancelled => {
|
||||
CompactionError::ShuttingDown
|
||||
}
|
||||
super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
|
||||
| super::storage_layer::layer::DownloadError::DownloadRequired
|
||||
| super::storage_layer::layer::DownloadError::NotFile(_)
|
||||
| super::storage_layer::layer::DownloadError::DownloadFailed
|
||||
| super::storage_layer::layer::DownloadError::PreStatFailed(_) => {
|
||||
CompactionError::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
#[cfg(test)]
|
||||
super::storage_layer::layer::DownloadError::Failpoint(_) => CompactionError::Other(anyhow::anyhow!(e)),
|
||||
super::storage_layer::layer::DownloadError::Failpoint(_) => {
|
||||
CompactionError::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4990,15 +4992,7 @@ impl Timeline {
|
||||
|
||||
result.layers_removed = gc_layers.len() as u64;
|
||||
|
||||
self.remote_client
|
||||
.schedule_gc_update(&gc_layers)
|
||||
.map_err(|e| {
|
||||
if self.cancel.is_cancelled() {
|
||||
GcError::TimelineCancelled
|
||||
} else {
|
||||
GcError::Remote(e)
|
||||
}
|
||||
})?;
|
||||
self.remote_client.schedule_gc_update(&gc_layers)?;
|
||||
|
||||
guard.open_mut()?.finish_gc_timeline(&gc_layers);
|
||||
|
||||
|
||||
@@ -489,10 +489,7 @@ impl Timeline {
|
||||
// - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
|
||||
// - GC, which at worst witnesses us "undelete" a layer that they just deleted.
|
||||
// - ingestion, which only inserts layers, therefore cannot collide with us.
|
||||
let resident = layer
|
||||
.download_and_keep_resident()
|
||||
.await
|
||||
.map_err(CompactionError::input_layer_download_failed)?;
|
||||
let resident = layer.download_and_keep_resident().await?;
|
||||
|
||||
let keys_written = resident
|
||||
.filter(&self.shard_identity, &mut image_layer_writer, ctx)
|
||||
@@ -693,23 +690,14 @@ impl Timeline {
|
||||
|
||||
let mut fully_compacted = true;
|
||||
|
||||
deltas_to_compact.push(
|
||||
first_level0_delta
|
||||
.download_and_keep_resident()
|
||||
.await
|
||||
.map_err(CompactionError::input_layer_download_failed)?,
|
||||
);
|
||||
deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
|
||||
for l in level0_deltas_iter {
|
||||
let lsn_range = &l.layer_desc().lsn_range;
|
||||
|
||||
if lsn_range.start != prev_lsn_end {
|
||||
break;
|
||||
}
|
||||
deltas_to_compact.push(
|
||||
l.download_and_keep_resident()
|
||||
.await
|
||||
.map_err(CompactionError::input_layer_download_failed)?,
|
||||
);
|
||||
deltas_to_compact.push(l.download_and_keep_resident().await?);
|
||||
deltas_to_compact_bytes += l.metadata().file_size;
|
||||
prev_lsn_end = lsn_range.end;
|
||||
|
||||
@@ -1137,6 +1125,10 @@ impl Timeline {
|
||||
|
||||
if !self.shard_identity.is_key_disposable(&key) {
|
||||
if writer.is_none() {
|
||||
if self.cancel.is_cancelled() {
|
||||
// to be somewhat responsive to cancellation, check for each new layer
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
}
|
||||
// Create writer if not initiaized yet
|
||||
writer = Some(
|
||||
DeltaLayerWriter::new(
|
||||
|
||||
@@ -335,6 +335,9 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
// FIXME: this cannot be made pausable_failpoint without fixing the
|
||||
// failpoint library; in tests, the added amount of debugging will cause us
|
||||
// to timeout the tests.
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
last_rec_lsn = lsn;
|
||||
|
||||
@@ -963,7 +963,7 @@ class NeonEnvBuilder:
|
||||
if self.env:
|
||||
log.info("Cleaning up all storage and compute nodes")
|
||||
self.env.stop(
|
||||
immediate=True,
|
||||
immediate=False,
|
||||
# if the test threw an exception, don't check for errors
|
||||
# as a failing assertion would cause the cleanup below to fail
|
||||
ps_assert_metric_no_errors=(exc_type is None),
|
||||
|
||||
@@ -20,7 +20,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
}
|
||||
)
|
||||
|
||||
pageserver_http.configure_failpoints(("flush-frozen-pausable", "sleep(10000)"))
|
||||
failpoint = "flush-frozen-pausable"
|
||||
|
||||
pageserver_http.configure_failpoints((failpoint, "sleep(10000)"))
|
||||
|
||||
endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
|
||||
branch0_cur = endpoint_branch0.connect().cursor()
|
||||
@@ -96,3 +98,5 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
|
||||
assert query_scalar(branch1_cur, "SELECT count(*) FROM foo") == 200000
|
||||
|
||||
assert query_scalar(branch2_cur, "SELECT count(*) FROM foo") == 300000
|
||||
|
||||
pageserver_http.configure_failpoints((failpoint, "off"))
|
||||
|
||||
@@ -1137,3 +1137,10 @@ def test_lazy_attach_activation(neon_env_builder: NeonEnvBuilder, activation_met
|
||||
delete_lazy_activating(lazy_tenant, env.pageserver, expect_attaching=True)
|
||||
else:
|
||||
raise RuntimeError(activation_method)
|
||||
|
||||
client.configure_failpoints(
|
||||
[
|
||||
("timeline-calculate-logical-size-pause", "off"),
|
||||
("walreceiver-after-ingest", "off"),
|
||||
]
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user