Compare commits

..

2 Commits

Author SHA1 Message Date
Christian Schwarz
d2e72f119f simplify the test, failure now looks like this:
```
2025-07-10 13:00:25.198 INFO [neon_fixtures.py:5643] caughtup=True, primary_lsn=0/53F63B0, secondary_lsn=0/53F63B0
2025-07-10 13:00:25.200 INFO [neon_fixtures.py:265] Hostname: localhost
2025-07-10 13:00:25.239 INFO [test_hot_standby.py:241] tenant_shard_id.shard_index=ShardIndex(shard_number=0, shard_count=0): standby_horizon_at_ps=Lsn("0/14EEC38") secondary_apply_lsn=Lsn("0/53F63B0")
2025-07-10 13:00:26.241 INFO [neon_fixtures.py:265] Hostname: localhost
2025-07-10 13:00:26.269 INFO [test_hot_standby.py:241] tenant_shard_id.shard_index=ShardIndex(shard_number=0, shard_count=0): standby_horizon_at_ps=Lsn("0/14EEC38") secondary_apply_lsn=Lsn("0/53F63B0")
2025-07-10 13:00:27.271 INFO [neon_fixtures.py:265] Hostname: localhost
...
2025-07-10 13:00:35.542 INFO [test_hot_standby.py:241] tenant_shard_id.shard_index=ShardIndex(shard_number=0, shard_count=0): standby_horizon_at_ps=Lsn("0/14EEC38") secondary_apply_lsn=Lsn("0/53F63E8")
...
FAILED test_runner/regress/test_hot_standby.py::test_hot_standby_gc[debug-pg16-True] - Failed: standby_horizon didn't propagate within timeout_secs=10, this is holding up gc on secondary
```
2025-07-10 13:00:43 +00:00
Christian Schwarz
45f5dfc685 extend test_hot_standby_gc to demonstrate that it doesn't work without hot_standby_feedback
The bug described in
- https://github.com/neondatabase/neon/issues/12168

is not covered by `test_hot_standby_gc[pause_apply=True]`.

The reason is that the first standby reply _does_ trigger an update of the
aggregate, but subsequent standby replies don't. Only
hot_standby_feedback messaages do.

In the test, that first reply sets standby_horizon to a low value X,
thereby inhibiting gc, thereby making the test pass.
However, the aggregate standby_horizon pushed by SKs remains at that low
value X from here on.

Actually, uh oh, this means standby_horizon is _kept_ at X!?
And thus, if the PS receives re-push of low value X before the next GC,
it will continue to limit GC to below X, until we hit the hard-coded
`MAX_ALLOWED_STANDBY_LAG`.

This is worse than I thought: it means that the current code _is_ preventing
GC _even_ for standbys _without_ `hot_standby_feedback`, if those standbys
shut down before 10GiB of WAL has been written since they were launched.

Turn some debug logging into info logging to make the case.

Sigh.

refs
- https://github.com/neondatabase/neon/issues/12168
2025-07-10 12:41:54 +00:00
61 changed files with 332 additions and 1473 deletions

3
Cargo.lock generated
View File

@@ -4294,7 +4294,6 @@ dependencies = [
"humantime-serde",
"pageserver_api",
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"rand 0.8.5",
"reqwest",
@@ -4324,7 +4323,6 @@ dependencies = [
"pageserver_api",
"postgres_ffi",
"remote_storage",
"serde",
"serde_json",
"svg_fmt",
"thiserror 1.0.69",
@@ -4501,7 +4499,6 @@ name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"bytes",
"compute_api",
"futures",

View File

@@ -262,7 +262,6 @@ neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }

View File

@@ -46,14 +46,11 @@ stateDiagram-v2
Configuration --> Failed : Failed to configure the compute
Configuration --> Running : Compute has been configured
Empty --> Init : Compute spec is immediately available
Empty --> TerminationPendingFast : Requested termination
Empty --> TerminationPendingImmediate : Requested termination
Empty --> TerminationPending : Requested termination
Init --> Failed : Failed to start Postgres
Init --> Running : Started Postgres
Running --> TerminationPendingFast : Requested termination
Running --> TerminationPendingImmediate : Requested termination
TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status
TerminationPendingImmediate --> Terminated : Terminated compute immediately
Running --> TerminationPending : Requested termination
TerminationPending --> Terminated : Terminated compute
Failed --> [*] : Compute exited
Terminated --> [*] : Compute exited
```

View File

@@ -956,20 +956,14 @@ impl ComputeNode {
None
};
let mut delay_exit = false;
let mut state = self.state.lock().unwrap();
state.terminate_flush_lsn = lsn;
let delay_exit = state.status == ComputeStatus::TerminationPendingFast;
if state.status == ComputeStatus::TerminationPendingFast
|| state.status == ComputeStatus::TerminationPendingImmediate
{
info!(
"Changing compute status from {} to {}",
state.status,
ComputeStatus::Terminated
);
if let ComputeStatus::TerminationPending { mode } = state.status {
state.status = ComputeStatus::Terminated;
self.state_changed.notify_all();
// we were asked to terminate gracefully, don't exit to avoid restart
delay_exit = mode == compute_api::responses::TerminateMode::Fast
}
drop(state);
@@ -1811,8 +1805,6 @@ impl ComputeNode {
tls_config,
)?;
self.pg_reload_conf()?;
if !spec.skip_pg_catalog_updates {
let max_concurrent_connections = spec.reconfigure_concurrency;
// Temporarily reset max_cluster_size in config
@@ -1832,9 +1824,10 @@ impl ComputeNode {
Ok(())
})?;
self.pg_reload_conf()?;
}
self.pg_reload_conf()?;
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
@@ -1907,8 +1900,7 @@ impl ComputeNode {
// exit loop
ComputeStatus::Failed
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Terminated => break 'cert_update,
// wait

View File

@@ -70,7 +70,7 @@ impl ComputeNode {
}
};
let row = match client
.query_one("select * from neon.get_prewarm_info()", &[])
.query_one("select * from get_prewarm_info()", &[])
.await
{
Ok(row) => row,
@@ -146,7 +146,7 @@ impl ComputeNode {
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())
@@ -196,7 +196,7 @@ impl ComputeNode {
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.get_local_cache_state()", &[])
.query_one("select get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)

View File

@@ -371,28 +371,9 @@ paths:
summary: Terminate Postgres and wait for it to exit
description: ""
operationId: terminate
parameters:
- name: mode
in: query
description: "Terminate mode: fast (wait 30s before returning) and immediate"
required: false
schema:
type: string
enum: ["fast", "immediate"]
default: fast
responses:
200:
description: Result
content:
application/json:
schema:
$ref: "#/components/schemas/TerminateResponse"
201:
description: Result if compute is already terminated
content:
application/json:
schema:
$ref: "#/components/schemas/TerminateResponse"
412:
description: "wrong state"
content:
@@ -549,14 +530,11 @@ components:
type: string
enum:
- empty
- configuration_pending
- init
- running
- configuration
- failed
- termination_pending_fast
- termination_pending_immediate
- terminated
- running
- configuration_pending
- configuration
example: running
ExtensionInstallRequest:
@@ -682,17 +660,6 @@ components:
description: Role name.
example: "neon"
TerminateResponse:
type: object
required:
- lsn
properties:
lsn:
type: string
nullable: true
description: "last WAL flush LSN"
example: "0/028F10D8"
SetRoleGrantsResponse:
type: object
required:

View File

@@ -3,7 +3,7 @@ use crate::http::JsonResponse;
use axum::extract::State;
use axum::response::Response;
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{ComputeStatus, TerminateMode, TerminateResponse};
use compute_api::responses::{ComputeStatus, TerminateResponse};
use http::StatusCode;
use serde::Deserialize;
use std::sync::Arc;
@@ -12,7 +12,7 @@ use tracing::info;
#[derive(Deserialize, Default)]
pub struct TerminateQuery {
mode: TerminateMode,
mode: compute_api::responses::TerminateMode,
}
/// Terminate the compute.
@@ -24,16 +24,16 @@ pub(in crate::http) async fn terminate(
{
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::Terminated {
let response = TerminateResponse {
lsn: state.terminate_flush_lsn,
};
return JsonResponse::success(StatusCode::CREATED, response);
return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn);
}
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
return JsonResponse::invalid_status(state.status);
}
state.set_status(mode.into(), &compute.state_changed);
state.set_status(
ComputeStatus::TerminationPending { mode },
&compute.state_changed,
);
}
forward_termination_signal(false);

View File

@@ -1,16 +1,3 @@
-- On December 8th, 2023, an engineering escalation (INC-110) was opened after
-- it was found that BYPASSRLS was being applied to all roles.
--
-- PR that introduced the issue: https://github.com/neondatabase/neon/pull/5657
-- Subsequent commit on main: https://github.com/neondatabase/neon/commit/ad99fa5f0393e2679e5323df653c508ffa0ac072
--
-- NOBYPASSRLS and INHERIT are the defaults for a Postgres role, but because it
-- isn't easy to know if a Postgres cluster is affected by the issue, we need to
-- keep the migration around for a long time, if not indefinitely, so any
-- cluster can be fixed.
--
-- Branching is the gift that keeps on giving...
DO $$
DECLARE
role_name text;

View File

@@ -84,8 +84,7 @@ impl ComputeMonitor {
if matches!(
compute_status,
ComputeStatus::Terminated
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Failed
) {
info!(

View File

@@ -922,8 +922,7 @@ impl Endpoint {
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Terminated => {
bail!("unexpected compute status: {:?}", state.status)
}

View File

@@ -121,15 +121,6 @@ pub enum TerminateMode {
Immediate,
}
impl From<TerminateMode> for ComputeStatus {
fn from(mode: TerminateMode) -> Self {
match mode {
TerminateMode::Fast => ComputeStatus::TerminationPendingFast,
TerminateMode::Immediate => ComputeStatus::TerminationPendingImmediate,
}
}
}
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
@@ -150,9 +141,7 @@ pub enum ComputeStatus {
// control-plane to terminate it.
Failed,
// Termination requested
TerminationPendingFast,
// Termination requested, without waiting 30s before returning from /terminate
TerminationPendingImmediate,
TerminationPending { mode: TerminateMode },
// Terminated Postgres
Terminated,
}
@@ -171,10 +160,7 @@ impl Display for ComputeStatus {
ComputeStatus::Running => f.write_str("running"),
ComputeStatus::Configuration => f.write_str("configuration"),
ComputeStatus::Failed => f.write_str("failed"),
ComputeStatus::TerminationPendingFast => f.write_str("termination-pending-fast"),
ComputeStatus::TerminationPendingImmediate => {
f.write_str("termination-pending-immediate")
}
ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"),
ComputeStatus::Terminated => f.write_str("terminated"),
}
}

View File

@@ -20,7 +20,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{Instrument, debug, info, info_span, warn};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use utils::metrics_collector::{METRICS_COLLECTOR, METRICS_STALE_MILLIS};
use crate::error::{ApiError, api_error_handler, route_error_handler};
use crate::request::{get_query_param, parse_query_param};
@@ -251,28 +250,9 @@ impl std::io::Write for ChannelWriter {
}
}
pub async fn prometheus_metrics_handler(
req: Request<Body>,
force_metric_collection_on_scrape: bool,
) -> Result<Response<Body>, ApiError> {
pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
SERVE_METRICS_COUNT.inc();
// HADRON
let requested_use_latest = parse_query_param(&req, "use_latest")?;
let use_latest = match requested_use_latest {
None => force_metric_collection_on_scrape,
Some(true) => true,
Some(false) => {
if force_metric_collection_on_scrape {
// We don't cache in this case
true
} else {
false
}
}
};
let started_at = std::time::Instant::now();
let (tx, rx) = mpsc::channel(1);
@@ -297,18 +277,12 @@ pub async fn prometheus_metrics_handler(
let _span = span.entered();
// HADRON
let collected = if use_latest {
// Skip caching the results if we always force metric collection on scrape.
METRICS_COLLECTOR.run_once(!force_metric_collection_on_scrape)
} else {
METRICS_COLLECTOR.last_collected()
};
let metrics = metrics::gather();
let gathered_at = std::time::Instant::now();
let res = encoder
.encode(&collected.metrics, &mut writer)
.encode(&metrics, &mut writer)
.and_then(|_| writer.flush().map_err(|e| e.into()));
// this instant is not when we finally got the full response sent, sending is done by hyper
@@ -321,10 +295,6 @@ pub async fn prometheus_metrics_handler(
let encoded_in = encoded_at - gathered_at - writer.wait_time();
let total = encoded_at - started_at;
// HADRON
let staleness_ms = (encoded_at - collected.collected_at).as_millis();
METRICS_STALE_MILLIS.set(staleness_ms as i64);
match res {
Ok(()) => {
tracing::info!(
@@ -333,7 +303,6 @@ pub async fn prometheus_metrics_handler(
spawning_ms = spawned_in.as_millis(),
collection_ms = collected_in.as_millis(),
encoding_ms = encoded_in.as_millis(),
stalenss_ms = staleness_ms,
"responded /metrics"
);
}

View File

@@ -274,7 +274,6 @@ pub struct ConfigToml {
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_layer_generation_large_timeline_threshold: Option<u64>,
pub force_metric_collection_on_scrape: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -832,7 +831,6 @@ impl Default for ConfigToml {
basebackup_cache_config: None,
posthog_config: None,
image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024),
force_metric_collection_on_scrape: true,
}
}
}

View File

@@ -99,8 +99,6 @@ pub mod elapsed_accum;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;
pub mod metrics_collector;
// Re-export used in macro. Avoids adding git-version as dep in target crates.
#[doc(hidden)]
pub use git_version;

View File

@@ -1,75 +0,0 @@
use std::{
sync::{Arc, RwLock},
time::{Duration, Instant},
};
use metrics::{IntGauge, proto::MetricFamily, register_int_gauge};
use once_cell::sync::Lazy;
pub static METRICS_STALE_MILLIS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"metrics_metrics_stale_milliseconds",
"The current metrics stale time in milliseconds"
)
.expect("failed to define a metric")
});
#[derive(Debug)]
pub struct CollectedMetrics {
pub metrics: Vec<MetricFamily>,
pub collected_at: Instant,
}
impl CollectedMetrics {
fn new(metrics: Vec<MetricFamily>) -> Self {
Self {
metrics,
collected_at: Instant::now(),
}
}
}
#[derive(Debug)]
pub struct MetricsCollector {
last_collected: RwLock<Arc<CollectedMetrics>>,
}
impl MetricsCollector {
pub fn new() -> Self {
Self {
last_collected: RwLock::new(Arc::new(CollectedMetrics::new(vec![]))),
}
}
#[tracing::instrument(name = "metrics_collector", skip_all)]
pub fn run_once(&self, cache_metrics: bool) -> Arc<CollectedMetrics> {
let started = Instant::now();
let metrics = metrics::gather();
let collected = Arc::new(CollectedMetrics::new(metrics));
if cache_metrics {
let mut guard = self.last_collected.write().unwrap();
*guard = collected.clone();
}
tracing::info!(
"Collected {} metric families in {} ms",
collected.metrics.len(),
started.elapsed().as_millis()
);
collected
}
pub fn last_collected(&self) -> Arc<CollectedMetrics> {
self.last_collected.read().unwrap().clone()
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
// Interval for metrics collection. Currently hard-coded to be the same as the metrics scape interval from the obs agent
pub static METRICS_COLLECTION_INTERVAL: Duration = Duration::from_secs(30);
pub static METRICS_COLLECTOR: Lazy<MetricsCollector> = Lazy::new(MetricsCollector::default);

View File

@@ -428,12 +428,6 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
shard_number: 0,
};
let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
should_limit: crate::bindings::pg_atomic_uint32 { value: 0 },
sent_bytes: 0,
last_recorded_time_us: 0,
};
crate::bindings::WalproposerShmemState {
propEpochStartLsn: crate::bindings::pg_atomic_uint64 { value: 0 },
donor_name: [0; 64],
@@ -447,7 +441,6 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
num_shards: 0,
replica_promote: false,
min_ps_feedback: empty_feedback,
wal_rate_limiter: empty_wal_rate_limiter,
}
}

View File

@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::error::Error as _;
use std::time::Duration;
@@ -251,70 +251,6 @@ impl Client {
Ok(())
}
pub async fn tenant_timeline_compact(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
force_image_layer_creation: bool,
must_force_image_layer_creation: bool,
scheduled: bool,
wait_until_done: bool,
) -> Result<()> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/compact",
self.mgmt_api_endpoint
))
.expect("Cannot build URL");
if force_image_layer_creation {
path.query_pairs_mut()
.append_pair("force_image_layer_creation", "true");
}
if must_force_image_layer_creation {
path.query_pairs_mut()
.append_pair("must_force_image_layer_creation", "true");
}
if scheduled {
path.query_pairs_mut().append_pair("scheduled", "true");
}
if wait_until_done {
path.query_pairs_mut()
.append_pair("wait_until_scheduled_compaction_done", "true");
path.query_pairs_mut()
.append_pair("wait_until_uploaded", "true");
}
self.request(Method::PUT, path, ()).await?;
Ok(())
}
/* BEGIN_HADRON */
pub async fn tenant_timeline_describe(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Result<TimelineInfo> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
))
.expect("Cannot build URL");
path.query_pairs_mut()
.append_pair("include-image-consistent-lsn", "true");
let response: reqwest::Response = self.request(Method::GET, path, ()).await?;
let body = response.json().await.map_err(Error::ReceiveBody)?;
Ok(body)
}
pub async fn list_tenant_visible_size(&self) -> Result<BTreeMap<TenantShardId, u64>> {
let uri = format!("{}/v1/list_tenant_visible_size", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
/* END_HADRON */
pub async fn tenant_scan_remote_storage(
&self,
tenant_id: TenantId,

View File

@@ -9,7 +9,6 @@ testing = ["pageserver_api/testing"]
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
bytes.workspace = true
compute_api.workspace = true
futures.workspace = true

View File

@@ -3,10 +3,8 @@ use std::num::NonZero;
use std::sync::Arc;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::instrument;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
@@ -57,85 +55,28 @@ const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
pub struct PageserverClient {
/// The tenant ID.
tenant_id: TenantId,
/// The timeline ID.
timeline_id: TimelineId,
/// The JWT auth token for this tenant, if any.
auth_token: Option<String>,
/// The compression to use, if any.
compression: Option<CompressionEncoding>,
/// The shards for this tenant.
shards: ArcSwap<Shards>,
/// The retry configuration.
// TODO: support swapping out the shard map, e.g. via an ArcSwap.
shards: Shards,
retry: Retry,
}
impl PageserverClient {
/// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
/// in the shard spec, which must be complete and must use gRPC URLs.
/// in the shard map, which must be complete and must use gRPC URLs.
pub fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_spec: ShardSpec,
shard_map: HashMap<ShardIndex, String>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
let shards = Shards::new(
tenant_id,
timeline_id,
shard_spec,
auth_token.clone(),
compression,
)?;
let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
Ok(Self {
tenant_id,
timeline_id,
auth_token,
compression,
shards: ArcSwap::new(Arc::new(shards)),
shards,
retry: Retry,
})
}
/// Updates the shards from the given shard spec. In-flight requests will complete using the
/// existing shards, but may retry with the new shards if they fail.
///
/// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
/// properly spun down and dropped afterwards.
pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
// Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
// with concurrent updates, but that involves creating a new `Shards` on every attempt,
// which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
// in the stack, and if they're violated then we already have problems elsewhere, so a
// best-effort but possibly-racy check is okay here.
let old = self.shards.load_full();
if shard_spec.count < old.count {
return Err(anyhow!(
"can't reduce shard count from {} to {}",
old.count,
shard_spec.count
));
}
if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
return Err(anyhow!(
"can't change stripe size from {} to {}",
old.stripe_size,
shard_spec.stripe_size
));
}
let shards = Shards::new(
self.tenant_id,
self.timeline_id,
shard_spec,
self.auth_token.clone(),
self.compression,
)?;
self.shards.store(Arc::new(shards));
Ok(())
}
/// Returns whether a relation exists.
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
pub async fn check_rel_exists(
@@ -143,9 +84,9 @@ impl PageserverClient {
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
self.retry
.with(async |_| {
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
let mut client = self.shards.get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
@@ -158,17 +99,16 @@ impl PageserverClient {
req: page_api::GetDbSizeRequest,
) -> tonic::Result<page_api::GetDbSizeResponse> {
self.retry
.with(async |_| {
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
let mut client = self.shards.get_zero().client().await?;
client.get_db_size(req).await
})
.await
}
/// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
/// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
/// shard boundaries, and assembles the responses.
/// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically
/// splits requests that straddle shard boundaries, and assembles the responses.
///
/// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
/// errors. All responses will have `GetPageStatusCode::Ok`.
@@ -188,96 +128,72 @@ impl PageserverClient {
if req.block_numbers.is_empty() {
return Err(tonic::Status::invalid_argument("no block number"));
}
// The request attempt must be 0. The client will increment it internally.
if req.request_id.attempt != 0 {
return Err(tonic::Status::invalid_argument("request attempt must be 0"));
}
// The shards may change while we're fetching pages. We execute the request using a stable
// view of the shards (especially important for requests that span shards), but retry the
// top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
// retries and re-splits in some cases where requests span shards, but these are expected to
// be rare.
//
// TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
// once we figure out how to handle these.
self.retry
.with(async |attempt| {
let mut req = req.clone();
req.request_id.attempt = attempt as u32;
Self::get_page_with_shards(req, &self.shards.load_full()).await
})
.await
}
/// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
/// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
async fn get_page_with_shards(
req: page_api::GetPageRequest,
shards: &Shards,
) -> tonic::Result<page_api::GetPageResponse> {
// Fast path: request is for a single shard.
if let Some(shard_id) =
GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
{
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
return self.get_page_for_shard(shard_id, req).await;
}
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
// reassemble the responses.
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
//
// TODO: when we support shard map updates, we need to detect when it changes and re-split
// the request on errors.
let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
let mut shard_requests = FuturesUnordered::new();
for (shard_id, shard_req) in splitter.drain_requests() {
let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
.map(move |result| result.map(|resp| (shard_id, resp)));
shard_requests.push(future);
}
let mut shard_requests: FuturesUnordered<_> = splitter
.drain_requests()
.map(|(shard_id, shard_req)| {
// NB: each request will retry internally.
self.get_page_for_shard(shard_id, shard_req)
.map(move |result| result.map(|resp| (shard_id, resp)))
})
.collect();
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
splitter.add_response(shard_id, shard_response)?;
}
splitter.get_response()
splitter.assemble_response()
}
/// Fetches pages on the given shard. Does not retry internally.
async fn get_page_with_shard(
/// Fetches pages that belong to the given shard.
#[instrument(skip_all, fields(shard = %shard_id))]
async fn get_page_for_shard(
&self,
shard_id: ShardIndex,
req: page_api::GetPageRequest,
shard: &Shard,
) -> tonic::Result<page_api::GetPageResponse> {
let stream = shard.stream(req.request_class.is_bulk()).await;
let resp = stream.send(req.clone()).await?;
let resp = self
.retry
.with(async || {
let stream = self
.shards
.get(shard_id)?
.stream(req.request_class.is_bulk())
.await;
let resp = stream.send(req.clone()).await?;
// Convert per-request errors into a tonic::Status.
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
resp.status_code.into(),
resp.reason.unwrap_or_else(|| String::from("unknown error")),
));
}
// Convert per-request errors into a tonic::Status.
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
resp.status_code.into(),
resp.reason.unwrap_or_else(|| String::from("unknown error")),
));
}
// Check that we received the expected pages.
if req.rel != resp.rel {
Ok(resp)
})
.await?;
// Make sure we got the right number of pages.
// NB: check outside of the retry loop, since we don't want to retry this.
let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
if expected != actual {
return Err(tonic::Status::internal(format!(
"shard {} returned wrong relation, expected {} got {}",
shard.id, req.rel, resp.rel
)));
}
if !req
.block_numbers
.iter()
.copied()
.eq(resp.pages.iter().map(|p| p.block_number))
{
return Err(tonic::Status::internal(format!(
"shard {} returned wrong pages, expected {:?} got {:?}",
shard.id,
req.block_numbers,
resp.pages
.iter()
.map(|page| page.block_number)
.collect::<Vec<_>>()
"expected {expected} pages for shard {shard_id}, got {actual}",
)));
}
@@ -291,9 +207,9 @@ impl PageserverClient {
req: page_api::GetRelSizeRequest,
) -> tonic::Result<page_api::GetRelSizeResponse> {
self.retry
.with(async |_| {
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
let mut client = self.shards.get_zero().client().await?;
client.get_rel_size(req).await
})
.await
@@ -306,53 +222,50 @@ impl PageserverClient {
req: page_api::GetSlruSegmentRequest,
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
self.retry
.with(async |_| {
.with(async || {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
let mut client = self.shards.get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
}
}
/// Shard specification for a PageserverClient.
pub struct ShardSpec {
/// Maps shard indices to gRPC URLs.
///
/// INVARIANT: every shard 0..count is present, and shard 0 is always present.
/// INVARIANT: every URL is valid and uses grpc:// scheme.
urls: HashMap<ShardIndex, String>,
/// Tracks the tenant's shards.
struct Shards {
/// The shard count.
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size for these shards.
/// The stripe size. Only used for sharded tenants.
stripe_size: ShardStripeSize,
/// Shards by shard index.
///
/// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
///
/// INVARIANT: every shard 0..count is present.
/// INVARIANT: shard 0 is always present.
map: HashMap<ShardIndex, Shard>,
}
impl ShardSpec {
/// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
/// The stripe size may be omitted for unsharded tenants.
pub fn new(
urls: HashMap<ShardIndex, String>,
stripe_size: Option<ShardStripeSize>,
impl Shards {
/// Creates a new set of shards based on a shard map.
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_map: HashMap<ShardIndex, String>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
// Compute the shard count.
let count = match urls.len() {
let count = match shard_map.len() {
0 => return Err(anyhow!("no shards provided")),
1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
n => ShardCount::new(n as u8),
};
// Determine the stripe size. It doesn't matter for unsharded tenants.
if stripe_size.is_none() && !count.is_unsharded() {
return Err(anyhow!("stripe size must be given for sharded tenants"));
}
let stripe_size = stripe_size.unwrap_or_default();
// Validate the shard spec.
for (shard_id, url) in &urls {
let mut map = HashMap::new();
for (shard_id, url) in shard_map {
// The shard index must match the computed shard count, even for unsharded tenants.
if shard_id.shard_count != count {
return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
@@ -363,72 +276,21 @@ impl ShardSpec {
}
// The above conditions guarantee that we have all shards 0..count: len() matches count,
// shard number < count, and numbers are unique (via hashmap).
// Validate the URL.
if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
}
let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
map.insert(shard_id, shard);
}
Ok(Self {
urls,
count,
stripe_size,
})
}
}
/// Tracks the tenant's shards.
struct Shards {
/// Shards by shard index.
///
/// INVARIANT: every shard 0..count is present.
/// INVARIANT: shard 0 is always present.
by_index: HashMap<ShardIndex, Shard>,
/// The shard count.
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size. Only used for sharded tenants.
stripe_size: ShardStripeSize,
}
impl Shards {
/// Creates a new set of shards based on a shard spec.
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_spec: ShardSpec,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
// NB: the shard spec has already been validated when constructed.
let mut shards = HashMap::with_capacity(shard_spec.urls.len());
for (shard_id, url) in shard_spec.urls {
shards.insert(
shard_id,
Shard::new(
url,
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
)?,
);
}
Ok(Self {
by_index: shards,
count: shard_spec.count,
stripe_size: shard_spec.stripe_size,
map,
})
}
/// Looks up the given shard.
#[allow(clippy::result_large_err)] // TODO: check perf impact
fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
self.by_index
self.map
.get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
}
@@ -450,8 +312,6 @@ impl Shards {
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
struct Shard {
/// The shard ID.
id: ShardIndex,
/// Unary gRPC client pool.
client_pool: Arc<ClientPool>,
/// GetPage stream pool.
@@ -468,8 +328,12 @@ impl Shard {
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
// Sanity-check that the URL uses gRPC.
if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc {
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
}
// Common channel pool for unary and stream requests. Bounded by client/stream pools.
let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
@@ -480,7 +344,6 @@ impl Shard {
timeline_id,
shard_id,
auth_token.clone(),
compression,
Some(MAX_UNARY_CLIENTS),
);
@@ -493,7 +356,6 @@ impl Shard {
timeline_id,
shard_id,
auth_token.clone(),
compression,
None, // unbounded, limited by stream pool
),
Some(MAX_STREAMS),
@@ -509,7 +371,6 @@ impl Shard {
timeline_id,
shard_id,
auth_token,
compression,
None, // unbounded, limited by stream pool
),
Some(MAX_BULK_STREAMS),
@@ -517,7 +378,6 @@ impl Shard {
);
Ok(Self {
id: shard_id,
client_pool,
stream_pool,
bulk_stream_pool,

View File

@@ -3,4 +3,4 @@ mod pool;
mod retry;
mod split;
pub use client::{PageserverClient, ShardSpec};
pub use client::PageserverClient;

View File

@@ -40,7 +40,6 @@ use futures::StreamExt as _;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint};
use tracing::{error, warn};
@@ -243,8 +242,6 @@ pub struct ClientPool {
shard_id: ShardIndex,
/// Authentication token, if any.
auth_token: Option<String>,
/// Compression to use.
compression: Option<CompressionEncoding>,
/// Channel pool to acquire channels from.
channel_pool: Arc<ChannelPool>,
/// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
@@ -284,7 +281,6 @@ impl ClientPool {
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
max_clients: Option<NonZero<usize>>,
) -> Arc<Self> {
let pool = Arc::new(Self {
@@ -292,7 +288,6 @@ impl ClientPool {
timeline_id,
shard_id,
auth_token,
compression,
channel_pool,
idle: Mutex::default(),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
@@ -336,7 +331,7 @@ impl ClientPool {
self.timeline_id,
self.shard_id,
self.auth_token.clone(),
self.compression,
None,
)?;
Ok(ClientGuard {
@@ -591,10 +586,6 @@ impl StreamPool {
// Track caller response channels by request ID. If the task returns early, these response
// channels will be dropped and the waiting callers will receive an error.
//
// NB: this will leak entries if the server doesn't respond to a request (by request ID).
// It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and
// block further use. But we could consider reaping closed channels after some time.
let mut callers = HashMap::new();
// Process requests and responses.
@@ -699,15 +690,6 @@ impl Drop for StreamGuard {
// Release the queue depth reservation on drop. This can prematurely decrement it if dropped
// before the response is received, but that's okay.
//
// TODO: actually, it's probably not okay. Queue depth release should be moved into the
// stream task, such that it continues to account for the queue depth slot until the server
// responds. Otherwise, if a slow request times out and keeps blocking the stream, the
// server will keep waiting on it and we can pile on subsequent requests (including the
// timeout retry) in the same stream and get blocked. But we may also want to avoid blocking
// requests on e.g. LSN waits and layer downloads, instead returning early to free up the
// stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line
// blocking. TBD.
let mut streams = pool.streams.lock().unwrap();
let entry = streams.get_mut(&self.id).expect("unknown stream");
assert!(entry.idle_since.is_none(), "active stream marked idle");

View File

@@ -23,14 +23,14 @@ impl Retry {
/// If true, log successful requests. For debugging.
const LOG_SUCCESS: bool = false;
/// Runs the given async closure with timeouts and retries (exponential backoff), passing the
/// attempt number starting at 0. Logs errors, using the current tracing span for context.
/// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
/// using the current tracing span for context.
///
/// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
/// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
where
F: FnMut(usize) -> O, // takes attempt number, starting at 0
F: FnMut() -> O,
O: Future<Output = tonic::Result<T>>,
{
let started = Instant::now();
@@ -47,7 +47,7 @@ impl Retry {
}
let request_started = Instant::now();
tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries))
tokio::time::timeout(Self::REQUEST_TIMEOUT, f())
.await
.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
@@ -131,6 +131,7 @@ impl Retry {
tonic::Code::Aborted => true,
tonic::Code::Cancelled => true,
tonic::Code::DeadlineExceeded => true, // maybe transient slowness
tonic::Code::Internal => true, // maybe transient failure?
tonic::Code::ResourceExhausted => true,
tonic::Code::Unavailable => true,
@@ -138,10 +139,6 @@ impl Retry {
tonic::Code::AlreadyExists => false,
tonic::Code::DataLoss => false,
tonic::Code::FailedPrecondition => false,
// NB: don't retry Internal. It is intended for serious errors such as invariant
// violations, and is also used for client-side invariant checks that would otherwise
// result in retry loops.
tonic::Code::Internal => false,
tonic::Code::InvalidArgument => false,
tonic::Code::NotFound => false,
tonic::Code::OutOfRange => false,

View File

@@ -5,24 +5,27 @@ use bytes::Bytes;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_page_api as page_api;
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
use utils::shard::{ShardCount, ShardIndex};
/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
/// TODO: add tests for this.
pub struct GetPageSplitter {
/// The original request ID. Used for all shard requests.
request_id: page_api::RequestID,
/// Split requests by shard index.
requests: HashMap<ShardIndex, page_api::GetPageRequest>,
/// The response being assembled. Preallocated with empty pages, to be filled in.
response: page_api::GetPageResponse,
/// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used
/// to assemble the response pages in the same order as the original request.
/// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble
/// the response pages in the same order as the original request.
block_shards: Vec<ShardIndex>,
/// Page responses by shard index. Will be assembled into a single response.
responses: HashMap<ShardIndex, Vec<Bytes>>,
}
impl GetPageSplitter {
/// Checks if the given request only touches a single shard, and returns the shard ID. This is
/// the common case, so we check first in order to avoid unnecessary allocations and overhead.
pub fn for_single_shard(
/// The caller must ensure that the request has at least one block number, or this will panic.
pub fn is_single_shard(
req: &page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
@@ -32,12 +35,8 @@ impl GetPageSplitter {
return Some(ShardIndex::unsharded());
}
// Find the first page's shard, for comparison. If there are no pages, just return the first
// shard (caller likely checked already, otherwise the server will reject it).
let Some(&first_page) = req.block_numbers.first() else {
return Some(ShardIndex::new(ShardNumber(0), count));
};
let key = rel_block_to_key(req.rel, first_page);
// Find the base shard index for the first page, and compare with the rest.
let key = rel_block_to_key(req.rel, *req.block_numbers.first().expect("no pages"));
let shard_number = key_to_shard_number(count, stripe_size, &key);
req.block_numbers
@@ -58,19 +57,19 @@ impl GetPageSplitter {
) -> Self {
// The caller should make sure we don't split requests unnecessarily.
debug_assert!(
Self::for_single_shard(&req, count, stripe_size).is_none(),
Self::is_single_shard(&req, count, stripe_size).is_none(),
"unnecessary request split"
);
// Split the requests by shard index.
let mut requests = HashMap::with_capacity(2); // common case
let mut block_shards = Vec::with_capacity(req.block_numbers.len());
for &blkno in &req.block_numbers {
for blkno in req.block_numbers {
let key = rel_block_to_key(req.rel, blkno);
let shard_number = key_to_shard_number(count, stripe_size, &key);
let shard_id = ShardIndex::new(shard_number, count);
requests
let shard_req = requests
.entry(shard_id)
.or_insert_with(|| page_api::GetPageRequest {
request_id: req.request_id,
@@ -78,47 +77,27 @@ impl GetPageSplitter {
rel: req.rel,
read_lsn: req.read_lsn,
block_numbers: Vec::new(),
})
.block_numbers
.push(blkno);
});
shard_req.block_numbers.push(blkno);
block_shards.push(shard_id);
}
// Construct a response to be populated by shard responses. Preallocate empty page slots
// with the expected block numbers.
let response = page_api::GetPageResponse {
request_id: req.request_id,
status_code: page_api::GetPageStatusCode::Ok,
reason: None,
rel: req.rel,
pages: req
.block_numbers
.into_iter()
.map(|block_number| {
page_api::Page {
block_number,
image: Bytes::new(), // empty page slot to be filled in
}
})
.collect(),
};
Self {
request_id: req.request_id,
responses: HashMap::with_capacity(requests.len()),
requests,
response,
block_shards,
}
}
/// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
/// Drains the per-shard requests, moving them out of the hashmap to avoid extra allocations.
pub fn drain_requests(
&mut self,
) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
self.requests.drain()
}
/// Adds a response from the given shard. The response must match the request ID and have an OK
/// status code. A response must not already exist for the given shard ID.
/// Adds a response from the given shard.
#[allow(clippy::result_large_err)]
pub fn add_response(
&mut self,
@@ -126,84 +105,68 @@ impl GetPageSplitter {
response: page_api::GetPageResponse,
) -> tonic::Result<()> {
// The caller should already have converted status codes into tonic::Status.
if response.status_code != page_api::GetPageStatusCode::Ok {
assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
// Make sure the response matches the request ID.
if response.request_id != self.request_id {
return Err(tonic::Status::internal(format!(
"unexpected non-OK response for shard {shard_id}: {} {}",
response.status_code,
response.reason.unwrap_or_default()
"response ID {} does not match request ID {}",
response.request_id, self.request_id
)));
}
if response.request_id != self.response.request_id {
// Add the response data to the map.
let old = self.responses.insert(shard_id, response.page_images);
if old.is_some() {
return Err(tonic::Status::internal(format!(
"response ID mismatch for shard {shard_id}: expected {}, got {}",
self.response.request_id, response.request_id
)));
}
// Place the shard response pages into the assembled response, in request order.
let mut pages = response.pages.into_iter();
for (i, &s) in self.block_shards.iter().enumerate() {
if shard_id != s {
continue;
}
let Some(slot) = self.response.pages.get_mut(i) else {
return Err(tonic::Status::internal(format!(
"no block_shards slot {i} for shard {shard_id}"
)));
};
let Some(page) = pages.next() else {
return Err(tonic::Status::internal(format!(
"missing page {} in shard {shard_id} response",
slot.block_number
)));
};
if page.block_number != slot.block_number {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned wrong page at index {i}, expected {} got {}",
slot.block_number, page.block_number
)));
}
if !slot.image.is_empty() {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned duplicate page {} at index {i}",
slot.block_number
)));
}
*slot = page;
}
// Make sure we've consumed all pages from the shard response.
if let Some(extra_page) = pages.next() {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned extra page: {}",
extra_page.block_number
"duplicate response for shard {shard_id}",
)));
}
Ok(())
}
/// Fetches the final, assembled response.
/// Assembles the shard responses into a single response. Responses must be present for all
/// relevant shards, and the total number of pages must match the original request.
#[allow(clippy::result_large_err)]
pub fn get_response(self) -> tonic::Result<page_api::GetPageResponse> {
// Check that the response is complete.
for (i, page) in self.response.pages.iter().enumerate() {
if page.image.is_empty() {
pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
let mut response = page_api::GetPageResponse {
request_id: self.request_id,
status_code: page_api::GetPageStatusCode::Ok,
reason: None,
page_images: Vec::with_capacity(self.block_shards.len()),
};
// Set up per-shard page iterators we can pull from.
let mut shard_responses = HashMap::with_capacity(self.responses.len());
for (shard_id, responses) in self.responses {
shard_responses.insert(shard_id, responses.into_iter());
}
// Reassemble the responses in the same order as the original request.
for shard_id in &self.block_shards {
let page = shard_responses
.get_mut(shard_id)
.ok_or_else(|| {
tonic::Status::internal(format!("missing response for shard {shard_id}"))
})?
.next()
.ok_or_else(|| {
tonic::Status::internal(format!("missing page from shard {shard_id}"))
})?;
response.page_images.push(page);
}
// Make sure there are no additional pages.
for (shard_id, mut pages) in shard_responses {
if pages.next().is_some() {
return Err(tonic::Status::internal(format!(
"missing page {} for shard {}",
page.block_number,
self.block_shards
.get(i)
.map(|s| s.to_string())
.unwrap_or_else(|| "?".to_string())
"extra pages returned from shard {shard_id}"
)));
}
}
Ok(self.response)
Ok(response)
}
}

View File

@@ -17,7 +17,6 @@ pageserver = { path = ".." }
pageserver_api.workspace = true
remote_storage = { path = "../../libs/remote_storage" }
postgres_ffi.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true

View File

@@ -1,85 +0,0 @@
use camino::Utf8PathBuf;
use clap::Parser;
use tokio_util::sync::CancellationToken;
/// Download a specific object from remote storage to a local file.
///
/// The remote storage configuration is supplied via the `REMOTE_STORAGE_CONFIG` environment
/// variable, in the same TOML format that the pageserver itself understands. This allows the
/// command to work with any cloud supported by the `remote_storage` crate (currently AWS S3,
/// Azure Blob Storage and local files), as long as the credentials are available via the
/// standard environment variables expected by the underlying SDKs.
///
/// Examples for setting the environment variable:
///
/// ```bash
/// # AWS S3 (region can also be provided via AWS_REGION)
/// export REMOTE_STORAGE_CONFIG='remote_storage = { bucket_name = "my-bucket", bucket_region = "us-east-2" }'
///
/// # Azure Blob Storage (account key picked up from AZURE_STORAGE_ACCOUNT_KEY)
/// export REMOTE_STORAGE_CONFIG='remote_storage = { container = "my-container", account = "my-account" }'
/// ```
#[derive(Parser)]
pub(crate) struct DownloadRemoteObjectCmd {
/// Key / path of the object to download (relative to the remote storage prefix).
///
/// Examples:
/// "wal/3aa8f.../00000001000000000000000A"
/// "pageserver/v1/tenants/<tenant_id>/timelines/<timeline_id>/layer_12345"
pub remote_path: String,
/// Path of the local file to create. Existing file will be overwritten.
///
/// Examples:
/// "./segment"
/// "/tmp/layer_12345.parquet"
pub output_file: Utf8PathBuf,
}
pub(crate) async fn main(cmd: &DownloadRemoteObjectCmd) -> anyhow::Result<()> {
use remote_storage::{DownloadOpts, GenericRemoteStorage, RemotePath, RemoteStorageConfig};
// Fetch remote storage configuration from the environment
let config_str = std::env::var("REMOTE_STORAGE_CONFIG").map_err(|_| {
anyhow::anyhow!(
"'REMOTE_STORAGE_CONFIG' environment variable must be set to a valid remote storage TOML config"
)
})?;
let config = RemoteStorageConfig::from_toml_str(&config_str)?;
// Initialise remote storage client
let storage = GenericRemoteStorage::from_config(&config).await?;
// RemotePath must be relative leading slashes confuse the parser.
let remote_path_str = cmd.remote_path.trim_start_matches('/');
let remote_path = RemotePath::from_string(remote_path_str)?;
let cancel = CancellationToken::new();
println!(
"Downloading '{remote_path}' from remote storage bucket {:?} ...",
config.storage.bucket_name()
);
// Start the actual download
let download = storage
.download(&remote_path, &DownloadOpts::default(), &cancel)
.await?;
// Stream to file
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let tmp_path = cmd.output_file.with_extension("tmp");
let mut file = tokio::fs::File::create(&tmp_path).await?;
tokio::io::copy(&mut reader, &mut file).await?;
file.sync_all().await?;
// Atomically move into place
tokio::fs::rename(&tmp_path, &cmd.output_file).await?;
println!(
"Downloaded to '{}'. Last modified: {:?}, etag: {}",
cmd.output_file, download.last_modified, download.etag
);
Ok(())
}

View File

@@ -1,16 +1,14 @@
use std::str::FromStr;
use anyhow::{Context, Ok};
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver::tenant::{
IndexPart,
layer_map::{LayerMap, SearchResult},
remote_timeline_client::{index::LayerFileMetadata, remote_layer_path},
storage_layer::{LayerName, LayerVisibilityHint, PersistentLayerDesc, ReadableLayerWeak},
remote_timeline_client::remote_layer_path,
storage_layer::{PersistentLayerDesc, ReadableLayerWeak},
};
use pageserver_api::key::Key;
use serde::Serialize;
use std::collections::BTreeMap;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
@@ -35,31 +33,6 @@ pub(crate) enum IndexPartCmd {
#[arg(long)]
lsn: String,
},
/// List all visible delta and image layers at the latest LSN.
ListVisibleLayers {
#[arg(long)]
path: Utf8PathBuf,
},
}
fn create_layer_map_from_index_part(
index_part: &IndexPart,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> LayerMap {
let mut layer_map = LayerMap::default();
{
let mut updates = layer_map.batch_update();
for (key, value) in index_part.layer_metadata.iter() {
updates.insert_historic(PersistentLayerDesc::from_filename(
tenant_shard_id,
timeline_id,
key.clone(),
value.file_size,
));
}
}
layer_map
}
async fn search_layers(
@@ -76,7 +49,18 @@ async fn search_layers(
let bytes = tokio::fs::read(path).await?;
IndexPart::from_json_bytes(&bytes).unwrap()
};
let layer_map = create_layer_map_from_index_part(&index_json, tenant_shard_id, timeline_id);
let mut layer_map = LayerMap::default();
{
let mut updates = layer_map.batch_update();
for (key, value) in index_json.layer_metadata.iter() {
updates.insert_historic(PersistentLayerDesc::from_filename(
tenant_shard_id,
timeline_id,
key.clone(),
value.file_size,
));
}
}
let key = Key::from_hex(key)?;
let lsn = Lsn::from_str(lsn).unwrap();
@@ -114,69 +98,6 @@ async fn search_layers(
Ok(())
}
#[derive(Debug, Clone, Serialize)]
struct VisibleLayers {
pub total_images: u64,
pub total_image_bytes: u64,
pub total_deltas: u64,
pub total_delta_bytes: u64,
pub layer_metadata: BTreeMap<LayerName, LayerFileMetadata>,
}
impl VisibleLayers {
pub fn new() -> Self {
Self {
layer_metadata: BTreeMap::new(),
total_images: 0,
total_image_bytes: 0,
total_deltas: 0,
total_delta_bytes: 0,
}
}
pub fn add_layer(&mut self, name: LayerName, layer: LayerFileMetadata) {
match name {
LayerName::Image(_) => {
self.total_images += 1;
self.total_image_bytes += layer.file_size;
}
LayerName::Delta(_) => {
self.total_deltas += 1;
self.total_delta_bytes += layer.file_size;
}
}
self.layer_metadata.insert(name, layer);
}
}
async fn list_visible_layers(path: &Utf8PathBuf) -> anyhow::Result<()> {
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let timeline_id = TimelineId::generate();
let bytes = tokio::fs::read(path).await.context("read file")?;
let index_part = IndexPart::from_json_bytes(&bytes).context("deserialize")?;
let layer_map = create_layer_map_from_index_part(&index_part, tenant_shard_id, timeline_id);
let mut visible_layers = VisibleLayers::new();
let (layers, _key_space) = layer_map.get_visibility(Vec::new());
for (layer, visibility) in layers {
if visibility == LayerVisibilityHint::Visible {
visible_layers.add_layer(
layer.layer_name(),
index_part
.layer_metadata
.get(&layer.layer_name())
.unwrap()
.clone(),
);
}
}
let output = serde_json::to_string_pretty(&visible_layers).context("serialize output")?;
println!("{output}");
Ok(())
}
pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
match cmd {
IndexPartCmd::Dump { path } => {
@@ -193,6 +114,5 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
key,
lsn,
} => search_layers(tenant_id, timeline_id, path, key, lsn).await,
IndexPartCmd::ListVisibleLayers { path } => list_visible_layers(path).await,
}
}

View File

@@ -4,7 +4,6 @@
//!
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
mod download_remote_object;
mod draw_timeline_dir;
mod index_part;
mod key;
@@ -17,7 +16,6 @@ use std::time::{Duration, SystemTime};
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use download_remote_object::DownloadRemoteObjectCmd;
use index_part::IndexPartCmd;
use layers::LayerCmd;
use page_trace::PageTraceCmd;
@@ -65,7 +63,6 @@ enum Commands {
/// Debug print a hex key found from logs
Key(key::DescribeKeyCommand),
PageTrace(PageTraceCmd),
DownloadRemoteObject(DownloadRemoteObjectCmd),
}
/// Read and update pageserver metadata file
@@ -188,9 +185,6 @@ async fn main() -> anyhow::Result<()> {
}
Commands::Key(dkc) => dkc.execute(),
Commands::PageTrace(cmd) => page_trace::main(&cmd)?,
Commands::DownloadRemoteObject(cmd) => {
download_remote_object::main(&cmd).await?;
}
};
Ok(())
}

View File

@@ -153,7 +153,7 @@ message GetDbSizeResponse {
message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream.
RequestID request_id = 1;
uint64 request_id = 1;
// The request class.
GetPageClass request_class = 2;
// The LSN to read at.
@@ -177,14 +177,6 @@ message GetPageRequest {
repeated uint32 block_number = 5;
}
// A Request ID. Should be unique for in-flight requests on a stream. Included in the response.
message RequestID {
// The base request ID.
uint64 id = 1;
// The request attempt. Starts at 0, incremented on each retry.
uint32 attempt = 2;
}
// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
@@ -207,26 +199,13 @@ enum GetPageClass {
// the entire batch is ready, so no one can make use of the individual pages.
message GetPageResponse {
// The original request's ID.
RequestID request_id = 1;
// The response status code. If not OK, the rel and page fields will be empty.
uint64 request_id = 1;
// The response status code.
GetPageStatusCode status_code = 2;
// A string describing the status, if any.
string reason = 3;
// The relation that the pages belong to.
RelTag rel = 4;
// The page(s), in the same order as the request.
repeated Page page = 5;
}
// A page.
//
// TODO: it would be slightly more efficient (but less convenient) to have separate arrays of block
// numbers and images, but given the 8KB page size it's probably negligible. Benchmark it anyway.
message Page {
// The page number.
uint32 block_number = 1;
// The materialized page image, as an 8KB byte vector.
bytes image = 2;
// The 8KB page images, in the same order as the request. Empty if status_code != OK.
repeated bytes page_image = 4;
}
// A GetPageResponse status code.

View File

@@ -1,5 +1,4 @@
use anyhow::Context as _;
use futures::future::ready;
use futures::{Stream, StreamExt as _, TryStreamExt as _};
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;
@@ -111,7 +110,7 @@ impl Client {
) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> {
let reqs = reqs.map(proto::GetPageRequest::from);
let resps = self.inner.get_pages(reqs).await?.into_inner();
Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into()))))
Ok(resps.map_ok(GetPageResponse::from))
}
/// Returns the size of a relation, as # of blocks.

View File

@@ -356,10 +356,7 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
return Err(ProtocolError::Missing("block_number"));
}
Ok(Self {
request_id: pb
.request_id
.ok_or(ProtocolError::Missing("request_id"))?
.into(),
request_id: pb.request_id,
request_class: pb.request_class.into(),
read_lsn: pb
.read_lsn
@@ -374,7 +371,7 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
impl From<GetPageRequest> for proto::GetPageRequest {
fn from(request: GetPageRequest) -> Self {
Self {
request_id: Some(request.request_id.into()),
request_id: request.request_id,
request_class: request.request_class.into(),
read_lsn: Some(request.read_lsn.into()),
rel: Some(request.rel.into()),
@@ -383,51 +380,8 @@ impl From<GetPageRequest> for proto::GetPageRequest {
}
}
/// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RequestID {
/// The base request ID.
pub id: u64,
// The request attempt. Starts at 0, incremented on each retry.
pub attempt: u32,
}
impl RequestID {
/// Creates a new RequestID with the given ID and an initial attempt of 0.
pub fn new(id: u64) -> Self {
Self { id, attempt: 0 }
}
}
impl Display for RequestID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.id, self.attempt)
}
}
impl From<proto::RequestId> for RequestID {
fn from(pb: proto::RequestId) -> Self {
Self {
id: pb.id,
attempt: pb.attempt,
}
}
}
impl From<u64> for RequestID {
fn from(id: u64) -> Self {
Self::new(id)
}
}
impl From<RequestID> for proto::RequestId {
fn from(request_id: RequestID) -> Self {
Self {
id: request_id.id,
attempt: request_id.attempt,
}
}
}
/// A GetPage request ID.
pub type RequestID = u64;
/// A GetPage request class.
#[derive(Clone, Copy, Debug, strum_macros::Display)]
@@ -502,41 +456,32 @@ impl From<GetPageClass> for i32 {
pub struct GetPageResponse {
/// The original request's ID.
pub request_id: RequestID,
/// The response status code. If not OK, the `rel` and `pages` fields will be empty.
/// The response status code.
pub status_code: GetPageStatusCode,
/// A string describing the status, if any.
pub reason: Option<String>,
/// The relation that the pages belong to.
pub rel: RelTag,
// The page(s), in the same order as the request.
pub pages: Vec<Page>,
/// The 8KB page images, in the same order as the request. Empty if status != OK.
pub page_images: Vec<Bytes>,
}
impl TryFrom<proto::GetPageResponse> for GetPageResponse {
type Error = ProtocolError;
fn try_from(pb: proto::GetPageResponse) -> Result<Self, ProtocolError> {
Ok(Self {
request_id: pb
.request_id
.ok_or(ProtocolError::Missing("request_id"))?
.into(),
impl From<proto::GetPageResponse> for GetPageResponse {
fn from(pb: proto::GetPageResponse) -> Self {
Self {
request_id: pb.request_id,
status_code: pb.status_code.into(),
reason: Some(pb.reason).filter(|r| !r.is_empty()),
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
pages: pb.page.into_iter().map(Page::from).collect(),
})
page_images: pb.page_image,
}
}
}
impl From<GetPageResponse> for proto::GetPageResponse {
fn from(response: GetPageResponse) -> Self {
Self {
request_id: Some(response.request_id.into()),
request_id: response.request_id,
status_code: response.status_code.into(),
reason: response.reason.unwrap_or_default(),
rel: Some(response.rel.into()),
page: response.pages.into_iter().map(proto::Page::from).collect(),
page_image: response.page_images,
}
}
}
@@ -569,39 +514,11 @@ impl GetPageResponse {
request_id,
status_code,
reason: Some(status.message().to_string()),
rel: RelTag::default(),
pages: Vec::new(),
page_images: Vec::new(),
})
}
}
// A page.
#[derive(Clone, Debug)]
pub struct Page {
/// The page number.
pub block_number: u32,
/// The materialized page image, as an 8KB byte vector.
pub image: Bytes,
}
impl From<proto::Page> for Page {
fn from(pb: proto::Page) -> Self {
Self {
block_number: pb.block_number,
image: pb.image,
}
}
}
impl From<Page> for proto::Page {
fn from(page: Page) -> Self {
Self {
block_number: page.block_number,
image: page.image,
}
}
}
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream

View File

@@ -27,9 +27,8 @@ tokio-util.workspace = true
tonic.workspace = true
url.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
pageserver_client_grpc.workspace = true
pageserver_api.workspace = true
pageserver_page_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -10,14 +10,12 @@ use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use camino::Utf8PathBuf;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt as _};
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::TenantShardId;
use pageserver_client_grpc::{self as client_grpc, ShardSpec};
use pageserver_page_api as page_api;
use rand::prelude::*;
use tokio::task::JoinSet;
@@ -39,10 +37,6 @@ pub(crate) struct Args {
/// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
page_service_connstring: String,
/// Use the rich gRPC Pageserver client `client_grpc::PageserverClient`, rather than the basic
/// no-frills `page_api::Client`. Only valid with grpc:// connstrings.
#[clap(long)]
rich_client: bool,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
@@ -338,7 +332,6 @@ async fn main_impl(
let client: Box<dyn Client> = match scheme.as_str() {
"postgresql" | "postgres" => {
assert!(!args.compression, "libpq does not support compression");
assert!(!args.rich_client, "rich client requires grpc://");
Box::new(
LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
.await
@@ -346,16 +339,6 @@ async fn main_impl(
)
}
"grpc" if args.rich_client => Box::new(
RichGrpcClient::new(
&args.page_service_connstring,
worker_id.timeline,
args.compression,
)
.await
.unwrap(),
),
"grpc" => Box::new(
GrpcClient::new(
&args.page_service_connstring,
@@ -674,7 +657,7 @@ impl Client for GrpcClient {
blks: Vec<u32>,
) -> anyhow::Result<()> {
let req = page_api::GetPageRequest {
request_id: req_id.into(),
request_id: req_id,
request_class: page_api::GetPageClass::Normal,
read_lsn: page_api::ReadLsn {
request_lsn: req_lsn,
@@ -694,79 +677,6 @@ impl Client for GrpcClient {
"unexpected status code: {}",
resp.status_code,
);
Ok((
resp.request_id.id,
resp.pages.into_iter().map(|p| p.image).collect(),
))
}
}
/// A rich gRPC Pageserver client.
struct RichGrpcClient {
inner: Arc<client_grpc::PageserverClient>,
requests: FuturesUnordered<
Pin<Box<dyn Future<Output = anyhow::Result<page_api::GetPageResponse>> + Send>>,
>,
}
impl RichGrpcClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = Arc::new(client_grpc::PageserverClient::new(
ttid.tenant_id,
ttid.timeline_id,
ShardSpec::new(
[(ShardIndex::unsharded(), connstring.to_string())].into(),
None,
)?,
None,
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
)?);
Ok(Self {
inner,
requests: FuturesUnordered::new(),
})
}
}
#[async_trait]
impl Client for RichGrpcClient {
async fn send_get_page(
&mut self,
req_id: u64,
req_lsn: Lsn,
mod_lsn: Lsn,
rel: RelTag,
blks: Vec<u32>,
) -> anyhow::Result<()> {
let req = page_api::GetPageRequest {
request_id: req_id.into(),
request_class: page_api::GetPageClass::Normal,
read_lsn: page_api::ReadLsn {
request_lsn: req_lsn,
not_modified_since_lsn: Some(mod_lsn),
},
rel,
block_numbers: blks,
};
let inner = self.inner.clone();
self.requests.push(Box::pin(async move {
inner
.get_page(req)
.await
.map_err(|err| anyhow::anyhow!("{err}"))
}));
Ok(())
}
async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
let resp = self.requests.next().await.unwrap()?;
Ok((
resp.request_id.id,
resp.pages.into_iter().map(|p| p.image).collect(),
))
Ok((resp.request_id, resp.page_images))
}
}

View File

@@ -29,8 +29,8 @@ use pageserver::task_mgr::{
};
use pageserver::tenant::{TenantSharedResources, mgr, secondary};
use pageserver::{
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener,
MetricsCollectionTask, http, page_cache, page_service, task_mgr, virtual_file,
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, http,
page_cache, page_service, task_mgr, virtual_file,
};
use postgres_backend::AuthType;
use remote_storage::GenericRemoteStorage;
@@ -41,7 +41,6 @@ use tracing_utils::OtelGuard;
use utils::auth::{JwtAuth, SwappableJwtAuth};
use utils::crashsafe::syncfs;
use utils::logging::TracingErrorLayerEnablement;
use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR};
use utils::sentry_init::init_sentry;
use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener};
@@ -764,41 +763,6 @@ fn start_pageserver(
(http_task, https_task)
};
/* BEGIN_HADRON */
let metrics_collection_task = {
let cancel = shutdown_pageserver.child_token();
let task = crate::BACKGROUND_RUNTIME.spawn({
let cancel = cancel.clone();
let background_jobs_barrier = background_jobs_barrier.clone();
async move {
if conf.force_metric_collection_on_scrape {
return;
}
// first wait until background jobs are cleared to launch.
tokio::select! {
_ = cancel.cancelled() => { return; },
_ = background_jobs_barrier.wait() => {}
};
let mut interval = tokio::time::interval(METRICS_COLLECTION_INTERVAL);
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("cancelled metrics collection task, exiting...");
break;
},
_ = interval.tick() => {}
}
tokio::task::spawn_blocking(|| {
METRICS_COLLECTOR.run_once(true);
});
}
}
});
MetricsCollectionTask(CancellableTask { task, cancel })
};
/* END_HADRON */
let consumption_metrics_tasks = {
let cancel = shutdown_pageserver.child_token();
let task = crate::BACKGROUND_RUNTIME.spawn({
@@ -880,7 +844,6 @@ fn start_pageserver(
https_endpoint_listener,
page_service,
page_service_grpc,
metrics_collection_task,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,

View File

@@ -256,10 +256,6 @@ pub struct PageServerConf {
/// Defines what is a big tenant for the purpose of image layer generation.
/// See Timeline::should_check_if_image_layers_required
pub image_layer_generation_large_timeline_threshold: Option<u64>,
/// Controls whether to collect all metrics on each scrape or to return potentially stale
/// results.
pub force_metric_collection_on_scrape: bool,
}
/// Token for authentication to safekeepers
@@ -441,7 +437,6 @@ impl PageServerConf {
timeline_import_config,
basebackup_cache_config,
image_layer_generation_large_timeline_threshold,
force_metric_collection_on_scrape,
} = config_toml;
let mut conf = PageServerConf {
@@ -501,7 +496,6 @@ impl PageServerConf {
timeline_import_config,
basebackup_cache_config,
image_layer_generation_large_timeline_threshold,
force_metric_collection_on_scrape,
// ------------------------------------------------------------
// fields that require additional validation or custom handling

View File

@@ -2,9 +2,7 @@
//! Management HTTP API
//!
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -3216,30 +3214,6 @@ async fn get_utilization(
.map_err(ApiError::InternalServerError)
}
/// HADRON
async fn list_tenant_visible_size_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let state = get_state(&request);
let mut map = BTreeMap::new();
for (tenant_shard_id, slot) in state.tenant_manager.list() {
match slot {
TenantSlot::Attached(tenant) => {
let visible_size = tenant.get_visible_size();
map.insert(tenant_shard_id, visible_size);
}
TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => {
continue;
}
}
}
json_response(StatusCode::OK, map)
}
async fn list_aux_files(
mut request: Request<Body>,
_cancel: CancellationToken,
@@ -3964,14 +3938,9 @@ pub fn make_router(
.expect("construct launch timestamp header middleware"),
);
let force_metric_collection_on_scrape = state.conf.force_metric_collection_on_scrape;
let prometheus_metrics_handler_wrapper =
move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape);
Ok(router
.data(state)
.get("/metrics", move |r| request_span(r, prometheus_metrics_handler_wrapper))
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
.get("/v1/status", |r| api_handler(r, status_handler))
@@ -4177,7 +4146,6 @@ pub fn make_router(
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.get("/v1/list_tenant_visible_size", |r| api_handler(r, list_tenant_visible_size_handler))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
|r| testing_api_handler("ingest_aux_files", r, ingest_aux_files),

View File

@@ -73,9 +73,6 @@ pub struct HttpEndpointListener(pub CancellableTask);
pub struct HttpsEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
// HADRON
pub struct MetricsCollectionTask(pub CancellableTask);
impl CancellableTask {
pub async fn shutdown(self) {
self.cancel.cancel();
@@ -90,7 +87,6 @@ pub async fn shutdown_pageserver(
https_listener: Option<HttpsEndpointListener>,
page_service: page_service::Listener,
grpc_task: Option<CancellableTask>,
metrics_collection_task: MetricsCollectionTask,
consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager,
@@ -215,14 +211,6 @@ pub async fn shutdown_pageserver(
// Best effort to persist any outstanding deletions, to avoid leaking objects
deletion_queue.shutdown(Duration::from_secs(5)).await;
// HADRON
timed(
metrics_collection_task.0.shutdown(),
"shutdown metrics collections metrics",
Duration::from_secs(1),
)
.await;
timed(
consumption_metrics_worker.0.shutdown(),
"shutdown consumption metrics",

View File

@@ -2847,24 +2847,6 @@ pub(crate) static MISROUTED_PAGESTREAM_REQUESTS: Lazy<IntCounter> = Lazy::new(||
.expect("failed to define a metric")
});
// Global counter for PageStream request results by outcome. Outcomes are divided into 3 categories:
// - success
// - internal_error: errors that indicate bugs in the storage cluster (e.g. page reconstruction errors, misrouted requests, LSN timeout errors)
// - other_error: transient error conditions that are expected in normal operation or indicate bugs with other parts of the system (e.g. error due to pageserver shutdown, malformed requests etc.)
pub(crate) static PAGESTREAM_HANDLER_RESULTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_pagestream_handler_results_total",
"Number of pageserver pagestream handler results by outcome (success, internal_error, other_error)",
&["outcome"]
)
.expect("failed to define a metric")
});
// Constants for pageserver_pagestream_handler_results_total's outcome labels
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_SUCCESS: &str = "success";
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR: &str = "internal_error";
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR: &str = "other_error";
// Metrics collected on WAL redo operations
//
// We collect the time spent in actual WAL redo ('redo'), and time waiting

View File

@@ -70,7 +70,7 @@ use crate::context::{
};
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics,
MISROUTED_PAGESTREAM_REQUESTS, SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
@@ -1441,57 +1441,20 @@ impl PageServerHandler {
let (response_msg, ctx) = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR])
.inc();
// END HADRON
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(_reason) => {
span.in_scope(|| {
// BEGIN HADRON
// We can get here because the compute node is pointing at the wrong PS. We
// already have a metric to keep track of this so suppressing this log to
// reduce log spam. The information in this log message is not going to be that
// helpful given the volume of logs that can be generated.
// info!("handler requested reconnect: {reason}")
// END HADRON
});
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
])
.inc();
// END HADRON
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// BEGIN HADRON
if let PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) = &e.err {
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
])
.inc();
} else {
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
])
.inc();
}
// END HADRON
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
@@ -1509,15 +1472,7 @@ impl PageServerHandler {
)
}
},
Ok((response_msg, _op_timer_already_observed, ctx)) => {
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS])
.inc();
// END HADRON
(response_msg, Some(ctx))
}
Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)),
};
let ctx = ctx.map(|req_ctx| {
@@ -3338,12 +3293,9 @@ impl GrpcPageServiceHandler {
}
/// Generates a PagestreamRequest header from a ReadLsn and request ID.
fn make_hdr(
read_lsn: page_api::ReadLsn,
req_id: Option<page_api::RequestID>,
) -> PagestreamRequest {
fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest {
PagestreamRequest {
reqid: req_id.map(|r| r.id).unwrap_or_default(),
reqid: req_id,
request_lsn: read_lsn.request_lsn,
not_modified_since: read_lsn
.not_modified_since_lsn
@@ -3453,7 +3405,7 @@ impl GrpcPageServiceHandler {
batch.push(BatchedGetPageRequest {
req: PagestreamGetPageRequest {
hdr: Self::make_hdr(req.read_lsn, Some(req.request_id)),
hdr: Self::make_hdr(req.read_lsn, req.request_id),
rel: req.rel,
blkno,
},
@@ -3483,16 +3435,12 @@ impl GrpcPageServiceHandler {
request_id: req.request_id,
status_code: page_api::GetPageStatusCode::Ok,
reason: None,
rel: req.rel,
pages: Vec::with_capacity(results.len()),
page_images: Vec::with_capacity(results.len()),
};
for result in results {
match result {
Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.pages.push(page_api::Page {
block_number: r.req.blkno,
image: r.page,
}),
Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page),
Ok((resp, _, _)) => {
return Err(tonic::Status::internal(format!(
"unexpected response: {resp:?}"
@@ -3535,7 +3483,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(rel=%req.rel, lsn=%req.read_lsn);
let req = PagestreamExistsRequest {
hdr: Self::make_hdr(req.read_lsn, None),
hdr: Self::make_hdr(req.read_lsn, 0),
rel: req.rel,
};
@@ -3685,7 +3633,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn);
let req = PagestreamDbSizeRequest {
hdr: Self::make_hdr(req.read_lsn, None),
hdr: Self::make_hdr(req.read_lsn, 0),
dbnode: req.db_oid,
};
@@ -3735,7 +3683,7 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?
.downgrade();
while let Some(req) = reqs.message().await? {
let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();
let req_id = req.request_id;
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
.instrument(span.clone()) // propagate request span
.await;
@@ -3774,7 +3722,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(rel=%req.rel, lsn=%req.read_lsn);
let req = PagestreamNblocksRequest {
hdr: Self::make_hdr(req.read_lsn, None),
hdr: Self::make_hdr(req.read_lsn, 0),
rel: req.rel,
};
@@ -3807,7 +3755,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn);
let req = PagestreamGetSlruSegmentRequest {
hdr: Self::make_hdr(req.read_lsn, None),
hdr: Self::make_hdr(req.read_lsn, 0),
kind: req.kind as u8,
segno: req.segno,
};

View File

@@ -5719,16 +5719,6 @@ impl TenantShard {
.unwrap_or(0)
}
/// HADRON
/// Return the visible size of all timelines in this tenant.
pub(crate) fn get_visible_size(&self) -> u64 {
let timelines = self.timelines.lock().unwrap();
timelines
.values()
.map(|t| t.metrics.visible_physical_size_gauge.get())
.sum()
}
/// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant
/// manifest in `Self::remote_tenant_manifest`.
///

View File

@@ -225,7 +225,7 @@ impl fmt::Display for ImageLayerName {
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])
/// and [`crate::tenant::storage_layer::layer::local_layer_path`])
#[derive(Debug, PartialEq, Eq, Hash, Clone, Ord, PartialOrd)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum LayerName {
Image(ImageLayerName),
Delta(DeltaLayerName),

View File

@@ -6633,7 +6633,7 @@ impl Timeline {
const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
trace!("holding off GC for standby apply LSN {}", standby_horizon);
info!("holding off GC for standby apply LSN {}", standby_horizon);
} else {
warn!(
"standby is lagging for more than {}MB, not holding gc for it",

View File

@@ -750,7 +750,7 @@ impl ConnectionManagerState {
WALRECEIVER_BROKER_UPDATES.inc();
trace!(
info!(
"safekeeper info update: standby_horizon(cutoff)={}",
timeline_update.standby_horizon
);

View File

@@ -1410,7 +1410,7 @@ pg_init_libpagestore(void)
"sharding stripe size",
NULL,
&stripe_size,
2048, 1, INT_MAX,
32768, 1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_BLOCKS,
NULL, NULL, NULL);

View File

@@ -376,18 +376,6 @@ typedef struct PageserverFeedback
uint32 shard_number;
} PageserverFeedback;
/* BEGIN_HADRON */
typedef struct WalRateLimiter
{
/* If the value is 1, PG backends will hit backpressure. */
pg_atomic_uint32 should_limit;
/* The number of bytes sent in the current second. */
uint64 sent_bytes;
/* The last recorded time in microsecond. */
TimestampTz last_recorded_time_us;
} WalRateLimiter;
/* END_HADRON */
typedef struct WalproposerShmemState
{
pg_atomic_uint64 propEpochStartLsn;
@@ -407,11 +395,6 @@ typedef struct WalproposerShmemState
/* aggregated feedback with min LSNs across shards */
PageserverFeedback min_ps_feedback;
/* BEGIN_HADRON */
/* The WAL rate limiter */
WalRateLimiter wal_rate_limiter;
/* END_HADRON */
} WalproposerShmemState;
/*

View File

@@ -66,9 +66,6 @@ int wal_acceptor_reconnect_timeout = 1000;
int wal_acceptor_connection_timeout = 10000;
int safekeeper_proto_version = 3;
char *safekeeper_conninfo_options = "";
/* BEGIN_HADRON */
int databricks_max_wal_mb_per_second = -1;
/* END_HADRON */
/* Set to true in the walproposer bgw. */
static bool am_walproposer;
@@ -255,18 +252,6 @@ nwp_register_gucs(void)
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
/* BEGIN_HADRON */
DefineCustomIntVariable(
"databricks.max_wal_mb_per_second",
"The maximum WAL MB per second allowed. If breached, sending WAL hit the backpressure. Setting to -1 disables the limit.",
NULL,
&databricks_max_wal_mb_per_second,
-1, -1, INT_MAX,
PGC_SUSET,
GUC_UNIT_MB,
NULL, NULL, NULL);
/* END_HADRON */
}
@@ -408,7 +393,6 @@ assign_neon_safekeepers(const char *newval, void *extra)
static uint64
backpressure_lag_impl(void)
{
struct WalproposerShmemState* state = NULL;
if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0)
{
XLogRecPtr writePtr;
@@ -442,18 +426,6 @@ backpressure_lag_impl(void)
return (myFlushLsn - applyPtr - max_replication_apply_lag * MB);
}
}
/* BEGIN_HADRON */
if (databricks_max_wal_mb_per_second == -1) {
return 0;
}
state = GetWalpropShmemState();
if (state != NULL && pg_atomic_read_u32(&state->wal_rate_limiter.should_limit) == 1)
{
return 1;
}
/* END_HADRON */
return 0;
}
@@ -500,9 +472,6 @@ WalproposerShmemInit(void)
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
pg_atomic_init_u64(&walprop_shared->currentClusterSize, 0);
/* BEGIN_HADRON */
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
/* END_HADRON */
}
LWLockRelease(AddinShmemInitLock);
@@ -518,9 +487,6 @@ WalproposerShmemInit_SyncSafekeeper(void)
pg_atomic_init_u64(&walprop_shared->propEpochStartLsn, 0);
pg_atomic_init_u64(&walprop_shared->mineLastElectedTerm, 0);
pg_atomic_init_u64(&walprop_shared->backpressureThrottlingTime, 0);
/* BEGIN_HADRON */
pg_atomic_init_u32(&walprop_shared->wal_rate_limiter.should_limit, 0);
/* END_HADRON */
}
#define BACK_PRESSURE_DELAY 10000L // 0.01 sec
@@ -555,6 +521,7 @@ backpressure_throttling_impl(void)
if (lag == 0)
return retry;
old_status = get_ps_display(&len);
new_status = (char *) palloc(len + 64 + 1);
memcpy(new_status, old_status, len);
@@ -1491,8 +1458,6 @@ XLogBroadcastWalProposer(WalProposer *wp)
{
XLogRecPtr startptr;
XLogRecPtr endptr;
struct WalproposerShmemState *state = NULL;
TimestampTz now = 0;
/* Start from the last sent position */
startptr = sentPtr;
@@ -1537,36 +1502,13 @@ XLogBroadcastWalProposer(WalProposer *wp)
* that arbitrary LSN is eventually reported as written, flushed and
* applied, so that it can measure the elapsed time.
*/
now = GetCurrentTimestamp();
LagTrackerWrite(endptr, now);
LagTrackerWrite(endptr, GetCurrentTimestamp());
/* Do we have any work to do? */
Assert(startptr <= endptr);
if (endptr <= startptr)
return;
/* BEGIN_HADRON */
state = GetWalpropShmemState();
if (databricks_max_wal_mb_per_second != -1 && state != NULL)
{
uint64 max_wal_bytes = (uint64) databricks_max_wal_mb_per_second * 1024 * 1024;
struct WalRateLimiter *limiter = &state->wal_rate_limiter;
if (now - limiter->last_recorded_time_us > USECS_PER_SEC)
{
/* Reset the rate limiter */
limiter->last_recorded_time_us = now;
limiter->sent_bytes = 0;
pg_atomic_exchange_u32(&limiter->should_limit, 0);
}
limiter->sent_bytes += (endptr - startptr);
if (limiter->sent_bytes > max_wal_bytes)
{
pg_atomic_exchange_u32(&limiter->should_limit, 1);
}
}
/* END_HADRON */
WalProposerBroadcast(wp, startptr, endptr);
sentPtr = endptr;

View File

@@ -37,7 +37,6 @@ use tracing::*;
use utils::auth::{JwtAuth, Scope, SwappableJwtAuth};
use utils::id::NodeId;
use utils::logging::{self, LogFormat, SecretString};
use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR};
use utils::sentry_init::init_sentry;
use utils::{pid_file, project_build_tag, project_git_version, tcp_listener};
@@ -244,11 +243,6 @@ struct Args {
#[arg(long)]
enable_tls_wal_service_api: bool,
/// Controls whether to collect all metrics on each scrape or to return potentially stale
/// results.
#[arg(long, default_value_t = true)]
force_metric_collection_on_scrape: bool,
/// Run in development mode (disables security checks)
#[arg(long, help = "Run in development mode (disables security checks)")]
dev: bool,
@@ -434,7 +428,6 @@ async fn main() -> anyhow::Result<()> {
ssl_ca_certs,
use_https_safekeeper_api: args.use_https_safekeeper_api,
enable_tls_wal_service_api: args.enable_tls_wal_service_api,
force_metric_collection_on_scrape: args.force_metric_collection_on_scrape,
});
// initialize sentry if SENTRY_DSN is provided
@@ -647,26 +640,6 @@ async fn start_safekeeper(conf: Arc<SafeKeeperConf>) -> Result<()> {
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(broker_task_handle));
/* BEGIN_HADRON */
if conf.force_metric_collection_on_scrape {
let metrics_handle = current_thread_rt
.as_ref()
.unwrap_or_else(|| BACKGROUND_RUNTIME.handle())
.spawn(async move {
let mut interval: tokio::time::Interval =
tokio::time::interval(METRICS_COLLECTION_INTERVAL);
loop {
interval.tick().await;
tokio::task::spawn_blocking(|| {
METRICS_COLLECTOR.run_once(true);
});
}
})
.map(|res| ("broker main".to_owned(), res));
tasks_handles.push(Box::pin(metrics_handle));
}
/* END_HADRON */
set_build_info_metric(GIT_VERSION, BUILD_TAG);
// TODO: update tokio-stream, convert to real async Stream with

View File

@@ -699,11 +699,6 @@ pub fn make_router(
}))
}
let force_metric_collection_on_scrape = conf.force_metric_collection_on_scrape;
let prometheus_metrics_handler_wrapper =
move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape);
// NB: on any changes do not forget to update the OpenAPI spec
// located nearby (/safekeeper/src/http/openapi_spec.yaml).
let auth = conf.http_auth.clone();
@@ -711,9 +706,7 @@ pub fn make_router(
.data(conf)
.data(global_timelines)
.data(auth)
.get("/metrics", move |r| {
request_span(r, prometheus_metrics_handler_wrapper)
})
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
.get("/v1/status", |r| request_span(r, status_handler))

View File

@@ -134,7 +134,6 @@ pub struct SafeKeeperConf {
pub ssl_ca_certs: Vec<Pem>,
pub use_https_safekeeper_api: bool,
pub enable_tls_wal_service_api: bool,
pub force_metric_collection_on_scrape: bool,
}
impl SafeKeeperConf {
@@ -184,7 +183,6 @@ impl SafeKeeperConf {
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
force_metric_collection_on_scrape: true,
}
}
}

View File

@@ -59,15 +59,6 @@ pub static FLUSH_WAL_SECONDS: Lazy<Histogram> = Lazy::new(|| {
.expect("Failed to register safekeeper_flush_wal_seconds histogram")
});
/* BEGIN_HADRON */
// Counter of all ProposerAcceptorMessage requests received
pub static PROPOSER_ACCEPTOR_MESSAGES_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"safekeeper_proposer_acceptor_messages_total",
"Total number of ProposerAcceptorMessage requests received by the Safekeeper.",
&["outcome"]
)
.expect("Failed to register safekeeper_proposer_acceptor_messages_total counter")
});
pub static WAL_DISK_IO_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"safekeeper_wal_disk_io_errors",

View File

@@ -24,7 +24,7 @@ use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use crate::metrics::{MISC_OPERATION_SECONDS, PROPOSER_ACCEPTOR_MESSAGES_TOTAL};
use crate::metrics::MISC_OPERATION_SECONDS;
use crate::state::TimelineState;
use crate::{control_file, wal_storage};
@@ -938,7 +938,7 @@ where
&mut self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
let res = match msg {
match msg {
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await,
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await,
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await,
@@ -949,20 +949,7 @@ where
self.handle_append_request(msg, false).await
}
ProposerAcceptorMessage::FlushWAL => self.handle_flush().await,
};
// BEGIN HADRON
match &res {
Ok(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
.with_label_values(&["success"])
.inc(),
Err(_) => PROPOSER_ACCEPTOR_MESSAGES_TOTAL
.with_label_values(&["error"])
.inc(),
};
res
// END HADRON
}
}
/// Handle initial message from proposer: check its sanity and send my

View File

@@ -220,7 +220,7 @@ impl WalSenders {
fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
let mut shared = self.mutex.lock();
let slot = shared.get_slot_mut(id);
debug!(
info!(
"Record standby reply: ts={} apply_lsn={}",
reply.reply_ts, reply.apply_lsn
);
@@ -400,7 +400,10 @@ impl WalSendersShared {
}
}
self.agg_standby_feedback = StandbyFeedback {
reply: reply_agg,
reply: {
info!(prev=%self.agg_standby_feedback.reply.apply_lsn, new=%reply_agg.apply_lsn, "updating agg_standby_feedback apply_lsn");
reply_agg
},
hs_feedback: agg,
};
}

View File

@@ -166,7 +166,7 @@ fn hadron_determine_offloader(mgr: &Manager, state: &StateSnapshot) -> (Option<N
let backup_lag = state.commit_lsn.checked_sub(state.backup_lsn);
if backup_lag.is_none() {
debug!("Backup lag is None. Skipping re-election.");
info!("Backup lag is None. Skipping re-election.");
return (offloader, election_dbg_str);
}

View File

@@ -190,7 +190,6 @@ pub fn run_server(os: NodeOs, disk: Arc<SafekeeperDisk>) -> Result<()> {
ssl_ca_certs: Vec::new(),
use_https_safekeeper_api: false,
enable_tls_wal_service_api: false,
force_metric_collection_on_scrape: true,
};
let mut global = GlobalMap::new(disk, conf.clone())?;

View File

@@ -1984,14 +1984,11 @@ impl Service {
});
// Check that there is enough safekeepers configured that we can create new timelines
let test_sk_res_str = match this.safekeepers_for_new_timeline().await {
Ok(v) => format!("Ok({v:?})"),
Err(v) => format!("Err({v:})"),
};
let test_sk_res = this.safekeepers_for_new_timeline().await;
tracing::info!(
timeline_safekeeper_count = config.timeline_safekeeper_count,
timelines_onto_safekeepers = config.timelines_onto_safekeepers,
"viability test result (test timeline creation on safekeepers): {test_sk_res_str}",
"viability test result (test timeline creation on safekeepers): {test_sk_res:?}",
);
Ok(this)
@@ -4761,7 +4758,6 @@ impl Service {
)
.await;
let mut retry_if_not_attached = false;
let targets = {
let locked = self.inner.read().unwrap();
let mut targets = Vec::new();
@@ -4778,24 +4774,6 @@ impl Service {
.expect("Pageservers may not be deleted while referenced");
targets.push((*tenant_shard_id, node.clone()));
if let Some(location) = shard.observed.locations.get(node_id) {
if let Some(ref conf) = location.conf {
if conf.mode != LocationConfigMode::AttachedSingle
&& conf.mode != LocationConfigMode::AttachedMulti
{
// If the shard is attached as secondary, we need to retry if 404.
retry_if_not_attached = true;
}
// If the shard is attached as primary, we should succeed.
} else {
// Location conf is not available yet, retry if 404.
retry_if_not_attached = true;
}
} else {
// The shard is not attached to the intended pageserver yet, retry if 404.
retry_if_not_attached = true;
}
}
}
targets
@@ -4826,18 +4804,6 @@ impl Service {
valid_until = Some(lease.valid_until);
}
}
Err(mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _))
if retry_if_not_attached =>
{
// This is expected if the attach is not finished yet. Return 503 so that the client can retry.
return Err(ApiError::ResourceUnavailable(
format!(
"Timeline is not attached to the pageserver {} yet, please retry",
node.get_id()
)
.into(),
));
}
Err(e) => {
return Err(passthrough_api_error(&node, e));
}

