Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-05 20:42:54 +00:00
pageserver: add Key->Shard mapping logic & use it in page service (#5980)
## Problem

When a pageserver receives a page service request identified by TenantId, it must decide which `Tenant` object to route it to.

As in earlier PRs, this is all a no-op for tenants with a single shard: calls to `is_key_local` always return true without doing any hashing on a single-shard ShardIdentity.

Closes: https://github.com/neondatabase/neon/issues/6026

## Summary of changes

- Carry immutable `ShardIdentity` objects in Tenant and Timeline. These provide the information that Tenants/Timelines need to figure out which shard is responsible for which Key.
- Augment `get_active_tenant_with_timeout` to take a `ShardSelector` specifying how the shard should be resolved for this tenant. The mode depends on the kind of request (e.g. basebackups always go to shard zero).
- In `handle_get_page_at_lsn_request`, handle the case where the Timeline we looked up at connection time is not the correct shard for the page being requested. This can happen whenever one node holds multiple shards for the same tenant. It is currently written as a "slow path", with the optimistic expectation that we will usually run with one shard per pageserver, so the Timeline resolved at connection time will be the one serving page requests. There is scope for later optimization here, to avoid doing the full shard lookup for each page.
- Omit consumption metrics from nonzero shards: only the 0th shard is responsible for tracking accurate relation sizes.

Note to reviewers:

- Testing of these changes is happening separately on the `jcsp/sharding-pt1` branch, where we have hacked neon_local etc. as needed to run test_pg_regress.
- The main caveat to this implementation is that page service connections still look up one Timeline when the connection is opened, before they know which pages are going to be read. If there is one shard per pageserver, this will always also be the Timeline that serves page requests. However, if multiple shards are on one pageserver, then get page requests will incur the cost of looking up the correct Timeline on each getpage request. We may improve this in future with a "sticky" timeline per connection handler, so that subsequent requests for the same Timeline don't have to look it up again, and/or by having postgres pass a shard hint when connecting. This is tracked in the "Loose ends" section of https://github.com/neondatabase/neon/issues/5507
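For orientation, the sketch below models the routing decision described above with simplified stand-in types. `Key`, `ShardIdentity` and the hash are illustrative only; the real definitions live in `pageserver_api::shard`/`pageserver_api::key` and the `ShardSelector` in `pageserver::tenant::mgr`, all shown in the diff further down. The connection-time shard serves the page if `is_key_local` says so; otherwise the handler re-resolves the owning shard, which in the real code is the `ShardSelector::Page(key)` lookup.

```rust
// Illustrative stand-ins only; not the real pageserver types (see the diff below).
#[derive(Clone, Copy, Debug, PartialEq)]
struct ShardNumber(u8);
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
struct ShardCount(u8);
#[derive(Clone, Copy, Debug)]
struct Key {
    rel: u32,
    block: u32,
}

struct ShardIdentity {
    number: ShardNumber,
    count: ShardCount,
    stripe_size: u32,
}

impl ShardIdentity {
    // Stand-in for the murmur-based key_to_shard_number introduced by this PR.
    fn get_shard_number(&self, key: &Key) -> ShardNumber {
        let stripe = key.block / self.stripe_size;
        ShardNumber(((key.rel ^ stripe) % self.count.0 as u32) as u8)
    }

    // Fast path: single-shard tenants never hash at all.
    fn is_key_local(&self, key: &Key) -> bool {
        self.count < ShardCount(2) || self.get_shard_number(key) == self.number
    }
}

// What the page service does per getpage request.
enum Route {
    ServeLocally,
    ReResolve(ShardNumber), // i.e. look the Timeline up again with ShardSelector::Page(key)
}

fn route_getpage(conn_shard: &ShardIdentity, key: &Key) -> Route {
    if conn_shard.is_key_local(key) {
        Route::ServeLocally
    } else {
        Route::ReResolve(conn_shard.get_shard_number(key))
    }
}

fn main() {
    let conn_shard = ShardIdentity {
        number: ShardNumber(0),
        count: ShardCount(4),
        stripe_size: 32768,
    };
    match route_getpage(&conn_shard, &Key { rel: 16384, block: 1 }) {
        Route::ServeLocally => println!("fast path: serve from the connection-time shard"),
        Route::ReResolve(shard) => println!("slow path: re-resolve, key maps to {shard:?}"),
    }
}
```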
@@ -140,3 +140,7 @@ impl Key {
         })
     }
 }
+
+pub fn is_rel_block_key(key: &Key) -> bool {
+    key.field1 == 0x00 && key.field4 != 0
+}
@@ -1,5 +1,6 @@
 use std::{ops::RangeInclusive, str::FromStr};

+use crate::key::{is_rel_block_key, Key};
 use hex::FromHex;
 use serde::{Deserialize, Serialize};
 use thiserror;
@@ -302,6 +303,8 @@ pub struct ShardStripeSize(pub u32);
 pub struct ShardLayout(u8);

 const LAYOUT_V1: ShardLayout = ShardLayout(1);
+/// ShardIdentity uses a magic layout value to indicate if it is unusable
+const LAYOUT_BROKEN: ShardLayout = ShardLayout(255);

 /// Default stripe size in pages: 256MiB divided by 8kiB page size.
 const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
@@ -310,10 +313,10 @@ const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8);
 /// to resolve a key to a shard, and then check whether that shard is ==self.
 #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
 pub struct ShardIdentity {
-    pub layout: ShardLayout,
     pub number: ShardNumber,
     pub count: ShardCount,
-    pub stripe_size: ShardStripeSize,
+    stripe_size: ShardStripeSize,
+    layout: ShardLayout,
 }

 #[derive(thiserror::Error, Debug, PartialEq, Eq)]
@@ -339,6 +342,22 @@ impl ShardIdentity {
         }
     }

+    /// A broken instance of this type is only used for `TenantState::Broken` tenants,
+    /// which are constructed in code paths that don't have access to proper configuration.
+    ///
+    /// A ShardIdentity in this state may not be used for anything, and should not be persisted.
+    /// Enforcement is via assertions, to avoid making our interface fallible for this
+    /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken
+    /// state, and by extension to avoid trying to do any page->shard resolution.
+    pub fn broken(number: ShardNumber, count: ShardCount) -> Self {
+        Self {
+            number,
+            count,
+            layout: LAYOUT_BROKEN,
+            stripe_size: DEFAULT_STRIPE_SIZE,
+        }
+    }
+
     pub fn is_unsharded(&self) -> bool {
         self.number == ShardNumber(0) && self.count == ShardCount(0)
     }
@@ -365,6 +384,33 @@ impl ShardIdentity {
             })
         }
     }

+    fn is_broken(&self) -> bool {
+        self.layout == LAYOUT_BROKEN
+    }
+
+    pub fn get_shard_number(&self, key: &Key) -> ShardNumber {
+        assert!(!self.is_broken());
+        key_to_shard_number(self.count, self.stripe_size, key)
+    }
+
+    /// Return true if the key should be ingested by this shard
+    pub fn is_key_local(&self, key: &Key) -> bool {
+        assert!(!self.is_broken());
+        if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) {
+            true
+        } else {
+            key_to_shard_number(self.count, self.stripe_size, key) == self.number
+        }
+    }
+
+    pub fn shard_slug(&self) -> String {
+        if self.count > ShardCount(0) {
+            format!("-{:02x}{:02x}", self.number.0, self.count.0)
+        } else {
+            String::new()
+        }
+    }
 }

 impl Serialize for ShardIndex {
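A quick worked example of `shard_slug` above: each component is rendered as two lowercase hex digits, so shard 2 of a tenant with shard count 8 gets the suffix "-0208", while an unsharded identity (count 0) yields an empty string, which presumably keeps existing single-shard names unchanged. The snippet below exercises just the `format!` expression from the code above; it is not part of the diff.

```rust
// Checks only the formatting expression used by shard_slug above.
fn main() {
    let (number, count) = (2u8, 8u8);
    assert_eq!(format!("-{:02x}{:02x}", number, count), "-0208");
    // With ShardCount(0) (unsharded), shard_slug returns String::new(), i.e. "".
}
```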
@@ -438,6 +484,65 @@ impl<'de> Deserialize<'de> for ShardIndex {
     }
 }

+/// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
+/// in order to be able to serve basebackup requests without peer communication).
+fn key_is_shard0(key: &Key) -> bool {
+    // To decide what to shard out to shards >0, we apply a simple rule that only
+    // relation pages are distributed to shards other than shard zero. Everything else gets
+    // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
+    // requests, and any request other than those for particular blocks in relations.
+    //
+    // In this condition:
+    // - is_rel_block_key includes only relations, i.e. excludes SLRU data and
+    //   all metadata.
+    // - field6 is set to -1 for relation size pages.
+    !(is_rel_block_key(key) && key.field6 != 0xffffffff)
+}
+
+/// Provide the same result as the function in postgres `hashfn.h` with the same name
+fn murmurhash32(mut h: u32) -> u32 {
+    h ^= h >> 16;
+    h = h.wrapping_mul(0x85ebca6b);
+    h ^= h >> 13;
+    h = h.wrapping_mul(0xc2b2ae35);
+    h ^= h >> 16;
+    h
+}
+
+/// Provide the same result as the function in postgres `hashfn.h` with the same name
+fn hash_combine(mut a: u32, mut b: u32) -> u32 {
+    b = b.wrapping_add(0x9e3779b9);
+    b = b.wrapping_add(a << 6);
+    b = b.wrapping_add(a >> 2);
+
+    a ^= b;
+    a
+}
+
+/// Where a Key is to be distributed across shards, select the shard. This function
+/// does not account for keys that should be broadcast across shards.
+///
+/// The hashing in this function must exactly match what we do in postgres smgr
+/// code. The resulting distribution of pages is intended to preserve locality within
+/// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
+/// distributing data pseudo-randomly.
+///
+/// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
+/// and will be handled at higher levels when shards are split.
+fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
+    // Fast path for un-sharded tenants or broadcast keys
+    if count < ShardCount(2) || key_is_shard0(key) {
+        return ShardNumber(0);
+    }
+
+    // relNode
+    let mut hash = murmurhash32(key.field4);
+    // blockNum/stripe size
+    hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
+
+    ShardNumber((hash % count.0 as u32) as u8)
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
@@ -609,4 +714,29 @@ mod tests {

         Ok(())
     }
+
+    // These are only smoke tests to spot check that our implementation doesn't
+    // deviate from a few examples values: not aiming to validate the overall
+    // hashing algorithm.
+    #[test]
+    fn murmur_hash() {
+        assert_eq!(murmurhash32(0), 0);
+
+        assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9);
+    }
+
+    #[test]
+    fn shard_mapping() {
+        let key = Key {
+            field1: 0x00,
+            field2: 0x67f,
+            field3: 0x5,
+            field4: 0x400c,
+            field5: 0x00,
+            field6: 0x7d06,
+        };
+
+        let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key);
+        assert_eq!(shard, ShardNumber(8));
+    }
 }
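For intuition on the stripe arithmetic above: `DEFAULT_STRIPE_SIZE` is 256 * 1024 / 8 = 32768 pages, i.e. 32768 × 8 KiB = 256 MiB. Because `key_to_shard_number` hashes `field6 / stripe_size`, any two blocks of the same relation that fall inside the same 32768-block stripe hash identically and therefore land on the same shard. A hypothetical extra test in the same `tests` module (not part of this diff, and relying on the items defined above) could spell that out:

```rust
    // Hypothetical addition to the tests module above: blocks within one stripe of a
    // relation always map to the same shard, which is the locality property described
    // in the doc comment on key_to_shard_number.
    #[test]
    fn shard_stripe_locality() {
        let make_key = |blkno: u32| Key {
            field1: 0x00,
            field2: 0x67f,
            field3: 0x5,
            field4: 0x400c,
            field5: 0x00,
            field6: blkno,
        };

        // DEFAULT_STRIPE_SIZE is 32768, so blocks 0 and 32767 share stripe 0
        // (32767 / 32768 == 0) and must resolve to the same shard.
        let first = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &make_key(0));
        let last = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &make_key(32767));
        assert_eq!(first, last);
    }
```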
@@ -2,6 +2,7 @@ use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogi
 use chrono::{DateTime, Utc};
 use consumption_metrics::EventType;
 use futures::stream::StreamExt;
+use pageserver_api::shard::ShardNumber;
 use std::{sync::Arc, time::SystemTime};
 use utils::{
     id::{TenantId, TimelineId},
@@ -228,6 +229,11 @@ where
     while let Some((tenant_id, tenant)) = tenants.next().await {
         let mut tenant_resident_size = 0;

+        // Sharded tenants report all consumption metrics from shard zero
+        if tenant.tenant_shard_id().shard_number != ShardNumber(0) {
+            continue;
+        }
+
         for timeline in tenant.list_timelines() {
             let timeline_id = timeline.timeline_id;

@@ -53,12 +53,14 @@ use crate::context::{DownloadBehavior, RequestContext};
 use crate::import_datadir::import_wal_from_tar;
 use crate::metrics;
 use crate::metrics::LIVE_CONNECTIONS_COUNT;
+use crate::pgdatadir_mapping::rel_block_to_key;
 use crate::task_mgr;
 use crate::task_mgr::TaskKind;
 use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
 use crate::tenant::mgr;
 use crate::tenant::mgr::get_active_tenant_with_timeout;
 use crate::tenant::mgr::GetActiveTenantError;
+use crate::tenant::mgr::ShardSelector;
 use crate::tenant::Timeline;
 use crate::trace::Tracer;

@@ -399,16 +401,19 @@ impl PageServerHandler {
     {
         debug_assert_current_span_has_tenant_and_timeline_id();

-        // TODO(sharding): enumerate local tenant shards for this tenant, and select the one
-        // that should serve this request.
-
-        // Make request tracer if needed
+        // Note that since one connection may contain getpage requests that target different
+        // shards (e.g. during splitting when the compute is not yet aware of the split), the tenant
+        // that we look up here may not be the one that serves all the actual requests: we will double
+        // check the mapping of key->shard later before calling into Timeline for getpage requests.
         let tenant = mgr::get_active_tenant_with_timeout(
             tenant_id,
+            ShardSelector::First,
             ACTIVE_TENANT_TIMEOUT,
             &task_mgr::shutdown_token(),
         )
         .await?;

+        // Make request tracer if needed
         let mut tracer = if tenant.get_trace_read_requests() {
             let connection_id = ConnectionId::generate();
             let path =
@@ -566,6 +571,7 @@ impl PageServerHandler {
         info!("creating new timeline");
         let tenant = get_active_tenant_with_timeout(
             tenant_id,
+            ShardSelector::Zero,
             ACTIVE_TENANT_TIMEOUT,
             &task_mgr::shutdown_token(),
         )
@@ -628,7 +634,7 @@ impl PageServerHandler {
         debug_assert_current_span_has_tenant_and_timeline_id();

         let timeline = self
-            .get_active_tenant_timeline(tenant_id, timeline_id)
+            .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
             .await?;
         let last_record_lsn = timeline.get_last_record_lsn();
         if last_record_lsn != start_lsn {
@@ -807,9 +813,49 @@ impl PageServerHandler {
         }
         */

-        let page = timeline
-            .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
-            .await?;
+        let key = rel_block_to_key(req.rel, req.blkno);
+        let page = if timeline.get_shard_identity().is_key_local(&key) {
+            timeline
+                .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
+                .await?
+        } else {
+            // The Tenant shard we looked up at connection start does not hold this particular
+            // key: look for other shards in this tenant. This scenario occurs if a pageserver
+            // has multiple shards for the same tenant.
+            //
+            // TODO: optimize this (https://github.com/neondatabase/neon/pull/6037)
+            let timeline = match self
+                .get_active_tenant_timeline(
+                    timeline.tenant_shard_id.tenant_id,
+                    timeline.timeline_id,
+                    ShardSelector::Page(key),
+                )
+                .await
+            {
+                Ok(t) => t,
+                Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
+                    // We already know this tenant exists in general, because we resolved it at
+                    // start of connection. Getting a NotFound here indicates that the shard containing
+                    // the requested page is not present on this node.
+
+                    // TODO: this should be some kind of structured error that the client will understand,
+                    // so that it can block until its config is updated: this error is expected in the case
+                    // that the Tenant's shards' placements are being updated and the client hasn't been
+                    // informed yet.
+                    //
+                    // https://github.com/neondatabase/neon/issues/6038
+                    return Err(anyhow::anyhow!("Request routed to wrong shard"));
+                }
+                Err(e) => return Err(e.into()),
+            };
+
+            // Take a GateGuard for the duration of this request. If we were using our main Timeline object,
+            // the GateGuard was already held over the whole connection.
+            let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
+            timeline
+                .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
+                .await?
+        };

         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
             page,
@@ -838,7 +884,7 @@ impl PageServerHandler {

         // check that the timeline exists
         let timeline = self
-            .get_active_tenant_timeline(tenant_id, timeline_id)
+            .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
             .await?;
         let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         if let Some(lsn) = lsn {
@@ -944,9 +990,11 @@ impl PageServerHandler {
         &self,
         tenant_id: TenantId,
         timeline_id: TimelineId,
+        selector: ShardSelector,
     ) -> Result<Arc<Timeline>, GetActiveTimelineError> {
         let tenant = get_active_tenant_with_timeout(
             tenant_id,
+            selector,
             ACTIVE_TENANT_TIMEOUT,
             &task_mgr::shutdown_token(),
         )
@@ -1120,7 +1168,7 @@ where

         self.check_permission(Some(tenant_id))?;
         let timeline = self
-            .get_active_tenant_timeline(tenant_id, timeline_id)
+            .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
             .await?;

         let end_of_timeline = timeline.get_last_record_rlsn();
@@ -1307,6 +1355,7 @@ where

         let tenant = get_active_tenant_with_timeout(
             tenant_id,
+            ShardSelector::Zero,
             ACTIVE_TENANT_TIMEOUT,
             &task_mgr::shutdown_token(),
         )
@@ -13,6 +13,7 @@ use crate::repository::*;
 use crate::walrecord::NeonWalRecord;
 use anyhow::Context;
 use bytes::{Buf, Bytes};
+use pageserver_api::key::is_rel_block_key;
 use pageserver_api::reltag::{RelTag, SlruKind};
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::BLCKSZ;
@@ -1322,7 +1323,7 @@ impl<'a> DatadirModification<'a> {
         // Flush relation and SLRU data blocks, keep metadata.
         let mut retained_pending_updates = HashMap::new();
         for (key, value) in self.pending_updates.drain() {
-            if is_rel_block_key(key) || is_slru_block_key(key) {
+            if is_rel_block_key(&key) || is_slru_block_key(key) {
                 // This bails out on first error without modifying pending_updates.
                 // That's Ok, cf this function's doc comment.
                 writer.put(key, self.lsn, &value, ctx).await?;
@@ -1578,7 +1579,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
     }
 }

-fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
+pub(crate) fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
     Key {
         field1: 0x00,
         field2: rel.spcnode,
@@ -1777,10 +1778,6 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> {
     })
 }

-fn is_rel_block_key(key: Key) -> bool {
-    key.field1 == 0x00 && key.field4 != 0
-}
-
 pub fn is_rel_fsm_block_key(key: Key) -> bool {
     key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff
 }
@@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered;
 use futures::FutureExt;
 use futures::StreamExt;
 use pageserver_api::models::TimelineState;
+use pageserver_api::shard::ShardIdentity;
 use pageserver_api::shard::TenantShardId;
 use remote_storage::DownloadError;
 use remote_storage::GenericRemoteStorage;
@@ -236,6 +237,9 @@ pub struct Tenant {

     tenant_shard_id: TenantShardId,

+    // The detailed sharding information, beyond the number/count in tenant_shard_id
+    shard_identity: ShardIdentity,
+
     /// The remote storage generation, used to protect S3 objects from split-brain.
     /// Does not change over the lifetime of the [`Tenant`] object.
     ///
@@ -568,6 +572,7 @@ impl Tenant {
         tenant_shard_id: TenantShardId,
         resources: TenantSharedResources,
         attached_conf: AttachedTenantConf,
+        shard_identity: ShardIdentity,
         init_order: Option<InitializationOrder>,
         tenants: &'static std::sync::RwLock<TenantsMap>,
         mode: SpawnMode,
@@ -589,6 +594,7 @@ impl Tenant {
             TenantState::Attaching,
             conf,
             attached_conf,
+            shard_identity,
             wal_redo_manager,
             tenant_shard_id,
             remote_storage.clone(),
@@ -1040,6 +1046,9 @@ impl Tenant {
             },
             conf,
             AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
+            // Shard identity isn't meaningful for a broken tenant: it's just a placeholder
+            // to occupy the slot for this TenantShardId.
+            ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
             wal_redo_manager,
             tenant_shard_id,
             None,
@@ -2304,6 +2313,7 @@ impl Tenant {
             new_timeline_id,
             self.tenant_shard_id,
             self.generation,
+            self.shard_identity,
             Arc::clone(&self.walredo_mgr),
             resources,
             pg_version,
@@ -2321,6 +2331,7 @@ impl Tenant {
         state: TenantState,
         conf: &'static PageServerConf,
         attached_conf: AttachedTenantConf,
+        shard_identity: ShardIdentity,
         walredo_mgr: Arc<WalRedoManager>,
         tenant_shard_id: TenantShardId,
         remote_storage: Option<GenericRemoteStorage>,
@@ -2382,6 +2393,7 @@ impl Tenant {

         Tenant {
             tenant_shard_id,
+            shard_identity,
             generation: attached_conf.location.generation,
             conf,
             // using now here is good enough approximation to catch tenants with really long
@@ -3793,6 +3805,8 @@ pub(crate) mod harness {
                 self.generation,
             ))
             .unwrap(),
+            // This is a legacy/test code path: sharding isn't supported here.
+            ShardIdentity::unsharded(),
             walredo_mgr,
             self.tenant_shard_id,
             Some(self.remote_storage.clone()),
@@ -2,7 +2,8 @@
 //! page server.

 use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
-use pageserver_api::shard::TenantShardId;
+use pageserver_api::key::Key;
+use pageserver_api::shard::{ShardIdentity, ShardNumber, TenantShardId};
 use rand::{distributions::Alphanumeric, Rng};
 use std::borrow::Cow;
 use std::collections::{BTreeMap, HashMap};
@@ -130,6 +131,18 @@ pub(crate) enum TenantsMapRemoveResult {
     InProgress(utils::completion::Barrier),
 }

+/// When resolving a TenantId to a shard, we may be looking for the 0th
+/// shard, or we might be looking for whichever shard holds a particular page.
+pub(crate) enum ShardSelector {
+    /// Only return the 0th shard, if it is present. If a non-0th shard is present,
+    /// ignore it.
+    Zero,
+    /// Pick the first shard we find for the TenantId
+    First,
+    /// Pick the shard that holds this key
+    Page(Key),
+}
+
 impl TenantsMap {
     /// Convenience function for typical usage, where we want to get a `Tenant` object, for
     /// working with attached tenants. If the TenantId is in the map but in Secondary state,
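To make the intended use of the three variants concrete, the hypothetical helper below shows how the call sites touched by this PR choose a selector. `RequestKind` is an invented enum for illustration only; the real callers (shown earlier in page_service.rs) pass the selector directly to `get_active_tenant_with_timeout` / `get_active_tenant_timeline`.

```rust
// Illustration only: RequestKind is a made-up enum, not part of the diff.
enum RequestKind {
    // basebackup and timeline management must see shard 0, which holds all
    // non-relation keys (SLRUs, metadata, relation sizes).
    Basebackup,
    // Opening a pagestream connection: any local shard of the tenant will do,
    // because each getpage request re-checks key ownership anyway.
    PagestreamConnect,
    // An individual getpage request: route to whichever shard owns the key.
    GetPage(Key),
}

fn selector_for(kind: RequestKind) -> ShardSelector {
    match kind {
        RequestKind::Basebackup => ShardSelector::Zero,
        RequestKind::PagestreamConnect => ShardSelector::First,
        RequestKind::GetPage(key) => ShardSelector::Page(key),
    }
}
```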
@@ -144,6 +157,49 @@ impl TenantsMap {
         }
     }

+    /// A page service client sends a TenantId, and to look up the correct Tenant we must
+    /// resolve this to a fully qualified TenantShardId.
+    fn resolve_shard(
+        &self,
+        tenant_id: &TenantId,
+        selector: ShardSelector,
+    ) -> Option<TenantShardId> {
+        let mut want_shard = None;
+        match self {
+            TenantsMap::Initializing => None,
+            TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
+                for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
+                    match selector {
+                        ShardSelector::First => return Some(*slot.0),
+                        ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => {
+                            return Some(*slot.0)
+                        }
+                        ShardSelector::Page(key) => {
+                            if let Some(tenant) = slot.1.get_attached() {
+                                // First slot we see for this tenant, calculate the expected shard number
+                                // for the key: we will use this for checking if this and subsequent
+                                // slots contain the key, rather than recalculating the hash each time.
+                                if want_shard.is_none() {
+                                    want_shard = Some(tenant.shard_identity.get_shard_number(&key));
+                                }
+
+                                if Some(tenant.shard_identity.number) == want_shard {
+                                    return Some(*slot.0);
+                                }
+                            } else {
+                                continue;
+                            }
+                        }
+                        _ => continue,
+                    }
+                }

+                // Fall through: we didn't find an acceptable shard
+                None
+            }
+        }
+    }
+
     /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map.
     ///
     /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded
@@ -515,12 +571,14 @@ pub async fn init_tenant_mgr(
                 location_conf.attach_in_generation(generation);
                 Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;

+                let shard_identity = location_conf.shard;
                 match tenant_spawn(
                     conf,
                     tenant_shard_id,
                     &tenant_dir_path,
                     resources.clone(),
                     AttachedTenantConf::try_from(location_conf)?,
+                    shard_identity,
                     Some(init_order.clone()),
                     &TENANTS,
                     SpawnMode::Normal,
@@ -561,6 +619,7 @@ pub(crate) fn tenant_spawn(
     tenant_path: &Utf8Path,
     resources: TenantSharedResources,
     location_conf: AttachedTenantConf,
+    shard_identity: ShardIdentity,
     init_order: Option<InitializationOrder>,
     tenants: &'static std::sync::RwLock<TenantsMap>,
     mode: SpawnMode,
@@ -593,6 +652,7 @@ pub(crate) fn tenant_spawn(
         tenant_shard_id,
         resources,
         location_conf,
+        shard_identity,
         init_order,
         tenants,
         mode,
@@ -762,12 +822,14 @@ pub(crate) async fn create_tenant(
         tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?;
     let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_shard_id).await?;

+    let shard_identity = location_conf.shard;
    let created_tenant = tenant_spawn(
        conf,
        tenant_shard_id,
        &tenant_path,
        resources,
        AttachedTenantConf::try_from(location_conf)?,
+        shard_identity,
        None,
        &TENANTS,
        SpawnMode::Create,
@@ -860,6 +922,7 @@ impl TenantManager {
         Ok(())
     }

+    #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
     pub(crate) async fn upsert_location(
         &self,
         tenant_shard_id: TenantShardId,
@@ -996,12 +1059,14 @@ impl TenantManager {
             .await
             .map_err(SetNewTenantConfigError::Persist)?;

+            let shard_identity = new_location_config.shard;
             let tenant = tenant_spawn(
                 self.conf,
                 tenant_shard_id,
                 &tenant_path,
                 self.resources.clone(),
                 AttachedTenantConf::try_from(new_location_config)?,
+                shard_identity,
                 None,
                 self.tenants,
                 SpawnMode::Normal,
@@ -1100,6 +1165,7 @@ pub(crate) enum GetActiveTenantError {
 /// then wait for up to `timeout` (minus however long we waited for the slot).
 pub(crate) async fn get_active_tenant_with_timeout(
     tenant_id: TenantId,
+    shard_selector: ShardSelector,
     timeout: Duration,
     cancel: &CancellationToken,
 ) -> Result<Arc<Tenant>, GetActiveTenantError> {
@@ -1108,15 +1174,17 @@ pub(crate) async fn get_active_tenant_with_timeout(
         Tenant(Arc<Tenant>),
     }

-    // TODO(sharding): make page service interface sharding-aware (page service should apply ShardIdentity to the key
-    // to decide which shard services the request)
-    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
-
     let wait_start = Instant::now();
     let deadline = wait_start + timeout;

-    let wait_for = {
+    let (wait_for, tenant_shard_id) = {
         let locked = TENANTS.read().unwrap();

+        // Resolve TenantId to TenantShardId
+        let tenant_shard_id = locked.resolve_shard(&tenant_id, shard_selector).ok_or(
+            GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id)),
+        )?;
+
         let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
             .map_err(GetTenantError::MapState)?;
         match peek_slot {
@@ -1126,7 +1194,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
                         // Fast path: we don't need to do any async waiting.
                         return Ok(tenant.clone());
                     }
-                    _ => WaitFor::Tenant(tenant.clone()),
+                    _ => (WaitFor::Tenant(tenant.clone()), tenant_shard_id),
                 }
             }
             Some(TenantSlot::Secondary) => {
@@ -1134,7 +1202,9 @@ pub(crate) async fn get_active_tenant_with_timeout(
                     tenant_id,
                 )))
             }
-            Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()),
+            Some(TenantSlot::InProgress(barrier)) => {
+                (WaitFor::Barrier(barrier.clone()), tenant_shard_id)
+            }
             None => {
                 return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound(
                     tenant_id,
@@ -1377,12 +1447,14 @@ pub(crate) async fn load_tenant(

     Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?;

+    let shard_identity = location_conf.shard;
     let new_tenant = tenant_spawn(
         conf,
         tenant_shard_id,
         &tenant_path,
         resources,
         AttachedTenantConf::try_from(location_conf)?,
+        shard_identity,
         None,
         &TENANTS,
         SpawnMode::Normal,
@@ -1472,12 +1544,14 @@ pub(crate) async fn attach_tenant(
     // TODO: tenant directory remains on disk if we bail out from here on.
     // See https://github.com/neondatabase/neon/issues/4233

+    let shard_identity = location_conf.shard;
     let attached_tenant = tenant_spawn(
         conf,
         tenant_shard_id,
         &tenant_dir,
         resources,
         AttachedTenantConf::try_from(location_conf)?,
+        shard_identity,
         None,
         &TENANTS,
         SpawnMode::Normal,
@@ -18,7 +18,7 @@ use pageserver_api::{
         DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo,
         TimelineState,
     },
-    shard::TenantShardId,
+    shard::{ShardIdentity, TenantShardId},
 };
 use rand::Rng;
 use serde_with::serde_as;
@@ -167,6 +167,10 @@ pub struct Timeline {
     /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime.
     pub(crate) generation: Generation,

+    /// The detailed sharding information from our parent Tenant. This enables us to map keys
+    /// to shards, and is constant through the lifetime of this Timeline.
+    shard_identity: ShardIdentity,
+
     pub pg_version: u32,

     /// The tuple has two elements.
@@ -1335,6 +1339,7 @@ impl Timeline {
         timeline_id: TimelineId,
         tenant_shard_id: TenantShardId,
         generation: Generation,
+        shard_identity: ShardIdentity,
         walredo_mgr: Arc<super::WalRedoManager>,
         resources: TimelineResources,
         pg_version: u32,
@@ -1364,6 +1369,7 @@ impl Timeline {
             timeline_id,
             tenant_shard_id,
             generation,
+            shard_identity,
             pg_version,
             layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
             wanted_image_layers: Mutex::new(None),
@@ -2478,6 +2484,10 @@ impl Timeline {
         Ok(Arc::clone(ancestor))
     }

+    pub(crate) fn get_shard_identity(&self) -> &ShardIdentity {
+        &self.shard_identity
+    }
+
     ///
     /// Get a handle to the latest layer for appending.
     ///