Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-06 04:52:55 +00:00
storage controller: rename TenantState to TenantShard (#7329)
This is a widely used type that had a misleading name: it does not represent the total state of a tenant, but rather a single shard of one.
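To make the rename concrete, here is a minimal, hypothetical Rust sketch (not the controller's real definitions; the field sets are illustrative only): the controller keys its in-memory map by a per-shard ID, so the value type describes one shard rather than the whole tenant.

use std::collections::BTreeMap;

// Hypothetical, simplified stand-ins for illustration; the real types carry
// many more fields (intent, observed, policy, generation, ...).
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct TenantShardId {
    tenant_id: u64,
    shard_number: u8,
    shard_count: u8,
}

#[derive(Debug, Default)]
struct TenantShard; // one shard's scheduling state, not the whole tenant

fn main() {
    // One entry per shard: a 4-shard tenant occupies four slots in this map,
    // which is why `TenantState` was a misleading name for the value type.
    let mut tenants: BTreeMap<TenantShardId, TenantShard> = BTreeMap::new();
    for shard_number in 0..4u8 {
        tenants.insert(
            TenantShardId { tenant_id: 1, shard_number, shard_count: 4 },
            TenantShard::default(),
        );
    }
    assert_eq!(tenants.len(), 4);
}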
@@ -14,7 +14,7 @@ mod reconciler;
mod scheduler;
mod schema;
pub mod service;
-mod tenant_state;
+mod tenant_shard;

#[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone, Serialize)]
struct Sequence(u64);

@@ -696,7 +696,7 @@ impl Persistence {
}
}

-/// Parts of [`crate::tenant_state::TenantState`] that are stored durably
+/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::tenant_shards)]
pub(crate) struct TenantShardPersistence {

@@ -18,14 +18,14 @@ use utils::sync::gate::GateGuard;

use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
-use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
+use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};

const DEFAULT_HEATMAP_PERIOD: &str = "60s";

/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
-/// See [`crate::tenant_state::TenantState`] for the meanings of these fields: they are a snapshot
+/// See [`crate::tenant_shard::TenantShard`] for the meanings of these fields: they are a snapshot
/// of a tenant's state from when we spawned a reconcile task.
pub(super) tenant_shard_id: TenantShardId,
pub(crate) shard: ShardIdentity,
@@ -48,11 +48,11 @@ pub(super) struct Reconciler {

/// To avoid stalling if the cloud control plane is unavailable, we may proceed
/// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
-/// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`] to ensure a later retry.
+/// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
pub(crate) compute_notify_failure: bool,

/// A means to abort background reconciliation: it is essential to
-/// call this when something changes in the original TenantState that
+/// call this when something changes in the original TenantShard that
/// will make this reconciliation impossible or unnecessary, for
/// example when a pageserver node goes offline, or the PlacementPolicy for
/// the tenant is changed.
@@ -66,7 +66,7 @@ pub(super) struct Reconciler {
pub(crate) persistence: Arc<Persistence>,
}

-/// This is a snapshot of [`crate::tenant_state::IntentState`], but it does not do any
+/// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
/// reference counting for Scheduler. The IntentState is what the scheduler works with,
/// and the TargetState is just the instruction for a particular Reconciler run.
#[derive(Debug)]
@@ -1,4 +1,4 @@
-use crate::{node::Node, tenant_state::TenantState};
+use crate::{node::Node, tenant_shard::TenantShard};
use pageserver_api::controller_api::UtilizationScore;
use serde::Serialize;
use std::collections::HashMap;
@@ -27,7 +27,7 @@ pub enum MaySchedule {

#[derive(Serialize)]
struct SchedulerNode {
-/// How many shards are currently scheduled on this node, via their [`crate::tenant_state::IntentState`].
+/// How many shards are currently scheduled on this node, via their [`crate::tenant_shard::IntentState`].
shard_count: usize,

/// Whether this node is currently elegible to have new shards scheduled (this is derived
@@ -84,7 +84,7 @@ impl std::ops::Add for AffinityScore {
}
}

-// For carrying state between multiple calls to [`TenantState::schedule`], e.g. when calling
+// For carrying state between multiple calls to [`TenantShard::schedule`], e.g. when calling
// it for many shards in the same tenant.
#[derive(Debug, Default)]
pub(crate) struct ScheduleContext {
@@ -147,7 +147,7 @@ impl Scheduler {
pub(crate) fn consistency_check<'a>(
&self,
nodes: impl Iterator<Item = &'a Node>,
-shards: impl Iterator<Item = &'a TenantState>,
+shards: impl Iterator<Item = &'a TenantShard>,
) -> anyhow::Result<()> {
let mut expect_nodes: HashMap<NodeId, SchedulerNode> = HashMap::new();
for node in nodes {
@@ -398,7 +398,7 @@ pub(crate) mod test_utils {
mod tests {
use super::*;

-use crate::tenant_state::IntentState;
+use crate::tenant_shard::IntentState;
#[test]
fn scheduler_basic() -> anyhow::Result<()> {
let nodes = test_utils::make_test_nodes(2);
@@ -66,9 +66,9 @@ use crate::{
persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
reconciler::attached_location_conf,
scheduler::Scheduler,
-tenant_state::{
+tenant_shard::{
IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
-ReconcilerWaiter, TenantState,
+ReconcilerWaiter, TenantShard,
},
};

@@ -92,7 +92,7 @@ pub const MAX_UNAVAILABLE_INTERVAL_DEFAULT: Duration = Duration::from_secs(30);

// Top level state available to all HTTP handlers
struct ServiceState {
-tenants: BTreeMap<TenantShardId, TenantState>,
+tenants: BTreeMap<TenantShardId, TenantShard>,

nodes: Arc<HashMap<NodeId, Node>>,

@@ -102,7 +102,7 @@ struct ServiceState {
impl ServiceState {
fn new(
nodes: HashMap<NodeId, Node>,
-tenants: BTreeMap<TenantShardId, TenantState>,
+tenants: BTreeMap<TenantShardId, TenantShard>,
scheduler: Scheduler,
) -> Self {
Self {
@@ -116,7 +116,7 @@ impl ServiceState {
&mut self,
) -> (
&mut Arc<HashMap<NodeId, Node>>,
-&mut BTreeMap<TenantShardId, TenantState>,
+&mut BTreeMap<TenantShardId, TenantShard>,
&mut Scheduler,
) {
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
@@ -335,11 +335,11 @@ impl Service {

for (tenant_shard_id, shard_observations) in observed {
for (node_id, observed_loc) in shard_observations {
-let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
+let Some(tenant_shard) = tenants.get_mut(&tenant_shard_id) else {
cleanup.push((tenant_shard_id, node_id));
continue;
};
-tenant_state
+tenant_shard
.observed
.locations
.insert(node_id, ObservedStateLocation { conf: observed_loc });
@@ -348,14 +348,14 @@ impl Service {

// Populate each tenant's intent state
let mut schedule_context = ScheduleContext::default();
-for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
+for (tenant_shard_id, tenant_shard) in tenants.iter_mut() {
if tenant_shard_id.shard_number == ShardNumber(0) {
// Reset scheduling context each time we advance to the next Tenant
schedule_context = ScheduleContext::default();
}

-tenant_state.intent_from_observed(scheduler);
-if let Err(e) = tenant_state.schedule(scheduler, &mut schedule_context) {
+tenant_shard.intent_from_observed(scheduler);
+if let Err(e) = tenant_shard.schedule(scheduler, &mut schedule_context) {
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
// not enough pageservers are available. The tenant may well still be available
// to clients.
@@ -364,11 +364,11 @@ impl Service {
// If we're both intending and observed to be attached at a particular node, we will
// emit a compute notification for this. In the case where our observed state does not
// yet match our intent, we will eventually reconcile, and that will emit a compute notification.
-if let Some(attached_at) = tenant_state.stably_attached() {
+if let Some(attached_at) = tenant_shard.stably_attached() {
compute_notifications.push((
*tenant_shard_id,
attached_at,
-tenant_state.shard.stripe_size,
+tenant_shard.shard.stripe_size,
));
}
}
@@ -743,7 +743,7 @@ impl Service {

/// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation
/// was successful, this will update the observed state of the tenant such that subsequent
-/// calls to [`TenantState::maybe_reconcile`] will do nothing.
+/// calls to [`TenantShard::maybe_reconcile`] will do nothing.
#[instrument(skip_all, fields(
tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
sequence=%result.sequence
@@ -761,10 +761,10 @@ impl Service {
tenant.generation = std::cmp::max(tenant.generation, result.generation);

// If the reconciler signals that it failed to notify compute, set this state on
-// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
+// the shard so that a future [`TenantShard::maybe_reconcile`] will try again.
tenant.pending_compute_notification = result.pending_compute_notification;

-// Let the TenantState know it is idle.
+// Let the TenantShard know it is idle.
tenant.reconcile_complete(result.sequence);

match result.result {
@@ -979,7 +979,7 @@ impl Service {
if let Some(generation_pageserver) = tsp.generation_pageserver {
intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64)));
}
-let new_tenant = TenantState::from_persistent(tsp, intent)?;
+let new_tenant = TenantShard::from_persistent(tsp, intent)?;

tenants.insert(tenant_shard_id, new_tenant);
}
@@ -1126,7 +1126,7 @@ impl Service {
let mut locked = self.inner.write().unwrap();
locked.tenants.insert(
attach_req.tenant_shard_id,
-TenantState::new(
+TenantShard::new(
attach_req.tenant_shard_id,
ShardIdentity::unsharded(),
PlacementPolicy::Attached(0),
@@ -1178,32 +1178,32 @@ impl Service {
let mut locked = self.inner.write().unwrap();
let (_nodes, tenants, scheduler) = locked.parts_mut();

-let tenant_state = tenants
+let tenant_shard = tenants
.get_mut(&attach_req.tenant_shard_id)
.expect("Checked for existence above");

if let Some(new_generation) = new_generation {
-tenant_state.generation = Some(new_generation);
-tenant_state.policy = PlacementPolicy::Attached(0);
+tenant_shard.generation = Some(new_generation);
+tenant_shard.policy = PlacementPolicy::Attached(0);
} else {
// This is a detach notification. We must update placement policy to avoid re-attaching
// during background scheduling/reconciliation, or during storage controller restart.
assert!(attach_req.node_id.is_none());
-tenant_state.policy = PlacementPolicy::Detached;
+tenant_shard.policy = PlacementPolicy::Detached;
}

if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
tracing::info!(
tenant_id = %attach_req.tenant_shard_id,
ps_id = %attaching_pageserver,
-generation = ?tenant_state.generation,
+generation = ?tenant_shard.generation,
"issuing",
);
-} else if let Some(ps_id) = tenant_state.intent.get_attached() {
+} else if let Some(ps_id) = tenant_shard.intent.get_attached() {
tracing::info!(
tenant_id = %attach_req.tenant_shard_id,
%ps_id,
-generation = ?tenant_state.generation,
+generation = ?tenant_shard.generation,
"dropping",
);
} else {
@@ -1211,14 +1211,14 @@ impl Service {
tenant_id = %attach_req.tenant_shard_id,
"no-op: tenant already has no pageserver");
}
-tenant_state
+tenant_shard
.intent
.set_attached(scheduler, attach_req.node_id);

tracing::info!(
"attach_hook: tenant {} set generation {:?}, pageserver {}",
attach_req.tenant_shard_id,
-tenant_state.generation,
+tenant_shard.generation,
// TODO: this is an odd number of 0xf's
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
);
@@ -1230,36 +1230,36 @@ impl Service {
#[cfg(feature = "testing")]
{
if let Some(node_id) = attach_req.node_id {
-tenant_state.observed.locations = HashMap::from([(
+tenant_shard.observed.locations = HashMap::from([(
node_id,
ObservedStateLocation {
conf: Some(attached_location_conf(
-tenant_state.generation.unwrap(),
-&tenant_state.shard,
-&tenant_state.config,
+tenant_shard.generation.unwrap(),
+&tenant_shard.shard,
+&tenant_shard.config,
false,
)),
},
)]);
} else {
-tenant_state.observed.locations.clear();
+tenant_shard.observed.locations.clear();
}
}

Ok(AttachHookResponse {
gen: attach_req
.node_id
-.map(|_| tenant_state.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap()),
+.map(|_| tenant_shard.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap()),
})
}

pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
let locked = self.inner.read().unwrap();

-let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
+let tenant_shard = locked.tenants.get(&inspect_req.tenant_shard_id);

InspectResponse {
-attachment: tenant_state.and_then(|s| {
+attachment: tenant_shard.and_then(|s| {
s.intent
.get_attached()
.map(|ps| (s.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap(), ps))
@@ -1321,11 +1321,11 @@ impl Service {
let mut locked = self.inner.write().unwrap();

for (tenant_shard_id, observed_loc) in configs.tenant_shards {
-let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
+let Some(tenant_shard) = locked.tenants.get_mut(&tenant_shard_id) else {
cleanup.push(tenant_shard_id);
continue;
};
-tenant_state
+tenant_shard
.observed
.locations
.insert(node.get_id(), ObservedStateLocation { conf: observed_loc });
@@ -1496,13 +1496,13 @@ impl Service {
};

for req_tenant in validate_req.tenants {
-if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
-let valid = tenant_state.generation == Some(Generation::new(req_tenant.gen));
+if let Some(tenant_shard) = locked.tenants.get(&req_tenant.id) {
+let valid = tenant_shard.generation == Some(Generation::new(req_tenant.gen));
tracing::info!(
"handle_validate: {}(gen {}): valid={valid} (latest {:?})",
req_tenant.id,
req_tenant.gen,
-tenant_state.generation
+tenant_shard.generation
);
response.tenants.push(ValidateResponseTenant {
id: req_tenant.id,
@@ -1688,7 +1688,7 @@ impl Service {
continue;
}
Entry::Vacant(entry) => {
-let state = entry.insert(TenantState::new(
+let state = entry.insert(TenantShard::new(
tenant_shard_id,
ShardIdentity::from_params(
tenant_shard_id.shard_number,
@@ -2738,7 +2738,7 @@ impl Service {
/// Returns None if the input iterator of shards does not include a shard with number=0
fn tenant_describe_impl<'a>(
&self,
-shards: impl Iterator<Item = &'a TenantState>,
+shards: impl Iterator<Item = &'a TenantShard>,
) -> Option<TenantDescribeResponse> {
let mut shard_zero = None;
let mut describe_shards = Vec::new();
@@ -3038,7 +3038,7 @@ impl Service {
},
);

-let mut child_state = TenantState::new(child, child_shard, policy.clone());
+let mut child_state = TenantShard::new(child, child_shard, policy.clone());
child_state.intent = IntentState::single(scheduler, Some(pageserver));
child_state.observed = ObservedState {
locations: child_observed,
@@ -3046,7 +3046,7 @@ impl Service {
child_state.generation = Some(generation);
child_state.config = config.clone();

-// The child's TenantState::splitting is intentionally left at the default value of Idle,
+// The child's TenantShard::splitting is intentionally left at the default value of Idle,
// as at this point in the split process we have succeeded and this part is infallible:
// we will never need to do any special recovery from this state.

@@ -3595,8 +3595,8 @@ impl Service {
Ok(())
}

-/// For debug/support: a full JSON dump of TenantStates. Returns a response so that
-/// we don't have to make TenantState clonable in the return path.
+/// For debug/support: a full JSON dump of TenantShards. Returns a response so that
+/// we don't have to make TenantShard clonable in the return path.
pub(crate) fn tenants_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
let serialized = {
let locked = self.inner.read().unwrap();
@@ -3700,7 +3700,7 @@ impl Service {
}

/// For debug/support: a JSON dump of the [`Scheduler`]. Returns a response so that
-/// we don't have to make TenantState clonable in the return path.
+/// we don't have to make TenantShard clonable in the return path.
pub(crate) fn scheduler_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
let serialized = {
let locked = self.inner.read().unwrap();
@@ -3917,8 +3917,8 @@ impl Service {
tracing::info!("Node {} transition to offline", node_id);
let mut tenants_affected: usize = 0;

-for (tenant_shard_id, tenant_state) in tenants {
-if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) {
+for (tenant_shard_id, tenant_shard) in tenants {
+if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
// not assume our knowledge of the node's configuration is accurate until it comes back online
observed_loc.conf = None;
@@ -3931,24 +3931,24 @@ impl Service {
continue;
}

-if tenant_state.intent.demote_attached(node_id) {
-tenant_state.sequence = tenant_state.sequence.next();
+if tenant_shard.intent.demote_attached(node_id) {
+tenant_shard.sequence = tenant_shard.sequence.next();

// TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
// for tenants without secondary locations: if they have a secondary location, then this
// schedule() call is just promoting an existing secondary)
let mut schedule_context = ScheduleContext::default();

-match tenant_state.schedule(scheduler, &mut schedule_context) {
+match tenant_shard.schedule(scheduler, &mut schedule_context) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
-// TODO: give TenantState a scheduling error attribute to be queried later.
+// TODO: give TenantShard a scheduling error attribute to be queried later.
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self
-.maybe_reconcile_shard(tenant_state, &new_nodes)
+.maybe_reconcile_shard(tenant_shard, &new_nodes)
.is_some()
{
tenants_affected += 1;
@@ -3967,10 +3967,10 @@ impl Service {
tracing::info!("Node {} transition to active", node_id);
// When a node comes back online, we must reconcile any tenant that has a None observed
// location on the node.
-for tenant_state in locked.tenants.values_mut() {
-if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) {
+for tenant_shard in locked.tenants.values_mut() {
+if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
-self.maybe_reconcile_shard(tenant_state, &new_nodes);
+self.maybe_reconcile_shard(tenant_shard, &new_nodes);
}
}
}
@@ -4053,11 +4053,11 @@ impl Service {
Ok(())
}

-/// Convenience wrapper around [`TenantState::maybe_reconcile`] that provides
+/// Convenience wrapper around [`TenantShard::maybe_reconcile`] that provides
/// all the references to parts of Self that are needed
fn maybe_reconcile_shard(
&self,
-shard: &mut TenantState,
+shard: &mut TenantShard,
nodes: &Arc<HashMap<NodeId, Node>>,
) -> Option<ReconcilerWaiter> {
shard.maybe_reconcile(
@@ -4123,7 +4123,7 @@ impl Service {

let mut reconciles_spawned = 0;

-let mut tenant_shards: Vec<&TenantState> = Vec::new();
+let mut tenant_shards: Vec<&TenantShard> = Vec::new();

// Limit on how many shards' optmizations each call to this function will execute. Combined
// with the frequency of background calls, this acts as an implicit rate limit that runs a small
@@ -4254,7 +4254,7 @@ impl Service {

pub async fn shutdown(&self) {
// Note that this already stops processing any results from reconciles: so
-// we do not expect that our [`TenantState`] objects will reach a neat
+// we do not expect that our [`TenantShard`] objects will reach a neat
// final state.
self.cancel.cancel();

@@ -50,7 +50,7 @@ where
/// This struct implement Serialize for debugging purposes, but is _not_ persisted
/// itself: see [`crate::persistence`] for the subset of tenant shard state that is persisted.
#[derive(Serialize)]
-pub(crate) struct TenantState {
+pub(crate) struct TenantShard {
pub(crate) tenant_shard_id: TenantShardId,

pub(crate) shard: ShardIdentity,
@@ -354,7 +354,7 @@ pub(crate) struct ReconcilerHandle {
}

/// When a reconcile task completes, it sends this result object
-/// to be applied to the primary TenantState.
+/// to be applied to the primary TenantShard.
pub(crate) struct ReconcileResult {
pub(crate) sequence: Sequence,
/// On errors, `observed` should be treated as an incompleted description
@@ -367,7 +367,7 @@ pub(crate) struct ReconcileResult {
pub(crate) generation: Option<Generation>,
pub(crate) observed: ObservedState,

-/// Set [`TenantState::pending_compute_notification`] from this flag
+/// Set [`TenantShard::pending_compute_notification`] from this flag
pub(crate) pending_compute_notification: bool,
}

@@ -379,7 +379,7 @@ impl ObservedState {
}
}

-impl TenantState {
+impl TenantShard {
pub(crate) fn new(
tenant_shard_id: TenantShardId,
shard: ShardIdentity,
@@ -1143,7 +1143,7 @@ pub(crate) mod tests {

use super::*;

-fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantState {
+fn make_test_tenant_shard(policy: PlacementPolicy) -> TenantShard {
let tenant_id = TenantId::generate();
let shard_number = ShardNumber(0);
let shard_count = ShardCount::new(1);
@@ -1153,7 +1153,7 @@ pub(crate) mod tests {
shard_number,
shard_count,
};
-TenantState::new(
+TenantShard::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
@@ -1165,7 +1165,7 @@ pub(crate) mod tests {
)
}

-fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantState> {
+fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantShard> {
let tenant_id = TenantId::generate();

(0..shard_count.count())
@@ -1177,7 +1177,7 @@ pub(crate) mod tests {
shard_number,
shard_count,
};
-TenantState::new(
+TenantShard::new(
tenant_shard_id,
ShardIdentity::new(
shard_number,
@@ -1202,24 +1202,24 @@ pub(crate) mod tests {
let mut scheduler = Scheduler::new(nodes.values());
let mut context = ScheduleContext::default();

-let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
-tenant_state
+let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));
+tenant_shard
.schedule(&mut scheduler, &mut context)
.expect("we have enough nodes, scheduling should work");

// Expect to initially be schedule on to different nodes
-assert_eq!(tenant_state.intent.secondary.len(), 1);
-assert!(tenant_state.intent.attached.is_some());
+assert_eq!(tenant_shard.intent.secondary.len(), 1);
+assert!(tenant_shard.intent.attached.is_some());

-let attached_node_id = tenant_state.intent.attached.unwrap();
-let secondary_node_id = *tenant_state.intent.secondary.iter().last().unwrap();
+let attached_node_id = tenant_shard.intent.attached.unwrap();
+let secondary_node_id = *tenant_shard.intent.secondary.iter().last().unwrap();
assert_ne!(attached_node_id, secondary_node_id);

// Notifying the attached node is offline should demote it to a secondary
-let changed = tenant_state.intent.demote_attached(attached_node_id);
+let changed = tenant_shard.intent.demote_attached(attached_node_id);
assert!(changed);
-assert!(tenant_state.intent.attached.is_none());
-assert_eq!(tenant_state.intent.secondary.len(), 2);
+assert!(tenant_shard.intent.attached.is_none());
+assert_eq!(tenant_shard.intent.secondary.len(), 2);

// Update the scheduler state to indicate the node is offline
nodes
@@ -1229,18 +1229,18 @@ pub(crate) mod tests {
scheduler.node_upsert(nodes.get(&attached_node_id).unwrap());

// Scheduling the node should promote the still-available secondary node to attached
-tenant_state
+tenant_shard
.schedule(&mut scheduler, &mut context)
.expect("active nodes are available");
-assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
+assert_eq!(tenant_shard.intent.attached.unwrap(), secondary_node_id);

// The original attached node should have been retained as a secondary
assert_eq!(
-*tenant_state.intent.secondary.iter().last().unwrap(),
+*tenant_shard.intent.secondary.iter().last().unwrap(),
attached_node_id
);

-tenant_state.intent.clear(&mut scheduler);
+tenant_shard.intent.clear(&mut scheduler);

Ok(())
}
@@ -1250,48 +1250,48 @@ pub(crate) mod tests {
let nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());

-let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
+let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

-tenant_state.observed.locations.insert(
+tenant_shard.observed.locations.insert(
NodeId(3),
ObservedStateLocation {
conf: Some(LocationConfig {
mode: LocationConfigMode::AttachedMulti,
generation: Some(2),
secondary_conf: None,
-shard_number: tenant_state.shard.number.0,
-shard_count: tenant_state.shard.count.literal(),
-shard_stripe_size: tenant_state.shard.stripe_size.0,
+shard_number: tenant_shard.shard.number.0,
+shard_count: tenant_shard.shard.count.literal(),
+shard_stripe_size: tenant_shard.shard.stripe_size.0,
tenant_conf: TenantConfig::default(),
}),
},
);

-tenant_state.observed.locations.insert(
+tenant_shard.observed.locations.insert(
NodeId(2),
ObservedStateLocation {
conf: Some(LocationConfig {
mode: LocationConfigMode::AttachedStale,
generation: Some(1),
secondary_conf: None,
-shard_number: tenant_state.shard.number.0,
-shard_count: tenant_state.shard.count.literal(),
-shard_stripe_size: tenant_state.shard.stripe_size.0,
+shard_number: tenant_shard.shard.number.0,
+shard_count: tenant_shard.shard.count.literal(),
+shard_stripe_size: tenant_shard.shard.stripe_size.0,
tenant_conf: TenantConfig::default(),
}),
},
);

-tenant_state.intent_from_observed(&mut scheduler);
+tenant_shard.intent_from_observed(&mut scheduler);

// The highest generationed attached location gets used as attached
-assert_eq!(tenant_state.intent.attached, Some(NodeId(3)));
+assert_eq!(tenant_shard.intent.attached, Some(NodeId(3)));
// Other locations get used as secondary
-assert_eq!(tenant_state.intent.secondary, vec![NodeId(2)]);
+assert_eq!(tenant_shard.intent.secondary, vec![NodeId(2)]);

-scheduler.consistency_check(nodes.values(), [&tenant_state].into_iter())?;
+scheduler.consistency_check(nodes.values(), [&tenant_shard].into_iter())?;

-tenant_state.intent.clear(&mut scheduler);
+tenant_shard.intent.clear(&mut scheduler);
Ok(())
}
@@ -1300,23 +1300,23 @@ pub(crate) mod tests {
let nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());

-let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
+let mut tenant_shard = make_test_tenant_shard(PlacementPolicy::Attached(1));

// In pause mode, schedule() shouldn't do anything
-tenant_state.scheduling_policy = ShardSchedulingPolicy::Pause;
-assert!(tenant_state
+tenant_shard.scheduling_policy = ShardSchedulingPolicy::Pause;
+assert!(tenant_shard
.schedule(&mut scheduler, &mut ScheduleContext::default())
.is_ok());
-assert!(tenant_state.intent.all_pageservers().is_empty());
+assert!(tenant_shard.intent.all_pageservers().is_empty());

// In active mode, schedule() works
-tenant_state.scheduling_policy = ShardSchedulingPolicy::Active;
-assert!(tenant_state
+tenant_shard.scheduling_policy = ShardSchedulingPolicy::Active;
+assert!(tenant_shard
.schedule(&mut scheduler, &mut ScheduleContext::default())
.is_ok());
-assert!(!tenant_state.intent.all_pageservers().is_empty());
+assert!(!tenant_shard.intent.all_pageservers().is_empty());

-tenant_state.intent.clear(&mut scheduler);
+tenant_shard.intent.clear(&mut scheduler);
Ok(())
}

@@ -1429,7 +1429,7 @@ pub(crate) mod tests {
fn optimize_til_idle(
nodes: &HashMap<NodeId, Node>,
scheduler: &mut Scheduler,
-shards: &mut [TenantState],
+shards: &mut [TenantShard],
) {
let mut loop_n = 0;
loop {