mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-17 05:00:38 +00:00
## Problem - #7451 INIT_FORKNUM blocks must be stored on shard 0 to enable including them in basebackup. This issue can be missed in simple tests because creating an unlogged table isn't sufficient -- to repro I had to create an _index_ on an unlogged table (then restart the endpoint). Closes: #7451 ## Summary of changes - Add a reproducer for the issue. - Tweak the condition for `key_is_shard0` to include anything that isn't a normal relation block _and_ any normal relation block whose forknum is INIT_FORKNUM. - To enable existing databases to recover from the issue, add a special case that omits relations if they were stored on the wrong INITFORK. This enables postgres to start and the user to drop the table and recreate it.
1022 lines
36 KiB
Rust
1022 lines
36 KiB
Rust
use std::{ops::RangeInclusive, str::FromStr};
|
|
|
|
use crate::{
|
|
key::{is_rel_block_key, Key},
|
|
models::ShardParameters,
|
|
};
|
|
use hex::FromHex;
|
|
use postgres_ffi::relfile_utils::INIT_FORKNUM;
|
|
use serde::{Deserialize, Serialize};
|
|
use utils::id::TenantId;
|
|
|
|
/// See docs/rfcs/031-sharding-static.md for an overview of sharding.
///
/// This module contains a variety of types used to represent the concept of sharding
/// a Neon tenant across multiple physical shards.  Since there are quite a few of these,
/// we provide a summary here.
///
/// Types used to describe shards:
/// - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
///   which identifies a tenant which is not shard-aware.  This means its storage paths do not include
///   a shard suffix.
/// - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
/// - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
///   without the tenant ID.  This is useful for things that are implicitly scoped to a particular
///   tenant, such as layer files.
/// - [`ShardIdentity`] is the full description of a particular shard's parameters, in sufficient
///   detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
/// - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
///   four hex digits.  An unsharded tenant is `0000`.
/// - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
///
/// Types used to describe the parameters for data distribution in a sharded tenant:
/// - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
///   multiple shards.  Its value is given in 8kiB pages.
/// - [`ShardLayout`] describes the data distribution scheme.  At time of writing there is a
///   single scheme (layout version 1): the type is provided for future upgrades that might
///   introduce different data distribution schemes.
///
/// Examples:
/// - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
/// - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
/// - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
///   and their slugs are 0004, 0104, 0204, and 0304.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardNumber(pub u8);
|
|
|
|
/// The number of shards in a tenant, stored as a raw `u8`.
///
/// The raw value may be zero: that is the marker for a legacy "unsharded" tenant, which
/// nevertheless has exactly one shard.  Use [`ShardCount::count`] to get the real number
/// of shards and [`ShardCount::literal`] to get the raw stored value.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(u8);
|
|
|
|
/// Combination of ShardNumber and ShardCount.  For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
///
/// Note: the derived ordering compares fields in declaration order, i.e. `shard_number`
/// first and `shard_count` second.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
    /// Zero-based index of the shard within its tenant.
    pub shard_number: ShardNumber,
    /// Shard count of the tenant (raw value; zero means "unsharded").
    pub shard_count: ShardCount,
}
|
|
|
|
/// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
/// and to check whether that [`ShardNumber`] is the same as the current shard.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardIdentity {
    /// Which shard of the tenant this identity describes.
    pub number: ShardNumber,
    /// How many shards the tenant has (raw value; zero means "unsharded").
    pub count: ShardCount,
    /// Length of a stripe, in pages: block numbers are divided by this before hashing.
    pub stripe_size: ShardStripeSize,
    /// Layout version.  Private: it is set by the constructors, and also doubles as the
    /// "broken" marker (see `LAYOUT_BROKEN`).
    layout: ShardLayout,
}
|
|
|
|
/// Formatting helper, for generating the `shard_id` label in traces.
///
/// Borrows a [`TenantShardId`] and, via its `Display` impl, renders the shard number and
/// shard count as two zero-padded hex bytes (four hex digits in total).
struct ShardSlug<'a>(&'a TenantShardId);
|
|
|
|
/// TenantShardId globally identifies a particular shard in a particular tenant.
///
/// These are written as `<TenantId>-<ShardSlug>`, for example:
///
/// ```text
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
/// ```
///
/// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
/// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
/// an unsharded [`TenantShardId`] to be used interchangeably with a [`TenantId`].
///
/// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
/// is both forward and backward compatible with TenantId: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
///
/// Note: the derived ordering compares `tenant_id` first, then `shard_number`, then
/// `shard_count` (declaration order).
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TenantShardId {
    /// The tenant this shard belongs to.
    pub tenant_id: TenantId,
    /// Zero-based index of the shard within the tenant.
    pub shard_number: ShardNumber,
    /// Shard count of the tenant (raw value; zero means "unsharded").
    pub shard_count: ShardCount,
}
|
|
|
|
impl ShardCount {
|
|
pub const MAX: Self = Self(u8::MAX);
|
|
|
|
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
|
|
/// legacy format for TenantShardId that excludes the shard suffix", also known
|
|
/// as `TenantShardId::unsharded`.
|
|
///
|
|
/// This method returns the actual number of shards, i.e. if our internal value is
|
|
/// zero, we return 1 (unsharded tenants have 1 shard).
|
|
pub fn count(&self) -> u8 {
|
|
if self.0 > 0 {
|
|
self.0
|
|
} else {
|
|
1
|
|
}
|
|
}
|
|
|
|
/// The literal internal value: this is **not** the number of shards in the
|
|
/// tenant, as we have a special zero value for legacy unsharded tenants. Use
|
|
/// [`Self::count`] if you want to know the cardinality of shards.
|
|
pub fn literal(&self) -> u8 {
|
|
self.0
|
|
}
|
|
|
|
///
|
|
pub fn is_unsharded(&self) -> bool {
|
|
self.0 == 0
|
|
}
|
|
|
|
/// `v` may be zero, or the number of shards in the tenant. `v` is what
|
|
/// [`Self::literal`] would return.
|
|
pub fn new(val: u8) -> Self {
|
|
Self(val)
|
|
}
|
|
}
|
|
|
|
impl ShardNumber {
|
|
pub const MAX: Self = Self(u8::MAX);
|
|
}
|
|
|
|
impl TenantShardId {
|
|
pub fn unsharded(tenant_id: TenantId) -> Self {
|
|
Self {
|
|
tenant_id,
|
|
shard_number: ShardNumber(0),
|
|
shard_count: ShardCount(0),
|
|
}
|
|
}
|
|
|
|
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
|
|
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
|
|
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
|
|
RangeInclusive::new(
|
|
Self {
|
|
tenant_id,
|
|
shard_number: ShardNumber(0),
|
|
shard_count: ShardCount(0),
|
|
},
|
|
Self {
|
|
tenant_id,
|
|
shard_number: ShardNumber::MAX,
|
|
shard_count: ShardCount::MAX,
|
|
},
|
|
)
|
|
}
|
|
|
|
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
|
|
ShardSlug(self)
|
|
}
|
|
|
|
/// Convenience for code that has special behavior on the 0th shard.
|
|
pub fn is_shard_zero(&self) -> bool {
|
|
self.shard_number == ShardNumber(0)
|
|
}
|
|
|
|
/// The "unsharded" value is distinct from simply having a single shard: it represents
|
|
/// a tenant which is not shard-aware at all, and whose storage paths will not include
|
|
/// a shard suffix.
|
|
pub fn is_unsharded(&self) -> bool {
|
|
self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
|
|
}
|
|
|
|
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
|
|
/// is useful when logging from code that is already in a span that includes tenant ID, to
|
|
/// keep messages reasonably terse.
|
|
pub fn to_index(&self) -> ShardIndex {
|
|
ShardIndex {
|
|
shard_number: self.shard_number,
|
|
shard_count: self.shard_count,
|
|
}
|
|
}
|
|
|
|
/// Calculate the children of this TenantShardId when splitting the overall tenant into
|
|
/// the given number of shards.
|
|
pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
|
|
let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
|
|
let mut child_shards = Vec::new();
|
|
for shard_number in 0..ShardNumber(new_shard_count.0).0 {
|
|
// Key mapping is based on a round robin mapping of key hash modulo shard count,
|
|
// so our child shards are the ones which the same keys would map to.
|
|
if shard_number % effective_old_shard_count == self.shard_number.0 {
|
|
child_shards.push(TenantShardId {
|
|
tenant_id: self.tenant_id,
|
|
shard_number: ShardNumber(shard_number),
|
|
shard_count: new_shard_count,
|
|
})
|
|
}
|
|
}
|
|
|
|
child_shards
|
|
}
|
|
}
|
|
|
|
impl<'a> std::fmt::Display for ShardSlug<'a> {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
write!(
|
|
f,
|
|
"{:02x}{:02x}",
|
|
self.0.shard_number.0, self.0.shard_count.0
|
|
)
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Display for TenantShardId {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
if self.shard_count != ShardCount(0) {
|
|
write!(f, "{}-{}", self.tenant_id, self.shard_slug())
|
|
} else {
|
|
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
|
|
// is distinct from the normal single shard case (shard count == 1).
|
|
self.tenant_id.fmt(f)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Debug for TenantShardId {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
// Debug is the same as Display: the compact hex representation
|
|
write!(f, "{}", self)
|
|
}
|
|
}
|
|
|
|
impl std::str::FromStr for TenantShardId {
|
|
type Err = hex::FromHexError;
|
|
|
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
// Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
|
|
if s.len() == 32 {
|
|
// Legacy case: no shard specified
|
|
Ok(Self {
|
|
tenant_id: TenantId::from_str(s)?,
|
|
shard_number: ShardNumber(0),
|
|
shard_count: ShardCount(0),
|
|
})
|
|
} else if s.len() == 37 {
|
|
let bytes = s.as_bytes();
|
|
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
|
|
let mut shard_parts: [u8; 2] = [0u8; 2];
|
|
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
|
|
Ok(Self {
|
|
tenant_id,
|
|
shard_number: ShardNumber(shard_parts[0]),
|
|
shard_count: ShardCount(shard_parts[1]),
|
|
})
|
|
} else {
|
|
Err(hex::FromHexError::InvalidStringLength)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<[u8; 18]> for TenantShardId {
|
|
fn from(b: [u8; 18]) -> Self {
|
|
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
|
|
|
|
Self {
|
|
tenant_id: TenantId::from(tenant_id_bytes),
|
|
shard_number: ShardNumber(b[16]),
|
|
shard_count: ShardCount(b[17]),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl ShardIndex {
|
|
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
|
|
Self {
|
|
shard_number: number,
|
|
shard_count: count,
|
|
}
|
|
}
|
|
pub fn unsharded() -> Self {
|
|
Self {
|
|
shard_number: ShardNumber(0),
|
|
shard_count: ShardCount(0),
|
|
}
|
|
}
|
|
|
|
/// The "unsharded" value is distinct from simply having a single shard: it represents
|
|
/// a tenant which is not shard-aware at all, and whose storage paths will not include
|
|
/// a shard suffix.
|
|
pub fn is_unsharded(&self) -> bool {
|
|
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
|
|
}
|
|
|
|
/// For use in constructing remote storage paths: concatenate this with a TenantId
|
|
/// to get a fully qualified TenantShardId.
|
|
///
|
|
/// Backward compat: this function returns an empty string if Self::is_unsharded, such
|
|
/// that the legacy pre-sharding remote key format is preserved.
|
|
pub fn get_suffix(&self) -> String {
|
|
if self.is_unsharded() {
|
|
"".to_string()
|
|
} else {
|
|
format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Display for ShardIndex {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Debug for ShardIndex {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
// Debug is the same as Display: the compact hex representation
|
|
write!(f, "{}", self)
|
|
}
|
|
}
|
|
|
|
impl std::str::FromStr for ShardIndex {
|
|
type Err = hex::FromHexError;
|
|
|
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
// Expect format: 1 byte shard number, 1 byte shard count
|
|
if s.len() == 4 {
|
|
let bytes = s.as_bytes();
|
|
let mut shard_parts: [u8; 2] = [0u8; 2];
|
|
hex::decode_to_slice(bytes, &mut shard_parts)?;
|
|
Ok(Self {
|
|
shard_number: ShardNumber(shard_parts[0]),
|
|
shard_count: ShardCount(shard_parts[1]),
|
|
})
|
|
} else {
|
|
Err(hex::FromHexError::InvalidStringLength)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl From<[u8; 2]> for ShardIndex {
|
|
fn from(b: [u8; 2]) -> Self {
|
|
Self {
|
|
shard_number: ShardNumber(b[0]),
|
|
shard_count: ShardCount(b[1]),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Serialize for TenantShardId {
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
where
|
|
S: serde::Serializer,
|
|
{
|
|
if serializer.is_human_readable() {
|
|
serializer.collect_str(self)
|
|
} else {
|
|
// Note: while human encoding of [`TenantShardId`] is backward and forward
|
|
// compatible, this binary encoding is not.
|
|
let mut packed: [u8; 18] = [0; 18];
|
|
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
|
|
packed[16] = self.shard_number.0;
|
|
packed[17] = self.shard_count.0;
|
|
|
|
packed.serialize(serializer)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'de> Deserialize<'de> for TenantShardId {
|
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
|
where
|
|
D: serde::Deserializer<'de>,
|
|
{
|
|
struct IdVisitor {
|
|
is_human_readable_deserializer: bool,
|
|
}
|
|
|
|
impl<'de> serde::de::Visitor<'de> for IdVisitor {
|
|
type Value = TenantShardId;
|
|
|
|
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
if self.is_human_readable_deserializer {
|
|
formatter.write_str("value in form of hex string")
|
|
} else {
|
|
formatter.write_str("value in form of integer array([u8; 18])")
|
|
}
|
|
}
|
|
|
|
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
|
|
where
|
|
A: serde::de::SeqAccess<'de>,
|
|
{
|
|
let s = serde::de::value::SeqAccessDeserializer::new(seq);
|
|
let id: [u8; 18] = Deserialize::deserialize(s)?;
|
|
Ok(TenantShardId::from(id))
|
|
}
|
|
|
|
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
|
where
|
|
E: serde::de::Error,
|
|
{
|
|
TenantShardId::from_str(v).map_err(E::custom)
|
|
}
|
|
}
|
|
|
|
if deserializer.is_human_readable() {
|
|
deserializer.deserialize_str(IdVisitor {
|
|
is_human_readable_deserializer: true,
|
|
})
|
|
} else {
|
|
deserializer.deserialize_tuple(
|
|
18,
|
|
IdVisitor {
|
|
is_human_readable_deserializer: false,
|
|
},
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Stripe size in number of pages.
///
/// When mapping a key to a shard, the block number is divided by this value before
/// hashing, so a run of this many consecutive blocks of a relation maps to one shard.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
|
|
|
|
/// Layout version: for future upgrades where we might change how the key->shard mapping works.
///
/// The wrapped value is private; the values used in this file are the `LAYOUT_V1` and
/// `LAYOUT_BROKEN` constants.
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardLayout(u8);
|
|
|
|
/// The layout version assigned to every usable (non-broken) ShardIdentity built in this file.
const LAYOUT_V1: ShardLayout = ShardLayout(1);
/// ShardIdentity uses a magic layout value to indicate if it is unusable
const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);

/// Default stripe size in pages: 256MiB divided by 8kiB page size.
const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
|
|
|
|
/// Reasons why `ShardIdentity::new` rejects a combination of parameters.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub enum ShardConfigError {
    /// The shard count was zero.
    #[error("Invalid shard count")]
    InvalidCount,
    /// The shard number was not smaller than the shard count.
    #[error("Invalid shard number")]
    InvalidNumber,
    /// The stripe size was zero.
    #[error("Invalid stripe size")]
    InvalidStripeSize,
}
|
|
|
|
impl ShardIdentity {
|
|
/// An identity with number=0 count=0 is a "none" identity, which represents legacy
|
|
/// tenants. Modern single-shard tenants should not use this: they should
|
|
/// have number=0 count=1.
|
|
pub fn unsharded() -> Self {
|
|
Self {
|
|
number: ShardNumber(0),
|
|
count: ShardCount(0),
|
|
layout: LAYOUT_V1,
|
|
stripe_size: DEFAULT_STRIPE_SIZE,
|
|
}
|
|
}
|
|
|
|
/// A broken instance of this type is only used for `TenantState::Broken` tenants,
|
|
/// which are constructed in code paths that don't have access to proper configuration.
|
|
///
|
|
/// A ShardIdentity in this state may not be used for anything, and should not be persisted.
|
|
/// Enforcement is via assertions, to avoid making our interface fallible for this
|
|
/// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
|
|
/// state, and by extension to avoid trying to do any page->shard resolution.
|
|
pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
|
|
Self {
|
|
number,
|
|
count,
|
|
layout: LAYOUT_BROKEN,
|
|
stripe_size: DEFAULT_STRIPE_SIZE,
|
|
}
|
|
}
|
|
|
|
/// The "unsharded" value is distinct from simply having a single shard: it represents
|
|
/// a tenant which is not shard-aware at all, and whose storage paths will not include
|
|
/// a shard suffix.
|
|
pub fn is_unsharded(&self) -> bool {
|
|
self.number == ShardNumber(0) && self.count == ShardCount(0)
|
|
}
|
|
|
|
/// Count must be nonzero, and number must be < count. To construct
|
|
/// the legacy case (count==0), use Self::unsharded instead.
|
|
pub fn new(
|
|
number: ShardNumber,
|
|
count: ShardCount,
|
|
stripe_size: ShardStripeSize,
|
|
) -> Result<Self, ShardConfigError> {
|
|
if count.0 == 0 {
|
|
Err(ShardConfigError::InvalidCount)
|
|
} else if number.0 > count.0 - 1 {
|
|
Err(ShardConfigError::InvalidNumber)
|
|
} else if stripe_size.0 == 0 {
|
|
Err(ShardConfigError::InvalidStripeSize)
|
|
} else {
|
|
Ok(Self {
|
|
number,
|
|
count,
|
|
layout: LAYOUT_V1,
|
|
stripe_size,
|
|
})
|
|
}
|
|
}
|
|
|
|
/// For use when creating ShardIdentity instances for new shards, where a creation request
|
|
/// specifies the ShardParameters that apply to all shards.
|
|
pub fn from_params(number: ShardNumber, params: &ShardParameters) -> Self {
|
|
Self {
|
|
number,
|
|
count: params.count,
|
|
layout: LAYOUT_V1,
|
|
stripe_size: params.stripe_size,
|
|
}
|
|
}
|
|
|
|
fn is_broken(&self) -> bool {
|
|
self.layout == LAYOUT_BROKEN
|
|
}
|
|
|
|
pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
|
|
assert!(!self.is_broken());
|
|
key_to_shard_number(self.count, self.stripe_size, key)
|
|
}
|
|
|
|
/// Return true if the key should be ingested by this shard
|
|
///
|
|
/// Shards must ingest _at least_ keys which return true from this check.
|
|
pub fn is_key_local(&self, key: &Key) -> bool {
|
|
assert!(!self.is_broken());
|
|
if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
|
|
true
|
|
} else {
|
|
key_to_shard_number(self.count, self.stripe_size, key) == self.number
|
|
}
|
|
}
|
|
|
|
/// Special case for issue `<https://github.com/neondatabase/neon/issues/7451>`
|
|
///
|
|
/// When we fail to read a forknum block, this function tells us whether we may ignore the error
|
|
/// as a symptom of that issue.
|
|
pub fn is_key_buggy_forknum(&self, key: &Key) -> bool {
|
|
if !is_rel_block_key(key) || key.field5 != INIT_FORKNUM {
|
|
return false;
|
|
}
|
|
|
|
let mut hash = murmurhash32(key.field4);
|
|
hash = hash_combine(hash, murmurhash32(key.field6 / self.stripe_size.0));
|
|
let mapped_shard = ShardNumber((hash % self.count.0 as u32) as u8);
|
|
|
|
// The key may be affected by issue #7454: it is an initfork and it would not
|
|
// have mapped to shard 0 until we fixed that issue.
|
|
mapped_shard != ShardNumber(0)
|
|
}
|
|
|
|
/// Return true if the key should be discarded if found in this shard's
|
|
/// data store, e.g. during compaction after a split.
|
|
///
|
|
/// Shards _may_ drop keys which return false here, but are not obliged to.
|
|
pub fn is_key_disposable(&self, key: &Key) -> bool {
|
|
if key_is_shard0(key) {
|
|
// Q: Why can't we dispose of shard0 content if we're not shard 0?
|
|
// A1: because the WAL ingestion logic currently ingests some shard 0
|
|
// content on all shards, even though it's only read on shard 0. If we
|
|
// dropped it, then subsequent WAL ingest to these keys would encounter
|
|
// an error.
|
|
// A2: because key_is_shard0 also covers relation size keys, which are written
|
|
// on all shards even though they're only maintained accurately on shard 0.
|
|
false
|
|
} else {
|
|
!self.is_key_local(key)
|
|
}
|
|
}
|
|
|
|
pub fn shard_slug(&self) -> String {
|
|
if self.count > ShardCount(0) {
|
|
format!("-{:02x}{:02x}", self.number.0, self.count.0)
|
|
} else {
|
|
String::new()
|
|
}
|
|
}
|
|
|
|
/// Convenience for checking if this identity is the 0th shard in a tenant,
|
|
/// for special cases on shard 0 such as ingesting relation sizes.
|
|
pub fn is_shard_zero(&self) -> bool {
|
|
self.number == ShardNumber(0)
|
|
}
|
|
}
|
|
|
|
impl Serialize for ShardIndex {
|
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
where
|
|
S: serde::Serializer,
|
|
{
|
|
if serializer.is_human_readable() {
|
|
serializer.collect_str(self)
|
|
} else {
|
|
// Binary encoding is not used in index_part.json, but is included in anticipation of
|
|
// switching various structures (e.g. inter-process communication, remote metadata) to more
|
|
// compact binary encodings in future.
|
|
let mut packed: [u8; 2] = [0; 2];
|
|
packed[0] = self.shard_number.0;
|
|
packed[1] = self.shard_count.0;
|
|
packed.serialize(serializer)
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<'de> Deserialize<'de> for ShardIndex {
|
|
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
|
where
|
|
D: serde::Deserializer<'de>,
|
|
{
|
|
struct IdVisitor {
|
|
is_human_readable_deserializer: bool,
|
|
}
|
|
|
|
impl<'de> serde::de::Visitor<'de> for IdVisitor {
|
|
type Value = ShardIndex;
|
|
|
|
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
|
if self.is_human_readable_deserializer {
|
|
formatter.write_str("value in form of hex string")
|
|
} else {
|
|
formatter.write_str("value in form of integer array([u8; 2])")
|
|
}
|
|
}
|
|
|
|
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
|
|
where
|
|
A: serde::de::SeqAccess<'de>,
|
|
{
|
|
let s = serde::de::value::SeqAccessDeserializer::new(seq);
|
|
let id: [u8; 2] = Deserialize::deserialize(s)?;
|
|
Ok(ShardIndex::from(id))
|
|
}
|
|
|
|
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
|
where
|
|
E: serde::de::Error,
|
|
{
|
|
ShardIndex::from_str(v).map_err(E::custom)
|
|
}
|
|
}
|
|
|
|
if deserializer.is_human_readable() {
|
|
deserializer.deserialize_str(IdVisitor {
|
|
is_human_readable_deserializer: true,
|
|
})
|
|
} else {
|
|
deserializer.deserialize_tuple(
|
|
2,
|
|
IdVisitor {
|
|
is_human_readable_deserializer: false,
|
|
},
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
|
|
/// in order to be able to serve basebackup requests without peer communication).
|
|
fn key_is_shard0(key: &Key) -> bool {
|
|
// To decide what to shard out to shards >0, we apply a simple rule that only
|
|
// relation pages are distributed to shards other than shard zero. Everything else gets
|
|
// stored on shard 0. This guarantees that shard 0 can independently serve basebackup
|
|
// requests, and any request other than those for particular blocks in relations.
|
|
//
|
|
// The only exception to this rule is "initfork" data -- this relates to postgres's UNLOGGED table
|
|
// type. These are special relations, usually with only 0 or 1 blocks, and we store them on shard 0
|
|
// because they must be included in basebackups.
|
|
let is_initfork = key.field5 == INIT_FORKNUM;
|
|
|
|
!is_rel_block_key(key) || is_initfork
|
|
}
|
|
|
|
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// Three xor-shift steps with two wrapping multiplications in between.
fn murmurhash32(h: u32) -> u32 {
    let h = (h ^ (h >> 16)).wrapping_mul(0x85ebca6b);
    let h = (h ^ (h >> 13)).wrapping_mul(0xc2b2ae35);
    h ^ (h >> 16)
}
|
|
|
|
/// Provide the same result as the function in postgres `hashfn.h` with the same name
///
/// Mixes `b` with a constant and two shifted copies of `a` (all additions wrap), then
/// xors the result into `a`.
fn hash_combine(a: u32, b: u32) -> u32 {
    let mixed = b
        .wrapping_add(0x9e3779b9)
        .wrapping_add(a << 6)
        .wrapping_add(a >> 2);

    a ^ mixed
}
|
|
|
|
/// Where a Key is to be distributed across shards, select the shard. This function
|
|
/// does not account for keys that should be broadcast across shards.
|
|
///
|
|
/// The hashing in this function must exactly match what we do in postgres smgr
|
|
/// code. The resulting distribution of pages is intended to preserve locality within
|
|
/// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
|
|
/// distributing data pseudo-randomly.
|
|
///
|
|
/// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
|
|
/// and will be handled at higher levels when shards are split.
|
|
fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
|
|
// Fast path for un-sharded tenants or broadcast keys
|
|
if count < ShardCount(2) || key_is_shard0(key) {
|
|
return ShardNumber(0);
|
|
}
|
|
|
|
// relNode
|
|
let mut hash = murmurhash32(key.field4);
|
|
// blockNum/stripe size
|
|
hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
|
|
|
|
ShardNumber((hash % count.0 as u32) as u8)
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use utils::Hex;

    use super::*;

    const EXAMPLE_TENANT_ID: &str = "1f359dd625e519a1a4e8d7509690f6fc";

    /// Parse [`EXAMPLE_TENANT_ID`] into a `TenantId`.
    fn example_tenant_id() -> TenantId {
        TenantId::from_str(EXAMPLE_TENANT_ID).unwrap()
    }

    /// Shorthand for building a `TenantShardId` from raw shard number and count.
    fn shard(tenant_id: TenantId, number: u8, count: u8) -> TenantShardId {
        TenantShardId {
            tenant_id,
            shard_count: ShardCount(count),
            shard_number: ShardNumber(number),
        }
    }

    #[test]
    fn tenant_shard_id_string() -> Result<(), hex::FromHexError> {
        let original = shard(example_tenant_id(), 7, 10);

        let rendered = original.to_string();
        let expected = format!("{EXAMPLE_TENANT_ID}-070a");
        assert_eq!(&rendered, &expected);

        let parsed = TenantShardId::from_str(&rendered)?;
        assert_eq!(original, parsed);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_binary() -> Result<(), hex::FromHexError> {
        let original = shard(example_tenant_id(), 7, 10);

        let packed = bincode::serialize(&original).unwrap();
        let expected: [u8; 18] = [
            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
            0xf6, 0xfc, 0x07, 0x0a,
        ];
        assert_eq!(Hex(&packed), Hex(&expected));

        let unpacked: TenantShardId = bincode::deserialize(&packed).unwrap();
        assert_eq!(original, unpacked);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_backward_compat() -> Result<(), hex::FromHexError> {
        // Test that TenantShardId can decode a TenantId in human
        // readable form
        let tenant_id = example_tenant_id();
        let rendered = tenant_id.to_string();
        assert_eq!(&rendered, EXAMPLE_TENANT_ID);

        let parsed = TenantShardId::from_str(&rendered)?;
        assert_eq!(tenant_id, parsed.tenant_id);
        assert_eq!(parsed.shard_count, ShardCount(0));
        assert_eq!(parsed.shard_number, ShardNumber(0));

        Ok(())
    }

    #[test]
    fn tenant_shard_id_forward_compat() -> Result<(), hex::FromHexError> {
        // Test that a legacy TenantShardId encodes into a form that
        // can be decoded as TenantId
        let tenant_id = example_tenant_id();
        let legacy = TenantShardId::unsharded(tenant_id);

        let rendered = legacy.to_string();
        assert_eq!(&rendered, EXAMPLE_TENANT_ID);

        let parsed = TenantId::from_str(&rendered)?;
        assert_eq!(tenant_id, parsed);

        Ok(())
    }

    #[test]
    fn tenant_shard_id_legacy_binary() -> Result<(), hex::FromHexError> {
        // Unlike in human readable encoding, binary encoding does not
        // do any special handling of legacy unsharded TenantIds: this test
        // is equivalent to the main test for binary encoding, just verifying
        // that the same behavior applies when we have used `unsharded()` to
        // construct a TenantShardId.
        let legacy = TenantShardId::unsharded(example_tenant_id());
        let packed = bincode::serialize(&legacy).unwrap();

        let expected: [u8; 18] = [
            0x1f, 0x35, 0x9d, 0xd6, 0x25, 0xe5, 0x19, 0xa1, 0xa4, 0xe8, 0xd7, 0x50, 0x96, 0x90,
            0xf6, 0xfc, 0x00, 0x00,
        ];
        assert_eq!(Hex(&packed), Hex(&expected));

        let unpacked = bincode::deserialize::<TenantShardId>(&packed).unwrap();
        assert_eq!(legacy, unpacked);

        Ok(())
    }

    #[test]
    fn shard_identity_validation() -> Result<(), ShardConfigError> {
        // Happy cases
        let accepted = [
            (0, 1, DEFAULT_STRIPE_SIZE),
            (0, 1, ShardStripeSize(1)),
            (254, 255, ShardStripeSize(1)),
        ];
        for (number, count, stripe_size) in accepted {
            ShardIdentity::new(ShardNumber(number), ShardCount(count), stripe_size)?;
        }

        // Rejected parameter combinations and the error each one must produce
        let rejected = [
            (0, 0, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidCount),
            (10, 10, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (11, 10, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (255, 255, DEFAULT_STRIPE_SIZE, ShardConfigError::InvalidNumber),
            (0, 1, ShardStripeSize(0), ShardConfigError::InvalidStripeSize),
        ];
        for (number, count, stripe_size, expected_error) in rejected {
            assert_eq!(
                ShardIdentity::new(ShardNumber(number), ShardCount(count), stripe_size),
                Err(expected_error)
            );
        }

        Ok(())
    }

    #[test]
    fn shard_index_human_encoding() -> Result<(), hex::FromHexError> {
        let original = ShardIndex::new(ShardNumber(13), ShardCount(17));

        let rendered = original.to_string();
        let expected: String = "0d11".to_string();
        assert_eq!(&rendered, &expected);

        let parsed = ShardIndex::from_str(&rendered)?;
        assert_eq!(original, parsed);
        Ok(())
    }

    #[test]
    fn shard_index_binary_encoding() -> Result<(), hex::FromHexError> {
        let original = ShardIndex::new(ShardNumber(13), ShardCount(17));
        let expected: [u8; 2] = [0x0d, 0x11];

        let packed = bincode::serialize(&original).unwrap();
        assert_eq!(Hex(&packed), Hex(&expected));

        let unpacked: ShardIndex = bincode::deserialize(&packed).unwrap();
        assert_eq!(original, unpacked);

        Ok(())
    }

    // These are only smoke tests to spot check that our implementation doesn't
    // deviate from a few examples values: not aiming to validate the overall
    // hashing algorithm.
    #[test]
    fn murmur_hash() {
        assert_eq!(murmurhash32(0), 0);

        assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
    }

    #[test]
    fn shard_mapping() {
        let key = Key {
            field1: 0x00,
            field2: 0x67f,
            field3: 0x5,
            field4: 0x400c,
            field5: 0x00,
            field6: 0x7d06,
        };

        let mapped = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
        assert_eq!(mapped, ShardNumber(8));
    }

    #[test]
    fn shard_id_split() {
        let tenant_id = TenantId::generate();

        // Expected children: the given shard numbers, all carrying the new shard count.
        let children = |count: u8, numbers: &[u8]| -> Vec<TenantShardId> {
            numbers
                .iter()
                .map(|&number| shard(tenant_id, number, count))
                .collect()
        };

        let parent = TenantShardId::unsharded(tenant_id);

        // Unsharded into 2
        assert_eq!(parent.split(ShardCount(2)), children(2, &[0, 1]));

        // Unsharded into 4
        assert_eq!(parent.split(ShardCount(4)), children(4, &[0, 1, 2, 3]));

        // count=1 into 2 (check this works the same as unsharded.)
        let parent = shard(tenant_id, 0, 1);
        assert_eq!(parent.split(ShardCount(2)), children(2, &[0, 1]));

        // count=2 into count=8
        let parent = shard(tenant_id, 1, 2);
        assert_eq!(parent.split(ShardCount(8)), children(8, &[1, 3, 5, 7]));
    }
}
|