diff --git a/Cargo.lock b/Cargo.lock index 738771f88b..e20f51a58a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2846,6 +2846,12 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "mur3" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97af489e1e21b68de4c390ecca6703318bc1aa16e9733bcb62c089b73c6fbb1b" + [[package]] name = "native-tls" version = "0.2.11" @@ -3297,12 +3303,14 @@ dependencies = [ "bytes", "const_format", "enum-map", + "mur3", "postgres_ffi", "serde", "serde_json", "serde_with", "strum", "strum_macros", + "url", "utils", "workspace_hack", ] diff --git a/Cargo.toml b/Cargo.toml index e528489f1e..d105cd5242 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,6 +94,7 @@ jsonwebtoken = "8" libc = "0.2" md5 = "0.7.0" memoffset = "0.8" +mur3 = "0.1.0" native-tls = "0.2" nix = "0.26" notify = "5.0.0" diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index f97ec54e91..08a55a061e 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -17,5 +17,7 @@ postgres_ffi.workspace = true enum-map.workspace = true strum.workspace = true strum_macros.workspace = true +mur3.workspace = true +url.workspace = true workspace_hack.workspace = true diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs new file mode 100644 index 0000000000..b5350d6384 --- /dev/null +++ b/libs/pageserver_api/src/key.rs @@ -0,0 +1,142 @@ +use anyhow::{bail, Result}; +use byteorder::{ByteOrder, BE}; +use serde::{Deserialize, Serialize}; +use std::fmt; + +/// Key used in the Repository kv-store. +/// +/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs +/// for what we actually store in these fields. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] +pub struct Key { + pub field1: u8, + pub field2: u32, + pub field3: u32, + pub field4: u32, + pub field5: u8, + pub field6: u32, +} + +pub const KEY_SIZE: usize = 18; + +impl Key { + /// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish. + /// As long as Neon does not support tablespace (because of lack of access to local file system), + /// we can assume that only some predefined namespace OIDs are used which can fit in u16 + pub fn to_i128(&self) -> i128 { + assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222); + (((self.field1 & 0xf) as i128) << 120) + | (((self.field2 & 0xFFFF) as i128) << 104) + | ((self.field3 as i128) << 72) + | ((self.field4 as i128) << 40) + | ((self.field5 as i128) << 32) + | self.field6 as i128 + } + + pub const fn from_i128(x: i128) -> Self { + Key { + field1: ((x >> 120) & 0xf) as u8, + field2: ((x >> 104) & 0xFFFF) as u32, + field3: (x >> 72) as u32, + field4: (x >> 40) as u32, + field5: (x >> 32) as u8, + field6: x as u32, + } + } + + pub fn next(&self) -> Key { + self.add(1) + } + + pub fn add(&self, x: u32) -> Key { + let mut key = *self; + + let r = key.field6.overflowing_add(x); + key.field6 = r.0; + if r.1 { + let r = key.field5.overflowing_add(1); + key.field5 = r.0; + if r.1 { + let r = key.field4.overflowing_add(1); + key.field4 = r.0; + if r.1 { + let r = key.field3.overflowing_add(1); + key.field3 = r.0; + if r.1 { + let r = key.field2.overflowing_add(1); + key.field2 = r.0; + if r.1 { + let r = key.field1.overflowing_add(1); + key.field1 = r.0; + assert!(!r.1); + } + } + } + } + } + key + } + + pub fn from_slice(b: &[u8]) -> Self { + Key { + field1: b[0], + field2: u32::from_be_bytes(b[1..5].try_into().unwrap()), + field3: u32::from_be_bytes(b[5..9].try_into().unwrap()), + field4: u32::from_be_bytes(b[9..13].try_into().unwrap()), + field5: b[13], + field6: u32::from_be_bytes(b[14..18].try_into().unwrap()), + } + } + + pub fn write_to_byte_slice(&self, buf: &mut [u8]) { + buf[0] = self.field1; + BE::write_u32(&mut buf[1..5], self.field2); + BE::write_u32(&mut buf[5..9], self.field3); + BE::write_u32(&mut buf[9..13], self.field4); + buf[13] = self.field5; + BE::write_u32(&mut buf[14..18], self.field6); + } +} + +impl fmt::Display for Key { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}", + self.field1, self.field2, self.field3, self.field4, self.field5, self.field6 + ) + } +} + +impl Key { + pub const MIN: Key = Key { + field1: u8::MIN, + field2: u32::MIN, + field3: u32::MIN, + field4: u32::MIN, + field5: u8::MIN, + field6: u32::MIN, + }; + pub const MAX: Key = Key { + field1: u8::MAX, + field2: u32::MAX, + field3: u32::MAX, + field4: u32::MAX, + field5: u8::MAX, + field6: u32::MAX, + }; + + pub fn from_hex(s: &str) -> Result { + if s.len() != 36 { + bail!("parse error"); + } + Ok(Key { + field1: u8::from_str_radix(&s[0..2], 16)?, + field2: u32::from_str_radix(&s[2..10], 16)?, + field3: u32::from_str_radix(&s[10..18], 16)?, + field4: u32::from_str_radix(&s[18..26], 16)?, + field5: u8::from_str_radix(&s[26..28], 16)?, + field6: u32::from_str_radix(&s[28..36], 16)?, + }) + } +} diff --git a/libs/pageserver_api/src/lib.rs b/libs/pageserver_api/src/lib.rs index e49f7d00c1..511c5ed208 100644 --- a/libs/pageserver_api/src/lib.rs +++ b/libs/pageserver_api/src/lib.rs @@ -4,8 +4,10 @@ use const_format::formatcp; /// Public API types pub mod control_api; +pub mod key; pub mod models; pub mod reltag; +pub mod shard; pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000; pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}"); diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs new file mode 100644 index 0000000000..bfa1655195 --- /dev/null +++ b/libs/pageserver_api/src/shard.rs @@ -0,0 +1,189 @@ +use std::hash::Hasher; + +use crate::key::Key; +use mur3; +use serde::{Deserialize, Serialize}; +use utils::id::NodeId; + +#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)] +pub struct ShardNumber(pub u8); + +#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)] +pub struct ShardCount(pub u8); + +impl ShardNumber { + fn within_count(&self, rhs: ShardCount) -> bool { + self.0 < rhs.0 + } +} + +/// Stripe size in number of pages +#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)] +pub struct ShardStripeSize(pub u32); + +/// Layout version: for future upgrades where we might change how the key->shard mapping works +#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)] +pub struct ShardLayout(u8); + +const LAYOUT_V1: ShardLayout = ShardLayout(1); + +/// Default stripe size in pages: 256MiB divided by 8kiB page size. +const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8); + +/// The ShardIdentity contains the information needed for one member of map +/// to resolve a key to a shard, and then check whether that shard is ==self. +#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)] +pub struct ShardIdentity { + pub layout: ShardLayout, + pub number: ShardNumber, + pub count: ShardCount, + pub stripe_size: ShardStripeSize, +} + +/// The location of a shard contains both the logical identity of the pageserver +/// holding it (control plane's perspective), and the physical page service port +/// that postgres should use (endpoint's perspective). +#[derive(Clone)] +pub struct ShardLocation { + pub id: NodeId, + pub page_service: (url::Host, u16), +} + +/// The ShardMap is sufficient information to map any Key to the page service +/// which should store it. +#[derive(Clone)] +struct ShardMap { + layout: ShardLayout, + count: ShardCount, + stripe_size: ShardStripeSize, + pageservers: Vec>, +} + +impl ShardMap { + pub fn get_location(&self, shard_number: ShardNumber) -> &Option { + assert!(shard_number.within_count(self.count)); + self.pageservers.get(shard_number.0 as usize).unwrap() + } + + pub fn get_identity(&self, shard_number: ShardNumber) -> ShardIdentity { + assert!(shard_number.within_count(self.count)); + ShardIdentity { + layout: self.layout, + number: shard_number, + count: self.count, + stripe_size: self.stripe_size, + } + } + + /// Return Some if the key is assigned to a particular shard. Else the key + /// should be ingested by all shards (e.g. dbdir metadata). + pub fn get_shard_number(&self, key: &Key) -> Option { + if self.count < ShardCount(2) || key_is_broadcast(key) { + None + } else { + Some(key_to_shard_number(self.count, self.stripe_size, key)) + } + } + + pub fn default_with_shards(shard_count: ShardCount) -> Self { + ShardMap { + layout: LAYOUT_V1, + count: shard_count, + stripe_size: DEFAULT_STRIPE_SIZE, + pageservers: (0..shard_count.0 as usize).map(|_| None).collect(), + } + } +} + +impl ShardIdentity { + /// An identity with number=0 count=0 is a "none" identity, which represents legacy + /// tenants. Modern single-shard tenants should not use this: they should + /// have number=0 count=1. + pub fn none() -> Self { + Self { + number: ShardNumber(0), + count: ShardCount(0), + layout: LAYOUT_V1, + stripe_size: DEFAULT_STRIPE_SIZE, + } + } + + pub fn new(number: ShardNumber, count: ShardCount, stripe_size: ShardStripeSize) -> Self { + Self { + number, + count, + layout: LAYOUT_V1, + stripe_size, + } + } + + pub fn get_shard_number(&self, key: &Key) -> ShardNumber { + key_to_shard_number(self.count, self.stripe_size, key) + } + + /// Return true if the key should be ingested by this shard + pub fn is_key_local(&self, key: &Key) -> bool { + if self.count < ShardCount(2) || key_is_broadcast(key) { + return true; + } else { + key_to_shard_number(self.count, self.stripe_size, key) == self.number + } + } + + pub fn slug(&self) -> String { + if self.count > ShardCount(0) { + format!("-{:02x}{:02x}", self.number.0, self.count.0) + } else { + String::new() + } + } +} + +impl Default for ShardIdentity { + /// The default identity is to be the only shard for a tenant, i.e. the legacy + /// pre-sharding case. + fn default() -> Self { + ShardIdentity { + layout: LAYOUT_V1, + number: ShardNumber(0), + count: ShardCount(1), + stripe_size: DEFAULT_STRIPE_SIZE, + } + } +} + +/// Whether this key should be ingested by all shards +fn key_is_broadcast(key: &Key) -> bool { + // TODO: deduplicate wrt pgdatadir_mapping.rs + fn is_rel_block_key(key: &Key) -> bool { + key.field1 == 0x00 && key.field4 != 0 + } + + // TODO: can we be less conservative? Starting point is to broadcast everything + // except for rel block keys + !is_rel_block_key(key) +} + +/// Where a Key is to be distributed across shards, select the shard. This function +/// does not account for keys that should be broadcast across shards. +fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber { + // Fast path for un-sharded tenants or broadcast keys + if count < ShardCount(2) || key_is_broadcast(key) { + return ShardNumber(0); + } + + let mut hasher = mur3::Hasher32::with_seed(0); + hasher.write_u8(key.field1); + hasher.write_u32(key.field2); + hasher.write_u32(key.field3); + hasher.write_u32(key.field4); + let hash = hasher.finish32(); + + let blkno = key.field6; + + let stripe = hash + (blkno / stripe_size.0); + + let shard = stripe as u8 % (count.0 as u8); + + ShardNumber(shard) +} diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 7a94c3449d..72c6404505 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,106 +1,11 @@ use crate::walrecord::NeonWalRecord; -use anyhow::{bail, Result}; -use byteorder::{ByteOrder, BE}; +use anyhow::Result; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use std::fmt; use std::ops::{AddAssign, Range}; use std::time::Duration; -/// Key used in the Repository kv-store. -/// -/// The Repository treats this as an opaque struct, but see the code in pgdatadir_mapping.rs -/// for what we actually store in these fields. -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] -pub struct Key { - pub field1: u8, - pub field2: u32, - pub field3: u32, - pub field4: u32, - pub field5: u8, - pub field6: u32, -} - -pub const KEY_SIZE: usize = 18; - -impl Key { - /// 'field2' is used to store tablespaceid for relations and small enum numbers for other relish. - /// As long as Neon does not support tablespace (because of lack of access to local file system), - /// we can assume that only some predefined namespace OIDs are used which can fit in u16 - pub fn to_i128(&self) -> i128 { - assert!(self.field2 < 0xFFFF || self.field2 == 0xFFFFFFFF || self.field2 == 0x22222222); - (((self.field1 & 0xf) as i128) << 120) - | (((self.field2 & 0xFFFF) as i128) << 104) - | ((self.field3 as i128) << 72) - | ((self.field4 as i128) << 40) - | ((self.field5 as i128) << 32) - | self.field6 as i128 - } - - pub const fn from_i128(x: i128) -> Self { - Key { - field1: ((x >> 120) & 0xf) as u8, - field2: ((x >> 104) & 0xFFFF) as u32, - field3: (x >> 72) as u32, - field4: (x >> 40) as u32, - field5: (x >> 32) as u8, - field6: x as u32, - } - } - - pub fn next(&self) -> Key { - self.add(1) - } - - pub fn add(&self, x: u32) -> Key { - let mut key = *self; - - let r = key.field6.overflowing_add(x); - key.field6 = r.0; - if r.1 { - let r = key.field5.overflowing_add(1); - key.field5 = r.0; - if r.1 { - let r = key.field4.overflowing_add(1); - key.field4 = r.0; - if r.1 { - let r = key.field3.overflowing_add(1); - key.field3 = r.0; - if r.1 { - let r = key.field2.overflowing_add(1); - key.field2 = r.0; - if r.1 { - let r = key.field1.overflowing_add(1); - key.field1 = r.0; - assert!(!r.1); - } - } - } - } - } - key - } - - pub fn from_slice(b: &[u8]) -> Self { - Key { - field1: b[0], - field2: u32::from_be_bytes(b[1..5].try_into().unwrap()), - field3: u32::from_be_bytes(b[5..9].try_into().unwrap()), - field4: u32::from_be_bytes(b[9..13].try_into().unwrap()), - field5: b[13], - field6: u32::from_be_bytes(b[14..18].try_into().unwrap()), - } - } - - pub fn write_to_byte_slice(&self, buf: &mut [u8]) { - buf[0] = self.field1; - BE::write_u32(&mut buf[1..5], self.field2); - BE::write_u32(&mut buf[5..9], self.field3); - BE::write_u32(&mut buf[9..13], self.field4); - buf[13] = self.field5; - BE::write_u32(&mut buf[14..18], self.field6); - } -} +pub use pageserver_api::key::{Key, KEY_SIZE}; pub fn key_range_size(key_range: &Range) -> u32 { let start = key_range.start; @@ -129,49 +34,6 @@ pub fn singleton_range(key: Key) -> Range { key..key.next() } -impl fmt::Display for Key { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "{:02X}{:08X}{:08X}{:08X}{:02X}{:08X}", - self.field1, self.field2, self.field3, self.field4, self.field5, self.field6 - ) - } -} - -impl Key { - pub const MIN: Key = Key { - field1: u8::MIN, - field2: u32::MIN, - field3: u32::MIN, - field4: u32::MIN, - field5: u8::MIN, - field6: u32::MIN, - }; - pub const MAX: Key = Key { - field1: u8::MAX, - field2: u32::MAX, - field3: u32::MAX, - field4: u32::MAX, - field5: u8::MAX, - field6: u32::MAX, - }; - - pub fn from_hex(s: &str) -> Result { - if s.len() != 36 { - bail!("parse error"); - } - Ok(Key { - field1: u8::from_str_radix(&s[0..2], 16)?, - field2: u32::from_str_radix(&s[2..10], 16)?, - field3: u32::from_str_radix(&s[10..18], 16)?, - field4: u32::from_str_radix(&s[18..26], 16)?, - field5: u8::from_str_radix(&s[26..28], 16)?, - field6: u32::from_str_radix(&s[28..36], 16)?, - }) - } -} - /// A 'value' stored for a one Key. #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Value {