Compare commits

...

10 Commits

Author SHA1 Message Date
Conrad Ludgate
768c18d0ac lint 2023-11-17 09:39:45 +01:00
Conrad Ludgate
f37cb4f3c2 fix username 2023-11-17 09:30:10 +01:00
Conrad Ludgate
3560f6bcb7 debug 2023-11-17 09:01:14 +01:00
Conrad Ludgate
0c8bc58d1b update endpoint 2023-11-16 19:23:26 +01:00
Conrad Ludgate
ea43f60763 fixup api 2023-11-16 18:24:13 +01:00
Conrad Ludgate
8c1f7adb96 update endpoint 2023-11-16 18:00:07 +01:00
Conrad Ludgate
2634fe9ecf hakari 2023-11-16 16:48:24 +01:00
Conrad Ludgate
46645d1211 fmt 2023-11-16 16:35:21 +01:00
Conrad Ludgate
5004e62f5b proxy: start work on jwt auth 2023-11-16 15:15:08 +01:00
John Spray
ab631e6792 pageserver: make TenantsMap shard-aware (#5819)
## Problem

When using TenantId as the key, we are unable to handle multiple tenant
shards attached to the same pageserver for the same tenant ID. This is
an expected scenario if we have e.g. 8 shards and 5 pageservers.

## Summary of changes

- TenantsMap is now a BTreeMap instead of a HashMap: this enables
looking up by range. In future, we will need this for page_service, as
incoming requests will just specify the Key, and we'll have to figure
out which shard to route it to.
- A new key type TenantShardId is introduced, to act as the key in
TenantsMap, and as the id type in external APIs. Its human-readable
serialization is backward-compatible with TenantId, and also
forward-compatible as long as sharding is not actually used (when we
construct a TenantShardId with ShardCount(0), it serializes to an
old-fashioned TenantId).
- Essential tenant APIs are updated to accept TenantShardIds:
tenant/timeline create, tenant delete, and /location_conf. These are the
APIs that will enable driving sharded tenants. Other APIs like /attach
/detach /load /ignore will not work with sharding: those will soon be
deprecated and replaced with /location_conf as part of the live
migration work.

Closes: #5787
2023-11-15 23:20:21 +02:00
21 changed files with 1199 additions and 320 deletions
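
Why the switch to BTreeMap matters: TenantShardId orders by tenant_id first, so all shards of one tenant occupy a contiguous key range, and a lookup by bare TenantId becomes a range scan rather than a point lookup. A minimal standalone sketch of that idea (a u128 stands in here for the real 16-byte TenantId):

```rust
use std::collections::BTreeMap;
use std::ops::RangeInclusive;

// Simplified stand-in for pageserver_api::shard::TenantShardId. tenant_id is
// the most significant field, so the derived Ord keeps every shard of one
// tenant contiguous in a BTreeMap.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TenantShardId {
    tenant_id: u128, // the real type wraps a 16-byte TenantId
    shard_number: u8,
    shard_count: u8,
}

impl TenantShardId {
    // Mirrors the tenant_range() helper added by this PR: the lowest and
    // highest possible shard ids bound the whole tenant.
    fn tenant_range(tenant_id: u128) -> RangeInclusive<Self> {
        Self { tenant_id, shard_number: 0, shard_count: 0 }
            ..=Self { tenant_id, shard_number: u8::MAX, shard_count: u8::MAX }
    }
}

fn main() {
    let mut map = BTreeMap::new();
    map.insert(TenantShardId { tenant_id: 7, shard_number: 0, shard_count: 2 }, "t7 shard 0");
    map.insert(TenantShardId { tenant_id: 7, shard_number: 1, shard_count: 2 }, "t7 shard 1");
    map.insert(TenantShardId { tenant_id: 9, shard_number: 0, shard_count: 0 }, "t9 unsharded");

    // Query by TenantId alone: a range scan over the tenant's shards.
    assert_eq!(map.range(TenantShardId::tenant_range(7)).count(), 2);
}
```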

Cargo.lock generated
View File

@@ -274,7 +274,7 @@ dependencies = [
"hex",
"http",
"hyper",
"ring",
"ring 0.16.20",
"time",
"tokio",
"tower",
@@ -703,7 +703,7 @@ dependencies = [
"bytes",
"dyn-clone",
"futures",
"getrandom 0.2.9",
"getrandom 0.2.11",
"http-types",
"log",
"paste",
@@ -863,6 +863,22 @@ dependencies = [
"which",
]
[[package]]
name = "biscuit"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e28fc7c56c61743a01d0d1b73e4fed68b8a4f032ea3a2d4bb8c6520a33fc05a"
dependencies = [
"chrono",
"data-encoding",
"num-bigint",
"num-traits",
"once_cell",
"ring 0.17.5",
"serde",
"serde_json",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -945,11 +961,12 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.0.79"
version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f"
checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
dependencies = [
"jobserver",
"libc",
]
[[package]]
@@ -1846,9 +1863,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.9"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4"
checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f"
dependencies = [
"cfg-if",
"js-sys",
@@ -2342,7 +2359,7 @@ checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378"
dependencies = [
"base64 0.21.1",
"pem 1.1.1",
"ring",
"ring 0.16.20",
"serde",
"serde_json",
"simple_asn1",
@@ -2382,9 +2399,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "libc"
version = "0.2.144"
version = "0.2.150"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c"
[[package]]
name = "libloading"
@@ -2691,7 +2708,7 @@ checksum = "c38841cdd844847e3e7c8d29cef9dcfed8877f8f56f9071f77843ecf3baf937f"
dependencies = [
"base64 0.13.1",
"chrono",
"getrandom 0.2.9",
"getrandom 0.2.11",
"http",
"rand 0.8.5",
"serde",
@@ -2994,10 +3011,12 @@ name = "pageserver_api"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"byteorder",
"bytes",
"const_format",
"enum-map",
"hex",
"postgres_ffi",
"serde",
"serde_json",
@@ -3472,6 +3491,7 @@ dependencies = [
"anyhow",
"async-trait",
"base64 0.13.1",
"biscuit",
"bstr",
"bytes",
"chrono",
@@ -3617,7 +3637,7 @@ version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.9",
"getrandom 0.2.11",
]
[[package]]
@@ -3658,7 +3678,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4954fbc00dcd4d8282c987710e50ba513d351400dbdd00e803a05172a90d8976"
dependencies = [
"pem 2.0.1",
"ring",
"ring 0.16.20",
"time",
"yasna",
]
@@ -3828,7 +3848,7 @@ dependencies = [
"async-trait",
"chrono",
"futures",
"getrandom 0.2.9",
"getrandom 0.2.11",
"http",
"hyper",
"parking_lot 0.11.2",
@@ -3849,7 +3869,7 @@ checksum = "1b97ad83c2fc18113346b7158d79732242002427c30f620fa817c1f32901e0a8"
dependencies = [
"anyhow",
"async-trait",
"getrandom 0.2.9",
"getrandom 0.2.11",
"matchit",
"opentelemetry",
"reqwest",
@@ -3880,11 +3900,25 @@ dependencies = [
"libc",
"once_cell",
"spin 0.5.2",
"untrusted",
"untrusted 0.7.1",
"web-sys",
"winapi",
]
[[package]]
name = "ring"
version = "0.17.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb0205304757e5d899b9c2e448b867ffd03ae7f988002e47cd24954391394d0b"
dependencies = [
"cc",
"getrandom 0.2.11",
"libc",
"spin 0.9.8",
"untrusted 0.9.0",
"windows-sys 0.48.0",
]
[[package]]
name = "routerify"
version = "3.0.0"
@@ -4001,7 +4035,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb"
dependencies = [
"log",
"ring",
"ring 0.16.20",
"rustls-webpki 0.101.4",
"sct",
]
@@ -4033,8 +4067,8 @@ version = "0.100.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e98ff011474fa39949b7e5c0428f9b4937eda7da7848bbb947786b7be0b27dab"
dependencies = [
"ring",
"untrusted",
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
@@ -4043,8 +4077,8 @@ version = "0.101.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d93931baf2d282fff8d3a532bbfd7653f734643161b87e3e01e59a04439bf0d"
dependencies = [
"ring",
"untrusted",
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
@@ -4189,8 +4223,8 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
@@ -4309,7 +4343,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99dc599bd6646884fc403d593cdcb9816dd67c50cff3271c01ff123617908dcd"
dependencies = [
"debugid",
"getrandom 0.2.9",
"getrandom 0.2.11",
"hex",
"serde",
"serde_json",
@@ -4355,6 +4389,7 @@ version = "1.0.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1"
dependencies = [
"indexmap",
"itoa",
"ryu",
"serde",
@@ -4958,7 +4993,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd5831152cb0d3f79ef5523b357319ba154795d64c7078b2daa95a803b54057f"
dependencies = [
"futures",
"ring",
"ring 0.16.20",
"rustls",
"tokio",
"tokio-postgres",
@@ -5414,6 +5449,12 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "untrusted"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "ureq"
version = "2.7.1"
@@ -5515,7 +5556,7 @@ version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2"
dependencies = [
"getrandom 0.2.9",
"getrandom 0.2.11",
"serde",
]
@@ -6008,7 +6049,7 @@ dependencies = [
"regex",
"regex-syntax 0.7.2",
"reqwest",
"ring",
"ring 0.16.20",
"rustls",
"scopeguard",
"serde",

View File

@@ -18,6 +18,7 @@ use camino::Utf8PathBuf;
use pageserver_api::models::{
self, LocationConfig, TenantInfo, TenantLocationConfigRequest, TimelineInfo,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
@@ -408,7 +409,7 @@ impl PageServerNode {
};
let request = models::TenantCreateRequest {
new_tenant_id,
new_tenant_id: TenantShardId::unsharded(new_tenant_id),
generation,
config,
};

View File

@@ -17,5 +17,9 @@ postgres_ffi.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true
hex.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
bincode.workspace = true

View File

@@ -0,0 +1,142 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use serde::{Deserialize, Serialize};
use std::fmt;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store the tablespace id for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespaces (because of the lack of access to the local file system),
/// we can assume that only some predefined namespace OIDs are used, which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
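
The new `Key::to_i128`/`from_i128` pair packs the six fields into 4 + 16 + 32 + 32 + 8 + 32 = 124 bits of an i128, which is why `to_i128` masks field1 to its low 4 bits and asserts that field2 fits in 16 (modulo two sentinel values that get truncated). A hypothetical standalone round-trip check of that layout, with a plain tuple standing in for Key:

```rust
// Hypothetical re-derivation of the bit layout above; not part of the diff.
fn to_i128(f: (u8, u32, u32, u32, u8, u32)) -> i128 {
    (((f.0 & 0xf) as i128) << 120)          // field1: 4 bits
        | (((f.1 & 0xFFFF) as i128) << 104) // field2: 16 bits
        | ((f.2 as i128) << 72)             // field3: 32 bits
        | ((f.3 as i128) << 40)             // field4: 32 bits
        | ((f.4 as i128) << 32)             // field5: 8 bits
        | f.5 as i128                       // field6: 32 bits
}

fn from_i128(x: i128) -> (u8, u32, u32, u32, u8, u32) {
    (
        ((x >> 120) & 0xf) as u8,
        ((x >> 104) & 0xFFFF) as u32,
        (x >> 72) as u32, // `as u32` keeps only the low 32 bits
        (x >> 40) as u32,
        (x >> 32) as u8,
        x as u32,
    )
}

fn main() {
    // Round-trips only while field1 fits in 4 bits and field2 in 16 --
    // exactly what Key::to_i128 enforces for non-sentinel values.
    let key: (u8, u32, u32, u32, u8, u32) =
        (0x0f, 0x1234, 0xdead_beef, 0xcafe_babe, 0xab, 0x0123_4567);
    assert_eq!(from_i128(to_i128(key)), key);
}
```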

View File

@@ -4,8 +4,10 @@ use const_format::formatcp;
/// Public API types
pub mod control_api;
pub mod key;
pub mod models;
pub mod reltag;
pub mod shard;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

View File

@@ -16,7 +16,7 @@ use utils::{
lsn::Lsn,
};
use crate::reltag::RelTag;
use crate::{reltag::RelTag, shard::TenantShardId};
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
@@ -187,7 +187,7 @@ pub struct TimelineCreateRequest {
#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
pub struct TenantCreateRequest {
pub new_tenant_id: TenantId,
pub new_tenant_id: TenantShardId,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,

View File

@@ -0,0 +1,321 @@
use std::{ops::RangeInclusive, str::FromStr};
use hex::FromHex;
use serde::{Deserialize, Serialize};
use utils::id::TenantId;
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardCount(pub u8);
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
}
impl ShardNumber {
pub const MAX: Self = Self(u8::MAX);
}
/// TenantShardId identifies the units of work for the Pageserver.
///
/// These are written as `<tenant_id>-<shard_number><shard_count>`, for example:
///
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
///
/// Historically, tenants could not have multiple shards, and were identified
/// by TenantId. To support this, TenantShardId has a special legacy
/// mode where `shard_count` is equal to zero: this represents a single-sharded
/// tenant which should be written as a TenantId with no suffix.
///
/// The human-readable encoding of TenantShardId, such as used in API URLs,
/// is both forward and backward compatible: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
///
/// Note that the binary encoding is _not_ backward compatible, because
/// at the time sharding is introduced, there are no existing binary structures
/// containing TenantId that we need to handle.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl TenantShardId {
pub fn unsharded(tenant_id: TenantId) -> Self {
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
},
)
}
pub fn shard_slug(&self) -> String {
format!("{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(
f,
"{}-{:02x}{:02x}",
self.tenant_id, self.shard_number.0, self.shard_count.0
)
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single-shard case (shard_count == 1).
self.tenant_id.fmt(f)
}
}
}
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for TenantShardId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
if s.len() == 32 {
// Legacy case: no shard specified
Ok(Self {
tenant_id: TenantId::from_str(s)?,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
})
} else if s.len() == 37 {
let bytes = s.as_bytes();
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
Ok(Self {
tenant_id,
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 18]> for TenantShardId {
fn from(b: [u8; 18]) -> Self {
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
Self {
tenant_id: TenantId::from(tenant_id_bytes),
shard_number: ShardNumber(b[16]),
shard_count: ShardCount(b[17]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
let mut packed: [u8; 18] = [0; 18];
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
packed[16] = self.shard_number.0;
packed[17] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for TenantShardId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = TenantShardId;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 18])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 18] = Deserialize::deserialize(s)?;
Ok(TenantShardId::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
TenantShardId::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
18,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use bincode;
use utils::{id::TenantId, Hex};
use super::*;
const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";
#[test]
fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
let example = TenantShardId {
tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
shard_count: ShardCount(10),
shard_number: ShardNumber(7),
};
let encoded = format!("{example}");
let expected = format!("{EXAMPLE_TENANT_ID}-070a");
assert_eq!(&encoded, &expected);
let decoded = TenantShardId::from_str(&encoded)?;
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
let example = TenantShardId {
tenant_id: TenantId::from_str(EXAMPLE_TENANT_ID).unwrap(),
shard_count: ShardCount(10),
shard_number: ShardNumber(7),
};
let encoded = bincode::serialize(&example).unwrap();
let expected: [u8; 18] = [
0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
0xf6, 0xfc, 0x07, 0x0a,
];
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
// Test that TenantShardId can decode a TenantId in human
// readable form
let example = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
let encoded = format!("{example}");
assert_eq!(&encoded, EXAMPLE_TENANT_ID);
let decoded = TenantShardId::from_str(&encoded)?;
assert_eq!(example, decoded.tenant_id);
assert_eq!(decoded.shard_count, ShardCount(0));
assert_eq!(decoded.shard_number, ShardNumber(0));
Ok(())
}
#[test]
fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
// Test that a legacy TenantShardId encodes into a form that
// can be decoded as TenantId
let example_tenant_id = TenantId::from_str(EXAMPLE_TENANT_ID).unwrap();
let example = TenantShardId::unsharded(example_tenant_id);
let encoded = format!("{example}");
assert_eq!(&encoded, EXAMPLE_TENANT_ID);
let decoded = TenantId::from_str(&encoded)?;
assert_eq!(example_tenant_id, decoded);
Ok(())
}
#[test]
fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
// Unlike in human readable encoding, binary encoding does not
// do any special handling of legacy unsharded TenantIds: this test
// is equivalent to the main test for binary encoding, just verifying
// that the same behavior applies when we have used `unsharded()` to
// construct a TenantShardId.
let example = TenantShardId::unsharded(TenantId::from_str(EXAMPLE_TENANT_ID).unwrap());
let encoded = bincode::serialize(&example).unwrap();
let expected: [u8; 18] = [
0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
0xf6, 0xfc, 0x00, 0x00,
];
assert_eq!(Hex(&encoded), Hex(&expected));
let decoded = bincode::deserialize::<TenantShardId>(&encoded).unwrap();
assert_eq!(example, decoded);
Ok(())
}
}
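
The Serialize/Deserialize impls above branch on serde's is_human_readable() flag: text formats such as JSON get the compact string form (the one that stays compatible with bare TenantId strings), while binary formats such as bincode get the fixed 18-byte array the tests check. A toy sketch of the same pattern (hypothetical Id type; assumes serde, serde_json and bincode 1.x as dependencies):

```rust
use serde::{Serialize, Serializer};

// One logical type, two wire forms, chosen by the serializer.
struct Id([u8; 2]);

impl Serialize for Id {
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        if s.is_human_readable() {
            // JSON and friends: compact hex string, like "070a".
            s.collect_str(&format!("{:02x}{:02x}", self.0[0], self.0[1]))
        } else {
            // bincode and friends: the raw fixed-size bytes.
            self.0.serialize(s)
        }
    }
}

fn main() {
    let id = Id([0x07, 0x0a]);
    assert_eq!(serde_json::to_string(&id).unwrap(), "\"070a\"");
    assert_eq!(bincode::serialize(&id).unwrap(), vec![0x07, 0x0a]);
}
```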

View File

@@ -16,6 +16,7 @@ use pageserver_api::models::{
DownloadRemoteLayersTaskSpawnRequest, LocationConfigMode, TenantAttachRequest,
TenantLoadRequest, TenantLocationConfigRequest,
};
use pageserver_api::shard::TenantShardId;
use remote_storage::GenericRemoteStorage;
use tenant_size_model::{SizeResult, StorageModel};
use tokio_util::sync::CancellationToken;
@@ -419,9 +420,9 @@ async fn timeline_create_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let request_data: TimelineCreateRequest = json_request(&mut request).await?;
check_permission(&request, Some(tenant_id))?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let new_timeline_id = request_data.new_timeline_id;
@@ -430,7 +431,7 @@ async fn timeline_create_handler(
let state = get_state(&request);
async {
let tenant = mgr::get_tenant(tenant_id, true)?;
let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -464,7 +465,10 @@ async fn timeline_create_handler(
Err(tenant::CreateTimelineError::Other(err)) => Err(ApiError::InternalServerError(err)),
}
}
.instrument(info_span!("timeline_create", %tenant_id, timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.instrument(info_span!("timeline_create",
tenant_id = %tenant_shard_id.tenant_id,
shard = %tenant_shard_id.shard_slug(),
timeline_id = %new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await
}
@@ -660,14 +664,15 @@ async fn timeline_delete_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
mgr::delete_timeline(tenant_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", %tenant_id, %timeline_id))
state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx)
.instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
.await?;
json_response(StatusCode::ACCEPTED, ())
@@ -681,11 +686,14 @@ async fn tenant_detach_handler(
check_permission(&request, Some(tenant_id))?;
let detach_ignored: Option<bool> = parse_query_param(&request, "detach_ignored")?;
// This is a legacy API (`/location_conf` is the replacement). It only supports unsharded tenants
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let state = get_state(&request);
let conf = state.conf;
mgr::detach_tenant(
conf,
tenant_id,
tenant_shard_id,
detach_ignored.unwrap_or(false),
&state.deletion_queue_client,
)
@@ -802,13 +810,16 @@ async fn tenant_delete_handler(
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
// TODO openapi spec
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_id)
.instrument(info_span!("tenant_delete_handler", %tenant_id))
mgr::delete_tenant(state.conf, state.remote_storage.clone(), tenant_shard_id)
.instrument(info_span!("tenant_delete_handler",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
))
.await?;
json_response(StatusCode::ACCEPTED, ())
@@ -1138,9 +1149,10 @@ async fn put_tenant_location_config_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let state = get_state(&request);
@@ -1149,9 +1161,13 @@ async fn put_tenant_location_config_handler(
// The `Detached` state is special, it doesn't upsert a tenant, it removes
// its local disk content and drops it from memory.
if let LocationConfigMode::Detached = request_data.config.mode {
if let Err(e) = mgr::detach_tenant(conf, tenant_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach", %tenant_id))
.await
if let Err(e) =
mgr::detach_tenant(conf, tenant_shard_id, true, &state.deletion_queue_client)
.instrument(info_span!("tenant_detach",
tenant_id = %tenant_shard_id.tenant_id,
shard = tenant_shard_id.shard_slug()
))
.await
{
match e {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
@@ -1168,7 +1184,7 @@ async fn put_tenant_location_config_handler(
state
.tenant_manager
.upsert_location(tenant_id, location_conf, &ctx)
.upsert_location(tenant_shard_id, location_conf, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
@@ -1752,7 +1768,7 @@ pub fn make_router(
.get("/v1/tenant", |r| api_handler(r, tenant_list_handler))
.post("/v1/tenant", |r| api_handler(r, tenant_create_handler))
.get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status))
.delete("/v1/tenant/:tenant_id", |r| {
.delete("/v1/tenant/:tenant_shard_id", |r| {
api_handler(r, tenant_delete_handler)
})
.get("/v1/tenant/:tenant_id/synthetic_size", |r| {
@@ -1764,13 +1780,13 @@ pub fn make_router(
.get("/v1/tenant/:tenant_id/config", |r| {
api_handler(r, get_tenant_config_handler)
})
.put("/v1/tenant/:tenant_id/location_config", |r| {
.put("/v1/tenant/:tenant_shard_id/location_config", |r| {
api_handler(r, put_tenant_location_config_handler)
})
.get("/v1/tenant/:tenant_id/timeline", |r| {
api_handler(r, timeline_list_handler)
})
.post("/v1/tenant/:tenant_id/timeline", |r| {
.post("/v1/tenant/:tenant_shard_id/timeline", |r| {
api_handler(r, timeline_create_handler)
})
.post("/v1/tenant/:tenant_id/attach", |r| {
@@ -1814,7 +1830,7 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers",
|r| api_handler(r, timeline_download_remote_layers_handler_get),
)
.delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
.delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| {
api_handler(r, timeline_delete_handler)
})
.get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| {

View File

@@ -1,106 +1,11 @@
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{AddAssign, Range};
use std::time::Duration;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store the tablespace id for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespaces (because of the lack of access to the local file system),
/// we can assume that only some predefined namespace OIDs are used, which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
pub use pageserver_api::key::{Key, KEY_SIZE};
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
@@ -129,49 +34,6 @@ pub fn singleton_range(key: Key) -> Range<Key> {
key..key.next()
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
/// A 'value' stored for one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
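
This hunk is the other half of the move into pageserver_api: Key's definition leaves this module, and the `pub use` re-export keeps the old `crate::repository::Key` import path working for the rest of the pageserver. In miniature (hypothetical nested modules standing in for the two crates):

```rust
// The type now lives in the shared API "crate"...
mod pageserver_api {
    pub mod key {
        pub struct Key(pub u8);
        pub const KEY_SIZE: usize = 18;
    }
}

// ...and the old module keeps exporting it, so callers need no changes.
mod repository {
    pub use crate::pageserver_api::key::{Key, KEY_SIZE};
}

fn main() {
    let _k = repository::Key(1); // old path still compiles
    assert_eq!(repository::KEY_SIZE, 18);
}
```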

View File

@@ -2,9 +2,10 @@
//! page server.
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -30,6 +31,7 @@ use crate::metrics::TENANT_MANAGER as METRICS;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{AttachmentMode, LocationConf, LocationMode, TenantConfOpt};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::{create_tenant_files, AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
@@ -87,10 +89,37 @@ pub(crate) enum TenantsMap {
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_acquire_slot`].
Open(HashMap<TenantId, TenantSlot>),
Open(BTreeMap<TenantShardId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`shutdown_all_tenants`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(HashMap<TenantId, TenantSlot>),
ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
}
/// Helper for mapping shard-unaware functions to a sharding-aware map
/// TODO(sharding): all users of this must be made shard-aware.
fn exactly_one_or_none<'a>(
map: &'a BTreeMap<TenantShardId, TenantSlot>,
tenant_id: &TenantId,
) -> Option<(&'a TenantShardId, &'a TenantSlot)> {
let mut slots = map.range(TenantShardId::tenant_range(*tenant_id));
// Retrieve the first two slots in the range: if both are populated, we must panic because the caller
// needs a shard-naive view of the world in which only one slot can exist for a TenantId at a time.
let slot_a = slots.next();
let slot_b = slots.next();
match (slot_a, slot_b) {
(None, None) => None,
(Some(slot), None) => {
// Exactly one matching slot
Some(slot)
}
(Some(_slot_a), Some(_slot_b)) => {
// Multiple shards for this tenant: cannot handle this yet.
// TODO(sharding): callers of get() should be shard-aware.
todo!("Attaching multiple shards in teh same tenant to the same pageserver")
}
(None, Some(_)) => unreachable!(),
}
}
impl TenantsMap {
@@ -101,7 +130,8 @@ impl TenantsMap {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
m.get(tenant_id).and_then(TenantSlot::get_attached)
// TODO(sharding): callers of get() should be shard-aware.
exactly_one_or_none(m, tenant_id).and_then(|(_, slot)| slot.get_attached())
}
}
}
@@ -109,7 +139,10 @@ impl TenantsMap {
pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> Option<TenantSlot> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.remove(tenant_id),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
let key = exactly_one_or_none(m, tenant_id).map(|(k, _)| *k);
key.and_then(|key| m.remove(&key))
}
}
}
@@ -383,7 +416,7 @@ pub async fn init_tenant_mgr(
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<TenantManager> {
let mut tenants = HashMap::new();
let mut tenants = BTreeMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
@@ -404,7 +437,7 @@ pub async fn init_tenant_mgr(
warn!(%tenant_id, "Marking tenant broken, failed to {e:#}");
tenants.insert(
tenant_id,
TenantShardId::unsharded(tenant_id),
TenantSlot::Attached(Tenant::create_broken_tenant(
conf,
tenant_id,
@@ -427,7 +460,7 @@ pub async fn init_tenant_mgr(
// tenants, because they do no remote writes and hence require no
// generation number
info!(%tenant_id, "Loaded tenant in secondary mode");
tenants.insert(tenant_id, TenantSlot::Secondary);
tenants.insert(TenantShardId::unsharded(tenant_id), TenantSlot::Secondary);
}
LocationMode::Attached(_) => {
// TODO: augment re-attach API to enable the control plane to
@@ -470,7 +503,10 @@ pub async fn init_tenant_mgr(
&ctx,
) {
Ok(tenant) => {
tenants.insert(tenant.tenant_id(), TenantSlot::Attached(tenant));
tenants.insert(
TenantShardId::unsharded(tenant.tenant_id()),
TenantSlot::Attached(tenant),
);
}
Err(e) => {
error!(%tenant_id, "Failed to start tenant: {e:#}");
@@ -573,19 +609,19 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
let mut m = tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(HashMap::default());
*m = TenantsMap::ShuttingDown(BTreeMap::default());
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
let mut shutdown_state = HashMap::new();
let mut shutdown_state = BTreeMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
for (tenant_id, v) in tenants.drain() {
for (tenant_shard_id, v) in std::mem::take(tenants).into_iter() {
match v {
TenantSlot::Attached(t) => {
shutdown_state.insert(tenant_id, TenantSlot::Attached(t.clone()));
shutdown_state.insert(tenant_shard_id, TenantSlot::Attached(t.clone()));
join_set.spawn(
async move {
let freeze_and_flush = true;
@@ -604,13 +640,13 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
// going to log too many lines
debug!("tenant successfully stopped");
}
.instrument(info_span!("shutdown", %tenant_id)),
.instrument(info_span!("shutdown", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug())),
);
total_attached += 1;
}
TenantSlot::Secondary => {
shutdown_state.insert(tenant_id, TenantSlot::Secondary);
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary);
}
TenantSlot::InProgress(notify) => {
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
@@ -690,19 +726,22 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
pub(crate) async fn create_tenant(
conf: &'static PageServerConf,
tenant_conf: TenantConfOpt,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
generation: Generation,
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, TenantMapInsertError> {
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_id).await?;
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
// TODO(sharding): make local paths shard-aware
let tenant_path =
super::create_tenant_files(conf, &location_conf, &tenant_shard_id.tenant_id).await?;
let created_tenant = tenant_spawn(
conf,
tenant_id,
tenant_shard_id.tenant_id,
&tenant_path,
resources,
AttachedTenantConf::try_from(location_conf)?,
@@ -715,11 +754,7 @@ pub(crate) async fn create_tenant(
// See https://github.com/neondatabase/neon/issues/4233
let created_tenant_id = created_tenant.tenant_id();
if tenant_id != created_tenant_id {
return Err(TenantMapInsertError::Other(anyhow::anyhow!(
"loaded created tenant has unexpected tenant id (expect {tenant_id} != actual {created_tenant_id})",
)));
}
debug_assert_eq!(created_tenant_id, tenant_shard_id.tenant_id);
slot_guard.upsert(TenantSlot::Attached(created_tenant.clone()))?;
@@ -755,21 +790,70 @@ pub(crate) async fn set_new_tenant_config(
}
impl TenantManager {
#[instrument(skip_all, fields(%tenant_id))]
/// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or otherwise does not match the query.
/// `active_only = true` restricts the query to tenants that are ready for operations, erroring on other kinds of tenants.
///
/// This method is cancel-safe.
pub(crate) fn get_attached_tenant_shard(
&self,
tenant_shard_id: TenantShardId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
TenantState::Broken {
reason,
backtrace: _,
} if active_only => Err(GetTenantError::Broken(reason)),
TenantState::Active => Ok(Arc::clone(tenant)),
_ => {
if active_only {
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
} else {
Ok(Arc::clone(tenant))
}
}
},
Some(TenantSlot::InProgress(_)) => {
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
}
None | Some(TenantSlot::Secondary) => {
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
}
}
}
pub(crate) async fn delete_timeline(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?;
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
Ok(())
}
pub(crate) async fn upsert_location(
&self,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
new_location_config: LocationConf,
ctx: &RequestContext,
) -> Result<(), anyhow::Error> {
info!("configuring tenant location {tenant_id} to state {new_location_config:?}");
debug_assert_current_span_has_tenant_id();
info!("configuring tenant location to state {new_location_config:?}");
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
// then we do not need to set the slot to InProgress, we can just call into the
// existing tenant.
{
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Write)?;
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
match (&new_location_config.mode, peek_slot) {
(LocationMode::Attached(attach_conf), Some(TenantSlot::Attached(tenant))) => {
if attach_conf.generation == tenant.generation {
@@ -800,7 +884,7 @@ impl TenantManager {
// the tenant is inaccessible to the outside world while we are doing this, but that is sensible:
// the state is ill-defined while we're in transition. Transitions are async, but fast: we do
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::Any)?;
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
// The case where we keep a Tenant alive was covered above in the special case
@@ -831,25 +915,31 @@ impl TenantManager {
slot_guard.drop_old_value().expect("We just shut it down");
}
let tenant_path = self.conf.tenant_path(&tenant_id);
// TODO(sharding): make local paths sharding-aware
let tenant_path = self.conf.tenant_path(&tenant_shard_id.tenant_id);
let new_slot = match &new_location_config.mode {
LocationMode::Secondary(_) => {
let tenant_path = self.conf.tenant_path(&tenant_id);
// Directory doesn't need to be fsync'd because if we crash it can
// safely be recreated next time this tenant location is configured.
unsafe_create_dir_all(&tenant_path)
.await
.with_context(|| format!("Creating {tenant_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
// TODO(sharding): make local paths sharding-aware
Tenant::persist_tenant_config(
self.conf,
&tenant_shard_id.tenant_id,
&new_location_config,
)
.await
.map_err(SetNewTenantConfigError::Persist)?;
TenantSlot::Secondary
}
LocationMode::Attached(_attach_config) => {
let timelines_path = self.conf.timelines_path(&tenant_id);
// TODO(sharding): make local paths sharding-aware
let timelines_path = self.conf.timelines_path(&tenant_shard_id.tenant_id);
// Directory doesn't need to be fsync'd because we do not depend on
// it to exist after crashes: it may be recreated when tenant is
@@ -858,13 +948,19 @@ impl TenantManager {
.await
.with_context(|| format!("Creating {timelines_path}"))?;
Tenant::persist_tenant_config(self.conf, &tenant_id, &new_location_config)
.await
.map_err(SetNewTenantConfigError::Persist)?;
// TODO(sharding): make local paths sharding-aware
Tenant::persist_tenant_config(
self.conf,
&tenant_shard_id.tenant_id,
&new_location_config,
)
.await
.map_err(SetNewTenantConfigError::Persist)?;
// TODO(sharding): make spawn sharding-aware
let tenant = tenant_spawn(
self.conf,
tenant_id,
tenant_shard_id.tenant_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(new_location_config)?,
@@ -910,7 +1006,11 @@ pub(crate) fn get_tenant(
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)?;
// TODO(sharding): make all callers of get_tenant shard-aware
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
@@ -970,12 +1070,16 @@ pub(crate) async fn get_active_tenant_with_timeout(
Tenant(Arc<Tenant>),
}
// TODO(sharding): make page service interface sharding-aware (page service should apply ShardIdentity to the key
// to decide which shard services the request)
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let wait_start = Instant::now();
let deadline = wait_start + timeout;
let wait_for = {
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => {
@@ -1019,8 +1123,9 @@ pub(crate) async fn get_active_tenant_with_timeout(
})?;
{
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
let peek_slot =
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
.map_err(GetTenantError::MapState)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => tenant.clone(),
_ => {
@@ -1062,7 +1167,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
pub(crate) async fn delete_tenant(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
) -> Result<(), DeleteTenantError> {
// We acquire a SlotGuard during this function to protect against concurrent
// changes while the ::prepare phase of DeleteTenantFlow executes, but then
@@ -1075,7 +1180,9 @@ pub(crate) async fn delete_tenant(
//
// See https://github.com/neondatabase/neon/issues/5080
let mut slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustExist)?;
// TODO(sharding): make delete API sharding-aware
let mut slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustExist)?;
// unwrap is safe because we used MustExist mode when acquiring
let tenant = match slot_guard.get_old_value().as_ref().unwrap() {
@@ -1102,16 +1209,6 @@ pub(crate) enum DeleteTimelineError {
Timeline(#[from] crate::tenant::DeleteTimelineError),
}
pub(crate) async fn delete_timeline(
tenant_id: TenantId,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
let tenant = get_tenant(tenant_id, true)?;
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantStateError {
#[error("Tenant {0} is stopping")]
@@ -1126,14 +1223,14 @@ pub(crate) enum TenantStateError {
pub(crate) async fn detach_tenant(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<(), TenantStateError> {
let tmp_path = detach_tenant0(
conf,
&TENANTS,
tenant_id,
tenant_shard_id,
detach_ignored,
deletion_queue_client,
)
@@ -1160,19 +1257,24 @@ pub(crate) async fn detach_tenant(
async fn detach_tenant0(
conf: &'static PageServerConf,
tenants: &std::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
detach_ignored: bool,
deletion_queue_client: &DeletionQueueClient,
) -> Result<Utf8PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
let tenant_dir_rename_operation = |tenant_id_to_clean: TenantShardId| async move {
// TODO(sharding): make local path helpers shard-aware
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean.tenant_id);
safe_rename_tenant_dir(&local_tenant_directory)
.await
.with_context(|| format!("local tenant directory {local_tenant_directory:?} rename"))
};
let removal_result =
remove_tenant_from_memory(tenants, tenant_id, tenant_dir_rename_operation(tenant_id)).await;
let removal_result = remove_tenant_from_memory(
tenants,
tenant_shard_id,
tenant_dir_rename_operation(tenant_shard_id),
)
.await;
// Flush pending deletions, so that they have a good chance of passing validation
// before this tenant is potentially re-attached elsewhere.
@@ -1186,12 +1288,15 @@ async fn detach_tenant0(
Err(TenantStateError::SlotError(TenantSlotError::NotFound(_)))
)
{
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
// TODO(sharding): make local paths sharding-aware
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_shard_id.tenant_id);
if tenant_ignore_mark.exists() {
info!("Detaching an ignored tenant");
let tmp_path = tenant_dir_rename_operation(tenant_id)
let tmp_path = tenant_dir_rename_operation(tenant_shard_id)
.await
.with_context(|| format!("Ignored tenant {tenant_id} local directory rename"))?;
.with_context(|| {
format!("Ignored tenant {tenant_shard_id} local directory rename")
})?;
return Ok(tmp_path);
}
}
@@ -1208,7 +1313,11 @@ pub(crate) async fn load_tenant(
deletion_queue_client: DeletionQueueClient,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
let tenant_path = conf.tenant_path(&tenant_id);
let tenant_ignore_mark = conf.tenant_ignore_mark_file_path(&tenant_id);
@@ -1261,7 +1370,10 @@ async fn ignore_tenant0(
tenants: &std::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(), TenantStateError> {
remove_tenant_from_memory(tenants, tenant_id, async {
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
remove_tenant_from_memory(tenants, tenant_shard_id, async {
let ignore_mark_file = conf.tenant_ignore_mark_file_path(&tenant_id);
fs::File::create(&ignore_mark_file)
.await
@@ -1270,7 +1382,7 @@ async fn ignore_tenant0(
crashsafe::fsync_file_and_parent(&ignore_mark_file)
.context("Failed to fsync ignore mark file")
})
.with_context(|| format!("Failed to crate ignore mark for tenant {tenant_id}"))?;
.with_context(|| format!("Failed to crate ignore mark for tenant {tenant_shard_id}"))?;
Ok(())
})
.await
@@ -1293,10 +1405,12 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})
// TODO(sharding): make callers of this function shard-aware
.map(|(k, v)| (k.tenant_id, v))
.collect())
}
@@ -1312,7 +1426,11 @@ pub(crate) async fn attach_tenant(
resources: TenantSharedResources,
ctx: &RequestContext,
) -> Result<(), TenantMapInsertError> {
let slot_guard = tenant_map_acquire_slot(&tenant_id, TenantSlotAcquireMode::MustNotExist)?;
// This is a legacy API (replaced by `/location_conf`). It does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let slot_guard =
tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
let location_conf = LocationConf::attached_single(tenant_conf, generation);
let tenant_dir = create_tenant_files(conf, &location_conf, &tenant_id).await?;
// TODO: tenant directory remains on disk if we bail out from here on.
@@ -1359,14 +1477,14 @@ pub(crate) enum TenantMapInsertError {
pub enum TenantSlotError {
/// When acquiring a slot with the expectation that the tenant already exists.
#[error("Tenant {0} not found")]
NotFound(TenantId),
NotFound(TenantShardId),
/// When acquiring a slot with the expectation that the tenant does not already exist.
#[error("tenant {0} already exists, state: {1:?}")]
AlreadyExists(TenantId, TenantState),
AlreadyExists(TenantShardId, TenantState),
#[error("tenant {0} already exists in but is not attached")]
Conflict(TenantId),
Conflict(TenantShardId),
// Tried to read a slot that is currently being mutated by another administrative
// operation.
@@ -1428,7 +1546,7 @@ pub enum TenantMapError {
/// `drop_old_value`. It is an error to call this without shutting down
/// the contents of `old_value`.
pub struct SlotGuard {
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
upserted: bool,
@@ -1439,12 +1557,12 @@ pub struct SlotGuard {
impl SlotGuard {
fn new(
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
old_value: Option<TenantSlot>,
completion: utils::completion::Completion,
) -> Self {
Self {
tenant_id,
tenant_shard_id,
old_value,
upserted: false,
_completion: completion,
@@ -1487,7 +1605,7 @@ impl SlotGuard {
TenantsMap::Open(m) => m,
};
let replaced = m.insert(self.tenant_id, new_value);
let replaced = m.insert(self.tenant_shard_id, new_value);
self.upserted = true;
METRICS.tenant_slots.set(m.len() as u64);
@@ -1506,7 +1624,7 @@ impl SlotGuard {
None => {
METRICS.unexpected_errors.inc();
error!(
tenant_id = %self.tenant_id,
tenant_shard_id = %self.tenant_shard_id,
"Missing InProgress marker during tenant upsert, this is a bug."
);
Err(TenantSlotUpsertError::InternalError(
@@ -1515,7 +1633,7 @@ impl SlotGuard {
}
Some(slot) => {
METRICS.unexpected_errors.inc();
error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during upsert, this is a bug. Contents: {:?}", slot);
Err(TenantSlotUpsertError::InternalError(
"Unexpected contents of TenantSlot".into(),
))
@@ -1593,12 +1711,12 @@ impl Drop for SlotGuard {
TenantsMap::Open(m) => m,
};
use std::collections::hash_map::Entry;
match m.entry(self.tenant_id) {
use std::collections::btree_map::Entry;
match m.entry(self.tenant_shard_id) {
Entry::Occupied(mut entry) => {
if !matches!(entry.get(), TenantSlot::InProgress(_)) {
METRICS.unexpected_errors.inc();
error!(tenant_id=%self.tenant_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
error!(tenant_shard_id=%self.tenant_shard_id, "Unexpected contents of TenantSlot during drop, this is a bug. Contents: {:?}", entry.get());
}
if self.old_value_is_shutdown() {
@@ -1610,7 +1728,7 @@ impl Drop for SlotGuard {
Entry::Vacant(_) => {
METRICS.unexpected_errors.inc();
error!(
tenant_id = %self.tenant_id,
tenant_shard_id = %self.tenant_shard_id,
"Missing InProgress marker during SlotGuard drop, this is a bug."
);
}
@@ -1629,7 +1747,7 @@ enum TenantSlotPeekMode {
fn tenant_map_peek_slot<'a>(
tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
tenant_id: &TenantId,
tenant_shard_id: &TenantShardId,
mode: TenantSlotPeekMode,
) -> Result<Option<&'a TenantSlot>, TenantMapError> {
let m = match tenants.deref() {
@@ -1643,7 +1761,7 @@ fn tenant_map_peek_slot<'a>(
TenantsMap::Open(m) => m,
};
Ok(m.get(tenant_id))
Ok(m.get(tenant_shard_id))
}
enum TenantSlotAcquireMode {
@@ -1656,14 +1774,14 @@ enum TenantSlotAcquireMode {
}
fn tenant_map_acquire_slot(
tenant_id: &TenantId,
tenant_shard_id: &TenantShardId,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
tenant_map_acquire_slot_impl(tenant_id, &TENANTS, mode)
tenant_map_acquire_slot_impl(tenant_shard_id, &TENANTS, mode)
}
fn tenant_map_acquire_slot_impl(
tenant_id: &TenantId,
tenant_shard_id: &TenantShardId,
tenants: &std::sync::RwLock<TenantsMap>,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
@@ -1671,7 +1789,7 @@ fn tenant_map_acquire_slot_impl(
METRICS.tenant_slot_writes.inc();
let mut locked = tenants.write().unwrap();
let span = tracing::info_span!("acquire_slot", %tenant_id);
let span = tracing::info_span!("acquire_slot", tenant_id=%tenant_shard_id.tenant_id, shard=tenant_shard_id.shard_slug());
let _guard = span.enter();
let m = match &mut *locked {
@@ -1680,19 +1798,21 @@ fn tenant_map_acquire_slot_impl(
TenantsMap::Open(m) => m,
};
use std::collections::hash_map::Entry;
let entry = m.entry(*tenant_id);
use std::collections::btree_map::Entry;
let entry = m.entry(*tenant_shard_id);
match entry {
Entry::Vacant(v) => match mode {
MustExist => {
tracing::debug!("Vacant && MustExist: return NotFound");
Err(TenantSlotError::NotFound(*tenant_id))
Err(TenantSlotError::NotFound(*tenant_shard_id))
}
_ => {
let (completion, barrier) = utils::completion::channel();
v.insert(TenantSlot::InProgress(barrier));
tracing::debug!("Vacant, inserted InProgress");
Ok(SlotGuard::new(*tenant_id, None, completion))
Ok(SlotGuard::new(*tenant_shard_id, None, completion))
}
},
Entry::Occupied(mut o) => {
@@ -1706,7 +1826,7 @@ fn tenant_map_acquire_slot_impl(
TenantSlot::Attached(tenant) => {
tracing::debug!("Attached && MustNotExist, return AlreadyExists");
Err(TenantSlotError::AlreadyExists(
*tenant_id,
*tenant_shard_id,
tenant.current_state(),
))
}
@@ -1715,7 +1835,7 @@ fn tenant_map_acquire_slot_impl(
// to get the state from
tracing::debug!("Occupied & MustNotExist, return AlreadyExists");
Err(TenantSlotError::AlreadyExists(
*tenant_id,
*tenant_shard_id,
TenantState::Broken {
reason: "Present but not attached".to_string(),
backtrace: "".to_string(),
@@ -1728,7 +1848,11 @@ fn tenant_map_acquire_slot_impl(
let (completion, barrier) = utils::completion::channel();
let old_value = o.insert(TenantSlot::InProgress(barrier));
tracing::debug!("Occupied, replaced with InProgress");
Ok(SlotGuard::new(*tenant_id, Some(old_value), completion))
Ok(SlotGuard::new(
*tenant_shard_id,
Some(old_value),
completion,
))
}
}
}
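
The mode handling above reduces to a small state machine over the map entry. A stripped-down sketch of the same acquire logic, simplified to two modes and a toy key type (not the real `TenantSlot`):

use std::collections::{btree_map::Entry, BTreeMap};

enum Slot {
    Attached,
    InProgress, // a SlotGuard-style holder is mutating this entry
}

enum AcquireError {
    NotFound,
    AlreadyExists,
}

/// Vacant + MustExist fails; Occupied + MustNotExist fails; otherwise the
/// entry is flipped to InProgress and any old value is handed back to the
/// caller, which must later upsert a new value or revert.
fn acquire(
    map: &mut BTreeMap<u32, Slot>,
    key: u32,
    must_exist: bool,
) -> Result<Option<Slot>, AcquireError> {
    match map.entry(key) {
        Entry::Vacant(_) if must_exist => Err(AcquireError::NotFound),
        Entry::Vacant(v) => {
            v.insert(Slot::InProgress);
            Ok(None)
        }
        Entry::Occupied(_) if !must_exist => Err(AcquireError::AlreadyExists),
        Entry::Occupied(mut o) => Ok(Some(o.insert(Slot::InProgress))),
    }
}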
@@ -1741,7 +1865,7 @@ fn tenant_map_acquire_slot_impl(
/// operation would be needed to remove it.
async fn remove_tenant_from_memory<V, F>(
tenants: &std::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
tenant_shard_id: TenantShardId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
where
@@ -1750,7 +1874,7 @@ where
use utils::completion;
let mut slot_guard =
tenant_map_acquire_slot_impl(&tenant_id, tenants, TenantSlotAcquireMode::MustExist)?;
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
// The SlotGuard allows us to manipulate the Tenant object without fear of some
// concurrent API request doing something else for the same tenant ID.
@@ -1777,7 +1901,7 @@ where
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
// wait for it but return an error right away because these are distinct requests.
slot_guard.revert();
return Err(TenantStateError::IsStopping(tenant_id));
return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id));
}
}
}
@@ -1788,7 +1912,7 @@ where
match tenant_cleanup
.await
.with_context(|| format!("Failed to run cleanup for tenant {tenant_id}"))
.with_context(|| format!("Failed to run cleanup for tenant {tenant_shard_id}"))
{
Ok(hook_value) => {
// Success: drop the old TenantSlot::Attached.
@@ -1867,7 +1991,8 @@ pub(crate) async fn immediate_gc(
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use pageserver_api::shard::TenantShardId;
use std::collections::BTreeMap;
use std::sync::Arc;
use tracing::{info_span, Instrument};
@@ -1887,12 +2012,12 @@ mod tests {
// harness loads it to active, which is forced and nothing is running on the tenant
let id = t.tenant_id();
let id = TenantShardId::unsharded(t.tenant_id());
// tenant harness configures the logging and we cannot escape it
let _e = info_span!("testing", tenant_id = %id).entered();
let tenants = HashMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
// Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
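
The `HashMap` → `BTreeMap` switch is what the new key type is for: if `TenantShardId` orders by tenant first (a property of the real type in `pageserver_api::shard`), all shards of one tenant form a contiguous key range. A minimal sketch with a simplified stand-in key:

use std::collections::BTreeMap;

// Simplified stand-in for TenantShardId: Ord derives lexicographically,
// so entries sort by tenant first, then by shard number.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct ShardKey {
    tenant_id: u128, // the real type wraps a 16-byte TenantId
    shard_number: u8,
}

/// Collect every slot belonging to `tenant_id` with one ordered range scan,
/// without knowing the shard count up front.
fn shards_of<V>(map: &BTreeMap<ShardKey, V>, tenant_id: u128) -> Vec<&V> {
    let lo = ShardKey { tenant_id, shard_number: 0 };
    let hi = ShardKey { tenant_id, shard_number: u8::MAX };
    map.range(lo..=hi).map(|(_, v)| v).collect()
}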


@@ -68,7 +68,7 @@ webpki-roots.workspace = true
x509-parser.workspace = true
native-tls.workspace = true
postgres-native-tls.workspace = true
biscuit = { version = "0.7", features = [] }
workspace_hack.workspace = true
tokio-util.workspace = true


@@ -3,8 +3,10 @@ mod hacks;
mod link;
pub use link::LinkAuthError;
use serde::{Deserialize, Serialize};
use tokio_postgres::config::AuthKeys;
use crate::console::provider::neon::UserRowLevel;
use crate::proxy::{handle_try_wake, retry_after, LatencyTimer};
use crate::{
auth::{self, ClientCredentials},
@@ -319,4 +321,41 @@ impl BackendType<'_, ClientCredentials<'_>> {
Test(x) => x.wake_compute().map(Some),
}
}
/// Get the password for the RLS user
pub async fn ensure_row_level(
&self,
extra: &ConsoleReqExtra<'_>,
dbname: String,
username: String,
policies: Vec<Policy>,
) -> anyhow::Result<UserRowLevel> {
use BackendType::*;
match self {
Console(api, creds) => {
api.ensure_row_level(extra, creds, dbname, username, policies)
.await
}
Postgres(api, creds) => {
api.ensure_row_level(extra, creds, dbname, username, policies)
.await
}
Link(_) => Err(anyhow::anyhow!("not on link")),
Test(_) => Err(anyhow::anyhow!("not on test")),
}
}
}
// TODO(conrad): policies can be quite complex. Figure out how to configure this
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Policy {
pub table: String,
pub column: String,
}
// enum PolicyType {
// ForSelect(),
// ForUpdate()
// }
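
A `Policy` of this shape maps naturally onto Postgres row-level security. As a hypothetical illustration only (the control plane owns the real SQL; names like `app.user_id` and the policy naming scheme are invented here):

// Hypothetical rendering of one Policy { table, column } as RLS statements.
fn policy_to_sql(table: &str, column: &str, role: &str) -> String {
    format!(
        "ALTER TABLE {table} ENABLE ROW LEVEL SECURITY;\n\
         CREATE POLICY {table}_{column}_rls ON {table} TO {role}\n\
         USING ({column} = current_setting('app.user_id'));"
    )
}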


@@ -1,9 +1,11 @@
pub mod mock;
pub mod neon;
use self::neon::UserRowLevel;
use super::messages::MetricsAuxInfo;
use crate::{
auth::ClientCredentials,
auth::{backend::Policy, ClientCredentials},
cache::{timed_lru, TimedLru},
compute, scram,
};
@@ -248,6 +250,16 @@ pub trait Api {
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
/// Get the password for the RLS user
async fn ensure_row_level(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
dbname: String,
username: String,
policies: Vec<Policy>,
) -> anyhow::Result<UserRowLevel>;
}
/// Various caches for [`console`](super).


@@ -2,9 +2,16 @@
use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
neon::UserRowLevel,
AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::{auth::ClientCredentials, compute, error::io_error, scram, url::ApiUrl};
use crate::{
auth::{backend::Policy, ClientCredentials},
compute,
error::io_error,
scram,
url::ApiUrl,
};
use async_trait::async_trait;
use futures::TryFutureExt;
use thiserror::Error;
@@ -121,6 +128,18 @@ impl super::Api for Api {
.map_ok(CachedNodeInfo::new_uncached)
.await
}
/// Get the password for the RLS user
async fn ensure_row_level(
&self,
_extra: &ConsoleReqExtra<'_>,
_creds: &ClientCredentials,
_dbname: String,
_username: String,
_policies: Vec<Policy>,
) -> anyhow::Result<UserRowLevel> {
Err(anyhow::anyhow!("unimplemented"))
}
}
fn parse_md5(input: &str) -> Option<[u8; 16]> {


@@ -5,9 +5,13 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, ApiLocks, AuthInfo, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::{auth::ClientCredentials, compute, http, scram};
use crate::{
auth::{backend::Policy, ClientCredentials},
compute, http, scram,
};
use async_trait::async_trait;
use futures::TryFutureExt;
use serde::{Deserialize, Serialize};
use std::{net::SocketAddr, sync::Arc};
use tokio::time::Instant;
use tokio_postgres::config::SslMode;
@@ -139,6 +143,84 @@ impl Api {
.instrument(info_span!("http", id = request_id))
.await
}
async fn do_ensure_row_level(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials<'_>,
dbname: String,
username: String,
policies: Vec<Policy>,
) -> anyhow::Result<UserRowLevel> {
let project = creds.project().expect("impossible");
let request_id = uuid::Uuid::new_v4().to_string();
async {
let request = self
.endpoint
.post("proxy_ensure_role_level_sec")
.header("X-Request-ID", &request_id)
.header("Authorization", format!("Bearer {}", &self.jwt))
.query(&[("session_id", extra.session_id)])
// .query(&[
// ("application_name", extra.application_name),
// ("project", Some(project)),
// ("dbname", Some(&dbname)),
// ("username", Some(&username)),
// ("options", extra.options),
// ])
.json(&EnsureRowLevelReq {
project: project.to_owned(),
targets: policies
.into_iter()
.map(|p| Target {
database_name: dbname.clone(),
table_name: p.table,
row_level_user_id: username.clone(),
role_name: "enduser".to_owned(),
column_name: p.column,
})
.collect(),
})
.build()?;
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let response = self.endpoint.execute(request).await?;
info!(duration = ?start.elapsed(), "received http response");
let mut body = parse_body::<UserRowLevel>(response).await?;
// hack
body.username = body.username.to_lowercase();
// info!(user = %body.username, pw=%body.password, "please don't merge this in production");
Ok(body)
}
.map_err(crate::error::log_error)
.instrument(info_span!("http", id = request_id))
.await
}
}
#[derive(Serialize)]
struct EnsureRowLevelReq {
project: String,
targets: Vec<Target>,
}
#[derive(Serialize)]
struct Target {
database_name: String,
table_name: String,
row_level_user_id: String,
role_name: String,
column_name: String,
}
#[derive(Deserialize)]
pub struct UserRowLevel {
pub username: String,
pub password: String,
}
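
For reference, the body posted to `proxy_ensure_role_level_sec` serializes as below; the field values here are purely illustrative:

// Illustrative only: the JSON shape produced by EnsureRowLevelReq.
let req = EnsureRowLevelReq {
    project: "my-project".to_owned(),
    targets: vec![Target {
        database_name: "neondb".to_owned(),
        table_name: "documents".to_owned(),
        row_level_user_id: "user_123".to_owned(),
        role_name: "enduser".to_owned(),
        column_name: "owner_id".to_owned(),
    }],
};
assert_eq!(
    serde_json::to_value(&req).unwrap(),
    serde_json::json!({
        "project": "my-project",
        "targets": [{
            "database_name": "neondb",
            "table_name": "documents",
            "row_level_user_id": "user_123",
            "role_name": "enduser",
            "column_name": "owner_id"
        }]
    })
);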
#[async_trait]
@@ -188,6 +270,20 @@ impl super::Api for Api {
Ok(cached)
}
/// Get the password for the RLS user
#[tracing::instrument(skip_all)]
async fn ensure_row_level(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ClientCredentials,
dbname: String,
username: String,
policies: Vec<Policy>,
) -> anyhow::Result<UserRowLevel> {
self.do_ensure_row_level(extra, creds, dbname, username, policies)
.await
}
}
/// Parse http response body, taking status code into account.


@@ -88,6 +88,14 @@ impl Endpoint {
self.client.get(url.into_inner())
}
/// Return a [builder](RequestBuilder) for a `POST` request,
/// appending a single `path` segment to the base endpoint URL.
pub fn post(&self, path: &str) -> RequestBuilder {
let mut url = self.endpoint.clone();
url.path_segments_mut().push(path);
self.client.post(url.into_inner())
}
/// Execute a [request](reqwest::Request).
pub async fn execute(&self, request: Request) -> Result<Response, Error> {
self.client.execute(request).await
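
Usage mirrors the existing `get` helper; a hypothetical call site (the path, `request_id`, and `body` are placeholders):

// Hypothetical: POST {base}/some_path with a JSON body via the new helper.
let request = endpoint
    .post("some_path")
    .header("X-Request-ID", &request_id)
    .json(&body)
    .build()?;
let response = endpoint.execute(request).await?;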


@@ -3,10 +3,12 @@
//! Handles both SQL over HTTP and SQL over Websockets.
mod conn_pool;
pub mod jwt_auth;
mod sql_over_http;
mod websocket;
use anyhow::bail;
use dashmap::DashMap;
use hyper::StatusCode;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
@@ -31,6 +33,8 @@ use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use utils::http::{error::ApiError, json::json_response};
use self::jwt_auth::JWKSetCaches;
pub async fn task_main(
config: &'static ProxyConfig,
ws_listener: TcpListener,
@@ -41,6 +45,9 @@ pub async fn task_main(
}
let conn_pool = conn_pool::GlobalConnPool::new(config);
let jwk_cache_pool = Arc::new(JWKSetCaches {
map: DashMap::new(),
});
// shutdown the connection pool
tokio::spawn({
@@ -85,6 +92,7 @@ pub async fn task_main(
let remote_addr = io.inner.remote_addr();
let sni_name = tls.server_name().map(|s| s.to_string());
let conn_pool = conn_pool.clone();
let jwk_cache_pool = jwk_cache_pool.clone();
async move {
let peer_addr = match client_addr {
@@ -96,13 +104,20 @@ pub async fn task_main(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
let jwk_cache_pool = jwk_cache_pool.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
request_handler(
req, config, conn_pool, cancel_map, session_id, sni_name,
req,
config,
conn_pool,
jwk_cache_pool,
cancel_map,
session_id,
sni_name,
)
.instrument(info_span!(
"serverless",
@@ -167,6 +182,7 @@ async fn request_handler(
mut request: Request<Body>,
config: &'static ProxyConfig,
conn_pool: Arc<conn_pool::GlobalConnPool>,
jwk_cache_pool: Arc<JWKSetCaches>,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
sni_hostname: Option<String>,
@@ -204,6 +220,7 @@ async fn request_handler(
request,
sni_hostname,
conn_pool,
jwk_cache_pool,
session_id,
&config.http_config,
)
@@ -214,7 +231,7 @@ async fn request_handler(
.header("Access-Control-Allow-Origin", "*")
.header(
"Access-Control-Allow-Headers",
"Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In",
"Neon-Connection-String, Neon-Raw-Text-Output, Neon-Array-Mode, Neon-Pool-Opt-In, Authorization",
)
.header("Access-Control-Max-Age", "86400" /* 24 hours */)
.status(StatusCode::OK) // 204 is also valid, but see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/OPTIONS#status_code


@@ -21,7 +21,8 @@ use tokio::time;
use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};
use crate::{
auth, console,
auth::{self, backend::Policy},
console::{self, provider::neon::UserRowLevel},
proxy::{
neon_options, LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER,
NUM_DB_CONNECTIONS_OPENED_COUNTER,
@@ -45,6 +46,8 @@ pub struct ConnInfo {
pub hostname: String,
pub password: String,
pub options: Option<String>,
/// row-level security policies, present when RLS mode is enabled
pub policies: Option<Vec<Policy>>,
}
impl ConnInfo {
@@ -365,6 +368,7 @@ struct TokioMechanism<'a> {
conn_info: &'a ConnInfo,
session_id: uuid::Uuid,
conn_id: uuid::Uuid,
row_level: Option<UserRowLevel>,
}
#[async_trait]
@@ -384,6 +388,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
timeout,
self.conn_id,
self.session_id,
&self.row_level,
)
.await
}
@@ -431,11 +436,26 @@ async fn connect_to_compute(
.await?
.context("missing cache entry from wake_compute")?;
let mut row_level = None;
if let Some(policies) = &conn_info.policies {
row_level = Some(
creds
.ensure_row_level(
&extra,
conn_info.dbname.to_owned(),
conn_info.username.to_owned(),
policies.clone(),
)
.await?,
);
}
crate::proxy::connect_to_compute(
&TokioMechanism {
conn_id,
conn_info,
session_id,
row_level,
},
node_info,
&extra,
@@ -451,12 +471,24 @@ async fn connect_to_compute_once(
timeout: time::Duration,
conn_id: uuid::Uuid,
mut session: uuid::Uuid,
row_level: &Option<UserRowLevel>,
) -> Result<ClientInner, tokio_postgres::Error> {
let mut config = (*node_info.config).clone();
let username = row_level
.as_ref()
.map(|r| &r.username)
.unwrap_or(&conn_info.username);
info!(%username, dbname = %conn_info.dbname, "connecting");
let (client, mut connection) = config
.user(&conn_info.username)
.password(&conn_info.password)
.user(username)
.password(
row_level
.as_ref()
.map(|r| &r.password)
.unwrap_or(&conn_info.password),
)
.dbname(&conn_info.dbname)
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
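
The credential override above reduces to "prefer the row-level user when present". The same selection in isolation, with a local stand-in for the provider's `UserRowLevel` so the sketch is self-contained:

struct UserRowLevel {
    username: String,
    password: String,
}

/// Pick the effective (user, password): row-level credentials win,
/// otherwise fall back to what the client supplied.
fn effective_creds<'a>(
    row_level: Option<&'a UserRowLevel>,
    user: &'a str,
    password: &'a str,
) -> (&'a str, &'a str) {
    match row_level {
        Some(r) => (&r.username, &r.password),
        None => (user, password),
    }
}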


@@ -0,0 +1,98 @@
// https://adapted-gorilla-88.clerk.accounts.dev/.well-known/jwks.json
use std::sync::Arc;
use anyhow::{bail, Context};
use biscuit::{
jwk::{JWKSet, JWK},
jws, CompactPart,
};
use dashmap::DashMap;
use reqwest::{IntoUrl, Url};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::RwLock;
pub struct JWKSetCaches {
pub map: DashMap<Url, Arc<JWKSetCache>>,
}
impl JWKSetCaches {
pub async fn get_cache(&self, url: impl IntoUrl) -> anyhow::Result<Arc<JWKSetCache>> {
let url = url.into_url()?;
if let Some(x) = self.map.get(&url) {
return Ok(x.clone());
}
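// Note: two tasks can race past this lookup and both fetch the JWKS below;
// the later insert overwrites with an equally fresh set, so the race is benign.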
let cache = JWKSetCache::new(url.clone()).await?;
let cache = Arc::new(cache);
self.map.insert(url, cache.clone());
Ok(cache)
}
}
pub struct JWKSetCache {
url: Url,
current: RwLock<biscuit::jwk::JWKSet<()>>,
}
impl JWKSetCache {
pub async fn new(url: impl IntoUrl) -> anyhow::Result<Self> {
let url = url.into_url()?;
let current = reqwest::get(url.clone()).await?.json().await?;
Ok(Self {
url,
current: RwLock::new(current),
})
}
pub async fn get(&self, kid: &str) -> anyhow::Result<JWK<()>> {
let current = self.current.read().await.clone();
if let Some(key) = current.find(kid) {
return Ok(key.clone());
}
let new: JWKSet<()> = reqwest::get(self.url.clone()).await?.json().await?;
if new == current {
bail!("not found")
}
// Search the freshly fetched set before storing it; searching the stale
// `current` here would always miss the key we just failed to find.
let key = new.find(kid).cloned();
*self.current.write().await = new;
key.context("not found")
}
pub async fn decode<T, H>(
&self,
token: &jws::Compact<T, H>,
) -> anyhow::Result<jws::Compact<T, H>>
where
T: CompactPart,
H: Serialize + DeserializeOwned,
{
let current = self.current.read().await.clone();
match token.decode_with_jwks(&current, None) {
Ok(t) => Ok(t),
Err(biscuit::errors::Error::ValidationError(
biscuit::errors::ValidationError::KeyNotFound,
)) => {
let new: JWKSet<()> = reqwest::get(self.url.clone()).await?.json().await?;
if new == current {
bail!("not found")
}
*self.current.write().await = new.clone();
token.decode_with_jwks(&new, None).context("token validation failed after JWKS refresh")
}
Err(e) => Err(e.into()),
}
}
}
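
A hypothetical caller, showing the intended decode flow (the claims type and the logging are placeholders):

use biscuit::JWT;

// Hypothetical: verify a bearer token against the cached JWKS and read `sub`.
async fn verify(cache: &JWKSetCache, token: &str) -> anyhow::Result<()> {
    let jwt: JWT<serde_json::Value, ()> = JWT::new_encoded(token);
    let decoded = cache.decode(&jwt).await?;
    let claims = decoded.payload()?; // safe: decode left the token in the Decoded state
    println!("subject: {:?}", claims.registered.subject);
    Ok(())
}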
#[cfg(test)]
mod tests {
use super::JWKSetCache;
#[tokio::test]
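// NB: hits the live Clerk JWKS endpoint, so this test requires network access.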
async fn jwkset() {
let cache =
JWKSetCache::new("https://adapted-gorilla-88.clerk.accounts.dev/.well-known/jwks.json")
.await
.unwrap();
dbg!(cache.get("ins_2YFechxysnwZcZN6TDHEz6u6w6v").await.unwrap());
}
}


@@ -1,15 +1,20 @@
use std::sync::Arc;
use anyhow::bail;
use anyhow::Context;
use biscuit::JWT;
use futures::pin_mut;
use futures::StreamExt;
use hyper::body::HttpBody;
use hyper::header;
use hyper::header::AUTHORIZATION;
use hyper::http::HeaderName;
use hyper::http::HeaderValue;
use hyper::Response;
use hyper::StatusCode;
use hyper::{Body, HeaderMap, Request};
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
@@ -26,11 +31,13 @@ use url::Url;
use utils::http::error::ApiError;
use utils::http::json::json_response;
use crate::auth::backend::Policy;
use crate::config::HttpConfig;
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
use super::conn_pool::ConnInfo;
use super::conn_pool::GlobalConnPool;
use super::jwt_auth::JWKSetCaches;
#[derive(serde::Deserialize)]
struct QueryData {
@@ -118,9 +125,10 @@ fn json_array_to_pg_array(value: &Value) -> Option<String> {
}
}
fn get_conn_info(
async fn get_conn_info(
jwk_cache_pool: &JWKSetCaches,
headers: &HeaderMap,
sni_hostname: Option<String>,
sni_hostname: &str,
) -> Result<ConnInfo, anyhow::Error> {
let connection_string = headers
.get("Neon-Connection-String")
@@ -144,18 +152,42 @@ fn get_conn_info(
.next()
.ok_or(anyhow::anyhow!("invalid database name"))?;
let username = connection_url.username();
if username.is_empty() {
return Err(anyhow::anyhow!("missing username"));
}
let mut password = "";
let mut policies = None;
let authorization = headers.get(AUTHORIZATION);
let username = if let Some(auth) = authorization {
// TODO: introduce control plane API to fetch this
let jwks_url = match sni_hostname {
"ep-flat-night-23370355.cloud.krypton.aws.neon.build" => {
"https://adapted-gorilla-88.clerk.accounts.dev/.well-known/jwks.json"
}
_ => anyhow::bail!("jwt auth not supported"),
};
let jwk_cache = jwk_cache_pool.get_cache(jwks_url).await?;
let password = connection_url
.password()
.ok_or(anyhow::anyhow!("no password"))?;
let auth = auth.to_str()?;
let token = auth.strip_prefix("Bearer ").context("bad token")?;
let jwt: JWT<NeonFields, ()> = JWT::new_encoded(token);
let token = jwk_cache.decode(&jwt).await?;
let payload = token.payload().unwrap();
policies = Some(payload.private.policies.clone());
payload
.registered
.subject
.as_deref()
.context("missing user id")?
.to_owned()
} else {
password = connection_url
.password()
.ok_or(anyhow::anyhow!("no password"))?;
// The TLS certificate selector is now based on the SNI hostname, so if we get here
// we know the SNI hostname is set to one of the configured domain names.
let sni_hostname = sni_hostname.ok_or(anyhow::anyhow!("no SNI hostname set"))?;
let u = connection_url.username();
if u.is_empty() {
return Err(anyhow::anyhow!("missing username"));
}
u.to_owned()
};
let hostname = connection_url
.host_str()
@@ -186,7 +218,8 @@ fn get_conn_info(
}
Ok(ConnInfo {
username: username.to_owned(),
username,
policies,
dbname: dbname.to_owned(),
hostname: hostname.to_owned(),
password: password.to_owned(),
@@ -199,12 +232,13 @@ pub async fn handle(
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
jwk_cache_pool: Arc<JWKSetCaches>,
session_id: uuid::Uuid,
config: &'static HttpConfig,
) -> Result<Response<Body>, ApiError> {
let result = tokio::time::timeout(
config.sql_over_http_timeout,
handle_inner(request, sni_hostname, conn_pool, session_id),
handle_inner(request, sni_hostname, conn_pool, jwk_cache_pool, session_id),
)
.await;
let mut response = match result {
@@ -255,6 +289,7 @@ async fn handle_inner(
request: Request<Body>,
sni_hostname: Option<String>,
conn_pool: Arc<GlobalConnPool>,
jwk_cache_pool: Arc<JWKSetCaches>,
session_id: uuid::Uuid,
) -> anyhow::Result<Response<Body>> {
NUM_CONNECTIONS_ACCEPTED_COUNTER
@@ -264,11 +299,15 @@ async fn handle_inner(
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
}
// The TLS certificate selector is now based on the SNI hostname, so if we get here
// we know the SNI hostname is set to one of the configured domain names.
let sni_hostname = sni_hostname.ok_or(anyhow::anyhow!("no SNI hostname set"))?;
//
// Determine the destination and connection params
//
let headers = request.headers();
let conn_info = get_conn_info(headers, sni_hostname)?;
let conn_info = get_conn_info(&jwk_cache_pool, headers, &sni_hostname).await?;
// Determine the output options. Default behaviour is 'false'. Anything that is not
// strictly 'true' is assumed to be false.
@@ -697,6 +736,11 @@ fn _pg_array_parse(
Ok((Value::Array(entries), 0))
}
#[derive(Serialize, Deserialize)]
pub struct NeonFields {
policies: Vec<Policy>,
}
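
So a token accepted by this path carries, alongside the registered claims, a private section shaped like the following (values illustrative):

// Illustrative claims: `sub` becomes the row-level user id, and the private
// `policies` array feeds ensure_row_level.
let claims = serde_json::json!({
    "sub": "user_123",
    "policies": [
        { "table": "documents", "column": "owner_id" }
    ]
});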
#[cfg(test)]
mod tests {
use super::*;


@@ -54,7 +54,7 @@ ring = { version = "0.16", features = ["std"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
serde_json = { version = "1", features = ["raw_value"] }
serde_json = { version = "1", features = ["preserve_order", "raw_value"] }
smallvec = { version = "1", default-features = false, features = ["write"] }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }