Compare commits

..

4 Commits

Author SHA1 Message Date
Arpad Müller
521a9590cf Merge branch 'arpad/rustls_everywhere' into arpad/musl_libc_v2 2024-05-25 02:30:55 +02:00
Arpad Müller
3edf932e80 Use tokio-epoll-uring patched for statx-sys 2024-05-25 02:30:27 +02:00
Arpad Müller
b3df345984 Deny postgres-native-tls 2024-05-25 02:09:34 +02:00
Arpad Müller
cdcfb504b6 Drop postgres-native-tls in favour of tokio-postgres-rustls 2024-05-25 02:07:20 +02:00
59 changed files with 332 additions and 1434 deletions

View File

@@ -38,11 +38,6 @@ on:
description: 'AWS-RDS and AWS-AURORA normally only run on Saturday. Set this to true to run them on every workflow_dispatch'
required: false
default: false
run_only_pgvector_tests:
type: boolean
description: 'Run pgvector tests but no other tests. If not set, all tests including pgvector tests will be run'
required: false
default: false
defaults:
run:
@@ -55,7 +50,6 @@ concurrency:
jobs:
bench:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
@@ -126,7 +120,6 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
generate-matrices:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
#
# Available platforms:
@@ -204,7 +197,6 @@ jobs:
echo "matrix=$(echo "$matrix" | jq --compact-output '.')" >> $GITHUB_OUTPUT
pgbench-compare:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
needs: [ generate-matrices ]
strategy:
@@ -351,92 +343,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
pgbench-pgvector:
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "15m"
TEST_PG_BENCH_SCALES_MATRIX: "1"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 16
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-captest-pgvector"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/build-tools:pinned
options: --init
steps:
- uses: actions/checkout@v4
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Set up Connection String
id: set-up-connstr
run: |
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR }}
echo "connstr=${CONNSTR}" >> $GITHUB_OUTPUT
QUERIES=("SELECT version()")
QUERIES+=("SHOW neon.tenant_id")
QUERIES+=("SHOW neon.timeline_id")
for q in "${QUERIES[@]}"; do
psql ${CONNSTR} -c "${q}"
done
- name: Benchmark pgvector hnsw indexing
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_perf_olap.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgvector_indexing
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Benchmark pgvector hnsw queries
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Create Allure report
if: ${{ !cancelled() }}
uses: ./.github/actions/allure-report-generate
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing neon-captest-pgvector: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
clickbench-compare:
# ClichBench DB for rds-aurora and rds-Postgres deployed to the same clusters
# we use for performance testing in pgbench-compare.
@@ -445,7 +351,7 @@ jobs:
#
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
if: ${{ !cancelled() }}
needs: [ generate-matrices, pgbench-compare ]
strategy:
@@ -549,7 +455,7 @@ jobs:
# We might change it after https://github.com/neondatabase/neon/issues/2900.
#
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
if: ${{ !cancelled() }}
needs: [ generate-matrices, clickbench-compare ]
strategy:
@@ -651,7 +557,7 @@ jobs:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
user-examples-compare:
if: ${{ !cancelled() && (github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null) }}
if: ${{ !cancelled() }}
needs: [ generate-matrices, tpch-compare ]
strategy:

149
Cargo.lock generated
View File

@@ -776,6 +776,7 @@ dependencies = [
"pin-project",
"serde",
"time",
"tz-rs",
"url",
"uuid",
]
@@ -1290,6 +1291,12 @@ dependencies = [
"tiny-keccak",
]
[[package]]
name = "const_fn"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935"
[[package]]
name = "const_format"
version = "0.2.30"
@@ -1969,6 +1976,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@@ -2598,6 +2620,19 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper 0.14.26",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-util"
version = "0.1.3"
@@ -3133,6 +3168,24 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "nix"
version = "0.25.1"
@@ -3303,6 +3356,15 @@ dependencies = [
"libc",
]
[[package]]
name = "num_threads"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44"
dependencies = [
"libc",
]
[[package]]
name = "oauth2"
version = "4.4.2"
@@ -3352,12 +3414,50 @@ version = "11.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
[[package]]
name = "openssl"
version = "0.10.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79a4c6c3a2b158f7f8f2a2fc5a969fa3a068df6fc9dbb4a43845436e3af7c800"
dependencies = [
"bitflags 2.4.1",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.52",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.96"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3812c071ba60da8b5677cc12bcb1d42989a65553772897a7e0355545a819838f"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "opentelemetry"
version = "0.20.0"
@@ -4302,7 +4402,6 @@ dependencies = [
"http 1.1.0",
"http-body-util",
"humantime",
"humantime-serde",
"hyper 0.14.26",
"hyper 1.2.0",
"hyper-util",
@@ -4338,7 +4437,6 @@ dependencies = [
"rstest",
"rustc-hash",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"rustls-pemfile 2.1.1",
"scopeguard",
"serde",
@@ -4368,6 +4466,7 @@ dependencies = [
"utils",
"uuid",
"walkdir",
"webpki-roots 0.26.1",
"workspace_hack",
"x509-parser",
]
@@ -4674,21 +4773,20 @@ dependencies = [
"http 0.2.9",
"http-body 0.4.5",
"hyper 0.14.26",
"hyper-rustls 0.24.0",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls 0.21.11",
"rustls-pemfile 1.0.2",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-rustls 0.24.0",
"tokio-native-tls",
"tokio-util",
"tower-service",
"url",
@@ -4696,7 +4794,6 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams 0.3.0",
"web-sys",
"webpki-roots 0.25.2",
"winreg 0.50.0",
]
@@ -5122,7 +5219,6 @@ dependencies = [
"hex",
"histogram",
"itertools",
"once_cell",
"pageserver",
"pageserver_api",
"postgres_ffi",
@@ -5130,7 +5226,6 @@ dependencies = [
"remote_storage",
"reqwest 0.12.4",
"rustls 0.22.4",
"rustls-native-certs 0.7.0",
"serde",
"serde_json",
"serde_with",
@@ -5145,6 +5240,7 @@ dependencies = [
"tracing-appender",
"tracing-subscriber",
"utils",
"webpki-roots 0.26.1",
"workspace_hack",
]
@@ -5735,6 +5831,15 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "statx-sys"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69c325f46f705b7a66fb87f0ebb999524a7363f30f05d373277b4ef7f409fe87"
dependencies = [
"libc",
]
[[package]]
name = "storage_broker"
version = "0.1.0"
@@ -6081,6 +6186,8 @@ checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc"
dependencies = [
"itoa",
"js-sys",
"libc",
"num_threads",
"serde",
"time-core",
"time-macros",
@@ -6156,7 +6263,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#342ddd197a060a8354e8f11f4d12994419fff939"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=arpad/statx_sys#ca8446b8edb5e0aef88520f2fc209a13a834fd25"
dependencies = [
"futures",
"nix 0.26.4",
@@ -6190,6 +6297,16 @@ dependencies = [
"syn 2.0.52",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-postgres"
version = "0.7.7"
@@ -6596,6 +6713,15 @@ version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba"
[[package]]
name = "tz-rs"
version = "0.6.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33851b15c848fad2cf4b105c6bb66eb9512b6f6c44a4b13f57c53c73c707e2b4"
dependencies = [
"const_fn",
]
[[package]]
name = "uname"
version = "0.1.1"
@@ -6668,11 +6794,12 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#342ddd197a060a8354e8f11f4d12994419fff939"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=arpad/statx_sys#ca8446b8edb5e0aef88520f2fc209a13a834fd25"
dependencies = [
"bytes",
"io-uring",
"libc",
"statx-sys",
]
[[package]]

View File

@@ -46,10 +46,10 @@ anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
atomic-take = "1.1.0"
azure_core = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"] }
azure_identity = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_storage_blobs = { version = "0.19", default-features = false, features = ["enable_reqwest_rustls"] }
azure_core = "0.19"
azure_identity = "0.19"
azure_storage = "0.19"
azure_storage_blobs = "0.19"
flate2 = "1.0.26"
async-stream = "0.3"
async-trait = "0.1"
@@ -114,6 +114,7 @@ md5 = "0.7.0"
measured = { version = "0.0.21", features=["lasso"] }
measured-process = { version = "0.0.21" }
memoffset = "0.8"
native-tls = "0.2"
nix = { version = "0.27", features = ["fs", "process", "socket", "signal", "poll"] }
notify = "6.0.0"
num_cpus = "1.15"
@@ -170,7 +171,7 @@ thiserror = "1.0"
tikv-jemallocator = "0.5"
tikv-jemalloc-ctl = "0.5"
tokio = { version = "1.17", features = ["macros"] }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "arpad/statx_sys" }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.11.0"
tokio-rustls = "0.25"
@@ -190,7 +191,7 @@ url = "2.2"
urlencoding = "2.1"
uuid = { version = "1.6.1", features = ["v4", "v7", "serde"] }
walkdir = "2.3.2"
rustls-native-certs = "0.7"
webpki-roots = "0.26"
x509-parser = "0.15"
## TODO replace this with tracing

View File

@@ -100,11 +100,8 @@ name = "async-executor"
name = "smol"
[[bans.deny]]
# We want to use rustls instead of the platform's native tls implementation.
name = "native-tls"
[[bans.deny]]
name = "openssl"
# We want to use rustls instead of native-tls.
name = "postgres-native-tls"
# This section is considered when running `cargo deny check sources`.
# More documentation about the 'sources' section can be found here:

View File

@@ -9,33 +9,6 @@ use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use tracing::*;
/// Declare a failpoint that can use the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint {}", $name);
fail::fail_point!($name);
}
})
.await
.expect("spawn_blocking");
}
};
($name:literal, $cond:expr) => {
if cfg!(feature = "testing") {
if $cond {
pausable_failpoint!($name)
}
}
};
}
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the

View File

@@ -42,7 +42,6 @@ use utils::completion;
use utils::crashsafe::path_with_suffix_extension;
use utils::failpoint_support;
use utils::fs_ext;
use utils::pausable_failpoint;
use utils::sync::gate::Gate;
use utils::sync::gate::GateGuard;
use utils::timeout::timeout_cancellable;
@@ -123,6 +122,32 @@ use utils::{
lsn::{Lsn, RecordLsn},
};
/// Declare a failpoint that can use the `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
macro_rules! pausable_failpoint {
($name:literal) => {
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint {}", $name);
fail::fail_point!($name);
}
})
.await
.expect("spawn_blocking");
}
};
($name:literal, $cond:expr) => {
if cfg!(feature = "testing") {
if $cond {
pausable_failpoint!($name)
}
}
};
}
pub mod blob_io;
pub mod block_io;
pub mod vectored_blob_io;

