mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 14:10:37 +00:00
pageserver: propagate secondary config into downloader
This commit is contained in:
@@ -466,14 +466,17 @@ pub async fn init_tenant_mgr(
|
||||
*gen
|
||||
} else {
|
||||
match &location_conf.mode {
|
||||
LocationMode::Secondary(_) => {
|
||||
LocationMode::Secondary(secondary_config) => {
|
||||
// We do not require the control plane's permission for secondary mode
|
||||
// tenants, because they do no remote writes and hence require no
|
||||
// generation number
|
||||
info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
|
||||
tenants.insert(
|
||||
tenant_shard_id,
|
||||
TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id)),
|
||||
TenantSlot::Secondary(SecondaryTenant::new(
|
||||
tenant_shard_id,
|
||||
secondary_config,
|
||||
)),
|
||||
);
|
||||
}
|
||||
LocationMode::Attached(_) => {
|
||||
@@ -892,10 +895,15 @@ impl TenantManager {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
info!("configuring tenant location to state {new_location_config:?}");
|
||||
|
||||
enum FastPathModified {
|
||||
Attached(Arc<Tenant>),
|
||||
Secondary(Arc<SecondaryTenant>),
|
||||
}
|
||||
|
||||
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
|
||||
// then we do not need to set the slot to InProgress, we can just call into the
|
||||
// existng tenant.
|
||||
let modify_tenant = {
|
||||
let fast_path_taken = {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
let peek_slot =
|
||||
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
|
||||
@@ -909,12 +917,19 @@ impl TenantManager {
|
||||
new_location_config.clone(),
|
||||
)?);
|
||||
|
||||
Some(tenant.clone())
|
||||
Some(FastPathModified::Attached(tenant.clone()))
|
||||
} else {
|
||||
// Different generations, fall through to general case
|
||||
None
|
||||
}
|
||||
}
|
||||
(
|
||||
LocationMode::Secondary(secondary_conf),
|
||||
Some(TenantSlot::Secondary(secondary_tenant)),
|
||||
) => {
|
||||
secondary_tenant.set_config(secondary_conf);
|
||||
Some(FastPathModified::Secondary(secondary_tenant.clone()))
|
||||
}
|
||||
_ => {
|
||||
// Not an Attached->Attached transition, fall through to general case
|
||||
None
|
||||
@@ -923,34 +938,46 @@ impl TenantManager {
|
||||
};
|
||||
|
||||
// Fast-path continued: having dropped out of the self.tenants lock, do the async
|
||||
// phase of waiting for flush, before returning.
|
||||
if let Some(tenant) = modify_tenant {
|
||||
// Transition to AttachedStale means we may well hold a valid generation
|
||||
// still, and have been requested to go stale as part of a migration. If
|
||||
// the caller set `flush`, then flush to remote storage.
|
||||
if let LocationMode::Attached(AttachedLocationConfig {
|
||||
generation: _,
|
||||
attach_mode: AttachmentMode::Stale,
|
||||
}) = &new_location_config.mode
|
||||
{
|
||||
if let Some(flush_timeout) = flush {
|
||||
match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
|
||||
Ok(Err(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
Ok(Ok(_)) => return Ok(()),
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
// phase of writing config and/or waiting for flush, before returning.
|
||||
match fast_path_taken {
|
||||
Some(FastPathModified::Attached(tenant)) => {
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
// Transition to AttachedStale means we may well hold a valid generation
|
||||
// still, and have been requested to go stale as part of a migration. If
|
||||
// the caller set `flush`, then flush to remote storage.
|
||||
if let LocationMode::Attached(AttachedLocationConfig {
|
||||
generation: _,
|
||||
attach_mode: AttachmentMode::Stale,
|
||||
}) = &new_location_config.mode
|
||||
{
|
||||
if let Some(flush_timeout) = flush {
|
||||
match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
|
||||
Ok(Err(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
Ok(Ok(_)) => return Ok(()),
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
timeout_ms = flush_timeout.as_millis(),
|
||||
"Timed out waiting for flush to remote storage, proceeding anyway."
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
Some(FastPathModified::Secondary(_secondary_tenant)) => {
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
}
|
||||
None => {}
|
||||
};
|
||||
|
||||
// General case for upserts to TenantsMap, excluding the case above: we will substitute an
|
||||
// InProgress value to the slot while we make whatever changes are required. The state for
|
||||
@@ -1006,7 +1033,7 @@ impl TenantManager {
|
||||
let timelines_path = self.conf.timelines_path(&tenant_shard_id);
|
||||
|
||||
let new_slot = match &new_location_config.mode {
|
||||
LocationMode::Secondary(_) => {
|
||||
LocationMode::Secondary(secondary_config) => {
|
||||
// Directory doesn't need to be fsync'd because if we crash it can
|
||||
// safely be recreated next time this tenant location is configured.
|
||||
tokio::fs::create_dir_all(&timelines_path)
|
||||
@@ -1017,7 +1044,7 @@ impl TenantManager {
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id))
|
||||
TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id, secondary_config))
|
||||
}
|
||||
LocationMode::Attached(_attach_config) => {
|
||||
// Directory doesn't need to be fsync'd because we do not depend on
|
||||
|
||||
@@ -18,6 +18,7 @@ use self::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
config::SecondaryLocationConfig,
|
||||
mgr::TenantManager,
|
||||
storage_layer::{AsLayerDesc, Layer},
|
||||
timeline::DiskUsageEvictionInfo,
|
||||
@@ -85,12 +86,13 @@ pub(crate) struct SecondaryTenant {
|
||||
pub(crate) gate: Gate,
|
||||
|
||||
detail: std::sync::Mutex<SecondaryDetail>,
|
||||
// TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
|
||||
}
|
||||
|
||||
impl SecondaryTenant {
|
||||
pub(crate) fn new(tenant_shard_id: TenantShardId) -> Arc<Self> {
|
||||
// TODO; consider whether we really need to Arc this
|
||||
pub(crate) fn new(
|
||||
tenant_shard_id: TenantShardId,
|
||||
config: &SecondaryLocationConfig,
|
||||
) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
tenant_shard_id,
|
||||
// todo: shall we make this a descendent of the
|
||||
@@ -100,7 +102,7 @@ impl SecondaryTenant {
|
||||
cancel: CancellationToken::new(),
|
||||
gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")),
|
||||
|
||||
detail: std::sync::Mutex::default(),
|
||||
detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -111,6 +113,10 @@ impl SecondaryTenant {
|
||||
self.gate.close().await;
|
||||
}
|
||||
|
||||
pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
|
||||
self.detail.lock().unwrap().config = config.clone();
|
||||
}
|
||||
|
||||
pub(crate) async fn get_layers_for_eviction(
|
||||
&self,
|
||||
conf: &'static PageServerConf,
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::{
|
||||
config::PageServerConf,
|
||||
metrics::SECONDARY_MODE,
|
||||
tenant::{
|
||||
config::SecondaryLocationConfig,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
remote_timeline_client::{index::LayerFileMetadata, HEATMAP_BASENAME},
|
||||
span::debug_assert_current_span_has_tenant_id,
|
||||
@@ -49,9 +50,9 @@ use super::{
|
||||
/// calling it again. This is approximately the time by which local data is allowed
|
||||
/// to fall behind remote data.
|
||||
///
|
||||
/// TODO: this should be an upper bound, and tenants that are uploading regularly
|
||||
/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second
|
||||
/// should not wait a minute between freshens)
|
||||
/// TODO: this should just be a default, and the actual period should be controlled
|
||||
/// via the heatmap itself
|
||||
/// (https://github.com/neondatabase/neon/issues/6200)
|
||||
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -86,8 +87,10 @@ pub(super) struct SecondaryDetailTimeline {
|
||||
|
||||
/// This state is written by the secondary downloader, it is opaque
|
||||
/// to TenantManager
|
||||
#[derive(Default, Debug)]
|
||||
#[derive(Debug)]
|
||||
pub(super) struct SecondaryDetail {
|
||||
pub(super) config: SecondaryLocationConfig,
|
||||
|
||||
last_download: Option<Instant>,
|
||||
next_download: Option<Instant>,
|
||||
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
|
||||
@@ -100,6 +103,15 @@ fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
|
||||
}
|
||||
|
||||
impl SecondaryDetail {
|
||||
pub(super) fn new(config: SecondaryLocationConfig) -> Self {
|
||||
Self {
|
||||
config,
|
||||
last_download: None,
|
||||
next_download: None,
|
||||
timelines: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn is_uninit(&self) -> bool {
|
||||
// FIXME: empty timelines is not synonymous with not initialized, as it is legal for
|
||||
// a tenant to exist with no timelines.
|
||||
@@ -259,6 +271,13 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
.filter_map(|c| {
|
||||
let (last_download, next_download) = {
|
||||
let mut detail = c.detail.lock().unwrap();
|
||||
|
||||
if !detail.config.warm {
|
||||
// Downloads are disabled for this tenant
|
||||
detail.next_download = None;
|
||||
return None;
|
||||
}
|
||||
|
||||
if detail.next_download.is_none() {
|
||||
// Initialize with a jitter: this spreads initial downloads on startup
|
||||
// or mass-attach across our freshen interval.
|
||||
@@ -579,6 +598,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
// TODO: make download conditional on ETag having changed since last download
|
||||
// (https://github.com/neondatabase/neon/issues/6199)
|
||||
tracing::debug!("Downloading heatmap for secondary tenant",);
|
||||
|
||||
let heatmap_path = remote_heatmap_path(tenant_shard_id);
|
||||
|
||||
Reference in New Issue
Block a user