mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 08:52:56 +00:00
pageserver: refactor Tenant/Timeline args into structs (#5053)
## Problem There are some common types that we pass into tenants and timelines as we construct them, such as remote storage and the broker client. Currently the list is small, but this is likely to grow -- the deletion queue PR (#4960) pushed some methods to the point of clippy complaining they had too many args, because of the extra deletion queue client being passed around. There are some shared objects that currently aren't passed around explicitly because they use a static `once_cell` (e.g. CONCURRENT_COMPACTIONS), but as we add more resource management and concurreny control over time, it will be more readable & testable to pass a type around in the respective Resources object, rather than to coordinate via static objects. The `Resources` structures in this PR will make it easier to add references to central coordination functions, without having to rely on statics. ## Summary of changes - For `Tenant`, the `broker_client` and `remote_storage` are bundled into `TenantSharedResources` - For `Timeline`, the `remote_client` is wrapped into `TimelineResources`. Both of these structures will get an additional deletion queue member in #4960.
This commit is contained in:
@@ -11,6 +11,7 @@ use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
|
||||
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
|
||||
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
|
||||
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
|
||||
use pageserver::tenant::TenantSharedResources;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
use tokio::time::Instant;
|
||||
use tracing::*;
|
||||
@@ -382,8 +383,10 @@ fn start_pageserver(
|
||||
|
||||
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
|
||||
conf,
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
TenantSharedResources {
|
||||
broker_client: broker_client.clone(),
|
||||
remote_storage: remote_storage.clone(),
|
||||
},
|
||||
order,
|
||||
))?;
|
||||
|
||||
|
||||
@@ -56,6 +56,7 @@ use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineUninitMark;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
use self::timeline::EvictionTaskTenantState;
|
||||
use self::timeline::TimelineResources;
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::import_datadir;
|
||||
@@ -150,6 +151,14 @@ pub const TENANT_ATTACHING_MARKER_FILENAME: &str = "attaching";
|
||||
|
||||
pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
|
||||
|
||||
/// References to shared objects that are passed into each tenant, such
|
||||
/// as the shared remote storage client and process initialization state.
|
||||
#[derive(Clone)]
|
||||
pub struct TenantSharedResources {
|
||||
pub broker_client: storage_broker::BrokerClientChannel,
|
||||
pub remote_storage: Option<GenericRemoteStorage>,
|
||||
}
|
||||
|
||||
///
|
||||
/// Tenant consists of multiple timelines. Keep them in a hash table.
|
||||
///
|
||||
@@ -389,7 +398,7 @@ impl Tenant {
|
||||
async fn timeline_init_and_sync(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
resources: TimelineResources,
|
||||
remote_startup_data: Option<RemoteStartupData>,
|
||||
local_metadata: Option<TimelineMetadata>,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
@@ -410,7 +419,7 @@ impl Tenant {
|
||||
timeline_id,
|
||||
up_to_date_metadata,
|
||||
ancestor.clone(),
|
||||
remote_client,
|
||||
resources,
|
||||
init_order,
|
||||
CreateTimelineCause::Load,
|
||||
)?;
|
||||
@@ -701,14 +710,22 @@ impl Tenant {
|
||||
.expect("just put it in above");
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
TimelineResources {
|
||||
remote_client: Some(remote_client),
|
||||
},
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
// Walk through deleted timelines, resume deletion
|
||||
@@ -763,7 +780,7 @@ impl Tenant {
|
||||
timeline_id: TimelineId,
|
||||
index_part: IndexPart,
|
||||
remote_metadata: TimelineMetadata,
|
||||
remote_client: RemoteTimelineClient,
|
||||
resources: TimelineResources,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
@@ -793,7 +810,7 @@ impl Tenant {
|
||||
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
Some(remote_client),
|
||||
resources,
|
||||
Some(RemoteStartupData {
|
||||
index_part,
|
||||
remote_metadata,
|
||||
@@ -840,8 +857,7 @@ impl Tenant {
|
||||
pub(crate) fn spawn_load(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_id: TenantId,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
resources: TenantSharedResources,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
ctx: &RequestContext,
|
||||
@@ -856,6 +872,9 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
let broker_client = resources.broker_client;
|
||||
let remote_storage = resources.remote_storage;
|
||||
|
||||
let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenant_id));
|
||||
let tenant = Tenant::new(
|
||||
TenantState::Loading,
|
||||
@@ -1241,16 +1260,9 @@ impl Tenant {
|
||||
) -> Result<(), LoadLocalTimelineError> {
|
||||
span::debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let remote_client = self.remote_storage.as_ref().map(|remote_storage| {
|
||||
RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
timeline_id,
|
||||
)
|
||||
});
|
||||
let mut resources = self.build_timeline_resources(timeline_id);
|
||||
|
||||
let (remote_startup_data, remote_client) = match remote_client {
|
||||
let (remote_startup_data, remote_client) = match resources.remote_client {
|
||||
Some(remote_client) => match remote_client.download_index_file().await {
|
||||
Ok(index_part) => {
|
||||
let index_part = match index_part {
|
||||
@@ -1338,9 +1350,10 @@ impl Tenant {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
(None, remote_client)
|
||||
(None, resources.remote_client)
|
||||
}
|
||||
};
|
||||
resources.remote_client = remote_client;
|
||||
|
||||
let ancestor = if let Some(ancestor_timeline_id) = local_metadata.ancestor_timeline() {
|
||||
let ancestor_timeline = self.get_timeline(ancestor_timeline_id, false)
|
||||
@@ -1353,7 +1366,7 @@ impl Tenant {
|
||||
|
||||
self.timeline_init_and_sync(
|
||||
timeline_id,
|
||||
remote_client,
|
||||
resources,
|
||||
remote_startup_data,
|
||||
Some(local_metadata),
|
||||
ancestor,
|
||||
@@ -2225,7 +2238,7 @@ impl Tenant {
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
resources: TimelineResources,
|
||||
init_order: Option<&InitializationOrder>,
|
||||
cause: CreateTimelineCause,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
@@ -2254,7 +2267,7 @@ impl Tenant {
|
||||
new_timeline_id,
|
||||
self.tenant_id,
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
remote_client,
|
||||
resources,
|
||||
pg_version,
|
||||
initial_logical_size_can_start.cloned(),
|
||||
initial_logical_size_attempt.cloned().flatten(),
|
||||
@@ -2902,6 +2915,23 @@ impl Tenant {
|
||||
Ok(timeline)
|
||||
}
|
||||
|
||||
/// Call this before constructing a timeline, to build its required structures
|
||||
fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
|
||||
let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
|
||||
let remote_client = RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
self.conf,
|
||||
self.tenant_id,
|
||||
timeline_id,
|
||||
);
|
||||
Some(remote_client)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
TimelineResources { remote_client }
|
||||
}
|
||||
|
||||
/// Creates intermediate timeline structure and its files.
|
||||
///
|
||||
/// An empty layer map is initialized, and new data and WAL can be imported starting
|
||||
@@ -2918,25 +2948,17 @@ impl Tenant {
|
||||
) -> anyhow::Result<UninitializedTimeline> {
|
||||
let tenant_id = self.tenant_id;
|
||||
|
||||
let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
|
||||
let remote_client = RemoteTimelineClient::new(
|
||||
remote_storage.clone(),
|
||||
self.conf,
|
||||
tenant_id,
|
||||
new_timeline_id,
|
||||
);
|
||||
let resources = self.build_timeline_resources(new_timeline_id);
|
||||
if let Some(remote_client) = &resources.remote_client {
|
||||
remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
|
||||
Some(remote_client)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
let timeline_struct = self
|
||||
.create_timeline_struct(
|
||||
new_timeline_id,
|
||||
new_metadata,
|
||||
ancestor,
|
||||
remote_client,
|
||||
resources,
|
||||
None,
|
||||
CreateTimelineCause::Load,
|
||||
)
|
||||
|
||||
@@ -29,6 +29,7 @@ use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
use super::timeline::delete::DeleteTimelineFlow;
|
||||
use super::TenantSharedResources;
|
||||
|
||||
/// The tenants known to the pageserver.
|
||||
/// The enum variants are used to distinguish the different states that the pageserver can be in.
|
||||
@@ -66,8 +67,7 @@ static TENANTS: Lazy<RwLock<TenantsMap>> = Lazy::new(|| RwLock::new(TenantsMap::
|
||||
#[instrument(skip_all)]
|
||||
pub async fn init_tenant_mgr(
|
||||
conf: &'static PageServerConf,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
resources: TenantSharedResources,
|
||||
init_order: InitializationOrder,
|
||||
) -> anyhow::Result<()> {
|
||||
// Scan local filesystem for attached tenants
|
||||
@@ -125,8 +125,7 @@ pub async fn init_tenant_mgr(
|
||||
match schedule_local_tenant_processing(
|
||||
conf,
|
||||
&tenant_dir_path,
|
||||
broker_client.clone(),
|
||||
remote_storage.clone(),
|
||||
resources.clone(),
|
||||
Some(init_order.clone()),
|
||||
&TENANTS,
|
||||
&ctx,
|
||||
@@ -162,8 +161,7 @@ pub async fn init_tenant_mgr(
|
||||
pub(crate) fn schedule_local_tenant_processing(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_path: &Path,
|
||||
broker_client: storage_broker::BrokerClientChannel,
|
||||
remote_storage: Option<GenericRemoteStorage>,
|
||||
resources: TenantSharedResources,
|
||||
init_order: Option<InitializationOrder>,
|
||||
tenants: &'static tokio::sync::RwLock<TenantsMap>,
|
||||
ctx: &RequestContext,
|
||||
@@ -200,9 +198,15 @@ pub(crate) fn schedule_local_tenant_processing(
|
||||
|
||||
let tenant = if conf.tenant_attaching_mark_file_path(&tenant_id).exists() {
|
||||
info!("tenant {tenant_id} has attaching mark file, resuming its attach operation");
|
||||
if let Some(remote_storage) = remote_storage {
|
||||
match Tenant::spawn_attach(conf, tenant_id, broker_client, tenants, remote_storage, ctx)
|
||||
{
|
||||
if let Some(remote_storage) = resources.remote_storage {
|
||||
match Tenant::spawn_attach(
|
||||
conf,
|
||||
tenant_id,
|
||||
resources.broker_client,
|
||||
tenants,
|
||||
remote_storage,
|
||||
ctx,
|
||||
) {
|
||||
Ok(tenant) => tenant,
|
||||
Err(e) => {
|
||||
error!("Failed to spawn_attach tenant {tenant_id}, reason: {e:#}");
|
||||
@@ -220,15 +224,7 @@ pub(crate) fn schedule_local_tenant_processing(
|
||||
} else {
|
||||
info!("tenant {tenant_id} is assumed to be loadable, starting load operation");
|
||||
// Start loading the tenant into memory. It will initially be in Loading state.
|
||||
Tenant::spawn_load(
|
||||
conf,
|
||||
tenant_id,
|
||||
broker_client,
|
||||
remote_storage,
|
||||
init_order,
|
||||
tenants,
|
||||
ctx,
|
||||
)
|
||||
Tenant::spawn_load(conf, tenant_id, resources, init_order, tenants, ctx)
|
||||
};
|
||||
Ok(tenant)
|
||||
}
|
||||
@@ -363,8 +359,12 @@ pub async fn create_tenant(
|
||||
// TODO: tenant directory remains on disk if we bail out from here on.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
let tenant_resources = TenantSharedResources {
|
||||
broker_client,
|
||||
remote_storage,
|
||||
};
|
||||
let created_tenant =
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, broker_client, remote_storage, None, &TENANTS, ctx)?;
|
||||
schedule_local_tenant_processing(conf, &tenant_directory, tenant_resources, None, &TENANTS, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
@@ -523,7 +523,11 @@ pub async fn load_tenant(
|
||||
.with_context(|| format!("Failed to remove tenant ignore mark {tenant_ignore_mark:?} during tenant loading"))?;
|
||||
}
|
||||
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, broker_client, remote_storage, None, &TENANTS, ctx)
|
||||
let resources = TenantSharedResources {
|
||||
broker_client,
|
||||
remote_storage,
|
||||
};
|
||||
let new_tenant = schedule_local_tenant_processing(conf, &tenant_path, resources, None, &TENANTS, ctx)
|
||||
.with_context(|| {
|
||||
format!("Failed to schedule tenant processing in path {tenant_path:?}")
|
||||
})?;
|
||||
@@ -604,7 +608,11 @@ pub async fn attach_tenant(
|
||||
.context("check for attach marker file existence")?;
|
||||
anyhow::ensure!(marker_file_exists, "create_tenant_files should have created the attach marker file");
|
||||
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, broker_client, Some(remote_storage), None, &TENANTS, ctx)?;
|
||||
let resources = TenantSharedResources {
|
||||
broker_client,
|
||||
remote_storage: Some(remote_storage),
|
||||
};
|
||||
let attached_tenant = schedule_local_tenant_processing(conf, &tenant_dir, resources, None, &TENANTS, ctx)?;
|
||||
// TODO: tenant object & its background loops remain, untracked in tenant map, if we fail here.
|
||||
// See https://github.com/neondatabase/neon/issues/4233
|
||||
|
||||
|
||||
@@ -140,6 +140,12 @@ fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
|
||||
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
|
||||
drop(rlock)
|
||||
}
|
||||
|
||||
/// The outward-facing resources required to build a Timeline
|
||||
pub struct TimelineResources {
|
||||
pub remote_client: Option<RemoteTimelineClient>,
|
||||
}
|
||||
|
||||
pub struct Timeline {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_conf: Arc<RwLock<TenantConfOpt>>,
|
||||
@@ -1374,7 +1380,7 @@ impl Timeline {
|
||||
timeline_id: TimelineId,
|
||||
tenant_id: TenantId,
|
||||
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
|
||||
remote_client: Option<RemoteTimelineClient>,
|
||||
resources: TimelineResources,
|
||||
pg_version: u32,
|
||||
initial_logical_size_can_start: Option<completion::Barrier>,
|
||||
initial_logical_size_attempt: Option<completion::Completion>,
|
||||
@@ -1409,7 +1415,7 @@ impl Timeline {
|
||||
walredo_mgr,
|
||||
walreceiver: Mutex::new(None),
|
||||
|
||||
remote_client: remote_client.map(Arc::new),
|
||||
remote_client: resources.remote_client.map(Arc::new),
|
||||
|
||||
// initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
|
||||
last_record_lsn: SeqWait::new(RecordLsn {
|
||||
|
||||
@@ -25,7 +25,7 @@ use crate::{
|
||||
InitializationOrder,
|
||||
};
|
||||
|
||||
use super::Timeline;
|
||||
use super::{Timeline, TimelineResources};
|
||||
|
||||
/// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
|
||||
@@ -416,7 +416,7 @@ impl DeleteTimelineFlow {
|
||||
timeline_id,
|
||||
local_metadata,
|
||||
None, // Ancestor is not needed for deletion.
|
||||
remote_client,
|
||||
TimelineResources { remote_client },
|
||||
init_order,
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
// Thus we need to skip the validation here.
|
||||
|
||||
Reference in New Issue
Block a user