mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 17:40:37 +00:00
Properly populate local timeline map
This commit is contained in:
committed by
Kirill Bulatov
parent
5c5c3c64f3
commit
4a46b01caf
@@ -10,10 +10,7 @@ use daemonize::Daemonize;
|
||||
|
||||
use pageserver::{
|
||||
config::{defaults::*, PageServerConf},
|
||||
http, page_cache, page_service, profiling,
|
||||
remote_storage::{self, SyncStartupData},
|
||||
repository::{Repository, TimelineSyncStatusUpdate},
|
||||
tenant_mgr, thread_mgr,
|
||||
http, page_cache, page_service, profiling, tenant_mgr, thread_mgr,
|
||||
thread_mgr::ThreadKind,
|
||||
timelines, virtual_file, LOG_FILE_NAME,
|
||||
};
|
||||
@@ -235,47 +232,8 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
|
||||
let signals = signals::install_shutdown_handlers()?;
|
||||
|
||||
// Initialize repositories with locally available timelines.
|
||||
// Timelines that are only partially available locally (remote storage has more data than this pageserver)
|
||||
// are scheduled for download and added to the repository once download is completed.
|
||||
let SyncStartupData {
|
||||
remote_index,
|
||||
local_timeline_init_statuses,
|
||||
} = remote_storage::start_local_timeline_sync(conf)
|
||||
.context("Failed to set up local files sync with external storage")?;
|
||||
|
||||
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
|
||||
// initialize local tenant
|
||||
let repo = tenant_mgr::load_local_repo(conf, tenant_id, &remote_index)
|
||||
.with_context(|| format!("Failed to load repo for tenant {}", tenant_id))?;
|
||||
for (timeline_id, init_status) in local_timeline_init_statuses {
|
||||
match init_status {
|
||||
remote_storage::LocalTimelineInitStatus::LocallyComplete => {
|
||||
debug!("timeline {} for tenant {} is locally complete, registering it in repository", timeline_id, tenant_id);
|
||||
// Lets fail here loudly to be on the safe side.
|
||||
// XXX: It may be a better api to actually distinguish between repository startup
|
||||
// and processing of newly downloaded timelines.
|
||||
repo.apply_timeline_remote_sync_status_update(
|
||||
timeline_id,
|
||||
TimelineSyncStatusUpdate::Downloaded,
|
||||
)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to bootstrap timeline {} for tenant {}",
|
||||
timeline_id, tenant_id
|
||||
)
|
||||
})?
|
||||
}
|
||||
remote_storage::LocalTimelineInitStatus::NeedsSync => {
|
||||
debug!(
|
||||
"timeline {} for tenant {} needs sync, \
|
||||
so skipped for adding into repository until sync is finished",
|
||||
tenant_id, timeline_id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// start profiler (if enabled)
|
||||
let profiler_guard = profiling::init_profiler(conf);
|
||||
|
||||
// initialize authentication for incoming connections
|
||||
let auth = match &conf.auth_type {
|
||||
@@ -288,8 +246,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<()
|
||||
};
|
||||
info!("Using auth: {:#?}", conf.auth_type);
|
||||
|
||||
// start profiler (if enabled)
|
||||
let profiler_guard = profiling::init_profiler(conf);
|
||||
let remote_index = tenant_mgr::init_tenant_mgr(conf)?;
|
||||
|
||||
// Spawn a new thread for the http endpoint
|
||||
// bind before launching separate thread so the error reported before startup exits
|
||||
|
||||
@@ -244,7 +244,7 @@ async fn timeline_attach_handler(request: Request<Body>) -> Result<Response<Body
|
||||
);
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
if tenant_mgr::get_timeline_for_tenant_load(tenant_id, timeline_id).is_ok() {
|
||||
if tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id).is_ok() {
|
||||
// TODO: maybe answer with 309 Not Modified here?
|
||||
anyhow::bail!("Timeline is already present locally")
|
||||
};
|
||||
@@ -365,7 +365,7 @@ async fn tenant_list_handler(request: Request<Body>) -> Result<Response<Body>, A
|
||||
crate::tenant_mgr::list_tenants()
|
||||
})
|
||||
.await
|
||||
.map_err(ApiError::from_err)??;
|
||||
.map_err(ApiError::from_err)?;
|
||||
|
||||
json_response(StatusCode::OK, response_data)
|
||||
}
|
||||
@@ -377,7 +377,7 @@ async fn tenant_create_handler(mut request: Request<Body>) -> Result<Response<Bo
|
||||
let request_data: TenantCreateRequest = json_request(&mut request).await?;
|
||||
let remote_index = get_state(&request).remote_index.clone();
|
||||
|
||||
let mut tenant_conf: TenantConfOpt = Default::default();
|
||||
let mut tenant_conf = TenantConfOpt::default();
|
||||
if let Some(gc_period) = request_data.gc_period {
|
||||
tenant_conf.gc_period =
|
||||
Some(humantime::parse_duration(&gc_period).map_err(ApiError::from_err)?);
|
||||
|
||||
@@ -1814,7 +1814,7 @@ impl LayeredTimeline {
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
|
||||
// Define partitioning schema if needed
|
||||
if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)
|
||||
if let Ok(pgdir) = tenant_mgr::get_local_timeline_with_load(self.tenantid, self.timelineid)
|
||||
{
|
||||
let (partitioning, lsn) = pgdir.repartition(
|
||||
self.get_last_record_lsn(),
|
||||
|
||||
@@ -326,7 +326,7 @@ impl PageServerHandler {
|
||||
let _enter = info_span!("pagestream", timeline = %timelineid, tenant = %tenantid).entered();
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
|
||||
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
|
||||
.context("Cannot load local timeline")?;
|
||||
|
||||
/* switch client to COPYBOTH */
|
||||
@@ -522,7 +522,7 @@ impl PageServerHandler {
|
||||
info!("starting");
|
||||
|
||||
// check that the timeline exists
|
||||
let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
|
||||
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
|
||||
.context("Cannot load local timeline")?;
|
||||
let latest_gc_cutoff_lsn = timeline.tline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
@@ -656,7 +656,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
info_span!("callmemaybe", timeline = %timelineid, tenant = %tenantid).entered();
|
||||
|
||||
// Check that the timeline exists
|
||||
tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
|
||||
tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
|
||||
.context("Cannot load local timeline")?;
|
||||
|
||||
walreceiver::launch_wal_receiver(self.conf, tenantid, timelineid, &connstr)?;
|
||||
@@ -768,7 +768,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
|
||||
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
|
||||
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
|
||||
let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
|
||||
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
|
||||
.context("Couldn't load timeline")?;
|
||||
timeline.tline.compact()?;
|
||||
|
||||
@@ -787,7 +787,7 @@ impl postgres_backend::Handler for PageServerHandler {
|
||||
let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?;
|
||||
let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?;
|
||||
|
||||
let timeline = tenant_mgr::get_timeline_for_tenant_load(tenantid, timelineid)
|
||||
let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid)
|
||||
.context("Cannot load local timeline")?;
|
||||
|
||||
timeline.tline.checkpoint(CheckpointConfig::Forced)?;
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::layered_repository::LayeredRepository;
|
||||
use crate::remote_storage::RemoteIndex;
|
||||
use crate::remote_storage::{self, LocalTimelineInitStatus, RemoteIndex, SyncStartupData};
|
||||
use crate::repository::{Repository, TimelineSyncStatusUpdate};
|
||||
use crate::tenant_config::TenantConfOpt;
|
||||
use crate::thread_mgr;
|
||||
@@ -12,26 +12,54 @@ use crate::timelines;
|
||||
use crate::timelines::CreateRepo;
|
||||
use crate::walredo::PostgresRedoManager;
|
||||
use crate::{DatadirTimelineImpl, RepositoryImpl};
|
||||
use anyhow::{Context, Result};
|
||||
use lazy_static::lazy_static;
|
||||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::sync::Arc;
|
||||
use tracing::*;
|
||||
|
||||
use utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
lazy_static! {
|
||||
static ref TENANTS: Mutex<HashMap<ZTenantId, Tenant>> = Mutex::new(HashMap::new());
|
||||
mod tenants_state {
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
|
||||
};
|
||||
|
||||
use utils::zid::ZTenantId;
|
||||
|
||||
use crate::tenant_mgr::Tenant;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref TENANTS: RwLock<HashMap<ZTenantId, Tenant>> = RwLock::new(HashMap::new());
|
||||
}
|
||||
|
||||
pub(super) fn read_tenants() -> RwLockReadGuard<'static, HashMap<ZTenantId, Tenant>> {
|
||||
TENANTS
|
||||
.read()
|
||||
.expect("Failed to read() tenants lock, it got poisoned")
|
||||
}
|
||||
|
||||
pub(super) fn write_tenants() -> RwLockWriteGuard<'static, HashMap<ZTenantId, Tenant>> {
|
||||
TENANTS
|
||||
.write()
|
||||
.expect("Failed to write() tenants lock, it got poisoned")
|
||||
}
|
||||
}
|
||||
|
||||
struct Tenant {
|
||||
state: TenantState,
|
||||
/// Contains in-memory state, including the timeline that might not yet flushed on disk or loaded form disk.
|
||||
repo: Arc<RepositoryImpl>,
|
||||
|
||||
timelines: HashMap<ZTimelineId, Arc<DatadirTimelineImpl>>,
|
||||
/// Timelines, located locally in the pageserver's datadir.
|
||||
/// Whatever manipulations happen, local timelines are not removed, only incremented with files.
|
||||
///
|
||||
/// Local timelines have more metadata that's loaded into memory,
|
||||
/// that is located in the `repo.timelines` field, [`crate::layered_repository::LayeredTimelineEntry`].
|
||||
local_timelines: HashMap<ZTimelineId, Arc<DatadirTimelineImpl>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -60,43 +88,17 @@ impl fmt::Display for TenantState {
|
||||
}
|
||||
}
|
||||
|
||||
fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {
|
||||
TENANTS.lock().unwrap()
|
||||
}
|
||||
|
||||
// Sets up wal redo manager and repository for tenant. Reduces code duplication.
|
||||
// Used during pageserver startup, or when new tenant is attached to pageserver.
|
||||
pub fn load_local_repo(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: ZTenantId,
|
||||
remote_index: &RemoteIndex,
|
||||
) -> Result<Arc<RepositoryImpl>> {
|
||||
let mut m = access_tenants();
|
||||
let tenant = m.entry(tenant_id).or_insert_with(|| {
|
||||
// Set up a WAL redo manager, for applying WAL records.
|
||||
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
|
||||
|
||||
// Set up an object repository, for actual data storage.
|
||||
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
|
||||
conf,
|
||||
Default::default(),
|
||||
Arc::new(walredo_mgr),
|
||||
tenant_id,
|
||||
remote_index.clone(),
|
||||
conf.remote_storage_config.is_some(),
|
||||
));
|
||||
Tenant {
|
||||
state: TenantState::Idle,
|
||||
repo,
|
||||
timelines: HashMap::new(),
|
||||
}
|
||||
});
|
||||
|
||||
// Restore tenant config
|
||||
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
|
||||
tenant.repo.update_tenant_config(tenant_conf)?;
|
||||
|
||||
Ok(Arc::clone(&tenant.repo))
|
||||
/// Initialize repositories with locally available timelines.
|
||||
/// Timelines that are only partially available locally (remote storage has more data than this pageserver)
|
||||
/// are scheduled for download and added to the repository once download is completed.
|
||||
pub fn init_tenant_mgr(conf: &'static PageServerConf) -> anyhow::Result<RemoteIndex> {
|
||||
let SyncStartupData {
|
||||
remote_index,
|
||||
local_timeline_init_statuses,
|
||||
} = remote_storage::start_local_timeline_sync(conf)
|
||||
.context("Failed to set up local files sync with external storage")?;
|
||||
init_local_repositories(conf, local_timeline_init_statuses, &remote_index)?;
|
||||
Ok(remote_index)
|
||||
}
|
||||
|
||||
/// Updates tenants' repositories, changing their timelines state in memory.
|
||||
@@ -113,32 +115,28 @@ pub fn apply_timeline_sync_status_updates(
|
||||
"Applying sync status updates for {} timelines",
|
||||
sync_status_updates.len()
|
||||
);
|
||||
trace!("Sync status updates: {:?}", sync_status_updates);
|
||||
debug!("Sync status updates: {sync_status_updates:?}");
|
||||
|
||||
for (tenant_id, tenant_timelines_sync_status_updates) in sync_status_updates {
|
||||
for (tenant_id, status_updates) in sync_status_updates {
|
||||
let repo = match load_local_repo(conf, tenant_id, remote_index) {
|
||||
Ok(repo) => repo,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to load repo for tenant {} Error: {:#}",
|
||||
tenant_id, e
|
||||
);
|
||||
error!("Failed to load repo for tenant {tenant_id} Error: {e:?}",);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for (timeline_id, timeline_sync_status_update) in tenant_timelines_sync_status_updates {
|
||||
match repo.apply_timeline_remote_sync_status_update(timeline_id, timeline_sync_status_update)
|
||||
for (timeline_id, status_update) in status_updates {
|
||||
match repo.apply_timeline_remote_sync_status_update(timeline_id, status_update)
|
||||
{
|
||||
Ok(_) => debug!(
|
||||
"successfully applied timeline sync status update: {} -> {}",
|
||||
timeline_id, timeline_sync_status_update
|
||||
),
|
||||
Ok(()) => debug!("successfully applied timeline sync status update: {timeline_id} -> {status_update}"),
|
||||
Err(e) => error!(
|
||||
"Failed to apply timeline sync status update for tenant {}. timeline {} update {} Error: {:#}",
|
||||
tenant_id, timeline_id, timeline_sync_status_update, e
|
||||
"Failed to apply timeline sync status update for tenant {tenant_id}. timeline {timeline_id} update {status_update} Error: {e:?}"
|
||||
),
|
||||
}
|
||||
match status_update {
|
||||
TimelineSyncStatusUpdate::Downloaded => todo!("TODO kb "),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -147,7 +145,7 @@ pub fn apply_timeline_sync_status_updates(
|
||||
/// Shut down all tenants. This runs as part of pageserver shutdown.
|
||||
///
|
||||
pub fn shutdown_all_tenants() {
|
||||
let mut m = access_tenants();
|
||||
let mut m = tenants_state::write_tenants();
|
||||
let mut tenantids = Vec::new();
|
||||
for (tenantid, tenant) in m.iter_mut() {
|
||||
tenant.state = TenantState::Stopping;
|
||||
@@ -167,22 +165,16 @@ pub fn shutdown_all_tenants() {
|
||||
// should be no more activity in any of the repositories.
|
||||
//
|
||||
// On error, log it but continue with the shutdown for other tenants.
|
||||
for tenantid in tenantids {
|
||||
debug!("shutdown tenant {}", tenantid);
|
||||
match get_repository_for_tenant(tenantid) {
|
||||
for tenant_id in tenantids {
|
||||
debug!("shutdown tenant {tenant_id}");
|
||||
match get_repository_for_tenant(tenant_id) {
|
||||
Ok(repo) => {
|
||||
if let Err(err) = repo.checkpoint() {
|
||||
error!(
|
||||
"Could not checkpoint tenant {} during shutdown: {:?}",
|
||||
tenantid, err
|
||||
);
|
||||
error!("Could not checkpoint tenant {tenant_id} during shutdown: {err:?}");
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!(
|
||||
"Could not get repository for tenant {} during shutdown: {:?}",
|
||||
tenantid, err
|
||||
);
|
||||
error!("Could not get repository for tenant {tenant_id} during shutdown: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -191,20 +183,20 @@ pub fn shutdown_all_tenants() {
|
||||
pub fn create_tenant_repository(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: TenantConfOpt,
|
||||
tenantid: ZTenantId,
|
||||
tenant_id: ZTenantId,
|
||||
remote_index: RemoteIndex,
|
||||
) -> Result<Option<ZTenantId>> {
|
||||
match access_tenants().entry(tenantid) {
|
||||
) -> anyhow::Result<Option<ZTenantId>> {
|
||||
match tenants_state::write_tenants().entry(tenant_id) {
|
||||
Entry::Occupied(_) => {
|
||||
debug!("tenant {} already exists", tenantid);
|
||||
debug!("tenant {tenant_id} already exists");
|
||||
Ok(None)
|
||||
}
|
||||
Entry::Vacant(v) => {
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid));
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
|
||||
let repo = timelines::create_repo(
|
||||
conf,
|
||||
tenant_conf,
|
||||
tenantid,
|
||||
tenant_id,
|
||||
CreateRepo::Real {
|
||||
wal_redo_manager,
|
||||
remote_index,
|
||||
@@ -213,36 +205,39 @@ pub fn create_tenant_repository(
|
||||
v.insert(Tenant {
|
||||
state: TenantState::Idle,
|
||||
repo,
|
||||
timelines: HashMap::new(),
|
||||
local_timelines: HashMap::new(),
|
||||
});
|
||||
Ok(Some(tenantid))
|
||||
Ok(Some(tenant_id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_tenant_config(tenant_conf: TenantConfOpt, tenantid: ZTenantId) -> Result<()> {
|
||||
info!("configuring tenant {}", tenantid);
|
||||
let repo = get_repository_for_tenant(tenantid)?;
|
||||
pub fn update_tenant_config(
|
||||
tenant_conf: TenantConfOpt,
|
||||
tenant_id: ZTenantId,
|
||||
) -> anyhow::Result<()> {
|
||||
info!("configuring tenant {tenant_id}");
|
||||
let repo = get_repository_for_tenant(tenant_id)?;
|
||||
|
||||
repo.update_tenant_config(tenant_conf)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_tenant_state(tenantid: ZTenantId) -> Option<TenantState> {
|
||||
Some(access_tenants().get(&tenantid)?.state)
|
||||
Some(tenants_state::read_tenants().get(&tenantid)?.state)
|
||||
}
|
||||
|
||||
///
|
||||
/// Change the state of a tenant to Active and launch its compactor and GC
|
||||
/// threads. If the tenant was already in Active state or Stopping, does nothing.
|
||||
///
|
||||
pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
|
||||
let mut m = access_tenants();
|
||||
pub fn activate_tenant(tenant_id: ZTenantId) -> anyhow::Result<()> {
|
||||
let mut m = tenants_state::write_tenants();
|
||||
let tenant = m
|
||||
.get_mut(&tenant_id)
|
||||
.with_context(|| format!("Tenant not found for id {}", tenant_id))?;
|
||||
.with_context(|| format!("Tenant not found for id {tenant_id}"))?;
|
||||
|
||||
info!("activating tenant {}", tenant_id);
|
||||
info!("activating tenant {tenant_id}");
|
||||
|
||||
match tenant.state {
|
||||
// If the tenant is already active, nothing to do.
|
||||
@@ -267,13 +262,10 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
|
||||
true,
|
||||
move || crate::tenant_threads::gc_loop(tenant_id),
|
||||
)
|
||||
.with_context(|| format!("Failed to launch GC thread for tenant {}", tenant_id));
|
||||
.with_context(|| format!("Failed to launch GC thread for tenant {tenant_id}"));
|
||||
|
||||
if let Err(e) = &gc_spawn_result {
|
||||
error!(
|
||||
"Failed to start GC thread for tenant {}, stopping its checkpointer thread: {:?}",
|
||||
tenant_id, e
|
||||
);
|
||||
error!("Failed to start GC thread for tenant {tenant_id}, stopping its checkpointer thread: {e:?}");
|
||||
thread_mgr::shutdown_threads(Some(ThreadKind::Compactor), Some(tenant_id), None);
|
||||
return gc_spawn_result;
|
||||
}
|
||||
@@ -287,39 +279,42 @@ pub fn activate_tenant(tenant_id: ZTenantId) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result<Arc<RepositoryImpl>> {
|
||||
let m = access_tenants();
|
||||
pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<RepositoryImpl>> {
|
||||
let m = tenants_state::read_tenants();
|
||||
let tenant = m
|
||||
.get(&tenantid)
|
||||
.with_context(|| format!("Tenant {} not found", tenantid))?;
|
||||
.get(&tenant_id)
|
||||
.with_context(|| format!("Tenant {tenant_id} not found"))?;
|
||||
|
||||
Ok(Arc::clone(&tenant.repo))
|
||||
}
|
||||
|
||||
// Retrieve timeline for tenant. Load it into memory if it is not already loaded
|
||||
pub fn get_timeline_for_tenant_load(
|
||||
tenantid: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
) -> Result<Arc<DatadirTimelineImpl>> {
|
||||
let mut m = access_tenants();
|
||||
/// Retrieves local timeline for tenant.
|
||||
/// Loads it into memory if it is not already loaded.
|
||||
pub fn get_local_timeline_with_load(
|
||||
tenant_id: ZTenantId,
|
||||
timeline_id: ZTimelineId,
|
||||
) -> anyhow::Result<Arc<DatadirTimelineImpl>> {
|
||||
let mut m = tenants_state::write_tenants();
|
||||
let tenant = m
|
||||
.get_mut(&tenantid)
|
||||
.with_context(|| format!("Tenant {} not found", tenantid))?;
|
||||
.get_mut(&tenant_id)
|
||||
.with_context(|| format!("Tenant {tenant_id} not found"))?;
|
||||
|
||||
if let Some(page_tline) = tenant.timelines.get(&timelineid) {
|
||||
if let Some(page_tline) = tenant.local_timelines.get(&timeline_id) {
|
||||
return Ok(Arc::clone(page_tline));
|
||||
}
|
||||
// First access to this timeline. Create a DatadirTimeline wrapper for it
|
||||
let tline = tenant
|
||||
.repo
|
||||
.get_timeline_load(timelineid)
|
||||
.with_context(|| format!("Timeline {} not found for tenant {}", timelineid, tenantid))?;
|
||||
.get_timeline_load(timeline_id)
|
||||
.with_context(|| format!("Timeline {timeline_id} not found for tenant {tenant_id}"))?;
|
||||
|
||||
let repartition_distance = tenant.repo.get_checkpoint_distance() / 10;
|
||||
|
||||
let page_tline = Arc::new(DatadirTimelineImpl::new(tline, repartition_distance));
|
||||
page_tline.init_logical_size()?;
|
||||
tenant.timelines.insert(timelineid, Arc::clone(&page_tline));
|
||||
tenant
|
||||
.local_timelines
|
||||
.insert(timeline_id, Arc::clone(&page_tline));
|
||||
Ok(page_tline)
|
||||
}
|
||||
|
||||
@@ -331,15 +326,87 @@ pub struct TenantInfo {
|
||||
pub state: TenantState,
|
||||
}
|
||||
|
||||
pub fn list_tenants() -> Result<Vec<TenantInfo>> {
|
||||
access_tenants()
|
||||
pub fn list_tenants() -> Vec<TenantInfo> {
|
||||
tenants_state::read_tenants()
|
||||
.iter()
|
||||
.map(|v| {
|
||||
let (id, tenant) = v;
|
||||
Ok(TenantInfo {
|
||||
id: *id,
|
||||
state: tenant.state,
|
||||
})
|
||||
.map(|(id, tenant)| TenantInfo {
|
||||
id: *id,
|
||||
state: tenant.state,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn init_local_repositories(
|
||||
conf: &'static PageServerConf,
|
||||
local_timeline_init_statuses: HashMap<ZTenantId, HashMap<ZTimelineId, LocalTimelineInitStatus>>,
|
||||
remote_index: &RemoteIndex,
|
||||
) -> anyhow::Result<(), anyhow::Error> {
|
||||
for (tenant_id, local_timeline_init_statuses) in local_timeline_init_statuses {
|
||||
// initialize local tenant
|
||||
let repo = load_local_repo(conf, tenant_id, remote_index)
|
||||
.with_context(|| format!("Failed to load repo for tenant {}", tenant_id))?;
|
||||
for (timeline_id, init_status) in local_timeline_init_statuses {
|
||||
match init_status {
|
||||
LocalTimelineInitStatus::LocallyComplete => {
|
||||
debug!("timeline {} for tenant {} is locally complete, registering it in repository", timeline_id, tenant_id);
|
||||
// Lets fail here loudly to be on the safe side.
|
||||
// XXX: It may be a better api to actually distinguish between repository startup
|
||||
// and processing of newly downloaded timelines.
|
||||
repo.apply_timeline_remote_sync_status_update(
|
||||
timeline_id,
|
||||
TimelineSyncStatusUpdate::Downloaded,
|
||||
)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to bootstrap timeline {} for tenant {}",
|
||||
timeline_id, tenant_id
|
||||
)
|
||||
})?
|
||||
}
|
||||
LocalTimelineInitStatus::NeedsSync => {
|
||||
debug!(
|
||||
"timeline {} for tenant {} needs sync, \
|
||||
so skipped for adding into repository until sync is finished",
|
||||
tenant_id, timeline_id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Sets up wal redo manager and repository for tenant. Reduces code duplication.
|
||||
// Used during pageserver startup, or when new tenant is attached to pageserver.
|
||||
fn load_local_repo(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: ZTenantId,
|
||||
remote_index: &RemoteIndex,
|
||||
) -> anyhow::Result<Arc<RepositoryImpl>> {
|
||||
let mut m = tenants_state::write_tenants();
|
||||
let tenant = m.entry(tenant_id).or_insert_with(|| {
|
||||
// Set up a WAL redo manager, for applying WAL records.
|
||||
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
|
||||
|
||||
// Set up an object repository, for actual data storage.
|
||||
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
|
||||
conf,
|
||||
TenantConfOpt::default(),
|
||||
Arc::new(walredo_mgr),
|
||||
tenant_id,
|
||||
remote_index.clone(),
|
||||
conf.remote_storage_config.is_some(),
|
||||
));
|
||||
Tenant {
|
||||
state: TenantState::Idle,
|
||||
repo,
|
||||
local_timelines: HashMap::new(),
|
||||
}
|
||||
});
|
||||
|
||||
// Restore tenant config
|
||||
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
|
||||
tenant.repo.update_tenant_config(tenant_conf)?;
|
||||
|
||||
Ok(Arc::clone(&tenant.repo))
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
//! Timeline management code
|
||||
//
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use anyhow::{bail, ensure, Context, Result};
|
||||
use postgres_ffi::ControlFileData;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{serde_as, DisplayFromStr};
|
||||
@@ -106,7 +106,7 @@ impl LocalTimelineInfo {
|
||||
match repo_timeline {
|
||||
RepositoryTimeline::Loaded(_) => {
|
||||
let datadir_tline =
|
||||
tenant_mgr::get_timeline_for_tenant_load(tenant_id, timeline_id)?;
|
||||
tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id)?;
|
||||
Self::from_loaded_timeline(&datadir_tline, include_non_incremental_logical_size)
|
||||
}
|
||||
RepositoryTimeline::Unloaded { metadata } => Ok(Self::from_unloaded_timeline(metadata)),
|
||||
@@ -152,7 +152,7 @@ pub fn init_pageserver(
|
||||
|
||||
if let Some(tenant_id) = create_tenant {
|
||||
println!("initializing tenantid {}", tenant_id);
|
||||
let repo = create_repo(conf, Default::default(), tenant_id, CreateRepo::Dummy)
|
||||
let repo = create_repo(conf, TenantConfOpt::default(), tenant_id, CreateRepo::Dummy)
|
||||
.context("failed to create repo")?;
|
||||
let new_timeline_id = initial_timeline_id.unwrap_or_else(ZTimelineId::generate);
|
||||
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())
|
||||
@@ -203,9 +203,11 @@ pub fn create_repo(
|
||||
};
|
||||
|
||||
let repo_dir = conf.tenant_path(&tenant_id);
|
||||
if repo_dir.exists() {
|
||||
bail!("tenant {} directory already exists", tenant_id);
|
||||
}
|
||||
ensure!(
|
||||
repo_dir.exists(),
|
||||
"cannot create new tenant repo: '{}' directory already exists",
|
||||
tenant_id
|
||||
);
|
||||
|
||||
// top-level dir may exist if we are creating it through CLI
|
||||
crashsafe_dir::create_dir_all(&repo_dir)
|
||||
@@ -383,7 +385,7 @@ pub(crate) fn create_timeline(
|
||||
repo.branch_timeline(ancestor_timeline_id, new_timeline_id, start_lsn)?;
|
||||
// load the timeline into memory
|
||||
let loaded_timeline =
|
||||
tenant_mgr::get_timeline_for_tenant_load(tenant_id, new_timeline_id)?;
|
||||
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
|
||||
LocalTimelineInfo::from_loaded_timeline(&loaded_timeline, false)
|
||||
.context("cannot fill timeline info")?
|
||||
}
|
||||
@@ -391,7 +393,7 @@ pub(crate) fn create_timeline(
|
||||
bootstrap_timeline(conf, tenant_id, new_timeline_id, repo.as_ref())?;
|
||||
// load the timeline into memory
|
||||
let new_timeline =
|
||||
tenant_mgr::get_timeline_for_tenant_load(tenant_id, new_timeline_id)?;
|
||||
tenant_mgr::get_local_timeline_with_load(tenant_id, new_timeline_id)?;
|
||||
LocalTimelineInfo::from_loaded_timeline(&new_timeline, false)
|
||||
.context("cannot fill timeline info")?
|
||||
}
|
||||
|
||||
@@ -184,7 +184,7 @@ fn walreceiver_main(
|
||||
let repo = tenant_mgr::get_repository_for_tenant(tenant_id)
|
||||
.with_context(|| format!("no repository found for tenant {}", tenant_id))?;
|
||||
let timeline =
|
||||
tenant_mgr::get_timeline_for_tenant_load(tenant_id, timeline_id).with_context(|| {
|
||||
tenant_mgr::get_local_timeline_with_load(tenant_id, timeline_id).with_context(|| {
|
||||
format!(
|
||||
"local timeline {} not found for tenant {}",
|
||||
timeline_id, tenant_id
|
||||
|
||||
Reference in New Issue
Block a user