Compare commits

..

1 Commits

Author SHA1 Message Date
John Spray
1d18b74324 storcon: add rate limiting for proxied API requests 2025-02-14 00:10:43 +01:00
72 changed files with 911 additions and 2237 deletions

View File

@@ -348,10 +348,6 @@ jobs:
rerun_failed: true
pg_version: ${{ matrix.pg_version }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
# `--session-timeout` is equal to (timeout-minutes - 10 minutes) * 60 seconds.
# Attempt to stop tests gracefully to generate test reports
# until they are forcibly stopped by the stricter `timeout-minutes` limit.
extra_params: --session-timeout=${{ inputs.sanitizers != 'enabled' && 3000 || 10200 }}
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty

View File

@@ -1,76 +0,0 @@
name: Force Test Upgrading of Extension
on:
schedule:
# * is a special character in YAML so you have to quote this string
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of the month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12 or JAN-DEC)
# │ │ │ │ ┌───────────── day of the week (0 - 6 or SUN-SAT)
- cron: '45 2 * * *' # run once a day, timezone is utc
workflow_dispatch: # adds ability to run this manually
defaults:
run:
shell: bash -euxo pipefail {0}
concurrency:
# Allow only one workflow
group: ${{ github.workflow }}
cancel-in-progress: true
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
contents: read
jobs:
regress:
strategy:
fail-fast: false
matrix:
pg-version: [16, 17]
runs-on: small
steps:
- uses: actions/checkout@v4
with:
submodules: false
- name: Get the last compute release tag
id: get-last-compute-release-tag
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
tag=$(gh api -q '[.[].tag_name | select(startswith("release-compute"))][0]'\
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
"/repos/${GITHUB_REPOSITORY}/releases")
echo tag=${tag} >> ${GITHUB_OUTPUT}
- name: Test extension upgrade
timeout-minutes: 20
env:
NEWTAG: latest
OLDTAG: ${{ steps.get-last-compute-release-tag.outputs.tag }}
PG_VERSION: ${{ matrix.pg-version }}
FORCE_ALL_UPGRADE_TESTS: true
run: ./docker-compose/test_extensions_upgrade.sh
- name: Print logs and clean up
if: always()
run: |
docker compose --profile test-extensions -f ./docker-compose/docker-compose.yml logs || true
docker compose --profile test-extensions -f ./docker-compose/docker-compose.yml down
- name: Post to the Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: ${{ vars.SLACK_ON_CALL_QA_STAGING_STREAM }}
slack-message: |
Test upgrading of extensions: ${{ job.status }}
<${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run>
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}

View File

