Compare commits

..

1 Commits

Author SHA1 Message Date
Aleksandr Sarantsev
e0b23f841d Fix reattach concurrency 2025-06-13 23:33:38 +04:00
50 changed files with 441 additions and 1957 deletions

View File

@@ -38,11 +38,6 @@ on:
required: false
default: 1
type: number
rerun-failed:
description: 'rerun failed tests to ignore flaky tests'
required: false
default: true
type: boolean
defaults:
run:
@@ -384,7 +379,7 @@ jobs:
- name: Pytest regression tests
continue-on-error: ${{ matrix.lfc_state == 'with-lfc' && inputs.build-type == 'debug' }}
uses: ./.github/actions/run-python-test-set
timeout-minutes: ${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 75 || 180 }}
timeout-minutes: ${{ inputs.sanitizers != 'enabled' && 75 || 180 }}
with:
build_type: ${{ inputs.build-type }}
test_selection: regress
@@ -392,14 +387,14 @@ jobs:
run_with_real_s3: true
real_s3_bucket: neon-github-ci-tests
real_s3_region: eu-central-1
rerun_failed: ${{ inputs.rerun-failed }}
rerun_failed: ${{ inputs.test-run-count == 1 }}
pg_version: ${{ matrix.pg_version }}
sanitizers: ${{ inputs.sanitizers }}
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ (inputs.build-type == 'release' && inputs.sanitizers != 'enabled') && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }} --count=${{ inputs.test-run-count }}
${{ inputs.test-selection != '' && format('-k "{0}"', inputs.test-selection) || '' }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}

View File

@@ -58,7 +58,6 @@ jobs:
test-cfg: ${{ inputs.pg-versions }}
test-selection: ${{ inputs.test-selection }}
test-run-count: ${{ fromJson(inputs.run-count) }}
rerun-failed: false
secrets: inherit
create-test-report:

View File

@@ -199,28 +199,6 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
secrets: inherit
validate-compute-manifest:
runs-on: ubuntu-22.04
needs: [ meta, check-permissions ]
# We do need to run this in `.*-rc-pr` because of hotfixes.
if: ${{ contains(fromJSON('["pr", "push-main", "storage-rc-pr", "proxy-rc-pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up Node.js
uses: actions/setup-node@49933ea5288caeca8642d1e84afbd3f7d6820020 # v4.4.0
with:
node-version: '24'
- name: Validate manifest against schema
run: |
make -C compute manifest-schema-validation
build-and-test-locally:
needs: [ meta, build-build-tools-image ]
# We do need to run this in `.*-rc-pr` because of hotfixes.

View File

@@ -1,151 +0,0 @@
name: Build and Test Fully
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '0 3 * * *' # run once a day, timezone is utc
workflow_dispatch:
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow per any non-`main` branch.
group: ${{ github.workflow }}-${{ github.ref_name }}-${{ github.ref_name == 'main' && github.sha || 'anysha' }}
cancel-in-progress: true
env:
RUST_BACKTRACE: 1
COPT: '-Werror'
jobs:
tag:
runs-on: [ self-hosted, small ]
container: ${{ vars.NEON_DEV_AWS_ACCOUNT_ID }}.dkr.ecr.${{ vars.AWS_ECR_REGION }}.amazonaws.com/base:pinned
outputs:
build-tag: ${{steps.build-tag.outputs.tag}}
steps:
# Need `fetch-depth: 0` to count the number of commits in the branch
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
with:
fetch-depth: 0
- name: Get build tag
run: |
echo run:$GITHUB_RUN_ID
echo ref:$GITHUB_REF_NAME
echo rev:$(git rev-list --count HEAD)
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
echo "tag=$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
echo "tag=release-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
echo "tag=release-proxy-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
elif [[ "$GITHUB_REF_NAME" == "release-compute" ]]; then
echo "tag=release-compute-$(git rev-list --count HEAD)" >> $GITHUB_OUTPUT
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release', 'release-proxy', 'release-compute'"
echo "tag=$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
fi
shell: bash
id: build-tag
build-build-tools-image:
uses: ./.github/workflows/build-build-tools-image.yml
secrets: inherit
build-and-test-locally:
needs: [ tag, build-build-tools-image ]
strategy:
fail-fast: false
matrix:
arch: [ x64, arm64 ]
build-type: [ debug, release ]
uses: ./.github/workflows/_build-and-test-locally.yml
with:
arch: ${{ matrix.arch }}
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
rerun-failed: false
test-cfg: '[{"pg_version":"v14", "lfc_state": "with-lfc"},
{"pg_version":"v15", "lfc_state": "with-lfc"},
{"pg_version":"v16", "lfc_state": "with-lfc"},
{"pg_version":"v17", "lfc_state": "with-lfc"},
{"pg_version":"v14", "lfc_state": "without-lfc"},
{"pg_version":"v15", "lfc_state": "without-lfc"},
{"pg_version":"v16", "lfc_state": "without-lfc"},
{"pg_version":"v17", "lfc_state": "withouts-lfc"}]'
secrets: inherit
create-test-report:
needs: [ build-and-test-locally, build-build-tools-image ]
if: ${{ !cancelled() }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: write
pull-requests: write
outputs:
report-url: ${{ steps.create-allure-report.outputs.report-url }}
runs-on: [ self-hosted, small ]
container:
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
credentials:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
options: --init
steps:
- name: Harden the runner (Audit all outbound calls)
uses: step-security/harden-runner@4d991eb9b905ef189e4c376166672c3f2f230481 # v2.11.0
with:
egress-policy: audit
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
aws-oidc-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1
if: ${{ !cancelled() }}
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const report = {
reportUrl: "${{ steps.create-allure-report.outputs.report-url }}",
reportJsonUrl: "${{ steps.create-allure-report.outputs.report-json-url }}",
}
const coverage = {}
const script = require("./scripts/comment-test-report.js")
await script({
github,
context,
fetch,
report,
coverage,
})

View File

@@ -79,7 +79,6 @@ jobs:
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
build-tag: ${{ needs.tag.outputs.build-tag }}
build-type: ${{ matrix.build-type }}
rerun-failed: false
test-cfg: '[{"pg_version":"v17"}]'
sanitizers: enabled
secrets: inherit

3
compute/.gitignore vendored
View File

@@ -3,6 +3,3 @@ etc/neon_collector.yml
etc/neon_collector_autoscaling.yml
etc/sql_exporter.yml
etc/sql_exporter_autoscaling.yml
# Node.js dependencies
node_modules/

View File

@@ -48,11 +48,3 @@ jsonnetfmt-test:
.PHONY: jsonnetfmt-format
jsonnetfmt-format:
jsonnetfmt --in-place $(jsonnet_files)
.PHONY: manifest-schema-validation
manifest-schema-validation: node_modules
node_modules/.bin/jsonschema validate -d https://json-schema.org/draft/2020-12/schema manifest.schema.json manifest.yaml
node_modules: package.json
npm install
touch node_modules

View File

@@ -1722,29 +1722,11 @@ FROM extensions-${EXTENSIONS} AS neon-pg-ext-build
# Compile the Neon-specific `compute_ctl`, `fast_import`, and `local_proxy` binaries
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools-plan
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
WORKDIR /home/nonroot
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cargo chef prepare --recipe-path recipe.json
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
USER nonroot
COPY --from=compute-tools-plan /home/nonroot/recipe.json recipe.json
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \
--mount=type=cache,uid=1000,target=/home/nonroot/.cargo/git \
--mount=type=cache,uid=1000,target=/home/nonroot/target \
mold -run cargo chef cook --locked --profile release-line-debug-size-lto --recipe-path recipe.json
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN --mount=type=cache,uid=1000,target=/home/nonroot/.cargo/registry \

View File

@@ -1,209 +0,0 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Neon Compute Manifest Schema",
"description": "Schema for Neon compute node configuration manifest",
"type": "object",
"properties": {
"pg_settings": {
"type": "object",
"properties": {
"common": {
"type": "object",
"properties": {
"client_connection_check_interval": {
"type": "string",
"description": "Check for client disconnection interval in milliseconds"
},
"effective_io_concurrency": {
"type": "string",
"description": "Effective IO concurrency setting"
},
"fsync": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to force fsync to disk"
},
"hot_standby": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether hot standby is enabled"
},
"idle_in_transaction_session_timeout": {
"type": "string",
"description": "Timeout for idle transactions in milliseconds"
},
"listen_addresses": {
"type": "string",
"description": "Addresses to listen on"
},
"log_connections": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log connections"
},
"log_disconnections": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log disconnections"
},
"log_temp_files": {
"type": "string",
"description": "Size threshold for logging temporary files in KB"
},
"log_error_verbosity": {
"type": "string",
"enum": ["terse", "verbose", "default"],
"description": "Error logging verbosity level"
},
"log_min_error_statement": {
"type": "string",
"description": "Minimum error level for statement logging"
},
"maintenance_io_concurrency": {
"type": "string",
"description": "Maintenance IO concurrency setting"
},
"max_connections": {
"type": "string",
"description": "Maximum number of connections"
},
"max_replication_flush_lag": {
"type": "string",
"description": "Maximum replication flush lag"
},
"max_replication_slots": {
"type": "string",
"description": "Maximum number of replication slots"
},
"max_replication_write_lag": {
"type": "string",
"description": "Maximum replication write lag"
},
"max_wal_senders": {
"type": "string",
"description": "Maximum number of WAL senders"
},
"max_wal_size": {
"type": "string",
"description": "Maximum WAL size"
},
"neon.unstable_extensions": {
"type": "string",
"description": "List of unstable extensions"
},
"neon.protocol_version": {
"type": "string",
"description": "Neon protocol version"
},
"password_encryption": {
"type": "string",
"description": "Password encryption method"
},
"restart_after_crash": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to restart after crash"
},
"superuser_reserved_connections": {
"type": "string",
"description": "Number of reserved connections for superuser"
},
"synchronous_standby_names": {
"type": "string",
"description": "Names of synchronous standby servers"
},
"wal_keep_size": {
"type": "string",
"description": "WAL keep size"
},
"wal_level": {
"type": "string",
"description": "WAL level"
},
"wal_log_hints": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to log hints in WAL"
},
"wal_sender_timeout": {
"type": "string",
"description": "WAL sender timeout in milliseconds"
}
},
"required": [
"client_connection_check_interval",
"effective_io_concurrency",
"fsync",
"hot_standby",
"idle_in_transaction_session_timeout",
"listen_addresses",
"log_connections",
"log_disconnections",
"log_temp_files",
"log_error_verbosity",
"log_min_error_statement",
"maintenance_io_concurrency",
"max_connections",
"max_replication_flush_lag",
"max_replication_slots",
"max_replication_write_lag",
"max_wal_senders",
"max_wal_size",
"neon.unstable_extensions",
"neon.protocol_version",
"password_encryption",
"restart_after_crash",
"superuser_reserved_connections",
"synchronous_standby_names",
"wal_keep_size",
"wal_level",
"wal_log_hints",
"wal_sender_timeout"
]
},
"replica": {
"type": "object",
"properties": {
"hot_standby": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether hot standby is enabled for replicas"
}
},
"required": ["hot_standby"]
},
"per_version": {
"type": "object",
"patternProperties": {
"^1[4-7]$": {
"type": "object",
"properties": {
"common": {
"type": "object",
"properties": {
"io_combine_limit": {
"type": "string",
"description": "IO combine limit"
}
}
},
"replica": {
"type": "object",
"properties": {
"recovery_prefetch": {
"type": "string",
"enum": ["on", "off"],
"description": "Whether to enable recovery prefetch for PostgreSQL replicas"
}
}
}
}
}
}
}
},
"required": ["common", "replica", "per_version"]
}
},
"required": ["pg_settings"]
}

View File

@@ -105,17 +105,17 @@ pg_settings:
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
16:
common: {}
common:
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
15:
common: {}
common:
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
14:
common: {}
replica: {}
common:
replica:

View File

@@ -1,37 +0,0 @@
{
"name": "neon-compute",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "neon-compute",
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
},
"node_modules/@sourcemeta/jsonschema": {
"version": "9.3.4",
"resolved": "https://registry.npmjs.org/@sourcemeta/jsonschema/-/jsonschema-9.3.4.tgz",
"integrity": "sha512-hkujfkZAIGXUs4U//We9faZW8LZ4/H9LqagRYsFSulH/VLcKPNhZyCTGg7AhORuzm27zqENvKpnX4g2FzudYFw==",
"cpu": [
"x64",
"arm64"
],
"license": "AGPL-3.0",
"os": [
"darwin",
"linux",
"win32"
],
"bin": {
"jsonschema": "cli.js"
},
"engines": {
"node": ">=16"
},
"funding": {
"url": "https://github.com/sponsors/sourcemeta"
}
}
}
}

