diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 5c135e4eb4..728dcb53de 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -10,10 +10,7 @@ use daemonize::Daemonize; use pageserver::{ config::{defaults::*, PageServerConf}, - http, page_cache, page_service, profiling, - remote_storage::{self, SyncStartupData}, - repository::{Repository, TimelineSyncStatusUpdate}, - tenant_mgr, thread_mgr, + http, page_cache, page_service, profiling, tenant_mgr, thread_mgr, thread_mgr::ThreadKind, timelines, virtual_file, LOG_FILE_NAME, }; @@ -235,47 +232,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() let signals = signals::install_shutdown_handlers()?; - // Initialize repositories with locally available timelines. - // Timelines that are only partially available locally (remote storage has more data than this pageserver) - // are scheduled for download and added to the repository once download is completed. - let SyncStartupData { - remote_index, - local_timeline_init_statuses, - } = remote_storage::start_local_timeline_sync(conf) - .context("Failed to set up local files sync with external storage")?; - - for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses { - // initialize local tenant - let repo = tenant_mgr::load_local_repo(conf, tenant_id, &remote_index) - .with_context(|| format!("Failed to load repo for tenant {}", tenant_id))?; - for (timeline_id, init_status) in local_timeline_init_statuses { - match init_status { - remote_storage::LocalTimelineInitStatus::LocallyComplete => { - debug!("timeline {} for tenant {} is locally complete, registering it in repository", timeline_id, tenant_id); - // Lets fail here loudly to be on the safe side. - // XXX: It may be a better api to actually distinguish between repository startup - // and processing of newly downloaded timelines. 
- repo.apply_timeline_remote_sync_status_update( - timeline_id, - TimelineSyncStatusUpdate::Downloaded, - ) - .with_context(|| { - format!( - "Failed to bootstrap timeline {} for tenant {}", - timeline_id, tenant_id - ) - })? - } - remote_storage::LocalTimelineInitStatus::NeedsSync => { - debug!( - "timeline {} for tenant {} needs sync, \ - so skipped for adding into repository until sync is finished", - tenant_id, timeline_id - ); - } - } - } - } + // start profiler (if enabled) + let profiler_guard = profiling::init_profiler(conf); // initialize authentication for incoming connections let auth = match &conf.auth_type { @@ -288,8 +246,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() }; info!("Using auth: {:#?}", conf.auth_type); - // start profiler (if enabled) - let profiler_guard = profiling::init_profiler(conf); + let remote_index = tenant_mgr::init_tenant_mgr(conf)?; // Spawn a new thread for the http endpoint // bind before launching separate thread so the error reported before startup exits diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 05485ef3b6..f1b482cf50 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -244,7 +244,7 @@ async fn timeline_attach_handler(request: Request) -> Result) -> Result, A crate::tenant_mgr::list_tenants() }) .await - .map_err(ApiError::from_err)??; + .map_err(ApiError::from_err)?; json_response(StatusCode::OK, response_data) } @@ -377,7 +377,7 @@ async fn tenant_create_handler(mut request: Request) -> Result> = Mutex::new(HashMap::new()); +mod tenants_state { + use std::{ + collections::HashMap, + sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + }; + + use utils::zid::ZTenantId; + + use crate::tenant_mgr::Tenant; + + lazy_static::lazy_static! 
{ + static ref TENANTS: RwLock> = RwLock::new(HashMap::new()); + } + + pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap> { + TENANTS + .read() + .expect("Failed to read() tenants lock, it got poisoned") + } + + pub(super) fn write_tenants() -> RwLockWriteGuard<'static, HashMap> { + TENANTS + .write() + .expect("Failed to write() tenants lock, it got poisoned") + } } struct Tenant { state: TenantState, + /// Contains in-memory state, including the timelines that might not yet be flushed to disk or loaded from disk. repo: Arc, - - timelines: HashMap>, + /// Timelines, located locally in the pageserver's datadir. + /// Whatever manipulations happen, local timelines are not removed, only incremented with files. + /// + /// Local timelines have more metadata that's loaded into memory, + /// that is located in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`]. + local_timelines: HashMap>, } #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] @@ -60,43 +88,17 @@ impl fmt::Display for TenantState { } } -fn access_tenants() -> MutexGuard<'static, HashMap> { - TENANTS.lock().unwrap() -} - -// Sets up wal redo manager and repository for tenant. Reduces code duplication. -// Used during pageserver startup, or when new tenant is attached to pageserver. -pub fn load_local_repo( - conf: &'static PageServerConf, - tenant_id: ZTenantId, - remote_index: &RemoteIndex, -) -> Result> { - let mut m = access_tenants(); - let tenant = m.entry(tenant_id).or_insert_with(|| { - // Set up a WAL redo manager, for applying WAL records. - let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); - - // Set up an object repository, for actual data storage.
- let repo: Arc = Arc::new(LayeredRepository::new( - conf, - Default::default(), - Arc::new(walredo_mgr), - tenant_id, - remote_index.clone(), - conf.remote_storage_config.is_some(), - )); - Tenant { - state: TenantState::Idle, - repo, - timelines: HashMap::new(), - } - }); - - // Restore tenant config - let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?; - tenant.repo.update_tenant_config(tenant_conf)?; - - Ok(Arc::clone(&tenant.repo)) +/// Initialize repositories with locally available timelines. +/// Timelines that are only partially available locally (remote storage has more data than this pageserver) +/// are scheduled for download and added to the repository once download is completed. +pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result { + let SyncStartupData { + remote_index, + local_timeline_init_statuses, + } = remote_storage::start_local_timeline_sync(conf) + .context("Failed to set up local files sync with external storage")?; + init_local_repositories(conf, local_timeline_init_statuses, &remote_index)?; + Ok(remote_index) } /// Updates tenants' repositories, changing their timelines state in memory. 
@@ -113,32 +115,28 @@ pub fn apply_timeline_sync_status_updates( "Applying sync status updates for {} timelines", sync_status_updates.len() ); - trace!("Sync status updates: {:?}", sync_status_updates); + debug!("Sync status updates: {sync_status_updates:?}"); - for (tenant_id, tenant_timelines_sync_status_updates) in sync_status_updates { + for (tenant_id, status_updates) in sync_status_updates { let repo = match load_local_repo(conf, tenant_id, remote_index) { Ok(repo) => repo, Err(e) => { - error!( - "Failed to load repo for tenant {} Error: {:#}", - tenant_id, e - ); + error!("Failed to load repo for tenant {tenant_id} Error: {e:?}",); continue; } }; - for (timeline_id, timeline_sync_status_update) in tenant_timelines_sync_status_updates { - match repo.apply_timeline_remote_sync_status_update(timeline_id, timeline_sync_status_update) + for (timeline_id, status_update) in status_updates { + match repo.apply_timeline_remote_sync_status_update(timeline_id, status_update) { - Ok(_) => debug!( - "successfully applied timeline sync status update: {} -> {}", - timeline_id, timeline_sync_status_update - ), + Ok(()) => debug!("successfully applied timeline sync status update: {timeline_id} -> {status_update}"), Err(e) => error!( - "Failed to apply timeline sync status update for tenant {}. timeline {} update {} Error: {:#}", - tenant_id, timeline_id, timeline_sync_status_update, e + "Failed to apply timeline sync status update for tenant {tenant_id}. timeline {timeline_id} update {status_update} Error: {e:?}" ), } + match status_update { + TimelineSyncStatusUpdate::Downloaded => todo!("TODO kb "), + } } } } @@ -147,7 +145,7 @@ pub fn apply_timeline_sync_status_updates( /// Shut down all tenants. This runs as part of pageserver shutdown. 
/// pub fn shutdown_all_tenants() { - let mut m = access_tenants(); + let mut m = tenants_state::write_tenants(); let mut tenantids = Vec::new(); for (tenantid, tenant) in m.iter_mut() { tenant.state = TenantState::Stopping; @@ -167,22 +165,16 @@ pub fn shutdown_all_tenants() { // should be no more activity in any of the repositories. // // On error, log it but continue with the shutdown for other tenants. - for tenantid in tenantids { - debug!("shutdown tenant {}", tenantid); - match get_repository_for_tenant(tenantid) { + for tenant_id in tenantids { + debug!("shutdown tenant {tenant_id}"); + match get_repository_for_tenant(tenant_id) { Ok(repo) => { if let Err(err) = repo.checkpoint() { - error!( - "Could not checkpoint tenant {} during shutdown: {:?}", - tenantid, err - ); + error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}"); } } Err(err) => { - error!( - "Could not get repository for tenant {} during shutdown: {:?}", - tenantid, err - ); + error!("Could not get repository for tenant {tenant_id} during shutdown: {err:?}"); } } } @@ -191,20 +183,20 @@ pub fn shutdown_all_tenants() { pub fn create_tenant_repository( conf: &'static PageServerConf, tenant_conf: TenantConfOpt, - tenantid: ZTenantId, + tenant_id: ZTenantId, remote_index: RemoteIndex, -) -> Result> { - match access_tenants().entry(tenantid) { +) -> anyhow::Result> { + match tenants_state::write_tenants().entry(tenant_id) { Entry::Occupied(_) => { - debug!("tenant {} already exists", tenantid); + debug!("tenant {tenant_id} already exists"); Ok(None) } Entry::Vacant(v) => { - let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); + let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id)); let repo = timelines::create_repo( conf, tenant_conf, - tenantid, + tenant_id, CreateRepo::Real { wal_redo_manager, remote_index, @@ -213,36 +205,39 @@ pub fn create_tenant_repository( v.insert(Tenant { state: TenantState::Idle, repo, - timelines: 
HashMap::new(), + local_timelines: HashMap::new(), }); - Ok(Some(tenantid)) + Ok(Some(tenant_id)) } } } -pub fn update_tenant_config(tenant_conf: TenantConfOpt, tenantid: ZTenantId) -> Result<()> { - info!("configuring tenant {}", tenantid); - let repo = get_repository_for_tenant(tenantid)?; +pub fn update_tenant_config( + tenant_conf: TenantConfOpt, + tenant_id: ZTenantId, +) -> anyhow::Result<()> { + info!("configuring tenant {tenant_id}"); + let repo = get_repository_for_tenant(tenant_id)?; repo.update_tenant_config(tenant_conf)?; Ok(()) } pub fn get_tenant_state(tenantid: ZTenantId) -> Option { - Some(access_tenants().get(&tenantid)?.state) + Some(tenants_state::read_tenants().get(&tenantid)?.state) } /// /// Change the state of a tenant to Active and launch its compactor and GC /// threads. If the tenant was already in Active state or Stopping, does nothing. /// -pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> { - let mut m = access_tenants(); +pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> { + let mut m = tenants_state::write_tenants(); let tenant = m .get_mut(&tenant_id) - .with_context(|| format!("Tenant not found for id {}", tenant_id))?; + .with_context(|| format!("Tenant not found for id {tenant_id}"))?; - info!("activating tenant {}", tenant_id); + info!("activating tenant {tenant_id}"); match tenant.state { // If the tenant is already active, nothing to do. 
@@ -267,13 +262,10 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> { true, move || crate::tenant_threads::gc_loop(tenant_id), ) - .with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id)); + .with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}")); if let Err(e) = &gc_spawn_result { - error!( - "Failed to start GC thread for tenant {}, stopping its checkpointer thread: {:?}", - tenant_id, e - ); + error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}"); thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None); return gc_spawn_result; } @@ -287,39 +279,42 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> { Ok(()) } -pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result> { - let m = access_tenants(); +pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result> { + let m = tenants_state::read_tenants(); let tenant = m - .get(&tenantid) - .with_context(|| format!("Tenant {} not found", tenantid))?; + .get(&tenant_id) + .with_context(|| format!("Tenant {tenant_id} not found"))?; Ok(Arc::clone(&tenant.repo)) } -// Retrieve timeline for tenant. Load it into memory if it is not already loaded -pub fn get_timeline_for_tenant_load( - tenantid: ZTenantId, - timelineid: ZTimelineId, -) -> Result> { - let mut m = access_tenants(); +/// Retrieves local timeline for tenant. +/// Loads it into memory if it is not already loaded. 
+pub fn get_local_timeline_with_load( + tenant_id: ZTenantId, + timeline_id: ZTimelineId, +) -> anyhow::Result> { + let mut m = tenants_state::write_tenants(); let tenant = m - .get_mut(&tenantid) - .with_context(|| format!("Tenant {} not found", tenantid))?; + .get_mut(&tenant_id) + .with_context(|| format!("Tenant {tenant_id} not found"))?; - if let Some(page_tline) = tenant.timelines.get(&timelineid) { + if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) { return Ok(Arc::clone(page_tline)); } // First access to this timeline. Create a DatadirTimeline wrapper for it let tline = tenant .repo - .get_timeline_load(timelineid) - .with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?; + .get_timeline_load(timeline_id) + .with_context(|| format!("Timeline {timeline_id} not found for tenant {tenant_id}"))?; let repartition_distance = tenant.repo.get_checkpoint_distance() / 10; let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance)); page_tline.init_logical_size()?; - tenant.timelines.insert(timelineid, Arc::clone(&page_tline)); + tenant + .local_timelines + .insert(timeline_id, Arc::clone(&page_tline)); Ok(page_tline) } @@ -331,15 +326,87 @@ pub struct TenantInfo { pub state: TenantState, } -pub fn list_tenants() -> Result> { - access_tenants() +pub fn list_tenants() -> Vec { + tenants_state::read_tenants() .iter() - .map(|v| { - let (id, tenant) = v; - Ok(TenantInfo { - id: *id, - state: tenant.state, - }) + .map(|(id, tenant)| TenantInfo { + id: *id, + state: tenant.state, }) .collect() } + +fn init_local_repositories( + conf: &'static PageServerConf, + local_timeline_init_statuses: HashMap>, + remote_index: &RemoteIndex, +) -> anyhow::Result<(), anyhow::Error> { + for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses { + // initialize local tenant + let repo = load_local_repo(conf, tenant_id, remote_index) + .with_context(|| format!("Failed to load repo for tenant {}", 
tenant_id))?; + for (timeline_id, init_status) in local_timeline_init_statuses { + match init_status { + LocalTimelineInitStatus::LocallyComplete => { + debug!("timeline {} for tenant {} is locally complete, registering it in repository", timeline_id, tenant_id); + // Let's fail here loudly to be on the safe side. + // XXX: It may be a better api to actually distinguish between repository startup + // and processing of newly downloaded timelines. + repo.apply_timeline_remote_sync_status_update( + timeline_id, + TimelineSyncStatusUpdate::Downloaded, + ) + .with_context(|| { + format!( + "Failed to bootstrap timeline {} for tenant {}", + timeline_id, tenant_id + ) + })? + } + LocalTimelineInitStatus::NeedsSync => { + debug!( + "timeline {} for tenant {} needs sync, \ + so skipped for adding into repository until sync is finished", + timeline_id, tenant_id + ); + } + } + } + } + Ok(()) +} + +// Sets up wal redo manager and repository for tenant. Reduces code duplication. +// Used during pageserver startup, or when new tenant is attached to pageserver. +fn load_local_repo( + conf: &'static PageServerConf, + tenant_id: ZTenantId, + remote_index: &RemoteIndex, +) -> anyhow::Result> { + let mut m = tenants_state::write_tenants(); + let tenant = m.entry(tenant_id).or_insert_with(|| { + // Set up a WAL redo manager, for applying WAL records. + let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); + + // Set up an object repository, for actual data storage.
+ let repo: Arc = Arc::new(LayeredRepository::new( + conf, + TenantConfOpt::default(), + Arc::new(walredo_mgr), + tenant_id, + remote_index.clone(), + conf.remote_storage_config.is_some(), + )); + Tenant { + state: TenantState::Idle, + repo, + local_timelines: HashMap::new(), + } + }); + + // Restore tenant config + let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?; + tenant.repo.update_tenant_config(tenant_conf)?; + + Ok(Arc::clone(&tenant.repo)) +} diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index adc531e6bb..acc92bb4a2 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -2,7 +2,7 @@ //! Timeline management code // -use anyhow::{bail, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; use postgres_ffi::ControlFileData; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; @@ -106,7 +106,7 @@ impl LocalTimelineInfo { match repo_timeline { RepositoryTimeline::Loaded(_) => { let datadir_tline = - tenant_mgr::get_timeline_for_tenant_load(tenant_id, timeline_id)?; + tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?; Self::from_loaded_timeline(&datadir_tline, include_non_incremental_logical_size) } RepositoryTimeline::Unloaded { metadata } => Ok(Self::from_unloaded_timeline(metadata)), @@ -152,7 +152,7 @@ pub fn init_pageserver( if let Some(tenant_id) = create_tenant { println!("initializing tenantid {}", tenant_id); - let repo = create_repo(conf, Default::default(), tenant_id, CreateRepo::Dummy) + let repo = create_repo(conf, TenantConfOpt::default(), tenant_id, CreateRepo::Dummy) .context("failed to create repo")?; let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate); bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref()) @@ -203,9 +203,11 @@ pub fn create_repo( }; let repo_dir = conf.tenant_path(&tenant_id); - if repo_dir.exists() { - bail!("tenant {} directory already exists", tenant_id); - } + 
ensure!( + !repo_dir.exists(), + "cannot create new tenant repo: '{}' directory already exists", + tenant_id + ); // top-level dir may exist if we are creating it through CLI crashsafe_dir::create_dir_all(&repo_dir) @@ -383,7 +385,7 @@ pub(crate) fn create_timeline( repo.branch_timeline(ancestor_timeline_id, new_timeline_id, start_lsn)?; // load the timeline into memory let loaded_timeline = - tenant_mgr::get_timeline_for_tenant_load(tenant_id, new_timeline_id)?; + tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?; LocalTimelineInfo::from_loaded_timeline(&loaded_timeline, false) .context("cannot fill timeline info")? } @@ -391,7 +393,7 @@ pub(crate) fn create_timeline( bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?; // load the timeline into memory let new_timeline = - tenant_mgr::get_timeline_for_tenant_load(tenant_id, new_timeline_id)?; + tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?; LocalTimelineInfo::from_loaded_timeline(&new_timeline, false) .context("cannot fill timeline info")? } diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 357aab7221..b7a33364c9 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -184,7 +184,7 @@ fn walreceiver_main( let repo = tenant_mgr::get_repository_for_tenant(tenant_id) .with_context(|| format!("no repository found for tenant {}", tenant_id))?; let timeline = - tenant_mgr::get_timeline_for_tenant_load(tenant_id, timeline_id).with_context(|| { + tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id).with_context(|| { format!( "local timeline {} not found for tenant {}", timeline_id, tenant_id