control_plane/attachment_service: implement PlacementPolicy::Secondary, configuration updates (#6521)

During onboarding, the control plane may attempt ad-hoc creation of a
secondary location to facilitate live migration. This gives us two
problems to solve:
- Accept 'Secondary' mode in /location_config and use it to put the
tenant into secondary mode on some physical pageserver, then pass
through /tenant/xyz/secondary/download requests
- Create tenants with no generation initially, since the initial
`Secondary` mode call will not provide us a generation.

This PR also fixes modification of a tenant's TenantConf during
/location_config, which was previously ignored, and refines the flow for
config modification:
- avoid bumping generations when the only reason we're reconciling an
attached location is a config change
- increment TenantState.sequence when spawning a reconciler: usually
schedule() does this, but for pure config changes it doesn't happen,
so without this change waiters would think reconciliation was done
immediately (a minimal sketch of this appears after the change summary
below). `sequence` is a bit of a murky thing right now, as it's
dual-purposed for tracking waiters and for checking if an existing
reconciliation is already making updates to our current sequence. I'll
follow up at some point to clarify its purpose.
- test config modification at the end of onboarding test
John Spray
2024-03-01 20:25:53 +00:00
committed by GitHub
parent ea0d35f3ca
commit 20d0939b00
11 changed files with 842 additions and 231 deletions
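
To make the sequence/waiter point above concrete, here is a minimal, self-contained Rust sketch (toy stand-ins, not the real TenantState or Sequence types from attachment_service) of why the sequence must be advanced before spawning a reconciler for a config-only change:

// Toy stand-ins for TenantState's sequence bookkeeping.
#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct Sequence(u64);

struct Shard {
    sequence: Sequence,  // intent version; normally advanced by schedule()
    completed: Sequence, // last sequence a reconciler finished
}

impl Shard {
    /// Returns the sequence a waiter should wait for.
    fn spawn_reconciler(&mut self) -> Sequence {
        // The fix: advance the sequence even when schedule() did not (i.e. when
        // only the TenantConf changed), so waiters observe the new work.
        self.sequence = Sequence(self.sequence.0 + 1);
        self.sequence
    }

    fn waiter_is_done(&self, wanted: Sequence) -> bool {
        self.completed >= wanted
    }
}

fn main() {
    let mut shard = Shard { sequence: Sequence(3), completed: Sequence(3) };
    let wanted = shard.spawn_reconciler();
    // Without the bump, `wanted` would equal `completed` and the waiter would
    // (incorrectly) consider the config change already reconciled.
    assert!(!shard.waiter_is_done(wanted));
    shard.completed = wanted; // reconciler finishes
    assert!(shard.waiter_is_done(wanted));
    println!("reconciled at {:?}", wanted);
}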

@@ -0,0 +1,2 @@
ALTER TABLE tenant_shards ALTER generation SET NOT NULL;
ALTER TABLE tenant_shards ALTER generation_pageserver SET NOT NULL;

@@ -0,0 +1,4 @@
ALTER TABLE tenant_shards ALTER generation DROP NOT NULL;
ALTER TABLE tenant_shards ALTER generation_pageserver DROP NOT NULL;

@@ -1,9 +1,10 @@
use crate::reconciler::ReconcileError;
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
use crate::PlacementPolicy;
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use pageserver_api::models::{
TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TenantConfigRequest, TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TenantTimeTravelRequest, TimelineCreateRequest,
};
use pageserver_api::shard::TenantShardId;
@@ -117,9 +118,14 @@ async fn handle_tenant_create(
check_permissions(&req, Scope::PageServerApi)?;
let create_req = json_request::<TenantCreateRequest>(&mut req).await?;
// TODO: enable specifying this. Using Single as a default helps legacy tests to work (they
// have no expectation of HA).
let placement_policy = PlacementPolicy::Single;
json_response(
StatusCode::CREATED,
service.tenant_create(create_req).await?,
service.tenant_create(create_req, placement_policy).await?,
)
}
@@ -185,6 +191,27 @@ async fn handle_tenant_location_config(
)
}
async fn handle_tenant_config_set(
service: Arc<Service>,
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::PageServerApi)?;
let config_req = json_request::<TenantConfigRequest>(&mut req).await?;
json_response(StatusCode::OK, service.tenant_config_set(config_req).await?)
}
async fn handle_tenant_config_get(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?)
}
async fn handle_tenant_time_travel_remote_storage(
service: Arc<Service>,
mut req: Request<Body>,
@@ -216,7 +243,15 @@ async fn handle_tenant_time_travel_remote_storage(
done_if_after_raw,
)
.await?;
json_response(StatusCode::OK, ())
}
async fn handle_tenant_secondary_download(
service: Arc<Service>,
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
service.tenant_secondary_download(tenant_id).await?;
json_response(StatusCode::OK, ())
}
@@ -551,12 +586,21 @@ pub fn make_router(
.delete("/v1/tenant/:tenant_id", |r| {
tenant_service_handler(r, handle_tenant_delete)
})
.put("/v1/tenant/config", |r| {
tenant_service_handler(r, handle_tenant_config_set)
})
.get("/v1/tenant/:tenant_id/config", |r| {
tenant_service_handler(r, handle_tenant_config_get)
})
.put("/v1/tenant/:tenant_id/location_config", |r| {
tenant_service_handler(r, handle_tenant_location_config)
})
.put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| {
tenant_service_handler(r, handle_tenant_time_travel_remote_storage)
})
.post("/v1/tenant/:tenant_id/secondary/download", |r| {
tenant_service_handler(r, handle_tenant_secondary_download)
})
// Timeline operations
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
tenant_service_handler(r, handle_tenant_timeline_delete)

@@ -13,14 +13,20 @@ mod schema;
pub mod service;
mod tenant_state;
#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
enum PlacementPolicy {
/// Cheapest way to attach a tenant: just one pageserver, no secondary
Single,
/// Production-ready way to attach a tenant: one attached pageserver and
/// some number of secondaries.
Double(usize),
/// Do not attach to any pageservers
/// Create one secondary mode location. This is useful when onboarding
/// a tenant, or for an idle tenant that we might want to bring online quickly.
Secondary,
/// Do not attach to any pageservers. This is appropriate for tenants that
/// have been idle for a long time, where we do not mind some delay in making
/// them available in future.
Detached,
}
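
Since the placement policy is persisted as a JSON string in the `placement_policy` column (see the `serde_json::to_string(&PlacementPolicy...)` calls further down), here is a small round-trip sketch. It uses a simplified local copy of the enum rather than the real one, and assumes the `serde` (with derive) and `serde_json` crates are available:

use serde::{Deserialize, Serialize};

// Simplified local copy of the enum; the real one lives in this crate.
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)]
enum PlacementPolicy {
    Single,
    Double(usize),
    Secondary,
    Detached,
}

fn main() -> Result<(), serde_json::Error> {
    for policy in [
        PlacementPolicy::Single,
        PlacementPolicy::Double(1),
        PlacementPolicy::Secondary,
        PlacementPolicy::Detached,
    ] {
        let stored = serde_json::to_string(&policy)?; // e.g. "\"Secondary\"" or "{\"Double\":1}"
        let loaded: PlacementPolicy = serde_json::from_str(&stored)?;
        assert_eq!(policy, loaded);
        println!("{stored}");
    }
    Ok(())
}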

@@ -333,7 +333,15 @@ impl Persistence {
shard_number: ShardNumber(tsp.shard_number as u8),
shard_count: ShardCount::new(tsp.shard_count as u8),
};
result.insert(tenant_shard_id, Generation::new(tsp.generation as u32));
let Some(g) = tsp.generation else {
// If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL:
// we only set generation_pageserver when setting generation.
return Err(DatabaseError::Logical(
"Generation should always be set after incrementing".to_string(),
));
};
result.insert(tenant_shard_id, Generation::new(g as u32));
}
Ok(result)
@@ -366,7 +374,85 @@ impl Persistence {
})
.await?;
Ok(Generation::new(updated.generation as u32))
// Generation is always non-null in the result: if the generation column had been NULL, then we
// should have experienced an SQL Conflict error while executing a query that tries to increment it.
debug_assert!(updated.generation.is_some());
let Some(g) = updated.generation else {
return Err(DatabaseError::Logical(
"Generation should always be set after incrementing".to_string(),
)
.into());
};
Ok(Generation::new(g as u32))
}
/// For use when updating a persistent property of a tenant, such as its config or placement_policy.
///
/// Do not use this for setting generation, except in the special onboarding code path of the /location_config
/// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing
/// that we only do the first time a tenant is set to an attached policy via /location_config.
pub(crate) async fn update_tenant_shard(
&self,
tenant_shard_id: TenantShardId,
input_placement_policy: PlacementPolicy,
input_config: TenantConfig,
input_generation: Option<Generation>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| {
let query = diesel::update(tenant_shards)
.filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32));
if let Some(input_generation) = input_generation {
// Update includes generation column
query
.set((
generation.eq(Some(input_generation.into().unwrap() as i32)),
placement_policy
.eq(serde_json::to_string(&input_placement_policy).unwrap()),
config.eq(serde_json::to_string(&input_config).unwrap()),
))
.execute(conn)?;
} else {
// Update does not include generation column
query
.set((
placement_policy
.eq(serde_json::to_string(&input_placement_policy).unwrap()),
config.eq(serde_json::to_string(&input_config).unwrap()),
))
.execute(conn)?;
}
Ok(())
})
.await?;
Ok(())
}
pub(crate) async fn update_tenant_config(
&self,
input_tenant_id: TenantId,
input_config: TenantConfig,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_conn(move |conn| {
diesel::update(tenant_shards)
.filter(tenant_id.eq(input_tenant_id.to_string()))
.set((config.eq(serde_json::to_string(&input_config).unwrap()),))
.execute(conn)?;
Ok(())
})
.await?;
Ok(())
}
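
As a rough illustration of the two update shapes above, here is a toy, in-memory analogue (plain Rust, no Diesel) of how `update_tenant_shard` only touches the generation column when the caller passes one, while policy and config are always written:

// Toy in-memory analogue of update_tenant_shard's two shapes.
#[derive(Debug, Default)]
struct Row {
    generation: Option<i32>,
    placement_policy: String,
    config: String,
}

fn update_tenant_shard(row: &mut Row, placement_policy: &str, config: &str, generation: Option<i32>) {
    if let Some(g) = generation {
        // Update includes the generation column (first attach after Secondary onboarding)
        row.generation = Some(g);
    }
    // Policy and config are always written
    row.placement_policy = placement_policy.to_string();
    row.config = config.to_string();
}

fn main() {
    let mut row = Row::default();
    // Secondary onboarding / pure config change: generation stays NULL.
    update_tenant_shard(&mut row, "\"Secondary\"", "{}", None);
    assert_eq!(row.generation, None);
    // First attached-mode /location_config: generation is set exactly once.
    update_tenant_shard(&mut row, "{\"Double\":1}", "{}", Some(7));
    assert_eq!(row.generation, Some(7));
    println!("{row:?}");
}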
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
@@ -377,7 +463,7 @@ impl Persistence {
.filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
.filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
.set((
generation_pageserver.eq(i64::MAX),
generation_pageserver.eq(Option::<i64>::None),
placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()),
))
.execute(conn)?;
@@ -503,12 +589,15 @@ pub(crate) struct TenantShardPersistence {
pub(crate) shard_stripe_size: i32,
// Latest generation number: next time we attach, increment this
// and use the incremented number when attaching
pub(crate) generation: i32,
// and use the incremented number when attaching.
//
// Generation is only None when first onboarding a tenant, where it may
// be in PlacementPolicy::Secondary and therefore have no valid generation state.
pub(crate) generation: Option<i32>,
// Currently attached pageserver
#[serde(rename = "pageserver")]
pub(crate) generation_pageserver: i64,
pub(crate) generation_pageserver: Option<i64>,
#[serde(default)]
pub(crate) placement_policy: String,

@@ -26,7 +26,7 @@ pub(super) struct Reconciler {
/// of a tenant's state from when we spawned a reconcile task.
pub(super) tenant_shard_id: TenantShardId,
pub(crate) shard: ShardIdentity,
pub(crate) generation: Generation,
pub(crate) generation: Option<Generation>,
pub(crate) intent: TargetState,
pub(crate) config: TenantConfig,
pub(crate) observed: ObservedState,
@@ -312,7 +312,7 @@ impl Reconciler {
&self.shard,
&self.config,
LocationConfigMode::AttachedStale,
Some(self.generation),
self.generation,
None,
);
self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10)))
@@ -335,16 +335,17 @@ impl Reconciler {
}
// Increment generation before attaching to new pageserver
self.generation = self
.persistence
.increment_generation(self.tenant_shard_id, dest_ps_id)
.await?;
self.generation = Some(
self.persistence
.increment_generation(self.tenant_shard_id, dest_ps_id)
.await?,
);
let dest_conf = build_location_config(
&self.shard,
&self.config,
LocationConfigMode::AttachedMulti,
Some(self.generation),
self.generation,
None,
);
@@ -401,7 +402,7 @@ impl Reconciler {
&self.shard,
&self.config,
LocationConfigMode::AttachedSingle,
Some(self.generation),
self.generation,
None,
);
self.location_config(dest_ps_id, dest_final_conf.clone(), None)
@@ -433,22 +434,62 @@ impl Reconciler {
// If the attached pageserver is not attached, do so now.
if let Some(node_id) = self.intent.attached {
let mut wanted_conf =
attached_location_conf(self.generation, &self.shard, &self.config);
// If we are in an attached policy, then generation must have been set (null generations
// are only present when a tenant is initially loaded with a secondary policy)
debug_assert!(self.generation.is_some());
let Some(generation) = self.generation else {
return Err(ReconcileError::Other(anyhow::anyhow!(
"Attempted to attach with NULL generation"
)));
};
let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
// Nothing to do
tracing::info!(%node_id, "Observed configuration already correct.")
}
_ => {
observed => {
// In all cases other than a matching observed configuration, we will
// reconcile this location. This includes locations with different configurations, as well
// as locations with unknown (None) observed state.
self.generation = self
.persistence
.increment_generation(self.tenant_shard_id, node_id)
.await?;
wanted_conf.generation = self.generation.into();
// The general case is to increment the generation. However, there are cases
// where this is not necessary:
// - if we are only updating the TenantConf part of the location
// - if we are only changing the attachment mode (e.g. going to AttachedMulti or AttachedStale)
// and the location was already in the correct generation
let increment_generation = match observed {
None => true,
Some(ObservedStateLocation { conf: None }) => true,
Some(ObservedStateLocation {
conf: Some(observed),
}) => {
let generations_match = observed.generation == wanted_conf.generation;
use LocationConfigMode::*;
let mode_transition_requires_gen_inc =
match (observed.mode, wanted_conf.mode) {
// Usually the short-lived attachment modes (multi and stale) are only used
// in the case of [`Self::live_migrate`], but it is simple to handle them correctly
// here too. Locations are allowed to go Single->Stale and Multi->Single within the same generation.
(AttachedSingle, AttachedStale) => false,
(AttachedMulti, AttachedSingle) => false,
(lhs, rhs) => lhs != rhs,
};
!generations_match || mode_transition_requires_gen_inc
}
};
if increment_generation {
let generation = self
.persistence
.increment_generation(self.tenant_shard_id, node_id)
.await?;
self.generation = Some(generation);
wanted_conf.generation = generation.into();
}
tracing::info!(%node_id, "Observed configuration requires update.");
self.location_config(node_id, wanted_conf, None).await?;
self.compute_notify().await?;
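
The generation-increment rule added above can be viewed as a pure function. The following standalone sketch (simplified stand-in types, not the real `pageserver_api` models) mirrors that decision: skip the increment when only the TenantConf changed, and allow the Single->Stale and Multi->Single mode transitions within the same generation:

// Standalone sketch; `Mode` and `ObservedConf` are simplified stand-ins for
// pageserver_api's LocationConfigMode and the observed LocationConfig.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Mode {
    AttachedSingle,
    AttachedMulti,
    AttachedStale,
}

#[derive(Clone, Copy, Debug)]
struct ObservedConf {
    mode: Mode,
    generation: Option<u32>,
}

fn needs_generation_increment(
    observed: Option<ObservedConf>,
    wanted_mode: Mode,
    wanted_generation: Option<u32>,
) -> bool {
    use Mode::*;
    match observed {
        // Unknown or absent observed state: play safe and increment.
        None => true,
        Some(observed) => {
            let generations_match = observed.generation == wanted_generation;
            // Single->Stale and Multi->Single are permitted within one generation.
            let mode_transition_requires_gen_inc = match (observed.mode, wanted_mode) {
                (AttachedSingle, AttachedStale) => false,
                (AttachedMulti, AttachedSingle) => false,
                (lhs, rhs) => lhs != rhs,
            };
            !generations_match || mode_transition_requires_gen_inc
        }
    }
}

fn main() {
    // Config-only change on an already-attached location: no new generation.
    let observed = Some(ObservedConf { mode: Mode::AttachedSingle, generation: Some(5) });
    assert!(!needs_generation_increment(observed, Mode::AttachedSingle, Some(5)));
    // Location never observed before: increment.
    assert!(needs_generation_increment(None, Mode::AttachedSingle, Some(5)));
    // Stale -> Single is a real change of attachment: increment.
    let stale = Some(ObservedConf { mode: Mode::AttachedStale, generation: Some(5) });
    assert!(needs_generation_increment(stale, Mode::AttachedSingle, Some(5)));
}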

@@ -17,8 +17,8 @@ diesel::table! {
shard_number -> Int4,
shard_count -> Int4,
shard_stripe_size -> Int4,
generation -> Int4,
generation_pageserver -> Int8,
generation -> Nullable<Int4>,
generation_pageserver -> Nullable<Int8>,
placement_policy -> Varchar,
splitting -> Int2,
config -> Text,

@@ -14,10 +14,13 @@ use control_plane::attachment_service::{
use diesel::result::DatabaseErrorKind;
use futures::{stream::FuturesUnordered, StreamExt};
use hyper::StatusCode;
use pageserver_api::controller_api::{
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy,
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse,
use pageserver_api::{
controller_api::{
NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy,
TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse,
},
models::TenantConfigRequest,
};
use pageserver_api::{
models::{
@@ -65,6 +68,11 @@ const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
// some data in it.
const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
// If we receive a call using Secondary mode initially, it will omit generation. We will initialize
// tenant shards into this generation, and as long as it remains in this generation, we will accept
// input generation from future requests as authoritative.
const INITIAL_GENERATION: Generation = Generation::new(0);
/// How long [`Service::startup_reconcile`] is allowed to take before it should give
/// up on unresponsive pageservers and proceed.
pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
@@ -167,6 +175,21 @@ impl From<ReconcileWaitError> for ApiError {
}
}
#[allow(clippy::large_enum_variant)]
enum TenantCreateOrUpdate {
Create((TenantCreateRequest, PlacementPolicy)),
Update(Vec<ShardUpdate>),
}
struct ShardUpdate {
tenant_shard_id: TenantShardId,
placement_policy: PlacementPolicy,
tenant_config: TenantConfig,
/// If this is None, generation is not updated.
generation: Option<Generation>,
}
impl Service {
pub fn get_config(&self) -> &Config {
&self.config
@@ -571,6 +594,9 @@ impl Service {
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
tenant.pending_compute_notification = result.pending_compute_notification;
// Let the TenantState know it is idle.
tenant.reconcile_complete(result.sequence);
match result.result {
Ok(()) => {
for (node_id, loc) in &result.observed.locations {
@@ -661,8 +687,8 @@ impl Service {
// after when pageservers start up and register.
let mut node_ids = HashSet::new();
for tsp in &tenant_shard_persistence {
if tsp.generation_pageserver != i64::MAX {
node_ids.insert(tsp.generation_pageserver);
if let Some(node_id) = tsp.generation_pageserver {
node_ids.insert(node_id);
}
}
for node_id in node_ids {
@@ -699,18 +725,15 @@ impl Service {
// We will populate intent properly later in [`Self::startup_reconcile`], initially populate
// it with what we can infer: the node for which a generation was most recently issued.
let mut intent = IntentState::new();
if tsp.generation_pageserver != i64::MAX {
intent.set_attached(
&mut scheduler,
Some(NodeId(tsp.generation_pageserver as u64)),
);
if let Some(generation_pageserver) = tsp.generation_pageserver {
intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64)));
}
let new_tenant = TenantState {
tenant_shard_id,
shard: shard_identity,
sequence: Sequence::initial(),
generation: Generation::new(tsp.generation as u32),
generation: tsp.generation.map(|g| Generation::new(g as u32)),
policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
intent,
observed: ObservedState::new(),
@@ -790,8 +813,8 @@ impl Service {
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
shard_count: attach_req.tenant_shard_id.shard_count.literal() as i32,
shard_stripe_size: 0,
generation: 0,
generation_pageserver: i64::MAX,
generation: Some(0),
generation_pageserver: None,
placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
splitting: SplitState::default(),
@@ -846,7 +869,7 @@ impl Service {
.expect("Checked for existence above");
if let Some(new_generation) = new_generation {
tenant_state.generation = new_generation;
tenant_state.generation = Some(new_generation);
} else {
// This is a detach notification. We must update placement policy to avoid re-attaching
// during background scheduling/reconciliation, or during attachment service restart.
@@ -896,7 +919,7 @@ impl Service {
node_id,
ObservedStateLocation {
conf: Some(attached_location_conf(
tenant_state.generation,
tenant_state.generation.unwrap(),
&tenant_state.shard,
&tenant_state.config,
)),
@@ -910,7 +933,7 @@ impl Service {
Ok(AttachHookResponse {
gen: attach_req
.node_id
.map(|_| tenant_state.generation.into().unwrap()),
.map(|_| tenant_state.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap()),
})
}
@@ -923,7 +946,7 @@ impl Service {
attachment: tenant_state.and_then(|s| {
s.intent
.get_attached()
.map(|ps| (s.generation.into().unwrap(), ps))
.map(|ps| (s.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap(), ps))
}),
}
}
@@ -973,7 +996,17 @@ impl Service {
continue;
};
shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
// If [`Persistence::re_attach`] selected this shard, it must have already
// had a generation set.
debug_assert!(shard_state.generation.is_some());
let Some(old_gen) = shard_state.generation else {
// Should never happen: would only return incremented generation
// for a tenant that already had a non-null generation.
return Err(ApiError::InternalServerError(anyhow::anyhow!(
"Generation must be set while re-attaching"
)));
};
shard_state.generation = Some(std::cmp::max(old_gen, new_gen));
if let Some(observed) = shard_state
.observed
.locations
@@ -1003,7 +1036,7 @@ impl Service {
for req_tenant in validate_req.tenants {
if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
let valid = tenant_state.generation == Generation::new(req_tenant.gen);
let valid = tenant_state.generation == Some(Generation::new(req_tenant.gen));
tracing::info!(
"handle_validate: {}(gen {}): valid={valid} (latest {:?})",
req_tenant.id,
@@ -1030,8 +1063,9 @@ impl Service {
pub(crate) async fn tenant_create(
&self,
create_req: TenantCreateRequest,
placement_policy: PlacementPolicy,
) -> Result<TenantCreateResponse, ApiError> {
let (response, waiters) = self.do_tenant_create(create_req).await?;
let (response, waiters) = self.do_tenant_create(create_req, placement_policy).await?;
self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
Ok(response)
@@ -1040,6 +1074,7 @@ impl Service {
pub(crate) async fn do_tenant_create(
&self,
create_req: TenantCreateRequest,
placement_policy: PlacementPolicy,
) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
// This service expects to handle sharding itself: it is an error to try and directly create
// a particular shard here.
@@ -1065,9 +1100,27 @@ impl Service {
})
.collect::<Vec<_>>();
// TODO: enable specifying this. Using Single as a default helps legacy tests to work (they
// have no expectation of HA).
let placement_policy: PlacementPolicy = PlacementPolicy::Single;
// If the caller specifies a None generation, it means "start from default". This is different
// to [`Self::tenant_location_config`], where a None generation is used to represent
// an incompletely-onboarded tenant.
let initial_generation = if matches!(placement_policy, PlacementPolicy::Secondary) {
tracing::info!(
"tenant_create: secondary mode, generation is_some={}",
create_req.generation.is_some()
);
create_req.generation.map(Generation::new)
} else {
tracing::info!(
"tenant_create: not secondary mode, generation is_some={}",
create_req.generation.is_some()
);
Some(
create_req
.generation
.map(Generation::new)
.unwrap_or(INITIAL_GENERATION),
)
};
// Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
// to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
@@ -1079,8 +1132,10 @@ impl Service {
shard_number: tenant_shard_id.shard_number.0 as i32,
shard_count: tenant_shard_id.shard_count.literal() as i32,
shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
generation_pageserver: i64::MAX,
generation: initial_generation.map(|g| g.into().unwrap() as i32),
// The pageserver is not known until scheduling happens: we will set this column when
// incrementing the generation the first time we attach to a pageserver.
generation_pageserver: None,
placement_policy: serde_json::to_string(&placement_policy).unwrap(),
config: serde_json::to_string(&create_req.config).unwrap(),
splitting: SplitState::default(),
@@ -1120,15 +1175,17 @@ impl Service {
))
})?;
response_shards.push(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: entry
if let Some(node_id) = entry.get().intent.get_attached() {
let generation = entry
.get()
.intent
.get_attached()
.expect("We just set pageserver if it was None"),
generation: entry.get().generation.into().unwrap(),
});
.generation
.expect("Generation is set when in attached mode");
response_shards.push(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
});
}
continue;
}
@@ -1142,9 +1199,7 @@ impl Service {
placement_policy.clone(),
);
if let Some(create_gen) = create_req.generation {
state.generation = Generation::new(create_gen);
}
state.generation = initial_generation;
state.config = create_req.config.clone();
state.schedule(scheduler).map_err(|e| {
@@ -1153,14 +1208,18 @@ impl Service {
))
})?;
response_shards.push(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: state
.intent
.get_attached()
.expect("We just set pageserver if it was None"),
generation: state.generation.into().unwrap(),
});
// Only include shards in result if we are attaching: the purpose
// of the response is to tell the caller where the shards are attached.
if let Some(node_id) = state.intent.get_attached() {
let generation = state
.generation
.expect("Generation is set when in attached mode");
response_shards.push(TenantCreateResponseShard {
shard_id: tenant_shard_id,
node_id: *node_id,
generation: generation.into().unwrap(),
});
}
entry.insert(state)
}
};
@@ -1214,12 +1273,114 @@ impl Service {
Ok(())
}
/// This API is used by the cloud control plane to do coarse-grained control of tenants:
/// - Call with mode Attached* to upsert the tenant.
/// - Call with mode Detached to switch to PolicyMode::Detached
/// Part of [`Self::tenant_location_config`]: dissect an incoming location config request,
/// and transform it into either a tenant creation or a series of shard updates.
fn tenant_location_config_prepare(
&self,
tenant_id: TenantId,
req: TenantLocationConfigRequest,
) -> TenantCreateOrUpdate {
let mut updates = Vec::new();
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
// Use location config mode as an indicator of policy.
let placement_policy = match req.config.mode {
LocationConfigMode::Detached => PlacementPolicy::Detached,
LocationConfigMode::Secondary => PlacementPolicy::Secondary,
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
if nodes.len() > 1 {
PlacementPolicy::Double(1)
} else {
// Convenience for dev/test: if we just have one pageserver, import
// tenants into Single mode so that scheduling will succeed.
PlacementPolicy::Single
}
}
};
let mut create = true;
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
// Saw an existing shard: this is not a creation
create = false;
// Shards may have initially been created by a Secondary request, where we
// would have left generation as None.
//
// We only update generation the first time we see an attached-mode request,
// and if there is no existing generation set. The caller is responsible for
// ensuring that no non-storage-controller pageserver ever uses a higher
// generation than they passed in here.
use LocationConfigMode::*;
let set_generation = match req.config.mode {
AttachedMulti | AttachedSingle | AttachedStale if shard.generation.is_none() => {
req.config.generation.map(Generation::new)
}
_ => None,
};
if shard.policy != placement_policy
|| shard.config != req.config.tenant_conf
|| set_generation.is_some()
{
updates.push(ShardUpdate {
tenant_shard_id: *shard_id,
placement_policy: placement_policy.clone(),
tenant_config: req.config.tenant_conf.clone(),
generation: set_generation,
});
}
}
if create {
use LocationConfigMode::*;
let generation = match req.config.mode {
AttachedMulti | AttachedSingle | AttachedStale => req.config.generation,
// If a caller provided a generation in a non-attached request, ignore it
// and leave our generation as None: this enables a subsequent update to set
// the generation when setting an attached mode for the first time.
_ => None,
};
TenantCreateOrUpdate::Create(
// Synthesize a creation request
(
TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(tenant_id),
generation,
shard_parameters: ShardParameters {
// Must preserve the incoming shard_count to distinguish unsharded (0)
// from single-sharded (1): this distinction appears in the S3 keys of the tenant.
count: req.tenant_id.shard_count,
// We only import un-sharded or single-sharded tenants, so stripe
// size can be made up arbitrarily here.
stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
},
config: req.config.tenant_conf,
},
placement_policy,
),
)
} else {
TenantCreateOrUpdate::Update(updates)
}
}
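
A reduced sketch of the two decisions `tenant_location_config_prepare` makes, using local stand-in types (the real request and policy types live in `pageserver_api` and this crate): how the requested mode maps onto a placement policy, and when the request's generation is honoured. `single_pageserver` stands in for the `nodes.len() > 1` check:

// Local stand-in types; the real ones are pageserver_api::models::LocationConfigMode
// and this crate's PlacementPolicy.
#[derive(Debug, PartialEq)]
enum PlacementPolicy {
    Single,
    Double(usize),
    Secondary,
    Detached,
}

#[derive(Clone, Copy)]
enum LocationConfigMode {
    Detached,
    Secondary,
    AttachedMulti,
    AttachedSingle,
    AttachedStale,
}

fn policy_for_mode(mode: LocationConfigMode, single_pageserver: bool) -> PlacementPolicy {
    use LocationConfigMode::*;
    match mode {
        Detached => PlacementPolicy::Detached,
        Secondary => PlacementPolicy::Secondary,
        AttachedMulti | AttachedSingle | AttachedStale => {
            if single_pageserver {
                // Dev/test convenience: scheduling must succeed with one node.
                PlacementPolicy::Single
            } else {
                PlacementPolicy::Double(1)
            }
        }
    }
}

// The request's generation is only honoured on the first attached-mode call for a
// shard whose generation is still NULL from Secondary onboarding.
fn generation_to_set(
    mode: LocationConfigMode,
    existing: Option<u32>,
    requested: Option<u32>,
) -> Option<u32> {
    use LocationConfigMode::*;
    match mode {
        AttachedMulti | AttachedSingle | AttachedStale if existing.is_none() => requested,
        _ => None,
    }
}

fn main() {
    assert_eq!(
        policy_for_mode(LocationConfigMode::Secondary, false),
        PlacementPolicy::Secondary
    );
    // Onboarding: the first attached call carries the authoritative generation.
    assert_eq!(generation_to_set(LocationConfigMode::AttachedSingle, None, Some(7)), Some(7));
    // Later attached calls with a (likely stale) generation are ignored.
    assert_eq!(generation_to_set(LocationConfigMode::AttachedSingle, Some(8), Some(7)), None);
}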
/// This API is used by the cloud control plane to migrate unsharded tenants that it created
/// directly with pageservers into this service.
///
/// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
/// secondary locations.
/// Cloud control plane MUST NOT continue issuing GENERATION NUMBERS for this tenant once it
/// has attempted to call this API. Failure to comply with this rule may lead to S3 corruption.
/// Think of the first attempt to call this API as a transfer of absolute authority over the
/// tenant's source of generation numbers.
///
/// The mode in this request provides coarse-grained control of tenants:
/// - Call with mode Attached* to upsert the tenant.
/// - Call with mode Secondary to either onboard a tenant without attaching it, or
/// to set an existing tenant to PolicyMode::Secondary
/// - Call with mode Detached to switch to PolicyMode::Detached
pub(crate) async fn tenant_location_config(
&self,
tenant_id: TenantId,
@@ -1231,131 +1392,96 @@ impl Service {
)));
}
let mut waiters = Vec::new();
// First check if this is a creation or an update
let create_or_update = self.tenant_location_config_prepare(tenant_id, req);
let mut result = TenantLocationConfigResponse { shards: Vec::new() };
let maybe_create = {
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, scheduler) = locked.parts_mut();
let waiters = match create_or_update {
TenantCreateOrUpdate::Create((create_req, placement_policy)) => {
let (create_resp, waiters) =
self.do_tenant_create(create_req, placement_policy).await?;
result.shards = create_resp
.shards
.into_iter()
.map(|s| TenantShardLocation {
node_id: s.node_id,
shard_id: s.shard_id,
})
.collect();
waiters
}
TenantCreateOrUpdate::Update(updates) => {
// Persist updates
// Ordering: write to the database before applying changes in-memory, so that
// we will not appear to time-travel backwards on a restart.
for ShardUpdate {
tenant_shard_id,
placement_policy,
tenant_config,
generation,
} in &updates
{
self.persistence
.update_tenant_shard(
*tenant_shard_id,
placement_policy.clone(),
tenant_config.clone(),
*generation,
)
.await?;
}
// Maybe we have existing shards
let mut create = true;
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
// Saw an existing shard: this is not a creation
create = false;
// Apply updates in-memory
let mut waiters = Vec::new();
{
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, scheduler) = locked.parts_mut();
// Note that for existing tenants we do _not_ respect the generation in the request: this is likely
// to be stale. Once a tenant is created in this service, our view of generation is authoritative, and
// callers' generations may be ignored. This represents a one-way migration of tenants from the outer
// cloud control plane into this service.
for ShardUpdate {
tenant_shard_id,
placement_policy,
tenant_config,
generation: update_generation,
} in updates
{
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
tracing::warn!("Shard {tenant_shard_id} removed while updating");
continue;
};
// Use location config mode as an indicator of policy: if they ask for
// attached we go to default HA attached mode. If they ask for secondary
// we go to secondary-only mode. If they ask for detached we detach.
match req.config.mode {
LocationConfigMode::Detached => {
shard.policy = PlacementPolicy::Detached;
}
LocationConfigMode::Secondary => {
// TODO: implement secondary-only mode.
todo!();
}
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// TODO: persistence for changes in policy
if nodes.len() > 1 {
shard.policy = PlacementPolicy::Double(1)
} else {
// Convenience for dev/test: if we just have one pageserver, import
// tenants into Single mode so that scheduling will succeed.
shard.policy = PlacementPolicy::Single
shard.policy = placement_policy;
shard.config = tenant_config;
if let Some(generation) = update_generation {
shard.generation = Some(generation);
}
shard.schedule(scheduler)?;
let maybe_waiter = shard.maybe_reconcile(
result_tx.clone(),
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
if let Some(waiter) = maybe_waiter {
waiters.push(waiter);
}
if let Some(node_id) = shard.intent.get_attached() {
result.shards.push(TenantShardLocation {
shard_id: tenant_shard_id,
node_id: *node_id,
})
}
}
}
shard.schedule(scheduler)?;
let maybe_waiter = shard.maybe_reconcile(
result_tx.clone(),
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
if let Some(waiter) = maybe_waiter {
waiters.push(waiter);
}
if let Some(node_id) = shard.intent.get_attached() {
result.shards.push(TenantShardLocation {
shard_id: *shard_id,
node_id: *node_id,
})
}
waiters
}
if create {
// Validate request mode
match req.config.mode {
LocationConfigMode::Detached | LocationConfigMode::Secondary => {
// When using this API to onboard an existing tenant to this service, it must start in
// an attached state, because we need the request to come with a generation
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Imported tenant must be in attached mode"
)));
}
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// Pass
}
}
// Validate request generation
let Some(generation) = req.config.generation else {
// We can only import attached tenants, because we need the request to come with a generation
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Generation is mandatory when importing tenant"
)));
};
// Synthesize a creation request
Some(TenantCreateRequest {
new_tenant_id: TenantShardId::unsharded(tenant_id),
generation: Some(generation),
shard_parameters: ShardParameters {
// Must preserve the incoming shard_count to distinguish unsharded (0)
// from single-sharded (1): this distinction appears in the S3 keys of the tenant.
count: req.tenant_id.shard_count,
// We only import un-sharded or single-sharded tenants, so stripe
// size can be made up arbitrarily here.
stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
},
config: req.config.tenant_conf,
})
} else {
None
}
};
let waiters = if let Some(create_req) = maybe_create {
let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
result.shards = create_resp
.shards
.into_iter()
.map(|s| TenantShardLocation {
node_id: s.node_id,
shard_id: s.shard_id,
})
.collect();
waiters
} else {
waiters
};
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
@@ -1375,6 +1501,91 @@ impl Service {
Ok(result)
}
pub(crate) async fn tenant_config_set(&self, req: TenantConfigRequest) -> Result<(), ApiError> {
let tenant_id = req.tenant_id;
let config = req.config;
self.persistence
.update_tenant_config(req.tenant_id, config.clone())
.await?;
let waiters = {
let mut waiters = Vec::new();
let mut locked = self.inner.write().unwrap();
let result_tx = locked.result_tx.clone();
let compute_hook = locked.compute_hook.clone();
let (nodes, tenants, _scheduler) = locked.parts_mut();
for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
shard.config = config.clone();
if let Some(waiter) = shard.maybe_reconcile(
result_tx.clone(),
nodes,
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
) {
waiters.push(waiter);
}
}
waiters
};
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
// Treat this as success because we have stored the configuration. If e.g.
// a node was unavailable at this time, it should not stop us accepting a
// configuration change.
tracing::warn!(%tenant_id, "Accepted configuration update but reconciliation failed: {e}");
}
Ok(())
}
pub(crate) fn tenant_config_get(
&self,
tenant_id: TenantId,
) -> Result<HashMap<&str, serde_json::Value>, ApiError> {
let config = {
let locked = self.inner.read().unwrap();
match locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.next()
{
Some((_tenant_shard_id, shard)) => shard.config.clone(),
None => {
return Err(ApiError::NotFound(
anyhow::anyhow!("Tenant not found").into(),
))
}
}
};
// Unlike the pageserver, we do not have a set of global defaults: the config is
// entirely per-tenant. Therefore the distinction between `tenant_specific_overrides`
// and `effective_config` in the response is meaningless, but we retain that syntax
// in order to remain compatible with the pageserver API.
let response = HashMap::from([
(
"tenant_specific_overrides",
serde_json::to_value(&config)
.context("serializing tenant specific overrides")
.map_err(ApiError::InternalServerError)?,
),
(
"effective_config",
serde_json::to_value(&config)
.context("serializing effective config")
.map_err(ApiError::InternalServerError)?,
),
]);
Ok(response)
}
pub(crate) async fn tenant_time_travel_remote_storage(
&self,
time_travel_req: &TenantTimeTravelRequest,
@@ -1460,6 +1671,60 @@ impl Service {
})?;
}
}
Ok(())
}
pub(crate) async fn tenant_secondary_download(
&self,
tenant_id: TenantId,
) -> Result<(), ApiError> {
// Acquire lock and yield the collection of shard-node tuples which we will send requests onward to
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
for (tenant_shard_id, shard) in
locked.tenants.range(TenantShardId::tenant_range(tenant_id))
{
for node_id in shard.intent.get_secondary() {
let node = locked
.nodes
.get(node_id)
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
}
}
targets
};
// TODO: this API, and the underlying pageserver API, should take a timeout argument so that for long running
// downloads, they can return a clean 202 response instead of the HTTP client timing out.
// Issue concurrent requests to all shards' locations
let mut futs = FuturesUnordered::new();
for (tenant_shard_id, node) in targets {
let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
futs.push(async move {
let result = client.tenant_secondary_download(tenant_shard_id).await;
(result, node)
})
}
// Handle any errors returned by pageservers. This includes cases like this request racing with
// a scheduling operation, such that the tenant shard we're calling doesn't exist on that pageserver any more, as
// well as more general cases like 503s, 500s, or timeouts.
while let Some((result, node)) = futs.next().await {
let Err(e) = result else { continue };
// Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever
// is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache
// than they had hoped for.
tracing::warn!(
"Ignoring tenant secondary download error from pageserver {}: {e}",
node.id,
);
}
Ok(())
}
@@ -2039,8 +2304,8 @@ impl Service {
// Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
// populate the correct generation as part of its transaction, to protect us
// against racing with changes in the state of the parent.
generation: 0,
generation_pageserver: target.node.id.0 as i64,
generation: None,
generation_pageserver: Some(target.node.id.0 as i64),
placement_policy: serde_json::to_string(&policy).unwrap(),
// TODO: get the config out of the map
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
@@ -2161,7 +2426,8 @@ impl Service {
.expect("It was present, we just split it");
let old_attached = old_state.intent.get_attached().unwrap();
old_state.intent.clear(scheduler);
(old_attached, old_state.generation, old_state.config.clone())
let generation = old_state.generation.expect("Shard must have been attached");
(old_attached, generation, old_state.config.clone())
};
for child in child_ids {
@@ -2182,7 +2448,7 @@ impl Service {
child_state.observed = ObservedState {
locations: child_observed,
};
child_state.generation = generation;
child_state.generation = Some(generation);
child_state.config = config.clone();
// The child's TenantState::splitting is intentionally left at the default value of Idle,
@@ -2247,6 +2513,7 @@ impl Service {
match shard.policy {
PlacementPolicy::Single => {
shard.intent.clear_secondary(scheduler);
shard.intent.set_attached(scheduler, Some(migrate_req.node_id));
}
PlacementPolicy::Double(_n) => {
// If our new attached node was a secondary, it no longer should be.
@@ -2256,6 +2523,12 @@ impl Service {
if let Some(old_attached) = old_attached {
shard.intent.push_secondary(scheduler, old_attached);
}
shard.intent.set_attached(scheduler, Some(migrate_req.node_id));
}
PlacementPolicy::Secondary => {
shard.intent.clear(scheduler);
shard.intent.push_secondary(scheduler, migrate_req.node_id);
}
PlacementPolicy::Detached => {
return Err(ApiError::BadRequest(anyhow::anyhow!(
@@ -2263,9 +2536,6 @@ impl Service {
)))
}
}
shard
.intent
.set_attached(scheduler, Some(migrate_req.node_id));
tracing::info!("Migrating: new intent {:?}", shard.intent);
shard.sequence = shard.sequence.next();
@@ -2593,7 +2863,7 @@ impl Service {
observed_loc.conf = None;
}
if tenant_state.intent.notify_offline(config_req.node_id) {
if tenant_state.intent.demote_attached(config_req.node_id) {
tenant_state.sequence = tenant_state.sequence.next();
match tenant_state.schedule(scheduler) {
Err(e) => {
@@ -2660,6 +2930,9 @@ impl Service {
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.
///
/// TODO: this doesn't actually ensure attached unless the PlacementPolicy is
/// an attached policy. We should error out if it isn't.
fn ensure_attached_schedule(
&self,
mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,

@@ -53,8 +53,11 @@ pub(crate) struct TenantState {
pub(crate) sequence: Sequence,
// Latest generation number: next time we attach, increment this
// and use the incremented number when attaching
pub(crate) generation: Generation,
// and use the incremented number when attaching.
//
// None represents a tenant that was incompletely onboarded via the [`Service::location_config`]
// API; such a tenant may only run in PlacementPolicy::Secondary.
pub(crate) generation: Option<Generation>,
// High level description of how the tenant should be set up. Provided
// externally.
@@ -181,6 +184,13 @@ impl IntentState {
}
}
/// Remove the last secondary node from the list of secondaries
pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) {
if let Some(node_id) = self.secondary.pop() {
scheduler.node_dec_ref(node_id);
}
}
pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) {
if let Some(old_attached) = self.attached.take() {
scheduler.node_dec_ref(old_attached);
@@ -208,11 +218,13 @@ impl IntentState {
&self.secondary
}
/// When a node goes offline, we update intents to avoid using it
/// as their attached pageserver.
/// If the node is in use as the attached location, demote it into
/// the list of secondary locations. This is used when a node goes offline,
/// and we want to use a different node for attachment, but not permanently
/// forget the location on the offline node.
///
/// Returns true if a change was made
pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool {
pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool {
if self.attached == Some(node_id) {
// TODO: when scheduler starts tracking attached + secondary counts separately, we will
// need to call into it here.
@@ -315,7 +327,7 @@ pub(crate) struct ReconcileResult {
pub(crate) result: Result<(), ReconcileError>,
pub(crate) tenant_shard_id: TenantShardId,
pub(crate) generation: Generation,
pub(crate) generation: Option<Generation>,
pub(crate) observed: ObservedState,
/// Set [`TenantState::pending_compute_notification`] from this flag
@@ -340,7 +352,7 @@ impl TenantState {
tenant_shard_id,
policy,
intent: IntentState::default(),
generation: Generation::new(0),
generation: Some(Generation::new(0)),
shard,
observed: ObservedState::default(),
config: TenantConfig::default(),
@@ -438,10 +450,16 @@ impl TenantState {
// more work on the same pageservers we're already using.
let mut modified = false;
// Add/remove nodes to fulfil policy
use PlacementPolicy::*;
match self.policy {
Single => {
// Should have exactly one attached, and zero secondaries
if !self.intent.secondary.is_empty() {
self.intent.clear_secondary(scheduler);
modified = true;
}
let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?;
modified |= modified_attached;
@@ -451,6 +469,23 @@ impl TenantState {
}
}
Double(secondary_count) => {
let retain_secondaries = if self.intent.attached.is_none()
&& scheduler.node_preferred(&self.intent.secondary).is_some()
{
// If we have no attached, and one of the secondaries is eligible to be promoted, retain
// one more secondary than we usually would, as one of them will become attached further down this function.
secondary_count + 1
} else {
secondary_count
};
while self.intent.secondary.len() > retain_secondaries {
// We have no particular preference for one secondary location over another: just
// arbitrarily drop from the end
self.intent.pop_secondary(scheduler);
modified = true;
}
// Should have exactly one attached, and N secondaries
let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
modified |= modified_attached;
@@ -463,15 +498,28 @@ impl TenantState {
modified = true;
}
}
Detached => {
// Should have no attached or secondary pageservers
if self.intent.attached.is_some() {
self.intent.set_attached(scheduler, None);
Secondary => {
if let Some(node_id) = self.intent.get_attached() {
// Populate secondary by demoting the attached node
self.intent.demote_attached(*node_id);
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
let node_id = scheduler.schedule_shard(&[])?;
self.intent.push_secondary(scheduler, node_id);
modified = true;
}
if !self.intent.secondary.is_empty() {
self.intent.clear_secondary(scheduler);
while self.intent.secondary.len() > 1 {
// We have no particular preference for one secondary location over another: just
// arbitrarily drop from the end
self.intent.pop_secondary(scheduler);
modified = true;
}
}
Detached => {
// Never add locations in this mode
if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() {
self.intent.clear(scheduler);
modified = true;
}
}
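
The new `Secondary` branch of `TenantState::schedule` can be summarised with the following simplified sketch (local stand-ins for `IntentState` and the scheduler; `pick_node` is a hypothetical stand-in for `scheduler.schedule_shard(&[])`): demote an attached location if present, otherwise ensure exactly one secondary exists:

// Local stand-ins for IntentState and the scheduler.
#[derive(Debug)]
struct Intent {
    attached: Option<u64>,
    secondary: Vec<u64>,
}

fn schedule_secondary(intent: &mut Intent, pick_node: impl Fn() -> u64) -> bool {
    let mut modified = false;
    if let Some(node_id) = intent.attached.take() {
        // Populate secondary by demoting the attached node
        intent.secondary.push(node_id);
        modified = true;
    } else if intent.secondary.is_empty() {
        // Populate secondary by scheduling a fresh node
        intent.secondary.push(pick_node());
        modified = true;
    }
    // A Secondary-policy shard keeps at most one secondary location
    while intent.secondary.len() > 1 {
        intent.secondary.pop();
        modified = true;
    }
    modified
}

fn main() {
    // An attached tenant moving to Secondary policy: the attachment is demoted.
    let mut intent = Intent { attached: Some(1), secondary: Vec::new() };
    assert!(schedule_secondary(&mut intent, || 3));
    assert_eq!(intent.attached, None);
    assert_eq!(intent.secondary, vec![1]);

    // A freshly onboarded Secondary tenant: one secondary is scheduled.
    let mut intent = Intent { attached: None, secondary: Vec::new() };
    assert!(schedule_secondary(&mut intent, || 7));
    assert_eq!(intent.secondary, vec![7]);
}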
@@ -518,7 +566,12 @@ impl TenantState {
fn dirty(&self) -> bool {
if let Some(node_id) = self.intent.attached {
let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config);
// May panic: it is a severe bug if we try to attach while generation is null.
let generation = self
.generation
.expect("Attempted to enter attached state without a generation");
let wanted_conf = attached_location_conf(generation, &self.shard, &self.config);
match self.observed.locations.get(&node_id) {
Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {}
Some(_) | None => {
@@ -596,6 +649,10 @@ impl TenantState {
// Reconcile already in flight for the current sequence?
if let Some(handle) = &self.reconciler {
if handle.sequence == self.sequence {
tracing::info!(
"Reconciliation already in progress for sequence {:?}",
self.sequence,
);
return Some(ReconcilerWaiter {
tenant_shard_id: self.tenant_shard_id,
seq_wait: self.waiter.clone(),
@@ -615,6 +672,10 @@ impl TenantState {
return None;
};
// Advance the sequence before spawning a reconciler, so that sequence waiters
// can distinguish between before and after the reconcile completes.
self.sequence = self.sequence.next();
let reconciler_cancel = cancel.child_token();
let mut reconciler = Reconciler {
tenant_shard_id: self.tenant_shard_id,
@@ -716,6 +777,17 @@ impl TenantState {
})
}
/// Called when a ReconcileResult has been emitted and the service is updating
/// our state: if the result is from a sequence >= my ReconcileHandle, then drop
/// the handle to indicate there is no longer a reconciliation in progress.
pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) {
if let Some(reconcile_handle) = &self.reconciler {
if reconcile_handle.sequence <= sequence {
self.reconciler = None;
}
}
}
// If we had any state at all referring to this node ID, drop it. Does not
// attempt to reschedule.
pub(crate) fn deref_node(&mut self, node_id: NodeId) {
@@ -736,13 +808,8 @@ impl TenantState {
shard_number: self.tenant_shard_id.shard_number.0 as i32,
shard_count: self.tenant_shard_id.shard_count.literal() as i32,
shard_stripe_size: self.shard.stripe_size.0 as i32,
generation: self.generation.into().unwrap_or(0) as i32,
generation_pageserver: self
.intent
.get_attached()
.map(|n| n.0 as i64)
.unwrap_or(i64::MAX),
generation: self.generation.map(|g| g.into().unwrap_or(0) as i32),
generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64),
placement_policy: serde_json::to_string(&self.policy).unwrap(),
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
@@ -805,8 +872,10 @@ pub(crate) mod tests {
assert_ne!(attached_node_id, secondary_node_id);
// Notifying the attached node is offline should demote it to a secondary
let changed = tenant_state.intent.notify_offline(attached_node_id);
let changed = tenant_state.intent.demote_attached(attached_node_id);
assert!(changed);
assert!(tenant_state.intent.attached.is_none());
assert_eq!(tenant_state.intent.secondary.len(), 2);
// Update the scheduler state to indicate the node is offline
nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline;

@@ -45,7 +45,7 @@ impl Generation {
Self::Broken
}
pub fn new(v: u32) -> Self {
pub const fn new(v: u32) -> Self {
Self::Valid(v)
}

@@ -146,6 +146,8 @@ def test_sharding_service_smoke(
for tid in tenant_ids:
tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10)
env.attachment_service.consistency_check()
# Set a scheduling policy on one node, create all the tenants, observe
# that the scheduling policy is respected.
env.attachment_service.node_configure(env.pageservers[1].id, {"scheduling": "Draining"})
@@ -256,9 +258,8 @@ def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder):
env.attachment_service.consistency_check()
def test_sharding_service_onboarding(
neon_env_builder: NeonEnvBuilder,
):
@pytest.mark.parametrize("warm_up", [True, False])
def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up: bool):
"""
We onboard tenants to the sharding service by treating it as a 'virtual pageserver'
which provides the /location_config API. This is similar to creating a tenant,
@@ -306,6 +307,23 @@ def test_sharding_service_onboarding(
},
)
if warm_up:
origin_ps.http_client().tenant_heatmap_upload(tenant_id)
# We expect to be called via live migration code, which may try to configure the tenant into secondary
# mode before attaching it.
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
"generation": None,
},
)
virtual_ps_http.tenant_secondary_download(tenant_id)
# Call into attachment service to onboard the tenant
generation += 1
virtual_ps_http.tenant_location_conf(
@@ -351,7 +369,9 @@ def test_sharding_service_onboarding(
assert len(dest_tenants) == 1
assert TenantId(dest_tenants[0]["id"]) == tenant_id
# sharding service advances generation by 1 when it first attaches
# sharding service advances generation by 1 when it first attaches. We started
# with a nonzero generation so this equality also proves that the generation
# was properly carried over during onboarding.
assert dest_tenants[0]["generation"] == generation + 1
# The onboarded tenant should survive a restart of sharding service
@@ -362,6 +382,31 @@ def test_sharding_service_onboarding(
dest_ps.stop()
dest_ps.start()
# Having onboarded via /location_config, we should also be able to update the
# TenantConf part of LocationConf, without inadvertently resetting the generation
modified_tenant_conf = {"max_lsn_wal_lag": 1024 * 1024 * 1024 * 100}
dest_tenant_before_conf_change = dest_ps.http_client().tenant_status(tenant_id)
# The generation has moved on since we onboarded
assert generation != dest_tenant_before_conf_change["generation"]
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "AttachedSingle",
"secondary_conf": None,
"tenant_conf": modified_tenant_conf,
# This is intentionally a stale generation
"generation": generation,
},
)
dest_tenant_after_conf_change = dest_ps.http_client().tenant_status(tenant_id)
assert (
dest_tenant_after_conf_change["generation"] == dest_tenant_before_conf_change["generation"]
)
dest_tenant_conf_after = dest_ps.http_client().tenant_config(tenant_id)
assert dest_tenant_conf_after.tenant_specific_overrides == modified_tenant_conf
env.attachment_service.consistency_check()
@@ -667,3 +712,41 @@ def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder):
svc.request(
"POST", f"{api}/upcall/v1/re-attach", headers=svc.headers(TokenScope.PAGE_SERVER_API)
)
def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder):
"""
Validate the pageserver-compatible API endpoints for setting and getting tenant conf, without
supplying the whole LocationConf.
"""
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
http = env.attachment_service.pageserver_api()
default_value = "7days"
new_value = "1h"
http.set_tenant_config(tenant_id, {"pitr_interval": new_value})
# Ensure the change landed on the storage controller
readback_controller = http.tenant_config(tenant_id)
assert readback_controller.effective_config["pitr_interval"] == new_value
assert readback_controller.tenant_specific_overrides["pitr_interval"] == new_value
# Ensure the change made it down to the pageserver
readback_ps = env.pageservers[0].http_client().tenant_config(tenant_id)
assert readback_ps.effective_config["pitr_interval"] == new_value
assert readback_ps.tenant_specific_overrides["pitr_interval"] == new_value
# Omitting a value clears it. This looks different in storage controller
# vs. pageserver API calls, because pageserver has defaults.
http.set_tenant_config(tenant_id, {})
readback_controller = http.tenant_config(tenant_id)
assert readback_controller.effective_config["pitr_interval"] is None
assert readback_controller.tenant_specific_overrides["pitr_interval"] is None
readback_ps = env.pageservers[0].http_client().tenant_config(tenant_id)
assert readback_ps.effective_config["pitr_interval"] == default_value
assert "pitr_interval" not in readback_ps.tenant_specific_overrides
env.attachment_service.consistency_check()