View File

@@ -1,7 +0,0 @@
{
"name": "neon-compute",
"private": true,
"dependencies": {
"@sourcemeta/jsonschema": "9.3.4"
}
}

View File

@@ -354,6 +354,11 @@ impl ComputeNode {
// that can affect `compute_ctl` and prevent it from properly configuring the database schema.
// Unset them via connection string options before connecting to the database.
// N.B. keep it in sync with `ZENITH_OPTIONS` in `get_maintenance_client()`.
//
// TODO(ololobus): we currently pass `-c default_transaction_read_only=off` from control plane
// as well. After rolling out this code, we can remove this parameter from control plane.
// In the meantime, double-passing is fine, the last value is applied.
// See: <https://github.com/neondatabase/cloud/blob/133dd8c4dbbba40edfbad475bf6a45073ca63faf/goapp/controlplane/internal/pkg/compute/provisioner/provisioner_common.go#L70>
const EXTRA_OPTIONS: &str = "-c role=cloud_admin -c default_transaction_read_only=off -c search_path=public -c statement_timeout=0";
let options = match conn_conf.get_options() {
Some(options) => format!("{} {}", options, EXTRA_OPTIONS),

View File

@@ -18,7 +18,7 @@ use clap::Parser;
use compute_api::requests::ComputeClaimsScope;
use compute_api::spec::ComputeMode;
use control_plane::broker::StorageBroker;
use control_plane::endpoint::{ComputeControlPlane, PageserverProtocol};
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint_storage::{ENDPOINT_STORAGE_DEFAULT_ADDR, EndpointStorage};
use control_plane::local_env;
use control_plane::local_env::{
@@ -605,14 +605,6 @@ struct EndpointCreateCmdArgs {
#[clap(long, help = "Postgres version")]
pg_version: u32,
/// Use gRPC to communicate with Pageservers, by generating grpc:// connstrings.
///
/// Specified on creation such that it's retained across reconfiguration and restarts.
///
/// NB: not yet supported by computes.
#[clap(long)]
grpc: bool,
#[clap(
long,
help = "If set, the node will be a hot replica on the specified timeline",
@@ -1459,7 +1451,6 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
args.internal_http_port,
args.pg_version,
mode,
args.grpc,
!args.update_catalog,
false,
)?;
@@ -1500,20 +1491,13 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
};
// If caller is telling us what pageserver to use, this is not a tenant which is
// fully managed by storage controller, therefore not sharded.
(vec![pageserver], DEFAULT_STRIPE_SIZE)
let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config");
(
vec![(parsed.0, parsed.1.unwrap_or(5432))],
// If caller is telling us what pageserver to use, this is not a tenant which is
// full managed by storage controller, therefore not sharded.
DEFAULT_STRIPE_SIZE,
)
} else {
// Look up the currently attached location of the tenant, and its striping metadata,
// to pass these on to postgres.
@@ -1532,20 +1516,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.await?;
}
let pageserver = if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))?,
shard.listen_grpc_port.expect("no gRPC port"),
)
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr)?,
shard.listen_pg_port,
)
};
anyhow::Ok(pageserver)
anyhow::Ok((
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
))
}),
)
.await?;
@@ -1600,19 +1575,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id {
let conf = env.get_pageserver_conf(ps_id)?;
// Use gRPC if requested.
let pageserver = if endpoint.grpc {
let grpc_addr = conf.listen_grpc_addr.as_ref().expect("bad config");
let (host, port) = parse_host_port(grpc_addr)?;
let port = port.unwrap_or(DEFAULT_PAGESERVER_GRPC_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&conf.listen_pg_addr)?;
let port = port.unwrap_or(5432);
(PageserverProtocol::Libpq, host, port)
};
vec![pageserver]
let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?);
vec![(
pageserver.pg_connection_config.host().clone(),
pageserver.pg_connection_config.port(),
)]
} else {
let storage_controller = StorageController::from_env(env);
storage_controller
@@ -1621,21 +1588,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
.shards
.into_iter()
.map(|shard| {
// Use gRPC if requested.
if endpoint.grpc {
(
PageserverProtocol::Grpc,
Host::parse(&shard.listen_grpc_addr.expect("no gRPC address"))
.expect("bad hostname"),
shard.listen_grpc_port.expect("no gRPC port"),
)
} else {
(
PageserverProtocol::Libpq,
Host::parse(&shard.listen_pg_addr).expect("bad hostname"),
shard.listen_pg_port,
)
}
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported malformed host"),
shard.listen_pg_port,
)
})
.collect::<Vec<_>>()
};

View File

@@ -37,7 +37,6 @@
//! ```
//!
use std::collections::BTreeMap;
use std::fmt::Display;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
@@ -77,6 +76,7 @@ use utils::id::{NodeId, TenantId, TimelineId};
use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
// contents of a endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -89,7 +89,6 @@ pub struct EndpointConf {
external_http_port: u16,
internal_http_port: u16,
pg_version: u32,
grpc: bool,
skip_pg_catalog_updates: bool,
reconfigure_concurrency: usize,
drop_subscriptions_before_start: bool,
@@ -193,7 +192,6 @@ impl ComputeControlPlane {
internal_http_port: Option<u16>,
pg_version: u32,
mode: ComputeMode,
grpc: bool,
skip_pg_catalog_updates: bool,
drop_subscriptions_before_start: bool,
) -> Result<Arc<Endpoint>> {
@@ -228,7 +226,6 @@ impl ComputeControlPlane {
// we also skip catalog updates in the cloud.
skip_pg_catalog_updates,
drop_subscriptions_before_start,
grpc,
reconfigure_concurrency: 1,
features: vec![],
cluster: None,
@@ -247,7 +244,6 @@ impl ComputeControlPlane {
internal_http_port,
pg_port,
pg_version,
grpc,
skip_pg_catalog_updates,
drop_subscriptions_before_start,
reconfigure_concurrency: 1,
@@ -302,8 +298,6 @@ pub struct Endpoint {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mode: ComputeMode,
/// If true, the endpoint should use gRPC to communicate with Pageservers.
pub grpc: bool,
// port and address of the Postgres server and `compute_ctl`'s HTTP APIs
pub pg_address: SocketAddr,
@@ -339,7 +333,7 @@ pub enum EndpointStatus {
RunningNoPidfile,
}
impl Display for EndpointStatus {
impl std::fmt::Display for EndpointStatus {
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
let s = match self {
Self::Running => "running",
@@ -351,29 +345,6 @@ impl Display for EndpointStatus {
}
}
/// Protocol used to connect to a Pageserver.
#[derive(Clone, Copy, Debug)]
pub enum PageserverProtocol {
Libpq,
Grpc,
}
impl PageserverProtocol {
/// Returns the URL scheme for the protocol, used in connstrings.
pub fn scheme(&self) -> &'static str {
match self {
Self::Libpq => "postgresql",
Self::Grpc => "grpc",
}
}
}
impl Display for PageserverProtocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.scheme())
}
}
impl Endpoint {
fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() {
@@ -409,7 +380,6 @@ impl Endpoint {
mode: conf.mode,
tenant_id: conf.tenant_id,
pg_version: conf.pg_version,
grpc: conf.grpc,
skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
reconfigure_concurrency: conf.reconfigure_concurrency,
drop_subscriptions_before_start: conf.drop_subscriptions_before_start,
@@ -638,10 +608,10 @@ impl Endpoint {
}
}
fn build_pageserver_connstr(pageservers: &[(PageserverProtocol, Host, u16)]) -> String {
fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String {
pageservers
.iter()
.map(|(scheme, host, port)| format!("{scheme}://no_user@{host}:{port}"))
.map(|(host, port)| format!("postgresql://no_user@{host}:{port}"))
.collect::<Vec<_>>()
.join(",")
}
@@ -686,7 +656,7 @@ impl Endpoint {
endpoint_storage_addr: String,
safekeepers_generation: Option<SafekeeperGeneration>,
safekeepers: Vec<NodeId>,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
pageservers: Vec<(Host, u16)>,
remote_ext_base_url: Option<&String>,
shard_stripe_size: usize,
create_test_user: bool,
@@ -971,12 +941,10 @@ impl Endpoint {
pub async fn reconfigure(
&self,
pageservers: Vec<(PageserverProtocol, Host, u16)>,
mut pageservers: Vec<(Host, u16)>,
stripe_size: Option<ShardStripeSize>,
safekeepers: Option<Vec<NodeId>>,
) -> Result<()> {
anyhow::ensure!(!pageservers.is_empty(), "no pageservers provided");
let (mut spec, compute_ctl_config) = {
let config_path = self.endpoint_path().join("config.json");
let file = std::fs::File::open(config_path)?;
@@ -988,7 +956,25 @@ impl Endpoint {
let postgresql_conf = self.read_postgresql_conf()?;
spec.cluster.postgresql_conf = Some(postgresql_conf);
// If we weren't given explicit pageservers, query the storage controller
if pageservers.is_empty() {
let storage_controller = StorageController::from_env(&self.env);
let locate_result = storage_controller.tenant_locate(self.tenant_id).await?;
pageservers = locate_result
.shards
.into_iter()
.map(|shard| {
(
Host::parse(&shard.listen_pg_addr)
.expect("Storage controller reported bad hostname"),
shard.listen_pg_port,
)
})
.collect::<Vec<_>>();
}
let pageserver_connstr = Self::build_pageserver_connstr(&pageservers);
assert!(!pageserver_connstr.is_empty());
spec.pageserver_connstring = Some(pageserver_connstr);
if stripe_size.is_some() {
spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize);

View File

@@ -16,7 +16,6 @@ use std::time::Duration;
use anyhow::{Context, bail};
use camino::Utf8PathBuf;
use pageserver_api::config::{DEFAULT_GRPC_LISTEN_PORT, DEFAULT_HTTP_LISTEN_PORT};
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
@@ -253,10 +252,9 @@ impl PageServerNode {
// the storage controller
let metadata_path = datadir.join("metadata.json");
let http_host = "localhost".to_string();
let (_, http_port) =
let (_http_host, http_port) =
parse_host_port(&self.conf.listen_http_addr).expect("Unable to parse listen_http_addr");
let http_port = http_port.unwrap_or(DEFAULT_HTTP_LISTEN_PORT);
let http_port = http_port.unwrap_or(9898);
let https_port = match self.conf.listen_https_addr.as_ref() {
Some(https_addr) => {
@@ -267,13 +265,6 @@ impl PageServerNode {
None => None,
};
let (mut grpc_host, mut grpc_port) = (None, None);
if let Some(grpc_addr) = &self.conf.listen_grpc_addr {
let (_, port) = parse_host_port(grpc_addr).expect("Unable to parse listen_grpc_addr");
grpc_host = Some("localhost".to_string());
grpc_port = Some(port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT));
}
// Intentionally hand-craft JSON: this acts as an implicit format compat test
// in case the pageserver-side structure is edited, and reflects the real life
// situation: the metadata is written by some other script.
@@ -282,9 +273,7 @@ impl PageServerNode {
serde_json::to_vec(&pageserver_api::config::NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: self.pg_connection_config.port(),
grpc_host,
grpc_port,
http_host,
http_host: "localhost".to_string(),
http_port,
https_port,
other: HashMap::from([(

View File

@@ -36,10 +36,6 @@ enum Command {
listen_pg_addr: String,
#[arg(long)]
listen_pg_port: u16,
#[arg(long)]
listen_grpc_addr: Option<String>,
#[arg(long)]
listen_grpc_port: Option<u16>,
#[arg(long)]
listen_http_addr: String,
@@ -422,8 +418,6 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,
@@ -437,8 +431,6 @@ async fn main() -> anyhow::Result<()> {
node_id,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
listen_http_addr,
listen_http_port,
listen_https_port,

View File

@@ -12,7 +12,6 @@ pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LI
pub const DEFAULT_GRPC_LISTEN_PORT: u16 = 51051; // storage-broker already uses 50051
use std::collections::HashMap;
use std::fmt::Display;
use std::num::{NonZeroU64, NonZeroUsize};
use std::str::FromStr;
use std::time::Duration;
@@ -25,17 +24,16 @@ use utils::logging::LogFormat;
use crate::models::{ImageCompressionAlgorithm, LsnLease};
// Certain metadata (e.g. externally-addressable name, AZ) is delivered
// as a separate structure. This information is not needed by the pageserver
// as a separate structure. This information is not neeed by the pageserver
// itself, it is only used for registering the pageserver with the control
// plane and/or storage controller.
//
#[derive(PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
pub struct NodeMetadata {
#[serde(rename = "host")]
pub postgres_host: String,
#[serde(rename = "port")]
pub postgres_port: u16,
pub grpc_host: Option<String>,
pub grpc_port: Option<u16>,
pub http_host: String,
pub http_port: u16,
pub https_port: Option<u16>,
@@ -46,23 +44,6 @@ pub struct NodeMetadata {
pub other: HashMap<String, serde_json::Value>,
}
impl Display for NodeMetadata {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"postgresql://{}:{} ",
self.postgres_host, self.postgres_port
)?;
if let Some(grpc_host) = &self.grpc_host {
let grpc_port = self.grpc_port.unwrap_or_default();
write!(f, "grpc://{grpc_host}:{grpc_port} ")?;
}
write!(f, "http://{}:{} ", self.http_host, self.http_port)?;
write!(f, "other:{:?}", self.other)?;
Ok(())
}
}
/// PostHog integration config.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PostHogConfig {
@@ -356,21 +337,16 @@ pub struct TimelineImportConfig {
pub struct BasebackupCacheConfig {
#[serde(with = "humantime_serde")]
pub cleanup_period: Duration,
/// Maximum total size of basebackup cache entries on disk in bytes.
/// The cache may slightly exceed this limit because we do not know
/// the exact size of the cache entry untill it's written to disk.
pub max_total_size_bytes: u64,
// TODO(diko): support max_entry_size_bytes.
// pub max_entry_size_bytes: u64,
pub max_size_entries: usize,
// FIXME: Support max_size_bytes.
// pub max_size_bytes: usize,
pub max_size_entries: i64,
}
impl Default for BasebackupCacheConfig {
fn default() -> Self {
Self {
cleanup_period: Duration::from_secs(60),
max_total_size_bytes: 1024 * 1024 * 1024, // 1 GiB
// max_entry_size_bytes: 16 * 1024 * 1024, // 16 MiB
// max_size_bytes: 1024 * 1024 * 1024, // 1 GiB
max_size_entries: 1000,
}
}

View File

@@ -14,8 +14,6 @@ fn test_node_metadata_v1_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: None,
@@ -39,35 +37,6 @@ fn test_node_metadata_v2_backward_compatibilty() {
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: None,
grpc_port: None,
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),
other: HashMap::new(),
}
)
}
#[test]
fn test_node_metadata_v3_backward_compatibilty() {
let v3 = serde_json::to_vec(&serde_json::json!({
"host": "localhost",
"port": 23,
"grpc_host": "localhost",
"grpc_port": 51,
"http_host": "localhost",
"http_port": 42,
"https_port": 123,
}));
assert_eq!(
serde_json::from_slice::<NodeMetadata>(&v3.unwrap()).unwrap(),
NodeMetadata {
postgres_host: "localhost".to_string(),
postgres_port: 23,
grpc_host: Some("localhost".to_string()),
grpc_port: Some(51),
http_host: "localhost".to_string(),
http_port: 42,
https_port: Some(123),

View File

@@ -52,8 +52,6 @@ pub struct NodeRegisterRequest {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -103,8 +101,6 @@ pub struct TenantLocateResponseShard {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
pub listen_http_addr: String,
pub listen_http_port: u16,
@@ -156,8 +152,6 @@ pub struct NodeDescribeResponse {
pub listen_pg_addr: String,
pub listen_pg_port: u16,
pub listen_grpc_addr: Option<String>,
pub listen_grpc_port: Option<u16>,
}
#[derive(Serialize, Deserialize, Debug)]

View File

@@ -20,7 +20,7 @@
//!
//! # local timeline dir
//! ls test_output/test_pgbench\[neon-45-684\]/repo/tenants/$TENANT/timelines/$TIMELINE | \
//! grep "__" | cargo run --release --bin pagectl draw-timeline > out.svg
//! grep "__" | cargo run --release --bin pagectl draw-timeline-dir > out.svg
//!
//! # Layer map dump from `/v1/tenant/$TENANT/timeline/$TIMELINE/layer`
//! (jq -r '.historic_layers[] | .layer_file_name' | cargo run -p pagectl draw-timeline) < layer-map.json > out.svg
@@ -81,11 +81,7 @@ fn build_coordinate_compression_map<T: Ord + Copy>(coords: Vec<T>) -> BTreeMap<T
fn parse_filename(name: &str) -> (Range<Key>, Range<Lsn>) {
let split: Vec<&str> = name.split("__").collect();
let keys: Vec<&str> = split[0].split('-').collect();
// Remove the temporary file extension, e.g., remove the `.d20a.___temp` part from the following filename:
// 000000067F000040490000404A00441B0000-000000067F000040490000404A00441B4000__000043483A34CE00.d20a.___temp
let lsns = split[1].split('.').collect::<Vec<&str>>()[0];
let mut lsns: Vec<&str> = lsns.split('-').collect();
let mut lsns: Vec<&str> = split[1].split('-').collect();
// The current format of the layer file name: 000000067F0000000400000B150100000000-000000067F0000000400000D350100000000__00000000014B7AC8-v1-00000001

View File

@@ -13,7 +13,7 @@ use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::{LayerFile, parse_filename};
use crate::layer_map_analyzer::parse_filename;
#[derive(Subcommand)]
pub(crate) enum LayerCmd {
@@ -38,8 +38,6 @@ pub(crate) enum LayerCmd {
/// The id from list-layer command
id: usize,
},
/// Dump all information of a layer file locally
DumpLayerLocal { path: PathBuf },
RewriteSummary {
layer_file_path: Utf8PathBuf,
#[clap(long)]
@@ -133,7 +131,15 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
for (idx, layer_file) in to_print {
print_layer_file(idx, &layer_file);
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
}
Ok(())
}
@@ -153,7 +159,16 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
let layer = layer?;
if let Ok(layer_file) = parse_filename(&layer.file_name().into_string().unwrap()) {
if *id == idx {
print_layer_file(idx, &layer_file);
// TODO(chi): dedup code
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
if layer_file.is_delta {
read_delta_file(layer.path(), &ctx).await?;
@@ -168,18 +183,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
Ok(())
}
LayerCmd::DumpLayerLocal { path } => {
if let Ok(layer_file) = parse_filename(path.file_name().unwrap().to_str().unwrap()) {
print_layer_file(0, &layer_file);
if layer_file.is_delta {
read_delta_file(path, &ctx).await?;
} else {
read_image_file(path, &ctx).await?;
}
}
Ok(())
}
LayerCmd::RewriteSummary {
layer_file_path,
new_tenant_id,
@@ -244,15 +247,3 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
}
}
}
fn print_layer_file(idx: usize, layer_file: &LayerFile) {
println!(
"[{:3}] key:{}-{}\n lsn:{}-{}\n delta:{}",
idx,
layer_file.key_range.start,
layer_file.key_range.end,
layer_file.lsn_range.start,
layer_file.lsn_range.end,
layer_file.is_delta,
);
}

View File

@@ -102,10 +102,10 @@ message CheckRelExistsResponse {
bool exists = 1;
}
// Requests a base backup.
// Requests a base backup at a given LSN.
message GetBaseBackupRequest {
// The LSN to fetch the base backup at. 0 or absent means the latest LSN known to the Pageserver.
uint64 lsn = 1;
// The LSN to fetch a base backup at.
ReadLsn read_lsn = 1;
// If true, logical replication slots will not be created.
bool replica = 2;
}

View File

@@ -26,7 +26,7 @@ use utils::lsn::Lsn;
use crate::proto;
/// A protocol error. Typically returned via try_from() or try_into().
#[derive(thiserror::Error, Clone, Debug)]
#[derive(thiserror::Error, Debug)]
pub enum ProtocolError {
#[error("field '{0}' has invalid value '{1}'")]
Invalid(&'static str, String),
@@ -182,28 +182,33 @@ impl From<CheckRelExistsResponse> for proto::CheckRelExistsResponse {
}
}
/// Requests a base backup.
/// Requests a base backup at a given LSN.
#[derive(Clone, Copy, Debug)]
pub struct GetBaseBackupRequest {
/// The LSN to fetch a base backup at. If None, uses the latest LSN known to the Pageserver.
pub lsn: Option<Lsn>,
/// The LSN to fetch a base backup at.
pub read_lsn: ReadLsn,
/// If true, logical replication slots will not be created.
pub replica: bool,
}
impl From<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
fn from(pb: proto::GetBaseBackupRequest) -> Self {
Self {
lsn: (pb.lsn != 0).then_some(Lsn(pb.lsn)),
impl TryFrom<proto::GetBaseBackupRequest> for GetBaseBackupRequest {
type Error = ProtocolError;
fn try_from(pb: proto::GetBaseBackupRequest) -> Result<Self, Self::Error> {
Ok(Self {
read_lsn: pb
.read_lsn
.ok_or(ProtocolError::Missing("read_lsn"))?
.try_into()?,
replica: pb.replica,
}
})
}
}
impl From<GetBaseBackupRequest> for proto::GetBaseBackupRequest {
fn from(request: GetBaseBackupRequest) -> Self {
Self {
lsn: request.lsn.unwrap_or_default().0,
read_lsn: Some(request.read_lsn.into()),
replica: request.replica,
}
}
@@ -417,39 +422,6 @@ impl From<GetPageResponse> for proto::GetPageResponse {
}
}
impl GetPageResponse {
/// Attempts to represent a tonic::Status as a GetPageResponse if appropriate. Returning a
/// tonic::Status will terminate the GetPage stream, so per-request errors are emitted as a
/// GetPageResponse with a non-OK status code instead.
#[allow(clippy::result_large_err)]
pub fn try_from_status(
status: tonic::Status,
request_id: RequestID,
) -> Result<Self, tonic::Status> {
// We shouldn't see an OK status here, because we're emitting an error.
debug_assert_ne!(status.code(), tonic::Code::Ok);
if status.code() == tonic::Code::Ok {
return Err(tonic::Status::internal(format!(
"unexpected OK status: {status:?}",
)));
}
// If we can't convert the tonic::Code to a GetPageStatusCode, this is not a per-request
// error and we should return a tonic::Status to terminate the stream.
let Ok(status_code) = status.code().try_into() else {
return Err(status);
};
// Return a GetPageResponse for the status.
Ok(Self {
request_id,
status_code,
reason: Some(status.message().to_string()),
page_images: Vec::new(),
})
}
}
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
@@ -513,42 +485,8 @@ impl From<GetPageStatusCode> for i32 {
}
}
impl TryFrom<tonic::Code> for GetPageStatusCode {
type Error = tonic::Code;
fn try_from(code: tonic::Code) -> Result<Self, Self::Error> {
use tonic::Code;
let status_code = match code {
Code::Ok => Self::Ok,
// These are per-request errors, which should be returned as GetPageResponses.
Code::AlreadyExists => Self::InvalidRequest,
Code::DataLoss => Self::InternalError,
Code::FailedPrecondition => Self::InvalidRequest,
Code::InvalidArgument => Self::InvalidRequest,
Code::Internal => Self::InternalError,
Code::NotFound => Self::NotFound,
Code::OutOfRange => Self::InvalidRequest,
Code::ResourceExhausted => Self::SlowDown,
// These should terminate the stream by returning a tonic::Status.
Code::Aborted
| Code::Cancelled
| Code::DeadlineExceeded
| Code::PermissionDenied
| Code::Unauthenticated
| Code::Unavailable
| Code::Unimplemented
| Code::Unknown => return Err(code),
};
Ok(status_code)
}
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on shard 0, other
// shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetRelSizeRequest {
pub read_lsn: ReadLsn,
pub rel: RelTag,
@@ -592,7 +530,6 @@ impl From<GetRelSizeResponse> for proto::GetRelSizeResponse {
}
/// Requests an SLRU segment. Only valid on shard 0, other shards will error.
#[derive(Clone, Copy, Debug)]
pub struct GetSlruSegmentRequest {
pub read_lsn: ReadLsn,
pub kind: SlruKind,

View File

@@ -19,10 +19,7 @@ use utils::{
use crate::{
basebackup::send_basebackup_tarball,
context::{DownloadBehavior, RequestContext},
metrics::{
BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ,
BASEBACKUP_CACHE_SIZE,
},
metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
task_mgr::TaskKind,
tenant::{
Timeline,
@@ -39,13 +36,8 @@ pub struct BasebackupPrepareRequest {
pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
#[derive(Clone)]
struct CacheEntry {
/// LSN at which the basebackup was taken.
lsn: Lsn,
/// Size of the basebackup archive in bytes.
size_bytes: u64,
}
type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
/// BasebackupCache stores cached basebackup archives for timelines on local disk.
///
@@ -61,12 +53,21 @@ struct CacheEntry {
/// and ~1 RPS for get requests.
pub struct BasebackupCache {
data_dir: Utf8PathBuf,
config: BasebackupCacheConfig,
tenant_manager: Arc<TenantManager>,
remove_entry_sender: BasebackupRemoveEntrySender,
entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,
entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
cancel: CancellationToken,
read_hit_count: GenericCounter<AtomicU64>,
read_miss_count: GenericCounter<AtomicU64>,
read_err_count: GenericCounter<AtomicU64>,
prepare_ok_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
prepare_err_count: GenericCounter<AtomicU64>,
}
impl BasebackupCache {
@@ -82,32 +83,35 @@ impl BasebackupCache {
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
) -> Arc<Self> {
let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
let enabled = config.is_some();
let cache = Arc::new(BasebackupCache {
data_dir,
config: config.unwrap_or_default(),
tenant_manager,
remove_entry_sender,
entries: std::sync::Mutex::new(HashMap::new()),
cancel,
read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
});
if let Some(config) = config {
let background = BackgroundTask {
c: cache.clone(),
config,
tenant_manager,
cancel,
entry_count: 0,
total_size_bytes: 0,
prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
};
runtime_handle.spawn(background.run(prepare_receiver));
if enabled {
runtime_handle.spawn(
cache
.clone()
.background(prepare_receiver, remove_entry_receiver),
);
}
cache
@@ -125,7 +129,7 @@ impl BasebackupCache {
) -> Option<tokio::fs::File> {
// Fast path. Check if the entry exists using the in-memory state.
let tti = TenantTimelineId::new(tenant_id, timeline_id);
if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
self.read_miss_count.inc();
return None;
}
@@ -163,41 +167,9 @@ impl BasebackupCache {
self.data_dir
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
}
/// The background task that does the job to prepare basebackups
/// and manage the cache entries on disk.
/// It is a separate struct from BasebackupCache to allow holding
/// a mutable reference to this state without a mutex lock,
/// while BasebackupCache is referenced by the clients.
struct BackgroundTask {
c: Arc<BasebackupCache>,
config: BasebackupCacheConfig,
tenant_manager: Arc<TenantManager>,
cancel: CancellationToken,
/// Number of the entries in the cache.
/// This counter is used for metrics and applying cache limits.
/// It generally should be equal to c.entries.len(), but it's calculated
/// pessimistically for abnormal situations: if we encountered some errors
/// during removing the entry from disk, we won't decrement this counter to
/// make sure that we don't exceed the limit with "trashed" files on the disk.
/// It will also count files in the data_dir that are not valid cache entries.
entry_count: usize,
/// Total size of all the entries on the disk.
/// This counter is used for metrics and applying cache limits.
/// Similar to entry_count, it is calculated pessimistically for abnormal situations.
total_size_bytes: u64,
prepare_ok_count: GenericCounter<AtomicU64>,
prepare_skip_count: GenericCounter<AtomicU64>,
prepare_err_count: GenericCounter<AtomicU64>,
}
impl BackgroundTask {
fn tmp_dir(&self) -> Utf8PathBuf {
self.c.data_dir.join("tmp")
self.data_dir.join("tmp")
}
fn entry_tmp_path(
@@ -207,7 +179,7 @@ impl BackgroundTask {
lsn: Lsn,
) -> Utf8PathBuf {
self.tmp_dir()
.join(BasebackupCache::entry_filename(tenant_id, timeline_id, lsn))
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
@@ -236,11 +208,11 @@ impl BackgroundTask {
Ok(())
}
async fn cleanup(&mut self) -> anyhow::Result<()> {
async fn cleanup(&self) -> anyhow::Result<()> {
self.clean_tmp_dir().await?;
// Leave only up-to-date entries.
let entries_old = self.c.entries.lock().unwrap().clone();
// Remove outdated entries.
let entries_old = self.entries.lock().unwrap().clone();
let mut entries_new = HashMap::new();
for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
if !tenant_shard_id.is_shard_zero() {
@@ -253,32 +225,31 @@ impl BackgroundTask {
for timeline in tenant.list_timelines() {
let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
if let Some(entry) = entries_old.get(&tti) {
if timeline.get_last_record_lsn() <= entry.lsn {
entries_new.insert(tti, entry.clone());
if let Some(&entry_lsn) = entries_old.get(&tti) {
if timeline.get_last_record_lsn() <= entry_lsn {
entries_new.insert(tti, entry_lsn);
}
}
}
}
// Try to remove all entries that are not up-to-date.
for (&tti, entry) in entries_old.iter() {
for (&tti, &lsn) in entries_old.iter() {
if !entries_new.contains_key(&tti) {
self.try_remove_entry(tti.tenant_id, tti.timeline_id, entry)
.await;
self.remove_entry_sender
.send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
.unwrap();
}
}
// Note: BackgroundTask is the only writer for self.c.entries,
// so it couldn't have been modified concurrently.
*self.c.entries.lock().unwrap() = entries_new;
BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
*self.entries.lock().unwrap() = entries_new;
Ok(())
}
async fn on_startup(&mut self) -> anyhow::Result<()> {
async fn on_startup(&self) -> anyhow::Result<()> {
// Create data_dir if it does not exist.
tokio::fs::create_dir_all(&self.c.data_dir)
tokio::fs::create_dir_all(&self.data_dir)
.await
.context("Failed to create basebackup cache data directory")?;
@@ -287,8 +258,8 @@ impl BackgroundTask {
.context("Failed to clean tmp directory")?;
// Read existing entries from the data_dir and add them to in-memory state.
let mut entries = HashMap::<TenantTimelineId, CacheEntry>::new();
let mut dir = tokio::fs::read_dir(&self.c.data_dir).await?;
let mut entries = HashMap::new();
let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
while let Some(dir_entry) = dir.next_entry().await? {
let filename = dir_entry.file_name();
@@ -297,43 +268,33 @@ impl BackgroundTask {
continue;
}
let size_bytes = dir_entry
.metadata()
.await
.map_err(|e| {
anyhow::anyhow!("Failed to read metadata for file {:?}: {:?}", filename, e)
})?
.len();
self.entry_count += 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes += size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
let Some((tenant_id, timeline_id, lsn)) = parsed else {
tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
continue;
};
let cur_entry = CacheEntry { lsn, size_bytes };
let tti = TenantTimelineId::new(tenant_id, timeline_id);
use std::collections::hash_map::Entry::*;
match entries.entry(tti) {
Occupied(mut entry) => {
let found_entry = entry.get();
let entry_lsn = *entry.get();
// Leave only the latest entry, remove the old one.
if cur_entry.lsn < found_entry.lsn {
self.try_remove_entry(tenant_id, timeline_id, &cur_entry)
.await;
} else if cur_entry.lsn > found_entry.lsn {
self.try_remove_entry(tenant_id, timeline_id, found_entry)
.await;
entry.insert(cur_entry);
if lsn < entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
lsn,
))?;
} else if lsn > entry_lsn {
self.remove_entry_sender.send(self.entry_path(
tenant_id,
timeline_id,
entry_lsn,
))?;
entry.insert(lsn);
} else {
// Two different filenames parsed to the same timline_id and LSN.
// Should never happen.
@@ -344,17 +305,22 @@ impl BackgroundTask {
}
}
Vacant(entry) => {
entry.insert(cur_entry);
entry.insert(lsn);
}
}
}
*self.c.entries.lock().unwrap() = entries;
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
*self.entries.lock().unwrap() = entries;
Ok(())
}
async fn run(mut self, mut prepare_receiver: BasebackupPrepareReceiver) {
async fn background(
self: Arc<Self>,
mut prepare_receiver: BasebackupPrepareReceiver,
mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
) {
// Panic in the background is a safe fallback.
// It will drop receivers and the cache will be effectively disabled.
self.on_startup()
@@ -377,6 +343,11 @@ impl BackgroundTask {
continue;
}
}
Some(req) = remove_entry_receiver.recv() => {
if let Err(e) = tokio::fs::remove_file(req).await {
tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
}
}
_ = cleanup_ticker.tick() => {
self.cleanup().await.unwrap_or_else(|e| {
tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
@@ -390,67 +361,6 @@ impl BackgroundTask {
}
}
/// Try to remove an entry from disk.
/// The caller is responsible for removing the entry from the in-memory state.
/// Updates size counters and corresponding metrics.
/// Ignores the filesystem errors as not-so-important, but the size counters
/// are not decremented in this case, so the file will continue to be counted
/// towards the size limits.
async fn try_remove_entry(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
entry: &CacheEntry,
) {
let entry_path = self.c.entry_path(tenant_id, timeline_id, entry.lsn);
match tokio::fs::remove_file(&entry_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::warn!(
"Failed to remove basebackup cache file for tenant {} timeline {} LSN {}: {:#}",
tenant_id,
timeline_id,
entry.lsn,
e
);
return;
}
}
self.entry_count -= 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes -= entry.size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
}
/// Insert the cache entry into in-memory state and update the size counters.
/// Assumes that the file for the entry already exists on disk.
/// If the entry already exists with previous LSN, it will be removed.
async fn upsert_entry(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
entry: CacheEntry,
) {
let tti = TenantTimelineId::new(tenant_id, timeline_id);
self.entry_count += 1;
BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
self.total_size_bytes += entry.size_bytes;
BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
let old_entry = self.c.entries.lock().unwrap().insert(tti, entry);
if let Some(old_entry) = old_entry {
self.try_remove_entry(tenant_id, timeline_id, &old_entry)
.await;
}
}
/// Prepare a basebackup for the given timeline.
///
/// If the basebackup already exists with a higher LSN or the timeline already
@@ -459,7 +369,7 @@ impl BackgroundTask {
/// The basebackup is prepared in a temporary directory and then moved to the final
/// location to make the operation atomic.
async fn prepare_basebackup(
&mut self,
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
req_lsn: Lsn,
@@ -473,44 +383,30 @@ impl BackgroundTask {
let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
// TODO(diko): I don't think we will hit the limit,
// but if we do, it makes sense to try to evict oldest entries. here
if self.entry_count >= self.config.max_size_entries {
tracing::info!(
%tenant_shard_id,
%timeline_id,
%req_lsn,
"Basebackup cache is full (max_size_entries), skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
if self.total_size_bytes >= self.config.max_total_size_bytes {
tracing::info!(
%tenant_shard_id,
%timeline_id,
%req_lsn,
"Basebackup cache is full (max_total_size_bytes), skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
{
let entries = self.c.entries.lock().unwrap();
if let Some(entry) = entries.get(&tti) {
if entry.lsn >= req_lsn {
let entries = self.entries.lock().unwrap();
if let Some(&entry_lsn) = entries.get(&tti) {
if entry_lsn >= req_lsn {
tracing::info!(
%timeline_id,
%req_lsn,
%entry.lsn,
%entry_lsn,
"Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
if entries.len() as i64 >= self.config.max_size_entries {
tracing::info!(
%timeline_id,
%req_lsn,
"Basebackup cache is full, skipping basebackup",
);
self.prepare_skip_count.inc();
return Ok(());
}
}
let tenant = self
@@ -546,21 +442,18 @@ impl BackgroundTask {
.prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
.await;
let entry = match res {
Ok(entry) => entry,
Err(err) => {
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
match tokio::fs::remove_file(&entry_tmp_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
}
if let Err(err) = res {
tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
// Try to clean up tmp file. If we fail, the background clean up task will take care of it.
match tokio::fs::remove_file(&entry_tmp_path).await {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
}
return Err(err);
}
};
return Err(err);
}
// Move the tmp file to the final location atomically.
// The tmp file is fsynced, so it's guaranteed that we will not have a partial file
@@ -568,13 +461,17 @@ impl BackgroundTask {
// It's not necessary to fsync the inode after renaming, because the worst case is that
// the rename operation will be rolled back on the disk failure, the entry will disappear
// from the main directory, and the entry access will cause a cache miss.
let entry_path = self
.c
.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
self.upsert_entry(tenant_shard_id.tenant_id, timeline_id, entry)
.await;
let mut entries = self.entries.lock().unwrap();
if let Some(old_lsn) = entries.insert(tti, req_lsn) {
// Remove the old entry if it exists.
self.remove_entry_sender
.send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
.unwrap();
}
BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
self.prepare_ok_count.inc();
Ok(())
@@ -587,7 +484,7 @@ impl BackgroundTask {
entry_tmp_path: &Utf8Path,
timeline: &Arc<Timeline>,
req_lsn: Lsn,
) -> anyhow::Result<CacheEntry> {
) -> anyhow::Result<()> {
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
let ctx = ctx.with_scope_timeline(timeline);
@@ -627,12 +524,6 @@ impl BackgroundTask {
writer.flush().await?;
writer.into_inner().sync_all().await?;
// TODO(diko): we can count it via Writer wrapper instead of a syscall.
let size_bytes = tokio::fs::metadata(entry_tmp_path).await?.len();
Ok(CacheEntry {
lsn: req_lsn,
size_bytes,
})
Ok(())
}
}

View File

@@ -159,7 +159,14 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
Ok(m) => {
// Since we run one time at startup, be generous in our logging and
// dump all metadata.
tracing::info!("Loaded node metadata: {m}");
tracing::info!(
"Loaded node metadata: postgres {}:{}, http {}:{}, other fields: {:?}",
m.postgres_host,
m.postgres_port,
m.http_host,
m.http_port,
m.other
);
let az_id = {
let az_id_from_metadata = m
@@ -188,8 +195,6 @@ impl StorageControllerUpcallApi for StorageControllerUpcallClient {
node_id: conf.id,
listen_pg_addr: m.postgres_host,
listen_pg_port: m.postgres_port,
listen_grpc_addr: m.grpc_host,
listen_grpc_port: m.grpc_port,
listen_http_addr: m.http_host,
listen_http_port: m.http_port,
listen_https_port: m.https_port,

View File

@@ -4428,16 +4428,18 @@ pub(crate) static BASEBACKUP_CACHE_PREPARE: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
pub(crate) static BASEBACKUP_CACHE_ENTRIES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_basebackup_cache_entries_total",
"Number of entries in the basebackup cache"
)
.expect("failed to define a metric")
});
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
// FIXME: Support basebackup cache size metrics.
#[allow(dead_code)]
pub(crate) static BASEBACKUP_CACHE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"pageserver_basebackup_cache_size_bytes",
"Total size of all basebackup cache entries on disk in bytes"
)

View File

@@ -14,7 +14,7 @@ use std::{io, str};
use anyhow::{Context as _, anyhow, bail};
use async_compression::tokio::write::GzipEncoder;
use bytes::{Buf as _, BufMut as _, BytesMut};
use bytes::{Buf, BytesMut};
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
@@ -623,6 +623,60 @@ enum PageStreamError {
BadRequest(Cow<'static, str>),
}
impl PageStreamError {
/// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status
/// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a
/// convenience method for use from a get_pages gRPC stream.
#[allow(clippy::result_large_err)]
fn into_get_page_response(
self,
request_id: page_api::RequestID,
) -> Result<proto::GetPageResponse, tonic::Status> {
use page_api::GetPageStatusCode;
use tonic::Code;
// We dispatch to Into<tonic::Status> first, and then map it to a GetPageResponse.
let status: tonic::Status = self.into();
let status_code = match status.code() {
// We shouldn't see an OK status here, because we're emitting an error.
Code::Ok => {
debug_assert_ne!(status.code(), Code::Ok);
return Err(tonic::Status::internal(format!(
"unexpected OK status: {status:?}",
)));
}
// These are per-request errors, returned as GetPageResponses.
Code::AlreadyExists => GetPageStatusCode::InvalidRequest,
Code::DataLoss => GetPageStatusCode::InternalError,
Code::FailedPrecondition => GetPageStatusCode::InvalidRequest,
Code::InvalidArgument => GetPageStatusCode::InvalidRequest,
Code::Internal => GetPageStatusCode::InternalError,
Code::NotFound => GetPageStatusCode::NotFound,
Code::OutOfRange => GetPageStatusCode::InvalidRequest,
Code::ResourceExhausted => GetPageStatusCode::SlowDown,
// These should terminate the stream.
Code::Aborted => return Err(status),
Code::Cancelled => return Err(status),
Code::DeadlineExceeded => return Err(status),
Code::PermissionDenied => return Err(status),
Code::Unauthenticated => return Err(status),
Code::Unavailable => return Err(status),
Code::Unimplemented => return Err(status),
Code::Unknown => return Err(status),
};
Ok(page_api::GetPageResponse {
request_id,
status_code,
reason: Some(status.message().to_string()),
page_images: Vec::new(),
}
.into())
}
}
impl From<PageStreamError> for tonic::Status {
fn from(err: PageStreamError) -> Self {
use tonic::Code;
@@ -3384,8 +3438,8 @@ impl GrpcPageServiceHandler {
/// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
///
/// NB: errors returned from here are intercepted in get_pages(), and may be converted to a
/// GetPageResponse with an appropriate status code to avoid terminating the stream.
/// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
/// with an appropriate status code instead.
///
/// TODO: get_vectored() currently enforces a batch limit of 32. Postgres will typically send
/// batches up to effective_io_concurrency = 100. Either we have to accept large batches, or
@@ -3402,7 +3456,7 @@ impl GrpcPageServiceHandler {
let ctx = ctx.with_scope_page_service_pagestream(&timeline);
// Validate the request, decorate the span, and convert it to a Pagestream request.
let req = page_api::GetPageRequest::try_from(req)?;
let req: page_api::GetPageRequest = req.try_into()?;
span_record!(
req_id = %req.request_id,
@@ -3413,7 +3467,7 @@ impl GrpcPageServiceHandler {
);
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); // hold guard
let effective_lsn = PageServerHandler::effective_request_lsn(
let effective_lsn = match PageServerHandler::effective_request_lsn(
&timeline,
timeline.get_last_record_lsn(),
req.read_lsn.request_lsn,
@@ -3421,7 +3475,10 @@ impl GrpcPageServiceHandler {
.not_modified_since_lsn
.unwrap_or(req.read_lsn.request_lsn),
&latest_gc_cutoff_lsn,
)?;
) {
Ok(lsn) => lsn,
Err(err) => return err.into_get_page_response(req.request_id),
};
let mut batch = SmallVec::with_capacity(req.block_numbers.len());
for blkno in req.block_numbers {
@@ -3478,7 +3535,7 @@ impl GrpcPageServiceHandler {
"unexpected response: {resp:?}"
)));
}
Err(err) => return Err(err.err.into()),
Err(err) => return err.err.into_get_page_response(req.request_id),
};
}
@@ -3544,44 +3601,42 @@ impl proto::PageService for GrpcPageServiceHandler {
let timeline = self.get_request_timeline(&req).await?;
let ctx = self.ctx.with_scope_timeline(&timeline);
// Validate the request and decorate the span.
// Validate the request, decorate the span, and wait for the LSN to arrive.
//
// TODO: this requires a read LSN, is that ok?
Self::ensure_shard_zero(&timeline)?;
if timeline.is_archived() == Some(true) {
return Err(tonic::Status::failed_precondition("timeline is archived"));
}
let req: page_api::GetBaseBackupRequest = req.into_inner().into();
let req: page_api::GetBaseBackupRequest = req.into_inner().try_into()?;
span_record!(lsn=?req.lsn);
span_record!(lsn=%req.read_lsn);
// Wait for the LSN to arrive, if given.
if let Some(lsn) = req.lsn {
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
timeline
.wait_lsn(
lsn,
WaitLsnWaiter::PageService,
WaitLsnTimeout::Default,
&ctx,
)
.await?;
timeline
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
.map_err(|err| {
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
})?;
}
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
timeline
.wait_lsn(
req.read_lsn.request_lsn,
WaitLsnWaiter::PageService,
WaitLsnTimeout::Default,
&ctx,
)
.await?;
timeline
.check_lsn_is_in_scope(req.read_lsn.request_lsn, &latest_gc_cutoff_lsn)
.map_err(|err| {
tonic::Status::invalid_argument(format!("invalid basebackup LSN: {err}"))
})?;
// Spawn a task to run the basebackup.
//
// TODO: do we need to support full base backups, for debugging? This also requires passing
// the prev_lsn parameter.
// TODO: do we need to support full base backups, for debugging?
let span = Span::current();
let (mut simplex_read, mut simplex_write) = tokio::io::simplex(CHUNK_SIZE);
let jh = tokio::spawn(async move {
let result = basebackup::send_basebackup_tarball(
&mut simplex_write,
&timeline,
req.lsn,
Some(req.read_lsn.request_lsn),
None,
false,
req.replica,
@@ -3597,21 +3652,20 @@ impl proto::PageService for GrpcPageServiceHandler {
// Emit chunks of size CHUNK_SIZE.
let chunks = async_stream::try_stream! {
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE);
loop {
let mut chunk = BytesMut::with_capacity(CHUNK_SIZE).limit(CHUNK_SIZE);
loop {
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
})?;
if n == 0 {
break; // full chunk or closed stream
let n = simplex_read.read_buf(&mut chunk).await.map_err(|err| {
tonic::Status::internal(format!("failed to read basebackup chunk: {err}"))
})?;
// If we read 0 bytes, either the chunk is full or the stream is closed.
if n == 0 {
if chunk.is_empty() {
break;
}
yield proto::GetBaseBackupResponseChunk::from(chunk.clone().freeze());
chunk.clear();
}
let chunk = chunk.into_inner().freeze();
if chunk.is_empty() {
break;
}
yield proto::GetBaseBackupResponseChunk::from(chunk);
}
// Wait for the basebackup task to exit and check for errors.
jh.await.map_err(|err| {
@@ -3688,16 +3742,9 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?
.downgrade();
while let Some(req) = reqs.message().await? {
let req_id = req.request_id;
let result = Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
.instrument(span.clone()) // propagate request span
.await;
yield match result {
Ok(resp) => resp,
// Convert per-request errors to GetPageResponses as appropriate, or terminate
// the stream with a tonic::Status.
Err(err) => page_api::GetPageResponse::try_from_status(err, req_id)?.into(),
}
.await?
}
};

View File

@@ -21,7 +21,7 @@ OBJS = \
unstable_extensions.o \
walproposer.o \
walproposer_pg.o \
neon_ddl_handler.o \
control_plane_connector.o \
walsender_hooks.o
PG_CPPFLAGS = -I$(libpq_srcdir)

View File

@@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* neon_ddl_handler.c
* control_plane_connector.c
* Captures updates to roles/databases using ProcessUtility_hook and
* sends them to the control ProcessUtility_hook. The changes are sent
* via HTTP to the URL specified by the GUC neon.console_url when the
@@ -13,30 +13,18 @@
* accumulate changes. On subtransaction commit, the top of the stack
* is merged with the table below it.
*
* Support event triggers for neon_superuser
*
* IDENTIFICATION
* contrib/neon/neon_dll_handler.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <curl/curl.h>
#include <unistd.h>
#include "access/xact.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_proc.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/user.h"
#include "fmgr.h"
#include "libpq/crypt.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "parser/parse_func.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/acl.h"
@@ -44,16 +32,11 @@
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/jsonb.h"
#include <utils/lsyscache.h>
#include <utils/syscache.h>
#include "neon_ddl_handler.h"
#include "control_plane_connector.h"
#include "neon_utils.h"
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
static fmgr_hook_type next_fmgr_hook = NULL;
static needs_fmgr_hook_type next_needs_fmgr_hook = NULL;
static bool neon_event_triggers = true;
static const char *jwt_token = NULL;
@@ -790,7 +773,6 @@ HandleDropRole(DropRoleStmt *stmt)
}
}
static void
HandleRename(RenameStmt *stmt)
{
@@ -800,460 +782,6 @@ HandleRename(RenameStmt *stmt)
return HandleRoleRename(stmt);
}
/*
* Support for Event Triggers.
*
* In vanilla only superuser can create Event Triggers.
*
* We allow it for neon_superuser by temporary switching to superuser. But as
* far as event trigger can fire in superuser context we should protect
* superuser from execution of arbitrary user's code.
*
* The idea was taken from Supabase PR series starting at
* https://github.com/supabase/supautils/pull/98
*/
static bool
neon_needs_fmgr_hook(Oid functionId) {
return (next_needs_fmgr_hook && (*next_needs_fmgr_hook) (functionId))
|| get_func_rettype(functionId) == EVENT_TRIGGEROID;
}
static void
LookupFuncOwnerSecDef(Oid functionId, Oid *funcOwner, bool *is_secdef)
{
Form_pg_proc procForm;
HeapTuple proc_tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionId));
if (!HeapTupleIsValid(proc_tup))
ereport(ERROR,
(errmsg("cache lookup failed for function %u", functionId)));
procForm = (Form_pg_proc) GETSTRUCT(proc_tup);
*funcOwner = procForm->proowner;
*is_secdef = procForm->prosecdef;
ReleaseSysCache(proc_tup);
}
PG_FUNCTION_INFO_V1(noop);
Datum noop(__attribute__ ((unused)) PG_FUNCTION_ARGS) { PG_RETURN_VOID();}
static void
force_noop(FmgrInfo *finfo)
{
finfo->fn_addr = (PGFunction) noop;
finfo->fn_oid = InvalidOid; /* not a known function OID anymore */
finfo->fn_nargs = 0; /* no arguments for noop */
finfo->fn_strict = false;
finfo->fn_retset = false;
finfo->fn_stats = 0; /* no stats collection */
finfo->fn_extra = NULL; /* clear out old context data */
finfo->fn_mcxt = CurrentMemoryContext;
finfo->fn_expr = NULL; /* no parse tree */
}
/*
* Skip executing Event Triggers execution for superusers, because Event
* Triggers are SECURITY DEFINER and user provided code could then attempt
* privilege escalation.
*
* Also skip executing Event Triggers when GUC neon.event_triggers has been
* set to false. This might be necessary to be able to connect again after a
* LOGIN Event Trigger has been installed that would prevent connections as
* neon_superuser.
*/
static void
neon_fmgr_hook(FmgrHookEventType event, FmgrInfo *flinfo, Datum *private)
{
/*
* It can be other needs_fmgr_hook which cause our hook to be invoked for
* non-trigger function, so recheck that is is trigger function.
*/
if (flinfo->fn_oid != InvalidOid &&
get_func_rettype(flinfo->fn_oid) != EVENT_TRIGGEROID)
{
if (next_fmgr_hook)
(*next_fmgr_hook) (event, flinfo, private);
return;
}
/*
* The neon_superuser role can use the GUC neon.event_triggers to disable
* firing Event Trigger.
*
* SET neon.event_triggers TO false;
*
* This only applies to the neon_superuser role though, and only allows
* skipping Event Triggers owned by neon_superuser, which we check by
* proxy of the Event Trigger function being owned by neon_superuser.
*
* A role that is created in role neon_superuser should be allowed to also
* benefit from the neon_event_triggers GUC, and will be considered the
* same as the neon_superuser role.
*/
if (event == FHET_START
&& !neon_event_triggers
&& is_neon_superuser())
{
Oid neon_superuser_oid = get_role_oid("neon_superuser", false);
/* Find the Function Attributes (owner Oid, security definer) */
const char *fun_owner_name = NULL;
Oid fun_owner = InvalidOid;
bool fun_is_secdef = false;
LookupFuncOwnerSecDef(flinfo->fn_oid, &fun_owner, &fun_is_secdef);
fun_owner_name = GetUserNameFromId(fun_owner, false);
if (RoleIsNeonSuperuser(fun_owner_name)
|| has_privs_of_role(fun_owner, neon_superuser_oid))
{
elog(WARNING,
"Skipping Event Trigger: neon.event_triggers is false");
/*
* we can't skip execution directly inside the fmgr_hook so instead we
* change the event trigger function to a noop function.
*/
force_noop(flinfo);
}
}
/*
* Fire Event Trigger if both function owner and current user are
* superuser, or none of them are.
*/
else if (event == FHET_START
/* still enable it to pass pg_regress tests */
&& !RegressTestMode)
{
/*
* Get the current user oid as of before SECURITY DEFINER change of
* CurrentUserId, and that would be SessionUserId.
*/
Oid current_role_oid = GetSessionUserId();
bool role_is_super = superuser_arg(current_role_oid);
/* Find the Function Attributes (owner Oid, security definer) */
Oid function_owner = InvalidOid;
bool function_is_secdef = false;
bool function_is_owned_by_super = false;
LookupFuncOwnerSecDef(flinfo->fn_oid, &function_owner, &function_is_secdef);
function_is_owned_by_super = superuser_arg(function_owner);
/*
* 1. Refuse to run SECURITY DEFINER function that belongs to a
* superuser when the current user is not a superuser itself.
*/
if (!role_is_super
&& function_is_owned_by_super
&& function_is_secdef)
{
char *func_name = get_func_name(flinfo->fn_oid);
ereport(WARNING,
(errmsg("Skipping Event Trigger"),
errdetail("Event Trigger function \"%s\" is owned by \"%s\" "
"and is SECURITY DEFINER",
func_name,
GetUserNameFromId(function_owner, false))));
/*
* we can't skip execution directly inside the fmgr_hook so
* instead we change the event trigger function to a noop
* function.
*/
force_noop(flinfo);
}
/*
* 2. Refuse to run functions that belongs to a non-superuser when the
* current user is a superuser.
*
* We could run a SECURITY DEFINER user-function here and be safe with
* privilege escalation risks, but superuser roles are only used for
* infrastructure maintenance operations, where we prefer to skip
* running user-defined code.
*/
else if (role_is_super && !function_is_owned_by_super)
{
char *func_name = get_func_name(flinfo->fn_oid);
ereport(WARNING,
(errmsg("Skipping Event Trigger"),
errdetail("Event Trigger function \"%s\" "
"is owned by non-superuser role \"%s\", "
"and current_user \"%s\" is superuser",
func_name,
GetUserNameFromId(function_owner, false),
GetUserNameFromId(current_role_oid, false))));
/*
* we can't skip execution directly inside the fmgr_hook so
* instead we change the event trigger function to a noop
* function.
*/
force_noop(flinfo);
}
}
if (next_fmgr_hook)
(*next_fmgr_hook) (event, flinfo, private);
}
static Oid prev_role_oid = 0;
static int prev_role_sec_context = 0;
static bool switched_to_superuser = false;
/*
* Switch tp superuser if not yet superuser.
* Returns false if already switched to superuser.
*/
static bool
switch_to_superuser(void)
{
Oid superuser_oid;
if (switched_to_superuser)
return false;
switched_to_superuser = true;
superuser_oid = get_role_oid("cloud_admin", true /*missing_ok*/);
if (superuser_oid == InvalidOid)
superuser_oid = BOOTSTRAP_SUPERUSERID;
GetUserIdAndSecContext(&prev_role_oid, &prev_role_sec_context);
SetUserIdAndSecContext(superuser_oid, prev_role_sec_context |
SECURITY_LOCAL_USERID_CHANGE |
SECURITY_RESTRICTED_OPERATION);
return true;
}
static void
switch_to_original_role(void)
{
SetUserIdAndSecContext(prev_role_oid, prev_role_sec_context);
switched_to_superuser = false;
}
/*
* ALTER ROLE ... SUPERUSER;
*
* Used internally to give superuser to a non-privileged role to allow
* ownership of superuser-only objects such as Event Trigger.
*
* ALTER ROLE foo SUPERUSER;
* ALTER EVENT TRIGGER ... OWNED BY foo;
* ALTER ROLE foo NOSUPERUSER;
*
* Now the EVENT TRIGGER is owned by foo, who can DROP it without having to be
* superuser again.
*/
static void
alter_role_super(const char* rolename, bool make_super)
{
AlterRoleStmt *alter_stmt = makeNode(AlterRoleStmt);
DefElem *defel_superuser =
#if PG_MAJORVERSION_NUM <= 14
makeDefElem("superuser", (Node *) makeInteger(make_super), -1);
#else
makeDefElem("superuser", (Node *) makeBoolean(make_super), -1);
#endif
RoleSpec *rolespec = makeNode(RoleSpec);
rolespec->roletype = ROLESPEC_CSTRING;
rolespec->rolename = pstrdup(rolename);
rolespec->location = -1;
alter_stmt->role = rolespec;
alter_stmt->options = list_make1(defel_superuser);
#if PG_MAJORVERSION_NUM < 15
AlterRole(alter_stmt);
#else
/* ParseState *pstate, AlterRoleStmt *stmt */
AlterRole(NULL, alter_stmt);
#endif
CommandCounterIncrement();
}
/*
* Changes the OWNER of an Event Trigger.
*
* Event Triggers can only be owned by superusers, so this ALTER ROLE with
* SUPERUSER and then removes the property.
*/
static void
alter_event_trigger_owner(const char *obj_name, Oid role_oid)
{
char* role_name = GetUserNameFromId(role_oid, false);
alter_role_super(role_name, true);
AlterEventTriggerOwner(obj_name, role_oid);
CommandCounterIncrement();
alter_role_super(role_name, false);
}
/*
* Neon processing of the CREATE EVENT TRIGGER requires special attention and
* is worth having its own ProcessUtility_hook for that.
*/
static void
ProcessCreateEventTrigger(
PlannedStmt *pstmt,
const char *queryString,
bool readOnlyTree,
ProcessUtilityContext context,
ParamListInfo params,
QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletion *qc)
{
Node *parseTree = pstmt->utilityStmt;
bool sudo = false;
/* We double-check that after local variable declaration block */
CreateEventTrigStmt *stmt = (CreateEventTrigStmt *) parseTree;
/*
* We are going to change the current user privileges (sudo) and might
* need after execution cleanup. For that we want to capture the UserId
* before changing it for our sudo implementation.
*/
const Oid current_user_id = GetUserId();
bool current_user_is_super = superuser_arg(current_user_id);
if (nodeTag(parseTree) != T_CreateEventTrigStmt)
{
ereport(ERROR,
errcode(ERRCODE_INTERNAL_ERROR),
errmsg("ProcessCreateEventTrigger called for the wrong command"));
}
/*
* Allow neon_superuser to create Event Trigger, while keeping the
* ownership of the object.
*
* For that we give superuser membership to the role for the execution of
* the command.
*/
if (IsTransactionState() && is_neon_superuser())
{
/* Find the Event Trigger function Oid */
Oid func_oid = LookupFuncName(stmt->funcname, 0, NULL, false);
/* Find the Function Owner Oid */
Oid func_owner = InvalidOid;
bool is_secdef = false;
bool function_is_owned_by_super = false;
LookupFuncOwnerSecDef(func_oid, &func_owner, &is_secdef);
function_is_owned_by_super = superuser_arg(func_owner);
if(!current_user_is_super && function_is_owned_by_super)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("Permission denied to execute "
"a function owned by a superuser role"),
errdetail("current user \"%s\" is not a superuser "
"and Event Trigger function \"%s\" "
"is owned by a superuser",
GetUserNameFromId(current_user_id, false),
NameListToString(stmt->funcname))));
}
if(current_user_is_super && !function_is_owned_by_super)
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("Permission denied to execute "
"a function owned by a non-superuser role"),
errdetail("current user \"%s\" is a superuser "
"and function \"%s\" is "
"owned by a non-superuser",
GetUserNameFromId(current_user_id, false),
NameListToString(stmt->funcname))));
}
sudo = switch_to_superuser();
}
PG_TRY();
{
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
/*
* Now that the Event Trigger has been installed via our sudo
* mechanism, if the original role was not a superuser then change
* the event trigger ownership back to the original role.
*
* That way [ ALTER | DROP ] EVENT TRIGGER commands just work.
*/
if (IsTransactionState() && is_neon_superuser())
{
if (!current_user_is_super)
{
/*
* Change event trigger owner to the current role (making
* it a privileged role during the ALTER OWNER command).
*/
alter_event_trigger_owner(stmt->trigname, current_user_id);
}
}
}
PG_FINALLY();
{
if (sudo)
switch_to_original_role();
}
PG_END_TRY();
}
/*
* Neon hooks for DDLs (handling privileges, limiting features, etc).
*/
static void
NeonProcessUtility(
PlannedStmt *pstmt,
@@ -1267,27 +795,6 @@ NeonProcessUtility(
{
Node *parseTree = pstmt->utilityStmt;
/*
* The process utility hook for CREATE EVENT TRIGGER is its own
* implementation and warrant being addressed separately from here.
*/
if (nodeTag(parseTree) == T_CreateEventTrigStmt)
{
ProcessCreateEventTrigger(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
return;
}
/*
* Other commands that need Neon specific implementations are handled here:
*/
switch (nodeTag(parseTree))
{
case T_CreatedbStmt:
@@ -1326,82 +833,37 @@ NeonProcessUtility(
if (PreviousProcessUtilityHook)
{
PreviousProcessUtilityHook(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
else
{
standard_ProcessUtility(
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
pstmt,
queryString,
readOnlyTree,
context,
params,
queryEnv,
dest,
qc);
}
}
/*
* Only neon_superuser is granted privilege to edit neon.event_triggers GUC.
*/
static void
neon_event_triggers_assign_hook(bool newval, void *extra)
{
/* MyDatabaseId == InvalidOid || !OidIsValid(GetUserId()) */
if (IsTransactionState() && !is_neon_superuser())
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied to set neon.event_triggers"),
errdetail("Only \"neon_superuser\" is allowed to set the GUC")));
}
}
void
InitDDLHandler()
InitControlPlaneConnector()
{
PreviousProcessUtilityHook = ProcessUtility_hook;
ProcessUtility_hook = NeonProcessUtility;
next_needs_fmgr_hook = needs_fmgr_hook;
needs_fmgr_hook = neon_needs_fmgr_hook;
next_fmgr_hook = fmgr_hook;
fmgr_hook = neon_fmgr_hook;
RegisterXactCallback(NeonXactCallback, NULL);
RegisterSubXactCallback(NeonSubXactCallback, NULL);
/*
* The GUC neon.event_triggers should provide the same effect as the
* Postgres GUC event_triggers, but the neon one is PGC_USERSET.
*
* This allows using the GUC in the connection string and work out of a
* LOGIN Event Trigger that would break database access, all without
* having to edit and reload the Postgres configuration file.
*/
DefineCustomBoolVariable(
"neon.event_triggers",
"Enable firing of event triggers",
NULL,
&neon_event_triggers,
true,
PGC_USERSET,
0,
NULL,
neon_event_triggers_assign_hook,
NULL);
DefineCustomStringVariable(
"neon.console_url",
"URL of the Neon Console, which will be forwarded changes to dbs and roles",

View File

@@ -0,0 +1,6 @@
#ifndef CONTROL_PLANE_CONNECTOR_H
#define CONTROL_PLANE_CONNECTOR_H
void InitControlPlaneConnector(void);
#endif

View File

@@ -33,9 +33,9 @@
#include "extension_server.h"
#include "file_cache.h"
#include "neon.h"
#include "neon_ddl_handler.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "control_plane_connector.h"
#include "logical_replication_monitor.h"
#include "unstable_extensions.h"
#include "walsender_hooks.h"
@@ -454,7 +454,7 @@ _PG_init(void)
InitUnstableExtensionsSupport();
InitLogicalReplicationMonitor();
InitDDLHandler();
InitControlPlaneConnector();
pg_init_extension_server();

View File

@@ -1,6 +0,0 @@
#ifndef CONTROL_DDL_HANDLER_H
#define CONTROL_DDL_HANDLER_H
void InitDDLHandler(void);
#endif

View File

@@ -1 +0,0 @@
ALTER TABLE nodes DROP listen_grpc_addr, listen_grpc_port;

View File

@@ -1 +0,0 @@
ALTER TABLE nodes ADD listen_grpc_addr VARCHAR NULL, ADD listen_grpc_port INTEGER NULL;

View File

@@ -5,11 +5,10 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus, PageserverProtocol};
use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use futures::StreamExt;
use hyper::StatusCode;
use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
use pageserver_api::controller_api::AvailabilityZone;
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
@@ -421,31 +420,23 @@ impl ComputeHook {
preferred_az: _preferred_az,
} = reconfigure_request;
let compute_pageservers = shards
.iter()
.map(|shard| {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(pg_host, pg_port.unwrap_or(5432))
})
.collect::<Vec<_>>();
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("Reconfiguring endpoint {endpoint_name}");
let pageservers = shards
.iter()
.map(|shard| {
let ps_conf = env
.get_pageserver_conf(shard.node_id)
.expect("Unknown pageserver");
if endpoint.grpc {
let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address");
let (host, port) = parse_host_port(addr).expect("invalid gRPC address");
let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
(PageserverProtocol::Grpc, host, port)
} else {
let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
.expect("Unable to parse listen_pg_addr");
(PageserverProtocol::Libpq, host, port.unwrap_or(5432))
}
})
.collect::<Vec<_>>();
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
endpoint
.reconfigure(pageservers, *stripe_size, None)
.reconfigure(compute_pageservers.clone(), *stripe_size, None)
.await
.map_err(NotifyError::NeonLocal)?;
}

View File

@@ -97,7 +97,7 @@ pub(crate) struct StorageControllerMetricGroup {
/// Count of HTTP requests to the safekeeper that resulted in an error,
/// broken down by the safekeeper node id, request name and method
pub(crate) storage_controller_safekeeper_request_error:
measured::CounterVec<SafekeeperRequestLabelGroupSet>,
measured::CounterVec<PageserverRequestLabelGroupSet>,
/// Latency of HTTP requests to the pageserver, broken down by pageserver
/// node id, request name and method. This include both successful and unsuccessful
@@ -111,7 +111,7 @@ pub(crate) struct StorageControllerMetricGroup {
/// requests.
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_safekeeper_request_latency:
measured::HistogramVec<SafekeeperRequestLabelGroupSet, 5>,
measured::HistogramVec<PageserverRequestLabelGroupSet, 5>,
/// Count of pass-through HTTP requests to the pageserver that resulted in an error,
/// broken down by the pageserver node id, request name and method
@@ -136,8 +136,7 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_leadership_status: measured::GaugeVec<LeadershipStatusGroupSet>,
/// Indicator of stucked (long-running) reconciles, broken down by tenant, shard and sequence.
/// The metric is automatically removed once the reconciliation completes.
/// HTTP request status counters for handled requests
pub(crate) storage_controller_reconcile_long_running:
measured::CounterVec<ReconcileLongRunningLabelGroupSet>,
@@ -219,16 +218,6 @@ pub(crate) struct PageserverRequestLabelGroup<'a> {
pub(crate) method: Method,
}
#[derive(measured::LabelGroup, Clone)]
#[label(set = SafekeeperRequestLabelGroupSet)]
pub(crate) struct SafekeeperRequestLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) safekeeper_id: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) path: &'a str,
pub(crate) method: Method,
}
#[derive(measured::LabelGroup)]
#[label(set = DatabaseQueryErrorLabelGroupSet)]
pub(crate) struct DatabaseQueryErrorLabelGroup {

View File

@@ -37,8 +37,6 @@ pub(crate) struct Node {
listen_pg_addr: String,
listen_pg_port: u16,
listen_grpc_addr: Option<String>,
listen_grpc_port: Option<u16>,
availability_zone_id: AvailabilityZone,
@@ -102,8 +100,8 @@ impl Node {
self.id == register_req.node_id
&& self.listen_http_addr == register_req.listen_http_addr
&& self.listen_http_port == register_req.listen_http_port
// Note: HTTPS and gRPC addresses may change, to allow for migrations. See
// [`Self::need_update`] for more details.
// Note: listen_https_port may change. See [`Self::need_update`] for mode details.
// && self.listen_https_port == register_req.listen_https_port
&& self.listen_pg_addr == register_req.listen_pg_addr
&& self.listen_pg_port == register_req.listen_pg_port
&& self.availability_zone_id == register_req.availability_zone_id
@@ -111,10 +109,9 @@ impl Node {
// Do we need to update an existing record in DB on this registration request?
pub(crate) fn need_update(&self, register_req: &NodeRegisterRequest) -> bool {
// These are checked here, since they may change before we're fully migrated.
// listen_https_port is checked here because it may change during migration to https.
// After migration, this check may be moved to registration_match.
self.listen_https_port != register_req.listen_https_port
|| self.listen_grpc_addr != register_req.listen_grpc_addr
|| self.listen_grpc_port != register_req.listen_grpc_port
}
/// For a shard located on this node, populate a response object
@@ -128,8 +125,6 @@ impl Node {
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port,
}
}
@@ -216,8 +211,6 @@ impl Node {
listen_https_port: Option<u16>,
listen_pg_addr: String,
listen_pg_port: u16,
listen_grpc_addr: Option<String>,
listen_grpc_port: Option<u16>,
availability_zone_id: AvailabilityZone,
use_https: bool,
) -> anyhow::Result<Self> {
@@ -228,10 +221,6 @@ impl Node {
);
}
if listen_grpc_addr.is_some() != listen_grpc_port.is_some() {
anyhow::bail!("cannot create node {id}: must specify both gRPC address and port");
}
Ok(Self {
id,
listen_http_addr,
@@ -239,8 +228,6 @@ impl Node {
listen_https_port,
listen_pg_addr,
listen_pg_port,
listen_grpc_addr,
listen_grpc_port,
scheduling: NodeSchedulingPolicy::Active,
lifecycle: NodeLifecycle::Active,
availability: NodeAvailability::Offline,
@@ -260,8 +247,6 @@ impl Node {
listen_https_port: self.listen_https_port.map(|x| x as i32),
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port as i32,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port.map(|port| port as i32),
availability_zone_id: self.availability_zone_id.0.clone(),
}
}
@@ -275,13 +260,6 @@ impl Node {
);
}
if np.listen_grpc_addr.is_some() != np.listen_grpc_port.is_some() {
anyhow::bail!(
"can't load node {}: must specify both gRPC address and port",
np.node_id
);
}
Ok(Self {
id: NodeId(np.node_id as u64),
// At startup we consider a node offline until proven otherwise.
@@ -294,8 +272,6 @@ impl Node {
listen_https_port: np.listen_https_port.map(|x| x as u16),
listen_pg_addr: np.listen_pg_addr,
listen_pg_port: np.listen_pg_port as u16,
listen_grpc_addr: np.listen_grpc_addr,
listen_grpc_port: np.listen_grpc_port.map(|port| port as u16),
availability_zone_id: AvailabilityZone(np.availability_zone_id),
use_https,
cancel: CancellationToken::new(),
@@ -385,8 +361,6 @@ impl Node {
listen_https_port: self.listen_https_port,
listen_pg_addr: self.listen_pg_addr.clone(),
listen_pg_port: self.listen_pg_port,
listen_grpc_addr: self.listen_grpc_addr.clone(),
listen_grpc_port: self.listen_grpc_port,
}
}
}

View File

@@ -2125,8 +2125,6 @@ pub(crate) struct NodePersistence {
pub(crate) availability_zone_id: String,
pub(crate) listen_https_port: Option<i32>,
pub(crate) lifecycle: String,
pub(crate) listen_grpc_addr: Option<String>,
pub(crate) listen_grpc_port: Option<i32>,
}
/// Tenant metadata health status that are stored durably.

View File

@@ -5,7 +5,7 @@ use safekeeper_client::mgmt_api::{Client, Result};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::logging::SecretString;
use crate::metrics::SafekeeperRequestLabelGroup;
use crate::metrics::PageserverRequestLabelGroup;
/// Thin wrapper around [`safekeeper_client::mgmt_api::Client`]. It allows the storage
/// controller to collect metrics in a non-intrusive manner.
@@ -19,8 +19,8 @@ pub(crate) struct SafekeeperClient {
macro_rules! measured_request {
($name:literal, $method:expr, $node_id: expr, $invoke:expr) => {{
let labels = SafekeeperRequestLabelGroup {
safekeeper_id: $node_id,
let labels = PageserverRequestLabelGroup {
pageserver_id: $node_id,
path: $name,
method: $method,
};
@@ -35,7 +35,7 @@ macro_rules! measured_request {
if res.is_err() {
let error_counters = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_safekeeper_request_error;
.storage_controller_pageserver_request_error;
error_counters.inc(labels)
}

View File

@@ -945,8 +945,6 @@ pub(crate) mod test_utils {
None,
format!("pghost-{i}"),
5432 + i as u16,
Some(format!("grpchost-{i}")),
Some(51051 + i as u16),
az_iter
.next()
.cloned()

View File

@@ -34,8 +34,6 @@ diesel::table! {
availability_zone_id -> Varchar,
listen_https_port -> Nullable<Int4>,
lifecycle -> Varchar,
listen_grpc_addr -> Nullable<Varchar>,
listen_grpc_port -> Nullable<Int4>,
}
}

View File

@@ -1683,8 +1683,6 @@ impl Service {
None,
"".to_string(),
123,
None,
None,
AvailabilityZone("test_az".to_string()),
false,
)
@@ -2227,8 +2225,19 @@ impl Service {
&self,
reattach_req: ReAttachRequest,
) -> Result<ReAttachResponse, ApiError> {
let mut _node_lock: Option<TracingExclusiveGuard<NodeOperations>> = None;
if let Some(register_req) = reattach_req.register {
self.node_register(register_req).await?;
_node_lock = Some(
trace_exclusive_lock(
&self.node_op_locks,
register_req.node_id,
NodeOperations::Register,
)
.await,
);
self.node_register_with_lock(register_req, _node_lock.as_ref().unwrap())
.await?;
}
// Ordering: we must persist generation number updates before making them visible in the in-memory state
@@ -7164,13 +7173,21 @@ impl Service {
&self,
register_req: NodeRegisterRequest,
) -> Result<(), ApiError> {
let _node_lock = trace_exclusive_lock(
let node_lock = trace_exclusive_lock(
&self.node_op_locks,
register_req.node_id,
NodeOperations::Register,
)
.await;
self.node_register_with_lock(register_req, &node_lock).await
}
async fn node_register_with_lock(
&self,
register_req: NodeRegisterRequest,
_node_lock: &TracingExclusiveGuard<NodeOperations>,
) -> Result<(), ApiError> {
#[derive(PartialEq)]
enum RegistrationStatus {
UpToDate,
@@ -7256,12 +7273,6 @@ impl Service {
));
}
if register_req.listen_grpc_addr.is_some() != register_req.listen_grpc_port.is_some() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"must specify both gRPC address and port"
)));
}
// Ordering: we must persist the new node _before_ adding it to in-memory state.
// This ensures that before we use it for anything or expose it via any external
// API, it is guaranteed to be available after a restart.
@@ -7272,8 +7283,6 @@ impl Service {
register_req.listen_https_port,
register_req.listen_pg_addr,
register_req.listen_pg_port,
register_req.listen_grpc_addr,
register_req.listen_grpc_port,
register_req.availability_zone_id.clone(),
self.config.use_https_pageserver_api,
);

View File

@@ -1184,19 +1184,11 @@ impl TenantShard {
for secondary in self.intent.get_secondary() {
// Make sure we don't try to migrate a secondary to our attached location: this case happens
// easily in environments without multiple AZs.
let mut exclude = match self.intent.attached {
let exclude = match self.intent.attached {
Some(attached) => vec![attached],
None => vec![],
};
// Exclude all other secondaries from the scheduling process to avoid replacing
// one existing secondary with another existing secondary.
for another_secondary in self.intent.secondary.iter() {
if another_secondary != secondary {
exclude.push(*another_secondary);
}
}
let replacement = match &self.policy {
PlacementPolicy::Attached(_) => {
// Secondaries for an attached shard should be scheduled using `SecondaryShardTag`
@@ -1356,19 +1348,28 @@ impl TenantShard {
/// Reconciliation may still be needed for other aspects of state such as secondaries (see [`Self::dirty`]): this
/// funciton should not be used to decide whether to reconcile.
pub(crate) fn stably_attached(&self) -> Option<NodeId> {
// We have an intent to attach for this node
let attach_intent = self.intent.attached?;
// We have an observed state for this node
let location = self.observed.locations.get(&attach_intent)?;
// Our observed state is not None, i.e. not in flux
let location_config = location.conf.as_ref()?;
// Check if our intent and observed state agree that this node is in an attached state.
match location_config.mode {
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => Some(attach_intent),
_ => None,
if let Some(attach_intent) = self.intent.attached {
match self.observed.locations.get(&attach_intent) {
Some(loc) => match &loc.conf {
Some(conf) => match conf.mode {
LocationConfigMode::AttachedMulti
| LocationConfigMode::AttachedSingle
| LocationConfigMode::AttachedStale => {
// Our intent and observed state agree that this node is in an attached state.
Some(attach_intent)
}
// Our observed config is not an attached state
_ => None,
},
// Our observed state is None, i.e. in flux
None => None,
},
// We have no observed state for this node
None => None,
}
} else {
// Our intent is not to attach
None
}
}

View File

@@ -69,11 +69,6 @@ def test_basebackup_cache(neon_env_builder: NeonEnvBuilder):
).value
== i + 1
)
# There should be only one basebackup file in the cache.
assert metrics.query_one("pageserver_basebackup_cache_entries_total").value == 1
# The size of one basebackup for new DB is ~20KB.
size_bytes = metrics.query_one("pageserver_basebackup_cache_size_bytes").value
assert 10 * 1024 <= size_bytes <= 100 * 1024
wait_until(check_metrics)

View File

@@ -19,7 +19,6 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
PgBin,
Safekeeper,
StorageControllerApiException,
flush_ep_to_pageserver,
)
from fixtures.pageserver.http import PageserverApiException
@@ -128,12 +127,6 @@ check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
reason="CHECK_ONDISK_DATA_COMPATIBILITY env is not set",
)
skip_old_debug_versions = pytest.mark.skipif(
os.getenv("BUILD_TYPE", "debug") == "debug"
and os.getenv("DEFAULT_PG_VERSION") in [PgVersion.V14, PgVersion.V15, PgVersion.V16],
reason="compatibility snaphots not available for old versions of debug builds",
)
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(before="test_forward_compatibility")
@@ -204,7 +197,6 @@ ingest_lag_log_line = ".*ingesting record with timestamp lagging more than wait_
@check_ondisk_data_compatibility_if_enabled
@skip_old_debug_versions
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_backward_compatibility(
@@ -232,7 +224,6 @@ def test_backward_compatibility(
@check_ondisk_data_compatibility_if_enabled
@skip_old_debug_versions
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_forward_compatibility(
@@ -302,20 +293,7 @@ def test_forward_compatibility(
def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, repo_dir: Path):
ep = env.endpoints.create("main")
ep_env = {"LD_LIBRARY_PATH": str(env.pg_distrib_dir / f"v{env.pg_version}/lib")}
# If the compatibility snapshot was created with --timelines-onto-safekeepers=false,
# we should not pass safekeeper_generation to the endpoint because the compute
# will not be able to start.
# Zero generation is INVALID_GENERATION.
generation = 0
try:
res = env.storage_controller.timeline_locate(env.initial_tenant, env.initial_timeline)
generation = res["generation"]
except StorageControllerApiException as e:
if e.status_code != 404 or not re.search(r"Timeline .* not found", str(e)):
raise e
ep.start(env=ep_env, safekeeper_generation=generation)
ep.start(env=ep_env)
connstr = ep.connstr()
@@ -365,7 +343,7 @@ def check_neon_works(env: NeonEnv, test_output_dir: Path, sql_dump_path: Path, r
)
# Timeline exists again: restart the endpoint
ep.start(env=ep_env, safekeeper_generation=generation)
ep.start(env=ep_env)
pg_bin.run_capture(
["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump-from-wal.sql'}"]
@@ -615,7 +593,6 @@ def test_historic_storage_formats(
@check_ondisk_data_compatibility_if_enabled
@skip_old_debug_versions
@pytest.mark.xdist_group("compatibility")
@pytest.mark.parametrize(
**fixtures.utils.allpairs_versions(),

View File

@@ -306,7 +306,13 @@ def test_sql_regress(
)
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("main")
endpoint = env.endpoints.create_start(
"main",
config_lines=[
# Enable the test mode, so that we don't need to patch the test cases.
"neon.regress_test_mode = true",
],
)
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
# Create some local directories for pg_regress to run in.

View File

@@ -1,90 +0,0 @@
create or replace function admin_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'admin event trigger is executed for %', current_user;
end;
$$;
create role neon_superuser;
create role neon_admin login inherit createrole createdb in role neon_superuser;
grant create on schema public to neon_admin;
create database neondb with owner neon_admin;
grant all privileges on database neondb to neon_superuser;
create role neon_user;
grant create on schema public to neon_user;
create event trigger on_ddl1 on ddl_command_end
execute procedure admin_proc();
set role neon_user;
-- check that non-privileged user can not change neon.event_triggers
set neon.event_triggers to false;
ERROR: permission denied to set neon.event_triggers
DETAIL: Only "neon_superuser" is allowed to set the GUC
-- Non-privileged neon user should not be able to create event trigers
create event trigger on_ddl2 on ddl_command_end
execute procedure admin_proc();
ERROR: permission denied to create event trigger "on_ddl2"
HINT: Must be superuser to create an event trigger.
set role neon_admin;
-- neon_superuser should be able to create event trigers
create or replace function neon_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'neon event trigger is executed for %', current_user;
end;
$$;
NOTICE: admin event trigger is executed for neon_admin
create event trigger on_ddl2 on ddl_command_end
execute procedure neon_proc();
\c neondb neon_admin
create or replace function neondb_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'neondb event trigger is executed for %', current_user;
end;
$$;
create or replace function neondb_secdef_proc()
returns event_trigger
language plpgsql
SECURITY DEFINER
as
$$
begin
raise notice 'neondb secdef event trigger is executed for %', current_user;
end;
$$;
-- neon_admin (neon_superuser member) should be able to create event triggers
create event trigger on_ddl3 on ddl_command_end
execute procedure neondb_proc();
create event trigger on_ddl4 on ddl_command_end
execute procedure neondb_secdef_proc();
-- Check that event trigger is fired for neon_admin
create table t1(x integer);
NOTICE: neondb event trigger is executed for neon_admin
NOTICE: neondb secdef event trigger is executed for neon_admin
-- Check that event trigger can be skipped
set neon.event_triggers to false;
create table t2(x integer);
WARNING: Skipping Event Trigger: neon.event_triggers is false
WARNING: Skipping Event Trigger: neon.event_triggers is false
\c regression cloud_admin
-- Check that event triggers are not fired for superuser
create table t3(x integer);
NOTICE: admin event trigger is executed for cloud_admin
WARNING: Skipping Event Trigger
DETAIL: Event Trigger function "neon_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
\c neondb cloud_admin
-- Check that user-defined event triggers are not fired for superuser
create table t4(x integer);
WARNING: Skipping Event Trigger
DETAIL: Event Trigger function "neondb_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
WARNING: Skipping Event Trigger
DETAIL: Event Trigger function "neondb_secdef_proc" is owned by non-superuser role "neon_admin", and current_user "cloud_admin" is superuser
\c neondb neon_admin
-- Check that neon_admin can drop event triggers
drop event trigger on_ddl3;
drop event trigger on_ddl4;

View File

@@ -9,4 +9,3 @@ test: neon-rel-truncate
test: neon-clog
test: neon-test-utils
test: neon-vacuum-full
test: neon-event-triggers

View File

@@ -1,96 +0,0 @@
create or replace function admin_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'admin event trigger is executed for %', current_user;
end;
$$;
create role neon_superuser;
create role neon_admin login inherit createrole createdb in role neon_superuser;
grant create on schema public to neon_admin;
create database neondb with owner neon_admin;
grant all privileges on database neondb to neon_superuser;
create role neon_user;
grant create on schema public to neon_user;
create event trigger on_ddl1 on ddl_command_end
execute procedure admin_proc();
set role neon_user;
-- check that non-privileged user can not change neon.event_triggers
set neon.event_triggers to false;
-- Non-privileged neon user should not be able to create event trigers
create event trigger on_ddl2 on ddl_command_end
execute procedure admin_proc();
set role neon_admin;
-- neon_superuser should be able to create event trigers
create or replace function neon_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'neon event trigger is executed for %', current_user;
end;
$$;
create event trigger on_ddl2 on ddl_command_end
execute procedure neon_proc();
\c neondb neon_admin
create or replace function neondb_proc()
returns event_trigger
language plpgsql as
$$
begin
raise notice 'neondb event trigger is executed for %', current_user;
end;
$$;
create or replace function neondb_secdef_proc()
returns event_trigger
language plpgsql
SECURITY DEFINER
as
$$
begin
raise notice 'neondb secdef event trigger is executed for %', current_user;
end;
$$;
-- neon_admin (neon_superuser member) should be able to create event triggers
create event trigger on_ddl3 on ddl_command_end
execute procedure neondb_proc();
create event trigger on_ddl4 on ddl_command_end
execute procedure neondb_secdef_proc();
-- Check that event trigger is fired for neon_admin
create table t1(x integer);
-- Check that event trigger can be skipped
set neon.event_triggers to false;
create table t2(x integer);
\c regression cloud_admin
-- Check that event triggers are not fired for superuser
create table t3(x integer);
\c neondb cloud_admin
-- Check that user-defined event triggers are not fired for superuser
create table t4(x integer);
\c neondb neon_admin
-- Check that neon_admin can drop event triggers
drop event trigger on_ddl3;
drop event trigger on_ddl4;