@@ -1,41 +0,0 @@
name: Regenerate Postgres Settings
on:
pull_request:
types:
- opened
- synchronize
- reopened
paths:
- pgxn/neon/**.c
- vendor/postgres-v*
- vendor/revisions.json
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true
permissions:
pull-requests: write
jobs:
regenerate-pg-settings:
runs-on: ubuntu-22.04
steps:
- name: Add comment
uses: thollander/actions-comment-pull-request@v3
with:
comment-tag: ${{ github.job }}
pr-number: ${{ github.event.number }}
message: |
If this PR added a GUC in the Postgres fork or `neon` extension,
please regenerate the Postgres settings in the `cloud` repo:
```
make NEON_WORKDIR=path/to/neon/checkout \
-C goapp/internal/shareddomain/postgres generate
```
If you're an external contributor, a Neon employee will assist in
making sure this step is done.

100
Cargo.lock generated
View File

@@ -786,7 +786,7 @@ dependencies = [
[[package]]
name = "azure_core"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"async-trait",
"base64 0.22.1",
@@ -815,7 +815,7 @@ dependencies = [
[[package]]
name = "azure_identity"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"async-lock",
"async-trait",
@@ -834,7 +834,7 @@ dependencies = [
[[package]]
name = "azure_storage"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"RustyXML",
"async-lock",
@@ -852,7 +852,7 @@ dependencies = [
[[package]]
name = "azure_storage_blobs"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"RustyXML",
"azure_core",
@@ -872,7 +872,7 @@ dependencies = [
[[package]]
name = "azure_svc_blobstorage"
version = "0.21.0"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#f64bd57262ced51afce5d8909c06dcb11a6dd85a"
source = "git+https://github.com/neondatabase/azure-sdk-for-rust.git?branch=neon#c36ed4c039bb3d59b5a1705f2cc337636c73b541"
dependencies = [
"azure_core",
"bytes",
@@ -1029,6 +1029,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "boxcar"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2721c3c5a6f0e7f7e607125d963fedeb765f545f67adc9d71ed934693881eb42"
[[package]]
name = "bstr"
version = "1.5.0"
@@ -1303,7 +1309,6 @@ dependencies = [
"aws-config",
"aws-sdk-kms",
"aws-sdk-s3",
"aws-smithy-types",
"axum",
"base64 0.13.1",
"bytes",
@@ -1352,7 +1357,6 @@ dependencies = [
"utils",
"uuid",
"vm_monitor",
"walkdir",
"workspace_hack",
"zstd",
]
@@ -2394,9 +2398,9 @@ checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
[[package]]
name = "futures-timer"
version = "3.0.2"
version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]]
name = "futures-util"
@@ -2499,6 +2503,27 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "governor"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "842dc78579ce01e6a1576ad896edc92fca002dd60c9c3746b7fc2bec6fb429d0"
dependencies = [
"cfg-if",
"dashmap 6.1.0",
"futures-sink",
"futures-timer",
"futures-util",
"no-std-compat",
"nonzero_ext",
"parking_lot 0.12.1",
"portable-atomic",
"quanta",
"rand 0.8.5",
"smallvec",
"spinning_top",
]
[[package]]
name = "group"
version = "0.12.1"
@@ -3698,6 +3723,12 @@ dependencies = [
"memoffset 0.9.0",
]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]]
name = "nom"
version = "7.1.3"
@@ -3708,6 +3739,12 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "notify"
version = "8.0.0"
@@ -4566,6 +4603,12 @@ dependencies = [
"never-say-never",
]
[[package]]
name = "portable-atomic"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6"
[[package]]
name = "postgres"
version = "0.19.7"
@@ -4925,6 +4968,7 @@ dependencies = [
"aws-sdk-iam",
"aws-sigv4",
"base64 0.13.1",
"boxcar",
"bstr",
"bytes",
"camino",
@@ -4976,6 +5020,7 @@ dependencies = [
"postgres-protocol2",
"postgres_backend",
"pq_proto",
"prometheus",
"rand 0.8.5",
"rand_distr",
"rcgen",
@@ -5000,6 +5045,7 @@ dependencies = [
"smallvec",
"smol_str",
"socket2",
"strum",
"strum_macros",
"subtle",
"thiserror 1.0.69",
@@ -5014,6 +5060,7 @@ dependencies = [
"tracing",
"tracing-log",
"tracing-opentelemetry",
"tracing-serde",
"tracing-subscriber",
"tracing-utils",
"try-lock",
@@ -5028,6 +5075,21 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "quanta"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bd1fe6824cea6538803de3ff1bc0cf3949024db3d43c9643024bfb33a807c0e"
dependencies = [
"crossbeam-utils",
"libc",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quick-xml"
version = "0.26.0"
@@ -5158,6 +5220,15 @@ dependencies = [
"num-traits",
]
[[package]]
name = "raw-cpuid"
version = "11.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6928fa44c097620b706542d428957635951bade7143269085389d42c8a4927e"
dependencies = [
"bitflags 2.8.0",
]
[[package]]
name = "rayon"
version = "1.7.0"
@@ -6366,6 +6437,15 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
[[package]]
name = "spinning_top"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300"
dependencies = [
"lock_api",
]
[[package]]
name = "spki"
version = "0.6.0"
@@ -6441,6 +6521,7 @@ dependencies = [
"diesel_migrations",
"fail",
"futures",
"governor",
"hex",
"http-utils",
"humantime",
@@ -6454,7 +6535,6 @@ dependencies = [
"pageserver_client",
"postgres_connection",
"rand 0.8.5",
"regex",
"reqwest",
"routerify",
"rustls 0.23.18",

View File

@@ -148,7 +148,7 @@ RUN case $DEBIAN_VERSION in \
apt install --no-install-recommends --no-install-suggests -y \
ninja-build git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget ca-certificates pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip g++ \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd curl unzip \
$VERSION_INSTALLS \
&& apt clean && rm -rf /var/lib/apt/lists/*
@@ -1464,31 +1464,6 @@ RUN make release -j $(getconf _NPROCESSORS_ONLN) && \
make install -j $(getconf _NPROCESSORS_ONLN) && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_mooncake.control
#########################################################################################
#
# Layer "pg-duckdb-pg-build"
# compile pg_duckdb extension
#
#########################################################################################
FROM build-deps AS pg_duckdb-src
WORKDIR /ext-src
COPY compute/patches/pg_duckdb_v031.patch .
# pg_duckdb build requires source dir to be a git repo to get submodules
# allow neon_superuser to execute some functions that in pg_duckdb are available to superuser only:
# - extension management function duckdb.install_extension()
# - access to duckdb.extensions table and its sequence
RUN git clone --depth 1 --branch v0.3.1 https://github.com/duckdb/pg_duckdb.git pg_duckdb-src && \
cd pg_duckdb-src && \
git submodule update --init --recursive && \
patch -p1 < /ext-src/pg_duckdb_v031.patch
FROM pg-build AS pg_duckdb-build
ARG PG_VERSION
COPY --from=pg_duckdb-src /ext-src/ /ext-src/
WORKDIR /ext-src/pg_duckdb-src
RUN make install -j $(getconf _NPROCESSORS_ONLN) && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_duckdb.control
#########################################################################################
#
# Layer "pg_repack"
@@ -1602,7 +1577,6 @@ COPY --from=pg_anon-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_ivm-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_partman-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_mooncake-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_duckdb-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg_repack-build /usr/local/pgsql/ /usr/local/pgsql/
#########################################################################################
@@ -1695,6 +1669,29 @@ RUN if [ "$TARGETARCH" = "amd64" ]; then\
&& echo "${pgbouncer_exporter_sha256} pgbouncer_exporter" | sha256sum -c -\
&& echo "${sql_exporter_sha256} sql_exporter" | sha256sum -c -
#########################################################################################
#
# Layer "awscli"
#
#########################################################################################
FROM build-deps AS awscli
ARG TARGETARCH
RUN set -ex; \
if [ "${TARGETARCH}" = "amd64" ]; then \
TARGETARCH_ALT="x86_64"; \
CHECKSUM="c9a9df3770a3ff9259cb469b6179e02829687a464e0824d5c32d378820b53a00"; \
elif [ "${TARGETARCH}" = "arm64" ]; then \
TARGETARCH_ALT="aarch64"; \
CHECKSUM="8181730be7891582b38b028112e81b4899ca817e8c616aad807c9e9d1289223a"; \
else \
echo "Unsupported architecture: ${TARGETARCH}"; exit 1; \
fi; \
curl --retry 5 -L "https://awscli.amazonaws.com/awscli-exe-linux-${TARGETARCH_ALT}-2.17.5.zip" -o /tmp/awscliv2.zip; \
echo "${CHECKSUM} /tmp/awscliv2.zip" | sha256sum -c -; \
unzip /tmp/awscliv2.zip -d /tmp/awscliv2; \
/tmp/awscliv2/aws/install; \
rm -rf /tmp/awscliv2.zip /tmp/awscliv2
#########################################################################################
#
# Clean up postgres folder before inclusion
@@ -1864,6 +1861,9 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
mkdir /usr/local/download_extensions && \
chown -R postgres:postgres /usr/local/download_extensions
# aws cli is used by fast_import
COPY --from=awscli /usr/local/aws-cli /usr/local/aws-cli
# pgbouncer and its config
COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer
COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini

View File

@@ -1,11 +0,0 @@
diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql
index d777d76..af60106 100644
--- a/sql/pg_duckdb--0.2.0--0.3.0.sql
+++ b/sql/pg_duckdb--0.2.0--0.3.0.sql
@@ -1056,3 +1056,6 @@ GRANT ALL ON FUNCTION duckdb.cache(TEXT, TEXT) TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_info() TO PUBLIC;
GRANT ALL ON FUNCTION duckdb.cache_delete(TEXT) TO PUBLIC;
GRANT ALL ON PROCEDURE duckdb.recycle_ddb() TO PUBLIC;
+GRANT ALL ON FUNCTION duckdb.install_extension(TEXT) TO neon_superuser;
+GRANT ALL ON TABLE duckdb.extensions TO neon_superuser;
+GRANT ALL ON SEQUENCE duckdb.extensions_table_seq TO neon_superuser;

View File

@@ -47,9 +47,7 @@ files:
# Allow postgres user (which is what compute_ctl runs as) to run /neonvm/bin/resize-swap
# and /neonvm/bin/set-disk-quota as root without requiring entering a password (NOPASSWD),
# regardless of hostname (ALL)
#
# Also allow it to shut down the VM. The fast_import job does that when it's finished.
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota, /neonvm/bin/poweroff
postgres ALL=(root) NOPASSWD: /neonvm/bin/resize-swap, /neonvm/bin/set-disk-quota
- filename: cgconfig.conf
content: |
# Configuration for cgroups in VM compute nodes

View File

@@ -14,7 +14,6 @@ base64.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-sdk-kms.workspace = true
aws-smithy-types.workspace = true
anyhow.workspace = true
axum = { workspace = true, features = [] }
camino.workspace = true
@@ -55,7 +54,6 @@ thiserror.workspace = true
url.workspace = true
uuid.workspace = true
prometheus.workspace = true
walkdir.workspace = true
postgres_initdb.workspace = true
compute_api.workspace = true

View File

@@ -25,10 +25,10 @@
//! docker push localhost:3030/localregistry/compute-node-v14:latest
//! ```
use anyhow::{bail, Context};
use anyhow::Context;
use aws_config::BehaviorVersion;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use clap::Parser;
use compute_tools::extension_server::{get_pg_version, PostgresMajorVersion};
use nix::unistd::Pid;
use tracing::{error, info, info_span, warn, Instrument};
@@ -44,59 +44,32 @@ mod s3_uri;
const PG_WAIT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(600);
const PG_WAIT_RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(300);
#[derive(Subcommand, Debug)]
enum Command {
/// Runs local postgres (neon binary), restores into it,
/// uploads pgdata to s3 to be consumed by pageservers
Pgdata {
/// Raw connection string to the source database. Used only in tests,
/// real scenario uses encrypted connection string in spec.json from s3.
#[clap(long)]
source_connection_string: Option<String>,
/// If specified, will not shut down the local postgres after the import. Used in local testing
#[clap(short, long)]
interactive: bool,
/// Port to run postgres on. Default is 5432.
#[clap(long, default_value_t = 5432)]
pg_port: u16, // port to run postgres on, 5432 is default
/// Number of CPUs in the system. This is used to configure # of
/// parallel worker processes, for index creation.
#[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
num_cpus: Option<usize>,
/// Amount of RAM in the system. This is used to configure shared_buffers
/// and maintenance_work_mem.
#[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
memory_mb: Option<usize>,
},
/// Runs pg_dump-pg_restore from source to destination without running local postgres.
DumpRestore {
/// Raw connection string to the source database. Used only in tests,
/// real scenario uses encrypted connection string in spec.json from s3.
#[clap(long)]
source_connection_string: Option<String>,
/// Raw connection string to the destination database. Used only in tests,
/// real scenario uses encrypted connection string in spec.json from s3.
#[clap(long)]
destination_connection_string: Option<String>,
},
}
#[derive(clap::Parser)]
struct Args {
#[clap(long, env = "NEON_IMPORTER_WORKDIR")]
#[clap(long)]
working_directory: Utf8PathBuf,
#[clap(long, env = "NEON_IMPORTER_S3_PREFIX")]
s3_prefix: Option<s3_uri::S3Uri>,
#[clap(long, env = "NEON_IMPORTER_PG_BIN_DIR")]
#[clap(long)]
source_connection_string: Option<String>,
#[clap(short, long)]
interactive: bool,
#[clap(long)]
pg_bin_dir: Utf8PathBuf,
#[clap(long, env = "NEON_IMPORTER_PG_LIB_DIR")]
#[clap(long)]
pg_lib_dir: Utf8PathBuf,
#[clap(long)]
pg_port: Option<u16>, // port to run postgres on, 5432 is default
#[clap(subcommand)]
command: Command,
/// Number of CPUs in the system. This is used to configure # of
/// parallel worker processes, for index creation.
#[clap(long, env = "NEON_IMPORTER_NUM_CPUS")]
num_cpus: Option<usize>,
/// Amount of RAM in the system. This is used to configure shared_buffers
/// and maintenance_work_mem.
#[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
memory_mb: Option<usize>,
}
#[serde_with::serde_as]
@@ -105,8 +78,6 @@ struct Spec {
encryption_secret: EncryptionSecret,
#[serde_as(as = "serde_with::base64::Base64")]
source_connstring_ciphertext_base64: Vec<u8>,
#[serde_as(as = "Option<serde_with::base64::Base64>")]
destination_connstring_ciphertext_base64: Option<Vec<u8>>,
}
#[derive(serde::Deserialize)]
@@ -122,150 +93,192 @@ const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") {
"C.UTF-8"
};
async fn decode_connstring(
kms_client: &aws_sdk_kms::Client,
key_id: &String,
connstring_ciphertext_base64: Vec<u8>,
) -> Result<String, anyhow::Error> {
let mut output = kms_client
.decrypt()
.key_id(key_id)
.ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
connstring_ciphertext_base64,
))
.send()
.await
.context("decrypt connection string")?;
#[tokio::main]
pub(crate) async fn main() -> anyhow::Result<()> {
utils::logging::init(
utils::logging::LogFormat::Plain,
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::Output::Stdout,
)?;
let plaintext = output
.plaintext
.take()
.context("get plaintext connection string")?;
info!("starting");
String::from_utf8(plaintext.into_inner()).context("parse connection string as utf8")
}
let args = Args::parse();
struct PostgresProcess {
pgdata_dir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pgbin: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
postgres_proc: Option<tokio::process::Child>,
}
impl PostgresProcess {
fn new(pgdata_dir: Utf8PathBuf, pg_bin_dir: Utf8PathBuf, pg_lib_dir: Utf8PathBuf) -> Self {
Self {
pgdata_dir,
pgbin: pg_bin_dir.join("postgres"),
pg_bin_dir,
pg_lib_dir,
postgres_proc: None,
}
// Validate arguments
if args.s3_prefix.is_none() && args.source_connection_string.is_none() {
anyhow::bail!("either s3_prefix or source_connection_string must be specified");
}
if args.s3_prefix.is_some() && args.source_connection_string.is_some() {
anyhow::bail!("only one of s3_prefix or source_connection_string can be specified");
}
async fn prepare(&self, initdb_user: &str) -> Result<(), anyhow::Error> {
tokio::fs::create_dir(&self.pgdata_dir)
.await
.context("create pgdata directory")?;
let working_directory = args.working_directory;
let pg_bin_dir = args.pg_bin_dir;
let pg_lib_dir = args.pg_lib_dir;
let pg_port = args.pg_port.unwrap_or_else(|| {
info!("pg_port not specified, using default 5432");
5432
});
let pg_version = match get_pg_version(self.pgbin.as_ref()) {
PostgresMajorVersion::V14 => 14,
PostgresMajorVersion::V15 => 15,
PostgresMajorVersion::V16 => 16,
PostgresMajorVersion::V17 => 17,
// Initialize AWS clients only if s3_prefix is specified
let (aws_config, kms_client) = if args.s3_prefix.is_some() {
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
let kms = aws_sdk_kms::Client::new(&config);
(Some(config), Some(kms))
} else {
(None, None)
};
// Get source connection string either from S3 spec or direct argument
let source_connection_string = if let Some(s3_prefix) = &args.s3_prefix {
let spec: Spec = {
let spec_key = s3_prefix.append("/spec.json");
let s3_client = aws_sdk_s3::Client::new(aws_config.as_ref().unwrap());
let object = s3_client
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
};
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
superuser: initdb_user,
locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
pg_version,
initdb_bin: self.pg_bin_dir.join("initdb").as_ref(),
library_search_path: &self.pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
pgdata: &self.pgdata_dir,
})
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
let mut output = kms_client
.unwrap()
.decrypt()
.key_id(key_id)
.ciphertext_blob(aws_sdk_s3::primitives::Blob::new(
spec.source_connstring_ciphertext_base64,
))
.send()
.await
.context("decrypt source connection string")?;
let plaintext = output
.plaintext
.take()
.context("get plaintext source connection string")?;
String::from_utf8(plaintext.into_inner())
.context("parse source connection string as utf8")?
}
}
} else {
args.source_connection_string.unwrap()
};
match tokio::fs::create_dir(&working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&working_directory)
.await
.context("check if working directory is empty")?
{
anyhow::bail!("working directory is not empty");
} else {
// ok
}
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
let pgdata_dir = working_directory.join("pgdata");
tokio::fs::create_dir(&pgdata_dir)
.await
.context("initdb")
}
.context("create pgdata directory")?;
async fn start(
&mut self,
initdb_user: &str,
port: u16,
nproc: usize,
memory_mb: usize,
) -> Result<&tokio::process::Child, anyhow::Error> {
self.prepare(initdb_user).await?;
let pgbin = pg_bin_dir.join("postgres");
let pg_version = match get_pg_version(pgbin.as_ref()) {
PostgresMajorVersion::V14 => 14,
PostgresMajorVersion::V15 => 15,
PostgresMajorVersion::V16 => 16,
PostgresMajorVersion::V17 => 17,
};
let superuser = "cloud_admin"; // XXX: this shouldn't be hard-coded
postgres_initdb::do_run_initdb(postgres_initdb::RunInitdbArgs {
superuser,
locale: DEFAULT_LOCALE, // XXX: this shouldn't be hard-coded,
pg_version,
initdb_bin: pg_bin_dir.join("initdb").as_ref(),
library_search_path: &pg_lib_dir, // TODO: is this right? Prob works in compute image, not sure about neon_local.
pgdata: &pgdata_dir,
})
.await
.context("initdb")?;
// Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
// maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
// available for misc other stuff that PostgreSQL uses memory for.
let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
// If the caller didn't specify CPU / RAM to use for sizing, default to
// number of CPUs in the system, and pretty arbitrarily, 256 MB of RAM.
let nproc = args.num_cpus.unwrap_or_else(num_cpus::get);
let memory_mb = args.memory_mb.unwrap_or(256);
//
// Launch postgres process
//
let mut proc = tokio::process::Command::new(&self.pgbin)
.arg("-D")
.arg(&self.pgdata_dir)
.args(["-p", &format!("{port}")])
.args(["-c", "wal_level=minimal"])
.args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
.args(["-c", "max_wal_senders=0"])
.args(["-c", "fsync=off"])
.args(["-c", "full_page_writes=off"])
.args(["-c", "synchronous_commit=off"])
.args([
"-c",
&format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
])
.args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
.args(["-c", &format!("max_worker_processes={nproc}")])
.args(["-c", "effective_io_concurrency=100"])
.env_clear()
.env("LD_LIBRARY_PATH", &self.pg_lib_dir)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn postgres")?;
// Somewhat arbitrarily, use 10 % of memory for shared buffer cache, 70% for
// maintenance_work_mem (i.e. for sorting during index creation), and leave the rest
// available for misc other stuff that PostgreSQL uses memory for.
let shared_buffers_mb = ((memory_mb as f32) * 0.10) as usize;
let maintenance_work_mem_mb = ((memory_mb as f32) * 0.70) as usize;
info!("spawned postgres, waiting for it to become ready");
tokio::spawn(
child_stdio_to_log::relay_process_output(proc.stdout.take(), proc.stderr.take())
.instrument(info_span!("postgres")),
);
self.postgres_proc = Some(proc);
Ok(self.postgres_proc.as_ref().unwrap())
}
async fn shutdown(&mut self) -> Result<(), anyhow::Error> {
let proc: &mut tokio::process::Child = self.postgres_proc.as_mut().unwrap();
info!("shutdown postgres");
nix::sys::signal::kill(
Pid::from_raw(i32::try_from(proc.id().unwrap()).expect("convert child pid to i32")),
nix::sys::signal::SIGTERM,
//
// Launch postgres process
//
let mut postgres_proc = tokio::process::Command::new(pgbin)
.arg("-D")
.arg(&pgdata_dir)
.args(["-p", &format!("{pg_port}")])
.args(["-c", "wal_level=minimal"])
.args(["-c", &format!("shared_buffers={shared_buffers_mb}MB")])
.args(["-c", "max_wal_senders=0"])
.args(["-c", "fsync=off"])
.args(["-c", "full_page_writes=off"])
.args(["-c", "synchronous_commit=off"])
.args([
"-c",
&format!("maintenance_work_mem={maintenance_work_mem_mb}MB"),
])
.args(["-c", &format!("max_parallel_maintenance_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers={nproc}")])
.args(["-c", &format!("max_parallel_workers_per_gather={nproc}")])
.args(["-c", &format!("max_worker_processes={nproc}")])
.args([
"-c",
&format!(
"effective_io_concurrency={}",
if cfg!(target_os = "macos") { 0 } else { 100 }
),
])
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
.env(
"ASAN_OPTIONS",
std::env::var("ASAN_OPTIONS").unwrap_or_default(),
)
.context("signal postgres to shut down")?;
proc.wait()
.await
.context("wait for postgres to shut down")
.map(|_| ())
}
}
.env(
"UBSAN_OPTIONS",
std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn postgres")?;
info!("spawned postgres, waiting for it to become ready");
tokio::spawn(
child_stdio_to_log::relay_process_output(
postgres_proc.stdout.take(),
postgres_proc.stderr.take(),
)
.instrument(info_span!("postgres")),
);
async fn wait_until_ready(connstring: String, create_dbname: String) {
// Create neondb database in the running postgres
let restore_pg_connstring =
format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
let start_time = std::time::Instant::now();
loop {
@@ -276,12 +289,7 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
std::process::exit(1);
}
match tokio_postgres::connect(
&connstring.replace("dbname=neondb", "dbname=postgres"),
tokio_postgres::NoTls,
)
.await
{
match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await {
Ok((client, connection)) => {
// Spawn the connection handling task to maintain the connection
tokio::spawn(async move {
@@ -290,12 +298,9 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
}
});
match client
.simple_query(format!("CREATE DATABASE {create_dbname};").as_str())
.await
{
match client.simple_query("CREATE DATABASE neondb;").await {
Ok(_) => {
info!("created {} database", create_dbname);
info!("created neondb database");
break;
}
Err(e) => {
@@ -319,16 +324,10 @@ async fn wait_until_ready(connstring: String, create_dbname: String) {
}
}
}
}
async fn run_dump_restore(
workdir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
source_connstring: String,
destination_connstring: String,
) -> Result<(), anyhow::Error> {
let dumpdir = workdir.join("dumpdir");
let restore_pg_connstring = restore_pg_connstring.replace("dbname=postgres", "dbname=neondb");
let dumpdir = working_directory.join("dumpdir");
let common_args = [
// schema mapping (prob suffices to specify them on one side)
@@ -357,7 +356,7 @@ async fn run_dump_restore(
.arg("--no-sync")
// POSITIONAL args
// source db (db name included in connection string)
.arg(&source_connstring)
.arg(&source_connection_string)
// how we run it
.env_clear()
.env("LD_LIBRARY_PATH", &pg_lib_dir)
@@ -377,18 +376,19 @@ async fn run_dump_restore(
let st = pg_dump.wait().await.context("wait for pg_dump")?;
info!(status=?st, "pg_dump exited");
if !st.success() {
error!(status=%st, "pg_dump failed, restore will likely fail as well");
bail!("pg_dump failed");
warn!(status=%st, "pg_dump failed, restore will likely fail as well");
}
}
// TODO: maybe do it in a streaming way, plenty of internal research done on this already
// TODO: do it in a streaming way, plenty of internal research done on this already
// TODO: do the unlogged table trick
info!("restore from working directory into vanilla postgres");
{
let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
.args(&common_args)
.arg("-d")
.arg(&destination_connstring)
.arg(&restore_pg_connstring)
// POSITIONAL args
.arg(&dumpdir)
// how we run it
@@ -411,259 +411,48 @@ async fn run_dump_restore(
let st = pg_restore.wait().await.context("wait for pg_restore")?;
info!(status=?st, "pg_restore exited");
if !st.success() {
error!(status=%st, "pg_restore failed, restore will likely fail as well");
bail!("pg_restore failed");
warn!(status=%st, "pg_restore failed, restore will likely fail as well");
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn cmd_pgdata(
s3_client: Option<aws_sdk_s3::Client>,
kms_client: Option<aws_sdk_kms::Client>,
maybe_s3_prefix: Option<s3_uri::S3Uri>,
maybe_spec: Option<Spec>,
source_connection_string: Option<String>,
interactive: bool,
pg_port: u16,
workdir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
num_cpus: Option<usize>,
memory_mb: Option<usize>,
) -> Result<(), anyhow::Error> {
if maybe_spec.is_none() && source_connection_string.is_none() {
bail!("spec must be provided for pgdata command");
}
if maybe_spec.is_some() && source_connection_string.is_some() {
bail!("only one of spec or source_connection_string can be provided");
}
let source_connection_string = if let Some(spec) = maybe_spec {
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
decode_connstring(
kms_client.as_ref().unwrap(),
&key_id,
spec.source_connstring_ciphertext_base64,
)
.await?
}
}
} else {
source_connection_string.unwrap()
};
let superuser = "cloud_admin";
let destination_connstring = format!(
"host=localhost port={} user={} dbname=neondb",
pg_port, superuser
);
let pgdata_dir = workdir.join("pgdata");
let mut proc = PostgresProcess::new(pgdata_dir.clone(), pg_bin_dir.clone(), pg_lib_dir.clone());
let nproc = num_cpus.unwrap_or_else(num_cpus::get);
let memory_mb = memory_mb.unwrap_or(256);
proc.start(superuser, pg_port, nproc, memory_mb).await?;
wait_until_ready(destination_connstring.clone(), "neondb".to_string()).await;
run_dump_restore(
workdir.clone(),
pg_bin_dir,
pg_lib_dir,
source_connection_string,
destination_connstring,
)
.await?;
// If interactive mode, wait for Ctrl+C
if interactive {
if args.interactive {
info!("Running in interactive mode. Press Ctrl+C to shut down.");
tokio::signal::ctrl_c().await.context("wait for ctrl-c")?;
}
proc.shutdown().await?;
info!("shutdown postgres");
{
nix::sys::signal::kill(
Pid::from_raw(
i32::try_from(postgres_proc.id().unwrap()).expect("convert child pid to i32"),
),
nix::sys::signal::SIGTERM,
)
.context("signal postgres to shut down")?;
postgres_proc
.wait()
.await
.context("wait for postgres to shut down")?;
}
// Only sync if s3_prefix was specified
if let Some(s3_prefix) = maybe_s3_prefix {
if let Some(s3_prefix) = args.s3_prefix {
info!("upload pgdata");
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
Utf8Path::new(&pgdata_dir),
&s3_prefix.append("/pgdata/"),
)
.await
.context("sync dump directory to destination")?;
aws_s3_sync::sync(Utf8Path::new(&pgdata_dir), &s3_prefix.append("/pgdata/"))
.await
.context("sync dump directory to destination")?;
info!("write status");
{
let status_dir = workdir.join("status");
let status_dir = working_directory.join("status");
std::fs::create_dir(&status_dir).context("create status directory")?;
let status_file = status_dir.join("pgdata");
std::fs::write(&status_file, serde_json::json!({"done": true}).to_string())
.context("write status file")?;
aws_s3_sync::upload_dir_recursive(
s3_client.as_ref().unwrap(),
&status_dir,
&s3_prefix.append("/status/"),
)
.await
.context("sync status directory to destination")?;
}
}
Ok(())
}
async fn cmd_dumprestore(
kms_client: Option<aws_sdk_kms::Client>,
maybe_spec: Option<Spec>,
source_connection_string: Option<String>,
destination_connection_string: Option<String>,
workdir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
) -> Result<(), anyhow::Error> {
let (source_connstring, destination_connstring) = if let Some(spec) = maybe_spec {
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
let source = decode_connstring(
kms_client.as_ref().unwrap(),
&key_id,
spec.source_connstring_ciphertext_base64,
)
.await?;
let dest = if let Some(dest_ciphertext) =
spec.destination_connstring_ciphertext_base64
{
decode_connstring(kms_client.as_ref().unwrap(), &key_id, dest_ciphertext)
.await?
} else {
bail!("destination connection string must be provided in spec for dump_restore command");
};
(source, dest)
}
}
} else {
(
source_connection_string.unwrap(),
if let Some(val) = destination_connection_string {
val
} else {
bail!("destination connection string must be provided for dump_restore command");
},
)
};
run_dump_restore(
workdir,
pg_bin_dir,
pg_lib_dir,
source_connstring,
destination_connstring,
)
.await
}
#[tokio::main]
pub(crate) async fn main() -> anyhow::Result<()> {
utils::logging::init(
utils::logging::LogFormat::Json,
utils::logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
utils::logging::Output::Stdout,
)?;
info!("starting");
let args = Args::parse();
// Initialize AWS clients only if s3_prefix is specified
let (s3_client, kms_client) = if args.s3_prefix.is_some() {
let config = aws_config::load_defaults(BehaviorVersion::v2024_03_28()).await;
let s3_client = aws_sdk_s3::Client::new(&config);
let kms = aws_sdk_kms::Client::new(&config);
(Some(s3_client), Some(kms))
} else {
(None, None)
};
let spec: Option<Spec> = if let Some(s3_prefix) = &args.s3_prefix {
let spec_key = s3_prefix.append("/spec.json");
let object = s3_client
.as_ref()
.unwrap()
.get_object()
.bucket(&spec_key.bucket)
.key(spec_key.key)
.send()
.await
.context("get spec from s3")?
.body
.collect()
.await
.context("download spec body")?;
serde_json::from_slice(&object.into_bytes()).context("parse spec as json")?
} else {
None
};
match tokio::fs::create_dir(&args.working_directory).await {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
if !is_directory_empty(&args.working_directory)
aws_s3_sync::sync(&status_dir, &s3_prefix.append("/status/"))
.await
.context("check if working directory is empty")?
{
bail!("working directory is not empty");
} else {
// ok
}
}
Err(e) => return Err(anyhow::Error::new(e).context("create working directory")),
}
match args.command {
Command::Pgdata {
source_connection_string,
interactive,
pg_port,
num_cpus,
memory_mb,
} => {
cmd_pgdata(
s3_client,
kms_client,
args.s3_prefix,
spec,
source_connection_string,
interactive,
pg_port,
args.working_directory,
args.pg_bin_dir,
args.pg_lib_dir,
num_cpus,
memory_mb,
)
.await?;
}
Command::DumpRestore {
source_connection_string,
destination_connection_string,
} => {
cmd_dumprestore(
kms_client,
spec,
source_connection_string,
destination_connection_string,
args.working_directory,
args.pg_bin_dir,
args.pg_lib_dir,
)
.await?;
.context("sync status directory to destination")?;
}
}

View File

@@ -1,102 +1,24 @@
use camino::{Utf8Path, Utf8PathBuf};
use tokio::task::JoinSet;
use walkdir::WalkDir;
use anyhow::Context;
use camino::Utf8Path;
use super::s3_uri::S3Uri;
use tracing::{info, warn};
const MAX_PARALLEL_UPLOADS: usize = 10;
/// Upload all files from 'local' to 'remote'
pub(crate) async fn upload_dir_recursive(
s3_client: &aws_sdk_s3::Client,
local: &Utf8Path,
remote: &S3Uri,
) -> anyhow::Result<()> {
// Recursively scan directory
let mut dirwalker = WalkDir::new(local)
.into_iter()
.map(|entry| {
let entry = entry?;
let file_type = entry.file_type();
let path = <&Utf8Path>::try_from(entry.path())?.to_path_buf();
Ok((file_type, path))
})
.filter_map(|e: anyhow::Result<(std::fs::FileType, Utf8PathBuf)>| {
match e {
Ok((file_type, path)) if file_type.is_file() => Some(Ok(path)),
Ok((file_type, _path)) if file_type.is_dir() => {
// The WalkDir iterator will recurse into directories, but we don't want
// to do anything with directories as such. There's no concept of uploading
// an empty directory to S3.
None
}
Ok((file_type, path)) if file_type.is_symlink() => {
// huh, didn't expect a symlink. Can't upload that to S3. Warn and skip.
warn!("cannot upload symlink ({})", path);
None
}
Ok((_file_type, path)) => {
// should not happen
warn!("directory entry has unexpected type ({})", path);
None
}
Err(e) => Some(Err(e)),
}
});
// Spawn upload tasks for each file, keeping MAX_PARALLEL_UPLOADS active in
// parallel.
let mut joinset = JoinSet::new();
loop {
// Could we upload more?
while joinset.len() < MAX_PARALLEL_UPLOADS {
if let Some(full_local_path) = dirwalker.next() {
let full_local_path = full_local_path?;
let relative_local_path = full_local_path
.strip_prefix(local)
.expect("all paths start from the walkdir root");
let remote_path = remote.append(relative_local_path.as_str());
info!(
"starting upload of {} to {}",
&full_local_path, &remote_path
);
let upload_task = upload_file(s3_client.clone(), full_local_path, remote_path);
joinset.spawn(upload_task);
} else {
info!("draining upload tasks");
break;
}
}
// Wait for an upload to complete
if let Some(res) = joinset.join_next().await {
let _ = res?;
} else {
// all done!
break;
}
pub(crate) async fn sync(local: &Utf8Path, remote: &S3Uri) -> anyhow::Result<()> {
let mut builder = tokio::process::Command::new("aws");
builder
.arg("s3")
.arg("sync")
.arg(local.as_str())
.arg(remote.to_string());
let st = builder
.spawn()
.context("spawn aws s3 sync")?
.wait()
.await
.context("wait for aws s3 sync")?;
if st.success() {
Ok(())
} else {
Err(anyhow::anyhow!("aws s3 sync failed"))
}
Ok(())
}
pub(crate) async fn upload_file(
s3_client: aws_sdk_s3::Client,
local_path: Utf8PathBuf,
remote: S3Uri,
) -> anyhow::Result<()> {
use aws_smithy_types::byte_stream::ByteStream;
let stream = ByteStream::from_path(&local_path).await?;
let _result = s3_client
.put_object()
.bucket(remote.bucket)
.key(&remote.key)
.body(stream)
.send()
.await?;
info!("upload of {} to {} finished", &local_path, &remote.key);
Ok(())
}

View File

@@ -7,7 +7,6 @@ use std::{
use anyhow::Result;
use axum::{
body::Body,
extract::Request,
middleware::{self, Next},
response::{IntoResponse, Response},
@@ -17,7 +16,6 @@ use axum::{
use http::StatusCode;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
use tower_http::classify::ServerErrorsFailureClass;
use tower_http::{request_id::PropagateRequestIdLayer, trace::TraceLayer};
use tracing::{debug, error, info, Span};
use uuid::Uuid;
@@ -86,85 +84,46 @@ impl From<Server> for Router<Arc<ComputeNode>> {
.route("/terminate", post(terminate::terminate)),
};
router
.fallback(Server::handle_404)
.method_not_allowed_fallback(Server::handle_405)
.layer(
ServiceBuilder::new()
// Add this middleware since we assume the request ID exists
.layer(middleware::from_fn(maybe_add_request_id_header))
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
let request_id = request
router.fallback(Server::handle_404).method_not_allowed_fallback(Server::handle_405).layer(
ServiceBuilder::new()
// Add this middleware since we assume the request ID exists
.layer(middleware::from_fn(maybe_add_request_id_header))
.layer(
TraceLayer::new_for_http()
.on_request(|request: &http::Request<_>, _span: &Span| {
let request_id = request
.headers()
.get(X_REQUEST_ID)
.unwrap()
.to_str()
.unwrap();
match request.uri().path() {
"/metrics" => {
debug!(%request_id, "{} {}", request.method(), request.uri())
}
_ => info!(%request_id, "{} {}", request.method(), request.uri()),
};
})
.on_response(
|response: &http::Response<_>, latency: Duration, _span: &Span| {
let request_id = response
.headers()
.get(X_REQUEST_ID)
.unwrap()
.to_str()
.unwrap();
match request.uri().path() {
"/metrics" => {
tracing::span!(
tracing::Level::DEBUG,
"",
method = tracing::field::display(request.method()),
uri = tracing::field::display(request.uri()),
request_id = tracing::field::display(request_id)
)
}
_ => tracing::span!(
tracing::Level::INFO,
"",
method = tracing::field::display(request.method()),
uri = tracing::field::display(request.uri()),
request_id = tracing::field::display(request_id)
),
}
})
.on_request(|request: &http::Request<_>, _span: &Span| {
match request.uri().path() {
"/metrics" => debug!("incoming request"),
_ => info!("incoming request"),
};
})
.on_response(
|response: &http::Response<_>, latency: Duration, _span: &Span| {
// All errors will be logged in the on_failure handler
if let 200..=399 = response.status().as_u16() {
info!(
message = "request finished",
code = %response.status().as_u16(),
latency_ms = %latency.as_millis()
)
}
},
)
.on_failure(
|error: ServerErrorsFailureClass,
latency: Duration,
_span: &Span| {
match error {
ServerErrorsFailureClass::StatusCode(code) => {
error!(
message = "request failed",
code = %code,
latency_ms = %latency.as_millis()
);
}
ServerErrorsFailureClass::Error(error) => {
error!(
message = "request failed unexpectedly",
error = %error,
latency_ms = %latency.as_millis()
);
}
}
},
),
)
.layer(PropagateRequestIdLayer::x_request_id()),
)
info!(
%request_id,
code = response.status().as_u16(),
latency = latency.as_millis()
)
},
),
)
.layer(PropagateRequestIdLayer::x_request_id()),
)
}
}

View File

@@ -11,7 +11,6 @@ if [ -z ${OLDTAG+x} ] || [ -z ${NEWTAG+x} ] || [ -z "${OLDTAG}" ] || [ -z "${NEW
exit 1
fi
export PG_VERSION=${PG_VERSION:-16}
export PG_TEST_VERSION=${PG_VERSION}
function wait_for_ready {
TIME=0
while ! docker compose logs compute_is_ready | grep -q "accepting connections" && [ ${TIME} -le 300 ] ; do
@@ -60,12 +59,8 @@ docker compose cp ext-src neon-test-extensions:/
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
create_extensions "${EXTNAMES}"
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
exts="${EXTNAMES}"
else
query="select pge.extname from pg_extension pge join (select key as extname, value as extversion from json_each_text('${new_vers}')) x on pge.extname=x.extname and pge.extversion <> x.extversion"
exts=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
fi
query="select pge.extname from pg_extension pge join (select key as extname, value as extversion from json_each_text('${new_vers}')) x on pge.extname=x.extname and pge.extversion <> x.extversion"
exts=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
if [ -z "${exts}" ]; then
echo "No extensions were upgraded"
else
@@ -93,10 +88,7 @@ else
exit 1
fi
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
if ! docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh; then
docker compose exec neon-test-extensions cat /ext-src/${EXTDIR}/regression.diffs
exit 1
fi
docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh
docker compose exec neon-test-extensions psql -d contrib_regression -c "alter extension ${ext} update"
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
done

View File

@@ -351,7 +351,7 @@ pub struct TenantConfigToml {
/// Enable rel_size_v2 for this tenant. Once enabled, the tenant will persist this information into
/// `index_part.json`, and it cannot be reversed.
pub rel_size_v2_enabled: bool,
pub rel_size_v2_enabled: Option<bool>,
// gc-compaction related configs
/// Enable automatic gc-compaction trigger on this tenant.
@@ -633,7 +633,7 @@ impl Default for TenantConfigToml {
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
timeline_offloading: true,
wal_receiver_protocol_override: None,
rel_size_v2_enabled: false,
rel_size_v2_enabled: None,
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
gc_compaction_initial_threshold_kb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB,
gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT,

View File

@@ -1,12 +1,10 @@
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::Oid;
use postgres_ffi::RepOriginId;
use serde::{Deserialize, Serialize};
use std::{fmt, ops::Range};
use utils::const_assert;
use crate::reltag::{BlockNumber, RelTag, SlruKind};
@@ -51,64 +49,6 @@ pub const AUX_KEY_PREFIX: u8 = 0x62;
/// The key prefix of ReplOrigin keys.
pub const REPL_ORIGIN_KEY_PREFIX: u8 = 0x63;
/// The key prefix of db directory keys.
pub const DB_DIR_KEY_PREFIX: u8 = 0x64;
/// The key prefix of rel directory keys.
pub const REL_DIR_KEY_PREFIX: u8 = 0x65;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum RelDirExists {
Exists,
Removed,
}
#[derive(Debug)]
pub struct DecodeError;
impl fmt::Display for DecodeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "invalid marker")
}
}
impl std::error::Error for DecodeError {}
impl RelDirExists {
/// The value of the rel directory keys that indicates the existence of a relation.
const REL_EXISTS_MARKER: Bytes = Bytes::from_static(b"r");
pub fn encode(&self) -> Bytes {
match self {
Self::Exists => Self::REL_EXISTS_MARKER.clone(),
Self::Removed => SPARSE_TOMBSTONE_MARKER.clone(),
}
}
pub fn decode_option(data: Option<impl AsRef<[u8]>>) -> Result<Self, DecodeError> {
match data {
Some(marker) if marker.as_ref() == Self::REL_EXISTS_MARKER => Ok(Self::Exists),
// Any other marker is invalid
Some(_) => Err(DecodeError),
None => Ok(Self::Removed),
}
}
pub fn decode(data: impl AsRef<[u8]>) -> Result<Self, DecodeError> {
let data = data.as_ref();
if data == Self::REL_EXISTS_MARKER {
Ok(Self::Exists)
} else if data == SPARSE_TOMBSTONE_MARKER {
Ok(Self::Removed)
} else {
Err(DecodeError)
}
}
}
/// A tombstone in the sparse keyspace, which is an empty buffer.
pub const SPARSE_TOMBSTONE_MARKER: Bytes = Bytes::from_static(b"");
/// Check if the key falls in the range of metadata keys.
pub const fn is_metadata_key_slice(key: &[u8]) -> bool {
key[0] >= METADATA_KEY_BEGIN_PREFIX && key[0] < METADATA_KEY_END_PREFIX
@@ -170,24 +110,6 @@ impl Key {
}
}
pub fn rel_dir_sparse_key_range() -> Range<Self> {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REL_DIR_KEY_PREFIX + 1,
field2: 0,
field3: 0,
field4: 0,
field5: 0,
field6: 0,
}
}
/// This function checks more extensively what keys we can take on the write path.
/// If a key beginning with 00 does not have a global/default tablespace OID, it
/// will be rejected on the write path.
@@ -518,36 +440,6 @@ pub fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
#[inline(always)]
pub fn rel_tag_sparse_key(spcnode: Oid, dbnode: Oid, relnode: Oid, forknum: u8) -> Key {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: relnode,
field5: forknum,
field6: 1,
}
}
pub fn rel_tag_sparse_key_range(spcnode: Oid, dbnode: Oid) -> Range<Key> {
Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: 0,
field5: 0,
field6: 0,
}..Key {
field1: REL_DIR_KEY_PREFIX,
field2: spcnode,
field3: dbnode,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX,
} // it's fine to exclude the last key b/c we only use field6 == 1
}
#[inline(always)]
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
@@ -842,9 +734,9 @@ impl Key {
self.field1 == RELATION_SIZE_PREFIX
}
pub const fn sparse_non_inherited_keyspace() -> Range<Key> {
pub fn sparse_non_inherited_keyspace() -> Range<Key> {
// The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace
const_assert!(AUX_KEY_PREFIX + 1 == REPL_ORIGIN_KEY_PREFIX);
debug_assert_eq!(AUX_KEY_PREFIX + 1, REPL_ORIGIN_KEY_PREFIX);
Key {
field1: AUX_KEY_PREFIX,
field2: 0,

View File

@@ -1144,7 +1144,6 @@ pub struct TimelineInfo {
/// The LSN up to which GC has advanced: older data may still exist but it is not available for clients.
/// This LSN is not suitable for deciding where to create branches etc: use [`TimelineInfo::min_readable_lsn`] instead,
/// as it is easier to reason about.
#[serde(default)]
pub applied_gc_cutoff_lsn: Lsn,
/// The upper bound of data which is either already GC'ed, or elegible to be GC'ed at any time based on PITR interval.
@@ -1153,7 +1152,6 @@ pub struct TimelineInfo {
///
/// Note that holders of valid LSN leases may be able to create branches and read pages earlier
/// than this LSN, but new leases may not be taken out earlier than this LSN.
#[serde(default)]
pub min_readable_lsn: Lsn,
pub disk_consistent_lsn: Lsn,

View File

@@ -9,8 +9,6 @@ use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};
@@ -270,7 +268,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> MaybeWriteOnly<IO> {
}
pub struct PostgresBackend<IO> {
pub socket_fd: RawFd,
framed: MaybeWriteOnly<IO>,
pub state: ProtoState,
@@ -296,11 +293,9 @@ impl PostgresBackend<tokio::net::TcpStream> {
tls_config: Option<Arc<rustls::ServerConfig>>,
) -> io::Result<Self> {
let peer_addr = socket.peer_addr()?;
let socket_fd = socket.as_raw_fd();
let stream = MaybeTlsStream::Unencrypted(socket);
Ok(Self {
socket_fd,
framed: MaybeWriteOnly::Full(Framed::new(stream)),
state: ProtoState::Initialization,
auth_type,
@@ -312,7 +307,6 @@ impl PostgresBackend<tokio::net::TcpStream> {
impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
pub fn new_from_io(
socket_fd: RawFd,
socket: IO,
peer_addr: SocketAddr,
auth_type: AuthType,
@@ -321,7 +315,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
let stream = MaybeTlsStream::Unencrypted(socket);
Ok(Self {
socket_fd,
framed: MaybeWriteOnly::Full(Framed::new(stream)),
state: ProtoState::Initialization,
auth_type,

View File

@@ -10,8 +10,8 @@ use crate::simple_query::SimpleQueryStream;
use crate::types::{Oid, ToSql, Type};
use crate::{
query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row,
SimpleQueryMessage, Statement, Transaction, TransactionBuilder,
prepare, query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row,
SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
};
use bytes::BytesMut;
use fallible_iterator::FallibleIterator;
@@ -54,18 +54,18 @@ impl Responses {
}
/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [crate::prepare] module).
/// (corresponding to the queries in the [prepare] module).
#[derive(Default)]
struct CachedTypeInfo {
/// A statement for basic information for a type from its
/// OID. Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_QUERY) (or its
/// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
/// fallback).
typeinfo: Option<Statement>,
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY).
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
typeinfo_composite: Option<Statement>,
/// A statement for getting information for a composite type from its OID.
/// Corresponds to [TYPEINFO_QUERY](crate::prepare::TYPEINFO_COMPOSITE_QUERY) (or
/// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
/// its fallback).
typeinfo_enum: Option<Statement>,
@@ -190,6 +190,26 @@ impl Client {
&self.inner
}
/// Creates a new prepared statement.
///
/// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
/// which are set when executed. Prepared statements can only be used with the connection that created them.
pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
self.prepare_typed(query, &[]).await
}
/// Like `prepare`, but allows the types of query parameters to be explicitly specified.
///
/// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
/// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
pub async fn prepare_typed(
&self,
query: &str,
parameter_types: &[Type],
) -> Result<Statement, Error> {
prepare::prepare(&self.inner, query, parameter_types).await
}
/// Executes a statement, returning a vector of the resulting rows.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
@@ -202,11 +222,14 @@ impl Client {
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
pub async fn query(
pub async fn query<T>(
&self,
statement: Statement,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, Error> {
) -> Result<Vec<Row>, Error>
where
T: ?Sized + ToStatement,
{
self.query_raw(statement, slice_iter(params))
.await?
.try_collect()
@@ -227,15 +250,13 @@ impl Client {
/// Panics if the number of parameters provided does not match the number expected.
///
/// [`query`]: #method.query
pub async fn query_raw<'a, I>(
&self,
statement: Statement,
params: I,
) -> Result<RowStream, Error>
pub async fn query_raw<'a, T, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
where
T: ?Sized + ToStatement,
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let statement = statement.__convert().into_statement(self).await?;
query::query(&self.inner, statement, params).await
}
@@ -250,6 +271,55 @@ impl Client {
query::query_txt(&self.inner, statement, params).await
}
/// Executes a statement, returning the number of rows modified.
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
/// provided, 1-indexed.
///
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
/// with the `prepare` method.
///
/// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
pub async fn execute<T>(
&self,
statement: &T,
params: &[&(dyn ToSql + Sync)],
) -> Result<u64, Error>
where
T: ?Sized + ToStatement,
{
self.execute_raw(statement, slice_iter(params)).await
}
/// The maximally flexible version of [`execute`].
///
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
/// provided, 1-indexed.
///
/// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
/// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
/// with the `prepare` method.
///
/// # Panics
///
/// Panics if the number of parameters provided does not match the number expected.
///
/// [`execute`]: #method.execute
pub async fn execute_raw<'a, T, I>(&self, statement: &T, params: I) -> Result<u64, Error>
where
T: ?Sized + ToStatement,
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let statement = statement.__convert().into_statement(self).await?;
query::execute(self.inner(), statement, params).await
}
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
///
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

View File

@@ -1,8 +1,7 @@
#![allow(async_fn_in_trait)]
use crate::query::RowStream;
use crate::types::Type;
use crate::{Client, Error, Transaction};
use async_trait::async_trait;
use postgres_protocol2::Oid;
mod private {
@@ -12,6 +11,7 @@ mod private {
/// A trait allowing abstraction over connections and transactions.
///
/// This trait is "sealed", and cannot be implemented outside of this crate.
#[async_trait]
pub trait GenericClient: private::Sealed {
/// Like `Client::query_raw_txt`.
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
@@ -26,6 +26,7 @@ pub trait GenericClient: private::Sealed {
impl private::Sealed for Client {}
#[async_trait]
impl GenericClient for Client {
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
where
@@ -38,12 +39,14 @@ impl GenericClient for Client {
/// Query for type information
async fn get_type(&self, oid: Oid) -> Result<Type, Error> {
crate::prepare::get_type(self.inner(), oid).await
self.get_type(oid).await
}
}
impl private::Sealed for Transaction<'_> {}
#[async_trait]
#[allow(clippy::needless_lifetimes)]
impl GenericClient for Transaction<'_> {
async fn query_raw_txt<S, I>(&self, statement: &str, params: I) -> Result<RowStream, Error>
where

View File

@@ -14,6 +14,7 @@ pub use crate::row::{Row, SimpleQueryRow};
pub use crate::simple_query::SimpleQueryStream;
pub use crate::statement::{Column, Statement};
pub use crate::tls::NoTls;
pub use crate::to_statement::ToStatement;
pub use crate::transaction::Transaction;
pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder};
use crate::types::ToSql;
@@ -64,6 +65,7 @@ pub mod row;
mod simple_query;
mod statement;
pub mod tls;
mod to_statement;
mod transaction;
mod transaction_builder;
pub mod types;

View File

@@ -1,6 +1,7 @@
use crate::client::InnerClient;
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::error::SqlState;
use crate::types::{Field, Kind, Oid, Type};
use crate::{query, slice_iter};
use crate::{Column, Error, Statement};
@@ -12,6 +13,7 @@ use postgres_protocol2::message::backend::Message;
use postgres_protocol2::message::frontend;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
pub(crate) const TYPEINFO_QUERY: &str = "\
@@ -22,6 +24,14 @@ INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
WHERE t.oid = $1
";
// Range types weren't added until Postgres 9.2, so pg_range may not exist
const TYPEINFO_FALLBACK_QUERY: &str = "\
SELECT t.typname, t.typtype, t.typelem, NULL::OID, t.typbasetype, n.nspname, t.typrelid
FROM pg_catalog.pg_type t
INNER JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid
WHERE t.oid = $1
";
const TYPEINFO_ENUM_QUERY: &str = "\
SELECT enumlabel
FROM pg_catalog.pg_enum
@@ -29,6 +39,14 @@ WHERE enumtypid = $1
ORDER BY enumsortorder
";
// Postgres 9.0 didn't have enumsortorder
const TYPEINFO_ENUM_FALLBACK_QUERY: &str = "\
SELECT enumlabel
FROM pg_catalog.pg_enum
WHERE enumtypid = $1
ORDER BY oid
";
pub(crate) const TYPEINFO_COMPOSITE_QUERY: &str = "\
SELECT attname, atttypid
FROM pg_catalog.pg_attribute
@@ -38,13 +56,15 @@ AND attnum > 0
ORDER BY attnum
";
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
pub async fn prepare(
client: &Arc<InnerClient>,
name: &'static str,
query: &str,
types: &[Type],
) -> Result<Statement, Error> {
let buf = encode(client, name, query, types)?;
let name = format!("s{}", NEXT_ID.fetch_add(1, Ordering::SeqCst));
let buf = encode(client, &name, query, types)?;
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
match responses.next().await? {
@@ -85,11 +105,10 @@ pub async fn prepare(
fn prepare_rec<'a>(
client: &'a Arc<InnerClient>,
name: &'static str,
query: &'a str,
types: &'a [Type],
) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'a + Send>> {
Box::pin(prepare(client, name, query, types))
Box::pin(prepare(client, query, types))
}
fn encode(client: &InnerClient, name: &str, query: &str, types: &[Type]) -> Result<Bytes, Error> {
@@ -173,8 +192,13 @@ async fn typeinfo_statement(client: &Arc<InnerClient>) -> Result<Statement, Erro
return Ok(stmt);
}
let typeinfo = "neon_proxy_typeinfo";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_QUERY, &[]).await?;
let stmt = match prepare_rec(client, TYPEINFO_QUERY, &[]).await {
Ok(stmt) => stmt,
Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_TABLE) => {
prepare_rec(client, TYPEINFO_FALLBACK_QUERY, &[]).await?
}
Err(e) => return Err(e),
};
client.set_typeinfo(&stmt);
Ok(stmt)
@@ -195,8 +219,13 @@ async fn typeinfo_enum_statement(client: &Arc<InnerClient>) -> Result<Statement,
return Ok(stmt);
}
let typeinfo = "neon_proxy_typeinfo_enum";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_ENUM_QUERY, &[]).await?;
let stmt = match prepare_rec(client, TYPEINFO_ENUM_QUERY, &[]).await {
Ok(stmt) => stmt,
Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_COLUMN) => {
prepare_rec(client, TYPEINFO_ENUM_FALLBACK_QUERY, &[]).await?
}
Err(e) => return Err(e),
};
client.set_typeinfo_enum(&stmt);
Ok(stmt)
@@ -226,8 +255,7 @@ async fn typeinfo_composite_statement(client: &Arc<InnerClient>) -> Result<State
return Ok(stmt);
}
let typeinfo = "neon_proxy_typeinfo_composite";
let stmt = prepare_rec(client, typeinfo, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
let stmt = prepare_rec(client, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
client.set_typeinfo_composite(&stmt);
Ok(stmt)

View File

@@ -157,6 +157,49 @@ where
})
}
pub async fn execute<'a, I>(
client: &InnerClient,
statement: Statement,
params: I,
) -> Result<u64, Error>
where
I: IntoIterator<Item = &'a (dyn ToSql + Sync)>,
I::IntoIter: ExactSizeIterator,
{
let buf = if log_enabled!(Level::Debug) {
let params = params.into_iter().collect::<Vec<_>>();
debug!(
"executing statement {} with parameters: {:?}",
statement.name(),
BorrowToSqlParamsDebug(params.as_slice()),
);
encode(client, &statement, params)?
} else {
encode(client, &statement, params)?
};
let mut responses = start(client, buf).await?;
let mut rows = 0;
loop {
match responses.next().await? {
Message::DataRow(_) => {}
Message::CommandComplete(body) => {
rows = body
.tag()
.map_err(Error::parse)?
.rsplit(' ')
.next()
.unwrap()
.parse()
.unwrap_or(0);
}
Message::EmptyQueryResponse => rows = 0,
Message::ReadyForQuery(_) => return Ok(rows),
_ => return Err(Error::unexpected_message()),
}
}
}
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;

View File

@@ -13,7 +13,7 @@ use std::{
struct StatementInner {
client: Weak<InnerClient>,
name: &'static str,
name: String,
params: Vec<Type>,
columns: Vec<Column>,
}
@@ -22,7 +22,7 @@ impl Drop for StatementInner {
fn drop(&mut self) {
if let Some(client) = self.client.upgrade() {
let buf = client.with_buf(|buf| {
frontend::close(b'S', self.name, buf).unwrap();
frontend::close(b'S', &self.name, buf).unwrap();
frontend::sync(buf);
buf.split().freeze()
});
@@ -40,7 +40,7 @@ pub struct Statement(Arc<StatementInner>);
impl Statement {
pub(crate) fn new(
inner: &Arc<InnerClient>,
name: &'static str,
name: String,
params: Vec<Type>,
columns: Vec<Column>,
) -> Statement {
@@ -55,14 +55,14 @@ impl Statement {
pub(crate) fn new_anonymous(params: Vec<Type>, columns: Vec<Column>) -> Statement {
Statement(Arc::new(StatementInner {
client: Weak::new(),
name: "<anonymous>",
name: String::new(),
params,
columns,
}))
}
pub(crate) fn name(&self) -> &str {
self.0.name
&self.0.name
}
/// Returns the expected types of the statement's parameters.

View File

@@ -0,0 +1,57 @@
use crate::to_statement::private::{Sealed, ToStatementType};
use crate::Statement;
mod private {
use crate::{Client, Error, Statement};
pub trait Sealed {}
pub enum ToStatementType<'a> {
Statement(&'a Statement),
Query(&'a str),
}
impl ToStatementType<'_> {
pub async fn into_statement(self, client: &Client) -> Result<Statement, Error> {
match self {
ToStatementType::Statement(s) => Ok(s.clone()),
ToStatementType::Query(s) => client.prepare(s).await,
}
}
}
}
/// A trait abstracting over prepared and unprepared statements.
///
/// Many methods are generic over this bound, so that they support both a raw query string as well as a statement which
/// was prepared previously.
///
/// This trait is "sealed" and cannot be implemented by anything outside this crate.
pub trait ToStatement: Sealed {
#[doc(hidden)]
fn __convert(&self) -> ToStatementType<'_>;
}
impl ToStatement for Statement {
fn __convert(&self) -> ToStatementType<'_> {
ToStatementType::Statement(self)
}
}
impl Sealed for Statement {}
impl ToStatement for str {
fn __convert(&self) -> ToStatementType<'_> {
ToStatementType::Query(self)
}
}
impl Sealed for str {}
impl ToStatement for String {
fn __convert(&self) -> ToStatementType<'_> {
ToStatementType::Query(self)
}
}
impl Sealed for String {}

View File

@@ -28,7 +28,7 @@ inferno.workspace = true
fail.workspace = true
futures = { workspace = true }
jsonwebtoken.workspace = true
nix = {workspace = true, features = [ "ioctl" ] }
nix.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
regex.workspace = true

View File

@@ -93,9 +93,6 @@ pub mod try_rcu;
pub mod guard_arc_swap;
#[cfg(target_os = "linux")]
pub mod linux_socket_ioctl;
// Re-export used in macro. Avoids adding git-version as dep in target crates.
#[doc(hidden)]
pub use git_version;

View File

@@ -1,35 +0,0 @@
//! Linux-specific socket ioctls.
//!
//! <https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27>
use std::{
io,
mem::MaybeUninit,
os::{fd::RawFd, raw::c_int},
};
use nix::libc::{FIONREAD, TIOCOUTQ};
unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int> {
let mut inq: MaybeUninit<c_int> = MaybeUninit::uninit();
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
if err == 0 {
Ok(inq.assume_init())
} else {
Err(io::Error::last_os_error())
}
}
/// # Safety
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn inq(socket_fd: RawFd) -> io::Result<c_int> {
do_ioctl(socket_fd, FIONREAD)
}
/// # Safety
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn outq(socket_fd: RawFd) -> io::Result<c_int> {
do_ioctl(socket_fd, TIOCOUTQ)
}

View File

@@ -13,7 +13,7 @@
use anyhow::{anyhow, Context};
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{rel_block_to_key, Key};
use pageserver_api::key::Key;
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::{Instant, SystemTime};
@@ -501,9 +501,13 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
// TODO: investigate using get_vectored for the entire startblk..endblk range.
// But this code path is not on the critical path for most basebackups (?).
.get(rel_block_to_key(src, blknum), self.lsn, self.ctx)
.get_rel_page_at_lsn(
src,
blknum,
Version::Lsn(self.lsn),
self.ctx,
self.io_concurrency.clone(),
)
.await
.map_err(|e| BasebackupError::Server(e.into()))?;
segment_data.extend_from_slice(&img[..]);

View File

@@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};
@@ -130,7 +129,7 @@ pub(crate) static LAYERS_PER_READ: Lazy<HistogramVec> = Lazy::new(|| {
"Layers visited to serve a single read (read amplification). In a batch, all visited layers count towards every read.",
&["tenant_id", "shard_id", "timeline_id"],
// Low resolution to reduce cardinality.
vec![4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0],
vec![1.0, 5.0, 10.0, 25.0, 50.0, 100.0],
)
.expect("failed to define a metric")
});
@@ -1440,66 +1439,27 @@ impl Drop for SmgrOpTimer {
}
impl SmgrOpFlushInProgress {
/// The caller must guarantee that `socket_fd`` outlives this function.
pub(crate) async fn measure<Fut, O>(
self,
started_at: Instant,
mut fut: Fut,
socket_fd: RawFd,
) -> O
pub(crate) async fn measure<Fut, O>(self, mut started_at: Instant, mut fut: Fut) -> O
where
Fut: std::future::Future<Output = O>,
{
let mut fut = std::pin::pin!(fut);
let mut logged = false;
let mut last_counter_increment_at = started_at;
// Whenever observe_guard gets called, or dropped,
// it adds the time elapsed since its last call to metrics.
// Last call is tracked in `now`.
let mut observe_guard = scopeguard::guard(
|is_timeout| {
|| {
let now = Instant::now();
// Increment counter
{
let elapsed_since_last_observe = now - last_counter_increment_at;
self.global_micros
.inc_by(u64::try_from(elapsed_since_last_observe.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed_since_last_observe.as_micros()).unwrap());
last_counter_increment_at = now;
}
// Log something on every timeout, and on completion but only if we hit a timeout.
if is_timeout || logged {
logged = true;
let elapsed_total = now - started_at;
let msg = if is_timeout {
"slow flush ongoing"
} else {
"slow flush completed or cancelled"
};
let (inq, outq) = {
// SAFETY: caller guarantees that `socket_fd` outlives this function.
#[cfg(target_os = "linux")]
unsafe {
(
utils::linux_socket_ioctl::inq(socket_fd).unwrap_or(-2),
utils::linux_socket_ioctl::outq(socket_fd).unwrap_or(-2),
)
}
#[cfg(not(target_os = "linux"))]
{
_ = socket_fd; // appease unused lint on macOS
(-1, -1)
}
};
let elapsed_total_secs = format!("{:.6}", elapsed_total.as_secs_f64());
tracing::info!(elapsed_total_secs, inq, outq, msg);
}
let elapsed = now - started_at;
self.global_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
self.per_timeline_micros
.inc_by(u64::try_from(elapsed.as_micros()).unwrap());
started_at = now;
},
|mut observe| {
observe(false);
observe();
},
);
@@ -1507,7 +1467,7 @@ impl SmgrOpFlushInProgress {
match tokio::time::timeout(Duration::from_secs(10), &mut fut).await {
Ok(v) => return v,
Err(_timeout) => {
(*observe_guard)(true);
(*observe_guard)();
}
}
}

View File

@@ -73,7 +73,6 @@ use pageserver_api::models::PageTraceEvent;
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
/// How long we may wait for a [`crate::tenant::mgr::TenantSlot::InProgress`]` and/or a [`crate::tenant::Tenant`] which
/// is not yet in state [`TenantState::Active`].
@@ -237,7 +236,7 @@ pub async fn libpq_listener_main(
type ConnectionHandlerResult = anyhow::Result<()>;
#[instrument(skip_all, fields(peer_addr, application_name))]
#[instrument(skip_all, fields(peer_addr))]
#[allow(clippy::too_many_arguments)]
async fn page_service_conn_main(
conf: &'static PageServerConf,
@@ -258,8 +257,6 @@ async fn page_service_conn_main(
.set_nodelay(true)
.context("could not set TCP_NODELAY")?;
let socket_fd = socket.as_raw_fd();
let peer_addr = socket.peer_addr().context("get peer address")?;
tracing::Span::current().record("peer_addr", field::display(peer_addr));
@@ -308,7 +305,7 @@ async fn page_service_conn_main(
cancel.clone(),
gate_guard,
);
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend.run(&mut conn_handler, &cancel).await {
Ok(()) => {
@@ -1289,15 +1286,12 @@ impl PageServerHandler {
))?;
// what we want to do
let socket_fd = pgb_writer.socket_fd;
let flush_fut = pgb_writer.flush();
// metric for how long flushing takes
let flush_fut = match flushing_timer {
Some(flushing_timer) => futures::future::Either::Left(flushing_timer.measure(
Instant::now(),
flush_fut,
socket_fd,
)),
Some(flushing_timer) => {
futures::future::Either::Left(flushing_timer.measure(Instant::now(), flush_fut))
}
None => futures::future::Either::Right(flush_fut),
};
// do it while respecting cancellation
@@ -2463,16 +2457,9 @@ where
fn startup(
&mut self,
_pgb: &mut PostgresBackend<IO>,
sm: &FeStartupPacket,
_sm: &FeStartupPacket,
) -> Result<(), QueryError> {
fail::fail_point!("ps::connection-start::startup-packet");
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {
Span::current().record("application_name", field::display(app_name));
}
};
Ok(())
}

View File

@@ -23,14 +23,13 @@ use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::Key;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
rel_tag_sparse_key_range, relmap_file_key, repl_origin_key, repl_origin_key_range,
slru_block_to_key, slru_dir_to_key, slru_segment_key_range, slru_segment_size_to_key,
twophase_file_key, twophase_key_range, CompactKey, RelDirExists, AUX_FILES_KEY, CHECKPOINT_KEY,
CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
slru_segment_key_range, slru_segment_size_to_key, twophase_file_key, twophase_key_range,
CompactKey, AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, TWOPHASEDIR_KEY,
};
use pageserver_api::key::{rel_tag_sparse_key, Key};
use pageserver_api::keyspace::SparseKeySpace;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
@@ -491,33 +490,12 @@ impl Timeline {
if !dbdirs.contains_key(&(tag.spcnode, tag.dbnode)) {
return Ok(false);
}
// Read path: first read the new reldir keyspace. Early return if the relation exists.
// Otherwise, read the old reldir keyspace.
// TODO: if IndexPart::rel_size_migration is `Migrated`, we only need to read from v2.
if self.get_rel_size_v2_enabled() {
// fetch directory listing (new)
let key = rel_tag_sparse_key(tag.spcnode, tag.dbnode, tag.relnode, tag.forknum);
let buf = RelDirExists::decode_option(version.sparse_get(self, key, ctx).await?)
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
let exists_v2 = buf == RelDirExists::Exists;
// Fast path: if the relation exists in the new format, return true.
// TODO: we should have a verification mode that checks both keyspaces
// to ensure the relation only exists in one of them.
if exists_v2 {
return Ok(true);
}
}
// fetch directory listing (old)
// fetch directory listing
let key = rel_dir_to_key(tag.spcnode, tag.dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
let exists_v1 = dir.rels.contains(&(tag.relnode, tag.forknum));
Ok(exists_v1)
Ok(dir.rels.contains(&(tag.relnode, tag.forknum)))
}
/// Get a list of all existing relations in given tablespace and database.
@@ -535,12 +513,12 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<HashSet<RelTag>, PageReconstructError> {
// fetch directory listing (old)
// fetch directory listing
let key = rel_dir_to_key(spcnode, dbnode);
let buf = version.get(self, key, ctx).await?;
let dir = RelDirectory::des(&buf)?;
let rels_v1: HashSet<RelTag> =
let rels: HashSet<RelTag> =
HashSet::from_iter(dir.rels.iter().map(|(relnode, forknum)| RelTag {
spcnode,
dbnode,
@@ -548,46 +526,6 @@ impl Timeline {
forknum: *forknum,
}));
if !self.get_rel_size_v2_enabled() {
return Ok(rels_v1);
}
// scan directory listing (new), merge with the old results
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
);
let results = self
.scan(
KeySpace::single(key_range),
version.get_lsn(),
ctx,
io_concurrency,
)
.await?;
let mut rels = rels_v1;
for (key, val) in results {
let val = RelDirExists::decode(&val?)
.map_err(|_| PageReconstructError::Other(anyhow::anyhow!("invalid reldir key")))?;
assert_eq!(key.field6, 1);
assert_eq!(key.field2, spcnode);
assert_eq!(key.field3, dbnode);
let tag = RelTag {
spcnode,
dbnode,
relnode: key.field4,
forknum: key.field5,
};
if val == RelDirExists::Removed {
debug_assert!(!rels.contains(&tag), "removed reltag in v2");
continue;
}
let did_not_contain = rels.insert(tag);
debug_assert!(did_not_contain, "duplicate reltag in v2");
}
Ok(rels)
}
@@ -1206,11 +1144,7 @@ impl Timeline {
let dense_keyspace = result.to_keyspace();
let sparse_keyspace = SparseKeySpace(KeySpace {
ranges: vec![
Key::metadata_aux_key_range(),
repl_origin_key_range(),
Key::rel_dir_sparse_key_range(),
],
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
});
if cfg!(debug_assertions) {
@@ -1340,22 +1274,12 @@ pub struct DatadirModification<'a> {
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, MetricsUpdate)>,
pending_directory_entries: Vec<(DirectoryKind, usize)>,
/// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
pending_metadata_bytes: usize,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MetricsUpdate {
/// Set the metrics to this value
Set(u64),
/// Increment the metrics by this value
Add(u64),
/// Decrement the metrics by this value
Sub(u64),
}
impl DatadirModification<'_> {
// When a DatadirModification is committed, we do a monolithic serialization of all its contents. WAL records can
// contain multiple pages, so the pageserver's record-based batch size isn't sufficient to bound this allocation: we
@@ -1435,8 +1359,7 @@ impl DatadirModification<'_> {
let buf = DbDirectory::ser(&DbDirectory {
dbdirs: HashMap::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::Db, MetricsUpdate::Set(0)));
self.pending_directory_entries.push((DirectoryKind::Db, 0));
self.put(DBDIR_KEY, Value::Image(buf.into()));
let buf = if self.tline.pg_version >= 17 {
@@ -1449,7 +1372,7 @@ impl DatadirModification<'_> {
})
}?;
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, MetricsUpdate::Set(0)));
.push((DirectoryKind::TwoPhase, 0));
self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
@@ -1459,23 +1382,17 @@ impl DatadirModification<'_> {
// harmless but they'd just be dropped on later compaction.
if self.tline.tenant_shard_id.is_shard_zero() {
self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(SlruKind::Clog),
MetricsUpdate::Set(0),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(
slru_dir_to_key(SlruKind::MultiXactMembers),
empty_dir.clone(),
);
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(SlruKind::Clog),
MetricsUpdate::Set(0),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets),
MetricsUpdate::Set(0),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
}
Ok(())
@@ -1741,16 +1658,10 @@ impl DatadirModification<'_> {
}
if r.is_none() {
// Create RelDirectory
// TODO: if we have fully migrated to v2, no need to create this directory
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
if self.tline.get_rel_size_v2_enabled() {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
}
self.pending_directory_entries.push((DirectoryKind::Rel, 0));
self.put(
rel_dir_to_key(spcnode, dbnode),
Value::Image(Bytes::from(buf)),
@@ -1774,10 +1685,8 @@ impl DatadirModification<'_> {
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
MetricsUpdate::Set(dir.xids.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid = xid as u32;
@@ -1785,10 +1694,8 @@ impl DatadirModification<'_> {
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
MetricsUpdate::Set(dir.xids.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
};
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
@@ -1837,10 +1744,8 @@ impl DatadirModification<'_> {
let mut dir = DbDirectory::des(&buf)?;
if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
let buf = DbDirectory::ser(&dir)?;
self.pending_directory_entries.push((
DirectoryKind::Db,
MetricsUpdate::Set(dir.dbdirs.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::Db, dir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
} else {
warn!(
@@ -1873,85 +1778,39 @@ impl DatadirModification<'_> {
// tablespace. Create the reldir entry for it if so.
let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await.context("read db")?)
.context("deserialize db")?;
let dbdir_exists =
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir =
if let hash_map::Entry::Vacant(e) = dbdir.dbdirs.entry((rel.spcnode, rel.dbnode)) {
// Didn't exist. Update dbdir
e.insert(false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.pending_directory_entries.push((
DirectoryKind::Db,
MetricsUpdate::Set(dbdir.dbdirs.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
false
} else {
true
};
let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
let mut rel_dir = if !dbdir_exists {
// Create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// and create the RelDirectory
RelDirectory::default()
} else {
// reldir already exists, fetch it
RelDirectory::des(&self.get(rel_dir_key, ctx).await.context("read db")?)
.context("deserialize db")?
};
// Add the new relation to the rel directory entry, and write it back
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
}
if self.tline.get_rel_size_v2_enabled() {
let sparse_rel_dir_key =
rel_tag_sparse_key(rel.spcnode, rel.dbnode, rel.relnode, rel.forknum);
// check if the rel_dir_key exists in v2
let val = self
.sparse_get(sparse_rel_dir_key, ctx)
.await
.map_err(|e| RelationError::Other(e.into()))?;
let val = RelDirExists::decode_option(val)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
if val == RelDirExists::Exists {
return Err(RelationError::AlreadyExists);
}
self.put(
sparse_rel_dir_key,
Value::Image(RelDirExists::Exists.encode()),
);
if !dbdir_exists {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)));
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Set(0)));
// We don't write `rel_dir_key -> rel_dir.rels` back to the storage in the v2 path unless it's the initial creation.
// TODO: if we have fully migrated to v2, no need to create this directory. Otherwise, there
// will be key not found errors if we don't create an empty one for rel_size_v2.
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&RelDirectory::default()).context("serialize")?,
)),
);
}
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
} else {
if !dbdir_exists {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Set(0)))
}
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Add(1)));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
}
self.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
RelDirectory::ser(&rel_dir).context("serialize")?,
)),
);
// Put size
let size_key = rel_size_to_key(rel);
let buf = nblocks.to_le_bytes();
@@ -2037,34 +1896,9 @@ impl DatadirModification<'_> {
let mut dirty = false;
for rel_tag in rel_tags {
let found = if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
self.pending_directory_entries
.push((DirectoryKind::Rel, MetricsUpdate::Sub(1)));
if dir.rels.remove(&(rel_tag.relnode, rel_tag.forknum)) {
dirty = true;
true
} else if self.tline.get_rel_size_v2_enabled() {
// The rel is not found in the old reldir key, so we need to check the new sparse keyspace.
// Note that a relation can only exist in one of the two keyspaces (guaranteed by the ingestion
// logic).
let key =
rel_tag_sparse_key(spc_node, db_node, rel_tag.relnode, rel_tag.forknum);
let val = RelDirExists::decode_option(self.sparse_get(key, ctx).await?)
.map_err(|_| RelationError::Other(anyhow::anyhow!("invalid reldir key")))?;
if val == RelDirExists::Exists {
self.pending_directory_entries
.push((DirectoryKind::RelV2, MetricsUpdate::Sub(1)));
// put tombstone
self.put(key, Value::Image(RelDirExists::Removed.encode()));
// no need to set dirty to true
true
} else {
false
}
} else {
false
};
if found {
// update logical size
let size_key = rel_size_to_key(rel_tag);
let old_size = self.get(size_key, ctx).await?.get_u32_le();
@@ -2080,6 +1914,8 @@ impl DatadirModification<'_> {
if dirty {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
}
}
@@ -2103,10 +1939,8 @@ impl DatadirModification<'_> {
if !dir.segments.insert(segno) {
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
}
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(kind),
MetricsUpdate::Set(dir.segments.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
self.put(
dir_key,
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
@@ -2153,10 +1987,8 @@ impl DatadirModification<'_> {
if !dir.segments.remove(&segno) {
warn!("slru segment {:?}/{} does not exist", kind, segno);
}
self.pending_directory_entries.push((
DirectoryKind::SlruSegment(kind),
MetricsUpdate::Set(dir.segments.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
self.put(
dir_key,
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
@@ -2188,10 +2020,8 @@ impl DatadirModification<'_> {
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
MetricsUpdate::Set(dir.xids.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectoryV17::ser(&dir)?)
} else {
let xid: u32 = u32::try_from(xid)?;
@@ -2200,10 +2030,8 @@ impl DatadirModification<'_> {
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries.push((
DirectoryKind::TwoPhase,
MetricsUpdate::Set(dir.xids.len() as u64),
));
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
Bytes::from(TwoPhaseDirectory::ser(&dir)?)
};
self.put(TWOPHASEDIR_KEY, Value::Image(newdirbuf));
@@ -2319,7 +2147,7 @@ impl DatadirModification<'_> {
}
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
writer.update_directory_entries_count(kind, count);
writer.update_directory_entries_count(kind, count as u64);
}
Ok(())
@@ -2405,7 +2233,7 @@ impl DatadirModification<'_> {
}
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
writer.update_directory_entries_count(kind, count);
writer.update_directory_entries_count(kind, count as u64);
}
self.pending_metadata_bytes = 0;
@@ -2469,22 +2297,6 @@ impl DatadirModification<'_> {
self.tline.get(key, lsn, ctx).await
}
/// Get a key from the sparse keyspace. Automatically converts the missing key error
/// and the empty value into None.
async fn sparse_get(
&self,
key: Key,
ctx: &RequestContext,
) -> Result<Option<Bytes>, PageReconstructError> {
let val = self.get(key, ctx).await;
match val {
Ok(val) if val.is_empty() => Ok(None),
Ok(val) => Ok(Some(val)),
Err(PageReconstructError::MissingKey(_)) => Ok(None),
Err(e) => Err(e),
}
}
fn put(&mut self, key: Key, val: Value) {
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)
@@ -2567,23 +2379,6 @@ impl Version<'_> {
}
}
/// Get a key from the sparse keyspace. Automatically converts the missing key error
/// and the empty value into None.
async fn sparse_get(
&self,
timeline: &Timeline,
key: Key,
ctx: &RequestContext,
) -> Result<Option<Bytes>, PageReconstructError> {
let val = self.get(timeline, key, ctx).await;
match val {
Ok(val) if val.is_empty() => Ok(None),
Ok(val) => Ok(Some(val)),
Err(PageReconstructError::MissingKey(_)) => Ok(None),
Err(e) => Err(e),
}
}
fn get_lsn(&self) -> Lsn {
match self {
Version::Lsn(lsn) => *lsn,
@@ -2643,7 +2438,6 @@ pub(crate) enum DirectoryKind {
Rel,
AuxFiles,
SlruSegment(SlruKind),
RelV2,
}
impl DirectoryKind {

View File

@@ -3924,13 +3924,6 @@ impl Tenant {
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub fn get_rel_size_v2_enabled(&self) -> bool {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.rel_size_v2_enabled
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
}
pub fn get_compaction_upper_limit(&self) -> usize {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
@@ -5647,7 +5640,7 @@ pub(crate) mod harness {
lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts),
timeline_offloading: Some(tenant_conf.timeline_offloading),
wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override,
rel_size_v2_enabled: Some(tenant_conf.rel_size_v2_enabled),
rel_size_v2_enabled: tenant_conf.rel_size_v2_enabled,
gc_compaction_enabled: Some(tenant_conf.gc_compaction_enabled),
gc_compaction_initial_threshold_kb: Some(
tenant_conf.gc_compaction_initial_threshold_kb,

View File

@@ -485,9 +485,7 @@ impl TenantConfOpt {
wal_receiver_protocol_override: self
.wal_receiver_protocol_override
.or(global_conf.wal_receiver_protocol_override),
rel_size_v2_enabled: self
.rel_size_v2_enabled
.unwrap_or(global_conf.rel_size_v2_enabled),
rel_size_v2_enabled: self.rel_size_v2_enabled.or(global_conf.rel_size_v2_enabled),
gc_compaction_enabled: self
.gc_compaction_enabled
.unwrap_or(global_conf.gc_compaction_enabled),

View File

@@ -117,7 +117,7 @@ use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::metrics::{TimelineMetrics, DELTAS_PER_READ_GLOBAL, LAYERS_PER_READ_GLOBAL};
use crate::pgdatadir_mapping::{CalculateLogicalSizeError, MetricsUpdate};
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::config::TenantConfOpt;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIndex;
@@ -327,7 +327,6 @@ pub struct Timeline {
// in `crate::page_service` writes these metrics.
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
directory_metrics_inited: [AtomicBool; DirectoryKind::KINDS_NUM],
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
/// Ensures layers aren't frozen by checkpointer between
@@ -2356,14 +2355,6 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.compaction_threshold)
}
pub(crate) fn get_rel_size_v2_enabled(&self) -> bool {
let tenant_conf = self.tenant_conf.load();
tenant_conf
.tenant_conf
.rel_size_v2_enabled
.unwrap_or(self.conf.default_tenant_conf.rel_size_v2_enabled)
}
fn get_compaction_upper_limit(&self) -> usize {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2673,7 +2664,6 @@ impl Timeline {
),
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
directory_metrics_inited: array::from_fn(|_| AtomicBool::new(false)),
flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
@@ -3440,42 +3430,8 @@ impl Timeline {
}
}
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: MetricsUpdate) {
// TODO: this directory metrics is not correct -- we could have multiple reldirs in the system
// for each of the database, but we only store one value, and therefore each pgdirmodification
// would overwrite the previous value if they modify different databases.
match count {
MetricsUpdate::Set(count) => {
self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
self.directory_metrics_inited[kind.offset()].store(true, AtomicOrdering::Relaxed);
}
MetricsUpdate::Add(count) => {
// TODO: these operations are not atomic; but we only have one writer to the metrics, so
// it's fine.
if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) {
// The metrics has been initialized with `MetricsUpdate::Set` before, so we can add/sub
// the value reliably.
self.directory_metrics[kind.offset()].fetch_add(count, AtomicOrdering::Relaxed);
}
// Otherwise, ignore this update
}
MetricsUpdate::Sub(count) => {
// TODO: these operations are not atomic; but we only have one writer to the metrics, so
// it's fine.
if self.directory_metrics_inited[kind.offset()].load(AtomicOrdering::Relaxed) {
// The metrics has been initialized with `MetricsUpdate::Set` before.
// The operation could overflow so we need to normalize the value.
let prev_val =
self.directory_metrics[kind.offset()].load(AtomicOrdering::Relaxed);
let res = prev_val.saturating_sub(count);
self.directory_metrics[kind.offset()].store(res, AtomicOrdering::Relaxed);
}
// Otherwise, ignore this update
}
};
// TODO: remove this, there's no place in the code that updates this aux metrics.
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
let aux_metric =
self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
@@ -3693,9 +3649,7 @@ impl Timeline {
// space. If that's not the case, we had at least one key encounter a gap in the image layer
// and stop the search as a result of that.
let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
// Do not fire missing key error and end early for sparse keys. Note that we hava already removed
// non-inherited keyspaces before, so we can safely do a full `SPARSE_RANGE` remove instead of
// figuring out what is the inherited key range and do a fine-grained pruning.
// Do not fire missing key error for sparse keys.
removed.remove_overlapping_with(&KeySpace {
ranges: vec![SPARSE_RANGE],
});

View File

@@ -7,9 +7,7 @@ use super::Timeline;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::ShutdownIfArchivedError;
use crate::tenant::timeline::delete::{make_timeline_delete_guard, TimelineDeleteGuardKind};
use crate::tenant::{
DeleteTimelineError, OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded,
};
use crate::tenant::{OffloadedTimeline, Tenant, TenantManifestError, TimelineOrOffloaded};
#[derive(thiserror::Error, Debug)]
pub(crate) enum OffloadError {
@@ -39,25 +37,12 @@ pub(crate) async fn offload_timeline(
debug_assert_current_span_has_tenant_and_timeline_id();
tracing::info!("offloading archived timeline");
let delete_guard_res = make_timeline_delete_guard(
let (timeline, guard) = make_timeline_delete_guard(
tenant,
timeline.timeline_id,
TimelineDeleteGuardKind::Offload,
);
if let Err(DeleteTimelineError::HasChildren(children)) = delete_guard_res {
let is_archived = timeline.is_archived();
if is_archived == Some(true) {
tracing::error!("timeline is archived but has non-archived children: {children:?}");
return Err(OffloadError::NotArchived);
}
tracing::info!(
?is_archived,
"timeline is not archived and has unarchived children"
);
return Err(OffloadError::NotArchived);
};
let (timeline, guard) =
delete_guard_res.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
)
.map_err(|e| OffloadError::Other(anyhow::anyhow!(e)))?;
let TimelineOrOffloaded::Timeline(timeline) = timeline else {
tracing::error!("timeline already offloaded, but given timeline object");

View File

@@ -18,8 +18,6 @@
#include "neon_utils.h"
static int extension_server_port = 0;
static int extension_server_request_timeout = 60;
static int extension_server_connect_timeout = 60;
static download_extension_file_hook_type prev_download_extension_file_hook = NULL;
@@ -36,18 +34,19 @@ static download_extension_file_hook_type prev_download_extension_file_hook = NUL
static bool
neon_download_extension_file_http(const char *filename, bool is_library)
{
static CURL *handle = NULL;
CURLcode res;
bool ret = false;
CURL *handle = NULL;
char *compute_ctl_url;
bool ret = false;
handle = alloc_curl_handle();
if (handle == NULL)
{
handle = alloc_curl_handle();
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
if (extension_server_request_timeout > 0)
curl_easy_setopt(handle, CURLOPT_TIMEOUT, (long)extension_server_request_timeout /* seconds */ );
if (extension_server_connect_timeout > 0)
curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT, (long)extension_server_connect_timeout /* seconds */ );
curl_easy_setopt(handle, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(handle, CURLOPT_TIMEOUT, 60L /* seconds */ );
}
compute_ctl_url = psprintf("http://localhost:%d/extension_server/%s%s",
extension_server_port, filename, is_library ? "?is_library=true" : "");
@@ -58,8 +57,6 @@ neon_download_extension_file_http(const char *filename, bool is_library)
/* Perform the request, res will get the return code */
res = curl_easy_perform(handle);
curl_easy_cleanup(handle);
/* Check for errors */
if (res == CURLE_OK)
{
@@ -91,24 +88,6 @@ pg_init_extension_server()
0, /* no flags required */
NULL, NULL, NULL);
DefineCustomIntVariable("neon.extension_server_request_timeout",
"timeout for fetching extensions in seconds",
NULL,
&extension_server_request_timeout,
60, 0, INT_MAX,
PGC_SUSET,
GUC_UNIT_S,
NULL, NULL, NULL);
DefineCustomIntVariable("neon.extension_server_connect_timeout",
"timeout for connecting to the extension server in seconds",
NULL,
&extension_server_connect_timeout,
60, 0, INT_MAX,
PGC_SUSET,
GUC_UNIT_S,
NULL, NULL, NULL);
/* set download_extension_file_hook */
prev_download_extension_file_hook = download_extension_file_hook;
download_extension_file_hook = neon_download_extension_file_http;

View File

@@ -122,8 +122,8 @@ addSHLL(HyperLogLogState *cState, uint32 hash)
index = hash >> HLL_C_BITS;
/* Compute the rank of the remaining 32 - "k" (registerWidth) bits */
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS) - 1;
Assert(count <= HLL_C_BITS);
count = rho(hash << HLL_BIT_WIDTH, HLL_C_BITS);
cState->regs[index][count] = now;
}
@@ -136,7 +136,7 @@ getMaximum(const TimestampTz* reg, TimestampTz since)
{
if (reg[i] >= since)
{
max = i + 1;
max = i;
}
}

View File

@@ -378,9 +378,8 @@ pageserver_connect(shardno_t shard_no, int elevel)
{
case PS_Disconnected:
{
const char *keywords[4];
const char *values[4];
char pid_str[16];
const char *keywords[3];
const char *values[3];
int n_pgsql_params;
TimestampTz now;
int64 us_since_last_attempt;
@@ -425,30 +424,14 @@ pageserver_connect(shardno_t shard_no, int elevel)
* can override the password from the env variable. Seems useful, although
* we don't currently use that capability anywhere.
*/
n_pgsql_params = 0;
/*
* Pageserver logs include this in the connection's tracing span.
* This allows for reasier log correlation between compute and pageserver.
*/
keywords[n_pgsql_params] = "application_name";
{
int ret = snprintf(pid_str, sizeof(pid_str), "%d", MyProcPid);
if (ret < 0 || ret >= (int)(sizeof(pid_str)))
elog(FATAL, "stack-allocated buffer too small to hold pid");
}
/* lifetime: PQconnectStartParams strdups internally */
values[n_pgsql_params] = (const char*) pid_str;
n_pgsql_params++;
keywords[n_pgsql_params] = "dbname";
values[n_pgsql_params] = connstr;
n_pgsql_params++;
keywords[0] = "dbname";
values[0] = connstr;
n_pgsql_params = 1;
if (neon_auth_token)
{
keywords[n_pgsql_params] = "password";
values[n_pgsql_params] = neon_auth_token;
keywords[1] = "password";
values[1] = neon_auth_token;
n_pgsql_params++;
}

25
poetry.lock generated
View File

@@ -412,7 +412,6 @@ files = [
[package.dependencies]
botocore-stubs = "*"
mypy-boto3-kms = {version = ">=1.26.0,<1.27.0", optional = true, markers = "extra == \"kms\""}
mypy-boto3-s3 = {version = ">=1.26.0,<1.27.0", optional = true, markers = "extra == \"s3\""}
types-s3transfer = "*"
typing-extensions = ">=4.1.0"
@@ -2023,18 +2022,6 @@ install-types = ["pip"]
mypyc = ["setuptools (>=50)"]
reports = ["lxml"]
[[package]]
name = "mypy-boto3-kms"
version = "1.26.147"
description = "Type annotations for boto3.KMS 1.26.147 service generated with mypy-boto3-builder 7.14.5"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "mypy-boto3-kms-1.26.147.tar.gz", hash = "sha256:816a4d1bb0585e1b9620a3f96c1d69a06f53b7b5621858579dd77c60dbb5fa5c"},
{file = "mypy_boto3_kms-1.26.147-py3-none-any.whl", hash = "sha256:493f0db674a25c88769f5cb8ab8ac00d3dda5dfc903d5cda34c990ee64689f79"},
]
[[package]]
name = "mypy-boto3-s3"
version = "1.26.0.post1"
@@ -2771,18 +2758,18 @@ pytest = ">=5,<8"
[[package]]
name = "pytest-timeout"
version = "2.3.1"
version = "2.1.0"
description = "pytest plugin to abort hanging tests"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.6"
groups = ["main"]
files = [
{file = "pytest-timeout-2.3.1.tar.gz", hash = "sha256:12397729125c6ecbdaca01035b9e5239d4db97352320af155b3f5de1ba5165d9"},
{file = "pytest_timeout-2.3.1-py3-none-any.whl", hash = "sha256:68188cb703edfc6a18fad98dc25a3c61e9f24d644b0b70f33af545219fc7813e"},
{file = "pytest-timeout-2.1.0.tar.gz", hash = "sha256:c07ca07404c612f8abbe22294b23c368e2e5104b521c1790195561f37e1ac3d9"},
{file = "pytest_timeout-2.1.0-py3-none-any.whl", hash = "sha256:f6f50101443ce70ad325ceb4473c4255e9d74e3c7cd0ef827309dfa4c0d975c6"},
]
[package.dependencies]
pytest = ">=7.0.0"
pytest = ">=5.0.0"
[[package]]
name = "pytest-xdist"
@@ -3820,4 +3807,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "00ddc42c32e235b6171845fc066dcab078282ed832cd464d5e8a0afa959dd04a"
content-hash = "4dc3165fe22c0e0f7a030ea0f8a680ae2ff74561d8658c393abbe9112caaf5d7"

View File

@@ -19,6 +19,7 @@ aws-config.workspace = true
aws-sdk-iam.workspace = true
aws-sigv4.workspace = true
base64.workspace = true
boxcar = "0.2.8"
bstr.workspace = true
bytes = { workspace = true, features = ["serde"] }
camino.workspace = true
@@ -62,6 +63,7 @@ postgres_backend.workspace = true
postgres-client = { package = "tokio-postgres2", path = "../libs/proxy/tokio-postgres2" }
postgres-protocol = { package = "postgres-protocol2", path = "../libs/proxy/postgres-protocol2" }
pq_proto.workspace = true
prometheus.workspace = true
rand.workspace = true
regex.workspace = true
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
@@ -79,6 +81,7 @@ sha2 = { workspace = true, features = ["asm", "oid"] }
smol_str.workspace = true
smallvec.workspace = true
socket2.workspace = true
strum.workspace = true
strum_macros.workspace = true
subtle.workspace = true
thiserror.workspace = true
@@ -92,6 +95,7 @@ tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
tracing-log.workspace = true
tracing-serde.workspace = true
tracing-opentelemetry.workspace = true
try-lock.workspace = true
typed-json.workspace = true

View File

@@ -140,8 +140,9 @@ async fn authenticate(
let (psql_session_id, waiter) = loop {
let psql_session_id = new_psql_session_id();
if let Ok(waiter) = control_plane::mgmt::get_waiter(&psql_session_id) {
break (psql_session_id, waiter);
match control_plane::mgmt::get_waiter(&psql_session_id) {
Ok(waiter) => break (psql_session_id, waiter),
Err(_e) => continue,
}
};

View File

@@ -220,11 +220,11 @@ async fn fetch_jwks(
}
impl JwkCacheEntryLock {
async fn acquire_permit(self: &Arc<Self>) -> JwkRenewalPermit<'_> {
async fn acquire_permit<'a>(self: &'a Arc<Self>) -> JwkRenewalPermit<'a> {
JwkRenewalPermit::acquire_permit(self).await
}
fn try_acquire_permit(self: &Arc<Self>) -> Option<JwkRenewalPermit<'_>> {
fn try_acquire_permit<'a>(self: &'a Arc<Self>) -> Option<JwkRenewalPermit<'a>> {
JwkRenewalPermit::try_acquire_permit(self)
}
@@ -393,7 +393,7 @@ impl JwkCacheEntryLock {
verify_rsa_signature(header_payload.as_bytes(), &sig, key, &header.algorithm)?;
}
key => return Err(JwtError::UnsupportedKeyType(key.into())),
}
};
tracing::debug!(?payload, "JWT signature valid with claims");
@@ -510,7 +510,7 @@ fn verify_rsa_signature(
key.verify(data, &sig)?;
}
_ => return Err(JwtError::InvalidRsaSigningAlgorithm),
}
};
Ok(())
}

View File

@@ -4,20 +4,6 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, ensure, Context};
use camino::{Utf8Path, Utf8PathBuf};
use clap::Parser;
use compute_api::spec::LocalProxySpec;
use futures::future::Either;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use utils::sentry_init::init_sentry;
use utils::{pid_file, project_build_tag, project_git_version};
use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
use crate::auth::{self};
@@ -39,10 +25,24 @@ use crate::serverless::{self, GlobalConnPoolOptions};
use crate::tls::client_config::compute_client_config_with_root_certs;
use crate::types::RoleName;
use crate::url::ApiUrl;
use anyhow::{bail, ensure, Context};
use camino::{Utf8Path, Utf8PathBuf};
use compute_api::spec::LocalProxySpec;
use futures::future::Either;
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
use clap::Parser;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use utils::sentry_init::init_sentry;
use utils::{pid_file, project_build_tag, project_git_version};
/// Neon proxy/router
#[derive(Parser)]
#[command(version = GIT_VERSION, about)]

View File

@@ -5,6 +5,12 @@
/// the outside. Similar to an ingress controller for HTTPS.
use std::{net::SocketAddr, sync::Arc};
use crate::context::RequestContext;
use crate::metrics::{Metrics, ThreadPoolMetrics};
use crate::protocol2::ConnectionInfo;
use crate::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
use crate::stream::{PqStream, Stream};
use crate::tls::TlsServerEndPoint;
use anyhow::{anyhow, bail, ensure, Context};
use clap::Arg;
use futures::future::Either;
@@ -19,13 +25,6 @@ use tracing::{error, info, Instrument};
use utils::project_git_version;
use utils::sentry_init::init_sentry;
use crate::context::RequestContext;
use crate::metrics::{Metrics, ThreadPoolMetrics};
use crate::protocol2::ConnectionInfo;
use crate::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
use crate::stream::{PqStream, Stream};
use crate::tls::TlsServerEndPoint;
project_git_version!(GIT_VERSION);
fn cli() -> clap::Command {

View File

@@ -3,16 +3,6 @@ use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use futures::future::Either;
use remote_storage::RemoteStorageConfig;
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn, Instrument};
use utils::sentry_init::init_sentry;
use utils::{project_build_tag, project_git_version};
use crate::auth::backend::jwt::JwkCache;
use crate::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned};
use crate::cancellation::{handle_cancel_messages, CancellationHandler};
@@ -34,6 +24,15 @@ use crate::serverless::cancel_set::CancelSet;
use crate::serverless::GlobalConnPoolOptions;
use crate::tls::client_config::compute_client_config_with_root_certs;
use crate::{auth, control_plane, http, serverless, usage_metrics};
use anyhow::bail;
use futures::future::Either;
use remote_storage::RemoteStorageConfig;
use tokio::net::TcpListener;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn, Instrument};
use utils::sentry_init::init_sentry;
use utils::{project_build_tag, project_git_version};
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
@@ -304,7 +303,7 @@ pub async fn run() -> anyhow::Result<()> {
match auth_backend {
Either::Left(auth_backend) => info!("Authentication backend: {auth_backend}"),
Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
}
};
info!("Using region: {}", args.aws_region);
// TODO: untangle the config args
@@ -804,9 +803,8 @@ fn build_auth_backend(
mod tests {
use std::time::Duration;
use clap::Parser;
use crate::rate_limiter::RateBucketInfo;
use clap::Parser;
#[test]
fn parse_endpoint_rps_limit() {

View File

@@ -242,7 +242,7 @@ impl EndpointsCache {
});
tracing::error!("error parsing value {value:?}: {err:?}");
}
}
};
}
if total.is_power_of_two() {
tracing::debug!("endpoints read {}", total);

View File

@@ -137,8 +137,8 @@ impl ConnCfg {
match k {
// Only set `user` if it's not present in the config.
// Console redirect auth flow takes username from the console's response.
"user" if self.user_is_set() => {}
"database" if self.db_is_set() => {}
"user" if self.user_is_set() => continue,
"database" if self.db_is_set() => continue,
"options" => {
if let Some(options) = filtered_options(v) {
self.set_param(k, &options);

View File

@@ -82,7 +82,7 @@ pub async fn task_main(
error!("per-client task finished with an error: failed to set socket option: {e:#}");
return;
}
}
};
let ctx = RequestContext::new(
session_id,

View File

@@ -19,7 +19,8 @@ use crate::cache::{Cached, TimedLru};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo};
use crate::intern::{AccountIdInt, ProjectIdInt};
use crate::intern::AccountIdInt;
use crate::intern::ProjectIdInt;
use crate::types::{EndpointCacheKey, EndpointId};
use crate::{compute, scram};

View File

@@ -7,8 +7,9 @@ use chrono::{DateTime, Utc};
use opentelemetry::trace::TraceContextExt;
use scopeguard::defer;
use serde::ser::{SerializeMap, Serializer};
use tracing::span;
use tracing::subscriber::Interest;
use tracing::{callsite, span, Event, Metadata, Span, Subscriber};
use tracing::{callsite, Event, Metadata, Span, Subscriber};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use tracing_subscriber::fmt::format::{Format, Full};

View File

@@ -119,7 +119,7 @@ pub(crate) async fn read_proxy_protocol<T: AsyncRead + Unpin>(
// if no more bytes available then exit
if bytes_read == 0 {
return Ok((ChainRW { inner: read, buf }, ConnectHeader::Missing));
}
};
// check if we have enough bytes to continue
if let Some(header) = buf.try_get::<ProxyProtocolV2Header>() {
@@ -169,7 +169,7 @@ fn process_proxy_payload(
header.version_and_command
),
)),
}
};
let size_err =
"invalid proxy protocol length. payload not large enough to fit requested IP addresses";

