From 2fa492943ad33375e41e40dffac02aa2d23280cc Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 16 Jan 2025 15:19:15 +0000 Subject: [PATCH] pageserver: rename Tenant to TenantShard --- pageserver/src/consumption_metrics.rs | 4 +- pageserver/src/consumption_metrics/metrics.rs | 4 +- pageserver/src/http/routes.rs | 4 +- pageserver/src/tenant.rs | 44 ++++++------- pageserver/src/tenant/mgr.rs | 61 +++++++++++-------- .../src/tenant/remote_timeline_client.rs | 4 +- .../src/tenant/secondary/heatmap_uploader.rs | 8 +-- pageserver/src/tenant/size.rs | 4 +- .../src/tenant/storage_layer/delta_layer.rs | 4 +- .../src/tenant/storage_layer/image_layer.rs | 4 +- pageserver/src/tenant/tasks.rs | 12 ++-- pageserver/src/tenant/timeline.rs | 12 ++-- pageserver/src/tenant/timeline/delete.rs | 16 ++--- .../src/tenant/timeline/detach_ancestor.rs | 21 ++++--- .../src/tenant/timeline/eviction_task.rs | 17 +++--- pageserver/src/tenant/timeline/offload.rs | 6 +- pageserver/src/tenant/timeline/uninit.rs | 12 ++-- 17 files changed, 126 insertions(+), 111 deletions(-) diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 7e8c00c293..8135240a4d 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -7,7 +7,7 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; use crate::tenant::size::CalculateSyntheticSizeError; use crate::tenant::tasks::BackgroundLoopKind; -use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant}; +use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, TenantShard}; use camino::Utf8PathBuf; use consumption_metrics::EventType; use itertools::Itertools as _; @@ -425,7 +425,7 @@ async fn calculate_synthetic_size_worker( } } -async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) { +async fn calculate_and_log(tenant: &TenantShard, cancel: &CancellationToken, ctx: &RequestContext) { const CAUSE: LogicalSizeCalculationCause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize; diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index 07fac09f6f..2f87825373 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -253,7 +253,7 @@ pub(super) async fn collect_all_metrics( async fn collect(tenants: S, cache: &Cache, ctx: &RequestContext) -> Vec where - S: futures::stream::Stream)>, + S: futures::stream::Stream)>, { let mut current_metrics: Vec = Vec::new(); @@ -307,7 +307,7 @@ impl TenantSnapshot { /// /// `resident_size` is calculated of the timelines we had access to for other metrics, so we /// cannot just list timelines here. 
- fn collect(t: &Arc, resident_size: u64) -> Self { + fn collect(t: &Arc, resident_size: u64) -> Self { TenantSnapshot { resident_size, remote_size: t.remote_size(), diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 33b2d04588..d5ac8699ef 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1761,7 +1761,7 @@ async fn update_tenant_config_handler( &ShardParameters::default(), ); - crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf) + crate::tenant::TenantShard::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf) .await .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?; @@ -1802,7 +1802,7 @@ async fn patch_tenant_config_handler( &ShardParameters::default(), ); - crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf) + crate::tenant::TenantShard::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf) .await .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f6d758ad22..771f2a7b89 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -273,7 +273,7 @@ pub(crate) enum SpawnMode { /// /// Tenant consists of multiple timelines. Keep them in a hash table. /// -pub struct Tenant { +pub struct TenantShard { // Global pageserver config parameters pub conf: &'static PageServerConf, @@ -384,7 +384,7 @@ pub struct Tenant { l0_flush_global_state: L0FlushGlobalState, } -impl std::fmt::Debug for Tenant { +impl std::fmt::Debug for TenantShard { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} ({})", self.tenant_shard_id, self.current_state()) } @@ -1099,7 +1099,7 @@ pub(crate) enum LoadConfigError { NotFound(Utf8PathBuf), } -impl Tenant { +impl TenantShard { /// Yet another helper for timeline initialization. /// /// - Initializes the Timeline struct and inserts it into the tenant's hash map @@ -1277,7 +1277,7 @@ impl Tenant { init_order: Option, mode: SpawnMode, ctx: &RequestContext, - ) -> Result, GlobalShutDown> { + ) -> Result, GlobalShutDown> { let wal_redo_manager = WalRedoManager::new(PostgresRedoManager::new(conf, tenant_shard_id))?; @@ -1291,7 +1291,7 @@ impl Tenant { let attach_mode = attached_conf.location.attach_mode; let generation = attached_conf.location.generation; - let tenant = Arc::new(Tenant::new( + let tenant = Arc::new(TenantShard::new( TenantState::Attaching, conf, attached_conf, @@ -1342,7 +1342,7 @@ impl Tenant { Info } let make_broken = - |t: &Tenant, err: anyhow::Error, verbosity: BrokenVerbosity| { + |t: &TenantShard, err: anyhow::Error, verbosity: BrokenVerbosity| { match verbosity { BrokenVerbosity::Info => { info!("attach cancelled, setting tenant state to Broken: {err}"); @@ -1565,7 +1565,7 @@ impl Tenant { /// No background tasks are started as part of this routine. /// async fn attach( - self: &Arc, + self: &Arc, preload: Option, ctx: &RequestContext, ) -> anyhow::Result<()> { @@ -1885,7 +1885,7 @@ impl Tenant { } async fn load_timelines_metadata( - self: &Arc, + self: &Arc, timeline_ids: HashSet, remote_storage: &GenericRemoteStorage, cancel: CancellationToken, @@ -1940,7 +1940,7 @@ impl Tenant { } fn load_timeline_metadata( - self: &Arc, + self: &Arc, timeline_id: TimelineId, remote_storage: GenericRemoteStorage, cancel: CancellationToken, @@ -2480,7 +2480,7 @@ impl Tenant { /// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists. 
#[allow(clippy::too_many_arguments)] pub(crate) async fn create_timeline( - self: &Arc, + self: &Arc, params: CreateTimelineParams, broker_client: storage_broker::BrokerClientChannel, ctx: &RequestContext, @@ -2641,7 +2641,7 @@ impl Tenant { /// We only return an [`Arc`] here so the API handler can create a [`pageserver_api::models::TimelineInfo`] /// for the response. async fn create_timeline_import_pgdata( - self: &Arc, + self: &Arc, params: CreateTimelineParamsImportPgdata, activate: ActivateTimelineArgs, ctx: &RequestContext, @@ -2736,7 +2736,7 @@ impl Tenant { #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))] async fn create_timeline_import_pgdata_task( - self: Arc, + self: Arc, timeline: Arc, index_part: import_pgdata::index_part_format::Root, activate: ActivateTimelineArgs, @@ -2762,7 +2762,7 @@ impl Tenant { } async fn create_timeline_import_pgdata_task_impl( - self: Arc, + self: Arc, timeline: Arc, index_part: import_pgdata::index_part_format::Root, activate: ActivateTimelineArgs, @@ -3761,7 +3761,7 @@ enum ActivateTimelineArgs { No, } -impl Tenant { +impl TenantShard { pub fn tenant_specific_overrides(&self) -> TenantConfOpt { self.tenant_conf.load().tenant_conf.clone() } @@ -4010,7 +4010,7 @@ impl Tenant { remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, l0_flush_global_state: L0FlushGlobalState, - ) -> Tenant { + ) -> TenantShard { debug_assert!( !attached_conf.location.generation.is_none() || conf.control_plane_api.is_none() ); @@ -4070,7 +4070,7 @@ impl Tenant { } }); - Tenant { + TenantShard { tenant_shard_id, shard_identity, generation: attached_conf.location.generation, @@ -4104,7 +4104,7 @@ impl Tenant { cancel: CancellationToken::default(), gate: Gate::default(), pagestream_throttle: Arc::new(throttle::Throttle::new( - Tenant::get_pagestream_throttle_config(conf, &attached_conf.tenant_conf), + TenantShard::get_pagestream_throttle_config(conf, &attached_conf.tenant_conf), )), pagestream_throttle_metrics: Arc::new( crate::metrics::tenant_throttling::Pagestream::new(&tenant_shard_id), @@ -5570,7 +5570,7 @@ pub(crate) mod harness { info_span!("TenantHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()) } - pub(crate) async fn load(&self) -> (Arc, RequestContext) { + pub(crate) async fn load(&self) -> (Arc, RequestContext) { let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); ( self.do_try_load(&ctx) @@ -5584,10 +5584,10 @@ pub(crate) mod harness { pub(crate) async fn do_try_load( &self, ctx: &RequestContext, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager)); - let tenant = Arc::new(Tenant::new( + let tenant = Arc::new(TenantShard::new( TenantState::Attaching, self.conf, AttachedTenantConf::try_from(LocationConf::attached_single( @@ -6368,7 +6368,7 @@ mod tests { } async fn bulk_insert_compact_gc( - tenant: &Tenant, + tenant: &TenantShard, timeline: &Arc, ctx: &RequestContext, lsn: Lsn, @@ -6380,7 +6380,7 @@ mod tests { } async fn bulk_insert_maybe_compact_gc( - tenant: &Tenant, + tenant: &TenantShard, timeline: &Arc, ctx: &RequestContext, mut lsn: Lsn, diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index e8b0d1d4dd..82b82e7ae9 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -44,7 +44,9 @@ use crate::tenant::config::{ use 
crate::tenant::span::debug_assert_current_span_has_tenant_id; use crate::tenant::storage_layer::inmemory_layer; use crate::tenant::timeline::ShutdownMode; -use crate::tenant::{AttachedTenantConf, GcError, LoadConfigError, SpawnMode, Tenant, TenantState}; +use crate::tenant::{ + AttachedTenantConf, GcError, LoadConfigError, SpawnMode, TenantShard, TenantState, +}; use crate::virtual_file::MaybeFatalIo; use crate::{InitializationOrder, TEMP_FILE_SUFFIX}; @@ -69,7 +71,7 @@ use super::{GlobalShutDown, TenantSharedResources}; /// having a properly acquired generation (Secondary doesn't need a generation) #[derive(Clone)] pub(crate) enum TenantSlot { - Attached(Arc), + Attached(Arc), Secondary(Arc), /// In this state, other administrative operations acting on the TenantId should /// block, or return a retry indicator equivalent to HTTP 503. @@ -88,7 +90,7 @@ impl std::fmt::Debug for TenantSlot { impl TenantSlot { /// Return the `Tenant` in this slot if attached, else None - fn get_attached(&self) -> Option<&Arc> { + fn get_attached(&self) -> Option<&Arc> { match self { Self::Attached(t) => Some(t), Self::Secondary(_) => None, @@ -166,7 +168,7 @@ impl TenantStartupMode { /// Result type for looking up a TenantId to a specific shard pub(crate) enum ShardResolveResult { NotFound, - Found(Arc), + Found(Arc), // Wait for this barrrier, then query again InProgress(utils::completion::Barrier), } @@ -175,7 +177,7 @@ impl TenantsMap { /// Convenience function for typical usage, where we want to get a `Tenant` object, for /// working with attached tenants. If the TenantId is in the map but in Secondary state, /// None is returned. - pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc> { + pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc> { match self { TenantsMap::Initializing => None, TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { @@ -412,7 +414,7 @@ fn load_tenant_config( return None; } - Some(Tenant::load_tenant_config(conf, &tenant_shard_id)) + Some(TenantShard::load_tenant_config(conf, &tenant_shard_id)) } /// Initial stage of load: walk the local tenants directory, clean up any temp files, @@ -606,7 +608,8 @@ pub async fn init_tenant_mgr( // Presence of a generation number implies attachment: attach the tenant // if it wasn't already, and apply the generation number. config_write_futs.push(async move { - let r = Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await; + let r = + TenantShard::persist_tenant_config(conf, &tenant_shard_id, &location_conf).await; (tenant_shard_id, location_conf, r) }); } @@ -694,7 +697,7 @@ fn tenant_spawn( init_order: Option, mode: SpawnMode, ctx: &RequestContext, -) -> Result, GlobalShutDown> { +) -> Result, GlobalShutDown> { // All these conditions should have been satisfied by our caller: the tenant dir exists, is a well formed // path, and contains a configuration file. Assertions that do synchronous I/O are limited to debug mode // to avoid impacting prod runtime performance. 
@@ -705,7 +708,7 @@ fn tenant_spawn( .try_exists() .unwrap()); - Tenant::spawn( + TenantShard::spawn( conf, tenant_shard_id, resources, @@ -885,7 +888,7 @@ impl TenantManager { pub(crate) fn get_attached_tenant_shard( &self, tenant_shard_id: TenantShardId, - ) -> Result, GetTenantError> { + ) -> Result, GetTenantError> { let locked = self.tenants.read().unwrap(); let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?; @@ -934,12 +937,12 @@ impl TenantManager { flush: Option, mut spawn_mode: SpawnMode, ctx: &RequestContext, - ) -> Result>, UpsertLocationError> { + ) -> Result>, UpsertLocationError> { debug_assert_current_span_has_tenant_id(); info!("configuring tenant location to state {new_location_config:?}"); enum FastPathModified { - Attached(Arc), + Attached(Arc), Secondary(Arc), } @@ -996,9 +999,13 @@ impl TenantManager { // phase of writing config and/or waiting for flush, before returning. match fast_path_taken { Some(FastPathModified::Attached(tenant)) => { - Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) - .await - .fatal_err("write tenant shard config"); + TenantShard::persist_tenant_config( + self.conf, + &tenant_shard_id, + &new_location_config, + ) + .await + .fatal_err("write tenant shard config"); // Transition to AttachedStale means we may well hold a valid generation // still, and have been requested to go stale as part of a migration. If @@ -1027,9 +1034,13 @@ impl TenantManager { return Ok(Some(tenant)); } Some(FastPathModified::Secondary(_secondary_tenant)) => { - Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) - .await - .fatal_err("write tenant shard config"); + TenantShard::persist_tenant_config( + self.conf, + &tenant_shard_id, + &new_location_config, + ) + .await + .fatal_err("write tenant shard config"); return Ok(None); } @@ -1119,7 +1130,7 @@ impl TenantManager { // Before activating either secondary or attached mode, persist the // configuration, so that on restart we will re-attach (or re-start // secondary) on the tenant. 
- Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) + TenantShard::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config) .await .fatal_err("write tenant shard config"); @@ -1257,7 +1268,7 @@ impl TenantManager { let tenant_path = self.conf.tenant_path(&tenant_shard_id); let timelines_path = self.conf.timelines_path(&tenant_shard_id); - let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id)?; + let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id)?; if drop_cache { tracing::info!("Dropping local file cache"); @@ -1292,7 +1303,7 @@ impl TenantManager { Ok(()) } - pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec> { + pub(crate) fn get_attached_active_tenant_shards(&self) -> Vec> { let locked = self.tenants.read().unwrap(); match &*locked { TenantsMap::Initializing => Vec::new(), @@ -1441,7 +1452,7 @@ impl TenantManager { #[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))] pub(crate) async fn shard_split( &self, - tenant: Arc, + tenant: Arc, new_shard_count: ShardCount, new_stripe_size: Option, ctx: &RequestContext, @@ -1471,7 +1482,7 @@ impl TenantManager { pub(crate) async fn do_shard_split( &self, - tenant: Arc, + tenant: Arc, new_shard_count: ShardCount, new_stripe_size: Option, ctx: &RequestContext, @@ -1697,7 +1708,7 @@ impl TenantManager { /// For each resident layer in the parent shard, we will hard link it into all of the child shards. async fn shard_split_hardlink( &self, - parent_shard: &Tenant, + parent_shard: &TenantShard, child_shards: Vec, ) -> anyhow::Result<()> { debug_assert_current_span_has_tenant_id(); @@ -1974,7 +1985,7 @@ impl TenantManager { } let tenant_path = self.conf.tenant_path(&tenant_shard_id); - let config = Tenant::load_tenant_config(self.conf, &tenant_shard_id) + let config = TenantShard::load_tenant_config(self.conf, &tenant_shard_id) .map_err(|e| Error::DetachReparent(e.into()))?; let shard_identity = config.shard; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 47c4a8637d..2f279f0236 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -2640,7 +2640,7 @@ mod tests { config::AttachmentMode, harness::{TenantHarness, TIMELINE_ID}, storage_layer::layer::local_layer_path, - Tenant, Timeline, + TenantShard, Timeline, }, DEFAULT_PG_VERSION, }; @@ -2698,7 +2698,7 @@ mod tests { struct TestSetup { harness: TenantHarness, - tenant: Arc, + tenant: Arc, timeline: Arc, tenant_ctx: RequestContext, } diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs index c5e5e04945..d49551e26c 100644 --- a/pageserver/src/tenant/secondary/heatmap_uploader.rs +++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs @@ -14,7 +14,7 @@ use crate::{ remote_timeline_client::remote_heatmap_path, span::debug_assert_current_span_has_tenant_id, tasks::{warn_when_period_overrun, BackgroundLoopKind}, - Tenant, + TenantShard, }, }; @@ -79,7 +79,7 @@ impl RunningJob for WriteInProgress { } struct UploadPending { - tenant: Arc, + tenant: Arc, last_upload: Option, target_time: Option, period: Option, @@ -111,7 +111,7 @@ impl scheduler::Completion for WriteComplete { struct UploaderTenantState { // This Weak only exists to enable culling idle instances of this type // when the Tenant 
has been deallocated. - tenant: Weak, + tenant: Weak, /// Digest of the serialized heatmap that we last successfully uploaded last_upload_state: Option, @@ -362,7 +362,7 @@ struct LastUploadState { /// of the object we would have uploaded. async fn upload_tenant_heatmap( remote_storage: GenericRemoteStorage, - tenant: &Arc, + tenant: &Arc, last_upload: Option, ) -> Result { debug_assert_current_span_has_tenant_id(); diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index 6c3276ea3c..d6b6ae4261 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken; use crate::context::RequestContext; use crate::pgdatadir_mapping::CalculateLogicalSizeError; -use super::{GcError, LogicalSizeCalculationCause, Tenant}; +use super::{GcError, LogicalSizeCalculationCause, TenantShard}; use crate::tenant::{MaybeOffloaded, Timeline}; use utils::id::TimelineId; use utils::lsn::Lsn; @@ -159,7 +159,7 @@ pub struct TimelineInputs { /// initdb_lsn branchpoints* next_pitr_cutoff latest /// ``` pub(super) async fn gather_inputs( - tenant: &Tenant, + tenant: &TenantShard, limit: &Arc, max_retention_period: Option, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index ade1b794c6..a1d8d77a48 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1618,7 +1618,7 @@ pub(crate) mod test { use crate::tenant::harness::TIMELINE_ID; use crate::tenant::storage_layer::{Layer, ResidentLayer}; use crate::tenant::vectored_blob_io::StreamingVectoredReadPlanner; - use crate::tenant::{Tenant, Timeline}; + use crate::tenant::{TenantShard, Timeline}; use crate::{ context::DownloadBehavior, task_mgr::TaskKind, @@ -2214,7 +2214,7 @@ pub(crate) mod test { } pub(crate) async fn produce_delta_layer( - tenant: &Tenant, + tenant: &TenantShard, tline: &Arc, mut deltas: Vec<(Key, Lsn, Value)>, ctx: &RequestContext, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 0d3c9d5a44..3098d04d83 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -1141,7 +1141,7 @@ mod test { harness::{TenantHarness, TIMELINE_ID}, storage_layer::{Layer, ResidentLayer}, vectored_blob_io::StreamingVectoredReadPlanner, - Tenant, Timeline, + TenantShard, Timeline, }, DEFAULT_PG_VERSION, }; @@ -1324,7 +1324,7 @@ mod test { } async fn produce_image_layer( - tenant: &Tenant, + tenant: &TenantShard, tline: &Arc, mut images: Vec<(Key, Bytes)>, lsn: Lsn, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 0118a5ce5f..e93804197a 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -12,7 +12,7 @@ use crate::task_mgr; use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME}; use crate::tenant::throttle::Stats; use crate::tenant::timeline::CompactionError; -use crate::tenant::{Tenant, TenantState}; +use crate::tenant::{TenantShard, TenantState}; use rand::Rng; use tokio_util::sync::CancellationToken; use tracing::*; @@ -81,7 +81,7 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit( /// Start per tenant background loops: compaction and gc. 
pub fn start_background_loops( - tenant: &Arc, + tenant: &Arc, background_jobs_can_start: Option<&completion::Barrier>, ) { let tenant_shard_id = tenant.tenant_shard_id; @@ -158,7 +158,7 @@ pub fn start_background_loops( /// /// Compaction task's main loop /// -async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { +async fn compaction_loop(tenant: Arc, cancel: CancellationToken) { const MAX_BACKOFF_SECS: f64 = 300.0; // How many errors we have seen consequtively let mut error_run_count = 0; @@ -318,7 +318,7 @@ fn log_compaction_error( /// /// GC task's main loop /// -async fn gc_loop(tenant: Arc, cancel: CancellationToken) { +async fn gc_loop(tenant: Arc, cancel: CancellationToken) { const MAX_BACKOFF_SECS: f64 = 300.0; // How many errors we have seen consequtively let mut error_run_count = 0; @@ -418,7 +418,7 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); } -async fn ingest_housekeeping_loop(tenant: Arc, cancel: CancellationToken) { +async fn ingest_housekeeping_loop(tenant: Arc, cancel: CancellationToken) { TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); async { let mut last_throttle_flag_reset_at = Instant::now(); @@ -496,7 +496,7 @@ async fn ingest_housekeeping_loop(tenant: Arc, cancel: CancellationToken TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); } -async fn wait_for_active_tenant(tenant: &Arc) -> ControlFlow<()> { +async fn wait_for_active_tenant(tenant: &Arc) -> ControlFlow<()> { // if the tenant has a proper status already, no need to wait for anything if tenant.current_state() == TenantState::Active { ControlFlow::Continue(()) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d6ae11e67d..ad638bdc95 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1700,7 +1700,7 @@ impl Timeline { pub(crate) fn activate( self: &Arc, - parent: Arc, + parent: Arc, broker_client: BrokerClientChannel, background_jobs_can_start: Option<&completion::Barrier>, ctx: &RequestContext, @@ -4575,7 +4575,7 @@ impl Timeline { /// from our ancestor to be branches of this timeline. pub(crate) async fn prepare_to_detach_from_ancestor( self: &Arc, - tenant: &crate::tenant::Tenant, + tenant: &crate::tenant::TenantShard, options: detach_ancestor::Options, ctx: &RequestContext, ) -> Result { @@ -4593,7 +4593,7 @@ impl Timeline { /// resetting the tenant. pub(crate) async fn detach_from_ancestor_and_reparent( self: &Arc, - tenant: &crate::tenant::Tenant, + tenant: &crate::tenant::TenantShard, prepared: detach_ancestor::PreparedTimelineDetach, ctx: &RequestContext, ) -> Result { @@ -4605,7 +4605,7 @@ impl Timeline { /// The tenant must've been reset if ancestry was modified previously (in tenant manager). pub(crate) async fn complete_detaching_timeline_ancestor( self: &Arc, - tenant: &crate::tenant::Tenant, + tenant: &crate::tenant::TenantShard, attempt: detach_ancestor::Attempt, ctx: &RequestContext, ) -> Result<(), detach_ancestor::Error> { @@ -5609,14 +5609,14 @@ impl Timeline { /// Persistently blocks gc for `Manual` reason. /// /// Returns true if no such block existed before, false otherwise. 
- pub(crate) async fn block_gc(&self, tenant: &super::Tenant) -> anyhow::Result { + pub(crate) async fn block_gc(&self, tenant: &super::TenantShard) -> anyhow::Result { use crate::tenant::remote_timeline_client::index::GcBlockingReason; assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id); tenant.gc_block.insert(self, GcBlockingReason::Manual).await } /// Persistently unblocks gc for `Manual` reason. - pub(crate) async fn unblock_gc(&self, tenant: &super::Tenant) -> anyhow::Result<()> { + pub(crate) async fn unblock_gc(&self, tenant: &super::TenantShard) -> anyhow::Result<()> { use crate::tenant::remote_timeline_client::index::GcBlockingReason; assert_eq!(self.tenant_shard_id, tenant.tenant_shard_id); tenant.gc_block.remove(self, GcBlockingReason::Manual).await diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index bdc315d985..81e25aaefa 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -16,8 +16,8 @@ use crate::{ tenant::{ metadata::TimelineMetadata, remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient}, - CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant, - TenantManifestError, TimelineOrOffloaded, + CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, TenantManifestError, + TenantShard, TimelineOrOffloaded, }, virtual_file::MaybeFatalIo, }; @@ -114,7 +114,7 @@ pub(super) async fn delete_local_timeline_directory( /// It is important that this gets called when DeletionGuard is being held. /// For more context see comments in [`DeleteTimelineFlow::prepare`] async fn remove_maybe_offloaded_timeline_from_tenant( - tenant: &Tenant, + tenant: &TenantShard, timeline: &TimelineOrOffloaded, _: &DeletionGuard, // using it as a witness ) -> anyhow::Result<()> { @@ -188,7 +188,7 @@ impl DeleteTimelineFlow { // error out if some of the shutdown tasks have already been completed! #[instrument(skip_all)] pub async fn run( - tenant: &Arc, + tenant: &Arc, timeline_id: TimelineId, ) -> Result<(), DeleteTimelineError> { super::debug_assert_current_span_has_tenant_and_timeline_id(); @@ -286,7 +286,7 @@ impl DeleteTimelineFlow { /// Shortcut to create Timeline in stopping state and spawn deletion task. 
#[instrument(skip_all, fields(%timeline_id))] pub(crate) async fn resume_deletion( - tenant: Arc, + tenant: Arc, timeline_id: TimelineId, local_metadata: &TimelineMetadata, remote_client: RemoteTimelineClient, @@ -334,7 +334,7 @@ impl DeleteTimelineFlow { } pub(super) fn prepare( - tenant: &Tenant, + tenant: &TenantShard, timeline_id: TimelineId, allow_offloaded_children: bool, set_stopping: bool, @@ -405,7 +405,7 @@ impl DeleteTimelineFlow { fn schedule_background( guard: DeletionGuard, conf: &'static PageServerConf, - tenant: Arc, + tenant: Arc, timeline: TimelineOrOffloaded, remote_client: Arc, ) { @@ -439,7 +439,7 @@ impl DeleteTimelineFlow { async fn background( mut guard: DeletionGuard, conf: &PageServerConf, - tenant: &Tenant, + tenant: &TenantShard, timeline: &TimelineOrOffloaded, remote_client: Arc, ) -> Result<(), DeleteTimelineError> { diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index f8bc4352e2..8a51d45c72 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -7,7 +7,7 @@ use crate::{ tenant::{ remote_timeline_client::index::GcBlockingReason::DetachAncestor, storage_layer::{AsLayerDesc as _, DeltaLayerWriter, Layer, ResidentLayer}, - Tenant, + TenantShard, }, virtual_file::{MaybeFatalIo, VirtualFile}, }; @@ -159,7 +159,7 @@ impl Attempt { /// See [`Timeline::prepare_to_detach_from_ancestor`] pub(super) async fn prepare( detached: &Arc, - tenant: &Tenant, + tenant: &TenantShard, options: Options, ctx: &RequestContext, ) -> Result { @@ -410,7 +410,7 @@ pub(super) async fn prepare( Ok(Progress::Prepared(attempt, prepared)) } -async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result { +async fn start_new_attempt(detached: &Timeline, tenant: &TenantShard) -> Result { let attempt = obtain_exclusive_attempt(detached, tenant)?; // insert the block in the index_part.json, if not already there. @@ -426,13 +426,16 @@ async fn start_new_attempt(detached: &Timeline, tenant: &Tenant) -> Result Result { +async fn continue_with_blocked_gc( + detached: &Timeline, + tenant: &TenantShard, +) -> Result { // FIXME: it would be nice to confirm that there is an in-memory version, since we've just // verified there is a persistent one? obtain_exclusive_attempt(detached, tenant) } -fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result { +fn obtain_exclusive_attempt(detached: &Timeline, tenant: &TenantShard) -> Result { use Error::{OtherTimelineDetachOngoing, ShuttingDown}; // ensure we are the only active attempt for this tenant @@ -460,7 +463,7 @@ fn obtain_exclusive_attempt(detached: &Timeline, tenant: &Tenant) -> Result, - tenant: &Tenant, + tenant: &TenantShard, ) -> Result, Error> { let mut all_direct_children = tenant .timelines @@ -698,7 +701,7 @@ impl DetachingAndReparenting { /// See [`Timeline::detach_from_ancestor_and_reparent`]. 
pub(super) async fn detach_and_reparent( detached: &Arc, - tenant: &Tenant, + tenant: &TenantShard, prepared: PreparedTimelineDetach, _ctx: &RequestContext, ) -> Result { @@ -901,7 +904,7 @@ pub(super) async fn detach_and_reparent( pub(super) async fn complete( detached: &Arc, - tenant: &Tenant, + tenant: &TenantShard, mut attempt: Attempt, _ctx: &RequestContext, ) -> Result<(), Error> { @@ -970,7 +973,7 @@ where } fn check_no_archived_children_of_ancestor( - tenant: &Tenant, + tenant: &TenantShard, detached: &Arc, ancestor: &Arc, ancestor_lsn: Lsn, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 26c2861b93..517bea6de9 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -31,7 +31,8 @@ use crate::{ task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}, tenant::{ size::CalculateSyntheticSizeError, storage_layer::LayerVisibilityHint, - tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, Tenant, + tasks::BackgroundLoopKind, timeline::EvictionError, LogicalSizeCalculationCause, + TenantShard, }, }; @@ -52,7 +53,7 @@ pub struct EvictionTaskTenantState { impl Timeline { pub(super) fn launch_eviction_task( self: &Arc, - parent: Arc, + parent: Arc, background_tasks_can_start: Option<&completion::Barrier>, ) { let self_clone = Arc::clone(self); @@ -79,7 +80,7 @@ impl Timeline { } #[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))] - async fn eviction_task(self: Arc, tenant: Arc) { + async fn eviction_task(self: Arc, tenant: Arc) { use crate::tenant::tasks::random_init_delay; // acquire the gate guard only once within a useful span @@ -123,7 +124,7 @@ impl Timeline { #[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))] async fn eviction_iteration( self: &Arc, - tenant: &Tenant, + tenant: &TenantShard, policy: &EvictionPolicy, cancel: &CancellationToken, gate: &GateGuard, @@ -180,7 +181,7 @@ impl Timeline { async fn eviction_iteration_threshold( self: &Arc, - tenant: &Tenant, + tenant: &TenantShard, p: &EvictionPolicyLayerAccessThreshold, cancel: &CancellationToken, gate: &GateGuard, @@ -314,7 +315,7 @@ impl Timeline { /// disk usage based eviction task. 
async fn imitiate_only( self: &Arc, - tenant: &Tenant, + tenant: &TenantShard, p: &EvictionPolicyLayerAccessThreshold, cancel: &CancellationToken, gate: &GateGuard, @@ -370,7 +371,7 @@ impl Timeline { #[instrument(skip_all)] async fn imitate_layer_accesses( &self, - tenant: &Tenant, + tenant: &TenantShard, p: &EvictionPolicyLayerAccessThreshold, cancel: &CancellationToken, gate: &GateGuard, @@ -506,7 +507,7 @@ impl Timeline { #[instrument(skip_all)] async fn imitate_synthetic_size_calculation_worker( &self, - tenant: &Tenant, + tenant: &TenantShard, cancel: &CancellationToken, ctx: &RequestContext, ) { diff --git a/pageserver/src/tenant/timeline/offload.rs b/pageserver/src/tenant/timeline/offload.rs index 6c6b19e8b1..e617cd7392 100644 --- a/pageserver/src/tenant/timeline/offload.rs +++ b/pageserver/src/tenant/timeline/offload.rs @@ -6,7 +6,7 @@ use super::delete::{delete_local_timeline_directory, DeleteTimelineFlow, Deletio use super::Timeline; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::ShutdownIfArchivedError; -use crate::tenant::{OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded}; +use crate::tenant::{OffloadedTimeline, TenantManifestError, TenantShard, TimelineOrOffloaded}; #[derive(thiserror::Error, Debug)] pub(crate) enum OffloadError { @@ -30,7 +30,7 @@ impl From for OffloadError { } pub(crate) async fn offload_timeline( - tenant: &Tenant, + tenant: &TenantShard, timeline: &Arc, ) -> Result<(), OffloadError> { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -110,7 +110,7 @@ pub(crate) async fn offload_timeline( /// /// Returns the strong count of the timeline `Arc` fn remove_timeline_from_tenant( - tenant: &Tenant, + tenant: &TenantShard, timeline: &Timeline, _: &DeletionGuard, // using it as a witness ) -> usize { diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs index 80a09b4840..fe1aee85d5 100644 --- a/pageserver/src/tenant/timeline/uninit.rs +++ b/pageserver/src/tenant/timeline/uninit.rs @@ -8,7 +8,7 @@ use utils::{fs_ext, id::TimelineId, lsn::Lsn, sync::gate::GateGuard}; use crate::{ context::RequestContext, import_datadir, - tenant::{CreateTimelineIdempotency, Tenant, TimelineOrOffloaded}, + tenant::{CreateTimelineIdempotency, TenantShard, TimelineOrOffloaded}, }; use super::Timeline; @@ -21,14 +21,14 @@ use super::Timeline; /// The caller is responsible for proper timeline data filling before the final init. #[must_use] pub struct UninitializedTimeline<'t> { - pub(crate) owning_tenant: &'t Tenant, + pub(crate) owning_tenant: &'t TenantShard, timeline_id: TimelineId, raw_timeline: Option<(Arc, TimelineCreateGuard)>, } impl<'t> UninitializedTimeline<'t> { pub(crate) fn new( - owning_tenant: &'t Tenant, + owning_tenant: &'t TenantShard, timeline_id: TimelineId, raw_timeline: Option<(Arc, TimelineCreateGuard)>, ) -> Self { @@ -94,7 +94,7 @@ impl<'t> UninitializedTimeline<'t> { /// Prepares timeline data by loading it from the basebackup archive. 
pub(crate) async fn import_basebackup_from_tar( self, - tenant: Arc, + tenant: Arc, copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin), base_lsn: Lsn, broker_client: storage_broker::BrokerClientChannel, @@ -173,7 +173,7 @@ pub(crate) fn cleanup_timeline_directory(create_guard: TimelineCreateGuard) { #[must_use] pub(crate) struct TimelineCreateGuard { pub(crate) _tenant_gate_guard: GateGuard, - pub(crate) owning_tenant: Arc, + pub(crate) owning_tenant: Arc, pub(crate) timeline_id: TimelineId, pub(crate) timeline_path: Utf8PathBuf, pub(crate) idempotency: CreateTimelineIdempotency, @@ -199,7 +199,7 @@ pub(crate) enum TimelineExclusionError { impl TimelineCreateGuard { pub(crate) fn new( - owning_tenant: &Arc, + owning_tenant: &Arc, timeline_id: TimelineId, timeline_path: Utf8PathBuf, idempotency: CreateTimelineIdempotency,
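
For orientation, here is a minimal sketch (not part of the patch above) of the pattern the diff applies throughout the pageserver: the per-shard struct formerly named `Tenant` is renamed to `TenantShard`, and signatures such as `calculate_and_log(tenant: &Tenant, ...)` become `calculate_and_log(tenant: &TenantShard, ...)` while function bodies stay unchanged. The struct fields, the field types, and the example shard ID below are simplified placeholders, not the real pageserver definitions.

use std::sync::Arc;

// Simplified stand-in for pageserver's `TenantShard` (formerly `Tenant`).
// The real struct carries config, timelines, remote-storage handles, etc.
pub struct TenantShard {
    pub tenant_shard_id: String,
}

// Before the rename this parameter was `&Tenant`; afterwards it is
// `&TenantShard`. Only the type name in the signature changes.
fn calculate_and_log(tenant: &TenantShard) {
    println!("synthetic size pass for {}", tenant.tenant_shard_id);
}

fn main() {
    // `&Arc<TenantShard>` deref-coerces to `&TenantShard` at the call site,
    // so callers holding an Arc need no changes beyond the type name.
    let shard = Arc::new(TenantShard {
        tenant_shard_id: "example-tenant-0001".to_string(),
    });
    calculate_and_log(&shard);
}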