Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-22 21:59:59 +00:00.
@@ -873,6 +873,22 @@ impl Client {
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn reset_alert_gauges(&self) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/hadron-internal/reset_alert_gauges",
|
||||
self.mgmt_api_endpoint
|
||||
);
|
||||
self.start_request(Method::POST, uri)
|
||||
.send()
|
||||
.await
|
||||
.map_err(Error::SendRequest)?
|
||||
.error_from_body()
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn wait_lsn(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
-- Down migration: remove the hadron safekeeper tracking tables.
-- Drop in reverse creation order (the timeline mapping first, then the
-- safekeeper registry) so this migration stays valid if a foreign key from
-- hadron_timeline_safekeepers.sk_node_id to hadron_safekeepers is ever added.
DROP TABLE hadron_timeline_safekeepers;
DROP TABLE hadron_safekeepers;
|
||||
@@ -0,0 +1,17 @@
|
||||
-- hadron_safekeepers keep track of all Safe Keeper nodes that exist in the system.
-- Upon startup, each Safe Keeper reaches out to the hadron cluster coordinator to register its node ID and listen addresses.

CREATE TABLE hadron_safekeepers (
    sk_node_id BIGINT PRIMARY KEY NOT NULL,
    -- HTTP management endpoint advertised by the safekeeper at registration.
    listen_http_addr VARCHAR NOT NULL,
    listen_http_port INTEGER NOT NULL,
    -- Postgres-protocol endpoint advertised by the safekeeper at registration.
    listen_pg_addr VARCHAR NOT NULL,
    listen_pg_port INTEGER NOT NULL
);

-- Maps each timeline to the safekeeper nodes that host it (one row per
-- timeline/safekeeper pair).
-- NOTE(review): sk_node_id carries no FOREIGN KEY to hadron_safekeepers —
-- confirm whether referential integrity is enforced in application code.
CREATE TABLE hadron_timeline_safekeepers (
    timeline_id VARCHAR NOT NULL,
    sk_node_id BIGINT NOT NULL,
    -- NOTE(review): presumably an endpoint ID used by legacy deployments,
    -- NULL otherwise — semantics inferred from the name, confirm.
    legacy_endpoint_id UUID DEFAULT NULL,
    PRIMARY KEY(timeline_id, sk_node_id)
);
|
||||
44
storage_controller/src/hadron_utils.rs
Normal file
44
storage_controller/src/hadron_utils.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use rand::Rng;
|
||||
use utils::shard::TenantShardId;
|
||||
|
||||
/// Character set for generated passwords: ASCII upper/lowercase letters,
/// digits, and the special characters !@#$%^&*() — 72 symbols total.
static CHARSET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
|
||||
|
||||
/// Generate a random string of `length` that can be used as a password. The generated string
|
||||
/// contains alphanumeric characters and special characters (!@#$%^&*())
|
||||
pub fn generate_random_password(length: usize) -> String {
|
||||
let mut rng = rand::thread_rng();
|
||||
(0..length)
|
||||
.map(|_| {
|
||||
let idx = rng.gen_range(0..CHARSET.len());
|
||||
CHARSET[idx] as char
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Newtype over a per-shard size mapping, keyed by tenant shard ID.
/// NOTE(review): sizes are presumably bytes as reported by the pageserver's
/// visible-size endpoint — confirm against the caller.
pub(crate) struct TenantShardSizeMap {
    // Not read anywhere yet; kept for upcoming consumers of shard sizes.
    #[expect(dead_code)]
    pub map: BTreeMap<TenantShardId, u64>,
}
|
||||
|
||||
impl TenantShardSizeMap {
    /// Wrap an existing shard-size mapping.
    pub fn new(map: BTreeMap<TenantShardId, u64>) -> Self {
        Self { map }
    }
}
|
||||
|
||||
#[cfg(test)]
mod test {
    use super::*;

    /// Checks length, non-constant output, and alphabet membership of
    /// generated passwords.
    #[test]
    fn test_generate_random_password() {
        let pwd1 = generate_random_password(10);
        assert_eq!(pwd1.len(), 10);
        let pwd2 = generate_random_password(10);
        // With 72^10 possible strings the collision probability is
        // astronomically small; inequality shows the generator is not
        // returning a constant.
        assert_ne!(pwd1, pwd2);
        assert!(pwd1.chars().all(|c| CHARSET.contains(&(c as u8))));
        assert!(pwd2.chars().all(|c| CHARSET.contains(&(c as u8))));
    }

    /// Degenerate request: length 0 yields an empty string, not a panic.
    #[test]
    fn test_generate_random_password_zero_length() {
        assert_eq!(generate_random_password(0), String::new());
    }
}
|
||||
@@ -6,6 +6,7 @@ extern crate hyper0 as hyper;
|
||||
mod auth;
|
||||
mod background_node_operations;
|
||||
mod compute_hook;
|
||||
pub mod hadron_utils;
|
||||
mod heartbeater;
|
||||
pub mod http;
|
||||
mod id_lock_map;
|
||||
|
||||
@@ -14,6 +14,8 @@ use reqwest::StatusCode;
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::hadron_utils::TenantShardSizeMap;
|
||||
|
||||
/// Thin wrapper around [`pageserver_client::mgmt_api::Client`]. It allows the storage
|
||||
/// controller to collect metrics in a non-intrusive manner.
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -86,6 +88,31 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
// Not called yet; expected to be wired up by a future caller.
#[expect(dead_code)]
/// Trigger a compaction of one timeline on this pageserver, recording the
/// request latency/outcome under the "tenant_timeline_compact" metric label.
///
/// `force_image_layer_creation` and `wait_until_done` are forwarded to the
/// inner mgmt API client.
/// NOTE(review): the literal `true` and `false` arguments fill two additional
/// positional flags on the inner `tenant_timeline_compact` — their meaning is
/// not visible here; confirm against `pageserver_client::mgmt_api`.
pub(crate) async fn tenant_timeline_compact(
    &self,
    tenant_shard_id: TenantShardId,
    timeline_id: TimelineId,
    force_image_layer_creation: bool,
    wait_until_done: bool,
) -> Result<()> {
    measured_request!(
        "tenant_timeline_compact",
        crate::metrics::Method::Put,
        &self.node_id_label,
        self.inner
            .tenant_timeline_compact(
                tenant_shard_id,
                timeline_id,
                force_image_layer_creation,
                true,
                false,
                wait_until_done,
            )
            .await
    )
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
pub(crate) async fn tenant_timeline_describe(
|
||||
&self,
|
||||
@@ -101,6 +128,17 @@ impl PageserverClient {
|
||||
.await
|
||||
)
|
||||
}
|
||||
|
||||
#[expect(dead_code)]
|
||||
pub(crate) async fn list_tenant_visible_size(&self) -> Result<TenantShardSizeMap> {
|
||||
measured_request!(
|
||||
"list_tenant_visible_size",
|
||||
crate::metrics::Method::Get,
|
||||
&self.node_id_label,
|
||||
self.inner.list_tenant_visible_size().await
|
||||
)
|
||||
.map(TenantShardSizeMap::new)
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
pub(crate) async fn tenant_scan_remote_storage(
|
||||
@@ -365,6 +403,16 @@ impl PageserverClient {
|
||||
)
|
||||
}
|
||||
|
||||
// Not called yet; expected to be wired up by a future caller.
#[expect(dead_code)]
/// Ask this pageserver to reset its hadron alert gauges, recording the
/// request under the "reset_alert_gauges" metric label.
pub(crate) async fn reset_alert_gauges(&self) -> Result<()> {
    measured_request!(
        "reset_alert_gauges",
        crate::metrics::Method::Post,
        &self.node_id_label,
        self.inner.reset_alert_gauges().await
    )
}
|
||||
|
||||
pub(crate) async fn wait_lsn(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
@@ -862,11 +862,11 @@ impl Reconciler {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
if refreshed {
|
||||
tracing::info!(
|
||||
node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
|
||||
node_id=%node.get_id(), "[Attached] Observed configuration correct after refresh. Notifying compute.");
|
||||
self.compute_notify().await?;
|
||||
} else {
|
||||
// Nothing to do
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
|
||||
tracing::info!(node_id=%node.get_id(), "[Attached] Observed configuration already correct.");
|
||||
}
|
||||
}
|
||||
observed => {
|
||||
@@ -945,17 +945,17 @@ impl Reconciler {
|
||||
match self.observed.locations.get(&node.get_id()) {
|
||||
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
|
||||
// Nothing to do
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
|
||||
tracing::info!(node_id=%node.get_id(), "[Secondary] Observed configuration already correct.")
|
||||
}
|
||||
_ => {
|
||||
// Only try and configure secondary locations on nodes that are available. This
|
||||
// allows the reconciler to "succeed" while some secondaries are offline (e.g. after
|
||||
// a node failure, where the failed node will have a secondary intent)
|
||||
if node.is_available() {
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
|
||||
tracing::info!(node_id=%node.get_id(), "[Secondary] Observed configuration requires update.");
|
||||
changes.push((node.clone(), wanted_conf))
|
||||
} else {
|
||||
tracing::info!(node_id=%node.get_id(), "Skipping configuration as secondary, node is unavailable");
|
||||
tracing::info!(node_id=%node.get_id(), "[Secondary] Skipping configuration as secondary, node is unavailable");
|
||||
self.observed
|
||||
.locations
|
||||
.insert(node.get_id(), ObservedStateLocation { conf: None });
|
||||
@@ -1066,6 +1066,9 @@ impl Reconciler {
|
||||
}
|
||||
result
|
||||
} else {
|
||||
tracing::info!(
|
||||
"Compute notification is skipped because the tenant shard does not have an attached (primary) location"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,6 +13,24 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
// Diesel schema for hadron_safekeepers: registry of safekeeper nodes keyed by
// node ID, with their advertised HTTP and Postgres listen endpoints.
// NOTE(review): schema.rs files are conventionally regenerated by
// `diesel print-schema`; comments here may be lost on regeneration.
diesel::table! {
    hadron_safekeepers (sk_node_id) {
        sk_node_id -> Int8,
        listen_http_addr -> Varchar,
        listen_http_port -> Int4,
        listen_pg_addr -> Varchar,
        listen_pg_port -> Int4,
    }
}
|
||||
|
||||
// Diesel schema for hadron_timeline_safekeepers: maps each timeline to the
// safekeeper nodes hosting it (composite primary key), with an optional
// legacy endpoint UUID.
// NOTE(review): schema.rs files are conventionally regenerated by
// `diesel print-schema`; comments here may be lost on regeneration.
diesel::table! {
    hadron_timeline_safekeepers (timeline_id, sk_node_id) {
        timeline_id -> Varchar,
        sk_node_id -> Int8,
        legacy_endpoint_id -> Nullable<Uuid>,
    }
}
|
||||
|
||||
diesel::table! {
|
||||
metadata_health (tenant_id, shard_number, shard_count) {
|
||||
tenant_id -> Varchar,
|
||||
@@ -105,6 +123,8 @@ diesel::table! {
|
||||
|
||||
diesel::allow_tables_to_appear_in_same_query!(
|
||||
controllers,
|
||||
hadron_safekeepers,
|
||||
hadron_timeline_safekeepers,
|
||||
metadata_health,
|
||||
nodes,
|
||||
safekeeper_timeline_pending_ops,
|
||||
|
||||
@@ -1611,7 +1611,13 @@ impl TenantShard {
|
||||
|
||||
// Update result counter
|
||||
let outcome_label = match &result {
|
||||
Ok(_) => ReconcileOutcome::Success,
|
||||
Ok(_) => {
|
||||
if reconciler.compute_notify_failure {
|
||||
ReconcileOutcome::SuccessNoNotify
|
||||
} else {
|
||||
ReconcileOutcome::Success
|
||||
}
|
||||
}
|
||||
Err(ReconcileError::Cancel) => ReconcileOutcome::Cancel,
|
||||
Err(_) => ReconcileOutcome::Error,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user