View File

@@ -198,7 +198,7 @@ where
warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
}
}
};
let wait_duration = retry_after(num_retries, compute.retry);
num_retries += 1;

View File

@@ -118,7 +118,7 @@ pub async fn task_main(
error!("per-client task finished with an error: failed to set socket option: {e:#}");
return;
}
}
};
let ctx = RequestContext::new(
session_id,

View File

@@ -169,7 +169,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
});
tracing::error!("broken message: {e}");
}
}
};
return Ok(());
}
Ok(msg) => msg,
@@ -180,7 +180,7 @@ impl<C: ProjectInfoCache + Send + Sync + 'static> MessageHandler<C> {
match serde_json::from_str::<NotificationHeader>(&payload) {
Ok(header) => tracing::error!(topic = header.topic, "broken message: {e}"),
Err(_) => tracing::error!("broken message: {e}"),
}
};
return Ok(());
}
};

View File

@@ -372,7 +372,7 @@ impl PoolingBackend {
debug!("setting up backend session state");
// initiates the auth session
if let Err(e) = client.batch_execute("select auth.init();").await {
if let Err(e) = client.execute("select auth.init()", &[]).await {
discard.discard();
return Err(e.into());
}
@@ -651,7 +651,7 @@ async fn connect_http2(
e,
)));
}
}
};
};
let (client, connection) = hyper::client::conn::http2::Builder::new(TokioExecutor::new())

View File

@@ -23,6 +23,7 @@ use indexmap::IndexMap;
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
use parking_lot::RwLock;
use postgres_client::tls::NoTlsStream;
use postgres_client::types::ToSql;
use postgres_client::AsyncMessage;
use serde_json::value::RawValue;
use tokio::net::TcpStream;
@@ -280,9 +281,13 @@ impl ClientInnerCommon<postgres_client::Client> {
let token = resign_jwt(&local_data.key, payload, local_data.jti)?;
// initiates the auth session
// this is safe from query injections as the jwt format free of any escape characters.
let query = format!("discard all; select auth.jwt_session_init('{token}')");
self.inner.batch_execute(&query).await?;
self.inner.batch_execute("discard all").await?;
self.inner
.execute(
"select auth.jwt_session_init($1)",
&[&&*token as &(dyn ToSql + Sync)],
)
.await?;
let pid = self.inner.get_process_id();
info!(pid, jti = local_data.jti, "user session state init");

View File

@@ -17,12 +17,12 @@ Jinja2 = "^3.1.5"
types-requests = "^2.31.0.0"
types-psycopg2 = "^2.9.21.20241019"
boto3 = "^1.34.11"
boto3-stubs = {extras = ["s3", "kms"], version = "^1.26.16"}
boto3-stubs = {extras = ["s3"], version = "^1.26.16"}
moto = {extras = ["server"], version = "^5.0.6"}
backoff = "^2.2.1"
pytest-lazy-fixture = "^0.6.3"
prometheus-client = "^0.14.1"
pytest-timeout = "^2.3.1"
pytest-timeout = "^2.1.0"
Werkzeug = "^3.0.6"
pytest-order = "^1.1.0"
allure-pytest = "^2.13.2"

View File

@@ -11,7 +11,7 @@ markers =
testpaths =
test_runner
minversion = 6.0
log_format = %(asctime)s.%(msecs)03d %(levelname)s [%(filename)s:%(lineno)d] %(message)s
log_format = %(asctime)s.%(msecs)-3d %(levelname)s [%(filename)s:%(lineno)d] %(message)s
log_date_format = %Y-%m-%d %H:%M:%S
log_cli = true
timeout = 300

View File

@@ -13,8 +13,6 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{auth::Scope, measured_stream::MeasuredStream};
use std::os::fd::AsRawFd;
use crate::metrics::TrafficMetrics;
use crate::SafeKeeperConf;
use crate::{handler::SafekeeperPostgresHandler, GlobalTimelines};
@@ -64,7 +62,6 @@ async fn handle_socket(
global_timelines: Arc<GlobalTimelines>,
) -> Result<(), QueryError> {
socket.set_nodelay(true)?;
let socket_fd = socket.as_raw_fd();
let peer_addr = socket.peer_addr()?;
// Set timeout on reading from the socket. It prevents hanged up connection
@@ -110,7 +107,7 @@ async fn handle_socket(
auth_pair,
global_timelines,
);
let pgbackend = PostgresBackend::new_from_io(socket_fd, socket, peer_addr, auth_type, None)?;
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend

View File

@@ -26,6 +26,7 @@ humantime.workspace = true
itertools.workspace = true
lasso.workspace = true
once_cell.workspace = true
governor = {version = "0.8.0"}
pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_connection.workspace = true
@@ -34,7 +35,6 @@ reqwest = { workspace = true, features = ["stream"] }
routerify.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
regex.workspace = true
rustls-native-certs.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -8,6 +8,7 @@ use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, Service, RECONCILE_TIMEOUT, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
use futures::Future;
use governor::{Quota, RateLimiter};
use http_utils::{
endpoint::{self, auth_middleware, check_permission_with, request_span},
error::ApiError,
@@ -32,6 +33,7 @@ use pageserver_api::models::{
};
use pageserver_api::shard::TenantShardId;
use pageserver_client::{mgmt_api, BlockUnblock};
use std::num::NonZero;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -516,16 +518,13 @@ async fn handle_tenant_timeline_block_unblock_gc(
json_response(StatusCode::OK, ())
}
// For metric labels where we would like to include the approximate path, but exclude high-cardinality fields like query parameters
// and tenant/timeline IDs. Since we are proxying to arbitrary paths, we don't have routing templates to
// compare to, so we can just filter out our well known ID format with regexes.
fn path_without_ids(path: &str) -> String {
static ID_REGEX: std::sync::OnceLock<regex::Regex> = std::sync::OnceLock::new();
ID_REGEX
.get_or_init(|| regex::Regex::new(r"([0-9a-fA-F]{32}(-[0-9]{4})?|\?.*)").unwrap())
.replace_all(path, "")
.to_string()
}
static PASSTHROUGH_RATE_LIMITER: std::sync::OnceLock<
RateLimiter<
TenantId,
governor::state::keyed::DefaultKeyedStateStore<TenantId>,
governor::clock::DefaultClock,
>,
> = std::sync::OnceLock::new();
async fn handle_tenant_timeline_passthrough(
service: Arc<Service>,
@@ -548,6 +547,19 @@ async fn handle_tenant_timeline_passthrough(
tracing::info!("Proxying request for tenant {} ({})", tenant_id, path);
// Proxied requests are expected to be rare on a per-tenant basis: these are things
// like inspecting a timeline's details or doing an LSN<->timestamp mapping. Not anything
// that has high throughput.
let limiter = PASSTHROUGH_RATE_LIMITER.get_or_init(|| {
RateLimiter::new(
Quota::per_second(NonZero::new(10).unwrap()),
governor::state::keyed::DefaultKeyedStateStore::new(),
governor::clock::DefaultClock::default(),
)
});
limiter.until_key_ready(&tenant_id).await;
// Find the node that holds shard zero
let (node, tenant_shard_id) = service.tenant_shard0_node(tenant_id).await?;
@@ -562,7 +574,10 @@ async fn handle_tenant_timeline_passthrough(
.metrics_group
.storage_controller_passthrough_request_latency;
let path_label = path_without_ids(&path)
// This is a bit awkward. We remove the param from the request
// and join the words by '_' to get a label for the request.
let just_path = path.replace(&tenant_shard_str, "");
let path_label = just_path
.split('/')
.filter(|token| !token.is_empty())
.collect::<Vec<_>>()
@@ -2097,16 +2112,3 @@ pub fn make_router(
)
})
}
#[cfg(test)]
mod test {
use super::path_without_ids;
#[test]
fn test_path_without_ids() {
assert_eq!(path_without_ids("/v1/tenant/1a2b3344556677881122334455667788/timeline/AA223344556677881122334455667788"), "/v1/tenant//timeline/");
assert_eq!(path_without_ids("/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788"), "/v1/tenant//timeline/");
assert_eq!(path_without_ids("/v1/tenant/1a2b3344556677881122334455667788-0108/timeline/AA223344556677881122334455667788?parameter=foo"), "/v1/tenant//timeline/");
}
}

View File

@@ -12,8 +12,7 @@ use storage_controller::persistence::Persistence;
use storage_controller::service::chaos_injector::ChaosInjector;
use storage_controller::service::{
Config, Service, HEARTBEAT_INTERVAL_DEFAULT, LONG_RECONCILE_THRESHOLD_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT,
PRIORITY_RECONCILER_CONCURRENCY_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
MAX_OFFLINE_INTERVAL_DEFAULT, MAX_WARMING_UP_INTERVAL_DEFAULT, RECONCILER_CONCURRENCY_DEFAULT,
};
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
@@ -76,14 +75,10 @@ struct Cli {
#[arg(long)]
split_threshold: Option<u64>,
/// Maximum number of normal-priority reconcilers that may run in parallel
/// Maximum number of reconcilers that may run in parallel
#[arg(long)]
reconciler_concurrency: Option<usize>,
/// Maximum number of high-priority reconcilers that may run in parallel
#[arg(long)]
priority_reconciler_concurrency: Option<usize>,
/// How long to wait for the initial database connection to be available.
#[arg(long, default_value = "5s")]
db_connect_timeout: humantime::Duration,
@@ -294,9 +289,6 @@ async fn async_main() -> anyhow::Result<()> {
reconciler_concurrency: args
.reconciler_concurrency
.unwrap_or(RECONCILER_CONCURRENCY_DEFAULT),
priority_reconciler_concurrency: args
.priority_reconciler_concurrency
.unwrap_or(PRIORITY_RECONCILER_CONCURRENCY_DEFAULT),
split_threshold: args.split_threshold,
neon_local_repo_dir: args.neon_local_repo_dir,
max_secondary_lag_bytes: args.max_secondary_lag_bytes,

View File

@@ -91,10 +91,9 @@ pub(crate) struct ReconcilerConfigBuilder {
}
impl ReconcilerConfigBuilder {
/// Priority is special: you must pick one thoughtfully, do not just use 'normal' as the default
pub(crate) fn new(priority: ReconcilerPriority) -> Self {
pub(crate) fn new() -> Self {
Self {
config: ReconcilerConfig::new(priority),
config: ReconcilerConfig::default(),
}
}
@@ -130,18 +129,8 @@ impl ReconcilerConfigBuilder {
}
}
// Higher priorities are used for user-facing tasks, so that a long backlog of housekeeping work (e.g. reconciling on startup, rescheduling
// things on node changes) does not starve user-facing tasks.
#[derive(Debug, Copy, Clone)]
pub(crate) enum ReconcilerPriority {
Normal,
High,
}
#[derive(Debug, Copy, Clone)]
#[derive(Default, Debug, Copy, Clone)]
pub(crate) struct ReconcilerConfig {
pub(crate) priority: ReconcilerPriority,
// During live migration give up on warming-up the secondary
// after this timeout.
secondary_warmup_timeout: Option<Duration>,
@@ -156,18 +145,6 @@ pub(crate) struct ReconcilerConfig {
}
impl ReconcilerConfig {
/// Configs are always constructed with an explicit priority, to force callers to think about whether
/// the operation they're scheduling is high-priority or not. Normal priority is not a safe default, because
/// scheduling something user-facing at normal priority can result in it getting starved out by background work.
pub(crate) fn new(priority: ReconcilerPriority) -> Self {
Self {
priority,
secondary_warmup_timeout: None,
secondary_download_request_timeout: None,
tenant_creation_hint: false,
}
}
pub(crate) fn get_secondary_warmup_timeout(&self) -> Duration {
const SECONDARY_WARMUP_TIMEOUT_DEFAULT: Duration = Duration::from_secs(300);
self.secondary_warmup_timeout
@@ -187,9 +164,7 @@ impl ReconcilerConfig {
impl From<&MigrationConfig> for ReconcilerConfig {
fn from(value: &MigrationConfig) -> Self {
// Run reconciler at high priority because MigrationConfig comes from human requests that should
// be presumed urgent.
let mut builder = ReconcilerConfigBuilder::new(ReconcilerPriority::High);
let mut builder = ReconcilerConfigBuilder::new();
if let Some(timeout) = value.secondary_warmup_timeout {
builder = builder.secondary_warmup_timeout(timeout)

View File

@@ -30,10 +30,7 @@ use crate::{
AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence,
ShardGenerationState, TenantFilter,
},
reconciler::{
ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder,
ReconcilerPriority,
},
reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder},
safekeeper::Safekeeper,
scheduler::{MaySchedule, ScheduleContext, ScheduleError, ScheduleMode},
tenant_shard::{
@@ -82,7 +79,7 @@ use pageserver_api::{
},
};
use pageserver_client::{mgmt_api, BlockUnblock};
use tokio::sync::{mpsc::error::TrySendError, TryAcquireError};
use tokio::sync::mpsc::error::TrySendError;
use tokio_util::sync::CancellationToken;
use utils::{
completion::Barrier,
@@ -198,7 +195,6 @@ pub(crate) enum LeadershipStatus {
}
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
pub const PRIORITY_RECONCILER_CONCURRENCY_DEFAULT: usize = 256;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
// This channel is finite-size to avoid using excessive memory if we get into a state where reconciles are finishing more slowly
@@ -370,12 +366,9 @@ pub struct Config {
/// and/or upon handling the re-attach request from a node.
pub max_warming_up_interval: Duration,
/// How many normal-priority Reconcilers may be spawned concurrently
/// How many Reconcilers may be spawned concurrently
pub reconciler_concurrency: usize,
/// How many high-priority Reconcilers may be spawned concurrently
pub priority_reconciler_concurrency: usize,
/// How large must a shard grow in bytes before we split it?
/// None disables auto-splitting.
pub split_threshold: Option<u64>,
@@ -443,14 +436,9 @@ pub struct Service {
// that transition it to/from Active.
node_op_locks: IdLockMap<NodeId, NodeOperations>,
// Limit how many Reconcilers we will spawn concurrently for normal-priority tasks such as background reconciliations
// and reconciliation on startup.
// Limit how many Reconcilers we will spawn concurrently
reconciler_concurrency: Arc<tokio::sync::Semaphore>,
// Limit how many Reconcilers we will spawn concurrently for high-priority tasks such as tenant/timeline CRUD, which
// a human user might be waiting for.
priority_reconciler_concurrency: Arc<tokio::sync::Semaphore>,
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
/// Send into this queue to promptly attempt to reconcile this shard next time units are available.
///
@@ -1275,15 +1263,12 @@ impl Service {
}
// Maybe some other work can proceed now that this job finished.
//
// Only bother with this if we have some semaphore units available in the normal-priority semaphore (these
// reconciles are scheduled at `[ReconcilerPriority::Normal]`).
if self.reconciler_concurrency.available_permits() > 0 {
while let Ok(tenant_shard_id) = locked.delayed_reconcile_rx.try_recv() {
let (nodes, tenants, _scheduler) = locked.parts_mut();
if let Some(shard) = tenants.get_mut(&tenant_shard_id) {
shard.delayed_reconcile = false;
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal);
self.maybe_reconcile_shard(shard, nodes);
}
if self.reconciler_concurrency.available_permits() == 0 {
@@ -1580,9 +1565,6 @@ impl Service {
reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.reconciler_concurrency,
)),
priority_reconciler_concurrency: Arc::new(tokio::sync::Semaphore::new(
config.priority_reconciler_concurrency,
)),
delayed_reconcile_tx,
abort_tx,
startup_complete: startup_complete.clone(),
@@ -2355,7 +2337,7 @@ impl Service {
let waiters = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, _scheduler) = locked.parts_mut();
let config = ReconcilerConfigBuilder::new(ReconcilerPriority::High)
let config = ReconcilerConfigBuilder::new()
.tenant_creation_hint(true)
.build();
tenants
@@ -2830,8 +2812,7 @@ impl Service {
shard.schedule(scheduler, &mut schedule_context)?;
let maybe_waiter =
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
if let Some(waiter) = maybe_waiter {
waiters.push(waiter);
}
@@ -2952,9 +2933,7 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
for (_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
shard.config = config.clone();
if let Some(waiter) =
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
{
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
waiters.push(waiter);
}
}
@@ -3236,9 +3215,7 @@ impl Service {
debug_assert!(shard.intent.get_attached().is_none());
debug_assert!(shard.intent.get_secondary().is_empty());
if let Some(waiter) =
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
{
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
detach_waiters.push(waiter);
}
}
@@ -3390,7 +3367,7 @@ impl Service {
// In case scheduling is being switched back on, try it now.
shard.schedule(scheduler, &mut schedule_context).ok();
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
self.maybe_reconcile_shard(shard, nodes);
}
Ok(())
@@ -4439,7 +4416,7 @@ impl Service {
tracing::warn!("Failed to schedule {tenant_shard_id} during shard abort: {e}")
}
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High);
self.maybe_reconcile_shard(shard, nodes);
}
// We don't expect any new_shard_count shards to exist here, but drop them just in case
@@ -4605,11 +4582,7 @@ impl Service {
tracing::warn!("Failed to schedule child shard {child}: {e}");
}
// In the background, attach secondary locations for the new shards
if let Some(waiter) = self.maybe_reconcile_shard(
&mut child_state,
nodes,
ReconcilerPriority::High,
) {
if let Some(waiter) = self.maybe_reconcile_shard(&mut child_state, nodes) {
waiters.push(waiter);
}
@@ -4974,9 +4947,7 @@ impl Service {
shard.intent.clear_secondary(scheduler);
// Run Reconciler to execute detach fo secondary locations.
if let Some(waiter) =
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
{
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
waiters.push(waiter);
}
}
@@ -5244,7 +5215,7 @@ impl Service {
let reconciler_config = match migrate_req.migration_config {
Some(cfg) => (&cfg).into(),
None => ReconcilerConfig::new(ReconcilerPriority::High),
None => ReconcilerConfig::default(),
};
self.maybe_configured_reconcile_shard(shard, nodes, reconciler_config)
@@ -5310,7 +5281,7 @@ impl Service {
);
}
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::High)
self.maybe_reconcile_shard(shard, nodes)
};
if let Some(waiter) = waiter {
@@ -5722,7 +5693,7 @@ impl Service {
)
}
self.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal);
self.maybe_reconcile_shard(shard, nodes);
}
// Here we remove an existing observed location for the node we're removing, and it will
@@ -6091,14 +6062,7 @@ impl Service {
tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", node_id);
}
Ok(()) => {
if self
.maybe_reconcile_shard(
tenant_shard,
nodes,
ReconcilerPriority::Normal,
)
.is_some()
{
if self.maybe_reconcile_shard(tenant_shard, nodes).is_some() {
tenants_affected += 1;
};
}
@@ -6129,11 +6093,7 @@ impl Service {
if let Some(observed_loc) = tenant_shard.observed.locations.get_mut(&node_id) {
if observed_loc.conf.is_none() {
self.maybe_reconcile_shard(
tenant_shard,
nodes,
ReconcilerPriority::Normal,
);
self.maybe_reconcile_shard(tenant_shard, nodes);
}
}
}
@@ -6497,36 +6457,8 @@ impl Service {
&self,
shard: &mut TenantShard,
nodes: &Arc<HashMap<NodeId, Node>>,
priority: ReconcilerPriority,
) -> Option<ReconcilerWaiter> {
self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::new(priority))
}
/// Before constructing a Reconciler, acquire semaphore units from the appropriate concurrency limit (depends on priority)
fn get_reconciler_units(
&self,
priority: ReconcilerPriority,
) -> Result<ReconcileUnits, TryAcquireError> {
let units = match priority {
ReconcilerPriority::Normal => self.reconciler_concurrency.clone().try_acquire_owned(),
ReconcilerPriority::High => {
match self
.priority_reconciler_concurrency
.clone()
.try_acquire_owned()
{
Ok(u) => Ok(u),
Err(TryAcquireError::NoPermits) => {
// If the high priority semaphore is exhausted, then high priority tasks may steal units from
// the normal priority semaphore.
self.reconciler_concurrency.clone().try_acquire_owned()
}
Err(e) => Err(e),
}
}
};
units.map(ReconcileUnits::new)
self.maybe_configured_reconcile_shard(shard, nodes, ReconcilerConfig::default())
}
/// Wrap [`TenantShard`] reconciliation methods with acquisition of [`Gate`] and [`ReconcileUnits`],
@@ -6546,8 +6478,8 @@ impl Service {
}
};
let units = match self.get_reconciler_units(reconciler_config.priority) {
Ok(u) => u,
let units = match self.reconciler_concurrency.clone().try_acquire_owned() {
Ok(u) => ReconcileUnits::new(u),
Err(_) => {
tracing::info!(tenant_id=%shard.tenant_shard_id.tenant_id, shard_id=%shard.tenant_shard_id.shard_slug(),
"Concurrency limited: enqueued for reconcile later");
@@ -6640,10 +6572,7 @@ impl Service {
// Eventual consistency: if an earlier reconcile job failed, and the shard is still
// dirty, spawn another rone
if self
.maybe_reconcile_shard(shard, &pageservers, ReconcilerPriority::Normal)
.is_some()
{
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
reconciles_spawned += 1;
} else if shard.delayed_reconcile {
// Shard wanted to reconcile but for some reason couldn't.
@@ -6729,10 +6658,7 @@ impl Service {
tracing::info!(tenant_shard_id=%tenant_shard_id, "Applying optimization: {optimization:?}");
if shard.apply_optimization(scheduler, optimization) {
optimizations_applied += 1;
if self
.maybe_reconcile_shard(shard, nodes, ReconcilerPriority::Normal)
.is_some()
{
if self.maybe_reconcile_shard(shard, nodes).is_some() {
reconciles_spawned += 1;
}
}
@@ -7282,7 +7208,7 @@ impl Service {
// to not stall the operation when a cold secondary is encountered.
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
let reconciler_config = ReconcilerConfigBuilder::new()
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
.build();
@@ -7615,7 +7541,7 @@ impl Service {
) -> Result<(), OperationError> {
const SECONDARY_WARMUP_TIMEOUT: Duration = Duration::from_secs(20);
const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
let reconciler_config = ReconcilerConfigBuilder::new(ReconcilerPriority::Normal)
let reconciler_config = ReconcilerConfigBuilder::new()
.secondary_warmup_timeout(SECONDARY_WARMUP_TIMEOUT)
.secondary_download_request_timeout(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT)
.build();

View File

@@ -88,11 +88,7 @@ impl ChaosInjector {
shard.intent.demote_attached(scheduler, old_location);
shard.intent.promote_attached(scheduler, new_location);
self.service.maybe_reconcile_shard(
shard,
nodes,
crate::reconciler::ReconcilerPriority::Normal,
);
self.service.maybe_reconcile_shard(shard, nodes);
}
async fn inject_chaos(&mut self) {

View File

@@ -4,10 +4,8 @@ import subprocess
import tempfile
from collections.abc import Iterator
from pathlib import Path
from typing import cast
import pytest
from _pytest.config import Config
from fixtures.log_helper import log
from fixtures.neon_cli import AbstractNeonCli
@@ -25,7 +23,6 @@ class FastImport(AbstractNeonCli):
pg_distrib_dir: Path,
pg_version: PgVersion,
workdir: Path,
cleanup: bool = True,
):
if extra_env is None:
env_vars = {}
@@ -50,43 +47,12 @@ class FastImport(AbstractNeonCli):
if not workdir.exists():
raise Exception(f"Working directory '{workdir}' does not exist")
self.workdir = workdir
self.cleanup = cleanup
def run_pgdata(
self,
s3prefix: str | None = None,
pg_port: int | None = None,
source_connection_string: str | None = None,
interactive: bool = False,
):
return self.run(
"pgdata",
s3prefix=s3prefix,
pg_port=pg_port,
source_connection_string=source_connection_string,
interactive=interactive,
)
def run_dump_restore(
self,
s3prefix: str | None = None,
source_connection_string: str | None = None,
destination_connection_string: str | None = None,
):
return self.run(
"dump-restore",
s3prefix=s3prefix,
source_connection_string=source_connection_string,
destination_connection_string=destination_connection_string,
)
def run(
self,
command: str,
s3prefix: str | None = None,
pg_port: int | None = None,
pg_port: int,
source_connection_string: str | None = None,
destination_connection_string: str | None = None,
s3prefix: str | None = None,
interactive: bool = False,
) -> subprocess.CompletedProcess[str]:
if self.cmd is not None:
@@ -94,17 +60,13 @@ class FastImport(AbstractNeonCli):
args = [
f"--pg-bin-dir={self.pg_bin}",
f"--pg-lib-dir={self.pg_lib}",
f"--pg-port={pg_port}",
f"--working-directory={self.workdir}",
]
if s3prefix is not None:
args.append(f"--s3-prefix={s3prefix}")
args.append(command)
if pg_port is not None:
args.append(f"--pg-port={pg_port}")
if source_connection_string is not None:
args.append(f"--source-connection-string={source_connection_string}")
if destination_connection_string is not None:
args.append(f"--destination-connection-string={destination_connection_string}")
if s3prefix is not None:
args.append(f"--s3-prefix={s3prefix}")
if interactive:
args.append("--interactive")
@@ -115,7 +77,7 @@ class FastImport(AbstractNeonCli):
return self
def __exit__(self, *args):
if self.workdir.exists() and self.cleanup:
if self.workdir.exists():
shutil.rmtree(self.workdir)
@@ -125,17 +87,9 @@ def fast_import(
test_output_dir: Path,
neon_binpath: Path,
pg_distrib_dir: Path,
pytestconfig: Config,
) -> Iterator[FastImport]:
workdir = Path(tempfile.mkdtemp(dir=test_output_dir, prefix="fast_import_"))
with FastImport(
None,
neon_binpath,
pg_distrib_dir,
pg_version,
workdir,
cleanup=not cast(bool, pytestconfig.getoption("--preserve-database-files")),
) as fi:
workdir = Path(tempfile.mkdtemp())
with FastImport(None, neon_binpath, pg_distrib_dir, pg_version, workdir) as fi:
yield fi
if fi.cmd is None:

View File

@@ -27,7 +27,6 @@ from urllib.parse import quote, urlparse
import asyncpg
import backoff
import boto3
import httpx
import psycopg2
import psycopg2.sql
@@ -38,8 +37,6 @@ from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from jwcrypto import jwk
from mypy_boto3_kms import KMSClient
from mypy_boto3_s3 import S3Client
# Type-related stuff
from psycopg2.extensions import connection as PgConnection
@@ -202,30 +199,6 @@ def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
mock_s3_server.kill()
@pytest.fixture(scope="session")
def mock_kms(mock_s3_server: MockS3Server) -> Iterator[KMSClient]:
yield boto3.client(
"kms",
endpoint_url=mock_s3_server.endpoint(),
region_name=mock_s3_server.region(),
aws_access_key_id=mock_s3_server.access_key(),
aws_secret_access_key=mock_s3_server.secret_key(),
aws_session_token=mock_s3_server.session_token(),
)
@pytest.fixture(scope="session")
def mock_s3_client(mock_s3_server: MockS3Server) -> Iterator[S3Client]:
yield boto3.client(
"s3",
endpoint_url=mock_s3_server.endpoint(),
region_name=mock_s3_server.region(),
aws_access_key_id=mock_s3_server.access_key(),
aws_secret_access_key=mock_s3_server.secret_key(),
aws_session_token=mock_s3_server.session_token(),
)
class PgProtocol:
"""Reusable connection logic"""
@@ -491,7 +464,6 @@ class NeonEnvBuilder:
self.test_may_use_compatibility_snapshot_binaries = False
self.version_combination = combination
self.mixdir = self.test_output_dir / "mixdir_neon"
if self.version_combination is not None:
assert (
self.compatibility_neon_binpath is not None
@@ -703,11 +675,6 @@ class NeonEnvBuilder:
def _mix_versions(self):
assert self.version_combination is not None, "version combination must be set"
# Always use a newer version of `neon_local`
(self.mixdir / "neon_local").symlink_to(self.neon_binpath / "neon_local")
self.neon_local_binpath = self.mixdir
for component, paths in COMPONENT_BINARIES.items():
directory = (
self.neon_binpath
@@ -717,10 +684,9 @@ class NeonEnvBuilder:
for filename in paths:
destination = self.mixdir / filename
destination.symlink_to(directory / filename)
self.neon_binpath = self.mixdir
if self.version_combination["compute"] == "old":
self.pg_distrib_dir = self.compatibility_pg_distrib_dir
self.neon_binpath = self.mixdir
def overlay_mount(self, ident: str, srcdir: Path, dstdir: Path):
"""

View File

@@ -52,11 +52,11 @@ COMPONENT_BINARIES = {
# Disable auto-formatting for better readability
# fmt: off
VERSIONS_COMBINATIONS = (
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"}, # combination: nnnnn
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"}, # combination: ooonn
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"}, # combination: ononn
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"}, # combination: onnnn
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"}, # combination: nnnoo
{"storage_controller": "new", "storage_broker": "new", "compute": "new", "safekeeper": "new", "pageserver": "new"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "old"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "old", "pageserver": "new"},
{"storage_controller": "new", "storage_broker": "new", "compute": "old", "safekeeper": "new", "pageserver": "new"},
{"storage_controller": "old", "storage_broker": "old", "compute": "new", "safekeeper": "new", "pageserver": "new"},
)
# fmt: on

View File

@@ -1,9 +1,7 @@
import base64
import json
import re
import time
from enum import Enum
from pathlib import Path
import psycopg2
import psycopg2.errors
@@ -16,12 +14,8 @@ from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
PageserverApiException,
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import MockS3Server, RemoteStorageKind
from mypy_boto3_kms import KMSClient
from mypy_boto3_kms.type_defs import EncryptResponseTypeDef
from mypy_boto3_s3 import S3Client
from fixtures.remote_storage import RemoteStorageKind
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response
@@ -109,15 +103,13 @@ def test_pgdata_import_smoke(
while True:
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
log.info(
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
f"relblock size: {relblock_size/8192} pages (target: {target_relblock_size//8192}) pages"
)
if relblock_size >= target_relblock_size:
break
addrows = int((target_relblock_size - relblock_size) // 8192)
assert addrows >= 1, "forward progress"
vanilla_pg.safe_psql(
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
)
vanilla_pg.safe_psql(f"insert into t select generate_series({nrows+1}, {nrows + addrows})")
nrows += addrows
expect_nrows = nrows
expect_sum = (
@@ -340,224 +332,6 @@ def test_pgdata_import_smoke(
br_initdb_endpoint.safe_psql("select * from othertable")
def test_fast_import_with_pageserver_ingest(
test_output_dir,
vanilla_pg: VanillaPostgres,
port_distributor: PortDistributor,
fast_import: FastImport,
pg_distrib_dir: Path,
pg_version: PgVersion,
mock_s3_server: MockS3Server,
mock_kms: KMSClient,
mock_s3_client: S3Client,
neon_env_builder: NeonEnvBuilder,
make_httpserver: HTTPServer,
):
# Prepare KMS and S3
key_response = mock_kms.create_key(
Description="Test key",
KeyUsage="ENCRYPT_DECRYPT",
Origin="AWS_KMS",
)
key_id = key_response["KeyMetadata"]["KeyId"]
def encrypt(x: str) -> EncryptResponseTypeDef:
return mock_kms.encrypt(KeyId=key_id, Plaintext=x)
# Start source postgres and ingest data
vanilla_pg.start()
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
# Setup pageserver and fake cplane for import progress
def handler(request: Request) -> Response:
log.info(f"control plane request: {request.json}")
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(re.compile(".*")).respond_with_handler(handler)
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.MOCK_S3)
env = neon_env_builder.init_start()
env.pageserver.patch_config_toml_nonrecursive(
{
"import_pgdata_upcall_api": f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/path/to/mgmt/api",
# because import_pgdata code uses this endpoint, not the one in common remote storage config
# TODO: maybe use common remote_storage config in pageserver?
"import_pgdata_aws_endpoint_url": env.s3_mock_server.endpoint(),
}
)
env.pageserver.stop()
env.pageserver.start()
# Encrypt connstrings and put spec into S3
source_connstring_encrypted = encrypt(vanilla_pg.connstr())
spec = {
"encryption_secret": {"KMS": {"key_id": key_id}},
"source_connstring_ciphertext_base64": base64.b64encode(
source_connstring_encrypted["CiphertextBlob"]
).decode("utf-8"),
"project_id": "someproject",
"branch_id": "somebranch",
}
bucket = "test-bucket"
key_prefix = "test-prefix"
mock_s3_client.create_bucket(Bucket=bucket)
mock_s3_client.put_object(Bucket=bucket, Key=f"{key_prefix}/spec.json", Body=json.dumps(spec))
# Create timeline with import_pgdata
tenant_id = TenantId.generate()
env.storage_controller.tenant_create(tenant_id)
timeline_id = TimelineId.generate()
log.info("starting import")
start = time.monotonic()
idempotency = ImportPgdataIdemptencyKey.random()
log.info(f"idempotency key {idempotency}")
# TODO: teach neon_local CLI about the idempotency & 429 error so we can run inside the loop
# and check for 429
import_branch_name = "imported"
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {
"AwsS3": {
"region": env.s3_mock_server.region(),
"bucket": bucket,
"key": key_prefix,
}
},
},
},
)
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
# Run fast_import
if fast_import.extra_env is None:
fast_import.extra_env = {}
fast_import.extra_env["AWS_ACCESS_KEY_ID"] = mock_s3_server.access_key()
fast_import.extra_env["AWS_SECRET_ACCESS_KEY"] = mock_s3_server.secret_key()
fast_import.extra_env["AWS_SESSION_TOKEN"] = mock_s3_server.session_token()
fast_import.extra_env["AWS_REGION"] = mock_s3_server.region()
fast_import.extra_env["AWS_ENDPOINT_URL"] = mock_s3_server.endpoint()
fast_import.extra_env["RUST_LOG"] = "aws_config=debug,aws_sdk_kms=debug"
pg_port = port_distributor.get_port()
fast_import.run_pgdata(pg_port=pg_port, s3prefix=f"s3://{bucket}/{key_prefix}")
vanilla_pg.stop()
def validate_vanilla_equivalence(ep):
res = ep.safe_psql("SELECT count(*), sum(a) FROM foo;", dbname="neondb")
assert res[0] == (10, 55), f"got result: {res}"
# Sanity check that data in pgdata is expected:
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
with VanillaPostgres(
fast_import.workdir / "pgdata", pgbin, pg_port, False
) as new_pgdata_vanilla_pg:
new_pgdata_vanilla_pg.start()
# database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
validate_vanilla_equivalence(conn)
# Poll pageserver statuses in s3
while True:
locations = env.storage_controller.locate(tenant_id)
active_count = 0
for location in locations:
shard_id = TenantShardId.parse(location["shard_id"])
ps = env.get_pageserver(location["node_id"])
try:
detail = ps.http_client().timeline_detail(shard_id, timeline_id)
log.info(f"timeline {tenant_id}/{timeline_id} detail: {detail}")
state = detail["state"]
log.info(f"shard {shard_id} state: {state}")
if state == "Active":
active_count += 1
except PageserverApiException as e:
if e.status_code == 404:
log.info("not found, import is in progress")
continue
elif e.status_code == 429:
log.info("import is in progress")
continue
else:
raise
if state == "Active":
key = f"{key_prefix}/status/shard-{shard_id.shard_index}"
shard_status_file_contents = (
mock_s3_client.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
)
shard_status = json.loads(shard_status_file_contents)
assert shard_status["done"] is True
if active_count == len(locations):
log.info("all shards are active")
break
time.sleep(0.5)
import_duration = time.monotonic() - start
log.info(f"import complete; duration={import_duration:.2f}s")
ep = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
# check that data is there
validate_vanilla_equivalence(ep)
# check that we can do basic ops
ep.safe_psql("create table othertable(values text)", dbname="neondb")
rw_lsn = Lsn(ep.safe_psql_scalar("select pg_current_wal_flush_lsn()"))
ep.stop()
# ... at the tip
_ = env.create_branch(
new_branch_name="br-tip",
ancestor_branch_name=import_branch_name,
tenant_id=tenant_id,
ancestor_start_lsn=rw_lsn,
)
br_tip_endpoint = env.endpoints.create_start(
branch_name="br-tip", endpoint_id="br-tip-ro", tenant_id=tenant_id
)
validate_vanilla_equivalence(br_tip_endpoint)
br_tip_endpoint.safe_psql("select * from othertable", dbname="neondb")
br_tip_endpoint.stop()
# ... at the initdb lsn
locations = env.storage_controller.locate(tenant_id)
[shard_zero] = [
loc for loc in locations if TenantShardId.parse(loc["shard_id"]).shard_number == 0
]
shard_zero_ps = env.get_pageserver(shard_zero["node_id"])
shard_zero_timeline_info = shard_zero_ps.http_client().timeline_detail(
shard_zero["shard_id"], timeline_id
)
initdb_lsn = Lsn(shard_zero_timeline_info["initdb_lsn"])
_ = env.create_branch(
new_branch_name="br-initdb",
ancestor_branch_name=import_branch_name,
tenant_id=tenant_id,
ancestor_start_lsn=initdb_lsn,
)
br_initdb_endpoint = env.endpoints.create_start(
branch_name="br-initdb", endpoint_id="br-initdb-ro", tenant_id=tenant_id
)
validate_vanilla_equivalence(br_initdb_endpoint)
with pytest.raises(psycopg2.errors.UndefinedTable):
br_initdb_endpoint.safe_psql("select * from othertable", dbname="neondb")
br_initdb_endpoint.stop()
env.pageserver.stop(immediate=True)
def test_fast_import_binary(
test_output_dir,
vanilla_pg: VanillaPostgres,
@@ -568,7 +342,7 @@ def test_fast_import_binary(
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
pg_port = port_distributor.get_port()
fast_import.run_pgdata(pg_port=pg_port, source_connection_string=vanilla_pg.connstr())
fast_import.run(pg_port, vanilla_pg.connstr())
vanilla_pg.stop()
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
@@ -584,118 +358,6 @@ def test_fast_import_binary(
assert res[0][0] == 10
def test_fast_import_restore_to_connstring(
test_output_dir,
vanilla_pg: VanillaPostgres,
port_distributor: PortDistributor,
fast_import: FastImport,
pg_distrib_dir: Path,
pg_version: PgVersion,
):
vanilla_pg.start()
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
pgdatadir = test_output_dir / "destination-pgdata"
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as destination_vanilla_pg:
destination_vanilla_pg.configure(["shared_preload_libraries='neon_rmgr'"])
destination_vanilla_pg.start()
# create another database & role and try to restore there
destination_vanilla_pg.safe_psql("""
CREATE ROLE testrole WITH
LOGIN
PASSWORD 'testpassword'
NOSUPERUSER
NOCREATEDB
NOCREATEROLE;
""")
destination_vanilla_pg.safe_psql("CREATE DATABASE testdb OWNER testrole;")
destination_connstring = destination_vanilla_pg.connstr(
dbname="testdb", user="testrole", password="testpassword"
)
fast_import.run_dump_restore(
source_connection_string=vanilla_pg.connstr(),
destination_connection_string=destination_connstring,
)
vanilla_pg.stop()
conn = PgProtocol(dsn=destination_connstring)
res = conn.safe_psql("SELECT count(*) FROM foo;")
log.info(f"Result: {res}")
assert res[0][0] == 10
def test_fast_import_restore_to_connstring_from_s3_spec(
test_output_dir,
vanilla_pg: VanillaPostgres,
port_distributor: PortDistributor,
fast_import: FastImport,
pg_distrib_dir: Path,
pg_version: PgVersion,
mock_s3_server: MockS3Server,
mock_kms: KMSClient,
mock_s3_client: S3Client,
):
# Prepare KMS and S3
key_response = mock_kms.create_key(
Description="Test key",
KeyUsage="ENCRYPT_DECRYPT",
Origin="AWS_KMS",
)
key_id = key_response["KeyMetadata"]["KeyId"]
def encrypt(x: str) -> EncryptResponseTypeDef:
return mock_kms.encrypt(KeyId=key_id, Plaintext=x)
# Start source postgres and ingest data
vanilla_pg.start()
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")
# Start target postgres
pgdatadir = test_output_dir / "destination-pgdata"
pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as destination_vanilla_pg:
destination_vanilla_pg.configure(["shared_preload_libraries='neon_rmgr'"])
destination_vanilla_pg.start()
# Encrypt connstrings and put spec into S3
source_connstring_encrypted = encrypt(vanilla_pg.connstr())
destination_connstring_encrypted = encrypt(destination_vanilla_pg.connstr())
spec = {
"encryption_secret": {"KMS": {"key_id": key_id}},
"source_connstring_ciphertext_base64": base64.b64encode(
source_connstring_encrypted["CiphertextBlob"]
).decode("utf-8"),
"destination_connstring_ciphertext_base64": base64.b64encode(
destination_connstring_encrypted["CiphertextBlob"]
).decode("utf-8"),
}
mock_s3_client.create_bucket(Bucket="test-bucket")
mock_s3_client.put_object(
Bucket="test-bucket", Key="test-prefix/spec.json", Body=json.dumps(spec)
)
# Run fast_import
if fast_import.extra_env is None:
fast_import.extra_env = {}
fast_import.extra_env["AWS_ACCESS_KEY_ID"] = mock_s3_server.access_key()
fast_import.extra_env["AWS_SECRET_ACCESS_KEY"] = mock_s3_server.secret_key()
fast_import.extra_env["AWS_SESSION_TOKEN"] = mock_s3_server.session_token()
fast_import.extra_env["AWS_REGION"] = mock_s3_server.region()
fast_import.extra_env["AWS_ENDPOINT_URL"] = mock_s3_server.endpoint()
fast_import.extra_env["RUST_LOG"] = "aws_config=debug,aws_sdk_kms=debug"
fast_import.run_dump_restore(s3prefix="s3://test-bucket/test-prefix")
vanilla_pg.stop()
res = destination_vanilla_pg.safe_psql("SELECT count(*) FROM foo;")
log.info(f"Result: {res}")
assert res[0][0] == 10
# TODO: Maybe test with pageserver?
# 1. run whole neon env
# 2. create timeline with some s3 path???

View File

@@ -72,11 +72,6 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
thread.join()
# Fill LFC: seqscan should fetch the whole table in cache.
# It is needed for further correct evaluation of LFC file size
# (a sparse chunk of LFC takes less than 1 MB on disk).
cur.execute("select sum(abalance) from pgbench_accounts")
# Before shrinking the cache, check that it really is large now
(lfc_file_size, lfc_file_blocks) = get_lfc_size()
assert int(lfc_file_blocks) > 128 * 1024

View File

@@ -1,68 +0,0 @@
from __future__ import annotations
from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
def test_pageserver_reldir_v2(
neon_env_builder: NeonEnvBuilder,
):
env = neon_env_builder.init_start(
initial_tenant_conf={
"rel_size_v2_enabled": "false",
}
)
endpoint = env.endpoints.create_start("main")
# Create a relation in v1
endpoint.safe_psql("CREATE TABLE foo1 (id INTEGER PRIMARY KEY, val text)")
endpoint.safe_psql("CREATE TABLE foo2 (id INTEGER PRIMARY KEY, val text)")
# Switch to v2
env.pageserver.http_client().update_tenant_config(
env.initial_tenant,
{
"rel_size_v2_enabled": True,
},
)
# Check if both relations are still accessible
endpoint.safe_psql("SELECT * FROM foo1")
endpoint.safe_psql("SELECT * FROM foo2")
# Restart the endpoint
endpoint.stop()
endpoint.start()
# Check if both relations are still accessible again after restart
endpoint.safe_psql("SELECT * FROM foo1")
endpoint.safe_psql("SELECT * FROM foo2")
# Create a relation in v2
endpoint.safe_psql("CREATE TABLE foo3 (id INTEGER PRIMARY KEY, val text)")
# Delete a relation in v1
endpoint.safe_psql("DROP TABLE foo1")
# Check if both relations are still accessible
endpoint.safe_psql("SELECT * FROM foo2")
endpoint.safe_psql("SELECT * FROM foo3")
# Restart the endpoint
endpoint.stop()
# This will acquire a basebackup, which lists all relations.
endpoint.start()
# Check if both relations are still accessible
endpoint.safe_psql("DROP TABLE IF EXISTS foo1")
endpoint.safe_psql("SELECT * FROM foo2")
endpoint.safe_psql("SELECT * FROM foo3")
endpoint.safe_psql("DROP TABLE foo3")
endpoint.stop()
endpoint.start()
# Check if relations are still accessible
endpoint.safe_psql("DROP TABLE IF EXISTS foo1")
endpoint.safe_psql("SELECT * FROM foo2")
endpoint.safe_psql("DROP TABLE IF EXISTS foo3")

View File

@@ -481,8 +481,7 @@ def test_pageserver_metrics_many_relations(neon_env_builder: NeonEnvBuilder):
counts = timeline_detail["directory_entries_counts"]
assert counts
log.info(f"directory counts: {counts}")
# We need to add up reldir v1 + v2 counts
assert counts[2] + counts[7] > COUNT_AT_LEAST_EXPECTED
assert counts[2] > COUNT_AT_LEAST_EXPECTED
def test_timelines_parallel_endpoints(neon_simple_env: NeonEnv):

View File

@@ -1445,7 +1445,6 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
# roughly fills one segment
endpoint.safe_psql("insert into t select generate_series(1,250000), 'payload'")
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
endpoint.stop() # stop compute
@@ -1474,15 +1473,7 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
"flush_lsn to get aligned",
)
sk1_digest = sk1.http_client().timeline_digest(
tenant_id, timeline_id, sk1.get_timeline_start_lsn(tenant_id, timeline_id), lsn
)
sk2_digest = sk1.http_client().timeline_digest(
tenant_id, timeline_id, sk2.get_timeline_start_lsn(tenant_id, timeline_id), lsn
)
assert sk1_digest == sk2_digest
cmp_sk_wal([sk1, sk2], tenant_id, timeline_id)
# stop one of safekeepers which weren't recovering and insert a bit more to check we can commit
env.safekeepers[2].stop()