View File

@@ -8,7 +8,7 @@ use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, instrument, Instrument};
use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use utils::{backoff, completion, crashsafe, fs_ext, id::TimelineId};
use crate::{
config::PageServerConf,

View File

@@ -197,7 +197,6 @@ pub(crate) use upload::upload_initdb_dir;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::pausable_failpoint;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};

View File

@@ -9,7 +9,7 @@ use std::time::SystemTime;
use tokio::fs::{self, File};
use tokio::io::AsyncSeekExt;
use tokio_util::sync::CancellationToken;
use utils::{backoff, pausable_failpoint};
use utils::backoff;
use super::Generation;
use crate::tenant::remote_timeline_client::{

View File

@@ -187,7 +187,6 @@ impl SecondaryTenant {
};
let now = SystemTime::now();
tracing::info!("Evicting secondary layer");
let this = self.clone();

View File

@@ -909,7 +909,6 @@ impl<'a> TenantDownloader<'a> {
strftime(&layer.access_time),
strftime(evicted_at)
);
self.skip_layer(layer);
continue;
}
}
@@ -964,15 +963,6 @@ impl<'a> TenantDownloader<'a> {
Ok(())
}
/// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
fn skip_layer(&self, layer: HeatMapLayer) {
let mut progress = self.secondary_state.progress.lock().unwrap();
progress.layers_total = progress.layers_total.saturating_sub(1);
progress.bytes_total = progress
.bytes_total
.saturating_sub(layer.metadata.file_size);
}
async fn download_layer(
&self,
tenant_shard_id: &TenantShardId,
@@ -1022,7 +1012,13 @@ impl<'a> TenantDownloader<'a> {
"Skipped downloading missing layer {}, raced with compaction/gc?",
layer.name
);
self.skip_layer(layer);
// If the layer is 404, adjust the progress statistics to reflect that we will not download it.
let mut progress = self.secondary_state.progress.lock().unwrap();
progress.layers_total = progress.layers_total.saturating_sub(1);
progress.bytes_total = progress
.bytes_total
.saturating_sub(layer.metadata.file_size);
return Ok(None);
}

View File

