storcon: notify compute if correct observed state was refreshed (#11342)
## Problem

Previously, if the observed state was refreshed and matched the intent, we wouldn't send a compute notification. This is unsafe: there is no guarantee that the location landed on the pageserver _and_ that a compute notification for it was delivered. See https://github.com/neondatabase/neon/issues/11291#issuecomment-2743205411 for one such example.

## Summary of changes

Add a reproducer, and notify the compute if the correct observed state required a refresh.

Closes https://github.com/neondatabase/neon/issues/11291
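For orientation before the diff (which touches the Rust reconciler and two Python regression tests): the sketch below is a minimal, self-contained illustration of the behaviour change, not the storage controller's real code. The `LocationConf`/`Reconciler` types, their fields, and the stand-in `compute_notify` are invented for the example; only the decision it encodes — notify the compute when the observed state matches the intent only after a refresh — mirrors the change in this commit.

```rust
// Minimal sketch of the behaviour change (simplified stand-in types, not the
// real storage controller code).

#[derive(Clone, PartialEq, Debug)]
struct LocationConf {
    attached_node: u64,
    generation: u32,
}

struct Reconciler {
    intent: LocationConf,
    observed: Option<LocationConf>, // None => uncertain, must be refreshed
    notifications_sent: u32,
}

impl Reconciler {
    /// Returns true if the observed state had to be refreshed from the
    /// pageserver, false if it was already known.
    fn maybe_refresh_observed(&mut self) -> bool {
        if self.observed.is_none() {
            // Stand-in for reading the location config back from the node.
            self.observed = Some(self.intent.clone());
            true
        } else {
            false
        }
    }

    fn compute_notify(&mut self) {
        // Stand-in for the notify-attach call to the control plane.
        self.notifications_sent += 1;
    }

    fn reconcile(&mut self) {
        let refreshed = self.maybe_refresh_observed();

        if self.observed.as_ref() == Some(&self.intent) {
            if refreshed {
                // Even though nothing needs to change on the pageserver, we
                // cannot assume a notification was ever delivered for this
                // location: send one now (the fix in this commit).
                self.compute_notify();
            } else {
                // Nothing to do: the state was already known to be correct.
            }
        } else {
            // Apply the intended location config, then notify compute.
            self.observed = Some(self.intent.clone());
            self.compute_notify();
        }
    }
}

fn main() {
    // Uncertain observed state (e.g. the destination pageserver restarted
    // mid-migration): the refresh finds the intended config already in
    // place, and the compute is still notified.
    let mut r = Reconciler {
        intent: LocationConf { attached_node: 2, generation: 5 },
        observed: None,
        notifications_sent: 0,
    };
    r.reconcile();
    assert_eq!(r.notifications_sent, 1);
    println!("notifications sent: {}", r.notifications_sent);
}
```

In the real reconciler, `maybe_refresh_observed` re-reads the location from the pageserver when its state is uncertain; the commit changes its return type to `Result<bool, ReconcileError>` so that `reconcile()` knows whether a refresh happened.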
@@ -686,6 +686,8 @@ impl Reconciler {
                 .await?,
         );
 
+        pausable_failpoint!("reconciler-live-migrate-post-generation-inc");
+
         let dest_conf = build_location_config(
             &self.shard,
             &self.config,
@@ -760,7 +762,9 @@ impl Reconciler {
         Ok(())
     }
 
-    async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
+    /// Returns true if the observed state of the attached location was refreshed
+    /// and false otherwise.
+    async fn maybe_refresh_observed(&mut self) -> Result<bool, ReconcileError> {
         // If the attached node has uncertain state, read it from the pageserver before proceeding: this
         // is important to avoid spurious generation increments.
         //
@@ -770,7 +774,7 @@ impl Reconciler {
 
         let Some(attached_node) = self.intent.attached.as_ref() else {
             // Nothing to do
-            return Ok(());
+            return Ok(false);
         };
 
         if matches!(
@@ -815,7 +819,7 @@ impl Reconciler {
             }
         }
 
-        Ok(())
+        Ok(true)
     }
 
     /// Reconciling a tenant makes API calls to pageservers until the observed state
@@ -831,7 +835,7 @@ impl Reconciler {
     /// state where it still requires later reconciliation.
     pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
         // Prepare: if we have uncertain `observed` state for our would-be attachment location, then refresh it
-        self.maybe_refresh_observed().await?;
+        let refreshed = self.maybe_refresh_observed().await?;
 
         // Special case: live migration
         self.maybe_live_migrate().await?;
@@ -855,8 +859,14 @@ impl Reconciler {
         );
         match self.observed.locations.get(&node.get_id()) {
             Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
-                // Nothing to do
-                tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
+                if refreshed {
+                    tracing::info!(
+                        node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
+                    self.compute_notify().await?;
+                } else {
+                    // Nothing to do
+                    tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
+                }
             }
             observed => {
                 // In all cases other than a matching observed configuration, we will
@@ -249,6 +249,7 @@ def test_forward_compatibility(
     top_output_dir: Path,
     pg_version: PgVersion,
     compatibility_snapshot_dir: Path,
+    compute_reconfigure_listener: ComputeReconfigure,
 ):
     """
     Test that the old binaries can read new data
@@ -257,6 +258,7 @@ def test_forward_compatibility(
         os.environ.get("ALLOW_FORWARD_COMPATIBILITY_BREAKAGE", "false").lower() == "true"
     )
 
+    neon_env_builder.control_plane_hooks_api = compute_reconfigure_listener.control_plane_hooks_api
     neon_env_builder.test_may_use_compatibility_snapshot_binaries = True
 
     try:
@@ -4176,3 +4176,121 @@ def test_storage_controller_graceful_migration(neon_env_builder: NeonEnvBuilder,
         )
     else:
         assert initial_ps.http_client().tenant_list_locations()["tenant_shards"] == []
+
+
+@run_only_on_default_postgres("this is like a 'unit test' against storcon db")
+def test_storage_controller_migrate_with_pageserver_restart(
+    neon_env_builder: NeonEnvBuilder, make_httpserver
+):
+    """
+    Test that live migrations which fail right after incrementing the generation
+    due to the destination going offline eventually send a compute notification
+    after the destination re-attaches.
+    """
+    neon_env_builder.num_pageservers = 2
+
+    neon_env_builder.storage_controller_config = {
+        # Disable transitions to offline
+        "max_offline": "600s",
+        "use_local_compute_notifications": False,
+    }
+
+    neon_env_builder.control_plane_hooks_api = (
+        f"http://{make_httpserver.host}:{make_httpserver.port}/"
+    )
+
+    notifications = []
+
+    def notify(request: Request):
+        log.info(f"Received notify-attach: {request}")
+        notifications.append(request.json)
+
+    make_httpserver.expect_request("/notify-attach", method="PUT").respond_with_handler(notify)
+
+    env = neon_env_builder.init_start()
+
+    env.storage_controller.allowed_errors.extend(
+        [
+            ".*Call to node.*management API failed.*",
+            ".*Call to node.*management API still failed.*",
+            ".*Reconcile error.*",
+            ".*request.*PUT.*migrate.*",
+        ]
+    )
+
+    env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}})
+    env.storage_controller.reconcile_until_idle()
+
+    initial_desc = env.storage_controller.tenant_describe(env.initial_tenant)["shards"][0]
+    log.info(f"{initial_desc=}")
+    primary = env.get_pageserver(initial_desc["node_attached"])
+    secondary = env.get_pageserver(initial_desc["node_secondary"][0])
+
+    # Pause the migration after incrementing the generation in the database
+    env.storage_controller.configure_failpoints(
+        ("reconciler-live-migrate-post-generation-inc", "pause")
+    )
+
+    tenant_shard_id = TenantShardId(env.initial_tenant, 0, 0)
+
+    try:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+            migrate_fut = executor.submit(
+                env.storage_controller.tenant_shard_migrate,
+                tenant_shard_id,
+                secondary.id,
+                config=StorageControllerMigrationConfig(prewarm=False, override_scheduler=True),
+            )
+
+            def has_hit_migration_failpoint():
+                expr = "at failpoint reconciler-live-migrate-post-generation-inc"
+                log.info(expr)
+                assert env.storage_controller.log_contains(expr)
+
+            wait_until(has_hit_migration_failpoint)
+
+            secondary.stop()
+
+            # Eventually migration completes
+            env.storage_controller.configure_failpoints(
+                ("reconciler-live-migrate-post-generation-inc", "off")
+            )
+
+            try:
+                migrate_fut.result()
+            except StorageControllerApiException as err:
+                log.info(f"Migration failed: {err}")
+    except:
+        env.storage_controller.configure_failpoints(
+            ("reconciler-live-migrate-post-generation-inc", "off")
+        )
+        raise
+
+    def process_migration_result():
+        dump = env.storage_controller.tenant_shard_dump()
+        observed = dump[0]["observed"]["locations"]
+
+        log.info(f"{observed=} primary={primary.id} secondary={secondary.id}")
+
+        assert observed[str(primary.id)]["conf"]["mode"] == "AttachedStale"
+        assert observed[str(secondary.id)]["conf"] is None
+
+    wait_until(process_migration_result)
+
+    # Start and wait for re-attach to be processed
+    secondary.start()
+    env.storage_controller.poll_node_status(
+        secondary.id,
+        desired_availability=PageserverAvailability.ACTIVE,
+        desired_scheduling_policy=None,
+        max_attempts=10,
+        backoff=1,
+    )
+
+    env.storage_controller.reconcile_until_idle()
+
+    assert notifications[-1] == {
+        "tenant_id": str(env.initial_tenant),
+        "stripe_size": None,
+        "shards": [{"node_id": int(secondary.id), "shard_number": 0}],
+        "preferred_az": DEFAULT_AZ_ID,
+    }