mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 21:50:37 +00:00
## Problem

To test sharding, we need something to control it. We could write Python code for doing this from the test runner, but this wouldn't be usable with neon_local run directly, and when we want to write tests with a large number of shards/tenants, Rust is a better fit for efficiently handling all the required state.

This service enables automated tests to easily get a system with sharding/HA without the test itself having to set this all up by hand: existing tests can be run against sharded tenants just by setting a shard count when creating the tenant.

## Summary of changes

Attachment service was previously a map of TenantId->TenantState, where the principal state stored for each tenant was the generation and the last attached pageserver. This enabled it to serve the re-attach and validate requests that the pageserver requires.

In this PR, the scope of the service is extended substantially to do overall management of tenants in the pageserver, including tenant/timeline creation, live migration, evacuation of offline pageservers, etc. This is done using synchronous code to make declarative changes to the tenant's intended state (`TenantState.policy` and `TenantState.intent`), which are then translated into calls into the pageserver by the `Reconciler`.

Top level summary of modules within `control_plane/attachment_service/src`:
- `tenant_state`: structure that represents one tenant shard.
- `service`: implements the main high-level operations such as tenant/timeline creation, marking a node offline, etc.
- `scheduler`: for operations that need to pick a pageserver for a tenant, construct a scheduler and call into it.
- `compute_hook`: receive notifications when a tenant shard is attached somewhere new. Once we have locations for all the shards in a tenant, emit an update to postgres configuration via the neon_local `LocalEnv`.
- `http`: HTTP stubs. These mostly map to methods on `Service`, but are separated for readability and so that it'll be easier to adapt if/when we switch to another RPC layer.
- `node`: structure that describes a pageserver node. The most important attribute of a node is its availability: marking a node offline causes tenant shards to reschedule away from it.

This PR is a precursor to implementing the full sharding service for prod (#6342).

What's the difference between this and a production-ready controller for pageservers?
- JSON file persistence, to be replaced with a database.
- Limited observability.
- No concurrency limits. Marking a pageserver offline will try to migrate every tenant to a new pageserver concurrently, even if there are thousands.
- Very simple scheduler that only knows to pick the pageserver with the fewest tenants, and to place secondary locations on a different pageserver than attached locations: it does not try to place shards for the same tenant on different pageservers. This matters little in tests, because picking the least-used pageserver usually results in round-robin placement.
- Scheduler state is rebuilt exhaustively for each operation that requires a scheduler.
- Relies on neon_local mechanisms for updating postgres: in production this would be something that flows through the real control plane.

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
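The scheduling policy described in the commit message (pick the pageserver with the fewest tenants, and keep a secondary location off the attached pageserver) can be sketched roughly as below. This is an illustration only, not the code in the `scheduler` module: `Scheduler`, `schedule_shard`, the `avoid` parameter, and the plain `u64` node ids are all hypothetical names chosen for the example.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a pageserver node id (not the real `NodeId` type).
type NodeId = u64;

/// Hypothetical scheduler: tracks how many tenant shards each available node holds.
struct Scheduler {
    tenant_counts: BTreeMap<NodeId, usize>,
}

impl Scheduler {
    /// Start with every known node holding zero shards.
    fn new(nodes: &[NodeId]) -> Self {
        Self {
            tenant_counts: nodes.iter().map(|n| (*n, 0)).collect(),
        }
    }

    /// Pick the node with the fewest shards, skipping any node listed in `avoid`
    /// (for example the attached node, when placing a secondary location).
    /// Ties go to the lowest node id, because `BTreeMap` iterates in key order
    /// and `min_by_key` returns the first minimum.
    fn schedule_shard(&mut self, avoid: &[NodeId]) -> Option<NodeId> {
        let node = self
            .tenant_counts
            .iter()
            .filter(|(id, _)| !avoid.contains(*id))
            .min_by_key(|(_, count)| **count)
            .map(|(id, _)| *id)?;
        *self.tenant_counts.get_mut(&node)? += 1;
        Some(node)
    }
}
```

With equally loaded nodes this degenerates into round-robin placement, which matches the commit message's remark that the simple policy is good enough for tests.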
117 lines
4.1 KiB
Rust
use std::collections::HashMap;

use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use pageserver_api::shard::{ShardCount, ShardIndex, TenantShardId};
use postgres_connection::parse_host_port;
use utils::id::{NodeId, TenantId};

pub(super) struct ComputeHookTenant {
    shards: Vec<(ShardIndex, NodeId)>,
}

impl ComputeHookTenant {
    pub(super) async fn maybe_reconfigure(&mut self, tenant_id: TenantId) -> anyhow::Result<()> {
        // Find the highest shard count and drop any shards that aren't
        // for that shard count.
        let shard_count = self.shards.iter().map(|(k, _v)| k.shard_count).max();
        let Some(shard_count) = shard_count else {
            // No shards, nothing to do.
            tracing::info!("ComputeHookTenant::maybe_reconfigure: no shards");
            return Ok(());
        };

        self.shards.retain(|(k, _v)| k.shard_count == shard_count);
        self.shards
            .sort_by_key(|(shard, _node_id)| shard.shard_number);

        if self.shards.len() == shard_count.0 as usize || shard_count == ShardCount(0) {
            // We have pageservers for all the shards: proceed to reconfigure compute
            let env = match LocalEnv::load_config() {
                Ok(e) => e,
                Err(e) => {
                    tracing::warn!(
                        "Couldn't load neon_local config, skipping compute update ({e})"
                    );
                    return Ok(());
                }
            };
            let cplane = ComputeControlPlane::load(env.clone())
                .expect("Error loading compute control plane");

            let compute_pageservers = self
                .shards
                .iter()
                .map(|(_shard, node_id)| {
                    let ps_conf = env
                        .get_pageserver_conf(*node_id)
                        .expect("Unknown pageserver");
                    let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
                        .expect("Unable to parse listen_pg_addr");
                    (pg_host, pg_port.unwrap_or(5432))
                })
                .collect::<Vec<_>>();

            for (endpoint_name, endpoint) in &cplane.endpoints {
                if endpoint.tenant_id == tenant_id && endpoint.status() == "running" {
                    tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,);
                    endpoint.reconfigure(compute_pageservers.clone()).await?;
                }
            }
        } else {
            tracing::info!(
                "ComputeHookTenant::maybe_reconfigure: not enough shards ({}/{})",
                self.shards.len(),
                shard_count.0
            );
        }

        Ok(())
    }
}

/// The compute hook is a destination for notifications about changes to tenant:pageserver
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
/// the compute connection string.
pub(super) struct ComputeHook {
    state: tokio::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
}

impl ComputeHook {
    pub(super) fn new() -> Self {
        Self {
            state: Default::default(),
        }
    }

    pub(super) async fn notify(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: NodeId,
    ) -> anyhow::Result<()> {
        tracing::info!("ComputeHook::notify: {}->{}", tenant_shard_id, node_id);
        let mut locked = self.state.lock().await;
        let entry = locked
            .entry(tenant_shard_id.tenant_id)
            .or_insert_with(|| ComputeHookTenant { shards: Vec::new() });

        let shard_index = ShardIndex {
            shard_count: tenant_shard_id.shard_count,
            shard_number: tenant_shard_id.shard_number,
        };

        let mut set = false;
        for (existing_shard, existing_node) in &mut entry.shards {
            if *existing_shard == shard_index {
                *existing_node = node_id;
                set = true;
            }
        }
        if !set {
            entry.shards.push((shard_index, node_id));
        }

        entry.maybe_reconfigure(tenant_shard_id.tenant_id).await
    }
}
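
The shard-aggregation step at the top of `maybe_reconfigure` can be exercised in isolation. The sketch below is a simplified, synchronous model of that logic and not part of this module: it assumes plain integer stand-ins for `ShardCount`, `ShardIndex` and `NodeId`, and `ready_nodes` is a hypothetical helper name. It also assumes, as the `ShardCount(0)` check in the listing suggests, that a shard count of zero denotes an unsharded tenant with a single location.

```rust
/// Simplified stand-in for `ShardIndex` (hypothetical: plain `u8` fields
/// instead of the real `ShardNumber`/`ShardCount` types).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ShardIndex {
    shard_number: u8,
    shard_count: u8,
}

/// Hypothetical stand-in for a pageserver node id.
type NodeId = u64;

/// Returns the node ids in shard-number order once every shard of the highest
/// known shard count has a location, or `None` while the set is incomplete.
/// Entries belonging to a lower (stale) shard count are dropped from `shards`.
fn ready_nodes(shards: &mut Vec<(ShardIndex, NodeId)>) -> Option<Vec<NodeId>> {
    // Highest shard count seen so far; `None` means there are no shards at all.
    let shard_count = shards.iter().map(|(k, _)| k.shard_count).max()?;

    shards.retain(|(k, _)| k.shard_count == shard_count);
    shards.sort_by_key(|(k, _)| k.shard_number);

    // Assumption: a count of 0 marks an unsharded tenant, which is always complete.
    if shards.len() == shard_count as usize || shard_count == 0 {
        Some(shards.iter().map(|(_, node)| *node).collect())
    } else {
        None
    }
}
```

Calling `ready_nodes` after each notification mirrors how `notify` upserts one `(ShardIndex, NodeId)` pair and then asks whether the tenant's location set is complete enough to reconfigure compute.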