Compare commits

..

2 Commits

Author SHA1 Message Date
Christian Schwarz
2c0f0fb962 increase safekeeper stack size 2024-07-09 13:36:20 +00:00
Christian Schwarz
15aea8c8c2 postgres_backend: stop using async_trait 2024-07-05 21:40:15 +02:00
66 changed files with 1383 additions and 2723 deletions

View File

@@ -115,7 +115,6 @@ runs:
export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install}
export DEFAULT_PG_VERSION=${PG_VERSION#v}
export LD_LIBRARY_PATH=${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib
export BENCHMARK_CONNSTR=${BENCHMARK_CONNSTR:-}
if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1

View File

@@ -99,14 +99,7 @@ jobs:
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
extra_params:
-m remote_cluster
--sparse-ordering
--timeout 5400
--ignore test_runner/performance/test_perf_olap.py
--ignore test_runner/performance/test_perf_pgvector_queries.py
--ignore test_runner/performance/test_logical_replication.py
--ignore test_runner/performance/test_physical_replication.py
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -132,69 +125,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
replication-tests:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
- uses: actions/checkout@v4
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Run benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_logical_replication.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Run benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_physical_replication.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic replication testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)

View File

@@ -1336,7 +1336,6 @@ jobs:
env:
BUCKET: neon-github-public-dev
PREFIX: artifacts/latest
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
# Update compatibility snapshot for the release
for pg_version in v14 v15 v16; do
@@ -1350,7 +1349,7 @@ jobs:
# Update Neon artifact for the release (reuse already uploaded artifact)
for build_type in debug release; do
OLD_PREFIX=artifacts/${COMMIT_SHA}/${GITHUB_RUN_ID}
OLD_PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=neon-${{ runner.os }}-${{ runner.arch }}-${build_type}-artifact.tar.zst
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[]?.Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)

View File

@@ -6,11 +6,6 @@ on:
- ready_for_review
workflow_call:
workflow_run:
workflows: ["Build and Test"]
types:
- completed
defaults:
run:
shell: bash -euxo pipefail {0}

71
Cargo.lock generated
View File

@@ -1397,9 +1397,9 @@ dependencies = [
[[package]]
name = "crc32c"
version = "0.6.8"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2"
dependencies = [
"rustc_version",
]
@@ -1651,16 +1651,6 @@ dependencies = [
"rusticata-macros",
]
[[package]]
name = "deranged"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4"
dependencies = [
"powerfmt",
"serde",
]
[[package]]
name = "desim"
version = "0.1.0"
@@ -3018,9 +3008,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "measured"
version = "0.0.22"
version = "0.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0"
checksum = "652bc741286361c06de8cb4d89b21a6437f120c508c51713663589eeb9928ac5"
dependencies = [
"bytes",
"crossbeam-utils",
@@ -3036,9 +3026,9 @@ dependencies = [
[[package]]
name = "measured-derive"
version = "0.0.22"
version = "0.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
checksum = "6ea497f33e1e856a376c32ad916f69a0bd3c597db1f912a399f842b01a4a685d"
dependencies = [
"heck 0.5.0",
"proc-macro2",
@@ -3048,9 +3038,9 @@ dependencies = [
[[package]]
name = "measured-process"
version = "0.0.22"
version = "0.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c4b80445aeb08e832d87bf1830049a924cdc1d6b7ef40b6b9b365bff17bf8ec"
checksum = "b364ccb66937a814b6b2ad751d1a2f7a9d5a78c761144036825fb36bb0771000"
dependencies = [
"libc",
"measured",
@@ -3285,12 +3275,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-conv"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -3683,7 +3667,6 @@ dependencies = [
"sysinfo",
"tenant_size_model",
"thiserror",
"tikv-jemallocator",
"tokio",
"tokio-epoll-uring",
"tokio-io-timeout",
@@ -4081,7 +4064,6 @@ name = "postgres_backend"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"futures",
"once_cell",
@@ -4094,7 +4076,6 @@ dependencies = [
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.25.0",
"tokio-util",
"tracing",
"workspace_hack",
]
@@ -4135,12 +4116,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@@ -5420,9 +5395,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
version = "1.0.203"
version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
dependencies = [
"serde_derive",
]
@@ -5439,9 +5414,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.203"
version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
dependencies = [
"proc-macro2",
"quote",
@@ -6131,15 +6106,12 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.36"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885"
checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc"
dependencies = [
"deranged",
"itoa",
"js-sys",
"num-conv",
"powerfmt",
"serde",
"time-core",
"time-macros",
@@ -6147,17 +6119,16 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.2"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3"
checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
[[package]]
name = "time-macros"
version = "0.2.18"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf"
checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b"
dependencies = [
"num-conv",
"time-core",
]
@@ -7455,12 +7426,13 @@ dependencies = [
"clap",
"clap_builder",
"crossbeam-utils",
"deranged",
"either",
"fail",
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-util",
"getrandom 0.2.11",
"hashbrown 0.14.5",
@@ -7478,9 +7450,7 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"proc-macro2",
"prost",
"quote",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",
@@ -7497,7 +7467,6 @@ dependencies = [
"syn 1.0.109",
"syn 2.0.52",
"sync_wrapper",
"tikv-jemalloc-sys",
"time",
"time-macros",
"tokio",

View File

@@ -111,8 +111,8 @@ lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
memoffset = "0.8"
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
notify = "6.0.0"

View File

@@ -798,11 +798,7 @@ impl ComputeNode {
// In this case we need to connect with old `zenith_admin` name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
let mut connstr = self.connstr.clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "apply_config");
let connstr = self.connstr.clone();
let mut client = match Client::connect(connstr.as_str(), NoTls) {
Err(e) => match e.code() {
Some(&SqlState::INVALID_PASSWORD)
@@ -871,11 +867,6 @@ impl ComputeNode {
// Run migrations separately to not hold up cold starts
thread::spawn(move || {
let mut connstr = connstr.clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "migrations");
let mut client = Client::connect(connstr.as_str(), NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
});
@@ -1395,9 +1386,7 @@ pub fn forward_termination_signal() {
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
// Use 'fast' shutdown (SIGINT) because it also creates a shutdown checkpoint, which is important for
// ROs to get a list of running xacts faster instead of going through the CLOG.
// See https://www.postgresql.org/docs/current/server-shutdown.html for the list of modes and signals.
kill(pg_pid, Signal::SIGINT).ok();
// use 'immediate' shutdown (SIGQUIT): https://www.postgresql.org/docs/current/server-shutdown.html
kill(pg_pid, Signal::SIGQUIT).ok();
}
}

View File

@@ -11,7 +11,6 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod extension_server;
mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;

View File

@@ -1,100 +0,0 @@
use anyhow::{Context, Result};
use postgres::Client;
use tracing::info;
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}
impl<'m> MigrationRunner<'m> {
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
Self { client, migrations }
}
fn get_migration_id(&mut self) -> Result<i64> {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
.client
.query_one(query, &[])
.context("run_migrations get migration_id")?;
Ok(row.get::<&str, i64>("id"))
}
fn update_migration_id(&mut self) -> Result<()> {
let setval = format!(
"UPDATE neon_migration.migration_id SET id={}",
self.migrations.len()
);
self.client
.simple_query(&setval)
.context("run_migrations update id")?;
Ok(())
}
fn prepare_migrations(&mut self) -> Result<()> {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
self.client.simple_query(query)?;
let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
self.client.simple_query(query)?;
let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
self.client.simple_query(query)?;
let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
self.client.simple_query(query)?;
let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
self.client.simple_query(query)?;
Ok(())
}
pub fn run_migrations(mut self) -> Result<()> {
self.prepare_migrations()?;
let mut current_migration: usize = self.get_migration_id()? as usize;
let starting_migration_id = current_migration;
let query = "BEGIN";
self.client
.simple_query(query)
.context("run_migrations begin")?;
while current_migration < self.migrations.len() {
let migration = self.migrations[current_migration];
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", current_migration);
} else {
info!(
"Running migration id={}:\n{}\n",
current_migration, migration
);
self.client.simple_query(migration).with_context(|| {
format!("run_migration current_migration={}", current_migration)
})?;
}
current_migration += 1;
}
self.update_migration_id()?;
let query = "COMMIT";
self.client
.simple_query(query)
.context("run_migrations commit")?;
info!(
"Ran {} migrations",
(self.migrations.len() - starting_migration_id)
);
Ok(())
}
}

View File

@@ -10,7 +10,6 @@ use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
use crate::config;
use crate::logger::inlinify;
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
@@ -792,7 +791,69 @@ pub fn handle_migrations(client: &mut Client) -> Result<()> {
include_str!("./migrations/0008-revoke_replication_for_previously_allowed_roles.sql"),
];
MigrationRunner::new(client, &migrations).run_migrations()?;
let mut func = || {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
client.simple_query(query)?;
let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
client.simple_query(query)?;
let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
client.simple_query(query)?;
let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
client.simple_query(query)?;
let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
client.simple_query(query)?;
Ok::<_, anyhow::Error>(())
};
func().context("handle_migrations prepare")?;
let query = "SELECT id FROM neon_migration.migration_id";
let row = client
.query_one(query, &[])
.context("handle_migrations get migration_id")?;
let mut current_migration: usize = row.get::<&str, i64>("id") as usize;
let starting_migration_id = current_migration;
let query = "BEGIN";
client
.simple_query(query)
.context("handle_migrations begin")?;
while current_migration < migrations.len() {
let migration = &migrations[current_migration];
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", current_migration);
} else {
info!(
"Running migration id={}:\n{}\n",
current_migration, migration
);
client.simple_query(migration).with_context(|| {
format!("handle_migrations current_migration={}", current_migration)
})?;
}
current_migration += 1;
}
let setval = format!(
"UPDATE neon_migration.migration_id SET id={}",
migrations.len()
);
client
.simple_query(&setval)
.context("handle_migrations update id")?;
let query = "COMMIT";
client
.simple_query(query)
.context("handle_migrations commit")?;
info!(
"Ran {} migrations",
(migrations.len() - starting_migration_id)
);
Ok(())
}

View File

@@ -15,6 +15,7 @@ use std::time::Duration;
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use futures::SinkExt;
use pageserver_api::models::{
self, AuxFilePolicy, LocationConfig, TenantHistorySize, TenantInfo, TimelineInfo,
};
@@ -565,39 +566,60 @@ impl PageServerNode {
pg_wal: Option<(Lsn, PathBuf)>,
pg_version: u32,
) -> anyhow::Result<()> {
let (client, conn) = self.page_server_psql_client().await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("connection error: {}", e);
}
});
let client = std::pin::pin!(client);
// Init base reader
let (start_lsn, base_tarfile_path) = base;
let base_tarfile = tokio::fs::File::open(base_tarfile_path).await?;
let base_tarfile =
mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(base_tarfile));
let base_tarfile = tokio_util::io::ReaderStream::new(base_tarfile);
// Init wal reader if necessary
let (end_lsn, wal_reader) = if let Some((end_lsn, wal_tarfile_path)) = pg_wal {
let wal_tarfile = tokio::fs::File::open(wal_tarfile_path).await?;
let wal_reader =
mgmt_api::ReqwestBody::wrap_stream(tokio_util::io::ReaderStream::new(wal_tarfile));
let wal_reader = tokio_util::io::ReaderStream::new(wal_tarfile);
(end_lsn, Some(wal_reader))
} else {
(start_lsn, None)
};
// Import base
self.http_client
.import_basebackup(
tenant_id,
timeline_id,
start_lsn,
end_lsn,
pg_version,
base_tarfile,
)
.await?;
let copy_in = |reader, cmd| {
let client = &client;
async move {
let writer = client.copy_in(&cmd).await?;
let writer = std::pin::pin!(writer);
let mut writer = writer.sink_map_err(|e| {
std::io::Error::new(std::io::ErrorKind::Other, format!("{e}"))
});
let mut reader = std::pin::pin!(reader);
writer.send_all(&mut reader).await?;
writer.into_inner().finish().await?;
anyhow::Ok(())
}
};
// Import base
copy_in(
base_tarfile,
format!(
"import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
),
)
.await?;
// Import wal if necessary
if let Some(wal_reader) = wal_reader {
self.http_client
.import_wal(tenant_id, timeline_id, start_lsn, end_lsn, wal_reader)
.await?;
copy_in(
wal_reader,
format!("import wal {tenant_id} {timeline_id} {start_lsn} {end_lsn}"),
)
.await?;
}
Ok(())

View File

@@ -13,7 +13,11 @@ use std::{
use measured::{
label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
metric::{
group::{Encoding, MetricValue},
name::MetricNameEncoder,
Metric, MetricType, MetricVec,
},
text::TextEncoder,
LabelGroup,
};
@@ -140,7 +144,6 @@ impl<const N: usize> HyperLogLogState<N> {
})
}
}
impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEncoder<W>>
for HyperLogLogState<N>
{
@@ -179,13 +182,12 @@ impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEnc
.into_iter()
.enumerate()
.try_for_each(|(hll_shard, val)| {
CounterState::new(val as u64).collect_into(
&(),
enc.write_metric_value(
name.by_ref(),
labels.by_ref().compose_with(HllShardLabel {
hll_shard: hll_shard as i64,
}),
name.by_ref(),
enc,
MetricValue::Int(val as i64),
)
})
}

View File

@@ -9,7 +9,7 @@ use measured::{
metric::{
counter::CounterState,
gauge::GaugeState,
group::Encoding,
group::{Encoding, MetricValue},
name::{MetricName, MetricNameEncoder},
MetricEncoding, MetricFamilyEncoding,
},
@@ -171,11 +171,8 @@ fn write_gauge<Enc: Encoding>(
labels: impl LabelGroup,
name: impl MetricNameEncoder,
enc: &mut Enc,
) -> Result<(), Enc::Err>
where
GaugeState: MetricEncoding<Enc>,
{
GaugeState::new(x).collect_into(&(), labels, name, enc)
) -> Result<(), Enc::Err> {
enc.write_metric_value(name, labels, MetricValue::Int(x))
}
#[derive(Default)]
@@ -547,6 +544,15 @@ impl<T: Encoding> Encoding for Inc<T> {
fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> {
self.0.write_help(name, help)
}
fn write_metric_value(
&mut self,
name: impl MetricNameEncoder,
labels: impl LabelGroup,
value: MetricValue,
) -> Result<(), Self::Err> {
self.0.write_metric_value(name, labels, value)
}
}
impl<T: Encoding> MetricEncoding<Inc<T>> for MeasuredCounterPairState
@@ -573,6 +579,15 @@ impl<T: Encoding> Encoding for Dec<T> {
fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> {
self.0.write_help(name, help)
}
fn write_metric_value(
&mut self,
name: impl MetricNameEncoder,
labels: impl LabelGroup,
value: MetricValue,
) -> Result<(), Self::Err> {
self.0.write_metric_value(name, labels, value)
}
}
/// Write the dec counter to the encoder

View File

@@ -9,7 +9,6 @@ use std::{
collections::HashMap,
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
sync::atomic::AtomicUsize,
time::{Duration, SystemTime},
};
@@ -438,7 +437,18 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
/// Disabled for writes, and never decompress during reading.
/// Never set this after you've enabled compression once!
@@ -458,31 +468,6 @@ impl ImageCompressionAlgorithm {
}
}
impl FromStr for ImageCompressionAlgorithm {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut components = s.split(['(', ')']);
let first = components
.next()
.ok_or_else(|| anyhow::anyhow!("empty string"))?;
match first {
"disabled-no-decompress" => Ok(ImageCompressionAlgorithm::DisabledNoDecompress),
"disabled" => Ok(ImageCompressionAlgorithm::Disabled),
"zstd" => {
let level = if let Some(v) = components.next() {
let v: i8 = v.parse()?;
Some(v)
} else {
None
};
Ok(ImageCompressionAlgorithm::Zstd { level })
}
_ => anyhow::bail!("invalid specifier '{first}'"),
}
}
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
@@ -1675,29 +1660,4 @@ mod tests {
AuxFilePolicy::CrossValidation
);
}
#[test]
fn test_image_compression_algorithm_parsing() {
use ImageCompressionAlgorithm::*;
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
Disabled
);
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled-no-decompress").unwrap(),
DisabledNoDecompress
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
Zstd { level: None }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
Zstd { level: Some(18) }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
Zstd { level: Some(-3) }
);
}
}

View File

@@ -1,42 +1,59 @@
//! See docs/rfcs/031-sharding-static.md for an overview of sharding.
//!
//! This module contains a variety of types used to represent the concept of sharding
//! a Neon tenant across multiple physical shards. Since there are quite a few of these,
//! we provide an summary here.
//!
//! Types used to describe shards:
//! - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
//! which identifies a tenant which is not shard-aware. This means its storage paths do not include
//! a shard suffix.
//! - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
//! - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
//! without the tenant ID. This is useful for things that are implicitly scoped to a particular
//! tenant, such as layer files.
//! - [`ShardIdentity`]` is the full description of a particular shard's parameters, in sufficient
//! detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
//! - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
//! four hex digits. An unsharded tenant is `0000`.
//! - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
//!
//! Types used to describe the parameters for data distribution in a sharded tenant:
//! - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
//! multiple shards. Its value is given in 8kiB pages.
//! - [`ShardLayout`] describes the data distribution scheme, and at time of writing is
//! always zero: this is provided for future upgrades that might introduce different
//! data distribution schemes.
//!
//! Examples:
//! - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
//! - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
//! - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
//! and their slugs are 0004, 0104, 0204, and 0304.
use std::{ops::RangeInclusive, str::FromStr};
use crate::{key::Key, models::ShardParameters};
use hex::FromHex;
use postgres_ffi::relfile_utils::INIT_FORKNUM;
use serde::{Deserialize, Serialize};
use utils::id::TenantId;
#[doc(inline)]
pub use ::utils::shard::*;
/// See docs/rfcs/031-sharding-static.md for an overview of sharding.
///
/// This module contains a variety of types used to represent the concept of sharding
/// a Neon tenant across multiple physical shards. Since there are quite a few of these,
/// we provide an summary here.
///
/// Types used to describe shards:
/// - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
/// which identifies a tenant which is not shard-aware. This means its storage paths do not include
/// a shard suffix.
/// - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
/// - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
/// without the tenant ID. This is useful for things that are implicitly scoped to a particular
/// tenant, such as layer files.
/// - [`ShardIdentity`]` is the full description of a particular shard's parameters, in sufficient
/// detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
/// - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
/// four hex digits. An unsharded tenant is `0000`.
/// - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
///
/// Types used to describe the parameters for data distribution in a sharded tenant:
/// - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
/// multiple shards. Its value is given in 8kiB pages.
/// - [`ShardLayout`] describes the data distribution scheme, and at time of writing is
/// always zero: this is provided for future upgrades that might introduce different
/// data distribution schemes.
///
/// Examples:
/// - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
/// - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
/// - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
/// and their slugs are 0004, 0104, 0204, and 0304.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(u8);
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
/// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
/// and to check whether that [`ShardNumber`] is the same as the current shard.
@@ -48,6 +65,362 @@ pub struct ShardIdentity {
layout: ShardLayout,
}
/// Formatting helper, for generating the `shard_id` label in traces.
struct ShardSlug<'a>(&'a TenantShardId);
/// TenantShardId globally identifies a particular shard in a particular tenant.
///
/// These are written as `<TenantId>-<ShardSlug>`, for example:
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
///
/// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
/// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
/// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
///
/// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
/// is both forward and backward compatible with TenantId: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
/// legacy format for TenantShardId that excludes the shard suffix", also known
/// as [`TenantShardId::unsharded`].
///
/// This method returns the actual number of shards, i.e. if our internal value is
/// zero, we return 1 (unsharded tenants have 1 shard).
pub fn count(&self) -> u8 {
if self.0 > 0 {
self.0
} else {
1
}
}
/// The literal internal value: this is **not** the number of shards in the
/// tenant, as we have a special zero value for legacy unsharded tenants. Use
/// [`Self::count`] if you want to know the cardinality of shards.
pub fn literal(&self) -> u8 {
self.0
}
/// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
/// uses the legacy format for `TenantShardId`. See also the documentation for
/// [`Self::count`].
pub fn is_unsharded(&self) -> bool {
self.0 == 0
}
/// `v` may be zero, or the number of shards in the tenant. `v` is what
/// [`Self::literal`] would return.
pub const fn new(val: u8) -> Self {
Self(val)
}
}
impl ShardNumber {
pub const MAX: Self = Self(u8::MAX);
}
impl TenantShardId {
pub fn unsharded(tenant_id: TenantId) -> Self {
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
},
)
}
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
ShardSlug(self)
}
/// Convenience for code that has special behavior on the 0th shard.
pub fn is_shard_zero(&self) -> bool {
self.shard_number == ShardNumber(0)
}
/// The "unsharded" value is distinct from simply having a single shard: it represents
/// a tenant which is not shard-aware at all, and whose storage paths will not include
/// a shard suffix.
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
}
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
/// is useful when logging from code that is already in a span that includes tenant ID, to
/// keep messages reasonably terse.
pub fn to_index(&self) -> ShardIndex {
ShardIndex {
shard_number: self.shard_number,
shard_count: self.shard_count,
}
}
/// Calculate the children of this TenantShardId when splitting the overall tenant into
/// the given number of shards.
pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
let mut child_shards = Vec::new();
for shard_number in 0..ShardNumber(new_shard_count.0).0 {
// Key mapping is based on a round robin mapping of key hash modulo shard count,
// so our child shards are the ones which the same keys would map to.
if shard_number % effective_old_shard_count == self.shard_number.0 {
child_shards.push(TenantShardId {
tenant_id: self.tenant_id,
shard_number: ShardNumber(shard_number),
shard_count: new_shard_count,
})
}
}
child_shards
}
}
impl<'a> std::fmt::Display for ShardSlug<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:02x}{:02x}",
self.0.shard_number.0, self.0.shard_count.0
)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(f, "{}-{}", self.tenant_id, self.shard_slug())
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single shard case (shard count == 1).
self.tenant_id.fmt(f)
}
}
}
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for TenantShardId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
if s.len() == 32 {
// Legacy case: no shard specified
Ok(Self {
tenant_id: TenantId::from_str(s)?,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
})
} else if s.len() == 37 {
let bytes = s.as_bytes();
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
Ok(Self {
tenant_id,
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 18]> for TenantShardId {
fn from(b: [u8; 18]) -> Self {
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
Self {
tenant_id: TenantId::from(tenant_id_bytes),
shard_number: ShardNumber(b[16]),
shard_count: ShardCount(b[17]),
}
}
}
impl ShardIndex {
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
Self {
shard_number: number,
shard_count: count,
}
}
pub fn unsharded() -> Self {
Self {
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The "unsharded" value is distinct from simply having a single shard: it represents
/// a tenant which is not shard-aware at all, and whose storage paths will not include
/// a shard suffix.
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
/// For use in constructing remote storage paths: concatenate this with a TenantId
/// to get a fully qualified TenantShardId.
///
/// Backward compat: this function returns an empty string if Self::is_unsharded, such
/// that the legacy pre-sharding remote key format is preserved.
pub fn get_suffix(&self) -> String {
if self.is_unsharded() {
"".to_string()
} else {
format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
}
impl std::fmt::Display for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Debug for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for ShardIndex {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 1 byte shard number, 1 byte shard count
if s.len() == 4 {
let bytes = s.as_bytes();
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(bytes, &mut shard_parts)?;
Ok(Self {
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 2]> for ShardIndex {
fn from(b: [u8; 2]) -> Self {
Self {
shard_number: ShardNumber(b[0]),
shard_count: ShardCount(b[1]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Note: while human encoding of [`TenantShardId`] is backward and forward
// compatible, this binary encoding is not.
let mut packed: [u8; 18] = [0; 18];
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
packed[16] = self.shard_number.0;
packed[17] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for TenantShardId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = TenantShardId;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 18])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 18] = Deserialize::deserialize(s)?;
Ok(TenantShardId::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
TenantShardId::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
18,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
/// Stripe size in number of pages
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
@@ -212,6 +585,77 @@ impl ShardIdentity {
}
}
impl Serialize for ShardIndex {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Binary encoding is not used in index_part.json, but is included in anticipation of
// switching various structures (e.g. inter-process communication, remote metadata) to more
// compact binary encodings in future.
let mut packed: [u8; 2] = [0; 2];
packed[0] = self.shard_number.0;
packed[1] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for ShardIndex {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = ShardIndex;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 2])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 2] = Deserialize::deserialize(s)?;
Ok(ShardIndex::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
ShardIndex::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
2,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
/// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
/// in order to be able to serve basebackup requests without peer communication).
fn key_is_shard0(key: &Key) -> bool {
@@ -293,9 +737,7 @@ pub fn describe(
#[cfg(test)]
mod tests {
use std::str::FromStr;
use utils::{id::TenantId, Hex};
use utils::Hex;
use super::*;

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
@@ -13,7 +12,6 @@ rustls.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tokio-rustls.workspace = true
tracing.workspace = true
@@ -24,4 +22,4 @@ workspace_hack.workspace = true
once_cell.workspace = true
rustls-pemfile.workspace = true
tokio-postgres.workspace = true
tokio-postgres-rustls.workspace = true
tokio-postgres-rustls.workspace = true

View File

@@ -16,7 +16,6 @@ use std::{fmt, io};
use std::{future::Future, str::FromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter};
@@ -79,7 +78,7 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
#[allow(async_fn_in_trait)]
pub trait Handler<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
@@ -401,15 +400,21 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
}
/// Wrapper for run_message_loop() that shuts down socket when we are done
pub async fn run(
pub async fn run<F, S>(
mut self,
handler: &mut impl Handler<IO>,
cancel: &CancellationToken,
) -> Result<(), QueryError> {
let ret = self.run_message_loop(handler, cancel).await;
shutdown_watcher: F,
) -> Result<(), QueryError>
where
F: Fn() -> S + Clone,
S: Future,
{
let ret = self
.run_message_loop(handler, shutdown_watcher.clone())
.await;
tokio::select! {
_ = cancel.cancelled() => {
_ = shutdown_watcher() => {
// do nothing; we most likely got already stopped by shutdown and will log it next.
}
_ = self.framed.shutdown() => {
@@ -439,17 +444,21 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
}
}
async fn run_message_loop(
async fn run_message_loop<F, S>(
&mut self,
handler: &mut impl Handler<IO>,
cancel: &CancellationToken,
) -> Result<(), QueryError> {
shutdown_watcher: F,
) -> Result<(), QueryError>
where
F: Fn() -> S,
S: Future,
{
trace!("postgres backend to {:?} started", self.peer_addr);
tokio::select!(
biased;
_ = cancel.cancelled() => {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received during handshake");
return Err(QueryError::Shutdown)
@@ -464,7 +473,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
let mut query_string = Bytes::new();
while let Some(msg) = tokio::select!(
biased;
_ = cancel.cancelled() => {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received in run_message_loop");
return Err(QueryError::Shutdown)
@@ -476,7 +485,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
let result = self.process_message(handler, msg, &mut query_string).await;
tokio::select!(
biased;
_ = cancel.cancelled() => {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received during response flush");
@@ -663,17 +672,11 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
assert!(self.state < ProtoState::Authentication);
let have_tls = self.tls_config.is_some();
match msg {
FeStartupPacket::SslRequest { direct } => {
FeStartupPacket::SslRequest => {
debug!("SSL requested");
if !direct {
self.write_message(&BeMessage::EncryptionResponse(have_tls))
.await?;
} else if !have_tls {
return Err(QueryError::Other(anyhow::anyhow!(
"direct SSL negotiation but no TLS support"
)));
}
self.write_message(&BeMessage::EncryptionResponse(have_tls))
.await?;
if have_tls {
self.start_tls().await?;

View File

@@ -3,14 +3,13 @@ use once_cell::sync::Lazy;
use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError};
use pq_proto::{BeMessage, RowDescriptor};
use std::io::Cursor;
use std::sync::Arc;
use std::{future, sync::Arc};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_postgres::config::SslMode;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres::{Config, NoTls, SimpleQueryMessage};
use tokio_postgres_rustls::MakeRustlsConnect;
use tokio_util::sync::CancellationToken;
// generate client, server test streams
async fn make_tcp_pair() -> (TcpStream, TcpStream) {
@@ -23,7 +22,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
struct TestHandler {}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
// return single col 'hey' for any query
async fn process_query(
@@ -51,7 +49,7 @@ async fn simple_select() {
tokio::spawn(async move {
let mut handler = TestHandler {};
pgbackend.run(&mut handler, &CancellationToken::new()).await
pgbackend.run(&mut handler, future::pending::<()>).await
});
let conf = Config::new();
@@ -103,7 +101,7 @@ async fn simple_select_ssl() {
tokio::spawn(async move {
let mut handler = TestHandler {};
pgbackend.run(&mut handler, &CancellationToken::new()).await
pgbackend.run(&mut handler, future::pending::<()>).await
});
let client_cfg = rustls::ClientConfig::builder()

View File

@@ -44,9 +44,9 @@ impl ConnectionError {
/// Wraps async io `stream`, providing messages to write/flush + read Postgres
/// messages.
pub struct Framed<S> {
pub stream: S,
pub read_buf: BytesMut,
pub write_buf: BytesMut,
stream: S,
read_buf: BytesMut,
write_buf: BytesMut,
}
impl<S> Framed<S> {

View File

@@ -39,39 +39,14 @@ pub enum FeMessage {
PasswordMessage(Bytes),
}
#[derive(Clone, Copy, PartialEq, PartialOrd)]
pub struct ProtocolVersion(u32);
impl ProtocolVersion {
pub const fn new(major: u16, minor: u16) -> Self {
Self((major as u32) << 16 | minor as u32)
}
pub const fn minor(self) -> u16 {
self.0 as u16
}
pub const fn major(self) -> u16 {
(self.0 >> 16) as u16
}
}
impl fmt::Debug for ProtocolVersion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list()
.entry(&self.major())
.entry(&self.minor())
.finish()
}
}
#[derive(Debug)]
pub enum FeStartupPacket {
CancelRequest(CancelKeyData),
SslRequest {
direct: bool,
},
SslRequest,
GssEncRequest,
StartupMessage {
version: ProtocolVersion,
major_version: u32,
minor_version: u32,
params: StartupMessageParams,
},
}
@@ -326,23 +301,11 @@ impl FeStartupPacket {
/// different from [`FeMessage::parse`] because startup messages don't have
/// message type byte; otherwise, its comments apply.
pub fn parse(buf: &mut BytesMut) -> Result<Option<FeStartupPacket>, ProtocolError> {
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L118>
const MAX_STARTUP_PACKET_LENGTH: usize = 10000;
const RESERVED_INVALID_MAJOR_VERSION: u16 = 1234;
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L132>
const CANCEL_REQUEST_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5678);
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L166>
const NEGOTIATE_SSL_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5679);
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L167>
const NEGOTIATE_GSS_CODE: ProtocolVersion = ProtocolVersion::new(1234, 5680);
// <https://github.com/postgres/postgres/blob/04bcf9e19a4261fe9c7df37c777592c2e10c32a7/src/backend/tcop/backend_startup.c#L378-L382>
// First byte indicates standard SSL handshake message
// (It can't be a Postgres startup length because in network byte order
// that would be a startup packet hundreds of megabytes long)
if buf.first() == Some(&0x16) {
return Ok(Some(FeStartupPacket::SslRequest { direct: true }));
}
const RESERVED_INVALID_MAJOR_VERSION: u32 = 1234;
const CANCEL_REQUEST_CODE: u32 = 5678;
const NEGOTIATE_SSL_CODE: u32 = 5679;
const NEGOTIATE_GSS_CODE: u32 = 5680;
// need at least 4 bytes with packet len
if buf.len() < 4 {
@@ -375,10 +338,12 @@ impl FeStartupPacket {
let mut msg = buf.split_to(len).freeze();
msg.advance(4); // consume len
let request_code = ProtocolVersion(msg.get_u32());
let request_code = msg.get_u32();
let req_hi = request_code >> 16;
let req_lo = request_code & ((1 << 16) - 1);
// StartupMessage, CancelRequest, SSLRequest etc are differentiated by request code.
let message = match request_code {
CANCEL_REQUEST_CODE => {
let message = match (req_hi, req_lo) {
(RESERVED_INVALID_MAJOR_VERSION, CANCEL_REQUEST_CODE) => {
if msg.remaining() != 8 {
return Err(ProtocolError::BadMessage(
"CancelRequest message is malformed, backend PID / secret key missing"
@@ -390,22 +355,21 @@ impl FeStartupPacket {
cancel_key: msg.get_i32(),
})
}
NEGOTIATE_SSL_CODE => {
(RESERVED_INVALID_MAJOR_VERSION, NEGOTIATE_SSL_CODE) => {
// Requested upgrade to SSL (aka TLS)
FeStartupPacket::SslRequest { direct: false }
FeStartupPacket::SslRequest
}
NEGOTIATE_GSS_CODE => {
(RESERVED_INVALID_MAJOR_VERSION, NEGOTIATE_GSS_CODE) => {
// Requested upgrade to GSSAPI
FeStartupPacket::GssEncRequest
}
version if version.major() == RESERVED_INVALID_MAJOR_VERSION => {
(RESERVED_INVALID_MAJOR_VERSION, unrecognized_code) => {
return Err(ProtocolError::Protocol(format!(
"Unrecognized request code {}",
version.minor()
"Unrecognized request code {unrecognized_code}"
)));
}
// TODO bail if protocol major_version is not 3?
version => {
(major_version, minor_version) => {
// StartupMessage
let s = str::from_utf8(&msg).map_err(|_e| {
@@ -418,7 +382,8 @@ impl FeStartupPacket {
})?;
FeStartupPacket::StartupMessage {
version,
major_version,
minor_version,
params: StartupMessageParams {
params: msg.slice_ref(s.as_bytes()),
},
@@ -557,10 +522,6 @@ pub enum BeMessage<'a> {
RowDescription(&'a [RowDescriptor<'a>]),
XLogData(XLogDataBody<'a>),
NoticeResponse(&'a str),
NegotiateProtocolVersion {
version: ProtocolVersion,
options: &'a [&'a str],
},
KeepAlive(WalSndKeepAlive),
}
@@ -984,18 +945,6 @@ impl<'a> BeMessage<'a> {
buf.put_u8(u8::from(req.request_reply));
});
}
BeMessage::NegotiateProtocolVersion { version, options } => {
buf.put_u8(b'v');
write_body(buf, |buf| {
buf.put_u32(version.0);
buf.put_u32(options.len() as u32);
for option in options.iter() {
write_cstr(option, buf)?;
}
Ok(())
})?
}
}
Ok(())
}

View File

@@ -74,15 +74,6 @@ pub fn parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
.transpose()
}
pub fn must_parse_query_param<E: fmt::Display, T: FromStr<Err = E>>(
request: &Request<Body>,
param_name: &str,
) -> Result<T, ApiError> {
parse_query_param(request, param_name)?.ok_or_else(|| {
ApiError::BadRequest(anyhow!("no {param_name} specified in query parameters"))
})
}
pub async fn ensure_no_body(request: &mut Request<Body>) -> Result<(), ApiError> {
match request.body_mut().data().await {
Some(_) => Err(ApiError::BadRequest(anyhow!("Unexpected request body"))),

View File

@@ -26,8 +26,6 @@ pub mod auth;
// utility functions and helper traits for unified unique id generation/serialization etc.
pub mod id;
pub mod shard;
mod hex;
pub use hex::Hex;

View File

@@ -1,451 +0,0 @@
//! See `pageserver_api::shard` for description on sharding.
use std::{ops::RangeInclusive, str::FromStr};
use hex::FromHex;
use serde::{Deserialize, Serialize};
use crate::id::TenantId;
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(pub u8);
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
/// Formatting helper, for generating the `shard_id` label in traces.
pub struct ShardSlug<'a>(&'a TenantShardId);
/// TenantShardId globally identifies a particular shard in a particular tenant.
///
/// These are written as `<TenantId>-<ShardSlug>`, for example:
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
///
/// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
/// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
/// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
///
/// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
/// is both forward and backward compatible with TenantId: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
/// legacy format for TenantShardId that excludes the shard suffix", also known
/// as [`TenantShardId::unsharded`].
///
/// This method returns the actual number of shards, i.e. if our internal value is
/// zero, we return 1 (unsharded tenants have 1 shard).
pub fn count(&self) -> u8 {
if self.0 > 0 {
self.0
} else {
1
}
}
/// The literal internal value: this is **not** the number of shards in the
/// tenant, as we have a special zero value for legacy unsharded tenants. Use
/// [`Self::count`] if you want to know the cardinality of shards.
pub fn literal(&self) -> u8 {
self.0
}
/// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
/// uses the legacy format for `TenantShardId`. See also the documentation for
/// [`Self::count`].
pub fn is_unsharded(&self) -> bool {
self.0 == 0
}
/// `v` may be zero, or the number of shards in the tenant. `v` is what
/// [`Self::literal`] would return.
pub const fn new(val: u8) -> Self {
Self(val)
}
}
impl ShardNumber {
pub const MAX: Self = Self(u8::MAX);
}
impl TenantShardId {
pub fn unsharded(tenant_id: TenantId) -> Self {
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
},
)
}
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
ShardSlug(self)
}
/// Convenience for code that has special behavior on the 0th shard.
pub fn is_shard_zero(&self) -> bool {
self.shard_number == ShardNumber(0)
}
/// The "unsharded" value is distinct from simply having a single shard: it represents
/// a tenant which is not shard-aware at all, and whose storage paths will not include
/// a shard suffix.
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
}
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
/// is useful when logging from code that is already in a span that includes tenant ID, to
/// keep messages reasonably terse.
pub fn to_index(&self) -> ShardIndex {
ShardIndex {
shard_number: self.shard_number,
shard_count: self.shard_count,
}
}
/// Calculate the children of this TenantShardId when splitting the overall tenant into
/// the given number of shards.
pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
let mut child_shards = Vec::new();
for shard_number in 0..ShardNumber(new_shard_count.0).0 {
// Key mapping is based on a round robin mapping of key hash modulo shard count,
// so our child shards are the ones which the same keys would map to.
if shard_number % effective_old_shard_count == self.shard_number.0 {
child_shards.push(TenantShardId {
tenant_id: self.tenant_id,
shard_number: ShardNumber(shard_number),
shard_count: new_shard_count,
})
}
}
child_shards
}
}
impl<'a> std::fmt::Display for ShardSlug<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:02x}{:02x}",
self.0.shard_number.0, self.0.shard_count.0
)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(f, "{}-{}", self.tenant_id, self.shard_slug())
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single shard case (shard count == 1).
self.tenant_id.fmt(f)
}
}
}
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for TenantShardId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
if s.len() == 32 {
// Legacy case: no shard specified
Ok(Self {
tenant_id: TenantId::from_str(s)?,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
})
} else if s.len() == 37 {
let bytes = s.as_bytes();
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
Ok(Self {
tenant_id,
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 18]> for TenantShardId {
fn from(b: [u8; 18]) -> Self {
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
Self {
tenant_id: TenantId::from(tenant_id_bytes),
shard_number: ShardNumber(b[16]),
shard_count: ShardCount(b[17]),
}
}
}
impl ShardIndex {
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
Self {
shard_number: number,
shard_count: count,
}
}
pub fn unsharded() -> Self {
Self {
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The "unsharded" value is distinct from simply having a single shard: it represents
/// a tenant which is not shard-aware at all, and whose storage paths will not include
/// a shard suffix.
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
/// For use in constructing remote storage paths: concatenate this with a TenantId
/// to get a fully qualified TenantShardId.
///
/// Backward compat: this function returns an empty string if Self::is_unsharded, such
/// that the legacy pre-sharding remote key format is preserved.
pub fn get_suffix(&self) -> String {
if self.is_unsharded() {
"".to_string()
} else {
format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
}
impl std::fmt::Display for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Debug for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for ShardIndex {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 1 byte shard number, 1 byte shard count
if s.len() == 4 {
let bytes = s.as_bytes();
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(bytes, &mut shard_parts)?;
Ok(Self {
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 2]> for ShardIndex {
fn from(b: [u8; 2]) -> Self {
Self {
shard_number: ShardNumber(b[0]),
shard_count: ShardCount(b[1]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Note: while human encoding of [`TenantShardId`] is backward and forward
// compatible, this binary encoding is not.
let mut packed: [u8; 18] = [0; 18];
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
packed[16] = self.shard_number.0;
packed[17] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for TenantShardId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = TenantShardId;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 18])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 18] = Deserialize::deserialize(s)?;
Ok(TenantShardId::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
TenantShardId::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
18,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
impl Serialize for ShardIndex {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Binary encoding is not used in index_part.json, but is included in anticipation of
// switching various structures (e.g. inter-process communication, remote metadata) to more
// compact binary encodings in future.
let mut packed: [u8; 2] = [0; 2];
packed[0] = self.shard_number.0;
packed[1] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for ShardIndex {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = ShardIndex;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 2])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 2] = Deserialize::deserialize(s)?;
Ok(ShardIndex::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
ShardIndex::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
2,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}

View File

@@ -62,7 +62,6 @@ sync_wrapper.workspace = true
sysinfo.workspace = true
tokio-tar.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-epoll-uring.workspace = true
tokio-io-timeout.workspace = true

View File

@@ -8,7 +8,7 @@ license.workspace = true
pageserver_api.workspace = true
thiserror.workspace = true
async-trait.workspace = true
reqwest = { workspace = true, features = [ "stream" ] }
reqwest.workspace = true
utils.workspace = true
serde.workspace = true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -9,8 +9,6 @@ use utils::{
lsn::Lsn,
};
pub use reqwest::Body as ReqwestBody;
pub mod util;
#[derive(Debug, Clone)]
@@ -22,9 +20,6 @@ pub struct Client {
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("send request: {0}")]
SendRequest(reqwest::Error),
#[error("receive body: {0}")]
ReceiveBody(reqwest::Error),
@@ -178,30 +173,19 @@ impl Client {
self.request(Method::GET, uri, ()).await
}
fn start_request<U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
) -> reqwest::RequestBuilder {
let req = self.client.request(method, uri);
if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
}
}
async fn request_noerror<B: serde::Serialize, U: reqwest::IntoUrl>(
&self,
method: Method,
uri: U,
body: B,
) -> Result<reqwest::Response> {
self.start_request(method, uri)
.json(&body)
.send()
.await
.map_err(Error::ReceiveBody)
let req = self.client.request(method, uri);
let req = if let Some(value) = &self.authorization_header {
req.header(reqwest::header::AUTHORIZATION, value)
} else {
req
};
req.json(&body).send().await.map_err(Error::ReceiveBody)
}
async fn request<B: serde::Serialize, U: reqwest::IntoUrl>(
@@ -625,53 +609,4 @@ impl Client {
}),
}
}
pub async fn import_basebackup(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
base_lsn: Lsn,
end_lsn: Lsn,
pg_version: u32,
basebackup_tarball: ReqwestBody,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_basebackup?base_lsn={base_lsn}&end_lsn={end_lsn}&pg_version={pg_version}",
self.mgmt_api_endpoint,
);
self.start_request(Method::PUT, uri)
.body(basebackup_tarball)
.send()
.await
.map_err(Error::SendRequest)?
.error_from_body()
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn import_wal(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
wal_tarball: ReqwestBody,
) -> Result<()> {
let uri = format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/import_wal?start_lsn={start_lsn}&end_lsn={end_lsn}",
self.mgmt_api_endpoint,
);
self.start_request(Method::PUT, uri)
.body(wal_tarball)
.send()
.await
.map_err(Error::SendRequest)?
.error_from_body()
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -47,9 +47,6 @@ use utils::{
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
const PID_FILE_NAME: &str = "pageserver.pid";
const FEATURES: &[&str] = &[
@@ -660,6 +657,7 @@ fn start_pageserver(
async move {
page_service::libpq_listener_main(
tenant_manager,
broker_client,
pg_auth,
pageserver_listener,
conf.pg_auth_type,

View File

@@ -10,7 +10,6 @@ use std::time::Duration;
use anyhow::{anyhow, Context, Result};
use enumset::EnumSet;
use futures::StreamExt;
use futures::TryFutureExt;
use humantime::format_rfc3339;
use hyper::header;
@@ -45,14 +44,12 @@ use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeTravelError;
use tenant_size_model::{svg::SvgBranchKind, SizeResult, StorageModel};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::auth::JwtAuth;
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::prometheus_metrics_handler;
use utils::http::endpoint::request_span;
use utils::http::request::must_parse_query_param;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use crate::context::{DownloadBehavior, RequestContext};
@@ -2407,189 +2404,6 @@ async fn post_top_tenants(
)
}
async fn put_tenant_timeline_import_basebackup(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let base_lsn: Lsn = must_parse_query_param(&request, "base_lsn")?;
let end_lsn: Lsn = must_parse_query_param(&request, "end_lsn")?;
let pg_version: u32 = must_parse_query_param(&request, "pg_version")?;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let span = info_span!("import_basebackup", tenant_id=%tenant_id, timeline_id=%timeline_id, base_lsn=%base_lsn, end_lsn=%end_lsn, pg_version=%pg_version);
async move {
let state = get_state(&request);
let tenant = state
.tenant_manager
.get_attached_tenant_shard(TenantShardId::unsharded(tenant_id))?;
let broker_client = state.broker_client.clone();
let mut body = StreamReader::new(request.into_body().map(|res| {
res.map_err(|error| {
std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
})
}));
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.map_err(ApiError::InternalServerError)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
// from connecting before that and writing conflicting wal.
//
// This is not relevant for pageserver->pageserver migrations, since there's
// no wal to import. But should be fixed if we want to import from postgres.
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
// Import basebackup provided via CopyData
info!("importing basebackup");
timeline
.import_basebackup_from_tar(tenant.clone(), &mut body, base_lsn, broker_client, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
// Read the end of the tar archive.
read_tar_eof(body)
.await
.map_err(ApiError::InternalServerError)?;
// TODO check checksum
// Meanwhile you can verify client-side by taking fullbackup
// and checking that it matches in size with what was imported.
// It wouldn't work if base came from vanilla postgres though,
// since we discard some log files.
info!("done");
json_response(StatusCode::OK, ())
}
.instrument(span)
.await
}
async fn put_tenant_timeline_import_wal(
request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
let start_lsn: Lsn = must_parse_query_param(&request, "start_lsn")?;
let end_lsn: Lsn = must_parse_query_param(&request, "end_lsn")?;
check_permission(&request, Some(tenant_id))?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Warn);
let span = info_span!("import_wal", tenant_id=%tenant_id, timeline_id=%timeline_id, start_lsn=%start_lsn, end_lsn=%end_lsn);
async move {
let state = get_state(&request);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?;
let mut body = StreamReader::new(request.into_body().map(|res| {
res.map_err(|error| {
std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!(error))
})
}));
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn != start_lsn {
return Err(ApiError::InternalServerError(anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}")));
}
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
// Import wal provided via CopyData
info!("importing wal");
crate::import_datadir::import_wal_from_tar(&timeline, &mut body, start_lsn, end_lsn, &ctx).await.map_err(ApiError::InternalServerError)?;
info!("wal import complete");
// Read the end of the tar archive.
read_tar_eof(body).await.map_err(ApiError::InternalServerError)?;
// TODO Does it make sense to overshoot?
if timeline.get_last_record_lsn() < end_lsn {
return Err(ApiError::InternalServerError(anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}")));
}
// Flush data to disk, then upload to s3. No need for a forced checkpoint.
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
timeline.freeze_and_flush().await.map_err(|e| match e {
tenant::timeline::FlushLayerError::Cancelled => ApiError::ShuttingDown,
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
})?;
info!("done");
json_response(StatusCode::OK, ())
}.instrument(span).await
}
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
/// `tokio_tar` already read the first such block. Read the second all-zeros block,
/// and check that there is no more data after the EOF marker.
///
/// 'tar' command can also write extra blocks of zeros, up to a record
/// size, controlled by the --record-size argument. Ignore them too.
async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; 512];
// Read the all-zeros block, and verify it
let mut total_bytes = 0;
while total_bytes < 512 {
let nbytes = reader.read(&mut buf[total_bytes..]).await?;
total_bytes += nbytes;
if nbytes == 0 {
break;
}
}
if total_bytes < 512 {
anyhow::bail!("incomplete or invalid tar EOF marker");
}
if !buf.iter().all(|&x| x == 0) {
anyhow::bail!("invalid tar EOF marker");
}
// Drain any extra zero-blocks after the EOF marker
let mut trailing_bytes = 0;
let mut seen_nonzero_bytes = false;
loop {
let nbytes = reader.read(&mut buf).await?;
trailing_bytes += nbytes;
if !buf.iter().all(|&x| x == 0) {
seen_nonzero_bytes = true;
}
if nbytes == 0 {
break;
}
}
if seen_nonzero_bytes {
anyhow::bail!("unexpected non-zero bytes after the tar archive");
}
if trailing_bytes % 512 != 0 {
anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive");
}
Ok(())
}
/// Common functionality of all the HTTP API handlers.
///
/// - Adds a tracing span to each request (by `request_span`)
@@ -2884,13 +2698,5 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/perf_info",
|r| testing_api_handler("perf_info", r, perf_info),
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/import_basebackup",
|r| api_handler(r, put_tenant_timeline_import_basebackup),
)
.put(
"/v1/tenant/:tenant_id/timeline/:timeline_id/import_wal",
|r| api_handler(r, put_tenant_timeline_import_wal),
)
.any(handler_404))
}

View File

@@ -1456,12 +1456,10 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
}
}
pub(crate) static LIVE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
"pageserver_live_connections_started",
"Number of network connections that we started handling",
"pageserver_live_connections_finished",
"Number of network connections that we finished handling",
pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_live_connections",
"Number of live network connections",
&["pageserver_connection_kind"]
)
.expect("failed to define a metric")
@@ -1473,6 +1471,8 @@ pub(crate) enum ComputeCommandKind {
PageStream,
Basebackup,
Fullbackup,
ImportBasebackup,
ImportWal,
LeaseLsn,
Show,
}

View File

@@ -4,7 +4,9 @@
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::Stream;
use futures::StreamExt;
use pageserver_api::key::Key;
use pageserver_api::models::TenantState;
@@ -26,6 +28,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::io;
use std::net::TcpListener;
use std::pin::pin;
use std::str;
use std::str::FromStr;
use std::sync::Arc;
@@ -34,6 +37,7 @@ use std::time::Instant;
use std::time::SystemTime;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::ConnectionId;
@@ -49,8 +53,9 @@ use crate::auth::check_permission;
use crate::basebackup;
use crate::basebackup::BasebackupError;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS_COUNT};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
@@ -61,6 +66,7 @@ use crate::tenant::mgr::GetTenantError;
use crate::tenant::mgr::ShardResolveResult;
use crate::tenant::mgr::ShardSelector;
use crate::tenant::mgr::TenantManager;
use crate::tenant::timeline::FlushLayerError;
use crate::tenant::timeline::WaitLsnError;
use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
@@ -76,6 +82,56 @@ use postgres_ffi::BLCKSZ;
// is not yet in state [`TenantState::Active`].
const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
/// Read the end of a tar archive.
///
/// A tar archive normally ends with two consecutive blocks of zeros, 512 bytes each.
/// `tokio_tar` already read the first such block. Read the second all-zeros block,
/// and check that there is no more data after the EOF marker.
///
/// 'tar' command can also write extra blocks of zeros, up to a record
/// size, controlled by the --record-size argument. Ignore them too.
async fn read_tar_eof(mut reader: (impl AsyncRead + Unpin)) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
let mut buf = [0u8; 512];
// Read the all-zeros block, and verify it
let mut total_bytes = 0;
while total_bytes < 512 {
let nbytes = reader.read(&mut buf[total_bytes..]).await?;
total_bytes += nbytes;
if nbytes == 0 {
break;
}
}
if total_bytes < 512 {
anyhow::bail!("incomplete or invalid tar EOF marker");
}
if !buf.iter().all(|&x| x == 0) {
anyhow::bail!("invalid tar EOF marker");
}
// Drain any extra zero-blocks after the EOF marker
let mut trailing_bytes = 0;
let mut seen_nonzero_bytes = false;
loop {
let nbytes = reader.read(&mut buf).await?;
trailing_bytes += nbytes;
if !buf.iter().all(|&x| x == 0) {
seen_nonzero_bytes = true;
}
if nbytes == 0 {
break;
}
}
if seen_nonzero_bytes {
anyhow::bail!("unexpected non-zero bytes after the tar archive");
}
if trailing_bytes % 512 != 0 {
anyhow::bail!("unexpected number of zeros ({trailing_bytes}), not divisible by tar block size (512 bytes), after the tar archive");
}
Ok(())
}
///////////////////////////////////////////////////////////////////////////////
///
@@ -85,6 +141,7 @@ const ACTIVE_TENANT_TIMEOUT: Duration = Duration::from_millis(30000);
///
pub async fn libpq_listener_main(
tenant_manager: Arc<TenantManager>,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
listener: TcpListener,
auth_type: AuthType,
@@ -129,6 +186,7 @@ pub async fn libpq_listener_main(
false,
page_service_conn_main(
tenant_manager.clone(),
broker_client.clone(),
local_auth,
socket,
auth_type,
@@ -151,14 +209,20 @@ pub async fn libpq_listener_main(
#[instrument(skip_all, fields(peer_addr))]
async fn page_service_conn_main(
tenant_manager: Arc<TenantManager>,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
socket: tokio::net::TcpStream,
auth_type: AuthType,
connection_ctx: RequestContext,
) -> anyhow::Result<()> {
let _guard = LIVE_CONNECTIONS
.with_label_values(&["page_service"])
.guard();
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
socket
.set_nodelay(true)
@@ -203,11 +267,12 @@ async fn page_service_conn_main(
// and create a child per-query context when it invokes process_query.
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(tenant_manager, auth, connection_ctx);
let mut conn_handler =
PageServerHandler::new(tenant_manager, broker_client, auth, connection_ctx);
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
.run(&mut conn_handler, &task_mgr::shutdown_token())
.run(&mut conn_handler, task_mgr::shutdown_watcher)
.await
{
Ok(()) => {
@@ -234,6 +299,7 @@ struct HandlerTimeline {
}
struct PageServerHandler {
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
@@ -325,11 +391,13 @@ impl From<WaitLsnError> for QueryError {
impl PageServerHandler {
pub fn new(
tenant_manager: Arc<TenantManager>,
broker_client: storage_broker::BrokerClientChannel,
auth: Option<Arc<SwappableJwtAuth>>,
connection_ctx: RequestContext,
) -> Self {
PageServerHandler {
tenant_manager,
broker_client,
auth,
claims: None,
connection_ctx,
@@ -412,6 +480,73 @@ impl PageServerHandler {
)
}
fn copyin_stream<'a, IO>(
&'a self,
pgb: &'a mut PostgresBackend<IO>,
cancel: &'a CancellationToken,
) -> impl Stream<Item = io::Result<Bytes>> + 'a
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
async_stream::try_stream! {
loop {
let msg = tokio::select! {
biased;
_ = cancel.cancelled() => {
// We were requested to shut down.
let msg = "pageserver is shutting down";
let _ = pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, None));
Err(QueryError::Shutdown)
}
msg = pgb.read_message() => { msg.map_err(QueryError::from)}
};
match msg {
Ok(Some(message)) => {
let copy_data_bytes = match message {
FeMessage::CopyData(bytes) => bytes,
FeMessage::CopyDone => { break },
FeMessage::Sync => continue,
FeMessage::Terminate => {
let msg = "client terminated connection with Terminate message during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
break;
}
m => {
let msg = format!("unexpected message {m:?}");
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(&msg, None)).map_err(|e| e.into_io_error())?;
Err(io::Error::new(io::ErrorKind::Other, msg))?;
break;
}
};
yield copy_data_bytes;
}
Ok(None) => {
let msg = "client closed connection during COPY";
let query_error = QueryError::Disconnected(ConnectionError::Io(io::Error::new(io::ErrorKind::ConnectionReset, msg)));
// error can't happen here, ErrorResponse serialization should be always ok
pgb.write_message_noflush(&BeMessage::ErrorResponse(msg, Some(query_error.pg_error_code()))).map_err(|e| e.into_io_error())?;
self.flush_cancellable(pgb, cancel).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
Err(io::Error::new(io::ErrorKind::ConnectionReset, msg))?;
}
Err(QueryError::Disconnected(ConnectionError::Io(io_error))) => {
Err(io_error)?;
}
Err(other) => {
Err(io::Error::new(io::ErrorKind::Other, other.to_string()))?;
}
};
}
}
}
#[instrument(skip_all)]
async fn handle_pagerequests<IO>(
&mut self,
@@ -583,6 +718,128 @@ impl PageServerHandler {
Ok(())
}
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(%base_lsn, end_lsn=%_end_lsn, %pg_version))]
async fn handle_import_basebackup<IO>(
&self,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
pg_version: u32,
ctx: RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
// Create empty timeline
info!("creating new timeline");
let tenant = self
.get_active_tenant_with_timeout(tenant_id, ShardSelector::Zero, ACTIVE_TENANT_TIMEOUT)
.await?;
let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
// from connecting before that and writing conflicting wal.
//
// This is not relevant for pageserver->pageserver migrations, since there's
// no wal to import. But should be fixed if we want to import from postgres.
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
// Import basebackup provided via CopyData
info!("importing basebackup");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb, &tenant.cancel).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &tenant.cancel)));
timeline
.import_basebackup_from_tar(
tenant.clone(),
&mut copyin_reader,
base_lsn,
self.broker_client.clone(),
&ctx,
)
.await?;
// Read the end of the tar archive.
read_tar_eof(copyin_reader).await?;
// TODO check checksum
// Meanwhile you can verify client-side by taking fullbackup
// and checking that it matches in size with what was imported.
// It wouldn't work if base came from vanilla postgres though,
// since we discard some log files.
info!("done");
Ok(())
}
#[instrument(skip_all, fields(shard_id, %start_lsn, %end_lsn))]
async fn handle_import_wal<IO>(
&self,
pgb: &mut PostgresBackend<IO>,
tenant_id: TenantId,
timeline_id: TimelineId,
start_lsn: Lsn,
end_lsn: Lsn,
ctx: RequestContext,
) -> Result<(), QueryError>
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
let timeline = self
.get_active_tenant_timeline(tenant_id, timeline_id, ShardSelector::Zero)
.await?;
let last_record_lsn = timeline.get_last_record_lsn();
if last_record_lsn != start_lsn {
return Err(QueryError::Other(
anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
);
}
// TODO leave clean state on error. For now you can use detach to clean
// up broken state from a failed import.
// Import wal provided via CopyData
info!("importing wal");
pgb.write_message_noflush(&BeMessage::CopyInResponse)?;
self.flush_cancellable(pgb, &timeline.cancel).await?;
let mut copyin_reader = pin!(StreamReader::new(self.copyin_stream(pgb, &timeline.cancel)));
import_wal_from_tar(&timeline, &mut copyin_reader, start_lsn, end_lsn, &ctx).await?;
info!("wal import complete");
// Read the end of the tar archive.
read_tar_eof(copyin_reader).await?;
// TODO Does it make sense to overshoot?
if timeline.get_last_record_lsn() < end_lsn {
return Err(QueryError::Other(
anyhow::anyhow!("Cannot import WAL from Lsn {start_lsn} because timeline does not start from the same lsn: {last_record_lsn}"))
);
}
// Flush data to disk, then upload to s3. No need for a forced checkpoint.
// We only want to persist the data, and it doesn't matter if it's in the
// shape of deltas or images.
info!("flushing layers");
timeline.freeze_and_flush().await.map_err(|e| match e {
FlushLayerError::Cancelled => QueryError::Shutdown,
other => QueryError::Other(other.into()),
})?;
info!("done");
Ok(())
}
/// Helper function to handle the LSN from client request.
///
/// Each GetPage (and Exists and Nblocks) request includes information about
@@ -1221,7 +1478,6 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -1453,6 +1709,109 @@ where
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.starts_with("import basebackup ") {
// Import the `base` section (everything but the wal) of a basebackup.
// Assumes the tenant already exists on this pageserver.
//
// Files are scheduled to be persisted to remote storage, and the
// caller should poll the http api to check when that is done.
//
// Example import command:
// 1. Get start/end LSN from backup_manifest file
// 2. Run:
// cat my_backup/base.tar | psql -h $PAGESERVER \
// -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
let params = &parts[2..];
if params.len() != 5 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number for import basebackup command"
)));
}
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
let base_lsn = Lsn::from_str(params[2])
.with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
let end_lsn = Lsn::from_str(params[3])
.with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
let pg_version = u32::from_str(params[4])
.with_context(|| format!("Failed to parse pg_version from {}", params[4]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
COMPUTE_COMMANDS_COUNTERS
.for_command(ComputeCommandKind::ImportBasebackup)
.inc();
match self
.handle_import_basebackup(
pgb,
tenant_id,
timeline_id,
base_lsn,
end_lsn,
pg_version,
ctx,
)
.await
{
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => {
error!("error importing base backup between {base_lsn} and {end_lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?
}
};
} else if query_string.starts_with("import wal ") {
// Import the `pg_wal` section of a basebackup.
//
// Files are scheduled to be persisted to remote storage, and the
// caller should poll the http api to check when that is done.
let params = &parts[2..];
if params.len() != 4 {
return Err(QueryError::Other(anyhow::anyhow!(
"invalid param number for import wal command"
)));
}
let tenant_id = TenantId::from_str(params[0])
.with_context(|| format!("Failed to parse tenant id from {}", params[0]))?;
let timeline_id = TimelineId::from_str(params[1])
.with_context(|| format!("Failed to parse timeline id from {}", params[1]))?;
let start_lsn = Lsn::from_str(params[2])
.with_context(|| format!("Failed to parse Lsn from {}", params[2]))?;
let end_lsn = Lsn::from_str(params[3])
.with_context(|| format!("Failed to parse Lsn from {}", params[3]))?;
tracing::Span::current()
.record("tenant_id", field::display(tenant_id))
.record("timeline_id", field::display(timeline_id));
self.check_permission(Some(tenant_id))?;
COMPUTE_COMMANDS_COUNTERS
.for_command(ComputeCommandKind::ImportWal)
.inc();
match self
.handle_import_wal(pgb, tenant_id, timeline_id, start_lsn, end_lsn, ctx)
.await
{
Ok(()) => pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?,
Err(e) => {
error!("error importing WAL between {start_lsn} and {end_lsn}: {e:?}");
pgb.write_message_noflush(&BeMessage::ErrorResponse(
&e.to_string(),
Some(e.pg_error_code()),
))?
}
};
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"
// on connect

View File

@@ -854,14 +854,13 @@ impl Timeline {
result.add_key(DBDIR_KEY);
// Fetch list of database dirs and iterate them
let dbdir = self.list_dbdirs(lsn, ctx).await?;
let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;
dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
for ((spcnode, dbnode), has_relmap_file) in dbs {
if has_relmap_file {
result.add_key(relmap_file_key(spcnode, dbnode));
}
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
dbs.sort_unstable();
for (spcnode, dbnode) in dbs {
result.add_key(relmap_file_key(spcnode, dbnode));
result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
@@ -920,9 +919,6 @@ impl Timeline {
result.add_key(AUX_FILES_KEY);
}
// Add extra keyspaces in the test cases. Some test cases write keys into the storage without
// creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
// and the keys will not be garbage-colllected.
#[cfg(test)]
{
let guard = self.extra_test_dense_keyspace.load();
@@ -931,48 +927,13 @@ impl Timeline {
}
}
let dense_keyspace = result.to_keyspace();
let sparse_keyspace = SparseKeySpace(KeySpace {
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
});
if cfg!(debug_assertions) {
// Verify if the sparse keyspaces are ordered and non-overlapping.
// We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
// category of sparse keys are split into their own image/delta files. If there
// are overlapping keyspaces, they will be automatically merged by keyspace accum,
// and we want the developer to keep the keyspaces separated.
let ranges = &sparse_keyspace.0.ranges;
// TODO: use a single overlaps_with across the codebase
fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
!(a.end <= b.start || b.end <= a.start)
}
for i in 0..ranges.len() {
for j in 0..i {
if overlaps_with(&ranges[i], &ranges[j]) {
panic!(
"overlapping sparse keyspace: {}..{} and {}..{}",
ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
);
}
}
}
for i in 1..ranges.len() {
assert!(
ranges[i - 1].end <= ranges[i].start,
"unordered sparse keyspace: {}..{} and {}..{}",
ranges[i - 1].start,
ranges[i - 1].end,
ranges[i].start,
ranges[i].end
);
}
}
Ok((dense_keyspace, sparse_keyspace))
Ok((
result.to_keyspace(),
/* AUX sparse key space */
SparseKeySpace(KeySpace {
ranges: vec![repl_origin_key_range(), Key::metadata_aux_key_range()],
}),
))
}
/// Get cached size of relation if it not updated after specified LSN

View File

@@ -715,22 +715,16 @@ impl InMemoryLayer {
res?;
}
}
// Hold the permit until the IO is done; if we didn't, one could drop this future,
// thereby releasing the permit, but the Vec<u8> remains allocated until the IO completes.
// => we'd have more concurrenct Vec<u8> than allowed as per the semaphore.
drop(_concurrency_permit);
}
}
// MAX is used here because we identify L0 layers by full key range
let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?;
// Hold the permit until all the IO is done, including the fsync in `delta_layer_writer.finish()``.
//
// If we didn't and our caller drops this future, tokio-epoll-uring would extend the lifetime of
// the `file_contents: Vec<u8>` until the IO is done, but not the permit's lifetime.
// Thus, we'd have more concurrenct `Vec<u8>` in existence than the semaphore allows.
//
// We hold across the fsync so that on ext4 mounted with data=ordered, all the kernel page cache pages
// we dirtied when writing to the filesystem have been flushed and marked !dirty.
drop(_concurrency_permit);
Ok(Some(delta_layer))
}
}

View File

@@ -728,9 +728,6 @@ impl From<CreateImageLayersError> for CompactionError {
fn from(e: CreateImageLayersError) -> Self {
match e {
CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
CreateImageLayersError::Other(e) => {
CompactionError::Other(e.context("create image layers"))
}
_ => CompactionError::Other(e.into()),
}
}

View File

@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
@@ -208,9 +208,14 @@ pub(super) async fn handle_walreceiver_connection(
.instrument(tracing::info_span!("poller")),
);
let _guard = LIVE_CONNECTIONS
.with_label_values(&["wal_receiver"])
.guard();
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
let identify = identify_system(&replication_client).await?;
info!("{identify:?}");

8
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -734,13 +734,13 @@ typing-extensions = ">=4.1.0"
[[package]]
name = "certifi"
version = "2024.7.4"
version = "2023.7.22"
description = "Python package for providing Mozilla's CA Bundle."
optional = false
python-versions = ">=3.6"
files = [
{file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"},
{file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"},
{file = "certifi-2023.7.22-py3-none-any.whl", hash = "sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9"},
{file = "certifi-2023.7.22.tar.gz", hash = "sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082"},
]
[[package]]

View File

@@ -216,11 +216,10 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
use pq_proto::FeStartupPacket::*;
match msg {
SslRequest { direct: false } => {
SslRequest => {
stream
.write_message(&pq_proto::BeMessage::EncryptionResponse(true))
.await?;
// Upgrade raw stream into a secure TLS-backed stream.
// NOTE: We've consumed `tls`; this fact will be used later.

View File

@@ -75,9 +75,6 @@ impl TlsConfig {
}
}
/// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/include/libpq/pqcomm.h#L159>
pub const PG_ALPN_PROTOCOL: &[u8] = b"postgresql";
/// Configure TLS for the main endpoint.
pub fn configure_tls(
key_path: &str,
@@ -114,17 +111,16 @@ pub fn configure_tls(
let cert_resolver = Arc::new(cert_resolver);
// allow TLS 1.2 to be compatible with older client libraries
let mut config = rustls::ServerConfig::builder_with_protocol_versions(&[
let config = rustls::ServerConfig::builder_with_protocol_versions(&[
&rustls::version::TLS13,
&rustls::version::TLS12,
])
.with_no_client_auth()
.with_cert_resolver(cert_resolver.clone());
config.alpn_protocols = vec![PG_ALPN_PROTOCOL.to_vec()];
.with_cert_resolver(cert_resolver.clone())
.into();
Ok(TlsConfig {
config: Arc::new(config),
config,
common_names,
cert_resolver,
})

View File

@@ -6,9 +6,8 @@ use anyhow::Context;
use once_cell::sync::Lazy;
use postgres_backend::{AuthType, PostgresBackend, PostgresBackendTCP, QueryError};
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use std::convert::Infallible;
use std::{convert::Infallible, future};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, Instrument};
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
@@ -68,9 +67,7 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<Infallible> {
async fn handle_connection(socket: TcpStream) -> Result<(), QueryError> {
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None)?;
pgbackend
.run(&mut MgmtHandler, &CancellationToken::new())
.await
pgbackend.run(&mut MgmtHandler, future::pending::<()>).await
}
/// A message received by `mgmt` when a compute node is ready.
@@ -78,7 +75,6 @@ pub type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,

View File

@@ -3,8 +3,8 @@ use std::marker::PhantomData;
use measured::{
label::NoLabels,
metric::{
gauge::GaugeState, group::Encoding, name::MetricNameEncoder, MetricEncoding,
MetricFamilyEncoding, MetricType,
gauge::GaugeState, group::Encoding, group::MetricValue, name::MetricNameEncoder,
MetricEncoding, MetricFamilyEncoding, MetricType,
},
text::TextEncoder,
LabelGroup, MetricGroup,
@@ -100,7 +100,7 @@ macro_rules! jemalloc_gauge {
enc: &mut TextEncoder<W>,
) -> Result<(), std::io::Error> {
if let Ok(v) = mib.read() {
GaugeState::new(v as i64).collect_into(&(), labels, name, enc)?;
enc.write_metric_value(name, labels, MetricValue::Int(v as i64))?;
}
Ok(())
}

View File

@@ -2,7 +2,7 @@ use std::sync::{Arc, OnceLock};
use lasso::ThreadedRodeo;
use measured::{
label::{FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet},
label::{FixedCardinalitySet, LabelName, LabelSet, LabelValue, StaticLabelSet},
metric::{histogram::Thresholds, name::MetricName},
Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
LabelGroup, MetricGroup,
@@ -577,32 +577,6 @@ impl LabelGroup for ThreadPoolWorkerId {
}
}
impl LabelGroupSet for ThreadPoolWorkers {
type Group<'a> = ThreadPoolWorkerId;
fn cardinality(&self) -> Option<usize> {
Some(self.0)
}
fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
Some(value)
}
fn decode_dense(&self, value: usize) -> Self::Group<'_> {
ThreadPoolWorkerId(value)
}
type Unique = usize;
fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
Some(value.0)
}
fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
ThreadPoolWorkerId(*value)
}
}
impl LabelSet for ThreadPoolWorkers {
type Value<'a> = ThreadPoolWorkerId;

View File

@@ -1,17 +1,11 @@
use bytes::Buf;
use pq_proto::{
framed::Framed, BeMessage as Be, CancelKeyData, FeStartupPacket, ProtocolVersion,
StartupMessageParams,
};
use pq_proto::{BeMessage as Be, CancelKeyData, FeStartupPacket, StartupMessageParams};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
use tracing::info;
use crate::{
auth::endpoint_sni,
config::{TlsConfig, PG_ALPN_PROTOCOL},
config::TlsConfig,
error::ReportableError,
metrics::Metrics,
proxy::ERR_INSECURE_CONNECTION,
stream::{PqStream, Stream, StreamUpgradeError},
};
@@ -74,9 +68,6 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
// Client may try upgrading to each protocol only once
let (mut tried_ssl, mut tried_gss) = (false, false);
const PG_PROTOCOL_EARLIEST: ProtocolVersion = ProtocolVersion::new(3, 0);
const PG_PROTOCOL_LATEST: ProtocolVersion = ProtocolVersion::new(3, 0);
let mut stream = PqStream::new(Stream::from_raw(stream));
loop {
let msg = stream.read_startup_packet().await?;
@@ -84,96 +75,40 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
use FeStartupPacket::*;
match msg {
SslRequest { direct } => match stream.get_ref() {
SslRequest => match stream.get_ref() {
Stream::Raw { .. } if !tried_ssl => {
tried_ssl = true;
// We can't perform TLS handshake without a config
let have_tls = tls.is_some();
if !direct {
stream
.write_message(&Be::EncryptionResponse(have_tls))
.await?;
} else if !have_tls {
return Err(HandshakeError::ProtocolViolation);
}
let enc = tls.is_some();
stream.write_message(&Be::EncryptionResponse(enc)).await?;
if let Some(tls) = tls.take() {
// Upgrade raw stream into a secure TLS-backed stream.
// NOTE: We've consumed `tls`; this fact will be used later.
let Framed {
stream: raw,
read_buf,
write_buf,
} = stream.framed;
let Stream::Raw { raw } = raw else {
return Err(HandshakeError::StreamUpgradeError(
StreamUpgradeError::AlreadyTls,
));
};
let mut read_buf = read_buf.reader();
let mut res = Ok(());
let accept = tokio_rustls::TlsAcceptor::from(tls.to_server_config())
.accept_with(raw, |session| {
// push the early data to the tls session
while !read_buf.get_ref().is_empty() {
match session.read_tls(&mut read_buf) {
Ok(_) => {}
Err(e) => {
res = Err(e);
break;
}
}
}
});
res?;
let read_buf = read_buf.into_inner();
let (raw, read_buf) = stream.into_inner();
// TODO: Normally, client doesn't send any data before
// server says TLS handshake is ok and read_buf is empy.
// However, you could imagine pipelining of postgres
// SSLRequest + TLS ClientHello in one hunk similar to
// pipelining in our node js driver. We should probably
// support that by chaining read_buf with the stream.
if !read_buf.is_empty() {
return Err(HandshakeError::EarlyData);
}
let tls_stream = accept.await.inspect_err(|_| {
if record_handshake_error {
Metrics::get().proxy.tls_handshake_failures.inc()
}
})?;
let conn_info = tls_stream.get_ref().1;
// check the ALPN, if exists, as required.
match conn_info.alpn_protocol() {
None | Some(PG_ALPN_PROTOCOL) => {}
Some(other) => {
// try parse ep for better error
let ep = conn_info.server_name().and_then(|sni| {
endpoint_sni(sni, &tls.common_names).ok().flatten()
});
let alpn = String::from_utf8_lossy(other);
warn!(?ep, %alpn, "unexpected ALPN");
return Err(HandshakeError::ProtocolViolation);
}
}
let tls_stream = raw
.upgrade(tls.to_server_config(), record_handshake_error)
.await?;
let (_, tls_server_end_point) = tls
.cert_resolver
.resolve(conn_info.server_name())
.resolve(tls_stream.get_ref().1.server_name())
.ok_or(HandshakeError::MissingCertificate)?;
stream = PqStream {
framed: Framed {
stream: Stream::Tls {
tls: Box::new(tls_stream),
tls_server_end_point,
},
read_buf,
write_buf,
},
};
stream = PqStream::new(Stream::Tls {
tls: Box::new(tls_stream),
tls_server_end_point,
});
}
}
_ => return Err(HandshakeError::ProtocolViolation),
@@ -187,9 +122,7 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
}
_ => return Err(HandshakeError::ProtocolViolation),
},
StartupMessage { params, version }
if PG_PROTOCOL_EARLIEST <= version && version <= PG_PROTOCOL_LATEST =>
{
StartupMessage { params, .. } => {
// Check that the config has been consumed during upgrade
// OR we didn't provide it at all (for dev purposes).
if tls.is_some() {
@@ -198,48 +131,9 @@ pub async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
.await?;
}
info!(?version, session_type = "normal", "successful handshake");
info!(session_type = "normal", "successful handshake");
break Ok(HandshakeData::Startup(stream, params));
}
// downgrade protocol version
StartupMessage { params, version }
if version.major() == 3 && version > PG_PROTOCOL_LATEST =>
{
warn!(?version, "unsupported minor version");
// no protocol extensions are supported.
// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/backend/tcop/backend_startup.c#L744-L753>
let mut unsupported = vec![];
for (k, _) in params.iter() {
if k.starts_with("_pq_.") {
unsupported.push(k);
}
}
// TODO: remove unsupported options so we don't send them to compute.
stream
.write_message(&Be::NegotiateProtocolVersion {
version: PG_PROTOCOL_LATEST,
options: &unsupported,
})
.await?;
info!(
?version,
session_type = "normal",
"successful handshake; unsupported minor version requested"
);
break Ok(HandshakeData::Startup(stream, params));
}
StartupMessage { version, .. } => {
warn!(
?version,
session_type = "normal",
"unsuccessful handshake; unsupported version"
);
return Err(HandshakeError::ProtocolViolation);
}
CancelRequest(cancel_key_data) => {
info!(session_type = "cancellation", "successful handshake");
break Ok(HandshakeData::Cancel(cancel_key_data));

View File

@@ -838,9 +838,8 @@ async fn query_to_json<T: GenericClient>(
"finished reading rows"
);
let columns_len = row_stream.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut columns = Vec::with_capacity(columns_len);
let mut fields = vec![];
let mut columns = vec![];
for c in row_stream.columns() {
fields.push(json!({

View File

@@ -95,7 +95,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
for SafekeeperPostgresHandler
{

View File

@@ -145,6 +145,10 @@ pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("WAL service worker")
.enable_all()
// Default is 2 MiB at time of writing.
// With that, the `postgres_backend::Handler` overflows the stack in debug builds.
// => use 4 MiB
.thread_stack_size(4 * 1024 * 1024)
.build()
.expect("Failed to create WAL service runtime")
});

View File

@@ -4,10 +4,9 @@
//!
use anyhow::{Context, Result};
use postgres_backend::QueryError;
use std::time::Duration;
use std::{future, time::Duration};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutReader;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{auth::Scope, measured_stream::MeasuredStream};
@@ -101,7 +100,7 @@ async fn handle_socket(
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend
.run(&mut conn_handler, &CancellationToken::new())
.run(&mut conn_handler, future::pending::<()>)
.await
}

View File

@@ -226,7 +226,7 @@ impl Node {
fn is_fatal(e: &mgmt_api::Error) -> bool {
use mgmt_api::Error::*;
match e {
SendRequest(_) | ReceiveBody(_) | ReceiveErrorBody(_) => false,
ReceiveBody(_) | ReceiveErrorBody(_) => false,
ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
| ApiError(StatusCode::GATEWAY_TIMEOUT, _)
| ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,

View File

@@ -151,10 +151,6 @@ struct ServiceState {
/// controller API.
fn passthrough_api_error(node: &Node, e: mgmt_api::Error) -> ApiError {
match e {
mgmt_api::Error::SendRequest(e) => {
// Presume errors sending requests are connectivity/availability issues
ApiError::ResourceUnavailable(format!("{node} error sending request: {e}").into())
}
mgmt_api::Error::ReceiveErrorBody(str) => {
// Presume errors receiving body are connectivity/availability issues
ApiError::ResourceUnavailable(
@@ -4066,14 +4062,7 @@ impl Service {
placement_policy: Some(PlacementPolicy::Attached(0)), // No secondaries, for convenient debug/hacking
// There is no way to know what the tenant's config was: revert to defaults
//
// TODO: remove `switch_aux_file_policy` once we finish auxv2 migration
//
// we write to both v1+v2 storage, so that the test case can use either storage format for testing
config: TenantConfig {
switch_aux_file_policy: Some(models::AuxFilePolicy::CrossValidation),
..TenantConfig::default()
},
config: TenantConfig::default(),
})
.await?;

View File

@@ -1,4 +1,4 @@
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use pageserver::tenant::storage_layer::LayerName;
use serde::{Deserialize, Serialize};
@@ -29,7 +29,7 @@ impl LargeObjectKind {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize)]
pub struct LargeObject {
pub key: String,
pub size: u64,
@@ -45,76 +45,53 @@ pub async fn find_large_objects(
bucket_config: BucketConfig,
min_size: u64,
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);
let s3_client = s3_client.clone();
async move {
let mut objects = Vec::new();
let mut total_objects_ctr = 0u64;
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
total_objects_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok((tenant_shard_id, objects, total_objects_ctr))
}
});
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let mut objects = Vec::new();
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(res) = objects_stream.next().await {
let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
objects.extend_from_slice(&objects_slice);
while let Some(tenant_shard_id) = tenants.next().await {
let tenant_shard_id = tenant_shard_id?;
let mut tenant_root = target.tenant_root(&tenant_shard_id);
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
object_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
object_ctr += total_objects_ctr;
tenant_ctr += 1;
if tenant_ctr % 100 == 0 {
if tenant_ctr % 50 == 0 {
tracing::info!(
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
objects.len()
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
);
}
}
let bucket_name = target.bucket_name();
tracing::info!(
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
objects.len()
);
Ok(LargeObjectListing { objects })
}

View File

@@ -140,7 +140,7 @@ async fn find_garbage_inner(
node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -432,7 +432,7 @@ pub async fn purge_garbage(
);
let (s3_client, target) =
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;
// Sanity checks on the incoming list
if garbage_list.active_tenant_count == 0 {

View File

@@ -15,10 +15,17 @@ use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;
use anyhow::Context;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::retry::RetryConfig;
use aws_config::sso::SsoCredentialsProvider;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
@@ -235,53 +242,85 @@ impl ConsoleConfig {
}
}
pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
pub fn init_logging(file_name: &str) -> WorkerGuard {
let (file_writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
let file_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(false)
.with_writer(file_writer);
let stderr_logs = fmt::Layer::new()
.with_target(false)
.with_writer(std::io::stderr);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(file_logs)
.with(stderr_logs)
.init();
let disable_file_logging = match std::env::var("PAGESERVER_DISABLE_FILE_LOGGING") {
Ok(s) => s == "1" || s.to_lowercase() == "true",
Err(_) => false,
guard
}
pub fn init_s3_client(bucket_region: Region) -> Client {
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
let chain = CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
)
// uses "AWS_PROFILE" / `aws sso login --profile <profile>`
.or_else(
"profile-sso",
ProfileFileCredentialsProvider::builder().build(),
);
// Use SSO if we were given an account ID
match std::env::var("SSO_ACCOUNT_ID").ok() {
Some(sso_account) => chain.or_else(
"sso",
SsoCredentialsProvider::builder()
.account_id(sso_account)
.role_name("PowerUserAccess")
.start_url("https://neondb.awsapps.com/start")
.region(bucket_region.clone())
.build(),
),
None => chain,
}
.or_else(
// Finally try IMDS
"imds",
ImdsCredentialsProvider::builder().build(),
)
};
if disable_file_logging {
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(stderr_logs)
.init();
None
} else {
let (file_writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
let file_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(false)
.with_writer(file_writer);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(stderr_logs)
.with(file_logs)
.init();
Some(guard)
}
}
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
pub async fn init_s3_client(bucket_region: Region) -> Client {
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
let mut builder = Config::builder()
.behavior_version(
#[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
BehaviorVersion::v2023_11_09(),
)
.region(bucket_region)
.load()
.await;
Client::new(&config)
.retry_config(RetryConfig::adaptive().with_max_attempts(3))
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
.credentials_provider(credentials_provider);
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
builder = builder.endpoint_url(endpoint)
}
Client::from_conf(builder.build())
}
async fn init_remote(
fn init_remote(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_region).await);
let s3_client = Arc::new(init_s3_client(bucket_region));
let s3_root = match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
@@ -306,7 +345,7 @@ async fn list_objects_with_retries(
s3_target: &S3Target,
continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
for trial in 0..MAX_RETRIES {
for _ in 0..MAX_RETRIES {
match s3_client
.list_objects_v2()
.bucket(&s3_target.bucket_name)
@@ -318,22 +357,16 @@ async fn list_objects_with_retries(
{
Ok(response) => return Ok(response),
Err(e) => {
if trial == MAX_RETRIES - 1 {
return Err(e)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
}
error!(
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
s3_target.bucket_name,
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
"list_objects_v2 query failed: {e}, bucket_name={}, prefix={}, delimiter={}",
s3_target.bucket_name, s3_target.prefix_in_bucket, s3_target.delimiter
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
}
async fn download_object_with_retries(

View File

@@ -78,8 +78,6 @@ enum Command {
min_size: u64,
#[arg(short, long, default_value_t = false)]
ignore_deltas: bool,
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
concurrency: usize,
},
}
@@ -196,7 +194,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let downloader =
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
downloader.download().await
}
Command::PageserverPhysicalGc {
@@ -212,15 +210,10 @@ async fn main() -> anyhow::Result<()> {
Command::FindLargeObjects {
min_size,
ignore_deltas,
concurrency,
} => {
let summary = find_large_objects::find_large_objects(
bucket_config,
min_size,
ignore_deltas,
concurrency,
)
.await?;
let summary =
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}

View File

@@ -160,7 +160,7 @@ pub async fn pageserver_physical_gc(
min_age: Duration,
mode: GcMode,
) -> anyhow::Result<GcSummary> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))

View File

@@ -199,7 +199,7 @@ pub async fn scan_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))

View File

@@ -106,7 +106,7 @@ pub async fn scan_safekeeper_metadata(
let timelines = client.query(&query, &[]).await?;
info!("loaded {} timelines", timelines.len());
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
let console_config = ConsoleConfig::from_env()?;
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);

View File

@@ -28,13 +28,13 @@ pub struct SnapshotDownloader {
}
impl SnapshotDownloader {
pub async fn new(
pub fn new(
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
) -> anyhow::Result<Self> {
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
Ok(Self {
s3_client,
s3_root,
@@ -215,8 +215,7 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
let (s3_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;
// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;

View File

@@ -1,263 +0,0 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, cast
import requests
if TYPE_CHECKING:
from typing import Any, Dict, Literal, Optional, Union
from fixtures.pg_version import PgVersion
def connection_parameters_to_env(params: Dict[str, str]) -> Dict[str, str]:
return {
"PGHOST": params["host"],
"PGDATABASE": params["database"],
"PGUSER": params["role"],
"PGPASSWORD": params["password"],
}
class NeonAPI:
def __init__(self, neon_api_key: str, neon_api_base_url: str):
self.__neon_api_key = neon_api_key
self.__neon_api_base_url = neon_api_base_url.strip("/")
def __request(
self, method: Union[str, bytes], endpoint: str, **kwargs: Any
) -> requests.Response:
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
return requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
def create_project(
self,
pg_version: Optional[PgVersion] = None,
name: Optional[str] = None,
branch_name: Optional[str] = None,
branch_role_name: Optional[str] = None,
branch_database_name: Optional[str] = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"project": {
"branch": {},
},
}
if name:
data["project"]["name"] = name
if pg_version:
data["project"]["pg_version"] = int(pg_version)
if branch_name:
data["project"]["branch"]["name"] = branch_name
if branch_role_name:
data["project"]["branch"]["role_name"] = branch_role_name
if branch_database_name:
data["project"]["branch"]["database_name"] = branch_database_name
resp = self.__request(
"POST",
"/projects",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json=data,
)
assert resp.status_code == 201
return cast("Dict[str, Any]", resp.json())
def get_project_details(self, project_id: str) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def delete_project(
self,
project_id: str,
) -> Dict[str, Any]:
resp = self.__request(
"DELETE",
f"/projects/{project_id}",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def start_endpoint(
self,
project_id: str,
endpoint_id: str,
) -> Dict[str, Any]:
resp = self.__request(
"POST",
f"/projects/{project_id}/endpoints/{endpoint_id}/start",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def suspend_endpoint(
self,
project_id: str,
endpoint_id: str,
) -> Dict[str, Any]:
resp = self.__request(
"POST",
f"/projects/{project_id}/endpoints/{endpoint_id}/suspend",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def restart_endpoint(
self,
project_id: str,
endpoint_id: str,
) -> Dict[str, Any]:
resp = self.__request(
"POST",
f"/projects/{project_id}/endpoints/{endpoint_id}/restart",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def create_endpoint(
self,
project_id: str,
branch_id: str,
endpoint_type: Literal["read_write", "read_only"],
settings: Dict[str, Any],
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"endpoint": {
"branch_id": branch_id,
},
}
if endpoint_type:
data["endpoint"]["type"] = endpoint_type
if settings:
data["endpoint"]["settings"] = settings
resp = self.__request(
"POST",
f"/projects/{project_id}/endpoints",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json=data,
)
assert resp.status_code == 201
return cast("Dict[str, Any]", resp.json())
def get_connection_uri(
self,
project_id: str,
branch_id: Optional[str] = None,
endpoint_id: Optional[str] = None,
database_name: str = "neondb",
role_name: str = "neondb_owner",
pooled: bool = True,
) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/connection_uri",
params={
"branch_id": branch_id,
"endpoint_id": endpoint_id,
"database_name": database_name,
"role_name": role_name,
"pooled": pooled,
},
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def get_branches(self, project_id: str) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def get_endpoints(self, project_id: str) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/endpoints",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def get_operations(self, project_id: str) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/operations",
headers={
"Accept": "application/json",
"Authorization": f"Bearer {self.__neon_api_key}",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def wait_for_operation_to_finish(self, project_id: str):
has_running = True
while has_running:
has_running = False
operations = self.get_operations(project_id)["operations"]
for op in operations:
if op["status"] in {"scheduling", "running", "cancelling"}:
has_running = True
time.sleep(0.5)

View File

@@ -87,8 +87,6 @@ from fixtures.utils import (
)
from fixtures.utils import AuxFileStore as AuxFileStore # reexport
from .neon_api import NeonAPI
"""
This file contains pytest fixtures. A fixture is a test resource that can be
summoned by placing its name in the test's arguments.
@@ -186,25 +184,6 @@ def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Ite
yield versioned_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
if not api_key:
raise AssertionError("Set the NEON_API_KEY environment variable")
return api_key
@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "function"]:
"""Return either session of function scope, depending on TEST_SHARED_FIXTURES envvar.
@@ -2883,45 +2862,14 @@ class PgBin:
env.update(env_add)
return env
def _log_env(self, env: dict[str, str]) -> None:
env_s = {}
for k, v in env.items():
if k.startswith("PG") and k != "PGPASSWORD":
env_s[k] = v
log.debug(f"Environment: {env_s}")
def run_nonblocking(
self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[Union[str, Path]] = None,
) -> subprocess.Popen[Any]:
"""
Run one of the postgres binaries, not waiting for it to finish
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
env = self._build_env(env)
self._log_env(env)
return subprocess.Popen(command, env=env, cwd=cwd, stdout=subprocess.PIPE, text=True)
def run(
self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[Union[str, Path]] = None,
) -> None:
):
"""
Run one of the postgres binaries, waiting for it to finish
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
@@ -2932,10 +2880,11 @@ class PgBin:
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
proc = self.run_nonblocking(command, env, cwd)
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, proc.args)
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(
self,
@@ -2955,7 +2904,6 @@ class PgBin:
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
env = self._build_env(env)
self._log_env(env)
base_path, _, _ = subprocess_capture(
self.log_dir,
command,

View File

@@ -1,24 +1,8 @@
from __future__ import annotations
import time
import traceback
from typing import TYPE_CHECKING
import psycopg2
import psycopg2.extras
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_api import connection_parameters_to_env
from fixtures.neon_fixtures import AuxFileStore, logical_replication_sync
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.pg_version import PgVersion
from fixtures.neon_fixtures import AuxFileStore, NeonEnv, PgBin, logical_replication_sync
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.V2])
@@ -42,6 +26,7 @@ def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg
vanilla_pg.safe_psql("truncate table pgbench_history")
connstr = endpoint.connstr().replace("'", "''")
print(f"connstr='{connstr}'")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
# Wait logical replication channel to be established
@@ -57,286 +42,3 @@ def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg
sum_master = endpoint.safe_psql("select sum(abalance) from pgbench_accounts")[0][0]
sum_replica = vanilla_pg.safe_psql("select sum(abalance) from pgbench_accounts")[0][0]
assert sum_master == sum_replica
def check_pgbench_still_running(pgbench, label=""):
rc = pgbench.poll()
if rc is not None:
raise RuntimeError(f"{label} pgbench terminated early with return code {rc}")
def measure_logical_replication_lag(sub_cur, pub_cur, timeout_sec=600):
start = time.time()
pub_cur.execute("SELECT pg_current_wal_flush_lsn()")
pub_lsn = Lsn(pub_cur.fetchall()[0][0])
while (time.time() - start) < timeout_sec:
sub_cur.execute("SELECT latest_end_lsn FROM pg_catalog.pg_stat_subscription")
res = sub_cur.fetchall()[0][0]
if res:
log.info(f"subscriber_lsn={res}")
sub_lsn = Lsn(res)
log.info(f"Subscriber LSN={sub_lsn}, publisher LSN={pub_lsn}")
if sub_lsn >= pub_lsn:
return time.time() - start
time.sleep(0.5)
raise TimeoutError(f"Logical replication sync took more than {timeout_sec} sec")
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_subscriber_lag(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
"""
Creates a publisher and subscriber, runs pgbench inserts on publisher and pgbench selects
on subscriber. Periodically restarts subscriber while still running the inserts, and
measures how long sync takes after restart.
"""
test_duration_min = 60
sync_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
pub_project = neon_api.create_project(pg_version)
pub_project_id = pub_project["project"]["id"]
neon_api.wait_for_operation_to_finish(pub_project_id)
error_occurred = False
try:
sub_project = neon_api.create_project(pg_version)
sub_project_id = sub_project["project"]["id"]
sub_endpoint_id = sub_project["endpoints"][0]["id"]
neon_api.wait_for_operation_to_finish(sub_project_id)
try:
pub_env = connection_parameters_to_env(
pub_project["connection_uris"][0]["connection_parameters"]
)
sub_env = connection_parameters_to_env(
sub_project["connection_uris"][0]["connection_parameters"]
)
pub_connstr = pub_project["connection_uris"][0]["connection_uri"]
sub_connstr = sub_project["connection_uris"][0]["connection_uri"]
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
pub_cur.execute(
"create publication pub1 for table pgbench_accounts, pgbench_history"
)
sub_cur.execute(
f"create subscription sub1 connection '{pub_connstr}' publication pub1"
)
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
zenbenchmark.record(
"initial_sync_lag", initial_sync_lag, "s", MetricReport.LOWER_IS_BETTER
)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env
)
try:
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
try:
start = time.time()
while time.time() - start < test_duration_min * 60:
time.sleep(sync_interval_min * 60)
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
with psycopg2.connect(pub_connstr) as pub_conn, psycopg2.connect(
sub_connstr
) as sub_conn:
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
sub_workload.terminate()
neon_api.restart_endpoint(
sub_project_id,
sub_endpoint_id,
)
neon_api.wait_for_operation_to_finish(sub_project_id)
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
# Measure storage to make sure replication information isn't bloating storage
sub_storage = neon_api.get_project_details(sub_project_id)["project"][
"synthetic_storage_size"
]
pub_storage = neon_api.get_project_details(pub_project_id)["project"][
"synthetic_storage_size"
]
zenbenchmark.record(
"sub_storage", sub_storage, "B", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"pub_storage", pub_storage, "B", MetricReport.LOWER_IS_BETTER
)
finally:
sub_workload.terminate()
finally:
pub_workload.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
if not error_occurred:
neon_api.delete_project(sub_project_id)
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
assert not error_occurred
neon_api.delete_project(pub_project_id)
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_publisher_restart(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
"""
Creates a publisher and subscriber, runs pgbench inserts on publisher and pgbench selects
on subscriber. Periodically restarts publisher (to exercise on-demand WAL download), and
measures how long sync takes after restart.
"""
test_duration_min = 60
sync_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
pub_project = neon_api.create_project(pg_version)
pub_project_id = pub_project["project"]["id"]
pub_endpoint_id = pub_project["endpoints"][0]["id"]
neon_api.wait_for_operation_to_finish(pub_project_id)
error_occurred = False
try:
sub_project = neon_api.create_project(pg_version)
sub_project_id = sub_project["project"]["id"]
neon_api.wait_for_operation_to_finish(sub_project_id)
try:
pub_env = connection_parameters_to_env(
pub_project["connection_uris"][0]["connection_parameters"]
)
sub_env = connection_parameters_to_env(
sub_project["connection_uris"][0]["connection_parameters"]
)
pub_connstr = pub_project["connection_uris"][0]["connection_uri"]
sub_connstr = sub_project["connection_uris"][0]["connection_uri"]
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
pub_cur.execute(
"create publication pub1 for table pgbench_accounts, pgbench_history"
)
sub_cur.execute(
f"create subscription sub1 connection '{pub_connstr}' publication pub1"
)
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
zenbenchmark.record(
"initial_sync_lag", initial_sync_lag, "s", MetricReport.LOWER_IS_BETTER
)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env
)
try:
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
try:
start = time.time()
while time.time() - start < test_duration_min * 60:
time.sleep(sync_interval_min * 60)
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
pub_workload.terminate()
neon_api.restart_endpoint(
pub_project_id,
pub_endpoint_id,
)
neon_api.wait_for_operation_to_finish(pub_project_id)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"],
env=pub_env,
)
with psycopg2.connect(pub_connstr) as pub_conn, psycopg2.connect(
sub_connstr
) as sub_conn:
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
# Measure storage to make sure replication information isn't bloating storage
sub_storage = neon_api.get_project_details(sub_project_id)["project"][
"synthetic_storage_size"
]
pub_storage = neon_api.get_project_details(pub_project_id)["project"][
"synthetic_storage_size"
]
zenbenchmark.record(
"sub_storage", sub_storage, "B", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"pub_storage", pub_storage, "B", MetricReport.LOWER_IS_BETTER
)
finally:
sub_workload.terminate()
finally:
pub_workload.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
if not error_occurred:
neon_api.delete_project(sub_project_id)
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
assert not error_occurred
neon_api.delete_project(pub_project_id)

View File

@@ -1,296 +0,0 @@
from __future__ import annotations
import csv
import os
import subprocess
import time
import traceback
from pathlib import Path
from typing import TYPE_CHECKING
import psycopg2
import psycopg2.extras
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_api import connection_parameters_to_env
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from typing import Any, List, Optional
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import PgBin
# Granularity of ~0.5 sec
def measure_replication_lag(master, replica, timeout_sec=600):
start = time.time()
master.execute("SELECT pg_current_wal_flush_lsn()")
master_lsn = Lsn(master.fetchall()[0][0])
while (time.time() - start) < timeout_sec:
replica.execute("select pg_last_wal_replay_lsn()")
replica_lsn = replica.fetchall()[0][0]
if replica_lsn:
if Lsn(replica_lsn) >= master_lsn:
return time.time() - start
time.sleep(0.5)
raise TimeoutError(f"Replication sync took more than {timeout_sec} sec")
def check_pgbench_still_running(pgbench):
rc = pgbench.poll()
if rc is not None:
raise RuntimeError(f"Pgbench terminated early with return code {rc}")
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_ro_replica_lag(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
test_duration_min = 60
sync_interval_min = 10
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
error_occurred = False
try:
branch_id = project["branch"]["id"]
master_connstr = project["connection_uris"][0]["connection_uri"]
master_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
replica = neon_api.create_endpoint(
project_id,
branch_id,
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
replica_env = master_env.copy()
replica_env["PGHOST"] = replica["endpoint"]["host"]
neon_api.wait_for_operation_to_finish(project_id)
replica_connstr = neon_api.get_connection_uri(
project_id,
endpoint_id=replica["endpoint"]["id"],
)["uri"]
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=master_env)
master_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"],
env=master_env,
)
try:
replica_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=replica_env,
)
try:
start = time.time()
while time.time() - start < test_duration_min * 60:
check_pgbench_still_running(master_workload)
check_pgbench_still_running(replica_workload)
time.sleep(sync_interval_min * 60)
with psycopg2.connect(master_connstr) as conn_master, psycopg2.connect(
replica_connstr
) as conn_replica:
with conn_master.cursor() as cur_master, conn_replica.cursor() as cur_replica:
lag = measure_replication_lag(cur_master, cur_replica)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
finally:
replica_workload.terminate()
finally:
master_workload.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception: {e}")
log.error(traceback.format_exc())
finally:
assert not error_occurred # Fail the test if an error occurred
neon_api.delete_project(project_id)
def report_pgbench_aggregate_intervals(
output_dir: Path,
prefix: str,
zenbenchmark: NeonBenchmarker,
):
for filename in os.listdir(output_dir):
if filename.startswith(prefix):
# The file will be in the form <prefix>_<node>.<pid>
# So we first lop off the .<pid>, and then lop off the prefix and the _
node = filename.split(".")[0][len(prefix) + 1 :]
with open(output_dir / filename) as f:
reader = csv.reader(f, delimiter=" ")
for line in reader:
num_transactions = int(line[1])
if num_transactions == 0:
continue
sum_latency = int(line[2])
sum_lag = int(line[3])
zenbenchmark.record(
f"{node}_num_txns", num_transactions, "txns", MetricReport.HIGHER_IS_BETTER
)
zenbenchmark.record(
f"{node}_avg_latency",
sum_latency / num_transactions,
"s",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
f"{node}_avg_lag",
sum_lag / num_transactions,
"s",
MetricReport.LOWER_IS_BETTER,
)
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_replication_start_stop(
pg_bin: PgBin,
test_output_dir: Path,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
"""
Cycles through different configurations of read replicas being enabled disabled. The whole time,
there's a pgbench read/write workload going on the master. For each replica, we either turn it
on or off, and see how long it takes to catch up after some set amount of time of replicating
the pgbench.
"""
prefix = "pgbench_agg"
num_replicas = 2
configuration_test_time_sec = 10 * 60
pgbench_duration = f"-T{2 ** num_replicas * configuration_test_time_sec}"
error_occurred = False
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
try:
branch_id = project["branch"]["id"]
master_connstr = project["connection_uris"][0]["connection_uri"]
master_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
replicas = []
for _ in range(num_replicas):
replicas.append(
neon_api.create_endpoint(
project_id,
branch_id,
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
)
neon_api.wait_for_operation_to_finish(project_id)
replica_connstr = [
neon_api.get_connection_uri(
project_id,
endpoint_id=replicas[i]["endpoint"]["id"],
)["uri"]
for i in range(num_replicas)
]
replica_env = [master_env.copy() for _ in range(num_replicas)]
for i in range(num_replicas):
replica_env[i]["PGHOST"] = replicas[i]["endpoint"]["host"]
pg_bin.run_capture(["pgbench", "-i", "-s10"], env=master_env)
# Sync replicas
with psycopg2.connect(master_connstr) as conn_master:
with conn_master.cursor() as cur_master:
for i in range(num_replicas):
conn_replica = psycopg2.connect(replica_connstr[i])
measure_replication_lag(cur_master, conn_replica.cursor())
master_pgbench = pg_bin.run_nonblocking(
[
"pgbench",
"-c10",
pgbench_duration,
"-Mprepared",
"--log",
f"--log-prefix={test_output_dir}/{prefix}_master",
f"--aggregate-interval={configuration_test_time_sec}",
],
env=master_env,
)
replica_pgbench: List[Optional[subprocess.Popen[Any]]] = [None for _ in range(num_replicas)]
# Use the bits of iconfig to tell us which configuration we are on. For example
# a iconfig of 2 is 10 in binary, indicating replica 0 is suspended and replica 1 is
# alive.
for iconfig in range((1 << num_replicas) - 1, -1, -1):
def replica_enabled(iconfig: int = iconfig):
return bool((iconfig >> 1) & 1)
# Change configuration
for ireplica in range(num_replicas):
if replica_enabled() and replica_pgbench[ireplica] is None:
replica_pgbench[ireplica] = pg_bin.run_nonblocking(
[
"pgbench",
"-c10",
"-S",
pgbench_duration,
"--log",
f"--log-prefix={test_output_dir}/{prefix}_replica_{ireplica}",
f"--aggregate-interval={configuration_test_time_sec}",
],
env=replica_env[ireplica],
)
elif not replica_enabled() and replica_pgbench[ireplica] is not None:
pgb = replica_pgbench[ireplica]
assert pgb is not None
pgb.terminate()
pgb.wait()
replica_pgbench[ireplica] = None
neon_api.suspend_endpoint(
project_id,
replicas[ireplica]["endpoint"]["id"],
)
neon_api.wait_for_operation_to_finish(project_id)
time.sleep(configuration_test_time_sec)
with psycopg2.connect(master_connstr) as conn_master:
with conn_master.cursor() as cur_master:
for ireplica in range(num_replicas):
replica_conn = psycopg2.connect(replica_connstr[ireplica])
lag = measure_replication_lag(cur_master, replica_conn.cursor())
zenbenchmark.record(
f"Replica {ireplica} lag", lag, "s", MetricReport.LOWER_IS_BETTER
)
log.info(
f"Replica {ireplica} lagging behind master by {lag} seconds after configuration {iconfig:>b}"
)
master_pgbench.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
assert not error_occurred
neon_api.delete_project(project_id)
# Only report results if we didn't error out
report_pgbench_aggregate_intervals(test_output_dir, prefix, zenbenchmark)

View File

@@ -88,8 +88,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
env.pageserver.allowed_errors.extend(
[
".*Failed to import basebackup.*",
".*unexpected non-zero bytes after the tar archive.*",
".*error importing base backup .*",
".*Timeline got dropped without initializing, cleaning its files.*",
".*InternalServerError.*timeline not found.*",
".*InternalServerError.*Tenant .* not found.*",

View File

@@ -8,11 +8,8 @@ from typing import TYPE_CHECKING, cast
import pytest
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
check_restored_datadir_content,
tenant_get_shards,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import s3_storage
@@ -24,97 +21,6 @@ if TYPE_CHECKING:
from pytest import CaptureFixture
TENANT_CONF = {
# Scaled down thresholds so that we are exercising the pageserver beyond just writing
# ephemeral/L0 layers, and because debug-mode code is slow to read from full sized ephemeral layer files.
"pitr_interval": "60s",
"checkpoint_distance": f"{8 * 1024 * 1024}",
"compaction_target_size": f"{8 * 1024 * 1024}",
}
# # Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
# # There should have been compactions mid-test as well, this final check is in addition those.
# for (shard, pageserver) in tenant_get_shards(env, env.initial_tenant):
# pageserver.http_client().timeline_checkpoint(env.initial_tenant, env.initial_timeline, force_repartition=True, force_image_layer_creation=True)
def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: Endpoint):
"""
After running some opaque tests that create interesting content in a timeline, run
some generic integrity checks that the storage stack is able to reproduce the written
data properly.
"""
ignored_files: Optional[list[str]] = None
# Neon handles unlogged relations in a special manner. During a
# basebackup, we ship the init fork as the main fork. This presents a
# problem in that the endpoint's data directory and the basebackup will
# have differences and will fail the eventual file comparison.
#
# Unlogged tables were introduced in version 9.1. ALTER TABLE grew
# support for setting the persistence of a table in 9.5. The reason that
# this doesn't affect versions < 15 (but probably would between 9.1 and
# 9.5) is that all the regression tests that deal with unlogged tables
# up until that point dropped the unlogged tables or set them to logged
# at some point during the test.
#
# In version 15, Postgres grew support for unlogged sequences, and with
# that came a few more regression tests. These tests did not all drop
# the unlogged tables/sequences prior to finishing.
#
# But unlogged sequences came with a bug in that, sequences didn't
# inherit the persistence of their "parent" tables if they had one. This
# was fixed and backported to 15, thus exacerbating our problem a bit.
#
# So what we can do is just ignore file differences between the data
# directory and basebackup for unlogged relations.
results = cast(
"list[tuple[str, str]]",
endpoint.safe_psql(
"""
SELECT
relkind,
pg_relation_filepath(
pg_filenode_relation(reltablespace, relfilenode)
) AS unlogged_relation_paths
FROM pg_class
WHERE relpersistence = 'u'
""",
dbname=db_name,
),
)
unlogged_relation_files: list[str] = []
for r in results:
unlogged_relation_files.append(r[1])
# This is related to the following Postgres commit:
#
# commit ccadf73163ca88bdaa74b8223d4dde05d17f550b
# Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
# Date: 2023-08-23 09:21:31 -0500
#
# Use the buffer cache when initializing an unlogged index.
#
# This patch was backpatched to 16. Without it, the LSN in the
# page header would be 0/0 in the data directory, which wouldn't
# match the LSN generated during the basebackup, thus creating
# a difference.
if env.pg_version <= PgVersion.V15 and r[0] == "i":
unlogged_relation_files.append(f"{r[1]}_init")
ignored_files = unlogged_relation_files
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
# Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
# There should have been compactions mid-test as well, this final check is in addition those.
for shard, pageserver in tenant_get_shards(env, env.initial_tenant):
pageserver.http_client().timeline_checkpoint(
shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True
)
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.timeout(600)
@@ -139,10 +45,7 @@ def test_pg_regress(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF,
initial_tenant_shard_count=shard_count,
)
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("main")
@@ -181,7 +84,67 @@ def test_pg_regress(
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
post_checks(env, test_output_dir, DBNAME, endpoint)
ignored_files: Optional[list[str]] = None
# Neon handles unlogged relations in a special manner. During a
# basebackup, we ship the init fork as the main fork. This presents a
# problem in that the endpoint's data directory and the basebackup will
# have differences and will fail the eventual file comparison.
#
# Unlogged tables were introduced in version 9.1. ALTER TABLE grew
# support for setting the persistence of a table in 9.5. The reason that
# this doesn't affect versions < 15 (but probably would between 9.1 and
# 9.5) is that all the regression tests that deal with unlogged tables
# up until that point dropped the unlogged tables or set them to logged
# at some point during the test.
#
# In version 15, Postgres grew support for unlogged sequences, and with
# that came a few more regression tests. These tests did not all drop
# the unlogged tables/sequences prior to finishing.
#
# But unlogged sequences came with a bug in that, sequences didn't
# inherit the persistence of their "parent" tables if they had one. This
# was fixed and backported to 15, thus exacerbating our problem a bit.
#
# So what we can do is just ignore file differences between the data
# directory and basebackup for unlogged relations.
results = cast(
"list[tuple[str, str]]",
endpoint.safe_psql(
"""
SELECT
relkind,
pg_relation_filepath(
pg_filenode_relation(reltablespace, relfilenode)
) AS unlogged_relation_paths
FROM pg_class
WHERE relpersistence = 'u'
""",
dbname=DBNAME,
),
)
unlogged_relation_files: list[str] = []
for r in results:
unlogged_relation_files.append(r[1])
# This is related to the following Postgres commit:
#
# commit ccadf73163ca88bdaa74b8223d4dde05d17f550b
# Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
# Date: 2023-08-23 09:21:31 -0500
#
# Use the buffer cache when initializing an unlogged index.
#
# This patch was backpatched to 16. Without it, the LSN in the
# page header would be 0/0 in the data directory, which wouldn't
# match the LSN generated during the basebackup, thus creating
# a difference.
if env.pg_version <= PgVersion.V15 and r[0] == "i":
unlogged_relation_files.append(f"{r[1]}_init")
ignored_files = unlogged_relation_files
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
# Run the PostgreSQL "isolation" tests, in src/test/isolation.
@@ -196,20 +159,16 @@ def test_isolation(
pg_distrib_dir: Path,
shard_count: Optional[int],
):
DBNAME = "isolation_regression"
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF, initial_tenant_shard_count=shard_count
)
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=100"])
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
endpoint.safe_psql("CREATE DATABASE isolation_regression")
# Create some local directories for pg_isolation_regress to run in.
runpath = test_output_dir / "regress"
@@ -243,9 +202,6 @@ def test_isolation(
with capsys.disabled():
pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath)
# This fails with a mismatch on `pg_multixact/offsets/0000`
# post_checks(env, test_output_dir, DBNAME, endpoint)
# Run extra Neon-specific pg_regress-based tests. The tests and their
# schedule file are in the sql_regress/ directory.
@@ -259,19 +215,15 @@ def test_sql_regress(
pg_distrib_dir: Path,
shard_count: Optional[int],
):
DBNAME = "regression"
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF, initial_tenant_shard_count=shard_count
)
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
endpoint.safe_psql("CREATE DATABASE regression")
# Create some local directories for pg_regress to run in.
runpath = test_output_dir / "regress"
@@ -306,4 +258,4 @@ def test_sql_regress(
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
post_checks(env, test_output_dir, DBNAME, endpoint)
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -1,11 +1,7 @@
from __future__ import annotations
import random
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import NeonEnv
def test_physical_replication(neon_simple_env: NeonEnv):

View File

@@ -54,4 +54,4 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
pcur.execute(f"INSERT into t values ({n_records}, 0)")
n_records += 1
with sub.cursor() as scur:
wait_until(60, 0.5, check_that_changes_propagated)
wait_until(10, 0.5, check_that_changes_propagated)

View File

@@ -720,30 +720,9 @@ def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path,
They should have the same effect.
"""
def assert_size_approx_equal_for_lease_test(size_lease, size_branch):
"""
Tests that evaluate sizes are checking the pageserver space consumption
that sits many layers below the user input. The exact space needed
varies slightly depending on postgres behavior.
Rather than expecting postgres to be determinstic and occasionally
failing the test, we permit sizes for the same data to vary by a few pages.
"""
# FIXME(yuchen): The delta is too large, used as temp solution to pass the test reliably.
# Investigate and reduce the threshold.
threshold = 22 * 8272
log.info(
f"delta: size_branch({size_branch}) - size_lease({size_lease}) = {size_branch - size_lease}"
)
assert size_lease == pytest.approx(size_branch, abs=threshold)
conf = {
"pitr_interval": "0s" if zero_gc else "3600s",
"gc_period": "0s",
"compaction_period": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=conf)
@@ -755,7 +734,7 @@ def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path,
tenant, timeline = env.neon_cli.create_tenant(conf=conf)
lease_res = insert_with_action(env, tenant, timeline, test_output_dir, action="lease")
assert_size_approx_equal_for_lease_test(lease_res, ro_branch_res)
assert_size_approx_equal(lease_res, ro_branch_res)
def insert_with_action(
@@ -775,11 +754,7 @@ def insert_with_action(
"""
client = env.pageserver.http_client()
with env.endpoints.create_start(
"main",
tenant_id=tenant,
config_lines=["autovacuum=off"],
) as ep:
with env.endpoints.create_start("main", tenant_id=tenant) as ep:
initial_size = client.tenant_size(tenant)
log.info(f"initial size: {initial_size}")

View File

@@ -152,12 +152,10 @@ def test_timeline_size_quota_on_startup(neon_env_builder: NeonEnvBuilder):
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
size_limit_mb = 30
endpoint_main = env.endpoints.create(
"test_timeline_size_quota_on_startup",
# Set small limit for the test
config_lines=[f"neon.max_cluster_size={size_limit_mb}MB"],
config_lines=["neon.max_cluster_size=30MB"],
)
endpoint_main.start()
@@ -167,39 +165,17 @@ def test_timeline_size_quota_on_startup(neon_env_builder: NeonEnvBuilder):
# Insert many rows. This query must fail because of space limit
try:
def write_rows(count):
for _i in range(count):
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100) g
for _i in range(5000):
cur.execute(
"""
)
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100) g
"""
)
# Write some data that exceeds limit, then let the pageserver ingest it to guarantee that some feedback has made it to
# the safekeeper, then try to write some more. We expect either the initial writes or the ones after
# the wait_for_last_flush_lsn to generate an exception.
#
# Without the wait_for_last_flush_lsn, the size limit sometimes isn't enforced (see https://github.com/neondatabase/neon/issues/6562)
write_rows(2500)
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
logical_size = env.pageserver.http_client().timeline_detail(
env.initial_tenant, new_timeline_id
)["current_logical_size"]
assert logical_size > size_limit_mb * 1024 * 1024
write_rows(2500)
# If we get here, the timeline size limit failed. Find out from the pageserver how large it
# thinks the timeline is.
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
logical_size = env.pageserver.http_client().timeline_detail(
env.initial_tenant, new_timeline_id
)["current_logical_size"]
log.error(
f"Query unexpectedly succeeded, pageserver logical size is {logical_size}"
)
# If we get here, the timeline size limit failed
log.error("Query unexpectedly succeeded")
raise AssertionError()
except psycopg2.errors.DiskFull as err:

View File

@@ -30,12 +30,13 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["sink"] }
futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.14", features = ["raw"] }
@@ -68,7 +69,6 @@ sha2 = { version = "0.10", features = ["asm"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
subtle = { version = "2" }
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
tikv-jemalloc-sys = { version = "0.5" }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
@@ -106,9 +106,7 @@ num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] }
proc-macro2 = { version = "1" }
prost = { version = "0.11" }
quote = { version = "1" }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }