diff --git a/control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/down.sql b/control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/down.sql new file mode 100644 index 0000000000..503231f69d --- /dev/null +++ b/control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/down.sql @@ -0,0 +1,2 @@ +ALTER TABLE tenant_shards ALTER generation SET NOT NULL; +ALTER TABLE tenant_shards ALTER generation_pageserver SET NOT NULL; diff --git a/control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/up.sql b/control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/up.sql new file mode 100644 index 0000000000..7e1e3cfe90 --- /dev/null +++ b/control_plane/attachment_service/migrations/2024-02-29-094122_generations_null/up.sql @@ -0,0 +1,4 @@ + + +ALTER TABLE tenant_shards ALTER generation DROP NOT NULL; +ALTER TABLE tenant_shards ALTER generation_pageserver DROP NOT NULL; \ No newline at end of file diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs index f1153c2c18..384bdcef0c 100644 --- a/control_plane/attachment_service/src/http.rs +++ b/control_plane/attachment_service/src/http.rs @@ -1,9 +1,10 @@ use crate::reconciler::ReconcileError; use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT}; +use crate::PlacementPolicy; use hyper::{Body, Request, Response}; use hyper::{StatusCode, Uri}; use pageserver_api::models::{ - TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest, + TenantConfigRequest, TenantCreateRequest, TenantLocationConfigRequest, TenantShardSplitRequest, TenantTimeTravelRequest, TimelineCreateRequest, }; use pageserver_api::shard::TenantShardId; @@ -117,9 +118,14 @@ async fn handle_tenant_create( check_permissions(&req, Scope::PageServerApi)?; let create_req = json_request::(&mut req).await?; + + // TODO: enable specifying this. Using Single as a default helps legacy tests to work (they + // have no expectation of HA). + let placement_policy = PlacementPolicy::Single; + json_response( StatusCode::CREATED, - service.tenant_create(create_req).await?, + service.tenant_create(create_req, placement_policy).await?, ) } @@ -185,6 +191,27 @@ async fn handle_tenant_location_config( ) } +async fn handle_tenant_config_set( + service: Arc, + mut req: Request, +) -> Result, ApiError> { + check_permissions(&req, Scope::PageServerApi)?; + + let config_req = json_request::(&mut req).await?; + + json_response(StatusCode::OK, service.tenant_config_set(config_req).await?) +} + +async fn handle_tenant_config_get( + service: Arc, + req: Request, +) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + check_permissions(&req, Scope::PageServerApi)?; + + json_response(StatusCode::OK, service.tenant_config_get(tenant_id)?) 
+} + async fn handle_tenant_time_travel_remote_storage( service: Arc<Service>, mut req: Request<Body>, ) -> Result<Response<Body>, ApiError> { @@ -216,7 +243,15 @@ async fn handle_tenant_time_travel_remote_storage( done_if_after_raw, ) .await?; + json_response(StatusCode::OK, ()) +} +async fn handle_tenant_secondary_download( + service: Arc<Service>, + req: Request<Body>, +) -> Result<Response<Body>, ApiError> { + let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?; + service.tenant_secondary_download(tenant_id).await?; json_response(StatusCode::OK, ()) } @@ -551,12 +586,21 @@ pub fn make_router( .delete("/v1/tenant/:tenant_id", |r| { tenant_service_handler(r, handle_tenant_delete) }) + .put("/v1/tenant/config", |r| { + tenant_service_handler(r, handle_tenant_config_set) + }) + .get("/v1/tenant/:tenant_id/config", |r| { + tenant_service_handler(r, handle_tenant_config_get) + }) .put("/v1/tenant/:tenant_id/location_config", |r| { tenant_service_handler(r, handle_tenant_location_config) }) .put("/v1/tenant/:tenant_id/time_travel_remote_storage", |r| { tenant_service_handler(r, handle_tenant_time_travel_remote_storage) }) + .post("/v1/tenant/:tenant_id/secondary/download", |r| { + tenant_service_handler(r, handle_tenant_secondary_download) + }) // Timeline operations .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { tenant_service_handler(r, handle_tenant_timeline_delete) diff --git a/control_plane/attachment_service/src/lib.rs b/control_plane/attachment_service/src/lib.rs index ce613e858f..7ae7e264c7 100644 --- a/control_plane/attachment_service/src/lib.rs +++ b/control_plane/attachment_service/src/lib.rs @@ -13,14 +13,20 @@ mod schema; pub mod service; mod tenant_state; -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq)] enum PlacementPolicy { /// Cheapest way to attach a tenant: just one pageserver, no secondary Single, /// Production-ready way to attach a tenant: one attached pageserver and /// some number of secondaries. Double(usize), - /// Do not attach to any pageservers + /// Create one secondary mode location. This is useful when onboarding + /// a tenant, or for an idle tenant that we might want to bring online quickly. + Secondary, + + /// Do not attach to any pageservers. This is appropriate for tenants that + /// have been idle for a long time, where we do not mind some delay in making + /// them available in future. Detached, } diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs index 4c6eb2291c..d5c304385c 100644 --- a/control_plane/attachment_service/src/persistence.rs +++ b/control_plane/attachment_service/src/persistence.rs @@ -333,7 +333,15 @@ impl Persistence { shard_number: ShardNumber(tsp.shard_number as u8), shard_count: ShardCount::new(tsp.shard_count as u8), }; - result.insert(tenant_shard_id, Generation::new(tsp.generation as u32)); + + let Some(g) = tsp.generation else { + // If the generation_pageserver column was non-NULL, then the generation column should also be non-NULL: + // we only set generation_pageserver when setting generation.
+ return Err(DatabaseError::Logical( + "Generation should always be set after incrementing".to_string(), + )); + }; + result.insert(tenant_shard_id, Generation::new(g as u32)); } Ok(result) @@ -366,7 +374,85 @@ impl Persistence { }) .await?; - Ok(Generation::new(updated.generation as u32)) + // Generation is always non-null in the result: if the generation column had been NULL, then we + // should have experienced an SQL Conflict error while executing a query that tries to increment it. + debug_assert!(updated.generation.is_some()); + let Some(g) = updated.generation else { + return Err(DatabaseError::Logical( + "Generation should always be set after incrementing".to_string(), + ) + .into()); + }; + + Ok(Generation::new(g as u32)) + } + + /// For use when updating a persistent property of a tenant, such as its config or placement_policy. + /// + /// Do not use this for setting generation, unless in the special onboarding code path (/location_config) + /// API: use [`Self::increment_generation`] instead. Setting the generation via this route is a one-time thing + /// that we only do the first time a tenant is set to an attached policy via /location_config. + pub(crate) async fn update_tenant_shard( + &self, + tenant_shard_id: TenantShardId, + input_placement_policy: PlacementPolicy, + input_config: TenantConfig, + input_generation: Option<Generation>, + ) -> DatabaseResult<()> { + use crate::schema::tenant_shards::dsl::*; + + self.with_conn(move |conn| { + let query = diesel::update(tenant_shards) + .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string())) + .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) + .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)); + + if let Some(input_generation) = input_generation { + // Update includes generation column + query + .set(( + generation.eq(Some(input_generation.into().unwrap() as i32)), + placement_policy + .eq(serde_json::to_string(&input_placement_policy).unwrap()), + config.eq(serde_json::to_string(&input_config).unwrap()), + )) + .execute(conn)?; + } else { + // Update does not include generation column + query + .set(( + placement_policy + .eq(serde_json::to_string(&input_placement_policy).unwrap()), + config.eq(serde_json::to_string(&input_config).unwrap()), + )) + .execute(conn)?; + } + + Ok(()) + }) + .await?; + + Ok(()) + } + + pub(crate) async fn update_tenant_config( + &self, + input_tenant_id: TenantId, + input_config: TenantConfig, + ) -> DatabaseResult<()> { + use crate::schema::tenant_shards::dsl::*; + + self.with_conn(move |conn| { + diesel::update(tenant_shards) + .filter(tenant_id.eq(input_tenant_id.to_string())) + .set((config.eq(serde_json::to_string(&input_config).unwrap()),)) + .execute(conn)?; + + Ok(()) + }) + .await?; + + Ok(()) } pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> { @@ -377,7 +463,7 @@ impl Persistence { .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32)) .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32)) .set(( - generation_pageserver.eq(i64::MAX), + generation_pageserver.eq(Option::<i64>::None), placement_policy.eq(serde_json::to_string(&PlacementPolicy::Detached).unwrap()), )) .execute(conn)?; @@ -503,12 +589,15 @@ pub(crate) struct TenantShardPersistence { pub(crate) shard_stripe_size: i32, // Latest generation number: next time we attach, increment this - // and use the incremented number when attaching - pub(crate) generation: i32, + // and use the incremented number when attaching.
+ // + // Generation is only None when first onboarding a tenant, where it may + // be in PlacementPolicy::Secondary and therefore have no valid generation state. + pub(crate) generation: Option, // Currently attached pageserver #[serde(rename = "pageserver")] - pub(crate) generation_pageserver: i64, + pub(crate) generation_pageserver: Option, #[serde(default)] pub(crate) placement_policy: String, diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs index ce91c1f5e9..b633b217c7 100644 --- a/control_plane/attachment_service/src/reconciler.rs +++ b/control_plane/attachment_service/src/reconciler.rs @@ -26,7 +26,7 @@ pub(super) struct Reconciler { /// of a tenant's state from when we spawned a reconcile task. pub(super) tenant_shard_id: TenantShardId, pub(crate) shard: ShardIdentity, - pub(crate) generation: Generation, + pub(crate) generation: Option, pub(crate) intent: TargetState, pub(crate) config: TenantConfig, pub(crate) observed: ObservedState, @@ -312,7 +312,7 @@ impl Reconciler { &self.shard, &self.config, LocationConfigMode::AttachedStale, - Some(self.generation), + self.generation, None, ); self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10))) @@ -335,16 +335,17 @@ impl Reconciler { } // Increment generation before attaching to new pageserver - self.generation = self - .persistence - .increment_generation(self.tenant_shard_id, dest_ps_id) - .await?; + self.generation = Some( + self.persistence + .increment_generation(self.tenant_shard_id, dest_ps_id) + .await?, + ); let dest_conf = build_location_config( &self.shard, &self.config, LocationConfigMode::AttachedMulti, - Some(self.generation), + self.generation, None, ); @@ -401,7 +402,7 @@ impl Reconciler { &self.shard, &self.config, LocationConfigMode::AttachedSingle, - Some(self.generation), + self.generation, None, ); self.location_config(dest_ps_id, dest_final_conf.clone(), None) @@ -433,22 +434,62 @@ impl Reconciler { // If the attached pageserver is not attached, do so now. if let Some(node_id) = self.intent.attached { - let mut wanted_conf = - attached_location_conf(self.generation, &self.shard, &self.config); + // If we are in an attached policy, then generation must have been set (null generations + // are only present when a tenant is initially loaded with a secondary policy) + debug_assert!(self.generation.is_some()); + let Some(generation) = self.generation else { + return Err(ReconcileError::Other(anyhow::anyhow!( + "Attempted to attach with NULL generation" + ))); + }; + + let mut wanted_conf = attached_location_conf(generation, &self.shard, &self.config); match self.observed.locations.get(&node_id) { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => { // Nothing to do tracing::info!(%node_id, "Observed configuration already correct.") } - _ => { + observed => { // In all cases other than a matching observed configuration, we will // reconcile this location. This includes locations with different configurations, as well // as locations with unknown (None) observed state. - self.generation = self - .persistence - .increment_generation(self.tenant_shard_id, node_id) - .await?; - wanted_conf.generation = self.generation.into(); + + // The general case is to increment the generation. However, there are cases + // where this is not necessary: + // - if we are only updating the TenantConf part of the location + // - if we are only changing the attachment mode (e.g. 
going to attachedmulti or attachedstale) + // and the location was already in the correct generation + let increment_generation = match observed { + None => true, + Some(ObservedStateLocation { conf: None }) => true, + Some(ObservedStateLocation { + conf: Some(observed), + }) => { + let generations_match = observed.generation == wanted_conf.generation; + + use LocationConfigMode::*; + let mode_transition_requires_gen_inc = + match (observed.mode, wanted_conf.mode) { + // Usually the short-lived attachment modes (multi and stale) are only used + // in the case of [`Self::live_migrate`], but it is simple to handle them correctly + // here too. Locations are allowed to go Single->Stale and Multi->Single within the same generation. + (AttachedSingle, AttachedStale) => false, + (AttachedMulti, AttachedSingle) => false, + (lhs, rhs) => lhs != rhs, + }; + + !generations_match || mode_transition_requires_gen_inc + } + }; + + if increment_generation { + let generation = self + .persistence + .increment_generation(self.tenant_shard_id, node_id) + .await?; + self.generation = Some(generation); + wanted_conf.generation = generation.into(); + } tracing::info!(%node_id, "Observed configuration requires update."); self.location_config(node_id, wanted_conf, None).await?; self.compute_notify().await?; diff --git a/control_plane/attachment_service/src/schema.rs b/control_plane/attachment_service/src/schema.rs index db5a957443..76e4e56a66 100644 --- a/control_plane/attachment_service/src/schema.rs +++ b/control_plane/attachment_service/src/schema.rs @@ -17,8 +17,8 @@ diesel::table! { shard_number -> Int4, shard_count -> Int4, shard_stripe_size -> Int4, - generation -> Int4, - generation_pageserver -> Int8, + generation -> Nullable, + generation_pageserver -> Nullable, placement_policy -> Varchar, splitting -> Int2, config -> Text, diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs index 02c1a65545..4209b62db3 100644 --- a/control_plane/attachment_service/src/service.rs +++ b/control_plane/attachment_service/src/service.rs @@ -14,10 +14,13 @@ use control_plane::attachment_service::{ use diesel::result::DatabaseErrorKind; use futures::{stream::FuturesUnordered, StreamExt}; use hyper::StatusCode; -use pageserver_api::controller_api::{ - NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, - TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse, - TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse, +use pageserver_api::{ + controller_api::{ + NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, + TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse, + TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse, + }, + models::TenantConfigRequest, }; use pageserver_api::{ models::{ @@ -65,6 +68,11 @@ const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5); // some data in it. const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); +// If we receive a call using Secondary mode initially, it will omit generation. We will initialize +// tenant shards into this generation, and as long as it remains in this generation, we will accept +// input generation from future requests as authoritative. +const INITIAL_GENERATION: Generation = Generation::new(0); + /// How long [`Service::startup_reconcile`] is allowed to take before it should give /// up on unresponsive pageservers and proceed. 
pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30); @@ -167,6 +175,21 @@ impl From for ApiError { } } +#[allow(clippy::large_enum_variant)] +enum TenantCreateOrUpdate { + Create((TenantCreateRequest, PlacementPolicy)), + Update(Vec), +} + +struct ShardUpdate { + tenant_shard_id: TenantShardId, + placement_policy: PlacementPolicy, + tenant_config: TenantConfig, + + /// If this is None, generation is not updated. + generation: Option, +} + impl Service { pub fn get_config(&self) -> &Config { &self.config @@ -571,6 +594,9 @@ impl Service { // the shard so that a future [`TenantState::maybe_reconcile`] will try again. tenant.pending_compute_notification = result.pending_compute_notification; + // Let the TenantState know it is idle. + tenant.reconcile_complete(result.sequence); + match result.result { Ok(()) => { for (node_id, loc) in &result.observed.locations { @@ -661,8 +687,8 @@ impl Service { // after when pageservers start up and register. let mut node_ids = HashSet::new(); for tsp in &tenant_shard_persistence { - if tsp.generation_pageserver != i64::MAX { - node_ids.insert(tsp.generation_pageserver); + if let Some(node_id) = tsp.generation_pageserver { + node_ids.insert(node_id); } } for node_id in node_ids { @@ -699,18 +725,15 @@ impl Service { // We will populate intent properly later in [`Self::startup_reconcile`], initially populate // it with what we can infer: the node for which a generation was most recently issued. let mut intent = IntentState::new(); - if tsp.generation_pageserver != i64::MAX { - intent.set_attached( - &mut scheduler, - Some(NodeId(tsp.generation_pageserver as u64)), - ); + if let Some(generation_pageserver) = tsp.generation_pageserver { + intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64))); } let new_tenant = TenantState { tenant_shard_id, shard: shard_identity, sequence: Sequence::initial(), - generation: Generation::new(tsp.generation as u32), + generation: tsp.generation.map(|g| Generation::new(g as u32)), policy: serde_json::from_str(&tsp.placement_policy).unwrap(), intent, observed: ObservedState::new(), @@ -790,8 +813,8 @@ impl Service { shard_number: attach_req.tenant_shard_id.shard_number.0 as i32, shard_count: attach_req.tenant_shard_id.shard_count.literal() as i32, shard_stripe_size: 0, - generation: 0, - generation_pageserver: i64::MAX, + generation: Some(0), + generation_pageserver: None, placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(), config: serde_json::to_string(&TenantConfig::default()).unwrap(), splitting: SplitState::default(), @@ -846,7 +869,7 @@ impl Service { .expect("Checked for existence above"); if let Some(new_generation) = new_generation { - tenant_state.generation = new_generation; + tenant_state.generation = Some(new_generation); } else { // This is a detach notification. We must update placement policy to avoid re-attaching // during background scheduling/reconciliation, or during attachment service restart. 
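[Reviewer note] As a minimal sketch of how the newly nullable columns are intended to round-trip into in-memory state (not part of the PR; `Row` is a stand-in for `TenantShardPersistence`, and the type paths are assumed to be the usual `utils` ones):

use utils::{generation::Generation, id::NodeId};

// Stand-in for the two nullable columns added by this migration.
struct Row {
    generation: Option<i32>,
    generation_pageserver: Option<i64>,
}

// NULL generation => tenant only known in secondary/onboarding state;
// NULL generation_pageserver => no pageserver has been issued a generation yet.
fn in_memory_state(row: &Row) -> (Option<Generation>, Option<NodeId>) {
    (
        row.generation.map(|g| Generation::new(g as u32)),
        row.generation_pageserver.map(|n| NodeId(n as u64)),
    )
}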
@@ -896,7 +919,7 @@ impl Service { node_id, ObservedStateLocation { conf: Some(attached_location_conf( - tenant_state.generation, + tenant_state.generation.unwrap(), &tenant_state.shard, &tenant_state.config, )), @@ -910,7 +933,7 @@ impl Service { Ok(AttachHookResponse { gen: attach_req .node_id - .map(|_| tenant_state.generation.into().unwrap()), + .map(|_| tenant_state.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap()), }) } @@ -923,7 +946,7 @@ impl Service { attachment: tenant_state.and_then(|s| { s.intent .get_attached() - .map(|ps| (s.generation.into().unwrap(), ps)) + .map(|ps| (s.generation.expect("Test hook, not used on tenants that are mid-onboarding with a NULL generation").into().unwrap(), ps)) }), } } @@ -973,7 +996,17 @@ impl Service { continue; }; - shard_state.generation = std::cmp::max(shard_state.generation, new_gen); + // If [`Persistence::re_attach`] selected this shard, it must have already + // had a generation set. + debug_assert!(shard_state.generation.is_some()); + let Some(old_gen) = shard_state.generation else { + // Should never happen: would only return incremented generation + // for a tenant that already had a non-null generation. + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Generation must be set while re-attaching" + ))); + }; + shard_state.generation = Some(std::cmp::max(old_gen, new_gen)); if let Some(observed) = shard_state .observed .locations @@ -1003,7 +1036,7 @@ impl Service { for req_tenant in validate_req.tenants { if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) { - let valid = tenant_state.generation == Generation::new(req_tenant.gen); + let valid = tenant_state.generation == Some(Generation::new(req_tenant.gen)); tracing::info!( "handle_validate: {}(gen {}): valid={valid} (latest {:?})", req_tenant.id, @@ -1030,8 +1063,9 @@ impl Service { pub(crate) async fn tenant_create( &self, create_req: TenantCreateRequest, + placement_policy: PlacementPolicy, ) -> Result<TenantCreateResponse, ApiError> { - let (response, waiters) = self.do_tenant_create(create_req).await?; + let (response, waiters) = self.do_tenant_create(create_req, placement_policy).await?; self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?; Ok(response) @@ -1040,6 +1074,7 @@ impl Service { pub(crate) async fn do_tenant_create( &self, create_req: TenantCreateRequest, + placement_policy: PlacementPolicy, ) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> { // This service expects to handle sharding itself: it is an error to try and directly create // a particular shard here. @@ -1065,9 +1100,27 @@ impl Service { }) .collect::<Vec<_>>(); - // TODO: enable specifying this. Using Single as a default helps legacy tests to work (they - // have no expectation of HA). - let placement_policy: PlacementPolicy = PlacementPolicy::Single; + // If the caller specifies a None generation, it means "start from default". This is different + // to [`Self::tenant_location_config`], where a None generation is used to represent + // an incompletely-onboarded tenant.
+ let initial_generation = if matches!(placement_policy, PlacementPolicy::Secondary) { + tracing::info!( + "tenant_create: secondary mode, generation is_some={}", + create_req.generation.is_some() + ); + create_req.generation.map(Generation::new) + } else { + tracing::info!( + "tenant_create: not secondary mode, generation is_some={}", + create_req.generation.is_some() + ); + Some( + create_req + .generation + .map(Generation::new) + .unwrap_or(INITIAL_GENERATION), + ) + }; // Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart @@ -1079,8 +1132,10 @@ impl Service { shard_number: tenant_shard_id.shard_number.0 as i32, shard_count: tenant_shard_id.shard_count.literal() as i32, shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32, - generation: create_req.generation.map(|g| g as i32).unwrap_or(0), - generation_pageserver: i64::MAX, + generation: initial_generation.map(|g| g.into().unwrap() as i32), + // The pageserver is not known until scheduling happens: we will set this column when + // incrementing the generation the first time we attach to a pageserver. + generation_pageserver: None, placement_policy: serde_json::to_string(&placement_policy).unwrap(), config: serde_json::to_string(&create_req.config).unwrap(), splitting: SplitState::default(), @@ -1120,15 +1175,17 @@ impl Service { )) })?; - response_shards.push(TenantCreateResponseShard { - shard_id: tenant_shard_id, - node_id: entry + if let Some(node_id) = entry.get().intent.get_attached() { + let generation = entry .get() - .intent - .get_attached() - .expect("We just set pageserver if it was None"), - generation: entry.get().generation.into().unwrap(), - }); + .generation + .expect("Generation is set when in attached mode"); + response_shards.push(TenantCreateResponseShard { + shard_id: tenant_shard_id, + node_id: *node_id, + generation: generation.into().unwrap(), + }); + } continue; } @@ -1142,9 +1199,7 @@ impl Service { placement_policy.clone(), ); - if let Some(create_gen) = create_req.generation { - state.generation = Generation::new(create_gen); - } + state.generation = initial_generation; state.config = create_req.config.clone(); state.schedule(scheduler).map_err(|e| { @@ -1153,14 +1208,18 @@ impl Service { )) })?; - response_shards.push(TenantCreateResponseShard { - shard_id: tenant_shard_id, - node_id: state - .intent - .get_attached() - .expect("We just set pageserver if it was None"), - generation: state.generation.into().unwrap(), - }); + // Only include shards in result if we are attaching: the purpose + // of the response is to tell the caller where the shards are attached. + if let Some(node_id) = state.intent.get_attached() { + let generation = state + .generation + .expect("Generation is set when in attached mode"); + response_shards.push(TenantCreateResponseShard { + shard_id: tenant_shard_id, + node_id: *node_id, + generation: generation.into().unwrap(), + }); + } entry.insert(state) } }; @@ -1214,12 +1273,114 @@ impl Service { Ok(()) } - /// This API is used by the cloud control plane to do coarse-grained control of tenants: - /// - Call with mode Attached* to upsert the tenant. - /// - Call with mode Detached to switch to PolicyMode::Detached + /// Part of [`Self::tenant_location_config`]: dissect an incoming location config request, + /// and transform it into either a tenant creation or a series of shard updates.
+ fn tenant_location_config_prepare( + &self, + tenant_id: TenantId, + req: TenantLocationConfigRequest, + ) -> TenantCreateOrUpdate { + let mut updates = Vec::new(); + let mut locked = self.inner.write().unwrap(); + let (nodes, tenants, _scheduler) = locked.parts_mut(); + + // Use location config mode as an indicator of policy. + let placement_policy = match req.config.mode { + LocationConfigMode::Detached => PlacementPolicy::Detached, + LocationConfigMode::Secondary => PlacementPolicy::Secondary, + LocationConfigMode::AttachedMulti + | LocationConfigMode::AttachedSingle + | LocationConfigMode::AttachedStale => { + if nodes.len() > 1 { + PlacementPolicy::Double(1) + } else { + // Convenience for dev/test: if we just have one pageserver, import + // tenants into Single mode so that scheduling will succeed. + PlacementPolicy::Single + } + } + }; + + let mut create = true; + for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) { + // Saw an existing shard: this is not a creation + create = false; + + // Shards may have initially been created by a Secondary request, where we + // would have left generation as None. + // + // We only update generation the first time we see an attached-mode request, + // and if there is no existing generation set. The caller is responsible for + // ensuring that no non-storage-controller pageserver ever uses a higher + // generation than they passed in here. + use LocationConfigMode::*; + let set_generation = match req.config.mode { + AttachedMulti | AttachedSingle | AttachedStale if shard.generation.is_none() => { + req.config.generation.map(Generation::new) + } + _ => None, + }; + + if shard.policy != placement_policy + || shard.config != req.config.tenant_conf + || set_generation.is_some() + { + updates.push(ShardUpdate { + tenant_shard_id: *shard_id, + placement_policy: placement_policy.clone(), + tenant_config: req.config.tenant_conf.clone(), + generation: set_generation, + }); + } + } + + if create { + use LocationConfigMode::*; + let generation = match req.config.mode { + AttachedMulti | AttachedSingle | AttachedStale => req.config.generation, + // If a caller provided a generation in a non-attached request, ignore it + // and leave our generation as None: this enables a subsequent update to set + // the generation when setting an attached mode for the first time. + _ => None, + }; + + TenantCreateOrUpdate::Create( + // Synthesize a creation request + ( + TenantCreateRequest { + new_tenant_id: TenantShardId::unsharded(tenant_id), + generation, + shard_parameters: ShardParameters { + // Must preserve the incoming shard_count to distinguish unsharded (0) + // from single-sharded (1): this distinction appears in the S3 keys of the tenant. + count: req.tenant_id.shard_count, + // We only import un-sharded or single-sharded tenants, so stripe + // size can be made up arbitrarily here. + stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE, + }, + config: req.config.tenant_conf, + }, + placement_policy, + ), + ) + } else { + TenantCreateOrUpdate::Update(updates) + } + } + + /// This API is used by the cloud control plane to migrate unsharded tenants that it created + /// directly with pageservers into this service. /// - /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has - /// secondary locations. + /// Cloud control plane MUST NOT continue issuing GENERATION NUMBERS for this tenant once it + /// has attempted to call this API.
Failure to comply with this rule may lead to S3 corruption. + Think of the first attempt to call this API as a transfer of absolute authority over the + tenant's source of generation numbers. + /// + /// The mode in this request provides coarse-grained control of tenants: + /// - Call with mode Attached* to upsert the tenant. + /// - Call with mode Secondary to either onboard a tenant without attaching it, or + /// to set an existing tenant to PolicyMode::Secondary + /// - Call with mode Detached to switch to PolicyMode::Detached pub(crate) async fn tenant_location_config( &self, tenant_id: TenantId, @@ -1231,131 +1392,96 @@ impl Service { ))); } - let mut waiters = Vec::new(); + // First check if this is a creation or an update + let create_or_update = self.tenant_location_config_prepare(tenant_id, req); + let mut result = TenantLocationConfigResponse { shards: Vec::new() }; - let maybe_create = { - let mut locked = self.inner.write().unwrap(); - let result_tx = locked.result_tx.clone(); - let compute_hook = locked.compute_hook.clone(); - let (nodes, tenants, scheduler) = locked.parts_mut(); + let waiters = match create_or_update { + TenantCreateOrUpdate::Create((create_req, placement_policy)) => { + let (create_resp, waiters) = + self.do_tenant_create(create_req, placement_policy).await?; + result.shards = create_resp + .shards + .into_iter() + .map(|s| TenantShardLocation { + node_id: s.node_id, + shard_id: s.shard_id, + }) + .collect(); + waiters + } + TenantCreateOrUpdate::Update(updates) => { + // Persist updates + // Ordering: write to the database before applying changes in-memory, so that + // we will not appear to time-travel backwards on a restart. + for ShardUpdate { + tenant_shard_id, + placement_policy, + tenant_config, + generation, + } in &updates + { + self.persistence + .update_tenant_shard( + *tenant_shard_id, + placement_policy.clone(), + tenant_config.clone(), + *generation, + ) + .await?; + } - // Maybe we have existing shards - let mut create = true; - for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) { - // Saw an existing shard: this is not a creation - create = false; + // Apply updates in-memory + let mut waiters = Vec::new(); + { + let mut locked = self.inner.write().unwrap(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + let (nodes, tenants, scheduler) = locked.parts_mut(); - // Note that for existing tenants we do _not_ respect the generation in the request: this is likely - // to be stale. Once a tenant is created in this service, our view of generation is authoritative, and - // callers' generations may be ignored. This represents a one-way migration of tenants from the outer - // cloud control plane into this service. + for ShardUpdate { + tenant_shard_id, + placement_policy, + tenant_config, + generation: update_generation, + } in updates + { + let Some(shard) = tenants.get_mut(&tenant_shard_id) else { + tracing::warn!("Shard {tenant_shard_id} removed while updating"); + continue; + }; - // Use location config mode as an indicator of policy: if they ask for - // attached we go to default HA attached mode. If they ask for secondary - // we go to secondary-only mode. If they ask for detached we detach. - match req.config.mode { - LocationConfigMode::Detached => { - shard.policy = PlacementPolicy::Detached; - } - LocationConfigMode::Secondary => { - // TODO: implement secondary-only mode.
- todo!(); - } - LocationConfigMode::AttachedMulti - | LocationConfigMode::AttachedSingle - | LocationConfigMode::AttachedStale => { - // TODO: persistence for changes in policy - if nodes.len() > 1 { - shard.policy = PlacementPolicy::Double(1) - } else { - // Convenience for dev/test: if we just have one pageserver, import - // tenants into Single mode so that scheduling will succeed. - shard.policy = PlacementPolicy::Single + shard.policy = placement_policy; + shard.config = tenant_config; + if let Some(generation) = update_generation { + shard.generation = Some(generation); + } + + shard.schedule(scheduler)?; + + let maybe_waiter = shard.maybe_reconcile( + result_tx.clone(), + nodes, + &compute_hook, + &self.config, + &self.persistence, + &self.gate, + &self.cancel, + ); + if let Some(waiter) = maybe_waiter { + waiters.push(waiter); + } + + if let Some(node_id) = shard.intent.get_attached() { + result.shards.push(TenantShardLocation { + shard_id: tenant_shard_id, + node_id: *node_id, + }) } } } - - shard.schedule(scheduler)?; - - let maybe_waiter = shard.maybe_reconcile( - result_tx.clone(), - nodes, - &compute_hook, - &self.config, - &self.persistence, - &self.gate, - &self.cancel, - ); - if let Some(waiter) = maybe_waiter { - waiters.push(waiter); - } - - if let Some(node_id) = shard.intent.get_attached() { - result.shards.push(TenantShardLocation { - shard_id: *shard_id, - node_id: *node_id, - }) - } + waiters } - - if create { - // Validate request mode - match req.config.mode { - LocationConfigMode::Detached | LocationConfigMode::Secondary => { - // When using this API to onboard an existing tenant to this service, it must start in - // an attached state, because we need the request to come with a generation - return Err(ApiError::BadRequest(anyhow::anyhow!( - "Imported tenant must be in attached mode" - ))); - } - - LocationConfigMode::AttachedMulti - | LocationConfigMode::AttachedSingle - | LocationConfigMode::AttachedStale => { - // Pass - } - } - - // Validate request generation - let Some(generation) = req.config.generation else { - // We can only import attached tenants, because we need the request to come with a generation - return Err(ApiError::BadRequest(anyhow::anyhow!( - "Generation is mandatory when importing tenant" - ))); - }; - - // Synthesize a creation request - Some(TenantCreateRequest { - new_tenant_id: TenantShardId::unsharded(tenant_id), - generation: Some(generation), - shard_parameters: ShardParameters { - // Must preserve the incoming shard_count do distinguish unsharded (0) - // from single-sharded (1): this distinction appears in the S3 keys of the tenant. - count: req.tenant_id.shard_count, - // We only import un-sharded or single-sharded tenants, so stripe - // size can be made up arbitrarily here. 
- stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE, - }, - config: req.config.tenant_conf, - }) - } else { - None - } - }; - - let waiters = if let Some(create_req) = maybe_create { - let (create_resp, waiters) = self.do_tenant_create(create_req).await?; - result.shards = create_resp - .shards - .into_iter() - .map(|s| TenantShardLocation { - node_id: s.node_id, - shard_id: s.shard_id, - }) - .collect(); - waiters - } else { - waiters }; if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await { @@ -1375,6 +1501,91 @@ impl Service { Ok(result) } + pub(crate) async fn tenant_config_set(&self, req: TenantConfigRequest) -> Result<(), ApiError> { + let tenant_id = req.tenant_id; + let config = req.config; + + self.persistence + .update_tenant_config(req.tenant_id, config.clone()) + .await?; + + let waiters = { + let mut waiters = Vec::new(); + let mut locked = self.inner.write().unwrap(); + let result_tx = locked.result_tx.clone(); + let compute_hook = locked.compute_hook.clone(); + let (nodes, tenants, _scheduler) = locked.parts_mut(); + for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) { + shard.config = config.clone(); + if let Some(waiter) = shard.maybe_reconcile( + result_tx.clone(), + nodes, + &compute_hook, + &self.config, + &self.persistence, + &self.gate, + &self.cancel, + ) { + waiters.push(waiter); + } + } + waiters + }; + + if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await { + // Treat this as success because we have stored the configuration. If e.g. + // a node was unavailable at this time, it should not stop us accepting a + // configuration change. + tracing::warn!(%tenant_id, "Accepted configuration update but reconciliation failed: {e}"); + } + + Ok(()) + } + + pub(crate) fn tenant_config_get( + &self, + tenant_id: TenantId, + ) -> Result, ApiError> { + let config = { + let locked = self.inner.read().unwrap(); + + match locked + .tenants + .range(TenantShardId::tenant_range(tenant_id)) + .next() + { + Some((_tenant_shard_id, shard)) => shard.config.clone(), + None => { + return Err(ApiError::NotFound( + anyhow::anyhow!("Tenant not found").into(), + )) + } + } + }; + + // Unlike the pageserver, we do not have a set of global defaults: the config is + // entirely per-tenant. Therefore the distinction between `tenant_specific_overrides` + // and `effective_config` in the response is meaningless, but we retain that syntax + // in order to remain compatible with the pageserver API. 
+ + let response = HashMap::from([ + ( + "tenant_specific_overrides", + serde_json::to_value(&config) + .context("serializing tenant specific overrides") + .map_err(ApiError::InternalServerError)?, + ), + ( + "effective_config", + serde_json::to_value(&config) + .context("serializing effective config") + .map_err(ApiError::InternalServerError)?, + ), + ]); + + Ok(response) + } + pub(crate) async fn tenant_time_travel_remote_storage( &self, time_travel_req: &TenantTimeTravelRequest, @@ -1460,6 +1671,60 @@ impl Service { })?; } } + Ok(()) + } + + pub(crate) async fn tenant_secondary_download( + &self, + tenant_id: TenantId, + ) -> Result<(), ApiError> { + // Acquire lock and yield the collection of shard-node tuples which we will send requests onward to + let targets = { + let locked = self.inner.read().unwrap(); + let mut targets = Vec::new(); + + for (tenant_shard_id, shard) in + locked.tenants.range(TenantShardId::tenant_range(tenant_id)) + { + for node_id in shard.intent.get_secondary() { + let node = locked + .nodes + .get(node_id) + .expect("Pageservers may not be deleted while referenced"); + + targets.push((*tenant_shard_id, node.clone())); + } + } + targets + }; + + // TODO: this API, and the underlying pageserver API, should take a timeout argument so that for long running + // downloads, they can return a clean 202 response instead of the HTTP client timing out. + + // Issue concurrent requests to all shards' locations + let mut futs = FuturesUnordered::new(); + for (tenant_shard_id, node) in targets { + let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref()); + futs.push(async move { + let result = client.tenant_secondary_download(tenant_shard_id).await; + (result, node) + }) + } + + // Handle any errors returned by pageservers. This includes cases like this request racing with + // a scheduling operation, such that the tenant shard we're calling doesn't exist on that pageserver any more, as + // well as more general cases like 503s, 500s, or timeouts. + while let Some((result, node)) = futs.next().await { + let Err(e) = result else { continue }; + + // Secondary downloads are always advisory: if something fails, we nevertheless report success, so that whoever + // is calling us will proceed with whatever migration they're doing, albeit with a slightly less warm cache + // than they had hoped for. + tracing::warn!( + "Ignoring tenant secondary download error from pageserver {}: {e}", + node.id, + ); + } Ok(()) } @@ -2039,8 +2304,8 @@ impl Service { // Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will // populate the correct generation as part of its transaction, to protect us // against racing with changes in the state of the parent. 
- generation: 0, - generation_pageserver: target.node.id.0 as i64, + generation: None, + generation_pageserver: Some(target.node.id.0 as i64), placement_policy: serde_json::to_string(&policy).unwrap(), // TODO: get the config out of the map config: serde_json::to_string(&TenantConfig::default()).unwrap(), @@ -2161,7 +2426,8 @@ impl Service { .expect("It was present, we just split it"); let old_attached = old_state.intent.get_attached().unwrap(); old_state.intent.clear(scheduler); - (old_attached, old_state.generation, old_state.config.clone()) + let generation = old_state.generation.expect("Shard must have been attached"); + (old_attached, generation, old_state.config.clone()) }; for child in child_ids { @@ -2182,7 +2448,7 @@ impl Service { child_state.observed = ObservedState { locations: child_observed, }; - child_state.generation = generation; + child_state.generation = Some(generation); child_state.config = config.clone(); // The child's TenantState::splitting is intentionally left at the default value of Idle, @@ -2247,6 +2513,7 @@ impl Service { match shard.policy { PlacementPolicy::Single => { shard.intent.clear_secondary(scheduler); + shard.intent.set_attached(scheduler, Some(migrate_req.node_id)); } PlacementPolicy::Double(_n) => { // If our new attached node was a secondary, it no longer should be. @@ -2256,6 +2523,12 @@ impl Service { if let Some(old_attached) = old_attached { shard.intent.push_secondary(scheduler, old_attached); } + + shard.intent.set_attached(scheduler, Some(migrate_req.node_id)); + } + PlacementPolicy::Secondary => { + shard.intent.clear(scheduler); + shard.intent.push_secondary(scheduler, migrate_req.node_id); } PlacementPolicy::Detached => { return Err(ApiError::BadRequest(anyhow::anyhow!( @@ -2263,9 +2536,6 @@ impl Service { ))) } } - shard - .intent - .set_attached(scheduler, Some(migrate_req.node_id)); tracing::info!("Migrating: new intent {:?}", shard.intent); shard.sequence = shard.sequence.next(); @@ -2593,7 +2863,7 @@ impl Service { observed_loc.conf = None; } - if tenant_state.intent.notify_offline(config_req.node_id) { + if tenant_state.intent.demote_attached(config_req.node_id) { tenant_state.sequence = tenant_state.sequence.next(); match tenant_state.schedule(scheduler) { Err(e) => { @@ -2660,6 +2930,9 @@ impl Service { /// Helper for methods that will try and call pageserver APIs for /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant /// is attached somewhere. + /// + /// TODO: this doesn't actually ensure attached unless the PlacementPolicy is + /// an attached policy. We should error out if it isn't. fn ensure_attached_schedule( &self, mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>, diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs index c14fe6699e..33b7d578c7 100644 --- a/control_plane/attachment_service/src/tenant_state.rs +++ b/control_plane/attachment_service/src/tenant_state.rs @@ -53,8 +53,11 @@ pub(crate) struct TenantState { pub(crate) sequence: Sequence, // Latest generation number: next time we attach, increment this - // and use the incremented number when attaching - pub(crate) generation: Generation, + // and use the incremented number when attaching. + // + // None represents an incompletely onboarded tenant via the [`Service::location_config`] + // API, where this tenant may only run in PlacementPolicy::Secondary. + pub(crate) generation: Option, // High level description of how the tenant should be set up. 
Provided // externally. @@ -181,6 +184,13 @@ impl IntentState { } } + /// Remove the last secondary node from the list of secondaries + pub(crate) fn pop_secondary(&mut self, scheduler: &mut Scheduler) { + if let Some(node_id) = self.secondary.pop() { + scheduler.node_dec_ref(node_id); + } + } + pub(crate) fn clear(&mut self, scheduler: &mut Scheduler) { if let Some(old_attached) = self.attached.take() { scheduler.node_dec_ref(old_attached); @@ -208,11 +218,13 @@ impl IntentState { &self.secondary } - /// When a node goes offline, we update intents to avoid using it - /// as their attached pageserver. + /// If the node is in use as the attached location, demote it into + /// the list of secondary locations. This is used when a node goes offline, + /// and we want to use a different node for attachment, but not permanently + /// forget the location on the offline node. /// /// Returns true if a change was made - pub(crate) fn notify_offline(&mut self, node_id: NodeId) -> bool { + pub(crate) fn demote_attached(&mut self, node_id: NodeId) -> bool { if self.attached == Some(node_id) { // TODO: when scheduler starts tracking attached + secondary counts separately, we will // need to call into it here. @@ -315,7 +327,7 @@ pub(crate) struct ReconcileResult { pub(crate) result: Result<(), ReconcileError>, pub(crate) tenant_shard_id: TenantShardId, - pub(crate) generation: Generation, + pub(crate) generation: Option<Generation>, pub(crate) observed: ObservedState, /// Set [`TenantState::pending_compute_notification`] from this flag @@ -340,7 +352,7 @@ impl TenantState { tenant_shard_id, policy, intent: IntentState::default(), - generation: Generation::new(0), + generation: Some(Generation::new(0)), shard, observed: ObservedState::default(), config: TenantConfig::default(), @@ -438,10 +450,16 @@ impl TenantState { // more work on the same pageservers we're already using. let mut modified = false; + // Add/remove nodes to fulfil policy use PlacementPolicy::*; match self.policy { Single => { // Should have exactly one attached, and zero secondaries + if !self.intent.secondary.is_empty() { + self.intent.clear_secondary(scheduler); + modified = true; + } + let (modified_attached, _attached_node_id) = self.schedule_attached(scheduler)?; modified |= modified_attached; @@ -451,6 +469,23 @@ impl TenantState { } } Double(secondary_count) => { + let retain_secondaries = if self.intent.attached.is_none() + && scheduler.node_preferred(&self.intent.secondary).is_some() + { + // If we have no attached, and one of the secondaries is eligible to be promoted, retain + // one more secondary than we usually would, as one of them will become attached further down this function.
+ secondary_count + 1 + } else { + secondary_count + }; + + while self.intent.secondary.len() > retain_secondaries { + // We have no particular preference for one secondary location over another: just + // arbitrarily drop from the end + self.intent.pop_secondary(scheduler); + modified = true; + } + // Should have exactly one attached, and N secondaries let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?; modified |= modified_attached; @@ -463,15 +498,28 @@ impl TenantState { modified = true; } } - Detached => { - // Should have no attached or secondary pageservers - if self.intent.attached.is_some() { - self.intent.set_attached(scheduler, None); + Secondary => { + if let Some(node_id) = self.intent.get_attached() { + // Populate secondary by demoting the attached node + self.intent.demote_attached(*node_id); + modified = true; + } else if self.intent.secondary.is_empty() { + // Populate secondary by scheduling a fresh node + let node_id = scheduler.schedule_shard(&[])?; + self.intent.push_secondary(scheduler, node_id); modified = true; } - - if !self.intent.secondary.is_empty() { - self.intent.clear_secondary(scheduler); + while self.intent.secondary.len() > 1 { + // We have no particular preference for one secondary location over another: just + // arbitrarily drop from the end + self.intent.pop_secondary(scheduler); + modified = true; + } + } + Detached => { + // Never add locations in this mode + if self.intent.get_attached().is_some() || !self.intent.get_secondary().is_empty() { + self.intent.clear(scheduler); modified = true; } } @@ -518,7 +566,12 @@ impl TenantState { fn dirty(&self) -> bool { if let Some(node_id) = self.intent.attached { - let wanted_conf = attached_location_conf(self.generation, &self.shard, &self.config); + // Maybe panic: it is a severe bug if we try to attach while generation is null. + let generation = self + .generation + .expect("Attempted to enter attached state without a generation"); + + let wanted_conf = attached_location_conf(generation, &self.shard, &self.config); match self.observed.locations.get(&node_id) { Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {} Some(_) | None => { @@ -596,6 +649,10 @@ impl TenantState { // Reconcile already in flight for the current sequence? if let Some(handle) = &self.reconciler { if handle.sequence == self.sequence { + tracing::info!( + "Reconciliation already in progress for sequence {:?}", + self.sequence, + ); return Some(ReconcilerWaiter { tenant_shard_id: self.tenant_shard_id, seq_wait: self.waiter.clone(), @@ -615,6 +672,10 @@ impl TenantState { return None; }; + // Advance the sequence before spawning a reconciler, so that sequence waiters + // can distinguish between before+after the reconcile completes. + self.sequence = self.sequence.next(); + let reconciler_cancel = cancel.child_token(); let mut reconciler = Reconciler { tenant_shard_id: self.tenant_shard_id, @@ -716,6 +777,17 @@ impl TenantState { }) } + /// Called when a ReconcileResult has been emitted and the service is updating + /// our state: if the result is from a sequence >= my ReconcileHandle, then drop + /// the handle to indicate there is no longer a reconciliation in progress. + pub(crate) fn reconcile_complete(&mut self, sequence: Sequence) { + if let Some(reconcile_handle) = &self.reconciler { + if reconcile_handle.sequence <= sequence { + self.reconciler = None; + } + } + } + // If we had any state at all referring to this node ID, drop it. Does not // attempt to reschedule. 
pub(crate) fn deref_node(&mut self, node_id: NodeId) { @@ -736,13 +808,8 @@ impl TenantState { shard_number: self.tenant_shard_id.shard_number.0 as i32, shard_count: self.tenant_shard_id.shard_count.literal() as i32, shard_stripe_size: self.shard.stripe_size.0 as i32, - generation: self.generation.into().unwrap_or(0) as i32, - generation_pageserver: self - .intent - .get_attached() - .map(|n| n.0 as i64) - .unwrap_or(i64::MAX), - + generation: self.generation.map(|g| g.into().unwrap_or(0) as i32), + generation_pageserver: self.intent.get_attached().map(|n| n.0 as i64), placement_policy: serde_json::to_string(&self.policy).unwrap(), config: serde_json::to_string(&self.config).unwrap(), splitting: SplitState::default(), @@ -805,8 +872,10 @@ pub(crate) mod tests { assert_ne!(attached_node_id, secondary_node_id); // Notifying the attached node is offline should demote it to a secondary - let changed = tenant_state.intent.notify_offline(attached_node_id); + let changed = tenant_state.intent.demote_attached(attached_node_id); assert!(changed); + assert!(tenant_state.intent.attached.is_none()); + assert_eq!(tenant_state.intent.secondary.len(), 2); // Update the scheduler state to indicate the node is offline nodes.get_mut(&attached_node_id).unwrap().availability = NodeAvailability::Offline; diff --git a/libs/utils/src/generation.rs b/libs/utils/src/generation.rs index 6f6c46cfeb..af15cee924 100644 --- a/libs/utils/src/generation.rs +++ b/libs/utils/src/generation.rs @@ -45,7 +45,7 @@ impl Generation { Self::Broken } - pub fn new(v: u32) -> Self { + pub const fn new(v: u32) -> Self { Self::Valid(v) } diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py index c8224c1c67..bc77dfd084 100644 --- a/test_runner/regress/test_sharding_service.py +++ b/test_runner/regress/test_sharding_service.py @@ -146,6 +146,8 @@ def test_sharding_service_smoke( for tid in tenant_ids: tenant_delete_wait_completed(env.attachment_service.pageserver_api(), tid, 10) + env.attachment_service.consistency_check() + # Set a scheduling policy on one node, create all the tenants, observe # that the scheduling policy is respected. env.attachment_service.node_configure(env.pageservers[1].id, {"scheduling": "Draining"}) @@ -256,9 +258,8 @@ def test_sharding_service_restart(neon_env_builder: NeonEnvBuilder): env.attachment_service.consistency_check() -def test_sharding_service_onboarding( - neon_env_builder: NeonEnvBuilder, -): +@pytest.mark.parametrize("warm_up", [True, False]) +def test_sharding_service_onboarding(neon_env_builder: NeonEnvBuilder, warm_up: bool): """ We onboard tenants to the sharding service by treating it as a 'virtual pageserver' which provides the /location_config API. This is similar to creating a tenant, @@ -306,6 +307,23 @@ def test_sharding_service_onboarding( }, ) + if warm_up: + origin_ps.http_client().tenant_heatmap_upload(tenant_id) + + # We expect to be called via live migration code, which may try to configure the tenant into secondary + # mode before attaching it. 
+ virtual_ps_http.tenant_location_conf( + tenant_id, + { + "mode": "Secondary", + "secondary_conf": {"warm": True}, + "tenant_conf": {}, + "generation": None, + }, + ) + + virtual_ps_http.tenant_secondary_download(tenant_id) + # Call into attachment service to onboard the tenant generation += 1 virtual_ps_http.tenant_location_conf( @@ -351,7 +369,9 @@ def test_sharding_service_onboarding( assert len(dest_tenants) == 1 assert TenantId(dest_tenants[0]["id"]) == tenant_id - # sharding service advances generation by 1 when it first attaches + # sharding service advances generation by 1 when it first attaches. We started + # with a nonzero generation so this equality also proves that the generation + # was properly carried over during onboarding. assert dest_tenants[0]["generation"] == generation + 1 # The onboarded tenant should survive a restart of sharding service @@ -362,6 +382,31 @@ def test_sharding_service_onboarding( dest_ps.stop() dest_ps.start() + # Having onboarded via /location_config, we should also be able to update the + # TenantConf part of LocationConf, without inadvertently resetting the generation + modified_tenant_conf = {"max_lsn_wal_lag": 1024 * 1024 * 1024 * 100} + dest_tenant_before_conf_change = dest_ps.http_client().tenant_status(tenant_id) + + # The generation has moved on since we onboarded + assert generation != dest_tenant_before_conf_change["generation"] + + virtual_ps_http.tenant_location_conf( + tenant_id, + { + "mode": "AttachedSingle", + "secondary_conf": None, + "tenant_conf": modified_tenant_conf, + # This is intentionally a stale generation + "generation": generation, + }, + ) + dest_tenant_after_conf_change = dest_ps.http_client().tenant_status(tenant_id) + assert ( + dest_tenant_after_conf_change["generation"] == dest_tenant_before_conf_change["generation"] + ) + dest_tenant_conf_after = dest_ps.http_client().tenant_config(tenant_id) + assert dest_tenant_conf_after.tenant_specific_overrides == modified_tenant_conf + env.attachment_service.consistency_check() @@ -667,3 +712,41 @@ def test_sharding_service_auth(neon_env_builder: NeonEnvBuilder): svc.request( "POST", f"{api}/upcall/v1/re-attach", headers=svc.headers(TokenScope.PAGE_SERVER_API) ) + + +def test_sharding_service_tenant_conf(neon_env_builder: NeonEnvBuilder): + """ + Validate the pageserver-compatible API endpoints for setting and getting tenant conf, without + supplying the whole LocationConf. + """ + + env = neon_env_builder.init_start() + tenant_id = env.initial_tenant + + http = env.attachment_service.pageserver_api() + + default_value = "7days" + new_value = "1h" + http.set_tenant_config(tenant_id, {"pitr_interval": new_value}) + + # Ensure the change landed on the storage controller + readback_controller = http.tenant_config(tenant_id) + assert readback_controller.effective_config["pitr_interval"] == new_value + assert readback_controller.tenant_specific_overrides["pitr_interval"] == new_value + + # Ensure the change made it down to the pageserver + readback_ps = env.pageservers[0].http_client().tenant_config(tenant_id) + assert readback_ps.effective_config["pitr_interval"] == new_value + assert readback_ps.tenant_specific_overrides["pitr_interval"] == new_value + + # Omitting a value clears it. This looks different in storage controller + # vs. pageserver API calls, because pageserver has defaults. 
+ http.set_tenant_config(tenant_id, {}) + readback_controller = http.tenant_config(tenant_id) + assert readback_controller.effective_config["pitr_interval"] is None + assert readback_controller.tenant_specific_overrides["pitr_interval"] is None + readback_ps = env.pageservers[0].http_client().tenant_config(tenant_id) + assert readback_ps.effective_config["pitr_interval"] == default_value + assert "pitr_interval" not in readback_ps.tenant_specific_overrides + + env.attachment_service.consistency_check()
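
[Reviewer note] As a quick reference, a sketch of the location counts each PlacementPolicy variant is expected to converge on after scheduling, derived from the doc comments in lib.rs and from `TenantState::schedule` above. This helper is illustrative only and not part of the PR; it assumes the `PlacementPolicy` enum defined in this diff.

/// Expected (attached, secondary) location counts that scheduling aims for.
fn expected_locations(policy: &PlacementPolicy) -> (usize, usize) {
    match policy {
        // One attached location, no secondaries.
        PlacementPolicy::Single => (1, 0),
        // One attached location plus N secondaries.
        PlacementPolicy::Double(n) => (1, *n),
        // Exactly one warm secondary, no attachment, and therefore no
        // requirement for a non-NULL generation.
        PlacementPolicy::Secondary => (0, 1),
        // No locations at all.
        PlacementPolicy::Detached => (0, 0),
    }
}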