Merge remote-tracking branch 'origin/main' into HEAD

Heikki Linnakangas
2025-07-20 00:58:57 +03:00
70 changed files with 2396 additions and 1148 deletions

View File

@@ -52,6 +52,7 @@ tokio-rustls.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
measured.workspace = true
rustls.workspace = true
scopeguard.workspace = true
@@ -63,6 +64,7 @@ tokio-postgres-rustls.workspace = true
diesel = { version = "2.2.6", features = [
"serde_json",
"chrono",
"uuid",
] }
diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connection-wrapper"] }
diesel_migrations = { version = "2.2.0" }

View File

@@ -0,0 +1,2 @@
DROP TABLE hadron_safekeepers;
DROP TABLE hadron_timeline_safekeepers;

View File

@@ -0,0 +1,17 @@
-- hadron_safekeepers keeps track of all Safekeeper nodes that exist in the system.
-- Upon startup, each Safekeeper reaches out to the hadron cluster coordinator to register its node ID and listen addresses.
CREATE TABLE hadron_safekeepers (
sk_node_id BIGINT PRIMARY KEY NOT NULL,
listen_http_addr VARCHAR NOT NULL,
listen_http_port INTEGER NOT NULL,
listen_pg_addr VARCHAR NOT NULL,
listen_pg_port INTEGER NOT NULL
);
CREATE TABLE hadron_timeline_safekeepers (
timeline_id VARCHAR NOT NULL,
sk_node_id BIGINT NOT NULL,
legacy_endpoint_id UUID DEFAULT NULL,
PRIMARY KEY(timeline_id, sk_node_id)
);
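For illustration only: a minimal Diesel model matching the hadron_safekeepers columns above might look like the sketch below. The struct name and schema path are assumptions, not taken from this commit; the generated Diesel schema for these tables appears further down in this diff.

use diesel::prelude::*;

// Hypothetical model; field types follow the column types above
// (BIGINT -> i64, VARCHAR -> String, INTEGER -> i32).
#[derive(Queryable, Insertable, Debug)]
#[diesel(table_name = crate::schema::hadron_safekeepers)]
pub struct HadronSafekeeper {
    pub sk_node_id: i64,
    pub listen_http_addr: String,
    pub listen_http_port: i32,
    pub listen_pg_addr: String,
    pub listen_pg_port: i32,
}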

View File

@@ -1,4 +1,5 @@
use utils::auth::{AuthError, Claims, Scope};
use uuid::Uuid;
pub fn check_permission(claims: &Claims, required_scope: Scope) -> Result<(), AuthError> {
if claims.scope != required_scope {
@@ -7,3 +8,14 @@ pub fn check_permission(claims: &Claims, required_scope: Scope) -> Result<(), Au
Ok(())
}
#[allow(dead_code)]
pub fn check_endpoint_permission(claims: &Claims, endpoint_id: Uuid) -> Result<(), AuthError> {
if claims.scope != Scope::TenantEndpoint {
return Err(AuthError("Scope mismatch. Permission denied".into()));
}
if claims.endpoint_id != Some(endpoint_id) {
return Err(AuthError("Endpoint id mismatch. Permission denied".into()));
}
Ok(())
}

View File

@@ -5,6 +5,8 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use compute_api::spec::PageserverProtocol;
use compute_api::spec::PageserverShardInfo;
use control_plane::endpoint::{
ComputeControlPlane, EndpointStatus, PageserverConnectionInfo, PageserverShardConnectionInfo,
};
@@ -13,7 +15,7 @@ use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use safekeeper_api::membership::SafekeeperGeneration;
use serde::{Deserialize, Serialize};
@@ -507,7 +509,16 @@ impl ApiMethod for ComputeHookTenant {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}");
let mut shard_conninfos = HashMap::new();
let shard_count = ShardCount(shards.len().try_into().expect("too many shards"));
let mut shard_infos: HashMap<ShardIndex, PageserverShardInfo> = HashMap::new();
let prefer_protocol = if endpoint.grpc {
PageserverProtocol::Grpc
} else {
PageserverProtocol::Libpq
};
for shard in shards.iter() {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
@@ -528,19 +539,31 @@ impl ApiMethod for ComputeHookTenant {
None
};
let pageserver = PageserverShardConnectionInfo {
id: Some(shard.node_id.to_string()),
libpq_url,
grpc_url,
};
shard_conninfos.insert(shard.shard_number.0 as u32, pageserver);
let shard_info = PageserverShardInfo {
pageservers: vec![pageserver],
};
shard_infos.insert(
ShardIndex {
shard_number: shard.shard_number,
shard_count,
},
shard_info,
);
}
let pageserver_conninfo = PageserverConnectionInfo {
shards: shard_conninfos,
prefer_grpc: endpoint.grpc,
shard_count: ShardCount::unsharded(),
stripe_size: stripe_size.map(|val| val.0),
shards: shard_infos,
prefer_protocol,
};
endpoint
.reconfigure_pageservers(pageserver_conninfo, *stripe_size)
.reconfigure_pageservers(&pageserver_conninfo)
.await
.map_err(NotifyError::NeonLocal)?;
}
@@ -824,6 +847,7 @@ impl ComputeHook {
let send_locked = tokio::select! {
guard = send_lock.lock_owned() => {guard},
_ = cancel.cancelled() => {
tracing::info!("Notification cancelled while waiting for lock");
return Err(NotifyError::ShuttingDown)
}
};
@@ -865,11 +889,32 @@ impl ComputeHook {
let notify_url = compute_hook_url.as_ref().unwrap();
self.do_notify(notify_url, &request, cancel).await
} else {
self.do_notify_local::<M>(&request).await.map_err(|e| {
match self.do_notify_local::<M>(&request).await.map_err(|e| {
// This path is for testing only, so munge the error into our prod-style error type.
tracing::error!("neon_local notification hook failed: {e}");
NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
})
if e.to_string().contains("refresh-configuration-pending") {
// If the error message mentions "refresh-configuration-pending", it means the compute node
// rejected our notification request because it is already trying to reconfigure itself. We
// can proceed with the rest of the reconciliation process, as the compute node has already
// discovered the need to reconfigure and will eventually update its configuration once
// we update the pageserver mappings. In fact, it is important that we continue with
// reconciliation to make sure we update the pageserver mappings to unblock the compute node.
tracing::info!("neon_local notification hook failed: {e}");
tracing::info!("Notification failed likely due to compute node self-reconfiguration, will retry.");
Ok(())
} else {
tracing::error!("neon_local notification hook failed: {e}");
Err(NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR))
}
}) {
// Compute node accepted the notification request. Ok to proceed.
Ok(_) => Ok(()),
// Compute node rejected our request but it is already self-reconfiguring. Ok to proceed.
Err(Ok(_)) => Ok(()),
// Fail the reconciliation attempt in all other cases. Recall that this whole code path involving
// neon_local is for testing only. In production we always retry failed reconciliations so we
// don't have any dead ends here.
Err(Err(e)) => Err(e),
}
};
match result {
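The control flow above nests a Result inside the error arm of map_err and then flattens it with the outer match; a minimal standalone sketch of the same pattern (names here are illustrative, not from the codebase):

// Classify an error string as benign (proceed) or fatal.
fn classify(res: Result<(), String>) -> Result<(), String> {
    match res.map_err(|e| {
        if e.contains("refresh-configuration-pending") {
            Ok(()) // benign: the compute is already reconfiguring itself
        } else {
            Err(e) // anything else is fatal
        }
    }) {
        Ok(()) => Ok(()),      // request accepted
        Err(Ok(())) => Ok(()), // rejected, but benign
        Err(Err(e)) => Err(e), // fatal
    }
}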

View File

@@ -0,0 +1,44 @@
use std::collections::BTreeMap;
use rand::Rng;
use utils::shard::TenantShardId;
static CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
/// Generate a random string of `length` that can be used as a password. The generated string
/// contains alphanumeric characters and special characters (!@#$%^&*())
pub fn generate_random_password(length: usize) -> String {
let mut rng = rand::thread_rng();
(0..length)
.map(|_| {
let idx = rng.gen_range(0..CHARSET.len());
CHARSET[idx] as char
})
.collect()
}
pub(crate) struct TenantShardSizeMap {
#[expect(dead_code)]
pub map: BTreeMap<TenantShardId, u64>,
}
impl TenantShardSizeMap {
pub fn new(map: BTreeMap<TenantShardId, u64>) -> Self {
Self { map }
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_generate_random_password() {
let pwd1 = generate_random_password(10);
assert_eq!(pwd1.len(), 10);
let pwd2 = generate_random_password(10);
assert_ne!(pwd1, pwd2);
assert!(pwd1.chars().all(|c| CHARSET.contains(&(c as u8))));
assert!(pwd2.chars().all(|c| CHARSET.contains(&(c as u8))));
}
}

View File

@@ -48,7 +48,10 @@ use crate::metrics::{
};
use crate::persistence::SafekeeperUpsert;
use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service};
use crate::service::{
LeadershipStatus, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT, Service,
TenantMutationLocations,
};
/// State available to HTTP request handlers
pub struct HttpState {
@@ -734,77 +737,104 @@ async fn handle_tenant_timeline_passthrough(
path
);
// Find the node that holds shard zero
let (node, tenant_shard_id, consistent) = if tenant_or_shard_id.is_unsharded() {
service
let tenant_shard_id = if tenant_or_shard_id.is_unsharded() {
// If the request contains only tenant ID, find the node that holds shard zero
let (_, shard_id) = service
.tenant_shard0_node(tenant_or_shard_id.tenant_id)
.await?
.await?;
shard_id
} else {
let (node, consistent) = service.tenant_shard_node(tenant_or_shard_id).await?;
(node, tenant_or_shard_id, consistent)
tenant_or_shard_id
};
// Callers will always pass an unsharded tenant ID. Before proxying, we must
// rewrite this to a shard-aware shard zero ID.
let path = format!("{path}");
let tenant_str = tenant_or_shard_id.tenant_id.to_string();
let tenant_shard_str = format!("{tenant_shard_id}");
let path = path.replace(&tenant_str, &tenant_shard_str);
let service_inner = service.clone();
let latency = &METRICS_REGISTRY
.metrics_group
.storage_controller_passthrough_request_latency;
service.tenant_shard_remote_mutation(tenant_shard_id, |locations| async move {
let TenantMutationLocations(locations) = locations;
if locations.is_empty() {
return Err(ApiError::NotFound(anyhow::anyhow!("Tenant {} not found", tenant_or_shard_id.tenant_id).into()));
}
let path_label = path_without_ids(&path)
.split('/')
.filter(|token| !token.is_empty())
.collect::<Vec<_>>()
.join("_");
let labels = PageserverRequestLabelGroup {
pageserver_id: &node.get_id().to_string(),
path: &path_label,
method: crate::metrics::Method::Get,
};
let (tenant_or_shard_id, locations) = locations.into_iter().next().unwrap();
let node = locations.latest.node;
let _timer = latency.start_timer(labels.clone());
// Callers will always pass an unsharded tenant ID. Before proxying, we must
// rewrite this to a shard-aware shard zero ID.
let path = format!("{path}");
let tenant_str = tenant_or_shard_id.tenant_id.to_string();
let tenant_shard_str = format!("{tenant_shard_id}");
let path = path.replace(&tenant_str, &tenant_shard_str);
let client = mgmt_api::Client::new(
service.get_http_client().clone(),
node.base_url(),
service.get_config().pageserver_jwt_token.as_deref(),
);
let resp = client.op_raw(method, path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
if !resp.status().is_success() {
let error_counter = &METRICS_REGISTRY
let latency = &METRICS_REGISTRY
.metrics_group
.storage_controller_passthrough_request_error;
error_counter.inc(labels);
}
.storage_controller_passthrough_request_latency;
// Transform 404 into 503 if we raced with a migration
if resp.status() == reqwest::StatusCode::NOT_FOUND && !consistent {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!("Pageserver {node} returned 404 due to ongoing migration, retry later").into(),
));
}
let path_label = path_without_ids(&path)
.split('/')
.filter(|token| !token.is_empty())
.collect::<Vec<_>>()
.join("_");
let labels = PageserverRequestLabelGroup {
pageserver_id: &node.get_id().to_string(),
path: &path_label,
method: crate::metrics::Method::Get,
};
// We have a reqest::Response, would like a http::Response
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp.status())?);
for (k, v) in resp.headers() {
builder = builder.header(k.as_str(), v.as_bytes());
}
let _timer = latency.start_timer(labels.clone());
let response = builder
.body(Body::wrap_stream(resp.bytes_stream()))
.map_err(|e| ApiError::InternalServerError(e.into()))?;
let client = mgmt_api::Client::new(
service_inner.get_http_client().clone(),
node.base_url(),
service_inner.get_config().pageserver_jwt_token.as_deref(),
);
let resp = client.op_raw(method, path).await.map_err(|e|
// We return 503 here because if we can't successfully send a request to the pageserver,
// either we aren't available or the pageserver is unavailable.
ApiError::ResourceUnavailable(format!("Error sending pageserver API request to {node}: {e}").into()))?;
Ok(response)
if !resp.status().is_success() {
let error_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_passthrough_request_error;
error_counter.inc(labels);
}
let resp_status = resp.status();
// We have a reqwest::Response, would like an http::Response
let mut builder = hyper::Response::builder().status(map_reqwest_hyper_status(resp_status)?);
for (k, v) in resp.headers() {
builder = builder.header(k.as_str(), v.as_bytes());
}
let resp_bytes = resp
.bytes()
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
// Inspect 404 errors: at this point, we know that the tenant exists, but the pageserver we route
// the request to might not yet be ready. Therefore, if it is a _tenant_ not found error, we can
// convert it into a 503. TODO: we should make this part of the check in `tenant_shard_remote_mutation`.
// However, `tenant_shard_remote_mutation` currently cannot inspect the HTTP error response body,
// so we have to do it here instead.
if resp_status == reqwest::StatusCode::NOT_FOUND {
let resp_str = std::str::from_utf8(&resp_bytes)
.map_err(|e| ApiError::InternalServerError(e.into()))?;
// We only handle "tenant not found" errors; other 404s like timeline not found should
// be forwarded as-is.
if Service::is_tenant_not_found_error(resp_str, tenant_or_shard_id.tenant_id) {
// Rather than retry here, send the client a 503 to prompt a retry: this matches
// the pageserver's use of 503, and all clients calling this API should retry on 503.
return Err(ApiError::ResourceUnavailable(
format!(
"Pageserver {node} returned tenant 404 due to ongoing migration, retry later"
)
.into(),
));
}
}
let response = builder
.body(Body::from(resp_bytes))
.map_err(|e| ApiError::InternalServerError(e.into()))?;
Ok(response)
}).await?
}
async fn handle_tenant_locate(
@@ -1085,9 +1115,10 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiErr
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let force: bool = parse_query_param(&req, "force")?.unwrap_or(false);
json_response(
StatusCode::OK,
state.service.start_node_delete(node_id).await?,
state.service.start_node_delete(node_id, force).await?,
)
}
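For reference, the tenant-not-found detection used in the passthrough handler above (Service::is_tenant_not_found_error, defined later in this diff) is a plain substring match on the response body. A minimal illustration with a made-up error body, assuming TenantId parses from its 32-character hex form; the real pageserver message may be worded differently:

use utils::id::TenantId;

// Hypothetical 404 body returned by a pageserver that has not attached the tenant yet.
let tenant_id: TenantId = "1f359dd625e519a1a4e8d7509690f6fc".parse().unwrap();
let body = format!("NotFound: tenant {tenant_id} not found");
assert!(Service::is_tenant_not_found_error(&body, tenant_id));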

View File

@@ -6,6 +6,7 @@ extern crate hyper0 as hyper;
mod auth;
mod background_node_operations;
mod compute_hook;
pub mod hadron_utils;
mod heartbeater;
pub mod http;
mod id_lock_map;

View File

@@ -76,8 +76,8 @@ pub(crate) struct StorageControllerMetricGroup {
/// How many shards would like to reconcile but were blocked by concurrency limits
pub(crate) storage_controller_pending_reconciles: measured::Gauge,
/// How many shards are keep-failing and will be ignored when considering to run optimizations
pub(crate) storage_controller_keep_failing_reconciles: measured::Gauge,
/// How many shards are stuck and will be ignored when deciding whether to run optimizations
pub(crate) storage_controller_stuck_reconciles: measured::Gauge,
/// HTTP request status counters for handled requests
pub(crate) storage_controller_http_request_status:
@@ -151,6 +151,29 @@ pub(crate) struct StorageControllerMetricGroup {
/// Indicator of completed safekeeper reconciles, broken down by safekeeper.
pub(crate) storage_controller_safekeeper_reconciles_complete:
measured::CounterVec<SafekeeperReconcilerLabelGroupSet>,
/* BEGIN HADRON */
/// Hadron `config_watcher` reconciliation runs completed, broken down by success/failure.
pub(crate) storage_controller_config_watcher_complete:
measured::CounterVec<ConfigWatcherCompleteLabelGroupSet>,
/// Hadron long waits for node state changes during drain and fill.
pub(crate) storage_controller_drain_and_fill_long_waits: measured::Counter,
/// Set to 1 if we detect any page server pods with pending node pool rotation annotations.
/// Requires manual reset after oncall investigation.
pub(crate) storage_controller_ps_node_pool_rotation_pending: measured::Gauge,
/// Hadron storage scrubber status.
pub(crate) storage_controller_storage_scrub_status:
measured::CounterVec<StorageScrubberLabelGroupSet>,
/// Desired number of pageservers managed by the storage controller
pub(crate) storage_controller_num_pageservers_desired: measured::Gauge,
/// Desired number of safekeepers managed by the storage controller
pub(crate) storage_controller_num_safekeeper_desired: measured::Gauge,
/* END HADRON */
}
impl StorageControllerMetrics {
@@ -173,6 +196,10 @@ impl Default for StorageControllerMetrics {
.storage_controller_reconcile_complete
.init_all_dense();
metrics_group
.storage_controller_config_watcher_complete
.init_all_dense();
Self {
metrics_group,
encoder: Mutex::new(measured::text::BufferedTextEncoder::new()),
@@ -262,11 +289,48 @@ pub(crate) struct ReconcileLongRunningLabelGroup<'a> {
pub(crate) sequence: &'a str,
}
#[derive(measured::LabelGroup, Clone)]
#[label(set = StorageScrubberLabelGroupSet)]
pub(crate) struct StorageScrubberLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) tenant_id: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) shard_number: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) timeline_id: &'a str,
pub(crate) outcome: StorageScrubberOutcome,
}
#[derive(FixedCardinalityLabel, Clone, Copy)]
pub(crate) enum StorageScrubberOutcome {
PSOk,
PSWarning,
PSError,
PSOrphan,
SKOk,
SKError,
}
#[derive(measured::LabelGroup)]
#[label(set = ConfigWatcherCompleteLabelGroupSet)]
pub(crate) struct ConfigWatcherCompleteLabelGroup {
// Reuse the ReconcileOutcome from the SC's reconciliation metrics.
pub(crate) status: ReconcileOutcome,
}
#[derive(FixedCardinalityLabel, Clone, Copy)]
pub(crate) enum ReconcileOutcome {
// Successfully reconciled everything.
#[label(rename = "ok")]
Success,
// Used by tenant-shard reconciler only. Reconciled pageserver state successfully,
// but failed to deliver the compute notification. This error is typically transient,
// but if its occurrence keeps increasing, it should be investigated.
#[label(rename = "ok_no_notify")]
SuccessNoNotify,
// We failed to reconcile some state and the reconciliation will be retried.
Error,
// Reconciliation was cancelled.
Cancel,
}
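As a sketch of how the new counter might be recorded, mirroring the way other CounterVec metrics are incremented elsewhere in this diff; the exact call site is not shown in this commit:

use crate::metrics::{ConfigWatcherCompleteLabelGroup, METRICS_REGISTRY, ReconcileOutcome};

// Record one successful config_watcher reconciliation run.
METRICS_REGISTRY
    .metrics_group
    .storage_controller_config_watcher_complete
    .inc(ConfigWatcherCompleteLabelGroup {
        status: ReconcileOutcome::Success,
    });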

View File

@@ -51,6 +51,39 @@ pub(crate) struct Node {
cancel: CancellationToken,
}
#[allow(dead_code)]
const ONE_MILLION: i64 = 1000000;
/// Converts a pool ID to a large number that can be used to assign unique IDs to pods in StatefulSets.
/// For example, if pool_id is 1, then the pods have NodeIds 1000000, 1000001, 1000002, etc.
/// If pool_id is None, then the pods have NodeIds 0, 1, 2, etc.
#[allow(dead_code)]
pub fn transform_pool_id(pool_id: Option<i32>) -> i64 {
match pool_id {
Some(id) => (id as i64) * ONE_MILLION,
None => 0,
}
}
#[allow(dead_code)]
pub fn get_pool_id_from_node_id(node_id: i64) -> i32 {
(node_id / ONE_MILLION) as i32
}
/// Example pod name: page-server-0-1, safe-keeper-1-0
#[allow(dead_code)]
pub fn get_node_id_from_pod_name(pod_name: &str) -> anyhow::Result<NodeId> {
let parts: Vec<&str> = pod_name.split('-').collect();
if parts.len() != 4 {
return Err(anyhow::anyhow!("Invalid pod name: {}", pod_name));
}
let pool_id = parts[2].parse::<i32>()?;
let node_offset = parts[3].parse::<i64>()?;
let node_id = transform_pool_id(Some(pool_id)) + node_offset;
Ok(NodeId(node_id as u64))
}
/// When updating [`Node::availability`] we use this type to indicate to the caller
/// whether/how they changed it.
pub(crate) enum AvailabilityTransition {
@@ -403,3 +436,25 @@ impl std::fmt::Debug for Node {
write!(f, "{} ({})", self.id, self.listen_http_addr)
}
}
#[cfg(test)]
mod tests {
use utils::id::NodeId;
use crate::node::get_node_id_from_pod_name;
#[test]
fn test_get_node_id_from_pod_name() {
let pod_name = "page-server-3-12";
let node_id = get_node_id_from_pod_name(pod_name).unwrap();
assert_eq!(node_id, NodeId(3000012));
let pod_name = "safe-keeper-1-0";
let node_id = get_node_id_from_pod_name(pod_name).unwrap();
assert_eq!(node_id, NodeId(1000000));
let pod_name = "invalid-pod-name";
let result = get_node_id_from_pod_name(pod_name);
assert!(result.is_err());
}
}
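The inverse mapping get_pool_id_from_node_id is not covered by the test above; a round-trip check along these lines (a sketch, not part of this commit) would exercise it:

#[test]
fn test_pool_id_round_trip() {
    use crate::node::{get_pool_id_from_node_id, transform_pool_id};

    // Pool 3, pod offset 12 -> NodeId base 3_000_000 + 12, and the pool ID is recoverable.
    assert_eq!(transform_pool_id(Some(3)) + 12, 3_000_012);
    assert_eq!(get_pool_id_from_node_id(3_000_012), 3);
    // With no pool ID, node IDs start from 0.
    assert_eq!(transform_pool_id(None), 0);
}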

View File

@@ -14,6 +14,8 @@ use reqwest::StatusCode;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::hadron_utils::TenantShardSizeMap;
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
#[derive(Debug, Clone)]
@@ -86,6 +88,31 @@ impl PageserverClient {
)
}
#[expect(dead_code)]
pub(crate) async fn tenant_timeline_compact(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
force_image_layer_creation: bool,
wait_until_done: bool,
) -> Result<()> {
measured_request!(
"tenant_timeline_compact",
crate::metrics::Method::Put,
&self.node_id_label,
self.inner
.tenant_timeline_compact(
tenant_shard_id,
timeline_id,
force_image_layer_creation,
true,
false,
wait_until_done,
)
.await
)
}
/* BEGIN_HADRON */
pub(crate) async fn tenant_timeline_describe(
&self,
@@ -101,6 +128,17 @@ impl PageserverClient {
.await
)
}
#[expect(dead_code)]
pub(crate) async fn list_tenant_visible_size(&self) -> Result<TenantShardSizeMap> {
measured_request!(
"list_tenant_visible_size",
crate::metrics::Method::Get,
&self.node_id_label,
self.inner.list_tenant_visible_size().await
)
.map(TenantShardSizeMap::new)
}
/* END_HADRON */
pub(crate) async fn tenant_scan_remote_storage(
@@ -365,6 +403,16 @@ impl PageserverClient {
)
}
#[expect(dead_code)]
pub(crate) async fn reset_alert_gauges(&self) -> Result<()> {
measured_request!(
"reset_alert_gauges",
crate::metrics::Method::Post,
&self.node_id_label,
self.inner.reset_alert_gauges().await
)
}
pub(crate) async fn wait_lsn(
&self,
tenant_shard_id: TenantShardId,

View File

@@ -862,11 +862,11 @@ impl Reconciler {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
if refreshed {
tracing::info!(
node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
node_id=%node.get_id(), "[Attached] Observed configuration correct after refresh. Notifying compute.");
self.compute_notify().await?;
} else {
// Nothing to do
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
tracing::info!(node_id=%node.get_id(), "[Attached] Observed configuration already correct.");
}
}
observed => {
@@ -945,17 +945,17 @@ impl Reconciler {
match self.observed.locations.get(&node.get_id()) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
tracing::info!(node_id=%node.get_id(), "[Secondary] Observed configuration already correct.")
}
_ => {
// Only try and configure secondary locations on nodes that are available. This
// allows the reconciler to "succeed" while some secondaries are offline (e.g. after
// a node failure, where the failed node will have a secondary intent)
if node.is_available() {
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
tracing::info!(node_id=%node.get_id(), "[Secondary] Observed configuration requires update.");
changes.push((node.clone(), wanted_conf))
} else {
tracing::info!(node_id=%node.get_id(), "Skipping configuration as secondary, node is unavailable");
tracing::info!(node_id=%node.get_id(), "[Secondary] Skipping configuration as secondary, node is unavailable");
self.observed
.locations
.insert(node.get_id(), ObservedStateLocation { conf: None });
@@ -1066,6 +1066,9 @@ impl Reconciler {
}
result
} else {
tracing::info!(
"Compute notification is skipped because the tenant shard does not have an attached (primary) location"
);
Ok(())
}
}

View File

@@ -13,6 +13,24 @@ diesel::table! {
}
}
diesel::table! {
hadron_safekeepers (sk_node_id) {
sk_node_id -> Int8,
listen_http_addr -> Varchar,
listen_http_port -> Int4,
listen_pg_addr -> Varchar,
listen_pg_port -> Int4,
}
}
diesel::table! {
hadron_timeline_safekeepers (timeline_id, sk_node_id) {
timeline_id -> Varchar,
sk_node_id -> Int8,
legacy_endpoint_id -> Nullable<Uuid>,
}
}
diesel::table! {
metadata_health (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
@@ -105,6 +123,8 @@ diesel::table! {
diesel::allow_tables_to_appear_in_same_query!(
controllers,
hadron_safekeepers,
hadron_timeline_safekeepers,
metadata_health,
nodes,
safekeeper_timeline_pending_ops,

View File

@@ -207,34 +207,13 @@ enum ShardGenerationValidity {
},
}
/// We collect the state of attachments for some operations to determine if the operation
/// needs to be retried when it fails.
struct TenantShardAttachState {
/// The targets of the operation.
///
/// Tenant shard ID, node ID, node, is intent node observed primary.
targets: Vec<(TenantShardId, NodeId, Node, bool)>,
/// The targets grouped by node ID.
by_node_id: HashMap<NodeId, (TenantShardId, Node, bool)>,
}
impl TenantShardAttachState {
fn for_api_call(&self) -> Vec<(TenantShardId, Node)> {
self.targets
.iter()
.map(|(tenant_shard_id, _, node, _)| (*tenant_shard_id, node.clone()))
.collect()
}
}
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
pub const SAFEKEEPER_RECONCILER_CONCURRENCY_DEFAULT: usize = 32;
// Number of consecutive reconciliation errors, occured for one shard,
// Number of consecutive reconciliations that have occurred for one shard,
// after which the shard is ignored when considering to run optimizations.
const MAX_CONSECUTIVE_RECONCILIATION_ERRORS: usize = 5;
const MAX_CONSECUTIVE_RECONCILES: usize = 10;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
// This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
@@ -719,47 +698,70 @@ pub(crate) enum ReconcileResultRequest {
}
#[derive(Clone)]
struct MutationLocation {
node: Node,
generation: Generation,
pub(crate) struct MutationLocation {
pub(crate) node: Node,
pub(crate) generation: Generation,
}
#[derive(Clone)]
struct ShardMutationLocations {
latest: MutationLocation,
other: Vec<MutationLocation>,
pub(crate) struct ShardMutationLocations {
pub(crate) latest: MutationLocation,
pub(crate) other: Vec<MutationLocation>,
}
#[derive(Default, Clone)]
struct TenantMutationLocations(BTreeMap<TenantShardId, ShardMutationLocations>);
pub(crate) struct TenantMutationLocations(pub BTreeMap<TenantShardId, ShardMutationLocations>);
struct ReconcileAllResult {
spawned_reconciles: usize,
keep_failing_reconciles: usize,
stuck_reconciles: usize,
has_delayed_reconciles: bool,
}
impl ReconcileAllResult {
fn new(
spawned_reconciles: usize,
keep_failing_reconciles: usize,
stuck_reconciles: usize,
has_delayed_reconciles: bool,
) -> Self {
assert!(
spawned_reconciles >= keep_failing_reconciles,
"It is impossible to have more keep-failing reconciles than spawned reconciles"
spawned_reconciles >= stuck_reconciles,
"It is impossible to have less spawned reconciles than stuck reconciles"
);
Self {
spawned_reconciles,
keep_failing_reconciles,
stuck_reconciles,
has_delayed_reconciles,
}
}
/// We can run optimizations only if we don't have any delayed reconciles and
/// all spawned reconciles are also keep-failing reconciles.
/// all spawned reconciles are also stuck reconciles.
fn can_run_optimizations(&self) -> bool {
!self.has_delayed_reconciles && self.spawned_reconciles == self.keep_failing_reconciles
!self.has_delayed_reconciles && self.spawned_reconciles == self.stuck_reconciles
}
}
enum TenantIdOrShardId {
TenantId(TenantId),
TenantShardId(TenantShardId),
}
impl TenantIdOrShardId {
fn tenant_id(&self) -> TenantId {
match self {
TenantIdOrShardId::TenantId(tenant_id) => *tenant_id,
TenantIdOrShardId::TenantShardId(tenant_shard_id) => tenant_shard_id.tenant_id,
}
}
fn matches(&self, tenant_shard_id: &TenantShardId) -> bool {
match self {
TenantIdOrShardId::TenantId(tenant_id) => tenant_shard_id.tenant_id == *tenant_id,
TenantIdOrShardId::TenantShardId(this_tenant_shard_id) => {
this_tenant_shard_id == tenant_shard_id
}
}
}
}
@@ -1503,7 +1505,6 @@ impl Service {
match result.result {
Ok(()) => {
tenant.consecutive_errors_count = 0;
tenant.apply_observed_deltas(deltas);
tenant.waiter.advance(result.sequence);
}
@@ -1522,8 +1523,6 @@ impl Service {
}
}
tenant.consecutive_errors_count = tenant.consecutive_errors_count.saturating_add(1);
// Ordering: populate last_error before advancing error_seq,
// so that waiters will see the correct error after waiting.
tenant.set_last_error(result.sequence, e);
@@ -1535,6 +1534,8 @@ impl Service {
}
}
tenant.consecutive_reconciles_count = tenant.consecutive_reconciles_count.saturating_add(1);
// If we just finished detaching all shards for a tenant, it might be time to drop it from memory.
if tenant.policy == PlacementPolicy::Detached {
// We may only drop a tenant from memory while holding the exclusive lock on the tenant ID: this protects us
@@ -4773,72 +4774,24 @@ impl Service {
Ok(())
}
fn is_observed_consistent_with_intent(
&self,
shard: &TenantShard,
intent_node_id: NodeId,
) -> bool {
if let Some(location) = shard.observed.locations.get(&intent_node_id)
&& let Some(ref conf) = location.conf
&& (conf.mode == LocationConfigMode::AttachedSingle
|| conf.mode == LocationConfigMode::AttachedMulti)
{
true
} else {
false
}
}
fn collect_tenant_shards(
&self,
tenant_id: TenantId,
) -> Result<TenantShardAttachState, ApiError> {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
let mut by_node_id = HashMap::new();
// If the request got an unsharded tenant id, then apply
// the operation to all shards. Otherwise, apply it to a specific shard.
let shards_range = TenantShardId::tenant_range(tenant_id);
for (tenant_shard_id, shard) in locked.tenants.range(shards_range) {
if let Some(node_id) = shard.intent.get_attached() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
let consistent = self.is_observed_consistent_with_intent(shard, *node_id);
targets.push((*tenant_shard_id, *node_id, node.clone(), consistent));
by_node_id.insert(*node_id, (*tenant_shard_id, node.clone(), consistent));
}
}
Ok(TenantShardAttachState {
targets,
by_node_id,
})
pub(crate) fn is_tenant_not_found_error(body: &str, tenant_id: TenantId) -> bool {
body.contains(&format!("tenant {tenant_id}"))
}
fn process_result_and_passthrough_errors<T>(
&self,
tenant_id: TenantId,
results: Vec<(Node, Result<T, mgmt_api::Error>)>,
attach_state: TenantShardAttachState,
) -> Result<Vec<(Node, T)>, ApiError> {
let mut processed_results: Vec<(Node, T)> = Vec::with_capacity(results.len());
debug_assert_eq!(results.len(), attach_state.targets.len());
for (node, res) in results {
let is_consistent = attach_state
.by_node_id
.get(&node.get_id())
.map(|(_, _, consistent)| *consistent);
match res {
Ok(res) => processed_results.push((node, res)),
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
if is_consistent == Some(false) =>
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, body))
if Self::is_tenant_not_found_error(&body, tenant_id) =>
{
// This is expected if the attach is not finished yet. Return 503 so that the client can retry.
// If there's a tenant not found, we are still in the process of attaching the tenant.
// Return 503 so that the client can retry.
return Err(ApiError::ResourceUnavailable(
format!(
"Timeline is not attached to the pageserver {} yet, please retry",
@@ -4866,35 +4819,48 @@ impl Service {
)
.await;
let attach_state = self.collect_tenant_shards(tenant_id)?;
let results = self
.tenant_for_shards_api(
attach_state.for_api_call(),
|tenant_shard_id, client| async move {
client
.timeline_lease_lsn(tenant_shard_id, timeline_id, lsn)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
let leases = self.process_result_and_passthrough_errors(results, attach_state)?;
let mut valid_until = None;
for (_, lease) in leases {
if let Some(ref mut valid_until) = valid_until {
*valid_until = std::cmp::min(*valid_until, lease.valid_until);
} else {
valid_until = Some(lease.valid_until);
self.tenant_remote_mutation(tenant_id, |locations| async move {
if locations.0.is_empty() {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
));
}
}
Ok(LsnLease {
valid_until: valid_until.unwrap_or_else(SystemTime::now),
let results = self
.tenant_for_shards_api(
locations
.0
.iter()
.map(|(tenant_shard_id, ShardMutationLocations { latest, .. })| {
(*tenant_shard_id, latest.node.clone())
})
.collect(),
|tenant_shard_id, client| async move {
client
.timeline_lease_lsn(tenant_shard_id, timeline_id, lsn)
.await
},
1,
1,
SHORT_RECONCILE_TIMEOUT,
&self.cancel,
)
.await;
let leases = self.process_result_and_passthrough_errors(tenant_id, results)?;
let mut valid_until = None;
for (_, lease) in leases {
if let Some(ref mut valid_until) = valid_until {
*valid_until = std::cmp::min(*valid_until, lease.valid_until);
} else {
valid_until = Some(lease.valid_until);
}
}
Ok(LsnLease {
valid_until: valid_until.unwrap_or_else(SystemTime::now),
})
})
.await?
}
pub(crate) async fn tenant_timeline_download_heatmap_layers(
@@ -5041,11 +5007,37 @@ impl Service {
/// - Looks up the shards and the nodes where they were most recently attached
/// - Guarantees that after the inner function returns, the shards' generations haven't moved on: this
/// ensures that the remote operation acted on the most recent generation, and is therefore durable.
async fn tenant_remote_mutation<R, O, F>(
pub(crate) async fn tenant_remote_mutation<R, O, F>(
&self,
tenant_id: TenantId,
op: O,
) -> Result<R, ApiError>
where
O: FnOnce(TenantMutationLocations) -> F,
F: std::future::Future<Output = R>,
{
self.tenant_remote_mutation_inner(TenantIdOrShardId::TenantId(tenant_id), op)
.await
}
pub(crate) async fn tenant_shard_remote_mutation<R, O, F>(
&self,
tenant_shard_id: TenantShardId,
op: O,
) -> Result<R, ApiError>
where
O: FnOnce(TenantMutationLocations) -> F,
F: std::future::Future<Output = R>,
{
self.tenant_remote_mutation_inner(TenantIdOrShardId::TenantShardId(tenant_shard_id), op)
.await
}
async fn tenant_remote_mutation_inner<R, O, F>(
&self,
tenant_id_or_shard_id: TenantIdOrShardId,
op: O,
) -> Result<R, ApiError>
where
O: FnOnce(TenantMutationLocations) -> F,
F: std::future::Future<Output = R>,
@@ -5057,7 +5049,13 @@ impl Service {
// run concurrently with reconciliations, and it is not guaranteed that the node we find here
// will still be the latest when we're done: we will check generations again at the end of
// this function to handle that.
let generations = self.persistence.tenant_generations(tenant_id).await?;
let generations = self
.persistence
.tenant_generations(tenant_id_or_shard_id.tenant_id())
.await?
.into_iter()
.filter(|i| tenant_id_or_shard_id.matches(&i.tenant_shard_id))
.collect::<Vec<_>>();
if generations
.iter()
@@ -5071,9 +5069,14 @@ impl Service {
// One or more shards has not been attached to a pageserver. Check if this is because it's configured
// to be detached (409: caller should give up), or because it's meant to be attached but isn't yet (503: caller should retry)
let locked = self.inner.read().unwrap();
for (shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
let tenant_shards = locked
.tenants
.range(TenantShardId::tenant_range(
tenant_id_or_shard_id.tenant_id(),
))
.filter(|(shard_id, _)| tenant_id_or_shard_id.matches(shard_id))
.collect::<Vec<_>>();
for (shard_id, shard) in tenant_shards {
match shard.policy {
PlacementPolicy::Attached(_) => {
// This shard is meant to be attached: the caller is not wrong to try and
@@ -5183,7 +5186,14 @@ impl Service {
// Post-check: are all the generations of all the shards the same as they were initially? This proves that
// our remote operation executed on the latest generation and is therefore persistent.
{
let latest_generations = self.persistence.tenant_generations(tenant_id).await?;
let latest_generations = self
.persistence
.tenant_generations(tenant_id_or_shard_id.tenant_id())
.await?
.into_iter()
.filter(|i| tenant_id_or_shard_id.matches(&i.tenant_shard_id))
.collect::<Vec<_>>();
if latest_generations
.into_iter()
.map(
@@ -5317,7 +5327,7 @@ impl Service {
pub(crate) async fn tenant_shard0_node(
&self,
tenant_id: TenantId,
) -> Result<(Node, TenantShardId, bool), ApiError> {
) -> Result<(Node, TenantShardId), ApiError> {
let tenant_shard_id = {
let locked = self.inner.read().unwrap();
let Some((tenant_shard_id, _shard)) = locked
@@ -5335,7 +5345,7 @@ impl Service {
self.tenant_shard_node(tenant_shard_id)
.await
.map(|(node, consistent)| (node, tenant_shard_id, consistent))
.map(|node| (node, tenant_shard_id))
}
/// When you need to send an HTTP request to the pageserver that holds a shard of a tenant, this
@@ -5345,7 +5355,7 @@ impl Service {
pub(crate) async fn tenant_shard_node(
&self,
tenant_shard_id: TenantShardId,
) -> Result<(Node, bool), ApiError> {
) -> Result<Node, ApiError> {
// Look up in-memory state and maybe use the node from there.
{
let locked = self.inner.read().unwrap();
@@ -5375,8 +5385,7 @@ impl Service {
"Shard refers to nonexistent node"
)));
};
let consistent = self.is_observed_consistent_with_intent(shard, *intent_node_id);
return Ok((node.clone(), consistent));
return Ok(node.clone());
}
};
@@ -5411,7 +5420,7 @@ impl Service {
)));
};
// As a reconciliation is in flight, we do not have the observed state yet, and therefore we assume it is always inconsistent.
Ok((node.clone(), false))
Ok(node.clone())
}
pub(crate) fn tenant_locate(
@@ -7385,6 +7394,7 @@ impl Service {
self: &Arc<Self>,
node_id: NodeId,
policy_on_start: NodeSchedulingPolicy,
force: bool,
cancel: CancellationToken,
) -> Result<(), OperationError> {
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal).build();
@@ -7392,23 +7402,27 @@ impl Service {
let mut waiters: Vec<ReconcilerWaiter> = Vec::new();
let mut tid_iter = create_shared_shard_iterator(self.clone());
let reset_node_policy_on_cancel = || async {
match self
.node_configure(node_id, None, Some(policy_on_start))
.await
{
Ok(()) => OperationError::Cancelled,
Err(err) => {
OperationError::FinalizeError(
format!(
"Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
node_id, String::from(policy_on_start), err
)
.into(),
)
}
}
};
while !tid_iter.finished() {
if cancel.is_cancelled() {
match self
.node_configure(node_id, None, Some(policy_on_start))
.await
{
Ok(()) => return Err(OperationError::Cancelled),
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
node_id, String::from(policy_on_start), err
)
.into(),
));
}
}
return Err(reset_node_policy_on_cancel().await);
}
operation_utils::validate_node_state(
@@ -7477,8 +7491,18 @@ impl Service {
nodes,
reconciler_config,
);
if let Some(some) = waiter {
waiters.push(some);
if force {
// Here we remove an existing observed location for the node we're removing, and it will
// not be re-added by a reconciler's completion because we filter out removed nodes in
// process_result.
//
// Note that we update the shard's observed state _after_ calling maybe_configured_reconcile_shard:
// that means any reconciles we spawned will know about the node we're deleting,
// enabling them to do live migrations if it's still online.
tenant_shard.observed.locations.remove(&node_id);
} else if let Some(waiter) = waiter {
waiters.push(waiter);
}
}
}
@@ -7492,21 +7516,7 @@ impl Service {
while !waiters.is_empty() {
if cancel.is_cancelled() {
match self
.node_configure(node_id, None, Some(policy_on_start))
.await
{
Ok(()) => return Err(OperationError::Cancelled),
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise drain cancel of {} by setting scheduling policy to {}: {}",
node_id, String::from(policy_on_start), err
)
.into(),
));
}
}
return Err(reset_node_policy_on_cancel().await);
}
tracing::info!("Awaiting {} pending delete reconciliations", waiters.len());
@@ -7516,6 +7526,12 @@ impl Service {
.await;
}
let pf = pausable_failpoint!("delete-node-after-reconciles-spawned", &cancel);
if pf.is_err() {
// An error from pausable_failpoint indicates the cancel token was triggered.
return Err(reset_node_policy_on_cancel().await);
}
self.persistence
.set_tombstone(node_id)
.await
@@ -8111,6 +8127,7 @@ impl Service {
pub(crate) async fn start_node_delete(
self: &Arc<Self>,
node_id: NodeId,
force: bool,
) -> Result<(), ApiError> {
let (ongoing_op, node_policy, schedulable_nodes_count) = {
let locked = self.inner.read().unwrap();
@@ -8180,7 +8197,7 @@ impl Service {
tracing::info!("Delete background operation starting");
let res = service
.delete_node(node_id, policy_on_start, cancel)
.delete_node(node_id, policy_on_start, force, cancel)
.await;
match res {
Ok(()) => {
@@ -8632,7 +8649,7 @@ impl Service {
// This function is an efficient place to update lazy statistics, since we are walking
// all tenants.
let mut pending_reconciles = 0;
let mut keep_failing_reconciles = 0;
let mut stuck_reconciles = 0;
let mut az_violations = 0;
// If we find any tenants to drop from memory, stash them to offload after
@@ -8668,30 +8685,32 @@ impl Service {
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another one
let consecutive_errors_count = shard.consecutive_errors_count;
if self
.maybe_reconcile_shard(shard, &pageservers, ReconcilerPriority::Normal)
.is_some()
{
spawned_reconciles += 1;
// Count shards that are keep-failing. We still want to reconcile them
// to avoid a situation where a shard is stuck.
// But we don't want to consider them when deciding to run optimizations.
if consecutive_errors_count >= MAX_CONSECUTIVE_RECONCILIATION_ERRORS {
if shard.consecutive_reconciles_count >= MAX_CONSECUTIVE_RECONCILES {
// Count shards that are stuck, but we still want to reconcile them.
// We don't want to consider them when deciding whether to run optimizations.
tracing::warn!(
tenant_id=%shard.tenant_shard_id.tenant_id,
shard_id=%shard.tenant_shard_id.shard_slug(),
"Shard reconciliation is keep-failing: {} errors",
consecutive_errors_count
"Shard reconciliation is stuck: {} consecutive launches",
shard.consecutive_reconciles_count
);
keep_failing_reconciles += 1;
stuck_reconciles += 1;
}
} else {
if shard.delayed_reconcile {
// Shard wanted to reconcile but for some reason couldn't.
pending_reconciles += 1;
}
} else if shard.delayed_reconcile {
// Shard wanted to reconcile but for some reason couldn't.
pending_reconciles += 1;
}
// Reset the counter when we don't need to launch a reconcile.
shard.consecutive_reconciles_count = 0;
}
// If this tenant is detached, try dropping it from memory. This is usually done
// proactively in [`Self::process_results`], but we do it here to handle the edge
// case where a reconcile completes while someone else is holding an op lock for the tenant.
@@ -8727,14 +8746,10 @@ impl Service {
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_keep_failing_reconciles
.set(keep_failing_reconciles as i64);
.storage_controller_stuck_reconciles
.set(stuck_reconciles as i64);
ReconcileAllResult::new(
spawned_reconciles,
keep_failing_reconciles,
has_delayed_reconciles,
)
ReconcileAllResult::new(spawned_reconciles, stuck_reconciles, has_delayed_reconciles)
}
/// `optimize` in this context means identifying shards which have valid scheduled locations, but

View File

@@ -131,14 +131,16 @@ pub(crate) struct TenantShard {
#[serde(serialize_with = "read_last_error")]
pub(crate) last_error: std::sync::Arc<std::sync::Mutex<Option<Arc<ReconcileError>>>>,
/// Number of consecutive reconciliation errors that have occurred for this shard.
/// Number of consecutive [`crate::service::Service::reconcile_all`] iterations that have
/// scheduled a reconciliation for this shard.
///
/// When this count reaches MAX_CONSECUTIVE_RECONCILIATION_ERRORS, the tenant shard
/// will be countered as keep-failing in `reconcile_all` calculations. This will lead to
/// allowing optimizations to run even with some failing shards.
/// If this reaches `MAX_CONSECUTIVE_RECONCILES`, the shard is considered "stuck" and will be
/// ignored when deciding whether optimizations can run. This includes both successful and failed
/// reconciliations.
///
/// The counter is reset to 0 after a successful reconciliation.
pub(crate) consecutive_errors_count: usize,
/// Incremented in [`crate::service::Service::process_result`], and reset to 0 when
/// [`crate::service::Service::reconcile_all`] determines no reconciliation is needed for this shard.
pub(crate) consecutive_reconciles_count: usize,
/// If we have a pending compute notification that for some reason we weren't able to send,
/// set this to true. If this is set, calls to [`Self::get_reconcile_needed`] will return Yes
@@ -603,7 +605,7 @@ impl TenantShard {
waiter: Arc::new(SeqWait::new(Sequence(0))),
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
last_error: Arc::default(),
consecutive_errors_count: 0,
consecutive_reconciles_count: 0,
pending_compute_notification: false,
scheduling_policy: ShardSchedulingPolicy::default(),
preferred_node: None,
@@ -1609,7 +1611,13 @@ impl TenantShard {
// Update result counter
let outcome_label = match &result {
Ok(_) => ReconcileOutcome::Success,
Ok(_) => {
if reconciler.compute_notify_failure {
ReconcileOutcome::SuccessNoNotify
} else {
ReconcileOutcome::Success
}
}
Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
Err(_) => ReconcileOutcome::Error,
};
@@ -1908,7 +1916,7 @@ impl TenantShard {
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),
consecutive_errors_count: 0,
consecutive_reconciles_count: 0,
pending_compute_notification: false,
delayed_reconcile: false,
scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),