mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 16:40:38 +00:00
Compare commits
10 Commits
ci-run/pr-
...
vlad/port-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
950df2668c | ||
|
|
d9774e2c67 | ||
|
|
dc25f1de90 | ||
|
|
2f9cc9a11e | ||
|
|
e92ed85e9a | ||
|
|
07396d3fc7 | ||
|
|
d5cd2de3cf | ||
|
|
bab09fffb6 | ||
|
|
7bd73eba72 | ||
|
|
a1924e72ad |
@@ -74,4 +74,4 @@ http-utils = { path = "../libs/http-utils/" }
|
||||
utils = { path = "../libs/utils/" }
|
||||
metrics = { path = "../libs/metrics/" }
|
||||
control_plane = { path = "../control_plane" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
7
storage_controller/src/hadron_dns.rs
Normal file
7
storage_controller/src/hadron_dns.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
/// Type of the storage node (pageserver or safekeeper) that we are updating DNS records for. Different types of nodes will have
|
||||
/// different-looking DNS names in the DNS zone.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum NodeType {
|
||||
Pageserver,
|
||||
Safekeeper,
|
||||
}
|
||||
433
storage_controller/src/hadron_queries.rs
Normal file
433
storage_controller/src/hadron_queries.rs
Normal file
@@ -0,0 +1,433 @@
|
||||
#![allow(dead_code, unused)]
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use diesel::Queryable;
|
||||
use diesel::dsl::min;
|
||||
use diesel::prelude::*;
|
||||
use diesel_async::AsyncConnection;
|
||||
use diesel_async::AsyncPgConnection;
|
||||
use diesel_async::RunQueryDsl;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::SCSafekeeperTimelinesResponse;
|
||||
use scoped_futures::ScopedFutureExt;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::hadron_dns::NodeType;
|
||||
use crate::hadron_requests::NodeConnectionInfo;
|
||||
use crate::persistence::{DatabaseError, DatabaseResult};
|
||||
use crate::schema::{hadron_safekeepers, nodes};
|
||||
use crate::sk_node::SafeKeeperNode;
|
||||
use std::str::FromStr;
|
||||
|
||||
// The Safe Keeper node database representation (for Diesel).
|
||||
#[derive(
|
||||
Clone, Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, AsChangeset,
|
||||
)]
|
||||
#[diesel(table_name = crate::schema::hadron_safekeepers)]
|
||||
pub(crate) struct HadronSafekeeperRow {
|
||||
pub(crate) sk_node_id: i64,
|
||||
pub(crate) listen_http_addr: String,
|
||||
pub(crate) listen_http_port: i32,
|
||||
pub(crate) listen_pg_addr: String,
|
||||
pub(crate) listen_pg_port: i32,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Clone, Serialize, Deserialize, Queryable, Selectable, Insertable, Eq, PartialEq, AsChangeset,
|
||||
)]
|
||||
#[diesel(table_name = crate::schema::hadron_timeline_safekeepers)]
|
||||
pub(crate) struct HadronTimelineSafekeeper {
|
||||
pub(crate) timeline_id: String,
|
||||
pub(crate) sk_node_id: i64,
|
||||
pub(crate) legacy_endpoint_id: Option<Uuid>,
|
||||
}
|
||||
|
||||
pub async fn execute_sk_upsert(
|
||||
conn: &mut AsyncPgConnection,
|
||||
sk_row: HadronSafekeeperRow,
|
||||
) -> DatabaseResult<()> {
|
||||
// SQL:
|
||||
// INSERT INTO hadron_safekeepers (sk_node_id, listen_http_addr, listen_http_port, listen_pg_addr, listen_pg_port)
|
||||
// VALUES ($1, $2, $3, $4, $5)
|
||||
// ON CONFLICT (sk_node_id)
|
||||
// DO UPDATE SET listen_http_addr = $2, listen_http_port = $3, listen_pg_addr = $4, listen_pg_port = $5;
|
||||
|
||||
use crate::schema::hadron_safekeepers::dsl::*;
|
||||
|
||||
diesel::insert_into(hadron_safekeepers)
|
||||
.values(&sk_row)
|
||||
.on_conflict(sk_node_id)
|
||||
.do_update()
|
||||
.set(&sk_row)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Load all safekeeper nodes and their associated timelines from the meta PG. This query is supposed
|
||||
// to run only once on HCC startup and is used to construct the SafeKeeperScheduler state. Performs
|
||||
// scans of the hadron_safekeepers and hadron_timeline_safekeepers tables.
|
||||
pub async fn scan_safekeepers_and_scheduled_timelines(
|
||||
conn: &mut AsyncPgConnection,
|
||||
) -> DatabaseResult<HashMap<NodeId, SafeKeeperNode>> {
|
||||
use crate::schema::hadron_safekeepers;
|
||||
use crate::schema::hadron_timeline_safekeepers;
|
||||
|
||||
// We first scan the hadron_safekeepers table to constuct the SafeKeeperNode objects. We don't know anything about
|
||||
// the timelines scheduled to the safekeepers after this step. We then scan the hadron_timeline_safekeepers table
|
||||
// to populate the data structures in the SafeKeeperNode objects to reflect the timelines scheduled to the safekeepers.
|
||||
let mut results: HashMap<NodeId, SafeKeeperNode> = hadron_safekeepers::table
|
||||
.select((
|
||||
hadron_safekeepers::sk_node_id,
|
||||
hadron_safekeepers::listen_http_addr,
|
||||
hadron_safekeepers::listen_http_port,
|
||||
hadron_safekeepers::listen_pg_addr,
|
||||
hadron_safekeepers::listen_pg_port,
|
||||
))
|
||||
.load::<HadronSafekeeperRow>(conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| {
|
||||
let sk_node = SafeKeeperNode {
|
||||
id: NodeId(row.sk_node_id as u64),
|
||||
listen_http_addr: row.listen_http_addr.clone(),
|
||||
listen_http_port: row.listen_http_port as u16,
|
||||
listen_pg_addr: row.listen_pg_addr.clone(),
|
||||
listen_pg_port: row.listen_pg_port as u16,
|
||||
legacy_endpoints: HashMap::new(),
|
||||
timelines: HashSet::new(),
|
||||
};
|
||||
(sk_node.id, sk_node)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let timeline_sk_rows = hadron_timeline_safekeepers::table
|
||||
.select((
|
||||
hadron_timeline_safekeepers::sk_node_id,
|
||||
hadron_timeline_safekeepers::timeline_id,
|
||||
hadron_timeline_safekeepers::legacy_endpoint_id,
|
||||
))
|
||||
.load::<(i64, String, Option<Uuid>)>(conn)
|
||||
.await?;
|
||||
for (sk_node_id, timeline_id, legacy_endpoint_id) in timeline_sk_rows {
|
||||
if let Some(sk_node) = results.get_mut(&NodeId(sk_node_id as u64)) {
|
||||
let parsed_timeline_id =
|
||||
TimelineId::from_str(&timeline_id).map_err(|e: hex::FromHexError| {
|
||||
DatabaseError::Logical(format!("Failed to parse timeline IDs: {e}"))
|
||||
})?;
|
||||
sk_node.timelines.insert(parsed_timeline_id);
|
||||
if let Some(legacy_endpoint_id) = legacy_endpoint_id {
|
||||
sk_node
|
||||
.legacy_endpoints
|
||||
.insert(legacy_endpoint_id, parsed_timeline_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
// Queries the hadron_timeline_safekeepers table to get the safekeepers assigned to the passed
|
||||
// timeline. If none are found, persists the input proposed safekeepers to the table and returns
|
||||
// them.
|
||||
pub async fn idempotently_persist_or_get_existing_timeline_safekeepers(
|
||||
conn: &mut AsyncPgConnection,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: &[NodeId],
|
||||
) -> DatabaseResult<Vec<NodeId>> {
|
||||
use crate::schema::hadron_timeline_safekeepers;
|
||||
// Confirm and persist the timeline-safekeeper mapping. If there are existing safekeepers
|
||||
// assigned to the timeline in the database, treat those as the source of truth.
|
||||
let existing_safekeepers: Vec<i64> = hadron_timeline_safekeepers::table
|
||||
.select(hadron_timeline_safekeepers::sk_node_id)
|
||||
.filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
|
||||
.load::<i64>(conn)
|
||||
.await?;
|
||||
let confirmed_safekeepers: Vec<NodeId> = if existing_safekeepers.is_empty() {
|
||||
let proposed_safekeeper_endpoint_rows_result: Result<Vec<HadronTimelineSafekeeper>, _> =
|
||||
safekeepers
|
||||
.iter()
|
||||
.map(|sk_node_id| {
|
||||
i64::try_from(sk_node_id.0).map(|sk_node_id| HadronTimelineSafekeeper {
|
||||
timeline_id: timeline_id.to_string(),
|
||||
sk_node_id,
|
||||
legacy_endpoint_id: None,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
let proposed_safekeeper_endpoint_rows =
|
||||
proposed_safekeeper_endpoint_rows_result.map_err(|e| {
|
||||
DatabaseError::Logical(format!("Failed to convert safekeeper IDs: {e}"))
|
||||
})?;
|
||||
|
||||
diesel::insert_into(hadron_timeline_safekeepers::table)
|
||||
.values(&proposed_safekeeper_endpoint_rows)
|
||||
.execute(conn)
|
||||
.await?;
|
||||
safekeepers.to_owned()
|
||||
} else {
|
||||
let safekeeper_result: Result<Vec<NodeId>, _> = existing_safekeepers
|
||||
.into_iter()
|
||||
.map(|arg0: i64| u64::try_from(arg0).map(NodeId))
|
||||
.collect();
|
||||
|
||||
safekeeper_result
|
||||
.map_err(|e| DatabaseError::Logical(format!("Failed to convert safekeeper IDs: {e}")))?
|
||||
};
|
||||
|
||||
Ok(confirmed_safekeepers)
|
||||
}
|
||||
|
||||
pub async fn delete_timeline_safekeepers(
|
||||
conn: &mut AsyncPgConnection,
|
||||
timeline_id: TimelineId,
|
||||
) -> DatabaseResult<()> {
|
||||
use crate::schema::hadron_timeline_safekeepers;
|
||||
|
||||
diesel::delete(hadron_timeline_safekeepers::table)
|
||||
.filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
|
||||
.execute(conn)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn execute_safekeeper_list_timelines(
|
||||
conn: &mut AsyncPgConnection,
|
||||
safekeeper_id: i64,
|
||||
) -> DatabaseResult<SCSafekeeperTimelinesResponse> {
|
||||
use crate::schema::hadron_timeline_safekeepers;
|
||||
use pageserver_api::controller_api::SCSafekeeperTimelinesResponse;
|
||||
|
||||
conn.transaction(|conn| {
|
||||
async move {
|
||||
let mut sk_timelines = SCSafekeeperTimelinesResponse {
|
||||
timelines: Vec::new(),
|
||||
safekeeper_peers: Vec::new(),
|
||||
};
|
||||
|
||||
// Find all timelines <String>
|
||||
let timeline_ids = hadron_timeline_safekeepers::table
|
||||
.select(hadron_timeline_safekeepers::timeline_id)
|
||||
.filter(hadron_timeline_safekeepers::sk_node_id.eq(safekeeper_id))
|
||||
.load::<String>(conn)
|
||||
.await
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect_vec();
|
||||
|
||||
// Find the peers for each timeline. <timeline_id, sk_node_id>
|
||||
let timeline_peers = hadron_timeline_safekeepers::table
|
||||
.select((
|
||||
hadron_timeline_safekeepers::timeline_id,
|
||||
hadron_timeline_safekeepers::sk_node_id,
|
||||
))
|
||||
.filter(hadron_timeline_safekeepers::timeline_id.eq_any(&timeline_ids))
|
||||
.load::<(String, i64)>(conn)
|
||||
.await
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect_vec();
|
||||
|
||||
let mut timeline_peers_map = HashMap::new();
|
||||
let mut seen = HashSet::new();
|
||||
let mut unique_sks = Vec::new();
|
||||
|
||||
for (timeline_id, sk_node_id) in timeline_peers {
|
||||
timeline_peers_map
|
||||
.entry(timeline_id)
|
||||
.or_insert_with(Vec::new)
|
||||
.push(sk_node_id);
|
||||
if seen.insert(sk_node_id) {
|
||||
unique_sks.push(sk_node_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Find SK info.
|
||||
let mut found_sk_nodes = HashSet::new();
|
||||
hadron_safekeepers::table
|
||||
.select((
|
||||
hadron_safekeepers::sk_node_id,
|
||||
hadron_safekeepers::listen_http_addr,
|
||||
hadron_safekeepers::listen_http_port,
|
||||
))
|
||||
.filter(hadron_safekeepers::sk_node_id.eq_any(&unique_sks))
|
||||
.load::<(i64, String, i32)>(conn)
|
||||
.await
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.for_each(|(sk_node_id, listen_http_addr, http_port)| {
|
||||
found_sk_nodes.insert(sk_node_id);
|
||||
|
||||
sk_timelines.safekeeper_peers.push(
|
||||
pageserver_api::controller_api::TimelineSafekeeperPeer {
|
||||
node_id: utils::id::NodeId(sk_node_id as u64),
|
||||
listen_http_addr,
|
||||
http_port,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
// Prepare timeline response.
|
||||
for timeline_id in timeline_ids {
|
||||
if !timeline_peers_map.contains_key(&timeline_id) {
|
||||
continue;
|
||||
}
|
||||
let peers = timeline_peers_map.get(&timeline_id).unwrap();
|
||||
// Check peers exist.
|
||||
if !peers
|
||||
.iter()
|
||||
.all(|sk_node_id| found_sk_nodes.contains(sk_node_id))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let timeline = pageserver_api::controller_api::SCSafekeeperTimeline {
|
||||
timeline_id: TimelineId::from_str(&timeline_id).unwrap(),
|
||||
peers: peers
|
||||
.iter()
|
||||
.map(|sk_node_id| utils::id::NodeId(*sk_node_id as u64))
|
||||
.collect(),
|
||||
};
|
||||
sk_timelines.timelines.push(timeline);
|
||||
}
|
||||
|
||||
Ok(sk_timelines)
|
||||
}
|
||||
.scope_boxed()
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Stores details about connecting to pageserver and safekeeper nodes for a given tenant and
|
||||
/// timeline.
|
||||
pub struct PageserverAndSafekeeperConnectionInfo {
|
||||
pub pageserver_conn_info: Vec<NodeConnectionInfo>,
|
||||
pub safekeeper_conn_info: Vec<NodeConnectionInfo>,
|
||||
}
|
||||
|
||||
/// Retrieves the connection information for the pageserver and safekeepers associated with the
|
||||
/// given tenant and timeline.
|
||||
pub async fn get_pageserver_and_safekeeper_connection_info(
|
||||
conn: &mut AsyncPgConnection,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> DatabaseResult<PageserverAndSafekeeperConnectionInfo> {
|
||||
conn.transaction(|conn| {
|
||||
async move {
|
||||
// Fetch details about pageserver, which is associated with the input tenant.
|
||||
let pageserver_conn_info =
|
||||
get_pageserver_connection_info(conn, &tenant_id.to_string()).await?;
|
||||
|
||||
// Fetch details about safekeepers, which are associated with the input timeline.
|
||||
let safekeeper_conn_info =
|
||||
get_safekeeper_connection_info(conn, &timeline_id.to_string()).await?;
|
||||
|
||||
Ok(PageserverAndSafekeeperConnectionInfo {
|
||||
pageserver_conn_info,
|
||||
safekeeper_conn_info,
|
||||
})
|
||||
}
|
||||
.scope_boxed()
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_safekeeper_connection_info(
|
||||
conn: &mut AsyncPgConnection,
|
||||
timeline_id: &str,
|
||||
) -> DatabaseResult<Vec<NodeConnectionInfo>> {
|
||||
use crate::schema::hadron_safekeepers;
|
||||
use crate::schema::hadron_timeline_safekeepers;
|
||||
|
||||
Ok(hadron_timeline_safekeepers::table
|
||||
.inner_join(
|
||||
hadron_safekeepers::table
|
||||
.on(hadron_timeline_safekeepers::sk_node_id.eq(hadron_safekeepers::sk_node_id)),
|
||||
)
|
||||
.select((
|
||||
hadron_safekeepers::sk_node_id,
|
||||
hadron_safekeepers::listen_pg_addr,
|
||||
hadron_safekeepers::listen_pg_port,
|
||||
))
|
||||
.filter(hadron_timeline_safekeepers::timeline_id.eq(timeline_id.to_string()))
|
||||
.load::<(i64, String, i32)>(conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(node_id, addr, port)| {
|
||||
NodeConnectionInfo::new(
|
||||
NodeType::Safekeeper,
|
||||
NodeId(node_id as u64),
|
||||
addr,
|
||||
port as u16,
|
||||
)
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn get_pageserver_connection_info(
|
||||
conn: &mut AsyncPgConnection,
|
||||
tenant_id: &str,
|
||||
) -> DatabaseResult<Vec<NodeConnectionInfo>> {
|
||||
use crate::schema::tenant_shards;
|
||||
|
||||
// When the tenant is being split, it'll contain both old shards and new shards. Until the tenant split is committed,
|
||||
// we should always use the old shards.
|
||||
// NOTE: we only support tenant split without tennat merge. Thus shard count could only increase.
|
||||
let min_shard_count = match tenant_shards::table
|
||||
.select(min(tenant_shards::shard_count))
|
||||
.filter(tenant_shards::tenant_id.eq(tenant_id))
|
||||
.first::<Option<i32>>(conn)
|
||||
.await
|
||||
.optional()?
|
||||
{
|
||||
Some(Some(count)) => count,
|
||||
Some(None) => {
|
||||
// Tenant doesn't exist. It's possible that it was deleted before we got the request.
|
||||
return Ok(vec![]);
|
||||
}
|
||||
None => {
|
||||
// This is never supposed to happen because `SELECT min()` should always return one row.
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Unexpected empty query result for min(shard_count) query. Tenant ID {tenant_id}"
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let shards: Vec<NodeConnectionInfo> = nodes::table
|
||||
.inner_join(
|
||||
tenant_shards::table.on(nodes::node_id
|
||||
.nullable()
|
||||
.eq(tenant_shards::generation_pageserver)),
|
||||
)
|
||||
.select((nodes::node_id, nodes::listen_pg_addr, nodes::listen_pg_port))
|
||||
.filter(tenant_shards::tenant_id.eq(&tenant_id.to_string()))
|
||||
.order(tenant_shards::shard_number.asc())
|
||||
.filter(tenant_shards::shard_count.eq(min_shard_count))
|
||||
.load::<(i64, String, i32)>(conn)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|(node_id, addr, port)| {
|
||||
NodeConnectionInfo::new(
|
||||
NodeType::Pageserver,
|
||||
NodeId(node_id as u64),
|
||||
addr,
|
||||
port as u16,
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !shards.is_empty() && !shards.len().is_power_of_two() {
|
||||
return Err(DatabaseError::Logical(format!(
|
||||
"Tenant {} has unexpected shard count {} (not a power of 2)",
|
||||
tenant_id,
|
||||
shards.len()
|
||||
)));
|
||||
}
|
||||
Ok(shards)
|
||||
}
|
||||
34
storage_controller/src/hadron_requests.rs
Normal file
34
storage_controller/src/hadron_requests.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
use utils::id::NodeId;
|
||||
|
||||
use crate::hadron_dns::NodeType;
|
||||
|
||||
/// Internal representation of how a compute node should connect to a PS or SK node. HCC uses this struct to
|
||||
/// construct connection strings that are passed to the compute node via the compute spec. This struct is never
|
||||
/// serialized or sent over the wire.
|
||||
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub(crate) struct NodeConnectionInfo {
|
||||
// Type of the node.
|
||||
node_type: NodeType,
|
||||
// Node ID. Unique for each node type.
|
||||
pub(crate) node_id: NodeId,
|
||||
// The hostname reported by the node when it registers. This is the hostname we store in the meta PG, and is
|
||||
// typically the k8s cluster DNS name of the node. Note that this may not be resolvable from compute nodes running
|
||||
// on dblet. For this reason, this hostname is usually not communicated to the compute node. Instead, HCC computes
|
||||
// a DNS name of the node in the Cloud DNS hosted zone based on `node_type` and `node_id` and advertise the DNS name
|
||||
// to compute nodes. This hostname here is used as a fallback in tests or other scenarios where we do not have the
|
||||
// Cloud DNS hosted zone available.
|
||||
registration_hostname: String,
|
||||
// The PG wire protocol port on the PS or SK node.
|
||||
port: u16,
|
||||
}
|
||||
|
||||
impl NodeConnectionInfo {
|
||||
pub(crate) fn new(node_type: NodeType, node_id: NodeId, hostname: String, port: u16) -> Self {
|
||||
NodeConnectionInfo {
|
||||
node_type,
|
||||
node_id,
|
||||
registration_hostname: hostname,
|
||||
port,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,9 @@ extern crate hyper0 as hyper;
|
||||
mod auth;
|
||||
mod background_node_operations;
|
||||
mod compute_hook;
|
||||
pub mod hadron_dns;
|
||||
mod hadron_queries;
|
||||
pub mod hadron_requests;
|
||||
pub mod hadron_utils;
|
||||
mod heartbeater;
|
||||
pub mod http;
|
||||
@@ -23,6 +26,7 @@ mod safekeeper_client;
|
||||
mod scheduler;
|
||||
mod schema;
|
||||
pub mod service;
|
||||
mod sk_node;
|
||||
mod tenant_shard;
|
||||
mod timeline_import;
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ use pageserver_api::config::PostHogConfig;
|
||||
use reqwest::Certificate;
|
||||
use storage_controller::http::make_router;
|
||||
use storage_controller::metrics::preinitialize_metrics;
|
||||
use storage_controller::persistence::Persistence;
|
||||
use storage_controller::persistence::{Persistence, PersistenceConfig};
|
||||
use storage_controller::service::chaos_injector::ChaosInjector;
|
||||
use storage_controller::service::feature_flag::FeatureFlagService;
|
||||
use storage_controller::service::{
|
||||
@@ -229,6 +229,15 @@ struct Cli {
|
||||
/// **Feature Flag** Whether the storage controller should act to rectify pageserver-reported local disk loss.
|
||||
#[arg(long, default_value = "false")]
|
||||
handle_ps_local_disk_loss: bool,
|
||||
|
||||
#[arg(long)]
|
||||
db_max_connections: Option<u32>,
|
||||
|
||||
#[arg(long)]
|
||||
db_idle_connection_timeout: Option<humantime::Duration>,
|
||||
|
||||
#[arg(long)]
|
||||
db_max_connection_lifetime: Option<humantime::Duration>,
|
||||
}
|
||||
|
||||
enum StrictMode {
|
||||
@@ -338,7 +347,7 @@ fn main() -> anyhow::Result<()> {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
// We use spawn_blocking for database operations, so require approximately
|
||||
// as many blocking threads as we will open database connections.
|
||||
.max_blocking_threads(Persistence::MAX_CONNECTIONS as usize)
|
||||
.max_blocking_threads(PersistenceConfig::MAX_CONNECTIONS_DEFAULT as usize)
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap()
|
||||
@@ -429,6 +438,19 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
None
|
||||
};
|
||||
|
||||
let db_idle_connection_timeout: Option<Duration> = args
|
||||
.db_idle_connection_timeout
|
||||
.map(humantime::Duration::into);
|
||||
let db_max_connection_lifetime: Option<Duration> = args
|
||||
.db_max_connection_lifetime
|
||||
.map(humantime::Duration::into);
|
||||
|
||||
let persistence_config = PersistenceConfig::new(
|
||||
args.db_max_connections,
|
||||
db_idle_connection_timeout,
|
||||
db_max_connection_lifetime,
|
||||
);
|
||||
|
||||
let config = Config {
|
||||
pageserver_jwt_token: secrets.pageserver_jwt_token,
|
||||
safekeeper_jwt_token: secrets.safekeeper_jwt_token,
|
||||
@@ -482,12 +504,13 @@ async fn async_main() -> anyhow::Result<()> {
|
||||
.map(humantime::Duration::into)
|
||||
.unwrap_or(Duration::MAX),
|
||||
handle_ps_local_disk_loss: args.handle_ps_local_disk_loss,
|
||||
persistence_config,
|
||||
};
|
||||
|
||||
// Validate that we can connect to the database
|
||||
Persistence::await_connection(&secrets.database_url, args.db_connect_timeout.into()).await?;
|
||||
|
||||
let persistence = Arc::new(Persistence::new(secrets.database_url).await);
|
||||
let persistence = Arc::new(Persistence::new(secrets.database_url, persistence_config).await);
|
||||
|
||||
let service = Service::spawn(config, persistence.clone()).await?;
|
||||
|
||||
|
||||
@@ -20,7 +20,8 @@ use futures::future::BoxFuture;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::controller_api::{
|
||||
AvailabilityZone, MetadataHealthRecord, NodeLifecycle, NodeSchedulingPolicy, PlacementPolicy,
|
||||
SafekeeperDescribeResponse, ShardSchedulingPolicy, SkSchedulingPolicy,
|
||||
SCSafekeeperTimelinesResponse, SafekeeperDescribeResponse, ShardSchedulingPolicy,
|
||||
SkSchedulingPolicy,
|
||||
};
|
||||
use pageserver_api::models::{ShardImportStatus, TenantConfig};
|
||||
use pageserver_api::shard::{
|
||||
@@ -37,10 +38,19 @@ use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use self::split_state::SplitState;
|
||||
use crate::hadron_queries::HadronSafekeeperRow;
|
||||
use crate::hadron_queries::PageserverAndSafekeeperConnectionInfo;
|
||||
use crate::hadron_queries::delete_timeline_safekeepers;
|
||||
use crate::hadron_queries::execute_safekeeper_list_timelines;
|
||||
use crate::hadron_queries::execute_sk_upsert;
|
||||
use crate::hadron_queries::get_pageserver_and_safekeeper_connection_info;
|
||||
use crate::hadron_queries::idempotently_persist_or_get_existing_timeline_safekeepers;
|
||||
use crate::hadron_queries::scan_safekeepers_and_scheduled_timelines;
|
||||
use crate::metrics::{
|
||||
DatabaseQueryErrorLabelGroup, DatabaseQueryLatencyLabelGroup, METRICS_REGISTRY,
|
||||
};
|
||||
use crate::node::Node;
|
||||
use crate::sk_node::SafeKeeperNode;
|
||||
use crate::timeline_import::{
|
||||
TimelineImport, TimelineImportUpdateError, TimelineImportUpdateFollowUp,
|
||||
};
|
||||
@@ -77,6 +87,38 @@ pub struct Persistence {
|
||||
connection_pool: Pool<AsyncPgConnection>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct PersistenceConfig {
|
||||
max_connections: u32,
|
||||
idle_connection_timeout: Duration,
|
||||
max_connection_lifetime: Duration,
|
||||
}
|
||||
|
||||
impl PersistenceConfig {
|
||||
// If unspecified, use neon.com defaults
|
||||
//
|
||||
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
|
||||
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
|
||||
pub const MAX_CONNECTIONS_DEFAULT: u32 = 99;
|
||||
// We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
|
||||
pub const IDLE_CONNECTION_TIMEOUT_DEFAULT: Duration = Duration::from_secs(10);
|
||||
pub const MAX_CONNECTION_LIFETIME_DEFAULT: Duration = Duration::from_secs(60);
|
||||
|
||||
pub fn new(
|
||||
max_connections: Option<u32>,
|
||||
idle_connection_timeout: Option<Duration>,
|
||||
max_connection_lifetime: Option<Duration>,
|
||||
) -> Self {
|
||||
PersistenceConfig {
|
||||
max_connections: max_connections.unwrap_or(Self::MAX_CONNECTIONS_DEFAULT),
|
||||
idle_connection_timeout: idle_connection_timeout
|
||||
.unwrap_or(Self::IDLE_CONNECTION_TIMEOUT_DEFAULT),
|
||||
max_connection_lifetime: max_connection_lifetime
|
||||
.unwrap_or(Self::MAX_CONNECTION_LIFETIME_DEFAULT),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Legacy format, for use in JSON compat objects in test environment
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct JsonPersistence {
|
||||
@@ -143,6 +185,20 @@ pub(crate) enum DatabaseOperation {
|
||||
DeleteTimelineImport,
|
||||
ListTimelineImports,
|
||||
IsTenantImportingTimeline,
|
||||
// Brickstore Hadron
|
||||
UpsertSafeKeeperNode,
|
||||
LoadSafeKeepersAndEndpoints,
|
||||
EnsureHadronEndpointTransaction,
|
||||
DeleteHadronEndpoint,
|
||||
GetHadronEndpointInfo,
|
||||
FetchComputeSpec,
|
||||
GetTenandIdByEndpointId,
|
||||
GetTenantShardsByEndpointId,
|
||||
GetComputeNamesByTenantId,
|
||||
GetOrCreateHadronTimelineSafekeeper,
|
||||
FetchPageServerAndSafeKeeperConnections,
|
||||
DeleteHadronTimeline,
|
||||
ListSafekeeperTimelines,
|
||||
}
|
||||
|
||||
#[must_use]
|
||||
@@ -179,11 +235,7 @@ impl Persistence {
|
||||
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
|
||||
pub const MAX_CONNECTIONS: u32 = 99;
|
||||
|
||||
// We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
|
||||
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
|
||||
|
||||
pub async fn new(database_url: String) -> Self {
|
||||
pub async fn new(database_url: String, config: PersistenceConfig) -> Self {
|
||||
let mut mgr_config = ManagerConfig::default();
|
||||
mgr_config.custom_setup = Box::new(establish_connection_rustls);
|
||||
|
||||
@@ -195,9 +247,9 @@ impl Persistence {
|
||||
// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
|
||||
// to execute queries (database queries are not generally on latency-sensitive paths).
|
||||
let connection_pool = Pool::builder()
|
||||
.max_size(Self::MAX_CONNECTIONS)
|
||||
.max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
|
||||
.idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
|
||||
.max_size(config.max_connections)
|
||||
.max_lifetime(Some(config.max_connection_lifetime))
|
||||
.idle_timeout(Some(config.idle_connection_timeout))
|
||||
// Always keep at least one connection ready to go
|
||||
.min_idle(Some(1))
|
||||
.test_on_check_out(true)
|
||||
@@ -2135,6 +2187,134 @@ impl Persistence {
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
//////////////////////// Hadron methods ////////////////////////
|
||||
//////////////////////// (Brickstore) //////////////////////////
|
||||
////////////////////////////////////////////////////////////////
|
||||
|
||||
/// Upsert a SafeKeeper node.
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn upsert_sk_node(&self, sk_node: &SafeKeeperNode) -> DatabaseResult<()> {
|
||||
let sk_row = sk_node.to_database_row();
|
||||
self.with_measured_conn(DatabaseOperation::UpsertSafeKeeperNode, move |conn| {
|
||||
// Incantation to make the borrow checker happy
|
||||
let sk_row_clone = sk_row.clone();
|
||||
Box::pin(async move { execute_sk_upsert(conn, sk_row_clone).await })
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Load all Safe Keeper nodes and their scheduled endpoints from the database. This method is called at startup to
|
||||
/// populate the SafeKeeperScheduler.
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn load_safekeeper_scheduling_data(
|
||||
&self,
|
||||
) -> DatabaseResult<HashMap<NodeId, SafeKeeperNode>> {
|
||||
let sk_nodes: HashMap<NodeId, SafeKeeperNode> = self
|
||||
.with_measured_conn(
|
||||
DatabaseOperation::LoadSafeKeepersAndEndpoints,
|
||||
move |conn| {
|
||||
// Retrieve all Safe Keeper nodes from the hadron_safekeepers table, and all timelines (grouped by
|
||||
// safe keeper IDs) from the hadron_timeline_safekeepers table.
|
||||
Box::pin(async move { scan_safekeepers_and_scheduled_timelines(conn).await })
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
tracing::info!(
|
||||
"load_safekeepers_and_endpoints: loaded {} safekeepers",
|
||||
sk_nodes.len()
|
||||
);
|
||||
|
||||
Ok(sk_nodes)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn get_or_assign_safekeepers_to_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
safekeepers: Vec<NodeId>,
|
||||
) -> DatabaseResult<Vec<NodeId>> {
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::GetOrCreateHadronTimelineSafekeeper,
|
||||
move |conn| {
|
||||
let safekeepers_clone = safekeepers.clone();
|
||||
Box::pin(async move {
|
||||
idempotently_persist_or_get_existing_timeline_safekeepers(
|
||||
conn,
|
||||
timeline_id,
|
||||
&safekeepers_clone,
|
||||
)
|
||||
.await
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn delete_hadron_timeline_safekeepers(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
) -> DatabaseResult<()> {
|
||||
self.with_measured_conn(DatabaseOperation::DeleteHadronTimeline, move |conn| {
|
||||
Box::pin(async move {
|
||||
delete_timeline_safekeepers(conn, timeline_id).await?;
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn get_pageserver_and_safekeepers(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> DatabaseResult<PageserverAndSafekeeperConnectionInfo> {
|
||||
self.with_measured_conn(
|
||||
DatabaseOperation::FetchPageServerAndSafeKeeperConnections,
|
||||
move |conn| {
|
||||
Box::pin(async move {
|
||||
get_pageserver_and_safekeeper_connection_info(conn, tenant_id, timeline_id)
|
||||
.await
|
||||
})
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn list_hadron_safekeepers(&self) -> DatabaseResult<Vec<HadronSafekeeperRow>> {
|
||||
let safekeepers: Vec<HadronSafekeeperRow> = self
|
||||
.with_measured_conn(DatabaseOperation::ListNodes, move |conn| {
|
||||
Box::pin(async move {
|
||||
Ok(crate::schema::hadron_safekeepers::table
|
||||
.load::<HadronSafekeeperRow>(conn)
|
||||
.await?)
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
tracing::info!(
|
||||
"list_hadron_safekeepers: loaded {} nodes",
|
||||
safekeepers.len()
|
||||
);
|
||||
|
||||
Ok(safekeepers)
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn safekeeper_list_timelines(
|
||||
&self,
|
||||
id: i64,
|
||||
) -> DatabaseResult<SCSafekeeperTimelinesResponse> {
|
||||
self.with_measured_conn(DatabaseOperation::ListSafekeeperTimelines, move |conn| {
|
||||
Box::pin(async move { execute_safekeeper_list_timelines(conn, id).await })
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn load_certs() -> anyhow::Result<Arc<rustls::RootCertStore>> {
|
||||
@@ -2223,15 +2403,45 @@ fn client_config_with_root_certs() -> anyhow::Result<rustls::ClientConfig> {
|
||||
})
|
||||
}
|
||||
|
||||
// Hadron's implementation of establish_connection_rustls which avoids hogging the tokio executor thread during
|
||||
// CPU-intensive operations in postgres connection and session establishments.
|
||||
// Compared to the original implementation this function performs the following tasks using spawn_blocking to avoid
|
||||
// hogging the tokio executor thread:
|
||||
// 1. Parsing and decoding root certificates during rustls client config setup.
|
||||
// 2. The tokio_postgres::connect() call, which performs the TLS handshake and the postgres password authentication.
|
||||
fn establish_connection_rustls(config: &str) -> BoxFuture<ConnectionResult<AsyncPgConnection>> {
|
||||
let fut = async {
|
||||
let fut = async move {
|
||||
// We first set up the way we want rustls to work.
|
||||
let rustls_config = client_config_with_root_certs()
|
||||
.map_err(|err| ConnectionError::BadConnection(format!("{err:?}")))?;
|
||||
let tls = tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config);
|
||||
let (client, conn) = tokio_postgres::connect(config, tls)
|
||||
let rustls_config = tokio::task::spawn_blocking(client_config_with_root_certs)
|
||||
.await
|
||||
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
|
||||
.map_err(|e| {
|
||||
ConnectionError::BadConnection(format!(
|
||||
"Error in spawn_blocking client_config_with_root_certs: {e}"
|
||||
))
|
||||
})
|
||||
.and_then(|r| {
|
||||
r.map_err(|e| {
|
||||
ConnectionError::BadConnection(format!(
|
||||
"Error in client_config_with_root_certs: {e}"
|
||||
))
|
||||
})
|
||||
})?;
|
||||
|
||||
let tls = tokio_postgres_rustls::MakeRustlsConnect::new(rustls_config);
|
||||
|
||||
// Perform the expensive TLS handshake and SCRAM SHA calculations in a blocking task
|
||||
let task_owned_config = config.to_owned();
|
||||
let (client, conn) = tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Handle::current()
|
||||
.block_on(async { tokio_postgres::connect(&task_owned_config, tls).await })
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ConnectionError::BadConnection(format!(
|
||||
"Error in spawn_blocking tokio_postgres::connect: {e}"
|
||||
))
|
||||
})
|
||||
.and_then(|r| r.map_err(|e| ConnectionError::BadConnection(e.to_string())))?;
|
||||
|
||||
AsyncPgConnection::try_from_client_and_connection(client, conn).await
|
||||
};
|
||||
|
||||
@@ -85,7 +85,7 @@ use crate::peer_client::GlobalObservedState;
|
||||
use crate::persistence::split_state::SplitState;
|
||||
use crate::persistence::{
|
||||
AbortShardSplitStatus, ControllerPersistence, DatabaseError, DatabaseResult,
|
||||
MetadataHealthPersistence, Persistence, ShardGenerationState, TenantFilter,
|
||||
MetadataHealthPersistence, Persistence, PersistenceConfig, ShardGenerationState, TenantFilter,
|
||||
TenantShardPersistence,
|
||||
};
|
||||
use crate::reconciler::{
|
||||
@@ -490,6 +490,8 @@ pub struct Config {
|
||||
|
||||
// Feature flag: Whether the storage controller should act to rectify pageserver-reported local disk loss.
|
||||
pub handle_ps_local_disk_loss: bool,
|
||||
|
||||
pub persistence_config: PersistenceConfig,
|
||||
}
|
||||
|
||||
impl From<DatabaseError> for ApiError {
|
||||
|
||||
56
storage_controller/src/sk_node.rs
Normal file
56
storage_controller/src/sk_node.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
use serde::Serialize;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use utils::id::{NodeId, TimelineId};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::hadron_queries::HadronSafekeeperRow;
|
||||
|
||||
// In-memory representation of a Safe Keeper node.
|
||||
#[derive(Clone, Serialize)]
|
||||
pub(crate) struct SafeKeeperNode {
|
||||
pub(crate) id: NodeId,
|
||||
pub(crate) listen_http_addr: String,
|
||||
pub(crate) listen_http_port: u16,
|
||||
pub(crate) listen_pg_addr: String,
|
||||
pub(crate) listen_pg_port: u16,
|
||||
|
||||
// All timelines scheduled to this SK node. Some of the timelines may be associated with
|
||||
// a legacy "endpoint", a deprecated concept used in HCC compute CRUD APIs. The "endpoint"
|
||||
// concept will be retired after Public Preview launch.
|
||||
pub(crate) timelines: HashSet<TimelineId>,
|
||||
// All legacy endpoints and their associated timelines scheduled to this SK node.
|
||||
// Invariant: The timelines referenced in this map must be present in the `timelines` set above.
|
||||
pub(crate) legacy_endpoints: HashMap<Uuid, TimelineId>,
|
||||
}
|
||||
|
||||
impl SafeKeeperNode {
|
||||
#[allow(unused)]
|
||||
pub(crate) fn new(
|
||||
id: NodeId,
|
||||
listen_http_addr: String,
|
||||
listen_http_port: u16,
|
||||
listen_pg_addr: String,
|
||||
listen_pg_port: u16,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
listen_http_addr,
|
||||
listen_http_port,
|
||||
listen_pg_addr,
|
||||
listen_pg_port,
|
||||
legacy_endpoints: HashMap::new(),
|
||||
timelines: HashSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn to_database_row(&self) -> HadronSafekeeperRow {
|
||||
HadronSafekeeperRow {
|
||||
sk_node_id: self.id.0 as i64,
|
||||
listen_http_addr: self.listen_http_addr.clone(),
|
||||
listen_http_port: self.listen_http_port as i32,
|
||||
listen_pg_addr: self.listen_pg_addr.clone(),
|
||||
listen_pg_port: self.listen_pg_port as i32,
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user