mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
Compare commits
6 Commits
alexk/fix-
...
ephemerals
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
53bdbdf71f | ||
|
|
652c7203b5 | ||
|
|
1d3fd5bfc7 | ||
|
|
cc53ed4e43 | ||
|
|
61a3258e5d | ||
|
|
24e627e44c |
@@ -65,6 +65,10 @@ enum Command {
|
||||
NodeDelete {
|
||||
#[arg(long)]
|
||||
node_id: NodeId,
|
||||
|
||||
/// Force flag to delete the node without draining
|
||||
#[arg(long)]
|
||||
force: bool,
|
||||
},
|
||||
/// Delete a tombstone of node from the storage controller.
|
||||
NodeDeleteTombstone {
|
||||
@@ -215,6 +219,8 @@ enum Command {
|
||||
StartDrain {
|
||||
#[arg(long)]
|
||||
node_id: NodeId,
|
||||
#[arg(long)]
|
||||
drain_all: Option<bool>,
|
||||
},
|
||||
/// Cancel draining the specified pageserver and wait for `timeout`
|
||||
/// for the operation to be canceled. May be retried.
|
||||
@@ -903,7 +909,39 @@ async fn main() -> anyhow::Result<()> {
|
||||
.dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
|
||||
.await?;
|
||||
}
|
||||
Command::NodeDelete { node_id } => {
|
||||
Command::NodeDelete { node_id, force } => {
|
||||
// If force is not set, we need to drain the node first
|
||||
// This prevents the node from being deleted while there are still tenants on it
|
||||
if !force {
|
||||
match &storcon_client
|
||||
.dispatch::<(), NodeDescribeResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/node/{node_id}?drain_all=true"),
|
||||
None,
|
||||
)
|
||||
.await?
|
||||
.scheduling
|
||||
{
|
||||
NodeSchedulingPolicy::Draining | NodeSchedulingPolicy::PauseForRestart => {
|
||||
println!("Node {} is already draining", node_id);
|
||||
}
|
||||
_ => {
|
||||
println!("Node {} is not draining, starting drain", node_id);
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(
|
||||
Method::PUT,
|
||||
format!("control/v1/node/{node_id}/drain?graceful=true"),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for the node to be drained and printing the current state
|
||||
watch_node_drain(&storcon_client, node_id).await?;
|
||||
}
|
||||
|
||||
// Finally delete the node
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
|
||||
.await?;
|
||||
@@ -1151,13 +1189,14 @@ async fn main() -> anyhow::Result<()> {
|
||||
failure
|
||||
);
|
||||
}
|
||||
Command::StartDrain { node_id } => {
|
||||
Command::StartDrain { node_id, drain_all } => {
|
||||
let path = if drain_all == Some(true) {
|
||||
format!("control/v1/node/{node_id}/drain?drain_all=true")
|
||||
} else {
|
||||
format!("control/v1/node/{node_id}/drain")
|
||||
};
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(
|
||||
Method::PUT,
|
||||
format!("control/v1/node/{node_id}/drain"),
|
||||
None,
|
||||
)
|
||||
.dispatch::<(), ()>(Method::PUT, path, None)
|
||||
.await?;
|
||||
println!("Drain started for {node_id}");
|
||||
}
|
||||
@@ -1350,3 +1389,46 @@ async fn watch_tenant_shard(
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn watch_node_drain(storcon_client: &Client, node_id: NodeId) -> anyhow::Result<()> {
|
||||
loop {
|
||||
let node_desc = storcon_client
|
||||
.dispatch::<(), NodeDescribeResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/node/{node_id}"),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let shards_count = storcon_client
|
||||
.dispatch::<(), NodeShardResponse>(
|
||||
Method::GET,
|
||||
format!("control/v1/node/{node_id}/shards"),
|
||||
None,
|
||||
)
|
||||
.await?
|
||||
.shards
|
||||
.len();
|
||||
|
||||
// Print the state
|
||||
if node_desc.scheduling != NodeSchedulingPolicy::Draining {
|
||||
if shards_count != 0 {
|
||||
anyhow::bail!(
|
||||
"Node {} is not draining, but has {} shards",
|
||||
node_id,
|
||||
shards_count
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
println!(
|
||||
"Node {} is draining, {} shards remaining",
|
||||
node_id, shards_count
|
||||
);
|
||||
|
||||
tokio::time::sleep(WATCH_INTERVAL).await;
|
||||
}
|
||||
|
||||
println!("Node {} is not draining", node_id);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1036,8 +1036,9 @@ async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
|
||||
let state = get_state(&req);
|
||||
let node_id: NodeId = parse_request_param(&req, "node_id")?;
|
||||
let drain_all: bool = parse_query_param(&req, "drain_all")?.unwrap_or(false);
|
||||
|
||||
state.service.start_node_drain(node_id).await?;
|
||||
state.service.start_node_drain(node_id, drain_all).await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
}
|
||||
|
||||
@@ -7611,6 +7611,7 @@ impl Service {
|
||||
pub(crate) async fn start_node_drain(
|
||||
self: &Arc<Self>,
|
||||
node_id: NodeId,
|
||||
drain_all: bool,
|
||||
) -> Result<(), ApiError> {
|
||||
let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
@@ -7684,7 +7685,7 @@ impl Service {
|
||||
}
|
||||
|
||||
tracing::info!("Drain background operation starting");
|
||||
let res = service.drain_node(node_id, cancel).await;
|
||||
let res = service.drain_node(node_id, drain_all, cancel).await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
tracing::info!("Drain background operation completed successfully");
|
||||
@@ -8850,9 +8851,30 @@ impl Service {
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain a node by moving the shards attached to it as primaries.
|
||||
/// This is a long running operation and it should run as a separate Tokio task.
|
||||
/// Drain a node by moving shards that are attached to it, either as primaries or secondaries.
|
||||
/// When `drain_all` is false, only primary attachments are moved - this is used during node
|
||||
/// deployment when the node is expected to return to service soon. When `drain_all` is true,
|
||||
/// both primary and secondary attachments are moved - this is used when permanently removing
|
||||
/// a node.
|
||||
///
|
||||
/// This is a long running operation that should be spawned as a separate Tokio task.
|
||||
pub(crate) async fn drain_node(
|
||||
self: &Arc<Self>,
|
||||
node_id: NodeId,
|
||||
drain_all: bool,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
self.drain_primary_attachments(node_id, cancel.clone())
|
||||
.await?;
|
||||
if drain_all {
|
||||
self.drain_secondary_attachments(node_id, cancel).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Drain a node by moving the shards attached to it as primaries.
|
||||
/// This is a long running operation
|
||||
async fn drain_primary_attachments(
|
||||
self: &Arc<Self>,
|
||||
node_id: NodeId,
|
||||
cancel: CancellationToken,
|
||||
@@ -8868,10 +8890,11 @@ impl Service {
|
||||
// to not stall the operation when a cold secondary is encountered.
|
||||
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
|
||||
.build();
|
||||
let reconciler_config: ReconcilerConfig =
|
||||
ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
|
||||
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
|
||||
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
|
||||
.build();
|
||||
|
||||
let mut waiters = Vec::new();
|
||||
|
||||
@@ -9048,6 +9071,14 @@ impl Service {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn drain_secondary_attachments(
|
||||
self: &Arc<Self>,
|
||||
_node_id: NodeId,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<(), OperationError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a node fill plan (pick secondaries to promote), based on:
|
||||
/// 1. Shards which have a secondary on this node, and this node is in their home AZ, and are currently attached to a node
|
||||
/// outside their home AZ, should be migrated back here.
|
||||
|
||||
@@ -2062,11 +2062,16 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def node_drain(self, node_id):
|
||||
log.info(f"node_drain({node_id})")
|
||||
def node_drain(self, node_id: int, drain_all: bool | None = None):
|
||||
log.info(f"node_drain({node_id}, drain_all={drain_all})")
|
||||
|
||||
url = f"{self.api}/control/v1/node/{node_id}/drain"
|
||||
if drain_all is not None:
|
||||
url += f"?drain_all={str(drain_all).lower()}"
|
||||
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.api}/control/v1/node/{node_id}/drain",
|
||||
url,
|
||||
headers=self.headers(TokenScope.INFRA),
|
||||
)
|
||||
|
||||
|
||||
@@ -3093,6 +3093,70 @@ def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvB
|
||||
wait_until(reconfigure_node_again)
|
||||
|
||||
|
||||
def test_drain_with_secondary_locations(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 4
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
def get_pageserver_tenant_shards(node_id):
|
||||
ps = env.get_pageserver(node_id)
|
||||
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
|
||||
ret = []
|
||||
for loc in locations:
|
||||
ret.append(
|
||||
{
|
||||
"tenant_shard_id": TenantShardId.parse(loc[0]),
|
||||
"mode": loc[1]["mode"],
|
||||
}
|
||||
)
|
||||
return ret
|
||||
|
||||
def log_pageservers_state():
|
||||
for ps in env.pageservers:
|
||||
for tenant_shard in get_pageserver_tenant_shards(ps.id):
|
||||
tenant_shard_id = tenant_shard["tenant_shard_id"]
|
||||
mode = tenant_shard["mode"]
|
||||
log.info(f"[PS {ps.id}] Seen {tenant_shard_id} in mode {mode}")
|
||||
|
||||
tenants = {} # id → shard_count
|
||||
for shard_count in [1, 2, 4, 8]:
|
||||
id, _ = env.create_tenant(shard_count=shard_count, placement_policy='{"Attached": 1}')
|
||||
tenants[id] = shard_count
|
||||
|
||||
log.info("Pageservers before reconcilation:")
|
||||
log_pageservers_state()
|
||||
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
log.info("Pageservers before drain:")
|
||||
log_pageservers_state()
|
||||
|
||||
node_id = env.pageservers[0].id
|
||||
|
||||
env.storage_controller.warm_up_all_secondaries()
|
||||
env.storage_controller.retryable_node_operation(
|
||||
lambda ps_id: env.storage_controller.node_drain(ps_id, drain_all=True),
|
||||
node_id,
|
||||
max_attempts=3,
|
||||
backoff=2,
|
||||
)
|
||||
|
||||
env.storage_controller.poll_node_status(
|
||||
node_id,
|
||||
PageserverAvailability.ACTIVE,
|
||||
PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
|
||||
max_attempts=6,
|
||||
backoff=5,
|
||||
)
|
||||
|
||||
log.info("Pageservers after drain:")
|
||||
log_pageservers_state()
|
||||
|
||||
shards = get_pageserver_tenant_shards(node_id)
|
||||
assert shards == []
|
||||
|
||||
|
||||
def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 3
|
||||
|
||||
|
||||
Reference in New Issue
Block a user