mirror of https://github.com/neondatabase/neon.git
pageserver: use bounded sender for basebackup cache (#12342)
## Problem

The basebackup cache currently uses an unbounded channel for prepare requests. In theory the queue can grow large if the cache hangs and stops processing requests.

- Part of https://github.com/neondatabase/cloud/issues/29353

## Summary of changes

- Replace the unbounded channel with a bounded one; the size is configurable.
- Add `pageserver_basebackup_cache_prepare_queue_size` to observe the size of the queue.
- Refactor a bit to move all metrics logic to `basebackup_cache.rs`.
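The pattern at the heart of the change: the producer increments the queue-size gauge before `try_send`, rolls it back if the send fails, and the worker decrements it on `recv`. Below is a minimal, self-contained sketch of that pattern; plain atomics stand in for the Prometheus `UIntGauge` and a bare `u64` stands in for `BasebackupPrepareRequest` (both are simplifications, not the pageserver's actual types):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use tokio::sync::mpsc::{self, error::TrySendError};

/// Stand-in for the `pageserver_basebackup_cache_prepare_queue_size` gauge.
#[derive(Default)]
struct QueueSizeGauge(AtomicU64);

#[tokio::main]
async fn main() {
    let gauge = Arc::new(QueueSizeGauge::default());

    // Bounded channel: at most `capacity` in-flight prepare requests,
    // so a stuck worker cannot cause unbounded memory growth.
    let (tx, mut rx) = mpsc::channel::<u64>(100);

    // Producer side, mirroring `send_prepare`: increment the gauge
    // optimistically, then roll it back if the request was not enqueued.
    let producer_gauge = Arc::clone(&gauge);
    let send_prepare = move |req: u64| {
        producer_gauge.0.fetch_add(1, Ordering::Relaxed);
        if let Err(e) = tx.try_send(req) {
            producer_gauge.0.fetch_sub(1, Ordering::Relaxed);
            match e {
                // Channel full: drop the request rather than block the caller.
                TrySendError::Full(_) => eprintln!("prepare channel full, skipping request"),
                // Receiver gone: normal during shutdown, not critical.
                TrySendError::Closed(_) => eprintln!("prepare channel closed, skipping request"),
            }
        }
    };

    send_prepare(42);

    // Worker side: decrement the gauge as soon as a request is dequeued.
    if let Some(req) = rx.recv().await {
        gauge.0.fetch_sub(1, Ordering::Relaxed);
        println!("preparing basebackup for request {req}");
    }
}
```

With `try_send` on a bounded channel the caller never blocks: when the worker falls behind, new requests are dropped at the door and counted, instead of accumulating without limit as with `unbounded_channel`.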
@@ -371,6 +371,9 @@ pub struct BasebackupCacheConfig {
     // TODO(diko): support max_entry_size_bytes.
     // pub max_entry_size_bytes: u64,
     pub max_size_entries: usize,
+    /// Size of the channel used to send prepare requests to the basebackup cache worker.
+    /// If exceeded, new prepare requests will be dropped.
+    pub prepare_channel_size: usize,
 }
 
 impl Default for BasebackupCacheConfig {
@@ -380,6 +383,7 @@ impl Default for BasebackupCacheConfig {
             max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
             // max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
             max_size_entries: 1000,
+            prepare_channel_size: 100,
         }
     }
 }
@@ -6,7 +6,7 @@ use metrics::core::{AtomicU64, GenericCounter};
 use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
 use tokio::{
     io::{AsyncWriteExt, BufWriter},
-    sync::mpsc::{UnboundedReceiver, UnboundedSender},
+    sync::mpsc::{Receiver, Sender, error::TrySendError},
 };
 use tokio_util::sync::CancellationToken;
 use utils::{
@@ -19,8 +19,8 @@ use crate::{
     basebackup::send_basebackup_tarball,
     context::{DownloadBehavior, RequestContext},
     metrics::{
-        BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ,
-        BASEBACKUP_CACHE_SIZE,
+        BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE,
+        BASEBACKUP_CACHE_READ, BASEBACKUP_CACHE_SIZE,
     },
     task_mgr::TaskKind,
     tenant::{
@@ -35,8 +35,8 @@ pub struct BasebackupPrepareRequest {
     pub lsn: Lsn,
 }
 
-pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
-pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
+pub type BasebackupPrepareSender = Sender<BasebackupPrepareRequest>;
+pub type BasebackupPrepareReceiver = Receiver<BasebackupPrepareRequest>;
 
 #[derive(Clone)]
 struct CacheEntry {
@@ -60,40 +60,65 @@ struct CacheEntry {
 /// and ~1 RPS for get requests.
 pub struct BasebackupCache {
     data_dir: Utf8PathBuf,
+    config: Option<BasebackupCacheConfig>,
 
     entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,
 
+    prepare_sender: BasebackupPrepareSender,
+
     read_hit_count: GenericCounter<AtomicU64>,
     read_miss_count: GenericCounter<AtomicU64>,
     read_err_count: GenericCounter<AtomicU64>,
 
     prepare_skip_count: GenericCounter<AtomicU64>,
 }
 
 impl BasebackupCache {
-    /// Creates a BasebackupCache and spawns the background task.
-    /// The initialization of the cache is performed in the background and does not
-    /// block the caller. The cache will return `None` for any get requests until
-    /// initialization is complete.
-    pub fn spawn(
-        runtime_handle: &tokio::runtime::Handle,
+    /// Create a new BasebackupCache instance.
+    /// Also returns a BasebackupPrepareReceiver which is needed to start
+    /// the background task.
+    /// The cache is initialized from the data_dir in the background task.
+    /// The cache will return `None` for any get requests until the initialization is complete.
+    /// The background task is spawned separately using [`Self::spawn_background_task`]
+    /// to avoid a circular dependency between the cache and the tenant manager.
+    pub fn new(
         data_dir: Utf8PathBuf,
         config: Option<BasebackupCacheConfig>,
-        prepare_receiver: BasebackupPrepareReceiver,
-        tenant_manager: Arc<TenantManager>,
-        cancel: CancellationToken,
-    ) -> Arc<Self> {
+    ) -> (Arc<Self>, BasebackupPrepareReceiver) {
+        let chan_size = config.as_ref().map(|c| c.prepare_channel_size).unwrap_or(1);
+
+        let (prepare_sender, prepare_receiver) = tokio::sync::mpsc::channel(chan_size);
+
         let cache = Arc::new(BasebackupCache {
             data_dir,
+            config,
             entries: std::sync::Mutex::new(HashMap::new()),
+            prepare_sender,
 
             read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
             read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
             read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
 
             prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
         });
 
-        if let Some(config) = config {
+        (cache, prepare_receiver)
+    }
+
+    /// Spawns the background task.
+    /// The background task initializes the cache from the disk,
+    /// processes prepare requests, and cleans up outdated cache entries.
+    /// Noop if the cache is disabled (config is None).
+    pub fn spawn_background_task(
+        self: Arc<Self>,
+        runtime_handle: &tokio::runtime::Handle,
+        prepare_receiver: BasebackupPrepareReceiver,
+        tenant_manager: Arc<TenantManager>,
+        cancel: CancellationToken,
+    ) {
+        if let Some(config) = self.config.clone() {
             let background = BackgroundTask {
-                c: cache.clone(),
+                c: self,
 
                 config,
                 tenant_manager,
@@ -108,8 +133,45 @@ impl BasebackupCache {
             };
             runtime_handle.spawn(background.run(prepare_receiver));
         }
+    }
 
-        cache
+    /// Send a basebackup prepare request to the background task.
+    /// The basebackup will be prepared asynchronously, it does not block the caller.
+    /// The request will be skipped if any cache limits are exceeded.
+    pub fn send_prepare(&self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, lsn: Lsn) {
+        let req = BasebackupPrepareRequest {
+            tenant_shard_id,
+            timeline_id,
+            lsn,
+        };
+
+        BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.inc();
+        let res = self.prepare_sender.try_send(req);
+
+        if let Err(e) = res {
+            BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec();
+            self.prepare_skip_count.inc();
+            match e {
+                TrySendError::Full(_) => {
+                    // Basebackup prepares are pretty rare, normally we should not hit this.
+                    tracing::info!(
+                        tenant_id = %tenant_shard_id.tenant_id,
+                        %timeline_id,
+                        %lsn,
+                        "Basebackup prepare channel is full, skipping the request"
+                    );
+                }
+                TrySendError::Closed(_) => {
+                    // Normal during shutdown, not critical.
+                    tracing::info!(
+                        tenant_id = %tenant_shard_id.tenant_id,
+                        %timeline_id,
+                        %lsn,
+                        "Basebackup prepare channel is closed, skipping the request"
+                    );
+                }
+            }
+        }
     }
 
     /// Gets a basebackup entry from the cache.
@@ -122,6 +184,10 @@ impl BasebackupCache {
         timeline_id: TimelineId,
         lsn: Lsn,
     ) -> Option<tokio::fs::File> {
+        if !self.is_enabled() {
+            return None;
+        }
+
         // Fast path. Check if the entry exists using the in-memory state.
         let tti = TenantTimelineId::new(tenant_id, timeline_id);
         if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
@@ -149,6 +215,10 @@ impl BasebackupCache {
         }
     }
 
+    pub fn is_enabled(&self) -> bool {
+        self.config.is_some()
+    }
+
     // Private methods.
 
     fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
@@ -366,6 +436,7 @@ impl BackgroundTask {
         loop {
             tokio::select! {
                 Some(req) = prepare_receiver.recv() => {
+                    BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec();
                     if let Err(err) = self.prepare_basebackup(
                         req.tenant_shard_id,
                         req.timeline_id,
@@ -569,8 +569,10 @@ fn start_pageserver(
         pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone());
 
     // Scan the local 'tenants/' directory and start loading the tenants
-    let (basebackup_prepare_sender, basebackup_prepare_receiver) =
-        tokio::sync::mpsc::unbounded_channel();
+    let (basebackup_cache, basebackup_prepare_receiver) = BasebackupCache::new(
+        conf.basebackup_cache_dir(),
+        conf.basebackup_cache_config.clone(),
+    );
     let deletion_queue_client = deletion_queue.new_client();
     let background_purges = mgr::BackgroundPurges::default();
@@ -582,7 +584,7 @@ fn start_pageserver(
             remote_storage: remote_storage.clone(),
             deletion_queue_client,
             l0_flush_global_state,
-            basebackup_prepare_sender,
+            basebackup_cache: Arc::clone(&basebackup_cache),
             feature_resolver: feature_resolver.clone(),
         },
         shutdown_pageserver.clone(),
@@ -590,10 +592,8 @@ fn start_pageserver(
     let tenant_manager = Arc::new(tenant_manager);
     BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?;
 
-    let basebackup_cache = BasebackupCache::spawn(
+    basebackup_cache.spawn_background_task(
         BACKGROUND_RUNTIME.handle(),
-        conf.basebackup_cache_dir(),
-        conf.basebackup_cache_config.clone(),
         basebackup_prepare_receiver,
         Arc::clone(&tenant_manager),
         shutdown_pageserver.child_token(),
@@ -806,7 +806,6 @@ fn start_pageserver(
         } else {
             None
         },
-        basebackup_cache,
     );
 
     // Spawn a Pageserver gRPC server task. It will spawn separate tasks for
@@ -4439,6 +4439,14 @@ pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
     .expect("failed to define a metric")
 });
 
+pub(crate) static BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
+    register_uint_gauge!(
+        "pageserver_basebackup_cache_prepare_queue_size",
+        "Number of requests in the basebackup prepare channel"
+    )
+    .expect("failed to define a metric")
+});
+
 static PAGESERVER_CONFIG_IGNORED_ITEMS: Lazy<UIntGaugeVec> = Lazy::new(|| {
     register_uint_gauge_vec!(
         "pageserver_config_ignored_items",
@@ -62,7 +62,6 @@ use utils::{failpoint_support, span_record};
 
 use crate::auth::check_permission;
 use crate::basebackup::{self, BasebackupError};
-use crate::basebackup_cache::BasebackupCache;
 use crate::config::PageServerConf;
 use crate::context::{
     DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
@@ -137,7 +136,6 @@ pub fn spawn(
     perf_trace_dispatch: Option<Dispatch>,
     tcp_listener: tokio::net::TcpListener,
     tls_config: Option<Arc<rustls::ServerConfig>>,
-    basebackup_cache: Arc<BasebackupCache>,
 ) -> Listener {
     let cancel = CancellationToken::new();
     let libpq_ctx = RequestContext::todo_child(
@@ -159,7 +157,6 @@ pub fn spawn(
             conf.pg_auth_type,
             tls_config,
             conf.page_service_pipelining.clone(),
-            basebackup_cache,
             libpq_ctx,
             cancel.clone(),
         )
@@ -218,7 +215,6 @@ pub async fn libpq_listener_main(
     auth_type: AuthType,
     tls_config: Option<Arc<rustls::ServerConfig>>,
     pipelining_config: PageServicePipeliningConfig,
-    basebackup_cache: Arc<BasebackupCache>,
     listener_ctx: RequestContext,
     listener_cancel: CancellationToken,
 ) -> Connections {
@@ -262,7 +258,6 @@ pub async fn libpq_listener_main(
                 auth_type,
                 tls_config.clone(),
                 pipelining_config.clone(),
-                Arc::clone(&basebackup_cache),
                 connection_ctx,
                 connections_cancel.child_token(),
                 gate_guard,
@@ -305,7 +300,6 @@ async fn page_service_conn_main(
     auth_type: AuthType,
     tls_config: Option<Arc<rustls::ServerConfig>>,
     pipelining_config: PageServicePipeliningConfig,
-    basebackup_cache: Arc<BasebackupCache>,
     connection_ctx: RequestContext,
     cancel: CancellationToken,
     gate_guard: GateGuard,
@@ -371,7 +365,6 @@ async fn page_service_conn_main(
         pipelining_config,
         conf.get_vectored_concurrent_io,
         perf_span_fields,
-        basebackup_cache,
         connection_ctx,
         cancel.clone(),
         gate_guard,
@@ -425,8 +418,6 @@ struct PageServerHandler {
     pipelining_config: PageServicePipeliningConfig,
     get_vectored_concurrent_io: GetVectoredConcurrentIo,
 
-    basebackup_cache: Arc<BasebackupCache>,
-
     gate_guard: GateGuard,
 }
 
@@ -912,7 +903,6 @@ impl PageServerHandler {
         pipelining_config: PageServicePipeliningConfig,
         get_vectored_concurrent_io: GetVectoredConcurrentIo,
         perf_span_fields: ConnectionPerfSpanFields,
-        basebackup_cache: Arc<BasebackupCache>,
         connection_ctx: RequestContext,
         cancel: CancellationToken,
         gate_guard: GateGuard,
@@ -926,7 +916,6 @@ impl PageServerHandler {
             cancel,
             pipelining_config,
             get_vectored_concurrent_io,
-            basebackup_cache,
             gate_guard,
         }
     }
@@ -2626,9 +2615,7 @@ impl PageServerHandler {
             && lsn.is_some()
             && prev_lsn.is_none()
         {
-            self.basebackup_cache
-                .get(tenant_id, timeline_id, lsn.unwrap())
-                .await
+            timeline.get_cached_basebackup(lsn.unwrap()).await
         } else {
             None
         }
@@ -80,7 +80,7 @@ use self::timeline::uninit::{TimelineCreateGuard, TimelineExclusionError, Uninit
 use self::timeline::{
     EvictionTaskTenantState, GcCutoffs, TimelineDeleteProgress, TimelineResources, WaitLsnError,
 };
-use crate::basebackup_cache::BasebackupPrepareSender;
+use crate::basebackup_cache::BasebackupCache;
 use crate::config::PageServerConf;
 use crate::context;
 use crate::context::RequestContextBuilder;
@@ -162,7 +162,7 @@ pub struct TenantSharedResources {
     pub remote_storage: GenericRemoteStorage,
     pub deletion_queue_client: DeletionQueueClient,
     pub l0_flush_global_state: L0FlushGlobalState,
-    pub basebackup_prepare_sender: BasebackupPrepareSender,
+    pub basebackup_cache: Arc<BasebackupCache>,
     pub feature_resolver: FeatureResolver,
 }
 
@@ -331,7 +331,7 @@ pub struct TenantShard {
     deletion_queue_client: DeletionQueueClient,
 
     /// A channel to send async requests to prepare a basebackup for the basebackup cache.
-    basebackup_prepare_sender: BasebackupPrepareSender,
+    basebackup_cache: Arc<BasebackupCache>,
 
     /// Cached logical sizes updated updated on each [`TenantShard::gather_size_inputs`].
     cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
@@ -1363,7 +1363,7 @@ impl TenantShard {
             remote_storage,
             deletion_queue_client,
             l0_flush_global_state,
-            basebackup_prepare_sender,
+            basebackup_cache,
             feature_resolver,
         } = resources;
 
@@ -1380,7 +1380,7 @@ impl TenantShard {
             remote_storage.clone(),
             deletion_queue_client,
             l0_flush_global_state,
-            basebackup_prepare_sender,
+            basebackup_cache,
             feature_resolver,
         ));
 
@@ -4380,7 +4380,7 @@ impl TenantShard {
         remote_storage: GenericRemoteStorage,
         deletion_queue_client: DeletionQueueClient,
         l0_flush_global_state: L0FlushGlobalState,
-        basebackup_prepare_sender: BasebackupPrepareSender,
+        basebackup_cache: Arc<BasebackupCache>,
         feature_resolver: FeatureResolver,
     ) -> TenantShard {
         assert!(!attached_conf.location.generation.is_none());
@@ -4485,7 +4485,7 @@ impl TenantShard {
             ongoing_timeline_detach: std::sync::Mutex::default(),
             gc_block: Default::default(),
             l0_flush_global_state,
-            basebackup_prepare_sender,
+            basebackup_cache,
             feature_resolver,
         }
     }
@@ -5414,7 +5414,7 @@ impl TenantShard {
             pagestream_throttle_metrics: self.pagestream_throttle_metrics.clone(),
             l0_compaction_trigger: self.l0_compaction_trigger.clone(),
             l0_flush_global_state: self.l0_flush_global_state.clone(),
-            basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
+            basebackup_cache: self.basebackup_cache.clone(),
             feature_resolver: self.feature_resolver.clone(),
         }
     }
@@ -6000,7 +6000,7 @@ pub(crate) mod harness {
         ) -> anyhow::Result<Arc<TenantShard>> {
             let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
 
-            let (basebackup_requst_sender, _) = tokio::sync::mpsc::unbounded_channel();
+            let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
 
             let tenant = Arc::new(TenantShard::new(
                 TenantState::Attaching,
@@ -6018,7 +6018,7 @@ pub(crate) mod harness {
                 self.deletion_queue.new_client(),
                 // TODO: ideally we should run all unit tests with both configs
                 L0FlushGlobalState::new(L0FlushConfig::default()),
-                basebackup_requst_sender,
+                basebackup_cache,
                 FeatureResolver::new_disabled(),
             ));
 
@@ -2891,14 +2891,18 @@ mod tests {
     use std::collections::BTreeMap;
     use std::sync::Arc;
 
+    use camino::Utf8PathBuf;
     use storage_broker::BrokerClientChannel;
     use tracing::Instrument;
 
     use super::super::harness::TenantHarness;
     use super::TenantsMap;
-    use crate::tenant::{
-        TenantSharedResources,
-        mgr::{BackgroundPurges, TenantManager, TenantSlot},
+    use crate::{
+        basebackup_cache::BasebackupCache,
+        tenant::{
+            TenantSharedResources,
+            mgr::{BackgroundPurges, TenantManager, TenantSlot},
+        },
     };
 
     #[tokio::test(start_paused = true)]
@@ -2924,9 +2928,7 @@ mod tests {
         // Invoke remove_tenant_from_memory with a cleanup hook that blocks until we manually
         // permit it to proceed: that will stick the tenant in InProgress
 
-        let (basebackup_prepare_sender, _) = tokio::sync::mpsc::unbounded_channel::<
-            crate::basebackup_cache::BasebackupPrepareRequest,
-        >();
+        let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
 
         let tenant_manager = TenantManager {
             tenants: std::sync::RwLock::new(TenantsMap::Open(tenants)),
@@ -2940,7 +2942,7 @@ mod tests {
                 l0_flush_global_state: crate::l0_flush::L0FlushGlobalState::new(
                     h.conf.l0_flush.clone(),
                 ),
-                basebackup_prepare_sender,
+                basebackup_cache,
                 feature_resolver: crate::feature_resolver::FeatureResolver::new_disabled(),
             },
             cancel: tokio_util::sync::CancellationToken::new(),
@@ -95,12 +95,12 @@ use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
 use super::tasks::log_compaction_error;
 use super::upload_queue::NotInitialized;
 use super::{
-    AttachedTenantConf, BasebackupPrepareSender, GcError, HeatMapTimeline, MaybeOffloaded,
+    AttachedTenantConf, GcError, HeatMapTimeline, MaybeOffloaded,
     debug_assert_current_span_has_tenant_and_timeline_id,
 };
 use crate::PERF_TRACE_TARGET;
 use crate::aux_file::AuxFileSizeEstimator;
-use crate::basebackup_cache::BasebackupPrepareRequest;
+use crate::basebackup_cache::BasebackupCache;
 use crate::config::PageServerConf;
 use crate::context::{
     DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
@@ -201,7 +201,7 @@ pub struct TimelineResources {
     pub pagestream_throttle_metrics: Arc<crate::metrics::tenant_throttling::Pagestream>,
     pub l0_compaction_trigger: Arc<Notify>,
     pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
-    pub basebackup_prepare_sender: BasebackupPrepareSender,
+    pub basebackup_cache: Arc<BasebackupCache>,
     pub feature_resolver: FeatureResolver,
 }
 
@@ -448,7 +448,7 @@ pub struct Timeline {
     wait_lsn_log_slow: tokio::sync::Semaphore,
 
     /// A channel to send async requests to prepare a basebackup for the basebackup cache.
-    basebackup_prepare_sender: BasebackupPrepareSender,
+    basebackup_cache: Arc<BasebackupCache>,
 
     feature_resolver: FeatureResolver,
 }
@@ -2500,6 +2500,13 @@ impl Timeline {
             .unwrap_or(self.conf.default_tenant_conf.basebackup_cache_enabled)
     }
 
+    /// Try to get a basebackup from the on-disk cache.
+    pub(crate) async fn get_cached_basebackup(&self, lsn: Lsn) -> Option<tokio::fs::File> {
+        self.basebackup_cache
+            .get(self.tenant_shard_id.tenant_id, self.timeline_id, lsn)
+            .await
+    }
+
     /// Prepare basebackup for the given LSN and store it in the basebackup cache.
     /// The method is asynchronous and returns immediately.
     /// The actual basebackup preparation is performed in the background
@@ -2521,17 +2528,8 @@ impl Timeline {
             return;
         }
 
-        let res = self
-            .basebackup_prepare_sender
-            .send(BasebackupPrepareRequest {
-                tenant_shard_id: self.tenant_shard_id,
-                timeline_id: self.timeline_id,
-                lsn,
-            });
-        if let Err(e) = res {
-            // May happen during shutdown, it's not critical.
-            info!("Failed to send shutdown checkpoint: {e:#}");
-        }
+        self.basebackup_cache
+            .send_prepare(self.tenant_shard_id, self.timeline_id, lsn);
     }
 }
 
@@ -3088,7 +3086,7 @@ impl Timeline {
 
             wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
 
-            basebackup_prepare_sender: resources.basebackup_prepare_sender,
+            basebackup_cache: resources.basebackup_cache,
 
             feature_resolver: resources.feature_resolver,
         };