Compare commits

...

6 Commits

Author SHA1 Message Date
Aleksandr Sarantsev
53bdbdf71f another things 2025-06-11 09:19:07 +04:00
Aleksandr Sarantsev
652c7203b5 Merge branch 'main' into ephemeralsad/graceful-draining 2025-06-09 09:54:13 +04:00
Aleksandr Sarantsev
1d3fd5bfc7 Better storcon API 2025-06-06 10:56:58 +04:00
Aleksandr Sarantsev
cc53ed4e43 Merge branch 'main' into ephemeralsad/graceful-draining 2025-06-05 18:25:29 +04:00
Aleksandr Sarantsev
61a3258e5d Add graceful flag for storcon 2025-06-02 17:07:30 +04:00
Aleksandr Sarantsev
24e627e44c Graceful draining 2025-05-30 17:54:26 +04:00
5 changed files with 201 additions and 18 deletions

View File

@@ -65,6 +65,10 @@ enum Command {
NodeDelete {
#[arg(long)]
node_id: NodeId,
/// Force flag to delete the node without draining
#[arg(long)]
force: bool,
},
/// Delete a tombstone of node from the storage controller.
NodeDeleteTombstone {
@@ -215,6 +219,8 @@ enum Command {
StartDrain {
#[arg(long)]
node_id: NodeId,
#[arg(long)]
drain_all: Option<bool>,
},
/// Cancel draining the specified pageserver and wait for `timeout`
/// for the operation to be canceled. May be retried.
@@ -903,7 +909,39 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)
.await?;
}
Command::NodeDelete { node_id } => {
Command::NodeDelete { node_id, force } => {
// If force is not set, we need to drain the node first
// This prevents the node from being deleted while there are still tenants on it
if !force {
match &storcon_client
.dispatch::<(), NodeDescribeResponse>(
Method::GET,
format!("control/v1/node/{node_id}?drain_all=true"),
None,
)
.await?
.scheduling
{
NodeSchedulingPolicy::Draining | NodeSchedulingPolicy::PauseForRestart => {
println!("Node {} is already draining", node_id);
}
_ => {
println!("Node {} is not draining, starting drain", node_id);
storcon_client
.dispatch::<(), ()>(
Method::PUT,
format!("control/v1/node/{node_id}/drain?graceful=true"),
None,
)
.await?;
}
}
// Wait for the node to be drained, printing the current state while waiting
watch_node_drain(&storcon_client, node_id).await?;
}
// Finally delete the node
storcon_client
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
.await?;
@@ -1151,13 +1189,14 @@ async fn main() -> anyhow::Result<()> {
failure
);
}
Command::StartDrain { node_id } => {
Command::StartDrain { node_id, drain_all } => {
let path = if drain_all == Some(true) {
format!("control/v1/node/{node_id}/drain?drain_all=true")
} else {
format!("control/v1/node/{node_id}/drain")
};
storcon_client
.dispatch::<(), ()>(
Method::PUT,
format!("control/v1/node/{node_id}/drain"),
None,
)
.dispatch::<(), ()>(Method::PUT, path, None)
.await?;
println!("Drain started for {node_id}");
}
@@ -1350,3 +1389,46 @@ async fn watch_tenant_shard(
}
Ok(())
}
/// Polls the storage controller until the given node stops reporting the
/// `Draining` scheduling policy, printing progress on every round.
///
/// Each round fetches the node description and then the node's shard list.
/// While the policy is `Draining`, the number of listed shards is printed and
/// the task sleeps for `WATCH_INTERVAL` before polling again.
///
/// # Errors
///
/// Propagates any error returned by the storage controller requests, and fails
/// if the node is no longer `Draining` while its shard list is still non-empty.
async fn watch_node_drain(storcon_client: &Client, node_id: NodeId) -> anyhow::Result<()> {
    loop {
        let description = storcon_client
            .dispatch::<(), NodeDescribeResponse>(
                Method::GET,
                format!("control/v1/node/{node_id}"),
                None,
            )
            .await?;

        let shard_listing = storcon_client
            .dispatch::<(), NodeShardResponse>(
                Method::GET,
                format!("control/v1/node/{node_id}/shards"),
                None,
            )
            .await?;
        let remaining = shard_listing.shards.len();

        // Still draining: report progress and poll again after a pause.
        if description.scheduling == NodeSchedulingPolicy::Draining {
            println!(
                "Node {} is draining, {} shards remaining",
                node_id, remaining
            );
            tokio::time::sleep(WATCH_INTERVAL).await;
            continue;
        }

        // The node left the `Draining` policy; leftover shards mean the drain
        // did not finish, so surface that as an error instead of succeeding.
        if remaining != 0 {
            anyhow::bail!(
                "Node {} is not draining, but has {} shards",
                node_id,
                remaining
            );
        }

        println!("Node {} is not draining", node_id);
        return Ok(());
    }
}

View File

@@ -1036,8 +1036,9 @@ async fn handle_node_drain(req: Request<Body>) -> Result<Response<Body>, ApiErro
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let drain_all: bool = parse_query_param(&req, "drain_all")?.unwrap_or(false);
state.service.start_node_drain(node_id).await?;
state.service.start_node_drain(node_id, drain_all).await?;
json_response(StatusCode::ACCEPTED, ())
}

View File

@@ -7611,6 +7611,7 @@ impl Service {
pub(crate) async fn start_node_drain(
self: &Arc<Self>,
node_id: NodeId,
drain_all: bool,
) -> Result<(), ApiError> {
let (ongoing_op, node_available, node_policy, schedulable_nodes_count) = {
let locked = self.inner.read().unwrap();
@@ -7684,7 +7685,7 @@ impl Service {
}
tracing::info!("Drain background operation starting");
let res = service.drain_node(node_id, cancel).await;
let res = service.drain_node(node_id, drain_all, cancel).await;
match res {
Ok(()) => {
tracing::info!("Drain background operation completed successfully");
@@ -8850,9 +8851,30 @@ impl Service {
}
}
/// Drain a node by moving the shards attached to it as primaries.
/// This is a long running operation and it should run as a separate Tokio task.
/// Move shard locations off `node_id`.
///
/// Primary attachments are always drained. Secondary locations are drained
/// afterwards only when `drain_all` is set:
/// - `drain_all == false`: used during node deployment, when the node is
///   expected to return to service soon.
/// - `drain_all == true`: used when the node is being removed permanently.
///
/// This is a long running operation and should be spawned as a separate Tokio
/// task. An error from the primary phase is returned immediately, in which case
/// the secondary phase is not started.
pub(crate) async fn drain_node(
    self: &Arc<Self>,
    node_id: NodeId,
    drain_all: bool,
    cancel: CancellationToken,
) -> Result<(), OperationError> {
    // Phase one: primaries. The token is cloned so phase two can reuse it.
    let primaries_outcome = self
        .drain_primary_attachments(node_id, cancel.clone())
        .await;
    primaries_outcome?;

    if !drain_all {
        return Ok(());
    }

    // Phase two: secondaries, only requested for a full drain.
    self.drain_secondary_attachments(node_id, cancel).await
}
/// Drain a node by moving the shards attached to it as primaries.
/// This is a long-running operation.
async fn drain_primary_attachments(
self: &Arc<Self>,
node_id: NodeId,
cancel: CancellationToken,
@@ -8868,10 +8890,11 @@ impl Service {
// to not stall the operation when a cold secondary is encountered.
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(30);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
.build();
let reconciler_config: ReconcilerConfig =
ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
.build();
let mut waiters = Vec::new();
@@ -9048,6 +9071,14 @@ impl Service {
Ok(())
}
/// Second phase of a full node drain, invoked by `drain_node` when `drain_all`
/// is set.
///
/// Currently a no-op: both arguments are ignored (hence the underscore
/// prefixes) and `Ok(())` is returned immediately.
// NOTE(review): presumably a placeholder until moving secondary locations is
// implemented — confirm before relying on `drain_all` to empty a node.
async fn drain_secondary_attachments(
self: &Arc<Self>,
_node_id: NodeId,
_cancel: CancellationToken,
) -> Result<(), OperationError> {
Ok(())
}
/// Create a node fill plan (pick secondaries to promote), based on:
/// 1. Shards which have a secondary on this node, and this node is in their home AZ, and are currently attached to a node
/// outside their home AZ, should be migrated back here.

View File

@@ -2062,11 +2062,16 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
def node_drain(self, node_id):
log.info(f"node_drain({node_id})")
def node_drain(self, node_id: int, drain_all: bool | None = None):
log.info(f"node_drain({node_id}, drain_all={drain_all})")
url = f"{self.api}/control/v1/node/{node_id}/drain"
if drain_all is not None:
url += f"?drain_all={str(drain_all).lower()}"
self.request(
"PUT",
f"{self.api}/control/v1/node/{node_id}/drain",
url,
headers=self.headers(TokenScope.INFRA),
)

View File

@@ -3093,6 +3093,70 @@ def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvB
wait_until(reconfigure_node_again)
def test_drain_with_secondary_locations(neon_env_builder: NeonEnvBuilder):
    """
    Drain a pageserver with `drain_all=True` and check that it holds no tenant
    shard locations afterwards.

    Tenants with 1, 2, 4 and 8 shards are created with one secondary each
    (`{"Attached": 1}`) across four pageservers before the first pageserver is
    drained.
    """
    neon_env_builder.num_pageservers = 4
    env = neon_env_builder.init_configs()
    env.start()

    def get_pageserver_tenant_shards(node_id):
        """Return the shard id and location mode of every location on a pageserver."""
        ps = env.get_pageserver(node_id)
        locations = ps.http_client().tenant_list_locations()["tenant_shards"]
        return [
            {
                "tenant_shard_id": TenantShardId.parse(loc[0]),
                "mode": loc[1]["mode"],
            }
            for loc in locations
        ]

    def log_pageservers_state():
        """Log every location seen on every pageserver, for debugging failures."""
        for ps in env.pageservers:
            for tenant_shard in get_pageserver_tenant_shards(ps.id):
                tenant_shard_id = tenant_shard["tenant_shard_id"]
                mode = tenant_shard["mode"]
                log.info(f"[PS {ps.id}] Seen {tenant_shard_id} in mode {mode}")

    # The returned tenant ids are not needed later, so they are not collected.
    for shard_count in [1, 2, 4, 8]:
        env.create_tenant(shard_count=shard_count, placement_policy='{"Attached": 1}')

    log.info("Pageservers before reconcilation:")
    log_pageservers_state()

    env.storage_controller.reconcile_until_idle()

    log.info("Pageservers before drain:")
    log_pageservers_state()

    node_id = env.pageservers[0].id

    env.storage_controller.warm_up_all_secondaries()
    env.storage_controller.retryable_node_operation(
        lambda ps_id: env.storage_controller.node_drain(ps_id, drain_all=True),
        node_id,
        max_attempts=3,
        backoff=2,
    )
    env.storage_controller.poll_node_status(
        node_id,
        PageserverAvailability.ACTIVE,
        PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
        max_attempts=6,
        backoff=5,
    )

    log.info("Pageservers after drain:")
    log_pageservers_state()

    # A full drain must leave neither attached nor secondary locations behind.
    shards = get_pageserver_tenant_shards(node_id)
    assert shards == []
def test_ps_unavailable_after_delete(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 3