Compare commits

...

23 Commits

Author SHA1 Message Date
John Spray
0baf91fcac pageserver: include secondary tenants in disk usage eviction 2023-10-26 20:33:47 +01:00
John Spray
cd27f42839 pageserver: pass TenantManager into disk usage eviction task 2023-10-26 20:33:47 +01:00
John Spray
edf303ff44 pageserver: add Layer::for_secondary 2023-10-26 20:33:47 +01:00
John Spray
ec189338eb tests: avoid deprecated log.warn() 2023-10-26 20:33:47 +01:00
John Spray
e22f9b9fbb tests: more logging from attachment_service 2023-10-26 20:33:47 +01:00
John Spray
83fc486636 tests: log in UTC 2023-10-26 20:33:47 +01:00
John Spray
3afec731dc pageserver: add a CancellationToken to Tenant 2023-10-26 20:33:47 +01:00
John Spray
559611f9a6 pageserver: add Tenant::heatmap_hook 2023-10-26 20:33:47 +01:00
John Spray
d56dad3824 tests: add test_pageserver_secondary 2023-10-26 20:33:47 +01:00
John Spray
547acb6653 pageserver: create timelines/ dir when configuring secondary location 2023-10-26 20:33:47 +01:00
John Spray
5264ffc2a9 pageserver/http: add testing routes for secondary mode 2023-10-26 20:33:47 +01:00
John Spray
ee841708ff pageserver: implement flush on location conf update 2023-10-26 20:33:47 +01:00
John Spray
f356df1860 pageserver: more logging during tenant shutdown 2023-10-26 20:33:47 +01:00
John Spray
fbe1da2981 tests: add helpers for location_conf 2023-10-26 20:33:47 +01:00
John Spray
22c94d99f5 tests: refactor a path helper 2023-10-26 20:33:47 +01:00
John Spray
6de414b5a5 tests: add multi attach test 2023-10-26 20:33:47 +01:00
John Spray
8c0ce9723a Refactor Workload into shared location 2023-10-26 20:33:47 +01:00
John Spray
5a4c371e94 pageserver: launch tasks for secondary mode 2023-10-26 20:33:47 +01:00
John Spray
65096ac992 pageserver: add secondary downloader & heatmaps 2023-10-26 20:33:47 +01:00
John Spray
049cb1fb4b Add Timeline::generate_heatmap, remote client heatmap upload 2023-10-26 20:33:47 +01:00
John Spray
640350a2c0 TenantManager: implement hooks for secondary downloads 2023-10-26 20:33:47 +01:00
John Spray
d422105d88 pageserver: start refactoring into TenantManager 2023-10-26 11:14:24 +01:00
John Spray
ead1931167 pageserver: add InProgress top level state & make TenantsMap lock synchronous 2023-10-26 11:13:16 +01:00
27 changed files with 3178 additions and 519 deletions

View File

@@ -221,8 +221,21 @@ async fn handle_attach_hook(mut req: Request<Body>) -> Result<Response<Body>, Ap
generation: 0,
});
if attach_req.pageserver_id.is_some() {
if let Some(attaching_pageserver) = attach_req.pageserver_id.as_ref() {
tenant_state.generation += 1;
tracing::info!(
"attach_hook: issuing generation {} to pageserver {}",
attaching_pageserver,
tenant_state.generation
);
} else if let Some(ps_id) = tenant_state.pageserver {
tracing::info!(
"attach_hook: dropping pageserver {} in generation {}",
ps_id,
tenant_state.generation
);
} else {
tracing::info!("attach_hook: no-op: tenant already has no pageserver");
}
tenant_state.pageserver = attach_req.pageserver_id;
let generation = tenant_state.generation;

View File

@@ -14,7 +14,7 @@ use pageserver::control_plane_client::ControlPlaneClient;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::task_mgr::WALRECEIVER_RUNTIME;
use pageserver::tenant::TenantSharedResources;
use pageserver::tenant::{secondary, TenantSharedResources};
use remote_storage::GenericRemoteStorage;
use tokio::time::Instant;
use tracing::*;
@@ -408,7 +408,7 @@ fn start_pageserver(
// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
TenantSharedResources {
broker_client: broker_client.clone(),
@@ -418,6 +418,7 @@ fn start_pageserver(
order,
shutdown_pageserver.clone(),
))?;
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx;
@@ -523,6 +524,18 @@ fn start_pageserver(
}
});
let secondary_controller = if let Some(remote_storage) = &remote_storage {
secondary::spawn_tasks(
conf,
tenant_manager.clone(),
remote_storage.clone(),
background_jobs_barrier.clone(),
shutdown_pageserver.clone(),
)
} else {
secondary::null_controller()
};
// shared state between the disk-usage backed eviction background task and the http endpoint
// that allows triggering disk-usage based eviction manually. note that the http endpoint
// is still accessible even if background task is not configured as long as remote storage has
@@ -534,6 +547,7 @@ fn start_pageserver(
conf,
remote_storage.clone(),
disk_usage_eviction_state.clone(),
tenant_manager.clone(),
background_jobs_barrier.clone(),
)?;
}
@@ -546,11 +560,13 @@ fn start_pageserver(
let router_state = Arc::new(
http::routes::State::new(
conf,
tenant_manager,
http_auth.clone(),
remote_storage.clone(),
broker_client.clone(),
disk_usage_eviction_state,
deletion_queue.new_client(),
secondary_controller,
)
.context("Failed to initialize router state")?,
);

View File

@@ -266,7 +266,7 @@ async fn calculate_synthetic_size_worker(
continue;
}
if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
if let Ok(tenant) = mgr::get_tenant(tenant_id, true) {
// TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
// We can put in some prioritization for consumption metrics.
// Same for the loop that fetches computed metrics.

View File

@@ -206,7 +206,6 @@ pub(super) async fn collect_all_metrics(
None
} else {
crate::tenant::mgr::get_tenant(id, true)
.await
.ok()
.map(|tenant| (id, tenant))
}

View File

@@ -0,0 +1,2 @@
Checking pageserver v0.1.0 (/home/neon/neon/pageserver)
Finished dev [optimized + debuginfo] target(s) in 7.62s

View File

@@ -48,20 +48,23 @@ use std::{
};
use anyhow::Context;
use camino::Utf8Path;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, warn, Instrument};
use utils::completion;
use utils::serde_percent::Percent;
use utils::{
completion,
id::{TenantId, TenantTimelineId},
};
use utils::{id::TimelineId, serde_percent::Percent};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
mgr::TenantManager,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer},
Timeline,
},
@@ -87,6 +90,7 @@ pub fn launch_disk_usage_global_eviction_task(
conf: &'static PageServerConf,
storage: GenericRemoteStorage,
state: Arc<State>,
tenant_manager: Arc<TenantManager>,
background_jobs_barrier: completion::Barrier,
) -> anyhow::Result<()> {
let Some(task_config) = &conf.disk_usage_based_eviction else {
@@ -112,8 +116,7 @@ pub fn launch_disk_usage_global_eviction_task(
_ = background_jobs_barrier.wait() => { }
};
disk_usage_eviction_task(&state, task_config, &storage, &conf.tenants_path(), cancel)
.await;
disk_usage_eviction_task(&state, task_config, &storage, tenant_manager, cancel).await;
Ok(())
},
);
@@ -126,7 +129,7 @@ async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
_storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) {
scopeguard::defer! {
@@ -150,7 +153,8 @@ async fn disk_usage_eviction_task(
async {
let res =
disk_usage_eviction_task_iteration(state, task_config, tenants_dir, &cancel).await;
disk_usage_eviction_task_iteration(state, task_config, &tenant_manager, &cancel)
.await;
match res {
Ok(()) => {}
@@ -181,12 +185,14 @@ pub trait Usage: Clone + Copy + std::fmt::Debug {
async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
tenants_dir: &Utf8Path,
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
let tenants_dir = tenant_manager.get_conf().tenants_path();
let usage_pre = filesystem_level_usage::get(&tenants_dir, task_config)
.context("get filesystem-level disk usage before evictions")?;
let res = disk_usage_eviction_task_iteration_impl(state, usage_pre, cancel).await;
let res =
disk_usage_eviction_task_iteration_impl(state, usage_pre, tenant_manager, cancel).await;
match res {
Ok(outcome) => {
debug!(?outcome, "disk_usage_eviction_iteration finished");
@@ -196,7 +202,7 @@ async fn disk_usage_eviction_task_iteration(
}
IterationOutcome::Finished(outcome) => {
// Verify with statvfs whether we made any real progress
let after = filesystem_level_usage::get(tenants_dir, task_config)
let after = filesystem_level_usage::get(&tenants_dir, task_config)
// It's quite unlikely to hit the error here. Keep the code simple and bail out.
.context("get filesystem-level disk usage after evictions")?;
@@ -271,6 +277,7 @@ struct LayerCount {
pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
state: &State,
usage_pre: U,
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<IterationOutcome<U>> {
// use tokio's mutex to get a Sync guard (instead of std::sync::Mutex)
@@ -290,7 +297,7 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
"running disk usage based eviction due to pressure"
);
let candidates = match collect_eviction_candidates(cancel).await? {
let candidates = match collect_eviction_candidates(tenant_manager, cancel).await? {
EvictionCandidates::Cancelled => {
return Ok(IterationOutcome::Cancelled);
}
@@ -326,7 +333,13 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// If we get far enough in the list that we start to evict layers that are below
// the tenant's min-resident-size threshold, print a warning, and memorize the disk
// usage at that point, in 'usage_planned_min_resident_size_respecting'.
// Evictions for attached tenants, batched by timeline
let mut batched: HashMap<_, Vec<_>> = HashMap::new();
// Evictions for secondary locations, batched by tenant
let mut secondary_by_tenant: HashMap<TenantId, Vec<(TimelineId, Layer)>> = HashMap::new();
let mut warned = None;
let mut usage_planned = usage_pre;
let mut max_batch_size = 0;
@@ -349,14 +362,22 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
// FIXME: batching makes no sense anymore because of no layermap locking, should just spawn
// tasks to evict all seen layers until we have evicted enough
let batch = batched.entry(TimelineKey(candidate.timeline)).or_default();
match candidate.source {
EvictionCandidateSource::Attached(timeline) => {
let batch = batched.entry(TimelineKey(timeline)).or_default();
// semaphore will later be used to limit eviction concurrency, and we can express at
// most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
// but fail gracefully by not making batches larger.
if batch.len() < u32::MAX as usize {
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
// semaphore will later be used to limit eviction concurrency, and we can express at
// most u32 number of permits. unlikely we would have u32::MAX layers to be evicted,
// but fail gracefully by not making batches larger.
if batch.len() < u32::MAX as usize {
batch.push(candidate.layer);
max_batch_size = max_batch_size.max(batch.len());
}
}
EvictionCandidateSource::Secondary(ttid) => {
let batch = secondary_by_tenant.entry(ttid.tenant_id).or_default();
batch.push((ttid.timeline_id, candidate.layer));
}
}
}
@@ -372,7 +393,20 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
};
debug!(?usage_planned, "usage planned");
// phase2: evict victims batched by timeline
// phase2 (secondary tenants): evict victims batched by tenant
for (tenant_id, timeline_layers) in secondary_by_tenant {
// Q: Why do we go via TenantManager again rather than just deleting files, or keeping
// an Arc ref to the secondary state?
// A: It's because a given tenant's local storage **belongs** to whoever is currently
// live in the TenantManager. We must avoid a race where we might plan an eviction
// for secondary, and then execute it when the tenant is actually in an attached state.
tenant_manager
.evict_tenant_layers(&tenant_id, timeline_layers)
.instrument(tracing::info_span!("evict_batch", %tenant_id))
.await;
}
// phase2 (attached tenants): evict victims batched by timeline
let mut js = tokio::task::JoinSet::new();
@@ -480,9 +514,18 @@ pub async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
}))
}
// An eviction candidate might originate from either an attached tenant
// with a [`Tenant`] and [`Timeline`] object, or from a secondary tenant
// location. These differ in how we will execute the eviction.
#[derive(Clone)]
enum EvictionCandidateSource {
Attached(Arc<Timeline>),
Secondary(TenantTimelineId),
}
#[derive(Clone)]
struct EvictionCandidate {
timeline: Arc<Timeline>,
source: EvictionCandidateSource,
layer: Layer,
last_activity_ts: SystemTime,
}
@@ -532,27 +575,18 @@ enum EvictionCandidates {
/// after exhauting the `Above` partition.
/// So, we did not respect each tenant's min_resident_size.
async fn collect_eviction_candidates(
tenant_manager: &Arc<TenantManager>,
cancel: &CancellationToken,
) -> anyhow::Result<EvictionCandidates> {
// get a snapshot of the list of tenants
let tenants = tenant::mgr::list_tenants()
.await
.context("get list of tenants")?;
let mut candidates = Vec::new();
for (tenant_id, _state) in &tenants {
let tenants = tenant_manager.get_attached_tenants();
for tenant in tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(*tenant_id, true).await {
Ok(tenant) => tenant,
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it
debug!("failed to get tenant: {e:#}");
continue;
}
};
// collect layers from all timelines in this tenant
//
@@ -615,7 +649,7 @@ async fn collect_eviction_candidates(
for (timeline, layer_info) in tenant_candidates.into_iter() {
let file_size = layer_info.file_size();
let candidate = EvictionCandidate {
timeline,
source: EvictionCandidateSource::Attached(timeline),
last_activity_ts: layer_info.last_activity_ts,
layer: layer_info.layer,
};
@@ -629,6 +663,43 @@ async fn collect_eviction_candidates(
}
}
// FIXME: this is a long loop over all secondary locations. At the least, respect
// cancellation here, but really we need to break up the loop. We could extract the
// Arc<SecondaryTenant>s and iterate over them with some tokio yields in there. Ideally
// though we should just reduce the total amount of work: our eviction goals do not require
// listing absolutely every layer in every tenant: we could sample this.
tenant_manager.foreach_secondary_tenants(
|tenant_id: &TenantId, state: &Arc<SecondaryTenant>| {
let mut tenant_candidates = Vec::new();
for (timeline_id, layer_info) in state.get_layers_for_eviction() {
debug!(tenant_id=%tenant_id, timeline_id=%timeline_id, "timeline resident layers (secondary) count: {}", layer_info.resident_layers.len());
tenant_candidates.extend(
layer_info.resident_layers
.into_iter()
.map(|layer_infos| (timeline_id, layer_infos)),
);
}
tenant_candidates
.sort_unstable_by_key(|(_, layer_info)| std::cmp::Reverse(layer_info.last_activity_ts));
candidates.extend(tenant_candidates.into_iter().map(|(timeline_id, candidate)| {
(
// Secondary locations' layers are always considered above the min resident size,
// i.e. secondary locations are permitted to be trimmed to zero layers if all
// the layers have sufficiently old access times.
MinResidentSizePartition::Above,
EvictionCandidate {
source: EvictionCandidateSource::Secondary(TenantTimelineId { tenant_id: *tenant_id, timeline_id}),
last_activity_ts: candidate.last_activity_ts,
layer: candidate.layer,
}
)
}));
},
);
debug_assert!(MinResidentSizePartition::Above < MinResidentSizePartition::Below,
"as explained in the function's doc comment, layers that aren't in the tenant's min_resident_size are evicted first");
candidates

View File

@@ -4,6 +4,7 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use futures::TryFutureExt;
@@ -36,8 +37,10 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantMapInsertError, TenantStateError,
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
TenantSlotError, TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::LayerAccessStatsReset;
use crate::tenant::timeline::Timeline;
@@ -63,22 +66,27 @@ use super::models::ConfigureFailpointsRequest;
pub struct State {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<JwtAuth>>,
allowlist_routes: Vec<Uri>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
}
impl State {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<JwtAuth>>,
remote_storage: Option<GenericRemoteStorage>,
broker_client: storage_broker::BrokerClientChannel,
disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
deletion_queue_client: DeletionQueueClient,
secondary_controller: SecondaryController,
) -> anyhow::Result<Self> {
let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml", "/metrics"]
.iter()
@@ -86,12 +94,14 @@ impl State {
.collect::<Vec<_>>();
Ok(Self {
conf,
tenant_manager,
auth,
allowlist_routes,
remote_storage,
broker_client,
disk_usage_eviction_state,
deletion_queue_client,
secondary_controller,
})
}
@@ -147,28 +157,60 @@ impl From<PageReconstructError> for ApiError {
impl From<TenantMapInsertError> for ApiError {
fn from(tmie: TenantMapInsertError) -> ApiError {
match tmie {
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::ResourceUnavailable(format!("{tmie}").into())
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
}
TenantMapInsertError::TenantExistsSecondary(id) => {
ApiError::Conflict(format!("tenant {id} already exists as secondary"))
}
TenantMapInsertError::SlotError(e) => e.into(),
TenantMapInsertError::SlotUpsertError(e) => e.into(),
TenantMapInsertError::Other(e) => ApiError::InternalServerError(e),
}
}
}
impl From<TenantSlotError> for ApiError {
fn from(e: TenantSlotError) -> ApiError {
use TenantSlotError::*;
match e {
NotFound(tenant_id) => {
ApiError::NotFound(anyhow::anyhow!("NotFound: tenant {tenant_id}").into())
}
e @ AlreadyExists(_, _) => ApiError::Conflict(format!("{e}")),
e @ Conflict(_) => ApiError::Conflict(format!("{e}")),
InProgress => {
ApiError::ResourceUnavailable("Tenant is being modified concurrently".into())
}
MapState(e) => e.into(),
}
}
}
impl From<TenantSlotUpsertError> for ApiError {
fn from(e: TenantSlotUpsertError) -> ApiError {
use TenantSlotUpsertError::*;
match e {
InternalError(e) => ApiError::InternalServerError(anyhow::anyhow!("{e}")),
MapState(e) => e.into(),
}
}
}
impl From<TenantMapError> for ApiError {
fn from(e: TenantMapError) -> ApiError {
use TenantMapError::*;
match e {
StillInitializing | ShuttingDown => {
ApiError::ResourceUnavailable(format!("{e}").into())
}
}
}
}
impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
TenantStateError::SlotError(e) => e.into(),
TenantStateError::SlotUpsertError(e) => e.into(),
TenantStateError::Other(e) => ApiError::InternalServerError(anyhow!(e)),
}
}
}
@@ -243,6 +285,9 @@ impl From<crate::tenant::delete::DeleteTenantError> for ApiError {
Get(g) => ApiError::from(g),
e @ AlreadyInProgress => ApiError::Conflict(e.to_string()),
Timeline(t) => ApiError::from(t),
NotAttached => ApiError::NotFound(anyhow::anyhow!("Tenant is not attached").into()),
SlotError(e) => e.into(),
SlotUpsertError(e) => e.into(),
Other(o) => ApiError::InternalServerError(o),
e @ InvalidState(_) => ApiError::PreconditionFailed(e.to_string().into_boxed_str()),
}
@@ -369,7 +414,7 @@ async fn timeline_create_handler(
let state = get_state(&request);
async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
match tenant.create_timeline(
new_timeline_id,
request_data.ancestor_timeline_id.map(TimelineId::from),
@@ -416,7 +461,7 @@ async fn timeline_list_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let response_data = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
let timelines = tenant.list_timelines();
let mut response_data = Vec::with_capacity(timelines.len());
@@ -455,7 +500,7 @@ async fn timeline_detail_handler(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline_info = async {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
let timeline = tenant
.get_timeline(timeline_id, false)
@@ -713,7 +758,7 @@ async fn tenant_status(
check_permission(&request, Some(tenant_id))?;
let tenant_info = async {
let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id, false)?;
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
@@ -776,7 +821,7 @@ async fn tenant_size_handler(
let headers = request.headers();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
// this can be long operation
let inputs = tenant
@@ -1035,7 +1080,7 @@ async fn get_tenant_config_handler(
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
check_permission(&request, Some(tenant_id))?;
let tenant = mgr::get_tenant(tenant_id, false).await?;
let tenant = mgr::get_tenant(tenant_id, false)?;
let response = HashMap::from([
(
@@ -1079,6 +1124,9 @@ async fn put_tenant_location_config_handler(
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let request_data: TenantLocationConfigRequest = json_request(&mut request).await?;
let flush = parse_query_param(&request, "flush_ms")?.map(Duration::from_millis);
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
@@ -1094,7 +1142,7 @@ async fn put_tenant_location_config_handler(
.await
{
match e {
TenantStateError::NotFound(_) => {
TenantStateError::SlotError(TenantSlotError::NotFound(_)) => {
// This API is idempotent: a NotFound on a detach is fine.
}
_ => return Err(e.into()),
@@ -1106,20 +1154,14 @@ async fn put_tenant_location_config_handler(
let location_conf =
LocationConf::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
mgr::upsert_location(
state.conf,
tenant_id,
location_conf,
state.broker_client.clone(),
state.remote_storage.clone(),
state.deletion_queue_client.clone(),
&ctx,
)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
state
.tenant_manager
.upsert_location(tenant_id, location_conf, flush, &ctx)
.await
// TODO: badrequest assumes the caller was asking for something unreasonable, but in
// principle we might have hit something like concurrent API calls to the same tenant,
// which is not a 400 but a 409.
.map_err(ApiError::BadRequest)?;
json_response(StatusCode::OK, ())
}
@@ -1132,7 +1174,6 @@ async fn handle_tenant_break(
let tenant_id: TenantId = parse_request_param(&r, "tenant_id")?;
let tenant = crate::tenant::mgr::get_tenant(tenant_id, true)
.await
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned()).await;
@@ -1437,7 +1478,7 @@ async fn active_timeline_of_active_tenant(
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {
let tenant = mgr::get_tenant(tenant_id, true).await?;
let tenant = mgr::get_tenant(tenant_id, true)?;
tenant
.get_timeline(timeline_id, true)
.map_err(|e| ApiError::NotFound(e.into()))
@@ -1506,11 +1547,12 @@ async fn disk_usage_eviction_run(
)));
}
let state = state.disk_usage_eviction_state.clone();
let eviction_state = state.disk_usage_eviction_state.clone();
let cancel = CancellationToken::new();
let child_cancel = cancel.clone();
let _g = cancel.drop_guard();
let tenant_manager = state.tenant_manager.clone();
crate::task_mgr::spawn(
crate::task_mgr::BACKGROUND_RUNTIME.handle(),
@@ -1521,8 +1563,9 @@ async fn disk_usage_eviction_run(
false,
async move {
let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
&state,
&eviction_state,
usage,
&tenant_manager,
&child_cancel,
)
.await;
@@ -1540,6 +1583,36 @@ async fn disk_usage_eviction_run(
json_response(StatusCode::OK, response)
}
async fn secondary_download_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
state
.secondary_controller
.download_tenant(tenant_id)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn secondary_upload_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let state = get_state(&request);
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
state
.secondary_controller
.upload_tenant(tenant_id)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
async fn handler_404(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(
StatusCode::NOT_FOUND,
@@ -1776,6 +1849,16 @@ pub fn make_router(
.put("/v1/deletion_queue/flush", |r| {
api_handler(r, deletion_queue_flush)
})
.post("/v1/secondary/:tenant_id/upload", |r| {
testing_api_handler("force heatmap upload", r, secondary_upload_handler)
})
.post("/v1/secondary/:tenant_id/download", |r| {
testing_api_handler(
"force secondary layer download",
r,
secondary_download_handler,
)
})
.put("/v1/tenant/:tenant_id/break", |r| {
testing_api_handler("set tenant state to broken", r, handle_tenant_break)
})

View File

@@ -1314,7 +1314,7 @@ async fn get_active_tenant_with_timeout(
tenant_id: TenantId,
_ctx: &RequestContext, /* require get a context to support cancellation in the future */
) -> Result<Arc<Tenant>, GetActiveTenantError> {
let tenant = match mgr::get_tenant(tenant_id, false).await {
let tenant = match mgr::get_tenant(tenant_id, false) {
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {

View File

@@ -257,6 +257,12 @@ pub enum TaskKind {
/// See [`crate::disk_usage_eviction_task`].
DiskUsageEviction,
/// See [`crate::tenant::secondary`].
SecondaryDownloads,
/// See [`crate::tenant::secondary`].
SecondaryUploads,
// Initial logical size calculation
InitialLogicalSizeCalculation,

View File

@@ -63,6 +63,7 @@ use self::timeline::TimelineResources;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::metrics::TENANT_ACTIVATION;
@@ -130,6 +131,7 @@ pub mod storage_layer;
pub mod config;
pub mod delete;
pub mod mgr;
pub mod secondary;
pub mod tasks;
pub mod upload_queue;
@@ -192,6 +194,45 @@ struct TimelinePreload {
index_part: Result<MaybeDeletedIndexPart, DownloadError>,
}
/// To include the HeatmapWriter in tenant shutdown, we provide a hook
/// for it to publish a barrier when upload is going on. We will take
/// this and wait on it during shutdown, ensuring that there is no
/// upload going on once shutdown() returns.
pub struct HeatmapHook {
// Mutually exclude shutdown and any in-flight uploads
//
// If this is None, we are shutting down
in_progress: Option<Arc<tokio::sync::Mutex<()>>>,
}
impl Default for HeatmapHook {
fn default() -> Self {
Self {
in_progress: Some(Arc::default()),
}
}
}
impl HeatmapHook {
pub(crate) fn enter(&self) -> Option<tokio::sync::OwnedMutexGuard<()>> {
self.in_progress.as_ref().map(|l| {
l.clone()
.try_lock_owned()
// expect: shutdown cannot have started yet or in_progress would have been None,
// so we expect that only one HeatmapWriter may take this lock at once.
// Depends on the invariant that HeatmapWriter is the only thing that calls
// enter(), and that it will never try and do uploads concurrently for the same
// tenant.
.expect("Tried to double-lock HeatmapHook")
})
}
/// Returns a lock that the caller should wait on before proceeding with shutdown
fn shutdown(&mut self) -> Arc<tokio::sync::Mutex<()>> {
self.in_progress.take().expect("Called shutdown twice")
}
}
///
/// Tenant consists of multiple timelines. Keep them in a hash table.
///
@@ -243,6 +284,16 @@ pub struct Tenant {
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
pub(crate) heatmap_hook: Mutex<HeatmapHook>,
pub(crate) cancel: CancellationToken,
}
impl std::fmt::Debug for Tenant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.tenant_id, self.current_state())
}
}
pub(crate) enum WalRedoManager {
@@ -523,7 +574,7 @@ impl Tenant {
tenant_id: TenantId,
resources: TenantSharedResources,
attached_conf: AttachedTenantConf,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
expect_marker: AttachMarkerMode,
ctx: &RequestContext,
) -> anyhow::Result<Arc<Tenant>> {
@@ -890,7 +941,7 @@ impl Tenant {
attached_conf: AttachedTenantConf,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Arc<Tenant> {
span::debug_assert_current_span_has_tenant_id();
@@ -2023,6 +2074,8 @@ impl Tenant {
// It's mesed up.
// we just ignore the failure to stop
tracing::debug!("shutting down...");
self.cancel.cancel();
match self.set_stopping(shutdown_progress, false, false).await {
Ok(()) => {}
Err(SetStoppingError::Broken) => {
@@ -2030,6 +2083,7 @@ impl Tenant {
}
Err(SetStoppingError::AlreadyStopping(other)) => {
// give caller the option to wait for this this shutdown
info!("Tenant::shutdown: AlreadyStopping");
return Err(other);
}
};
@@ -2043,6 +2097,7 @@ impl Tenant {
js.spawn(async move { timeline.shutdown(freeze_and_flush).instrument(span).await });
})
};
tracing::debug!("shutdown waiting for timelines...");
while let Some(res) = js.join_next().await {
match res {
Ok(()) => {}
@@ -2052,12 +2107,23 @@ impl Tenant {
}
}
let heatmap_hook_lock = {
let mut hook = self.heatmap_hook.lock().unwrap();
hook.shutdown()
};
tracing::debug!("shutdown waiting heatmap uploads...");
// Take & drop lock to ensure any heatmap upload is complete.
drop(heatmap_hook_lock.lock().await);
tracing::debug!("shutdown waiting for tasks...");
// shutdown all tenant and timeline tasks: gc, compaction, page service
// No new tasks will be started for this tenant because it's in `Stopping` state.
//
// this will additionally shutdown and await all timeline tasks.
task_mgr::shutdown_tasks(None, Some(self.tenant_id), None).await;
tracing::debug!("shutdown complete");
Ok(())
}
@@ -2307,6 +2373,9 @@ where
}
impl Tenant {
pub fn get_tenant_id(&self) -> TenantId {
self.tenant_id
}
pub fn tenant_specific_overrides(&self) -> TenantConfOpt {
self.tenant_conf.read().unwrap().tenant_conf
}
@@ -2553,6 +2622,8 @@ impl Tenant {
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
heatmap_hook: Mutex::default(),
cancel: CancellationToken::new(),
}
}
@@ -3362,6 +3433,30 @@ impl Tenant {
pub fn cached_synthetic_size(&self) -> u64 {
self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
}
/// Flush any in-progress layers, schedule uploads, and wait for uploads to complete.
///
/// This function can take a long time: callers should wrap it in a timeout if calling
/// from an external API handler.
pub async fn flush_remote(&self) -> anyhow::Result<()> {
let timelines = self.timelines.lock().unwrap().clone();
for (timeline_id, timeline) in timelines {
tracing::info!(%timeline_id, "Flushing...");
timeline.freeze_and_flush().await?;
tracing::info!(%timeline_id, "Waiting for uploads...");
if let Some(client) = &timeline.remote_client {
client.wait_completion().await?;
}
}
match self.deletion_queue_client.flush_execute().await {
Ok(_) => {}
Err(DeletionQueueError::ShuttingDown) => {}
}
Ok(())
}
}
fn remove_timeline_and_uninit_mark(
@@ -4391,7 +4486,7 @@ mod tests {
metadata_bytes[8] ^= 1;
std::fs::write(metadata_path, metadata_bytes)?;
let err = harness.try_load(&ctx).await.err().expect("should fail");
let err = harness.try_load(&ctx).await.expect_err("should fail");
// get all the stack with all .context, not only the last one
let message = format!("{err:#}");
let expected = "failed to load metadata";

View File

@@ -21,7 +21,7 @@ use crate::{
};
use super::{
mgr::{GetTenantError, TenantsMap},
mgr::{GetTenantError, TenantSlotError, TenantSlotUpsertError, TenantsMap},
remote_timeline_client::{FAILED_REMOTE_OP_RETRIES, FAILED_UPLOAD_WARN_THRESHOLD},
span,
timeline::delete::DeleteTimelineFlow,
@@ -35,12 +35,21 @@ pub(crate) enum DeleteTenantError {
#[error("GetTenant {0}")]
Get(#[from] GetTenantError),
#[error("Tenant not attached")]
NotAttached,
#[error("Invalid state {0}. Expected Active or Broken")]
InvalidState(TenantState),
#[error("Tenant deletion is already in progress")]
AlreadyInProgress,
#[error("Tenant map slot error {0}")]
SlotError(#[from] TenantSlotError),
#[error("Tenant map slot upsert error {0}")]
SlotUpsertError(#[from] TenantSlotUpsertError),
#[error("Timeline {0}")]
Timeline(#[from] DeleteTimelineError),
@@ -301,12 +310,12 @@ impl DeleteTenantFlow {
pub(crate) async fn run(
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
span::debug_assert_current_span_has_tenant_id();
let (tenant, mut guard) = Self::prepare(tenants, tenant_id).await?;
let mut guard = Self::prepare(&tenant).await?;
if let Err(e) = Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant).await {
tenant.set_broken(format!("{e:#}")).await;
@@ -411,7 +420,7 @@ impl DeleteTenantFlow {
guard: DeletionGuard,
tenant: &Arc<Tenant>,
init_order: Option<&InitializationOrder>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
@@ -448,7 +457,7 @@ impl DeleteTenantFlow {
pub(crate) async fn resume_from_attach(
guard: DeletionGuard,
tenant: &Arc<Tenant>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
ctx: &RequestContext,
) -> Result<(), DeleteTenantError> {
let (_, progress) = completion::channel();
@@ -474,15 +483,8 @@ impl DeleteTenantFlow {
}
async fn prepare(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
) -> Result<(Arc<Tenant>, tokio::sync::OwnedMutexGuard<Self>), DeleteTenantError> {
let m = tenants.read().await;
let tenant = m
.get(&tenant_id)
.ok_or(GetTenantError::NotFound(tenant_id))?;
tenant: &Arc<Tenant>,
) -> Result<tokio::sync::OwnedMutexGuard<Self>, DeleteTenantError> {
// FIXME: unsure about active only. Our init jobs may not be cancellable properly,
// so at least for now allow deletions only for active tenants. TODO recheck
// Broken and Stopping is needed for retries.
@@ -516,14 +518,14 @@ impl DeleteTenantFlow {
)));
}
Ok((Arc::clone(tenant), guard))
Ok(guard)
}
fn schedule_background(
guard: OwnedMutexGuard<Self>,
conf: &'static PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: Arc<Tenant>,
) {
let tenant_id = tenant.tenant_id;
@@ -556,7 +558,7 @@ impl DeleteTenantFlow {
mut guard: OwnedMutexGuard<Self>,
conf: &PageServerConf,
remote_storage: Option<GenericRemoteStorage>,
tenants: &'static tokio::sync::RwLock<TenantsMap>,
tenants: &'static std::sync::RwLock<TenantsMap>,
tenant: &Arc<Tenant>,
) -> Result<(), DeleteTenantError> {
// Tree sort timelines, schedule delete for them. Mention retries from the console side.
@@ -604,7 +606,7 @@ impl DeleteTenantFlow {
.await
.context("cleanup_remaining_fs_traces")?;
let mut locked = tenants.write().await;
let mut locked = tenants.write().unwrap();
if locked.remove(&tenant.tenant_id).is_none() {
warn!("Tenant got removed from tenants map during deletion");
};

File diff suppressed because it is too large Load Diff

View File

@@ -202,7 +202,7 @@
//! [`Tenant::timeline_init_and_sync`]: super::Tenant::timeline_init_and_sync
//! [`Timeline::load_layer_map`]: super::Timeline::load_layer_map
mod download;
pub(crate) mod download;
pub mod index;
mod upload;
@@ -1495,6 +1495,23 @@ impl RemoteTimelineClient {
}
}
}
pub(crate) fn get_layers_metadata(
&self,
layers: Vec<LayerFileName>,
) -> anyhow::Result<Vec<Option<LayerFileMetadata>>> {
let q = self.upload_queue.lock().unwrap();
let q = match &*q {
UploadQueue::Stopped(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", q.as_str())
}
UploadQueue::Initialized(inner) => inner,
};
let decorated = layers.into_iter().map(|l| q.latest_files.get(&l).cloned());
Ok(decorated.collect())
}
}
pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
@@ -1535,6 +1552,13 @@ pub fn remote_index_path(
.expect("Failed to construct path")
}
pub const HEATMAP_BASENAME: &str = "heatmap";
pub fn remote_heatmap_path(tenant_id: &TenantId) -> RemotePath {
RemotePath::from_string(&format!("tenants/{tenant_id}/{HEATMAP_BASENAME}-v01"))
.expect("Failed to construct path")
}
/// Given the key of an index, parse out the generation part of the name
pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
let file_name = match path.get_path().file_name() {

View File

@@ -0,0 +1,268 @@
pub mod downloader;
pub mod heatmap;
pub mod heatmap_writer;
use std::{sync::Arc, time::SystemTime};
use crate::{
config::PageServerConf,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
};
use self::{
downloader::{downloader_task, SecondaryDetail},
heatmap_writer::heatmap_writer_task,
};
use super::{
mgr::TenantManager,
storage_layer::{AsLayerDesc, Layer},
timeline::DiskUsageEvictionInfo,
};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use utils::{
completion::Barrier,
fs_ext,
id::{TenantId, TimelineId},
};
enum DownloadCommand {
Download(TenantId),
}
enum UploadCommand {
Upload(TenantId),
}
struct CommandRequest<T> {
payload: T,
response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}
struct CommandResponse {
result: anyhow::Result<()>,
}
// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
/// Cancellation token indicates to SecondaryDownloader that it should stop doing
/// any work for this tenant at the next opportunity.
pub(crate) cancel: CancellationToken,
/// Lock must be held by SecondaryDownloader at any time that it might be operating
/// on the local filesystem directory for this tenant ID.
// Ordering: the TenantManager must set the cancellation token _before_
// taking the lock. The SecondaryDownloader must always check the cancellation
// token immediately _after_ taking the lock (and at appropriate intervals
// while holding it).
pub(crate) busy: Arc<tokio::sync::Mutex<()>>,
detail: std::sync::Mutex<SecondaryDetail>,
// TODO: propagate the `warm` from LocationConf into here, and respect it when doing downloads
}
impl SecondaryTenant {
pub(crate) fn new() -> Arc<Self> {
// TODO; consider whether we really need to Arc this
Arc::new(Self {
busy: Arc::new(tokio::sync::Mutex::new(())),
// todo: shall we make this a descendent of the
// main cancellation token, or is it sufficient that
// on shutdown we walk the tenants and fire their
// individual cancellations?
cancel: CancellationToken::new(),
detail: std::sync::Mutex::default(),
})
}
pub(crate) async fn shutdown(&self) {
self.cancel.cancel();
// Wait for any secondary downloader work to complete: once we
// acquire this lock, we are guaranteed that the secondary downloader
// won't touch the local filesystem again for this instance: it is safe
// to e.g. construct a `Tenant` for the same TenantId
drop(self.busy.lock().await);
}
pub(crate) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
self.detail.lock().unwrap().get_layers_for_eviction()
}
pub(crate) async fn evict_layers(
&self,
_guard: tokio::sync::OwnedMutexGuard<()>,
conf: &PageServerConf,
tenant_id: &TenantId,
layers: Vec<(TimelineId, Layer)>,
) {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
if self.cancel.is_cancelled() {
// Eviction is a no-op if shutdown() was already called.
tracing::info!(
"Dropping {} layer evictions, secondary tenant shutting down",
layers.len()
);
return;
}
let now = SystemTime::now();
for (timeline_id, layer) in layers {
let layer_name = layer.layer_desc().filename();
let path = conf
.timeline_path(tenant_id, &timeline_id)
.join(&layer_name.file_name());
// We tolerate ENOENT, because between planning eviction and executing
// it, the secondary downloader could have seen an updated heatmap that
// resulted in a layer being deleted.
tokio::fs::remove_file(path)
.await
.or_else(fs_ext::ignore_not_found)
.expect("TODO: terminate process on local I/O errors");
// TODO: batch up updates instead of acquiring lock in inner loop
let mut detail = self.detail.lock().unwrap();
// If there is no timeline detail for what we just deleted, that indicates that
// the secondary downloader did some work (perhaps removing all)
if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
timeline_detail.on_disk_layers.remove(&layer_name);
timeline_detail.evicted_at.insert(layer_name, now);
}
}
}
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
/// where we want to immediately upload/download for a particular tenant. In normal operation
/// uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}
impl SecondaryController {
async fn dispatch<T>(
&self,
queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
payload: T,
) -> anyhow::Result<()> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
queue
.send(CommandRequest {
payload,
response_tx,
})
.await
.map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
let response = response_rx
.await
.map_err(|_| anyhow::anyhow!("Request dropped"))?;
response.result
}
pub async fn download_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
self.dispatch(&self.download_req_tx, DownloadCommand::Download(tenant_id))
.await
}
pub async fn upload_tenant(&self, tenant_id: TenantId) -> anyhow::Result<()> {
self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_id))
.await
}
}
pub fn spawn_tasks(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> SecondaryController {
let mgr_clone = tenant_manager.clone();
let storage_clone = remote_storage.clone();
let cancel_clone = cancel.clone();
let bg_jobs_clone = background_jobs_can_start.clone();
let (download_req_tx, download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
"secondary tenant downloads",
false,
async move {
downloader_task(
conf,
mgr_clone,
storage_clone,
download_req_rx,
bg_jobs_clone,
cancel_clone,
)
.await
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::SecondaryDownloads,
None,
None,
"heatmap uploads",
false,
async move {
heatmap_writer_task(
tenant_manager,
remote_storage,
upload_req_rx,
background_jobs_can_start,
cancel,
)
.await
},
);
SecondaryController {
download_req_tx,
upload_req_tx,
}
}
/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
let (download_req_tx, _download_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
let (upload_req_tx, _upload_req_rx) =
tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
SecondaryController {
upload_req_tx,
download_req_tx,
}
}

View File

@@ -0,0 +1,579 @@
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use crate::{
config::PageServerConf,
tenant::{
remote_timeline_client::index::LayerFileMetadata,
secondary::CommandResponse,
storage_layer::{Layer, LayerFileName},
timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
},
METADATA_FILE_NAME,
};
use super::SecondaryTenant;
use crate::tenant::{
mgr::TenantManager,
remote_timeline_client::{download::download_layer_file, remote_heatmap_path},
};
use anyhow::Context;
use chrono::format::{DelayedFormat, StrftimeItems};
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
completion::Barrier,
fs_ext,
id::{TenantId, TimelineId},
};
use super::{
heatmap::{HeatMapTenant, HeatMapTimeline},
CommandRequest, DownloadCommand,
};
/// Interval between checking if any Secondary tenants have download work to do:
/// note that this is _not_ the frequency with which we actually freshen the tenants,
/// just the frequency with which we wake up to decide whether anyone needs freshening.
///
/// Making this somewhat infrequent reduces the load on mutexes inside TenantManager
/// and SecondaryTenant for reads when checking for work to do.
const DOWNLOAD_CHECK_INTERVAL: Duration = Duration::from_millis(10000);
/// For each tenant, how long must have passed since the last freshen_tenant call before
/// calling it again. This is approximately the time by which local data is allowed
/// to fall behind remote data.
///
/// TODO: this should be an upper bound, and tenants that are uploading regularly
/// should adaptively freshen more often (e.g. a tenant writing 1 layer per second
/// should not wait a minute between freshens)
const DOWNLOAD_FRESHEN_INTERVAL: Duration = Duration::from_millis(60000);
#[derive(Debug, Clone)]
pub(super) struct OnDiskState {
layer: Layer,
access_time: SystemTime,
}
impl OnDiskState {
fn new(
conf: &'static PageServerConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
name: LayerFileName,
metadata: LayerFileMetadata,
access_time: SystemTime,
) -> Self {
Self {
layer: Layer::for_secondary(conf, tenant_id, timeline_id, name, metadata),
access_time,
}
}
}
#[derive(Debug, Clone, Default)]
pub(super) struct SecondaryDetailTimeline {
pub(super) on_disk_layers: HashMap<LayerFileName, OnDiskState>,
/// We remember when layers were evicted, to prevent re-downloading them.
/// TODO: persist this, so that we don't try and re-download everything on restart.
pub(super) evicted_at: HashMap<LayerFileName, SystemTime>,
}
/// This state is written by the secondary downloader, it is opaque
/// to TenantManager
#[derive(Default, Debug)]
pub(super) struct SecondaryDetail {
freshened_at: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
/// Helper for logging SystemTime
fn strftime(t: &'_ SystemTime) -> DelayedFormat<StrftimeItems<'_>> {
let datetime: chrono::DateTime<chrono::Utc> = (*t).into();
datetime.format("%d/%m/%Y %T")
}
impl SecondaryDetail {
pub(super) fn get_layers_for_eviction(&self) -> Vec<(TimelineId, DiskUsageEvictionInfo)> {
let mut result = Vec::new();
for (timeline_id, timeline_detail) in &self.timelines {
let layers: Vec<_> = timeline_detail
.on_disk_layers
.values()
.map(|ods| LocalLayerInfoForDiskUsageEviction {
layer: ods.layer.clone(),
last_activity_ts: ods.access_time,
})
.collect();
let max_layer_size = layers.iter().map(|l| l.layer.metadata().file_size()).max();
result.push((
*timeline_id,
DiskUsageEvictionInfo {
resident_layers: layers,
max_layer_size,
},
))
}
result
}
}
/// Keep trying to do downloads until the cancellation token is fired. Remote storage
/// errors are handled internally: any error returned by this function is an unexpected
/// internal error of some kind.
pub(super) async fn downloader_task(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<DownloadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let downloader = SecondaryDownloader {
conf,
tenant_manager,
remote_storage,
cancel: cancel.clone(),
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
downloader.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(DOWNLOAD_CHECK_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) =>c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
let result = downloader.handle_command(payload).await;
if response_tx.send(CommandResponse{result}).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct SecondaryDownloader {
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
}
struct TenantJob {
tenant_id: TenantId,
secondary_state: Arc<SecondaryTenant>,
// This mutex guard conveys the right to write to the tenant's local directory: it must
// be taken before doing downloads, and TenantManager must ensure it has been released
// before it considers shutdown complete for the secondary state -- [`SecondaryDownloader`]
// will thereby never be racing with [`Tenant`] for access to local files.
_guard: tokio::sync::OwnedMutexGuard<()>,
}
impl SecondaryDownloader {
async fn iteration(&self) -> anyhow::Result<()> {
// Step 1: identify some tenants that we may work on
let mut candidates: Vec<TenantJob> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|tenant_id, secondary_state| {
let guard = match secondary_state.busy.clone().try_lock_owned() {
Ok(guard) => guard,
// If we can't lock, someone is in the process of shutting it down, or we are
// already working on it. We may ignore it when scanning for new work to do.
Err(_) => return,
};
candidates.push(TenantJob {
tenant_id: *tenant_id,
secondary_state: secondary_state.clone(),
_guard: guard,
});
});
// Step 2: prioritized selection of next batch of tenants to freshen
let now = Instant::now();
let candidates = candidates.into_iter().filter(|c| {
let detail = c.secondary_state.detail.lock().unwrap();
match detail.freshened_at {
None => true, // Not yet freshened, therefore elegible to run
Some(t) => {
let since = now.duration_since(t);
since > DOWNLOAD_FRESHEN_INTERVAL
}
}
});
// TODO: don't just cut down the list, prioritize it to freshen the stalest tenants first
// TODO: bounded parallelism
// Step 3: spawn freshen_tenant tasks
for job in candidates {
if job.secondary_state.cancel.is_cancelled() {
continue;
}
async {
if let Err(e) = self.freshen_tenant(&job).await {
tracing::info!("Failed to freshen secondary content: {e:#}")
};
// Update freshened_at even if there was an error: we don't want errored tenants to implicitly
// take priority to run again.
let mut detail = job.secondary_state.detail.lock().unwrap();
detail.freshened_at = Some(Instant::now());
}
.instrument(tracing::info_span!(
"freshen_tenant",
tenant_id = %job.tenant_id
))
.await;
}
Ok(())
}
async fn handle_command(&self, command: DownloadCommand) -> anyhow::Result<()> {
match command {
DownloadCommand::Download(req_tenant_id) => {
let mut candidates: Vec<TenantJob> = Vec::new();
self.tenant_manager
.foreach_secondary_tenants(|tenant_id, secondary_state| {
tracing::info!("foreach_secondary: {tenant_id} ({req_tenant_id})");
if tenant_id == &req_tenant_id {
let guard = match secondary_state.busy.clone().try_lock_owned() {
Ok(guard) => guard,
// If we can't lock, someone is in the process of shutting it down, or we are
// already working on it. We may ignore it when scanning for new work to do.
Err(_) => return,
};
candidates.push(TenantJob {
tenant_id: *tenant_id,
secondary_state: secondary_state.clone(),
_guard: guard,
});
}
});
let tenant_job = if candidates.len() != 1 {
anyhow::bail!("Tenant not found in secondary mode");
} else {
candidates.pop().unwrap()
};
self.freshen_tenant(&tenant_job).await
}
}
}
async fn download_heatmap(&self, tenant_id: &TenantId) -> anyhow::Result<HeatMapTenant> {
// TODO: make download conditional on ETag having changed since last download
let heatmap_path = remote_heatmap_path(tenant_id);
// TODO: wrap this download in a select! that checks self.cancel
let mut download = self.remote_storage.download(&heatmap_path).await?;
let mut heatmap_bytes = Vec::new();
let _size = tokio::io::copy(&mut download.download_stream, &mut heatmap_bytes)
.await
.with_context(|| format!("download heatmap {heatmap_path:?}"))?;
Ok(serde_json::from_slice::<HeatMapTenant>(&heatmap_bytes)?)
}
async fn init_timeline_state(
&self,
tenant_id: &TenantId,
timeline_id: &TimelineId,
heatmap: &HeatMapTimeline,
) -> anyhow::Result<SecondaryDetailTimeline> {
let timeline_path = self.conf.timeline_path(tenant_id, timeline_id);
let mut detail = SecondaryDetailTimeline::default();
let mut dir = match tokio::fs::read_dir(&timeline_path).await {
Ok(d) => d,
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
tracing::info!("Creating timeline directory {timeline_path}");
tokio::fs::create_dir(&timeline_path).await?;
// No entries to report: drop out.
return Ok(detail);
} else {
return Err(e.into());
}
}
};
let heatmap_metadata: HashMap<_, _> = heatmap.layers.iter().map(|l| (&l.name, l)).collect();
while let Some(dentry) = dir.next_entry().await? {
let dentry_file_name = dentry.file_name();
let file_name = dentry_file_name.to_string_lossy();
let local_meta = dentry.metadata().await?;
// Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
if file_name == METADATA_FILE_NAME {
continue;
}
match LayerFileName::from_str(&file_name) {
Ok(name) => {
let remote_meta = heatmap_metadata.get(&name);
match remote_meta {
Some(remote_meta) => {
// TODO: checksums for layers (https://github.com/neondatabase/neon/issues/2784)
if local_meta.len() != remote_meta.metadata.file_size {
// This should not happen, because we do crashsafe write-then-rename when downloading
// layers, and layers in remote storage are immutable. Remove the local file because
// we cannot trust it.
tracing::warn!("Removing local layer {name} with unexpected local size {} != {}",
local_meta.len(), remote_meta.metadata.file_size);
} else {
// We expect the access time to be initialized immediately afterwards, when
// the latest heatmap is applied to the state.
detail.on_disk_layers.insert(
name.clone(),
OnDiskState::new(
self.conf,
tenant_id,
timeline_id,
name,
LayerFileMetadata::from(&remote_meta.metadata),
remote_meta.access_time,
),
);
}
}
None => {
// FIXME: consider some optimization when transitioning from attached to secondary: maybe
// wait until we have seen a heatmap that is more recent than the most recent on-disk state? Otherwise
// we will end up deleting any layers which were created+uploaded more recently than the heatmap.
tracing::info!(
"Removing secondary local layer {} because it's absent in heatmap",
name
);
tokio::fs::remove_file(dentry.path()).await?;
}
}
}
Err(_) => {
// Ignore it.
tracing::warn!("Unexpected file in timeline directory: {file_name}");
}
}
}
Ok(detail)
}
async fn freshen_timeline(
&self,
job: &TenantJob,
timeline: HeatMapTimeline,
) -> anyhow::Result<()> {
let timeline_path = self
.conf
.timeline_path(&job.tenant_id, &timeline.timeline_id);
// Accumulate updates to the state
let mut touched = Vec::new();
// Clone a view of what layers already exist on disk
let timeline_state = job
.secondary_state
.detail
.lock()
.unwrap()
.timelines
.get(&timeline.timeline_id)
.cloned();
let timeline_state = match timeline_state {
Some(t) => t,
None => {
// We have no existing state: need to scan local disk for layers first.
self.init_timeline_state(&job.tenant_id, &timeline.timeline_id, &timeline)
.await?
}
};
let layers_in_heatmap = timeline
.layers
.iter()
.map(|l| &l.name)
.collect::<HashSet<_>>();
let layers_on_disk = timeline_state
.on_disk_layers
.iter()
.map(|l| l.0)
.collect::<HashSet<_>>();
// Remove on-disk layers that are no longer present in heatmap
for layer in layers_on_disk.difference(&layers_in_heatmap) {
let local_path = timeline_path.join(layer.to_string());
tracing::info!("Removing secondary local layer {layer} because it's absent in heatmap",);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
// Download heatmap layers that are not present on local disk, or update their
// access time if they are already present.
for layer in timeline.layers {
if self.cancel.is_cancelled() {
return Ok(());
}
// Existing on-disk layers: just update their access time.
if let Some(on_disk) = timeline_state.on_disk_layers.get(&layer.name) {
if on_disk.layer.metadata() != LayerFileMetadata::from(&layer.metadata)
|| on_disk.access_time != layer.access_time
{
// We already have this layer on disk. Update its access time.
tracing::trace!(
"Access time updated for layer {}: {} -> {}",
layer.name,
strftime(&on_disk.access_time),
strftime(&layer.access_time)
);
touched.push(layer);
}
continue;
}
// Eviction: if we evicted a layer, then do not re-download it unless it was accessed more
// recently than it was evicted.
if let Some(evicted_at) = timeline_state.evicted_at.get(&layer.name) {
if &layer.access_time > evicted_at {
tracing::info!(
"Re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
} else {
tracing::trace!(
"Not re-downloading evicted layer {}, accessed at {}, evicted at {}",
layer.name,
strftime(&layer.access_time),
strftime(evicted_at)
);
continue;
}
}
match download_layer_file(
self.conf,
&self.remote_storage,
job.tenant_id,
timeline.timeline_id,
&layer.name,
&LayerFileMetadata::from(&layer.metadata),
)
.await
{
Ok(downloaded_bytes) => {
if downloaded_bytes != layer.metadata.file_size {
let local_path = timeline_path.join(layer.name.to_string());
tracing::error!(
"Downloaded layer {} with unexpected size {} != {}",
layer.name,
downloaded_bytes,
layer.metadata.file_size
);
tokio::fs::remove_file(&local_path)
.await
.or_else(fs_ext::ignore_not_found)?;
}
touched.push(layer)
}
Err(e) => {
// No retries here: secondary downloads don't have to succeed: if they fail we just proceed and expect
// that on some future call to freshen the download will work.
// TODO: refine this behavior.
tracing::info!("Failed to download layer {}: {}", layer.name, e);
}
}
}
// Write updates to state to record layers we just downloaded or touched.
{
let mut detail = job.secondary_state.detail.lock().unwrap();
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
for t in touched {
use std::collections::hash_map::Entry;
match timeline_detail.on_disk_layers.entry(t.name.clone()) {
Entry::Occupied(mut v) => {
v.get_mut().access_time = t.access_time;
}
Entry::Vacant(e) => {
e.insert(OnDiskState::new(
self.conf,
&job.tenant_id,
&timeline.timeline_id,
t.name,
LayerFileMetadata::from(&t.metadata),
t.access_time,
));
}
}
}
}
Ok(())
}
async fn freshen_tenant(&self, job: &TenantJob) -> anyhow::Result<()> {
// Download the tenant's heatmap
let heatmap = self.download_heatmap(&job.tenant_id).await?;
// Download the layers in the heatmap
for timeline in heatmap.timelines {
if self.cancel.is_cancelled() {
return Ok(());
}
self.freshen_timeline(job, timeline).await?;
}
Ok(())
}
}

View File

@@ -0,0 +1,57 @@
use std::time::SystemTime;
use crate::tenant::{
remote_timeline_client::index::IndexLayerMetadata, storage_layer::LayerFileName,
};
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use utils::id::TimelineId;
#[derive(Serialize, Deserialize)]
pub(super) struct HeatMapTenant {
pub(super) timelines: Vec<HeatMapTimeline>,
}
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapLayer {
pub(super) name: LayerFileName,
pub(super) metadata: IndexLayerMetadata,
pub(super) access_time: SystemTime,
// TODO: an actual 'heat' score that would let secondary locations prioritize downloading
// the hottest layers, rather than trying to simply mirror whatever layers are on-disk on the primary.
}
impl HeatMapLayer {
pub(crate) fn new(
name: LayerFileName,
metadata: IndexLayerMetadata,
access_time: SystemTime,
) -> Self {
Self {
name,
metadata,
access_time,
}
}
}
#[serde_as]
#[derive(Serialize, Deserialize)]
pub(crate) struct HeatMapTimeline {
#[serde_as(as = "DisplayFromStr")]
pub(super) timeline_id: TimelineId,
pub(super) layers: Vec<HeatMapLayer>,
}
impl HeatMapTimeline {
pub(crate) fn new(timeline_id: TimelineId, layers: Vec<HeatMapLayer>) -> Self {
Self {
timeline_id,
layers,
}
}
}

View File

@@ -0,0 +1,207 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use crate::tenant::{
mgr::TenantManager, remote_timeline_client::remote_heatmap_path, secondary::CommandResponse,
Tenant,
};
use pageserver_api::models::TenantState;
use remote_storage::GenericRemoteStorage;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{backoff, completion::Barrier};
use super::{heatmap::HeatMapTenant, CommandRequest, UploadCommand};
const HEATMAP_UPLOAD_INTERVAL: Duration = Duration::from_millis(60000);
pub(super) async fn heatmap_writer_task(
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
mut command_queue: tokio::sync::mpsc::Receiver<CommandRequest<UploadCommand>>,
background_jobs_can_start: Barrier,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let writer = HeatmapWriter {
tenant_manager,
remote_storage,
cancel: cancel.clone(),
};
tracing::info!("Waiting for background_jobs_can start...");
background_jobs_can_start.wait().await;
tracing::info!("background_jobs_can is ready, proceeding.");
while !cancel.is_cancelled() {
writer.iteration().await?;
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Heatmap writer terminating");
break;
},
_ = tokio::time::sleep(HEATMAP_UPLOAD_INTERVAL) => {},
cmd = command_queue.recv() => {
let cmd = match cmd {
Some(c) =>c,
None => {
// SecondaryController was destroyed, and this has raced with
// our CancellationToken
tracing::info!("Heatmap writer terminating");
break;
}
};
let CommandRequest{
response_tx,
payload
} = cmd;
let result = writer.handle_command(payload).await;
if response_tx.send(CommandResponse{result}).is_err() {
// Caller went away, e.g. because an HTTP request timed out
tracing::info!("Dropping response to administrative command")
}
}
}
}
Ok(())
}
struct HeatmapWriter {
tenant_manager: Arc<TenantManager>,
remote_storage: GenericRemoteStorage,
cancel: CancellationToken,
}
impl HeatmapWriter {
async fn iteration(&self) -> anyhow::Result<()> {
let tenants = self.tenant_manager.get_attached_tenants();
for tenant in tenants {
if self.cancel.is_cancelled() {
return Ok(());
}
if tenant.current_state() != TenantState::Active {
continue;
}
// TODO: add a mechanism to check whether the active layer set has
// changed since our last write
// TODO: add a minimum time between uploads
match self
.write_tenant(&tenant)
.instrument(tracing::info_span!(
"write_tenant",
tenant_id = %tenant.get_tenant_id()
))
.await
{
Ok(()) => {}
Err(e) => {
tracing::warn!(
"Failed to upload heatmap for tenant {}: {e:#}",
tenant.get_tenant_id(),
)
}
}
}
Ok(())
}
async fn handle_command(&self, command: UploadCommand) -> anyhow::Result<()> {
match command {
UploadCommand::Upload(tenant_id) => {
let tenants = self.tenant_manager.get_attached_tenants();
let map = tenants
.iter()
.map(|t| (t.get_tenant_id(), t))
.collect::<HashMap<_, _>>();
match map.get(&tenant_id) {
Some(tenant) => self.write_tenant(tenant).await,
None => {
anyhow::bail!("Tenant is not attached");
}
}
}
}
}
async fn write_tenant(&self, tenant: &Arc<Tenant>) -> anyhow::Result<()> {
let mut heatmap = HeatMapTenant {
timelines: Vec::new(),
};
let timelines = tenant.timelines.lock().unwrap().clone();
let tenant_cancel = tenant.cancel.clone();
// Ensure that Tenant::shutdown waits for any upload in flight
let _guard = {
let hook = tenant.heatmap_hook.lock().unwrap();
match hook.enter() {
Some(g) => g,
None => {
// Tenant is shutting down
tracing::info!("Skipping, tenant is shutting down");
return Ok(());
}
}
};
for (timeline_id, timeline) in timelines {
let heatmap_timeline = timeline.generate_heatmap().await;
match heatmap_timeline {
None => {
tracing::debug!(
"Skipping heatmap upload because timeline {timeline_id} is not ready"
);
return Ok(());
}
Some(heatmap_timeline) => {
heatmap.timelines.push(heatmap_timeline);
}
}
}
// Serialize the heatmap
let bytes = serde_json::to_vec(&heatmap)?;
let size = bytes.len();
let path = remote_heatmap_path(&tenant.get_tenant_id());
// Write the heatmap.
tracing::debug!("Uploading {size} byte heatmap to {path}");
if let Err(e) = backoff::retry(
|| async {
let bytes = tokio::io::BufReader::new(std::io::Cursor::new(bytes.clone()));
let bytes = Box::new(bytes);
self.remote_storage
.upload_storage_object(bytes, size, &path)
.await
},
|_| false,
3,
u32::MAX,
"Uploading heatmap",
backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")),
)
.await
{
if tenant_cancel.is_cancelled() {
return Ok(());
} else {
return Err(e);
}
}
tracing::info!("Successfully uploading {size} byte heatmap to {path}");
Ok(())
}
}

View File

@@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::SystemTime;
use tracing::Instrument;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::heavier_once_cell;
@@ -82,7 +83,38 @@ impl Layer {
let owner = Layer(Arc::new(LayerInner::new(
conf,
timeline,
Arc::downgrade(timeline),
access_stats,
desc,
None,
metadata.generation,
)));
debug_assert!(owner.0.needs_download_blocking().unwrap().is_some());
owner
}
/// A layer which is resident locally in a secondary location: not associated with a live Timeline object.
pub(crate) fn for_secondary(
conf: &'static PageServerConf,
tenant_id: &TenantId,
timeline_id: &TimelineId,
file_name: LayerFileName,
metadata: LayerFileMetadata,
) -> Self {
let desc = PersistentLayerDesc::from_filename(
*tenant_id,
*timeline_id,
file_name,
metadata.file_size(),
);
let access_stats = LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted);
let owner = Layer(Arc::new(LayerInner::new(
conf,
Weak::default(),
access_stats,
desc,
None,
@@ -121,7 +153,7 @@ impl Layer {
LayerInner::new(
conf,
timeline,
Arc::downgrade(timeline),
access_stats,
desc,
Some(inner),
@@ -163,7 +195,7 @@ impl Layer {
);
LayerInner::new(
conf,
timeline,
Arc::downgrade(timeline),
access_stats,
desc,
Some(inner),
@@ -496,22 +528,27 @@ impl Drop for LayerInner {
impl LayerInner {
fn new(
conf: &'static PageServerConf,
timeline: &Arc<Timeline>,
timeline: Weak<Timeline>,
access_stats: LayerAccessStats,
desc: PersistentLayerDesc,
downloaded: Option<Arc<DownloadedLayer>>,
generation: Generation,
) -> Self {
let path = conf
.timeline_path(&timeline.tenant_id, &timeline.timeline_id)
.timeline_path(&desc.tenant_id, &desc.timeline_id)
.join(desc.filename().to_string());
let have_remote_client = timeline
.upgrade()
.map(|t| t.remote_client.is_some())
.unwrap_or(false);
LayerInner {
conf,
path,
desc,
timeline: Arc::downgrade(timeline),
have_remote_client: timeline.remote_client.is_some(),
timeline,
have_remote_client,
access_stats,
wanted_garbage_collected: AtomicBool::new(false),
wanted_evicted: AtomicBool::new(false),

View File

@@ -88,8 +88,9 @@ use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::index::{IndexLayerMetadata, IndexPart};
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
@@ -1969,6 +1970,55 @@ impl Timeline {
None
}
/// The timeline heatmap is a hint to secondary locations from the primary location,
/// indicating which layers are currently on-disk on the primary.
///
/// None is returned if the Timeline is in a state where uploading a heatmap
/// doesn't make sense, such as shutting down or initializing. The caller
/// should treat this as a cue to simply skip doing any heatmap uploading
/// for this timeline.
pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
let eviction_info = self.get_local_layers_for_disk_usage_eviction().await;
let remote_client = match &self.remote_client {
Some(c) => c,
None => return None,
};
let layer_file_names = eviction_info
.resident_layers
.iter()
.map(|l| l.layer.layer_desc().filename())
.collect::<Vec<_>>();
let decorated = match remote_client.get_layers_metadata(layer_file_names) {
Ok(d) => d,
Err(_) => {
// Getting metadata only fails on Timeline in bad state.
return None;
}
};
let heatmap_layers = std::iter::zip(
eviction_info.resident_layers.into_iter(),
decorated.into_iter(),
)
.filter_map(|(layer, remote_info)| {
remote_info.map(|remote_info| {
HeatMapLayer::new(
layer.layer.layer_desc().filename(),
IndexLayerMetadata::from(remote_info),
layer.last_activity_ts,
)
})
});
Some(HeatMapTimeline::new(
self.timeline_id,
heatmap_layers.collect(),
))
}
}
type TraversalId = String;

View File

@@ -344,20 +344,7 @@ impl Timeline {
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
// imitated accesses that the winner made.
//
// It is critical we are responsive to cancellation here. Otherwise, we deadlock with
// tenant deletion (holds TENANTS in read mode) any other task that attempts to
// acquire TENANTS in write mode before we here call get_tenant.
// See https://github.com/neondatabase/neon/issues/5284.
let res = tokio::select! {
_ = cancel.cancelled() => {
return ControlFlow::Break(());
}
res = crate::tenant::mgr::get_tenant(self.tenant_id, true) => {
res
}
};
let tenant = match res {
let tenant = match crate::tenant::mgr::get_tenant(self.tenant_id, true) {
Ok(t) => t,
Err(_) => {
return ControlFlow::Break(());

View File

@@ -22,6 +22,11 @@ https://docs.pytest.org/en/6.2.x/logging.html
# log format is specified in pytest.ini file
LOGGING = {
"version": 1,
"formatters": {
"standard": {
"datefmt": "%m/%d/%Y %I:%M:%SZ %p %Z",
}
},
"loggers": {
"root": {"level": "INFO"},
"root.safekeeper_async": {"level": "INFO"}, # a lot of logs on DEBUG level

View File

@@ -1707,7 +1707,7 @@ class NeonPageserver(PgProtocol):
@property
def workdir(self) -> Path:
return Path(os.path.join(self.env.repo_dir, f"pageserver_{self.id}"))
return self.env.repo_dir / f"pageserver_{self.id}"
def assert_no_errors(self):
logfile = open(os.path.join(self.workdir, "pageserver.log"), "r")
@@ -1773,6 +1773,16 @@ class NeonPageserver(PgProtocol):
client = self.http_client()
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
def tenant_location_configure(self, tenant_id: TenantId, config: dict[str, Any], **kwargs):
# This API is only for use when generations are enabled
assert self.env.attachment_service is not None
if config["mode"].startswith("Attached") and "generation" not in config:
config["generation"] = self.env.attachment_service.attach_hook(tenant_id, self.id)
client = self.http_client()
return client.tenant_location_conf(tenant_id, config, **kwargs)
def append_pageserver_param_overrides(
params_to_update: List[str],
@@ -2629,6 +2639,7 @@ class EndpointFactory:
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
pageserver_id: Optional[int] = None,
) -> Endpoint:
ep = Endpoint(
self.env,
@@ -2648,6 +2659,7 @@ class EndpointFactory:
lsn=lsn,
hot_standby=hot_standby,
config_lines=config_lines,
pageserver_id=pageserver_id,
)
def stop_all(self) -> "EndpointFactory":

View File

@@ -247,6 +247,23 @@ class PageserverHttpClient(requests.Session):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
def tenant_location_conf(
self, tenant_id: TenantId, location_conf=dict[str, Any], flush_ms=None
):
body = location_conf.copy()
body["tenant_id"] = str(tenant_id)
params = {}
if flush_ms is not None:
params["flush_ms"] = str(flush_ms)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/location_config",
json=body,
params=params,
)
self.verbose_error(res)
def tenant_delete(self, tenant_id: TenantId):
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)
@@ -650,6 +667,14 @@ class PageserverHttpClient(requests.Session):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)
def secondary_tenant_upload(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/secondary/{tenant_id}/upload")
self.verbose_error(res)
def secondary_tenant_download(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/secondary/{tenant_id}/download")
self.verbose_error(res)
def post_tracing_event(self, level: str, message: str):
res = self.post(
f"http://localhost:{self.port}/v1/tracing/event",

View File

@@ -249,7 +249,7 @@ def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str
# this has been seen in the wild by tests with the below contradicting logging
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-5322/6207777020/index.html#suites/3556ed71f2d69272a7014df6dcb02317/53b5c368b5a68865
# this seems like a mock_s3 issue
log.warn(
log.warning(
f"contrading ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}, assuming this means KeyCount=0"
)
keys = 0
@@ -257,7 +257,7 @@ def assert_prefix_empty(neon_env_builder: "NeonEnvBuilder", prefix: Optional[str
# this has been seen in one case with mock_s3:
# https://neon-github-public-dev.s3.amazonaws.com/reports/pr-4938/6000769714/index.html#suites/3556ed71f2d69272a7014df6dcb02317/ca01e4f4d8d9a11f
# looking at moto impl, it might be there's a race with common prefix (sub directory) not going away with deletes
log.warn(
log.warning(
f"contradicting ListObjectsV2 response with KeyCount={keys} and Contents={objects}, CommonPrefixes={common_prefixes}"
)

View File

@@ -0,0 +1,133 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import TenantId, TimelineId
class Workload:
"""
This is not a general purpose load generator: it exists for storage tests that need to inject some
high level types of storage work via the postgres interface:
- layer writes (`write_rows`)
- work for compaction (`churn_rows`)
- reads, checking we get the right data (`validate`)
"""
def __init__(self, env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId):
self.env = env
self.tenant_id = tenant_id
self.timeline_id = timeline_id
self.table = "foo"
self.expect_rows = 0
self.churn_cursor = 0
self.endpoints: dict[int, Endpoint] = {}
def endpoint(self, pageserver_id: int):
if pageserver_id not in self.endpoints:
self.endpoints[pageserver_id] = self.env.endpoints.create(
"main",
tenant_id=self.tenant_id,
pageserver_id=pageserver_id,
endpoint_id=f"ep-{pageserver_id}",
)
endpoint = self.endpoints[pageserver_id]
assert not endpoint.running
endpoint.start(pageserver_id=pageserver_id)
return endpoint
def init(self, pageserver_id: int):
with self.endpoint(pageserver_id) as endpoint:
endpoint.safe_psql(f"CREATE TABLE {self.table} (id INTEGER PRIMARY KEY, val text);")
endpoint.safe_psql("CREATE EXTENSION IF NOT EXISTS neon_test_utils;")
last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def write_rows(self, n, pageserver_id):
with self.endpoint(pageserver_id) as endpoint:
start = self.expect_rows
end = start + n - 1
self.expect_rows += n
dummy_value = "blah"
endpoint.safe_psql(
f"""
INSERT INTO {self.table} (id, val)
SELECT g, '{dummy_value}'
FROM generate_series({start}, {end}) g
"""
)
return last_flush_lsn_upload(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
def churn_rows(self, n, pageserver_id, upload=True):
assert self.expect_rows >= n
max_iters = 10
with self.endpoint(pageserver_id) as endpoint:
todo = n
i = 0
while todo > 0:
i += 1
if i > max_iters:
raise RuntimeError("oops")
start = self.churn_cursor % self.expect_rows
n_iter = min((self.expect_rows - start), todo)
todo -= n_iter
end = start + n_iter - 1
log.info(
f"start,end = {start},{end}, cursor={self.churn_cursor}, expect_rows={self.expect_rows}"
)
assert end < self.expect_rows
self.churn_cursor += n_iter
dummy_value = "blah"
endpoint.safe_psql_many(
[
f"""
INSERT INTO {self.table} (id, val)
SELECT g, '{dummy_value}'
FROM generate_series({start}, {end}) g
ON CONFLICT (id) DO UPDATE
SET val = EXCLUDED.val
""",
f"VACUUM {self.table}",
]
)
last_flush_lsn = wait_for_last_flush_lsn(
self.env, endpoint, self.tenant_id, self.timeline_id, pageserver_id=pageserver_id
)
ps_http = self.env.get_pageserver(pageserver_id).http_client()
wait_for_last_record_lsn(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
if upload:
# force a checkpoint to trigger upload
ps_http.timeline_checkpoint(self.tenant_id, self.timeline_id)
wait_for_upload(ps_http, self.tenant_id, self.timeline_id, last_flush_lsn)
def validate(self, pageserver_id):
with self.endpoint(pageserver_id) as endpoint:
result = endpoint.safe_psql_many(
[
"select clear_buffer_cache()",
f"""
SELECT COUNT(*) FROM {self.table}
""",
]
)
log.info(f"validate({self.expect_rows}): {result}")
assert result == [[("",)], [(self.expect_rows,)]]

View File

@@ -24,12 +24,19 @@ from fixtures.neon_fixtures import (
last_flush_lsn_upload,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import list_prefix
from fixtures.pageserver.http import PageserverApiException
from fixtures.pageserver.utils import (
assert_tenant_state,
list_prefix,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.remote_storage import (
RemoteStorageKind,
)
from fixtures.types import TenantId, TimelineId
from fixtures.utils import print_gc_result, wait_until
from fixtures.workload import Workload
# A tenant configuration that is convenient for generating uploads and deletions
# without a large amount of postgres traffic.
@@ -532,3 +539,91 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
read_all(env, tenant_id, timeline_id)
evict_all_layers(env, tenant_id, timeline_id)
read_all(env, tenant_id, timeline_id)
def test_multi_attach(
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
):
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
pageservers = env.pageservers
http_clients = list([p.http_client() for p in pageservers])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# We will intentionally create situations where stale deletions happen from non-latest-generation
# nodes when the tenant is multiply-attached
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
# Initially, the tenant will be attached to the pageserver a (first is default in our test harness)
wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[0], tenant_id, "Active"))
_detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
with pytest.raises(PageserverApiException):
http_clients[1].timeline_detail(tenant_id, timeline_id)
with pytest.raises(PageserverApiException):
http_clients[2].timeline_detail(tenant_id, timeline_id)
workload = Workload(env, tenant_id, timeline_id)
workload.init(pageservers[0].id)
workload.write_rows(1000, pageservers[0].id)
# Attach the tenant to the other two pageservers
pageservers[1].tenant_attach(env.initial_tenant)
pageservers[2].tenant_attach(env.initial_tenant)
wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[1], tenant_id, "Active"))
wait_until(10, 0.2, lambda: assert_tenant_state(http_clients[2], tenant_id, "Active"))
# Now they all have it attached
_detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
_detail = http_clients[1].timeline_detail(tenant_id, timeline_id)
_detail = http_clients[2].timeline_detail(tenant_id, timeline_id)
# The endpoint can use any pageserver to service its reads
for pageserver in pageservers:
workload.validate(pageserver.id)
# If we write some more data, all the nodes can see it, including stale ones
wrote_lsn = workload.write_rows(1000, pageservers[0].id)
for ps_http in http_clients:
wait_for_last_record_lsn(ps_http, tenant_id, timeline_id, wrote_lsn)
# ...and indeed endpoints can see it via any of the pageservers
for pageserver in pageservers:
workload.validate(pageserver.id)
# Prompt all the pageservers, including stale ones, to upload ingested layers to remote storage
for ps_http in http_clients:
ps_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(ps_http, tenant_id, timeline_id, wrote_lsn)
# Now, the contents of remote storage will be a set of layers from each pageserver, but with unique
# generation numbers
# TODO: validate remote storage contents
# Stop all pageservers
for ps in pageservers:
ps.stop()
# Returning to a normal healthy state: all pageservers will start, but only the one most
# recently attached via the control plane will re-attach on startup
for ps in pageservers:
ps.start()
with pytest.raises(PageserverApiException):
_detail = http_clients[0].timeline_detail(tenant_id, timeline_id)
with pytest.raises(PageserverApiException):
_detail = http_clients[1].timeline_detail(tenant_id, timeline_id)
_detail = http_clients[2].timeline_detail(tenant_id, timeline_id)
# All data we wrote while multi-attached remains readable
workload.validate(pageservers[2].id)

View File

@@ -0,0 +1,468 @@
import random
from pathlib import Path
from typing import Any, Dict, Optional
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserver
from fixtures.pageserver.utils import assert_prefix_empty, tenant_delete_wait_completed
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import TenantId, TimelineId
from fixtures.utils import wait_until
from fixtures.workload import Workload
# A tenant configuration that is convenient for generating uploads and deletions
# without a large amount of postgres traffic.
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": f"{128 * 1024}",
"compaction_target_size": f"{128 * 1024}",
"compaction_threshold": "1",
# no PITR horizon, we specify the horizon when we request on-demand GC
"pitr_interval": "0s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
}
def evict_random_layers(
rng: random.Random, pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId
):
"""
Evict 50% of the layers on a pageserver
"""
timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
initial_local_layers = sorted(
list(filter(lambda path: path.name != "metadata", timeline_path.glob("*")))
)
client = pageserver.http_client()
for layer in initial_local_layers:
if "ephemeral" in layer.name:
continue
if rng.choice([True, False]):
log.info(f"Evicting layer {tenant_id}/{timeline_id} {layer.name}")
client.evict_layer(tenant_id=tenant_id, timeline_id=timeline_id, layer_name=layer.name)
@pytest.mark.parametrize("seed", [1, 2, 3])
def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
"""
Issue many location configuration changes, ensure that tenants
remain readable & we don't get any unexpected errors. We should
have no ERROR in the log, and no 500s in the API.
The location_config API is intentionally designed so that all destination
states are valid, so that we may test it in this way: the API should always
work as long as the tenant exists.
"""
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 3
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
pageservers = env.pageservers
list([p.http_client() for p in pageservers])
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# We will make no effort to avoid stale attachments
for ps in env.pageservers:
ps.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)
# these can happen, if we shutdown at a good time. to be fixed as part of #5172.
message = ".*duplicated L1 layer layer=.*"
ps.allowed_errors.append(message)
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageservers[0].id)
workload.write_rows(256, env.pageservers[0].id)
# We use a fixed seed to make the test reproducible: we want a randomly
# chosen order, but not to change the order every time we run the test.
rng = random.Random(seed)
initial_generation = 1
last_state = {
env.pageservers[0].id: ("AttachedSingle", initial_generation),
env.pageservers[1].id: ("Detached", None),
env.pageservers[2].id: ("Detached", None),
}
latest_attached = env.pageservers[0].id
for _i in range(0, 64):
# Pick a pageserver
pageserver = rng.choice(env.pageservers)
# Pick a pseudorandom state
modes = [
"AttachedSingle",
"AttachedMulti",
"AttachedStale",
"Secondary",
"Detached",
"_Evictions",
"_Restart",
]
mode = rng.choice(modes)
last_state_ps = last_state[pageserver.id]
if mode == "_Evictions":
if last_state_ps[0].startswith("Attached"):
log.info(f"Action: evictions on pageserver {pageserver.id}")
evict_random_layers(rng, pageserver, tenant_id, timeline_id)
else:
log.info(
f"Action: skipping evictions on pageserver {pageserver.id}, is not attached"
)
elif mode == "_Restart":
log.info(f"Action: restarting pageserver {pageserver.id}")
pageserver.stop()
pageserver.start()
if last_state_ps[0].startswith("Attached") and latest_attached == pageserver.id:
log.info("Entering postgres...")
workload.churn_rows(rng.randint(128, 256), pageserver.id)
workload.validate(pageserver.id)
elif last_state_ps[0].startswith("Attached"):
# The `attachment_service` will only re-attach on startup when a pageserver was the
# holder of the latest generation: otherwise the pageserver will revert to detached
# state if it was running attached with a stale generation
last_state[pageserver.id] = ("Detached", None)
else:
secondary_conf: Optional[Dict[str, Any]] = None
if mode == "Secondary":
secondary_conf = {"warm": rng.choice([True, False])}
location_conf: Dict[str, Any] = {
"mode": mode,
"secondary_conf": secondary_conf,
"tenant_conf": {},
}
log.info(f"Action: Configuring pageserver {pageserver.id} to {location_conf}")
# Select a generation number
if mode.startswith("Attached"):
if last_state_ps[1] is not None:
if rng.choice([True, False]):
# Move between attached states, staying in the same generation
generation = last_state_ps[1]
else:
# Switch generations, while also jumping between attached states
generation = env.attachment_service.attach_hook(tenant_id, pageserver.id)
latest_attached = pageserver.id
else:
generation = env.attachment_service.attach_hook(tenant_id, pageserver.id)
latest_attached = pageserver.id
else:
generation = None
location_conf["generation"] = generation
pageserver.tenant_location_configure(tenant_id, location_conf)
last_state[pageserver.id] = (mode, generation)
if mode.startswith("Attached"):
# TODO: a variant of this test that runs background endpoint workloads, as well as
# the inter-step workloads.
workload.churn_rows(
rng.randint(128, 256), pageserver.id, upload=mode != "AttachedStale"
)
workload.validate(pageserver.id)
# Attach all pageservers
for ps in env.pageservers:
location_conf = {"mode": "AttachedMulti", "secondary_conf": None, "tenant_conf": {}}
ps.tenant_location_configure(tenant_id, location_conf)
# Confirm that all are readable
for ps in env.pageservers:
workload.validate(ps.id)
# Detach all pageservers
for ps in env.pageservers:
location_conf = {"mode": "Detached", "secondary_conf": None, "tenant_conf": {}}
ps.tenant_location_configure(tenant_id, location_conf)
# Confirm that all local disk state was removed on detach
# TODO
def test_live_migration(neon_env_builder: NeonEnvBuilder):
"""
Test the sequence of location states that are used in a live migration.
"""
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
pageserver_a = env.pageservers[0]
pageserver_b = env.pageservers[1]
initial_generation = 1
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageservers[0].id)
workload.write_rows(256, env.pageservers[0].id)
# Make the destination a secondary location
pageserver_b.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
workload.churn_rows(64, pageserver_a.id, upload=False)
# Set origin attachment to stale
log.info("Setting origin to AttachedStale")
pageserver_a.tenant_location_configure(
tenant_id,
{
"mode": "AttachedStale",
"secondary_conf": None,
"tenant_conf": {},
"generation": initial_generation,
},
flush_ms=5000,
)
migrated_generation = env.attachment_service.attach_hook(tenant_id, pageserver_b.id)
log.info(f"Acquired generation {migrated_generation} for destination pageserver")
assert migrated_generation == initial_generation + 1
# Writes and reads still work in AttachedStale.
workload.validate(pageserver_a.id)
workload.validate(pageserver_a.id)
# Ensure that secondary location's timeline directory is populated: we will then
# do some more writes on top of that to ensure that the newly attached pageserver
# properly makes use of the downloaded layers as well as ingesting WAL to catch up.
pageserver_a.http_client().secondary_tenant_upload(tenant_id)
pageserver_b.http_client().secondary_tenant_download(tenant_id)
# Generate some more dirty writes
workload.churn_rows(64, pageserver_a.id)
# Attach the destination
log.info("Setting destination to AttachedMulti")
pageserver_b.tenant_location_configure(
tenant_id,
{
"mode": "AttachedMulti",
"secondary_conf": None,
"tenant_conf": {},
"generation": migrated_generation,
},
)
# Wait for destination LSN to catch up with origin
origin_lsn = pageserver_a.http_client().timeline_detail(tenant_id, timeline_id)[
"last_record_lsn"
]
def caught_up():
destination_lsn = pageserver_b.http_client().timeline_detail(tenant_id, timeline_id)[
"last_record_lsn"
]
log.info(
f"Waiting for LSN to catch up: origin {origin_lsn} vs destination {destination_lsn}"
)
assert destination_lsn >= origin_lsn
wait_until(100, 0.1, caught_up)
# The destination should accept writes
workload.churn_rows(64, pageserver_b.id)
# Dual attached: both are readable.
workload.validate(pageserver_a.id)
workload.validate(pageserver_b.id)
# Revert the origin to secondary
log.info("Setting origin to Secondary")
pageserver_a.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
workload.churn_rows(64, pageserver_b.id)
# Put the destination into final state
pageserver_b.tenant_location_configure(
tenant_id,
{
"mode": "AttachedSingle",
"secondary_conf": None,
"tenant_conf": {},
"generation": migrated_generation,
},
)
workload.churn_rows(64, pageserver_b.id)
workload.validate(pageserver_b.id)
def list_layers(pageserver, tenant_id: TenantId, timeline_id: TimelineId) -> list[Path]:
"""
Inspect local storage on a pageserver to discover which layer files are present.
:return: list of relative paths to layers, from the timeline root.
"""
timeline_path = pageserver.timeline_dir(tenant_id, timeline_id)
def relative(p: Path) -> Path:
return p.relative_to(timeline_path)
return sorted(
list(
map(
relative,
filter(
lambda path: path.name != "metadata"
and "ephemeral" not in path.name
and "temp" not in path.name,
timeline_path.glob("*"),
),
)
)
)
def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
"""
Test the overall data flow in secondary mode:
- Heatmap uploads from the attached location
- Heatmap & layer downloads from the secondary location
- Eviction of layers on the attached location results in deletion
on the secondary location as well.
"""
neon_env_builder.enable_generations = True
neon_env_builder.num_pageservers = 2
neon_env_builder.enable_pageserver_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
)
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)
assert env.attachment_service is not None
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
ps_attached = env.pageservers[0]
ps_secondary = env.pageservers[1]
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageservers[0].id)
workload.write_rows(256, ps_attached.id)
# Configure a secondary location
log.info("Setting up secondary location...")
ps_secondary.tenant_location_configure(
tenant_id,
{
"mode": "Secondary",
"secondary_conf": {"warm": True},
"tenant_conf": {},
},
)
# Explicit upload/download cycle
# ==============================
log.info("Synchronizing after initial write...")
ps_attached.http_client().secondary_tenant_upload(tenant_id)
ps_secondary.http_client().secondary_tenant_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# Make changes on attached pageserver, check secondary downloads them
# ===================================================================
log.info("Synchronizing after subsequent write...")
workload.churn_rows(128, ps_attached.id)
ps_attached.http_client().secondary_tenant_upload(tenant_id)
ps_secondary.http_client().secondary_tenant_download(tenant_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# Do evictions on attached pageserver, check secondary follows along
# ==================================================================
log.info("Evicting a layer...")
layer_to_evict = list_layers(ps_attached, tenant_id, timeline_id)[0]
ps_attached.http_client().evict_layer(tenant_id, timeline_id, layer_name=layer_to_evict.name)
log.info("Synchronizing after eviction...")
ps_attached.http_client().secondary_tenant_upload(tenant_id)
ps_secondary.http_client().secondary_tenant_download(tenant_id)
assert layer_to_evict not in list_layers(ps_attached, tenant_id, timeline_id)
assert list_layers(ps_attached, tenant_id, timeline_id) == list_layers(
ps_secondary, tenant_id, timeline_id
)
# Scrub the remote storage
# ========================
# This confirms that the scrubber isn't upset by the presence of the heatmap
# TODO: depends on `jcsp/scrubber-index-part` branch.
# Detach secondary and delete tenant
# ===================================
# This confirms that the heatmap gets cleaned up as well as other normal content.
log.info("Detaching secondary location...")
ps_secondary.tenant_location_configure(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
},
)
log.info("Deleting tenant...")
tenant_delete_wait_completed(ps_attached.http_client(), tenant_id, 10)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
# def test_secondary_download_loop
# Configure some short check intervals, and validate that layers are downloaded by secondary
# without any explicit admin API calls.
# def test_secondary_eviction(neon_env_builder: NeonEnvBuilder):
#