mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
pageserver: implement secondary-mode downloads (#6123)
Follows on from #6050 , in which we upload heatmaps. Secondary locations will now poll those heatmaps and download layers mentioned in the heatmap. TODO: - [X] ~Unify/reconcile stats for behind-schedule execution with warn_when_period_overrun (https://github.com/neondatabase/neon/pull/6050#discussion_r1426560695)~ - [x] Give downloads their own concurrency config independent of uploads Deferred optimizations: - https://github.com/neondatabase/neon/issues/6199 - https://github.com/neondatabase/neon/issues/6200 Eviction will be the next PR: - #5342
This commit is contained in:
@@ -485,6 +485,13 @@ impl PageServerNode {
|
||||
Ok(self.http_client.list_timelines(*tenant_id).await?)
|
||||
}
|
||||
|
||||
pub async fn tenant_secondary_download(&self, tenant_id: &TenantShardId) -> anyhow::Result<()> {
|
||||
Ok(self
|
||||
.http_client
|
||||
.tenant_secondary_download(*tenant_id)
|
||||
.await?)
|
||||
}
|
||||
|
||||
pub async fn timeline_create(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::{
|
||||
use pageserver_api::models::{
|
||||
LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
use utils::{
|
||||
@@ -40,9 +41,9 @@ async fn await_lsn(
|
||||
loop {
|
||||
let latest = match get_lsns(tenant_id, pageserver).await {
|
||||
Ok(l) => l,
|
||||
Err(e) => {
|
||||
Err(_e) => {
|
||||
println!(
|
||||
"🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
|
||||
"🕑 Waiting for pageserver {} to activate...",
|
||||
pageserver.conf.id
|
||||
);
|
||||
std::thread::sleep(Duration::from_millis(500));
|
||||
@@ -89,7 +90,7 @@ pub async fn migrate_tenant(
|
||||
tenant_id: TenantId,
|
||||
dest_ps: PageServerNode,
|
||||
) -> anyhow::Result<()> {
|
||||
// Get a new generation
|
||||
println!("🤔 Checking existing status...");
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
|
||||
fn build_location_config(
|
||||
@@ -135,6 +136,20 @@ pub async fn migrate_tenant(
|
||||
baseline_lsns = Some(get_lsns(tenant_id, &origin_ps).await?);
|
||||
}
|
||||
|
||||
println!(
|
||||
"🔁 Downloading latest layers to destination pageserver {}",
|
||||
dest_ps.conf.id
|
||||
);
|
||||
match dest_ps
|
||||
.tenant_secondary_download(&TenantShardId::unsharded(tenant_id))
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(_) => {
|
||||
println!(" (skipping, destination wasn't in secondary mode)")
|
||||
}
|
||||
}
|
||||
|
||||
let gen = attachment_service
|
||||
.attach_hook(tenant_id, dest_ps.conf.id)
|
||||
.await?;
|
||||
|
||||
@@ -85,6 +85,8 @@ pub mod sync;
|
||||
|
||||
pub mod failpoint_support;
|
||||
|
||||
pub mod yielding_loop;
|
||||
|
||||
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages
|
||||
///
|
||||
/// we have several cases:
|
||||
|
||||
@@ -15,6 +15,12 @@ pub struct Gate {
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Gate {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Gate<{}>", self.name)
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII guard for a [`Gate`]: as long as this exists, calls to [`Gate::close`] will
|
||||
/// not complete.
|
||||
#[derive(Debug)]
|
||||
|
||||
35
libs/utils/src/yielding_loop.rs
Normal file
35
libs/utils/src/yielding_loop.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum YieldingLoopError {
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
/// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically
|
||||
/// yields to avoid blocking the executor, and after resuming checks the provided
|
||||
/// cancellation token to drop out promptly on shutdown.
|
||||
#[inline(always)]
|
||||
pub async fn yielding_loop<I, T, F>(
|
||||
interval: usize,
|
||||
cancel: &CancellationToken,
|
||||
iter: I,
|
||||
mut visitor: F,
|
||||
) -> Result<(), YieldingLoopError>
|
||||
where
|
||||
I: Iterator<Item = T>,
|
||||
F: FnMut(T),
|
||||
{
|
||||
for (i, item) in iter.enumerate() {
|
||||
visitor(item);
|
||||
|
||||
if i + 1 % interval == 0 {
|
||||
tokio::task::yield_now().await;
|
||||
if cancel.is_cancelled() {
|
||||
return Err(YieldingLoopError::Cancelled);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
use pageserver_api::models::*;
|
||||
use pageserver_api::{models::*, shard::TenantShardId};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use utils::{
|
||||
http::error::HttpErrorBody,
|
||||
@@ -164,6 +164,18 @@ impl Client {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tenant_secondary_download(&self, tenant_id: TenantShardId) -> Result<()> {
|
||||
let uri = format!(
|
||||
"{}/v1/tenant/{}/secondary/download",
|
||||
self.mgmt_api_endpoint, tenant_id
|
||||
);
|
||||
self.request(Method::POST, &uri, ())
|
||||
.await?
|
||||
.error_for_status()
|
||||
.map(|_| ())
|
||||
.map_err(|e| Error::ApiError(format!("{}", e)))
|
||||
}
|
||||
|
||||
pub async fn location_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -37,8 +37,8 @@ use crate::tenant::{
|
||||
TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
|
||||
};
|
||||
use crate::{
|
||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_LOCATION_CONFIG_NAME,
|
||||
TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
|
||||
IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TENANT_CONFIG_NAME, TENANT_HEATMAP_BASENAME,
|
||||
TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX, TIMELINE_UNINIT_MARK_SUFFIX,
|
||||
};
|
||||
|
||||
use self::defaults::DEFAULT_CONCURRENT_TENANT_WARMUP;
|
||||
@@ -75,6 +75,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY: &str = "10s";
|
||||
|
||||
pub const DEFAULT_HEATMAP_UPLOAD_CONCURRENCY: usize = 8;
|
||||
pub const DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY: usize = 1;
|
||||
|
||||
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
|
||||
|
||||
@@ -130,6 +131,7 @@ pub mod defaults {
|
||||
#gc_feedback = false
|
||||
|
||||
#heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
|
||||
#secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY}
|
||||
|
||||
[remote_storage]
|
||||
|
||||
@@ -239,6 +241,10 @@ pub struct PageServerConf {
|
||||
/// heatmap uploads vs. other remote storage operations.
|
||||
pub heatmap_upload_concurrency: usize,
|
||||
|
||||
/// How many remote storage downloads may be done for secondary tenants concurrently. Implicitly
|
||||
/// deprioritises secondary downloads vs. remote storage operations for attached tenants.
|
||||
pub secondary_download_concurrency: usize,
|
||||
|
||||
/// Maximum number of WAL records to be ingested and committed at the same time
|
||||
pub ingest_batch_size: u64,
|
||||
}
|
||||
@@ -322,6 +328,7 @@ struct PageServerConfigBuilder {
|
||||
control_plane_emergency_mode: BuilderValue<bool>,
|
||||
|
||||
heatmap_upload_concurrency: BuilderValue<usize>,
|
||||
secondary_download_concurrency: BuilderValue<usize>,
|
||||
|
||||
ingest_batch_size: BuilderValue<u64>,
|
||||
}
|
||||
@@ -396,6 +403,7 @@ impl Default for PageServerConfigBuilder {
|
||||
control_plane_emergency_mode: Set(false),
|
||||
|
||||
heatmap_upload_concurrency: Set(DEFAULT_HEATMAP_UPLOAD_CONCURRENCY),
|
||||
secondary_download_concurrency: Set(DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY),
|
||||
|
||||
ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE),
|
||||
}
|
||||
@@ -546,6 +554,10 @@ impl PageServerConfigBuilder {
|
||||
self.heatmap_upload_concurrency = BuilderValue::Set(value)
|
||||
}
|
||||
|
||||
pub fn secondary_download_concurrency(&mut self, value: usize) {
|
||||
self.secondary_download_concurrency = BuilderValue::Set(value)
|
||||
}
|
||||
|
||||
pub fn ingest_batch_size(&mut self, ingest_batch_size: u64) {
|
||||
self.ingest_batch_size = BuilderValue::Set(ingest_batch_size)
|
||||
}
|
||||
@@ -651,6 +663,9 @@ impl PageServerConfigBuilder {
|
||||
heatmap_upload_concurrency: self
|
||||
.heatmap_upload_concurrency
|
||||
.ok_or(anyhow!("missing heatmap_upload_concurrency"))?,
|
||||
secondary_download_concurrency: self
|
||||
.secondary_download_concurrency
|
||||
.ok_or(anyhow!("missing secondary_download_concurrency"))?,
|
||||
ingest_batch_size: self
|
||||
.ingest_batch_size
|
||||
.ok_or(anyhow!("missing ingest_batch_size"))?,
|
||||
@@ -711,6 +726,11 @@ impl PageServerConf {
|
||||
.join(TENANT_LOCATION_CONFIG_NAME)
|
||||
}
|
||||
|
||||
pub(crate) fn tenant_heatmap_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
|
||||
self.tenant_path(tenant_shard_id)
|
||||
.join(TENANT_HEATMAP_BASENAME)
|
||||
}
|
||||
|
||||
pub fn timelines_path(&self, tenant_shard_id: &TenantShardId) -> Utf8PathBuf {
|
||||
self.tenant_path(tenant_shard_id)
|
||||
.join(TIMELINES_SEGMENT_NAME)
|
||||
@@ -896,6 +916,9 @@ impl PageServerConf {
|
||||
"heatmap_upload_concurrency" => {
|
||||
builder.heatmap_upload_concurrency(parse_toml_u64(key, item)? as usize)
|
||||
},
|
||||
"secondary_download_concurrency" => {
|
||||
builder.secondary_download_concurrency(parse_toml_u64(key, item)? as usize)
|
||||
},
|
||||
"ingest_batch_size" => builder.ingest_batch_size(parse_toml_u64(key, item)?),
|
||||
_ => bail!("unrecognized pageserver option '{key}'"),
|
||||
}
|
||||
@@ -968,6 +991,7 @@ impl PageServerConf {
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
}
|
||||
}
|
||||
@@ -1198,6 +1222,7 @@ background_task_maximum_delay = '334 s'
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE,
|
||||
},
|
||||
"Correct defaults should be used when no config values are provided"
|
||||
@@ -1260,6 +1285,7 @@ background_task_maximum_delay = '334 s'
|
||||
control_plane_api_token: None,
|
||||
control_plane_emergency_mode: false,
|
||||
heatmap_upload_concurrency: defaults::DEFAULT_HEATMAP_UPLOAD_CONCURRENCY,
|
||||
secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY,
|
||||
ingest_batch_size: 100,
|
||||
},
|
||||
"Should be able to parse all basic config values correctly"
|
||||
|
||||
@@ -1274,6 +1274,23 @@ async fn put_tenant_location_config_handler(
|
||||
// which is not a 400 but a 409.
|
||||
.map_err(ApiError::BadRequest)?;
|
||||
|
||||
if let Some(_flush_ms) = flush {
|
||||
match state
|
||||
.secondary_controller
|
||||
.upload_tenant(tenant_shard_id)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
tracing::info!("Uploaded heatmap during flush");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("Failed to flush heatmap: {e}");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::info!("No flush requested when configuring");
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
@@ -1611,6 +1628,21 @@ async fn secondary_upload_handler(
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn secondary_download_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let state = get_state(&request);
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
state
|
||||
.secondary_controller
|
||||
.download_tenant(tenant_shard_id)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
|
||||
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
json_response(
|
||||
StatusCode::NOT_FOUND,
|
||||
@@ -1879,6 +1911,9 @@ pub fn make_router(
|
||||
.put("/v1/deletion_queue/flush", |r| {
|
||||
api_handler(r, deletion_queue_flush)
|
||||
})
|
||||
.post("/v1/tenant/:tenant_shard_id/secondary/download", |r| {
|
||||
api_handler(r, secondary_download_handler)
|
||||
})
|
||||
.put("/v1/tenant/:tenant_shard_id/break", |r| {
|
||||
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
|
||||
})
|
||||
|
||||
@@ -117,6 +117,10 @@ pub const TENANT_CONFIG_NAME: &str = "config";
|
||||
/// Full path: `tenants/<tenant_id>/config`.
|
||||
pub const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";
|
||||
|
||||
/// Per-tenant copy of their remote heatmap, downloaded into the local
|
||||
/// tenant path while in secondary mode.
|
||||
pub const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";
|
||||
|
||||
/// A suffix used for various temporary files. Any temporary files found in the
|
||||
/// data directory at pageserver startup can be automatically removed.
|
||||
pub const TEMP_FILE_SUFFIX: &str = "___temp";
|
||||
|
||||
@@ -1369,6 +1369,8 @@ pub(crate) struct SecondaryModeMetrics {
|
||||
pub(crate) upload_heatmap: IntCounter,
|
||||
pub(crate) upload_heatmap_errors: IntCounter,
|
||||
pub(crate) upload_heatmap_duration: Histogram,
|
||||
pub(crate) download_heatmap: IntCounter,
|
||||
pub(crate) download_layer: IntCounter,
|
||||
}
|
||||
pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| SecondaryModeMetrics {
|
||||
upload_heatmap: register_int_counter!(
|
||||
@@ -1386,6 +1388,16 @@ pub(crate) static SECONDARY_MODE: Lazy<SecondaryModeMetrics> = Lazy::new(|| Seco
|
||||
"Time to build and upload a heatmap, including any waiting inside the S3 client"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
download_heatmap: register_int_counter!(
|
||||
"pageserver_secondary_download_heatmap",
|
||||
"Number of downloads of heatmaps by secondary mode locations"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
download_layer: register_int_counter!(
|
||||
"pageserver_secondary_download_layer",
|
||||
"Number of downloads of layers by secondary mode locations"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
});
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
|
||||
@@ -258,6 +258,9 @@ pub enum TaskKind {
|
||||
/// See [`crate::disk_usage_eviction_task`].
|
||||
DiskUsageEviction,
|
||||
|
||||
/// See [`crate::tenant::secondary`].
|
||||
SecondaryDownloads,
|
||||
|
||||
/// See [`crate::tenant::secondary`].
|
||||
SecondaryUploads,
|
||||
|
||||
|
||||
@@ -588,7 +588,7 @@ impl DeleteTenantFlow {
|
||||
}
|
||||
break;
|
||||
}
|
||||
TenantsMapRemoveResult::Occupied(TenantSlot::Secondary) => {
|
||||
TenantsMapRemoveResult::Occupied(TenantSlot::Secondary(_)) => {
|
||||
// This is unexpected: this secondary tenants should not have been created, and we
|
||||
// are not in a position to shut it down from here.
|
||||
tracing::warn!("Tenant transitioned to secondary mode while deleting!");
|
||||
|
||||
@@ -44,6 +44,7 @@ use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
use super::secondary::SecondaryTenant;
|
||||
use super::TenantSharedResources;
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
@@ -57,7 +58,7 @@ use super::TenantSharedResources;
|
||||
/// having a properly acquired generation (Secondary doesn't need a generation)
|
||||
pub(crate) enum TenantSlot {
|
||||
Attached(Arc<Tenant>),
|
||||
Secondary,
|
||||
Secondary(Arc<SecondaryTenant>),
|
||||
/// In this state, other administrative operations acting on the TenantId should
|
||||
/// block, or return a retry indicator equivalent to HTTP 503.
|
||||
InProgress(utils::completion::Barrier),
|
||||
@@ -67,7 +68,7 @@ impl std::fmt::Debug for TenantSlot {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
Self::Attached(tenant) => write!(f, "Attached({})", tenant.current_state()),
|
||||
Self::Secondary => write!(f, "Secondary"),
|
||||
Self::Secondary(_) => write!(f, "Secondary"),
|
||||
Self::InProgress(_) => write!(f, "InProgress"),
|
||||
}
|
||||
}
|
||||
@@ -78,7 +79,7 @@ impl TenantSlot {
|
||||
fn get_attached(&self) -> Option<&Arc<Tenant>> {
|
||||
match self {
|
||||
Self::Attached(t) => Some(t),
|
||||
Self::Secondary => None,
|
||||
Self::Secondary(_) => None,
|
||||
Self::InProgress(_) => None,
|
||||
}
|
||||
}
|
||||
@@ -466,12 +467,18 @@ pub async fn init_tenant_mgr(
|
||||
*gen
|
||||
} else {
|
||||
match &location_conf.mode {
|
||||
LocationMode::Secondary(_) => {
|
||||
LocationMode::Secondary(secondary_config) => {
|
||||
// We do not require the control plane's permission for secondary mode
|
||||
// tenants, because they do no remote writes and hence require no
|
||||
// generation number
|
||||
info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), "Loaded tenant in secondary mode");
|
||||
tenants.insert(tenant_shard_id, TenantSlot::Secondary);
|
||||
tenants.insert(
|
||||
tenant_shard_id,
|
||||
TenantSlot::Secondary(SecondaryTenant::new(
|
||||
tenant_shard_id,
|
||||
secondary_config,
|
||||
)),
|
||||
);
|
||||
}
|
||||
LocationMode::Attached(_) => {
|
||||
// TODO: augment re-attach API to enable the control plane to
|
||||
@@ -663,8 +670,14 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
|
||||
|
||||
total_attached += 1;
|
||||
}
|
||||
TenantSlot::Secondary => {
|
||||
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary);
|
||||
TenantSlot::Secondary(state) => {
|
||||
// We don't need to wait for this individually per-tenant: the
|
||||
// downloader task will be waited on eventually, this cancel
|
||||
// is just to encourage it to drop out if it is doing work
|
||||
// for this tenant right now.
|
||||
state.cancel.cancel();
|
||||
|
||||
shutdown_state.insert(tenant_shard_id, TenantSlot::Secondary(state));
|
||||
}
|
||||
TenantSlot::InProgress(notify) => {
|
||||
// InProgress tenants are not visible in TenantsMap::ShuttingDown: we will
|
||||
@@ -847,12 +860,28 @@ impl TenantManager {
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
|
||||
}
|
||||
None | Some(TenantSlot::Secondary) => {
|
||||
None | Some(TenantSlot::Secondary(_)) => {
|
||||
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_secondary_tenant_shard(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Option<Arc<SecondaryTenant>> {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
|
||||
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)
|
||||
.ok()
|
||||
.flatten();
|
||||
|
||||
match peek_slot {
|
||||
Some(TenantSlot::Secondary(s)) => Some(s.clone()),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
|
||||
pub(crate) async fn upsert_location(
|
||||
&self,
|
||||
@@ -864,10 +893,15 @@ impl TenantManager {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
info!("configuring tenant location to state {new_location_config:?}");
|
||||
|
||||
// Special case fast-path for updates to Tenant: if our upsert is only updating configuration,
|
||||
enum FastPathModified {
|
||||
Attached(Arc<Tenant>),
|
||||
Secondary(Arc<SecondaryTenant>),
|
||||
}
|
||||
|
||||
// Special case fast-path for updates to existing slots: if our upsert is only updating configuration,
|
||||
// then we do not need to set the slot to InProgress, we can just call into the
|
||||
// existng tenant.
|
||||
let modify_tenant = {
|
||||
let fast_path_taken = {
|
||||
let locked = self.tenants.read().unwrap();
|
||||
let peek_slot =
|
||||
tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Write)?;
|
||||
@@ -881,12 +915,19 @@ impl TenantManager {
|
||||
new_location_config.clone(),
|
||||
)?);
|
||||
|
||||
Some(tenant.clone())
|
||||
Some(FastPathModified::Attached(tenant.clone()))
|
||||
} else {
|
||||
// Different generations, fall through to general case
|
||||
None
|
||||
}
|
||||
}
|
||||
(
|
||||
LocationMode::Secondary(secondary_conf),
|
||||
Some(TenantSlot::Secondary(secondary_tenant)),
|
||||
) => {
|
||||
secondary_tenant.set_config(secondary_conf);
|
||||
Some(FastPathModified::Secondary(secondary_tenant.clone()))
|
||||
}
|
||||
_ => {
|
||||
// Not an Attached->Attached transition, fall through to general case
|
||||
None
|
||||
@@ -895,34 +936,51 @@ impl TenantManager {
|
||||
};
|
||||
|
||||
// Fast-path continued: having dropped out of the self.tenants lock, do the async
|
||||
// phase of waiting for flush, before returning.
|
||||
if let Some(tenant) = modify_tenant {
|
||||
// Transition to AttachedStale means we may well hold a valid generation
|
||||
// still, and have been requested to go stale as part of a migration. If
|
||||
// the caller set `flush`, then flush to remote storage.
|
||||
if let LocationMode::Attached(AttachedLocationConfig {
|
||||
generation: _,
|
||||
attach_mode: AttachmentMode::Stale,
|
||||
}) = &new_location_config.mode
|
||||
{
|
||||
if let Some(flush_timeout) = flush {
|
||||
match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
|
||||
Ok(Err(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
Ok(Ok(_)) => return Ok(()),
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
// phase of writing config and/or waiting for flush, before returning.
|
||||
match fast_path_taken {
|
||||
Some(FastPathModified::Attached(tenant)) => {
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
// Transition to AttachedStale means we may well hold a valid generation
|
||||
// still, and have been requested to go stale as part of a migration. If
|
||||
// the caller set `flush`, then flush to remote storage.
|
||||
if let LocationMode::Attached(AttachedLocationConfig {
|
||||
generation: _,
|
||||
attach_mode: AttachmentMode::Stale,
|
||||
}) = &new_location_config.mode
|
||||
{
|
||||
if let Some(flush_timeout) = flush {
|
||||
match tokio::time::timeout(flush_timeout, tenant.flush_remote()).await {
|
||||
Ok(Err(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
Ok(Ok(_)) => return Ok(()),
|
||||
Err(_) => {
|
||||
tracing::warn!(
|
||||
timeout_ms = flush_timeout.as_millis(),
|
||||
"Timed out waiting for flush to remote storage, proceeding anyway."
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
Some(FastPathModified::Secondary(_secondary_tenant)) => {
|
||||
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &new_location_config)
|
||||
.await
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
None => {
|
||||
// Proceed with the general case procedure, where we will shutdown & remove any existing
|
||||
// slot contents and replace with a fresh one
|
||||
}
|
||||
};
|
||||
|
||||
// General case for upserts to TenantsMap, excluding the case above: we will substitute an
|
||||
// InProgress value to the slot while we make whatever changes are required. The state for
|
||||
@@ -931,33 +989,47 @@ impl TenantManager {
|
||||
// not do significant I/O, and shutdowns should be prompt via cancellation tokens.
|
||||
let mut slot_guard = tenant_map_acquire_slot(&tenant_shard_id, TenantSlotAcquireMode::Any)?;
|
||||
|
||||
if let Some(TenantSlot::Attached(tenant)) = slot_guard.get_old_value() {
|
||||
// The case where we keep a Tenant alive was covered above in the special case
|
||||
// for Attached->Attached transitions in the same generation. By this point,
|
||||
// if we see an attached tenant we know it will be discarded and should be
|
||||
// shut down.
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// The case where we keep a Tenant alive was covered above in the special case
|
||||
// for Attached->Attached transitions in the same generation. By this point,
|
||||
// if we see an attached tenant we know it will be discarded and should be
|
||||
// shut down.
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
self.resources.deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's not point trying to flush deletions
|
||||
}
|
||||
};
|
||||
match tenant.get_attach_mode() {
|
||||
AttachmentMode::Single | AttachmentMode::Multi => {
|
||||
// Before we leave our state as the presumed holder of the latest generation,
|
||||
// flush any outstanding deletions to reduce the risk of leaking objects.
|
||||
self.resources.deletion_queue_client.flush_advisory()
|
||||
}
|
||||
AttachmentMode::Stale => {
|
||||
// If we're stale there's not point trying to flush deletions
|
||||
}
|
||||
};
|
||||
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(barrier) => {
|
||||
info!("Shutdown already in progress, waiting for it to complete");
|
||||
barrier.wait().await;
|
||||
info!("Shutting down attached tenant");
|
||||
match tenant.shutdown(progress, false).await {
|
||||
Ok(()) => {}
|
||||
Err(barrier) => {
|
||||
info!("Shutdown already in progress, waiting for it to complete");
|
||||
barrier.wait().await;
|
||||
}
|
||||
}
|
||||
slot_guard.drop_old_value().expect("We just shut it down");
|
||||
}
|
||||
Some(TenantSlot::Secondary(state)) => {
|
||||
info!("Shutting down secondary tenant");
|
||||
state.shutdown().await;
|
||||
}
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// This should never happen: acquire_slot should error out
|
||||
// if the contents of a slot were InProgress.
|
||||
anyhow::bail!("Acquired an InProgress slot, this is a bug.")
|
||||
}
|
||||
None => {
|
||||
// Slot was vacant, nothing needs shutting down.
|
||||
}
|
||||
slot_guard.drop_old_value().expect("We just shut it down");
|
||||
}
|
||||
|
||||
let tenant_path = self.conf.tenant_path(&tenant_shard_id);
|
||||
@@ -980,7 +1052,9 @@ impl TenantManager {
|
||||
.map_err(SetNewTenantConfigError::Persist)?;
|
||||
|
||||
let new_slot = match &new_location_config.mode {
|
||||
LocationMode::Secondary(_) => TenantSlot::Secondary,
|
||||
LocationMode::Secondary(secondary_config) => {
|
||||
TenantSlot::Secondary(SecondaryTenant::new(tenant_shard_id, secondary_config))
|
||||
}
|
||||
LocationMode::Attached(_attach_config) => {
|
||||
let shard_identity = new_location_config.shard;
|
||||
let tenant = tenant_spawn(
|
||||
@@ -1093,6 +1167,30 @@ impl TenantManager {
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
// Do some synchronous work for all tenant slots in Secondary state. The provided
|
||||
// callback should be small and fast, as it will be called inside the global
|
||||
// TenantsMap lock.
|
||||
pub(crate) fn foreach_secondary_tenants<F>(&self, mut func: F)
|
||||
where
|
||||
// TODO: let the callback return a hint to drop out of the loop early
|
||||
F: FnMut(&TenantShardId, &Arc<SecondaryTenant>),
|
||||
{
|
||||
let locked = self.tenants.read().unwrap();
|
||||
|
||||
let map = match &*locked {
|
||||
TenantsMap::Initializing | TenantsMap::ShuttingDown(_) => return,
|
||||
TenantsMap::Open(m) => m,
|
||||
};
|
||||
|
||||
for (tenant_id, slot) in map {
|
||||
if let TenantSlot::Secondary(state) = slot {
|
||||
// Only expose secondary tenants that are not currently shutting down
|
||||
if !state.cancel.is_cancelled() {
|
||||
func(tenant_id, state)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
&self,
|
||||
@@ -1207,7 +1305,7 @@ pub(crate) fn get_tenant(
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
Err(GetTenantError::NotActive(tenant_shard_id.tenant_id))
|
||||
}
|
||||
None | Some(TenantSlot::Secondary) => {
|
||||
None | Some(TenantSlot::Secondary(_)) => {
|
||||
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
|
||||
}
|
||||
}
|
||||
@@ -1280,7 +1378,7 @@ pub(crate) async fn get_active_tenant_with_timeout(
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::Secondary) => {
|
||||
Some(TenantSlot::Secondary(_)) => {
|
||||
return Err(GetActiveTenantError::NotFound(GetTenantError::NotActive(
|
||||
tenant_id,
|
||||
)))
|
||||
@@ -1544,7 +1642,7 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantShardId, TenantState)>,
|
||||
Ok(m.iter()
|
||||
.filter_map(|(id, tenant)| match tenant {
|
||||
TenantSlot::Attached(tenant) => Some((*id, tenant.current_state())),
|
||||
TenantSlot::Secondary => None,
|
||||
TenantSlot::Secondary(_) => None,
|
||||
TenantSlot::InProgress(_) => None,
|
||||
})
|
||||
.collect())
|
||||
@@ -1801,11 +1899,7 @@ impl SlotGuard {
|
||||
fn old_value_is_shutdown(&self) -> bool {
|
||||
match self.old_value.as_ref() {
|
||||
Some(TenantSlot::Attached(tenant)) => tenant.gate.close_complete(),
|
||||
Some(TenantSlot::Secondary) => {
|
||||
// TODO: when adding secondary mode tenants, this will check for shutdown
|
||||
// in the same way that we do for `Tenant` above
|
||||
true
|
||||
}
|
||||
Some(TenantSlot::Secondary(secondary_tenant)) => secondary_tenant.gate.close_complete(),
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// A SlotGuard cannot be constructed for a slot that was already InProgress
|
||||
unreachable!()
|
||||
@@ -2015,26 +2109,19 @@ where
|
||||
let mut slot_guard =
|
||||
tenant_map_acquire_slot_impl(&tenant_shard_id, tenants, TenantSlotAcquireMode::MustExist)?;
|
||||
|
||||
// The SlotGuard allows us to manipulate the Tenant object without fear of some
|
||||
// concurrent API request doing something else for the same tenant ID.
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(t)) => Some(t),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
// allow pageserver shutdown to await for our completion
|
||||
let (_guard, progress) = completion::channel();
|
||||
|
||||
// If the tenant was attached, shut it down gracefully. For secondary
|
||||
// locations this part is not necessary
|
||||
match &attached_tenant {
|
||||
Some(attached_tenant) => {
|
||||
// The SlotGuard allows us to manipulate the Tenant object without fear of some
|
||||
// concurrent API request doing something else for the same tenant ID.
|
||||
let attached_tenant = match slot_guard.get_old_value() {
|
||||
Some(TenantSlot::Attached(tenant)) => {
|
||||
// whenever we remove a tenant from memory, we don't want to flush and wait for upload
|
||||
let freeze_and_flush = false;
|
||||
|
||||
// shutdown is sure to transition tenant to stopping, and wait for all tasks to complete, so
|
||||
// that we can continue safely to cleanup.
|
||||
match attached_tenant.shutdown(progress, freeze_and_flush).await {
|
||||
match tenant.shutdown(progress, freeze_and_flush).await {
|
||||
Ok(()) => {}
|
||||
Err(_other) => {
|
||||
// if pageserver shutdown or other detach/ignore is already ongoing, we don't want to
|
||||
@@ -2043,11 +2130,19 @@ where
|
||||
return Err(TenantStateError::IsStopping(tenant_shard_id.tenant_id));
|
||||
}
|
||||
}
|
||||
Some(tenant)
|
||||
}
|
||||
None => {
|
||||
// Nothing to wait on when not attached, proceed.
|
||||
Some(TenantSlot::Secondary(secondary_state)) => {
|
||||
tracing::info!("Shutting down in secondary mode");
|
||||
secondary_state.shutdown().await;
|
||||
None
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::InProgress(_)) => {
|
||||
// Acquiring a slot guarantees its old value was not InProgress
|
||||
unreachable!();
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
match tenant_cleanup
|
||||
.await
|
||||
|
||||
@@ -229,6 +229,7 @@ use crate::{
|
||||
tenant::upload_queue::{
|
||||
UploadOp, UploadQueue, UploadQueueInitialized, UploadQueueStopped, UploadTask,
|
||||
},
|
||||
TENANT_HEATMAP_BASENAME,
|
||||
};
|
||||
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
@@ -1741,11 +1742,11 @@ pub fn remote_index_path(
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
pub const HEATMAP_BASENAME: &str = "heatmap-v1.json";
|
||||
|
||||
pub(crate) fn remote_heatmap_path(tenant_shard_id: &TenantShardId) -> RemotePath {
|
||||
RemotePath::from_string(&format!("tenants/{tenant_shard_id}/{HEATMAP_BASENAME}"))
|
||||
.expect("Failed to construct path")
|
||||
RemotePath::from_string(&format!(
|
||||
"tenants/{tenant_shard_id}/{TENANT_HEATMAP_BASENAME}"
|
||||
))
|
||||
.expect("Failed to construct path")
|
||||
}
|
||||
|
||||
/// Given the key of an index, parse out the generation part of the name
|
||||
|
||||
@@ -1,24 +1,48 @@
|
||||
mod downloader;
|
||||
pub mod heatmap;
|
||||
mod heatmap_uploader;
|
||||
mod scheduler;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
|
||||
|
||||
use self::heatmap_uploader::heatmap_uploader_task;
|
||||
use self::{
|
||||
downloader::{downloader_task, SecondaryDetail},
|
||||
heatmap_uploader::heatmap_uploader_task,
|
||||
};
|
||||
|
||||
use super::mgr::TenantManager;
|
||||
use super::{config::SecondaryLocationConfig, mgr::TenantManager};
|
||||
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::completion::Barrier;
|
||||
use utils::{completion::Barrier, sync::gate::Gate};
|
||||
|
||||
enum DownloadCommand {
|
||||
Download(TenantShardId),
|
||||
}
|
||||
enum UploadCommand {
|
||||
Upload(TenantShardId),
|
||||
}
|
||||
|
||||
impl UploadCommand {
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
match self {
|
||||
Self::Upload(id) => id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DownloadCommand {
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
match self {
|
||||
Self::Download(id) => id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct CommandRequest<T> {
|
||||
payload: T,
|
||||
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
|
||||
@@ -28,12 +52,73 @@ struct CommandResponse {
|
||||
result: anyhow::Result<()>,
|
||||
}
|
||||
|
||||
// Whereas [`Tenant`] represents an attached tenant, this type represents the work
|
||||
// we do for secondary tenant locations: where we are not serving clients or
|
||||
// ingesting WAL, but we are maintaining a warm cache of layer files.
|
||||
//
|
||||
// This type is all about the _download_ path for secondary mode. The upload path
|
||||
// runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
|
||||
//
|
||||
// This structure coordinates TenantManager and SecondaryDownloader,
|
||||
// so that the downloader can indicate which tenants it is currently
|
||||
// operating on, and the manager can indicate when a particular
|
||||
// secondary tenant should cancel any work in flight.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SecondaryTenant {
|
||||
/// Carrying a tenant shard ID simplifies callers such as the downloader
|
||||
/// which need to organize many of these objects by ID.
|
||||
tenant_shard_id: TenantShardId,
|
||||
|
||||
/// Cancellation token indicates to SecondaryDownloader that it should stop doing
|
||||
/// any work for this tenant at the next opportunity.
|
||||
pub(crate) cancel: CancellationToken,
|
||||
|
||||
pub(crate) gate: Gate,
|
||||
|
||||
detail: std::sync::Mutex<SecondaryDetail>,
|
||||
}
|
||||
|
||||
impl SecondaryTenant {
|
||||
pub(crate) fn new(
|
||||
tenant_shard_id: TenantShardId,
|
||||
config: &SecondaryLocationConfig,
|
||||
) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
tenant_shard_id,
|
||||
// todo: shall we make this a descendent of the
|
||||
// main cancellation token, or is it sufficient that
|
||||
// on shutdown we walk the tenants and fire their
|
||||
// individual cancellations?
|
||||
cancel: CancellationToken::new(),
|
||||
gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")),
|
||||
|
||||
detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown(&self) {
|
||||
self.cancel.cancel();
|
||||
|
||||
// Wait for any secondary downloader work to complete
|
||||
self.gate.close().await;
|
||||
}
|
||||
|
||||
pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
|
||||
self.detail.lock().unwrap().config = config.clone();
|
||||
}
|
||||
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
&self.tenant_shard_id
|
||||
}
|
||||
}
|
||||
|
||||
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
|
||||
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
|
||||
/// where we want to immediately upload/download for a particular tenant. In normal operation
|
||||
/// uploads & downloads are autonomous and not driven by this interface.
|
||||
pub struct SecondaryController {
|
||||
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
|
||||
download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
|
||||
}
|
||||
|
||||
impl SecondaryController {
|
||||
@@ -63,6 +148,13 @@ impl SecondaryController {
|
||||
self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
|
||||
.await
|
||||
}
|
||||
pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
|
||||
self.dispatch(
|
||||
&self.download_req_tx,
|
||||
DownloadCommand::Download(tenant_shard_id),
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_tasks(
|
||||
@@ -71,9 +163,37 @@ pub fn spawn_tasks(
|
||||
background_jobs_can_start: Barrier,
|
||||
cancel: CancellationToken,
|
||||
) -> SecondaryController {
|
||||
let mgr_clone = tenant_manager.clone();
|
||||
let storage_clone = remote_storage.clone();
|
||||
let cancel_clone = cancel.clone();
|
||||
let bg_jobs_clone = background_jobs_can_start.clone();
|
||||
|
||||
let (download_req_tx, download_req_rx) =
|
||||
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
|
||||
let (upload_req_tx, upload_req_rx) =
|
||||
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::SecondaryDownloads,
|
||||
None,
|
||||
None,
|
||||
"secondary tenant downloads",
|
||||
false,
|
||||
async move {
|
||||
downloader_task(
|
||||
mgr_clone,
|
||||
storage_clone,
|
||||
download_req_rx,
|
||||
bg_jobs_clone,
|
||||
cancel_clone,
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::SecondaryUploads,
|
||||
@@ -89,16 +209,26 @@ pub fn spawn_tasks(
|
||||
background_jobs_can_start,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
SecondaryController { upload_req_tx }
|
||||
SecondaryController {
|
||||
download_req_tx,
|
||||
upload_req_tx,
|
||||
}
|
||||
}
|
||||
|
||||
/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
|
||||
pub fn null_controller() -> SecondaryController {
|
||||
let (download_req_tx, _download_req_rx) =
|
||||
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
|
||||
let (upload_req_tx, _upload_req_rx) =
|
||||
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
|
||||
SecondaryController { upload_req_tx }
|
||||
SecondaryController {
|
||||
upload_req_tx,
|
||||
download_req_tx,
|
||||
}
|
||||
}
|
||||
|
||||
801
pageserver/src/tenant/secondary/downloader.rs
Normal file
801
pageserver/src/tenant/secondary/downloader.rs
Normal file
@@ -0,0 +1,801 @@
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
pin::Pin,
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant, SystemTime},
|
||||
};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
metrics::SECONDARY_MODE,
|
||||
tenant::{
|
||||
config::SecondaryLocationConfig,
|
||||
debug_assert_current_span_has_tenant_and_timeline_id,
|
||||
remote_timeline_client::{
|
||||
index::LayerFileMetadata, FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES,
|
||||
},
|
||||
span::debug_assert_current_span_has_tenant_id,
|
||||
storage_layer::LayerFileName,
|
||||
tasks::{warn_when_period_overrun, BackgroundLoopKind},
|
||||
},
|
||||
virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
|
||||
METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
|
||||
};
|
||||
|
||||
use super::{
|
||||
heatmap::HeatMapLayer,
|
||||
scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
|
||||
SecondaryTenant,
|
||||
};
|
||||
|
||||
use crate::tenant::{
|
||||
mgr::TenantManager,
|
||||
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
|
||||
};
|
||||
|
||||
use chrono::format::{DelayedFormat, StrftimeItems};
|
||||
use futures::Future;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::Rng;
|
||||
use remote_storage::{DownloadError, GenericRemoteStorage};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{info_span, instrument, Instrument};
|
||||
use utils::{
|
||||
backoff, completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId,
|
||||
};
|
||||
|
||||
use super::{
|
||||
heatmap::{HeatMapTenant, HeatMapTimeline},
|
||||
CommandRequest, DownloadCommand,
|
||||
};
|
||||
|
||||
/// For each tenant, how long must have passed since the last download_tenant call before
|
||||
/// calling it again. This is approximately the time by which local data is allowed
|
||||
/// to fall behind remote data.
|
||||
///
|
||||
/// TODO: this should just be a default, and the actual period should be controlled
|
||||
/// via the heatmap itself
|
||||
/// `<ttps://github.com/neondatabase/neon/issues/6200>`
|
||||
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
|
||||
pub(super) async fn downloader_task(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
|
||||
background_jobs_can_start: Barrier,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
let concurrency = tenant_manager.get_conf().secondary_download_concurrency;
|
||||
|
||||
let generator = SecondaryDownloader {
|
||||
tenant_manager,
|
||||
remote_storage,
|
||||
};
|
||||
let mut scheduler = Scheduler::new(generator, concurrency);
|
||||
|
||||
scheduler
|
||||
.run(command_queue, background_jobs_can_start, cancel)
|
||||
.instrument(info_span!("secondary_downloads"))
|
||||
.await
|
||||
}
|
||||
|
||||
struct SecondaryDownloader {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(super) struct OnDiskState {
|
||||
metadata: LayerFileMetadata,
|
||||
access_time: SystemTime,
|
||||
}
|
||||
|
||||
impl OnDiskState {
|
||||
fn new(
|
||||
_conf: &'static PageServerConf,
|
||||
_tenant_shard_id: &TenantShardId,
|
||||
_imeline_id: &TimelineId,
|
||||
_ame: LayerFileName,
|
||||
metadata: LayerFileMetadata,
|
||||
access_time: SystemTime,
|
||||
) -> Self {
|
||||
Self {
|
||||
metadata,
|
||||
access_time,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(super) struct SecondaryDetailTimeline {
|
||||
pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
|
||||
|
||||
/// We remember when layers were evicted, to prevent re-downloading them.
|
||||
pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
|
||||
}
|
||||
|
||||
/// This state is written by the secondary downloader, it is opaque
|
||||
/// to TenantManager
|
||||
#[derive(Debug)]
|
||||
pub(super) struct SecondaryDetail {
|
||||
pub(super) config: SecondaryLocationConfig,
|
||||
|
||||
last_download: Option<Instant>,
|
||||
next_download: Option<Instant>,
|
||||
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
|
||||
}
|
||||
|
||||
/// Helper for logging SystemTime
|
||||
fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
|
||||
let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
|
||||
datetime.format("%d/%m/%Y %T")
|
||||
}
|
||||
|
||||
impl SecondaryDetail {
|
||||
pub(super) fn new(config: SecondaryLocationConfig) -> Self {
|
||||
Self {
|
||||
config,
|
||||
last_download: None,
|
||||
next_download: None,
|
||||
timelines: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct PendingDownload {
|
||||
secondary_state: Arc<SecondaryTenant>,
|
||||
last_download: Option<Instant>,
|
||||
target_time: Option<Instant>,
|
||||
period: Option<Duration>,
|
||||
}
|
||||
|
||||
impl scheduler::PendingJob for PendingDownload {
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
self.secondary_state.get_tenant_shard_id()
|
||||
}
|
||||
}
|
||||
|
||||
struct RunningDownload {
|
||||
barrier: Barrier,
|
||||
}
|
||||
|
||||
impl scheduler::RunningJob for RunningDownload {
|
||||
fn get_barrier(&self) -> Barrier {
|
||||
self.barrier.clone()
|
||||
}
|
||||
}
|
||||
|
||||
struct CompleteDownload {
|
||||
secondary_state: Arc<SecondaryTenant>,
|
||||
completed_at: Instant,
|
||||
}
|
||||
|
||||
impl scheduler::Completion for CompleteDownload {
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
self.secondary_state.get_tenant_shard_id()
|
||||
}
|
||||
}
|
||||
|
||||
type Scheduler = TenantBackgroundJobs<
|
||||
SecondaryDownloader,
|
||||
PendingDownload,
|
||||
RunningDownload,
|
||||
CompleteDownload,
|
||||
DownloadCommand,
|
||||
>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
|
||||
for SecondaryDownloader
|
||||
{
|
||||
#[instrument(skip_all, fields(tenant_id=%completion.get_tenant_shard_id().tenant_id, shard_id=%completion.get_tenant_shard_id().shard_slug()))]
|
||||
fn on_completion(&mut self, completion: CompleteDownload) {
|
||||
let CompleteDownload {
|
||||
secondary_state,
|
||||
completed_at: _completed_at,
|
||||
} = completion;
|
||||
|
||||
tracing::debug!("Secondary tenant download completed");
|
||||
|
||||
// Update freshened_at even if there was an error: we don't want errored tenants to implicitly
|
||||
// take priority to run again.
|
||||
let mut detail = secondary_state.detail.lock().unwrap();
|
||||
detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
|
||||
}
|
||||
|
||||
async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
|
||||
let mut result = SchedulingResult {
|
||||
jobs: Vec::new(),
|
||||
want_interval: None,
|
||||
};
|
||||
|
||||
// Step 1: identify some tenants that we may work on
|
||||
let mut tenants: Vec<Arc<SecondaryTenant>> = Vec::new();
|
||||
self.tenant_manager
|
||||
.foreach_secondary_tenants(|_id, secondary_state| {
|
||||
tenants.push(secondary_state.clone());
|
||||
});
|
||||
|
||||
// Step 2: filter out tenants which are not yet elegible to run
|
||||
let now = Instant::now();
|
||||
result.jobs = tenants
|
||||
.into_iter()
|
||||
.filter_map(|secondary_tenant| {
|
||||
let (last_download, next_download) = {
|
||||
let mut detail = secondary_tenant.detail.lock().unwrap();
|
||||
|
||||
if !detail.config.warm {
|
||||
// Downloads are disabled for this tenant
|
||||
detail.next_download = None;
|
||||
return None;
|
||||
}
|
||||
|
||||
if detail.next_download.is_none() {
|
||||
// Initialize with a jitter: this spreads initial downloads on startup
|
||||
// or mass-attach across our freshen interval.
|
||||
let jittered_period =
|
||||
rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
|
||||
detail.next_download = Some(now.checked_add(jittered_period).expect(
|
||||
"Using our constant, which is known to be small compared with clock range",
|
||||
));
|
||||
}
|
||||
(detail.last_download, detail.next_download.unwrap())
|
||||
};
|
||||
|
||||
if now < next_download {
|
||||
Some(PendingDownload {
|
||||
secondary_state: secondary_tenant,
|
||||
last_download,
|
||||
target_time: Some(next_download),
|
||||
period: Some(DOWNLOAD_FRESHEN_INTERVAL),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Step 3: sort by target execution time to run most urgent first.
|
||||
result.jobs.sort_by_key(|j| j.target_time);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn on_command(&mut self, command: DownloadCommand) -> anyhow::Result<PendingDownload> {
|
||||
let tenant_shard_id = command.get_tenant_shard_id();
|
||||
|
||||
let tenant = self
|
||||
.tenant_manager
|
||||
.get_secondary_tenant_shard(*tenant_shard_id);
|
||||
let Some(tenant) = tenant else {
|
||||
{
|
||||
return Err(anyhow::anyhow!("Not found or not in Secondary mode"));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(PendingDownload {
|
||||
target_time: None,
|
||||
period: None,
|
||||
last_download: None,
|
||||
secondary_state: tenant,
|
||||
})
|
||||
}
|
||||
|
||||
fn spawn(
|
||||
&mut self,
|
||||
job: PendingDownload,
|
||||
) -> (
|
||||
RunningDownload,
|
||||
Pin<Box<dyn Future<Output = CompleteDownload> + Send>>,
|
||||
) {
|
||||
let PendingDownload {
|
||||
secondary_state,
|
||||
last_download,
|
||||
target_time,
|
||||
period,
|
||||
} = job;
|
||||
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
let remote_storage = self.remote_storage.clone();
|
||||
let conf = self.tenant_manager.get_conf();
|
||||
let tenant_shard_id = *secondary_state.get_tenant_shard_id();
|
||||
(RunningDownload { barrier }, Box::pin(async move {
|
||||
let _completion = completion;
|
||||
|
||||
match TenantDownloader::new(conf, &remote_storage, &secondary_state)
|
||||
.download()
|
||||
.await
|
||||
{
|
||||
Err(UpdateError::NoData) => {
|
||||
tracing::info!("No heatmap found for tenant. This is fine if it is new.");
|
||||
},
|
||||
Err(UpdateError::NoSpace) => {
|
||||
tracing::warn!("Insufficient space while downloading. Will retry later.");
|
||||
}
|
||||
Err(UpdateError::Cancelled) => {
|
||||
tracing::debug!("Shut down while downloading");
|
||||
},
|
||||
Err(UpdateError::Deserialize(e)) => {
|
||||
tracing::error!("Corrupt content while downloading tenant: {e}");
|
||||
},
|
||||
Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
|
||||
tracing::error!("Error while downloading tenant: {e}");
|
||||
},
|
||||
Ok(()) => {}
|
||||
};
|
||||
|
||||
// Irrespective of the result, we will reschedule ourselves to run after our usual period.
|
||||
|
||||
// If the job had a target execution time, we may check our final execution
|
||||
// time against that for observability purposes.
|
||||
if let (Some(target_time), Some(period)) = (target_time, period) {
|
||||
// Only track execution lag if this isn't our first download: otherwise, it is expected
|
||||
// that execution will have taken longer than our configured interval, for example
|
||||
// when starting up a pageserver and
|
||||
if last_download.is_some() {
|
||||
// Elapsed time includes any scheduling lag as well as the execution of the job
|
||||
let elapsed = Instant::now().duration_since(target_time);
|
||||
|
||||
warn_when_period_overrun(
|
||||
elapsed,
|
||||
period,
|
||||
BackgroundLoopKind::SecondaryDownload,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
CompleteDownload {
|
||||
secondary_state,
|
||||
completed_at: Instant::now(),
|
||||
}
|
||||
}.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
|
||||
}
|
||||
}
|
||||
|
||||
/// This type is a convenience to group together the various functions involved in
|
||||
/// freshening a secondary tenant.
|
||||
struct TenantDownloader<'a> {
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: &'a GenericRemoteStorage,
|
||||
secondary_state: &'a SecondaryTenant,
|
||||
}
|
||||
|
||||
/// Errors that may be encountered while updating a tenant
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum UpdateError {
|
||||
#[error("No remote data found")]
|
||||
NoData,
|
||||
#[error("Insufficient local storage space")]
|
||||
NoSpace,
|
||||
#[error("Failed to download")]
|
||||
DownloadError(DownloadError),
|
||||
#[error(transparent)]
|
||||
Deserialize(#[from] serde_json::Error),
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<DownloadError> for UpdateError {
|
||||
fn from(value: DownloadError) -> Self {
|
||||
match &value {
|
||||
DownloadError::Cancelled => Self::Cancelled,
|
||||
DownloadError::NotFound => Self::NoData,
|
||||
_ => Self::DownloadError(value),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for UpdateError {
|
||||
fn from(value: std::io::Error) -> Self {
|
||||
if let Some(nix::errno::Errno::ENOSPC) = value.raw_os_error().map(nix::errno::from_i32) {
|
||||
UpdateError::NoSpace
|
||||
} else {
|
||||
// An I/O error from e.g. tokio::io::copy is most likely a remote storage issue
|
||||
UpdateError::Other(anyhow::anyhow!(value))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> TenantDownloader<'a> {
|
||||
fn new(
|
||||
conf: &'static PageServerConf,
|
||||
remote_storage: &'a GenericRemoteStorage,
|
||||
secondary_state: &'a SecondaryTenant,
|
||||
) -> Self {
|
||||
Self {
|
||||
conf,
|
||||
remote_storage,
|
||||
secondary_state,
|
||||
}
|
||||
}
|
||||
|
||||
async fn download(&self) -> Result<(), UpdateError> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
// For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
|
||||
// cover our access to local storage.
|
||||
let Ok(_guard) = self.secondary_state.gate.enter() else {
|
||||
// Shutting down
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
// Download the tenant's heatmap
|
||||
let heatmap_bytes = tokio::select!(
|
||||
bytes = self.download_heatmap() => {bytes?},
|
||||
_ = self.secondary_state.cancel.cancelled() => return Ok(())
|
||||
);
|
||||
|
||||
let heatmap = serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?;
|
||||
|
||||
// Save the heatmap: this will be useful on restart, allowing us to reconstruct
|
||||
// layer metadata without having to re-download it.
|
||||
let heatmap_path = self.conf.tenant_heatmap_path(tenant_shard_id);
|
||||
|
||||
let temp_path = path_with_suffix_extension(&heatmap_path, TEMP_FILE_SUFFIX);
|
||||
let context_msg = format!("write tenant {tenant_shard_id} heatmap to {heatmap_path}");
|
||||
let heatmap_path_bg = heatmap_path.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
|
||||
})
|
||||
})
|
||||
.await
|
||||
.expect("Blocking task is never aborted")
|
||||
.maybe_fatal_err(&context_msg)?;
|
||||
|
||||
tracing::debug!("Wrote local heatmap to {}", heatmap_path);
|
||||
|
||||
// Download the layers in the heatmap
|
||||
for timeline in heatmap.timelines {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let timeline_id = timeline.timeline_id;
|
||||
self.download_timeline(timeline)
|
||||
.instrument(tracing::info_span!(
|
||||
"secondary_download_timeline",
|
||||
tenant_id=%tenant_shard_id.tenant_id,
|
||||
shard_id=%tenant_shard_id.shard_slug(),
|
||||
%timeline_id
|
||||
))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn download_heatmap(&self) -> Result<Vec<u8>, UpdateError> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
// TODO: make download conditional on ETag having changed since last download
|
||||
// (https://github.com/neondatabase/neon/issues/6199)
|
||||
tracing::debug!("Downloading heatmap for secondary tenant",);
|
||||
|
||||
let heatmap_path = remote_heatmap_path(tenant_shard_id);
|
||||
|
||||
let heatmap_bytes = backoff::retry(
|
||||
|| async {
|
||||
let download = self
|
||||
.remote_storage
|
||||
.download(&heatmap_path)
|
||||
.await
|
||||
.map_err(UpdateError::from)?;
|
||||
let mut heatmap_bytes = Vec::new();
|
||||
let mut body = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let _size = tokio::io::copy(&mut body, &mut heatmap_bytes).await?;
|
||||
Ok(heatmap_bytes)
|
||||
},
|
||||
|e| matches!(e, UpdateError::NoData | UpdateError::Cancelled),
|
||||
FAILED_DOWNLOAD_WARN_THRESHOLD,
|
||||
FAILED_REMOTE_OP_RETRIES,
|
||||
"download heatmap",
|
||||
backoff::Cancel::new(self.secondary_state.cancel.clone(), || {
|
||||
UpdateError::Cancelled
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
SECONDARY_MODE.download_heatmap.inc();
|
||||
|
||||
Ok(heatmap_bytes)
|
||||
}
|
||||
|
||||
async fn download_timeline(&self, timeline: HeatMapTimeline) -> Result<(), UpdateError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
let timeline_path = self
|
||||
.conf
|
||||
.timeline_path(tenant_shard_id, &timeline.timeline_id);
|
||||
|
||||
// Accumulate updates to the state
|
||||
let mut touched = Vec::new();
|
||||
|
||||
// Clone a view of what layers already exist on disk
|
||||
let timeline_state = self
|
||||
.secondary_state
|
||||
.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.get(&timeline.timeline_id)
|
||||
.cloned();
|
||||
|
||||
let timeline_state = match timeline_state {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
// We have no existing state: need to scan local disk for layers first.
|
||||
let timeline_state =
|
||||
init_timeline_state(self.conf, tenant_shard_id, &timeline).await;
|
||||
|
||||
// Re-acquire detail lock now that we're done with async load from local FS
|
||||
self.secondary_state
|
||||
.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.timelines
|
||||
.insert(timeline.timeline_id, timeline_state.clone());
|
||||
timeline_state
|
||||
}
|
||||
};
|
||||
|
||||
let layers_in_heatmap = timeline
|
||||
.layers
|
||||
.iter()
|
||||
.map(|l| &l.name)
|
||||
.collect::<HashSet<_>>();
|
||||
let layers_on_disk = timeline_state
|
||||
.on_disk_layers
|
||||
.iter()
|
||||
.map(|l| l.0)
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
// Remove on-disk layers that are no longer present in heatmap
|
||||
for layer in layers_on_disk.difference(&layers_in_heatmap) {
|
||||
let local_path = timeline_path.join(layer.to_string());
|
||||
tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
|
||||
tokio::fs::remove_file(&local_path)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.maybe_fatal_err("Removing secondary layer")?;
|
||||
}
|
||||
|
||||
// Download heatmap layers that are not present on local disk, or update their
|
||||
// access time if they are already present.
|
||||
for layer in timeline.layers {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Existing on-disk layers: just update their access time.
|
||||
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
|
||||
tracing::debug!("Layer {} is already on disk", layer.name);
|
||||
if on_disk.metadata != LayerFileMetadata::from(&layer.metadata)
|
||||
|| on_disk.access_time != layer.access_time
|
||||
{
|
||||
// We already have this layer on disk. Update its access time.
|
||||
tracing::debug!(
|
||||
"Access time updated for layer {}: {} -> {}",
|
||||
layer.name,
|
||||
strftime(&on_disk.access_time),
|
||||
strftime(&layer.access_time)
|
||||
);
|
||||
touched.push(layer);
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
tracing::debug!("Layer {} not present on disk yet", layer.name);
|
||||
}
|
||||
|
||||
// Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
|
||||
// recently than it was evicted.
|
||||
if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
|
||||
if &layer.access_time > evicted_at {
|
||||
tracing::info!(
|
||||
"Re-downloading evicted layer {}, accessed at {}, evicted at {}",
|
||||
layer.name,
|
||||
strftime(&layer.access_time),
|
||||
strftime(evicted_at)
|
||||
);
|
||||
} else {
|
||||
tracing::trace!(
|
||||
"Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
|
||||
layer.name,
|
||||
strftime(&layer.access_time),
|
||||
strftime(evicted_at)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
|
||||
let downloaded_bytes = match download_layer_file(
|
||||
self.conf,
|
||||
self.remote_storage,
|
||||
*tenant_shard_id,
|
||||
timeline.timeline_id,
|
||||
&layer.name,
|
||||
&LayerFileMetadata::from(&layer.metadata),
|
||||
&self.secondary_state.cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
if let DownloadError::NotFound = e {
|
||||
// A heatmap might be out of date and refer to a layer that doesn't exist any more.
|
||||
// This is harmless: continue to download the next layer. It is expected during compaction
|
||||
// GC.
|
||||
tracing::debug!(
|
||||
"Skipped downloading missing layer {}, raced with compaction/gc?",
|
||||
layer.name
|
||||
);
|
||||
continue;
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if downloaded_bytes != layer.metadata.file_size {
|
||||
let local_path = timeline_path.join(layer.name.to_string());
|
||||
|
||||
tracing::warn!(
|
||||
"Downloaded layer {} with unexpected size {} != {}. Removing download.",
|
||||
layer.name,
|
||||
downloaded_bytes,
|
||||
layer.metadata.file_size
|
||||
);
|
||||
|
||||
tokio::fs::remove_file(&local_path)
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)?;
|
||||
}
|
||||
|
||||
SECONDARY_MODE.download_layer.inc();
|
||||
touched.push(layer)
|
||||
}
|
||||
|
||||
// Write updates to state to record layers we just downloaded or touched.
|
||||
{
|
||||
let mut detail = self.secondary_state.detail.lock().unwrap();
|
||||
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
|
||||
|
||||
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
|
||||
|
||||
for t in touched {
|
||||
use std::collections::hash_map::Entry;
|
||||
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
|
||||
Entry::Occupied(mut v) => {
|
||||
v.get_mut().access_time = t.access_time;
|
||||
}
|
||||
Entry::Vacant(e) => {
|
||||
e.insert(OnDiskState::new(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&timeline.timeline_id,
|
||||
t.name,
|
||||
LayerFileMetadata::from(&t.metadata),
|
||||
t.access_time,
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline
|
||||
async fn init_timeline_state(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
heatmap: &HeatMapTimeline,
|
||||
) -> SecondaryDetailTimeline {
|
||||
let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
|
||||
let mut detail = SecondaryDetailTimeline::default();
|
||||
|
||||
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
|
||||
Ok(d) => d,
|
||||
Err(e) => {
|
||||
if e.kind() == std::io::ErrorKind::NotFound {
|
||||
let context = format!("Creating timeline directory {timeline_path}");
|
||||
tracing::info!("{}", context);
|
||||
tokio::fs::create_dir_all(&timeline_path)
|
||||
.await
|
||||
.fatal_err(&context);
|
||||
|
||||
// No entries to report: drop out.
|
||||
return detail;
|
||||
} else {
|
||||
on_fatal_io_error(&e, &format!("Reading timeline dir {timeline_path}"));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// As we iterate through layers found on disk, we will look up their metadata from this map.
|
||||
// Layers not present in metadata will be discarded.
|
||||
let heatmap_metadata: HashMap<&LayerFileName, &HeatMapLayer> =
|
||||
heatmap.layers.iter().map(|l| (&l.name, l)).collect();
|
||||
|
||||
while let Some(dentry) = dir
|
||||
.next_entry()
|
||||
.await
|
||||
.fatal_err(&format!("Listing {timeline_path}"))
|
||||
{
|
||||
let dentry_file_name = dentry.file_name();
|
||||
let file_name = dentry_file_name.to_string_lossy();
|
||||
let local_meta = dentry.metadata().await.fatal_err(&format!(
|
||||
"Read metadata on {}",
|
||||
dentry.path().to_string_lossy()
|
||||
));
|
||||
|
||||
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
|
||||
if file_name == METADATA_FILE_NAME {
|
||||
continue;
|
||||
}
|
||||
|
||||
match LayerFileName::from_str(&file_name) {
|
||||
Ok(name) => {
|
||||
let remote_meta = heatmap_metadata.get(&name);
|
||||
match remote_meta {
|
||||
Some(remote_meta) => {
|
||||
// TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
|
||||
if local_meta.len() != remote_meta.metadata.file_size {
|
||||
// This should not happen, because we do crashsafe write-then-rename when downloading
|
||||
// layers, and layers in remote storage are immutable. Remove the local file because
|
||||
// we cannot trust it.
|
||||
tracing::warn!(
|
||||
"Removing local layer {name} with unexpected local size {} != {}",
|
||||
local_meta.len(),
|
||||
remote_meta.metadata.file_size
|
||||
);
|
||||
} else {
|
||||
// We expect the access time to be initialized immediately afterwards, when
|
||||
// the latest heatmap is applied to the state.
|
||||
detail.on_disk_layers.insert(
|
||||
name.clone(),
|
||||
OnDiskState::new(
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
&heatmap.timeline_id,
|
||||
name,
|
||||
LayerFileMetadata::from(&remote_meta.metadata),
|
||||
remote_meta.access_time,
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
// FIXME: consider some optimization when transitioning from attached to secondary: maybe
|
||||
// wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
|
||||
// we will end up deleting any layers which were created+uploaded more recently than the heatmap.
|
||||
tracing::info!(
|
||||
"Removing secondary local layer {} because it's absent in heatmap",
|
||||
name
|
||||
);
|
||||
tokio::fs::remove_file(&dentry.path())
|
||||
.await
|
||||
.or_else(fs_ext::ignore_not_found)
|
||||
.fatal_err(&format!(
|
||||
"Removing layer {}",
|
||||
dentry.path().to_string_lossy()
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
// Ignore it.
|
||||
tracing::warn!("Unexpected file in timeline directory: {file_name}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
detail
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
pin::Pin,
|
||||
sync::{Arc, Weak},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
@@ -7,35 +8,86 @@ use std::{
|
||||
use crate::{
|
||||
metrics::SECONDARY_MODE,
|
||||
tenant::{
|
||||
config::AttachmentMode, mgr::TenantManager, remote_timeline_client::remote_heatmap_path,
|
||||
secondary::CommandResponse, span::debug_assert_current_span_has_tenant_id, Tenant,
|
||||
config::AttachmentMode,
|
||||
mgr::TenantManager,
|
||||
remote_timeline_client::remote_heatmap_path,
|
||||
span::debug_assert_current_span_has_tenant_id,
|
||||
tasks::{warn_when_period_overrun, BackgroundLoopKind},
|
||||
Tenant,
|
||||
},
|
||||
};
|
||||
|
||||
use futures::Future;
|
||||
use md5;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::Rng;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
|
||||
use tokio::task::JoinSet;
|
||||
use super::{
|
||||
scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs},
|
||||
CommandRequest,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::instrument;
|
||||
use utils::{backoff, completion::Barrier};
|
||||
use tracing::{info_span, instrument, Instrument};
|
||||
use utils::{backoff, completion::Barrier, yielding_loop::yielding_loop};
|
||||
|
||||
use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
|
||||
use super::{heatmap::HeatMapTenant, UploadCommand};
|
||||
|
||||
/// Period between heatmap uploader walking Tenants to look for work to do.
|
||||
/// If any tenants have a heatmap upload period lower than this, it will be adjusted
|
||||
/// downward to match.
|
||||
const DEFAULT_SCHEDULING_INTERVAL: Duration = Duration::from_millis(60000);
|
||||
const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_millis(1000);
|
||||
pub(super) async fn heatmap_uploader_task(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
|
||||
background_jobs_can_start: Barrier,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
let concurrency = tenant_manager.get_conf().heatmap_upload_concurrency;
|
||||
|
||||
let generator = HeatmapUploader {
|
||||
tenant_manager,
|
||||
remote_storage,
|
||||
cancel: cancel.clone(),
|
||||
tenants: HashMap::new(),
|
||||
};
|
||||
let mut scheduler = Scheduler::new(generator, concurrency);
|
||||
|
||||
scheduler
|
||||
.run(command_queue, background_jobs_can_start, cancel)
|
||||
.instrument(info_span!("heatmap_uploader"))
|
||||
.await
|
||||
}
|
||||
|
||||
/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
|
||||
/// handling loop and mutates it as needed: there are no locks here, because that event loop
|
||||
/// can hold &mut references to this type throughout.
|
||||
struct HeatmapUploader {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
cancel: CancellationToken,
|
||||
|
||||
tenants: HashMap<TenantShardId, UploaderTenantState>,
|
||||
}
|
||||
|
||||
struct WriteInProgress {
|
||||
barrier: Barrier,
|
||||
}
|
||||
|
||||
impl RunningJob for WriteInProgress {
|
||||
fn get_barrier(&self) -> Barrier {
|
||||
self.barrier.clone()
|
||||
}
|
||||
}
|
||||
|
||||
struct UploadPending {
|
||||
tenant: Arc<Tenant>,
|
||||
last_digest: Option<md5::Digest>,
|
||||
target_time: Option<Instant>,
|
||||
period: Option<Duration>,
|
||||
}
|
||||
|
||||
impl scheduler::PendingJob for UploadPending {
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
self.tenant.get_tenant_shard_id()
|
||||
}
|
||||
}
|
||||
|
||||
struct WriteComplete {
|
||||
@@ -45,6 +97,12 @@ struct WriteComplete {
|
||||
next_upload: Option<Instant>,
|
||||
}
|
||||
|
||||
impl scheduler::Completion for WriteComplete {
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId {
|
||||
&self.tenant_shard_id
|
||||
}
|
||||
}
|
||||
|
||||
/// The heatmap uploader keeps a little bit of per-tenant state, mainly to remember
|
||||
/// when we last did a write. We only populate this after doing at least one
|
||||
/// write for a tenant -- this avoids holding state for tenants that have
|
||||
@@ -68,267 +126,111 @@ struct UploaderTenantState {
|
||||
next_upload: Option<Instant>,
|
||||
}
|
||||
|
||||
/// This type is owned by a single task ([`heatmap_uploader_task`]) which runs an event
|
||||
/// handling loop and mutates it as needed: there are no locks here, because that event loop
|
||||
/// can hold &mut references to this type throughout.
|
||||
struct HeatmapUploader {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
cancel: CancellationToken,
|
||||
type Scheduler = TenantBackgroundJobs<
|
||||
HeatmapUploader,
|
||||
UploadPending,
|
||||
WriteInProgress,
|
||||
WriteComplete,
|
||||
UploadCommand,
|
||||
>;
|
||||
|
||||
tenants: HashMap<TenantShardId, UploaderTenantState>,
|
||||
|
||||
/// Tenants with work to do, for which tasks should be spawned as soon as concurrency
|
||||
/// limits permit it.
|
||||
tenants_pending: std::collections::VecDeque<UploadPending>,
|
||||
|
||||
/// Tenants for which a task in `tasks` has been spawned.
|
||||
tenants_uploading: HashMap<TenantShardId, WriteInProgress>,
|
||||
|
||||
tasks: JoinSet<()>,
|
||||
|
||||
/// Channel for our child tasks to send results to: we use a channel for results rather than
|
||||
/// just getting task results via JoinSet because we need the channel's recv() "sleep until something
|
||||
/// is available" semantic, rather than JoinSet::join_next()'s "sleep until next thing is available _or_ I'm empty"
|
||||
/// behavior.
|
||||
task_result_tx: tokio::sync::mpsc::UnboundedSender<WriteComplete>,
|
||||
task_result_rx: tokio::sync::mpsc::UnboundedReceiver<WriteComplete>,
|
||||
|
||||
concurrent_uploads: usize,
|
||||
|
||||
scheduling_interval: Duration,
|
||||
}
|
||||
|
||||
/// The uploader task runs a loop that periodically wakes up and schedules tasks for
|
||||
/// tenants that require an upload, or handles any commands that have been sent into
|
||||
/// `command_queue`. No I/O is done in this loop: that all happens in the tasks we
|
||||
/// spawn.
|
||||
///
|
||||
/// Scheduling iterations are somewhat infrequent. However, each one will enqueue
|
||||
/// all tenants that require an upload, and in between scheduling iterations we will
|
||||
/// continue to spawn new tasks for pending tenants, as our concurrency limit permits.
|
||||
///
|
||||
/// While we take a CancellationToken here, it is subordinate to the CancellationTokens
|
||||
/// of tenants: i.e. we expect all Tenants to have been shut down before we are shut down, otherwise
|
||||
/// we might block waiting on a Tenant.
|
||||
pub(super) async fn heatmap_uploader_task(
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
remote_storage: GenericRemoteStorage,
|
||||
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
|
||||
background_jobs_can_start: Barrier,
|
||||
cancel: CancellationToken,
|
||||
) -> anyhow::Result<()> {
|
||||
let concurrent_uploads = tenant_manager.get_conf().heatmap_upload_concurrency;
|
||||
|
||||
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
let mut uploader = HeatmapUploader {
|
||||
tenant_manager,
|
||||
remote_storage,
|
||||
cancel: cancel.clone(),
|
||||
tasks: JoinSet::new(),
|
||||
tenants: HashMap::new(),
|
||||
tenants_pending: std::collections::VecDeque::new(),
|
||||
tenants_uploading: HashMap::new(),
|
||||
task_result_tx: result_tx,
|
||||
task_result_rx: result_rx,
|
||||
concurrent_uploads,
|
||||
scheduling_interval: DEFAULT_SCHEDULING_INTERVAL,
|
||||
};
|
||||
|
||||
tracing::info!("Waiting for background_jobs_can start...");
|
||||
background_jobs_can_start.wait().await;
|
||||
tracing::info!("background_jobs_can is ready, proceeding.");
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
// Look for new work: this is relatively expensive because we have to go acquire the lock on
|
||||
// the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
|
||||
// require an upload.
|
||||
uploader.schedule_iteration().await?;
|
||||
|
||||
// Between scheduling iterations, we will:
|
||||
// - Drain any complete tasks and spawn pending tasks
|
||||
// - Handle incoming administrative commands
|
||||
// - Check our cancellation token
|
||||
let next_scheduling_iteration = Instant::now()
|
||||
.checked_add(uploader.scheduling_interval)
|
||||
.unwrap_or_else(|| {
|
||||
tracing::warn!(
|
||||
"Scheduling interval invalid ({}s), running immediately!",
|
||||
uploader.scheduling_interval.as_secs_f64()
|
||||
);
|
||||
Instant::now()
|
||||
});
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
// We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
|
||||
tracing::info!("Heatmap uploader joining tasks");
|
||||
while let Some(_r) = uploader.tasks.join_next().await {};
|
||||
tracing::info!("Heatmap uploader terminating");
|
||||
|
||||
break;
|
||||
},
|
||||
_ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
|
||||
tracing::debug!("heatmap_uploader_task: woke for scheduling interval");
|
||||
break;},
|
||||
cmd = command_queue.recv() => {
|
||||
tracing::debug!("heatmap_uploader_task: woke for command queue");
|
||||
let cmd = match cmd {
|
||||
Some(c) =>c,
|
||||
None => {
|
||||
// SecondaryController was destroyed, and this has raced with
|
||||
// our CancellationToken
|
||||
tracing::info!("Heatmap uploader terminating");
|
||||
cancel.cancel();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let CommandRequest{
|
||||
response_tx,
|
||||
payload
|
||||
} = cmd;
|
||||
uploader.handle_command(payload, response_tx);
|
||||
},
|
||||
_ = uploader.process_next_completion() => {
|
||||
if !cancel.is_cancelled() {
|
||||
uploader.spawn_pending();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl HeatmapUploader {
|
||||
/// Periodic execution phase: inspect all attached tenants and schedule any work they require.
|
||||
async fn schedule_iteration(&mut self) -> anyhow::Result<()> {
|
||||
#[async_trait::async_trait]
|
||||
impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
|
||||
for HeatmapUploader
|
||||
{
|
||||
async fn schedule(&mut self) -> SchedulingResult<UploadPending> {
|
||||
// Cull any entries in self.tenants whose Arc<Tenant> is gone
|
||||
self.tenants
|
||||
.retain(|_k, v| v.tenant.upgrade().is_some() && v.next_upload.is_some());
|
||||
|
||||
// The priority order of previously scheduled work may be invalidated by current state: drop
|
||||
// all pending work (it will be re-scheduled if still needed)
|
||||
self.tenants_pending.clear();
|
||||
|
||||
// Used a fixed 'now' through the following loop, for efficiency and fairness.
|
||||
let now = Instant::now();
|
||||
|
||||
// While iterating over the potentially-long list of tenants, we will periodically yield
|
||||
// to avoid blocking executor.
|
||||
const YIELD_ITERATIONS: usize = 1000;
|
||||
let mut result = SchedulingResult {
|
||||
jobs: Vec::new(),
|
||||
want_interval: None,
|
||||
};
|
||||
|
||||
// Iterate over tenants looking for work to do.
|
||||
let tenants = self.tenant_manager.get_attached_active_tenant_shards();
|
||||
for (i, tenant) in tenants.into_iter().enumerate() {
|
||||
// Process is shutting down, drop out
|
||||
if self.cancel.is_cancelled() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Skip tenants that already have a write in flight
|
||||
if self
|
||||
.tenants_uploading
|
||||
.contains_key(tenant.get_tenant_shard_id())
|
||||
{
|
||||
continue;
|
||||
}
|
||||
yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
|
||||
let period = match tenant.get_heatmap_period() {
|
||||
None => {
|
||||
// Heatmaps are disabled for this tenant
|
||||
return;
|
||||
}
|
||||
Some(period) => {
|
||||
// If any tenant has asked for uploads more frequent than our scheduling interval,
|
||||
// reduce it to match so that we can keep up. This is mainly useful in testing, where
|
||||
// we may set rather short intervals.
|
||||
result.want_interval = match result.want_interval {
|
||||
None => Some(period),
|
||||
Some(existing) => Some(std::cmp::min(period, existing)),
|
||||
};
|
||||
|
||||
self.maybe_schedule_upload(&now, tenant);
|
||||
period
|
||||
}
|
||||
};
|
||||
|
||||
if i + 1 % YIELD_ITERATIONS == 0 {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn tasks for as many of our pending tenants as we can.
|
||||
self.spawn_pending();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Cancellation: this method is cancel-safe.
|
||||
async fn process_next_completion(&mut self) {
|
||||
match self.task_result_rx.recv().await {
|
||||
Some(r) => {
|
||||
self.on_completion(r);
|
||||
}
|
||||
None => {
|
||||
unreachable!("Result sender is stored on Self");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The 'maybe' refers to the tenant's state: whether it is configured
|
||||
/// for heatmap uploads at all, and whether sufficient time has passed
|
||||
/// since the last upload.
|
||||
fn maybe_schedule_upload(&mut self, now: &Instant, tenant: Arc<Tenant>) {
|
||||
match tenant.get_heatmap_period() {
|
||||
None => {
|
||||
// Heatmaps are disabled for this tenant
|
||||
// Stale attachments do not upload anything: if we are in this state, there is probably some
|
||||
// other attachment in mode Single or Multi running on another pageserver, and we don't
|
||||
// want to thrash and overwrite their heatmap uploads.
|
||||
if tenant.get_attach_mode() == AttachmentMode::Stale {
|
||||
return;
|
||||
}
|
||||
Some(period) => {
|
||||
// If any tenant has asked for uploads more frequent than our scheduling interval,
|
||||
// reduce it to match so that we can keep up. This is mainly useful in testing, where
|
||||
// we may set rather short intervals.
|
||||
if period < self.scheduling_interval {
|
||||
self.scheduling_interval = std::cmp::max(period, MIN_SCHEDULING_INTERVAL);
|
||||
}
|
||||
|
||||
// Create an entry in self.tenants if one doesn't already exist: this will later be updated
|
||||
// with the completion time in on_completion.
|
||||
let state = self
|
||||
.tenants
|
||||
.entry(*tenant.get_tenant_shard_id())
|
||||
.or_insert_with(|| {
|
||||
let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period);
|
||||
|
||||
UploaderTenantState {
|
||||
tenant: Arc::downgrade(&tenant),
|
||||
last_upload: None,
|
||||
next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)),
|
||||
last_digest: None,
|
||||
}
|
||||
});
|
||||
|
||||
// Decline to do the upload if insufficient time has passed
|
||||
if state.next_upload.map(|nu| nu > now).unwrap_or(false) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Stale attachments do not upload anything: if we are in this state, there is probably some
|
||||
// other attachment in mode Single or Multi running on another pageserver, and we don't
|
||||
// want to thrash and overwrite their heatmap uploads.
|
||||
if tenant.get_attach_mode() == AttachmentMode::Stale {
|
||||
return;
|
||||
}
|
||||
|
||||
// Create an entry in self.tenants if one doesn't already exist: this will later be updated
|
||||
// with the completion time in on_completion.
|
||||
let state = self
|
||||
.tenants
|
||||
.entry(*tenant.get_tenant_shard_id())
|
||||
.or_insert_with(|| UploaderTenantState {
|
||||
tenant: Arc::downgrade(&tenant),
|
||||
last_upload: None,
|
||||
next_upload: Some(Instant::now()),
|
||||
last_digest: None,
|
||||
let last_digest = state.last_digest;
|
||||
result.jobs.push(UploadPending {
|
||||
tenant,
|
||||
last_digest,
|
||||
target_time: state.next_upload,
|
||||
period: Some(period),
|
||||
});
|
||||
})
|
||||
.await
|
||||
.ok();
|
||||
|
||||
// Decline to do the upload if insufficient time has passed
|
||||
if state.next_upload.map(|nu| &nu > now).unwrap_or(false) {
|
||||
return;
|
||||
}
|
||||
result
|
||||
}
|
||||
|
||||
let last_digest = state.last_digest;
|
||||
self.tenants_pending.push_back(UploadPending {
|
||||
fn spawn(
|
||||
&mut self,
|
||||
job: UploadPending,
|
||||
) -> (
|
||||
WriteInProgress,
|
||||
Pin<Box<dyn Future<Output = WriteComplete> + Send>>,
|
||||
) {
|
||||
let UploadPending {
|
||||
tenant,
|
||||
last_digest,
|
||||
})
|
||||
}
|
||||
target_time,
|
||||
period,
|
||||
} = job;
|
||||
|
||||
fn spawn_pending(&mut self) {
|
||||
while !self.tenants_pending.is_empty()
|
||||
&& self.tenants_uploading.len() < self.concurrent_uploads
|
||||
{
|
||||
// unwrap: loop condition includes !is_empty()
|
||||
let pending = self.tenants_pending.pop_front().unwrap();
|
||||
self.spawn_upload(pending.tenant, pending.last_digest);
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_upload(&mut self, tenant: Arc<Tenant>, last_digest: Option<md5::Digest>) {
|
||||
let remote_storage = self.remote_storage.clone();
|
||||
let tenant_shard_id = *tenant.get_tenant_shard_id();
|
||||
let (completion, barrier) = utils::completion::channel();
|
||||
let result_tx = self.task_result_tx.clone();
|
||||
self.tasks.spawn(async move {
|
||||
let tenant_shard_id = *tenant.get_tenant_shard_id();
|
||||
(WriteInProgress { barrier }, Box::pin(async move {
|
||||
// Guard for the barrier in [`WriteInProgress`]
|
||||
let _completion = completion;
|
||||
|
||||
@@ -362,22 +264,47 @@ impl HeatmapUploader {
|
||||
};
|
||||
|
||||
let now = Instant::now();
|
||||
|
||||
// If the job had a target execution time, we may check our final execution
|
||||
// time against that for observability purposes.
|
||||
if let (Some(target_time), Some(period)) = (target_time, period) {
|
||||
// Elapsed time includes any scheduling lag as well as the execution of the job
|
||||
let elapsed = now.duration_since(target_time);
|
||||
|
||||
warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
|
||||
}
|
||||
|
||||
let next_upload = tenant
|
||||
.get_heatmap_period()
|
||||
.and_then(|period| now.checked_add(period));
|
||||
|
||||
result_tx
|
||||
.send(WriteComplete {
|
||||
WriteComplete {
|
||||
tenant_shard_id: *tenant.get_tenant_shard_id(),
|
||||
completed_at: now,
|
||||
digest,
|
||||
next_upload,
|
||||
})
|
||||
.ok();
|
||||
});
|
||||
}
|
||||
}.instrument(info_span!(parent: None, "heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
|
||||
}
|
||||
|
||||
self.tenants_uploading
|
||||
.insert(tenant_shard_id, WriteInProgress { barrier });
|
||||
fn on_command(&mut self, command: UploadCommand) -> anyhow::Result<UploadPending> {
|
||||
let tenant_shard_id = command.get_tenant_shard_id();
|
||||
|
||||
tracing::info!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Starting heatmap write on command");
|
||||
let tenant = self
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(*tenant_shard_id, true)
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
|
||||
Ok(UploadPending {
|
||||
// Ignore our state for last digest: this forces an upload even if nothing has changed
|
||||
last_digest: None,
|
||||
tenant,
|
||||
target_time: None,
|
||||
period: None,
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%completion.tenant_shard_id.tenant_id, shard_id=%completion.tenant_shard_id.shard_slug()))]
|
||||
@@ -389,7 +316,6 @@ impl HeatmapUploader {
|
||||
digest,
|
||||
next_upload,
|
||||
} = completion;
|
||||
self.tenants_uploading.remove(&tenant_shard_id);
|
||||
use std::collections::hash_map::Entry;
|
||||
match self.tenants.entry(tenant_shard_id) {
|
||||
Entry::Vacant(_) => {
|
||||
@@ -402,69 +328,6 @@ impl HeatmapUploader {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_command(
|
||||
&mut self,
|
||||
command: UploadCommand,
|
||||
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
|
||||
) {
|
||||
match command {
|
||||
UploadCommand::Upload(tenant_shard_id) => {
|
||||
// If an upload was ongoing for this tenant, let it finish first.
|
||||
let barrier = if let Some(writing_state) =
|
||||
self.tenants_uploading.get(&tenant_shard_id)
|
||||
{
|
||||
tracing::info!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Waiting for heatmap write to complete");
|
||||
writing_state.barrier.clone()
|
||||
} else {
|
||||
// Spawn the upload then immediately wait for it. This will block processing of other commands and
|
||||
// starting of other background work.
|
||||
tracing::info!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Starting heatmap write on command");
|
||||
let tenant = match self
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id, true)
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
// Drop result of send: we don't care if caller dropped their receiver
|
||||
drop(response_tx.send(CommandResponse {
|
||||
result: Err(e.into()),
|
||||
}));
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.spawn_upload(tenant, None);
|
||||
let writing_state = self
|
||||
.tenants_uploading
|
||||
.get(&tenant_shard_id)
|
||||
.expect("We just inserted this");
|
||||
tracing::info!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Waiting for heatmap upload to complete");
|
||||
|
||||
writing_state.barrier.clone()
|
||||
};
|
||||
|
||||
// This task does no I/O: it only listens for a barrier's completion and then
|
||||
// sends to the command response channel. It is therefore safe to spawn this without
|
||||
// any gates/task_mgr hooks.
|
||||
tokio::task::spawn(async move {
|
||||
barrier.wait().await;
|
||||
|
||||
tracing::info!(
|
||||
tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Heatmap upload complete");
|
||||
|
||||
// Drop result of send: we don't care if caller dropped their receiver
|
||||
drop(response_tx.send(CommandResponse { result: Ok(()) }))
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum UploadHeatmapOutcome {
|
||||
@@ -487,7 +350,6 @@ enum UploadHeatmapError {
|
||||
|
||||
/// The inner upload operation. This will skip if `last_digest` is Some and matches the digest
|
||||
/// of the object we would have uploaded.
|
||||
#[instrument(skip_all, fields(tenant_id = %tenant.get_tenant_shard_id().tenant_id, shard_id = %tenant.get_tenant_shard_id().shard_slug()))]
|
||||
async fn upload_tenant_heatmap(
|
||||
remote_storage: GenericRemoteStorage,
|
||||
tenant: &Arc<Tenant>,
|
||||
|
||||
361
pageserver/src/tenant/secondary/scheduler.rs
Normal file
361
pageserver/src/tenant/secondary/scheduler.rs
Normal file
@@ -0,0 +1,361 @@
|
||||
use async_trait;
|
||||
use futures::Future;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::{completion::Barrier, yielding_loop::yielding_loop};
|
||||
|
||||
use super::{CommandRequest, CommandResponse};
|
||||
|
||||
/// Scheduling interval is the time between calls to JobGenerator::schedule.
|
||||
/// When we schedule jobs, the job generator may provide a hint of its preferred
|
||||
/// interval, which we will respect within these intervals.
|
||||
const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
|
||||
const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Scheduling helper for background work across many tenants.
|
||||
///
|
||||
/// Systems that need to run background work across many tenants may use this type
|
||||
/// to schedule jobs within a concurrency limit, along with their own [`JobGenerator`]
|
||||
/// implementation to provide the work to execute. This is a simple scheduler that just
|
||||
/// polls the generator for outstanding work, replacing its queue of pending work with
|
||||
/// what the generator yields on each call: the job generator can change its mind about
|
||||
/// the order of jobs between calls. The job generator is notified when jobs complete,
|
||||
/// and additionally may expose a command hook to generate jobs on-demand (e.g. to implement
|
||||
/// admin APIs).
|
||||
///
|
||||
/// For an example see [`crate::tenant::secondary::heatmap_uploader`]
|
||||
///
|
||||
/// G: A JobGenerator that this scheduler will poll to find pending jobs
|
||||
/// PJ: 'Pending Job': type for job descriptors that are ready to run
|
||||
/// RJ: 'Running Job' type' for jobs that have been spawned
|
||||
/// C : 'Completion' type that spawned jobs will send when they finish
|
||||
/// CMD: 'Command' type that the job generator will accept to create jobs on-demand
|
||||
pub(super) struct TenantBackgroundJobs<G, PJ, RJ, C, CMD>
|
||||
where
|
||||
G: JobGenerator<PJ, RJ, C, CMD>,
|
||||
C: Completion,
|
||||
PJ: PendingJob,
|
||||
RJ: RunningJob,
|
||||
{
|
||||
generator: G,
|
||||
|
||||
/// Ready to run. Will progress to `running` once concurrent limit is satisfied, or
|
||||
/// be removed on next scheduling pass.
|
||||
pending: std::collections::VecDeque<PJ>,
|
||||
|
||||
/// Tasks currently running in Self::tasks for these tenants. Check this map
|
||||
/// before pushing more work into pending for the same tenant.
|
||||
running: HashMap<TenantShardId, RJ>,
|
||||
|
||||
tasks: JoinSet<C>,
|
||||
|
||||
concurrency: usize,
|
||||
|
||||
/// How often we would like schedule_interval to be called.
|
||||
pub(super) scheduling_interval: Duration,
|
||||
|
||||
_phantom: PhantomData<(PJ, RJ, C, CMD)>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub(crate) trait JobGenerator<PJ, RJ, C, CMD>
|
||||
where
|
||||
C: Completion,
|
||||
PJ: PendingJob,
|
||||
RJ: RunningJob,
|
||||
{
|
||||
/// Called at each scheduling interval. Return a list of jobs to run, most urgent first.
|
||||
///
|
||||
/// This function may be expensive (e.g. walk all tenants), but should not do any I/O.
|
||||
/// Implementations should take care to yield the executor periodically if running
|
||||
/// very long loops.
|
||||
///
|
||||
/// Yielding a job here does _not_ guarantee that it will run: if the queue of pending
|
||||
/// jobs is not drained by the next scheduling interval, pending jobs will be cleared
|
||||
/// and re-generated.
|
||||
async fn schedule(&mut self) -> SchedulingResult<PJ>;
|
||||
|
||||
/// Called when a pending job is ready to be run.
|
||||
///
|
||||
/// The job generation provides a future, and a RJ (Running Job) descriptor that tracks it.
|
||||
fn spawn(&mut self, pending_job: PJ) -> (RJ, Pin<Box<dyn Future<Output = C> + Send>>);
|
||||
|
||||
/// Called when a job previously spawned with spawn() transmits its completion
|
||||
fn on_completion(&mut self, completion: C);
|
||||
|
||||
/// Called when a command is received. A job will be spawned immediately if the return
|
||||
/// value is Some, ignoring concurrency limits and the pending queue.
|
||||
fn on_command(&mut self, cmd: CMD) -> anyhow::Result<PJ>;
|
||||
}
|
||||
|
||||
/// [`JobGenerator`] returns this to provide pending jobs, and hints about scheduling
|
||||
pub(super) struct SchedulingResult<PJ> {
|
||||
pub(super) jobs: Vec<PJ>,
|
||||
/// The job generator would like to be called again this soon
|
||||
pub(super) want_interval: Option<Duration>,
|
||||
}
|
||||
|
||||
/// See [`TenantBackgroundJobs`].
|
||||
pub(super) trait PendingJob {
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId;
|
||||
}
|
||||
|
||||
/// See [`TenantBackgroundJobs`].
|
||||
pub(super) trait Completion: Send + 'static {
|
||||
fn get_tenant_shard_id(&self) -> &TenantShardId;
|
||||
}
|
||||
|
||||
/// See [`TenantBackgroundJobs`].
|
||||
pub(super) trait RunningJob {
|
||||
fn get_barrier(&self) -> Barrier;
|
||||
}
|
||||
|
||||
impl<G, PJ, RJ, C, CMD> TenantBackgroundJobs<G, PJ, RJ, C, CMD>
|
||||
where
|
||||
C: Completion,
|
||||
PJ: PendingJob,
|
||||
RJ: RunningJob,
|
||||
G: JobGenerator<PJ, RJ, C, CMD>,
|
||||
{
|
||||
pub(super) fn new(generator: G, concurrency: usize) -> Self {
|
||||
Self {
|
||||
generator,
|
||||
pending: std::collections::VecDeque::new(),
|
||||
running: HashMap::new(),
|
||||
tasks: JoinSet::new(),
|
||||
concurrency,
|
||||
scheduling_interval: MAX_SCHEDULING_INTERVAL,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn run(
|
||||
&mut self,
|
||||
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<CMD>>,
|
||||
background_jobs_can_start: Barrier,
|
||||
cancel: CancellationToken,
|
||||
) {
|
||||
tracing::info!("Waiting for background_jobs_can start...");
|
||||
background_jobs_can_start.wait().await;
|
||||
tracing::info!("background_jobs_can is ready, proceeding.");
|
||||
|
||||
while !cancel.is_cancelled() {
|
||||
// Look for new work: this is relatively expensive because we have to go acquire the lock on
|
||||
// the tenant manager to retrieve tenants, and then iterate over them to figure out which ones
|
||||
// require an upload.
|
||||
self.schedule_iteration(&cancel).await;
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
|
||||
// Schedule some work, if concurrency limit permits it
|
||||
self.spawn_pending();
|
||||
|
||||
// Between scheduling iterations, we will:
|
||||
// - Drain any complete tasks and spawn pending tasks
|
||||
// - Handle incoming administrative commands
|
||||
// - Check our cancellation token
|
||||
let next_scheduling_iteration = Instant::now()
|
||||
.checked_add(self.scheduling_interval)
|
||||
.unwrap_or_else(|| {
|
||||
tracing::warn!(
|
||||
"Scheduling interval invalid ({}s)",
|
||||
self.scheduling_interval.as_secs_f64()
|
||||
);
|
||||
// unwrap(): this constant is small, cannot fail to add to time unless
|
||||
// we are close to the end of the universe.
|
||||
Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
|
||||
});
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("joining tasks");
|
||||
// We do not simply drop the JoinSet, in order to have an orderly shutdown without cancellation.
|
||||
// It is the callers responsibility to make sure that the tasks they scheduled
|
||||
// respect an appropriate cancellation token, to shut down promptly. It is only
|
||||
// safe to wait on joining these tasks because we can see the cancellation token
|
||||
// has been set.
|
||||
while let Some(_r) = self.tasks.join_next().await {}
|
||||
tracing::info!("terminating on cancellation token.");
|
||||
|
||||
break;
|
||||
},
|
||||
_ = tokio::time::sleep(next_scheduling_iteration.duration_since(Instant::now())) => {
|
||||
tracing::debug!("woke for scheduling interval");
|
||||
break;},
|
||||
cmd = command_queue.recv() => {
|
||||
tracing::debug!("woke for command queue");
|
||||
let cmd = match cmd {
|
||||
Some(c) =>c,
|
||||
None => {
|
||||
// SecondaryController was destroyed, and this has raced with
|
||||
// our CancellationToken
|
||||
tracing::info!("terminating on command queue destruction");
|
||||
cancel.cancel();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
let CommandRequest{
|
||||
response_tx,
|
||||
payload
|
||||
} = cmd;
|
||||
self.handle_command(payload, response_tx);
|
||||
},
|
||||
_ = async {
|
||||
let completion = self.process_next_completion().await;
|
||||
match completion {
|
||||
Some(c) => {
|
||||
self.generator.on_completion(c);
|
||||
if !cancel.is_cancelled() {
|
||||
self.spawn_pending();
|
||||
}
|
||||
},
|
||||
None => {
|
||||
// Nothing is running, so just wait: expect that this future
|
||||
// will be dropped when something in the outer select! fires.
|
||||
cancel.cancelled().await;
|
||||
}
|
||||
}
|
||||
|
||||
} => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn do_spawn(&mut self, job: PJ) {
|
||||
let tenant_shard_id = *job.get_tenant_shard_id();
|
||||
let (in_progress, fut) = self.generator.spawn(job);
|
||||
|
||||
self.tasks.spawn(fut);
|
||||
|
||||
self.running.insert(tenant_shard_id, in_progress);
|
||||
}
|
||||
|
||||
/// For all pending tenants that are elegible for execution, spawn their task.
|
||||
///
|
||||
/// Caller provides the spawn operation, we track the resulting execution.
|
||||
fn spawn_pending(&mut self) {
|
||||
while !self.pending.is_empty() && self.running.len() < self.concurrency {
|
||||
// unwrap: loop condition includes !is_empty()
|
||||
let pending = self.pending.pop_front().unwrap();
|
||||
self.do_spawn(pending);
|
||||
}
|
||||
}
|
||||
|
||||
/// For administrative commands: skip the pending queue, ignore concurrency limits
|
||||
fn spawn_now(&mut self, job: PJ) -> &RJ {
|
||||
let tenant_shard_id = *job.get_tenant_shard_id();
|
||||
self.do_spawn(job);
|
||||
self.running
|
||||
.get(&tenant_shard_id)
|
||||
.expect("We just inserted this")
|
||||
}
|
||||
|
||||
/// Wait until the next task completes, and handle its completion
|
||||
///
|
||||
/// Cancellation: this method is cancel-safe.
|
||||
async fn process_next_completion(&mut self) -> Option<C> {
|
||||
match self.tasks.join_next().await {
|
||||
Some(r) => {
|
||||
// We use a channel to drive completions, but also
|
||||
// need to drain the JoinSet to avoid completed tasks
|
||||
// accumulating. These calls are 1:1 because every task
|
||||
// we spawn into this joinset submits is result to the channel.
|
||||
let completion = r.expect("Panic in background task");
|
||||
|
||||
self.running.remove(completion.get_tenant_shard_id());
|
||||
Some(completion)
|
||||
}
|
||||
None => {
|
||||
// Nothing is running, so we have nothing to wait for. We may drop out: the
|
||||
// main even loop will call us again after the next time it has run something.
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert the command into a pending job, spawn it, and when the spawned
|
||||
/// job completes, send the result down `response_tx`.
|
||||
fn handle_command(
|
||||
&mut self,
|
||||
cmd: CMD,
|
||||
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
|
||||
) {
|
||||
let job = match self.generator.on_command(cmd) {
|
||||
Ok(j) => j,
|
||||
Err(e) => {
|
||||
response_tx.send(CommandResponse { result: Err(e) }).ok();
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let tenant_shard_id = job.get_tenant_shard_id();
|
||||
let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
|
||||
barrier
|
||||
} else {
|
||||
let running = self.spawn_now(job);
|
||||
running.get_barrier().clone()
|
||||
};
|
||||
|
||||
// This task does no I/O: it only listens for a barrier's completion and then
|
||||
// sends to the command response channel. It is therefore safe to spawn this without
|
||||
// any gates/task_mgr hooks.
|
||||
tokio::task::spawn(async move {
|
||||
barrier.wait().await;
|
||||
|
||||
response_tx.send(CommandResponse { result: Ok(()) }).ok();
|
||||
});
|
||||
}
|
||||
|
||||
fn get_running(&self, tenant_shard_id: &TenantShardId) -> Option<Barrier> {
|
||||
self.running.get(tenant_shard_id).map(|r| r.get_barrier())
|
||||
}
|
||||
|
||||
/// Periodic execution phase: inspect all attached tenants and schedule any work they require.
|
||||
///
|
||||
/// The type in `tenants` should be a tenant-like structure, e.g. [`crate::tenant::Tenant`] or [`crate::tenant::secondary::SecondaryTenant`]
|
||||
///
|
||||
/// This function resets the pending list: it is assumed that the caller may change their mind about
|
||||
/// which tenants need work between calls to schedule_iteration.
|
||||
async fn schedule_iteration(&mut self, cancel: &CancellationToken) {
|
||||
let SchedulingResult {
|
||||
jobs,
|
||||
want_interval,
|
||||
} = self.generator.schedule().await;
|
||||
|
||||
// Adjust interval based on feedback from the job generator
|
||||
if let Some(want_interval) = want_interval {
|
||||
// Calculation uses second granularity: this scheduler is not intended for high frequency tasks
|
||||
self.scheduling_interval = Duration::from_secs(std::cmp::min(
|
||||
std::cmp::max(MIN_SCHEDULING_INTERVAL.as_secs(), want_interval.as_secs()),
|
||||
MAX_SCHEDULING_INTERVAL.as_secs(),
|
||||
));
|
||||
}
|
||||
|
||||
// The priority order of previously scheduled work may be invalidated by current state: drop
|
||||
// all pending work (it will be re-scheduled if still needed)
|
||||
self.pending.clear();
|
||||
|
||||
// While iterating over the potentially-long list of tenants, we will periodically yield
|
||||
// to avoid blocking executor.
|
||||
yielding_loop(1000, cancel, jobs.into_iter(), |job| {
|
||||
// Skip tenants that already have a write in flight
|
||||
if !self.running.contains_key(job.get_tenant_shard_id()) {
|
||||
self.pending.push_back(job);
|
||||
}
|
||||
})
|
||||
.await
|
||||
.ok();
|
||||
}
|
||||
}
|
||||
@@ -45,6 +45,8 @@ pub(crate) enum BackgroundLoopKind {
|
||||
ConsumptionMetricsCollectMetrics,
|
||||
ConsumptionMetricsSyntheticSizeWorker,
|
||||
InitialLogicalSizeCalculation,
|
||||
HeatmapUpload,
|
||||
SecondaryDownload,
|
||||
}
|
||||
|
||||
impl BackgroundLoopKind {
|
||||
|
||||
@@ -326,6 +326,10 @@ class PageserverHttpClient(requests.Session):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/heatmap_upload")
|
||||
self.verbose_error(res)
|
||||
|
||||
def tenant_secondary_download(self, tenant_id: TenantId):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/secondary/download")
|
||||
self.verbose_error(res)
|
||||
|
||||
def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]):
|
||||
assert "tenant_id" not in config.keys()
|
||||
res = self.put(
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import random
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver, S3Scrubber
|
||||
from fixtures.pageserver.utils import assert_prefix_empty, tenant_delete_wait_completed
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.types import TenantId, TimelineId
|
||||
from fixtures.utils import wait_until
|
||||
@@ -251,6 +253,9 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
|
||||
flush_ms=5000,
|
||||
)
|
||||
|
||||
# Encourage the new location to download while still in secondary mode
|
||||
pageserver_b.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
migrated_generation = env.attachment_service.attach_hook_issue(tenant_id, pageserver_b.id)
|
||||
log.info(f"Acquired generation {migrated_generation} for destination pageserver")
|
||||
assert migrated_generation == initial_generation + 1
|
||||
@@ -258,8 +263,6 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
|
||||
# Writes and reads still work in AttachedStale.
|
||||
workload.validate(pageserver_a.id)
|
||||
|
||||
# TODO: call into secondary mode API hooks to do an upload/download sync
|
||||
|
||||
# Generate some more dirty writes: we expect the origin to ingest WAL in
|
||||
# in AttachedStale
|
||||
workload.churn_rows(64, pageserver_a.id, upload=False)
|
||||
@@ -369,3 +372,143 @@ def test_heatmap_uploads(neon_env_builder: NeonEnvBuilder):
|
||||
log.info(f"Read back heatmap: {heatmap_second}")
|
||||
assert heatmap_second != heatmap_first
|
||||
validate_heatmap(heatmap_second)
|
||||
|
||||
|
||||
def list_layers(pageserver, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
|
||||
"""
|
||||
Inspect local storage on a pageserver to discover which layer files are present.
|
||||
|
||||
:return: list of relative paths to layers, from the timeline root.
|
||||
"""
|
||||
timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
|
||||
|
||||
def relative(p: Path) -> Path:
|
||||
return p.relative_to(timeline_path)
|
||||
|
||||
return sorted(
|
||||
list(
|
||||
map(
|
||||
relative,
|
||||
filter(
|
||||
lambda path: path.name != "metadata"
|
||||
and "ephemeral" not in path.name
|
||||
and "temp" not in path.name,
|
||||
timeline_path.glob("*"),
|
||||
),
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test the overall data flow in secondary mode:
|
||||
- Heatmap uploads from the attached location
|
||||
- Heatmap & layer downloads from the secondary location
|
||||
- Eviction of layers on the attached location results in deletion
|
||||
on the secondary location as well.
|
||||
"""
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.enable_pageserver_remote_storage(
|
||||
remote_storage_kind=RemoteStorageKind.MOCK_S3,
|
||||
)
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
|
||||
assert env.attachment_service is not None
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
ps_attached = env.pageservers[0]
|
||||
ps_secondary = env.pageservers[1]
|
||||
|
||||
workload = Workload(env, tenant_id, timeline_id)
|
||||
workload.init(env.pageservers[0].id)
|
||||
workload.write_rows(256, ps_attached.id)
|
||||
|
||||
# Configure a secondary location
|
||||
log.info("Setting up secondary location...")
|
||||
ps_secondary.tenant_location_configure(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Secondary",
|
||||
"secondary_conf": {"warm": True},
|
||||
"tenant_conf": {},
|
||||
},
|
||||
)
|
||||
readback_conf = ps_secondary.read_tenant_location_conf(tenant_id)
|
||||
log.info(f"Read back conf: {readback_conf}")
|
||||
|
||||
# Explicit upload/download cycle
|
||||
# ==============================
|
||||
log.info("Synchronizing after initial write...")
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
|
||||
ps_secondary, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
# Make changes on attached pageserver, check secondary downloads them
|
||||
# ===================================================================
|
||||
log.info("Synchronizing after subsequent write...")
|
||||
workload.churn_rows(128, ps_attached.id)
|
||||
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
|
||||
ps_secondary, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
# FIXME: this sleep is needed to avoid on-demand promotion of the layers we evict, while
|
||||
# walreceiver is still doing something.
|
||||
import time
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
# Do evictions on attached pageserver, check secondary follows along
|
||||
# ==================================================================
|
||||
log.info("Evicting a layer...")
|
||||
layer_to_evict = list_layers(ps_attached, tenant_id, timeline_id)[0]
|
||||
ps_attached.http_client().evict_layer(tenant_id, timeline_id, layer_name=layer_to_evict.name)
|
||||
|
||||
log.info("Synchronizing after eviction...")
|
||||
ps_attached.http_client().tenant_heatmap_upload(tenant_id)
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
assert layer_to_evict not in list_layers(ps_attached, tenant_id, timeline_id)
|
||||
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
|
||||
ps_secondary, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
# Scrub the remote storage
|
||||
# ========================
|
||||
# This confirms that the scrubber isn't upset by the presence of the heatmap
|
||||
S3Scrubber(neon_env_builder.test_output_dir, neon_env_builder).scan_metadata()
|
||||
|
||||
# Detach secondary and delete tenant
|
||||
# ===================================
|
||||
# This confirms that the heatmap gets cleaned up as well as other normal content.
|
||||
log.info("Detaching secondary location...")
|
||||
ps_secondary.tenant_location_configure(
|
||||
tenant_id,
|
||||
{
|
||||
"mode": "Detached",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
},
|
||||
)
|
||||
|
||||
log.info("Deleting tenant...")
|
||||
tenant_delete_wait_completed(ps_attached.http_client(), tenant_id, 10)
|
||||
|
||||
assert_prefix_empty(
|
||||
neon_env_builder,
|
||||
prefix="/".join(
|
||||
(
|
||||
"tenants",
|
||||
str(tenant_id),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user