Compare commits

..

2 Commits

Author SHA1 Message Date
Christian Schwarz
2c0f0fb962 increase safekeeper stack size 2024-07-09 13:36:20 +00:00
Christian Schwarz
15aea8c8c2 postgres_backend: stop using async_trait 2024-07-05 21:40:15 +02:00
52 changed files with 921 additions and 2189 deletions

View File

@@ -115,7 +115,6 @@ runs:
export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install}
export DEFAULT_PG_VERSION=${PG_VERSION#v}
export LD_LIBRARY_PATH=${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/lib
export BENCHMARK_CONNSTR=${BENCHMARK_CONNSTR:-}
if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1

View File

@@ -99,14 +99,7 @@ jobs:
# Set --sparse-ordering option of pytest-order plugin
# to ensure tests are running in order of appears in the file.
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
extra_params:
-m remote_cluster
--sparse-ordering
--timeout 5400
--ignore test_runner/performance/test_perf_olap.py
--ignore test_runner/performance/test_perf_pgvector_queries.py
--ignore test_runner/performance/test_logical_replication.py
--ignore test_runner/performance/test_physical_replication.py
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -132,69 +125,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
replication-tests:
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
- uses: actions/checkout@v4
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Run benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_logical_replication.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Run benchmark
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_physical_replication.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 5400
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic replication testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)

View File

@@ -1336,7 +1336,6 @@ jobs:
env:
BUCKET: neon-github-public-dev
PREFIX: artifacts/latest
COMMIT_SHA: ${{ github.event.pull_request.head.sha || github.sha }}
run: |
# Update compatibility snapshot for the release
for pg_version in v14 v15 v16; do
@@ -1350,7 +1349,7 @@ jobs:
# Update Neon artifact for the release (reuse already uploaded artifact)
for build_type in debug release; do
OLD_PREFIX=artifacts/${COMMIT_SHA}/${GITHUB_RUN_ID}
OLD_PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=neon-${{ runner.os }}-${{ runner.arch }}-${build_type}-artifact.tar.zst
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${OLD_PREFIX} | jq -r '.Contents[]?.Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)

71
Cargo.lock generated
View File

@@ -1397,9 +1397,9 @@ dependencies = [
[[package]]
name = "crc32c"
version = "0.6.8"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2"
dependencies = [
"rustc_version",
]
@@ -1651,16 +1651,6 @@ dependencies = [
"rusticata-macros",
]
[[package]]
name = "deranged"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4"
dependencies = [
"powerfmt",
"serde",
]
[[package]]
name = "desim"
version = "0.1.0"
@@ -3018,9 +3008,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "measured"
version = "0.0.22"
version = "0.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3051f3a030d55d680cdef6ca50e80abd1182f8da29f2344a7c9cb575721138f0"
checksum = "652bc741286361c06de8cb4d89b21a6437f120c508c51713663589eeb9928ac5"
dependencies = [
"bytes",
"crossbeam-utils",
@@ -3036,9 +3026,9 @@ dependencies = [
[[package]]
name = "measured-derive"
version = "0.0.22"
version = "0.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e6777fc80a575f9503d908c8b498782a6c3ee88a06cb416dc3941401e43b94"
checksum = "6ea497f33e1e856a376c32ad916f69a0bd3c597db1f912a399f842b01a4a685d"
dependencies = [
"heck 0.5.0",
"proc-macro2",
@@ -3048,9 +3038,9 @@ dependencies = [
[[package]]
name = "measured-process"
version = "0.0.22"
version = "0.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c4b80445aeb08e832d87bf1830049a924cdc1d6b7ef40b6b9b365bff17bf8ec"
checksum = "b364ccb66937a814b6b2ad751d1a2f7a9d5a78c761144036825fb36bb0771000"
dependencies = [
"libc",
"measured",
@@ -3285,12 +3275,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "num-conv"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-integer"
version = "0.1.45"
@@ -3683,7 +3667,6 @@ dependencies = [
"sysinfo",
"tenant_size_model",
"thiserror",
"tikv-jemallocator",
"tokio",
"tokio-epoll-uring",
"tokio-io-timeout",
@@ -4081,7 +4064,6 @@ name = "postgres_backend"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"futures",
"once_cell",
@@ -4094,7 +4076,6 @@ dependencies = [
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls 0.25.0",
"tokio-util",
"tracing",
"workspace_hack",
]
@@ -4135,12 +4116,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@@ -5420,9 +5395,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
[[package]]
name = "serde"
version = "1.0.203"
version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
dependencies = [
"serde_derive",
]
@@ -5439,9 +5414,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.203"
version = "1.0.183"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
dependencies = [
"proc-macro2",
"quote",
@@ -6131,15 +6106,12 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.36"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dfd88e563464686c916c7e46e623e520ddc6d79fa6641390f2e3fa86e83e885"
checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc"
dependencies = [
"deranged",
"itoa",
"js-sys",
"num-conv",
"powerfmt",
"serde",
"time-core",
"time-macros",
@@ -6147,17 +6119,16 @@ dependencies = [
[[package]]
name = "time-core"
version = "0.1.2"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3"
checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb"
[[package]]
name = "time-macros"
version = "0.2.18"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f252a68540fde3a3877aeea552b832b40ab9a69e318efd078774a01ddee1ccf"
checksum = "372950940a5f07bf38dbe211d7283c9e6d7327df53794992d293e534c733d09b"
dependencies = [
"num-conv",
"time-core",
]
@@ -7455,12 +7426,13 @@ dependencies = [
"clap",
"clap_builder",
"crossbeam-utils",
"deranged",
"either",
"fail",
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-util",
"getrandom 0.2.11",
"hashbrown 0.14.5",
@@ -7478,9 +7450,7 @@ dependencies = [
"num-traits",
"once_cell",
"parquet",
"proc-macro2",
"prost",
"quote",
"rand 0.8.5",
"regex",
"regex-automata 0.4.3",
@@ -7497,7 +7467,6 @@ dependencies = [
"syn 1.0.109",
"syn 2.0.52",
"sync_wrapper",
"tikv-jemalloc-sys",
"time",
"time-macros",
"tokio",

View File

@@ -111,8 +111,8 @@ lasso = "0.7"
leaky-bucket = "1.0.1"
libc = "0.2"
md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
memoffset = "0.8"
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
notify = "6.0.0"

View File

@@ -798,11 +798,7 @@ impl ComputeNode {
// In this case we need to connect with old `zenith_admin` name
// and create new user. We cannot simply rename connected user,
// but we can create a new one and grant it all privileges.
let mut connstr = self.connstr.clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "apply_config");
let connstr = self.connstr.clone();
let mut client = match Client::connect(connstr.as_str(), NoTls) {
Err(e) => match e.code() {
Some(&SqlState::INVALID_PASSWORD)
@@ -871,11 +867,6 @@ impl ComputeNode {
// Run migrations separately to not hold up cold starts
thread::spawn(move || {
let mut connstr = connstr.clone();
connstr
.query_pairs_mut()
.append_pair("application_name", "migrations");
let mut client = Client::connect(connstr.as_str(), NoTls)?;
handle_migrations(&mut client).context("apply_config handle_migrations")
});
@@ -1395,9 +1386,7 @@ pub fn forward_termination_signal() {
let pg_pid = PG_PID.load(Ordering::SeqCst);
if pg_pid != 0 {
let pg_pid = nix::unistd::Pid::from_raw(pg_pid as i32);
// Use 'fast' shutdown (SIGINT) because it also creates a shutdown checkpoint, which is important for
// ROs to get a list of running xacts faster instead of going through the CLOG.
// See https://www.postgresql.org/docs/current/server-shutdown.html for the list of modes and signals.
kill(pg_pid, Signal::SIGINT).ok();
// use 'immediate' shutdown (SIGQUIT): https://www.postgresql.org/docs/current/server-shutdown.html
kill(pg_pid, Signal::SIGQUIT).ok();
}
}

View File

@@ -11,7 +11,6 @@ pub mod logger;
pub mod catalog;
pub mod compute;
pub mod extension_server;
mod migration;
pub mod monitor;
pub mod params;
pub mod pg_helpers;

View File

@@ -1,100 +0,0 @@
use anyhow::{Context, Result};
use postgres::Client;
use tracing::info;
pub(crate) struct MigrationRunner<'m> {
client: &'m mut Client,
migrations: &'m [&'m str],
}
impl<'m> MigrationRunner<'m> {
pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
Self { client, migrations }
}
fn get_migration_id(&mut self) -> Result<i64> {
let query = "SELECT id FROM neon_migration.migration_id";
let row = self
.client
.query_one(query, &[])
.context("run_migrations get migration_id")?;
Ok(row.get::<&str, i64>("id"))
}
fn update_migration_id(&mut self) -> Result<()> {
let setval = format!(
"UPDATE neon_migration.migration_id SET id={}",
self.migrations.len()
);
self.client
.simple_query(&setval)
.context("run_migrations update id")?;
Ok(())
}
fn prepare_migrations(&mut self) -> Result<()> {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
self.client.simple_query(query)?;
let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
self.client.simple_query(query)?;
let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
self.client.simple_query(query)?;
let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
self.client.simple_query(query)?;
let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
self.client.simple_query(query)?;
Ok(())
}
pub fn run_migrations(mut self) -> Result<()> {
self.prepare_migrations()?;
let mut current_migration: usize = self.get_migration_id()? as usize;
let starting_migration_id = current_migration;
let query = "BEGIN";
self.client
.simple_query(query)
.context("run_migrations begin")?;
while current_migration < self.migrations.len() {
let migration = self.migrations[current_migration];
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", current_migration);
} else {
info!(
"Running migration id={}:\n{}\n",
current_migration, migration
);
self.client.simple_query(migration).with_context(|| {
format!("run_migration current_migration={}", current_migration)
})?;
}
current_migration += 1;
}
self.update_migration_id()?;
let query = "COMMIT";
self.client
.simple_query(query)
.context("run_migrations commit")?;
info!(
"Ran {} migrations",
(self.migrations.len() - starting_migration_id)
);
Ok(())
}
}

View File

@@ -10,7 +10,6 @@ use tracing::{error, info, info_span, instrument, span_enabled, warn, Level};
use crate::config;
use crate::logger::inlinify;
use crate::migration::MigrationRunner;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
@@ -792,7 +791,69 @@ pub fn handle_migrations(client: &mut Client) -> Result<()> {
include_str!("./migrations/0008-revoke_replication_for_previously_allowed_roles.sql"),
];
MigrationRunner::new(client, &migrations).run_migrations()?;
let mut func = || {
let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
client.simple_query(query)?;
let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
client.simple_query(query)?;
let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
client.simple_query(query)?;
let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
client.simple_query(query)?;
let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
client.simple_query(query)?;
Ok::<_, anyhow::Error>(())
};
func().context("handle_migrations prepare")?;
let query = "SELECT id FROM neon_migration.migration_id";
let row = client
.query_one(query, &[])
.context("handle_migrations get migration_id")?;
let mut current_migration: usize = row.get::<&str, i64>("id") as usize;
let starting_migration_id = current_migration;
let query = "BEGIN";
client
.simple_query(query)
.context("handle_migrations begin")?;
while current_migration < migrations.len() {
let migration = &migrations[current_migration];
if migration.starts_with("-- SKIP") {
info!("Skipping migration id={}", current_migration);
} else {
info!(
"Running migration id={}:\n{}\n",
current_migration, migration
);
client.simple_query(migration).with_context(|| {
format!("handle_migrations current_migration={}", current_migration)
})?;
}
current_migration += 1;
}
let setval = format!(
"UPDATE neon_migration.migration_id SET id={}",
migrations.len()
);
client
.simple_query(&setval)
.context("handle_migrations update id")?;
let query = "COMMIT";
client
.simple_query(query)
.context("handle_migrations commit")?;
info!(
"Ran {} migrations",
(migrations.len() - starting_migration_id)
);
Ok(())
}

View File

@@ -13,7 +13,11 @@ use std::{
use measured::{
label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
metric::{
group::{Encoding, MetricValue},
name::MetricNameEncoder,
Metric, MetricType, MetricVec,
},
text::TextEncoder,
LabelGroup,
};
@@ -140,7 +144,6 @@ impl<const N: usize> HyperLogLogState<N> {
})
}
}
impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEncoder<W>>
for HyperLogLogState<N>
{
@@ -179,13 +182,12 @@ impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEnc
.into_iter()
.enumerate()
.try_for_each(|(hll_shard, val)| {
CounterState::new(val as u64).collect_into(
&(),
enc.write_metric_value(
name.by_ref(),
labels.by_ref().compose_with(HllShardLabel {
hll_shard: hll_shard as i64,
}),
name.by_ref(),
enc,
MetricValue::Int(val as i64),
)
})
}

View File

@@ -9,7 +9,7 @@ use measured::{
metric::{
counter::CounterState,
gauge::GaugeState,
group::Encoding,
group::{Encoding, MetricValue},
name::{MetricName, MetricNameEncoder},
MetricEncoding, MetricFamilyEncoding,
},
@@ -171,11 +171,8 @@ fn write_gauge<Enc: Encoding>(
labels: impl LabelGroup,
name: impl MetricNameEncoder,
enc: &mut Enc,
) -> Result<(), Enc::Err>
where
GaugeState: MetricEncoding<Enc>,
{
GaugeState::new(x).collect_into(&(), labels, name, enc)
) -> Result<(), Enc::Err> {
enc.write_metric_value(name, labels, MetricValue::Int(x))
}
#[derive(Default)]
@@ -547,6 +544,15 @@ impl<T: Encoding> Encoding for Inc<T> {
fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> {
self.0.write_help(name, help)
}
fn write_metric_value(
&mut self,
name: impl MetricNameEncoder,
labels: impl LabelGroup,
value: MetricValue,
) -> Result<(), Self::Err> {
self.0.write_metric_value(name, labels, value)
}
}
impl<T: Encoding> MetricEncoding<Inc<T>> for MeasuredCounterPairState
@@ -573,6 +579,15 @@ impl<T: Encoding> Encoding for Dec<T> {
fn write_help(&mut self, name: impl MetricNameEncoder, help: &str) -> Result<(), Self::Err> {
self.0.write_help(name, help)
}
fn write_metric_value(
&mut self,
name: impl MetricNameEncoder,
labels: impl LabelGroup,
value: MetricValue,
) -> Result<(), Self::Err> {
self.0.write_metric_value(name, labels, value)
}
}
/// Write the dec counter to the encoder

View File

@@ -9,7 +9,6 @@ use std::{
collections::HashMap,
io::{BufRead, Read},
num::{NonZeroU64, NonZeroUsize},
str::FromStr,
sync::atomic::AtomicUsize,
time::{Duration, SystemTime},
};
@@ -438,7 +437,18 @@ pub enum CompactionAlgorithm {
Tiered,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(
Debug,
Clone,
Copy,
PartialEq,
Eq,
Serialize,
Deserialize,
strum_macros::FromRepr,
strum_macros::EnumString,
)]
#[strum(serialize_all = "kebab-case")]
pub enum ImageCompressionAlgorithm {
/// Disabled for writes, and never decompress during reading.
/// Never set this after you've enabled compression once!
@@ -458,31 +468,6 @@ impl ImageCompressionAlgorithm {
}
}
impl FromStr for ImageCompressionAlgorithm {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut components = s.split(['(', ')']);
let first = components
.next()
.ok_or_else(|| anyhow::anyhow!("empty string"))?;
match first {
"disabled-no-decompress" => Ok(ImageCompressionAlgorithm::DisabledNoDecompress),
"disabled" => Ok(ImageCompressionAlgorithm::Disabled),
"zstd" => {
let level = if let Some(v) = components.next() {
let v: i8 = v.parse()?;
Some(v)
} else {
None
};
Ok(ImageCompressionAlgorithm::Zstd { level })
}
_ => anyhow::bail!("invalid specifier '{first}'"),
}
}
}
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize)]
pub struct CompactionAlgorithmSettings {
pub kind: CompactionAlgorithm,
@@ -1675,29 +1660,4 @@ mod tests {
AuxFilePolicy::CrossValidation
);
}
#[test]
fn test_image_compression_algorithm_parsing() {
use ImageCompressionAlgorithm::*;
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled").unwrap(),
Disabled
);
assert_eq!(
ImageCompressionAlgorithm::from_str("disabled-no-decompress").unwrap(),
DisabledNoDecompress
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd").unwrap(),
Zstd { level: None }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(18)").unwrap(),
Zstd { level: Some(18) }
);
assert_eq!(
ImageCompressionAlgorithm::from_str("zstd(-3)").unwrap(),
Zstd { level: Some(-3) }
);
}
}

View File

@@ -1,42 +1,59 @@
//! See docs/rfcs/031-sharding-static.md for an overview of sharding.
//!
//! This module contains a variety of types used to represent the concept of sharding
//! a Neon tenant across multiple physical shards. Since there are quite a few of these,
//! we provide an summary here.
//!
//! Types used to describe shards:
//! - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
//! which identifies a tenant which is not shard-aware. This means its storage paths do not include
//! a shard suffix.
//! - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
//! - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
//! without the tenant ID. This is useful for things that are implicitly scoped to a particular
//! tenant, such as layer files.
//! - [`ShardIdentity`]` is the full description of a particular shard's parameters, in sufficient
//! detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
//! - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
//! four hex digits. An unsharded tenant is `0000`.
//! - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
//!
//! Types used to describe the parameters for data distribution in a sharded tenant:
//! - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
//! multiple shards. Its value is given in 8kiB pages.
//! - [`ShardLayout`] describes the data distribution scheme, and at time of writing is
//! always zero: this is provided for future upgrades that might introduce different
//! data distribution schemes.
//!
//! Examples:
//! - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
//! - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
//! - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
//! and their slugs are 0004, 0104, 0204, and 0304.
use std::{ops::RangeInclusive, str::FromStr};
use crate::{key::Key, models::ShardParameters};
use hex::FromHex;
use postgres_ffi::relfile_utils::INIT_FORKNUM;
use serde::{Deserialize, Serialize};
use utils::id::TenantId;
#[doc(inline)]
pub use ::utils::shard::*;
/// See docs/rfcs/031-sharding-static.md for an overview of sharding.
///
/// This module contains a variety of types used to represent the concept of sharding
/// a Neon tenant across multiple physical shards. Since there are quite a few of these,
/// we provide an summary here.
///
/// Types used to describe shards:
/// - [`ShardCount`] describes how many shards make up a tenant, plus the magic `unsharded` value
/// which identifies a tenant which is not shard-aware. This means its storage paths do not include
/// a shard suffix.
/// - [`ShardNumber`] is simply the zero-based index of a shard within a tenant.
/// - [`ShardIndex`] is the 2-tuple of `ShardCount` and `ShardNumber`, it's just like a `TenantShardId`
/// without the tenant ID. This is useful for things that are implicitly scoped to a particular
/// tenant, such as layer files.
/// - [`ShardIdentity`]` is the full description of a particular shard's parameters, in sufficient
/// detail to convert a [`Key`] to a [`ShardNumber`] when deciding where to write/read.
/// - The [`ShardSlug`] is a terse formatter for ShardCount and ShardNumber, written as
/// four hex digits. An unsharded tenant is `0000`.
/// - [`TenantShardId`] is the unique ID of a particular shard within a particular tenant
///
/// Types used to describe the parameters for data distribution in a sharded tenant:
/// - [`ShardStripeSize`] controls how long contiguous runs of [`Key`]s (stripes) are when distributed across
/// multiple shards. Its value is given in 8kiB pages.
/// - [`ShardLayout`] describes the data distribution scheme, and at time of writing is
/// always zero: this is provided for future upgrades that might introduce different
/// data distribution schemes.
///
/// Examples:
/// - A legacy unsharded tenant has one shard with ShardCount(0), ShardNumber(0), and its slug is 0000
/// - A single sharded tenant has one shard with ShardCount(1), ShardNumber(0), and its slug is 0001
/// - In a tenant with 4 shards, each shard has ShardCount(N), ShardNumber(i) where i in 0..N-1 (inclusive),
/// and their slugs are 0004, 0104, 0204, and 0304.
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(u8);
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
/// The ShardIdentity contains enough information to map a [`Key`] to a [`ShardNumber`],
/// and to check whether that [`ShardNumber`] is the same as the current shard.
@@ -48,6 +65,362 @@ pub struct ShardIdentity {
layout: ShardLayout,
}
/// Formatting helper, for generating the `shard_id` label in traces.
struct ShardSlug<'a>(&'a TenantShardId);
/// TenantShardId globally identifies a particular shard in a particular tenant.
///
/// These are written as `<TenantId>-<ShardSlug>`, for example:
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
///
/// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
/// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
/// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
///
/// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
/// is both forward and backward compatible with TenantId: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
/// legacy format for TenantShardId that excludes the shard suffix", also known
/// as [`TenantShardId::unsharded`].
///
/// This method returns the actual number of shards, i.e. if our internal value is
/// zero, we return 1 (unsharded tenants have 1 shard).
pub fn count(&self) -> u8 {
if self.0 > 0 {
self.0
} else {
1
}
}
/// The literal internal value: this is **not** the number of shards in the
/// tenant, as we have a special zero value for legacy unsharded tenants. Use
/// [`Self::count`] if you want to know the cardinality of shards.
pub fn literal(&self) -> u8 {
self.0
}
/// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
/// uses the legacy format for `TenantShardId`. See also the documentation for
/// [`Self::count`].
pub fn is_unsharded(&self) -> bool {
self.0 == 0
}
/// `v` may be zero, or the number of shards in the tenant. `v` is what
/// [`Self::literal`] would return.
pub const fn new(val: u8) -> Self {
Self(val)
}
}
impl ShardNumber {
pub const MAX: Self = Self(u8::MAX);
}
impl TenantShardId {
pub fn unsharded(tenant_id: TenantId) -> Self {
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
},
)
}
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
ShardSlug(self)
}
/// Convenience for code that has special behavior on the 0th shard.
pub fn is_shard_zero(&self) -> bool {
self.shard_number == ShardNumber(0)
}
/// The "unsharded" value is distinct from simply having a single shard: it represents
/// a tenant which is not shard-aware at all, and whose storage paths will not include
/// a shard suffix.
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
}
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
/// is useful when logging from code that is already in a span that includes tenant ID, to
/// keep messages reasonably terse.
pub fn to_index(&self) -> ShardIndex {
ShardIndex {
shard_number: self.shard_number,
shard_count: self.shard_count,
}
}
/// Calculate the children of this TenantShardId when splitting the overall tenant into
/// the given number of shards.
pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
let mut child_shards = Vec::new();
for shard_number in 0..ShardNumber(new_shard_count.0).0 {
// Key mapping is based on a round robin mapping of key hash modulo shard count,
// so our child shards are the ones which the same keys would map to.
if shard_number % effective_old_shard_count == self.shard_number.0 {
child_shards.push(TenantShardId {
tenant_id: self.tenant_id,
shard_number: ShardNumber(shard_number),
shard_count: new_shard_count,
})
}
}
child_shards
}
}
impl<'a> std::fmt::Display for ShardSlug<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:02x}{:02x}",
self.0.shard_number.0, self.0.shard_count.0
)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(f, "{}-{}", self.tenant_id, self.shard_slug())
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single shard case (shard count == 1).
self.tenant_id.fmt(f)
}
}
}
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for TenantShardId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
if s.len() == 32 {
// Legacy case: no shard specified
Ok(Self {
tenant_id: TenantId::from_str(s)?,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
})
} else if s.len() == 37 {
let bytes = s.as_bytes();
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
Ok(Self {
tenant_id,
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 18]> for TenantShardId {
fn from(b: [u8; 18]) -> Self {
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
Self {
tenant_id: TenantId::from(tenant_id_bytes),
shard_number: ShardNumber(b[16]),
shard_count: ShardCount(b[17]),
}
}
}
impl ShardIndex {
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
Self {
shard_number: number,
shard_count: count,
}
}
pub fn unsharded() -> Self {
Self {
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The "unsharded" value is distinct from simply having a single shard: it represents
/// a tenant which is not shard-aware at all, and whose storage paths will not include
/// a shard suffix.
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
/// For use in constructing remote storage paths: concatenate this with a TenantId
/// to get a fully qualified TenantShardId.
///
/// Backward compat: this function returns an empty string if Self::is_unsharded, such
/// that the legacy pre-sharding remote key format is preserved.
pub fn get_suffix(&self) -> String {
if self.is_unsharded() {
"".to_string()
} else {
format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
}
impl std::fmt::Display for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Debug for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for ShardIndex {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 1 byte shard number, 1 byte shard count
if s.len() == 4 {
let bytes = s.as_bytes();
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(bytes, &mut shard_parts)?;
Ok(Self {
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 2]> for ShardIndex {
fn from(b: [u8; 2]) -> Self {
Self {
shard_number: ShardNumber(b[0]),
shard_count: ShardCount(b[1]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Note: while human encoding of [`TenantShardId`] is backward and forward
// compatible, this binary encoding is not.
let mut packed: [u8; 18] = [0; 18];
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
packed[16] = self.shard_number.0;
packed[17] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for TenantShardId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = TenantShardId;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 18])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 18] = Deserialize::deserialize(s)?;
Ok(TenantShardId::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
TenantShardId::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
18,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
/// Stripe size in number of pages
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Debug)]
pub struct ShardStripeSize(pub u32);
@@ -212,6 +585,77 @@ impl ShardIdentity {
}
}
impl Serialize for ShardIndex {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Binary encoding is not used in index_part.json, but is included in anticipation of
// switching various structures (e.g. inter-process communication, remote metadata) to more
// compact binary encodings in future.
let mut packed: [u8; 2] = [0; 2];
packed[0] = self.shard_number.0;
packed[1] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for ShardIndex {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = ShardIndex;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 2])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 2] = Deserialize::deserialize(s)?;
Ok(ShardIndex::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
ShardIndex::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
2,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
/// Whether this key is always held on shard 0 (e.g. shard 0 holds all SLRU keys
/// in order to be able to serve basebackup requests without peer communication).
fn key_is_shard0(key: &Key) -> bool {
@@ -293,9 +737,7 @@ pub fn describe(
#[cfg(test)]
mod tests {
use std::str::FromStr;
use utils::{id::TenantId, Hex};
use utils::Hex;
use super::*;

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
@@ -13,7 +12,6 @@ rustls.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tokio-rustls.workspace = true
tracing.workspace = true
@@ -24,4 +22,4 @@ workspace_hack.workspace = true
once_cell.workspace = true
rustls-pemfile.workspace = true
tokio-postgres.workspace = true
tokio-postgres-rustls.workspace = true
tokio-postgres-rustls.workspace = true

View File

@@ -16,7 +16,6 @@ use std::{fmt, io};
use std::{future::Future, str::FromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter};
@@ -79,7 +78,7 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
#[allow(async_fn_in_trait)]
pub trait Handler<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
@@ -401,15 +400,21 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
}
/// Wrapper for run_message_loop() that shuts down socket when we are done
pub async fn run(
pub async fn run<F, S>(
mut self,
handler: &mut impl Handler<IO>,
cancel: &CancellationToken,
) -> Result<(), QueryError> {
let ret = self.run_message_loop(handler, cancel).await;
shutdown_watcher: F,
) -> Result<(), QueryError>
where
F: Fn() -> S + Clone,
S: Future,
{
let ret = self
.run_message_loop(handler, shutdown_watcher.clone())
.await;
tokio::select! {
_ = cancel.cancelled() => {
_ = shutdown_watcher() => {
// do nothing; we most likely got already stopped by shutdown and will log it next.
}
_ = self.framed.shutdown() => {
@@ -439,17 +444,21 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
}
}
async fn run_message_loop(
async fn run_message_loop<F, S>(
&mut self,
handler: &mut impl Handler<IO>,
cancel: &CancellationToken,
) -> Result<(), QueryError> {
shutdown_watcher: F,
) -> Result<(), QueryError>
where
F: Fn() -> S,
S: Future,
{
trace!("postgres backend to {:?} started", self.peer_addr);
tokio::select!(
biased;
_ = cancel.cancelled() => {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received during handshake");
return Err(QueryError::Shutdown)
@@ -464,7 +473,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
let mut query_string = Bytes::new();
while let Some(msg) = tokio::select!(
biased;
_ = cancel.cancelled() => {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received in run_message_loop");
return Err(QueryError::Shutdown)
@@ -476,7 +485,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
let result = self.process_message(handler, msg, &mut query_string).await;
tokio::select!(
biased;
_ = cancel.cancelled() => {
_ = shutdown_watcher() => {
// We were requested to shut down.
tracing::info!("shutdown request received during response flush");

View File

@@ -3,14 +3,13 @@ use once_cell::sync::Lazy;
use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError};
use pq_proto::{BeMessage, RowDescriptor};
use std::io::Cursor;
use std::sync::Arc;
use std::{future, sync::Arc};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_postgres::config::SslMode;
use tokio_postgres::tls::MakeTlsConnect;
use tokio_postgres::{Config, NoTls, SimpleQueryMessage};
use tokio_postgres_rustls::MakeRustlsConnect;
use tokio_util::sync::CancellationToken;
// generate client, server test streams
async fn make_tcp_pair() -> (TcpStream, TcpStream) {
@@ -23,7 +22,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
struct TestHandler {}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
// return single col 'hey' for any query
async fn process_query(
@@ -51,7 +49,7 @@ async fn simple_select() {
tokio::spawn(async move {
let mut handler = TestHandler {};
pgbackend.run(&mut handler, &CancellationToken::new()).await
pgbackend.run(&mut handler, future::pending::<()>).await
});
let conf = Config::new();
@@ -103,7 +101,7 @@ async fn simple_select_ssl() {
tokio::spawn(async move {
let mut handler = TestHandler {};
pgbackend.run(&mut handler, &CancellationToken::new()).await
pgbackend.run(&mut handler, future::pending::<()>).await
});
let client_cfg = rustls::ClientConfig::builder()

View File

@@ -26,8 +26,6 @@ pub mod auth;
// utility functions and helper traits for unified unique id generation/serialization etc.
pub mod id;
pub mod shard;
mod hex;
pub use hex::Hex;

View File

@@ -1,451 +0,0 @@
//! See `pageserver_api::shard` for description on sharding.
use std::{ops::RangeInclusive, str::FromStr};
use hex::FromHex;
use serde::{Deserialize, Serialize};
use crate::id::TenantId;
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardNumber(pub u8);
#[derive(Ord, PartialOrd, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Debug, Hash)]
pub struct ShardCount(pub u8);
/// Combination of ShardNumber and ShardCount. For use within the context of a particular tenant,
/// when we need to know which shard we're dealing with, but do not need to know the full
/// ShardIdentity (because we won't be doing any page->shard mapping), and do not need to know
/// the fully qualified TenantShardId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct ShardIndex {
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
/// Formatting helper, for generating the `shard_id` label in traces.
pub struct ShardSlug<'a>(&'a TenantShardId);
/// TenantShardId globally identifies a particular shard in a particular tenant.
///
/// These are written as `<TenantId>-<ShardSlug>`, for example:
/// # The second shard in a two-shard tenant
/// 072f1291a5310026820b2fe4b2968934-0102
///
/// If the `ShardCount` is _unsharded_, the `TenantShardId` is written without
/// a shard suffix and is equivalent to the encoding of a `TenantId`: this enables
/// an unsharded [`TenantShardId`] to be used interchangably with a [`TenantId`].
///
/// The human-readable encoding of an unsharded TenantShardId, such as used in API URLs,
/// is both forward and backward compatible with TenantId: a legacy TenantId can be
/// decoded as a TenantShardId, and when re-encoded it will be parseable
/// as a TenantId.
#[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Copy, Hash)]
pub struct TenantShardId {
pub tenant_id: TenantId,
pub shard_number: ShardNumber,
pub shard_count: ShardCount,
}
impl ShardCount {
pub const MAX: Self = Self(u8::MAX);
/// The internal value of a ShardCount may be zero, which means "1 shard, but use
/// legacy format for TenantShardId that excludes the shard suffix", also known
/// as [`TenantShardId::unsharded`].
///
/// This method returns the actual number of shards, i.e. if our internal value is
/// zero, we return 1 (unsharded tenants have 1 shard).
pub fn count(&self) -> u8 {
if self.0 > 0 {
self.0
} else {
1
}
}
/// The literal internal value: this is **not** the number of shards in the
/// tenant, as we have a special zero value for legacy unsharded tenants. Use
/// [`Self::count`] if you want to know the cardinality of shards.
pub fn literal(&self) -> u8 {
self.0
}
/// Whether the `ShardCount` is for an unsharded tenant, so uses one shard but
/// uses the legacy format for `TenantShardId`. See also the documentation for
/// [`Self::count`].
pub fn is_unsharded(&self) -> bool {
self.0 == 0
}
/// `v` may be zero, or the number of shards in the tenant. `v` is what
/// [`Self::literal`] would return.
pub const fn new(val: u8) -> Self {
Self(val)
}
}
impl ShardNumber {
pub const MAX: Self = Self(u8::MAX);
}
impl TenantShardId {
pub fn unsharded(tenant_id: TenantId) -> Self {
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The range of all TenantShardId that belong to a particular TenantId. This is useful when
/// you have a BTreeMap of TenantShardId, and are querying by TenantId.
pub fn tenant_range(tenant_id: TenantId) -> RangeInclusive<Self> {
RangeInclusive::new(
Self {
tenant_id,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
},
Self {
tenant_id,
shard_number: ShardNumber::MAX,
shard_count: ShardCount::MAX,
},
)
}
pub fn shard_slug(&self) -> impl std::fmt::Display + '_ {
ShardSlug(self)
}
/// Convenience for code that has special behavior on the 0th shard.
pub fn is_shard_zero(&self) -> bool {
self.shard_number == ShardNumber(0)
}
/// The "unsharded" value is distinct from simply having a single shard: it represents
/// a tenant which is not shard-aware at all, and whose storage paths will not include
/// a shard suffix.
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count.is_unsharded()
}
/// Convenience for dropping the tenant_id and just getting the ShardIndex: this
/// is useful when logging from code that is already in a span that includes tenant ID, to
/// keep messages reasonably terse.
pub fn to_index(&self) -> ShardIndex {
ShardIndex {
shard_number: self.shard_number,
shard_count: self.shard_count,
}
}
/// Calculate the children of this TenantShardId when splitting the overall tenant into
/// the given number of shards.
pub fn split(&self, new_shard_count: ShardCount) -> Vec<TenantShardId> {
let effective_old_shard_count = std::cmp::max(self.shard_count.0, 1);
let mut child_shards = Vec::new();
for shard_number in 0..ShardNumber(new_shard_count.0).0 {
// Key mapping is based on a round robin mapping of key hash modulo shard count,
// so our child shards are the ones which the same keys would map to.
if shard_number % effective_old_shard_count == self.shard_number.0 {
child_shards.push(TenantShardId {
tenant_id: self.tenant_id,
shard_number: ShardNumber(shard_number),
shard_count: new_shard_count,
})
}
}
child_shards
}
}
impl<'a> std::fmt::Display for ShardSlug<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{:02x}{:02x}",
self.0.shard_number.0, self.0.shard_count.0
)
}
}
impl std::fmt::Display for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.shard_count != ShardCount(0) {
write!(f, "{}-{}", self.tenant_id, self.shard_slug())
} else {
// Legacy case (shard_count == 0) -- format as just the tenant id. Note that this
// is distinct from the normal single shard case (shard count == 1).
self.tenant_id.fmt(f)
}
}
}
impl std::fmt::Debug for TenantShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for TenantShardId {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 16 byte TenantId, '-', 1 byte shard number, 1 byte shard count
if s.len() == 32 {
// Legacy case: no shard specified
Ok(Self {
tenant_id: TenantId::from_str(s)?,
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
})
} else if s.len() == 37 {
let bytes = s.as_bytes();
let tenant_id = TenantId::from_hex(&bytes[0..32])?;
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(&bytes[33..37], &mut shard_parts)?;
Ok(Self {
tenant_id,
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 18]> for TenantShardId {
fn from(b: [u8; 18]) -> Self {
let tenant_id_bytes: [u8; 16] = b[0..16].try_into().unwrap();
Self {
tenant_id: TenantId::from(tenant_id_bytes),
shard_number: ShardNumber(b[16]),
shard_count: ShardCount(b[17]),
}
}
}
impl ShardIndex {
pub fn new(number: ShardNumber, count: ShardCount) -> Self {
Self {
shard_number: number,
shard_count: count,
}
}
pub fn unsharded() -> Self {
Self {
shard_number: ShardNumber(0),
shard_count: ShardCount(0),
}
}
/// The "unsharded" value is distinct from simply having a single shard: it represents
/// a tenant which is not shard-aware at all, and whose storage paths will not include
/// a shard suffix.
pub fn is_unsharded(&self) -> bool {
self.shard_number == ShardNumber(0) && self.shard_count == ShardCount(0)
}
/// For use in constructing remote storage paths: concatenate this with a TenantId
/// to get a fully qualified TenantShardId.
///
/// Backward compat: this function returns an empty string if Self::is_unsharded, such
/// that the legacy pre-sharding remote key format is preserved.
pub fn get_suffix(&self) -> String {
if self.is_unsharded() {
"".to_string()
} else {
format!("-{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
}
impl std::fmt::Display for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:02x}{:02x}", self.shard_number.0, self.shard_count.0)
}
}
impl std::fmt::Debug for ShardIndex {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Debug is the same as Display: the compact hex representation
write!(f, "{}", self)
}
}
impl std::str::FromStr for ShardIndex {
type Err = hex::FromHexError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
// Expect format: 1 byte shard number, 1 byte shard count
if s.len() == 4 {
let bytes = s.as_bytes();
let mut shard_parts: [u8; 2] = [0u8; 2];
hex::decode_to_slice(bytes, &mut shard_parts)?;
Ok(Self {
shard_number: ShardNumber(shard_parts[0]),
shard_count: ShardCount(shard_parts[1]),
})
} else {
Err(hex::FromHexError::InvalidStringLength)
}
}
}
impl From<[u8; 2]> for ShardIndex {
fn from(b: [u8; 2]) -> Self {
Self {
shard_number: ShardNumber(b[0]),
shard_count: ShardCount(b[1]),
}
}
}
impl Serialize for TenantShardId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Note: while human encoding of [`TenantShardId`] is backward and forward
// compatible, this binary encoding is not.
let mut packed: [u8; 18] = [0; 18];
packed[0..16].clone_from_slice(&self.tenant_id.as_arr());
packed[16] = self.shard_number.0;
packed[17] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for TenantShardId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = TenantShardId;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 18])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 18] = Deserialize::deserialize(s)?;
Ok(TenantShardId::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
TenantShardId::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
18,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}
impl Serialize for ShardIndex {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if serializer.is_human_readable() {
serializer.collect_str(self)
} else {
// Binary encoding is not used in index_part.json, but is included in anticipation of
// switching various structures (e.g. inter-process communication, remote metadata) to more
// compact binary encodings in future.
let mut packed: [u8; 2] = [0; 2];
packed[0] = self.shard_number.0;
packed[1] = self.shard_count.0;
packed.serialize(serializer)
}
}
}
impl<'de> Deserialize<'de> for ShardIndex {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct IdVisitor {
is_human_readable_deserializer: bool,
}
impl<'de> serde::de::Visitor<'de> for IdVisitor {
type Value = ShardIndex;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
if self.is_human_readable_deserializer {
formatter.write_str("value in form of hex string")
} else {
formatter.write_str("value in form of integer array([u8; 2])")
}
}
fn visit_seq<A>(self, seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let s = serde::de::value::SeqAccessDeserializer::new(seq);
let id: [u8; 2] = Deserialize::deserialize(s)?;
Ok(ShardIndex::from(id))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
ShardIndex::from_str(v).map_err(E::custom)
}
}
if deserializer.is_human_readable() {
deserializer.deserialize_str(IdVisitor {
is_human_readable_deserializer: true,
})
} else {
deserializer.deserialize_tuple(
2,
IdVisitor {
is_human_readable_deserializer: false,
},
)
}
}
}

View File

@@ -62,7 +62,6 @@ sync_wrapper.workspace = true
sysinfo.workspace = true
tokio-tar.workspace = true
thiserror.workspace = true
tikv-jemallocator.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-epoll-uring.workspace = true
tokio-io-timeout.workspace = true

View File

@@ -47,9 +47,6 @@ use utils::{
project_git_version!(GIT_VERSION);
project_build_tag!(BUILD_TAG);
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
const PID_FILE_NAME: &str = "pageserver.pid";
const FEATURES: &[&str] = &[

View File

@@ -1456,12 +1456,10 @@ impl<'a, 'c> BasebackupQueryTimeOngoingRecording<'a, 'c> {
}
}
pub(crate) static LIVE_CONNECTIONS: Lazy<IntCounterPairVec> = Lazy::new(|| {
register_int_counter_pair_vec!(
"pageserver_live_connections_started",
"Number of network connections that we started handling",
"pageserver_live_connections_finished",
"Number of network connections that we finished handling",
pub(crate) static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_live_connections",
"Number of live network connections",
&["pageserver_connection_kind"]
)
.expect("failed to define a metric")

View File

@@ -55,7 +55,7 @@ use crate::basebackup::BasebackupError;
use crate::context::{DownloadBehavior, RequestContext};
use crate::import_datadir::import_wal_from_tar;
use crate::metrics;
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS_COUNT};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
@@ -215,9 +215,14 @@ async fn page_service_conn_main(
auth_type: AuthType,
connection_ctx: RequestContext,
) -> anyhow::Result<()> {
let _guard = LIVE_CONNECTIONS
.with_label_values(&["page_service"])
.guard();
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["page_service"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
socket
.set_nodelay(true)
@@ -267,7 +272,7 @@ async fn page_service_conn_main(
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
match pgbackend
.run(&mut conn_handler, &task_mgr::shutdown_token())
.run(&mut conn_handler, task_mgr::shutdown_watcher)
.await
{
Ok(()) => {
@@ -1473,7 +1478,6 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,

View File

@@ -854,14 +854,13 @@ impl Timeline {
result.add_key(DBDIR_KEY);
// Fetch list of database dirs and iterate them
let dbdir = self.list_dbdirs(lsn, ctx).await?;
let mut dbs: Vec<((Oid, Oid), bool)> = dbdir.into_iter().collect();
let buf = self.get(DBDIR_KEY, lsn, ctx).await?;
let dbdir = DbDirectory::des(&buf)?;
dbs.sort_unstable_by(|(k_a, _), (k_b, _)| k_a.cmp(k_b));
for ((spcnode, dbnode), has_relmap_file) in dbs {
if has_relmap_file {
result.add_key(relmap_file_key(spcnode, dbnode));
}
let mut dbs: Vec<(Oid, Oid)> = dbdir.dbdirs.keys().cloned().collect();
dbs.sort_unstable();
for (spcnode, dbnode) in dbs {
result.add_key(relmap_file_key(spcnode, dbnode));
result.add_key(rel_dir_to_key(spcnode, dbnode));
let mut rels: Vec<RelTag> = self
@@ -920,9 +919,6 @@ impl Timeline {
result.add_key(AUX_FILES_KEY);
}
// Add extra keyspaces in the test cases. Some test cases write keys into the storage without
// creating directory keys. These test cases will add such keyspaces into `extra_test_dense_keyspace`
// and the keys will not be garbage-colllected.
#[cfg(test)]
{
let guard = self.extra_test_dense_keyspace.load();
@@ -931,48 +927,13 @@ impl Timeline {
}
}
let dense_keyspace = result.to_keyspace();
let sparse_keyspace = SparseKeySpace(KeySpace {
ranges: vec![Key::metadata_aux_key_range(), repl_origin_key_range()],
});
if cfg!(debug_assertions) {
// Verify if the sparse keyspaces are ordered and non-overlapping.
// We do not use KeySpaceAccum for sparse_keyspace because we want to ensure each
// category of sparse keys are split into their own image/delta files. If there
// are overlapping keyspaces, they will be automatically merged by keyspace accum,
// and we want the developer to keep the keyspaces separated.
let ranges = &sparse_keyspace.0.ranges;
// TODO: use a single overlaps_with across the codebase
fn overlaps_with<T: Ord>(a: &Range<T>, b: &Range<T>) -> bool {
!(a.end <= b.start || b.end <= a.start)
}
for i in 0..ranges.len() {
for j in 0..i {
if overlaps_with(&ranges[i], &ranges[j]) {
panic!(
"overlapping sparse keyspace: {}..{} and {}..{}",
ranges[i].start, ranges[i].end, ranges[j].start, ranges[j].end
);
}
}
}
for i in 1..ranges.len() {
assert!(
ranges[i - 1].end <= ranges[i].start,
"unordered sparse keyspace: {}..{} and {}..{}",
ranges[i - 1].start,
ranges[i - 1].end,
ranges[i].start,
ranges[i].end
);
}
}
Ok((dense_keyspace, sparse_keyspace))
Ok((
result.to_keyspace(),
/* AUX sparse key space */
SparseKeySpace(KeySpace {
ranges: vec![repl_origin_key_range(), Key::metadata_aux_key_range()],
}),
))
}
/// Get cached size of relation if it not updated after specified LSN

View File

@@ -728,9 +728,6 @@ impl From<CreateImageLayersError> for CompactionError {
fn from(e: CreateImageLayersError) -> Self {
match e {
CreateImageLayersError::Cancelled => CompactionError::ShuttingDown,
CreateImageLayersError::Other(e) => {
CompactionError::Other(e.context("create image layers"))
}
_ => CompactionError::Other(e.into()),
}
}

View File

@@ -26,7 +26,7 @@ use tracing::{debug, error, info, trace, warn, Instrument};
use super::TaskStateUpdate;
use crate::{
context::RequestContext,
metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
task_mgr::TaskKind,
task_mgr::WALRECEIVER_RUNTIME,
tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
@@ -208,9 +208,14 @@ pub(super) async fn handle_walreceiver_connection(
.instrument(tracing::info_span!("poller")),
);
let _guard = LIVE_CONNECTIONS
.with_label_values(&["wal_receiver"])
.guard();
// Immediately increment the gauge, then create a job to decrement it on task exit.
// One of the pros of `defer!` is that this will *most probably*
// get called, even in presence of panics.
let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
gauge.inc();
scopeguard::defer! {
gauge.dec();
}
let identify = identify_system(&replication_client).await?;
info!("{identify:?}");

20
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -734,13 +734,13 @@ typing-extensions = ">=4.1.0"
[[package]]
name = "certifi"
version = "2024.7.4"
version = "2023.7.22"
description = "Python package for providing Mozilla's CA Bundle."
optional = false
python-versions = ">=3.6"
files = [
{file = "certifi-2024.7.4-py3-none-any.whl", hash = "sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"},
{file = "certifi-2024.7.4.tar.gz", hash = "sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"},
{file = "certifi-2023.7.22-py3-none-any.whl", hash = "sha256:92d6037539857d8206b8f6ae472e8b77db8058fec5937a1ef3f54304089edbb9"},
{file = "certifi-2023.7.22.tar.gz", hash = "sha256:539cc1d13202e33ca466e88b2807e29f4c13049d6d87031a3c110744495cb082"},
]
[[package]]
@@ -3133,18 +3133,18 @@ multidict = ">=4.0"
[[package]]
name = "zipp"
version = "3.19.1"
version = "3.8.1"
description = "Backport of pathlib-compatible object wrapper for zip files"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.7"
files = [
{file = "zipp-3.19.1-py3-none-any.whl", hash = "sha256:2828e64edb5386ea6a52e7ba7cdb17bb30a73a858f5eb6eb93d8d36f5ea26091"},
{file = "zipp-3.19.1.tar.gz", hash = "sha256:35427f6d5594f4acf82d25541438348c26736fa9b3afa2754bcd63cdb99d8e8f"},
{file = "zipp-3.8.1-py3-none-any.whl", hash = "sha256:47c40d7fe183a6f21403a199b3e4192cca5774656965b0a4988ad2f8feb5f009"},
{file = "zipp-3.8.1.tar.gz", hash = "sha256:05b45f1ee8f807d0cc928485ca40a07cb491cf092ff587c0df9cb1fd154848d2"},
]
[package.extras]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"]
test = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more-itertools", "pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"]
docs = ["jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx"]
testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"]
[[package]]
name = "zstandard"

View File

@@ -6,9 +6,8 @@ use anyhow::Context;
use once_cell::sync::Lazy;
use postgres_backend::{AuthType, PostgresBackend, PostgresBackendTCP, QueryError};
use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use std::convert::Infallible;
use std::{convert::Infallible, future};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, Instrument};
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);
@@ -68,9 +67,7 @@ pub async fn task_main(listener: TcpListener) -> anyhow::Result<Infallible> {
async fn handle_connection(socket: TcpStream) -> Result<(), QueryError> {
let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None)?;
pgbackend
.run(&mut MgmtHandler, &CancellationToken::new())
.await
pgbackend.run(&mut MgmtHandler, future::pending::<()>).await
}
/// A message received by `mgmt` when a compute node is ready.
@@ -78,7 +75,6 @@ pub type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,

View File

@@ -3,8 +3,8 @@ use std::marker::PhantomData;
use measured::{
label::NoLabels,
metric::{
gauge::GaugeState, group::Encoding, name::MetricNameEncoder, MetricEncoding,
MetricFamilyEncoding, MetricType,
gauge::GaugeState, group::Encoding, group::MetricValue, name::MetricNameEncoder,
MetricEncoding, MetricFamilyEncoding, MetricType,
},
text::TextEncoder,
LabelGroup, MetricGroup,
@@ -100,7 +100,7 @@ macro_rules! jemalloc_gauge {
enc: &mut TextEncoder<W>,
) -> Result<(), std::io::Error> {
if let Ok(v) = mib.read() {
GaugeState::new(v as i64).collect_into(&(), labels, name, enc)?;
enc.write_metric_value(name, labels, MetricValue::Int(v as i64))?;
}
Ok(())
}

View File

@@ -2,7 +2,7 @@ use std::sync::{Arc, OnceLock};
use lasso::ThreadedRodeo;
use measured::{
label::{FixedCardinalitySet, LabelGroupSet, LabelName, LabelSet, LabelValue, StaticLabelSet},
label::{FixedCardinalitySet, LabelName, LabelSet, LabelValue, StaticLabelSet},
metric::{histogram::Thresholds, name::MetricName},
Counter, CounterVec, FixedCardinalityLabel, Gauge, GaugeVec, Histogram, HistogramVec,
LabelGroup, MetricGroup,
@@ -577,32 +577,6 @@ impl LabelGroup for ThreadPoolWorkerId {
}
}
impl LabelGroupSet for ThreadPoolWorkers {
type Group<'a> = ThreadPoolWorkerId;
fn cardinality(&self) -> Option<usize> {
Some(self.0)
}
fn encode_dense(&self, value: Self::Unique) -> Option<usize> {
Some(value)
}
fn decode_dense(&self, value: usize) -> Self::Group<'_> {
ThreadPoolWorkerId(value)
}
type Unique = usize;
fn encode(&self, value: Self::Group<'_>) -> Option<Self::Unique> {
Some(value.0)
}
fn decode(&self, value: &Self::Unique) -> Self::Group<'_> {
ThreadPoolWorkerId(*value)
}
}
impl LabelSet for ThreadPoolWorkers {
type Value<'a> = ThreadPoolWorkerId;

View File

@@ -838,9 +838,8 @@ async fn query_to_json<T: GenericClient>(
"finished reading rows"
);
let columns_len = row_stream.columns().len();
let mut fields = Vec::with_capacity(columns_len);
let mut columns = Vec::with_capacity(columns_len);
let mut fields = vec![];
let mut columns = vec![];
for c in row_stream.columns() {
fields.push(json!({

View File

@@ -95,7 +95,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
for SafekeeperPostgresHandler
{

View File

@@ -145,6 +145,10 @@ pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("WAL service worker")
.enable_all()
// Default is 2 MiB at time of writing.
// With that, the `postgres_backend::Handler` overflows the stack in debug builds.
// => use 4 MiB
.thread_stack_size(4 * 1024 * 1024)
.build()
.expect("Failed to create WAL service runtime")
});

View File

@@ -4,10 +4,9 @@
//!
use anyhow::{Context, Result};
use postgres_backend::QueryError;
use std::time::Duration;
use std::{future, time::Duration};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutReader;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{auth::Scope, measured_stream::MeasuredStream};
@@ -101,7 +100,7 @@ async fn handle_socket(
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend
.run(&mut conn_handler, &CancellationToken::new())
.run(&mut conn_handler, future::pending::<()>)
.await
}

View File

@@ -4062,14 +4062,7 @@ impl Service {
placement_policy: Some(PlacementPolicy::Attached(0)), // No secondaries, for convenient debug/hacking
// There is no way to know what the tenant's config was: revert to defaults
//
// TODO: remove `switch_aux_file_policy` once we finish auxv2 migration
//
// we write to both v1+v2 storage, so that the test case can use either storage format for testing
config: TenantConfig {
switch_aux_file_policy: Some(models::AuxFilePolicy::CrossValidation),
..TenantConfig::default()
},
config: TenantConfig::default(),
})
.await?;

View File

@@ -1,4 +1,4 @@
use futures::{StreamExt, TryStreamExt};
use futures::StreamExt;
use pageserver::tenant::storage_layer::LayerName;
use serde::{Deserialize, Serialize};
@@ -29,7 +29,7 @@ impl LargeObjectKind {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[derive(Serialize, Deserialize)]
pub struct LargeObject {
pub key: String,
pub size: u64,
@@ -45,76 +45,53 @@ pub async fn find_large_objects(
bucket_config: BucketConfig,
min_size: u64,
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);
let s3_client = s3_client.clone();
async move {
let mut objects = Vec::new();
let mut total_objects_ctr = 0u64;
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
total_objects_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok((tenant_shard_id, objects, total_objects_ctr))
}
});
let mut objects_stream = std::pin::pin!(objects_stream.try_buffer_unordered(concurrency));
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let mut tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let mut objects = Vec::new();
let mut tenant_ctr = 0u64;
let mut object_ctr = 0u64;
while let Some(res) = objects_stream.next().await {
let (tenant_shard_id, objects_slice, total_objects_ctr) = res?;
objects.extend_from_slice(&objects_slice);
while let Some(tenant_shard_id) = tenants.next().await {
let tenant_shard_id = tenant_shard_id?;
let mut tenant_root = target.tenant_root(&tenant_shard_id);
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size.unwrap() as u64,
kind,
})
}
object_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
object_ctr += total_objects_ctr;
tenant_ctr += 1;
if tenant_ctr % 100 == 0 {
if tenant_ctr % 50 == 0 {
tracing::info!(
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.",
objects.len()
"Scanned {tenant_ctr} shards. objects={object_ctr}, found={}, current={tenant_shard_id}.", objects.len()
);
}
}
let bucket_name = target.bucket_name();
tracing::info!(
"Scan of {bucket_name} finished. Scanned {tenant_ctr} shards. objects={object_ctr}, found={}.",
objects.len()
);
Ok(LargeObjectListing { objects })
}

View File

@@ -140,7 +140,7 @@ async fn find_garbage_inner(
node_kind: NodeKind,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
let (s3_client, target) = init_remote(bucket_config.clone(), node_kind)?;
let cloud_admin_api_client = Arc::new(CloudAdminApiClient::new(console_config));
// Build a set of console-known tenants, for quickly eliminating known-active tenants without having
@@ -432,7 +432,7 @@ pub async fn purge_garbage(
);
let (s3_client, target) =
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
init_remote(garbage_list.bucket_config.clone(), garbage_list.node_kind)?;
// Sanity checks on the incoming list
if garbage_list.active_tenant_count == 0 {

View File

@@ -15,10 +15,17 @@ use std::fmt::Display;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context};
use aws_sdk_s3::config::Region;
use aws_sdk_s3::error::DisplayErrorContext;
use aws_sdk_s3::Client;
use anyhow::Context;
use aws_config::environment::EnvironmentVariableCredentialsProvider;
use aws_config::imds::credentials::ImdsCredentialsProvider;
use aws_config::meta::credentials::CredentialsProviderChain;
use aws_config::profile::ProfileFileCredentialsProvider;
use aws_config::retry::RetryConfig;
use aws_config::sso::SsoCredentialsProvider;
use aws_config::BehaviorVersion;
use aws_sdk_s3::config::{AsyncSleep, Region, SharedAsyncSleep};
use aws_sdk_s3::{Client, Config};
use aws_smithy_async::rt::sleep::TokioSleep;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
@@ -235,53 +242,85 @@ impl ConsoleConfig {
}
}
pub fn init_logging(file_name: &str) -> Option<WorkerGuard> {
pub fn init_logging(file_name: &str) -> WorkerGuard {
let (file_writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
let file_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(false)
.with_writer(file_writer);
let stderr_logs = fmt::Layer::new()
.with_target(false)
.with_writer(std::io::stderr);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(file_logs)
.with(stderr_logs)
.init();
let disable_file_logging = match std::env::var("PAGESERVER_DISABLE_FILE_LOGGING") {
Ok(s) => s == "1" || s.to_lowercase() == "true",
Err(_) => false,
guard
}
pub fn init_s3_client(bucket_region: Region) -> Client {
let credentials_provider = {
// uses "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"
let chain = CredentialsProviderChain::first_try(
"env",
EnvironmentVariableCredentialsProvider::new(),
)
// uses "AWS_PROFILE" / `aws sso login --profile <profile>`
.or_else(
"profile-sso",
ProfileFileCredentialsProvider::builder().build(),
);
// Use SSO if we were given an account ID
match std::env::var("SSO_ACCOUNT_ID").ok() {
Some(sso_account) => chain.or_else(
"sso",
SsoCredentialsProvider::builder()
.account_id(sso_account)
.role_name("PowerUserAccess")
.start_url("https://neondb.awsapps.com/start")
.region(bucket_region.clone())
.build(),
),
None => chain,
}
.or_else(
// Finally try IMDS
"imds",
ImdsCredentialsProvider::builder().build(),
)
};
if disable_file_logging {
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(stderr_logs)
.init();
None
} else {
let (file_writer, guard) =
tracing_appender::non_blocking(tracing_appender::rolling::never("./logs/", file_name));
let file_logs = fmt::Layer::new()
.with_target(false)
.with_ansi(false)
.with_writer(file_writer);
tracing_subscriber::registry()
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(stderr_logs)
.with(file_logs)
.init();
Some(guard)
}
}
let sleep_impl: Arc<dyn AsyncSleep> = Arc::new(TokioSleep::new());
pub async fn init_s3_client(bucket_region: Region) -> Client {
let config = aws_config::defaults(aws_config::BehaviorVersion::v2024_03_28())
let mut builder = Config::builder()
.behavior_version(
#[allow(deprecated)] /* TODO: https://github.com/neondatabase/neon/issues/7665 */
BehaviorVersion::v2023_11_09(),
)
.region(bucket_region)
.load()
.await;
Client::new(&config)
.retry_config(RetryConfig::adaptive().with_max_attempts(3))
.sleep_impl(SharedAsyncSleep::from(sleep_impl))
.credentials_provider(credentials_provider);
if let Ok(endpoint) = env::var("AWS_ENDPOINT_URL") {
builder = builder.endpoint_url(endpoint)
}
Client::from_conf(builder.build())
}
async fn init_remote(
fn init_remote(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_region).await);
let s3_client = Arc::new(init_s3_client(bucket_region));
let s3_root = match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
@@ -306,7 +345,7 @@ async fn list_objects_with_retries(
s3_target: &S3Target,
continuation_token: Option<String>,
) -> anyhow::Result<aws_sdk_s3::operation::list_objects_v2::ListObjectsV2Output> {
for trial in 0..MAX_RETRIES {
for _ in 0..MAX_RETRIES {
match s3_client
.list_objects_v2()
.bucket(&s3_target.bucket_name)
@@ -318,22 +357,16 @@ async fn list_objects_with_retries(
{
Ok(response) => return Ok(response),
Err(e) => {
if trial == MAX_RETRIES - 1 {
return Err(e)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
}
error!(
"list_objects_v2 query failed: bucket_name={}, prefix={}, delimiter={}, error={}",
s3_target.bucket_name,
s3_target.prefix_in_bucket,
s3_target.delimiter,
DisplayErrorContext(e),
"list_objects_v2 query failed: {e}, bucket_name={}, prefix={}, delimiter={}",
s3_target.bucket_name, s3_target.prefix_in_bucket, s3_target.delimiter
);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
anyhow::bail!("Failed to list objects {MAX_RETRIES} times")
}
async fn download_object_with_retries(

View File

@@ -78,8 +78,6 @@ enum Command {
min_size: u64,
#[arg(short, long, default_value_t = false)]
ignore_deltas: bool,
#[arg(long = "concurrency", short = 'j', default_value_t = 64)]
concurrency: usize,
},
}
@@ -196,7 +194,7 @@ async fn main() -> anyhow::Result<()> {
concurrency,
} => {
let downloader =
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency).await?;
SnapshotDownloader::new(bucket_config, tenant_id, output_path, concurrency)?;
downloader.download().await
}
Command::PageserverPhysicalGc {
@@ -212,15 +210,10 @@ async fn main() -> anyhow::Result<()> {
Command::FindLargeObjects {
min_size,
ignore_deltas,
concurrency,
} => {
let summary = find_large_objects::find_large_objects(
bucket_config,
min_size,
ignore_deltas,
concurrency,
)
.await?;
let summary =
find_large_objects::find_large_objects(bucket_config, min_size, ignore_deltas)
.await?;
println!("{}", serde_json::to_string(&summary).unwrap());
Ok(())
}

View File

@@ -160,7 +160,7 @@ pub async fn pageserver_physical_gc(
min_age: Duration,
mode: GcMode,
) -> anyhow::Result<GcSummary> {
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))

View File

@@ -199,7 +199,7 @@ pub async fn scan_metadata(
bucket_config: BucketConfig,
tenant_ids: Vec<TenantShardId>,
) -> anyhow::Result<MetadataSummary> {
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver).await?;
let (s3_client, target) = init_remote(bucket_config, NodeKind::Pageserver)?;
let tenants = if tenant_ids.is_empty() {
futures::future::Either::Left(stream_tenants(&s3_client, &target))

View File

@@ -106,7 +106,7 @@ pub async fn scan_safekeeper_metadata(
let timelines = client.query(&query, &[]).await?;
info!("loaded {} timelines", timelines.len());
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper).await?;
let (s3_client, target) = init_remote(bucket_config, NodeKind::Safekeeper)?;
let console_config = ConsoleConfig::from_env()?;
let cloud_admin_api_client = CloudAdminApiClient::new(console_config);

View File

@@ -28,13 +28,13 @@ pub struct SnapshotDownloader {
}
impl SnapshotDownloader {
pub async fn new(
pub fn new(
bucket_config: BucketConfig,
tenant_id: TenantId,
output_path: Utf8PathBuf,
concurrency: usize,
) -> anyhow::Result<Self> {
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, s3_root) = init_remote(bucket_config.clone(), NodeKind::Pageserver)?;
Ok(Self {
s3_client,
s3_root,
@@ -215,8 +215,7 @@ impl SnapshotDownloader {
}
pub async fn download(&self) -> anyhow::Result<()> {
let (s3_client, target) =
init_remote(self.bucket_config.clone(), NodeKind::Pageserver).await?;
let (s3_client, target) = init_remote(self.bucket_config.clone(), NodeKind::Pageserver)?;
// Generate a stream of TenantShardId
let shards = stream_tenant_shards(&s3_client, &target, self.tenant_id).await?;

View File

@@ -1,263 +0,0 @@
from __future__ import annotations
import time
from typing import TYPE_CHECKING, cast
import requests
if TYPE_CHECKING:
from typing import Any, Dict, Literal, Optional, Union
from fixtures.pg_version import PgVersion
def connection_parameters_to_env(params: Dict[str, str]) -> Dict[str, str]:
return {
"PGHOST": params["host"],
"PGDATABASE": params["database"],
"PGUSER": params["role"],
"PGPASSWORD": params["password"],
}
class NeonAPI:
def __init__(self, neon_api_key: str, neon_api_base_url: str):
self.__neon_api_key = neon_api_key
self.__neon_api_base_url = neon_api_base_url.strip("/")
def __request(
self, method: Union[str, bytes], endpoint: str, **kwargs: Any
) -> requests.Response:
if "headers" not in kwargs:
kwargs["headers"] = {}
kwargs["headers"]["Authorization"] = f"Bearer {self.__neon_api_key}"
return requests.request(method, f"{self.__neon_api_base_url}{endpoint}", **kwargs)
def create_project(
self,
pg_version: Optional[PgVersion] = None,
name: Optional[str] = None,
branch_name: Optional[str] = None,
branch_role_name: Optional[str] = None,
branch_database_name: Optional[str] = None,
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"project": {
"branch": {},
},
}
if name:
data["project"]["name"] = name
if pg_version:
data["project"]["pg_version"] = int(pg_version)
if branch_name:
data["project"]["branch"]["name"] = branch_name
if branch_role_name:
data["project"]["branch"]["role_name"] = branch_role_name
if branch_database_name:
data["project"]["branch"]["database_name"] = branch_database_name
resp = self.__request(
"POST",
"/projects",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json=data,
)
assert resp.status_code == 201
return cast("Dict[str, Any]", resp.json())
def get_project_details(self, project_id: str) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def delete_project(
self,
project_id: str,
) -> Dict[str, Any]:
resp = self.__request(
"DELETE",
f"/projects/{project_id}",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def start_endpoint(
self,
project_id: str,
endpoint_id: str,
) -> Dict[str, Any]:
resp = self.__request(
"POST",
f"/projects/{project_id}/endpoints/{endpoint_id}/start",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def suspend_endpoint(
self,
project_id: str,
endpoint_id: str,
) -> Dict[str, Any]:
resp = self.__request(
"POST",
f"/projects/{project_id}/endpoints/{endpoint_id}/suspend",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def restart_endpoint(
self,
project_id: str,
endpoint_id: str,
) -> Dict[str, Any]:
resp = self.__request(
"POST",
f"/projects/{project_id}/endpoints/{endpoint_id}/restart",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def create_endpoint(
self,
project_id: str,
branch_id: str,
endpoint_type: Literal["read_write", "read_only"],
settings: Dict[str, Any],
) -> Dict[str, Any]:
data: Dict[str, Any] = {
"endpoint": {
"branch_id": branch_id,
},
}
if endpoint_type:
data["endpoint"]["type"] = endpoint_type
if settings:
data["endpoint"]["settings"] = settings
resp = self.__request(
"POST",
f"/projects/{project_id}/endpoints",
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json=data,
)
assert resp.status_code == 201
return cast("Dict[str, Any]", resp.json())
def get_connection_uri(
self,
project_id: str,
branch_id: Optional[str] = None,
endpoint_id: Optional[str] = None,
database_name: str = "neondb",
role_name: str = "neondb_owner",
pooled: bool = True,
) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/connection_uri",
params={
"branch_id": branch_id,
"endpoint_id": endpoint_id,
"database_name": database_name,
"role_name": role_name,
"pooled": pooled,
},
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def get_branches(self, project_id: str) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/branches",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def get_endpoints(self, project_id: str) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/endpoints",
headers={
"Accept": "application/json",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def get_operations(self, project_id: str) -> Dict[str, Any]:
resp = self.__request(
"GET",
f"/projects/{project_id}/operations",
headers={
"Accept": "application/json",
"Authorization": f"Bearer {self.__neon_api_key}",
},
)
assert resp.status_code == 200
return cast("Dict[str, Any]", resp.json())
def wait_for_operation_to_finish(self, project_id: str):
has_running = True
while has_running:
has_running = False
operations = self.get_operations(project_id)["operations"]
for op in operations:
if op["status"] in {"scheduling", "running", "cancelling"}:
has_running = True
time.sleep(0.5)

View File

@@ -87,8 +87,6 @@ from fixtures.utils import (
)
from fixtures.utils import AuxFileStore as AuxFileStore # reexport
from .neon_api import NeonAPI
"""
This file contains pytest fixtures. A fixture is a test resource that can be
summoned by placing its name in the test's arguments.
@@ -186,25 +184,6 @@ def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Ite
yield versioned_dir
@pytest.fixture(scope="session")
def neon_api_key() -> str:
api_key = os.getenv("NEON_API_KEY")
if not api_key:
raise AssertionError("Set the NEON_API_KEY environment variable")
return api_key
@pytest.fixture(scope="session")
def neon_api_base_url() -> str:
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
@pytest.fixture(scope="session")
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "function"]:
"""Return either session of function scope, depending on TEST_SHARED_FIXTURES envvar.
@@ -2883,45 +2862,14 @@ class PgBin:
env.update(env_add)
return env
def _log_env(self, env: dict[str, str]) -> None:
env_s = {}
for k, v in env.items():
if k.startswith("PG") and k != "PGPASSWORD":
env_s[k] = v
log.debug(f"Environment: {env_s}")
def run_nonblocking(
self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[Union[str, Path]] = None,
) -> subprocess.Popen[Any]:
"""
Run one of the postgres binaries, not waiting for it to finish
The command should be in list form, e.g. ['pgbench', '-p', '55432']
All the necessary environment variables will be set.
If the first argument (the command name) doesn't include a path (no '/'
characters present), then it will be edited to include the correct path.
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
env = self._build_env(env)
self._log_env(env)
return subprocess.Popen(command, env=env, cwd=cwd, stdout=subprocess.PIPE, text=True)
def run(
self,
command: List[str],
env: Optional[Env] = None,
cwd: Optional[Union[str, Path]] = None,
) -> None:
):
"""
Run one of the postgres binaries, waiting for it to finish
Run one of the postgres binaries.
The command should be in list form, e.g. ['pgbench', '-p', '55432']
@@ -2932,10 +2880,11 @@ class PgBin:
If you want stdout/stderr captured to files, use `run_capture` instead.
"""
proc = self.run_nonblocking(command, env, cwd)
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, proc.args)
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
env = self._build_env(env)
subprocess.run(command, env=env, cwd=cwd, check=True)
def run_capture(
self,
@@ -2955,7 +2904,6 @@ class PgBin:
self._fixpath(command)
log.info(f"Running command '{' '.join(command)}'")
env = self._build_env(env)
self._log_env(env)
base_path, _, _ = subprocess_capture(
self.log_dir,
command,

View File

@@ -1,24 +1,8 @@
from __future__ import annotations
import time
import traceback
from typing import TYPE_CHECKING
import psycopg2
import psycopg2.extras
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_api import connection_parameters_to_env
from fixtures.neon_fixtures import AuxFileStore, logical_replication_sync
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.pg_version import PgVersion
from fixtures.neon_fixtures import AuxFileStore, NeonEnv, PgBin, logical_replication_sync
@pytest.mark.parametrize("pageserver_aux_file_policy", [AuxFileStore.V2])
@@ -42,6 +26,7 @@ def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg
vanilla_pg.safe_psql("truncate table pgbench_history")
connstr = endpoint.connstr().replace("'", "''")
print(f"connstr='{connstr}'")
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
# Wait logical replication channel to be established
@@ -57,286 +42,3 @@ def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg
sum_master = endpoint.safe_psql("select sum(abalance) from pgbench_accounts")[0][0]
sum_replica = vanilla_pg.safe_psql("select sum(abalance) from pgbench_accounts")[0][0]
assert sum_master == sum_replica
def check_pgbench_still_running(pgbench, label=""):
rc = pgbench.poll()
if rc is not None:
raise RuntimeError(f"{label} pgbench terminated early with return code {rc}")
def measure_logical_replication_lag(sub_cur, pub_cur, timeout_sec=600):
start = time.time()
pub_cur.execute("SELECT pg_current_wal_flush_lsn()")
pub_lsn = Lsn(pub_cur.fetchall()[0][0])
while (time.time() - start) < timeout_sec:
sub_cur.execute("SELECT latest_end_lsn FROM pg_catalog.pg_stat_subscription")
res = sub_cur.fetchall()[0][0]
if res:
log.info(f"subscriber_lsn={res}")
sub_lsn = Lsn(res)
log.info(f"Subscriber LSN={sub_lsn}, publisher LSN={pub_lsn}")
if sub_lsn >= pub_lsn:
return time.time() - start
time.sleep(0.5)
raise TimeoutError(f"Logical replication sync took more than {timeout_sec} sec")
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_subscriber_lag(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
"""
Creates a publisher and subscriber, runs pgbench inserts on publisher and pgbench selects
on subscriber. Periodically restarts subscriber while still running the inserts, and
measures how long sync takes after restart.
"""
test_duration_min = 60
sync_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
pub_project = neon_api.create_project(pg_version)
pub_project_id = pub_project["project"]["id"]
neon_api.wait_for_operation_to_finish(pub_project_id)
error_occurred = False
try:
sub_project = neon_api.create_project(pg_version)
sub_project_id = sub_project["project"]["id"]
sub_endpoint_id = sub_project["endpoints"][0]["id"]
neon_api.wait_for_operation_to_finish(sub_project_id)
try:
pub_env = connection_parameters_to_env(
pub_project["connection_uris"][0]["connection_parameters"]
)
sub_env = connection_parameters_to_env(
sub_project["connection_uris"][0]["connection_parameters"]
)
pub_connstr = pub_project["connection_uris"][0]["connection_uri"]
sub_connstr = sub_project["connection_uris"][0]["connection_uri"]
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
pub_cur.execute(
"create publication pub1 for table pgbench_accounts, pgbench_history"
)
sub_cur.execute(
f"create subscription sub1 connection '{pub_connstr}' publication pub1"
)
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
zenbenchmark.record(
"initial_sync_lag", initial_sync_lag, "s", MetricReport.LOWER_IS_BETTER
)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env
)
try:
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
try:
start = time.time()
while time.time() - start < test_duration_min * 60:
time.sleep(sync_interval_min * 60)
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
with psycopg2.connect(pub_connstr) as pub_conn, psycopg2.connect(
sub_connstr
) as sub_conn:
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
sub_workload.terminate()
neon_api.restart_endpoint(
sub_project_id,
sub_endpoint_id,
)
neon_api.wait_for_operation_to_finish(sub_project_id)
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
# Measure storage to make sure replication information isn't bloating storage
sub_storage = neon_api.get_project_details(sub_project_id)["project"][
"synthetic_storage_size"
]
pub_storage = neon_api.get_project_details(pub_project_id)["project"][
"synthetic_storage_size"
]
zenbenchmark.record(
"sub_storage", sub_storage, "B", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"pub_storage", pub_storage, "B", MetricReport.LOWER_IS_BETTER
)
finally:
sub_workload.terminate()
finally:
pub_workload.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
if not error_occurred:
neon_api.delete_project(sub_project_id)
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
assert not error_occurred
neon_api.delete_project(pub_project_id)
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_publisher_restart(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
"""
Creates a publisher and subscriber, runs pgbench inserts on publisher and pgbench selects
on subscriber. Periodically restarts publisher (to exercise on-demand WAL download), and
measures how long sync takes after restart.
"""
test_duration_min = 60
sync_interval_min = 5
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
pub_project = neon_api.create_project(pg_version)
pub_project_id = pub_project["project"]["id"]
pub_endpoint_id = pub_project["endpoints"][0]["id"]
neon_api.wait_for_operation_to_finish(pub_project_id)
error_occurred = False
try:
sub_project = neon_api.create_project(pg_version)
sub_project_id = sub_project["project"]["id"]
neon_api.wait_for_operation_to_finish(sub_project_id)
try:
pub_env = connection_parameters_to_env(
pub_project["connection_uris"][0]["connection_parameters"]
)
sub_env = connection_parameters_to_env(
sub_project["connection_uris"][0]["connection_parameters"]
)
pub_connstr = pub_project["connection_uris"][0]["connection_uri"]
sub_connstr = sub_project["connection_uris"][0]["connection_uri"]
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=pub_env)
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=sub_env)
pub_conn = psycopg2.connect(pub_connstr)
sub_conn = psycopg2.connect(sub_connstr)
pub_conn.autocommit = True
sub_conn.autocommit = True
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
sub_cur.execute("truncate table pgbench_accounts")
sub_cur.execute("truncate table pgbench_history")
pub_cur.execute(
"create publication pub1 for table pgbench_accounts, pgbench_history"
)
sub_cur.execute(
f"create subscription sub1 connection '{pub_connstr}' publication pub1"
)
initial_sync_lag = measure_logical_replication_lag(sub_cur, pub_cur)
pub_conn.close()
sub_conn.close()
zenbenchmark.record(
"initial_sync_lag", initial_sync_lag, "s", MetricReport.LOWER_IS_BETTER
)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=pub_env
)
try:
sub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=sub_env,
)
try:
start = time.time()
while time.time() - start < test_duration_min * 60:
time.sleep(sync_interval_min * 60)
check_pgbench_still_running(pub_workload, "pub")
check_pgbench_still_running(sub_workload, "sub")
pub_workload.terminate()
neon_api.restart_endpoint(
pub_project_id,
pub_endpoint_id,
)
neon_api.wait_for_operation_to_finish(pub_project_id)
pub_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"],
env=pub_env,
)
with psycopg2.connect(pub_connstr) as pub_conn, psycopg2.connect(
sub_connstr
) as sub_conn:
with pub_conn.cursor() as pub_cur, sub_conn.cursor() as sub_cur:
lag = measure_logical_replication_lag(sub_cur, pub_cur)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
# Measure storage to make sure replication information isn't bloating storage
sub_storage = neon_api.get_project_details(sub_project_id)["project"][
"synthetic_storage_size"
]
pub_storage = neon_api.get_project_details(pub_project_id)["project"][
"synthetic_storage_size"
]
zenbenchmark.record(
"sub_storage", sub_storage, "B", MetricReport.LOWER_IS_BETTER
)
zenbenchmark.record(
"pub_storage", pub_storage, "B", MetricReport.LOWER_IS_BETTER
)
finally:
sub_workload.terminate()
finally:
pub_workload.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
if not error_occurred:
neon_api.delete_project(sub_project_id)
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
assert not error_occurred
neon_api.delete_project(pub_project_id)

View File

@@ -1,296 +0,0 @@
from __future__ import annotations
import csv
import os
import subprocess
import time
import traceback
from pathlib import Path
from typing import TYPE_CHECKING
import psycopg2
import psycopg2.extras
import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.common_types import Lsn
from fixtures.log_helper import log
from fixtures.neon_api import connection_parameters_to_env
from fixtures.pg_version import PgVersion
if TYPE_CHECKING:
from typing import Any, List, Optional
from fixtures.benchmark_fixture import NeonBenchmarker
from fixtures.neon_api import NeonAPI
from fixtures.neon_fixtures import PgBin
# Granularity of ~0.5 sec
def measure_replication_lag(master, replica, timeout_sec=600):
start = time.time()
master.execute("SELECT pg_current_wal_flush_lsn()")
master_lsn = Lsn(master.fetchall()[0][0])
while (time.time() - start) < timeout_sec:
replica.execute("select pg_last_wal_replay_lsn()")
replica_lsn = replica.fetchall()[0][0]
if replica_lsn:
if Lsn(replica_lsn) >= master_lsn:
return time.time() - start
time.sleep(0.5)
raise TimeoutError(f"Replication sync took more than {timeout_sec} sec")
def check_pgbench_still_running(pgbench):
rc = pgbench.poll()
if rc is not None:
raise RuntimeError(f"Pgbench terminated early with return code {rc}")
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_ro_replica_lag(
pg_bin: PgBin,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
test_duration_min = 60
sync_interval_min = 10
pgbench_duration = f"-T{test_duration_min * 60 * 2}"
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
error_occurred = False
try:
branch_id = project["branch"]["id"]
master_connstr = project["connection_uris"][0]["connection_uri"]
master_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
replica = neon_api.create_endpoint(
project_id,
branch_id,
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
replica_env = master_env.copy()
replica_env["PGHOST"] = replica["endpoint"]["host"]
neon_api.wait_for_operation_to_finish(project_id)
replica_connstr = neon_api.get_connection_uri(
project_id,
endpoint_id=replica["endpoint"]["id"],
)["uri"]
pg_bin.run_capture(["pgbench", "-i", "-s100"], env=master_env)
master_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-Mprepared"],
env=master_env,
)
try:
replica_workload = pg_bin.run_nonblocking(
["pgbench", "-c10", pgbench_duration, "-S"],
env=replica_env,
)
try:
start = time.time()
while time.time() - start < test_duration_min * 60:
check_pgbench_still_running(master_workload)
check_pgbench_still_running(replica_workload)
time.sleep(sync_interval_min * 60)
with psycopg2.connect(master_connstr) as conn_master, psycopg2.connect(
replica_connstr
) as conn_replica:
with conn_master.cursor() as cur_master, conn_replica.cursor() as cur_replica:
lag = measure_replication_lag(cur_master, cur_replica)
log.info(f"Replica lagged behind master by {lag} seconds")
zenbenchmark.record("replica_lag", lag, "s", MetricReport.LOWER_IS_BETTER)
finally:
replica_workload.terminate()
finally:
master_workload.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception: {e}")
log.error(traceback.format_exc())
finally:
assert not error_occurred # Fail the test if an error occurred
neon_api.delete_project(project_id)
def report_pgbench_aggregate_intervals(
output_dir: Path,
prefix: str,
zenbenchmark: NeonBenchmarker,
):
for filename in os.listdir(output_dir):
if filename.startswith(prefix):
# The file will be in the form <prefix>_<node>.<pid>
# So we first lop off the .<pid>, and then lop off the prefix and the _
node = filename.split(".")[0][len(prefix) + 1 :]
with open(output_dir / filename) as f:
reader = csv.reader(f, delimiter=" ")
for line in reader:
num_transactions = int(line[1])
if num_transactions == 0:
continue
sum_latency = int(line[2])
sum_lag = int(line[3])
zenbenchmark.record(
f"{node}_num_txns", num_transactions, "txns", MetricReport.HIGHER_IS_BETTER
)
zenbenchmark.record(
f"{node}_avg_latency",
sum_latency / num_transactions,
"s",
MetricReport.LOWER_IS_BETTER,
)
zenbenchmark.record(
f"{node}_avg_lag",
sum_lag / num_transactions,
"s",
MetricReport.LOWER_IS_BETTER,
)
@pytest.mark.remote_cluster
@pytest.mark.timeout(2 * 60 * 60)
def test_replication_start_stop(
pg_bin: PgBin,
test_output_dir: Path,
neon_api: NeonAPI,
pg_version: PgVersion,
zenbenchmark: NeonBenchmarker,
):
"""
Cycles through different configurations of read replicas being enabled disabled. The whole time,
there's a pgbench read/write workload going on the master. For each replica, we either turn it
on or off, and see how long it takes to catch up after some set amount of time of replicating
the pgbench.
"""
prefix = "pgbench_agg"
num_replicas = 2
configuration_test_time_sec = 10 * 60
pgbench_duration = f"-T{2 ** num_replicas * configuration_test_time_sec}"
error_occurred = False
project = neon_api.create_project(pg_version)
project_id = project["project"]["id"]
neon_api.wait_for_operation_to_finish(project_id)
try:
branch_id = project["branch"]["id"]
master_connstr = project["connection_uris"][0]["connection_uri"]
master_env = connection_parameters_to_env(
project["connection_uris"][0]["connection_parameters"]
)
replicas = []
for _ in range(num_replicas):
replicas.append(
neon_api.create_endpoint(
project_id,
branch_id,
endpoint_type="read_only",
settings={"pg_settings": {"hot_standby_feedback": "on"}},
)
)
neon_api.wait_for_operation_to_finish(project_id)
replica_connstr = [
neon_api.get_connection_uri(
project_id,
endpoint_id=replicas[i]["endpoint"]["id"],
)["uri"]
for i in range(num_replicas)
]
replica_env = [master_env.copy() for _ in range(num_replicas)]
for i in range(num_replicas):
replica_env[i]["PGHOST"] = replicas[i]["endpoint"]["host"]
pg_bin.run_capture(["pgbench", "-i", "-s10"], env=master_env)
# Sync replicas
with psycopg2.connect(master_connstr) as conn_master:
with conn_master.cursor() as cur_master:
for i in range(num_replicas):
conn_replica = psycopg2.connect(replica_connstr[i])
measure_replication_lag(cur_master, conn_replica.cursor())
master_pgbench = pg_bin.run_nonblocking(
[
"pgbench",
"-c10",
pgbench_duration,
"-Mprepared",
"--log",
f"--log-prefix={test_output_dir}/{prefix}_master",
f"--aggregate-interval={configuration_test_time_sec}",
],
env=master_env,
)
replica_pgbench: List[Optional[subprocess.Popen[Any]]] = [None for _ in range(num_replicas)]
# Use the bits of iconfig to tell us which configuration we are on. For example
# a iconfig of 2 is 10 in binary, indicating replica 0 is suspended and replica 1 is
# alive.
for iconfig in range((1 << num_replicas) - 1, -1, -1):
def replica_enabled(iconfig: int = iconfig):
return bool((iconfig >> 1) & 1)
# Change configuration
for ireplica in range(num_replicas):
if replica_enabled() and replica_pgbench[ireplica] is None:
replica_pgbench[ireplica] = pg_bin.run_nonblocking(
[
"pgbench",
"-c10",
"-S",
pgbench_duration,
"--log",
f"--log-prefix={test_output_dir}/{prefix}_replica_{ireplica}",
f"--aggregate-interval={configuration_test_time_sec}",
],
env=replica_env[ireplica],
)
elif not replica_enabled() and replica_pgbench[ireplica] is not None:
pgb = replica_pgbench[ireplica]
assert pgb is not None
pgb.terminate()
pgb.wait()
replica_pgbench[ireplica] = None
neon_api.suspend_endpoint(
project_id,
replicas[ireplica]["endpoint"]["id"],
)
neon_api.wait_for_operation_to_finish(project_id)
time.sleep(configuration_test_time_sec)
with psycopg2.connect(master_connstr) as conn_master:
with conn_master.cursor() as cur_master:
for ireplica in range(num_replicas):
replica_conn = psycopg2.connect(replica_connstr[ireplica])
lag = measure_replication_lag(cur_master, replica_conn.cursor())
zenbenchmark.record(
f"Replica {ireplica} lag", lag, "s", MetricReport.LOWER_IS_BETTER
)
log.info(
f"Replica {ireplica} lagging behind master by {lag} seconds after configuration {iconfig:>b}"
)
master_pgbench.terminate()
except Exception as e:
error_occurred = True
log.error(f"Caught exception {e}")
log.error(traceback.format_exc())
finally:
assert not error_occurred
neon_api.delete_project(project_id)
# Only report results if we didn't error out
report_pgbench_aggregate_intervals(test_output_dir, prefix, zenbenchmark)

View File

@@ -8,11 +8,8 @@ from typing import TYPE_CHECKING, cast
import pytest
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
check_restored_datadir_content,
tenant_get_shards,
)
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import s3_storage
@@ -24,97 +21,6 @@ if TYPE_CHECKING:
from pytest import CaptureFixture
TENANT_CONF = {
# Scaled down thresholds so that we are exercising the pageserver beyond just writing
# ephemeral/L0 layers, and because debug-mode code is slow to read from full sized ephemeral layer files.
"pitr_interval": "60s",
"checkpoint_distance": f"{8 * 1024 * 1024}",
"compaction_target_size": f"{8 * 1024 * 1024}",
}
# # Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
# # There should have been compactions mid-test as well, this final check is in addition those.
# for (shard, pageserver) in tenant_get_shards(env, env.initial_tenant):
# pageserver.http_client().timeline_checkpoint(env.initial_tenant, env.initial_timeline, force_repartition=True, force_image_layer_creation=True)
def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: Endpoint):
"""
After running some opaque tests that create interesting content in a timeline, run
some generic integrity checks that the storage stack is able to reproduce the written
data properly.
"""
ignored_files: Optional[list[str]] = None
# Neon handles unlogged relations in a special manner. During a
# basebackup, we ship the init fork as the main fork. This presents a
# problem in that the endpoint's data directory and the basebackup will
# have differences and will fail the eventual file comparison.
#
# Unlogged tables were introduced in version 9.1. ALTER TABLE grew
# support for setting the persistence of a table in 9.5. The reason that
# this doesn't affect versions < 15 (but probably would between 9.1 and
# 9.5) is that all the regression tests that deal with unlogged tables
# up until that point dropped the unlogged tables or set them to logged
# at some point during the test.
#
# In version 15, Postgres grew support for unlogged sequences, and with
# that came a few more regression tests. These tests did not all drop
# the unlogged tables/sequences prior to finishing.
#
# But unlogged sequences came with a bug in that, sequences didn't
# inherit the persistence of their "parent" tables if they had one. This
# was fixed and backported to 15, thus exacerbating our problem a bit.
#
# So what we can do is just ignore file differences between the data
# directory and basebackup for unlogged relations.
results = cast(
"list[tuple[str, str]]",
endpoint.safe_psql(
"""
SELECT
relkind,
pg_relation_filepath(
pg_filenode_relation(reltablespace, relfilenode)
) AS unlogged_relation_paths
FROM pg_class
WHERE relpersistence = 'u'
""",
dbname=db_name,
),
)
unlogged_relation_files: list[str] = []
for r in results:
unlogged_relation_files.append(r[1])
# This is related to the following Postgres commit:
#
# commit ccadf73163ca88bdaa74b8223d4dde05d17f550b
# Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
# Date: 2023-08-23 09:21:31 -0500
#
# Use the buffer cache when initializing an unlogged index.
#
# This patch was backpatched to 16. Without it, the LSN in the
# page header would be 0/0 in the data directory, which wouldn't
# match the LSN generated during the basebackup, thus creating
# a difference.
if env.pg_version <= PgVersion.V15 and r[0] == "i":
unlogged_relation_files.append(f"{r[1]}_init")
ignored_files = unlogged_relation_files
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
# Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
# There should have been compactions mid-test as well, this final check is in addition those.
for shard, pageserver in tenant_get_shards(env, env.initial_tenant):
pageserver.http_client().timeline_checkpoint(
shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True
)
# Run the main PostgreSQL regression tests, in src/test/regress.
#
@pytest.mark.timeout(600)
@@ -139,10 +45,7 @@ def test_pg_regress(
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF,
initial_tenant_shard_count=shard_count,
)
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("main")
@@ -181,7 +84,67 @@ def test_pg_regress(
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
post_checks(env, test_output_dir, DBNAME, endpoint)
ignored_files: Optional[list[str]] = None
# Neon handles unlogged relations in a special manner. During a
# basebackup, we ship the init fork as the main fork. This presents a
# problem in that the endpoint's data directory and the basebackup will
# have differences and will fail the eventual file comparison.
#
# Unlogged tables were introduced in version 9.1. ALTER TABLE grew
# support for setting the persistence of a table in 9.5. The reason that
# this doesn't affect versions < 15 (but probably would between 9.1 and
# 9.5) is that all the regression tests that deal with unlogged tables
# up until that point dropped the unlogged tables or set them to logged
# at some point during the test.
#
# In version 15, Postgres grew support for unlogged sequences, and with
# that came a few more regression tests. These tests did not all drop
# the unlogged tables/sequences prior to finishing.
#
# But unlogged sequences came with a bug in that, sequences didn't
# inherit the persistence of their "parent" tables if they had one. This
# was fixed and backported to 15, thus exacerbating our problem a bit.
#
# So what we can do is just ignore file differences between the data
# directory and basebackup for unlogged relations.
results = cast(
"list[tuple[str, str]]",
endpoint.safe_psql(
"""
SELECT
relkind,
pg_relation_filepath(
pg_filenode_relation(reltablespace, relfilenode)
) AS unlogged_relation_paths
FROM pg_class
WHERE relpersistence = 'u'
""",
dbname=DBNAME,
),
)
unlogged_relation_files: list[str] = []
for r in results:
unlogged_relation_files.append(r[1])
# This is related to the following Postgres commit:
#
# commit ccadf73163ca88bdaa74b8223d4dde05d17f550b
# Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
# Date: 2023-08-23 09:21:31 -0500
#
# Use the buffer cache when initializing an unlogged index.
#
# This patch was backpatched to 16. Without it, the LSN in the
# page header would be 0/0 in the data directory, which wouldn't
# match the LSN generated during the basebackup, thus creating
# a difference.
if env.pg_version <= PgVersion.V15 and r[0] == "i":
unlogged_relation_files.append(f"{r[1]}_init")
ignored_files = unlogged_relation_files
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
# Run the PostgreSQL "isolation" tests, in src/test/isolation.
@@ -196,20 +159,16 @@ def test_isolation(
pg_distrib_dir: Path,
shard_count: Optional[int],
):
DBNAME = "isolation_regression"
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF, initial_tenant_shard_count=shard_count
)
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# Connect to postgres and create a database called "regression".
# isolation tests use prepared transactions, so enable them
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=100"])
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
endpoint.safe_psql("CREATE DATABASE isolation_regression")
# Create some local directories for pg_isolation_regress to run in.
runpath = test_output_dir / "regress"
@@ -243,9 +202,6 @@ def test_isolation(
with capsys.disabled():
pg_bin.run(pg_isolation_regress_command, env=env_vars, cwd=runpath)
# This fails with a mismatch on `pg_multixact/offsets/0000`
# post_checks(env, test_output_dir, DBNAME, endpoint)
# Run extra Neon-specific pg_regress-based tests. The tests and their
# schedule file are in the sql_regress/ directory.
@@ -259,19 +215,15 @@ def test_sql_regress(
pg_distrib_dir: Path,
shard_count: Optional[int],
):
DBNAME = "regression"
if shard_count is not None:
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()
env = neon_env_builder.init_start(
initial_tenant_conf=TENANT_CONF, initial_tenant_shard_count=shard_count
)
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
# Connect to postgres and create a database called "regression".
endpoint = env.endpoints.create_start("main")
endpoint.safe_psql(f"CREATE DATABASE {DBNAME}")
endpoint.safe_psql("CREATE DATABASE regression")
# Create some local directories for pg_regress to run in.
runpath = test_output_dir / "regress"
@@ -306,4 +258,4 @@ def test_sql_regress(
with capsys.disabled():
pg_bin.run(pg_regress_command, env=env_vars, cwd=runpath)
post_checks(env, test_output_dir, DBNAME, endpoint)
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -1,11 +1,7 @@
from __future__ import annotations
import random
import time
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fixtures.neon_fixtures import NeonEnv
from fixtures.neon_fixtures import NeonEnv
def test_physical_replication(neon_simple_env: NeonEnv):

View File

@@ -54,4 +54,4 @@ def test_subscriber_restart(neon_simple_env: NeonEnv):
pcur.execute(f"INSERT into t values ({n_records}, 0)")
n_records += 1
with sub.cursor() as scur:
wait_until(60, 0.5, check_that_changes_propagated)
wait_until(10, 0.5, check_that_changes_propagated)

View File

@@ -720,30 +720,9 @@ def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path,
They should have the same effect.
"""
def assert_size_approx_equal_for_lease_test(size_lease, size_branch):
"""
Tests that evaluate sizes are checking the pageserver space consumption
that sits many layers below the user input. The exact space needed
varies slightly depending on postgres behavior.
Rather than expecting postgres to be determinstic and occasionally
failing the test, we permit sizes for the same data to vary by a few pages.
"""
# FIXME(yuchen): The delta is too large, used as temp solution to pass the test reliably.
# Investigate and reduce the threshold.
threshold = 22 * 8272
log.info(
f"delta: size_branch({size_branch}) - size_lease({size_lease}) = {size_branch - size_lease}"
)
assert size_lease == pytest.approx(size_branch, abs=threshold)
conf = {
"pitr_interval": "0s" if zero_gc else "3600s",
"gc_period": "0s",
"compaction_period": "0s",
}
env = neon_env_builder.init_start(initial_tenant_conf=conf)
@@ -755,7 +734,7 @@ def test_lsn_lease_size(neon_env_builder: NeonEnvBuilder, test_output_dir: Path,
tenant, timeline = env.neon_cli.create_tenant(conf=conf)
lease_res = insert_with_action(env, tenant, timeline, test_output_dir, action="lease")
assert_size_approx_equal_for_lease_test(lease_res, ro_branch_res)
assert_size_approx_equal(lease_res, ro_branch_res)
def insert_with_action(
@@ -775,11 +754,7 @@ def insert_with_action(
"""
client = env.pageserver.http_client()
with env.endpoints.create_start(
"main",
tenant_id=tenant,
config_lines=["autovacuum=off"],
) as ep:
with env.endpoints.create_start("main", tenant_id=tenant) as ep:
initial_size = client.tenant_size(tenant)
log.info(f"initial size: {initial_size}")

View File

@@ -152,12 +152,10 @@ def test_timeline_size_quota_on_startup(neon_env_builder: NeonEnvBuilder):
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
size_limit_mb = 30
endpoint_main = env.endpoints.create(
"test_timeline_size_quota_on_startup",
# Set small limit for the test
config_lines=[f"neon.max_cluster_size={size_limit_mb}MB"],
config_lines=["neon.max_cluster_size=30MB"],
)
endpoint_main.start()
@@ -167,39 +165,17 @@ def test_timeline_size_quota_on_startup(neon_env_builder: NeonEnvBuilder):
# Insert many rows. This query must fail because of space limit
try:
def write_rows(count):
for _i in range(count):
cur.execute(
"""
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100) g
for _i in range(5000):
cur.execute(
"""
)
INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100) g
"""
)
# Write some data that exceeds limit, then let the pageserver ingest it to guarantee that some feedback has made it to
# the safekeeper, then try to write some more. We expect either the initial writes or the ones after
# the wait_for_last_flush_lsn to generate an exception.
#
# Without the wait_for_last_flush_lsn, the size limit sometimes isn't enforced (see https://github.com/neondatabase/neon/issues/6562)
write_rows(2500)
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
logical_size = env.pageserver.http_client().timeline_detail(
env.initial_tenant, new_timeline_id
)["current_logical_size"]
assert logical_size > size_limit_mb * 1024 * 1024
write_rows(2500)
# If we get here, the timeline size limit failed. Find out from the pageserver how large it
# thinks the timeline is.
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
logical_size = env.pageserver.http_client().timeline_detail(
env.initial_tenant, new_timeline_id
)["current_logical_size"]
log.error(
f"Query unexpectedly succeeded, pageserver logical size is {logical_size}"
)
# If we get here, the timeline size limit failed
log.error("Query unexpectedly succeeded")
raise AssertionError()
except psycopg2.errors.DiskFull as err:

View File

@@ -30,12 +30,13 @@ chrono = { version = "0.4", default-features = false, features = ["clock", "serd
clap = { version = "4", features = ["derive", "string"] }
clap_builder = { version = "4", default-features = false, features = ["color", "help", "std", "string", "suggestions", "usage"] }
crossbeam-utils = { version = "0.8" }
deranged = { version = "0.3", default-features = false, features = ["powerfmt", "serde", "std"] }
either = { version = "1" }
fail = { version = "0.5", default-features = false, features = ["failpoints"] }
futures-channel = { version = "0.3", features = ["sink"] }
futures-core = { version = "0.3" }
futures-executor = { version = "0.3" }
futures-io = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown = { version = "0.14", features = ["raw"] }
@@ -68,7 +69,6 @@ sha2 = { version = "0.10", features = ["asm"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
subtle = { version = "2" }
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
tikv-jemalloc-sys = { version = "0.5" }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
@@ -106,9 +106,7 @@ num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] }
proc-macro2 = { version = "1" }
prost = { version = "0.11" }
quote = { version = "1" }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }