Compare commits

..

1 Commits

Author SHA1 Message Date
Aleksandr Sarantsev
c8475ed008 Introduce flag for deletion API 2025-07-08 17:20:15 +04:00
150 changed files with 1225 additions and 7010 deletions

View File

@@ -87,24 +87,6 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
lint-openapi-spec:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3.4.0
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- run: make lint-openapi-spec
check-codestyle-python:
needs: [ meta, check-permissions, build-build-tools-image ]
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
@@ -1004,7 +986,6 @@ jobs:
- name: Verify docker-compose example and test extensions
timeout-minutes: 60
env:
PARALLEL_COMPUTES: 3
TAG: >-
${{
needs.meta.outputs.run-kind == 'compute-rc-pr'

1
.gitignore vendored
View File

@@ -15,7 +15,6 @@ neon.iml
/.neon
/integration_tests/.neon
compaction-suite-results.*
docker-compose/docker-compose-parallel.yml
# Coverage
*.profraw

27
Cargo.lock generated
View File

@@ -1348,7 +1348,6 @@ dependencies = [
"p256 0.13.2",
"pageserver_page_api",
"postgres",
"postgres-types",
"postgres_initdb",
"postgres_versioninfo",
"regex",
@@ -4294,7 +4293,6 @@ dependencies = [
"humantime-serde",
"pageserver_api",
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"rand 0.8.5",
"reqwest",
@@ -4324,7 +4322,6 @@ dependencies = [
"pageserver_api",
"postgres_ffi",
"remote_storage",
"serde",
"serde_json",
"svg_fmt",
"thiserror 1.0.69",
@@ -4342,7 +4339,6 @@ dependencies = [
"arc-swap",
"async-compression",
"async-stream",
"base64 0.22.1",
"bincode",
"bit_field",
"byteorder",
@@ -4496,26 +4492,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"bytes",
"compute_api",
"futures",
"pageserver_api",
"pageserver_page_api",
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.13.1",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "pageserver_compaction"
version = "0.1.0"
@@ -5708,8 +5684,6 @@ dependencies = [
"azure_identity",
"azure_storage",
"azure_storage_blobs",
"base64 0.22.1",
"byteorder",
"bytes",
"camino",
"camino-tempfile",
@@ -6991,7 +6965,6 @@ dependencies = [
"pageserver_api",
"pageserver_client",
"reqwest",
"safekeeper_api",
"serde_json",
"storage_controller_client",
"tokio",

View File

@@ -8,7 +8,6 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/client_grpc",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
@@ -262,7 +261,6 @@ neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }

View File

@@ -220,15 +220,6 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
setup-pre-commit-hook:
ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit
.PHONY: lint-openapi-spec
lint-openapi-spec:
# operation-2xx-response: pageserver timeline delete returns 404 on success
find . -iname "openapi_spec.y*ml" -exec\
docker run --rm -v ${PWD}:/spec ghcr.io/redocly/cli:1.34.4\
--skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\
--skip-rule=no-server-example.com --skip-rule=operation-2xx-response\
lint {} \+
# Targets for building PostgreSQL are defined in postgres.mk.
#
# But if the caller has indicated that PostgreSQL is already

View File

@@ -1,12 +1,9 @@
disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# tokio-epoll-uring:
# - allow-invalid because the method doesn't exist on macOS
{ path = "tokio_epoll_uring::thread_local_system", replacement = "tokio_epoll_uring_ext module inside pageserver crate", allow-invalid = true }
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]
disallowed-macros = [

View File

@@ -1915,10 +1915,10 @@ RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /e
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN echo /usr/local/pgsql/lib > /etc/ld.so.conf.d/00-neon.conf && /sbin/ldconfig
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq parallel \
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq \
&& apt clean && rm -rf /ext-src/*.tar.gz /ext-src/*.patch /var/lib/apt/lists/*
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute1
ENV PGHOST=compute
ENV PGPORT=55433
ENV PGUSER=cloud_admin
ENV PGDATABASE=postgres

View File

@@ -66,7 +66,7 @@ url.workspace = true
uuid.workspace = true
walkdir.workspace = true
x509-cert.workspace = true
postgres-types.workspace = true
postgres_versioninfo.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -46,14 +46,11 @@ stateDiagram-v2
Configuration --> Failed : Failed to configure the compute
Configuration --> Running : Compute has been configured
Empty --> Init : Compute spec is immediately available
Empty --> TerminationPendingFast : Requested termination
Empty --> TerminationPendingImmediate : Requested termination
Empty --> TerminationPending : Requested termination
Init --> Failed : Failed to start Postgres
Init --> Running : Started Postgres
Running --> TerminationPendingFast : Requested termination
Running --> TerminationPendingImmediate : Requested termination
TerminationPendingFast --> Terminated compute with 30s delay for cplane to inspect status
TerminationPendingImmediate --> Terminated : Terminated compute immediately
Running --> TerminationPending : Requested termination
TerminationPending --> Terminated : Terminated compute
Failed --> [*] : Compute exited
Terminated --> [*] : Compute exited
```

View File

@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState, PromoteState, TlsConfig,
LfcPrewarmState, TlsConfig,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
@@ -29,7 +29,8 @@ use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio::task::JoinHandle;
use tokio::{spawn, time};
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::id::{TenantId, TimelineId};
@@ -174,7 +175,6 @@ pub struct ComputeState {
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
/// mode == ComputeMode::Primary. None otherwise
pub terminate_flush_lsn: Option<Lsn>,
pub promote_state: Option<watch::Receiver<PromoteState>>,
pub metrics: ComputeMetrics,
}
@@ -192,7 +192,6 @@ impl ComputeState {
lfc_prewarm_state: LfcPrewarmState::default(),
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
promote_state: None,
}
}
@@ -956,20 +955,14 @@ impl ComputeNode {
None
};
let mut delay_exit = false;
let mut state = self.state.lock().unwrap();
state.terminate_flush_lsn = lsn;
let delay_exit = state.status == ComputeStatus::TerminationPendingFast;
if state.status == ComputeStatus::TerminationPendingFast
|| state.status == ComputeStatus::TerminationPendingImmediate
{
info!(
"Changing compute status from {} to {}",
state.status,
ComputeStatus::Terminated
);
if let ComputeStatus::TerminationPending { mode } = state.status {
state.status = ComputeStatus::Terminated;
self.state_changed.notify_all();
// we were asked to terminate gracefully, don't exit to avoid restart
delay_exit = mode == compute_api::responses::TerminateMode::Fast
}
drop(state);
@@ -1064,7 +1057,7 @@ impl ComputeNode {
};
let (reader, connected) = tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::connect(
let mut client = page_api::Client::new(
shard0_connstr,
spec.tenant_id,
spec.timeline_id,
@@ -1811,8 +1804,6 @@ impl ComputeNode {
tls_config,
)?;
self.pg_reload_conf()?;
if !spec.skip_pg_catalog_updates {
let max_concurrent_connections = spec.reconfigure_concurrency;
// Temporarily reset max_cluster_size in config
@@ -1832,9 +1823,10 @@ impl ComputeNode {
Ok(())
})?;
self.pg_reload_conf()?;
}
self.pg_reload_conf()?;
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
@@ -1907,8 +1899,7 @@ impl ComputeNode {
// exit loop
ComputeStatus::Failed
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Terminated => break 'cert_update,
// wait
@@ -2442,11 +2433,19 @@ LIMIT 100",
// If the value is -1, we never suspend so set the value to default collection.
// If the value is 0, it means default, we will just continue to use the default.
if spec.suspend_timeout_seconds == -1 || spec.suspend_timeout_seconds == 0 {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}, New Timeout: {}",
spec.suspend_timeout_seconds, DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL
);
self.params.installed_extensions_collection_interval.store(
DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL,
std::sync::atomic::Ordering::SeqCst,
);
} else {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}",
spec.suspend_timeout_seconds
);
self.params.installed_extensions_collection_interval.store(
spec.suspend_timeout_seconds as u64,
std::sync::atomic::Ordering::SeqCst,

View File

@@ -70,7 +70,7 @@ impl ComputeNode {
}
};
let row = match client
.query_one("select * from neon.get_prewarm_info()", &[])
.query_one("select * from get_prewarm_info()", &[])
.await
{
Ok(row) => row,
@@ -105,8 +105,7 @@ impl ComputeNode {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "prewarming lfc");
error!(%err);
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
error: err.to_string(),
};
@@ -146,7 +145,7 @@ impl ComputeNode {
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.prewarm_local_cache($1)", &[&uncompressed])
.query_one("select prewarm_local_cache($1)", &[&uncompressed])
.await
.context("loading LFC state into postgres")
.map(|_| ())
@@ -181,8 +180,7 @@ impl ComputeNode {
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "offloading lfc");
error!(%err);
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
};
@@ -196,7 +194,7 @@ impl ComputeNode {
ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?
.query_one("select neon.get_local_cache_state()", &[])
.query_one("select get_local_cache_state()", &[])
.await
.context("querying LFC state")?
.try_get::<usize, &[u8]>(0)

View File

@@ -1,132 +0,0 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use compute_api::{
responses::{LfcPrewarmState, PromoteState, SafekeepersLsn},
spec::ComputeMode,
};
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use utils::lsn::Lsn;
impl ComputeNode {
/// Returns only when promote fails or succeeds. If a network error occurs
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
pub async fn promote(self: &Arc<Self>, safekeepers_lsn: SafekeepersLsn) -> PromoteState {
let cloned = self.clone();
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move {
tx.send(match cloned.promote_impl(safekeepers_lsn).await {
Ok(_) => PromoteState::Completed,
Err(err) => {
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: err.to_string(),
}
}
})
});
rx
};
let mut task;
// self.state is unlocked after block ends so we lock it in promote_impl
// and task.changed() is reached
{
task = self
.state
.lock()
.unwrap()
.promote_state
.get_or_insert_with(start_promotion)
.clone()
}
task.changed().await.expect("promote sender dropped");
task.borrow().clone()
}
// Why do we have to supply safekeepers?
// For secondary we use primary_connection_conninfo so safekeepers field is empty
async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
if *mode != ComputeMode::Replica {
bail!("{} is not replica", mode.to_type_str());
}
// we don't need to query Postgres so not self.lfc_prewarm_state()
match &state.lfc_prewarm_state {
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
bail!("prewarm not requested or pending")
}
LfcPrewarmState::Failed { error } => {
tracing::warn!(%error, "replica prewarm failed")
}
_ => {}
}
}
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let primary_lsn = safekeepers_lsn.wal_flush_lsn;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
.query_one("SELECT pg_last_wal_replay_lsn()", &[])
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
last_wal_replay_lsn = lsn.into();
if last_wal_replay_lsn >= primary_lsn {
break;
}
tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
}
if last_wal_replay_lsn < primary_lsn {
bail!("didn't catch up with primary in {RETRIES} retries");
}
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
"ALTER SYSTEM SET neon.safekeepers='{}'",
safekeepers_lsn.safekeepers
);
client
.query(&safekeepers_sql, &[])
.await
.context("setting safekeepers")?;
client
.query("SELECT pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
let row = client
.query_one("SELECT * FROM pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {
bail!("pg_promote() returned false");
}
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let row = client
.query_one("SHOW transaction_read_only", &[])
.await
.context("getting transaction_read_only")?;
if row.get::<usize, &str>(0) == "on" {
bail!("replica in read only mode after promotion");
}
let mut state = self.state.lock().unwrap();
state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary;
Ok(())
}
}

View File

@@ -83,87 +83,6 @@ paths:
schema:
$ref: "#/components/schemas/DbsAndRoles"
/promote:
post:
tags:
- Promotion
summary: Promote secondary replica to primary
description: ""
operationId: promoteReplica
requestBody:
description: Promote requests data
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/SafekeepersLsn"
responses:
200:
description: Promote succeeded or wasn't started
content:
application/json:
schema:
$ref: "#/components/schemas/PromoteState"
500:
description: Promote failed
content:
application/json:
schema:
$ref: "#/components/schemas/PromoteState"
/lfc/prewarm:
post:
summary: Request LFC Prewarm
parameters:
- name: from_endpoint
in: query
schema:
type: string
description: ""
operationId: lfcPrewarm
responses:
202:
description: LFC prewarm started
429:
description: LFC prewarm ongoing
get:
tags:
- Prewarm
summary: Get LFC prewarm state
description: ""
operationId: getLfcPrewarmState
responses:
200:
description: Prewarm state
content:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
/lfc/offload:
post:
summary: Request LFC offload
description: ""
operationId: lfcOffload
responses:
202:
description: LFC offload started
429:
description: LFC offload ongoing
get:
tags:
- Prewarm
summary: Get LFC offloading state
description: ""
operationId: getLfcOffloadState
responses:
200:
description: Offload state
content:
application/json:
schema:
$ref: "#/components/schemas/LfcOffloadState"
/database_schema:
get:
tags:
@@ -371,28 +290,9 @@ paths:
summary: Terminate Postgres and wait for it to exit
description: ""
operationId: terminate
parameters:
- name: mode
in: query
description: "Terminate mode: fast (wait 30s before returning) and immediate"
required: false
schema:
type: string
enum: ["fast", "immediate"]
default: fast
responses:
200:
description: Result
content:
application/json:
schema:
$ref: "#/components/schemas/TerminateResponse"
201:
description: Result if compute is already terminated
content:
application/json:
schema:
$ref: "#/components/schemas/TerminateResponse"
412:
description: "wrong state"
content:
@@ -435,6 +335,15 @@ components:
total_startup_ms:
type: integer
Info:
type: object
description: Information about VM/Pod.
required:
- num_cpus
properties:
num_cpus:
type: integer
DbsAndRoles:
type: object
description: Databases and Roles
@@ -549,14 +458,11 @@ components:
type: string
enum:
- empty
- configuration_pending
- init
- running
- configuration
- failed
- termination_pending_fast
- termination_pending_immediate
- terminated
- running
- configuration_pending
- configuration
example: running
ExtensionInstallRequest:
@@ -591,69 +497,25 @@ components:
type: string
example: "1.0.0"
SafekeepersLsn:
InstalledExtensions:
type: object
required:
- safekeepers
- wal_flush_lsn
properties:
safekeepers:
description: Primary replica safekeepers
type: string
wal_flush_lsn:
description: Primary last WAL flush LSN
type: string
LfcPrewarmState:
type: object
required:
- status
- total
- prewarmed
- skipped
properties:
status:
description: Lfc prewarm status
enum: [not_prewarmed, prewarming, completed, failed]
type: string
error:
description: Lfc prewarm error, if any
type: string
total:
description: Total pages processed
type: integer
prewarmed:
description: Total pages prewarmed
type: integer
skipped:
description: Pages processed but not prewarmed
type: integer
LfcOffloadState:
type: object
required:
- status
properties:
status:
description: Lfc offload status
enum: [not_offloaded, offloading, completed, failed]
type: string
error:
description: Lfc offload error, if any
type: string
PromoteState:
type: object
required:
- status
properties:
status:
description: Promote result
enum: [not_promoted, completed, failed]
type: string
error:
description: Promote error, if any
type: string
extensions:
description: Contains list of installed extensions.
type: array
items:
type: object
properties:
extname:
type: string
version:
type: string
items:
type: string
n_databases:
type: integer
owned_by_superuser:
type: integer
SetRoleGrantsRequest:
type: object
@@ -682,17 +544,6 @@ components:
description: Role name.
example: "neon"
TerminateResponse:
type: object
required:
- lsn
properties:
lsn:
type: string
nullable: true
description: "last WAL flush LSN"
example: "0/028F10D8"
SetRoleGrantsResponse:
type: object
required:

View File

@@ -14,7 +14,6 @@ pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod promote;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;

View File

@@ -1,14 +0,0 @@
use crate::http::JsonResponse;
use axum::Form;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Form(safekeepers_lsn): Form<compute_api::responses::SafekeepersLsn>,
) -> axum::response::Response {
let state = compute.promote(safekeepers_lsn).await;
if let compute_api::responses::PromoteState::Failed { error } = state {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, error);
}
JsonResponse::success(StatusCode::OK, state)
}

View File

@@ -3,7 +3,7 @@ use crate::http::JsonResponse;
use axum::extract::State;
use axum::response::Response;
use axum_extra::extract::OptionalQuery;
use compute_api::responses::{ComputeStatus, TerminateMode, TerminateResponse};
use compute_api::responses::{ComputeStatus, TerminateResponse};
use http::StatusCode;
use serde::Deserialize;
use std::sync::Arc;
@@ -12,7 +12,7 @@ use tracing::info;
#[derive(Deserialize, Default)]
pub struct TerminateQuery {
mode: TerminateMode,
mode: compute_api::responses::TerminateMode,
}
/// Terminate the compute.
@@ -24,16 +24,16 @@ pub(in crate::http) async fn terminate(
{
let mut state = compute.state.lock().unwrap();
if state.status == ComputeStatus::Terminated {
let response = TerminateResponse {
lsn: state.terminate_flush_lsn,
};
return JsonResponse::success(StatusCode::CREATED, response);
return JsonResponse::success(StatusCode::CREATED, state.terminate_flush_lsn);
}
if !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
return JsonResponse::invalid_status(state.status);
}
state.set_status(mode.into(), &compute.state_changed);
state.set_status(
ComputeStatus::TerminationPending { mode },
&compute.state_changed,
);
}
forward_termination_signal(false);

View File

@@ -23,7 +23,7 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, lfc, metrics, metrics_json, promote, status, terminate,
grants, insights, lfc, metrics, metrics_json, status, terminate,
},
};
use crate::compute::ComputeNode;
@@ -87,7 +87,6 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))

View File

@@ -12,7 +12,6 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;

View File

@@ -192,7 +192,7 @@ fn acquire_lsn_lease_grpc(
lsn: Lsn,
) -> Result<Option<SystemTime>> {
tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::connect(
let mut client = page_api::Client::new(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,

View File

@@ -105,14 +105,6 @@ pub(crate) static LFC_PREWARMS: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static LFC_PREWARM_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_prewarm_errors_total",
"Total number of LFC prewarm errors",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offloads_total",
@@ -121,14 +113,6 @@ pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOAD_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offload_errors_total",
"Total number of LFC offload errors",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -139,8 +123,6 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics.extend(LFC_PREWARMS.collect());
metrics.extend(LFC_PREWARM_ERRORS.collect());
metrics.extend(LFC_OFFLOADS.collect());
metrics.extend(LFC_OFFLOAD_ERRORS.collect());
metrics
}

View File

@@ -1,16 +1,3 @@
-- On December 8th, 2023, an engineering escalation (INC-110) was opened after
-- it was found that BYPASSRLS was being applied to all roles.
--
-- PR that introduced the issue: https://github.com/neondatabase/neon/pull/5657
-- Subsequent commit on main: https://github.com/neondatabase/neon/commit/ad99fa5f0393e2679e5323df653c508ffa0ac072
--
-- NOBYPASSRLS and INHERIT are the defaults for a Postgres role, but because it
-- isn't easy to know if a Postgres cluster is affected by the issue, we need to
-- keep the migration around for a long time, if not indefinitely, so any
-- cluster can be fixed.
--
-- Branching is the gift that keeps on giving...
DO $$
DECLARE
role_name text;

View File

@@ -1 +0,0 @@
GRANT pg_signal_backend TO neon_superuser WITH ADMIN OPTION;

View File

@@ -7,17 +7,13 @@ BEGIN
INTO monitor
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'neon_superuser'::regrole;
AND member = 'pg_monitor'::regrole;
IF monitor IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor';
END IF;
IF monitor.admin IS NULL OR NOT monitor.member THEN
IF NOT monitor.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
END IF;
IF monitor.admin IS NULL OR NOT monitor.admin THEN
IF NOT monitor.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
END IF;
END $$;

View File

@@ -1,23 +0,0 @@
DO $$
DECLARE
signal_backend record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
admin_option AS admin
INTO signal_backend
FROM pg_auth_members
WHERE roleid = 'pg_signal_backend'::regrole
AND member = 'neon_superuser'::regrole;
IF signal_backend IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_signal_backend';
END IF;
IF signal_backend.member IS NULL OR NOT signal_backend.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_signal_backend';
END IF;
IF signal_backend.admin IS NULL OR NOT signal_backend.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_signal_backend';
END IF;
END $$;

View File

@@ -84,8 +84,7 @@ impl ComputeMonitor {
if matches!(
compute_status,
ComputeStatus::Terminated
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Failed
) {
info!(

View File

@@ -197,7 +197,6 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> {
include_str!(
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
),
include_str!("./migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql"),
];
MigrationRunner::new(client, &migrations)

View File

@@ -36,7 +36,7 @@ impl StorageBroker {
pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
let broker = &self.env.broker;
println!("Starting neon broker at {}", broker.client_url());
print!("Starting neon broker at {}", broker.client_url());
let mut args = Vec::new();

View File

@@ -922,8 +922,7 @@ impl Endpoint {
ComputeStatus::Empty
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::TerminationPendingFast
| ComputeStatus::TerminationPendingImmediate
| ComputeStatus::TerminationPending { .. }
| ComputeStatus::Terminated => {
bail!("unexpected compute status: {:?}", state.status)
}

View File

@@ -303,7 +303,7 @@ impl PageServerNode {
async fn start_node(&self, retry_timeout: &Duration) -> anyhow::Result<()> {
// TODO: using a thread here because start_process() is not async but we need to call check_status()
let datadir = self.repo_path();
println!(
print!(
"Starting pageserver node {} at '{}' in {:?}, retrying for {:?}",
self.conf.id,
self.pg_connection_config.raw_address(),
@@ -452,12 +452,6 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
// HADRON
image_layer_force_creation_period: settings
.remove("image_layer_force_creation_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'image_layer_force_creation_period' as duration")?,
image_layer_creation_check_threshold: settings
.remove("image_layer_creation_check_threshold")
.map(|x| x.parse::<u8>())

View File

@@ -127,7 +127,7 @@ impl SafekeeperNode {
extra_opts: &[String],
retry_timeout: &Duration,
) -> anyhow::Result<()> {
println!(
print!(
"Starting safekeeper at '{}' in '{}', retrying for {:?}",
self.pg_connection_config.raw_address(),
self.datadir_path().display(),

View File

@@ -660,7 +660,7 @@ impl StorageController {
));
}
println!("Starting storage controller at {scheme}://{host}:{listen_port}");
println!("Starting storage controller");
background_process::start_process(
COMMAND,

View File

@@ -14,7 +14,6 @@ humantime.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
reqwest.workspace = true
safekeeper_api.workspace=true
serde_json = { workspace = true, features = ["raw_value"] }
storage_controller_client.workspace = true
tokio.workspace = true

View File

@@ -11,7 +11,7 @@ use pageserver_api::controller_api::{
PlacementPolicy, SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest,
ShardSchedulingPolicy, ShardsPreferredAzsRequest, ShardsPreferredAzsResponse,
SkSchedulingPolicy, TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse, TimelineSafekeeperMigrateRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, ShardParameters, TenantConfig,
@@ -21,7 +21,6 @@ use pageserver_api::models::{
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Certificate, Method, StatusCode, Url};
use safekeeper_api::models::TimelineLocateResponse;
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
@@ -76,6 +75,12 @@ enum Command {
NodeStartDelete {
#[arg(long)]
node_id: NodeId,
/// When `force` is true, skip waiting for shards to prewarm during migration.
/// This can significantly speed up node deletion since prewarming all shards
/// can take considerable time, but may result in slower initial access to
/// migrated shards until they warm up naturally.
#[arg(long)]
force: bool,
},
/// Cancel deletion of the specified pageserver and wait for `timeout`
/// for the operation to be canceled. May be retried.
@@ -280,23 +285,6 @@ enum Command {
#[arg(long)]
concurrency: Option<usize>,
},
/// Locate safekeepers for a timeline from the storcon DB.
TimelineLocate {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
},
/// Migrate a timeline to a new set of safekeepers
TimelineSafekeeperMigrate {
#[arg(long)]
tenant_id: TenantId,
#[arg(long)]
timeline_id: TimelineId,
/// Example: --new-sk-set 1,2,3
#[arg(long, required = true, value_delimiter = ',')]
new_sk_set: Vec<NodeId>,
},
}
#[derive(Parser)]
@@ -951,13 +939,14 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
.await?;
}
Command::NodeStartDelete { node_id } => {
Command::NodeStartDelete { node_id, force } => {
let query = if force {
format!("control/v1/node/{node_id}/delete?force=true")
} else {
format!("control/v1/node/{node_id}/delete")
};
storcon_client
.dispatch::<(), ()>(
Method::PUT,
format!("control/v1/node/{node_id}/delete"),
None,
)
.dispatch::<(), ()>(Method::PUT, query, None)
.await?;
println!("Delete started for {node_id}");
}
@@ -1342,7 +1331,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let mut path = format!(
"v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
"/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/download_heatmap_layers",
);
if let Some(c) = concurrency {
@@ -1353,41 +1342,6 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::POST, path, None)
.await?;
}
Command::TimelineLocate {
tenant_id,
timeline_id,
} => {
let path = format!("debug/v1/tenant/{tenant_id}/timeline/{timeline_id}/locate");
let resp = storcon_client
.dispatch::<(), TimelineLocateResponse>(Method::GET, path, None)
.await?;
let sk_set = resp.sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
let new_sk_set = resp
.new_sk_set
.as_ref()
.map(|ids| ids.iter().map(|id| id.0 as i64).collect::<Vec<_>>());
println!("generation = {}", resp.generation);
println!("sk_set = {sk_set:?}");
println!("new_sk_set = {new_sk_set:?}");
}
Command::TimelineSafekeeperMigrate {
tenant_id,
timeline_id,
new_sk_set,
} => {
let path = format!("v1/tenant/{tenant_id}/timeline/{timeline_id}/safekeeper_migrate");
storcon_client
.dispatch::<_, ()>(
Method::POST,
path,
Some(TimelineSafekeeperMigrateRequest { new_sk_set }),
)
.await?;
}
}
Ok(())

View File

@@ -54,16 +54,14 @@ else
printf '%s\n' "${result}" | jq .
fi
if [[ "${RUN_PARALLEL:-false}" != "true" ]]; then
echo "Check if a timeline present"
PARAMS=(
-X GET
-H "Content-Type: application/json"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline"
)
timeline_id=$(curl "${PARAMS[@]}" | jq -r .[0].timeline_id)
fi
if [[ -z "${timeline_id:-}" || "${timeline_id:-}" = null ]]; then
echo "Check if a timeline present"
PARAMS=(
-X GET
-H "Content-Type: application/json"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline"
)
timeline_id=$(curl "${PARAMS[@]}" | jq -r .[0].timeline_id)
if [[ -z "${timeline_id}" || "${timeline_id}" = null ]]; then
generate_id timeline_id
PARAMS=(
-sbf

View File

@@ -142,7 +142,7 @@ services:
- "storage_broker"
- "--listen-addr=0.0.0.0:50051"
compute1:
compute:
restart: always
build:
context: ./compute_wrapper/
@@ -152,7 +152,6 @@ services:
- TAG=${COMPUTE_TAG:-${TAG:-latest}}
- http_proxy=${http_proxy:-}
- https_proxy=${https_proxy:-}
image: built-compute
environment:
- PG_VERSION=${PG_VERSION:-16}
- TENANT_ID=${TENANT_ID:-}
@@ -167,11 +166,6 @@ services:
- 3080:3080 # http endpoints
entrypoint:
- "/shell/compute.sh"
# Ad an alias for compute1 for compatibility
networks:
default:
aliases:
- compute
depends_on:
- safekeeper1
- safekeeper2
@@ -180,20 +174,15 @@ services:
compute_is_ready:
image: postgres:latest
environment:
- PARALLEL_COMPUTES=1
entrypoint:
- "/bin/sh"
- "/bin/bash"
- "-c"
command:
- "for i in $(seq 1 $${PARALLEL_COMPUTES}); do
until pg_isready -h compute$$i -p 55433 -U cloud_admin ; do
sleep 1;
done;
done;
echo All computes are started"
- "until pg_isready -h compute -p 55433 -U cloud_admin ; do
echo 'Waiting to start compute...' && sleep 1;
done"
depends_on:
- compute1
- compute
neon-test-extensions:
profiles: ["test-extensions"]
@@ -207,4 +196,4 @@ services:
command:
- sleep 3600
depends_on:
- compute1
- compute

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash
# A basic test to ensure Docker images are built correctly.
# Build a wrapper around the compute, start all services and runs a simple SQL query.
@@ -13,36 +13,9 @@
#
set -eux -o pipefail
cd "$(dirname "${0}")"
export COMPOSE_FILE='docker-compose.yml'
export COMPOSE_PROFILES=test-extensions
export PARALLEL_COMPUTES=${PARALLEL_COMPUTES:-1}
READY_MESSAGE="All computes are started"
COMPUTES=()
for i in $(seq 1 "${PARALLEL_COMPUTES}"); do
COMPUTES+=("compute${i}")
done
CURRENT_TMPDIR=$(mktemp -d)
trap 'rm -rf ${CURRENT_TMPDIR} docker-compose-parallel.yml' EXIT
if [[ ${PARALLEL_COMPUTES} -gt 1 ]]; then
export COMPOSE_FILE=docker-compose-parallel.yml
cp docker-compose.yml docker-compose-parallel.yml
# Replace the environment variable PARALLEL_COMPUTES with the actual value
yq eval -i ".services.compute_is_ready.environment |= map(select(. | test(\"^PARALLEL_COMPUTES=\") | not)) + [\"PARALLEL_COMPUTES=${PARALLEL_COMPUTES}\"]" ${COMPOSE_FILE}
for i in $(seq 2 "${PARALLEL_COMPUTES}"); do
# Duplicate compute1 as compute${i} for parallel execution
yq eval -i ".services.compute${i} = .services.compute1" ${COMPOSE_FILE}
# We don't need these sections, so delete them
yq eval -i "(del .services.compute${i}.build) | (del .services.compute${i}.ports) | (del .services.compute${i}.networks)" ${COMPOSE_FILE}
# Let the compute 1 be the only dependence
yq eval -i ".services.compute${i}.depends_on = [\"compute1\"]" ${COMPOSE_FILE}
# Set RUN_PARALLEL=true for compute2. They will generate tenant_id and timeline_id to avoid using the same as other computes
yq eval -i ".services.compute${i}.environment += [\"RUN_PARALLEL=true\"]" ${COMPOSE_FILE}
# Remove TENANT_ID and TIMELINE_ID from the environment variables of the generated computes
# They will create new TENANT_ID and TIMELINE_ID anyway.
yq eval -i ".services.compute${i}.environment |= map(select(. | (test(\"^TENANT_ID=\") or test(\"^TIMELINE_ID=\")) | not))" ${COMPOSE_FILE}
done
fi
cd "$(dirname "${0}")"
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"
function cleanup() {
@@ -54,11 +27,11 @@ function cleanup() {
for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
pg_version=${pg_version/v/}
echo "clean up containers if exist"
echo "clean up containers if exists"
cleanup
PG_TEST_VERSION=$((pg_version < 16 ? 16 : pg_version))
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose build compute1
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose up --quiet-pull -d
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose up --quiet-pull --build -d
echo "wait until the compute is ready. timeout after 60s. "
cnt=0
while sleep 3; do
@@ -68,50 +41,45 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
echo "timeout before the compute is ready."
exit 1
fi
if docker compose logs compute_is_ready | grep -q "${READY_MESSAGE}"; then
if docker compose logs "compute_is_ready" | grep -q "accepting connections"; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
for compute in "${COMPUTES[@]}"; do
docker compose exec "${compute}" /bin/bash -c "psql ${PSQL_OPTION} -c 'SELECT 1'"
done
docker compose exec compute /bin/bash -c "psql ${PSQL_OPTION} -c 'SELECT 1'"
break
fi
done
if [[ ${pg_version} -ge 16 ]]; then
mkdir "${CURRENT_TMPDIR}"/{pg_hint_plan-src,file_fdw,postgis-src}
docker compose cp neon-test-extensions:/ext-src/postgis-src/raster/test "${CURRENT_TMPDIR}/postgis-src/test"
docker compose cp neon-test-extensions:/ext-src/postgis-src/regress/00-regress-install "${CURRENT_TMPDIR}/postgis-src/00-regress-install"
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${CURRENT_TMPDIR}/pg_hint_plan-src/data"
docker compose cp neon-test-extensions:/postgres/contrib/file_fdw/data "${CURRENT_TMPDIR}/file_fdw/data"
for compute in "${COMPUTES[@]}"; do
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config on "${compute}"
docker compose exec "${compute}" touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# Prepare for the PostGIS test
docker compose exec "${compute}" mkdir -p /tmp/pgis_reg/pgis_reg_tmp /ext-src/postgis-src/raster /ext-src/postgis-src/regress /ext-src/postgis-src/regress/00-regress-install
docker compose cp "${CURRENT_TMPDIR}/postgis-src/test" "${compute}":/ext-src/postgis-src/raster/test
docker compose cp "${CURRENT_TMPDIR}/postgis-src/00-regress-install" "${compute}":/ext-src/postgis-src/regress
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
docker compose cp "${CURRENT_TMPDIR}/pg_hint_plan-src/data" "${compute}":/ext-src/pg_hint_plan-src/
# The following block does the same for the contrib/file_fdw test
docker compose cp "${CURRENT_TMPDIR}/file_fdw/data" "${compute}":/postgres/contrib/file_fdw/data
done
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config
docker compose exec compute touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# Prepare for the PostGIS test
docker compose exec compute mkdir -p /tmp/pgis_reg/pgis_reg_tmp
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/postgis-src/raster/test "${TMPDIR}"
docker compose cp neon-test-extensions:/ext-src/postgis-src/regress/00-regress-install "${TMPDIR}"
docker compose exec compute mkdir -p /ext-src/postgis-src/raster /ext-src/postgis-src/regress /ext-src/postgis-src/regress/00-regress-install
docker compose cp "${TMPDIR}/test" compute:/ext-src/postgis-src/raster/test
docker compose cp "${TMPDIR}/00-regress-install" compute:/ext-src/postgis-src/regress
rm -rf "${TMPDIR}"
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${TMPDIR}/data"
docker compose cp "${TMPDIR}/data" compute:/ext-src/pg_hint_plan-src/
rm -rf "${TMPDIR}"
# The following block does the same for the contrib/file_fdw test
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/postgres/contrib/file_fdw/data "${TMPDIR}/data"
docker compose cp "${TMPDIR}/data" compute:/postgres/contrib/file_fdw/data
rm -rf "${TMPDIR}"
# Apply patches
docker compose exec -T neon-test-extensions bash -c "(cd /postgres && patch -p1)" <"../compute/patches/contrib_pg${pg_version}.patch"
# We are running tests now
rm -f testout.txt testout_contrib.txt
# We want to run the longest tests first to better utilize parallelization and reduce overall test time.
# Tests listed in the RUN_FIRST variable will be run before others.
# If parallelization is not used, this environment variable will be ignored.
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
-e RUN_FIRST=hll-src,postgis-src,pgtap-src -e PARALLEL_COMPUTES="${PARALLEL_COMPUTES}" \
neon-test-extensions /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker compose exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
-e PARALLEL_COMPUTES="${PARALLEL_COMPUTES}" \
neon-test-extensions /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
if [[ ${EXT_SUCCESS} -eq 0 || ${CONTRIB_SUCCESS} -eq 0 ]]; then
CONTRIB_FAILED=

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash
set -x
if [[ -v BENCHMARK_CONNSTR ]]; then
@@ -26,9 +26,8 @@ if [[ -v BENCHMARK_CONNSTR ]]; then
fi
fi
REGULAR_USER=false
PARALLEL_COMPUTES=${PARALLEL_COMPUTES:-1}
while getopts pr arg; do
case ${arg} in
while getopts r arg; do
case $arg in
r)
REGULAR_USER=true
shift $((OPTIND-1))
@@ -42,49 +41,26 @@ extdir=${1}
cd "${extdir}" || exit 2
FAILED=
export FAILED_FILE=/tmp/failed
rm -f ${FAILED_FILE}
mapfile -t LIST < <( (echo -e "${SKIP//","/"\n"}"; ls) | sort | uniq -u)
if [[ ${PARALLEL_COMPUTES} -gt 1 ]]; then
# Avoid errors if RUN_FIRST is not defined
RUN_FIRST=${RUN_FIRST:-}
# Move entries listed in the RUN_FIRST variable to the beginning
ORDERED_LIST=$(printf "%s\n" "${LIST[@]}" | grep -x -Ff <(echo -e "${RUN_FIRST//,/$'\n'}"); printf "%s\n" "${LIST[@]}" | grep -vx -Ff <(echo -e "${RUN_FIRST//,/$'\n'}"))
parallel -j"${PARALLEL_COMPUTES}" "[[ -d {} ]] || exit 0
export PGHOST=compute{%}
if ! psql -c 'select 1'>/dev/null; then
exit 1
fi
echo Running on \${PGHOST}
if [[ -f ${extdir}/{}/neon-test.sh ]]; then
echo Running from script
${extdir}/{}/neon-test.sh || echo {} >> ${FAILED_FILE};
else
echo Running using make;
USE_PGXS=1 make -C {} installcheck || echo {} >> ${FAILED_FILE};
fi" ::: ${ORDERED_LIST}
[[ ! -f ${FAILED_FILE} ]] && exit 0
else
for d in "${LIST[@]}"; do
[ -d "${d}" ] || continue
if ! psql -w -c "select 1" >/dev/null; then
FAILED="${d} ${FAILED}"
break
fi
if [[ ${REGULAR_USER} = true ]] && [ -f "${d}"/regular-test.sh ]; then
"${d}/regular-test.sh" || FAILED="${d} ${FAILED}"
continue
fi
LIST=$( (echo -e "${SKIP//","/"\n"}"; ls) | sort | uniq -u)
for d in ${LIST}; do
[ -d "${d}" ] || continue
if ! psql -w -c "select 1" >/dev/null; then
FAILED="${d} ${FAILED}"
break
fi
if [[ ${REGULAR_USER} = true ]] && [ -f "${d}"/regular-test.sh ]; then
"${d}/regular-test.sh" || FAILED="${d} ${FAILED}"
continue
fi
if [ -f "${d}/neon-test.sh" ]; then
"${d}/neon-test.sh" || FAILED="${d} ${FAILED}"
else
USE_PGXS=1 make -C "${d}" installcheck || FAILED="${d} ${FAILED}"
fi
done
[[ -z ${FAILED} ]] && exit 0
fi
for d in ${FAILED} $([[ ! -f ${FAILED_FILE} ]] || cat ${FAILED_FILE}); do
if [ -f "${d}/neon-test.sh" ]; then
"${d}/neon-test.sh" || FAILED="${d} ${FAILED}"
else
USE_PGXS=1 make -C "${d}" installcheck || FAILED="${d} ${FAILED}"
fi
done
[ -z "${FAILED}" ] && exit 0
for d in ${FAILED}; do
cat "$(find $d -name regression.diffs)"
done
for postgis_diff in /tmp/pgis_reg/*_diff; do
@@ -92,5 +68,4 @@ for postgis_diff in /tmp/pgis_reg/*_diff; do
cat "${postgis_diff}"
done
echo "${FAILED}"
cat ${FAILED_FILE}
exit 1

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash
set -eux -o pipefail
cd "$(dirname "${0}")"
# Takes a variable name as argument. The result is stored in that variable.
@@ -60,8 +60,8 @@ function check_timeline() {
# Restarts the compute node with the required compute tag and timeline.
# Accepts the tag for the compute node and the timeline as parameters.
function restart_compute() {
docker compose down compute1 compute_is_ready
COMPUTE_TAG=${1} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute1 compute_is_ready
docker compose down compute compute_is_ready
COMPUTE_TAG=${1} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
check_timeline ${2}
}

View File

@@ -13,8 +13,6 @@ use utils::backoff::retry;
pub fn app(state: Arc<Storage>) -> Router<()> {
use axum::routing::{delete as _delete, get as _get};
let delete_prefix = _delete(delete_prefix);
// NB: On any changes do not forget to update the OpenAPI spec
// in /endpoint_storage/src/openapi_spec.yml.
Router::new()
.route(
"/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",

View File

@@ -1,146 +0,0 @@
openapi: "3.0.2"
info:
title: Endpoint Storage API
description: Endpoint Storage API
version: "1.0"
license:
name: "Apache"
url: https://github.com/neondatabase/neon/blob/main/LICENSE
servers:
- url: ""
paths:
/status:
description: Healthcheck endpoint
get:
description: Healthcheck
security: []
responses:
"200":
description: OK
/{tenant_id}/{timeline_id}/{endpoint_id}/{key}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
- name: endpoint_id
in: path
required: true
schema:
type: string
- name: key
in: path
required: true
schema:
type: string
get:
description: Get file from blob storage
responses:
"200":
description: "File stream from blob storage"
content:
application/octet-stream:
schema:
type: string
format: binary
"400":
description: File was not found
"403":
description: JWT does not authorize request to this route
put:
description: Insert file into blob storage. If file exists, override it
requestBody:
content:
application/octet-stream:
schema:
type: string
format: binary
responses:
"200":
description: File was inserted successfully
"403":
description: JWT does not authorize request to this route
delete:
description: Delete file from blob storage
responses:
"200":
description: File was successfully deleted or not found
"403":
description: JWT does not authorize request to this route
/{tenant_id}/{timeline_id}/{endpoint_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
- name: endpoint_id
in: path
required: true
schema:
type: string
delete:
description: Delete endpoint data from blob storage
responses:
"200":
description: Endpoint data was deleted
"403":
description: JWT does not authorize request to this route
/{tenant_id}/{timeline_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
delete:
description: Delete timeline data from blob storage
responses:
"200":
description: Timeline data was deleted
"403":
description: JWT does not authorize request to this route
/{tenant_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
delete:
description: Delete tenant data from blob storage
responses:
"200":
description: Tenant data was deleted
"403":
description: JWT does not authorize request to this route
components:
securitySchemes:
JWT:
type: http
scheme: bearer
bearerFormat: JWT
security:
- JWT: []

View File

@@ -46,7 +46,7 @@ pub struct ExtensionInstallResponse {
pub version: ExtVersion,
}
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcPrewarmState {
#[default]
@@ -58,17 +58,6 @@ pub enum LfcPrewarmState {
},
}
impl Display for LfcPrewarmState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
LfcPrewarmState::Completed => f.write_str("Completed"),
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
}
}
}
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcOffloadState {
@@ -81,23 +70,6 @@ pub enum LfcOffloadState {
},
}
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "status", rename_all = "snake_case")]
/// Response of /promote
pub enum PromoteState {
NotPromoted,
Completed,
Failed { error: String },
}
#[derive(Deserialize, Serialize, Default, Debug, Clone)]
#[serde(rename_all = "snake_case")]
/// Result of /safekeepers_lsn
pub struct SafekeepersLsn {
pub safekeepers: String,
pub wal_flush_lsn: utils::lsn::Lsn,
}
/// Response of the /status API
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -121,15 +93,6 @@ pub enum TerminateMode {
Immediate,
}
impl From<TerminateMode> for ComputeStatus {
fn from(mode: TerminateMode) -> Self {
match mode {
TerminateMode::Fast => ComputeStatus::TerminationPendingFast,
TerminateMode::Immediate => ComputeStatus::TerminationPendingImmediate,
}
}
}
#[derive(Serialize, Clone, Copy, Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
@@ -150,9 +113,7 @@ pub enum ComputeStatus {
// control-plane to terminate it.
Failed,
// Termination requested
TerminationPendingFast,
// Termination requested, without waiting 30s before returning from /terminate
TerminationPendingImmediate,
TerminationPending { mode: TerminateMode },
// Terminated Postgres
Terminated,
}
@@ -171,10 +132,7 @@ impl Display for ComputeStatus {
ComputeStatus::Running => f.write_str("running"),
ComputeStatus::Configuration => f.write_str("configuration"),
ComputeStatus::Failed => f.write_str("failed"),
ComputeStatus::TerminationPendingFast => f.write_str("termination-pending-fast"),
ComputeStatus::TerminationPendingImmediate => {
f.write_str("termination-pending-immediate")
}
ComputeStatus::TerminationPending { .. } => f.write_str("termination-pending"),
ComputeStatus::Terminated => f.write_str("terminated"),
}
}

View File

@@ -442,7 +442,7 @@ pub struct JwksSettings {
}
/// Protocol used to connect to a Pageserver. Parsed from the connstring scheme.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, Default)]
pub enum PageserverProtocol {
/// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme.
#[default]

View File

@@ -20,7 +20,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{Instrument, debug, info, info_span, warn};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use utils::metrics_collector::{METRICS_COLLECTOR, METRICS_STALE_MILLIS};
use crate::error::{ApiError, api_error_handler, route_error_handler};
use crate::request::{get_query_param, parse_query_param};
@@ -251,28 +250,9 @@ impl std::io::Write for ChannelWriter {
}
}
pub async fn prometheus_metrics_handler(
req: Request<Body>,
force_metric_collection_on_scrape: bool,
) -> Result<Response<Body>, ApiError> {
pub async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
SERVE_METRICS_COUNT.inc();
// HADRON
let requested_use_latest = parse_query_param(&req, "use_latest")?;
let use_latest = match requested_use_latest {
None => force_metric_collection_on_scrape,
Some(true) => true,
Some(false) => {
if force_metric_collection_on_scrape {
// We don't cache in this case
true
} else {
false
}
}
};
let started_at = std::time::Instant::now();
let (tx, rx) = mpsc::channel(1);
@@ -297,18 +277,12 @@ pub async fn prometheus_metrics_handler(
let _span = span.entered();
// HADRON
let collected = if use_latest {
// Skip caching the results if we always force metric collection on scrape.
METRICS_COLLECTOR.run_once(!force_metric_collection_on_scrape)
} else {
METRICS_COLLECTOR.last_collected()
};
let metrics = metrics::gather();
let gathered_at = std::time::Instant::now();
let res = encoder
.encode(&collected.metrics, &mut writer)
.encode(&metrics, &mut writer)
.and_then(|_| writer.flush().map_err(|e| e.into()));
// this instant is not when we finally got the full response sent, sending is done by hyper
@@ -321,10 +295,6 @@ pub async fn prometheus_metrics_handler(
let encoded_in = encoded_at - gathered_at - writer.wait_time();
let total = encoded_at - started_at;
// HADRON
let staleness_ms = (encoded_at - collected.collected_at).as_millis();
METRICS_STALE_MILLIS.set(staleness_ms as i64);
match res {
Ok(()) => {
tracing::info!(
@@ -333,7 +303,6 @@ pub async fn prometheus_metrics_handler(
spawning_ms = spawned_in.as_millis(),
collection_ms = collected_in.as_millis(),
encoding_ms = encoded_in.as_millis(),
stalenss_ms = staleness_ms,
"responded /metrics"
);
}

View File

@@ -41,35 +41,17 @@ pub fn get_query_param<'a>(
Some(q) => q,
None => return Ok(None),
};
let values = url::form_urlencoded::parse(query.as_bytes())
let mut values = url::form_urlencoded::parse(query.as_bytes())
.filter_map(|(k, v)| if k == param_name { Some(v) } else { None })
// we call .next() twice below. If it's None the first time, .fuse() ensures it's None afterwards
.fuse();
// Work around an issue with Alloy's pyroscope scrape where the "seconds"
// parameter is added several times. https://github.com/grafana/alloy/issues/3026
// TODO: revert after Alloy is fixed.
let value1 = values
.map(Ok)
.reduce(|acc, i| {
match acc {
Err(_) => acc,
// It's okay to have duplicates as along as they have the same value.
Ok(ref a) if a == &i.unwrap() => acc,
_ => Err(ApiError::BadRequest(anyhow!(
"param {param_name} specified more than once"
))),
}
})
.transpose()?;
// if values.next().is_some() {
// return Err(ApiError::BadRequest(anyhow!(
// "param {param_name} specified more than once"
// )));
// }
let value1 = values.next();
if values.next().is_some() {
return Err(ApiError::BadRequest(anyhow!(
"param {param_name} specified more than once"
)));
}
Ok(value1)
}
@@ -110,39 +92,3 @@ pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError>
None => Ok(()),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_query_param_duplicate() {
let req = Request::builder()
.uri("http://localhost:12345/testuri?testparam=1")
.body(hyper::Body::empty())
.unwrap();
let value = get_query_param(&req, "testparam").unwrap();
assert_eq!(value.unwrap(), "1");
let req = Request::builder()
.uri("http://localhost:12345/testuri?testparam=1&testparam=1")
.body(hyper::Body::empty())
.unwrap();
let value = get_query_param(&req, "testparam").unwrap();
assert_eq!(value.unwrap(), "1");
let req = Request::builder()
.uri("http://localhost:12345/testuri")
.body(hyper::Body::empty())
.unwrap();
let value = get_query_param(&req, "testparam").unwrap();
assert!(value.is_none());
let req = Request::builder()
.uri("http://localhost:12345/testuri?testparam=1&testparam=2&testparam=3")
.body(hyper::Body::empty())
.unwrap();
let value = get_query_param(&req, "testparam");
assert!(value.is_err());
}
}

View File

@@ -5,7 +5,6 @@ mod tests;
use const_format::formatcp;
use posthog_client_lite::PostHogClientConfig;
use utils::serde_percent::Percent;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
@@ -224,9 +223,8 @@ pub struct ConfigToml {
pub metric_collection_bucket: Option<RemoteStorageConfig>,
#[serde(with = "humantime_serde")]
pub synthetic_size_calculation_interval: Duration,
pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
pub test_remote_failures: u64,
pub test_remote_failures_probability: u64,
pub ondemand_download_behavior_treat_error_as_warn: bool,
#[serde(with = "humantime_serde")]
pub background_task_maximum_delay: Duration,
@@ -272,13 +270,9 @@ pub struct ConfigToml {
pub timeline_import_config: TimelineImportConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_layer_generation_large_timeline_threshold: Option<u64>,
pub force_metric_collection_on_scrape: bool,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct DiskUsageEvictionTaskConfig {
pub max_usage_pct: utils::serde_percent::Percent,
pub min_avail_bytes: u64,
@@ -289,21 +283,6 @@ pub struct DiskUsageEvictionTaskConfig {
/// Select sorting for evicted layers
#[serde(default)]
pub eviction_order: EvictionOrder,
pub enabled: bool,
}
impl Default for DiskUsageEvictionTaskConfig {
fn default() -> Self {
Self {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 2_000_000_000,
period: Duration::from_secs(60),
#[cfg(feature = "testing")]
mock_statvfs: None,
eviction_order: EvictionOrder::default(),
enabled: true,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -564,11 +543,6 @@ pub struct TenantConfigToml {
pub gc_period: Duration,
// Delta layer churn threshold to create L1 image layers.
pub image_creation_threshold: usize,
// HADRON
// When the timeout is reached, PageServer will (1) force compact any remaining L0 deltas and
// (2) create image layers if there are any L1 deltas.
#[serde(with = "humantime_serde")]
pub image_layer_force_creation_period: Option<Duration>,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is time.
@@ -764,10 +738,9 @@ impl Default for ConfigToml {
metric_collection_bucket: (None),
disk_usage_based_eviction: DiskUsageEvictionTaskConfig::default(),
disk_usage_based_eviction: (None),
test_remote_failures: (0),
test_remote_failures_probability: (100),
ondemand_download_behavior_treat_error_as_warn: (false),
@@ -831,8 +804,6 @@ impl Default for ConfigToml {
},
basebackup_cache_config: None,
posthog_config: None,
image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024),
force_metric_collection_on_scrape: true,
}
}
}
@@ -926,7 +897,6 @@ impl Default for TenantConfigToml {
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
image_layer_force_creation_period: None,
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"),
walreceiver_connect_timeout: humantime::parse_duration(

View File

@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig, TimelineInfo};
use crate::models::{PageserverUtilization, ShardParameters, TenantConfig};
use crate::shard::{ShardStripeSize, TenantShardId};
#[derive(Serialize, Deserialize, Debug)]
@@ -126,13 +126,6 @@ pub struct TenantDescribeResponse {
pub config: TenantConfig,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantTimelineDescribeResponse {
pub shards: Vec<TimelineInfo>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_consistent_lsn: Option<Lsn>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NodeShardResponse {
pub node_id: NodeId,

View File

@@ -384,7 +384,7 @@ pub struct SafekeepersInfo {
pub safekeepers: Vec<SafekeeperInfo>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperInfo {
pub id: NodeId,
pub hostname: String,
@@ -597,9 +597,6 @@ pub struct TenantConfigPatch {
pub gc_period: FieldPatch<String>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub image_creation_threshold: FieldPatch<usize>,
// HADRON
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub image_layer_force_creation_period: FieldPatch<String>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub pitr_interval: FieldPatch<String>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
@@ -703,11 +700,6 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub image_creation_threshold: Option<usize>,
// HADRON
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub image_layer_force_creation_period: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub pitr_interval: Option<Duration>,
@@ -806,7 +798,6 @@ impl TenantConfig {
mut gc_horizon,
mut gc_period,
mut image_creation_threshold,
mut image_layer_force_creation_period,
mut pitr_interval,
mut walreceiver_connect_timeout,
mut lagging_wal_timeout,
@@ -870,11 +861,6 @@ impl TenantConfig {
patch
.image_creation_threshold
.apply(&mut image_creation_threshold);
// HADRON
patch
.image_layer_force_creation_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut image_layer_force_creation_period);
patch
.pitr_interval
.map(|v| humantime::parse_duration(&v))?
@@ -956,7 +942,6 @@ impl TenantConfig {
gc_horizon,
gc_period,
image_creation_threshold,
image_layer_force_creation_period,
pitr_interval,
walreceiver_connect_timeout,
lagging_wal_timeout,
@@ -1031,9 +1016,6 @@ impl TenantConfig {
image_creation_threshold: self
.image_creation_threshold
.unwrap_or(global_conf.image_creation_threshold),
image_layer_force_creation_period: self
.image_layer_force_creation_period
.or(global_conf.image_layer_force_creation_period),
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
walreceiver_connect_timeout: self
.walreceiver_connect_timeout
@@ -1622,9 +1604,6 @@ pub struct TimelineInfo {
/// Whether the timeline is invisible in synthetic size calculations.
pub is_invisible: Option<bool>,
// HADRON: the largest LSN below which all page updates have been included in the image layers.
#[serde(skip_serializing_if = "Option::is_none")]
pub image_consistent_lsn: Option<Lsn>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@@ -332,11 +332,7 @@ fn hash_combine(mut a: u32, mut b: u32) -> u32 {
///
/// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
/// and will be handled at higher levels when shards are split.
pub fn key_to_shard_number(
count: ShardCount,
stripe_size: ShardStripeSize,
key: &Key,
) -> ShardNumber {
fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
// Fast path for un-sharded tenants or broadcast keys
if count < ShardCount(2) || key_is_shard0(key) {
return ShardNumber(0);

View File

@@ -13,7 +13,6 @@ aws-smithy-async.workspace = true
aws-smithy-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
base64.workspace = true
bytes.workspace = true
camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
@@ -42,9 +41,6 @@ http-body-util.workspace = true
itertools.workspace = true
sync_wrapper = { workspace = true, features = ["futures"] }
byteorder = "1.4"
rand = "0.8.5"
[dev-dependencies]
camino-tempfile.workspace = true
test-context.workspace = true

View File

@@ -14,25 +14,17 @@ use anyhow::{Context, Result, anyhow};
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::BlobBlockType;
use azure_storage_blobs::blob::BlockList;
use azure_storage_blobs::blob::operations::GetBlobBuilder;
use azure_storage_blobs::blob::{Blob, CopyStatus};
use azure_storage_blobs::container::operations::ListBlobsBuilder;
use azure_storage_blobs::prelude::ClientBuilder;
use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
use base64::{Engine as _, engine::general_purpose::URL_SAFE};
use byteorder::{BigEndian, ByteOrder};
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
use bytes::Bytes;
use camino::Utf8Path;
use futures::FutureExt;
use futures::future::Either;
use futures::stream::Stream;
use futures_util::{StreamExt, TryStreamExt};
use http_types::{StatusCode, Url};
use scopeguard::ScopeGuard;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use utils::backoff;
@@ -59,9 +51,6 @@ pub struct AzureBlobStorage {
// Alternative timeout used for metadata objects which are expected to be small
pub small_timeout: Duration,
/* BEGIN_HADRON */
pub put_block_size_mb: Option<usize>,
/* END_HADRON */
}
impl AzureBlobStorage {
@@ -118,9 +107,6 @@ impl AzureBlobStorage {
concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
timeout,
small_timeout,
/* BEGIN_HADRON */
put_block_size_mb: azure_config.put_block_size_mb,
/* END_HADRON */
})
}
@@ -597,137 +583,31 @@ impl RemoteStorage for AzureBlobStorage {
let started_at = start_measuring_requests(kind);
let mut metadata_map = metadata.unwrap_or([].into());
let timeline_file_path = metadata_map.0.remove("databricks_azure_put_block");
/* BEGIN_HADRON */
let op = async move {
let op = async {
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
let put_block_size = self.put_block_size_mb.unwrap_or(0) * 1024 * 1024;
if timeline_file_path.is_none() || put_block_size == 0 {
// Use put_block_blob directly.
let from: Pin<
Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>,
> = Box::pin(from);
let from = NonSeekableStream::new(from, data_size_bytes);
let body = azure_core::Body::SeekableStream(Box::new(from));
let mut builder = blob_client.put_block_blob(body);
if !metadata_map.0.is_empty() {
builder = builder.metadata(to_azure_metadata(metadata_map));
}
let fut = builder.into_future();
let fut = tokio::time::timeout(self.timeout, fut);
let result = fut.await;
match result {
Ok(Ok(_response)) => return Ok(()),
Ok(Err(azure)) => return Err(azure.into()),
Err(_timeout) => return Err(TimeoutOrCancel::Timeout.into()),
};
}
// Upload chunks concurrently using Put Block.
// Each PutBlock uploads put_block_size bytes of the file.
let mut upload_futures: Vec<tokio::task::JoinHandle<Result<(), azure_core::Error>>> =
vec![];
let mut block_list = BlockList::default();
let mut start_bytes = 0u64;
let mut remaining_bytes = data_size_bytes;
let mut block_list_count = 0;
let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
Box::pin(from);
while remaining_bytes > 0 {
let block_size = std::cmp::min(remaining_bytes, put_block_size);
let end_bytes = start_bytes + block_size as u64;
let block_id = block_list_count;
let timeout = self.timeout;
let blob_client = blob_client.clone();
let timeline_file = timeline_file_path.clone().unwrap().clone();
let from = NonSeekableStream::new(from, data_size_bytes);
let mut encoded_block_id = [0u8; 8];
BigEndian::write_u64(&mut encoded_block_id, block_id);
URL_SAFE.encode(encoded_block_id);
let body = azure_core::Body::SeekableStream(Box::new(from));
// Put one block.
let part_fut = async move {
let mut file = File::open(Utf8Path::new(&timeline_file.clone())).await?;
file.seek(io::SeekFrom::Start(start_bytes)).await?;
let limited_reader = file.take(block_size as u64);
let file_chunk_stream =
tokio_util::io::ReaderStream::with_capacity(limited_reader, 1024 * 1024);
let file_chunk_stream_pin: Pin<
Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>,
> = Box::pin(file_chunk_stream);
let stream_wrapper = NonSeekableStream::new(file_chunk_stream_pin, block_size);
let body = azure_core::Body::SeekableStream(Box::new(stream_wrapper));
// Azure put block takes URL-encoded block ids and all blocks must have the same byte length.
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id#uri-parameters
let builder = blob_client.put_block(encoded_block_id.to_vec(), body);
let fut = builder.into_future();
let fut = tokio::time::timeout(timeout, fut);
let result = fut.await;
tracing::debug!(
"azure put block id-{} size {} start {} end {} file {} response {:#?}",
block_id,
block_size,
start_bytes,
end_bytes,
timeline_file,
result
);
match result {
Ok(Ok(_response)) => Ok(()),
Ok(Err(azure)) => Err(azure),
Err(_timeout) => Err(azure_core::Error::new(
azure_core::error::ErrorKind::Io,
std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Operation timed out",
),
)),
}
};
upload_futures.push(tokio::spawn(part_fut));
let mut builder = blob_client.put_block_blob(body);
block_list_count += 1;
remaining_bytes -= block_size;
start_bytes += block_size as u64;
block_list
.blocks
.push(BlobBlockType::Uncommitted(encoded_block_id.to_vec().into()));
if let Some(metadata) = metadata {
builder = builder.metadata(to_azure_metadata(metadata));
}
tracing::debug!(
"azure put blocks {} total MB: {} chunk size MB: {}",
block_list_count,
data_size_bytes / 1024 / 1024,
put_block_size / 1024 / 1024
);
// Wait for all blocks to be uploaded.
let upload_results = futures::future::try_join_all(upload_futures).await;
if upload_results.is_err() {
return Err(anyhow::anyhow!(format!(
"Failed to upload all blocks {:#?}",
upload_results.unwrap_err()
)));
}
// Commit the blocks.
let mut builder = blob_client.put_block_list(block_list);
if !metadata_map.0.is_empty() {
builder = builder.metadata(to_azure_metadata(metadata_map));
}
let fut = builder.into_future();
let fut = tokio::time::timeout(self.timeout, fut);
let result = fut.await;
tracing::debug!("azure put block list response {:#?}", result);
match result {
match fut.await {
Ok(Ok(_response)) => Ok(()),
Ok(Err(azure)) => Err(azure.into()),
Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
}
};
/* END_HADRON */
let res = tokio::select! {
res = op => res,
@@ -742,6 +622,7 @@ impl RemoteStorage for AzureBlobStorage {
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, outcome, started_at);
res
}

View File

@@ -195,19 +195,8 @@ pub struct AzureConfig {
pub max_keys_per_list_response: Option<i32>,
#[serde(default = "default_azure_conn_pool_size")]
pub conn_pool_size: usize,
/* BEGIN_HADRON */
#[serde(default = "default_azure_put_block_size_mb")]
pub put_block_size_mb: Option<usize>,
/* END_HADRON */
}
/* BEGIN_HADRON */
fn default_azure_put_block_size_mb() -> Option<usize> {
// Disable parallel upload by default.
Some(0)
}
/* END_HADRON */
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
}
@@ -224,9 +213,6 @@ impl Debug for AzureConfig {
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
/* BEGIN_HADRON */
.field("put_block_size_mb", &self.put_block_size_mb)
/* END_HADRON */
.finish()
}
}
@@ -366,7 +352,6 @@ timeout = '5s'";
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
conn_pool_size = 8
put_block_size_mb = 1024
";
let config = parse(toml).unwrap();
@@ -382,9 +367,6 @@ timeout = '5s'";
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
conn_pool_size: 8,
/* BEGIN_HADRON */
put_block_size_mb: Some(1024),
/* END_HADRON */
}),
timeout: Duration::from_secs(7),
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT

View File

@@ -732,15 +732,9 @@ impl GenericRemoteStorage {
})
}
/* BEGIN_HADRON */
pub fn unreliable_wrapper(s: Self, fail_first: u64, fail_probability: u64) -> Self {
Self::Unreliable(Arc::new(UnreliableWrapper::new(
s,
fail_first,
fail_probability,
)))
pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
}
/* END_HADRON */
/// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
pub async fn upload_storage_object(

View File

@@ -1,8 +1,6 @@
//! This module provides a wrapper around a real RemoteStorage implementation that
//! causes the first N attempts at each upload or download operatio to fail. For
//! testing purposes.
use rand::Rng;
use std::cmp;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::num::NonZeroU32;
@@ -27,12 +25,6 @@ pub struct UnreliableWrapper {
// Tracks how many failed attempts of each operation has been made.
attempts: Mutex<HashMap<RemoteOp, u64>>,
/* BEGIN_HADRON */
// This the probability of failure for each operation, ranged from [0, 100].
// The probability is default to 100, which means that all operations will fail.
attempt_failure_probability: u64,
/* END_HADRON */
}
/// Used to identify retries of different unique operation.
@@ -48,11 +40,7 @@ enum RemoteOp {
}
impl UnreliableWrapper {
pub fn new(
inner: crate::GenericRemoteStorage,
attempts_to_fail: u64,
attempt_failure_probability: u64,
) -> Self {
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
assert!(attempts_to_fail > 0);
let inner = match inner {
GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
@@ -63,11 +51,9 @@ impl UnreliableWrapper {
panic!("Can't wrap unreliable wrapper unreliably")
}
};
let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100);
UnreliableWrapper {
inner,
attempts_to_fail,
attempt_failure_probability: actual_attempt_failure_probability,
attempts: Mutex::new(HashMap::new()),
}
}
@@ -80,7 +66,6 @@ impl UnreliableWrapper {
///
fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
let mut attempts = self.attempts.lock().unwrap();
let mut rng = rand::thread_rng();
match attempts.entry(op) {
Entry::Occupied(mut e) => {
@@ -90,19 +75,15 @@ impl UnreliableWrapper {
*p
};
/* BEGIN_HADRON */
// If there are more attempts to fail, fail the request by probability.
if (attempts_before_this < self.attempts_to_fail)
&& (rng.gen_range(0..=100) < self.attempt_failure_probability)
{
if attempts_before_this >= self.attempts_to_fail {
// let it succeed
e.remove();
Ok(attempts_before_this)
} else {
let error =
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
Err(error)
} else {
e.remove();
Ok(attempts_before_this)
}
/* END_HADRON */
}
Entry::Vacant(e) => {
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());

View File

@@ -165,42 +165,10 @@ pub(crate) async fn upload_remote_data(
let (data, data_len) =
upload_stream(format!("remote blob data {i}").into_bytes().into());
/* BEGIN_HADRON */
let mut metadata = None;
if matches!(&*task_client, GenericRemoteStorage::AzureBlob(_)) {
let file_path = "/tmp/dbx_upload_tmp_file.txt";
{
// Open the file in append mode
let mut file = std::fs::OpenOptions::new()
.append(true)
.create(true) // Create the file if it doesn't exist
.open(file_path)?;
// Append some bytes to the file
std::io::Write::write_all(
&mut file,
&format!("remote blob data {i}").into_bytes(),
)?;
file.sync_all()?;
}
metadata = Some(remote_storage::StorageMetadata::from([(
"databricks_azure_put_block",
file_path,
)]));
}
/* END_HADRON */
task_client
.upload(data, data_len, &blob_path, metadata, &cancel)
.upload(data, data_len, &blob_path, None, &cancel)
.await?;
// TODO: Check upload is using the put_block upload.
// We cannot consume data here since data is moved inside the upload.
// let total_bytes = data.fold(0, |acc, chunk| async move {
// acc + chunk.map(|bytes| bytes.len()).unwrap_or(0)
// }).await;
// assert_eq!(total_bytes, data_len);
Ok::<_, anyhow::Error>((blob_prefix, blob_path))
});
}

View File

@@ -219,9 +219,6 @@ async fn create_azure_client(
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response,
conn_pool_size: 8,
/* BEGIN_HADRON */
put_block_size_mb: Some(1),
/* END_HADRON */
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,

View File

@@ -11,7 +11,7 @@ use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use crate::membership::{Configuration, SafekeeperGeneration};
use crate::membership::Configuration;
use crate::{ServerInfo, Term};
#[derive(Debug, Serialize, Deserialize)]
@@ -311,12 +311,3 @@ pub struct PullTimelineResponse {
pub safekeeper_host: Option<String>,
// TODO: add more fields?
}
/// Response to a timeline locate request.
/// Storcon-only API.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TimelineLocateResponse {
pub generation: SafekeeperGeneration,
pub sk_set: Vec<NodeId>,
pub new_sk_set: Option<Vec<NodeId>>,
}

View File

@@ -44,62 +44,3 @@ where
}
}
}
/* BEGIN_HADRON */
pub enum DeploymentMode {
Dev,
Staging,
Prod,
}
pub fn get_deployment_mode() -> Option<DeploymentMode> {
match std::env::var("DEPLOYMENT_MODE") {
Ok(env) => match env.as_str() {
"development" => Some(DeploymentMode::Dev),
"staging" => Some(DeploymentMode::Staging),
"production" => Some(DeploymentMode::Prod),
_ => {
tracing::error!("Unexpected DEPLOYMENT_MODE: {}", env);
None
}
},
Err(_) => {
tracing::error!("DEPLOYMENT_MODE not set");
None
}
}
}
pub fn is_dev_or_staging() -> bool {
matches!(
get_deployment_mode(),
Some(DeploymentMode::Dev) | Some(DeploymentMode::Staging)
)
}
pub enum TestingMode {
Chaos,
Stress,
}
pub fn get_test_mode() -> Option<TestingMode> {
match std::env::var("HADRON_TEST_MODE") {
Ok(env) => match env.as_str() {
"chaos" => Some(TestingMode::Chaos),
"stress" => Some(TestingMode::Stress),
_ => {
tracing::error!("Unexpected HADRON_TEST_MODE: {}", env);
None
}
},
Err(_) => {
tracing::error!("HADRON_TEST_MODE not set");
None
}
}
}
pub fn is_chaos_testing() -> bool {
matches!(get_test_mode(), Some(TestingMode::Chaos))
}
/* END_HADRON */

View File

@@ -99,8 +99,6 @@ pub mod elapsed_accum;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;
pub mod metrics_collector;
// Re-export used in macro. Avoids adding git-version as dep in target crates.
#[doc(hidden)]
pub use git_version;

View File

@@ -1,75 +0,0 @@
use std::{
sync::{Arc, RwLock},
time::{Duration, Instant},
};
use metrics::{IntGauge, proto::MetricFamily, register_int_gauge};
use once_cell::sync::Lazy;
pub static METRICS_STALE_MILLIS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"metrics_metrics_stale_milliseconds",
"The current metrics stale time in milliseconds"
)
.expect("failed to define a metric")
});
#[derive(Debug)]
pub struct CollectedMetrics {
pub metrics: Vec<MetricFamily>,
pub collected_at: Instant,
}
impl CollectedMetrics {
fn new(metrics: Vec<MetricFamily>) -> Self {
Self {
metrics,
collected_at: Instant::now(),
}
}
}
#[derive(Debug)]
pub struct MetricsCollector {
last_collected: RwLock<Arc<CollectedMetrics>>,
}
impl MetricsCollector {
pub fn new() -> Self {
Self {
last_collected: RwLock::new(Arc::new(CollectedMetrics::new(vec![]))),
}
}
#[tracing::instrument(name = "metrics_collector", skip_all)]
pub fn run_once(&self, cache_metrics: bool) -> Arc<CollectedMetrics> {
let started = Instant::now();
let metrics = metrics::gather();
let collected = Arc::new(CollectedMetrics::new(metrics));
if cache_metrics {
let mut guard = self.last_collected.write().unwrap();
*guard = collected.clone();
}
tracing::info!(
"Collected {} metric families in {} ms",
collected.metrics.len(),
started.elapsed().as_millis()
);
collected
}
pub fn last_collected(&self) -> Arc<CollectedMetrics> {
self.last_collected.read().unwrap().clone()
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
// Interval for metrics collection. Currently hard-coded to be the same as the metrics scape interval from the obs agent
pub static METRICS_COLLECTION_INTERVAL: Duration = Duration::from_secs(30);
pub static METRICS_COLLECTOR: Lazy<MetricsCollector> = Lazy::new(MetricsCollector::default);

View File

@@ -171,12 +171,6 @@ impl std::fmt::Display for ShardNumber {
}
}
impl std::fmt::Display for ShardCount {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Display for ShardSlug<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(

View File

@@ -428,12 +428,6 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
shard_number: 0,
};
let empty_wal_rate_limiter = crate::bindings::WalRateLimiter {
should_limit: crate::bindings::pg_atomic_uint32 { value: 0 },
sent_bytes: 0,
last_recorded_time_us: 0,
};
crate::bindings::WalproposerShmemState {
propEpochStartLsn: crate::bindings::pg_atomic_uint64 { value: 0 },
donor_name: [0; 64],
@@ -447,7 +441,6 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
num_shards: 0,
replica_promote: false,
min_ps_feedback: empty_feedback,
wal_rate_limiter: empty_wal_rate_limiter,
}
}

View File

@@ -112,7 +112,6 @@ twox-hash.workspace = true
procfs.workspace = true
[dev-dependencies]
base64.workspace = true
criterion.workspace = true
hex-literal.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }

View File

@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::error::Error as _;
use std::time::Duration;
@@ -251,70 +251,6 @@ impl Client {
Ok(())
}
pub async fn tenant_timeline_compact(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
force_image_layer_creation: bool,
must_force_image_layer_creation: bool,
scheduled: bool,
wait_until_done: bool,
) -> Result<()> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/compact",
self.mgmt_api_endpoint
))
.expect("Cannot build URL");
if force_image_layer_creation {
path.query_pairs_mut()
.append_pair("force_image_layer_creation", "true");
}
if must_force_image_layer_creation {
path.query_pairs_mut()
.append_pair("must_force_image_layer_creation", "true");
}
if scheduled {
path.query_pairs_mut().append_pair("scheduled", "true");
}
if wait_until_done {
path.query_pairs_mut()
.append_pair("wait_until_scheduled_compaction_done", "true");
path.query_pairs_mut()
.append_pair("wait_until_uploaded", "true");
}
self.request(Method::PUT, path, ()).await?;
Ok(())
}
/* BEGIN_HADRON */
pub async fn tenant_timeline_describe(
&self,
tenant_shard_id: &TenantShardId,
timeline_id: &TimelineId,
) -> Result<TimelineInfo> {
let mut path = reqwest::Url::parse(&format!(
"{}/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
))
.expect("Cannot build URL");
path.query_pairs_mut()
.append_pair("include-image-consistent-lsn", "true");
let response: reqwest::Response = self.request(Method::GET, path, ()).await?;
let body = response.json().await.map_err(Error::ReceiveBody)?;
Ok(body)
}
pub async fn list_tenant_visible_size(&self) -> Result<BTreeMap<TenantShardId, u64>> {
let uri = format!("{}/v1/list_tenant_visible_size", self.mgmt_api_endpoint);
let resp = self.get(&uri).await?;
resp.json().await.map_err(Error::ReceiveBody)
}
/* END_HADRON */
pub async fn tenant_scan_remote_storage(
&self,
tenant_id: TenantId,

View File

@@ -1,24 +0,0 @@
[package]
name = "pageserver_client_grpc"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
testing = ["pageserver_api/testing"]
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
bytes.workspace = true
compute_api.workspace = true
futures.workspace = true
pageserver_api.workspace = true
pageserver_page_api.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tonic.workspace = true
tracing.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -1,543 +0,0 @@
use std::collections::HashMap;
use std::num::NonZero;
use std::sync::Arc;
use anyhow::anyhow;
use arc_swap::ArcSwap;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tonic::codec::CompressionEncoding;
use tracing::instrument;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
use crate::split::GetPageSplitter;
use compute_api::spec::PageserverProtocol;
use pageserver_api::shard::ShardStripeSize;
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full.
///
/// TODO: tune all of these constants, and consider making them configurable.
/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
/// with only streams.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
/// Max number of concurrent unary request clients per shard.
const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();
/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage
/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`.
const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();
/// Max number of pipelined requests per stream.
const MAX_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(2).unwrap();
/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these
/// are more throughput-oriented, we have a smaller limit but higher queue depth.
const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus
/// get a larger queue depth.
const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
/// * Automatic retries.
/// * Observability.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
pub struct PageserverClient {
/// The tenant ID.
tenant_id: TenantId,
/// The timeline ID.
timeline_id: TimelineId,
/// The JWT auth token for this tenant, if any.
auth_token: Option<String>,
/// The compression to use, if any.
compression: Option<CompressionEncoding>,
/// The shards for this tenant.
shards: ArcSwap<Shards>,
/// The retry configuration.
retry: Retry,
}
impl PageserverClient {
/// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
/// in the shard spec, which must be complete and must use gRPC URLs.
pub fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_spec: ShardSpec,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
let shards = Shards::new(
tenant_id,
timeline_id,
shard_spec,
auth_token.clone(),
compression,
)?;
Ok(Self {
tenant_id,
timeline_id,
auth_token,
compression,
shards: ArcSwap::new(Arc::new(shards)),
retry: Retry,
})
}
/// Updates the shards from the given shard spec. In-flight requests will complete using the
/// existing shards, but may retry with the new shards if they fail.
///
/// TODO: verify that in-flight requests are allowed to complete, and that the old pools are
/// properly spun down and dropped afterwards.
pub fn update_shards(&self, shard_spec: ShardSpec) -> anyhow::Result<()> {
// Validate the shard spec. We should really use `ArcSwap::rcu` for this, to avoid races
// with concurrent updates, but that involves creating a new `Shards` on every attempt,
// which spins up a bunch of Tokio tasks and such. These should already be checked elsewhere
// in the stack, and if they're violated then we already have problems elsewhere, so a
// best-effort but possibly-racy check is okay here.
let old = self.shards.load_full();
if shard_spec.count < old.count {
return Err(anyhow!(
"can't reduce shard count from {} to {}",
old.count,
shard_spec.count
));
}
if !old.count.is_unsharded() && shard_spec.stripe_size != old.stripe_size {
return Err(anyhow!(
"can't change stripe size from {} to {}",
old.stripe_size,
shard_spec.stripe_size
));
}
let shards = Shards::new(
self.tenant_id,
self.timeline_id,
shard_spec,
self.auth_token.clone(),
self.compression,
)?;
self.shards.store(Arc::new(shards));
Ok(())
}
/// Returns whether a relation exists.
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
pub async fn check_rel_exists(
&self,
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
}
/// Returns the total size of a database, as # of bytes.
#[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
pub async fn get_db_size(
&self,
req: page_api::GetDbSizeRequest,
) -> tonic::Result<page_api::GetDbSizeResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_db_size(req).await
})
.await
}
/// Fetches pages. The `request_id` must be unique across all in-flight requests, and the
/// `attempt` must be 0 (incremented on retry). Automatically splits requests that straddle
/// shard boundaries, and assembles the responses.
///
/// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
/// errors. All responses will have `GetPageStatusCode::Ok`.
#[instrument(skip_all, fields(
req_id = %req.request_id,
class = %req.request_class,
rel = %req.rel,
blkno = %req.block_numbers[0],
blks = %req.block_numbers.len(),
lsn = %req.read_lsn,
))]
pub async fn get_page(
&self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
// Make sure we have at least one page.
if req.block_numbers.is_empty() {
return Err(tonic::Status::invalid_argument("no block number"));
}
// The request attempt must be 0. The client will increment it internally.
if req.request_id.attempt != 0 {
return Err(tonic::Status::invalid_argument("request attempt must be 0"));
}
// The shards may change while we're fetching pages. We execute the request using a stable
// view of the shards (especially important for requests that span shards), but retry the
// top-level (pre-split) request to pick up shard changes. This can lead to unnecessary
// retries and re-splits in some cases where requests span shards, but these are expected to
// be rare.
//
// TODO: the gRPC server and client doesn't yet properly support shard splits. Revisit this
// once we figure out how to handle these.
self.retry
.with(async |attempt| {
let mut req = req.clone();
req.request_id.attempt = attempt as u32;
Self::get_page_with_shards(req, &self.shards.load_full()).await
})
.await
}
/// Fetches pages using the given shards. This uses a stable view of the shards, regardless of
/// concurrent shard updates. Does not retry internally, but is retried by `get_page()`.
async fn get_page_with_shards(
req: page_api::GetPageRequest,
shards: &Shards,
) -> tonic::Result<page_api::GetPageResponse> {
// Fast path: request is for a single shard.
if let Some(shard_id) =
GetPageSplitter::for_single_shard(&req, shards.count, shards.stripe_size)
{
return Self::get_page_with_shard(req, shards.get(shard_id)?).await;
}
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
// reassemble the responses.
let mut splitter = GetPageSplitter::split(req, shards.count, shards.stripe_size);
let mut shard_requests = FuturesUnordered::new();
for (shard_id, shard_req) in splitter.drain_requests() {
let future = Self::get_page_with_shard(shard_req, shards.get(shard_id)?)
.map(move |result| result.map(|resp| (shard_id, resp)));
shard_requests.push(future);
}
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
splitter.add_response(shard_id, shard_response)?;
}
splitter.get_response()
}
/// Fetches pages on the given shard. Does not retry internally.
async fn get_page_with_shard(
req: page_api::GetPageRequest,
shard: &Shard,
) -> tonic::Result<page_api::GetPageResponse> {
let stream = shard.stream(req.request_class.is_bulk()).await;
let resp = stream.send(req.clone()).await?;
// Convert per-request errors into a tonic::Status.
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
resp.status_code.into(),
resp.reason.unwrap_or_else(|| String::from("unknown error")),
));
}
// Check that we received the expected pages.
if req.rel != resp.rel {
return Err(tonic::Status::internal(format!(
"shard {} returned wrong relation, expected {} got {}",
shard.id, req.rel, resp.rel
)));
}
if !req
.block_numbers
.iter()
.copied()
.eq(resp.pages.iter().map(|p| p.block_number))
{
return Err(tonic::Status::internal(format!(
"shard {} returned wrong pages, expected {:?} got {:?}",
shard.id,
req.block_numbers,
resp.pages
.iter()
.map(|page| page.block_number)
.collect::<Vec<_>>()
)));
}
Ok(resp)
}
/// Returns the size of a relation, as # of blocks.
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
pub async fn get_rel_size(
&self,
req: page_api::GetRelSizeRequest,
) -> tonic::Result<page_api::GetRelSizeResponse> {
self.retry
.with(async |_| {
// Relation metadata is only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_rel_size(req).await
})
.await
}
/// Fetches an SLRU segment.
#[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
pub async fn get_slru_segment(
&self,
req: page_api::GetSlruSegmentRequest,
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
self.retry
.with(async |_| {
// SLRU segments are only available on shard 0.
let mut client = self.shards.load_full().get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
}
}
/// Shard specification for a PageserverClient.
pub struct ShardSpec {
/// Maps shard indices to gRPC URLs.
///
/// INVARIANT: every shard 0..count is present, and shard 0 is always present.
/// INVARIANT: every URL is valid and uses grpc:// scheme.
urls: HashMap<ShardIndex, String>,
/// The shard count.
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size for these shards.
stripe_size: ShardStripeSize,
}
impl ShardSpec {
/// Creates a new shard spec with the given URLs and stripe size. All shards must be given.
/// The stripe size may be omitted for unsharded tenants.
pub fn new(
urls: HashMap<ShardIndex, String>,
stripe_size: Option<ShardStripeSize>,
) -> anyhow::Result<Self> {
// Compute the shard count.
let count = match urls.len() {
0 => return Err(anyhow!("no shards provided")),
1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
n => ShardCount::new(n as u8),
};
// Determine the stripe size. It doesn't matter for unsharded tenants.
if stripe_size.is_none() && !count.is_unsharded() {
return Err(anyhow!("stripe size must be given for sharded tenants"));
}
let stripe_size = stripe_size.unwrap_or_default();
// Validate the shard spec.
for (shard_id, url) in &urls {
// The shard index must match the computed shard count, even for unsharded tenants.
if shard_id.shard_count != count {
return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
}
// The shard index' number and count must be consistent.
if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
return Err(anyhow!("invalid shard index {shard_id}"));
}
// The above conditions guarantee that we have all shards 0..count: len() matches count,
// shard number < count, and numbers are unique (via hashmap).
// Validate the URL.
if PageserverProtocol::from_connstring(url)? != PageserverProtocol::Grpc {
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
}
}
Ok(Self {
urls,
count,
stripe_size,
})
}
}
/// Tracks the tenant's shards.
struct Shards {
/// Shards by shard index.
///
/// INVARIANT: every shard 0..count is present.
/// INVARIANT: shard 0 is always present.
by_index: HashMap<ShardIndex, Shard>,
/// The shard count.
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size. Only used for sharded tenants.
stripe_size: ShardStripeSize,
}
impl Shards {
/// Creates a new set of shards based on a shard spec.
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_spec: ShardSpec,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
// NB: the shard spec has already been validated when constructed.
let mut shards = HashMap::with_capacity(shard_spec.urls.len());
for (shard_id, url) in shard_spec.urls {
shards.insert(
shard_id,
Shard::new(
url,
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
)?,
);
}
Ok(Self {
by_index: shards,
count: shard_spec.count,
stripe_size: shard_spec.stripe_size,
})
}
/// Looks up the given shard.
#[allow(clippy::result_large_err)] // TODO: check perf impact
fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
self.by_index
.get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
}
/// Returns shard 0.
fn get_zero(&self) -> &Shard {
self.get(ShardIndex::new(ShardNumber(0), self.count))
.expect("always present")
}
}
/// A single shard. Uses dedicated resource pools with the following structure:
///
/// * Channel pool: unbounded.
/// * Unary client pool: MAX_UNARY_CLIENTS.
/// * Stream client pool: unbounded.
/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH.
/// * Bulk channel pool: unbounded.
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
struct Shard {
/// The shard ID.
id: ShardIndex,
/// Unary gRPC client pool.
client_pool: Arc<ClientPool>,
/// GetPage stream pool.
stream_pool: Arc<StreamPool>,
/// GetPage stream pool for bulk requests, e.g. prefetches.
bulk_stream_pool: Arc<StreamPool>,
}
impl Shard {
/// Creates a new shard. It has its own dedicated resource pools.
fn new(
url: String,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
// Common channel pool for unary and stream requests. Bounded by client/stream pools.
let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
// Client pool for unary requests.
let client_pool = ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
Some(MAX_UNARY_CLIENTS),
);
// GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients,
// but shares a channel pool with it (as it's unbounded).
let stream_pool = StreamPool::new(
ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
compression,
None, // unbounded, limited by stream pool
),
Some(MAX_STREAMS),
MAX_STREAM_QUEUE_DEPTH,
);
// Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools
// to avoid head-of-line blocking of latency-sensitive requests.
let bulk_stream_pool = StreamPool::new(
ClientPool::new(
ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?,
tenant_id,
timeline_id,
shard_id,
auth_token,
compression,
None, // unbounded, limited by stream pool
),
Some(MAX_BULK_STREAMS),
MAX_BULK_STREAM_QUEUE_DEPTH,
);
Ok(Self {
id: shard_id,
client_pool,
stream_pool,
bulk_stream_pool,
})
}
/// Returns a pooled client for this shard.
async fn client(&self) -> tonic::Result<ClientGuard> {
self.client_pool
.get()
.await
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
}
/// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
/// pool (e.g. for prefetches).
async fn stream(&self, bulk: bool) -> StreamGuard {
match bulk {
false => self.stream_pool.get().await,
true => self.bulk_stream_pool.get().await,
}
}
}

View File

@@ -1,6 +0,0 @@
mod client;
mod pool;
mod retry;
mod split;
pub use client::{PageserverClient, ShardSpec};

View File

@@ -1,779 +0,0 @@
//! This module provides various Pageserver gRPC client resource pools.
//!
//! These pools are designed to reuse gRPC resources (connections, clients, and streams) across
//! multiple concurrent callers (i.e. Postgres backends). This avoids the resource cost and latency
//! of creating dedicated TCP connections and server tasks for every Postgres backend.
//!
//! Each resource has its own, nested pool. The pools are custom-built for the properties of each
//! resource -- they are different enough that a generic pool isn't suitable.
//!
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
//! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
//! per-channel client limit. Channels may be closed when they are no longer used by any clients.
//!
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
//! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed
//! from the pool after some time, to free up the channel.
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
//! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it
//! returns a guard that can be used to send a single request, to properly enforce queue depth and
//! route responses. Internally, the pool will reuse or spin up a suitable stream for the request,
//! possibly pipelining multiple requests from multiple callers on the same stream (up to some
//! queue depth). Idle streams may be removed from the pool after a while to free up the client.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
//!
//! TODO: error handling (including custom error types).
//! TODO: observability.
use std::collections::{BTreeMap, HashMap};
use std::num::NonZero;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::StreamExt as _;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint};
use tracing::{error, warn};
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
/// Reap channels/clients/streams that have been idle for this long.
///
/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to
/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle.
/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we
/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to
/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty
/// channels, and/or stream pool clients.
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
false => Duration::from_secs(180),
true => Duration::from_secs(1), // exercise reaping in tests
};
/// Reap idle resources with this interval.
const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) {
false => Duration::from_secs(10),
true => Duration::from_secs(1), // exercise reaping in tests
};
/// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2
/// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this.
/// The pool does not limit the number of channels, and instead relies on `ClientPool` or
/// `StreamPool` to limit the number of concurrent clients.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
/// TODO: consider prewarming a set of channels, to avoid initial connection latency.
/// TODO: consider adding a circuit breaker for errors and fail fast.
pub struct ChannelPool {
/// Pageserver endpoint to connect to.
endpoint: Endpoint,
/// Max number of clients per channel. Beyond this, a new channel will be created.
max_clients_per_channel: NonZero<usize>,
/// Open channels.
channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
/// Reaps idle channels.
idle_reaper: Reaper,
/// Channel ID generator.
next_channel_id: AtomicUsize,
}
type ChannelID = usize;
struct ChannelEntry {
/// The gRPC channel (i.e. TCP connection). Shared by multiple clients.
channel: Channel,
/// Number of clients using this channel.
clients: usize,
/// The channel has been idle (no clients) since this time. None if channel is in use.
/// INVARIANT: Some if clients == 0, otherwise None.
idle_since: Option<Instant>,
}
impl ChannelPool {
/// Creates a new channel pool for the given Pageserver endpoint.
pub fn new<E>(endpoint: E, max_clients_per_channel: NonZero<usize>) -> anyhow::Result<Arc<Self>>
where
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
let pool = Arc::new(Self {
endpoint: endpoint.try_into()?,
max_clients_per_channel,
channels: Mutex::default(),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_channel_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
Ok(pool)
}
/// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
///
/// This never blocks (except for mutex acquisition). The channel is connected lazily on first
/// use, and the `ChannelPool` does not have a channel limit. Channels will be re-established
/// automatically on failure (TODO: verify).
///
/// Callers should not clone the returned channel, and must hold onto the returned guard as long
/// as the channel is in use. It is unfortunately not possible to enforce this: the Protobuf
/// client requires an owned `Channel` and we don't have access to the channel's internal
/// refcount.
///
/// This is not performance-sensitive. It is only called when creating a new client, and clients
/// are pooled and reused by `ClientPool`. The total number of channels will also be small. O(n)
/// performance is therefore okay.
pub fn get(self: &Arc<Self>) -> ChannelGuard {
let mut channels = self.channels.lock().unwrap();
// Try to find an existing channel with available capacity. We check entries in BTreeMap
// order, to fill up the lower-ordered channels first. The ClientPool also prefers clients
// with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// channels, and free up higher-ordered channels such that they can be reaped.
for (&id, entry) in channels.iter_mut() {
assert!(
entry.clients <= self.max_clients_per_channel.get(),
"channel overflow"
);
assert_eq!(
entry.idle_since.is_some(),
entry.clients == 0,
"incorrect channel idle state"
);
if entry.clients < self.max_clients_per_channel.get() {
entry.clients += 1;
entry.idle_since = None;
return ChannelGuard {
pool: Arc::downgrade(self),
id,
channel: Some(entry.channel.clone()),
};
}
}
// Create a new channel. We connect lazily on first use, such that we don't block here and
// other clients can join onto the same channel while it's connecting.
let channel = self.endpoint.connect_lazy();
let id = self.next_channel_id.fetch_add(1, Ordering::Relaxed);
let entry = ChannelEntry {
channel: channel.clone(),
clients: 1, // account for the guard below
idle_since: None,
};
channels.insert(id, entry);
ChannelGuard {
pool: Arc::downgrade(self),
id,
channel: Some(channel),
}
}
}
impl Reapable for ChannelPool {
/// Reaps channels that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.channels.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.clients, 0, "empty channel not marked idle");
return true;
};
assert_eq!(entry.clients, 0, "idle channel has clients");
idle_since >= cutoff
})
}
}
/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
pool: Weak<ChannelPool>,
id: ChannelID,
channel: Option<Channel>,
}
impl ChannelGuard {
/// Returns the inner owned channel. Panics if called more than once. The caller must hold onto
/// the guard as long as the channel is in use, and should not clone it.
pub fn take(&mut self) -> Channel {
self.channel.take().expect("channel already taken")
}
}
/// Returns the channel to the pool.
impl Drop for ChannelGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
return; // pool was dropped
};
let mut channels = pool.channels.lock().unwrap();
let entry = channels.get_mut(&self.id).expect("unknown channel");
assert!(entry.idle_since.is_none(), "active channel marked idle");
assert!(entry.clients > 0, "channel underflow");
entry.clients -= 1;
if entry.clients == 0 {
entry.idle_since = Some(Instant::now()); // mark channel as idle
}
}
}
/// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner
/// `ChannelPool`. A client is only given out to single caller at a time. The pool limits the total
/// number of concurrent clients to `max_clients` via semaphore.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
pub struct ClientPool {
/// Tenant ID.
tenant_id: TenantId,
/// Timeline ID.
timeline_id: TimelineId,
/// Shard ID.
shard_id: ShardIndex,
/// Authentication token, if any.
auth_token: Option<String>,
/// Compression to use.
compression: Option<CompressionEncoding>,
/// Channel pool to acquire channels from.
channel_pool: Arc<ChannelPool>,
/// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
limiter: Option<Arc<Semaphore>>,
/// Idle pooled clients. Acquired clients are removed from here and returned on drop.
///
/// The first client in the map will be acquired next. The map is sorted by client ID, which in
/// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
/// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle
/// clients are reaped.
idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
/// Reaps idle clients.
idle_reaper: Reaper,
/// Unique client ID generator.
next_client_id: AtomicUsize,
}
type ClientID = (ChannelID, usize);
struct ClientEntry {
/// The pooled gRPC client.
client: page_api::Client,
/// The channel guard for the channel used by the client.
channel_guard: ChannelGuard,
/// The client has been idle since this time. All clients in `ClientPool::idle` are idle by
/// definition, so this is the time when it was added back to the pool.
idle_since: Instant,
}
impl ClientPool {
/// Creates a new client pool for the given tenant shard. Channels are acquired from the given
/// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to
/// `max_clients` concurrent clients, or unbounded if None.
pub fn new(
channel_pool: Arc<ChannelPool>,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
max_clients: Option<NonZero<usize>>,
) -> Arc<Self> {
let pool = Arc::new(Self {
tenant_id,
timeline_id,
shard_id,
auth_token,
compression,
channel_pool,
idle: Mutex::default(),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))),
next_client_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
pool
}
/// Gets a client from the pool, or creates a new one if necessary. Connections are established
/// lazily and do not block, but this call can block if the pool is at `max_clients`. The client
/// is returned to the pool when the guard is dropped.
///
/// This is moderately performance-sensitive. It is called for every unary request, but these
/// establish a new gRPC stream per request so they're already expensive. GetPage requests use
/// the `StreamPool` instead.
pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
permit = Some(limiter.acquire_owned().await.expect("never closed"));
}
// Fast path: acquire an idle client from the pool.
if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() {
return Ok(ClientGuard {
pool: Arc::downgrade(self),
id,
client: Some(entry.client),
channel_guard: Some(entry.channel_guard),
permit,
});
}
// Slow path: construct a new client.
let mut channel_guard = self.channel_pool.get();
let client = page_api::Client::new(
channel_guard.take(),
self.tenant_id,
self.timeline_id,
self.shard_id,
self.auth_token.clone(),
self.compression,
)?;
Ok(ClientGuard {
pool: Arc::downgrade(self),
id: (
channel_guard.id,
self.next_client_id.fetch_add(1, Ordering::Relaxed),
),
client: Some(client),
channel_guard: Some(channel_guard),
permit,
})
}
}
impl Reapable for ClientPool {
/// Reaps clients that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.idle
.lock()
.unwrap()
.retain(|_, entry| entry.idle_since >= cutoff)
}
}
/// A client acquired from the pool. The inner client can be accessed via Deref. The client is
/// returned to the pool when dropped.
pub struct ClientGuard {
pool: Weak<ClientPool>,
id: ClientID,
client: Option<page_api::Client>, // Some until dropped
channel_guard: Option<ChannelGuard>, // Some until dropped
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
impl Deref for ClientGuard {
type Target = page_api::Client;
fn deref(&self) -> &Self::Target {
self.client.as_ref().expect("not dropped")
}
}
impl DerefMut for ClientGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
self.client.as_mut().expect("not dropped")
}
}
/// Returns the client to the pool.
impl Drop for ClientGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
return; // pool was dropped
};
let entry = ClientEntry {
client: self.client.take().expect("dropped once"),
channel_guard: self.channel_guard.take().expect("dropped once"),
idle_since: Instant::now(),
};
pool.idle.lock().unwrap().insert(self.id, entry);
_ = self.permit; // returned on drop, referenced for visibility
}
}
/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
/// acquires a client from the inner `ClientPool` for the stream's lifetime.
///
/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send
/// a single request and await the response. Internally, requests are multiplexed across streams and
/// channels. This allows proper queue depth enforcement and response routing.
///
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
/// The client pool to acquire clients from. Must be unbounded.
client_pool: Arc<ClientPool>,
/// All pooled streams.
///
/// Incoming requests will be sent over an existing stream with available capacity. If all
/// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
/// stream has an associated Tokio task that processes requests and responses.
streams: Mutex<HashMap<StreamID, StreamEntry>>,
/// The max number of concurrent streams, or None if unbounded.
max_streams: Option<NonZero<usize>>,
/// The max number of concurrent requests per stream.
max_queue_depth: NonZero<usize>,
/// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
/// None if the pool is unbounded.
limiter: Option<Arc<Semaphore>>,
/// Reaps idle streams.
idle_reaper: Reaper,
/// Stream ID generator.
next_stream_id: AtomicUsize,
}
type StreamID = usize;
type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>;
type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>;
type ResponseSender = oneshot::Sender<tonic::Result<page_api::GetPageResponse>>;
struct StreamEntry {
/// Sends caller requests to the stream task. The stream task exits when this is dropped.
sender: RequestSender,
/// Number of in-flight requests on this stream.
queue_depth: usize,
/// The time when this stream went idle (queue_depth == 0).
/// INVARIANT: Some if queue_depth == 0, otherwise None.
idle_since: Option<Instant>,
}
impl StreamPool {
/// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth`
/// concurrent requests on each stream, and use up to `max_streams` concurrent streams.
///
/// The client pool must be unbounded. The stream pool will enforce its own limits, and because
/// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
/// The stream pool should generally have its own dedicated client pool (but it can share a
/// channel pool with others since these are always unbounded).
pub fn new(
client_pool: Arc<ClientPool>,
max_streams: Option<NonZero<usize>>,
max_queue_depth: NonZero<usize>,
) -> Arc<Self> {
assert!(client_pool.limiter.is_none(), "bounded client pool");
let pool = Arc::new(Self {
client_pool,
streams: Mutex::default(),
limiter: max_streams.map(|max_streams| {
Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
}),
max_streams,
max_queue_depth,
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_stream_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
pool
}
/// Acquires an available stream from the pool, or spins up a new stream async if all streams
/// are full. Returns a guard that can be used to send a single request on the stream and await
/// the response, with queue depth quota already acquired. Blocks if the pool is at capacity
/// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight).
///
/// This is very performance-sensitive, as it is on the GetPage hot path.
///
/// TODO: this must do something more sophisticated for performance. We want:
///
/// * Cheap, concurrent access in the common case where we can use a pooled stream.
/// * Quick acquisition of pooled streams with available capacity.
/// * Prefer streams that belong to lower-numbered channels, to reap idle channels.
/// * Prefer filling up existing streams' queue depth before spinning up new streams.
/// * Don't hold a lock while spinning up new streams.
/// * Allow concurrent clients to join onto streams while they're spun up.
/// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
///
/// For now, we just do something simple but inefficient (linear scan under mutex).
pub async fn get(self: &Arc<Self>) -> StreamGuard {
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
permit = Some(limiter.acquire_owned().await.expect("never closed"));
}
let mut streams = self.streams.lock().unwrap();
// Look for a pooled stream with available capacity.
for (&id, entry) in streams.iter_mut() {
assert!(
entry.queue_depth <= self.max_queue_depth.get(),
"stream queue overflow"
);
assert_eq!(
entry.idle_since.is_some(),
entry.queue_depth == 0,
"incorrect stream idle state"
);
if entry.queue_depth < self.max_queue_depth.get() {
entry.queue_depth += 1;
entry.idle_since = None;
return StreamGuard {
pool: Arc::downgrade(self),
id,
sender: entry.sender.clone(),
permit,
};
}
}
// No available stream, spin up a new one. We install the stream entry in the pool first and
// return the guard, while spinning up the stream task async. This allows other callers to
// join onto this stream and also create additional streams concurrently if this fills up.
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
let entry = StreamEntry {
sender: req_tx.clone(),
queue_depth: 1, // reserve quota for this caller
idle_since: None,
};
streams.insert(id, entry);
if let Some(max_streams) = self.max_streams {
assert!(streams.len() <= max_streams.get(), "stream overflow");
};
let client_pool = self.client_pool.clone();
let pool = Arc::downgrade(self);
tokio::spawn(async move {
if let Err(err) = Self::run_stream(client_pool, req_rx).await {
error!("stream failed: {err}");
}
// Remove stream from pool on exit. Weak reference to avoid holding the pool alive.
if let Some(pool) = pool.upgrade() {
let entry = pool.streams.lock().unwrap().remove(&id);
assert!(entry.is_some(), "unknown stream ID: {id}");
}
});
StreamGuard {
pool: Arc::downgrade(self),
id,
sender: req_tx,
permit,
}
}
/// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
/// bidirectional GetPage stream, then forwards requests and responses between callers and the
/// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be
/// atomic with pool stream acquisition.
///
/// The task exits when the request channel is closed, or on a stream error. The caller is
/// responsible for removing the stream from the pool on exit.
async fn run_stream(
client_pool: Arc<ClientPool>,
mut caller_rx: RequestReceiver,
) -> anyhow::Result<()> {
// Acquire a client from the pool and create a stream.
let mut client = client_pool.get().await?;
// NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could
// theoretically deadlock if both the client and server block on sends (since we're not
// reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and
// low queue depths, but it was seen to happen with the libpq protocol so better safe than
// sorry. It should never buffer more than the queue depth anyway, but using an unbounded
// channel guarantees that it will never block.
let (req_tx, req_rx) = mpsc::unbounded_channel();
let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
let mut resp_stream = client.get_pages(req_stream).await?;
// Track caller response channels by request ID. If the task returns early, these response
// channels will be dropped and the waiting callers will receive an error.
//
// NB: this will leak entries if the server doesn't respond to a request (by request ID).
// It shouldn't happen, and if it does it will often hold onto queue depth quota anyway and
// block further use. But we could consider reaping closed channels after some time.
let mut callers = HashMap::new();
// Process requests and responses.
loop {
tokio::select! {
// Receive requests from callers and send them to the stream.
req = caller_rx.recv() => {
// Shut down if request channel is closed.
let Some((req, resp_tx)) = req else {
return Ok(());
};
// Store the response channel by request ID.
if callers.contains_key(&req.request_id) {
// Error on request ID duplicates. Ignore callers that went away.
_ = resp_tx.send(Err(tonic::Status::invalid_argument(
format!("duplicate request ID: {}", req.request_id),
)));
continue;
}
callers.insert(req.request_id, resp_tx);
// Send the request on the stream. Bail out if the stream is closed.
req_tx.send(req).map_err(|_| {
tonic::Status::unavailable("stream closed")
})?;
}
// Receive responses from the stream and send them to callers.
resp = resp_stream.next() => {
// Shut down if the stream is closed, and bail out on stream errors.
let Some(resp) = resp.transpose()? else {
return Ok(())
};
// Send the response to the caller. Ignore errors if the caller went away.
let Some(resp_tx) = callers.remove(&resp.request_id) else {
warn!("received response for unknown request ID: {}", resp.request_id);
continue;
};
_ = resp_tx.send(Ok(resp));
}
}
}
}
}
impl Reapable for StreamPool {
/// Reaps streams that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.streams.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.queue_depth, 0, "empty stream not marked idle");
return true;
};
assert_eq!(entry.queue_depth, 0, "idle stream has requests");
idle_since >= cutoff
});
}
}
/// A pooled stream reference. Can be used to send a single request, to properly enforce queue
/// depth. Queue depth is already reserved and will be returned on drop.
pub struct StreamGuard {
pool: Weak<StreamPool>,
id: StreamID,
sender: RequestSender,
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
impl StreamGuard {
/// Sends a request on the stream and awaits the response. Consumes the guard, since it's only
/// valid for a single request (to enforce queue depth). This also drops the guard on return and
/// returns the queue depth quota to the pool.
///
/// The `GetPageRequest::request_id` must be unique across in-flight requests.
///
/// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
/// to avoid tearing down the stream for per-request errors. Callers must check this.
pub async fn send(
self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
let (resp_tx, resp_rx) = oneshot::channel();
self.sender
.send((req, resp_tx))
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?;
resp_rx
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?
}
}
impl Drop for StreamGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
return; // pool was dropped
};
// Release the queue depth reservation on drop. This can prematurely decrement it if dropped
// before the response is received, but that's okay.
//
// TODO: actually, it's probably not okay. Queue depth release should be moved into the
// stream task, such that it continues to account for the queue depth slot until the server
// responds. Otherwise, if a slow request times out and keeps blocking the stream, the
// server will keep waiting on it and we can pile on subsequent requests (including the
// timeout retry) in the same stream and get blocked. But we may also want to avoid blocking
// requests on e.g. LSN waits and layer downloads, instead returning early to free up the
// stream. Or just scale out streams with a queue depth of 1 to sidestep all head-of-line
// blocking. TBD.
let mut streams = pool.streams.lock().unwrap();
let entry = streams.get_mut(&self.id).expect("unknown stream");
assert!(entry.idle_since.is_none(), "active stream marked idle");
assert!(entry.queue_depth > 0, "stream queue underflow");
entry.queue_depth -= 1;
if entry.queue_depth == 0 {
entry.idle_since = Some(Instant::now()); // mark stream as idle
}
_ = self.permit; // returned on drop, referenced for visibility
}
}
/// Periodically reaps idle resources from a pool.
struct Reaper {
/// The task check interval.
interval: Duration,
/// The threshold for reaping idle resources.
threshold: Duration,
/// Cancels the reaper task. Cancelled when the reaper is dropped.
cancel: CancellationToken,
}
impl Reaper {
/// Creates a new reaper.
pub fn new(threshold: Duration, interval: Duration) -> Self {
Self {
cancel: CancellationToken::new(),
threshold,
interval,
}
}
/// Spawns a task to periodically reap idle resources from the given task pool. The task is
/// cancelled when the reaper is dropped.
pub fn spawn(&self, pool: &Arc<impl Reapable>) {
// NB: hold a weak pool reference, otherwise the task will prevent dropping the pool.
let pool = Arc::downgrade(pool);
let cancel = self.cancel.clone();
let (interval, threshold) = (self.interval, self.threshold);
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {
let Some(pool) = pool.upgrade() else {
return; // pool was dropped
};
pool.reap_idle(Instant::now() - threshold);
}
_ = cancel.cancelled() => return,
}
}
});
}
}
impl Drop for Reaper {
fn drop(&mut self) {
self.cancel.cancel(); // cancel reaper task
}
}
/// A reapable resource pool.
trait Reapable: Send + Sync + 'static {
/// Reaps resources that have been idle since before the given cutoff.
fn reap_idle(&self, cutoff: Instant);
}

View File

@@ -1,154 +0,0 @@
use std::time::Duration;
use tokio::time::Instant;
use tracing::{error, info, warn};
use utils::backoff::exponential_backoff_duration;
/// A retry handler for Pageserver gRPC requests.
///
/// This is used instead of backoff::retry for better control and observability.
pub struct Retry;
impl Retry {
/// The per-request timeout.
// TODO: tune these, and/or make them configurable. Should we retry forever?
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// The total timeout across all attempts
const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
/// The initial backoff duration.
const BASE_BACKOFF: Duration = Duration::from_millis(10);
/// The maximum backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(10);
/// If true, log successful requests. For debugging.
const LOG_SUCCESS: bool = false;
/// Runs the given async closure with timeouts and retries (exponential backoff), passing the
/// attempt number starting at 0. Logs errors, using the current tracing span for context.
///
/// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
/// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
where
F: FnMut(usize) -> O, // takes attempt number, starting at 0
O: Future<Output = tonic::Result<T>>,
{
let started = Instant::now();
let deadline = started + Self::TOTAL_TIMEOUT;
let mut last_error = None;
let mut retries = 0;
loop {
// Set up a future to wait for the backoff (if any) and run the request with a timeout.
let backoff_and_try = async {
// NB: sleep() always sleeps 1ms, even when given a 0 argument. See:
// https://github.com/tokio-rs/tokio/issues/6866
if let Some(backoff) = Self::backoff_duration(retries) {
tokio::time::sleep(backoff).await;
}
let request_started = Instant::now();
tokio::time::timeout(Self::REQUEST_TIMEOUT, f(retries))
.await
.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
request_started.elapsed().as_secs_f64()
))
})?
};
// Wait for the backoff and request, or bail out if the total timeout is exceeded.
let result = tokio::select! {
result = backoff_and_try => result,
_ = tokio::time::sleep_until(deadline) => {
let last_error = last_error.unwrap_or_else(|| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
started.elapsed().as_secs_f64()
))
});
error!(
"giving up after {:.3}s and {retries} retries, last error {:?}: {}",
started.elapsed().as_secs_f64(), last_error.code(), last_error.message(),
);
return Err(last_error);
}
};
match result {
// Success, return the result.
Ok(result) => {
if retries > 0 || Self::LOG_SUCCESS {
info!(
"request succeeded after {retries} retries in {:.3}s",
started.elapsed().as_secs_f64(),
);
}
return Ok(result);
}
// Error, retry or bail out.
Err(status) => {
let (code, message) = (status.code(), status.message());
let attempt = retries + 1;
if !Self::should_retry(code) {
// NB: include the attempt here too. This isn't necessarily the first
// attempt, because the error may change between attempts.
error!(
"request failed with {code:?}: {message}, not retrying (attempt {attempt})"
);
return Err(status);
}
warn!("request failed with {code:?}: {message}, retrying (attempt {attempt})");
retries += 1;
last_error = Some(status);
}
}
}
}
/// Returns the backoff duration for the given retry attempt, or None for no backoff.
fn backoff_duration(retry: usize) -> Option<Duration> {
let backoff = exponential_backoff_duration(
retry as u32,
Self::BASE_BACKOFF.as_secs_f64(),
Self::MAX_BACKOFF.as_secs_f64(),
);
(!backoff.is_zero()).then_some(backoff)
}
/// Returns true if the given status code should be retries.
fn should_retry(code: tonic::Code) -> bool {
match code {
tonic::Code::Ok => panic!("unexpected Ok status code"),
// These codes are transient, so retry them.
tonic::Code::Aborted => true,
tonic::Code::Cancelled => true,
tonic::Code::DeadlineExceeded => true, // maybe transient slowness
tonic::Code::ResourceExhausted => true,
tonic::Code::Unavailable => true,
// The following codes will like continue to fail, so don't retry.
tonic::Code::AlreadyExists => false,
tonic::Code::DataLoss => false,
tonic::Code::FailedPrecondition => false,
// NB: don't retry Internal. It is intended for serious errors such as invariant
// violations, and is also used for client-side invariant checks that would otherwise
// result in retry loops.
tonic::Code::Internal => false,
tonic::Code::InvalidArgument => false,
tonic::Code::NotFound => false,
tonic::Code::OutOfRange => false,
tonic::Code::PermissionDenied => false,
tonic::Code::Unauthenticated => false,
tonic::Code::Unimplemented => false,
tonic::Code::Unknown => false,
}
}
}

View File

@@ -1,209 +0,0 @@
use std::collections::HashMap;
use bytes::Bytes;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_page_api as page_api;
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
/// TODO: add tests for this.
pub struct GetPageSplitter {
/// Split requests by shard index.
requests: HashMap<ShardIndex, page_api::GetPageRequest>,
/// The response being assembled. Preallocated with empty pages, to be filled in.
response: page_api::GetPageResponse,
/// Maps the offset in `request.block_numbers` and `response.pages` to the owning shard. Used
/// to assemble the response pages in the same order as the original request.
block_shards: Vec<ShardIndex>,
}
impl GetPageSplitter {
/// Checks if the given request only touches a single shard, and returns the shard ID. This is
/// the common case, so we check first in order to avoid unnecessary allocations and overhead.
pub fn for_single_shard(
req: &page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
) -> Option<ShardIndex> {
// Fast path: unsharded tenant.
if count.is_unsharded() {
return Some(ShardIndex::unsharded());
}
// Find the first page's shard, for comparison. If there are no pages, just return the first
// shard (caller likely checked already, otherwise the server will reject it).
let Some(&first_page) = req.block_numbers.first() else {
return Some(ShardIndex::new(ShardNumber(0), count));
};
let key = rel_block_to_key(req.rel, first_page);
let shard_number = key_to_shard_number(count, stripe_size, &key);
req.block_numbers
.iter()
.skip(1) // computed above
.all(|&blkno| {
let key = rel_block_to_key(req.rel, blkno);
key_to_shard_number(count, stripe_size, &key) == shard_number
})
.then_some(ShardIndex::new(shard_number, count))
}
/// Splits the given request.
pub fn split(
req: page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
) -> Self {
// The caller should make sure we don't split requests unnecessarily.
debug_assert!(
Self::for_single_shard(&req, count, stripe_size).is_none(),
"unnecessary request split"
);
// Split the requests by shard index.
let mut requests = HashMap::with_capacity(2); // common case
let mut block_shards = Vec::with_capacity(req.block_numbers.len());
for &blkno in &req.block_numbers {
let key = rel_block_to_key(req.rel, blkno);
let shard_number = key_to_shard_number(count, stripe_size, &key);
let shard_id = ShardIndex::new(shard_number, count);
requests
.entry(shard_id)
.or_insert_with(|| page_api::GetPageRequest {
request_id: req.request_id,
request_class: req.request_class,
rel: req.rel,
read_lsn: req.read_lsn,
block_numbers: Vec::new(),
})
.block_numbers
.push(blkno);
block_shards.push(shard_id);
}
// Construct a response to be populated by shard responses. Preallocate empty page slots
// with the expected block numbers.
let response = page_api::GetPageResponse {
request_id: req.request_id,
status_code: page_api::GetPageStatusCode::Ok,
reason: None,
rel: req.rel,
pages: req
.block_numbers
.into_iter()
.map(|block_number| {
page_api::Page {
block_number,
image: Bytes::new(), // empty page slot to be filled in
}
})
.collect(),
};
Self {
requests,
response,
block_shards,
}
}
/// Drains the per-shard requests, moving them out of the splitter to avoid extra allocations.
pub fn drain_requests(
&mut self,
) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
self.requests.drain()
}
/// Adds a response from the given shard. The response must match the request ID and have an OK
/// status code. A response must not already exist for the given shard ID.
#[allow(clippy::result_large_err)]
pub fn add_response(
&mut self,
shard_id: ShardIndex,
response: page_api::GetPageResponse,
) -> tonic::Result<()> {
// The caller should already have converted status codes into tonic::Status.
if response.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::internal(format!(
"unexpected non-OK response for shard {shard_id}: {} {}",
response.status_code,
response.reason.unwrap_or_default()
)));
}
if response.request_id != self.response.request_id {
return Err(tonic::Status::internal(format!(
"response ID mismatch for shard {shard_id}: expected {}, got {}",
self.response.request_id, response.request_id
)));
}
// Place the shard response pages into the assembled response, in request order.
let mut pages = response.pages.into_iter();
for (i, &s) in self.block_shards.iter().enumerate() {
if shard_id != s {
continue;
}
let Some(slot) = self.response.pages.get_mut(i) else {
return Err(tonic::Status::internal(format!(
"no block_shards slot {i} for shard {shard_id}"
)));
};
let Some(page) = pages.next() else {
return Err(tonic::Status::internal(format!(
"missing page {} in shard {shard_id} response",
slot.block_number
)));
};
if page.block_number != slot.block_number {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned wrong page at index {i}, expected {} got {}",
slot.block_number, page.block_number
)));
}
if !slot.image.is_empty() {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned duplicate page {} at index {i}",
slot.block_number
)));
}
*slot = page;
}
// Make sure we've consumed all pages from the shard response.
if let Some(extra_page) = pages.next() {
return Err(tonic::Status::internal(format!(
"shard {shard_id} returned extra page: {}",
extra_page.block_number
)));
}
Ok(())
}
/// Fetches the final, assembled response.
#[allow(clippy::result_large_err)]
pub fn get_response(self) -> tonic::Result<page_api::GetPageResponse> {
// Check that the response is complete.
for (i, page) in self.response.pages.iter().enumerate() {
if page.image.is_empty() {
return Err(tonic::Status::internal(format!(
"missing page {} for shard {}",
page.block_number,
self.block_shards
.get(i)
.map(|s| s.to_string())
.unwrap_or_else(|| "?".to_string())
)));
}
}
Ok(self.response)
}
}

View File

@@ -17,7 +17,6 @@ pageserver = { path = ".." }
pageserver_api.workspace = true
remote_storage = { path = "../../libs/remote_storage" }
postgres_ffi.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true

View File

@@ -1,85 +0,0 @@
use camino::Utf8PathBuf;
use clap::Parser;
use tokio_util::sync::CancellationToken;
/// Download a specific object from remote storage to a local file.
///
/// The remote storage configuration is supplied via the `REMOTE_STORAGE_CONFIG` environment
/// variable, in the same TOML format that the pageserver itself understands. This allows the
/// command to work with any cloud supported by the `remote_storage` crate (currently AWS S3,
/// Azure Blob Storage and local files), as long as the credentials are available via the
/// standard environment variables expected by the underlying SDKs.
///
/// Examples for setting the environment variable:
///
/// ```bash
/// # AWS S3 (region can also be provided via AWS_REGION)
/// export REMOTE_STORAGE_CONFIG='remote_storage = { bucket_name = "my-bucket", bucket_region = "us-east-2" }'
///
/// # Azure Blob Storage (account key picked up from AZURE_STORAGE_ACCOUNT_KEY)
/// export REMOTE_STORAGE_CONFIG='remote_storage = { container = "my-container", account = "my-account" }'
/// ```
#[derive(Parser)]
pub(crate) struct DownloadRemoteObjectCmd {
/// Key / path of the object to download (relative to the remote storage prefix).
///
/// Examples:
/// "wal/3aa8f.../00000001000000000000000A"
/// "pageserver/v1/tenants/<tenant_id>/timelines/<timeline_id>/layer_12345"
pub remote_path: String,
/// Path of the local file to create. Existing file will be overwritten.
///
/// Examples:
/// "./segment"
/// "/tmp/layer_12345.parquet"
pub output_file: Utf8PathBuf,
}
pub(crate) async fn main(cmd: &DownloadRemoteObjectCmd) -> anyhow::Result<()> {
use remote_storage::{DownloadOpts, GenericRemoteStorage, RemotePath, RemoteStorageConfig};
// Fetch remote storage configuration from the environment
let config_str = std::env::var("REMOTE_STORAGE_CONFIG").map_err(|_| {
anyhow::anyhow!(
"'REMOTE_STORAGE_CONFIG' environment variable must be set to a valid remote storage TOML config"
)
})?;
let config = RemoteStorageConfig::from_toml_str(&config_str)?;
// Initialise remote storage client
let storage = GenericRemoteStorage::from_config(&config).await?;
// RemotePath must be relative leading slashes confuse the parser.
let remote_path_str = cmd.remote_path.trim_start_matches('/');
let remote_path = RemotePath::from_string(remote_path_str)?;
let cancel = CancellationToken::new();
println!(
"Downloading '{remote_path}' from remote storage bucket {:?} ...",
config.storage.bucket_name()
);
// Start the actual download
let download = storage
.download(&remote_path, &DownloadOpts::default(), &cancel)
.await?;
// Stream to file
let mut reader = tokio_util::io::StreamReader::new(download.download_stream);
let tmp_path = cmd.output_file.with_extension("tmp");
let mut file = tokio::fs::File::create(&tmp_path).await?;
tokio::io::copy(&mut reader, &mut file).await?;
file.sync_all().await?;
// Atomically move into place
tokio::fs::rename(&tmp_path, &cmd.output_file).await?;
println!(
"Downloaded to '{}'. Last modified: {:?}, etag: {}",
cmd.output_file, download.last_modified, download.etag
);
Ok(())
}

View File

@@ -1,180 +1,10 @@
use std::str::FromStr;
use anyhow::{Context, Ok};
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver::tenant::{
IndexPart,
layer_map::{LayerMap, SearchResult},
remote_timeline_client::{index::LayerFileMetadata, remote_layer_path},
storage_layer::{LayerName, LayerVisibilityHint, PersistentLayerDesc, ReadableLayerWeak},
};
use pageserver_api::key::Key;
use serde::Serialize;
use std::collections::BTreeMap;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
shard::TenantShardId,
};
use pageserver::tenant::IndexPart;
#[derive(clap::Subcommand)]
pub(crate) enum IndexPartCmd {
Dump {
path: Utf8PathBuf,
},
/// Find all layers that need to be searched to construct the given page at the given LSN.
Search {
#[arg(long)]
tenant_id: String,
#[arg(long)]
timeline_id: String,
#[arg(long)]
path: Utf8PathBuf,
#[arg(long)]
key: String,
#[arg(long)]
lsn: String,
},
/// List all visible delta and image layers at the latest LSN.
ListVisibleLayers {
#[arg(long)]
path: Utf8PathBuf,
},
}
fn create_layer_map_from_index_part(
index_part: &IndexPart,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
) -> LayerMap {
let mut layer_map = LayerMap::default();
{
let mut updates = layer_map.batch_update();
for (key, value) in index_part.layer_metadata.iter() {
updates.insert_historic(PersistentLayerDesc::from_filename(
tenant_shard_id,
timeline_id,
key.clone(),
value.file_size,
));
}
}
layer_map
}
async fn search_layers(
tenant_id: &str,
timeline_id: &str,
path: &Utf8PathBuf,
key: &str,
lsn: &str,
) -> anyhow::Result<()> {
let tenant_id = TenantId::from_str(tenant_id).unwrap();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let timeline_id = TimelineId::from_str(timeline_id).unwrap();
let index_json = {
let bytes = tokio::fs::read(path).await?;
IndexPart::from_json_bytes(&bytes).unwrap()
};
let layer_map = create_layer_map_from_index_part(&index_json, tenant_shard_id, timeline_id);
let key = Key::from_hex(key)?;
let lsn = Lsn::from_str(lsn).unwrap();
let mut end_lsn = lsn;
loop {
let result = layer_map.search(key, end_lsn);
match result {
Some(SearchResult { layer, lsn_floor }) => {
let disk_layer = match layer {
ReadableLayerWeak::PersistentLayer(layer) => layer,
ReadableLayerWeak::InMemoryLayer(_) => {
anyhow::bail!("unexpected in-memory layer")
}
};
let metadata = index_json
.layer_metadata
.get(&disk_layer.layer_name())
.unwrap();
println!(
"{}",
remote_layer_path(
&tenant_id,
&timeline_id,
metadata.shard,
&disk_layer.layer_name(),
metadata.generation
)
);
end_lsn = lsn_floor;
}
None => break,
}
}
Ok(())
}
#[derive(Debug, Clone, Serialize)]
struct VisibleLayers {
pub total_images: u64,
pub total_image_bytes: u64,
pub total_deltas: u64,
pub total_delta_bytes: u64,
pub layer_metadata: BTreeMap<LayerName, LayerFileMetadata>,
}
impl VisibleLayers {
pub fn new() -> Self {
Self {
layer_metadata: BTreeMap::new(),
total_images: 0,
total_image_bytes: 0,
total_deltas: 0,
total_delta_bytes: 0,
}
}
pub fn add_layer(&mut self, name: LayerName, layer: LayerFileMetadata) {
match name {
LayerName::Image(_) => {
self.total_images += 1;
self.total_image_bytes += layer.file_size;
}
LayerName::Delta(_) => {
self.total_deltas += 1;
self.total_delta_bytes += layer.file_size;
}
}
self.layer_metadata.insert(name, layer);
}
}
async fn list_visible_layers(path: &Utf8PathBuf) -> anyhow::Result<()> {
let tenant_id = TenantId::generate();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let timeline_id = TimelineId::generate();
let bytes = tokio::fs::read(path).await.context("read file")?;
let index_part = IndexPart::from_json_bytes(&bytes).context("deserialize")?;
let layer_map = create_layer_map_from_index_part(&index_part, tenant_shard_id, timeline_id);
let mut visible_layers = VisibleLayers::new();
let (layers, _key_space) = layer_map.get_visibility(Vec::new());
for (layer, visibility) in layers {
if visibility == LayerVisibilityHint::Visible {
visible_layers.add_layer(
layer.layer_name(),
index_part
.layer_metadata
.get(&layer.layer_name())
.unwrap()
.clone(),
);
}
}
let output = serde_json::to_string_pretty(&visible_layers).context("serialize output")?;
println!("{output}");
Ok(())
Dump { path: Utf8PathBuf },
}
pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
@@ -186,13 +16,5 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
println!("{output}");
Ok(())
}
IndexPartCmd::Search {
tenant_id,
timeline_id,
path,
key,
lsn,
} => search_layers(tenant_id, timeline_id, path, key, lsn).await,
IndexPartCmd::ListVisibleLayers { path } => list_visible_layers(path).await,
}
}

View File

@@ -4,7 +4,6 @@
//!
//! Separate, `metadata` subcommand allows to print and update pageserver's metadata file.
mod download_remote_object;
mod draw_timeline_dir;
mod index_part;
mod key;
@@ -17,7 +16,6 @@ use std::time::{Duration, SystemTime};
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use download_remote_object::DownloadRemoteObjectCmd;
use index_part::IndexPartCmd;
use layers::LayerCmd;
use page_trace::PageTraceCmd;
@@ -65,7 +63,6 @@ enum Commands {
/// Debug print a hex key found from logs
Key(key::DescribeKeyCommand),
PageTrace(PageTraceCmd),
DownloadRemoteObject(DownloadRemoteObjectCmd),
}
/// Read and update pageserver metadata file
@@ -188,9 +185,6 @@ async fn main() -> anyhow::Result<()> {
}
Commands::Key(dkc) => dkc.execute(),
Commands::PageTrace(cmd) => page_trace::main(&cmd)?,
Commands::DownloadRemoteObject(cmd) => {
download_remote_object::main(&cmd).await?;
}
};
Ok(())
}

View File

@@ -153,7 +153,7 @@ message GetDbSizeResponse {
message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream.
RequestID request_id = 1;
uint64 request_id = 1;
// The request class.
GetPageClass request_class = 2;
// The LSN to read at.
@@ -177,14 +177,6 @@ message GetPageRequest {
repeated uint32 block_number = 5;
}
// A Request ID. Should be unique for in-flight requests on a stream. Included in the response.
message RequestID {
// The base request ID.
uint64 id = 1;
// The request attempt. Starts at 0, incremented on each retry.
uint32 attempt = 2;
}
// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
@@ -207,26 +199,13 @@ enum GetPageClass {
// the entire batch is ready, so no one can make use of the individual pages.
message GetPageResponse {
// The original request's ID.
RequestID request_id = 1;
// The response status code. If not OK, the rel and page fields will be empty.
uint64 request_id = 1;
// The response status code.
GetPageStatusCode status_code = 2;
// A string describing the status, if any.
string reason = 3;
// The relation that the pages belong to.
RelTag rel = 4;
// The page(s), in the same order as the request.
repeated Page page = 5;
}
// A page.
//
// TODO: it would be slightly more efficient (but less convenient) to have separate arrays of block
// numbers and images, but given the 8KB page size it's probably negligible. Benchmark it anyway.
message Page {
// The page number.
uint32 block_number = 1;
// The materialized page image, as an 8KB byte vector.
bytes image = 2;
// The 8KB page images, in the same order as the request. Empty if status_code != OK.
repeated bytes page_image = 4;
}
// A GetPageResponse status code.

View File

@@ -1,152 +1,23 @@
use anyhow::Context as _;
use futures::future::ready;
use anyhow::Result;
use futures::{Stream, StreamExt as _, TryStreamExt as _};
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;
use tonic::codec::CompressionEncoding;
use tonic::metadata::AsciiMetadataValue;
use tonic::service::Interceptor;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::{Channel, Endpoint};
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::transport::Channel;
use tonic::{Request, Streaming};
use utils::id::{TenantId, TimelineId};
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::shard::ShardIndex;
use crate::model::*;
use crate::model;
use crate::proto;
/// A basic Pageserver gRPC client, for a single tenant shard. This API uses native Rust domain
/// types from `model` rather than generated Protobuf types.
pub struct Client {
inner: proto::PageServiceClient<InterceptedService<Channel, AuthInterceptor>>,
}
impl Client {
/// Connects to the given gRPC endpoint.
pub async fn connect<E>(
endpoint: E,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self>
where
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
let endpoint: Endpoint = endpoint.try_into().context("invalid endpoint")?;
let channel = endpoint.connect().await?;
Self::new(
channel,
tenant_id,
timeline_id,
shard_id,
auth_token,
compression,
)
}
/// Creates a new client using the given gRPC channel.
pub fn new(
channel: Channel,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
let auth = AuthInterceptor::new(tenant_id, timeline_id, shard_id, auth_token)?;
let mut inner = proto::PageServiceClient::with_interceptor(channel, auth);
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
inner = inner
.accept_compressed(compression)
.send_compressed(compression);
}
Ok(Self { inner })
}
/// Returns whether a relation exists.
pub async fn check_rel_exists(
&mut self,
req: CheckRelExistsRequest,
) -> tonic::Result<CheckRelExistsResponse> {
let req = proto::CheckRelExistsRequest::from(req);
let resp = self.inner.check_rel_exists(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches a base backup.
pub async fn get_base_backup(
&mut self,
req: GetBaseBackupRequest,
) -> tonic::Result<impl AsyncRead + use<>> {
let req = proto::GetBaseBackupRequest::from(req);
let chunks = self.inner.get_base_backup(req).await?.into_inner();
Ok(StreamReader::new(
chunks
.map_ok(|resp| resp.chunk)
.map_err(std::io::Error::other),
))
}
/// Returns the total size of a database, as # of bytes.
pub async fn get_db_size(&mut self, req: GetDbSizeRequest) -> tonic::Result<GetDbSizeResponse> {
let req = proto::GetDbSizeRequest::from(req);
let resp = self.inner.get_db_size(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches pages.
///
/// This is implemented as a bidirectional streaming RPC for performance. Per-request errors are
/// typically returned as status_code instead of errors, to avoid tearing down the entire stream
/// via a tonic::Status error.
pub async fn get_pages(
&mut self,
reqs: impl Stream<Item = GetPageRequest> + Send + 'static,
) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> {
let reqs = reqs.map(proto::GetPageRequest::from);
let resps = self.inner.get_pages(reqs).await?.into_inner();
Ok(resps.and_then(|resp| ready(GetPageResponse::try_from(resp).map_err(|err| err.into()))))
}
/// Returns the size of a relation, as # of blocks.
pub async fn get_rel_size(
&mut self,
req: GetRelSizeRequest,
) -> tonic::Result<GetRelSizeResponse> {
let req = proto::GetRelSizeRequest::from(req);
let resp = self.inner.get_rel_size(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches an SLRU segment.
pub async fn get_slru_segment(
&mut self,
req: GetSlruSegmentRequest,
) -> tonic::Result<GetSlruSegmentResponse> {
let req = proto::GetSlruSegmentRequest::from(req);
let resp = self.inner.get_slru_segment(req).await?.into_inner();
Ok(resp.try_into()?)
}
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
/// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
///
/// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
/// acquired because the LSN has already been garbage collected.
pub async fn lease_lsn(&mut self, req: LeaseLsnRequest) -> tonic::Result<LeaseLsnResponse> {
let req = proto::LeaseLsnRequest::from(req);
let resp = self.inner.lease_lsn(req).await?.into_inner();
Ok(resp.try_into()?)
}
}
/// Adds authentication metadata to gRPC requests.
///
/// AuthInterceptor adds tenant, timeline, and auth header to the channel. These
/// headers are required at the pageserver.
///
#[derive(Clone)]
struct AuthInterceptor {
tenant_id: AsciiMetadataValue,
@@ -159,29 +30,174 @@ impl AuthInterceptor {
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
shard_id: ShardIndex,
) -> Result<Self, InvalidMetadataValue> {
let tenant_ascii: AsciiMetadataValue = tenant_id.to_string().try_into()?;
let timeline_ascii: AsciiMetadataValue = timeline_id.to_string().try_into()?;
let shard_ascii: AsciiMetadataValue = shard_id.to_string().try_into()?;
let auth_header: Option<AsciiMetadataValue> = match auth_token {
Some(token) => Some(format!("Bearer {token}").try_into()?),
None => None,
};
Ok(Self {
tenant_id: tenant_id.to_string().try_into()?,
timeline_id: timeline_id.to_string().try_into()?,
shard_id: shard_id.to_string().try_into()?,
auth_header: auth_token
.map(|token| format!("Bearer {token}").try_into())
.transpose()?,
tenant_id: tenant_ascii,
shard_id: shard_ascii,
timeline_id: timeline_ascii,
auth_header,
})
}
}
impl Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> tonic::Result<tonic::Request<()>> {
let metadata = req.metadata_mut();
metadata.insert("neon-tenant-id", self.tenant_id.clone());
metadata.insert("neon-timeline-id", self.timeline_id.clone());
metadata.insert("neon-shard-id", self.shard_id.clone());
if let Some(ref auth_header) = self.auth_header {
metadata.insert("authorization", auth_header.clone());
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut()
.insert("neon-tenant-id", self.tenant_id.clone());
req.metadata_mut()
.insert("neon-shard-id", self.shard_id.clone());
req.metadata_mut()
.insert("neon-timeline-id", self.timeline_id.clone());
if let Some(auth_header) = &self.auth_header {
req.metadata_mut()
.insert("authorization", auth_header.clone());
}
Ok(req)
}
}
#[derive(Clone)]
pub struct Client {
client: proto::PageServiceClient<
tonic::service::interceptor::InterceptedService<Channel, AuthInterceptor>,
>,
}
impl Client {
pub async fn new<T: TryInto<tonic::transport::Endpoint> + Send + Sync + 'static>(
into_endpoint: T,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_header: Option<String>,
compression: Option<tonic::codec::CompressionEncoding>,
) -> anyhow::Result<Self> {
let endpoint: tonic::transport::Endpoint = into_endpoint
.try_into()
.map_err(|_e| anyhow::anyhow!("failed to convert endpoint"))?;
let channel = endpoint.connect().await?;
let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id)
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let mut client = proto::PageServiceClient::with_interceptor(channel, auth);
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
client = client
.accept_compressed(compression)
.send_compressed(compression);
}
Ok(Self { client })
}
/// Returns whether a relation exists.
pub async fn check_rel_exists(
&mut self,
req: model::CheckRelExistsRequest,
) -> Result<model::CheckRelExistsResponse, tonic::Status> {
let proto_req = proto::CheckRelExistsRequest::from(req);
let response = self.client.check_rel_exists(proto_req).await?;
let proto_resp = response.into_inner();
Ok(proto_resp.into())
}
/// Fetches a base backup.
pub async fn get_base_backup(
&mut self,
req: model::GetBaseBackupRequest,
) -> Result<impl AsyncRead + use<>, tonic::Status> {
let req = proto::GetBaseBackupRequest::from(req);
let chunks = self.client.get_base_backup(req).await?.into_inner();
let reader = StreamReader::new(
chunks
.map_ok(|resp| resp.chunk)
.map_err(std::io::Error::other),
);
Ok(reader)
}
/// Returns the total size of a database, as # of bytes.
pub async fn get_db_size(
&mut self,
req: model::GetDbSizeRequest,
) -> Result<u64, tonic::Status> {
let proto_req = proto::GetDbSizeRequest::from(req);
let response = self.client.get_db_size(proto_req).await?;
Ok(response.into_inner().into())
}
/// Fetches pages.
///
/// This is implemented as a bidirectional streaming RPC for performance.
/// Per-request errors are often returned as status_code instead of errors,
/// to avoid tearing down the entire stream via tonic::Status.
pub async fn get_pages<ReqSt>(
&mut self,
inbound: ReqSt,
) -> Result<
impl Stream<Item = Result<model::GetPageResponse, tonic::Status>> + Send + 'static,
tonic::Status,
>
where
ReqSt: Stream<Item = model::GetPageRequest> + Send + 'static,
{
let outbound_proto = inbound.map(|domain_req| domain_req.into());
let req_new = Request::new(outbound_proto);
let response_stream: Streaming<proto::GetPageResponse> =
self.client.get_pages(req_new).await?.into_inner();
let domain_stream = response_stream.map_ok(model::GetPageResponse::from);
Ok(domain_stream)
}
/// Returns the size of a relation, as # of blocks.
pub async fn get_rel_size(
&mut self,
req: model::GetRelSizeRequest,
) -> Result<model::GetRelSizeResponse, tonic::Status> {
let proto_req = proto::GetRelSizeRequest::from(req);
let response = self.client.get_rel_size(proto_req).await?;
let proto_resp = response.into_inner();
Ok(proto_resp.into())
}
/// Fetches an SLRU segment.
pub async fn get_slru_segment(
&mut self,
req: model::GetSlruSegmentRequest,
) -> Result<model::GetSlruSegmentResponse, tonic::Status> {
let proto_req = proto::GetSlruSegmentRequest::from(req);
let response = self.client.get_slru_segment(proto_req).await?;
Ok(response.into_inner().try_into()?)
}
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
/// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
///
/// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
/// acquired because the LSN has already been garbage collected.
pub async fn lease_lsn(
&mut self,
req: model::LeaseLsnRequest,
) -> Result<model::LeaseLsnResponse, tonic::Status> {
let req = proto::LeaseLsnRequest::from(req);
Ok(self.client.lease_lsn(req).await?.into_inner().try_into()?)
}
}

View File

@@ -356,10 +356,7 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
return Err(ProtocolError::Missing("block_number"));
}
Ok(Self {
request_id: pb
.request_id
.ok_or(ProtocolError::Missing("request_id"))?
.into(),
request_id: pb.request_id,
request_class: pb.request_class.into(),
read_lsn: pb
.read_lsn
@@ -374,7 +371,7 @@ impl TryFrom<proto::GetPageRequest> for GetPageRequest {
impl From<GetPageRequest> for proto::GetPageRequest {
fn from(request: GetPageRequest) -> Self {
Self {
request_id: Some(request.request_id.into()),
request_id: request.request_id,
request_class: request.request_class.into(),
read_lsn: Some(request.read_lsn.into()),
rel: Some(request.rel.into()),
@@ -383,54 +380,11 @@ impl From<GetPageRequest> for proto::GetPageRequest {
}
}
/// A GetPage request ID and retry attempt. Should be unique for in-flight requests on a stream.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct RequestID {
/// The base request ID.
pub id: u64,
// The request attempt. Starts at 0, incremented on each retry.
pub attempt: u32,
}
impl RequestID {
/// Creates a new RequestID with the given ID and an initial attempt of 0.
pub fn new(id: u64) -> Self {
Self { id, attempt: 0 }
}
}
impl Display for RequestID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}.{}", self.id, self.attempt)
}
}
impl From<proto::RequestId> for RequestID {
fn from(pb: proto::RequestId) -> Self {
Self {
id: pb.id,
attempt: pb.attempt,
}
}
}
impl From<u64> for RequestID {
fn from(id: u64) -> Self {
Self::new(id)
}
}
impl From<RequestID> for proto::RequestId {
fn from(request_id: RequestID) -> Self {
Self {
id: request_id.id,
attempt: request_id.attempt,
}
}
}
/// A GetPage request ID.
pub type RequestID = u64;
/// A GetPage request class.
#[derive(Clone, Copy, Debug, strum_macros::Display)]
#[derive(Clone, Copy, Debug)]
pub enum GetPageClass {
/// Unknown class. For backwards compatibility: used when an older client version sends a class
/// that a newer server version has removed.
@@ -443,19 +397,6 @@ pub enum GetPageClass {
Background,
}
impl GetPageClass {
/// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than
/// latency-sensitive).
pub fn is_bulk(&self) -> bool {
match self {
Self::Unknown => false,
Self::Normal => false,
Self::Prefetch => true,
Self::Background => true,
}
}
}
impl From<proto::GetPageClass> for GetPageClass {
fn from(pb: proto::GetPageClass) -> Self {
match pb {
@@ -502,41 +443,32 @@ impl From<GetPageClass> for i32 {
pub struct GetPageResponse {
/// The original request's ID.
pub request_id: RequestID,
/// The response status code. If not OK, the `rel` and `pages` fields will be empty.
/// The response status code.
pub status_code: GetPageStatusCode,
/// A string describing the status, if any.
pub reason: Option<String>,
/// The relation that the pages belong to.
pub rel: RelTag,
// The page(s), in the same order as the request.
pub pages: Vec<Page>,
/// The 8KB page images, in the same order as the request. Empty if status != OK.
pub page_images: Vec<Bytes>,
}
impl TryFrom<proto::GetPageResponse> for GetPageResponse {
type Error = ProtocolError;
fn try_from(pb: proto::GetPageResponse) -> Result<Self, ProtocolError> {
Ok(Self {
request_id: pb
.request_id
.ok_or(ProtocolError::Missing("request_id"))?
.into(),
impl From<proto::GetPageResponse> for GetPageResponse {
fn from(pb: proto::GetPageResponse) -> Self {
Self {
request_id: pb.request_id,
status_code: pb.status_code.into(),
reason: Some(pb.reason).filter(|r| !r.is_empty()),
rel: pb.rel.ok_or(ProtocolError::Missing("rel"))?.try_into()?,
pages: pb.page.into_iter().map(Page::from).collect(),
})
page_images: pb.page_image,
}
}
}
impl From<GetPageResponse> for proto::GetPageResponse {
fn from(response: GetPageResponse) -> Self {
Self {
request_id: Some(response.request_id.into()),
request_id: response.request_id,
status_code: response.status_code.into(),
reason: response.reason.unwrap_or_default(),
rel: Some(response.rel.into()),
page: response.pages.into_iter().map(proto::Page::from).collect(),
page_image: response.page_images,
}
}
}
@@ -569,39 +501,11 @@ impl GetPageResponse {
request_id,
status_code,
reason: Some(status.message().to_string()),
rel: RelTag::default(),
pages: Vec::new(),
page_images: Vec::new(),
})
}
}
// A page.
#[derive(Clone, Debug)]
pub struct Page {
/// The page number.
pub block_number: u32,
/// The materialized page image, as an 8KB byte vector.
pub image: Bytes,
}
impl From<proto::Page> for Page {
fn from(pb: proto::Page) -> Self {
Self {
block_number: pb.block_number,
image: pb.image,
}
}
}
impl From<Page> for proto::Page {
fn from(page: Page) -> Self {
Self {
block_number: page.block_number,
image: page.image,
}
}
}
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
@@ -698,21 +602,6 @@ impl TryFrom<tonic::Code> for GetPageStatusCode {
}
}
impl From<GetPageStatusCode> for tonic::Code {
fn from(status_code: GetPageStatusCode) -> Self {
use tonic::Code;
match status_code {
GetPageStatusCode::Unknown => Code::Unknown,
GetPageStatusCode::Ok => Code::Ok,
GetPageStatusCode::NotFound => Code::NotFound,
GetPageStatusCode::InvalidRequest => Code::InvalidArgument,
GetPageStatusCode::InternalError => Code::Internal,
GetPageStatusCode::SlowDown => Code::ResourceExhausted,
}
}
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
// shards will error.
#[derive(Clone, Copy, Debug)]

View File

@@ -27,9 +27,8 @@ tokio-util.workspace = true
tonic.workspace = true
url.workspace = true
pageserver_api.workspace = true
pageserver_client.workspace = true
pageserver_client_grpc.workspace = true
pageserver_api.workspace = true
pageserver_page_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -326,7 +326,7 @@ impl GrpcClient {
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = page_api::Client::connect(
let inner = page_api::Client::new(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,

View File

@@ -10,14 +10,12 @@ use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use camino::Utf8PathBuf;
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt as _};
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::pagestream_api::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::TenantShardId;
use pageserver_client_grpc::{self as client_grpc, ShardSpec};
use pageserver_page_api as page_api;
use rand::prelude::*;
use tokio::task::JoinSet;
@@ -39,10 +37,6 @@ pub(crate) struct Args {
/// Pageserver connection string. Supports postgresql:// and grpc:// protocols.
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
page_service_connstring: String,
/// Use the rich gRPC Pageserver client `client_grpc::PageserverClient`, rather than the basic
/// no-frills `page_api::Client`. Only valid with grpc:// connstrings.
#[clap(long)]
rich_client: bool,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
@@ -338,7 +332,6 @@ async fn main_impl(
let client: Box<dyn Client> = match scheme.as_str() {
"postgresql" | "postgres" => {
assert!(!args.compression, "libpq does not support compression");
assert!(!args.rich_client, "rich client requires grpc://");
Box::new(
LibpqClient::new(&args.page_service_connstring, worker_id.timeline)
.await
@@ -346,16 +339,6 @@ async fn main_impl(
)
}
"grpc" if args.rich_client => Box::new(
RichGrpcClient::new(
&args.page_service_connstring,
worker_id.timeline,
args.compression,
)
.await
.unwrap(),
),
"grpc" => Box::new(
GrpcClient::new(
&args.page_service_connstring,
@@ -642,7 +625,7 @@ impl GrpcClient {
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let mut client = page_api::Client::connect(
let mut client = page_api::Client::new(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,
@@ -674,7 +657,7 @@ impl Client for GrpcClient {
blks: Vec<u32>,
) -> anyhow::Result<()> {
let req = page_api::GetPageRequest {
request_id: req_id.into(),
request_id: req_id,
request_class: page_api::GetPageClass::Normal,
read_lsn: page_api::ReadLsn {
request_lsn: req_lsn,
@@ -694,79 +677,6 @@ impl Client for GrpcClient {
"unexpected status code: {}",
resp.status_code,
);
Ok((
resp.request_id.id,
resp.pages.into_iter().map(|p| p.image).collect(),
))
}
}
/// A rich gRPC Pageserver client.
struct RichGrpcClient {
inner: Arc<client_grpc::PageserverClient>,
requests: FuturesUnordered<
Pin<Box<dyn Future<Output = anyhow::Result<page_api::GetPageResponse>> + Send>>,
>,
}
impl RichGrpcClient {
async fn new(
connstring: &str,
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = Arc::new(client_grpc::PageserverClient::new(
ttid.tenant_id,
ttid.timeline_id,
ShardSpec::new(
[(ShardIndex::unsharded(), connstring.to_string())].into(),
None,
)?,
None,
compression.then_some(tonic::codec::CompressionEncoding::Zstd),
)?);
Ok(Self {
inner,
requests: FuturesUnordered::new(),
})
}
}
#[async_trait]
impl Client for RichGrpcClient {
async fn send_get_page(
&mut self,
req_id: u64,
req_lsn: Lsn,
mod_lsn: Lsn,
rel: RelTag,
blks: Vec<u32>,
) -> anyhow::Result<()> {
let req = page_api::GetPageRequest {
request_id: req_id.into(),
request_class: page_api::GetPageClass::Normal,
read_lsn: page_api::ReadLsn {
request_lsn: req_lsn,
not_modified_since_lsn: Some(mod_lsn),
},
rel,
block_numbers: blks,
};
let inner = self.inner.clone();
self.requests.push(Box::pin(async move {
inner
.get_page(req)
.await
.map_err(|err| anyhow::anyhow!("{err}"))
}));
Ok(())
}
async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
let resp = self.requests.next().await.unwrap()?;
Ok((
resp.request_id.id,
resp.pages.into_iter().map(|p| p.image).collect(),
))
Ok((resp.request_id, resp.page_images))
}
}

View File

@@ -29,8 +29,8 @@ use pageserver::task_mgr::{
};
use pageserver::tenant::{TenantSharedResources, mgr, secondary};
use pageserver::{
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener,
MetricsCollectionTask, http, page_cache, page_service, task_mgr, virtual_file,
CancellableTask, ConsumptionMetricsTasks, HttpEndpointListener, HttpsEndpointListener, http,
page_cache, page_service, task_mgr, virtual_file,
};
use postgres_backend::AuthType;
use remote_storage::GenericRemoteStorage;
@@ -41,7 +41,6 @@ use tracing_utils::OtelGuard;
use utils::auth::{JwtAuth, SwappableJwtAuth};
use utils::crashsafe::syncfs;
use utils::logging::TracingErrorLayerEnablement;
use utils::metrics_collector::{METRICS_COLLECTION_INTERVAL, METRICS_COLLECTOR};
use utils::sentry_init::init_sentry;
use utils::{failpoint_support, logging, project_build_tag, project_git_version, tcp_listener};
@@ -764,41 +763,6 @@ fn start_pageserver(
(http_task, https_task)
};
/* BEGIN_HADRON */
let metrics_collection_task = {
let cancel = shutdown_pageserver.child_token();
let task = crate::BACKGROUND_RUNTIME.spawn({
let cancel = cancel.clone();
let background_jobs_barrier = background_jobs_barrier.clone();
async move {
if conf.force_metric_collection_on_scrape {
return;
}
// first wait until background jobs are cleared to launch.
tokio::select! {
_ = cancel.cancelled() => { return; },
_ = background_jobs_barrier.wait() => {}
};
let mut interval = tokio::time::interval(METRICS_COLLECTION_INTERVAL);
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("cancelled metrics collection task, exiting...");
break;
},
_ = interval.tick() => {}
}
tokio::task::spawn_blocking(|| {
METRICS_COLLECTOR.run_once(true);
});
}
}
});
MetricsCollectionTask(CancellableTask { task, cancel })
};
/* END_HADRON */
let consumption_metrics_tasks = {
let cancel = shutdown_pageserver.child_token();
let task = crate::BACKGROUND_RUNTIME.spawn({
@@ -880,7 +844,6 @@ fn start_pageserver(
https_endpoint_listener,
page_service,
page_service_grpc,
metrics_collection_task,
consumption_metrics_tasks,
disk_usage_eviction_task,
&tenant_manager,
@@ -926,11 +889,8 @@ async fn create_remote_storage_client(
"Simulating remote failures for first {} attempts of each op",
conf.test_remote_failures
);
remote_storage = GenericRemoteStorage::unreliable_wrapper(
remote_storage,
conf.test_remote_failures,
conf.test_remote_failures_probability,
);
remote_storage =
GenericRemoteStorage::unreliable_wrapper(remote_storage, conf.test_remote_failures);
}
Ok(remote_storage)

View File

@@ -28,6 +28,7 @@ use reqwest::Url;
use storage_broker::Uri;
use utils::id::{NodeId, TimelineId};
use utils::logging::{LogFormat, SecretString};
use utils::serde_percent::Percent;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -145,13 +146,9 @@ pub struct PageServerConf {
pub metric_collection_bucket: Option<RemoteStorageConfig>,
pub synthetic_size_calculation_interval: Duration,
pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
// The number of allowed failures in remote storage operations.
pub test_remote_failures: u64,
// The probability of failure in remote storage operations. Only works when test_remote_failures > 1.
// Use 100 for 100% failure, 0 for no failure.
pub test_remote_failures_probability: u64,
pub ondemand_download_behavior_treat_error_as_warn: bool,
@@ -252,14 +249,6 @@ pub struct PageServerConf {
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
/// Defines what is a big tenant for the purpose of image layer generation.
/// See Timeline::should_check_if_image_layers_required
pub image_layer_generation_large_timeline_threshold: Option<u64>,
/// Controls whether to collect all metrics on each scrape or to return potentially stale
/// results.
pub force_metric_collection_on_scrape: bool,
}
/// Token for authentication to safekeepers
@@ -404,7 +393,6 @@ impl PageServerConf {
synthetic_size_calculation_interval,
disk_usage_based_eviction,
test_remote_failures,
test_remote_failures_probability,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api,
@@ -440,8 +428,6 @@ impl PageServerConf {
posthog_config,
timeline_import_config,
basebackup_cache_config,
image_layer_generation_large_timeline_threshold,
force_metric_collection_on_scrape,
} = config_toml;
let mut conf = PageServerConf {
@@ -474,9 +460,17 @@ impl PageServerConf {
metric_collection_endpoint,
metric_collection_bucket,
synthetic_size_calculation_interval,
disk_usage_based_eviction,
disk_usage_based_eviction: Some(disk_usage_based_eviction.unwrap_or(
DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 2_000_000_000,
period: Duration::from_secs(60),
#[cfg(feature = "testing")]
mock_statvfs: None,
eviction_order: Default::default(),
},
)),
test_remote_failures,
test_remote_failures_probability,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api: control_plane_api
@@ -500,8 +494,6 @@ impl PageServerConf {
dev_mode,
timeline_import_config,
basebackup_cache_config,
image_layer_generation_large_timeline_threshold,
force_metric_collection_on_scrape,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -643,7 +635,7 @@ impl PageServerConf {
pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
let mut config_toml = pageserver_api::config::ConfigToml {
let config_toml = pageserver_api::config::ConfigToml {
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
pg_distrib_dir: Some(pg_distrib_dir),
@@ -655,15 +647,6 @@ impl PageServerConf {
control_plane_api: Some(Url::parse("http://localhost:6666").unwrap()),
..Default::default()
};
// Test authors tend to forget about the default 10min initial lease deadline
// when writing tests, which turns their immediate gc requests via mgmt API
// into no-ops. Override the binary default here, such that there is no initial
// lease deadline by default in tests. Tests that care can always override it
// themselves.
// Cf https://databricks.atlassian.net/browse/LKB-92?focusedCommentId=6722329
config_toml.tenant_config.lsn_lease_length = Duration::from_secs(0);
PageServerConf::parse_and_validate(NodeId(0), config_toml, &repo_dir).unwrap()
}
}
@@ -727,9 +710,8 @@ mod tests {
use std::time::Duration;
use camino::Utf8PathBuf;
use pageserver_api::config::{DiskUsageEvictionTaskConfig, EvictionOrder};
use rstest::rstest;
use utils::{id::NodeId, serde_percent::Percent};
use utils::id::NodeId;
use super::PageServerConf;
@@ -829,69 +811,19 @@ mod tests {
.expect("parse_and_validate");
}
#[rstest]
#[
case::omit_the_whole_config(
DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 2_000_000_000,
period: Duration::from_secs(60),
eviction_order: Default::default(),
#[cfg(feature = "testing")]
mock_statvfs: None,
enabled: true,
},
r#"
#[test]
fn test_config_disk_usage_based_eviction_is_valid() {
let input = r#"
control_plane_api = "http://localhost:6666"
"#,
)]
#[
case::omit_enabled_field(
DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 1_000_000_000,
period: Duration::from_secs(60),
eviction_order: EvictionOrder::RelativeAccessed {
highest_layer_count_loses_first: true,
},
#[cfg(feature = "testing")]
mock_statvfs: None,
enabled: true,
},
r#"
control_plane_api = "http://localhost:6666"
disk_usage_based_eviction = { max_usage_pct = 80, min_avail_bytes = 1000000000, period = "60s" }
"#,
)]
#[case::disabled(
DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 2_000_000_000,
period: Duration::from_secs(60),
eviction_order: EvictionOrder::RelativeAccessed {
highest_layer_count_loses_first: true,
},
#[cfg(feature = "testing")]
mock_statvfs: None,
enabled: false,
},
r#"
control_plane_api = "http://localhost:6666"
disk_usage_based_eviction = { enabled = false }
"#
)]
fn test_config_disk_usage_based_eviction_is_valid(
#[case] expected_disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
#[case] input: &str,
) {
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("disk_usage_based_eviction is valid");
let workdir = Utf8PathBuf::from("/nonexistent");
let config = PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir).unwrap();
let disk_usage_based_eviction = config.disk_usage_based_eviction;
assert_eq!(
expected_disk_usage_based_eviction,
disk_usage_based_eviction
);
let disk_usage_based_eviction = config.disk_usage_based_eviction.unwrap();
assert_eq!(disk_usage_based_eviction.max_usage_pct.get(), 80);
assert_eq!(disk_usage_based_eviction.min_avail_bytes, 2_000_000_000);
assert_eq!(disk_usage_based_eviction.period, Duration::from_secs(60));
assert_eq!(disk_usage_based_eviction.eviction_order, Default::default());
}
}

View File

@@ -171,8 +171,7 @@ pub fn launch_disk_usage_global_eviction_task(
tenant_manager: Arc<TenantManager>,
background_jobs_barrier: completion::Barrier,
) -> Option<DiskUsageEvictionTask> {
let task_config = &conf.disk_usage_based_eviction;
if !task_config.enabled {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
return None;
};
@@ -459,9 +458,6 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
match next {
Ok(Ok(file_size)) => {
METRICS.layers_evicted.inc();
/*BEGIN_HADRON */
METRICS.bytes_evicted.inc_by(file_size);
/*END_HADRON */
usage_assumed.add_available_bytes(file_size);
}
Ok(Err((
@@ -1269,7 +1265,6 @@ mod filesystem_level_usage {
#[cfg(feature = "testing")]
mock_statvfs: None,
eviction_order: pageserver_api::config::EvictionOrder::default(),
enabled: true,
},
total_bytes: 100_000,
avail_bytes: 0,

View File

@@ -1,8 +1,4 @@
use std::{
collections::HashMap,
sync::{Arc, atomic::AtomicBool},
time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use pageserver_api::config::NodeMetadata;
@@ -359,17 +355,11 @@ impl PerTenantProperties {
}
}
#[derive(Clone)]
pub struct TenantFeatureResolver {
inner: FeatureResolver,
tenant_id: TenantId,
cached_tenant_properties: ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>,
// Add feature flag on the critical path below.
//
// If a feature flag will be used on the critical path, we will update it in the tenant housekeeping loop insetad of
// resolving directly by calling `evaluate_multivariate` or `evaluate_boolean`. Remember to update the flag in the
// housekeeping loop. The user should directly read this atomic flag instead of using the set of evaluate functions.
pub feature_test_remote_size_flag: AtomicBool,
cached_tenant_properties: Arc<ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>>,
}
impl TenantFeatureResolver {
@@ -377,8 +367,7 @@ impl TenantFeatureResolver {
Self {
inner,
tenant_id,
cached_tenant_properties: ArcSwap::new(Arc::new(HashMap::new())),
feature_test_remote_size_flag: AtomicBool::new(false),
cached_tenant_properties: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
}
}
@@ -407,14 +396,12 @@ impl TenantFeatureResolver {
self.inner.is_feature_flag_boolean(flag_key)
}
/// Refresh the cached properties and flags on the critical path.
pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
let mut remote_size_mb = Some(0.0);
pub fn update_cached_tenant_properties(&self, tenant_shard: &TenantShard) {
let mut remote_size_mb = None;
for timeline in tenant_shard.list_timelines() {
let size = timeline.metrics.resident_physical_size_get();
if size == 0 {
remote_size_mb = None;
break;
}
if let Some(ref mut remote_size_mb) = remote_size_mb {
*remote_size_mb += size as f64 / 1024.0 / 1024.0;
@@ -423,12 +410,5 @@ impl TenantFeatureResolver {
self.cached_tenant_properties.store(Arc::new(
PerTenantProperties { remote_size_mb }.into_posthog_properties(),
));
// BEGIN: Update the feature flag on the critical path.
self.feature_test_remote_size_flag.store(
self.evaluate_boolean("test-remote-size-flag").is_ok(),
std::sync::atomic::Ordering::Relaxed,
);
// END: Update the feature flag on the critical path.
}
}

View File

@@ -116,6 +116,26 @@ paths:
schema:
type: string
/v1/tenant/{tenant_id}/timeline:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
get:
description: Get timelines for tenant
responses:
"200":
description: TimelineInfo
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/TimelineInfo"
/v1/tenant/{tenant_id}/timeline/{timeline_id}:
parameters:
- name: tenant_id
@@ -598,7 +618,7 @@ paths:
schema:
$ref: "#/components/schemas/SecondaryProgress"
/v1/tenant/{tenant_id}/timeline:
/v1/tenant/{tenant_id}/timeline/:
parameters:
- name: tenant_id
in: path
@@ -665,17 +685,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
get:
description: Get timelines for tenant
responses:
"200":
description: TimelineInfo
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/TimelineInfo"
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor:
parameters:
@@ -758,7 +767,7 @@ paths:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant:
/v1/tenant/:
get:
description: Get tenants list
responses:
@@ -838,7 +847,7 @@ paths:
items:
$ref: "#/components/schemas/TenantInfo"
/v1/tenant/{tenant_id}/config:
/v1/tenant/{tenant_id}/config/:
parameters:
- name: tenant_id
in: path

View File

@@ -2,9 +2,7 @@
//! Management HTTP API
//!
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::collections::BinaryHeap;
use std::collections::HashMap;
use std::collections::{BinaryHeap, HashMap};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -63,7 +61,6 @@ use crate::context;
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::deletion_queue::DeletionQueueClient;
use crate::feature_resolver::FeatureResolver;
use crate::metrics::LOCAL_DATA_LOSS_SUSPECTED;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationConf;
@@ -81,8 +78,8 @@ use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerNa
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
use crate::tenant::timeline::{
CompactFlags, CompactOptions, CompactRequest, MarkInvisibleRequest, Timeline, WaitLsnTimeout,
WaitLsnWaiter, import_pgdata,
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
WaitLsnTimeout, WaitLsnWaiter, import_pgdata,
};
use crate::tenant::{
GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError,
@@ -397,7 +394,6 @@ async fn build_timeline_info(
timeline: &Arc<Timeline>,
include_non_incremental_logical_size: bool,
force_await_initial_logical_size: bool,
include_image_consistent_lsn: bool,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
@@ -422,10 +418,6 @@ async fn build_timeline_info(
.await?,
);
}
// HADRON
if include_image_consistent_lsn {
info.image_consistent_lsn = Some(timeline.compute_image_consistent_lsn().await?);
}
Ok(info)
}
@@ -515,8 +507,6 @@ async fn build_timeline_info_common(
is_invisible: Some(is_invisible),
walreceiver_status,
// HADRON
image_consistent_lsn: None,
};
Ok(info)
}
@@ -719,8 +709,6 @@ async fn timeline_list_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
@@ -741,7 +729,6 @@ async fn timeline_list_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
@@ -770,9 +757,6 @@ async fn timeline_and_offloaded_list_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
@@ -793,7 +777,6 @@ async fn timeline_and_offloaded_list_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
&ctx,
)
.instrument(info_span!("build_timeline_info", timeline_id = %timeline.timeline_id))
@@ -978,9 +961,6 @@ async fn timeline_detail_handler(
parse_query_param(&request, "include-non-incremental-logical-size")?;
let force_await_initial_logical_size: Option<bool> =
parse_query_param(&request, "force-await-initial-logical-size")?;
// HADRON
let include_image_consistent_lsn: Option<bool> =
parse_query_param(&request, "include-image-consistent-lsn")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
// Logical size calculation needs downloading.
@@ -1001,7 +981,6 @@ async fn timeline_detail_handler(
&timeline,
include_non_incremental_logical_size.unwrap_or(false),
force_await_initial_logical_size.unwrap_or(false),
include_image_consistent_lsn.unwrap_or(false),
ctx,
)
.await
@@ -2520,10 +2499,12 @@ async fn timeline_checkpoint_handler(
.compact(&cancel, flags, &ctx)
.await
.map_err(|e|
if e.is_cancel() {
ApiError::ShuttingDown
} else {
ApiError::InternalServerError(e.into_anyhow())
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e),
CompactionError::AlreadyRunning(_) => ApiError::InternalServerError(anyhow::anyhow!(e)),
}
)?;
}
@@ -3234,30 +3215,6 @@ async fn get_utilization(
.map_err(ApiError::InternalServerError)
}
/// HADRON
async fn list_tenant_visible_size_handler(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let state = get_state(&request);
let mut map = BTreeMap::new();
for (tenant_shard_id, slot) in state.tenant_manager.list() {
match slot {
TenantSlot::Attached(tenant) => {
let visible_size = tenant.get_visible_size();
map.insert(tenant_shard_id, visible_size);
}
TenantSlot::Secondary(_) | TenantSlot::InProgress(_) => {
continue;
}
}
}
json_response(StatusCode::OK, map)
}
async fn list_aux_files(
mut request: Request<Body>,
_cancel: CancellationToken,
@@ -3661,7 +3618,6 @@ async fn activate_post_import_handler(
let timeline_info = build_timeline_info(
&timeline, false, // include_non_incremental_logical_size,
false, // force_await_initial_logical_size
false, // include_image_consistent_lsn
&ctx,
)
.await
@@ -3674,17 +3630,6 @@ async fn activate_post_import_handler(
.await
}
// [Hadron] Reset gauge metrics that are used to raised alerts. We need this API as a stop-gap measure to reset alerts
// after we manually rectify situations such as local SSD data loss. We will eventually automate this.
async fn hadron_reset_alert_gauges(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
LOCAL_DATA_LOSS_SUSPECTED.set(0);
json_response(StatusCode::OK, ())
}
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -3737,23 +3682,6 @@ async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow
Ok(())
}
async fn force_refresh_feature_flag(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant
.feature_resolver
.refresh_properties_and_flags(&tenant);
json_response(StatusCode::OK, ())
}
async fn tenant_evaluate_feature_flag(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3770,7 +3698,7 @@ async fn tenant_evaluate_feature_flag(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
// TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s)
// TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s)
// and we don't need to worry about it for now.
let properties = tenant.feature_resolver.collect_properties();
if as_type.as_deref() == Some("boolean") {
@@ -3983,14 +3911,9 @@ pub fn make_router(
.expect("construct launch timestamp header middleware"),
);
let force_metric_collection_on_scrape = state.conf.force_metric_collection_on_scrape;
let prometheus_metrics_handler_wrapper =
move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape);
Ok(router
.data(state)
.get("/metrics", move |r| request_span(r, prometheus_metrics_handler_wrapper))
.get("/metrics", |r| request_span(r, prometheus_metrics_handler))
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
.get("/v1/status", |r| api_handler(r, status_handler))
@@ -4196,7 +4119,6 @@ pub fn make_router(
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.put("/v1/io_mode", |r| api_handler(r, put_io_mode_handler))
.get("/v1/utilization", |r| api_handler(r, get_utilization))
.get("/v1/list_tenant_visible_size", |r| api_handler(r, list_tenant_visible_size_handler))
.post(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/ingest_aux_files",
|r| testing_api_handler("ingest_aux_files", r, ingest_aux_files),
@@ -4225,9 +4147,6 @@ pub fn make_router(
.get("/v1/tenant/:tenant_shard_id/feature_flag/:flag_key", |r| {
api_handler(r, tenant_evaluate_feature_flag)
})
.post("/v1/tenant/:tenant_shard_id/force_refresh_feature_flag", |r| {
api_handler(r, force_refresh_feature_flag)
})
.put("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - put", r, force_override_feature_flag_for_testing_put)
})
@@ -4237,8 +4156,5 @@ pub fn make_router(
.post("/v1/feature_flag_spec", |r| {
api_handler(r, update_feature_flag_spec)
})
.post("/hadron-internal/reset_alert_gauges", |r| {
api_handler(r, hadron_reset_alert_gauges)
})
.any(handler_404))
}

View File

@@ -73,9 +73,6 @@ pub struct HttpEndpointListener(pub CancellableTask);
pub struct HttpsEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
// HADRON
pub struct MetricsCollectionTask(pub CancellableTask);
impl CancellableTask {
pub async fn shutdown(self) {
self.cancel.cancel();
@@ -90,7 +87,6 @@ pub async fn shutdown_pageserver(
https_listener: Option<HttpsEndpointListener>,
page_service: page_service::Listener,
grpc_task: Option<CancellableTask>,
metrics_collection_task: MetricsCollectionTask,
consumption_metrics_worker: ConsumptionMetricsTasks,
disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
tenant_manager: &TenantManager,
@@ -215,14 +211,6 @@ pub async fn shutdown_pageserver(
// Best effort to persist any outstanding deletions, to avoid leaking objects
deletion_queue.shutdown(Duration::from_secs(5)).await;
// HADRON
timed(
metrics_collection_task.0.shutdown(),
"shutdown metrics collections metrics",
Duration::from_secs(1),
)
.await;
timed(
consumption_metrics_worker.0.shutdown(),
"shutdown consumption metrics",

View File

@@ -1,4 +1,3 @@
use std::cell::Cell;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::RawFd;
@@ -103,18 +102,7 @@ pub(crate) static STORAGE_TIME_COUNT_PER_TIMELINE: Lazy<IntCounterVec> = Lazy::n
.expect("failed to define a metric")
});
/* BEGIN_HADRON */
pub(crate) static STORAGE_ACTIVE_COUNT_PER_TIMELINE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_active_storage_operations_count",
"Count of active storage operations with operation, tenant and timeline dimensions",
&["operation", "tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});
/*END_HADRON */
// Buckets for background operations like compaction, GC, size calculation
// Buckets for background operation duration in seconds, like compaction, GC, size calculation.
const STORAGE_OP_BUCKETS: &[f64] = &[0.010, 0.100, 1.0, 10.0, 100.0, 1000.0];
pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
@@ -2822,49 +2810,6 @@ pub(crate) static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
pub(crate) static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
pub(crate) static LOCAL_DATA_LOSS_SUSPECTED: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_local_data_loss_suspected",
"Non-zero value indicates that pageserver local data loss is suspected (and highly likely)."
)
.expect("failed to define a metric")
});
// Counter keeping track of misrouted PageStream requests. Spelling out PageStream requests here to distinguish
// it from other types of reqeusts (SK wal replication, http requests, etc.). PageStream requests are used by
// Postgres compute to fetch data from pageservers.
// A misrouted PageStream request is registered if the pageserver cannot find the tenant identified in the
// request, or if the pageserver is not the "primary" serving the tenant shard. These error almost always identify
// issues with compute configuration, caused by either the compute node itself being stuck in the wrong
// configuration or Storage Controller reconciliation bugs. Misrouted requests are expected during tenant migration
// and/or during recovery following a pageserver failure, but persistently high rates of misrouted requests
// are indicative of bugs (and unavailability).
pub(crate) static MISROUTED_PAGESTREAM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_misrouted_pagestream_requests_total",
"Number of pageserver pagestream requests that were routed to the wrong pageserver"
)
.expect("failed to define a metric")
});
// Global counter for PageStream request results by outcome. Outcomes are divided into 3 categories:
// - success
// - internal_error: errors that indicate bugs in the storage cluster (e.g. page reconstruction errors, misrouted requests, LSN timeout errors)
// - other_error: transient error conditions that are expected in normal operation or indicate bugs with other parts of the system (e.g. error due to pageserver shutdown, malformed requests etc.)
pub(crate) static PAGESTREAM_HANDLER_RESULTS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_pagestream_handler_results_total",
"Number of pageserver pagestream handler results by outcome (success, internal_error, other_error)",
&["outcome"]
)
.expect("failed to define a metric")
});
// Constants for pageserver_pagestream_handler_results_total's outcome labels
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_SUCCESS: &str = "success";
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR: &str = "internal_error";
pub(crate) const PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR: &str = "other_error";
// Metrics collected on WAL redo operations
//
// We collect the time spent in actual WAL redo ('redo'), and time waiting
@@ -3103,19 +3048,13 @@ pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
pub(crate) struct StorageTimeMetricsTimer {
metrics: StorageTimeMetrics,
start: Instant,
stopped: Cell<bool>,
}
impl StorageTimeMetricsTimer {
fn new(metrics: StorageTimeMetrics) -> Self {
/*BEGIN_HADRON */
// record the active operation as the timer starts
metrics.timeline_active_count.inc();
/*END_HADRON */
Self {
metrics,
start: Instant::now(),
stopped: Cell::new(false),
}
}
@@ -3131,10 +3070,6 @@ impl StorageTimeMetricsTimer {
self.metrics.timeline_sum.inc_by(seconds);
self.metrics.timeline_count.inc();
self.metrics.global_histogram.observe(seconds);
/* BEGIN_HADRON*/
self.stopped.set(true);
self.metrics.timeline_active_count.dec();
/*END_HADRON */
duration
}
@@ -3145,16 +3080,6 @@ impl StorageTimeMetricsTimer {
}
}
/*BEGIN_HADRON */
impl Drop for StorageTimeMetricsTimer {
fn drop(&mut self) {
if !self.stopped.get() {
self.metrics.timeline_active_count.dec();
}
}
}
/*END_HADRON */
pub(crate) struct AlwaysRecordingStorageTimeMetricsTimer(Option<StorageTimeMetricsTimer>);
impl Drop for AlwaysRecordingStorageTimeMetricsTimer {
@@ -3180,10 +3105,6 @@ pub(crate) struct StorageTimeMetrics {
timeline_sum: Counter,
/// Number of oeprations, per operation, tenant_id and timeline_id
timeline_count: IntCounter,
/*BEGIN_HADRON */
/// Number of active operations per operation, tenant_id, and timeline_id
timeline_active_count: IntGauge,
/*END_HADRON */
/// Global histogram having only the "operation" label.
global_histogram: Histogram,
}
@@ -3203,11 +3124,6 @@ impl StorageTimeMetrics {
let timeline_count = STORAGE_TIME_COUNT_PER_TIMELINE
.get_metric_with_label_values(&[operation, tenant_id, shard_id, timeline_id])
.unwrap();
/*BEGIN_HADRON */
let timeline_active_count = STORAGE_ACTIVE_COUNT_PER_TIMELINE
.get_metric_with_label_values(&[operation, tenant_id, shard_id, timeline_id])
.unwrap();
/*END_HADRON */
let global_histogram = STORAGE_TIME_GLOBAL
.get_metric_with_label_values(&[operation])
.unwrap();
@@ -3215,7 +3131,6 @@ impl StorageTimeMetrics {
StorageTimeMetrics {
timeline_sum,
timeline_count,
timeline_active_count,
global_histogram,
}
}
@@ -3629,14 +3544,6 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
/* BEGIN_HADRON */
let _ = STORAGE_ACTIVE_COUNT_PER_TIMELINE.remove_label_values(&[
op,
tenant_id,
shard_id,
timeline_id,
]);
/*END_HADRON */
}
for op in StorageIoSizeOperation::VARIANTS {
@@ -4429,9 +4336,6 @@ pub(crate) mod disk_usage_based_eviction {
pub(crate) layers_collected: IntCounter,
pub(crate) layers_selected: IntCounter,
pub(crate) layers_evicted: IntCounter,
/*BEGIN_HADRON */
pub(crate) bytes_evicted: IntCounter,
/*END_HADRON */
}
impl Default for Metrics {
@@ -4468,21 +4372,12 @@ pub(crate) mod disk_usage_based_eviction {
)
.unwrap();
/*BEGIN_HADRON */
let bytes_evicted = register_int_counter!(
"pageserver_disk_usage_based_eviction_evicted_bytes_total",
"Amount of bytes successfully evicted"
)
.unwrap();
/*END_HADRON */
Self {
tenant_collection_time,
tenant_layer_count,
layers_collected,
layers_selected,
layers_evicted,
bytes_evicted,
}
}
}
@@ -4602,7 +4497,6 @@ pub fn preinitialize_metrics(
&CIRCUIT_BREAKERS_UNBROKEN,
&PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL,
&WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS,
&MISROUTED_PAGESTREAM_REQUESTS,
]
.into_iter()
.for_each(|c| {
@@ -4640,7 +4534,6 @@ pub fn preinitialize_metrics(
// gauges
WALRECEIVER_ACTIVE_MANAGERS.get();
LOCAL_DATA_LOSS_SUSPECTED.get();
// histograms
[

View File

@@ -70,7 +70,7 @@ use crate::context::{
};
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
MISROUTED_PAGESTREAM_REQUESTS, PAGESTREAM_HANDLER_RESULTS_TOTAL, SmgrOpTimer, TimelineMetrics,
SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
@@ -91,8 +91,7 @@ use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
/// is not yet in state [`TenantState::Active`].
///
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
/// HADRON: reduced timeout and we will retry in Cache::get().
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log slow GetPage requests.
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
@@ -1129,7 +1128,6 @@ impl PageServerHandler {
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
MISROUTED_PAGESTREAM_REQUESTS.inc();
return respond_error!(
span,
PageStreamError::Reconnect(
@@ -1441,57 +1439,20 @@ impl PageServerHandler {
let (response_msg, ctx) = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR])
.inc();
// END HADRON
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(_reason) => {
span.in_scope(|| {
// BEGIN HADRON
// We can get here because the compute node is pointing at the wrong PS. We
// already have a metric to keep track of this so suppressing this log to
// reduce log spam. The information in this log message is not going to be that
// helpful given the volume of logs that can be generated.
// info!("handler requested reconnect: {reason}")
// END HADRON
});
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
])
.inc();
// END HADRON
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// BEGIN HADRON
if let PageStreamError::Read(_) | PageStreamError::LsnTimeout(_) = &e.err {
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_INTERNAL_ERROR,
])
.inc();
} else {
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[
metrics::PAGESTREAM_HANDLER_OUTCOME_OTHER_ERROR,
])
.inc();
}
// END HADRON
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
@@ -1509,15 +1470,7 @@ impl PageServerHandler {
)
}
},
Ok((response_msg, _op_timer_already_observed, ctx)) => {
// BEGIN HADRON
PAGESTREAM_HANDLER_RESULTS_TOTAL
.with_label_values(&[metrics::PAGESTREAM_HANDLER_OUTCOME_SUCCESS])
.inc();
// END HADRON
(response_msg, Some(ctx))
}
Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)),
};
let ctx = ctx.map(|req_ctx| {
@@ -3338,12 +3291,9 @@ impl GrpcPageServiceHandler {
}
/// Generates a PagestreamRequest header from a ReadLsn and request ID.
fn make_hdr(
read_lsn: page_api::ReadLsn,
req_id: Option<page_api::RequestID>,
) -> PagestreamRequest {
fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest {
PagestreamRequest {
reqid: req_id.map(|r| r.id).unwrap_or_default(),
reqid: req_id,
request_lsn: read_lsn.request_lsn,
not_modified_since: read_lsn
.not_modified_since_lsn
@@ -3401,8 +3351,6 @@ impl GrpcPageServiceHandler {
/// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
/// GetPageResponse with an appropriate status code to avoid terminating the stream.
///
/// TODO: verify that the requested pages belong to this shard.
///
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
/// split them up in the client or server.
@@ -3453,7 +3401,7 @@ impl GrpcPageServiceHandler {
batch.push(BatchedGetPageRequest {
req: PagestreamGetPageRequest {
hdr: Self::make_hdr(req.read_lsn, Some(req.request_id)),
hdr: Self::make_hdr(req.read_lsn, req.request_id),
rel: req.rel,
blkno,
},
@@ -3483,16 +3431,12 @@ impl GrpcPageServiceHandler {
request_id: req.request_id,
status_code: page_api::GetPageStatusCode::Ok,
reason: None,
rel: req.rel,
pages: Vec::with_capacity(results.len()),
page_images: Vec::with_capacity(results.len()),
};
for result in results {
match result {
Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.pages.push(page_api::Page {
block_number: r.req.blkno,
image: r.page,
}),
Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page),
Ok((resp, _, _)) => {
return Err(tonic::Status::internal(format!(
"unexpected response: {resp:?}"
@@ -3535,7 +3479,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(rel=%req.rel, lsn=%req.read_lsn);
let req = PagestreamExistsRequest {
hdr: Self::make_hdr(req.read_lsn, None),
hdr: Self::make_hdr(req.read_lsn, 0),
rel: req.rel,
};
@@ -3685,7 +3629,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(db_oid=%req.db_oid, lsn=%req.read_lsn);
let req = PagestreamDbSizeRequest {
hdr: Self::make_hdr(req.read_lsn, None),
hdr: Self::make_hdr(req.read_lsn, 0),
dbnode: req.db_oid,
};
@@ -3735,7 +3679,7 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?
.downgrade();
while let Some(req) = reqs.message().await? {
let req_id = req.request_id.map(page_api::RequestID::from).unwrap_or_default();
let req_id = req.request_id;
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
.instrument(span.clone()) // propagate request span
.await;
@@ -3774,7 +3718,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(rel=%req.rel, lsn=%req.read_lsn);
let req = PagestreamNblocksRequest {
hdr: Self::make_hdr(req.read_lsn, None),
hdr: Self::make_hdr(req.read_lsn, 0),
rel: req.rel,
};
@@ -3807,7 +3751,7 @@ impl proto::PageService for GrpcPageServiceHandler {
span_record!(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn);
let req = PagestreamGetSlruSegmentRequest {
hdr: Self::make_hdr(req.read_lsn, None),
hdr: Self::make_hdr(req.read_lsn, 0),
kind: req.kind as u8,
segno: req.segno,
};

View File

@@ -141,23 +141,6 @@ pub(crate) enum CollectKeySpaceError {
Cancelled,
}
impl CollectKeySpaceError {
pub(crate) fn is_cancel(&self) -> bool {
match self {
CollectKeySpaceError::Decode(_) => false,
CollectKeySpaceError::PageRead(e) => e.is_cancel(),
CollectKeySpaceError::Cancelled => true,
}
}
pub(crate) fn into_anyhow(self) -> anyhow::Error {
match self {
CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
}
}
}
impl From<PageReconstructError> for CollectKeySpaceError {
fn from(err: PageReconstructError) -> Self {
match err {

View File

@@ -34,7 +34,7 @@ use once_cell::sync::Lazy;
pub use pageserver_api::models::TenantState;
use pageserver_api::models::{self, RelSizeMigration};
use pageserver_api::models::{
CompactInfoResponse, TimelineArchivalState, TimelineState, TopTenantShardItem,
CompactInfoResponse, LsnLease, TimelineArchivalState, TimelineState, TopTenantShardItem,
WalRedoManagerStatus,
};
use pageserver_api::shard::{ShardIdentity, ShardStripeSize, TenantShardId};
@@ -142,9 +142,6 @@ mod gc_block;
mod gc_result;
pub(crate) mod throttle;
#[cfg(test)]
pub mod debug;
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -183,7 +180,6 @@ pub(super) struct AttachedTenantConf {
impl AttachedTenantConf {
fn new(
conf: &'static PageServerConf,
tenant_conf: pageserver_api::models::TenantConfig,
location: AttachedLocationConfig,
) -> Self {
@@ -195,7 +191,9 @@ impl AttachedTenantConf {
let lsn_lease_deadline = if location.attach_mode == AttachmentMode::Single {
Some(
tokio::time::Instant::now()
+ TenantShard::get_lsn_lease_length_impl(conf, &tenant_conf),
+ tenant_conf
.lsn_lease_length
.unwrap_or(LsnLease::DEFAULT_LENGTH),
)
} else {
// We don't use `lsn_lease_deadline` to delay GC in AttachedMulti and AttachedStale
@@ -210,13 +208,10 @@ impl AttachedTenantConf {
}
}
fn try_from(
conf: &'static PageServerConf,
location_conf: LocationConf,
) -> anyhow::Result<Self> {
fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
match &location_conf.mode {
LocationMode::Attached(attach_conf) => {
Ok(Self::new(conf, location_conf.tenant_conf, *attach_conf))
Ok(Self::new(location_conf.tenant_conf, *attach_conf))
}
LocationMode::Secondary(_) => {
anyhow::bail!(
@@ -391,7 +386,7 @@ pub struct TenantShard {
l0_flush_global_state: L0FlushGlobalState,
pub(crate) feature_resolver: Arc<TenantFeatureResolver>,
pub(crate) feature_resolver: TenantFeatureResolver,
}
impl std::fmt::Debug for TenantShard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -3291,9 +3286,7 @@ impl TenantShard {
// Ignore this, we likely raced with unarchival.
OffloadError::NotArchived => Ok(()),
OffloadError::AlreadyInProgress => Ok(()),
OffloadError::Cancelled => Err(CompactionError::new_cancelled()),
// don't break the anyhow chain
OffloadError::Other(err) => Err(CompactionError::Other(err)),
err => Err(err),
})?;
}
@@ -3321,13 +3314,27 @@ impl TenantShard {
/// Trips the compaction circuit breaker if appropriate.
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
if err.is_cancel() {
return;
match err {
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}
CompactionError::CollectKeySpaceError(err) => {
// CollectKeySpaceError::Cancelled and PageRead::Cancelled are handled in `err.is_cancel` branch.
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
CompactionError::Other(err) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
CompactionError::AlreadyRunning(_) => {}
}
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
/// Cancel scheduled compaction tasks
@@ -3404,7 +3411,7 @@ impl TenantShard {
}
// Update the feature resolver with the latest tenant-spcific data.
self.feature_resolver.refresh_properties_and_flags(self);
self.feature_resolver.update_cached_tenant_properties(self);
}
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
@@ -4171,15 +4178,6 @@ impl TenantShard {
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
// HADRON
pub fn get_image_creation_timeout(&self) -> Option<Duration> {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf.image_layer_force_creation_period.or(self
.conf
.default_tenant_conf
.image_layer_force_creation_period)
}
pub fn get_pitr_interval(&self) -> Duration {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
@@ -4207,16 +4205,10 @@ impl TenantShard {
}
pub fn get_lsn_lease_length(&self) -> Duration {
Self::get_lsn_lease_length_impl(self.conf, &self.tenant_conf.load().tenant_conf)
}
pub fn get_lsn_lease_length_impl(
conf: &'static PageServerConf,
tenant_conf: &pageserver_api::models::TenantConfig,
) -> Duration {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.lsn_lease_length
.unwrap_or(conf.default_tenant_conf.lsn_lease_length)
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
}
pub fn get_timeline_offloading_enabled(&self) -> bool {
@@ -4502,10 +4494,10 @@ impl TenantShard {
gc_block: Default::default(),
l0_flush_global_state,
basebackup_cache,
feature_resolver: Arc::new(TenantFeatureResolver::new(
feature_resolver: TenantFeatureResolver::new(
feature_resolver,
tenant_shard_id.tenant_id,
)),
),
}
}
@@ -5719,16 +5711,6 @@ impl TenantShard {
.unwrap_or(0)
}
/// HADRON
/// Return the visible size of all timelines in this tenant.
pub(crate) fn get_visible_size(&self) -> u64 {
let timelines = self.timelines.lock().unwrap();
timelines
.values()
.map(|t| t.metrics.visible_physical_size_gauge.get())
.sum()
}
/// Builds a new tenant manifest, and uploads it if it differs from the last-known tenant
/// manifest in `Self::remote_tenant_manifest`.
///
@@ -6027,24 +6009,22 @@ pub(crate) mod harness {
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) async fn do_try_load_with_redo(
pub(crate) async fn do_try_load(
&self,
walredo_mgr: Arc<WalRedoManager>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<TenantShard>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
let tenant = Arc::new(TenantShard::new(
TenantState::Attaching,
self.conf,
AttachedTenantConf::try_from(
self.conf,
LocationConf::attached_single(
self.tenant_conf.clone(),
self.generation,
ShardParameters::default(),
),
)
AttachedTenantConf::try_from(LocationConf::attached_single(
self.tenant_conf.clone(),
self.generation,
ShardParameters::default(),
))
.unwrap(),
self.shard_identity,
Some(walredo_mgr),
@@ -6069,14 +6049,6 @@ pub(crate) mod harness {
Ok(tenant)
}
pub(crate) async fn do_try_load(
&self,
ctx: &RequestContext,
) -> anyhow::Result<Arc<TenantShard>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
self.do_try_load_with_redo(walredo_mgr, ctx).await
}
pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
}
@@ -6153,7 +6125,7 @@ mod tests {
use pageserver_api::keyspace::KeySpace;
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings, LsnLease};
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
use pageserver_compaction::helpers::overlaps_with;
#[cfg(feature = "testing")]
use rand::SeedableRng;
@@ -6703,13 +6675,17 @@ mod tests {
tline.freeze_and_flush().await.map_err(|e| e.into())
}
#[tokio::test]
#[tokio::test(start_paused = true)]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
.await?
.load()
.await;
// Advance to the lsn lease deadline so that GC is not blocked by
// initial transition into AttachedSingle.
tokio::time::advance(tenant.get_lsn_lease_length()).await;
tokio::time::resume();
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -9408,21 +9384,17 @@ mod tests {
Ok(())
}
#[tokio::test]
#[tokio::test(start_paused = true)]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")
.await
.unwrap()
.load()
.await;
// set a non-zero lease length to test the feature
tenant
.update_tenant_config(|mut conf| {
conf.lsn_lease_length = Some(LsnLease::DEFAULT_LENGTH);
Ok(conf)
})
.unwrap();
// Advance to the lsn lease deadline so that GC is not blocked by
// initial transition into AttachedSingle.
tokio::time::advance(tenant.get_lsn_lease_length()).await;
tokio::time::resume();
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_lsn = Lsn(0x100);
@@ -12816,40 +12788,6 @@ mod tests {
},
]
);
Ok(())
}
#[tokio::test]
async fn test_get_force_image_creation_lsn() -> anyhow::Result<()> {
let tenant_conf = pageserver_api::models::TenantConfig {
pitr_interval: Some(Duration::from_secs(7 * 3600)),
image_layer_force_creation_period: Some(Duration::from_secs(3600)),
..Default::default()
};
let tenant_id = TenantId::generate();
let harness = TenantHarness::create_custom(
"test_get_force_image_creation_lsn",
tenant_conf,
tenant_id,
ShardIdentity::unsharded(),
Generation::new(1),
)
.await?;
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
timeline.gc_info.write().unwrap().cutoffs.time = Some(Lsn(100));
{
let writer = timeline.writer().await;
writer.finish_write(Lsn(5000));
}
let image_creation_lsn = timeline.get_force_image_creation_lsn().unwrap();
assert_eq!(image_creation_lsn, Lsn(4300));
Ok(())
}
}

View File

@@ -1,366 +0,0 @@
use std::{ops::Range, str::FromStr, sync::Arc};
use crate::walredo::RedoAttemptType;
use base64::{Engine as _, engine::general_purpose::STANDARD};
use bytes::{Bytes, BytesMut};
use camino::Utf8PathBuf;
use clap::Parser;
use itertools::Itertools;
use pageserver_api::{
key::Key,
keyspace::KeySpace,
shard::{ShardIdentity, ShardStripeSize},
};
use postgres_ffi::PgMajorVersion;
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn};
use tracing::Instrument;
use utils::{
generation::Generation,
id::{TenantId, TimelineId},
lsn::Lsn,
shard::{ShardCount, ShardIndex, ShardNumber},
};
use wal_decoder::models::record::NeonWalRecord;
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
tenant::storage_layer::ValueReconstructState,
walredo::harness::RedoHarness,
};
use super::{
WalRedoManager, WalredoManagerId,
harness::TenantHarness,
remote_timeline_client::LayerFileMetadata,
storage_layer::{AsLayerDesc, IoConcurrency, Layer, LayerName, ValuesReconstructState},
};
fn process_page_image(next_record_lsn: Lsn, is_fpw: bool, img_bytes: Bytes) -> Bytes {
// To match the logic in libs/wal_decoder/src/serialized_batch.rs
let mut new_image: BytesMut = img_bytes.into();
if is_fpw && !page_is_new(&new_image) {
page_set_lsn(&mut new_image, next_record_lsn);
}
assert_eq!(new_image.len(), BLCKSZ as usize);
new_image.freeze()
}
async fn redo_wals(input: &str, key: Key) -> anyhow::Result<()> {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let redo_harness = RedoHarness::new()?;
let span = redo_harness.span();
let tenant_conf = pageserver_api::models::TenantConfig {
..Default::default()
};
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let tenant = TenantHarness::create_custom(
"search_key",
tenant_conf,
tenant_id,
ShardIdentity::unsharded(),
Generation::new(1),
)
.await?
.do_try_load_with_redo(
Arc::new(WalRedoManager::Prod(
WalredoManagerId::next(),
redo_harness.manager,
)),
&ctx,
)
.await
.unwrap();
let timeline = tenant
.create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
.await?;
let contents = tokio::fs::read_to_string(input)
.await
.map_err(|e| anyhow::Error::msg(format!("Failed to read input file {input}: {e}")))
.unwrap();
let lines = contents.lines();
let mut last_wal_lsn: Option<Lsn> = None;
let state = {
let mut state = ValueReconstructState::default();
let mut is_fpw = false;
let mut is_first_line = true;
for line in lines {
if is_first_line {
is_first_line = false;
if line.trim() == "FPW" {
is_fpw = true;
}
continue; // Skip the first line.
}
// Each input line is in the "<next_record_lsn>,<base64>" format.
let (lsn_str, payload_b64) = line
.split_once(',')
.expect("Invalid input format: expected '<lsn>,<base64>'");
// Parse the LSN and decode the payload.
let lsn = Lsn::from_str(lsn_str.trim()).expect("Invalid LSN format");
let bytes = Bytes::from(
STANDARD
.decode(payload_b64.trim())
.expect("Invalid base64 payload"),
);
// The first line is considered the base image, the rest are WAL records.
if state.img.is_none() {
state.img = Some((lsn, process_page_image(lsn, is_fpw, bytes)));
} else {
let wal_record = NeonWalRecord::Postgres {
will_init: false,
rec: bytes,
};
state.records.push((lsn, wal_record));
last_wal_lsn.replace(lsn);
}
}
state
};
assert!(state.img.is_some(), "No base image found");
assert!(!state.records.is_empty(), "No WAL records found");
let result = timeline
.reconstruct_value(key, last_wal_lsn.unwrap(), state, RedoAttemptType::ReadPage)
.instrument(span.clone())
.await?;
eprintln!("final image: {:?}", STANDARD.encode(result));
Ok(())
}
async fn search_key(
tenant_id: TenantId,
timeline_id: TimelineId,
dir: String,
key: Key,
lsn: Lsn,
) -> anyhow::Result<()> {
let shard_index = ShardIndex {
shard_number: ShardNumber(0),
shard_count: ShardCount(4),
};
let redo_harness = RedoHarness::new()?;
let span = redo_harness.span();
let tenant_conf = pageserver_api::models::TenantConfig {
..Default::default()
};
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let tenant = TenantHarness::create_custom(
"search_key",
tenant_conf,
tenant_id,
ShardIdentity::new(
shard_index.shard_number,
shard_index.shard_count,
ShardStripeSize(32768),
)
.unwrap(),
Generation::new(1),
)
.await?
.do_try_load_with_redo(
Arc::new(WalRedoManager::Prod(
WalredoManagerId::next(),
redo_harness.manager,
)),
&ctx,
)
.await
.unwrap();
let timeline = tenant
.create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
.await?;
let mut delta_layers: Vec<Layer> = Vec::new();
let mut img_layer: Option<Layer> = Option::None;
let mut dir = tokio::fs::read_dir(dir).await?;
loop {
let entry = dir.next_entry().await?;
if entry.is_none() || !entry.as_ref().unwrap().file_type().await?.is_file() {
break;
}
let path = Utf8PathBuf::from_path_buf(entry.unwrap().path()).unwrap();
let layer_name = match LayerName::from_str(path.file_name().unwrap()) {
Ok(name) => name,
Err(_) => {
eprintln!("Skipped invalid layer: {path}");
continue;
}
};
let layer = Layer::for_resident(
tenant.conf,
&timeline,
path.clone(),
layer_name,
LayerFileMetadata::new(
tokio::fs::metadata(path.clone()).await?.len(),
Generation::new(1),
shard_index,
),
);
if layer.layer_desc().is_delta() {
delta_layers.push(layer.into());
} else if img_layer.is_none() {
img_layer = Some(layer.into());
} else {
anyhow::bail!("Found multiple image layers");
}
}
// sort delta layers based on the descending order of LSN
delta_layers.sort_by(|a, b| {
b.layer_desc()
.get_lsn_range()
.start
.cmp(&a.layer_desc().get_lsn_range().start)
});
let mut state = ValuesReconstructState::new(IoConcurrency::Sequential);
let key_space = KeySpace::single(Range {
start: key,
end: key.next(),
});
let lsn_range = Range {
start: img_layer
.as_ref()
.map_or(Lsn(0x00), |img| img.layer_desc().image_layer_lsn()),
end: lsn,
};
for delta_layer in delta_layers.iter() {
delta_layer
.get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
.await?;
}
img_layer
.as_ref()
.unwrap()
.get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
.await?;
for (_key, result) in std::mem::take(&mut state.keys) {
let state = result.collect_pending_ios().await?;
if state.img.is_some() {
eprintln!(
"image: {}: {:x?}",
state.img.as_ref().unwrap().0,
STANDARD.encode(state.img.as_ref().unwrap().1.clone())
);
}
for delta in state.records.iter() {
match &delta.1 {
NeonWalRecord::Postgres { will_init, rec } => {
eprintln!(
"delta: {}: will_init: {}, {:x?}",
delta.0,
will_init,
STANDARD.encode(rec)
);
}
_ => {
eprintln!("delta: {}: {:x?}", delta.0, delta.1);
}
}
}
let result = timeline
.reconstruct_value(key, lsn_range.end, state, RedoAttemptType::ReadPage)
.instrument(span.clone())
.await?;
eprintln!("final image: {lsn} : {result:?}");
}
Ok(())
}
/// Redo all WALs against the base image in the input file. Return the base64 encoded final image.
/// Each line in the input file must be in the form "<lsn>,<base64>" where:
/// * `<lsn>` is a PostgreSQL LSN in hexadecimal notation, e.g. `0/16ABCDE`.
/// * `<base64>` is the base64encoded page image (first line) or WAL record (subsequent lines).
///
/// The first line provides the base image of a page. The LSN is the LSN of "next record" following
/// the record containing the FPI. For example, if the FPI was extracted from a WAL record occuping
/// [0/1, 0/200) in the WAL stream, the LSN appearing along side the page image here should be 0/200.
///
/// The subsequent lines are WAL records, ordered from the oldest to the newest. The LSN is the
/// record LSN of the WAL record, not the "next record" LSN. For example, if the WAL record here
/// occupies [0/1, 0/200) in the WAL stream, the LSN appearing along side the WAL record here should
/// be 0/1.
#[derive(Parser)]
struct RedoWalsCmd {
#[clap(long)]
input: String,
#[clap(long)]
key: String,
}
#[tokio::test]
async fn test_redo_wals() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();
let pos = args
.iter()
.position(|arg| arg == "--")
.unwrap_or(args.len());
let slice = &args[pos..args.len()];
let cmd = match RedoWalsCmd::try_parse_from(slice) {
Ok(cmd) => cmd,
Err(err) => {
eprintln!("{err}");
return Ok(());
}
};
let key = Key::from_hex(&cmd.key).unwrap();
redo_wals(&cmd.input, key).await?;
Ok(())
}
/// Search for a page at the given LSN in all layers of the data_dir.
/// Return the base64-encoded image and all WAL records, as well as the final reconstructed image.
#[derive(Parser)]
struct SearchKeyCmd {
#[clap(long)]
tenant_id: String,
#[clap(long)]
timeline_id: String,
#[clap(long)]
data_dir: String,
#[clap(long)]
key: String,
#[clap(long)]
lsn: String,
}
#[tokio::test]
async fn test_search_key() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();
let pos = args
.iter()
.position(|arg| arg == "--")
.unwrap_or(args.len());
let slice = &args[pos..args.len()];
let cmd = match SearchKeyCmd::try_parse_from(slice) {
Ok(cmd) => cmd,
Err(err) => {
eprintln!("{err}");
return Ok(());
}
};
let tenant_id = TenantId::from_str(&cmd.tenant_id).unwrap();
let timeline_id = TimelineId::from_str(&cmd.timeline_id).unwrap();
let key = Key::from_hex(&cmd.key).unwrap();
let lsn = Lsn::from_str(&cmd.lsn).unwrap();
search_key(tenant_id, timeline_id, cmd.data_dir, key, lsn).await?;
Ok(())
}

View File

@@ -46,11 +46,10 @@
mod historic_layer_coverage;
mod layer_coverage;
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::collections::{HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use anyhow::Result;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
@@ -905,103 +904,6 @@ impl LayerMap {
max_stacked_deltas
}
/* BEGIN_HADRON */
/**
* Compute the image consistent LSN, the largest LSN below which all pages have been redone successfully.
* It works by first finding the latest image layers and store them into a map. Then for each delta layer,
* find all overlapping image layers in order to potentially increase the image LSN in case there are gaps
* (e.g., if an image is created at LSN 100 but the delta layer spans LSN [150, 200], then we can increase
* image LSN to 150 because there is no WAL record in between).
* Finally, the image consistent LSN is computed by taking the minimum of all image layers.
*/
pub fn compute_image_consistent_lsn(&self, disk_consistent_lsn: Lsn) -> Lsn {
struct ImageLayerInfo {
// creation LSN of the image layer
image_lsn: Lsn,
// the current minimum LSN of newer delta layers with overlapping key ranges
min_delta_lsn: Lsn,
}
let started_at = Instant::now();
let min_l0_deltas_lsn = {
let l0_deltas = self.level0_deltas();
l0_deltas
.iter()
.map(|layer| layer.get_lsn_range().start)
.min()
.unwrap_or(disk_consistent_lsn)
};
let global_key_range = Key::MIN..Key::MAX;
// step 1: collect all most recent image layers into a map
// map: end key to image_layer_info
let mut image_map: BTreeMap<Key, ImageLayerInfo> = BTreeMap::new();
for (img_range, img) in self.image_coverage(&global_key_range, disk_consistent_lsn) {
let img_lsn = img.map(|layer| layer.get_lsn_range().end).unwrap_or(Lsn(0));
image_map.insert(
img_range.end,
ImageLayerInfo {
image_lsn: img_lsn,
min_delta_lsn: min_l0_deltas_lsn,
},
);
}
// step 2: go through all delta layers, and update the image layer info with overlapping
// key ranges
for layer in self.historic.iter() {
if !layer.is_delta {
continue;
}
let delta_key_range = layer.get_key_range();
let delta_lsn_range = layer.get_lsn_range();
for (img_end_key, img_info) in image_map.range_mut(delta_key_range.start..Key::MAX) {
debug_assert!(img_end_key >= &delta_key_range.start);
if delta_lsn_range.end > img_info.image_lsn {
// the delta layer includes WAL records after the image
// it's possibel that the delta layer's start LSN < image LSN, which will be simply ignored by step 3
img_info.min_delta_lsn =
std::cmp::min(img_info.min_delta_lsn, delta_lsn_range.start);
}
if img_end_key >= &delta_key_range.end {
// we have fully processed all overlapping image layers
break;
}
}
}
// step 3, go through all image layers and find the image consistent LSN
let mut img_consistent_lsn = min_l0_deltas_lsn.checked_sub(Lsn(1)).unwrap();
let mut prev_key = Key::MIN;
for (img_key, img_info) in image_map {
tracing::debug!(
"Image layer {:?}:{} has min delta lsn {}",
Range {
start: prev_key,
end: img_key,
},
img_info.image_lsn,
img_info.min_delta_lsn,
);
let image_lsn = std::cmp::max(
img_info.image_lsn,
img_info.min_delta_lsn.checked_sub(Lsn(1)).unwrap_or(Lsn(0)),
);
img_consistent_lsn = std::cmp::min(img_consistent_lsn, image_lsn);
prev_key = img_key;
}
tracing::info!(
"computed image_consistent_lsn {} for disk_consistent_lsn {} in {}ms. Processed {} layrs in total.",
img_consistent_lsn,
disk_consistent_lsn,
started_at.elapsed().as_millis(),
self.historic.len()
);
img_consistent_lsn
}
/* END_HADRON */
/// Return all L0 delta layers
pub fn level0_deltas(&self) -> &Vec<Arc<PersistentLayerDesc>> {
&self.l0_delta_layers
@@ -1677,138 +1579,6 @@ mod tests {
LayerVisibilityHint::Visible
));
}
/* BEGIN_HADRON */
#[test]
fn test_compute_image_consistent_lsn() {
let mut layer_map = LayerMap::default();
let disk_consistent_lsn = Lsn(1000);
// case 1: empty layer map
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(
disk_consistent_lsn.checked_sub(Lsn(1)).unwrap(),
image_consistent_lsn
);
// case 2: only L0 delta layer
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(100),
Lsn(900)..Lsn(990),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(100),
Lsn(850)..Lsn(899),
true,
));
}
// should use min L0 delta LSN - 1 as image consistent LSN
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(849), image_consistent_lsn);
// case 3: 3 images, no L1 delta
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(40),
Lsn(100)..Lsn(100),
false,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(40)..Key::from_i128(70),
Lsn(200)..Lsn(200),
false,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(70)..Key::from_i128(100),
Lsn(150)..Lsn(150),
false,
));
}
// should use min L0 delta LSN - 1 as image consistent LSN
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(849), image_consistent_lsn);
// case 4: 3 images with 1 L1 delta
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(50),
Lsn(300)..Lsn(350),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(299), image_consistent_lsn);
// case 5: 3 images with 1 more L1 delta with smaller LSN
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(50)..Key::from_i128(72),
Lsn(200)..Lsn(300),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 6: 3 images with more newer L1 deltas (no impact on final results)
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(30),
Lsn(400)..Lsn(500),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(35)..Key::from_i128(100),
Lsn(450)..Lsn(600),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 7: 3 images with more older L1 deltas (no impact on final results)
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(40),
Lsn(0)..Lsn(50),
true,
));
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(50)..Key::from_i128(100),
Lsn(10)..Lsn(60),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(199), image_consistent_lsn);
// case 8: 3 images with one more L1 delta with overlapping LSN range
{
let mut updates = layer_map.batch_update();
updates.insert_historic(PersistentLayerDesc::new_test(
Key::from_i128(0)..Key::from_i128(50),
Lsn(50)..Lsn(250),
true,
));
}
let image_consistent_lsn = layer_map.compute_image_consistent_lsn(disk_consistent_lsn);
assert_eq!(Lsn(100), image_consistent_lsn);
}
/* END_HADRON */
}
#[cfg(test)]

View File

@@ -43,7 +43,7 @@ use crate::controller_upcall_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
use crate::metrics::{LOCAL_DATA_LOSS_SUSPECTED, TENANT, TENANT_MANAGER as METRICS};
use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind};
use crate::tenant::config::{
AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
@@ -538,21 +538,6 @@ pub async fn init_tenant_mgr(
// Determine which tenants are to be secondary or attached, and in which generation
let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
// Hadron local SSD check: Raise an alert if our local filesystem does not contain any tenants but the re-attach request returned tenants.
// This can happen if the PS suffered a Kubernetes node failure resulting in loss of all local data, but recovered quickly on another node
// so the Storage Controller has not had the time to move tenants out.
let data_loss_suspected = if let Some(tenant_modes) = &tenant_modes {
tenant_configs.is_empty() && !tenant_modes.is_empty()
} else {
false
};
if data_loss_suspected {
tracing::error!(
"Local data loss suspected: no tenants found on local filesystem, but re-attach request returned tenants"
);
}
LOCAL_DATA_LOSS_SUSPECTED.set(if data_loss_suspected { 1 } else { 0 });
tracing::info!(
"Attaching {} tenants at startup, warming up {} at a time",
tenant_configs.len(),
@@ -679,7 +664,7 @@ pub async fn init_tenant_mgr(
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::new(conf, location_conf.tenant_conf, attached_conf),
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
@@ -857,11 +842,8 @@ impl TenantManager {
// take our fast path and just provide the updated configuration
// to the tenant.
tenant.set_new_location_config(
AttachedTenantConf::try_from(
self.conf,
new_location_config.clone(),
)
.map_err(UpsertLocationError::BadRequest)?,
AttachedTenantConf::try_from(new_location_config.clone())
.map_err(UpsertLocationError::BadRequest)?,
);
Some(FastPathModified::Attached(tenant.clone()))
@@ -1064,7 +1046,7 @@ impl TenantManager {
// Testing hack: if we are configured with no control plane, then drop the generation
// from upserts. This enables creating generation-less tenants even though neon_local
// always uses generations when calling the location conf API.
let attached_conf = AttachedTenantConf::try_from(self.conf, new_location_config)
let attached_conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
let tenant = tenant_spawn(
@@ -1268,7 +1250,7 @@ impl TenantManager {
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(self.conf, config)?,
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
SpawnMode::Eager,
@@ -2149,7 +2131,7 @@ impl TenantManager {
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(self.conf, config).map_err(Error::DetachReparent)?,
AttachedTenantConf::try_from(config).map_err(Error::DetachReparent)?,
shard_identity,
None,
SpawnMode::Eager,

View File

@@ -141,29 +141,11 @@ pub(super) async fn upload_timeline_layer<'a>(
let fs_size = usize::try_from(fs_size)
.with_context(|| format!("convert {local_path:?} size {fs_size} usize"))?;
/* BEGIN_HADRON */
let mut metadata = None;
match storage {
// Pass the file path as a storage metadata to minimize changes to neon.
// Otherwise, we need to change the upload interface.
GenericRemoteStorage::AzureBlob(s) => {
let block_size_mb = s.put_block_size_mb.unwrap_or(0);
if block_size_mb > 0 && fs_size > block_size_mb * 1024 * 1024 {
metadata = Some(remote_storage::StorageMetadata::from([(
"databricks_azure_put_block",
local_path.as_str(),
)]));
}
}
GenericRemoteStorage::LocalFs(_) => {}
GenericRemoteStorage::AwsS3(_) => {}
GenericRemoteStorage::Unreliable(_) => {}
};
/* END_HADRON */
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
storage
.upload(reader, fs_size, remote_path, metadata, cancel)
.upload(reader, fs_size, remote_path, None, cancel)
.await
.with_context(|| format!("upload layer from local path '{local_path}'"))
}

View File

@@ -225,7 +225,7 @@ impl fmt::Display for ImageLayerName {
/// storage and object names in remote storage consist of the LayerName plus some extra qualifiers
/// that uniquely identify the physical incarnation of a layer (see [crate::tenant::remote_timeline_client::remote_layer_path])
/// and [`crate::tenant::storage_layer::layer::local_layer_path`])
#[derive(Debug, PartialEq, Eq, Hash, Clone, Ord, PartialOrd)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum LayerName {
Image(ImageLayerName),
Delta(DeltaLayerName),

View File

@@ -17,35 +17,23 @@ use tracing::*;
use utils::backoff::exponential_backoff_duration;
use utils::completion::Barrier;
use utils::pausable_failpoint;
use utils::sync::gate::GateError;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
use crate::tenant::blob_io::WriteBlobError;
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::{TenantShard, TenantState};
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
/// Semaphore limiting concurrent background tasks (across all tenants).
///
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
let total_threads = TOKIO_WORKER_THREADS.get();
/*BEGIN_HADRON*/
// ideally we should run at least one compaction task per tenant in order to (1) maximize
// compaction throughput (2) avoid head-of-line blocking of large compactions. However doing
// that may create too many compaction tasks with lots of memory overheads. So we limit the
// number of compaction tasks based on the available CPU core count.
// Need to revisit.
// let tasks_per_thread = std::env::var("BG_TASKS_PER_THREAD")
// .ok()
// .and_then(|s| s.parse().ok())
// .unwrap_or(4);
// let permits = usize::max(1, total_threads * tasks_per_thread);
// // assert!(permits < total_threads, "need threads for other work");
/*END_HADRON*/
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(permits < total_threads, "need threads for other work");
@@ -307,12 +295,48 @@ pub(crate) fn log_compaction_error(
task_cancelled: bool,
degrade_to_warning: bool,
) {
let is_cancel = err.is_cancel();
use CompactionError::*;
let level = if is_cancel || task_cancelled {
Level::INFO
} else {
Level::ERROR
use crate::tenant::PageReconstructError;
use crate::tenant::upload_queue::NotInitialized;
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {
let root_cause = err.root_cause();
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let buffered_writer_flush_task_canelled = root_cause
.downcast_ref::<FlushTaskError>()
.is_some_and(|e| e.is_cancel());
let write_blob_cancelled = root_cause
.downcast_ref::<WriteBlobError>()
.is_some_and(|e| e.is_cancel());
let gate_closed = root_cause
.downcast_ref::<GateError>()
.is_some_and(|e| e.is_cancel());
let is_stopping = upload_queue
|| timeline
|| buffered_writer_flush_task_canelled
|| write_blob_cancelled
|| gate_closed;
if is_stopping {
Level::INFO
} else {
Level::ERROR
}
}
};
if let Some((error_count, sleep_duration)) = retry_info {

View File

@@ -40,6 +40,7 @@ use layer_manager::{
Shutdown,
};
use offload::OffloadError;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
use pageserver_api::key::{
@@ -118,6 +119,7 @@ use crate::pgdatadir_mapping::{
MAX_AUX_FILE_V2_DELTAS, MetricsUpdate,
};
use crate::task_mgr::TaskKind;
use crate::tenant::config::AttachmentMode;
use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::metadata::TimelineMetadata;
@@ -200,7 +202,7 @@ pub struct TimelineResources {
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
pub basebackup_cache: Arc<BasebackupCache>,
pub feature_resolver: Arc<TenantFeatureResolver>,
pub feature_resolver: TenantFeatureResolver,
}
pub struct Timeline {
@@ -448,7 +450,7 @@ pub struct Timeline {
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_cache: Arc<BasebackupCache>,
feature_resolver: Arc<TenantFeatureResolver>,
feature_resolver: TenantFeatureResolver,
}
pub(crate) enum PreviousHeatmap {
@@ -585,28 +587,6 @@ pub(crate) enum PageReconstructError {
MissingKey(Box<MissingKeyError>),
}
impl PageReconstructError {
pub(crate) fn is_cancel(&self) -> bool {
match self {
PageReconstructError::Other(_) => false,
PageReconstructError::AncestorLsnTimeout(e) => e.is_cancel(),
PageReconstructError::Cancelled => true,
PageReconstructError::WalRedo(_) => false,
PageReconstructError::MissingKey(_) => false,
}
}
#[allow(dead_code)] // we use the is_cancel + into_anyhow pattern in quite a few places, this one will follow soon enough
pub(crate) fn into_anyhow(self) -> anyhow::Error {
match self {
PageReconstructError::Other(e) => e,
PageReconstructError::AncestorLsnTimeout(e) => e.into_anyhow(),
PageReconstructError::Cancelled => anyhow::Error::new(self),
PageReconstructError::WalRedo(e) => e,
PageReconstructError::MissingKey(_) => anyhow::Error::new(self),
}
}
}
impl From<anyhow::Error> for PageReconstructError {
fn from(value: anyhow::Error) -> Self {
// with walingest.rs many PageReconstructError are wrapped in as anyhow::Error
@@ -760,6 +740,17 @@ impl std::fmt::Display for MissingKeyError {
}
}
impl PageReconstructError {
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
pub(crate) fn is_stopping(&self) -> bool {
use PageReconstructError::*;
match self {
Cancelled => true,
Other(_) | AncestorLsnTimeout(_) | WalRedo(_) | MissingKey(_) => false,
}
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum CreateImageLayersError {
#[error("timeline shutting down")]
@@ -962,35 +953,13 @@ pub enum WaitLsnError {
Timeout(String),
}
impl WaitLsnError {
pub(crate) fn is_cancel(&self) -> bool {
match self {
WaitLsnError::Shutdown => true,
WaitLsnError::BadState(timeline_state) => match timeline_state {
TimelineState::Loading => false,
TimelineState::Active => false,
TimelineState::Stopping => true,
TimelineState::Broken { .. } => false,
},
WaitLsnError::Timeout(_) => false,
}
}
pub(crate) fn into_anyhow(self) -> anyhow::Error {
match self {
WaitLsnError::Shutdown => anyhow::Error::new(self),
WaitLsnError::BadState(_) => anyhow::Error::new(self),
WaitLsnError::Timeout(_) => anyhow::Error::new(self),
}
}
}
impl From<WaitLsnError> for tonic::Status {
fn from(err: WaitLsnError) -> Self {
use tonic::Code;
let code = if err.is_cancel() {
Code::Unavailable
} else {
Code::Internal
let code = match &err {
WaitLsnError::Timeout(_) => Code::Internal,
WaitLsnError::BadState(_) => Code::Internal,
WaitLsnError::Shutdown => Code::Unavailable,
};
tonic::Status::new(code, err.to_string())
}
@@ -1002,7 +971,7 @@ impl From<WaitLsnError> for tonic::Status {
impl From<CreateImageLayersError> for CompactionError {
fn from(e: CreateImageLayersError) -> Self {
match e {
CreateImageLayersError::Cancelled => CompactionError::new_cancelled(),
CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
CreateImageLayersError::Other(e) => {
CompactionError::Other(e.context("create image layers"))
}
@@ -1117,26 +1086,6 @@ enum ImageLayerCreationOutcome {
Skip,
}
enum RepartitionError {
Other(anyhow::Error),
CollectKeyspace(CollectKeySpaceError),
}
impl RepartitionError {
fn is_cancel(&self) -> bool {
match self {
RepartitionError::Other(_) => false,
RepartitionError::CollectKeyspace(e) => e.is_cancel(),
}
}
fn into_anyhow(self) -> anyhow::Error {
match self {
RepartitionError::Other(e) => e,
RepartitionError::CollectKeyspace(e) => e.into_anyhow(),
}
}
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -1823,31 +1772,30 @@ impl Timeline {
existing_lease.clone()
}
Entry::Vacant(vacant) => {
// Never allow a lease to be requested for an LSN below the applied GC cutoff. The data could have been deleted.
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!(
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
lsn,
*latest_gc_cutoff_lsn
);
}
// We allow create lease for those below the planned gc cutoff if we are still within the grace period
// of GC blocking.
// Reject already GC-ed LSN if we are in AttachedSingle and
// not blocked by the lsn lease deadline.
let validate = {
let conf = self.tenant_conf.load();
!conf.is_gc_blocked_by_lsn_lease_deadline()
conf.location.attach_mode == AttachmentMode::Single
&& !conf.is_gc_blocked_by_lsn_lease_deadline()
};
// Do not allow initial lease creation to be below the planned gc cutoff. The client (compute_ctl) determines
// whether it is a initial lease creation or a renewal.
if (init || validate) && lsn < planned_cutoff {
bail!(
"tried to request an lsn lease for an lsn below the planned gc cutoff. requested at {} planned gc cutoff {}",
lsn,
planned_cutoff
);
if init || validate {
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!(
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
lsn,
*latest_gc_cutoff_lsn
);
}
if lsn < planned_cutoff {
bail!(
"tried to request an lsn lease for an lsn below the planned gc cutoff. requested at {} planned gc cutoff {}",
lsn,
planned_cutoff
);
}
}
let dt: DateTime<Utc> = valid_until.into();
@@ -2117,7 +2065,22 @@ impl Timeline {
match &result {
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
Err(e) if e.is_cancel() => {}
Err(_) => self.compaction_failed.store(true, AtomicOrdering::Relaxed),
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::AlreadyRunning(_)) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::Other(_)) => {
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
}
Err(CompactionError::CollectKeySpaceError(_)) => {
// Cancelled errors are covered by the `Err(e) if e.is_cancel()` branch.
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
}
// Don't change the current value on offload failure or shutdown. We don't want to
// abruptly stall nor resume L0 flushes in these cases.
Err(CompactionError::Offload(_)) => {}
};
result
@@ -2846,18 +2809,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
// HADRON
fn get_image_layer_force_creation_period(&self) -> Option<Duration> {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.image_layer_force_creation_period
.or(self
.conf
.default_tenant_conf
.image_layer_force_creation_period)
}
fn get_compaction_algorithm_settings(&self) -> CompactionAlgorithmSettings {
let tenant_conf = &self.tenant_conf.load();
tenant_conf
@@ -3127,6 +3078,7 @@ impl Timeline {
repartition_threshold: 0,
last_image_layer_creation_check_at: AtomicLsn::new(0),
last_image_layer_creation_check_instant: Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_latest_cache: RwLock::new(HashMap::new()),
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
@@ -3177,7 +3129,7 @@ impl Timeline {
basebackup_cache: resources.basebackup_cache,
feature_resolver: resources.feature_resolver.clone(),
feature_resolver: resources.feature_resolver,
};
result.repartition_threshold =
@@ -5018,7 +4970,7 @@ impl Timeline {
ctx,
)
.await
.map_err(|e| FlushLayerError::from_anyhow(self, e.into_anyhow()))?;
.map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
@@ -5047,7 +4999,6 @@ impl Timeline {
.create_image_layers(
&partitions,
self.initdb_lsn,
None,
ImageLayerCreationMode::Initial,
ctx,
LastImageLayerCreationStatus::Initial,
@@ -5269,18 +5220,18 @@ impl Timeline {
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), RepartitionError> {
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
let Ok(mut guard) = self.partitioning.try_write_guard() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
return Err(RepartitionError::Other(anyhow!(
return Err(CompactionError::Other(anyhow!(
"repartition() called concurrently"
)));
};
let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read();
if lsn < *partition_lsn {
return Err(RepartitionError::Other(anyhow!(
return Err(CompactionError::Other(anyhow!(
"repartition() called with LSN going backwards, this should not happen"
)));
}
@@ -5301,10 +5252,7 @@ impl Timeline {
));
}
let (dense_ks, sparse_ks) = self
.collect_keyspace(lsn, ctx)
.await
.map_err(RepartitionError::CollectKeyspace)?;
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
let dense_partitioning = dense_ks.partition(
&self.shard_identity,
partition_size,
@@ -5319,19 +5267,14 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition? True if we want to generate.
async fn time_for_new_image_layer(
&self,
partition: &KeySpace,
lsn: Lsn,
force_image_creation_lsn: Option<Lsn>,
) -> bool {
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let Ok(layers) = guard.layer_map() else {
return false;
};
let mut min_image_lsn: Lsn = Lsn::MAX;
let mut max_deltas = 0;
for part_range in &partition.ranges {
let image_coverage = layers.image_coverage(part_range, lsn);
@@ -5366,25 +5309,9 @@ impl Timeline {
return true;
}
}
min_image_lsn = min(min_image_lsn, img_lsn);
}
}
// HADRON
// for child timelines, we consider all pages up to ancestor_LSN are redone successfully by the parent timeline
min_image_lsn = min_image_lsn.max(self.get_ancestor_lsn());
if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 {
info!(
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}, num deltas: {}",
partition.ranges[0].start,
partition.ranges[0].end,
min_image_lsn,
force_image_creation_lsn.unwrap(),
max_deltas
);
return true;
}
debug!(
max_deltas,
"none of the partitioned ranges had >= {threshold} deltas"
@@ -5610,7 +5537,7 @@ impl Timeline {
/// suffer from the lack of image layers
/// 2. For small tenants (that can mostly fit in RAM), we use a much longer interval
fn should_check_if_image_layers_required(self: &Arc<Timeline>, lsn: Lsn) -> bool {
let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold;
const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;
let last_checks_at = self.last_image_layer_creation_check_at.load();
let distance = lsn
@@ -5624,12 +5551,12 @@ impl Timeline {
let mut time_based_decision = false;
let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
let check_required_after =
if Some(Into::<u64>::into(&logical_size)) >= large_timeline_threshold {
self.get_checkpoint_timeout()
} else {
Duration::from_secs(3600 * 48)
};
let check_required_after = if Into::<u64>::into(&logical_size) >= LARGE_TENANT_THRESHOLD
{
self.get_checkpoint_timeout()
} else {
Duration::from_secs(3600 * 48)
};
time_based_decision = match *last_check_instant {
Some(last_check) => {
@@ -5657,12 +5584,10 @@ impl Timeline {
/// true = we have generate all image layers, false = we preempt the process for L0 compaction.
///
/// `partition_mode` is only for logging purpose and is not used anywhere in this function.
#[allow(clippy::too_many_arguments)]
async fn create_image_layers(
self: &Arc<Timeline>,
partitioning: &KeyPartitioning,
lsn: Lsn,
force_image_creation_lsn: Option<Lsn>,
mode: ImageLayerCreationMode,
ctx: &RequestContext,
last_status: LastImageLayerCreationStatus,
@@ -5766,11 +5691,7 @@ impl Timeline {
} else if let ImageLayerCreationMode::Try = mode {
// check_for_image_layers = false -> skip
// check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate
if !check_for_image_layers
|| !self
.time_for_new_image_layer(partition, lsn, force_image_creation_lsn)
.await
{
if !check_for_image_layers || !self.time_for_new_image_layer(partition, lsn).await {
start = img_range.end;
continue;
}
@@ -6091,88 +6012,57 @@ impl Drop for Timeline {
}
}
pub(crate) use compaction_error::CompactionError;
/// In a private mod to enforce that [`CompactionError::is_cancel`] is used
/// instead of `match`ing on [`CompactionError::ShuttingDown`].
mod compaction_error {
use utils::sync::gate::GateError;
/// Top-level failure to compact.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompactionError {
#[error("The timeline or pageserver is shutting down")]
ShuttingDown,
/// Compaction tried to offload a timeline and failed
#[error("Failed to offload timeline: {0}")]
Offload(OffloadError),
/// Compaction cannot be done right now; page reconstruction and so on.
#[error("Failed to collect keyspace: {0}")]
CollectKeySpaceError(#[from] CollectKeySpaceError),
#[error(transparent)]
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
}
use crate::{
pgdatadir_mapping::CollectKeySpaceError,
tenant::{PageReconstructError, blob_io::WriteBlobError, upload_queue::NotInitialized},
virtual_file::owned_buffers_io::write::FlushTaskError,
};
/// Top-level failure to compact. Use [`Self::is_cancel`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompactionError {
/// Use [`Self::is_cancel`] instead of checking for this variant.
#[error("The timeline or pageserver is shutting down")]
#[allow(private_interfaces)]
ShuttingDown(ForbidMatching), // private ForbidMatching enforces use of [`Self::is_cancel`].
#[error(transparent)]
Other(anyhow::Error),
impl CompactionError {
/// Errors that can be ignored, i.e., cancel and shutdown.
pub fn is_cancel(&self) -> bool {
matches!(
self,
Self::ShuttingDown
| Self::AlreadyRunning(_)
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
)
}
#[derive(Debug)]
struct ForbidMatching;
/// Critical errors that indicate data corruption.
pub fn is_critical(&self) -> bool {
matches!(
self,
Self::CollectKeySpaceError(
CollectKeySpaceError::Decode(_)
| CollectKeySpaceError::PageRead(
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
)
)
)
}
}
impl CompactionError {
pub fn new_cancelled() -> Self {
Self::ShuttingDown(ForbidMatching)
}
/// Errors that can be ignored, i.e., cancel and shutdown.
pub fn is_cancel(&self) -> bool {
let other = match self {
CompactionError::ShuttingDown(_) => return true,
CompactionError::Other(other) => other,
};
// The write path of compaction in particular often lacks differentiated
// handling errors stemming from cancellation from other errors.
// So, if requested, we also check the ::Other variant by downcasting.
// The list below has been found empirically from flaky tests and production logs.
// The process is simple: on ::Other(), compaction will print the enclosed
// anyhow::Error in debug mode, i.e., with backtrace. That backtrace contains the
// line where the write path / compaction code does undifferentiated error handling
// from a non-anyhow type to an anyhow type. Add the type to the list of downcasts
// below, following the same is_cancel() pattern.
let root_cause = other.root_cause();
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_cancel());
let buffered_writer_flush_task_canelled = root_cause
.downcast_ref::<FlushTaskError>()
.is_some_and(|e| e.is_cancel());
let write_blob_cancelled = root_cause
.downcast_ref::<WriteBlobError>()
.is_some_and(|e| e.is_cancel());
let gate_closed = root_cause
.downcast_ref::<GateError>()
.is_some_and(|e| e.is_cancel());
upload_queue
|| timeline
|| buffered_writer_flush_task_canelled
|| write_blob_cancelled
|| gate_closed
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
CompactionError::ShuttingDown(ForbidMatching) => anyhow::Error::new(self),
CompactionError::Other(e) => e,
}
}
pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
if err.is_cancel() {
Self::new_cancelled()
} else {
Self::Other(err.into_anyhow())
}
impl From<OffloadError> for CompactionError {
fn from(e: OffloadError) -> Self {
match e {
OffloadError::Cancelled => Self::ShuttingDown,
_ => Self::Offload(e),
}
}
}
@@ -6184,7 +6074,7 @@ impl From<super::upload_queue::NotInitialized> for CompactionError {
CompactionError::Other(anyhow::anyhow!(value))
}
super::upload_queue::NotInitialized::ShuttingDown
| super::upload_queue::NotInitialized::Stopped => CompactionError::new_cancelled(),
| super::upload_queue::NotInitialized::Stopped => CompactionError::ShuttingDown,
}
}
}
@@ -6194,7 +6084,7 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
match e {
super::storage_layer::layer::DownloadError::TimelineShutdown
| super::storage_layer::layer::DownloadError::DownloadCancelled => {
CompactionError::new_cancelled()
CompactionError::ShuttingDown
}
super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
| super::storage_layer::layer::DownloadError::DownloadRequired
@@ -6213,14 +6103,14 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
impl From<layer_manager::Shutdown> for CompactionError {
fn from(_: layer_manager::Shutdown) -> Self {
CompactionError::new_cancelled()
CompactionError::ShuttingDown
}
}
impl From<super::storage_layer::errors::PutError> for CompactionError {
fn from(e: super::storage_layer::errors::PutError) -> Self {
if e.is_cancel() {
CompactionError::new_cancelled()
CompactionError::ShuttingDown
} else {
CompactionError::Other(e.into_anyhow())
}
@@ -6319,7 +6209,7 @@ impl Timeline {
let mut guard = tokio::select! {
guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard,
_ = self.cancel.cancelled() => {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
};
@@ -6875,7 +6765,7 @@ impl Timeline {
}
/// Reconstruct a value, using the given base image and WAL records in 'data'.
pub(crate) async fn reconstruct_value(
async fn reconstruct_value(
&self,
key: Key,
request_lsn: Lsn,
@@ -7146,19 +7036,6 @@ impl Timeline {
.unwrap()
.clone()
}
/* BEGIN_HADRON */
pub(crate) async fn compute_image_consistent_lsn(&self) -> anyhow::Result<Lsn> {
let guard = self
.layers
.read(LayerManagerLockHolder::ComputeImageConsistentLsn)
.await;
let layer_map = guard.layer_map()?;
let disk_consistent_lsn = self.get_disk_consistent_lsn();
Ok(layer_map.compute_image_consistent_lsn(disk_consistent_lsn))
}
/* END_HADRON */
}
impl Timeline {

View File

@@ -4,7 +4,6 @@
//!
//! The old legacy algorithm is implemented directly in `timeline.rs`.
use std::cmp::min;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
@@ -17,8 +16,7 @@ use super::{
Timeline,
};
use crate::pgdatadir_mapping::CollectKeySpaceError;
use crate::tenant::timeline::{DeltaEntry, RepartitionError};
use crate::tenant::timeline::DeltaEntry;
use crate::walredo::RedoAttemptType;
use anyhow::{Context, anyhow};
use bytes::Bytes;
@@ -66,7 +64,7 @@ use crate::tenant::timeline::{
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
ResidentLayer, drop_layer_manager_rlock,
};
use crate::tenant::{DeltaLayer, MaybeOffloaded, PageReconstructError};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
@@ -573,7 +571,7 @@ impl GcCompactionQueue {
}
match res {
Ok(res) => Ok(res),
Err(e) if e.is_cancel() => Err(e),
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
Err(_) => {
// There are some cases where traditional gc might collect some layer
// files causing gc-compaction cannot read the full history of the key.
@@ -593,9 +591,9 @@ impl GcCompactionQueue {
timeline: &Arc<Timeline>,
) -> Result<CompactionOutcome, CompactionError> {
let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
return Err(CompactionError::Other(anyhow::anyhow!(
"cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue."
)));
return Err(CompactionError::AlreadyRunning(
"cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
));
};
let has_pending_tasks;
let mut yield_for_l0 = false;
@@ -1261,16 +1259,13 @@ impl Timeline {
// Is the timeline being deleted?
if self.is_stopping() {
trace!("Dropping out of compaction on timeline shutdown");
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
// HADRON
let force_image_creation_lsn = self.get_force_image_creation_lsn();
// 1. L0 Compact
let l0_outcome = {
let timer = self.metrics.compact_time_histo.start_timer();
@@ -1278,7 +1273,6 @@ impl Timeline {
.compact_level0(
target_file_size,
options.flags.contains(CompactFlags::ForceL0Compaction),
force_image_creation_lsn,
ctx,
)
.await?;
@@ -1381,7 +1375,6 @@ impl Timeline {
.create_image_layers(
&partitioning,
lsn,
force_image_creation_lsn,
mode,
&image_ctx,
self.last_image_layer_creation_status
@@ -1424,33 +1417,22 @@ impl Timeline {
}
// Suppress errors when cancelled.
//
// Log other errors but continue. Failure to repartition is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
// key-value store, ignoring the datadir layout. Log the error but continue.
//
// TODO:
// 1. shouldn't we return early here if we observe cancellation
// 2. Experiment: can we stop checking self.cancel here?
Err(_) if self.cancel.is_cancelled() => {} // TODO: try how we fare removing this branch
Err(_) if self.cancel.is_cancelled() => {}
Err(err) if err.is_cancel() => {}
Err(RepartitionError::CollectKeyspace(
e @ CollectKeySpaceError::Decode(_)
| e @ CollectKeySpaceError::PageRead(
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
),
)) => {
// Alert on critical errors that indicate data corruption.
// Alert on critical errors that indicate data corruption.
Err(err) if err.is_critical() => {
critical_timeline!(
self.tenant_shard_id,
self.timeline_id,
"could not compact, repartitioning keyspace failed: {e:?}"
"could not compact, repartitioning keyspace failed: {err:?}"
);
}
Err(e) => error!(
"could not compact, repartitioning keyspace failed: {:?}",
e.into_anyhow()
),
// Log other errors. No partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
// key-value store, ignoring the datadir layout. Log the error but continue.
Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
};
let partition_count = self.partitioning.read().0.0.parts.len();
@@ -1478,41 +1460,6 @@ impl Timeline {
Ok(CompactionOutcome::Done)
}
/* BEGIN_HADRON */
// Get the force image creation LSN based on gc_cutoff_lsn.
// Note that this is an estimation and the workload rate may suddenly change. When that happens,
// the force image creation may be too early or too late, but eventually it should be able to catch up.
pub(crate) fn get_force_image_creation_lsn(self: &Arc<Self>) -> Option<Lsn> {
let image_creation_period = self.get_image_layer_force_creation_period()?;
let current_lsn = self.get_last_record_lsn();
let pitr_lsn = self.gc_info.read().unwrap().cutoffs.time?;
let pitr_interval = self.get_pitr_interval();
if pitr_lsn == Lsn::INVALID || pitr_interval.is_zero() {
tracing::warn!(
"pitr LSN/interval not found, skipping force image creation LSN calculation"
);
return None;
}
let delta_lsn = current_lsn.checked_sub(pitr_lsn).unwrap().0
* image_creation_period.as_secs()
/ pitr_interval.as_secs();
let force_image_creation_lsn = current_lsn.checked_sub(delta_lsn).unwrap_or(Lsn(0));
tracing::info!(
"Tenant shard {} computed force_image_creation_lsn: {}. Current lsn: {}, image_layer_force_creation_period: {:?}, GC cutoff: {}, PITR interval: {:?}",
self.tenant_shard_id,
force_image_creation_lsn,
current_lsn,
image_creation_period,
pitr_lsn,
pitr_interval
);
Some(force_image_creation_lsn)
}
/* END_HADRON */
/// Check for layers that are elegible to be rewritten:
/// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
/// we don't indefinitely retain keys in this shard that aren't needed.
@@ -1665,7 +1612,7 @@ impl Timeline {
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
if self.cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
@@ -1763,7 +1710,7 @@ impl Timeline {
Ok(()) => {},
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
},
// Don't wait if there's L0 compaction to do. We don't need to update the outcome
@@ -1842,7 +1789,6 @@ impl Timeline {
self: &Arc<Self>,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
force_compaction_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> Result<CompactionOutcome, CompactionError> {
let CompactLevel0Phase1Result {
@@ -1863,7 +1809,6 @@ impl Timeline {
stats,
target_file_size,
force_compaction_ignore_threshold,
force_compaction_lsn,
&ctx,
)
.instrument(phase1_span)
@@ -1886,7 +1831,6 @@ impl Timeline {
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
force_compaction_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let begin = tokio::time::Instant::now();
@@ -1916,28 +1860,11 @@ impl Timeline {
return Ok(CompactLevel0Phase1Result::default());
}
} else {
// HADRON
let min_lsn = level0_deltas
.iter()
.map(|a| a.get_lsn_range().start)
.reduce(min);
if force_compaction_lsn.is_some()
&& min_lsn.is_some()
&& min_lsn.unwrap() < force_compaction_lsn.unwrap()
{
info!(
"forcing L0 compaction of {} L0 deltas. Min lsn: {}, force compaction lsn: {}",
level0_deltas.len(),
min_lsn.unwrap(),
force_compaction_lsn.unwrap()
);
} else {
debug!(
level0_deltas = level0_deltas.len(),
threshold, "too few deltas to compact"
);
return Ok(CompactLevel0Phase1Result::default());
}
debug!(
level0_deltas = level0_deltas.len(),
threshold, "too few deltas to compact"
);
return Ok(CompactLevel0Phase1Result::default());
}
}
@@ -2046,7 +1973,7 @@ impl Timeline {
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
if self.cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
let keys = delta
@@ -2139,7 +2066,7 @@ impl Timeline {
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
if self.cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
@@ -2247,7 +2174,7 @@ impl Timeline {
// avoid hitting the cancellation token on every key. in benches, we end up
// shuffling an order of million keys per layer, this means we'll check it
// around tens of times per layer.
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let same_key = prev_key == Some(key);
@@ -2332,7 +2259,7 @@ impl Timeline {
if writer.is_none() {
if self.cancel.is_cancelled() {
// to be somewhat responsive to cancellation, check for each new layer
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
// Create writer if not initiaized yet
writer = Some(
@@ -2588,13 +2515,10 @@ impl Timeline {
// Is the timeline being deleted?
if self.is_stopping() {
trace!("Dropping out of compaction on timeline shutdown");
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let (dense_ks, _sparse_ks) = self
.collect_keyspace(end_lsn, ctx)
.await
.map_err(CompactionError::from_collect_keyspace)?;
let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
// TODO(chi): ignore sparse_keyspace for now, compact it in the future.
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
@@ -3250,7 +3174,7 @@ impl Timeline {
let gc_lock = async {
tokio::select! {
guard = self.gc_lock.lock() => Ok(guard),
_ = cancel.cancelled() => Err(CompactionError::new_cancelled()),
_ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
}
};
@@ -3523,7 +3447,7 @@ impl Timeline {
}
total_layer_size += layer.layer_desc().file_size;
if cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let should_yield = yield_for_l0
&& self
@@ -3670,7 +3594,7 @@ impl Timeline {
}
if cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let should_yield = yield_for_l0

View File

@@ -212,12 +212,8 @@
//! to the parent shard during a shard split. Eventually, the shard split task will
//! shut down the parent => case (1).
use std::collections::HashMap;
use std::collections::hash_map;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::time::Duration;
use std::collections::{HashMap, hash_map};
use std::sync::{Arc, Mutex, Weak};
use pageserver_api::shard::ShardIdentity;
use tracing::{instrument, trace};
@@ -337,44 +333,6 @@ enum RoutingResult<T: Types> {
}
impl<T: Types> Cache<T> {
/* BEGIN_HADRON */
/// A wrapper of do_get to resolve the tenant shard for a get page request.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn get(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
const GET_MAX_RETRIES: usize = 10;
const RETRY_BACKOFF: Duration = Duration::from_millis(100);
let mut attempt = 0;
loop {
attempt += 1;
match self
.do_get(timeline_id, shard_selector, tenant_manager)
.await
{
Ok(handle) => return Ok(handle),
Err(e) => {
// Retry on tenant manager error to handle tenant split more gracefully
if attempt < GET_MAX_RETRIES {
tokio::time::sleep(RETRY_BACKOFF).await;
continue;
} else {
tracing::warn!(
"Failed to resolve tenant shard after {} attempts: {:?}",
GET_MAX_RETRIES,
e
);
return Err(e);
}
}
}
}
}
/* END_HADRON */
/// See module-level comment for details.
///
/// Does NOT check for the shutdown state of [`Types::Timeline`].
@@ -383,7 +341,7 @@ impl<T: Types> Cache<T> {
/// and if so, return an error that causes the page service to
/// close the connection.
#[instrument(level = "trace", skip_all)]
async fn do_get(
pub(crate) async fn get(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
@@ -921,7 +879,6 @@ mod tests {
.await
.err()
.expect("documented behavior: can't get new handle after shutdown");
assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
cache

View File

@@ -47,7 +47,6 @@ pub(crate) enum LayerManagerLockHolder {
ImportPgData,
DetachAncestor,
Eviction,
ComputeImageConsistentLsn,
#[cfg(test)]
Testing,
}

Some files were not shown because too many files have changed in this diff Show More