diff --git a/control_plane/src/bin/attachment_service.rs b/control_plane/src/bin/attachment_service.rs
index 16577e27d6..be7cff352c 100644
--- a/control_plane/src/bin/attachment_service.rs
+++ b/control_plane/src/bin/attachment_service.rs
@@ -9,6 +9,7 @@ use clap::Parser;
use hex::FromHex;
use hyper::StatusCode;
use hyper::{Body, Request, Response};
+use pageserver_api::shard::TenantShardId;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::{collections::HashMap, sync::Arc};
@@ -173,7 +174,8 @@ async fn handle_re_attach(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
if state.pageserver == Some(reattach_req.node_id) {
state.generation += 1;
response.tenants.push(ReAttachResponseTenant {
- id: *t,
+ // TODO(sharding): make this shard-aware
+ id: TenantShardId::unsharded(*t),
gen: state.generation,
});
}
@@ -196,7 +198,8 @@ async fn handle_validate(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
};
for req_tenant in validate_req.tenants {
- if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
+ // TODO(sharding): make this shard-aware
+ if let Some(tenant_state) = locked.tenants.get(&req_tenant.id.tenant_id) {
let valid = tenant_state.generation == req_tenant.gen;
response.tenants.push(ValidateResponseTenant {
id: req_tenant.id,
diff --git a/libs/pageserver_api/src/control_api.rs b/libs/pageserver_api/src/control_api.rs
index 8232e81b98..0acc3a7bb0 100644
--- a/libs/pageserver_api/src/control_api.rs
+++ b/libs/pageserver_api/src/control_api.rs
@@ -4,7 +4,9 @@
//! See docs/rfcs/025-generation-numbers.md
use serde::{Deserialize, Serialize};
-use utils::id::{NodeId, TenantId};
+use utils::id::NodeId;
+
+use crate::shard::TenantShardId;
#[derive(Serialize, Deserialize)]
pub struct ReAttachRequest {
@@ -13,7 +15,7 @@ pub struct ReAttachRequest {
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponseTenant {
- pub id: TenantId,
+ pub id: TenantShardId,
pub gen: u32,
}
@@ -24,7 +26,7 @@ pub struct ReAttachResponse {
#[derive(Serialize, Deserialize)]
pub struct ValidateRequestTenant {
- pub id: TenantId,
+ pub id: TenantShardId,
pub gen: u32,
}
@@ -40,6 +42,6 @@ pub struct ValidateResponse {
#[derive(Serialize, Deserialize)]
pub struct ValidateResponseTenant {
- pub id: TenantId,
+ pub id: TenantShardId,
pub valid: bool,
}
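The control API payloads above now carry TenantShardId instead of TenantId. As a rough sketch of what a caller does with the new type, mirroring the TenantShardId::unsharded(*t) call in attachment_service.rs; the helper reattach_entry below is hypothetical and not part of this patch:

use pageserver_api::control_api::ReAttachResponseTenant;
use pageserver_api::shard::TenantShardId;
use utils::id::TenantId;

// Hypothetical helper: wrap a legacy TenantId into the unsharded TenantShardId form
// before filling a /re-attach response entry.
fn reattach_entry(tenant_id: TenantId, generation: u32) -> ReAttachResponseTenant {
    ReAttachResponseTenant {
        id: TenantShardId::unsharded(tenant_id),
        gen: generation,
    }
}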
diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 688b911425..3510b4dbca 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -5,10 +5,10 @@ use serde::{Deserialize, Serialize};
use thiserror;
use utils::id::TenantId;
-#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
+#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardNumber(pub u8);
-#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug)]
+#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(pub u8);
impl ShardCount {
@@ -39,7 +39,7 @@ impl ShardNumber {
/// Note that the binary encoding is _not_ backward compatible, because
/// at the time sharding is introduced, there are no existing binary structures
/// containing TenantId that we need to handle.
-#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy)]
+#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
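The new Hash derives are what allow ShardNumber, ShardCount and, through them, TenantShardId to be used as HashMap keys by the deletion queue and control-plane client later in this patch. A minimal illustration, assuming nothing beyond the types shown above; track_generation is a hypothetical helper:

use std::collections::HashMap;
use pageserver_api::shard::TenantShardId;
use utils::{generation::Generation, id::TenantId};

// Without #[derive(Hash)] on TenantShardId and its ShardNumber/ShardCount fields,
// this insert would not compile.
fn track_generation(
    generations: &mut HashMap<TenantShardId, Generation>,
    tenant_id: TenantId,
    generation: Generation,
) {
    generations.insert(TenantShardId::unsharded(tenant_id), generation);
}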
diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs
index 735f358d8b..5d05af0c00 100644
--- a/pageserver/benches/bench_layer_map.rs
+++ b/pageserver/benches/bench_layer_map.rs
@@ -3,6 +3,7 @@ use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::LayerFileName;
use pageserver::tenant::storage_layer::PersistentLayerDesc;
+use pageserver_api::shard::TenantShardId;
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
@@ -211,7 +212,7 @@ fn bench_sequential(c: &mut Criterion) {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = PersistentLayerDesc::new_img(
- TenantId::generate(),
+ TenantShardId::unsharded(TenantId::generate()),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 87d9cc522e..5b170af4ef 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -5,6 +5,7 @@
//! See also `settings.md` for better description on every parameter.
use anyhow::{anyhow, bail, ensure, Context, Result};
+use pageserver_api::shard::TenantShardId;
use remote_storage::{RemotePath, RemoteStorageConfig};
use serde::de::IntoDeserializer;
use std::env;
@@ -25,7 +26,7 @@ use toml_edit::{Document, Item};
use camino::{Utf8Path, Utf8PathBuf};
use postgres_backend::AuthType;
use utils::{
- id::{NodeId, TenantId, TimelineId},
+ id::{NodeId, TimelineId},
logging::LogFormat,
};
@@ -628,12 +629,13 @@ impl PageServerConf {
self.deletion_prefix().join(format!("header-{VERSION:02x}"))
}
- pub fn tenant_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
- self.tenants_path().join(tenant_id.to_string())
+ pub fn tenant_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
+ self.tenants_path().join(tenant_shard_id.to_string())
}
- pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
- self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
+ pub fn tenant_ignore_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
+ self.tenant_path(tenant_shard_id)
+ .join(IGNORED_TENANT_FILE_NAME)
}
/// Points to a place in pageserver's local directory,
@@ -641,47 +643,53 @@ impl PageServerConf {
///
/// Legacy: superseded by tenant_location_config_path. Eventually
/// remove this function.
- pub fn tenant_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
- self.tenant_path(tenant_id).join(TENANT_CONFIG_NAME)
+ pub fn tenant_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
+ self.tenant_path(tenant_shard_id).join(TENANT_CONFIG_NAME)
}
- pub fn tenant_location_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
- self.tenant_path(tenant_id)
+ pub fn tenant_location_config_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
+ self.tenant_path(tenant_shard_id)
.join(TENANT_LOCATION_CONFIG_NAME)
}
- pub fn timelines_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
- self.tenant_path(tenant_id).join(TIMELINES_SEGMENT_NAME)
+ pub fn timelines_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
+ self.tenant_path(tenant_shard_id)
+ .join(TIMELINES_SEGMENT_NAME)
}
- pub fn timeline_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> Utf8PathBuf {
- self.timelines_path(tenant_id).join(timeline_id.to_string())
+ pub fn timeline_path(
+ &self,
+ tenant_shard_id: &TenantShardId,
+ timeline_id: &TimelineId,
+ ) -> Utf8PathBuf {
+ self.timelines_path(tenant_shard_id)
+ .join(timeline_id.to_string())
}
pub fn timeline_uninit_mark_file_path(
&self,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Utf8PathBuf {
path_with_suffix_extension(
- self.timeline_path(&tenant_id, &timeline_id),
+ self.timeline_path(&tenant_shard_id, &timeline_id),
TIMELINE_UNINIT_MARK_SUFFIX,
)
}
pub fn timeline_delete_mark_file_path(
&self,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Utf8PathBuf {
path_with_suffix_extension(
- self.timeline_path(&tenant_id, &timeline_id),
+ self.timeline_path(&tenant_shard_id, &timeline_id),
TIMELINE_DELETE_MARK_SUFFIX,
)
}
- pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
- self.tenant_path(tenant_id)
+ pub fn tenant_deleted_mark_file_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
+ self.tenant_path(tenant_shard_id)
.join(TENANT_DELETED_MARKER_FILE_NAME)
}
@@ -691,20 +699,24 @@ impl PageServerConf {
pub fn trace_path(
&self,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
connection_id: &ConnectionId,
) -> Utf8PathBuf {
self.traces_path()
- .join(tenant_id.to_string())
+ .join(tenant_shard_id.to_string())
.join(timeline_id.to_string())
.join(connection_id.to_string())
}
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
- pub fn metadata_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> Utf8PathBuf {
- self.timeline_path(tenant_id, timeline_id)
+ pub fn metadata_path(
+ &self,
+ tenant_shard_id: &TenantShardId,
+ timeline_id: &TimelineId,
+ ) -> Utf8PathBuf {
+ self.timeline_path(tenant_shard_id, timeline_id)
.join(METADATA_FILE_NAME)
}
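All of the local-path helpers in PageServerConf are now keyed by TenantShardId, so the on-disk layout hangs off the shard identity rather than the bare tenant id. A hedged usage sketch; show_paths is illustrative only and assumes the pub helpers shown in this hunk:

use pageserver::config::PageServerConf;
use pageserver_api::shard::TenantShardId;
use utils::id::{TenantId, TimelineId};

fn show_paths(conf: &PageServerConf, tenant_id: TenantId, timeline_id: TimelineId) {
    // An unsharded tenant is wrapped into a TenantShardId before any path lookup.
    let tenant_shard_id = TenantShardId::unsharded(tenant_id);
    let timeline_dir = conf.timeline_path(&tenant_shard_id, &timeline_id);
    let metadata_file = conf.metadata_path(&tenant_shard_id, &timeline_id);
    println!("timeline dir: {timeline_dir}, metadata: {metadata_file}");
}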
diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs
index 2989e15e8e..213f08484c 100644
--- a/pageserver/src/consumption_metrics/metrics.rs
+++ b/pageserver/src/consumption_metrics/metrics.rs
@@ -351,7 +351,7 @@ impl TimelineSnapshot {
let last_record_lsn = t.get_last_record_lsn();
let current_exact_logical_size = {
- let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_id, timeline_id = %t.timeline_id);
+ let span = tracing::info_span!("collect_metrics_iteration", tenant_id = %t.tenant_shard_id.tenant_id, timeline_id = %t.timeline_id);
let res = span
.in_scope(|| t.get_current_logical_size(ctx))
.context("get_current_logical_size");
diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs
index f50c19a629..25ae3d1b01 100644
--- a/pageserver/src/control_plane_client.rs
+++ b/pageserver/src/control_plane_client.rs
@@ -1,16 +1,15 @@
use std::collections::HashMap;
-use pageserver_api::control_api::{
- ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
+use pageserver_api::{
+ control_api::{
+ ReAttachRequest, ReAttachResponse, ValidateRequest, ValidateRequestTenant, ValidateResponse,
+ },
+ shard::TenantShardId,
};
use serde::{de::DeserializeOwned, Serialize};
use tokio_util::sync::CancellationToken;
use url::Url;
-use utils::{
- backoff,
- generation::Generation,
- id::{NodeId, TenantId},
-};
+use utils::{backoff, generation::Generation, id::NodeId};
use crate::config::PageServerConf;
@@ -31,11 +30,11 @@ pub enum RetryForeverError {
#[async_trait::async_trait]
pub trait ControlPlaneGenerationsApi {
- async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError>;
+ async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError>;
async fn validate(
&self,
- tenants: Vec<(TenantId, Generation)>,
- ) -> Result<HashMap<TenantId, bool>, RetryForeverError>;
+ tenants: Vec<(TenantShardId, Generation)>,
+ ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError>;
}
impl ControlPlaneClient {
@@ -127,7 +126,7 @@ impl ControlPlaneClient {
#[async_trait::async_trait]
impl ControlPlaneGenerationsApi for ControlPlaneClient {
/// Block until we get a successful response, or error out if we are shut down
- async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError> {
+ async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
let re_attach_path = self
.base_url
.join("re-attach")
@@ -154,8 +153,8 @@ impl ControlPlaneGenerationsApi for ControlPlaneClient {
/// Block until we get a successful response, or error out if we are shut down
async fn validate(
&self,
- tenants: Vec<(TenantId, Generation)>,
- ) -> Result<HashMap<TenantId, bool>, RetryForeverError> {
+ tenants: Vec<(TenantShardId, Generation)>,
+ ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
let re_attach_path = self
.base_url
.join("validate")
diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index 4bc99eb94f..7b05745483 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -15,6 +15,7 @@ use crate::virtual_file::MaybeFatalIo;
use crate::virtual_file::VirtualFile;
use anyhow::Context;
use camino::Utf8PathBuf;
+use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
use serde::Serialize;
@@ -25,7 +26,7 @@ use tracing::Instrument;
use tracing::{self, debug, error};
use utils::crashsafe::path_with_suffix_extension;
use utils::generation::Generation;
-use utils::id::{TenantId, TimelineId};
+use utils::id::TimelineId;
use utils::lsn::AtomicLsn;
use utils::lsn::Lsn;
@@ -193,7 +194,7 @@ struct DeletionList {
/// nested HashMaps by TenantTimelineID. Each Tenant only appears once
/// with one unique generation ID: if someone tries to push a second generation
/// ID for the same tenant, we will start a new DeletionList.
- tenants: HashMap<TenantId, TenantDeletionList>,
+ tenants: HashMap<TenantShardId, TenantDeletionList>,
/// Avoid having to walk `tenants` to calculate the number of keys in
/// the nested deletion lists
@@ -265,7 +266,7 @@ impl DeletionList {
/// deletion list.
fn push(
&mut self,
- tenant: &TenantId,
+ tenant: &TenantShardId,
timeline: &TimelineId,
generation: Generation,
objects: &mut Vec<RemotePath>,
@@ -357,7 +358,7 @@ struct TenantLsnState {
#[derive(Default)]
struct VisibleLsnUpdates {
- tenants: HashMap<TenantId, TenantLsnState>,
+ tenants: HashMap<TenantShardId, TenantLsnState>,
}
impl VisibleLsnUpdates {
@@ -414,7 +415,7 @@ impl DeletionQueueClient {
pub(crate) fn recover(
&self,
- attached_tenants: HashMap<TenantId, Generation>,
+ attached_tenants: HashMap<TenantShardId, Generation>,
) -> Result<(), DeletionQueueError> {
self.do_push(
&self.tx,
@@ -431,7 +432,7 @@ impl DeletionQueueClient {
/// backend will later wake up and notice that the tenant's generation requires validation.
pub(crate) async fn update_remote_consistent_lsn(
&self,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
lsn: Lsn,
@@ -442,10 +443,13 @@ impl DeletionQueueClient {
.write()
.expect("Lock should never be poisoned");
- let tenant_entry = locked.tenants.entry(tenant_id).or_insert(TenantLsnState {
- timelines: HashMap::new(),
- generation: current_generation,
- });
+ let tenant_entry = locked
+ .tenants
+ .entry(tenant_shard_id)
+ .or_insert(TenantLsnState {
+ timelines: HashMap::new(),
+ generation: current_generation,
+ });
if tenant_entry.generation != current_generation {
// Generation might have changed if we were detached and then re-attached: in this case,
@@ -472,7 +476,7 @@ impl DeletionQueueClient {
/// generations in `layers` are the generations in which those layers were written.
pub(crate) async fn push_layers(
&self,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, LayerFileMetadata)>,
@@ -483,7 +487,7 @@ impl DeletionQueueClient {
let mut layer_paths = Vec::new();
for (layer, meta) in layers {
layer_paths.push(remote_layer_path(
- &tenant_id,
+ &tenant_shard_id.tenant_id,
&timeline_id,
meta.shard,
&layer,
@@ -494,7 +498,7 @@ impl DeletionQueueClient {
return self.flush_immediate().await;
}
- self.push_layers_sync(tenant_id, timeline_id, current_generation, layers)
+ self.push_layers_sync(tenant_shard_id, timeline_id, current_generation, layers)
}
/// When a Tenant has a generation, push_layers is always synchronous because
@@ -504,7 +508,7 @@ impl DeletionQueueClient {
/// support (``)
pub(crate) fn push_layers_sync(
&self,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
current_generation: Generation,
layers: Vec<(LayerFileName, LayerFileMetadata)>,
@@ -515,7 +519,7 @@ impl DeletionQueueClient {
self.do_push(
&self.tx,
ListWriterQueueMessage::Delete(DeletionOp {
- tenant_id,
+ tenant_shard_id,
timeline_id,
layers,
generation: current_generation,
@@ -783,12 +787,12 @@ mod test {
}
fn set_latest_generation(&self, gen: Generation) {
- let tenant_id = self.harness.tenant_id;
+ let tenant_shard_id = self.harness.tenant_shard_id;
self.mock_control_plane
.latest_generation
.lock()
.unwrap()
- .insert(tenant_id, gen);
+ .insert(tenant_shard_id, gen);
}
/// Returns remote layer file name, suitable for use in assert_remote_files
@@ -797,8 +801,8 @@ mod test {
file_name: LayerFileName,
gen: Generation,
) -> anyhow::Result<String> {
- let tenant_id = self.harness.tenant_id;
- let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
+ let tenant_shard_id = self.harness.tenant_shard_id;
+ let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
let remote_timeline_path = self.remote_fs_dir.join(relative_remote_path.get_path());
std::fs::create_dir_all(&remote_timeline_path)?;
let remote_layer_file_name = format!("{}{}", file_name, gen.get_suffix());
@@ -816,7 +820,7 @@ mod test {
#[derive(Debug, Clone)]
struct MockControlPlane {
- pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantId, Generation>>>,
+ pub latest_generation: std::sync::Arc<std::sync::Mutex<HashMap<TenantShardId, Generation>>>,
}
impl MockControlPlane {
@@ -830,20 +834,20 @@ mod test {
#[async_trait::async_trait]
impl ControlPlaneGenerationsApi for MockControlPlane {
#[allow(clippy::diverging_sub_expression)] // False positive via async_trait
- async fn re_attach(&self) -> Result<HashMap<TenantId, Generation>, RetryForeverError> {
+ async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
unimplemented!()
}
async fn validate(
&self,
- tenants: Vec<(TenantId, Generation)>,
- ) -> Result<HashMap<TenantId, bool>, RetryForeverError> {
+ tenants: Vec<(TenantShardId, Generation)>,
+ ) -> Result<HashMap<TenantShardId, bool>, RetryForeverError> {
let mut result = HashMap::new();
let latest_generation = self.latest_generation.lock().unwrap();
- for (tenant_id, generation) in tenants {
- if let Some(latest) = latest_generation.get(&tenant_id) {
- result.insert(tenant_id, *latest == generation);
+ for (tenant_shard_id, generation) in tenants {
+ if let Some(latest) = latest_generation.get(&tenant_shard_id) {
+ result.insert(tenant_shard_id, *latest == generation);
}
}
@@ -947,10 +951,10 @@ mod test {
client.recover(HashMap::new())?;
let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
- let tenant_id = ctx.harness.tenant_id;
+ let tenant_shard_id = ctx.harness.tenant_shard_id;
let content: Vec<u8> = "victim1 contents".into();
- let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
+ let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
let deletion_prefix = ctx.harness.conf.deletion_prefix();
@@ -980,7 +984,7 @@ mod test {
info!("Pushing");
client
.push_layers(
- tenant_id,
+ tenant_shard_id,
TIMELINE_ID,
now_generation,
[(layer_file_name_1.clone(), layer_metadata)].to_vec(),
@@ -1027,8 +1031,8 @@ mod test {
ctx.set_latest_generation(latest_generation);
- let tenant_id = ctx.harness.tenant_id;
- let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
+ let tenant_shard_id = ctx.harness.tenant_shard_id;
+ let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
// Initial state: a remote layer exists
@@ -1038,7 +1042,7 @@ mod test {
tracing::debug!("Pushing...");
client
.push_layers(
- tenant_id,
+ tenant_shard_id,
TIMELINE_ID,
stale_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
@@ -1053,7 +1057,7 @@ mod test {
tracing::debug!("Pushing...");
client
.push_layers(
- tenant_id,
+ tenant_shard_id,
TIMELINE_ID,
latest_generation,
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
@@ -1075,9 +1079,9 @@ mod test {
let client = ctx.deletion_queue.new_client();
client.recover(HashMap::new())?;
- let tenant_id = ctx.harness.tenant_id;
+ let tenant_shard_id = ctx.harness.tenant_shard_id;
- let relative_remote_path = remote_timeline_path(&tenant_id, &TIMELINE_ID);
+ let relative_remote_path = remote_timeline_path(&tenant_shard_id, &TIMELINE_ID);
let remote_timeline_path = ctx.remote_fs_dir.join(relative_remote_path.get_path());
let deletion_prefix = ctx.harness.conf.deletion_prefix();
@@ -1093,7 +1097,7 @@ mod test {
ctx.write_remote_layer(EXAMPLE_LAYER_NAME, layer_generation)?;
client
.push_layers(
- tenant_id,
+ tenant_shard_id,
TIMELINE_ID,
now_generation.previous(),
[(EXAMPLE_LAYER_NAME.clone(), layer_metadata.clone())].to_vec(),
@@ -1107,7 +1111,7 @@ mod test {
ctx.write_remote_layer(EXAMPLE_LAYER_NAME_ALT, layer_generation)?;
client
.push_layers(
- tenant_id,
+ tenant_shard_id,
TIMELINE_ID,
now_generation,
[(EXAMPLE_LAYER_NAME_ALT.clone(), layer_metadata.clone())].to_vec(),
@@ -1138,7 +1142,7 @@ mod test {
drop(client);
ctx.restart().await;
let client = ctx.deletion_queue.new_client();
- client.recover(HashMap::from([(tenant_id, now_generation)]))?;
+ client.recover(HashMap::from([(tenant_shard_id, now_generation)]))?;
info!("Flush-executing");
client.flush_execute().await?;
@@ -1202,7 +1206,7 @@ pub(crate) mod mock {
let mut objects = op.objects;
for (layer, meta) in op.layers {
objects.push(remote_layer_path(
- &op.tenant_id,
+ &op.tenant_shard_id.tenant_id,
&op.timeline_id,
meta.shard,
&layer,
@@ -1293,7 +1297,7 @@ pub(crate) mod mock {
fn deletion_list_serialization() -> anyhow::Result<()> {
let tenant_id = "ad6c1a56f5680419d3a16ff55d97ec3c"
.to_string()
- .parse::<TenantId>()?;
+ .parse::<TenantShardId>()?;
let timeline_id = "be322c834ed9e709e63b5c9698691910"
.to_string()
.parse::<TimelineId>()?;
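The deletion_list_serialization test above now parses a bare 32-character tenant id string into a TenantShardId, so the legacy unsharded string form keeps working. A minimal sketch of that round trip; parse_legacy_id is illustrative only, and mapping to the unsharded form is an assumption based on the unsharded() constructor used elsewhere in this patch:

use pageserver_api::shard::TenantShardId;

fn parse_legacy_id() -> anyhow::Result<TenantShardId> {
    // An id written before sharding was introduced still parses, presumably as the
    // unsharded form of TenantShardId.
    Ok("ad6c1a56f5680419d3a16ff55d97ec3c".parse::<TenantShardId>()?)
}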
diff --git a/pageserver/src/deletion_queue/list_writer.rs b/pageserver/src/deletion_queue/list_writer.rs
index 5d52b680e4..7ff27ceb44 100644
--- a/pageserver/src/deletion_queue/list_writer.rs
+++ b/pageserver/src/deletion_queue/list_writer.rs
@@ -19,6 +19,7 @@ use std::collections::HashMap;
use std::fs::create_dir_all;
use std::time::Duration;
+use pageserver_api::shard::TenantShardId;
use regex::Regex;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
@@ -26,7 +27,6 @@ use tracing::debug;
use tracing::info;
use tracing::warn;
use utils::generation::Generation;
-use utils::id::TenantId;
use utils::id::TimelineId;
use crate::config::PageServerConf;
@@ -54,7 +54,7 @@ const FRONTEND_FLUSHING_TIMEOUT: Duration = Duration::from_millis(100);
#[derive(Debug)]
pub(super) struct DeletionOp {
- pub(super) tenant_id: TenantId,
+ pub(super) tenant_shard_id: TenantShardId,
pub(super) timeline_id: TimelineId,
// `layers` and `objects` are both just lists of objects. `layers` is used if you do not
// have a config object handy to project it to a remote key, and need the consuming worker
@@ -62,14 +62,14 @@ pub(super) struct DeletionOp {
pub(super) layers: Vec<(LayerFileName, LayerFileMetadata)>,
pub(super) objects: Vec<RemotePath>,
- /// The _current_ generation of the Tenant attachment in which we are enqueuing
+ /// The _current_ generation of the Tenant shard attachment in which we are enqueuing
/// this deletion.
pub(super) generation: Generation,
}
#[derive(Debug)]
pub(super) struct RecoverOp {
- pub(super) attached_tenants: HashMap<TenantId, Generation>,
+ pub(super) attached_tenants: HashMap<TenantShardId, Generation>,
}
#[derive(Debug)]
@@ -206,7 +206,7 @@ impl ListWriter {
async fn recover(
&mut self,
- attached_tenants: HashMap<TenantId, Generation>,
+ attached_tenants: HashMap<TenantShardId, Generation>,
) -> Result<(), anyhow::Error> {
debug!(
"recovering with {} attached tenants",
@@ -309,8 +309,8 @@ impl ListWriter {
// generation was issued to another node in the interval while we restarted,
// then we may treat deletion lists from the previous generation as if they
// belong to our currently attached generation, and proceed to validate & execute.
- for (tenant_id, tenant_list) in &mut deletion_list.tenants {
- if let Some(attached_gen) = attached_tenants.get(tenant_id) {
+ for (tenant_shard_id, tenant_list) in &mut deletion_list.tenants {
+ if let Some(attached_gen) = attached_tenants.get(tenant_shard_id) {
if attached_gen.previous() == tenant_list.generation {
tenant_list.generation = *attached_gen;
}
@@ -390,7 +390,7 @@ impl ListWriter {
let mut layer_paths = Vec::new();
for (layer, meta) in op.layers {
layer_paths.push(remote_layer_path(
- &op.tenant_id,
+ &op.tenant_shard_id.tenant_id,
&op.timeline_id,
meta.shard,
&layer,
@@ -400,14 +400,14 @@ impl ListWriter {
layer_paths.extend(op.objects);
if !self.pending.push(
- &op.tenant_id,
+ &op.tenant_shard_id,
&op.timeline_id,
op.generation,
&mut layer_paths,
) {
self.flush().await;
let retry_succeeded = self.pending.push(
- &op.tenant_id,
+ &op.tenant_shard_id,
&op.timeline_id,
op.generation,
&mut layer_paths,
diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs
index 642cafad28..f01cd1cf8c 100644
--- a/pageserver/src/disk_usage_eviction_task.rs
+++ b/pageserver/src/disk_usage_eviction_task.rs
@@ -310,7 +310,7 @@ pub async fn disk_usage_eviction_task_iteration_impl(
.unwrap()
.as_micros(),
partition,
- desc.tenant_id,
+ desc.tenant_shard_id,
desc.timeline_id,
candidate.layer,
);
@@ -380,7 +380,7 @@ pub async fn disk_usage_eviction_task_iteration_impl(
let limit = Arc::new(tokio::sync::Semaphore::new(1000.max(max_batch_size)));
for (timeline, batch) in batched {
- let tenant_id = timeline.tenant_id;
+ let tenant_shard_id = timeline.tenant_shard_id;
let timeline_id = timeline.timeline_id;
let batch_size =
u32::try_from(batch.len()).expect("batch size limited to u32::MAX during partitioning");
@@ -431,7 +431,7 @@ pub async fn disk_usage_eviction_task_iteration_impl(
(evicted_bytes, evictions_failed)
}
}
- .instrument(tracing::info_span!("evict_batch", %tenant_id, %timeline_id, batch_size));
+ .instrument(tracing::info_span!("evict_batch", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id, batch_size));
js.spawn(evict);
@@ -572,7 +572,7 @@ async fn collect_eviction_candidates(
continue;
}
let info = tl.get_local_layers_for_disk_usage_eviction().await;
- debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
+ debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers
.into_iter()
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 5ce09500ee..9cb411c95c 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -356,7 +356,8 @@ async fn build_timeline_info_common(
let walreceiver_status = timeline.walreceiver_status();
let info = TimelineInfo {
- tenant_id: timeline.tenant_id,
+ // TODO(sharding): add a shard_id field, or make tenant_id into a tenant_shard_id
+ tenant_id: timeline.tenant_shard_id.tenant_id,
timeline_id: timeline.timeline_id,
ancestor_timeline_id,
ancestor_lsn,
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index d5915f4c98..b178024b9a 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -7,6 +7,7 @@ use metrics::{
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
+use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, IntoEnumIterator, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use utils::id::{TenantId, TimelineId};
@@ -1571,9 +1572,9 @@ pub struct RemoteTimelineClientMetrics {
}
impl RemoteTimelineClientMetrics {
- pub fn new(tenant_id: &TenantId, timeline_id: &TimelineId) -> Self {
+ pub fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
RemoteTimelineClientMetrics {
- tenant_id: tenant_id.to_string(),
+ tenant_id: tenant_shard_id.tenant_id.to_string(),
timeline_id: timeline_id.to_string(),
calls_unfinished_gauge: Mutex::new(HashMap::default()),
bytes_started_counter: Mutex::new(HashMap::default()),
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index ee5f1732e4..82c16eb9bd 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -399,6 +399,9 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id();
+ // TODO(sharding): enumerate local tenant shards for this tenant, and select the one
+ // that should serve this request.
+
// Make request tracer if needed
let tenant = mgr::get_active_tenant_with_timeout(
tenant_id,
@@ -408,9 +411,10 @@ impl PageServerHandler {
.await?;
let mut tracer = if tenant.get_trace_read_requests() {
let connection_id = ConnectionId::generate();
- let path = tenant
- .conf
- .trace_path(&tenant_id, &timeline_id, &connection_id);
+ let path =
+ tenant
+ .conf
+ .trace_path(&tenant.tenant_shard_id(), &timeline_id, &connection_id);
Some(Tracer::new(path))
} else {
None
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index bc8779b26f..fe3a5bfb79 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -17,6 +17,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use enumset::EnumSet;
use futures::FutureExt;
use pageserver_api::models::TimelineState;
+use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use std::fmt;
@@ -228,7 +229,7 @@ pub struct Tenant {
// This is necessary to allow global config updates.
tenant_conf: Arc<std::sync::RwLock<AttachedTenantConf>>,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
@@ -272,7 +273,7 @@ pub struct Tenant {
impl std::fmt::Debug for Tenant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(f, "{} ({})", self.tenant_id, self.current_state())
+ write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
}
}
@@ -466,7 +467,7 @@ impl Tenant {
init_order: Option<&InitializationOrder>,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
- let tenant_id = self.tenant_id;
+ let tenant_id = self.tenant_shard_id;
let timeline = self.create_timeline_struct(
timeline_id,
@@ -558,7 +559,7 @@ impl Tenant {
#[allow(clippy::too_many_arguments)]
pub(crate) fn spawn(
conf: &'static PageServerConf,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
resources: TenantSharedResources,
attached_conf: AttachedTenantConf,
init_order: Option<InitializationOrder>,
@@ -566,8 +567,10 @@ impl Tenant {
mode: SpawnMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
+ // TODO(sharding): make WalRedoManager shard-aware
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
- conf, tenant_id,
+ conf,
+ tenant_shard_id.tenant_id,
)));
let TenantSharedResources {
@@ -581,7 +584,7 @@ impl Tenant {
conf,
attached_conf,
wal_redo_manager,
- tenant_id,
+ tenant_shard_id,
remote_storage.clone(),
deletion_queue_client,
));
@@ -593,7 +596,7 @@ impl Tenant {
task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::Attach,
- Some(tenant_id),
+ Some(tenant_shard_id.tenant_id),
None,
"attach tenant",
false,
@@ -632,7 +635,7 @@ impl Tenant {
match tenant_clone
.preload(remote_storage, task_mgr::shutdown_token())
.instrument(
- tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_id),
+ tracing::info_span!(parent: None, "attach_preload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()),
)
.await {
Ok(p) => p,
@@ -714,7 +717,7 @@ impl Tenant {
Ok(())
}
.instrument({
- let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_id);
+ let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
span.follows_from(Span::current());
span
}),
@@ -732,7 +735,7 @@ impl Tenant {
info!("listing remote timelines");
let (remote_timeline_ids, other_keys) = remote_timeline_client::list_remote_timelines(
remote_storage,
- self.tenant_id,
+ self.tenant_shard_id,
cancel.clone(),
)
.await?;
@@ -844,7 +847,7 @@ impl Tenant {
.with_context(|| {
format!(
"failed to load remote timeline {} for tenant {}",
- timeline_id, self.tenant_id
+ timeline_id, self.tenant_shard_id
)
})?;
}
@@ -884,7 +887,7 @@ impl Tenant {
/// timeline that still exists: this can happen if we crashed during a deletion/creation, or
/// if a timeline was deleted while the tenant was attached to a different pageserver.
fn clean_up_timelines(&self, existent_timelines: &HashSet<TimelineId>) -> anyhow::Result<()> {
- let timelines_dir = self.conf.timelines_path(&self.tenant_id);
+ let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
let entries = match timelines_dir.read_dir_utf8() {
Ok(d) => d,
@@ -970,7 +973,7 @@ impl Tenant {
span::debug_assert_current_span_has_tenant_id();
info!("downloading index file for timeline {}", timeline_id);
- tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_id, &timeline_id))
+ tokio::fs::create_dir_all(self.conf.timeline_path(&self.tenant_shard_id, &timeline_id))
.await
.context("Failed to create new timeline directory")?;
@@ -992,10 +995,15 @@ impl Tenant {
let init_order = None;
// timeline loading after attach expects to find metadata file for each metadata
- save_metadata(self.conf, &self.tenant_id, &timeline_id, &remote_metadata)
- .await
- .context("save_metadata")
- .map_err(LoadLocalTimelineError::Load)?;
+ save_metadata(
+ self.conf,
+ &self.tenant_shard_id,
+ &timeline_id,
+ &remote_metadata,
+ )
+ .await
+ .context("save_metadata")
+ .map_err(LoadLocalTimelineError::Load)?;
self.timeline_init_and_sync(
timeline_id,
@@ -1012,11 +1020,13 @@ impl Tenant {
/// Create a placeholder Tenant object for a broken tenant
pub fn create_broken_tenant(
conf: &'static PageServerConf,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
reason: String,
) -> Arc<Tenant> {
+ // TODO(sharding): make WalRedoManager shard-aware
let wal_redo_manager = Arc::new(WalRedoManager::from(PostgresRedoManager::new(
- conf, tenant_id,
+ conf,
+ tenant_shard_id.tenant_id,
)));
Arc::new(Tenant::new(
TenantState::Broken {
@@ -1026,7 +1036,7 @@ impl Tenant {
conf,
AttachedTenantConf::try_from(LocationConf::default()).unwrap(),
wal_redo_manager,
- tenant_id,
+ tenant_shard_id,
None,
DeletionQueueClient::broken(),
))
@@ -1039,7 +1049,7 @@ impl Tenant {
// completed in non topological order (for example because parent has smaller number of layer files in it)
let mut timelines_to_resume_deletion: Vec<(TimelineId, Option<TimelineMetadata>)> = vec![];
- let timelines_dir = self.conf.timelines_path(&self.tenant_id);
+ let timelines_dir = self.conf.timelines_path(&self.tenant_shard_id);
for entry in timelines_dir
.read_dir_utf8()
@@ -1070,7 +1080,7 @@ impl Tenant {
"Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
)
})?;
- let timeline_dir = self.conf.timeline_path(&self.tenant_id, &timeline_id);
+ let timeline_dir = self.conf.timeline_path(&self.tenant_shard_id, &timeline_id);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
{
@@ -1087,7 +1097,7 @@ impl Tenant {
info!("Found deletion mark for timeline {}", timeline_id);
- match load_metadata(self.conf, &self.tenant_id, &timeline_id) {
+ match load_metadata(self.conf, &self.tenant_shard_id, &timeline_id) {
Ok(metadata) => {
timelines_to_resume_deletion.push((timeline_id, Some(metadata)))
}
@@ -1131,7 +1141,7 @@ impl Tenant {
})?;
let timeline_uninit_mark_file = self
.conf
- .timeline_uninit_mark_file_path(self.tenant_id, timeline_id);
+ .timeline_uninit_mark_file_path(self.tenant_shard_id, timeline_id);
if timeline_uninit_mark_file.exists() {
info!(
%timeline_id,
@@ -1147,7 +1157,7 @@ impl Tenant {
let timeline_delete_mark_file = self
.conf
- .timeline_delete_mark_file_path(self.tenant_id, timeline_id);
+ .timeline_delete_mark_file_path(self.tenant_shard_id, timeline_id);
if timeline_delete_mark_file.exists() {
// Cleanup should be done in `is_delete_mark` branch above
continue;
@@ -1155,7 +1165,7 @@ impl Tenant {
let file_name = entry.file_name();
if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
- let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
+ let metadata = load_metadata(self.conf, &self.tenant_shard_id, &timeline_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
@@ -1187,7 +1197,7 @@ impl Tenant {
remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
- self.tenant_id,
+ self.tenant_shard_id,
timeline_id,
self.generation,
);
@@ -1393,8 +1403,12 @@ impl Tenant {
.map_err(LoadLocalTimelineError::Load)
}
- pub fn tenant_id(&self) -> TenantId {
- self.tenant_id
+ pub(crate) fn tenant_id(&self) -> TenantId {
+ self.tenant_shard_id.tenant_id
+ }
+
+ pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
+ self.tenant_shard_id
}
/// Get Timeline handle for given Neon timeline ID.
@@ -1408,13 +1422,13 @@ impl Tenant {
let timeline = timelines_accessor
.get(&timeline_id)
.ok_or(GetTimelineError::NotFound {
- tenant_id: self.tenant_id,
+ tenant_id: self.tenant_shard_id.tenant_id,
timeline_id,
})?;
if active_only && !timeline.is_active() {
Err(GetTimelineError::NotActive {
- tenant_id: self.tenant_id,
+ tenant_id: self.tenant_shard_id.tenant_id,
timeline_id,
state: timeline.current_state(),
})
@@ -1772,7 +1786,7 @@ impl Tenant {
*current_state = TenantState::Activating(ActivatingFrom::Attaching);
}
}
- debug!(tenant_id = %self.tenant_id, "Activating tenant");
+ debug!(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), "Activating tenant");
activating = true;
// Continue outside the closure. We need to grab timelines.lock()
// and we plan to turn it into a tokio::sync::Mutex in a future patch.
@@ -1809,7 +1823,8 @@ impl Tenant {
// times to activate. see https://github.com/neondatabase/neon/issues/4025
info!(
since_creation_millis = elapsed.as_millis(),
- tenant_id = %self.tenant_id,
+ tenant_id = %self.tenant_shard_id.tenant_id,
+ shard_id = %self.tenant_shard_id.shard_slug(),
activated_timelines,
total_timelines,
post_state = <&'static str>::from(&*current_state),
@@ -1906,7 +1921,7 @@ impl Tenant {
//
// this will additionally shutdown and await all timeline tasks.
tracing::debug!("Waiting for tasks...");
- task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
+ task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id.tenant_id), None).await;
// Wait for any in-flight operations to complete
self.gate.close().await;
@@ -2081,7 +2096,7 @@ impl Tenant {
receiver.changed().await.map_err(
|_e: tokio::sync::watch::error::RecvError|
// Tenant existed but was dropped: report it as non-existent
- GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_id))
+ GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
)?;
}
TenantState::Active { .. } => {
@@ -2155,9 +2170,6 @@ where
}
impl Tenant {
- pub fn get_tenant_id(&self) -> TenantId {
- self.tenant_id
- }
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
}
@@ -2307,7 +2319,7 @@ impl Tenant {
new_metadata,
ancestor,
new_timeline_id,
- self.tenant_id,
+ self.tenant_shard_id,
self.generation,
Arc::clone(&self.walredo_mgr),
resources,
@@ -2329,14 +2341,14 @@ impl Tenant {
conf: &'static PageServerConf,
attached_conf: AttachedTenantConf,
walredo_mgr: Arc<WalRedoManager>,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
remote_storage: Option<GenericRemoteStorage>,
deletion_queue_client: DeletionQueueClient,
) -> Tenant {
let (state, mut rx) = watch::channel(state);
tokio::spawn(async move {
- let tid = tenant_id.to_string();
+ let tid = tenant_shard_id.to_string();
fn inspect_state(state: &TenantState) -> ([&'static str; 1], bool) {
([state.into()], matches!(state, TenantState::Broken { .. }))
@@ -2388,7 +2400,7 @@ impl Tenant {
});
Tenant {
- tenant_id,
+ tenant_shard_id,
generation: attached_conf.location.generation,
conf,
// using now here is good enough approximation to catch tenants with really long
@@ -2406,17 +2418,17 @@ impl Tenant {
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
cancel: CancellationToken::default(),
- gate: Gate::new(format!("Tenant<{tenant_id}>")),
+ gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
}
}
/// Locate and load config
pub(super) fn load_tenant_config(
conf: &'static PageServerConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
) -> anyhow::Result<LocationConf> {
- let legacy_config_path = conf.tenant_config_path(tenant_id);
- let config_path = conf.tenant_location_config_path(tenant_id);
+ let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
+ let config_path = conf.tenant_location_config_path(tenant_shard_id);
if config_path.exists() {
// New-style config takes precedence
@@ -2470,29 +2482,34 @@ impl Tenant {
.with_context(|| format!("Failed to parse config from file '{path}' as toml file"))
}
- #[tracing::instrument(skip_all, fields(%tenant_id))]
+ #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
pub(super) async fn persist_tenant_config(
conf: &'static PageServerConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
- let legacy_config_path = conf.tenant_config_path(tenant_id);
- let config_path = conf.tenant_location_config_path(tenant_id);
+ let legacy_config_path = conf.tenant_config_path(tenant_shard_id);
+ let config_path = conf.tenant_location_config_path(tenant_shard_id);
- Self::persist_tenant_config_at(tenant_id, &config_path, &legacy_config_path, location_conf)
- .await
+ Self::persist_tenant_config_at(
+ tenant_shard_id,
+ &config_path,
+ &legacy_config_path,
+ location_conf,
+ )
+ .await
}
- #[tracing::instrument(skip_all, fields(%tenant_id))]
+ #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
pub(super) async fn persist_tenant_config_at(
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
config_path: &Utf8Path,
legacy_config_path: &Utf8Path,
location_conf: &LocationConf,
) -> anyhow::Result<()> {
// Forward compat: write out an old-style configuration that old versions can read, in case we roll back
Self::persist_tenant_config_legacy(
- tenant_id,
+ tenant_shard_id,
legacy_config_path,
&location_conf.tenant_conf,
)
@@ -2519,14 +2536,16 @@ impl Tenant {
let temp_path = path_with_suffix_extension(config_path, TEMP_FILE_SUFFIX);
- let tenant_id = *tenant_id;
+ let tenant_shard_id = *tenant_shard_id;
let config_path = config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.as_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
- .with_context(|| format!("write tenant {tenant_id} config to {config_path}"))
+ .with_context(|| {
+ format!("write tenant {tenant_shard_id} config to {config_path}")
+ })
})
})
.await??;
@@ -2534,9 +2553,9 @@ impl Tenant {
Ok(())
}
- #[tracing::instrument(skip_all, fields(%tenant_id))]
+ #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
async fn persist_tenant_config_legacy(
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
target_config_path: &Utf8Path,
tenant_conf: &TenantConfOpt,
) -> anyhow::Result<()> {
@@ -2554,7 +2573,7 @@ impl Tenant {
let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
- let tenant_id = *tenant_id;
+ let tenant_shard_id = *tenant_shard_id;
let target_config_path = target_config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
@@ -2562,7 +2581,7 @@ impl Tenant {
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {
- format!("write tenant {tenant_id} config to {target_config_path}")
+ format!("write tenant {tenant_shard_id} config to {target_config_path}")
})
})
})
@@ -2940,7 +2959,7 @@ impl Tenant {
// temporary directory for basebackup files for the given timeline.
let pgdata_path = path_with_suffix_extension(
self.conf
- .timelines_path(&self.tenant_id)
+ .timelines_path(&self.tenant_shard_id)
.join(format!("basebackup-{timeline_id}")),
TEMP_FILE_SUFFIX,
);
@@ -2971,7 +2990,7 @@ impl Tenant {
|| async {
self::remote_timeline_client::upload_initdb_dir(
storage,
- &self.tenant_id,
+ &self.tenant_shard_id.tenant_id,
&timeline_id,
pgdata_zstd.clone(),
)
@@ -3010,7 +3029,7 @@ impl Tenant {
)
.await?;
- let tenant_id = raw_timeline.owning_tenant.tenant_id;
+ let tenant_shard_id = raw_timeline.owning_tenant.tenant_shard_id;
let unfinished_timeline = raw_timeline.raw_timeline()?;
import_datadir::import_timeline_from_postgres_datadir(
@@ -3021,7 +3040,7 @@ impl Tenant {
)
.await
.with_context(|| {
- format!("Failed to import pgdatadir for timeline {tenant_id}/{timeline_id}")
+ format!("Failed to import pgdatadir for timeline {tenant_shard_id}/{timeline_id}")
})?;
// Flush the new layer files to disk, before we make the timeline as available to
@@ -3039,7 +3058,7 @@ impl Tenant {
.await
.with_context(|| {
format!(
- "Failed to flush after pgdatadir import for timeline {tenant_id}/{timeline_id}"
+ "Failed to flush after pgdatadir import for timeline {tenant_shard_id}/{timeline_id}"
)
})?;
@@ -3062,7 +3081,7 @@ impl Tenant {
remote_storage.clone(),
self.deletion_queue_client.clone(),
self.conf,
- self.tenant_id,
+ self.tenant_shard_id,
timeline_id,
self.generation,
);
@@ -3091,7 +3110,7 @@ impl Tenant {
start_lsn: Lsn,
ancestor: Option<Arc<Timeline>>,
) -> anyhow::Result<UninitializedTimeline> {
- let tenant_id = self.tenant_id;
+ let tenant_shard_id = self.tenant_shard_id;
let resources = self.build_timeline_resources(new_timeline_id);
if let Some(remote_client) = &resources.remote_client {
@@ -3115,12 +3134,14 @@ impl Tenant {
.create_timeline_files(&uninit_mark.timeline_path, &new_timeline_id, new_metadata)
.await
{
- error!("Failed to create initial files for timeline {tenant_id}/{new_timeline_id}, cleaning up: {e:?}");
+ error!("Failed to create initial files for timeline {tenant_shard_id}/{new_timeline_id}, cleaning up: {e:?}");
cleanup_timeline_directory(uninit_mark);
return Err(e);
}
- debug!("Successfully created initial files for timeline {tenant_id}/{new_timeline_id}");
+ debug!(
+ "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}"
+ );
Ok(UninitializedTimeline::new(
self,
@@ -3141,9 +3162,14 @@ impl Tenant {
anyhow::bail!("failpoint after-timeline-uninit-mark-creation");
});
- save_metadata(self.conf, &self.tenant_id, new_timeline_id, new_metadata)
- .await
- .context("Failed to create timeline metadata")?;
+ save_metadata(
+ self.conf,
+ &self.tenant_shard_id,
+ new_timeline_id,
+ new_metadata,
+ )
+ .await
+ .context("Failed to create timeline metadata")?;
Ok(())
}
@@ -3156,13 +3182,13 @@ impl Tenant {
timeline_id: TimelineId,
timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
) -> anyhow::Result<TimelineUninitMark> {
- let tenant_id = self.tenant_id;
+ let tenant_shard_id = self.tenant_shard_id;
anyhow::ensure!(
timelines.get(&timeline_id).is_none(),
- "Timeline {tenant_id}/{timeline_id} already exists in pageserver's memory"
+ "Timeline {tenant_shard_id}/{timeline_id} already exists in pageserver's memory"
);
- let timeline_path = self.conf.timeline_path(&tenant_id, &timeline_id);
+ let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
anyhow::ensure!(
!timeline_path.exists(),
"Timeline {timeline_path} already exists, cannot create its uninit mark file",
@@ -3170,7 +3196,7 @@ impl Tenant {
let uninit_mark_path = self
.conf
- .timeline_uninit_mark_file_path(tenant_id, timeline_id);
+ .timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
fs::File::create(&uninit_mark_path)
.context("Failed to create uninit mark file")
.and_then(|_| {
@@ -3178,7 +3204,7 @@ impl Tenant {
.context("Failed to fsync uninit mark file")
})
.with_context(|| {
- format!("Failed to crate uninit mark for timeline {tenant_id}/{timeline_id}")
+ format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
})?;
let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
@@ -3189,7 +3215,7 @@ impl Tenant {
/// Gathers inputs from all of the timelines to produce a sizing model input.
///
/// Future is cancellation safe. Only one calculation can be running at once per tenant.
- #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
+ #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub async fn gather_size_inputs(
&self,
// `max_retention_period` overrides the cutoff that is used to calculate the size
@@ -3228,7 +3254,7 @@ impl Tenant {
/// Calculate synthetic tenant size and cache the result.
/// This is periodically called by background worker.
/// result is cached in tenant struct
- #[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
+ #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub async fn calculate_synthetic_size(
&self,
cause: LogicalSizeCalculationCause,
@@ -3250,7 +3276,7 @@ impl Tenant {
.store(size, Ordering::Relaxed);
TENANT_SYNTHETIC_SIZE_METRIC
- .get_metric_with_label_values(&[&self.tenant_id.to_string()])
+ .get_metric_with_label_values(&[&self.tenant_shard_id.tenant_id.to_string()])
.unwrap()
.set(size);
}
@@ -3286,9 +3312,9 @@ fn remove_timeline_and_uninit_mark(
pub(crate) async fn create_tenant_files(
conf: &'static PageServerConf,
location_conf: &LocationConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
) -> anyhow::Result<Utf8PathBuf> {
- let target_tenant_directory = conf.tenant_path(tenant_id);
+ let target_tenant_directory = conf.tenant_path(tenant_shard_id);
anyhow::ensure!(
!target_tenant_directory
.try_exists()
@@ -3308,14 +3334,16 @@ pub(crate) async fn create_tenant_files(
let creation_result = try_create_target_tenant_dir(
conf,
location_conf,
- tenant_id,
+ tenant_shard_id,
&temporary_tenant_dir,
&target_tenant_directory,
)
.await;
if creation_result.is_err() {
- error!("Failed to create directory structure for tenant {tenant_id}, cleaning tmp data");
+ error!(
+ "Failed to create directory structure for tenant {tenant_shard_id}, cleaning tmp data"
+ );
if let Err(e) = fs::remove_dir_all(&temporary_tenant_dir) {
error!("Failed to remove temporary tenant directory {temporary_tenant_dir:?}: {e}")
} else if let Err(e) = crashsafe::fsync(&temporary_tenant_dir) {
@@ -3333,31 +3361,31 @@ pub(crate) async fn create_tenant_files(
async fn try_create_target_tenant_dir(
conf: &'static PageServerConf,
location_conf: &LocationConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
temporary_tenant_dir: &Utf8Path,
target_tenant_directory: &Utf8Path,
) -> Result<(), anyhow::Error> {
let temporary_tenant_timelines_dir = rebase_directory(
- &conf.timelines_path(tenant_id),
+ &conf.timelines_path(tenant_shard_id),
target_tenant_directory,
temporary_tenant_dir,
)
- .with_context(|| format!("resolve tenant {tenant_id} temporary timelines dir"))?;
+ .with_context(|| format!("resolve tenant {tenant_shard_id} temporary timelines dir"))?;
let temporary_legacy_tenant_config_path = rebase_directory(
- &conf.tenant_config_path(tenant_id),
+ &conf.tenant_config_path(tenant_shard_id),
target_tenant_directory,
temporary_tenant_dir,
)
- .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
+ .with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?;
let temporary_tenant_config_path = rebase_directory(
- &conf.tenant_location_config_path(tenant_id),
+ &conf.tenant_location_config_path(tenant_shard_id),
target_tenant_directory,
temporary_tenant_dir,
)
- .with_context(|| format!("resolve tenant {tenant_id} temporary config path"))?;
+ .with_context(|| format!("resolve tenant {tenant_shard_id} temporary config path"))?;
Tenant::persist_tenant_config_at(
- tenant_id,
+ tenant_shard_id,
&temporary_tenant_config_path,
&temporary_legacy_tenant_config_path,
location_conf,
@@ -3367,7 +3395,7 @@ async fn try_create_target_tenant_dir(
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
"create tenant {} temporary timelines directory {}",
- tenant_id, temporary_tenant_timelines_dir,
+ tenant_shard_id, temporary_tenant_timelines_dir,
)
})?;
fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
@@ -3382,19 +3410,19 @@ async fn try_create_target_tenant_dir(
fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
format!(
"move tenant {} temporary directory {} into the permanent one {}",
- tenant_id, temporary_tenant_dir, target_tenant_directory
+ tenant_shard_id, temporary_tenant_dir, target_tenant_directory
)
})?;
let target_dir_parent = target_tenant_directory.parent().with_context(|| {
format!(
"get tenant {} dir parent for {}",
- tenant_id, target_tenant_directory,
+ tenant_shard_id, target_tenant_directory,
)
})?;
crashsafe::fsync(target_dir_parent).with_context(|| {
format!(
"fsync renamed directory's parent {} for tenant {}",
- target_dir_parent, tenant_id,
+ target_dir_parent, tenant_shard_id,
)
})?;
@@ -3472,7 +3500,7 @@ async fn run_initdb(
impl Drop for Tenant {
fn drop(&mut self) {
- remove_tenant_metrics(&self.tenant_id);
+ remove_tenant_metrics(&self.tenant_shard_id.tenant_id);
}
}
/// Dump contents of a layer file to stdout.
@@ -3575,7 +3603,9 @@ pub(crate) mod harness {
pub struct TenantHarness {
pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
- pub tenant_id: TenantId,
+ // TODO(sharding): remove duplicative `tenant_id` in favor of access to tenant_shard_id
+ pub(crate) tenant_id: TenantId,
+ pub tenant_shard_id: TenantShardId,
pub generation: Generation,
pub shard: ShardIndex,
pub remote_storage: GenericRemoteStorage,
@@ -3620,8 +3650,9 @@ pub(crate) mod harness {
};
let tenant_id = TenantId::generate();
- fs::create_dir_all(conf.tenant_path(&tenant_id))?;
- fs::create_dir_all(conf.timelines_path(&tenant_id))?;
+ let tenant_shard_id = TenantShardId::unsharded(tenant_id);
+ fs::create_dir_all(conf.tenant_path(&tenant_shard_id))?;
+ fs::create_dir_all(conf.timelines_path(&tenant_shard_id))?;
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
let remote_fs_dir = conf.workdir.join("localfs");
@@ -3636,6 +3667,7 @@ pub(crate) mod harness {
conf,
tenant_conf,
tenant_id,
+ tenant_shard_id,
generation: Generation::new(0xdeadbeef),
shard: ShardIndex::unsharded(),
remote_storage,
@@ -3655,7 +3687,7 @@ pub(crate) mod harness {
}
fn remote_empty(&self) -> bool {
- let tenant_path = self.conf.tenant_path(&self.tenant_id);
+ let tenant_path = self.conf.tenant_path(&self.tenant_shard_id);
let remote_tenant_dir = self
.remote_fs_dir
.join(tenant_path.strip_prefix(&self.conf.workdir).unwrap());
@@ -3695,7 +3727,7 @@ pub(crate) mod harness {
))
.unwrap(),
walredo_mgr,
- self.tenant_id,
+ self.tenant_shard_id,
Some(self.remote_storage.clone()),
self.deletion_queue.new_client(),
));
@@ -3704,17 +3736,17 @@ pub(crate) mod harness {
LoadMode::Local => {
tenant
.load_local(None, ctx)
- .instrument(info_span!("try_load", tenant_id=%self.tenant_id))
+ .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}
LoadMode::Remote => {
let preload = tenant
.preload(&self.remote_storage, CancellationToken::new())
- .instrument(info_span!("try_load_preload", tenant_id=%self.tenant_id))
+ .instrument(info_span!("try_load_preload", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
tenant
.attach(None, Some(preload), ctx)
- .instrument(info_span!("try_load", tenant_id=%self.tenant_id))
+ .instrument(info_span!("try_load", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))
.await?;
}
}
@@ -3748,7 +3780,7 @@ pub(crate) mod harness {
}
pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
- self.conf.timeline_path(&self.tenant_id, timeline_id)
+ self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
}
}
@@ -3864,7 +3896,7 @@ mod tests {
e.to_string(),
format!(
"Timeline {}/{} already exists in pageserver's memory",
- tenant.tenant_id, TIMELINE_ID
+ tenant.tenant_shard_id, TIMELINE_ID
)
),
}
@@ -4248,7 +4280,7 @@ mod tests {
// so that all uploads finish & we can call harness.load() below again
tenant
.shutdown(Default::default(), true)
- .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
+ .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
.await
.ok()
.unwrap();
@@ -4289,7 +4321,7 @@ mod tests {
// so that all uploads finish & we can call harness.load() below again
tenant
.shutdown(Default::default(), true)
- .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
+ .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
.await
.ok()
.unwrap();
@@ -4351,7 +4383,7 @@ mod tests {
// so that all uploads finish & we can call harness.try_load() below again
tenant
.shutdown(Default::default(), true)
- .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_id))
+ .instrument(info_span!("test_shutdown", tenant_id=%tenant.tenant_shard_id))
.await
.ok()
.unwrap();
@@ -4884,7 +4916,7 @@ mod tests {
let raw_tline = tline.raw_timeline().unwrap();
raw_tline
.shutdown()
- .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_id))
+ .instrument(info_span!("test_shutdown", tenant_id=%raw_tline.tenant_shard_id))
.await;
std::mem::forget(tline);
}
@@ -4896,7 +4928,7 @@ mod tests {
assert_eq!(
e,
GetTimelineError::NotFound {
- tenant_id: tenant.tenant_id,
+ tenant_id: tenant.tenant_shard_id.tenant_id,
timeline_id: TIMELINE_ID,
}
)
@@ -4905,12 +4937,12 @@ mod tests {
assert!(!harness
.conf
- .timeline_path(&tenant.tenant_id, &TIMELINE_ID)
+ .timeline_path(&tenant.tenant_shard_id, &TIMELINE_ID)
.exists());
assert!(!harness
.conf
- .timeline_uninit_mark_file_path(tenant.tenant_id, TIMELINE_ID)
+ .timeline_uninit_mark_file_path(tenant.tenant_shard_id, TIMELINE_ID)
.exists());
Ok(())
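
Stepping back from the mechanical renames: the harness now keys every local path off the shard-aware identifier's string form instead of the bare TenantId. A hedged sketch of that layout, where the segment names are assumptions rather than the pageserver's actual constants:

use std::path::{Path, PathBuf};

// Hypothetical directory segment; the real constant is defined elsewhere.
const TENANTS_SEGMENT: &str = "tenants";

fn tenant_path(workdir: &Path, tenant_shard_id: &str) -> PathBuf {
    workdir.join(TENANTS_SEGMENT).join(tenant_shard_id)
}

fn timelines_path(workdir: &Path, tenant_shard_id: &str) -> PathBuf {
    tenant_path(workdir, tenant_shard_id).join("timelines")
}

fn timeline_path(workdir: &Path, tenant_shard_id: &str, timeline_id: &str) -> PathBuf {
    timelines_path(workdir, tenant_shard_id).join(timeline_id)
}

The diff's use of TenantShardId::unsharded suggests the intent that an unsharded id renders the same as the legacy tenant id, so existing single-shard tenant directories stay addressable.
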
diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs
index 6289d40b93..b7b2ef9c79 100644
--- a/pageserver/src/tenant/delete.rs
+++ b/pageserver/src/tenant/delete.rs
@@ -2,16 +2,13 @@ use std::sync::Arc;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
-use pageserver_api::models::TenantState;
+use pageserver_api::{models::TenantState, shard::TenantShardId};
use remote_storage::{GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, Instrument, Span};
-use utils::{
- backoff, completion, crashsafe, fs_ext,
- id::{TenantId, TimelineId},
-};
+use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId};
use crate::{
config::PageServerConf,
@@ -60,10 +57,10 @@ type DeletionGuard = tokio::sync::OwnedMutexGuard;
fn remote_tenant_delete_mark_path(
conf: &PageServerConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
) -> anyhow::Result<RemotePath> {
let tenant_remote_path = conf
- .tenant_path(tenant_id)
+ .tenant_path(tenant_shard_id)
.strip_prefix(&conf.workdir)
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
@@ -74,9 +71,9 @@ fn remote_tenant_delete_mark_path(
async fn create_remote_delete_mark(
conf: &PageServerConf,
remote_storage: &GenericRemoteStorage,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
) -> Result<(), DeleteTenantError> {
- let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_id)?;
+ let remote_mark_path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
let data: &[u8] = &[];
backoff::retry(
@@ -100,9 +97,9 @@ async fn create_remote_delete_mark(
async fn create_local_delete_mark(
conf: &PageServerConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
) -> Result<(), DeleteTenantError> {
- let marker_path = conf.tenant_deleted_mark_file_path(tenant_id);
+ let marker_path = conf.tenant_deleted_mark_file_path(tenant_shard_id);
// Note: we're ok to replace existing file.
let _ = std::fs::OpenOptions::new()
@@ -171,10 +168,10 @@ async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), Del
async fn remove_tenant_remote_delete_mark(
conf: &PageServerConf,
remote_storage: Option<&GenericRemoteStorage>,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
) -> Result<(), DeleteTenantError> {
if let Some(remote_storage) = remote_storage {
- let path = remote_tenant_delete_mark_path(conf, tenant_id)?;
+ let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
backoff::retry(
|| async { remote_storage.delete(&path).await },
|_e| false,
@@ -193,7 +190,7 @@ async fn remove_tenant_remote_delete_mark(
// Cleanup fs traces: tenant config, timelines dir local delete mark, tenant dir
async fn cleanup_remaining_fs_traces(
conf: &PageServerConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
) -> Result<(), DeleteTenantError> {
let rm = |p: Utf8PathBuf, is_dir: bool| async move {
if is_dir {
@@ -205,8 +202,8 @@ async fn cleanup_remaining_fs_traces(
.with_context(|| format!("failed to delete {p}"))
};
- rm(conf.tenant_config_path(tenant_id), false).await?;
- rm(conf.tenant_location_config_path(tenant_id), false).await?;
+ rm(conf.tenant_config_path(tenant_shard_id), false).await?;
+ rm(conf.tenant_location_config_path(tenant_shard_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-timelines-dir", |_| {
Err(anyhow::anyhow!(
@@ -214,7 +211,7 @@ async fn cleanup_remaining_fs_traces(
))?
});
- rm(conf.timelines_path(tenant_id), true).await?;
+ rm(conf.timelines_path(tenant_shard_id), true).await?;
fail::fail_point!("tenant-delete-before-remove-deleted-mark", |_| {
Err(anyhow::anyhow!(
@@ -228,14 +225,14 @@ async fn cleanup_remaining_fs_traces(
// to be reordered later and thus missed if a crash occurs.
// Note that we dont need to sync after mark file is removed
// because we can tolerate the case when mark file reappears on startup.
- let tenant_path = &conf.tenant_path(tenant_id);
+ let tenant_path = &conf.tenant_path(tenant_shard_id);
if tenant_path.exists() {
- crashsafe::fsync_async(&conf.tenant_path(tenant_id))
+ crashsafe::fsync_async(&conf.tenant_path(tenant_shard_id))
.await
.context("fsync_pre_mark_remove")?;
}
- rm(conf.tenant_deleted_mark_file_path(tenant_id), false).await?;
+ rm(conf.tenant_deleted_mark_file_path(tenant_shard_id), false).await?;
fail::fail_point!("tenant-delete-before-remove-tenant-dir", |_| {
Err(anyhow::anyhow!(
@@ -243,7 +240,7 @@ async fn cleanup_remaining_fs_traces(
))?
});
- rm(conf.tenant_path(tenant_id), true).await?;
+ rm(conf.tenant_path(tenant_shard_id), true).await?;
Ok(())
}
@@ -324,7 +321,7 @@ impl DeleteTenantFlow {
// Though sounds scary, different mark name?
// Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
if let Some(remote_storage) = &remote_storage {
- create_remote_delete_mark(conf, remote_storage, &tenant.tenant_id)
+ create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id)
.await
.context("remote_mark")?
}
@@ -335,7 +332,7 @@ impl DeleteTenantFlow {
))?
});
- create_local_delete_mark(conf, &tenant.tenant_id)
+ create_local_delete_mark(conf, &tenant.tenant_shard_id)
.await
.context("local delete mark")?;
@@ -377,9 +374,11 @@ impl DeleteTenantFlow {
return Ok(acquire(tenant));
}
- let tenant_id = tenant.tenant_id;
// Check local mark first, if its there there is no need to go to s3 to check whether remote one exists.
- if conf.tenant_deleted_mark_file_path(&tenant_id).exists() {
+ if conf
+ .tenant_deleted_mark_file_path(&tenant.tenant_shard_id)
+ .exists()
+ {
Ok(acquire(tenant))
} else {
Ok(None)
@@ -462,12 +461,12 @@ impl DeleteTenantFlow {
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) {
- let tenant_id = tenant.tenant_id;
+ let tenant_shard_id = tenant.tenant_shard_id;
task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::TimelineDeletionWorker,
- Some(tenant_id),
+ Some(tenant_shard_id.tenant_id),
None,
"tenant_delete",
false,
@@ -481,7 +480,7 @@ impl DeleteTenantFlow {
Ok(())
}
.instrument({
- let span = tracing::info_span!(parent: None, "delete_tenant", tenant_id=%tenant_id);
+ let span = tracing::info_span!(parent: None, "delete_tenant", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug());
span.follows_from(Span::current());
span
}),
@@ -519,7 +518,7 @@ impl DeleteTenantFlow {
}
}
- let timelines_path = conf.timelines_path(&tenant.tenant_id);
+ let timelines_path = conf.timelines_path(&tenant.tenant_shard_id);
// May not exist if we fail in cleanup_remaining_fs_traces after removing it
if timelines_path.exists() {
// sanity check to guard against layout changes
@@ -528,7 +527,8 @@ impl DeleteTenantFlow {
.context("timelines dir not empty")?;
}
- remove_tenant_remote_delete_mark(conf, remote_storage.as_ref(), &tenant.tenant_id).await?;
+ remove_tenant_remote_delete_mark(conf, remote_storage.as_ref(), &tenant.tenant_shard_id)
+ .await?;
fail::fail_point!("tenant-delete-before-cleanup-remaining-fs-traces", |_| {
Err(anyhow::anyhow!(
@@ -536,7 +536,7 @@ impl DeleteTenantFlow {
))?
});
- cleanup_remaining_fs_traces(conf, &tenant.tenant_id)
+ cleanup_remaining_fs_traces(conf, &tenant.tenant_shard_id)
.await
.context("cleanup_remaining_fs_traces")?;
@@ -553,7 +553,7 @@ impl DeleteTenantFlow {
// we encounter an InProgress marker, yield the barrier it contains and wait on it.
let barrier = {
let mut locked = tenants.write().unwrap();
- let removed = locked.remove(&tenant.tenant_id);
+ let removed = locked.remove(&tenant.tenant_shard_id.tenant_id);
// FIXME: we should not be modifying this from outside of mgr.rs.
// This will go away when we simplify deletion (https://github.com/neondatabase/neon/issues/5080)
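
The delete.rs changes thread TenantShardId through every mark-file helper, but the underlying derivation is unchanged: the remote mark key is the local tenant directory taken relative to the workdir. A minimal sketch of that pattern, with RemotePath as a simplified stand-in and the mark file name an assumption (the real constant is outside these hunks):

use std::path::{Path, PathBuf};

// Simplified stand-in for remote_storage::RemotePath.
struct RemotePath(PathBuf);

// Assumed mark name; not shown in the hunks above.
const DELETED_MARK: &str = "deleted";

fn remote_delete_mark_path(
    workdir: &Path,
    local_tenant_path: &Path,
) -> Result<RemotePath, std::path::StripPrefixError> {
    // Strip the workdir prefix so the remote key mirrors the local layout;
    // once the local path is shard-aware, the remote mark is too.
    let relative = local_tenant_path.strip_prefix(workdir)?;
    Ok(RemotePath(relative.join(DELETED_MARK)))
}
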
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 9a06d9df61..591eacd104 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -7,18 +7,19 @@ use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use camino::Utf8PathBuf;
+use pageserver_api::shard::TenantShardId;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::sync::atomic::AtomicU64;
use tracing::*;
-use utils::id::{TenantId, TimelineId};
+use utils::id::TimelineId;
pub struct EphemeralFile {
page_cache_file_id: page_cache::FileId,
- _tenant_id: TenantId,
+ _tenant_shard_id: TenantShardId,
_timeline_id: TimelineId,
file: VirtualFile,
len: u64,
@@ -31,7 +32,7 @@ pub struct EphemeralFile {
impl EphemeralFile {
pub async fn create(
conf: &PageServerConf,
- tenant_id: TenantId,
+ tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
@@ -39,7 +40,7 @@ impl EphemeralFile {
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let filename = conf
- .timeline_path(&tenant_id, &timeline_id)
+ .timeline_path(&tenant_shard_id, &timeline_id)
.join(Utf8PathBuf::from(format!(
"ephemeral-{filename_disambiguator}"
)));
@@ -52,7 +53,7 @@ impl EphemeralFile {
Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
- _tenant_id: tenant_id,
+ _tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
file,
len: 0,
@@ -282,7 +283,7 @@ mod tests {
) -> Result<
(
&'static PageServerConf,
- TenantId,
+ TenantShardId,
TimelineId,
RequestContext,
),
@@ -295,13 +296,13 @@ mod tests {
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
- let tenant_id = TenantId::from_str("11000000000000000000000000000000").unwrap();
+ let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
- fs::create_dir_all(conf.timeline_path(&tenant_id, &timeline_id))?;
+ fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
- Ok((conf, tenant_id, timeline_id, ctx))
+ Ok((conf, tenant_shard_id, timeline_id, ctx))
}
#[tokio::test]
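
For context on the ephemeral_file.rs hunks: an ephemeral file is simply created inside the timeline directory (now resolved through TenantShardId) with a name drawn from a process-wide counter. A sketch of that naming scheme under those assumptions:

use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);

fn next_ephemeral_file_path(timeline_dir: &Path) -> PathBuf {
    // The counter only has to be unique within the process; uniqueness across
    // shards comes from each shard having its own timeline directory.
    let disambiguator = NEXT_FILENAME.fetch_add(1, Ordering::Relaxed);
    timeline_dir.join(format!("ephemeral-{disambiguator}"))
}
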
diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs
index 38fd426746..6fb86c65e2 100644
--- a/pageserver/src/tenant/metadata.rs
+++ b/pageserver/src/tenant/metadata.rs
@@ -11,15 +11,12 @@
use std::io::{self};
use anyhow::{ensure, Context};
+use pageserver_api::shard::TenantShardId;
use serde::{de::Error, Deserialize, Serialize, Serializer};
use thiserror::Error;
use utils::bin_ser::SerializeError;
use utils::crashsafe::path_with_suffix_extension;
-use utils::{
- bin_ser::BeSer,
- id::{TenantId, TimelineId},
- lsn::Lsn,
-};
+use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn};
use crate::config::PageServerConf;
use crate::virtual_file::VirtualFile;
@@ -272,14 +269,14 @@ impl Serialize for TimelineMetadata {
}
/// Save timeline metadata to file
-#[tracing::instrument(skip_all, fields(%tenant_id, %timeline_id))]
+#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), %timeline_id))]
pub async fn save_metadata(
conf: &'static PageServerConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
data: &TimelineMetadata,
) -> anyhow::Result<()> {
- let path = conf.metadata_path(tenant_id, timeline_id);
+ let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
VirtualFile::crashsafe_overwrite(&path, &temp_path, &metadata_bytes)
@@ -299,10 +296,10 @@ pub enum LoadMetadataError {
pub fn load_metadata(
conf: &'static PageServerConf,
- tenant_id: &TenantId,
+ tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Result<TimelineMetadata, LoadMetadataError> {
- let metadata_path = conf.metadata_path(tenant_id, timeline_id);
+ let metadata_path = conf.metadata_path(tenant_shard_id, timeline_id);
let metadata_bytes = std::fs::read(metadata_path)?;
Ok(TimelineMetadata::from_bytes(&metadata_bytes)?)
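
A note on the instrument attribute above: in the tracing crate, the Display sigil goes in front of the value expression when a field is given an explicit name, i.e. fields(name = %expr), while a bare %ident records a variable of that name. A small compiling example of the same shape, with hypothetical function and parameter names:

use tracing::instrument;

// skip_all suppresses automatic argument capture; fields() then records
// exactly the identifiers we care about, formatted via Display (%).
#[instrument(skip_all, fields(tenant_id = %tenant_id, shard_id = %shard_slug))]
fn save_metadata_example(tenant_id: &str, shard_slug: &str, bytes: &[u8]) {
    // ... serialize, write to a temp file, then rename into place ...
    let _ = bytes;
}
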
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 52d697a878..e94d29327e 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -272,8 +272,8 @@ pub struct TenantManager {
}
fn emergency_generations(
- tenant_confs: &HashMap>,
-) -> HashMap {
+ tenant_confs: &HashMap>,
+) -> HashMap {
tenant_confs
.iter()
.filter_map(|(tid, lc)| {
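
The generic parameters in this mgr.rs hunk did not survive formatting, so the following is only a hedged reconstruction of the intended shape: a map keyed by the shard-aware id whose values are per-tenant config load results, reduced to the generations of the attached ones. All types below are stand-ins, not the real mgr.rs definitions:

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct StubTenantShardId(u128);
#[derive(Clone, Copy)]
struct Generation(u32);
struct LocationConf {
    attached_generation: Option<Generation>,
}

fn emergency_generations(
    tenant_confs: &HashMap<StubTenantShardId, anyhow::Result<LocationConf>>,
) -> HashMap<StubTenantShardId, Generation> {
    tenant_confs
        .iter()
        .filter_map(|(tid, lc)| {
            // Keep only shards whose config loaded and which carry an
            // attached generation.
            let lc = lc.as_ref().ok()?;
            Some((*tid, lc.attached_generation?))
        })
        .collect()
}
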
@@ -293,10 +293,10 @@ fn emergency_generations(
async fn init_load_generations(
conf: &'static PageServerConf,
- tenant_confs: &HashMap>,
+ tenant_confs: &HashMap>,
resources: &TenantSharedResources,
cancel: &CancellationToken,
-) -> anyhow::Result