Add sharding types

This commit is contained in:
John Spray
2023-10-20 09:57:49 +01:00
parent 22a848cf2b
commit ae19f28f59
7 changed files with 346 additions and 140 deletions

8
Cargo.lock generated
View File

@@ -2846,6 +2846,12 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "mur3"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97af489e1e21b68de4c390ecca6703318bc1aa16e9733bcb62c089b73c6fbb1b"
[[package]]
name = "native-tls"
version = "0.2.11"
@@ -3297,12 +3303,14 @@ dependencies = [
"bytes",
"const_format",
"enum-map",
"mur3",
"postgres_ffi",
"serde",
"serde_json",
"serde_with",
"strum",
"strum_macros",
"url",
"utils",
"workspace_hack",
]

View File

@@ -94,6 +94,7 @@ jsonwebtoken = "8"
libc = "0.2"
md5 = "0.7.0"
memoffset = "0.8"
mur3 = "0.1.0"
native-tls = "0.2"
nix = "0.26"
notify = "5.0.0"

View File

@@ -17,5 +17,7 @@ postgres_ffi.workspace = true
enum-map.workspace = true
strum.workspace = true
strum_macros.workspace = true
mur3.workspace = true
url.workspace = true
workspace_hack.workspace = true

View File

@@ -0,0 +1,142 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use serde::{Deserialize, Serialize};
use std::fmt;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}

View File

@@ -4,8 +4,10 @@ use const_format::formatcp;
/// Public API types
pub mod control_api;
pub mod key;
pub mod models;
pub mod reltag;
pub mod shard;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");

View File

@@ -0,0 +1,189 @@
use std::hash::Hasher;
use crate::key::Key;
use mur3;
use serde::{Deserialize, Serialize};
use utils::id::NodeId;
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
pub struct ShardCount(pub u8);
impl ShardNumber {
fn within_count(&self, rhs: ShardCount) -> bool {
self.0 < rhs.0
}
}
/// Stripe size in number of pages
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
/// Layout version: for future upgrades where we might change how the key->shard mapping works
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardLayout(u8);
const LAYOUT_V1: ShardLayout = ShardLayout(1);
/// Default stripe size in pages: 256MiB divided by 8kiB page size.
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
/// The ShardIdentity contains the information needed for one member of map
/// to resolve a key to a shard, and then check whether that shard is ==self.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardIdentity {
pub layout: ShardLayout,
pub number: ShardNumber,
pub count: ShardCount,
pub stripe_size: ShardStripeSize,
}
/// The location of a shard contains both the logical identity of the pageserver
/// holding it (control plane's perspective), and the physical page service port
/// that postgres should use (endpoint's perspective).
#[derive(Clone)]
pub struct ShardLocation {
pub id: NodeId,
pub page_service: (url::Host, u16),
}
/// The ShardMap is sufficient information to map any Key to the page service
/// which should store it.
#[derive(Clone)]
struct ShardMap {
layout: ShardLayout,
count: ShardCount,
stripe_size: ShardStripeSize,
pageservers: Vec<Option<ShardLocation>>,
}
impl ShardMap {
pub fn get_location(&self, shard_number: ShardNumber) -> &Option<ShardLocation> {
assert!(shard_number.within_count(self.count));
self.pageservers.get(shard_number.0 as usize).unwrap()
}
pub fn get_identity(&self, shard_number: ShardNumber) -> ShardIdentity {
assert!(shard_number.within_count(self.count));
ShardIdentity {
layout: self.layout,
number: shard_number,
count: self.count,
stripe_size: self.stripe_size,
}
}
/// Return Some if the key is assigned to a particular shard. Else the key
/// should be ingested by all shards (e.g. dbdir metadata).
pub fn get_shard_number(&self, key: &Key) -> Option<ShardNumber> {
if self.count < ShardCount(2) || key_is_broadcast(key) {
None
} else {
Some(key_to_shard_number(self.count, self.stripe_size, key))
}
}
pub fn default_with_shards(shard_count: ShardCount) -> Self {
ShardMap {
layout: LAYOUT_V1,
count: shard_count,
stripe_size: DEFAULT_STRIPE_SIZE,
pageservers: (0..shard_count.0 as usize).map(|_| None).collect(),
}
}
}
impl ShardIdentity {
/// An identity with number=0 count=0 is a "none" identity, which represents legacy
/// tenants. Modern single-shard tenants should not use this: they should
/// have number=0 count=1.
pub fn none() -> Self {
Self {
number: ShardNumber(0),
count: ShardCount(0),
layout: LAYOUT_V1,
stripe_size: DEFAULT_STRIPE_SIZE,
}
}
pub fn new(number: ShardNumber, count: ShardCount, stripe_size: ShardStripeSize) -> Self {
Self {
number,
count,
layout: LAYOUT_V1,
stripe_size,
}
}
pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
key_to_shard_number(self.count, self.stripe_size, key)
}
/// Return true if the key should be ingested by this shard
pub fn is_key_local(&self, key: &Key) -> bool {
if self.count < ShardCount(2) || key_is_broadcast(key) {
return true;
} else {
key_to_shard_number(self.count, self.stripe_size, key) == self.number
}
}
pub fn slug(&self) -> String {
if self.count > ShardCount(0) {
format!("-{:02x}{:02x}", self.number.0, self.count.0)
} else {
String::new()
}
}
}
impl Default for ShardIdentity {
/// The default identity is to be the only shard for a tenant, i.e. the legacy
/// pre-sharding case.
fn default() -> Self {
ShardIdentity {
layout: LAYOUT_V1,
number: ShardNumber(0),
count: ShardCount(1),
stripe_size: DEFAULT_STRIPE_SIZE,
}
}
}
/// Whether this key should be ingested by all shards
fn key_is_broadcast(key: &Key) -> bool {
// TODO: deduplicate wrt pgdatadir_mapping.rs
fn is_rel_block_key(key: &Key) -> bool {
key.field1 == 0x00 && key.field4 != 0
}
// TODO: can we be less conservative? Starting point is to broadcast everything
// except for rel block keys
!is_rel_block_key(key)
}
/// Where a Key is to be distributed across shards, select the shard. This function
/// does not account for keys that should be broadcast across shards.
fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
// Fast path for un-sharded tenants or broadcast keys
if count < ShardCount(2) || key_is_broadcast(key) {
return ShardNumber(0);
}
let mut hasher = mur3::Hasher32::with_seed(0);
hasher.write_u8(key.field1);
hasher.write_u32(key.field2);
hasher.write_u32(key.field3);
hasher.write_u32(key.field4);
let hash = hasher.finish32();
let blkno = key.field6;
let stripe = hash + (blkno / stripe_size.0);
let shard = stripe as u8 % (count.0 as u8);
ShardNumber(shard)
}

