mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-23 08:00:37 +00:00
Compare commits
1 Commits
thang/exp-
...
try-parkin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2a15415442 |
@@ -100,8 +100,10 @@ jobs:
|
||||
name: Rust build << parameters.build_type >>
|
||||
command: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
|
||||
CARGO_FLAGS=
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
CARGO_FLAGS="--release --features profiling"
|
||||
fi
|
||||
|
||||
@@ -110,7 +112,7 @@ jobs:
|
||||
export RUSTC_WRAPPER=cachepot
|
||||
export AWS_ACCESS_KEY_ID="${CACHEPOT_AWS_ACCESS_KEY_ID}"
|
||||
export AWS_SECRET_ACCESS_KEY="${CACHEPOT_AWS_SECRET_ACCESS_KEY}"
|
||||
mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
|
||||
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
|
||||
cachepot -s
|
||||
|
||||
- save_cache:
|
||||
@@ -126,24 +128,32 @@ jobs:
|
||||
name: cargo test
|
||||
command: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
|
||||
CARGO_FLAGS=
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
CARGO_FLAGS=--release
|
||||
fi
|
||||
|
||||
cargo test $CARGO_FLAGS
|
||||
"${cov_prefix[@]}" cargo test $CARGO_FLAGS
|
||||
|
||||
# Install the rust binaries, for use by test jobs
|
||||
- run:
|
||||
name: Install rust binaries
|
||||
command: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
binaries=$(
|
||||
cargo metadata --format-version=1 --no-deps |
|
||||
"${cov_prefix[@]}" cargo metadata --format-version=1 --no-deps |
|
||||
jq -r '.packages[].targets[] | select(.kind | index("bin")) | .name'
|
||||
)
|
||||
|
||||
test_exe_paths=$(
|
||||
cargo test --message-format=json --no-run |
|
||||
"${cov_prefix[@]}" cargo test --message-format=json --no-run |
|
||||
jq -r '.executable | select(. != null)'
|
||||
)
|
||||
|
||||
@@ -156,15 +166,34 @@ jobs:
|
||||
SRC=target/$BUILD_TYPE/$bin
|
||||
DST=/tmp/zenith/bin/$bin
|
||||
cp $SRC $DST
|
||||
echo $DST >> /tmp/zenith/etc/binaries.list
|
||||
done
|
||||
|
||||
# Install test executables (for code coverage)
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
for bin in $test_exe_paths; do
|
||||
SRC=$bin
|
||||
DST=/tmp/zenith/test_bin/$(basename $bin)
|
||||
cp $SRC $DST
|
||||
echo $DST >> /tmp/zenith/etc/binaries.list
|
||||
done
|
||||
fi
|
||||
|
||||
# Install the postgres binaries, for use by test jobs
|
||||
- run:
|
||||
name: Install postgres binaries
|
||||
command: |
|
||||
cp -a tmp_install /tmp/zenith/pg_install
|
||||
|
||||
# Save rust binaries for other jobs in the workflow
|
||||
- run:
|
||||
name: Merge coverage data
|
||||
command: |
|
||||
# This will speed up workspace uploads
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
|
||||
fi
|
||||
|
||||
# Save the rust binaries and coverage data for other jobs in this workflow.
|
||||
- persist_to_workspace:
|
||||
root: /tmp/zenith
|
||||
paths:
|
||||
@@ -285,6 +314,12 @@ jobs:
|
||||
|
||||
export GITHUB_SHA=$CIRCLE_SHA1
|
||||
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage run)
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
# Run the tests.
|
||||
#
|
||||
# The junit.xml file allows CircleCI to display more fine-grained test information
|
||||
@@ -295,7 +330,7 @@ jobs:
|
||||
# -n4 uses four processes to run tests via pytest-xdist
|
||||
# -s is not used to prevent pytest from capturing output, because tests are running
|
||||
# in parallel and logs are mixed between different tests
|
||||
./scripts/pytest \
|
||||
"${cov_prefix[@]}" ./scripts/pytest \
|
||||
--junitxml=$TEST_OUTPUT/junit.xml \
|
||||
--tb=short \
|
||||
--verbose \
|
||||
@@ -324,12 +359,67 @@ jobs:
|
||||
# The store_test_results step tells CircleCI where to find the junit.xml file.
|
||||
- store_test_results:
|
||||
path: /tmp/test_output
|
||||
# Save data (if any)
|
||||
- run:
|
||||
name: Merge coverage data
|
||||
command: |
|
||||
# This will speed up workspace uploads
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
scripts/coverage "--profraw-prefix=$CIRCLE_JOB" --dir=/tmp/zenith/coverage merge
|
||||
fi
|
||||
# Save coverage data (if any)
|
||||
- persist_to_workspace:
|
||||
root: /tmp/zenith
|
||||
paths:
|
||||
- "*"
|
||||
|
||||
coverage-report:
|
||||
executor: neon-xlarge-executor
|
||||
steps:
|
||||
- attach_workspace:
|
||||
at: /tmp/zenith
|
||||
- checkout
|
||||
- restore_cache:
|
||||
name: Restore rust cache
|
||||
keys:
|
||||
# Require an exact match. While an out of date cache might speed up the build,
|
||||
# there's no way to clean out old packages, so the cache grows every time something
|
||||
# changes.
|
||||
- v04-rust-cache-deps-debug-{{ checksum "Cargo.lock" }}
|
||||
- run:
|
||||
name: Build coverage report
|
||||
command: |
|
||||
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
|
||||
|
||||
scripts/coverage \
|
||||
--dir=/tmp/zenith/coverage report \
|
||||
--input-objects=/tmp/zenith/etc/binaries.list \
|
||||
--commit-url=$COMMIT_URL \
|
||||
--format=github
|
||||
- run:
|
||||
name: Upload coverage report
|
||||
command: |
|
||||
LOCAL_REPO=$CIRCLE_PROJECT_USERNAME/$CIRCLE_PROJECT_REPONAME
|
||||
REPORT_URL=https://neondatabase.github.io/zenith-coverage-data/$CIRCLE_SHA1
|
||||
COMMIT_URL=https://github.com/neondatabase/neon/commit/$CIRCLE_SHA1
|
||||
|
||||
scripts/git-upload \
|
||||
--repo=https://$VIP_VAP_ACCESS_TOKEN@github.com/neondatabase/zenith-coverage-data.git \
|
||||
--message="Add code coverage for $COMMIT_URL" \
|
||||
copy /tmp/zenith/coverage/report $CIRCLE_SHA1 # COPY FROM TO_RELATIVE
|
||||
|
||||
# Add link to the coverage report to the commit
|
||||
curl -f -X POST \
|
||||
https://api.github.com/repos/$LOCAL_REPO/statuses/$CIRCLE_SHA1 \
|
||||
-H "Accept: application/vnd.github.v3+json" \
|
||||
--user "$CI_ACCESS_TOKEN" \
|
||||
--data \
|
||||
"{
|
||||
\"state\": \"success\",
|
||||
\"context\": \"zenith-coverage\",
|
||||
\"description\": \"Coverage report is ready\",
|
||||
\"target_url\": \"$REPORT_URL\"
|
||||
}"
|
||||
|
||||
# Build neondatabase/neon:latest image and push it to Docker hub
|
||||
docker-image:
|
||||
docker:
|
||||
@@ -640,6 +730,12 @@ workflows:
|
||||
save_perf_report: true
|
||||
requires:
|
||||
- build-neon-release
|
||||
- coverage-report:
|
||||
# Context passes credentials for gh api
|
||||
context: CI_ACCESS_TOKEN
|
||||
requires:
|
||||
# TODO: consider adding more
|
||||
- other-tests-debug
|
||||
- docker-image:
|
||||
# Context gives an ability to login
|
||||
context: Docker Hub
|
||||
|
||||
@@ -92,7 +92,7 @@ runs:
|
||||
fi
|
||||
|
||||
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
elif [[ "${{ inputs.build_type }}" == "release" ]]; then
|
||||
cov_prefix=()
|
||||
fi
|
||||
|
||||
@@ -6,7 +6,7 @@ runs:
|
||||
steps:
|
||||
- name: Merge coverage data
|
||||
shell: bash -ex {0}
|
||||
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
|
||||
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage/ merge
|
||||
|
||||
- name: Upload coverage data
|
||||
uses: actions/upload-artifact@v3
|
||||
@@ -14,4 +14,4 @@ runs:
|
||||
retention-days: 7
|
||||
if-no-files-found: error
|
||||
name: coverage-data-artifact
|
||||
path: /tmp/coverage/
|
||||
path: /tmp/neon/coverage/
|
||||
|
||||
38
.github/workflows/build_and_test.yml
vendored
38
.github/workflows/build_and_test.yml
vendored
@@ -17,6 +17,11 @@ concurrency:
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
COPT: '-Werror'
|
||||
AWS_ACCESS_KEY_ID: ${{ secrets.CACHEPOT_AWS_ACCESS_KEY_ID }}
|
||||
AWS_SECRET_ACCESS_KEY: ${{ secrets.CACHEPOT_AWS_SECRET_ACCESS_KEY }}
|
||||
CACHEPOT_BUCKET: zenith-rust-cachepot
|
||||
RUSTC_WRAPPER: cachepot
|
||||
|
||||
|
||||
jobs:
|
||||
build-postgres:
|
||||
@@ -49,7 +54,7 @@ jobs:
|
||||
|
||||
- name: Build postgres
|
||||
if: steps.cache_pg.outputs.cache-hit != 'true'
|
||||
run: mold -run make postgres -j$(nproc)
|
||||
run: COPT='-Werror' mold -run make postgres -j$(nproc)
|
||||
|
||||
# actions/cache@v3 does not allow concurrently using the same cache across job steps, so use a separate cache
|
||||
- name: Prepare postgres artifact
|
||||
@@ -101,15 +106,12 @@ jobs:
|
||||
~/.cargo/registry/
|
||||
~/.cargo/git/
|
||||
target/
|
||||
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
|
||||
key: |
|
||||
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
|
||||
v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-
|
||||
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
|
||||
|
||||
- name: Run cargo build
|
||||
run: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
CARGO_FLAGS=
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
@@ -117,11 +119,12 @@ jobs:
|
||||
fi
|
||||
|
||||
"${cov_prefix[@]}" mold -run cargo build $CARGO_FLAGS --features failpoints --bins --tests
|
||||
cachepot -s
|
||||
|
||||
- name: Run cargo test
|
||||
run: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
CARGO_FLAGS=
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
@@ -133,7 +136,7 @@ jobs:
|
||||
- name: Install rust binaries
|
||||
run: |
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
|
||||
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/neon/coverage run)
|
||||
elif [[ $BUILD_TYPE == "release" ]]; then
|
||||
cov_prefix=()
|
||||
fi
|
||||
@@ -151,9 +154,7 @@ jobs:
|
||||
mkdir -p /tmp/neon/bin/
|
||||
mkdir -p /tmp/neon/test_bin/
|
||||
mkdir -p /tmp/neon/etc/
|
||||
|
||||
# Keep bloated coverage data files away from the rest of the artifact
|
||||
mkdir -p /tmp/coverage/
|
||||
mkdir -p /tmp/neon/coverage/
|
||||
|
||||
# Install target binaries
|
||||
for bin in $binaries; do
|
||||
@@ -165,13 +166,13 @@ jobs:
|
||||
# Install test executables and write list of all binaries (for code coverage)
|
||||
if [[ $BUILD_TYPE == "debug" ]]; then
|
||||
for bin in $binaries; do
|
||||
echo "/tmp/neon/bin/$bin" >> /tmp/coverage/binaries.list
|
||||
echo "/tmp/neon/bin/$bin" >> /tmp/neon/coverage/binaries.list
|
||||
done
|
||||
for bin in $test_exe_paths; do
|
||||
SRC=$bin
|
||||
DST=/tmp/neon/test_bin/$(basename $bin)
|
||||
cp "$SRC" "$DST"
|
||||
echo "$DST" >> /tmp/coverage/binaries.list
|
||||
echo "$DST" >> /tmp/neon/coverage/binaries.list
|
||||
done
|
||||
fi
|
||||
|
||||
@@ -315,10 +316,7 @@ jobs:
|
||||
uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: coverage-data-artifact
|
||||
path: /tmp/coverage/
|
||||
|
||||
- name: Merge coverage data
|
||||
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
|
||||
path: /tmp/neon/coverage/
|
||||
|
||||
- name: Build and upload coverage report
|
||||
run: |
|
||||
@@ -327,8 +325,8 @@ jobs:
|
||||
COMMIT_URL=https://github.com/${{ github.repository }}/commit/$COMMIT_SHA
|
||||
|
||||
scripts/coverage \
|
||||
--dir=/tmp/coverage report \
|
||||
--input-objects=/tmp/coverage/binaries.list \
|
||||
--dir=/tmp/neon/coverage report \
|
||||
--input-objects=/tmp/neon/coverage/binaries.list \
|
||||
--commit-url=$COMMIT_URL \
|
||||
--format=github
|
||||
|
||||
@@ -337,7 +335,7 @@ jobs:
|
||||
scripts/git-upload \
|
||||
--repo=https://${{ secrets.VIP_VAP_ACCESS_TOKEN }}@github.com/${{ github.repository_owner }}/zenith-coverage-data.git \
|
||||
--message="Add code coverage for $COMMIT_URL" \
|
||||
copy /tmp/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
|
||||
copy /tmp/neon/coverage/report $COMMIT_SHA # COPY FROM TO_RELATIVE
|
||||
|
||||
# Add link to the coverage report to the commit
|
||||
curl -f -X POST \
|
||||
|
||||
2
.github/workflows/codestyle.yml
vendored
2
.github/workflows/codestyle.yml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
# Rust toolchains (e.g. nightly or 1.37.0), add them here.
|
||||
rust_toolchain: [1.58]
|
||||
os: [ubuntu-latest, macos-latest]
|
||||
timeout-minutes: 50
|
||||
timeout-minutes: 30
|
||||
name: run regression test suite
|
||||
runs-on: ${{ matrix.os }}
|
||||
|
||||
|
||||
11
Cargo.lock
generated
11
Cargo.lock
generated
@@ -650,7 +650,7 @@ dependencies = [
|
||||
"crossterm_winapi",
|
||||
"libc",
|
||||
"mio",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"signal-hook",
|
||||
"signal-hook-mio",
|
||||
"winapi",
|
||||
@@ -1899,6 +1899,7 @@ dependencies = [
|
||||
"metrics",
|
||||
"nix",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"postgres",
|
||||
"postgres-protocol",
|
||||
"postgres-types",
|
||||
@@ -1939,9 +1940,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parking_lot"
|
||||
version = "0.12.0"
|
||||
version = "0.12.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "87f5ec2493a61ac0506c0f4199f99070cbe83857b0337006a30f3e6719b8ef58"
|
||||
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
"parking_lot_core 0.9.2",
|
||||
@@ -2308,7 +2309,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"md5",
|
||||
"metrics",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"pin-project-lite",
|
||||
"rand",
|
||||
"rcgen",
|
||||
@@ -3356,7 +3357,7 @@ dependencies = [
|
||||
"fallible-iterator",
|
||||
"futures",
|
||||
"log",
|
||||
"parking_lot 0.12.0",
|
||||
"parking_lot 0.12.1",
|
||||
"percent-encoding",
|
||||
"phf",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -63,6 +63,8 @@ workspace_hack = { version = "0.1", path = "../workspace_hack" }
|
||||
close_fds = "0.3.2"
|
||||
walkdir = "2.3.2"
|
||||
|
||||
parking_lot = "0.12.1"
|
||||
|
||||
[dev-dependencies]
|
||||
hex-literal = "0.3"
|
||||
tempfile = "3.2"
|
||||
|
||||
@@ -16,7 +16,6 @@ use crate::reltag::{RelTag, SlruKind};
|
||||
use crate::repository::Repository;
|
||||
use crate::repository::Timeline;
|
||||
use crate::walingest::WalIngest;
|
||||
use crate::walrecord::DecodedWALRecord;
|
||||
use postgres_ffi::relfile_utils::*;
|
||||
use postgres_ffi::waldecoder::*;
|
||||
use postgres_ffi::xlog_utils::*;
|
||||
@@ -39,7 +38,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
|
||||
|
||||
// TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
|
||||
// Then fishing out pg_control would be unnecessary
|
||||
let mut modification = tline.begin_modification();
|
||||
let mut modification = tline.begin_modification(lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
// Import all but pg_wal
|
||||
@@ -62,7 +61,7 @@ pub fn import_timeline_from_postgres_datadir<R: Repository>(
|
||||
}
|
||||
|
||||
// We're done importing all the data files.
|
||||
modification.commit(lsn)?;
|
||||
modification.commit()?;
|
||||
|
||||
// We expect the Postgres server to be shut down cleanly.
|
||||
let pg_control = pg_control.context("pg_control file not found")?;
|
||||
@@ -268,11 +267,9 @@ fn import_wal<R: Repository>(
|
||||
waldecoder.feed_bytes(&buf);
|
||||
|
||||
let mut nrecords = 0;
|
||||
let mut modification = tline.begin_modification();
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
while last_lsn <= endpoint {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
|
||||
walingest.ingest_record(tline, recdata, lsn)?;
|
||||
last_lsn = lsn;
|
||||
|
||||
nrecords += 1;
|
||||
@@ -302,7 +299,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
|
||||
base_lsn: Lsn,
|
||||
) -> Result<()> {
|
||||
info!("importing base at {}", base_lsn);
|
||||
let mut modification = tline.begin_modification();
|
||||
let mut modification = tline.begin_modification(base_lsn);
|
||||
modification.init_empty()?;
|
||||
|
||||
let mut pg_control: Option<ControlFileData> = None;
|
||||
@@ -333,7 +330,7 @@ pub fn import_basebackup_from_tar<R: Repository, Reader: Read>(
|
||||
// sanity check: ensure that pg_control is loaded
|
||||
let _pg_control = pg_control.context("pg_control file not found")?;
|
||||
|
||||
modification.commit(base_lsn)?;
|
||||
modification.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -385,11 +382,9 @@ pub fn import_wal_from_tar<R: Repository, Reader: Read>(
|
||||
|
||||
waldecoder.feed_bytes(&bytes[offset..]);
|
||||
|
||||
let mut modification = tline.begin_modification();
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
while last_lsn <= end_lsn {
|
||||
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
|
||||
walingest.ingest_record(tline, recdata, lsn)?;
|
||||
last_lsn = lsn;
|
||||
|
||||
debug!("imported record at {} (end {})", lsn, end_lsn);
|
||||
|
||||
@@ -660,21 +660,11 @@ impl DeltaLayerWriter {
|
||||
/// The values must be appended in key, lsn order.
|
||||
///
|
||||
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
|
||||
self.put_value_bytes(key, lsn, &Value::ser(&val)?, val.will_init())
|
||||
}
|
||||
|
||||
pub fn put_value_bytes(
|
||||
&mut self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
val: &[u8],
|
||||
will_init: bool,
|
||||
) -> Result<()> {
|
||||
assert!(self.lsn_range.start <= lsn);
|
||||
|
||||
let off = self.blob_writer.write_blob(val)?;
|
||||
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
|
||||
|
||||
let blob_ref = BlobRef::new(off, will_init);
|
||||
let blob_ref = BlobRef::new(off, val.will_init());
|
||||
|
||||
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
|
||||
self.tree.append(&delta_key.0, blob_ref.0)?;
|
||||
|
||||
@@ -28,7 +28,7 @@ use utils::{
|
||||
use std::fmt::Write as _;
|
||||
use std::ops::Range;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Mutex, RwLock};
|
||||
use std::sync::RwLock;
|
||||
|
||||
pub struct InMemoryLayer {
|
||||
conf: &'static PageServerConf,
|
||||
@@ -41,10 +41,6 @@ pub struct InMemoryLayer {
|
||||
///
|
||||
start_lsn: Lsn,
|
||||
|
||||
/// A buffer for serializing object during [`InMemoryLayer::put_value`].
|
||||
/// This buffer is reused for each serialization to avoid additional malloc calls.
|
||||
ser_buffer: Mutex<Vec<u8>>,
|
||||
|
||||
/// The above fields never change. The parts that do change are in 'inner',
|
||||
/// and protected by mutex.
|
||||
inner: RwLock<InMemoryLayerInner>,
|
||||
@@ -259,7 +255,6 @@ impl InMemoryLayer {
|
||||
timelineid,
|
||||
tenantid,
|
||||
start_lsn,
|
||||
ser_buffer: Mutex::new(Vec::new()),
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
end_lsn: None,
|
||||
index: HashMap::new(),
|
||||
@@ -275,15 +270,10 @@ impl InMemoryLayer {
|
||||
pub fn put_value(&self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
|
||||
trace!("put_value key {} at {}/{}", key, self.timelineid, lsn);
|
||||
let mut inner = self.inner.write().unwrap();
|
||||
|
||||
inner.assert_writeable();
|
||||
|
||||
let off = {
|
||||
let mut buf = self.ser_buffer.lock().unwrap();
|
||||
val.ser_into(&mut (*buf))?;
|
||||
let off = inner.file.write_blob(&buf)?;
|
||||
buf.clear();
|
||||
off
|
||||
};
|
||||
let off = inner.file.write_blob(&Value::ser(&val)?)?;
|
||||
|
||||
let vec_map = inner.index.entry(key).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
|
||||
@@ -352,8 +342,8 @@ impl InMemoryLayer {
|
||||
// Write all page versions
|
||||
for (lsn, pos) in vec_map.as_slice() {
|
||||
cursor.read_blob_into_buf(*pos, &mut buf)?;
|
||||
let will_init = Value::des(&buf)?.will_init();
|
||||
delta_layer_writer.put_value_bytes(key, *lsn, &buf, will_init)?;
|
||||
let val = Value::des(&buf)?;
|
||||
delta_layer_writer.put_value(key, *lsn, val)?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -36,13 +36,12 @@
|
||||
//! mapping is automatically removed and the slot is marked free.
|
||||
//!
|
||||
|
||||
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::{
|
||||
atomic::{AtomicU8, AtomicUsize, Ordering},
|
||||
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
|
||||
},
|
||||
sync::atomic::{AtomicU8, AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use once_cell::sync::OnceCell;
|
||||
@@ -385,7 +384,7 @@ impl PageCache {
|
||||
for slot_idx in 0..self.slots.len() {
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
let mut inner = slot.inner.write().unwrap();
|
||||
let mut inner = slot.inner.write();
|
||||
if let Some(key) = &inner.key {
|
||||
match key {
|
||||
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
|
||||
@@ -413,7 +412,7 @@ impl PageCache {
|
||||
for slot_idx in 0..self.slots.len() {
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
let mut inner = slot.inner.write().unwrap();
|
||||
let mut inner = slot.inner.write();
|
||||
if let Some(key) = &inner.key {
|
||||
match key {
|
||||
CacheKey::ImmutableFilePage { file_id, blkno: _ }
|
||||
@@ -454,7 +453,7 @@ impl PageCache {
|
||||
// that it's still what we expected (because we released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.read().unwrap();
|
||||
let inner = slot.inner.read();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard(inner));
|
||||
@@ -543,7 +542,7 @@ impl PageCache {
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().unwrap();
|
||||
let inner = slot.inner.write();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageWriteGuard { inner, valid: true });
|
||||
@@ -611,7 +610,7 @@ impl PageCache {
|
||||
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let map = self.materialized_page_map.read();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
@@ -624,11 +623,11 @@ impl PageCache {
|
||||
Some(version.slot_idx)
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let map = self.ephemeral_page_map.read().unwrap();
|
||||
let map = self.ephemeral_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
let map = self.immutable_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
@@ -641,7 +640,7 @@ impl PageCache {
|
||||
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let map = self.materialized_page_map.read();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
@@ -651,11 +650,11 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let map = self.ephemeral_page_map.read().unwrap();
|
||||
let map = self.ephemeral_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
let map = self.immutable_page_map.read();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
@@ -670,7 +669,7 @@ impl PageCache {
|
||||
hash_key: old_hash_key,
|
||||
lsn: old_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let mut map = self.materialized_page_map.write();
|
||||
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
|
||||
let versions = old_entry.get_mut();
|
||||
|
||||
@@ -685,12 +684,12 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
let mut map = self.ephemeral_page_map.write();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
let mut map = self.immutable_page_map.write();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
}
|
||||
@@ -708,7 +707,7 @@ impl PageCache {
|
||||
hash_key: new_key,
|
||||
lsn: new_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let mut map = self.materialized_page_map.write();
|
||||
let versions = map.entry(new_key.clone()).or_default();
|
||||
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => Some(versions[version_idx].slot_idx),
|
||||
@@ -725,7 +724,7 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
let mut map = self.ephemeral_page_map.write();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
@@ -735,7 +734,7 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
let mut map = self.immutable_page_map.write();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
@@ -765,11 +764,8 @@ impl PageCache {
|
||||
|
||||
if slot.dec_usage_count() == 0 {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(TryLockError::Poisoned(err)) => {
|
||||
panic!("buffer lock was poisoned: {:?}", err)
|
||||
}
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
Some(inner) => inner,
|
||||
None => {
|
||||
// If we have looped through the whole buffer pool 10 times
|
||||
// and still haven't found a victim buffer, something's wrong.
|
||||
// Maybe all the buffers were in locked. That could happen in
|
||||
|
||||
@@ -79,25 +79,23 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
/// the timeline.
|
||||
///
|
||||
/// This provides a transaction-like interface to perform a bunch
|
||||
/// of modifications atomically.
|
||||
/// of modifications atomically, all stamped with one LSN.
|
||||
///
|
||||
/// To ingest a WAL record, call begin_modification() to get a
|
||||
/// To ingest a WAL record, call begin_modification(lsn) to get a
|
||||
/// DatadirModification object. Use the functions in the object to
|
||||
/// modify the repository state, updating all the pages and metadata
|
||||
/// that the WAL record affects. When you're done, call commit(lsn) to
|
||||
/// commit the changes. All the changes will be stamped with the specified LSN.
|
||||
///
|
||||
/// Calling commit(lsn) will flush all the changes and reset the state,
|
||||
/// so the `DatadirModification` struct can be reused to perform the next modification.
|
||||
/// that the WAL record affects. When you're done, call commit() to
|
||||
/// commit the changes.
|
||||
///
|
||||
/// Note that any pending modifications you make through the
|
||||
/// modification object won't be visible to calls to the 'get' and list
|
||||
/// functions of the timeline until you finish! And if you update the
|
||||
/// same page twice, the last update wins.
|
||||
///
|
||||
pub fn begin_modification(&self) -> DatadirModification<R> {
|
||||
pub fn begin_modification(&self, lsn: Lsn) -> DatadirModification<R> {
|
||||
DatadirModification {
|
||||
tline: self,
|
||||
lsn,
|
||||
pending_updates: HashMap::new(),
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
@@ -534,6 +532,8 @@ pub struct DatadirModification<'a, R: Repository> {
|
||||
/// in the state in 'tline' yet.
|
||||
pub tline: &'a DatadirTimeline<R>,
|
||||
|
||||
lsn: Lsn,
|
||||
|
||||
// The modifications are not applied directly to the underlying key-value store.
|
||||
// The put-functions add the modifications here, and they are flushed to the
|
||||
// underlying key-value store by the 'finish' function.
|
||||
@@ -904,22 +904,20 @@ impl<'a, R: Repository> DatadirModification<'a, R> {
|
||||
///
|
||||
/// Finish this atomic update, writing all the updated keys to the
|
||||
/// underlying timeline.
|
||||
/// All the modifications in this atomic update are stamped by the specified LSN.
|
||||
///
|
||||
pub fn commit(&mut self, lsn: Lsn) -> Result<()> {
|
||||
pub fn commit(self) -> Result<()> {
|
||||
let writer = self.tline.tline.writer();
|
||||
|
||||
let pending_nblocks = self.pending_nblocks;
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
writer.put(key, lsn, value)?;
|
||||
for (key, value) in self.pending_updates {
|
||||
writer.put(key, self.lsn, value)?;
|
||||
}
|
||||
for key_range in self.pending_deletions.drain(..) {
|
||||
writer.delete(key_range, lsn)?;
|
||||
for key_range in self.pending_deletions {
|
||||
writer.delete(key_range.clone(), self.lsn)?;
|
||||
}
|
||||
|
||||
writer.finish_write(lsn);
|
||||
writer.finish_write(self.lsn);
|
||||
|
||||
if pending_nblocks != 0 {
|
||||
self.tline.current_logical_size.fetch_add(
|
||||
@@ -1347,9 +1345,9 @@ pub fn create_test_timeline<R: Repository>(
|
||||
) -> Result<Arc<crate::DatadirTimeline<R>>> {
|
||||
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;
|
||||
let tline = DatadirTimeline::new(tline, 256 * 1024);
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit(Lsn(8))?;
|
||||
m.commit()?;
|
||||
Ok(Arc::new(tline))
|
||||
}
|
||||
|
||||
|
||||
@@ -37,7 +37,7 @@ pub mod defaults {
|
||||
pub const DEFAULT_PITR_INTERVAL: &str = "30 days";
|
||||
pub const DEFAULT_WALRECEIVER_CONNECT_TIMEOUT: &str = "2 seconds";
|
||||
pub const DEFAULT_WALRECEIVER_LAGGING_WAL_TIMEOUT: &str = "10 seconds";
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10 * 1024 * 1024;
|
||||
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 10_000;
|
||||
}
|
||||
|
||||
/// Per-tenant configuration options
|
||||
|
||||
@@ -78,13 +78,13 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
///
|
||||
pub fn ingest_record(
|
||||
&mut self,
|
||||
timeline: &DatadirTimeline<R>,
|
||||
recdata: Bytes,
|
||||
lsn: Lsn,
|
||||
modification: &mut DatadirModification<R>,
|
||||
decoded: &mut DecodedWALRecord,
|
||||
) -> Result<()> {
|
||||
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
|
||||
let mut modification = timeline.begin_modification(lsn);
|
||||
|
||||
let mut decoded = decode_wal_record(recdata).context("failed decoding wal record")?;
|
||||
let mut buf = decoded.record.clone();
|
||||
buf.advance(decoded.main_data_offset);
|
||||
|
||||
@@ -98,7 +98,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
if decoded.xl_rmid == pg_constants::RM_HEAP_ID
|
||||
|| decoded.xl_rmid == pg_constants::RM_HEAP2_ID
|
||||
{
|
||||
self.ingest_heapam_record(&mut buf, modification, decoded)?;
|
||||
self.ingest_heapam_record(&mut buf, &mut modification, &mut decoded)?;
|
||||
}
|
||||
// Handle other special record types
|
||||
if decoded.xl_rmid == pg_constants::RM_SMGR_ID
|
||||
@@ -106,19 +106,19 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
== pg_constants::XLOG_SMGR_CREATE
|
||||
{
|
||||
let create = XlSmgrCreate::decode(&mut buf);
|
||||
self.ingest_xlog_smgr_create(modification, &create)?;
|
||||
self.ingest_xlog_smgr_create(&mut modification, &create)?;
|
||||
} else if decoded.xl_rmid == pg_constants::RM_SMGR_ID
|
||||
&& (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||
== pg_constants::XLOG_SMGR_TRUNCATE
|
||||
{
|
||||
let truncate = XlSmgrTruncate::decode(&mut buf);
|
||||
self.ingest_xlog_smgr_truncate(modification, &truncate)?;
|
||||
self.ingest_xlog_smgr_truncate(&mut modification, &truncate)?;
|
||||
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
|
||||
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||
== pg_constants::XLOG_DBASE_CREATE
|
||||
{
|
||||
let createdb = XlCreateDatabase::decode(&mut buf);
|
||||
self.ingest_xlog_dbase_create(modification, &createdb)?;
|
||||
self.ingest_xlog_dbase_create(&mut modification, &createdb)?;
|
||||
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
|
||||
== pg_constants::XLOG_DBASE_DROP
|
||||
{
|
||||
@@ -137,7 +137,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
self.put_slru_page_image(
|
||||
modification,
|
||||
&mut modification,
|
||||
SlruKind::Clog,
|
||||
segno,
|
||||
rpageno,
|
||||
@@ -146,7 +146,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
} else {
|
||||
assert!(info == pg_constants::CLOG_TRUNCATE);
|
||||
let xlrec = XlClogTruncate::decode(&mut buf);
|
||||
self.ingest_clog_truncate_record(modification, &xlrec)?;
|
||||
self.ingest_clog_truncate_record(&mut modification, &xlrec)?;
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_XACT_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
@@ -154,7 +154,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let parsed_xact =
|
||||
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
|
||||
self.ingest_xact_record(
|
||||
modification,
|
||||
&mut modification,
|
||||
&parsed_xact,
|
||||
info == pg_constants::XLOG_XACT_COMMIT,
|
||||
)?;
|
||||
@@ -164,7 +164,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let parsed_xact =
|
||||
XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info);
|
||||
self.ingest_xact_record(
|
||||
modification,
|
||||
&mut modification,
|
||||
&parsed_xact,
|
||||
info == pg_constants::XLOG_XACT_COMMIT_PREPARED,
|
||||
)?;
|
||||
@@ -187,7 +187,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
self.put_slru_page_image(
|
||||
modification,
|
||||
&mut modification,
|
||||
SlruKind::MultiXactOffsets,
|
||||
segno,
|
||||
rpageno,
|
||||
@@ -198,7 +198,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
self.put_slru_page_image(
|
||||
modification,
|
||||
&mut modification,
|
||||
SlruKind::MultiXactMembers,
|
||||
segno,
|
||||
rpageno,
|
||||
@@ -206,14 +206,14 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
)?;
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
|
||||
let xlrec = XlMultiXactCreate::decode(&mut buf);
|
||||
self.ingest_multixact_create_record(modification, &xlrec)?;
|
||||
self.ingest_multixact_create_record(&mut modification, &xlrec)?;
|
||||
} else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
|
||||
let xlrec = XlMultiXactTruncate::decode(&mut buf);
|
||||
self.ingest_multixact_truncate_record(modification, &xlrec)?;
|
||||
self.ingest_multixact_truncate_record(&mut modification, &xlrec)?;
|
||||
}
|
||||
} else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID {
|
||||
let xlrec = XlRelmapUpdate::decode(&mut buf);
|
||||
self.ingest_relmap_page(modification, &xlrec, decoded)?;
|
||||
self.ingest_relmap_page(&mut modification, &xlrec, &decoded)?;
|
||||
} else if decoded.xl_rmid == pg_constants::RM_XLOG_ID {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_NEXTOID {
|
||||
@@ -248,7 +248,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
// Iterate through all the blocks that the record modifies, and
|
||||
// "put" a separate copy of the record for each block.
|
||||
for blk in decoded.blocks.iter() {
|
||||
self.ingest_decoded_block(modification, lsn, decoded, blk)?;
|
||||
self.ingest_decoded_block(&mut modification, lsn, &decoded, blk)?;
|
||||
}
|
||||
|
||||
// If checkpoint data was updated, store the new version in the repository
|
||||
@@ -261,7 +261,7 @@ impl<'a, R: Repository> WalIngest<'a, R> {
|
||||
|
||||
// Now that this record has been fully handled, including updating the
|
||||
// checkpoint data, let the repository know that it is up-to-date to this LSN
|
||||
modification.commit(lsn)?;
|
||||
modification.commit()?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1069,10 +1069,10 @@ mod tests {
|
||||
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
|
||||
|
||||
fn init_walingest_test<R: Repository>(tline: &DatadirTimeline<R>) -> Result<WalIngest<R>> {
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x10));
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""))?; // dummy relmapper file
|
||||
m.commit(Lsn(0x10))?;
|
||||
m.commit()?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10))?;
|
||||
|
||||
Ok(walingest)
|
||||
@@ -1084,19 +1084,19 @@ mod tests {
|
||||
let tline = create_test_timeline(repo, TIMELINE_ID)?;
|
||||
let mut walingest = init_walingest_test(&tline)?;
|
||||
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
walingest.put_rel_creation(&mut m, TESTREL_A)?;
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
|
||||
m.commit(Lsn(0x20))?;
|
||||
let mut m = tline.begin_modification();
|
||||
m.commit()?;
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"))?;
|
||||
m.commit(Lsn(0x30))?;
|
||||
let mut m = tline.begin_modification();
|
||||
m.commit()?;
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"))?;
|
||||
m.commit(Lsn(0x40))?;
|
||||
let mut m = tline.begin_modification();
|
||||
m.commit()?;
|
||||
let mut m = tline.begin_modification(Lsn(0x50));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"))?;
|
||||
m.commit(Lsn(0x50))?;
|
||||
m.commit()?;
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(0x50));
|
||||
|
||||
@@ -1142,9 +1142,9 @@ mod tests {
|
||||
);
|
||||
|
||||
// Truncate last block
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x60));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, 2)?;
|
||||
m.commit(Lsn(0x60))?;
|
||||
m.commit()?;
|
||||
assert_current_logical_size(&tline, Lsn(0x60));
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
@@ -1166,15 +1166,15 @@ mod tests {
|
||||
);
|
||||
|
||||
// Truncate to zero length
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x68));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, 0)?;
|
||||
m.commit(Lsn(0x68))?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x68))?, 0);
|
||||
|
||||
// Extend from 0 to 2 blocks, leaving a gap
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x70));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"))?;
|
||||
m.commit(Lsn(0x70))?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x70))?, 2);
|
||||
assert_eq!(
|
||||
tline.get_rel_page_at_lsn(TESTREL_A, 0, Lsn(0x70))?,
|
||||
@@ -1186,9 +1186,9 @@ mod tests {
|
||||
);
|
||||
|
||||
// Extend a lot more, leaving a big gap that spans across segments
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x80));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"))?;
|
||||
m.commit(Lsn(0x80))?;
|
||||
m.commit()?;
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, 1501);
|
||||
for blk in 2..1500 {
|
||||
assert_eq!(
|
||||
@@ -1212,18 +1212,18 @@ mod tests {
|
||||
let tline = create_test_timeline(repo, TIMELINE_ID)?;
|
||||
let mut walingest = init_walingest_test(&tline)?;
|
||||
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"))?;
|
||||
m.commit(Lsn(0x20))?;
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x20))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x20))?, 1);
|
||||
|
||||
// Drop rel
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x30));
|
||||
walingest.put_rel_drop(&mut m, TESTREL_A)?;
|
||||
m.commit(Lsn(0x30))?;
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel is not visible anymore
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x30))?, false);
|
||||
@@ -1232,9 +1232,9 @@ mod tests {
|
||||
//assert!(tline.get_rel_size(TESTREL_A, Lsn(0x30))?.is_none());
|
||||
|
||||
// Re-create it
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x40));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"))?;
|
||||
m.commit(Lsn(0x40))?;
|
||||
m.commit()?;
|
||||
|
||||
// Check that rel exists and size is correct
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x40))?, true);
|
||||
@@ -1254,12 +1254,12 @@ mod tests {
|
||||
|
||||
// Create a 20 MB relation (the size is arbitrary)
|
||||
let relsize = 20 * 1024 * 1024 / 8192;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x20));
|
||||
for blkno in 0..relsize {
|
||||
let data = format!("foo blk {} at {}", blkno, Lsn(0x20));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
|
||||
}
|
||||
m.commit(Lsn(0x20))?;
|
||||
m.commit()?;
|
||||
|
||||
// The relation was created at LSN 20, not visible at LSN 1 yet.
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x10))?, false);
|
||||
@@ -1280,9 +1280,9 @@ mod tests {
|
||||
|
||||
// Truncate relation so that second segment was dropped
|
||||
// - only leave one page
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(0x60));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, 1)?;
|
||||
m.commit(Lsn(0x60))?;
|
||||
m.commit()?;
|
||||
|
||||
// Check reported size and contents after truncation
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x60))?, 1);
|
||||
@@ -1310,12 +1310,12 @@ mod tests {
|
||||
// Extend relation again.
|
||||
// Add enough blocks to create second segment
|
||||
let lsn = Lsn(0x80);
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(lsn);
|
||||
for blkno in 0..relsize {
|
||||
let data = format!("foo blk {} at {}", blkno, lsn);
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data))?;
|
||||
}
|
||||
m.commit(lsn)?;
|
||||
m.commit()?;
|
||||
|
||||
assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(0x80))?, true);
|
||||
assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(0x80))?, relsize);
|
||||
@@ -1343,10 +1343,10 @@ mod tests {
|
||||
let mut lsn = 0x10;
|
||||
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
|
||||
walingest.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img)?;
|
||||
m.commit(Lsn(lsn))?;
|
||||
m.commit()?;
|
||||
}
|
||||
|
||||
assert_current_logical_size(&tline, Lsn(lsn));
|
||||
@@ -1358,9 +1358,9 @@ mod tests {
|
||||
|
||||
// Truncate one block
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE)?;
|
||||
m.commit(Lsn(lsn))?;
|
||||
m.commit()?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
|
||||
pg_constants::RELSEG_SIZE
|
||||
@@ -1369,9 +1369,9 @@ mod tests {
|
||||
|
||||
// Truncate another block
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, pg_constants::RELSEG_SIZE - 1)?;
|
||||
m.commit(Lsn(lsn))?;
|
||||
m.commit()?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
|
||||
pg_constants::RELSEG_SIZE - 1
|
||||
@@ -1383,9 +1383,9 @@ mod tests {
|
||||
let mut size: i32 = 3000;
|
||||
while size >= 0 {
|
||||
lsn += 0x10;
|
||||
let mut m = tline.begin_modification();
|
||||
let mut m = tline.begin_modification(Lsn(lsn));
|
||||
walingest.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber)?;
|
||||
m.commit(Lsn(lsn))?;
|
||||
m.commit()?;
|
||||
assert_eq!(
|
||||
tline.get_rel_size(TESTREL_A, Lsn(lsn))?,
|
||||
size as BlockNumber
|
||||
|
||||
@@ -178,7 +178,7 @@ async fn shutdown_all_wal_connections(
|
||||
/// That may lead to certain events not being observed by the listener.
|
||||
#[derive(Debug)]
|
||||
struct TaskHandle<E> {
|
||||
handle: JoinHandle<Result<(), String>>,
|
||||
handle: JoinHandle<()>,
|
||||
events_receiver: watch::Receiver<TaskEvent<E>>,
|
||||
cancellation: watch::Sender<()>,
|
||||
}
|
||||
@@ -205,8 +205,8 @@ impl<E: Clone> TaskHandle<E> {
|
||||
|
||||
let sender = Arc::clone(&events_sender);
|
||||
let handle = tokio::task::spawn(async move {
|
||||
events_sender.send(TaskEvent::Started).ok();
|
||||
task(sender, cancellation_receiver).await
|
||||
let task_result = task(sender, cancellation_receiver).await;
|
||||
events_sender.send(TaskEvent::End(task_result)).ok();
|
||||
});
|
||||
|
||||
TaskHandle {
|
||||
@@ -216,16 +216,6 @@ impl<E: Clone> TaskHandle<E> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn next_task_event(&mut self) -> TaskEvent<E> {
|
||||
select! {
|
||||
next_task_event = self.events_receiver.changed() => match next_task_event {
|
||||
Ok(()) => self.events_receiver.borrow().clone(),
|
||||
Err(_task_channel_part_dropped) => join_on_handle(&mut self.handle).await,
|
||||
},
|
||||
task_completion_result = join_on_handle(&mut self.handle) => task_completion_result,
|
||||
}
|
||||
}
|
||||
|
||||
/// Aborts current task, waiting for it to finish.
|
||||
async fn shutdown(self) {
|
||||
self.cancellation.send(()).ok();
|
||||
@@ -235,19 +225,6 @@ impl<E: Clone> TaskHandle<E> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn join_on_handle<E>(handle: &mut JoinHandle<Result<(), String>>) -> TaskEvent<E> {
|
||||
match handle.await {
|
||||
Ok(task_result) => TaskEvent::End(task_result),
|
||||
Err(e) => {
|
||||
if e.is_cancelled() {
|
||||
TaskEvent::End(Ok(()))
|
||||
} else {
|
||||
TaskEvent::End(Err(format!("WAL receiver task panicked: {e}")))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A step to process timeline attach/detach events to enable/disable the corresponding WAL receiver machinery.
|
||||
/// In addition to WAL streaming management, the step ensures that corresponding tenant has its service threads enabled or disabled.
|
||||
/// This is done here, since only walreceiver knows when a certain tenant has no streaming enabled.
|
||||
|
||||
@@ -104,29 +104,49 @@ async fn connection_manager_loop_step(
|
||||
|
||||
Some(wal_connection_update) = async {
|
||||
match walreceiver_state.wal_connection.as_mut() {
|
||||
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
|
||||
Some(wal_connection) => {
|
||||
let receiver = &mut wal_connection.connection_task.events_receiver;
|
||||
Some(match receiver.changed().await {
|
||||
Ok(()) => receiver.borrow().clone(),
|
||||
Err(_cancellation_error) => TaskEvent::End(Ok(())),
|
||||
})
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
} => {
|
||||
let wal_connection = walreceiver_state.wal_connection.as_mut().expect("Should have a connection, as checked by the corresponding select! guard");
|
||||
match &wal_connection_update {
|
||||
TaskEvent::Started => {
|
||||
wal_connection.latest_connection_update = Utc::now().naive_utc();
|
||||
*walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0) += 1;
|
||||
},
|
||||
TaskEvent::NewEvent(replication_feedback) => {
|
||||
wal_connection.latest_connection_update = DateTime::<Local>::from(replication_feedback.ps_replytime).naive_utc();
|
||||
// reset connection attempts here only, the only place where both nodes
|
||||
// explicitly confirmn with replication feedback that they are connected to each other
|
||||
walreceiver_state.wal_connection_attempts.remove(&wal_connection.sk_id);
|
||||
},
|
||||
let (connection_update, reset_connection_attempts) = match &wal_connection_update {
|
||||
TaskEvent::Started => (Some(Utc::now().naive_utc()), true),
|
||||
TaskEvent::NewEvent(replication_feedback) => (Some(DateTime::<Local>::from(replication_feedback.ps_replytime).naive_utc()), true),
|
||||
TaskEvent::End(end_result) => {
|
||||
match end_result {
|
||||
Ok(()) => debug!("WAL receiving task finished"),
|
||||
Err(e) => warn!("WAL receiving task failed: {e}"),
|
||||
let should_reset_connection_attempts = match end_result {
|
||||
Ok(()) => {
|
||||
debug!("WAL receiving task finished");
|
||||
true
|
||||
},
|
||||
Err(e) => {
|
||||
warn!("WAL receiving task failed: {e}");
|
||||
false
|
||||
},
|
||||
};
|
||||
walreceiver_state.wal_connection = None;
|
||||
(None, should_reset_connection_attempts)
|
||||
},
|
||||
};
|
||||
|
||||
if let Some(connection_update) = connection_update {
|
||||
match &mut walreceiver_state.wal_connection {
|
||||
Some(wal_connection) => {
|
||||
wal_connection.latest_connection_update = connection_update;
|
||||
|
||||
let attempts_entry = walreceiver_state.wal_connection_attempts.entry(wal_connection.sk_id).or_insert(0);
|
||||
if reset_connection_attempts {
|
||||
*attempts_entry = 0;
|
||||
} else {
|
||||
*attempts_entry += 1;
|
||||
}
|
||||
},
|
||||
None => error!("Received connection update for WAL connection that is not active, update: {wal_connection_update:?}"),
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@@ -386,8 +406,10 @@ impl WalreceiverState {
|
||||
Some(existing_wal_connection) => {
|
||||
let connected_sk_node = existing_wal_connection.sk_id;
|
||||
|
||||
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) =
|
||||
self.select_connection_candidate(Some(connected_sk_node))?;
|
||||
let (new_sk_id, new_safekeeper_etcd_data, new_wal_producer_connstr) = self
|
||||
.applicable_connection_candidates()
|
||||
.filter(|&(sk_id, _, _)| sk_id != connected_sk_node)
|
||||
.max_by_key(|(_, info, _)| info.commit_lsn)?;
|
||||
|
||||
let now = Utc::now().naive_utc();
|
||||
if let Ok(latest_interaciton) =
|
||||
@@ -440,8 +462,9 @@ impl WalreceiverState {
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let (new_sk_id, _, new_wal_producer_connstr) =
|
||||
self.select_connection_candidate(None)?;
|
||||
let (new_sk_id, _, new_wal_producer_connstr) = self
|
||||
.applicable_connection_candidates()
|
||||
.max_by_key(|(_, info, _)| info.commit_lsn)?;
|
||||
return Some(NewWalConnectionCandidate {
|
||||
safekeeper_id: new_sk_id,
|
||||
wal_producer_connstr: new_wal_producer_connstr,
|
||||
@@ -453,49 +476,6 @@ impl WalreceiverState {
|
||||
None
|
||||
}
|
||||
|
||||
/// Selects the best possible candidate, based on the data collected from etcd updates about the safekeepers.
|
||||
/// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
|
||||
///
|
||||
/// The candidate that is chosen:
|
||||
/// * has fewest connection attempts from pageserver to safekeeper node (reset every time the WAL replication feedback is sent)
|
||||
/// * has greatest data Lsn among the ones that are left
|
||||
///
|
||||
/// NOTE:
|
||||
/// We evict timeline data received from etcd based on time passed since it was registered, along with its connection attempts values, but
|
||||
/// otherwise to reset the connection attempts, a successful connection to that node is needed.
|
||||
/// That won't happen now, before all nodes with less connection attempts are connected to first, which might leave the sk node with more advanced state to be ignored.
|
||||
fn select_connection_candidate(
|
||||
&self,
|
||||
node_to_omit: Option<NodeId>,
|
||||
) -> Option<(NodeId, &SkTimelineInfo, String)> {
|
||||
let all_candidates = self
|
||||
.applicable_connection_candidates()
|
||||
.filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let smallest_attempts_allowed = all_candidates
|
||||
.iter()
|
||||
.map(|(sk_id, _, _)| {
|
||||
self.wal_connection_attempts
|
||||
.get(sk_id)
|
||||
.copied()
|
||||
.unwrap_or(0)
|
||||
})
|
||||
.min()?;
|
||||
|
||||
all_candidates
|
||||
.into_iter()
|
||||
.filter(|(sk_id, _, _)| {
|
||||
smallest_attempts_allowed
|
||||
>= self
|
||||
.wal_connection_attempts
|
||||
.get(sk_id)
|
||||
.copied()
|
||||
.unwrap_or(0)
|
||||
})
|
||||
.max_by_key(|(_, info, _)| info.commit_lsn)
|
||||
}
|
||||
|
||||
fn applicable_connection_candidates(
|
||||
&self,
|
||||
) -> impl Iterator<Item = (NodeId, &SkTimelineInfo, String)> {
|
||||
@@ -520,25 +500,15 @@ impl WalreceiverState {
|
||||
}
|
||||
|
||||
fn cleanup_old_candidates(&mut self) {
|
||||
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
|
||||
|
||||
self.wal_stream_candidates.retain(|node_id, etcd_info| {
|
||||
self.wal_stream_candidates.retain(|_, etcd_info| {
|
||||
if let Ok(time_since_latest_etcd_update) =
|
||||
(Utc::now().naive_utc() - etcd_info.latest_update).to_std()
|
||||
{
|
||||
let should_retain = time_since_latest_etcd_update < self.lagging_wal_timeout;
|
||||
if !should_retain {
|
||||
node_ids_to_remove.push(*node_id);
|
||||
}
|
||||
should_retain
|
||||
time_since_latest_etcd_update < self.lagging_wal_timeout
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
for node_id in node_ids_to_remove {
|
||||
self.wal_connection_attempts.remove(&node_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -873,64 +843,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("candidate_with_many_connection_failures")?;
|
||||
let mut state = dummy_state(&harness);
|
||||
let now = Utc::now().naive_utc();
|
||||
|
||||
let current_lsn = Lsn(100_000).align();
|
||||
let bigger_lsn = Lsn(current_lsn.0 + 100).align();
|
||||
|
||||
state.wal_connection = None;
|
||||
state.wal_stream_candidates = HashMap::from([
|
||||
(
|
||||
NodeId(0),
|
||||
EtcdSkTimeline {
|
||||
timeline: SkTimelineInfo {
|
||||
last_log_term: None,
|
||||
flush_lsn: None,
|
||||
commit_lsn: Some(bigger_lsn),
|
||||
backup_lsn: None,
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
},
|
||||
etcd_version: 0,
|
||||
latest_update: now,
|
||||
},
|
||||
),
|
||||
(
|
||||
NodeId(1),
|
||||
EtcdSkTimeline {
|
||||
timeline: SkTimelineInfo {
|
||||
last_log_term: None,
|
||||
flush_lsn: None,
|
||||
commit_lsn: Some(current_lsn),
|
||||
backup_lsn: None,
|
||||
remote_consistent_lsn: None,
|
||||
peer_horizon_lsn: None,
|
||||
safekeeper_connstr: Some(DUMMY_SAFEKEEPER_CONNSTR.to_string()),
|
||||
},
|
||||
etcd_version: 0,
|
||||
latest_update: now,
|
||||
},
|
||||
),
|
||||
]);
|
||||
state.wal_connection_attempts = HashMap::from([(NodeId(0), 1), (NodeId(1), 0)]);
|
||||
|
||||
let candidate_with_less_errors = state
|
||||
.next_connection_candidate()
|
||||
.expect("Expected one candidate selected, but got none");
|
||||
assert_eq!(
|
||||
candidate_with_less_errors.safekeeper_id,
|
||||
NodeId(1),
|
||||
"Should select the node with less connection errors"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connection_no_etcd_data_candidate() -> anyhow::Result<()> {
|
||||
let harness = RepoHarness::create("connection_no_etcd_data_candidate")?;
|
||||
|
||||
@@ -23,7 +23,6 @@ use crate::{
|
||||
repository::{Repository, Timeline},
|
||||
tenant_mgr,
|
||||
walingest::WalIngest,
|
||||
walrecord::DecodedWALRecord,
|
||||
};
|
||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback, zid::ZTenantTimelineId};
|
||||
@@ -151,25 +150,19 @@ pub async fn handle_walreceiver_connection(
|
||||
|
||||
waldecoder.feed_bytes(data);
|
||||
|
||||
{
|
||||
let mut decoded = DecodedWALRecord::default();
|
||||
let mut modification = timeline.begin_modification();
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
// let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
|
||||
let _enter = info_span!("processing record", lsn = %lsn).entered();
|
||||
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hitting a deadlock.
|
||||
ensure!(lsn.is_aligned());
|
||||
// It is important to deal with the aligned records as lsn in getPage@LSN is
|
||||
// aligned and can be several bytes bigger. Without this alignment we are
|
||||
// at risk of hitting a deadlock.
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
walingest
|
||||
.ingest_record(recdata, lsn, &mut modification, &mut decoded)
|
||||
.context("could not ingest record at {lsn}")?;
|
||||
walingest.ingest_record(&timeline, recdata, lsn)?;
|
||||
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
fail_point!("walreceiver-after-ingest");
|
||||
|
||||
last_rec_lsn = lsn;
|
||||
}
|
||||
last_rec_lsn = lsn;
|
||||
}
|
||||
|
||||
if !caught_up && endlsn >= end_of_wal {
|
||||
|
||||
@@ -96,7 +96,6 @@ impl DecodedBkpBlock {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct DecodedWALRecord {
|
||||
pub xl_xid: TransactionId,
|
||||
pub xl_info: u8,
|
||||
@@ -506,10 +505,7 @@ impl XlMultiXactTruncate {
|
||||
// block data
|
||||
// ...
|
||||
// main data
|
||||
pub fn decode_wal_record(
|
||||
record: Bytes,
|
||||
decoded: &mut DecodedWALRecord,
|
||||
) -> Result<(), DeserializeError> {
|
||||
pub fn decode_wal_record(record: Bytes) -> Result<DecodedWALRecord, DeserializeError> {
|
||||
let mut rnode_spcnode: u32 = 0;
|
||||
let mut rnode_dbnode: u32 = 0;
|
||||
let mut rnode_relnode: u32 = 0;
|
||||
@@ -538,9 +534,7 @@ pub fn decode_wal_record(
|
||||
let mut blocks_total_len: u32 = 0;
|
||||
let mut main_data_len = 0;
|
||||
let mut datatotal: u32 = 0;
|
||||
if !decoded.blocks.is_empty() {
|
||||
decoded.blocks.clear();
|
||||
}
|
||||
let mut blocks: Vec<DecodedBkpBlock> = Vec::new();
|
||||
|
||||
// 2. Decode the headers.
|
||||
// XLogRecordBlockHeaders if any,
|
||||
@@ -719,7 +713,7 @@ pub fn decode_wal_record(
|
||||
blk.blkno
|
||||
);
|
||||
|
||||
decoded.blocks.push(blk);
|
||||
blocks.push(blk);
|
||||
}
|
||||
|
||||
_ => {
|
||||
@@ -730,7 +724,7 @@ pub fn decode_wal_record(
|
||||
|
||||
// 3. Decode blocks.
|
||||
let mut ptr = record.len() - buf.remaining();
|
||||
for blk in decoded.blocks.iter_mut() {
|
||||
for blk in blocks.iter_mut() {
|
||||
if blk.has_image {
|
||||
blk.bimg_offset = ptr as u32;
|
||||
ptr += blk.bimg_len as usize;
|
||||
@@ -750,13 +744,14 @@ pub fn decode_wal_record(
|
||||
assert_eq!(buf.remaining(), main_data_len as usize);
|
||||
}
|
||||
|
||||
decoded.xl_xid = xlogrec.xl_xid;
|
||||
decoded.xl_info = xlogrec.xl_info;
|
||||
decoded.xl_rmid = xlogrec.xl_rmid;
|
||||
decoded.record = record;
|
||||
decoded.main_data_offset = main_data_offset;
|
||||
|
||||
Ok(())
|
||||
Ok(DecodedWALRecord {
|
||||
xl_xid: xlogrec.xl_xid,
|
||||
xl_info: xlogrec.xl_info,
|
||||
xl_rmid: xlogrec.xl_rmid,
|
||||
record,
|
||||
blocks,
|
||||
main_data_offset,
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
@@ -101,34 +99,3 @@ def test_compare_pg_stats_wal_with_pgbench_default(neon_with_baseline: PgCompare
|
||||
env.pg_bin.run_capture(
|
||||
['pgbench', f'-T{duration}', f'--random-seed={seed}', '-Mprepared', env.pg.connstr()])
|
||||
env.flush()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("n_tables", [1, 10])
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix(10))
|
||||
def test_compare_pg_stats_wo_with_heavy_write(neon_with_baseline: PgCompare,
|
||||
n_tables: int,
|
||||
duration: int,
|
||||
pg_stats_wo: List[PgStatTable]):
|
||||
env = neon_with_baseline
|
||||
with env.pg.connect().cursor() as cur:
|
||||
for i in range(n_tables):
|
||||
cur.execute(
|
||||
f"CREATE TABLE t{i}(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
|
||||
)
|
||||
|
||||
def start_single_table_workload(table_id: int):
|
||||
start = time.time()
|
||||
with env.pg.connect().cursor() as cur:
|
||||
while time.time() - start < duration:
|
||||
cur.execute(f"INSERT INTO t{table_id} SELECT FROM generate_series(1,1000)")
|
||||
|
||||
with env.record_pg_stats(pg_stats_wo):
|
||||
threads = [
|
||||
threading.Thread(target=start_single_table_workload, args=(i, ))
|
||||
for i in range(n_tables)
|
||||
]
|
||||
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
Reference in New Issue
Block a user