diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs
index 0fa4f4fd0e..32882c201a 100644
--- a/storage_controller/src/http.rs
+++ b/storage_controller/src/http.rs
@@ -102,7 +102,7 @@ async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
let validate_req = json_request::<ValidateRequest>(&mut req).await?;
let state = get_state(&req);
- json_response(StatusCode::OK, state.service.validate(validate_req))
+ json_response(StatusCode::OK, state.service.validate(validate_req).await?)
}
/// Call into this before attaching a tenant to a pageserver, to acquire a generation number
diff --git a/storage_controller/src/persistence.rs b/storage_controller/src/persistence.rs
index d03eb87242..e801289752 100644
--- a/storage_controller/src/persistence.rs
+++ b/storage_controller/src/persistence.rs
@@ -8,6 +8,7 @@ use self::split_state::SplitState;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
+use itertools::Itertools;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
@@ -91,7 +92,8 @@ pub(crate) enum DatabaseOperation {
Detach,
ReAttach,
IncrementGeneration,
- PeekGenerations,
+ TenantGenerations,
+ ShardGenerations,
ListTenantShards,
InsertTenantShards,
UpdateTenantShard,
@@ -544,13 +546,13 @@ impl Persistence {
/// If the tenant doesn't exist, an empty vector is returned.
///
/// Output is sorted by shard number
- pub(crate) async fn peek_generations(
+ pub(crate) async fn tenant_generations(
&self,
filter_tenant_id: TenantId,
) -> Result<Vec<(TenantShardId, Option<Generation>, Option<NodeId>)>, DatabaseError> {
use crate::schema::tenant_shards::dsl::*;
let rows = self
- .with_measured_conn(DatabaseOperation::PeekGenerations, move |conn| {
+ .with_measured_conn(DatabaseOperation::TenantGenerations, move |conn| {
let result = tenant_shards
.filter(tenant_id.eq(filter_tenant_id.to_string()))
.select(TenantShardPersistence::as_select())
@@ -572,6 +574,64 @@ impl Persistence {
.collect())
}
+ /// Read the generation numbers of specific tenant shards
+ ///
+ /// Output is unsorted, and may not include values for all inputs if they are missing from the database.
+ pub(crate) async fn shard_generations<'a>(
+ &self,
+ mut tenant_shard_ids: impl Iterator<Item = &'a TenantShardId>,
+ ) -> Result<Vec<(TenantShardId, Option<Generation>)>, DatabaseError> {
+ let mut rows = Vec::with_capacity(tenant_shard_ids.size_hint().0);
+
+ // We will chunk our input to avoid composing arbitrarily long `IN` clauses. Typically we are
+ // called with a single-digit number of IDs, but in principle we could be called with tens
+ // of thousands (all the shards on one pageserver) from the generation validation API.
+ loop {
+ // A modest hardcoded chunk size to handle typical cases in a single query but never generate particularly
+ // large query strings.
+ let chunk_ids = tenant_shard_ids.by_ref().take(32);
+
+ // Compose a comma separated list of tuples for matching on (tenant_id, shard_number, shard_count)
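+ // For illustration only (hypothetical IDs), a chunk of two shards yields something like:
+ //   ('3a71f4e2c8d9506b1a2e3c4d5f607182', 0, 2),('3a71f4e2c8d9506b1a2e3c4d5f607182', 1, 2)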
+ let in_clause = chunk_ids
+ .map(|tsid| {
+ format!(
+ "('{}', {}, {})",
+ tsid.tenant_id, tsid.shard_number.0, tsid.shard_count.0
+ )
+ })
+ .join(",");
+
+ // We are done when our iterator gives us nothing to filter on
+ if in_clause.is_empty() {
+ break;
+ }
+
+ let chunk_rows = self
+ .with_measured_conn(DatabaseOperation::ShardGenerations, move |conn| {
+ // diesel doesn't support multi-column IN queries, so we compose raw SQL. No escaping is required because
+ // the inputs are strongly typed and cannot carry any user-supplied raw string content.
+ let result: Vec<TenantShardPersistence> = diesel::sql_query(
+ format!("SELECT * from tenant_shards where (tenant_id, shard_number, shard_count) in ({in_clause});").as_str()
+ ).load(conn)?;
+
+ Ok(result)
+ })
+ .await?;
+ rows.extend(chunk_rows.into_iter())
+ }
+
+ Ok(rows
+ .into_iter()
+ .map(|tsp| {
+ (
+ tsp.get_tenant_shard_id()
+ .expect("Bad tenant ID in database"),
+ tsp.generation.map(|g| Generation::new(g as u32)),
+ )
+ })
+ .collect())
+ }
+
#[allow(non_local_definitions)]
/// For use when updating a persistent property of a tenant, such as its config or placement_policy.
///
@@ -983,7 +1043,9 @@ impl Persistence {
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
-#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
+#[derive(
+ QueryableByName, Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq,
+)]
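+ // QueryableByName is required so that rows returned by the raw `sql_query` in
+ // `shard_generations` can be loaded into this struct.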
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {
#[serde(default)]
diff --git a/storage_controller/src/reconciler.rs b/storage_controller/src/reconciler.rs
index 102a3124d2..83b7b2b4f2 100644
--- a/storage_controller/src/reconciler.rs
+++ b/storage_controller/src/reconciler.rs
@@ -17,6 +17,7 @@ use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
+use utils::pausable_failpoint;
use utils::sync::gate::GateGuard;
use crate::compute_hook::{ComputeHook, NotifyError};
@@ -593,6 +594,8 @@ impl Reconciler {
notify_attempts += 1;
}
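+ // Pause point used in tests to hold a live migration here: after the compute notification,
+ // with the new location already attached in a new generation, but before the origin is
+ // downgraded to secondary.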
+ pausable_failpoint!("reconciler-live-migrate-post-notify");
+
// Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
// this location will be deleted in the general case reconciliation that runs after this.
let origin_secondary_conf = build_location_config(
diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs
index 4ccc5c951c..90334d10a7 100644
--- a/storage_controller/src/service.rs
+++ b/storage_controller/src/service.rs
@@ -1854,37 +1854,74 @@ impl Service {
Ok(response)
}
- pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
- let locked = self.inner.read().unwrap();
+ pub(crate) async fn validate(
+ &self,
+ validate_req: ValidateRequest,
+ ) -> Result<ValidateResponse, ApiError> {
+ // Fast in-memory check: we may reject validation on anything that doesn't match our
+ // in-memory generation for a shard
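+ // (the read lock is scoped to this block so that it is not held across the database call below)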
+ let in_memory_result = {
+ let mut in_memory_result = Vec::new();
+ let locked = self.inner.read().unwrap();
+ for req_tenant in validate_req.tenants {
+ if let Some(tenant_shard) = locked.tenants.get(&req_tenant.id) {
+ let valid = tenant_shard.generation == Some(Generation::new(req_tenant.gen));
+ tracing::info!(
+ "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
+ req_tenant.id,
+ req_tenant.gen,
+ tenant_shard.generation
+ );
+
+ in_memory_result.push((req_tenant.id, Generation::new(req_tenant.gen), valid));
+ } else {
+ // This is legal: for example during a shard split the pageserver may still
+ // have deletions in its queue from the old pre-split shard, or after deletion
+ // of a tenant that was busy with compaction/gc while being deleted.
+ tracing::info!(
+ "Refusing deletion validation for missing shard {}",
+ req_tenant.id
+ );
+ }
+ }
+
+ in_memory_result
+ };
+
+ // Database calls to confirm validity for anything that passed the in-memory check. We must do this
+ // in case of controller split-brain, where some other controller process might have incremented the generation.
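+ // Shards that already failed the in-memory check are excluded from the lookup; a rejection
+ // never needs to be confirmed against the database.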
+ let db_generations = self
+ .persistence
+ .shard_generations(in_memory_result.iter().filter_map(|i| {
+ if i.2 {
+ Some(&i.0)
+ } else {
+ None
+ }
+ }))
+ .await?;
+ let db_generations = db_generations.into_iter().collect::<HashMap<_, _>>();
let mut response = ValidateResponse {
tenants: Vec::new(),
};
-
- for req_tenant in validate_req.tenants {
- if let Some(tenant_shard) = locked.tenants.get(&req_tenant.id) {
- let valid = tenant_shard.generation == Some(Generation::new(req_tenant.gen));
- tracing::info!(
- "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
- req_tenant.id,
- req_tenant.gen,
- tenant_shard.generation
- );
- response.tenants.push(ValidateResponseTenant {
- id: req_tenant.id,
- valid,
- });
+ for (tenant_shard_id, validate_generation, valid) in in_memory_result.into_iter() {
+ let valid = if valid {
+ let db_generation = db_generations.get(&tenant_shard_id);
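+ // A shard missing from the database, a NULL stored generation, or a mismatched value all
+ // evaluate to false here: only an exact match confirms validity.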
+ db_generation == Some(&Some(validate_generation))
} else {
- // After tenant deletion, we may approve any validation. This avoids
- // spurious warnings on the pageserver if it has pending LSN updates
- // at the point a deletion happens.
- response.tenants.push(ValidateResponseTenant {
- id: req_tenant.id,
- valid: true,
- });
- }
+ // If in-memory state says it's invalid, trust that. It's always safe to fail a validation, at worst
+ // this prevents a pageserver from cleaning up an object in S3.
+ false
+ };
+
+ response.tenants.push(ValidateResponseTenant {
+ id: tenant_shard_id,
+ valid,
+ })
}
- response
+
+ Ok(response)
}
pub(crate) async fn tenant_create(
@@ -3179,7 +3216,7 @@ impl Service {
// run concurrently with reconciliations, and it is not guaranteed that the node we find here
// will still be the latest when we're done: we will check generations again at the end of
// this function to handle that.
- let generations = self.persistence.peek_generations(tenant_id).await?;
+ let generations = self.persistence.tenant_generations(tenant_id).await?;
if generations
.iter()
@@ -3236,7 +3273,7 @@ impl Service {
// Post-check: are all the generations of all the shards the same as they were initially? This proves that
// our remote operation executed on the latest generation and is therefore persistent.
{
- let latest_generations = self.persistence.peek_generations(tenant_id).await?;
+ let latest_generations = self.persistence.tenant_generations(tenant_id).await?;
if latest_generations
.into_iter()
.map(
diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py
index 13f5ec1b4f..8da42294b0 100644
--- a/test_runner/regress/test_storage_controller.py
+++ b/test_runner/regress/test_storage_controller.py
@@ -2332,6 +2332,122 @@ def test_storage_controller_timeline_crud_race(neon_env_builder: NeonEnvBuilder)
).timeline_create(PgVersion.NOT_SET, tenant_id, create_timeline_id)
+def test_storage_controller_validate_during_migration(neon_env_builder: NeonEnvBuilder):
+ """
+ A correctness edge case: while we are live migrating and a shard's generation is
+ visible to the Reconciler but not to the central Service, the generation validation
+ API should still prevent stale generations from doing deletions.
+ """
+ neon_env_builder.num_pageservers = 2
+ neon_env_builder.enable_pageserver_remote_storage(s3_storage())
+ env = neon_env_builder.init_configs()
+ env.start()
+
+ TENANT_CONF = {
+ # small checkpointing and compaction targets to ensure we generate many upload operations
+ "checkpoint_distance": 128 * 1024,
+ "compaction_threshold": 1,
+ "compaction_target_size": 128 * 1024,
+ # disable background compaction and GC. We invoke it manually when we want it to happen.
+ "gc_period": "0s",
+ "compaction_period": "0s",
+ }
+
+ tenant_id = env.initial_tenant
+ timeline_id = env.initial_timeline
+ env.neon_cli.create_tenant(tenant_id, timeline_id)
+ env.storage_controller.pageserver_api().set_tenant_config(tenant_id, TENANT_CONF)
+
+ # Write enough data that a compaction would do some work (deleting some L0s)
+ workload = Workload(env, tenant_id, timeline_id)
+ workload.init()
+ workload.write_rows(64)
+ for _i in range(0, 2):
+ workload.churn_rows(64, upload=False)
+
+ # Upload but don't compact
+ origin_pageserver = env.get_tenant_pageserver(tenant_id)
+ dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]
+ origin_pageserver.http_client().timeline_checkpoint(
+ tenant_id, timeline_id, wait_until_uploaded=True, compact=False
+ )
+
+ # Start a compaction that will pause on a failpoint.
+ compaction_failpoint = "before-upload-index-pausable"
+ origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "pause"))
+
+ # This failpoint can also cause migration code to time out trying to politely flush
+ # during migrations
+ origin_pageserver.allowed_errors.append(".*Timed out waiting for flush to remote storage.*")
+
+ try:
+ with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+ compact_fut = executor.submit(
+ origin_pageserver.http_client().timeline_compact,
+ tenant_id,
+ timeline_id,
+ wait_until_uploaded=True,
+ )
+
+ # Let the compaction start and then get stuck uploading an index: when we live migrate, the new generation's
+ # index will be initialized from the pre-compaction index, referencing layers that the compaction will try to delete
+ def has_hit_compaction_failpoint():
+ assert origin_pageserver.log_contains(f"at failpoint {compaction_failpoint}")
+
+ wait_until(10, 1, has_hit_compaction_failpoint)
+
+ # While the compaction is running, start a live migration which will pause long enough for the compaction to sleep,
+ # after incrementing generation and attaching the new location
+ migration_failpoint = "reconciler-live-migrate-post-notify"
+ env.storage_controller.configure_failpoints((migration_failpoint, "pause"))
+ migrate_fut = executor.submit(
+ env.storage_controller.tenant_shard_migrate,
+ TenantShardId(tenant_id, 0, 0),
+ dest_ps_id,
+ )
+
+ def has_hit_migration_failpoint():
+ assert env.storage_controller.log_contains(f"at failpoint {migration_failpoint}")
+
+ # Long wait because the migration will have to time out during transition to AttachedStale
+ # before it reaches this point. The timeout is because the AttachedStale transition includes
+ # a flush of remote storage, and if the compaction already enqueued an index upload this cannot
+ # make progress.
+ wait_until(60, 1, has_hit_migration_failpoint)
+
+ # Let the origin pageserver finish its compaction before the migration completes: all the writes it wanted to do happen in its own (stale) generation
+ origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "off"))
+ compact_fut.result()
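+ # This flush pushes the origin's pending deletions through generation validation: the
+ # controller is expected to refuse them because the origin's generation is now stale.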
+ origin_pageserver.http_client().deletion_queue_flush(execute=True)
+
+ # Eventually migration completes
+ env.storage_controller.configure_failpoints((migration_failpoint, "off"))
+ migrate_fut.result()
+ except:
+ # Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
+ env.storage_controller.configure_failpoints((migration_failpoint, "off"))
+ origin_pageserver.http_client().configure_failpoints((compaction_failpoint, "off"))
+ raise
+
+ # Ensure the destination of the migration writes an index, so that if it has corrupt state,
+ # that state is visible to the scrubber.
+ workload.write_rows(1, upload=False)
+ env.get_pageserver(dest_ps_id).http_client().timeline_checkpoint(
+ tenant_id, timeline_id, wait_until_uploaded=True, compact=False
+ )
+
+ # The destination of the live migration would now have a corrupt index (referencing deleted L0s) if
+ # the controller had not properly applied validation rules.
+ healthy, _summary = env.storage_scrubber.scan_metadata()
+ try:
+ log.info(f"scrubbed, healthy={healthy}")
+ assert healthy
+ except:
+ # On failures, we want to report them FAIL during the test, not as ERROR during teardown
+ neon_env_builder.enable_scrub_on_exit = False
+ raise
+
+
@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
def test_safekeeper_deployment_time_update(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_configs()
diff --git a/test_runner/regress/test_storage_scrubber.py b/test_runner/regress/test_storage_scrubber.py
index 292a9a1010..848e214c5e 100644
--- a/test_runner/regress/test_storage_scrubber.py
+++ b/test_runner/regress/test_storage_scrubber.py
@@ -217,6 +217,13 @@ def test_scrubber_physical_gc_ancestors(
workload.init()
workload.write_rows(100)
+ # Issue a deletion queue flush so that the parent shard can't leave behind layers
+ # that will look like unexpected garbage to the scrubber
+ for pre_split_shard in env.storage_controller.locate(tenant_id):
+ env.get_pageserver(pre_split_shard["node_id"]).http_client().deletion_queue_flush(
+ execute=True
+ )
+
new_shard_count = 4
assert shard_count is None or new_shard_count > shard_count
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
@@ -321,6 +328,10 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
workload.write_rows(100, upload=False)
workload.stop()
+ # Issue a deletion queue flush so that the parent shard can't leave behind layers
+ # that will look like unexpected garbage to the scrubber
+ env.get_tenant_pageserver(tenant_id).http_client().deletion_queue_flush(execute=True)
+
new_shard_count = 4
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
for shard in shards: