mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-25 09:00:37 +00:00
Compare commits
15 Commits
skyzh/forc
...
skyzh/upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
982a65703e | ||
|
|
13b5e7b26f | ||
|
|
dcdfe80bf0 | ||
|
|
8630d37f5e | ||
|
|
2fc77c836b | ||
|
|
2c6b327be6 | ||
|
|
be5bbaecad | ||
|
|
d33b3c7457 | ||
|
|
ffeede085e | ||
|
|
bdca5b500b | ||
|
|
f4b03ddd7b | ||
|
|
08b19f001c | ||
|
|
1a45b2ec90 | ||
|
|
13e38a58a1 | ||
|
|
2edd59aefb |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -4294,6 +4294,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_client_grpc",
|
||||
"pageserver_page_api",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
@@ -4323,6 +4324,7 @@ dependencies = [
|
||||
"pageserver_api",
|
||||
"postgres_ffi",
|
||||
"remote_storage",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"svg_fmt",
|
||||
"thiserror 1.0.69",
|
||||
@@ -4499,6 +4501,7 @@ name = "pageserver_client_grpc"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arc-swap",
|
||||
"bytes",
|
||||
"compute_api",
|
||||
"futures",
|
||||
@@ -4506,6 +4509,7 @@ dependencies = [
|
||||
"pageserver_page_api",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tracing",
|
||||
"utils",
|
||||
|
||||
@@ -262,6 +262,7 @@ neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
|
||||
pageserver = { path = "./pageserver" }
|
||||
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
|
||||
pageserver_client = { path = "./pageserver/client" }
|
||||
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
|
||||
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
|
||||
pageserver_page_api = { path = "./pageserver/page_api" }
|
||||
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
|
||||
@@ -1805,6 +1805,8 @@ impl ComputeNode {
|
||||
tls_config,
|
||||
)?;
|
||||
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
if !spec.skip_pg_catalog_updates {
|
||||
let max_concurrent_connections = spec.reconfigure_concurrency;
|
||||
// Temporarily reset max_cluster_size in config
|
||||
@@ -1824,10 +1826,9 @@ impl ComputeNode {
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
self.pg_reload_conf()?;
|
||||
}
|
||||
|
||||
self.pg_reload_conf()?;
|
||||
|
||||
let unknown_op = "unknown".to_string();
|
||||
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
|
||||
info!(
|
||||
|
||||
@@ -108,7 +108,7 @@ pub(crate) static LFC_PREWARMS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
pub(crate) static LFC_PREWARM_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"compute_ctl_lfc_prewarm_errors_total",
|
||||
"Total number of LFC prewarms errors requested by compute_ctl or autoprewarm option",
|
||||
"Total number of LFC prewarm errors",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
@@ -124,7 +124,7 @@ pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
pub(crate) static LFC_OFFLOAD_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"compute_ctl_lfc_offload_errors_total",
|
||||
"Total number of LFC offload errors requested by compute_ctl or lfc_offload_period_seconds option",
|
||||
"Total number of LFC offload errors",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
GRANT pg_signal_backend TO neon_superuser WITH ADMIN OPTION;
|
||||
@@ -0,0 +1,23 @@
|
||||
DO $$
|
||||
DECLARE
|
||||
signal_backend record;
|
||||
BEGIN
|
||||
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
|
||||
admin_option AS admin
|
||||
INTO signal_backend
|
||||
FROM pg_auth_members
|
||||
WHERE roleid = 'pg_signal_backend'::regrole
|
||||
AND member = 'neon_superuser'::regrole;
|
||||
|
||||
IF signal_backend IS NULL THEN
|
||||
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_signal_backend';
|
||||
END IF;
|
||||
|
||||
IF signal_backend.member IS NULL OR NOT signal_backend.member THEN
|
||||
RAISE EXCEPTION 'neon_superuser is not a member of pg_signal_backend';
|
||||
END IF;
|
||||
|
||||
IF signal_backend.admin IS NULL OR NOT signal_backend.admin THEN
|
||||
RAISE EXCEPTION 'neon_superuser cannot grant pg_signal_backend';
|
||||
END IF;
|
||||
END $$;
|
||||
@@ -197,6 +197,7 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> {
|
||||
include_str!(
|
||||
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
|
||||
),
|
||||
include_str!("./migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql"),
|
||||
];
|
||||
|
||||
MigrationRunner::new(client, &migrations)
|
||||
|
||||
@@ -452,6 +452,12 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<usize>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
|
||||
// HADRON
|
||||
image_layer_force_creation_period: settings
|
||||
.remove("image_layer_force_creation_period")
|
||||
.map(humantime::parse_duration)
|
||||
.transpose()
|
||||
.context("Failed to parse 'image_layer_force_creation_period' as duration")?,
|
||||
image_layer_creation_check_threshold: settings
|
||||
.remove("image_layer_creation_check_threshold")
|
||||
.map(|x| x.parse::<u8>())
|
||||
|
||||
@@ -20,6 +20,7 @@ use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::{Instrument, debug, info, info_span, warn};
|
||||
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
|
||||
use utils::metrics_collector::{METRICS_COLLECTOR, METRICS_STALE_MILLIS};
|
||||
|
||||
use crate::error::{ApiError, api_error_handler, route_error_handler};
|
||||
use crate::request::{get_query_param, parse_query_param};
|
||||
@@ -250,9 +251,28 @@ impl std::io::Write for ChannelWriter {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
|
||||
pub async fn prometheus_metrics_handler(
|
||||
req: Request<Body>,
|
||||
force_metric_collection_on_scrape: bool,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
SERVE_METRICS_COUNT.inc();
|
||||
|
||||
// HADRON
|
||||
let requested_use_latest = parse_query_param(&req, "use_latest")?;
|
||||
|
||||
let use_latest = match requested_use_latest {
|
||||
None => force_metric_collection_on_scrape,
|
||||
Some(true) => true,
|
||||
Some(false) => {
|
||||
if force_metric_collection_on_scrape {
|
||||
// We don't cache in this case
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let started_at = std::time::Instant::now();
|
||||
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
@@ -277,12 +297,18 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
|
||||
let _span = span.entered();
|
||||
|
||||
let metrics = metrics::gather();
|
||||
// HADRON
|
||||
let collected = if use_latest {
|
||||
// Skip caching the results if we always force metric collection on scrape.
|
||||
METRICS_COLLECTOR.run_once(!force_metric_collection_on_scrape)
|
||||
} else {
|
||||
METRICS_COLLECTOR.last_collected()
|
||||
};
|
||||
|
||||
let gathered_at = std::time::Instant::now();
|
||||
|
||||
let res = encoder
|
||||
.encode(&metrics, &mut writer)
|
||||
.encode(&collected.metrics, &mut writer)
|
||||
.and_then(|_| writer.flush().map_err(|e| e.into()));
|
||||
|
||||
// this instant is not when we finally got the full response sent, sending is done by hyper
|
||||
@@ -295,6 +321,10 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
let encoded_in = encoded_at - gathered_at - writer.wait_time();
|
||||
let total = encoded_at - started_at;
|
||||
|
||||
// HADRON
|
||||
let staleness_ms = (encoded_at - collected.collected_at).as_millis();
|
||||
METRICS_STALE_MILLIS.set(staleness_ms as i64);
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
tracing::info!(
|
||||
@@ -303,6 +333,7 @@ pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<
|
||||
spawning_ms = spawned_in.as_millis(),
|
||||
collection_ms = collected_in.as_millis(),
|
||||
encoding_ms = encoded_in.as_millis(),
|
||||
stalenss_ms = staleness_ms,
|
||||
"responded /metrics"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -272,6 +272,9 @@ pub struct ConfigToml {
|
||||
pub timeline_import_config: TimelineImportConfig,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_layer_generation_large_timeline_threshold: Option<u64>,
|
||||
pub force_metric_collection_on_scrape: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -561,6 +564,11 @@ pub struct TenantConfigToml {
|
||||
pub gc_period: Duration,
|
||||
// Delta layer churn threshold to create L1 image layers.
|
||||
pub image_creation_threshold: usize,
|
||||
// HADRON
|
||||
// When the timeout is reached, PageServer will (1) force compact any remaining L0 deltas and
|
||||
// (2) create image layers if there are any L1 deltas.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub image_layer_force_creation_period: Option<Duration>,
|
||||
// Determines how much history is retained, to allow
|
||||
// branching and read replicas at an older point in time.
|
||||
// The unit is time.
|
||||
@@ -823,6 +831,8 @@ impl Default for ConfigToml {
|
||||
},
|
||||
basebackup_cache_config: None,
|
||||
posthog_config: None,
|
||||
image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024),
|
||||
force_metric_collection_on_scrape: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -916,6 +926,7 @@ impl Default for TenantConfigToml {
|
||||
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
|
||||
.expect("cannot parse default gc period"),
|
||||
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
|
||||
image_layer_force_creation_period: None,
|
||||
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
|
||||
.expect("cannot parse default PITR interval"),
|
||||
walreceiver_connect_timeout: humantime::parse_duration(
|
||||
|
||||
@@ -597,6 +597,9 @@ pub struct TenantConfigPatch {
|
||||
pub gc_period: FieldPatch<String>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub image_creation_threshold: FieldPatch<usize>,
|
||||
// HADRON
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub image_layer_force_creation_period: FieldPatch<String>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
pub pitr_interval: FieldPatch<String>,
|
||||
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
|
||||
@@ -700,6 +703,11 @@ pub struct TenantConfig {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_creation_threshold: Option<usize>,
|
||||
|
||||
// HADRON
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub image_layer_force_creation_period: Option<Duration>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub pitr_interval: Option<Duration>,
|
||||
@@ -798,6 +806,7 @@ impl TenantConfig {
|
||||
mut gc_horizon,
|
||||
mut gc_period,
|
||||
mut image_creation_threshold,
|
||||
mut image_layer_force_creation_period,
|
||||
mut pitr_interval,
|
||||
mut walreceiver_connect_timeout,
|
||||
mut lagging_wal_timeout,
|
||||
@@ -861,6 +870,11 @@ impl TenantConfig {
|
||||
patch
|
||||
.image_creation_threshold
|
||||
.apply(&mut image_creation_threshold);
|
||||
// HADRON
|
||||
patch
|
||||
.image_layer_force_creation_period
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
.apply(&mut image_layer_force_creation_period);
|
||||
patch
|
||||
.pitr_interval
|
||||
.map(|v| humantime::parse_duration(&v))?
|
||||
@@ -942,6 +956,7 @@ impl TenantConfig {
|
||||
gc_horizon,
|
||||
gc_period,
|
||||
image_creation_threshold,
|
||||
image_layer_force_creation_period,
|
||||
pitr_interval,
|
||||
walreceiver_connect_timeout,
|
||||
lagging_wal_timeout,
|
||||
@@ -1016,6 +1031,9 @@ impl TenantConfig {
|
||||
image_creation_threshold: self
|
||||
.image_creation_threshold
|
||||
.unwrap_or(global_conf.image_creation_threshold),
|
||||
image_layer_force_creation_period: self
|
||||
.image_layer_force_creation_period
|
||||
.or(global_conf.image_layer_force_creation_period),
|
||||
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
|
||||
walreceiver_connect_timeout: self
|
||||
.walreceiver_connect_timeout
|
||||
|
||||
@@ -99,6 +99,8 @@ pub mod elapsed_accum;
|
||||
#[cfg(target_os = "linux")]
|
||||
pub mod linux_socket_ioctl;
|
||||
|
||||
pub mod metrics_collector;
|
||||
|
||||
// Re-export used in macro. Avoids adding git-version as dep in target crates.
|
||||
#[doc(hidden)]
|
||||
pub use git_version;
|
||||
|
||||
75
libs/utils/src/metrics_collector.rs
Normal file
75
libs/utils/src/metrics_collector.rs
Normal file
@@ -0,0 +1,75 @@
|
||||
use std::{
|
||||
sync::{Arc, RwLock},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use metrics::{IntGauge, proto::MetricFamily, register_int_gauge};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
pub static METRICS_STALE_MILLIS: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"metrics_metrics_stale_milliseconds",
|
||||
"The current metrics stale time in milliseconds"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct CollectedMetrics {
|
||||
pub metrics: Vec<MetricFamily>,
|
||||
pub collected_at: Instant,
|
||||
}
|
||||
|
||||
impl CollectedMetrics {
|
||||
fn new(metrics: Vec<MetricFamily>) -> Self {
|
||||
Self {
|
||||
metrics,
|
||||
collected_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MetricsCollector {
|
||||
last_collected: RwLock<Arc<CollectedMetrics>>,
|
||||
}
|
||||
|
||||
impl MetricsCollector {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
last_collected: RwLock::new(Arc::new(CollectedMetrics::new(vec![]))),
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(name = "metrics_collector", skip_all)]
|
||||
pub fn run_once(&self, cache_metrics: bool) -> Arc<CollectedMetrics> {
|
||||
let started = Instant::now();
|
||||
let metrics = metrics::gather();
|
||||
let collected = Arc::new(CollectedMetrics::new(metrics));
|
||||
if cache_metrics {
|
||||
let mut guard = self.last_collected.write().unwrap();
|
||||
*guard = collected.clone();
|
||||
}
|
||||
tracing::info!(
|
||||
"Collected {} metric families in {} ms",
|
||||
collected.metrics.len(),
|
||||
started.elapsed().as_millis()
|
||||
);
|
||||
collected
|
||||
}
|
||||
|
||||
pub fn last_collected(&self) -> Arc<CollectedMetrics> {
|
||||
self.last_collected.read().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for MetricsCollector {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// Interval for metrics collection. Currently hard-coded to be the same as the metrics scape interval from the obs agent
|
||||
pub static METRICS_COLLECTION_INTERVAL: Duration = Duration::from_secs(30);
|
||||
|
||||
pub static METRICS_COLLECTOR: Lazy<MetricsCollector> = Lazy::new(MetricsCollector::default);
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::error::Error as _;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -251,6 +251,70 @@ impl Client {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn tenant_timeline_compact(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
force_image_layer_creation: bool,
|
||||
must_force_image_layer_creation: bool,
|
||||
scheduled: bool,
|
||||
wait_until_done: bool,
|
||||
) -> Result<()> {
|
||||
let mut path = reqwest::Url::parse(&format!(
|
||||
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/compact",
|
||||
self.mgmt_api_endpoint
|
||||
))
|
||||
.expect("Cannot build URL");
|
||||
|
||||
if force_image_layer_creation {
|
||||
path.query_pairs_mut()
|
||||
.append_pair("force_image_layer_creation", "true");
|
||||
}
|
||||
|
||||
if must_force_image_layer_creation {
|
||||
path.query_pairs_mut()
|
||||
.append_pair("must_force_image_layer_creation", "true");
|
||||
}
|
||||
|
||||
if scheduled {
|
||||
path.query_pairs_mut().append_pair("scheduled", "true");
|
||||
}
|
||||
if wait_until_done {
|
||||
path.query_pairs_mut()
|
||||
.append_pair("wait_until_scheduled_compaction_done", "true");
|
||||
path.query_pairs_mut()
|
||||
.append_pair("wait_until_uploaded", "true");
|
||||
}
|
||||
self.request(Method::PUT, path, ()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
pub async fn tenant_timeline_describe(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
) -> Result<TimelineInfo> {
|
||||
let mut path = reqwest::Url::parse(&format!(
|
||||
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
|
||||
self.mgmt_api_endpoint
|
||||
))
|
||||
.expect("Cannot build URL");
|
||||
path.query_pairs_mut()
|
||||
.append_pair("include-image-consistent-lsn", "true");
|
||||
|
||||
let response: reqwest::Response = self.request(Method::GET, path, ()).await?;
|
||||
let body = response.json().await.map_err(Error::ReceiveBody)?;
|
||||
Ok(body)
|
||||
}
|
||||
|
||||
pub async fn list_tenant_visible_size(&self) -> Result<BTreeMap<TenantShardId, u64>> {
|
||||
let uri = format!("{}/v1/list_tenant_visible_size", self.mgmt_api_endpoint);
|
||||
let resp = self.get(&uri).await?;
|
||||
resp.json().await.map_err(Error::ReceiveBody)
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
pub async fn tenant_scan_remote_storage(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
|
||||
@@ -4,8 +4,12 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
testing = ["pageserver_api/testing"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
arc-swap.workspace = true
|
||||
bytes.workspace = true
|
||||
compute_api.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -13,6 +17,7 @@ pageserver_api.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tonic.workspace = true
|
||||
tracing.workspace = true
|
||||
utils.workspace = true
|
||||
|
||||
@@ -3,8 +3,10 @@ use std::num::NonZero;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use arc_swap::ArcSwap;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{FutureExt as _, StreamExt as _};
|
||||
use tonic::codec::CompressionEncoding;
|
||||
use tracing::instrument;
|
||||
|
||||
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
|
||||
@@ -55,28 +57,85 @@ const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
|
||||
/// TODO: this client does not support base backups or LSN leases, as these are only used by
|
||||
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
|
||||
pub struct PageserverClient {
|
||||
// TODO: support swapping out the shard map, e.g. via an ArcSwap.
|
||||
shards: Shards,
|
||||
/// The tenant ID.
|
||||
tenant_id: TenantId,
|
||||
/// The timeline ID.
|
||||
timeline_id: TimelineId,
|
||||
/// The JWT auth token for this tenant, if any.
|
||||
auth_token: Option<String>,
|
||||
/// The compression to use, if any.
|
||||
compression: Option<CompressionEncoding>,
|
||||
/// The shards for this tenant.
|
||||
shards: ArcSwap<Shards>,
|
||||
/// The retry configuration.
|
||||
retry: Retry,
|
||||
}
|
||||
|
||||
impl PageserverClient {
|
||||
/// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
|
||||
/// in the shard map, which must be complete and must use gRPC URLs.
|
||||
/// in the shard spec, which must be complete and must use gRPC URLs.
|
||||
pub fn new(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
shard_map: HashMap<ShardIndex, String>,
|
||||
stripe_size: ShardStripeSize,
|
||||
shard_spec: ShardSpec,
|
||||
auth_token: Option<String>,
|
||||
compression: Option<CompressionEncoding>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
|
||||
let shards = Shards::new(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard_spec,
|
||||
auth_token.clone(),
|
||||
compression,
|
||||
)?;
|
||||
Ok(Self {
|
||||
shards,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
auth_token,
|
||||
compression,
|
||||
shards: ArcSwap::new(Arc::new(shards)),
|
||||
retry: Retry,
|
||||
})
|
||||
}
|
||||
|
||||
/// Updates the shards from the given shard spec. In-flight requests will complete using the
|
||||
/// existing shards, but may retry with the new shards if they fail.
|
||||
///
|
||||
/// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
|
||||
/// properly spun down and dropped afterwards.
|
||||
pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
|
||||
// Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
|
||||
// with concurrent updates, but that involves creating a new `Shards` on every attempt,
|
||||
// which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
|
||||
// in the stack, and if they're violated then we already have problems elsewhere, so a
|
||||
// best-effort but possibly-racy check is okay here.
|
||||
let old = self.shards.load_full();
|
||||
if shard_spec.count < old.count {
|
||||
return Err(anyhow!(
|
||||
"can't reduce shard count from {} to {}",
|
||||
old.count,
|
||||
shard_spec.count
|
||||
));
|
||||
}
|
||||
if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
|
||||
return Err(anyhow!(
|
||||
"can't change stripe size from {} to {}",
|
||||
old.stripe_size,
|
||||
shard_spec.stripe_size
|
||||
));
|
||||
}
|
||||
|
||||
let shards = Shards::new(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
shard_spec,
|
||||
self.auth_token.clone(),
|
||||
self.compression,
|
||||
)?;
|
||||
self.shards.store(Arc::new(shards));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns whether a relation exists.
|
||||
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
|
||||
pub async fn check_rel_exists(
|
||||
@@ -86,7 +145,7 @@ impl PageserverClient {
|
||||
self.retry
|
||||
.with(async || {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let mut client = self.shards.get_zero().client().await?;
|
||||
let mut client = self.shards.load_full().get_zero().client().await?;
|
||||
client.check_rel_exists(req).await
|
||||
})
|
||||
.await
|
||||
@@ -101,7 +160,7 @@ impl PageserverClient {
|
||||
self.retry
|
||||
.with(async || {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let mut client = self.shards.get_zero().client().await?;
|
||||
let mut client = self.shards.load_full().get_zero().client().await?;
|
||||
client.get_db_size(req).await
|
||||
})
|
||||
.await
|
||||
@@ -129,28 +188,42 @@ impl PageserverClient {
|
||||
return Err(tonic::Status::invalid_argument("no block number"));
|
||||
}
|
||||
|
||||
// The shards may change while we're fetching pages. We execute the request using a stable
|
||||
// view of the shards (especially important for requests that span shards), but retry the
|
||||
// top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
|
||||
// retries and re-splits in some cases where requests span shards, but these are expected to
|
||||
// be rare.
|
||||
//
|
||||
// TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
|
||||
// once we figure out how to handle these.
|
||||
self.retry
|
||||
.with(async || Self::get_page_with_shards(req.clone(), &self.shards.load_full()).await)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
|
||||
/// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
|
||||
async fn get_page_with_shards(
|
||||
req: page_api::GetPageRequest,
|
||||
shards: &Shards,
|
||||
) -> tonic::Result<page_api::GetPageResponse> {
|
||||
// Fast path: request is for a single shard.
|
||||
if let Some(shard_id) =
|
||||
GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
|
||||
GetPageSplitter::is_single_shard(&req, shards.count, shards.stripe_size)
|
||||
{
|
||||
return self.get_page_for_shard(shard_id, req).await;
|
||||
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
|
||||
}
|
||||
|
||||
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
|
||||
// reassemble the responses.
|
||||
//
|
||||
// TODO: when we support shard map updates, we need to detect when it changes and re-split
|
||||
// the request on errors.
|
||||
let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
|
||||
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
|
||||
|
||||
let mut shard_requests: FuturesUnordered<_> = splitter
|
||||
.drain_requests()
|
||||
.map(|(shard_id, shard_req)| {
|
||||
// NB: each request will retry internally.
|
||||
self.get_page_for_shard(shard_id, shard_req)
|
||||
.map(move |result| result.map(|resp| (shard_id, resp)))
|
||||
})
|
||||
.collect();
|
||||
let mut shard_requests = FuturesUnordered::new();
|
||||
for (shard_id, shard_req) in splitter.drain_requests() {
|
||||
let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
|
||||
.map(move |result| result.map(|resp| (shard_id, resp)));
|
||||
shard_requests.push(future);
|
||||
}
|
||||
|
||||
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
|
||||
splitter.add_response(shard_id, shard_response)?;
|
||||
@@ -159,41 +232,28 @@ impl PageserverClient {
|
||||
splitter.assemble_response()
|
||||
}
|
||||
|
||||
/// Fetches pages that belong to the given shard.
|
||||
#[instrument(skip_all, fields(shard = %shard_id))]
|
||||
async fn get_page_for_shard(
|
||||
&self,
|
||||
shard_id: ShardIndex,
|
||||
/// Fetches pages on the given shard. Does not retry internally.
|
||||
async fn get_page_with_shard(
|
||||
req: page_api::GetPageRequest,
|
||||
shard: &Shard,
|
||||
) -> tonic::Result<page_api::GetPageResponse> {
|
||||
let resp = self
|
||||
.retry
|
||||
.with(async || {
|
||||
let stream = self
|
||||
.shards
|
||||
.get(shard_id)?
|
||||
.stream(req.request_class.is_bulk())
|
||||
.await;
|
||||
let resp = stream.send(req.clone()).await?;
|
||||
let expected = req.block_numbers.len();
|
||||
let stream = shard.stream(req.request_class.is_bulk()).await;
|
||||
let resp = stream.send(req).await?;
|
||||
|
||||
// Convert per-request errors into a tonic::Status.
|
||||
if resp.status_code != page_api::GetPageStatusCode::Ok {
|
||||
return Err(tonic::Status::new(
|
||||
resp.status_code.into(),
|
||||
resp.reason.unwrap_or_else(|| String::from("unknown error")),
|
||||
));
|
||||
}
|
||||
// Convert per-request errors into a tonic::Status.
|
||||
if resp.status_code != page_api::GetPageStatusCode::Ok {
|
||||
return Err(tonic::Status::new(
|
||||
resp.status_code.into(),
|
||||
resp.reason.unwrap_or_else(|| String::from("unknown error")),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(resp)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Make sure we got the right number of pages.
|
||||
// NB: check outside of the retry loop, since we don't want to retry this.
|
||||
let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
|
||||
// Check that we received the expected number of pages.
|
||||
let actual = resp.page_images.len();
|
||||
if expected != actual {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"expected {expected} pages for shard {shard_id}, got {actual}",
|
||||
"expected {expected} pages, got {actual}",
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -209,7 +269,7 @@ impl PageserverClient {
|
||||
self.retry
|
||||
.with(async || {
|
||||
// Relation metadata is only available on shard 0.
|
||||
let mut client = self.shards.get_zero().client().await?;
|
||||
let mut client = self.shards.load_full().get_zero().client().await?;
|
||||
client.get_rel_size(req).await
|
||||
})
|
||||
.await
|
||||
@@ -224,48 +284,51 @@ impl PageserverClient {
|
||||
self.retry
|
||||
.with(async || {
|
||||
// SLRU segments are only available on shard 0.
|
||||
let mut client = self.shards.get_zero().client().await?;
|
||||
let mut client = self.shards.load_full().get_zero().client().await?;
|
||||
client.get_slru_segment(req).await
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks the tenant's shards.
|
||||
struct Shards {
|
||||
/// Shard specification for a PageserverClient.
|
||||
pub struct ShardSpec {
|
||||
/// Maps shard indices to gRPC URLs.
|
||||
///
|
||||
/// INVARIANT: every shard 0..count is present, and shard 0 is always present.
|
||||
/// INVARIANT: every URL is valid and uses grpc:// scheme.
|
||||
urls: HashMap<ShardIndex, String>,
|
||||
/// The shard count.
|
||||
///
|
||||
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
|
||||
count: ShardCount,
|
||||
/// The stripe size. Only used for sharded tenants.
|
||||
/// The stripe size for these shards.
|
||||
stripe_size: ShardStripeSize,
|
||||
/// Shards by shard index.
|
||||
///
|
||||
/// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
|
||||
///
|
||||
/// INVARIANT: every shard 0..count is present.
|
||||
/// INVARIANT: shard 0 is always present.
|
||||
map: HashMap<ShardIndex, Shard>,
|
||||
}
|
||||
|
||||
impl Shards {
|
||||
/// Creates a new set of shards based on a shard map.
|
||||
fn new(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
shard_map: HashMap<ShardIndex, String>,
|
||||
stripe_size: ShardStripeSize,
|
||||
auth_token: Option<String>,
|
||||
impl ShardSpec {
|
||||
/// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
|
||||
/// The stripe size may be omitted for unsharded tenants.
|
||||
pub fn new(
|
||||
urls: HashMap<ShardIndex, String>,
|
||||
stripe_size: Option<ShardStripeSize>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let count = match shard_map.len() {
|
||||
// Compute the shard count.
|
||||
let count = match urls.len() {
|
||||
0 => return Err(anyhow!("no shards provided")),
|
||||
1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
|
||||
n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
|
||||
n => ShardCount::new(n as u8),
|
||||
};
|
||||
|
||||
let mut map = HashMap::new();
|
||||
for (shard_id, url) in shard_map {
|
||||
// Determine the stripe size. It doesn't matter for unsharded tenants.
|
||||
if stripe_size.is_none() && !count.is_unsharded() {
|
||||
return Err(anyhow!("stripe size must be given for sharded tenants"));
|
||||
}
|
||||
let stripe_size = stripe_size.unwrap_or_default();
|
||||
|
||||
// Validate the shard spec.
|
||||
for (shard_id, url) in &urls {
|
||||
// The shard index must match the computed shard count, even for unsharded tenants.
|
||||
if shard_id.shard_count != count {
|
||||
return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
|
||||
@@ -276,21 +339,72 @@ impl Shards {
|
||||
}
|
||||
// The above conditions guarantee that we have all shards 0..count: len() matches count,
|
||||
// shard number < count, and numbers are unique (via hashmap).
|
||||
let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
|
||||
map.insert(shard_id, shard);
|
||||
|
||||
// Validate the URL.
|
||||
if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
|
||||
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
urls,
|
||||
count,
|
||||
stripe_size,
|
||||
map,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks the tenant's shards.
|
||||
struct Shards {
|
||||
/// Shards by shard index.
|
||||
///
|
||||
/// INVARIANT: every shard 0..count is present.
|
||||
/// INVARIANT: shard 0 is always present.
|
||||
by_index: HashMap<ShardIndex, Shard>,
|
||||
/// The shard count.
|
||||
///
|
||||
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
|
||||
count: ShardCount,
|
||||
/// The stripe size. Only used for sharded tenants.
|
||||
stripe_size: ShardStripeSize,
|
||||
}
|
||||
|
||||
impl Shards {
|
||||
/// Creates a new set of shards based on a shard spec.
|
||||
fn new(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
shard_spec: ShardSpec,
|
||||
auth_token: Option<String>,
|
||||
compression: Option<CompressionEncoding>,
|
||||
) -> anyhow::Result<Self> {
|
||||
// NB: the shard spec has already been validated when constructed.
|
||||
let mut shards = HashMap::with_capacity(shard_spec.urls.len());
|
||||
for (shard_id, url) in shard_spec.urls {
|
||||
shards.insert(
|
||||
shard_id,
|
||||
Shard::new(
|
||||
url,
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard_id,
|
||||
auth_token.clone(),
|
||||
compression,
|
||||
)?,
|
||||
);
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
by_index: shards,
|
||||
count: shard_spec.count,
|
||||
stripe_size: shard_spec.stripe_size,
|
||||
})
|
||||
}
|
||||
|
||||
/// Looks up the given shard.
|
||||
#[allow(clippy::result_large_err)] // TODO: check perf impact
|
||||
fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
|
||||
self.map
|
||||
self.by_index
|
||||
.get(&shard_id)
|
||||
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
|
||||
}
|
||||
@@ -328,12 +442,8 @@ impl Shard {
|
||||
timeline_id: TimelineId,
|
||||
shard_id: ShardIndex,
|
||||
auth_token: Option<String>,
|
||||
compression: Option<CompressionEncoding>,
|
||||
) -> anyhow::Result<Self> {
|
||||
// Sanity-check that the URL uses gRPC.
|
||||
if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc {
|
||||
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
|
||||
}
|
||||
|
||||
// Common channel pool for unary and stream requests. Bounded by client/stream pools.
|
||||
let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
|
||||
|
||||
@@ -344,6 +454,7 @@ impl Shard {
|
||||
timeline_id,
|
||||
shard_id,
|
||||
auth_token.clone(),
|
||||
compression,
|
||||
Some(MAX_UNARY_CLIENTS),
|
||||
);
|
||||
|
||||
@@ -356,6 +467,7 @@ impl Shard {
|
||||
timeline_id,
|
||||
shard_id,
|
||||
auth_token.clone(),
|
||||
compression,
|
||||
None, // unbounded, limited by stream pool
|
||||
),
|
||||
Some(MAX_STREAMS),
|
||||
@@ -371,6 +483,7 @@ impl Shard {
|
||||
timeline_id,
|
||||
shard_id,
|
||||
auth_token,
|
||||
compression,
|
||||
None, // unbounded, limited by stream pool
|
||||
),
|
||||
Some(MAX_BULK_STREAMS),
|
||||
|
||||
@@ -3,4 +3,4 @@ mod pool;
|
||||
mod retry;
|
||||
mod split;
|
||||
|
||||
pub use client::PageserverClient;
|
||||
pub use client::{PageserverClient, ShardSpec};
|
||||
|
||||
@@ -34,10 +34,13 @@ use std::num::NonZero;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::StreamExt as _;
|
||||
use tokio::sync::mpsc::{Receiver, Sender};
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::codec::CompressionEncoding;
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
use tracing::{error, warn};
|
||||
|
||||
@@ -45,6 +48,25 @@ use pageserver_page_api as page_api;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::shard::ShardIndex;
|
||||
|
||||
/// Reap channels/clients/streams that have been idle for this long.
|
||||
///
|
||||
/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to
|
||||
/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle.
|
||||
/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we
|
||||
/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to
|
||||
/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty
|
||||
/// channels, and/or stream pool clients.
|
||||
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
|
||||
false => Duration::from_secs(180),
|
||||
true => Duration::from_secs(1), // exercise reaping in tests
|
||||
};
|
||||
|
||||
/// Reap idle resources with this interval.
|
||||
const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) {
|
||||
false => Duration::from_secs(10),
|
||||
true => Duration::from_secs(1), // exercise reaping in tests
|
||||
};
|
||||
|
||||
/// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2
|
||||
/// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this.
|
||||
/// The pool does not limit the number of channels, and instead relies on `ClientPool` or
|
||||
@@ -52,7 +74,6 @@ use utils::shard::ShardIndex;
|
||||
///
|
||||
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
|
||||
///
|
||||
/// TODO: reap idle channels.
|
||||
/// TODO: consider prewarming a set of channels, to avoid initial connection latency.
|
||||
/// TODO: consider adding a circuit breaker for errors and fail fast.
|
||||
pub struct ChannelPool {
|
||||
@@ -62,6 +83,8 @@ pub struct ChannelPool {
|
||||
max_clients_per_channel: NonZero<usize>,
|
||||
/// Open channels.
|
||||
channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
|
||||
/// Reaps idle channels.
|
||||
idle_reaper: Reaper,
|
||||
/// Channel ID generator.
|
||||
next_channel_id: AtomicUsize,
|
||||
}
|
||||
@@ -73,6 +96,9 @@ struct ChannelEntry {
|
||||
channel: Channel,
|
||||
/// Number of clients using this channel.
|
||||
clients: usize,
|
||||
/// The channel has been idle (no clients) since this time. None if channel is in use.
|
||||
/// INVARIANT: Some if clients == 0, otherwise None.
|
||||
idle_since: Option<Instant>,
|
||||
}
|
||||
|
||||
impl ChannelPool {
|
||||
@@ -82,12 +108,15 @@ impl ChannelPool {
|
||||
E: TryInto<Endpoint> + Send + Sync + 'static,
|
||||
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
|
||||
{
|
||||
Ok(Arc::new(Self {
|
||||
let pool = Arc::new(Self {
|
||||
endpoint: endpoint.try_into()?,
|
||||
max_clients_per_channel,
|
||||
channels: Mutex::default(),
|
||||
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
|
||||
next_channel_id: AtomicUsize::default(),
|
||||
}))
|
||||
});
|
||||
pool.idle_reaper.spawn(&pool);
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
/// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
|
||||
@@ -116,8 +145,14 @@ impl ChannelPool {
|
||||
entry.clients <= self.max_clients_per_channel.get(),
|
||||
"channel overflow"
|
||||
);
|
||||
assert_eq!(
|
||||
entry.idle_since.is_some(),
|
||||
entry.clients == 0,
|
||||
"incorrect channel idle state"
|
||||
);
|
||||
if entry.clients < self.max_clients_per_channel.get() {
|
||||
entry.clients += 1;
|
||||
entry.idle_since = None;
|
||||
return ChannelGuard {
|
||||
pool: Arc::downgrade(self),
|
||||
id,
|
||||
@@ -134,6 +169,7 @@ impl ChannelPool {
|
||||
let entry = ChannelEntry {
|
||||
channel: channel.clone(),
|
||||
clients: 1, // account for the guard below
|
||||
idle_since: None,
|
||||
};
|
||||
channels.insert(id, entry);
|
||||
|
||||
@@ -145,6 +181,20 @@ impl ChannelPool {
|
||||
}
|
||||
}
|
||||
|
||||
impl Reapable for ChannelPool {
|
||||
/// Reaps channels that have been idle since before the cutoff.
|
||||
fn reap_idle(&self, cutoff: Instant) {
|
||||
self.channels.lock().unwrap().retain(|_, entry| {
|
||||
let Some(idle_since) = entry.idle_since else {
|
||||
assert_ne!(entry.clients, 0, "empty channel not marked idle");
|
||||
return true;
|
||||
};
|
||||
assert_eq!(entry.clients, 0, "idle channel has clients");
|
||||
idle_since >= cutoff
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
|
||||
/// since the gRPC client requires an owned `Channel`.
|
||||
pub struct ChannelGuard {
|
||||
@@ -167,10 +217,15 @@ impl Drop for ChannelGuard {
|
||||
let Some(pool) = self.pool.upgrade() else {
|
||||
return; // pool was dropped
|
||||
};
|
||||
|
||||
let mut channels = pool.channels.lock().unwrap();
|
||||
let entry = channels.get_mut(&self.id).expect("unknown channel");
|
||||
assert!(entry.idle_since.is_none(), "active channel marked idle");
|
||||
assert!(entry.clients > 0, "channel underflow");
|
||||
entry.clients -= 1;
|
||||
if entry.clients == 0 {
|
||||
entry.idle_since = Some(Instant::now()); // mark channel as idle
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,8 +234,6 @@ impl Drop for ChannelGuard {
|
||||
/// number of concurrent clients to `max_clients` via semaphore.
|
||||
///
|
||||
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
|
||||
///
|
||||
/// TODO: reap idle clients.
|
||||
pub struct ClientPool {
|
||||
/// Tenant ID.
|
||||
tenant_id: TenantId,
|
||||
@@ -190,6 +243,8 @@ pub struct ClientPool {
|
||||
shard_id: ShardIndex,
|
||||
/// Authentication token, if any.
|
||||
auth_token: Option<String>,
|
||||
/// Compression to use.
|
||||
compression: Option<CompressionEncoding>,
|
||||
/// Channel pool to acquire channels from.
|
||||
channel_pool: Arc<ChannelPool>,
|
||||
/// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
|
||||
@@ -201,6 +256,8 @@ pub struct ClientPool {
|
||||
/// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle
|
||||
/// clients are reaped.
|
||||
idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
|
||||
/// Reaps idle clients.
|
||||
idle_reaper: Reaper,
|
||||
/// Unique client ID generator.
|
||||
next_client_id: AtomicUsize,
|
||||
}
|
||||
@@ -212,6 +269,9 @@ struct ClientEntry {
|
||||
client: page_api::Client,
|
||||
/// The channel guard for the channel used by the client.
|
||||
channel_guard: ChannelGuard,
|
||||
/// The client has been idle since this time. All clients in `ClientPool::idle` are idle by
|
||||
/// definition, so this is the time when it was added back to the pool.
|
||||
idle_since: Instant,
|
||||
}
|
||||
|
||||
impl ClientPool {
|
||||
@@ -224,18 +284,23 @@ impl ClientPool {
|
||||
timeline_id: TimelineId,
|
||||
shard_id: ShardIndex,
|
||||
auth_token: Option<String>,
|
||||
compression: Option<CompressionEncoding>,
|
||||
max_clients: Option<NonZero<usize>>,
|
||||
) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
let pool = Arc::new(Self {
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
shard_id,
|
||||
auth_token,
|
||||
compression,
|
||||
channel_pool,
|
||||
idle: Mutex::default(),
|
||||
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
|
||||
limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))),
|
||||
next_client_id: AtomicUsize::default(),
|
||||
})
|
||||
});
|
||||
pool.idle_reaper.spawn(&pool);
|
||||
pool
|
||||
}
|
||||
|
||||
/// Gets a client from the pool, or creates a new one if necessary. Connections are established
|
||||
@@ -271,7 +336,7 @@ impl ClientPool {
|
||||
self.timeline_id,
|
||||
self.shard_id,
|
||||
self.auth_token.clone(),
|
||||
None,
|
||||
self.compression,
|
||||
)?;
|
||||
|
||||
Ok(ClientGuard {
|
||||
@@ -287,6 +352,16 @@ impl ClientPool {
|
||||
}
|
||||
}
|
||||
|
||||
impl Reapable for ClientPool {
|
||||
/// Reaps clients that have been idle since before the cutoff.
|
||||
fn reap_idle(&self, cutoff: Instant) {
|
||||
self.idle
|
||||
.lock()
|
||||
.unwrap()
|
||||
.retain(|_, entry| entry.idle_since >= cutoff)
|
||||
}
|
||||
}
|
||||
|
||||
/// A client acquired from the pool. The inner client can be accessed via Deref. The client is
|
||||
/// returned to the pool when dropped.
|
||||
pub struct ClientGuard {
|
||||
@@ -317,9 +392,11 @@ impl Drop for ClientGuard {
|
||||
let Some(pool) = self.pool.upgrade() else {
|
||||
return; // pool was dropped
|
||||
};
|
||||
|
||||
let entry = ClientEntry {
|
||||
client: self.client.take().expect("dropped once"),
|
||||
channel_guard: self.channel_guard.take().expect("dropped once"),
|
||||
idle_since: Instant::now(),
|
||||
};
|
||||
pool.idle.lock().unwrap().insert(self.id, entry);
|
||||
|
||||
@@ -334,7 +411,6 @@ impl Drop for ClientGuard {
|
||||
/// a single request and await the response. Internally, requests are multiplexed across streams and
|
||||
/// channels. This allows proper queue depth enforcement and response routing.
|
||||
///
|
||||
/// TODO: reap idle streams.
|
||||
/// TODO: consider making this generic over request and response types; not currently needed.
|
||||
pub struct StreamPool {
|
||||
/// The client pool to acquire clients from. Must be unbounded.
|
||||
@@ -344,7 +420,7 @@ pub struct StreamPool {
|
||||
/// Incoming requests will be sent over an existing stream with available capacity. If all
|
||||
/// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
|
||||
/// stream has an associated Tokio task that processes requests and responses.
|
||||
streams: Arc<Mutex<HashMap<StreamID, StreamEntry>>>,
|
||||
streams: Mutex<HashMap<StreamID, StreamEntry>>,
|
||||
/// The max number of concurrent streams, or None if unbounded.
|
||||
max_streams: Option<NonZero<usize>>,
|
||||
/// The max number of concurrent requests per stream.
|
||||
@@ -352,6 +428,8 @@ pub struct StreamPool {
|
||||
/// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
|
||||
/// None if the pool is unbounded.
|
||||
limiter: Option<Arc<Semaphore>>,
|
||||
/// Reaps idle streams.
|
||||
idle_reaper: Reaper,
|
||||
/// Stream ID generator.
|
||||
next_stream_id: AtomicUsize,
|
||||
}
|
||||
@@ -364,9 +442,11 @@ type ResponseSender = oneshot::Sender<tonic::Result<page_api::GetPageResponse>>;
|
||||
struct StreamEntry {
|
||||
/// Sends caller requests to the stream task. The stream task exits when this is dropped.
|
||||
sender: RequestSender,
|
||||
/// Number of in-flight requests on this stream. This is an atomic to allow decrementing it on
|
||||
/// completion without acquiring the `StreamPool::streams` lock.
|
||||
queue_depth: Arc<AtomicUsize>,
|
||||
/// Number of in-flight requests on this stream.
|
||||
queue_depth: usize,
|
||||
/// The time when this stream went idle (queue_depth == 0).
|
||||
/// INVARIANT: Some if queue_depth == 0, otherwise None.
|
||||
idle_since: Option<Instant>,
|
||||
}
|
||||
|
||||
impl StreamPool {
|
||||
@@ -383,16 +463,19 @@ impl StreamPool {
|
||||
max_queue_depth: NonZero<usize>,
|
||||
) -> Arc<Self> {
|
||||
assert!(client_pool.limiter.is_none(), "bounded client pool");
|
||||
Arc::new(Self {
|
||||
let pool = Arc::new(Self {
|
||||
client_pool,
|
||||
streams: Arc::default(),
|
||||
streams: Mutex::default(),
|
||||
limiter: max_streams.map(|max_streams| {
|
||||
Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
|
||||
}),
|
||||
max_streams,
|
||||
max_queue_depth,
|
||||
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
|
||||
next_stream_id: AtomicUsize::default(),
|
||||
})
|
||||
});
|
||||
pool.idle_reaper.spawn(&pool);
|
||||
pool
|
||||
}
|
||||
|
||||
/// Acquires an available stream from the pool, or spins up a new stream async if all streams
|
||||
@@ -412,8 +495,8 @@ impl StreamPool {
|
||||
/// * Allow concurrent clients to join onto streams while they're spun up.
|
||||
/// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
|
||||
///
|
||||
/// For now, we just do something simple and functional, but very inefficient (linear scan).
|
||||
pub async fn get(&self) -> StreamGuard {
|
||||
/// For now, we just do something simple but inefficient (linear scan under mutex).
|
||||
pub async fn get(self: &Arc<Self>) -> StreamGuard {
|
||||
// Acquire a permit if the pool is bounded.
|
||||
let mut permit = None;
|
||||
if let Some(limiter) = self.limiter.clone() {
|
||||
@@ -422,23 +505,23 @@ impl StreamPool {
|
||||
let mut streams = self.streams.lock().unwrap();
|
||||
|
||||
// Look for a pooled stream with available capacity.
|
||||
for entry in streams.values() {
|
||||
for (&id, entry) in streams.iter_mut() {
|
||||
assert!(
|
||||
entry.queue_depth.load(Ordering::Relaxed) <= self.max_queue_depth.get(),
|
||||
entry.queue_depth <= self.max_queue_depth.get(),
|
||||
"stream queue overflow"
|
||||
);
|
||||
if entry
|
||||
.queue_depth
|
||||
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |queue_depth| {
|
||||
// Increment the queue depth via compare-and-swap.
|
||||
// TODO: review ordering.
|
||||
(queue_depth < self.max_queue_depth.get()).then_some(queue_depth + 1)
|
||||
})
|
||||
.is_ok()
|
||||
{
|
||||
assert_eq!(
|
||||
entry.idle_since.is_some(),
|
||||
entry.queue_depth == 0,
|
||||
"incorrect stream idle state"
|
||||
);
|
||||
if entry.queue_depth < self.max_queue_depth.get() {
|
||||
entry.queue_depth += 1;
|
||||
entry.idle_since = None;
|
||||
return StreamGuard {
|
||||
pool: Arc::downgrade(self),
|
||||
id,
|
||||
sender: entry.sender.clone(),
|
||||
queue_depth: entry.queue_depth.clone(),
|
||||
permit,
|
||||
};
|
||||
}
|
||||
@@ -448,11 +531,11 @@ impl StreamPool {
|
||||
// return the guard, while spinning up the stream task async. This allows other callers to
|
||||
// join onto this stream and also create additional streams concurrently if this fills up.
|
||||
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
|
||||
let queue_depth = Arc::new(AtomicUsize::new(1)); // reserve quota for this caller
|
||||
let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
|
||||
let entry = StreamEntry {
|
||||
sender: req_tx.clone(),
|
||||
queue_depth: queue_depth.clone(),
|
||||
queue_depth: 1, // reserve quota for this caller
|
||||
idle_since: None,
|
||||
};
|
||||
streams.insert(id, entry);
|
||||
|
||||
@@ -461,20 +544,23 @@ impl StreamPool {
|
||||
};
|
||||
|
||||
let client_pool = self.client_pool.clone();
|
||||
let streams = self.streams.clone();
|
||||
let pool = Arc::downgrade(self);
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = Self::run_stream(client_pool, req_rx).await {
|
||||
error!("stream failed: {err}");
|
||||
}
|
||||
// Remove stream from pool on exit.
|
||||
let entry = streams.lock().unwrap().remove(&id);
|
||||
assert!(entry.is_some(), "unknown stream ID: {id}");
|
||||
// Remove stream from pool on exit. Weak reference to avoid holding the pool alive.
|
||||
if let Some(pool) = pool.upgrade() {
|
||||
let entry = pool.streams.lock().unwrap().remove(&id);
|
||||
assert!(entry.is_some(), "unknown stream ID: {id}");
|
||||
}
|
||||
});
|
||||
|
||||
StreamGuard {
|
||||
pool: Arc::downgrade(self),
|
||||
id,
|
||||
sender: req_tx,
|
||||
queue_depth,
|
||||
permit,
|
||||
}
|
||||
}
|
||||
@@ -552,11 +638,26 @@ impl StreamPool {
|
||||
}
|
||||
}
|
||||
|
||||
impl Reapable for StreamPool {
|
||||
/// Reaps streams that have been idle since before the cutoff.
|
||||
fn reap_idle(&self, cutoff: Instant) {
|
||||
self.streams.lock().unwrap().retain(|_, entry| {
|
||||
let Some(idle_since) = entry.idle_since else {
|
||||
assert_ne!(entry.queue_depth, 0, "empty stream not marked idle");
|
||||
return true;
|
||||
};
|
||||
assert_eq!(entry.queue_depth, 0, "idle stream has requests");
|
||||
idle_since >= cutoff
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// A pooled stream reference. Can be used to send a single request, to properly enforce queue
|
||||
/// depth. Queue depth is already reserved and will be returned on drop.
|
||||
pub struct StreamGuard {
|
||||
pool: Weak<StreamPool>,
|
||||
id: StreamID,
|
||||
sender: RequestSender,
|
||||
queue_depth: Arc<AtomicUsize>,
|
||||
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
|
||||
}
|
||||
|
||||
@@ -588,11 +689,78 @@ impl StreamGuard {
|
||||
|
||||
impl Drop for StreamGuard {
|
||||
fn drop(&mut self) {
|
||||
let Some(pool) = self.pool.upgrade() else {
|
||||
return; // pool was dropped
|
||||
};
|
||||
|
||||
// Release the queue depth reservation on drop. This can prematurely decrement it if dropped
|
||||
// before the response is received, but that's okay.
|
||||
let prev_queue_depth = self.queue_depth.fetch_sub(1, Ordering::SeqCst);
|
||||
assert!(prev_queue_depth > 0, "stream queue underflow");
|
||||
let mut streams = pool.streams.lock().unwrap();
|
||||
let entry = streams.get_mut(&self.id).expect("unknown stream");
|
||||
assert!(entry.idle_since.is_none(), "active stream marked idle");
|
||||
assert!(entry.queue_depth > 0, "stream queue underflow");
|
||||
entry.queue_depth -= 1;
|
||||
if entry.queue_depth == 0 {
|
||||
entry.idle_since = Some(Instant::now()); // mark stream as idle
|
||||
}
|
||||
|
||||
_ = self.permit; // returned on drop, referenced for visibility
|
||||
}
|
||||
}
|
||||
|
||||
/// Periodically reaps idle resources from a pool.
|
||||
struct Reaper {
|
||||
/// The task check interval.
|
||||
interval: Duration,
|
||||
/// The threshold for reaping idle resources.
|
||||
threshold: Duration,
|
||||
/// Cancels the reaper task. Cancelled when the reaper is dropped.
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl Reaper {
|
||||
/// Creates a new reaper.
|
||||
pub fn new(threshold: Duration, interval: Duration) -> Self {
|
||||
Self {
|
||||
cancel: CancellationToken::new(),
|
||||
threshold,
|
||||
interval,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a task to periodically reap idle resources from the given task pool. The task is
|
||||
/// cancelled when the reaper is dropped.
|
||||
pub fn spawn(&self, pool: &Arc<impl Reapable>) {
|
||||
// NB: hold a weak pool reference, otherwise the task will prevent dropping the pool.
|
||||
let pool = Arc::downgrade(pool);
|
||||
let cancel = self.cancel.clone();
|
||||
let (interval, threshold) = (self.interval, self.threshold);
|
||||
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(interval) => {
|
||||
let Some(pool) = pool.upgrade() else {
|
||||
return; // pool was dropped
|
||||
};
|
||||
pool.reap_idle(Instant::now() - threshold);
|
||||
}
|
||||
|
||||
_ = cancel.cancelled() => return,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Reaper {
|
||||
fn drop(&mut self) {
|
||||
self.cancel.cancel(); // cancel reaper task
|
||||
}
|
||||
}
|
||||
|
||||
/// A reapable resource pool.
|
||||
trait Reapable: Send + Sync + 'static {
|
||||
/// Reaps resources that have been idle since before the given cutoff.
|
||||
fn reap_idle(&self, cutoff: Instant);
|
||||
}
|
||||
|
||||
@@ -131,7 +131,6 @@ impl Retry {
|
||||
tonic::Code::Aborted => true,
|
||||
tonic::Code::Cancelled => true,
|
||||
tonic::Code::DeadlineExceeded => true, // maybe transient slowness
|
||||
tonic::Code::Internal => true, // maybe transient failure?
|
||||
tonic::Code::ResourceExhausted => true,
|
||||
tonic::Code::Unavailable => true,
|
||||
|
||||
@@ -139,6 +138,10 @@ impl Retry {
|
||||
tonic::Code::AlreadyExists => false,
|
||||
tonic::Code::DataLoss => false,
|
||||
tonic::Code::FailedPrecondition => false,
|
||||
// NB: don't retry Internal. It is intended for serious errors such as invariant
|
||||
// violations, and is also used for client-side invariant checks that would otherwise
|
||||
// result in retry loops.
|
||||
tonic::Code::Internal => false,
|
||||
tonic::Code::InvalidArgument => false,
|
||||
tonic::Code::NotFound => false,
|
||||
tonic::Code::OutOfRange => false,
|
||||
|
||||
@@ -97,7 +97,8 @@ impl GetPageSplitter {
|
||||
self.requests.drain()
|
||||
}
|
||||
|
||||
/// Adds a response from the given shard.
|
||||
/// Adds a response from the given shard. The response must match the request ID and have an OK
|
||||
/// status code. A response must not already exist for the given shard ID.
|
||||
#[allow(clippy::result_large_err)]
|
||||
pub fn add_response(
|
||||
&mut self,
|
||||
@@ -105,24 +106,30 @@ impl GetPageSplitter {
|
||||
response: page_api::GetPageResponse,
|
||||
) -> tonic::Result<()> {
|
||||
// The caller should already have converted status codes into tonic::Status.
|
||||
assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
|
||||
if response.status_code != page_api::GetPageStatusCode::Ok {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"unexpected non-OK response for shard {shard_id}: {:?}",
|
||||
response.status_code
|
||||
)));
|
||||
}
|
||||
|
||||
// Make sure the response matches the request ID.
|
||||
// The stream pool ensures the response matches the request ID.
|
||||
if response.request_id != self.request_id {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"response ID {} does not match request ID {}",
|
||||
response.request_id, self.request_id
|
||||
"response ID mismatch for shard {shard_id}: expected {}, got {}",
|
||||
self.request_id, response.request_id
|
||||
)));
|
||||
}
|
||||
|
||||
// We only dispatch one request per shard.
|
||||
if self.responses.contains_key(&shard_id) {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"duplicate response for shard {shard_id}"
|
||||
)));
|
||||
}
|
||||
|
||||
// Add the response data to the map.
|
||||
let old = self.responses.insert(shard_id, response.page_images);
|
||||
|
||||
if old.is_some() {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"duplicate response for shard {shard_id}",
|
||||
)));
|
||||
}
|
||||
self.responses.insert(shard_id, response.page_images);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ pageserver = { path = ".." }
|
||||
pageserver_api.workspace = true
|
||||
remote_storage = { path = "../../libs/remote_storage" }
|
||||
postgres_ffi.workspace = true
|
||||
serde.workspace = true
|
||||
thiserror.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-util.workspace = true
|
||||
|
||||
85
pageserver/ctl/src/download_remote_object.rs
Normal file
85
pageserver/ctl/src/download_remote_object.rs
Normal file
@@ -0,0 +1,85 @@
|
||||
use camino::Utf8PathBuf;
|
||||
use clap::Parser;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// Download a specific object from remote storage to a local file.
|
||||
///
|
||||
/// The remote storage configuration is supplied via the `REMOTE_STORAGE_CONFIG` environment
|
||||
/// variable, in the same TOML format that the pageserver itself understands. This allows the
|
||||
/// command to work with any cloud supported by the `remote_storage` crate (currently AWS S3,
|
||||
/// Azure Blob Storage and local files), as long as the credentials are available via the
|
||||
/// standard environment variables expected by the underlying SDKs.
|
||||
///
|
||||
/// Examples for setting the environment variable:
|
||||
///
|
||||
/// ```bash
|
||||
/// # AWS S3 (region can also be provided via AWS_REGION)
|
||||
/// export REMOTE_STORAGE_CONFIG='remote_storage = { bucket_name = "my-bucket", bucket_region = "us-east-2" }'
|
||||
///
|
||||
/// # Azure Blob Storage (account key picked up from AZURE_STORAGE_ACCOUNT_KEY)
|
||||
/// export REMOTE_STORAGE_CONFIG='remote_storage = { container = "my-container", account = "my-account" }'
|
||||
/// ```
|
||||
#[derive(Parser)]
|
||||
pub(crate) struct DownloadRemoteObjectCmd {
|
||||
/// Key / path of the object to download (relative to the remote storage prefix).
|
||||
///
|
||||
/// Examples:
|
||||
/// "wal/3aa8f.../00000001000000000000000A"
|
||||
/// "pageserver/v1/tenants/<tenant_id>/timelines/<timeline_id>/layer_12345"
|
||||
pub remote_path: String,
|
||||
|
||||
/// Path of the local file to create. Existing file will be overwritten.
|
||||
///
|
||||
/// Examples:
|
||||
/// "./segment"
|
||||
/// "/tmp/layer_12345.parquet"
|
||||
pub output_file: Utf8PathBuf,
|
||||
}
|
||||
|
||||
pub(crate) async fn main(cmd: &DownloadRemoteObjectCmd) -> anyhow::Result<()> {
|
||||
use remote_storage::{DownloadOpts, GenericRemoteStorage, RemotePath, RemoteStorageConfig};
|
||||
|
||||
// Fetch remote storage configuration from the environment
|
||||
let config_str = std::env::var("REMOTE_STORAGE_CONFIG").map_err(|_| {
|
||||
anyhow::anyhow!(
|
||||
"'REMOTE_STORAGE_CONFIG' environment variable must be set to a valid remote storage TOML config"
|
||||
)
|
||||
})?;
|
||||
|
||||
let config = RemoteStorageConfig::from_toml_str(&config_str)?;
|
||||
|
||||
// Initialise remote storage client
|
||||
let storage = GenericRemoteStorage::from_config(&config).await?;
|
||||
|
||||
// RemotePath must be relative – leading slashes confuse the parser.
|
||||
let remote_path_str = cmd.remote_path.trim_start_matches('/');
|
||||
let remote_path = RemotePath::from_string(remote_path_str)?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
println!(
|
||||
"Downloading '{remote_path}' from remote storage bucket {:?} ...",
|
||||
config.storage.bucket_name()
|
||||
);
|
||||
|
||||
// Start the actual download
|
||||
let download = storage
|
||||
.download(&remote_path, &DownloadOpts::default(), &cancel)
|
||||
.await?;
|
||||
|
||||
// Stream to file
|
||||
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
|
||||
let tmp_path = cmd.output_file.with_extension("tmp");
|
||||
let mut file = tokio::fs::File::create(&tmp_path).await?;
|
||||
tokio::io::copy(&mut reader, &mut file).await?;
|
||||
file.sync_all().await?;
|
||||
// Atomically move into place
|
||||
tokio::fs::rename(&tmp_path, &cmd.output_file).await?;
|
||||
|
||||
println!(
|
||||
"Downloaded to '{}'. Last modified: {:?}, etag: {}",
|
||||
cmd.output_file, download.last_modified, download.etag
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,14 +1,16 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::{Context, Ok};
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver::tenant::{
|
||||
IndexPart,
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
remote_timeline_client::remote_layer_path,
|
||||
storage_layer::{PersistentLayerDesc, ReadableLayerWeak},
|
||||
remote_timeline_client::{index::LayerFileMetadata, remote_layer_path},
|
||||
storage_layer::{LayerName, LayerVisibilityHint, PersistentLayerDesc, ReadableLayerWeak},
|
||||
};
|
||||
use pageserver_api::key::Key;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -33,6 +35,31 @@ pub(crate) enum IndexPartCmd {
|
||||
#[arg(long)]
|
||||
lsn: String,
|
||||
},
|
||||
/// List all visible delta and image layers at the latest LSN.
|
||||
ListVisibleLayers {
|
||||
#[arg(long)]
|
||||
path: Utf8PathBuf,
|
||||
},
|
||||
}
|
||||
|
||||
fn create_layer_map_from_index_part(
|
||||
index_part: &IndexPart,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
) -> LayerMap {
|
||||
let mut layer_map = LayerMap::default();
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
for (key, value) in index_part.layer_metadata.iter() {
|
||||
updates.insert_historic(PersistentLayerDesc::from_filename(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key.clone(),
|
||||
value.file_size,
|
||||
));
|
||||
}
|
||||
}
|
||||
layer_map
|
||||
}
|
||||
|
||||
async fn search_layers(
|
||||
@@ -49,18 +76,7 @@ async fn search_layers(
|
||||
let bytes = tokio::fs::read(path).await?;
|
||||
IndexPart::from_json_bytes(&bytes).unwrap()
|
||||
};
|
||||
let mut layer_map = LayerMap::default();
|
||||
{
|
||||
let mut updates = layer_map.batch_update();
|
||||
for (key, value) in index_json.layer_metadata.iter() {
|
||||
updates.insert_historic(PersistentLayerDesc::from_filename(
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key.clone(),
|
||||
value.file_size,
|
||||
));
|
||||
}
|
||||
}
|
||||
let layer_map = create_layer_map_from_index_part(&index_json, tenant_shard_id, timeline_id);
|
||||
let key = Key::from_hex(key)?;
|
||||
|
||||
let lsn = Lsn::from_str(lsn).unwrap();
|
||||
@@ -98,6 +114,69 @@ async fn search_layers(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct VisibleLayers {
|
||||
pub total_images: u64,
|
||||
pub total_image_bytes: u64,
|
||||
pub total_deltas: u64,
|
||||
pub total_delta_bytes: u64,
|
||||
pub layer_metadata: BTreeMap<LayerName, LayerFileMetadata>,
|
||||
}
|
||||
|
||||
impl VisibleLayers {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
layer_metadata: BTreeMap::new(),
|
||||
total_images: 0,
|
||||
total_image_bytes: 0,
|
||||
total_deltas: 0,
|
||||
total_delta_bytes: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_layer(&mut self, name: LayerName, layer: LayerFileMetadata) {
|
||||
match name {
|
||||
LayerName::Image(_) => {
|
||||
self.total_images += 1;
|
||||
self.total_image_bytes += layer.file_size;
|
||||
}
|
||||
LayerName::Delta(_) => {
|
||||
self.total_deltas += 1;
|
||||
self.total_delta_bytes += layer.file_size;
|
||||
}
|
||||
}
|
||||
self.layer_metadata.insert(name, layer);
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_visible_layers(path: &Utf8PathBuf) -> anyhow::Result<()> {
|
||||
let tenant_id = TenantId::generate();
|
||||
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
|
||||
let timeline_id = TimelineId::generate();
|
||||
|
||||
let bytes = tokio::fs::read(path).await.context("read file")?;
|
||||
let index_part = IndexPart::from_json_bytes(&bytes).context("deserialize")?;
|
||||
let layer_map = create_layer_map_from_index_part(&index_part, tenant_shard_id, timeline_id);
|
||||
let mut visible_layers = VisibleLayers::new();
|
||||
let (layers, _key_space) = layer_map.get_visibility(Vec::new());
|
||||
for (layer, visibility) in layers {
|
||||
if visibility == LayerVisibilityHint::Visible {
|
||||
visible_layers.add_layer(
|
||||
layer.layer_name(),
|
||||
index_part
|
||||
.layer_metadata
|
||||
.get(&layer.layer_name())
|
||||
.unwrap()
|
||||
.clone(),
|
||||
);
|
||||
}
|
||||
}
|
||||
let output = serde_json::to_string_pretty(&visible_layers).context("serialize output")?;
|
||||
println!("{output}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
|
||||
match cmd {
|
||||
IndexPartCmd::Dump { path } => {
|
||||
@@ -114,5 +193,6 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
|
||||
key,
|
||||
lsn,
|
||||
} => search_layers(tenant_id, timeline_id, path, key, lsn).await,
|
||||
IndexPartCmd::ListVisibleLayers { path } => list_visible_layers(path).await,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
//!
|
||||
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
|
||||
|
||||
mod download_remote_object;
|
||||
mod draw_timeline_dir;
|
||||
mod index_part;
|
||||
mod key;
|
||||
@@ -16,6 +17,7 @@ use std::time::{Duration, SystemTime};
|
||||
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::{Parser, Subcommand};
|
||||
use download_remote_object::DownloadRemoteObjectCmd;
|
||||
use index_part::IndexPartCmd;
|
||||
use layers::LayerCmd;
|
||||
use page_trace::PageTraceCmd;
|
||||
@@ -63,6 +65,7 @@ enum Commands {
|
||||
/// Debug print a hex key found from logs
|
||||
Key(key::DescribeKeyCommand),
|
||||
PageTrace(PageTraceCmd),
|
||||
DownloadRemoteObject(DownloadRemoteObjectCmd),
|
||||
}
|
||||
|
||||
/// Read and update pageserver metadata file
|
||||
@@ -185,6 +188,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
Commands::Key(dkc) => dkc.execute(),
|
||||
Commands::PageTrace(cmd) => page_trace::main(&cmd)?,
|
||||
Commands::DownloadRemoteObject(cmd) => {
|
||||
download_remote_object::main(&cmd).await?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -27,8 +27,9 @@ tokio-util.workspace = true
|
||||
tonic.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
pageserver_client.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_client.workspace = true
|
||||
pageserver_client_grpc.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
utils = { path = "../../libs/utils/" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
@@ -10,12 +10,14 @@ use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use camino::Utf8PathBuf;
|
||||
use futures::stream::FuturesUnordered;
|
||||
use futures::{Stream, StreamExt as _};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
|
||||
use pageserver_api::reltag::RelTag;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_client_grpc::{self as client_grpc, ShardSpec};
|
||||
use pageserver_page_api as page_api;
|
||||
use rand::prelude::*;
|
||||
use tokio::task::JoinSet;
|
||||
@@ -37,6 +39,10 @@ pub(crate) struct Args {
|
||||
/// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
|
||||
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
|
||||
page_service_connstring: String,
|
||||
/// Use the rich gRPC Pageserver client `client_grpc::PageserverClient`, rather than the basic
|
||||
/// no-frills `page_api::Client`. Only valid with grpc:// connstrings.
|
||||
#[clap(long)]
|
||||
rich_client: bool,
|
||||
#[clap(long)]
|
||||
pageserver_jwt: Option<String>,
|
||||
#[clap(long, default_value = "1")]
|
||||
@@ -332,6 +338,7 @@ async fn main_impl(
|
||||
let client: Box<dyn Client> = match scheme.as_str() {
|
||||
"postgresql" | "postgres" => {
|
||||
assert!(!args.compression, "libpq does not support compression");
|
||||
assert!(!args.rich_client, "rich client requires grpc://");
|
||||
Box::new(
|
||||
LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
|
||||
.await
|
||||
@@ -339,6 +346,16 @@ async fn main_impl(
|
||||
)
|
||||
}
|
||||
|
||||
"grpc" if args.rich_client => Box::new(
|
||||
RichGrpcClient::new(
|
||||
&args.page_service_connstring,
|
||||
worker_id.timeline,
|
||||
args.compression,
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
|
||||
"grpc" => Box::new(
|
||||
GrpcClient::new(
|
||||
&args.page_service_connstring,
|
||||
@@ -680,3 +697,70 @@ impl Client for GrpcClient {
|
||||
Ok((resp.request_id, resp.page_images))
|
||||
}
|
||||
}
|
||||
|
||||
/// A rich gRPC Pageserver client.
|
||||
struct RichGrpcClient {
|
||||
inner: Arc<client_grpc::PageserverClient>,
|
||||
requests: FuturesUnordered<
|
||||
Pin<Box<dyn Future<Output = anyhow::Result<page_api::GetPageResponse>> + Send>>,
|
||||
>,
|
||||
}
|
||||
|
||||
impl RichGrpcClient {
|
||||
async fn new(
|
||||
connstring: &str,
|
||||
ttid: TenantTimelineId,
|
||||
compression: bool,
|
||||
) -> anyhow::Result<Self> {
|
||||
let inner = Arc::new(client_grpc::PageserverClient::new(
|
||||
ttid.tenant_id,
|
||||
ttid.timeline_id,
|
||||
ShardSpec::new(
|
||||
[(ShardIndex::unsharded(), connstring.to_string())].into(),
|
||||
None,
|
||||
)?,
|
||||
None,
|
||||
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
|
||||
)?);
|
||||
Ok(Self {
|
||||
inner,
|
||||
requests: FuturesUnordered::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for RichGrpcClient {
|
||||
async fn send_get_page(
|
||||
&mut self,
|
||||
req_id: u64,
|
||||
req_lsn: Lsn,
|
||||
mod_lsn: Lsn,
|
||||
rel: RelTag,
|
||||
blks: Vec<u32>,
|
||||
) -> anyhow::Result<()> {
|
||||
let req = page_api::GetPageRequest {
|
||||
request_id: req_id,
|
||||
request_class: page_api::GetPageClass::Normal,
|
||||
read_lsn: page_api::ReadLsn {
|
||||
request_lsn: req_lsn,
|
||||
not_modified_since_lsn: Some(mod_lsn),
|
||||
},
|
||||
rel,
|
||||
block_numbers: blks,
|
||||
};
|
||||
let inner = self.inner.clone();
|
||||
self.requests.push(Box::pin(async move {
|
||||
inner
|
||||
.get_page(req)
|
||||
.await
|
||||
.map_err(|err| anyhow::anyhow!("{err}"))
|
||||
}));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
|
||||
let resp = self.requests.next().await.unwrap()?;
|
||||
Ok((resp.request_id, resp.page_images))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,8 +29,8 @@ use pageserver::task_mgr::{
|
||||
};
|
||||
use pageserver::tenant::{TenantSharedResources, mgr, secondary};
|
||||
use pageserver::{
|
||||
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, http,
|
||||
page_cache, page_service, task_mgr, virtual_file,
|
||||
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener,
|
||||
MetricsCollectionTask, http, page_cache, page_service, task_mgr, virtual_file,
|
||||
};
|
||||
use postgres_backend::AuthType;
|
||||
use remote_storage::GenericRemoteStorage;
|
||||
@@ -41,6 +41,7 @@ use tracing_utils::OtelGuard;
|
||||
use utils::auth::{JwtAuth, SwappableJwtAuth};
|
||||
use utils::crashsafe::syncfs;
|
||||
use utils::logging::TracingErrorLayerEnablement;
|
||||
use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR};
|
||||
use utils::sentry_init::init_sentry;
|
||||
use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener};
|
||||
|
||||
@@ -763,6 +764,41 @@ fn start_pageserver(
|
||||
(http_task, https_task)
|
||||
};
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
let metrics_collection_task = {
|
||||
let cancel = shutdown_pageserver.child_token();
|
||||
let task = crate::BACKGROUND_RUNTIME.spawn({
|
||||
let cancel = cancel.clone();
|
||||
let background_jobs_barrier = background_jobs_barrier.clone();
|
||||
async move {
|
||||
if conf.force_metric_collection_on_scrape {
|
||||
return;
|
||||
}
|
||||
|
||||
// first wait until background jobs are cleared to launch.
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => { return; },
|
||||
_ = background_jobs_barrier.wait() => {}
|
||||
};
|
||||
let mut interval = tokio::time::interval(METRICS_COLLECTION_INTERVAL);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
tracing::info!("cancelled metrics collection task, exiting...");
|
||||
break;
|
||||
},
|
||||
_ = interval.tick() => {}
|
||||
}
|
||||
tokio::task::spawn_blocking(|| {
|
||||
METRICS_COLLECTOR.run_once(true);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
MetricsCollectionTask(CancellableTask { task, cancel })
|
||||
};
|
||||
/* END_HADRON */
|
||||
|
||||
let consumption_metrics_tasks = {
|
||||
let cancel = shutdown_pageserver.child_token();
|
||||
let task = crate::BACKGROUND_RUNTIME.spawn({
|
||||
@@ -844,6 +880,7 @@ fn start_pageserver(
|
||||
https_endpoint_listener,
|
||||
page_service,
|
||||
page_service_grpc,
|
||||
metrics_collection_task,
|
||||
consumption_metrics_tasks,
|
||||
disk_usage_eviction_task,
|
||||
&tenant_manager,
|
||||
|
||||
@@ -252,6 +252,14 @@ pub struct PageServerConf {
|
||||
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
|
||||
|
||||
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
|
||||
|
||||
/// Defines what is a big tenant for the purpose of image layer generation.
|
||||
/// See Timeline::should_check_if_image_layers_required
|
||||
pub image_layer_generation_large_timeline_threshold: Option<u64>,
|
||||
|
||||
/// Controls whether to collect all metrics on each scrape or to return potentially stale
|
||||
/// results.
|
||||
pub force_metric_collection_on_scrape: bool,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -432,6 +440,8 @@ impl PageServerConf {
|
||||
posthog_config,
|
||||
timeline_import_config,
|
||||
basebackup_cache_config,
|
||||
image_layer_generation_large_timeline_threshold,
|
||||
force_metric_collection_on_scrape,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -490,6 +500,8 @@ impl PageServerConf {
|
||||
dev_mode,
|
||||
timeline_import_config,
|
||||
basebackup_cache_config,
|
||||
image_layer_generation_large_timeline_threshold,
|
||||
force_metric_collection_on_scrape,
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// fields that require additional validation or custom handling
|
||||
|
||||
@@ -271,6 +271,18 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/LsnLease"
|
||||
"500":
|
||||
description: Generic operation error
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Error"
|
||||
"503":
|
||||
description: Temporarily unavailable, please retry.
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/ServiceUnavailableError"
|
||||
|
||||
/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc:
|
||||
parameters:
|
||||
|
||||
@@ -2,7 +2,9 @@
|
||||
//! Management HTTP API
|
||||
//!
|
||||
use std::cmp::Reverse;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -79,8 +81,8 @@ use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerNa
|
||||
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
|
||||
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
|
||||
use crate::tenant::timeline::{
|
||||
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
|
||||
WaitLsnTimeout, WaitLsnWaiter, import_pgdata,
|
||||
CompactFlags, CompactOptions, CompactRequest, MarkInvisibleRequest, Timeline, WaitLsnTimeout,
|
||||
WaitLsnWaiter, import_pgdata,
|
||||
};
|
||||
use crate::tenant::{
|
||||
GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError,
|
||||
@@ -2500,9 +2502,10 @@ async fn timeline_checkpoint_handler(
|
||||
.compact(&cancel, flags, &ctx)
|
||||
.await
|
||||
.map_err(|e|
|
||||
match e {
|
||||
CompactionError::ShuttingDown => ApiError::ShuttingDown,
|
||||
CompactionError::Other(e) => ApiError::InternalServerError(e),
|
||||
if e.is_cancel() {
|
||||
ApiError::ShuttingDown
|
||||
} else {
|
||||
ApiError::InternalServerError(e.into_anyhow())
|
||||
}
|
||||
)?;
|
||||
}
|
||||
@@ -3213,6 +3216,30 @@ async fn get_utilization(
|
||||
.map_err(ApiError::InternalServerError)
|
||||
}
|
||||
|
||||
/// HADRON
|
||||
async fn list_tenant_visible_size_handler(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permission(&request, None)?;
|
||||
let state = get_state(&request);
|
||||
|
||||
let mut map = BTreeMap::new();
|
||||
for (tenant_shard_id, slot) in state.tenant_manager.list() {
|
||||
match slot {
|
||||
TenantSlot::Attached(tenant) => {
|
||||
let visible_size = tenant.get_visible_size();
|
||||
map.insert(tenant_shard_id, visible_size);
|
||||
}
|
||||
TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
json_response(StatusCode::OK, map)
|
||||
}
|
||||
|
||||
async fn list_aux_files(
|
||||
mut request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
@@ -3937,9 +3964,14 @@ pub fn make_router(
|
||||
.expect("construct launch timestamp header middleware"),
|
||||
);
|
||||
|
||||
let force_metric_collection_on_scrape = state.conf.force_metric_collection_on_scrape;
|
||||
|
||||
let prometheus_metrics_handler_wrapper =
|
||||
move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape);
|
||||
|
||||
Ok(router
|
||||
.data(state)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/metrics", move |r| request_span(r, prometheus_metrics_handler_wrapper))
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
|
||||
.get("/v1/status", |r| api_handler(r, status_handler))
|
||||
@@ -4145,6 +4177,7 @@ pub fn make_router(
|
||||
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
|
||||
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
|
||||
.get("/v1/utilization", |r| api_handler(r, get_utilization))
|
||||
.get("/v1/list_tenant_visible_size", |r| api_handler(r, list_tenant_visible_size_handler))
|
||||
.post(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
|
||||
|r| testing_api_handler("ingest_aux_files", r, ingest_aux_files),
|
||||
|
||||
@@ -73,6 +73,9 @@ pub struct HttpEndpointListener(pub CancellableTask);
|
||||
pub struct HttpsEndpointListener(pub CancellableTask);
|
||||
pub struct ConsumptionMetricsTasks(pub CancellableTask);
|
||||
pub struct DiskUsageEvictionTask(pub CancellableTask);
|
||||
// HADRON
|
||||
pub struct MetricsCollectionTask(pub CancellableTask);
|
||||
|
||||
impl CancellableTask {
|
||||
pub async fn shutdown(self) {
|
||||
self.cancel.cancel();
|
||||
@@ -87,6 +90,7 @@ pub async fn shutdown_pageserver(
|
||||
https_listener: Option<HttpsEndpointListener>,
|
||||
page_service: page_service::Listener,
|
||||
grpc_task: Option<CancellableTask>,
|
||||
metrics_collection_task: MetricsCollectionTask,
|
||||
consumption_metrics_worker: ConsumptionMetricsTasks,
|
||||
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
|
||||
tenant_manager: &TenantManager,
|
||||
@@ -211,6 +215,14 @@ pub async fn shutdown_pageserver(
|
||||
// Best effort to persist any outstanding deletions, to avoid leaking objects
|
||||
deletion_queue.shutdown(Duration::from_secs(5)).await;
|
||||
|
||||
// HADRON
|
||||
timed(
|
||||
metrics_collection_task.0.shutdown(),
|
||||
"shutdown metrics collections metrics",
|
||||
Duration::from_secs(1),
|
||||
)
|
||||
.await;
|
||||
|
||||
timed(
|
||||
consumption_metrics_worker.0.shutdown(),
|
||||
"shutdown consumption metrics",
|
||||
|
||||
@@ -2847,6 +2847,24 @@ pub(crate) static MISROUTED_PAGESTREAM_REQUESTS: Lazy<IntCounter> = Lazy::new(||
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// Global counter for PageStream request results by outcome. Outcomes are divided into 3 categories:
|
||||
// - success
|
||||
// - internal_error: errors that indicate bugs in the storage cluster (e.g. page reconstruction errors, misrouted requests, LSN timeout errors)
|
||||
// - other_error: transient error conditions that are expected in normal operation or indicate bugs with other parts of the system (e.g. error due to pageserver shutdown, malformed requests etc.)
|
||||
pub(crate) static PAGESTREAM_HANDLER_RESULTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_pagestream_handler_results_total",
|
||||
"Number of pageserver pagestream handler results by outcome (success, internal_error, other_error)",
|
||||
&["outcome"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
// Constants for pageserver_pagestream_handler_results_total's outcome labels
|
||||
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_SUCCESS: &str = "success";
|
||||
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR: &str = "internal_error";
|
||||
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR: &str = "other_error";
|
||||
|
||||
// Metrics collected on WAL redo operations
|
||||
//
|
||||
// We collect the time spent in actual WAL redo ('redo'), and time waiting
|
||||
|
||||
@@ -70,7 +70,7 @@ use crate::context::{
|
||||
};
|
||||
use crate::metrics::{
|
||||
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
|
||||
MISROUTED_PAGESTREAM_REQUESTS, SmgrOpTimer, TimelineMetrics,
|
||||
MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics,
|
||||
};
|
||||
use crate::pgdatadir_mapping::{LsnRange, Version};
|
||||
use crate::span::{
|
||||
@@ -1441,20 +1441,57 @@ impl PageServerHandler {
|
||||
let (response_msg, ctx) = match handler_result {
|
||||
Err(e) => match &e.err {
|
||||
PageStreamError::Shutdown => {
|
||||
// BEGIN HADRON
|
||||
PAGESTREAM_HANDLER_RESULTS_TOTAL
|
||||
.with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR])
|
||||
.inc();
|
||||
// END HADRON
|
||||
|
||||
// If we fail to fulfil a request during shutdown, which may be _because_ of
|
||||
// shutdown, then do not send the error to the client. Instead just drop the
|
||||
// connection.
|
||||
span.in_scope(|| info!("dropping connection due to shutdown"));
|
||||
return Err(QueryError::Shutdown);
|
||||
}
|
||||
PageStreamError::Reconnect(reason) => {
|
||||
span.in_scope(|| info!("handler requested reconnect: {reason}"));
|
||||
PageStreamError::Reconnect(_reason) => {
|
||||
span.in_scope(|| {
|
||||
// BEGIN HADRON
|
||||
// We can get here because the compute node is pointing at the wrong PS. We
|
||||
// already have a metric to keep track of this so suppressing this log to
|
||||
// reduce log spam. The information in this log message is not going to be that
|
||||
// helpful given the volume of logs that can be generated.
|
||||
// info!("handler requested reconnect: {reason}")
|
||||
// END HADRON
|
||||
});
|
||||
// BEGIN HADRON
|
||||
PAGESTREAM_HANDLER_RESULTS_TOTAL
|
||||
.with_label_values(&[
|
||||
metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
|
||||
])
|
||||
.inc();
|
||||
// END HADRON
|
||||
return Err(QueryError::Reconnect);
|
||||
}
|
||||
PageStreamError::Read(_)
|
||||
| PageStreamError::LsnTimeout(_)
|
||||
| PageStreamError::NotFound(_)
|
||||
| PageStreamError::BadRequest(_) => {
|
||||
// BEGIN HADRON
|
||||
if let PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) = &e.err {
|
||||
PAGESTREAM_HANDLER_RESULTS_TOTAL
|
||||
.with_label_values(&[
|
||||
metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
|
||||
])
|
||||
.inc();
|
||||
} else {
|
||||
PAGESTREAM_HANDLER_RESULTS_TOTAL
|
||||
.with_label_values(&[
|
||||
metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
|
||||
])
|
||||
.inc();
|
||||
}
|
||||
// END HADRON
|
||||
|
||||
// print the all details to the log with {:#}, but for the client the
|
||||
// error message is enough. Do not log if shutting down, as the anyhow::Error
|
||||
// here includes cancellation which is not an error.
|
||||
@@ -1472,7 +1509,15 @@ impl PageServerHandler {
|
||||
)
|
||||
}
|
||||
},
|
||||
Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)),
|
||||
Ok((response_msg, _op_timer_already_observed, ctx)) => {
|
||||
// BEGIN HADRON
|
||||
PAGESTREAM_HANDLER_RESULTS_TOTAL
|
||||
.with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS])
|
||||
.inc();
|
||||
// END HADRON
|
||||
|
||||
(response_msg, Some(ctx))
|
||||
}
|
||||
};
|
||||
|
||||
let ctx = ctx.map(|req_ctx| {
|
||||
|
||||
@@ -3291,7 +3291,7 @@ impl TenantShard {
|
||||
// Ignore this, we likely raced with unarchival.
|
||||
OffloadError::NotArchived => Ok(()),
|
||||
OffloadError::AlreadyInProgress => Ok(()),
|
||||
OffloadError::Cancelled => Err(CompactionError::ShuttingDown),
|
||||
OffloadError::Cancelled => Err(CompactionError::new_cancelled()),
|
||||
// don't break the anyhow chain
|
||||
OffloadError::Other(err) => Err(CompactionError::Other(err)),
|
||||
})?;
|
||||
@@ -3321,16 +3321,13 @@ impl TenantShard {
|
||||
|
||||
/// Trips the compaction circuit breaker if appropriate.
|
||||
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
|
||||
match err {
|
||||
err if err.is_cancel() => {}
|
||||
CompactionError::ShuttingDown => (),
|
||||
CompactionError::Other(err) => {
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
if err.is_cancel() {
|
||||
return;
|
||||
}
|
||||
self.compaction_circuit_breaker
|
||||
.lock()
|
||||
.unwrap()
|
||||
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
|
||||
}
|
||||
|
||||
/// Cancel scheduled compaction tasks
|
||||
@@ -4174,6 +4171,15 @@ impl TenantShard {
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
|
||||
}
|
||||
|
||||
// HADRON
|
||||
pub fn get_image_creation_timeout(&self) -> Option<Duration> {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf.image_layer_force_creation_period.or(self
|
||||
.conf
|
||||
.default_tenant_conf
|
||||
.image_layer_force_creation_period)
|
||||
}
|
||||
|
||||
pub fn get_pitr_interval(&self) -> Duration {
|
||||
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
|
||||
tenant_conf
|
||||
@@ -5713,6 +5719,16 @@ impl TenantShard {
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
/// HADRON
|
||||
/// Return the visible size of all timelines in this tenant.
|
||||
pub(crate) fn get_visible_size(&self) -> u64 {
|
||||
let timelines = self.timelines.lock().unwrap();
|
||||
timelines
|
||||
.values()
|
||||
.map(|t| t.metrics.visible_physical_size_gauge.get())
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant
|
||||
/// manifest in `Self::remote_tenant_manifest`.
|
||||
///
|
||||
|
||||
@@ -225,7 +225,7 @@ impl fmt::Display for ImageLayerName {
|
||||
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
|
||||
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])
|
||||
/// and [`crate::tenant::storage_layer::layer::local_layer_path`])
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Ord, PartialOrd)]
|
||||
pub enum LayerName {
|
||||
Image(ImageLayerName),
|
||||
Delta(DeltaLayerName),
|
||||
|
||||
@@ -17,17 +17,14 @@ use tracing::*;
|
||||
use utils::backoff::exponential_backoff_duration;
|
||||
use utils::completion::Barrier;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::sync::gate::GateError;
|
||||
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
|
||||
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
|
||||
use crate::tenant::blob_io::WriteBlobError;
|
||||
use crate::tenant::throttle::Stats;
|
||||
use crate::tenant::timeline::CompactionError;
|
||||
use crate::tenant::timeline::compaction::CompactionOutcome;
|
||||
use crate::tenant::{TenantShard, TenantState};
|
||||
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
|
||||
|
||||
/// Semaphore limiting concurrent background tasks (across all tenants).
|
||||
///
|
||||
@@ -310,45 +307,12 @@ pub(crate) fn log_compaction_error(
|
||||
task_cancelled: bool,
|
||||
degrade_to_warning: bool,
|
||||
) {
|
||||
use CompactionError::*;
|
||||
let is_cancel = err.is_cancel();
|
||||
|
||||
use crate::tenant::PageReconstructError;
|
||||
use crate::tenant::upload_queue::NotInitialized;
|
||||
|
||||
let level = match err {
|
||||
e if e.is_cancel() => return,
|
||||
ShuttingDown => return,
|
||||
_ if task_cancelled => Level::INFO,
|
||||
Other(err) => {
|
||||
let root_cause = err.root_cause();
|
||||
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let write_blob_cancelled = root_cause
|
||||
.downcast_ref::<WriteBlobError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let gate_closed = root_cause
|
||||
.downcast_ref::<GateError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let is_stopping = upload_queue
|
||||
|| timeline
|
||||
|| buffered_writer_flush_task_canelled
|
||||
|| write_blob_cancelled
|
||||
|| gate_closed;
|
||||
|
||||
if is_stopping {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::ERROR
|
||||
}
|
||||
}
|
||||
let level = if is_cancel || task_cancelled {
|
||||
Level::INFO
|
||||
} else {
|
||||
Level::ERROR
|
||||
};
|
||||
|
||||
if let Some((error_count, sleep_duration)) = retry_info {
|
||||
|
||||
@@ -351,6 +351,13 @@ pub struct Timeline {
|
||||
last_image_layer_creation_check_at: AtomicLsn,
|
||||
last_image_layer_creation_check_instant: std::sync::Mutex<Option<Instant>>,
|
||||
|
||||
// HADRON
|
||||
/// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation
|
||||
/// on this key range.
|
||||
force_image_creation_lsn: AtomicLsn,
|
||||
/// The last time instant when force_image_creation_lsn is computed.
|
||||
force_image_creation_lsn_computed_at: std::sync::Mutex<Option<Instant>>,
|
||||
|
||||
/// Current logical size of the "datadir", at the last LSN.
|
||||
current_logical_size: LogicalSize,
|
||||
|
||||
@@ -1002,7 +1009,7 @@ impl From<WaitLsnError> for tonic::Status {
|
||||
impl From<CreateImageLayersError> for CompactionError {
|
||||
fn from(e: CreateImageLayersError) -> Self {
|
||||
match e {
|
||||
CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
|
||||
CreateImageLayersError::Cancelled => CompactionError::new_cancelled(),
|
||||
CreateImageLayersError::Other(e) => {
|
||||
CompactionError::Other(e.context("create image layers"))
|
||||
}
|
||||
@@ -2117,12 +2124,7 @@ impl Timeline {
|
||||
match &result {
|
||||
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
|
||||
Err(e) if e.is_cancel() => {}
|
||||
Err(CompactionError::ShuttingDown) => {
|
||||
// Covered by the `Err(e) if e.is_cancel()` branch.
|
||||
}
|
||||
Err(CompactionError::Other(_)) => {
|
||||
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
|
||||
}
|
||||
Err(_) => self.compaction_failed.store(true, AtomicOrdering::Relaxed),
|
||||
};
|
||||
|
||||
result
|
||||
@@ -2851,6 +2853,18 @@ impl Timeline {
|
||||
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
|
||||
}
|
||||
|
||||
// HADRON
|
||||
fn get_image_creation_timeout(&self) -> Option<Duration> {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.image_layer_force_creation_period
|
||||
.or(self
|
||||
.conf
|
||||
.default_tenant_conf
|
||||
.image_layer_force_creation_period)
|
||||
}
|
||||
|
||||
fn get_compaction_algorithm_settings(&self) -> CompactionAlgorithmSettings {
|
||||
let tenant_conf = &self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -3120,7 +3134,9 @@ impl Timeline {
|
||||
repartition_threshold: 0,
|
||||
last_image_layer_creation_check_at: AtomicLsn::new(0),
|
||||
last_image_layer_creation_check_instant: Mutex::new(None),
|
||||
|
||||
// HADRON
|
||||
force_image_creation_lsn: AtomicLsn::new(0),
|
||||
force_image_creation_lsn_computed_at: std::sync::Mutex::new(None),
|
||||
last_received_wal: Mutex::new(None),
|
||||
rel_size_latest_cache: RwLock::new(HashMap::new()),
|
||||
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
|
||||
@@ -5041,6 +5057,7 @@ impl Timeline {
|
||||
.create_image_layers(
|
||||
&partitions,
|
||||
self.initdb_lsn,
|
||||
None,
|
||||
ImageLayerCreationMode::Initial,
|
||||
ctx,
|
||||
LastImageLayerCreationStatus::Initial,
|
||||
@@ -5312,14 +5329,19 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition? True if we want to generate.
|
||||
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
|
||||
async fn time_for_new_image_layer(
|
||||
&self,
|
||||
partition: &KeySpace,
|
||||
lsn: Lsn,
|
||||
force_image_creation_lsn: Option<Lsn>,
|
||||
) -> bool {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
|
||||
let Ok(layers) = guard.layer_map() else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let mut min_image_lsn: Lsn = Lsn::MAX;
|
||||
let mut max_deltas = 0;
|
||||
for part_range in &partition.ranges {
|
||||
let image_coverage = layers.image_coverage(part_range, lsn);
|
||||
@@ -5354,9 +5376,22 @@ impl Timeline {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
min_image_lsn = min(min_image_lsn, img_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
// HADRON
|
||||
if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 {
|
||||
info!(
|
||||
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}",
|
||||
partition.ranges[0].start,
|
||||
partition.ranges[0].end,
|
||||
min_image_lsn,
|
||||
force_image_creation_lsn.unwrap()
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
debug!(
|
||||
max_deltas,
|
||||
"none of the partitioned ranges had >= {threshold} deltas"
|
||||
@@ -5582,7 +5617,7 @@ impl Timeline {
|
||||
/// suffer from the lack of image layers
|
||||
/// 2. For small tenants (that can mostly fit in RAM), we use a much longer interval
|
||||
fn should_check_if_image_layers_required(self: &Arc<Timeline>, lsn: Lsn) -> bool {
|
||||
const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;
|
||||
let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold;
|
||||
|
||||
let last_checks_at = self.last_image_layer_creation_check_at.load();
|
||||
let distance = lsn
|
||||
@@ -5596,12 +5631,12 @@ impl Timeline {
|
||||
let mut time_based_decision = false;
|
||||
let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
|
||||
if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
|
||||
let check_required_after = if Into::<u64>::into(&logical_size) >= LARGE_TENANT_THRESHOLD
|
||||
{
|
||||
self.get_checkpoint_timeout()
|
||||
} else {
|
||||
Duration::from_secs(3600 * 48)
|
||||
};
|
||||
let check_required_after =
|
||||
if Some(Into::<u64>::into(&logical_size)) >= large_timeline_threshold {
|
||||
self.get_checkpoint_timeout()
|
||||
} else {
|
||||
Duration::from_secs(3600 * 48)
|
||||
};
|
||||
|
||||
time_based_decision = match *last_check_instant {
|
||||
Some(last_check) => {
|
||||
@@ -5629,10 +5664,12 @@ impl Timeline {
|
||||
/// true = we have generate all image layers, false = we preempt the process for L0 compaction.
|
||||
///
|
||||
/// `partition_mode` is only for logging purpose and is not used anywhere in this function.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn create_image_layers(
|
||||
self: &Arc<Timeline>,
|
||||
partitioning: &KeyPartitioning,
|
||||
lsn: Lsn,
|
||||
force_image_creation_lsn: Option<Lsn>,
|
||||
mode: ImageLayerCreationMode,
|
||||
ctx: &RequestContext,
|
||||
last_status: LastImageLayerCreationStatus,
|
||||
@@ -5736,7 +5773,11 @@ impl Timeline {
|
||||
} else if let ImageLayerCreationMode::Try = mode {
|
||||
// check_for_image_layers = false -> skip
|
||||
// check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate
|
||||
if !check_for_image_layers || !self.time_for_new_image_layer(partition, lsn).await {
|
||||
if !check_for_image_layers
|
||||
|| !self
|
||||
.time_for_new_image_layer(partition, lsn, force_image_creation_lsn)
|
||||
.await
|
||||
{
|
||||
start = img_range.end;
|
||||
continue;
|
||||
}
|
||||
@@ -6057,26 +6098,88 @@ impl Drop for Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// Top-level failure to compact.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum CompactionError {
|
||||
#[error("The timeline or pageserver is shutting down")]
|
||||
ShuttingDown,
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
pub(crate) use compaction_error::CompactionError;
|
||||
/// In a private mod to enforce that [`CompactionError::is_cancel`] is used
|
||||
/// instead of `match`ing on [`CompactionError::ShuttingDown`].
|
||||
mod compaction_error {
|
||||
use utils::sync::gate::GateError;
|
||||
|
||||
impl CompactionError {
|
||||
/// Errors that can be ignored, i.e., cancel and shutdown.
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
matches!(self, Self::ShuttingDown)
|
||||
use crate::{
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
tenant::{PageReconstructError, blob_io::WriteBlobError, upload_queue::NotInitialized},
|
||||
virtual_file::owned_buffers_io::write::FlushTaskError,
|
||||
};
|
||||
|
||||
/// Top-level failure to compact. Use [`Self::is_cancel`].
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub(crate) enum CompactionError {
|
||||
/// Use [`Self::is_cancel`] instead of checking for this variant.
|
||||
#[error("The timeline or pageserver is shutting down")]
|
||||
#[allow(private_interfaces)]
|
||||
ShuttingDown(ForbidMatching), // private ForbidMatching enforces use of [`Self::is_cancel`].
|
||||
#[error(transparent)]
|
||||
Other(anyhow::Error),
|
||||
}
|
||||
|
||||
pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
|
||||
if err.is_cancel() {
|
||||
Self::ShuttingDown
|
||||
} else {
|
||||
Self::Other(err.into_anyhow())
|
||||
#[derive(Debug)]
|
||||
struct ForbidMatching;
|
||||
|
||||
impl CompactionError {
|
||||
pub fn new_cancelled() -> Self {
|
||||
Self::ShuttingDown(ForbidMatching)
|
||||
}
|
||||
/// Errors that can be ignored, i.e., cancel and shutdown.
|
||||
pub fn is_cancel(&self) -> bool {
|
||||
let other = match self {
|
||||
CompactionError::ShuttingDown(_) => return true,
|
||||
CompactionError::Other(other) => other,
|
||||
};
|
||||
|
||||
// The write path of compaction in particular often lacks differentiated
|
||||
// handling errors stemming from cancellation from other errors.
|
||||
// So, if requested, we also check the ::Other variant by downcasting.
|
||||
// The list below has been found empirically from flaky tests and production logs.
|
||||
// The process is simple: on ::Other(), compaction will print the enclosed
|
||||
// anyhow::Error in debug mode, i.e., with backtrace. That backtrace contains the
|
||||
// line where the write path / compaction code does undifferentiated error handling
|
||||
// from a non-anyhow type to an anyhow type. Add the type to the list of downcasts
|
||||
// below, following the same is_cancel() pattern.
|
||||
|
||||
let root_cause = other.root_cause();
|
||||
|
||||
let upload_queue = root_cause
|
||||
.downcast_ref::<NotInitialized>()
|
||||
.is_some_and(|e| e.is_stopping());
|
||||
let timeline = root_cause
|
||||
.downcast_ref::<PageReconstructError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let buffered_writer_flush_task_canelled = root_cause
|
||||
.downcast_ref::<FlushTaskError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let write_blob_cancelled = root_cause
|
||||
.downcast_ref::<WriteBlobError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
let gate_closed = root_cause
|
||||
.downcast_ref::<GateError>()
|
||||
.is_some_and(|e| e.is_cancel());
|
||||
upload_queue
|
||||
|| timeline
|
||||
|| buffered_writer_flush_task_canelled
|
||||
|| write_blob_cancelled
|
||||
|| gate_closed
|
||||
}
|
||||
pub fn into_anyhow(self) -> anyhow::Error {
|
||||
match self {
|
||||
CompactionError::ShuttingDown(ForbidMatching) => anyhow::Error::new(self),
|
||||
CompactionError::Other(e) => e,
|
||||
}
|
||||
}
|
||||
pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
|
||||
if err.is_cancel() {
|
||||
Self::new_cancelled()
|
||||
} else {
|
||||
Self::Other(err.into_anyhow())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6088,7 +6191,7 @@ impl From<super::upload_queue::NotInitialized> for CompactionError {
|
||||
CompactionError::Other(anyhow::anyhow!(value))
|
||||
}
|
||||
super::upload_queue::NotInitialized::ShuttingDown
|
||||
| super::upload_queue::NotInitialized::Stopped => CompactionError::ShuttingDown,
|
||||
| super::upload_queue::NotInitialized::Stopped => CompactionError::new_cancelled(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -6098,7 +6201,7 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
|
||||
match e {
|
||||
super::storage_layer::layer::DownloadError::TimelineShutdown
|
||||
| super::storage_layer::layer::DownloadError::DownloadCancelled => {
|
||||
CompactionError::ShuttingDown
|
||||
CompactionError::new_cancelled()
|
||||
}
|
||||
super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
|
||||
| super::storage_layer::layer::DownloadError::DownloadRequired
|
||||
@@ -6117,14 +6220,14 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
|
||||
|
||||
impl From<layer_manager::Shutdown> for CompactionError {
|
||||
fn from(_: layer_manager::Shutdown) -> Self {
|
||||
CompactionError::ShuttingDown
|
||||
CompactionError::new_cancelled()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<super::storage_layer::errors::PutError> for CompactionError {
|
||||
fn from(e: super::storage_layer::errors::PutError) -> Self {
|
||||
if e.is_cancel() {
|
||||
CompactionError::ShuttingDown
|
||||
CompactionError::new_cancelled()
|
||||
} else {
|
||||
CompactionError::Other(e.into_anyhow())
|
||||
}
|
||||
@@ -6223,7 +6326,7 @@ impl Timeline {
|
||||
let mut guard = tokio::select! {
|
||||
guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard,
|
||||
_ = self.cancel.cancelled() => {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -4,10 +4,11 @@
|
||||
//!
|
||||
//! The old legacy algorithm is implemented directly in `timeline.rs`.
|
||||
|
||||
use std::cmp::min;
|
||||
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
|
||||
use super::layer_manager::LayerManagerLockHolder;
|
||||
use super::{
|
||||
@@ -33,6 +34,7 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
|
||||
use pageserver_compaction::helpers::{fully_contains, overlaps_with};
|
||||
use pageserver_compaction::interface::*;
|
||||
use postgres_ffi::to_pg_timestamp;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -45,6 +47,7 @@ use wal_decoder::models::value::Value;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
|
||||
use crate::page_cache;
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::statvfs::Statvfs;
|
||||
use crate::tenant::checks::check_valid_layermap;
|
||||
use crate::tenant::gc_block::GcBlock;
|
||||
@@ -572,8 +575,8 @@ impl GcCompactionQueue {
|
||||
}
|
||||
match res {
|
||||
Ok(res) => Ok(res),
|
||||
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
|
||||
Err(CompactionError::Other(_)) => {
|
||||
Err(e) if e.is_cancel() => Err(e),
|
||||
Err(_) => {
|
||||
// There are some cases where traditional gc might collect some layer
|
||||
// files causing gc-compaction cannot read the full history of the key.
|
||||
// This needs to be resolved in the long-term by improving the compaction
|
||||
@@ -1260,13 +1263,19 @@ impl Timeline {
|
||||
// Is the timeline being deleted?
|
||||
if self.is_stopping() {
|
||||
trace!("Dropping out of compaction on timeline shutdown");
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
let target_file_size = self.get_checkpoint_distance();
|
||||
|
||||
// Define partitioning schema if needed
|
||||
|
||||
// HADRON
|
||||
let force_image_creation_lsn = self
|
||||
.get_or_compute_force_image_creation_lsn(cancel, ctx)
|
||||
.await
|
||||
.map_err(CompactionError::Other)?;
|
||||
|
||||
// 1. L0 Compact
|
||||
let l0_outcome = {
|
||||
let timer = self.metrics.compact_time_histo.start_timer();
|
||||
@@ -1274,6 +1283,7 @@ impl Timeline {
|
||||
.compact_level0(
|
||||
target_file_size,
|
||||
options.flags.contains(CompactFlags::ForceL0Compaction),
|
||||
force_image_creation_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
@@ -1376,6 +1386,7 @@ impl Timeline {
|
||||
.create_image_layers(
|
||||
&partitioning,
|
||||
lsn,
|
||||
force_image_creation_lsn,
|
||||
mode,
|
||||
&image_ctx,
|
||||
self.last_image_layer_creation_status
|
||||
@@ -1472,6 +1483,63 @@ impl Timeline {
|
||||
Ok(CompactionOutcome::Done)
|
||||
}
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
// Get the force image creation LSN. Compute it if the last computed LSN is too old.
|
||||
async fn get_or_compute_force_image_creation_lsn(
|
||||
self: &Arc<Self>,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Option<Lsn>> {
|
||||
const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
|
||||
let image_layer_force_creation_period = self.get_image_creation_timeout();
|
||||
if image_layer_force_creation_period.is_none() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let image_layer_force_creation_period = image_layer_force_creation_period.unwrap();
|
||||
let force_image_creation_lsn_computed_at =
|
||||
*self.force_image_creation_lsn_computed_at.lock().unwrap();
|
||||
if force_image_creation_lsn_computed_at.is_none()
|
||||
|| force_image_creation_lsn_computed_at.unwrap().elapsed()
|
||||
> FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL
|
||||
{
|
||||
let now: SystemTime = SystemTime::now();
|
||||
let timestamp = now
|
||||
.checked_sub(image_layer_force_creation_period)
|
||||
.ok_or_else(|| {
|
||||
anyhow::anyhow!(
|
||||
"image creation timeout is too large: {image_layer_force_creation_period:?}"
|
||||
)
|
||||
})?;
|
||||
let timestamp = to_pg_timestamp(timestamp);
|
||||
let force_image_creation_lsn = match self
|
||||
.find_lsn_for_timestamp(timestamp, cancel, ctx)
|
||||
.await?
|
||||
{
|
||||
LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn,
|
||||
_ => {
|
||||
let gc_lsn = *self.get_applied_gc_cutoff_lsn();
|
||||
tracing::info!(
|
||||
"no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}",
|
||||
gc_lsn
|
||||
);
|
||||
gc_lsn
|
||||
}
|
||||
};
|
||||
self.force_image_creation_lsn
|
||||
.store(force_image_creation_lsn);
|
||||
*self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now());
|
||||
tracing::info!(
|
||||
"computed force image creation LSN: {}",
|
||||
force_image_creation_lsn
|
||||
);
|
||||
Ok(Some(force_image_creation_lsn))
|
||||
} else {
|
||||
Ok(Some(self.force_image_creation_lsn.load()))
|
||||
}
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
/// Check for layers that are elegible to be rewritten:
|
||||
/// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
|
||||
/// we don't indefinitely retain keys in this shard that aren't needed.
|
||||
@@ -1624,7 +1692,7 @@ impl Timeline {
|
||||
|
||||
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
|
||||
@@ -1722,7 +1790,7 @@ impl Timeline {
|
||||
Ok(()) => {},
|
||||
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
|
||||
Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
},
|
||||
// Don't wait if there's L0 compaction to do. We don't need to update the outcome
|
||||
@@ -1801,6 +1869,7 @@ impl Timeline {
|
||||
self: &Arc<Self>,
|
||||
target_file_size: u64,
|
||||
force_compaction_ignore_threshold: bool,
|
||||
force_compaction_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactionOutcome, CompactionError> {
|
||||
let CompactLevel0Phase1Result {
|
||||
@@ -1821,6 +1890,7 @@ impl Timeline {
|
||||
stats,
|
||||
target_file_size,
|
||||
force_compaction_ignore_threshold,
|
||||
force_compaction_lsn,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(phase1_span)
|
||||
@@ -1843,6 +1913,7 @@ impl Timeline {
|
||||
mut stats: CompactLevel0Phase1StatsBuilder,
|
||||
target_file_size: u64,
|
||||
force_compaction_ignore_threshold: bool,
|
||||
force_compaction_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
let begin = tokio::time::Instant::now();
|
||||
@@ -1872,11 +1943,28 @@ impl Timeline {
|
||||
return Ok(CompactLevel0Phase1Result::default());
|
||||
}
|
||||
} else {
|
||||
debug!(
|
||||
level0_deltas = level0_deltas.len(),
|
||||
threshold, "too few deltas to compact"
|
||||
);
|
||||
return Ok(CompactLevel0Phase1Result::default());
|
||||
// HADRON
|
||||
let min_lsn = level0_deltas
|
||||
.iter()
|
||||
.map(|a| a.get_lsn_range().start)
|
||||
.reduce(min);
|
||||
if force_compaction_lsn.is_some()
|
||||
&& min_lsn.is_some()
|
||||
&& min_lsn.unwrap() < force_compaction_lsn.unwrap()
|
||||
{
|
||||
info!(
|
||||
"forcing L0 compaction of {} L0 deltas. Min lsn: {}, force compaction lsn: {}",
|
||||
level0_deltas.len(),
|
||||
min_lsn.unwrap(),
|
||||
force_compaction_lsn.unwrap()
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
level0_deltas = level0_deltas.len(),
|
||||
threshold, "too few deltas to compact"
|
||||
);
|
||||
return Ok(CompactLevel0Phase1Result::default());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1985,7 +2073,7 @@ impl Timeline {
|
||||
let mut all_keys = Vec::new();
|
||||
for l in deltas_to_compact.iter() {
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
|
||||
let keys = delta
|
||||
@@ -2078,7 +2166,7 @@ impl Timeline {
|
||||
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
|
||||
|
||||
if self.cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
|
||||
@@ -2186,7 +2274,7 @@ impl Timeline {
|
||||
// avoid hitting the cancellation token on every key. in benches, we end up
|
||||
// shuffling an order of million keys per layer, this means we'll check it
|
||||
// around tens of times per layer.
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
let same_key = prev_key == Some(key);
|
||||
@@ -2271,7 +2359,7 @@ impl Timeline {
|
||||
if writer.is_none() {
|
||||
if self.cancel.is_cancelled() {
|
||||
// to be somewhat responsive to cancellation, check for each new layer
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
// Create writer if not initiaized yet
|
||||
writer = Some(
|
||||
@@ -2527,7 +2615,7 @@ impl Timeline {
|
||||
// Is the timeline being deleted?
|
||||
if self.is_stopping() {
|
||||
trace!("Dropping out of compaction on timeline shutdown");
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
let (dense_ks, _sparse_ks) = self
|
||||
@@ -3189,7 +3277,7 @@ impl Timeline {
|
||||
let gc_lock = async {
|
||||
tokio::select! {
|
||||
guard = self.gc_lock.lock() => Ok(guard),
|
||||
_ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
|
||||
_ = cancel.cancelled() => Err(CompactionError::new_cancelled()),
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3462,7 +3550,7 @@ impl Timeline {
|
||||
}
|
||||
total_layer_size += layer.layer_desc().file_size;
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
let should_yield = yield_for_l0
|
||||
&& self
|
||||
@@ -3609,7 +3697,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return Err(CompactionError::ShuttingDown);
|
||||
return Err(CompactionError::new_cancelled());
|
||||
}
|
||||
|
||||
let should_yield = yield_for_l0
|
||||
|
||||
@@ -953,7 +953,9 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
|
||||
|
||||
/*
|
||||
* Fire Event Trigger if both function owner and current user are
|
||||
* superuser, or none of them are.
|
||||
* superuser. Allow executing Event Trigger function that belongs to a
|
||||
* superuser when connected as a non-superuser, even when the function is
|
||||
* SECURITY DEFINER.
|
||||
*/
|
||||
else if (event == FHET_START
|
||||
/* still enable it to pass pg_regress tests */
|
||||
@@ -976,32 +978,7 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
|
||||
function_is_owned_by_super = superuser_arg(function_owner);
|
||||
|
||||
/*
|
||||
* 1. Refuse to run SECURITY DEFINER function that belongs to a
|
||||
* superuser when the current user is not a superuser itself.
|
||||
*/
|
||||
if (!role_is_super
|
||||
&& function_is_owned_by_super
|
||||
&& function_is_secdef)
|
||||
{
|
||||
char *func_name = get_func_name(flinfo->fn_oid);
|
||||
|
||||
ereport(WARNING,
|
||||
(errmsg("Skipping Event Trigger"),
|
||||
errdetail("Event Trigger function \"%s\" is owned by \"%s\" "
|
||||
"and is SECURITY DEFINER",
|
||||
func_name,
|
||||
GetUserNameFromId(function_owner, false))));
|
||||
|
||||
/*
|
||||
* we can't skip execution directly inside the fmgr_hook so
|
||||
* instead we change the event trigger function to a noop
|
||||
* function.
|
||||
*/
|
||||
force_noop(flinfo);
|
||||
}
|
||||
|
||||
/*
|
||||
* 2. Refuse to run functions that belongs to a non-superuser when the
|
||||
* Refuse to run functions that belongs to a non-superuser when the
|
||||
* current user is a superuser.
|
||||
*
|
||||
* We could run a SECURITY DEFINER user-function here and be safe with
|
||||
@@ -1009,7 +986,7 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
|
||||
* infrastructure maintenance operations, where we prefer to skip
|
||||
* running user-defined code.
|
||||
*/
|
||||
else if (role_is_super && !function_is_owned_by_super)
|
||||
if (role_is_super && !function_is_owned_by_super)
|
||||
{
|
||||
char *func_name = get_func_name(flinfo->fn_oid);
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ use tracing::*;
|
||||
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
|
||||
use utils::id::NodeId;
|
||||
use utils::logging::{self, LogFormat, SecretString};
|
||||
use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR};
|
||||
use utils::sentry_init::init_sentry;
|
||||
use utils::{pid_file, project_build_tag, project_git_version, tcp_listener};
|
||||
|
||||
@@ -243,6 +244,11 @@ struct Args {
|
||||
#[arg(long)]
|
||||
enable_tls_wal_service_api: bool,
|
||||
|
||||
/// Controls whether to collect all metrics on each scrape or to return potentially stale
|
||||
/// results.
|
||||
#[arg(long, default_value_t = true)]
|
||||
force_metric_collection_on_scrape: bool,
|
||||
|
||||
/// Run in development mode (disables security checks)
|
||||
#[arg(long, help = "Run in development mode (disables security checks)")]
|
||||
dev: bool,
|
||||
@@ -428,6 +434,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
ssl_ca_certs,
|
||||
use_https_safekeeper_api: args.use_https_safekeeper_api,
|
||||
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
|
||||
force_metric_collection_on_scrape: args.force_metric_collection_on_scrape,
|
||||
});
|
||||
|
||||
// initialize sentry if SENTRY_DSN is provided
|
||||
@@ -640,6 +647,26 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
|
||||
.map(|res| ("broker main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(broker_task_handle));
|
||||
|
||||
/* BEGIN_HADRON */
|
||||
if conf.force_metric_collection_on_scrape {
|
||||
let metrics_handle = current_thread_rt
|
||||
.as_ref()
|
||||
.unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
|
||||
.spawn(async move {
|
||||
let mut interval: tokio::time::Interval =
|
||||
tokio::time::interval(METRICS_COLLECTION_INTERVAL);
|
||||
loop {
|
||||
interval.tick().await;
|
||||
tokio::task::spawn_blocking(|| {
|
||||
METRICS_COLLECTOR.run_once(true);
|
||||
});
|
||||
}
|
||||
})
|
||||
.map(|res| ("broker main".to_owned(), res));
|
||||
tasks_handles.push(Box::pin(metrics_handle));
|
||||
}
|
||||
/* END_HADRON */
|
||||
|
||||
set_build_info_metric(GIT_VERSION, BUILD_TAG);
|
||||
|
||||
// TODO: update tokio-stream, convert to real async Stream with
|
||||
|
||||
@@ -699,6 +699,11 @@ pub fn make_router(
|
||||
}))
|
||||
}
|
||||
|
||||
let force_metric_collection_on_scrape = conf.force_metric_collection_on_scrape;
|
||||
|
||||
let prometheus_metrics_handler_wrapper =
|
||||
move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape);
|
||||
|
||||
// NB: on any changes do not forget to update the OpenAPI spec
|
||||
// located nearby (/safekeeper/src/http/openapi_spec.yaml).
|
||||
let auth = conf.http_auth.clone();
|
||||
@@ -706,7 +711,9 @@ pub fn make_router(
|
||||
.data(conf)
|
||||
.data(global_timelines)
|
||||
.data(auth)
|
||||
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
|
||||
.get("/metrics", move |r| {
|
||||
request_span(r, prometheus_metrics_handler_wrapper)
|
||||
})
|
||||
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
|
||||
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
|
||||
.get("/v1/status", |r| request_span(r, status_handler))
|
||||
|
||||
@@ -134,6 +134,7 @@ pub struct SafeKeeperConf {
|
||||
pub ssl_ca_certs: Vec<Pem>,
|
||||
pub use_https_safekeeper_api: bool,
|
||||
pub enable_tls_wal_service_api: bool,
|
||||
pub force_metric_collection_on_scrape: bool,
|
||||
}
|
||||
|
||||
impl SafeKeeperConf {
|
||||
@@ -183,6 +184,7 @@ impl SafeKeeperConf {
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
enable_tls_wal_service_api: false,
|
||||
force_metric_collection_on_scrape: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,6 +59,15 @@ pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
|
||||
.expect("Failed to register safekeeper_flush_wal_seconds histogram")
|
||||
});
|
||||
/* BEGIN_HADRON */
|
||||
// Counter of all ProposerAcceptorMessage requests received
|
||||
pub static PROPOSER_ACCEPTOR_MESSAGES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"safekeeper_proposer_acceptor_messages_total",
|
||||
"Total number of ProposerAcceptorMessage requests received by the Safekeeper.",
|
||||
&["outcome"]
|
||||
)
|
||||
.expect("Failed to register safekeeper_proposer_acceptor_messages_total counter")
|
||||
});
|
||||
pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_wal_disk_io_errors",
|
||||
|
||||
@@ -24,7 +24,7 @@ use utils::id::{NodeId, TenantId, TimelineId};
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pageserver_feedback::PageserverFeedback;
|
||||
|
||||
use crate::metrics::MISC_OPERATION_SECONDS;
|
||||
use crate::metrics::{MISC_OPERATION_SECONDS, PROPOSER_ACCEPTOR_MESSAGES_TOTAL};
|
||||
use crate::state::TimelineState;
|
||||
use crate::{control_file, wal_storage};
|
||||
|
||||
@@ -938,7 +938,7 @@ where
|
||||
&mut self,
|
||||
msg: &ProposerAcceptorMessage,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
match msg {
|
||||
let res = match msg {
|
||||
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
|
||||
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
|
||||
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
|
||||
@@ -949,7 +949,20 @@ where
|
||||
self.handle_append_request(msg, false).await
|
||||
}
|
||||
ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
|
||||
}
|
||||
};
|
||||
|
||||
// BEGIN HADRON
|
||||
match &res {
|
||||
Ok(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
|
||||
.with_label_values(&["success"])
|
||||
.inc(),
|
||||
Err(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
|
||||
.with_label_values(&["error"])
|
||||
.inc(),
|
||||
};
|
||||
|
||||
res
|
||||
// END HADRON
|
||||
}
|
||||
|
||||
/// Handle initial message from proposer: check its sanity and send my
|
||||
|
||||
@@ -166,7 +166,7 @@ fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option<N
|
||||
|
||||
let backup_lag = state.commit_lsn.checked_sub(state.backup_lsn);
|
||||
if backup_lag.is_none() {
|
||||
info!("Backup lag is None. Skipping re-election.");
|
||||
debug!("Backup lag is None. Skipping re-election.");
|
||||
return (offloader, election_dbg_str);
|
||||
}
|
||||
|
||||
|
||||
@@ -190,6 +190,7 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
|
||||
ssl_ca_certs: Vec::new(),
|
||||
use_https_safekeeper_api: false,
|
||||
enable_tls_wal_service_api: false,
|
||||
force_metric_collection_on_scrape: true,
|
||||
};
|
||||
|
||||
let mut global = GlobalMap::new(disk, conf.clone())?;
|
||||
|
||||
@@ -1984,11 +1984,14 @@ impl Service {
|
||||
});
|
||||
|
||||
// Check that there is enough safekeepers configured that we can create new timelines
|
||||
let test_sk_res = this.safekeepers_for_new_timeline().await;
|
||||
let test_sk_res_str = match this.safekeepers_for_new_timeline().await {
|
||||
Ok(v) => format!("Ok({v:?})"),
|
||||
Err(v) => format!("Err({v:})"),
|
||||
};
|
||||
tracing::info!(
|
||||
timeline_safekeeper_count = config.timeline_safekeeper_count,
|
||||
timelines_onto_safekeepers = config.timelines_onto_safekeepers,
|
||||
"viability test result (test timeline creation on safekeepers): {test_sk_res:?}",
|
||||
"viability test result (test timeline creation on safekeepers): {test_sk_res_str}",
|
||||
);
|
||||
|
||||
Ok(this)
|
||||
@@ -4758,6 +4761,7 @@ impl Service {
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut retry_if_not_attached = false;
|
||||
let targets = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
let mut targets = Vec::new();
|
||||
@@ -4774,6 +4778,24 @@ impl Service {
|
||||
.expect("Pageservers may not be deleted while referenced");
|
||||
|
||||
targets.push((*tenant_shard_id, node.clone()));
|
||||
|
||||
if let Some(location) = shard.observed.locations.get(node_id) {
|
||||
if let Some(ref conf) = location.conf {
|
||||
if conf.mode != LocationConfigMode::AttachedSingle
|
||||
&& conf.mode != LocationConfigMode::AttachedMulti
|
||||
{
|
||||
// If the shard is attached as secondary, we need to retry if 404.
|
||||
retry_if_not_attached = true;
|
||||
}
|
||||
// If the shard is attached as primary, we should succeed.
|
||||
} else {
|
||||
// Location conf is not available yet, retry if 404.
|
||||
retry_if_not_attached = true;
|
||||
}
|
||||
} else {
|
||||
// The shard is not attached to the intended pageserver yet, retry if 404.
|
||||
retry_if_not_attached = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
targets
|
||||
@@ -4804,6 +4826,18 @@ impl Service {
|
||||
valid_until = Some(lease.valid_until);
|
||||
}
|
||||
}
|
||||
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
|
||||
if retry_if_not_attached =>
|
||||
{
|
||||
// This is expected if the attach is not finished yet. Return 503 so that the client can retry.
|
||||
return Err(ApiError::ResourceUnavailable(
|
||||
format!(
|
||||
"Timeline is not attached to the pageserver {} yet, please retry",
|
||||
node.get_id()
|
||||
)
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
Err(e) => {
|
||||
return Err(passthrough_api_error(&node, e));
|
||||
}
|
||||
|
||||
@@ -1795,6 +1795,33 @@ def neon_env_builder(
|
||||
record_property("preserve_database_files", builder.preserve_database_files)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_env_builder_local(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
) -> NeonEnvBuilder:
|
||||
"""
|
||||
Fixture to create a Neon environment for test with its own pg_install copy.
|
||||
|
||||
This allows the test to edit the list of available extensions in the
|
||||
local instance of Postgres used for the test, and install extensions via
|
||||
downloading them when a remote extension is tested, for instance, or
|
||||
copying files around for local extension testing.
|
||||
"""
|
||||
test_local_pginstall = test_output_dir / "pg_install"
|
||||
log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}")
|
||||
|
||||
# We can't copy only the version that we are currently testing because other
|
||||
# binaries like the storage controller need specific Postgres versions.
|
||||
shutil.copytree(pg_distrib_dir, test_local_pginstall)
|
||||
|
||||
neon_env_builder.pg_distrib_dir = test_local_pginstall
|
||||
log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}")
|
||||
|
||||
return neon_env_builder
|
||||
|
||||
|
||||
@dataclass
|
||||
class PageserverPort:
|
||||
pg: int
|
||||
|
||||
@@ -333,6 +333,13 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")
|
||||
self.verbose_error(res)
|
||||
|
||||
def list_tenant_visible_size(self) -> dict[TenantShardId, int]:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/list_tenant_visible_size")
|
||||
self.verbose_error(res)
|
||||
res_json = res.json()
|
||||
assert isinstance(res_json, dict)
|
||||
return res_json
|
||||
|
||||
def tenant_list(self) -> list[dict[Any, Any]]:
|
||||
res = self.get(f"http://localhost:{self.port}/v1/tenant")
|
||||
self.verbose_error(res)
|
||||
@@ -1002,7 +1009,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
|
||||
def get_metrics_str(self) -> str:
|
||||
"""You probably want to use get_metrics() instead."""
|
||||
res = self.get(f"http://localhost:{self.port}/metrics")
|
||||
res = self.get(f"http://localhost:{self.port}/metrics?use_latest=true")
|
||||
self.verbose_error(res)
|
||||
return res.text
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import re
|
||||
import socket
|
||||
from contextlib import closing
|
||||
from itertools import cycle
|
||||
|
||||
from fixtures.log_helper import log
|
||||
|
||||
@@ -34,15 +35,23 @@ def can_bind(host: str, port: int) -> bool:
|
||||
|
||||
class PortDistributor:
|
||||
def __init__(self, base_port: int, port_number: int):
|
||||
self.iterator = iter(range(base_port, base_port + port_number))
|
||||
self.base_port = base_port
|
||||
self.port_number = port_number
|
||||
self.cycle = cycle(range(base_port, base_port + port_number))
|
||||
self.port_map: dict[int, int] = {}
|
||||
|
||||
def get_port(self) -> int:
|
||||
for port in self.iterator:
|
||||
checked = 0
|
||||
for port in self.cycle:
|
||||
if can_bind("localhost", port):
|
||||
return port
|
||||
elif checked < self.port_number:
|
||||
checked += 1
|
||||
else:
|
||||
break
|
||||
|
||||
raise RuntimeError(
|
||||
"port range configured for test is exhausted, consider enlarging the range"
|
||||
f"port range ({self.base_port}..{self.base_port + self.port_number}) configured for test is exhausted, consider enlarging the range"
|
||||
)
|
||||
|
||||
def replace_with_new_port(self, value: int | str) -> int | str:
|
||||
|
||||
@@ -143,7 +143,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
|
||||
|
||||
def get_metrics_str(self) -> str:
|
||||
"""You probably want to use get_metrics() instead."""
|
||||
request_result = self.get(f"http://localhost:{self.port}/metrics")
|
||||
request_result = self.get(f"http://localhost:{self.port}/metrics?use_latest=true")
|
||||
request_result.raise_for_status()
|
||||
return request_result.text
|
||||
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
\echo Use "CREATE EXTENSION test_event_trigger_extension" to load this file. \quit
|
||||
|
||||
CREATE SCHEMA event_trigger;
|
||||
|
||||
create sequence if not exists event_trigger.seq_schema_version as int cycle;
|
||||
|
||||
create or replace function event_trigger.increment_schema_version()
|
||||
returns event_trigger
|
||||
security definer
|
||||
language plpgsql
|
||||
as $$
|
||||
begin
|
||||
perform pg_catalog.nextval('event_trigger.seq_schema_version');
|
||||
end;
|
||||
$$;
|
||||
|
||||
create or replace function event_trigger.get_schema_version()
|
||||
returns int
|
||||
security definer
|
||||
language sql
|
||||
as $$
|
||||
select last_value from event_trigger.seq_schema_version;
|
||||
$$;
|
||||
|
||||
-- On DDL event, increment the schema version number
|
||||
create event trigger event_trigger_watch_ddl
|
||||
on ddl_command_end
|
||||
execute procedure event_trigger.increment_schema_version();
|
||||
|
||||
create event trigger event_trigger_watch_drop
|
||||
on sql_drop
|
||||
execute procedure event_trigger.increment_schema_version();
|
||||
@@ -0,0 +1,8 @@
|
||||
default_version = '1.0'
|
||||
comment = 'Test extension with Event Trigger'
|
||||
|
||||
# make sure the extension objects are owned by the bootstrap user
|
||||
# to check that the SECURITY DEFINER event trigger function is still
|
||||
# called during non-superuser DDL events.
|
||||
superuser = true
|
||||
trusted = true
|
||||
@@ -165,6 +165,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
|
||||
"gc_horizon": 23 * (1024 * 1024),
|
||||
"gc_period": "2h 13m",
|
||||
"image_creation_threshold": 7,
|
||||
"image_layer_force_creation_period": "1m",
|
||||
"pitr_interval": "1m",
|
||||
"lagging_wal_timeout": "23m",
|
||||
"lazy_slru_download": True,
|
||||
|
||||
@@ -944,3 +944,78 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool
|
||||
f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)"
|
||||
)
|
||||
assert res[0][0] == 1
|
||||
|
||||
|
||||
# BEGIN_HADRON
|
||||
def get_layer_map(env, tenant_shard_id, timeline_id, ps_id):
|
||||
client = env.pageservers[ps_id].http_client()
|
||||
layer_map = client.layer_map_info(tenant_shard_id, timeline_id)
|
||||
image_layer_count = 0
|
||||
delta_layer_count = 0
|
||||
for layer in layer_map.historic_layers:
|
||||
if layer.kind == "Image":
|
||||
image_layer_count += 1
|
||||
elif layer.kind == "Delta":
|
||||
delta_layer_count += 1
|
||||
return image_layer_count, delta_layer_count
|
||||
|
||||
|
||||
def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Tests that page server can force creating new images if image creation timeout is enabled
|
||||
"""
|
||||
# use large knobs to disable L0 compaction/image creation except for the force image creation
|
||||
tenant_conf = {
|
||||
"compaction_threshold": "100",
|
||||
"image_creation_threshold": "100",
|
||||
"image_layer_creation_check_threshold": "1",
|
||||
"checkpoint_distance": 10 * 1024,
|
||||
"checkpoint_timeout": "1s",
|
||||
"image_layer_force_creation_period": "1s",
|
||||
# The lsn for forced image layer creations is calculated once every 10 minutes.
|
||||
# Hence, drive compaction manually such that the test doesn't compute it at the
|
||||
# wrong time.
|
||||
"compaction_period": "0s",
|
||||
}
|
||||
|
||||
# consider every tenant large to run the image layer generation check more eagerly
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
"image_layer_generation_large_timeline_threshold=0"
|
||||
)
|
||||
|
||||
neon_env_builder.num_pageservers = 1
|
||||
neon_env_builder.num_safekeepers = 1
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
|
||||
# Generate some rows.
|
||||
for v in range(10):
|
||||
endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")
|
||||
|
||||
# Sleep a bit such that the inserts are considered when calculating the forced image layer creation LSN.
|
||||
time.sleep(2)
|
||||
|
||||
def check_force_image_creation():
|
||||
ps_http = env.pageserver.http_client()
|
||||
ps_http.timeline_compact(tenant_id, timeline_id)
|
||||
image, delta = get_layer_map(env, tenant_id, timeline_id, 0)
|
||||
log.info(f"images: {image}, deltas: {delta}")
|
||||
assert image > 0
|
||||
|
||||
env.pageserver.assert_log_contains("forcing L0 compaction of")
|
||||
env.pageserver.assert_log_contains("forcing image creation for partitioned range")
|
||||
|
||||
wait_until(check_force_image_creation)
|
||||
|
||||
endpoint.stop_and_destroy()
|
||||
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*created delta file of size.*larger than double of target.*"
|
||||
)
|
||||
|
||||
|
||||
# END_HADRON
|
||||
|
||||
@@ -2,7 +2,6 @@ from __future__ import annotations
|
||||
|
||||
import os
|
||||
import platform
|
||||
import shutil
|
||||
import tarfile
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
@@ -31,27 +30,6 @@ if TYPE_CHECKING:
|
||||
from werkzeug.wrappers.request import Request
|
||||
|
||||
|
||||
# use neon_env_builder_local fixture to override the default neon_env_builder fixture
|
||||
# and use a test-specific pg_install instead of shared one
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_env_builder_local(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_distrib_dir: Path,
|
||||
) -> NeonEnvBuilder:
|
||||
test_local_pginstall = test_output_dir / "pg_install"
|
||||
log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}")
|
||||
|
||||
# We can't copy only the version that we are currently testing because other
|
||||
# binaries like the storage controller need specific Postgres versions.
|
||||
shutil.copytree(pg_distrib_dir, test_local_pginstall)
|
||||
|
||||
neon_env_builder.pg_distrib_dir = test_local_pginstall
|
||||
log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}")
|
||||
|
||||
return neon_env_builder
|
||||
|
||||
|
||||
@final
|
||||
class RemoteExtension(StrEnum):
|
||||
SQL_ONLY = "test_extension_sql_only"
|
||||
|
||||
102
test_runner/regress/test_event_trigger_extension.py
Normal file
102
test_runner/regress/test_event_trigger_extension.py
Normal file
@@ -0,0 +1,102 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.paths import BASE_DIR
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pathlib import Path
|
||||
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
|
||||
# use neon_env_builder_local fixture to override the default neon_env_builder fixture
|
||||
# and use a test-specific pg_install instead of shared one
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_env_builder_event_trigger_extension(
|
||||
neon_env_builder_local: NeonEnvBuilder,
|
||||
test_output_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
) -> NeonEnvBuilder:
|
||||
test_local_pginstall = test_output_dir / "pg_install"
|
||||
|
||||
# Now copy the SQL only extension test_event_trigger_extension in the local
|
||||
# pginstall extension directory on-disk
|
||||
test_event_trigger_extension_dir = (
|
||||
BASE_DIR / "test_runner" / "regress" / "data" / "test_event_trigger_extension"
|
||||
)
|
||||
|
||||
test_local_extension_dir = (
|
||||
test_local_pginstall / f"v{pg_version}" / "share" / "postgresql" / "extension"
|
||||
)
|
||||
|
||||
log.info(f"copy {test_event_trigger_extension_dir} to {test_local_extension_dir}")
|
||||
|
||||
for f in [
|
||||
test_event_trigger_extension_dir / "test_event_trigger_extension.control",
|
||||
test_event_trigger_extension_dir / "test_event_trigger_extension--1.0.sql",
|
||||
]:
|
||||
shutil.copy(f, test_local_extension_dir)
|
||||
|
||||
return neon_env_builder_local
|
||||
|
||||
|
||||
def test_event_trigger_extension(neon_env_builder_event_trigger_extension: NeonEnvBuilder):
|
||||
"""
|
||||
Test installing an extension that contains an Event Trigger.
|
||||
|
||||
The Event Trigger function is owned by the extension owner, which at
|
||||
CREATE EXTENSION is going to be the Postgres bootstrap user, per the
|
||||
extension control file where both superuser = true and trusted = true.
|
||||
|
||||
Also this function is SECURTY DEFINER, to allow for making changes to
|
||||
the extension SQL objects, in our case a sequence.
|
||||
|
||||
This test makes sure that the event trigger function is fired correctly
|
||||
by non-privileged user DDL actions such as CREATE TABLE.
|
||||
"""
|
||||
env = neon_env_builder_event_trigger_extension.init_start()
|
||||
env.create_branch("test_event_trigger_extension")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_event_trigger_extension")
|
||||
extension = "test_event_trigger_extension"
|
||||
database = "test_event_trigger_extension"
|
||||
|
||||
endpoint.safe_psql(f"CREATE DATABASE {database}")
|
||||
endpoint.safe_psql(f"CREATE EXTENSION {extension}", dbname=database)
|
||||
|
||||
# check that the extension is owned by the bootstrap superuser (cloud_admin)
|
||||
pg_bootstrap_superuser_name = "cloud_admin"
|
||||
with endpoint.connect(dbname=database) as pg_conn:
|
||||
with pg_conn.cursor() as cur:
|
||||
cur.execute(
|
||||
f"select rolname from pg_roles r join pg_extension e on r.oid = e.extowner where extname = '{extension}'"
|
||||
)
|
||||
owner = cast("tuple[str]", cur.fetchone())[0]
|
||||
assert owner == pg_bootstrap_superuser_name, (
|
||||
f"extension {extension} is not owned by bootstrap user '{pg_bootstrap_superuser_name}'"
|
||||
)
|
||||
|
||||
# test that the SQL-only Event Trigger (SECURITY DEFINER function) runs
|
||||
# correctly now that the extension has been installed
|
||||
#
|
||||
# create table to trigger the event trigger, twice, check sequence count
|
||||
with endpoint.connect(dbname=database) as pg_conn:
|
||||
log.info("creating SQL objects (tables)")
|
||||
with pg_conn.cursor() as cur:
|
||||
cur.execute("CREATE TABLE foo1(id int primary key)")
|
||||
cur.execute("CREATE TABLE foo2(id int)")
|
||||
|
||||
cur.execute("SELECT event_trigger.get_schema_version()")
|
||||
res = cast("tuple[int]", cur.fetchone())
|
||||
ver = res[0]
|
||||
|
||||
log.info(f"schema version is now {ver}")
|
||||
assert ver == 2, "schema version is not 2"
|
||||
@@ -1,6 +1,7 @@
|
||||
import random
|
||||
import threading
|
||||
from enum import StrEnum
|
||||
from time import sleep
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
@@ -24,18 +25,7 @@ OFFLOAD_LABEL = "compute_ctl_lfc_offloads_total"
|
||||
OFFLOAD_ERR_LABEL = "compute_ctl_lfc_offload_errors_total"
|
||||
METHOD_VALUES = [e for e in PrewarmMethod]
|
||||
METHOD_IDS = [e.value for e in PrewarmMethod]
|
||||
|
||||
|
||||
def check_pinned_entries(cur: Cursor):
|
||||
"""
|
||||
Wait till none of LFC buffers are pinned
|
||||
"""
|
||||
|
||||
def none_pinned():
|
||||
cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_chunks_pinned'")
|
||||
assert cur.fetchall()[0][0] == 0
|
||||
|
||||
wait_until(none_pinned)
|
||||
AUTOOFFLOAD_INTERVAL_SECS = 2
|
||||
|
||||
|
||||
def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
|
||||
@@ -49,9 +39,18 @@ def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
|
||||
|
||||
|
||||
def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any:
|
||||
if method == PrewarmMethod.POSTGRES:
|
||||
cur.execute("select get_local_cache_state()")
|
||||
return cur.fetchall()[0][0]
|
||||
|
||||
if method == PrewarmMethod.AUTOPREWARM:
|
||||
# With autoprewarm, we need to be sure LFC was offloaded after all writes
|
||||
# finish, so we sleep. Otherwise we'll have less prewarmed pages than we want
|
||||
sleep(AUTOOFFLOAD_INTERVAL_SECS)
|
||||
client.offload_lfc_wait()
|
||||
elif method == PrewarmMethod.COMPUTE_CTL:
|
||||
return
|
||||
|
||||
if method == PrewarmMethod.COMPUTE_CTL:
|
||||
status = client.prewarm_lfc_status()
|
||||
assert status["status"] == "not_prewarmed"
|
||||
assert "error" not in status
|
||||
@@ -60,11 +59,9 @@ def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor)
|
||||
parsed = prom_parse(client)
|
||||
desired = {OFFLOAD_LABEL: 1, PREWARM_LABEL: 0, OFFLOAD_ERR_LABEL: 0, PREWARM_ERR_LABEL: 0}
|
||||
assert parsed == desired, f"{parsed=} != {desired=}"
|
||||
elif method == PrewarmMethod.POSTGRES:
|
||||
cur.execute("select get_local_cache_state()")
|
||||
return cur.fetchall()[0][0]
|
||||
else:
|
||||
raise AssertionError(f"{method} not in PrewarmMethod")
|
||||
return
|
||||
|
||||
raise AssertionError(f"{method} not in PrewarmMethod")
|
||||
|
||||
|
||||
def prewarm_endpoint(
|
||||
@@ -106,14 +103,13 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
|
||||
"neon.file_cache_size_limit=1GB",
|
||||
"neon.file_cache_prewarm_limit=1000",
|
||||
]
|
||||
offload_secs = 2
|
||||
|
||||
if method == PrewarmMethod.AUTOPREWARM:
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=cfg,
|
||||
autoprewarm=True,
|
||||
offload_lfc_interval_seconds=offload_secs,
|
||||
offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS,
|
||||
)
|
||||
else:
|
||||
endpoint = env.endpoints.create_start(branch_name="main", config_lines=cfg)
|
||||
@@ -135,7 +131,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
|
||||
|
||||
endpoint.stop()
|
||||
if method == PrewarmMethod.AUTOPREWARM:
|
||||
endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=offload_secs)
|
||||
endpoint.start(autoprewarm=True, offload_lfc_interval_seconds=AUTOOFFLOAD_INTERVAL_SECS)
|
||||
else:
|
||||
endpoint.start()
|
||||
|
||||
@@ -162,7 +158,6 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
|
||||
lfc_cur.execute("select sum(pk) from t")
|
||||
assert lfc_cur.fetchall()[0][0] == n_records * (n_records + 1) / 2
|
||||
|
||||
check_pinned_entries(pg_cur)
|
||||
desired = {"status": "completed", "total": total, "prewarmed": prewarmed, "skipped": skipped}
|
||||
check_prewarmed(method, client, desired)
|
||||
|
||||
@@ -243,9 +238,9 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
|
||||
prewarm_thread.start()
|
||||
|
||||
def prewarmed():
|
||||
assert n_prewarms > 5
|
||||
assert n_prewarms > 3
|
||||
|
||||
wait_until(prewarmed)
|
||||
wait_until(prewarmed, timeout=40) # debug builds don't finish in 20s
|
||||
|
||||
running = False
|
||||
for t in workload_threads:
|
||||
@@ -256,7 +251,6 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
|
||||
total_balance = lfc_cur.fetchall()[0][0]
|
||||
assert total_balance == 0
|
||||
|
||||
check_pinned_entries(pg_cur)
|
||||
if method == PrewarmMethod.POSTGRES:
|
||||
return
|
||||
desired = {
|
||||
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from fixtures.common_types import Lsn, TenantId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
DEFAULT_BRANCH_NAME,
|
||||
NeonEnv,
|
||||
@@ -164,3 +165,15 @@ def test_pageserver_http_index_part_force_patch(neon_env_builder: NeonEnvBuilder
|
||||
{"rel_size_migration": "legacy"},
|
||||
)
|
||||
assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "legacy"
|
||||
|
||||
|
||||
def test_pageserver_get_tenant_visible_size(neon_env_builder: NeonEnvBuilder):
|
||||
neon_env_builder.num_pageservers = 1
|
||||
env = neon_env_builder.init_start()
|
||||
env.create_tenant(shard_count=4)
|
||||
env.create_tenant(shard_count=2)
|
||||
|
||||
json = env.pageserver.http_client().list_tenant_visible_size()
|
||||
log.info(f"{json}")
|
||||
# initial tennat + 2 newly created tenants
|
||||
assert len(json) == 7
|
||||
|
||||
Reference in New Issue
Block a user