pageserver: make TenantsMap shard-aware (#5819)

## Problem

When using TenantId as the key, we cannot represent multiple tenant
shards attached to the same pageserver for the same tenant. This is an
expected scenario: with e.g. 8 shards and 5 pageservers, at least one
pageserver must hold two shards of the same tenant.

## Summary of changes

- TenantsMap is now a BTreeMap instead of a HashMap: this enables
looking up by range (see the sketch after this list). In the future,
page_service will need this, because incoming requests specify only a
Key, and we will have to work out which shard to route each request to.
- A new key type TenantShardId is introduced, to act as the key in
TenantsMap and as the id type in external APIs. Its human-readable
serialization is backward compatible with TenantId, and also
forward-compatible as long as sharding is not actually used: a
TenantShardId constructed with ShardCount(0) serializes to an
old-fashioned TenantId.
- Essential tenant APIs are updated to accept TenantShardIds:
tenant/timeline create, tenant delete, and /location_conf. These are the
APIs that will drive sharded tenants. Other APIs (/attach, /detach,
/load, /ignore) will not work with sharding: they will soon be
deprecated and replaced by /location_conf as part of the live migration
work.

Closes: #5787
John Spray authored on 2023-11-15 22:20:21 +01:00, committed by GitHub.
Parent: f84ac2b98d
Commit: ab631e6792
10 changed files with 744 additions and 269 deletions

Cargo.lock (generated)

@@ -2994,10 +2994,12 @@ name = "pageserver_api"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"byteorder",
"bytes",
"const_format",
"enum-map",
"hex",
"postgres_ffi",
"serde",
"serde_json",


@@ -18,6 +18,7 @@ use camino::Utf8PathBuf;
use pageserver_api::models::{
self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -408,7 +409,7 @@ impl PageServerNode {
};
let request = models::TenantCreateRequest {
new_tenant_id,
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
generation,
config,
};


@@ -17,5 +17,9 @@ postgres_ffi.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true
hex.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
bincode.workspace = true


@@ -0,0 +1,142 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use serde::{Deserialize, Serialize};
use std::fmt;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store the tablespace OID for relations, and small enum numbers for other relishes.
/// As long as Neon does not support tablespaces (because it lacks access to the local file system),
/// we can assume that only a few predefined namespace OIDs are used, which fit in a u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}


@@ -4,8 +4,10 @@ use const_format::formatcp;
/// Public API types
pub mod control_api;
pub mod key;
pub mod models;
pub mod reltag;
pub mod shard;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");


@@ -16,7 +16,7 @@ use utils::{
lsn::Lsn,
};
use crate::reltag::RelTag;
use crate::{reltag::RelTag, shard::TenantShardId};
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
@@ -187,7 +187,7 @@ pub struct TimelineCreateRequest {
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
pub new_tenant_id: TenantId,
pub new_tenant_id: TenantShardId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,


@@ -0,0 +1,321 @@
use std::{ops::RangeInclusive, str::FromStr};
use hex::FromHex;
use serde::{Deserialize, Serialize};
use utils::id::TenantId;
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardCount(pub u8);
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
}
impl ShardNumber {
pub const MAX: Self = Self(u8::MAX);
}
/// TenantShardId identifies a unit of work for the Pageserver.
///
/// These are written as `<tenant_id>-<shard_number><shard_count>`, for example:
///
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
///
/// Historically, tenants could not have multiple shards, and were identified
/// by TenantId. To stay compatible, TenantShardId has a special legacy
/// mode where `shard_count` is equal to zero: this represents an unsharded
/// tenant, which is written as a TenantId with no suffix.
///
/// The human-readable encoding of TenantShardId, such as used in API URLs,
/// is both forward and backward compatible: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
///
/// Note that the binary encoding is _not_ backward compatible, because
/// at the time sharding is introduced, there are no existing binary structures
/// containing TenantId that we need to handle.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl TenantShardId {
pub fn unsharded(tenant_id: TenantId) -> Self {
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
},
)
}
pub fn shard_slug(&self) -> String {
format!("{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(
f,
"{}-{:02x}{:02x}",
self.tenant_id, self.shard_number.0, self.shard_count.0
)
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single shard case (shard count == 1).
self.tenant_id.fmt(f)
}
}
}
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for TenantShardId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expected format: 16-byte TenantId, '-', 1-byte shard number, 1-byte shard count (all hex-encoded)
if s.len() == 32 {
// Legacy case: no shard specified
Ok(Self {
tenant_id: TenantId::from_str(s)?,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
})
} else if s.len() == 37 {
let bytes = s.as_bytes();
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
Ok(Self {
tenant_id,
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 18]> for TenantShardId {
fn from(b: [u8; 18]) -> Self {
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
Self {
tenant_id: TenantId::from(tenant_id_bytes),
shard_number: ShardNumber(b[16]),
shard_count: ShardCount(b[17]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
let mut packed: [u8; 18] = [0; 18];
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
packed[16] = self.shard_number.0;
packed[17] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for TenantShardId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = TenantShardId;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 18])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 18] = Deserialize::deserialize(s)?;
Ok(TenantShardId::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
TenantShardId::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
18,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use bincode;
use utils::{id::TenantId, Hex};
use super::*;
const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
#[test]
fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
let example = TenantShardId {
tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
shard_count: ShardCount(10),
shard_number: ShardNumber(7),
};
let encoded = format!("{example}");
let expected = format!("{EXAMPLE_TENANT_ID}-070a");
assert_eq!(&encoded, &expected);
let decoded = TenantShardId::from_str(&encoded)?;
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
let example = TenantShardId {
tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
shard_count: ShardCount(10),
shard_number: ShardNumber(7),
};
let encoded = bincode::serialize(&example).unwrap();
let expected: [u8; 18] = [
0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
0xf6, 0xfc, 0x07, 0x0a,
];
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
// Test that TenantShardId can decode a TenantId in human
// readable form
let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
let encoded = format!("{example}");
assert_eq!(&encoded, EXAMPLE_TENANT_ID);
let decoded = TenantShardId::from_str(&encoded)?;
assert_eq!(example, decoded.tenant_id);
assert_eq!(decoded.shard_count, ShardCount(0));
assert_eq!(decoded.shard_number, ShardNumber(0));
Ok(())
}
#[test]
fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
// Test that a legacy TenantShardId encodes into a form that
// can be decoded as TenantId
let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
let example = TenantShardId::unsharded(example_tenant_id);
let encoded = format!("{example}");
assert_eq!(&encoded, EXAMPLE_TENANT_ID);
let decoded = TenantId::from_str(&encoded)?;
assert_eq!(example_tenant_id, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
// Unlike in human readable encoding, binary encoding does not
// do any special handling of legacy unsharded TenantIds: this test
// is equivalent to the main test for binary encoding, just verifying
// that the same behavior applies when we have used `unsharded()` to
// construct a TenantShardId.
let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
let encoded = bincode::serialize(&example).unwrap();
let expected: [u8; 18] = [
0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
0xf6, 0xfc, 0x00, 0x00,
];
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
}


@@ -16,6 +16,7 @@ use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
@@ -419,9 +420,9 @@ async fn timeline_create_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let new_timeline_id = request_data.new_timeline_id;
@@ -430,7 +431,7 @@ async fn timeline_create_handler(
let state = get_state(&request);
async {
let tenant = mgr::get_tenant(tenant_id, true)?;
let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -464,7 +465,10 @@ async fn timeline_create_handler(
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}
.instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.instrument(info_span!("timeline_create",
tenant_id = %tenant_shard_id.tenant_id,
shard = %tenant_shard_id.shard_slug(),
timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await
}
@@ -660,14 +664,15 @@ async fn timeline_delete_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
mgr::delete_timeline(tenant_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
@@ -681,11 +686,14 @@ async fn tenant_detach_handler(
check_permission(&request, Some(tenant_id))?;
let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
// This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(
conf,
tenant_id,
tenant_shard_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
@@ -802,13 +810,16 @@ async fn tenant_delete_handler(
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
// TODO openapi spec
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
.instrument(info_span!("tenant_delete_handler", %tenant_id))
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_shard_id)
.instrument(info_span!("tenant_delete_handler",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
))
.await?;
json_response(StatusCode::ACCEPTED, ())
@@ -1138,9 +1149,10 @@ async fn put_tenant_location_config_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
@@ -1149,9 +1161,13 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
if let Err(e) = mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach", %tenant_id))
.await
if let Err(e) =
mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
))
.await
{
match e {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
@@ -1168,7 +1184,7 @@ async fn put_tenant_location_config_handler(
state
.tenant_manager
.upsert_location(tenant_id, location_conf, &ctx)
.upsert_location(tenant_shard_id, location_conf, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
@@ -1752,7 +1768,7 @@ pub fn make_router(
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
.delete("/v1/tenant/:tenant_id", |r| {
.delete("/v1/tenant/:tenant_shard_id", |r| {
api_handler(r, tenant_delete_handler)
})
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
@@ -1764,13 +1780,13 @@ pub fn make_router(
.get("/v1/tenant/:tenant_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
.put("/v1/tenant/:tenant_id/location_config", |r| {
.put("/v1/tenant/:tenant_shard_id/location_config", |r| {
api_handler(r, put_tenant_location_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})
.post("/v1/tenant/:tenant_id/timeline", |r| {
.post("/v1/tenant/:tenant_shard_id/timeline", |r| {
api_handler(r, timeline_create_handler)
})
.post("/v1/tenant/:tenant_id/attach", |r| {
@@ -1814,7 +1830,7 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| api_handler(r, timeline_download_remote_layers_handler_get),
)
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
.delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
api_handler(r, timeline_delete_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {


@@ -1,106 +1,11 @@
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{AddAssign, Range};
use std::time::Duration;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
pub use pageserver_api::key::{Key, KEY_SIZE};
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
@@ -129,49 +34,6 @@ pub fn singleton_range(key: Key) -> Range<Key> {
key..key.next()
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]


@@ -2,9 +2,10 @@
//! page server.
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -30,6 +31,7 @@ use crate::metrics::TENANT_MANAGER as METRICS;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
@@ -87,10 +89,37 @@ pub(crate) enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_acquire_slot`].
Open(HashMap<TenantId, TenantSlot>),
Open(BTreeMap<TenantShardId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(HashMap<TenantId, TenantSlot>),
ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
}
/// Helper for mapping shard-unaware functions to a sharding-aware map
/// TODO(sharding): all users of this must be made shard-aware.
fn exactly_one_or_none<'a>(
map: &'a BTreeMap<TenantShardId, TenantSlot>,
tenant_id: &TenantId,
) -> Option<(&'a TenantShardId, &'a TenantSlot)> {
let mut slots = map.range(TenantShardId::tenant_range(*tenant_id));
// Retrieve the first two slots in the range: if both are populated, we must panic because the caller
// needs a shard-naive view of the world in which only one slot can exist for a TenantId at a time.
let slot_a = slots.next();
let slot_b = slots.next();
match (slot_a, slot_b) {
(None, None) => None,
(Some(slot), None) => {
// Exactly one matching slot
Some(slot)
}
(Some(_slot_a), Some(_slot_b)) => {
// Multiple shards for this tenant: cannot handle this yet.
// TODO(sharding): callers of get() should be shard-aware.
todo!("Attaching multiple shards of the same tenant to the same pageserver")
}
(None, Some(_)) => unreachable!(),
}
}
impl TenantsMap {
@@ -101,7 +130,8 @@ impl TenantsMap {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
m.get(tenant_id).and_then(TenantSlot::get_attached)
// TODO(sharding): callers of get() should be shard-aware.
exactly_one_or_none(m, tenant_id).and_then(|(_, slot)| slot.get_attached())
}
}
}
@@ -109,7 +139,10 @@ impl TenantsMap {
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<TenantSlot> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
let key = exactly_one_or_none(m, tenant_id).map(|(k, _)| *k);
key.and_then(|key| m.remove(&key))
}
}
}
@@ -383,7 +416,7 @@ pub async fn init_tenant_mgr(
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<TenantManager> {
let mut tenants = HashMap::new();
let mut tenants = BTreeMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
@@ -404,7 +437,7 @@ pub async fn init_tenant_mgr(
warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");
tenants.insert(
tenant_id,
TenantShardId::unsharded(tenant_id),
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_id,
@@ -427,7 +460,7 @@ pub async fn init_tenant_mgr(
// tenants, because they do no remote writes and hence require no
// generation number
info!(%tenant_id, "Loaded tenant in secondary mode");
tenants.insert(tenant_id, TenantSlot::Secondary);
tenants.insert(TenantShardId::unsharded(tenant_id), TenantSlot::Secondary);
}
LocationMode::Attached(_) => {
// TODO: augment re-attach API to enable the control plane to
@@ -470,7 +503,10 @@ pub async fn init_tenant_mgr(
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
tenants.insert(
TenantShardId::unsharded(tenant.tenant_id()),
TenantSlot::Attached(tenant),
);
}
Err(e) => {
error!(%tenant_id, "Failed to start tenant: {e:#}");
@@ -573,19 +609,19 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
let mut m = tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(HashMap::default());
*m = TenantsMap::ShuttingDown(BTreeMap::default());
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
let mut shutdown_state = HashMap::new();
let mut shutdown_state = BTreeMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
for (tenant_id, v) in tenants.drain() {
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
match v {
TenantSlot::Attached(t) => {
shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone()));
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let freeze_and_flush = true;
@@ -604,13 +640,13 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", %tenant_id)),
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug())),
);
total_attached += 1;
}
TenantSlot::Secondary => {
shutdown_state.insert(tenant_id, TenantSlot::Secondary);
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary);
}
TenantSlot::InProgress(notify) => {
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
@@ -690,19 +726,22 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
pub(crate) async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
generation: Generation,
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?;
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
// TODO(sharding): make local paths shard-aware
let tenant_path =
super::create_tenant_files(conf, &location_conf, &tenant_shard_id.tenant_id).await?;
let created_tenant = tenant_spawn(
conf,
tenant_id,
tenant_shard_id.tenant_id,
&tenant_path,
resources,
AttachedTenantConf::try_from(location_conf)?,
@@ -715,11 +754,7 @@ pub(crate) async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant_id = created_tenant.tenant_id();
if tenant_id != created_tenant_id {
return Err(TenantMapInsertError::Other(anyhow::anyhow!(
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})",
)));
}
debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id);
slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?;
@@ -755,21 +790,70 @@ pub(crate) async fn set_new_tenant_config(
}
impl TenantManager {
#[instrument(skip_all, fields(%tenant_id))]
/// Gets the attached tenant from the in-memory data, erroring if it is absent, in secondary mode, or does not fit the query.
/// `active_only = true` restricts the query to tenants that are ready for operations, erroring on all other kinds.
///
/// This method is cancel-safe.
pub(crate) fn get_attached_tenant_shard(
&self,
tenant_shard_id: TenantShardId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
TenantState::Broken {
reason,
backtrace: _,
} if active_only => Err(GetTenantError::Broken(reason)),
TenantState::Active => Ok(Arc::clone(tenant)),
_ => {
if active_only {
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
} else {
Ok(Arc::clone(tenant))
}
}
},
Some(TenantSlot::InProgress(_)) => {
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
}
None | Some(TenantSlot::Secondary) => {
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
}
}
}
pub(crate) async fn delete_timeline(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?;
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
Ok(())
}
pub(crate) async fn upsert_location(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
new_location_config: LocationConf,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
debug_assert_current_span_has_tenant_id();
info!("configuring tenant location to state {new_location_config:?}");
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
// then we do not need to set the slot to InProgress, we can just call into the
// existing tenant.
{
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?;
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
match (&new_location_config.mode, peek_slot) {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
if attach_conf.generation == tenant.generation {
@@ -800,7 +884,7 @@ impl TenantManager {
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?;
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
// The case where we keep a Tenant alive was covered above in the special case
@@ -831,25 +915,31 @@ impl TenantManager {
slot_guard.drop_old_value().expect("We just shut it down");
}
let tenant_path = self.conf.tenant_path(&tenant_id);
// TODO(sharding): make local paths sharding-aware
let tenant_path = self.conf.tenant_path(&tenant_shard_id.tenant_id);
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => {
let tenant_path = self.conf.tenant_path(&tenant_id);
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
unsafe_create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
// TODO(sharding): make local paths sharding-aware
Tenant::persist_tenant_config(
self.conf,
&tenant_shard_id.tenant_id,
&new_location_config,
)
.await
.map_err(SetNewTenantConfigError::Persist)?;
TenantSlot::Secondary
}
LocationMode::Attached(_attach_config) => {
let timelines_path = self.conf.timelines_path(&tenant_id);
// TODO(sharding): make local paths sharding-aware
let timelines_path = self.conf.timelines_path(&tenant_shard_id.tenant_id);
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
@@ -858,13 +948,19 @@ impl TenantManager {
.await
.with_context(|| format!("Creating {timelines_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
// TODO(sharding): make local paths sharding-aware
Tenant::persist_tenant_config(
self.conf,
&tenant_shard_id.tenant_id,
&new_location_config,
)
.await
.map_err(SetNewTenantConfigError::Persist)?;
// TODO(sharding): make spawn sharding-aware
let tenant = tenant_spawn(
self.conf,
tenant_id,
tenant_shard_id.tenant_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(new_location_config)?,
@@ -910,7 +1006,11 @@ pub(crate) fn get_tenant(
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)?;
// TODO(sharding): make all callers of get_tenant shard-aware
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
@@ -970,12 +1070,16 @@ pub(crate) async fn get_active_tenant_with_timeout(
Tenant(Arc<Tenant>),
}
// TODO(sharding): make page service interface sharding-aware (page service should apply ShardIdentity to the key
// to decide which shard services the request)
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let wait_start = Instant::now();
let deadline = wait_start + timeout;
let wait_for = {
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => {
@@ -1019,8 +1123,9 @@ pub(crate) async fn get_active_tenant_with_timeout(
})?;
{
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => tenant.clone(),
_ => {
@@ -1062,7 +1167,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
pub(crate) async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
) -> Result<(), DeleteTenantError> {
// We acquire a SlotGuard during this function to protect against concurrent
// changes while the ::prepare phase of DeleteTenantFlow executes, but then
@@ -1075,7 +1180,9 @@ pub(crate) async fn delete_tenant(
//
// See https://github.com/neondatabase/neon/issues/5080
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustExist)?;
// TODO(sharding): make delete API sharding-aware
let mut slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
// unwrap is safe because we used MustExist mode when acquiring
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
@@ -1102,16 +1209,6 @@ pub(crate) enum DeleteTimelineError {
Timeline(#[from] crate::tenant::DeleteTimelineError),
}
pub(crate) async fn delete_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
let tenant = get_tenant(tenant_id, true)?;
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantStateError {
#[error("Tenant {0} is stopping")]
@@ -1126,14 +1223,14 @@ pub(crate) enum TenantStateError {
pub(crate) async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_id,
tenant_shard_id,
detach_ignored,
deletion_queue_client,
)
@@ -1160,19 +1257,24 @@ pub(crate) async fn detach_tenant(
async fn detach_tenant0(
conf: &'static PageServerConf,
tenants: &std::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
// TODO(sharding): make local path helpers shard-aware
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean.tenant_id);
safe_rename_tenant_dir(&local_tenant_directory)
.await
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
};
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
let removal_result = remove_tenant_from_memory(
tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
@@ -1186,12 +1288,15 @@ async fn detach_tenant0(
Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
)
{
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
// TODO(sharding): make local paths sharding-aware
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id.tenant_id);
if tenant_ignore_mark.exists() {
info!("Detaching an ignored tenant");
let tmp_path = tenant_dir_rename_operation(tenant_id)
let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
.await
.with_context(|| format!("Ignored tenant {tenant_id} local directory rename"))?;
.with_context(|| {
format!("Ignored tenant {tenant_shard_id} local directory rename")
})?;
return Ok(tmp_path);
}
}
@@ -1208,7 +1313,11 @@ pub(crate) async fn load_tenant(
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
let tenant_path = conf.tenant_path(&tenant_id);
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
@@ -1261,7 +1370,10 @@ async fn ignore_tenant0(
tenants: &std::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
remove_tenant_from_memory(tenants, tenant_id, async {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
remove_tenant_from_memory(tenants, tenant_shard_id, async {
let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
fs::File::create(&ignore_mark_file)
.await
@@ -1270,7 +1382,7 @@ async fn ignore_tenant0(
crashsafe::fsync_file_and_parent(&ignore_mark_file)
.context("Failed to fsync ignore mark file")
})
.with_context(|| format!("Failed to create ignore mark for tenant {tenant_id}"))?;
.with_context(|| format!("Failed to create ignore mark for tenant {tenant_shard_id}"))?;
Ok(())
})
.await
@@ -1293,10 +1405,12 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})
// TODO(sharding): make callers of this function shard-aware
.map(|(k, v)| (k.tenant_id, v))
.collect())
}
@@ -1312,7 +1426,11 @@ pub(crate) async fn attach_tenant(
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
@@ -1359,14 +1477,14 @@ pub(crate) enum TenantMapInsertError {
pub enum TenantSlotError {
/// When acquiring a slot with the expectation that the tenant already exists.
#[error("Tenant {0} not found")]
NotFound(TenantId),
NotFound(TenantShardId),
/// When acquiring a slot with the expectation that the tenant does not already exist.
#[error("tenant {0} already exists, state: {1:?}")]
AlreadyExists(TenantId, TenantState),
AlreadyExists(TenantShardId, TenantState),
#[error("tenant {0} already exists but is not attached")]
Conflict(TenantId),
Conflict(TenantShardId),
// Tried to read a slot that is currently being mutated by another administrative
// operation.
@@ -1428,7 +1546,7 @@ pub enum TenantMapError {
/// `drop_old_value`. It is an error to call this without shutting down
/// the contents of `old_value`.
pub struct SlotGuard {
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
upserted: bool,
@@ -1439,12 +1557,12 @@ pub struct SlotGuard {
impl SlotGuard {
fn new(
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
completion: utils::completion::Completion,
) -> Self {
Self {
tenant_id,
tenant_shard_id,
old_value,
upserted: false,
_completion: completion,
@@ -1487,7 +1605,7 @@ impl SlotGuard {
TenantsMap::Open(m) => m,
};
let replaced = m.insert(self.tenant_id, new_value);
let replaced = m.insert(self.tenant_shard_id, new_value);
self.upserted = true;
METRICS.tenant_slots.set(m.len() as u64);
@@ -1506,7 +1624,7 @@ impl SlotGuard {
None => {
METRICS.unexpected_errors.inc();
error!(
tenant_id = %self.tenant_id,
tenant_shard_id = %self.tenant_shard_id,
"Missing InProgress marker during tenant upsert, this is a bug."
);
Err(TenantSlotUpsertError::InternalError(
@@ -1515,7 +1633,7 @@ impl SlotGuard {
}
Some(slot) => {
METRICS.unexpected_errors.inc();
error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
Err(TenantSlotUpsertError::InternalError(
"Unexpected contents of TenantSlot".into(),
))
@@ -1593,12 +1711,12 @@ impl Drop for SlotGuard {
TenantsMap::Open(m) => m,
};
use std::collections::hash_map::Entry;
match m.entry(self.tenant_id) {
use std::collections::btree_map::Entry;
match m.entry(self.tenant_shard_id) {
Entry::Occupied(mut entry) => {
if !matches!(entry.get(), TenantSlot::InProgress(_)) {
METRICS.unexpected_errors.inc();
error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
}
if self.old_value_is_shutdown() {
@@ -1610,7 +1728,7 @@ impl Drop for SlotGuard {
Entry::Vacant(_) => {
METRICS.unexpected_errors.inc();
error!(
tenant_id = %self.tenant_id,
tenant_shard_id = %self.tenant_shard_id,
"Missing InProgress marker during SlotGuard drop, this is a bug."
);
}
@@ -1629,7 +1747,7 @@ enum TenantSlotPeekMode {
fn tenant_map_peek_slot<'a>(
tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
tenant_id: &TenantId,
tenant_shard_id: &TenantShardId,
mode: TenantSlotPeekMode,
) -> Result<Option<&'a TenantSlot>, TenantMapError> {
let m = match tenants.deref() {
@@ -1643,7 +1761,7 @@ fn tenant_map_peek_slot<'a>(
TenantsMap::Open(m) => m,
};
Ok(m.get(tenant_id))
Ok(m.get(tenant_shard_id))
}
enum TenantSlotAcquireMode {
@@ -1656,14 +1774,14 @@ enum TenantSlotAcquireMode {
}
fn tenant_map_acquire_slot(
tenant_id: &TenantId,
tenant_shard_id: &TenantShardId,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
tenant_map_acquire_slot_impl(tenant_id, &TENANTS, mode)
tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
}
fn tenant_map_acquire_slot_impl(
tenant_id: &TenantId,
tenant_shard_id: &TenantShardId,
tenants: &std::sync::RwLock<TenantsMap>,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
@@ -1671,7 +1789,7 @@ fn tenant_map_acquire_slot_impl(
METRICS.tenant_slot_writes.inc();
let mut locked = tenants.write().unwrap();
let span = tracing::info_span!("acquire_slot", %tenant_id);
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard=tenant_shard_id.shard_slug());
let _guard = span.enter();
let m = match &mut *locked {
@@ -1680,19 +1798,21 @@ fn tenant_map_acquire_slot_impl(
TenantsMap::Open(m) => m,
};
use std::collections::hash_map::Entry;
let entry = m.entry(*tenant_id);
use std::collections::btree_map::Entry;
let entry = m.entry(*tenant_shard_id);
match entry {
Entry::Vacant(v) => match mode {
MustExist => {
tracing::debug!("Vacant && MustExist: return NotFound");
Err(TenantSlotError::NotFound(*tenant_id))
Err(TenantSlotError::NotFound(*tenant_shard_id))
}
_ => {
let (completion, barrier) = utils::completion::channel();
v.insert(TenantSlot::InProgress(barrier));
tracing::debug!("Vacant, inserted InProgress");
Ok(SlotGuard::new(*tenant_id, None, completion))
Ok(SlotGuard::new(*tenant_shard_id, None, completion))
}
},
Entry::Occupied(mut o) => {
@@ -1706,7 +1826,7 @@ fn tenant_map_acquire_slot_impl(
TenantSlot::Attached(tenant) => {
tracing::debug!("Attached && MustNotExist, return AlreadyExists");
Err(TenantSlotError::AlreadyExists(
*tenant_id,
*tenant_shard_id,
tenant.current_state(),
))
}
@@ -1715,7 +1835,7 @@ fn tenant_map_acquire_slot_impl(
// to get the state from
tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
Err(TenantSlotError::AlreadyExists(
*tenant_id,
*tenant_shard_id,
TenantState::Broken {
reason: "Present but not attached".to_string(),
backtrace: "".to_string(),
@@ -1728,7 +1848,11 @@ fn tenant_map_acquire_slot_impl(
let (completion, barrier) = utils::completion::channel();
let old_value = o.insert(TenantSlot::InProgress(barrier));
tracing::debug!("Occupied, replaced with InProgress");
Ok(SlotGuard::new(*tenant_id, Some(old_value), completion))
Ok(SlotGuard::new(
*tenant_shard_id,
Some(old_value),
completion,
))
}
}
}
@@ -1741,7 +1865,7 @@ fn tenant_map_acquire_slot_impl(
/// operation would be needed to remove it.
async fn remove_tenant_from_memory<V, F>(
tenants: &std::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
where
@@ -1750,7 +1874,7 @@ where
use utils::completion;
let mut slot_guard =
tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?;
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
// The SlotGuard allows us to manipulate the Tenant object without fear of some
// concurrent API request doing something else for the same tenant ID.
@@ -1777,7 +1901,7 @@ where
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
slot_guard.revert();
return Err(TenantStateError::IsStopping(tenant_id));
return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id));
}
}
}
@@ -1788,7 +1912,7 @@ where
match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
{
Ok(hook_value) => {
// Success: drop the old TenantSlot::Attached.
@@ -1867,7 +1991,8 @@ pub(crate) async fn immediate_gc(
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use pageserver_api::shard::TenantShardId;
use std::collections::BTreeMap;
use std::sync::Arc;
use tracing::{info_span, Instrument};
@@ -1887,12 +2012,12 @@ mod tests {
// harness loads it to active, which is forced and nothing is running on the tenant
let id = t.tenant_id();
let id = TenantShardId::unsharded(t.tenant_id());
// tenant harness configures the logging and we cannot escape it
let _e = info_span!("testing", tenant_id = %id).entered();
let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
// Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually