Compare commits

..

10 Commits

Author SHA1 Message Date
Anna Khanova
136ed19387 Test 2024-03-27 13:42:33 +01:00
Christian Schwarz
cdf12ed008 fix(walreceiver): Timeline::shutdown can leave a dangling handle_walreceiver_connection tokio task (#7235)
# Problem

As pointed out through doc-comments in this PR, `drop_old_connection` is
not cancellation-safe.

This means we can leave a `handle_walreceiver_connection` tokio task
dangling during Timeline shutdown.

More details are described in the corresponding issue #7062.
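
For context, cancel-by-drop interacts badly with spawned tasks: dropping a `JoinHandle` detaches the task rather than aborting it. A minimal sketch of the hazard, with illustrative names (not the real walreceiver code):

```rust
// Sketch of the hazard: dropping a JoinHandle does not stop the task it
// refers to.
async fn drop_old_connection_like() {
    let handle = tokio::spawn(async {
        // stand-in for handle_walreceiver_connection: long-running work
    });
    // If a tokio::select! drops this future at the await point below,
    // `handle` is dropped too, but the spawned task keeps running, detached.
    let _ = handle.await;
}
```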

# Solution

Don't cancel-by-drop the `connection_manager_loop_step` from the
`tokio::select!()` in the task_mgr task.
Instead, transform the code to use a `CancellationToken` ---
specifically, `task_mgr::shutdown_token()` --- and make code responsive
to it.
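
A minimal sketch of that shape, with a placeholder `do_one_step` (the real `connection_manager_loop_step` is more involved):

```rust
use tokio_util::sync::CancellationToken;

// Sketch only: react to the token at explicit points instead of relying on
// the enclosing select! to cancel-by-drop at an arbitrary await point.
async fn connection_manager_loop_step(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                // Orderly shutdown: join or cancel spawned work, then return.
                return;
            }
            _ = do_one_step() => {}
        }
    }
}

async fn do_one_step() { /* placeholder for one unit of manager work */ }
```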

`drop_old_connection()` is still not cancellation-safe and also
doesn't get a cancellation token, because there's no point inside the
function where we could return early if cancellation were requested
using a token.

We rely on `handle_walreceiver_connection` being sensitive to the
`TaskHandle`'s cancellation token (argument name: `cancellation`).
Currently it checks for `cancellation` on each WAL message. It is
probably also sensitive to `Timeline::cancel` because ultimately all
that `handle_walreceiver_connection` does is interact with the
`Timeline`.

In summary, the above means that the following code (which is found in
`Timeline::shutdown`) now might **take longer**, but actually ensures
that all `handle_walreceiver_connection` tasks are finished:

```rust
task_mgr::shutdown_tasks(
    Some(TaskKind::WalReceiverManager),
    Some(self.tenant_shard_id),
    Some(self.timeline_id),
)
.await;
```

# Refs

refs #7062
2024-03-27 12:04:31 +01:00
Conrad Ludgate
12512f3173 add authentication rate limiting (#6865)
## Problem

https://github.com/neondatabase/cloud/issues/9642

## Summary of changes

1. Make `EndpointRateLimiter` generic, renamed as `BucketRateLimiter`.
2. Add support for claiming multiple tokens at once.
3. Add an `AuthRateLimiter` alias.
4. Check the `(Endpoint, IP)` pair during authentication, weighted by how
many hashes the proxy would be doing (see the sketch after this list).
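
A minimal sketch of the idea, with illustrative names and parameters (the proxy's actual `BucketRateLimiter` differs): a token bucket keyed by `(Endpoint, IP)` where expensive operations claim more tokens at once.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::time::Instant;

/// Illustrative token bucket keyed by an arbitrary `K`, e.g. `(Endpoint, IP)`.
struct BucketRateLimiter<K> {
    buckets: HashMap<K, Bucket>,
    rate_per_sec: f64,
    burst: f64,
}

struct Bucket {
    tokens: f64,
    last_refill: Instant,
}

impl<K: Eq + Hash> BucketRateLimiter<K> {
    fn new(rate_per_sec: f64, burst: f64) -> Self {
        Self { buckets: HashMap::new(), rate_per_sec, burst }
    }

    /// Try to claim `n` tokens; heavier operations (e.g. more password
    /// hashing rounds) claim more tokens in one call.
    fn check(&mut self, key: K, n: f64) -> bool {
        let now = Instant::now();
        let burst = self.burst;
        let bucket = self
            .buckets
            .entry(key)
            .or_insert(Bucket { tokens: burst, last_refill: now });
        // Refill proportionally to elapsed time, capped at the burst size.
        let elapsed = now.duration_since(bucket.last_refill).as_secs_f64();
        bucket.tokens = (bucket.tokens + elapsed * self.rate_per_sec).min(burst);
        bucket.last_refill = now;
        if bucket.tokens >= n {
            bucket.tokens -= n;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut limiter: BucketRateLimiter<(&str, &str)> = BucketRateLimiter::new(1.0, 10.0);
    // A SCRAM exchange costing several hashes claims more tokens than a
    // cheap operation would:
    assert!(limiter.check(("ep-cool-name", "203.0.113.7"), 4.0));
}
```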

TODO: handle IPv6 subnets; will do this in a separate PR.
2024-03-26 19:31:19 +00:00
John Spray
b3b7ce457c pageserver: remove bare mgr::get_tenant, mgr::list_tenants (#7237)
## Problem

This is a refactor.

This PR was a precursor to a much smaller change,
e5bd602dc1. While writing that change, I
found that we were not far from removing the last non-deprecated code paths
that use `mgr::`-scoped functions to reach the TenantManager state.

We're almost done cleaning this up, as per
https://github.com/neondatabase/neon/issues/5796. The only significant
remaining `mgr::` item is `get_active_tenant_with_timeout`, which is
page_service's path for fetching tenants.

## Summary of changes

- Remove the bool argument to get_attached_tenant_shard: in API use cases it
was almost always false, and where it was true it was readily replaceable
with an explicit check of the returned tenant's status.
- Rather than letting the timeline eviction task query any tenant it
likes via `mgr::`, pass an `Arc<Tenant>` into the task. This is still an
ugly circular reference, but should eventually go away: either when we
switch to exclusively using disk usage eviction, or when we change
metadata storage to avoid the need to imitate layer accesses.
- Convert all the mgr::get_tenant call sites to use
TenantManager::get_attached_tenant_shard (the resulting call-site shape is
sketched below).
- Move list_tenants into TenantManager.
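
A sketch of the resulting call-site shape, assuming the crate-internal types from the diffs below (`TenantManager`, `TenantShardId`, `GetTenantError`) are in scope:

```rust
// Before: mgr::get_tenant(tenant_shard_id, /* active_only */ true)?
// After: fetch the shard, then check its state explicitly where needed.
fn use_tenant(
    tenant_manager: &TenantManager,
    tenant_shard_id: TenantShardId,
) -> Result<(), GetTenantError> {
    let tenant = tenant_manager.get_attached_tenant_shard(tenant_shard_id)?;
    // The explicit check replaces the old `active_only = true` behaviour:
    if !tenant.is_active() {
        return Err(GetTenantError::NotActive(tenant_shard_id));
    }
    // ... use `tenant` ...
    Ok(())
}
```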
2024-03-26 18:29:08 +00:00
John Spray
6814bb4b59 tests: add a log allow list to stabilize benchmarks (#7251)
## Problem

https://github.com/neondatabase/neon/pull/7227 destabilized various
tests in the performance suite, with log errors during shutdown. This is
because we switched the shutdown order to stop the storage controller before
the pageservers.

## Summary of changes

- Tolerate "connection failed" errors from pageservers trying to
validate their deletion queue.
2024-03-26 17:44:18 +00:00
John Spray
b3bb1d1cad storage controller: make direct tenant creation more robust (#7247)
## Problem

- Creations were not idempotent (unique key violation)
- Creations waited for reconciliation, which the cloud control plane blocks
while an operation is in flight

## Summary of changes

- Handle unique key constraint violation as an OK situation: if we're
creating the same tenant ID and shard count, it's reasonable to assume
this is a duplicate creation.
- Make the wait for reconcile during creation tolerate failures: this is
similar to location_conf, where the cloud control plane blocks our
notification calls until it is done calling into our API. (In the future
this constraint is expected to relax as the cloud control plane learns to
run multiple operations concurrently for a tenant.)
2024-03-26 16:57:35 +00:00
John Spray
47d2b3a483 pageserver: limit total ephemeral layer bytes (#7218)
## Problem

Follows: https://github.com/neondatabase/neon/pull/7182

- Sufficient concurrent writes could OOM a pageserver from the size of
indices on all the InMemoryLayer instances.
- Enforcement of checkpoint_period only happened if there were some
writes.

Closes: https://github.com/neondatabase/neon/issues/6916

## Summary of changes

- Add an `ephemeral_bytes_per_memory_kb` config property. This controls the
ratio of ephemeral layer capacity to memory capacity. The unit is
deliberately small so that ratios below 1:1 can be expressed: set it to 1024
to allow 1MB of ephemeral layers per 1MB of RAM, or to something smaller for
a fraction (see the worked example after this list).
- Implement background layer rolling checks in
Timeline::compaction_iteration -- this ensures we apply layer rolling
policy in the absence of writes.
- During background checks, if the total ephemeral layer size has
exceeded the limit, then roll layers whose size is greater than the mean
size of all ephemeral layers.
- Remove the tick() path from walreceiver: it isn't needed any more now
that we do equivalent checks from compaction_iteration.
- Add tests for the above.
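
A worked example of the unit, as a sketch (the real computation in `init_tenant_mgr` uses `sysinfo` to read total memory, as shown in the diffs below):

```rust
// Sketch of the unit conversion, not pageserver code: the limit is
// `ephemeral_bytes_per_memory_kb` bytes of ephemeral layer data per KiB of
// RAM, matching `ephemeral_bytes_per_memory_kb * (system_memory / 1024)`.
fn ephemeral_layer_limit_bytes(ephemeral_bytes_per_memory_kb: u64, system_memory_bytes: u64) -> u64 {
    ephemeral_bytes_per_memory_kb * (system_memory_bytes / 1024)
}

fn main() {
    let ram = 16u64 * 1024 * 1024 * 1024; // 16 GiB
    assert_eq!(ephemeral_layer_limit_bytes(1024, ram), ram); // 1024 => 1:1 with RAM
    assert_eq!(ephemeral_layer_limit_bytes(512, ram), ram / 2); // 512 => half of RAM
    assert_eq!(ephemeral_layer_limit_bytes(0, ram), 0); // 0 disables the limit
}
```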

---------

Co-authored-by: Arpad Müller <arpad-m@users.noreply.github.com>
2024-03-26 15:45:32 +00:00
John Spray
8dfe3a070c pageserver: return 429 on timeline creation in progress (#7225)
## Problem

Currently, we return 409 (Conflict) in two cases:
- Temporary: Timeline creation cannot proceed because another timeline
with the same ID is being created
- Permanent: Timeline creation cannot proceed because another timeline
exists with different parameters but the same ID.

Callers which time out a request and retry should be able to distinguish
these cases.

Closes: #7208 

## Summary of changes

- Expose `AlreadyCreating` errors as 429 instead of 409
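
A hedged sketch of how a caller might treat the two statuses; `ApiClient` and `CreateTimelineRequest` are hypothetical stand-ins, and only the status-code handling reflects this change:

```rust
use std::time::Duration;

// Hypothetical client wrapper: retry on 429 (creation in progress), give up
// on 409 (same ID, different parameters).
async fn create_timeline_with_retry(
    client: &ApiClient,
    req: &CreateTimelineRequest,
) -> anyhow::Result<()> {
    loop {
        let resp = client.create_timeline(req).await?;
        match resp.status().as_u16() {
            201 => return Ok(()), // created, or already existed with matching parameters
            429 => tokio::time::sleep(Duration::from_millis(500)).await, // back off, retry
            409 => anyhow::bail!("timeline exists with different parameters"), // permanent
            s => anyhow::bail!("unexpected status {s}"),
        }
    }
}
```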
2024-03-26 15:20:05 +00:00
Alexander Bayandin
3426619a79 test_runner/performance: skip test_bulk_insert (#7238)
## Problem
`test_bulk_insert` has become too slow and fails constantly:
https://github.com/neondatabase/neon/issues/7124

## Summary of changes
- Skip `test_bulk_insert` until it's fixed
2024-03-26 15:10:15 +00:00
Vlad Lazar
de03742ca3 pageserver: drop layer map lock in Timeline::get (#7217)
## Problem
We currently hold the layer map read lock while doing IO on the read
path. This is not required for correctness.

## Summary of changes
Drop the layer map lock after figuring out which layer we wish to read
from (a sketch of the pattern follows the list below).
Why is this correct:
* `Layer` models the lifecycle of an on-disk layer. If the layer is removed
from local disk, it will be downloaded on demand.
* `InMemoryLayer` holds the `EphemeralFile`, which wraps the on-disk file.
As long as the `InMemoryLayer` is in scope, it is safe to read from it.
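
A minimal sketch of the pattern, using names from the diffs below (`do_read_io` is a placeholder for the actual `get_value_reconstruct_data` call):

```rust
// Clone the Arc to the chosen layer, let the read guard drop, then do IO.
let layer = {
    let guard = timeline.layers.read().await; // layer map read lock
    // pick the layer to read from; here, the open in-memory layer
    guard.layer_map().open_layer.clone()
    // `guard` drops at the end of this block, releasing the lock before IO
};
if let Some(layer) = layer {
    // Safe without the lock: a `Layer` re-downloads on demand, and an
    // `InMemoryLayer` keeps its EphemeralFile alive while this Arc lives.
    do_read_io(&layer).await;
}
```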

Related https://github.com/neondatabase/neon/issues/6833
2024-03-26 14:35:36 +00:00
44 changed files with 1085 additions and 652 deletions

Cargo.lock (generated)
View File

@@ -3581,6 +3581,7 @@ dependencies = [
"strum_macros",
"svg_fmt",
"sync_wrapper",
"sysinfo",
"tenant_size_model",
"thiserror",
"tokio",

View File

@@ -1523,6 +1523,8 @@ impl Service {
&self,
create_req: TenantCreateRequest,
) -> Result<TenantCreateResponse, ApiError> {
let tenant_id = create_req.new_tenant_id.tenant_id;
// Exclude any concurrent attempts to create/access the same tenant ID
let _tenant_lock = self
.tenant_op_locks
@@ -1531,7 +1533,12 @@ impl Service {
let (response, waiters) = self.do_tenant_create(create_req).await?;
self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
// Avoid deadlock: reconcile may fail while notifying compute, if the cloud control plane refuses to
// accept compute notifications while it is in the process of creating. Reconciliation will
// be retried in the background.
tracing::warn!(%tenant_id, "Reconcile not done yet while creating tenant ({e})");
}
Ok(response)
}
@@ -1610,13 +1617,25 @@ impl Service {
splitting: SplitState::default(),
})
.collect();
self.persistence
match self
.persistence
.insert_tenant_shards(persist_tenant_shards)
.await
.map_err(|e| {
// TODO: distinguish primary key constraint (idempotent, OK), from other errors
ApiError::InternalServerError(anyhow::anyhow!(e))
})?;
{
Ok(_) => {}
Err(DatabaseError::Query(diesel::result::Error::DatabaseError(
DatabaseErrorKind::UniqueViolation,
_,
))) => {
// Unique key violation: this is probably a retry. Because the shard count is part of the unique key,
// if we see a unique key violation it means that the creation request's shard count matches the previous
// creation's shard count.
tracing::info!("Tenant shards already present in database, proceeding with idempotent creation...");
}
// Any other database error is unexpected and a bug.
Err(e) => return Err(ApiError::InternalServerError(anyhow::anyhow!(e))),
};
let (waiters, response_shards) = {
let mut locked = self.inner.write().unwrap();

View File

@@ -40,7 +40,7 @@ macro_rules! register_hll {
}};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
$crate::register_hll!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
$crate::register_hll!($N, $crate::opts!($NAME, $HELP))
}};
}

View File

@@ -59,6 +59,7 @@ signal-hook.workspace = true
smallvec = { workspace = true, features = ["write"] }
svg_fmt.workspace = true
sync_wrapper.workspace = true
sysinfo.workspace = true
tokio-tar.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }

View File

@@ -600,33 +600,37 @@ fn start_pageserver(
None,
"consumption metrics collection",
true,
async move {
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
let cancel = task_mgr::shutdown_token();
{
let tenant_manager = tenant_manager.clone();
async move {
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
&conf.metric_collection_bucket,
conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
local_disk_storage,
cancel,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
pageserver::consumption_metrics::collect_metrics(
tenant_manager,
metric_collection_endpoint,
&conf.metric_collection_bucket,
conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
local_disk_storage,
cancel,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
}
},
);
}

View File

@@ -95,6 +95,8 @@ pub mod defaults {
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
///
/// Default built-in configuration file.
///
@@ -156,6 +158,8 @@ pub mod defaults {
#heatmap_upload_concurrency = {DEFAULT_HEATMAP_UPLOAD_CONCURRENCY}
#secondary_download_concurrency = {DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY}
#ephemeral_bytes_per_memory_kb = {DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB}
[remote_storage]
"#
@@ -279,6 +283,13 @@ pub struct PageServerConf {
pub max_vectored_read_bytes: MaxVectoredReadBytes,
pub validate_vectored_get: bool,
/// How many bytes of ephemeral layer content will we allow per kilobyte of RAM. When this
/// is exceeded, we start proactively closing ephemeral layers to limit the total amount
/// of ephemeral data.
///
/// Setting this to zero disables limits on total ephemeral layer size.
pub ephemeral_bytes_per_memory_kb: usize,
}
/// We do not want to store this in a PageServerConf because the latter may be logged
@@ -400,6 +411,8 @@ struct PageServerConfigBuilder {
max_vectored_read_bytes: BuilderValue<MaxVectoredReadBytes>,
validate_vectored_get: BuilderValue<bool>,
ephemeral_bytes_per_memory_kb: BuilderValue<usize>,
}
impl PageServerConfigBuilder {
@@ -486,6 +499,7 @@ impl PageServerConfigBuilder {
NonZeroUsize::new(DEFAULT_MAX_VECTORED_READ_BYTES).unwrap(),
)),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
}
}
}
@@ -665,6 +679,10 @@ impl PageServerConfigBuilder {
self.validate_vectored_get = BuilderValue::Set(value);
}
pub fn get_ephemeral_bytes_per_memory_kb(&mut self, value: usize) {
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
@@ -720,6 +738,7 @@ impl PageServerConfigBuilder {
get_vectored_impl,
max_vectored_read_bytes,
validate_vectored_get,
ephemeral_bytes_per_memory_kb,
}
CUSTOM LOGIC
{
@@ -1010,6 +1029,9 @@ impl PageServerConf {
"validate_vectored_get" => {
builder.get_validate_vectored_get(parse_toml_bool("validate_vectored_get", item)?)
}
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -1091,6 +1113,7 @@ impl PageServerConf {
.expect("Invalid default constant"),
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
}
}
}
@@ -1328,6 +1351,7 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB
},
"Correct defaults should be used when no config values are provided"
);
@@ -1399,6 +1423,7 @@ background_task_maximum_delay = '334 s'
.expect("Invalid default constant")
),
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB
},
"Should be able to parse all basic config values correctly"
);

View File

@@ -3,7 +3,9 @@
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::tasks::BackgroundLoopKind;
use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
use crate::tenant::{
mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant,
};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
@@ -41,6 +43,7 @@ type Cache = HashMap<MetricsKey, (EventType, u64)>;
/// Main thread that serves metrics collection
#[allow(clippy::too_many_arguments)]
pub async fn collect_metrics(
tenant_manager: Arc<TenantManager>,
metric_collection_endpoint: &Url,
metric_collection_bucket: &Option<RemoteStorageConfig>,
metric_collection_interval: Duration,
@@ -67,15 +70,19 @@ pub async fn collect_metrics(
None,
"synthetic size calculation",
false,
async move {
calculate_synthetic_size_worker(
synthetic_size_calculation_interval,
&cancel,
&worker_ctx,
)
.instrument(info_span!("synthetic_size_worker"))
.await?;
Ok(())
{
let tenant_manager = tenant_manager.clone();
async move {
calculate_synthetic_size_worker(
tenant_manager,
synthetic_size_calculation_interval,
&cancel,
&worker_ctx,
)
.instrument(info_span!("synthetic_size_worker"))
.await?;
Ok(())
}
},
);
@@ -116,7 +123,7 @@ pub async fn collect_metrics(
let started_at = Instant::now();
// these are point in time, with variable "now"
let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
let metrics = Arc::new(metrics);
@@ -271,6 +278,7 @@ async fn reschedule(
/// Calculate synthetic size for each active tenant
async fn calculate_synthetic_size_worker(
tenant_manager: Arc<TenantManager>,
synthetic_size_calculation_interval: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
@@ -283,7 +291,7 @@ async fn calculate_synthetic_size_worker(
loop {
let started_at = Instant::now();
let tenants = match mgr::list_tenants().await {
let tenants = match tenant_manager.list_tenants() {
Ok(tenants) => tenants,
Err(e) => {
warn!("cannot get tenant list: {e:#}");
@@ -302,10 +310,14 @@ async fn calculate_synthetic_size_worker(
continue;
}
let Ok(tenant) = mgr::get_tenant(tenant_shard_id, true) else {
let Ok(tenant) = tenant_manager.get_attached_tenant_shard(tenant_shard_id) else {
continue;
};
if !tenant.is_active() {
continue;
}
// there is never any reason to exit calculate_synthetic_size_worker following any
// return value -- we don't need to care about shutdown because no tenant is found when
// pageserver is shut down.
@@ -343,9 +355,7 @@ async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &Re
};
// this error can be returned if timeline is shutting down, but it does not
// mean the synthetic size worker should terminate. we do not need any checks
// in this function because `mgr::get_tenant` will error out after shutdown has
// progressed to shutting down tenants.
// mean the synthetic size worker should terminate.
let shutting_down = matches!(
e.downcast_ref::<PageReconstructError>(),
Some(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))

View File

@@ -1,3 +1,4 @@
use crate::tenant::mgr::TenantManager;
use crate::{context::RequestContext, tenant::timeline::logical_size::CurrentLogicalSize};
use chrono::{DateTime, Utc};
use consumption_metrics::EventType;
@@ -181,6 +182,7 @@ impl MetricsKey {
}
pub(super) async fn collect_all_metrics(
tenant_manager: &Arc<TenantManager>,
cached_metrics: &Cache,
ctx: &RequestContext,
) -> Vec<RawMetric> {
@@ -188,7 +190,7 @@ pub(super) async fn collect_all_metrics(
let started_at = std::time::Instant::now();
let tenants = match crate::tenant::mgr::list_tenants().await {
let tenants = match tenant_manager.list_tenants() {
Ok(tenants) => tenants,
Err(err) => {
tracing::error!("failed to list tenants: {:?}", err);
@@ -200,7 +202,8 @@ pub(super) async fn collect_all_metrics(
if state != TenantState::Active || !id.is_zero() {
None
} else {
crate::tenant::mgr::get_tenant(id, true)
tenant_manager
.get_attached_tenant_shard(id)
.ok()
.map(|tenant| (id.tenant_id, tenant))
}

View File

@@ -61,7 +61,6 @@ use crate::{
metrics::disk_usage_based_eviction::METRICS,
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
tenant::{
self,
mgr::TenantManager,
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
@@ -814,8 +813,8 @@ async fn collect_eviction_candidates(
const LOG_DURATION_THRESHOLD: std::time::Duration = std::time::Duration::from_secs(10);
// get a snapshot of the list of tenants
let tenants = tenant::mgr::list_tenants()
.await
let tenants = tenant_manager
.list_tenants()
.context("get list of tenants")?;
// TODO: avoid listing every layer in every tenant: this loop can block the executor,
@@ -827,8 +826,12 @@ async fn collect_eviction_candidates(
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
let tenant = match tenant::mgr::get_tenant(tenant_id, true) {
Ok(tenant) => tenant,
let tenant = match tenant_manager.get_attached_tenant_shard(tenant_id) {
Ok(tenant) if tenant.is_active() => tenant,
Ok(_) => {
debug!(tenant_id=%tenant_id.tenant_id, shard_id=%tenant_id.shard_slug(), "Tenant shard is not active");
continue;
}
Err(e) => {
// this can happen if tenant has lifecycle transition after we fetched it
debug!("failed to get tenant: {e:#}");

View File

@@ -1038,7 +1038,7 @@ paths:
format: hex
responses:
"201":
description: TimelineInfo
description: Timeline was created, or already existed with matching parameters
content:
application/json:
schema:
@@ -1068,11 +1068,17 @@ paths:
schema:
$ref: "#/components/schemas/Error"
"409":
description: Timeline already exists, creation skipped
description: Timeline already exists, with different parameters. Creation cannot proceed.
content:
application/json:
schema:
$ref: "#/components/schemas/ConflictError"
"429":
description: A creation request was sent for the same Timeline Id while a creation was already in progress. Back off and retry.
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
"500":
description: Generic operation error
content:

View File

@@ -49,8 +49,8 @@ use crate::task_mgr::TaskKind;
use crate::tenant::config::{LocationConf, TenantConfOpt};
use crate::tenant::mgr::GetActiveTenantError;
use crate::tenant::mgr::{
GetTenantError, SetNewTenantConfigError, TenantManager, TenantMapError, TenantMapInsertError,
TenantSlotError, TenantSlotUpsertError, TenantStateError,
GetTenantError, TenantManager, TenantMapError, TenantMapInsertError, TenantSlotError,
TenantSlotUpsertError, TenantStateError,
};
use crate::tenant::mgr::{TenantSlot, UpsertLocationError};
use crate::tenant::remote_timeline_client;
@@ -249,16 +249,11 @@ impl From<GetTenantError> for ApiError {
fn from(tse: GetTenantError) -> ApiError {
match tse {
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
GetTenantError::Broken(reason) => {
ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
}
GetTenantError::NotActive(_) => {
// Why is this not `ApiError::NotFound`?
// Because we must be careful to never return 404 for a tenant if it does
// in fact exist locally. If we did, the caller could draw the conclusion
// that it can attach the tenant to another PS and we'd be in split-brain.
//
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
GetTenantError::MapState(e) => ApiError::ResourceUnavailable(format!("{e}").into()),
@@ -269,6 +264,9 @@ impl From<GetTenantError> for ApiError {
impl From<GetActiveTenantError> for ApiError {
fn from(e: GetActiveTenantError) -> ApiError {
match e {
GetActiveTenantError::Broken(reason) => {
ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
}
GetActiveTenantError::WillNotBecomeActive(_) => ApiError::Conflict(format!("{}", e)),
GetActiveTenantError::Cancelled => ApiError::ShuttingDown,
GetActiveTenantError::NotFound(gte) => gte.into(),
@@ -279,19 +277,6 @@ impl From<GetActiveTenantError> for ApiError {
}
}
impl From<SetNewTenantConfigError> for ApiError {
fn from(e: SetNewTenantConfigError) -> ApiError {
match e {
SetNewTenantConfigError::GetTenant(tid) => {
ApiError::NotFound(anyhow!("tenant {}", tid).into())
}
e @ (SetNewTenantConfigError::Persist(_) | SetNewTenantConfigError::Other(_)) => {
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
}
impl From<crate::tenant::DeleteTimelineError> for ApiError {
fn from(value: crate::tenant::DeleteTimelineError) -> Self {
use crate::tenant::DeleteTimelineError::*;
@@ -495,7 +480,7 @@ async fn timeline_create_handler(
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id, false)?;
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
@@ -535,10 +520,13 @@ async fn timeline_create_handler(
HttpErrorBody::from_msg("Tenant shutting down".to_string()),
)
}
Err(
e @ tenant::CreateTimelineError::Conflict
| e @ tenant::CreateTimelineError::AlreadyCreating,
) => json_response(StatusCode::CONFLICT, HttpErrorBody::from_msg(e.to_string())),
Err(e @ tenant::CreateTimelineError::Conflict) => {
json_response(StatusCode::CONFLICT, HttpErrorBody::from_msg(e.to_string()))
}
Err(e @ tenant::CreateTimelineError::AlreadyCreating) => json_response(
StatusCode::TOO_MANY_REQUESTS,
HttpErrorBody::from_msg(e.to_string()),
),
Err(tenant::CreateTimelineError::AncestorLsn(err)) => json_response(
StatusCode::NOT_ACCEPTABLE,
HttpErrorBody::from_msg(format!("{err:#}")),
@@ -581,7 +569,7 @@ async fn timeline_list_handler(
let response_data = async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id, false)?;
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
@@ -619,6 +607,7 @@ async fn timeline_preserve_initdb_handler(
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
// Part of the process for disaster recovery from safekeeper-stored WAL:
// If we don't recover into a new timeline but want to keep the timeline ID,
@@ -626,7 +615,9 @@ async fn timeline_preserve_initdb_handler(
// location where timeline recreation can find it.
async {
let tenant = mgr::get_tenant(tenant_shard_id, false)?;
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let timeline = tenant
.get_timeline(timeline_id, false)
@@ -668,7 +659,7 @@ async fn timeline_detail_handler(
let timeline_info = async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id, false)?;
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
@@ -855,7 +846,7 @@ async fn timeline_delete_handler(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id, false)
.get_attached_tenant_shard(tenant_shard_id)
.map_err(|e| {
match e {
// GetTenantError has a built-in conversion to ApiError, but in this context we don't
@@ -973,10 +964,11 @@ async fn tenant_list_handler(
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let state = get_state(&request);
let response_data = mgr::list_tenants()
.instrument(info_span!("tenant_list"))
.await
let response_data = state
.tenant_manager
.list_tenants()
.map_err(|_| {
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
})?
@@ -999,9 +991,12 @@ async fn tenant_status(
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let tenant_info = async {
let tenant = mgr::get_tenant(tenant_shard_id, false)?;
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
@@ -1074,9 +1069,7 @@ async fn tenant_size_handler(
let inputs_only: Option<bool> = parse_query_param(&request, "inputs_only")?;
let retention_period: Option<u64> = parse_query_param(&request, "retention_period")?;
let headers = request.headers();
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = mgr::get_tenant(tenant_shard_id, true)?;
let state = get_state(&request);
if !tenant_shard_id.is_zero() {
return Err(ApiError::BadRequest(anyhow!(
@@ -1084,6 +1077,12 @@ async fn tenant_size_handler(
)));
}
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// this can be long operation
let inputs = tenant
.gather_size_inputs(
@@ -1152,10 +1151,15 @@ async fn tenant_shard_split_handler(
let state = get_state(&request);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let new_shards = state
.tenant_manager
.shard_split(
tenant_shard_id,
tenant,
ShardCount::new(req.new_shard_count),
req.new_stripe_size,
&ctx,
@@ -1373,8 +1377,11 @@ async fn get_tenant_config_handler(
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let tenant = mgr::get_tenant(tenant_shard_id, false)?;
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let response = HashMap::from([
(
@@ -1402,15 +1409,31 @@ async fn update_tenant_config_handler(
let tenant_id = request_data.tenant_id;
check_permission(&request, Some(tenant_id))?;
let tenant_conf =
let new_tenant_conf =
TenantConfOpt::try_from(&request_data.config).map_err(ApiError::BadRequest)?;
let state = get_state(&request);
state
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let tenant = state
.tenant_manager
.set_new_tenant_config(tenant_conf, tenant_id)
.instrument(info_span!("tenant_config", %tenant_id))
.await?;
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
// This is a legacy API that only operates on attached tenants: the preferred
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(
new_tenant_conf.clone(),
tenant.get_generation(),
&ShardParameters::default(),
);
crate::tenant::Tenant::persist_tenant_config(state.conf, &tenant_shard_id, &location_conf)
.await
.map_err(ApiError::InternalServerError)?;
tenant.set_new_tenant_config(new_tenant_conf);
json_response(StatusCode::OK, ())
}
@@ -1634,10 +1657,12 @@ async fn handle_tenant_break(
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&r, "tenant_shard_id")?;
let tenant = crate::tenant::mgr::get_tenant(tenant_shard_id, true)
.map_err(|_| ApiError::Conflict(String::from("no active tenant found")))?;
tenant.set_broken("broken from test".to_owned()).await;
let state = get_state(&r);
state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?
.set_broken("broken from test".to_owned())
.await;
json_response(StatusCode::OK, ())
}
@@ -1881,7 +1906,7 @@ async fn active_timeline_of_active_tenant(
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> Result<Arc<Timeline>, ApiError> {
let tenant = tenant_manager.get_attached_tenant_shard(tenant_shard_id, false)?;
let tenant = tenant_manager.get_attached_tenant_shard(tenant_shard_id)?;
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;

View File

@@ -760,6 +760,7 @@ impl PageServerHandler {
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
timeline
.import_basebackup_from_tar(
tenant.clone(),
&mut copyin_reader,
base_lsn,
self.broker_client.clone(),

View File

@@ -1411,7 +1411,7 @@ impl Tenant {
/// the same timeline ID already exists, returns CreateTimelineError::AlreadyExists.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn create_timeline(
&self,
self: &Arc<Tenant>,
new_timeline_id: TimelineId,
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
@@ -1559,7 +1559,7 @@ impl Tenant {
})?;
}
loaded_timeline.activate(broker_client, None, ctx);
loaded_timeline.activate(self.clone(), broker_client, None, ctx);
Ok(loaded_timeline)
}
@@ -1731,7 +1731,12 @@ impl Tenant {
let mut activated_timelines = 0;
for timeline in timelines_to_activate {
timeline.activate(broker_client.clone(), background_jobs_can_start, ctx);
timeline.activate(
self.clone(),
broker_client.clone(),
background_jobs_can_start,
ctx,
);
activated_timelines += 1;
}
@@ -2063,7 +2068,12 @@ impl Tenant {
TenantState::Active { .. } => {
return Ok(());
}
TenantState::Broken { .. } | TenantState::Stopping { .. } => {
TenantState::Broken { reason, .. } => {
// This is fatal, and reported distinctly from the general case of "will never be active" because
// it's logically a 500 to external API users (broken is always a bug).
return Err(GetActiveTenantError::Broken(reason));
}
TenantState::Stopping { .. } => {
// There's no chance the tenant can transition back into ::Active
return Err(GetActiveTenantError::WillNotBecomeActive(current_state));
}

View File

@@ -4,7 +4,7 @@
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::models::{LocationConfigMode, ShardParameters};
use pageserver_api::models::LocationConfigMode;
use pageserver_api::shard::{
ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId,
};
@@ -16,6 +16,7 @@ use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
use std::sync::Arc;
use std::time::{Duration, Instant};
use sysinfo::SystemExt;
use tokio::fs;
use utils::timeout::{timeout_cancellable, TimeoutCancellableError};
@@ -39,10 +40,10 @@ use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
use crate::task_mgr::{self, TaskKind};
use crate::tenant::config::{
AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
TenantConfOpt,
};
use crate::tenant::delete::DeleteTenantFlow;
use crate::tenant::span::debug_assert_current_span_has_tenant_id;
use crate::tenant::storage_layer::inmemory_layer;
use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
@@ -543,6 +544,18 @@ pub async fn init_tenant_mgr(
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
// Initialize dynamic limits that depend on system resources
let system_memory =
sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory())
.total_memory();
let max_ephemeral_layer_bytes =
conf.ephemeral_bytes_per_memory_kb as u64 * (system_memory / 1024);
tracing::info!("Initialized ephemeral layer size limit to {max_ephemeral_layer_bytes}, for {system_memory} bytes of memory");
inmemory_layer::GLOBAL_RESOURCES.max_dirty_bytes.store(
max_ephemeral_layer_bytes,
std::sync::atomic::Ordering::Relaxed,
);
// Scan local filesystem for attached tenants
let tenant_configs = init_load_tenant_configs(conf).await?;
@@ -875,16 +888,6 @@ async fn shutdown_all_tenants0(tenants: &std::sync::RwLock<TenantsMap>) {
// caller will log how long we took
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum SetNewTenantConfigError {
#[error(transparent)]
GetTenant(#[from] GetTenantError),
#[error(transparent)]
Persist(anyhow::Error),
#[error(transparent)]
Other(anyhow::Error),
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum UpsertLocationError {
#[error("Bad config request: {0}")]
@@ -910,32 +913,21 @@ impl TenantManager {
self.conf
}
/// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or is not fitting to the query.
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
/// Gets the attached tenant from the in-memory data, erroring if it's absent, in secondary mode, or currently
/// undergoing a state change (i.e. slot is InProgress).
///
/// The return Tenant is not guaranteed to be active: check its status after obtaining it, or
/// use [`Tenant::wait_to_become_active`] before using it if you will do I/O on it.
pub(crate) fn get_attached_tenant_shard(
&self,
tenant_shard_id: TenantShardId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = self.tenants.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
TenantState::Broken {
reason,
backtrace: _,
} if active_only => Err(GetTenantError::Broken(reason)),
TenantState::Active => Ok(Arc::clone(tenant)),
_ => {
if active_only {
Err(GetTenantError::NotActive(tenant_shard_id))
} else {
Ok(Arc::clone(tenant))
}
}
},
Some(TenantSlot::Attached(tenant)) => Ok(Arc::clone(tenant)),
Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
None | Some(TenantSlot::Secondary(_)) => {
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
@@ -1428,7 +1420,8 @@ impl TenantManager {
.wait_to_become_active(activation_timeout)
.await
.map_err(|e| match e {
GetActiveTenantError::WillNotBecomeActive(_) => {
GetActiveTenantError::WillNotBecomeActive(_)
| GetActiveTenantError::Broken(_) => {
DeleteTenantError::InvalidState(tenant.current_state())
}
GetActiveTenantError::Cancelled => DeleteTenantError::Cancelled,
@@ -1455,29 +1448,30 @@ impl TenantManager {
result
}
#[instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), new_shard_count=%new_shard_count.literal()))]
#[instrument(skip_all, fields(tenant_id=%tenant.get_tenant_shard_id().tenant_id, shard_id=%tenant.get_tenant_shard_id().shard_slug(), new_shard_count=%new_shard_count.literal()))]
pub(crate) async fn shard_split(
&self,
tenant_shard_id: TenantShardId,
tenant: Arc<Tenant>,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant_shard_id = *tenant.get_tenant_shard_id();
let r = self
.do_shard_split(tenant_shard_id, new_shard_count, new_stripe_size, ctx)
.do_shard_split(tenant, new_shard_count, new_stripe_size, ctx)
.await;
if r.is_err() {
// Shard splitting might have left the original shard in a partially shut down state (it
// stops the shard's remote timeline client). Reset it to ensure we leave things in
// a working state.
if self.get(tenant_shard_id).is_some() {
tracing::warn!("Resetting {tenant_shard_id} after shard split failure");
tracing::warn!("Resetting after shard split failure");
if let Err(e) = self.reset_tenant(tenant_shard_id, false, ctx).await {
// Log this error because our return value will still be the original error, not this one. This is
// a severe error: if this happens, we might be leaving behind a tenant that is not fully functional
// (e.g. has uploads disabled). We can't do anything else: if reset fails then shutting the tenant down or
// setting it broken probably won't help either.
tracing::error!("Failed to reset {tenant_shard_id}: {e}");
tracing::error!("Failed to reset: {e}");
}
}
}
@@ -1487,12 +1481,12 @@ impl TenantManager {
pub(crate) async fn do_shard_split(
&self,
tenant_shard_id: TenantShardId,
tenant: Arc<Tenant>,
new_shard_count: ShardCount,
new_stripe_size: Option<ShardStripeSize>,
ctx: &RequestContext,
) -> anyhow::Result<Vec<TenantShardId>> {
let tenant = get_tenant(tenant_shard_id, true)?;
let tenant_shard_id = *tenant.get_tenant_shard_id();
// Validate the incoming request
if new_shard_count.count() <= tenant_shard_id.shard_count.count() {
@@ -1538,7 +1532,6 @@ impl TenantManager {
// If [`Tenant::split_prepare`] fails, we must reload the tenant, because it might
// have been left in a partially-shut-down state.
tracing::warn!("Failed to prepare for split: {e}, reloading Tenant before returning");
self.reset_tenant(tenant_shard_id, false, ctx).await?;
return Err(e);
}
@@ -1936,38 +1929,23 @@ impl TenantManager {
removal_result
}
pub(crate) async fn set_new_tenant_config(
pub(crate) fn list_tenants(
&self,
new_tenant_conf: TenantConfOpt,
tenant_id: TenantId,
) -> Result<(), SetNewTenantConfigError> {
// Legacy API: does not support sharding
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
info!("configuring tenant {tenant_id}");
let tenant = get_tenant(tenant_shard_id, true)?;
if !tenant.tenant_shard_id().shard_count.is_unsharded() {
// Note that we use ShardParameters::default below.
return Err(SetNewTenantConfigError::Other(anyhow::anyhow!(
"This API may only be used on single-sharded tenants, use the /location_config API for sharded tenants"
)));
}
// This is a legacy API that only operates on attached tenants: the preferred
// API to use is the location_config/ endpoint, which lets the caller provide
// the full LocationConf.
let location_conf = LocationConf::attached_single(
new_tenant_conf.clone(),
tenant.generation,
&ShardParameters::default(),
);
Tenant::persist_tenant_config(self.conf, &tenant_shard_id, &location_conf)
.await
.map_err(SetNewTenantConfigError::Persist)?;
tenant.set_new_tenant_config(new_tenant_conf);
Ok(())
) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => {
Some((*id, tenant.current_state(), tenant.generation()))
}
TenantSlot::Secondary(_) => None,
TenantSlot::InProgress(_) => None,
})
.collect())
}
}
@@ -1980,51 +1958,12 @@ pub(crate) enum GetTenantError {
#[error("Tenant {0} is not active")]
NotActive(TenantShardId),
/// Broken is logically a subset of NotActive, but a distinct error is useful as
/// NotActive is usually a retryable state for API purposes, whereas Broken
/// is a stuck error state
#[error("Tenant is broken: {0}")]
Broken(String),
// Initializing or shutting down: cannot authoritatively say whether we have this tenant
#[error("Tenant map is not available: {0}")]
MapState(#[from] TenantMapError),
}
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
/// `active_only = true` allows to query only tenants that are ready for operations, erroring on other kinds of tenants.
///
/// This method is cancel-safe.
pub(crate) fn get_tenant(
tenant_shard_id: TenantShardId,
active_only: bool,
) -> Result<Arc<Tenant>, GetTenantError> {
let locked = TENANTS.read().unwrap();
let peek_slot = tenant_map_peek_slot(&locked, &tenant_shard_id, TenantSlotPeekMode::Read)?;
match peek_slot {
Some(TenantSlot::Attached(tenant)) => match tenant.current_state() {
TenantState::Broken {
reason,
backtrace: _,
} if active_only => Err(GetTenantError::Broken(reason)),
TenantState::Active => Ok(Arc::clone(tenant)),
_ => {
if active_only {
Err(GetTenantError::NotActive(tenant_shard_id))
} else {
Ok(Arc::clone(tenant))
}
}
},
Some(TenantSlot::InProgress(_)) => Err(GetTenantError::NotActive(tenant_shard_id)),
None | Some(TenantSlot::Secondary(_)) => {
Err(GetTenantError::NotFound(tenant_shard_id.tenant_id))
}
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum GetActiveTenantError {
/// We may time out either while TenantSlot is InProgress, or while the Tenant
@@ -2048,6 +1987,12 @@ pub(crate) enum GetActiveTenantError {
/// Tenant exists, but is in a state that cannot become active (e.g. Stopping, Broken)
#[error("will not become active. Current state: {0}")]
WillNotBecomeActive(TenantState),
/// Broken is logically a subset of WillNotBecomeActive, but a distinct error is useful as
/// WillNotBecomeActive is a permitted error under some circumstances, whereas broken should
/// never happen.
#[error("Tenant is broken: {0}")]
Broken(String),
}
/// Get a [`Tenant`] in its active state. If the tenant_id is currently in [`TenantSlot::InProgress`]
@@ -2267,27 +2212,6 @@ pub(crate) enum TenantMapListError {
Initializing,
}
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants(
) -> Result<Vec<(TenantShardId, TenantState, Generation)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
TenantsMap::Open(m) | TenantsMap::ShuttingDown(m) => m,
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => {
Some((*id, tenant.current_state(), tenant.generation()))
}
TenantSlot::Secondary(_) => None,
TenantSlot::InProgress(_) => None,
})
.collect())
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum TenantMapInsertError {
#[error(transparent)]

View File

@@ -9,6 +9,7 @@ use crate::{
metrics::SECONDARY_MODE,
tenant::{
config::AttachmentMode,
mgr::GetTenantError,
mgr::TenantManager,
remote_timeline_client::remote_heatmap_path,
span::debug_assert_current_span_has_tenant_id,
@@ -292,8 +293,11 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
"Starting heatmap write on command");
let tenant = self
.tenant_manager
.get_attached_tenant_shard(*tenant_shard_id, true)
.get_attached_tenant_shard(*tenant_shard_id)
.map_err(|e| anyhow::anyhow!(e))?;
if !tenant.is_active() {
return Err(GetTenantError::NotActive(*tenant_shard_id).into());
}
Ok(UploadPending {
// Ignore our state for last digest: this forces an upload even if nothing has changed

View File

@@ -3,7 +3,7 @@
pub mod delta_layer;
mod filename;
pub mod image_layer;
mod inmemory_layer;
pub(crate) mod inmemory_layer;
pub(crate) mod layer;
mod layer_desc;

View File

@@ -89,7 +89,10 @@ impl std::fmt::Debug for InMemoryLayerInner {
///
/// This global state is used to implement behaviors that require a global view of the system, e.g.
/// rolling layers proactively to limit the total amount of dirty data.
struct GlobalResources {
pub(crate) struct GlobalResources {
// Limit on how high dirty_bytes may grow before we start freezing layers to reduce it.
// Zero means unlimited.
pub(crate) max_dirty_bytes: AtomicU64,
// How many bytes are in all EphemeralFile objects
dirty_bytes: AtomicU64,
// How many layers are contributing to dirty_bytes
@@ -118,11 +121,12 @@ impl GlobalResourceUnits {
/// Do not call this frequently: all timelines will write to these same global atomics,
/// so this is a relatively expensive operation. Wait at least a few seconds between calls.
fn publish_size(&mut self, size: u64) {
///
/// Returns the effective layer size limit that should be applied, if any, to keep
/// the total number of dirty bytes below the configured maximum.
fn publish_size(&mut self, size: u64) -> Option<u64> {
let new_global_dirty_bytes = match size.cmp(&self.dirty_bytes) {
Ordering::Equal => {
return;
}
Ordering::Equal => GLOBAL_RESOURCES.dirty_bytes.load(AtomicOrdering::Relaxed),
Ordering::Greater => {
let delta = size - self.dirty_bytes;
let old = GLOBAL_RESOURCES
@@ -146,6 +150,21 @@ impl GlobalResourceUnits {
TIMELINE_EPHEMERAL_BYTES.set(new_global_dirty_bytes);
self.dirty_bytes = size;
let max_dirty_bytes = GLOBAL_RESOURCES
.max_dirty_bytes
.load(AtomicOrdering::Relaxed);
if max_dirty_bytes > 0 && new_global_dirty_bytes > max_dirty_bytes {
// Set the layer file limit to the average layer size: this implies that all above-average
// sized layers will be eligible for freezing. They will be frozen in the order they
// next enter publish_size.
Some(
new_global_dirty_bytes
/ GLOBAL_RESOURCES.dirty_layers.load(AtomicOrdering::Relaxed) as u64,
)
} else {
None
}
}
// Call publish_size if the input size differs from last published size by more than
@@ -174,7 +193,8 @@ impl Drop for GlobalResourceUnits {
}
}
static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
pub(crate) static GLOBAL_RESOURCES: GlobalResources = GlobalResources {
max_dirty_bytes: AtomicU64::new(0),
dirty_bytes: AtomicU64::new(0),
dirty_layers: AtomicUsize::new(0),
};
@@ -194,6 +214,10 @@ impl InMemoryLayer {
}
}
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok()
}
pub(crate) fn assert_writable(&self) {
assert!(self.end_lsn.get().is_none());
}
@@ -486,10 +510,10 @@ impl InMemoryLayer {
Ok(())
}
pub(crate) async fn tick(&self) {
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.len();
inner.resource_units.publish_size(size);
inner.resource_units.publish_size(size)
}
pub(crate) async fn put_tombstones(&self, _key_ranges: &[(Range<Key>, Lsn)]) -> Result<()> {

View File

@@ -19,7 +19,7 @@ use pageserver_api::{
keyspace::KeySpaceAccum,
models::{
CompactionAlgorithm, DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest,
EvictionPolicy, LayerMapInfo, TimelineState,
EvictionPolicy, InMemoryLayerInfo, LayerMapInfo, TimelineState,
},
reltag::BlockNumber,
shard::{ShardIdentity, TenantShardId},
@@ -1142,6 +1142,79 @@ impl Timeline {
self.flush_frozen_layers_and_wait().await
}
/// If there is no writer, and conditions for rolling the latest layer are met, then freeze it.
///
/// This is for use in background housekeeping, to provide guarantees of layers closing eventually
/// even if there are no ongoing writes to drive that.
async fn maybe_freeze_ephemeral_layer(&self) {
let Ok(_write_guard) = self.write_lock.try_lock() else {
// If the write lock is held, there is an active wal receiver: rolling open layers
// is their responsibility while they hold this lock.
return;
};
let Ok(layers_guard) = self.layers.try_read() else {
// Don't block if the layer lock is busy
return;
};
let Some(open_layer) = &layers_guard.layer_map().open_layer else {
// No open layer, no work to do.
return;
};
let Some(current_size) = open_layer.try_len() else {
// Unexpected: since we hold the write guard, nobody else should be writing to this layer, so
// read lock to get size should always succeed.
tracing::warn!("Lock conflict while reading size of open layer");
return;
};
let current_lsn = self.get_last_record_lsn();
let checkpoint_distance_override = open_layer.tick().await;
if let Some(size_override) = checkpoint_distance_override {
if current_size > size_override {
// This is not harmful, but it only happens in relatively rare cases where
// time-based checkpoints are not happening fast enough to keep the amount of
// ephemeral data within configured limits. It's a sign of stress on the system.
tracing::info!("Early-rolling open layer at size {current_size} (limit {size_override}) due to dirty data pressure");
}
}
let checkpoint_distance =
checkpoint_distance_override.unwrap_or(self.get_checkpoint_distance());
if self.should_roll(
current_size,
current_size,
checkpoint_distance,
self.get_last_record_lsn(),
self.last_freeze_at.load(),
*self.last_freeze_ts.read().unwrap(),
) {
match open_layer.info() {
InMemoryLayerInfo::Frozen { lsn_start, lsn_end } => {
// We may reach this point if the layer was already frozen but not yet flushed: flushing
// happens asynchronously in the background.
tracing::debug!(
"Not freezing open layer, it's already frozen ({lsn_start}..{lsn_end})"
);
}
InMemoryLayerInfo::Open { .. } => {
// Upgrade to a write lock and freeze the layer
drop(layers_guard);
let mut layers_guard = self.layers.write().await;
layers_guard
.try_freeze_in_memory_layer(current_lsn, &self.last_freeze_at)
.await;
}
}
self.flush_frozen_layers();
}
}
/// Outermost timeline compaction operation; downloads needed layers.
pub(crate) async fn compact(
self: &Arc<Self>,
@@ -1164,6 +1237,11 @@ impl Timeline {
(guard, permit)
};
// Prior to compaction, check if an open ephemeral layer should be closed: this provides
// background enforcement of checkpoint interval if there is no active WAL receiver, to avoid keeping
// an ephemeral layer open forever when idle.
self.maybe_freeze_ephemeral_layer().await;
// this wait probably never needs any "long time spent" logging, because we already nag if
// compaction task goes over it's period (20s) which is quite often in production.
let (_guard, _permit) = tokio::select! {
@@ -1196,6 +1274,7 @@ impl Timeline {
pub(crate) fn activate(
self: &Arc<Self>,
parent: Arc<crate::tenant::Tenant>,
broker_client: BrokerClientChannel,
background_jobs_can_start: Option<&completion::Barrier>,
ctx: &RequestContext,
@@ -1206,7 +1285,7 @@ impl Timeline {
}
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.launch_eviction_task(background_jobs_can_start);
self.launch_eviction_task(parent, background_jobs_can_start);
}
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
@@ -1434,6 +1513,53 @@ impl Timeline {
Err(EvictionError::Timeout) => Ok(Some(false)),
}
}
fn should_roll(
&self,
layer_size: u64,
projected_layer_size: u64,
checkpoint_distance: u64,
projected_lsn: Lsn,
last_freeze_at: Lsn,
last_freeze_ts: Instant,
) -> bool {
let distance = projected_lsn.widening_sub(last_freeze_at);
// Rolling the open layer can be triggered by:
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
// account for how writes are distributed across shards: we expect each node to consume
// 1/count of the LSN on average.
// 2. The size of the currently open layer.
// 3. The time since the last roll. It helps safekeepers to regard pageserver as caught
// up and suspend activity.
if distance >= checkpoint_distance as i128 * self.shard_identity.count.count() as i128 {
info!(
"Will roll layer at {} with layer size {} due to LSN distance ({})",
projected_lsn, layer_size, distance
);
true
} else if projected_layer_size >= checkpoint_distance {
info!(
"Will roll layer at {} with layer size {} due to layer size ({})",
projected_lsn, layer_size, projected_layer_size
);
true
} else if distance > 0 && last_freeze_ts.elapsed() >= self.get_checkpoint_timeout() {
info!(
"Will roll layer at {} with layer size {} due to time since last flush ({:?})",
projected_lsn,
layer_size,
last_freeze_ts.elapsed()
);
true
} else {
false
}
}
}
/// Number of times we will compute partition within a checkpoint distance.
@@ -2587,6 +2713,10 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
let open_layer = open_layer.clone();
drop(guard);
result = match open_layer
.get_value_reconstruct_data(
key,
@@ -2604,10 +2734,7 @@ impl Timeline {
traversal_path.push((
result,
cont_lsn,
Box::new({
let open_layer = Arc::clone(open_layer);
move || open_layer.traversal_id()
}),
Box::new(move || open_layer.traversal_id()),
));
continue 'outer;
}
@@ -2617,6 +2744,10 @@ impl Timeline {
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
let frozen_layer = frozen_layer.clone();
drop(guard);
result = match frozen_layer
.get_value_reconstruct_data(
key,
@@ -2634,10 +2765,7 @@ impl Timeline {
traversal_path.push((
result,
cont_lsn,
Box::new({
let frozen_layer = Arc::clone(frozen_layer);
move || frozen_layer.traversal_id()
}),
Box::new(move || frozen_layer.traversal_id()),
));
continue 'outer;
}
@@ -2645,6 +2773,8 @@ impl Timeline {
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
let layer = guard.get_from_desc(&layer);
drop(guard);
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
@@ -4451,52 +4581,6 @@ impl<'a> TimelineWriter<'a> {
res
}
/// "Tick" the timeline writer: it will roll the open layer if required
/// and do nothing else.
pub(crate) async fn tick(&mut self) -> anyhow::Result<()> {
self.open_layer_if_present().await?;
let last_record_lsn = self.get_last_record_lsn();
let action = self.get_open_layer_action(last_record_lsn, 0);
if action == OpenLayerAction::Roll {
self.roll_layer(last_record_lsn).await?;
} else if let Some(writer_state) = &mut *self.write_guard {
// Periodic update of statistics
writer_state.open_layer.tick().await;
}
Ok(())
}
/// Populate the timeline writer state only if an in-memory layer
/// is already open.
async fn open_layer_if_present(&mut self) -> anyhow::Result<()> {
assert!(self.write_guard.is_none());
let open_layer = {
let guard = self.layers.read().await;
let layers = guard.layer_map();
match layers.open_layer {
Some(ref open_layer) => open_layer.clone(),
None => {
return Ok(());
}
}
};
let initial_size = open_layer.size().await?;
let last_freeze_at = self.last_freeze_at.load();
let last_freeze_ts = *self.last_freeze_ts.read().unwrap();
self.write_guard.replace(TimelineWriterState::new(
open_layer,
initial_size,
last_freeze_at,
last_freeze_ts,
));
Ok(())
}
async fn handle_open_layer_action(
&mut self,
at: Lsn,
@@ -4568,43 +4652,14 @@ impl<'a> TimelineWriter<'a> {
return OpenLayerAction::None;
}
let distance = lsn.widening_sub(state.cached_last_freeze_at);
let proposed_open_layer_size = state.current_size + new_value_size;
// Rolling the open layer can be triggered by:
// 1. The distance from the last LSN we rolled at. This bounds the amount of WAL that
// the safekeepers need to store. For sharded tenants, we multiply by shard count to
// account for how writes are distributed across shards: we expect each node to consume
// 1/count of the LSN on average.
// 2. The size of the currently open layer.
//    3. The time since the last roll. Rolling on a timer lets the safekeepers regard
//       the pageserver as caught up and suspend their activity.
if distance
>= self.get_checkpoint_distance() as i128 * self.shard_identity.count.count() as i128
{
info!(
"Will roll layer at {} with layer size {} due to LSN distance ({})",
lsn, state.current_size, distance
);
OpenLayerAction::Roll
} else if proposed_open_layer_size >= self.get_checkpoint_distance() {
info!(
"Will roll layer at {} with layer size {} due to layer size ({})",
lsn, state.current_size, proposed_open_layer_size
);
OpenLayerAction::Roll
} else if distance > 0
&& state.cached_last_freeze_ts.elapsed() >= self.get_checkpoint_timeout()
{
info!(
"Will roll layer at {} with layer size {} due to time since last flush ({:?})",
lsn,
state.current_size,
state.cached_last_freeze_ts.elapsed()
);
if self.tl.should_roll(
state.current_size,
state.current_size + new_value_size,
self.get_checkpoint_distance(),
lsn,
state.cached_last_freeze_at,
state.cached_last_freeze_ts,
) {
OpenLayerAction::Roll
} else {
OpenLayerAction::None

View File

@@ -51,6 +51,7 @@ pub struct EvictionTaskTenantState {
impl Timeline {
pub(super) fn launch_eviction_task(
self: &Arc<Self>,
parent: Arc<Tenant>,
background_tasks_can_start: Option<&completion::Barrier>,
) {
let self_clone = Arc::clone(self);
@@ -72,14 +73,14 @@ impl Timeline {
_ = completion::Barrier::maybe_wait(background_tasks_can_start) => {}
};
self_clone.eviction_task(cancel).await;
self_clone.eviction_task(parent, cancel).await;
Ok(())
},
);
}
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>, cancel: CancellationToken) {
use crate::tenant::tasks::random_init_delay;
// acquire the gate guard only once within a useful span
@@ -103,7 +104,7 @@ impl Timeline {
loop {
let policy = self.get_eviction_policy();
let cf = self
.eviction_iteration(&policy, &cancel, &guard, &ctx)
.eviction_iteration(&tenant, &policy, &cancel, &guard, &ctx)
.await;
match cf {
@@ -123,6 +124,7 @@ impl Timeline {
#[instrument(skip_all, fields(policy_kind = policy.discriminant_str()))]
async fn eviction_iteration(
self: &Arc<Self>,
tenant: &Tenant,
policy: &EvictionPolicy,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -137,7 +139,7 @@ impl Timeline {
}
EvictionPolicy::LayerAccessThreshold(p) => {
match self
.eviction_iteration_threshold(p, cancel, gate, ctx)
.eviction_iteration_threshold(tenant, p, cancel, gate, ctx)
.await
{
ControlFlow::Break(()) => return ControlFlow::Break(()),
@@ -146,7 +148,11 @@ impl Timeline {
(p.period, p.threshold)
}
EvictionPolicy::OnlyImitiate(p) => {
if self.imitiate_only(p, cancel, gate, ctx).await.is_break() {
if self
.imitiate_only(tenant, p, cancel, gate, ctx)
.await
.is_break()
{
return ControlFlow::Break(());
}
(p.period, p.threshold)
@@ -175,6 +181,7 @@ impl Timeline {
async fn eviction_iteration_threshold(
self: &Arc<Self>,
tenant: &Tenant,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -193,7 +200,10 @@ impl Timeline {
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
};
match self.imitate_layer_accesses(p, cancel, gate, ctx).await {
match self
.imitate_layer_accesses(tenant, p, cancel, gate, ctx)
.await
{
ControlFlow::Break(()) => return ControlFlow::Break(()),
ControlFlow::Continue(()) => (),
}
@@ -315,6 +325,7 @@ impl Timeline {
/// disk usage based eviction task.
async fn imitiate_only(
self: &Arc<Self>,
tenant: &Tenant,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -331,7 +342,8 @@ impl Timeline {
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
};
self.imitate_layer_accesses(p, cancel, gate, ctx).await
self.imitate_layer_accesses(tenant, p, cancel, gate, ctx)
.await
}
/// If we evict layers but keep cached values derived from those layers, then
@@ -361,6 +373,7 @@ impl Timeline {
#[instrument(skip_all)]
async fn imitate_layer_accesses(
&self,
tenant: &Tenant,
p: &EvictionPolicyLayerAccessThreshold,
cancel: &CancellationToken,
gate: &GateGuard,
@@ -396,17 +409,11 @@ impl Timeline {
// Make one of the tenant's timelines draw the short straw and run the calculation.
// The others wait until the calculation is done so that they take into account the
// imitated accesses that the winner made.
let tenant = match crate::tenant::mgr::get_tenant(self.tenant_shard_id, true) {
Ok(t) => t,
Err(_) => {
return ControlFlow::Break(());
}
};
let mut state = tenant.eviction_task_tenant_state.lock().await;
match state.last_layer_access_imitation {
Some(ts) if ts.elapsed() < inter_imitate_period => { /* no need to run */ }
_ => {
self.imitate_synthetic_size_calculation_worker(&tenant, cancel, ctx)
self.imitate_synthetic_size_calculation_worker(tenant, cancel, ctx)
.await;
state.last_layer_access_imitation = Some(tokio::time::Instant::now());
}
@@ -480,7 +487,7 @@ impl Timeline {
#[instrument(skip_all)]
async fn imitate_synthetic_size_calculation_worker(
&self,
tenant: &Arc<Tenant>,
tenant: &Tenant,
cancel: &CancellationToken,
ctx: &RequestContext,
) {

View File

@@ -86,6 +86,7 @@ impl<'t> UninitializedTimeline<'t> {
/// Prepares timeline data by loading it from the basebackup archive.
pub(crate) async fn import_basebackup_from_tar(
self,
tenant: Arc<Tenant>,
copyin_read: &mut (impl tokio::io::AsyncRead + Send + Sync + Unpin),
base_lsn: Lsn,
broker_client: storage_broker::BrokerClientChannel,
@@ -114,7 +115,7 @@ impl<'t> UninitializedTimeline<'t> {
// All the data has been imported. Insert the Timeline into the tenant's timelines map
let tl = self.finish_creation()?;
tl.activate(broker_client, None, ctx);
tl.activate(tenant, broker_client, None, ctx);
Ok(tl)
}

View File

@@ -33,11 +33,9 @@ use crate::tenant::timeline::walreceiver::connection_manager::{
use pageserver_api::shard::TenantShardId;
use std::future::Future;
use std::num::NonZeroU64;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::select;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -91,31 +89,27 @@ impl WalReceiver {
async move {
debug_assert_current_span_has_tenant_and_timeline_id();
debug!("WAL receiver manager started, connecting to broker");
let cancel = task_mgr::shutdown_token();
let mut connection_manager_state = ConnectionManagerState::new(
timeline,
conf,
);
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
trace!("WAL receiver shutdown requested, shutting down");
while !cancel.is_cancelled() {
let loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut connection_manager_state,
&walreceiver_ctx,
&cancel,
&loop_status,
).await;
match loop_step_result {
Ok(()) => continue,
Err(_cancelled) => {
trace!("Connection manager loop ended, shutting down");
break;
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut connection_manager_state,
&walreceiver_ctx,
&loop_status,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
trace!("Connection manager loop ended, shutting down");
break;
}
},
}
}
}
connection_manager_state.shutdown().await;
*loop_status.write().unwrap() = None;
Ok(())
@@ -197,6 +191,9 @@ impl<E: Clone> TaskHandle<E> {
}
}
/// # Cancel-Safety
///
/// Cancellation-safe.
async fn next_task_event(&mut self) -> TaskEvent<E> {
match self.events_receiver.changed().await {
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
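The "Cancellation-safe" claim added above holds because tokio's `watch::Receiver::changed()` is itself documented as cancel-safe: a pending change notification is only consumed when `changed()` completes, not when the future is dropped. A small self-contained sketch (illustrative names, assuming only `tokio` and `tokio-util`):

```rust
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

// Sketch: `changed()` can be raced against a cancellation token without
// losing updates, because the notification is consumed only on completion.
async fn next_event(rx: &mut watch::Receiver<u32>, cancel: &CancellationToken) -> Option<u32> {
    tokio::select! {
        _ = cancel.cancelled() => return None,
        res = rx.changed() => res.ok()?,
    };
    Some(*rx.borrow())
}
```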

View File

@@ -17,7 +17,7 @@ use crate::metrics::{
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
};
use crate::task_mgr::{shutdown_token, TaskKind};
use crate::task_mgr::TaskKind;
use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
@@ -27,7 +27,7 @@ use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::SubscribeSafekeeperInfoRequest;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use storage_broker::{BrokerClientChannel, Code, Streaming};
use tokio::select;
use tokio_util::sync::CancellationToken;
use tracing::*;
use postgres_connection::PgConnectionConfig;
@@ -45,27 +45,33 @@ use super::{
TaskEvent, TaskHandle,
};
pub(crate) struct Cancelled;
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
/// Based on the updates, decides whether to start, keep, or stop a WAL receiver task.
/// If storage broker subscription is cancelled, exits.
///
/// # Cancel-Safety
///
/// Not cancellation-safe. Use `cancel` token to request cancellation.
pub(super) async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
connection_manager_state: &mut ConnectionManagerState,
ctx: &RequestContext,
cancel: &CancellationToken,
manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
) -> ControlFlow<(), ()> {
match connection_manager_state
.timeline
.wait_to_become_active(ctx)
.await
{
) -> Result<(), Cancelled> {
match tokio::select! {
_ = cancel.cancelled() => { return Err(Cancelled); },
st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
} {
Ok(()) => {}
Err(new_state) => {
debug!(
?new_state,
"state changed, stopping wal connection manager loop"
);
return ControlFlow::Break(());
return Err(Cancelled);
}
}
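A self-contained sketch of the control-flow shape adopted here, assuming only `tokio` and `tokio-util` (the real loop step does far more work per iteration):

```rust
use tokio_util::sync::CancellationToken;

struct Cancelled;

// Sketch: each step races its work against the token and reports
// cancellation as a value; the driver (the `while !cancel.is_cancelled()`
// loop above) keeps stepping until then.
async fn loop_step_sketch(cancel: &CancellationToken) -> Result<(), Cancelled> {
    tokio::select! {
        _ = cancel.cancelled() => Err(Cancelled),
        _ = async { /* wait for timeline or broker activity */ } => Ok(()),
    }
}
```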
@@ -86,7 +92,7 @@ pub(super) async fn connection_manager_loop_step(
// Subscribe to the broker updates. Stream shares underlying TCP connection
// with other streams on this client (other connection managers). When
// object goes out of scope, stream finishes in drop() automatically.
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
debug!("Subscribed for broker timeline updates");
loop {
@@ -94,6 +100,7 @@ pub(super) async fn connection_manager_loop_step(
// These things are happening concurrently:
//
// - cancellation request
// - keep receiving WAL on the current connection
// - if the shared state says we need to change connection, disconnect and return
// - this runs in a separate task and we receive updates via a watch channel
@@ -101,7 +108,11 @@ pub(super) async fn connection_manager_loop_step(
// - receive updates from broker
// - this might change the current desired connection
// - timeline state changes to something that does not allow walreceiver to run concurrently
select! {
// NB: make sure each of the select expressions is cancellation-safe
// (the arm bodies themselves need not be).
tokio::select! {
_ = cancel.cancelled() => { return Err(Cancelled); }
Some(wal_connection_update) = async {
match connection_manager_state.wal_connection.as_mut() {
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
@@ -133,7 +144,7 @@ pub(super) async fn connection_manager_loop_step(
},
// Got a new update from the broker
broker_update = broker_subscription.message() => {
broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
match broker_update {
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Err(status) => {
@@ -147,16 +158,17 @@ pub(super) async fn connection_manager_loop_step(
warn!("broker subscription failed: {status}");
}
}
return ControlFlow::Continue(());
return Ok(());
}
Ok(None) => {
error!("broker subscription stream ended"); // can't happen
return ControlFlow::Continue(());
return Ok(());
}
}
},
new_event = async {
// Reminder: this match arm needs to be cancellation-safe.
loop {
if connection_manager_state.timeline.current_state() == TimelineState::Loading {
warn!("wal connection manager should only be launched after timeline has become active");
@@ -182,11 +194,11 @@ pub(super) async fn connection_manager_loop_step(
}
} => match new_event {
ControlFlow::Continue(()) => {
return ControlFlow::Continue(());
return Ok(());
}
ControlFlow::Break(()) => {
debug!("Timeline is no longer active, stopping wal connection manager loop");
return ControlFlow::Break(());
return Err(Cancelled);
}
},
@@ -218,16 +230,15 @@ pub(super) async fn connection_manager_loop_step(
async fn subscribe_for_timeline_updates(
broker_client: &mut BrokerClientChannel,
id: TenantTimelineId,
) -> Streaming<SafekeeperTimelineInfo> {
cancel: &CancellationToken,
) -> Result<Streaming<SafekeeperTimelineInfo>, Cancelled> {
let mut attempt = 0;
let cancel = shutdown_token();
loop {
exponential_backoff(
attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&cancel,
cancel,
)
.await;
attempt += 1;
@@ -241,9 +252,14 @@ async fn subscribe_for_timeline_updates(
subscription_key: Some(key),
};
match broker_client.subscribe_safekeeper_info(request).await {
match {
tokio::select! {
r = broker_client.subscribe_safekeeper_info(request) => { r }
_ = cancel.cancelled() => { return Err(Cancelled); }
}
} {
Ok(resp) => {
return resp.into_inner();
return Ok(resp.into_inner());
}
Err(e) => {
// Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
@@ -486,6 +502,10 @@ impl ConnectionManagerState {
/// Drops the current connection (if any) and updates retry timeout for the next
/// connection attempt to the same safekeeper.
///
/// # Cancel-Safety
///
/// Not cancellation-safe.
async fn drop_old_connection(&mut self, needs_shutdown: bool) {
let wal_connection = match self.wal_connection.take() {
Some(wal_connection) => wal_connection,
@@ -493,7 +513,14 @@ impl ConnectionManagerState {
};
if needs_shutdown {
wal_connection.connection_task.shutdown().await;
wal_connection
.connection_task
.shutdown()
// This here is why this function isn't cancellation-safe.
// If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
// Even if our caller diligently calls Self::shutdown(), it will find self.wal_connection == None
// and thus be ineffective.
.await;
}
let retry = self
@@ -838,6 +865,9 @@ impl ConnectionManagerState {
}
}
/// # Cancel-Safety
///
/// Not cancellation-safe.
pub(super) async fn shutdown(mut self) {
if let Some(wal_connection) = self.wal_connection.take() {
wal_connection.connection_task.shutdown().await;
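The hazard documented above generalizes: taking state out of `self` and then awaiting means that dropping the future at that await point loses the state. A minimal, hypothetical illustration (not the actual walreceiver types):

```rust
struct TaskHandle;

impl TaskHandle {
    async fn shutdown(self) { /* signal the task and await its exit */ }
}

struct Manager {
    task: Option<TaskHandle>,
}

impl Manager {
    // Not cancellation-safe: if this future is dropped at the await point,
    // `self.task` is already `None`, so a later cleanup pass finds nothing
    // to shut down and the underlying task is leaked.
    async fn drop_old(&mut self) {
        if let Some(task) = self.task.take() {
            task.shutdown().await;
        }
    }
}
```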

View File

@@ -389,17 +389,6 @@ pub(super) async fn handle_walreceiver_connection(
}
}
{
// This is a hack. It piggybacks on the keepalive messages sent by the
// safekeeper in order to enforce `checkpoint_timeout` on the currently
// open layer. This hack doesn't provide a bound on the total size of
// in-memory layers on a pageserver. See https://github.com/neondatabase/neon/issues/6916.
let mut writer = timeline.writer().await;
if let Err(err) = writer.tick().await {
warn!("Timeline writer tick failed: {err}");
}
}
if let Some(last_lsn) = status_update {
let timeline_remote_consistent_lsn = timeline
.get_remote_consistent_lsn_visible()

View File

@@ -12,6 +12,8 @@ use crate::console::errors::GetAuthInfoError;
use crate::console::provider::{CachedRoleSecret, ConsoleBackend};
use crate::console::{AuthSecret, NodeInfo};
use crate::context::RequestMonitoring;
use crate::intern::EndpointIdInt;
use crate::metrics::{AUTH_RATE_LIMIT_HITS, ENDPOINTS_AUTH_RATE_LIMITED};
use crate::proxy::connect_compute::ComputeConnectBackend;
use crate::proxy::NeonOptions;
use crate::stream::Stream;
@@ -28,7 +30,7 @@ use crate::{
use crate::{scram, EndpointCacheKey, EndpointId, RoleName};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use tracing::{info, warn};
/// Alternative to [`std::borrow::Cow`] but doesn't need `T: ToOwned` as we don't need that functionality
pub enum MaybeOwned<'a, T> {
@@ -174,6 +176,52 @@ impl TryFrom<ComputeUserInfoMaybeEndpoint> for ComputeUserInfo {
}
}
impl AuthenticationConfig {
pub fn check_rate_limit(
&self,
ctx: &mut RequestMonitoring,
secret: AuthSecret,
endpoint: &EndpointId,
is_cleartext: bool,
) -> auth::Result<AuthSecret> {
// we have validated the endpoint exists, so let's intern it.
let endpoint_int = EndpointIdInt::from(endpoint);
// only charge the full hash count for the password-hack and websocket flows,
// i.e. when the proxy itself needs to run the hashing
let password_weight = if is_cleartext {
match &secret {
#[cfg(any(test, feature = "testing"))]
AuthSecret::Md5(_) => 1,
AuthSecret::Scram(s) => s.iterations + 1,
}
} else {
// validating scram takes just 1 hmac_sha_256 operation.
1
};
let limit_not_exceeded = self
.rate_limiter
.check((endpoint_int, ctx.peer_addr), password_weight);
if !limit_not_exceeded {
warn!(
enabled = self.rate_limiter_enabled,
"rate limiting authentication"
);
AUTH_RATE_LIMIT_HITS.inc();
ENDPOINTS_AUTH_RATE_LIMITED.measure(endpoint);
if self.rate_limiter_enabled {
return Err(auth::AuthError::too_many_connections());
}
}
Ok(secret)
}
}
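To make the weighting concrete: a cleartext flow against a 4096-iteration SCRAM secret claims 4097 tokens, while a normal SCRAM exchange claims 1. A standalone restatement of the cost model (illustrative names; the proxy computes this inline above):

```rust
// Cleartext flows pay one token per pbkdf2 iteration plus one, since the
// proxy must run the full hash itself; SCRAM validation costs a single
// hmac_sha_256 and so pays one token.
fn password_weight(is_cleartext: bool, scram_iterations: u32) -> u32 {
    if is_cleartext {
        scram_iterations + 1
    } else {
        1
    }
}

fn main() {
    assert_eq!(password_weight(true, 4096), 4097);
    assert_eq!(password_weight(false, 4096), 1);
}
```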
/// True to its name, this function encapsulates our current auth trade-offs.
/// Here, we choose the appropriate auth flow based on circumstances.
///
@@ -214,14 +262,24 @@ async fn auth_quirks(
Some(secret) => secret,
None => api.get_role_secret(ctx, &info).await?,
};
let (cached_entry, secret) = cached_secret.take_value();
let secret = match secret {
Some(secret) => config.check_rate_limit(
ctx,
secret,
&info.endpoint,
unauthenticated_password.is_some() || allow_cleartext,
)?,
None => {
// If we don't have an authentication secret, we mock one to
// prevent malicious probing (possible due to missing protocol steps).
// This mocked secret will never lead to successful authentication.
info!("authentication info not found, mocking it");
AuthSecret::Scram(scram::ServerSecret::mock(rand::random()))
}
};
let secret = cached_secret.value.clone().unwrap_or_else(|| {
// If we don't have an authentication secret, we mock one to
// prevent malicious probing (possible due to missing protocol steps).
// This mocked secret will never lead to successful authentication.
info!("authentication info not found, mocking it");
AuthSecret::Scram(scram::ServerSecret::mock(&info.user, rand::random()))
});
match authenticate_with_secret(
ctx,
secret,
@@ -237,7 +295,7 @@ async fn auth_quirks(
Err(e) => {
if e.is_auth_failed() {
// The password could have been changed, so we invalidate the cache.
cached_secret.invalidate();
cached_entry.invalidate();
}
Err(e)
}
@@ -415,6 +473,7 @@ mod tests {
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
use once_cell::sync::Lazy;
use postgres_protocol::{
authentication::sasl::{ChannelBinding, ScramSha256},
message::{backend::Message as PgMessage, frontend},
@@ -432,6 +491,7 @@ mod tests {
},
context::RequestMonitoring,
proxy::NeonOptions,
rate_limiter::{AuthRateLimiter, RateBucketInfo},
scram::ServerSecret,
stream::{PqStream, Stream},
};
@@ -473,9 +533,11 @@ mod tests {
}
}
static CONFIG: &AuthenticationConfig = &AuthenticationConfig {
static CONFIG: Lazy<AuthenticationConfig> = Lazy::new(|| AuthenticationConfig {
scram_protocol_timeout: std::time::Duration::from_secs(5),
};
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {
loop {
@@ -544,7 +606,7 @@ mod tests {
}
});
let _creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, false, CONFIG)
let _creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, false, &CONFIG)
.await
.unwrap();
@@ -584,7 +646,7 @@ mod tests {
client.write_all(&write).await.unwrap();
});
let _creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, true, CONFIG)
let _creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, true, &CONFIG)
.await
.unwrap();
@@ -624,7 +686,7 @@ mod tests {
client.write_all(&write).await.unwrap();
});
let creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, true, CONFIG)
let creds = auth_quirks(&mut ctx, &api, user_info, &mut stream, true, &CONFIG)
.await
.unwrap();

View File

@@ -18,6 +18,7 @@ use proxy::console;
use proxy::context::parquet::ParquetUploadArgs;
use proxy::http;
use proxy::metrics::NUM_CANCELLATION_REQUESTS_SOURCE_FROM_CLIENT;
use proxy::rate_limiter::AuthRateLimiter;
use proxy::rate_limiter::EndpointRateLimiter;
use proxy::rate_limiter::RateBucketInfo;
use proxy::rate_limiter::RateLimiterConfig;
@@ -141,10 +142,16 @@ struct ProxyCliArgs {
///
/// Provided in the form '<Requests Per Second>@<Bucket Duration Size>'.
/// Can be given multiple times for different bucket sizes.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)]
endpoint_rps_limit: Vec<RateBucketInfo>,
/// Whether the auth rate limiter actually takes effect (for testing)
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
auth_rate_limit_enabled: bool,
/// Authentication rate limiter max number of hashes per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_AUTH_SET)]
auth_rate_limit: Vec<RateBucketInfo>,
/// Redis rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// Initial limit for dynamic rate limiter. Makes sense only if `rate_limit_algorithm` is *not* `None`.
#[clap(long, default_value_t = 100)]
@@ -510,6 +517,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
};
let authentication_config = AuthenticationConfig {
scram_protocol_timeout: args.scram_protocol_timeout,
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
};
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();

View File

@@ -43,6 +43,16 @@ impl<C: Cache, V> Cached<C, V> {
Self { token: None, value }
}
pub fn take_value(self) -> (Cached<C, ()>, V) {
(
Cached {
token: self.token,
value: (),
},
self.value,
)
}
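A minimal sketch of what this split buys the caller, using a simplified stand-in for `Cached` (the real type carries a cache token, not a key string): the value can be consumed while a zero-sized handle keeps the power to invalidate the entry, e.g. after a failed authentication.

```rust
// Simplified stand-in for the `take_value` idea above.
struct CachedSketch<V> {
    key: Option<String>,
    value: V,
}

impl<V> CachedSketch<V> {
    fn take_value(self) -> (CachedSketch<()>, V) {
        (CachedSketch { key: self.key, value: () }, self.value)
    }

    fn invalidate(self) {
        if let Some(key) = self.key {
            println!("dropping cache entry {key}");
        }
    }
}
```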
/// Drop this entry from a cache if it's still there.
pub fn invalidate(self) -> V {
if let Some((cache, info)) = &self.token {

View File

@@ -373,10 +373,7 @@ mod tests {
let endpoint_id = "endpoint".into();
let user1: RoleName = "user1".into();
let user2: RoleName = "user2".into();
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock(
user1.as_str(),
[1; 32],
)));
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
let secret2 = None;
let allowed_ips = Arc::new(vec![
"127.0.0.1".parse().unwrap(),
@@ -395,10 +392,7 @@ mod tests {
// Shouldn't add more than 2 roles.
let user3: RoleName = "user3".into();
let secret3 = Some(AuthSecret::Scram(ServerSecret::mock(
user3.as_str(),
[3; 32],
)));
let secret3 = Some(AuthSecret::Scram(ServerSecret::mock([3; 32])));
cache.insert_role_secret(&project_id, &endpoint_id, &user3, secret3.clone());
assert!(cache.get_role_secret(&endpoint_id, &user3).is_none());
@@ -431,14 +425,8 @@ mod tests {
let endpoint_id = "endpoint".into();
let user1: RoleName = "user1".into();
let user2: RoleName = "user2".into();
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock(
user1.as_str(),
[1; 32],
)));
let secret2 = Some(AuthSecret::Scram(ServerSecret::mock(
user2.as_str(),
[2; 32],
)));
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
let allowed_ips = Arc::new(vec![
"127.0.0.1".parse().unwrap(),
"127.0.0.2".parse().unwrap(),
@@ -486,14 +474,8 @@ mod tests {
let endpoint_id = "endpoint".into();
let user1: RoleName = "user1".into();
let user2: RoleName = "user2".into();
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock(
user1.as_str(),
[1; 32],
)));
let secret2 = Some(AuthSecret::Scram(ServerSecret::mock(
user2.as_str(),
[2; 32],
)));
let secret1 = Some(AuthSecret::Scram(ServerSecret::mock([1; 32])));
let secret2 = Some(AuthSecret::Scram(ServerSecret::mock([2; 32])));
let allowed_ips = Arc::new(vec![
"127.0.0.1".parse().unwrap(),
"127.0.0.2".parse().unwrap(),

View File

@@ -1,4 +1,8 @@
use crate::{auth, rate_limiter::RateBucketInfo, serverless::GlobalConnPoolOptions};
use crate::{
auth,
rate_limiter::{AuthRateLimiter, RateBucketInfo},
serverless::GlobalConnPoolOptions,
};
use anyhow::{bail, ensure, Context, Ok};
use itertools::Itertools;
use rustls::{
@@ -50,6 +54,8 @@ pub struct HttpConfig {
pub struct AuthenticationConfig {
pub scram_protocol_timeout: tokio::time::Duration,
pub rate_limiter_enabled: bool,
pub rate_limiter: AuthRateLimiter,
}
impl TlsConfig {

View File

@@ -75,6 +75,7 @@ impl Api {
let start = Instant::now();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
info!("received http response {response:?}");
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = match parse_body::<GetRoleSecret>(response).await {
@@ -137,6 +138,7 @@ impl Api {
let start = Instant::now();
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
info!("received http response {response:?}");
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<WakeCompute>(response).await?;

View File

@@ -4,7 +4,10 @@ use ::metrics::{
register_int_gauge_vec, Histogram, HistogramVec, HyperLogLogVec, IntCounterPairVec,
IntCounterVec, IntGauge, IntGaugeVec,
};
use metrics::{register_int_counter, register_int_counter_pair, IntCounter, IntCounterPair};
use metrics::{
register_hll, register_int_counter, register_int_counter_pair, HyperLogLog, IntCounter,
IntCounterPair,
};
use once_cell::sync::Lazy;
use tokio::time::{self, Instant};
@@ -358,3 +361,20 @@ pub static TLS_HANDSHAKE_FAILURES: Lazy<IntCounter> = Lazy::new(|| {
)
.unwrap()
});
pub static ENDPOINTS_AUTH_RATE_LIMITED: Lazy<HyperLogLog<32>> = Lazy::new(|| {
register_hll!(
32,
"proxy_endpoints_auth_rate_limits",
"Number of endpoints affected by authentication rate limits",
)
.unwrap()
});
pub static AUTH_RATE_LIMIT_HITS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"proxy_requests_auth_rate_limits_total",
"Number of connection requests affected by authentication rate limits",
)
.unwrap()
});

View File

@@ -280,7 +280,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
// check rate limit
if let Some(ep) = user_info.get_endpoint() {
if !endpoint_rate_limiter.check(ep) {
if !endpoint_rate_limiter.check(ep, 1) {
return stream
.throw_error(auth::AuthError::too_many_connections())
.await?;

View File

@@ -142,8 +142,8 @@ impl Scram {
Ok(Scram(secret))
}
fn mock(user: &str) -> Self {
Scram(scram::ServerSecret::mock(user, rand::random()))
fn mock() -> Self {
Scram(scram::ServerSecret::mock(rand::random()))
}
}
@@ -330,11 +330,7 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
let (client_config, server_config) =
generate_tls_config("generic-project-name.localhost", "localhost")?;
let proxy = tokio::spawn(dummy_proxy(
client,
Some(server_config),
Scram::mock("user"),
));
let proxy = tokio::spawn(dummy_proxy(client, Some(server_config), Scram::mock()));
use rand::{distributions::Alphanumeric, Rng};
let password: String = rand::thread_rng()

View File

@@ -4,4 +4,4 @@ mod limiter;
pub use aimd::Aimd;
pub use limit_algorithm::{AimdConfig, Fixed, RateLimitAlgorithm, RateLimiterConfig};
pub use limiter::Limiter;
pub use limiter::{EndpointRateLimiter, RateBucketInfo, RedisRateLimiter};
pub use limiter::{AuthRateLimiter, EndpointRateLimiter, RateBucketInfo, RedisRateLimiter};

View File

@@ -1,6 +1,8 @@
use std::{
borrow::Cow,
collections::hash_map::RandomState,
hash::BuildHasher,
hash::{BuildHasher, Hash},
net::IpAddr,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
@@ -15,7 +17,7 @@ use tokio::sync::{Mutex as AsyncMutex, Semaphore, SemaphorePermit};
use tokio::time::{timeout, Duration, Instant};
use tracing::info;
use crate::EndpointId;
use crate::{intern::EndpointIdInt, EndpointId};
use super::{
limit_algorithm::{LimitAlgorithm, Sample},
@@ -49,11 +51,11 @@ impl RedisRateLimiter {
.data
.iter_mut()
.zip(self.info)
.all(|(bucket, info)| bucket.should_allow_request(info, now));
.all(|(bucket, info)| bucket.should_allow_request(info, now, 1));
if should_allow_request {
// only increment the bucket counts if the request will actually be accepted
self.data.iter_mut().for_each(RateBucket::inc);
self.data.iter_mut().for_each(|b| b.inc(1));
}
should_allow_request
@@ -71,9 +73,14 @@ impl RedisRateLimiter {
// saw SNI, before doing TLS handshake. User-side error messages in that case
// do not look very nice (`SSL SYSCALL error: Undefined error: 0`), so for now
// I went with a more expensive way that yields user-friendlier error messages.
pub struct EndpointRateLimiter<Rand = StdRng, Hasher = RandomState> {
map: DashMap<EndpointId, Vec<RateBucket>, Hasher>,
info: &'static [RateBucketInfo],
pub type EndpointRateLimiter = BucketRateLimiter<EndpointId, StdRng, RandomState>;
// This can't be just per IP because that would limit some PaaS that share IP addresses
pub type AuthRateLimiter = BucketRateLimiter<(EndpointIdInt, IpAddr), StdRng, RandomState>;
pub struct BucketRateLimiter<Key, Rand = StdRng, Hasher = RandomState> {
map: DashMap<Key, Vec<RateBucket>, Hasher>,
info: Cow<'static, [RateBucketInfo]>,
access_count: AtomicUsize,
rand: Mutex<Rand>,
}
@@ -85,9 +92,9 @@ struct RateBucket {
}
impl RateBucket {
fn should_allow_request(&mut self, info: &RateBucketInfo, now: Instant) -> bool {
fn should_allow_request(&mut self, info: &RateBucketInfo, now: Instant, n: u32) -> bool {
if now - self.start < info.interval {
self.count < info.max_rpi
self.count + n <= info.max_rpi
} else {
// bucket expired, reset
self.count = 0;
@@ -97,8 +104,8 @@ impl RateBucket {
}
}
fn inc(&mut self) {
self.count += 1;
fn inc(&mut self, n: u32) {
self.count += n;
}
}
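A worked check of the new n-token semantics: with 9 of 10 tokens used in the current interval, a 2-token claim is rejected while a 1-token claim still fits.

```rust
// The check is `count + n <= max_rpi`, so a claim either fits entirely
// or is rejected without partially consuming the bucket.
fn fits(count: u32, n: u32, max_rpi: u32) -> bool {
    count + n <= max_rpi
}

fn main() {
    assert!(fits(9, 1, 10));
    assert!(!fits(9, 2, 10));
}
```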
@@ -111,7 +118,7 @@ pub struct RateBucketInfo {
impl std::fmt::Display for RateBucketInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let rps = self.max_rpi * 1000 / self.interval.as_millis() as u32;
let rps = (self.max_rpi as u64) * 1000 / self.interval.as_millis() as u64;
write!(f, "{rps}@{}", humantime::format_duration(self.interval))
}
}
@@ -136,12 +143,25 @@ impl std::str::FromStr for RateBucketInfo {
}
impl RateBucketInfo {
pub const DEFAULT_SET: [Self; 3] = [
pub const DEFAULT_ENDPOINT_SET: [Self; 3] = [
Self::new(300, Duration::from_secs(1)),
Self::new(200, Duration::from_secs(60)),
Self::new(100, Duration::from_secs(600)),
];
/// All of these are per endpoint-ip pair.
/// Context: 4096 rounds of pbkdf2 take about 1ms of cpu time to execute (1 milli-cpu-second or 1mcpus).
///
/// First bucket: 300mcpus total per endpoint-ip pair
/// * 1228800 requests per second with 1 hash round. (endpoint rate limiter will catch this first)
/// * 300 requests per second with 4096 hash rounds.
/// * 2 requests per second with 600000 hash rounds.
pub const DEFAULT_AUTH_SET: [Self; 3] = [
Self::new(300 * 4096, Duration::from_secs(1)),
Self::new(200 * 4096, Duration::from_secs(60)),
Self::new(100 * 4096, Duration::from_secs(600)),
];
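A quick arithmetic check of the first bucket's comment, using the claim weighting from `check_rate_limit` (iterations + 1 tokens per cleartext attempt):

```rust
fn main() {
    let tokens_per_sec: u64 = 300 * 4096; // 1_228_800
    // ~300 attempts/s at the common 4096-round SCRAM cost:
    assert_eq!(tokens_per_sec / (4096 + 1), 299);
    // ~2 attempts/s at a pathological 600_000-round cost:
    assert_eq!(tokens_per_sec / (600_000 + 1), 2);
}
```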
pub fn validate(info: &mut [Self]) -> anyhow::Result<()> {
info.sort_unstable_by_key(|info| info.interval);
let invalid = info
@@ -150,7 +170,7 @@ impl RateBucketInfo {
.find(|(a, b)| a.max_rpi > b.max_rpi);
if let Some((a, b)) = invalid {
bail!(
"invalid endpoint RPS limits. {b} allows fewer requests per bucket than {a} ({} vs {})",
"invalid bucket RPS limits. {b} allows fewer requests per bucket than {a} ({} vs {})",
b.max_rpi,
a.max_rpi,
);
@@ -162,19 +182,24 @@ impl RateBucketInfo {
pub const fn new(max_rps: u32, interval: Duration) -> Self {
Self {
interval,
max_rpi: max_rps * interval.as_millis() as u32 / 1000,
max_rpi: ((max_rps as u64) * (interval.as_millis() as u64) / 1000) as u32,
}
}
}
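The widening matters because the intermediate product overflows `u32` for the new auth buckets even though the final requests-per-interval value fits (compare the `test_default_auth_set` test below):

```rust
fn main() {
    let max_rps: u64 = 200 * 4096; // 819_200 rps in the 60s bucket
    let interval_ms: u64 = 60_000;
    let max_rpi = max_rps * interval_ms / 1000; // 49_152_000: fits in u32
    assert!(max_rpi <= u32::MAX as u64);
    // ...but the intermediate product does not, so u32 math would overflow:
    assert!(max_rps * interval_ms > u32::MAX as u64);
}
```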
impl EndpointRateLimiter {
pub fn new(info: &'static [RateBucketInfo]) -> Self {
impl<K: Hash + Eq> BucketRateLimiter<K> {
pub fn new(info: impl Into<Cow<'static, [RateBucketInfo]>>) -> Self {
Self::new_with_rand_and_hasher(info, StdRng::from_entropy(), RandomState::new())
}
}
impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
fn new_with_rand_and_hasher(info: &'static [RateBucketInfo], rand: R, hasher: S) -> Self {
impl<K: Hash + Eq, R: Rng, S: BuildHasher + Clone> BucketRateLimiter<K, R, S> {
fn new_with_rand_and_hasher(
info: impl Into<Cow<'static, [RateBucketInfo]>>,
rand: R,
hasher: S,
) -> Self {
let info = info.into();
info!(buckets = ?info, "endpoint rate limiter");
Self {
info,
@@ -185,7 +210,7 @@ impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
}
/// Check that number of connections to the endpoint is below `max_rps` rps.
pub fn check(&self, endpoint: EndpointId) -> bool {
pub fn check(&self, key: K, n: u32) -> bool {
// do a partial GC every 2k requests. This cleans up ~ 1/64th of the map.
// worst case memory usage is about:
// = 2 * 2048 * 64 * (48B + 72B)
@@ -195,7 +220,7 @@ impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
}
let now = Instant::now();
let mut entry = self.map.entry(endpoint).or_insert_with(|| {
let mut entry = self.map.entry(key).or_insert_with(|| {
vec![
RateBucket {
start: now,
@@ -207,12 +232,12 @@ impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
let should_allow_request = entry
.iter_mut()
.zip(self.info)
.all(|(bucket, info)| bucket.should_allow_request(info, now));
.zip(&*self.info)
.all(|(bucket, info)| bucket.should_allow_request(info, now, n));
if should_allow_request {
// only increment the bucket counts if the request will actually be accepted
entry.iter_mut().for_each(RateBucket::inc);
entry.iter_mut().for_each(|b| b.inc(n));
}
should_allow_request
@@ -223,7 +248,7 @@ impl<R: Rng, S: BuildHasher + Clone> EndpointRateLimiter<R, S> {
/// But that way deletion does not acquire a mutex on each entry access.
pub fn do_gc(&self) {
info!(
"cleaning up endpoint rate limiter, current size = {}",
"cleaning up bucket rate limiter, current size = {}",
self.map.len()
);
let n = self.map.shards().len();
@@ -534,7 +559,7 @@ mod tests {
use rustc_hash::FxHasher;
use tokio::time;
use super::{EndpointRateLimiter, Limiter, Outcome};
use super::{BucketRateLimiter, EndpointRateLimiter, Limiter, Outcome};
use crate::{
rate_limiter::{RateBucketInfo, RateLimitAlgorithm},
EndpointId,
@@ -672,12 +697,12 @@ mod tests {
#[test]
fn default_rate_buckets() {
let mut defaults = RateBucketInfo::DEFAULT_SET;
let mut defaults = RateBucketInfo::DEFAULT_ENDPOINT_SET;
RateBucketInfo::validate(&mut defaults[..]).unwrap();
}
#[test]
#[should_panic = "invalid endpoint RPS limits. 10@10s allows fewer requests per bucket than 300@1s (100 vs 300)"]
#[should_panic = "invalid bucket RPS limits. 10@10s allows fewer requests per bucket than 300@1s (100 vs 300)"]
fn rate_buckets_validate() {
let mut rates: Vec<RateBucketInfo> = ["300@1s", "10@10s"]
.into_iter()
@@ -693,42 +718,42 @@ mod tests {
.map(|s| s.parse().unwrap())
.collect();
RateBucketInfo::validate(&mut rates).unwrap();
let limiter = EndpointRateLimiter::new(Vec::leak(rates));
let limiter = EndpointRateLimiter::new(rates);
let endpoint = EndpointId::from("ep-my-endpoint-1234");
time::pause();
for _ in 0..100 {
assert!(limiter.check(endpoint.clone()));
assert!(limiter.check(endpoint.clone(), 1));
}
// more connections fail
assert!(!limiter.check(endpoint.clone()));
assert!(!limiter.check(endpoint.clone(), 1));
// fail even after 500ms as it's in the same bucket
time::advance(time::Duration::from_millis(500)).await;
assert!(!limiter.check(endpoint.clone()));
assert!(!limiter.check(endpoint.clone(), 1));
// after a full 1s, 100 requests are allowed again
time::advance(time::Duration::from_millis(500)).await;
for _ in 1..6 {
for _ in 0..100 {
assert!(limiter.check(endpoint.clone()));
for _ in 0..50 {
assert!(limiter.check(endpoint.clone(), 2));
}
time::advance(time::Duration::from_millis(1000)).await;
}
// more connections after 600 will exceed the 20rps@30s limit
assert!(!limiter.check(endpoint.clone()));
assert!(!limiter.check(endpoint.clone(), 1));
// will still fail before the 30 second limit
time::advance(time::Duration::from_millis(30_000 - 6_000 - 1)).await;
assert!(!limiter.check(endpoint.clone()));
assert!(!limiter.check(endpoint.clone(), 1));
// after the full 30 seconds, 100 requests are allowed again
time::advance(time::Duration::from_millis(1)).await;
for _ in 0..100 {
assert!(limiter.check(endpoint.clone()));
assert!(limiter.check(endpoint.clone(), 1));
}
}
@@ -738,14 +763,41 @@ mod tests {
let rand = rand::rngs::StdRng::from_seed([1; 32]);
let hasher = BuildHasherDefault::<FxHasher>::default();
let limiter = EndpointRateLimiter::new_with_rand_and_hasher(
&RateBucketInfo::DEFAULT_SET,
let limiter = BucketRateLimiter::new_with_rand_and_hasher(
&RateBucketInfo::DEFAULT_ENDPOINT_SET,
rand,
hasher,
);
for i in 0..1_000_000 {
limiter.check(format!("{i}").into());
limiter.check(i, 1);
}
assert!(limiter.map.len() < 150_000);
}
#[test]
fn test_default_auth_set() {
// these values used to exceed u32::MAX
assert_eq!(
RateBucketInfo::DEFAULT_AUTH_SET,
[
RateBucketInfo {
interval: Duration::from_secs(1),
max_rpi: 300 * 4096,
},
RateBucketInfo {
interval: Duration::from_secs(60),
max_rpi: 200 * 4096 * 60,
},
RateBucketInfo {
interval: Duration::from_secs(600),
max_rpi: 100 * 4096 * 600,
}
]
);
for x in RateBucketInfo::DEFAULT_AUTH_SET {
let y = x.to_string().parse().unwrap();
assert_eq!(x, y);
}
}
}

View File

@@ -50,13 +50,13 @@ impl ServerSecret {
/// To avoid revealing information to an attacker, we use a
/// mocked server secret even if the user doesn't exist.
/// See `auth-scram.c : mock_scram_secret` for details.
pub fn mock(user: &str, nonce: [u8; 32]) -> Self {
// Refer to `auth-scram.c : scram_mock_salt`.
let mocked_salt = super::sha256([user.as_bytes(), &nonce]);
pub fn mock(nonce: [u8; 32]) -> Self {
Self {
iterations: 4096,
salt_base64: base64::encode(mocked_salt),
// this doesn't reveal much information as we're going to use
// iteration count 1 for our generated passwords going forward.
// PG16 users can set iteration count=1 already today.
iterations: 1,
salt_base64: base64::encode(nonce),
stored_key: ScramKey::default(),
server_key: ScramKey::default(),
doomed: true,

View File

@@ -42,7 +42,12 @@ impl PoolingBackend {
};
let secret = match cached_secret.value.clone() {
Some(secret) => secret,
Some(secret) => self.config.authentication_config.check_rate_limit(
ctx,
secret,
&user_info.endpoint,
true,
)?,
None => {
// If we don't have an authentication secret, for the http flow we can just return an error.
info!("authentication info not found");

View File

@@ -2126,6 +2126,8 @@ class NeonStorageController(MetricsGetter):
shard_params = {"count": shard_count}
if shard_stripe_size is not None:
shard_params["stripe_size"] = shard_stripe_size
else:
shard_params["stripe_size"] = 32768
body["shard_parameters"] = shard_params
@@ -2139,6 +2141,7 @@ class NeonStorageController(MetricsGetter):
json=body,
headers=self.headers(TokenScope.PAGE_SERVER_API),
)
response.raise_for_status()
log.info(f"tenant_create success: {response.json()}")
def locate(self, tenant_id: TenantId) -> list[dict[str, Any]]:

View File

@@ -86,6 +86,9 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
# This is especially pronounced in tests that set small checkpoint
# distances.
".*Flushed oversized open layer with size.*",
# During teardown, we stop the storage controller before the pageservers, so pageservers
# can experience connection errors doing background deletion queue work.
".*WARN deletion backend: calling control plane generation validation API failed.*Connection refused.*",
)

View File

@@ -1,5 +1,6 @@
from contextlib import closing
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare, PgCompare
from fixtures.pageserver.utils import wait_tenant_status_404
@@ -17,6 +18,7 @@ from fixtures.types import Lsn
# 3. Disk space used
# 4. Peak memory usage
#
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/7124")
def test_bulk_insert(neon_with_baseline: PgCompare):
env = neon_with_baseline

View File

@@ -0,0 +1,275 @@
import asyncio
import os
from typing import Tuple
import psutil
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
tenant_get_shards,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
TIMELINE_COUNT = 10
ENTRIES_PER_TIMELINE = 10_000
CHECKPOINT_TIMEOUT_SECONDS = 60
async def run_worker(env: NeonEnv, tenant_conf, entries: int) -> Tuple[TenantId, TimelineId, Lsn]:
tenant, timeline = env.neon_cli.create_tenant(conf=tenant_conf)
with env.endpoints.create_start("main", tenant_id=tenant) as ep:
conn = await ep.connect_async()
try:
await conn.execute("CREATE TABLE IF NOT EXISTS t(key serial primary key, value text)")
await conn.execute(
f"INSERT INTO t SELECT i, CONCAT('payload_', i) FROM generate_series(0,{entries}) as i"
)
finally:
await conn.close(timeout=10)
last_flush_lsn = Lsn(ep.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
return tenant, timeline, last_flush_lsn
async def workload(
env: NeonEnv, tenant_conf, timelines: int, entries: int
) -> list[Tuple[TenantId, TimelineId, Lsn]]:
workers = [asyncio.create_task(run_worker(env, tenant_conf, entries)) for _ in range(timelines)]
return await asyncio.gather(*workers)
def wait_until_pageserver_is_caught_up(
env: NeonEnv, last_flush_lsns: list[Tuple[TenantId, TimelineId, Lsn]]
):
for tenant, timeline, last_flush_lsn in last_flush_lsns:
shards = tenant_get_shards(env, tenant)
for tenant_shard_id, pageserver in shards:
waited = wait_for_last_record_lsn(
pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
)
assert waited >= last_flush_lsn
def wait_until_pageserver_has_uploaded(
env: NeonEnv, last_flush_lsns: list[Tuple[TenantId, TimelineId, Lsn]]
):
for tenant, timeline, last_flush_lsn in last_flush_lsns:
shards = tenant_get_shards(env, tenant)
for tenant_shard_id, pageserver in shards:
wait_for_upload(pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn)
def wait_for_wal_ingest_metric(pageserver_http: PageserverHttpClient) -> float:
def query():
value = pageserver_http.get_metric_value("pageserver_wal_ingest_records_received_total")
assert value is not None
return value
# The metric gets initialised on the first update.
# Retry a few times, but fall back to 0 if it never appears.
try:
return float(wait_until(3, 0.5, query))
except Exception:
return 0
def get_dirty_bytes(env):
v = env.pageserver.http_client().get_metric_value("pageserver_timeline_ephemeral_bytes") or 0
log.info(f"dirty_bytes: {v}")
return v
def assert_dirty_bytes(env, v):
assert get_dirty_bytes(env) == v
def assert_dirty_bytes_nonzero(env):
assert get_dirty_bytes(env) > 0
@pytest.mark.parametrize("immediate_shutdown", [True, False])
def test_pageserver_small_inmemory_layers(
neon_env_builder: NeonEnvBuilder, immediate_shutdown: bool
):
"""
Test that open layers get flushed after the `checkpoint_timeout` config
and do not require WAL reingest upon restart.
The workload creates a number of timelines and writes some data to each,
but not enough to trigger flushes via the `checkpoint_distance` config.
"""
tenant_conf = {
# Large `checkpoint_distance` effectively disables size
# based checkpointing.
"checkpoint_distance": f"{2 * 1024 ** 3}",
"checkpoint_timeout": f"{CHECKPOINT_TIMEOUT_SECONDS}s",
"compaction_period": "1s",
}
env = neon_env_builder.init_configs()
env.start()
last_flush_lsns = asyncio.run(workload(env, tenant_conf, TIMELINE_COUNT, ENTRIES_PER_TIMELINE))
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# We didn't write enough data to trigger a size-based checkpoint: we should see dirty data.
wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore
ps_http_client = env.pageserver.http_client()
total_wal_ingested_before_restart = wait_for_wal_ingest_metric(ps_http_client)
# Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed,
# such that there are zero bytes of ephemeral layer left on the pageserver
log.info("Waiting for background checkpoints...")
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # type: ignore
# Zero ephemeral layer bytes does not imply that all the frozen layers were uploaded: they
# must be uploaded to remain visible to the pageserver after restart.
wait_until_pageserver_has_uploaded(env, last_flush_lsns)
env.pageserver.restart(immediate=immediate_shutdown)
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# Catching up with WAL ingest should have resulted in zero bytes of ephemeral layers, since
# we froze, flushed and uploaded everything before restarting. There can be no more WAL writes
# because we shut down compute endpoints before flushing.
assert get_dirty_bytes(env) == 0
total_wal_ingested_after_restart = wait_for_wal_ingest_metric(ps_http_client)
log.info(f"WAL ingested before restart: {total_wal_ingested_before_restart}")
log.info(f"WAL ingested after restart: {total_wal_ingested_after_restart}")
assert total_wal_ingested_after_restart == 0
def test_idle_checkpoints(neon_env_builder: NeonEnvBuilder):
"""
Test that `checkpoint_timeout` is enforced even if there is no safekeeper input.
"""
tenant_conf = {
# Large `checkpoint_distance` effectively disables size
# based checkpointing.
"checkpoint_distance": f"{2 * 1024 ** 3}",
"checkpoint_timeout": f"{CHECKPOINT_TIMEOUT_SECONDS}s",
"compaction_period": "1s",
}
env = neon_env_builder.init_configs()
env.start()
last_flush_lsns = asyncio.run(workload(env, tenant_conf, TIMELINE_COUNT, ENTRIES_PER_TIMELINE))
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# We didn't write enough data to trigger a size-based checkpoint: we should see dirty data.
wait_until(10, 1, lambda: assert_dirty_bytes_nonzero(env)) # type: ignore
# Stop the safekeepers, so that we cannot have any more WAL receiver connections
for sk in env.safekeepers:
sk.stop()
# We should have got here fast enough that we didn't hit the background interval yet,
# and the teardown of SK connections shouldn't prompt any layer freezing.
assert get_dirty_bytes(env) > 0
# Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed,
# such that there are zero bytes of ephemeral layer left on the pageserver
log.info("Waiting for background checkpoints...")
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(env, 0)) # type: ignore
@pytest.mark.skipif(
# We have to use at least ~100MB of data to hit the lowest limit we can configure, which is
# prohibitively slow in debug mode
os.getenv("BUILD_TYPE") == "debug",
reason="Avoid running bulkier ingest tests in debug mode",
)
def test_total_size_limit(neon_env_builder: NeonEnvBuilder):
"""
Test that checkpoints are done based on total ephemeral layer size, even if no one timeline is
individually exceeding checkpoint thresholds.
"""
system_memory = psutil.virtual_memory().total
# The smallest total size limit we can configure is 1/1024th of the system memory (e.g. 128MB on
# a system with 128GB of RAM). We will then write enough data to violate this limit.
max_dirty_data = 128 * 1024 * 1024
ephemeral_bytes_per_memory_kb = (max_dirty_data * 1024) // system_memory
assert ephemeral_bytes_per_memory_kb > 0
neon_env_builder.pageserver_config_override = f"""
ephemeral_bytes_per_memory_kb={ephemeral_bytes_per_memory_kb}
"""
compaction_period_s = 10
tenant_conf = {
# Large space + time thresholds: effectively disable these limits
"checkpoint_distance": f"{1024 ** 4}",
"checkpoint_timeout": "3600s",
"compaction_period": f"{compaction_period_s}s",
}
env = neon_env_builder.init_configs()
env.start()
timeline_count = 10
# This is about 2MiB of data per timeline
entries_per_timeline = 100_000
last_flush_lsns = asyncio.run(workload(env, tenant_conf, timeline_count, entries_per_timeline))
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
total_bytes_ingested = 0
for tenant, timeline, last_flush_lsn in last_flush_lsns:
http_client = env.pageserver.http_client()
initdb_lsn = Lsn(http_client.timeline_detail(tenant, timeline)["initdb_lsn"])
total_bytes_ingested += last_flush_lsn - initdb_lsn
log.info(f"Ingested {total_bytes_ingested} bytes since initdb (vs max dirty {max_dirty_data})")
assert total_bytes_ingested > max_dirty_data
# Expected end state: the total physical size of all the tenants is in excess of the max dirty
# data, but the total amount of dirty data is less than the limit: this demonstrates that we
# have exceeded the threshold but then rolled layers in response
def get_total_historic_layers():
total_ephemeral_layers = 0
total_historic_bytes = 0
for tenant, timeline, _last_flush_lsn in last_flush_lsns:
http_client = env.pageserver.http_client()
initdb_lsn = Lsn(http_client.timeline_detail(tenant, timeline)["initdb_lsn"])
layer_map = http_client.layer_map_info(tenant, timeline)
total_historic_bytes += sum(
layer.layer_file_size
for layer in layer_map.historic_layers
if layer.layer_file_size is not None and Lsn(layer.lsn_start) > initdb_lsn
)
total_ephemeral_layers += len(layer_map.in_memory_layers)
log.info(
f"Total historic layer bytes: {total_historic_bytes} ({total_ephemeral_layers} ephemeral layers)"
)
return total_historic_bytes
def assert_bytes_rolled():
assert total_bytes_ingested - get_total_historic_layers() <= max_dirty_data
# Wait until enough layers have rolled that the amount of dirty data is under the threshold.
# We do this indirectly via layer maps, rather than the dirty bytes metric, to avoid false-passing
# if that metric isn't updated quickly enough to reflect the dirty bytes exceeding the limit.
wait_until(compaction_period_s * 2, 1, assert_bytes_rolled)
# The end state should also have the reported metric under the limit
def assert_dirty_data_limited():
dirty_bytes = get_dirty_bytes(env)
assert dirty_bytes < max_dirty_data
wait_until(compaction_period_s * 2, 1, lambda: assert_dirty_data_limited()) # type: ignore

View File

@@ -90,6 +90,8 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, seed: int):
# this shutdown case is logged at WARN severity by the time it bubbles up to logical size calculation code
# WARN ...: initial size calculation failed: downloading failed, possibly for shutdown
".*downloading failed, possibly for shutdown",
# {tenant_id=... timeline_id=...}:handle_pagerequests:handle_get_page_at_lsn_request{rel=1664/0/1260 blkno=0 req_lsn=0/149F0D8}: error reading relation or page version: Not found: will not become active. Current state: Stopping\n'
".*page_service.*will not become active.*",
]
)

View File

@@ -1,143 +0,0 @@
import asyncio
from typing import Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
tenant_get_shards,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
TIMELINE_COUNT = 10
ENTRIES_PER_TIMELINE = 10_000
CHECKPOINT_TIMEOUT_SECONDS = 60
TENANT_CONF = {
# Large `checkpoint_distance` effectively disables size
# based checkpointing.
"checkpoint_distance": f"{2 * 1024 ** 3}",
"checkpoint_timeout": f"{CHECKPOINT_TIMEOUT_SECONDS}s",
}
async def run_worker(env: NeonEnv, entries: int) -> Tuple[TenantId, TimelineId, Lsn]:
tenant, timeline = env.neon_cli.create_tenant(conf=TENANT_CONF)
with env.endpoints.create_start("main", tenant_id=tenant) as ep:
conn = await ep.connect_async()
try:
await conn.execute("CREATE TABLE IF NOT EXISTS t(key serial primary key, value text)")
await conn.execute(
f"INSERT INTO t SELECT i, CONCAT('payload_', i) FROM generate_series(0,{entries}) as i"
)
finally:
await conn.close(timeout=10)
last_flush_lsn = Lsn(ep.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
return tenant, timeline, last_flush_lsn
async def workload(
env: NeonEnv, timelines: int, entries: int
) -> list[Tuple[TenantId, TimelineId, Lsn]]:
workers = [asyncio.create_task(run_worker(env, entries)) for _ in range(timelines)]
return await asyncio.gather(*workers)
def wait_until_pageserver_is_caught_up(
env: NeonEnv, last_flush_lsns: list[Tuple[TenantId, TimelineId, Lsn]]
):
for tenant, timeline, last_flush_lsn in last_flush_lsns:
shards = tenant_get_shards(env, tenant)
for tenant_shard_id, pageserver in shards:
waited = wait_for_last_record_lsn(
pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn
)
assert waited >= last_flush_lsn
def wait_until_pageserver_has_uploaded(
env: NeonEnv, last_flush_lsns: list[Tuple[TenantId, TimelineId, Lsn]]
):
for tenant, timeline, last_flush_lsn in last_flush_lsns:
shards = tenant_get_shards(env, tenant)
for tenant_shard_id, pageserver in shards:
wait_for_upload(pageserver.http_client(), tenant_shard_id, timeline, last_flush_lsn)
def wait_for_wal_ingest_metric(pageserver_http: PageserverHttpClient) -> float:
def query():
value = pageserver_http.get_metric_value("pageserver_wal_ingest_records_received_total")
assert value is not None
return value
# The metric gets initialised on the first update.
# Retry a few times, but fall back to 0 if it never appears.
try:
return float(wait_until(3, 0.5, query))
except Exception:
return 0
@pytest.mark.parametrize("immediate_shutdown", [True, False])
def test_pageserver_small_inmemory_layers(
neon_env_builder: NeonEnvBuilder, immediate_shutdown: bool
):
"""
Test that open layers get flushed after the `checkpoint_timeout` config
and do not require WAL reingest upon restart.
The workload creates a number of timelines and writes some data to each,
but not enough to trigger flushes via the `checkpoint_distance` config.
"""
def get_dirty_bytes():
v = (
env.pageserver.http_client().get_metric_value("pageserver_timeline_ephemeral_bytes")
or 0
)
log.info(f"dirty_bytes: {v}")
return v
def assert_dirty_bytes(v):
assert get_dirty_bytes() == v
env = neon_env_builder.init_configs()
env.start()
last_flush_lsns = asyncio.run(workload(env, TIMELINE_COUNT, ENTRIES_PER_TIMELINE))
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# We didn't write enough data to trigger a size-based checkpoint
assert get_dirty_bytes() > 0
ps_http_client = env.pageserver.http_client()
total_wal_ingested_before_restart = wait_for_wal_ingest_metric(ps_http_client)
# Within ~ the checkpoint interval, all the ephemeral layers should be frozen and flushed,
# such that there are zero bytes of ephemeral layer left on the pageserver
log.info("Waiting for background checkpoints...")
wait_until(CHECKPOINT_TIMEOUT_SECONDS * 2, 1, lambda: assert_dirty_bytes(0)) # type: ignore
# Zero ephemeral layer bytes does not imply that all the frozen layers were uploaded: they
# must be uploaded to remain visible to the pageserver after restart.
wait_until_pageserver_has_uploaded(env, last_flush_lsns)
env.pageserver.restart(immediate=immediate_shutdown)
wait_until_pageserver_is_caught_up(env, last_flush_lsns)
# Catching up with WAL ingest should have resulted in zero bytes of ephemeral layers, since
# we froze, flushed and uploaded everything before restarting. There can be no more WAL writes
# because we shut down compute endpoints before flushing.
assert get_dirty_bytes() == 0
total_wal_ingested_after_restart = wait_for_wal_ingest_metric(ps_http_client)
log.info(f"WAL ingested before restart: {total_wal_ingested_before_restart}")
log.info(f"WAL ingested after restart: {total_wal_ingested_after_restart}")
assert total_wal_ingested_after_restart == 0

View File

@@ -89,6 +89,11 @@ def test_sharding_service_smoke(
for tid in tenant_ids:
env.neon_cli.create_tenant(tid, shard_count=shards_per_tenant)
# Repeating a creation should be idempotent (we are just testing it doesn't return an error)
env.storage_controller.tenant_create(
tenant_id=next(iter(tenant_ids)), shard_count=shards_per_tenant
)
for node_id, count in get_node_shard_counts(env, tenant_ids).items():
# we used a multiple of pageservers for the total shard count,
# so expect an equal number on all pageservers

View File

@@ -389,6 +389,9 @@ def test_create_churn_during_restart(neon_env_builder: NeonEnvBuilder):
if e.status_code == 409:
log.info(f"delay_ms={delay_ms} 409")
pass
elif e.status_code == 429:
log.info(f"delay_ms={delay_ms} 429")
pass
elif e.status_code == 400:
if "is less than existing" in e.message:
# We send creation requests very close together in time: it is expected that these