@@ -17,7 +17,7 @@ use crate::tenant::{Tenant, TenantState};
use rand::Rng;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{backoff, completion, pausable_failpoint};
use utils::{backoff, completion};
static CONCURRENT_BACKGROUND_TASKS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {

View File

@@ -41,7 +41,7 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::{
bin_ser::BeSer,
fs_ext, pausable_failpoint,
fs_ext,
sync::gate::{Gate, GateGuard},
vec_map::VecMap,
};

View File

@@ -7,7 +7,7 @@ use anyhow::Context;
use pageserver_api::{models::TimelineState, shard::TenantShardId};
use tokio::sync::OwnedMutexGuard;
use tracing::{error, info, instrument, Instrument};
use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
use utils::{crashsafe, fs_ext, id::TimelineId};
use crate::{
config::PageServerConf,

View File

@@ -344,21 +344,21 @@ macro_rules! with_file {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open<P: AsRef<Utf8Path>>(
path: P,
pub async fn open(
path: &Utf8Path,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path.as_ref(), OpenOptions::new().read(true), ctx).await
Self::open_with_options(path, OpenOptions::new().read(true), ctx).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create<P: AsRef<Utf8Path>>(
path: P,
pub async fn create(
path: &Utf8Path,
ctx: &RequestContext,
) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path.as_ref(),
path,
OpenOptions::new().write(true).create(true).truncate(true),
ctx,
)
@@ -370,13 +370,12 @@ impl VirtualFile {
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub async fn open_with_options<P: AsRef<Utf8Path>>(
path: P,
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
_ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */
) -> Result<VirtualFile, std::io::Error> {
let path_ref = path.as_ref();
let path_str = path_ref.to_string();
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let (tenant_id, shard_id, timeline_id) =
if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
@@ -402,7 +401,7 @@ impl VirtualFile {
// where our caller doesn't get to use the returned VirtualFile before its
// slot gets re-used by someone else.
let file = observe_duration!(StorageIoOperation::Open, {
open_options.open(path_ref.as_std_path()).await?
open_options.open(path.as_std_path()).await?
});
// Strip all options other than read and write.
@@ -418,7 +417,7 @@ impl VirtualFile {
let vfile = VirtualFile {
handle: RwLock::new(handle),
pos: 0,
path: path_ref.to_path_buf(),
path: path.to_path_buf(),
open_options: reopen_options,
tenant_id,
shard_id,

View File

@@ -125,6 +125,13 @@ typedef struct
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_read;
/*---
* WaitEventSet containing:
* - WL_SOCKET_WRITABLE on 'conn'
* - WL_LATCH_SET on MyLatch, and
* - WL_EXIT_ON_PM_DEATH.
*/
WaitEventSet *wes_write;
} PageServer;
static PageServer page_servers[MAX_SHARDS];
@@ -329,6 +336,11 @@ CLEANUP_AND_DISCONNECT(PageServer *shard)
FreeWaitEventSet(shard->wes_read);
shard->wes_read = NULL;
}
if (shard->wes_write)
{
FreeWaitEventSet(shard->wes_write);
shard->wes_write = NULL;
}
if (shard->conn)
{
PQfinish(shard->conn);
@@ -424,6 +436,22 @@ pageserver_connect(shardno_t shard_no, int elevel)
return false;
}
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
shard->wes_write = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_write, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_write, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_write, WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
PQsocket(shard->conn),
NULL, NULL);
shard->state = PS_Connecting_Startup;
/* fallthrough */
}
@@ -432,12 +460,13 @@ pageserver_connect(shardno_t shard_no, int elevel)
char *pagestream_query;
int ps_send_query_ret;
bool connected = false;
int poll_result = PGRES_POLLING_WRITING;
neon_shard_log(shard_no, DEBUG5, "Connection state: Connecting_Startup");
do
{
WaitEvent event;
int poll_result = PQconnectPoll(shard->conn);
switch (poll_result)
{
@@ -468,45 +497,25 @@ pageserver_connect(shardno_t shard_no, int elevel)
}
case PGRES_POLLING_READING:
/* Sleep until there's something to do */
while (true)
{
int rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE,
PQsocket(shard->conn),
0,
PG_WAIT_EXTENSION);
elog(DEBUG5, "PGRES_POLLING_READING=>%d", rc);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
}
if (rc & WL_SOCKET_READABLE)
break;
}
(void) WaitEventSetWait(shard->wes_read, -1L, &event, 1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
/* PQconnectPoll() handles the socket polling state updates */
break;
case PGRES_POLLING_WRITING:
/* Sleep until there's something to do */
while (true)
{
int rc = WaitLatchOrSocket(MyLatch,
WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | WL_SOCKET_WRITEABLE,
PQsocket(shard->conn),
0,
PG_WAIT_EXTENSION);
elog(DEBUG5, "PGRES_POLLING_WRITING=>%d", rc);
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
}
if (rc & WL_SOCKET_WRITEABLE)
break;
}
(void) WaitEventSetWait(shard->wes_write, -1L, &event, 1,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
/* query cancellation, backend shutdown */
CHECK_FOR_INTERRUPTS();
/* PQconnectPoll() handles the socket polling state updates */
break;
@@ -515,22 +524,12 @@ pageserver_connect(shardno_t shard_no, int elevel)
connected = true;
break;
}
poll_result = PQconnectPoll(shard->conn);
elog(DEBUG5, "PQconnectPoll=>%d", poll_result);
}
while (!connected);
/* No more polling needed; connection succeeded */
shard->last_connect_time = GetCurrentTimestamp();
shard->wes_read = CreateWaitEventSet(TopMemoryContext, 3);
AddWaitEventToSet(shard->wes_read, WL_LATCH_SET, PGINVALID_SOCKET,
MyLatch, NULL);
AddWaitEventToSet(shard->wes_read, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
NULL, NULL);
AddWaitEventToSet(shard->wes_read, WL_SOCKET_READABLE, PQsocket(shard->conn), NULL, NULL);
switch (neon_protocol_version)
{
case 2:

View File

@@ -584,9 +584,9 @@ prefetch_read(PrefetchRequest *slot)
slot->response != NULL ||
slot->my_ring_index != MyPState->ring_receive)
neon_shard_log(slot->shard_no, ERROR,
"Incorrect prefetch read: status=%d response=%p my=%lu receive=%lu",
slot->status, slot->response,
(long)slot->my_ring_index, (long)MyPState->ring_receive);
"Incorrect prefetch read: status=%d response=%llx my=%llu receive=%llu",
slot->status, (size_t) (void *) slot->response,
slot->my_ring_index, MyPState->ring_receive);
old = MemoryContextSwitchTo(MyPState->errctx);
response = (NeonResponse *) page_server->receive(slot->shard_no);
@@ -606,8 +606,8 @@ prefetch_read(PrefetchRequest *slot)
else
{
neon_shard_log(slot->shard_no, WARNING,
"No response from reading prefetch entry %lu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
(long)slot->my_ring_index,
"No response from reading prefetch entry %llu: %u/%u/%u.%u block %u. This can be caused by a concurrent disconnect",
slot->my_ring_index,
RelFileInfoFmt(BufTagGetNRelFileInfo(slot->buftag)),
slot->buftag.forkNum, slot->buftag.blockNum);
return false;

View File

@@ -38,7 +38,6 @@ hmac.workspace = true
hostname.workspace = true
http.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
hyper.workspace = true
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
@@ -96,7 +95,7 @@ url.workspace = true
urlencoding.workspace = true
utils.workspace = true
uuid.workspace = true
rustls-native-certs.workspace = true
webpki-roots.workspace = true
x509-parser.workspace = true
postgres-protocol.workspace = true
redis.workspace = true

View File

@@ -9,7 +9,6 @@ use futures::future::Either;
use itertools::Itertools;
use proxy::config::TlsServerEndPoint;
use proxy::context::RequestMonitoring;
use proxy::metrics::{Metrics, ThreadPoolMetrics};
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled};
use rustls::pki_types::PrivateKeyDer;
use tokio::net::TcpListener;
@@ -66,8 +65,6 @@ async fn main() -> anyhow::Result<()> {
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
Metrics::install(Arc::new(ThreadPoolMetrics::new(0)));
let args = cli().get_matches();
let destination: String = args.get_one::<String>("dest").unwrap().parse()?;

View File

@@ -557,14 +557,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let config::ConcurrencyLockOptions {
shards,
limiter,
permits,
epoch,
timeout,
} = args.wake_compute_lock.parse()?;
info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
info!(permits, shards, ?epoch, "Using NodeLocks (wake_compute)");
let locks = Box::leak(Box::new(console::locks::ApiLocks::new(
"wake_compute_lock",
limiter,
permits,
shards,
timeout,
epoch,
@@ -603,19 +603,14 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let config::ConcurrencyLockOptions {
shards,
limiter,
permits,
epoch,
timeout,
} = args.connect_compute_lock.parse()?;
info!(
?limiter,
shards,
?epoch,
"Using NodeLocks (connect_compute)"
);
info!(permits, shards, ?epoch, "Using NodeLocks (connect_compute)");
let connect_compute_locks = console::locks::ApiLocks::new(
"connect_compute_lock",
limiter,
permits,
shards,
timeout,
epoch,

View File

@@ -10,7 +10,6 @@ use crate::{
};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pq_proto::StartupMessageParams;
use rustls::{client::danger::ServerCertVerifier, pki_types::InvalidDnsNameError};
use std::{io, net::SocketAddr, sync::Arc, time::Duration};
@@ -286,13 +285,14 @@ impl ConnCfg {
drop(pause);
let client_config = if allow_self_signed_compute {
// Allow all certificates for creating the connection
let verifier = Arc::new(AcceptEverythingVerifier) as Arc<dyn ServerCertVerifier>;
rustls::ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(verifier)
} else {
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
let root_store = rustls::RootCertStore {
roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
};
rustls::ClientConfig::builder().with_root_certificates(root_store)
};
let client_config = client_config.with_no_client_auth();
@@ -354,14 +354,6 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
Some(options)
}
fn load_certs() -> Result<Arc<rustls::RootCertStore>, io::Error> {
let der_certs = rustls_native_certs::load_native_certs()?;
let mut store = rustls::RootCertStore::empty();
store.add_parsable_certificates(der_certs);
Ok(Arc::new(store))
}
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();
#[derive(Debug)]
struct AcceptEverythingVerifier;
impl ServerCertVerifier for AcceptEverythingVerifier {

View File

@@ -1,7 +1,7 @@
use crate::{
auth::{self, backend::AuthRateLimiter},
console::locks::ApiLocks,
rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig},
rate_limiter::RateBucketInfo,
scram::threadpool::ThreadPool,
serverless::{cancel_set::CancelSet, GlobalConnPoolOptions},
Host,
@@ -580,18 +580,14 @@ impl RetryConfig {
}
/// Helper for cmdline cache options parsing.
#[derive(serde::Deserialize)]
pub struct ConcurrencyLockOptions {
/// The number of shards the lock map should have
pub shards: usize,
/// The number of allowed concurrent requests for each endpoitn
#[serde(flatten)]
pub limiter: RateLimiterConfig,
pub permits: usize,
/// Garbage collection epoch
#[serde(deserialize_with = "humantime_serde::deserialize")]
pub epoch: Duration,
/// Lock timeout
#[serde(deserialize_with = "humantime_serde::deserialize")]
pub timeout: Duration,
}
@@ -600,18 +596,13 @@ impl ConcurrencyLockOptions {
pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "permits=0";
/// Default options for [`crate::console::provider::ApiLocks`].
pub const DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK: &'static str =
"shards=64,permits=100,epoch=10m,timeout=10ms";
"shards=64,permits=10,epoch=10m,timeout=10ms";
// pub const DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK: &'static str = "shards=32,permits=4,epoch=10m,timeout=1s";
/// Parse lock options passed via cmdline.
/// Example: [`Self::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK`].
fn parse(options: &str) -> anyhow::Result<Self> {
let options = options.trim();
if options.starts_with('{') && options.ends_with('}') {
return Ok(serde_json::from_str(options)?);
}
let mut shards = None;
let mut permits = None;
let mut epoch = None;
@@ -638,13 +629,9 @@ impl ConcurrencyLockOptions {
shards = Some(2);
}
let permits = permits.context("missing `permits`")?;
let out = Self {
shards: shards.context("missing `shards`")?,
limiter: RateLimiterConfig {
algorithm: RateLimitAlgorithm::Fixed,
initial_limit: permits,
},
permits: permits.context("missing `permits`")?,
epoch: epoch.context("missing `epoch`")?,
timeout: timeout.context("missing `timeout`")?,
};
@@ -670,8 +657,6 @@ impl FromStr for ConcurrencyLockOptions {
#[cfg(test)]
mod tests {
use crate::rate_limiter::Aimd;
use super::*;
#[test]
@@ -699,68 +684,36 @@ mod tests {
fn test_parse_lock_options() -> anyhow::Result<()> {
let ConcurrencyLockOptions {
epoch,
limiter,
permits,
shards,
timeout,
} = "shards=32,permits=4,epoch=10m,timeout=1s".parse()?;
assert_eq!(epoch, Duration::from_secs(10 * 60));
assert_eq!(timeout, Duration::from_secs(1));
assert_eq!(shards, 32);
assert_eq!(limiter.initial_limit, 4);
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
assert_eq!(permits, 4);
let ConcurrencyLockOptions {
epoch,
limiter,
permits,
shards,
timeout,
} = "epoch=60s,shards=16,timeout=100ms,permits=8".parse()?;
assert_eq!(epoch, Duration::from_secs(60));
assert_eq!(timeout, Duration::from_millis(100));
assert_eq!(shards, 16);
assert_eq!(limiter.initial_limit, 8);
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
assert_eq!(permits, 8);
let ConcurrencyLockOptions {
epoch,
limiter,
permits,
shards,
timeout,
} = "permits=0".parse()?;
assert_eq!(epoch, Duration::ZERO);
assert_eq!(timeout, Duration::ZERO);
assert_eq!(shards, 2);
assert_eq!(limiter.initial_limit, 0);
assert_eq!(limiter.algorithm, RateLimitAlgorithm::Fixed);
Ok(())
}
#[test]
fn test_parse_json_lock_options() -> anyhow::Result<()> {
let ConcurrencyLockOptions {
epoch,
limiter,
shards,
timeout,
} = r#"{"shards":32,"initial_limit":44,"aimd":{"min":5,"max":500,"inc":10,"dec":0.9,"utilisation":0.8},"epoch":"10m","timeout":"1s"}"#
.parse()?;
assert_eq!(epoch, Duration::from_secs(10 * 60));
assert_eq!(timeout, Duration::from_secs(1));
assert_eq!(shards, 32);
assert_eq!(limiter.initial_limit, 44);
assert_eq!(
limiter.algorithm,
RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 5,
max: 500,
dec: 0.9,
inc: 10,
utilisation: 0.8
}
},
);
assert_eq!(permits, 0);
Ok(())
}

View File

@@ -15,11 +15,11 @@ use crate::{
error::ReportableError,
intern::ProjectIdInt,
metrics::ApiLockMetrics,
rate_limiter::{DynamicLimiter, Outcome, RateLimiterConfig, Token},
scram, EndpointCacheKey,
};
use dashmap::DashMap;
use std::{hash::Hash, sync::Arc, time::Duration};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::info;
@@ -443,8 +443,8 @@ impl ApiCaches {
/// Various caches for [`console`](super).
pub struct ApiLocks<K> {
name: &'static str,
node_locks: DashMap<K, Arc<DynamicLimiter>>,
config: RateLimiterConfig,
node_locks: DashMap<K, Arc<Semaphore>>,
permits: usize,
timeout: Duration,
epoch: std::time::Duration,
metrics: &'static ApiLockMetrics,
@@ -452,6 +452,8 @@ pub struct ApiLocks<K> {
#[derive(Debug, thiserror::Error)]
pub enum ApiLockError {
#[error("lock was closed")]
AcquireError(#[from] tokio::sync::AcquireError),
#[error("permit could not be acquired")]
TimeoutError(#[from] tokio::time::error::Elapsed),
}
@@ -459,6 +461,7 @@ pub enum ApiLockError {
impl ReportableError for ApiLockError {
fn get_error_kind(&self) -> crate::error::ErrorKind {
match self {
ApiLockError::AcquireError(_) => crate::error::ErrorKind::Service,
ApiLockError::TimeoutError(_) => crate::error::ErrorKind::RateLimit,
}
}
@@ -467,7 +470,7 @@ impl ReportableError for ApiLockError {
impl<K: Hash + Eq + Clone> ApiLocks<K> {
pub fn new(
name: &'static str,
config: RateLimiterConfig,
permits: usize,
shards: usize,
timeout: Duration,
epoch: std::time::Duration,
@@ -476,7 +479,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
Ok(Self {
name,
node_locks: DashMap::with_shard_amount(shards),
config,
permits,
timeout,
epoch,
metrics,
@@ -484,10 +487,8 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
}
pub async fn get_permit(&self, key: &K) -> Result<WakeComputePermit, ApiLockError> {
if self.config.initial_limit == 0 {
return Ok(WakeComputePermit {
permit: Token::disabled(),
});
if self.permits == 0 {
return Ok(WakeComputePermit { permit: None });
}
let now = Instant::now();
let semaphore = {
@@ -499,22 +500,24 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
.entry(key.clone())
.or_insert_with(|| {
self.metrics.semaphores_registered.inc();
DynamicLimiter::new(self.config)
Arc::new(Semaphore::new(self.permits))
})
.clone()
}
};
let permit = semaphore.acquire_deadline(now + self.timeout).await;
let permit = tokio::time::timeout_at(now + self.timeout, semaphore.acquire_owned()).await;
self.metrics
.semaphore_acquire_seconds
.observe(now.elapsed().as_secs_f64());
Ok(WakeComputePermit { permit: permit? })
Ok(WakeComputePermit {
permit: Some(permit??),
})
}
pub async fn garbage_collect_worker(&self) {
if self.config.initial_limit == 0 {
if self.permits == 0 {
return;
}
let mut interval =
@@ -544,21 +547,12 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
}
pub struct WakeComputePermit {
permit: Token,
// None if the lock is disabled
permit: Option<OwnedSemaphorePermit>,
}
impl WakeComputePermit {
pub fn should_check_cache(&self) -> bool {
!self.permit.is_disabled()
}
pub fn release(self, outcome: Outcome) {
self.permit.release(outcome)
}
pub fn release_result<T, E>(self, res: Result<T, E>) -> Result<T, E> {
match res {
Ok(_) => self.release(Outcome::Success),
Err(_) => self.release(Outcome::Overload),
}
res
self.permit.is_some()
}
}

View File

@@ -301,7 +301,7 @@ impl super::Api for Api {
}
}
let mut node = permit.release_result(self.do_wake_compute(ctx, user_info).await)?;
let mut node = self.do_wake_compute(ctx, user_info).await?;
ctx.set_project(node.aux.clone());
let cold_start_info = node.aux.cold_start_info;
info!("woken up a compute node");

View File

@@ -84,8 +84,8 @@ impl ConnectMechanism for TcpMechanism<'_> {
timeout: time::Duration,
) -> Result<PostgresConnection, Self::Error> {
let host = node_info.config.get_host()?;
let permit = self.locks.get_permit(&host).await?;
permit.release_result(node_info.connect(ctx, timeout).await)
let _permit = self.locks.get_permit(&host).await?;
node_info.connect(ctx, timeout).await
}
fn update_connect_config(&self, config: &mut compute::ConnCfg) {

View File

@@ -1,6 +1,2 @@
mod limit_algorithm;
mod limiter;
pub use limit_algorithm::{
aimd::Aimd, DynamicLimiter, Outcome, RateLimitAlgorithm, RateLimiterConfig, Token,
};
pub use limiter::{BucketRateLimiter, EndpointRateLimiter, GlobalRateLimiter, RateBucketInfo};

View File

@@ -1,275 +0,0 @@
//! Algorithms for controlling concurrency limits.
use parking_lot::Mutex;
use std::{pin::pin, sync::Arc, time::Duration};
use tokio::{
sync::Notify,
time::{error::Elapsed, timeout_at, Instant},
};
use self::aimd::Aimd;
pub mod aimd;
/// Whether a job succeeded or failed as a result of congestion/overload.
///
/// Errors not considered to be caused by overload should be ignored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
/// The job succeeded, or failed in a way unrelated to overload.
Success,
/// The job failed because of overload, e.g. it timed out or an explicit backpressure signal
/// was observed.
Overload,
}
/// An algorithm for controlling a concurrency limit.
pub trait LimitAlgorithm: Send + Sync + 'static {
/// Update the concurrency limit in response to a new job completion.
fn update(&self, old_limit: usize, sample: Sample) -> usize;
}
/// The result of a job (or jobs), including the [`Outcome`] (loss) and latency (delay).
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub struct Sample {
pub(crate) latency: Duration,
/// Jobs in flight when the sample was taken.
pub(crate) in_flight: usize,
pub(crate) outcome: Outcome,
}
#[derive(Clone, Copy, Debug, Default, serde::Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum RateLimitAlgorithm {
#[default]
Fixed,
Aimd {
#[serde(flatten)]
conf: Aimd,
},
}
pub struct Fixed;
impl LimitAlgorithm for Fixed {
fn update(&self, old_limit: usize, _sample: Sample) -> usize {
old_limit
}
}
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
pub struct RateLimiterConfig {
#[serde(flatten)]
pub algorithm: RateLimitAlgorithm,
pub initial_limit: usize,
}
impl RateLimiterConfig {
pub fn create_rate_limit_algorithm(self) -> Box<dyn LimitAlgorithm> {
match self.algorithm {
RateLimitAlgorithm::Fixed => Box::new(Fixed),
RateLimitAlgorithm::Aimd { conf } => Box::new(conf),
}
}
}
pub struct LimiterInner {
alg: Box<dyn LimitAlgorithm>,
available: usize,
limit: usize,
in_flight: usize,
}
impl LimiterInner {
fn update(&mut self, latency: Duration, outcome: Option<Outcome>) {
if let Some(outcome) = outcome {
let sample = Sample {
latency,
in_flight: self.in_flight,
outcome,
};
self.limit = self.alg.update(self.limit, sample);
}
}
fn take(&mut self, ready: &Notify) -> Option<()> {
if self.available > 1 {
self.available -= 1;
self.in_flight += 1;
// tell the next in the queue that there is a permit ready
if self.available > 1 {
ready.notify_one();
}
Some(())
} else {
None
}
}
}
/// Limits the number of concurrent jobs.
///
/// Concurrency is limited through the use of [`Token`]s. Acquire a token to run a job, and release the
/// token once the job is finished.
///
/// The limit will be automatically adjusted based on observed latency (delay) and/or failures
/// caused by overload (loss).
pub struct DynamicLimiter {
config: RateLimiterConfig,
inner: Mutex<LimiterInner>,
// to notify when a token is available
ready: Notify,
}
/// A concurrency token, required to run a job.
///
/// Release the token back to the [`DynamicLimiter`] after the job is complete.
pub struct Token {
start: Instant,
limiter: Option<Arc<DynamicLimiter>>,
}
/// A snapshot of the state of the [`DynamicLimiter`].
///
/// Not guaranteed to be consistent under high concurrency.
#[derive(Debug, Clone, Copy)]
pub struct LimiterState {
limit: usize,
in_flight: usize,
}
impl DynamicLimiter {
/// Create a limiter with a given limit control algorithm.
pub fn new(config: RateLimiterConfig) -> Arc<Self> {
let ready = Notify::new();
ready.notify_one();
Arc::new(Self {
inner: Mutex::new(LimiterInner {
alg: config.create_rate_limit_algorithm(),
available: config.initial_limit,
limit: config.initial_limit,
in_flight: 0,
}),
ready,
config,
})
}
/// Try to acquire a concurrency [Token], waiting for `duration` if there are none available.
///
/// Returns `None` if there are none available after `duration`.
pub async fn acquire_timeout(self: &Arc<Self>, duration: Duration) -> Result<Token, Elapsed> {
self.acquire_deadline(Instant::now() + duration).await
}
/// Try to acquire a concurrency [Token], waiting until `deadline` if there are none available.
///
/// Returns `None` if there are none available after `deadline`.
pub async fn acquire_deadline(self: &Arc<Self>, deadline: Instant) -> Result<Token, Elapsed> {
if self.config.initial_limit == 0 {
// If the rate limiter is disabled, we can always acquire a token.
Ok(Token::disabled())
} else {
let mut notified = pin!(self.ready.notified());
let mut ready = notified.as_mut().enable();
loop {
let mut limit = None;
if ready {
let mut inner = self.inner.lock();
if inner.take(&self.ready).is_some() {
break Ok(Token::new(self.clone()));
}
limit = Some(inner.limit);
}
match timeout_at(deadline, notified.as_mut()).await {
Ok(()) => ready = true,
Err(e) => {
let limit = limit.unwrap_or_else(|| self.inner.lock().limit);
tracing::info!(limit, "could not acquire token in time");
break Err(e);
}
}
}
}
}
/// Return the concurrency [Token], along with the outcome of the job.
///
/// The [Outcome] of the job, and the time taken to perform it, may be used
/// to update the concurrency limit.
///
/// Set the outcome to `None` to ignore the job.
fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
tracing::info!("outcome is {:?}", outcome);
if self.config.initial_limit == 0 {
return;
}
let mut inner = self.inner.lock();
inner.update(start.elapsed(), outcome);
if inner.in_flight < inner.limit {
inner.available = inner.limit - inner.in_flight;
// At least 1 permit is now available
self.ready.notify_one();
}
inner.in_flight -= 1;
}
/// The current state of the limiter.
pub fn state(&self) -> LimiterState {
let inner = self.inner.lock();
LimiterState {
limit: inner.limit,
in_flight: inner.in_flight,
}
}
}
impl Token {
fn new(limiter: Arc<DynamicLimiter>) -> Self {
Self {
start: Instant::now(),
limiter: Some(limiter),
}
}
pub fn disabled() -> Self {
Self {
start: Instant::now(),
limiter: None,
}
}
pub fn is_disabled(&self) -> bool {
self.limiter.is_none()
}
pub fn release(mut self, outcome: Outcome) {
self.release_mut(Some(outcome))
}
pub fn release_mut(&mut self, outcome: Option<Outcome>) {
if let Some(limiter) = self.limiter.take() {
limiter.release_inner(self.start, outcome);
}
}
}
impl Drop for Token {
fn drop(&mut self) {
self.release_mut(None)
}
}
impl LimiterState {
/// The current concurrency limit.
pub fn limit(&self) -> usize {
self.limit
}
/// The number of jobs in flight.
pub fn in_flight(&self) -> usize {
self.in_flight
}
}

View File

@@ -1,184 +0,0 @@
use std::usize;
use super::{LimitAlgorithm, Outcome, Sample};
/// Loss-based congestion avoidance.
///
/// Additive-increase, multiplicative decrease.
///
/// Adds available currency when:
/// 1. no load-based errors are observed, and
/// 2. the utilisation of the current limit is high.
///
/// Reduces available concurrency by a factor when load-based errors are detected.
#[derive(Clone, Copy, Debug, serde::Deserialize, PartialEq)]
pub struct Aimd {
/// Minimum limit for AIMD algorithm.
pub min: usize,
/// Maximum limit for AIMD algorithm.
pub max: usize,
/// Decrease AIMD decrease by value in case of error.
pub dec: f32,
/// Increase AIMD increase by value in case of success.
pub inc: usize,
/// A threshold below which the limit won't be increased.
pub utilisation: f32,
}
impl LimitAlgorithm for Aimd {
fn update(&self, old_limit: usize, sample: Sample) -> usize {
use Outcome::*;
match sample.outcome {
Success => {
let utilisation = sample.in_flight as f32 / old_limit as f32;
if utilisation > self.utilisation {
let limit = old_limit + self.inc;
let increased_limit = limit.clamp(self.min, self.max);
if increased_limit > old_limit {
tracing::info!(increased_limit, "limit increased");
}
increased_limit
} else {
old_limit
}
}
Overload => {
let limit = old_limit as f32 * self.dec;
// Floor instead of round, so the limit reduces even with small numbers.
// E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
let limit = limit.floor() as usize;
limit.clamp(self.min, self.max)
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use crate::rate_limiter::limit_algorithm::{
DynamicLimiter, RateLimitAlgorithm, RateLimiterConfig,
};
use super::*;
#[tokio::test(start_paused = true)]
async fn should_decrease_limit_on_overload() {
let config = RateLimiterConfig {
initial_limit: 10,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 1500,
inc: 10,
dec: 0.5,
utilisation: 0.8,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Overload);
assert_eq!(limiter.state().limit(), 5, "overload: decrease");
}
#[tokio::test(start_paused = true)]
async fn should_increase_limit_on_success_when_using_gt_util_threshold() {
let config = RateLimiterConfig {
initial_limit: 4,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 1500,
inc: 1,
dec: 0.5,
utilisation: 0.5,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
let _token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
let _token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Success);
assert_eq!(limiter.state().limit(), 5, "success: increase");
}
#[tokio::test(start_paused = true)]
async fn should_not_change_limit_on_success_when_using_lt_util_threshold() {
let config = RateLimiterConfig {
initial_limit: 4,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 1500,
inc: 10,
dec: 0.5,
utilisation: 0.5,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
token.release(Outcome::Success);
assert_eq!(
limiter.state().limit(),
4,
"success: ignore when < half limit"
);
}
#[tokio::test(start_paused = true)]
async fn should_not_change_limit_when_no_outcome() {
let config = RateLimiterConfig {
initial_limit: 10,
algorithm: RateLimitAlgorithm::Aimd {
conf: Aimd {
min: 1,
max: 1500,
inc: 10,
dec: 0.5,
utilisation: 0.5,
},
},
};
let limiter = DynamicLimiter::new(config);
let token = limiter
.acquire_timeout(Duration::from_millis(1))
.await
.unwrap();
drop(token);
assert_eq!(limiter.state().limit(), 10, "ignore");
}
}

View File

@@ -232,9 +232,9 @@ impl ConnectMechanism for TokioMechanism {
.connect_timeout(timeout);
let pause = ctx.latency_timer.pause(crate::metrics::Waiting::Compute);
let res = config.connect(tokio_postgres::NoTls).await;
let (client, connection) = config.connect(tokio_postgres::NoTls).await?;
drop(pause);
let (client, connection) = permit.release_result(res)?;
drop(permit);
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
Ok(poll_client(

View File

@@ -51,10 +51,9 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketRw<S> {
) -> Poll<io::Result<usize>> {
let this = self.project();
let mut stream = this.stream;
this.send.put(buf);
ready!(stream.as_mut().poll_ready(cx).map_err(io_error))?;
this.send.put(buf);
match stream.as_mut().start_send(Frame::binary(this.send.split())) {
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(e) => Poll::Ready(Err(io_error(e))),

View File

@@ -54,7 +54,6 @@ build-backend = "poetry.core.masonry.api"
exclude = [
"^vendor/",
"^target/",
"test_runner/performance/pgvector/loaddata.py",
]
check_untyped_defs = true
# Help mypy find imports when running against list of individual files.

View File

@@ -31,8 +31,7 @@ futures-util.workspace = true
itertools.workspace = true
camino.workspace = true
rustls.workspace = true
rustls-native-certs.workspace = true
once_cell.workspace = true
webpki-roots.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
chrono = { workspace = true, default-features = false, features = ["clock", "serde"] }

View File

@@ -1,8 +1,7 @@
use std::{collections::HashSet, str::FromStr, sync::Arc};
use std::{collections::HashSet, str::FromStr};
use aws_sdk_s3::Client;
use futures::stream::{StreamExt, TryStreamExt};
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use postgres_ffi::{XLogFileName, PG_TLI};
use serde::Serialize;
@@ -71,8 +70,10 @@ pub async fn scan_safekeeper_metadata(
"checking bucket {}, region {}, dump_db_table {}",
bucket_config.bucket, bucket_config.region, dump_db_table
);
// Use rustls (Neon requires TLS)
let root_store = TLS_ROOTS.get_or_try_init(load_certs)?.clone();
// Use the native TLS implementation (Neon requires TLS)
let root_store = rustls::RootCertStore {
roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
};
let client_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
@@ -238,11 +239,3 @@ async fn check_timeline(
is_deleted: false,
})
}
fn load_certs() -> Result<Arc<rustls::RootCertStore>, std::io::Error> {
let der_certs = rustls_native_certs::load_native_certs()?;
let mut store = rustls::RootCertStore::empty();
store.add_parsable_certificates(der_certs);
Ok(Arc::new(store))
}
static TLS_ROOTS: OnceCell<Arc<rustls::RootCertStore>> = OnceCell::new();

View File

@@ -287,26 +287,6 @@ async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>
.map_err(|e| ApiError::InternalServerError(e.into()))
}
/// Force persist control file and remove old WAL.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
let tli = GlobalTimelines::get(ttid)?;
tli.maybe_persist_control_file(true)
.await
.map_err(ApiError::InternalServerError)?;
tli.remove_old_wal()
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::OK, ())
}
/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -573,10 +553,6 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
|r| request_span(r, patch_control_file_handler),
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
|r| request_span(r, timeline_checkpoint_handler),
)
// for tests
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)

View File

@@ -11,7 +11,6 @@ use tracing::info;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
pausable_failpoint,
};
use crate::{
@@ -163,8 +162,6 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
filenames.remove(control_file_index);
filenames.insert(0, "safekeeper.control".to_string());
pausable_failpoint!("sk-pull-timeline-after-list-pausable");
info!(
"downloading {} files from safekeeper {}",
filenames.len(),
@@ -186,13 +183,6 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
let mut file = tokio::fs::File::create(&file_path).await?;
let mut response = client.get(&http_url).send().await?;
if response.status() != reqwest::StatusCode::OK {
bail!(
"pulling file {} failed: status is {}",
filename,
response.status()
);
}
while let Some(chunk) = response.chunk().await? {
file.write_all(&chunk).await?;
file.flush().await?;

View File

@@ -15,7 +15,7 @@ pub async fn task_main(_conf: SafeKeeperConf) -> anyhow::Result<()> {
for tli in &tlis {
let ttid = tli.ttid;
async {
if let Err(e) = tli.maybe_persist_control_file(false).await {
if let Err(e) = tli.maybe_persist_control_file().await {
warn!("failed to persist control file: {e}");
}
if let Err(e) = tli.remove_old_wal().await {

View File

@@ -827,9 +827,9 @@ where
/// Persist control file if there is something to save and enough time
/// passed after the last save.
pub async fn maybe_persist_inmem_control_file(&mut self, force: bool) -> Result<bool> {
pub async fn maybe_persist_inmem_control_file(&mut self) -> Result<bool> {
const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
if !force && self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
if self.state.pers.last_persist_at().elapsed() < CF_SAVE_INTERVAL {
return Ok(false);
}
let need_persist = self.state.inmem.commit_lsn > self.state.commit_lsn

View File

@@ -821,9 +821,9 @@ impl Timeline {
/// passed after the last save. This helps to keep remote_consistent_lsn up
/// to date so that storage nodes restart doesn't cause many pageserver ->
/// safekeeper reconnections.
pub async fn maybe_persist_control_file(self: &Arc<Self>, force: bool) -> Result<()> {
pub async fn maybe_persist_control_file(self: &Arc<Self>) -> Result<()> {
let mut guard = self.write_shared_state().await;
let changed = guard.sk.maybe_persist_inmem_control_file(force).await?;
let changed = guard.sk.maybe_persist_inmem_control_file().await?;
guard.skip_update = !changed;
Ok(())
}

View File

@@ -106,7 +106,7 @@ pub async fn main_task(
if !is_active {
// TODO: maybe use tokio::spawn?
if let Err(e) = tli.maybe_persist_control_file(false).await {
if let Err(e) = tli.maybe_persist_control_file().await {
warn!("control file save in update_status failed: {:?}", e);
}
}

View File

@@ -5,8 +5,6 @@ from typing import Any, Type, TypeVar, Union
T = TypeVar("T", bound="Id")
DEFAULT_WAL_SEG_SIZE = 16 * 1024 * 1024
@total_ordering
class Lsn:
@@ -69,9 +67,6 @@ class Lsn:
def as_int(self) -> int:
return self.lsn_int
def segment_lsn(self, seg_sz: int = DEFAULT_WAL_SEG_SIZE) -> "Lsn":
return Lsn(self.lsn_int - (self.lsn_int % seg_sz))
@dataclass(frozen=True)
class Key:

View File

@@ -3771,7 +3771,7 @@ class SafekeeperPort:
@dataclass
class Safekeeper(LogUtils):
class Safekeeper:
"""An object representing a running safekeeper daemon."""
env: NeonEnv
@@ -3779,13 +3779,6 @@ class Safekeeper(LogUtils):
id: int
running: bool = False
def __init__(self, env: NeonEnv, port: SafekeeperPort, id: int, running: bool = False):
self.env = env
self.port = port
self.id = id
self.running = running
self.logfile = Path(self.data_dir) / f"safekeeper-{id}.log"
def start(self, extra_opts: Optional[List[str]] = None) -> "Safekeeper":
assert self.running is False
self.env.neon_cli.safekeeper_start(self.id, extra_opts=extra_opts)
@@ -3846,38 +3839,11 @@ class Safekeeper(LogUtils):
port=self.port.http, auth_token=auth_token, is_testing_enabled=is_testing_enabled
)
def get_timeline_start_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
timeline_status = self.http_client().timeline_status(tenant_id, timeline_id)
timeline_start_lsn = timeline_status.timeline_start_lsn
log.info(f"sk {self.id} timeline start LSN: {timeline_start_lsn}")
return timeline_start_lsn
def data_dir(self) -> str:
return os.path.join(self.env.repo_dir, "safekeepers", f"sk{self.id}")
def get_flush_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
timeline_status = self.http_client().timeline_status(tenant_id, timeline_id)
flush_lsn = timeline_status.flush_lsn
log.info(f"sk {self.id} flush LSN: {flush_lsn}")
return flush_lsn
def pull_timeline(
self, srcs: list[Safekeeper], tenant_id: TenantId, timeline_id: TimelineId
) -> Dict[str, Any]:
"""
pull_timeline from srcs to self.
"""
src_https = [f"http://localhost:{sk.port.http}" for sk in srcs]
res = self.http_client().pull_timeline(
{"tenant_id": str(tenant_id), "timeline_id": str(timeline_id), "http_hosts": src_https}
)
src_ids = [sk.id for sk in srcs]
log.info(f"finished pulling timeline from {src_ids} to {self.id}")
return res
@property
def data_dir(self) -> Path:
return self.env.repo_dir / "safekeepers" / f"sk{self.id}"
def timeline_dir(self, tenant_id, timeline_id) -> Path:
return self.data_dir / str(tenant_id) / str(timeline_id)
def timeline_dir(self, tenant_id, timeline_id) -> str:
return os.path.join(self.data_dir(), str(tenant_id), str(timeline_id))
def list_segments(self, tenant_id, timeline_id) -> List[str]:
"""
@@ -3890,35 +3856,6 @@ class Safekeeper(LogUtils):
segments.sort()
return segments
def checkpoint_up_to(self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn):
"""
Assuming pageserver(s) uploaded to s3 up to `lsn`,
1) wait for remote_consistent_lsn and wal_backup_lsn on safekeeper to reach it.
2) checkpoint timeline on safekeeper, which should remove WAL before this LSN.
"""
cli = self.http_client()
def are_lsns_advanced():
stat = cli.timeline_status(tenant_id, timeline_id)
log.info(
f"waiting for remote_consistent_lsn and backup_lsn on sk {self.id} to reach {lsn}, currently remote_consistent_lsn={stat.remote_consistent_lsn}, backup_lsn={stat.backup_lsn}"
)
assert stat.remote_consistent_lsn >= lsn and stat.backup_lsn >= lsn.segment_lsn()
# xxx: max wait is long because we might be waiting for reconnection from
# pageserver to this safekeeper
wait_until(30, 1, are_lsns_advanced)
cli.checkpoint(tenant_id, timeline_id)
def wait_until_paused(self, failpoint: str):
msg = f"at failpoint {failpoint}"
def paused():
log.info(f"waiting for hitting failpoint {failpoint}")
self.assert_log_contains(msg)
wait_until(20, 0.5, paused)
class S3Scrubber:
def __init__(self, env: NeonEnvBuilder, log_dir: Optional[Path] = None):

View File

@@ -177,13 +177,6 @@ class SafekeeperHttpClient(requests.Session):
)
res.raise_for_status()
def checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint",
json={},
)
res.raise_for_status()
# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False

View File

@@ -196,7 +196,7 @@ def query_scalar(cur: cursor, query: str) -> Any:
# Traverse directory to get total size.
def get_dir_size(path: Path) -> int:
def get_dir_size(path: str) -> int:
"""Return size in bytes."""
totalbytes = 0
for root, _dirs, files in os.walk(path):
@@ -560,25 +560,3 @@ def assert_pageserver_backups_equal(left: Path, right: Path, skip_files: Set[str
elapsed = time.time() - started_at
log.info(f"assert_pageserver_backups_equal completed in {elapsed}s")
class PropagatingThread(threading.Thread):
_target: Any
_args: Any
_kwargs: Any
"""
Simple Thread wrapper with join() propagating the possible exception in the thread.
"""
def run(self):
self.exc = None
try:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
def join(self, timeout=None):
super(PropagatingThread, self).join(timeout)
if self.exc:
raise self.exc
return self.ret

View File

@@ -1,47 +0,0 @@
\set ECHO queries
\timing
-- prepare test table
DROP TABLE IF EXISTS hnsw_test_table;
CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;
INSERT INTO hnsw_test_table SELECT * FROM documents;
CREATE INDEX ON hnsw_test_table (_id); -- needed later for random tuple queries
-- tune index build params
SET max_parallel_maintenance_workers = 7;
SET maintenance_work_mem = '8GB';
-- create HNSW index for the supported distance metrics
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_cosine_ops);
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_ip_ops);
CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);
CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);
CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);
-- note: in a second psql session we can monitor the progress of the index build phases using
-- the following query:
-- SELECT phase, round(100.0 * blocks_done / nullif(blocks_total, 0), 1) AS "%" FROM pg_stat_progress_create_index;
-- show all indexes built on the table
SELECT
idx.relname AS index_name,
tbl.relname AS table_name,
am.amname AS access_method,
a.attname AS column_name,
opc.opcname AS operator_class
FROM
pg_index i
JOIN
pg_class idx ON idx.oid = i.indexrelid
JOIN
pg_class tbl ON tbl.oid = i.indrelid
JOIN
pg_am am ON am.oid = idx.relam
JOIN
pg_attribute a ON a.attrelid = tbl.oid AND a.attnum = ANY(i.indkey)
JOIN
pg_opclass opc ON opc.oid = i.indclass[0]
WHERE
tbl.relname = 'hnsw_test_table'
AND a.attname = 'embeddings';
-- show table sizes
\dt+

View File

@@ -1,52 +0,0 @@
\set ECHO queries
\timing
-- prepare test table
DROP TABLE IF EXISTS ivfflat_test_table;
CREATE TABLE ivfflat_test_table AS TABLE documents WITH NO DATA;
INSERT INTO ivfflat_test_table SELECT * FROM documents;
CREATE INDEX ON ivfflat_test_table (_id); -- needed later for random tuple queries
-- tune index build params
SET max_parallel_maintenance_workers = 7;
SET maintenance_work_mem = '8GB';
-- create ivfflat index for the supported distance metrics
-- the formulat for lists is # rows / 1000 or sqrt(# rows) if # rows > 1 million
-- we have 1 million embeddings of vector size 1536 in column embeddings of table documents
-- so we use 1000 lists
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_l2_ops) WITH (lists = 1000);
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_ip_ops) WITH (lists = 1000);
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings vector_cosine_ops) WITH (lists = 1000);
CREATE INDEX ON ivfflat_test_table USING ivfflat (embeddings::halfvec(1536) halfvec_l2_ops) WITH (lists = 1000);
CREATE INDEX ON ivfflat_test_table
USING ivfflat ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops) WITH (lists = 1000);
\d ivfflat_test_table
-- show all indexes built on the table
SELECT
idx.relname AS index_name,
tbl.relname AS table_name,
am.amname AS access_method,
a.attname AS column_name,
opc.opcname AS operator_class
FROM
pg_index i
JOIN
pg_class idx ON idx.oid = i.indexrelid
JOIN
pg_class tbl ON tbl.oid = i.indrelid
JOIN
pg_am am ON am.oid = idx.relam
JOIN
pg_attribute a ON a.attrelid = tbl.oid AND a.attnum = ANY(i.indkey)
JOIN
pg_opclass opc ON opc.oid = i.indclass[0]
WHERE
tbl.relname = 'ivfflat_test_table'
AND a.attname = 'embeddings';
-- show table sizes
\dt+

View File

@@ -1,55 +0,0 @@
# Source of the dataset for pgvector tests
This readme was copied from https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-1M
## Download the parquet files
```bash
brew install git-lfs
git-lfs clone https://huggingface.co/datasets/Qdrant/dbpedia-entities-openai3-text-embedding-3-large-1536-1M
```
## Load into postgres:
see loaddata.py in this directory
## Rest of dataset card as on huggingface
---
dataset_info:
features:
- name: _id
dtype: string
- name: title
dtype: string
- name: text
dtype: string
- name: text-embedding-3-large-1536-embedding
sequence: float64
splits:
- name: train
num_bytes: 12679725776
num_examples: 1000000
download_size: 9551862565
dataset_size: 12679725776
configs:
- config_name: default
data_files:
- split: train
path: data/train-*
license: mit
task_categories:
- feature-extraction
language:
- en
size_categories:
- 1M<n<10M
---
1M OpenAI Embeddings: text-embedding-3-large 1536 dimensions
- Created: February 2024.
- Text used for Embedding: title (string) + text (string)
- Embedding Model: OpenAI text-embedding-3-large
- This dataset was generated from the first 1M entries of https://huggingface.co/datasets/BeIR/dbpedia-entity, extracted by @KShivendu_

View File

@@ -1,72 +0,0 @@
import sys
from pathlib import Path
import numpy as np
import pandas as pd
import psycopg2
from pgvector.psycopg2 import register_vector
from psycopg2.extras import execute_values
def print_usage():
print("Usage: loaddata.py <CONNSTR> <DATADIR>")
def main(conn_str, directory_path):
# Connection to PostgreSQL
with psycopg2.connect(conn_str) as conn:
with conn.cursor() as cursor:
# Run SQL statements
cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")
register_vector(conn)
cursor.execute("DROP TABLE IF EXISTS documents;")
cursor.execute(
"""
CREATE TABLE documents (
_id TEXT PRIMARY KEY,
title TEXT,
text TEXT,
embeddings vector(1536) -- text-embedding-3-large-1536-embedding (OpenAI)
);
"""
)
conn.commit()
# List and sort Parquet files
parquet_files = sorted(Path(directory_path).glob("*.parquet"))
for file in parquet_files:
print(f"Loading {file} into PostgreSQL")
df = pd.read_parquet(file)
print(df.head())
data_list = [
(
row["_id"],
row["title"],
row["text"],
np.array(row["text-embedding-3-large-1536-embedding"]),
)
for index, row in df.iterrows()
]
# Use execute_values to perform batch insertion
execute_values(
cursor,
"INSERT INTO documents (_id, title, text, embeddings) VALUES %s",
data_list,
)
# Commit after we insert all embeddings
conn.commit()
print(f"Loaded {file} into PostgreSQL")
if __name__ == "__main__":
if len(sys.argv) != 3:
print_usage()
sys.exit(1)
conn_str = sys.argv[1]
directory_path = sys.argv[2]
main(conn_str, directory_path)

View File

@@ -1,10 +0,0 @@
with x (x) as (
select "embeddings" as x
from hnsw_test_table
TABLESAMPLE SYSTEM (1)
LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM hnsw_test_table
ORDER BY 2
LIMIT 30;

View File

@@ -1,13 +0,0 @@
-- run with pooled connection
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
with x (x) as (
select "embeddings" as x
from hnsw_test_table
TABLESAMPLE SYSTEM (1)
LIMIT 1
)
SELECT title, "embeddings" <=> (select x from x) as distance
FROM hnsw_test_table
ORDER BY 2
LIMIT 30;

View File

@@ -100,25 +100,6 @@ QUERIES: Tuple[LabelledQuery, ...] = (
)
# fmt: on
# A list of pgvector HNSW index builds to run.
# Please do not alter the label for the query, as it is used to identify it.
#
# Disable auto formatting for the list of queries so that it's easier to read
# fmt: off
PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
LabelledQuery("PGV3", r"CREATE INDEX ON hnsw_test_table (_id);"),
LabelledQuery("PGV4", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_cosine_ops);"),
LabelledQuery("PGV5", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_ip_ops);"),
LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
)
# fmt: on
EXPLAIN_STRING: str = "EXPLAIN (ANALYZE, VERBOSE, BUFFERS, COSTS, SETTINGS, FORMAT JSON)"
@@ -264,18 +245,3 @@ def test_clickbench_collect_pg_stat_statements(remote_compare: RemoteCompare):
log.info("Collecting pg_stat_statements")
query = LabelledQuery("Q_COLLECT_PG_STAT_STATEMENTS", r"SELECT * from pg_stat_statements;")
run_psql(remote_compare, query, times=1, explain=False)
@pytest.mark.parametrize("query", PGVECTOR_QUERIES)
@pytest.mark.remote_cluster
def test_pgvector_indexing(query: LabelledQuery, remote_compare: RemoteCompare):
"""
An pgvector test that tests HNSW index build performance and parallelism.
The DB prepared manually in advance.
See
- test_runner/performance/pgvector/README.md
- test_runner/performance/pgvector/loaddata.py
- test_runner/performance/pgvector/HNSW_build.sql
"""
run_psql(remote_compare, query, times=1, explain=False)

View File

@@ -17,7 +17,6 @@ class PgBenchLoadType(enum.Enum):
INIT = "init"
SIMPLE_UPDATE = "simple-update"
SELECT_ONLY = "select-only"
PGVECTOR_HNSW = "pgvector-hnsw"
def utc_now_timestamp() -> int:
@@ -133,26 +132,6 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
password=password,
)
if workload_type == PgBenchLoadType.PGVECTOR_HNSW:
# Run simple-update workload
run_pgbench(
env,
"pgvector-hnsw",
[
"pgbench",
"-f",
"test_runner/performance/pgvector/pgbench_custom_script_pgvector_hsnw_queries.sql",
"-c100",
"-j20",
f"-T{duration}",
"-P2",
"--protocol=prepared",
"--progress-timestamp",
connstr,
],
password=password,
)
env.report_size()
@@ -222,13 +201,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
@pytest.mark.remote_cluster
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
# The following test runs on an existing database that has pgvector extension installed
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
#
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
@pytest.mark.parametrize("duration", get_durations_matrix())
@pytest.mark.remote_cluster
def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)

View File

@@ -1,22 +0,0 @@
import time
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
#
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
#
def test_gin_redo(neon_simple_env: NeonEnv):
env = neon_simple_env
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
time.sleep(1)
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
con = primary.connect()
cur = con.cursor()
cur.execute("create table gin_test_tbl(id integer, i int4[])")
cur.execute("create index gin_test_idx on gin_test_tbl using gin (i)")
cur.execute("insert into gin_test_tbl select g,array[3, 1, g] from generate_series(1, 10000) g")
cur.execute("delete from gin_test_tbl where id % 2 = 0")
cur.execute("vacuum gin_test_tbl")
wait_replica_caughtup(primary, secondary)

View File

@@ -23,6 +23,7 @@ from fixtures.log_helper import log
from fixtures.metrics import parse_metrics
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PgBin,
@@ -47,7 +48,7 @@ from fixtures.remote_storage import (
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.utils import are_walreceivers_absent
from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background
from fixtures.utils import get_dir_size, query_scalar, start_in_background
def wait_lsn_force_checkpoint(
@@ -359,7 +360,7 @@ def test_wal_removal(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
# We will wait for first segment removal. Make sure they exist for starter.
first_segments = [
sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id), "000000010000000000000001")
for sk in env.safekeepers
]
assert all(os.path.exists(p) for p in first_segments)
@@ -444,7 +445,7 @@ def is_flush_lsn_caught_up(sk: Safekeeper, tenant_id: TenantId, timeline_id: Tim
def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId, target_size_mb):
http_cli = sk.http_client()
tli_status = http_cli.timeline_status(tenant_id, timeline_id)
sk_wal_size = get_dir_size(sk.timeline_dir(tenant_id, timeline_id))
sk_wal_size = get_dir_size(os.path.join(sk.data_dir(), str(tenant_id), str(timeline_id)))
sk_wal_size_mb = sk_wal_size / 1024 / 1024
log.info(f"Safekeeper id={sk.id} wal_size={sk_wal_size_mb:.2f}MB status={tli_status}")
return sk_wal_size_mb <= target_size_mb
@@ -590,10 +591,10 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
# save the last (partial) file to put it back after recreation; others will be fetched from s3
sk = env.safekeepers[0]
tli_dir = Path(sk.data_dir) / str(tenant_id) / str(timeline_id)
tli_dir = Path(sk.data_dir()) / str(tenant_id) / str(timeline_id)
f_partial = Path([f for f in os.listdir(tli_dir) if f.endswith(".partial")][0])
f_partial_path = tli_dir / f_partial
f_partial_saved = Path(sk.data_dir) / f_partial.name
f_partial_saved = Path(sk.data_dir()) / f_partial.name
f_partial_path.rename(f_partial_saved)
pg_version = sk.http_client().timeline_status(tenant_id, timeline_id).pg_version
@@ -615,7 +616,7 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
cli = sk.http_client()
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
f_partial_path = (
Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
Path(sk.data_dir()) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
)
shutil.copy(f_partial_saved, f_partial_path)
@@ -1131,8 +1132,8 @@ def cmp_sk_wal(sks: List[Safekeeper], tenant_id: TenantId, timeline_id: Timeline
)
for f in mismatch:
f1 = sk0.timeline_dir(tenant_id, timeline_id) / f
f2 = sk.timeline_dir(tenant_id, timeline_id) / f
f1 = os.path.join(sk0.timeline_dir(tenant_id, timeline_id), f)
f2 = os.path.join(sk.timeline_dir(tenant_id, timeline_id), f)
stdout_filename = f"{f2}.filediff"
with open(stdout_filename, "w") as stdout_f:
@@ -1630,7 +1631,7 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
with conn.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key)")
sk = env.safekeepers[0]
sk_data_dir = sk.data_dir
sk_data_dir = Path(sk.data_dir())
if not auth_enabled:
sk_http = sk.http_client()
sk_http_other = sk_http
@@ -1723,6 +1724,9 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
def safekeepers_guc(env: NeonEnv, sk_names: List[int]) -> str:
return ",".join([f"localhost:{sk.port.pg}" for sk in env.safekeepers if sk.id in sk_names])
def execute_payload(endpoint: Endpoint):
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
@@ -1808,65 +1812,6 @@ def test_pull_timeline(neon_env_builder: NeonEnvBuilder):
show_statuses(env.safekeepers, tenant_id, timeline_id)
# Test pull_timeline while concurrently gc'ing WAL on safekeeper:
# 1) Start pull_timeline, listing files to fetch.
# 2) Write segment, do gc.
# 3) Finish pull_timeline.
# 4) Do some write, verify integrity with timeline_digest.
# Expected to fail while holding off WAL gc plus fetching commit_lsn WAL
# segment is not implemented.
@pytest.mark.xfail
def test_pull_timeline_gc(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2])
log.info("use only first 2 safekeepers, 3rd will be seeded")
endpoint = env.endpoints.create("main")
endpoint.active_safekeepers = [1, 2]
endpoint.start()
endpoint.safe_psql("create table t(key int, value text)")
endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'")
src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}")
dst_http = dst_sk.http_client()
# run pull_timeline which will halt before downloading files
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "pause"))
pt_handle = PropagatingThread(
target=dst_sk.pull_timeline, args=([src_sk], tenant_id, timeline_id)
)
pt_handle.start()
dst_sk.wait_until_paused("sk-pull-timeline-after-list-pausable")
# ensure segment exists
endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'")
lsn = last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
assert lsn > Lsn("0/2000000")
# Checkpoint timeline beyond lsn.
src_sk.checkpoint_up_to(tenant_id, timeline_id, lsn)
first_segment_p = src_sk.timeline_dir(tenant_id, timeline_id) / "000000010000000000000001"
log.info(f"first segment exist={os.path.exists(first_segment_p)}")
dst_http.configure_failpoints(("sk-pull-timeline-after-list-pausable", "off"))
pt_handle.join()
timeline_start_lsn = src_sk.get_timeline_start_lsn(tenant_id, timeline_id)
dst_flush_lsn = dst_sk.get_flush_lsn(tenant_id, timeline_id)
log.info(f"flush_lsn on dst after pull_timeline: {dst_flush_lsn}")
assert dst_flush_lsn >= src_flush_lsn
digests = [
sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, dst_flush_lsn)
for sk in [src_sk, dst_sk]
]
assert digests[0] == digests[1], f"digest on src is {digests[0]} but on dst is {digests[1]}"
# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
# when compute is active, but there are no writes to the timeline. In that case
# pageserver should maintain a single connection to safekeeper and don't attempt

View File

@@ -531,64 +531,6 @@ def test_recovery_uncommitted(neon_env_builder: NeonEnvBuilder):
asyncio.run(run_recovery_uncommitted(env))
async def run_wal_truncation(env: NeonEnv):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
(sk1, sk2, sk3) = env.safekeepers
ep = env.endpoints.create_start("main")
ep.safe_psql("create table t (key int, value text)")
ep.safe_psql("insert into t select generate_series(1, 100), 'payload'")
# insert with only one sk3 up to create tail of flushed but not committed WAL on it
sk1.stop()
sk2.stop()
conn = await ep.connect_async()
# query should hang, so execute in separate task
bg_query = asyncio.create_task(
conn.execute("insert into t select generate_series(1, 180000), 'Papaya'")
)
sleep_sec = 2
await asyncio.sleep(sleep_sec)
# it must still be not finished
assert not bg_query.done()
# note: destoy will kill compute_ctl, preventing it waiting for hanging sync-safekeepers.
ep.stop_and_destroy()
# stop sk3 as well
sk3.stop()
# now start sk1 and sk2 and make them commit something
sk1.start()
sk2.start()
ep = env.endpoints.create_start(
"main",
)
ep.safe_psql("insert into t select generate_series(1, 200), 'payload'")
# start sk3 and wait for it to catch up
sk3.start()
flush_lsn = Lsn(ep.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()"))
await wait_for_lsn(sk3, tenant_id, timeline_id, flush_lsn)
timeline_start_lsn = sk1.get_timeline_start_lsn(tenant_id, timeline_id)
digests = [
sk.http_client().timeline_digest(tenant_id, timeline_id, timeline_start_lsn, flush_lsn)
for sk in [sk1, sk2]
]
assert digests[0] == digests[1], f"digest on sk1 is {digests[0]} but on sk3 is {digests[1]}"
# Simple deterministic test creating tail of WAL on safekeeper which is
# truncated when majority without this sk elects walproposer starting earlier.
def test_wal_truncation(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()
asyncio.run(run_wal_truncation(env))
async def run_segment_init_failure(env: NeonEnv):
env.neon_cli.create_branch("test_segment_init_failure")
ep = env.endpoints.create_start("test_segment_init_failure")

View File

@@ -1,5 +1,5 @@
{
"v16": ["16.3", "b228f20372ebcabfd7946647cb7adbd38bacb14a"],
"v15": ["15.7", "c2c3d40534db97d83dd7e185d1971e707fa2f445"],
"v14": ["14.12", "17e0f5ff4e1905691aa40e1e08f9b79b14c99652"]
"v16": ["16.3", "3c2b9d576c580e0b5b7108001f959b8c5b42e0a2"],
"v15": ["15.7", "74fb144890c4f955db1ef50ee1eeb9d8a6c2f69d"],
"v14": ["14.12", "0d30e28f74f49fe6a27a6bd45dcfeb1060656b8f"]
}

View File

@@ -59,7 +59,7 @@ regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "rustls-tls", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11", default-features = false, features = ["blocking", "default-tls", "stream"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
scopeguard = { version = "1" }
serde = { version = "1", features = ["alloc", "derive"] }
@@ -68,7 +68,7 @@ sha2 = { version = "0.10", features = ["asm"] }
smallvec = { version = "1", default-features = false, features = ["const_new", "write"] }
subtle = { version = "2" }
sync_wrapper = { version = "0.1", default-features = false, features = ["futures"] }
time = { version = "0.3", features = ["macros", "serde-well-known"] }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }