From f1fc1fd639a2b061c6f1b5c2cb116eb3f4741d5b Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 11 Dec 2023 15:52:33 +0000 Subject: [PATCH] pageserver: further refactoring from TenantId to TenantShardId (#6059) ## Problem In https://github.com/neondatabase/neon/pull/5957, the most essential types were updated to use TenantShardId rather than TenantId. That unblocked other work, but didn't fully enable running multiple shards from the same tenant on the same pageserver. ## Summary of changes - Use TenantShardId in page cache key for materialized pages - Update mgr.rs get_tenant() and list_tenants() functions to use a shard id, and update all callers. - Eliminate the exactly_one_or_none helper in mgr.rs and all code that used it - Convert timeline HTTP routes to use tenant_shard_id Note on page cache: ``` struct MaterializedPageHashKey { /// Why is this TenantShardId rather than TenantId? /// /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this /// this not the case for certain internally-generated pages (e.g. relation sizes). In future, we may make this /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are /// special-cased in some other way. tenant_shard_id: TenantShardId, timeline_id: TimelineId, key: Key, } ``` --- control_plane/src/bin/neon_local.rs | 2 +- control_plane/src/tenant_migration.rs | 2 +- libs/pageserver_api/src/models.rs | 8 +- libs/pageserver_api/src/shard.rs | 5 + pageserver/src/consumption_metrics.rs | 14 +- pageserver/src/consumption_metrics/metrics.rs | 10 +- pageserver/src/http/routes.rs | 222 ++++++++++-------- pageserver/src/metrics.rs | 38 ++- pageserver/src/page_cache.rs | 24 +- pageserver/src/task_mgr.rs | 41 ++-- pageserver/src/tenant.rs | 4 +- pageserver/src/tenant/delete.rs | 4 +- pageserver/src/tenant/mgr.rs | 104 +++----- .../src/tenant/remote_timeline_client.rs | 2 +- pageserver/src/tenant/storage_layer/layer.rs | 2 +- pageserver/src/tenant/tasks.rs | 14 +- pageserver/src/tenant/timeline.rs | 49 ++-- pageserver/src/tenant/timeline/delete.rs | 6 +- .../src/tenant/timeline/eviction_task.rs | 4 +- pageserver/src/tenant/timeline/walreceiver.rs | 21 +- .../walreceiver/walreceiver_connection.rs | 2 +- 21 files changed, 299 insertions(+), 279 deletions(-) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 8d53a6a658..6f0b929ac6 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -168,7 +168,7 @@ fn print_timelines_tree( info: t.clone(), children: BTreeSet::new(), name: timeline_name_mappings - .remove(&TenantTimelineId::new(t.tenant_id, t.timeline_id)), + .remove(&TenantTimelineId::new(t.tenant_id.tenant_id, t.timeline_id)), }, ) }) diff --git a/control_plane/src/tenant_migration.rs b/control_plane/src/tenant_migration.rs index c0c44e279f..fbb0358158 100644 --- a/control_plane/src/tenant_migration.rs +++ b/control_plane/src/tenant_migration.rs @@ -165,7 +165,7 @@ pub fn migrate_tenant( let found = other_ps_tenants .into_iter() .map(|t| t.id) - .any(|i| i == tenant_id); + .any(|i| i.tenant_id == tenant_id); if !found { continue; } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 495a58e865..2572bcf74f 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -357,7 +357,7 @@ pub enum TenantAttachmentStatus { #[derive(Serialize, Deserialize, Clone)] pub struct TenantInfo { - pub id: TenantId, + pub id: TenantShardId, // NB: intentionally not part of OpenAPI, we don't want to commit to a specific set of TenantState's pub state: TenantState, /// Sum of the size of all layer files. @@ -369,7 +369,7 @@ pub struct TenantInfo { /// This represents the output of the "timeline_detail" and "timeline_list" API calls. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TimelineInfo { - pub tenant_id: TenantId, + pub tenant_id: TenantShardId, pub timeline_id: TimelineId, pub ancestor_timeline_id: Option, @@ -823,7 +823,7 @@ mod tests { fn test_tenantinfo_serde() { // Test serialization/deserialization of TenantInfo let original_active = TenantInfo { - id: TenantId::generate(), + id: TenantShardId::unsharded(TenantId::generate()), state: TenantState::Active, current_physical_size: Some(42), attachment_status: TenantAttachmentStatus::Attached, @@ -840,7 +840,7 @@ mod tests { }); let original_broken = TenantInfo { - id: TenantId::generate(), + id: TenantShardId::unsharded(TenantId::generate()), state: TenantState::Broken { reason: "reason".into(), backtrace: "backtrace info".into(), diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index 9e83e0eee2..052fbd1402 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -76,6 +76,11 @@ impl TenantShardId { pub fn shard_slug(&self) -> impl std::fmt::Display + '_ { ShardSlug(self) } + + /// Convenience for code that has special behavior on the 0th shard. + pub fn is_zero(&self) -> bool { + self.shard_number == ShardNumber(0) + } } /// Formatting helper diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 7ad6a0f890..bb13bdd5e5 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -269,12 +269,18 @@ async fn calculate_synthetic_size_worker( } }; - for (tenant_id, tenant_state) in tenants { + for (tenant_shard_id, tenant_state) in tenants { if tenant_state != TenantState::Active { continue; } - if let Ok(tenant) = mgr::get_tenant(tenant_id, true) { + if !tenant_shard_id.is_zero() { + // We only send consumption metrics from shard 0, so don't waste time calculating + // synthetic size on other shards. + continue; + } + + if let Ok(tenant) = mgr::get_tenant(tenant_shard_id, true) { // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks? // We can put in some prioritization for consumption metrics. // Same for the loop that fetches computed metrics. @@ -286,7 +292,9 @@ async fn calculate_synthetic_size_worker( { return Ok(()); } - error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}"); + error!( + "failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}" + ); } } } diff --git a/pageserver/src/consumption_metrics/metrics.rs b/pageserver/src/consumption_metrics/metrics.rs index 918e45ea9e..0b827816bc 100644 --- a/pageserver/src/consumption_metrics/metrics.rs +++ b/pageserver/src/consumption_metrics/metrics.rs @@ -2,7 +2,6 @@ use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogi use chrono::{DateTime, Utc}; use consumption_metrics::EventType; use futures::stream::StreamExt; -use pageserver_api::shard::ShardNumber; use std::{sync::Arc, time::SystemTime}; use utils::{ id::{TenantId, TimelineId}, @@ -198,12 +197,12 @@ pub(super) async fn collect_all_metrics( }; let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move { - if state != TenantState::Active { + if state != TenantState::Active || !id.is_zero() { None } else { crate::tenant::mgr::get_tenant(id, true) .ok() - .map(|tenant| (id, tenant)) + .map(|tenant| (id.tenant_id, tenant)) } }); @@ -229,11 +228,6 @@ where while let Some((tenant_id, tenant)) = tenants.next().await { let mut tenant_resident_size = 0; - // Sharded tenants report all consumption metrics from shard zero - if tenant.tenant_shard_id().shard_number != ShardNumber(0) { - continue; - } - for timeline in tenant.list_timelines() { let timeline_id = timeline.timeline_id; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 9e41d912c2..b9b0250671 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -352,8 +352,7 @@ async fn build_timeline_info_common( let walreceiver_status = timeline.walreceiver_status(); let info = TimelineInfo { - // TODO(sharding): add a shard_id field, or make tenant_id into a tenant_shard_id - tenant_id: timeline.tenant_shard_id.tenant_id, + tenant_id: timeline.tenant_shard_id, timeline_id: timeline.timeline_id, ancestor_timeline_id, ancestor_lsn, @@ -480,15 +479,15 @@ async fn timeline_list_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let include_non_incremental_logical_size: Option = parse_query_param(&request, "include-non-incremental-logical-size")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let response_data = async { - let tenant = mgr::get_tenant(tenant_id, true)?; + let tenant = mgr::get_tenant(tenant_shard_id, true)?; let timelines = tenant.list_timelines(); let mut response_data = Vec::with_capacity(timelines.len()); @@ -507,7 +506,9 @@ async fn timeline_list_handler( } Ok::, ApiError>(response_data) } - .instrument(info_span!("timeline_list", %tenant_id)) + .instrument(info_span!("timeline_list", + tenant_id = %tenant_shard_id.tenant_id, + shard_id = %tenant_shard_id.shard_slug())) .await?; json_response(StatusCode::OK, response_data) @@ -517,17 +518,17 @@ async fn timeline_detail_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; let include_non_incremental_logical_size: Option = parse_query_param(&request, "include-non-incremental-logical-size")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; // Logical size calculation needs downloading. let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline_info = async { - let tenant = mgr::get_tenant(tenant_id, true)?; + let tenant = mgr::get_tenant(tenant_shard_id, true)?; let timeline = tenant .get_timeline(timeline_id, false) @@ -544,7 +545,10 @@ async fn timeline_detail_handler( Ok::<_, ApiError>(timeline_info) } - .instrument(info_span!("timeline_detail", %tenant_id, %timeline_id)) + .instrument(info_span!("timeline_detail", + tenant_id = %tenant_shard_id.tenant_id, + shard_id = %tenant_shard_id.shard_slug(), + %timeline_id)) .await?; json_response(StatusCode::OK, timeline_info) @@ -554,8 +558,15 @@ async fn get_lsn_by_timestamp_handler( request: Request, cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + + if !tenant_shard_id.is_zero() { + // Requires SLRU contents, which are only stored on shard zero + return Err(ApiError::BadRequest(anyhow!( + "Size calculations are only available on shard zero" + ))); + } let version: Option = parse_query_param(&request, "version")?; @@ -567,7 +578,7 @@ async fn get_lsn_by_timestamp_handler( let timestamp_pg = postgres_ffi::to_pg_timestamp(timestamp); let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; let result = timeline .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx) .await?; @@ -602,8 +613,15 @@ async fn get_timestamp_of_lsn_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; + + if !tenant_shard_id.is_zero() { + // Requires SLRU contents, which are only stored on shard zero + return Err(ApiError::BadRequest(anyhow!( + "Size calculations are only available on shard zero" + ))); + } let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; @@ -613,7 +631,7 @@ async fn get_timestamp_of_lsn_handler( .map_err(ApiError::BadRequest)?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?; match result { @@ -805,11 +823,11 @@ async fn tenant_status( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let tenant_info = async { - let tenant = mgr::get_tenant(tenant_id, false)?; + let tenant = mgr::get_tenant(tenant_shard_id, false)?; // Calculate total physical size of all timelines let mut current_physical_size = 0; @@ -819,13 +837,15 @@ async fn tenant_status( let state = tenant.current_state(); Result::<_, ApiError>::Ok(TenantInfo { - id: tenant_id, + id: tenant_shard_id, state: state.clone(), current_physical_size: Some(current_physical_size), attachment_status: state.attachment_status(), }) } - .instrument(info_span!("tenant_status_handler", %tenant_id)) + .instrument(info_span!("tenant_status_handler", + tenant_id = %tenant_shard_id.tenant_id, + shard_id = %tenant_shard_id.shard_slug())) .await?; json_response(StatusCode::OK, tenant_info) @@ -868,14 +888,20 @@ async fn tenant_size_handler( request: Request, cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let inputs_only: Option = parse_query_param(&request, "inputs_only")?; let retention_period: Option = parse_query_param(&request, "retention_period")?; let headers = request.headers(); let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let tenant = mgr::get_tenant(tenant_id, true)?; + let tenant = mgr::get_tenant(tenant_shard_id, true)?; + + if !tenant_shard_id.is_zero() { + return Err(ApiError::BadRequest(anyhow!( + "Size calculations are only available on shard zero" + ))); + } // this can be long operation let inputs = tenant @@ -927,7 +953,7 @@ async fn tenant_size_handler( json_response( StatusCode::OK, TenantHistorySize { - id: tenant_id, + id: tenant_shard_id.tenant_id, size: sizes.as_ref().map(|x| x.total_size), segment_sizes: sizes.map(|x| x.segments), inputs, @@ -939,14 +965,14 @@ async fn layer_map_info_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; let reset: LayerAccessStatsReset = parse_query_param(&request, "reset")?.unwrap_or(LayerAccessStatsReset::NoReset); - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; let layer_map_info = timeline.layer_map_info(reset).await; json_response(StatusCode::OK, layer_map_info) @@ -956,13 +982,12 @@ async fn layer_download_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; let layer_file_name = get_request_param(&request, "layer_file_name")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; let downloaded = timeline .download_layer(layer_file_name) .await @@ -973,7 +998,7 @@ async fn layer_download_handler( Some(false) => json_response(StatusCode::NOT_MODIFIED, ()), None => json_response( StatusCode::BAD_REQUEST, - format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"), + format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"), ), } } @@ -982,12 +1007,12 @@ async fn evict_timeline_layer_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; let layer_file_name = get_request_param(&request, "layer_file_name")?; - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; let evicted = timeline .evict_layer(layer_file_name) .await @@ -998,7 +1023,7 @@ async fn evict_timeline_layer_handler( Some(false) => json_response(StatusCode::NOT_MODIFIED, ()), None => json_response( StatusCode::BAD_REQUEST, - format!("Layer {tenant_id}/{timeline_id}/{layer_file_name} not found"), + format!("Layer {tenant_shard_id}/{timeline_id}/{layer_file_name} not found"), ), } } @@ -1130,10 +1155,10 @@ async fn get_tenant_config_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; - let tenant = mgr::get_tenant(tenant_id, false)?; + let tenant = mgr::get_tenant(tenant_shard_id, false)?; let response = HashMap::from([ ( @@ -1227,9 +1252,9 @@ async fn handle_tenant_break( r: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?; - let tenant = crate::tenant::mgr::get_tenant(tenant_id, true) + let tenant = crate::tenant::mgr::get_tenant(tenant_shard_id, true) .map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?; tenant.set_broken("broken from test".to_owned()).await; @@ -1270,14 +1295,15 @@ async fn timeline_gc_handler( mut request: Request, cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let gc_req: TimelineGcRequest = json_request(&mut request).await?; let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let wait_task_done = mgr::immediate_gc(tenant_id, timeline_id, gc_req, cancel, &ctx).await?; + let wait_task_done = + mgr::immediate_gc(tenant_shard_id, timeline_id, gc_req, cancel, &ctx).await?; let gc_result = wait_task_done .await .context("wait for gc task") @@ -1292,9 +1318,9 @@ async fn timeline_compact_handler( request: Request, cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let mut flags = EnumSet::empty(); if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? { @@ -1302,14 +1328,14 @@ async fn timeline_compact_handler( } async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; timeline .compact(&cancel, flags, &ctx) .await .map_err(|e| ApiError::InternalServerError(e.into()))?; json_response(StatusCode::OK, ()) } - .instrument(info_span!("manual_compaction", %tenant_id, %timeline_id)) + .instrument(info_span!("manual_compaction", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) .await } @@ -1318,9 +1344,9 @@ async fn timeline_checkpoint_handler( request: Request, cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let mut flags = EnumSet::empty(); if Some(true) == parse_query_param::<_, bool>(&request, "force_repartition")? { @@ -1328,7 +1354,7 @@ async fn timeline_checkpoint_handler( } async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; timeline .freeze_and_flush() .await @@ -1340,7 +1366,7 @@ async fn timeline_checkpoint_handler( json_response(StatusCode::OK, ()) } - .instrument(info_span!("manual_checkpoint", %tenant_id, %timeline_id)) + .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) .await } @@ -1348,12 +1374,12 @@ async fn timeline_download_remote_layers_handler_post( mut request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; let body: DownloadRemoteLayersTaskSpawnRequest = json_request(&mut request).await?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; match timeline.spawn_download_all_remote_layers(body).await { Ok(st) => json_response(StatusCode::ACCEPTED, st), Err(st) => json_response(StatusCode::CONFLICT, st), @@ -1364,11 +1390,11 @@ async fn timeline_download_remote_layers_handler_get( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; - check_permission(&request, Some(tenant_id))?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; let info = timeline .get_download_all_remote_layers_task_info() .context("task never started since last pageserver process start") @@ -1414,9 +1440,9 @@ async fn getpage_at_lsn_handler( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; struct Key(crate::repository::Key); @@ -1435,7 +1461,7 @@ async fn getpage_at_lsn_handler( async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; let page = timeline.get(key.0, lsn, &ctx).await?; @@ -1447,7 +1473,7 @@ async fn getpage_at_lsn_handler( .unwrap(), ) } - .instrument(info_span!("timeline_get", %tenant_id, %timeline_id)) + .instrument(info_span!("timeline_get", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) .await } @@ -1455,9 +1481,9 @@ async fn timeline_collect_keyspace( request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?; let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?; - check_permission(&request, Some(tenant_id))?; + check_permission(&request, Some(tenant_shard_id.tenant_id))?; struct Partitioning { keys: crate::keyspace::KeySpace, @@ -1526,7 +1552,7 @@ async fn timeline_collect_keyspace( async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; + let timeline = active_timeline_of_active_tenant(tenant_shard_id, timeline_id).await?; let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); let keys = timeline .collect_keyspace(at_lsn, &ctx) @@ -1535,15 +1561,15 @@ async fn timeline_collect_keyspace( json_response(StatusCode::OK, Partitioning { keys, at_lsn }) } - .instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id)) + .instrument(info_span!("timeline_collect_keyspace", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) .await } async fn active_timeline_of_active_tenant( - tenant_id: TenantId, + tenant_shard_id: TenantShardId, timeline_id: TimelineId, ) -> Result, ApiError> { - let tenant = mgr::get_tenant(tenant_id, true)?; + let tenant = mgr::get_tenant(tenant_shard_id, true)?; tenant .get_timeline(timeline_id, true) .map_err(|e| ApiError::NotFound(e.into())) @@ -1820,23 +1846,25 @@ pub fn make_router( }) .get("/v1/tenant", |r| api_handler(r, tenant_list_handler)) .post("/v1/tenant", |r| api_handler(r, tenant_create_handler)) - .get("/v1/tenant/:tenant_id", |r| api_handler(r, tenant_status)) + .get("/v1/tenant/:tenant_shard_id", |r| { + api_handler(r, tenant_status) + }) .delete("/v1/tenant/:tenant_shard_id", |r| { api_handler(r, tenant_delete_handler) }) - .get("/v1/tenant/:tenant_id/synthetic_size", |r| { + .get("/v1/tenant/:tenant_shard_id/synthetic_size", |r| { api_handler(r, tenant_size_handler) }) .put("/v1/tenant/config", |r| { api_handler(r, update_tenant_config_handler) }) - .get("/v1/tenant/:tenant_id/config", |r| { + .get("/v1/tenant/:tenant_shard_id/config", |r| { api_handler(r, get_tenant_config_handler) }) .put("/v1/tenant/:tenant_shard_id/location_config", |r| { api_handler(r, put_tenant_location_config_handler) }) - .get("/v1/tenant/:tenant_id/timeline", |r| { + .get("/v1/tenant/:tenant_shard_id/timeline", |r| { api_handler(r, timeline_list_handler) }) .post("/v1/tenant/:tenant_shard_id/timeline", |r| { @@ -1857,47 +1885,50 @@ pub fn make_router( .post("/v1/tenant/:tenant_id/ignore", |r| { api_handler(r, tenant_ignore_handler) }) - .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| { + .get("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| { api_handler(r, timeline_detail_handler) }) .get( - "/v1/tenant/:tenant_id/timeline/:timeline_id/get_lsn_by_timestamp", + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_lsn_by_timestamp", |r| api_handler(r, get_lsn_by_timestamp_handler), ) .get( - "/v1/tenant/:tenant_id/timeline/:timeline_id/get_timestamp_of_lsn", + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/get_timestamp_of_lsn", |r| api_handler(r, get_timestamp_of_lsn_handler), ) - .put("/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc", |r| { - api_handler(r, timeline_gc_handler) - }) - .put("/v1/tenant/:tenant_id/timeline/:timeline_id/compact", |r| { - testing_api_handler("run timeline compaction", r, timeline_compact_handler) - }) .put( - "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint", + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/do_gc", + |r| api_handler(r, timeline_gc_handler), + ) + .put( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact", + |r| testing_api_handler("run timeline compaction", r, timeline_compact_handler), + ) + .put( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/checkpoint", |r| testing_api_handler("run timeline checkpoint", r, timeline_checkpoint_handler), ) .post( - "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers", + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers", |r| api_handler(r, timeline_download_remote_layers_handler_post), ) .get( - "/v1/tenant/:tenant_id/timeline/:timeline_id/download_remote_layers", + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/download_remote_layers", |r| api_handler(r, timeline_download_remote_layers_handler_get), ) .delete("/v1/tenant/:tenant_shard_id/timeline/:timeline_id", |r| { api_handler(r, timeline_delete_handler) }) - .get("/v1/tenant/:tenant_id/timeline/:timeline_id/layer", |r| { - api_handler(r, layer_map_info_handler) - }) .get( - "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name", + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer", + |r| api_handler(r, layer_map_info_handler), + ) + .get( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name", |r| api_handler(r, layer_download_handler), ) .delete( - "/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name", + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/layer/:layer_file_name", |r| api_handler(r, evict_timeline_layer_handler), ) .put("/v1/disk_usage_eviction/run", |r| { @@ -1906,18 +1937,19 @@ pub fn make_router( .put("/v1/deletion_queue/flush", |r| { api_handler(r, deletion_queue_flush) }) - .put("/v1/tenant/:tenant_id/break", |r| { + .put("/v1/tenant/:tenant_shard_id/break", |r| { testing_api_handler("set tenant state to broken", r, handle_tenant_break) }) .get("/v1/panic", |r| api_handler(r, always_panic_handler)) .post("/v1/tracing/event", |r| { testing_api_handler("emit a tracing event", r, post_tracing_event_handler) }) - .get("/v1/tenant/:tenant_id/timeline/:timeline_id/getpage", |r| { - testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler) - }) .get( - "/v1/tenant/:tenant_id/timeline/:timeline_id/keyspace", + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/getpage", + |r| testing_api_handler("getpage@lsn", r, getpage_at_lsn_handler), + ) + .get( + "/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace", |r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace), ) .any(handler_404)) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 67d798c1d4..7cc0333ee5 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -650,7 +650,7 @@ static EVICTIONS_WITH_LOW_RESIDENCE_DURATION: Lazy = Lazy::new(|| "pageserver_evictions_with_low_residence_duration", "If a layer is evicted that was resident for less than `low_threshold`, it is counted to this counter. \ Residence duration is determined using the `residence_duration_data_source`.", - &["tenant_id", "timeline_id", "residence_duration_data_source", "low_threshold_secs"] + &["tenant_id", "shard_id", "timeline_id", "residence_duration_data_source", "low_threshold_secs"] ) .expect("failed to define a metric") }); @@ -714,10 +714,16 @@ impl EvictionsWithLowResidenceDurationBuilder { } } - fn build(&self, tenant_id: &str, timeline_id: &str) -> EvictionsWithLowResidenceDuration { + fn build( + &self, + tenant_id: &str, + shard_id: &str, + timeline_id: &str, + ) -> EvictionsWithLowResidenceDuration { let counter = EVICTIONS_WITH_LOW_RESIDENCE_DURATION .get_metric_with_label_values(&[ tenant_id, + shard_id, timeline_id, self.data_source, &EvictionsWithLowResidenceDuration::threshold_label_value(self.threshold), @@ -748,21 +754,24 @@ impl EvictionsWithLowResidenceDuration { pub fn change_threshold( &mut self, tenant_id: &str, + shard_id: &str, timeline_id: &str, new_threshold: Duration, ) { if new_threshold == self.threshold { return; } - let mut with_new = - EvictionsWithLowResidenceDurationBuilder::new(self.data_source, new_threshold) - .build(tenant_id, timeline_id); + let mut with_new = EvictionsWithLowResidenceDurationBuilder::new( + self.data_source, + new_threshold, + ) + .build(tenant_id, shard_id, timeline_id); std::mem::swap(self, &mut with_new); - with_new.remove(tenant_id, timeline_id); + with_new.remove(tenant_id, shard_id, timeline_id); } // This could be a `Drop` impl, but, we need the `tenant_id` and `timeline_id`. - fn remove(&mut self, tenant_id: &str, timeline_id: &str) { + fn remove(&mut self, tenant_id: &str, shard_id: &str, timeline_id: &str) { let Some(_counter) = self.counter.take() else { return; }; @@ -771,6 +780,7 @@ impl EvictionsWithLowResidenceDuration { let removed = EVICTIONS_WITH_LOW_RESIDENCE_DURATION.remove_label_values(&[ tenant_id, + shard_id, timeline_id, self.data_source, &threshold, @@ -1603,6 +1613,7 @@ impl StorageTimeMetrics { #[derive(Debug)] pub struct TimelineMetrics { tenant_id: String, + shard_id: String, timeline_id: String, pub flush_time_histo: StorageTimeMetrics, pub compact_time_histo: StorageTimeMetrics, @@ -1623,11 +1634,12 @@ pub struct TimelineMetrics { impl TimelineMetrics { pub fn new( - tenant_id: &TenantId, + tenant_shard_id: &TenantShardId, timeline_id: &TimelineId, evictions_with_low_residence_duration_builder: EvictionsWithLowResidenceDurationBuilder, ) -> Self { - let tenant_id = tenant_id.to_string(); + let tenant_id = tenant_shard_id.tenant_id.to_string(); + let shard_id = format!("{}", tenant_shard_id.shard_slug()); let timeline_id = timeline_id.to_string(); let flush_time_histo = StorageTimeMetrics::new(StorageTimeOperation::LayerFlush, &tenant_id, &timeline_id); @@ -1664,11 +1676,12 @@ impl TimelineMetrics { let evictions = EVICTIONS .get_metric_with_label_values(&[&tenant_id, &timeline_id]) .unwrap(); - let evictions_with_low_residence_duration = - evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id); + let evictions_with_low_residence_duration = evictions_with_low_residence_duration_builder + .build(&tenant_id, &shard_id, &timeline_id); TimelineMetrics { tenant_id, + shard_id, timeline_id, flush_time_histo, compact_time_histo, @@ -1714,6 +1727,7 @@ impl Drop for TimelineMetrics { fn drop(&mut self) { let tenant_id = &self.tenant_id; let timeline_id = &self.timeline_id; + let shard_id = &self.shard_id; let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, timeline_id]); { RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get()); @@ -1727,7 +1741,7 @@ impl Drop for TimelineMetrics { self.evictions_with_low_residence_duration .write() .unwrap() - .remove(tenant_id, timeline_id); + .remove(tenant_id, shard_id, timeline_id); // The following metrics are born outside of the TimelineMetrics lifecycle but still // removed at the end of it. The idea is to have the metrics outlive the diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index dbd85d2dcf..c3c98af406 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -28,7 +28,7 @@ //! Page cache maps from a cache key to a buffer slot. //! The cache key uniquely identifies the piece of data that is being cached. //! -//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`]. +//! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`]. //! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access. //! //! The cache key for **immutable file** pages is [`FileId`] and a block number. @@ -83,10 +83,8 @@ use std::{ use anyhow::Context; use once_cell::sync::OnceCell; -use utils::{ - id::{TenantId, TimelineId}, - lsn::Lsn, -}; +use pageserver_api::shard::TenantShardId; +use utils::{id::TimelineId, lsn::Lsn}; use crate::{ context::RequestContext, @@ -154,7 +152,13 @@ enum CacheKey { #[derive(Debug, PartialEq, Eq, Hash, Clone)] struct MaterializedPageHashKey { - tenant_id: TenantId, + /// Why is this TenantShardId rather than TenantId? + /// + /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this + /// this not the case for certain internally-generated pages (e.g. relation sizes). In future, we may make this + /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are + /// special-cased in some other way. + tenant_shard_id: TenantShardId, timeline_id: TimelineId, key: Key, } @@ -378,7 +382,7 @@ impl PageCache { /// returned page. pub async fn lookup_materialized_page( &self, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, timeline_id: TimelineId, key: &Key, lsn: Lsn, @@ -395,7 +399,7 @@ impl PageCache { let mut cache_key = CacheKey::MaterializedPage { hash_key: MaterializedPageHashKey { - tenant_id, + tenant_shard_id, timeline_id, key: *key, }, @@ -436,7 +440,7 @@ impl PageCache { /// pub async fn memorize_materialized_page( &self, - tenant_id: TenantId, + tenant_shard_id: TenantShardId, timeline_id: TimelineId, key: Key, lsn: Lsn, @@ -444,7 +448,7 @@ impl PageCache { ) -> anyhow::Result<()> { let cache_key = CacheKey::MaterializedPage { hash_key: MaterializedPageHashKey { - tenant_id, + tenant_shard_id, timeline_id, key, }, diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 4270b6edb0..5786356720 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -42,6 +42,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use futures::FutureExt; +use pageserver_api::shard::TenantShardId; use tokio::runtime::Runtime; use tokio::task::JoinHandle; use tokio::task_local; @@ -51,7 +52,7 @@ use tracing::{debug, error, info, warn}; use once_cell::sync::Lazy; -use utils::id::{TenantId, TimelineId}; +use utils::id::TimelineId; use crate::shutdown_pageserver; @@ -317,7 +318,7 @@ struct PageServerTask { /// Tasks may optionally be launched for a particular tenant/timeline, enabling /// later cancelling tasks for that tenant/timeline in [`shutdown_tasks`] - tenant_id: Option, + tenant_shard_id: Option, timeline_id: Option, mutable: Mutex, @@ -329,7 +330,7 @@ struct PageServerTask { pub fn spawn( runtime: &tokio::runtime::Handle, kind: TaskKind, - tenant_id: Option, + tenant_shard_id: Option, timeline_id: Option, name: &str, shutdown_process_on_error: bool, @@ -345,7 +346,7 @@ where kind, name: name.to_string(), cancel: cancel.clone(), - tenant_id, + tenant_shard_id, timeline_id, mutable: Mutex::new(MutableTaskState { join_handle: None }), }); @@ -424,28 +425,28 @@ async fn task_finish( Ok(Err(err)) => { if shutdown_process_on_error { error!( - "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}", - task_name, task.tenant_id, task.timeline_id, err + "Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}", + task_name, task.tenant_shard_id, task.timeline_id, err ); shutdown_process = true; } else { error!( - "Task '{}' tenant_id: {:?}, timeline_id: {:?} exited with error: {:?}", - task_name, task.tenant_id, task.timeline_id, err + "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} exited with error: {:?}", + task_name, task.tenant_shard_id, task.timeline_id, err ); } } Err(err) => { if shutdown_process_on_error { error!( - "Shutting down: task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}", - task_name, task.tenant_id, task.timeline_id, err + "Shutting down: task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}", + task_name, task.tenant_shard_id, task.timeline_id, err ); shutdown_process = true; } else { error!( - "Task '{}' tenant_id: {:?}, timeline_id: {:?} panicked: {:?}", - task_name, task.tenant_id, task.timeline_id, err + "Task '{}' tenant_shard_id: {:?}, timeline_id: {:?} panicked: {:?}", + task_name, task.tenant_shard_id, task.timeline_id, err ); } } @@ -467,11 +468,11 @@ async fn task_finish( /// /// Or to shut down all tasks for given timeline: /// -/// shutdown_tasks(None, Some(tenant_id), Some(timeline_id)) +/// shutdown_tasks(None, Some(tenant_shard_id), Some(timeline_id)) /// pub async fn shutdown_tasks( kind: Option, - tenant_id: Option, + tenant_shard_id: Option, timeline_id: Option, ) { let mut victim_tasks = Vec::new(); @@ -480,35 +481,35 @@ pub async fn shutdown_tasks( let tasks = TASKS.lock().unwrap(); for task in tasks.values() { if (kind.is_none() || Some(task.kind) == kind) - && (tenant_id.is_none() || task.tenant_id == tenant_id) + && (tenant_shard_id.is_none() || task.tenant_shard_id == tenant_shard_id) && (timeline_id.is_none() || task.timeline_id == timeline_id) { task.cancel.cancel(); victim_tasks.push(( Arc::clone(task), task.kind, - task.tenant_id, + task.tenant_shard_id, task.timeline_id, )); } } } - let log_all = kind.is_none() && tenant_id.is_none() && timeline_id.is_none(); + let log_all = kind.is_none() && tenant_shard_id.is_none() && timeline_id.is_none(); - for (task, task_kind, tenant_id, timeline_id) in victim_tasks { + for (task, task_kind, tenant_shard_id, timeline_id) in victim_tasks { let join_handle = { let mut task_mut = task.mutable.lock().unwrap(); task_mut.join_handle.take() }; if let Some(mut join_handle) = join_handle { if log_all { - if tenant_id.is_none() { + if tenant_shard_id.is_none() { // there are quite few of these info!(name = task.name, kind = ?task_kind, "stopping global task"); } else { // warn to catch these in tests; there shouldn't be any - warn!(name = task.name, tenant_id = ?tenant_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over"); + warn!(name = task.name, tenant_shard_id = ?tenant_shard_id, timeline_id = ?timeline_id, kind = ?task_kind, "stopping left-over"); } } if tokio::time::timeout(std::time::Duration::from_secs(1), &mut join_handle) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 48f71d7747..a8e8b4cbfa 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -608,7 +608,7 @@ impl Tenant { task_mgr::spawn( &tokio::runtime::Handle::current(), TaskKind::Attach, - Some(tenant_shard_id.tenant_id), + Some(tenant_shard_id), None, "attach tenant", false, @@ -1917,7 +1917,7 @@ impl Tenant { // // this will additionally shutdown and await all timeline tasks. tracing::debug!("Waiting for tasks..."); - task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id.tenant_id), None).await; + task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), None).await; // Wait for any in-flight operations to complete self.gate.close().await; diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index b8d6d0a321..acd311ace6 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -463,7 +463,7 @@ impl DeleteTenantFlow { task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), TaskKind::TimelineDeletionWorker, - Some(tenant_shard_id.tenant_id), + Some(tenant_shard_id), None, "tenant_delete", false, @@ -550,7 +550,7 @@ impl DeleteTenantFlow { // we encounter an InProgress marker, yield the barrier it contains and wait on it. let barrier = { let mut locked = tenants.write().unwrap(); - let removed = locked.remove(&tenant.tenant_shard_id.tenant_id); + let removed = locked.remove(tenant.tenant_shard_id); // FIXME: we should not be modifying this from outside of mgr.rs. // This will go away when we simplify deletion (https://github.com/neondatabase/neon/issues/5080) diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 8466fe7fca..4d7bd4259f 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -98,33 +98,6 @@ pub(crate) enum TenantsMap { ShuttingDown(BTreeMap), } -/// Helper for mapping shard-unaware functions to a sharding-aware map -/// TODO(sharding): all users of this must be made shard-aware. -fn exactly_one_or_none<'a>( - map: &'a BTreeMap, - tenant_id: &TenantId, -) -> Option<(&'a TenantShardId, &'a TenantSlot)> { - let mut slots = map.range(TenantShardId::tenant_range(*tenant_id)); - - // Retrieve the first two slots in the range: if both are populated, we must panic because the caller - // needs a shard-naive view of the world in which only one slot can exist for a TenantId at a time. - let slot_a = slots.next(); - let slot_b = slots.next(); - match (slot_a, slot_b) { - (None, None) => None, - (Some(slot), None) => { - // Exactly one matching slot - Some(slot) - } - (Some(_slot_a), Some(_slot_b)) => { - // Multiple shards for this tenant: cannot handle this yet. - // TODO(sharding): callers of get() should be shard-aware. - todo!("Attaching multiple shards in teh same tenant to the same pageserver") - } - (None, Some(_)) => unreachable!(), - } -} - pub(crate) enum TenantsMapRemoveResult { Occupied(TenantSlot), Vacant, @@ -147,12 +120,11 @@ impl TenantsMap { /// Convenience function for typical usage, where we want to get a `Tenant` object, for /// working with attached tenants. If the TenantId is in the map but in Secondary state, /// None is returned. - pub(crate) fn get(&self, tenant_id: &TenantId) -> Option<&Arc> { + pub(crate) fn get(&self, tenant_shard_id: &TenantShardId) -> Option<&Arc> { match self { TenantsMap::Initializing => None, TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { - // TODO(sharding): callers of get() should be shard-aware. - exactly_one_or_none(m, tenant_id).and_then(|(_, slot)| slot.get_attached()) + m.get(tenant_shard_id).and_then(|slot| slot.get_attached()) } } } @@ -204,25 +176,19 @@ impl TenantsMap { /// /// The normal way to remove a tenant is using a SlotGuard, which will gracefully remove the guarded /// slot if the enclosed tenant is shutdown. - pub(crate) fn remove(&mut self, tenant_id: &TenantId) -> TenantsMapRemoveResult { + pub(crate) fn remove(&mut self, tenant_shard_id: TenantShardId) -> TenantsMapRemoveResult { use std::collections::btree_map::Entry; match self { TenantsMap::Initializing => TenantsMapRemoveResult::Vacant, - TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => { - let key = exactly_one_or_none(m, tenant_id).map(|(k, _)| *k); - match key { - Some(key) => match m.entry(key) { - Entry::Occupied(entry) => match entry.get() { - TenantSlot::InProgress(barrier) => { - TenantsMapRemoveResult::InProgress(barrier.clone()) - } - _ => TenantsMapRemoveResult::Occupied(entry.remove()), - }, - Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant, - }, - None => TenantsMapRemoveResult::Vacant, - } - } + TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => match m.entry(tenant_shard_id) { + Entry::Occupied(entry) => match entry.get() { + TenantSlot::InProgress(barrier) => { + TenantsMapRemoveResult::InProgress(barrier.clone()) + } + _ => TenantsMapRemoveResult::Occupied(entry.remove()), + }, + Entry::Vacant(_entry) => TenantsMapRemoveResult::Vacant, + }, } } @@ -822,14 +788,16 @@ pub(crate) async fn set_new_tenant_config( new_tenant_conf: TenantConfOpt, tenant_id: TenantId, ) -> Result<(), SetNewTenantConfigError> { + // Legacy API: does not support sharding + let tenant_shard_id = TenantShardId::unsharded(tenant_id); + info!("configuring tenant {tenant_id}"); - let tenant = get_tenant(tenant_id, true)?; + let tenant = get_tenant(tenant_shard_id, true)?; // This is a legacy API that only operates on attached tenants: the preferred // API to use is the location_config/ endpoint, which lets the caller provide // the full LocationConf. let location_conf = LocationConf::attached_single(new_tenant_conf, tenant.generation); - let tenant_shard_id = TenantShardId::unsharded(tenant_id); Tenant::persist_tenant_config(conf, &tenant_shard_id, &location_conf) .await @@ -1143,14 +1111,11 @@ pub(crate) enum GetTenantError { /// /// This method is cancel-safe. pub(crate) fn get_tenant( - tenant_id: TenantId, + tenant_shard_id: TenantShardId, active_only: bool, ) -> Result, GetTenantError> { let locked = TENANTS.read().unwrap(); - // TODO(sharding): make all callers of get_tenant shard-aware - let tenant_shard_id = TenantShardId::unsharded(tenant_id); - let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?; match peek_slot { @@ -1162,14 +1127,18 @@ pub(crate) fn get_tenant( TenantState::Active => Ok(Arc::clone(tenant)), _ => { if active_only { - Err(GetTenantError::NotActive(tenant_id)) + Err(GetTenantError::NotActive(tenant_shard_id.tenant_id)) } else { Ok(Arc::clone(tenant)) } } }, - Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_id)), - None | Some(TenantSlot::Secondary) => Err(GetTenantError::NotFound(tenant_id)), + Some(TenantSlot::InProgress(_)) => { + Err(GetTenantError::NotActive(tenant_shard_id.tenant_id)) + } + None | Some(TenantSlot::Secondary) => { + Err(GetTenantError::NotFound(tenant_shard_id.tenant_id)) + } } } @@ -1542,7 +1511,8 @@ pub(crate) enum TenantMapListError { /// /// Get list of tenants, for the mgmt API /// -pub(crate) async fn list_tenants() -> Result, TenantMapListError> { +pub(crate) async fn list_tenants() -> Result, TenantMapListError> +{ let tenants = TENANTS.read().unwrap(); let m = match &*tenants { TenantsMap::Initializing => return Err(TenantMapListError::Initializing), @@ -1550,12 +1520,10 @@ pub(crate) async fn list_tenants() -> Result, Tenan }; Ok(m.iter() .filter_map(|(id, tenant)| match tenant { - TenantSlot::Attached(tenant) => Some((id, tenant.current_state())), + TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())), TenantSlot::Secondary => None, TenantSlot::InProgress(_) => None, }) - // TODO(sharding): make callers of this function shard-aware - .map(|(k, v)| (k.tenant_id, v)) .collect()) } @@ -2089,21 +2057,19 @@ use { }; pub(crate) async fn immediate_gc( - tenant_id: TenantId, + tenant_shard_id: TenantShardId, timeline_id: TimelineId, gc_req: TimelineGcRequest, cancel: CancellationToken, ctx: &RequestContext, ) -> Result>, ApiError> { let guard = TENANTS.read().unwrap(); - let tenant = guard - .get(&tenant_id) - .map(Arc::clone) - .with_context(|| format!("tenant {tenant_id}")) - .map_err(|e| ApiError::NotFound(e.into()))?; - // TODO(sharding): make callers of this function shard-aware - let tenant_shard_id = TenantShardId::unsharded(tenant_id); + let tenant = guard + .get(&tenant_shard_id) + .map(Arc::clone) + .with_context(|| format!("tenant {tenant_shard_id}")) + .map_err(|e| ApiError::NotFound(e.into()))?; let gc_horizon = gc_req.gc_horizon.unwrap_or_else(|| tenant.get_gc_horizon()); // Use tenant's pitr setting @@ -2116,9 +2082,9 @@ pub(crate) async fn immediate_gc( task_mgr::spawn( &tokio::runtime::Handle::current(), TaskKind::GarbageCollector, - Some(tenant_id), + Some(tenant_shard_id), Some(timeline_id), - &format!("timeline_gc_handler garbage collection run for tenant {tenant_id} timeline {timeline_id}"), + &format!("timeline_gc_handler garbage collection run for tenant {tenant_shard_id} timeline {timeline_id}"), false, async move { fail::fail_point!("immediate_gc_task_pre"); diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 03600cf5ae..3765ff6e7a 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1223,7 +1223,7 @@ impl RemoteTimelineClient { task_mgr::spawn( &self.runtime, TaskKind::RemoteUploadTask, - Some(self.tenant_shard_id.tenant_id), + Some(self.tenant_shard_id), Some(self.timeline_id), "remote upload", false, diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index 126d4d5563..112128ead8 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -837,7 +837,7 @@ impl LayerInner { crate::task_mgr::spawn( &tokio::runtime::Handle::current(), crate::task_mgr::TaskKind::RemoteDownloadTask, - Some(self.desc.tenant_shard_id.tenant_id), + Some(self.desc.tenant_shard_id), Some(self.desc.timeline_id), &task_name, false, diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index bc404c41a0..dc23030218 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -87,13 +87,13 @@ pub fn start_background_loops( tenant: &Arc, background_jobs_can_start: Option<&completion::Barrier>, ) { - let tenant_id = tenant.tenant_shard_id.tenant_id; + let tenant_shard_id = tenant.tenant_shard_id; task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::Compaction, - Some(tenant_id), + Some(tenant_shard_id), None, - &format!("compactor for tenant {tenant_id}"), + &format!("compactor for tenant {tenant_shard_id}"), false, { let tenant = Arc::clone(tenant); @@ -105,7 +105,7 @@ pub fn start_background_loops( _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} }; compaction_loop(tenant, cancel) - .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) + .instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug())) .await; Ok(()) } @@ -114,9 +114,9 @@ pub fn start_background_loops( task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::GarbageCollector, - Some(tenant_id), + Some(tenant_shard_id), None, - &format!("garbage collector for tenant {tenant_id}"), + &format!("garbage collector for tenant {tenant_shard_id}"), false, { let tenant = Arc::clone(tenant); @@ -128,7 +128,7 @@ pub fn start_background_loops( _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} }; gc_loop(tenant, cancel) - .instrument(info_span!("gc_loop", tenant_id = %tenant_id)) + .instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug())) .await; Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 551b66b77d..f3907a6d2b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -77,7 +77,7 @@ use postgres_ffi::to_pg_timestamp; use utils::{ completion, generation::Generation, - id::{TenantId, TimelineId}, + id::TimelineId, lsn::{AtomicLsn, Lsn, RecordLsn}, seqwait::SeqWait, simple_rcu::{Rcu, RcuReadGuard}, @@ -926,7 +926,7 @@ impl Timeline { tracing::debug!("Waiting for WalReceiverManager..."); task_mgr::shutdown_tasks( Some(TaskKind::WalReceiverManager), - Some(self.tenant_shard_id.tenant_id), + Some(self.tenant_shard_id), Some(self.timeline_id), ) .await; @@ -977,7 +977,7 @@ impl Timeline { // Shut down the layer flush task before the remote client, as one depends on the other task_mgr::shutdown_tasks( Some(TaskKind::LayerFlushTask), - Some(self.tenant_shard_id.tenant_id), + Some(self.tenant_shard_id), Some(self.timeline_id), ) .await; @@ -995,12 +995,7 @@ impl Timeline { tracing::debug!("Waiting for tasks..."); - task_mgr::shutdown_tasks( - None, - Some(self.tenant_shard_id.tenant_id), - Some(self.timeline_id), - ) - .await; + task_mgr::shutdown_tasks(None, Some(self.tenant_shard_id), Some(self.timeline_id)).await; // Finally wait until any gate-holders are complete self.gate.close().await; @@ -1314,16 +1309,20 @@ impl Timeline { &self.conf.default_tenant_conf, ); - // TODO(sharding): make evictions state shard aware - // (https://github.com/neondatabase/neon/issues/5953) let tenant_id_str = self.tenant_shard_id.tenant_id.to_string(); + let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug()); let timeline_id_str = self.timeline_id.to_string(); self.metrics .evictions_with_low_residence_duration .write() .unwrap() - .change_threshold(&tenant_id_str, &timeline_id_str, new_threshold); + .change_threshold( + &tenant_id_str, + &shard_id_str, + &timeline_id_str, + new_threshold, + ); } } @@ -1395,7 +1394,7 @@ impl Timeline { ancestor_lsn: metadata.ancestor_lsn(), metrics: TimelineMetrics::new( - &tenant_shard_id.tenant_id, + &tenant_shard_id, &timeline_id, crate::metrics::EvictionsWithLowResidenceDurationBuilder::new( "mtime", @@ -1496,7 +1495,7 @@ impl Timeline { task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::LayerFlushTask, - Some(self.tenant_shard_id.tenant_id), + Some(self.tenant_shard_id), Some(self.timeline_id), "layer flush task", false, @@ -1847,7 +1846,7 @@ impl Timeline { task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::InitialLogicalSizeCalculation, - Some(self.tenant_shard_id.tenant_id), + Some(self.tenant_shard_id), Some(self.timeline_id), "initial size calculation", false, @@ -2020,7 +2019,7 @@ impl Timeline { task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::OndemandLogicalSizeCalculation, - Some(self.tenant_shard_id.tenant_id), + Some(self.tenant_shard_id), Some(self.timeline_id), "ondemand logical size calculation", false, @@ -2461,13 +2460,7 @@ impl Timeline { // FIXME: It's pointless to check the cache for things that are not 8kB pages. // We should look at the key to determine if it's a cacheable object let (lsn, read_guard) = cache - .lookup_materialized_page( - self.tenant_shard_id.tenant_id, - self.timeline_id, - key, - lsn, - ctx, - ) + .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx) .await?; let img = Bytes::from(read_guard.to_vec()); Some((lsn, img)) @@ -3209,7 +3202,7 @@ impl DurationRecorder { #[derive(Default)] struct CompactLevel0Phase1StatsBuilder { version: Option, - tenant_id: Option, + tenant_id: Option, timeline_id: Option, read_lock_acquisition_micros: DurationRecorder, read_lock_held_spawn_blocking_startup_micros: DurationRecorder, @@ -3226,7 +3219,7 @@ struct CompactLevel0Phase1StatsBuilder { #[derive(serde::Serialize)] struct CompactLevel0Phase1Stats { version: u64, - tenant_id: TenantId, + tenant_id: TenantShardId, timeline_id: TimelineId, read_lock_acquisition_micros: RecordedDuration, read_lock_held_spawn_blocking_startup_micros: RecordedDuration, @@ -3745,7 +3738,7 @@ impl Timeline { let ctx = ctx.attached_child(); let mut stats = CompactLevel0Phase1StatsBuilder { version: Some(2), - tenant_id: Some(self.tenant_shard_id.tenant_id), + tenant_id: Some(self.tenant_shard_id), timeline_id: Some(self.timeline_id), ..Default::default() }; @@ -4207,7 +4200,7 @@ impl Timeline { let cache = page_cache::get(); if let Err(e) = cache .memorize_materialized_page( - self.tenant_shard_id.tenant_id, + self.tenant_shard_id, self.timeline_id, key, last_rec_lsn, @@ -4251,7 +4244,7 @@ impl Timeline { let task_id = task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::DownloadAllRemoteLayers, - Some(self.tenant_shard_id.tenant_id), + Some(self.tenant_shard_id), Some(self.timeline_id), "download all remote layers task", false, diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 2a103a7ff4..be873181d9 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -43,7 +43,7 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> { // Shut down the layer flush task before the remote client, as one depends on the other task_mgr::shutdown_tasks( Some(TaskKind::LayerFlushTask), - Some(timeline.tenant_shard_id.tenant_id), + Some(timeline.tenant_shard_id), Some(timeline.timeline_id), ) .await; @@ -71,7 +71,7 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> { info!("waiting for timeline tasks to shutdown"); task_mgr::shutdown_tasks( None, - Some(timeline.tenant_shard_id.tenant_id), + Some(timeline.tenant_shard_id), Some(timeline.timeline_id), ) .await; @@ -528,7 +528,7 @@ impl DeleteTimelineFlow { task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), TaskKind::TimelineDeletionWorker, - Some(tenant_shard_id.tenant_id), + Some(tenant_shard_id), Some(timeline_id), "timeline_delete", false, diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 3fe4bc0f83..020c5a9e9f 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -60,7 +60,7 @@ impl Timeline { task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::Eviction, - Some(self.tenant_shard_id.tenant_id), + Some(self.tenant_shard_id), Some(self.timeline_id), &format!( "layer eviction for {}/{}", @@ -343,7 +343,7 @@ impl Timeline { // Make one of the tenant's timelines draw the short straw and run the calculation. // The others wait until the calculation is done so that they take into account the // imitated accesses that the winner made. - let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id.tenant_id, true) { + let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id, true) { Ok(t) => t, Err(_) => { return ControlFlow::Break(()); diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs index 04ff8602d6..e32265afb5 100644 --- a/pageserver/src/tenant/timeline/walreceiver.rs +++ b/pageserver/src/tenant/timeline/walreceiver.rs @@ -30,6 +30,7 @@ use crate::tenant::timeline::walreceiver::connection_manager::{ connection_manager_loop_step, ConnectionManagerState, }; +use pageserver_api::shard::TenantShardId; use std::future::Future; use std::num::NonZeroU64; use std::ops::ControlFlow; @@ -41,7 +42,7 @@ use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::*; -use utils::id::TenantTimelineId; +use utils::id::TimelineId; use self::connection_manager::ConnectionManagerStatus; @@ -60,7 +61,8 @@ pub struct WalReceiverConf { } pub struct WalReceiver { - timeline: TenantTimelineId, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, manager_status: Arc>>, } @@ -71,7 +73,7 @@ impl WalReceiver { mut broker_client: BrokerClientChannel, ctx: &RequestContext, ) -> Self { - let tenant_id = timeline.tenant_shard_id.tenant_id; + let tenant_shard_id = timeline.tenant_shard_id; let timeline_id = timeline.timeline_id; let walreceiver_ctx = ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error); @@ -81,9 +83,9 @@ impl WalReceiver { task_mgr::spawn( WALRECEIVER_RUNTIME.handle(), TaskKind::WalReceiverManager, - Some(tenant_id), + Some(timeline.tenant_shard_id), Some(timeline_id), - &format!("walreceiver for timeline {tenant_id}/{timeline_id}"), + &format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"), false, async move { debug_assert_current_span_has_tenant_and_timeline_id(); @@ -117,11 +119,12 @@ impl WalReceiver { *loop_status.write().unwrap() = None; Ok(()) } - .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_id, timeline_id = %timeline_id)) + .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), timeline_id = %timeline_id)) ); Self { - timeline: TenantTimelineId::new(tenant_id, timeline_id), + tenant_shard_id, + timeline_id, manager_status, } } @@ -129,8 +132,8 @@ impl WalReceiver { pub async fn stop(self) { task_mgr::shutdown_tasks( Some(TaskKind::WalReceiverManager), - Some(self.timeline.tenant_id), - Some(self.timeline.timeline_id), + Some(self.tenant_shard_id), + Some(self.timeline_id), ) .await; } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 3bcb7ff891..61ab236322 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -163,7 +163,7 @@ pub(super) async fn handle_walreceiver_connection( task_mgr::spawn( WALRECEIVER_RUNTIME.handle(), TaskKind::WalReceiverConnectionPoller, - Some(timeline.tenant_shard_id.tenant_id), + Some(timeline.tenant_shard_id), Some(timeline.timeline_id), "walreceiver connection", false,