Compare commits

..

1 Commits

Author SHA1 Message Date
Aleksandr Sarantsev
c8475ed008 Introduce flag for deletion API 2025-07-08 17:20:15 +04:00
111 changed files with 1129 additions and 5421 deletions

View File

@@ -87,24 +87,6 @@ jobs:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
lint-openapi-spec:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: docker/login-action@74a5d142397b4f367a81961eba4e8cd7edddf772 # v3.4.0
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- run: make lint-openapi-spec
check-codestyle-python:
needs: [ meta, check-permissions, build-build-tools-image ]
# No need to run on `main` because we this in the merge queue. We do need to run this in `.*-rc-pr` because of hotfixes.
@@ -1004,7 +986,6 @@ jobs:
- name: Verify docker-compose example and test extensions
timeout-minutes: 60
env:
PARALLEL_COMPUTES: 3
TAG: >-
${{
needs.meta.outputs.run-kind == 'compute-rc-pr'

1
.gitignore vendored
View File

@@ -15,7 +15,6 @@ neon.iml
/.neon
/integration_tests/.neon
compaction-suite-results.*
docker-compose/docker-compose-parallel.yml
# Coverage
*.profraw

23
Cargo.lock generated
View File

@@ -1348,7 +1348,6 @@ dependencies = [
"p256 0.13.2",
"pageserver_page_api",
"postgres",
"postgres-types",
"postgres_initdb",
"postgres_versioninfo",
"regex",
@@ -4340,7 +4339,6 @@ dependencies = [
"arc-swap",
"async-compression",
"async-stream",
"base64 0.22.1",
"bincode",
"bit_field",
"byteorder",
@@ -4494,25 +4492,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"compute_api",
"futures",
"pageserver_api",
"pageserver_page_api",
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.13.1",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "pageserver_compaction"
version = "0.1.0"
@@ -5705,8 +5684,6 @@ dependencies = [
"azure_identity",
"azure_storage",
"azure_storage_blobs",
"base64 0.22.1",
"byteorder",
"bytes",
"camino",
"camino-tempfile",

View File

@@ -8,7 +8,6 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/client_grpc",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",

View File

@@ -220,15 +220,6 @@ neon-pgindent: postgres-v17-pg-bsd-indent neon-pg-ext-v17
setup-pre-commit-hook:
ln -s -f $(ROOT_PROJECT_DIR)/pre-commit.py .git/hooks/pre-commit
.PHONY: lint-openapi-spec
lint-openapi-spec:
# operation-2xx-response: pageserver timeline delete returns 404 on success
find . -iname "openapi_spec.y*ml" -exec\
docker run --rm -v ${PWD}:/spec ghcr.io/redocly/cli:1.34.4\
--skip-rule=operation-operationId --skip-rule=operation-summary --extends=minimal\
--skip-rule=no-server-example.com --skip-rule=operation-2xx-response\
lint {} \+
# Targets for building PostgreSQL are defined in postgres.mk.
#
# But if the caller has indicated that PostgreSQL is already

View File

@@ -1,12 +1,9 @@
disallowed-methods = [
"tokio::task::block_in_place",
# Allow this for now, to deny it later once we stop using Handle::block_on completely
# "tokio::runtime::Handle::block_on",
# tokio-epoll-uring:
# - allow-invalid because the method doesn't exist on macOS
{ path = "tokio_epoll_uring::thread_local_system", replacement = "tokio_epoll_uring_ext module inside pageserver crate", allow-invalid = true }
# use tokio_epoll_uring_ext instead
"tokio_epoll_uring::thread_local_system",
]
disallowed-macros = [

View File

@@ -1915,10 +1915,10 @@ RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /e
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN echo /usr/local/pgsql/lib > /etc/ld.so.conf.d/00-neon.conf && /sbin/ldconfig
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq parallel \
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq \
&& apt clean && rm -rf /ext-src/*.tar.gz /ext-src/*.patch /var/lib/apt/lists/*
ENV PATH=/usr/local/pgsql/bin:$PATH
ENV PGHOST=compute1
ENV PGHOST=compute
ENV PGPORT=55433
ENV PGUSER=cloud_admin
ENV PGDATABASE=postgres

View File

@@ -66,7 +66,7 @@ url.workspace = true
uuid.workspace = true
walkdir.workspace = true
x509-cert.workspace = true
postgres-types.workspace = true
postgres_versioninfo.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState, PromoteState, TlsConfig,
LfcPrewarmState, TlsConfig,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PageserverProtocol, PgIdent,
@@ -29,7 +29,8 @@ use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
use tokio::task::JoinHandle;
use tokio::{spawn, time};
use tracing::{Instrument, debug, error, info, instrument, warn};
use url::Url;
use utils::id::{TenantId, TimelineId};
@@ -174,7 +175,6 @@ pub struct ComputeState {
/// WAL flush LSN that is set after terminating Postgres and syncing safekeepers if
/// mode == ComputeMode::Primary. None otherwise
pub terminate_flush_lsn: Option<Lsn>,
pub promote_state: Option<watch::Receiver<PromoteState>>,
pub metrics: ComputeMetrics,
}
@@ -192,7 +192,6 @@ impl ComputeState {
lfc_prewarm_state: LfcPrewarmState::default(),
lfc_offload_state: LfcOffloadState::default(),
terminate_flush_lsn: None,
promote_state: None,
}
}
@@ -1058,7 +1057,7 @@ impl ComputeNode {
};
let (reader, connected) = tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::connect(
let mut client = page_api::Client::new(
shard0_connstr,
spec.tenant_id,
spec.timeline_id,
@@ -2434,11 +2433,19 @@ LIMIT 100",
// If the value is -1, we never suspend so set the value to default collection.
// If the value is 0, it means default, we will just continue to use the default.
if spec.suspend_timeout_seconds == -1 || spec.suspend_timeout_seconds == 0 {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}, New Timeout: {}",
spec.suspend_timeout_seconds, DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL
);
self.params.installed_extensions_collection_interval.store(
DEFAULT_INSTALLED_EXTENSIONS_COLLECTION_INTERVAL,
std::sync::atomic::Ordering::SeqCst,
);
} else {
info!(
"[NEON_EXT_INT_UPD] Spec Timeout: {}",
spec.suspend_timeout_seconds
);
self.params.installed_extensions_collection_interval.store(
spec.suspend_timeout_seconds as u64,
std::sync::atomic::Ordering::SeqCst,

View File

@@ -105,8 +105,7 @@ impl ComputeNode {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
crate::metrics::LFC_PREWARM_ERRORS.inc();
error!(%err, "prewarming lfc");
error!(%err);
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Failed {
error: err.to_string(),
};
@@ -181,8 +180,7 @@ impl ComputeNode {
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Completed;
return;
};
crate::metrics::LFC_OFFLOAD_ERRORS.inc();
error!(%err, "offloading lfc");
error!(%err);
self.state.lock().unwrap().lfc_offload_state = LfcOffloadState::Failed {
error: err.to_string(),
};

View File

@@ -1,132 +0,0 @@
use crate::compute::ComputeNode;
use anyhow::{Context, Result, bail};
use compute_api::{
responses::{LfcPrewarmState, PromoteState, SafekeepersLsn},
spec::ComputeMode,
};
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use utils::lsn::Lsn;
impl ComputeNode {
/// Returns only when promote fails or succeeds. If a network error occurs
/// and http client disconnects, this does not stop promotion, and subsequent
/// calls block until promote finishes.
/// Called by control plane on secondary after primary endpoint is terminated
pub async fn promote(self: &Arc<Self>, safekeepers_lsn: SafekeepersLsn) -> PromoteState {
let cloned = self.clone();
let start_promotion = || {
let (tx, rx) = tokio::sync::watch::channel(PromoteState::NotPromoted);
tokio::spawn(async move {
tx.send(match cloned.promote_impl(safekeepers_lsn).await {
Ok(_) => PromoteState::Completed,
Err(err) => {
tracing::error!(%err, "promoting");
PromoteState::Failed {
error: err.to_string(),
}
}
})
});
rx
};
let mut task;
// self.state is unlocked after block ends so we lock it in promote_impl
// and task.changed() is reached
{
task = self
.state
.lock()
.unwrap()
.promote_state
.get_or_insert_with(start_promotion)
.clone()
}
task.changed().await.expect("promote sender dropped");
task.borrow().clone()
}
// Why do we have to supply safekeepers?
// For secondary we use primary_connection_conninfo so safekeepers field is empty
async fn promote_impl(&self, safekeepers_lsn: SafekeepersLsn) -> Result<()> {
{
let state = self.state.lock().unwrap();
let mode = &state.pspec.as_ref().unwrap().spec.mode;
if *mode != ComputeMode::Replica {
bail!("{} is not replica", mode.to_type_str());
}
// we don't need to query Postgres so not self.lfc_prewarm_state()
match &state.lfc_prewarm_state {
LfcPrewarmState::NotPrewarmed | LfcPrewarmState::Prewarming => {
bail!("prewarm not requested or pending")
}
LfcPrewarmState::Failed { error } => {
tracing::warn!(%error, "replica prewarm failed")
}
_ => {}
}
}
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let primary_lsn = safekeepers_lsn.wal_flush_lsn;
let mut last_wal_replay_lsn: Lsn = Lsn::INVALID;
const RETRIES: i32 = 20;
for i in 0..=RETRIES {
let row = client
.query_one("SELECT pg_last_wal_replay_lsn()", &[])
.await
.context("getting last replay lsn")?;
let lsn: u64 = row.get::<usize, postgres_types::PgLsn>(0).into();
last_wal_replay_lsn = lsn.into();
if last_wal_replay_lsn >= primary_lsn {
break;
}
tracing::info!("Try {i}, replica lsn {last_wal_replay_lsn}, primary lsn {primary_lsn}");
sleep(Duration::from_secs(1)).await;
}
if last_wal_replay_lsn < primary_lsn {
bail!("didn't catch up with primary in {RETRIES} retries");
}
// using $1 doesn't work with ALTER SYSTEM SET
let safekeepers_sql = format!(
"ALTER SYSTEM SET neon.safekeepers='{}'",
safekeepers_lsn.safekeepers
);
client
.query(&safekeepers_sql, &[])
.await
.context("setting safekeepers")?;
client
.query("SELECT pg_reload_conf()", &[])
.await
.context("reloading postgres config")?;
let row = client
.query_one("SELECT * FROM pg_promote()", &[])
.await
.context("pg_promote")?;
if !row.get::<usize, bool>(0) {
bail!("pg_promote() returned false");
}
let client = ComputeNode::get_maintenance_client(&self.tokio_conn_conf)
.await
.context("connecting to postgres")?;
let row = client
.query_one("SHOW transaction_read_only", &[])
.await
.context("getting transaction_read_only")?;
if row.get::<usize, &str>(0) == "on" {
bail!("replica in read only mode after promotion");
}
let mut state = self.state.lock().unwrap();
state.pspec.as_mut().unwrap().spec.mode = ComputeMode::Primary;
Ok(())
}
}

View File

@@ -83,87 +83,6 @@ paths:
schema:
$ref: "#/components/schemas/DbsAndRoles"
/promote:
post:
tags:
- Promotion
summary: Promote secondary replica to primary
description: ""
operationId: promoteReplica
requestBody:
description: Promote requests data
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/SafekeepersLsn"
responses:
200:
description: Promote succeeded or wasn't started
content:
application/json:
schema:
$ref: "#/components/schemas/PromoteState"
500:
description: Promote failed
content:
application/json:
schema:
$ref: "#/components/schemas/PromoteState"
/lfc/prewarm:
post:
summary: Request LFC Prewarm
parameters:
- name: from_endpoint
in: query
schema:
type: string
description: ""
operationId: lfcPrewarm
responses:
202:
description: LFC prewarm started
429:
description: LFC prewarm ongoing
get:
tags:
- Prewarm
summary: Get LFC prewarm state
description: ""
operationId: getLfcPrewarmState
responses:
200:
description: Prewarm state
content:
application/json:
schema:
$ref: "#/components/schemas/LfcPrewarmState"
/lfc/offload:
post:
summary: Request LFC offload
description: ""
operationId: lfcOffload
responses:
202:
description: LFC offload started
429:
description: LFC offload ongoing
get:
tags:
- Prewarm
summary: Get LFC offloading state
description: ""
operationId: getLfcOffloadState
responses:
200:
description: Offload state
content:
application/json:
schema:
$ref: "#/components/schemas/LfcOffloadState"
/database_schema:
get:
tags:
@@ -416,6 +335,15 @@ components:
total_startup_ms:
type: integer
Info:
type: object
description: Information about VM/Pod.
required:
- num_cpus
properties:
num_cpus:
type: integer
DbsAndRoles:
type: object
description: Databases and Roles
@@ -569,69 +497,25 @@ components:
type: string
example: "1.0.0"
SafekeepersLsn:
InstalledExtensions:
type: object
required:
- safekeepers
- wal_flush_lsn
properties:
safekeepers:
description: Primary replica safekeepers
type: string
wal_flush_lsn:
description: Primary last WAL flush LSN
type: string
LfcPrewarmState:
type: object
required:
- status
- total
- prewarmed
- skipped
properties:
status:
description: Lfc prewarm status
enum: [not_prewarmed, prewarming, completed, failed]
type: string
error:
description: Lfc prewarm error, if any
type: string
total:
description: Total pages processed
type: integer
prewarmed:
description: Total pages prewarmed
type: integer
skipped:
description: Pages processed but not prewarmed
type: integer
LfcOffloadState:
type: object
required:
- status
properties:
status:
description: Lfc offload status
enum: [not_offloaded, offloading, completed, failed]
type: string
error:
description: Lfc offload error, if any
type: string
PromoteState:
type: object
required:
- status
properties:
status:
description: Promote result
enum: [not_promoted, completed, failed]
type: string
error:
description: Promote error, if any
type: string
extensions:
description: Contains list of installed extensions.
type: array
items:
type: object
properties:
extname:
type: string
version:
type: string
items:
type: string
n_databases:
type: integer
owned_by_superuser:
type: integer
SetRoleGrantsRequest:
type: object

View File

@@ -14,7 +14,6 @@ pub(in crate::http) mod insights;
pub(in crate::http) mod lfc;
pub(in crate::http) mod metrics;
pub(in crate::http) mod metrics_json;
pub(in crate::http) mod promote;
pub(in crate::http) mod status;
pub(in crate::http) mod terminate;

View File

@@ -1,14 +0,0 @@
use crate::http::JsonResponse;
use axum::Form;
use http::StatusCode;
pub(in crate::http) async fn promote(
compute: axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>,
Form(safekeepers_lsn): Form<compute_api::responses::SafekeepersLsn>,
) -> axum::response::Response {
let state = compute.promote(safekeepers_lsn).await;
if let compute_api::responses::PromoteState::Failed { error } = state {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, error);
}
JsonResponse::success(StatusCode::OK, state)
}

View File

@@ -23,7 +23,7 @@ use super::{
middleware::authorize::Authorize,
routes::{
check_writability, configure, database_schema, dbs_and_roles, extension_server, extensions,
grants, insights, lfc, metrics, metrics_json, promote, status, terminate,
grants, insights, lfc, metrics, metrics_json, status, terminate,
},
};
use crate::compute::ComputeNode;
@@ -87,7 +87,6 @@ impl From<&Server> for Router<Arc<ComputeNode>> {
let authenticated_router = Router::<Arc<ComputeNode>>::new()
.route("/lfc/prewarm", get(lfc::prewarm_state).post(lfc::prewarm))
.route("/lfc/offload", get(lfc::offload_state).post(lfc::offload))
.route("/promote", post(promote::promote))
.route("/check_writability", post(check_writability::is_writable))
.route("/configure", post(configure::configure))
.route("/database_schema", get(database_schema::get_schema_dump))

View File

@@ -12,7 +12,6 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod compute_prewarm;
pub mod compute_promote;
pub mod disk_quota;
pub mod extension_server;
pub mod installed_extensions;

View File

@@ -192,7 +192,7 @@ fn acquire_lsn_lease_grpc(
lsn: Lsn,
) -> Result<Option<SystemTime>> {
tokio::runtime::Handle::current().block_on(async move {
let mut client = page_api::Client::connect(
let mut client = page_api::Client::new(
connstring.to_string(),
tenant_shard_id.tenant_id,
timeline_id,

View File

@@ -105,14 +105,6 @@ pub(crate) static LFC_PREWARMS: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static LFC_PREWARM_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_prewarm_errors_total",
"Total number of LFC prewarm errors",
)
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offloads_total",
@@ -121,14 +113,6 @@ pub(crate) static LFC_OFFLOADS: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static LFC_OFFLOAD_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"compute_ctl_lfc_offload_errors_total",
"Total number of LFC offload errors",
)
.expect("failed to define a metric")
});
pub fn collect() -> Vec<MetricFamily> {
let mut metrics = COMPUTE_CTL_UP.collect();
metrics.extend(INSTALLED_EXTENSIONS.collect());
@@ -139,8 +123,6 @@ pub fn collect() -> Vec<MetricFamily> {
metrics.extend(PG_CURR_DOWNTIME_MS.collect());
metrics.extend(PG_TOTAL_DOWNTIME_MS.collect());
metrics.extend(LFC_PREWARMS.collect());
metrics.extend(LFC_PREWARM_ERRORS.collect());
metrics.extend(LFC_OFFLOADS.collect());
metrics.extend(LFC_OFFLOAD_ERRORS.collect());
metrics
}

View File

@@ -1 +0,0 @@
GRANT pg_signal_backend TO neon_superuser WITH ADMIN OPTION;

View File

@@ -7,17 +7,13 @@ BEGIN
INTO monitor
FROM pg_auth_members
WHERE roleid = 'pg_monitor'::regrole
AND member = 'neon_superuser'::regrole;
AND member = 'pg_monitor'::regrole;
IF monitor IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_monitor';
END IF;
IF monitor.admin IS NULL OR NOT monitor.member THEN
IF NOT monitor.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_monitor';
END IF;
IF monitor.admin IS NULL OR NOT monitor.admin THEN
IF NOT monitor.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_monitor';
END IF;
END $$;

View File

@@ -1,23 +0,0 @@
DO $$
DECLARE
signal_backend record;
BEGIN
SELECT pg_has_role('neon_superuser', 'pg_signal_backend', 'member') AS member,
admin_option AS admin
INTO signal_backend
FROM pg_auth_members
WHERE roleid = 'pg_signal_backend'::regrole
AND member = 'neon_superuser'::regrole;
IF signal_backend IS NULL THEN
RAISE EXCEPTION 'no entry in pg_auth_members for neon_superuser and pg_signal_backend';
END IF;
IF signal_backend.member IS NULL OR NOT signal_backend.member THEN
RAISE EXCEPTION 'neon_superuser is not a member of pg_signal_backend';
END IF;
IF signal_backend.admin IS NULL OR NOT signal_backend.admin THEN
RAISE EXCEPTION 'neon_superuser cannot grant pg_signal_backend';
END IF;
END $$;

View File

@@ -197,7 +197,6 @@ pub async fn handle_migrations(client: &mut Client) -> Result<()> {
include_str!(
"./migrations/0011-grant_pg_show_replication_origin_status_to_neon_superuser.sql"
),
include_str!("./migrations/0012-grant_pg_signal_backend_to_neon_superuser.sql"),
];
MigrationRunner::new(client, &migrations)

View File

@@ -452,12 +452,6 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
// HADRON
image_layer_force_creation_period: settings
.remove("image_layer_force_creation_period")
.map(humantime::parse_duration)
.transpose()
.context("Failed to parse 'image_layer_force_creation_period' as duration")?,
image_layer_creation_check_threshold: settings
.remove("image_layer_creation_check_threshold")
.map(|x| x.parse::<u8>())

View File

@@ -75,6 +75,12 @@ enum Command {
NodeStartDelete {
#[arg(long)]
node_id: NodeId,
/// When `force` is true, skip waiting for shards to prewarm during migration.
/// This can significantly speed up node deletion since prewarming all shards
/// can take considerable time, but may result in slower initial access to
/// migrated shards until they warm up naturally.
#[arg(long)]
force: bool,
},
/// Cancel deletion of the specified pageserver and wait for `timeout`
/// for the operation to be canceled. May be retried.
@@ -933,13 +939,14 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
.await?;
}
Command::NodeStartDelete { node_id } => {
Command::NodeStartDelete { node_id, force } => {
let query = if force {
format!("control/v1/node/{node_id}/delete?force=true")
} else {
format!("control/v1/node/{node_id}/delete")
};
storcon_client
.dispatch::<(), ()>(
Method::PUT,
format!("control/v1/node/{node_id}/delete"),
None,
)
.dispatch::<(), ()>(Method::PUT, query, None)
.await?;
println!("Delete started for {node_id}");
}

View File

@@ -54,16 +54,14 @@ else
printf '%s\n' "${result}" | jq .
fi
if [[ "${RUN_PARALLEL:-false}" != "true" ]]; then
echo "Check if a timeline present"
PARAMS=(
-X GET
-H "Content-Type: application/json"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline"
)
timeline_id=$(curl "${PARAMS[@]}" | jq -r .[0].timeline_id)
fi
if [[ -z "${timeline_id:-}" || "${timeline_id:-}" = null ]]; then
echo "Check if a timeline present"
PARAMS=(
-X GET
-H "Content-Type: application/json"
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline"
)
timeline_id=$(curl "${PARAMS[@]}" | jq -r .[0].timeline_id)
if [[ -z "${timeline_id}" || "${timeline_id}" = null ]]; then
generate_id timeline_id
PARAMS=(
-sbf

View File

@@ -142,7 +142,7 @@ services:
- "storage_broker"
- "--listen-addr=0.0.0.0:50051"
compute1:
compute:
restart: always
build:
context: ./compute_wrapper/
@@ -152,7 +152,6 @@ services:
- TAG=${COMPUTE_TAG:-${TAG:-latest}}
- http_proxy=${http_proxy:-}
- https_proxy=${https_proxy:-}
image: built-compute
environment:
- PG_VERSION=${PG_VERSION:-16}
- TENANT_ID=${TENANT_ID:-}
@@ -167,11 +166,6 @@ services:
- 3080:3080 # http endpoints
entrypoint:
- "/shell/compute.sh"
# Ad an alias for compute1 for compatibility
networks:
default:
aliases:
- compute
depends_on:
- safekeeper1
- safekeeper2
@@ -180,20 +174,15 @@ services:
compute_is_ready:
image: postgres:latest
environment:
- PARALLEL_COMPUTES=1
entrypoint:
- "/bin/sh"
- "/bin/bash"
- "-c"
command:
- "for i in $(seq 1 $${PARALLEL_COMPUTES}); do
until pg_isready -h compute$$i -p 55433 -U cloud_admin ; do
sleep 1;
done;
done;
echo All computes are started"
- "until pg_isready -h compute -p 55433 -U cloud_admin ; do
echo 'Waiting to start compute...' && sleep 1;
done"
depends_on:
- compute1
- compute
neon-test-extensions:
profiles: ["test-extensions"]
@@ -207,4 +196,4 @@ services:
command:
- sleep 3600
depends_on:
- compute1
- compute

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash
# A basic test to ensure Docker images are built correctly.
# Build a wrapper around the compute, start all services and runs a simple SQL query.
@@ -13,36 +13,9 @@
#
set -eux -o pipefail
cd "$(dirname "${0}")"
export COMPOSE_FILE='docker-compose.yml'
export COMPOSE_PROFILES=test-extensions
export PARALLEL_COMPUTES=${PARALLEL_COMPUTES:-1}
READY_MESSAGE="All computes are started"
COMPUTES=()
for i in $(seq 1 "${PARALLEL_COMPUTES}"); do
COMPUTES+=("compute${i}")
done
CURRENT_TMPDIR=$(mktemp -d)
trap 'rm -rf ${CURRENT_TMPDIR} docker-compose-parallel.yml' EXIT
if [[ ${PARALLEL_COMPUTES} -gt 1 ]]; then
export COMPOSE_FILE=docker-compose-parallel.yml
cp docker-compose.yml docker-compose-parallel.yml
# Replace the environment variable PARALLEL_COMPUTES with the actual value
yq eval -i ".services.compute_is_ready.environment |= map(select(. | test(\"^PARALLEL_COMPUTES=\") | not)) + [\"PARALLEL_COMPUTES=${PARALLEL_COMPUTES}\"]" ${COMPOSE_FILE}
for i in $(seq 2 "${PARALLEL_COMPUTES}"); do
# Duplicate compute1 as compute${i} for parallel execution
yq eval -i ".services.compute${i} = .services.compute1" ${COMPOSE_FILE}
# We don't need these sections, so delete them
yq eval -i "(del .services.compute${i}.build) | (del .services.compute${i}.ports) | (del .services.compute${i}.networks)" ${COMPOSE_FILE}
# Let the compute 1 be the only dependence
yq eval -i ".services.compute${i}.depends_on = [\"compute1\"]" ${COMPOSE_FILE}
# Set RUN_PARALLEL=true for compute2. They will generate tenant_id and timeline_id to avoid using the same as other computes
yq eval -i ".services.compute${i}.environment += [\"RUN_PARALLEL=true\"]" ${COMPOSE_FILE}
# Remove TENANT_ID and TIMELINE_ID from the environment variables of the generated computes
# They will create new TENANT_ID and TIMELINE_ID anyway.
yq eval -i ".services.compute${i}.environment |= map(select(. | (test(\"^TENANT_ID=\") or test(\"^TIMELINE_ID=\")) | not))" ${COMPOSE_FILE}
done
fi
cd "$(dirname "${0}")"
PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"
function cleanup() {
@@ -54,11 +27,11 @@ function cleanup() {
for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
pg_version=${pg_version/v/}
echo "clean up containers if exist"
echo "clean up containers if exists"
cleanup
PG_TEST_VERSION=$((pg_version < 16 ? 16 : pg_version))
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose build compute1
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose up --quiet-pull -d
PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose up --quiet-pull --build -d
echo "wait until the compute is ready. timeout after 60s. "
cnt=0
while sleep 3; do
@@ -68,50 +41,45 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
echo "timeout before the compute is ready."
exit 1
fi
if docker compose logs compute_is_ready | grep -q "${READY_MESSAGE}"; then
if docker compose logs "compute_is_ready" | grep -q "accepting connections"; then
echo "OK. The compute is ready to connect."
echo "execute simple queries."
for compute in "${COMPUTES[@]}"; do
docker compose exec "${compute}" /bin/bash -c "psql ${PSQL_OPTION} -c 'SELECT 1'"
done
docker compose exec compute /bin/bash -c "psql ${PSQL_OPTION} -c 'SELECT 1'"
break
fi
done
if [[ ${pg_version} -ge 16 ]]; then
mkdir "${CURRENT_TMPDIR}"/{pg_hint_plan-src,file_fdw,postgis-src}
docker compose cp neon-test-extensions:/ext-src/postgis-src/raster/test "${CURRENT_TMPDIR}/postgis-src/test"
docker compose cp neon-test-extensions:/ext-src/postgis-src/regress/00-regress-install "${CURRENT_TMPDIR}/postgis-src/00-regress-install"
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${CURRENT_TMPDIR}/pg_hint_plan-src/data"
docker compose cp neon-test-extensions:/postgres/contrib/file_fdw/data "${CURRENT_TMPDIR}/file_fdw/data"
for compute in "${COMPUTES[@]}"; do
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config on "${compute}"
docker compose exec "${compute}" touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# Prepare for the PostGIS test
docker compose exec "${compute}" mkdir -p /tmp/pgis_reg/pgis_reg_tmp /ext-src/postgis-src/raster /ext-src/postgis-src/regress /ext-src/postgis-src/regress/00-regress-install
docker compose cp "${CURRENT_TMPDIR}/postgis-src/test" "${compute}":/ext-src/postgis-src/raster/test
docker compose cp "${CURRENT_TMPDIR}/postgis-src/00-regress-install" "${compute}":/ext-src/postgis-src/regress
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
docker compose cp "${CURRENT_TMPDIR}/pg_hint_plan-src/data" "${compute}":/ext-src/pg_hint_plan-src/
# The following block does the same for the contrib/file_fdw test
docker compose cp "${CURRENT_TMPDIR}/file_fdw/data" "${compute}":/postgres/contrib/file_fdw/data
done
# This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config
docker compose exec compute touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# Prepare for the PostGIS test
docker compose exec compute mkdir -p /tmp/pgis_reg/pgis_reg_tmp
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/postgis-src/raster/test "${TMPDIR}"
docker compose cp neon-test-extensions:/ext-src/postgis-src/regress/00-regress-install "${TMPDIR}"
docker compose exec compute mkdir -p /ext-src/postgis-src/raster /ext-src/postgis-src/regress /ext-src/postgis-src/regress/00-regress-install
docker compose cp "${TMPDIR}/test" compute:/ext-src/postgis-src/raster/test
docker compose cp "${TMPDIR}/00-regress-install" compute:/ext-src/postgis-src/regress
rm -rf "${TMPDIR}"
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${TMPDIR}/data"
docker compose cp "${TMPDIR}/data" compute:/ext-src/pg_hint_plan-src/
rm -rf "${TMPDIR}"
# The following block does the same for the contrib/file_fdw test
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/postgres/contrib/file_fdw/data "${TMPDIR}/data"
docker compose cp "${TMPDIR}/data" compute:/postgres/contrib/file_fdw/data
rm -rf "${TMPDIR}"
# Apply patches
docker compose exec -T neon-test-extensions bash -c "(cd /postgres && patch -p1)" <"../compute/patches/contrib_pg${pg_version}.patch"
# We are running tests now
rm -f testout.txt testout_contrib.txt
# We want to run the longest tests first to better utilize parallelization and reduce overall test time.
# Tests listed in the RUN_FIRST variable will be run before others.
# If parallelization is not used, this environment variable will be ignored.
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
-e RUN_FIRST=hll-src,postgis-src,pgtap-src -e PARALLEL_COMPUTES="${PARALLEL_COMPUTES}" \
neon-test-extensions /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker compose exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
-e PARALLEL_COMPUTES="${PARALLEL_COMPUTES}" \
neon-test-extensions /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
if [[ ${EXT_SUCCESS} -eq 0 || ${CONTRIB_SUCCESS} -eq 0 ]]; then
CONTRIB_FAILED=

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash
set -x
if [[ -v BENCHMARK_CONNSTR ]]; then
@@ -26,9 +26,8 @@ if [[ -v BENCHMARK_CONNSTR ]]; then
fi
fi
REGULAR_USER=false
PARALLEL_COMPUTES=${PARALLEL_COMPUTES:-1}
while getopts pr arg; do
case ${arg} in
while getopts r arg; do
case $arg in
r)
REGULAR_USER=true
shift $((OPTIND-1))
@@ -42,49 +41,26 @@ extdir=${1}
cd "${extdir}" || exit 2
FAILED=
export FAILED_FILE=/tmp/failed
rm -f ${FAILED_FILE}
mapfile -t LIST < <( (echo -e "${SKIP//","/"\n"}"; ls) | sort | uniq -u)
if [[ ${PARALLEL_COMPUTES} -gt 1 ]]; then
# Avoid errors if RUN_FIRST is not defined
RUN_FIRST=${RUN_FIRST:-}
# Move entries listed in the RUN_FIRST variable to the beginning
ORDERED_LIST=$(printf "%s\n" "${LIST[@]}" | grep -x -Ff <(echo -e "${RUN_FIRST//,/$'\n'}"); printf "%s\n" "${LIST[@]}" | grep -vx -Ff <(echo -e "${RUN_FIRST//,/$'\n'}"))
parallel -j"${PARALLEL_COMPUTES}" "[[ -d {} ]] || exit 0
export PGHOST=compute{%}
if ! psql -c 'select 1'>/dev/null; then
exit 1
fi
echo Running on \${PGHOST}
if [[ -f ${extdir}/{}/neon-test.sh ]]; then
echo Running from script
${extdir}/{}/neon-test.sh || echo {} >> ${FAILED_FILE};
else
echo Running using make;
USE_PGXS=1 make -C {} installcheck || echo {} >> ${FAILED_FILE};
fi" ::: ${ORDERED_LIST}
[[ ! -f ${FAILED_FILE} ]] && exit 0
else
for d in "${LIST[@]}"; do
[ -d "${d}" ] || continue
if ! psql -w -c "select 1" >/dev/null; then
FAILED="${d} ${FAILED}"
break
fi
if [[ ${REGULAR_USER} = true ]] && [ -f "${d}"/regular-test.sh ]; then
"${d}/regular-test.sh" || FAILED="${d} ${FAILED}"
continue
fi
LIST=$( (echo -e "${SKIP//","/"\n"}"; ls) | sort | uniq -u)
for d in ${LIST}; do
[ -d "${d}" ] || continue
if ! psql -w -c "select 1" >/dev/null; then
FAILED="${d} ${FAILED}"
break
fi
if [[ ${REGULAR_USER} = true ]] && [ -f "${d}"/regular-test.sh ]; then
"${d}/regular-test.sh" || FAILED="${d} ${FAILED}"
continue
fi
if [ -f "${d}/neon-test.sh" ]; then
"${d}/neon-test.sh" || FAILED="${d} ${FAILED}"
else
USE_PGXS=1 make -C "${d}" installcheck || FAILED="${d} ${FAILED}"
fi
done
[[ -z ${FAILED} ]] && exit 0
fi
for d in ${FAILED} $([[ ! -f ${FAILED_FILE} ]] || cat ${FAILED_FILE}); do
if [ -f "${d}/neon-test.sh" ]; then
"${d}/neon-test.sh" || FAILED="${d} ${FAILED}"
else
USE_PGXS=1 make -C "${d}" installcheck || FAILED="${d} ${FAILED}"
fi
done
[ -z "${FAILED}" ] && exit 0
for d in ${FAILED}; do
cat "$(find $d -name regression.diffs)"
done
for postgis_diff in /tmp/pgis_reg/*_diff; do
@@ -92,5 +68,4 @@ for postgis_diff in /tmp/pgis_reg/*_diff; do
cat "${postgis_diff}"
done
echo "${FAILED}"
cat ${FAILED_FILE}
exit 1

View File

@@ -1,4 +1,4 @@
#!/usr/bin/env bash
#!/bin/bash
set -eux -o pipefail
cd "$(dirname "${0}")"
# Takes a variable name as argument. The result is stored in that variable.
@@ -60,8 +60,8 @@ function check_timeline() {
# Restarts the compute node with the required compute tag and timeline.
# Accepts the tag for the compute node and the timeline as parameters.
function restart_compute() {
docker compose down compute1 compute_is_ready
COMPUTE_TAG=${1} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute1 compute_is_ready
docker compose down compute compute_is_ready
COMPUTE_TAG=${1} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
check_timeline ${2}
}

View File

@@ -13,8 +13,6 @@ use utils::backoff::retry;
pub fn app(state: Arc<Storage>) -> Router<()> {
use axum::routing::{delete as _delete, get as _get};
let delete_prefix = _delete(delete_prefix);
// NB: On any changes do not forget to update the OpenAPI spec
// in /endpoint_storage/src/openapi_spec.yml.
Router::new()
.route(
"/{tenant_id}/{timeline_id}/{endpoint_id}/{*path}",

View File

@@ -1,146 +0,0 @@
openapi: "3.0.2"
info:
title: Endpoint Storage API
description: Endpoint Storage API
version: "1.0"
license:
name: "Apache"
url: https://github.com/neondatabase/neon/blob/main/LICENSE
servers:
- url: ""
paths:
/status:
description: Healthcheck endpoint
get:
description: Healthcheck
security: []
responses:
"200":
description: OK
/{tenant_id}/{timeline_id}/{endpoint_id}/{key}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
- name: endpoint_id
in: path
required: true
schema:
type: string
- name: key
in: path
required: true
schema:
type: string
get:
description: Get file from blob storage
responses:
"200":
description: "File stream from blob storage"
content:
application/octet-stream:
schema:
type: string
format: binary
"400":
description: File was not found
"403":
description: JWT does not authorize request to this route
put:
description: Insert file into blob storage. If file exists, override it
requestBody:
content:
application/octet-stream:
schema:
type: string
format: binary
responses:
"200":
description: File was inserted successfully
"403":
description: JWT does not authorize request to this route
delete:
description: Delete file from blob storage
responses:
"200":
description: File was successfully deleted or not found
"403":
description: JWT does not authorize request to this route
/{tenant_id}/{timeline_id}/{endpoint_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
- name: endpoint_id
in: path
required: true
schema:
type: string
delete:
description: Delete endpoint data from blob storage
responses:
"200":
description: Endpoint data was deleted
"403":
description: JWT does not authorize request to this route
/{tenant_id}/{timeline_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
delete:
description: Delete timeline data from blob storage
responses:
"200":
description: Timeline data was deleted
"403":
description: JWT does not authorize request to this route
/{tenant_id}:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
delete:
description: Delete tenant data from blob storage
responses:
"200":
description: Tenant data was deleted
"403":
description: JWT does not authorize request to this route
components:
securitySchemes:
JWT:
type: http
scheme: bearer
bearerFormat: JWT
security:
- JWT: []

View File

@@ -46,7 +46,7 @@ pub struct ExtensionInstallResponse {
pub version: ExtVersion,
}
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
#[derive(Serialize, Default, Debug, Clone)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcPrewarmState {
#[default]
@@ -58,17 +58,6 @@ pub enum LfcPrewarmState {
},
}
impl Display for LfcPrewarmState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
LfcPrewarmState::NotPrewarmed => f.write_str("NotPrewarmed"),
LfcPrewarmState::Prewarming => f.write_str("Prewarming"),
LfcPrewarmState::Completed => f.write_str("Completed"),
LfcPrewarmState::Failed { error } => write!(f, "Error({error})"),
}
}
}
#[derive(Serialize, Default, Debug, Clone, PartialEq)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum LfcOffloadState {
@@ -81,23 +70,6 @@ pub enum LfcOffloadState {
},
}
#[derive(Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "status", rename_all = "snake_case")]
/// Response of /promote
pub enum PromoteState {
NotPromoted,
Completed,
Failed { error: String },
}
#[derive(Deserialize, Serialize, Default, Debug, Clone)]
#[serde(rename_all = "snake_case")]
/// Result of /safekeepers_lsn
pub struct SafekeepersLsn {
pub safekeepers: String,
pub wal_flush_lsn: utils::lsn::Lsn,
}
/// Response of the /status API
#[derive(Serialize, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]

View File

@@ -442,7 +442,7 @@ pub struct JwksSettings {
}
/// Protocol used to connect to a Pageserver. Parsed from the connstring scheme.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[derive(Clone, Copy, Debug, Default)]
pub enum PageserverProtocol {
/// The original protocol based on libpq and COPY. Uses postgresql:// or postgres:// scheme.
#[default]

View File

@@ -41,35 +41,17 @@ pub fn get_query_param<'a>(
Some(q) => q,
None => return Ok(None),
};
let values = url::form_urlencoded::parse(query.as_bytes())
let mut values = url::form_urlencoded::parse(query.as_bytes())
.filter_map(|(k, v)| if k == param_name { Some(v) } else { None })
// we call .next() twice below. If it's None the first time, .fuse() ensures it's None afterwards
.fuse();
// Work around an issue with Alloy's pyroscope scrape where the "seconds"
// parameter is added several times. https://github.com/grafana/alloy/issues/3026
// TODO: revert after Alloy is fixed.
let value1 = values
.map(Ok)
.reduce(|acc, i| {
match acc {
Err(_) => acc,
// It's okay to have duplicates as along as they have the same value.
Ok(ref a) if a == &i.unwrap() => acc,
_ => Err(ApiError::BadRequest(anyhow!(
"param {param_name} specified more than once"
))),
}
})
.transpose()?;
// if values.next().is_some() {
// return Err(ApiError::BadRequest(anyhow!(
// "param {param_name} specified more than once"
// )));
// }
let value1 = values.next();
if values.next().is_some() {
return Err(ApiError::BadRequest(anyhow!(
"param {param_name} specified more than once"
)));
}
Ok(value1)
}
@@ -110,39 +92,3 @@ pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError>
None => Ok(()),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_query_param_duplicate() {
let req = Request::builder()
.uri("http://localhost:12345/testuri?testparam=1")
.body(hyper::Body::empty())
.unwrap();
let value = get_query_param(&req, "testparam").unwrap();
assert_eq!(value.unwrap(), "1");
let req = Request::builder()
.uri("http://localhost:12345/testuri?testparam=1&testparam=1")
.body(hyper::Body::empty())
.unwrap();
let value = get_query_param(&req, "testparam").unwrap();
assert_eq!(value.unwrap(), "1");
let req = Request::builder()
.uri("http://localhost:12345/testuri")
.body(hyper::Body::empty())
.unwrap();
let value = get_query_param(&req, "testparam").unwrap();
assert!(value.is_none());
let req = Request::builder()
.uri("http://localhost:12345/testuri?testparam=1&testparam=2&testparam=3")
.body(hyper::Body::empty())
.unwrap();
let value = get_query_param(&req, "testparam");
assert!(value.is_err());
}
}

View File

@@ -5,7 +5,6 @@ mod tests;
use const_format::formatcp;
use posthog_client_lite::PostHogClientConfig;
use utils::serde_percent::Percent;
pub const DEFAULT_PG_LISTEN_PORT: u16 = 64000;
pub const DEFAULT_PG_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_PG_LISTEN_PORT}");
pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 9898;
@@ -224,9 +223,8 @@ pub struct ConfigToml {
pub metric_collection_bucket: Option<RemoteStorageConfig>,
#[serde(with = "humantime_serde")]
pub synthetic_size_calculation_interval: Duration,
pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
pub test_remote_failures: u64,
pub test_remote_failures_probability: u64,
pub ondemand_download_behavior_treat_error_as_warn: bool,
#[serde(with = "humantime_serde")]
pub background_task_maximum_delay: Duration,
@@ -272,12 +270,9 @@ pub struct ConfigToml {
pub timeline_import_config: TimelineImportConfig,
#[serde(skip_serializing_if = "Option::is_none")]
pub basebackup_cache_config: Option<BasebackupCacheConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_layer_generation_large_timeline_threshold: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(default)]
pub struct DiskUsageEvictionTaskConfig {
pub max_usage_pct: utils::serde_percent::Percent,
pub min_avail_bytes: u64,
@@ -288,21 +283,6 @@ pub struct DiskUsageEvictionTaskConfig {
/// Select sorting for evicted layers
#[serde(default)]
pub eviction_order: EvictionOrder,
pub enabled: bool,
}
impl Default for DiskUsageEvictionTaskConfig {
fn default() -> Self {
Self {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 2_000_000_000,
period: Duration::from_secs(60),
#[cfg(feature = "testing")]
mock_statvfs: None,
eviction_order: EvictionOrder::default(),
enabled: true,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -563,11 +543,6 @@ pub struct TenantConfigToml {
pub gc_period: Duration,
// Delta layer churn threshold to create L1 image layers.
pub image_creation_threshold: usize,
// HADRON
// When the timeout is reached, PageServer will (1) force compact any remaining L0 deltas and
// (2) create image layers if there are any L1 deltas.
#[serde(with = "humantime_serde")]
pub image_layer_force_creation_period: Option<Duration>,
// Determines how much history is retained, to allow
// branching and read replicas at an older point in time.
// The unit is time.
@@ -763,10 +738,9 @@ impl Default for ConfigToml {
metric_collection_bucket: (None),
disk_usage_based_eviction: DiskUsageEvictionTaskConfig::default(),
disk_usage_based_eviction: (None),
test_remote_failures: (0),
test_remote_failures_probability: (100),
ondemand_download_behavior_treat_error_as_warn: (false),
@@ -830,7 +804,6 @@ impl Default for ConfigToml {
},
basebackup_cache_config: None,
posthog_config: None,
image_layer_generation_large_timeline_threshold: Some(2 * 1024 * 1024 * 1024),
}
}
}
@@ -924,7 +897,6 @@ impl Default for TenantConfigToml {
gc_period: humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period"),
image_creation_threshold: DEFAULT_IMAGE_CREATION_THRESHOLD,
image_layer_force_creation_period: None,
pitr_interval: humantime::parse_duration(DEFAULT_PITR_INTERVAL)
.expect("cannot parse default PITR interval"),
walreceiver_connect_timeout: humantime::parse_duration(

View File

@@ -384,7 +384,7 @@ pub struct SafekeepersInfo {
pub safekeepers: Vec<SafekeeperInfo>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[derive(Serialize, Deserialize, Clone)]
pub struct SafekeeperInfo {
pub id: NodeId,
pub hostname: String,
@@ -597,9 +597,6 @@ pub struct TenantConfigPatch {
pub gc_period: FieldPatch<String>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub image_creation_threshold: FieldPatch<usize>,
// HADRON
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub image_layer_force_creation_period: FieldPatch<String>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub pitr_interval: FieldPatch<String>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
@@ -703,11 +700,6 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub image_creation_threshold: Option<usize>,
// HADRON
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub image_layer_force_creation_period: Option<Duration>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
pub pitr_interval: Option<Duration>,
@@ -806,7 +798,6 @@ impl TenantConfig {
mut gc_horizon,
mut gc_period,
mut image_creation_threshold,
mut image_layer_force_creation_period,
mut pitr_interval,
mut walreceiver_connect_timeout,
mut lagging_wal_timeout,
@@ -870,11 +861,6 @@ impl TenantConfig {
patch
.image_creation_threshold
.apply(&mut image_creation_threshold);
// HADRON
patch
.image_layer_force_creation_period
.map(|v| humantime::parse_duration(&v))?
.apply(&mut image_layer_force_creation_period);
patch
.pitr_interval
.map(|v| humantime::parse_duration(&v))?
@@ -956,7 +942,6 @@ impl TenantConfig {
gc_horizon,
gc_period,
image_creation_threshold,
image_layer_force_creation_period,
pitr_interval,
walreceiver_connect_timeout,
lagging_wal_timeout,
@@ -1031,9 +1016,6 @@ impl TenantConfig {
image_creation_threshold: self
.image_creation_threshold
.unwrap_or(global_conf.image_creation_threshold),
image_layer_force_creation_period: self
.image_layer_force_creation_period
.or(global_conf.image_layer_force_creation_period),
pitr_interval: self.pitr_interval.unwrap_or(global_conf.pitr_interval),
walreceiver_connect_timeout: self
.walreceiver_connect_timeout

View File

@@ -332,11 +332,7 @@ fn hash_combine(mut a: u32, mut b: u32) -> u32 {
///
/// The mapping of key to shard is not stable across changes to ShardCount: this is intentional
/// and will be handled at higher levels when shards are split.
pub fn key_to_shard_number(
count: ShardCount,
stripe_size: ShardStripeSize,
key: &Key,
) -> ShardNumber {
fn key_to_shard_number(count: ShardCount, stripe_size: ShardStripeSize, key: &Key) -> ShardNumber {
// Fast path for un-sharded tenants or broadcast keys
if count < ShardCount(2) || key_is_shard0(key) {
return ShardNumber(0);

View File

@@ -13,7 +13,6 @@ aws-smithy-async.workspace = true
aws-smithy-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
base64.workspace = true
bytes.workspace = true
camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
@@ -42,9 +41,6 @@ http-body-util.workspace = true
itertools.workspace = true
sync_wrapper = { workspace = true, features = ["futures"] }
byteorder = "1.4"
rand = "0.8.5"
[dev-dependencies]
camino-tempfile.workspace = true
test-context.workspace = true

View File

@@ -14,25 +14,17 @@ use anyhow::{Context, Result, anyhow};
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
use azure_storage::StorageCredentials;
use azure_storage_blobs::blob::BlobBlockType;
use azure_storage_blobs::blob::BlockList;
use azure_storage_blobs::blob::operations::GetBlobBuilder;
use azure_storage_blobs::blob::{Blob, CopyStatus};
use azure_storage_blobs::container::operations::ListBlobsBuilder;
use azure_storage_blobs::prelude::ClientBuilder;
use azure_storage_blobs::{blob::operations::GetBlobBuilder, prelude::ContainerClient};
use base64::{Engine as _, engine::general_purpose::URL_SAFE};
use byteorder::{BigEndian, ByteOrder};
use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient};
use bytes::Bytes;
use camino::Utf8Path;
use futures::FutureExt;
use futures::future::Either;
use futures::stream::Stream;
use futures_util::{StreamExt, TryStreamExt};
use http_types::{StatusCode, Url};
use scopeguard::ScopeGuard;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use utils::backoff;
@@ -59,9 +51,6 @@ pub struct AzureBlobStorage {
// Alternative timeout used for metadata objects which are expected to be small
pub small_timeout: Duration,
/* BEGIN_HADRON */
pub put_block_size_mb: Option<usize>,
/* END_HADRON */
}
impl AzureBlobStorage {
@@ -118,9 +107,6 @@ impl AzureBlobStorage {
concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()),
timeout,
small_timeout,
/* BEGIN_HADRON */
put_block_size_mb: azure_config.put_block_size_mb,
/* END_HADRON */
})
}
@@ -597,137 +583,31 @@ impl RemoteStorage for AzureBlobStorage {
let started_at = start_measuring_requests(kind);
let mut metadata_map = metadata.unwrap_or([].into());
let timeline_file_path = metadata_map.0.remove("databricks_azure_put_block");
/* BEGIN_HADRON */
let op = async move {
let op = async {
let blob_client = self.client.blob_client(self.relative_path_to_name(to));
let put_block_size = self.put_block_size_mb.unwrap_or(0) * 1024 * 1024;
if timeline_file_path.is_none() || put_block_size == 0 {
// Use put_block_blob directly.
let from: Pin<
Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>,
> = Box::pin(from);
let from = NonSeekableStream::new(from, data_size_bytes);
let body = azure_core::Body::SeekableStream(Box::new(from));
let mut builder = blob_client.put_block_blob(body);
if !metadata_map.0.is_empty() {
builder = builder.metadata(to_azure_metadata(metadata_map));
}
let fut = builder.into_future();
let fut = tokio::time::timeout(self.timeout, fut);
let result = fut.await;
match result {
Ok(Ok(_response)) => return Ok(()),
Ok(Err(azure)) => return Err(azure.into()),
Err(_timeout) => return Err(TimeoutOrCancel::Timeout.into()),
};
}
// Upload chunks concurrently using Put Block.
// Each PutBlock uploads put_block_size bytes of the file.
let mut upload_futures: Vec<tokio::task::JoinHandle<Result<(), azure_core::Error>>> =
vec![];
let mut block_list = BlockList::default();
let mut start_bytes = 0u64;
let mut remaining_bytes = data_size_bytes;
let mut block_list_count = 0;
let from: Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>> =
Box::pin(from);
while remaining_bytes > 0 {
let block_size = std::cmp::min(remaining_bytes, put_block_size);
let end_bytes = start_bytes + block_size as u64;
let block_id = block_list_count;
let timeout = self.timeout;
let blob_client = blob_client.clone();
let timeline_file = timeline_file_path.clone().unwrap().clone();
let from = NonSeekableStream::new(from, data_size_bytes);
let mut encoded_block_id = [0u8; 8];
BigEndian::write_u64(&mut encoded_block_id, block_id);
URL_SAFE.encode(encoded_block_id);
let body = azure_core::Body::SeekableStream(Box::new(from));
// Put one block.
let part_fut = async move {
let mut file = File::open(Utf8Path::new(&timeline_file.clone())).await?;
file.seek(io::SeekFrom::Start(start_bytes)).await?;
let limited_reader = file.take(block_size as u64);
let file_chunk_stream =
tokio_util::io::ReaderStream::with_capacity(limited_reader, 1024 * 1024);
let file_chunk_stream_pin: Pin<
Box<dyn Stream<Item = std::io::Result<Bytes>> + Send + Sync + 'static>,
> = Box::pin(file_chunk_stream);
let stream_wrapper = NonSeekableStream::new(file_chunk_stream_pin, block_size);
let body = azure_core::Body::SeekableStream(Box::new(stream_wrapper));
// Azure put block takes URL-encoded block ids and all blocks must have the same byte length.
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block?tabs=microsoft-entra-id#uri-parameters
let builder = blob_client.put_block(encoded_block_id.to_vec(), body);
let fut = builder.into_future();
let fut = tokio::time::timeout(timeout, fut);
let result = fut.await;
tracing::debug!(
"azure put block id-{} size {} start {} end {} file {} response {:#?}",
block_id,
block_size,
start_bytes,
end_bytes,
timeline_file,
result
);
match result {
Ok(Ok(_response)) => Ok(()),
Ok(Err(azure)) => Err(azure),
Err(_timeout) => Err(azure_core::Error::new(
azure_core::error::ErrorKind::Io,
std::io::Error::new(
std::io::ErrorKind::TimedOut,
"Operation timed out",
),
)),
}
};
upload_futures.push(tokio::spawn(part_fut));
let mut builder = blob_client.put_block_blob(body);
block_list_count += 1;
remaining_bytes -= block_size;
start_bytes += block_size as u64;
block_list
.blocks
.push(BlobBlockType::Uncommitted(encoded_block_id.to_vec().into()));
if let Some(metadata) = metadata {
builder = builder.metadata(to_azure_metadata(metadata));
}
tracing::debug!(
"azure put blocks {} total MB: {} chunk size MB: {}",
block_list_count,
data_size_bytes / 1024 / 1024,
put_block_size / 1024 / 1024
);
// Wait for all blocks to be uploaded.
let upload_results = futures::future::try_join_all(upload_futures).await;
if upload_results.is_err() {
return Err(anyhow::anyhow!(format!(
"Failed to upload all blocks {:#?}",
upload_results.unwrap_err()
)));
}
// Commit the blocks.
let mut builder = blob_client.put_block_list(block_list);
if !metadata_map.0.is_empty() {
builder = builder.metadata(to_azure_metadata(metadata_map));
}
let fut = builder.into_future();
let fut = tokio::time::timeout(self.timeout, fut);
let result = fut.await;
tracing::debug!("azure put block list response {:#?}", result);
match result {
match fut.await {
Ok(Ok(_response)) => Ok(()),
Ok(Err(azure)) => Err(azure.into()),
Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()),
}
};
/* END_HADRON */
let res = tokio::select! {
res = op => res,
@@ -742,6 +622,7 @@ impl RemoteStorage for AzureBlobStorage {
crate::metrics::BUCKET_METRICS
.req_seconds
.observe_elapsed(kind, outcome, started_at);
res
}

View File

@@ -195,19 +195,8 @@ pub struct AzureConfig {
pub max_keys_per_list_response: Option<i32>,
#[serde(default = "default_azure_conn_pool_size")]
pub conn_pool_size: usize,
/* BEGIN_HADRON */
#[serde(default = "default_azure_put_block_size_mb")]
pub put_block_size_mb: Option<usize>,
/* END_HADRON */
}
/* BEGIN_HADRON */
fn default_azure_put_block_size_mb() -> Option<usize> {
// Disable parallel upload by default.
Some(0)
}
/* END_HADRON */
fn default_remote_storage_azure_concurrency_limit() -> NonZeroUsize {
NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT).unwrap()
}
@@ -224,9 +213,6 @@ impl Debug for AzureConfig {
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
/* BEGIN_HADRON */
.field("put_block_size_mb", &self.put_block_size_mb)
/* END_HADRON */
.finish()
}
}
@@ -366,7 +352,6 @@ timeout = '5s'";
upload_storage_class = 'INTELLIGENT_TIERING'
timeout = '7s'
conn_pool_size = 8
put_block_size_mb = 1024
";
let config = parse(toml).unwrap();
@@ -382,9 +367,6 @@ timeout = '5s'";
concurrency_limit: default_remote_storage_azure_concurrency_limit(),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
conn_pool_size: 8,
/* BEGIN_HADRON */
put_block_size_mb: Some(1024),
/* END_HADRON */
}),
timeout: Duration::from_secs(7),
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT

View File

@@ -732,15 +732,9 @@ impl GenericRemoteStorage {
})
}
/* BEGIN_HADRON */
pub fn unreliable_wrapper(s: Self, fail_first: u64, fail_probability: u64) -> Self {
Self::Unreliable(Arc::new(UnreliableWrapper::new(
s,
fail_first,
fail_probability,
)))
pub fn unreliable_wrapper(s: Self, fail_first: u64) -> Self {
Self::Unreliable(Arc::new(UnreliableWrapper::new(s, fail_first)))
}
/* END_HADRON */
/// See [`RemoteStorage::upload`], which this method calls with `None` as metadata.
pub async fn upload_storage_object(

View File

@@ -1,8 +1,6 @@
//! This module provides a wrapper around a real RemoteStorage implementation that
//! causes the first N attempts at each upload or download operatio to fail. For
//! testing purposes.
use rand::Rng;
use std::cmp;
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::num::NonZeroU32;
@@ -27,12 +25,6 @@ pub struct UnreliableWrapper {
// Tracks how many failed attempts of each operation has been made.
attempts: Mutex<HashMap<RemoteOp, u64>>,
/* BEGIN_HADRON */
// This the probability of failure for each operation, ranged from [0, 100].
// The probability is default to 100, which means that all operations will fail.
attempt_failure_probability: u64,
/* END_HADRON */
}
/// Used to identify retries of different unique operation.
@@ -48,11 +40,7 @@ enum RemoteOp {
}
impl UnreliableWrapper {
pub fn new(
inner: crate::GenericRemoteStorage,
attempts_to_fail: u64,
attempt_failure_probability: u64,
) -> Self {
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
assert!(attempts_to_fail > 0);
let inner = match inner {
GenericRemoteStorage::AwsS3(s) => GenericRemoteStorage::AwsS3(s),
@@ -63,11 +51,9 @@ impl UnreliableWrapper {
panic!("Can't wrap unreliable wrapper unreliably")
}
};
let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100);
UnreliableWrapper {
inner,
attempts_to_fail,
attempt_failure_probability: actual_attempt_failure_probability,
attempts: Mutex::new(HashMap::new()),
}
}
@@ -80,7 +66,6 @@ impl UnreliableWrapper {
///
fn attempt(&self, op: RemoteOp) -> anyhow::Result<u64> {
let mut attempts = self.attempts.lock().unwrap();
let mut rng = rand::thread_rng();
match attempts.entry(op) {
Entry::Occupied(mut e) => {
@@ -90,19 +75,15 @@ impl UnreliableWrapper {
*p
};
/* BEGIN_HADRON */
// If there are more attempts to fail, fail the request by probability.
if (attempts_before_this < self.attempts_to_fail)
&& (rng.gen_range(0..=100) < self.attempt_failure_probability)
{
if attempts_before_this >= self.attempts_to_fail {
// let it succeed
e.remove();
Ok(attempts_before_this)
} else {
let error =
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
Err(error)
} else {
e.remove();
Ok(attempts_before_this)
}
/* END_HADRON */
}
Entry::Vacant(e) => {
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());

View File

@@ -165,42 +165,10 @@ pub(crate) async fn upload_remote_data(
let (data, data_len) =
upload_stream(format!("remote blob data {i}").into_bytes().into());
/* BEGIN_HADRON */
let mut metadata = None;
if matches!(&*task_client, GenericRemoteStorage::AzureBlob(_)) {
let file_path = "/tmp/dbx_upload_tmp_file.txt";
{
// Open the file in append mode
let mut file = std::fs::OpenOptions::new()
.append(true)
.create(true) // Create the file if it doesn't exist
.open(file_path)?;
// Append some bytes to the file
std::io::Write::write_all(
&mut file,
&format!("remote blob data {i}").into_bytes(),
)?;
file.sync_all()?;
}
metadata = Some(remote_storage::StorageMetadata::from([(
"databricks_azure_put_block",
file_path,
)]));
}
/* END_HADRON */
task_client
.upload(data, data_len, &blob_path, metadata, &cancel)
.upload(data, data_len, &blob_path, None, &cancel)
.await?;
// TODO: Check upload is using the put_block upload.
// We cannot consume data here since data is moved inside the upload.
// let total_bytes = data.fold(0, |acc, chunk| async move {
// acc + chunk.map(|bytes| bytes.len()).unwrap_or(0)
// }).await;
// assert_eq!(total_bytes, data_len);
Ok::<_, anyhow::Error>((blob_prefix, blob_path))
});
}

View File

@@ -219,9 +219,6 @@ async fn create_azure_client(
concurrency_limit: NonZeroUsize::new(100).unwrap(),
max_keys_per_list_response,
conn_pool_size: 8,
/* BEGIN_HADRON */
put_block_size_mb: Some(1),
/* END_HADRON */
}),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,

View File

@@ -44,62 +44,3 @@ where
}
}
}
/* BEGIN_HADRON */
pub enum DeploymentMode {
Dev,
Staging,
Prod,
}
pub fn get_deployment_mode() -> Option<DeploymentMode> {
match std::env::var("DEPLOYMENT_MODE") {
Ok(env) => match env.as_str() {
"development" => Some(DeploymentMode::Dev),
"staging" => Some(DeploymentMode::Staging),
"production" => Some(DeploymentMode::Prod),
_ => {
tracing::error!("Unexpected DEPLOYMENT_MODE: {}", env);
None
}
},
Err(_) => {
tracing::error!("DEPLOYMENT_MODE not set");
None
}
}
}
pub fn is_dev_or_staging() -> bool {
matches!(
get_deployment_mode(),
Some(DeploymentMode::Dev) | Some(DeploymentMode::Staging)
)
}
pub enum TestingMode {
Chaos,
Stress,
}
pub fn get_test_mode() -> Option<TestingMode> {
match std::env::var("HADRON_TEST_MODE") {
Ok(env) => match env.as_str() {
"chaos" => Some(TestingMode::Chaos),
"stress" => Some(TestingMode::Stress),
_ => {
tracing::error!("Unexpected HADRON_TEST_MODE: {}", env);
None
}
},
Err(_) => {
tracing::error!("HADRON_TEST_MODE not set");
None
}
}
}
pub fn is_chaos_testing() -> bool {
matches!(get_test_mode(), Some(TestingMode::Chaos))
}
/* END_HADRON */

View File

@@ -171,12 +171,6 @@ impl std::fmt::Display for ShardNumber {
}
}
impl std::fmt::Display for ShardCount {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Display for ShardSlug<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(

View File

@@ -112,7 +112,6 @@ twox-hash.workspace = true
procfs.workspace = true
[dev-dependencies]
base64.workspace = true
criterion.workspace = true
hex-literal.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time", "test-util"] }

View File

@@ -1,23 +0,0 @@
[package]
name = "pageserver_client_grpc"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
testing = ["pageserver_api/testing"]
[dependencies]
anyhow.workspace = true
bytes.workspace = true
compute_api.workspace = true
futures.workspace = true
pageserver_api.workspace = true
pageserver_page_api.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tonic.workspace = true
tracing.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -1,403 +0,0 @@
use std::collections::HashMap;
use std::num::NonZero;
use std::sync::Arc;
use anyhow::anyhow;
use futures::stream::FuturesUnordered;
use futures::{FutureExt as _, StreamExt as _};
use tracing::instrument;
use crate::pool::{ChannelPool, ClientGuard, ClientPool, StreamGuard, StreamPool};
use crate::retry::Retry;
use crate::split::GetPageSplitter;
use compute_api::spec::PageserverProtocol;
use pageserver_api::shard::ShardStripeSize;
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::{ShardCount, ShardIndex, ShardNumber};
/// Max number of concurrent clients per channel (i.e. TCP connection). New channels will be spun up
/// when full.
///
/// TODO: tune all of these constants, and consider making them configurable.
/// TODO: consider separate limits for unary and streaming clients, so we don't fill up channels
/// with only streams.
const MAX_CLIENTS_PER_CHANNEL: NonZero<usize> = NonZero::new(16).unwrap();
/// Max number of concurrent unary request clients per shard.
const MAX_UNARY_CLIENTS: NonZero<usize> = NonZero::new(64).unwrap();
/// Max number of concurrent GetPage streams per shard. The max number of concurrent GetPage
/// requests is given by `MAX_STREAMS * MAX_STREAM_QUEUE_DEPTH`.
const MAX_STREAMS: NonZero<usize> = NonZero::new(64).unwrap();
/// Max number of pipelined requests per stream.
const MAX_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(2).unwrap();
/// Max number of concurrent bulk GetPage streams per shard, used e.g. for prefetches. Because these
/// are more throughput-oriented, we have a smaller limit but higher queue depth.
const MAX_BULK_STREAMS: NonZero<usize> = NonZero::new(16).unwrap();
/// Max number of pipelined requests per bulk stream. These are more throughput-oriented and thus
/// get a larger queue depth.
const MAX_BULK_STREAM_QUEUE_DEPTH: NonZero<usize> = NonZero::new(4).unwrap();
/// A rich Pageserver gRPC client for a single tenant timeline. This client is more capable than the
/// basic `page_api::Client` gRPC client, and supports:
///
/// * Sharded tenants across multiple Pageservers.
/// * Pooling of connections, clients, and streams for efficient resource use.
/// * Concurrent use by many callers.
/// * Internal handling of GetPage bidirectional streams, with pipelining and error handling.
/// * Automatic retries.
/// * Observability.
///
/// TODO: this client does not support base backups or LSN leases, as these are only used by
/// compute_ctl. Consider adding this, but LSN leases need concurrent requests on all shards.
pub struct PageserverClient {
// TODO: support swapping out the shard map, e.g. via an ArcSwap.
shards: Shards,
retry: Retry,
}
impl PageserverClient {
/// Creates a new Pageserver client for a given tenant and timeline. Uses the Pageservers given
/// in the shard map, which must be complete and must use gRPC URLs.
pub fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_map: HashMap<ShardIndex, String>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
let shards = Shards::new(tenant_id, timeline_id, shard_map, stripe_size, auth_token)?;
Ok(Self {
shards,
retry: Retry,
})
}
/// Returns whether a relation exists.
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
pub async fn check_rel_exists(
&self,
req: page_api::CheckRelExistsRequest,
) -> tonic::Result<page_api::CheckRelExistsResponse> {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.check_rel_exists(req).await
})
.await
}
/// Returns the total size of a database, as # of bytes.
#[instrument(skip_all, fields(db_oid=%req.db_oid, lsn=%req.read_lsn))]
pub async fn get_db_size(
&self,
req: page_api::GetDbSizeRequest,
) -> tonic::Result<page_api::GetDbSizeResponse> {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_db_size(req).await
})
.await
}
/// Fetches pages. The `request_id` must be unique across all in-flight requests. Automatically
/// splits requests that straddle shard boundaries, and assembles the responses.
///
/// Unlike `page_api::Client`, this automatically converts `status_code` into `tonic::Status`
/// errors. All responses will have `GetPageStatusCode::Ok`.
#[instrument(skip_all, fields(
req_id = %req.request_id,
class = %req.request_class,
rel = %req.rel,
blkno = %req.block_numbers[0],
blks = %req.block_numbers.len(),
lsn = %req.read_lsn,
))]
pub async fn get_page(
&self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
// Make sure we have at least one page.
if req.block_numbers.is_empty() {
return Err(tonic::Status::invalid_argument("no block number"));
}
// Fast path: request is for a single shard.
if let Some(shard_id) =
GetPageSplitter::is_single_shard(&req, self.shards.count, self.shards.stripe_size)
{
return self.get_page_for_shard(shard_id, req).await;
}
// Request spans multiple shards. Split it, dispatch concurrent per-shard requests, and
// reassemble the responses.
//
// TODO: when we support shard map updates, we need to detect when it changes and re-split
// the request on errors.
let mut splitter = GetPageSplitter::split(req, self.shards.count, self.shards.stripe_size);
let mut shard_requests: FuturesUnordered<_> = splitter
.drain_requests()
.map(|(shard_id, shard_req)| {
// NB: each request will retry internally.
self.get_page_for_shard(shard_id, shard_req)
.map(move |result| result.map(|resp| (shard_id, resp)))
})
.collect();
while let Some((shard_id, shard_response)) = shard_requests.next().await.transpose()? {
splitter.add_response(shard_id, shard_response)?;
}
splitter.assemble_response()
}
/// Fetches pages that belong to the given shard.
#[instrument(skip_all, fields(shard = %shard_id))]
async fn get_page_for_shard(
&self,
shard_id: ShardIndex,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
let resp = self
.retry
.with(async || {
let stream = self
.shards
.get(shard_id)?
.stream(req.request_class.is_bulk())
.await;
let resp = stream.send(req.clone()).await?;
// Convert per-request errors into a tonic::Status.
if resp.status_code != page_api::GetPageStatusCode::Ok {
return Err(tonic::Status::new(
resp.status_code.into(),
resp.reason.unwrap_or_else(|| String::from("unknown error")),
));
}
Ok(resp)
})
.await?;
// Make sure we got the right number of pages.
// NB: check outside of the retry loop, since we don't want to retry this.
let (expected, actual) = (req.block_numbers.len(), resp.page_images.len());
if expected != actual {
return Err(tonic::Status::internal(format!(
"expected {expected} pages for shard {shard_id}, got {actual}",
)));
}
Ok(resp)
}
/// Returns the size of a relation, as # of blocks.
#[instrument(skip_all, fields(rel=%req.rel, lsn=%req.read_lsn))]
pub async fn get_rel_size(
&self,
req: page_api::GetRelSizeRequest,
) -> tonic::Result<page_api::GetRelSizeResponse> {
self.retry
.with(async || {
// Relation metadata is only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_rel_size(req).await
})
.await
}
/// Fetches an SLRU segment.
#[instrument(skip_all, fields(kind=%req.kind, segno=%req.segno, lsn=%req.read_lsn))]
pub async fn get_slru_segment(
&self,
req: page_api::GetSlruSegmentRequest,
) -> tonic::Result<page_api::GetSlruSegmentResponse> {
self.retry
.with(async || {
// SLRU segments are only available on shard 0.
let mut client = self.shards.get_zero().client().await?;
client.get_slru_segment(req).await
})
.await
}
}
/// Tracks the tenant's shards.
struct Shards {
/// The shard count.
///
/// NB: this is 0 for unsharded tenants, following `ShardIndex::unsharded()` convention.
count: ShardCount,
/// The stripe size. Only used for sharded tenants.
stripe_size: ShardStripeSize,
/// Shards by shard index.
///
/// NB: unsharded tenants use count 0, like `ShardIndex::unsharded()`.
///
/// INVARIANT: every shard 0..count is present.
/// INVARIANT: shard 0 is always present.
map: HashMap<ShardIndex, Shard>,
}
impl Shards {
/// Creates a new set of shards based on a shard map.
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_map: HashMap<ShardIndex, String>,
stripe_size: ShardStripeSize,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
let count = match shard_map.len() {
0 => return Err(anyhow!("no shards provided")),
1 => ShardCount::new(0), // NB: unsharded tenants use 0, like `ShardIndex::unsharded()`
n if n > u8::MAX as usize => return Err(anyhow!("too many shards: {n}")),
n => ShardCount::new(n as u8),
};
let mut map = HashMap::new();
for (shard_id, url) in shard_map {
// The shard index must match the computed shard count, even for unsharded tenants.
if shard_id.shard_count != count {
return Err(anyhow!("invalid shard index {shard_id}, expected {count}"));
}
// The shard index' number and count must be consistent.
if !shard_id.is_unsharded() && shard_id.shard_number.0 >= shard_id.shard_count.0 {
return Err(anyhow!("invalid shard index {shard_id}"));
}
// The above conditions guarantee that we have all shards 0..count: len() matches count,
// shard number < count, and numbers are unique (via hashmap).
let shard = Shard::new(url, tenant_id, timeline_id, shard_id, auth_token.clone())?;
map.insert(shard_id, shard);
}
Ok(Self {
count,
stripe_size,
map,
})
}
/// Looks up the given shard.
#[allow(clippy::result_large_err)] // TODO: check perf impact
fn get(&self, shard_id: ShardIndex) -> tonic::Result<&Shard> {
self.map
.get(&shard_id)
.ok_or_else(|| tonic::Status::not_found(format!("unknown shard {shard_id}")))
}
/// Returns shard 0.
fn get_zero(&self) -> &Shard {
self.get(ShardIndex::new(ShardNumber(0), self.count))
.expect("always present")
}
}
/// A single shard. Uses dedicated resource pools with the following structure:
///
/// * Channel pool: unbounded.
/// * Unary client pool: MAX_UNARY_CLIENTS.
/// * Stream client pool: unbounded.
/// * Stream pool: MAX_STREAMS and MAX_STREAM_QUEUE_DEPTH.
/// * Bulk channel pool: unbounded.
/// * Bulk client pool: unbounded.
/// * Bulk stream pool: MAX_BULK_STREAMS and MAX_BULK_STREAM_QUEUE_DEPTH.
struct Shard {
/// Unary gRPC client pool.
client_pool: Arc<ClientPool>,
/// GetPage stream pool.
stream_pool: Arc<StreamPool>,
/// GetPage stream pool for bulk requests, e.g. prefetches.
bulk_stream_pool: Arc<StreamPool>,
}
impl Shard {
/// Creates a new shard. It has its own dedicated resource pools.
fn new(
url: String,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
// Sanity-check that the URL uses gRPC.
if PageserverProtocol::from_connstring(&url)? != PageserverProtocol::Grpc {
return Err(anyhow!("invalid shard URL {url}: must use gRPC"));
}
// Common channel pool for unary and stream requests. Bounded by client/stream pools.
let channel_pool = ChannelPool::new(url.clone(), MAX_CLIENTS_PER_CHANNEL)?;
// Client pool for unary requests.
let client_pool = ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
Some(MAX_UNARY_CLIENTS),
);
// GetPage stream pool. Uses a dedicated client pool to avoid starving out unary clients,
// but shares a channel pool with it (as it's unbounded).
let stream_pool = StreamPool::new(
ClientPool::new(
channel_pool.clone(),
tenant_id,
timeline_id,
shard_id,
auth_token.clone(),
None, // unbounded, limited by stream pool
),
Some(MAX_STREAMS),
MAX_STREAM_QUEUE_DEPTH,
);
// Bulk GetPage stream pool, e.g. for prefetches. Uses dedicated channel/client/stream pools
// to avoid head-of-line blocking of latency-sensitive requests.
let bulk_stream_pool = StreamPool::new(
ClientPool::new(
ChannelPool::new(url, MAX_CLIENTS_PER_CHANNEL)?,
tenant_id,
timeline_id,
shard_id,
auth_token,
None, // unbounded, limited by stream pool
),
Some(MAX_BULK_STREAMS),
MAX_BULK_STREAM_QUEUE_DEPTH,
);
Ok(Self {
client_pool,
stream_pool,
bulk_stream_pool,
})
}
/// Returns a pooled client for this shard.
async fn client(&self) -> tonic::Result<ClientGuard> {
self.client_pool
.get()
.await
.map_err(|err| tonic::Status::internal(format!("failed to get client: {err}")))
}
/// Returns a pooled stream for this shard. If `bulk` is `true`, uses the dedicated bulk stream
/// pool (e.g. for prefetches).
async fn stream(&self, bulk: bool) -> StreamGuard {
match bulk {
false => self.stream_pool.get().await,
true => self.bulk_stream_pool.get().await,
}
}
}

View File

@@ -1,6 +0,0 @@
mod client;
mod pool;
mod retry;
mod split;
pub use client::PageserverClient;

View File

@@ -1,761 +0,0 @@
//! This module provides various Pageserver gRPC client resource pools.
//!
//! These pools are designed to reuse gRPC resources (connections, clients, and streams) across
//! multiple concurrent callers (i.e. Postgres backends). This avoids the resource cost and latency
//! of creating dedicated TCP connections and server tasks for every Postgres backend.
//!
//! Each resource has its own, nested pool. The pools are custom-built for the properties of each
//! resource -- they are different enough that a generic pool isn't suitable.
//!
//! * ChannelPool: manages gRPC channels (TCP connections) to a single Pageserver. Multiple clients
//! can acquire and use the same channel concurrently (via HTTP/2 stream multiplexing), up to a
//! per-channel client limit. Channels may be closed when they are no longer used by any clients.
//!
//! * ClientPool: manages gRPC clients for a single tenant shard. Each client acquires a (shared)
//! channel from the ChannelPool for the client's lifetime. A client can only be acquired by a
//! single caller at a time, and is returned to the pool when dropped. Idle clients may be removed
//! from the pool after some time, to free up the channel.
//!
//! * StreamPool: manages bidirectional gRPC GetPage streams. Each stream acquires a client from the
//! ClientPool for the stream's lifetime. Internal streams are not exposed to callers; instead, it
//! returns a guard that can be used to send a single request, to properly enforce queue depth and
//! route responses. Internally, the pool will reuse or spin up a suitable stream for the request,
//! possibly pipelining multiple requests from multiple callers on the same stream (up to some
//! queue depth). Idle streams may be removed from the pool after a while to free up the client.
//!
//! Each channel corresponds to one TCP connection. Each client unary request and each stream
//! corresponds to one HTTP/2 stream and server task.
//!
//! TODO: error handling (including custom error types).
//! TODO: observability.
use std::collections::{BTreeMap, HashMap};
use std::num::NonZero;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, Weak};
use std::time::{Duration, Instant};
use futures::StreamExt as _;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::transport::{Channel, Endpoint};
use tracing::{error, warn};
use pageserver_page_api as page_api;
use utils::id::{TenantId, TimelineId};
use utils::shard::ShardIndex;
/// Reap channels/clients/streams that have been idle for this long.
///
/// TODO: this is per-pool. For nested pools, it can take up to 3x as long for a TCP connection to
/// be reaped. First, we must wait for an idle stream to be reaped, which marks its client as idle.
/// Then, we must wait for the idle client to be reaped, which marks its channel as idle. Then, we
/// must wait for the idle channel to be reaped. Is that a problem? Maybe not, we just have to
/// account for it when setting the reap threshold. Alternatively, we can immediately reap empty
/// channels, and/or stream pool clients.
const REAP_IDLE_THRESHOLD: Duration = match cfg!(any(test, feature = "testing")) {
false => Duration::from_secs(180),
true => Duration::from_secs(1), // exercise reaping in tests
};
/// Reap idle resources with this interval.
const REAP_IDLE_INTERVAL: Duration = match cfg!(any(test, feature = "testing")) {
false => Duration::from_secs(10),
true => Duration::from_secs(1), // exercise reaping in tests
};
/// A gRPC channel pool, for a single Pageserver. A channel is shared by many clients (via HTTP/2
/// stream multiplexing), up to `clients_per_channel` -- a new channel will be spun up beyond this.
/// The pool does not limit the number of channels, and instead relies on `ClientPool` or
/// `StreamPool` to limit the number of concurrent clients.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
///
/// TODO: consider prewarming a set of channels, to avoid initial connection latency.
/// TODO: consider adding a circuit breaker for errors and fail fast.
pub struct ChannelPool {
/// Pageserver endpoint to connect to.
endpoint: Endpoint,
/// Max number of clients per channel. Beyond this, a new channel will be created.
max_clients_per_channel: NonZero<usize>,
/// Open channels.
channels: Mutex<BTreeMap<ChannelID, ChannelEntry>>,
/// Reaps idle channels.
idle_reaper: Reaper,
/// Channel ID generator.
next_channel_id: AtomicUsize,
}
type ChannelID = usize;
struct ChannelEntry {
/// The gRPC channel (i.e. TCP connection). Shared by multiple clients.
channel: Channel,
/// Number of clients using this channel.
clients: usize,
/// The channel has been idle (no clients) since this time. None if channel is in use.
/// INVARIANT: Some if clients == 0, otherwise None.
idle_since: Option<Instant>,
}
impl ChannelPool {
/// Creates a new channel pool for the given Pageserver endpoint.
pub fn new<E>(endpoint: E, max_clients_per_channel: NonZero<usize>) -> anyhow::Result<Arc<Self>>
where
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
let pool = Arc::new(Self {
endpoint: endpoint.try_into()?,
max_clients_per_channel,
channels: Mutex::default(),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_channel_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
Ok(pool)
}
/// Acquires a gRPC channel for a client. Multiple clients may acquire the same channel.
///
/// This never blocks (except for mutex acquisition). The channel is connected lazily on first
/// use, and the `ChannelPool` does not have a channel limit. Channels will be re-established
/// automatically on failure (TODO: verify).
///
/// Callers should not clone the returned channel, and must hold onto the returned guard as long
/// as the channel is in use. It is unfortunately not possible to enforce this: the Protobuf
/// client requires an owned `Channel` and we don't have access to the channel's internal
/// refcount.
///
/// This is not performance-sensitive. It is only called when creating a new client, and clients
/// are pooled and reused by `ClientPool`. The total number of channels will also be small. O(n)
/// performance is therefore okay.
pub fn get(self: &Arc<Self>) -> ChannelGuard {
let mut channels = self.channels.lock().unwrap();
// Try to find an existing channel with available capacity. We check entries in BTreeMap
// order, to fill up the lower-ordered channels first. The ClientPool also prefers clients
// with lower-ordered channel IDs first. This will cluster clients in lower-ordered
// channels, and free up higher-ordered channels such that they can be reaped.
for (&id, entry) in channels.iter_mut() {
assert!(
entry.clients <= self.max_clients_per_channel.get(),
"channel overflow"
);
assert_eq!(
entry.idle_since.is_some(),
entry.clients == 0,
"incorrect channel idle state"
);
if entry.clients < self.max_clients_per_channel.get() {
entry.clients += 1;
entry.idle_since = None;
return ChannelGuard {
pool: Arc::downgrade(self),
id,
channel: Some(entry.channel.clone()),
};
}
}
// Create a new channel. We connect lazily on first use, such that we don't block here and
// other clients can join onto the same channel while it's connecting.
let channel = self.endpoint.connect_lazy();
let id = self.next_channel_id.fetch_add(1, Ordering::Relaxed);
let entry = ChannelEntry {
channel: channel.clone(),
clients: 1, // account for the guard below
idle_since: None,
};
channels.insert(id, entry);
ChannelGuard {
pool: Arc::downgrade(self),
id,
channel: Some(channel),
}
}
}
impl Reapable for ChannelPool {
/// Reaps channels that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.channels.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.clients, 0, "empty channel not marked idle");
return true;
};
assert_eq!(entry.clients, 0, "idle channel has clients");
idle_since >= cutoff
})
}
}
/// Tracks a channel acquired from the pool. The owned inner channel can be obtained with `take()`,
/// since the gRPC client requires an owned `Channel`.
pub struct ChannelGuard {
pool: Weak<ChannelPool>,
id: ChannelID,
channel: Option<Channel>,
}
impl ChannelGuard {
/// Returns the inner owned channel. Panics if called more than once. The caller must hold onto
/// the guard as long as the channel is in use, and should not clone it.
pub fn take(&mut self) -> Channel {
self.channel.take().expect("channel already taken")
}
}
/// Returns the channel to the pool.
impl Drop for ChannelGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
return; // pool was dropped
};
let mut channels = pool.channels.lock().unwrap();
let entry = channels.get_mut(&self.id).expect("unknown channel");
assert!(entry.idle_since.is_none(), "active channel marked idle");
assert!(entry.clients > 0, "channel underflow");
entry.clients -= 1;
if entry.clients == 0 {
entry.idle_since = Some(Instant::now()); // mark channel as idle
}
}
}
/// A pool of gRPC clients for a single tenant shard. Each client acquires a channel from the inner
/// `ChannelPool`. A client is only given out to single caller at a time. The pool limits the total
/// number of concurrent clients to `max_clients` via semaphore.
///
/// The pool is always wrapped in an outer `Arc`, to allow long-lived guards across tasks/threads.
pub struct ClientPool {
/// Tenant ID.
tenant_id: TenantId,
/// Timeline ID.
timeline_id: TimelineId,
/// Shard ID.
shard_id: ShardIndex,
/// Authentication token, if any.
auth_token: Option<String>,
/// Channel pool to acquire channels from.
channel_pool: Arc<ChannelPool>,
/// Limits the max number of concurrent clients for this pool. None if the pool is unbounded.
limiter: Option<Arc<Semaphore>>,
/// Idle pooled clients. Acquired clients are removed from here and returned on drop.
///
/// The first client in the map will be acquired next. The map is sorted by client ID, which in
/// turn is sorted by its channel ID, such that we prefer acquiring idle clients from
/// lower-ordered channels. This allows us to free up and reap higher-numbered channels as idle
/// clients are reaped.
idle: Mutex<BTreeMap<ClientID, ClientEntry>>,
/// Reaps idle clients.
idle_reaper: Reaper,
/// Unique client ID generator.
next_client_id: AtomicUsize,
}
type ClientID = (ChannelID, usize);
struct ClientEntry {
/// The pooled gRPC client.
client: page_api::Client,
/// The channel guard for the channel used by the client.
channel_guard: ChannelGuard,
/// The client has been idle since this time. All clients in `ClientPool::idle` are idle by
/// definition, so this is the time when it was added back to the pool.
idle_since: Instant,
}
impl ClientPool {
/// Creates a new client pool for the given tenant shard. Channels are acquired from the given
/// `ChannelPool`, which must point to a Pageserver that hosts the tenant shard. Allows up to
/// `max_clients` concurrent clients, or unbounded if None.
pub fn new(
channel_pool: Arc<ChannelPool>,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
max_clients: Option<NonZero<usize>>,
) -> Arc<Self> {
let pool = Arc::new(Self {
tenant_id,
timeline_id,
shard_id,
auth_token,
channel_pool,
idle: Mutex::default(),
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
limiter: max_clients.map(|max| Arc::new(Semaphore::new(max.get()))),
next_client_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
pool
}
/// Gets a client from the pool, or creates a new one if necessary. Connections are established
/// lazily and do not block, but this call can block if the pool is at `max_clients`. The client
/// is returned to the pool when the guard is dropped.
///
/// This is moderately performance-sensitive. It is called for every unary request, but these
/// establish a new gRPC stream per request so they're already expensive. GetPage requests use
/// the `StreamPool` instead.
pub async fn get(self: &Arc<Self>) -> anyhow::Result<ClientGuard> {
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
permit = Some(limiter.acquire_owned().await.expect("never closed"));
}
// Fast path: acquire an idle client from the pool.
if let Some((id, entry)) = self.idle.lock().unwrap().pop_first() {
return Ok(ClientGuard {
pool: Arc::downgrade(self),
id,
client: Some(entry.client),
channel_guard: Some(entry.channel_guard),
permit,
});
}
// Slow path: construct a new client.
let mut channel_guard = self.channel_pool.get();
let client = page_api::Client::new(
channel_guard.take(),
self.tenant_id,
self.timeline_id,
self.shard_id,
self.auth_token.clone(),
None,
)?;
Ok(ClientGuard {
pool: Arc::downgrade(self),
id: (
channel_guard.id,
self.next_client_id.fetch_add(1, Ordering::Relaxed),
),
client: Some(client),
channel_guard: Some(channel_guard),
permit,
})
}
}
impl Reapable for ClientPool {
/// Reaps clients that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.idle
.lock()
.unwrap()
.retain(|_, entry| entry.idle_since >= cutoff)
}
}
/// A client acquired from the pool. The inner client can be accessed via Deref. The client is
/// returned to the pool when dropped.
pub struct ClientGuard {
pool: Weak<ClientPool>,
id: ClientID,
client: Option<page_api::Client>, // Some until dropped
channel_guard: Option<ChannelGuard>, // Some until dropped
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
impl Deref for ClientGuard {
type Target = page_api::Client;
fn deref(&self) -> &Self::Target {
self.client.as_ref().expect("not dropped")
}
}
impl DerefMut for ClientGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
self.client.as_mut().expect("not dropped")
}
}
/// Returns the client to the pool.
impl Drop for ClientGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
return; // pool was dropped
};
let entry = ClientEntry {
client: self.client.take().expect("dropped once"),
channel_guard: self.channel_guard.take().expect("dropped once"),
idle_since: Instant::now(),
};
pool.idle.lock().unwrap().insert(self.id, entry);
_ = self.permit; // returned on drop, referenced for visibility
}
}
/// A pool of bidirectional gRPC streams. Currently only used for GetPage streams. Each stream
/// acquires a client from the inner `ClientPool` for the stream's lifetime.
///
/// Individual streams are not exposed to callers -- instead, the returned guard can be used to send
/// a single request and await the response. Internally, requests are multiplexed across streams and
/// channels. This allows proper queue depth enforcement and response routing.
///
/// TODO: consider making this generic over request and response types; not currently needed.
pub struct StreamPool {
/// The client pool to acquire clients from. Must be unbounded.
client_pool: Arc<ClientPool>,
/// All pooled streams.
///
/// Incoming requests will be sent over an existing stream with available capacity. If all
/// streams are full, a new one is spun up and added to the pool (up to `max_streams`). Each
/// stream has an associated Tokio task that processes requests and responses.
streams: Mutex<HashMap<StreamID, StreamEntry>>,
/// The max number of concurrent streams, or None if unbounded.
max_streams: Option<NonZero<usize>>,
/// The max number of concurrent requests per stream.
max_queue_depth: NonZero<usize>,
/// Limits the max number of concurrent requests, given by `max_streams * max_queue_depth`.
/// None if the pool is unbounded.
limiter: Option<Arc<Semaphore>>,
/// Reaps idle streams.
idle_reaper: Reaper,
/// Stream ID generator.
next_stream_id: AtomicUsize,
}
type StreamID = usize;
type RequestSender = Sender<(page_api::GetPageRequest, ResponseSender)>;
type RequestReceiver = Receiver<(page_api::GetPageRequest, ResponseSender)>;
type ResponseSender = oneshot::Sender<tonic::Result<page_api::GetPageResponse>>;
struct StreamEntry {
/// Sends caller requests to the stream task. The stream task exits when this is dropped.
sender: RequestSender,
/// Number of in-flight requests on this stream.
queue_depth: usize,
/// The time when this stream went idle (queue_depth == 0).
/// INVARIANT: Some if queue_depth == 0, otherwise None.
idle_since: Option<Instant>,
}
impl StreamPool {
/// Creates a new stream pool, using the given client pool. It will send up to `max_queue_depth`
/// concurrent requests on each stream, and use up to `max_streams` concurrent streams.
///
/// The client pool must be unbounded. The stream pool will enforce its own limits, and because
/// streams are long-lived they can cause persistent starvation if they exhaust the client pool.
/// The stream pool should generally have its own dedicated client pool (but it can share a
/// channel pool with others since these are always unbounded).
pub fn new(
client_pool: Arc<ClientPool>,
max_streams: Option<NonZero<usize>>,
max_queue_depth: NonZero<usize>,
) -> Arc<Self> {
assert!(client_pool.limiter.is_none(), "bounded client pool");
let pool = Arc::new(Self {
client_pool,
streams: Mutex::default(),
limiter: max_streams.map(|max_streams| {
Arc::new(Semaphore::new(max_streams.get() * max_queue_depth.get()))
}),
max_streams,
max_queue_depth,
idle_reaper: Reaper::new(REAP_IDLE_THRESHOLD, REAP_IDLE_INTERVAL),
next_stream_id: AtomicUsize::default(),
});
pool.idle_reaper.spawn(&pool);
pool
}
/// Acquires an available stream from the pool, or spins up a new stream async if all streams
/// are full. Returns a guard that can be used to send a single request on the stream and await
/// the response, with queue depth quota already acquired. Blocks if the pool is at capacity
/// (i.e. `CLIENT_LIMIT * STREAM_QUEUE_DEPTH` requests in flight).
///
/// This is very performance-sensitive, as it is on the GetPage hot path.
///
/// TODO: this must do something more sophisticated for performance. We want:
///
/// * Cheap, concurrent access in the common case where we can use a pooled stream.
/// * Quick acquisition of pooled streams with available capacity.
/// * Prefer streams that belong to lower-numbered channels, to reap idle channels.
/// * Prefer filling up existing streams' queue depth before spinning up new streams.
/// * Don't hold a lock while spinning up new streams.
/// * Allow concurrent clients to join onto streams while they're spun up.
/// * Allow spinning up multiple streams concurrently, but don't overshoot limits.
///
/// For now, we just do something simple but inefficient (linear scan under mutex).
pub async fn get(self: &Arc<Self>) -> StreamGuard {
// Acquire a permit if the pool is bounded.
let mut permit = None;
if let Some(limiter) = self.limiter.clone() {
permit = Some(limiter.acquire_owned().await.expect("never closed"));
}
let mut streams = self.streams.lock().unwrap();
// Look for a pooled stream with available capacity.
for (&id, entry) in streams.iter_mut() {
assert!(
entry.queue_depth <= self.max_queue_depth.get(),
"stream queue overflow"
);
assert_eq!(
entry.idle_since.is_some(),
entry.queue_depth == 0,
"incorrect stream idle state"
);
if entry.queue_depth < self.max_queue_depth.get() {
entry.queue_depth += 1;
entry.idle_since = None;
return StreamGuard {
pool: Arc::downgrade(self),
id,
sender: entry.sender.clone(),
permit,
};
}
}
// No available stream, spin up a new one. We install the stream entry in the pool first and
// return the guard, while spinning up the stream task async. This allows other callers to
// join onto this stream and also create additional streams concurrently if this fills up.
let id = self.next_stream_id.fetch_add(1, Ordering::Relaxed);
let (req_tx, req_rx) = mpsc::channel(self.max_queue_depth.get());
let entry = StreamEntry {
sender: req_tx.clone(),
queue_depth: 1, // reserve quota for this caller
idle_since: None,
};
streams.insert(id, entry);
if let Some(max_streams) = self.max_streams {
assert!(streams.len() <= max_streams.get(), "stream overflow");
};
let client_pool = self.client_pool.clone();
let pool = Arc::downgrade(self);
tokio::spawn(async move {
if let Err(err) = Self::run_stream(client_pool, req_rx).await {
error!("stream failed: {err}");
}
// Remove stream from pool on exit. Weak reference to avoid holding the pool alive.
if let Some(pool) = pool.upgrade() {
let entry = pool.streams.lock().unwrap().remove(&id);
assert!(entry.is_some(), "unknown stream ID: {id}");
}
});
StreamGuard {
pool: Arc::downgrade(self),
id,
sender: req_tx,
permit,
}
}
/// Runs a stream task. This acquires a client from the `ClientPool` and establishes a
/// bidirectional GetPage stream, then forwards requests and responses between callers and the
/// stream. It does not track or enforce queue depths -- that's done by `get()` since it must be
/// atomic with pool stream acquisition.
///
/// The task exits when the request channel is closed, or on a stream error. The caller is
/// responsible for removing the stream from the pool on exit.
async fn run_stream(
client_pool: Arc<ClientPool>,
mut caller_rx: RequestReceiver,
) -> anyhow::Result<()> {
// Acquire a client from the pool and create a stream.
let mut client = client_pool.get().await?;
// NB: use an unbounded channel such that the stream send never blocks. Otherwise, we could
// theoretically deadlock if both the client and server block on sends (since we're not
// reading responses while sending). This is unlikely to happen due to gRPC/TCP buffers and
// low queue depths, but it was seen to happen with the libpq protocol so better safe than
// sorry. It should never buffer more than the queue depth anyway, but using an unbounded
// channel guarantees that it will never block.
let (req_tx, req_rx) = mpsc::unbounded_channel();
let req_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(req_rx);
let mut resp_stream = client.get_pages(req_stream).await?;
// Track caller response channels by request ID. If the task returns early, these response
// channels will be dropped and the waiting callers will receive an error.
let mut callers = HashMap::new();
// Process requests and responses.
loop {
tokio::select! {
// Receive requests from callers and send them to the stream.
req = caller_rx.recv() => {
// Shut down if request channel is closed.
let Some((req, resp_tx)) = req else {
return Ok(());
};
// Store the response channel by request ID.
if callers.contains_key(&req.request_id) {
// Error on request ID duplicates. Ignore callers that went away.
_ = resp_tx.send(Err(tonic::Status::invalid_argument(
format!("duplicate request ID: {}", req.request_id),
)));
continue;
}
callers.insert(req.request_id, resp_tx);
// Send the request on the stream. Bail out if the stream is closed.
req_tx.send(req).map_err(|_| {
tonic::Status::unavailable("stream closed")
})?;
}
// Receive responses from the stream and send them to callers.
resp = resp_stream.next() => {
// Shut down if the stream is closed, and bail out on stream errors.
let Some(resp) = resp.transpose()? else {
return Ok(())
};
// Send the response to the caller. Ignore errors if the caller went away.
let Some(resp_tx) = callers.remove(&resp.request_id) else {
warn!("received response for unknown request ID: {}", resp.request_id);
continue;
};
_ = resp_tx.send(Ok(resp));
}
}
}
}
}
impl Reapable for StreamPool {
/// Reaps streams that have been idle since before the cutoff.
fn reap_idle(&self, cutoff: Instant) {
self.streams.lock().unwrap().retain(|_, entry| {
let Some(idle_since) = entry.idle_since else {
assert_ne!(entry.queue_depth, 0, "empty stream not marked idle");
return true;
};
assert_eq!(entry.queue_depth, 0, "idle stream has requests");
idle_since >= cutoff
});
}
}
/// A pooled stream reference. Can be used to send a single request, to properly enforce queue
/// depth. Queue depth is already reserved and will be returned on drop.
pub struct StreamGuard {
pool: Weak<StreamPool>,
id: StreamID,
sender: RequestSender,
permit: Option<OwnedSemaphorePermit>, // None if pool is unbounded
}
impl StreamGuard {
/// Sends a request on the stream and awaits the response. Consumes the guard, since it's only
/// valid for a single request (to enforce queue depth). This also drops the guard on return and
/// returns the queue depth quota to the pool.
///
/// The `GetPageRequest::request_id` must be unique across in-flight requests.
///
/// NB: errors are often returned as `GetPageResponse::status_code` instead of `tonic::Status`
/// to avoid tearing down the stream for per-request errors. Callers must check this.
pub async fn send(
self,
req: page_api::GetPageRequest,
) -> tonic::Result<page_api::GetPageResponse> {
let (resp_tx, resp_rx) = oneshot::channel();
self.sender
.send((req, resp_tx))
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?;
resp_rx
.await
.map_err(|_| tonic::Status::unavailable("stream closed"))?
}
}
impl Drop for StreamGuard {
fn drop(&mut self) {
let Some(pool) = self.pool.upgrade() else {
return; // pool was dropped
};
// Release the queue depth reservation on drop. This can prematurely decrement it if dropped
// before the response is received, but that's okay.
let mut streams = pool.streams.lock().unwrap();
let entry = streams.get_mut(&self.id).expect("unknown stream");
assert!(entry.idle_since.is_none(), "active stream marked idle");
assert!(entry.queue_depth > 0, "stream queue underflow");
entry.queue_depth -= 1;
if entry.queue_depth == 0 {
entry.idle_since = Some(Instant::now()); // mark stream as idle
}
_ = self.permit; // returned on drop, referenced for visibility
}
}
/// Periodically reaps idle resources from a pool.
struct Reaper {
/// The task check interval.
interval: Duration,
/// The threshold for reaping idle resources.
threshold: Duration,
/// Cancels the reaper task. Cancelled when the reaper is dropped.
cancel: CancellationToken,
}
impl Reaper {
/// Creates a new reaper.
pub fn new(threshold: Duration, interval: Duration) -> Self {
Self {
cancel: CancellationToken::new(),
threshold,
interval,
}
}
/// Spawns a task to periodically reap idle resources from the given task pool. The task is
/// cancelled when the reaper is dropped.
pub fn spawn(&self, pool: &Arc<impl Reapable>) {
// NB: hold a weak pool reference, otherwise the task will prevent dropping the pool.
let pool = Arc::downgrade(pool);
let cancel = self.cancel.clone();
let (interval, threshold) = (self.interval, self.threshold);
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {
let Some(pool) = pool.upgrade() else {
return; // pool was dropped
};
pool.reap_idle(Instant::now() - threshold);
}
_ = cancel.cancelled() => return,
}
}
});
}
}
impl Drop for Reaper {
fn drop(&mut self) {
self.cancel.cancel(); // cancel reaper task
}
}
/// A reapable resource pool.
trait Reapable: Send + Sync + 'static {
/// Reaps resources that have been idle since before the given cutoff.
fn reap_idle(&self, cutoff: Instant);
}

View File

@@ -1,151 +0,0 @@
use std::time::Duration;
use tokio::time::Instant;
use tracing::{error, info, warn};
use utils::backoff::exponential_backoff_duration;
/// A retry handler for Pageserver gRPC requests.
///
/// This is used instead of backoff::retry for better control and observability.
pub struct Retry;
impl Retry {
/// The per-request timeout.
// TODO: tune these, and/or make them configurable. Should we retry forever?
const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
/// The total timeout across all attempts
const TOTAL_TIMEOUT: Duration = Duration::from_secs(60);
/// The initial backoff duration.
const BASE_BACKOFF: Duration = Duration::from_millis(10);
/// The maximum backoff duration.
const MAX_BACKOFF: Duration = Duration::from_secs(10);
/// If true, log successful requests. For debugging.
const LOG_SUCCESS: bool = false;
/// Runs the given async closure with timeouts and retries (exponential backoff). Logs errors,
/// using the current tracing span for context.
///
/// Only certain gRPC status codes are retried, see [`Self::should_retry`]. For default
/// timeouts, see [`Self::REQUEST_TIMEOUT`] and [`Self::TOTAL_TIMEOUT`].
pub async fn with<T, F, O>(&self, mut f: F) -> tonic::Result<T>
where
F: FnMut() -> O,
O: Future<Output = tonic::Result<T>>,
{
let started = Instant::now();
let deadline = started + Self::TOTAL_TIMEOUT;
let mut last_error = None;
let mut retries = 0;
loop {
// Set up a future to wait for the backoff (if any) and run the request with a timeout.
let backoff_and_try = async {
// NB: sleep() always sleeps 1ms, even when given a 0 argument. See:
// https://github.com/tokio-rs/tokio/issues/6866
if let Some(backoff) = Self::backoff_duration(retries) {
tokio::time::sleep(backoff).await;
}
let request_started = Instant::now();
tokio::time::timeout(Self::REQUEST_TIMEOUT, f())
.await
.map_err(|_| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
request_started.elapsed().as_secs_f64()
))
})?
};
// Wait for the backoff and request, or bail out if the total timeout is exceeded.
let result = tokio::select! {
result = backoff_and_try => result,
_ = tokio::time::sleep_until(deadline) => {
let last_error = last_error.unwrap_or_else(|| {
tonic::Status::deadline_exceeded(format!(
"request timed out after {:.3}s",
started.elapsed().as_secs_f64()
))
});
error!(
"giving up after {:.3}s and {retries} retries, last error {:?}: {}",
started.elapsed().as_secs_f64(), last_error.code(), last_error.message(),
);
return Err(last_error);
}
};
match result {
// Success, return the result.
Ok(result) => {
if retries > 0 || Self::LOG_SUCCESS {
info!(
"request succeeded after {retries} retries in {:.3}s",
started.elapsed().as_secs_f64(),
);
}
return Ok(result);
}
// Error, retry or bail out.
Err(status) => {
let (code, message) = (status.code(), status.message());
let attempt = retries + 1;
if !Self::should_retry(code) {
// NB: include the attempt here too. This isn't necessarily the first
// attempt, because the error may change between attempts.
error!(
"request failed with {code:?}: {message}, not retrying (attempt {attempt})"
);
return Err(status);
}
warn!("request failed with {code:?}: {message}, retrying (attempt {attempt})");
retries += 1;
last_error = Some(status);
}
}
}
}
/// Returns the backoff duration for the given retry attempt, or None for no backoff.
fn backoff_duration(retry: usize) -> Option<Duration> {
let backoff = exponential_backoff_duration(
retry as u32,
Self::BASE_BACKOFF.as_secs_f64(),
Self::MAX_BACKOFF.as_secs_f64(),
);
(!backoff.is_zero()).then_some(backoff)
}
/// Returns true if the given status code should be retries.
fn should_retry(code: tonic::Code) -> bool {
match code {
tonic::Code::Ok => panic!("unexpected Ok status code"),
// These codes are transient, so retry them.
tonic::Code::Aborted => true,
tonic::Code::Cancelled => true,
tonic::Code::DeadlineExceeded => true, // maybe transient slowness
tonic::Code::Internal => true, // maybe transient failure?
tonic::Code::ResourceExhausted => true,
tonic::Code::Unavailable => true,
// The following codes will like continue to fail, so don't retry.
tonic::Code::AlreadyExists => false,
tonic::Code::DataLoss => false,
tonic::Code::FailedPrecondition => false,
tonic::Code::InvalidArgument => false,
tonic::Code::NotFound => false,
tonic::Code::OutOfRange => false,
tonic::Code::PermissionDenied => false,
tonic::Code::Unauthenticated => false,
tonic::Code::Unimplemented => false,
tonic::Code::Unknown => false,
}
}
}

View File

@@ -1,172 +0,0 @@
use std::collections::HashMap;
use bytes::Bytes;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::shard::{ShardStripeSize, key_to_shard_number};
use pageserver_page_api as page_api;
use utils::shard::{ShardCount, ShardIndex};
/// Splits GetPageRequests that straddle shard boundaries and assembles the responses.
/// TODO: add tests for this.
pub struct GetPageSplitter {
/// The original request ID. Used for all shard requests.
request_id: page_api::RequestID,
/// Split requests by shard index.
requests: HashMap<ShardIndex, page_api::GetPageRequest>,
/// Maps the offset in `GetPageRequest::block_numbers` to the owning shard. Used to assemble
/// the response pages in the same order as the original request.
block_shards: Vec<ShardIndex>,
/// Page responses by shard index. Will be assembled into a single response.
responses: HashMap<ShardIndex, Vec<Bytes>>,
}
impl GetPageSplitter {
/// Checks if the given request only touches a single shard, and returns the shard ID. This is
/// the common case, so we check first in order to avoid unnecessary allocations and overhead.
/// The caller must ensure that the request has at least one block number, or this will panic.
pub fn is_single_shard(
req: &page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
) -> Option<ShardIndex> {
// Fast path: unsharded tenant.
if count.is_unsharded() {
return Some(ShardIndex::unsharded());
}
// Find the base shard index for the first page, and compare with the rest.
let key = rel_block_to_key(req.rel, *req.block_numbers.first().expect("no pages"));
let shard_number = key_to_shard_number(count, stripe_size, &key);
req.block_numbers
.iter()
.skip(1) // computed above
.all(|&blkno| {
let key = rel_block_to_key(req.rel, blkno);
key_to_shard_number(count, stripe_size, &key) == shard_number
})
.then_some(ShardIndex::new(shard_number, count))
}
/// Splits the given request.
pub fn split(
req: page_api::GetPageRequest,
count: ShardCount,
stripe_size: ShardStripeSize,
) -> Self {
// The caller should make sure we don't split requests unnecessarily.
debug_assert!(
Self::is_single_shard(&req, count, stripe_size).is_none(),
"unnecessary request split"
);
// Split the requests by shard index.
let mut requests = HashMap::with_capacity(2); // common case
let mut block_shards = Vec::with_capacity(req.block_numbers.len());
for blkno in req.block_numbers {
let key = rel_block_to_key(req.rel, blkno);
let shard_number = key_to_shard_number(count, stripe_size, &key);
let shard_id = ShardIndex::new(shard_number, count);
let shard_req = requests
.entry(shard_id)
.or_insert_with(|| page_api::GetPageRequest {
request_id: req.request_id,
request_class: req.request_class,
rel: req.rel,
read_lsn: req.read_lsn,
block_numbers: Vec::new(),
});
shard_req.block_numbers.push(blkno);
block_shards.push(shard_id);
}
Self {
request_id: req.request_id,
responses: HashMap::with_capacity(requests.len()),
requests,
block_shards,
}
}
/// Drains the per-shard requests, moving them out of the hashmap to avoid extra allocations.
pub fn drain_requests(
&mut self,
) -> impl Iterator<Item = (ShardIndex, page_api::GetPageRequest)> {
self.requests.drain()
}
/// Adds a response from the given shard.
#[allow(clippy::result_large_err)]
pub fn add_response(
&mut self,
shard_id: ShardIndex,
response: page_api::GetPageResponse,
) -> tonic::Result<()> {
// The caller should already have converted status codes into tonic::Status.
assert_eq!(response.status_code, page_api::GetPageStatusCode::Ok);
// Make sure the response matches the request ID.
if response.request_id != self.request_id {
return Err(tonic::Status::internal(format!(
"response ID {} does not match request ID {}",
response.request_id, self.request_id
)));
}
// Add the response data to the map.
let old = self.responses.insert(shard_id, response.page_images);
if old.is_some() {
return Err(tonic::Status::internal(format!(
"duplicate response for shard {shard_id}",
)));
}
Ok(())
}
/// Assembles the shard responses into a single response. Responses must be present for all
/// relevant shards, and the total number of pages must match the original request.
#[allow(clippy::result_large_err)]
pub fn assemble_response(self) -> tonic::Result<page_api::GetPageResponse> {
let mut response = page_api::GetPageResponse {
request_id: self.request_id,
status_code: page_api::GetPageStatusCode::Ok,
reason: None,
page_images: Vec::with_capacity(self.block_shards.len()),
};
// Set up per-shard page iterators we can pull from.
let mut shard_responses = HashMap::with_capacity(self.responses.len());
for (shard_id, responses) in self.responses {
shard_responses.insert(shard_id, responses.into_iter());
}
// Reassemble the responses in the same order as the original request.
for shard_id in &self.block_shards {
let page = shard_responses
.get_mut(shard_id)
.ok_or_else(|| {
tonic::Status::internal(format!("missing response for shard {shard_id}"))
})?
.next()
.ok_or_else(|| {
tonic::Status::internal(format!("missing page from shard {shard_id}"))
})?;
response.page_images.push(page);
}
// Make sure there are no additional pages.
for (shard_id, mut pages) in shard_responses {
if pages.next().is_some() {
return Err(tonic::Status::internal(format!(
"extra pages returned from shard {shard_id}"
)));
}
}
Ok(response)
}
}

View File

@@ -1,101 +1,10 @@
use std::str::FromStr;
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver::tenant::{
IndexPart,
layer_map::{LayerMap, SearchResult},
remote_timeline_client::remote_layer_path,
storage_layer::{PersistentLayerDesc, ReadableLayerWeak},
};
use pageserver_api::key::Key;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
shard::TenantShardId,
};
use pageserver::tenant::IndexPart;
#[derive(clap::Subcommand)]
pub(crate) enum IndexPartCmd {
Dump {
path: Utf8PathBuf,
},
/// Find all layers that need to be searched to construct the given page at the given LSN.
Search {
#[arg(long)]
tenant_id: String,
#[arg(long)]
timeline_id: String,
#[arg(long)]
path: Utf8PathBuf,
#[arg(long)]
key: String,
#[arg(long)]
lsn: String,
},
}
async fn search_layers(
tenant_id: &str,
timeline_id: &str,
path: &Utf8PathBuf,
key: &str,
lsn: &str,
) -> anyhow::Result<()> {
let tenant_id = TenantId::from_str(tenant_id).unwrap();
let tenant_shard_id = TenantShardId::unsharded(tenant_id);
let timeline_id = TimelineId::from_str(timeline_id).unwrap();
let index_json = {
let bytes = tokio::fs::read(path).await?;
IndexPart::from_json_bytes(&bytes).unwrap()
};
let mut layer_map = LayerMap::default();
{
let mut updates = layer_map.batch_update();
for (key, value) in index_json.layer_metadata.iter() {
updates.insert_historic(PersistentLayerDesc::from_filename(
tenant_shard_id,
timeline_id,
key.clone(),
value.file_size,
));
}
}
let key = Key::from_hex(key)?;
let lsn = Lsn::from_str(lsn).unwrap();
let mut end_lsn = lsn;
loop {
let result = layer_map.search(key, end_lsn);
match result {
Some(SearchResult { layer, lsn_floor }) => {
let disk_layer = match layer {
ReadableLayerWeak::PersistentLayer(layer) => layer,
ReadableLayerWeak::InMemoryLayer(_) => {
anyhow::bail!("unexpected in-memory layer")
}
};
let metadata = index_json
.layer_metadata
.get(&disk_layer.layer_name())
.unwrap();
println!(
"{}",
remote_layer_path(
&tenant_id,
&timeline_id,
metadata.shard,
&disk_layer.layer_name(),
metadata.generation
)
);
end_lsn = lsn_floor;
}
None => break,
}
}
Ok(())
Dump { path: Utf8PathBuf },
}
pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
@@ -107,12 +16,5 @@ pub(crate) async fn main(cmd: &IndexPartCmd) -> anyhow::Result<()> {
println!("{output}");
Ok(())
}
IndexPartCmd::Search {
tenant_id,
timeline_id,
path,
key,
lsn,
} => search_layers(tenant_id, timeline_id, path, key, lsn).await,
}
}

View File

@@ -1,151 +1,23 @@
use anyhow::Context as _;
use anyhow::Result;
use futures::{Stream, StreamExt as _, TryStreamExt as _};
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;
use tonic::codec::CompressionEncoding;
use tonic::metadata::AsciiMetadataValue;
use tonic::service::Interceptor;
use tonic::service::interceptor::InterceptedService;
use tonic::transport::{Channel, Endpoint};
use tonic::metadata::errors::InvalidMetadataValue;
use tonic::transport::Channel;
use tonic::{Request, Streaming};
use utils::id::{TenantId, TimelineId};
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::shard::ShardIndex;
use crate::model::*;
use crate::model;
use crate::proto;
/// A basic Pageserver gRPC client, for a single tenant shard. This API uses native Rust domain
/// types from `model` rather than generated Protobuf types.
pub struct Client {
inner: proto::PageServiceClient<InterceptedService<Channel, AuthInterceptor>>,
}
impl Client {
/// Connects to the given gRPC endpoint.
pub async fn connect<E>(
endpoint: E,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self>
where
E: TryInto<Endpoint> + Send + Sync + 'static,
<E as TryInto<Endpoint>>::Error: std::error::Error + Send + Sync,
{
let endpoint: Endpoint = endpoint.try_into().context("invalid endpoint")?;
let channel = endpoint.connect().await?;
Self::new(
channel,
tenant_id,
timeline_id,
shard_id,
auth_token,
compression,
)
}
/// Creates a new client using the given gRPC channel.
pub fn new(
channel: Channel,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
compression: Option<CompressionEncoding>,
) -> anyhow::Result<Self> {
let auth = AuthInterceptor::new(tenant_id, timeline_id, shard_id, auth_token)?;
let mut inner = proto::PageServiceClient::with_interceptor(channel, auth);
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
inner = inner
.accept_compressed(compression)
.send_compressed(compression);
}
Ok(Self { inner })
}
/// Returns whether a relation exists.
pub async fn check_rel_exists(
&mut self,
req: CheckRelExistsRequest,
) -> tonic::Result<CheckRelExistsResponse> {
let req = proto::CheckRelExistsRequest::from(req);
let resp = self.inner.check_rel_exists(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches a base backup.
pub async fn get_base_backup(
&mut self,
req: GetBaseBackupRequest,
) -> tonic::Result<impl AsyncRead + use<>> {
let req = proto::GetBaseBackupRequest::from(req);
let chunks = self.inner.get_base_backup(req).await?.into_inner();
Ok(StreamReader::new(
chunks
.map_ok(|resp| resp.chunk)
.map_err(std::io::Error::other),
))
}
/// Returns the total size of a database, as # of bytes.
pub async fn get_db_size(&mut self, req: GetDbSizeRequest) -> tonic::Result<GetDbSizeResponse> {
let req = proto::GetDbSizeRequest::from(req);
let resp = self.inner.get_db_size(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches pages.
///
/// This is implemented as a bidirectional streaming RPC for performance. Per-request errors are
/// typically returned as status_code instead of errors, to avoid tearing down the entire stream
/// via a tonic::Status error.
pub async fn get_pages(
&mut self,
reqs: impl Stream<Item = GetPageRequest> + Send + 'static,
) -> tonic::Result<impl Stream<Item = tonic::Result<GetPageResponse>> + Send + 'static> {
let reqs = reqs.map(proto::GetPageRequest::from);
let resps = self.inner.get_pages(reqs).await?.into_inner();
Ok(resps.map_ok(GetPageResponse::from))
}
/// Returns the size of a relation, as # of blocks.
pub async fn get_rel_size(
&mut self,
req: GetRelSizeRequest,
) -> tonic::Result<GetRelSizeResponse> {
let req = proto::GetRelSizeRequest::from(req);
let resp = self.inner.get_rel_size(req).await?.into_inner();
Ok(resp.into())
}
/// Fetches an SLRU segment.
pub async fn get_slru_segment(
&mut self,
req: GetSlruSegmentRequest,
) -> tonic::Result<GetSlruSegmentResponse> {
let req = proto::GetSlruSegmentRequest::from(req);
let resp = self.inner.get_slru_segment(req).await?.into_inner();
Ok(resp.try_into()?)
}
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
/// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
///
/// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
/// acquired because the LSN has already been garbage collected.
pub async fn lease_lsn(&mut self, req: LeaseLsnRequest) -> tonic::Result<LeaseLsnResponse> {
let req = proto::LeaseLsnRequest::from(req);
let resp = self.inner.lease_lsn(req).await?.into_inner();
Ok(resp.try_into()?)
}
}
/// Adds authentication metadata to gRPC requests.
///
/// AuthInterceptor adds tenant, timeline, and auth header to the channel. These
/// headers are required at the pageserver.
///
#[derive(Clone)]
struct AuthInterceptor {
tenant_id: AsciiMetadataValue,
@@ -158,29 +30,174 @@ impl AuthInterceptor {
fn new(
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_token: Option<String>,
) -> anyhow::Result<Self> {
shard_id: ShardIndex,
) -> Result<Self, InvalidMetadataValue> {
let tenant_ascii: AsciiMetadataValue = tenant_id.to_string().try_into()?;
let timeline_ascii: AsciiMetadataValue = timeline_id.to_string().try_into()?;
let shard_ascii: AsciiMetadataValue = shard_id.to_string().try_into()?;
let auth_header: Option<AsciiMetadataValue> = match auth_token {
Some(token) => Some(format!("Bearer {token}").try_into()?),
None => None,
};
Ok(Self {
tenant_id: tenant_id.to_string().try_into()?,
timeline_id: timeline_id.to_string().try_into()?,
shard_id: shard_id.to_string().try_into()?,
auth_header: auth_token
.map(|token| format!("Bearer {token}").try_into())
.transpose()?,
tenant_id: tenant_ascii,
shard_id: shard_ascii,
timeline_id: timeline_ascii,
auth_header,
})
}
}
impl Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> tonic::Result<tonic::Request<()>> {
let metadata = req.metadata_mut();
metadata.insert("neon-tenant-id", self.tenant_id.clone());
metadata.insert("neon-timeline-id", self.timeline_id.clone());
metadata.insert("neon-shard-id", self.shard_id.clone());
if let Some(ref auth_header) = self.auth_header {
metadata.insert("authorization", auth_header.clone());
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut()
.insert("neon-tenant-id", self.tenant_id.clone());
req.metadata_mut()
.insert("neon-shard-id", self.shard_id.clone());
req.metadata_mut()
.insert("neon-timeline-id", self.timeline_id.clone());
if let Some(auth_header) = &self.auth_header {
req.metadata_mut()
.insert("authorization", auth_header.clone());
}
Ok(req)
}
}
#[derive(Clone)]
pub struct Client {
client: proto::PageServiceClient<
tonic::service::interceptor::InterceptedService<Channel, AuthInterceptor>,
>,
}
impl Client {
pub async fn new<T: TryInto<tonic::transport::Endpoint> + Send + Sync + 'static>(
into_endpoint: T,
tenant_id: TenantId,
timeline_id: TimelineId,
shard_id: ShardIndex,
auth_header: Option<String>,
compression: Option<tonic::codec::CompressionEncoding>,
) -> anyhow::Result<Self> {
let endpoint: tonic::transport::Endpoint = into_endpoint
.try_into()
.map_err(|_e| anyhow::anyhow!("failed to convert endpoint"))?;
let channel = endpoint.connect().await?;
let auth = AuthInterceptor::new(tenant_id, timeline_id, auth_header, shard_id)
.map_err(|e| anyhow::anyhow!(e.to_string()))?;
let mut client = proto::PageServiceClient::with_interceptor(channel, auth);
if let Some(compression) = compression {
// TODO: benchmark this (including network latency).
client = client
.accept_compressed(compression)
.send_compressed(compression);
}
Ok(Self { client })
}
/// Returns whether a relation exists.
pub async fn check_rel_exists(
&mut self,
req: model::CheckRelExistsRequest,
) -> Result<model::CheckRelExistsResponse, tonic::Status> {
let proto_req = proto::CheckRelExistsRequest::from(req);
let response = self.client.check_rel_exists(proto_req).await?;
let proto_resp = response.into_inner();
Ok(proto_resp.into())
}
/// Fetches a base backup.
pub async fn get_base_backup(
&mut self,
req: model::GetBaseBackupRequest,
) -> Result<impl AsyncRead + use<>, tonic::Status> {
let req = proto::GetBaseBackupRequest::from(req);
let chunks = self.client.get_base_backup(req).await?.into_inner();
let reader = StreamReader::new(
chunks
.map_ok(|resp| resp.chunk)
.map_err(std::io::Error::other),
);
Ok(reader)
}
/// Returns the total size of a database, as # of bytes.
pub async fn get_db_size(
&mut self,
req: model::GetDbSizeRequest,
) -> Result<u64, tonic::Status> {
let proto_req = proto::GetDbSizeRequest::from(req);
let response = self.client.get_db_size(proto_req).await?;
Ok(response.into_inner().into())
}
/// Fetches pages.
///
/// This is implemented as a bidirectional streaming RPC for performance.
/// Per-request errors are often returned as status_code instead of errors,
/// to avoid tearing down the entire stream via tonic::Status.
pub async fn get_pages<ReqSt>(
&mut self,
inbound: ReqSt,
) -> Result<
impl Stream<Item = Result<model::GetPageResponse, tonic::Status>> + Send + 'static,
tonic::Status,
>
where
ReqSt: Stream<Item = model::GetPageRequest> + Send + 'static,
{
let outbound_proto = inbound.map(|domain_req| domain_req.into());
let req_new = Request::new(outbound_proto);
let response_stream: Streaming<proto::GetPageResponse> =
self.client.get_pages(req_new).await?.into_inner();
let domain_stream = response_stream.map_ok(model::GetPageResponse::from);
Ok(domain_stream)
}
/// Returns the size of a relation, as # of blocks.
pub async fn get_rel_size(
&mut self,
req: model::GetRelSizeRequest,
) -> Result<model::GetRelSizeResponse, tonic::Status> {
let proto_req = proto::GetRelSizeRequest::from(req);
let response = self.client.get_rel_size(proto_req).await?;
let proto_resp = response.into_inner();
Ok(proto_resp.into())
}
/// Fetches an SLRU segment.
pub async fn get_slru_segment(
&mut self,
req: model::GetSlruSegmentRequest,
) -> Result<model::GetSlruSegmentResponse, tonic::Status> {
let proto_req = proto::GetSlruSegmentRequest::from(req);
let response = self.client.get_slru_segment(proto_req).await?;
Ok(response.into_inner().try_into()?)
}
/// Acquires or extends a lease on the given LSN. This guarantees that the Pageserver won't
/// garbage collect the LSN until the lease expires. Must be acquired on all relevant shards.
///
/// Returns the lease expiration time, or a FailedPrecondition status if the lease could not be
/// acquired because the LSN has already been garbage collected.
pub async fn lease_lsn(
&mut self,
req: model::LeaseLsnRequest,
) -> Result<model::LeaseLsnResponse, tonic::Status> {
let req = proto::LeaseLsnRequest::from(req);
Ok(self.client.lease_lsn(req).await?.into_inner().try_into()?)
}
}

View File

@@ -384,7 +384,7 @@ impl From<GetPageRequest> for proto::GetPageRequest {
pub type RequestID = u64;
/// A GetPage request class.
#[derive(Clone, Copy, Debug, strum_macros::Display)]
#[derive(Clone, Copy, Debug)]
pub enum GetPageClass {
/// Unknown class. For backwards compatibility: used when an older client version sends a class
/// that a newer server version has removed.
@@ -397,19 +397,6 @@ pub enum GetPageClass {
Background,
}
impl GetPageClass {
/// Returns true if this is considered a bulk request (i.e. more throughput-oriented rather than
/// latency-sensitive).
pub fn is_bulk(&self) -> bool {
match self {
Self::Unknown => false,
Self::Normal => false,
Self::Prefetch => true,
Self::Background => true,
}
}
}
impl From<proto::GetPageClass> for GetPageClass {
fn from(pb: proto::GetPageClass) -> Self {
match pb {
@@ -615,21 +602,6 @@ impl TryFrom<tonic::Code> for GetPageStatusCode {
}
}
impl From<GetPageStatusCode> for tonic::Code {
fn from(status_code: GetPageStatusCode) -> Self {
use tonic::Code;
match status_code {
GetPageStatusCode::Unknown => Code::Unknown,
GetPageStatusCode::Ok => Code::Ok,
GetPageStatusCode::NotFound => Code::NotFound,
GetPageStatusCode::InvalidRequest => Code::InvalidArgument,
GetPageStatusCode::InternalError => Code::Internal,
GetPageStatusCode::SlowDown => Code::ResourceExhausted,
}
}
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
// shards will error.
#[derive(Clone, Copy, Debug)]

View File

@@ -326,7 +326,7 @@ impl GrpcClient {
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let inner = page_api::Client::connect(
let inner = page_api::Client::new(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,

View File

@@ -625,7 +625,7 @@ impl GrpcClient {
ttid: TenantTimelineId,
compression: bool,
) -> anyhow::Result<Self> {
let mut client = page_api::Client::connect(
let mut client = page_api::Client::new(
connstring.to_string(),
ttid.tenant_id,
ttid.timeline_id,

View File

@@ -889,11 +889,8 @@ async fn create_remote_storage_client(
"Simulating remote failures for first {} attempts of each op",
conf.test_remote_failures
);
remote_storage = GenericRemoteStorage::unreliable_wrapper(
remote_storage,
conf.test_remote_failures,
conf.test_remote_failures_probability,
);
remote_storage =
GenericRemoteStorage::unreliable_wrapper(remote_storage, conf.test_remote_failures);
}
Ok(remote_storage)

View File

@@ -28,6 +28,7 @@ use reqwest::Url;
use storage_broker::Uri;
use utils::id::{NodeId, TimelineId};
use utils::logging::{LogFormat, SecretString};
use utils::serde_percent::Percent;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -145,13 +146,9 @@ pub struct PageServerConf {
pub metric_collection_bucket: Option<RemoteStorageConfig>,
pub synthetic_size_calculation_interval: Duration,
pub disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
pub disk_usage_based_eviction: Option<DiskUsageEvictionTaskConfig>,
// The number of allowed failures in remote storage operations.
pub test_remote_failures: u64,
// The probability of failure in remote storage operations. Only works when test_remote_failures > 1.
// Use 100 for 100% failure, 0 for no failure.
pub test_remote_failures_probability: u64,
pub ondemand_download_behavior_treat_error_as_warn: bool,
@@ -252,10 +249,6 @@ pub struct PageServerConf {
pub timeline_import_config: pageserver_api::config::TimelineImportConfig,
pub basebackup_cache_config: Option<pageserver_api::config::BasebackupCacheConfig>,
/// Defines what is a big tenant for the purpose of image layer generation.
/// See Timeline::should_check_if_image_layers_required
pub image_layer_generation_large_timeline_threshold: Option<u64>,
}
/// Token for authentication to safekeepers
@@ -400,7 +393,6 @@ impl PageServerConf {
synthetic_size_calculation_interval,
disk_usage_based_eviction,
test_remote_failures,
test_remote_failures_probability,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api,
@@ -436,7 +428,6 @@ impl PageServerConf {
posthog_config,
timeline_import_config,
basebackup_cache_config,
image_layer_generation_large_timeline_threshold,
} = config_toml;
let mut conf = PageServerConf {
@@ -469,9 +460,17 @@ impl PageServerConf {
metric_collection_endpoint,
metric_collection_bucket,
synthetic_size_calculation_interval,
disk_usage_based_eviction,
disk_usage_based_eviction: Some(disk_usage_based_eviction.unwrap_or(
DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 2_000_000_000,
period: Duration::from_secs(60),
#[cfg(feature = "testing")]
mock_statvfs: None,
eviction_order: Default::default(),
},
)),
test_remote_failures,
test_remote_failures_probability,
ondemand_download_behavior_treat_error_as_warn,
background_task_maximum_delay,
control_plane_api: control_plane_api
@@ -495,7 +494,6 @@ impl PageServerConf {
dev_mode,
timeline_import_config,
basebackup_cache_config,
image_layer_generation_large_timeline_threshold,
// ------------------------------------------------------------
// fields that require additional validation or custom handling
@@ -637,7 +635,7 @@ impl PageServerConf {
pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
let mut config_toml = pageserver_api::config::ConfigToml {
let config_toml = pageserver_api::config::ConfigToml {
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
pg_distrib_dir: Some(pg_distrib_dir),
@@ -649,15 +647,6 @@ impl PageServerConf {
control_plane_api: Some(Url::parse("http://localhost:6666").unwrap()),
..Default::default()
};
// Test authors tend to forget about the default 10min initial lease deadline
// when writing tests, which turns their immediate gc requests via mgmt API
// into no-ops. Override the binary default here, such that there is no initial
// lease deadline by default in tests. Tests that care can always override it
// themselves.
// Cf https://databricks.atlassian.net/browse/LKB-92?focusedCommentId=6722329
config_toml.tenant_config.lsn_lease_length = Duration::from_secs(0);
PageServerConf::parse_and_validate(NodeId(0), config_toml, &repo_dir).unwrap()
}
}
@@ -721,9 +710,8 @@ mod tests {
use std::time::Duration;
use camino::Utf8PathBuf;
use pageserver_api::config::{DiskUsageEvictionTaskConfig, EvictionOrder};
use rstest::rstest;
use utils::{id::NodeId, serde_percent::Percent};
use utils::id::NodeId;
use super::PageServerConf;
@@ -823,69 +811,19 @@ mod tests {
.expect("parse_and_validate");
}
#[rstest]
#[
case::omit_the_whole_config(
DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 2_000_000_000,
period: Duration::from_secs(60),
eviction_order: Default::default(),
#[cfg(feature = "testing")]
mock_statvfs: None,
enabled: true,
},
r#"
#[test]
fn test_config_disk_usage_based_eviction_is_valid() {
let input = r#"
control_plane_api = "http://localhost:6666"
"#,
)]
#[
case::omit_enabled_field(
DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 1_000_000_000,
period: Duration::from_secs(60),
eviction_order: EvictionOrder::RelativeAccessed {
highest_layer_count_loses_first: true,
},
#[cfg(feature = "testing")]
mock_statvfs: None,
enabled: true,
},
r#"
control_plane_api = "http://localhost:6666"
disk_usage_based_eviction = { max_usage_pct = 80, min_avail_bytes = 1000000000, period = "60s" }
"#,
)]
#[case::disabled(
DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(80).unwrap(),
min_avail_bytes: 2_000_000_000,
period: Duration::from_secs(60),
eviction_order: EvictionOrder::RelativeAccessed {
highest_layer_count_loses_first: true,
},
#[cfg(feature = "testing")]
mock_statvfs: None,
enabled: false,
},
r#"
control_plane_api = "http://localhost:6666"
disk_usage_based_eviction = { enabled = false }
"#
)]
fn test_config_disk_usage_based_eviction_is_valid(
#[case] expected_disk_usage_based_eviction: DiskUsageEvictionTaskConfig,
#[case] input: &str,
) {
"#;
let config_toml = toml_edit::de::from_str::<pageserver_api::config::ConfigToml>(input)
.expect("disk_usage_based_eviction is valid");
let workdir = Utf8PathBuf::from("/nonexistent");
let config = PageServerConf::parse_and_validate(NodeId(0), config_toml, &workdir).unwrap();
let disk_usage_based_eviction = config.disk_usage_based_eviction;
assert_eq!(
expected_disk_usage_based_eviction,
disk_usage_based_eviction
);
let disk_usage_based_eviction = config.disk_usage_based_eviction.unwrap();
assert_eq!(disk_usage_based_eviction.max_usage_pct.get(), 80);
assert_eq!(disk_usage_based_eviction.min_avail_bytes, 2_000_000_000);
assert_eq!(disk_usage_based_eviction.period, Duration::from_secs(60));
assert_eq!(disk_usage_based_eviction.eviction_order, Default::default());
}
}

View File

@@ -171,8 +171,7 @@ pub fn launch_disk_usage_global_eviction_task(
tenant_manager: Arc<TenantManager>,
background_jobs_barrier: completion::Barrier,
) -> Option<DiskUsageEvictionTask> {
let task_config = &conf.disk_usage_based_eviction;
if !task_config.enabled {
let Some(task_config) = &conf.disk_usage_based_eviction else {
info!("disk usage based eviction task not configured");
return None;
};
@@ -459,9 +458,6 @@ pub(crate) async fn disk_usage_eviction_task_iteration_impl<U: Usage>(
match next {
Ok(Ok(file_size)) => {
METRICS.layers_evicted.inc();
/*BEGIN_HADRON */
METRICS.bytes_evicted.inc_by(file_size);
/*END_HADRON */
usage_assumed.add_available_bytes(file_size);
}
Ok(Err((
@@ -1269,7 +1265,6 @@ mod filesystem_level_usage {
#[cfg(feature = "testing")]
mock_statvfs: None,
eviction_order: pageserver_api::config::EvictionOrder::default(),
enabled: true,
},
total_bytes: 100_000,
avail_bytes: 0,

View File

@@ -1,8 +1,4 @@
use std::{
collections::HashMap,
sync::{Arc, atomic::AtomicBool},
time::Duration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use arc_swap::ArcSwap;
use pageserver_api::config::NodeMetadata;
@@ -359,17 +355,11 @@ impl PerTenantProperties {
}
}
#[derive(Clone)]
pub struct TenantFeatureResolver {
inner: FeatureResolver,
tenant_id: TenantId,
cached_tenant_properties: ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>,
// Add feature flag on the critical path below.
//
// If a feature flag will be used on the critical path, we will update it in the tenant housekeeping loop insetad of
// resolving directly by calling `evaluate_multivariate` or `evaluate_boolean`. Remember to update the flag in the
// housekeeping loop. The user should directly read this atomic flag instead of using the set of evaluate functions.
pub feature_test_remote_size_flag: AtomicBool,
cached_tenant_properties: Arc<ArcSwap<HashMap<String, PostHogFlagFilterPropertyValue>>>,
}
impl TenantFeatureResolver {
@@ -377,8 +367,7 @@ impl TenantFeatureResolver {
Self {
inner,
tenant_id,
cached_tenant_properties: ArcSwap::new(Arc::new(HashMap::new())),
feature_test_remote_size_flag: AtomicBool::new(false),
cached_tenant_properties: Arc::new(ArcSwap::new(Arc::new(HashMap::new()))),
}
}
@@ -407,14 +396,12 @@ impl TenantFeatureResolver {
self.inner.is_feature_flag_boolean(flag_key)
}
/// Refresh the cached properties and flags on the critical path.
pub fn refresh_properties_and_flags(&self, tenant_shard: &TenantShard) {
let mut remote_size_mb = Some(0.0);
pub fn update_cached_tenant_properties(&self, tenant_shard: &TenantShard) {
let mut remote_size_mb = None;
for timeline in tenant_shard.list_timelines() {
let size = timeline.metrics.resident_physical_size_get();
if size == 0 {
remote_size_mb = None;
break;
}
if let Some(ref mut remote_size_mb) = remote_size_mb {
*remote_size_mb += size as f64 / 1024.0 / 1024.0;
@@ -423,12 +410,5 @@ impl TenantFeatureResolver {
self.cached_tenant_properties.store(Arc::new(
PerTenantProperties { remote_size_mb }.into_posthog_properties(),
));
// BEGIN: Update the feature flag on the critical path.
self.feature_test_remote_size_flag.store(
self.evaluate_boolean("test-remote-size-flag").is_ok(),
std::sync::atomic::Ordering::Relaxed,
);
// END: Update the feature flag on the critical path.
}
}

View File

@@ -116,6 +116,26 @@ paths:
schema:
type: string
/v1/tenant/{tenant_id}/timeline:
parameters:
- name: tenant_id
in: path
required: true
schema:
type: string
get:
description: Get timelines for tenant
responses:
"200":
description: TimelineInfo
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/TimelineInfo"
/v1/tenant/{tenant_id}/timeline/{timeline_id}:
parameters:
- name: tenant_id
@@ -598,7 +618,7 @@ paths:
schema:
$ref: "#/components/schemas/SecondaryProgress"
/v1/tenant/{tenant_id}/timeline:
/v1/tenant/{tenant_id}/timeline/:
parameters:
- name: tenant_id
in: path
@@ -665,17 +685,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Error"
get:
description: Get timelines for tenant
responses:
"200":
description: TimelineInfo
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/TimelineInfo"
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/detach_ancestor:
parameters:
@@ -758,7 +767,7 @@ paths:
$ref: "#/components/schemas/ServiceUnavailableError"
/v1/tenant:
/v1/tenant/:
get:
description: Get tenants list
responses:
@@ -838,7 +847,7 @@ paths:
items:
$ref: "#/components/schemas/TenantInfo"
/v1/tenant/{tenant_id}/config:
/v1/tenant/{tenant_id}/config/:
parameters:
- name: tenant_id
in: path

View File

@@ -61,7 +61,6 @@ use crate::context;
use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder};
use crate::deletion_queue::DeletionQueueClient;
use crate::feature_resolver::FeatureResolver;
use crate::metrics::LOCAL_DATA_LOSS_SUSPECTED;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationConf;
@@ -79,8 +78,8 @@ use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerNa
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
use crate::tenant::timeline::{
CompactFlags, CompactOptions, CompactRequest, MarkInvisibleRequest, Timeline, WaitLsnTimeout,
WaitLsnWaiter, import_pgdata,
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
WaitLsnTimeout, WaitLsnWaiter, import_pgdata,
};
use crate::tenant::{
GetTimelineError, LogicalSizeCalculationCause, OffloadedTimeline, PageReconstructError,
@@ -2500,10 +2499,12 @@ async fn timeline_checkpoint_handler(
.compact(&cancel, flags, &ctx)
.await
.map_err(|e|
if e.is_cancel() {
ApiError::ShuttingDown
} else {
ApiError::InternalServerError(e.into_anyhow())
match e {
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e),
CompactionError::AlreadyRunning(_) => ApiError::InternalServerError(anyhow::anyhow!(e)),
}
)?;
}
@@ -3629,17 +3630,6 @@ async fn activate_post_import_handler(
.await
}
// [Hadron] Reset gauge metrics that are used to raised alerts. We need this API as a stop-gap measure to reset alerts
// after we manually rectify situations such as local SSD data loss. We will eventually automate this.
async fn hadron_reset_alert_gauges(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
LOCAL_DATA_LOSS_SUSPECTED.set(0);
json_response(StatusCode::OK, ())
}
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
@@ -3692,23 +3682,6 @@ async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow
Ok(())
}
async fn force_refresh_feature_flag(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
tenant
.feature_resolver
.refresh_properties_and_flags(&tenant);
json_response(StatusCode::OK, ())
}
async fn tenant_evaluate_feature_flag(
request: Request<Body>,
_cancel: CancellationToken,
@@ -3725,7 +3698,7 @@ async fn tenant_evaluate_feature_flag(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
// TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s)
// TODO: the properties we get here might be stale right after it is collected. But such races are rare (updated every 10s)
// and we don't need to worry about it for now.
let properties = tenant.feature_resolver.collect_properties();
if as_type.as_deref() == Some("boolean") {
@@ -4174,9 +4147,6 @@ pub fn make_router(
.get("/v1/tenant/:tenant_shard_id/feature_flag/:flag_key", |r| {
api_handler(r, tenant_evaluate_feature_flag)
})
.post("/v1/tenant/:tenant_shard_id/force_refresh_feature_flag", |r| {
api_handler(r, force_refresh_feature_flag)
})
.put("/v1/feature_flag/:flag_key", |r| {
testing_api_handler("force override feature flag - put", r, force_override_feature_flag_for_testing_put)
})
@@ -4186,8 +4156,5 @@ pub fn make_router(
.post("/v1/feature_flag_spec", |r| {
api_handler(r, update_feature_flag_spec)
})
.post("/hadron-internal/reset_alert_gauges", |r| {
api_handler(r, hadron_reset_alert_gauges)
})
.any(handler_404))
}

View File

@@ -1,4 +1,3 @@
use std::cell::Cell;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::RawFd;
@@ -103,18 +102,7 @@ pub(crate) static STORAGE_TIME_COUNT_PER_TIMELINE: Lazy<IntCounterVec> = Lazy::n
.expect("failed to define a metric")
});
/* BEGIN_HADRON */
pub(crate) static STORAGE_ACTIVE_COUNT_PER_TIMELINE: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_active_storage_operations_count",
"Count of active storage operations with operation, tenant and timeline dimensions",
&["operation", "tenant_id", "shard_id", "timeline_id"],
)
.expect("failed to define a metric")
});
/*END_HADRON */
// Buckets for background operations like compaction, GC, size calculation
// Buckets for background operation duration in seconds, like compaction, GC, size calculation.
const STORAGE_OP_BUCKETS: &[f64] = &[0.010, 0.100, 1.0, 10.0, 100.0, 1000.0];
pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
@@ -2822,31 +2810,6 @@ pub(crate) static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
pub(crate) static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
pub(crate) static LOCAL_DATA_LOSS_SUSPECTED: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_local_data_loss_suspected",
"Non-zero value indicates that pageserver local data loss is suspected (and highly likely)."
)
.expect("failed to define a metric")
});
// Counter keeping track of misrouted PageStream requests. Spelling out PageStream requests here to distinguish
// it from other types of reqeusts (SK wal replication, http requests, etc.). PageStream requests are used by
// Postgres compute to fetch data from pageservers.
// A misrouted PageStream request is registered if the pageserver cannot find the tenant identified in the
// request, or if the pageserver is not the "primary" serving the tenant shard. These error almost always identify
// issues with compute configuration, caused by either the compute node itself being stuck in the wrong
// configuration or Storage Controller reconciliation bugs. Misrouted requests are expected during tenant migration
// and/or during recovery following a pageserver failure, but persistently high rates of misrouted requests
// are indicative of bugs (and unavailability).
pub(crate) static MISROUTED_PAGESTREAM_REQUESTS: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_misrouted_pagestream_requests_total",
"Number of pageserver pagestream requests that were routed to the wrong pageserver"
)
.expect("failed to define a metric")
});
// Metrics collected on WAL redo operations
//
// We collect the time spent in actual WAL redo ('redo'), and time waiting
@@ -3085,19 +3048,13 @@ pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
pub(crate) struct StorageTimeMetricsTimer {
metrics: StorageTimeMetrics,
start: Instant,
stopped: Cell<bool>,
}
impl StorageTimeMetricsTimer {
fn new(metrics: StorageTimeMetrics) -> Self {
/*BEGIN_HADRON */
// record the active operation as the timer starts
metrics.timeline_active_count.inc();
/*END_HADRON */
Self {
metrics,
start: Instant::now(),
stopped: Cell::new(false),
}
}
@@ -3113,10 +3070,6 @@ impl StorageTimeMetricsTimer {
self.metrics.timeline_sum.inc_by(seconds);
self.metrics.timeline_count.inc();
self.metrics.global_histogram.observe(seconds);
/* BEGIN_HADRON*/
self.stopped.set(true);
self.metrics.timeline_active_count.dec();
/*END_HADRON */
duration
}
@@ -3127,16 +3080,6 @@ impl StorageTimeMetricsTimer {
}
}
/*BEGIN_HADRON */
impl Drop for StorageTimeMetricsTimer {
fn drop(&mut self) {
if !self.stopped.get() {
self.metrics.timeline_active_count.dec();
}
}
}
/*END_HADRON */
pub(crate) struct AlwaysRecordingStorageTimeMetricsTimer(Option<StorageTimeMetricsTimer>);
impl Drop for AlwaysRecordingStorageTimeMetricsTimer {
@@ -3162,10 +3105,6 @@ pub(crate) struct StorageTimeMetrics {
timeline_sum: Counter,
/// Number of oeprations, per operation, tenant_id and timeline_id
timeline_count: IntCounter,
/*BEGIN_HADRON */
/// Number of active operations per operation, tenant_id, and timeline_id
timeline_active_count: IntGauge,
/*END_HADRON */
/// Global histogram having only the "operation" label.
global_histogram: Histogram,
}
@@ -3185,11 +3124,6 @@ impl StorageTimeMetrics {
let timeline_count = STORAGE_TIME_COUNT_PER_TIMELINE
.get_metric_with_label_values(&[operation, tenant_id, shard_id, timeline_id])
.unwrap();
/*BEGIN_HADRON */
let timeline_active_count = STORAGE_ACTIVE_COUNT_PER_TIMELINE
.get_metric_with_label_values(&[operation, tenant_id, shard_id, timeline_id])
.unwrap();
/*END_HADRON */
let global_histogram = STORAGE_TIME_GLOBAL
.get_metric_with_label_values(&[operation])
.unwrap();
@@ -3197,7 +3131,6 @@ impl StorageTimeMetrics {
StorageTimeMetrics {
timeline_sum,
timeline_count,
timeline_active_count,
global_histogram,
}
}
@@ -3611,14 +3544,6 @@ impl TimelineMetrics {
shard_id,
timeline_id,
]);
/* BEGIN_HADRON */
let _ = STORAGE_ACTIVE_COUNT_PER_TIMELINE.remove_label_values(&[
op,
tenant_id,
shard_id,
timeline_id,
]);
/*END_HADRON */
}
for op in StorageIoSizeOperation::VARIANTS {
@@ -4411,9 +4336,6 @@ pub(crate) mod disk_usage_based_eviction {
pub(crate) layers_collected: IntCounter,
pub(crate) layers_selected: IntCounter,
pub(crate) layers_evicted: IntCounter,
/*BEGIN_HADRON */
pub(crate) bytes_evicted: IntCounter,
/*END_HADRON */
}
impl Default for Metrics {
@@ -4450,21 +4372,12 @@ pub(crate) mod disk_usage_based_eviction {
)
.unwrap();
/*BEGIN_HADRON */
let bytes_evicted = register_int_counter!(
"pageserver_disk_usage_based_eviction_evicted_bytes_total",
"Amount of bytes successfully evicted"
)
.unwrap();
/*END_HADRON */
Self {
tenant_collection_time,
tenant_layer_count,
layers_collected,
layers_selected,
layers_evicted,
bytes_evicted,
}
}
}
@@ -4584,7 +4497,6 @@ pub fn preinitialize_metrics(
&CIRCUIT_BREAKERS_UNBROKEN,
&PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL,
&WAIT_LSN_IN_PROGRESS_GLOBAL_MICROS,
&MISROUTED_PAGESTREAM_REQUESTS,
]
.into_iter()
.for_each(|c| {
@@ -4622,7 +4534,6 @@ pub fn preinitialize_metrics(
// gauges
WALRECEIVER_ACTIVE_MANAGERS.get();
LOCAL_DATA_LOSS_SUSPECTED.get();
// histograms
[

View File

@@ -70,7 +70,7 @@ use crate::context::{
};
use crate::metrics::{
self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, GetPageBatchBreakReason, LIVE_CONNECTIONS,
MISROUTED_PAGESTREAM_REQUESTS, SmgrOpTimer, TimelineMetrics,
SmgrOpTimer, TimelineMetrics,
};
use crate::pgdatadir_mapping::{LsnRange, Version};
use crate::span::{
@@ -91,8 +91,7 @@ use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
/// is not yet in state [`TenantState::Active`].
///
/// NB: this is a different value than [`crate::http::routes::ACTIVE_TENANT_TIMEOUT`].
/// HADRON: reduced timeout and we will retry in Cache::get().
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(5000);
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Threshold at which to log slow GetPage requests.
const LOG_SLOW_GETPAGE_THRESHOLD: Duration = Duration::from_secs(30);
@@ -1129,7 +1128,6 @@ impl PageServerHandler {
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
MISROUTED_PAGESTREAM_REQUESTS.inc();
return respond_error!(
span,
PageStreamError::Reconnect(
@@ -1195,8 +1193,6 @@ impl PageServerHandler {
})
.await?;
failpoint_support::pausable_failpoint!("pagestream_read_message:before_gc_cutoff_check", cancel).unwrap();
// We're holding the Handle
let effective_lsn = match Self::effective_request_lsn(
&shard,
@@ -3355,8 +3351,6 @@ impl GrpcPageServiceHandler {
/// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
/// GetPageResponse with an appropriate status code to avoid terminating the stream.
///
/// TODO: verify that the requested pages belong to this shard.
///
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
/// split them up in the client or server.

View File

@@ -141,23 +141,6 @@ pub(crate) enum CollectKeySpaceError {
Cancelled,
}
impl CollectKeySpaceError {
pub(crate) fn is_cancel(&self) -> bool {
match self {
CollectKeySpaceError::Decode(_) => false,
CollectKeySpaceError::PageRead(e) => e.is_cancel(),
CollectKeySpaceError::Cancelled => true,
}
}
pub(crate) fn into_anyhow(self) -> anyhow::Error {
match self {
CollectKeySpaceError::Decode(e) => anyhow::Error::new(e),
CollectKeySpaceError::PageRead(e) => anyhow::Error::new(e),
CollectKeySpaceError::Cancelled => anyhow::Error::new(self),
}
}
}
impl From<PageReconstructError> for CollectKeySpaceError {
fn from(err: PageReconstructError) -> Self {
match err {

View File

@@ -34,7 +34,7 @@ use once_cell::sync::Lazy;
pub use pageserver_api::models::TenantState;
use pageserver_api::models::{self, RelSizeMigration};
use pageserver_api::models::{
CompactInfoResponse, TimelineArchivalState, TimelineState, TopTenantShardItem,
CompactInfoResponse, LsnLease, TimelineArchivalState, TimelineState, TopTenantShardItem,
WalRedoManagerStatus,
};
use pageserver_api::shard::{ShardIdentity, ShardStripeSize, TenantShardId};
@@ -142,9 +142,6 @@ mod gc_block;
mod gc_result;
pub(crate) mod throttle;
#[cfg(test)]
pub mod debug;
pub(crate) use timeline::{LogicalSizeCalculationCause, PageReconstructError, Timeline};
pub(crate) use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -183,7 +180,6 @@ pub(super) struct AttachedTenantConf {
impl AttachedTenantConf {
fn new(
conf: &'static PageServerConf,
tenant_conf: pageserver_api::models::TenantConfig,
location: AttachedLocationConfig,
) -> Self {
@@ -195,7 +191,9 @@ impl AttachedTenantConf {
let lsn_lease_deadline = if location.attach_mode == AttachmentMode::Single {
Some(
tokio::time::Instant::now()
+ TenantShard::get_lsn_lease_length_impl(conf, &tenant_conf),
+ tenant_conf
.lsn_lease_length
.unwrap_or(LsnLease::DEFAULT_LENGTH),
)
} else {
// We don't use `lsn_lease_deadline` to delay GC in AttachedMulti and AttachedStale
@@ -210,13 +208,10 @@ impl AttachedTenantConf {
}
}
fn try_from(
conf: &'static PageServerConf,
location_conf: LocationConf,
) -> anyhow::Result<Self> {
fn try_from(location_conf: LocationConf) -> anyhow::Result<Self> {
match &location_conf.mode {
LocationMode::Attached(attach_conf) => {
Ok(Self::new(conf, location_conf.tenant_conf, *attach_conf))
Ok(Self::new(location_conf.tenant_conf, *attach_conf))
}
LocationMode::Secondary(_) => {
anyhow::bail!(
@@ -391,7 +386,7 @@ pub struct TenantShard {
l0_flush_global_state: L0FlushGlobalState,
pub(crate) feature_resolver: Arc<TenantFeatureResolver>,
pub(crate) feature_resolver: TenantFeatureResolver,
}
impl std::fmt::Debug for TenantShard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -3291,9 +3286,7 @@ impl TenantShard {
// Ignore this, we likely raced with unarchival.
OffloadError::NotArchived => Ok(()),
OffloadError::AlreadyInProgress => Ok(()),
OffloadError::Cancelled => Err(CompactionError::new_cancelled()),
// don't break the anyhow chain
OffloadError::Other(err) => Err(CompactionError::Other(err)),
err => Err(err),
})?;
}
@@ -3321,13 +3314,27 @@ impl TenantShard {
/// Trips the compaction circuit breaker if appropriate.
pub(crate) fn maybe_trip_compaction_breaker(&self, err: &CompactionError) {
if err.is_cancel() {
return;
match err {
err if err.is_cancel() => {}
CompactionError::ShuttingDown => (),
// Offload failures don't trip the circuit breaker, since they're cheap to retry and
// shouldn't block compaction.
CompactionError::Offload(_) => {}
CompactionError::CollectKeySpaceError(err) => {
// CollectKeySpaceError::Cancelled and PageRead::Cancelled are handled in `err.is_cancel` branch.
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
CompactionError::Other(err) => {
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
CompactionError::AlreadyRunning(_) => {}
}
self.compaction_circuit_breaker
.lock()
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
/// Cancel scheduled compaction tasks
@@ -3404,7 +3411,7 @@ impl TenantShard {
}
// Update the feature resolver with the latest tenant-spcific data.
self.feature_resolver.refresh_properties_and_flags(self);
self.feature_resolver.update_cached_tenant_properties(self);
}
pub fn timeline_has_no_attached_children(&self, timeline_id: TimelineId) -> bool {
@@ -4171,15 +4178,6 @@ impl TenantShard {
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
// HADRON
pub fn get_image_creation_timeout(&self) -> Option<Duration> {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf.image_layer_force_creation_period.or(self
.conf
.default_tenant_conf
.image_layer_force_creation_period)
}
pub fn get_pitr_interval(&self) -> Duration {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
@@ -4207,16 +4205,10 @@ impl TenantShard {
}
pub fn get_lsn_lease_length(&self) -> Duration {
Self::get_lsn_lease_length_impl(self.conf, &self.tenant_conf.load().tenant_conf)
}
pub fn get_lsn_lease_length_impl(
conf: &'static PageServerConf,
tenant_conf: &pageserver_api::models::TenantConfig,
) -> Duration {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.lsn_lease_length
.unwrap_or(conf.default_tenant_conf.lsn_lease_length)
.unwrap_or(self.conf.default_tenant_conf.lsn_lease_length)
}
pub fn get_timeline_offloading_enabled(&self) -> bool {
@@ -4502,10 +4494,10 @@ impl TenantShard {
gc_block: Default::default(),
l0_flush_global_state,
basebackup_cache,
feature_resolver: Arc::new(TenantFeatureResolver::new(
feature_resolver: TenantFeatureResolver::new(
feature_resolver,
tenant_shard_id.tenant_id,
)),
),
}
}
@@ -6017,24 +6009,22 @@ pub(crate) mod harness {
}
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) async fn do_try_load_with_redo(
pub(crate) async fn do_try_load(
&self,
walredo_mgr: Arc<WalRedoManager>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<TenantShard>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
let (basebackup_cache, _) = BasebackupCache::new(Utf8PathBuf::new(), None);
let tenant = Arc::new(TenantShard::new(
TenantState::Attaching,
self.conf,
AttachedTenantConf::try_from(
self.conf,
LocationConf::attached_single(
self.tenant_conf.clone(),
self.generation,
ShardParameters::default(),
),
)
AttachedTenantConf::try_from(LocationConf::attached_single(
self.tenant_conf.clone(),
self.generation,
ShardParameters::default(),
))
.unwrap(),
self.shard_identity,
Some(walredo_mgr),
@@ -6059,14 +6049,6 @@ pub(crate) mod harness {
Ok(tenant)
}
pub(crate) async fn do_try_load(
&self,
ctx: &RequestContext,
) -> anyhow::Result<Arc<TenantShard>> {
let walredo_mgr = Arc::new(WalRedoManager::from(TestRedoManager));
self.do_try_load_with_redo(walredo_mgr, ctx).await
}
pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
self.conf.timeline_path(&self.tenant_shard_id, timeline_id)
}
@@ -6143,7 +6125,7 @@ mod tests {
use pageserver_api::keyspace::KeySpace;
#[cfg(feature = "testing")]
use pageserver_api::keyspace::KeySpaceRandomAccum;
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings, LsnLease};
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
use pageserver_compaction::helpers::overlaps_with;
#[cfg(feature = "testing")]
use rand::SeedableRng;
@@ -6693,13 +6675,17 @@ mod tests {
tline.freeze_and_flush().await.map_err(|e| e.into())
}
#[tokio::test]
#[tokio::test(start_paused = true)]
async fn test_prohibit_branch_creation_on_garbage_collected_data() -> anyhow::Result<()> {
let (tenant, ctx) =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")
.await?
.load()
.await;
// Advance to the lsn lease deadline so that GC is not blocked by
// initial transition into AttachedSingle.
tokio::time::advance(tenant.get_lsn_lease_length()).await;
tokio::time::resume();
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
@@ -9398,21 +9384,17 @@ mod tests {
Ok(())
}
#[tokio::test]
#[tokio::test(start_paused = true)]
async fn test_lsn_lease() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_lsn_lease")
.await
.unwrap()
.load()
.await;
// set a non-zero lease length to test the feature
tenant
.update_tenant_config(|mut conf| {
conf.lsn_lease_length = Some(LsnLease::DEFAULT_LENGTH);
Ok(conf)
})
.unwrap();
// Advance to the lsn lease deadline so that GC is not blocked by
// initial transition into AttachedSingle.
tokio::time::advance(tenant.get_lsn_lease_length()).await;
tokio::time::resume();
let key = Key::from_hex("010000000033333333444444445500000000").unwrap();
let end_lsn = Lsn(0x100);

View File

@@ -1,366 +0,0 @@
use std::{ops::Range, str::FromStr, sync::Arc};
use crate::walredo::RedoAttemptType;
use base64::{Engine as _, engine::general_purpose::STANDARD};
use bytes::{Bytes, BytesMut};
use camino::Utf8PathBuf;
use clap::Parser;
use itertools::Itertools;
use pageserver_api::{
key::Key,
keyspace::KeySpace,
shard::{ShardIdentity, ShardStripeSize},
};
use postgres_ffi::PgMajorVersion;
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn};
use tracing::Instrument;
use utils::{
generation::Generation,
id::{TenantId, TimelineId},
lsn::Lsn,
shard::{ShardCount, ShardIndex, ShardNumber},
};
use wal_decoder::models::record::NeonWalRecord;
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
tenant::storage_layer::ValueReconstructState,
walredo::harness::RedoHarness,
};
use super::{
WalRedoManager, WalredoManagerId,
harness::TenantHarness,
remote_timeline_client::LayerFileMetadata,
storage_layer::{AsLayerDesc, IoConcurrency, Layer, LayerName, ValuesReconstructState},
};
fn process_page_image(next_record_lsn: Lsn, is_fpw: bool, img_bytes: Bytes) -> Bytes {
// To match the logic in libs/wal_decoder/src/serialized_batch.rs
let mut new_image: BytesMut = img_bytes.into();
if is_fpw && !page_is_new(&new_image) {
page_set_lsn(&mut new_image, next_record_lsn);
}
assert_eq!(new_image.len(), BLCKSZ as usize);
new_image.freeze()
}
async fn redo_wals(input: &str, key: Key) -> anyhow::Result<()> {
let tenant_id = TenantId::generate();
let timeline_id = TimelineId::generate();
let redo_harness = RedoHarness::new()?;
let span = redo_harness.span();
let tenant_conf = pageserver_api::models::TenantConfig {
..Default::default()
};
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let tenant = TenantHarness::create_custom(
"search_key",
tenant_conf,
tenant_id,
ShardIdentity::unsharded(),
Generation::new(1),
)
.await?
.do_try_load_with_redo(
Arc::new(WalRedoManager::Prod(
WalredoManagerId::next(),
redo_harness.manager,
)),
&ctx,
)
.await
.unwrap();
let timeline = tenant
.create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
.await?;
let contents = tokio::fs::read_to_string(input)
.await
.map_err(|e| anyhow::Error::msg(format!("Failed to read input file {input}: {e}")))
.unwrap();
let lines = contents.lines();
let mut last_wal_lsn: Option<Lsn> = None;
let state = {
let mut state = ValueReconstructState::default();
let mut is_fpw = false;
let mut is_first_line = true;
for line in lines {
if is_first_line {
is_first_line = false;
if line.trim() == "FPW" {
is_fpw = true;
}
continue; // Skip the first line.
}
// Each input line is in the "<next_record_lsn>,<base64>" format.
let (lsn_str, payload_b64) = line
.split_once(',')
.expect("Invalid input format: expected '<lsn>,<base64>'");
// Parse the LSN and decode the payload.
let lsn = Lsn::from_str(lsn_str.trim()).expect("Invalid LSN format");
let bytes = Bytes::from(
STANDARD
.decode(payload_b64.trim())
.expect("Invalid base64 payload"),
);
// The first line is considered the base image, the rest are WAL records.
if state.img.is_none() {
state.img = Some((lsn, process_page_image(lsn, is_fpw, bytes)));
} else {
let wal_record = NeonWalRecord::Postgres {
will_init: false,
rec: bytes,
};
state.records.push((lsn, wal_record));
last_wal_lsn.replace(lsn);
}
}
state
};
assert!(state.img.is_some(), "No base image found");
assert!(!state.records.is_empty(), "No WAL records found");
let result = timeline
.reconstruct_value(key, last_wal_lsn.unwrap(), state, RedoAttemptType::ReadPage)
.instrument(span.clone())
.await?;
eprintln!("final image: {:?}", STANDARD.encode(result));
Ok(())
}
async fn search_key(
tenant_id: TenantId,
timeline_id: TimelineId,
dir: String,
key: Key,
lsn: Lsn,
) -> anyhow::Result<()> {
let shard_index = ShardIndex {
shard_number: ShardNumber(0),
shard_count: ShardCount(4),
};
let redo_harness = RedoHarness::new()?;
let span = redo_harness.span();
let tenant_conf = pageserver_api::models::TenantConfig {
..Default::default()
};
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
let tenant = TenantHarness::create_custom(
"search_key",
tenant_conf,
tenant_id,
ShardIdentity::new(
shard_index.shard_number,
shard_index.shard_count,
ShardStripeSize(32768),
)
.unwrap(),
Generation::new(1),
)
.await?
.do_try_load_with_redo(
Arc::new(WalRedoManager::Prod(
WalredoManagerId::next(),
redo_harness.manager,
)),
&ctx,
)
.await
.unwrap();
let timeline = tenant
.create_test_timeline(timeline_id, Lsn(0x10), PgMajorVersion::PG16, &ctx)
.await?;
let mut delta_layers: Vec<Layer> = Vec::new();
let mut img_layer: Option<Layer> = Option::None;
let mut dir = tokio::fs::read_dir(dir).await?;
loop {
let entry = dir.next_entry().await?;
if entry.is_none() || !entry.as_ref().unwrap().file_type().await?.is_file() {
break;
}
let path = Utf8PathBuf::from_path_buf(entry.unwrap().path()).unwrap();
let layer_name = match LayerName::from_str(path.file_name().unwrap()) {
Ok(name) => name,
Err(_) => {
eprintln!("Skipped invalid layer: {path}");
continue;
}
};
let layer = Layer::for_resident(
tenant.conf,
&timeline,
path.clone(),
layer_name,
LayerFileMetadata::new(
tokio::fs::metadata(path.clone()).await?.len(),
Generation::new(1),
shard_index,
),
);
if layer.layer_desc().is_delta() {
delta_layers.push(layer.into());
} else if img_layer.is_none() {
img_layer = Some(layer.into());
} else {
anyhow::bail!("Found multiple image layers");
}
}
// sort delta layers based on the descending order of LSN
delta_layers.sort_by(|a, b| {
b.layer_desc()
.get_lsn_range()
.start
.cmp(&a.layer_desc().get_lsn_range().start)
});
let mut state = ValuesReconstructState::new(IoConcurrency::Sequential);
let key_space = KeySpace::single(Range {
start: key,
end: key.next(),
});
let lsn_range = Range {
start: img_layer
.as_ref()
.map_or(Lsn(0x00), |img| img.layer_desc().image_layer_lsn()),
end: lsn,
};
for delta_layer in delta_layers.iter() {
delta_layer
.get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
.await?;
}
img_layer
.as_ref()
.unwrap()
.get_values_reconstruct_data(key_space.clone(), lsn_range.clone(), &mut state, &ctx)
.await?;
for (_key, result) in std::mem::take(&mut state.keys) {
let state = result.collect_pending_ios().await?;
if state.img.is_some() {
eprintln!(
"image: {}: {:x?}",
state.img.as_ref().unwrap().0,
STANDARD.encode(state.img.as_ref().unwrap().1.clone())
);
}
for delta in state.records.iter() {
match &delta.1 {
NeonWalRecord::Postgres { will_init, rec } => {
eprintln!(
"delta: {}: will_init: {}, {:x?}",
delta.0,
will_init,
STANDARD.encode(rec)
);
}
_ => {
eprintln!("delta: {}: {:x?}", delta.0, delta.1);
}
}
}
let result = timeline
.reconstruct_value(key, lsn_range.end, state, RedoAttemptType::ReadPage)
.instrument(span.clone())
.await?;
eprintln!("final image: {lsn} : {result:?}");
}
Ok(())
}
/// Redo all WALs against the base image in the input file. Return the base64 encoded final image.
/// Each line in the input file must be in the form "<lsn>,<base64>" where:
/// * `<lsn>` is a PostgreSQL LSN in hexadecimal notation, e.g. `0/16ABCDE`.
/// * `<base64>` is the base64encoded page image (first line) or WAL record (subsequent lines).
///
/// The first line provides the base image of a page. The LSN is the LSN of "next record" following
/// the record containing the FPI. For example, if the FPI was extracted from a WAL record occuping
/// [0/1, 0/200) in the WAL stream, the LSN appearing along side the page image here should be 0/200.
///
/// The subsequent lines are WAL records, ordered from the oldest to the newest. The LSN is the
/// record LSN of the WAL record, not the "next record" LSN. For example, if the WAL record here
/// occupies [0/1, 0/200) in the WAL stream, the LSN appearing along side the WAL record here should
/// be 0/1.
#[derive(Parser)]
struct RedoWalsCmd {
#[clap(long)]
input: String,
#[clap(long)]
key: String,
}
#[tokio::test]
async fn test_redo_wals() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();
let pos = args
.iter()
.position(|arg| arg == "--")
.unwrap_or(args.len());
let slice = &args[pos..args.len()];
let cmd = match RedoWalsCmd::try_parse_from(slice) {
Ok(cmd) => cmd,
Err(err) => {
eprintln!("{err}");
return Ok(());
}
};
let key = Key::from_hex(&cmd.key).unwrap();
redo_wals(&cmd.input, key).await?;
Ok(())
}
/// Search for a page at the given LSN in all layers of the data_dir.
/// Return the base64-encoded image and all WAL records, as well as the final reconstructed image.
#[derive(Parser)]
struct SearchKeyCmd {
#[clap(long)]
tenant_id: String,
#[clap(long)]
timeline_id: String,
#[clap(long)]
data_dir: String,
#[clap(long)]
key: String,
#[clap(long)]
lsn: String,
}
#[tokio::test]
async fn test_search_key() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();
let pos = args
.iter()
.position(|arg| arg == "--")
.unwrap_or(args.len());
let slice = &args[pos..args.len()];
let cmd = match SearchKeyCmd::try_parse_from(slice) {
Ok(cmd) => cmd,
Err(err) => {
eprintln!("{err}");
return Ok(());
}
};
let tenant_id = TenantId::from_str(&cmd.tenant_id).unwrap();
let timeline_id = TimelineId::from_str(&cmd.timeline_id).unwrap();
let key = Key::from_hex(&cmd.key).unwrap();
let lsn = Lsn::from_str(&cmd.lsn).unwrap();
search_key(tenant_id, timeline_id, cmd.data_dir, key, lsn).await?;
Ok(())
}

View File

@@ -43,7 +43,7 @@ use crate::controller_upcall_client::{
};
use crate::deletion_queue::DeletionQueueClient;
use crate::http::routes::ACTIVE_TENANT_TIMEOUT;
use crate::metrics::{LOCAL_DATA_LOSS_SUSPECTED, TENANT, TENANT_MANAGER as METRICS};
use crate::metrics::{TENANT, TENANT_MANAGER as METRICS};
use crate::task_mgr::{BACKGROUND_RUNTIME, TaskKind};
use crate::tenant::config::{
AttachedLocationConfig, AttachmentMode, LocationConf, LocationMode, SecondaryLocationConfig,
@@ -538,21 +538,6 @@ pub async fn init_tenant_mgr(
// Determine which tenants are to be secondary or attached, and in which generation
let tenant_modes = init_load_generations(conf, &tenant_configs, resources, cancel).await?;
// Hadron local SSD check: Raise an alert if our local filesystem does not contain any tenants but the re-attach request returned tenants.
// This can happen if the PS suffered a Kubernetes node failure resulting in loss of all local data, but recovered quickly on another node
// so the Storage Controller has not had the time to move tenants out.
let data_loss_suspected = if let Some(tenant_modes) = &tenant_modes {
tenant_configs.is_empty() && !tenant_modes.is_empty()
} else {
false
};
if data_loss_suspected {
tracing::error!(
"Local data loss suspected: no tenants found on local filesystem, but re-attach request returned tenants"
);
}
LOCAL_DATA_LOSS_SUSPECTED.set(if data_loss_suspected { 1 } else { 0 });
tracing::info!(
"Attaching {} tenants at startup, warming up {} at a time",
tenant_configs.len(),
@@ -679,7 +664,7 @@ pub async fn init_tenant_mgr(
tenant_shard_id,
&tenant_dir_path,
resources.clone(),
AttachedTenantConf::new(conf, location_conf.tenant_conf, attached_conf),
AttachedTenantConf::new(location_conf.tenant_conf, attached_conf),
shard_identity,
Some(init_order.clone()),
SpawnMode::Lazy,
@@ -857,11 +842,8 @@ impl TenantManager {
// take our fast path and just provide the updated configuration
// to the tenant.
tenant.set_new_location_config(
AttachedTenantConf::try_from(
self.conf,
new_location_config.clone(),
)
.map_err(UpsertLocationError::BadRequest)?,
AttachedTenantConf::try_from(new_location_config.clone())
.map_err(UpsertLocationError::BadRequest)?,
);
Some(FastPathModified::Attached(tenant.clone()))
@@ -1064,7 +1046,7 @@ impl TenantManager {
// Testing hack: if we are configured with no control plane, then drop the generation
// from upserts. This enables creating generation-less tenants even though neon_local
// always uses generations when calling the location conf API.
let attached_conf = AttachedTenantConf::try_from(self.conf, new_location_config)
let attached_conf = AttachedTenantConf::try_from(new_location_config)
.map_err(UpsertLocationError::BadRequest)?;
let tenant = tenant_spawn(
@@ -1268,7 +1250,7 @@ impl TenantManager {
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(self.conf, config)?,
AttachedTenantConf::try_from(config)?,
shard_identity,
None,
SpawnMode::Eager,
@@ -2149,7 +2131,7 @@ impl TenantManager {
tenant_shard_id,
&tenant_path,
self.resources.clone(),
AttachedTenantConf::try_from(self.conf, config).map_err(Error::DetachReparent)?,
AttachedTenantConf::try_from(config).map_err(Error::DetachReparent)?,
shard_identity,
None,
SpawnMode::Eager,

View File

@@ -141,29 +141,11 @@ pub(super) async fn upload_timeline_layer<'a>(
let fs_size = usize::try_from(fs_size)
.with_context(|| format!("convert {local_path:?} size {fs_size} usize"))?;
/* BEGIN_HADRON */
let mut metadata = None;
match storage {
// Pass the file path as a storage metadata to minimize changes to neon.
// Otherwise, we need to change the upload interface.
GenericRemoteStorage::AzureBlob(s) => {
let block_size_mb = s.put_block_size_mb.unwrap_or(0);
if block_size_mb > 0 && fs_size > block_size_mb * 1024 * 1024 {
metadata = Some(remote_storage::StorageMetadata::from([(
"databricks_azure_put_block",
local_path.as_str(),
)]));
}
}
GenericRemoteStorage::LocalFs(_) => {}
GenericRemoteStorage::AwsS3(_) => {}
GenericRemoteStorage::Unreliable(_) => {}
};
/* END_HADRON */
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
storage
.upload(reader, fs_size, remote_path, metadata, cancel)
.upload(reader, fs_size, remote_path, None, cancel)
.await
.with_context(|| format!("upload layer from local path '{local_path}'"))
}

View File

@@ -17,35 +17,23 @@ use tracing::*;
use utils::backoff::exponential_backoff_duration;
use utils::completion::Barrier;
use utils::pausable_failpoint;
use utils::sync::gate::GateError;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics::{self, BackgroundLoopSemaphoreMetricsRecorder, TENANT_TASK_EVENTS};
use crate::task_mgr::{self, BACKGROUND_RUNTIME, TOKIO_WORKER_THREADS, TaskKind};
use crate::tenant::blob_io::WriteBlobError;
use crate::tenant::throttle::Stats;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::{TenantShard, TenantState};
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
/// Semaphore limiting concurrent background tasks (across all tenants).
///
/// We use 3/4 Tokio threads, to avoid blocking all threads in case we do any CPU-heavy work.
static CONCURRENT_BACKGROUND_TASKS: Lazy<Semaphore> = Lazy::new(|| {
let total_threads = TOKIO_WORKER_THREADS.get();
/*BEGIN_HADRON*/
// ideally we should run at least one compaction task per tenant in order to (1) maximize
// compaction throughput (2) avoid head-of-line blocking of large compactions. However doing
// that may create too many compaction tasks with lots of memory overheads. So we limit the
// number of compaction tasks based on the available CPU core count.
// Need to revisit.
// let tasks_per_thread = std::env::var("BG_TASKS_PER_THREAD")
// .ok()
// .and_then(|s| s.parse().ok())
// .unwrap_or(4);
// let permits = usize::max(1, total_threads * tasks_per_thread);
// // assert!(permits < total_threads, "need threads for other work");
/*END_HADRON*/
let permits = max(1, (total_threads * 3).checked_div(4).unwrap_or(0));
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(permits < total_threads, "need threads for other work");
@@ -307,12 +295,48 @@ pub(crate) fn log_compaction_error(
task_cancelled: bool,
degrade_to_warning: bool,
) {
let is_cancel = err.is_cancel();
use CompactionError::*;
let level = if is_cancel || task_cancelled {
Level::INFO
} else {
Level::ERROR
use crate::tenant::PageReconstructError;
use crate::tenant::upload_queue::NotInitialized;
let level = match err {
e if e.is_cancel() => return,
ShuttingDown => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {
let root_cause = err.root_cause();
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let buffered_writer_flush_task_canelled = root_cause
.downcast_ref::<FlushTaskError>()
.is_some_and(|e| e.is_cancel());
let write_blob_cancelled = root_cause
.downcast_ref::<WriteBlobError>()
.is_some_and(|e| e.is_cancel());
let gate_closed = root_cause
.downcast_ref::<GateError>()
.is_some_and(|e| e.is_cancel());
let is_stopping = upload_queue
|| timeline
|| buffered_writer_flush_task_canelled
|| write_blob_cancelled
|| gate_closed;
if is_stopping {
Level::INFO
} else {
Level::ERROR
}
}
};
if let Some((error_count, sleep_duration)) = retry_info {

View File

@@ -40,6 +40,7 @@ use layer_manager::{
Shutdown,
};
use offload::OffloadError;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
use pageserver_api::key::{
@@ -118,6 +119,7 @@ use crate::pgdatadir_mapping::{
MAX_AUX_FILE_V2_DELTAS, MetricsUpdate,
};
use crate::task_mgr::TaskKind;
use crate::tenant::config::AttachmentMode;
use crate::tenant::gc_result::GcResult;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::metadata::TimelineMetadata;
@@ -200,7 +202,7 @@ pub struct TimelineResources {
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
pub basebackup_cache: Arc<BasebackupCache>,
pub feature_resolver: Arc<TenantFeatureResolver>,
pub feature_resolver: TenantFeatureResolver,
}
pub struct Timeline {
@@ -351,13 +353,6 @@ pub struct Timeline {
last_image_layer_creation_check_at: AtomicLsn,
last_image_layer_creation_check_instant: std::sync::Mutex<Option<Instant>>,
// HADRON
/// If a key range has writes with LSN > force_image_creation_lsn, then we should force image layer creation
/// on this key range.
force_image_creation_lsn: AtomicLsn,
/// The last time instant when force_image_creation_lsn is computed.
force_image_creation_lsn_computed_at: std::sync::Mutex<Option<Instant>>,
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
@@ -455,7 +450,7 @@ pub struct Timeline {
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_cache: Arc<BasebackupCache>,
feature_resolver: Arc<TenantFeatureResolver>,
feature_resolver: TenantFeatureResolver,
}
pub(crate) enum PreviousHeatmap {
@@ -592,28 +587,6 @@ pub(crate) enum PageReconstructError {
MissingKey(Box<MissingKeyError>),
}
impl PageReconstructError {
pub(crate) fn is_cancel(&self) -> bool {
match self {
PageReconstructError::Other(_) => false,
PageReconstructError::AncestorLsnTimeout(e) => e.is_cancel(),
PageReconstructError::Cancelled => true,
PageReconstructError::WalRedo(_) => false,
PageReconstructError::MissingKey(_) => false,
}
}
#[allow(dead_code)] // we use the is_cancel + into_anyhow pattern in quite a few places, this one will follow soon enough
pub(crate) fn into_anyhow(self) -> anyhow::Error {
match self {
PageReconstructError::Other(e) => e,
PageReconstructError::AncestorLsnTimeout(e) => e.into_anyhow(),
PageReconstructError::Cancelled => anyhow::Error::new(self),
PageReconstructError::WalRedo(e) => e,
PageReconstructError::MissingKey(_) => anyhow::Error::new(self),
}
}
}
impl From<anyhow::Error> for PageReconstructError {
fn from(value: anyhow::Error) -> Self {
// with walingest.rs many PageReconstructError are wrapped in as anyhow::Error
@@ -767,6 +740,17 @@ impl std::fmt::Display for MissingKeyError {
}
}
impl PageReconstructError {
/// Returns true if this error indicates a tenant/timeline shutdown alike situation
pub(crate) fn is_stopping(&self) -> bool {
use PageReconstructError::*;
match self {
Cancelled => true,
Other(_) | AncestorLsnTimeout(_) | WalRedo(_) | MissingKey(_) => false,
}
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum CreateImageLayersError {
#[error("timeline shutting down")]
@@ -969,35 +953,13 @@ pub enum WaitLsnError {
Timeout(String),
}
impl WaitLsnError {
pub(crate) fn is_cancel(&self) -> bool {
match self {
WaitLsnError::Shutdown => true,
WaitLsnError::BadState(timeline_state) => match timeline_state {
TimelineState::Loading => false,
TimelineState::Active => false,
TimelineState::Stopping => true,
TimelineState::Broken { .. } => false,
},
WaitLsnError::Timeout(_) => false,
}
}
pub(crate) fn into_anyhow(self) -> anyhow::Error {
match self {
WaitLsnError::Shutdown => anyhow::Error::new(self),
WaitLsnError::BadState(_) => anyhow::Error::new(self),
WaitLsnError::Timeout(_) => anyhow::Error::new(self),
}
}
}
impl From<WaitLsnError> for tonic::Status {
fn from(err: WaitLsnError) -> Self {
use tonic::Code;
let code = if err.is_cancel() {
Code::Unavailable
} else {
Code::Internal
let code = match &err {
WaitLsnError::Timeout(_) => Code::Internal,
WaitLsnError::BadState(_) => Code::Internal,
WaitLsnError::Shutdown => Code::Unavailable,
};
tonic::Status::new(code, err.to_string())
}
@@ -1009,7 +971,7 @@ impl From<WaitLsnError> for tonic::Status {
impl From<CreateImageLayersError> for CompactionError {
fn from(e: CreateImageLayersError) -> Self {
match e {
CreateImageLayersError::Cancelled => CompactionError::new_cancelled(),
CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
CreateImageLayersError::Other(e) => {
CompactionError::Other(e.context("create image layers"))
}
@@ -1124,26 +1086,6 @@ enum ImageLayerCreationOutcome {
Skip,
}
enum RepartitionError {
Other(anyhow::Error),
CollectKeyspace(CollectKeySpaceError),
}
impl RepartitionError {
fn is_cancel(&self) -> bool {
match self {
RepartitionError::Other(_) => false,
RepartitionError::CollectKeyspace(e) => e.is_cancel(),
}
}
fn into_anyhow(self) -> anyhow::Error {
match self {
RepartitionError::Other(e) => e,
RepartitionError::CollectKeyspace(e) => e.into_anyhow(),
}
}
}
/// Public interface functions
impl Timeline {
/// Get the LSN where this branch was created
@@ -1830,31 +1772,30 @@ impl Timeline {
existing_lease.clone()
}
Entry::Vacant(vacant) => {
// Never allow a lease to be requested for an LSN below the applied GC cutoff. The data could have been deleted.
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!(
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
lsn,
*latest_gc_cutoff_lsn
);
}
// We allow create lease for those below the planned gc cutoff if we are still within the grace period
// of GC blocking.
// Reject already GC-ed LSN if we are in AttachedSingle and
// not blocked by the lsn lease deadline.
let validate = {
let conf = self.tenant_conf.load();
!conf.is_gc_blocked_by_lsn_lease_deadline()
conf.location.attach_mode == AttachmentMode::Single
&& !conf.is_gc_blocked_by_lsn_lease_deadline()
};
// Do not allow initial lease creation to be below the planned gc cutoff. The client (compute_ctl) determines
// whether it is a initial lease creation or a renewal.
if (init || validate) && lsn < planned_cutoff {
bail!(
"tried to request an lsn lease for an lsn below the planned gc cutoff. requested at {} planned gc cutoff {}",
lsn,
planned_cutoff
);
if init || validate {
let latest_gc_cutoff_lsn = self.get_applied_gc_cutoff_lsn();
if lsn < *latest_gc_cutoff_lsn {
bail!(
"tried to request an lsn lease for an lsn below the latest gc cutoff. requested at {} gc cutoff {}",
lsn,
*latest_gc_cutoff_lsn
);
}
if lsn < planned_cutoff {
bail!(
"tried to request an lsn lease for an lsn below the planned gc cutoff. requested at {} planned gc cutoff {}",
lsn,
planned_cutoff
);
}
}
let dt: DateTime<Utc> = valid_until.into();
@@ -2124,7 +2065,22 @@ impl Timeline {
match &result {
Ok(_) => self.compaction_failed.store(false, AtomicOrdering::Relaxed),
Err(e) if e.is_cancel() => {}
Err(_) => self.compaction_failed.store(true, AtomicOrdering::Relaxed),
Err(CompactionError::ShuttingDown) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::AlreadyRunning(_)) => {
// Covered by the `Err(e) if e.is_cancel()` branch.
}
Err(CompactionError::Other(_)) => {
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
}
Err(CompactionError::CollectKeySpaceError(_)) => {
// Cancelled errors are covered by the `Err(e) if e.is_cancel()` branch.
self.compaction_failed.store(true, AtomicOrdering::Relaxed)
}
// Don't change the current value on offload failure or shutdown. We don't want to
// abruptly stall nor resume L0 flushes in these cases.
Err(CompactionError::Offload(_)) => {}
};
result
@@ -2853,18 +2809,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.image_creation_threshold)
}
// HADRON
fn get_image_creation_timeout(&self) -> Option<Duration> {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.image_layer_force_creation_period
.or(self
.conf
.default_tenant_conf
.image_layer_force_creation_period)
}
fn get_compaction_algorithm_settings(&self) -> CompactionAlgorithmSettings {
let tenant_conf = &self.tenant_conf.load();
tenant_conf
@@ -3134,9 +3078,7 @@ impl Timeline {
repartition_threshold: 0,
last_image_layer_creation_check_at: AtomicLsn::new(0),
last_image_layer_creation_check_instant: Mutex::new(None),
// HADRON
force_image_creation_lsn: AtomicLsn::new(0),
force_image_creation_lsn_computed_at: std::sync::Mutex::new(None),
last_received_wal: Mutex::new(None),
rel_size_latest_cache: RwLock::new(HashMap::new()),
rel_size_snapshot_cache: Mutex::new(LruCache::new(relsize_snapshot_cache_capacity)),
@@ -3187,7 +3129,7 @@ impl Timeline {
basebackup_cache: resources.basebackup_cache,
feature_resolver: resources.feature_resolver.clone(),
feature_resolver: resources.feature_resolver,
};
result.repartition_threshold =
@@ -5028,7 +4970,7 @@ impl Timeline {
ctx,
)
.await
.map_err(|e| FlushLayerError::from_anyhow(self, e.into_anyhow()))?;
.map_err(|e| FlushLayerError::from_anyhow(self, e.into()))?;
if self.cancel.is_cancelled() {
return Err(FlushLayerError::Cancelled);
@@ -5057,7 +4999,6 @@ impl Timeline {
.create_image_layers(
&partitions,
self.initdb_lsn,
None,
ImageLayerCreationMode::Initial,
ctx,
LastImageLayerCreationStatus::Initial,
@@ -5279,18 +5220,18 @@ impl Timeline {
partition_size: u64,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), RepartitionError> {
) -> Result<((KeyPartitioning, SparseKeyPartitioning), Lsn), CompactionError> {
let Ok(mut guard) = self.partitioning.try_write_guard() else {
// NB: there are two callers, one is the compaction task, of which there is only one per struct Tenant and hence Timeline.
// The other is the initdb optimization in flush_frozen_layer, used by `boostrap_timeline`, which runs before `.activate()`
// and hence before the compaction task starts.
return Err(RepartitionError::Other(anyhow!(
return Err(CompactionError::Other(anyhow!(
"repartition() called concurrently"
)));
};
let ((dense_partition, sparse_partition), partition_lsn) = &*guard.read();
if lsn < *partition_lsn {
return Err(RepartitionError::Other(anyhow!(
return Err(CompactionError::Other(anyhow!(
"repartition() called with LSN going backwards, this should not happen"
)));
}
@@ -5311,10 +5252,7 @@ impl Timeline {
));
}
let (dense_ks, sparse_ks) = self
.collect_keyspace(lsn, ctx)
.await
.map_err(RepartitionError::CollectKeyspace)?;
let (dense_ks, sparse_ks) = self.collect_keyspace(lsn, ctx).await?;
let dense_partitioning = dense_ks.partition(
&self.shard_identity,
partition_size,
@@ -5329,19 +5267,14 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition? True if we want to generate.
async fn time_for_new_image_layer(
&self,
partition: &KeySpace,
lsn: Lsn,
force_image_creation_lsn: Option<Lsn>,
) -> bool {
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let Ok(layers) = guard.layer_map() else {
return false;
};
let mut min_image_lsn: Lsn = Lsn::MAX;
let mut max_deltas = 0;
for part_range in &partition.ranges {
let image_coverage = layers.image_coverage(part_range, lsn);
@@ -5376,22 +5309,9 @@ impl Timeline {
return true;
}
}
min_image_lsn = min(min_image_lsn, img_lsn);
}
}
// HADRON
if min_image_lsn < force_image_creation_lsn.unwrap_or(Lsn(0)) && max_deltas > 0 {
info!(
"forcing image creation for partitioned range {}-{}. Min image LSN: {}, force image creation LSN: {}",
partition.ranges[0].start,
partition.ranges[0].end,
min_image_lsn,
force_image_creation_lsn.unwrap()
);
return true;
}
debug!(
max_deltas,
"none of the partitioned ranges had >= {threshold} deltas"
@@ -5617,7 +5537,7 @@ impl Timeline {
/// suffer from the lack of image layers
/// 2. For small tenants (that can mostly fit in RAM), we use a much longer interval
fn should_check_if_image_layers_required(self: &Arc<Timeline>, lsn: Lsn) -> bool {
let large_timeline_threshold = self.conf.image_layer_generation_large_timeline_threshold;
const LARGE_TENANT_THRESHOLD: u64 = 2 * 1024 * 1024 * 1024;
let last_checks_at = self.last_image_layer_creation_check_at.load();
let distance = lsn
@@ -5631,12 +5551,12 @@ impl Timeline {
let mut time_based_decision = false;
let mut last_check_instant = self.last_image_layer_creation_check_instant.lock().unwrap();
if let CurrentLogicalSize::Exact(logical_size) = self.current_logical_size.current_size() {
let check_required_after =
if Some(Into::<u64>::into(&logical_size)) >= large_timeline_threshold {
self.get_checkpoint_timeout()
} else {
Duration::from_secs(3600 * 48)
};
let check_required_after = if Into::<u64>::into(&logical_size) >= LARGE_TENANT_THRESHOLD
{
self.get_checkpoint_timeout()
} else {
Duration::from_secs(3600 * 48)
};
time_based_decision = match *last_check_instant {
Some(last_check) => {
@@ -5664,12 +5584,10 @@ impl Timeline {
/// true = we have generate all image layers, false = we preempt the process for L0 compaction.
///
/// `partition_mode` is only for logging purpose and is not used anywhere in this function.
#[allow(clippy::too_many_arguments)]
async fn create_image_layers(
self: &Arc<Timeline>,
partitioning: &KeyPartitioning,
lsn: Lsn,
force_image_creation_lsn: Option<Lsn>,
mode: ImageLayerCreationMode,
ctx: &RequestContext,
last_status: LastImageLayerCreationStatus,
@@ -5773,11 +5691,7 @@ impl Timeline {
} else if let ImageLayerCreationMode::Try = mode {
// check_for_image_layers = false -> skip
// check_for_image_layers = true -> check time_for_new_image_layer -> skip/generate
if !check_for_image_layers
|| !self
.time_for_new_image_layer(partition, lsn, force_image_creation_lsn)
.await
{
if !check_for_image_layers || !self.time_for_new_image_layer(partition, lsn).await {
start = img_range.end;
continue;
}
@@ -6098,88 +6012,57 @@ impl Drop for Timeline {
}
}
pub(crate) use compaction_error::CompactionError;
/// In a private mod to enforce that [`CompactionError::is_cancel`] is used
/// instead of `match`ing on [`CompactionError::ShuttingDown`].
mod compaction_error {
use utils::sync::gate::GateError;
/// Top-level failure to compact.
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompactionError {
#[error("The timeline or pageserver is shutting down")]
ShuttingDown,
/// Compaction tried to offload a timeline and failed
#[error("Failed to offload timeline: {0}")]
Offload(OffloadError),
/// Compaction cannot be done right now; page reconstruction and so on.
#[error("Failed to collect keyspace: {0}")]
CollectKeySpaceError(#[from] CollectKeySpaceError),
#[error(transparent)]
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
}
use crate::{
pgdatadir_mapping::CollectKeySpaceError,
tenant::{PageReconstructError, blob_io::WriteBlobError, upload_queue::NotInitialized},
virtual_file::owned_buffers_io::write::FlushTaskError,
};
/// Top-level failure to compact. Use [`Self::is_cancel`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum CompactionError {
/// Use [`Self::is_cancel`] instead of checking for this variant.
#[error("The timeline or pageserver is shutting down")]
#[allow(private_interfaces)]
ShuttingDown(ForbidMatching), // private ForbidMatching enforces use of [`Self::is_cancel`].
#[error(transparent)]
Other(anyhow::Error),
impl CompactionError {
/// Errors that can be ignored, i.e., cancel and shutdown.
pub fn is_cancel(&self) -> bool {
matches!(
self,
Self::ShuttingDown
| Self::AlreadyRunning(_)
| Self::CollectKeySpaceError(CollectKeySpaceError::Cancelled)
| Self::CollectKeySpaceError(CollectKeySpaceError::PageRead(
PageReconstructError::Cancelled
))
| Self::Offload(OffloadError::Cancelled)
)
}
#[derive(Debug)]
struct ForbidMatching;
/// Critical errors that indicate data corruption.
pub fn is_critical(&self) -> bool {
matches!(
self,
Self::CollectKeySpaceError(
CollectKeySpaceError::Decode(_)
| CollectKeySpaceError::PageRead(
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
)
)
)
}
}
impl CompactionError {
pub fn new_cancelled() -> Self {
Self::ShuttingDown(ForbidMatching)
}
/// Errors that can be ignored, i.e., cancel and shutdown.
pub fn is_cancel(&self) -> bool {
let other = match self {
CompactionError::ShuttingDown(_) => return true,
CompactionError::Other(other) => other,
};
// The write path of compaction in particular often lacks differentiated
// handling errors stemming from cancellation from other errors.
// So, if requested, we also check the ::Other variant by downcasting.
// The list below has been found empirically from flaky tests and production logs.
// The process is simple: on ::Other(), compaction will print the enclosed
// anyhow::Error in debug mode, i.e., with backtrace. That backtrace contains the
// line where the write path / compaction code does undifferentiated error handling
// from a non-anyhow type to an anyhow type. Add the type to the list of downcasts
// below, following the same is_cancel() pattern.
let root_cause = other.root_cause();
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_cancel());
let buffered_writer_flush_task_canelled = root_cause
.downcast_ref::<FlushTaskError>()
.is_some_and(|e| e.is_cancel());
let write_blob_cancelled = root_cause
.downcast_ref::<WriteBlobError>()
.is_some_and(|e| e.is_cancel());
let gate_closed = root_cause
.downcast_ref::<GateError>()
.is_some_and(|e| e.is_cancel());
upload_queue
|| timeline
|| buffered_writer_flush_task_canelled
|| write_blob_cancelled
|| gate_closed
}
pub fn into_anyhow(self) -> anyhow::Error {
match self {
CompactionError::ShuttingDown(ForbidMatching) => anyhow::Error::new(self),
CompactionError::Other(e) => e,
}
}
pub fn from_collect_keyspace(err: CollectKeySpaceError) -> Self {
if err.is_cancel() {
Self::new_cancelled()
} else {
Self::Other(err.into_anyhow())
}
impl From<OffloadError> for CompactionError {
fn from(e: OffloadError) -> Self {
match e {
OffloadError::Cancelled => Self::ShuttingDown,
_ => Self::Offload(e),
}
}
}
@@ -6191,7 +6074,7 @@ impl From<super::upload_queue::NotInitialized> for CompactionError {
CompactionError::Other(anyhow::anyhow!(value))
}
super::upload_queue::NotInitialized::ShuttingDown
| super::upload_queue::NotInitialized::Stopped => CompactionError::new_cancelled(),
| super::upload_queue::NotInitialized::Stopped => CompactionError::ShuttingDown,
}
}
}
@@ -6201,7 +6084,7 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
match e {
super::storage_layer::layer::DownloadError::TimelineShutdown
| super::storage_layer::layer::DownloadError::DownloadCancelled => {
CompactionError::new_cancelled()
CompactionError::ShuttingDown
}
super::storage_layer::layer::DownloadError::ContextAndConfigReallyDeniesDownloads
| super::storage_layer::layer::DownloadError::DownloadRequired
@@ -6220,14 +6103,14 @@ impl From<super::storage_layer::layer::DownloadError> for CompactionError {
impl From<layer_manager::Shutdown> for CompactionError {
fn from(_: layer_manager::Shutdown) -> Self {
CompactionError::new_cancelled()
CompactionError::ShuttingDown
}
}
impl From<super::storage_layer::errors::PutError> for CompactionError {
fn from(e: super::storage_layer::errors::PutError) -> Self {
if e.is_cancel() {
CompactionError::new_cancelled()
CompactionError::ShuttingDown
} else {
CompactionError::Other(e.into_anyhow())
}
@@ -6326,7 +6209,7 @@ impl Timeline {
let mut guard = tokio::select! {
guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard,
_ = self.cancel.cancelled() => {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
};
@@ -6628,12 +6511,12 @@ impl Timeline {
let standby_horizon = self.standby_horizon.load();
// Hold GC for the standby, but as a safety guard do it only within some
// reasonable lag.
if true && standby_horizon != Lsn::INVALID {
if standby_horizon != Lsn::INVALID {
if let Some(standby_lag) = new_gc_cutoff.checked_sub(standby_horizon) {
const MAX_ALLOWED_STANDBY_LAG: u64 = 10u64 << 30; // 10 GB
if standby_lag.0 < MAX_ALLOWED_STANDBY_LAG {
new_gc_cutoff = Lsn::min(standby_horizon, new_gc_cutoff);
info!("holding off GC for standby apply LSN {}", standby_horizon);
trace!("holding off GC for standby apply LSN {}", standby_horizon);
} else {
warn!(
"standby is lagging for more than {}MB, not holding gc for it",
@@ -6882,7 +6765,7 @@ impl Timeline {
}
/// Reconstruct a value, using the given base image and WAL records in 'data'.
pub(crate) async fn reconstruct_value(
async fn reconstruct_value(
&self,
key: Key,
request_lsn: Lsn,

View File

@@ -4,11 +4,10 @@
//!
//! The old legacy algorithm is implemented directly in `timeline.rs`.
use std::cmp::min;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use std::time::{Duration, Instant};
use super::layer_manager::LayerManagerLockHolder;
use super::{
@@ -17,8 +16,7 @@ use super::{
Timeline,
};
use crate::pgdatadir_mapping::CollectKeySpaceError;
use crate::tenant::timeline::{DeltaEntry, RepartitionError};
use crate::tenant::timeline::DeltaEntry;
use crate::walredo::RedoAttemptType;
use anyhow::{Context, anyhow};
use bytes::Bytes;
@@ -34,7 +32,6 @@ use pageserver_api::models::{CompactInfoResponse, CompactKeyRange};
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use pageserver_compaction::helpers::{fully_contains, overlaps_with};
use pageserver_compaction::interface::*;
use postgres_ffi::to_pg_timestamp;
use serde::Serialize;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::CancellationToken;
@@ -47,7 +44,6 @@ use wal_decoder::models::value::Value;
use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder};
use crate::page_cache;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
@@ -68,7 +64,7 @@ use crate::tenant::timeline::{
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
ResidentLayer, drop_layer_manager_rlock,
};
use crate::tenant::{DeltaLayer, MaybeOffloaded, PageReconstructError};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
/// Maximum number of deltas before generating an image layer in bottom-most compaction.
@@ -575,7 +571,7 @@ impl GcCompactionQueue {
}
match res {
Ok(res) => Ok(res),
Err(e) if e.is_cancel() => Err(e),
Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown),
Err(_) => {
// There are some cases where traditional gc might collect some layer
// files causing gc-compaction cannot read the full history of the key.
@@ -595,9 +591,9 @@ impl GcCompactionQueue {
timeline: &Arc<Timeline>,
) -> Result<CompactionOutcome, CompactionError> {
let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
return Err(CompactionError::Other(anyhow::anyhow!(
"cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue."
)));
return Err(CompactionError::AlreadyRunning(
"cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue.",
));
};
let has_pending_tasks;
let mut yield_for_l0 = false;
@@ -1263,19 +1259,13 @@ impl Timeline {
// Is the timeline being deleted?
if self.is_stopping() {
trace!("Dropping out of compaction on timeline shutdown");
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let target_file_size = self.get_checkpoint_distance();
// Define partitioning schema if needed
// HADRON
let force_image_creation_lsn = self
.get_or_compute_force_image_creation_lsn(cancel, ctx)
.await
.map_err(CompactionError::Other)?;
// 1. L0 Compact
let l0_outcome = {
let timer = self.metrics.compact_time_histo.start_timer();
@@ -1283,7 +1273,6 @@ impl Timeline {
.compact_level0(
target_file_size,
options.flags.contains(CompactFlags::ForceL0Compaction),
force_image_creation_lsn,
ctx,
)
.await?;
@@ -1386,7 +1375,6 @@ impl Timeline {
.create_image_layers(
&partitioning,
lsn,
force_image_creation_lsn,
mode,
&image_ctx,
self.last_image_layer_creation_status
@@ -1429,33 +1417,22 @@ impl Timeline {
}
// Suppress errors when cancelled.
//
// Log other errors but continue. Failure to repartition is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
// key-value store, ignoring the datadir layout. Log the error but continue.
//
// TODO:
// 1. shouldn't we return early here if we observe cancellation
// 2. Experiment: can we stop checking self.cancel here?
Err(_) if self.cancel.is_cancelled() => {} // TODO: try how we fare removing this branch
Err(_) if self.cancel.is_cancelled() => {}
Err(err) if err.is_cancel() => {}
Err(RepartitionError::CollectKeyspace(
e @ CollectKeySpaceError::Decode(_)
| e @ CollectKeySpaceError::PageRead(
PageReconstructError::MissingKey(_) | PageReconstructError::WalRedo(_),
),
)) => {
// Alert on critical errors that indicate data corruption.
// Alert on critical errors that indicate data corruption.
Err(err) if err.is_critical() => {
critical_timeline!(
self.tenant_shard_id,
self.timeline_id,
"could not compact, repartitioning keyspace failed: {e:?}"
"could not compact, repartitioning keyspace failed: {err:?}"
);
}
Err(e) => error!(
"could not compact, repartitioning keyspace failed: {:?}",
e.into_anyhow()
),
// Log other errors. No partitioning? This is normal, if the timeline was just created
// as an empty timeline. Also in unit tests, when we use the timeline as a simple
// key-value store, ignoring the datadir layout. Log the error but continue.
Err(err) => error!("could not compact, repartitioning keyspace failed: {err:?}"),
};
let partition_count = self.partitioning.read().0.0.parts.len();
@@ -1483,63 +1460,6 @@ impl Timeline {
Ok(CompactionOutcome::Done)
}
/* BEGIN_HADRON */
// Get the force image creation LSN. Compute it if the last computed LSN is too old.
async fn get_or_compute_force_image_creation_lsn(
self: &Arc<Self>,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Option<Lsn>> {
const FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL: Duration = Duration::from_secs(10 * 60); // 10 minutes
let image_layer_force_creation_period = self.get_image_creation_timeout();
if image_layer_force_creation_period.is_none() {
return Ok(None);
}
let image_layer_force_creation_period = image_layer_force_creation_period.unwrap();
let force_image_creation_lsn_computed_at =
*self.force_image_creation_lsn_computed_at.lock().unwrap();
if force_image_creation_lsn_computed_at.is_none()
|| force_image_creation_lsn_computed_at.unwrap().elapsed()
> FORCE_IMAGE_CREATION_LSN_COMPUTE_INTERVAL
{
let now: SystemTime = SystemTime::now();
let timestamp = now
.checked_sub(image_layer_force_creation_period)
.ok_or_else(|| {
anyhow::anyhow!(
"image creation timeout is too large: {image_layer_force_creation_period:?}"
)
})?;
let timestamp = to_pg_timestamp(timestamp);
let force_image_creation_lsn = match self
.find_lsn_for_timestamp(timestamp, cancel, ctx)
.await?
{
LsnForTimestamp::Present(lsn) | LsnForTimestamp::Future(lsn) => lsn,
_ => {
let gc_lsn = *self.get_applied_gc_cutoff_lsn();
tracing::info!(
"no LSN found for timestamp {timestamp:?}, using latest GC cutoff LSN {}",
gc_lsn
);
gc_lsn
}
};
self.force_image_creation_lsn
.store(force_image_creation_lsn);
*self.force_image_creation_lsn_computed_at.lock().unwrap() = Some(Instant::now());
tracing::info!(
"computed force image creation LSN: {}",
force_image_creation_lsn
);
Ok(Some(force_image_creation_lsn))
} else {
Ok(Some(self.force_image_creation_lsn.load()))
}
}
/* END_HADRON */
/// Check for layers that are elegible to be rewritten:
/// - Shard splitting: After a shard split, ancestor layers beyond pitr_interval, so that
/// we don't indefinitely retain keys in this shard that aren't needed.
@@ -1692,7 +1612,7 @@ impl Timeline {
for (i, layer) in layers_to_rewrite.into_iter().enumerate() {
if self.cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
info!(layer=%layer, "rewriting layer after shard split: {}/{}", i, total);
@@ -1790,7 +1710,7 @@ impl Timeline {
Ok(()) => {},
Err(WaitCompletionError::NotInitialized(ni)) => return Err(CompactionError::from(ni)),
Err(WaitCompletionError::UploadQueueShutDownOrStopped) => {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
},
// Don't wait if there's L0 compaction to do. We don't need to update the outcome
@@ -1869,7 +1789,6 @@ impl Timeline {
self: &Arc<Self>,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
force_compaction_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> Result<CompactionOutcome, CompactionError> {
let CompactLevel0Phase1Result {
@@ -1890,7 +1809,6 @@ impl Timeline {
stats,
target_file_size,
force_compaction_ignore_threshold,
force_compaction_lsn,
&ctx,
)
.instrument(phase1_span)
@@ -1913,7 +1831,6 @@ impl Timeline {
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
force_compaction_lsn: Option<Lsn>,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let begin = tokio::time::Instant::now();
@@ -1943,28 +1860,11 @@ impl Timeline {
return Ok(CompactLevel0Phase1Result::default());
}
} else {
// HADRON
let min_lsn = level0_deltas
.iter()
.map(|a| a.get_lsn_range().start)
.reduce(min);
if force_compaction_lsn.is_some()
&& min_lsn.is_some()
&& min_lsn.unwrap() < force_compaction_lsn.unwrap()
{
info!(
"forcing L0 compaction of {} L0 deltas. Min lsn: {}, force compaction lsn: {}",
level0_deltas.len(),
min_lsn.unwrap(),
force_compaction_lsn.unwrap()
);
} else {
debug!(
level0_deltas = level0_deltas.len(),
threshold, "too few deltas to compact"
);
return Ok(CompactLevel0Phase1Result::default());
}
debug!(
level0_deltas = level0_deltas.len(),
threshold, "too few deltas to compact"
);
return Ok(CompactLevel0Phase1Result::default());
}
}
@@ -2073,7 +1973,7 @@ impl Timeline {
let mut all_keys = Vec::new();
for l in deltas_to_compact.iter() {
if self.cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let delta = l.get_as_delta(ctx).await.map_err(CompactionError::Other)?;
let keys = delta
@@ -2166,7 +2066,7 @@ impl Timeline {
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
if self.cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
@@ -2274,7 +2174,7 @@ impl Timeline {
// avoid hitting the cancellation token on every key. in benches, we end up
// shuffling an order of million keys per layer, this means we'll check it
// around tens of times per layer.
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let same_key = prev_key == Some(key);
@@ -2359,7 +2259,7 @@ impl Timeline {
if writer.is_none() {
if self.cancel.is_cancelled() {
// to be somewhat responsive to cancellation, check for each new layer
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
// Create writer if not initiaized yet
writer = Some(
@@ -2615,13 +2515,10 @@ impl Timeline {
// Is the timeline being deleted?
if self.is_stopping() {
trace!("Dropping out of compaction on timeline shutdown");
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let (dense_ks, _sparse_ks) = self
.collect_keyspace(end_lsn, ctx)
.await
.map_err(CompactionError::from_collect_keyspace)?;
let (dense_ks, _sparse_ks) = self.collect_keyspace(end_lsn, ctx).await?;
// TODO(chi): ignore sparse_keyspace for now, compact it in the future.
let mut adaptor = TimelineAdaptor::new(self, (end_lsn, dense_ks));
@@ -3277,7 +3174,7 @@ impl Timeline {
let gc_lock = async {
tokio::select! {
guard = self.gc_lock.lock() => Ok(guard),
_ = cancel.cancelled() => Err(CompactionError::new_cancelled()),
_ = cancel.cancelled() => Err(CompactionError::ShuttingDown),
}
};
@@ -3550,7 +3447,7 @@ impl Timeline {
}
total_layer_size += layer.layer_desc().file_size;
if cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let should_yield = yield_for_l0
&& self
@@ -3697,7 +3594,7 @@ impl Timeline {
}
if cancel.is_cancelled() {
return Err(CompactionError::new_cancelled());
return Err(CompactionError::ShuttingDown);
}
let should_yield = yield_for_l0

View File

@@ -212,12 +212,8 @@
//! to the parent shard during a shard split. Eventually, the shard split task will
//! shut down the parent => case (1).
use std::collections::HashMap;
use std::collections::hash_map;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::Weak;
use std::time::Duration;
use std::collections::{HashMap, hash_map};
use std::sync::{Arc, Mutex, Weak};
use pageserver_api::shard::ShardIdentity;
use tracing::{instrument, trace};
@@ -337,44 +333,6 @@ enum RoutingResult<T: Types> {
}
impl<T: Types> Cache<T> {
/* BEGIN_HADRON */
/// A wrapper of do_get to resolve the tenant shard for a get page request.
#[instrument(level = "trace", skip_all)]
pub(crate) async fn get(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
tenant_manager: &T::TenantManager,
) -> Result<Handle<T>, GetError<T>> {
const GET_MAX_RETRIES: usize = 10;
const RETRY_BACKOFF: Duration = Duration::from_millis(100);
let mut attempt = 0;
loop {
attempt += 1;
match self
.do_get(timeline_id, shard_selector, tenant_manager)
.await
{
Ok(handle) => return Ok(handle),
Err(e) => {
// Retry on tenant manager error to handle tenant split more gracefully
if attempt < GET_MAX_RETRIES {
tracing::warn!(
"Fail to resolve tenant shard in attempt {}: {:?}. Retrying...",
attempt,
e
);
tokio::time::sleep(RETRY_BACKOFF).await;
continue;
} else {
return Err(e);
}
}
}
}
}
/* END_HADRON */
/// See module-level comment for details.
///
/// Does NOT check for the shutdown state of [`Types::Timeline`].
@@ -383,7 +341,7 @@ impl<T: Types> Cache<T> {
/// and if so, return an error that causes the page service to
/// close the connection.
#[instrument(level = "trace", skip_all)]
async fn do_get(
pub(crate) async fn get(
&mut self,
timeline_id: TimelineId,
shard_selector: ShardSelector,
@@ -921,7 +879,6 @@ mod tests {
.await
.err()
.expect("documented behavior: can't get new handle after shutdown");
assert_eq!(cache.map.len(), 1, "next access cleans up the cache");
cache

View File

@@ -17,6 +17,8 @@ pub(crate) enum OffloadError {
Cancelled,
#[error("Timeline is not archived")]
NotArchived,
#[error(transparent)]
RemoteStorage(anyhow::Error),
#[error("Offload or deletion already in progress")]
AlreadyInProgress,
#[error("Unexpected offload error: {0}")]
@@ -27,7 +29,7 @@ impl From<TenantManifestError> for OffloadError {
fn from(e: TenantManifestError) -> Self {
match e {
TenantManifestError::Cancelled => Self::Cancelled,
TenantManifestError::RemoteStorage(e) => Self::Other(e),
TenantManifestError::RemoteStorage(e) => Self::RemoteStorage(e),
}
}
}

View File

@@ -182,19 +182,12 @@ pub(super) async fn connection_manager_loop_step(
}
},
// If we've not received any updates from the broker from a while, are waiting for WAL
// and have no safekeeper connection or connection candidates, then it might be that
// the broker subscription is wedged. Drop the currrent subscription and re-subscribe
// with the goal of unblocking it.
_ = broker_reset_interval.tick() => {
let awaiting_lsn = wait_lsn_status.borrow().is_some();
let no_candidates = connection_manager_state.wal_stream_candidates.is_empty();
let no_connection = connection_manager_state.wal_connection.is_none();
if awaiting_lsn && no_candidates && no_connection {
tracing::warn!("No broker updates received for a while, but waiting for WAL. Re-setting stream ...");
broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
if wait_lsn_status.borrow().is_some() {
tracing::warn!("No broker updates received for a while, but waiting for WAL. Re-setting stream ...")
}
broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
},
new_event = async {
@@ -750,7 +743,7 @@ impl ConnectionManagerState {
WALRECEIVER_BROKER_UPDATES.inc();
info!(
trace!(
"safekeeper info update: standby_horizon(cutoff)={}",
timeline_update.standby_horizon
);

View File

@@ -45,10 +45,9 @@ pub(crate) fn regenerate(
let (disk_wanted_bytes, shard_count) = tenant_manager.calculate_utilization()?;
// Fetch the fraction of disk space which may be used
let disk_usable_pct = if conf.disk_usage_based_eviction.enabled {
conf.disk_usage_based_eviction.max_usage_pct
} else {
Percent::new(100).unwrap()
let disk_usable_pct = match conf.disk_usage_based_eviction.clone() {
Some(e) => e.max_usage_pct,
None => Percent::new(100).unwrap(),
};
// Express a static value for how many shards we may schedule on one node

View File

@@ -566,55 +566,22 @@ impl PostgresRedoManager {
}
}
#[cfg(test)]
pub(crate) mod harness {
use super::PostgresRedoManager;
use crate::config::PageServerConf;
use utils::{id::TenantId, shard::TenantShardId};
pub struct RedoHarness {
// underscored because unused, except for removal at drop
_repo_dir: camino_tempfile::Utf8TempDir,
pub manager: PostgresRedoManager,
tenant_shard_id: TenantShardId,
}
impl RedoHarness {
pub fn new() -> anyhow::Result<Self> {
crate::tenant::harness::setup_logging();
let repo_dir = camino_tempfile::tempdir()?;
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
Ok(RedoHarness {
_repo_dir: repo_dir,
manager,
tenant_shard_id,
})
}
pub fn span(&self) -> tracing::Span {
tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
}
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use bytes::Bytes;
use pageserver_api::key::Key;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::PgMajorVersion;
use tracing::Instrument;
use utils::id::TenantId;
use utils::lsn::Lsn;
use wal_decoder::models::record::NeonWalRecord;
use super::PostgresRedoManager;
use crate::config::PageServerConf;
use crate::walredo::RedoAttemptType;
use crate::walredo::harness::RedoHarness;
#[tokio::test]
async fn test_ping() {
@@ -725,4 +692,33 @@ mod tests {
)
]
}
struct RedoHarness {
// underscored because unused, except for removal at drop
_repo_dir: camino_tempfile::Utf8TempDir,
manager: PostgresRedoManager,
tenant_shard_id: TenantShardId,
}
impl RedoHarness {
fn new() -> anyhow::Result<Self> {
crate::tenant::harness::setup_logging();
let repo_dir = camino_tempfile::tempdir()?;
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));
let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
let manager = PostgresRedoManager::new(conf, tenant_shard_id);
Ok(RedoHarness {
_repo_dir: repo_dir,
manager,
tenant_shard_id,
})
}
fn span(&self) -> tracing::Span {
tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
}
}
}

View File

@@ -65,7 +65,6 @@
#include "port/pg_iovec.h"
#include "postmaster/interrupt.h"
#include "replication/walsender.h"
#include "storage/ipc.h"
#include "utils/timeout.h"
#include "bitmap.h"
@@ -413,47 +412,6 @@ compact_prefetch_buffers(void)
return false;
}
/*
* Check that prefetch response matches the slot
*/
static void
check_getpage_response(PrefetchRequest* slot, NeonResponse* resp)
{
if (resp->tag != T_NeonGetPageResponse && resp->tag != T_NeonErrorResponse)
{
neon_shard_log(slot->shard_no, PANIC, "Unexpected prefetch response %d, ring_receive=%ld, ring_flush=%ld, ring_unused=%ld",
resp->tag, MyPState->ring_receive, MyPState->ring_flush, MyPState->ring_unused);
}
if (neon_protocol_version >= 3)
{
NRelFileInfo rinfo = BufTagGetNRelFileInfo(slot->buftag);
if (resp->tag == T_NeonGetPageResponse)
{
NeonGetPageResponse * getpage_resp = (NeonGetPageResponse *)resp;
if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since ||
!RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) ||
getpage_resp->req.forknum != slot->buftag.forkNum ||
getpage_resp->req.blkno != slot->buftag.blockNum)
{
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Receive unexpected getpage response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), slot->buftag.forkNum, slot->buftag.blockNum);
}
}
else if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since)
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since));
}
}
}
/*
* If there might be responses still in the TCP buffer, then we should try to
* use those, to reduce any TCP backpressure on the OS/PS side.
@@ -488,18 +446,15 @@ communicator_prefetch_pump_state(void)
if (response == NULL)
break;
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
neon_shard_log(slot->shard_no, ERROR,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
MyPState->n_requests_inflight -= 1;
@@ -638,21 +593,6 @@ readahead_buffer_resize(int newsize, void *extra)
}
/*
* Callback to be called on backend exit to ensure correct state of compute-PS communication
* in case of backend cancel
*/
static void
prefetch_on_exit(int code, Datum arg)
{
if (code != 0) /* do disconnect only on abnormal backend termination */
{
shardno_t shard_no = DatumGetInt32(arg);
prefetch_on_ps_disconnect();
page_server->disconnect(shard_no);
}
}
/*
* Make sure that there are no responses still in the buffer.
@@ -665,11 +605,6 @@ consume_prefetch_responses(void)
{
if (MyPState->ring_receive < MyPState->ring_unused)
prefetch_wait_for(MyPState->ring_unused - 1);
/*
* We know for sure we're not working on any prefetch pages after
* this.
*/
END_PREFETCH_RECEIVE_WORK();
}
static void
@@ -787,12 +722,10 @@ prefetch_read(PrefetchRequest *slot)
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(slot->shard_no, PANIC,
neon_shard_log(slot->shard_no, ERROR,
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
}
/*
* Copy the request info so that if an error happens and the prefetch
@@ -808,18 +741,14 @@ prefetch_read(PrefetchRequest *slot)
MemoryContextSwitchTo(old);
if (response)
{
check_getpage_response(slot, response);
/* The slot should still be valid */
if (slot->status != PRFS_REQUESTED ||
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
{
neon_shard_log(shard_no, PANIC,
neon_shard_log(shard_no, ERROR,
"Incorrect prefetch slot state after receive: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long) slot->my_ring_index, (long) MyPState->ring_receive);
}
/* update prefetch state */
MyPState->n_responses_buffered += 1;
@@ -891,10 +820,11 @@ communicator_prefetch_receive(BufferTag tag)
void
prefetch_on_ps_disconnect(void)
{
bool save_readpage_reentrant_guard = readpage_reentrant_guard;
MyPState->ring_flush = MyPState->ring_unused;
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
/* Prohibit callig of prefetch_pump_state */
START_PREFETCH_RECEIVE_WORK();
while (MyPState->ring_receive < MyPState->ring_unused)
{
@@ -924,6 +854,9 @@ prefetch_on_ps_disconnect(void)
MyNeonCounters->getpage_prefetch_discards_total += 1;
}
/* Restore guard */
readpage_reentrant_guard = save_readpage_reentrant_guard;
/*
* We can have gone into retry due to network error, so update stats with
* the latest available
@@ -932,8 +865,6 @@ prefetch_on_ps_disconnect(void)
MyPState->n_requests_inflight;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
RESUME_INTERRUPTS();
}
/*
@@ -1096,11 +1027,16 @@ communicator_prefetch_lookupv(NRelFileInfo rinfo, ForkNumber forknum, BlockNumbe
/*
* Ignore errors
*/
if (slot->response->tag == T_NeonErrorResponse)
if (slot->response->tag != T_NeonGetPageResponse)
{
if (slot->response->tag != T_NeonErrorResponse)
{
NEON_PANIC_CONNECTION_STATE(slot->shard_no, PANIC,
"Expected GetPage (0x%02x) or Error (0x%02x) response to GetPageRequest, but got 0x%02x",
T_NeonGetPageResponse, T_NeonErrorResponse, slot->response->tag);
}
continue;
}
Assert(slot->response->tag == T_NeonGetPageResponse); /* checked by check_getpage_response when response was assigned to the slot */
memcpy(buffers[i], ((NeonGetPageResponse*)slot->response)->page, BLCKSZ);
@@ -1415,7 +1351,7 @@ equal_requests(NeonRequest* a, NeonRequest* b)
static NeonResponse *
page_server_request(void const *req)
{
NeonResponse *resp = NULL;
NeonResponse *resp;
BufferTag tag = {0};
shardno_t shard_no;
@@ -1435,7 +1371,7 @@ page_server_request(void const *req)
tag.blockNum = ((NeonGetPageRequest *) req)->blkno;
break;
default:
neon_log(PANIC, "Unexpected request tag: %d", messageTag(req));
neon_log(ERROR, "Unexpected request tag: %d", messageTag(req));
}
shard_no = get_shard_number(&tag);
@@ -1448,12 +1384,9 @@ page_server_request(void const *req)
shard_no = 0;
}
consume_prefetch_responses();
PG_TRY();
do
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
do
PG_TRY();
{
while (!page_server->send(shard_no, (NeonRequest *) req)
|| !page_server->flush(shard_no))
@@ -1461,24 +1394,30 @@ page_server_request(void const *req)
/* do nothing */
}
MyNeonCounters->pageserver_open_requests++;
consume_prefetch_responses();
resp = page_server->receive(shard_no);
MyNeonCounters->pageserver_open_requests--;
} while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
RESUME_INTERRUPTS();
}
PG_CATCH();
{
/*
* Cancellation in this code needs to be handled better at some
* point, but this currently seems fine for now.
*/
page_server->disconnect(shard_no);
MyNeonCounters->pageserver_open_requests = 0;
PG_RE_THROW();
}
PG_END_TRY();
/*
* We know for sure we're not working on any prefetch pages after
* this.
*/
END_PREFETCH_RECEIVE_WORK();
PG_RE_THROW();
}
PG_END_TRY();
} while (resp == NULL);
return resp;
}
@@ -1563,7 +1502,7 @@ nm_pack_request(NeonRequest *msg)
case T_NeonDbSizeResponse:
case T_NeonGetSlruSegmentResponse:
default:
neon_log(PANIC, "unexpected neon message tag 0x%02x", msg->tag);
neon_log(ERROR, "unexpected neon message tag 0x%02x", msg->tag);
break;
}
return s;
@@ -1715,7 +1654,7 @@ nm_unpack_response(StringInfo s)
case T_NeonDbSizeRequest:
case T_NeonGetSlruSegmentRequest:
default:
neon_log(PANIC, "unexpected neon message tag 0x%02x", tag);
neon_log(ERROR, "unexpected neon message tag 0x%02x", tag);
break;
}
@@ -2044,7 +1983,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
!RelFileInfoEquals(exists_resp->req.rinfo, request.rinfo) ||
exists_resp->req.forknum != request.forknum)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->req.rinfo), exists_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), request.forknum);
@@ -2075,7 +2014,7 @@ communicator_exists(NRelFileInfo rinfo, ForkNumber forkNum, neon_request_lsns *r
break;
default:
NEON_PANIC_CONNECTION_STATE(0, PANIC,
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x",
T_NeonExistsResponse, T_NeonErrorResponse, resp->tag);
}
@@ -2219,7 +2158,6 @@ Retry:
Assert(memcmp(&hashkey.buftag, &slot->buftag, sizeof(BufferTag)) == 0);
Assert(hashkey.buftag.blockNum == base_blockno + i);
/* We already checked that response match request when storing it in slot */
resp = slot->response;
switch (resp->tag)
@@ -2227,6 +2165,21 @@ Retry:
case T_NeonGetPageResponse:
{
NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp;
if (neon_protocol_version >= 3)
{
if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since ||
!RelFileInfoEquals(getpage_resp->req.rinfo, rinfo) ||
getpage_resp->req.forknum != forkNum ||
getpage_resp->req.blkno != base_blockno + i)
{
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->req.rinfo), getpage_resp->req.forknum, getpage_resp->req.blkno,
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i);
}
}
memcpy(buffer, getpage_resp->page, BLCKSZ);
/*
@@ -2239,6 +2192,17 @@ Retry:
break;
}
case T_NeonErrorResponse:
if (neon_protocol_version >= 3)
{
if (resp->reqid != slot->reqid ||
resp->lsn != slot->request_lsns.request_lsn ||
resp->not_modified_since != slot->request_lsns.not_modified_since)
{
elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since),
slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since));
}
}
ereport(ERROR,
(errcode(ERRCODE_IO_ERROR),
errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X",
@@ -2293,7 +2257,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
!RelFileInfoEquals(relsize_resp->req.rinfo, request.rinfo) ||
relsize_resp->req.forknum != forknum)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->req.rinfo), relsize_resp->req.forknum,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), RelFileInfoFmt(request.rinfo), forknum);
@@ -2324,7 +2288,7 @@ communicator_nblocks(NRelFileInfo rinfo, ForkNumber forknum, neon_request_lsns *
break;
default:
NEON_PANIC_CONNECTION_STATE(0, PANIC,
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x",
T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag);
}
@@ -2363,7 +2327,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
if (!equal_requests(resp, &request.hdr) ||
dbsize_resp->req.dbNode != dbNode)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->req.dbNode,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), dbNode);
@@ -2392,7 +2356,7 @@ communicator_dbsize(Oid dbNode, neon_request_lsns *request_lsns)
break;
default:
NEON_PANIC_CONNECTION_STATE(0, PANIC,
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x",
T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag);
}
@@ -2408,7 +2372,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
{
int n_blocks;
shardno_t shard_no = 0; /* All SLRUs are at shard 0 */
NeonResponse *resp = NULL;
NeonResponse *resp;
NeonGetSlruSegmentRequest request;
request = (NeonGetSlruSegmentRequest) {
@@ -2419,29 +2383,14 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
.segno = segno
};
consume_prefetch_responses();
PG_TRY();
do
{
before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
do
{
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
resp = page_server->receive(shard_no);
} while (resp == NULL);
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
}
PG_CATCH();
{
cancel_before_shmem_exit(prefetch_on_exit, Int32GetDatum(shard_no));
/* Nothing should cancel disconnect: we should not leave connection in opaque state */
HOLD_INTERRUPTS();
page_server->disconnect(shard_no);
RESUME_INTERRUPTS();
while (!page_server->send(shard_no, &request.hdr) || !page_server->flush(shard_no));
PG_RE_THROW();
}
PG_END_TRY();
consume_prefetch_responses();
resp = page_server->receive(shard_no);
} while (resp == NULL);
switch (resp->tag)
{
@@ -2454,7 +2403,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
slru_resp->req.kind != kind ||
slru_resp->req.segno != segno)
{
NEON_PANIC_CONNECTION_STATE(0, PANIC,
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%lluu}",
resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->req.kind, slru_resp->req.segno,
request.hdr.reqid, LSN_FORMAT_ARGS(request.hdr.lsn), LSN_FORMAT_ARGS(request.hdr.not_modified_since), kind, (unsigned long long) segno);
@@ -2486,7 +2435,7 @@ communicator_read_slru_segment(SlruKind kind, int64 segno, neon_request_lsns *re
break;
default:
NEON_PANIC_CONNECTION_STATE(0, PANIC,
NEON_PANIC_CONNECTION_STATE(-1, PANIC,
"Expected GetSlruSegment (0x%02x) or Error (0x%02x) response to GetSlruSegmentRequest, but got 0x%02x",
T_NeonGetSlruSegmentResponse, T_NeonErrorResponse, resp->tag);
}

View File

@@ -953,9 +953,7 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
/*
* Fire Event Trigger if both function owner and current user are
* superuser. Allow executing Event Trigger function that belongs to a
* superuser when connected as a non-superuser, even when the function is
* SECURITY DEFINER.
* superuser, or none of them are.
*/
else if (event == FHET_START
/* still enable it to pass pg_regress tests */
@@ -978,7 +976,32 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
function_is_owned_by_super = superuser_arg(function_owner);
/*
* Refuse to run functions that belongs to a non-superuser when the
* 1. Refuse to run SECURITY DEFINER function that belongs to a
* superuser when the current user is not a superuser itself.
*/
if (!role_is_super
&& function_is_owned_by_super
&& function_is_secdef)
{
char *func_name = get_func_name(flinfo->fn_oid);
ereport(WARNING,
(errmsg("Skipping Event Trigger"),
errdetail("Event Trigger function \"%s\" is owned by \"%s\" "
"and is SECURITY DEFINER",
func_name,
GetUserNameFromId(function_owner, false))));
/*
* we can't skip execution directly inside the fmgr_hook so
* instead we change the event trigger function to a noop
* function.
*/
force_noop(flinfo);
}
/*
* 2. Refuse to run functions that belongs to a non-superuser when the
* current user is a superuser.
*
* We could run a SECURITY DEFINER user-function here and be safe with
@@ -986,7 +1009,7 @@ neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
* infrastructure maintenance operations, where we prefer to skip
* running user-defined code.
*/
if (role_is_super && !function_is_owned_by_super)
else if (role_is_super && !function_is_owned_by_super)
{
char *func_name = get_func_name(flinfo->fn_oid);

View File

@@ -267,7 +267,7 @@ async fn worker_inner(
) -> anyhow::Result<()> {
#[cfg(any(test, feature = "testing"))]
let storage = if config.test_remote_failures > 0 {
GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures, 100)
GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
} else {
storage
};

View File

@@ -10,7 +10,7 @@ use measured::{
Counter, CounterVec, FixedCardinalityLabel, Gauge, Histogram, HistogramVec, LabelGroup,
MetricGroup,
};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLogVec};
use metrics::{CounterPairAssoc, CounterPairVec, HyperLogLog, HyperLogLogVec};
use tokio::time::{self, Instant};
use crate::control_plane::messages::ColdStartInfo;
@@ -36,6 +36,7 @@ impl Metrics {
metrics.proxy.redis_errors_total.init_all_dense();
metrics.proxy.redis_events_count.init_all_dense();
metrics.proxy.retries_metric.init_all_dense();
metrics.proxy.invalid_endpoints_total.init_all_dense();
metrics.proxy.connection_failures_total.init_all_dense();
SELF.set(metrics)
@@ -79,6 +80,11 @@ pub struct ProxyMetrics {
)]
pub console_request_latency: HistogramVec<ConsoleRequestSet, 16>,
/// Time it takes to acquire a token to call console plane.
// largest bucket = 3^16 * 0.05ms = 2.15s
#[metric(metadata = Thresholds::exponential_buckets(0.00005, 3.0))]
pub control_plane_token_acquire_seconds: Histogram<16>,
/// Size of the HTTP request body lengths.
// smallest bucket = 16 bytes
// largest bucket = 4^12 * 16 bytes = 256MB
@@ -92,10 +98,19 @@ pub struct ProxyMetrics {
/// Number of opened connections to a database.
pub http_pool_opened_connections: Gauge,
/// Number of cache hits/misses for allowed ips.
pub allowed_ips_cache_misses: CounterVec<StaticLabelSet<CacheOutcome>>,
/// Number of allowed ips
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
pub allowed_ips_number: Histogram<10>,
/// Number of cache hits/misses for VPC endpoint IDs.
pub vpc_endpoint_id_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,
/// Number of cache hits/misses for access blocker flags.
pub access_blocker_flags_cache_stats: CounterVec<StaticLabelSet<CacheOutcome>>,
/// Number of allowed VPC endpoints IDs
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0]))]
pub allowed_vpc_endpoint_ids: Histogram<10>,
@@ -124,12 +139,21 @@ pub struct ProxyMetrics {
/// Number of TLS handshake failures
pub tls_handshake_failures: Counter,
/// Number of connection requests affected by authentication rate limits
pub requests_auth_rate_limits_total: Counter,
/// HLL approximate cardinality of endpoints that are connecting
pub connecting_endpoints: HyperLogLogVec<StaticLabelSet<Protocol>, 32>,
/// Number of endpoints affected by errors of a given classification
pub endpoints_affected_by_errors: HyperLogLogVec<StaticLabelSet<crate::error::ErrorKind>, 32>,
/// Number of endpoints affected by authentication rate limits
pub endpoints_auth_rate_limits: HyperLogLog<32>,
/// Number of invalid endpoints (per protocol, per rejected).
pub invalid_endpoints_total: CounterVec<InvalidEndpointsSet>,
/// Number of retries (per outcome, per retry_type).
#[metric(metadata = Thresholds::with_buckets([0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]))]
pub retries_metric: HistogramVec<RetriesMetricSet, 9>,

View File

@@ -561,20 +561,6 @@ impl InterpretedWalReader {
// Update internal and external state, then reset the WAL stream
// if required.
let senders = self.shard_senders.entry(shard_id).or_default();
// Clean up any shard senders that have dropped out before adding the new
// one. This avoids a build up of dead senders.
senders.retain(|sender| {
let closed = sender.tx.is_closed();
if closed {
let sender_id = ShardSenderId::new(shard_id, sender.sender_id);
tracing::info!("Removed shard sender {}", sender_id);
}
!closed
});
let new_sender_id = match senders.last() {
Some(sender) => sender.sender_id.next(),
None => SenderId::first()

View File

@@ -220,7 +220,7 @@ impl WalSenders {
fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
let mut shared = self.mutex.lock();
let slot = shared.get_slot_mut(id);
info!(
debug!(
"Record standby reply: ts={} apply_lsn={}",
reply.reply_ts, reply.apply_lsn
);
@@ -233,7 +233,6 @@ impl WalSenders {
})
}
}
shared.update_reply_feedback();
}
/// Record hot standby feedback, update aggregated value.
@@ -401,10 +400,7 @@ impl WalSendersShared {
}
}
self.agg_standby_feedback = StandbyFeedback {
reply: {
info!(prev=%self.agg_standby_feedback.reply.apply_lsn, new=%reply_agg.apply_lsn, "updating agg_standby_feedback apply_lsn");
reply_agg
},
reply: reply_agg,
hs_feedback: agg,
};
}

View File

@@ -1066,9 +1066,10 @@ async fn handle_node_delete(req: Request<Body>) -> Result<Response<Body>, ApiErr
let state = get_state(&req);
let node_id: NodeId = parse_request_param(&req, "node_id")?;
let force: bool = parse_query_param(&req, "force")?.unwrap_or(false);
json_response(
StatusCode::OK,
state.service.start_node_delete(node_id).await?,
state.service.start_node_delete(node_id, force).await?,
)
}

View File

@@ -1677,21 +1677,7 @@ impl Service {
.collect::<anyhow::Result<Vec<_>>>()?;
let safekeepers: HashMap<NodeId, Safekeeper> =
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
let count_policy = |policy| {
safekeepers
.iter()
.filter(|sk| sk.1.scheduling_policy() == policy)
.count()
};
let active_sk_count = count_policy(SkSchedulingPolicy::Active);
let activating_sk_count = count_policy(SkSchedulingPolicy::Activating);
let pause_sk_count = count_policy(SkSchedulingPolicy::Pause);
let decom_sk_count = count_policy(SkSchedulingPolicy::Decomissioned);
tracing::info!(
"Loaded {} safekeepers from database. Active {active_sk_count}, activating {activating_sk_count}, \
paused {pause_sk_count}, decomissioned {decom_sk_count}.",
safekeepers.len()
);
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_nodes
@@ -1983,14 +1969,6 @@ impl Service {
}
});
// Check that there is enough safekeepers configured that we can create new timelines
let test_sk_res = this.safekeepers_for_new_timeline().await;
tracing::info!(
timeline_safekeeper_count = config.timeline_safekeeper_count,
timelines_onto_safekeepers = config.timelines_onto_safekeepers,
"viability test result (test timeline creation on safekeepers): {test_sk_res:?}",
);
Ok(this)
}
@@ -4428,7 +4406,7 @@ impl Service {
.await;
let mut failed = 0;
for (tid, (_, result)) in targeted_tenant_shards.iter().zip(results.into_iter()) {
for (tid, result) in targeted_tenant_shards.iter().zip(results.into_iter()) {
match result {
Ok(ok) => {
if tid.is_shard_zero() {
@@ -4795,7 +4773,7 @@ impl Service {
.await;
let mut valid_until = None;
for (node, r) in res {
for r in res {
match r {
Ok(lease) => {
if let Some(ref mut valid_until) = valid_until {
@@ -4805,7 +4783,7 @@ impl Service {
}
}
Err(e) => {
return Err(passthrough_api_error(&node, e));
return Err(ApiError::InternalServerError(anyhow::anyhow!(e)));
}
}
}
@@ -4919,7 +4897,7 @@ impl Service {
max_retries: u32,
timeout: Duration,
cancel: &CancellationToken,
) -> Vec<(Node, mgmt_api::Result<T>)>
) -> Vec<mgmt_api::Result<T>>
where
O: Fn(TenantShardId, PageserverClient) -> F + Copy,
F: std::future::Future<Output = mgmt_api::Result<T>>,
@@ -4940,16 +4918,16 @@ impl Service {
cancel,
)
.await;
(idx, node, r)
(idx, r)
});
}
while let Some((idx, node, r)) = futs.next().await {
results.push((idx, node, r.unwrap_or(Err(mgmt_api::Error::Cancelled))));
while let Some((idx, r)) = futs.next().await {
results.push((idx, r.unwrap_or(Err(mgmt_api::Error::Cancelled))));
}
results.sort_by_key(|(idx, _, _)| *idx);
results.into_iter().map(|(_, node, r)| (node, r)).collect()
results.sort_by_key(|(idx, _)| *idx);
results.into_iter().map(|(_, r)| r).collect()
}
/// Helper for safely working with the shards in a tenant remotely on pageservers, for example
@@ -5862,7 +5840,7 @@ impl Service {
return;
}
for (_, result) in self
for result in self
.tenant_for_shards_api(
attached,
|tenant_shard_id, client| async move {
@@ -5881,7 +5859,7 @@ impl Service {
}
}
for (_, result) in self
for result in self
.tenant_for_shards_api(
secondary,
|tenant_shard_id, client| async move {
@@ -7187,6 +7165,7 @@ impl Service {
self: &Arc<Self>,
node_id: NodeId,
policy_on_start: NodeSchedulingPolicy,
force: bool,
cancel: CancellationToken,
) -> Result<(), OperationError> {
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal).build();
@@ -7194,23 +7173,28 @@ impl Service {
let mut waiters: Vec<ReconcilerWaiter> = Vec::new();
let mut tid_iter = create_shared_shard_iterator(self.clone());
let process_cancel = || async {
// Attempt to restore the node to its original scheduling policy
match self
.node_configure(node_id, None, Some(policy_on_start))
.await
{
Ok(()) => Err(OperationError::Cancelled),
Err(err) => {
Err(OperationError::FinalizeError(
format!(
"Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
node_id, String::from(policy_on_start), err
)
.into(),
))
}
}
};
while !tid_iter.finished() {
if cancel.is_cancelled() {
match self
.node_configure(node_id, None, Some(policy_on_start))
.await
{
Ok(()) => return Err(OperationError::Cancelled),
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise delete cancel of {} by setting scheduling policy to {}: {}",
node_id, String::from(policy_on_start), err
)
.into(),
));
}
}
return process_cancel().await;
}
operation_utils::validate_node_state(
@@ -7230,12 +7214,6 @@ impl Service {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
// Calculate a schedule context here to avoid borrow checker issues.
let mut schedule_context = ScheduleContext::default();
for (_, shard) in tenants.range(TenantShardId::tenant_range(tid.tenant_id)) {
schedule_context.avoid(&shard.intent.all_pageservers());
}
let tenant_shard = match tenants.get_mut(&tid) {
Some(tenant_shard) => tenant_shard,
None => {
@@ -7261,6 +7239,9 @@ impl Service {
}
if tenant_shard.deref_node(node_id) {
// TODO(ephemeralsad): we should process all shards in a tenant at once, so
// we can avoid settling the tenant unevenly.
let mut schedule_context = ScheduleContext::new(ScheduleMode::Normal);
if let Err(e) = tenant_shard.schedule(scheduler, &mut schedule_context) {
tracing::error!(
"Refusing to delete node, shard {} can't be rescheduled: {e}",
@@ -7274,13 +7255,24 @@ impl Service {
)
}
// Do not wait for any reconciliations to finish if the deletion has been forced.
let waiter = self.maybe_configured_reconcile_shard(
tenant_shard,
nodes,
reconciler_config,
);
if let Some(some) = waiter {
waiters.push(some);
if force {
// Here we remove an existing observed location for the node we're removing, and it will
// not be re-added by a reconciler's completion because we filter out removed nodes in
// process_result.
//
// Note that we update the shard's observed state _after_ calling maybe_configured_reconcile_shard:
// that means any reconciles we spawned will know about the node we're deleting,
// enabling them to do live migrations if it's still online.
tenant_shard.observed.locations.remove(&node_id);
} else if let Some(waiter) = waiter {
waiters.push(waiter);
}
}
}
@@ -7294,21 +7286,7 @@ impl Service {
while !waiters.is_empty() {
if cancel.is_cancelled() {
match self
.node_configure(node_id, None, Some(policy_on_start))
.await
{
Ok(()) => return Err(OperationError::Cancelled),
Err(err) => {
return Err(OperationError::FinalizeError(
format!(
"Failed to finalise drain cancel of {} by setting scheduling policy to {}: {}",
node_id, String::from(policy_on_start), err
)
.into(),
));
}
}
return process_cancel().await;
}
tracing::info!("Awaiting {} pending delete reconciliations", waiters.len());
@@ -7913,6 +7891,7 @@ impl Service {
pub(crate) async fn start_node_delete(
self: &Arc<Self>,
node_id: NodeId,
force: bool,
) -> Result<(), ApiError> {
let (ongoing_op, node_policy, schedulable_nodes_count) = {
let locked = self.inner.read().unwrap();
@@ -7982,7 +7961,7 @@ impl Service {
tracing::info!("Delete background operation starting");
let res = service
.delete_node(node_id, policy_on_start, cancel)
.delete_node(node_id, policy_on_start, force, cancel)
.await;
match res {
Ok(()) => {
@@ -8768,7 +8747,7 @@ impl Service {
)
.await;
for ((tenant_shard_id, node, optimization), (_, secondary_status)) in
for ((tenant_shard_id, node, optimization), secondary_status) in
want_secondary_status.into_iter().zip(results.into_iter())
{
match secondary_status {

View File

@@ -2,12 +2,11 @@ from __future__ import annotations
import urllib.parse
from enum import StrEnum
from typing import TYPE_CHECKING, Any, final
from typing import TYPE_CHECKING, final
import requests
from requests.adapters import HTTPAdapter
from requests.auth import AuthBase
from requests.exceptions import ReadTimeout
from typing_extensions import override
from fixtures.log_helper import log
@@ -103,18 +102,6 @@ class EndpointHttpClient(requests.Session):
wait_until(offloaded)
def promote(self, safekeepers_lsn: dict[str, Any], disconnect: bool = False):
url = f"http://localhost:{self.external_port}/promote"
if disconnect:
try: # send first request to start promote and disconnect
self.post(url, data=safekeepers_lsn, timeout=0.001)
except ReadTimeout:
pass # wait on second request which returns on promotion finish
res = self.post(url, data=safekeepers_lsn)
res.raise_for_status()
json: dict[str, str] = res.json()
return json
def database_schema(self, database: str):
res = self.get(
f"http://localhost:{self.external_port}/database_schema?database={urllib.parse.quote(database, safe='')}",

View File

@@ -159,9 +159,6 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
)
PAGESERVER_PER_TENANT_METRICS: tuple[str, ...] = (
# BEGIN_HADRON
"pageserver_active_storage_operations_count",
# END_HADRON
"pageserver_current_logical_size",
"pageserver_resident_physical_size",
"pageserver_io_operations_bytes_total",

View File

@@ -1315,14 +1315,6 @@ class NeonEnv:
# This feature is pending rollout.
# tenant_config["rel_size_v2_enabled"] = True
# Test authors tend to forget about the default 10min initial lease deadline
# when writing tests, which turns their immediate gc requests via mgmt API
# into no-ops. Override the binary default here, such that there is no initial
# lease deadline by default in tests. Tests that care can always override it
# themselves.
# Cf https://databricks.atlassian.net/browse/LKB-92?focusedCommentId=6722329
tenant_config["lsn_lease_length"] = "0s"
if self.pageserver_remote_storage is not None:
ps_cfg["remote_storage"] = remote_storage_to_toml_dict(
self.pageserver_remote_storage
@@ -1795,33 +1787,6 @@ def neon_env_builder(
record_property("preserve_database_files", builder.preserve_database_files)
@pytest.fixture(scope="function")
def neon_env_builder_local(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_distrib_dir: Path,
) -> NeonEnvBuilder:
"""
Fixture to create a Neon environment for test with its own pg_install copy.
This allows the test to edit the list of available extensions in the
local instance of Postgres used for the test, and install extensions via
downloading them when a remote extension is tested, for instance, or
copying files around for local extension testing.
"""
test_local_pginstall = test_output_dir / "pg_install"
log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}")
# We can't copy only the version that we are currently testing because other
# binaries like the storage controller need specific Postgres versions.
shutil.copytree(pg_distrib_dir, test_local_pginstall)
neon_env_builder.pg_distrib_dir = test_local_pginstall
log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}")
return neon_env_builder
@dataclass
class PageserverPort:
pg: int
@@ -2119,11 +2084,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
def node_delete(self, node_id):
def node_delete(self, node_id, force: bool = False):
log.info(f"node_delete({node_id})")
query = f"{self.api}/control/v1/node/{node_id}/delete"
if force:
query += "?force=true"
self.request(
"PUT",
f"{self.api}/control/v1/node/{node_id}/delete",
query,
headers=self.headers(TokenScope.ADMIN),
)
@@ -5631,30 +5599,21 @@ def tenant_get_shards(
]
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint, primary_cursor=None, secondary_cursor=None):
if primary_cursor is not None:
primary_cursor.execute("SELECT pg_current_wal_flush_lsn()")
[res] = primary_cursor.fetchone()
primary_lsn = Lsn(res)
else:
primary_lsn = Lsn(
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
)
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
primary_lsn = Lsn(
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
)
while True:
if secondary_cursor is not None:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
[res] = secondary_cursor.fetchone()
secondary_lsn = Lsn(res)
else:
secondary_lsn = Lsn(
secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
)
secondary_lsn = Lsn(
secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)
)
caught_up = secondary_lsn >= primary_lsn
log.info(f"caughtup={caught_up}, primary_lsn={primary_lsn}, secondary_lsn={secondary_lsn}")
if caught_up:
return
time.sleep(1)
def log_replica_lag(primary: Endpoint, secondary: Endpoint):
last_replay_lsn = Lsn(
secondary.safe_psql_scalar("SELECT pg_last_wal_replay_lsn()", log_query=False)

View File

@@ -111,14 +111,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
".*stalling layer flushes for compaction backpressure.*",
".*layer roll waiting for flush due to compaction backpressure.*",
".*BatchSpanProcessor.*",
# Can happen in tests that purposely wipe pageserver "local disk" data.
".*Local data loss suspected.*",
# Too many frozen layers error is normal during intensive benchmarks
".*too many frozen layers.*",
# Transient errors when resolving tenant shards by page service
".*Fail to resolve tenant shard in attempt.*",
# Expected warnings when pageserver has not refreshed GC info yet
".*pitr LSN/interval not found, skipping force image creation LSN calculation.*",
".*No broker updates received for a while.*",
*(
[

View File

@@ -1247,10 +1247,3 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
)
self.verbose_error(res)
return res.json()
def force_refresh_feature_flag(self, tenant_id: TenantId | TenantShardId):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/force_refresh_feature_flag",
)
self.verbose_error(res)
return res.json()

View File

@@ -71,13 +71,7 @@ def test_pageserver_characterize_latencies_with_1_client_and_throughput_with_man
n_clients: int,
):
setup_and_run_pagebench_benchmark(
neon_env_builder,
zenbenchmark,
pg_bin,
n_tenants,
pgbench_scale,
duration,
n_clients,
neon_env_builder, zenbenchmark, pg_bin, n_tenants, pgbench_scale, duration, n_clients
)
@@ -92,8 +86,7 @@ def setup_and_run_pagebench_benchmark(
):
def record(metric, **kwargs):
zenbenchmark.record(
metric_name=f"pageserver_max_throughput_getpage_at_latest_lsn.{metric}",
**kwargs,
metric_name=f"pageserver_max_throughput_getpage_at_latest_lsn.{metric}", **kwargs
)
params: dict[str, tuple[Any, dict[str, Any]]] = {}
@@ -111,7 +104,9 @@ def setup_and_run_pagebench_benchmark(
# configure cache sizes like in prod
page_cache_size = 16384
max_file_descriptors = 500000
neon_env_builder.pageserver_config_override = f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}; disk_usage_based_eviction={{enabled = false}}"
neon_env_builder.pageserver_config_override = (
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
)
tracing_config = PageserverTracingConfig(
sampling_ratio=(0, 1000),
@@ -127,10 +122,7 @@ def setup_and_run_pagebench_benchmark(
page_cache_size * 8192,
{"unit": "byte"},
),
"pageserver_config_override.max_file_descriptors": (
max_file_descriptors,
{"unit": ""},
),
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
"pageserver_config_override.sampling_ratio": (ratio, {"unit": ""}),
}
)

View File

@@ -117,9 +117,7 @@ class NeonBranch:
def create_child_branch(self) -> NeonBranch | None:
return self.project.create_branch(self.id)
def create_ro_endpoint(self) -> NeonEndpoint | None:
if not self.project.check_limit_endpoints():
return None
def create_ro_endpoint(self) -> NeonEndpoint:
return NeonEndpoint(
self.project,
self.neon_api.create_endpoint(self.project_id, self.id, "read_only", {})["endpoint"],
@@ -153,26 +151,11 @@ class NeonBranch:
return
self.updated_at = datetime.fromisoformat(res["branch"]["updated_at"])
parent_id: str = res["branch"]["parent_id"]
# XXX Retry get parent details to work around the issue
# https://databricks.atlassian.net/browse/LKB-279
target_time = datetime.now() + timedelta(seconds=30)
while datetime.now() < target_time:
try:
parent_def = self.neon_api.get_branch_details(self.project_id, parent_id)
except HTTPError as he:
if he.response.status_code == 404:
log.info("Branch not found, waiting...")
time.sleep(1)
else:
raise HTTPError(he) from he
else:
break
else:
raise RuntimeError(f"Branch {parent_id} not found")
# Creates an object for the parent branch
# After the reset operation a new parent branch is created
parent = NeonBranch(self.project, parent_def, True)
parent = NeonBranch(
self.project, self.neon_api.get_branch_details(self.project_id, parent_id), True
)
self.project.branches[parent_id] = parent
self.parent = parent
parent.children[self.id] = self
@@ -185,21 +168,29 @@ class NeonBranch:
source_timestamp: str | None = None,
preserve_under_name: str | None = None,
) -> dict[str, Any] | None:
if not self.project.check_limit_branches():
return None
endpoints = [ep for ep in self.endpoints.values() if ep.type == "read_only"]
# Terminate all the benchmarks running to prevent errors. Errors in benchmark during pgbench are expected
for ep in endpoints:
ep.terminate_benchmark()
self.terminate_benchmark()
res: dict[str, Any] = self.neon_api.restore_branch(
self.project_id,
self.id,
source_branch_id,
source_lsn,
source_timestamp,
preserve_under_name,
)
try:
res: dict[str, Any] = self.neon_api.restore_branch(
self.project_id,
self.id,
source_branch_id,
source_lsn,
source_timestamp,
preserve_under_name,
)
except HTTPError as he:
if (
he.response.status_code == 422
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
):
log.info("Branch limit exceeded, skipping")
return None
else:
raise HTTPError(he) from he
self.project.wait()
self.start_benchmark()
for ep in endpoints:
@@ -248,30 +239,19 @@ class NeonProject:
def delete(self) -> None:
self.neon_api.delete_project(self.id)
def check_limit_branches(self) -> bool:
if self.limits["max_branches"] == -1 or len(self.branches) < self.limits["max_branches"]:
return True
log.info("branch limit exceeded (%s/%s)", len(self.branches), self.limits["max_branches"])
return False
def check_limit_endpoints(self) -> bool:
if (
self.limits["max_read_only_endpoints"] == -1
or self.read_only_endpoints_total < self.limits["max_read_only_endpoints"]
):
return True
log.info(
"Maximum read only endpoint limit exceeded (%s/%s)",
self.read_only_endpoints_total,
self.limits["max_read_only_endpoints"],
)
return False
def create_branch(self, parent_id: str | None = None) -> NeonBranch | None:
self.wait()
if not self.check_limit_branches():
return None
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
try:
branch_def = self.neon_api.create_branch(self.id, parent_id=parent_id)
except HTTPError as he:
if (
he.response.status_code == 422
and he.response.json()["code"] == "BRANCHES_LIMIT_EXCEEDED"
):
log.info("Branch limit exceeded, skipping")
return None
else:
raise HTTPError(he) from he
new_branch = NeonBranch(self, branch_def)
self.wait()
return new_branch
@@ -408,9 +388,17 @@ def do_action(project: NeonProject, action: str) -> bool:
log.info("Action: %s", action)
if action == "new_branch":
log.info("Trying to create a new branch")
if 0 <= project.limits["max_branches"] <= len(project.branches):
log.info(
"Maximum branch limit exceeded (%s of %s)",
len(project.branches),
project.limits["max_branches"],
)
return False
parent = project.branches[
random.choice(list(set(project.branches.keys()) - project.reset_branches))
]
log.info("Parent: %s", parent)
child = parent.create_child_branch()
if child is None:
return False
@@ -425,11 +413,16 @@ def do_action(project: NeonProject, action: str) -> bool:
log.info("Leaf branches not found, skipping")
return False
elif action == "new_ro_endpoint":
if 0 <= project.limits["max_read_only_endpoints"] <= project.read_only_endpoints_total:
log.info(
"Maximum read only endpoint limit exceeded (%s of %s)",
project.read_only_endpoints_total,
project.limits["max_read_only_endpoints"],
)
return False
ep = random.choice(
[br for br in project.branches.values() if br.id not in project.reset_branches]
).create_ro_endpoint()
if ep is None:
return False
log.info("Created the RO endpoint with id %s branch: %s", ep.id, ep.branch.id)
ep.start_benchmark()
elif action == "delete_ro_endpoint":

View File

@@ -1,32 +0,0 @@
\echo Use "CREATE EXTENSION test_event_trigger_extension" to load this file. \quit
CREATE SCHEMA event_trigger;
create sequence if not exists event_trigger.seq_schema_version as int cycle;
create or replace function event_trigger.increment_schema_version()
returns event_trigger
security definer
language plpgsql
as $$
begin
perform pg_catalog.nextval('event_trigger.seq_schema_version');
end;
$$;
create or replace function event_trigger.get_schema_version()
returns int
security definer
language sql
as $$
select last_value from event_trigger.seq_schema_version;
$$;
-- On DDL event, increment the schema version number
create event trigger event_trigger_watch_ddl
on ddl_command_end
execute procedure event_trigger.increment_schema_version();
create event trigger event_trigger_watch_drop
on sql_drop
execute procedure event_trigger.increment_schema_version();

View File

@@ -1,8 +0,0 @@
default_version = '1.0'
comment = 'Test extension with Event Trigger'
# make sure the extension objects are owned by the bootstrap user
# to check that the SECURITY DEFINER event trigger function is still
# called during non-superuser DDL events.
superuser = true
trusted = true

View File

@@ -165,7 +165,6 @@ def test_fully_custom_config(positive_env: NeonEnv):
"gc_horizon": 23 * (1024 * 1024),
"gc_period": "2h 13m",
"image_creation_threshold": 7,
"image_layer_force_creation_period": "1m",
"pitr_interval": "1m",
"lagging_wal_timeout": "23m",
"lazy_slru_download": True,

View File

@@ -7,7 +7,6 @@ from typing import TYPE_CHECKING
import pytest
from fixtures.common_types import Lsn, TimelineId
from fixtures.log_helper import log
from fixtures.neon_fixtures import wait_for_last_flush_lsn
from fixtures.pageserver.http import TimelineCreate406
from fixtures.utils import query_scalar, skip_in_debug_build
@@ -163,9 +162,6 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
)
lsn = Lsn(res[2][0][0])
# Wait for all WAL to reach the pageserver, so GC cutoff LSN is greater than `lsn`.
wait_for_last_flush_lsn(env, endpoint0, tenant, b0)
# Use `failpoint=sleep` and `threading` to make the GC iteration triggers *before* the
# branch creation task but the individual timeline GC iteration happens *after*
# the branch creation task.

View File

@@ -944,78 +944,3 @@ def test_image_layer_compression(neon_env_builder: NeonEnvBuilder, enabled: bool
f"SELECT count(*) FROM foo WHERE id={v} and val=repeat('abcde{v:0>3}', 500)"
)
assert res[0][0] == 1
# BEGIN_HADRON
def get_layer_map(env, tenant_shard_id, timeline_id, ps_id):
client = env.pageservers[ps_id].http_client()
layer_map = client.layer_map_info(tenant_shard_id, timeline_id)
image_layer_count = 0
delta_layer_count = 0
for layer in layer_map.historic_layers:
if layer.kind == "Image":
image_layer_count += 1
elif layer.kind == "Delta":
delta_layer_count += 1
return image_layer_count, delta_layer_count
def test_image_creation_timeout(neon_env_builder: NeonEnvBuilder):
"""
Tests that page server can force creating new images if image creation timeout is enabled
"""
# use large knobs to disable L0 compaction/image creation except for the force image creation
tenant_conf = {
"compaction_threshold": "100",
"image_creation_threshold": "100",
"image_layer_creation_check_threshold": "1",
"checkpoint_distance": 10 * 1024,
"checkpoint_timeout": "1s",
"image_layer_force_creation_period": "1s",
# The lsn for forced image layer creations is calculated once every 10 minutes.
# Hence, drive compaction manually such that the test doesn't compute it at the
# wrong time.
"compaction_period": "0s",
}
# consider every tenant large to run the image layer generation check more eagerly
neon_env_builder.pageserver_config_override = (
"image_layer_generation_large_timeline_threshold=0"
)
neon_env_builder.num_pageservers = 1
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
# Generate some rows.
for v in range(10):
endpoint.safe_psql(f"INSERT INTO foo (id, val) VALUES ({v}, repeat('abcde{v:0>3}', 500))")
# Sleep a bit such that the inserts are considered when calculating the forced image layer creation LSN.
time.sleep(2)
def check_force_image_creation():
ps_http = env.pageserver.http_client()
ps_http.timeline_compact(tenant_id, timeline_id)
image, delta = get_layer_map(env, tenant_id, timeline_id, 0)
log.info(f"images: {image}, deltas: {delta}")
assert image > 0
env.pageserver.assert_log_contains("forcing L0 compaction of")
env.pageserver.assert_log_contains("forcing image creation for partitioned range")
wait_until(check_force_image_creation)
endpoint.stop_and_destroy()
env.pageserver.allowed_errors.append(
".*created delta file of size.*larger than double of target.*"
)
# END_HADRON

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import os
import platform
import shutil
import tarfile
from enum import StrEnum
from pathlib import Path
@@ -30,6 +31,27 @@ if TYPE_CHECKING:
from werkzeug.wrappers.request import Request
# use neon_env_builder_local fixture to override the default neon_env_builder fixture
# and use a test-specific pg_install instead of shared one
@pytest.fixture(scope="function")
def neon_env_builder_local(
neon_env_builder: NeonEnvBuilder,
test_output_dir: Path,
pg_distrib_dir: Path,
) -> NeonEnvBuilder:
test_local_pginstall = test_output_dir / "pg_install"
log.info(f"copy {pg_distrib_dir} to {test_local_pginstall}")
# We can't copy only the version that we are currently testing because other
# binaries like the storage controller need specific Postgres versions.
shutil.copytree(pg_distrib_dir, test_local_pginstall)
neon_env_builder.pg_distrib_dir = test_local_pginstall
log.info(f"local neon_env_builder.pg_distrib_dir: {neon_env_builder.pg_distrib_dir}")
return neon_env_builder
@final
class RemoteExtension(StrEnum):
SQL_ONLY = "test_extension_sql_only"

Some files were not shown because too many files have changed in this diff Show More