mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
Compare commits
15 Commits
jcsp/batch
...
jcsp/tenan
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c6867e9f32 | ||
|
|
e9a9e6be30 | ||
|
|
e8d4c214a3 | ||
|
|
8106f87945 | ||
|
|
5b989ff595 | ||
|
|
819426ec59 | ||
|
|
403a25d42d | ||
|
|
4234c886f4 | ||
|
|
98629841e0 | ||
|
|
215cdd18c4 | ||
|
|
0fd80484a9 | ||
|
|
07508fb110 | ||
|
|
5bb9ba37cc | ||
|
|
f1cd1a2122 | ||
|
|
f010479107 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -5765,6 +5765,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_assert",
|
||||
"serde_json",
|
||||
"serde_path_to_error",
|
||||
"serde_with",
|
||||
"signal-hook",
|
||||
"strum",
|
||||
|
||||
@@ -50,6 +50,8 @@ const_format.workspace = true
|
||||
# why is it only here? no other crate should use it, streams are rarely needed.
|
||||
tokio-stream = { version = "0.1.14" }
|
||||
|
||||
serde_path_to_error.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
byteorder.workspace = true
|
||||
bytes.workspace = true
|
||||
|
||||
@@ -25,8 +25,12 @@ pub async fn json_request_or_empty_body<T: for<'de> Deserialize<'de>>(
|
||||
if body.remaining() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
serde_json::from_reader(body.reader())
|
||||
.context("Failed to parse json request")
|
||||
|
||||
let mut deser = serde_json::de::Deserializer::from_reader(body.reader());
|
||||
|
||||
serde_path_to_error::deserialize(&mut deser)
|
||||
// intentionally stringify because the debug version is not helpful in python logs
|
||||
.map_err(|e| anyhow::anyhow!("Failed to parse json request: {e}"))
|
||||
.map(Some)
|
||||
.map_err(ApiError::BadRequest)
|
||||
}
|
||||
|
||||
@@ -408,6 +408,11 @@ fn start_pageserver(
|
||||
initial_tenant_load_remote: Some(init_done_tx),
|
||||
initial_tenant_load: Some(init_remote_done_tx),
|
||||
background_jobs_can_start: background_jobs_barrier.clone(),
|
||||
warmup_limit: Arc::new(tokio::sync::Semaphore::new(
|
||||
conf.concurrent_tenant_size_logical_size_queries
|
||||
.initial_permits()
|
||||
.get(),
|
||||
)),
|
||||
};
|
||||
|
||||
// Scan the local 'tenants/' directory and start loading the tenants
|
||||
|
||||
@@ -992,8 +992,8 @@ paths:
|
||||
type: string
|
||||
post:
|
||||
description: |
|
||||
Create a timeline. Returns new timeline id on success.\
|
||||
If no new timeline id is specified in parameters, it would be generated. It's an error to recreate the same timeline.
|
||||
Create a timeline. Returns new timeline id on success.
|
||||
Recreating the same timeline will succeed if the parameters match the existing timeline.
|
||||
If no pg_version is specified, assume DEFAULT_PG_VERSION hardcoded in the pageserver.
|
||||
requestBody:
|
||||
content:
|
||||
|
||||
@@ -38,6 +38,7 @@ use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::config::{LocationConf, TenantConfOpt};
|
||||
use crate::tenant::mgr::GetActiveTenantError;
|
||||
use crate::tenant::mgr::{
|
||||
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
|
||||
TenantSlotError, TenantSlotUpsertError, TenantStateError,
|
||||
@@ -67,6 +68,11 @@ use utils::{
|
||||
// Imports only used for testing APIs
|
||||
use super::models::ConfigureFailpointsRequest;
|
||||
|
||||
// For APIs that require an Active tenant, how long should we block waiting for that state?
|
||||
// This is not functionally necessary (clients will retry), but avoids generating a lot of
|
||||
// failed API calls while tenants are activating.
|
||||
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
|
||||
|
||||
pub struct State {
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
@@ -233,6 +239,19 @@ impl From<GetTenantError> for ApiError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTenantError> for ApiError {
|
||||
fn from(e: GetActiveTenantError) -> ApiError {
|
||||
match e {
|
||||
GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
|
||||
GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
|
||||
GetActiveTenantError::NotFound(gte) => gte.into(),
|
||||
GetActiveTenantError::WaitForActiveTimeout { .. } => {
|
||||
ApiError::ResourceUnavailable(format!("{}", e).into())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SetNewTenantConfigError> for ApiError {
|
||||
fn from(e: SetNewTenantConfigError) -> ApiError {
|
||||
match e {
|
||||
@@ -435,7 +454,10 @@ async fn timeline_create_handler(
|
||||
let state = get_state(&request);
|
||||
|
||||
async {
|
||||
let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, true)?;
|
||||
let tenant = state.tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?;
|
||||
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
|
||||
match tenant.create_timeline(
|
||||
new_timeline_id,
|
||||
request_data.ancestor_timeline_id.map(TimelineId::from),
|
||||
@@ -453,7 +475,7 @@ async fn timeline_create_handler(
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
Err(tenant::CreateTimelineError::AlreadyExists) => {
|
||||
Err(tenant::CreateTimelineError::Conflict | tenant::CreateTimelineError::AlreadyCreating) => {
|
||||
json_response(StatusCode::CONFLICT, ())
|
||||
}
|
||||
Err(tenant::CreateTimelineError::AncestorLsn(err)) => {
|
||||
@@ -694,11 +716,23 @@ async fn timeline_delete_handler(
|
||||
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
|
||||
let state = get_state(&request);
|
||||
|
||||
state.tenant_manager.delete_timeline(tenant_shard_id, timeline_id, &ctx)
|
||||
.instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id, false)
|
||||
.map_err(|e| {
|
||||
match e {
|
||||
// GetTenantError has a built-in conversion to ApiError, but in this context we don't
|
||||
// want to treat missing tenants as 404, to avoid ambiguity with successful deletions.
|
||||
GetTenantError::NotFound(_) => ApiError::PreconditionFailed(
|
||||
"Requested tenant is missing".to_string().into_boxed_str(),
|
||||
),
|
||||
e => e.into(),
|
||||
}
|
||||
})?;
|
||||
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
|
||||
tenant.delete_timeline(timeline_id).instrument(info_span!("timeline_delete", tenant_id=%tenant_shard_id.tenant_id, shard=%tenant_shard_id.shard_slug(), %timeline_id))
|
||||
.await?;
|
||||
|
||||
json_response(StatusCode::ACCEPTED, ())
|
||||
@@ -1136,7 +1170,10 @@ async fn tenant_create_handler(
|
||||
|
||||
// We created the tenant. Existing API semantics are that the tenant
|
||||
// is Active when this function returns.
|
||||
if let res @ Err(_) = new_tenant.wait_to_become_active().await {
|
||||
if let res @ Err(_) = new_tenant
|
||||
.wait_to_become_active(ACTIVE_TENANT_TIMEOUT)
|
||||
.await
|
||||
{
|
||||
// This shouldn't happen because we just created the tenant directory
|
||||
// in tenant::mgr::create_tenant, and there aren't any remote timelines
|
||||
// to load, so, nothing can really fail during load.
|
||||
@@ -1621,9 +1658,7 @@ async fn disk_usage_eviction_run(
|
||||
}
|
||||
}
|
||||
|
||||
let config = json_request::<Config>(&mut r)
|
||||
.await
|
||||
.map_err(|_| ApiError::BadRequest(anyhow::anyhow!("invalid JSON body")))?;
|
||||
let config = json_request::<Config>(&mut r).await?;
|
||||
|
||||
let usage = Usage {
|
||||
config,
|
||||
|
||||
@@ -27,6 +27,8 @@ pub mod walredo;
|
||||
|
||||
pub mod failpoint_support;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
use camino::Utf8Path;
|
||||
use deletion_queue::DeletionQueue;
|
||||
@@ -190,6 +192,11 @@ pub struct InitializationOrder {
|
||||
///
|
||||
/// This can be broken up later on, but right now there is just one class of a background job.
|
||||
pub background_jobs_can_start: utils::completion::Barrier,
|
||||
|
||||
/// Concurrency limit for attaching tenants during startup. This limit does not
|
||||
/// apply to tenants that a client tries to access: those proceed to attach as fast
|
||||
/// as they can.
|
||||
pub warmup_limit: Arc<tokio::sync::Semaphore>,
|
||||
}
|
||||
|
||||
/// Time the future with a warning when it exceeds a threshold.
|
||||
|
||||
@@ -684,14 +684,54 @@ pub static STARTUP_IS_LOADING: Lazy<UIntGauge> = Lazy::new(|| {
|
||||
.expect("Failed to register pageserver_startup_is_loading")
|
||||
});
|
||||
|
||||
/// How long did tenants take to go from construction to active state?
|
||||
pub(crate) static TENANT_ACTIVATION: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
/// Metrics related to the lifecycle of a [`crate::tenant::Tenant`] object: things
|
||||
/// like how long it took to load.
|
||||
///
|
||||
/// Note that these are process-global metrics, _not_ per-tenant metrics. Per-tenant
|
||||
/// metrics are rather expensive, and usually fine grained stuff makes more sense
|
||||
/// at a timeline level than tenant level.
|
||||
pub(crate) struct TenantMetrics {
|
||||
/// How long did tenants take to go from construction to active state?
|
||||
pub(crate) activation: Histogram,
|
||||
pub(crate) preload: Histogram,
|
||||
pub(crate) attach: Histogram,
|
||||
|
||||
/// How many tenants are included in the initial startup of the pagesrever?
|
||||
pub(crate) startup_scheduled: IntCounter,
|
||||
pub(crate) startup_complete: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) static TENANT: Lazy<TenantMetrics> = Lazy::new(|| {
|
||||
TenantMetrics {
|
||||
activation: register_histogram!(
|
||||
"pageserver_tenant_activation_seconds",
|
||||
"Time taken by tenants to activate, in seconds",
|
||||
CRITICAL_OP_BUCKETS.into()
|
||||
)
|
||||
.expect("Failed to register pageserver_tenant_activation_seconds metric")
|
||||
.expect("Failed to register metric"),
|
||||
preload: register_histogram!(
|
||||
"pageserver_tenant_preload_seconds",
|
||||
"Time taken by tenants to load remote metadata on startup/attach, in seconds",
|
||||
CRITICAL_OP_BUCKETS.into()
|
||||
)
|
||||
.expect("Failed to register metric"),
|
||||
attach: register_histogram!(
|
||||
"pageserver_tenant_attach_seconds",
|
||||
"Time taken by tenants to intialize, after remote metadata is already loaded",
|
||||
CRITICAL_OP_BUCKETS.into()
|
||||
)
|
||||
.expect("Failed to register metric"),
|
||||
startup_scheduled: register_int_counter!(
|
||||
"pageserver_tenant_startup_scheduled",
|
||||
"Number of tenants included in pageserver startup (doesn't count tenants attached later)"
|
||||
).expect("Failed to register metric"),
|
||||
startup_complete: register_int_counter!(
|
||||
"pageserver_tenant_startup_complete",
|
||||
"Number of tenants that have completed warm-up, or activated on-demand during initial startup: \
|
||||
should eventually reach `pageserver_tenant_startup_scheduled_total`. Does not include broken \
|
||||
tenants: such cases will lead to this metric never reaching the scheduled count."
|
||||
).expect("Failed to register metric"),
|
||||
}
|
||||
});
|
||||
|
||||
/// Each `Timeline`'s [`EVICTIONS_WITH_LOW_RESIDENCE_DURATION`] metric.
|
||||
@@ -2213,6 +2253,9 @@ pub fn preinitialize_metrics() {
|
||||
// Deletion queue stats
|
||||
Lazy::force(&DELETION_QUEUE);
|
||||
|
||||
// Tenant stats
|
||||
Lazy::force(&TENANT);
|
||||
|
||||
// Tenant manager stats
|
||||
Lazy::force(&TENANT_MANAGER);
|
||||
|
||||
|
||||
@@ -561,9 +561,14 @@ pub async fn shutdown_watcher() {
|
||||
/// cancelled. It can however be moved to other tasks, such as `tokio::task::spawn_blocking` or
|
||||
/// `tokio::task::JoinSet::spawn`.
|
||||
pub fn shutdown_token() -> CancellationToken {
|
||||
SHUTDOWN_TOKEN
|
||||
.try_with(|t| t.clone())
|
||||
.expect("shutdown_token() called in an unexpected task or thread")
|
||||
let res = SHUTDOWN_TOKEN.try_with(|t| t.clone());
|
||||
|
||||
if cfg!(test) {
|
||||
// in tests this method is called from non-taskmgr spawned tasks, and that is all ok.
|
||||
res.unwrap_or_default()
|
||||
} else {
|
||||
res.expect("shutdown_token() called in an unexpected task or thread")
|
||||
}
|
||||
}
|
||||
|
||||
/// Has the current task been requested to shut down?
|
||||
|
||||
@@ -36,6 +36,8 @@ use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::fs_ext;
|
||||
use utils::sync::gate::Gate;
|
||||
use utils::sync::gate::GateGuard;
|
||||
use utils::timeout::timeout_cancellable;
|
||||
use utils::timeout::TimeoutCancellableError;
|
||||
|
||||
use self::config::AttachedLocationConfig;
|
||||
use self::config::AttachmentMode;
|
||||
@@ -48,6 +50,7 @@ use self::mgr::GetActiveTenantError;
|
||||
use self::mgr::GetTenantError;
|
||||
use self::mgr::TenantsMap;
|
||||
use self::remote_timeline_client::RemoteTimelineClient;
|
||||
use self::timeline::uninit::TimelineExclusionError;
|
||||
use self::timeline::uninit::TimelineUninitMark;
|
||||
use self::timeline::uninit::UninitializedTimeline;
|
||||
use self::timeline::EvictionTaskTenantState;
|
||||
@@ -58,7 +61,7 @@ use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::deletion_queue::DeletionQueueError;
|
||||
use crate::import_datadir;
|
||||
use crate::is_uninit_mark;
|
||||
use crate::metrics::TENANT_ACTIVATION;
|
||||
use crate::metrics::TENANT;
|
||||
use crate::metrics::{remove_tenant_metrics, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC};
|
||||
use crate::repository::GcResult;
|
||||
use crate::task_mgr;
|
||||
@@ -87,7 +90,6 @@ use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::Arc;
|
||||
use std::sync::MutexGuard;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@@ -226,7 +228,7 @@ pub struct Tenant {
|
||||
|
||||
/// The value creation timestamp, used to measure activation delay, see:
|
||||
/// <https://github.com/neondatabase/neon/issues/4025>
|
||||
loading_started_at: Instant,
|
||||
constructed_at: Instant,
|
||||
|
||||
state: watch::Sender<TenantState>,
|
||||
|
||||
@@ -249,6 +251,12 @@ pub struct Tenant {
|
||||
generation: Generation,
|
||||
|
||||
timelines: Mutex<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
|
||||
/// During timeline creation, we first insert the TimelineId to the
|
||||
/// creating map, then `timelines`, then remove it from the creating map.
|
||||
/// **Lock order**: if acquring both, acquire`timelines` before `timelines_creating`
|
||||
timelines_creating: std::sync::Mutex<HashSet<TimelineId>>,
|
||||
|
||||
// This mutex prevents creation of new timelines during GC.
|
||||
// Adding yet another mutex (in addition to `timelines`) is needed because holding
|
||||
// `timelines` mutex during all GC iteration
|
||||
@@ -270,6 +278,11 @@ pub struct Tenant {
|
||||
|
||||
eviction_task_tenant_state: tokio::sync::Mutex<EvictionTaskTenantState>,
|
||||
|
||||
/// If the tenant is in Activating state, notify this to encourage it
|
||||
/// to proceed to Active as soon as possible, rather than waiting for lazy
|
||||
/// background warmup.
|
||||
pub(crate) activate_now: tokio::sync::Notify,
|
||||
|
||||
pub(crate) delete_progress: Arc<tokio::sync::Mutex<DeleteTenantFlow>>,
|
||||
|
||||
// Cancellation token fires when we have entered shutdown(). This is a parent of
|
||||
@@ -407,8 +420,10 @@ impl Debug for SetStoppingError {
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum CreateTimelineError {
|
||||
#[error("a timeline with the given ID already exists")]
|
||||
AlreadyExists,
|
||||
#[error("creation of timeline with the given ID is in progress")]
|
||||
AlreadyCreating,
|
||||
#[error("timeline already exists with different parameters")]
|
||||
Conflict,
|
||||
#[error(transparent)]
|
||||
AncestorLsn(anyhow::Error),
|
||||
#[error("ancestor timeline is not active")]
|
||||
@@ -614,6 +629,11 @@ impl Tenant {
|
||||
"attach tenant",
|
||||
false,
|
||||
async move {
|
||||
scopeguard::defer! {
|
||||
tracing::info!("Increment complete count");
|
||||
TENANT.startup_complete.inc();
|
||||
}
|
||||
|
||||
// Ideally we should use Tenant::set_broken_no_wait, but it is not supposed to be used when tenant is in loading state.
|
||||
let make_broken =
|
||||
|t: &Tenant, err: anyhow::Error| {
|
||||
@@ -640,6 +660,54 @@ impl Tenant {
|
||||
.as_mut()
|
||||
.and_then(|x| x.initial_tenant_load_remote.take());
|
||||
|
||||
enum AttachType<'a> {
|
||||
// During pageserver startup, we are attaching this tenant lazily in the background
|
||||
Warmup(tokio::sync::SemaphorePermit<'a>),
|
||||
// During pageserver startup, we are attaching this tenant as soon as we can,
|
||||
// because a client tried to access it.
|
||||
OnDemand,
|
||||
// During normal operations after startup, we are attaching a tenant.
|
||||
Normal,
|
||||
|
||||
}
|
||||
|
||||
// Before doing any I/O, wait for either or:
|
||||
// - A client to attempt to access to this tenant (on-demand loading)
|
||||
// - A permit to become available in the warmup semaphore (background warmup)
|
||||
let attach_type = if let Some(init_order) = &init_order {
|
||||
tokio::select!(
|
||||
_ = tenant_clone.activate_now.notified() => {
|
||||
tracing::info!("Activating tenant (on-demand)");
|
||||
AttachType::OnDemand
|
||||
},
|
||||
permit_result = init_order.warmup_limit.acquire() => {
|
||||
match permit_result {
|
||||
Ok(p) => {
|
||||
tracing::info!("Activating tenant (warmup)");
|
||||
AttachType::Warmup(p)
|
||||
}
|
||||
Err(_) => {
|
||||
// This is unexpected: the warmup semaphore should stay alive
|
||||
// for the lifetime of init_order. Log a warning and proceed.
|
||||
tracing::warn!("warmup_limit semaphore unexpectedly closed");
|
||||
AttachType::Normal
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
_ = tenant_clone.cancel.cancelled() => {
|
||||
// This is safe, but should be pretty rare: it is interesting if a tenant
|
||||
// stayed in Activating for such a long time that shutdown found it in
|
||||
// that state.
|
||||
tracing::info!("Tenant shut down before activation");
|
||||
return Ok(());
|
||||
},
|
||||
)
|
||||
} else {
|
||||
AttachType::Normal
|
||||
};
|
||||
|
||||
let preload_timer = TENANT.preload.start_timer();
|
||||
let preload = match mode {
|
||||
SpawnMode::Create => {None},
|
||||
SpawnMode::Normal => {
|
||||
@@ -662,6 +730,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
};
|
||||
preload_timer.observe_duration();
|
||||
|
||||
// Remote preload is complete.
|
||||
drop(remote_load_completion);
|
||||
@@ -713,6 +782,7 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
|
||||
let attach_timer = TENANT.attach.start_timer();
|
||||
match tenant_clone.attach(preload, &ctx).await {
|
||||
Ok(()) => {
|
||||
info!("attach finished, activating");
|
||||
@@ -722,6 +792,28 @@ impl Tenant {
|
||||
make_broken(&tenant_clone, anyhow::anyhow!(e));
|
||||
}
|
||||
}
|
||||
attach_timer.observe_duration();
|
||||
|
||||
// If we are doing an opportunistic warmup attachment at startup, initialize
|
||||
// logical size at the same time. This is better than starting a bunch of idle tenants
|
||||
// with cold caches and then coming back later to initialize their logical sizes.
|
||||
//
|
||||
// It also prevents the warmup proccess competing with the concurrency limit on
|
||||
// logical size calculations: if logical size calculation semaphore is saturated,
|
||||
// then warmup will wait for that before proceeding to the next tenant.
|
||||
if let AttachType::Warmup(_permit) = attach_type {
|
||||
let mut futs = FuturesUnordered::new();
|
||||
let timelines: Vec<_> = tenant_clone.timelines.lock().unwrap().values().cloned().collect();
|
||||
for t in timelines {
|
||||
futs.push(t.await_initial_logical_size())
|
||||
}
|
||||
tracing::info!("Waiting for initial logical sizes while warming up...");
|
||||
while futs.next().await.is_some() {
|
||||
|
||||
}
|
||||
tracing::info!("Warm-up complete");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
.instrument({
|
||||
@@ -1458,7 +1550,7 @@ impl Tenant {
|
||||
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
|
||||
/// minimum amount of keys required to get a writable timeline.
|
||||
/// (Without it, `put` might fail due to `repartition` failing.)
|
||||
pub async fn create_empty_timeline(
|
||||
pub(crate) async fn create_empty_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
@@ -1470,10 +1562,7 @@ impl Tenant {
|
||||
"Cannot create empty timelines on inactive tenant"
|
||||
);
|
||||
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
|
||||
};
|
||||
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id)?;
|
||||
let new_metadata = TimelineMetadata::new(
|
||||
// Initialize disk_consistent LSN to 0, The caller must import some data to
|
||||
// make it valid, before calling finish_creation()
|
||||
@@ -1550,7 +1639,7 @@ impl Tenant {
|
||||
/// If the caller specified the timeline ID to use (`new_timeline_id`), and timeline with
|
||||
/// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create_timeline(
|
||||
pub(crate) async fn create_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
ancestor_timeline_id: Option<TimelineId>,
|
||||
@@ -1571,26 +1660,51 @@ impl Tenant {
|
||||
.enter()
|
||||
.map_err(|_| CreateTimelineError::ShuttingDown)?;
|
||||
|
||||
if let Ok(existing) = self.get_timeline(new_timeline_id, false) {
|
||||
debug!("timeline {new_timeline_id} already exists");
|
||||
|
||||
if let Some(remote_client) = existing.remote_client.as_ref() {
|
||||
// Wait for uploads to complete, so that when we return Ok, the timeline
|
||||
// is known to be durable on remote storage. Just like we do at the end of
|
||||
// this function, after we have created the timeline ourselves.
|
||||
//
|
||||
// We only really care that the initial version of `index_part.json` has
|
||||
// been uploaded. That's enough to remember that the timeline
|
||||
// exists. However, there is no function to wait specifically for that so
|
||||
// we just wait for all in-progress uploads to finish.
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for timeline uploads to complete")?;
|
||||
// Get exclusive access to the timeline ID: this ensures that it does not already exist,
|
||||
// and that no other creation attempts will be allowed in while we are working. The
|
||||
// uninit_mark is a guard.
|
||||
let uninit_mark = match self.create_timeline_uninit_mark(new_timeline_id) {
|
||||
Ok(m) => m,
|
||||
Err(TimelineExclusionError::AlreadyCreating) => {
|
||||
// Creation is in progress, we cannot create it again, and we cannot
|
||||
// check if this request matches the existing one, so caller must try
|
||||
// again later.
|
||||
return Err(CreateTimelineError::AlreadyCreating);
|
||||
}
|
||||
Err(TimelineExclusionError::Other(e)) => {
|
||||
return Err(CreateTimelineError::Other(e));
|
||||
}
|
||||
Err(TimelineExclusionError::AlreadyExists(existing)) => {
|
||||
debug!("timeline {new_timeline_id} already exists");
|
||||
|
||||
return Err(CreateTimelineError::AlreadyExists);
|
||||
}
|
||||
// Idempotency: creating the same timeline twice is not an error, unless
|
||||
// the second creation has different parameters.
|
||||
if existing.get_ancestor_timeline_id() != ancestor_timeline_id
|
||||
|| existing.pg_version != pg_version
|
||||
|| (ancestor_start_lsn.is_some()
|
||||
&& ancestor_start_lsn != Some(existing.get_ancestor_lsn()))
|
||||
{
|
||||
return Err(CreateTimelineError::Conflict);
|
||||
}
|
||||
|
||||
if let Some(remote_client) = existing.remote_client.as_ref() {
|
||||
// Wait for uploads to complete, so that when we return Ok, the timeline
|
||||
// is known to be durable on remote storage. Just like we do at the end of
|
||||
// this function, after we have created the timeline ourselves.
|
||||
//
|
||||
// We only really care that the initial version of `index_part.json` has
|
||||
// been uploaded. That's enough to remember that the timeline
|
||||
// exists. However, there is no function to wait specifically for that so
|
||||
// we just wait for all in-progress uploads to finish.
|
||||
remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.context("wait for timeline uploads to complete")?;
|
||||
}
|
||||
|
||||
return Ok(existing);
|
||||
}
|
||||
};
|
||||
|
||||
let loaded_timeline = match ancestor_timeline_id {
|
||||
Some(ancestor_timeline_id) => {
|
||||
@@ -1627,18 +1741,32 @@ impl Tenant {
|
||||
ancestor_timeline.wait_lsn(*lsn, ctx).await?;
|
||||
}
|
||||
|
||||
self.branch_timeline(&ancestor_timeline, new_timeline_id, ancestor_start_lsn, ctx)
|
||||
.await?
|
||||
self.branch_timeline(
|
||||
&ancestor_timeline,
|
||||
new_timeline_id,
|
||||
ancestor_start_lsn,
|
||||
uninit_mark,
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
None => {
|
||||
self.bootstrap_timeline(new_timeline_id, pg_version, load_existing_initdb, ctx)
|
||||
.await?
|
||||
self.bootstrap_timeline(
|
||||
new_timeline_id,
|
||||
pg_version,
|
||||
load_existing_initdb,
|
||||
uninit_mark,
|
||||
ctx,
|
||||
)
|
||||
.await?
|
||||
}
|
||||
};
|
||||
|
||||
// At this point we have dropped our guard on [`Self::timelines_creating`], and
|
||||
// the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
|
||||
// not send a success to the caller until it is. The same applies to handling retries,
|
||||
// see the handling of [`TimelineExclusionError::AlreadyExists`] above.
|
||||
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
|
||||
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
|
||||
// Ok, the timeline is durable in remote storage.
|
||||
let kind = ancestor_timeline_id
|
||||
.map(|_| "branched")
|
||||
.unwrap_or("bootstrapped");
|
||||
@@ -1652,6 +1780,15 @@ impl Tenant {
|
||||
Ok(loaded_timeline)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline(
|
||||
self: Arc<Self>,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
DeleteTimelineFlow::run(&self, timeline_id, false).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// perform one garbage collection iteration, removing old data files from disk.
|
||||
/// this function is periodically called by gc task.
|
||||
/// also it can be explicitly requested through page server api 'do_gc' command.
|
||||
@@ -1813,7 +1950,7 @@ impl Tenant {
|
||||
);
|
||||
*current_state = TenantState::Active;
|
||||
|
||||
let elapsed = self.loading_started_at.elapsed();
|
||||
let elapsed = self.constructed_at.elapsed();
|
||||
let total_timelines = timelines_accessor.len();
|
||||
|
||||
// log a lot of stuff, because some tenants sometimes suffer from user-visible
|
||||
@@ -1828,7 +1965,7 @@ impl Tenant {
|
||||
"activation attempt finished"
|
||||
);
|
||||
|
||||
TENANT_ACTIVATION.observe(elapsed.as_secs_f64());
|
||||
TENANT.activation.observe(elapsed.as_secs_f64());
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -2083,18 +2220,35 @@ impl Tenant {
|
||||
self.state.subscribe()
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_to_become_active(&self) -> Result<(), GetActiveTenantError> {
|
||||
pub(crate) async fn wait_to_become_active(
|
||||
&self,
|
||||
timeout: Duration,
|
||||
) -> Result<(), GetActiveTenantError> {
|
||||
self.activate_now.notify_one();
|
||||
let mut receiver = self.state.subscribe();
|
||||
loop {
|
||||
let current_state = receiver.borrow_and_update().clone();
|
||||
match current_state {
|
||||
TenantState::Loading | TenantState::Attaching | TenantState::Activating(_) => {
|
||||
// in these states, there's a chance that we can reach ::Active
|
||||
receiver.changed().await.map_err(
|
||||
|_e: tokio::sync::watch::error::RecvError|
|
||||
// Tenant existed but was dropped: report it as non-existent
|
||||
GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
|
||||
)?;
|
||||
match timeout_cancellable(timeout, &self.cancel, receiver.changed()).await {
|
||||
Ok(r) => {
|
||||
r.map_err(
|
||||
|_e: tokio::sync::watch::error::RecvError|
|
||||
// Tenant existed but was dropped: report it as non-existent
|
||||
GetActiveTenantError::NotFound(GetTenantError::NotFound(self.tenant_shard_id.tenant_id))
|
||||
)?
|
||||
}
|
||||
Err(TimeoutCancellableError::Cancelled) => {
|
||||
return Err(GetActiveTenantError::Cancelled);
|
||||
}
|
||||
Err(TimeoutCancellableError::Timeout) => {
|
||||
return Err(GetActiveTenantError::WaitForActiveTimeout {
|
||||
latest_state: Some(self.current_state()),
|
||||
wait_time: timeout,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
TenantState::Active { .. } => {
|
||||
return Ok(());
|
||||
@@ -2419,9 +2573,10 @@ impl Tenant {
|
||||
conf,
|
||||
// using now here is good enough approximation to catch tenants with really long
|
||||
// activation times.
|
||||
loading_started_at: Instant::now(),
|
||||
constructed_at: Instant::now(),
|
||||
tenant_conf: Arc::new(RwLock::new(attached_conf)),
|
||||
timelines: Mutex::new(HashMap::new()),
|
||||
timelines_creating: Mutex::new(HashSet::new()),
|
||||
gc_cs: tokio::sync::Mutex::new(()),
|
||||
walredo_mgr,
|
||||
remote_storage,
|
||||
@@ -2430,6 +2585,7 @@ impl Tenant {
|
||||
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
|
||||
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
|
||||
eviction_task_tenant_state: tokio::sync::Mutex::new(EvictionTaskTenantState::default()),
|
||||
activate_now: tokio::sync::Notify::new(),
|
||||
delete_progress: Arc::new(tokio::sync::Mutex::new(DeleteTenantFlow::default())),
|
||||
cancel: CancellationToken::default(),
|
||||
gate: Gate::new(format!("Tenant<{tenant_shard_id}>")),
|
||||
@@ -2813,8 +2969,9 @@ impl Tenant {
|
||||
start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let uninit_mark = self.create_timeline_uninit_mark(dst_id).unwrap();
|
||||
let tl = self
|
||||
.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
.branch_timeline_impl(src_timeline, dst_id, start_lsn, uninit_mark, ctx)
|
||||
.await?;
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
@@ -2828,9 +2985,10 @@ impl Tenant {
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
timeline_uninit_mark: TimelineUninitMark<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, timeline_uninit_mark, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -2839,13 +2997,14 @@ impl Tenant {
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
timeline_uninit_mark: TimelineUninitMark<'_>,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
// First acquire the GC lock so that another task cannot advance the GC
|
||||
// cutoff in 'gc_info', and make 'start_lsn' invalid, while we are
|
||||
// creating the branch.
|
||||
// We will validate our ancestor LSN in this function. Acquire the GC lock so that
|
||||
// this check cannot race with GC, and the ancestor LSN is guaranteed to remain
|
||||
// valid while we are creating the branch.
|
||||
let _gc_cs = self.gc_cs.lock().await;
|
||||
|
||||
// If no start LSN is specified, we branch the new timeline from the source timeline's last record LSN
|
||||
@@ -2855,13 +3014,6 @@ impl Tenant {
|
||||
lsn
|
||||
});
|
||||
|
||||
// Create a placeholder for the new branch. This will error
|
||||
// out if the new timeline ID is already in use.
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
self.create_timeline_uninit_mark(dst_id, &timelines)?
|
||||
};
|
||||
|
||||
// Ensure that `start_lsn` is valid, i.e. the LSN is within the PITR
|
||||
// horizon on the source timeline
|
||||
//
|
||||
@@ -2953,21 +3105,38 @@ impl Tenant {
|
||||
Ok(new_timeline)
|
||||
}
|
||||
|
||||
/// - run initdb to init temporary instance and get bootstrap data
|
||||
/// - after initialization completes, tar up the temp dir and upload it to S3.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
pub(crate) async fn bootstrap_timeline(
|
||||
/// For unit tests, make this visible so that other modules can directly create timelines
|
||||
#[cfg(test)]
|
||||
pub(crate) async fn bootstrap_timeline_test(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
load_existing_initdb: Option<TimelineId>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timeline_uninit_mark = {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
self.create_timeline_uninit_mark(timeline_id, &timelines)?
|
||||
};
|
||||
let uninit_mark = self.create_timeline_uninit_mark(timeline_id).unwrap();
|
||||
self.bootstrap_timeline(
|
||||
timeline_id,
|
||||
pg_version,
|
||||
load_existing_initdb,
|
||||
uninit_mark,
|
||||
ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// - run initdb to init temporary instance and get bootstrap data
|
||||
/// - after initialization completes, tar up the temp dir and upload it to S3.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
async fn bootstrap_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
pg_version: u32,
|
||||
load_existing_initdb: Option<TimelineId>,
|
||||
timeline_uninit_mark: TimelineUninitMark<'_>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
|
||||
// temporary directory for basebackup files for the given timeline.
|
||||
|
||||
@@ -3048,8 +3217,9 @@ impl Tenant {
|
||||
3,
|
||||
u32::MAX,
|
||||
"persist_initdb_tar_zst",
|
||||
// TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066)
|
||||
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
|
||||
backoff::Cancel::new(self.cancel.clone(), || {
|
||||
anyhow::anyhow!("initdb upload cancelled")
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -3164,11 +3334,11 @@ impl Tenant {
|
||||
/// at 'disk_consistent_lsn'. After any initial data has been imported, call
|
||||
/// `finish_creation` to insert the Timeline into the timelines map and to remove the
|
||||
/// uninit mark file.
|
||||
async fn prepare_new_timeline(
|
||||
&self,
|
||||
async fn prepare_new_timeline<'a>(
|
||||
&'a self,
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
uninit_mark: TimelineUninitMark,
|
||||
uninit_mark: TimelineUninitMark<'a>,
|
||||
start_lsn: Lsn,
|
||||
ancestor: Option<Arc<Timeline>>,
|
||||
) -> anyhow::Result<UninitializedTimeline> {
|
||||
@@ -3241,23 +3411,38 @@ impl Tenant {
|
||||
fn create_timeline_uninit_mark(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
timelines: &MutexGuard<HashMap<TimelineId, Arc<Timeline>>>,
|
||||
) -> anyhow::Result<TimelineUninitMark> {
|
||||
) -> Result<TimelineUninitMark, TimelineExclusionError> {
|
||||
let tenant_shard_id = self.tenant_shard_id;
|
||||
|
||||
anyhow::ensure!(
|
||||
timelines.get(&timeline_id).is_none(),
|
||||
"Timeline {tenant_shard_id}/{timeline_id} already exists in pageserver's memory"
|
||||
);
|
||||
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
anyhow::ensure!(
|
||||
!timeline_path.exists(),
|
||||
"Timeline {timeline_path} already exists, cannot create its uninit mark file",
|
||||
);
|
||||
|
||||
let uninit_mark_path = self
|
||||
.conf
|
||||
.timeline_uninit_mark_file_path(tenant_shard_id, timeline_id);
|
||||
let timeline_path = self.conf.timeline_path(&tenant_shard_id, &timeline_id);
|
||||
|
||||
let uninit_mark = TimelineUninitMark::new(
|
||||
self,
|
||||
timeline_id,
|
||||
uninit_mark_path.clone(),
|
||||
timeline_path.clone(),
|
||||
)?;
|
||||
|
||||
// At this stage, we have got exclusive access to in-memory state for this timeline ID
|
||||
// for creation.
|
||||
// A timeline directory should never exist on disk already:
|
||||
// - a previous failed creation would have cleaned up after itself
|
||||
// - a pageserver restart would clean up timeline directories that don't have valid remote state
|
||||
//
|
||||
// Therefore it is an unexpected internal error to encounter a timeline directory already existing here,
|
||||
// this error may indicate a bug in cleanup on failed creations.
|
||||
if timeline_path.exists() {
|
||||
return Err(TimelineExclusionError::Other(anyhow::anyhow!(
|
||||
"Timeline directory already exists! This is a bug."
|
||||
)));
|
||||
}
|
||||
|
||||
// Create the on-disk uninit mark _after_ the in-memory acquisition of the tenant ID: guarantees
|
||||
// that during process runtime, colliding creations will be caught in-memory without getting
|
||||
// as far as failing to write a file.
|
||||
fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create_new(true)
|
||||
@@ -3271,8 +3456,6 @@ impl Tenant {
|
||||
format!("Failed to crate uninit mark for timeline {tenant_shard_id}/{timeline_id}")
|
||||
})?;
|
||||
|
||||
let uninit_mark = TimelineUninitMark::new(uninit_mark_path, timeline_path);
|
||||
|
||||
Ok(uninit_mark)
|
||||
}
|
||||
|
||||
@@ -4022,13 +4205,7 @@ mod tests {
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("duplicate timeline creation should fail"),
|
||||
Err(e) => assert_eq!(
|
||||
e.to_string(),
|
||||
format!(
|
||||
"Timeline {}/{} already exists in pageserver's memory",
|
||||
tenant.tenant_shard_id, TIMELINE_ID
|
||||
)
|
||||
),
|
||||
Err(e) => assert_eq!(e.to_string(), "Already exists".to_string()),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -28,7 +28,7 @@ use crate::control_plane_client::{
|
||||
ControlPlaneClient, ControlPlaneGenerationsApi, RetryForeverError,
|
||||
};
|
||||
use crate::deletion_queue::DeletionQueueClient;
|
||||
use crate::metrics::TENANT_MANAGER as METRICS;
|
||||
use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
|
||||
use crate::task_mgr::{self, TaskKind};
|
||||
use crate::tenant::config::{
|
||||
AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, TenantConfOpt,
|
||||
@@ -44,7 +44,6 @@ use utils::generation::Generation;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use super::delete::DeleteTenantError;
|
||||
use super::timeline::delete::DeleteTimelineFlow;
|
||||
use super::TenantSharedResources;
|
||||
|
||||
/// For a tenant that appears in TenantsMap, it may either be
|
||||
@@ -430,6 +429,13 @@ pub async fn init_tenant_mgr(
|
||||
let tenant_generations =
|
||||
init_load_generations(conf, &tenant_configs, &resources, &cancel).await?;
|
||||
|
||||
tracing::info!(
|
||||
"Attaching {} tenants at startup, {} at a time",
|
||||
tenant_configs.len(),
|
||||
init_order.warmup_limit.available_permits()
|
||||
);
|
||||
TENANT.startup_scheduled.inc_by(tenant_configs.len() as u64);
|
||||
|
||||
// Construct `Tenant` objects and start them running
|
||||
for (tenant_shard_id, location_conf) in tenant_configs {
|
||||
let tenant_dir_path = conf.tenant_path(&tenant_shard_id);
|
||||
@@ -848,17 +854,6 @@ impl TenantManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_timeline(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
_ctx: &RequestContext,
|
||||
) -> Result<(), DeleteTimelineError> {
|
||||
let tenant = self.get_attached_tenant_shard(tenant_shard_id, true)?;
|
||||
DeleteTimelineFlow::run(&tenant, timeline_id, false).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
|
||||
pub(crate) async fn upsert_location(
|
||||
&self,
|
||||
@@ -1221,7 +1216,10 @@ pub(crate) async fn get_active_tenant_with_timeout(
|
||||
// Fast path: we don't need to do any async waiting.
|
||||
return Ok(tenant.clone());
|
||||
}
|
||||
_ => (WaitFor::Tenant(tenant.clone()), tenant_shard_id),
|
||||
_ => {
|
||||
tenant.activate_now.notify_one();
|
||||
(WaitFor::Tenant(tenant.clone()), tenant_shard_id)
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(TenantSlot::Secondary) => {
|
||||
@@ -1275,28 +1273,10 @@ pub(crate) async fn get_active_tenant_with_timeout(
|
||||
};
|
||||
|
||||
tracing::debug!("Waiting for tenant to enter active state...");
|
||||
match timeout_cancellable(
|
||||
deadline.duration_since(Instant::now()),
|
||||
cancel,
|
||||
tenant.wait_to_become_active(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(())) => Ok(tenant),
|
||||
Ok(Err(e)) => Err(e),
|
||||
Err(TimeoutCancellableError::Timeout) => {
|
||||
let latest_state = tenant.current_state();
|
||||
if latest_state == TenantState::Active {
|
||||
Ok(tenant)
|
||||
} else {
|
||||
Err(GetActiveTenantError::WaitForActiveTimeout {
|
||||
latest_state: Some(latest_state),
|
||||
wait_time: timeout,
|
||||
})
|
||||
}
|
||||
}
|
||||
Err(TimeoutCancellableError::Cancelled) => Err(GetActiveTenantError::Cancelled),
|
||||
}
|
||||
tenant
|
||||
.wait_to_become_active(deadline.duration_since(Instant::now()))
|
||||
.await?;
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_tenant(
|
||||
|
||||
@@ -4,8 +4,9 @@ use anyhow::{bail, Context};
|
||||
use camino::Utf8Path;
|
||||
use fail::fail_point;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::{ErrorKind, SeekFrom};
|
||||
use tokio::fs::{self, File};
|
||||
use tokio::io::AsyncSeekExt;
|
||||
|
||||
use super::Generation;
|
||||
use crate::{
|
||||
@@ -119,11 +120,14 @@ pub(crate) async fn upload_initdb_dir(
|
||||
storage: &GenericRemoteStorage,
|
||||
tenant_id: &TenantId,
|
||||
timeline_id: &TimelineId,
|
||||
initdb_tar_zst: File,
|
||||
mut initdb_tar_zst: File,
|
||||
size: u64,
|
||||
) -> anyhow::Result<()> {
|
||||
tracing::trace!("uploading initdb dir");
|
||||
|
||||
// We might have read somewhat into the file already in the prior retry attempt
|
||||
initdb_tar_zst.seek(SeekFrom::Start(0)).await?;
|
||||
|
||||
let file = tokio_util::io::ReaderStream::with_capacity(initdb_tar_zst, super::BUFFER_SIZE);
|
||||
|
||||
let remote_path = remote_initdb_archive_path(tenant_id, timeline_id);
|
||||
|
||||
@@ -457,6 +457,8 @@ struct LayerInner {
|
||||
/// For loaded layers, this may be some other value if the tenant has undergone
|
||||
/// a shard split since the layer was originally written.
|
||||
shard: ShardIndex,
|
||||
|
||||
last_evicted_at: std::sync::Mutex<Option<std::time::Instant>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for LayerInner {
|
||||
@@ -587,6 +589,7 @@ impl LayerInner {
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
generation,
|
||||
shard,
|
||||
last_evicted_at: std::sync::Mutex::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -722,6 +725,14 @@ impl LayerInner {
|
||||
permit
|
||||
};
|
||||
|
||||
let since_last_eviction =
|
||||
self.last_evicted_at.lock().unwrap().map(|ts| ts.elapsed());
|
||||
if let Some(since_last_eviction) = since_last_eviction {
|
||||
// FIXME: this will not always be recorded correctly until #6028 (the no
|
||||
// download needed branch above)
|
||||
LAYER_IMPL_METRICS.record_redownloaded_after(since_last_eviction);
|
||||
}
|
||||
|
||||
let res = Arc::new(DownloadedLayer {
|
||||
owner: Arc::downgrade(self),
|
||||
kind: tokio::sync::OnceCell::default(),
|
||||
@@ -1117,6 +1128,8 @@ impl LayerInner {
|
||||
// we are still holding the permit, so no new spawn_download_and_wait can happen
|
||||
drop(self.status.send(Status::Evicted));
|
||||
|
||||
*self.last_evicted_at.lock().unwrap() = Some(std::time::Instant::now());
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
@@ -1421,6 +1434,7 @@ pub(crate) struct LayerImplMetrics {
|
||||
|
||||
rare_counters: enum_map::EnumMap<RareEvent, IntCounter>,
|
||||
inits_cancelled: metrics::core::GenericCounter<metrics::core::AtomicU64>,
|
||||
redownload_after: metrics::Histogram,
|
||||
}
|
||||
|
||||
impl Default for LayerImplMetrics {
|
||||
@@ -1496,6 +1510,26 @@ impl Default for LayerImplMetrics {
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let redownload_after = {
|
||||
let minute = 60.0;
|
||||
let hour = 60.0 * minute;
|
||||
metrics::register_histogram!(
|
||||
"pageserver_layer_redownloaded_after",
|
||||
"Time between evicting and re-downloading.",
|
||||
vec![
|
||||
10.0,
|
||||
30.0,
|
||||
minute,
|
||||
5.0 * minute,
|
||||
15.0 * minute,
|
||||
30.0 * minute,
|
||||
hour,
|
||||
12.0 * hour,
|
||||
]
|
||||
)
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
Self {
|
||||
started_evictions,
|
||||
completed_evictions,
|
||||
@@ -1507,6 +1541,7 @@ impl Default for LayerImplMetrics {
|
||||
|
||||
rare_counters,
|
||||
inits_cancelled,
|
||||
redownload_after,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1574,6 +1609,10 @@ impl LayerImplMetrics {
|
||||
fn inc_init_cancelled(&self) {
|
||||
self.inits_cancelled.inc()
|
||||
}
|
||||
|
||||
fn record_redownloaded_after(&self, duration: std::time::Duration) {
|
||||
self.redownload_after.observe(duration.as_secs_f64())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(enum_map::Enum)]
|
||||
|
||||
@@ -54,29 +54,18 @@ impl BackgroundLoopKind {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum RateLimitError {
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
pub(crate) async fn concurrent_background_tasks_rate_limit(
|
||||
/// Cancellation safe.
|
||||
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
|
||||
loop_kind: BackgroundLoopKind,
|
||||
_ctx: &RequestContext,
|
||||
cancel: &CancellationToken,
|
||||
) -> Result<impl Drop, RateLimitError> {
|
||||
) -> impl Drop {
|
||||
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE
|
||||
.with_label_values(&[loop_kind.as_static_str()])
|
||||
.guard();
|
||||
|
||||
tokio::select! {
|
||||
permit = CONCURRENT_BACKGROUND_TASKS.acquire() => {
|
||||
match permit {
|
||||
Ok(permit) => Ok(permit),
|
||||
Err(_closed) => unreachable!("we never close the semaphore"),
|
||||
}
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
Err(RateLimitError::Cancelled)
|
||||
}
|
||||
match CONCURRENT_BACKGROUND_TASKS.acquire().await {
|
||||
Ok(permit) => permit,
|
||||
Err(_closed) => unreachable!("we never close the semaphore"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ use crate::tenant::storage_layer::{
|
||||
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
|
||||
ValueReconstructState,
|
||||
};
|
||||
use crate::tenant::tasks::{BackgroundLoopKind, RateLimitError};
|
||||
use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
@@ -446,6 +446,12 @@ pub(crate) enum CompactFlags {
|
||||
ForceRepartition,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Timeline {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
write!(f, "Timeline<{}>", self.timeline_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Public interface functions
|
||||
impl Timeline {
|
||||
/// Get the LSN where this branch was created
|
||||
@@ -709,19 +715,27 @@ impl Timeline {
|
||||
flags: EnumSet<CompactFlags>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), CompactionError> {
|
||||
let _g = self.compaction_lock.lock().await;
|
||||
// most likely the cancellation token is from background task, but in tests it could be the
|
||||
// request task as well.
|
||||
|
||||
let prepare = async move {
|
||||
let guard = self.compaction_lock.lock().await;
|
||||
|
||||
let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
BackgroundLoopKind::Compaction,
|
||||
ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
(guard, permit)
|
||||
};
|
||||
|
||||
// this wait probably never needs any "long time spent" logging, because we already nag if
|
||||
// compaction task goes over it's period (20s) which is quite often in production.
|
||||
let _permit = match super::tasks::concurrent_background_tasks_rate_limit(
|
||||
BackgroundLoopKind::Compaction,
|
||||
ctx,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(permit) => permit,
|
||||
Err(RateLimitError::Cancelled) => return Ok(()),
|
||||
let (_guard, _permit) = tokio::select! {
|
||||
tuple = prepare => { tuple },
|
||||
_ = self.cancel.cancelled() => return Ok(()),
|
||||
_ = cancel.cancelled() => return Ok(()),
|
||||
};
|
||||
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
@@ -1776,22 +1790,22 @@ impl Timeline {
|
||||
let skip_concurrency_limiter = &skip_concurrency_limiter;
|
||||
async move {
|
||||
let cancel = task_mgr::shutdown_token();
|
||||
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit(
|
||||
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
BackgroundLoopKind::InitialLogicalSizeCalculation,
|
||||
background_ctx,
|
||||
&cancel,
|
||||
);
|
||||
|
||||
use crate::metrics::initial_logical_size::StartCircumstances;
|
||||
let (_maybe_permit, circumstances) = tokio::select! {
|
||||
res = wait_for_permit => {
|
||||
match res {
|
||||
Ok(permit) => (Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit),
|
||||
Err(RateLimitError::Cancelled) => {
|
||||
return Err(BackgroundCalculationError::Cancelled);
|
||||
}
|
||||
}
|
||||
permit = wait_for_permit => {
|
||||
(Some(permit), StartCircumstances::AfterBackgroundTasksRateLimit)
|
||||
}
|
||||
_ = self_ref.cancel.cancelled() => {
|
||||
return Err(BackgroundCalculationError::Cancelled);
|
||||
}
|
||||
_ = cancel.cancelled() => {
|
||||
return Err(BackgroundCalculationError::Cancelled);
|
||||
},
|
||||
() = skip_concurrency_limiter.cancelled() => {
|
||||
// Some action that is part of a end user interaction requested logical size
|
||||
// => break out of the rate limit
|
||||
@@ -1889,6 +1903,7 @@ impl Timeline {
|
||||
.set((calculated_size, metrics_guard.calculation_result_saved()))
|
||||
.ok()
|
||||
.expect("only this task sets it");
|
||||
self.current_logical_size.initialized.notify_one();
|
||||
}
|
||||
|
||||
pub fn spawn_ondemand_logical_size_calculation(
|
||||
@@ -3090,6 +3105,24 @@ impl Timeline {
|
||||
|
||||
Ok(image_layers)
|
||||
}
|
||||
|
||||
/// Wait until the background initial logical size calculation is complete, or
|
||||
/// this Timeline is shut down. Calling this function will cause the initial
|
||||
/// logical size calculation to skip waiting for the background jobs barrier.
|
||||
pub(crate) async fn await_initial_logical_size(self: Arc<Self>) {
|
||||
if let Some(await_bg_cancel) = self
|
||||
.current_logical_size
|
||||
.cancel_wait_for_background_loop_concurrency_limit_semaphore
|
||||
.get()
|
||||
{
|
||||
await_bg_cancel.cancel();
|
||||
}
|
||||
|
||||
tokio::select!(
|
||||
_ = self.current_logical_size.initialized.notified() => {},
|
||||
_ = self.cancel.cancelled() => {}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -3846,7 +3879,14 @@ impl Timeline {
|
||||
/// within a layer file. We can only remove the whole file if it's fully
|
||||
/// obsolete.
|
||||
pub(super) async fn gc(&self) -> anyhow::Result<GcResult> {
|
||||
let _g = self.gc_lock.lock().await;
|
||||
// this is most likely the background tasks, but it might be the spawned task from
|
||||
// immediate_gc
|
||||
let cancel = crate::task_mgr::shutdown_token();
|
||||
let _g = tokio::select! {
|
||||
guard = self.gc_lock.lock() => guard,
|
||||
_ = self.cancel.cancelled() => return Ok(GcResult::default()),
|
||||
_ = cancel.cancelled() => return Ok(GcResult::default()),
|
||||
};
|
||||
let timer = self.metrics.garbage_collect_histo.start_timer();
|
||||
|
||||
fail_point!("before-timeline-gc");
|
||||
|
||||
@@ -30,7 +30,7 @@ use crate::{
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
tasks::{BackgroundLoopKind, RateLimitError},
|
||||
tasks::BackgroundLoopKind,
|
||||
timeline::EvictionError,
|
||||
LogicalSizeCalculationCause, Tenant,
|
||||
},
|
||||
@@ -158,15 +158,15 @@ impl Timeline {
|
||||
) -> ControlFlow<()> {
|
||||
let now = SystemTime::now();
|
||||
|
||||
let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(
|
||||
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
|
||||
BackgroundLoopKind::Eviction,
|
||||
ctx,
|
||||
cancel,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(permit) => permit,
|
||||
Err(RateLimitError::Cancelled) => return ControlFlow::Break(()),
|
||||
);
|
||||
|
||||
let _permit = tokio::select! {
|
||||
permit = acquire_permit => permit,
|
||||
_ = cancel.cancelled() => return ControlFlow::Break(()),
|
||||
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
|
||||
};
|
||||
|
||||
// If we evict layers but keep cached values derived from those layers, then
|
||||
|
||||
@@ -34,6 +34,9 @@ pub(super) struct LogicalSize {
|
||||
pub(crate) cancel_wait_for_background_loop_concurrency_limit_semaphore:
|
||||
OnceCell<CancellationToken>,
|
||||
|
||||
/// Once the initial logical size is initialized, this is notified.
|
||||
pub(crate) initialized: tokio::sync::Notify,
|
||||
|
||||
/// Latest Lsn that has its size uncalculated, could be absent for freshly created timelines.
|
||||
pub initial_part_end: Option<Lsn>,
|
||||
|
||||
@@ -125,6 +128,7 @@ impl LogicalSize {
|
||||
initial_part_end: None,
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
did_return_approximate_to_walreceiver: AtomicBool::new(false),
|
||||
initialized: tokio::sync::Notify::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,6 +139,7 @@ impl LogicalSize {
|
||||
initial_part_end: Some(compute_to),
|
||||
size_added_after_initial: AtomicI64::new(0),
|
||||
did_return_approximate_to_walreceiver: AtomicBool::new(false),
|
||||
initialized: tokio::sync::Notify::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,14 +19,14 @@ use super::Timeline;
|
||||
pub struct UninitializedTimeline<'t> {
|
||||
pub(crate) owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark<'t>)>,
|
||||
}
|
||||
|
||||
impl<'t> UninitializedTimeline<'t> {
|
||||
pub(crate) fn new(
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark)>,
|
||||
raw_timeline: Option<(Arc<Timeline>, TimelineUninitMark<'t>)>,
|
||||
) -> Self {
|
||||
Self {
|
||||
owning_tenant,
|
||||
@@ -169,18 +169,55 @@ pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
|
||||
///
|
||||
/// XXX: it's important to create it near the timeline dir, not inside it to ensure timeline dir gets removed first.
|
||||
#[must_use]
|
||||
pub(crate) struct TimelineUninitMark {
|
||||
pub(crate) struct TimelineUninitMark<'t> {
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
uninit_mark_deleted: bool,
|
||||
uninit_mark_path: Utf8PathBuf,
|
||||
pub(crate) timeline_path: Utf8PathBuf,
|
||||
}
|
||||
|
||||
impl TimelineUninitMark {
|
||||
pub(crate) fn new(uninit_mark_path: Utf8PathBuf, timeline_path: Utf8PathBuf) -> Self {
|
||||
Self {
|
||||
uninit_mark_deleted: false,
|
||||
uninit_mark_path,
|
||||
timeline_path,
|
||||
/// Errors when acquiring exclusive access to a timeline ID for creation
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum TimelineExclusionError {
|
||||
#[error("Already exists")]
|
||||
AlreadyExists(Arc<Timeline>),
|
||||
#[error("Already creating")]
|
||||
AlreadyCreating,
|
||||
|
||||
// e.g. I/O errors, or some failure deep in postgres initdb
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl<'t> TimelineUninitMark<'t> {
|
||||
pub(crate) fn new(
|
||||
owning_tenant: &'t Tenant,
|
||||
timeline_id: TimelineId,
|
||||
uninit_mark_path: Utf8PathBuf,
|
||||
timeline_path: Utf8PathBuf,
|
||||
) -> Result<Self, TimelineExclusionError> {
|
||||
// Lock order: this is the only place we take both locks. During drop() we only
|
||||
// lock creating_timelines
|
||||
let timelines = owning_tenant.timelines.lock().unwrap();
|
||||
let mut creating_timelines: std::sync::MutexGuard<
|
||||
'_,
|
||||
std::collections::HashSet<TimelineId>,
|
||||
> = owning_tenant.timelines_creating.lock().unwrap();
|
||||
|
||||
if let Some(existing) = timelines.get(&timeline_id) {
|
||||
Err(TimelineExclusionError::AlreadyExists(existing.clone()))
|
||||
} else if creating_timelines.contains(&timeline_id) {
|
||||
Err(TimelineExclusionError::AlreadyCreating)
|
||||
} else {
|
||||
creating_timelines.insert(timeline_id);
|
||||
Ok(Self {
|
||||
owning_tenant,
|
||||
timeline_id,
|
||||
uninit_mark_deleted: false,
|
||||
uninit_mark_path,
|
||||
timeline_path,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,7 +244,7 @@ impl TimelineUninitMark {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TimelineUninitMark {
|
||||
impl Drop for TimelineUninitMark<'_> {
|
||||
fn drop(&mut self) {
|
||||
if !self.uninit_mark_deleted {
|
||||
if self.timeline_path.exists() {
|
||||
@@ -226,5 +263,11 @@ impl Drop for TimelineUninitMark {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.owning_tenant
|
||||
.timelines_creating
|
||||
.lock()
|
||||
.unwrap()
|
||||
.remove(&self.timeline_id);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2191,7 +2191,7 @@ mod tests {
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant
|
||||
.bootstrap_timeline(TIMELINE_ID, pg_version, None, &ctx)
|
||||
.bootstrap_timeline_test(TIMELINE_ID, pg_version, None, &ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -33,39 +33,6 @@ impl Aimd {
|
||||
min_utilisation_threshold: config.aimd_min_utilisation_threshold,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decrease_factor(self, factor: f32) -> Self {
|
||||
assert!((0.5..1.0).contains(&factor));
|
||||
Self {
|
||||
decrease_factor: factor,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn increase_by(self, increase: usize) -> Self {
|
||||
assert!(increase > 0);
|
||||
Self {
|
||||
increase_by: increase,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_max_limit(self, max: usize) -> Self {
|
||||
assert!(max > 0);
|
||||
Self {
|
||||
max_limit: max,
|
||||
..self
|
||||
}
|
||||
}
|
||||
|
||||
/// A threshold below which the limit won't be increased. 0.5 = 50%.
|
||||
pub fn with_min_utilisation_threshold(self, min_util: f32) -> Self {
|
||||
assert!(min_util > 0. && min_util < 1.);
|
||||
Self {
|
||||
min_utilisation_threshold: min_util,
|
||||
..self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -1,12 +1,16 @@
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
use std::{
|
||||
collections::hash_map::RandomState,
|
||||
hash::BuildHasher,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc, Mutex,
|
||||
},
|
||||
};
|
||||
|
||||
use anyhow::bail;
|
||||
use dashmap::DashMap;
|
||||
use itertools::Itertools;
|
||||
use rand::{thread_rng, Rng};
|
||||
use rand::{rngs::StdRng, Rng, SeedableRng};
|
||||
use smol_str::SmolStr;
|
||||
use tokio::sync::{Mutex as AsyncMutex, Semaphore, SemaphorePermit};
|
||||
use tokio::time::{timeout, Duration, Instant};
|
||||
@@ -28,10 +32,11 @@ use super::{
|
||||
// saw SNI, before doing TLS handshake. User-side error messages in that case
|
||||
// does not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now
|
||||
// I went with a more expensive way that yields user-friendlier error messages.
|
||||
pub struct EndpointRateLimiter {
|
||||
map: DashMap<SmolStr, Vec<RateBucket>>,
|
||||
pub struct EndpointRateLimiter<Rand = StdRng, Hasher = RandomState> {
|
||||
map: DashMap<SmolStr, Vec<RateBucket>, Hasher>,
|
||||
info: &'static [RateBucketInfo],
|
||||
access_count: AtomicUsize,
|
||||
rand: Mutex<Rand>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
@@ -125,11 +130,18 @@ impl RateBucketInfo {
|
||||
|
||||
impl EndpointRateLimiter {
|
||||
pub fn new(info: &'static [RateBucketInfo]) -> Self {
|
||||
Self::new_with_rand_and_hasher(info, StdRng::from_entropy(), RandomState::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
|
||||
fn new_with_rand_and_hasher(info: &'static [RateBucketInfo], rand: R, hasher: S) -> Self {
|
||||
info!(buckets = ?info, "endpoint rate limiter");
|
||||
Self {
|
||||
info,
|
||||
map: DashMap::with_shard_amount(64),
|
||||
map: DashMap::with_hasher_and_shard_amount(hasher, 64),
|
||||
access_count: AtomicUsize::new(1), // start from 1 to avoid GC on the first request
|
||||
rand: Mutex::new(rand),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,7 +188,9 @@ impl EndpointRateLimiter {
|
||||
self.map.len()
|
||||
);
|
||||
let n = self.map.shards().len();
|
||||
let shard = thread_rng().gen_range(0..n);
|
||||
// this lock is ok as the periodic cycle of do_gc makes this very unlikely to collide
|
||||
// (impossible, infact, unless we have 2048 threads)
|
||||
let shard = self.rand.lock().unwrap().gen_range(0..n);
|
||||
self.map.shards()[shard].write().clear();
|
||||
}
|
||||
}
|
||||
@@ -219,7 +233,6 @@ pub struct Token<'t> {
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct LimiterState {
|
||||
limit: usize,
|
||||
available: usize,
|
||||
in_flight: usize,
|
||||
}
|
||||
|
||||
@@ -397,11 +410,7 @@ impl Limiter {
|
||||
pub fn state(&self) -> LimiterState {
|
||||
let limit = self.limits.load(Ordering::Relaxed);
|
||||
let in_flight = self.in_flight.load(Ordering::Relaxed);
|
||||
LimiterState {
|
||||
limit,
|
||||
available: limit.saturating_sub(in_flight),
|
||||
in_flight,
|
||||
}
|
||||
LimiterState { limit, in_flight }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -414,13 +423,6 @@ impl<'t> Token<'t> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub fn set_latency(&mut self, latency: Duration) {
|
||||
use std::ops::Sub;
|
||||
|
||||
self.start = Instant::now().sub(latency);
|
||||
}
|
||||
|
||||
pub fn forget(&mut self) {
|
||||
if let Some(permit) = self.permit.take() {
|
||||
permit.forget();
|
||||
@@ -439,10 +441,6 @@ impl LimiterState {
|
||||
pub fn limit(&self) -> usize {
|
||||
self.limit
|
||||
}
|
||||
/// The amount of concurrency available to use.
|
||||
pub fn available(&self) -> usize {
|
||||
self.available
|
||||
}
|
||||
/// The number of jobs in flight.
|
||||
pub fn in_flight(&self) -> usize {
|
||||
self.in_flight
|
||||
@@ -490,9 +488,11 @@ impl reqwest_middleware::Middleware for Limiter {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{pin::pin, task::Context, time::Duration};
|
||||
use std::{hash::BuildHasherDefault, pin::pin, task::Context, time::Duration};
|
||||
|
||||
use futures::{task::noop_waker_ref, Future};
|
||||
use rand::SeedableRng;
|
||||
use rustc_hash::FxHasher;
|
||||
use smol_str::SmolStr;
|
||||
use tokio::time;
|
||||
|
||||
@@ -690,4 +690,21 @@ mod tests {
|
||||
assert!(limiter.check(endpoint.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_rate_limits_gc() {
|
||||
// fixed seeded random/hasher to ensure that the test is not flaky
|
||||
let rand = rand::rngs::StdRng::from_seed([1; 32]);
|
||||
let hasher = BuildHasherDefault::<FxHasher>::default();
|
||||
|
||||
let limiter = EndpointRateLimiter::new_with_rand_and_hasher(
|
||||
&RateBucketInfo::DEFAULT_SET,
|
||||
rand,
|
||||
hasher,
|
||||
);
|
||||
for i in 0..1_000_000 {
|
||||
limiter.check(format!("{i}").into());
|
||||
}
|
||||
assert!(limiter.map.len() < 150_000);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,15 +27,15 @@ use sync_wrapper::SyncWrapper;
|
||||
pin_project! {
|
||||
/// This is a wrapper around a [`WebSocketStream`] that
|
||||
/// implements [`AsyncRead`] and [`AsyncWrite`].
|
||||
pub struct WebSocketRw {
|
||||
pub struct WebSocketRw<S = Upgraded> {
|
||||
#[pin]
|
||||
stream: SyncWrapper<WebSocketStream<Upgraded>>,
|
||||
stream: SyncWrapper<WebSocketStream<S>>,
|
||||
bytes: Bytes,
|
||||
}
|
||||
}
|
||||
|
||||
impl WebSocketRw {
|
||||
pub fn new(stream: WebSocketStream<Upgraded>) -> Self {
|
||||
impl<S> WebSocketRw<S> {
|
||||
pub fn new(stream: WebSocketStream<S>) -> Self {
|
||||
Self {
|
||||
stream: stream.into(),
|
||||
bytes: Bytes::new(),
|
||||
@@ -43,7 +43,7 @@ impl WebSocketRw {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for WebSocketRw {
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
@@ -69,7 +69,7 @@ impl AsyncWrite for WebSocketRw {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for WebSocketRw {
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for WebSocketRw<S> {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
@@ -86,7 +86,7 @@ impl AsyncRead for WebSocketRw {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncBufRead for WebSocketRw {
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncBufRead for WebSocketRw<S> {
|
||||
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
|
||||
// Please refer to poll_fill_buf's documentation.
|
||||
const EOF: Poll<io::Result<&[u8]>> = Poll::Ready(Ok(&[]));
|
||||
@@ -151,3 +151,60 @@ pub async fn serve_websocket(
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::pin::pin;
|
||||
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use hyper_tungstenite::{
|
||||
tungstenite::{protocol::Role, Message},
|
||||
WebSocketStream,
|
||||
};
|
||||
use tokio::{
|
||||
io::{duplex, AsyncReadExt, AsyncWriteExt},
|
||||
task::JoinSet,
|
||||
};
|
||||
|
||||
use super::WebSocketRw;
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_stream_wrapper_happy_path() {
|
||||
let (stream1, stream2) = duplex(1024);
|
||||
|
||||
let mut js = JoinSet::new();
|
||||
|
||||
js.spawn(async move {
|
||||
let mut client = WebSocketStream::from_raw_socket(stream1, Role::Client, None).await;
|
||||
|
||||
client
|
||||
.send(Message::Binary(b"hello world".to_vec()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let message = client.next().await.unwrap().unwrap();
|
||||
assert_eq!(message, Message::Binary(b"websockets are cool".to_vec()));
|
||||
|
||||
client.close(None).await.unwrap();
|
||||
});
|
||||
|
||||
js.spawn(async move {
|
||||
let mut rw = pin!(WebSocketRw::new(
|
||||
WebSocketStream::from_raw_socket(stream2, Role::Server, None).await
|
||||
));
|
||||
|
||||
let mut buf = vec![0; 1024];
|
||||
let n = rw.read(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf[..n], b"hello world");
|
||||
|
||||
rw.write_all(b"websockets are cool").await.unwrap();
|
||||
rw.flush().await.unwrap();
|
||||
|
||||
let n = rw.read_to_end(&mut buf).await.unwrap();
|
||||
assert_eq!(n, 0);
|
||||
});
|
||||
|
||||
js.join_next().await.unwrap().unwrap();
|
||||
js.join_next().await.unwrap().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2945,7 +2945,7 @@ class Safekeeper:
|
||||
tli_dir = self.timeline_dir(tenant_id, timeline_id)
|
||||
segments = []
|
||||
for _, _, filenames in os.walk(tli_dir):
|
||||
segments.extend([f for f in filenames if f != "safekeeper.control"])
|
||||
segments.extend([f for f in filenames if not f.startswith("safekeeper.control")])
|
||||
segments.sort()
|
||||
return segments
|
||||
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
from queue import SimpleQueue
|
||||
from typing import Any, Dict, List, Union
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
@@ -239,92 +238,6 @@ def test_cannot_branch_from_non_uploaded_branch(neon_env_builder: NeonEnvBuilder
|
||||
t.join()
|
||||
|
||||
|
||||
def test_competing_branchings_from_loading_race_to_ok_or_err(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
If the activate only after upload is used, then retries could become competing.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
".*request{method=POST path=/v1/tenant/.*/timeline request_id=.*}: request was dropped before completing.*",
|
||||
".*Error processing HTTP request: InternalServerError\\(Timeline .*/.* already exists in pageserver's memory",
|
||||
]
|
||||
)
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
# pause all uploads
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "pause"))
|
||||
env.pageserver.tenant_create(env.initial_tenant)
|
||||
|
||||
def start_creating_timeline():
|
||||
ps_http.timeline_create(
|
||||
env.pg_version, env.initial_tenant, env.initial_timeline, timeout=60
|
||||
)
|
||||
|
||||
create_root = threading.Thread(target=start_creating_timeline)
|
||||
|
||||
branch_id = TimelineId.generate()
|
||||
|
||||
queue: SimpleQueue[Union[Dict[Any, Any], Exception]] = SimpleQueue()
|
||||
barrier = threading.Barrier(3)
|
||||
|
||||
def try_branch():
|
||||
barrier.wait()
|
||||
barrier.wait()
|
||||
try:
|
||||
ret = ps_http.timeline_create(
|
||||
env.pg_version,
|
||||
env.initial_tenant,
|
||||
branch_id,
|
||||
ancestor_timeline_id=env.initial_timeline,
|
||||
timeout=5,
|
||||
)
|
||||
queue.put(ret)
|
||||
except Exception as e:
|
||||
queue.put(e)
|
||||
|
||||
threads = [threading.Thread(target=try_branch) for _ in range(2)]
|
||||
|
||||
try:
|
||||
create_root.start()
|
||||
|
||||
for t in threads:
|
||||
t.start()
|
||||
|
||||
wait_until_paused(env, "before-upload-index-pausable")
|
||||
|
||||
barrier.wait()
|
||||
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
|
||||
barrier.wait()
|
||||
|
||||
# now both requests race to branch, only one can win because they take gc_cs, Tenant::timelines or marker files
|
||||
first = queue.get()
|
||||
second = queue.get()
|
||||
|
||||
log.info(first)
|
||||
log.info(second)
|
||||
|
||||
(succeeded, failed) = (first, second) if isinstance(second, Exception) else (second, first)
|
||||
assert isinstance(failed, Exception)
|
||||
assert isinstance(succeeded, Dict)
|
||||
|
||||
# there's multiple valid status codes:
|
||||
# - Timeline x/y already exists
|
||||
# - whatever 409 response says, but that is a subclass of PageserverApiException
|
||||
assert isinstance(failed, PageserverApiException)
|
||||
assert succeeded["state"] == "Active"
|
||||
finally:
|
||||
# we might still have the failpoint active
|
||||
env.pageserver.stop(immediate=True)
|
||||
|
||||
for t in threads:
|
||||
t.join()
|
||||
create_root.join()
|
||||
|
||||
|
||||
def test_non_uploaded_root_timeline_is_deleted_after_restart(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Check that a timeline is deleted locally on subsequent restart if it never successfully uploaded during creation.
|
||||
|
||||
@@ -300,7 +300,8 @@ def test_timeline_initial_logical_size_calculation_cancellation(
|
||||
env = neon_env_builder.init_start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_id, timeline_id = env.neon_cli.create_tenant()
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
# load in some data
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
@@ -732,3 +733,139 @@ def wait_for_timeline_size_init(
|
||||
raise Exception(
|
||||
f"timed out while waiting for current_logical_size of a timeline to reach its non-incremental value, details: {timeline_details}"
|
||||
)
|
||||
|
||||
|
||||
def test_ondemand_activation(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Tenants warmuping up opportunistically will wait for one another's logical size calculations to complete
|
||||
before proceeding. However, they skip this if a client is actively trying to access them.
|
||||
|
||||
This test is not purely about logical sizes, but logical size calculation is the phase that we
|
||||
use as a proxy for "warming up" in this test: it happens within the semaphore guard used
|
||||
to limit concurrent tenant warm-up.
|
||||
"""
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# Create some tenants
|
||||
n_tenants = 10
|
||||
tenant_ids = {env.initial_tenant}
|
||||
for _i in range(0, n_tenants - 1):
|
||||
tenant_id = TenantId.generate()
|
||||
env.pageserver.tenant_create(tenant_id)
|
||||
|
||||
# Empty tenants are not subject to waiting for logical size calculations, because
|
||||
# those hapen on timeline level
|
||||
branch_name = "main"
|
||||
timeline_id = TimelineId.generate()
|
||||
env.neon_cli.create_timeline(
|
||||
new_branch_name=branch_name, tenant_id=tenant_id, timeline_id=timeline_id
|
||||
)
|
||||
|
||||
tenant_ids.add(tenant_id)
|
||||
|
||||
# Restart pageserver with logical size calculations paused
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start(
|
||||
extra_env_vars={"FAILPOINTS": "timeline-calculate-logical-size-pause=pause"}
|
||||
)
|
||||
|
||||
def get_tenant_states():
|
||||
states = {}
|
||||
for tenant_id in tenant_ids:
|
||||
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
|
||||
states[tenant_id] = tenant["state"]["slug"]
|
||||
log.info(f"Tenant states: {states}")
|
||||
return states
|
||||
|
||||
def at_least_one_active():
|
||||
assert "Active" in set(get_tenant_states().values())
|
||||
|
||||
# One tenant should activate, then get stuck in their logical size calculation
|
||||
wait_until(10, 1, at_least_one_active)
|
||||
|
||||
# Wait some walltime to gain confidence that other tenants really are stuck and not proceeding to activate
|
||||
time.sleep(5)
|
||||
|
||||
# We should see one tenant win the activation race, and enter logical size calculation. The rest
|
||||
# will stay in Attaching state, waiting for the "warmup_limit" semaphore
|
||||
expect_activated = 1
|
||||
states = get_tenant_states()
|
||||
assert len([s for s in states.values() if s == "Active"]) == expect_activated
|
||||
assert len([s for s in states.values() if s == "Attaching"]) == n_tenants - expect_activated
|
||||
|
||||
assert (
|
||||
pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants
|
||||
)
|
||||
|
||||
# This is zero, and subsequent checks are expect_activated - 1, because this counter does not
|
||||
# count how may tenants are Active, it counts how many have finished warmup. The first tenant
|
||||
# that reached Active is still stuck in its local size calculation, and has therefore not finished warmup.
|
||||
assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == 0
|
||||
|
||||
# If a client accesses one of the blocked tenants, it should skip waiting for warmup and
|
||||
# go active as fast as it can.
|
||||
stuck_tenant_id = list(
|
||||
[(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"]
|
||||
)[0][0]
|
||||
|
||||
endpoint = env.endpoints.create_start(branch_name="main", tenant_id=stuck_tenant_id)
|
||||
endpoint.safe_psql_many(
|
||||
[
|
||||
"CREATE TABLE foo (x INTEGER)",
|
||||
"INSERT INTO foo SELECT g FROM generate_series(1, 10) g",
|
||||
]
|
||||
)
|
||||
endpoint.stop()
|
||||
|
||||
# That one that we successfully accessed is now Active
|
||||
expect_activated += 1
|
||||
assert pageserver_http.tenant_status(tenant_id=stuck_tenant_id)["state"]["slug"] == "Active"
|
||||
assert (
|
||||
pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total")
|
||||
== expect_activated - 1
|
||||
)
|
||||
|
||||
# The ones we didn't touch are still in Attaching
|
||||
assert (
|
||||
len([s for s in get_tenant_states().values() if s == "Attaching"])
|
||||
== n_tenants - expect_activated
|
||||
)
|
||||
|
||||
# Timeline creation operations also wake up Attaching tenants
|
||||
stuck_tenant_id = list(
|
||||
[(tid, s) for (tid, s) in get_tenant_states().items() if s == "Attaching"]
|
||||
)[0][0]
|
||||
pageserver_http.timeline_create(env.pg_version, stuck_tenant_id, TimelineId.generate())
|
||||
expect_activated += 1
|
||||
assert pageserver_http.tenant_status(tenant_id=stuck_tenant_id)["state"]["slug"] == "Active"
|
||||
assert (
|
||||
len([s for s in get_tenant_states().values() if s == "Attaching"])
|
||||
== n_tenants - expect_activated
|
||||
)
|
||||
|
||||
assert (
|
||||
pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total")
|
||||
== expect_activated - 1
|
||||
)
|
||||
|
||||
# When we unblock logical size calculation, all tenants should proceed to active state via
|
||||
# the warmup route.
|
||||
pageserver_http.configure_failpoints(("timeline-calculate-logical-size-pause", "off"))
|
||||
|
||||
def all_active():
|
||||
assert all(s == "Active" for s in get_tenant_states().values())
|
||||
|
||||
wait_until(10, 1, all_active)
|
||||
|
||||
# Final control check: restarting with no failpoints at all results in all tenants coming active
|
||||
# without being prompted by client I/O
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
wait_until(10, 1, all_active)
|
||||
|
||||
assert (
|
||||
pageserver_http.get_metric_value("pageserver_tenant_startup_scheduled_total") == n_tenants
|
||||
)
|
||||
assert pageserver_http.get_metric_value("pageserver_tenant_startup_complete_total") == n_tenants
|
||||
|
||||
Reference in New Issue
Block a user