diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs index b5350d6384..16715bc667 100644 --- a/libs/pageserver_api/src/key.rs +++ b/libs/pageserver_api/src/key.rs @@ -140,3 +140,7 @@ impl Key { }) } } + +pub fn is_rel_block_key(key: &Key) -> bool { + key.field1 == 0x00 && key.field4 != 0 +} diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index 3510b4dbca..7bcc0ee4c6 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -1,5 +1,6 @@ use std::{ops::RangeInclusive, str::FromStr}; +use crate::key::{is_rel_block_key, Key}; use hex::FromHex; use serde::{Deserialize, Serialize}; use thiserror; @@ -302,6 +303,8 @@ pub struct ShardStripeSize(pub u32); pub struct ShardLayout(u8); const LAYOUT_V1: ShardLayout = ShardLayout(1); +/// ShardIdentity uses a magic layout value to indicate if it is unusable +const LAYOUT_BROKEN: ShardLayout = ShardLayout(255); /// Default stripe size in pages: 256MiB divided by 8kiB page size. const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8); @@ -310,10 +313,10 @@ const DEFAULT_STRIPE_SIZE: ShardStripeSize = ShardStripeSize(256 * 1024 / 8); /// to resolve a key to a shard, and then check whether that shard is ==self. #[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)] pub struct ShardIdentity { - pub layout: ShardLayout, pub number: ShardNumber, pub count: ShardCount, - pub stripe_size: ShardStripeSize, + stripe_size: ShardStripeSize, + layout: ShardLayout, } #[derive(thiserror::Error, Debug, PartialEq, Eq)] @@ -339,6 +342,22 @@ impl ShardIdentity { } } + /// A broken instance of this type is only used for `TenantState::Broken` tenants, + /// which are constructed in code paths that don't have access to proper configuration. + /// + /// A ShardIdentity in this state may not be used for anything, and should not be persisted. + /// Enforcement is via assertions, to avoid making our interface fallible for this + /// edge case: it is the Tenant's responsibility to avoid trying to do any I/O when in a broken + /// state, and by extension to avoid trying to do any page->shard resolution. + pub fn broken(number: ShardNumber, count: ShardCount) -> Self { + Self { + number, + count, + layout: LAYOUT_BROKEN, + stripe_size: DEFAULT_STRIPE_SIZE, + } + } + pub fn is_unsharded(&self) -> bool { self.number == ShardNumber(0) && self.count == ShardCount(0) } @@ -365,6 +384,33 @@ impl ShardIdentity { }) } } + + fn is_broken(&self) -> bool { + self.layout == LAYOUT_BROKEN + } + + pub fn get_shard_number(&self, key: &Key) -> ShardNumber { + assert!(!self.is_broken()); + key_to_shard_number(self.count, self.stripe_size, key) + } + + /// Return true if the key should be ingested by this shard + pub fn is_key_local(&self, key: &Key) -> bool { + assert!(!self.is_broken()); + if self.count < ShardCount(2) || (key_is_shard0(key) && self.number == ShardNumber(0)) { + true + } else { + key_to_shard_number(self.count, self.stripe_size, key) == self.number + } + } + + pub fn shard_slug(&self) -> String { + if self.count > ShardCount(0) { + format!("-{:02x}{:02x}", self.number.0, self.count.0) + } else { + String::new() + } + } } impl Serialize for ShardIndex { @@ -438,6 +484,65 @@ impl<'de> Deserialize<'de> for ShardIndex { } } +/// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys +/// in order to be able to serve basebackup requests without peer communication). 
+fn key_is_shard0(key: &Key) -> bool {
+    // To decide what to shard out to shards >0, we apply a simple rule that only
+    // relation pages are distributed to shards other than shard zero. Everything else gets
+    // stored on shard 0. This guarantees that shard 0 can independently serve basebackup
+    // requests, and any request other than those for particular blocks in relations.
+    //
+    // In this condition:
+    // - is_rel_block_key includes only relations, i.e. excludes SLRU data and
+    //   all metadata.
+    // - field6 is set to -1 for relation size pages.
+    !(is_rel_block_key(key) && key.field6 != 0xffffffff)
+}
+
+/// Provide the same result as the function in postgres `hashfn.h` with the same name
+fn murmurhash32(mut h: u32) -> u32 {
+    h ^= h >> 16;
+    h = h.wrapping_mul(0x85ebca6b);
+    h ^= h >> 13;
+    h = h.wrapping_mul(0xc2b2ae35);
+    h ^= h >> 16;
+    h
+}
+
+/// Provide the same result as the function in postgres `hashfn.h` with the same name
+fn hash_combine(mut a: u32, mut b: u32) -> u32 {
+    b = b.wrapping_add(0x9e3779b9);
+    b = b.wrapping_add(a << 6);
+    b = b.wrapping_add(a >> 2);
+
+    a ^= b;
+    a
+}
+
+/// Where a Key is to be distributed across shards, select the shard. This function
+/// does not account for keys that should be broadcast across shards.
+///
+/// The hashing in this function must exactly match what we do in postgres smgr
+/// code. The resulting distribution of pages is intended to preserve locality within
+/// `stripe_size` ranges of contiguous block numbers in the same relation, while otherwise
+/// distributing data pseudo-randomly.
+///
+/// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
+/// and will be handled at higher levels when shards are split.
+fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
+    // Fast path for un-sharded tenants or broadcast keys
+    if count < ShardCount(2) || key_is_shard0(key) {
+        return ShardNumber(0);
+    }
+
+    // relNode
+    let mut hash = murmurhash32(key.field4);
+    // blockNum/stripe size
+    hash = hash_combine(hash, murmurhash32(key.field6 / stripe_size.0));
+
+    ShardNumber((hash % count.0 as u32) as u8)
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
@@ -609,4 +714,29 @@ mod tests {
 
         Ok(())
     }
+
+    // These are only smoke tests to spot check that our implementation doesn't
+    // deviate from a few example values: not aiming to validate the overall
+    // hashing algorithm.
+ #[test] + fn murmur_hash() { + assert_eq!(murmurhash32(0), 0); + + assert_eq!(hash_combine(0xb1ff3b40, 0), 0xfb7923c9); + } + + #[test] + fn shard_mapping() { + let key = Key { + field1: 0x00, + field2: 0x67f, + field3: 0x5, + field4: 0x400c, + field5: 0x00, + field6: 0x7d06, + }; + + let shard = key_to_shard_number(ShardCount(10), DEFAULT_STRIPE_SIZE, &key); + assert_eq!(shard, ShardNumber(8)); + } } diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index d70f1fec4d..918e45ea9e 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -2,6 +2,7 @@ use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogi use chrono::{DateTime, Utc}; use consumption_metrics::EventType; use futures::stream::StreamExt; +use pageserver_api::shard::ShardNumber; use std::{sync::Arc, time::SystemTime}; use utils::{ id::{TenantId, TimelineId}, @@ -228,6 +229,11 @@ where while let Some((tenant_id, tenant)) = tenants.next().await { let mut tenant_resident_size = 0; + // Sharded tenants report all consumption metrics from shard zero + if tenant.tenant_shard_id().shard_number != ShardNumber(0) { + continue; + } + for timeline in tenant.list_timelines() { let timeline_id = timeline.timeline_id; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 82c16eb9bd..9956e761e6 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -53,12 +53,14 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::import_datadir::import_wal_from_tar; use crate::metrics; use crate::metrics::LIVE_CONNECTIONS_COUNT; +use crate::pgdatadir_mapping::rel_block_to_key; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::mgr; use crate::tenant::mgr::get_active_tenant_with_timeout; use crate::tenant::mgr::GetActiveTenantError; +use crate::tenant::mgr::ShardSelector; use crate::tenant::Timeline; use crate::trace::Tracer; @@ -399,16 +401,19 @@ impl PageServerHandler { { debug_assert_current_span_has_tenant_and_timeline_id(); - // TODO(sharding): enumerate local tenant shards for this tenant, and select the one - // that should serve this request. - - // Make request tracer if needed + // Note that since one connection may contain getpage requests that target different + // shards (e.g. during splitting when the compute is not yet aware of the split), the tenant + // that we look up here may not be the one that serves all the actual requests: we will double + // check the mapping of key->shard later before calling into Timeline for getpage requests. 
let tenant = mgr::get_active_tenant_with_timeout( tenant_id, + ShardSelector::First, ACTIVE_TENANT_TIMEOUT, &task_mgr::shutdown_token(), ) .await?; + + // Make request tracer if needed let mut tracer = if tenant.get_trace_read_requests() { let connection_id = ConnectionId::generate(); let path = @@ -566,6 +571,7 @@ impl PageServerHandler { info!("creating new timeline"); let tenant = get_active_tenant_with_timeout( tenant_id, + ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT, &task_mgr::shutdown_token(), ) @@ -628,7 +634,7 @@ impl PageServerHandler { debug_assert_current_span_has_tenant_and_timeline_id(); let timeline = self - .get_active_tenant_timeline(tenant_id, timeline_id) + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) .await?; let last_record_lsn = timeline.get_last_record_lsn(); if last_record_lsn != start_lsn { @@ -807,9 +813,49 @@ impl PageServerHandler { } */ - let page = timeline - .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx) - .await?; + let key = rel_block_to_key(req.rel, req.blkno); + let page = if timeline.get_shard_identity().is_key_local(&key) { + timeline + .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx) + .await? + } else { + // The Tenant shard we looked up at connection start does not hold this particular + // key: look for other shards in this tenant. This scenario occurs if a pageserver + // has multiple shards for the same tenant. + // + // TODO: optimize this (https://github.com/neondatabase/neon/pull/6037) + let timeline = match self + .get_active_tenant_timeline( + timeline.tenant_shard_id.tenant_id, + timeline.timeline_id, + ShardSelector::Page(key), + ) + .await + { + Ok(t) => t, + Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => { + // We already know this tenant exists in general, because we resolved it at + // start of connection. Getting a NotFound here indicates that the shard containing + // the requested page is not present on this node. + + // TODO: this should be some kind of structured error that the client will understand, + // so that it can block until its config is updated: this error is expected in the case + // that the Tenant's shards' placements are being updated and the client hasn't been + // informed yet. + // + // https://github.com/neondatabase/neon/issues/6038 + return Err(anyhow::anyhow!("Request routed to wrong shard")); + } + Err(e) => return Err(e.into()), + }; + + // Take a GateGuard for the duration of this request. If we were using our main Timeline object, + // the GateGuard was already held over the whole connection. + let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?; + timeline + .get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx) + .await? 
+ }; Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page, @@ -838,7 +884,7 @@ impl PageServerHandler { // check that the timeline exists let timeline = self - .get_active_tenant_timeline(tenant_id, timeline_id) + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) .await?; let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn(); if let Some(lsn) = lsn { @@ -944,9 +990,11 @@ impl PageServerHandler { &self, tenant_id: TenantId, timeline_id: TimelineId, + selector: ShardSelector, ) -> Result, GetActiveTimelineError> { let tenant = get_active_tenant_with_timeout( tenant_id, + selector, ACTIVE_TENANT_TIMEOUT, &task_mgr::shutdown_token(), ) @@ -1120,7 +1168,7 @@ where self.check_permission(Some(tenant_id))?; let timeline = self - .get_active_tenant_timeline(tenant_id, timeline_id) + .get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero) .await?; let end_of_timeline = timeline.get_last_record_rlsn(); @@ -1307,6 +1355,7 @@ where let tenant = get_active_tenant_with_timeout( tenant_id, + ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT, &task_mgr::shutdown_token(), ) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index a448142158..d37d953696 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -13,6 +13,7 @@ use crate::repository::*; use crate::walrecord::NeonWalRecord; use anyhow::Context; use bytes::{Buf, Bytes}; +use pageserver_api::key::is_rel_block_key; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; @@ -1322,7 +1323,7 @@ impl<'a> DatadirModification<'a> { // Flush relation and SLRU data blocks, keep metadata. let mut retained_pending_updates = HashMap::new(); for (key, value) in self.pending_updates.drain() { - if is_rel_block_key(key) || is_slru_block_key(key) { + if is_rel_block_key(&key) || is_slru_block_key(key) { // This bails out on first error without modifying pending_updates. // That's Ok, cf this function's doc comment. writer.put(key, self.lsn, &value, ctx).await?; @@ -1578,7 +1579,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key { } } -fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key { +pub(crate) fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key { Key { field1: 0x00, field2: rel.spcnode, @@ -1777,10 +1778,6 @@ pub fn key_to_rel_block(key: Key) -> anyhow::Result<(RelTag, BlockNumber)> { }) } -fn is_rel_block_key(key: Key) -> bool { - key.field1 == 0x00 && key.field4 != 0 -} - pub fn is_rel_fsm_block_key(key: Key) -> bool { key.field1 == 0x00 && key.field4 != 0 && key.field5 == FSM_FORKNUM && key.field6 != 0xffffffff } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0b2e48e1ff..65cfef1097 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -19,6 +19,7 @@ use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; use pageserver_api::models::TimelineState; +use pageserver_api::shard::ShardIdentity; use pageserver_api::shard::TenantShardId; use remote_storage::DownloadError; use remote_storage::GenericRemoteStorage; @@ -236,6 +237,9 @@ pub struct Tenant { tenant_shard_id: TenantShardId, + // The detailed sharding information, beyond the number/count in tenant_shard_id + shard_identity: ShardIdentity, + /// The remote storage generation, used to protect S3 objects from split-brain. /// Does not change over the lifetime of the [`Tenant`] object. 
/// @@ -568,6 +572,7 @@ impl Tenant { tenant_shard_id: TenantShardId, resources: TenantSharedResources, attached_conf: AttachedTenantConf, + shard_identity: ShardIdentity, init_order: Option, tenants: &'static std::sync::RwLock, mode: SpawnMode, @@ -589,6 +594,7 @@ impl Tenant { TenantState::Attaching, conf, attached_conf, + shard_identity, wal_redo_manager, tenant_shard_id, remote_storage.clone(), @@ -1040,6 +1046,9 @@ impl Tenant { }, conf, AttachedTenantConf::try_from(LocationConf::default()).unwrap(), + // Shard identity isn't meaningful for a broken tenant: it's just a placeholder + // to occupy the slot for this TenantShardId. + ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count), wal_redo_manager, tenant_shard_id, None, @@ -2304,6 +2313,7 @@ impl Tenant { new_timeline_id, self.tenant_shard_id, self.generation, + self.shard_identity, Arc::clone(&self.walredo_mgr), resources, pg_version, @@ -2321,6 +2331,7 @@ impl Tenant { state: TenantState, conf: &'static PageServerConf, attached_conf: AttachedTenantConf, + shard_identity: ShardIdentity, walredo_mgr: Arc, tenant_shard_id: TenantShardId, remote_storage: Option, @@ -2382,6 +2393,7 @@ impl Tenant { Tenant { tenant_shard_id, + shard_identity, generation: attached_conf.location.generation, conf, // using now here is good enough approximation to catch tenants with really long @@ -3793,6 +3805,8 @@ pub(crate) mod harness { self.generation, )) .unwrap(), + // This is a legacy/test code path: sharding isn't supported here. + ShardIdentity::unsharded(), walredo_mgr, self.tenant_shard_id, Some(self.remote_storage.clone()), diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index f34d62ba53..8339e7b583 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -2,7 +2,8 @@ //! page server. use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf}; -use pageserver_api::shard::TenantShardId; +use pageserver_api::key::Key; +use pageserver_api::shard::{ShardIdentity, ShardNumber, TenantShardId}; use rand::{distributions::Alphanumeric, Rng}; use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; @@ -130,6 +131,18 @@ pub(crate) enum TenantsMapRemoveResult { InProgress(utils::completion::Barrier), } +/// When resolving a TenantId to a shard, we may be looking for the 0th +/// shard, or we might be looking for whichever shard holds a particular page. +pub(crate) enum ShardSelector { + /// Only return the 0th shard, if it is present. If a non-0th shard is present, + /// ignore it. + Zero, + /// Pick the first shard we find for the TenantId + First, + /// Pick the shard that holds this key + Page(Key), +} + impl TenantsMap { /// Convenience function for typical usage, where we want to get a `Tenant` object, for /// working with attached tenants. If the TenantId is in the map but in Secondary state, @@ -144,6 +157,49 @@ impl TenantsMap { } } + /// A page service client sends a TenantId, and to look up the correct Tenant we must + /// resolve this to a fully qualified TenantShardId. 
+ fn resolve_shard( + &self, + tenant_id: &TenantId, + selector: ShardSelector, + ) -> Option { + let mut want_shard = None; + match self { + TenantsMap::Initializing => None, + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { + for slot in m.range(TenantShardId::tenant_range(*tenant_id)) { + match selector { + ShardSelector::First => return Some(*slot.0), + ShardSelector::Zero if slot.0.shard_number == ShardNumber(0) => { + return Some(*slot.0) + } + ShardSelector::Page(key) => { + if let Some(tenant) = slot.1.get_attached() { + // First slot we see for this tenant, calculate the expected shard number + // for the key: we will use this for checking if this and subsequent + // slots contain the key, rather than recalculating the hash each time. + if want_shard.is_none() { + want_shard = Some(tenant.shard_identity.get_shard_number(&key)); + } + + if Some(tenant.shard_identity.number) == want_shard { + return Some(*slot.0); + } + } else { + continue; + } + } + _ => continue, + } + } + + // Fall through: we didn't find an acceptable shard + None + } + } + } + /// Only for use from DeleteTenantFlow. This method directly removes a TenantSlot from the map. /// /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded @@ -515,12 +571,14 @@ pub async fn init_tenant_mgr( location_conf.attach_in_generation(generation); Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?; + let shard_identity = location_conf.shard; match tenant_spawn( conf, tenant_shard_id, &tenant_dir_path, resources.clone(), AttachedTenantConf::try_from(location_conf)?, + shard_identity, Some(init_order.clone()), &TENANTS, SpawnMode::Normal, @@ -561,6 +619,7 @@ pub(crate) fn tenant_spawn( tenant_path: &Utf8Path, resources: TenantSharedResources, location_conf: AttachedTenantConf, + shard_identity: ShardIdentity, init_order: Option, tenants: &'static std::sync::RwLock, mode: SpawnMode, @@ -593,6 +652,7 @@ pub(crate) fn tenant_spawn( tenant_shard_id, resources, location_conf, + shard_identity, init_order, tenants, mode, @@ -762,12 +822,14 @@ pub(crate) async fn create_tenant( tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::MustNotExist)?; let tenant_path = super::create_tenant_files(conf, &location_conf, &tenant_shard_id).await?; + let shard_identity = location_conf.shard; let created_tenant = tenant_spawn( conf, tenant_shard_id, &tenant_path, resources, AttachedTenantConf::try_from(location_conf)?, + shard_identity, None, &TENANTS, SpawnMode::Create, @@ -860,6 +922,7 @@ impl TenantManager { Ok(()) } + #[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))] pub(crate) async fn upsert_location( &self, tenant_shard_id: TenantShardId, @@ -996,12 +1059,14 @@ impl TenantManager { .await .map_err(SetNewTenantConfigError::Persist)?; + let shard_identity = new_location_config.shard; let tenant = tenant_spawn( self.conf, tenant_shard_id, &tenant_path, self.resources.clone(), AttachedTenantConf::try_from(new_location_config)?, + shard_identity, None, self.tenants, SpawnMode::Normal, @@ -1100,6 +1165,7 @@ pub(crate) enum GetActiveTenantError { /// then wait for up to `timeout` (minus however long we waited for the slot). 
pub(crate) async fn get_active_tenant_with_timeout( tenant_id: TenantId, + shard_selector: ShardSelector, timeout: Duration, cancel: &CancellationToken, ) -> Result, GetActiveTenantError> { @@ -1108,15 +1174,17 @@ pub(crate) async fn get_active_tenant_with_timeout( Tenant(Arc), } - // TODO(sharding): make page service interface sharding-aware (page service should apply ShardIdentity to the key - // to decide which shard services the request) - let tenant_shard_id = TenantShardId::unsharded(tenant_id); - let wait_start = Instant::now(); let deadline = wait_start + timeout; - let wait_for = { + let (wait_for, tenant_shard_id) = { let locked = TENANTS.read().unwrap(); + + // Resolve TenantId to TenantShardId + let tenant_shard_id = locked.resolve_shard(&tenant_id, shard_selector).ok_or( + GetActiveTenantError::NotFound(GetTenantError::NotFound(tenant_id)), + )?; + let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read) .map_err(GetTenantError::MapState)?; match peek_slot { @@ -1126,7 +1194,7 @@ pub(crate) async fn get_active_tenant_with_timeout( // Fast path: we don't need to do any async waiting. return Ok(tenant.clone()); } - _ => WaitFor::Tenant(tenant.clone()), + _ => (WaitFor::Tenant(tenant.clone()), tenant_shard_id), } } Some(TenantSlot::Secondary) => { @@ -1134,7 +1202,9 @@ pub(crate) async fn get_active_tenant_with_timeout( tenant_id, ))) } - Some(TenantSlot::InProgress(barrier)) => WaitFor::Barrier(barrier.clone()), + Some(TenantSlot::InProgress(barrier)) => { + (WaitFor::Barrier(barrier.clone()), tenant_shard_id) + } None => { return Err(GetActiveTenantError::NotFound(GetTenantError::NotFound( tenant_id, @@ -1377,12 +1447,14 @@ pub(crate) async fn load_tenant( Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await?; + let shard_identity = location_conf.shard; let new_tenant = tenant_spawn( conf, tenant_shard_id, &tenant_path, resources, AttachedTenantConf::try_from(location_conf)?, + shard_identity, None, &TENANTS, SpawnMode::Normal, @@ -1472,12 +1544,14 @@ pub(crate) async fn attach_tenant( // TODO: tenant directory remains on disk if we bail out from here on. // See https://github.com/neondatabase/neon/issues/4233 + let shard_identity = location_conf.shard; let attached_tenant = tenant_spawn( conf, tenant_shard_id, &tenant_dir, resources, AttachedTenantConf::try_from(location_conf)?, + shard_identity, None, &TENANTS, SpawnMode::Normal, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f02fd733b4..f666f1049f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -18,7 +18,7 @@ use pageserver_api::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, LayerMapInfo, TimelineState, }, - shard::TenantShardId, + shard::{ShardIdentity, TenantShardId}, }; use rand::Rng; use serde_with::serde_as; @@ -167,6 +167,10 @@ pub struct Timeline { /// this copy enforces the invariant that generatio doesn't change during a Tenant's lifetime. pub(crate) generation: Generation, + /// The detailed sharding information from our parent Tenant. This enables us to map keys + /// to shards, and is constant through the lifetime of this Timeline. + shard_identity: ShardIdentity, + pub pg_version: u32, /// The tuple has two elements. 
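
To make the new tenant-lookup flow easier to follow, here is a minimal, self-contained sketch of how a `ShardSelector` picks one of a tenant's locally attached shards, mirroring the `TenantsMap::resolve_shard` logic that `get_active_tenant_with_timeout` now goes through. The `Shard` struct, the `u32` key hash and the `resolve` function are illustrative stand-ins rather than the pageserver's real types:

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ShardNumber(u8);

/// Stand-in for the patch's ShardSelector: which shard of a tenant the caller wants.
#[derive(Clone, Copy)]
enum ShardSelector {
    Zero,      // only accept shard 0 (e.g. basebackup handling)
    First,     // any locally attached shard will do
    Page(u32), // the shard that owns this key (a hash stands in for a full Key)
}

/// Stand-in for one locally attached shard of a tenant.
struct Shard {
    number: ShardNumber,
    count: u8,
}

impl Shard {
    /// Stand-in for ShardIdentity::get_shard_number: map a key hash to a shard.
    fn owning_shard(&self, key_hash: u32) -> ShardNumber {
        ShardNumber((key_hash % self.count as u32) as u8)
    }
}

/// Walk the tenant's local shards and return the first one the selector accepts.
fn resolve(shards: &[Shard], selector: ShardSelector) -> Option<ShardNumber> {
    let mut want_shard = None;
    for shard in shards {
        match selector {
            ShardSelector::First => return Some(shard.number),
            ShardSelector::Zero if shard.number == ShardNumber(0) => return Some(shard.number),
            ShardSelector::Page(key_hash) => {
                // Compute the owning shard once, then check each local shard against it.
                if want_shard.is_none() {
                    want_shard = Some(shard.owning_shard(key_hash));
                }
                if Some(shard.number) == want_shard {
                    return Some(shard.number);
                }
            }
            _ => continue,
        }
    }
    None // no local shard satisfies the selector, e.g. shard 0 lives on another pageserver
}

fn main() {
    // A pageserver holding shards 1 and 3 of a four-shard tenant:
    let local = [
        Shard { number: ShardNumber(1), count: 4 },
        Shard { number: ShardNumber(3), count: 4 },
    ];
    assert_eq!(resolve(&local, ShardSelector::First), Some(ShardNumber(1)));
    assert_eq!(resolve(&local, ShardSelector::Zero), None);
    assert_eq!(resolve(&local, ShardSelector::Page(7)), Some(ShardNumber(3))); // 7 % 4 == 3
}
```

As in the patch, the owning shard is computed once (`want_shard`) and then compared against each local slot, rather than re-hashing the key for every slot.
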
@@ -1335,6 +1339,7 @@ impl Timeline { timeline_id: TimelineId, tenant_shard_id: TenantShardId, generation: Generation, + shard_identity: ShardIdentity, walredo_mgr: Arc, resources: TimelineResources, pg_version: u32, @@ -1364,6 +1369,7 @@ impl Timeline { timeline_id, tenant_shard_id, generation, + shard_identity, pg_version, layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())), wanted_image_layers: Mutex::new(None), @@ -2478,6 +2484,10 @@ impl Timeline { Ok(Arc::clone(ancestor)) } + pub(crate) fn get_shard_identity(&self) -> &ShardIdentity { + &self.shard_identity + } + /// /// Get a handle to the latest layer for appending. ///
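
Finally, to see the whole key-routing rule in one place: the sketch below is a standalone mirror of the hashing added in `shard.rs`, not the pageserver crate itself (the names `Key`, `STRIPE_SIZE` and `key_to_shard` are assumed for illustration). It stripes relation block keys across shards by `(relNode, blockNum / stripe_size)` and sends everything else to shard 0. The example key, shard count and expected result come from the patch's `shard_mapping` test.

```rust
/// Minimal stand-in for pageserver_api::key::Key (same field layout as in the patch).
#[allow(dead_code)]
#[derive(Clone, Copy)]
struct Key {
    field1: u8,
    field2: u32,
    field3: u32,
    field4: u32,
    field5: u8,
    field6: u32,
}

const STRIPE_SIZE: u32 = 256 * 1024 / 8; // 256 MiB of 8 kiB pages, as in DEFAULT_STRIPE_SIZE

/// Same finalizer as `murmurhash32` in postgres `hashfn.h`.
fn murmurhash32(mut h: u32) -> u32 {
    h ^= h >> 16;
    h = h.wrapping_mul(0x85ebca6b);
    h ^= h >> 13;
    h = h.wrapping_mul(0xc2b2ae35);
    h ^= h >> 16;
    h
}

/// Same combiner as `hash_combine` in postgres `hashfn.h`.
fn hash_combine(mut a: u32, mut b: u32) -> u32 {
    b = b.wrapping_add(0x9e3779b9);
    b = b.wrapping_add(a << 6);
    b = b.wrapping_add(a >> 2);
    a ^= b;
    a
}

/// Relation block keys are the only keys that get spread across shards.
fn is_rel_block_key(key: &Key) -> bool {
    key.field1 == 0x00 && key.field4 != 0
}

fn key_to_shard(count: u8, key: &Key) -> u8 {
    // Unsharded tenant, or a key that shard 0 always holds (SLRU, metadata,
    // relation-size keys with field6 == 0xffffffff): stay on shard 0.
    if count < 2 || !(is_rel_block_key(key) && key.field6 != 0xffffffff) {
        return 0;
    }
    let hash = hash_combine(
        murmurhash32(key.field4),               // relNode
        murmurhash32(key.field6 / STRIPE_SIZE), // blockNum / stripe size
    );
    (hash % count as u32) as u8
}

fn main() {
    // The example from the patch's shard_mapping test: a 10-shard tenant.
    let key = Key {
        field1: 0x00,
        field2: 0x67f,
        field3: 0x5,
        field4: 0x400c,
        field5: 0x00,
        field6: 0x7d06,
    };
    println!("shard = {}", key_to_shard(10, &key)); // the patch's test expects 8
}
```

Relation-size keys (field6 == 0xffffffff), SLRU pages and all metadata fail the `is_rel_block_key(key) && key.field6 != 0xffffffff` test and therefore resolve to shard 0, which is what lets shard 0 serve basebackup requests without talking to its peer shards.
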