mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-08 22:12:56 +00:00
pageserver: clean up DeletionQueue push_layers_sync (#10701)
## Problem This is tech debt. While we introduced generations for tenants, some legacy situations without generations needed to delete things inline (async operation) instead of enqueing them (sync operation). ## Summary of changes - Remove the async code, replace calls with the sync variant, and assert that the generation is always set
This commit is contained in:
@@ -8,7 +8,6 @@ use std::time::Duration;
|
||||
|
||||
use crate::controller_upcall_client::ControlPlaneGenerationsApi;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::remote_timeline_client::remote_timeline_path;
|
||||
use crate::tenant::remote_timeline_client::LayerFileMetadata;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
@@ -463,45 +462,18 @@ impl DeletionQueueClient {
|
||||
///
|
||||
/// The `current_generation` is the generation of this pageserver's current attachment. The
|
||||
/// generations in `layers` are the generations in which those layers were written.
|
||||
pub(crate) async fn push_layers(
|
||||
pub(crate) fn push_layers(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
if current_generation.is_none() {
|
||||
debug!("Enqueuing deletions in legacy mode, skipping queue");
|
||||
// None generations are not valid for attached tenants: they must always be attached in
|
||||
// a known generation. None generations are still permitted for layers in the index because
|
||||
// they may be historical.
|
||||
assert!(!current_generation.is_none());
|
||||
|
||||
let mut layer_paths = Vec::new();
|
||||
for (layer, meta) in layers {
|
||||
layer_paths.push(remote_layer_path(
|
||||
&tenant_shard_id.tenant_id,
|
||||
&timeline_id,
|
||||
meta.shard,
|
||||
&layer,
|
||||
meta.generation,
|
||||
));
|
||||
}
|
||||
self.push_immediate(layer_paths).await?;
|
||||
return self.flush_immediate().await;
|
||||
}
|
||||
|
||||
self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
|
||||
}
|
||||
|
||||
/// When a Tenant has a generation, push_layers is always synchronous because
|
||||
/// the ListValidator channel is an unbounded channel.
|
||||
///
|
||||
/// This can be merged into push_layers when we remove the Generation-less mode
|
||||
/// support (`<https://github.com/neondatabase/neon/issues/5395>`)
|
||||
pub(crate) fn push_layers_sync(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
current_generation: Generation,
|
||||
layers: Vec<(LayerName, LayerFileMetadata)>,
|
||||
) -> Result<(), DeletionQueueError> {
|
||||
metrics::DELETION_QUEUE
|
||||
.keys_submitted
|
||||
.inc_by(layers.len() as u64);
|
||||
@@ -957,14 +929,12 @@ mod test {
|
||||
|
||||
// File should still be there after we push it to the queue (we haven't pushed enough to flush anything)
|
||||
info!("Pushing");
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
|
||||
)?;
|
||||
assert_remote_files(&[&remote_layer_file_name_1], &remote_timeline_path);
|
||||
|
||||
assert_local_files(&[], &deletion_prefix);
|
||||
@@ -1017,14 +987,12 @@ mod test {
|
||||
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
|
||||
|
||||
tracing::debug!("Pushing...");
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
stale_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
stale_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)?;
|
||||
|
||||
// We enqueued the operation in a stale generation: it should have failed validation
|
||||
tracing::debug!("Flushing...");
|
||||
@@ -1032,14 +1000,12 @@ mod test {
|
||||
assert_remote_files(&[&remote_layer_name], &remote_timeline_path);
|
||||
|
||||
tracing::debug!("Pushing...");
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
latest_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
latest_generation,
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)?;
|
||||
|
||||
// We enqueued the operation in a fresh generation: it should have passed validation
|
||||
tracing::debug!("Flushing...");
|
||||
@@ -1074,28 +1040,24 @@ mod test {
|
||||
// generation gets that treatment)
|
||||
let remote_layer_file_name_historical =
|
||||
ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation.previous(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation.previous(),
|
||||
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
|
||||
)?;
|
||||
|
||||
// Inject a deletion in the generation before generation_now: after restart,
|
||||
// this deletion should get executed, because we execute deletions in the
|
||||
// immediately previous generation on the same node.
|
||||
let remote_layer_file_name_previous =
|
||||
ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
|
||||
client
|
||||
.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
|
||||
)
|
||||
.await?;
|
||||
client.push_layers(
|
||||
tenant_shard_id,
|
||||
TIMELINE_ID,
|
||||
now_generation,
|
||||
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
|
||||
)?;
|
||||
|
||||
client.flush().await?;
|
||||
assert_remote_files(
|
||||
@@ -1139,6 +1101,7 @@ pub(crate) mod mock {
|
||||
use tracing::info;
|
||||
|
||||
use super::*;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
pub struct ConsumerState {
|
||||
|
||||
@@ -517,7 +517,7 @@ impl RemoteTimelineClient {
|
||||
if let Ok(queue) = queue_locked.initialized_mut() {
|
||||
let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
|
||||
for d in blocked_deletions {
|
||||
if let Err(e) = self.deletion_queue_client.push_layers_sync(
|
||||
if let Err(e) = self.deletion_queue_client.push_layers(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
self.generation,
|
||||
@@ -2151,7 +2151,6 @@ impl RemoteTimelineClient {
|
||||
self.generation,
|
||||
delete.layers.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -474,6 +474,14 @@ HISTORIC_DATA_SETS = [
|
||||
PgVersion.V16,
|
||||
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2024-07-18-pgv16.tar.zst",
|
||||
),
|
||||
# This dataset created on a pageserver running modern code at time of capture, but configured with no generation. This
|
||||
# is our regression test that we can load data written without generations in layer file names & indices
|
||||
HistoricDataSet(
|
||||
"2025-02-07-nogenerations",
|
||||
TenantId("e1411ca6562d6ff62419f693a5695d67"),
|
||||
PgVersion.V17,
|
||||
"https://neon-github-public-dev.s3.eu-central-1.amazonaws.com/compatibility-data-snapshots/2025-02-07-pgv17-nogenerations.tar.zst",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ of the pageserver are:
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from enum import StrEnum
|
||||
|
||||
@@ -29,7 +28,6 @@ from fixtures.pageserver.common_types import parse_layer_file_name
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import (
|
||||
assert_tenant_state,
|
||||
list_prefix,
|
||||
wait_for_last_record_lsn,
|
||||
wait_for_upload,
|
||||
)
|
||||
@@ -124,109 +122,6 @@ def assert_deletion_queue(ps_http, size_fn) -> None:
|
||||
assert size_fn(v) is True
|
||||
|
||||
|
||||
def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Validate behavior when a pageserver is run without generation support enabled,
|
||||
then started again after activating it:
|
||||
- Before upgrade, no objects should have generation suffixes
|
||||
- After upgrade, the bucket should contain a mixture.
|
||||
- In both cases, postgres I/O should work.
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.broker.start()
|
||||
for sk in env.safekeepers:
|
||||
sk.start()
|
||||
env.storage_controller.start()
|
||||
|
||||
# We will start a pageserver with no control_plane_api set, so it won't be able to self-register
|
||||
env.storage_controller.node_register(env.pageserver)
|
||||
|
||||
def remove_control_plane_api_field(config):
|
||||
return config.pop("control_plane_api")
|
||||
|
||||
control_plane_api = env.pageserver.edit_config_toml(remove_control_plane_api_field)
|
||||
env.pageserver.start()
|
||||
env.storage_controller.node_configure(env.pageserver.id, {"availability": "Active"})
|
||||
|
||||
env.create_tenant(
|
||||
tenant_id=env.initial_tenant, conf=TENANT_CONF, timeline_id=env.initial_timeline
|
||||
)
|
||||
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver)
|
||||
|
||||
def parse_generation_suffix(key):
|
||||
m = re.match(".+-([0-9a-zA-Z]{8})$", key)
|
||||
if m is None:
|
||||
return None
|
||||
else:
|
||||
log.info(f"match: {m}")
|
||||
log.info(f"group: {m.group(1)}")
|
||||
return int(m.group(1), 16)
|
||||
|
||||
assert neon_env_builder.pageserver_remote_storage is not None
|
||||
pre_upgrade_keys = list(
|
||||
[
|
||||
o["Key"]
|
||||
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
|
||||
"Contents"
|
||||
]
|
||||
]
|
||||
)
|
||||
for key in pre_upgrade_keys:
|
||||
assert parse_generation_suffix(key) is None
|
||||
|
||||
env.pageserver.stop()
|
||||
# Starting without the override that disabled control_plane_api
|
||||
env.pageserver.patch_config_toml_nonrecursive(
|
||||
{
|
||||
"control_plane_api": control_plane_api,
|
||||
}
|
||||
)
|
||||
env.pageserver.start()
|
||||
|
||||
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)
|
||||
|
||||
legacy_objects: list[str] = []
|
||||
suffixed_objects = []
|
||||
post_upgrade_keys = list(
|
||||
[
|
||||
o["Key"]
|
||||
for o in list_prefix(neon_env_builder.pageserver_remote_storage, delimiter="")[
|
||||
"Contents"
|
||||
]
|
||||
]
|
||||
)
|
||||
for key in post_upgrade_keys:
|
||||
log.info(f"post-upgrade key: {key}")
|
||||
if parse_generation_suffix(key) is not None:
|
||||
suffixed_objects.append(key)
|
||||
else:
|
||||
legacy_objects.append(key)
|
||||
|
||||
# Bucket now contains a mixture of suffixed and non-suffixed objects
|
||||
assert len(suffixed_objects) > 0
|
||||
assert len(legacy_objects) > 0
|
||||
|
||||
# Flush through deletions to get a clean state for scrub: we are implicitly validating
|
||||
# that our generations-enabled pageserver was able to do deletions of layers
|
||||
# from earlier which don't have a generation.
|
||||
env.pageserver.http_client().deletion_queue_flush(execute=True)
|
||||
|
||||
assert get_deletion_queue_unexpected_errors(env.pageserver.http_client()) == 0
|
||||
|
||||
# Having written a mixture of generation-aware and legacy index_part.json,
|
||||
# ensure the scrubber handles the situation as expected.
|
||||
healthy, metadata_summary = env.storage_scrubber.scan_metadata()
|
||||
assert metadata_summary["tenant_count"] == 1 # Scrubber should have seen our timeline
|
||||
assert metadata_summary["timeline_count"] == 1
|
||||
assert metadata_summary["timeline_shard_count"] == 1
|
||||
assert healthy
|
||||
|
||||
|
||||
def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
RemoteStorageKind.MOCK_S3,
|
||||
|
||||
Reference in New Issue
Block a user