mirror of https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00

Compare commits: release-78 ... vlad/remov (4 commits)

Commits: 96bd74697a, 8e77e10aa3, 83e90d3b65, d3fa0f6b9e
@@ -14,7 +14,6 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::backoff::exponential_backoff;
use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
@@ -212,11 +211,11 @@ impl Reconciler {
        lazy: bool,
    ) -> Result<(), ReconcileError> {
        if !node.is_available() && config.mode == LocationConfigMode::Detached {
            // Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
            // will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
            // what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
            tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
            self.observed.locations.remove(&node.get_id());
            // Skip if we cannot detach right now due to the node being unavailable. It will be
            // handled by the background reconciliation loop.
            tracing::info!(
                "Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
            );
            return Ok(());
        }

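The hunk above replaces the old behavior (drop the node from `observed` and proceed) with an early return that leaves observed state untouched and defers the detach to the background reconciliation loop. A minimal, self-contained sketch of the new control flow, using hypothetical simplified types rather than the real `Reconciler`:

// Hypothetical simplified types; the real code lives in the storage controller's Reconciler.
#[derive(PartialEq)]
enum LocationConfigMode {
    Detached,
    AttachedSingle,
}

struct Node {
    id: u64,
    available: bool,
}

impl Node {
    fn is_available(&self) -> bool {
        self.available
    }
}

fn reconcile_detach(node: &Node, mode: &LocationConfigMode) -> Result<(), String> {
    if !node.is_available() && *mode == LocationConfigMode::Detached {
        // Do not mutate observed state and do no I/O: the offline node will be
        // detached later by the background reconciliation loop.
        println!(
            "Node {} is unavailable during detach: it will be detached via background reconciliation",
            node.id
        );
        return Ok(());
    }
    // ...otherwise issue the location_config request to the pageserver...
    Ok(())
}

fn main() {
    let node = Node { id: 1, available: false };
    reconcile_detach(&node, &LocationConfigMode::Detached).unwrap();
}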
@@ -749,6 +748,8 @@ impl Reconciler {
        };

        if increment_generation {
            pausable_failpoint!("reconciler-pre-increment-generation");

            let generation = self
                .persistence
                .increment_generation(self.tenant_shard_id, node.get_id())
@@ -824,7 +825,7 @@ impl Reconciler {
            .handle_detach(self.tenant_shard_id, self.shard.stripe_size);
        }

        failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
        pausable_failpoint!("reconciler-epilogue");

        Ok(())
    }

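The hunks above replace the sleep-based `sleep-on-reconcile-epilogue` failpoint with pausable failpoints (`reconciler-epilogue`, `reconciler-pre-increment-generation`). As a rough sketch of the pause/off pattern the new test relies on, assuming `pausable_failpoint!` is layered over the `fail` crate (the real macro lives in Neon's `utils` crate and may differ):

// Sketch only: requires the `fail` crate with its "failpoints" feature enabled.
use std::{thread, time::Duration};

fn reconcile_epilogue() {
    // Stand-in for `pausable_failpoint!("reconciler-epilogue")` in the real code.
    fail::fail_point!("reconciler-epilogue");
    println!("epilogue ran");
}

fn main() {
    let scenario = fail::FailScenario::setup();

    // "pause": any thread reaching the failpoint blocks until the action changes,
    // mirroring configure_failpoints(("reconciler-epilogue", "pause")) in the test.
    fail::cfg("reconciler-epilogue", "pause").unwrap();
    let worker = thread::spawn(reconcile_epilogue);

    // Give the worker time to hit the failpoint, then release it ("off").
    thread::sleep(Duration::from_millis(100));
    fail::cfg("reconciler-epilogue", "off").unwrap();

    worker.join().unwrap();
    scenario.teardown();
}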
@@ -83,6 +83,7 @@ use utils::{
    generation::Generation,
    http::error::ApiError,
    id::{NodeId, TenantId, TimelineId},
    pausable_failpoint,
    sync::gate::Gate,
};

@@ -1024,10 +1025,12 @@ impl Service {
                )
                .await;

            pausable_failpoint!("heartbeat-pre-node-state-configure");

            // This is the code path for genuine availability transitions (i.e. node
            // goes unavailable and/or comes back online).
            let res = self
                .node_state_configure(node_id, Some(new_availability), None, &node_lock)
                .node_state_configure(node_id, Some(new_availability), None)
                .await;

            match res {
@@ -1314,7 +1317,7 @@ impl Service {
        if count_min != count_max {
            // Aborting the split in the database and dropping the child shards is sufficient: the reconciliation in
            // [`Self::startup_reconcile`] will implicitly drop the child shards on remote pageservers, or they'll
            // be dropped later in [`Self::node_activate_reconcile`] if it isn't available right now.
            // be dropped later via background reconciliation if it isn't available right now.
            tracing::info!("Aborting shard split {tenant_id} {count_min:?} -> {count_max:?}");
            let abort_status = persistence.abort_shard_split(tenant_id, count_max).await?;

@@ -1724,118 +1727,6 @@ impl Service {
            }
        }

    // When the availability state of a node transitions to active, we must do a full reconciliation
    // of LocationConfigs on that node. This is because while a node was offline:
    // - we might have proceeded through startup_reconcile without checking for extraneous LocationConfigs on this node
    // - aborting a tenant shard split might have left rogue child shards behind on this node.
    //
    // This function must complete _before_ setting a `Node` to Active: once it is set to Active, other
    // Reconcilers might communicate with the node, and these must not overlap with the work we do in
    // this function.
    //
    // The reconciliation logic in here is very similar to what [`Self::startup_reconcile`] does, but
    // written for a single node rather than as a batch job for all nodes.
    #[tracing::instrument(skip_all, fields(node_id=%node.get_id()))]
    async fn node_activate_reconcile(
        &self,
        mut node: Node,
        _lock: &TracingExclusiveGuard<NodeOperations>,
    ) -> Result<(), ApiError> {
        // This Node is a mutable local copy: we will set it active so that we can use its
        // API client to reconcile with the node. The Node in [`Self::nodes`] will get updated
        // later.
        node.set_availability(NodeAvailability::Active(PageserverUtilization::full()));

        let configs = match node
            .with_client_retries(
                |client| async move { client.list_location_config().await },
                &self.config.jwt_token,
                1,
                5,
                SHORT_RECONCILE_TIMEOUT,
                &self.cancel,
            )
            .await
        {
            None => {
                // We're shutting down (the Node's cancellation token can't have fired, because
                // we're the only scope that has a reference to it, and we didn't fire it).
                return Err(ApiError::ShuttingDown);
            }
            Some(Err(e)) => {
                // This node didn't succeed listing its locations: it may not proceed to active state
                // as it is apparently unavailable.
                return Err(ApiError::PreconditionFailed(
                    format!("Failed to query node location configs, cannot activate ({e})").into(),
                ));
            }
            Some(Ok(configs)) => configs,
        };
        tracing::info!("Loaded {} LocationConfigs", configs.tenant_shards.len());

        let mut cleanup = Vec::new();
        {
            let mut locked = self.inner.write().unwrap();

            for (tenant_shard_id, observed_loc) in configs.tenant_shards {
                let Some(tenant_shard) = locked.tenants.get_mut(&tenant_shard_id) else {
                    cleanup.push(tenant_shard_id);
                    continue;
                };
                tenant_shard
                    .observed
                    .locations
                    .insert(node.get_id(), ObservedStateLocation { conf: observed_loc });
            }
        }

        for tenant_shard_id in cleanup {
            tracing::info!("Detaching {tenant_shard_id}");
            match node
                .with_client_retries(
                    |client| async move {
                        let config = LocationConfig {
                            mode: LocationConfigMode::Detached,
                            generation: None,
                            secondary_conf: None,
                            shard_number: tenant_shard_id.shard_number.0,
                            shard_count: tenant_shard_id.shard_count.literal(),
                            shard_stripe_size: 0,
                            tenant_conf: models::TenantConfig::default(),
                        };
                        client
                            .location_config(tenant_shard_id, config, None, false)
                            .await
                    },
                    &self.config.jwt_token,
                    1,
                    5,
                    SHORT_RECONCILE_TIMEOUT,
                    &self.cancel,
                )
                .await
            {
                None => {
                    // We're shutting down (the Node's cancellation token can't have fired, because
                    // we're the only scope that has a reference to it, and we didn't fire it).
                    return Err(ApiError::ShuttingDown);
                }
                Some(Err(e)) => {
                    // Do not let the node proceed to Active state if it is not responsive to requests
                    // to detach. This could happen if e.g. a shutdown bug in the pageserver is preventing
                    // detach completing: we should not let this node back into the set of nodes considered
                    // okay for scheduling.
                    return Err(ApiError::Conflict(format!(
                        "Node {node} failed to detach {tenant_shard_id}: {e}"
                    )));
                }
                Some(Ok(_)) => {}
            };
        }

        Ok(())
    }

    pub(crate) async fn re_attach(
        &self,
        reattach_req: ReAttachRequest,
@@ -1923,9 +1814,8 @@ impl Service {
            }
        }

        // We consider a node Active once we have composed a re-attach response, but we
        // do not call [`Self::node_activate_reconcile`]: the handling of the re-attach response
        // implicitly synchronizes the LocationConfigs on the node.
        // We consider a node Active once we have composed a re-attach response:
        // the handling of the re-attach response implicitly synchronizes the LocationConfigs on the node.
        //
        // Setting a node active unblocks any Reconcilers that might write to the location config API,
        // but those requests will not be accepted by the node until it has finished processing
@@ -2492,6 +2382,7 @@ impl Service {
        // Persist updates
        // Ordering: write to the database before applying changes in-memory, so that
        // we will not appear to time-travel backwards on a restart.

        let mut schedule_context = ScheduleContext::default();
        for ShardUpdate {
            tenant_shard_id,
@@ -5507,7 +5398,6 @@ impl Service {
        node_id: NodeId,
        availability: Option<NodeAvailability>,
        scheduling: Option<NodeSchedulingPolicy>,
        node_lock: &TracingExclusiveGuard<NodeOperations>,
    ) -> Result<AvailabilityTransition, ApiError> {
        if let Some(scheduling) = scheduling {
            // Scheduling is a persistent part of Node: we must write updates to the database before
@@ -5515,32 +5405,15 @@ impl Service {
            self.persistence.update_node(node_id, scheduling).await?;
        }

        // If we're activating a node, then before setting it active we must reconcile any shard locations
        // on that node, in case it is out of sync, e.g. due to being unavailable during controller startup,
        // by calling [`Self::node_activate_reconcile`]
        //
        // The transition we calculate here remains valid later in the function because we hold the op lock on the node:
        // nothing else can mutate its availability while we run.
        let availability_transition = if let Some(input_availability) = availability.as_ref() {
            let (activate_node, availability_transition) = {
                let locked = self.inner.read().unwrap();
                let Some(node) = locked.nodes.get(&node_id) else {
                    return Err(ApiError::NotFound(
                        anyhow::anyhow!("Node {} not registered", node_id).into(),
                    ));
                };

                (
                    node.clone(),
                    node.get_availability_transition(input_availability),
                )
            let locked = self.inner.read().unwrap();
            let Some(node) = locked.nodes.get(&node_id) else {
                return Err(ApiError::NotFound(
                    anyhow::anyhow!("Node {} not registered", node_id).into(),
                ));
            };

            if matches!(availability_transition, AvailabilityTransition::ToActive) {
                self.node_activate_reconcile(activate_node, node_lock)
                    .await?;
            }
            availability_transition
            node.get_availability_transition(input_availability)
        } else {
            AvailabilityTransition::Unchanged
        };
@@ -5730,7 +5603,7 @@ impl Service {
            trace_exclusive_lock(&self.node_op_locks, node_id, NodeOperations::Configure).await;

        let transition = self
            .node_state_configure(node_id, availability, scheduling, &node_lock)
            .node_state_configure(node_id, availability, scheduling)
            .await?;
        self.handle_node_availability_transition(node_id, transition, &node_lock)
            .await

@@ -2521,6 +2521,7 @@ class NeonPageserver(PgProtocol, LogUtils):
        self,
        extra_env_vars: dict[str, str] | None = None,
        timeout_in_seconds: int | None = None,
        await_active: bool = True,
    ) -> Self:
        """
        Start the page server.
@@ -2547,8 +2548,10 @@ class NeonPageserver(PgProtocol, LogUtils):
        )
        self.running = True

        if self.env.storage_controller.running and self.env.storage_controller.node_registered(
            self.id
        if (
            await_active
            and self.env.storage_controller.running
            and self.env.storage_controller.node_registered(self.id)
        ):
            self.env.storage_controller.poll_node_status(
                self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1

@@ -17,6 +17,7 @@ from fixtures.compute_reconfigure import ComputeReconfigure
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
    DEFAULT_AZ_ID,
    LogCursor,
    NeonEnv,
    NeonEnvBuilder,
    NeonPageserver,
@@ -2406,7 +2407,14 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
    env.storage_controller.tenant_create(tid)

    env.storage_controller.reconcile_until_idle()
    env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))
    env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))

    def unpause_failpoint():
        time.sleep(2)
        env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))

    thread = threading.Thread(target=unpause_failpoint)
    thread.start()

    # Make a change to the tenant config to trigger a slow reconcile
    virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -2421,6 +2429,8 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
    observed_state = env.storage_controller.step_down()
    log.info(f"Storage controller stepped down with {observed_state=}")

    thread.join()

    # Validate that we waited for the slow reconcile to complete
    # and updated the observed state in the storcon before stepping down.
    node_id = str(env.pageserver.id)
@@ -3294,3 +3304,113 @@ def test_storage_controller_detached_stopped(

    # Confirm the detach happened
    assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []


@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_node_flap_detach_race(
    neon_env_builder: NeonEnvBuilder,
):
    """
    Reproducer for https://github.com/neondatabase/neon/issues/10253.

    When a node's availability flaps, the reconciliations spawned by the node
    going offline may race with the reconciliation done when the node comes
    back online.
    """
    neon_env_builder.num_pageservers = 4

    env = neon_env_builder.init_configs()
    env.start()

    tenant_id = TenantId.generate()
    env.storage_controller.tenant_create(
        tenant_id,
        shard_count=2,
    )
    env.storage_controller.reconcile_until_idle()

    stopped_nodes = [s["node_id"] for s in env.storage_controller.locate(tenant_id)]

    def has_hit_failpoint(failpoint: str, offset: LogCursor | None = None) -> LogCursor:
        res = env.storage_controller.log_contains(f"at failpoint {failpoint}", offset=offset)
        assert res
        return res[1]

    # Stop the nodes which host attached shards.
    # This will trigger reconciliations which pause before incrementing the generation,
    # and, more importantly, updating the `generation_pageserver` of the shards.
    env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "pause"))
    for node_id in stopped_nodes:
        env.get_pageserver(node_id).stop(immediate=True)

    def failure_handled() -> LogCursor:
        stop_offset = None

        for node_id in stopped_nodes:
            res = env.storage_controller.log_contains(f"node {node_id} going offline")
            assert res
            stop_offset = res[1]

        assert stop_offset
        return stop_offset

    offset = wait_until(failure_handled)

    # Now restart the nodes and make them pause before marking themselves as available
    # or running the activation reconciliation.
    env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "pause"))

    for node_id in stopped_nodes:
        env.get_pageserver(node_id).start(await_active=False)

    offset = wait_until(
        lambda: has_hit_failpoint("heartbeat-pre-node-state-configure", offset=offset)
    )

    # The nodes have restarted and are waiting to perform activation reconciliation.
    # Unpause the initial reconciliation triggered by the nodes going offline.
    # It will attempt to detach from the old location, but notice that the old location
    # is not yet available, and then stop before processing the results of the reconciliation.
    env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
    env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "off"))

    offset = wait_until(lambda: has_hit_failpoint("reconciler-epilogue", offset=offset))

    # Let the nodes perform activation reconciliation while still holding up processing the result
    # from the initial reconcile triggered by going offline.
    env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "off"))

    def activate_reconciliation_done():
        for node_id in stopped_nodes:
            assert env.storage_controller.log_contains(
                f"Node {node_id} transition to active", offset=offset
            )

    wait_until(activate_reconciliation_done)

    # Finally, allow the initial reconcile to finish up.
    env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))

    # Give things a chance to settle and validate that no stale locations exist
    env.storage_controller.reconcile_until_idle()

    def validate_locations():
        shard_locations = defaultdict(list)
        for ps in env.pageservers:
            locations = ps.http_client().tenant_list_locations()["tenant_shards"]
            for loc in locations:
                shard_locations[loc[0]].append(
                    {"generation": loc[1]["generation"], "mode": loc[1]["mode"], "node": ps.id}
                )

        log.info(f"Shard locations: {shard_locations}")

        attached_locations = {
            k: list(filter(lambda loc: loc["mode"] == "AttachedSingle", v))
            for k, v in shard_locations.items()
        }

        for shard, locs in attached_locations.items():
            assert len(locs) == 1, f"{shard} has {len(locs)} attached locations"

    wait_until(validate_locations, timeout=10)