From 1a8c8b04d70bd82a20055e2653c4aa593e3bfc34 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Wed, 7 Sep 2022 18:01:49 +0300 Subject: [PATCH] Merge Repository and Tenant entities, rework tenant background jobs --- control_plane/src/bin/neon_local.rs | 8 +- pageserver/src/basebackup.rs | 2 +- pageserver/src/bin/dump_layerfile.rs | 2 +- pageserver/src/bin/pageserver.rs | 2 +- pageserver/src/bin/update_metadata.rs | 2 +- pageserver/src/config.rs | 2 +- pageserver/src/http/models.rs | 5 +- pageserver/src/http/openapi_spec.yml | 4 +- pageserver/src/http/routes.rs | 123 ++++--- pageserver/src/import_datadir.rs | 2 +- pageserver/src/lib.rs | 4 +- pageserver/src/page_cache.rs | 2 +- pageserver/src/page_service.rs | 56 ++-- pageserver/src/pgdatadir_mapping.rs | 10 +- pageserver/src/storage_sync.rs | 34 +- pageserver/src/storage_sync/delete.rs | 6 +- pageserver/src/storage_sync/download.rs | 10 +- pageserver/src/storage_sync/index.rs | 8 +- pageserver/src/storage_sync/upload.rs | 12 +- .../src/{layered_repository.rs => tenant.rs} | 255 +++++++++----- .../{layered_repository => tenant}/blob_io.rs | 2 +- .../block_io.rs | 2 +- .../delta_layer.rs | 12 +- .../disk_btree.rs | 2 +- .../disk_btree_test_data.rs | 0 .../ephemeral_file.rs | 14 +- .../filename.rs | 0 .../image_layer.rs | 12 +- .../inmemory_layer.rs | 12 +- .../layer_map.rs | 6 +- .../metadata.rs | 4 +- .../par_fsync.rs | 0 .../storage_layer.rs | 0 .../timeline.rs | 4 +- pageserver/src/tenant_mgr.rs | 312 +++++++----------- pageserver/src/tenant_tasks.rs | 147 ++++++--- pageserver/src/timelines.rs | 31 +- pageserver/src/walingest.rs | 25 +- .../src/walreceiver/connection_manager.rs | 20 +- .../src/walreceiver/walreceiver_connection.rs | 7 +- test_runner/regress/test_broken_timeline.py | 4 +- test_runner/regress/test_tenant_tasks.py | 8 +- test_runner/regress/test_timeline_delete.py | 5 +- 43 files changed, 615 insertions(+), 563 deletions(-) rename pageserver/src/{layered_repository.rs => tenant.rs} (88%) rename pageserver/src/{layered_repository => tenant}/blob_io.rs (98%) rename pageserver/src/{layered_repository => tenant}/block_io.rs (98%) rename pageserver/src/{layered_repository => tenant}/delta_layer.rs (98%) rename pageserver/src/{layered_repository => tenant}/disk_btree.rs (99%) rename pageserver/src/{layered_repository => tenant}/disk_btree_test_data.rs (100%) rename pageserver/src/{layered_repository => tenant}/ephemeral_file.rs (97%) rename pageserver/src/{layered_repository => tenant}/filename.rs (100%) rename pageserver/src/{layered_repository => tenant}/image_layer.rs (97%) rename pageserver/src/{layered_repository => tenant}/inmemory_layer.rs (96%) rename pageserver/src/{layered_repository => tenant}/layer_map.rs (98%) rename pageserver/src/{layered_repository => tenant}/metadata.rs (98%) rename pageserver/src/{layered_repository => tenant}/par_fsync.rs (100%) rename pageserver/src/{layered_repository => tenant}/storage_layer.rs (100%) rename pageserver/src/{layered_repository => tenant}/timeline.rs (99%) diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 828d6a2e5a..e3160db53b 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -543,13 +543,7 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an match tenant_match.subcommand() { Some(("list", _)) => { for t in pageserver.tenant_list()? 
{ - println!( - "{} {}", - t.id, - t.state - .map(|s| s.to_string()) - .unwrap_or_else(|| String::from("")) - ); + println!("{} {:?}", t.id, t.state); } } Some(("create", create_match)) => { diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 61facc852d..eca6a3c87f 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -22,8 +22,8 @@ use std::time::SystemTime; use tar::{Builder, EntryType, Header}; use tracing::*; -use crate::layered_repository::Timeline; use crate::reltag::{RelTag, SlruKind}; +use crate::tenant::Timeline; use postgres_ffi::v14::pg_constants; use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName}; diff --git a/pageserver/src/bin/dump_layerfile.rs b/pageserver/src/bin/dump_layerfile.rs index 87390a1b06..7e766ce859 100644 --- a/pageserver/src/bin/dump_layerfile.rs +++ b/pageserver/src/bin/dump_layerfile.rs @@ -3,8 +3,8 @@ //! A handy tool for debugging, that's all. use anyhow::Result; use clap::{App, Arg}; -use pageserver::layered_repository::dump_layerfile_from_path; use pageserver::page_cache; +use pageserver::tenant::dump_layerfile_from_path; use pageserver::virtual_file; use std::path::PathBuf; use utils::project_git_version; diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index ec71e5b320..679c6f76e7 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -182,7 +182,7 @@ fn initialize_config( cfg_file_path.display() ); } else { - // We're initializing the repo, so there's no config file yet + // We're initializing the tenant, so there's no config file yet ( DEFAULT_CONFIG_FILE .parse::() diff --git a/pageserver/src/bin/update_metadata.rs b/pageserver/src/bin/update_metadata.rs index 983fdb8647..3339564b0f 100644 --- a/pageserver/src/bin/update_metadata.rs +++ b/pageserver/src/bin/update_metadata.rs @@ -3,7 +3,7 @@ //! A handy tool for debugging, that's all. use anyhow::Result; use clap::{App, Arg}; -use pageserver::layered_repository::metadata::TimelineMetadata; +use pageserver::tenant::metadata::TimelineMetadata; use std::path::PathBuf; use std::str::FromStr; use utils::{lsn::Lsn, project_git_version}; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index fb70ea327d..56171f46e3 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -19,7 +19,7 @@ use utils::{ zid::{NodeId, ZTenantId, ZTimelineId}, }; -use crate::layered_repository::TIMELINES_SEGMENT_NAME; +use crate::tenant::TIMELINES_SEGMENT_NAME; use crate::tenant_config::{TenantConf, TenantConfOpt}; pub mod defaults { diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 7c7d7f7b0c..0ccf23776c 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -7,8 +7,7 @@ use utils::{ zid::{NodeId, ZTenantId, ZTimelineId}, }; -// These enums are used in the API response fields. 
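The `state` field of `TenantInfo` stops being optional in the next hunk, and `neon_local` above now prints it with `{:?}`. For reference, a minimal sketch of how the new `TenantState` (declared later in this patch with a plain `derive(Serialize, Deserialize)`, i.e. serde's default externally tagged representation — an assumption worth noting, since no tag attribute is shown) would appear in API responses:

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TenantState {
    Active { background_jobs_running: bool },
    Paused,
    Broken,
}

fn main() {
    let active = TenantState::Active { background_jobs_running: true };
    // Externally tagged struct variant: {"Active":{"background_jobs_running":true}}
    println!("{}", serde_json::to_string(&active).unwrap());
    // Unit variants serialize as bare strings: "Paused"
    println!("{}", serde_json::to_string(&TenantState::Paused).unwrap());
}
```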
-use crate::tenant_mgr::TenantState; +use crate::tenant::TenantState; #[serde_as] #[derive(Serialize, Deserialize)] @@ -108,7 +107,7 @@ impl TenantConfigRequest { pub struct TenantInfo { #[serde_as(as = "DisplayFromStr")] pub id: ZTenantId, - pub state: Option<TenantState>, + pub state: TenantState, pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint pub has_in_progress_downloads: Option<bool>, } diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 6beb938d6a..b9a62d0f32 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -489,6 +489,7 @@ components: type: object required: - id + - state properties: id: type: string @@ -573,7 +574,6 @@ components: required: - last_record_lsn - disk_consistent_lsn - - timeline_state properties: last_record_lsn: type: string @@ -581,8 +581,6 @@ components: disk_consistent_lsn: type: string format: hex - timeline_state: - type: string ancestor_timeline_id: type: string format: hex diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 78f83511cb..36ba2e9b66 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -11,9 +11,9 @@ use super::models::{ StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo, TimelineCreateRequest, }; -use crate::layered_repository::Timeline; use crate::storage_sync; use crate::storage_sync::index::{RemoteIndex, RemoteTimeline}; +use crate::tenant::{TenantState, Timeline}; use crate::tenant_config::TenantConfOpt; use crate::{config::PageServerConf, tenant_mgr, timelines}; use utils::{ @@ -132,12 +131,11 @@ fn list_local_timelines( include_non_incremental_logical_size: bool, include_non_incremental_physical_size: bool, ) -> Result<Vec<(ZTimelineId, LocalTimelineInfo)>> { - let repo = tenant_mgr::get_repository_for_tenant(tenant_id) - .with_context(|| format!("Failed to get repo for tenant {tenant_id}"))?; - let repo_timelines = repo.list_timelines(); + let tenant = tenant_mgr::get_tenant(tenant_id, true)?; + let timelines = tenant.list_timelines(); - let mut local_timeline_info = Vec::with_capacity(repo_timelines.len()); - for (timeline_id, repository_timeline) in repo_timelines { + let mut local_timeline_info = Vec::with_capacity(timelines.len()); + for (timeline_id, repository_timeline) in timelines { local_timeline_info.push(( timeline_id, local_timeline_info_from_timeline( @@ -201,23 +200,31 @@ async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, query_param_present(&request, "include-non-incremental-physical-size"); check_permission(&request, Some(tenant_id))?; - let local_timeline_infos = tokio::task::spawn_blocking(move || { + let timelines = tokio::task::spawn_blocking(move || { let _enter = info_span!("timeline_list", tenant = %tenant_id).entered(); - list_local_timelines( - tenant_id, - include_non_incremental_logical_size, - include_non_incremental_physical_size, - ) + Ok::<_, anyhow::Error>(tenant_mgr::get_tenant(tenant_id, true)?.list_timelines()) }) .await .map_err(ApiError::from_err)??; - let mut response_data = Vec::with_capacity(local_timeline_infos.len()); - for (timeline_id, local_timeline_info) in local_timeline_infos { + let mut response_data = Vec::with_capacity(timelines.len()); + for (timeline_id, timeline) in timelines { + let local = match local_timeline_info_from_timeline( + &timeline, + include_non_incremental_logical_size, + include_non_incremental_physical_size, + ) { + Ok(local) => Some(local), + Err(e) => { + error!("Failed to convert
tenant timeline {timeline_id} into the local one: {e:?}"); + None + } + }; + response_data.push(TimelineInfo { tenant_id, timeline_id, - local: Some(local_timeline_info), + local, remote: get_state(&request) .remote_index .read() @@ -259,28 +266,25 @@ async fn timeline_detail_handler(request: Request) -> Result(local_timeline) + let timeline = tokio::task::spawn_blocking(move || { + tenant_mgr::get_tenant(tenant_id, true)?.get_timeline(timeline_id) }) .await - .ok() - .and_then(|r| r.ok()) - .flatten(); + .map_err(ApiError::from_err)?; + + let local_timeline_info = match timeline.and_then(|timeline| { + local_timeline_info_from_timeline( + &timeline, + include_non_incremental_logical_size, + include_non_incremental_physical_size, + ) + }) { + Ok(local_info) => Some(local_info), + Err(e) => { + error!("Failed to get local timeline info: {e:#}"); + None + } + }; let remote_timeline_info = { let remote_index_read = get_state(&request).remote_index.read().await; @@ -294,25 +298,26 @@ async fn timeline_detail_handler(request: Request) -> Result((local_timeline_info, remote_timeline_info)) } .instrument(info_span!("timeline_detail", tenant = %tenant_id, timeline = %timeline_id)) - .await; + .await?; if local_timeline_info.is_none() && remote_timeline_info.is_none() { - return Err(ApiError::NotFound(format!( + Err(ApiError::NotFound(format!( "Timeline {tenant_id}/{timeline_id} is not found neither locally nor remotely" - ))); + ))) + } else { + json_response( + StatusCode::OK, + TimelineInfo { + tenant_id, + timeline_id, + local: local_timeline_info, + remote: remote_timeline_info, + }, + ) } - - let timeline_info = TimelineInfo { - tenant_id, - timeline_id, - local: local_timeline_info, - remote: remote_timeline_info, - }; - - json_response(StatusCode::OK, timeline_info) } // TODO makes sense to provide tenant config right away the same way as it handled in tenant_create @@ -320,10 +325,10 @@ async fn tenant_attach_handler(request: Request) -> Result, let tenant_id: ZTenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - info!("Handling tenant attach {}", tenant_id); + info!("Handling tenant attach {tenant_id}"); tokio::task::spawn_blocking(move || { - if tenant_mgr::get_tenant_state(tenant_id).is_some() { + if tenant_mgr::get_tenant(tenant_id, false).is_ok() { anyhow::bail!("Tenant is already present locally") }; Ok(()) @@ -426,7 +431,7 @@ async fn timeline_delete_handler(request: Request) -> Result) -> Result, ApiErro check_permission(&request, Some(tenant_id))?; // if tenant is in progress of downloading it can be absent in global tenant map - let tenant_state = tokio::task::spawn_blocking(move || tenant_mgr::get_tenant_state(tenant_id)) + let tenant = tokio::task::spawn_blocking(move || tenant_mgr::get_tenant(tenant_id, false)) .await .map_err(ApiError::from_err)?; @@ -494,13 +499,25 @@ async fn tenant_status(request: Request) -> Result, ApiErro false }); + let tenant_state = match tenant { + Ok(tenant) => tenant.current_state(), + Err(e) => { + error!("Failed to get local tenant state: {e:#}"); + if has_in_progress_downloads { + TenantState::Paused + } else { + TenantState::Broken + } + } + }; + let current_physical_size = match tokio::task::spawn_blocking(move || list_local_timelines(tenant_id, false, false)) .await .map_err(ApiError::from_err)? { Err(err) => { - // Getting local timelines can fail when no local repo is on disk (e.g, when tenant data is being downloaded). 
+ // Getting local timelines can fail when no local tenant directory is on disk (e.g, when tenant data is being downloaded). // In that case, put a warning message into log and operate normally. warn!("Failed to get local timelines for tenant {tenant_id}: {err}"); None diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index f8f614f8f4..ee0780f4b2 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -11,9 +11,9 @@ use bytes::Bytes; use tracing::*; use walkdir::WalkDir; -use crate::layered_repository::Timeline; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; +use crate::tenant::Timeline; use crate::walingest::WalIngest; use crate::walrecord::DecodedWALRecord; use postgres_ffi::v14::relfile_utils::*; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 8b9251229e..5742568079 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -3,7 +3,6 @@ pub mod config; pub mod http; pub mod import_datadir; pub mod keyspace; -pub mod layered_repository; pub mod metrics; pub mod page_cache; pub mod page_service; @@ -13,6 +12,7 @@ pub mod reltag; pub mod repository; pub mod storage_sync; pub mod task_mgr; +pub mod tenant; pub mod tenant_config; pub mod tenant_mgr; pub mod tenant_tasks; @@ -181,7 +181,7 @@ mod backoff_defaults_tests { #[cfg(test)] mod tests { - use crate::layered_repository::repo_harness::TIMELINE_ID; + use crate::tenant::harness::TIMELINE_ID; use super::*; diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 27b1400243..15c3c22dd6 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -53,8 +53,8 @@ use utils::{ zid::{ZTenantId, ZTimelineId}, }; -use crate::layered_repository::writeback_ephemeral_file; use crate::repository::Key; +use crate::tenant::writeback_ephemeral_file; static PAGE_CACHE: OnceCell = OnceCell::new(); const TEST_PAGE_CACHE_SIZE: usize = 50; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 149144bfe4..b03dab20e0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -34,13 +34,13 @@ use utils::{ use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; use crate::import_datadir::{import_basebackup_from_tar, import_wal_from_tar}; -use crate::layered_repository::Timeline; use crate::metrics::{LIVE_CONNECTIONS_COUNT, SMGR_QUERY_TIME}; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::profiling::profpoint_start; use crate::reltag::RelTag; use crate::task_mgr; use crate::task_mgr::TaskKind; +use crate::tenant::Timeline; use crate::tenant_mgr; use crate::CheckpointConfig; use postgres_ffi::v14::xlog_utils::to_pg_timestamp; @@ -477,8 +477,8 @@ impl PageServerHandler { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Create empty timeline info!("creating new timeline"); - let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; - let timeline = repo.create_empty_timeline(timeline_id, base_lsn)?; + let timeline = tenant_mgr::get_tenant(tenant_id, true)? + .create_empty_timeline(timeline_id, base_lsn)?; // TODO mark timeline as not ready until it reaches end_lsn. 
// We might have some wal to import as well, and we should prevent compute @@ -539,10 +539,7 @@ impl PageServerHandler { ) -> anyhow::Result<()> { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); - let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; - let timeline = repo - .get_timeline(timeline_id) - .with_context(|| format!("Timeline {timeline_id} was not found"))?; + let timeline = get_local_timeline(tenant_id, timeline_id)?; ensure!(timeline.get_last_record_lsn() == start_lsn); // TODO leave clean state on error. For now you can use detach to clean @@ -770,7 +767,7 @@ impl PageServerHandler { // when accessing management api supply None as an argument // when using to authorize tenant pass corresponding tenant id - fn check_permission(&self, tenantid: Option) -> Result<()> { + fn check_permission(&self, tenant_id: Option) -> Result<()> { if self.auth.is_none() { // auth is set to Trust, nothing to check so just return ok return Ok(()); @@ -782,7 +779,7 @@ impl PageServerHandler { .claims .as_ref() .expect("claims presence already checked"); - auth::check_permission(claims, tenantid) + auth::check_permission(claims, tenant_id) } } @@ -809,7 +806,7 @@ impl postgres_backend_async::Handler for PageServerHandler { } info!( - "jwt auth succeeded for scope: {:#?} by tenantid: {:?}", + "jwt auth succeeded for scope: {:#?} by tenant id: {:?}", data.claims.scope, data.claims.tenant_id, ); @@ -1013,8 +1010,8 @@ impl postgres_backend_async::Handler for PageServerHandler { let (_, params_raw) = query_string.split_at("show ".len()); let params = params_raw.split(' ').collect::>(); ensure!(params.len() == 1, "invalid param number for config command"); - let tenantid = ZTenantId::from_str(params[0])?; - let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; + let tenant_id = ZTenantId::from_str(params[0])?; + let tenant = tenant_mgr::get_tenant(tenant_id, true)?; pgb.write_message(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"checkpoint_distance"), RowDescriptor::int8_col(b"checkpoint_timeout"), @@ -1027,25 +1024,27 @@ impl postgres_backend_async::Handler for PageServerHandler { RowDescriptor::int8_col(b"pitr_interval"), ]))? .write_message(&BeMessage::DataRow(&[ - Some(repo.get_checkpoint_distance().to_string().as_bytes()), + Some(tenant.get_checkpoint_distance().to_string().as_bytes()), Some( - repo.get_checkpoint_timeout() + tenant + .get_checkpoint_timeout() .as_secs() .to_string() .as_bytes(), ), - Some(repo.get_compaction_target_size().to_string().as_bytes()), + Some(tenant.get_compaction_target_size().to_string().as_bytes()), Some( - repo.get_compaction_period() + tenant + .get_compaction_period() .as_secs() .to_string() .as_bytes(), ), - Some(repo.get_compaction_threshold().to_string().as_bytes()), - Some(repo.get_gc_horizon().to_string().as_bytes()), - Some(repo.get_gc_period().as_secs().to_string().as_bytes()), - Some(repo.get_image_creation_threshold().to_string().as_bytes()), - Some(repo.get_pitr_interval().as_secs().to_string().as_bytes()), + Some(tenant.get_compaction_threshold().to_string().as_bytes()), + Some(tenant.get_gc_horizon().to_string().as_bytes()), + Some(tenant.get_gc_period().as_secs().to_string().as_bytes()), + Some(tenant.get_image_creation_threshold().to_string().as_bytes()), + Some(tenant.get_pitr_interval().as_secs().to_string().as_bytes()), ]))? 
.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else if query_string.starts_with("do_gc ") { @@ -1066,16 +1065,16 @@ impl postgres_backend_async::Handler for PageServerHandler { let tenant_id = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; let timeline_id = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; - let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?; + let tenant = tenant_mgr::get_tenant(tenant_id, true)?; let gc_horizon: u64 = caps .get(4) .map(|h| h.as_str().parse()) - .unwrap_or_else(|| Ok(repo.get_gc_horizon()))?; + .unwrap_or_else(|| Ok(tenant.get_gc_horizon()))?; // Use tenant's pitr setting - let pitr = repo.get_pitr_interval(); - let result = repo.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)?; + let pitr = tenant.get_pitr_interval(); + let result = tenant.gc_iteration(Some(timeline_id), gc_horizon, pitr, true)?; pgb.write_message(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"layers_total"), RowDescriptor::int8_col(b"layers_needed_by_cutoff"), @@ -1169,12 +1168,7 @@ impl postgres_backend_async::Handler for PageServerHandler { } fn get_local_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> Result> { - tenant_mgr::get_repository_for_tenant(tenant_id) - .and_then(|repo| { - repo.get_timeline(timeline_id) - .context("No timeline in tenant's repository") - }) - .with_context(|| format!("Could not get timeline {timeline_id} in tenant {tenant_id}")) + tenant_mgr::get_tenant(tenant_id, true).and_then(|tenant| tenant.get_timeline(timeline_id)) } /// diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index ba48a77961..2454b6f54f 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -7,9 +7,9 @@ //! Clarify that) //! use crate::keyspace::{KeySpace, KeySpaceAccum}; -use crate::layered_repository::Timeline; use crate::reltag::{RelTag, SlruKind}; use crate::repository::*; +use crate::tenant::Timeline; use crate::walrecord::ZenithWalRecord; use anyhow::{bail, ensure, Result}; use bytes::{Buf, Bytes}; @@ -1398,16 +1398,12 @@ fn is_slru_block_key(key: Key) -> bool { && key.field6 != 0xffffffff // and not SlruSegSize } -// -//-- Tests that should work the same with any Repository/Timeline implementation. -// - #[cfg(test)] pub fn create_test_timeline( - repo: &crate::layered_repository::Repository, + tenant: &crate::tenant::Tenant, timeline_id: utils::zid::ZTimelineId, ) -> Result> { - let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?; + let tline = tenant.create_empty_timeline(timeline_id, Lsn(8))?; let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; m.commit()?; diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 8ebfa6a935..c104dba298 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -46,10 +46,10 @@ //! Some time later, during pageserver checkpoints, in-memory data is flushed onto disk along with its metadata. //! If the storage sync loop was successfully started before, pageserver schedules the layer files and the updated metadata file for upload, every time a layer is flushed to disk. //! The uploads are disabled, if no remote storage configuration is provided (no sync loop is started this way either). -//! See [`crate::layered_repository`] for the upload calls and the adjacent logic. +//! See [`crate::tenant`] for the upload calls and the adjacent logic. //! -//! 
Synchronization logic is able to communicate back with updated timeline sync states, [`crate::repository::TimelineSyncStatusUpdate`], -//! submitted via [`crate::tenant_mgr::apply_timeline_sync_status_updates`] function. Tenant manager applies corresponding timeline updates in pageserver's in-memory state. +//! Synchronization logic is able to communicate back with updated timeline sync states, submitted via [`crate::tenant_mgr::attach_local_tenants`] function. +//! Tenant manager applies corresponding timeline updates in pageserver's in-memory state. //! Such submissions happen in two cases: //! * once after the sync loop startup, to signal pageserver which timelines will be synchronized in the near future //! * after every loop step, in case a timeline needs to be reloaded or evicted from pageserver's memory @@ -171,11 +171,11 @@ use self::{ use crate::{ config::PageServerConf, exponential_backoff, - layered_repository::metadata::{metadata_path, TimelineMetadata}, storage_sync::index::RemoteIndex, task_mgr, task_mgr::TaskKind, task_mgr::BACKGROUND_RUNTIME, + tenant::metadata::{metadata_path, TimelineMetadata}, tenant_mgr::attach_local_tenants, }; use crate::{ @@ -714,17 +714,17 @@ async fn storage_sync_loop( }; if tenant_entry.has_in_progress_downloads() { - info!("Tenant {tenant_id} has pending timeline downloads, skipping repository registration"); + info!("Tenant {tenant_id} has pending timeline downloads, skipping tenant registration"); continue; } else { info!( - "Tenant {tenant_id} download completed. Picking to register in repository" + "Tenant {tenant_id} download completed. Picking to register in tenant" ); // Here we assume that if tenant has no in-progress downloads that // means that it is the last completed timeline download that triggered // sync status update. So we look at the index for available timelines - // and register them all at once in a repository for download - // to be submitted in a single operation to repository + // and register them all at once in a tenant for download + // to be submitted in a single operation to tenant // so it can apply them at once to internal timeline map. timelines_to_attach.0.insert( tenant_id, @@ -737,9 +737,7 @@ async fn storage_sync_loop( } drop(index_accessor); // Batch timeline download registration to ensure that the external registration code won't block any running tasks before. 
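The doc comment above describes batched registration: once the last in-progress download for a tenant finishes, every available timeline of that tenant is submitted to `attach_local_tenants` in one call, so the tenant manager can update its in-memory timeline map in a single operation. A small illustrative helper (hypothetical, not part of the patch) showing the per-tenant grouping the sync loop performs from the remote index:

```rust
use std::collections::{HashMap, HashSet};
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};

/// Group fully downloaded timelines by tenant so each tenant can be
/// attached in one batch rather than once per timeline.
fn group_downloads_by_tenant(
    completed: impl IntoIterator<Item = ZTenantTimelineId>,
) -> HashMap<ZTenantId, HashSet<ZTimelineId>> {
    let mut grouped: HashMap<ZTenantId, HashSet<ZTimelineId>> = HashMap::new();
    for id in completed {
        grouped.entry(id.tenant_id).or_default().insert(id.timeline_id);
    }
    grouped
}
```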
- if let Err(e) = attach_local_tenants(conf, &index, timelines_to_attach) { - error!("Failed to attach new timelines: {e:?}"); - }; + attach_local_tenants(conf, &index, timelines_to_attach); } } ControlFlow::Break(()) => { @@ -1038,13 +1036,7 @@ async fn update_local_metadata( timeline_id, } = sync_id; tokio::task::spawn_blocking(move || { - crate::layered_repository::save_metadata( - conf, - timeline_id, - tenant_id, - &cloned_metadata, - true, - ) + crate::tenant::save_metadata(conf, timeline_id, tenant_id, &cloned_metadata, true) }) .await .with_context(|| { @@ -1411,12 +1403,12 @@ fn register_sync_status( mod test_utils { use utils::lsn::Lsn; - use crate::layered_repository::repo_harness::RepoHarness; + use crate::tenant::harness::TenantHarness; use super::*; pub(super) async fn create_local_timeline( - harness: &RepoHarness<'_>, + harness: &TenantHarness<'_>, timeline_id: ZTimelineId, filenames: &[&str], metadata: TimelineMetadata, @@ -1456,7 +1448,7 @@ mod test_utils { #[cfg(test)] mod tests { use super::test_utils::dummy_metadata; - use crate::layered_repository::repo_harness::TIMELINE_ID; + use crate::tenant::harness::TIMELINE_ID; use hex_literal::hex; use utils::lsn::Lsn; diff --git a/pageserver/src/storage_sync/delete.rs b/pageserver/src/storage_sync/delete.rs index 794ecbaeb3..945f5fded8 100644 --- a/pageserver/src/storage_sync/delete.rs +++ b/pageserver/src/storage_sync/delete.rs @@ -112,8 +112,8 @@ mod tests { use utils::lsn::Lsn; use crate::{ - layered_repository::repo_harness::{RepoHarness, TIMELINE_ID}, storage_sync::test_utils::{create_local_timeline, dummy_metadata}, + tenant::harness::{TenantHarness, TIMELINE_ID}, }; use remote_storage::{LocalFs, RemoteStorage}; @@ -121,7 +121,7 @@ mod tests { #[tokio::test] async fn delete_timeline_negative() -> anyhow::Result<()> { - let harness = RepoHarness::create("delete_timeline_negative")?; + let harness = TenantHarness::create("delete_timeline_negative")?; let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let storage = GenericRemoteStorage::new(LocalFs::new( @@ -154,7 +154,7 @@ mod tests { #[tokio::test] async fn delete_timeline() -> anyhow::Result<()> { - let harness = RepoHarness::create("delete_timeline")?; + let harness = TenantHarness::create("delete_timeline")?; let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs index 91ee557b79..32f228b447 100644 --- a/pageserver/src/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -17,7 +17,7 @@ use tokio::{ use tracing::{debug, error, info, warn}; use crate::{ - config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask, + config::PageServerConf, storage_sync::SyncTask, tenant::metadata::metadata_path, TEMP_FILE_SUFFIX, }; use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; @@ -425,18 +425,18 @@ mod tests { use utils::lsn::Lsn; use crate::{ - layered_repository::repo_harness::{RepoHarness, TIMELINE_ID}, storage_sync::{ index::RelativePath, test_utils::{create_local_timeline, dummy_metadata}, }, + tenant::harness::{TenantHarness, TIMELINE_ID}, }; use super::*; #[tokio::test] async fn download_timeline() -> anyhow::Result<()> { - let harness = RepoHarness::create("download_timeline")?; + let harness = TenantHarness::create("download_timeline")?; let sync_queue = 
SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); @@ -537,7 +537,7 @@ mod tests { #[tokio::test] async fn download_timeline_negatives() -> anyhow::Result<()> { - let harness = RepoHarness::create("download_timeline_negatives")?; + let harness = TenantHarness::create("download_timeline_negatives")?; let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let storage = GenericRemoteStorage::new(LocalFs::new( @@ -596,7 +596,7 @@ mod tests { #[tokio::test] async fn test_download_index_part() -> anyhow::Result<()> { - let harness = RepoHarness::create("test_download_index_part")?; + let harness = TenantHarness::create("test_download_index_part")?; let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let storage = GenericRemoteStorage::new(LocalFs::new( diff --git a/pageserver/src/storage_sync/index.rs b/pageserver/src/storage_sync/index.rs index b17bb40da4..cff14cde49 100644 --- a/pageserver/src/storage_sync/index.rs +++ b/pageserver/src/storage_sync/index.rs @@ -15,7 +15,7 @@ use serde_with::{serde_as, DisplayFromStr}; use tokio::sync::RwLock; use tracing::log::warn; -use crate::{config::PageServerConf, layered_repository::metadata::TimelineMetadata}; +use crate::{config::PageServerConf, tenant::metadata::TimelineMetadata}; use utils::{ lsn::Lsn, zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}, @@ -340,11 +340,11 @@ mod tests { use std::collections::BTreeSet; use super::*; - use crate::layered_repository::repo_harness::{RepoHarness, TIMELINE_ID}; + use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; #[test] fn index_part_conversion() { - let harness = RepoHarness::create("index_part_conversion").unwrap(); + let harness = TenantHarness::create("index_part_conversion").unwrap(); let timeline_path = harness.timeline_path(&TIMELINE_ID); let metadata = TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1)); @@ -462,7 +462,7 @@ mod tests { #[test] fn index_part_conversion_negatives() { - let harness = RepoHarness::create("index_part_conversion_negatives").unwrap(); + let harness = TenantHarness::create("index_part_conversion_negatives").unwrap(); let timeline_path = harness.timeline_path(&TIMELINE_ID); let metadata = TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1)); diff --git a/pageserver/src/storage_sync/upload.rs b/pageserver/src/storage_sync/upload.rs index a4285e426b..bd09e6b898 100644 --- a/pageserver/src/storage_sync/upload.rs +++ b/pageserver/src/storage_sync/upload.rs @@ -15,9 +15,7 @@ use super::{ LayersUpload, SyncData, SyncQueue, }; use crate::metrics::NO_LAYERS_UPLOAD; -use crate::{ - config::PageServerConf, layered_repository::metadata::metadata_path, storage_sync::SyncTask, -}; +use crate::{config::PageServerConf, storage_sync::SyncTask, tenant::metadata::metadata_path}; /// Serializes and uploads the given index part data to the remote storage. 
pub(super) async fn upload_index_part( @@ -202,18 +200,18 @@ mod tests { use utils::lsn::Lsn; use crate::{ - layered_repository::repo_harness::{RepoHarness, TIMELINE_ID}, storage_sync::{ index::RelativePath, test_utils::{create_local_timeline, dummy_metadata}, }, + tenant::harness::{TenantHarness, TIMELINE_ID}, }; use super::{upload_index_part, *}; #[tokio::test] async fn regular_layer_upload() -> anyhow::Result<()> { - let harness = RepoHarness::create("regular_layer_upload")?; + let harness = TenantHarness::create("regular_layer_upload")?; let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); @@ -301,7 +299,7 @@ mod tests { // Currently, GC can run between upload retries, removing local layers scheduled for upload. Test this scenario. #[tokio::test] async fn layer_upload_after_local_fs_update() -> anyhow::Result<()> { - let harness = RepoHarness::create("layer_upload_after_local_fs_update")?; + let harness = TenantHarness::create("layer_upload_after_local_fs_update")?; let sync_queue = SyncQueue::new(NonZeroUsize::new(100).unwrap()); let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); @@ -396,7 +394,7 @@ mod tests { #[tokio::test] async fn test_upload_index_part() -> anyhow::Result<()> { - let harness = RepoHarness::create("test_upload_index_part")?; + let harness = TenantHarness::create("test_upload_index_part")?; let sync_id = ZTenantTimelineId::new(harness.tenant_id, TIMELINE_ID); let storage = GenericRemoteStorage::new(LocalFs::new( diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/tenant.rs similarity index 88% rename from pageserver/src/layered_repository.rs rename to pageserver/src/tenant.rs index ecc0bfe3b5..4ef810faba 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/tenant.rs @@ -1,6 +1,6 @@ //! //! Timeline repository implementation that keeps old data in files on disk, and -//! the recent changes in memory. See layered_repository/*_layer.rs files. +//! the recent changes in memory. See tenant/*_layer.rs files. //! The functions here are responsible for locating the correct layer for the //! get/put call, walking back the timeline branching history as needed. //! @@ -12,6 +12,7 @@ //! use anyhow::{bail, ensure, Context, Result}; +use tokio::sync::watch; use tracing::*; use std::cmp::min; @@ -71,24 +72,26 @@ use storage_layer::Layer; pub use timeline::Timeline; // re-export this function so that page_cache.rs can use it. -pub use crate::layered_repository::ephemeral_file::writeback as writeback_ephemeral_file; +pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file; // re-export for use in storage_sync.rs -pub use crate::layered_repository::metadata::save_metadata; +pub use crate::tenant::metadata::save_metadata; // re-export for use in walreceiver -pub use crate::layered_repository::timeline::WalReceiverInfo; +pub use crate::tenant::timeline::WalReceiverInfo; /// Parts of the `.neon/tenants/<tenant_id>/timelines/<timeline_id>` directory prefix. pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; /// -/// Repository consists of multiple timelines. Keep them in a hash table. +/// Tenant consists of multiple timelines. Keep them in a hash table. /// -pub struct Repository { +pub struct Tenant { // Global pageserver config parameters pub conf: &'static PageServerConf, + state: watch::Sender<TenantState>, + // Overridden tenant-specific config parameters. // We keep TenantConfOpt struct here to preserve the information // about parameters that are not set.
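The comment above on `tenant_conf` is the key to per-tenant configuration: each field of `TenantConfOpt` is an `Option`, so a tenant only overrides what was explicitly set, and everything else falls through to the global defaults (the `get_checkpoint_distance`-style getters later in this file do exactly that with `unwrap_or`). A minimal sketch of the pattern, with field and type names abbreviated rather than the full struct:

```rust
/// Per-tenant overrides: None means "use the global default".
#[derive(Clone, Copy, Default)]
struct TenantConfOpt {
    checkpoint_distance: Option<u64>,
}

struct GlobalDefaults {
    checkpoint_distance: u64,
}

impl TenantConfOpt {
    fn checkpoint_distance(&self, defaults: &GlobalDefaults) -> u64 {
        // Unset parameters are preserved as None, not overwritten,
        // so changing the global default later still takes effect.
        self.checkpoint_distance
            .unwrap_or(defaults.checkpoint_distance)
    }
}
```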
@@ -114,17 +117,40 @@ pub struct Repository { upload_layers: bool, } +/// A state of a tenant in pageserver's memory. +#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +pub enum TenantState { + /// Tenant is fully operational; its background jobs might be running or not. + Active { background_jobs_running: bool }, + /// A tenant is recognized by pageserver, but not yet ready to operate: + /// e.g. not present locally and being downloaded or being read into memory from the file system. + Paused, + /// A tenant is recognized by the pageserver, but no longer used for any operations, as it failed to get activated. + Broken, +} + /// A repository corresponds to one .neon directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. -impl Repository { +impl Tenant { /// Get Timeline handle for given zenith timeline ID. /// This function is idempotent. It doesn't change internal state in any way. - pub fn get_timeline(&self, timeline_id: ZTimelineId) -> Option<Arc<Timeline>> { - self.timelines.lock().unwrap().get(&timeline_id).cloned() + pub fn get_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<Arc<Timeline>> { + self.timelines + .lock() + .unwrap() + .get(&timeline_id) + .with_context(|| { + format!( + "Timeline {} was not found for tenant {}", + timeline_id, + self.tenant_id() + ) + }) + .map(Arc::clone) } - /// Lists timelines the repository contains. - /// Up to repository's implementation to omit certain timelines that ar not considered ready for use. + /// Lists timelines the tenant contains. + /// Up to the tenant's implementation to omit certain timelines that are not considered ready for use. pub fn list_timelines(&self) -> Vec<(ZTimelineId, Arc<Timeline>)> { self.timelines .lock() @@ -425,6 +451,54 @@ impl Repository { pub fn get_remote_index(&self) -> &RemoteIndex { &self.remote_index } + + pub fn current_state(&self) -> TenantState { + *self.state.borrow() + } + + pub fn is_active(&self) -> bool { + matches!(self.current_state(), TenantState::Active { .. }) + } + + pub fn should_run_tasks(&self) -> bool { + matches!( + self.current_state(), + TenantState::Active { + background_jobs_running: true + } + ) + } + + /// Changes tenant status to active, if it was not broken before. + /// Otherwise, ignores the state change, logging an error. + pub fn activate(&self, enable_background_jobs: bool) { + self.set_state(TenantState::Active { + background_jobs_running: enable_background_jobs, + }); + } + + pub fn set_state(&self, new_state: TenantState) { + match (self.current_state(), new_state) { + (equal_state_1, equal_state_2) if equal_state_1 == equal_state_2 => { + debug!("Ignoring new state, equal to the existing one: {equal_state_2:?}"); + } + (TenantState::Broken, _) => { + error!("Ignoring state update {new_state:?} for broken tenant"); + } + (_, new_state) => { + self.state.send_replace(new_state); + if self.should_run_tasks() { + // Spawn gc and compaction loops. The loops will shut themselves + // down when they notice that the tenant is inactive.
+ crate::tenant_tasks::start_background_loops(self.tenant_id); + } + } + } + } + + pub fn subscribe_for_state_updates(&self) -> watch::Receiver { + self.state.subscribe() + } } /// Given a Vec of timelines and their ancestors (timeline_id, ancestor_id), @@ -471,7 +545,7 @@ fn tree_sort_timelines( } /// Private functions -impl Repository { +impl Tenant { pub fn get_checkpoint_distance(&self) -> u64 { let tenant_conf = self.tenant_conf.read().unwrap(); tenant_conf @@ -609,8 +683,9 @@ impl Repository { tenant_id: ZTenantId, remote_index: RemoteIndex, upload_layers: bool, - ) -> Repository { - Repository { + ) -> Tenant { + let (state, _) = watch::channel(TenantState::Paused); + Tenant { tenant_id, conf, tenant_conf: Arc::new(RwLock::new(tenant_conf)), @@ -619,6 +694,7 @@ impl Repository { walredo_mgr, remote_index, upload_layers, + state, } } @@ -848,7 +924,7 @@ impl Repository { // compaction (both require `layer_removal_cs` lock), // but the GC iteration can run concurrently with branch creation. // - // See comments in [`Repository::branch_timeline`] for more information + // See comments in [`Tenant::branch_timeline`] for more information // about why branch creation task can run concurrently with timeline's GC iteration. for timeline in gc_timelines { if task_mgr::is_shutdown_requested() { @@ -881,7 +957,7 @@ impl Repository { } } -impl Drop for Repository { +impl Drop for Tenant { fn drop(&mut self) { remove_tenant_metrics(&self.tenant_id); } @@ -910,7 +986,7 @@ pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> { } #[cfg(test)] -pub mod repo_harness { +pub mod harness { use bytes::{Bytes, BytesMut}; use once_cell::sync::Lazy; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -920,8 +996,8 @@ pub mod repo_harness { use crate::storage_sync::index::RemoteIndex; use crate::{ config::PageServerConf, - layered_repository::Repository, repository::Key, + tenant::Tenant, walrecord::ZenithWalRecord, walredo::{WalRedoError, WalRedoManager}, }; @@ -968,7 +1044,7 @@ pub mod repo_harness { } } - pub struct RepoHarness<'a> { + pub struct TenantHarness<'a> { pub conf: &'static PageServerConf, pub tenant_conf: TenantConf, pub tenant_id: ZTenantId, @@ -979,7 +1055,7 @@ pub mod repo_harness { ), } - impl<'a> RepoHarness<'a> { + impl<'a> TenantHarness<'a> { pub fn create(test_name: &'static str) -> Result { Self::create_internal(test_name, false) } @@ -1016,14 +1092,14 @@ pub mod repo_harness { }) } - pub fn load(&self) -> Repository { - self.try_load().expect("failed to load test repo") + pub fn load(&self) -> Tenant { + self.try_load().expect("failed to load test tenant") } - pub fn try_load(&self) -> Result { + pub fn try_load(&self) -> Result { let walredo_mgr = Arc::new(TestRedoManager); - let repo = Repository::new( + let tenant = Tenant::new( self.conf, TenantConfOpt::from(self.tenant_conf), walredo_mgr, @@ -1031,7 +1107,7 @@ pub mod repo_harness { RemoteIndex::default(), false, ); - // populate repo with locally available timelines + // populate tenant with locally available timelines let mut timelines_to_load = HashMap::new(); for timeline_dir_entry in fs::read_dir(self.conf.timelines_path(&self.tenant_id)) .expect("should be able to read timelines dir") @@ -1043,12 +1119,13 @@ pub mod repo_harness { .unwrap() .to_string_lossy() .parse()?; + let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?; timelines_to_load.insert(timeline_id, timeline_metadata); } - repo.init_attach_timelines(timelines_to_load)?; + 
tenant.init_attach_timelines(timelines_to_load)?; - Ok(repo) + Ok(tenant) } pub fn timeline_path(&self, timeline_id: &ZTimelineId) -> PathBuf { @@ -1110,8 +1187,8 @@ mod tests { use super::metadata::METADATA_FILE_NAME; use super::*; use crate::keyspace::KeySpaceAccum; - use crate::layered_repository::repo_harness::*; use crate::repository::{Key, Value}; + use crate::tenant::harness::*; use bytes::BytesMut; use hex_literal::hex; use once_cell::sync::Lazy; @@ -1122,8 +1199,8 @@ mod tests { #[test] fn test_basic() -> Result<()> { - let repo = RepoHarness::create("test_basic")?.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = TenantHarness::create("test_basic")?.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; @@ -1144,10 +1221,10 @@ mod tests { #[test] fn no_duplicate_timelines() -> Result<()> { - let repo = RepoHarness::create("no_duplicate_timelines")?.load(); - let _ = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = TenantHarness::create("no_duplicate_timelines")?.load(); + let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; - match repo.create_empty_timeline(TIMELINE_ID, Lsn(0)) { + match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0)) { Ok(_) => panic!("duplicate timeline creation should fail"), Err(e) => assert_eq!( e.to_string(), @@ -1170,8 +1247,8 @@ mod tests { /// #[test] fn test_branch() -> Result<()> { - let repo = RepoHarness::create("test_branch")?.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = TenantHarness::create("test_branch")?.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let writer = tline.writer(); use std::str::from_utf8; @@ -1193,8 +1270,8 @@ mod tests { //assert_current_logical_size(&tline, Lsn(0x40)); // Branch the history, modify relation differently on the new timeline - repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?; - let newtline = repo + tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x30)))?; + let newtline = tenant .get_timeline(NEW_TIMELINE_ID) .expect("Should have a local timeline"); let new_writer = newtline.writer(); @@ -1263,19 +1340,20 @@ mod tests { #[test] fn test_prohibit_branch_creation_on_garbage_collected_data() -> Result<()> { - let repo = - RepoHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = + TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? + .load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; make_some_layers(tline.as_ref(), Lsn(0x20))?; // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 // FIXME: this doesn't actually remove any layer currently, given how the checkpointing // and compaction works. But it does set the 'cutoff' point so that the cross check // below should fail. 
- repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; + tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; // try to branch at lsn 25, should fail because we already garbage collected the data - match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) { + match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) { Ok(_) => panic!("branching should have failed"), Err(err) => { assert!(err.to_string().contains("invalid branch start lsn")); @@ -1292,11 +1370,12 @@ mod tests { #[test] fn test_prohibit_branch_creation_on_pre_initdb_lsn() -> Result<()> { - let repo = RepoHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load(); + let tenant = + TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load(); - repo.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?; + tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?; // try to branch at lsn 0x25, should fail because initdb lsn is 0x50 - match repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) { + match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) { Ok(_) => panic!("branching should have failed"), Err(err) => { assert!(&err.to_string().contains("invalid branch start lsn")); @@ -1336,36 +1415,37 @@ mod tests { #[test] fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> { - let repo = - RepoHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = + TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; make_some_layers(tline.as_ref(), Lsn(0x20))?; - repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; - let newtline = repo + tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; + let newtline = tenant .get_timeline(NEW_TIMELINE_ID) .expect("Should have a local timeline"); // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 - repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; + tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; assert!(newtline.get(*TEST_KEY, Lsn(0x25)).is_ok()); Ok(()) } #[test] fn test_parent_keeps_data_forever_after_branching() -> Result<()> { - let repo = RepoHarness::create("test_parent_keeps_data_forever_after_branching")?.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = + TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; make_some_layers(tline.as_ref(), Lsn(0x20))?; - repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; - let newtline = repo + tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; + let newtline = tenant .get_timeline(NEW_TIMELINE_ID) .expect("Should have a local timeline"); make_some_layers(newtline.as_ref(), Lsn(0x60))?; // run gc on parent - repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; + tenant.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; // Check that the data is still accessible on the branch. 
assert_eq!( @@ -1379,16 +1459,17 @@ mod tests { #[test] fn timeline_load() -> Result<()> { const TEST_NAME: &str = "timeline_load"; - let harness = RepoHarness::create(TEST_NAME)?; + let harness = TenantHarness::create(TEST_NAME)?; { - let repo = harness.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?; + let tenant = harness.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?; make_some_layers(tline.as_ref(), Lsn(0x8000))?; tline.checkpoint(CheckpointConfig::Forced)?; } - let repo = harness.load(); - repo.get_timeline(TIMELINE_ID) + let tenant = harness.load(); + tenant + .get_timeline(TIMELINE_ID) .expect("cannot load timeline"); Ok(()) @@ -1397,18 +1478,18 @@ mod tests { #[test] fn timeline_load_with_ancestor() -> Result<()> { const TEST_NAME: &str = "timeline_load_with_ancestor"; - let harness = RepoHarness::create(TEST_NAME)?; + let harness = TenantHarness::create(TEST_NAME)?; // create two timelines { - let repo = harness.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = harness.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; make_some_layers(tline.as_ref(), Lsn(0x20))?; tline.checkpoint(CheckpointConfig::Forced)?; - repo.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; + tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; - let newtline = repo + let newtline = tenant .get_timeline(NEW_TIMELINE_ID) .expect("Should have a local timeline"); @@ -1417,14 +1498,14 @@ mod tests { } // check that both of them are initially unloaded - let repo = harness.load(); + let tenant = harness.load(); // check that both, child and ancestor are loaded - let _child_tline = repo + let _child_tline = tenant .get_timeline(NEW_TIMELINE_ID) .expect("cannot get child timeline loaded"); - let _ancestor_tline = repo + let _ancestor_tline = tenant .get_timeline(TIMELINE_ID) .expect("cannot get ancestor timeline loaded"); @@ -1434,11 +1515,11 @@ mod tests { #[test] fn corrupt_metadata() -> Result<()> { const TEST_NAME: &str = "corrupt_metadata"; - let harness = RepoHarness::create(TEST_NAME)?; - let repo = harness.load(); + let harness = TenantHarness::create(TEST_NAME)?; + let tenant = harness.load(); - repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; - drop(repo); + tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + drop(tenant); let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME); @@ -1473,8 +1554,8 @@ mod tests { #[test] fn test_images() -> Result<()> { - let repo = RepoHarness::create("test_images")?.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = TenantHarness::create("test_images")?.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; @@ -1523,8 +1604,8 @@ mod tests { // #[test] fn test_bulk_insert() -> Result<()> { - let repo = RepoHarness::create("test_bulk_insert")?.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = TenantHarness::create("test_bulk_insert")?.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let mut lsn = Lsn(0x10); @@ -1563,8 +1644,8 @@ mod tests { #[test] fn test_random_updates() -> Result<()> { - let repo = RepoHarness::create("test_random_updates")?.load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = 
TenantHarness::create("test_random_updates")?.load(); + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; const NUM_KEYS: usize = 1000; @@ -1633,8 +1714,8 @@ mod tests { #[test] fn test_traverse_branches() -> Result<()> { - let repo = RepoHarness::create("test_traverse_branches")?.load(); - let mut tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = TenantHarness::create("test_traverse_branches")?.load(); + let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; const NUM_KEYS: usize = 1000; @@ -1667,8 +1748,8 @@ mod tests { let mut tline_id = TIMELINE_ID; for _ in 0..50 { let new_tline_id = ZTimelineId::generate(); - repo.branch_timeline(tline_id, new_tline_id, Some(lsn))?; - tline = repo + tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; + tline = tenant .get_timeline(new_tline_id) .expect("Should have the branched timeline"); tline_id = new_tline_id; @@ -1712,8 +1793,8 @@ mod tests { #[test] fn test_traverse_ancestors() -> Result<()> { - let repo = RepoHarness::create("test_traverse_ancestors")?.load(); - let mut tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tenant = TenantHarness::create("test_traverse_ancestors")?.load(); + let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; const NUM_KEYS: usize = 100; const NUM_TLINES: usize = 50; @@ -1728,8 +1809,8 @@ mod tests { #[allow(clippy::needless_range_loop)] for idx in 0..NUM_TLINES { let new_tline_id = ZTimelineId::generate(); - repo.branch_timeline(tline_id, new_tline_id, Some(lsn))?; - tline = repo + tenant.branch_timeline(tline_id, new_tline_id, Some(lsn))?; + tline = tenant .get_timeline(new_tline_id) .expect("Should have the branched timeline"); tline_id = new_tline_id; diff --git a/pageserver/src/layered_repository/blob_io.rs b/pageserver/src/tenant/blob_io.rs similarity index 98% rename from pageserver/src/layered_repository/blob_io.rs rename to pageserver/src/tenant/blob_io.rs index a4c6186056..78ecbcb9c1 100644 --- a/pageserver/src/layered_repository/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -11,8 +11,8 @@ //! len < 128: 0XXXXXXX //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! -use crate::layered_repository::block_io::{BlockCursor, BlockReader}; use crate::page_cache::PAGE_SZ; +use crate::tenant::block_io::{BlockCursor, BlockReader}; use std::cmp::min; use std::io::{Error, ErrorKind}; diff --git a/pageserver/src/layered_repository/block_io.rs b/pageserver/src/tenant/block_io.rs similarity index 98% rename from pageserver/src/layered_repository/block_io.rs rename to pageserver/src/tenant/block_io.rs index 5e32b8833a..bbcdabe1cd 100644 --- a/pageserver/src/layered_repository/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -60,7 +60,7 @@ where /// the underlying BlockReader. For example: /// /// ```no_run -/// # use pageserver::layered_repository::block_io::{BlockReader, FileBlockReader}; +/// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader}; /// # let reader: FileBlockReader = todo!(); /// let cursor = reader.block_cursor(); /// let buf = cursor.read_blk(1); diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/tenant/delta_layer.rs similarity index 98% rename from pageserver/src/layered_repository/delta_layer.rs rename to pageserver/src/tenant/delta_layer.rs index af02f84bc0..ff6d3652f9 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/tenant/delta_layer.rs @@ -24,15 +24,13 @@ //! "values" part. //! 
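Both the delta layer above and `blob_io.rs` earlier rely on the same length-prefixed blob format: a one-byte header for lengths under 128, otherwise a four-byte header with the high bit set. A standalone sketch of that scheme (byte order is assumed from the bit diagram in `blob_io.rs`; helper names are illustrative, not the crate's API):

```rust
/// Encode a blob length: 0XXXXXXX for len < 128,
/// 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX otherwise.
fn encode_len(len: u32) -> Vec<u8> {
    if len < 0x80 {
        vec![len as u8]
    } else {
        assert!(len < 0x8000_0000, "length must fit in 31 bits");
        (len | 0x8000_0000).to_be_bytes().to_vec()
    }
}

/// Decode a length prefix, returning (length, header size in bytes).
fn decode_len(buf: &[u8]) -> (u32, usize) {
    if buf[0] & 0x80 == 0 {
        (buf[0] as u32, 1)
    } else {
        let mut hdr = [0u8; 4];
        hdr.copy_from_slice(&buf[..4]);
        (u32::from_be_bytes(hdr) & 0x7fff_ffff, 4)
    }
}
```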
use crate::config::PageServerConf; -use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; -use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader}; -use crate::layered_repository::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; -use crate::layered_repository::filename::{DeltaFileName, PathOrConf}; -use crate::layered_repository::storage_layer::{ - Layer, ValueReconstructResult, ValueReconstructState, -}; use crate::page_cache::{PageReadGuard, PAGE_SZ}; use crate::repository::{Key, Value, KEY_SIZE}; +use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; +use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader}; +use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; +use crate::tenant::filename::{DeltaFileName, PathOrConf}; +use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::virtual_file::VirtualFile; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; diff --git a/pageserver/src/layered_repository/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs similarity index 99% rename from pageserver/src/layered_repository/disk_btree.rs rename to pageserver/src/tenant/disk_btree.rs index c130a42a8e..33255dbd82 100644 --- a/pageserver/src/layered_repository/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -25,7 +25,7 @@ use std::{cmp::Ordering, io, result}; use thiserror::Error; use tracing::error; -use crate::layered_repository::block_io::{BlockReader, BlockWriter}; +use crate::tenant::block_io::{BlockReader, BlockWriter}; // The maximum size of a value stored in the B-tree. 5 bytes is enough currently. pub const VALUE_SZ: usize = 5; diff --git a/pageserver/src/layered_repository/disk_btree_test_data.rs b/pageserver/src/tenant/disk_btree_test_data.rs similarity index 100% rename from pageserver/src/layered_repository/disk_btree_test_data.rs rename to pageserver/src/tenant/disk_btree_test_data.rs diff --git a/pageserver/src/layered_repository/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs similarity index 97% rename from pageserver/src/layered_repository/ephemeral_file.rs rename to pageserver/src/tenant/ephemeral_file.rs index a1b2d68cd5..c675e4e778 100644 --- a/pageserver/src/layered_repository/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -2,11 +2,11 @@ //! used to keep in-memory layers spilled on disk. 
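Stepping back to `disk_btree.rs` above: values stored in the B-tree are capped at `VALUE_SZ = 5` bytes. Five bytes is a 40-bit integer, which can address up to 1 TiB of file offsets, which is presumably why the comment calls it "enough currently". An illustrative packing of a `u64` offset into five big-endian bytes (not the crate's actual layout):

```rust
fn pack_u40(v: u64) -> [u8; 5] {
    assert!(v < 1 << 40, "value must fit in 40 bits");
    let b = v.to_be_bytes();
    [b[3], b[4], b[5], b[6], b[7]] // keep the low five bytes
}

fn unpack_u40(b: [u8; 5]) -> u64 {
    u64::from_be_bytes([0, 0, 0, b[0], b[1], b[2], b[3], b[4]])
}
```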
use crate::config::PageServerConf; -use crate::layered_repository::blob_io::BlobWriter; -use crate::layered_repository::block_io::BlockReader; use crate::page_cache; use crate::page_cache::PAGE_SZ; use crate::page_cache::{ReadBufResult, WriteBufResult}; +use crate::tenant::blob_io::BlobWriter; +use crate::tenant::block_io::BlockReader; use crate::virtual_file::VirtualFile; use once_cell::sync::Lazy; use std::cmp::min; @@ -330,13 +330,13 @@ fn to_io_error(e: anyhow::Error, context: &str) -> io::Error { #[cfg(test)] mod tests { use super::*; - use crate::layered_repository::blob_io::{BlobCursor, BlobWriter}; - use crate::layered_repository::block_io::BlockCursor; + use crate::tenant::blob_io::{BlobCursor, BlobWriter}; + use crate::tenant::block_io::BlockCursor; use rand::{seq::SliceRandom, thread_rng, RngCore}; use std::fs; use std::str::FromStr; - fn repo_harness( + fn harness( test_name: &str, ) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId), io::Error> { let repo_dir = PageServerConf::test_repo_dir(test_name); @@ -368,7 +368,7 @@ mod tests { #[test] fn test_ephemeral_files() -> Result<(), io::Error> { - let (conf, tenantid, timelineid) = repo_harness("ephemeral_files")?; + let (conf, tenantid, timelineid) = harness("ephemeral_files")?; let file_a = EphemeralFile::create(conf, tenantid, timelineid)?; @@ -399,7 +399,7 @@ mod tests { #[test] fn test_ephemeral_blobs() -> Result<(), io::Error> { - let (conf, tenantid, timelineid) = repo_harness("ephemeral_blobs")?; + let (conf, tenantid, timelineid) = harness("ephemeral_blobs")?; let mut file = EphemeralFile::create(conf, tenantid, timelineid)?; diff --git a/pageserver/src/layered_repository/filename.rs b/pageserver/src/tenant/filename.rs similarity index 100% rename from pageserver/src/layered_repository/filename.rs rename to pageserver/src/tenant/filename.rs diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/tenant/image_layer.rs similarity index 97% rename from pageserver/src/layered_repository/image_layer.rs rename to pageserver/src/tenant/image_layer.rs index 4fe771bb3f..518643241d 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/tenant/image_layer.rs @@ -20,15 +20,13 @@ //! mapping from Key to an offset in the "values" part. The //! actual page images are stored in the "values" part. 
use crate::config::PageServerConf; -use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; -use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader}; -use crate::layered_repository::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; -use crate::layered_repository::filename::{ImageFileName, PathOrConf}; -use crate::layered_repository::storage_layer::{ - Layer, ValueReconstructResult, ValueReconstructState, -}; use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value, KEY_SIZE}; +use crate::tenant::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; +use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; +use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; +use crate::tenant::filename::{ImageFileName, PathOrConf}; +use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::virtual_file::VirtualFile; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; use anyhow::{bail, ensure, Context, Result}; diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/tenant/inmemory_layer.rs similarity index 96% rename from pageserver/src/layered_repository/inmemory_layer.rs rename to pageserver/src/tenant/inmemory_layer.rs index 5f269a868f..0e7b215b1e 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/tenant/inmemory_layer.rs @@ -5,14 +5,12 @@ //! its position in the file, is kept in memory, though. //! use crate::config::PageServerConf; -use crate::layered_repository::blob_io::{BlobCursor, BlobWriter}; -use crate::layered_repository::block_io::BlockReader; -use crate::layered_repository::delta_layer::{DeltaLayer, DeltaLayerWriter}; -use crate::layered_repository::ephemeral_file::EphemeralFile; -use crate::layered_repository::storage_layer::{ - Layer, ValueReconstructResult, ValueReconstructState, -}; use crate::repository::{Key, Value}; +use crate::tenant::blob_io::{BlobCursor, BlobWriter}; +use crate::tenant::block_io::BlockReader; +use crate::tenant::delta_layer::{DeltaLayer, DeltaLayerWriter}; +use crate::tenant::ephemeral_file::EphemeralFile; +use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::walrecord; use anyhow::{bail, ensure, Result}; use std::cell::RefCell; diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/tenant/layer_map.rs similarity index 98% rename from pageserver/src/layered_repository/layer_map.rs rename to pageserver/src/tenant/layer_map.rs index 88dcf32409..c24e3976fb 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -10,11 +10,11 @@ //! corresponding files are written to disk. //! 
-use crate::layered_repository::inmemory_layer::InMemoryLayer; -use crate::layered_repository::storage_layer::Layer; -use crate::layered_repository::storage_layer::{range_eq, range_overlaps}; use crate::metrics::NUM_ONDISK_LAYERS; use crate::repository::Key; +use crate::tenant::inmemory_layer::InMemoryLayer; +use crate::tenant::storage_layer::Layer; +use crate::tenant::storage_layer::{range_eq, range_overlaps}; use anyhow::Result; use std::collections::VecDeque; use std::ops::Range; diff --git a/pageserver/src/layered_repository/metadata.rs b/pageserver/src/tenant/metadata.rs similarity index 98% rename from pageserver/src/layered_repository/metadata.rs rename to pageserver/src/tenant/metadata.rs index 910dba4644..4ea2b7d55b 100644 --- a/pageserver/src/layered_repository/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -1,4 +1,4 @@ -//! Every image of a certain timeline from [`crate::layered_repository::Repository`] +//! Every image of a certain timeline from [`crate::tenant::Tenant`] //! has a metadata that needs to be stored persistently. //! //! Later, the file gets is used in [`crate::remote_storage::storage_sync`] as a part of @@ -216,7 +216,7 @@ pub fn save_metadata( #[cfg(test)] mod tests { use super::*; - use crate::layered_repository::repo_harness::TIMELINE_ID; + use crate::tenant::harness::TIMELINE_ID; #[test] fn metadata_serializes_correctly() { diff --git a/pageserver/src/layered_repository/par_fsync.rs b/pageserver/src/tenant/par_fsync.rs similarity index 100% rename from pageserver/src/layered_repository/par_fsync.rs rename to pageserver/src/tenant/par_fsync.rs diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs similarity index 100% rename from pageserver/src/layered_repository/storage_layer.rs rename to pageserver/src/tenant/storage_layer.rs diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/tenant/timeline.rs similarity index 99% rename from pageserver/src/layered_repository/timeline.rs rename to pageserver/src/tenant/timeline.rs index 60abbe33e6..c96ad99909 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -17,7 +17,7 @@ use std::sync::atomic::{self, AtomicBool, AtomicI64, Ordering as AtomicOrdering} use std::sync::{Arc, Mutex, MutexGuard, RwLock, TryLockError}; use std::time::{Duration, Instant, SystemTime}; -use crate::layered_repository::{ +use crate::tenant::{ delta_layer::{DeltaLayer, DeltaLayerWriter}, ephemeral_file::is_ephemeral_file, filename::{DeltaFileName, ImageFileName}, @@ -118,7 +118,7 @@ pub struct Timeline { /// Layer removal lock. /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks. /// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`], - /// and [`Repository::delete_timeline`]. + /// and [`Tenant::delete_timeline`]. layer_removal_cs: Mutex<()>, // Needed to ensure that we can't create a branch at a point that was already garbage collected diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index a9f015229f..a8a9926c77 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -1,26 +1,31 @@ //! This module acts as a switchboard to access different repositories managed by this //! page server. 
-use crate::config::PageServerConf;
-use crate::http::models::TenantInfo;
-use crate::layered_repository::ephemeral_file::is_ephemeral_file;
-use crate::layered_repository::metadata::{TimelineMetadata, METADATA_FILE_NAME};
-use crate::layered_repository::Repository;
-use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
-use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
-use crate::task_mgr::{self, TaskKind};
-use crate::tenant_config::{TenantConf, TenantConfOpt};
-use crate::walredo::{PostgresRedoManager, WalRedoManager};
-use crate::{TenantTimelineValues, TEMP_FILE_SUFFIX};
-use anyhow::Context;
-use remote_storage::{path_with_suffix_extension, GenericRemoteStorage};
 use std::collections::{hash_map, HashMap, HashSet};
 use std::ffi::OsStr;
 use std::fs;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+
+use anyhow::Context;
 use tracing::*;
+use remote_storage::{path_with_suffix_extension, GenericRemoteStorage};
+
+use crate::config::PageServerConf;
+use crate::http::models::TenantInfo;
+use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
+use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
+use crate::task_mgr::{self, TaskKind};
+use crate::tenant::{
+    ephemeral_file::is_ephemeral_file,
+    metadata::{TimelineMetadata, METADATA_FILE_NAME},
+    Tenant, TenantState,
+};
+use crate::tenant_config::{TenantConf, TenantConfOpt};
+use crate::walredo::PostgresRedoManager;
+use crate::{TenantTimelineValues, TEMP_FILE_SUFFIX};
+
 use utils::crashsafe_dir;
 use utils::zid::{ZTenantId, ZTimelineId};
 
@@ -28,64 +33,31 @@ mod tenants_state {
     use once_cell::sync::Lazy;
     use std::{
         collections::HashMap,
-        sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
+        sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
     };
     use utils::zid::ZTenantId;
 
-    use crate::tenant_mgr::Tenant;
+    use crate::tenant::Tenant;
 
-    static TENANTS: Lazy<RwLock<HashMap<ZTenantId, Tenant>>> =
+    static TENANTS: Lazy<RwLock<HashMap<ZTenantId, Arc<Tenant>>>> =
         Lazy::new(|| RwLock::new(HashMap::new()));
 
-    pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Tenant>> {
+    pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Arc<Tenant>>> {
         TENANTS
             .read()
             .expect("Failed to read() tenants lock, it got poisoned")
     }
 
-    pub(super) fn write_tenants() -> RwLockWriteGuard<'static, HashMap<ZTenantId, Tenant>> {
+    pub(super) fn write_tenants() -> RwLockWriteGuard<'static, HashMap<ZTenantId, Arc<Tenant>>> {
         TENANTS
             .write()
             .expect("Failed to write() tenants lock, it got poisoned")
     }
 }
 
-struct Tenant {
-    state: TenantState,
-    /// Contains in-memory state, including the timeline that might not yet flushed on disk or loaded form disk.
-    repo: Arc<Repository>,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
-pub enum TenantState {
-    // This tenant exists on local disk, and the layer map has been loaded into memory.
-    // The local disk might have some newer files that don't exist in cloud storage yet.
-    Active,
-    // Tenant is active, but there is no walreceiver connection.
-    Idle,
-    // This tenant exists on local disk, and the layer map has been loaded into memory.
-    // The local disk might have some newer files that don't exist in cloud storage yet.
-    // The tenant cannot be accessed anymore for any reason, but graceful shutdown.
-    Stopping,
-
-    // Something went wrong loading the tenant state
-    Broken,
-}
-
-impl std::fmt::Display for TenantState {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        match self {
-            Self::Active => f.write_str("Active"),
-            Self::Idle => f.write_str("Idle"),
-            Self::Stopping => f.write_str("Stopping"),
-            Self::Broken => f.write_str("Broken"),
-        }
-    }
-}
-
 /// Initialize repositories with locally available timelines.
 /// Timelines that are only partially available locally (remote storage has more data than this pageserver)
-/// are scheduled for download and added to the repository once download is completed.
+/// are scheduled for download and added to the tenant once download is completed.
 pub fn init_tenant_mgr(
     conf: &'static PageServerConf,
     remote_storage: Option<GenericRemoteStorage>,
@@ -128,7 +100,7 @@ pub fn init_tenant_mgr(
         )
     };
 
-    attach_local_tenants(conf, &remote_index, tenants_to_attach)?;
+    attach_local_tenants(conf, &remote_index, tenants_to_attach);
 
     Ok(remote_index)
 }
@@ -141,7 +113,7 @@ pub fn attach_local_tenants(
     conf: &'static PageServerConf,
     remote_index: &RemoteIndex,
     tenants_to_attach: TenantTimelineValues<TimelineMetadata>,
-) -> anyhow::Result<()> {
+) {
     let _entered = info_span!("attach_local_tenants").entered();
     let number_of_tenants = tenants_to_attach.0.len();
 
@@ -152,104 +124,109 @@ pub fn attach_local_tenants(
         );
         debug!("Timelines to attach: {local_timelines:?}");
 
-        let repository = load_local_repo(conf, tenant_id, remote_index)
-            .context("Failed to load repository for tenant")?;
-
-        let repo = Arc::clone(&repository);
+        let tenant = load_local_tenant(conf, tenant_id, remote_index);
         {
             match tenants_state::write_tenants().entry(tenant_id) {
                 hash_map::Entry::Occupied(_) => {
-                    anyhow::bail!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
+                    error!("Cannot attach tenant {tenant_id}: there's already an entry in the tenant state");
+                    continue;
                 }
                 hash_map::Entry::Vacant(v) => {
-                    v.insert(Tenant {
-                        state: TenantState::Idle,
-                        repo,
-                    });
+                    v.insert(Arc::clone(&tenant));
+                }
+            }
+        }
+
+        if tenant.current_state() == TenantState::Broken {
+            warn!("Skipping timeline load for broken tenant {tenant_id}")
+        } else {
+            let has_timelines = !local_timelines.is_empty();
+            match tenant.init_attach_timelines(local_timelines) {
+                Ok(()) => {
+                    info!("successfully loaded local timelines for tenant {tenant_id}");
+                    tenant.activate(has_timelines);
+                }
+                Err(e) => {
+                    error!("Failed to attach tenant timelines: {e:?}");
+                    tenant.set_state(TenantState::Broken);
                 }
             }
         }
-        // XXX: current timeline init enables walreceiver that looks for tenant in the state, so insert the tenant entry before
-        repository
-            .init_attach_timelines(local_timelines)
-            .context("Failed to attach timelines for tenant")?;
     }
 
-    info!("Processed {number_of_tenants} local tenants during attach");
-    Ok(())
+    info!("Processed {number_of_tenants} local tenants during attach")
 }
 
-fn load_local_repo(
+fn load_local_tenant(
     conf: &'static PageServerConf,
     tenant_id: ZTenantId,
     remote_index: &RemoteIndex,
-) -> anyhow::Result<Arc<Repository>> {
-    let repository = Repository::new(
+) -> Arc<Tenant> {
+    let tenant = Arc::new(Tenant::new(
         conf,
         TenantConfOpt::default(),
         Arc::new(PostgresRedoManager::new(conf, tenant_id)),
         tenant_id,
         remote_index.clone(),
         conf.remote_storage_config.is_some(),
-    );
-    let tenant_conf = Repository::load_tenant_config(conf, tenant_id)?;
-    repository.update_tenant_config(tenant_conf);
-
-    Ok(Arc::new(repository))
+    ));
+    match Tenant::load_tenant_config(conf, tenant_id) {
+        Ok(tenant_conf) => {
+            tenant.update_tenant_config(tenant_conf);
+            tenant.activate(false);
+        }
+        Err(e) => {
+            error!("Failed to read config for tenant {tenant_id}, disabling tenant: {e:?}");
+            tenant.set_state(TenantState::Broken);
+        }
+    }
+    tenant
 }
 
 ///
 /// Shut down all tenants. This runs as part of pageserver shutdown.
 ///
 pub async fn shutdown_all_tenants() {
-    let tenantids = {
+    let tenants_to_shut_down = {
         let mut m = tenants_state::write_tenants();
-        let mut tenantids = Vec::new();
-        for (tenantid, tenant) in m.iter_mut() {
-            match tenant.state {
-                TenantState::Active | TenantState::Idle | TenantState::Stopping => {
-                    tenant.state = TenantState::Stopping;
-                    tenantids.push(*tenantid)
-                }
-                TenantState::Broken => {}
+        let mut tenants_to_shut_down = Vec::with_capacity(m.len());
+        for (_, tenant) in m.drain() {
+            if tenant.is_active() {
+                // updates tenant state, forbidding new GC and compaction iterations from starting
+                tenant.set_state(TenantState::Paused);
+                tenants_to_shut_down.push(tenant)
             }
         }
         drop(m);
-        tenantids
+        tenants_to_shut_down
    };
 
+    // Shut down all existing walreceiver connections and stop accepting the new ones.
     task_mgr::shutdown_tasks(Some(TaskKind::WalReceiverManager), None, None).await;
 
     // Ok, no background tasks running anymore. Flush any remaining data in
     // memory to disk.
     //
     // We assume that any incoming connections that might request pages from
-    // the repository have already been terminated by the caller, so there
+    // the tenant have already been terminated by the caller, so there
     // should be no more activity in any of the repositories.
     //
     // On error, log it but continue with the shutdown for other tenants.
-    for tenant_id in tenantids {
+    for tenant in tenants_to_shut_down {
+        let tenant_id = tenant.tenant_id();
         debug!("shutdown tenant {tenant_id}");
-        match get_repository_for_tenant(tenant_id) {
-            Ok(repo) => {
-                if let Err(err) = repo.checkpoint() {
-                    error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
-                }
-            }
-            Err(err) => {
-                error!("Could not get repository for tenant {tenant_id} during shutdown: {err:?}");
-            }
+
+        if let Err(err) = tenant.checkpoint() {
+            error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
         }
     }
 }
 
-fn create_repo(
+fn create_tenant_files(
     conf: &'static PageServerConf,
     tenant_conf: TenantConfOpt,
     tenant_id: ZTenantId,
-    wal_redo_manager: Arc<dyn WalRedoManager>,
-    remote_index: RemoteIndex,
-) -> anyhow::Result<Arc<Repository>> {
+) -> anyhow::Result<()> {
     let target_tenant_directory = conf.tenant_path(&tenant_id);
     anyhow::ensure!(
         !target_tenant_directory.exists(),
@@ -282,7 +259,7 @@ fn create_repo(
         )
     })?;
     // first, create a config in the top-level temp directory, fsync the file
-    Repository::persist_tenant_config(&temporary_tenant_config_path, tenant_conf, true)?;
+    Tenant::persist_tenant_config(&temporary_tenant_config_path, tenant_conf, true)?;
     // then, create a subdirectory in the top-level temp directory, fsynced
     crashsafe_dir::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
         format!(
@@ -312,18 +289,11 @@ fn create_repo(
     fs::File::open(target_dir_parent)?.sync_all()?;
 
     info!(
-        "created directory structure in {}",
+        "created tenant directory structure in {}",
         target_tenant_directory.display()
     );
 
-    Ok(Arc::new(Repository::new(
-        conf,
-        tenant_conf,
-        wal_redo_manager,
-        tenant_id,
-        remote_index,
-        conf.remote_storage_config.is_some(),
-    )))
+    Ok(())
 }
 
 fn rebase_directory(original_path: &Path, base: &Path, new_base: &Path) -> anyhow::Result<PathBuf> {
@@ -350,12 +320,17 @@ pub fn create_tenant(
         }
         hash_map::Entry::Vacant(v) => {
            let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
-            let repo = create_repo(conf, tenant_conf, tenant_id, wal_redo_manager, remote_index)?;
-            v.insert(Tenant {
-                state: TenantState::Active,
-                repo,
-            });
-            crate::tenant_tasks::start_background_loops(tenant_id);
+            create_tenant_files(conf, tenant_conf, tenant_id)?;
+            let tenant = Arc::new(Tenant::new(
+                conf,
+                tenant_conf,
+                wal_redo_manager,
+                tenant_id,
+                remote_index,
+                conf.remote_storage_config.is_some(),
+            ));
+            tenant.activate(false);
+            v.insert(tenant);
             Ok(Some(tenant_id))
         }
     }
@@ -367,70 +342,23 @@ pub fn update_tenant_config(
     tenant_id: ZTenantId,
 ) -> anyhow::Result<()> {
     info!("configuring tenant {tenant_id}");
-    get_repository_for_tenant(tenant_id)?.update_tenant_config(tenant_conf);
-
-    Repository::persist_tenant_config(&TenantConf::path(conf, tenant_id), tenant_conf, false)?;
+    get_tenant(tenant_id, true)?.update_tenant_config(tenant_conf);
+    Tenant::persist_tenant_config(&TenantConf::path(conf, tenant_id), tenant_conf, false)?;
     Ok(())
 }
 
-pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
-    Some(tenants_state::read_tenants().get(&tenantid)?.state)
-}
-
-pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow::Result<()> {
-    let old_state = {
-        let mut m = tenants_state::write_tenants();
-        let tenant = m
-            .get_mut(&tenant_id)
-            .with_context(|| format!("Tenant not found for id {tenant_id}"))?;
-        let old_state = tenant.state;
-        tenant.state = new_state;
-        old_state
-    };
-
-    match (old_state, new_state) {
-        (TenantState::Broken, TenantState::Broken)
-        | (TenantState::Active, TenantState::Active)
-        | (TenantState::Idle, TenantState::Idle)
-        | (TenantState::Stopping, TenantState::Stopping) => {
-            debug!("tenant {tenant_id} already in state {new_state}");
-        }
-        (TenantState::Broken, ignored) => {
-            debug!("Ignoring {ignored} since tenant {tenant_id} is in broken state");
-        }
-        (_, TenantState::Broken) => {
-            debug!("Setting tenant {tenant_id} status to broken");
-        }
-        (TenantState::Stopping, ignored) => {
-            debug!("Ignoring {ignored} since tenant {tenant_id} is in stopping state");
-        }
-        (TenantState::Idle, TenantState::Active) => {
-            info!("activating tenant {tenant_id}");
-
-            // Spawn gc and compaction loops. The loops will shut themselves
-            // down when they notice that the tenant is inactive.
-            crate::tenant_tasks::start_background_loops(tenant_id);
-        }
-        (TenantState::Idle, TenantState::Stopping) => {
-            info!("stopping idle tenant {tenant_id}");
-        }
-        (TenantState::Active, TenantState::Stopping | TenantState::Idle) => {
-            info!("stopping tenant {tenant_id} tasks due to new state {new_state}");
-
-            // Note: The caller is responsible for waiting for any tasks to finish.
-        }
-    }
-
-    Ok(())
-}
-
-pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<Repository>> {
+/// Gets the tenant from the in-memory data, erroring if it's absent or does not fit the query.
+/// `active_only = true` allows querying only tenants that are ready for operations, erroring on all other tenants.
+pub fn get_tenant(tenant_id: ZTenantId, active_only: bool) -> anyhow::Result<Arc<Tenant>> {
     let m = tenants_state::read_tenants();
     let tenant = m
         .get(&tenant_id)
-        .with_context(|| format!("Tenant {tenant_id} not found"))?;
-
-    Ok(Arc::clone(&tenant.repo))
+        .with_context(|| format!("Tenant {tenant_id} not found in the local state"))?;
+    if active_only && !tenant.is_active() {
+        anyhow::bail!("Tenant {tenant_id} is not active")
+    } else {
+        Ok(Arc::clone(tenant))
+    }
 }
 
 pub async fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow::Result<()> {
@@ -455,9 +383,14 @@ pub async fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) ->
     info!("waiting for timeline tasks to shutdown");
     task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
     info!("timeline task shutdown completed");
-    match tenants_state::read_tenants().get(&tenant_id) {
-        Some(tenant) => tenant.repo.delete_timeline(timeline_id)?,
-        None => anyhow::bail!("Tenant {tenant_id} not found in local tenant state"),
+    match get_tenant(tenant_id, true) {
+        Ok(tenant) => {
+            tenant.delete_timeline(timeline_id)?;
+            if tenant.list_timelines().is_empty() {
+                tenant.activate(false);
+            }
+        }
+        Err(e) => anyhow::bail!("Cannot access tenant {tenant_id} in local tenant state: {e:?}"),
     }
 
     Ok(())
@@ -467,21 +400,24 @@ pub async fn detach_tenant(
     conf: &'static PageServerConf,
     tenant_id: ZTenantId,
 ) -> anyhow::Result<()> {
-    set_tenant_state(tenant_id, TenantState::Stopping)?;
+    let tenant = match {
+        let mut tenants_accessor = tenants_state::write_tenants();
+        tenants_accessor.remove(&tenant_id)
+    } {
+        Some(tenant) => tenant,
+        None => anyhow::bail!("Tenant not found for id {tenant_id}"),
+    };
+
+    tenant.set_state(TenantState::Paused);
     // shutdown all tenant and timeline tasks: gc, compaction, page service)
     task_mgr::shutdown_tasks(None, Some(tenant_id), None).await;
 
-    {
-        let mut tenants_accessor = tenants_state::write_tenants();
-        tenants_accessor.remove(&tenant_id);
-    }
-
     // If removal fails there will be no way to successfully retry detach,
     // because the tenant no longer exists in the in-memory map. And it needs to be removed from it
-    // before we remove files, because it contains references to repository
+    // before we remove files, because it contains references to tenant
     // which references ephemeral files which are deleted on drop. So if we keep these references,
     // we will attempt to remove files which no longer exist. This can be fixed by having shutdown
-    // mechanism for repository that will clean temporary data to avoid any references to ephemeral files
+    // mechanism for tenant that will clean temporary data to avoid any references to ephemeral files
     let local_tenant_directory = conf.tenant_path(&tenant_id);
     fs::remove_dir_all(&local_tenant_directory).with_context(|| {
         format!(
@@ -512,7 +448,7 @@ pub fn list_tenant_info(remote_index: &RemoteTimelineIndex) -> Vec<TenantInfo> {
             TenantInfo {
                 id: *id,
-                state: Some(tenant.state),
+                state: tenant.current_state(),
                 current_physical_size: None,
                 has_in_progress_downloads,
             }
diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs
index 9aaafe7f92..3ef54838af 100644
--- a/pageserver/src/tenant_tasks.rs
+++ b/pageserver/src/tenant_tasks.rs
@@ -1,12 +1,14 @@
 //! This module contains functions to serve per-tenant background processes,
 //! such as compaction and GC
 
+use std::ops::ControlFlow;
+use std::sync::Arc;
 use std::time::Duration;
 
 use crate::metrics::TENANT_TASK_EVENTS;
 use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
+use crate::tenant::{Tenant, TenantState};
 use crate::tenant_mgr;
-use crate::tenant_mgr::TenantState;
 use tracing::*;
 use utils::zid::ZTenantId;
 
@@ -18,7 +20,10 @@ pub fn start_background_loops(tenant_id: ZTenantId) {
         None,
         &format!("compactor for tenant {tenant_id}"),
         false,
-        compaction_loop(tenant_id),
+        async move {
+            compaction_loop(tenant_id).await;
+            Ok(())
+        },
     );
     task_mgr::spawn(
         BACKGROUND_RUNTIME.handle(),
@@ -27,43 +32,50 @@ pub fn start_background_loops(tenant_id: ZTenantId) {
         None,
         &format!("garbage collector for tenant {tenant_id}"),
         false,
-        gc_loop(tenant_id),
+        async move {
+            gc_loop(tenant_id).await;
+            Ok(())
+        },
     );
 }
 
 ///
 /// Compaction task's main loop
 ///
-async fn compaction_loop(tenant_id: ZTenantId) -> anyhow::Result<()> {
+async fn compaction_loop(tenant_id: ZTenantId) {
+    let wait_duration = Duration::from_secs(2);
+
     info!("starting compaction loop for {tenant_id}");
 
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
-    let result = async {
+    async {
         loop {
             trace!("waking up");
 
+            let tenant = tokio::select! {
+                _ = task_mgr::shutdown_watcher() => {
+                    info!("received compaction cancellation request");
+                    return;
+                },
+                tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
+                    ControlFlow::Break(()) => return,
+                    ControlFlow::Continue(tenant) => tenant,
+                },
+            };
+
             // Run blocking part of the task
-            // Break if tenant is not active
-            if tenant_mgr::get_tenant_state(tenant_id) != Some(TenantState::Active) {
-                break Ok(());
-            }
-
-            // This should not fail. If someone started us, it means that the tenant exists.
-            // And before you remove a tenant, you have to wait until all the associated tasks
-            // exit.
-            let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
-
             // Run compaction
-            let mut sleep_duration = repo.get_compaction_period();
-            if let Err(e) = repo.compaction_iteration() {
-                error!("Compaction failed, retrying: {}", e);
-                sleep_duration = Duration::from_secs(2)
+            let mut sleep_duration = tenant.get_compaction_period();
+            if let Err(e) = tenant.compaction_iteration() {
+                error!("Compaction failed, retrying: {e:#}");
+                sleep_duration = wait_duration;
             }
 
             // Sleep
             tokio::select! {
                 _ = task_mgr::shutdown_watcher() => {
-                    trace!("received cancellation request");
-                    break Ok(());
+                    info!("received compaction cancellation request during idling");
+                    break;
                 },
                 _ = tokio::time::sleep(sleep_duration) => {},
             }
@@ -72,49 +84,49 @@
     .await;
 
     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
-    info!(
-        "compaction loop stopped. State is {:?}",
-        tenant_mgr::get_tenant_state(tenant_id)
-    );
-    result
+    trace!("compaction loop stopped.");
 }
 
 ///
 /// GC task's main loop
 ///
-async fn gc_loop(tenant_id: ZTenantId) -> anyhow::Result<()> {
+async fn gc_loop(tenant_id: ZTenantId) {
+    let wait_duration = Duration::from_secs(2);
+
     info!("starting gc loop for {tenant_id}");
 
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
-    let result = async {
+    async {
         loop {
             trace!("waking up");
 
-            // Break if tenant is not active
-            if tenant_mgr::get_tenant_state(tenant_id) != Some(TenantState::Active) {
-                break Ok(());
-            }
-
-            // This should not fail. If someone started us, it means that the tenant exists.
-            // And before you remove a tenant, you have to wait until all the associated tasks
-            // exit.
-            let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
+            let tenant = tokio::select! {
+                _ = task_mgr::shutdown_watcher() => {
+                    info!("received GC cancellation request");
+                    return;
+                },
+                tenant_wait_result = wait_for_active_tenant(tenant_id, wait_duration) => match tenant_wait_result {
+                    ControlFlow::Break(()) => return,
+                    ControlFlow::Continue(tenant) => tenant,
+                },
+            };
 
             // Run gc
-            let gc_period = repo.get_gc_period();
-            let gc_horizon = repo.get_gc_horizon();
+            let gc_period = tenant.get_gc_period();
+            let gc_horizon = tenant.get_gc_horizon();
             let mut sleep_duration = gc_period;
             if gc_horizon > 0 {
-                if let Err(e) = repo.gc_iteration(None, gc_horizon, repo.get_pitr_interval(), false)
+                if let Err(e) = tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), false)
                 {
-                    error!("Gc failed, retrying: {}", e);
-                    sleep_duration = Duration::from_secs(2)
+                    error!("Gc failed, retrying: {e:#}");
+                    sleep_duration = wait_duration;
                 }
             }
 
             // Sleep
             tokio::select! {
                 _ = task_mgr::shutdown_watcher() => {
-                    trace!("received cancellation request");
-                    break Ok(());
+                    info!("received GC cancellation request during idling");
+                    break;
                 },
                 _ = tokio::time::sleep(sleep_duration) => {},
             }
@@ -122,9 +134,50 @@ async fn gc_loop(tenant_id: ZTenantId) -> anyhow::Result<()> {
     }
     .await;
     TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
-    info!(
-        "GC loop stopped. State is {:?}",
-        tenant_mgr::get_tenant_state(tenant_id)
-    );
-    result
+    trace!("GC loop stopped.");
+}
+
+async fn wait_for_active_tenant(
+    tenant_id: ZTenantId,
+    wait: Duration,
+) -> ControlFlow<(), Arc<Tenant>> {
+    let tenant = loop {
+        match tenant_mgr::get_tenant(tenant_id, false) {
+            Ok(tenant) => break tenant,
+            Err(e) => {
+                error!("Failed to get a tenant {tenant_id}: {e:#}");
+                tokio::time::sleep(wait).await;
+            }
+        }
+    };
+
+    // if the tenant has a proper status already, no need to wait for anything
+    if tenant.should_run_tasks() {
+        ControlFlow::Continue(tenant)
+    } else {
+        let mut tenant_state_updates = tenant.subscribe_for_state_updates();
+        loop {
+            match tenant_state_updates.changed().await {
+                Ok(()) => {
+                    let new_state = *tenant_state_updates.borrow();
+                    match new_state {
+                        TenantState::Active {
+                            background_jobs_running: true,
+                        } => {
+                            debug!("Tenant state changed to active with background jobs enabled, continuing the task loop");
+                            return ControlFlow::Continue(tenant);
+                        }
+                        state => {
+                            debug!("Not running the task loop, tenant is not active with background jobs enabled: {state:?}");
+                            tokio::time::sleep(wait).await;
+                        }
+                    }
+                }
+                Err(_sender_dropped_error) => {
+                    info!("Tenant dropped the state updates sender, quitting waiting for tenant and the task loop");
+                    return ControlFlow::Break(());
+                }
+            }
+        }
+    }
+}
diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs
index 35dec54d5c..69d14babf0 100644
--- a/pageserver/src/timelines.rs
+++ b/pageserver/src/timelines.rs
@@ -2,34 +2,28 @@
 //! Timeline management code
 //
-use anyhow::{bail, Context, Result};
-use remote_storage::path_with_suffix_extension;
-
 use std::{
     fs,
     path::Path,
     process::{Command, Stdio},
     sync::Arc,
 };
+
+use anyhow::{bail, Context, Result};
 use tracing::*;
+use remote_storage::path_with_suffix_extension;
 use utils::{
     lsn::Lsn,
     zid::{ZTenantId, ZTimelineId},
 };
 
 use crate::config::PageServerConf;
-use crate::layered_repository::{Repository, Timeline};
+use crate::tenant::{Tenant, Timeline};
 use crate::tenant_mgr;
 use crate::CheckpointConfig;
 use crate::{import_datadir, TEMP_FILE_SUFFIX};
 
-#[derive(Debug, Clone, Copy)]
-pub struct PointInTime {
-    pub timeline_id: ZTimelineId,
-    pub lsn: Lsn,
-}
-
 // Create the cluster temporarily in 'initdbpath' directory inside the repository
 // to get bootstrap data for timeline initialization.
 //
@@ -69,7 +63,7 @@ fn bootstrap_timeline(
     conf: &'static PageServerConf,
     tenant_id: ZTenantId,
     timeline_id: ZTimelineId,
-    repo: &Repository,
+    tenant: &Tenant,
 ) -> Result<Arc<Timeline>> {
     // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
     // temporary directory for basebackup files for the given timeline.
@@ -89,7 +83,7 @@ fn bootstrap_timeline(
     // LSN, and any WAL after that.
     // Initdb lsn will be equal to last_record_lsn which will be set after import.
     // Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline.
-    let timeline = repo.create_empty_timeline(timeline_id, lsn)?;
+    let timeline = tenant.create_empty_timeline(timeline_id, lsn)?;
     import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
 
     fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -127,16 +121,16 @@ pub(crate) async fn create_timeline(
     mut ancestor_start_lsn: Option<Lsn>,
 ) -> Result<Option<Arc<Timeline>>> {
     let new_timeline_id = new_timeline_id.unwrap_or_else(ZTimelineId::generate);
-    let repo = tenant_mgr::get_repository_for_tenant(tenant_id)?;
+    let tenant = tenant_mgr::get_tenant(tenant_id, true)?;
 
     if conf.timeline_path(&new_timeline_id, &tenant_id).exists() {
-        debug!("timeline {} already exists", new_timeline_id);
+        debug!("timeline {new_timeline_id} already exists");
         return Ok(None);
     }
 
     let loaded_timeline = match ancestor_timeline_id {
         Some(ancestor_timeline_id) => {
-            let ancestor_timeline = repo
+            let ancestor_timeline = tenant
                 .get_timeline(ancestor_timeline_id)
                 .context("Cannot branch off the timeline that's not present in pageserver")?;
 
@@ -162,10 +156,13 @@ pub(crate) async fn create_timeline(
             }
         }
 
-            repo.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
+            tenant.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
         }
-        None => bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?,
+        None => bootstrap_timeline(conf, tenant_id, new_timeline_id, &tenant)?,
     };
+    // A new timeline has been added to the tenant, so its background tasks are now needed.
+ tenant.activate(true); + Ok(Some(loaded_timeline)) } diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 57592a46d3..45d0916dec 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -30,9 +30,9 @@ use anyhow::Result; use bytes::{Buf, Bytes, BytesMut}; use tracing::*; -use crate::layered_repository::Timeline; use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; +use crate::tenant::Timeline; use crate::walrecord::*; use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment; use postgres_ffi::v14::pg_constants; @@ -1022,16 +1022,13 @@ impl<'a> WalIngest<'a> { } } -/// -/// Tests that should work the same with any Repository/Timeline implementation. -/// #[allow(clippy::bool_assert_comparison)] #[cfg(test)] mod tests { use super::*; - use crate::layered_repository::repo_harness::*; - use crate::layered_repository::Timeline; use crate::pgdatadir_mapping::create_test_timeline; + use crate::tenant::harness::*; + use crate::tenant::Timeline; use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT; use postgres_ffi::RELSEG_SIZE; @@ -1061,8 +1058,8 @@ mod tests { #[test] fn test_relsize() -> Result<()> { - let repo = RepoHarness::create("test_relsize")?.load(); - let tline = create_test_timeline(&repo, TIMELINE_ID)?; + let tenant = TenantHarness::create("test_relsize")?.load(); + let tline = create_test_timeline(&tenant, TIMELINE_ID)?; let mut walingest = init_walingest_test(&*tline)?; let mut m = tline.begin_modification(Lsn(0x20)); @@ -1189,8 +1186,8 @@ mod tests { // and then created it again within the same layer. #[test] fn test_drop_extend() -> Result<()> { - let repo = RepoHarness::create("test_drop_extend")?.load(); - let tline = create_test_timeline(&repo, TIMELINE_ID)?; + let tenant = TenantHarness::create("test_drop_extend")?.load(); + let tline = create_test_timeline(&tenant, TIMELINE_ID)?; let mut walingest = init_walingest_test(&*tline)?; let mut m = tline.begin_modification(Lsn(0x20)); @@ -1229,8 +1226,8 @@ mod tests { // and then extended it again within the same layer. #[test] fn test_truncate_extend() -> Result<()> { - let repo = RepoHarness::create("test_truncate_extend")?.load(); - let tline = create_test_timeline(&repo, TIMELINE_ID)?; + let tenant = TenantHarness::create("test_truncate_extend")?.load(); + let tline = create_test_timeline(&tenant, TIMELINE_ID)?; let mut walingest = init_walingest_test(&*tline)?; // Create a 20 MB relation (the size is arbitrary) @@ -1317,8 +1314,8 @@ mod tests { /// split into multiple 1 GB segments in Postgres. 
#[test] fn test_large_rel() -> Result<()> { - let repo = RepoHarness::create("test_large_rel")?.load(); - let tline = create_test_timeline(&repo, TIMELINE_ID)?; + let tenant = TenantHarness::create("test_large_rel")?.load(); + let tline = create_test_timeline(&tenant, TIMELINE_ID)?; let mut walingest = init_walingest_test(&*tline)?; let mut lsn = 0x10; diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 1fcb768ddf..69e400f291 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -16,10 +16,10 @@ use std::{ time::Duration, }; -use crate::layered_repository::Timeline; use crate::task_mgr; use crate::task_mgr::TaskKind; use crate::task_mgr::WALRECEIVER_RUNTIME; +use crate::tenant::Timeline; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; use etcd_broker::{ @@ -767,11 +767,11 @@ fn wal_stream_connection_string( #[cfg(test)] mod tests { use super::*; - use crate::layered_repository::repo_harness::{RepoHarness, TIMELINE_ID}; + use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; #[test] fn no_connection_no_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("no_connection_no_candidate")?; + let harness = TenantHarness::create("no_connection_no_candidate")?; let mut state = dummy_state(&harness); let now = Utc::now().naive_utc(); @@ -857,7 +857,7 @@ mod tests { #[tokio::test] async fn connection_no_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("connection_no_candidate")?; + let harness = TenantHarness::create("connection_no_candidate")?; let mut state = dummy_state(&harness); let now = Utc::now().naive_utc(); @@ -948,7 +948,7 @@ mod tests { #[test] fn no_connection_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("no_connection_candidate")?; + let harness = TenantHarness::create("no_connection_candidate")?; let mut state = dummy_state(&harness); let now = Utc::now().naive_utc(); @@ -1053,7 +1053,7 @@ mod tests { #[tokio::test] async fn candidate_with_many_connection_failures() -> anyhow::Result<()> { - let harness = RepoHarness::create("candidate_with_many_connection_failures")?; + let harness = TenantHarness::create("candidate_with_many_connection_failures")?; let mut state = dummy_state(&harness); let now = Utc::now().naive_utc(); @@ -1117,7 +1117,7 @@ mod tests { #[tokio::test] async fn lsn_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("lsn_wal_over_threshcurrent_candidate")?; + let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?; let mut state = dummy_state(&harness); let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); @@ -1204,7 +1204,7 @@ mod tests { #[tokio::test] async fn timeout_connection_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("timeout_connection_threshhold_current_candidate")?; + let harness = TenantHarness::create("timeout_connection_threshhold_current_candidate")?; let mut state = dummy_state(&harness); let current_lsn = Lsn(100_000).align(); let now = Utc::now().naive_utc(); @@ -1276,7 +1276,7 @@ mod tests { #[tokio::test] async fn timeout_wal_over_threshhold_current_candidate() -> anyhow::Result<()> { - let harness = RepoHarness::create("timeout_wal_over_threshhold_current_candidate")?; + let harness = TenantHarness::create("timeout_wal_over_threshhold_current_candidate")?; let mut state = dummy_state(&harness); let 
current_lsn = Lsn(100_000).align(); let new_lsn = Lsn(100_100).align(); @@ -1353,7 +1353,7 @@ mod tests { const DUMMY_SAFEKEEPER_CONNSTR: &str = "safekeeper_connstr"; - fn dummy_state(harness: &RepoHarness) -> WalreceiverState { + fn dummy_state(harness: &TenantHarness) -> WalreceiverState { WalreceiverState { id: ZTenantTimelineId { tenant_id: harness.tenant_id, diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index e8fa9f9aca..6f1fbc2c9d 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -21,10 +21,10 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use super::TaskEvent; use crate::metrics::LIVE_CONNECTIONS_COUNT; use crate::{ - layered_repository::{Timeline, WalReceiverInfo}, task_mgr, task_mgr::TaskKind, task_mgr::WALRECEIVER_RUNTIME, + tenant::{Timeline, WalReceiverInfo}, tenant_mgr, walingest::WalIngest, walrecord::DecodedWALRecord, @@ -141,8 +141,7 @@ pub async fn handle_walreceiver_connection( let tenant_id = timeline.tenant_id; let timeline_id = timeline.timeline_id; - let repo = tenant_mgr::get_repository_for_tenant(tenant_id) - .with_context(|| format!("no repository found for tenant {tenant_id}"))?; + let tenant = tenant_mgr::get_tenant(tenant_id, true)?; // // Start streaming the WAL, from where we left off previously. @@ -283,7 +282,7 @@ pub async fn handle_walreceiver_connection( })?; if let Some(last_lsn) = status_update { - let remote_index = repo.get_remote_index(); + let remote_index = tenant.get_remote_index(); let timeline_remote_consistent_lsn = remote_index .read() .await diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py index 1d083b3ef9..ce3a74930e 100644 --- a/test_runner/regress/test_broken_timeline.py +++ b/test_runner/regress/test_broken_timeline.py @@ -71,7 +71,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # First timeline would not get loaded into pageserver due to corrupt metadata file with pytest.raises( - Exception, match=f"Could not get timeline {timeline1} in tenant {tenant1}" + Exception, match=f"Timeline {timeline1} was not found for tenant {tenant1}" ) as err: pg1.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") @@ -80,7 +80,7 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder): # We don't have the remote storage enabled, which means timeline is in an incorrect state, # it's not loaded at all with pytest.raises( - Exception, match=f"Could not get timeline {timeline2} in tenant {tenant2}" + Exception, match=f"Timeline {timeline2} was not found for tenant {tenant2}" ) as err: pg2.start() log.info(f"compute startup failed eagerly for timeline with corrupt metadata: {err}") diff --git a/test_runner/regress/test_tenant_tasks.py b/test_runner/regress/test_tenant_tasks.py index 315ec7f306..1214d703d0 100644 --- a/test_runner/regress/test_tenant_tasks.py +++ b/test_runner/regress/test_tenant_tasks.py @@ -40,11 +40,16 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): for t in timelines: client.timeline_delete(tenant, t) + def assert_active_without_jobs(tenant): + assert get_state(tenant) == {"Active": {"background_jobs_running": False}} + # Create tenant, start compute tenant, _ = env.neon_cli.create_tenant() env.neon_cli.create_timeline(name, tenant_id=tenant) pg = env.postgres.create_start(name, tenant_id=tenant) - assert get_state(tenant) == "Active" + 
assert get_state(tenant) == { + "Active": {"background_jobs_running": True} + }, "Pageserver should activate a tenant and start background jobs if timelines are loaded" # Stop compute pg.stop() @@ -53,6 +58,7 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder): for tenant_info in client.tenant_list(): tenant_id = ZTenantId(tenant_info["id"]) delete_all_timelines(tenant_id) + wait_until(10, 0.2, lambda: assert_active_without_jobs(tenant_id)) # Assert that all tasks finish quickly after tenant is detached assert get_metric_value('pageserver_tenant_task_events{event="start"}') > 0 diff --git a/test_runner/regress/test_timeline_delete.py b/test_runner/regress/test_timeline_delete.py index a5dadc535b..5a20dbd232 100644 --- a/test_runner/regress/test_timeline_delete.py +++ b/test_runner/regress/test_timeline_delete.py @@ -18,7 +18,7 @@ def test_timeline_delete(neon_simple_env: NeonEnv): invalid_tenant_id = ZTenantId.generate() with pytest.raises( NeonPageserverApiException, - match=f"Tenant {invalid_tenant_id} not found in local tenant state", + match=f"Tenant {invalid_tenant_id} not found in the local state", ): ps_http.timeline_delete(tenant_id=invalid_tenant_id, timeline_id=invalid_timeline_id) @@ -64,7 +64,8 @@ def test_timeline_delete(neon_simple_env: NeonEnv): # check 404 with pytest.raises( - NeonPageserverApiException, match="is not found neither locally nor remotely" + NeonPageserverApiException, + match=f"Timeline {env.initial_tenant}/{leaf_timeline_id} is not found neither locally nor remotely", ): ps_http.timeline_detail(env.initial_tenant, leaf_timeline_id)
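
The reworked tenant_tasks module in this patch replaces polling of `get_tenant_state` with a subscription to tenant state changes: each background loop parks inside `wait_for_active_tenant` until the tenant reports `Active { background_jobs_running: true }`, and exits cleanly once the state sender is gone. Below is a minimal, self-contained sketch of that waiting pattern, built directly on `tokio::sync::watch`. The reduced `TenantState` enum and the `wait_until_jobs_allowed` helper are illustrative stand-ins for the patch's actual types, not the pageserver's real API, and a `tokio` dependency with the `macros`, `rt` and `sync` features is assumed.

use std::ops::ControlFlow;

use tokio::sync::watch;

// Reduced model of the states used in this patch; the real enum lives in
// pageserver/src/tenant.rs and carries more data.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TenantState {
    Active { background_jobs_running: bool },
    Paused,
    Broken,
}

// Illustrative stand-in for `wait_for_active_tenant`: park on the watch
// channel until background jobs are allowed, or give up once the sender
// (i.e. the tenant) is gone.
async fn wait_until_jobs_allowed(mut rx: watch::Receiver<TenantState>) -> ControlFlow<()> {
    loop {
        // `borrow` reads the latest value, so an activation that happened
        // before this task was first polled is still observed.
        if matches!(
            *rx.borrow(),
            TenantState::Active { background_jobs_running: true }
        ) {
            return ControlFlow::Continue(());
        }
        // `changed` resolves to an error when the sender is dropped
        // (tenant detached); translate that into a clean loop exit.
        if rx.changed().await.is_err() {
            return ControlFlow::Break(());
        }
    }
}

#[tokio::main]
async fn main() {
    // The tenant owns the sender; every background loop holds a receiver.
    let (state_tx, state_rx) = watch::channel(TenantState::Paused);

    let gc_loop = tokio::spawn(async move {
        match wait_until_jobs_allowed(state_rx).await {
            ControlFlow::Continue(()) => println!("tenant active, running a GC iteration"),
            ControlFlow::Break(()) => println!("tenant gone, stopping the GC loop"),
        }
    });

    // Mimics `tenant.activate(true)`: timelines are loaded, jobs may start.
    state_tx
        .send(TenantState::Active { background_jobs_running: true })
        .expect("receiver still alive");

    gc_loop.await.unwrap();
}

The same channel semantics are what make detach safe in this patch: dropping the `Tenant` drops the sender, `changed()` errors out, and the loops return `ControlFlow::Break` instead of spinning against a tenant that no longer exists.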