View File

@@ -1,106 +1,11 @@
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{AddAssign, Range};
use std::time::Duration;
/// Key used in the Repository kv-store.
///
/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs
/// for what we actually store in these fields.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct Key {
pub field1: u8,
pub field2: u32,
pub field3: u32,
pub field4: u32,
pub field5: u8,
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
/// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish.
/// As long as Neon does not support tablespace (because of lack of access to local file system),
/// we can assume that only some predefined namespace OIDs are used which can fit in u16
pub fn to_i128(&self) -> i128 {
assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222);
(((self.field1 & 0xf) as i128) << 120)
| (((self.field2 & 0xFFFF) as i128) << 104)
| ((self.field3 as i128) << 72)
| ((self.field4 as i128) << 40)
| ((self.field5 as i128) << 32)
| self.field6 as i128
}
pub const fn from_i128(x: i128) -> Self {
Key {
field1: ((x >> 120) & 0xf) as u8,
field2: ((x >> 104) & 0xFFFF) as u32,
field3: (x >> 72) as u32,
field4: (x >> 40) as u32,
field5: (x >> 32) as u8,
field6: x as u32,
}
}
pub fn next(&self) -> Key {
self.add(1)
}
pub fn add(&self, x: u32) -> Key {
let mut key = *self;
let r = key.field6.overflowing_add(x);
key.field6 = r.0;
if r.1 {
let r = key.field5.overflowing_add(1);
key.field5 = r.0;
if r.1 {
let r = key.field4.overflowing_add(1);
key.field4 = r.0;
if r.1 {
let r = key.field3.overflowing_add(1);
key.field3 = r.0;
if r.1 {
let r = key.field2.overflowing_add(1);
key.field2 = r.0;
if r.1 {
let r = key.field1.overflowing_add(1);
key.field1 = r.0;
assert!(!r.1);
}
}
}
}
}
key
}
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
field3: u32::from_be_bytes(b[5..9].try_into().unwrap()),
field4: u32::from_be_bytes(b[9..13].try_into().unwrap()),
field5: b[13],
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
pub use pageserver_api::key::{Key, KEY_SIZE};
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
let start = key_range.start;
@@ -129,49 +34,6 @@ pub fn singleton_range(key: Key) -> Range<Key> {
key..key.next()
}
impl fmt::Display for Key {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}",
self.field1, self.field2, self.field3, self.field4, self.field5, self.field6
)
}
}
impl Key {
pub const MIN: Key = Key {
field1: u8::MIN,
field2: u32::MIN,
field3: u32::MIN,
field4: u32::MIN,
field5: u8::MIN,
field6: u32::MIN,
};
pub const MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {
bail!("parse error");
}
Ok(Key {
field1: u8::from_str_radix(&s[0..2], 16)?,
field2: u32::from_str_radix(&s[2..10], 16)?,
field3: u32::from_str_radix(&s[10..18], 16)?,
field4: u32::from_str_radix(&s[18..26], 16)?,
field5: u8::from_str_radix(&s[26..28], 16)?,
field6: u32::from_str_radix(&s[28..36], 16)?,
})
}
}
/// A 'value' stored for a one Key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Value {