Compare commits

...

5 Commits

Author      SHA1        Message                                                      Date
John Spray  652479cf62  pageserver: update doc comments referencing renamed types   2025-01-16 15:38:34 +00:00
John Spray  b82f037ab1  pageserver: rename TenantsMap -> TenantShardMap             2025-01-16 15:22:53 +00:00
John Spray  fa96b758bc  pageserver: rename TenantManager -> TenantShardManager      2025-01-16 15:22:17 +00:00
John Spray  ecfe76865b  pageserver: rename TenantHarness -> TenantShardHarness      2025-01-16 15:20:48 +00:00
John Spray  2fa492943a  pageserver: rename Tenant to TenantShard                     2025-01-16 15:19:15 +00:00
35 changed files with 413 additions and 362 deletions

View File

@@ -109,13 +109,13 @@ pub struct PageServerConf {
/// A lower value implicitly deprioritizes loading such tenants, vs. other work in the system.
pub concurrent_tenant_warmup: ConfigurableSemaphore,
/// Number of concurrent [`Tenant::gather_size_inputs`](crate::tenant::Tenant::gather_size_inputs) allowed.
/// Number of concurrent [`TenantShard::gather_size_inputs`](crate::tenant::TenantShard::gather_size_inputs) allowed.
pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore,
/// Limit of concurrent [`Tenant::gather_size_inputs`] issued by module `eviction_task`.
/// Limit of concurrent [`TenantShard::gather_size_inputs`] issued by module `eviction_task`.
/// The number of permits is the same as `concurrent_tenant_size_logical_size_queries`.
/// See the comment in `eviction_task` for details.
///
/// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
/// [`TenantShard::gather_size_inputs`]: crate::tenant::TenantShard::gather_size_inputs
pub eviction_task_immitated_concurrent_logical_size_queries: ConfigurableSemaphore,
// How often to collect metrics and send them to the metrics endpoint.
@@ -509,10 +509,10 @@ impl ConfigurableSemaphore {
/// Initialize using a non-zero amount of permits.
///
/// Requires a non-zero number of initial permits, because using permits == 0 is a crude way to disable a
/// feature such as [`Tenant::gather_size_inputs`]. Otherwise any semaphore-using future will
/// feature such as [`TenantShard::gather_size_inputs`]. Otherwise any semaphore-using future will
/// behave like [`futures::future::pending`], just waiting until new permits are added.
///
/// [`Tenant::gather_size_inputs`]: crate::tenant::Tenant::gather_size_inputs
/// [`TenantShard::gather_size_inputs`]: crate::tenant::TenantShard::gather_size_inputs
pub fn new(initial_permits: NonZeroUsize) -> Self {
ConfigurableSemaphore {
initial_permits,
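The constructor's `NonZeroUsize` requirement follows directly from the doc comment above: a semaphore created with zero permits never lets an `acquire()` future complete, so it silently disables whatever it guards. A minimal standalone sketch of that guard rail, using `tokio::sync::Semaphore` and an illustrative wrapper type (not the pageserver's actual `ConfigurableSemaphore`):

```rust
use std::num::NonZeroUsize;
use tokio::sync::Semaphore;

/// Illustrative stand-in for a configurable concurrency limit.
struct ConcurrencyLimit {
    semaphore: Semaphore,
}

impl ConcurrencyLimit {
    /// Zero permits would make every `acquire()` below behave like
    /// `futures::future::pending`, so reject that case at construction time.
    fn new(initial_permits: NonZeroUsize) -> Self {
        Self {
            semaphore: Semaphore::new(initial_permits.get()),
        }
    }

    async fn run_limited<T>(&self, work: impl std::future::Future<Output = T>) -> T {
        // The permit is held for the duration of `work` and released on drop.
        let _permit = self
            .semaphore
            .acquire()
            .await
            .expect("semaphore is never closed");
        work.await
    }
}

#[tokio::main]
async fn main() {
    let limit = ConcurrencyLimit::new(NonZeroUsize::new(2).unwrap());
    let answer = limit.run_limited(async { 21 * 2 }).await;
    assert_eq!(answer, 42);
}
```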

View File

@@ -7,7 +7,7 @@ use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
use crate::tenant::{mgr::TenantShardManager, LogicalSizeCalculationCause, TenantShard};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use itertools::Itertools as _;
@@ -95,7 +95,7 @@ type Cache = HashMap<MetricsKey, NewRawMetric>;
pub async fn run(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
cancel: CancellationToken,
) {
let Some(metric_collection_endpoint) = conf.metric_collection_endpoint.as_ref() else {
@@ -150,7 +150,7 @@ pub async fn run(
/// Main thread that serves metrics collection
#[allow(clippy::too_many_arguments)]
async fn collect_metrics(
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
metric_collection_endpoint: &Url,
metric_collection_bucket: &Option<RemoteStorageConfig>,
metric_collection_interval: Duration,
@@ -362,7 +362,7 @@ async fn reschedule(
/// Calculate synthetic size for each active tenant
async fn calculate_synthetic_size_worker(
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
synthetic_size_calculation_interval: Duration,
cancel: CancellationToken,
ctx: RequestContext,
@@ -425,7 +425,7 @@ async fn calculate_synthetic_size_worker(
}
}
async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
async fn calculate_and_log(tenant: &TenantShard, cancel: &CancellationToken, ctx: &RequestContext) {
const CAUSE: LogicalSizeCalculationCause =
LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;

View File

@@ -1,4 +1,4 @@
use crate::tenant::mgr::TenantManager;
use crate::tenant::mgr::TenantShardManager;
use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
@@ -174,9 +174,9 @@ impl MetricsKey {
.absolute_values()
}
/// [`Tenant::remote_size`]
/// [`TenantShard::remote_size`]
///
/// [`Tenant::remote_size`]: crate::tenant::Tenant::remote_size
/// [`TenantShard::remote_size`]: crate::tenant::TenantShard::remote_size
const fn remote_storage_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
tenant_id,
@@ -198,9 +198,9 @@ impl MetricsKey {
.absolute_values()
}
/// [`Tenant::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
/// [`TenantShard::cached_synthetic_size`] as refreshed by [`calculate_synthetic_size_worker`].
///
/// [`Tenant::cached_synthetic_size`]: crate::tenant::Tenant::cached_synthetic_size
/// [`TenantShard::cached_synthetic_size`]: crate::tenant::TenantShard::cached_synthetic_size
/// [`calculate_synthetic_size_worker`]: super::calculate_synthetic_size_worker
const fn synthetic_size(tenant_id: TenantId) -> AbsoluteValueFactory {
MetricsKey {
@@ -213,7 +213,7 @@ impl MetricsKey {
}
pub(super) async fn collect_all_metrics(
tenant_manager: &Arc<TenantManager>,
tenant_manager: &Arc<TenantShardManager>,
cached_metrics: &Cache,
ctx: &RequestContext,
) -> Vec<NewRawMetric> {
@@ -253,7 +253,7 @@ pub(super) async fn collect_all_metrics(
async fn collect<S>(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec<NewRawMetric>
where
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::Tenant>)>,
S: futures::stream::Stream<Item = (TenantId, Arc<crate::tenant::TenantShard>)>,
{
let mut current_metrics: Vec<NewRawMetric> = Vec::new();
@@ -307,7 +307,7 @@ impl TenantSnapshot {
///
/// `resident_size` is calculated from the timelines we had access to for other metrics, so we
/// cannot just list timelines here.
fn collect(t: &Arc<crate::tenant::Tenant>, resident_size: u64) -> Self {
fn collect(t: &Arc<crate::tenant::TenantShard>, resident_size: u64) -> Self {
TenantSnapshot {
resident_size,
remote_size: t.remote_size(),

View File

@@ -703,7 +703,7 @@ mod test {
use crate::{
controller_upcall_client::RetryForeverError,
tenant::{harness::TenantHarness, storage_layer::DeltaLayerName},
tenant::{harness::TenantShardHarness, storage_layer::DeltaLayerName},
};
use super::*;
@@ -722,7 +722,7 @@ mod test {
});
struct TestSetup {
harness: TenantHarness,
harness: TenantShardHarness,
remote_fs_dir: Utf8PathBuf,
storage: GenericRemoteStorage,
mock_control_plane: MockControlPlane,
@@ -825,7 +825,7 @@ mod test {
async fn setup(test_name: &str) -> anyhow::Result<TestSetup> {
let test_name = Box::leak(Box::new(format!("deletion_queue__{test_name}")));
let harness = TenantHarness::create(test_name).await?;
let harness = TenantShardHarness::create(test_name).await?;
// We do not load() the harness: we only need its config and remote_storage

View File

@@ -57,7 +57,7 @@ use crate::{
metrics::disk_usage_based_eviction::METRICS,
task_mgr::{self, BACKGROUND_RUNTIME},
tenant::{
mgr::TenantManager,
mgr::TenantShardManager,
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName, LayerVisibilityHint},
@@ -166,7 +166,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
background_jobs_barrier: completion::Barrier,
) -> Option<DiskUsageEvictionTask> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
@@ -203,7 +203,7 @@ async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
cancel: CancellationToken,
) {
scopeguard::defer! {
@@ -265,7 +265,7 @@ async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenant_manager: &Arc<TenantManager>,
tenant_manager: &Arc<TenantShardManager>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let tenants_dir = tenant_manager.get_conf().tenants_path();
@@ -361,7 +361,7 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
_storage: &GenericRemoteStorage,
usage_pre: U,
tenant_manager: &Arc<TenantManager>,
tenant_manager: &Arc<TenantShardManager>,
eviction_order: EvictionOrder,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
@@ -788,7 +788,7 @@ enum EvictionCandidates {
/// - tenant B 1 layer
/// - tenant C 8 layers
async fn collect_eviction_candidates(
tenant_manager: &Arc<TenantManager>,
tenant_manager: &Arc<TenantShardManager>,
eviction_order: EvictionOrder,
cancel: &CancellationToken,
) -> anyhow::Result<EvictionCandidates> {

View File

@@ -74,7 +74,7 @@ use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::{
GetTenantError, TenantManager, TenantMapError, TenantMapInsertError, TenantSlotError,
GetTenantError, TenantMapError, TenantMapInsertError, TenantShardManager, TenantSlotError,
TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
@@ -130,7 +130,7 @@ pub(crate) const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
pub struct State {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
auth: Option<Arc<SwappableJwtAuth>>,
allowlist_routes: &'static [&'static str],
remote_storage: GenericRemoteStorage,
@@ -145,7 +145,7 @@ impl State {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
auth: Option<Arc<SwappableJwtAuth>>,
remote_storage: GenericRemoteStorage,
broker_client: storage_broker::BrokerClientChannel,
@@ -1761,7 +1761,7 @@ async fn update_tenant_config_handler(
&ShardParameters::default(),
);
crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
crate::tenant::TenantShard::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
@@ -1802,7 +1802,7 @@ async fn patch_tenant_config_handler(
&ShardParameters::default(),
);
crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
crate::tenant::TenantShard::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
.await
.map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
@@ -2571,7 +2571,7 @@ async fn timeline_collect_keyspace(
}
async fn active_timeline_of_active_tenant(
tenant_manager: &TenantManager,
tenant_manager: &TenantShardManager,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {

View File

@@ -36,7 +36,7 @@ pub mod walredo;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::{
mgr::{BackgroundPurges, TenantManager},
mgr::{BackgroundPurges, TenantShardManager},
secondary,
};
use tracing::{info, info_span};
@@ -81,7 +81,7 @@ pub async fn shutdown_pageserver(
page_service: page_service::Listener,
consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager,
tenant_manager: &TenantShardManager,
background_purges: BackgroundPurges,
mut deletion_queue: DeletionQueue,
secondary_controller_tasks: secondary::GlobalTasks,

View File

@@ -920,7 +920,7 @@ pub(crate) static TIMELINE_EPHEMERAL_BYTES: Lazy<UIntGauge> = Lazy::new(|| {
.expect("Failed to register metric")
});
/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things
/// Metrics related to the lifecycle of a [`crate::tenant::TenantShard`] object: things
/// like how long it took to load.
///
/// Note that these are process-global metrics, _not_ per-tenant metrics. Per-tenant

View File

@@ -59,7 +59,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_i
use crate::task_mgr::TaskKind;
use crate::task_mgr::{self, COMPUTE_REQUEST_RUNTIME};
use crate::tenant::mgr::ShardSelector;
use crate::tenant::mgr::TenantManager;
use crate::tenant::mgr::TenantShardManager;
use crate::tenant::mgr::{GetActiveTenantError, GetTenantError, ShardResolveResult};
use crate::tenant::timeline::{self, WaitLsnError};
use crate::tenant::GetTimelineError;
@@ -94,7 +94,7 @@ pub struct Connections {
pub fn spawn(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
pg_auth: Option<Arc<SwappableJwtAuth>>,
tcp_listener: tokio::net::TcpListener,
) -> Listener {
@@ -159,7 +159,7 @@ impl Connections {
/// open connections.
///
pub async fn libpq_listener_main(
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
auth: Option<Arc<SwappableJwtAuth>>,
listener: tokio::net::TcpListener,
auth_type: AuthType,
@@ -218,7 +218,7 @@ type ConnectionHandlerResult = anyhow::Result<()>;
#[instrument(skip_all, fields(peer_addr))]
async fn page_service_conn_main(
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
@@ -337,7 +337,7 @@ struct TimelineHandles {
}
impl TimelineHandles {
fn new(tenant_manager: Arc<TenantManager>) -> Self {
fn new(tenant_manager: Arc<TenantShardManager>) -> Self {
Self {
wrapper: TenantManagerWrapper {
tenant_manager,
@@ -379,7 +379,7 @@ impl TimelineHandles {
}
pub(crate) struct TenantManagerWrapper {
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
// We do not support switching tenant_id on a connection at this point.
// We can add support for this later if needed without changing
// the protocol.
@@ -613,7 +613,7 @@ impl BatchedFeMessage {
impl PageServerHandler {
pub fn new(
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
connection_ctx: RequestContext,

View File

@@ -2404,13 +2404,13 @@ mod tests {
use super::*;
use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
use crate::{tenant::harness::TenantShardHarness, DEFAULT_PG_VERSION};
/// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
#[tokio::test]
async fn aux_files_round_trip() -> anyhow::Result<()> {
let name = "aux_files_round_trip";
let harness = TenantHarness::create(name).await?;
let harness = TenantShardHarness::create(name).await?;
pub const TIMELINE_ID: TimelineId =
TimelineId::from_array(hex!("11223344556677881122334455667788"));

View File

@@ -193,7 +193,7 @@ pub struct TenantSharedResources {
pub l0_flush_global_state: L0FlushGlobalState,
}
/// A [`Tenant`] is really an _attached_ tenant. The configuration
/// A [`TenantShard`] is really an _attached_ tenant. The configuration
/// for an attached tenant is a subset of the [`LocationConf`], represented
/// in this struct.
#[derive(Clone)]
@@ -273,7 +273,7 @@ pub(crate) enum SpawnMode {
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
pub struct Tenant {
pub struct TenantShard {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
@@ -295,7 +295,7 @@ pub struct Tenant {
shard_identity: ShardIdentity,
/// The remote storage generation, used to protect S3 objects from split-brain.
/// Does not change over the lifetime of the [`Tenant`] object.
/// Does not change over the lifetime of the [`TenantShard`] object.
///
/// This duplicates the generation stored in LocationConf, but that structure is mutable:
/// this copy enforces the invariant that generation doesn't change during a Tenant's lifetime.
@@ -334,7 +334,7 @@ pub struct Tenant {
// Access to global deletion queue for when this tenant wants to schedule a deletion
deletion_queue_client: DeletionQueueClient,
/// Cached logical sizes updated on each [`Tenant::gather_size_inputs`].
/// Cached logical sizes updated on each [`TenantShard::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
@@ -364,7 +364,7 @@ pub struct Tenant {
pub(crate) gate: Gate,
/// Throttle applied at the top of [`Timeline::get`].
/// All [`Tenant::timelines`] of a given [`Tenant`] instance share the same [`throttle::Throttle`] instance.
/// All [`TenantShard::timelines`] of a given [`TenantShard`] instance share the same [`throttle::Throttle`] instance.
pub(crate) pagestream_throttle: Arc<throttle::Throttle>,
pub(crate) pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
@@ -384,7 +384,7 @@ pub struct Tenant {
l0_flush_global_state: L0FlushGlobalState,
}
impl std::fmt::Debug for Tenant {
impl std::fmt::Debug for TenantShard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.tenant_shard_id, self.current_state())
}
@@ -860,7 +860,7 @@ impl Debug for SetStoppingError {
}
}
/// Arguments to [`Tenant::create_timeline`].
/// Arguments to [`TenantShard::create_timeline`].
///
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
/// is `None`, the result of the timeline create call is not deterministic.
@@ -895,7 +895,7 @@ pub(crate) struct CreateTimelineParamsImportPgdata {
pub(crate) idempotency_key: import_pgdata::index_part_format::IdempotencyKey,
}
/// What is used to determine idempotency of a [`Tenant::create_timeline`] call in [`Tenant::start_creating_timeline`].
/// What is used to determine idempotency of a [`TenantShard::create_timeline`] call in [`TenantShard::start_creating_timeline`].
///
/// Each [`Timeline`] object holds [`Self`] as an immutable property in [`Timeline::create_idempotency`].
///
@@ -933,7 +933,7 @@ pub(crate) struct CreatingTimelineIdempotencyImportPgdata {
idempotency_key: import_pgdata::index_part_format::IdempotencyKey,
}
/// What is returned by [`Tenant::start_creating_timeline`].
/// What is returned by [`TenantShard::start_creating_timeline`].
#[must_use]
enum StartCreatingTimelineResult {
CreateGuard(TimelineCreateGuard),
@@ -961,13 +961,13 @@ struct TimelineInitAndSyncNeedsSpawnImportPgdata {
guard: TimelineCreateGuard,
}
/// What is returned by [`Tenant::create_timeline`].
/// What is returned by [`TenantShard::create_timeline`].
enum CreateTimelineResult {
Created(Arc<Timeline>),
Idempotent(Arc<Timeline>),
/// IMPORTANT: This [`Arc<Timeline>`] object is not in [`Tenant::timelines`] when
/// IMPORTANT: This [`Arc<Timeline>`] object is not in [`TenantShard::timelines`] when
/// we return this result, nor will this concrete object ever be added there.
/// Cf method comment on [`Tenant::create_timeline_import_pgdata`].
/// Cf method comment on [`TenantShard::create_timeline_import_pgdata`].
ImportSpawned(Arc<Timeline>),
}
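Taken together, these hunks describe the idempotency scheme: `create_timeline` reduces its parameters to a key, the key is stored immutably on the `Timeline`, and a retried call either matches the stored key (returning the existing timeline as `Idempotent`) or conflicts. A toy model of that decision with hypothetical types that only illustrate the comparison, not the real `CreateTimelineIdempotency` machinery:

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins for a timeline-creation idempotency key.
#[derive(Clone, PartialEq, Eq, Debug)]
enum IdempotencyKey {
    Bootstrap { pg_version: u32 },
    Branch { ancestor: String, start_lsn: Option<u64> },
}

#[derive(Debug)]
enum CreateOutcome {
    Created,
    Idempotent,
    Conflict,
}

struct Timelines {
    // timeline id -> key the timeline was created with
    existing: HashMap<String, IdempotencyKey>,
}

impl Timelines {
    fn start_creating(&mut self, id: &str, key: IdempotencyKey) -> CreateOutcome {
        match self.existing.get(id) {
            // Same id, same parameters: the retry is a no-op.
            Some(existing) if *existing == key => CreateOutcome::Idempotent,
            // Same id, different parameters: genuine conflict.
            Some(_) => CreateOutcome::Conflict,
            None => {
                self.existing.insert(id.to_string(), key);
                CreateOutcome::Created
            }
        }
    }
}

fn main() {
    let mut t = Timelines { existing: HashMap::new() };
    let key = IdempotencyKey::Bootstrap { pg_version: 16 };
    println!("{:?}", t.start_creating("tl-1", key.clone())); // Created
    println!("{:?}", t.start_creating("tl-1", key)); // Idempotent
}
```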
@@ -1099,7 +1099,7 @@ pub(crate) enum LoadConfigError {
NotFound(Utf8PathBuf),
}
impl Tenant {
impl TenantShard {
/// Yet another helper for timeline initialization.
///
/// - Initializes the Timeline struct and inserts it into the tenant's hash map
@@ -1277,7 +1277,7 @@ impl Tenant {
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, GlobalShutDown> {
) -> Result<Arc<TenantShard>, GlobalShutDown> {
let wal_redo_manager =
WalRedoManager::new(PostgresRedoManager::new(conf, tenant_shard_id))?;
@@ -1291,7 +1291,7 @@ impl Tenant {
let attach_mode = attached_conf.location.attach_mode;
let generation = attached_conf.location.generation;
let tenant = Arc::new(Tenant::new(
let tenant = Arc::new(TenantShard::new(
TenantState::Attaching,
conf,
attached_conf,
@@ -1336,13 +1336,13 @@ impl Tenant {
}
}
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
// Ideally we should use TenantShard::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
enum BrokenVerbosity {
Error,
Info
}
let make_broken =
|t: &Tenant, err: anyhow::Error, verbosity: BrokenVerbosity| {
|t: &TenantShard, err: anyhow::Error, verbosity: BrokenVerbosity| {
match verbosity {
BrokenVerbosity::Info => {
info!("attach cancelled, setting tenant state to Broken: {err}");
@@ -1565,7 +1565,7 @@ impl Tenant {
/// No background tasks are started as part of this routine.
///
async fn attach(
self: &Arc<Tenant>,
self: &Arc<TenantShard>,
preload: Option<TenantPreload>,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -1704,7 +1704,7 @@ impl Tenant {
match effect {
TimelineInitAndSyncResult::ReadyToActivate(_) => {
// activation happens later, on Tenant::activate
// activation happens later, on TenantShard::activate
}
TimelineInitAndSyncResult::NeedsSpawnImportPgdata(
TimelineInitAndSyncNeedsSpawnImportPgdata {
@@ -1885,7 +1885,7 @@ impl Tenant {
}
async fn load_timelines_metadata(
self: &Arc<Tenant>,
self: &Arc<TenantShard>,
timeline_ids: HashSet<TimelineId>,
remote_storage: &GenericRemoteStorage,
cancel: CancellationToken,
@@ -1940,7 +1940,7 @@ impl Tenant {
}
fn load_timeline_metadata(
self: &Arc<Tenant>,
self: &Arc<TenantShard>,
timeline_id: TimelineId,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
@@ -2333,14 +2333,14 @@ impl Tenant {
/// This is used by tests & import-from-basebackup.
///
/// The returned [`UninitializedTimeline`] contains no data nor metadata and it is in
/// a state that will fail [`Tenant::load_remote_timeline`] because `disk_consistent_lsn=Lsn(0)`.
/// a state that will fail [`TenantShard::load_remote_timeline`] because `disk_consistent_lsn=Lsn(0)`.
///
/// The caller is responsible for getting the timeline into a state that will be accepted
/// by [`Tenant::load_remote_timeline`] / [`Tenant::attach`].
/// by [`TenantShard::load_remote_timeline`] / [`TenantShard::attach`].
/// Then they may call [`UninitializedTimeline::finish_creation`] to add the timeline
/// to the [`Tenant::timelines`].
/// to the [`TenantShard::timelines`].
///
/// Tests should use `Tenant::create_test_timeline` to set up the minimum required metadata keys.
/// Tests should use `TenantShard::create_test_timeline` to set up the minimum required metadata keys.
pub(crate) async fn create_empty_timeline(
self: &Arc<Self>,
new_timeline_id: TimelineId,
@@ -2480,7 +2480,7 @@ impl Tenant {
/// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn create_timeline(
self: &Arc<Tenant>,
self: &Arc<TenantShard>,
params: CreateTimelineParams,
broker_client: storage_broker::BrokerClientChannel,
ctx: &RequestContext,
@@ -2635,13 +2635,13 @@ impl Tenant {
Ok(activated_timeline)
}
/// The returned [`Arc<Timeline>`] is NOT in the [`Tenant::timelines`] map until the import
/// The returned [`Arc<Timeline>`] is NOT in the [`TenantShard::timelines`] map until the import
/// completes in the background. A DIFFERENT [`Arc<Timeline>`] will be inserted into the
/// [`Tenant::timelines`] map when the import completes.
/// [`TenantShard::timelines`] map when the import completes.
/// We only return an [`Arc<Timeline>`] here so the API handler can create a [`pageserver_api::models::TimelineInfo`]
/// for the response.
async fn create_timeline_import_pgdata(
self: &Arc<Tenant>,
self: &Arc<TenantShard>,
params: CreateTimelineParamsImportPgdata,
activate: ActivateTimelineArgs,
ctx: &RequestContext,
@@ -2736,7 +2736,7 @@ impl Tenant {
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))]
async fn create_timeline_import_pgdata_task(
self: Arc<Tenant>,
self: Arc<TenantShard>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
@@ -2762,7 +2762,7 @@ impl Tenant {
}
async fn create_timeline_import_pgdata_task_impl(
self: Arc<Tenant>,
self: Arc<TenantShard>,
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
activate: ActivateTimelineArgs,
@@ -2780,10 +2780,10 @@ impl Tenant {
// Reload timeline from remote.
// This proves that the remote state is attachable, and it reuses the code.
//
// TODO: think about whether this is safe to do with concurrent Tenant::shutdown.
// TODO: think about whether this is safe to do with concurrent TenantShard::shutdown.
// timeline_create_guard holds the tenant gate open, so shutdown cannot _complete_ until we exit.
// But our activate() call might launch new background tasks after Tenant::shutdown
// already went past shutting down the Tenant::timelines, which this timeline is not part of.
// But our activate() call might launch new background tasks after TenantShard::shutdown
// already went past shutting down the TenantShard::timelines, which this timeline is not part of.
// I think the same problem exists with the bootstrap & branch mgmt API tasks (tenant shutting
// down while bootstrapping/branching + activating), but the race condition is much more likely
// to manifest because of the long runtime of this import task.
@@ -2798,7 +2798,7 @@ impl Tenant {
// };
let timeline_id = timeline.timeline_id;
// load from object storage like Tenant::attach does
// load from object storage like TenantShard::attach does
let resources = self.build_timeline_resources(timeline_id);
let index_part = resources
.remote_client
@@ -3268,7 +3268,7 @@ impl Tenant {
}
Err(SetStoppingError::AlreadyStopping(other)) => {
// give the caller the option to wait for this shutdown
info!("Tenant::shutdown: AlreadyStopping");
info!("TenantShard::shutdown: AlreadyStopping");
return Err(other);
}
};
@@ -3761,7 +3761,7 @@ enum ActivateTimelineArgs {
No,
}
impl Tenant {
impl TenantShard {
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.load().tenant_conf.clone()
}
@@ -3882,7 +3882,7 @@ impl Tenant {
update: F,
) -> anyhow::Result<TenantConfOpt> {
// Use read-copy-update in order to avoid overwriting the location config
// state if this races with [`Tenant::set_new_location_config`]. Note that
// state if this races with [`TenantShard::set_new_location_config`]. Note that
// this race is not possible if both request types come from the storage
// controller (as they should!) because an exclusive op lock is required
// on the storage controller side.
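The `.load()` call on `tenant_conf` a few lines up and this read-copy-update comment match the pattern the `arc-swap` crate provides via its `rcu` helper: rebuild the replacement from whatever value is current, and retry if another writer raced in. A small sketch against a hypothetical config struct, assuming the `arc-swap` crate (this is not the actual tenant-config update body):

```rust
use arc_swap::ArcSwap;

/// Hypothetical stand-in for a tenant's overridable configuration.
struct TenantConfig {
    gc_horizon: u64,
    compaction_threshold: usize,
}

fn main() {
    let conf = ArcSwap::from_pointee(TenantConfig {
        gc_horizon: 64 * 1024 * 1024,
        compaction_threshold: 10,
    });

    // Read-copy-update: derive the replacement from the value that is current
    // at each attempt, so a concurrent `store()` by another writer is never
    // clobbered -- the closure simply runs again against the newer value.
    conf.rcu(|current| TenantConfig {
        gc_horizon: current.gc_horizon,
        compaction_threshold: current.compaction_threshold + 1,
    });

    assert_eq!(conf.load().compaction_threshold, 11);
}
```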
@@ -3994,7 +3994,7 @@ impl Tenant {
Ok(timeline)
}
/// [`Tenant::shutdown`] must be called before dropping the returned [`Tenant`] object
/// [`TenantShard::shutdown`] must be called before dropping the returned [`TenantShard`] object
/// to ensure proper cleanup of background tasks and metrics.
//
// Allow too_many_arguments because a constructor's argument list naturally grows with the
@@ -4010,7 +4010,7 @@ impl Tenant {
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
) -> Tenant {
) -> TenantShard {
debug_assert!(
!attached_conf.location.generation.is_none() || conf.control_plane_api.is_none()
);
@@ -4070,7 +4070,7 @@ impl Tenant {
}
});
Tenant {
TenantShard {
tenant_shard_id,
shard_identity,
generation: attached_conf.location.generation,
@@ -4104,7 +4104,7 @@ impl Tenant {
cancel: CancellationToken::default(),
gate: Gate::default(),
pagestream_throttle: Arc::new(throttle::Throttle::new(
Tenant::get_pagestream_throttle_config(conf, &attached_conf.tenant_conf),
TenantShard::get_pagestream_throttle_config(conf, &attached_conf.tenant_conf),
)),
pagestream_throttle_metrics: Arc::new(
crate::metrics::tenant_throttling::Pagestream::new(&tenant_shard_id),
@@ -4243,11 +4243,11 @@ impl Tenant {
// Perform GC for each timeline.
//
// Note that we don't hold the `Tenant::gc_cs` lock here because we don't want to delay the
// Note that we don't hold the `TenantShard::gc_cs` lock here because we don't want to delay the
// branch creation task, which requires the GC lock. A GC iteration can run concurrently
// with branch creation.
//
// See comments in [`Tenant::branch_timeline`] for more information about why branch
// See comments in [`TenantShard::branch_timeline`] for more information about why branch
// creation task can run concurrently with timeline's GC iteration.
for timeline in gc_timelines {
if cancel.is_cancelled() {
@@ -4277,7 +4277,7 @@ impl Tenant {
/// Refreshes the Timeline::gc_info for all timelines, returning the
/// vector of timelines which have [`Timeline::get_last_record_lsn`] past
/// [`Tenant::get_gc_horizon`].
/// [`TenantShard::get_gc_horizon`].
///
/// This is usually executed as part of periodic gc, but can now be triggered more often.
pub(crate) async fn refresh_gc_info(
@@ -5467,7 +5467,7 @@ pub(crate) mod harness {
}
}
pub struct TenantHarness {
pub struct TenantShardHarness {
pub conf: &'static PageServerConf,
pub tenant_conf: TenantConf,
pub tenant_shard_id: TenantShardId,
@@ -5493,7 +5493,7 @@ pub(crate) mod harness {
});
}
impl TenantHarness {
impl TenantShardHarness {
pub async fn create_custom(
test_name: &'static str,
tenant_conf: TenantConf,
@@ -5570,7 +5570,7 @@ pub(crate) mod harness {
info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
}
pub(crate) async fn load(&self) -> (Arc<Tenant>, RequestContext) {
pub(crate) async fn load(&self) -> (Arc<TenantShard>, RequestContext) {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
(
self.do_try_load(&ctx)
@@ -5584,10 +5584,10 @@ pub(crate) mod harness {
pub(crate) async fn do_try_load(
&self,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
) -> anyhow::Result<Arc<TenantShard>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let tenant = Arc::new(Tenant::new(
let tenant = Arc::new(TenantShard::new(
TenantState::Attaching,
self.conf,
AttachedTenantConf::try_from(LocationConf::attached_single(
@@ -5716,7 +5716,7 @@ mod tests {
#[tokio::test]
async fn test_basic() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_basic").await?.load().await;
let (tenant, ctx) = TenantShardHarness::create("test_basic").await?.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -5763,7 +5763,7 @@ mod tests {
#[tokio::test]
async fn no_duplicate_timelines() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")
let (tenant, ctx) = TenantShardHarness::create("no_duplicate_timelines")
.await?
.load()
.await;
@@ -5799,7 +5799,10 @@ mod tests {
async fn test_branch() -> anyhow::Result<()> {
use std::str::from_utf8;
let (tenant, ctx) = TenantHarness::create("test_branch").await?.load().await;
let (tenant, ctx) = TenantShardHarness::create("test_branch")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -5921,7 +5924,7 @@ mod tests {
#[tokio::test(start_paused = true)]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
TenantShardHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
.await?
.load()
.await;
@@ -5973,7 +5976,7 @@ mod tests {
#[tokio::test]
async fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")
TenantShardHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")
.await?
.load()
.await;
@@ -6029,7 +6032,7 @@ mod tests {
#[tokio::test]
async fn test_get_branchpoints_from_an_inactive_timeline() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")
TenantShardHarness::create("test_get_branchpoints_from_an_inactive_timeline")
.await?
.load()
.await;
@@ -6093,7 +6096,7 @@ mod tests {
#[tokio::test]
async fn test_retain_data_in_parent_which_is_needed_for_child() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")
TenantShardHarness::create("test_retain_data_in_parent_which_is_needed_for_child")
.await?
.load()
.await;
@@ -6124,10 +6127,11 @@ mod tests {
}
#[tokio::test]
async fn test_parent_keeps_data_forever_after_branching() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_parent_keeps_data_forever_after_branching")
.await?
.load()
.await;
let (tenant, ctx) =
TenantShardHarness::create("test_parent_keeps_data_forever_after_branching")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -6165,7 +6169,7 @@ mod tests {
#[tokio::test]
async fn timeline_load() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load";
let harness = TenantHarness::create(TEST_NAME).await?;
let harness = TenantShardHarness::create(TEST_NAME).await?;
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -6192,7 +6196,7 @@ mod tests {
#[tokio::test]
async fn timeline_load_with_ancestor() -> anyhow::Result<()> {
const TEST_NAME: &str = "timeline_load_with_ancestor";
let harness = TenantHarness::create(TEST_NAME).await?;
let harness = TenantShardHarness::create(TEST_NAME).await?;
// create two timelines
{
let (tenant, ctx) = harness.load().await;
@@ -6240,7 +6244,7 @@ mod tests {
#[tokio::test]
async fn delta_layer_dumping() -> anyhow::Result<()> {
use storage_layer::AsLayerDesc;
let (tenant, ctx) = TenantHarness::create("test_layer_dumping")
let (tenant, ctx) = TenantShardHarness::create("test_layer_dumping")
.await?
.load()
.await;
@@ -6270,7 +6274,10 @@ mod tests {
#[tokio::test]
async fn test_images() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_images").await?.load().await;
let (tenant, ctx) = TenantShardHarness::create("test_images")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -6368,7 +6375,7 @@ mod tests {
}
async fn bulk_insert_compact_gc(
tenant: &Tenant,
tenant: &TenantShard,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
lsn: Lsn,
@@ -6380,7 +6387,7 @@ mod tests {
}
async fn bulk_insert_maybe_compact_gc(
tenant: &Tenant,
tenant: &TenantShard,
timeline: &Arc<Timeline>,
ctx: &RequestContext,
mut lsn: Lsn,
@@ -6444,7 +6451,7 @@ mod tests {
//
#[tokio::test]
async fn test_bulk_insert() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_bulk_insert").await?;
let harness = TenantShardHarness::create("test_bulk_insert").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
@@ -6475,7 +6482,7 @@ mod tests {
// so the search can stop at the first delta layer and doesn't traverse any deeper.
#[tokio::test]
async fn test_get_vectored() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored").await?;
let harness = TenantShardHarness::create("test_get_vectored").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
@@ -6585,7 +6592,7 @@ mod tests {
#[tokio::test]
async fn test_get_vectored_aux_files() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_aux_files").await?;
let harness = TenantShardHarness::create("test_get_vectored_aux_files").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -6661,7 +6668,7 @@ mod tests {
..TenantConf::default()
};
let harness = TenantHarness::create_custom(
let harness = TenantShardHarness::create_custom(
"test_get_vectored_key_gap",
tenant_conf,
TenantId::generate(),
@@ -6811,7 +6818,7 @@ mod tests {
// ```
#[tokio::test]
async fn test_get_vectored_ancestor_descent() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_get_vectored_on_lsn_axis").await?;
let harness = TenantShardHarness::create("test_get_vectored_on_lsn_axis").await?;
let (tenant, ctx) = harness.load().await;
let start_key = Key::from_hex("010000000033333333444444445500000000").unwrap();
@@ -6960,7 +6967,7 @@ mod tests {
name: &'static str,
compaction_algorithm: CompactionAlgorithm,
) -> anyhow::Result<()> {
let mut harness = TenantHarness::create(name).await?;
let mut harness = TenantShardHarness::create(name).await?;
harness.tenant_conf.compaction_algorithm = CompactionAlgorithmSettings {
kind: compaction_algorithm,
};
@@ -7044,7 +7051,7 @@ mod tests {
#[tokio::test]
async fn test_traverse_branches() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")
let (tenant, ctx) = TenantShardHarness::create("test_traverse_branches")
.await?
.load()
.await;
@@ -7135,7 +7142,7 @@ mod tests {
#[tokio::test]
async fn test_traverse_ancestors() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")
let (tenant, ctx) = TenantShardHarness::create("test_traverse_ancestors")
.await?
.load()
.await;
@@ -7202,7 +7209,7 @@ mod tests {
#[tokio::test]
async fn test_write_at_initdb_lsn_takes_optimization_code_path() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")
let (tenant, ctx) = TenantShardHarness::create("test_empty_test_timeline_is_usable")
.await?
.load()
.await;
@@ -7272,13 +7279,13 @@ mod tests {
#[tokio::test]
async fn test_create_guard_crash() -> anyhow::Result<()> {
let name = "test_create_guard_crash";
let harness = TenantHarness::create(name).await?;
let harness = TenantShardHarness::create(name).await?;
{
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
// Leave the timeline ID in [`Tenant::timelines_creating`] to exclude attempting to create it again
// Leave the timeline ID in [`TenantShard::timelines_creating`] to exclude attempting to create it again
let raw_tline = tline.raw_timeline().unwrap();
raw_tline
.shutdown(super::timeline::ShutdownMode::Hard)
@@ -7325,7 +7332,7 @@ mod tests {
name: &'static str,
compaction_algorithm: CompactionAlgorithm,
) -> anyhow::Result<()> {
let mut harness = TenantHarness::create(name).await?;
let mut harness = TenantShardHarness::create(name).await?;
harness.tenant_conf.compaction_algorithm = CompactionAlgorithmSettings {
kind: compaction_algorithm,
};
@@ -7349,7 +7356,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_scan() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_scan").await?;
let harness = TenantShardHarness::create("test_metadata_scan").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -7468,7 +7475,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_compaction_trigger() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_compaction_trigger").await?;
let harness = TenantShardHarness::create("test_metadata_compaction_trigger").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -7516,7 +7523,9 @@ mod tests {
#[tokio::test]
async fn test_aux_file_e2e() {
let harness = TenantHarness::create("test_aux_file_e2e").await.unwrap();
let harness = TenantShardHarness::create("test_aux_file_e2e")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -7572,7 +7581,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_image_creation() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_image_creation").await?;
let harness = TenantShardHarness::create("test_metadata_image_creation").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -7671,7 +7680,7 @@ mod tests {
#[tokio::test]
async fn test_vectored_missing_data_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_data_key_reads").await?;
let harness = TenantShardHarness::create("test_vectored_missing_data_key_reads").await?;
let (tenant, ctx) = harness.load().await;
let base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
@@ -7743,7 +7752,8 @@ mod tests {
#[tokio::test]
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads").await?;
let harness =
TenantShardHarness::create("test_vectored_missing_metadata_key_reads").await?;
let (tenant, ctx) = harness.load().await;
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
@@ -7964,7 +7974,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_reads() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_metadata_tombstone_reads").await?;
let harness = TenantShardHarness::create("test_metadata_tombstone_reads").await?;
let (tenant, ctx) = harness.load().await;
let key0 = Key::from_hex("620000000033333333444444445500000000").unwrap();
let key1 = Key::from_hex("620000000033333333444444445500000001").unwrap();
@@ -8044,7 +8054,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_image_creation() {
let harness = TenantHarness::create("test_metadata_tombstone_image_creation")
let harness = TenantShardHarness::create("test_metadata_tombstone_image_creation")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -8118,7 +8128,7 @@ mod tests {
#[tokio::test]
async fn test_metadata_tombstone_empty_image_creation() {
let harness = TenantHarness::create("test_metadata_tombstone_empty_image_creation")
let harness = TenantShardHarness::create("test_metadata_tombstone_empty_image_creation")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -8183,7 +8193,8 @@ mod tests {
#[tokio::test]
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images").await?;
let harness =
TenantShardHarness::create("test_simple_bottom_most_compaction_images").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -8403,7 +8414,7 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_neon_test_record() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_neon_test_record").await?;
let harness = TenantShardHarness::create("test_neon_test_record").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -8484,7 +8495,7 @@ mod tests {
#[tokio::test(start_paused = true)]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")
let (tenant, ctx) = TenantShardHarness::create("test_lsn_lease")
.await
.unwrap()
.load()
@@ -8617,7 +8628,7 @@ mod tests {
test_name: &'static str,
use_delta_bottom_layer: bool,
) -> anyhow::Result<()> {
let harness = TenantHarness::create(test_name).await?;
let harness = TenantShardHarness::create(test_name).await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -8876,7 +8887,7 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_generate_key_retention() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_generate_key_retention").await?;
let harness = TenantShardHarness::create("test_generate_key_retention").await?;
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
@@ -9225,7 +9236,8 @@ mod tests {
#[tokio::test]
async fn test_simple_bottom_most_compaction_with_retain_lsns() -> anyhow::Result<()> {
let harness =
TenantHarness::create("test_simple_bottom_most_compaction_with_retain_lsns").await?;
TenantShardHarness::create("test_simple_bottom_most_compaction_with_retain_lsns")
.await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -9485,9 +9497,10 @@ mod tests {
#[tokio::test]
async fn test_simple_bottom_most_compaction_with_retain_lsns_single_key() -> anyhow::Result<()>
{
let harness =
TenantHarness::create("test_simple_bottom_most_compaction_with_retain_lsns_single_key")
.await?;
let harness = TenantShardHarness::create(
"test_simple_bottom_most_compaction_with_retain_lsns_single_key",
)
.await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -9708,7 +9721,8 @@ mod tests {
async fn test_simple_bottom_most_compaction_on_branch() -> anyhow::Result<()> {
use models::CompactLsnRange;
let harness = TenantHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
let harness =
TenantShardHarness::create("test_simple_bottom_most_compaction_on_branch").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -9938,7 +9952,8 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_vectored_read_with_nested_image_layer() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_vectored_read_with_nested_image_layer").await?;
let harness =
TenantShardHarness::create("test_vectored_read_with_nested_image_layer").await?;
let (tenant, ctx) = harness.load().await;
let will_init_keys = [2, 6];
@@ -10103,7 +10118,8 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_simple_partial_bottom_most_compaction() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_partial_bottom_most_compaction").await?;
let harness =
TenantShardHarness::create("test_simple_partial_bottom_most_compaction").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -10445,7 +10461,7 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_timeline_offload_retain_lsn() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_timeline_offload_retain_lsn")
let harness = TenantShardHarness::create("test_timeline_offload_retain_lsn")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -10495,7 +10511,8 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_simple_bottom_most_compaction_above_lsn() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_above_lsn").await?;
let harness =
TenantShardHarness::create("test_simple_bottom_most_compaction_above_lsn").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {
@@ -10746,7 +10763,8 @@ mod tests {
#[cfg(feature = "testing")]
#[tokio::test]
async fn test_simple_bottom_most_compaction_rectangle() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_simple_bottom_most_compaction_rectangle").await?;
let harness =
TenantShardHarness::create("test_simple_bottom_most_compaction_rectangle").await?;
let (tenant, ctx) = harness.load().await;
fn get_key(id: u32) -> Key {

View File

@@ -44,7 +44,9 @@ use crate::tenant::config::{
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::timeline::ShutdownMode;
use crate::tenant::{AttachedTenantConf, GcError, LoadConfigError, SpawnMode, Tenant, TenantState};
use crate::tenant::{
AttachedTenantConf, GcError, LoadConfigError, SpawnMode, TenantShard, TenantState,
};
use crate::virtual_file::MaybeFatalIo;
use crate::{InitializationOrder, TEMP_FILE_SUFFIX};
@@ -69,7 +71,7 @@ use super::{GlobalShutDown, TenantSharedResources};
/// having a properly acquired generation (Secondary doesn't need a generation)
#[derive(Clone)]
pub(crate) enum TenantSlot {
Attached(Arc<Tenant>),
Attached(Arc<TenantShard>),
Secondary(Arc<SecondaryTenant>),
/// In this state, other administrative operations acting on the TenantId should
/// block, or return a retry indicator equivalent to HTTP 503.
@@ -88,7 +90,7 @@ impl std::fmt::Debug for TenantSlot {
impl TenantSlot {
/// Return the `Tenant` in this slot if attached, else None
fn get_attached(&self) -> Option<&Arc<Tenant>> {
fn get_attached(&self) -> Option<&Arc<TenantShard>> {
match self {
Self::Attached(t) => Some(t),
Self::Secondary(_) => None,
@@ -99,13 +101,13 @@ impl TenantSlot {
/// The tenants known to the pageserver.
/// The enum variants are used to distinguish the different states that the pageserver can be in.
pub(crate) enum TenantsMap {
pub(crate) enum TenantShardMap {
/// [`init_tenant_mgr`] is not done yet.
Initializing,
/// [`init_tenant_mgr`] is done, all on-disk tenants have been loaded.
/// New tenants can be added using [`tenant_map_acquire_slot`].
Open(BTreeMap<TenantShardId, TenantSlot>),
/// The pageserver has entered shutdown mode via [`TenantManager::shutdown`].
/// The pageserver has entered shutdown mode via [`TenantShardManager::shutdown`].
/// Existing tenants are still accessible, but no new tenants can be created.
ShuttingDown(BTreeMap<TenantShardId, TenantSlot>),
}
@@ -166,19 +168,19 @@ impl TenantStartupMode {
/// Result type for looking up a TenantId to a specific shard
pub(crate) enum ShardResolveResult {
NotFound,
Found(Arc<Tenant>),
Found(Arc<TenantShard>),
// Wait for this barrier, then query again
InProgress(utils::completion::Barrier),
}
impl TenantsMap {
impl TenantShardMap {
/// Convenience function for typical usage, where we want to get a `Tenant` object, for
/// working with attached tenants. If the TenantId is in the map but in Secondary state,
/// None is returned.
pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<Tenant>> {
pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc<TenantShard>> {
match self {
TenantsMap::Initializing => None,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
TenantShardMap::Initializing => None,
TenantShardMap::Open(m) | TenantShardMap::ShuttingDown(m) => {
m.get(tenant_shard_id).and_then(|slot| slot.get_attached())
}
}
@@ -187,8 +189,8 @@ impl TenantsMap {
#[cfg(all(debug_assertions, not(test)))]
pub(crate) fn len(&self) -> usize {
match self {
TenantsMap::Initializing => 0,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m.len(),
TenantShardMap::Initializing => 0,
TenantShardMap::Open(m) | TenantShardMap::ShuttingDown(m) => m.len(),
}
}
}
@@ -284,23 +286,23 @@ impl BackgroundPurges {
}
}
static TENANTS: Lazy<std::sync::RwLock<TenantsMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantsMap::Initializing));
static TENANTS: Lazy<std::sync::RwLock<TenantShardMap>> =
Lazy::new(|| std::sync::RwLock::new(TenantShardMap::Initializing));
/// Responsible for storing and mutating the collection of all tenants
/// that this pageserver has state for.
///
/// Every Tenant and SecondaryTenant instance lives inside the TenantManager.
/// Every TenantShard and SecondaryTenant instance lives inside the TenantShardManager.
///
/// The most important role of the TenantManager is to prevent conflicts: e.g. trying to attach
/// The most important role of the TenantShardManager is to prevent conflicts: e.g. trying to attach
/// the same tenant twice concurrently, or trying to configure the same tenant into secondary
/// and attached modes concurrently.
pub struct TenantManager {
pub struct TenantShardManager {
conf: &'static PageServerConf,
// TODO: currently this is a &'static pointing to TENANTs. When we finish refactoring
// out of that static variable, the TenantManager can own this.
// See https://github.com/neondatabase/neon/issues/5796
tenants: &'static std::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantShardMap>,
resources: TenantSharedResources,
// Long-running operations that happen outside of a [`Tenant`] lifetime should respect this token.
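The conflict-prevention role described above is implemented with the `TenantSlot` states shown earlier in this file: an administrative operation first claims the shard's slot as `InProgress`, does its slow work outside the map lock, and only then publishes the new `Attached`/`Secondary` state; a second operation that finds `InProgress` backs off with a retryable error. A toy model of that discipline, with hypothetical names rather than the real `TenantShardManager` code:

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

type TenantShardId = String; // stand-in for the real id type

#[allow(dead_code)]
enum Slot {
    Attached(Arc<String>), // stand-in for Arc<TenantShard>
    Secondary,
    InProgress, // an admin operation owns this slot right now
}

#[derive(Debug)]
enum SlotError {
    /// Equivalent to an HTTP 503: retry once the in-flight operation finishes.
    Busy,
}

struct Manager {
    tenants: Mutex<BTreeMap<TenantShardId, Slot>>,
}

impl Manager {
    /// Claim exclusive ownership of a slot, or tell the caller to retry later.
    fn acquire_slot(&self, id: &TenantShardId) -> Result<(), SlotError> {
        let mut map = self.tenants.lock().unwrap();
        match map.get(id) {
            Some(Slot::InProgress) => Err(SlotError::Busy),
            _ => {
                map.insert(id.clone(), Slot::InProgress);
                Ok(())
            }
        }
    }

    /// Publish the result of the (slow, lock-free) work and release the claim.
    fn commit_slot(&self, id: &TenantShardId, new_state: Slot) {
        self.tenants.lock().unwrap().insert(id.clone(), new_state);
    }
}

fn main() {
    let mgr = Manager { tenants: Mutex::new(BTreeMap::new()) };
    let id = "tenant-0000".to_string();
    mgr.acquire_slot(&id).unwrap();
    // A second attach attempt for the same shard is rejected while InProgress.
    assert!(matches!(mgr.acquire_slot(&id), Err(SlotError::Busy)));
    mgr.commit_slot(&id, Slot::Attached(Arc::new("attached".into())));
}
```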
@@ -412,7 +414,7 @@ fn load_tenant_config(
return None;
}
Some(Tenant::load_tenant_config(conf, &tenant_shard_id))
Some(TenantShard::load_tenant_config(conf, &tenant_shard_id))
}
/// Initial stage of load: walk the local tenants directory, clean up any temp files,
@@ -491,7 +493,7 @@ pub async fn init_tenant_mgr(
resources: TenantSharedResources,
init_order: InitializationOrder,
cancel: CancellationToken,
) -> anyhow::Result<TenantManager> {
) -> anyhow::Result<TenantShardManager> {
let mut tenants = BTreeMap::new();
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
@@ -606,7 +608,8 @@ pub async fn init_tenant_mgr(
// Presence of a generation number implies attachment: attach the tenant
// if it wasn't already, and apply the generation number.
config_write_futs.push(async move {
let r = Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
let r =
TenantShard::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await;
(tenant_shard_id, location_conf, r)
});
}
@@ -669,11 +672,11 @@ pub async fn init_tenant_mgr(
info!("Processed {} local tenants at startup", tenants.len());
let mut tenants_map = TENANTS.write().unwrap();
assert!(matches!(&*tenants_map, &TenantsMap::Initializing));
assert!(matches!(&*tenants_map, &TenantShardMap::Initializing));
*tenants_map = TenantsMap::Open(tenants);
*tenants_map = TenantShardMap::Open(tenants);
Ok(TenantManager {
Ok(TenantShardManager {
conf,
tenants: &TENANTS,
resources,
@@ -682,7 +685,7 @@ pub async fn init_tenant_mgr(
})
}
/// Wrapper for Tenant::spawn that checks invariants before running
/// Wrapper for TenantShard::spawn that checks invariants before running
#[allow(clippy::too_many_arguments)]
fn tenant_spawn(
conf: &'static PageServerConf,
@@ -694,7 +697,7 @@ fn tenant_spawn(
init_order: Option<InitializationOrder>,
mode: SpawnMode,
ctx: &RequestContext,
) -> Result<Arc<Tenant>, GlobalShutDown> {
) -> Result<Arc<TenantShard>, GlobalShutDown> {
// All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed
// path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode
// to avoid impacting prod runtime performance.
@@ -705,7 +708,7 @@ fn tenant_spawn(
.try_exists()
.unwrap());
Tenant::spawn(
TenantShard::spawn(
conf,
tenant_shard_id,
resources,
@@ -717,7 +720,7 @@ fn tenant_spawn(
)
}
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantShardMap>) {
let mut join_set = JoinSet::new();
#[cfg(all(debug_assertions, not(test)))]
@@ -732,12 +735,12 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
let (total_in_progress, total_attached) = {
let mut m = tenants.write().unwrap();
match &mut *m {
TenantsMap::Initializing => {
*m = TenantsMap::ShuttingDown(BTreeMap::default());
TenantShardMap::Initializing => {
*m = TenantShardMap::ShuttingDown(BTreeMap::default());
info!("tenants map is empty");
return;
}
TenantsMap::Open(tenants) => {
TenantShardMap::Open(tenants) => {
let mut shutdown_state = BTreeMap::new();
let mut total_in_progress = 0;
let mut total_attached = 0;
@@ -787,10 +790,10 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
}
}
}
*m = TenantsMap::ShuttingDown(shutdown_state);
*m = TenantShardMap::ShuttingDown(shutdown_state);
(total_in_progress, total_attached)
}
TenantsMap::ShuttingDown(_) => {
TenantShardMap::ShuttingDown(_) => {
error!("already shutting down, this function isn't supposed to be called more than once");
return;
}
@@ -870,7 +873,7 @@ pub(crate) enum UpsertLocationError {
InternalError(anyhow::Error),
}
impl TenantManager {
impl TenantShardManager {
/// Convenience function so that anyone with a TenantManager can get at the global configuration, without
/// having to pass it around everywhere as a separate object.
pub(crate) fn get_conf(&self) -> &'static PageServerConf {
@@ -881,11 +884,11 @@ impl TenantManager {
/// undergoing a state change (i.e. slot is InProgress).
///
/// The returned Tenant is not guaranteed to be active: check its status after obtaining it, or
/// use [`Tenant::wait_to_become_active`] before using it if you will do I/O on it.
/// use [`TenantShard::wait_to_become_active`] before using it if you will do I/O on it.
pub(crate) fn get_attached_tenant_shard(
&self,
tenant_shard_id: TenantShardId,
) -> Result<Arc<Tenant>, GetTenantError> {
) -> Result<Arc<TenantShard>, GetTenantError> {
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
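The contract in the doc comment above is easy to get wrong on the caller side, so here is a minimal sketch of the intended usage pattern. The types and the `do_io_on_shard` helper are stand-ins for illustration only; the rule itself (look up the attached shard, then wait for it to become active before doing I/O) is the part taken from the comment.

```rust
use std::time::Duration;

#[derive(Clone, Copy, PartialEq, Debug)]
enum ShardState {
    Attaching,
    Active,
    Stopping,
}

struct ShardHandle {
    state: ShardState,
}

impl ShardHandle {
    // Stand-in for `TenantShard::wait_to_become_active`: here it only inspects the
    // current state instead of actually waiting.
    fn wait_to_become_active(&self, _timeout: Duration) -> Result<(), String> {
        match self.state {
            ShardState::Active => Ok(()),
            other => Err(format!("shard is {other:?}, not Active")),
        }
    }
}

fn do_io_on_shard(shard: &ShardHandle) -> Result<(), String> {
    // The lookup only guarantees the slot holds an attached shard, not that it is
    // Active, so anything that issues I/O waits for activation first.
    shard.wait_to_become_active(Duration::from_secs(30))?;
    println!("shard is active, safe to issue reads and writes");
    Ok(())
}

fn main() {
    let shard = ShardHandle {
        state: ShardState::Active,
    };
    do_io_on_shard(&shard).expect("shard should be active");
}
```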
@@ -934,12 +937,12 @@ impl TenantManager {
flush: Option<Duration>,
mut spawn_mode: SpawnMode,
ctx: &RequestContext,
) -> Result<Option<Arc<Tenant>>, UpsertLocationError> {
) -> Result<Option<Arc<TenantShard>>, UpsertLocationError> {
debug_assert_current_span_has_tenant_id();
info!("configuring tenant location to state {new_location_config:?}");
enum FastPathModified {
Attached(Arc<Tenant>),
Attached(Arc<TenantShard>),
Secondary(Arc<SecondaryTenant>),
}
@@ -996,9 +999,13 @@ impl TenantManager {
// phase of writing config and/or waiting for flush, before returning.
match fast_path_taken {
Some(FastPathModified::Attached(tenant)) => {
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.fatal_err("write tenant shard config");
TenantShard::persist_tenant_config(
self.conf,
&tenant_shard_id,
&new_location_config,
)
.await
.fatal_err("write tenant shard config");
// Transition to AttachedStale means we may well hold a valid generation
// still, and have been requested to go stale as part of a migration. If
@@ -1027,9 +1034,13 @@ impl TenantManager {
return Ok(Some(tenant));
}
Some(FastPathModified::Secondary(_secondary_tenant)) => {
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.fatal_err("write tenant shard config");
TenantShard::persist_tenant_config(
self.conf,
&tenant_shard_id,
&new_location_config,
)
.await
.fatal_err("write tenant shard config");
return Ok(None);
}
@@ -1119,7 +1130,7 @@ impl TenantManager {
// Before activating either secondary or attached mode, persist the
// configuration, so that on restart we will re-attach (or re-start
// secondary) on the tenant.
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
TenantShard::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
.await
.fatal_err("write tenant shard config");
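The ordering here (persist the configuration first, then activate) is what makes restart behavior correct. As a generic illustration of how such a persist step is usually made crash-safe, the sketch below writes to a temporary file, fsyncs it, and renames it into place; whether `persist_tenant_config` does exactly this is not shown in this diff.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

fn persist_config_atomically(path: &Path, contents: &[u8]) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    {
        let mut f = File::create(&tmp)?;
        f.write_all(contents)?;
        f.sync_all()?; // make the bytes durable before the rename
    }
    fs::rename(&tmp, path)?; // atomic replacement on POSIX filesystems
    Ok(())
}
```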
@@ -1257,7 +1268,7 @@ impl TenantManager {
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?;
let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)?;
if drop_cache {
tracing::info!("Dropping local file cache");
@@ -1292,11 +1303,11 @@ impl TenantManager {
Ok(())
}
pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<Tenant>> {
pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec<Arc<TenantShard>> {
let locked = self.tenants.read().unwrap();
match &*locked {
TenantsMap::Initializing => Vec::new(),
TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => map
TenantShardMap::Initializing => Vec::new(),
TenantShardMap::Open(map) | TenantShardMap::ShuttingDown(map) => map
.values()
.filter_map(|slot| {
slot.get_attached()
@@ -1316,8 +1327,8 @@ impl TenantManager {
let locked = self.tenants.read().unwrap();
let map = match &*locked {
TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
TenantsMap::Open(m) => m,
TenantShardMap::Initializing | TenantShardMap::ShuttingDown(_) => return,
TenantShardMap::Open(m) => m,
};
for (tenant_id, slot) in map {
@@ -1334,8 +1345,8 @@ impl TenantManager {
pub(crate) fn list(&self) -> Vec<(TenantShardId, TenantSlot)> {
let locked = self.tenants.read().unwrap();
match &*locked {
TenantsMap::Initializing => Vec::new(),
TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
TenantShardMap::Initializing => Vec::new(),
TenantShardMap::Open(map) | TenantShardMap::ShuttingDown(map) => {
map.iter().map(|(k, v)| (*k, v.clone())).collect()
}
}
@@ -1344,8 +1355,8 @@ impl TenantManager {
pub(crate) fn get(&self, tenant_shard_id: TenantShardId) -> Option<TenantSlot> {
let locked = self.tenants.read().unwrap();
match &*locked {
TenantsMap::Initializing => None,
TenantsMap::Open(map) | TenantsMap::ShuttingDown(map) => {
TenantShardMap::Initializing => None,
TenantShardMap::Open(map) | TenantShardMap::ShuttingDown(map) => {
map.get(&tenant_shard_id).cloned()
}
}
@@ -1441,7 +1452,7 @@ impl TenantManager {
#[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
pub(crate) async fn shard_split(
&self,
tenant: Arc<Tenant>,
tenant: Arc<TenantShard>,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
@@ -1471,7 +1482,7 @@ impl TenantManager {
pub(crate) async fn do_shard_split(
&self,
tenant: Arc<Tenant>,
tenant: Arc<TenantShard>,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
@@ -1519,7 +1530,7 @@ impl TenantManager {
// Phase 1: Write out child shards' remote index files, in the parent tenant's current generation
if let Err(e) = tenant.split_prepare(&child_shards).await {
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
// If [`TenantShard::split_prepare`] fails, we must reload the tenant, because it might
// have been left in a partially-shut-down state.
tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
return Err(e);
@@ -1697,7 +1708,7 @@ impl TenantManager {
/// For each resident layer in the parent shard, we will hard link it into all of the child shards.
async fn shard_split_hardlink(
&self,
parent_shard: &Tenant,
parent_shard: &TenantShard,
child_shards: Vec<TenantShardId>,
) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
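The hard-linking step described in the comment above can be pictured with plain `std::fs` calls: every layer file in the parent shard's directory gets a hard link in each child shard's directory, so no layer data is copied. The directory layout and helper below are simplified stand-ins, not the pageserver's real on-disk format.

```rust
use std::fs;
use std::io;
use std::path::Path;

fn hardlink_layers_into_children(parent_dir: &Path, child_dirs: &[&Path]) -> io::Result<usize> {
    let mut linked = 0;
    for entry in fs::read_dir(parent_dir)? {
        let entry = entry?;
        if !entry.file_type()?.is_file() {
            continue; // only layer files, skip subdirectories
        }
        let name = entry.file_name();
        for child in child_dirs {
            fs::create_dir_all(child)?;
            // A hard link shares the same inode, so the child shard sees the layer
            // without its bytes being copied.
            fs::hard_link(entry.path(), child.join(&name))?;
            linked += 1;
        }
    }
    Ok(linked)
}
```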
@@ -1888,8 +1899,8 @@ impl TenantManager {
) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
let tenants = self.tenants.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
TenantShardMap::Initializing => return Err(TenantMapListError::Initializing),
TenantShardMap::Open(m) | TenantShardMap::ShuttingDown(m) => m,
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
@@ -1974,7 +1985,7 @@ impl TenantManager {
}
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)
let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)
.map_err(|e| Error::DetachReparent(e.into()))?;
let shard_identity = config.shard;
@@ -2070,8 +2081,8 @@ impl TenantManager {
let mut any_in_progress = None;
match &*tenants {
TenantsMap::Initializing => ShardResolveResult::NotFound,
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => {
TenantShardMap::Initializing => ShardResolveResult::NotFound,
TenantShardMap::Open(m) | TenantShardMap::ShuttingDown(m) => {
for slot in m.range(TenantShardId::tenant_range(*tenant_id)) {
// Ignore all slots that don't contain an attached tenant
let tenant = match &slot.1 {
@@ -2135,8 +2146,8 @@ impl TenantManager {
pub(crate) fn calculate_utilization(&self) -> Result<(u64, u32), TenantMapListError> {
let tenants = self.tenants.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
TenantShardMap::Initializing => return Err(TenantMapListError::Initializing),
TenantShardMap::Open(m) | TenantShardMap::ShuttingDown(m) => m,
};
let shard_count = m.len();
let mut wanted_bytes = 0;
@@ -2482,18 +2493,18 @@ impl SlotGuard {
}
let m = match &mut *locked {
TenantsMap::Initializing => {
TenantShardMap::Initializing => {
return Err(TenantSlotUpsertError::MapState(
TenantMapError::StillInitializing,
))
}
TenantsMap::ShuttingDown(_) => {
TenantShardMap::ShuttingDown(_) => {
return Err(TenantSlotUpsertError::ShuttingDown((
new_value,
self.completion.clone(),
)));
}
TenantsMap::Open(m) => m,
TenantShardMap::Open(m) => m,
};
METRICS.slot_inserted(&new_value);
@@ -2590,17 +2601,17 @@ impl Drop for SlotGuard {
let mut locked = TENANTS.write().unwrap();
let m = match &mut *locked {
TenantsMap::Initializing => {
TenantShardMap::Initializing => {
// There is no map, this should never happen.
return;
}
TenantsMap::ShuttingDown(_) => {
TenantShardMap::ShuttingDown(_) => {
// When we transition to shutdown, InProgress elements are removed
            // from the map, so we do not need to clean up our InProgress marker.
// See [`shutdown_all_tenants0`]
return;
}
TenantsMap::Open(m) => m,
TenantShardMap::Open(m) => m,
};
use std::collections::btree_map::Entry;
@@ -2640,13 +2651,13 @@ enum TenantSlotPeekMode {
}
fn tenant_map_peek_slot<'a>(
tenants: &'a std::sync::RwLockReadGuard<'a, TenantsMap>,
tenants: &'a std::sync::RwLockReadGuard<'a, TenantShardMap>,
tenant_shard_id: &TenantShardId,
mode: TenantSlotPeekMode,
) -> Result<Option<&'a TenantSlot>, TenantMapError> {
match tenants.deref() {
TenantsMap::Initializing => Err(TenantMapError::StillInitializing),
TenantsMap::ShuttingDown(m) => match mode {
TenantShardMap::Initializing => Err(TenantMapError::StillInitializing),
TenantShardMap::ShuttingDown(m) => match mode {
TenantSlotPeekMode::Read => Ok(Some(
// When reading in ShuttingDown state, we must translate None results
// into a ShuttingDown error, because absence of a tenant shard ID in the map
@@ -2658,7 +2669,7 @@ fn tenant_map_peek_slot<'a>(
)),
TenantSlotPeekMode::Write => Err(TenantMapError::ShuttingDown),
},
TenantsMap::Open(m) => Ok(m.get(tenant_shard_id)),
TenantShardMap::Open(m) => Ok(m.get(tenant_shard_id)),
}
}
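The rule the comment in this hunk describes (a missing entry while shutting down must surface as an error, not as `None`) boils down to an `Option`-to-`Result` translation. A stand-alone sketch with stand-in types:

```rust
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
enum MapError {
    ShuttingDown,
}

// During shutdown, "not in the map" does not mean "never existed", so the lookup
// refuses to answer with a plain None and reports ShuttingDown instead.
fn peek_while_shutting_down<'a>(
    map: &'a BTreeMap<u32, String>,
    key: u32,
) -> Result<&'a String, MapError> {
    map.get(&key).ok_or(MapError::ShuttingDown)
}

fn main() {
    let map: BTreeMap<u32, String> = BTreeMap::new();
    assert_eq!(peek_while_shutting_down(&map, 7), Err(MapError::ShuttingDown));
}
```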
@@ -2678,7 +2689,7 @@ fn tenant_map_acquire_slot(
fn tenant_map_acquire_slot_impl(
tenant_shard_id: &TenantShardId,
tenants: &std::sync::RwLock<TenantsMap>,
tenants: &std::sync::RwLock<TenantShardMap>,
mode: TenantSlotAcquireMode,
) -> Result<SlotGuard, TenantSlotError> {
use TenantSlotAcquireMode::*;
@@ -2689,9 +2700,9 @@ fn tenant_map_acquire_slot_impl(
let _guard = span.enter();
let m = match &mut *locked {
TenantsMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
TenantsMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
TenantsMap::Open(m) => m,
TenantShardMap::Initializing => return Err(TenantMapError::StillInitializing.into()),
TenantShardMap::ShuttingDown(_) => return Err(TenantMapError::ShuttingDown.into()),
TenantShardMap::Open(m) => m,
};
use std::collections::btree_map::Entry;
@@ -2744,7 +2755,7 @@ fn tenant_map_acquire_slot_impl(
/// If the cleanup fails, the tenant will stay in memory in [`TenantState::Broken`] state, and another removal
/// operation would be needed to remove it.
async fn remove_tenant_from_memory<V, F>(
tenants: &std::sync::RwLock<TenantsMap>,
tenants: &std::sync::RwLock<TenantShardMap>,
tenant_shard_id: TenantShardId,
tenant_cleanup: F,
) -> Result<V, TenantStateError>
@@ -2827,14 +2838,14 @@ mod tests {
use crate::tenant::mgr::TenantSlot;
use super::{super::harness::TenantHarness, TenantsMap};
use super::{super::harness::TenantShardHarness, TenantShardMap};
#[tokio::test(start_paused = true)]
async fn shutdown_awaits_in_progress_tenant() {
// Test that if an InProgress tenant is in the map during shutdown, the shutdown will gracefully
// wait for it to complete before proceeding.
let h = TenantHarness::create("shutdown_awaits_in_progress_tenant")
let h = TenantShardHarness::create("shutdown_awaits_in_progress_tenant")
.await
.unwrap();
let (t, _ctx) = h.load().await;
@@ -2848,7 +2859,7 @@ mod tests {
let _e = span.enter();
let tenants = BTreeMap::from([(id, TenantSlot::Attached(t.clone()))]);
let tenants = Arc::new(std::sync::RwLock::new(TenantsMap::Open(tenants)));
let tenants = Arc::new(std::sync::RwLock::new(TenantShardMap::Open(tenants)));
// Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
// permit it to proceed: that will stick the tenant in InProgress

View File

@@ -133,7 +133,7 @@
//! - Initiate upload queue with that [`IndexPart`].
//! - Reschedule all lost operations by comparing the local filesystem state
//! and remote state as per [`IndexPart`]. This is done in
//! [`Tenant::timeline_init_and_sync`].
//! [`TenantShard::timeline_init_and_sync`].
//!
//! Note that if we crash during file deletion between the index update
//! that removes the file from the list of files, and deleting the remote file,
@@ -171,7 +171,7 @@
//! If no remote storage configuration is provided, the [`RemoteTimelineClient`] is
//! not created and the uploads are skipped.
//!
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`TenantShard::timeline_init_and_sync`]: super::TenantShard::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
pub(crate) mod download;
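The "reschedule all lost operations" step in the module docs above is, at its core, a set comparison between what is on local disk and what the last-known `IndexPart` says is in remote storage. A simplified sketch follows; the `IndexPart` here is a stand-in holding only file names, while the real index carries much more metadata.

```rust
use std::collections::HashSet;

// Stand-in for the remote index: just the set of layer file names it lists.
struct IndexPart {
    layer_files: HashSet<String>,
}

struct Reconciliation {
    // Present on local disk but missing from the remote index: their uploads were
    // lost (e.g. by a crash before the upload completed) and need rescheduling.
    missing_uploads: Vec<String>,
    // Listed in the remote index but not present locally, e.g. evicted layers;
    // nothing to reschedule for these in this sketch.
    remote_only: Vec<String>,
}

fn reconcile(local_files: &HashSet<String>, index: &IndexPart) -> Reconciliation {
    Reconciliation {
        missing_uploads: local_files
            .difference(&index.layer_files)
            .cloned()
            .collect(),
        remote_only: index
            .layer_files
            .difference(local_files)
            .cloned()
            .collect(),
    }
}
```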
@@ -2638,9 +2638,9 @@ mod tests {
context::RequestContext,
tenant::{
config::AttachmentMode,
harness::{TenantHarness, TIMELINE_ID},
harness::{TenantShardHarness, TIMELINE_ID},
storage_layer::layer::local_layer_path,
Tenant, Timeline,
TenantShard, Timeline,
},
DEFAULT_PG_VERSION,
};
@@ -2697,8 +2697,8 @@ mod tests {
}
struct TestSetup {
harness: TenantHarness,
tenant: Arc<Tenant>,
harness: TenantShardHarness,
tenant: Arc<TenantShard>,
timeline: Arc<Timeline>,
tenant_ctx: RequestContext,
}
@@ -2706,7 +2706,7 @@ mod tests {
impl TestSetup {
async fn new(test_name: &str) -> anyhow::Result<Self> {
let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}")));
let harness = TenantHarness::create(test_name).await?;
let harness = TenantShardHarness::create(test_name).await?;
let (tenant, ctx) = harness.load().await;
let timeline = tenant

View File

@@ -19,7 +19,7 @@ use self::{
use super::{
config::{SecondaryLocationConfig, TenantConfOpt},
mgr::TenantManager,
mgr::TenantShardManager,
span::debug_assert_current_span_has_tenant_id,
storage_layer::LayerName,
GetTenantError,
@@ -374,7 +374,7 @@ impl GlobalTasks {
}
pub fn spawn_tasks(
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
remote_storage: GenericRemoteStorage,
background_jobs_can_start: Barrier,
cancel: CancellationToken,

View File

@@ -39,7 +39,7 @@ use super::{
};
use crate::tenant::{
mgr::TenantManager,
mgr::TenantShardManager,
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
@@ -69,7 +69,7 @@ use super::{
const DEFAULT_DOWNLOAD_INTERVAL: Duration = Duration::from_millis(60000);
pub(super) async fn downloader_task(
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
remote_storage: GenericRemoteStorage,
command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
background_jobs_can_start: Barrier,
@@ -92,7 +92,7 @@ pub(super) async fn downloader_task(
}
struct SecondaryDownloader {
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
remote_storage: GenericRemoteStorage,
root_ctx: RequestContext,
}

View File

@@ -10,11 +10,11 @@ use crate::{
tenant::{
config::AttachmentMode,
mgr::GetTenantError,
mgr::TenantManager,
mgr::TenantShardManager,
remote_timeline_client::remote_heatmap_path,
span::debug_assert_current_span_has_tenant_id,
tasks::{warn_when_period_overrun, BackgroundLoopKind},
Tenant,
TenantShard,
},
};
@@ -35,7 +35,7 @@ use tracing::{info_span, instrument, Instrument};
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
pub(super) async fn heatmap_uploader_task(
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
remote_storage: GenericRemoteStorage,
command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
background_jobs_can_start: Barrier,
@@ -61,7 +61,7 @@ pub(super) async fn heatmap_uploader_task(
/// handling loop and mutates it as needed: there are no locks here, because that event loop
/// can hold &mut references to this type throughout.
struct HeatmapUploader {
tenant_manager: Arc<TenantManager>,
tenant_manager: Arc<TenantShardManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
@@ -79,7 +79,7 @@ impl RunningJob for WriteInProgress {
}
struct UploadPending {
tenant: Arc<Tenant>,
tenant: Arc<TenantShard>,
last_upload: Option<LastUploadState>,
target_time: Option<Instant>,
period: Option<Duration>,
@@ -111,7 +111,7 @@ impl scheduler::Completion for WriteComplete {
struct UploaderTenantState {
// This Weak only exists to enable culling idle instances of this type
// when the Tenant has been deallocated.
tenant: Weak<Tenant>,
tenant: Weak<TenantShard>,
/// Digest of the serialized heatmap that we last successfully uploaded
last_upload_state: Option<LastUploadState>,
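The `Weak` in `UploaderTenantState` exists purely so stale per-tenant entries can be detected and dropped once the shard itself is gone. A small illustration of that culling pattern, with stand-in types rather than the uploader's real state:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Weak};

struct ShardStub {
    name: String,
}

struct PerTenantState {
    shard: Weak<ShardStub>,
}

fn cull_dropped_shards(states: &mut HashMap<String, PerTenantState>) {
    // `Weak::upgrade` fails once the last `Arc` is gone, which is exactly the
    // signal that this entry's bookkeeping can be discarded.
    states.retain(|_, state| state.shard.upgrade().is_some());
}

fn main() {
    let alive = Arc::new(ShardStub { name: "alive".into() });
    let gone = Arc::new(ShardStub { name: "gone".into() });

    let mut states = HashMap::new();
    states.insert(alive.name.clone(), PerTenantState { shard: Arc::downgrade(&alive) });
    states.insert(gone.name.clone(), PerTenantState { shard: Arc::downgrade(&gone) });

    drop(gone); // the last strong reference goes away
    cull_dropped_shards(&mut states);

    assert_eq!(states.len(), 1);
    assert!(states.contains_key("alive"));
}
```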
@@ -362,7 +362,7 @@ struct LastUploadState {
/// of the object we would have uploaded.
async fn upload_tenant_heatmap(
remote_storage: GenericRemoteStorage,
tenant: &Arc<Tenant>,
tenant: &Arc<TenantShard>,
last_upload: Option<LastUploadState>,
) -> Result<UploadHeatmapOutcome, UploadHeatmapError> {
debug_assert_current_span_has_tenant_id();

View File

@@ -361,7 +361,7 @@ where
/// Periodic execution phase: inspect all attached tenants and schedule any work they require.
///
/// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
/// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::TenantShard`] or [`crate::tenant::secondary::SecondaryTenant`]
///
/// This function resets the pending list: it is assumed that the caller may change their mind about
/// which tenants need work between calls to schedule_iteration.

View File

@@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{GcError, LogicalSizeCalculationCause, Tenant};
use super::{GcError, LogicalSizeCalculationCause, TenantShard};
use crate::tenant::{MaybeOffloaded, Timeline};
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -159,7 +159,7 @@ pub struct TimelineInputs {
/// initdb_lsn branchpoints* next_pitr_cutoff latest
/// ```
pub(super) async fn gather_inputs(
tenant: &Tenant,
tenant: &TenantShard,
limit: &Arc<Semaphore>,
max_retention_period: Option<u64>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,

View File

@@ -404,7 +404,7 @@ mod tests {
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
harness::{TenantShardHarness, TIMELINE_ID},
storage_layer::AsLayerDesc,
},
DEFAULT_PG_VERSION,
@@ -431,7 +431,7 @@ mod tests {
#[tokio::test]
async fn write_one_image() {
let harness = TenantHarness::create("split_writer_write_one_image")
let harness = TenantShardHarness::create("split_writer_write_one_image")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -510,7 +510,7 @@ mod tests {
/// Test the image+delta writer by writing a large number of images and deltas. If discard is
/// set to true, all layers will be discarded.
async fn write_split_helper(harness_name: &'static str, discard: bool) {
let harness = TenantHarness::create(harness_name).await.unwrap();
let harness = TenantShardHarness::create(harness_name).await.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant
@@ -605,7 +605,7 @@ mod tests {
#[tokio::test]
async fn write_large_img() {
let harness = TenantHarness::create("split_writer_write_large_img")
let harness = TenantShardHarness::create("split_writer_write_large_img")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -692,7 +692,7 @@ mod tests {
#[tokio::test]
async fn write_split_single_key() {
let harness = TenantHarness::create("split_writer_write_split_single_key")
let harness = TenantShardHarness::create("split_writer_write_split_single_key")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;

View File

@@ -1618,11 +1618,11 @@ pub(crate) mod test {
use crate::tenant::harness::TIMELINE_ID;
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner;
use crate::tenant::{Tenant, Timeline};
use crate::tenant::{TenantShard, Timeline};
use crate::{
context::DownloadBehavior,
task_mgr::TaskKind,
tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
tenant::{disk_btree::tests::TestDisk, harness::TenantShardHarness},
DEFAULT_PG_VERSION,
};
use bytes::Bytes;
@@ -1879,7 +1879,8 @@ pub(crate) mod test {
#[tokio::test]
async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read").await?;
let harness =
TenantShardHarness::create("test_delta_layer_oversized_vectored_read").await?;
let (tenant, ctx) = harness.load().await;
let timeline_id = TimelineId::generate();
@@ -1982,7 +1983,7 @@ pub(crate) mod test {
use bytes::Bytes;
use pageserver_api::record::NeonWalRecord;
let h = crate::tenant::harness::TenantHarness::create("truncate_delta_smoke")
let h = crate::tenant::harness::TenantShardHarness::create("truncate_delta_smoke")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
@@ -2214,7 +2215,7 @@ pub(crate) mod test {
}
pub(crate) async fn produce_delta_layer(
tenant: &Tenant,
tenant: &TenantShard,
tline: &Arc<Timeline>,
mut deltas: Vec<(Key, Lsn, Value)>,
ctx: &RequestContext,
@@ -2268,7 +2269,9 @@ pub(crate) mod test {
#[tokio::test]
async fn delta_layer_iterator() {
let harness = TenantHarness::create("delta_layer_iterator").await.unwrap();
let harness = TenantShardHarness::create("delta_layer_iterator")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant

View File

@@ -106,7 +106,7 @@ mod tests {
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
harness::{TenantShardHarness, TIMELINE_ID},
storage_layer::delta_layer::test::produce_delta_layer,
},
DEFAULT_PG_VERSION,
@@ -137,7 +137,7 @@ mod tests {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("filter_iterator_filter_keyspace_iterator")
let harness = TenantShardHarness::create("filter_iterator_filter_keyspace_iterator")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;

View File

@@ -1138,10 +1138,10 @@ mod test {
context::RequestContext,
tenant::{
config::TenantConf,
harness::{TenantHarness, TIMELINE_ID},
harness::{TenantShardHarness, TIMELINE_ID},
storage_layer::{Layer, ResidentLayer},
vectored_blob_io::StreamingVectoredReadPlanner,
Tenant, Timeline,
TenantShard, Timeline,
},
DEFAULT_PG_VERSION,
};
@@ -1170,7 +1170,7 @@ mod test {
// Create an unsharded parent with a layer.
//
let harness = TenantHarness::create_custom(
let harness = TenantShardHarness::create_custom(
"test_image_layer_rewrite--parent",
tenant_conf.clone(),
tenant_id,
@@ -1233,7 +1233,7 @@ mod test {
ShardStripeSize(0x8000),
)
.unwrap();
let harness = TenantHarness::create_custom(
let harness = TenantShardHarness::create_custom(
Box::leak(Box::new(format!(
"test_image_layer_rewrite--child{}",
shard_identity.shard_slug()
@@ -1324,7 +1324,7 @@ mod test {
}
async fn produce_image_layer(
tenant: &Tenant,
tenant: &TenantShard,
tline: &Arc<Timeline>,
mut images: Vec<(Key, Bytes)>,
lsn: Lsn,
@@ -1380,7 +1380,9 @@ mod test {
#[tokio::test]
async fn image_layer_iterator() {
let harness = TenantHarness::create("image_layer_iterator").await.unwrap();
let harness = TenantShardHarness::create("image_layer_iterator")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
let tline = tenant

View File

@@ -10,7 +10,7 @@ use utils::{
use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
use crate::{task_mgr::TaskKind, tenant::harness::TenantShardHarness};
/// Used in tests to advance a future to the wanted await point, and not further.
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
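The `ADVANCE` constant above is used together with tokio's paused test clock: with `start_paused = true`, virtual time only moves when the test advances it, which is how a future is driven exactly up to a wanted await point. A minimal sketch of that trick, assuming a tokio dependency with the `test-util` and `macros` features enabled:

```rust
use std::time::Duration;
use tokio::time::{advance, sleep, Instant};

#[tokio::test(start_paused = true)]
async fn advance_to_await_point() {
    let start = Instant::now();

    // Schedule a one-hour sleep, then move the mock clock past its deadline.
    // No wall-clock time elapses; the future simply becomes ready.
    let slow = sleep(Duration::from_secs(3600));
    tokio::pin!(slow);

    advance(Duration::from_secs(3600)).await;
    slow.await;

    assert!(Instant::now() - start >= Duration::from_secs(3600));
}
```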
@@ -24,7 +24,7 @@ const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_s
async fn smoke_test() {
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("smoke_test").await.unwrap();
let h = TenantShardHarness::create("smoke_test").await.unwrap();
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
let (tenant, _) = h.load().await;
@@ -202,7 +202,7 @@ async fn evict_and_wait_on_wanted_deleted() {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
let h = TenantShardHarness::create("evict_and_wait_on_wanted_deleted")
.await
.unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
@@ -286,7 +286,7 @@ fn read_wins_pending_eviction() {
rt.block_on(async move {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("read_wins_pending_eviction")
let h = TenantShardHarness::create("read_wins_pending_eviction")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
@@ -420,7 +420,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
rt.block_on(async move {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create(name).await.unwrap();
let h = TenantShardHarness::create(name).await.unwrap();
let (tenant, ctx) = h.load().await;
let span = h.span();
let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
@@ -589,7 +589,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
#[tokio::test(start_paused = true)]
async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
let h = TenantShardHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
@@ -667,7 +667,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
#[tokio::test(start_paused = true)]
async fn evict_and_wait_does_not_wait_for_download() {
// let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
let h = TenantShardHarness::create("evict_and_wait_does_not_wait_for_download")
.await
.unwrap();
let (tenant, ctx) = h.load().await;
@@ -766,7 +766,7 @@ async fn eviction_cancellation_on_drop() {
// this is the runtime on which Layer spawns the blocking tasks on
let handle = tokio::runtime::Handle::current();
let h = TenantHarness::create("eviction_cancellation_on_drop")
let h = TenantShardHarness::create("eviction_cancellation_on_drop")
.await
.unwrap();
utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();

View File

@@ -357,7 +357,7 @@ mod tests {
use crate::{
tenant::{
harness::{TenantHarness, TIMELINE_ID},
harness::{TenantShardHarness, TIMELINE_ID},
storage_layer::delta_layer::test::{produce_delta_layer, sort_delta},
},
DEFAULT_PG_VERSION,
@@ -393,7 +393,7 @@ mod tests {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_merge_in_between")
let harness = TenantShardHarness::create("merge_iterator_merge_in_between")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -458,7 +458,7 @@ mod tests {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_delta_merge")
let harness = TenantShardHarness::create("merge_iterator_delta_merge")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;
@@ -535,7 +535,7 @@ mod tests {
use bytes::Bytes;
use pageserver_api::value::Value;
let harness = TenantHarness::create("merge_iterator_delta_image_mixed_merge")
let harness = TenantShardHarness::create("merge_iterator_delta_image_mixed_merge")
.await
.unwrap();
let (tenant, ctx) = harness.load().await;

View File

@@ -12,7 +12,7 @@ use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use crate::tenant::{TenantShard, TenantState};
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -81,7 +81,7 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
/// Start per tenant background loops: compaction and gc.
pub fn start_background_loops(
tenant: &Arc<Tenant>,
tenant: &Arc<TenantShard>,
background_jobs_can_start: Option<&completion::Barrier>,
) {
let tenant_shard_id = tenant.tenant_shard_id;
@@ -158,7 +158,7 @@ pub fn start_background_loops(
///
/// Compaction task's main loop
///
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
async fn compaction_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
    // How many errors we have seen consecutively
let mut error_run_count = 0;
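A concrete reading of `MAX_BACKOFF_SECS` and `error_run_count`: each consecutive failure grows the sleep before the next compaction attempt, capped at five minutes, and a success resets the counter. The formula below is only an illustration; the exact backoff curve used by the pageserver is not shown in this hunk.

```rust
use std::time::Duration;

const MAX_BACKOFF_SECS: f64 = 300.0;

fn backoff_after_errors(error_run_count: u32) -> Duration {
    if error_run_count == 0 {
        return Duration::ZERO;
    }
    // 1s, 2s, 4s, ... capped at MAX_BACKOFF_SECS.
    let secs = f64::powi(2.0, (error_run_count - 1) as i32).min(MAX_BACKOFF_SECS);
    Duration::from_secs_f64(secs)
}

fn main() {
    for n in 0..12 {
        println!("{n} consecutive errors -> wait {:?}", backoff_after_errors(n));
    }
}
```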
@@ -318,7 +318,7 @@ fn log_compaction_error(
///
/// GC task's main loop
///
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
async fn gc_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
    // How many errors we have seen consecutively
let mut error_run_count = 0;
@@ -418,7 +418,7 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
async fn ingest_housekeeping_loop(tenant: Arc<TenantShard>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let mut last_throttle_flag_reset_at = Instant::now();
@@ -496,7 +496,7 @@ async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
async fn wait_for_active_tenant(tenant: &Arc<TenantShard>) -> ControlFlow<()> {
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(())

View File

@@ -413,7 +413,7 @@ pub struct Timeline {
/// Timeline deletion will acquire both compaction and gc locks in whatever order.
gc_lock: tokio::sync::Mutex<()>,
/// Cloned from [`super::Tenant::pagestream_throttle`] on construction.
/// Cloned from [`super::TenantShard::pagestream_throttle`] on construction.
pub(crate) pagestream_throttle: Arc<crate::tenant::throttle::Throttle>,
/// Size estimator for aux file v2
@@ -1700,7 +1700,7 @@ impl Timeline {
pub(crate) fn activate(
self: &Arc<Self>,
parent: Arc<crate::tenant::Tenant>,
parent: Arc<crate::tenant::TenantShard>,
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
@@ -2675,7 +2675,7 @@ impl Timeline {
// (1) and (4)
// TODO: this is basically a no-op now, should we remove it?
self.remote_client.schedule_barrier()?;
// Tenant::create_timeline will wait for these uploads to happen before returning, or
// TenantShard::create_timeline will wait for these uploads to happen before returning, or
// on retry.
// Now that we have the full layer map, we may calculate the visibility of layers within it (a global scan)
@@ -4575,7 +4575,7 @@ impl Timeline {
/// from our ancestor to be branches of this timeline.
pub(crate) async fn prepare_to_detach_from_ancestor(
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
tenant: &crate::tenant::TenantShard,
options: detach_ancestor::Options,
ctx: &RequestContext,
) -> Result<detach_ancestor::Progress, detach_ancestor::Error> {
@@ -4593,7 +4593,7 @@ impl Timeline {
/// resetting the tenant.
pub(crate) async fn detach_from_ancestor_and_reparent(
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
tenant: &crate::tenant::TenantShard,
prepared: detach_ancestor::PreparedTimelineDetach,
ctx: &RequestContext,
) -> Result<detach_ancestor::DetachingAndReparenting, detach_ancestor::Error> {
@@ -4605,7 +4605,7 @@ impl Timeline {
/// The tenant must've been reset if ancestry was modified previously (in tenant manager).
pub(crate) async fn complete_detaching_timeline_ancestor(
self: &Arc<Timeline>,
tenant: &crate::tenant::Tenant,
tenant: &crate::tenant::TenantShard,
attempt: detach_ancestor::Attempt,
ctx: &RequestContext,
) -> Result<(), detach_ancestor::Error> {
@@ -5609,14 +5609,14 @@ impl Timeline {
/// Persistently blocks gc for `Manual` reason.
///
/// Returns true if no such block existed before, false otherwise.
pub(crate) async fn block_gc(&self, tenant: &super::Tenant) -> anyhow::Result<bool> {
pub(crate) async fn block_gc(&self, tenant: &super::TenantShard) -> anyhow::Result<bool> {
use crate::tenant::remote_timeline_client::index::GcBlockingReason;
assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id);
tenant.gc_block.insert(self, GcBlockingReason::Manual).await
}
/// Persistently unblocks gc for `Manual` reason.
pub(crate) async fn unblock_gc(&self, tenant: &super::Tenant) -> anyhow::Result<()> {
pub(crate) async fn unblock_gc(&self, tenant: &super::TenantShard) -> anyhow::Result<()> {
use crate::tenant::remote_timeline_client::index::GcBlockingReason;
assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id);
tenant.gc_block.remove(self, GcBlockingReason::Manual).await
@@ -5634,8 +5634,8 @@ impl Timeline {
/// Force create an image layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
/// DO NOT use this function directly. Use [`TenantShard::branch_timeline_test_with_layers`]
/// or [`TenantShard::create_test_timeline_with_layers`] to ensure all these layers are
/// placed into the layer map in one run AND be validated.
#[cfg(test)]
pub(super) async fn force_create_image_layer(
@@ -5681,8 +5681,8 @@ impl Timeline {
/// Force create a delta layer and place it into the layer map.
///
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are
/// DO NOT use this function directly. Use [`TenantShard::branch_timeline_test_with_layers`]
/// or [`TenantShard::create_test_timeline_with_layers`] to ensure all these layers are
/// placed into the layer map in one run AND be validated.
#[cfg(test)]
pub(super) async fn force_create_delta_layer(
@@ -6064,7 +6064,7 @@ mod tests {
use utils::{id::TimelineId, lsn::Lsn};
use crate::tenant::{
harness::{test_img, TenantHarness},
harness::{test_img, TenantShardHarness},
layer_map::LayerMap,
storage_layer::{Layer, LayerName},
timeline::{DeltaLayerTestDesc, EvictionError},
@@ -6073,7 +6073,9 @@ mod tests {
#[tokio::test]
async fn test_heatmap_generation() {
let harness = TenantHarness::create("heatmap_generation").await.unwrap();
let harness = TenantShardHarness::create("heatmap_generation")
.await
.unwrap();
let covered_delta = DeltaLayerTestDesc::new_with_inferred_key_range(
Lsn(0x10)..Lsn(0x20),
@@ -6163,7 +6165,7 @@ mod tests {
#[tokio::test]
async fn two_layer_eviction_attempts_at_the_same_time() {
let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time")
let harness = TenantShardHarness::create("two_layer_eviction_attempts_at_the_same_time")
.await
.unwrap();

View File

@@ -16,8 +16,8 @@ use crate::{
tenant::{
metadata::TimelineMetadata,
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant,
TenantManifestError, TimelineOrOffloaded,
CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, TenantManifestError,
TenantShard, TimelineOrOffloaded,
},
virtual_file::MaybeFatalIo,
};
@@ -114,7 +114,7 @@ pub(super) async fn delete_local_timeline_directory(
/// It is important that this gets called when DeletionGuard is being held.
/// For more context see comments in [`DeleteTimelineFlow::prepare`]
async fn remove_maybe_offloaded_timeline_from_tenant(
tenant: &Tenant,
tenant: &TenantShard,
timeline: &TimelineOrOffloaded,
_: &DeletionGuard, // using it as a witness
) -> anyhow::Result<()> {
@@ -188,7 +188,7 @@ impl DeleteTimelineFlow {
// error out if some of the shutdown tasks have already been completed!
#[instrument(skip_all)]
pub async fn run(
tenant: &Arc<Tenant>,
tenant: &Arc<TenantShard>,
timeline_id: TimelineId,
) -> Result<(), DeleteTimelineError> {
super::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -286,7 +286,7 @@ impl DeleteTimelineFlow {
/// Shortcut to create Timeline in stopping state and spawn deletion task.
#[instrument(skip_all, fields(%timeline_id))]
pub(crate) async fn resume_deletion(
tenant: Arc<Tenant>,
tenant: Arc<TenantShard>,
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,
remote_client: RemoteTimelineClient,
@@ -334,7 +334,7 @@ impl DeleteTimelineFlow {
}
pub(super) fn prepare(
tenant: &Tenant,
tenant: &TenantShard,
timeline_id: TimelineId,
allow_offloaded_children: bool,
set_stopping: bool,
@@ -405,7 +405,7 @@ impl DeleteTimelineFlow {
fn schedule_background(
guard: DeletionGuard,
conf: &'static PageServerConf,
tenant: Arc<Tenant>,
tenant: Arc<TenantShard>,
timeline: TimelineOrOffloaded,
remote_client: Arc<RemoteTimelineClient>,
) {
@@ -439,7 +439,7 @@ impl DeleteTimelineFlow {
async fn background(
mut guard: DeletionGuard,
conf: &PageServerConf,
tenant: &Tenant,
tenant: &TenantShard,
timeline: &TimelineOrOffloaded,
remote_client: Arc<RemoteTimelineClient>,
) -> Result<(), DeleteTimelineError> {

View File

@@ -7,7 +7,7 @@ use crate::{
tenant::{
remote_timeline_client::index::GcBlockingReason::DetachAncestor,
storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer},
Tenant,
TenantShard,
},
virtual_file::{MaybeFatalIo, VirtualFile},
};
@@ -159,7 +159,7 @@ impl Attempt {
/// See [`Timeline::prepare_to_detach_from_ancestor`]
pub(super) async fn prepare(
detached: &Arc<Timeline>,
tenant: &Tenant,
tenant: &TenantShard,
options: Options,
ctx: &RequestContext,
) -> Result<Progress, Error> {
@@ -410,7 +410,7 @@ pub(super) async fn prepare(
Ok(Progress::Prepared(attempt, prepared))
}
async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
async fn start_new_attempt(detached: &Timeline, tenant: &TenantShard) -> Result<Attempt, Error> {
let attempt = obtain_exclusive_attempt(detached, tenant)?;
// insert the block in the index_part.json, if not already there.
@@ -426,13 +426,16 @@ async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attem
Ok(attempt)
}
async fn continue_with_blocked_gc(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
async fn continue_with_blocked_gc(
detached: &Timeline,
tenant: &TenantShard,
) -> Result<Attempt, Error> {
// FIXME: it would be nice to confirm that there is an in-memory version, since we've just
// verified there is a persistent one?
obtain_exclusive_attempt(detached, tenant)
}
fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Attempt, Error> {
fn obtain_exclusive_attempt(detached: &Timeline, tenant: &TenantShard) -> Result<Attempt, Error> {
use Error::{OtherTimelineDetachOngoing, ShuttingDown};
// ensure we are the only active attempt for this tenant
@@ -460,7 +463,7 @@ fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result<Atte
fn reparented_direct_children(
detached: &Arc<Timeline>,
tenant: &Tenant,
tenant: &TenantShard,
) -> Result<HashSet<TimelineId>, Error> {
let mut all_direct_children = tenant
.timelines
@@ -698,7 +701,7 @@ impl DetachingAndReparenting {
/// See [`Timeline::detach_from_ancestor_and_reparent`].
pub(super) async fn detach_and_reparent(
detached: &Arc<Timeline>,
tenant: &Tenant,
tenant: &TenantShard,
prepared: PreparedTimelineDetach,
_ctx: &RequestContext,
) -> Result<DetachingAndReparenting, Error> {
@@ -901,7 +904,7 @@ pub(super) async fn detach_and_reparent(
pub(super) async fn complete(
detached: &Arc<Timeline>,
tenant: &Tenant,
tenant: &TenantShard,
mut attempt: Attempt,
_ctx: &RequestContext,
) -> Result<(), Error> {
@@ -970,7 +973,7 @@ where
}
fn check_no_archived_children_of_ancestor(
tenant: &Tenant,
tenant: &TenantShard,
detached: &Arc<Timeline>,
ancestor: &Arc<Timeline>,
ancestor_lsn: Lsn,

View File

@@ -31,7 +31,8 @@ use crate::{
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
size::CalculateSyntheticSizeError, storage_layer::LayerVisibilityHint,
tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant,
tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause,
TenantShard,
},
};
@@ -52,7 +53,7 @@ pub struct EvictionTaskTenantState {
impl Timeline {
pub(super) fn launch_eviction_task(
self: &Arc<Self>,
parent: Arc<Tenant>,
parent: Arc<TenantShard>,
background_tasks_can_start: Option<&completion::Barrier>,
) {
let self_clone = Arc::clone(self);
@@ -79,7 +80,7 @@ impl Timeline {
}
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
async fn eviction_task(self: Arc<Self>, tenant: Arc<TenantShard>) {
use crate::tenant::tasks::random_init_delay;
// acquire the gate guard only once within a useful span
@@ -123,7 +124,7 @@ impl Timeline {
#[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
async fn eviction_iteration(
self: &Arc<Self>,
tenant: &Tenant,
tenant: &TenantShard,
policy: &EvictionPolicy,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -180,7 +181,7 @@ impl Timeline {
async fn eviction_iteration_threshold(
self: &Arc<Self>,
tenant: &Tenant,
tenant: &TenantShard,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -314,7 +315,7 @@ impl Timeline {
/// disk usage based eviction task.
async fn imitiate_only(
self: &Arc<Self>,
tenant: &Tenant,
tenant: &TenantShard,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -370,7 +371,7 @@ impl Timeline {
#[instrument(skip_all)]
async fn imitate_layer_accesses(
&self,
tenant: &Tenant,
tenant: &TenantShard,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -506,7 +507,7 @@ impl Timeline {
#[instrument(skip_all)]
async fn imitate_synthetic_size_calculation_worker(
&self,
tenant: &Tenant,
tenant: &TenantShard,
cancel: &CancellationToken,
ctx: &RequestContext,
) {

View File

@@ -6,7 +6,7 @@ use super::delete::{delete_local_timeline_directory, DeleteTimelineFlow, Deletio
use super::Timeline;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::ShutdownIfArchivedError;
use crate::tenant::{OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded};
use crate::tenant::{OffloadedTimeline, TenantManifestError, TenantShard, TimelineOrOffloaded};
#[derive(thiserror::Error, Debug)]
pub(crate) enum OffloadError {
@@ -30,7 +30,7 @@ impl From<TenantManifestError> for OffloadError {
}
pub(crate) async fn offload_timeline(
tenant: &Tenant,
tenant: &TenantShard,
timeline: &Arc<Timeline>,
) -> Result<(), OffloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -110,7 +110,7 @@ pub(crate) async fn offload_timeline(
///
/// Returns the strong count of the timeline `Arc`
fn remove_timeline_from_tenant(
tenant: &Tenant,
tenant: &TenantShard,
timeline: &Timeline,
_: &DeletionGuard, // using it as a witness
) -> usize {

View File

@@ -8,7 +8,7 @@ use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard};
use crate::{
context::RequestContext,
import_datadir,
tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded},
tenant::{CreateTimelineIdempotency, TenantShard, TimelineOrOffloaded},
};
use super::Timeline;
@@ -16,19 +16,19 @@ use super::Timeline;
/// A timeline with some of its files on disk, being initialized.
/// This struct ensures the atomicity of the timeline init: it's either properly created and inserted into pageserver's memory, or
/// its local files are removed. If we crash while this class exists, then the timeline's local
/// state is cleaned up during [`Tenant::clean_up_timelines`], because the timeline's content isn't in remote storage.
/// state is cleaned up during [`TenantShard::clean_up_timelines`], because the timeline's content isn't in remote storage.
///
/// The caller is responsible for proper timeline data filling before the final init.
#[must_use]
pub struct UninitializedTimeline<'t> {
pub(crate) owning_tenant: &'t Tenant,
pub(crate) owning_tenant: &'t TenantShard,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
}
impl<'t> UninitializedTimeline<'t> {
pub(crate) fn new(
owning_tenant: &'t Tenant,
owning_tenant: &'t TenantShard,
timeline_id: TimelineId,
raw_timeline: Option<(Arc<Timeline>, TimelineCreateGuard)>,
) -> Self {
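The atomicity contract described for `UninitializedTimeline` above is a classic Rust guard pattern: either the caller finishes initialization explicitly, or `Drop` removes whatever was written locally. A stand-alone sketch with illustrative names (`InitGuard` is not the real type):

```rust
use std::fs;
use std::path::PathBuf;

struct InitGuard {
    dir: PathBuf,
    finished: bool,
}

impl InitGuard {
    fn new(dir: PathBuf) -> std::io::Result<Self> {
        fs::create_dir_all(&dir)?;
        Ok(InitGuard { dir, finished: false })
    }

    // Called only after all files are in place and the object is registered in memory.
    fn finish(mut self) {
        self.finished = true;
    }
}

impl Drop for InitGuard {
    fn drop(&mut self) {
        if !self.finished {
            // Best-effort cleanup of the half-initialized state.
            let _ = fs::remove_dir_all(&self.dir);
        }
    }
}
```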
@@ -94,7 +94,7 @@ impl<'t> UninitializedTimeline<'t> {
/// Prepares timeline data by loading it from the basebackup archive.
pub(crate) async fn import_basebackup_from_tar(
self,
tenant: Arc<Tenant>,
tenant: Arc<TenantShard>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
@@ -163,17 +163,17 @@ pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) {
error!("Failed to clean up uninitialized timeline directory {timeline_path:?}: {e:?}")
}
}
// Having cleaned up, we can release this TimelineId in `[Tenant::timelines_creating]` to allow other
// Having cleaned up, we can release this TimelineId in `[TenantShard::timelines_creating]` to allow other
// timeline creation attempts under this TimelineId to proceed
drop(create_guard);
}
/// A guard for timeline creations in process: as long as this object exists, the timeline ID
/// is kept in `[Tenant::timelines_creating]` to exclude concurrent attempts to create the same timeline.
/// is kept in `[TenantShard::timelines_creating]` to exclude concurrent attempts to create the same timeline.
#[must_use]
pub(crate) struct TimelineCreateGuard {
pub(crate) _tenant_gate_guard: GateGuard,
pub(crate) owning_tenant: Arc<Tenant>,
pub(crate) owning_tenant: Arc<TenantShard>,
pub(crate) timeline_id: TimelineId,
pub(crate) timeline_path: Utf8PathBuf,
pub(crate) idempotency: CreateTimelineIdempotency,
@@ -199,7 +199,7 @@ pub(crate) enum TimelineExclusionError {
impl TimelineCreateGuard {
pub(crate) fn new(
owning_tenant: &Arc<Tenant>,
owning_tenant: &Arc<TenantShard>,
timeline_id: TimelineId,
timeline_path: Utf8PathBuf,
idempotency: CreateTimelineIdempotency,

View File

@@ -1117,7 +1117,7 @@ impl ReconnectReason {
#[cfg(test)]
mod tests {
use super::*;
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use crate::tenant::harness::{TenantShardHarness, TIMELINE_ID};
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
use url::Host;
@@ -1141,7 +1141,7 @@ mod tests {
#[tokio::test]
async fn no_connection_no_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("no_connection_no_candidate").await?;
let harness = TenantShardHarness::create("no_connection_no_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1174,7 +1174,7 @@ mod tests {
#[tokio::test]
async fn connection_no_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("connection_no_candidate").await?;
let harness = TenantShardHarness::create("connection_no_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1239,7 +1239,7 @@ mod tests {
#[tokio::test]
async fn no_connection_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("no_connection_candidate").await?;
let harness = TenantShardHarness::create("no_connection_candidate").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1302,7 +1302,7 @@ mod tests {
#[tokio::test]
async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
let harness = TenantHarness::create("candidate_with_many_connection_failures").await?;
let harness = TenantShardHarness::create("candidate_with_many_connection_failures").await?;
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
@@ -1342,7 +1342,7 @@ mod tests {
#[tokio::test]
async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate").await?;
let harness = TenantShardHarness::create("lsn_wal_over_threshcurrent_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1409,7 +1409,7 @@ mod tests {
#[tokio::test]
async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
let harness =
TenantHarness::create("timeout_connection_threshold_current_candidate").await?;
TenantShardHarness::create("timeout_connection_threshold_current_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
@@ -1472,7 +1472,8 @@ mod tests {
#[tokio::test]
async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?;
let harness =
TenantShardHarness::create("timeout_wal_over_threshold_current_candidate").await?;
let mut state = dummy_state(&harness).await;
let current_lsn = Lsn(100_000).align();
let new_lsn = Lsn(100_100).align();
@@ -1540,7 +1541,7 @@ mod tests {
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
async fn dummy_state(harness: &TenantShardHarness) -> ConnectionManagerState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
@@ -1575,7 +1576,7 @@ mod tests {
// and pageserver should prefer to connect to it.
let test_az = Some("test_az".to_owned());
let harness = TenantHarness::create("switch_to_same_availability_zone").await?;
let harness = TenantShardHarness::create("switch_to_same_availability_zone").await?;
let mut state = dummy_state(&harness).await;
state.conf.availability_zone.clone_from(&test_az);
let current_lsn = Lsn(100_000).align();

View File

@@ -563,7 +563,7 @@ impl UploadOp {
#[cfg(test)]
mod tests {
use super::*;
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use crate::tenant::harness::{TenantShardHarness, TIMELINE_ID};
use crate::tenant::storage_layer::layer::local_layer_path;
use crate::tenant::storage_layer::Layer;
use crate::tenant::Timeline;
@@ -621,7 +621,7 @@ mod tests {
runtime
.block_on(async {
let harness = TenantHarness::create(test_name).await?;
let harness = TenantShardHarness::create(test_name).await?;
let (tenant, ctx) = harness.load().await;
tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)

View File

@@ -9,12 +9,14 @@ use utils::serde_percent::Percent;
use pageserver_api::models::PageserverUtilization;
use crate::{config::PageServerConf, metrics::NODE_UTILIZATION_SCORE, tenant::mgr::TenantManager};
use crate::{
config::PageServerConf, metrics::NODE_UTILIZATION_SCORE, tenant::mgr::TenantShardManager,
};
pub(crate) fn regenerate(
conf: &PageServerConf,
tenants_path: &Path,
tenant_manager: &TenantManager,
tenant_manager: &TenantShardManager,
) -> anyhow::Result<PageserverUtilization> {
let statvfs = nix::sys::statvfs::statvfs(tenants_path)
.map_err(std::io::Error::from)

View File

@@ -1531,7 +1531,10 @@ mod tests {
#[tokio::test]
async fn test_relsize() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_relsize").await?.load().await;
let (tenant, ctx) = TenantShardHarness::create("test_relsize")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -1771,7 +1774,7 @@ mod tests {
// and then created it again within the same layer.
#[tokio::test]
async fn test_drop_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_drop_extend")
let (tenant, ctx) = TenantShardHarness::create("test_drop_extend")
.await?
.load()
.await;
@@ -1847,7 +1850,7 @@ mod tests {
// and then extended it again within the same layer.
#[tokio::test]
async fn test_truncate_extend() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_truncate_extend")
let (tenant, ctx) = TenantShardHarness::create("test_truncate_extend")
.await?
.load()
.await;
@@ -2001,7 +2004,10 @@ mod tests {
/// split into multiple 1 GB segments in Postgres.
#[tokio::test]
async fn test_large_rel() -> Result<()> {
let (tenant, ctx) = TenantHarness::create("test_large_rel").await?.load().await;
let (tenant, ctx) = TenantShardHarness::create("test_large_rel")
.await?
.load()
.await;
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(8), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -2109,7 +2115,9 @@ mod tests {
let startpoint = Lsn::from_hex("14AEC08").unwrap();
let _endpoint = Lsn::from_hex("1FFFF98").unwrap();
let harness = TenantHarness::create("test_ingest_real_wal").await.unwrap();
let harness = TenantShardHarness::create("test_ingest_real_wal")
.await
.unwrap();
let span = harness
.span()
.in_scope(|| info_span!("timeline_span", timeline_id=%TIMELINE_ID));