mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Compare commits
1 Commits
thang/exp-
...
try-parkin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a15415442 |
@@ -100,8 +100,10 @@ jobs:
|
||||
name: Rust build << parameters.build_type >>
|
||||
command: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
|
||||
CARGO_FLAGS=
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
CARGO_FLAGS="--release --features profiling"
|
||||
fi
|
||||
|
||||
@@ -110,7 +112,7 @@ jobs:
|
||||
export RUSTC_WRAPPER=cachepot
|
||||
export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
|
||||
export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
|
||||
mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
|
||||
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
|
||||
cachepot -s
|
||||
|
||||
- save_cache:
|
||||
@@ -126,24 +128,32 @@ jobs:
|
||||
name: cargo test
|
||||
command: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
|
||||
CARGO_FLAGS=
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
CARGO_FLAGS=--release
|
||||
fi
|
||||
|
||||
cargo test $CARGO_FLAGS
|
||||
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
|
||||
|
||||
# Install the rust binaries, for use by test jobs
|
||||
- run:
|
||||
name: Install rust binaries
|
||||
command: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
binaries=$(
|
||||
cargo metadata --format-version=1 --no-deps |
|
||||
"${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps |
|
||||
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
|
||||
)
|
||||
|
||||
test_exe_paths=$(
|
||||
cargo test --message-format=json --no-run |
|
||||
"${cov_prefix[@]}" cargo test --message-format=json --no-run |
|
||||
jq -r '.executable | select(. != null)'
|
||||
)
|
||||
|
||||
@@ -156,15 +166,34 @@ jobs:
|
||||
SRC=target/$BUILD_TYPE/$bin
|
||||
DST=/tmp/zenith/bin/$bin
|
||||
cp $SRC $DST
|
||||
echo $DST >> /tmp/zenith/etc/binaries.list
|
||||
done
|
||||
|
||||
# Install test executables (for code coverage)
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
for bin in $test_exe_paths; do
|
||||
SRC=$bin
|
||||
DST=/tmp/zenith/test_bin/$(basename $bin)
|
||||
cp $SRC $DST
|
||||
echo $DST >> /tmp/zenith/etc/binaries.list
|
||||
done
|
||||
fi
|
||||
|
||||
# Install the postgres binaries, for use by test jobs
|
||||
- run:
|
||||
name: Install postgres binaries
|
||||
command: |
|
||||
cp -a tmp_install /tmp/zenith/pg_install
|
||||
|
||||
# Save rust binaries for other jobs in the workflow
|
||||
- run:
|
||||
name: Merge coverage data
|
||||
command: |
|
||||
# This will speed up workspace uploads
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
|
||||
fi
|
||||
|
||||
# Save the rust binaries and coverage data for other jobs in this workflow.
|
||||
- persist_to_workspace:
|
||||
root: /tmp/zenith
|
||||
paths:
|
||||
@@ -285,6 +314,12 @@ jobs:
|
||||
|
||||
export GITHUB_SHA=$CIRCLE_SHA1
|
||||
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
# Run the tests.
|
||||
#
|
||||
# The junit.xml file allows CircleCI to display more fine-grained test information
|
||||
@@ -295,7 +330,7 @@ jobs:
|
||||
# -n4 uses four processes to run tests via pytest-xdist
|
||||
# -s is not used to prevent pytest from capturing output, because tests are running
|
||||
# in parallel and logs are mixed between different tests
|
||||
./scripts/pytest \
|
||||
"${cov_prefix[@]}" ./scripts/pytest \
|
||||
--junitxml=$TEST_OUTPUT/junit.xml \
|
||||
--tb=short \
|
||||
--verbose \
|
||||
@@ -324,12 +359,67 @@ jobs:
|
||||
# The store_test_results step tells CircleCI where to find the junit.xml file.
|
||||
- store_test_results:
|
||||
path: /tmp/test_output
|
||||
# Save data (if any)
|
||||
- run:
|
||||
name: Merge coverage data
|
||||
command: |
|
||||
# This will speed up workspace uploads
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
|
||||
fi
|
||||
# Save coverage data (if any)
|
||||
- persist_to_workspace:
|
||||
root: /tmp/zenith
|
||||
paths:
|
||||
- "*"
|
||||
|
||||
coverage-report:
|
||||
executor: neon-xlarge-executor
|
||||
steps:
|
||||
- attach_workspace:
|
||||
at: /tmp/zenith
|
||||
- checkout
|
||||
- restore_cache:
|
||||
name: Restore rust cache
|
||||
keys:
|
||||
# Require an exact match. While an out of date cache might speed up the build,
|
||||
# there's no way to clean out old packages, so the cache grows every time something
|
||||
# changes.
|
||||
- v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }}
|
||||
- run:
|
||||
name: Build coverage report
|
||||
command: |
|
||||
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
|
||||
|
||||
scripts/coverage \
|
||||
--dir=/tmp/zenith/coverage report \
|
||||
--input-objects=/tmp/zenith/etc/binaries.list \
|
||||
--commit-url=$COMMIT_URL \
|
||||
--format=github
|
||||
- run:
|
||||
name: Upload coverage report
|
||||
command: |
|
||||
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
|
||||
REPORT_URL=https://neondatabase.github.io/zenith-coverage-data/$CIRCLE_SHA1
|
||||
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
|
||||
|
||||
scripts/git-upload \
|
||||
--repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/neondatabase/zenith-coverage-data.git \
|
||||
--message="Add code coverage for $COMMIT_URL" \
|
||||
copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE
|
||||
|
||||
# Add link to the coverage report to the commit
|
||||
curl -f -X POST \
|
||||
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
--user "$CI_ACCESS_TOKEN" \
|
||||
--data \
|
||||
"{
|
||||
\"state\": \"success\",
|
||||
\"context\": \"zenith-coverage\",
|
||||
\"description\": \"Coverage report is ready\",
|
||||
\"target_url\": \"$REPORT_URL\"
|
||||
}"
|
||||
|
||||
# Build neondatabase/neon:latest image and push it to Docker hub
|
||||
docker-image:
|
||||
docker:
|
||||
@@ -640,6 +730,12 @@ workflows:
|
||||
save_perf_report: true
|
||||
requires:
|
||||
- build-neon-release
|
||||
- coverage-report:
|
||||
# Context passes credentials for gh api
|
||||
context: CI_ACCESS_TOKEN
|
||||
requires:
|
||||
# TODO: consider adding more
|
||||
- other-tests-debug
|
||||
- docker-image:
|
||||
# Context gives an ability to login
|
||||
context: Docker Hub
|
||||
|
||||
@@ -92,7 +92,7 @@ runs:
|
||||
fi
|
||||
|
||||
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
@@ -6,7 +6,7 @@ runs:
|
||||
steps:
|
||||
- name: Merge coverage data
|
||||
shell: bash -ex {0}
|
||||
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
|
||||
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage/ merge
|
||||
|
||||
- name: Upload coverage data
|
||||
uses: actions/upload-artifact@v3
|
||||
@@ -14,4 +14,4 @@ runs:
|
||||
retention-days: 7
|
||||
if-no-files-found: error
|
||||
name: coverage-data-artifact
|
||||
path: /tmp/coverage/
|
||||
path: /tmp/neon/coverage/
|
||||
|
||||
38
.github/workflows/build_and_test.yml
vendored
38
.github/workflows/build_and_test.yml
vendored
@@ -17,6 +17,11 @@ concurrency:
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
COPT: '-Werror'
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.CACHEPOT_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY }}
|
||||
CACHEPOT_BUCKET: zenith-rust-cachepot
|
||||
RUSTC_WRAPPER: cachepot
|
||||
|
||||
|
||||
jobs:
|
||||
build-postgres:
|
||||
@@ -49,7 +54,7 @@ jobs:
|
||||
|
||||
- name: Build postgres
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: mold -run make postgres -j$(nproc)
|
||||
run: COPT='-Werror' mold -run make postgres -j$(nproc)
|
||||
|
||||
# actions/cache@v3 does not allow concurrently using the same cache across job steps, so use a separate cache
|
||||
- name: Prepare postgres artifact
|
||||
@@ -101,15 +106,12 @@ jobs:
|
||||
~/.cargo/registry/
|
||||
~/.cargo/git/
|
||||
target/
|
||||
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
|
||||
key: |
|
||||
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
|
||||
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
|
||||
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
|
||||
|
||||
- name: Run cargo build
|
||||
run: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
CARGO_FLAGS=
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
@@ -117,11 +119,12 @@ jobs:
|
||||
fi
|
||||
|
||||
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
|
||||
cachepot -s
|
||||
|
||||
- name: Run cargo test
|
||||
run: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
CARGO_FLAGS=
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
@@ -133,7 +136,7 @@ jobs:
|
||||
- name: Install rust binaries
|
||||
run: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
fi
|
||||
@@ -151,9 +154,7 @@ jobs:
|
||||
mkdir -p /tmp/neon/bin/
|
||||
mkdir -p /tmp/neon/test_bin/
|
||||
mkdir -p /tmp/neon/etc/
|
||||
|
||||
# Keep bloated coverage data files away from the rest of the artifact
|
||||
mkdir -p /tmp/coverage/
|
||||
mkdir -p /tmp/neon/coverage/
|
||||
|
||||
# Install target binaries
|
||||
for bin in $binaries; do
|
||||
@@ -165,13 +166,13 @@ jobs:
|
||||
# Install test executables and write list of all binaries (for code coverage)
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
for bin in $binaries; do
|
||||
echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list
|
||||
echo "/tmp/neon/bin/$bin" >> /tmp/neon/coverage/binaries.list
|
||||
done
|
||||
for bin in $test_exe_paths; do
|
||||
SRC=$bin
|
||||
DST=/tmp/neon/test_bin/$(basename $bin)
|
||||
cp "$SRC" "$DST"
|
||||
echo "$DST" >> /tmp/coverage/binaries.list
|
||||
echo "$DST" >> /tmp/neon/coverage/binaries.list
|
||||
done
|
||||
fi
|
||||
|
||||
@@ -315,10 +316,7 @@ jobs:
|
||||
uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: coverage-data-artifact
|
||||
path: /tmp/coverage/
|
||||
|
||||
- name: Merge coverage data
|
||||
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
|
||||
path: /tmp/neon/coverage/
|
||||
|
||||
- name: Build and upload coverage report
|
||||
run: |
|
||||
@@ -327,8 +325,8 @@ jobs:
|
||||
COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA
|
||||
|
||||
scripts/coverage \
|
||||
--dir=/tmp/coverage report \
|
||||
--input-objects=/tmp/coverage/binaries.list \
|
||||
--dir=/tmp/neon/coverage report \
|
||||
--input-objects=/tmp/neon/coverage/binaries.list \
|
||||
--commit-url=$COMMIT_URL \
|
||||
--format=github
|
||||
|
||||
@@ -337,7 +335,7 @@ jobs:
|
||||
scripts/git-upload \
|
||||
--repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \
|
||||
--message="Add code coverage for $COMMIT_URL" \
|
||||
copy /tmp/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
|
||||
copy /tmp/neon/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
|
||||
|
||||
# Add link to the coverage report to the commit
|
||||
curl -f -X POST \
|
||||
|
||||
2
.github/workflows/codestyle.yml
vendored
2
.github/workflows/codestyle.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
# Rust toolchains (e.g. nightly or 1.37.0), add them here.
|
||||
rust_toolchain: [1.58]
|
||||
os: [ubuntu-latest, macos-latest]
|
||||
timeout-minutes: 50
|
||||
timeout-minutes: 30
|
||||
name: run regression test suite
|
||||
runs-on: ${{ matrix.os }}
|
||||
|
||||
|
||||
19
Cargo.lock
generated
19
Cargo.lock
generated
@@ -461,7 +461,7 @@ dependencies = [
|
||||
"tar",
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"url",
|
||||
"urlencoding",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
@@ -650,7 +650,7 @@ dependencies = [
|
||||
"crossterm_winapi",
|
||||
"libc",
|
||||
"mio",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"signal-hook",
|
||||
"signal-hook-mio",
|
||||
"winapi",
|
||||
@@ -1899,6 +1899,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"nix",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
@@ -1939,9 +1940,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.0"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
|
||||
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.2",
|
||||
@@ -2308,7 +2309,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"md5",
|
||||
"metrics",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"rcgen",
|
||||
@@ -3356,7 +3357,7 @@ dependencies = [
|
||||
"fallible-iterator",
|
||||
"futures",
|
||||
"log",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"percent-encoding",
|
||||
"phf",
|
||||
"pin-project-lite",
|
||||
@@ -3685,6 +3686,12 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "urlencoding"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68b90931029ab9b034b300b797048cf23723400aa757e8a2bfb9d748102f9821"
|
||||
|
||||
[[package]]
|
||||
name = "utils"
|
||||
version = "0.1.0"
|
||||
|
||||
@@ -18,5 +18,5 @@ serde_json = "1"
|
||||
tar = "0.4"
|
||||
tokio = { version = "1.17", features = ["macros", "rt", "rt-multi-thread"] }
|
||||
tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" }
|
||||
url = "2.2.2"
|
||||
urlencoding = "2.1.0"
|
||||
workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
|
||||
@@ -33,7 +33,7 @@ use std::process::exit;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::{thread, time::Duration};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use clap::Arg;
|
||||
use log::{error, info};
|
||||
@@ -45,7 +45,6 @@ use compute_tools::monitor::launch_monitor;
|
||||
use compute_tools::params::*;
|
||||
use compute_tools::pg_helpers::*;
|
||||
use compute_tools::spec::*;
|
||||
use url::Url;
|
||||
|
||||
fn main() -> Result<()> {
|
||||
// TODO: re-use `utils::logging` later
|
||||
@@ -132,7 +131,7 @@ fn main() -> Result<()> {
|
||||
|
||||
let compute_state = ComputeNode {
|
||||
start_time: Utc::now(),
|
||||
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
|
||||
connstr: connstr.to_string(),
|
||||
pgdata: pgdata.to_string(),
|
||||
pgbin: pgbin.to_string(),
|
||||
spec,
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::error;
|
||||
use postgres::Client;
|
||||
@@ -21,8 +23,9 @@ pub fn create_writablity_check_data(client: &mut Client) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn check_writability(compute: &ComputeNode) -> Result<()> {
|
||||
let (client, connection) = tokio_postgres::connect(compute.connstr.as_str(), NoTls).await?;
|
||||
pub async fn check_writability(compute: &Arc<ComputeNode>) -> Result<()> {
|
||||
let connstr = &compute.connstr;
|
||||
let (client, connection) = tokio_postgres::connect(connstr, NoTls).await?;
|
||||
if client.is_closed() {
|
||||
return Err(anyhow!("connection to postgres closed"));
|
||||
}
|
||||
|
||||
@@ -35,8 +35,7 @@ use crate::spec::*;
|
||||
/// Compute node info shared across several `compute_ctl` threads.
|
||||
pub struct ComputeNode {
|
||||
pub start_time: DateTime<Utc>,
|
||||
// Url type maintains proper escaping
|
||||
pub connstr: url::Url,
|
||||
pub connstr: String,
|
||||
pub pgdata: String,
|
||||
pub pgbin: String,
|
||||
pub spec: ComputeSpec,
|
||||
@@ -269,25 +268,21 @@ impl ComputeNode {
|
||||
// In this case we need to connect with old `zenith_admin`name
|
||||
// and create new user. We cannot simply rename connected user,
|
||||
// but we can create a new one and grant it all privileges.
|
||||
let mut client = match Client::connect(self.connstr.as_str(), NoTls) {
|
||||
let mut client = match Client::connect(&self.connstr, NoTls) {
|
||||
Err(e) => {
|
||||
info!(
|
||||
"cannot connect to postgres: {}, retrying with `zenith_admin` username",
|
||||
e
|
||||
);
|
||||
let mut zenith_admin_connstr = self.connstr.clone();
|
||||
let zenith_admin_connstr = self.connstr.replacen("cloud_admin", "zenith_admin", 1);
|
||||
|
||||
zenith_admin_connstr
|
||||
.set_username("zenith_admin")
|
||||
.map_err(|_| anyhow::anyhow!("invalid connstr"))?;
|
||||
|
||||
let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
|
||||
let mut client = Client::connect(&zenith_admin_connstr, NoTls)?;
|
||||
client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
|
||||
client.simple_query("GRANT zenith_admin TO cloud_admin")?;
|
||||
drop(client);
|
||||
|
||||
// reconnect with connsting with expected name
|
||||
Client::connect(self.connstr.as_str(), NoTls)?
|
||||
Client::connect(&self.connstr, NoTls)?
|
||||
}
|
||||
Ok(client) => client,
|
||||
};
|
||||
|
||||
@@ -13,11 +13,11 @@ const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds
|
||||
// Spin in a loop and figure out the last activity time in the Postgres.
|
||||
// Then update it in the shared state. This function never errors out.
|
||||
// XXX: the only expected panic is at `RwLock` unwrap().
|
||||
fn watch_compute_activity(compute: &ComputeNode) {
|
||||
fn watch_compute_activity(compute: &Arc<ComputeNode>) {
|
||||
// Suppose that `connstr` doesn't change
|
||||
let connstr = compute.connstr.as_str();
|
||||
let connstr = compute.connstr.clone();
|
||||
// Define `client` outside of the loop to reuse existing connection if it's active.
|
||||
let mut client = Client::connect(connstr, NoTls);
|
||||
let mut client = Client::connect(&connstr, NoTls);
|
||||
let timeout = time::Duration::from_millis(MONITOR_CHECK_INTERVAL);
|
||||
|
||||
info!("watching Postgres activity at {}", connstr);
|
||||
@@ -32,7 +32,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
|
||||
info!("connection to postgres closed, trying to reconnect");
|
||||
|
||||
// Connection is closed, reconnect and try again.
|
||||
client = Client::connect(connstr, NoTls);
|
||||
client = Client::connect(&connstr, NoTls);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
|
||||
debug!("cannot connect to postgres: {}, retrying", e);
|
||||
|
||||
// Establish a new connection and try again.
|
||||
client = Client::connect(connstr, NoTls);
|
||||
client = Client::connect(&connstr, NoTls);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use anyhow::Result;
|
||||
use log::{info, log_enabled, warn, Level};
|
||||
use postgres::{Client, NoTls};
|
||||
use serde::Deserialize;
|
||||
use urlencoding::encode;
|
||||
|
||||
use crate::compute::ComputeNode;
|
||||
use crate::config;
|
||||
@@ -230,11 +231,9 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<
|
||||
fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
|
||||
for db in &node.spec.cluster.databases {
|
||||
if db.owner != *role_name {
|
||||
let mut connstr = node.connstr.clone();
|
||||
// database name is always the last and the only component of the path
|
||||
connstr.set_path(&db.name);
|
||||
|
||||
let mut client = Client::connect(connstr.as_str(), NoTls)?;
|
||||
let db_name_encoded = format!("/{}", encode(&db.name));
|
||||
let db_connstr = node.connstr.replacen("/postgres", &db_name_encoded, 1);
|
||||
let mut client = Client::connect(&db_connstr, NoTls)?;
|
||||
|
||||
// This will reassign all dependent objects to the db owner
|
||||
let reassign_query = format!(
|
||||
|
||||
@@ -63,6 +63,8 @@ workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
close_fds = "0.3.2"
|
||||
walkdir = "2.3.2"
|
||||
|
||||
parking_lot = "0.12.1"
|
||||
|
||||
[dev-dependencies]
|
||||
hex-literal = "0.3"
|
||||
tempfile = "3.2"
|
||||
|
||||
@@ -36,13 +36,12 @@
|
||||
//! mapping is automatically removed and the slot is marked free.
|
||||
//!
|
||||
|
||||
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::{
|
||||
atomic::{AtomicU8, AtomicUsize, Ordering},
|
||||
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
|
||||
},
|
||||
sync::atomic::{AtomicU8, AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
@@ -385,7 +384,7 @@ impl PageCache {
|
||||
for slot_idx in 0..self.slots.len() {
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
let mut inner = slot.inner.write().unwrap();
|
||||
let mut inner = slot.inner.write();
|
||||
if let Some(key) = &inner.key {
|
||||
match key {
|
||||
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
|
||||
@@ -413,7 +412,7 @@ impl PageCache {
|
||||
for slot_idx in 0..self.slots.len() {
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
let mut inner = slot.inner.write().unwrap();
|
||||
let mut inner = slot.inner.write();
|
||||
if let Some(key) = &inner.key {
|
||||
match key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno: _ }
|
||||
@@ -454,7 +453,7 @@ impl PageCache {
|
||||
// that it's still what we expected (because we released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.read().unwrap();
|
||||
let inner = slot.inner.read();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard(inner));
|
||||
@@ -543,7 +542,7 @@ impl PageCache {
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().unwrap();
|
||||
let inner = slot.inner.write();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageWriteGuard { inner, valid: true });
|
||||
@@ -611,7 +610,7 @@ impl PageCache {
|
||||
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let map = self.materialized_page_map.read();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
@@ -624,11 +623,11 @@ impl PageCache {
|
||||
Some(version.slot_idx)
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let map = self.ephemeral_page_map.read().unwrap();
|
||||
let map = self.ephemeral_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
let map = self.immutable_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
@@ -641,7 +640,7 @@ impl PageCache {
|
||||
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let map = self.materialized_page_map.read();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
@@ -651,11 +650,11 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let map = self.ephemeral_page_map.read().unwrap();
|
||||
let map = self.ephemeral_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
let map = self.immutable_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
@@ -670,7 +669,7 @@ impl PageCache {
|
||||
hash_key: old_hash_key,
|
||||
lsn: old_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let mut map = self.materialized_page_map.write();
|
||||
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
|
||||
let versions = old_entry.get_mut();
|
||||
|
||||
@@ -685,12 +684,12 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
let mut map = self.ephemeral_page_map.write();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
let mut map = self.immutable_page_map.write();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
}
|
||||
@@ -708,7 +707,7 @@ impl PageCache {
|
||||
hash_key: new_key,
|
||||
lsn: new_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let mut map = self.materialized_page_map.write();
|
||||
let versions = map.entry(new_key.clone()).or_default();
|
||||
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => Some(versions[version_idx].slot_idx),
|
||||
@@ -725,7 +724,7 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
let mut map = self.ephemeral_page_map.write();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
@@ -735,7 +734,7 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
let mut map = self.immutable_page_map.write();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
@@ -765,11 +764,8 @@ impl PageCache {
|
||||
|
||||
if slot.dec_usage_count() == 0 {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(TryLockError::Poisoned(err)) => {
|
||||
panic!("buffer lock was poisoned: {:?}", err)
|
||||
}
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
Some(inner) => inner,
|
||||
None => {
|
||||
// If we have looped through the whole buffer pool 10 times
|
||||
// and still haven't found a victim buffer, something's wrong.
|
||||
// Maybe all the buffers were in locked. That could happen in
|
||||
|
||||
@@ -37,7 +37,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10_000;
|
||||
}
|
||||
|
||||
/// Per-tenant configuration options
|
||||
|
||||
@@ -178,7 +178,7 @@ async fn shutdown_all_wal_connections(
|
||||
/// That may lead to certain events not being observed by the listener.
|
||||
#[derive(Debug)]
|
||||
struct TaskHandle<E> {
|
||||
handle: JoinHandle<Result<(), String>>,
|
||||
handle: JoinHandle<()>,
|
||||
events_receiver: watch::Receiver<TaskEvent<E>>,
|
||||
cancellation: watch::Sender<()>,
|
||||
}
|
||||
@@ -205,8 +205,8 @@ impl<E: Clone> TaskHandle<E> {
|
||||
|
||||
let sender = Arc::clone(&events_sender);
|
||||
let handle = tokio::task::spawn(async move {
|
||||
events_sender.send(TaskEvent::Started).ok();
|
||||
task(sender, cancellation_receiver).await
|
||||
let task_result = task(sender, cancellation_receiver).await;
|
||||
events_sender.send(TaskEvent::End(task_result)).ok();
|
||||
});
|
||||
|
||||
TaskHandle {
|
||||
@@ -216,16 +216,6 @@ impl<E: Clone> TaskHandle<E> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn next_task_event(&mut self) -> TaskEvent<E> {
|
||||
select! {
|
||||
next_task_event = self.events_receiver.changed() => match next_task_event {
|
||||
Ok(()) => self.events_receiver.borrow().clone(),
|
||||
Err(_task_channel_part_dropped) => join_on_handle(&mut self.handle).await,
|
||||
},
|
||||
task_completion_result = join_on_handle(&mut self.handle) => task_completion_result,
|
||||
}
|
||||
}
|
||||
|
||||
/// Aborts current task, waiting for it to finish.
|
||||
async fn shutdown(self) {
|
||||
self.cancellation.send(()).ok();
|
||||
@@ -235,19 +225,6 @@ impl<E: Clone> TaskHandle<E> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn join_on_handle<E>(handle: &mut JoinHandle<Result<(), String>>) -> TaskEvent<E> {
|
||||
match handle.await {
|
||||
Ok(task_result) => TaskEvent::End(task_result),
|
||||
Err(e) => {
|
||||
if e.is_cancelled() {
|
||||
TaskEvent::End(Ok(()))
|
||||
} else {
|
||||
TaskEvent::End(Err(format!("WAL receiver task panicked: {e}")))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A step to process timeline attach/detach events to enable/disable the corresponding WAL receiver machinery.
|
||||
/// In addition to WAL streaming management, the step ensures that corresponding tenant has its service threads enabled or disabled.
|
||||
/// This is done here, since only walreceiver knows when a certain tenant has no streaming enabled.
|
||||
|
||||
@@ -104,29 +104,49 @@ async fn connection_manager_loop_step(
|
||||
|
||||
Some(wal_connection_update) = async {
|
||||
match walreceiver_state.wal_connection.as_mut() {
|
||||
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
|
||||
Some(wal_connection) => {
|
||||
let receiver = &mut wal_connection.connection_task.events_receiver;
|
||||
Some(match receiver.changed().await {
|
||||
Ok(()) => receiver.borrow().clone(),
|
||||
Err(_cancellation_error) => TaskEvent::End(Ok(())),
|
||||
})
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
} => {
|
||||
let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select! guard");
|
||||
match &wal_connection_update {
|
||||
TaskEvent::Started => {
|
||||
wal_connection.latest_connection_update = Utc::now().naive_utc();
|
||||
*walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1;
|
||||
},
|
||||
TaskEvent::NewEvent(replication_feedback) => {
|
||||
wal_connection.latest_connection_update = DateTime::<Local>::from(replication_feedback.ps_replytime).naive_utc();
|
||||
// reset connection attempts here only, the only place where both nodes
|
||||
// explicitly confirmn with replication feedback that they are connected to each other
|
||||
walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
|
||||
},
|
||||
let (connection_update, reset_connection_attempts) = match &wal_connection_update {
|
||||
TaskEvent::Started => (Some(Utc::now().naive_utc()), true),
|
||||
TaskEvent::NewEvent(replication_feedback) => (Some(DateTime::<Local>::from(replication_feedback.ps_replytime).naive_utc()), true),
|
||||
TaskEvent::End(end_result) => {
|
||||
match end_result {
|
||||
Ok(()) => debug!("WAL receiving task finished"),
|
||||
Err(e) => warn!("WAL receiving task failed: {e}"),
|
||||
let should_reset_connection_attempts = match end_result {
|
||||
Ok(()) => {
|
||||
debug!("WAL receiving task finished");
|
||||
true
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("WAL receiving task failed: {e}");
|
||||
false
|
||||
},
|
||||
};
|
||||
walreceiver_state.wal_connection = None;
|
||||
(None, should_reset_connection_attempts)
|
||||
},
|
||||
};
|
||||
|
||||
if let Some(connection_update) = connection_update {
|
||||
match &mut walreceiver_state.wal_connection {
|
||||
Some(wal_connection) => {
|
||||
wal_connection.latest_connection_update = connection_update;
|
||||
|
||||
let attempts_entry = walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0);
|
||||
if reset_connection_attempts {
|
||||
*attempts_entry = 0;
|
||||
} else {
|
||||
*attempts_entry += 1;
|
||||
}
|
||||
},
|
||||
None => error!("Received connection update for WAL connection that is not active, update: {wal_connection_update:?}"),
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@@ -386,8 +406,10 @@ impl WalreceiverState {
|
||||
Some(existing_wal_connection) => {
|
||||
let connected_sk_node = existing_wal_connection.sk_id;
|
||||
|
||||
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) =
|
||||
self.select_connection_candidate(Some(connected_sk_node))?;
|
||||
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) = self
|
||||
.applicable_connection_candidates()
|
||||
.filter(|&(sk_id, _, _)| sk_id != connected_sk_node)
|
||||
.max_by_key(|(_, info, _)| info.commit_lsn)?;
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
if let Ok(latest_interaciton) =
|
||||
@@ -440,8 +462,9 @@ impl WalreceiverState {
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let (new_sk_id, _, new_wal_producer_connstr) =
|
||||
self.select_connection_candidate(None)?;
|
||||
let (new_sk_id, _, new_wal_producer_connstr) = self
|
||||
.applicable_connection_candidates()
|
||||
.max_by_key(|(_, info, _)| info.commit_lsn)?;
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_producer_connstr: new_wal_producer_connstr,
|
||||
@@ -453,49 +476,6 @@ impl WalreceiverState {
|
||||
None
|
||||
}
|
||||
|
||||
/// Selects the best possible candidate, based on the data collected from etcd updates about the safekeepers.
|
||||
/// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
|
||||
///
|
||||
/// The candidate that is chosen:
|
||||
/// * has fewest connection attempts from pageserver to safekeeper node (reset every time the WAL replication feedback is sent)
|
||||
/// * has greatest data Lsn among the ones that are left
|
||||
///
|
||||
/// NOTE:
|
||||
/// We evict timeline data received from etcd based on time passed since it was registered, along with its connection attempts values, but
|
||||
/// otherwise to reset the connection attempts, a successful connection to that node is needed.
|
||||
/// That won't happen now, before all nodes with less connection attempts are connected to first, which might leave the sk node with more advanced state to be ignored.
|
||||
fn select_connection_candidate(
|
||||
&self,
|
||||
node_to_omit: Option<NodeId>,
|
||||
) -> Option<(NodeId, &SkTimelineInfo, String)> {
|
||||
let all_candidates = self
|
||||
.applicable_connection_candidates()
|
||||
.filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let smallest_attempts_allowed = all_candidates
|
||||
.iter()
|
||||
.map(|(sk_id, _, _)| {
|
||||
self.wal_connection_attempts
|
||||
.get(sk_id)
|
||||
.copied()
|
||||
.unwrap_or(0)
|
||||
})
|
||||
.min()?;
|
||||
|
||||
all_candidates
|
||||
.into_iter()
|
||||
.filter(|(sk_id, _, _)| {
|
||||
smallest_attempts_allowed
|
||||
>= self
|
||||
.wal_connection_attempts
|
||||
.get(sk_id)
|
||||
.copied()
|
||||
.unwrap_or(0)
|
||||
})
|
||||
.max_by_key(|(_, info, _)| info.commit_lsn)
|
||||
}
|
||||
|
||||
fn applicable_connection_candidates(
|
||||
&self,
|
||||
) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
|
||||
@@ -520,25 +500,15 @@ impl WalreceiverState {
|
||||
}
|
||||
|
||||
fn cleanup_old_candidates(&mut self) {
|
||||
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
|
||||
|
||||
self.wal_stream_candidates.retain(|node_id, etcd_info| {
|
||||
self.wal_stream_candidates.retain(|_, etcd_info| {
|
||||
if let Ok(time_since_latest_etcd_update) =
|
||||
(Utc::now().naive_utc() - etcd_info.latest_update).to_std()
|
||||
{
|
||||
let should_retain = time_since_latest_etcd_update < self.lagging_wal_timeout;
|
||||
if !should_retain {
|
||||
node_ids_to_remove.push(*node_id);
|
||||
}
|
||||
should_retain
|
||||
time_since_latest_etcd_update < self.lagging_wal_timeout
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
for node_id in node_ids_to_remove {
|
||||
self.wal_connection_attempts.remove(&node_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -873,64 +843,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("candidate_with_many_connection_failures")?;
|
||||
let mut state = dummy_state(&harness);
|
||||
let now = Utc::now().naive_utc();
|
||||
|
||||
let current_lsn = Lsn(100_000).align();
|
||||
let bigger_lsn = Lsn(current_lsn.0 + 100).align();
|
||||
|
||||
state.wal_connection = None;
|
||||
state.wal_stream_candidates = HashMap::from([
|
||||
(
|
||||
NodeId(0),
|
||||
EtcdSkTimeline {
|
||||
timeline: SkTimelineInfo {
|
||||
last_log_term: None,
|
||||
flush_lsn: None,
|
||||
commit_lsn: Some(bigger_lsn),
|
||||
backup_lsn: None,
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
},
|
||||
etcd_version: 0,
|
||||
latest_update: now,
|
||||
},
|
||||
),
|
||||
(
|
||||
NodeId(1),
|
||||
EtcdSkTimeline {
|
||||
timeline: SkTimelineInfo {
|
||||
last_log_term: None,
|
||||
flush_lsn: None,
|
||||
commit_lsn: Some(current_lsn),
|
||||
backup_lsn: None,
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
},
|
||||
etcd_version: 0,
|
||||
latest_update: now,
|
||||
},
|
||||
),
|
||||
]);
|
||||
state.wal_connection_attempts = HashMap::from([(NodeId(0), 1), (NodeId(1), 0)]);
|
||||
|
||||
let candidate_with_less_errors = state
|
||||
.next_connection_candidate()
|
||||
.expect("Expected one candidate selected, but got none");
|
||||
assert_eq!(
|
||||
candidate_with_less_errors.safekeeper_id,
|
||||
NodeId(1),
|
||||
"Should select the node with less connection errors"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connection_no_etcd_data_candidate() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("connection_no_etcd_data_candidate")?;
|
||||
|
||||
@@ -151,7 +151,7 @@ pub async fn handle_walreceiver_connection(
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
@@ -101,34 +99,3 @@ def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare
|
||||
env.pg_bin.run_capture(
|
||||
['pgbench', f'-T{duration}', f'--random-seed={seed}', '-Mprepared', env.pg.connstr()])
|
||||
env.flush()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("n_tables", [1, 10])
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix(10))
|
||||
def test_compare_pg_stats_wo_with_heavy_write(neon_with_baseline: PgCompare,
|
||||
n_tables: int,
|
||||
duration: int,
|
||||
pg_stats_wo: List[PgStatTable]):
|
||||
env = neon_with_baseline
|
||||
with env.pg.connect().cursor() as cur:
|
||||
for i in range(n_tables):
|
||||
cur.execute(
|
||||
f"CREATE TABLE t{i}(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
|
||||
)
|
||||
|
||||
def start_single_table_workload(table_id: int):
|
||||
start = time.time()
|
||||
with env.pg.connect().cursor() as cur:
|
||||
while time.time() - start < duration:
|
||||
cur.execute(f"INSERT INTO t{table_id} SELECT FROM generate_series(1,1000)")
|
||||
|
||||
with env.record_pg_stats(pg_stats_wo):
|
||||
threads = [
|
||||
threading.Thread(target=start_single_table_workload, args=(i, ))
|
||||
for i in range(n_tables)
|
||||
]
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
Reference in New Issue
Block a user