mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-29 19:10:38 +00:00
## Problem

When using TenantId as the key, we are unable to handle multiple tenant shards attached to the same pageserver for the same tenant ID. This is an expected scenario if we have e.g. 8 shards and 5 pageservers.

## Summary of changes

- TenantsMap is now a BTreeMap instead of a HashMap: this enables looking up by range. In future, we will need this for page_service, as incoming requests will just specify the Key, and we'll have to figure out which shard to route it to.
- A new key type TenantShardId is introduced, to act as the key in TenantsMap, and as the id type in external APIs. Its human readable serialization is backward compatible with TenantId, and also forward-compatible as long as sharding is not actually used (when we construct a TenantShardId with ShardCount(0), it serializes to an old-fashioned TenantId).
- Essential tenant APIs are updated to accept TenantShardIds: tenant/timeline create, tenant delete, and /location_conf. These are the APIs that will enable driving sharded tenants. Other APIs like /attach /detach /load /ignore will not work with sharding: those will soon be deprecated and replaced with /location_conf as part of the live migration work.

Closes: #5787
163 lines
4.4 KiB
Rust
163 lines
4.4 KiB
Rust
use crate::walrecord::NeonWalRecord;
|
|
use anyhow::Result;
|
|
use bytes::Bytes;
|
|
use serde::{Deserialize, Serialize};
|
|
use std::ops::{AddAssign, Range};
|
|
use std::time::Duration;
|
|
|
|
pub use pageserver_api::key::{Key, KEY_SIZE};
|
|
|
|
/// Returns the number of keys covered by `key_range`, saturating at `u32::MAX`.
///
/// Only `field5` and `field6` contribute to the count: each bound is folded into a
/// single `u64` as `(field5 << 32) | field6`, and the result is the distance between
/// the two folded bounds. If the bounds differ in any of `field1`..`field4`, the
/// range is not counted at all and `u32::MAX` is returned.
///
/// An inverted range (`end` ordered before `start` within the same
/// `field1`..`field4` prefix) contains no keys and yields `0`. A plain subtraction
/// here would panic in builds with overflow checks and wrap around to `u32::MAX`
/// in builds without them, so the subtraction saturates instead.
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
    let start = &key_range.start;
    let end = &key_range.end;

    if end.field1 != start.field1
        || end.field2 != start.field2
        || end.field3 != start.field3
        || end.field4 != start.field4
    {
        return u32::MAX;
    }

    // Fold the two least-significant components into one integer so that the
    // distance between the bounds is a single subtraction.
    let low_bits = |k: &Key| ((k.field5 as u64) << 32) | (k.field6 as u64);

    // Saturate at zero so that an inverted range behaves the same in every build.
    let diff = low_bits(end).saturating_sub(low_bits(start));

    // Clamp to the return type's range; after the clamp the cast is lossless.
    diff.min(u32::MAX as u64) as u32
}
|
|
|
|
/// Builds the half-open range that starts at `key` and ends at `key.next()`.
pub fn singleton_range(key: Key) -> Range<Key> {
    let end = key.next();
    Range { start: key, end }
}
|
|
|
|
/// A 'value' stored for a one Key.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
#[cfg_attr(test, derive(PartialEq))]
|
|
pub enum Value {
|
|
/// An Image value contains a full copy of the value
|
|
Image(Bytes),
|
|
/// A WalRecord value contains a WAL record that needs to be
|
|
/// replayed get the full value. Replaying the WAL record
|
|
/// might need a previous version of the value (if will_init()
|
|
/// returns false), or it may be replayed stand-alone (true).
|
|
WalRecord(NeonWalRecord),
|
|
}
|
|
|
|
impl Value {
|
|
pub fn is_image(&self) -> bool {
|
|
matches!(self, Value::Image(_))
|
|
}
|
|
|
|
pub fn will_init(&self) -> bool {
|
|
match self {
|
|
Value::Image(_) => true,
|
|
Value::WalRecord(rec) => rec.will_init(),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod test {
    use super::*;

    use bytes::Bytes;
    use utils::bin_ser::BeSer;

    /// Serializes `orig` and compares the bytes with `expected`, then
    /// deserializes those bytes and checks that `orig` is recovered.
    #[track_caller]
    fn assert_roundtrip(orig: Value, expected: &[u8]) {
        let actual = Value::ser(&orig).unwrap();
        assert_eq!(utils::Hex(&actual), utils::Hex(expected));

        let deser = Value::des(&actual).unwrap();
        assert_eq!(orig, deser);
    }

    #[test]
    fn image_roundtrip() {
        let value = Value::Image(Bytes::from_static(b"foobar"));

        #[rustfmt::skip]
        let expected = [
            // 4 bytes: top-level discriminator
            0x00, 0x00, 0x00, 0x00,
            // 8 bytes: payload length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
            // payload: "foobar"
            0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72
        ];

        assert_roundtrip(value, &expected);
    }

    #[test]
    fn walrecord_postgres_roundtrip() {
        let value = Value::WalRecord(NeonWalRecord::Postgres {
            will_init: true,
            rec: Bytes::from_static(b"foobar"),
        });

        #[rustfmt::skip]
        let expected = [
            // 8 bytes in total: flattened discriminator
            0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
            // will_init
            0x01,
            // 8 bytes: payload length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
            // payload: "foobar"
            0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72
        ];

        assert_roundtrip(value, &expected);
    }
}
|
|
|
|
///
|
|
/// Result of performing GC
|
|
///
|
|
#[derive(Default, Serialize, Debug)]
|
|
pub struct GcResult {
|
|
pub layers_total: u64,
|
|
pub layers_needed_by_cutoff: u64,
|
|
pub layers_needed_by_pitr: u64,
|
|
pub layers_needed_by_branches: u64,
|
|
pub layers_not_updated: u64,
|
|
pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
|
|
|
|
#[serde(serialize_with = "serialize_duration_as_millis")]
|
|
pub elapsed: Duration,
|
|
}
|
|
|
|
// helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds
|
|
fn serialize_duration_as_millis<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
|
|
where
|
|
S: serde::Serializer,
|
|
{
|
|
d.as_millis().serialize(serializer)
|
|
}
|
|
|
|
impl AddAssign for GcResult {
|
|
fn add_assign(&mut self, other: Self) {
|
|
self.layers_total += other.layers_total;
|
|
self.layers_needed_by_pitr += other.layers_needed_by_pitr;
|
|
self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
|
|
self.layers_needed_by_branches += other.layers_needed_by_branches;
|
|
self.layers_not_updated += other.layers_not_updated;
|
|
self.layers_removed += other.layers_removed;
|
|
|
|
self.elapsed += other.elapsed;
|
|
}
|
|
}
|