mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-22 21:02:56 +00:00
Compare commits
15 Commits
relsize_ca
...
netstat-lo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2d609791c9 | ||
|
|
94003e1ebc | ||
|
|
19ea486cde | ||
|
|
95c40334b8 | ||
|
|
a68d5a0173 | ||
|
|
c690522870 | ||
|
|
eaa550afcc | ||
|
|
a490f64a68 | ||
|
|
fe65d1df74 | ||
|
|
c68336a246 | ||
|
|
0886aced86 | ||
|
|
a342957aee | ||
|
|
79f5685d00 | ||
|
|
c004a6d62f | ||
|
|
1b6a80a38f |
13
.cargo/config.toml
Normal file
13
.cargo/config.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
|
||||
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
|
||||
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
|
||||
#
|
||||
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
|
||||
#
|
||||
[profile.dev.package."*"]
|
||||
# Set the default for dependencies in Development mode.
|
||||
opt-level = 3
|
||||
|
||||
[profile.dev]
|
||||
# Turn on a small amount of optimization in Development mode.
|
||||
opt-level = 1
|
||||
@@ -280,6 +280,9 @@ jobs:
|
||||
|
||||
export GITHUB_SHA=$CIRCLE_SHA1
|
||||
|
||||
./netstat-script.sh &
|
||||
NS_PID=$!
|
||||
|
||||
# Run the tests.
|
||||
#
|
||||
# The junit.xml file allows CircleCI to display more fine-grained test information
|
||||
@@ -297,6 +300,9 @@ jobs:
|
||||
-m "not remote_cluster" \
|
||||
-rA $TEST_SELECTION $EXTRA_PARAMS
|
||||
|
||||
kill $NS_PID
|
||||
awk '/===/ {if (count) print count; print; count=0; next} {count++} END {print count}' $TEST_OUTPUT/netstat.stdout > $TEST_OUTPUT/netstat_stats.stdout
|
||||
|
||||
if << parameters.save_perf_report >>; then
|
||||
if [[ $CIRCLE_BRANCH == "main" ]]; then
|
||||
export REPORT_FROM="$PERF_REPORT_DIR"
|
||||
|
||||
15
.github/actions/run-python-test-set/action.yml
vendored
15
.github/actions/run-python-test-set/action.yml
vendored
@@ -37,6 +37,12 @@ runs:
|
||||
name: neon-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-artifact
|
||||
path: ./neon-artifact/
|
||||
|
||||
- name: Get Postgres artifact for restoration
|
||||
uses: actions/download-artifact@v3
|
||||
with:
|
||||
name: postgres-${{ runner.os }}-${{ inputs.build_type }}-artifact
|
||||
path: ./pg-artifact/
|
||||
|
||||
- name: Extract Neon artifact
|
||||
shell: bash -ex {0}
|
||||
run: |
|
||||
@@ -44,6 +50,13 @@ runs:
|
||||
tar -xf ./neon-artifact/neon.tgz -C /tmp/neon/
|
||||
rm -rf ./neon-artifact/
|
||||
|
||||
- name: Extract Postgres artifact
|
||||
shell: bash -ex {0}
|
||||
run: |
|
||||
mkdir -p /tmp/neon/tmp_install
|
||||
tar -xf ./pg-artifact/pg.tgz -C /tmp/neon/tmp_install
|
||||
rm -rf ./pg-artifact/
|
||||
|
||||
- name: Checkout
|
||||
if: inputs.needs_postgres_source == 'true'
|
||||
uses: actions/checkout@v3
|
||||
@@ -65,7 +78,7 @@ runs:
|
||||
- name: Run pytest
|
||||
env:
|
||||
NEON_BIN: /tmp/neon/bin
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
|
||||
POSTGRES_DISTRIB_DIR: /tmp/neon/tmp_install
|
||||
TEST_OUTPUT: /tmp/test_output
|
||||
# this variable will be embedded in perf test report
|
||||
# and is needed to distinguish different environments
|
||||
|
||||
9
.github/workflows/benchmarking.yml
vendored
9
.github/workflows/benchmarking.yml
vendored
@@ -104,3 +104,12 @@ jobs:
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
run: |
|
||||
REPORT_FROM=$(realpath perf-report-staging) REPORT_TO=staging scripts/generate_and_push_perf_report.sh
|
||||
|
||||
- name: Post to a Slack channel
|
||||
if: ${{ github.event.schedule && failure() }}
|
||||
uses: slackapi/slack-github-action@v1
|
||||
with:
|
||||
channel-id: "C033QLM5P7D" # dev-staging-stream
|
||||
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
env:
|
||||
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
|
||||
|
||||
22
.github/workflows/build_and_test.yml
vendored
22
.github/workflows/build_and_test.yml
vendored
@@ -1,9 +1,10 @@
|
||||
name: Test
|
||||
name: Test and Deploy
|
||||
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- main
|
||||
- release
|
||||
pull_request:
|
||||
|
||||
defaults:
|
||||
@@ -11,8 +12,9 @@ defaults:
|
||||
shell: bash -ex {0}
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -93,12 +95,17 @@ jobs:
|
||||
tar -xf ./postgres-artifact/pg.tgz -C ./tmp_install/
|
||||
rm -rf ./postgres-artifact/
|
||||
|
||||
# Don't include the ~/.cargo/registry/src directory. It contains just
|
||||
# uncompressed versions of the crates in ~/.cargo/registry/cache
|
||||
# directory, and it's faster to let 'cargo' to rebuild it from the
|
||||
# compressed crates.
|
||||
- name: Cache cargo deps
|
||||
id: cache_cargo
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry/
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git/
|
||||
target/
|
||||
# Fall back to older versions of the key, if no cache for current Cargo.lock was found
|
||||
@@ -170,14 +177,14 @@ jobs:
|
||||
for bin in $test_exe_paths; do
|
||||
SRC=$bin
|
||||
DST=/tmp/neon/test_bin/$(basename $bin)
|
||||
cp "$SRC" "$DST"
|
||||
|
||||
# We don't need debug symbols for code coverage, so strip them out to make
|
||||
# the artifact smaller.
|
||||
strip "$SRC" -o "$DST"
|
||||
echo "$DST" >> /tmp/coverage/binaries.list
|
||||
done
|
||||
fi
|
||||
|
||||
- name: Install postgres binaries
|
||||
run: cp -a tmp_install /tmp/neon/pg_install
|
||||
|
||||
- name: Prepare neon artifact
|
||||
run: tar -C /tmp/neon/ -czf ./neon.tgz .
|
||||
|
||||
@@ -298,6 +305,7 @@ jobs:
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry/
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git/
|
||||
target/
|
||||
key: v2-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
|
||||
|
||||
6
.github/workflows/codestyle.yml
vendored
6
.github/workflows/codestyle.yml
vendored
@@ -11,8 +11,9 @@ defaults:
|
||||
shell: bash -ex {0}
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
|
||||
env:
|
||||
RUST_BACKTRACE: 1
|
||||
@@ -97,6 +98,7 @@ jobs:
|
||||
with:
|
||||
path: |
|
||||
~/.cargo/registry
|
||||
!~/.cargo/registry/src
|
||||
~/.cargo/git
|
||||
target
|
||||
key: ${{ runner.os }}-cargo-${{ hashFiles('./Cargo.lock') }}-rust-${{ matrix.rust_toolchain }}
|
||||
|
||||
5
.github/workflows/pg_clients.yml
vendored
5
.github/workflows/pg_clients.yml
vendored
@@ -13,8 +13,9 @@ on:
|
||||
workflow_dispatch:
|
||||
|
||||
concurrency:
|
||||
group: ${{ github.workflow }}-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
# Allow only one workflow per any non-`main` branch.
|
||||
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/main' && github.sha || 'anysha' }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
test-postgres-client-libs:
|
||||
|
||||
776
Cargo.lock
generated
776
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,8 +1,7 @@
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use anyhow::Result;
|
||||
use log::{info, log_enabled, warn, Level};
|
||||
use postgres::error::SqlState;
|
||||
use postgres::{Client, NoTls};
|
||||
use serde::Deserialize;
|
||||
|
||||
@@ -395,20 +394,34 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
|
||||
|
||||
// This will only change ownership on the schema itself, not the objects
|
||||
// inside it. Without it owner of the `public` schema will be `cloud_admin`
|
||||
// and database owner cannot do anything with it.
|
||||
let alter_query = format!("ALTER SCHEMA public OWNER TO {}", db.owner.quote());
|
||||
let res = db_client.simple_query(&alter_query);
|
||||
|
||||
if let Err(e) = res {
|
||||
if e.code() == Some(&SqlState::INVALID_SCHEMA_NAME) {
|
||||
// This is OK, db just don't have a `public` schema.
|
||||
// Probably user dropped it manually.
|
||||
info!("no 'public' schema found in the database {}", db.name);
|
||||
} else {
|
||||
// Something different happened, propagate the error
|
||||
return Err(anyhow!(e));
|
||||
}
|
||||
}
|
||||
// and database owner cannot do anything with it. SQL procedure ensures
|
||||
// that it won't error out if schema `public` doesn't exist.
|
||||
let alter_query = format!(
|
||||
"DO $$\n\
|
||||
DECLARE\n\
|
||||
schema_owner TEXT;\n\
|
||||
BEGIN\n\
|
||||
IF EXISTS(\n\
|
||||
SELECT nspname\n\
|
||||
FROM pg_catalog.pg_namespace\n\
|
||||
WHERE nspname = 'public'\n\
|
||||
)\n\
|
||||
THEN\n\
|
||||
SELECT nspowner::regrole::text\n\
|
||||
FROM pg_catalog.pg_namespace\n\
|
||||
WHERE nspname = 'public'\n\
|
||||
INTO schema_owner;\n\
|
||||
\n\
|
||||
IF schema_owner = 'cloud_admin' OR schema_owner = 'zenith_admin'\n\
|
||||
THEN\n\
|
||||
ALTER SCHEMA public OWNER TO {};\n\
|
||||
END IF;\n\
|
||||
END IF;\n\
|
||||
END\n\
|
||||
$$;",
|
||||
db.owner.quote()
|
||||
);
|
||||
db_client.simple_query(&alter_query)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -15,6 +15,7 @@ use crate::XLogPageHeaderData;
|
||||
use crate::XLogRecord;
|
||||
use crate::XLOG_PAGE_MAGIC;
|
||||
|
||||
use crate::pg_constants::WAL_SEGMENT_SIZE;
|
||||
use anyhow::{bail, ensure};
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::BytesMut;
|
||||
@@ -461,8 +462,7 @@ pub fn find_end_of_wal(
|
||||
pub fn main() {
|
||||
let mut data_dir = PathBuf::new();
|
||||
data_dir.push(".");
|
||||
let wal_seg_size = 16 * 1024 * 1024;
|
||||
let (wal_end, tli) = find_end_of_wal(&data_dir, wal_seg_size, true, Lsn(0)).unwrap();
|
||||
let (wal_end, tli) = find_end_of_wal(&data_dir, WAL_SEGMENT_SIZE, true, Lsn(0)).unwrap();
|
||||
println!(
|
||||
"wal_end={:>08X}{:>08X}, tli={}",
|
||||
(wal_end >> 32) as u32,
|
||||
@@ -606,10 +606,9 @@ mod tests {
|
||||
fn test_end_of_wal<C: wal_craft::Crafter>(
|
||||
test_name: &str,
|
||||
expected_end_of_wal_non_partial: Lsn,
|
||||
last_segment: &str,
|
||||
) {
|
||||
use wal_craft::*;
|
||||
// 1. Generate some WAL
|
||||
// Craft some WAL
|
||||
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
|
||||
.join("..")
|
||||
.join("..");
|
||||
@@ -622,24 +621,71 @@ mod tests {
|
||||
}
|
||||
cfg.initdb().unwrap();
|
||||
let srv = cfg.start_server().unwrap();
|
||||
let expected_wal_end: Lsn =
|
||||
u64::from(C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap()).into();
|
||||
let (intermediate_lsns, expected_end_of_wal_partial) =
|
||||
C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap();
|
||||
let intermediate_lsns: Vec<Lsn> = intermediate_lsns
|
||||
.iter()
|
||||
.map(|&lsn| u64::from(lsn).into())
|
||||
.collect();
|
||||
let expected_end_of_wal_partial: Lsn = u64::from(expected_end_of_wal_partial).into();
|
||||
srv.kill();
|
||||
|
||||
// 2. Pick WAL generated by initdb
|
||||
let wal_dir = cfg.datadir.join("pg_wal");
|
||||
let wal_seg_size = 16 * 1024 * 1024;
|
||||
// Check find_end_of_wal on the initial WAL
|
||||
let last_segment = cfg
|
||||
.wal_dir()
|
||||
.read_dir()
|
||||
.unwrap()
|
||||
.map(|f| f.unwrap().file_name().into_string().unwrap())
|
||||
.filter(|fname| IsXLogFileName(fname))
|
||||
.max()
|
||||
.unwrap();
|
||||
check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal_partial);
|
||||
for start_lsn in std::iter::once(Lsn(0))
|
||||
.chain(intermediate_lsns)
|
||||
.chain(std::iter::once(expected_end_of_wal_partial))
|
||||
{
|
||||
// Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`.
|
||||
// We assume that `start_lsn` is non-decreasing.
|
||||
info!(
|
||||
"Checking with start_lsn={}, erasing WAL before it",
|
||||
start_lsn
|
||||
);
|
||||
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
|
||||
let fname = file.file_name().into_string().unwrap();
|
||||
if !IsXLogFileName(&fname) {
|
||||
continue;
|
||||
}
|
||||
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
|
||||
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
|
||||
if seg_start_lsn > u64::from(start_lsn) {
|
||||
continue;
|
||||
}
|
||||
let mut f = File::options().write(true).open(file.path()).unwrap();
|
||||
const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE];
|
||||
f.write_all(
|
||||
&ZEROS[0..min(
|
||||
WAL_SEGMENT_SIZE,
|
||||
(u64::from(start_lsn) - seg_start_lsn) as usize,
|
||||
)],
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
check_end_of_wal(
|
||||
&cfg,
|
||||
&last_segment,
|
||||
start_lsn,
|
||||
expected_end_of_wal_non_partial,
|
||||
expected_end_of_wal_partial,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
|
||||
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
|
||||
let wal_end = Lsn(wal_end);
|
||||
info!(
|
||||
"find_end_of_wal returned (wal_end={}, tli={})",
|
||||
wal_end, tli
|
||||
);
|
||||
assert_eq!(wal_end, expected_end_of_wal_non_partial);
|
||||
|
||||
// 4. Get the actual end of WAL by pg_waldump
|
||||
fn check_pg_waldump_end_of_wal(
|
||||
cfg: &wal_craft::Conf,
|
||||
last_segment: &str,
|
||||
expected_end_of_wal: Lsn,
|
||||
) {
|
||||
// Get the actual end of WAL by pg_waldump
|
||||
let waldump_output = cfg
|
||||
.pg_waldump("000000010000000000000001", last_segment)
|
||||
.unwrap()
|
||||
@@ -658,32 +704,57 @@ mod tests {
|
||||
let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap();
|
||||
info!(
|
||||
"waldump erred on {}, expected wal end at {}",
|
||||
waldump_wal_end, expected_wal_end
|
||||
waldump_wal_end, expected_end_of_wal
|
||||
);
|
||||
assert_eq!(waldump_wal_end, expected_wal_end);
|
||||
assert_eq!(waldump_wal_end, expected_end_of_wal);
|
||||
}
|
||||
|
||||
// 5. Rename file to partial to actually find last valid lsn
|
||||
fs::rename(
|
||||
wal_dir.join(last_segment),
|
||||
wal_dir.join(format!("{}.partial", last_segment)),
|
||||
)
|
||||
.unwrap();
|
||||
let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap();
|
||||
fn check_end_of_wal(
|
||||
cfg: &wal_craft::Conf,
|
||||
last_segment: &str,
|
||||
start_lsn: Lsn,
|
||||
expected_end_of_wal_non_partial: Lsn,
|
||||
expected_end_of_wal_partial: Lsn,
|
||||
) {
|
||||
// Check end_of_wal on non-partial WAL segment (we treat it as fully populated)
|
||||
let (wal_end, tli) =
|
||||
find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap();
|
||||
let wal_end = Lsn(wal_end);
|
||||
info!(
|
||||
"find_end_of_wal returned (wal_end={}, tli={})",
|
||||
"find_end_of_wal returned (wal_end={}, tli={}) with non-partial WAL segment",
|
||||
wal_end, tli
|
||||
);
|
||||
assert_eq!(wal_end, waldump_wal_end);
|
||||
assert_eq!(wal_end, expected_end_of_wal_non_partial);
|
||||
|
||||
// Rename file to partial to actually find last valid lsn, then rename it back.
|
||||
fs::rename(
|
||||
cfg.wal_dir().join(&last_segment),
|
||||
cfg.wal_dir().join(format!("{}.partial", last_segment)),
|
||||
)
|
||||
.unwrap();
|
||||
let (wal_end, tli) =
|
||||
find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, true, start_lsn).unwrap();
|
||||
let wal_end = Lsn(wal_end);
|
||||
info!(
|
||||
"find_end_of_wal returned (wal_end={}, tli={}) with partial WAL segment",
|
||||
wal_end, tli
|
||||
);
|
||||
assert_eq!(wal_end, expected_end_of_wal_partial);
|
||||
fs::rename(
|
||||
cfg.wal_dir().join(format!("{}.partial", last_segment)),
|
||||
cfg.wal_dir().join(last_segment),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024);
|
||||
|
||||
#[test]
|
||||
pub fn test_find_end_of_wal_simple() {
|
||||
init_logging();
|
||||
test_end_of_wal::<wal_craft::Simple>(
|
||||
"test_find_end_of_wal_simple",
|
||||
"0/2000000".parse::<Lsn>().unwrap(),
|
||||
"000000010000000000000001",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -693,7 +764,6 @@ mod tests {
|
||||
test_end_of_wal::<wal_craft::WalRecordCrossingSegmentFollowedBySmallOne>(
|
||||
"test_find_end_of_wal_crossing_segment_followed_by_small_one",
|
||||
"0/3000000".parse::<Lsn>().unwrap(),
|
||||
"000000010000000000000002",
|
||||
);
|
||||
}
|
||||
|
||||
@@ -704,7 +774,6 @@ mod tests {
|
||||
test_end_of_wal::<wal_craft::LastWalRecordCrossingSegment>(
|
||||
"test_find_end_of_wal_last_crossing_segment",
|
||||
"0/3000000".parse::<Lsn>().unwrap(),
|
||||
"000000010000000000000002",
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -55,7 +55,7 @@ fn main() -> Result<()> {
|
||||
.get_matches();
|
||||
|
||||
let wal_craft = |arg_matches: &ArgMatches, client| {
|
||||
let lsn = match arg_matches.value_of("type").unwrap() {
|
||||
let (intermediate_lsns, end_of_wal_lsn) = match arg_matches.value_of("type").unwrap() {
|
||||
Simple::NAME => Simple::craft(client)?,
|
||||
LastWalRecordXlogSwitch::NAME => LastWalRecordXlogSwitch::craft(client)?,
|
||||
LastWalRecordXlogSwitchEndsOnPageBoundary::NAME => {
|
||||
@@ -67,7 +67,10 @@ fn main() -> Result<()> {
|
||||
LastWalRecordCrossingSegment::NAME => LastWalRecordCrossingSegment::craft(client)?,
|
||||
a => panic!("Unknown --type argument: {}", a),
|
||||
};
|
||||
println!("end_of_wal = {}", lsn);
|
||||
for lsn in intermediate_lsns {
|
||||
println!("intermediate_lsn = {}", lsn);
|
||||
}
|
||||
println!("end_of_wal = {}", end_of_wal_lsn);
|
||||
Ok(())
|
||||
};
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ use log::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use postgres::types::PgLsn;
|
||||
use postgres::Client;
|
||||
use postgres_ffi::pg_constants::WAL_SEGMENT_SIZE;
|
||||
use postgres_ffi::xlog_utils::{
|
||||
XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
||||
};
|
||||
@@ -45,6 +46,10 @@ impl Conf {
|
||||
self.pg_distrib_dir.join("lib")
|
||||
}
|
||||
|
||||
pub fn wal_dir(&self) -> PathBuf {
|
||||
self.datadir.join("pg_wal")
|
||||
}
|
||||
|
||||
fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
|
||||
let path = self.pg_bin_dir().join(command);
|
||||
ensure!(path.exists(), "Command {:?} does not exist", path);
|
||||
@@ -211,7 +216,7 @@ pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result
|
||||
"Unexpected wal_segment_size unit"
|
||||
);
|
||||
ensure!(
|
||||
wal_segment_size.get::<_, i64>("setting") == 16 * 1024 * 1024,
|
||||
wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
|
||||
"Unexpected wal_segment_size in bytes"
|
||||
);
|
||||
|
||||
@@ -221,20 +226,24 @@ pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result
|
||||
pub trait Crafter {
|
||||
const NAME: &'static str;
|
||||
|
||||
/// Generates WAL using the client `client`. Returns the expected end-of-wal LSN.
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn>;
|
||||
/// Generates WAL using the client `client`. Returns a pair of:
|
||||
/// * A vector of some valid "interesting" intermediate LSNs which one may start reading from.
|
||||
/// May include or exclude Lsn(0) and the end-of-wal.
|
||||
/// * The expected end-of-wal LSN.
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)>;
|
||||
}
|
||||
|
||||
fn craft_internal<C: postgres::GenericClient>(
|
||||
client: &mut C,
|
||||
f: impl Fn(&mut C, PgLsn) -> Result<Option<PgLsn>>,
|
||||
) -> Result<PgLsn> {
|
||||
f: impl Fn(&mut C, PgLsn) -> Result<(Vec<PgLsn>, Option<PgLsn>)>,
|
||||
) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
ensure_server_config(client)?;
|
||||
|
||||
let initial_lsn = client.pg_current_wal_insert_lsn()?;
|
||||
info!("LSN initial = {}", initial_lsn);
|
||||
|
||||
let last_lsn = match f(client, initial_lsn)? {
|
||||
let (mut intermediate_lsns, last_lsn) = f(client, initial_lsn)?;
|
||||
let last_lsn = match last_lsn {
|
||||
None => client.pg_current_wal_insert_lsn()?,
|
||||
Some(last_lsn) => match last_lsn.cmp(&client.pg_current_wal_insert_lsn()?) {
|
||||
Ordering::Less => bail!("Some records were inserted after the crafted WAL"),
|
||||
@@ -242,6 +251,9 @@ fn craft_internal<C: postgres::GenericClient>(
|
||||
Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
|
||||
},
|
||||
};
|
||||
if !intermediate_lsns.starts_with(&[initial_lsn]) {
|
||||
intermediate_lsns.insert(0, initial_lsn);
|
||||
}
|
||||
|
||||
// Some records may be not flushed, e.g. non-transactional logical messages.
|
||||
client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
|
||||
@@ -250,16 +262,16 @@ fn craft_internal<C: postgres::GenericClient>(
|
||||
Ordering::Equal => {}
|
||||
Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
|
||||
}
|
||||
Ok(last_lsn)
|
||||
Ok((intermediate_lsns, last_lsn))
|
||||
}
|
||||
|
||||
pub struct Simple;
|
||||
impl Crafter for Simple {
|
||||
const NAME: &'static str = "simple";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
craft_internal(client, |client, _| {
|
||||
client.execute("CREATE table t(x int)", &[])?;
|
||||
Ok(None)
|
||||
Ok((Vec::new(), None))
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -267,12 +279,13 @@ impl Crafter for Simple {
|
||||
pub struct LastWalRecordXlogSwitch;
|
||||
impl Crafter for LastWalRecordXlogSwitch {
|
||||
const NAME: &'static str = "last_wal_record_xlog_switch";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
// Do not use generate_internal because here we end up with flush_lsn exactly on
|
||||
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
||||
ensure_server_config(client)?;
|
||||
|
||||
client.execute("CREATE table t(x int)", &[])?;
|
||||
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
|
||||
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let next_segment = PgLsn::from(0x0200_0000);
|
||||
ensure!(
|
||||
@@ -281,14 +294,14 @@ impl Crafter for LastWalRecordXlogSwitch {
|
||||
after_xlog_switch,
|
||||
next_segment
|
||||
);
|
||||
Ok(next_segment)
|
||||
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
|
||||
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
// Do not use generate_internal because here we end up with flush_lsn exactly on
|
||||
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
||||
ensure_server_config(client)?;
|
||||
@@ -334,6 +347,7 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
);
|
||||
|
||||
// Emit the XLOG_SWITCH
|
||||
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
|
||||
let after_xlog_switch: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
||||
let next_segment = PgLsn::from(0x0200_0000);
|
||||
ensure!(
|
||||
@@ -347,14 +361,14 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
||||
"XLOG_SWITCH message ended not on page boundary: {}",
|
||||
after_xlog_switch
|
||||
);
|
||||
Ok(next_segment)
|
||||
Ok((vec![before_xlog_switch, after_xlog_switch], next_segment))
|
||||
}
|
||||
}
|
||||
|
||||
fn craft_single_logical_message(
|
||||
client: &mut impl postgres::GenericClient,
|
||||
transactional: bool,
|
||||
) -> Result<PgLsn> {
|
||||
) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
craft_internal(client, |client, initial_lsn| {
|
||||
ensure!(
|
||||
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
|
||||
@@ -386,9 +400,9 @@ fn craft_single_logical_message(
|
||||
message_lsn < after_message_lsn,
|
||||
"No record found after the emitted message"
|
||||
);
|
||||
Ok(Some(after_message_lsn))
|
||||
Ok((vec![message_lsn], Some(after_message_lsn)))
|
||||
} else {
|
||||
Ok(Some(message_lsn))
|
||||
Ok((Vec::new(), Some(message_lsn)))
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -396,7 +410,7 @@ fn craft_single_logical_message(
|
||||
pub struct WalRecordCrossingSegmentFollowedBySmallOne;
|
||||
impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
|
||||
const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
craft_single_logical_message(client, true)
|
||||
}
|
||||
}
|
||||
@@ -404,7 +418,7 @@ impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
|
||||
pub struct LastWalRecordCrossingSegment;
|
||||
impl Crafter for LastWalRecordCrossingSegment {
|
||||
const NAME: &'static str = "last_wal_record_crossing_segment";
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
||||
fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec<PgLsn>, PgLsn)> {
|
||||
craft_single_logical_message(client, false)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,9 +122,7 @@ where
|
||||
download_index_parts(conf, storage, sync_ids)
|
||||
.await
|
||||
.remove(&tenant_id)
|
||||
.ok_or(anyhow::anyhow!(
|
||||
"Missing tenant index parts. This is a bug."
|
||||
))
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing tenant index parts. This is a bug."))
|
||||
}
|
||||
|
||||
/// Retrieves index data from the remote storage for a given timeline.
|
||||
|
||||
@@ -83,7 +83,9 @@ impl ElectionLeader {
|
||||
) -> Result<bool> {
|
||||
let resp = self.client.leader(election_name).await?;
|
||||
|
||||
let kv = resp.kv().ok_or(anyhow!("failed to get leader response"))?;
|
||||
let kv = resp
|
||||
.kv()
|
||||
.ok_or_else(|| anyhow!("failed to get leader response"))?;
|
||||
let leader = kv.value_str()?;
|
||||
|
||||
Ok(leader == candidate_name)
|
||||
|
||||
@@ -44,7 +44,7 @@ def test_branching_with_pgbench(neon_simple_env: NeonEnv,
|
||||
log.info(f"Start a pgbench workload on pg {connstr}")
|
||||
|
||||
pg_bin.run_capture(['pgbench', '-i', f'-s{scale}', connstr])
|
||||
pg_bin.run_capture(['pgbench', '-c10', '-T15', connstr])
|
||||
pg_bin.run_capture(['pgbench', '-T15', connstr])
|
||||
|
||||
env.neon_cli.create_branch('b0', tenant_id=tenant)
|
||||
pgs: List[Postgres] = []
|
||||
@@ -54,12 +54,23 @@ def test_branching_with_pgbench(neon_simple_env: NeonEnv,
|
||||
threads.append(threading.Thread(target=run_pgbench, args=(pgs[0], ), daemon=True))
|
||||
threads[-1].start()
|
||||
|
||||
thread_limit = 4
|
||||
|
||||
for i in range(n_branches):
|
||||
# random a delay between [0, 5]
|
||||
delay = random.random() * 5
|
||||
time.sleep(delay)
|
||||
log.info(f"Sleep {delay}s")
|
||||
|
||||
# If the number of concurrent threads exceeds a threshold,
|
||||
# wait for all the threads to finish before spawning a new one.
|
||||
# Because tests defined in `batch_others` are run concurrently in CI,
|
||||
# we want to avoid the situation that one test exhausts resources for other tests.
|
||||
if len(threads) >= thread_limit:
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
threads = []
|
||||
|
||||
if ty == "cascade":
|
||||
env.neon_cli.create_branch('b{}'.format(i + 1), 'b{}'.format(i), tenant_id=tenant)
|
||||
else:
|
||||
|
||||
@@ -302,6 +302,8 @@ def test_compute_restarts(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
|
||||
class BackgroundCompute(object):
|
||||
MAX_QUERY_GAP_SECONDS = 2
|
||||
|
||||
def __init__(self, index: int, env: NeonEnv, branch: str):
|
||||
self.index = index
|
||||
self.env = env
|
||||
@@ -339,7 +341,7 @@ class BackgroundCompute(object):
|
||||
|
||||
# With less sleep, there is a very big chance of not committing
|
||||
# anything or only 1 xact during test run.
|
||||
await asyncio.sleep(2 * random.random())
|
||||
await asyncio.sleep(random.uniform(0, self.MAX_QUERY_GAP_SECONDS))
|
||||
self.running = False
|
||||
|
||||
|
||||
@@ -356,20 +358,34 @@ async def run_concurrent_computes(env: NeonEnv,
|
||||
background_tasks = [asyncio.create_task(compute.run()) for compute in computes]
|
||||
|
||||
await asyncio.sleep(run_seconds)
|
||||
log.info("stopping all tasks but one")
|
||||
for compute in computes[1:]:
|
||||
compute.stopped = True
|
||||
await asyncio.gather(*background_tasks[1:])
|
||||
log.info("stopped all tasks but one")
|
||||
|
||||
# work for some time with only one compute -- it should be able to make some xacts
|
||||
await asyncio.sleep(8)
|
||||
TIMEOUT_SECONDS = computes[0].MAX_QUERY_GAP_SECONDS + 3
|
||||
initial_queries_by_0 = len(computes[0].successful_queries)
|
||||
log.info(f'Waiting for another query by computes[0], '
|
||||
f'it already had {initial_queries_by_0}, timeout is {TIMEOUT_SECONDS}s')
|
||||
for _ in range(10 * TIMEOUT_SECONDS):
|
||||
current_queries_by_0 = len(computes[0].successful_queries) - initial_queries_by_0
|
||||
if current_queries_by_0 >= 1:
|
||||
log.info(f'Found {current_queries_by_0} successful queries '
|
||||
f'by computes[0], completing the test')
|
||||
break
|
||||
await asyncio.sleep(0.1)
|
||||
else:
|
||||
assert False, "Timed out while waiting for another query by computes[0]"
|
||||
computes[0].stopped = True
|
||||
|
||||
await asyncio.gather(*background_tasks)
|
||||
await asyncio.gather(background_tasks[0])
|
||||
|
||||
result = await exec_compute_query(env, branch, 'SELECT * FROM query_log')
|
||||
# we should have inserted something while single compute was running
|
||||
assert len(result) >= 4
|
||||
log.info(f'Executed {len(result)} queries')
|
||||
log.info(f'Executed {len(result)} queries, {current_queries_by_0} of them '
|
||||
f'by computes[0] after we started stopping the others')
|
||||
for row in result:
|
||||
log.info(f'{row[0]} {row[1]} {row[2]}')
|
||||
|
||||
|
||||
@@ -1276,12 +1276,9 @@ class WalCraft(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res.stdout.split('\n')
|
||||
|
||||
def in_existing(self, type: str, connection: str) -> int:
|
||||
def in_existing(self, type: str, connection: str) -> None:
|
||||
res = self.raw_cli(["in-existing", type, connection])
|
||||
res.check_returncode()
|
||||
m = re.fullmatch(r'end_of_wal = (.*)\n', res.stdout)
|
||||
assert m
|
||||
return lsn_from_hex(m.group(1))
|
||||
|
||||
|
||||
class NeonPageserver(PgProtocol):
|
||||
|
||||
8
test_runner/netstat-script.sh
Executable file
8
test_runner/netstat-script.sh
Executable file
@@ -0,0 +1,8 @@
|
||||
#!/bin/bash
|
||||
|
||||
while true; do
|
||||
echo -n "==== CURRENT TIME:" >> /tmp/test_output/netstat.stdout
|
||||
date +"%T.%N" >> /tmp/test_output/netstat.stdout
|
||||
sudo netstat -vpnoa | grep tcp | sort >> /tmp/test_output/netstat.stdout
|
||||
sleep 0.5
|
||||
done
|
||||
@@ -33,7 +33,9 @@ itoa = { version = "0.4", features = ["i128", "std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
||||
memchr = { version = "2", features = ["std", "use_std"] }
|
||||
num-integer = { version = "0.1", default-features = false, features = ["i128"] }
|
||||
nom = { version = "7", features = ["alloc", "std"] }
|
||||
num-bigint = { version = "0.4", features = ["std"] }
|
||||
num-integer = { version = "0.1", default-features = false, features = ["i128", "std"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "std"] }
|
||||
prost = { version = "0.10", features = ["prost-derive", "std"] }
|
||||
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
|
||||
@@ -41,10 +43,11 @@ regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cac
|
||||
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
scopeguard = { version = "1", features = ["use_std"] }
|
||||
serde = { version = "1", features = ["alloc", "derive", "serde_derive", "std"] }
|
||||
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros"] }
|
||||
time = { version = "0.3", features = ["alloc", "formatting", "itoa", "macros", "parsing", "quickcheck", "quickcheck-dep", "std", "time-macros"] }
|
||||
tokio = { version = "1", features = ["bytes", "fs", "io-std", "io-util", "libc", "macros", "memchr", "mio", "net", "num_cpus", "once_cell", "process", "rt", "rt-multi-thread", "signal-hook-registry", "socket2", "sync", "time", "tokio-macros", "winapi"] }
|
||||
tokio-util = { version = "0.7", features = ["codec", "io"] }
|
||||
tracing = { version = "0.1", features = ["attributes", "log", "std", "tracing-attributes"] }
|
||||
tracing-core = { version = "0.1", features = ["lazy_static", "std"] }
|
||||
tracing-core = { version = "0.1", features = ["lazy_static", "std", "valuable"] }
|
||||
|
||||
[build-dependencies]
|
||||
ahash = { version = "0.7", features = ["std"] }
|
||||
@@ -57,6 +60,7 @@ indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["serde", "std"] }
|
||||
memchr = { version = "2", features = ["std", "use_std"] }
|
||||
nom = { version = "7", features = ["alloc", "std"] }
|
||||
prost = { version = "0.10", features = ["prost-derive", "std"] }
|
||||
regex = { version = "1", features = ["aho-corasick", "memchr", "perf", "perf-cache", "perf-dfa", "perf-inline", "perf-literal", "std", "unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
regex-syntax = { version = "0.6", features = ["unicode", "unicode-age", "unicode-bool", "unicode-case", "unicode-gencat", "unicode-perl", "unicode-script", "unicode-segment"] }
|
||||
|
||||
Reference in New Issue
Block a user