Compare commits

...

5 Commits

Author      SHA1        Message                                              Date
John Spray  5accdf03d0  control_plane: connection pool                       2024-02-03 22:14:13 +00:00
John Spray  8029ab38e0  tests: basic scale test                              2024-02-03 22:14:13 +00:00
John Spray  0b59599326  control_plane: logging improvements                  2024-02-03 22:14:13 +00:00
John Spray  39cb20a38b  fix tenant*                                          2024-02-03 22:14:13 +00:00
John Spray  55d73f461c  control_plane/attachment_service: better Scheduler   2024-02-03 22:14:10 +00:00
9 changed files with 475 additions and 164 deletions

Cargo.lock (generated, 22 changed lines)

@@ -288,6 +288,7 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"postgres_connection",
"r2d2",
"reqwest",
"serde",
"serde_json",
@@ -1650,6 +1651,7 @@ dependencies = [
"diesel_derives",
"itoa",
"pq-sys",
"r2d2",
"serde_json",
]
@@ -4153,6 +4155,17 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r2d2"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93"
dependencies = [
"log",
"parking_lot 0.12.1",
"scheduled-thread-pool",
]
[[package]]
name = "rand"
version = "0.7.3"
@@ -4866,6 +4879,15 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "scheduled-thread-pool"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cbc66816425a074528352f5789333ecff06ca41b36b0b0efdfbb29edc391a19"
dependencies = [
"parking_lot 0.12.1",
]
[[package]]
name = "scopeguard"
version = "1.1.0"


@@ -24,7 +24,8 @@ tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
-diesel = { version = "2.1.4", features = ["serde_json", "postgres"] }
+diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
+r2d2 = { version = "0.8.10" }
utils = { path = "../../libs/utils/" }
metrics = { path = "../../libs/metrics/" }


@@ -415,7 +415,7 @@ pub fn make_router(
tenant_service_handler(r, handle_tenant_timeline_create)
})
// Tenant detail GET passthrough to shard zero
.get("/v1/tenant/:tenant_id*", |r| {
.get("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(r, handle_tenant_timeline_passthrough)
})
// Timeline GET passthrough to shard zero. Note that the `*` in the URL is a wildcard: any future
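
The route fix above drops the trailing `*` from the tenant detail route. With the wildcard, `/v1/tenant/:tenant_id*` matches any path that merely begins with a tenant id (including sub-paths such as `/v1/tenant/<id>/timeline/<id>`), so it can shadow more specific routes; without it, the route captures exactly one path segment. A rough, self-contained sketch of the two behaviors (a hypothetical matcher written for illustration, not the actual routing library):

/// Hypothetical illustration: ":name" captures one path segment, while a
/// trailing "*" greedily matches the rest of the path.
fn matches(pattern: &str, path: &str) -> bool {
    let pat: Vec<&str> = pattern.trim_matches('/').split('/').collect();
    let seg: Vec<&str> = path.trim_matches('/').split('/').collect();
    for (i, p) in pat.iter().enumerate() {
        if p.ends_with('*') {
            return true; // wildcard consumes this segment and everything after it
        }
        match seg.get(i) {
            Some(s) if p.starts_with(':') || p == s => continue,
            _ => return false,
        }
    }
    pat.len() == seg.len()
}

fn main() {
    // The old pattern also swallows timeline sub-paths:
    assert!(matches("/v1/tenant/:tenant_id*", "/v1/tenant/1234/timeline/abcd"));
    // The fixed pattern matches exactly one segment:
    assert!(matches("/v1/tenant/:tenant_id", "/v1/tenant/1234"));
    assert!(!matches("/v1/tenant/:tenant_id", "/v1/tenant/1234/timeline/abcd"));
}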


@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use camino::Utf8Path;
use camino::Utf8PathBuf;
@@ -44,7 +45,7 @@ use crate::PlacementPolicy;
/// updated, and reads of nodes are always from memory, not the database. We only require that
/// we can UPDATE a node's scheduling mode reasonably quickly to mark a bad node offline.
pub struct Persistence {
-database_url: String,
+connection_pool: diesel::r2d2::Pool<diesel::r2d2::ConnectionManager<PgConnection>>,
// In test environments, we support loading+saving a JSON file. This is temporary, for the benefit of
// test_compatibility.py, so that we don't have to commit to making the database contents fully backward/forward
@@ -64,6 +65,8 @@ pub(crate) enum DatabaseError {
Query(#[from] diesel::result::Error),
#[error(transparent)]
Connection(#[from] diesel::result::ConnectionError),
#[error(transparent)]
ConnectionPool(#[from] r2d2::Error),
#[error("Logical error: {0}")]
Logical(String),
}
@@ -71,9 +74,31 @@ pub(crate) enum DatabaseError {
pub(crate) type DatabaseResult<T> = Result<T, DatabaseError>;
impl Persistence {
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
const MAX_CONNECTIONS: u32 = 99;
// We don't want to keep a lot of connections alive: close them down promptly if they aren't being used.
const IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_CONNECTION_LIFETIME: Duration = Duration::from_secs(60);
pub fn new(database_url: String, json_path: Option<Utf8PathBuf>) -> Self {
let manager = diesel::r2d2::ConnectionManager::<PgConnection>::new(database_url);
// We will use a connection pool: this is primarily to _limit_ our connection count, rather than to optimize time
// to execute queries (database queries are not generally on latency-sensitive paths).
let connection_pool = diesel::r2d2::Pool::builder()
.max_size(Self::MAX_CONNECTIONS)
.max_lifetime(Some(Self::MAX_CONNECTION_LIFETIME))
.idle_timeout(Some(Self::IDLE_CONNECTION_TIMEOUT))
// Always keep at least one connection ready to go
.min_idle(Some(1))
.test_on_check_out(true)
.build(manager)
.expect("Could not build connection pool");
Self {
-database_url,
+connection_pool,
json_path,
}
}
@@ -84,14 +109,10 @@ impl Persistence {
F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
-let database_url = self.database_url.clone();
-tokio::task::spawn_blocking(move || -> DatabaseResult<R> {
-// TODO: connection pooling, such as via diesel::r2d2
-let mut conn = PgConnection::establish(&database_url)?;
-func(&mut conn)
-})
-.await
-.expect("Task panic")
+let mut conn = self.connection_pool.get()?;
+tokio::task::spawn_blocking(move || -> DatabaseResult<R> { func(&mut conn) })
+.await
+.expect("Task panic")
}
/// When a node is first registered, persist it before using it for anything
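
Since diesel's PgConnection is synchronous, queries still run under tokio::task::spawn_blocking; as the comment in the diff notes, the pool exists primarily to cap the total connection count rather than to make individual queries faster. For reference, the construction above reduces to this standalone sketch (the database URL and the main() harness are placeholders, not part of the change):

use std::time::Duration;

use diesel::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool};

// Sketch of the pool configuration used above: bounded size, one warm spare,
// prompt recycling of idle or long-lived connections.
fn build_pool(database_url: &str) -> Pool<ConnectionManager<PgConnection>> {
    let manager = ConnectionManager::<PgConnection>::new(database_url);
    Pool::builder()
        .max_size(99) // stay below the default postgres limit of 100
        .min_idle(Some(1)) // always keep at least one connection ready to go
        .idle_timeout(Some(Duration::from_secs(10)))
        .max_lifetime(Some(Duration::from_secs(60)))
        .test_on_check_out(true) // validate connections before handing them out
        .build(manager)
        .expect("Could not build connection pool")
}

fn main() {
    // Placeholder URL: any reachable postgres database would do for the sketch.
    let pool = build_pool("postgresql://localhost/attachment_service");
    // get() blocks until a pooled connection is free, which is what caps the
    // service's total connection count at max_size.
    let _conn = pool.get().expect("no connection available");
}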


@@ -26,7 +26,7 @@ pub(super) struct Reconciler {
pub(super) tenant_shard_id: TenantShardId,
pub(crate) shard: ShardIdentity,
pub(crate) generation: Generation,
-pub(crate) intent: IntentState,
+pub(crate) intent: TargetState,
pub(crate) config: TenantConfig,
pub(crate) observed: ObservedState,
@@ -57,6 +57,32 @@ pub(super) struct Reconciler {
pub(crate) persistence: Arc<Persistence>,
}
/// This is a snapshot of [`crate::tenant_state::IntentState`], but it does not do any
/// reference counting for Scheduler. The IntentState is what the scheduler works with,
/// and the TargetState is just the instruction for a particular Reconciler run.
#[derive(Debug)]
pub(crate) struct TargetState {
pub(crate) attached: Option<NodeId>,
pub(crate) secondary: Vec<NodeId>,
}
impl TargetState {
pub(crate) fn from_intent(intent: &IntentState) -> Self {
Self {
attached: *intent.get_attached(),
secondary: intent.get_secondary().clone(),
}
}
fn all_pageservers(&self) -> Vec<NodeId> {
let mut result = self.secondary.clone();
if let Some(node_id) = &self.attached {
result.push(*node_id);
}
result
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileError {
#[error(transparent)]
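
The TargetState introduced here is the snapshot half of a snapshot-then-spawn pattern: the intent is copied into plain data while the service lock is held, and the background Reconciler works from the copy, so only the long-lived IntentState participates in the Scheduler's reference counting. A generic sketch of the idea (hypothetical types; tokio assumed, as elsewhere in the crate):

use std::sync::{Arc, Mutex};

// Live state: mutated under a lock, tied to scheduler refcounts.
struct LiveIntent {
    attached: Option<u64>,
    secondary: Vec<u64>,
}

// Plain-data snapshot handed to the background task.
#[derive(Debug)]
struct Target {
    attached: Option<u64>,
    secondary: Vec<u64>,
}

fn snapshot(live: &LiveIntent) -> Target {
    Target {
        attached: live.attached,
        secondary: live.secondary.clone(),
    }
}

#[tokio::main]
async fn main() {
    let live = Arc::new(Mutex::new(LiveIntent { attached: Some(1), secondary: vec![2] }));
    // Copy out the target while holding the lock...
    let target = snapshot(&live.lock().unwrap());
    // ...then reconcile from the copy, without holding the lock or refcounts.
    tokio::spawn(async move {
        println!("reconciling toward {target:?}");
    })
    .await
    .unwrap();
}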


@@ -1,9 +1,7 @@
-use pageserver_api::shard::TenantShardId;
-use std::collections::{BTreeMap, HashMap};
+use crate::node::Node;
+use std::collections::HashMap;
use utils::{http::error::ApiError, id::NodeId};
-use crate::{node::Node, tenant_state::TenantState};
/// Scenarios in which we cannot find a suitable location for a tenant shard
#[derive(thiserror::Error, Debug)]
pub enum ScheduleError {
@@ -19,52 +17,88 @@ impl From<ScheduleError> for ApiError {
}
}
struct SchedulerNode {
/// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
shard_count: usize,
/// Whether this node is currently eligible to have new shards scheduled (this is derived
/// from a node's availability state and scheduling policy).
may_schedule: bool,
}
pub(crate) struct Scheduler {
-tenant_counts: HashMap<NodeId, usize>,
+nodes: HashMap<NodeId, SchedulerNode>,
}
impl Scheduler {
-pub(crate) fn new(
-tenants: &BTreeMap<TenantShardId, TenantState>,
-nodes: &HashMap<NodeId, Node>,
-) -> Self {
-let mut tenant_counts = HashMap::new();
-for node_id in nodes.keys() {
-tenant_counts.insert(*node_id, 0);
-}
-for tenant in tenants.values() {
-if let Some(ps) = tenant.intent.attached {
-let entry = tenant_counts.entry(ps).or_insert(0);
-*entry += 1;
-}
-}
+pub(crate) fn new(nodes: &HashMap<NodeId, Node>) -> Self {
+let mut scheduler_nodes = HashMap::new();
for (node_id, node) in nodes {
-if !node.may_schedule() {
-tenant_counts.remove(node_id);
-}
+scheduler_nodes.insert(
+*node_id,
+SchedulerNode {
+shard_count: 0,
+may_schedule: node.may_schedule(),
+},
+);
}
-Self { tenant_counts }
+Self {
+nodes: scheduler_nodes,
+}
}
pub(crate) fn node_ref(&mut self, node_id: NodeId) {
let Some(node) = self.nodes.get_mut(&node_id) else {
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return;
};
node.shard_count += 1;
}
pub(crate) fn node_deref(&mut self, node_id: NodeId) {
let Some(node) = self.nodes.get_mut(&node_id) else {
debug_assert!(false);
tracing::error!("Scheduler missing node {node_id}");
return;
};
node.shard_count -= 1;
}
pub(crate) fn node_upsert(&mut self, node_id: NodeId, may_schedule: bool) {
use std::collections::hash_map::Entry::*;
match self.nodes.entry(node_id) {
Occupied(mut entry) => {
entry.get_mut().may_schedule = may_schedule;
}
Vacant(entry) => {
entry.insert(SchedulerNode {
shard_count: 0,
may_schedule,
});
}
}
}
pub(crate) fn schedule_shard(
&mut self,
hard_exclude: &[NodeId],
) -> Result<NodeId, ScheduleError> {
-if self.tenant_counts.is_empty() {
+if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
}
let mut tenant_counts: Vec<(NodeId, usize)> = self
-.tenant_counts
+.nodes
.iter()
.filter_map(|(k, v)| {
-if hard_exclude.contains(k) {
+if hard_exclude.contains(k) || !v.may_schedule {
None
} else {
-Some((*k, *v))
+Some((*k, v.shard_count))
}
})
.collect();
@@ -83,7 +117,10 @@ impl Scheduler {
let node_id = tenant_counts.first().unwrap().0;
tracing::info!("scheduler selected node {node_id}");
-*self.tenant_counts.get_mut(&node_id).unwrap() += 1;
+// Note that we do not update shard count here to reflect the scheduling: that
+// is IntentState's job when the scheduled location is used.
Ok(node_id)
}
}
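
Taken together, the Scheduler now tracks one SchedulerNode per pageserver (a shard_count kept up to date by IntentState, plus a may_schedule flag derived from availability and policy), and schedule_shard picks the least-loaded eligible node outside hard_exclude. The selection reduces to roughly this standalone sketch (NodeId simplified to u64; the diff's version collects into a vector and sorts, which additionally gives a stable tie-break):

use std::collections::HashMap;

struct SchedulerNode {
    shard_count: usize,
    may_schedule: bool,
}

// Filter out excluded or ineligible nodes, then take the emptiest one.
fn schedule_shard(nodes: &HashMap<u64, SchedulerNode>, hard_exclude: &[u64]) -> Option<u64> {
    nodes
        .iter()
        .filter(|(id, node)| !hard_exclude.contains(*id) && node.may_schedule)
        .min_by_key(|(_, node)| node.shard_count)
        .map(|(id, _)| *id)
}

fn main() {
    let mut nodes = HashMap::new();
    nodes.insert(1, SchedulerNode { shard_count: 5, may_schedule: true });
    nodes.insert(2, SchedulerNode { shard_count: 1, may_schedule: true });
    nodes.insert(3, SchedulerNode { shard_count: 0, may_schedule: false });
    // Node 3 is emptiest but not eligible; node 2 beats node 1 on load.
    assert_eq!(schedule_shard(&nodes, &[]), Some(2));
    assert_eq!(schedule_shard(&nodes, &[2]), Some(1));
}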


@@ -61,6 +61,8 @@ struct ServiceState {
nodes: Arc<HashMap<NodeId, Node>>,
scheduler: Scheduler,
compute_hook: Arc<ComputeHook>,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
@@ -72,14 +74,26 @@ impl ServiceState {
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
nodes: HashMap<NodeId, Node>,
tenants: BTreeMap<TenantShardId, TenantState>,
scheduler: Scheduler,
) -> Self {
Self {
tenants,
nodes: Arc::new(nodes),
scheduler,
compute_hook: Arc::new(ComputeHook::new(config)),
result_tx,
}
}
fn parts_mut(
&mut self,
) -> (
&mut Arc<HashMap<NodeId, Node>>,
&mut BTreeMap<TenantShardId, TenantState>,
&mut Scheduler,
) {
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
}
}
#[derive(Clone)]
@@ -103,7 +117,9 @@ impl From<DatabaseError> for ApiError {
match err {
DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
// FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
-DatabaseError::Connection(_e) => ApiError::ShuttingDown,
+DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
+ApiError::ShuttingDown
+}
DatabaseError::Logical(reason) => {
ApiError::InternalServerError(anyhow::anyhow!(reason))
}
@@ -180,8 +196,10 @@ impl Service {
// Populate intent and observed states for all tenants, based on reported state on pageservers
let shard_count = {
let mut locked = self.inner.write().unwrap();
+let (_nodes, tenants, scheduler) = locked.parts_mut();
for (tenant_shard_id, (node_id, observed_loc)) in observed {
-let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
+let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
cleanup.push((tenant_shard_id, node_id));
continue;
};
@@ -193,10 +211,9 @@ impl Service {
}
// Populate each tenant's intent state
-let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
-for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
+for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
tenant_state.intent_from_observed();
-if let Err(e) = tenant_state.schedule(&mut scheduler) {
+if let Err(e) = tenant_state.schedule(scheduler) {
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
// not enough pageservers are available. The tenant may well still be available
// to clients.
@@ -327,6 +344,8 @@ impl Service {
let mut tenants = BTreeMap::new();
let mut scheduler = Scheduler::new(&nodes);
for tsp in tenant_shard_persistence {
let tenant_shard_id = TenantShardId {
tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
@@ -347,7 +366,10 @@ impl Service {
// it with what we can infer: the node for which a generation was most recently issued.
let mut intent = IntentState::new();
if tsp.generation_pageserver != i64::MAX {
-intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
+intent.set_attached(
+&mut scheduler,
+Some(NodeId(tsp.generation_pageserver as u64)),
+);
}
let new_tenant = TenantState {
@@ -377,6 +399,7 @@ impl Service {
result_tx,
nodes,
tenants,
scheduler,
))),
config,
persistence,
@@ -386,11 +409,14 @@ impl Service {
let result_task_this = this.clone();
tokio::task::spawn(async move {
while let Some(result) = result_rx.recv().await {
-tracing::info!(
-"Reconcile result for sequence {}, ok={}",
-result.sequence,
-result.result.is_ok()
+let span = tracing::span!(tracing::Level::INFO, "reconcile_result",
+tenant_id=%result.tenant_shard_id.tenant_id,
+shard_id=%result.tenant_shard_id.shard_slug(),
+sequence=%result.sequence,
);
+let _span = span.enter();
+tracing::info!("Handling ReconcileResult, ok={}", result.result.is_ok());
let mut locked = result_task_this.inner.write().unwrap();
let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
// A reconciliation result might race with removing a tenant: drop results for
@@ -518,8 +544,9 @@ impl Service {
};
let mut locked = self.inner.write().unwrap();
-let tenant_state = locked
-.tenants
+let (_nodes, tenants, scheduler) = locked.parts_mut();
+let tenant_state = tenants
.get_mut(&attach_req.tenant_shard_id)
.expect("Checked for existence above");
@@ -539,7 +566,7 @@ impl Service {
generation = ?tenant_state.generation,
"issuing",
);
-} else if let Some(ps_id) = tenant_state.intent.attached {
+} else if let Some(ps_id) = tenant_state.intent.get_attached() {
tracing::info!(
tenant_id = %attach_req.tenant_shard_id,
%ps_id,
@@ -551,7 +578,9 @@ impl Service {
tenant_id = %attach_req.tenant_shard_id,
"no-op: tenant already has no pageserver");
}
-tenant_state.intent.attached = attach_req.node_id;
+tenant_state
+.intent
+.set_attached(scheduler, attach_req.node_id);
tracing::info!(
"attach_hook: tenant {} set generation {:?}, pageserver {}",
@@ -576,7 +605,7 @@ impl Service {
InspectResponse {
attachment: tenant_state.and_then(|s| {
s.intent
-.attached
+.get_attached()
.map(|ps| (s.generation.into().unwrap(), ps))
}),
}
@@ -728,16 +757,15 @@ impl Service {
let (waiters, response_shards) = {
let mut locked = self.inner.write().unwrap();
+let (_nodes, tenants, scheduler) = locked.parts_mut();
let mut response_shards = Vec::new();
-let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
for tenant_shard_id in create_ids {
tracing::info!("Creating shard {tenant_shard_id}...");
use std::collections::btree_map::Entry;
-match locked.tenants.entry(tenant_shard_id) {
+match tenants.entry(tenant_shard_id) {
Entry::Occupied(mut entry) => {
tracing::info!(
"Tenant shard {tenant_shard_id} already exists while creating"
@@ -747,7 +775,7 @@ impl Service {
// attached and secondary locations (independently) away from those
// pageservers also holding a shard for this tenant.
-entry.get_mut().schedule(&mut scheduler).map_err(|e| {
+entry.get_mut().schedule(scheduler).map_err(|e| {
ApiError::Conflict(format!(
"Failed to schedule shard {tenant_shard_id}: {e}"
))
@@ -758,7 +786,7 @@ impl Service {
node_id: entry
.get()
.intent
-.attached
+.get_attached()
.expect("We just set pageserver if it was None"),
generation: entry.get().generation.into().unwrap(),
});
@@ -780,7 +808,7 @@ impl Service {
}
state.config = create_req.config.clone();
-state.schedule(&mut scheduler).map_err(|e| {
+state.schedule(scheduler).map_err(|e| {
ApiError::Conflict(format!(
"Failed to schedule shard {tenant_shard_id}: {e}"
))
@@ -790,7 +818,7 @@ impl Service {
shard_id: tenant_shard_id,
node_id: state
.intent
-.attached
+.get_attached()
.expect("We just set pageserver if it was None"),
generation: state.generation.into().unwrap(),
});
@@ -866,16 +894,11 @@ impl Service {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
-let pageservers = locked.nodes.clone();
-let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
+let (nodes, tenants, scheduler) = locked.parts_mut();
// Maybe we have existing shards
let mut create = true;
-for (shard_id, shard) in locked
-.tenants
-.range_mut(TenantShardId::tenant_range(tenant_id))
-{
+for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
// Saw an existing shard: this is not a creation
create = false;
@@ -899,7 +922,7 @@ impl Service {
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// TODO: persistence for changes in policy
-if pageservers.len() > 1 {
+if nodes.len() > 1 {
shard.policy = PlacementPolicy::Double(1)
} else {
// Convenience for dev/test: if we just have one pageserver, import
@@ -909,11 +932,11 @@ impl Service {
}
}
-shard.schedule(&mut scheduler)?;
+shard.schedule(scheduler)?;
let maybe_waiter = shard.maybe_reconcile(
result_tx.clone(),
-&pageservers,
+nodes,
&compute_hook,
&self.config,
&self.persistence,
@@ -922,10 +945,10 @@ impl Service {
waiters.push(waiter);
}
-if let Some(node_id) = shard.intent.attached {
+if let Some(node_id) = shard.intent.get_attached() {
result.shards.push(TenantShardLocation {
shard_id: *shard_id,
-node_id,
+node_id: *node_id,
})
}
}
@@ -1002,7 +1025,7 @@ impl Service {
for (tenant_shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
-let node_id = shard.intent.attached.ok_or_else(|| {
+let node_id = shard.intent.get_attached().ok_or_else(|| {
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
})?;
let node = locked
@@ -1061,9 +1084,16 @@ impl Service {
// Drop in-memory state
{
let mut locked = self.inner.write().unwrap();
-locked
-.tenants
-.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
+let (_nodes, tenants, scheduler) = locked.parts_mut();
+// Dereference Scheduler from shards before dropping them
+for (_tenant_shard_id, shard) in
+tenants.range_mut(TenantShardId::tenant_range(tenant_id))
+{
+shard.intent.clear(scheduler);
+}
+tenants.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
tracing::info!(
"Deleted tenant {tenant_id}, now have {} tenants",
locked.tenants.len()
@@ -1097,7 +1127,7 @@ impl Service {
for (tenant_shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
-let node_id = shard.intent.attached.ok_or_else(|| {
+let node_id = shard.intent.get_attached().ok_or_else(|| {
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
})?;
let node = locked
@@ -1177,7 +1207,7 @@ impl Service {
for (tenant_shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
-let node_id = shard.intent.attached.ok_or_else(|| {
+let node_id = shard.intent.get_attached().ok_or_else(|| {
ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
})?;
let node = locked
@@ -1249,13 +1279,13 @@ impl Service {
// TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
// point to somewhere we haven't attached yet.
-let Some(node_id) = shard.intent.attached else {
+let Some(node_id) = shard.intent.get_attached() else {
return Err(ApiError::Conflict(
"Cannot call timeline API on non-attached tenant".to_string(),
));
};
-let Some(node) = locked.nodes.get(&node_id) else {
+let Some(node) = locked.nodes.get(node_id) else {
// This should never happen
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Shard refers to nonexistent node"
@@ -1280,12 +1310,13 @@ impl Service {
for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
-let node_id = shard
-.intent
-.attached
-.ok_or(ApiError::BadRequest(anyhow::anyhow!(
-"Cannot locate a tenant that is not attached"
-)))?;
+let node_id =
+shard
+.intent
+.get_attached()
+.ok_or(ApiError::BadRequest(anyhow::anyhow!(
+"Cannot locate a tenant that is not attached"
+)))?;
let node = pageservers
.get(&node_id)
@@ -1349,35 +1380,34 @@ impl Service {
) -> Result<TenantShardMigrateResponse, ApiError> {
let waiter = {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
-let pageservers = locked.nodes.clone();
let compute_hook = locked.compute_hook.clone();
+let (nodes, tenants, scheduler) = locked.parts_mut();
-let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
+let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant shard not found").into(),
));
};
-if shard.intent.attached == Some(migrate_req.node_id) {
+if shard.intent.get_attached() == &Some(migrate_req.node_id) {
// No-op case: we will still proceed to wait for reconciliation in case it is
// incomplete from an earlier update to the intent.
tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
} else {
-let old_attached = shard.intent.attached;
+let old_attached = *shard.intent.get_attached();
match shard.policy {
PlacementPolicy::Single => {
-shard.intent.secondary.clear();
+shard.intent.clear_secondary(scheduler);
}
PlacementPolicy::Double(_n) => {
// If our new attached node was a secondary, it no longer should be.
-shard.intent.secondary.retain(|s| s != &migrate_req.node_id);
+shard.intent.remove_secondary(scheduler, migrate_req.node_id);
// If we were already attached to something, demote that to a secondary
if let Some(old_attached) = old_attached {
-shard.intent.secondary.push(old_attached);
+shard.intent.push_secondary(scheduler, old_attached);
}
}
PlacementPolicy::Detached => {
@@ -1386,7 +1416,9 @@ impl Service {
)))
}
}
-shard.intent.attached = Some(migrate_req.node_id);
+shard
+.intent
+.set_attached(scheduler, Some(migrate_req.node_id));
tracing::info!("Migrating: new intent {:?}", shard.intent);
shard.sequence = shard.sequence.next();
@@ -1394,7 +1426,7 @@ impl Service {
shard.maybe_reconcile(
result_tx,
-&pageservers,
+nodes,
&compute_hook,
&self.config,
&self.persistence,
@@ -1478,6 +1510,9 @@ impl Service {
let mut locked = self.inner.write().unwrap();
let mut new_nodes = (*locked.nodes).clone();
locked
.scheduler
.node_upsert(register_req.node_id, new_node.may_schedule());
new_nodes.insert(register_req.node_id, new_node);
locked.nodes = Arc::new(new_nodes);
@@ -1494,8 +1529,9 @@ impl Service {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
+let (nodes, tenants, scheduler) = locked.parts_mut();
-let mut new_nodes = (*locked.nodes).clone();
+let mut new_nodes = (**nodes).clone();
let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
return Err(ApiError::NotFound(
@@ -1531,11 +1567,14 @@ impl Service {
// to wake up and start working.
}
// Update the scheduler, in case the eligibility of the node for new shards has changed
scheduler.node_upsert(node.id, node.may_schedule());
let new_nodes = Arc::new(new_nodes);
-let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes);
if offline_transition {
-for (tenant_shard_id, tenant_state) in &mut locked.tenants {
+let mut tenants_affected: usize = 0;
+for (tenant_shard_id, tenant_state) in tenants {
if let Some(observed_loc) =
tenant_state.observed.locations.get_mut(&config_req.node_id)
{
@@ -1546,7 +1585,7 @@ impl Service {
if tenant_state.intent.notify_offline(config_req.node_id) {
tenant_state.sequence = tenant_state.sequence.next();
-match tenant_state.schedule(&mut scheduler) {
+match tenant_state.schedule(scheduler) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
@@ -1554,17 +1593,27 @@ impl Service {
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
}
Ok(()) => {
-tenant_state.maybe_reconcile(
-result_tx.clone(),
-&new_nodes,
-&compute_hook,
-&self.config,
-&self.persistence,
-);
+if tenant_state
+.maybe_reconcile(
+result_tx.clone(),
+&new_nodes,
+&compute_hook,
+&self.config,
+&self.persistence,
+)
+.is_some()
+{
+tenants_affected += 1;
+};
}
}
}
}
tracing::info!(
"Launched {} reconciler tasks for tenants affected by node {} going offline",
tenants_affected,
config_req.node_id
)
}
if active_transition {
@@ -1605,18 +1654,14 @@ impl Service {
let mut waiters = Vec::new();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
-let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
-let pageservers = locked.nodes.clone();
+let (nodes, tenants, scheduler) = locked.parts_mut();
-for (_tenant_shard_id, shard) in locked
-.tenants
-.range_mut(TenantShardId::tenant_range(tenant_id))
-{
-shard.schedule(&mut scheduler)?;
+for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
+shard.schedule(scheduler)?;
if let Some(waiter) = shard.maybe_reconcile(
result_tx.clone(),
-&pageservers,
+nodes,
&compute_hook,
&self.config,
&self.persistence,
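
Most of the churn in this file swaps throwaway `Scheduler::new(&locked.tenants, &locked.nodes)` instances for the single long-lived scheduler reached via `locked.parts_mut()`. The helper is needed because every field access through a lock guard re-borrows the whole guard, so holding `&mut locked.tenants` while also passing `&mut locked.scheduler` does not compile; one method that dereferences the guard once can return disjoint `&mut` borrows of several fields. A minimal sketch of the idiom (hypothetical field types):

use std::sync::Mutex;

struct State {
    tenants: Vec<String>,
    scheduler: u32,
}

impl State {
    // One deref of `self`, then disjoint field borrows: fine with the borrow checker.
    fn parts_mut(&mut self) -> (&mut Vec<String>, &mut u32) {
        (&mut self.tenants, &mut self.scheduler)
    }
}

fn main() {
    let state = Mutex::new(State { tenants: vec![], scheduler: 0 });
    let mut guard = state.lock().unwrap();

    // These two lines together would be rejected: each field access re-borrows
    // the whole guard mutably through DerefMut.
    // let t = &mut guard.tenants;
    // let s = &mut guard.scheduler;

    let (tenants, scheduler) = guard.parts_mut();
    tenants.push("tenant".to_string());
    *scheduler += 1;
}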


@@ -17,7 +17,9 @@ use crate::{
compute_hook::ComputeHook,
node::Node,
persistence::Persistence,
-reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
+reconciler::{
+attached_location_conf, secondary_location_conf, ReconcileError, Reconciler, TargetState,
+},
scheduler::{ScheduleError, Scheduler},
service, PlacementPolicy, Sequence,
};
@@ -81,8 +83,97 @@ pub(crate) struct TenantState {
#[derive(Default, Clone, Debug)]
pub(crate) struct IntentState {
-pub(crate) attached: Option<NodeId>,
-pub(crate) secondary: Vec<NodeId>,
+attached: Option<NodeId>,
+secondary: Vec<NodeId>,
}
impl IntentState {
pub(crate) fn set_attached(&mut self, scheduler: &mut Scheduler, new_attached: Option<NodeId>) {
if self.attached != new_attached {
if let Some(old_attached) = self.attached.take() {
scheduler.node_deref(old_attached);
}
if let Some(new_attached) = &new_attached {
scheduler.node_ref(*new_attached);
}
self.attached = new_attached;
}
}
pub(crate) fn push_secondary(&mut self, scheduler: &mut Scheduler, new_secondary: NodeId) {
debug_assert!(!self.secondary.contains(&new_secondary));
scheduler.node_ref(new_secondary);
self.secondary.push(new_secondary);
}
/// It is legal to call this with a node that is not currently a secondary: that is a no-op
pub(crate) fn remove_secondary(&mut self, scheduler: &mut Scheduler, node_id: NodeId) {
let index = self.secondary.iter().position(|n| *n == node_id);
if let Some(index) = index {
scheduler.node_deref(node_id);
self.secondary.remove(index);
}
}
pub(crate) fn clear_secondary(&mut self, scheduler: &mut Scheduler) {
for secondary in self.secondary.drain(..) {
scheduler.node_deref(secondary);
}
}
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
if let Some(old_attached) = self.attached.take() {
scheduler.node_deref(old_attached);
}
self.clear_secondary(scheduler);
}
pub(crate) fn new() -> Self {
Self {
attached: None,
secondary: vec![],
}
}
pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
let mut result = Vec::new();
if let Some(p) = self.attached {
result.push(p)
}
result.extend(self.secondary.iter().copied());
result
}
pub(crate) fn get_attached(&self) -> &Option<NodeId> {
&self.attached
}
pub(crate) fn get_secondary(&self) -> &Vec<NodeId> {
&self.secondary
}
/// When a node goes offline, we update intents to avoid using it
/// as their attached pageserver.
///
/// Returns true if a change was made
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
self.attached = None;
self.secondary.push(node_id);
true
} else {
false
}
}
}
impl Drop for IntentState {
fn drop(&mut self) {
// Must clear before dropping, to avoid leaving stale refcounts in the Scheduler
debug_assert!(self.attached.is_none() && self.secondary.is_empty());
}
}
#[derive(Default, Clone)]
@@ -175,39 +266,6 @@ pub(crate) struct ReconcileResult {
pub(crate) pending_compute_notification: bool,
}
-impl IntentState {
-pub(crate) fn new() -> Self {
-Self {
-attached: None,
-secondary: vec![],
-}
-}
-pub(crate) fn all_pageservers(&self) -> Vec<NodeId> {
-let mut result = Vec::new();
-if let Some(p) = self.attached {
-result.push(p)
-}
-result.extend(self.secondary.iter().copied());
-result
-}
-/// When a node goes offline, we update intents to avoid using it
-/// as their attached pageserver.
-///
-/// Returns true if a change was made
-pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
-if self.attached == Some(node_id) {
-self.attached = None;
-self.secondary.push(node_id);
-true
-} else {
-false
-}
-}
-}
impl ObservedState {
pub(crate) fn new() -> Self {
Self {
@@ -297,12 +355,12 @@ impl TenantState {
// Should have exactly one attached, and zero secondaries
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
-self.intent.attached = Some(node_id);
+self.intent.set_attached(scheduler, Some(node_id));
used_pageservers.push(node_id);
modified = true;
}
if !self.intent.secondary.is_empty() {
-self.intent.secondary.clear();
+self.intent.clear_secondary(scheduler);
modified = true;
}
}
@@ -310,14 +368,14 @@ impl TenantState {
// Should have exactly one attached, and N secondaries
if self.intent.attached.is_none() {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
-self.intent.attached = Some(node_id);
+self.intent.set_attached(scheduler, Some(node_id));
used_pageservers.push(node_id);
modified = true;
}
while self.intent.secondary.len() < secondary_count {
let node_id = scheduler.schedule_shard(&used_pageservers)?;
-self.intent.secondary.push(node_id);
+self.intent.push_secondary(scheduler, node_id);
used_pageservers.push(node_id);
modified = true;
}
@@ -325,12 +383,12 @@ impl TenantState {
Detached => {
// Should have no attached or secondary pageservers
if self.intent.attached.is_some() {
-self.intent.attached = None;
+self.intent.set_attached(scheduler, None);
modified = true;
}
if !self.intent.secondary.is_empty() {
-self.intent.secondary.clear();
+self.intent.clear_secondary(scheduler);
modified = true;
}
}
@@ -455,7 +513,7 @@ impl TenantState {
tenant_shard_id: self.tenant_shard_id,
shard: self.shard,
generation: self.generation,
-intent: self.intent.clone(),
+intent: TargetState::from_intent(&self.intent),
config: self.config.clone(),
observed: self.observed.clone(),
pageservers: pageservers.clone(),
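
With IntentState's fields now private, every mutation flows through an accessor that keeps the Scheduler's per-node shard_count in step, and the Drop impl backstops the rule that an intent must be cleared (releasing its refcounts) before it is discarded. A condensed sketch of that contract (NodeId simplified to u64, secondaries omitted; the real set_attached also no-ops when the node is unchanged):

use std::collections::HashMap;

#[derive(Default)]
struct Scheduler {
    shard_counts: HashMap<u64, usize>,
}

impl Scheduler {
    fn node_ref(&mut self, node: u64) {
        *self.shard_counts.entry(node).or_insert(0) += 1;
    }
    fn node_deref(&mut self, node: u64) {
        *self.shard_counts.get_mut(&node).expect("unknown node") -= 1;
    }
}

#[derive(Default)]
struct Intent {
    attached: Option<u64>,
}

impl Intent {
    // Mirrors IntentState::set_attached: move the refcount with the assignment.
    fn set_attached(&mut self, scheduler: &mut Scheduler, new: Option<u64>) {
        if let Some(old) = self.attached.take() {
            scheduler.node_deref(old);
        }
        if let Some(node) = new {
            scheduler.node_ref(node);
        }
        self.attached = new;
    }
    fn clear(&mut self, scheduler: &mut Scheduler) {
        self.set_attached(scheduler, None);
    }
}

impl Drop for Intent {
    fn drop(&mut self) {
        // As in the diff: dropping a non-empty intent would strand a refcount.
        debug_assert!(self.attached.is_none());
    }
}

fn main() {
    let mut scheduler = Scheduler::default();
    let mut intent = Intent::default();
    intent.set_attached(&mut scheduler, Some(1));
    assert_eq!(scheduler.shard_counts[&1], 1);
    intent.set_attached(&mut scheduler, Some(2)); // refcount moves from node 1 to node 2
    assert_eq!(scheduler.shard_counts[&1], 0);
    intent.clear(&mut scheduler); // required before drop
}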


@@ -1,3 +1,5 @@
import concurrent.futures
import random
import time
from collections import defaultdict
@@ -6,7 +8,7 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import tenant_delete_wait_completed, timeline_delete_wait_completed
from fixtures.pg_version import PgVersion
-from fixtures.types import TenantId, TimelineId
+from fixtures.types import TenantId, TenantShardId, TimelineId
from fixtures.utils import wait_until
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
@@ -343,3 +345,102 @@ def test_sharding_service_compute_hook(
assert notifications[1] == expect
wait_until(10, 1, received_restart_notification)
def test_sharding_service_many_tenants(
neon_env_builder: NeonEnvBuilder,
):
"""
Check that we cope well with a not-totally-trivial number of tenants.
This is checking for:
- Obvious concurrency bugs from issuing many tenant creations/modifications
concurrently.
- Obvious scaling bugs like O(N^2) scaling that would be so slow that even
a basic test starts failing from slowness.
This is _not_ a comprehensive scale test: just a basic sanity check that
we don't fall over for a thousand shards.
"""
neon_env_builder.num_pageservers = 5
env = neon_env_builder.init_start()
# Total tenants
tenant_count = 2000
# Shards per tenant
shard_count = 2
stripe_size = 1024
tenants = set(TenantId.generate() for _i in range(0, tenant_count))
virtual_ps_http = PageserverHttpClient(env.attachment_service_port, lambda: True)
# We use a fixed seed to make the test reproducible: we want a randomly
# chosen order, but not to change the order every time we run the test.
rng = random.Random(1234)
# We will create tenants directly via API, not via neon_local, to avoid any false
# serialization of operations in neon_local (it e.g. loads/saves a config file on each call)
with concurrent.futures.ThreadPoolExecutor() as executor:
futs = []
for tenant_id in tenants:
f = executor.submit(
env.attachment_service.tenant_create, tenant_id, shard_count, stripe_size
)
futs.append(f)
# Wait for creations to finish
for f in futs:
f.result()
# Generate a mixture of operations and dispatch them all concurrently
futs = []
for tenant_id in tenants:
op = rng.choice([0, 1, 2])
if op == 0:
# A fan-out write operation to all shards in a tenant (timeline creation)
f = executor.submit(
virtual_ps_http.timeline_create,
PgVersion.NOT_SET,
tenant_id,
TimelineId.generate(),
)
elif op == 1:
# A reconciler operation: migrate a shard.
shard_number = rng.randint(0, shard_count - 1)
tenant_shard_id = TenantShardId(tenant_id, shard_number, shard_count)
dest_ps_id = rng.choice([ps.id for ps in env.pageservers])
f = executor.submit(
env.attachment_service.tenant_shard_migrate, tenant_shard_id, dest_ps_id
)
elif op == 2:
# A passthrough read to shard zero
f = executor.submit(virtual_ps_http.tenant_status, tenant_id)
futs.append(f)
# Wait for mixed ops to finish
for f in futs:
f.result()
# Rolling node failures: this is a small number of requests, but results in a large
# number of scheduler calls and reconcile tasks.
for pageserver in env.pageservers:
env.attachment_service.node_configure(pageserver.id, {"availability": "Offline"})
# The sleeps are just to make sure we aren't optimizing away any re-scheduling operations
# from a brief flap in node state.
time.sleep(1)
env.attachment_service.node_configure(pageserver.id, {"availability": "Active"})
time.sleep(1)
# Restart the storage controller
env.attachment_service.stop()
env.attachment_service.start()
# Restart pageservers: this exercises the /re-attach API
for pageserver in env.pageservers:
pageserver.stop()
pageserver.start()