diff --git a/Cargo.lock b/Cargo.lock index a083af020a..841c60c7e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2994,10 +2994,12 @@ name = "pageserver_api" version = "0.1.0" dependencies = [ "anyhow", + "bincode", "byteorder", "bytes", "const_format", "enum-map", + "hex", "postgres_ffi", "serde", "serde_json", diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index e13a234e89..237df48543 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -18,6 +18,7 @@ use camino::Utf8PathBuf; use pageserver_api::models::{ self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo, }; +use pageserver_api::shard::TenantShardId; use postgres_backend::AuthType; use postgres_connection::{parse_host_port, PgConnectionConfig}; use reqwest::blocking::{Client, RequestBuilder, Response}; @@ -408,7 +409,7 @@ impl PageServerNode { }; let request = models::TenantCreateRequest { - new_tenant_id, + new_tenant_id: TenantShardId::unsharded(new_tenant_id), generation, config, }; diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index f97ec54e91..df9796b039 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -17,5 +17,9 @@ postgres_ffi.workspace = true enum-map.workspace = true strum.workspace = true strum_macros.workspace = true +hex.workspace = true workspace_hack.workspace = true + +[dev-dependencies] +bincode.workspace = true diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs new file mode 100644 index 0000000000..b5350d6384 --- /dev/null +++ b/libs/pageserver_api/src/key.rs @@ -0,0 +1,142 @@ +use anyhow::{bail, Result}; +use byteorder::{ByteOrder, BE}; +use serde::{Deserialize, Serialize}; +use std::fmt; + +/// Key used in the Repository kv-store. +/// +/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs +/// for what we actually store in these fields. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] +pub struct Key { + pub field1: u8, + pub field2: u32, + pub field3: u32, + pub field4: u32, + pub field5: u8, + pub field6: u32, +} + +pub const KEY_SIZE: usize = 18; + +impl Key { + /// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish. 
+ /// As long as Neon does not support tablespace (because of lack of access to local file system), + /// we can assume that only some predefined namespace OIDs are used which can fit in u16 + pub fn to_i128(&self) -> i128 { + assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222); + (((self.field1 & 0xf) as i128) << 120) + | (((self.field2 & 0xFFFF) as i128) << 104) + | ((self.field3 as i128) << 72) + | ((self.field4 as i128) << 40) + | ((self.field5 as i128) << 32) + | self.field6 as i128 + } + + pub const fn from_i128(x: i128) -> Self { + Key { + field1: ((x >> 120) & 0xf) as u8, + field2: ((x >> 104) & 0xFFFF) as u32, + field3: (x >> 72) as u32, + field4: (x >> 40) as u32, + field5: (x >> 32) as u8, + field6: x as u32, + } + } + + pub fn next(&self) -> Key { + self.add(1) + } + + pub fn add(&self, x: u32) -> Key { + let mut key = *self; + + let r = key.field6.overflowing_add(x); + key.field6 = r.0; + if r.1 { + let r = key.field5.overflowing_add(1); + key.field5 = r.0; + if r.1 { + let r = key.field4.overflowing_add(1); + key.field4 = r.0; + if r.1 { + let r = key.field3.overflowing_add(1); + key.field3 = r.0; + if r.1 { + let r = key.field2.overflowing_add(1); + key.field2 = r.0; + if r.1 { + let r = key.field1.overflowing_add(1); + key.field1 = r.0; + assert!(!r.1); + } + } + } + } + } + key + } + + pub fn from_slice(b: &[u8]) -> Self { + Key { + field1: b[0], + field2: u32::from_be_bytes(b[1..5].try_into().unwrap()), + field3: u32::from_be_bytes(b[5..9].try_into().unwrap()), + field4: u32::from_be_bytes(b[9..13].try_into().unwrap()), + field5: b[13], + field6: u32::from_be_bytes(b[14..18].try_into().unwrap()), + } + } + + pub fn write_to_byte_slice(&self, buf: &mut [u8]) { + buf[0] = self.field1; + BE::write_u32(&mut buf[1..5], self.field2); + BE::write_u32(&mut buf[5..9], self.field3); + BE::write_u32(&mut buf[9..13], self.field4); + buf[13] = self.field5; + BE::write_u32(&mut buf[14..18], self.field6); + } +} + +impl fmt::Display for Key { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}", + self.field1, self.field2, self.field3, self.field4, self.field5, self.field6 + ) + } +} + +impl Key { + pub const MIN: Key = Key { + field1: u8::MIN, + field2: u32::MIN, + field3: u32::MIN, + field4: u32::MIN, + field5: u8::MIN, + field6: u32::MIN, + }; + pub const MAX: Key = Key { + field1: u8::MAX, + field2: u32::MAX, + field3: u32::MAX, + field4: u32::MAX, + field5: u8::MAX, + field6: u32::MAX, + }; + + pub fn from_hex(s: &str) -> Result { + if s.len() != 36 { + bail!("parse error"); + } + Ok(Key { + field1: u8::from_str_radix(&s[0..2], 16)?, + field2: u32::from_str_radix(&s[2..10], 16)?, + field3: u32::from_str_radix(&s[10..18], 16)?, + field4: u32::from_str_radix(&s[18..26], 16)?, + field5: u8::from_str_radix(&s[26..28], 16)?, + field6: u32::from_str_radix(&s[28..36], 16)?, + }) + } +} diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index e49f7d00c1..511c5ed208 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -4,8 +4,10 @@ use const_format::formatcp; /// Public API types pub mod control_api; +pub mod key; pub mod models; pub mod reltag; +pub mod shard; pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index cb99dc0a55..71e32e479f 100644 --- 
a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -16,7 +16,7 @@ use utils::{ lsn::Lsn, }; -use crate::reltag::RelTag; +use crate::{reltag::RelTag, shard::TenantShardId}; use anyhow::bail; use bytes::{BufMut, Bytes, BytesMut}; @@ -187,7 +187,7 @@ pub struct TimelineCreateRequest { #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct TenantCreateRequest { - pub new_tenant_id: TenantId, + pub new_tenant_id: TenantShardId, #[serde(default)] #[serde(skip_serializing_if = "Option::is_none")] pub generation: Option, diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs new file mode 100644 index 0000000000..32a834a26a --- /dev/null +++ b/libs/pageserver_api/src/shard.rs @@ -0,0 +1,321 @@ +use std::{ops::RangeInclusive, str::FromStr}; + +use hex::FromHex; +use serde::{Deserialize, Serialize}; +use utils::id::TenantId; + +#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)] +pub struct ShardNumber(pub u8); + +#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)] +pub struct ShardCount(pub u8); + +impl ShardCount { + pub const MAX: Self = Self(u8::MAX); +} + +impl ShardNumber { + pub const MAX: Self = Self(u8::MAX); +} + +/// TenantShardId identify the units of work for the Pageserver. +/// +/// These are written as `-`, for example: +/// +/// # The second shard in a two-shard tenant +/// 072f1291a5310026820b2fe4b2968934-0102 +/// +/// Historically, tenants could not have multiple shards, and were identified +/// by TenantId. To support this, TenantShardId has a special legacy +/// mode where `shard_count` is equal to zero: this represents a single-sharded +/// tenant which should be written as a TenantId with no suffix. +/// +/// The human-readable encoding of TenantShardId, such as used in API URLs, +/// is both forward and backward compatible: a legacy TenantId can be +/// decoded as a TenantShardId, and when re-encoded it will be parseable +/// as a TenantId. +/// +/// Note that the binary encoding is _not_ backward compatible, because +/// at the time sharding is introduced, there are no existing binary structures +/// containing TenantId that we need to handle. +#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)] +pub struct TenantShardId { + pub tenant_id: TenantId, + pub shard_number: ShardNumber, + pub shard_count: ShardCount, +} + +impl TenantShardId { + pub fn unsharded(tenant_id: TenantId) -> Self { + Self { + tenant_id, + shard_number: ShardNumber(0), + shard_count: ShardCount(0), + } + } + + /// The range of all TenantShardId that belong to a particular TenantId. This is useful when + /// you have a BTreeMap of TenantShardId, and are querying by TenantId. + pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive { + RangeInclusive::new( + Self { + tenant_id, + shard_number: ShardNumber(0), + shard_count: ShardCount(0), + }, + Self { + tenant_id, + shard_number: ShardNumber::MAX, + shard_count: ShardCount::MAX, + }, + ) + } + + pub fn shard_slug(&self) -> String { + format!("{:02x}{:02x}", self.shard_number.0, self.shard_count.0) + } +} + +impl std::fmt::Display for TenantShardId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.shard_count != ShardCount(0) { + write!( + f, + "{}-{:02x}{:02x}", + self.tenant_id, self.shard_number.0, self.shard_count.0 + ) + } else { + // Legacy case (shard_count == 0) -- format as just the tenant id. 
Note that this + // is distinct from the normal single shard case (shard count == 1). + self.tenant_id.fmt(f) + } + } +} + +impl std::fmt::Debug for TenantShardId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Debug is the same as Display: the compact hex representation + write!(f, "{}", self) + } +} + +impl std::str::FromStr for TenantShardId { + type Err = hex::FromHexError; + + fn from_str(s: &str) -> Result { + // Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count + if s.len() == 32 { + // Legacy case: no shard specified + Ok(Self { + tenant_id: TenantId::from_str(s)?, + shard_number: ShardNumber(0), + shard_count: ShardCount(0), + }) + } else if s.len() == 37 { + let bytes = s.as_bytes(); + let tenant_id = TenantId::from_hex(&bytes[0..32])?; + let mut shard_parts: [u8; 2] = [0u8; 2]; + hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?; + Ok(Self { + tenant_id, + shard_number: ShardNumber(shard_parts[0]), + shard_count: ShardCount(shard_parts[1]), + }) + } else { + Err(hex::FromHexError::InvalidStringLength) + } + } +} + +impl From<[u8; 18]> for TenantShardId { + fn from(b: [u8; 18]) -> Self { + let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap(); + + Self { + tenant_id: TenantId::from(tenant_id_bytes), + shard_number: ShardNumber(b[16]), + shard_count: ShardCount(b[17]), + } + } +} + +impl Serialize for TenantShardId { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + if serializer.is_human_readable() { + serializer.collect_str(self) + } else { + let mut packed: [u8; 18] = [0; 18]; + packed[0..16].clone_from_slice(&self.tenant_id.as_arr()); + packed[16] = self.shard_number.0; + packed[17] = self.shard_count.0; + + packed.serialize(serializer) + } + } +} + +impl<'de> Deserialize<'de> for TenantShardId { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct IdVisitor { + is_human_readable_deserializer: bool, + } + + impl<'de> serde::de::Visitor<'de> for IdVisitor { + type Value = TenantShardId; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + if self.is_human_readable_deserializer { + formatter.write_str("value in form of hex string") + } else { + formatter.write_str("value in form of integer array([u8; 18])") + } + } + + fn visit_seq(self, seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let s = serde::de::value::SeqAccessDeserializer::new(seq); + let id: [u8; 18] = Deserialize::deserialize(s)?; + Ok(TenantShardId::from(id)) + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + TenantShardId::from_str(v).map_err(E::custom) + } + } + + if deserializer.is_human_readable() { + deserializer.deserialize_str(IdVisitor { + is_human_readable_deserializer: true, + }) + } else { + deserializer.deserialize_tuple( + 18, + IdVisitor { + is_human_readable_deserializer: false, + }, + ) + } + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use bincode; + use utils::{id::TenantId, Hex}; + + use super::*; + + const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc"; + + #[test] + fn tenant_shard_id_string() -> Result<(), hex::FromHexError> { + let example = TenantShardId { + tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(), + shard_count: ShardCount(10), + shard_number: ShardNumber(7), + }; + + let encoded = format!("{example}"); + + let expected = format!("{EXAMPLE_TENANT_ID}-070a"); + assert_eq!(&encoded, &expected); + + let decoded = 
TenantShardId::from_str(&encoded)?; + + assert_eq!(example, decoded); + + Ok(()) + } + + #[test] + fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> { + let example = TenantShardId { + tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(), + shard_count: ShardCount(10), + shard_number: ShardNumber(7), + }; + + let encoded = bincode::serialize(&example).unwrap(); + let expected: [u8; 18] = [ + 0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90, + 0xf6, 0xfc, 0x07, 0x0a, + ]; + assert_eq!(Hex(&encoded), Hex(&expected)); + + let decoded = bincode::deserialize(&encoded).unwrap(); + + assert_eq!(example, decoded); + + Ok(()) + } + + #[test] + fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> { + // Test that TenantShardId can decode a TenantId in human + // readable form + let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(); + let encoded = format!("{example}"); + + assert_eq!(&encoded, EXAMPLE_TENANT_ID); + + let decoded = TenantShardId::from_str(&encoded)?; + + assert_eq!(example, decoded.tenant_id); + assert_eq!(decoded.shard_count, ShardCount(0)); + assert_eq!(decoded.shard_number, ShardNumber(0)); + + Ok(()) + } + + #[test] + fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> { + // Test that a legacy TenantShardId encodes into a form that + // can be decoded as TenantId + let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(); + let example = TenantShardId::unsharded(example_tenant_id); + let encoded = format!("{example}"); + + assert_eq!(&encoded, EXAMPLE_TENANT_ID); + + let decoded = TenantId::from_str(&encoded)?; + + assert_eq!(example_tenant_id, decoded); + + Ok(()) + } + + #[test] + fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> { + // Unlike in human readable encoding, binary encoding does not + // do any special handling of legacy unsharded TenantIds: this test + // is equivalent to the main test for binary encoding, just verifying + // that the same behavior applies when we have used `unsharded()` to + // construct a TenantShardId. 
+ let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap()); + let encoded = bincode::serialize(&example).unwrap(); + + let expected: [u8; 18] = [ + 0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90, + 0xf6, 0xfc, 0x00, 0x00, + ]; + assert_eq!(Hex(&encoded), Hex(&expected)); + + let decoded = bincode::deserialize::(&encoded).unwrap(); + assert_eq!(example, decoded); + + Ok(()) + } +} diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 2915178104..aa2b017471 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -16,6 +16,7 @@ use pageserver_api::models::{ DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest, TenantLoadRequest, TenantLocationConfigRequest, }; +use pageserver_api::shard::TenantShardId; use remote_storage::GenericRemoteStorage; use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; @@ -419,9 +420,9 @@ async fn timeline_create_handler( mut request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let request_data: TimelineCreateRequest = json_request(&mut request).await?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let new_timeline_id = request_data.new_timeline_id; @@ -430,7 +431,7 @@ async fn timeline_create_handler( let state = get_state(&request); async { - let tenant = mgr::get_tenant(tenant_id, true)?; + let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?; match tenant.create_timeline( new_timeline_id, request_data.ancestor_timeline_id.map(TimelineId::from), @@ -464,7 +465,10 @@ async fn timeline_create_handler( Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)), } } - .instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version)) + .instrument(info_span!("timeline_create", + tenant_id = %tenant_shard_id.tenant_id, + shard = %tenant_shard_id.shard_slug(), + timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version)) .await } @@ -660,14 +664,15 @@ async fn timeline_delete_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); + let state = get_state(&request); - mgr::delete_timeline(tenant_id, timeline_id, &ctx) - .instrument(info_span!("timeline_delete", %tenant_id, %timeline_id)) + state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx) + .instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id)) .await?; json_response(StatusCode::ACCEPTED, ()) @@ -681,11 +686,14 @@ async fn tenant_detach_handler( check_permission(&request, Some(tenant_id))?; let detach_ignored: Option = parse_query_param(&request, 
"detach_ignored")?; + // This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + let state = get_state(&request); let conf = state.conf; mgr::detach_tenant( conf, - tenant_id, + tenant_shard_id, detach_ignored.unwrap_or(false), &state.deletion_queue_client, ) @@ -802,13 +810,16 @@ async fn tenant_delete_handler( _cancel: CancellationToken, ) -> Result, ApiError> { // TODO openapi spec - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let state = get_state(&request); - mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id) - .instrument(info_span!("tenant_delete_handler", %tenant_id)) + mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_shard_id) + .instrument(info_span!("tenant_delete_handler", + tenant_id = %tenant_shard_id.tenant_id, + shard = tenant_shard_id.shard_slug() + )) .await?; json_response(StatusCode::ACCEPTED, ()) @@ -1138,9 +1149,10 @@ async fn put_tenant_location_config_handler( mut request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + let request_data: TenantLocationConfigRequest = json_request(&mut request).await?; - let tenant_id = request_data.tenant_id; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn); let state = get_state(&request); @@ -1149,9 +1161,13 @@ async fn put_tenant_location_config_handler( // The `Detached` state is special, it doesn't upsert a tenant, it removes // its local disk content and drops it from memory. 
if let LocationConfigMode::Detached = request_data.config.mode { - if let Err(e) = mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client) - .instrument(info_span!("tenant_detach", %tenant_id)) - .await + if let Err(e) = + mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client) + .instrument(info_span!("tenant_detach", + tenant_id = %tenant_shard_id.tenant_id, + shard = tenant_shard_id.shard_slug() + )) + .await { match e { TenantStateError::SlotError(TenantSlotError::NotFound(_)) => { @@ -1168,7 +1184,7 @@ async fn put_tenant_location_config_handler( state .tenant_manager - .upsert_location(tenant_id, location_conf, &ctx) + .upsert_location(tenant_shard_id, location_conf, &ctx) .await // TODO: badrequest assumes the caller was asking for something unreasonable, but in // principle we might have hit something like concurrent API calls to the same tenant, @@ -1752,7 +1768,7 @@ pub fn make_router( .get("/v1/tenant", |r| api_handler(r, tenant_list_handler)) .post("/v1/tenant", |r| api_handler(r, tenant_create_handler)) .get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status)) - .delete("/v1/tenant/:tenant_id", |r| { + .delete("/v1/tenant/:tenant_shard_id", |r| { api_handler(r, tenant_delete_handler) }) .get("/v1/tenant/:tenant_id/synthetic_size", |r| { @@ -1764,13 +1780,13 @@ pub fn make_router( .get("/v1/tenant/:tenant_id/config", |r| { api_handler(r, get_tenant_config_handler) }) - .put("/v1/tenant/:tenant_id/location_config", |r| { + .put("/v1/tenant/:tenant_shard_id/location_config", |r| { api_handler(r, put_tenant_location_config_handler) }) .get("/v1/tenant/:tenant_id/timeline", |r| { api_handler(r, timeline_list_handler) }) - .post("/v1/tenant/:tenant_id/timeline", |r| { + .post("/v1/tenant/:tenant_shard_id/timeline", |r| { api_handler(r, timeline_create_handler) }) .post("/v1/tenant/:tenant_id/attach", |r| { @@ -1814,7 +1830,7 @@ pub fn make_router( "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers", |r| api_handler(r, timeline_download_remote_layers_handler_get), ) - .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { + .delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| { api_handler(r, timeline_delete_handler) }) .get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| { diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index dc75aeeb50..24f47df92e 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,106 +1,11 @@ use crate::walrecord::NeonWalRecord; -use anyhow::{bail, Result}; -use byteorder::{ByteOrder, BE}; +use anyhow::Result; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use std::fmt; use std::ops::{AddAssign, Range}; use std::time::Duration; -/// Key used in the Repository kv-store. -/// -/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs -/// for what we actually store in these fields. -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] -pub struct Key { - pub field1: u8, - pub field2: u32, - pub field3: u32, - pub field4: u32, - pub field5: u8, - pub field6: u32, -} - -pub const KEY_SIZE: usize = 18; - -impl Key { - /// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish. 
- /// As long as Neon does not support tablespace (because of lack of access to local file system), - /// we can assume that only some predefined namespace OIDs are used which can fit in u16 - pub fn to_i128(&self) -> i128 { - assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222); - (((self.field1 & 0xf) as i128) << 120) - | (((self.field2 & 0xFFFF) as i128) << 104) - | ((self.field3 as i128) << 72) - | ((self.field4 as i128) << 40) - | ((self.field5 as i128) << 32) - | self.field6 as i128 - } - - pub const fn from_i128(x: i128) -> Self { - Key { - field1: ((x >> 120) & 0xf) as u8, - field2: ((x >> 104) & 0xFFFF) as u32, - field3: (x >> 72) as u32, - field4: (x >> 40) as u32, - field5: (x >> 32) as u8, - field6: x as u32, - } - } - - pub fn next(&self) -> Key { - self.add(1) - } - - pub fn add(&self, x: u32) -> Key { - let mut key = *self; - - let r = key.field6.overflowing_add(x); - key.field6 = r.0; - if r.1 { - let r = key.field5.overflowing_add(1); - key.field5 = r.0; - if r.1 { - let r = key.field4.overflowing_add(1); - key.field4 = r.0; - if r.1 { - let r = key.field3.overflowing_add(1); - key.field3 = r.0; - if r.1 { - let r = key.field2.overflowing_add(1); - key.field2 = r.0; - if r.1 { - let r = key.field1.overflowing_add(1); - key.field1 = r.0; - assert!(!r.1); - } - } - } - } - } - key - } - - pub fn from_slice(b: &[u8]) -> Self { - Key { - field1: b[0], - field2: u32::from_be_bytes(b[1..5].try_into().unwrap()), - field3: u32::from_be_bytes(b[5..9].try_into().unwrap()), - field4: u32::from_be_bytes(b[9..13].try_into().unwrap()), - field5: b[13], - field6: u32::from_be_bytes(b[14..18].try_into().unwrap()), - } - } - - pub fn write_to_byte_slice(&self, buf: &mut [u8]) { - buf[0] = self.field1; - BE::write_u32(&mut buf[1..5], self.field2); - BE::write_u32(&mut buf[5..9], self.field3); - BE::write_u32(&mut buf[9..13], self.field4); - buf[13] = self.field5; - BE::write_u32(&mut buf[14..18], self.field6); - } -} +pub use pageserver_api::key::{Key, KEY_SIZE}; pub fn key_range_size(key_range: &Range) -> u32 { let start = key_range.start; @@ -129,49 +34,6 @@ pub fn singleton_range(key: Key) -> Range { key..key.next() } -impl fmt::Display for Key { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}", - self.field1, self.field2, self.field3, self.field4, self.field5, self.field6 - ) - } -} - -impl Key { - pub const MIN: Key = Key { - field1: u8::MIN, - field2: u32::MIN, - field3: u32::MIN, - field4: u32::MIN, - field5: u8::MIN, - field6: u32::MIN, - }; - pub const MAX: Key = Key { - field1: u8::MAX, - field2: u32::MAX, - field3: u32::MAX, - field4: u32::MAX, - field5: u8::MAX, - field6: u32::MAX, - }; - - pub fn from_hex(s: &str) -> Result { - if s.len() != 36 { - bail!("parse error"); - } - Ok(Key { - field1: u8::from_str_radix(&s[0..2], 16)?, - field2: u32::from_str_radix(&s[2..10], 16)?, - field3: u32::from_str_radix(&s[10..18], 16)?, - field4: u32::from_str_radix(&s[18..26], 16)?, - field5: u8::from_str_radix(&s[26..28], 16)?, - field6: u32::from_str_radix(&s[28..36], 16)?, - }) - } -} - /// A 'value' stored for a one Key. #[derive(Debug, Clone, Serialize, Deserialize)] #[cfg_attr(test, derive(PartialEq))] diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4363dab375..a766cca0c5 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -2,9 +2,10 @@ //! page server. 
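The `tenant/mgr.rs` changes below switch the tenants map from `HashMap` to a `BTreeMap` keyed by `TenantShardId`, so shard-naive callers can still look up a tenant by `TenantId` with a range scan (see the `exactly_one_or_none` helper introduced just below). A minimal sketch of that lookup pattern, assuming the `pageserver_api` and `utils` workspace crates are on the path; the `&str` values are stand-ins for the crate-private `TenantSlot`:

```rust
use std::collections::BTreeMap;
use std::str::FromStr;

use pageserver_api::shard::{ShardCount, ShardNumber, TenantShardId};
use utils::id::TenantId;

fn main() {
    let mut tenants: BTreeMap<TenantShardId, &str> = BTreeMap::new();

    let tenant_a = TenantId::from_str("1f359dd625e519a1a4e8d7509690f6fc").unwrap();
    let tenant_b = TenantId::from_str("072f1291a5310026820b2fe4b2968934").unwrap();

    // Two shards of tenant_a, plus a legacy (unsharded) slot for tenant_b.
    tenants.insert(
        TenantShardId {
            tenant_id: tenant_a,
            shard_number: ShardNumber(0),
            shard_count: ShardCount(2),
        },
        "a-shard-0",
    );
    tenants.insert(
        TenantShardId {
            tenant_id: tenant_a,
            shard_number: ShardNumber(1),
            shard_count: ShardCount(2),
        },
        "a-shard-1",
    );
    tenants.insert(TenantShardId::unsharded(tenant_b), "b-unsharded");

    // TenantShardId's derived Ord sorts by (tenant_id, shard_number, shard_count), so a
    // range query over tenant_range() visits exactly the slots belonging to tenant_a.
    let for_a: Vec<_> = tenants
        .range(TenantShardId::tenant_range(tenant_a))
        .collect();
    assert_eq!(for_a.len(), 2);
}
```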
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf}; +use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; use std::borrow::Cow; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::ops::Deref; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -30,6 +31,7 @@ use crate::metrics::TENANT_MANAGER as METRICS; use crate::task_mgr::{self, TaskKind}; use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt}; use crate::tenant::delete::DeleteTenantFlow; +use crate::tenant::span::debug_assert_current_span_has_tenant_id; use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState}; use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX}; @@ -87,10 +89,37 @@ pub(crate) enum TenantsMap { Initializing, /// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded. /// New tenants can be added using [`tenant_map_acquire_slot`]. - Open(HashMap), + Open(BTreeMap), /// The pageserver has entered shutdown mode via [`shutdown_all_tenants`]. /// Existing tenants are still accessible, but no new tenants can be created. - ShuttingDown(HashMap), + ShuttingDown(BTreeMap), +} + +/// Helper for mapping shard-unaware functions to a sharding-aware map +/// TODO(sharding): all users of this must be made shard-aware. +fn exactly_one_or_none<'a>( + map: &'a BTreeMap, + tenant_id: &TenantId, +) -> Option<(&'a TenantShardId, &'a TenantSlot)> { + let mut slots = map.range(TenantShardId::tenant_range(*tenant_id)); + + // Retrieve the first two slots in the range: if both are populated, we must panic because the caller + // needs a shard-naive view of the world in which only one slot can exist for a TenantId at a time. + let slot_a = slots.next(); + let slot_b = slots.next(); + match (slot_a, slot_b) { + (None, None) => None, + (Some(slot), None) => { + // Exactly one matching slot + Some(slot) + } + (Some(_slot_a), Some(_slot_b)) => { + // Multiple shards for this tenant: cannot handle this yet. + // TODO(sharding): callers of get() should be shard-aware. + todo!("Attaching multiple shards in the same tenant to the same pageserver") + } + (None, Some(_)) => unreachable!(), + } } impl TenantsMap { @@ -101,7 +130,8 @@ impl TenantsMap { match self { TenantsMap::Initializing => None, TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { - m.get(tenant_id).and_then(TenantSlot::get_attached) + // TODO(sharding): callers of get() should be shard-aware.
+ exactly_one_or_none(m, tenant_id).and_then(|(_, slot)| slot.get_attached()) } } } @@ -109,7 +139,10 @@ impl TenantsMap { pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option { match self { TenantsMap::Initializing => None, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id), + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { + let key = exactly_one_or_none(m, tenant_id).map(|(k, _)| *k); + key.and_then(|key| m.remove(&key)) + } } } @@ -383,7 +416,7 @@ pub async fn init_tenant_mgr( init_order: InitializationOrder, cancel: CancellationToken, ) -> anyhow::Result { - let mut tenants = HashMap::new(); + let mut tenants = BTreeMap::new(); let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn); @@ -404,7 +437,7 @@ pub async fn init_tenant_mgr( warn!(%tenant_id, "Marking tenant broken, failed to {e:#}"); tenants.insert( - tenant_id, + TenantShardId::unsharded(tenant_id), TenantSlot::Attached(Tenant::create_broken_tenant( conf, tenant_id, @@ -427,7 +460,7 @@ pub async fn init_tenant_mgr( // tenants, because they do no remote writes and hence require no // generation number info!(%tenant_id, "Loaded tenant in secondary mode"); - tenants.insert(tenant_id, TenantSlot::Secondary); + tenants.insert(TenantShardId::unsharded(tenant_id), TenantSlot::Secondary); } LocationMode::Attached(_) => { // TODO: augment re-attach API to enable the control plane to @@ -470,7 +503,10 @@ pub async fn init_tenant_mgr( &ctx, ) { Ok(tenant) => { - tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant)); + tenants.insert( + TenantShardId::unsharded(tenant.tenant_id()), + TenantSlot::Attached(tenant), + ); } Err(e) => { error!(%tenant_id, "Failed to start tenant: {e:#}"); @@ -573,19 +609,19 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { let mut m = tenants.write().unwrap(); match &mut *m { TenantsMap::Initializing => { - *m = TenantsMap::ShuttingDown(HashMap::default()); + *m = TenantsMap::ShuttingDown(BTreeMap::default()); info!("tenants map is empty"); return; } TenantsMap::Open(tenants) => { - let mut shutdown_state = HashMap::new(); + let mut shutdown_state = BTreeMap::new(); let mut total_in_progress = 0; let mut total_attached = 0; - for (tenant_id, v) in tenants.drain() { + for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() { match v { TenantSlot::Attached(t) => { - shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone())); + shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone())); join_set.spawn( async move { let freeze_and_flush = true; @@ -604,13 +640,13 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { // going to log too many lines debug!("tenant successfully stopped"); } - .instrument(info_span!("shutdown", %tenant_id)), + .instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug())), ); total_attached += 1; } TenantSlot::Secondary => { - shutdown_state.insert(tenant_id, TenantSlot::Secondary); + shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary); } TenantSlot::InProgress(notify) => { // InProgress tenants are not visible in TenantsMap::ShuttingDown: we will @@ -690,19 +726,22 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock) { pub(crate) async fn create_tenant( conf: &'static PageServerConf, tenant_conf: TenantConfOpt, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, generation: Generation, resources: TenantSharedResources, ctx: &RequestContext, ) -> Result, TenantMapInsertError> { let 
location_conf = LocationConf::attached_single(tenant_conf, generation); - let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; - let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?; + let slot_guard = + tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?; + // TODO(sharding): make local paths shard-aware + let tenant_path = + super::create_tenant_files(conf, &location_conf, &tenant_shard_id.tenant_id).await?; let created_tenant = tenant_spawn( conf, - tenant_id, + tenant_shard_id.tenant_id, &tenant_path, resources, AttachedTenantConf::try_from(location_conf)?, @@ -715,11 +754,7 @@ pub(crate) async fn create_tenant( // See https://github.com/neondatabase/neon/issues/4233 let created_tenant_id = created_tenant.tenant_id(); - if tenant_id != created_tenant_id { - return Err(TenantMapInsertError::Other(anyhow::anyhow!( - "loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})", - ))); - } + debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id); slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?; @@ -755,21 +790,70 @@ pub(crate) async fn set_new_tenant_config( } impl TenantManager { - #[instrument(skip_all, fields(%tenant_id))] + /// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or is not fitting to the query. + /// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants. + /// + /// This method is cancel-safe. + pub(crate) fn get_attached_tenant_shard( + &self, + tenant_shard_id: TenantShardId, + active_only: bool, + ) -> Result, GetTenantError> { + let locked = self.tenants.read().unwrap(); + + let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?; + + match peek_slot { + Some(TenantSlot::Attached(tenant)) => match tenant.current_state() { + TenantState::Broken { + reason, + backtrace: _, + } if active_only => Err(GetTenantError::Broken(reason)), + TenantState::Active => Ok(Arc::clone(tenant)), + _ => { + if active_only { + Err(GetTenantError::NotActive(tenant_shard_id.tenant_id)) + } else { + Ok(Arc::clone(tenant)) + } + } + }, + Some(TenantSlot::InProgress(_)) => { + Err(GetTenantError::NotActive(tenant_shard_id.tenant_id)) + } + None | Some(TenantSlot::Secondary) => { + Err(GetTenantError::NotFound(tenant_shard_id.tenant_id)) + } + } + } + + pub(crate) async fn delete_timeline( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + _ctx: &RequestContext, + ) -> Result<(), DeleteTimelineError> { + let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?; + DeleteTimelineFlow::run(&tenant, timeline_id, false).await?; + Ok(()) + } + pub(crate) async fn upsert_location( &self, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, new_location_config: LocationConf, ctx: &RequestContext, ) -> Result<(), anyhow::Error> { - info!("configuring tenant location {tenant_id} to state {new_location_config:?}"); + debug_assert_current_span_has_tenant_id(); + info!("configuring tenant location to state {new_location_config:?}"); // Special case fast-path for updates to Tenant: if our upsert is only updating configuration, // then we do not need to set the slot to InProgress, we can just call into the // existng tenant. 
{ let locked = self.tenants.read().unwrap(); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?; + let peek_slot = + tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?; match (&new_location_config.mode, peek_slot) { (LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => { if attach_conf.generation == tenant.generation { @@ -800,7 +884,7 @@ impl TenantManager { // the tenant is inaccessible to the outside world while we are doing this, but that is sensible: // the state is ill-defined while we're in transition. Transitions are async, but fast: we do // not do significant I/O, and shutdowns should be prompt via cancellation tokens. - let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?; + let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?; if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() { // The case where we keep a Tenant alive was covered above in the special case @@ -831,25 +915,31 @@ impl TenantManager { slot_guard.drop_old_value().expect("We just shut it down"); } - let tenant_path = self.conf.tenant_path(&tenant_id); + // TODO(sharding): make local paths sharding-aware + let tenant_path = self.conf.tenant_path(&tenant_shard_id.tenant_id); let new_slot = match &new_location_config.mode { LocationMode::Secondary(_) => { - let tenant_path = self.conf.tenant_path(&tenant_id); // Directory doesn't need to be fsync'd because if we crash it can // safely be recreated next time this tenant location is configured. unsafe_create_dir_all(&tenant_path) .await .with_context(|| format!("Creating {tenant_path}"))?; - Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + // TODO(sharding): make local paths sharding-aware + Tenant::persist_tenant_config( + self.conf, + &tenant_shard_id.tenant_id, + &new_location_config, + ) + .await + .map_err(SetNewTenantConfigError::Persist)?; TenantSlot::Secondary } LocationMode::Attached(_attach_config) => { - let timelines_path = self.conf.timelines_path(&tenant_id); + // TODO(sharding): make local paths sharding-aware + let timelines_path = self.conf.timelines_path(&tenant_shard_id.tenant_id); // Directory doesn't need to be fsync'd because we do not depend on // it to exist after crashes: it may be recreated when tenant is @@ -858,13 +948,19 @@ impl TenantManager { .await .with_context(|| format!("Creating {timelines_path}"))?; - Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config) - .await - .map_err(SetNewTenantConfigError::Persist)?; + // TODO(sharding): make local paths sharding-aware + Tenant::persist_tenant_config( + self.conf, + &tenant_shard_id.tenant_id, + &new_location_config, + ) + .await + .map_err(SetNewTenantConfigError::Persist)?; + // TODO(sharding): make spawn sharding-aware let tenant = tenant_spawn( self.conf, - tenant_id, + tenant_shard_id.tenant_id, &tenant_path, self.resources.clone(), AttachedTenantConf::try_from(new_location_config)?, @@ -910,7 +1006,11 @@ pub(crate) fn get_tenant( active_only: bool, ) -> Result, GetTenantError> { let locked = TENANTS.read().unwrap(); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)?; + + // TODO(sharding): make all callers of get_tenant shard-aware + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + + let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, 
TenantSlotPeekMode::Read)?; match peek_slot { Some(TenantSlot::Attached(tenant)) => match tenant.current_state() { @@ -970,12 +1070,16 @@ pub(crate) async fn get_active_tenant_with_timeout( Tenant(Arc), } + // TODO(sharding): make page service interface sharding-aware (page service should apply ShardIdentity to the key + // to decide which shard services the request) + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + let wait_start = Instant::now(); let deadline = wait_start + timeout; let wait_for = { let locked = TENANTS.read().unwrap(); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read) + let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) .map_err(GetTenantError::MapState)?; match peek_slot { Some(TenantSlot::Attached(tenant)) => { @@ -1019,8 +1123,9 @@ pub(crate) async fn get_active_tenant_with_timeout( })?; { let locked = TENANTS.read().unwrap(); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read) - .map_err(GetTenantError::MapState)?; + let peek_slot = + tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) + .map_err(GetTenantError::MapState)?; match peek_slot { Some(TenantSlot::Attached(tenant)) => tenant.clone(), _ => { @@ -1062,7 +1167,7 @@ pub(crate) async fn get_active_tenant_with_timeout( pub(crate) async fn delete_tenant( conf: &'static PageServerConf, remote_storage: Option, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, ) -> Result<(), DeleteTenantError> { // We acquire a SlotGuard during this function to protect against concurrent // changes while the ::prepare phase of DeleteTenantFlow executes, but then @@ -1075,7 +1180,9 @@ pub(crate) async fn delete_tenant( // // See https://github.com/neondatabase/neon/issues/5080 - let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustExist)?; + // TODO(sharding): make delete API sharding-aware + let mut slot_guard = + tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?; // unwrap is safe because we used MustExist mode when acquiring let tenant = match slot_guard.get_old_value().as_ref().unwrap() { @@ -1102,16 +1209,6 @@ pub(crate) enum DeleteTimelineError { Timeline(#[from] crate::tenant::DeleteTimelineError), } -pub(crate) async fn delete_timeline( - tenant_id: TenantId, - timeline_id: TimelineId, - _ctx: &RequestContext, -) -> Result<(), DeleteTimelineError> { - let tenant = get_tenant(tenant_id, true)?; - DeleteTimelineFlow::run(&tenant, timeline_id, false).await?; - Ok(()) -} - #[derive(Debug, thiserror::Error)] pub(crate) enum TenantStateError { #[error("Tenant {0} is stopping")] @@ -1126,14 +1223,14 @@ pub(crate) enum TenantStateError { pub(crate) async fn detach_tenant( conf: &'static PageServerConf, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, detach_ignored: bool, deletion_queue_client: &DeletionQueueClient, ) -> Result<(), TenantStateError> { let tmp_path = detach_tenant0( conf, &TENANTS, - tenant_id, + tenant_shard_id, detach_ignored, deletion_queue_client, ) @@ -1160,19 +1257,24 @@ pub(crate) async fn detach_tenant( async fn detach_tenant0( conf: &'static PageServerConf, tenants: &std::sync::RwLock, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, detach_ignored: bool, deletion_queue_client: &DeletionQueueClient, ) -> Result { - let tenant_dir_rename_operation = |tenant_id_to_clean| async move { - let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean); + let tenant_dir_rename_operation = 
|tenant_id_to_clean: TenantShardId| async move { + // TODO(sharding): make local path helpers shard-aware + let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean.tenant_id); safe_rename_tenant_dir(&local_tenant_directory) .await .with_context(|| format!("local tenant directory {local_tenant_directory:?} rename")) }; - let removal_result = - remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await; + let removal_result = remove_tenant_from_memory( + tenants, + tenant_shard_id, + tenant_dir_rename_operation(tenant_shard_id), + ) + .await; // Flush pending deletions, so that they have a good chance of passing validation // before this tenant is potentially re-attached elsewhere. @@ -1186,12 +1288,15 @@ async fn detach_tenant0( Err(TenantStateError::SlotError(TenantSlotError::NotFound(_))) ) { - let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); + // TODO(sharding): make local paths sharding-aware + let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id.tenant_id); if tenant_ignore_mark.exists() { info!("Detaching an ignored tenant"); - let tmp_path = tenant_dir_rename_operation(tenant_id) + let tmp_path = tenant_dir_rename_operation(tenant_shard_id) .await - .with_context(|| format!("Ignored tenant {tenant_id} local directory rename"))?; + .with_context(|| { + format!("Ignored tenant {tenant_shard_id} local directory rename") + })?; return Ok(tmp_path); } } @@ -1208,7 +1313,11 @@ pub(crate) async fn load_tenant( deletion_queue_client: DeletionQueueClient, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; + // This is a legacy API (replaced by `/location_conf`). It does not support sharding + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + + let slot_guard = + tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?; let tenant_path = conf.tenant_path(&tenant_id); let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id); @@ -1261,7 +1370,10 @@ async fn ignore_tenant0( tenants: &std::sync::RwLock, tenant_id: TenantId, ) -> Result<(), TenantStateError> { - remove_tenant_from_memory(tenants, tenant_id, async { + // This is a legacy API (replaced by `/location_conf`). 
It does not support sharding + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + + remove_tenant_from_memory(tenants, tenant_shard_id, async { let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id); fs::File::create(&ignore_mark_file) .await @@ -1270,7 +1382,7 @@ async fn ignore_tenant0( crashsafe::fsync_file_and_parent(&ignore_mark_file) .context("Failed to fsync ignore mark file") }) - .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_id}"))?; + .with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?; Ok(()) }) .await @@ -1293,10 +1405,12 @@ pub(crate) async fn list_tenants() -> Result, Tenan }; Ok(m.iter() .filter_map(|(id, tenant)| match tenant { - TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())), + TenantSlot::Attached(tenant) => Some((id, tenant.current_state())), TenantSlot::Secondary => None, TenantSlot::InProgress(_) => None, }) + // TODO(sharding): make callers of this function shard-aware + .map(|(k, v)| (k.tenant_id, v)) .collect()) } @@ -1312,7 +1426,11 @@ pub(crate) async fn attach_tenant( resources: TenantSharedResources, ctx: &RequestContext, ) -> Result<(), TenantMapInsertError> { - let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?; + // This is a legacy API (replaced by `/location_conf`). It does not support sharding + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + + let slot_guard = + tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?; let location_conf = LocationConf::attached_single(tenant_conf, generation); let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?; // TODO: tenant directory remains on disk if we bail out from here on. @@ -1359,14 +1477,14 @@ pub(crate) enum TenantMapInsertError { pub enum TenantSlotError { /// When acquiring a slot with the expectation that the tenant already exists. #[error("Tenant {0} not found")] - NotFound(TenantId), + NotFound(TenantShardId), /// When acquiring a slot with the expectation that the tenant does not already exist. #[error("tenant {0} already exists, state: {1:?}")] - AlreadyExists(TenantId, TenantState), + AlreadyExists(TenantShardId, TenantState), #[error("tenant {0} already exists in but is not attached")] - Conflict(TenantId), + Conflict(TenantShardId), // Tried to read a slot that is currently being mutated by another administrative // operation. @@ -1428,7 +1546,7 @@ pub enum TenantMapError { /// `drop_old_value`. It is an error to call this without shutting down /// the conents of `old_value`. pub struct SlotGuard { - tenant_id: TenantId, + tenant_shard_id: TenantShardId, old_value: Option, upserted: bool, @@ -1439,12 +1557,12 @@ pub struct SlotGuard { impl SlotGuard { fn new( - tenant_id: TenantId, + tenant_shard_id: TenantShardId, old_value: Option, completion: utils::completion::Completion, ) -> Self { Self { - tenant_id, + tenant_shard_id, old_value, upserted: false, _completion: completion, @@ -1487,7 +1605,7 @@ impl SlotGuard { TenantsMap::Open(m) => m, }; - let replaced = m.insert(self.tenant_id, new_value); + let replaced = m.insert(self.tenant_shard_id, new_value); self.upserted = true; METRICS.tenant_slots.set(m.len() as u64); @@ -1506,7 +1624,7 @@ impl SlotGuard { None => { METRICS.unexpected_errors.inc(); error!( - tenant_id = %self.tenant_id, + tenant_shard_id = %self.tenant_shard_id, "Missing InProgress marker during tenant upsert, this is a bug." 
); Err(TenantSlotUpsertError::InternalError( @@ -1515,7 +1633,7 @@ impl SlotGuard { } Some(slot) => { METRICS.unexpected_errors.inc(); - error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); + error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot); Err(TenantSlotUpsertError::InternalError( "Unexpected contents of TenantSlot".into(), )) @@ -1593,12 +1711,12 @@ impl Drop for SlotGuard { TenantsMap::Open(m) => m, }; - use std::collections::hash_map::Entry; - match m.entry(self.tenant_id) { + use std::collections::btree_map::Entry; + match m.entry(self.tenant_shard_id) { Entry::Occupied(mut entry) => { if !matches!(entry.get(), TenantSlot::InProgress(_)) { METRICS.unexpected_errors.inc(); - error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get()); + error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get()); } if self.old_value_is_shutdown() { @@ -1610,7 +1728,7 @@ impl Drop for SlotGuard { Entry::Vacant(_) => { METRICS.unexpected_errors.inc(); error!( - tenant_id = %self.tenant_id, + tenant_shard_id = %self.tenant_shard_id, "Missing InProgress marker during SlotGuard drop, this is a bug." ); } @@ -1629,7 +1747,7 @@ enum TenantSlotPeekMode { fn tenant_map_peek_slot<'a>( tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>, - tenant_id: &TenantId, + tenant_shard_id: &TenantShardId, mode: TenantSlotPeekMode, ) -> Result, TenantMapError> { let m = match tenants.deref() { @@ -1643,7 +1761,7 @@ fn tenant_map_peek_slot<'a>( TenantsMap::Open(m) => m, }; - Ok(m.get(tenant_id)) + Ok(m.get(tenant_shard_id)) } enum TenantSlotAcquireMode { @@ -1656,14 +1774,14 @@ enum TenantSlotAcquireMode { } fn tenant_map_acquire_slot( - tenant_id: &TenantId, + tenant_shard_id: &TenantShardId, mode: TenantSlotAcquireMode, ) -> Result { - tenant_map_acquire_slot_impl(tenant_id, &TENANTS, mode) + tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode) } fn tenant_map_acquire_slot_impl( - tenant_id: &TenantId, + tenant_shard_id: &TenantShardId, tenants: &std::sync::RwLock, mode: TenantSlotAcquireMode, ) -> Result { @@ -1671,7 +1789,7 @@ fn tenant_map_acquire_slot_impl( METRICS.tenant_slot_writes.inc(); let mut locked = tenants.write().unwrap(); - let span = tracing::info_span!("acquire_slot", %tenant_id); + let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard=tenant_shard_id.shard_slug()); let _guard = span.enter(); let m = match &mut *locked { @@ -1680,19 +1798,21 @@ fn tenant_map_acquire_slot_impl( TenantsMap::Open(m) => m, }; - use std::collections::hash_map::Entry; - let entry = m.entry(*tenant_id); + use std::collections::btree_map::Entry; + + let entry = m.entry(*tenant_shard_id); + match entry { Entry::Vacant(v) => match mode { MustExist => { tracing::debug!("Vacant && MustExist: return NotFound"); - Err(TenantSlotError::NotFound(*tenant_id)) + Err(TenantSlotError::NotFound(*tenant_shard_id)) } _ => { let (completion, barrier) = utils::completion::channel(); v.insert(TenantSlot::InProgress(barrier)); tracing::debug!("Vacant, inserted InProgress"); - Ok(SlotGuard::new(*tenant_id, None, completion)) + Ok(SlotGuard::new(*tenant_shard_id, None, completion)) } }, Entry::Occupied(mut o) => { @@ -1706,7 +1826,7 @@ fn tenant_map_acquire_slot_impl( TenantSlot::Attached(tenant) => { 
tracing::debug!("Attached && MustNotExist, return AlreadyExists"); Err(TenantSlotError::AlreadyExists( - *tenant_id, + *tenant_shard_id, tenant.current_state(), )) } @@ -1715,7 +1835,7 @@ fn tenant_map_acquire_slot_impl( // to get the state from tracing::debug!("Occupied & MustNotExist, return AlreadyExists"); Err(TenantSlotError::AlreadyExists( - *tenant_id, + *tenant_shard_id, TenantState::Broken { reason: "Present but not attached".to_string(), backtrace: "".to_string(), @@ -1728,7 +1848,11 @@ fn tenant_map_acquire_slot_impl( let (completion, barrier) = utils::completion::channel(); let old_value = o.insert(TenantSlot::InProgress(barrier)); tracing::debug!("Occupied, replaced with InProgress"); - Ok(SlotGuard::new(*tenant_id, Some(old_value), completion)) + Ok(SlotGuard::new( + *tenant_shard_id, + Some(old_value), + completion, + )) } } } @@ -1741,7 +1865,7 @@ fn tenant_map_acquire_slot_impl( /// operation would be needed to remove it. async fn remove_tenant_from_memory( tenants: &std::sync::RwLock, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, tenant_cleanup: F, ) -> Result where @@ -1750,7 +1874,7 @@ where use utils::completion; let mut slot_guard = - tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?; + tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?; // The SlotGuard allows us to manipulate the Tenant object without fear of some // concurrent API request doing something else for the same tenant ID. @@ -1777,7 +1901,7 @@ where // if pageserver shutdown or other detach/ignore is already ongoing, we don't want to // wait for it but return an error right away because these are distinct requests. slot_guard.revert(); - return Err(TenantStateError::IsStopping(tenant_id)); + return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id)); } } } @@ -1788,7 +1912,7 @@ where match tenant_cleanup .await - .with_context(|| format!("Failed to run cleanup for tenant {tenant_id}")) + .with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}")) { Ok(hook_value) => { // Success: drop the old TenantSlot::Attached. @@ -1867,7 +1991,8 @@ pub(crate) async fn immediate_gc( #[cfg(test)] mod tests { - use std::collections::HashMap; + use pageserver_api::shard::TenantShardId; + use std::collections::BTreeMap; use std::sync::Arc; use tracing::{info_span, Instrument}; @@ -1887,12 +2012,12 @@ mod tests { // harness loads it to active, which is forced and nothing is running on the tenant - let id = t.tenant_id(); + let id = TenantShardId::unsharded(t.tenant_id()); // tenant harness configures the logging and we cannot escape it let _e = info_span!("testing", tenant_id = %id).entered(); - let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]); + let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]); let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants))); // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually