storcon: skip draining shard if its secondary is lagging too much (#8644)

## Problem

Migrations of tenant shards with cold secondaries are holding up drains during production deployments.

## Summary of changes

If a secondary location is lagging by more than 256MiB (configurable, but that's the default), then skip cutting the shard over to the secondary as part of the node drain.
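The gist of the change, as a minimal sketch (the helper and its names are illustrative only; the real logic lives in `Service::drain_node` in the diff below):

```rust
// Sketch of the drain heuristic added by this commit. `skip_shard` is a
// hypothetical stand-in, not a function from the real code.
const DEFAULT_MAX_SECONDARY_LAG_BYTES: u64 = 256 * 1024 * 1024; // 256 MiB

fn skip_shard(secondary_lag: Option<u64>, max_lag_config: Option<u64>) -> bool {
    let max_lag = max_lag_config.unwrap_or(DEFAULT_MAX_SECONDARY_LAG_BYTES);
    match secondary_lag {
        // Secondary is warm enough: cut the shard over to it.
        Some(lag) if lag <= max_lag => false,
        // Lagging too far behind, or lag unknown: skip this shard so the
        // drain is not held up by a cold secondary.
        _ => true,
    }
}
```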
@@ -158,6 +158,8 @@ pub struct NeonStorageControllerConf {
 
     /// Threshold for auto-splitting a tenant into shards
     pub split_threshold: Option<u64>,
+
+    pub max_secondary_lag_bytes: Option<u64>,
 }
 
 impl NeonStorageControllerConf {
@@ -173,6 +175,7 @@ impl Default for NeonStorageControllerConf {
             max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL,
             max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL,
             split_threshold: None,
+            max_secondary_lag_bytes: None,
         }
     }
 }
@@ -383,6 +383,10 @@ impl StorageController {
             args.push(format!("--split-threshold={split_threshold}"))
         }
 
+        if let Some(lag) = self.config.max_secondary_lag_bytes.as_ref() {
+            args.push(format!("--max-secondary-lag-bytes={lag}"))
+        }
+
         args.push(format!(
             "--neon-local-repo-dir={}",
             self.env.base_data_dir.display()
@@ -55,7 +55,7 @@ use tokio_util::sync::CancellationToken;
 use tracing::{info_span, instrument, warn, Instrument};
 use utils::{
     backoff, completion::Barrier, crashsafe::path_with_suffix_extension, failpoint_support, fs_ext,
-    id::TimelineId, serde_system_time,
+    id::TimelineId, pausable_failpoint, serde_system_time,
 };
 
 use super::{
@@ -1146,12 +1146,14 @@ impl<'a> TenantDownloader<'a> {
         layer: HeatMapLayer,
         ctx: &RequestContext,
     ) -> Result<Option<HeatMapLayer>, UpdateError> {
-        // Failpoint for simulating slow remote storage
+        // Failpoints for simulating slow remote storage
         failpoint_support::sleep_millis_async!(
             "secondary-layer-download-sleep",
             &self.secondary_state.cancel
         );
 
+        pausable_failpoint!("secondary-layer-download-pausable");
+
         let local_path = local_layer_path(
             self.conf,
             tenant_shard_id,
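For context, a sketch of how a pausable failpoint is typically driven, assuming the `fail` crate API that neon's failpoint macros build on (the Python test further below does the equivalent through the pageserver's failpoints HTTP endpoint):

```rust
// Pause all secondary layer downloads at the new failpoint, then resume them.
// "pause" and "off" are standard fail-crate actions.
fail::cfg("secondary-layer-download-pausable", "pause").unwrap();
// ... downloads now block here, letting the secondary accumulate lag ...
fail::cfg("secondary-layer-download-pausable", "off").unwrap();
```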
storage_controller/src/drain_utils.rs (new file, 225 lines)
@@ -0,0 +1,225 @@
use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
};

use pageserver_api::controller_api::NodeSchedulingPolicy;
use utils::{id::NodeId, shard::TenantShardId};

use crate::{
    background_node_operations::OperationError, node::Node, scheduler::Scheduler,
    tenant_shard::TenantShard,
};

pub(crate) struct TenantShardIterator<F> {
    tenants_accessor: F,
    inspected_all_shards: bool,
    last_inspected_shard: Option<TenantShardId>,
}

/// A simple iterator which can be used in tandem with [`crate::service::Service`]
/// to iterate over all known tenant shard ids without holding the lock on the
/// service state at all times.
impl<F> TenantShardIterator<F>
where
    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
{
    pub(crate) fn new(tenants_accessor: F) -> Self {
        Self {
            tenants_accessor,
            inspected_all_shards: false,
            last_inspected_shard: None,
        }
    }

    /// Returns the next tenant shard id if one exists
    pub(crate) fn next(&mut self) -> Option<TenantShardId> {
        if self.inspected_all_shards {
            return None;
        }

        match (self.tenants_accessor)(self.last_inspected_shard) {
            Some(tid) => {
                self.last_inspected_shard = Some(tid);
                Some(tid)
            }
            None => {
                self.inspected_all_shards = true;
                None
            }
        }
    }

    /// Returns true when the end of the iterator is reached and false otherwise
    pub(crate) fn finished(&self) -> bool {
        self.inspected_all_shards
    }
}
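A usage sketch of the iterator (with hypothetical `all_shard_ids_sorted` and `process` stand-ins; the service itself passes a closure that briefly takes the state lock on every call, as `drain_node` does below):

```rust
// The accessor returns the first id strictly after `last`, or the first id
// overall when `last` is None; returning None ends the iteration.
// `all_shard_ids_sorted` and `process` are stand-ins for this sketch.
let ids: Vec<TenantShardId> = all_shard_ids_sorted();
let mut iter = TenantShardIterator::new(move |last| match last {
    Some(last) => ids.iter().copied().find(|tid| *tid > last),
    None => ids.first().copied(),
});

while let Some(tid) = iter.next() {
    // No lock is held between calls to `next`, so the set of shards may
    // change mid-iteration; callers must tolerate that.
    process(tid);
}
```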
/// Check that the state of the node being drained is as expected:
/// node is present in memory and scheduling policy is set to [`NodeSchedulingPolicy::Draining`]
pub(crate) fn validate_node_state(
    node_id: &NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
        format!("node {} was removed", node_id).into(),
    ))?;

    let current_policy = node.get_scheduling();
    if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
        // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
        // about it
        return Err(OperationError::NodeStateChanged(
            format!("node {} changed state to {:?}", node_id, current_policy).into(),
        ));
    }

    Ok(())
}
/// Struct that houses a few utility methods for draining pageserver nodes
pub(crate) struct TenantShardDrain {
    pub(crate) drained_node: NodeId,
    pub(crate) tenant_shard_id: TenantShardId,
}

impl TenantShardDrain {
    /// Check if the tenant shard under question is eligible for draining:
    /// its primary attachment is on the node being drained
    pub(crate) fn tenant_shard_eligible_for_drain(
        &self,
        tenants: &BTreeMap<TenantShardId, TenantShard>,
        scheduler: &Scheduler,
    ) -> Option<NodeId> {
        let tenant_shard = tenants.get(&self.tenant_shard_id)?;

        if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
            return None;
        }

        match scheduler.node_preferred(tenant_shard.intent.get_secondary()) {
            Some(node) => Some(node),
            None => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "No eligible secondary while draining {}", self.drained_node
                );

                None
            }
        }
    }
    /// Attempt to reschedule the tenant shard under question to one of its secondary locations.
    /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
    /// should be skipped.
    pub(crate) fn reschedule_to_secondary<'a>(
        &self,
        destination: NodeId,
        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Result<Option<&'a mut TenantShard>, OperationError> {
        let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
            Some(some) => some,
            None => {
                // Tenant shard was removed in the meantime.
                // Skip to the next one, but don't fail the overall operation.
                return Ok(None);
            }
        };

        if !nodes.contains_key(&destination) {
            return Err(OperationError::NodeStateChanged(
                format!("node {} was removed", destination).into(),
            ));
        }

        if !tenant_shard.intent.get_secondary().contains(&destination) {
            tracing::info!(
                tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                "Secondary moved away from {destination} during drain"
            );

            return Ok(None);
        }

        match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
            Err(e) => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Scheduling error when draining pageserver {} : {}", self.drained_node, e
                );

                Ok(None)
            }
            Ok(()) => {
                let scheduled_to = tenant_shard.intent.get_attached();
                tracing::info!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Rescheduled shard while draining node {}: {} -> {:?}",
                    self.drained_node,
                    self.drained_node,
                    scheduled_to
                );

                Ok(Some(tenant_shard))
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use utils::{
        id::TenantId,
        shard::{ShardCount, ShardNumber, TenantShardId},
    };

    use super::TenantShardIterator;

    #[test]
    fn test_tenant_shard_iterator() {
        let tenant_id = TenantId::generate();
        let shard_count = ShardCount(8);

        let mut tenant_shards = Vec::default();
        for i in 0..shard_count.0 {
            tenant_shards.push((
                TenantShardId {
                    tenant_id,
                    shard_number: ShardNumber(i),
                    shard_count,
                },
                (),
            ))
        }

        let tenant_shards = Arc::new(tenant_shards);

        let mut tid_iter = TenantShardIterator::new({
            let tenants = tenant_shards.clone();
            move |last_inspected_shard: Option<TenantShardId>| {
                let entry = match last_inspected_shard {
                    Some(skip_past) => {
                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
                        cursor.nth(1)
                    }
                    None => tenants.first(),
                };

                entry.map(|(tid, _)| tid).copied()
            }
        });

        let mut iterated_over = Vec::default();
        while let Some(tid) = tid_iter.next() {
            iterated_over.push((tid, ()));
        }

        assert_eq!(iterated_over, *tenant_shards);
    }
}
@@ -4,6 +4,7 @@ use utils::seqwait::MonotonicCounter;
 mod auth;
 mod background_node_operations;
 mod compute_hook;
+mod drain_utils;
 mod heartbeater;
 pub mod http;
 mod id_lock_map;
@@ -92,6 +92,11 @@ struct Cli {
     /// Chaos testing
     #[arg(long)]
     chaos_interval: Option<humantime::Duration>,
+
+    // Maximum acceptable lag for the secondary location while draining
+    // a pageserver
+    #[arg(long)]
+    max_secondary_lag_bytes: Option<u64>,
 }
 
 enum StrictMode {
@@ -279,6 +284,7 @@ async fn async_main() -> anyhow::Result<()> {
             .unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
         split_threshold: args.split_threshold,
         neon_local_repo_dir: args.neon_local_repo_dir,
+        max_secondary_lag_bytes: args.max_secondary_lag_bytes,
     };
 
     // After loading secrets & config, but before starting anything else, apply database migrations
@@ -39,6 +39,9 @@ pub(super) struct Reconciler {
     /// to detach this tenant shard.
     pub(crate) detach: Vec<Node>,
 
+    /// Configuration specific to this reconciler
+    pub(crate) reconciler_config: ReconcilerConfig,
+
     pub(crate) config: TenantConfig,
     pub(crate) observed: ObservedState,
 
@@ -73,6 +76,65 @@ pub(super) struct Reconciler {
     pub(crate) persistence: Arc<Persistence>,
 }
 
+pub(crate) struct ReconcilerConfigBuilder {
+    config: ReconcilerConfig,
+}
+
+impl ReconcilerConfigBuilder {
+    pub(crate) fn new() -> Self {
+        Self {
+            config: ReconcilerConfig::default(),
+        }
+    }
+
+    pub(crate) fn secondary_warmup_timeout(self, value: Duration) -> Self {
+        Self {
+            config: ReconcilerConfig {
+                secondary_warmup_timeout: Some(value),
+                ..self.config
+            },
+        }
+    }
+
+    pub(crate) fn secondary_download_request_timeout(self, value: Duration) -> Self {
+        Self {
+            config: ReconcilerConfig {
+                secondary_download_request_timeout: Some(value),
+                ..self.config
+            },
+        }
+    }
+
+    pub(crate) fn build(self) -> ReconcilerConfig {
+        self.config
+    }
+}
+
+#[derive(Default, Debug, Copy, Clone)]
+pub(crate) struct ReconcilerConfig {
+    // During live migration give up on warming-up the secondary
+    // after this timeout.
+    secondary_warmup_timeout: Option<Duration>,
+
+    // During live migrations this is the amount of time that
+    // the pageserver will hold our poll.
+    secondary_download_request_timeout: Option<Duration>,
+}
+
+impl ReconcilerConfig {
+    pub(crate) fn get_secondary_warmup_timeout(&self) -> Duration {
+        const SECONDARY_WARMUP_TIMEOUT_DEFAULT: Duration = Duration::from_secs(300);
+        self.secondary_warmup_timeout
+            .unwrap_or(SECONDARY_WARMUP_TIMEOUT_DEFAULT)
+    }
+
+    pub(crate) fn get_secondary_download_request_timeout(&self) -> Duration {
+        const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(20);
+        self.secondary_download_request_timeout
+            .unwrap_or(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT)
+    }
+}
+
 /// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
 pub(crate) struct ReconcileUnits {
     _sem_units: tokio::sync::OwnedSemaphorePermit,
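A small illustration of how the builder and the getters compose (a sketch; the values just restate the defaults and the drain-tuned overrides visible in this diff):

```rust
use std::time::Duration;

// A default-built config falls back to the compiled-in timeouts ...
let default_cfg = ReconcilerConfig::default();
assert_eq!(default_cfg.get_secondary_warmup_timeout(), Duration::from_secs(300));
assert_eq!(default_cfg.get_secondary_download_request_timeout(), Duration::from_secs(20));

// ... while a drain-tuned config overrides them, as drain_node does below.
let drain_cfg = ReconcilerConfigBuilder::new()
    .secondary_warmup_timeout(Duration::from_secs(20))
    .secondary_download_request_timeout(Duration::from_secs(5))
    .build();
assert_eq!(drain_cfg.get_secondary_warmup_timeout(), Duration::from_secs(20));
```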
@@ -300,11 +362,13 @@ impl Reconciler {
     ) -> Result<(), ReconcileError> {
         // This is not the timeout for a request, but the total amount of time we're willing to wait
         // for a secondary location to get up to date before
-        const TOTAL_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(300);
+        let total_download_timeout = self.reconciler_config.get_secondary_warmup_timeout();
 
         // This is the long-polling interval for the secondary download requests we send to destination pageserver
         // during a migration.
-        const REQUEST_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);
+        let request_download_timeout = self
+            .reconciler_config
+            .get_secondary_download_request_timeout();
 
         let started_at = Instant::now();
 
@@ -315,14 +379,14 @@ impl Reconciler {
                     client
                         .tenant_secondary_download(
                             tenant_shard_id,
-                            Some(REQUEST_DOWNLOAD_TIMEOUT),
+                            Some(request_download_timeout),
                         )
                         .await
                 },
                 &self.service_config.jwt_token,
                 1,
                 3,
-                REQUEST_DOWNLOAD_TIMEOUT * 2,
+                request_download_timeout * 2,
                 &self.cancel,
             )
             .await
@@ -350,7 +414,7 @@ impl Reconciler {
             return Ok(());
         } else if status == StatusCode::ACCEPTED {
             let total_runtime = started_at.elapsed();
-            if total_runtime > TOTAL_DOWNLOAD_TIMEOUT {
+            if total_runtime > total_download_timeout {
                 tracing::warn!("Timed out after {}ms downloading layers to {node}. Progress so far: {}/{} layers, {}/{} bytes",
                     total_runtime.as_millis(),
                     progress.layers_downloaded,
@@ -14,10 +14,11 @@ use crate::{
         Drain, Fill, Operation, OperationError, OperationHandler, MAX_RECONCILES_PER_OPERATION,
     },
     compute_hook::NotifyError,
+    drain_utils::{self, TenantShardDrain, TenantShardIterator},
     id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
     metrics::LeadershipStatusGroup,
     persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
-    reconciler::{ReconcileError, ReconcileUnits},
+    reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
     scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
     tenant_shard::{
         MigrateAttachment, ReconcileNeeded, ReconcilerStatus, ScheduleOptimization,
@@ -325,6 +326,12 @@ pub struct Config {
 
     // TODO: make this cfg(feature = "testing")
     pub neon_local_repo_dir: Option<PathBuf>,
+
+    // Maximum acceptable download lag for the secondary location
+    // while draining a node. If the secondary location is lagging
+    // by more than the configured amount, then the secondary is not
+    // upgraded to primary.
+    pub max_secondary_lag_bytes: Option<u64>,
 }
 
 impl From<DatabaseError> for ApiError {
@@ -5187,11 +5194,22 @@ impl Service {
         Ok(())
     }
 
-    /// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
+    /// Like [`Self::maybe_configured_reconcile_shard`], but uses the default reconciler
+    /// configuration
     fn maybe_reconcile_shard(
         &self,
         shard: &mut TenantShard,
         nodes: &Arc<HashMap<NodeId, Node>>,
     ) -> Option<ReconcilerWaiter> {
+        self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::default())
+    }
+
+    /// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
+    fn maybe_configured_reconcile_shard(
+        &self,
+        shard: &mut TenantShard,
+        nodes: &Arc<HashMap<NodeId, Node>>,
+        reconciler_config: ReconcilerConfig,
+    ) -> Option<ReconcilerWaiter> {
         let reconcile_needed = shard.get_reconcile_needed(nodes);
@@ -5241,6 +5259,7 @@ impl Service {
             &self.result_tx,
             nodes,
             &self.compute_hook,
+            reconciler_config,
             &self.config,
             &self.persistence,
             units,
@@ -5715,18 +5734,92 @@ impl Service {
         self.gate.close().await;
     }
 
+    /// Spot check the download lag for a secondary location of a shard.
+    /// Should be used as a heuristic, since it's not always precise: the
+    /// secondary might not have downloaded the new heatmap yet and, hence,
+    /// is not aware of the lag.
+    ///
+    /// Returns:
+    /// * Ok(None) if the lag could not be determined from the status,
+    /// * Ok(Some(_)) if the lag could be determined
+    /// * Err on failures to query the pageserver.
+    async fn secondary_lag(
+        &self,
+        secondary: &NodeId,
+        tenant_shard_id: TenantShardId,
+    ) -> Result<Option<u64>, mgmt_api::Error> {
+        let nodes = self.inner.read().unwrap().nodes.clone();
+        let node = nodes.get(secondary).ok_or(mgmt_api::Error::ApiError(
+            StatusCode::NOT_FOUND,
+            format!("Node with id {} not found", secondary),
+        ))?;
+
+        match node
+            .with_client_retries(
+                |client| async move { client.tenant_secondary_status(tenant_shard_id).await },
+                &self.config.jwt_token,
+                1,
+                3,
+                Duration::from_millis(250),
+                &self.cancel,
+            )
+            .await
+        {
+            Some(Ok(status)) => match status.heatmap_mtime {
+                Some(_) => Ok(Some(status.bytes_total - status.bytes_downloaded)),
+                None => Ok(None),
+            },
+            Some(Err(e)) => Err(e),
+            None => Err(mgmt_api::Error::Cancelled),
+        }
+    }
+
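The lag itself is just the byte gap reported by the secondary's status endpoint; roughly (a sketch, assuming the `SecondaryProgress` fields used above):

```rust
// None means "unknown": the secondary has not seen a heatmap yet, which is
// deliberately treated as "do not migrate" rather than "no lag".
fn lag_from_status(progress: &SecondaryProgress) -> Option<u64> {
    progress
        .heatmap_mtime
        .as_ref()
        .map(|_| progress.bytes_total - progress.bytes_downloaded)
}
```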
     /// Drain a node by moving the shards attached to it as primaries.
     /// This is a long running operation and it should run as a separate Tokio task.
     pub(crate) async fn drain_node(
-        &self,
+        self: &Arc<Self>,
         node_id: NodeId,
         cancel: CancellationToken,
     ) -> Result<(), OperationError> {
-        let mut last_inspected_shard: Option<TenantShardId> = None;
-        let mut inspected_all_shards = false;
+        const MAX_SECONDARY_LAG_BYTES_DEFAULT: u64 = 256 * 1024 * 1024;
+        let max_secondary_lag_bytes = self
+            .config
+            .max_secondary_lag_bytes
+            .unwrap_or(MAX_SECONDARY_LAG_BYTES_DEFAULT);
+
+        // By default, live migrations are generous about the wait time for getting
+        // the secondary location up to speed. When draining, give up earlier in order
+        // to not stall the operation when a cold secondary is encountered.
+        const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
+        const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
+        let reconciler_config = ReconcilerConfigBuilder::new()
+            .secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
+            .secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
+            .build();
 
         let mut waiters = Vec::new();
 
-        while !inspected_all_shards {
+        let mut tid_iter = TenantShardIterator::new({
+            let service = self.clone();
+            move |last_inspected_shard: Option<TenantShardId>| {
+                let locked = &service.inner.read().unwrap();
+                let tenants = &locked.tenants;
+                let entry = match last_inspected_shard {
+                    Some(skip_past) => {
+                        // Skip to the last seen tenant shard id
+                        let mut cursor = tenants.iter().skip_while(|(tid, _)| **tid != skip_past);
+
+                        // Skip past the last seen
+                        cursor.nth(1)
+                    }
+                    None => tenants.first_key_value(),
+                };
+
+                entry.map(|(tid, _)| tid).copied()
+            }
+        });
+
+        while !tid_iter.finished() {
             if cancel.is_cancelled() {
                 match self
                     .node_configure(node_id, None, Some(NodeSchedulingPolicy::Active))
@@ -5745,71 +5838,82 @@ impl Service {
                 }
             }
 
-            {
-                let mut locked = self.inner.write().unwrap();
-                let (nodes, tenants, scheduler) = locked.parts_mut();
+            drain_utils::validate_node_state(&node_id, self.inner.read().unwrap().nodes.clone())?;
 
-                let node = nodes.get(&node_id).ok_or(OperationError::NodeStateChanged(
-                    format!("node {node_id} was removed").into(),
-                ))?;
-
-                let current_policy = node.get_scheduling();
-                if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
-                    // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
-                    // about it
-                    return Err(OperationError::NodeStateChanged(
-                        format!("node {node_id} changed state to {current_policy:?}").into(),
-                    ));
-                }
-
-                let mut cursor = tenants.iter_mut().skip_while({
-                    let skip_past = last_inspected_shard;
-                    move |(tid, _)| match skip_past {
-                        Some(last) => **tid != last,
-                        None => false,
+            while waiters.len() < MAX_RECONCILES_PER_OPERATION {
+                let tid = match tid_iter.next() {
+                    Some(tid) => tid,
+                    None => {
+                        break;
                     }
-                });
+                };
 
-                while waiters.len() < MAX_RECONCILES_PER_OPERATION {
-                    let (tid, tenant_shard) = match cursor.next() {
-                        Some(some) => some,
+                let tid_drain = TenantShardDrain {
+                    drained_node: node_id,
+                    tenant_shard_id: tid,
+                };
+
+                let dest_node_id = {
+                    let locked = self.inner.read().unwrap();
+
+                    match tid_drain
+                        .tenant_shard_eligible_for_drain(&locked.tenants, &locked.scheduler)
+                    {
+                        Some(node_id) => node_id,
                         None => {
-                            inspected_all_shards = true;
-                            break;
+                            continue;
                         }
-                    };
+                    }
+                };
 
-                    // If the shard is not attached to the node being drained, skip it.
-                    if *tenant_shard.intent.get_attached() != Some(node_id) {
-                        last_inspected_shard = Some(*tid);
+                match self.secondary_lag(&dest_node_id, tid).await {
+                    Ok(Some(lag)) if lag <= max_secondary_lag_bytes => {
+                        // The secondary is reasonably up to date.
+                        // Migrate to it
+                    }
+                    Ok(Some(lag)) => {
+                        tracing::info!(
+                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                            "Secondary on node {dest_node_id} is lagging by {lag}. Skipping reconcile."
+                        );
                         continue;
                     }
+                    Ok(None) => {
+                        tracing::info!(
+                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                            "Could not determine lag for secondary on node {dest_node_id}. Skipping reconcile."
+                        );
+                        continue;
+                    }
+                    Err(err) => {
+                        tracing::warn!(
+                            tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
+                            "Failed to get secondary lag from node {dest_node_id}. Skipping reconcile: {err}"
+                        );
+                        continue;
+                    }
+                }
 
-                    match tenant_shard.reschedule_to_secondary(None, scheduler) {
-                        Err(e) => {
-                            tracing::warn!(
-                                tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
-                                "Scheduling error when draining pageserver {} : {e}", node_id
-                            );
-                        }
-                        Ok(()) => {
-                            let scheduled_to = tenant_shard.intent.get_attached();
-                            tracing::info!(
-                                tenant_id=%tid.tenant_id, shard_id=%tid.shard_slug(),
-                                "Rescheduled shard while draining node {}: {} -> {:?}",
-                                node_id,
-                                node_id,
-                                scheduled_to
-                            );
+                {
+                    let mut locked = self.inner.write().unwrap();
+                    let (nodes, tenants, scheduler) = locked.parts_mut();
+                    let rescheduled = tid_drain.reschedule_to_secondary(
+                        dest_node_id,
+                        tenants,
+                        scheduler,
+                        nodes,
+                    )?;
 
-                            let waiter = self.maybe_reconcile_shard(tenant_shard, nodes);
-                            if let Some(some) = waiter {
-                                waiters.push(some);
-                            }
+                    if let Some(tenant_shard) = rescheduled {
+                        let waiter = self.maybe_configured_reconcile_shard(
+                            tenant_shard,
+                            nodes,
+                            reconciler_config,
+                        );
+                        if let Some(some) = waiter {
+                            waiters.push(some);
+                        }
+                    }
 
-                    last_inspected_shard = Some(*tid);
                 }
             }
 
@@ -7,7 +7,7 @@ use std::{
 use crate::{
     metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
     persistence::TenantShardPersistence,
-    reconciler::ReconcileUnits,
+    reconciler::{ReconcileUnits, ReconcilerConfig},
     scheduler::{AffinityScore, MaySchedule, RefCountUpdate, ScheduleContext},
     service::ReconcileResultRequest,
 };
@@ -1063,6 +1063,7 @@ impl TenantShard {
         result_tx: &tokio::sync::mpsc::UnboundedSender<ReconcileResultRequest>,
         pageservers: &Arc<HashMap<NodeId, Node>>,
         compute_hook: &Arc<ComputeHook>,
+        reconciler_config: ReconcilerConfig,
         service_config: &service::Config,
         persistence: &Arc<Persistence>,
         units: ReconcileUnits,
@@ -1101,6 +1102,7 @@ impl TenantShard {
             generation: self.generation,
             intent: reconciler_intent,
             detach,
+            reconciler_config,
             config: self.config.clone(),
             observed: self.observed.clone(),
             compute_hook: compute_hook.clone(),
@@ -14,6 +14,7 @@ import textwrap
 import threading
 import time
 import uuid
+from collections import defaultdict
 from contextlib import closing, contextmanager
 from dataclasses import dataclass
 from datetime import datetime
@@ -2667,6 +2668,69 @@ class NeonStorageController(MetricsGetter, LogUtils):
         log.info(f"Got failpoints request response code {res.status_code}")
         res.raise_for_status()
 
+    def get_tenants_placement(self) -> defaultdict[str, Dict[str, Any]]:
+        """
+        Get the intent and observed placements of all tenants known to the storage controller.
+        """
+        tenants = self.tenant_list()
+
+        tenant_placement: defaultdict[str, Dict[str, Any]] = defaultdict(
+            lambda: {
+                "observed": {"attached": None, "secondary": []},
+                "intent": {"attached": None, "secondary": []},
+            }
+        )
+
+        for t in tenants:
+            for node_id, loc_state in t["observed"]["locations"].items():
+                if (
+                    loc_state is not None
+                    and "conf" in loc_state
+                    and loc_state["conf"] is not None
+                    and loc_state["conf"]["mode"]
+                    in set(["AttachedSingle", "AttachedMulti", "AttachedStale"])
+                ):
+                    tenant_placement[t["tenant_shard_id"]]["observed"]["attached"] = int(node_id)
+
+                if (
+                    loc_state is not None
+                    and "conf" in loc_state
+                    and loc_state["conf"] is not None
+                    and loc_state["conf"]["mode"] == "Secondary"
+                ):
+                    tenant_placement[t["tenant_shard_id"]]["observed"]["secondary"].append(
+                        int(node_id)
+                    )
+
+            if "attached" in t["intent"]:
+                tenant_placement[t["tenant_shard_id"]]["intent"]["attached"] = t["intent"][
+                    "attached"
+                ]
+
+            if "secondary" in t["intent"]:
+                tenant_placement[t["tenant_shard_id"]]["intent"]["secondary"] += t["intent"][
+                    "secondary"
+                ]
+
+        return tenant_placement
+
+    def warm_up_all_secondaries(self):
+        log.info("Warming up all secondary locations")
+
+        tenant_placement = self.get_tenants_placement()
+        for tid, placement in tenant_placement.items():
+            assert placement["observed"]["attached"] is not None
+            primary_id = placement["observed"]["attached"]
+
+            assert len(placement["observed"]["secondary"]) == 1
+            secondary_id = placement["observed"]["secondary"][0]
+
+            parsed_tid = TenantShardId.parse(tid)
+            self.env.get_pageserver(primary_id).http_client().tenant_heatmap_upload(parsed_tid)
+            self.env.get_pageserver(secondary_id).http_client().tenant_secondary_download(
+                parsed_tid, wait_ms=250
+            )
+
     @property
     def workdir(self) -> Path:
         return self.env.repo_dir
@@ -361,6 +361,12 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
         self.verbose_error(res)
         return (res.status_code, res.json())
 
+    def tenant_secondary_status(self, tenant_id: Union[TenantId, TenantShardId]):
+        url = f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/status"
+        res = self.get(url)
+        self.verbose_error(res)
+        return res.json()
+
     def set_tenant_config(self, tenant_id: Union[TenantId, TenantShardId], config: dict[str, Any]):
         assert "tenant_id" not in config.keys()
         res = self.put(
@@ -2,7 +2,6 @@ import concurrent.futures
 import random
 import time
 from collections import defaultdict
-from typing import Any, Dict
 
 import pytest
 from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -24,51 +23,14 @@ def get_consistent_node_shard_counts(env: NeonEnv, total_shards) -> defaultdict[
     This function takes into account the intersection of the intent and the observed state.
     If they do not match, it asserts out.
     """
-    tenants = env.storage_controller.tenant_list()
-
-    intent = dict()
-    observed = dict()
-
-    tenant_placement: defaultdict[str, Dict[str, Any]] = defaultdict(
-        lambda: {
-            "observed": {"attached": None, "secondary": []},
-            "intent": {"attached": None, "secondary": []},
-        }
-    )
-
-    for t in tenants:
-        for node_id, loc_state in t["observed"]["locations"].items():
-            if (
-                loc_state is not None
-                and "conf" in loc_state
-                and loc_state["conf"] is not None
-                and loc_state["conf"]["mode"]
-                in set(["AttachedSingle", "AttachedMulti", "AttachedStale"])
-            ):
-                observed[t["tenant_shard_id"]] = int(node_id)
-                tenant_placement[t["tenant_shard_id"]]["observed"]["attached"] = int(node_id)
-
-            if (
-                loc_state is not None
-                and "conf" in loc_state
-                and loc_state["conf"] is not None
-                and loc_state["conf"]["mode"] == "Secondary"
-            ):
-                tenant_placement[t["tenant_shard_id"]]["observed"]["secondary"].append(int(node_id))
-
-        if "attached" in t["intent"]:
-            intent[t["tenant_shard_id"]] = t["intent"]["attached"]
-            tenant_placement[t["tenant_shard_id"]]["intent"]["attached"] = t["intent"]["attached"]
-
-        if "secondary" in t["intent"]:
-            tenant_placement[t["tenant_shard_id"]]["intent"]["secondary"] += t["intent"][
-                "secondary"
-            ]
-
+    tenant_placement = env.storage_controller.get_tenants_placement()
     log.info(f"{tenant_placement=}")
 
     matching = {
-        tid: intent[tid] for tid in observed if tid in intent and intent[tid] == observed[tid]
+        tid: tenant_placement[tid]["intent"]["attached"]
+        for tid in tenant_placement
+        if tenant_placement[tid]["intent"]["attached"]
+        == tenant_placement[tid]["observed"]["attached"]
     }
     assert len(matching) == total_shards
@@ -17,6 +17,7 @@ from fixtures.neon_fixtures import (
     PgBin,
     StorageControllerApiException,
     TokenScope,
+    last_flush_lsn_upload,
 )
 from fixtures.pageserver.http import PageserverHttpClient
 from fixtures.pageserver.utils import (
@@ -1597,6 +1598,8 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
 
     # Perform a graceful rolling restart
     for ps in env.pageservers:
+        env.storage_controller.warm_up_all_secondaries()
+
         env.storage_controller.retryable_node_operation(
             lambda ps_id: env.storage_controller.node_drain(ps_id), ps.id, max_attempts=3, backoff=2
         )
@@ -1645,6 +1648,115 @@ def test_graceful_cluster_restart(neon_env_builder: NeonEnvBuilder):
     assert_shard_counts_balanced(env, shard_counts, total_shards)
 
 
+def test_skip_drain_on_secondary_lag(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
+    """
+    Artificially make a tenant shard's secondary location lag behind the primary
+    and check that storage controller driven node drains skip the lagging tenant shard.
+    Finally, validate that the tenant shard is migrated when a new drain request comes
+    in and it's no longer lagging.
+    """
+    neon_env_builder.num_pageservers = 2
+    neon_env_builder.storage_controller_config = {
+        "max_secondary_lag_bytes": 1 * 1024 * 1024,
+    }
+
+    env = neon_env_builder.init_configs()
+    env.start()
+
+    tid, timeline_id = env.neon_cli.create_tenant(placement_policy='{"Attached":1}')
+
+    # Give things a chance to settle.
+    env.storage_controller.reconcile_until_idle(timeout_secs=30)
+
+    locations = env.storage_controller.locate(tid)
+    assert len(locations) == 1
+    primary: int = locations[0]["node_id"]
+    not_primary = [ps.id for ps in env.pageservers if ps.id != primary]
+    assert len(not_primary) == 1
+    secondary = not_primary[0]
+
+    log.info(f"Pausing secondary downloads on {secondary}")
+    env.get_pageserver(secondary).http_client().configure_failpoints(
+        ("secondary-layer-download-pausable", "pause")
+    )
+
+    log.info(f"Ingesting some data for {tid}")
+
+    with env.endpoints.create_start("main", tenant_id=tid) as endpoint:
+        run_pg_bench_small(pg_bin, endpoint.connstr())
+        endpoint.safe_psql("CREATE TABLE created_foo(id integer);")
+        last_flush_lsn_upload(env, endpoint, tid, timeline_id)
+
+    log.info(f"Uploading heatmap from {primary} and requesting download from {secondary}")
+
+    env.get_pageserver(primary).http_client().tenant_heatmap_upload(tid)
+    env.get_pageserver(secondary).http_client().tenant_secondary_download(tid, wait_ms=100)
+
+    def secondary_is_lagging():
+        resp = env.get_pageserver(secondary).http_client().tenant_secondary_status(tid)
+        lag = resp["bytes_total"] - resp["bytes_downloaded"]
+
+        if lag <= 1 * 1024 * 1024:
+            raise Exception(f"Secondary lag not big enough: {lag}")
+
+    log.info(f"Looking for lag to develop on the secondary {secondary}")
+    wait_until(10, 1, secondary_is_lagging)
+
+    log.info(f"Starting drain of primary {primary} with laggy secondary {secondary}")
+    env.storage_controller.retryable_node_operation(
+        lambda ps_id: env.storage_controller.node_drain(ps_id), primary, max_attempts=3, backoff=2
+    )
+
+    env.storage_controller.poll_node_status(
+        primary,
+        PageserverAvailability.ACTIVE,
+        PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
+        max_attempts=6,
+        backoff=5,
+    )
+
+    locations = env.storage_controller.locate(tid)
+    assert len(locations) == 1
+    assert locations[0]["node_id"] == primary
+
+    log.info(f"Unpausing secondary downloads on {secondary}")
+    env.get_pageserver(secondary).http_client().configure_failpoints(
+        ("secondary-layer-download-pausable", "off")
+    )
+    env.get_pageserver(secondary).http_client().tenant_secondary_download(tid, wait_ms=100)
+
+    log.info(f"Waiting for lag to reduce on {secondary}")
+
+    def lag_is_acceptable():
+        resp = env.get_pageserver(secondary).http_client().tenant_secondary_status(tid)
+        lag = resp["bytes_total"] - resp["bytes_downloaded"]
+
+        if lag > 1 * 1024 * 1024:
+            raise Exception(f"Secondary lag still too high: {lag}")
+
+    wait_until(10, 1, lag_is_acceptable)
+
+    env.storage_controller.node_configure(primary, {"scheduling": "Active"})
+
+    log.info(f"Starting drain of primary {primary} with non-laggy secondary {secondary}")
+
+    env.storage_controller.retryable_node_operation(
+        lambda ps_id: env.storage_controller.node_drain(ps_id), primary, max_attempts=3, backoff=2
+    )
+
+    env.storage_controller.poll_node_status(
+        primary,
+        PageserverAvailability.ACTIVE,
+        PageserverSchedulingPolicy.PAUSE_FOR_RESTART,
+        max_attempts=6,
+        backoff=5,
+    )
+
+    locations = env.storage_controller.locate(tid)
+    assert len(locations) == 1
+    assert locations[0]["node_id"] == secondary
+
+
 def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
     neon_env_builder.num_pageservers = 2
     env = neon_env_builder.init_configs()
@@ -1671,6 +1783,7 @@ def test_background_operation_cancellation(neon_env_builder: NeonEnvBuilder):
 
     ps_id_to_drain = env.pageservers[0].id
 
+    env.storage_controller.warm_up_all_secondaries()
     env.storage_controller.retryable_node_operation(
         lambda ps_id: env.storage_controller.node_drain(ps_id),
         ps_id_to_drain,