Merge branch 'main' into yuchen/direct-io-delta-image-layer-write

This commit is contained in:
Yuchen Liang
2024-12-09 11:55:35 -05:00
committed by GitHub
46 changed files with 2205 additions and 390 deletions

View File

@@ -1,16 +1,29 @@
/.github/ @neondatabase/developer-productivity
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
/libs/pageserver_api/ @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
/libs/proxy/ @neondatabase/proxy
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/storage
# Autoscaling
/libs/vm_monitor/ @neondatabase/autoscaling
/pageserver/ @neondatabase/storage
# DevProd
/.github/ @neondatabase/developer-productivity
# Compute
/pgxn/ @neondatabase/compute
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
/vendor/ @neondatabase/compute
/compute/ @neondatabase/compute
/compute_tools/ @neondatabase/compute
# Proxy
/libs/proxy/ @neondatabase/proxy
/proxy/ @neondatabase/proxy
# Storage
/pageserver/ @neondatabase/storage
/safekeeper/ @neondatabase/storage
/storage_controller @neondatabase/storage
/storage_scrubber @neondatabase/storage
/vendor/ @neondatabase/compute
/libs/pageserver_api/ @neondatabase/storage
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/storage
# Shared
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
/libs/compute_api/ @neondatabase/compute @neondatabase/control-plane
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage

528
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -51,10 +51,6 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
flate2 = "1.0.26"
async-stream = "0.3"
async-trait = "0.1"
@@ -216,6 +212,12 @@ postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git",
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch = "neon" }
## Azure SDK crates
azure_core = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { git = "https://github.com/neondatabase/azure-sdk-for-rust.git", branch = "neon", default-features = false, features = ["enable_reqwest_rustls"] }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }

View File

@@ -115,7 +115,7 @@ RUN set -e \
# Keep the version the same as in compute/compute-node.Dockerfile and
# test_runner/regress/test_compute_metrics.py.
ENV SQL_EXPORTER_VERSION=0.13.1
ENV SQL_EXPORTER_VERSION=0.16.0
RUN curl -fsSL \
"https://github.com/burningalchemist/sql_exporter/releases/download/${SQL_EXPORTER_VERSION}/sql_exporter-${SQL_EXPORTER_VERSION}.linux-$(case "$(uname -m)" in x86_64) echo amd64;; aarch64) echo arm64;; esac).tar.gz" \
--output sql_exporter.tar.gz \

View File

@@ -1324,7 +1324,7 @@ FROM quay.io/prometheuscommunity/postgres-exporter:v0.12.1 AS postgres-exporter
# Keep the version the same as in build-tools.Dockerfile and
# test_runner/regress/test_compute_metrics.py.
FROM burningalchemist/sql_exporter:0.13.1 AS sql-exporter
FROM burningalchemist/sql_exporter:0.16.0 AS sql-exporter
#########################################################################################
#

View File

@@ -6,6 +6,7 @@
import 'sql_exporter/compute_backpressure_throttling_seconds.libsonnet',
import 'sql_exporter/compute_current_lsn.libsonnet',
import 'sql_exporter/compute_logical_snapshot_files.libsonnet',
import 'sql_exporter/compute_logical_snapshots_bytes.libsonnet',
import 'sql_exporter/compute_max_connections.libsonnet',
import 'sql_exporter/compute_receive_lsn.libsonnet',
import 'sql_exporter/compute_subscriptions_count.libsonnet',

View File

@@ -0,0 +1,7 @@
SELECT
(SELECT current_setting('neon.timeline_id')) AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(sum(size), 0) FROM pg_ls_logicalsnapdir() WHERE name LIKE '%.snap') AS logical_snapshots_bytes;

View File

@@ -0,0 +1,17 @@
local neon = import 'neon.libsonnet';
local pg_ls_logicalsnapdir = importstr 'sql_exporter/compute_logical_snapshots_bytes.15.sql';
local pg_ls_dir = importstr 'sql_exporter/compute_logical_snapshots_bytes.sql';
{
metric_name: 'compute_logical_snapshots_bytes',
type: 'gauge',
help: 'Size of the pg_logical/snapshots directory, not including temporary files',
key_labels: [
'timeline_id',
],
values: [
'logical_snapshots_bytes',
],
query: if neon.PG_MAJORVERSION_NUM < 15 then pg_ls_dir else pg_ls_logicalsnapdir,
}

View File

@@ -0,0 +1,9 @@
SELECT
(SELECT setting FROM pg_settings WHERE name = 'neon.timeline_id') AS timeline_id,
-- Postgres creates temporary snapshot files of the form %X-%X.snap.%d.tmp.
-- These temporary snapshot files are renamed to the actual snapshot files
-- after they are completely built. We only WAL-log the completely built
-- snapshot files
(SELECT COALESCE(sum((pg_stat_file('pg_logical/snapshots/' || name, missing_ok => true)).size), 0)
FROM (SELECT * FROM pg_ls_dir('pg_logical/snapshots') WHERE pg_ls_dir LIKE '%.snap') AS name
) AS logical_snapshots_bytes;

View File

@@ -1243,12 +1243,7 @@ impl ComputeNode {
let postgresql_conf_path = pgdata_path.join("postgresql.conf");
config::write_postgres_conf(&postgresql_conf_path, &spec, self.http_port)?;
// TODO(ololobus): We need a concurrency during reconfiguration as well,
// but DB is already running and used by user. We can easily get out of
// `max_connections` limit, and the current code won't handle that.
// let compute_state = self.state.lock().unwrap().clone();
// let max_concurrent_connections = self.max_service_connections(&compute_state, &spec);
let max_concurrent_connections = 1;
let max_concurrent_connections = spec.reconfigure_concurrency;
// Temporarily reset max_cluster_size in config
// to avoid the possibility of hitting the limit, while we are reconfiguring:

View File

@@ -53,6 +53,7 @@ use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use url::Host;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -618,6 +619,7 @@ impl Endpoint {
pgbouncer_settings: None,
shard_stripe_size: Some(shard_stripe_size),
local_proxy_config: None,
reconfigure_concurrency: 1,
};
let spec_path = self.endpoint_path().join("spec.json");
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
@@ -817,6 +819,7 @@ impl Endpoint {
self.http_address.ip(),
self.http_address.port()
))
.header(CONTENT_TYPE.as_str(), "application/json")
.body(format!(
"{{\"spec\":{}}}",
serde_json::to_string_pretty(&spec)?

View File

@@ -42,6 +42,7 @@ allow = [
"MPL-2.0",
"OpenSSL",
"Unicode-DFS-2016",
"Unicode-3.0",
]
confidence-threshold = 0.8
exceptions = [

View File

@@ -19,6 +19,10 @@ pub type PgIdent = String;
/// String type alias representing Postgres extension version
pub type ExtVersion = String;
fn default_reconfigure_concurrency() -> usize {
1
}
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
@@ -67,7 +71,7 @@ pub struct ComputeSpec {
pub cluster: Cluster,
pub delta_operations: Option<Vec<DeltaOp>>,
/// An optinal hint that can be passed to speed up startup time if we know
/// An optional hint that can be passed to speed up startup time if we know
/// that no pg catalog mutations (like role creation, database creation,
/// extension creation) need to be done on the actual database to start.
#[serde(default)] // Default false
@@ -86,9 +90,7 @@ pub struct ComputeSpec {
// etc. GUCs in cluster.settings. TODO: Once the control plane has been
// updated to fill these fields, we can make these non optional.
pub tenant_id: Option<TenantId>,
pub timeline_id: Option<TimelineId>,
pub pageserver_connstring: Option<String>,
#[serde(default)]
@@ -113,6 +115,20 @@ pub struct ComputeSpec {
/// Local Proxy configuration used for JWT authentication
#[serde(default)]
pub local_proxy_config: Option<LocalProxySpec>,
/// Number of concurrent connections during the parallel RunInEachDatabase
/// phase of the apply config process.
///
/// We need a higher concurrency during reconfiguration in case of many DBs,
/// but instance is already running and used by client. We can easily get out of
/// `max_connections` limit, and the current code won't handle that.
///
/// Default is 1, but also allow control plane to override this value for specific
/// projects. It's also recommended to bump `superuser_reserved_connections` +=
/// `reconfigure_concurrency` for such projects to ensure that we always have
/// enough spare connections for reconfiguration process to succeed.
#[serde(default = "default_reconfigure_concurrency")]
pub reconfigure_concurrency: usize,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
@@ -315,6 +331,9 @@ mod tests {
// Features list defaults to empty vector.
assert!(spec.features.is_empty());
// Reconfigure concurrency defaults to 1.
assert_eq!(spec.reconfigure_concurrency, 1);
}
#[test]

View File

@@ -245,6 +245,17 @@ impl From<NodeAvailability> for NodeAvailabilityWrapper {
}
}
/// Scheduling policy enables us to selectively disable some automatic actions that the
/// controller performs on a tenant shard. This is only set to a non-default value by
/// human intervention, and it is reset to the default value (Active) when the tenant's
/// placement policy is modified away from Attached.
///
/// The typical use of a non-Active scheduling policy is one of:
/// - Pinnning a shard to a node (i.e. migrating it there & setting a non-Active scheduling policy)
/// - Working around a bug (e.g. if something is flapping and we need to stop it until the bug is fixed)
///
/// If you're not sure which policy to use to pin a shard to its current location, you probably
/// want Pause.
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum ShardSchedulingPolicy {
// Normal mode: the tenant's scheduled locations may be updated at will, including

View File

@@ -158,7 +158,8 @@ impl ShardIdentity {
key_to_shard_number(self.count, self.stripe_size, key)
}
/// Return true if the key should be ingested by this shard
/// Return true if the key is stored only on this shard. This does not include
/// global keys, see is_key_global().
///
/// Shards must ingest _at least_ keys which return true from this check.
pub fn is_key_local(&self, key: &Key) -> bool {
@@ -171,7 +172,7 @@ impl ShardIdentity {
}
/// Return true if the key should be stored on all shards, not just one.
fn is_key_global(&self, key: &Key) -> bool {
pub fn is_key_global(&self, key: &Key) -> bool {
if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() {
// Special keys that are only stored on shard 0
false

View File

@@ -8,15 +8,14 @@ use std::io;
use std::num::NonZeroU32;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use super::REMOTE_STORAGE_PREFIX_SEPARATOR;
use anyhow::Context;
use anyhow::Result;
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, RetryOptions};
use azure_identity::DefaultAzureCredential;
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::CopyStatus;
use azure_storage_blobs::prelude::ClientBuilder;
@@ -76,8 +75,9 @@ impl AzureBlobStorage {
let credentials = if let Ok(access_key) = env::var("AZURE_STORAGE_ACCESS_KEY") {
StorageCredentials::access_key(account.clone(), access_key)
} else {
let token_credential = DefaultAzureCredential::default();
StorageCredentials::token_credential(Arc::new(token_credential))
let token_credential = azure_identity::create_default_credential()
.context("trying to obtain Azure default credentials")?;
StorageCredentials::token_credential(token_credential)
};
// we have an outer retry

View File

@@ -164,6 +164,12 @@ impl TenantShardId {
}
}
impl std::fmt::Display for ShardNumber {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Display for ShardSlug<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(

View File

@@ -87,7 +87,7 @@ use crate::tenant::timeline::offload::offload_timeline;
use crate::tenant::timeline::offload::OffloadError;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRange;
use crate::tenant::timeline::CompactRequest;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
@@ -1978,6 +1978,26 @@ async fn timeline_gc_handler(
json_response(StatusCode::OK, gc_result)
}
// Cancel scheduled compaction tasks
async fn timeline_cancel_compact_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
async {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant.cancel_scheduled_compaction(timeline_id);
json_response(StatusCode::OK, ())
}
.instrument(info_span!("timeline_cancel_compact", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
.await
}
// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
mut request: Request<Body>,
@@ -1987,7 +2007,7 @@ async fn timeline_compact_handler(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let compact_range = json_request_maybe::<Option<CompactRange>>(&mut request).await?;
let compact_request = json_request_maybe::<Option<CompactRequest>>(&mut request).await?;
let state = get_state(&request);
@@ -2012,22 +2032,50 @@ async fn timeline_compact_handler(
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
let wait_until_scheduled_compaction_done =
parse_query_param::<_, bool>(&request, "wait_until_scheduled_compaction_done")?
.unwrap_or(false);
let sub_compaction = compact_request
.as_ref()
.map(|r| r.sub_compaction)
.unwrap_or(false);
let options = CompactOptions {
compact_range,
compact_range: compact_request
.as_ref()
.and_then(|r| r.compact_range.clone()),
compact_below_lsn: compact_request.as_ref().and_then(|r| r.compact_below_lsn),
flags,
sub_compaction,
};
let scheduled = compact_request
.as_ref()
.map(|r| r.scheduled)
.unwrap_or(false);
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
timeline
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
if scheduled {
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let rx = tenant.schedule_compaction(timeline_id, options).await;
if wait_until_scheduled_compaction_done {
// It is possible that this will take a long time, dropping the HTTP request will not cancel the compaction.
rx.await.ok();
}
} else {
timeline
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
timeline.remote_client.wait_completion().await
// XXX map to correct ApiError for the cases where it's due to shutdown
.context("wait completion").map_err(ApiError::InternalServerError)?;
}
}
json_response(StatusCode::OK, ())
}
@@ -2108,16 +2156,20 @@ async fn timeline_checkpoint_handler(
// By default, checkpoints come with a compaction, but this may be optionally disabled by tests that just want to flush + upload.
let compact = parse_query_param::<_, bool>(&request, "compact")?.unwrap_or(true);
let wait_until_flushed: bool =
parse_query_param(&request, "wait_until_flushed")?.unwrap_or(true);
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
timeline
.freeze_and_flush()
.await
.map_err(|e| {
if wait_until_flushed {
timeline.freeze_and_flush().await
} else {
timeline.freeze().await.and(Ok(()))
}.map_err(|e| {
match e {
tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
other => ApiError::InternalServerError(other.into()),
@@ -3301,6 +3353,10 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_compact_handler),
)
.delete(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/compact",
|r| api_handler(r, timeline_cancel_compact_handler),
)
.put(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/offload",
|r| testing_api_handler("attempt timeline offload", r, timeline_offload_handler),

View File

@@ -464,6 +464,24 @@ static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
static DISK_CONSISTENT_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_disk_consistent_lsn",
"Disk consistent LSN grouped by timeline",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static PROJECTED_REMOTE_CONSISTENT_LSN: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_projected_remote_consistent_lsn",
"Projected remote consistent LSN grouped by timeline",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static PITR_HISTORY_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_pitr_history_size",
@@ -1205,31 +1223,60 @@ pub(crate) mod virtual_file_io_engine {
});
}
pub(crate) struct SmgrOpTimer {
pub(crate) struct SmgrOpTimer(Option<SmgrOpTimerInner>);
pub(crate) struct SmgrOpTimerInner {
global_latency_histo: Histogram,
// Optional because not all op types are tracked per-timeline
per_timeline_latency_histo: Option<Histogram>,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
start: Instant,
throttled: Duration,
op: SmgrQueryType,
}
pub(crate) struct SmgrOpFlushInProgress {
base: Instant,
global_micros: IntCounter,
per_timeline_micros: IntCounter,
}
impl SmgrOpTimer {
pub(crate) fn deduct_throttle(&mut self, throttle: &Option<Duration>) {
let Some(throttle) = throttle else {
return;
};
self.throttled += *throttle;
let inner = self.0.as_mut().expect("other public methods consume self");
inner.throttled += *throttle;
}
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
let elapsed = self.start.elapsed();
pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress {
let (flush_start, inner) = self
.smgr_op_end()
.expect("this method consume self, and the only other caller is drop handler");
let SmgrOpTimerInner {
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
..
} = inner;
SmgrOpFlushInProgress {
base: flush_start,
global_micros: global_flush_in_progress_micros,
per_timeline_micros: per_timeline_flush_in_progress_micros,
}
}
let elapsed = match elapsed.checked_sub(self.throttled) {
/// Returns `None`` if this method has already been called, `Some` otherwise.
fn smgr_op_end(&mut self) -> Option<(Instant, SmgrOpTimerInner)> {
let inner = self.0.take()?;
let now = Instant::now();
let elapsed = now - inner.start;
let elapsed = match elapsed.checked_sub(inner.throttled) {
Some(elapsed) => elapsed,
None => {
use utils::rate_limit::RateLimit;
@@ -1240,9 +1287,9 @@ impl Drop for SmgrOpTimer {
})))
});
let mut guard = LOGGED.lock().unwrap();
let rate_limit = &mut guard[self.op];
let rate_limit = &mut guard[inner.op];
rate_limit.call(|| {
warn!(op=?self.op, ?elapsed, ?self.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time");
});
elapsed // un-throttled time, more info than just saturating to 0
}
@@ -1250,10 +1297,54 @@ impl Drop for SmgrOpTimer {
let elapsed = elapsed.as_secs_f64();
self.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &self.per_timeline_latency_histo {
inner.global_latency_histo.observe(elapsed);
if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo {
per_timeline_getpage_histo.observe(elapsed);
}
Some((now, inner))
}
}
impl Drop for SmgrOpTimer {
fn drop(&mut self) {
self.smgr_op_end();
}
}
impl SmgrOpFlushInProgress {
pub(crate) async fn measure<Fut, O>(mut self, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
let now = Instant::now();
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `now`.
let mut observe_guard = scopeguard::guard(
|| {
let elapsed = now - self.base;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.base = now;
},
|mut observe| {
observe();
},
);
loop {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)();
}
}
}
}
}
@@ -1284,6 +1375,8 @@ pub(crate) struct SmgrQueryTimePerTimeline {
per_timeline_getpage_latency: Histogram,
global_batch_size: Histogram,
per_timeline_batch_size: Histogram,
global_flush_in_progress_micros: IntCounter,
per_timeline_flush_in_progress_micros: IntCounter,
}
static SMGR_QUERY_STARTED_GLOBAL: Lazy<IntCounterVec> = Lazy::new(|| {
@@ -1446,6 +1539,26 @@ fn set_page_service_config_max_batch_size(conf: &PageServicePipeliningConfig) {
.set(value.try_into().unwrap());
}
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_service_pagestream_flush_in_progress_micros",
"Counter that sums up the microseconds that a pagestream response was being flushed into the TCP connection. \
If the flush is particularly slow, this counter will be updated periodically to make slow flushes \
easily discoverable in monitoring. \
Hence, this is NOT a completion latency historgram.",
&["tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});
static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_service_pagestream_flush_in_progress_micros_global",
"Like pageserver_page_service_pagestream_flush_in_progress_seconds, but instance-wide.",
)
.expect("failed to define a metric")
});
impl SmgrQueryTimePerTimeline {
pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
@@ -1486,6 +1599,12 @@ impl SmgrQueryTimePerTimeline {
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
let global_flush_in_progress_micros =
PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone();
let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS
.get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id])
.unwrap();
Self {
global_started,
global_latency,
@@ -1493,6 +1612,8 @@ impl SmgrQueryTimePerTimeline {
per_timeline_getpage_started,
global_batch_size,
per_timeline_batch_size,
global_flush_in_progress_micros,
per_timeline_flush_in_progress_micros,
}
}
pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer {
@@ -1505,13 +1626,17 @@ impl SmgrQueryTimePerTimeline {
None
};
SmgrOpTimer {
SmgrOpTimer(Some(SmgrOpTimerInner {
global_latency_histo: self.global_latency[op as usize].clone(),
per_timeline_latency_histo,
start: started_at,
op,
throttled: Duration::ZERO,
}
global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(),
per_timeline_flush_in_progress_micros: self
.per_timeline_flush_in_progress_micros
.clone(),
}))
}
pub(crate) fn observe_getpage_batch_start(&self, batch_size: usize) {
@@ -2186,6 +2311,15 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
.expect("failed to define a metric"),
});
pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_timeline_wal_records_received",
"Number of WAL records received per shard",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static WAL_REDO_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_wal_redo_seconds",
@@ -2394,7 +2528,8 @@ pub(crate) struct TimelineMetrics {
pub load_layer_map_histo: StorageTimeMetrics,
pub garbage_collect_histo: StorageTimeMetrics,
pub find_gc_cutoffs_histo: StorageTimeMetrics,
pub last_record_gauge: IntGauge,
pub last_record_lsn_gauge: IntGauge,
pub disk_consistent_lsn_gauge: IntGauge,
pub pitr_history_size: UIntGauge,
pub archival_size: UIntGauge,
pub(crate) layer_size_image: UIntGauge,
@@ -2412,6 +2547,7 @@ pub(crate) struct TimelineMetrics {
pub evictions_with_low_residence_duration: std::sync::RwLock<EvictionsWithLowResidenceDuration>,
/// Number of valid LSN leases.
pub valid_lsn_lease_count_gauge: UIntGauge,
pub wal_records_received: IntCounter,
shutdown: std::sync::atomic::AtomicBool,
}
@@ -2475,7 +2611,11 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let last_record_gauge = LAST_RECORD_LSN
let last_record_lsn_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let disk_consistent_lsn_gauge = DISK_CONSISTENT_LSN
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -2565,6 +2705,10 @@ impl TimelineMetrics {
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let wal_records_received = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
TimelineMetrics {
tenant_id,
shard_id,
@@ -2578,7 +2722,8 @@ impl TimelineMetrics {
garbage_collect_histo,
find_gc_cutoffs_histo,
load_layer_map_histo,
last_record_gauge,
last_record_lsn_gauge,
disk_consistent_lsn_gauge,
pitr_history_size,
archival_size,
layer_size_image,
@@ -2596,6 +2741,7 @@ impl TimelineMetrics {
evictions_with_low_residence_duration,
),
valid_lsn_lease_count_gauge,
wal_records_received,
shutdown: std::sync::atomic::AtomicBool::default(),
}
}
@@ -2642,6 +2788,7 @@ impl TimelineMetrics {
let timeline_id = &self.timeline_id;
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
@@ -2732,6 +2879,16 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
let _ = PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
let _ = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}
@@ -2805,6 +2962,7 @@ pub(crate) struct RemoteTimelineClientMetrics {
calls: Mutex<HashMap<(&'static str, &'static str), IntCounterPair>>,
bytes_started_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
bytes_finished_counter: Mutex<HashMap<(&'static str, &'static str), IntCounter>>,
pub(crate) projected_remote_consistent_lsn_gauge: UIntGauge,
}
impl RemoteTimelineClientMetrics {
@@ -2819,6 +2977,10 @@ impl RemoteTimelineClientMetrics {
.unwrap(),
);
let projected_remote_consistent_lsn_gauge = PROJECTED_REMOTE_CONSISTENT_LSN
.get_metric_with_label_values(&[&tenant_id_str, &shard_id_str, &timeline_id_str])
.unwrap();
RemoteTimelineClientMetrics {
tenant_id: tenant_id_str,
shard_id: shard_id_str,
@@ -2827,6 +2989,7 @@ impl RemoteTimelineClientMetrics {
bytes_started_counter: Mutex::new(HashMap::default()),
bytes_finished_counter: Mutex::new(HashMap::default()),
remote_physical_size_gauge,
projected_remote_consistent_lsn_gauge,
}
}
@@ -3040,6 +3203,7 @@ impl Drop for RemoteTimelineClientMetrics {
calls,
bytes_started_counter,
bytes_finished_counter,
projected_remote_consistent_lsn_gauge,
} = self;
for ((a, b), _) in calls.get_mut().unwrap().drain() {
let mut res = [Ok(()), Ok(())];
@@ -3069,6 +3233,14 @@ impl Drop for RemoteTimelineClientMetrics {
let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above
let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, shard_id, timeline_id]);
}
{
let _ = projected_remote_consistent_lsn_gauge;
let _ = PROJECTED_REMOTE_CONSISTENT_LSN.remove_label_values(&[
tenant_id,
shard_id,
timeline_id,
]);
}
}
}

View File

@@ -1017,10 +1017,8 @@ impl PageServerHandler {
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let mut timers: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(handler_results.len());
for handler_result in handler_results {
let response_msg = match handler_result {
let (response_msg, timer) = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1044,34 +1042,66 @@ impl PageServerHandler {
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
(
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
}),
None, // TODO: measure errors
)
}
},
Ok((response_msg, timer)) => {
// Extending the lifetime of the timers so observations on drop
// include the flush time.
timers.push(timer);
response_msg
}
Ok((response_msg, timer)) => (response_msg, Some(timer)),
};
//
// marshal & transmit response message
//
pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = pgb_writer.flush() => {
res?;
// We purposefully don't count flush time into the timer.
//
// The reason is that current compute client will not perform protocol processing
// if the postgres backend process is doing things other than `->smgr_read()`.
// This is especially the case for prefetch.
//
// If the compute doesn't read from the connection, eventually TCP will backpressure
// all the way into our flush call below.
//
// The timer's underlying metric is used for a storage-internal latency SLO and
// we don't want to include latency in it that we can't control.
// And as pointed out above, in this case, we don't control the time that flush will take.
let flushing_timer =
timer.map(|timer| timer.observe_smgr_op_completion_and_start_flushing());
// what we want to do
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(flush_fut))
}
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
info!("shutdown request received in page handler");
return Err(QueryError::Shutdown)
}
res = flush_fut => {
res?;
}
}
Ok(())
}
// and log the info! line inside the request span
.instrument(span.clone())
.await?;
}
drop(timers);
Ok(())
}

View File

@@ -37,14 +37,19 @@ use remote_timeline_client::manifest::{
};
use remote_timeline_client::UploadQueueNotReadyError;
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::Weak;
use std::time::SystemTime;
use storage_broker::BrokerClientChannel;
use timeline::compaction::ScheduledCompactionTask;
use timeline::import_pgdata;
use timeline::offload::offload_timeline;
use timeline::CompactFlags;
use timeline::CompactOptions;
use timeline::CompactionError;
use timeline::ShutdownMode;
use tokio::io::BufReader;
use tokio::sync::watch;
@@ -339,6 +344,11 @@ pub struct Tenant {
/// Overhead of mutex is acceptable because compaction is done with a multi-second period.
compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
/// Scheduled compaction tasks. Currently, this can only be populated by triggering
/// a manual gc-compaction from the manual compaction API.
scheduled_compaction_tasks:
std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,
/// If the tenant is in Activating state, notify this to encourage it
/// to proceed to Active as soon as possible, rather than waiting for lazy
/// background warmup.
@@ -2953,27 +2963,100 @@ impl Tenant {
for (timeline_id, timeline, (can_compact, can_offload)) in &timelines_to_compact_or_offload
{
// pending_task_left == None: cannot compact, maybe still pending tasks
// pending_task_left == Some(true): compaction task left
// pending_task_left == Some(false): no compaction task left
let pending_task_left = if *can_compact {
Some(
timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
let has_pending_l0_compaction_task = timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
.inspect_err(|e| match e {
timeline::CompactionError::ShuttingDown => (),
timeline::CompactionError::Offload(_) => {
// Failures to offload timelines do not trip the circuit breaker, because
// they do not do lots of writes the way compaction itself does: it is cheap
// to retry, and it would be bad to stop all compaction because of an issue with offloading.
}
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
}
})?;
if has_pending_l0_compaction_task {
Some(true)
} else {
let mut has_pending_scheduled_compaction_task;
let next_scheduled_compaction_task = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
if !tline_pending_tasks.is_empty() {
info!(
"{} tasks left in the compaction schedule queue",
tline_pending_tasks.len()
);
}
timeline::CompactionError::Other(e) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, e);
let next_task = tline_pending_tasks.pop_front();
has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
next_task
} else {
has_pending_scheduled_compaction_task = false;
None
}
};
if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
{
if !next_scheduled_compaction_task
.options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else if next_scheduled_compaction_task.options.sub_compaction {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = timeline
.gc_compaction_split_jobs(next_scheduled_compaction_task.options)
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
info!("no jobs to run, skipping scheduled compaction task");
} else {
has_pending_scheduled_compaction_task = true;
let jobs_len = jobs.len();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(*timeline_id).or_default();
for (idx, job) in jobs.into_iter().enumerate() {
tline_pending_tasks.push_back(ScheduledCompactionTask {
options: job,
result_tx: if idx == jobs_len - 1 {
// The last compaction job sends the completion signal
next_scheduled_compaction_task.result_tx.take()
} else {
None
},
});
}
info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
}
})?,
)
} else {
let _ = timeline
.compact_with_options(
cancel,
next_scheduled_compaction_task.options,
ctx,
)
.instrument(info_span!("scheduled_compact_timeline", %timeline_id))
.await?;
if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
// TODO: we can send compaction statistics in the future
tx.send(()).ok();
}
}
}
Some(has_pending_scheduled_compaction_task)
}
} else {
None
};
@@ -2993,6 +3076,36 @@ impl Tenant {
Ok(has_pending_task)
}
/// Cancel scheduled compaction tasks
pub(crate) fn cancel_scheduled_compaction(
&self,
timeline_id: TimelineId,
) -> Vec<ScheduledCompactionTask> {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
current_tline_pending_tasks.into_iter().collect()
} else {
Vec::new()
}
}
/// Schedule a compaction task for a timeline.
pub(crate) async fn schedule_compaction(
&self,
timeline_id: TimelineId,
options: CompactOptions,
) -> tokio::sync::oneshot::Receiver<()> {
let (tx, rx) = tokio::sync::oneshot::channel();
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
let tline_pending_tasks = guard.entry(timeline_id).or_default();
tline_pending_tasks.push_back(ScheduledCompactionTask {
options,
result_tx: Some(tx),
});
rx
}
// Call through to all timelines to freeze ephemeral layers if needed. Usually
// this happens during ingest: this background housekeeping is for freezing layers
// that are open but haven't been written to for some time.
@@ -4005,6 +4118,7 @@ impl Tenant {
// use an extremely long backoff.
Some(Duration::from_secs(3600 * 24)),
)),
scheduled_compaction_tasks: Mutex::new(Default::default()),
activate_now_sem: tokio::sync::Semaphore::new(0),
attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()),
cancel: CancellationToken::default(),
@@ -9163,6 +9277,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
..Default::default()
},
&ctx,
)
@@ -9399,6 +9514,7 @@ mod tests {
CompactOptions {
flags: dryrun_flags,
compact_range: None,
..Default::default()
},
&ctx,
)
@@ -9885,7 +10001,15 @@ mod tests {
// Do a partial compaction on key range 0..2
tline
.partial_compact_with_gc(get_key(0)..get_key(2), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(2)).into()),
..Default::default()
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
@@ -9924,7 +10048,15 @@ mod tests {
// Do a partial compaction on key range 2..4
tline
.partial_compact_with_gc(get_key(2)..get_key(4), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(2)..get_key(4)).into()),
..Default::default()
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
@@ -9968,7 +10100,15 @@ mod tests {
// Do a partial compaction on key range 4..9
tline
.partial_compact_with_gc(get_key(4)..get_key(9), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(4)..get_key(9)).into()),
..Default::default()
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
@@ -10011,7 +10151,15 @@ mod tests {
// Do a partial compaction on key range 9..10
tline
.partial_compact_with_gc(get_key(9)..get_key(10), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(9)..get_key(10)).into()),
..Default::default()
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;
@@ -10059,7 +10207,15 @@ mod tests {
// Do a partial compaction on key range 0..10, all image layers below LSN 20 can be replaced with new ones.
tline
.partial_compact_with_gc(get_key(0)..get_key(10), &cancel, EnumSet::new(), &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: EnumSet::new(),
compact_range: Some((get_key(0)..get_key(10)).into()),
..Default::default()
},
&ctx,
)
.await
.unwrap();
let all_layers = inspect_and_sort(&tline, Some(get_key(0)..get_key(10))).await;

View File

@@ -2192,6 +2192,9 @@ impl RemoteTimelineClient {
upload_queue.clean.1 = Some(task.task_id);
let lsn = upload_queue.clean.0.metadata.disk_consistent_lsn();
self.metrics
.projected_remote_consistent_lsn_gauge
.set(lsn.0);
if self.generation.is_none() {
// Legacy mode: skip validating generation

View File

@@ -6,7 +6,6 @@
use std::collections::HashSet;
use std::future::Future;
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use anyhow::{anyhow, Context};
@@ -27,7 +26,7 @@ use crate::span::{
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerName;
use crate::tenant::Generation;
use crate::virtual_file::{on_fatal_io_error, IoBufferMut, MaybeFatalIo, VirtualFile};
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,
@@ -150,7 +149,7 @@ async fn download_object<'a>(
storage: &'a GenericRemoteStorage,
src_path: &RemotePath,
dst_path: &Utf8PathBuf,
gate: &utils::sync::gate::Gate,
#[cfg_attr(target_os = "macos", allow(unused_variables))] gate: &utils::sync::gate::Gate,
cancel: &CancellationToken,
#[cfg_attr(target_os = "macos", allow(unused_variables))] ctx: &RequestContext,
) -> Result<u64, DownloadError> {
@@ -209,6 +208,8 @@ async fn download_object<'a>(
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io;
use crate::virtual_file::IoBufferMut;
use std::sync::Arc;
async {
let destination_file = Arc::new(
VirtualFile::create(dst_path, ctx)

View File

@@ -53,7 +53,7 @@ use utils::{
postgres_client::PostgresClientProtocol,
sync::gate::{Gate, GateGuard},
};
use wal_decoder::serialized_batch::SerializedValueBatch;
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
@@ -768,7 +768,7 @@ pub enum GetLogicalSizePriority {
Background,
}
#[derive(enumset::EnumSetType)]
#[derive(Debug, enumset::EnumSetType)]
pub(crate) enum CompactFlags {
ForceRepartition,
ForceImageLayerCreation,
@@ -777,6 +777,19 @@ pub(crate) enum CompactFlags {
DryRun,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactRequest {
pub compact_range: Option<CompactRange>,
pub compact_below_lsn: Option<Lsn>,
/// Whether the compaction job should be scheduled.
#[serde(default)]
pub scheduled: bool,
/// Whether the compaction job should be split across key ranges.
#[serde(default)]
pub sub_compaction: bool,
}
#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactRange {
@@ -786,10 +799,27 @@ pub(crate) struct CompactRange {
pub end: Key,
}
#[derive(Clone, Default)]
impl From<Range<Key>> for CompactRange {
fn from(range: Range<Key>) -> Self {
CompactRange {
start: range.start,
end: range.end,
}
}
}
#[derive(Debug, Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,
/// If set, the compaction will only compact the key range specified by this option.
/// This option is only used by GC compaction.
pub compact_range: Option<CompactRange>,
/// If set, the compaction will only compact the LSN below this value.
/// This option is only used by GC compaction.
pub compact_below_lsn: Option<Lsn>,
/// Enable sub-compaction (split compaction job across key ranges).
/// This option is only used by GC compaction.
pub sub_compaction: bool,
}
impl std::fmt::Debug for Timeline {
@@ -1433,23 +1463,31 @@ impl Timeline {
Ok(lease)
}
/// Flush to disk all data that was written with the put_* functions
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
/// Returns the flush request ID which can be awaited with wait_flush_completion().
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze(&self) -> Result<u64, FlushLayerError> {
self.freeze0().await
}
/// Freeze and flush the open in-memory layer, waiting for it to be written to disk.
#[instrument(skip(self), fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%self.timeline_id))]
pub(crate) async fn freeze_and_flush(&self) -> Result<(), FlushLayerError> {
self.freeze_and_flush0().await
}
/// Freeze the current open in-memory layer. It will be written to disk on next iteration.
/// Returns the flush request ID which can be awaited with wait_flush_completion().
pub(crate) async fn freeze0(&self) -> Result<u64, FlushLayerError> {
let mut g = self.write_lock.lock().await;
let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn, &mut g).await
}
// This exists to provide a non-span creating version of `freeze_and_flush` we can call without
// polluting the span hierarchy.
pub(crate) async fn freeze_and_flush0(&self) -> Result<(), FlushLayerError> {
let token = {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let mut g = self.write_lock.lock().await;
let to_lsn = self.get_last_record_lsn();
self.freeze_inmem_layer_at(to_lsn, &mut g).await?
};
let token = self.freeze0().await?;
self.wait_flush_completion(token).await
}
@@ -1604,6 +1642,8 @@ impl Timeline {
CompactOptions {
flags,
compact_range: None,
compact_below_lsn: None,
sub_compaction: false,
},
ctx,
)
@@ -2359,7 +2399,7 @@ impl Timeline {
result
.metrics
.last_record_gauge
.last_record_lsn_gauge
.set(disk_consistent_lsn.0 as i64);
result
})
@@ -3481,7 +3521,7 @@ impl Timeline {
pub(crate) fn finish_write(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
self.metrics.last_record_lsn_gauge.set(new_lsn.0 as i64);
self.last_record_lsn.advance(new_lsn);
}
@@ -3849,6 +3889,10 @@ impl Timeline {
fn set_disk_consistent_lsn(&self, new_value: Lsn) -> bool {
let old_value = self.disk_consistent_lsn.fetch_max(new_value);
assert!(new_value >= old_value, "disk_consistent_lsn must be growing monotonously at runtime; current {old_value}, offered {new_value}");
self.metrics
.disk_consistent_lsn_gauge
.set(new_value.0 as i64);
new_value != old_value
}
@@ -5895,6 +5939,23 @@ impl<'a> TimelineWriter<'a> {
return Ok(());
}
// In debug builds, assert that we don't write any keys that don't belong to this shard.
// We don't assert this in release builds, since key ownership policies may change over
// time. Stray keys will be removed during compaction.
if cfg!(debug_assertions) {
for metadata in &batch.metadata {
if let ValueMeta::Serialized(metadata) = metadata {
let key = Key::from_compact(metadata.key);
assert!(
self.shard_identity.is_key_local(&key)
|| self.shard_identity.is_key_global(&key),
"key {key} does not belong on shard {}",
self.shard_identity.shard_index()
);
}
}
}
let batch_max_lsn = batch.max_lsn;
let buf_size: u64 = batch.buffer_size() as u64;

View File

@@ -10,13 +10,12 @@ use std::sync::Arc;
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
RecordedDuration, Timeline,
CompactFlags, CompactOptions, CompactRange, CreateImageLayersError, DurationRecorder,
ImageLayerCreationMode, RecordedDuration, Timeline,
};
use anyhow::{anyhow, bail, Context};
use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use pageserver_api::key::KEY_SIZE;
@@ -30,7 +29,6 @@ use utils::id::TimelineId;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -64,6 +62,12 @@ use super::CompactionError;
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
const COMPACTION_DELTA_THRESHOLD: usize = 5;
/// A scheduled compaction task.
pub struct ScheduledCompactionTask {
pub options: CompactOptions,
pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
pub struct GcCompactionJobDescription {
/// All layers to read in the compaction job
selected_layers: Vec<Layer>,
@@ -1177,11 +1181,12 @@ impl Timeline {
.await
.map_err(CompactionError::Other)?;
} else {
debug!(
"Dropping key {} during compaction (it belongs on shard {:?})",
key,
self.shard_identity.get_shard_number(&key)
);
let shard = self.shard_identity.shard_index();
let owner = self.shard_identity.get_shard_number(&key);
if cfg!(debug_assertions) {
panic!("key {key} does not belong on shard {shard}, owned by {owner}");
}
debug!("dropping key {key} during compaction (it belongs on shard {owner})");
}
if !new_layers.is_empty() {
@@ -1749,22 +1754,114 @@ impl Timeline {
Ok(())
}
pub(crate) async fn compact_with_gc(
/// Split a gc-compaction job into multiple compaction jobs. Optimally, this function should return a vector of
/// `GcCompactionJobDesc`. But we want to keep it simple on the tenant scheduling side without exposing too much
/// ad-hoc information about gc compaction itself.
pub(crate) async fn gc_compaction_split_jobs(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.partial_compact_with_gc(
options
.compact_range
.map(|range| range.start..range.end)
.unwrap_or_else(|| Key::MIN..Key::MAX),
cancel,
options.flags,
ctx,
)
.await
) -> anyhow::Result<Vec<CompactOptions>> {
if !options.sub_compaction {
return Ok(vec![options]);
}
let compact_range = options.compact_range.clone().unwrap_or(CompactRange {
start: Key::MIN,
end: Key::MAX,
});
let compact_below_lsn = if let Some(compact_below_lsn) = options.compact_below_lsn {
compact_below_lsn
} else {
let gc_info = self.gc_info.read().unwrap();
gc_info.cutoffs.select_min() // use the real gc cutoff
};
let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
let Ok(partition) = self.partitioning.try_lock() else {
bail!("failed to acquire partition lock");
};
let ((dense_ks, sparse_ks), _) = &*partition;
// Truncate the key range to be within user specified compaction range.
fn truncate_to(
source_start: &Key,
source_end: &Key,
target_start: &Key,
target_end: &Key,
) -> Option<(Key, Key)> {
let start = source_start.max(target_start);
let end = source_end.min(target_end);
if start < end {
Some((*start, *end))
} else {
None
}
}
let mut split_key_ranges = Vec::new();
let ranges = dense_ks
.parts
.iter()
.map(|partition| partition.ranges.iter())
.chain(sparse_ks.parts.iter().map(|x| x.0.ranges.iter()))
.flatten()
.cloned()
.collect_vec();
for range in ranges.iter() {
let Some((start, end)) = truncate_to(
&range.start,
&range.end,
&compact_range.start,
&compact_range.end,
) else {
continue;
};
split_key_ranges.push((start, end));
}
split_key_ranges.sort();
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
let mut current_start = None;
// Split compaction job to about 2GB each
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future
let ranges_num = split_key_ranges.len();
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
if current_start.is_none() {
current_start = Some(start);
}
let start = current_start.unwrap();
if start >= end {
// We have already processed this partition.
continue;
}
let res = layer_map.range_search(start..end, compact_below_lsn);
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
let mut compact_options = options.clone();
// Try to extend the compaction range so that we include at least one full layer file.
let extended_end = res
.found
.keys()
.map(|layer| layer.layer.key_range.end)
.min();
// It is possible that the search range does not contain any layer files when we reach the end of the loop.
// In this case, we simply use the specified key range end.
let end = if let Some(extended_end) = extended_end {
extended_end.max(end)
} else {
end
};
info!(
"splitting compaction job: {}..{}, estimated_size={}",
start, end, total_size
);
compact_options.compact_range = Some(CompactRange { start, end });
compact_options.compact_below_lsn = Some(compact_below_lsn);
compact_options.sub_compaction = false;
compact_jobs.push(compact_options);
current_start = Some(end);
}
}
drop(guard);
Ok(compact_jobs)
}
/// An experimental compaction building block that combines compaction with garbage collection.
@@ -1774,19 +1871,51 @@ impl Timeline {
/// layers and image layers, which generates image layers on the gc horizon, drop deltas below gc horizon,
/// and create delta layers with all deltas >= gc horizon.
///
/// If `key_range` is provided, it will only compact the keys within the range, aka partial compaction.
/// If `options.compact_range` is provided, it will only compact the keys within the range, aka partial compaction.
/// Partial compaction will read and process all layers overlapping with the key range, even if it might
/// contain extra keys. After the gc-compaction phase completes, delta layers that are not fully contained
/// within the key range will be rewritten to ensure they do not overlap with the delta layers. Providing
/// Key::MIN..Key..MAX to the function indicates a full compaction, though technically, `Key::MAX` is not
/// part of the range.
pub(crate) async fn partial_compact_with_gc(
///
/// If `options.compact_below_lsn` is provided, the compaction will only compact layers below or intersect with
/// the LSN. Otherwise, it will use the gc cutoff by default.
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
compaction_key_range: Range<Key>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
if options.sub_compaction {
info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = self.gc_compaction_split_jobs(options).await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
info!(
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx).await?;
}
if jobs_len == 0 {
info!("no jobs to run, skipping gc bottom-most compaction");
}
return Ok(());
}
self.compact_with_gc_inner(cancel, options, ctx).await
}
async fn compact_with_gc_inner(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
assert!(
!options.sub_compaction,
"sub-compaction should be handled by the outer function"
);
// Block other compaction/GC tasks from running for now. GC-compaction could run along
// with legacy compaction tasks in the future. Always ensure the lock order is compaction -> gc.
// Note that we already acquired the compaction lock when the outer `compact` function gets called.
@@ -1806,6 +1935,12 @@ impl Timeline {
)
.await?;
let flags = options.flags;
let compaction_key_range = options
.compact_range
.map(|range| range.start..range.end)
.unwrap_or_else(|| Key::MIN..Key::MAX);
let dry_run = flags.contains(CompactFlags::DryRun);
if compaction_key_range == (Key::MIN..Key::MAX) {
@@ -1829,7 +1964,18 @@ impl Timeline {
let layers = guard.layer_map()?;
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
let gc_cutoff = gc_info.cutoffs.select_min();
let gc_cutoff = {
let real_gc_cutoff = gc_info.cutoffs.select_min();
// The compaction algorithm will keep all keys above the gc_cutoff while keeping only necessary keys below the gc_cutoff for
// each of the retain_lsn. Therefore, if the user-provided `compact_below_lsn` is larger than the real gc cutoff, we will use
// the real cutoff.
let mut gc_cutoff = options.compact_below_lsn.unwrap_or(real_gc_cutoff);
if gc_cutoff > real_gc_cutoff {
warn!("provided compact_below_lsn={} is larger than the real_gc_cutoff={}, using the real gc cutoff", gc_cutoff, real_gc_cutoff);
gc_cutoff = real_gc_cutoff;
}
gc_cutoff
};
for (lsn, _timeline_id, _is_offloaded) in &gc_info.retain_lsns {
if lsn < &gc_cutoff {
retain_lsns_below_horizon.push(*lsn);
@@ -1849,7 +1995,7 @@ impl Timeline {
.map(|desc| desc.get_lsn_range().end)
.max()
else {
info!("no layers to compact with gc");
info!("no layers to compact with gc: no historic layers below gc_cutoff, gc_cutoff={}", gc_cutoff);
return Ok(());
};
// Then, pick all the layers that are below the max_layer_lsn. This is to ensure we can pick all single-key
@@ -1872,7 +2018,7 @@ impl Timeline {
}
}
if selected_layers.is_empty() {
info!("no layers to compact with gc");
info!("no layers to compact with gc: no layers within the key range, gc_cutoff={}, key_range={}..{}", gc_cutoff, compaction_key_range.start, compaction_key_range.end);
return Ok(());
}
retain_lsns_below_horizon.sort();
@@ -1939,14 +2085,15 @@ impl Timeline {
// Step 1: construct a k-merge iterator over all layers.
// Also, verify if the layer map can be split by drawing a horizontal line at every LSN start/end split point.
let layer_names = job_desc
.selected_layers
.iter()
.map(|layer| layer.layer_desc().layer_name())
.collect_vec();
if let Some(err) = check_valid_layermap(&layer_names) {
warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
}
// disable the check for now because we need to adjust the check for partial compactions, will enable later.
// let layer_names = job_desc
// .selected_layers
// .iter()
// .map(|layer| layer.layer_desc().layer_name())
// .collect_vec();
// if let Some(err) = check_valid_layermap(&layer_names) {
// warn!("gc-compaction layer map check failed because {}, this is normal if partial compaction is not finished yet", err);
// }
// The maximum LSN we are processing in this compaction loop
let end_lsn = job_desc
.selected_layers
@@ -2052,6 +2199,11 @@ impl Timeline {
// This is not handled in the filter iterator because shard is determined by hash.
// Therefore, it does not give us any performance benefit to do things like skip
// a whole layer file as handling key spaces (ranges).
if cfg!(debug_assertions) {
let shard = self.shard_identity.shard_index();
let owner = self.shard_identity.get_shard_number(&key);
panic!("key {key} does not belong on shard {shard}, owned by {owner}");
}
continue;
}
if !job_desc.compaction_key_range.contains(&key) {

View File

@@ -369,6 +369,13 @@ pub(super) async fn handle_walreceiver_connection(
// advances it to its end LSN. 0 is just an initialization placeholder.
let mut modification = timeline.begin_modification(Lsn(0));
if !records.is_empty() {
timeline
.metrics
.wal_records_received
.inc_by(records.len() as u64);
}
for interpreted in records {
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
&& uncommitted_records > 0
@@ -510,6 +517,7 @@ pub(super) async fn handle_walreceiver_connection(
}
// Ingest the records without immediately committing them.
timeline.metrics.wal_records_received.inc();
let ingested = walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await

View File

@@ -162,6 +162,7 @@ where
.expect("must not use after we returned an error")
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn write_buffered_borrowed(
&mut self,
chunk: &[u8],

View File

@@ -582,18 +582,21 @@ impl WalIngest {
forknum: FSM_FORKNUM,
};
// Zero out the last remaining FSM page, if this shard owns it. We are not precise here,
// and instead of digging in the FSM bitmap format we just clear the whole page.
let fsm_logical_page_no = blkno / pg_constants::SLOTS_PER_FSM_PAGE;
let mut fsm_physical_page_no = fsm_logical_to_physical(fsm_logical_page_no);
if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
if blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0
&& self
.shard
.is_key_local(&rel_block_to_key(rel, fsm_physical_page_no))
{
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
fsm_physical_page_no += 1;
}
// TODO: re-examine the None case here wrt. sharding; should we error?
// Truncate this shard's view of the FSM relation size, if it even has one.
let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
if nblocks > fsm_physical_page_no {
// check if something to do: FSM is larger than truncate position
self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
.await?;
}
@@ -617,7 +620,7 @@ impl WalIngest {
// tail bits in the last remaining map page, representing truncated heap
// blocks, need to be cleared. This is not only tidy, but also necessary
// because we don't get a chance to clear the bits if the heap is extended
// again.
// again. Only do this on the shard that owns the page.
if (trunc_byte != 0 || trunc_offs != 0)
&& self.shard.is_key_local(&rel_block_to_key(rel, vm_page_no))
{
@@ -631,10 +634,9 @@ impl WalIngest {
)?;
vm_page_no += 1;
}
// TODO: re-examine the None case here wrt. sharding; should we error?
// Truncate this shard's view of the VM relation size, if it even has one.
let nblocks = get_relsize(modification, rel, ctx).await?.unwrap_or(0);
if nblocks > vm_page_no {
// check if something to do: VM is larger than truncate position
self.put_rel_truncation(modification, rel, vm_page_no, ctx)
.await?;
}

View File

@@ -610,6 +610,9 @@ prefetch_read(PrefetchRequest *slot)
{
NeonResponse *response;
MemoryContext old;
BufferTag buftag;
shardno_t shard_no;
uint64 my_ring_index;
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->response == NULL);
@@ -623,11 +626,29 @@ prefetch_read(PrefetchRequest *slot)
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
/*
* Copy the request info so that if an error happens and the prefetch
* queue is flushed during the receive call, we can print the original
* values in the error message
*/
buftag = slot->buftag;
shard_no = slot->shard_no;
my_ring_index = slot->my_ring_index;
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(slot->shard_no);
response = (NeonResponse *) page_server->receive(shard_no);
MemoryContextSwitchTo(old);
if (response)
{
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(shard_no, ERROR,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
@@ -642,11 +663,15 @@ prefetch_read(PrefetchRequest *slot)
}
else
{
neon_shard_log(slot->shard_no, LOG,
/*
* Note: The slot might no longer be valid, if the connection was lost
* and the prefetch queue was flushed during the receive call
*/
neon_shard_log(shard_no, LOG,
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
(long)slot->my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
slot->buftag.forkNum, slot->buftag.blockNum);
(long) my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(buftag)),
buftag.forkNum, buftag.blockNum);
return false;
}
}

View File

@@ -70,6 +70,10 @@ impl std::fmt::Display for Backend<'_, ()> {
fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ControlPlane(api, ()) => match &**api {
ControlPlaneClient::ProxyV1(endpoint) => fmt
.debug_tuple("ControlPlane::ProxyV1")
.field(&endpoint.url())
.finish(),
ControlPlaneClient::Neon(endpoint) => fmt
.debug_tuple("ControlPlane::Neon")
.field(&endpoint.url())

View File

@@ -46,6 +46,9 @@ enum AuthBackendType {
#[value(name("console"), alias("cplane"))]
ControlPlane,
#[value(name("cplane-v1"), alias("control-plane"))]
ControlPlaneV1,
#[value(name("link"), alias("control-redirect"))]
ConsoleRedirect,
@@ -518,6 +521,39 @@ async fn main() -> anyhow::Result<()> {
.instrument(span),
);
}
} else if let proxy::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
(client1, client2) => {
let cache = api.caches.project_info.clone();
if let Some(client) = client1 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
if let Some(client) = client2 {
maintenance_tasks.spawn(notifications::task_main(
client,
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
}
if let Some(regional_redis_client) = regional_redis_client {
let cache = api.caches.endpoints_cache.clone();
let con = regional_redis_client;
let span = tracing::info_span!("endpoints_cache");
maintenance_tasks.spawn(
async move { cache.do_read(con, cancellation_token.clone()).await }
.instrument(span),
);
}
}
}
@@ -662,6 +698,65 @@ fn build_auth_backend(
args: &ProxyCliArgs,
) -> anyhow::Result<Either<&'static auth::Backend<'static, ()>, &'static ConsoleRedirectBackend>> {
match &args.auth_backend {
AuthBackendType::ControlPlaneV1 => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
args.project_info_cache.parse()?;
let endpoint_cache_config: config::EndpointCacheConfig =
args.endpoint_cache_config.parse()?;
info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}");
info!(
"Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}"
);
info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}");
let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new(
wake_compute_cache_config,
project_info_cache_config,
endpoint_cache_config,
)));
let config::ConcurrencyLockOptions {
shards,
limiter,
epoch,
timeout,
} = args.wake_compute_lock.parse()?;
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new(
"wake_compute_lock",
limiter,
shards,
timeout,
epoch,
&Metrics::get().wake_compute_lock,
)?));
tokio::spawn(locks.garbage_collect_worker());
let url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),
caches,
locks,
wake_compute_endpoint_rate_limiter,
);
let api = control_plane::client::ControlPlaneClient::ProxyV1(api);
let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
let config = Box::leak(Box::new(auth_backend));
Ok(Either::Left(config))
}
AuthBackendType::ControlPlane => {
let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
let project_info_cache_config: ProjectInfoCacheOptions =
@@ -697,13 +792,15 @@ fn build_auth_backend(
)?));
tokio::spawn(locks.garbage_collect_worker());
let url = args.auth_endpoint.parse()?;
let url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;
let endpoint = http::Endpoint::new(url, http::new_client());
let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
let wake_compute_endpoint_rate_limiter =
Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
let api = control_plane::client::neon::NeonControlPlaneClient::new(
endpoint,
args.control_plane_token.clone(),

View File

@@ -0,0 +1,514 @@
//! Production console backend.
use std::sync::Arc;
use std::time::Duration;
use ::http::header::AUTHORIZATION;
use ::http::HeaderName;
use futures::TryFutureExt;
use postgres_client::config::SslMode;
use tokio::time::Instant;
use tracing::{debug, info, info_span, warn, Instrument};
use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute};
use crate::auth::backend::jwt::AuthRule;
use crate::auth::backend::ComputeUserInfo;
use crate::cache::Cached;
use crate::context::RequestContext;
use crate::control_plane::caches::ApiCaches;
use crate::control_plane::errors::{
ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError,
};
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, Reason};
use crate::control_plane::{
AuthInfo, AuthSecret, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, NodeInfo,
};
use crate::metrics::{CacheOutcome, Metrics};
use crate::rate_limiter::WakeComputeRateLimiter;
use crate::types::{EndpointCacheKey, EndpointId};
use crate::{compute, http, scram};
const X_REQUEST_ID: HeaderName = HeaderName::from_static("x-request-id");
#[derive(Clone)]
pub struct NeonControlPlaneClient {
endpoint: http::Endpoint,
pub caches: &'static ApiCaches,
pub(crate) locks: &'static ApiLocks<EndpointCacheKey>,
pub(crate) wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
// put in a shared ref so we don't copy secrets all over in memory
jwt: Arc<str>,
}
impl NeonControlPlaneClient {
/// Construct an API object containing the auth parameters.
pub fn new(
endpoint: http::Endpoint,
jwt: Arc<str>,
caches: &'static ApiCaches,
locks: &'static ApiLocks<EndpointCacheKey>,
wake_compute_endpoint_rate_limiter: Arc<WakeComputeRateLimiter>,
) -> Self {
Self {
endpoint,
caches,
locks,
wake_compute_endpoint_rate_limiter,
jwt,
}
}
pub(crate) fn url(&self) -> &str {
self.endpoint.url().as_str()
}
async fn do_get_auth_info(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<AuthInfo, GetAuthInfoError> {
if !self
.caches
.endpoints_cache
.is_valid(ctx, &user_info.endpoint.normalize())
{
// TODO: refactor this because it's weird
// this is a failure to authenticate but we return Ok.
info!("endpoint is not valid, skipping the request");
return Ok(AuthInfo::default());
}
let request_id = ctx.session_id().to_string();
let application_name = ctx.console_application_name();
async {
let request = self
.endpoint
.get_path("get_endpoint_access_control")
.header(X_REQUEST_ID, &request_id)
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
.query(&[("session_id", ctx.session_id())])
.query(&[
("application_name", application_name.as_str()),
("endpointish", user_info.endpoint.as_str()),
("role", user_info.user.as_str()),
])
.build()?;
debug!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = match parse_body::<GetEndpointAccessControl>(response).await {
Ok(body) => body,
// Error 404 is special: it's ok not to have a secret.
// TODO(anna): retry
Err(e) => {
return if e.get_reason().is_not_found() {
// TODO: refactor this because it's weird
// this is a failure to authenticate but we return Ok.
Ok(AuthInfo::default())
} else {
Err(e.into())
};
}
};
// Ivan: don't know where it will be used, so I leave it here
let _endpoint_vpc_ids = body.allowed_vpc_endpoint_ids.unwrap_or_default();
let secret = if body.role_secret.is_empty() {
None
} else {
let secret = scram::ServerSecret::parse(&body.role_secret)
.map(AuthSecret::Scram)
.ok_or(GetAuthInfoError::BadSecret)?;
Some(secret)
};
let allowed_ips = body.allowed_ips.unwrap_or_default();
Metrics::get()
.proxy
.allowed_ips_number
.observe(allowed_ips.len() as f64);
Ok(AuthInfo {
secret,
allowed_ips,
project_id: body.project_id,
})
}
.inspect_err(|e| tracing::debug!(error = ?e))
.instrument(info_span!("do_get_auth_info"))
.await
}
async fn do_get_endpoint_jwks(
&self,
ctx: &RequestContext,
endpoint: EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
if !self
.caches
.endpoints_cache
.is_valid(ctx, &endpoint.normalize())
{
return Err(GetEndpointJwksError::EndpointNotFound);
}
let request_id = ctx.session_id().to_string();
async {
let request = self
.endpoint
.get_with_url(|url| {
url.path_segments_mut()
.push("endpoints")
.push(endpoint.as_str())
.push("jwks");
})
.header(X_REQUEST_ID, &request_id)
.header(AUTHORIZATION, format!("Bearer {}", &self.jwt))
.query(&[("session_id", ctx.session_id())])
.build()
.map_err(GetEndpointJwksError::RequestBuild)?;
debug!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self
.endpoint
.execute(request)
.await
.map_err(GetEndpointJwksError::RequestExecute)?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<EndpointJwksResponse>(response).await?;
let rules = body
.jwks
.into_iter()
.map(|jwks| AuthRule {
id: jwks.id,
jwks_url: jwks.jwks_url,
audience: jwks.jwt_audience,
role_names: jwks.role_names,
})
.collect();
Ok(rules)
}
.inspect_err(|e| tracing::debug!(error = ?e))
.instrument(info_span!("do_get_endpoint_jwks"))
.await
}
async fn do_wake_compute(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<NodeInfo, WakeComputeError> {
let request_id = ctx.session_id().to_string();
let application_name = ctx.console_application_name();
async {
let mut request_builder = self
.endpoint
.get_path("wake_compute")
.header("X-Request-ID", &request_id)
.header("Authorization", format!("Bearer {}", &self.jwt))
.query(&[("session_id", ctx.session_id())])
.query(&[
("application_name", application_name.as_str()),
("endpointish", user_info.endpoint.as_str()),
]);
let options = user_info.options.to_deep_object();
if !options.is_empty() {
request_builder = request_builder.query(&options);
}
let request = request_builder.build()?;
debug!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Cplane);
let response = self.endpoint.execute(request).await?;
drop(pause);
info!(duration = ?start.elapsed(), "received http response");
let body = parse_body::<WakeCompute>(response).await?;
// Unfortunately, ownership won't let us use `Option::ok_or` here.
let (host, port) = match parse_host_port(&body.address) {
None => return Err(WakeComputeError::BadComputeAddress(body.address)),
Some(x) => x,
};
// Don't set anything but host and port! This config will be cached.
// We'll set username and such later using the startup message.
// TODO: add more type safety (in progress).
let mut config = compute::ConnCfg::new(host.to_owned(), port);
config.ssl_mode(SslMode::Disable); // TLS is not configured on compute nodes.
let node = NodeInfo {
config,
aux: body.aux,
allow_self_signed_compute: false,
};
Ok(node)
}
.inspect_err(|e| tracing::debug!(error = ?e))
.instrument(info_span!("do_wake_compute"))
.await
}
}
impl super::ControlPlaneApi for NeonControlPlaneClient {
#[tracing::instrument(skip_all)]
async fn get_role_secret(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, GetAuthInfoError> {
let normalized_ep = &user_info.endpoint.normalize();
let user = &user_info.user;
if let Some(role_secret) = self
.caches
.project_info
.get_role_secret(normalized_ep, user)
{
return Ok(role_secret);
}
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
if let Some(project_id) = auth_info.project_id {
let normalized_ep_int = normalized_ep.into();
self.caches.project_info.insert_role_secret(
project_id,
normalized_ep_int,
user.into(),
auth_info.secret.clone(),
);
self.caches.project_info.insert_allowed_ips(
project_id,
normalized_ep_int,
Arc::new(auth_info.allowed_ips),
);
ctx.set_project_id(project_id);
}
// When we just got a secret, we don't need to invalidate it.
Ok(Cached::new_uncached(auth_info.secret))
}
async fn get_allowed_ips_and_secret(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), GetAuthInfoError> {
let normalized_ep = &user_info.endpoint.normalize();
if let Some(allowed_ips) = self.caches.project_info.get_allowed_ips(normalized_ep) {
Metrics::get()
.proxy
.allowed_ips_cache_misses
.inc(CacheOutcome::Hit);
return Ok((allowed_ips, None));
}
Metrics::get()
.proxy
.allowed_ips_cache_misses
.inc(CacheOutcome::Miss);
let auth_info = self.do_get_auth_info(ctx, user_info).await?;
let allowed_ips = Arc::new(auth_info.allowed_ips);
let user = &user_info.user;
if let Some(project_id) = auth_info.project_id {
let normalized_ep_int = normalized_ep.into();
self.caches.project_info.insert_role_secret(
project_id,
normalized_ep_int,
user.into(),
auth_info.secret.clone(),
);
self.caches.project_info.insert_allowed_ips(
project_id,
normalized_ep_int,
allowed_ips.clone(),
);
ctx.set_project_id(project_id);
}
Ok((
Cached::new_uncached(allowed_ips),
Some(Cached::new_uncached(auth_info.secret)),
))
}
#[tracing::instrument(skip_all)]
async fn get_endpoint_jwks(
&self,
ctx: &RequestContext,
endpoint: EndpointId,
) -> Result<Vec<AuthRule>, GetEndpointJwksError> {
self.do_get_endpoint_jwks(ctx, endpoint).await
}
#[tracing::instrument(skip_all)]
async fn wake_compute(
&self,
ctx: &RequestContext,
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key = user_info.endpoint_cache_key();
macro_rules! check_cache {
() => {
if let Some(cached) = self.caches.node_info.get(&key) {
let (cached, info) = cached.take_value();
let info = info.map_err(|c| {
info!(key = &*key, "found cached wake_compute error");
WakeComputeError::ControlPlane(ControlPlaneError::Message(Box::new(*c)))
})?;
debug!(key = &*key, "found cached compute node info");
ctx.set_project(info.aux.clone());
return Ok(cached.map(|()| info));
}
};
}
// Every time we do a wakeup http request, the compute node will stay up
// for some time (highly depends on the console's scale-to-zero policy);
// The connection info remains the same during that period of time,
// which means that we might cache it to reduce the load and latency.
check_cache!();
let permit = self.locks.get_permit(&key).await?;
// after getting back a permit - it's possible the cache was filled
// double check
if permit.should_check_cache() {
// TODO: if there is something in the cache, mark the permit as success.
check_cache!();
}
// check rate limit
if !self
.wake_compute_endpoint_rate_limiter
.check(user_info.endpoint.normalize_intern(), 1)
{
return Err(WakeComputeError::TooManyConnections);
}
let node = permit.release_result(self.do_wake_compute(ctx, user_info).await);
match node {
Ok(node) => {
ctx.set_project(node.aux.clone());
debug!(key = &*key, "created a cache entry for woken compute node");
let mut stored_node = node.clone();
// store the cached node as 'warm_cached'
stored_node.aux.cold_start_info = ColdStartInfo::WarmCached;
let (_, cached) = self.caches.node_info.insert_unit(key, Ok(stored_node));
Ok(cached.map(|()| node))
}
Err(err) => match err {
WakeComputeError::ControlPlane(ControlPlaneError::Message(err)) => {
let Some(status) = &err.status else {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
};
let reason = status
.details
.error_info
.map_or(Reason::Unknown, |x| x.reason);
// if we can retry this error, do not cache it.
if reason.can_retry() {
return Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)));
}
// at this point, we should only have quota errors.
debug!(
key = &*key,
"created a cache entry for the wake compute error"
);
self.caches.node_info.insert_ttl(
key,
Err(err.clone()),
Duration::from_secs(30),
);
Err(WakeComputeError::ControlPlane(ControlPlaneError::Message(
err,
)))
}
err => return Err(err),
},
}
}
}
/// Parse http response body, taking status code into account.
async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
response: http::Response,
) -> Result<T, ControlPlaneError> {
let status = response.status();
if status.is_success() {
// We shouldn't log raw body because it may contain secrets.
info!("request succeeded, processing the body");
return Ok(response.json().await?);
}
let s = response.bytes().await?;
// Log plaintext to be able to detect, whether there are some cases not covered by the error struct.
info!("response_error plaintext: {:?}", s);
// Don't throw an error here because it's not as important
// as the fact that the request itself has failed.
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
warn!("failed to parse error body: {e}");
ControlPlaneErrorMessage {
error: "reason unclear (malformed error message)".into(),
http_status_code: status,
status: None,
}
});
body.http_status_code = status;
warn!("console responded with an error ({status}): {body:?}");
Err(ControlPlaneError::Message(Box::new(body)))
}
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.rsplit_once(':')?;
let ipv6_brackets: &[_] = &['[', ']'];
Some((host.trim_matches(ipv6_brackets), port.parse().ok()?))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_host_port_v4() {
let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 5432);
}
#[test]
fn test_parse_host_port_v6() {
let (host, port) = parse_host_port("[2001:db8::1]:5432").expect("failed to parse");
assert_eq!(host, "2001:db8::1");
assert_eq!(port, 5432);
}
#[test]
fn test_parse_host_port_url() {
let (host, port) = parse_host_port("compute-foo-bar-1234.default.svc.cluster.local:5432")
.expect("failed to parse");
assert_eq!(host, "compute-foo-bar-1234.default.svc.cluster.local");
assert_eq!(port, 5432);
}
}

View File

@@ -1,3 +1,4 @@
pub mod cplane_proxy_v1;
#[cfg(any(test, feature = "testing"))]
pub mod mock;
pub mod neon;
@@ -27,6 +28,8 @@ use crate::types::EndpointId;
#[non_exhaustive]
#[derive(Clone)]
pub enum ControlPlaneClient {
/// New Proxy V1 control plane API
ProxyV1(cplane_proxy_v1::NeonControlPlaneClient),
/// Current Management API (V2).
Neon(neon::NeonControlPlaneClient),
/// Local mock control plane.
@@ -45,6 +48,7 @@ impl ControlPlaneApi for ControlPlaneClient {
user_info: &ComputeUserInfo,
) -> Result<CachedRoleSecret, errors::GetAuthInfoError> {
match self {
Self::ProxyV1(api) => api.get_role_secret(ctx, user_info).await,
Self::Neon(api) => api.get_role_secret(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::PostgresMock(api) => api.get_role_secret(ctx, user_info).await,
@@ -61,6 +65,7 @@ impl ControlPlaneApi for ControlPlaneClient {
user_info: &ComputeUserInfo,
) -> Result<(CachedAllowedIps, Option<CachedRoleSecret>), errors::GetAuthInfoError> {
match self {
Self::ProxyV1(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
Self::Neon(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::PostgresMock(api) => api.get_allowed_ips_and_secret(ctx, user_info).await,
@@ -75,6 +80,7 @@ impl ControlPlaneApi for ControlPlaneClient {
endpoint: EndpointId,
) -> Result<Vec<AuthRule>, errors::GetEndpointJwksError> {
match self {
Self::ProxyV1(api) => api.get_endpoint_jwks(ctx, endpoint).await,
Self::Neon(api) => api.get_endpoint_jwks(ctx, endpoint).await,
#[cfg(any(test, feature = "testing"))]
Self::PostgresMock(api) => api.get_endpoint_jwks(ctx, endpoint).await,
@@ -89,6 +95,7 @@ impl ControlPlaneApi for ControlPlaneClient {
user_info: &ComputeUserInfo,
) -> Result<CachedNodeInfo, errors::WakeComputeError> {
match self {
Self::ProxyV1(api) => api.wake_compute(ctx, user_info).await,
Self::Neon(api) => api.wake_compute(ctx, user_info).await,
#[cfg(any(test, feature = "testing"))]
Self::PostgresMock(api) => api.wake_compute(ctx, user_info).await,

View File

@@ -1,4 +1,4 @@
//! Production console backend.
//! Stale console backend, remove after migrating to Proxy V1 API (#15245).
use std::sync::Arc;
use std::time::Duration;

View File

@@ -230,6 +230,16 @@ pub(crate) struct GetRoleSecret {
pub(crate) project_id: Option<ProjectIdInt>,
}
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
/// Returned by the `/get_endpoint_access_control` API method.
#[derive(Deserialize)]
pub(crate) struct GetEndpointAccessControl {
pub(crate) role_secret: Box<str>,
pub(crate) allowed_ips: Option<Vec<IpPattern>>,
pub(crate) project_id: Option<ProjectIdInt>,
pub(crate) allowed_vpc_endpoint_ids: Option<Vec<EndpointIdInt>>,
}
// Manually implement debug to omit sensitive info.
impl fmt::Debug for GetRoleSecret {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

View File

@@ -636,6 +636,13 @@ impl Persistence {
.into_boxed(),
};
// Clear generation_pageserver if we are moving into a state where we won't have
// any attached pageservers.
let input_generation_pageserver = match input_placement_policy {
None | Some(PlacementPolicy::Attached(_)) => None,
Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) => Some(None),
};
#[derive(AsChangeset)]
#[diesel(table_name = crate::schema::tenant_shards)]
struct ShardUpdate {
@@ -643,6 +650,7 @@ impl Persistence {
placement_policy: Option<String>,
config: Option<String>,
scheduling_policy: Option<String>,
generation_pageserver: Option<Option<i64>>,
}
let update = ShardUpdate {
@@ -655,6 +663,7 @@ impl Persistence {
.map(|c| serde_json::to_string(&c).unwrap()),
scheduling_policy: input_scheduling_policy
.map(|p| serde_json::to_string(&p).unwrap()),
generation_pageserver: input_generation_pageserver,
};
query.set(update).execute(conn)?;

View File

@@ -513,6 +513,9 @@ struct ShardUpdate {
/// If this is None, generation is not updated.
generation: Option<Generation>,
/// If this is None, scheduling policy is not updated.
scheduling_policy: Option<ShardSchedulingPolicy>,
}
enum StopReconciliationsReason {
@@ -789,7 +792,7 @@ impl Service {
node_list_futs.push({
async move {
tracing::info!("Scanning shards on node {node}...");
let timeout = Duration::from_secs(1);
let timeout = Duration::from_secs(5);
let response = node
.with_client_retries(
|client| async move { client.list_location_config().await },
@@ -2376,6 +2379,23 @@ impl Service {
}
};
// Ordinarily we do not update scheduling policy, but when making major changes
// like detaching or demoting to secondary-only, we need to force the scheduling
// mode to Active, or the caller's expected outcome (detach it) will not happen.
let scheduling_policy = match req.config.mode {
LocationConfigMode::Detached | LocationConfigMode::Secondary => {
// Special case: when making major changes like detaching or demoting to secondary-only,
// we need to force the scheduling mode to Active, or nothing will happen.
Some(ShardSchedulingPolicy::Active)
}
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// While attached, continue to respect whatever the existing scheduling mode is.
None
}
};
let mut create = true;
for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
// Saw an existing shard: this is not a creation
@@ -2401,6 +2421,7 @@ impl Service {
placement_policy: placement_policy.clone(),
tenant_config: req.config.tenant_conf.clone(),
generation: set_generation,
scheduling_policy,
});
}
@@ -2497,6 +2518,7 @@ impl Service {
placement_policy,
tenant_config,
generation,
scheduling_policy,
} in &updates
{
self.persistence
@@ -2505,7 +2527,7 @@ impl Service {
Some(placement_policy.clone()),
Some(tenant_config.clone()),
*generation,
None,
*scheduling_policy,
)
.await?;
}
@@ -2521,6 +2543,7 @@ impl Service {
placement_policy,
tenant_config,
generation: update_generation,
scheduling_policy,
} in updates
{
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
@@ -2539,6 +2562,10 @@ impl Service {
shard.generation = Some(generation);
}
if let Some(scheduling_policy) = scheduling_policy {
shard.set_scheduling_policy(scheduling_policy);
}
shard.schedule(scheduler, &mut schedule_context)?;
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
@@ -2992,9 +3019,17 @@ impl Service {
let TenantPolicyRequest {
placement,
scheduling,
mut scheduling,
} = req;
if let Some(PlacementPolicy::Detached | PlacementPolicy::Secondary) = placement {
// When someone configures a tenant to detach, we force the scheduling policy to enable
// this to take effect.
if scheduling.is_none() {
scheduling = Some(ShardSchedulingPolicy::Active);
}
}
self.persistence
.update_tenant_shard(
TenantFilter::Tenant(tenant_id),

View File

@@ -268,7 +268,7 @@ impl BucketConfig {
config.bucket_name, config.bucket_region
),
RemoteStorageKind::AzureContainer(config) => format!(
"bucket {}, storage account {:?}, region {}",
"container {}, storage account {:?}, region {}",
config.container_name, config.storage_account, config.container_region
),
}

View File

@@ -152,6 +152,8 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
"pageserver_resident_physical_size",
"pageserver_io_operations_bytes_total",
"pageserver_last_record_lsn",
"pageserver_disk_consistent_lsn",
"pageserver_projected_remote_consistent_lsn",
"pageserver_standby_horizon",
"pageserver_smgr_query_seconds_bucket",
"pageserver_smgr_query_seconds_count",
@@ -173,6 +175,8 @@ PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),
counter("pageserver_tenant_throttling_count"),
counter("pageserver_timeline_wal_records_received"),
counter("pageserver_page_service_pagestream_flush_in_progress_micros"),
*histogram("pageserver_page_service_batch_size"),
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# "pageserver_directory_entries_count", -- only used if above a certain threshold

View File

@@ -850,6 +850,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
force_repartition=False,
force_image_layer_creation=False,
force_l0_compaction=False,
wait_until_flushed=True,
wait_until_uploaded=False,
compact: bool | None = None,
**kwargs,
@@ -862,6 +863,8 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
query["force_image_layer_creation"] = "true"
if force_l0_compaction:
query["force_l0_compaction"] = "true"
if not wait_until_flushed:
query["wait_until_flushed"] = "false"
if wait_until_uploaded:
query["wait_until_uploaded"] = "true"
@@ -869,7 +872,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
query["compact"] = "true" if compact else "false"
log.info(
f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}, wait_until_uploaded={wait_until_uploaded}"
f"Requesting checkpoint: tenant={tenant_id} timeline={timeline_id} wait_until_flushed={wait_until_flushed} wait_until_uploaded={wait_until_uploaded} compact={compact}"
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",

View File

@@ -54,23 +54,15 @@ def wait_for_upload(
tenant: TenantId | TenantShardId,
timeline: TimelineId,
lsn: Lsn,
timeout=20,
):
"""waits for local timeline upload up to specified lsn"""
"""Waits for local timeline upload up to specified LSN"""
current_lsn = Lsn(0)
for i in range(20):
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
log.info("wait finished")
return
lr_lsn = last_record_lsn(pageserver_http, tenant, timeline)
log.info(
f"waiting for remote_consistent_lsn to reach {lsn}, now {current_lsn}, last_record_lsn={lr_lsn}, iteration {i + 1}"
)
time.sleep(1)
raise Exception(
f"timed out while waiting for {tenant}/{timeline} remote_consistent_lsn to reach {lsn}, was {current_lsn}"
)
def is_uploaded():
remote_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
assert remote_lsn >= lsn, f"remote_consistent_lsn at {remote_lsn}"
wait_until(is_uploaded, name=f"upload to {lsn}", timeout=timeout)
def _tenant_in_expected_state(tenant_info: dict[str, Any], expected_state: str):

View File

@@ -0,0 +1,142 @@
from __future__ import annotations
import random
from concurrent.futures import ThreadPoolExecutor
import pytest
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import (
wait_for_last_record_lsn,
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import s3_storage
@pytest.mark.timeout(900)
@pytest.mark.parametrize("size", [8, 1024, 8192])
@pytest.mark.parametrize("s3", [True, False], ids=["s3", "local"])
@pytest.mark.parametrize("backpressure", [True, False], ids=["backpressure", "nobackpressure"])
@pytest.mark.parametrize("fsync", [True, False], ids=["fsync", "nofsync"])
def test_ingest_insert_bulk(
request: pytest.FixtureRequest,
neon_env_builder: NeonEnvBuilder,
zenbenchmark: NeonBenchmarker,
fsync: bool,
backpressure: bool,
s3: bool,
size: int,
):
"""
Benchmarks ingestion of 5 GB of sequential insert WAL. Measures ingestion and S3 upload
separately. Also does a Safekeeper→Pageserver re-ingestion to measure Pageserver ingestion in
isolation.
"""
CONCURRENCY = 1 # 1 is optimal without fsync or backpressure
VOLUME = 5 * 1024**3
rows = VOLUME // (size + 64) # +64 roughly accounts for per-row WAL overhead
neon_env_builder.safekeepers_enable_fsync = fsync
if s3:
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
# NB: don't use S3 for Safekeeper. It doesn't affect throughput (no backpressure), but it
# would compete with Pageserver for bandwidth.
# neon_env_builder.enable_safekeeper_remote_storage(s3_storage())
neon_env_builder.disable_scrub_on_exit() # immediate shutdown may leave stray layers
env = neon_env_builder.init_start()
endpoint = env.endpoints.create_start(
"main",
config_lines=[
f"fsync = {fsync}",
"max_replication_apply_lag = 0",
f"max_replication_flush_lag = {'10GB' if backpressure else '0'}",
# NB: neon_local defaults to 15MB, which is too slow -- production uses 500MB.
f"max_replication_write_lag = {'500MB' if backpressure else '0'}",
],
)
endpoint.safe_psql("create extension neon")
# Wait for the timeline to be propagated to the pageserver.
wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, env.initial_timeline)
# Ingest rows.
log.info("Ingesting data")
start_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
def insert_rows(endpoint, table, count, value):
with endpoint.connect().cursor() as cur:
cur.execute("set statement_timeout = 0")
cur.execute(f"create table {table} (id int, data bytea)")
cur.execute(f"insert into {table} values (generate_series(1, {count}), %s)", (value,))
with zenbenchmark.record_duration("upload"):
with zenbenchmark.record_duration("ingest"):
with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
for i in range(CONCURRENCY):
# Write a random value for all rows. This is sufficient to prevent compression,
# e.g. in TOAST. Randomly generating every row is too slow.
value = random.randbytes(size)
worker_rows = rows / CONCURRENCY
pool.submit(insert_rows, endpoint, f"table{i}", worker_rows, value)
end_lsn = Lsn(endpoint.safe_psql("select pg_current_wal_lsn()")[0][0])
# Wait for pageserver to ingest the WAL.
client = env.pageserver.http_client()
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
# Wait for pageserver S3 upload. Checkpoint to flush the last in-memory layer.
client.timeline_checkpoint(
env.initial_tenant,
env.initial_timeline,
compact=False,
wait_until_flushed=False,
)
wait_for_upload(client, env.initial_tenant, env.initial_timeline, end_lsn, timeout=600)
# Empty out upload queue for next benchmark.
wait_for_upload_queue_empty(client, env.initial_tenant, env.initial_timeline)
backpressure_time = endpoint.safe_psql("select backpressure_throttling_time()")[0][0]
# Now that all data is ingested, delete and recreate the tenant in the pageserver. This will
# reingest all the WAL directly from the safekeeper. This gives us a baseline of how fast the
# pageserver can ingest this WAL in isolation.
status = env.storage_controller.inspect(tenant_shard_id=env.initial_tenant)
assert status is not None
endpoint.stop() # avoid spurious getpage errors
client.tenant_delete(env.initial_tenant)
env.pageserver.tenant_create(tenant_id=env.initial_tenant, generation=status[0])
with zenbenchmark.record_duration("recover"):
log.info("Recovering WAL into pageserver")
client.timeline_create(env.pg_version, env.initial_tenant, env.initial_timeline)
wait_for_last_record_lsn(client, env.initial_tenant, env.initial_timeline, end_lsn)
# Emit metrics.
wal_written_mb = round((end_lsn - start_lsn) / (1024 * 1024))
zenbenchmark.record("wal_written", wal_written_mb, "MB", MetricReport.TEST_PARAM)
zenbenchmark.record("row_count", rows, "rows", MetricReport.TEST_PARAM)
zenbenchmark.record("concurrency", CONCURRENCY, "clients", MetricReport.TEST_PARAM)
zenbenchmark.record(
"backpressure_time", backpressure_time // 1000, "ms", MetricReport.LOWER_IS_BETTER
)
props = {p["name"]: p["value"] for _, p in request.node.user_properties}
for name in ("ingest", "upload", "recover"):
throughput = int(wal_written_mb / props[name])
zenbenchmark.record(f"{name}_throughput", throughput, "MB/s", MetricReport.HIGHER_IS_BETTER)
# Pageserver shutdown will likely get stuck on the upload queue, just shut it down immediately.
env.stop(immediate=True)

View File

@@ -15,7 +15,7 @@ from fixtures.pageserver.http import PageserverApiException
from fixtures.utils import skip_in_debug_build, wait_until
from fixtures.workload import Workload
AGGRESIVE_COMPACTION_TENANT_CONF = {
AGGRESSIVE_COMPACTION_TENANT_CONF = {
# Disable gc and compaction. The test runs compaction manually.
"gc_period": "0s",
"compaction_period": "0s",
@@ -24,6 +24,7 @@ AGGRESIVE_COMPACTION_TENANT_CONF = {
# Compact small layers
"compaction_target_size": 1024**2,
"image_creation_threshold": 2,
# "lsn_lease_length": "0s", -- TODO: would cause branch creation errors, should fix later
}
@@ -51,7 +52,7 @@ def test_pageserver_compaction_smoke(
page_cache_size=10
"""
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESSIVE_COMPACTION_TENANT_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
@@ -120,14 +121,28 @@ page_cache_size=10
assert vectored_average < 8
@pytest.mark.skip(
"This is being fixed and tracked in https://github.com/neondatabase/neon/issues/9114"
)
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start(initial_tenant_conf=AGGRESIVE_COMPACTION_TENANT_CONF)
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
"compaction_period": "5s",
# No PiTR interval and small GC horizon
"pitr_interval": "0s",
"gc_horizon": f"{1024 ** 2}",
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 1000
churn_rounds = 10
row_count = 10000
churn_rounds = 50
ps_http = env.pageserver.http_client()
@@ -141,20 +156,28 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id)
# Force L0 compaction to ensure the number of layers is within bounds, so that gc-compaction can run.
ps_http.timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
assert ps_http.perf_info(tenant_id, timeline_id)[0]["num_of_l0"] <= 1
ps_http.timeline_compact(
tenant_id,
timeline_id,
enhanced_gc_bottom_most_compaction=True,
body={
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
"scheduled": True,
"sub_compaction": True,
"compact_range": {
"start": "000000000000000000000000000000000000",
# skip the SLRU range for now -- it races with get-lsn-by-timestamp, TODO: fix this
"end": "010000000000000000000000000000000000",
},
},
)
workload.churn_rows(row_count, env.pageserver.id)
# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
env.pageserver.assert_log_contains(
"scheduled_compact_timeline.*picked .* layers for compaction"
)
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)

View File

@@ -215,7 +215,7 @@ if SQL_EXPORTER is None:
#
# The "host" network mode allows sql_exporter to talk to the
# endpoint which is running on the host.
super().__init__("docker.io/burningalchemist/sql_exporter:0.13.1", network_mode="host")
super().__init__("docker.io/burningalchemist/sql_exporter:0.16.0", network_mode="host")
self.__logs_dir = logs_dir
self.__port = port

View File

@@ -3230,3 +3230,55 @@ def test_multi_attached_timeline_creation(neon_env_builder: NeonEnvBuilder, migr
# Always disable 'pause' failpoints, even on failure, to avoid hanging in shutdown
env.storage_controller.configure_failpoints((migration_failpoint.value, "off"))
raise
@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_detached_stopped(
neon_env_builder: NeonEnvBuilder,
):
"""
Test that detaching a tenant while it has scheduling policy set to Paused or Stop works
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_configs()
env.start()
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id,
shard_count=1,
)
assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
# Disable scheduling: ordinarily this would prevent the tenant's configuration being
# reconciled to pageservers, but this should be overridden when detaching.
env.storage_controller.allowed_errors.append(".*Scheduling is disabled by policy.*")
env.storage_controller.tenant_policy_update(
tenant_id,
{"scheduling": "Stop"},
)
env.storage_controller.consistency_check()
# Detach the tenant
virtual_ps_http.tenant_location_conf(
tenant_id,
{
"mode": "Detached",
"secondary_conf": None,
"tenant_conf": {},
"generation": None,
},
)
env.storage_controller.consistency_check()
# Confirm the detach happened
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []

View File

@@ -33,6 +33,7 @@ deranged = { version = "0.3", default-features = false, features = ["powerfmt",
digest = { version = "0.10", features = ["mac", "oid", "std"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
form_urlencoded = { version = "1" }
futures-channel = { version = "0.3", features = ["sink"] }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
@@ -78,6 +79,7 @@ sha2 = { version = "0.10", features = ["asm", "oid"] }
signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
spki = { version = "0.7", default-features = false, features = ["pem", "std"] }
stable_deref_trait = { version = "1" }
subtle = { version = "2" }
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
tikv-jemalloc-ctl = { version = "0.6", features = ["stats", "use_std"] }
@@ -105,6 +107,7 @@ anyhow = { version = "1", features = ["backtrace"] }
bytes = { version = "1", features = ["serde"] }
cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
displaydoc = { version = "0.2" }
either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
half = { version = "2", default-features = false, features = ["num-traits"] }