View File

@@ -333,13 +333,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")
self.verbose_error(res)
def list_tenant_visible_size(self) -> dict[TenantShardId, int]:
res = self.get(f"http://localhost:{self.port}/v1/list_tenant_visible_size")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def tenant_list(self) -> list[dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
@@ -1009,7 +1002,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
res = self.get(f"http://localhost:{self.port}/metrics?use_latest=true")
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
return res.text

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
import re
import socket
from contextlib import closing
from itertools import cycle
from fixtures.log_helper import log
@@ -35,23 +34,15 @@ def can_bind(host: str, port: int) -> bool:
class PortDistributor:
def __init__(self, base_port: int, port_number: int):
self.base_port = base_port
self.port_number = port_number
self.cycle = cycle(range(base_port, base_port + port_number))
self.iterator = iter(range(base_port, base_port + port_number))
self.port_map: dict[int, int] = {}
def get_port(self) -> int:
checked = 0
for port in self.cycle:
for port in self.iterator:
if can_bind("localhost", port):
return port
elif checked < self.port_number:
checked += 1
else:
break
raise RuntimeError(
f"port range ({self.base_port}..{self.base_port + self.port_number}) configured for test is exhausted, consider enlarging the range"
"port range configured for test is exhausted, consider enlarging the range"
)
def replace_with_new_port(self, value: int | str) -> int | str:

View File

@@ -143,7 +143,7 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
request_result = self.get(f"http://localhost:{self.port}/metrics?use_latest=true")
request_result = self.get(f"http://localhost:{self.port}/metrics")
request_result.raise_for_status()
return request_result.text

View File

@@ -7,6 +7,7 @@ import time
from functools import partial
import pytest
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
@@ -133,6 +134,9 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
tenant_conf = {
# set PITR interval to be small, so we can do GC
"pitr_interval": "0 s",
# this test is largely about PS GC behavior, we control it manually
"gc_period": "0s",
"compaction_period": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
timeline_id = env.initial_timeline
@@ -163,6 +167,11 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
res = s_cur.fetchone()
assert res is not None
s_cur.execute("SHOW hot_standby_feedback")
res = s_cur.fetchone()
assert res is not None
assert res[0] == "off"
s_cur.execute("SELECT COUNT(*) FROM test")
res = s_cur.fetchone()
assert res == (10000,)
@@ -198,6 +207,44 @@ def test_hot_standby_gc(neon_env_builder: NeonEnvBuilder, pause_apply: bool):
res = s_cur.fetchone()
assert res == (10000,)
if pause_apply:
s_cur.execute("SELECT pg_wal_replay_resume()")
wait_replica_caughtup(primary, secondary)
# Wait for PS's view of standby horizon to catch up.
# (When we switch to leases (LKB-88) we need to change this to watch the lease lsn move.)
# (TODO: instead of checking impl details here, somehow assert that gc can delete layers now.
# Tricky to do that without flakiness though.)
# We already waited for replica to catch up, so, this timeout is strictly on
# a few few in-memory only RPCs to propagate standby_horizon.
timeout_secs = 10
started_at = time.time()
shards = tenant_get_shards(env, tenant_id, None)
for tenant_shard_id, pageserver in shards:
client = pageserver.http_client()
while True:
secondary_apply_lsn = Lsn(
secondary.safe_psql_scalar(
"SELECT pg_last_wal_replay_lsn()", log_query=False
)
)
standby_horizon_metric = client.get_metrics().query_one(
"pageserver_standby_horizon",
{
"tenant_id": str(tenant_shard_id.tenant_id),
"shard_id": str(tenant_shard_id.shard_index),
"timeline_id": str(timeline_id),
},
)
standby_horizon_at_ps = Lsn(int(standby_horizon_metric.value))
log.info(f"{tenant_shard_id.shard_index=}: {standby_horizon_at_ps=} {secondary_apply_lsn=}")
if secondary_apply_lsn == standby_horizon_at_ps:
break
if time.time() - started_at > timeout_secs:
pytest.fail(f"standby_horizon didn't propagate within {timeout_secs=}, this is holding up gc on secondary")
time.sleep(1)
def run_pgbench(connstr: str, pg_bin: PgBin):
log.info(f"Start a pgbench workload on pg {connstr}")

View File

@@ -40,7 +40,7 @@ def prom_parse(client: EndpointHttpClient) -> dict[str, float]:
def offload_lfc(method: PrewarmMethod, client: EndpointHttpClient, cur: Cursor) -> Any:
if method == PrewarmMethod.POSTGRES:
cur.execute("select neon.get_local_cache_state()")
cur.execute("select get_local_cache_state()")
return cur.fetchall()[0][0]
if method == PrewarmMethod.AUTOPREWARM:
@@ -72,7 +72,7 @@ def prewarm_endpoint(
elif method == PrewarmMethod.COMPUTE_CTL:
client.prewarm_lfc()
elif method == PrewarmMethod.POSTGRES:
cur.execute("select neon.prewarm_local_cache(%s)", (lfc_state,))
cur.execute("select prewarm_local_cache(%s)", (lfc_state,))
def check_prewarmed(
@@ -116,7 +116,7 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create schema neon; create extension neon with schema neon")
pg_cur.execute("create extension neon")
pg_cur.execute("create database lfc")
lfc_conn = endpoint.connect(dbname="lfc")
@@ -142,12 +142,10 @@ def test_lfc_prewarm(neon_simple_env: NeonEnv, method: PrewarmMethod):
lfc_cur = lfc_conn.cursor()
prewarm_endpoint(method, client, pg_cur, lfc_state)
pg_cur.execute(
"select lfc_value from neon.neon_lfc_stats where lfc_key='file_cache_used_pages'"
)
pg_cur.execute("select lfc_value from neon_lfc_stats where lfc_key='file_cache_used_pages'")
lfc_used_pages = pg_cur.fetchall()[0][0]
log.info(f"Used LFC size: {lfc_used_pages}")
pg_cur.execute("select * from neon.get_prewarm_info()")
pg_cur.execute("select * from get_prewarm_info()")
total, prewarmed, skipped, _ = pg_cur.fetchall()[0]
log.info(f"Prewarm info: {total=} {prewarmed=} {skipped=}")
progress = (prewarmed + skipped) * 100 // total
@@ -188,7 +186,7 @@ def test_lfc_prewarm_under_workload(neon_simple_env: NeonEnv, method: PrewarmMet
pg_conn = endpoint.connect()
pg_cur = pg_conn.cursor()
pg_cur.execute("create schema neon; create extension neon with schema neon")
pg_cur.execute("create extension neon")
pg_cur.execute("CREATE DATABASE lfc")
lfc_conn = endpoint.connect(dbname="lfc")

View File

@@ -3,7 +3,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME,
NeonEnv,
@@ -165,15 +164,3 @@ def test_pageserver_http_index_part_force_patch(neon_env_builder: NeonEnvBuilder
{"rel_size_migration": "legacy"},
)
assert client.timeline_detail(tenant_id, timeline_id)["rel_size_migration"] == "legacy"
def test_pageserver_get_tenant_visible_size(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 1
env = neon_env_builder.init_start()
env.create_tenant(shard_count=4)
env.create_tenant(shard_count=2)
json = env.pageserver.http_client().list_tenant_visible_size()
log.info(f"{json}")
# initial tennat + 2 newly created tenants
assert len(json) == 7

View File

@@ -60,7 +60,7 @@ def test_replica_promote(neon_simple_env: NeonEnv, method: PromoteMethod):
with primary.connect() as primary_conn:
primary_cur = primary_conn.cursor()
primary_cur.execute("create schema neon;create extension neon with schema neon")
primary_cur.execute("create extension neon")
primary_cur.execute(
"create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)"
)
@@ -172,7 +172,7 @@ def test_replica_promote_handler_disconnects(neon_simple_env: NeonEnv):
secondary: Endpoint = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
with primary.connect() as conn, conn.cursor() as cur:
cur.execute("create schema neon;create extension neon with schema neon")
cur.execute("create extension neon")
cur.execute("create table t(pk bigint GENERATED ALWAYS AS IDENTITY, payload integer)")
cur.execute("INSERT INTO t(payload) SELECT generate_series(1, 100)")
cur.execute("show neon.safekeepers")