Merge pull request #2219 from neondatabase/main

Release 2022-08-04
This commit is contained in:
Arthur Petukhovsky
2022-08-04 20:06:34 +03:00
committed by GitHub
24 changed files with 633 additions and 300 deletions

56
.github/actions/download/action.yml vendored Normal file
View File

@@ -0,0 +1,56 @@
name: "Download an artifact"
description: "Custom download action"
inputs:
name:
description: "Artifact name"
required: true
path:
description: "A directory to put artifact into"
default: "."
required: false
skip-if-does-not-exist:
description: "Allow to skip if file doesn't exist, fail otherwise"
default: false
required: false
runs:
using: "composite"
steps:
- name: Download artifact
id: download-artifact
shell: bash -euxo pipefail {0}
env:
TARGET: ${{ inputs.path }}
ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst
SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }}
run: |
BUCKET=neon-github-public-dev
PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=$(basename $ARCHIVE)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
echo '::set-output name=SKIPPED::true'
exit 0
else
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
fi
echo '::set-output name=SKIPPED::false'
mkdir -p $(dirname $ARCHIVE)
time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} ${ARCHIVE}
- name: Extract artifact
if: ${{ steps.download-artifact.outputs.SKIPPED == 'false' }}
shell: bash -euxo pipefail {0}
env:
TARGET: ${{ inputs.path }}
ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst
run: |
mkdir -p ${TARGET}
time tar -xf ${ARCHIVE} -C ${TARGET}
rm -f ${ARCHIVE}

View File

@@ -27,22 +27,35 @@ inputs:
description: 'Whether to upload the performance report'
required: false
default: 'false'
run_with_real_s3:
description: 'Whether to pass real s3 credentials to the test suite'
required: false
default: 'false'
real_s3_bucket:
description: 'Bucket name for real s3 tests'
required: false
default: ''
real_s3_region:
description: 'Region name for real s3 tests'
required: false
default: ''
real_s3_access_key_id:
description: 'Access key id'
required: false
default: ''
real_s3_secret_access_key:
description: 'Secret access key'
required: false
default: ''
runs:
using: "composite"
steps:
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
- name: Get Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-artifact
path: ./neon-artifact/
- name: Extract Neon artifact
shell: bash -euxo pipefail {0}
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/
rm -rf ./neon-artifact/
path: /tmp/neon
- name: Checkout
if: inputs.needs_postgres_source == 'true'
@@ -70,7 +83,9 @@ runs:
# this variable will be embedded in perf test report
# and is needed to distinguish different environments
PLATFORM: github-actions-selfhosted
shell: bash -euxo pipefail {0}
AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }}
AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }}
shell: bash -euxo pipefail {0} {0}
run: |
PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)"
rm -rf $PERF_REPORT_DIR
@@ -84,6 +99,14 @@ runs:
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
EXTRA_PARAMS="-n4 $EXTRA_PARAMS"
fi
if [[ "${{ inputs.run_with_real_s3 }}" == "true" ]]; then
echo "REAL S3 ENABLED"
export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty
export REMOTE_STORAGE_S3_BUCKET=${{ inputs.real_s3_bucket }}
export REMOTE_STORAGE_S3_REGION=${{ inputs.real_s3_region }}
fi
if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then
if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then
mkdir -p "$PERF_REPORT_DIR"
@@ -132,9 +155,7 @@ runs:
- name: Upload python test logs
if: always()
uses: actions/upload-artifact@v3
uses: ./.github/actions/upload
with:
retention-days: 7
if-no-files-found: error
name: python-test-${{ inputs.test_selection }}-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-logs
path: /tmp/test_output/

View File

@@ -8,10 +8,15 @@ runs:
shell: bash -euxo pipefail {0}
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
- name: Upload coverage data
uses: actions/upload-artifact@v3
- name: Download previous coverage data into the same directory
uses: ./.github/actions/download
with:
retention-days: 7
if-no-files-found: error
name: coverage-data-artifact
path: /tmp/coverage/
path: /tmp/coverage
skip-if-does-not-exist: true # skip if there's no previous coverage to download
- name: Upload coverage data
uses: ./.github/actions/upload
with:
name: coverage-data-artifact
path: /tmp/coverage

51
.github/actions/upload/action.yml vendored Normal file
View File

@@ -0,0 +1,51 @@
name: "Upload an artifact"
description: "Custom upload action"
inputs:
name:
description: "Artifact name"
required: true
path:
description: "A directory or file to upload"
required: true
runs:
using: "composite"
steps:
- name: Prepare artifact
shell: bash -euxo pipefail {0}
env:
SOURCE: ${{ inputs.path }}
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
run: |
mkdir -p $(dirname $ARCHIVE)
if [ -f ${ARCHIVE} ]; then
echo 2>&1 "File ${ARCHIVE} already exist. Something went wrong before"
exit 1
fi
ZSTD_NBTHREADS=0
if [ -d ${SOURCE} ]; then
time tar -C ${SOURCE} -cf ${ARCHIVE} --zstd .
elif [ -f ${SOURCE} ]; then
time tar -cf ${ARCHIVE} --zstd ${SOURCE}
else
echo 2>&1 "${SOURCE} neither directory nor file, don't know how to handle it"
fi
- name: Upload artifact
shell: bash -euxo pipefail {0}
env:
SOURCE: ${{ inputs.path }}
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
run: |
BUCKET=neon-github-public-dev
PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=$(basename $ARCHIVE)
FILESIZE=$(du -sh ${ARCHIVE} | cut -f1)
time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}
# Ref https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-job-summary
echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY}

View File

@@ -3,8 +3,8 @@ name: Test and Deploy
on:
push:
branches:
- main
- release
- main
- release
pull_request:
defaults:
@@ -22,7 +22,8 @@ env:
jobs:
build-neon:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948
strategy:
fail-fast: false
matrix:
@@ -31,6 +32,7 @@ jobs:
env:
BUILD_TYPE: ${{ matrix.build_type }}
GIT_VERSION: ${{ github.sha }}
steps:
- name: Checkout
@@ -123,6 +125,7 @@ jobs:
mkdir -p /tmp/coverage/
mkdir -p /tmp/neon/test_bin/
test_exe_paths=$(
${cov_prefix} cargo test $CARGO_FLAGS --message-format=json --no-run |
jq -r '.executable | select(. != null)'
@@ -145,25 +148,20 @@ jobs:
- name: Install postgres binaries
run: cp -a tmp_install /tmp/neon/pg_install
- name: Prepare neon artifact
run: ZSTD_NBTHREADS=0 tar -C /tmp/neon/ -cf ./neon.tar.zst --zstd .
- name: Upload neon binaries
uses: actions/upload-artifact@v3
- name: Upload Neon artifact
uses: ./.github/actions/upload
with:
retention-days: 7
if-no-files-found: error
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon.tar.zst
path: /tmp/neon
# XXX: keep this after the binaries.list is formed, so the coverage can properly work later
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
pg_regress-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948
needs: [ build-neon ]
strategy:
fail-fast: false
@@ -190,7 +188,8 @@ jobs:
uses: ./.github/actions/save-coverage-data
other-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948
needs: [ build-neon ]
strategy:
fail-fast: false
@@ -210,14 +209,20 @@ jobs:
build_type: ${{ matrix.build_type }}
rust_toolchain: ${{ matrix.rust_toolchain }}
test_selection: batch_others
run_with_real_s3: true
real_s3_bucket: ci-tests-s3
real_s3_region: us-west-2
real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}"
real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}"
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
benchmarks:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948
needs: [ build-neon ]
if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks')
strategy:
fail-fast: false
matrix:
@@ -245,7 +250,8 @@ jobs:
# while coverage is currently collected for the debug ones
coverage-report:
runs-on: [ self-hosted, Linux, k8s-runner ]
runs-on: dev
container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948
needs: [ other-tests, pg_regress-tests ]
strategy:
fail-fast: false
@@ -270,23 +276,17 @@ jobs:
target/
key: v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }}
- name: Get Neon artifact for restoration
uses: actions/download-artifact@v3
- name: Get Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact
path: ./neon-artifact/
path: /tmp/neon
- name: Extract Neon artifact
run: |
mkdir -p /tmp/neon/
tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/
rm -rf ./neon-artifact/
- name: Restore coverage data
uses: actions/download-artifact@v3
- name: Get coverage artifact
uses: ./.github/actions/download
with:
name: coverage-data-artifact
path: /tmp/coverage/
path: /tmp/coverage
- name: Merge coverage data
run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge
@@ -324,40 +324,40 @@ jobs:
}"
trigger-e2e-tests:
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
runs-on: [ self-hosted, Linux, k8s-runner ]
needs: [ build-neon ]
steps:
- name: Set PR's status to pending and request a remote CI test
run: |
COMMIT_SHA=${{ github.event.pull_request.head.sha }}
COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}}
REMOTE_REPO="${{ github.repository_owner }}/cloud"
REMOTE_REPO="${{ github.repository_owner }}/cloud"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"state\": \"pending\",
\"context\": \"neon-cloud-e2e\",
\"description\": \"[$REMOTE_REPO] Remote CI job is about to start\"
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\"
}
}"
curl -f -X POST \
https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \
-H "Accept: application/vnd.github.v3+json" \
--user "${{ secrets.CI_ACCESS_TOKEN }}" \
--data \
"{
\"ref\": \"main\",
\"inputs\": {
\"ci_job_name\": \"neon-cloud-e2e\",
\"commit_hash\": \"$COMMIT_SHA\",
\"remote_repo\": \"${{ github.repository }}\"
}
}"
docker-image:
runs-on: [ self-hosted, Linux, k8s-runner ]

47
Cargo.lock generated
View File

@@ -154,9 +154,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.5.12"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16705af05732b7d3258ec0f7b73c03a658a28925e050d8852d5b568ee8bcf4e"
checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648"
dependencies = [
"async-trait",
"axum-core",
@@ -317,15 +317,6 @@ dependencies = [
"serde",
]
[[package]]
name = "cast"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a"
dependencies = [
"rustc_version",
]
[[package]]
name = "cast"
version = "0.3.0"
@@ -579,7 +570,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f"
dependencies = [
"atty",
"cast 0.3.0",
"cast",
"clap 2.34.0",
"criterion-plot",
"csv",
@@ -600,11 +591,11 @@ dependencies = [
[[package]]
name = "criterion-plot"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57"
checksum = "2673cc8207403546f45f5fd319a974b1e6983ad1a3ee7e6041650013be041876"
dependencies = [
"cast 0.2.7",
"cast",
"itertools",
]
@@ -680,9 +671,9 @@ dependencies = [
[[package]]
name = "crypto-common"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ccfd8c0ee4cce11e45b3fd6f9d5e69e0cc62912aa6a0cb1bf4617b0eba5a12f"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"typenum",
@@ -1116,9 +1107,9 @@ dependencies = [
[[package]]
name = "gimli"
version = "0.26.1"
version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d"
[[package]]
name = "git-version"
@@ -1184,9 +1175,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.12.2"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "heck"
@@ -1388,7 +1379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e"
dependencies = [
"autocfg",
"hashbrown 0.12.2",
"hashbrown 0.12.3",
]
[[package]]
@@ -1851,9 +1842,9 @@ dependencies = [
[[package]]
name = "os_str_bytes"
version = "6.1.0"
version = "6.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa"
checksum = "648001efe5d5c0102d8cea768e348da85d90af8ba91f0bea908f157951493cd4"
[[package]]
name = "pageserver"
@@ -2735,9 +2726,9 @@ dependencies = [
[[package]]
name = "rustversion"
version = "1.0.7"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0a5f7c728f5d284929a1cccb5bc19884422bfe6ef4d6c409da2c41838983fcf"
checksum = "24c8ad4f0c00e1eb5bc7614d236a7f1300e3dbd76b68cac8e06fb00b015ad8d8"
[[package]]
name = "ryu"
@@ -3617,9 +3608,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"
[[package]]
name = "unicode-ident"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c"
checksum = "15c61ba63f9235225a22310255a29b806b907c9b8c964bcbd0a2c70f3f2deea7"
[[package]]
name = "unicode-normalization"

View File

@@ -51,7 +51,11 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
}
fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] {
for env_key in [
"AWS_ACCESS_KEY_ID",
"AWS_SECRET_ACCESS_KEY",
"AWS_SESSION_TOKEN",
] {
if let Ok(value) = std::env::var(env_key) {
cmd = cmd.env(env_key, value);
}

View File

@@ -247,7 +247,7 @@ impl SafekeeperNode {
// Shutting down may take a long time,
// if safekeeper flushes a lot of data
let mut tcp_stopped = false;
for _ in 0..100 {
for i in 0..600 {
if !tcp_stopped {
if let Err(err) = TcpStream::connect(&address) {
tcp_stopped = true;
@@ -272,9 +272,11 @@ impl SafekeeperNode {
}
}
}
print!(".");
io::stdout().flush().unwrap();
thread::sleep(Duration::from_secs(1));
if i % 10 == 0 {
print!(".");
io::stdout().flush().unwrap();
}
thread::sleep(Duration::from_millis(100));
}
bail!("Failed to stop safekeeper with pid {}", pid);

View File

@@ -318,7 +318,7 @@ impl PageServerNode {
// Shutting down may take a long time,
// if pageserver checkpoints a lot of data
let mut tcp_stopped = false;
for _ in 0..100 {
for i in 0..600 {
if !tcp_stopped {
if let Err(err) = TcpStream::connect(&address) {
tcp_stopped = true;
@@ -344,9 +344,11 @@ impl PageServerNode {
}
}
}
print!(".");
io::stdout().flush().unwrap();
thread::sleep(Duration::from_secs(1));
if i % 10 == 0 {
print!(".");
io::stdout().flush().unwrap();
}
thread::sleep(Duration::from_millis(100));
}
bail!("Failed to stop pageserver with pid {}", pid);

View File

@@ -16,7 +16,7 @@ use crate::XLogRecord;
use crate::XLOG_PAGE_MAGIC;
use crate::pg_constants::WAL_SEGMENT_SIZE;
use anyhow::{bail, ensure};
use anyhow::{anyhow, bail, ensure};
use byteorder::{ByteOrder, LittleEndian};
use bytes::BytesMut;
use bytes::{Buf, Bytes};
@@ -159,7 +159,7 @@ fn find_end_of_wal_segment(
let mut buf = [0u8; XLOG_BLCKSZ];
let file_name = XLogFileName(tli, segno, wal_seg_size);
let mut last_valid_rec_pos: usize = start_offset; // assume at given start_offset begins new record
let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap();
let mut file = File::open(data_dir.join(file_name.clone() + ".partial"))?;
file.seek(SeekFrom::Start(offs as u64))?;
// xl_crc is the last field in XLogRecord, will not be read into rec_hdr
const_assert!(XLOG_RECORD_CRC_OFFS + 4 == XLOG_SIZE_OF_XLOG_RECORD);
@@ -396,10 +396,13 @@ pub fn find_end_of_wal(
let mut high_tli: TimeLineID = 0;
let mut high_ispartial = false;
for entry in fs::read_dir(data_dir).unwrap().flatten() {
for entry in fs::read_dir(data_dir)?.flatten() {
let ispartial: bool;
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
let fname = entry_name
.to_str()
.ok_or_else(|| anyhow!("Invalid file name"))?;
/*
* Check if the filename looks like an xlog file, or a .partial file.
*/
@@ -411,7 +414,7 @@ pub fn find_end_of_wal(
continue;
}
let (segno, tli) = XLogFromFileName(fname, wal_seg_size);
if !ispartial && entry.metadata().unwrap().len() != wal_seg_size as u64 {
if !ispartial && entry.metadata()?.len() != wal_seg_size as u64 {
continue;
}
if segno > high_segno

View File

@@ -66,6 +66,9 @@ pub trait RemoteStorage: Send + Sync {
async fn list(&self) -> anyhow::Result<Vec<Self::RemoteObjectId>>;
/// Lists all top level subdirectories for a given prefix
/// Note: here we assume that if the prefix is passed it was obtained via remote_object_id
/// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS)
/// so this method doesnt need to.
async fn list_prefixes(
&self,
prefix: Option<Self::RemoteObjectId>,

View File

@@ -116,7 +116,7 @@ impl RemoteStorage for LocalFs {
prefix: Option<Self::RemoteObjectId>,
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
let path = match prefix {
Some(prefix) => Cow::Owned(self.storage_root.join(prefix)),
Some(prefix) => Cow::Owned(prefix),
None => Cow::Borrowed(&self.storage_root),
};
get_all_files(path.as_ref(), false).await

View File

@@ -171,17 +171,25 @@ impl S3Bucket {
let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok();
let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok();
// session token is used when authorizing through sso
// which is typically the case when testing locally on developer machine
let session_token = std::env::var("AWS_SESSION_TOKEN").ok();
let client = if access_key_id.is_none() && secret_access_key.is_none() {
debug!("Using IAM-based AWS access");
S3Client::new_with(request_dispatcher, InstanceMetadataProvider::new(), region)
} else {
debug!("Using credentials-based AWS access");
debug!(
"Using credentials-based AWS access. Session token is set: {}",
session_token.is_some()
);
S3Client::new_with(
request_dispatcher,
StaticProvider::new_minimal(
StaticProvider::new(
access_key_id.unwrap_or_default(),
secret_access_key.unwrap_or_default(),
session_token,
None,
),
region,
)
@@ -304,32 +312,24 @@ impl RemoteStorage for S3Bucket {
Ok(document_keys)
}
/// See the doc for `RemoteStorage::list_prefixes`
/// Note: it wont include empty "directories"
async fn list_prefixes(
&self,
prefix: Option<Self::RemoteObjectId>,
) -> anyhow::Result<Vec<Self::RemoteObjectId>> {
let list_prefix = match prefix {
Some(prefix) => {
let mut prefix_in_bucket = self.prefix_in_bucket.clone().unwrap_or_default();
// if there is no trailing / in default prefix and
// supplied prefix does not start with "/" insert it
if !(prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR)
|| prefix.0.starts_with(S3_PREFIX_SEPARATOR))
{
prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
}
prefix_in_bucket.push_str(&prefix.0);
// get the passed prefix or if it is not set use prefix_in_bucket value
let list_prefix = prefix
.map(|p| p.0)
.or_else(|| self.prefix_in_bucket.clone())
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if !prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) {
prefix_in_bucket.push(S3_PREFIX_SEPARATOR);
if !p.ends_with(S3_PREFIX_SEPARATOR) {
p.push(S3_PREFIX_SEPARATOR);
}
Some(prefix_in_bucket)
}
None => self.prefix_in_bucket.clone(),
};
p
});
let mut document_keys = Vec::new();

View File

@@ -884,7 +884,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
match sub_match.subcommand() {
Some(("start", start_match)) => {
if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) {
eprintln!("pageserver start failed: {}", e);
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
@@ -906,10 +906,19 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) {
eprintln!("pageserver start failed: {}", e);
eprintln!("pageserver start failed: {e}");
exit(1);
}
}
Some(("status", _)) => match PageServerNode::from_env(env).check_status() {
Ok(_) => println!("Page server is up and running"),
Err(err) => {
eprintln!("Page server is not available: {}", err);
exit(1);
}
},
Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name),
None => bail!("no pageserver subcommand provided"),
}

View File

@@ -130,6 +130,7 @@ where
tenant_path.display()
)
})?;
let timelines = storage
.list_prefixes(Some(tenant_storage_path))
.await
@@ -140,6 +141,13 @@ where
)
})?;
if timelines.is_empty() {
anyhow::bail!(
"no timelines found on the remote storage for tenant {}",
tenant_id
)
}
let mut sync_ids = HashSet::new();
for timeline_remote_storage_key in timelines {
@@ -194,6 +202,8 @@ where
})
.map_err(DownloadError::BadInput)?;
warn!("part_storage_path {:?}", part_storage_path);
let mut index_part_download = storage.download(&part_storage_path).await?;
let mut index_part_bytes = Vec::new();

View File

@@ -1,4 +1,7 @@
[pytest]
filterwarnings =
error::pytest.PytestUnhandledThreadExceptionWarning
error::UserWarning
addopts =
-m 'not remote_cluster'
markers =

View File

@@ -1,6 +1,5 @@
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import query_scalar

View File

@@ -167,3 +167,5 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
# The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC.
with pytest.raises(Exception, match="invalid branch start lsn"):
env.neon_cli.create_branch('b1', 'b0', tenant_id=tenant, ancestor_start_lsn=lsn)
thread.join()

View File

@@ -60,17 +60,38 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID):
def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
env = neon_simple_env
client = env.pageserver.http_client()
with env.pageserver.http_client() as client:
tenant_id, timeline_id = env.neon_cli.create_tenant()
tenant_id, timeline_id = env.neon_cli.create_tenant()
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=timeline_id,
include_non_incremental_logical_size=True)
timeline_details = client.timeline_detail(tenant_id=tenant_id,
timeline_id=timeline_id,
include_non_incremental_logical_size=True)
assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running'
def expect_updated_msg_lsn(client: NeonPageserverHttpClient,
tenant_id: UUID,
timeline_id: UUID,
prev_msg_lsn: Optional[int]) -> int:
timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id)
# a successful `timeline_details` response must contain the below fields
local_timeline_details = timeline_details['local']
assert "wal_source_connstr" in local_timeline_details.keys()
assert "last_received_msg_lsn" in local_timeline_details.keys()
assert "last_received_msg_ts" in local_timeline_details.keys()
assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"])
assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \
f"the last received message's LSN {last_msg_lsn} hasn't been updated \
compared to the previous message's LSN {prev_msg_lsn}"
return last_msg_lsn
# Test the WAL-receiver related fields in the response to `timeline_details` API call
@@ -79,44 +100,29 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv):
# `timeline_details` now.
def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv):
env = neon_simple_env
client = env.pageserver.http_client()
with env.pageserver.http_client() as client:
tenant_id, timeline_id = env.neon_cli.create_tenant()
pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
tenant_id, timeline_id = env.neon_cli.create_tenant()
pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id)
# Wait to make sure that we get a latest WAL receiver data.
# We need to wait here because it's possible that we don't have access to
# the latest WAL yet, when the `timeline_detail` API is first called.
# See: https://github.com/neondatabase/neon/issues/1768.
lsn = wait_until(number_of_iterations=5,
interval=1,
func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, None))
def expect_updated_msg_lsn(prev_msg_lsn: Optional[int]) -> int:
timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id)
# a successful `timeline_details` response must contain the below fields
local_timeline_details = timeline_details['local']
assert "wal_source_connstr" in local_timeline_details.keys()
assert "last_received_msg_lsn" in local_timeline_details.keys()
assert "last_received_msg_ts" in local_timeline_details.keys()
assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty"
last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"])
assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \
f"the last received message's LSN {last_msg_lsn} hasn't been updated \
compared to the previous message's LSN {prev_msg_lsn}"
return last_msg_lsn
# Wait to make sure that we get a latest WAL receiver data.
# We need to wait here because it's possible that we don't have access to
# the latest WAL yet, when the `timeline_detail` API is first called.
# See: https://github.com/neondatabase/neon/issues/1768.
lsn = wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(None))
# Make a DB modification then expect getting a new WAL receiver's data.
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(lsn))
# Make a DB modification then expect getting a new WAL receiver's data.
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
wait_until(number_of_iterations=5,
interval=1,
func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, lsn))
def test_pageserver_http_api_client(neon_simple_env: NeonEnv):
env = neon_simple_env
client = env.pageserver.http_client()
check_client(client, env.initial_tenant)
with env.pageserver.http_client() as client:
check_client(client, env.initial_tenant)
def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilder):
@@ -125,5 +131,5 @@ def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilde
management_token = env.auth_keys.generate_management_token()
client = env.pageserver.http_client(auth_token=management_token)
check_client(client, env.initial_tenant)
with env.pageserver.http_client(auth_token=management_token) as client:
check_client(client, env.initial_tenant)

View File

@@ -2,11 +2,10 @@
# env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ......
import shutil, os
from contextlib import closing
from pathlib import Path
import time
from uuid import UUID
from fixtures.neon_fixtures import NeonEnvBuilder, assert_timeline_local, wait_until, wait_for_last_record_lsn, wait_for_upload
from fixtures.neon_fixtures import NeonEnvBuilder, RemoteStorageKind, assert_timeline_local, available_remote_storages, wait_until, wait_for_last_record_lsn, wait_for_upload
from fixtures.log_helper import log
from fixtures.utils import lsn_from_hex, query_scalar
import pytest
@@ -29,18 +28,19 @@ import pytest
# * queries the specific data, ensuring that it matches the one stored before
#
# The tests are done for all types of remote storage pageserver supports.
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, storage_type: str):
@pytest.mark.parametrize('remote_storatge_kind', available_remote_storages())
def test_remote_storage_backup_and_restore(
neon_env_builder: NeonEnvBuilder,
remote_storatge_kind: RemoteStorageKind,
):
# Use this test to check more realistic SK ids: some etcd key parsing bugs were related,
# and this test needs SK to write data to pageserver, so it will be visible
neon_env_builder.safekeepers_id_start = 12
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storatge_kind,
test_name='test_remote_storage_backup_and_restore',
)
data_id = 1
data_secret = 'very secret secret'

View File

@@ -1,10 +1,19 @@
from threading import Thread
from uuid import uuid4
import uuid
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserverApiException
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException
def do_gc_target(env: NeonEnv, tenant_id: uuid.UUID, timeline_id: uuid.UUID):
"""Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211"""
try:
env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {timeline_id.hex} 0')
except Exception as e:
log.error("do_gc failed: %s", e)
def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
@@ -36,8 +45,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {uuid4().hex} 0')
# try to concurrently run gc and detach
gc_thread = Thread(
target=lambda: env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {timeline_id.hex} 0'), )
gc_thread = Thread(target=lambda: do_gc_target(env, tenant_id, timeline_id))
gc_thread.start()
last_error = None

View File

@@ -13,7 +13,7 @@ from uuid import UUID
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres, wait_for_last_record_lsn, wait_for_upload
from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres, RemoteStorageKind, available_remote_storages, wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import lsn_from_hex
@@ -38,7 +38,7 @@ async def tenant_workload(env: NeonEnv, pg: Postgres):
async def all_tenants_workload(env: NeonEnv, tenants_pgs):
workers = []
for tenant, pg in tenants_pgs:
for _, pg in tenants_pgs:
worker = tenant_workload(env, pg)
workers.append(asyncio.create_task(worker))
@@ -46,23 +46,18 @@ async def all_tenants_workload(env: NeonEnv, tenants_pgs):
await asyncio.gather(*workers)
@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3'])
def test_tenants_many(neon_env_builder: NeonEnvBuilder, storage_type: str):
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.enable_local_fs_remote_storage()
@pytest.mark.parametrize('remote_storatge_kind', available_remote_storages())
def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storatge_kind: RemoteStorageKind):
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storatge_kind,
test_name='test_tenants_many',
)
env = neon_env_builder.init_start()
tenants_pgs: List[Tuple[UUID, Postgres]] = []
for i in range(1, 5):
for _ in range(1, 5):
# Use a tiny checkpoint distance, to create a lot of layers quickly
tenant, _ = env.neon_cli.create_tenant(
conf={

View File

@@ -12,9 +12,8 @@ import uuid
from contextlib import closing
from dataclasses import dataclass, field
from multiprocessing import Process, Value
from pathlib import Path
from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageKind, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, available_remote_storages, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload
from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex, query_scalar
from fixtures.log_helper import log
from typing import List, Optional, Any
@@ -377,15 +376,15 @@ def wait_wal_trim(tenant_id, timeline_id, sk, target_size):
time.sleep(0.5)
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
@pytest.mark.parametrize('remote_storatge_kind', available_remote_storages())
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storatge_kind: RemoteStorageKind):
neon_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_safekeepers_wal_backup')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storatge_kind,
test_name='test_safekeepers_wal_backup',
)
neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
env = neon_env_builder.init_start()
@@ -425,15 +424,15 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str):
wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000')
@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs'])
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str):
@pytest.mark.parametrize('remote_storatge_kind', available_remote_storages())
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storatge_kind: RemoteStorageKind):
neon_env_builder.num_safekeepers = 3
if storage_type == 'local_fs':
neon_env_builder.enable_local_fs_remote_storage()
elif storage_type == 'mock_s3':
neon_env_builder.enable_s3_mock_remote_storage('test_s3_wal_replay')
else:
raise RuntimeError(f'Unknown storage type: {storage_type}')
neon_env_builder.enable_remote_storage(
remote_storage_kind=remote_storatge_kind,
test_name='test_s3_wal_replay',
)
neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER
env = neon_env_builder.init_start()

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from dataclasses import field
from contextlib import contextmanager
from enum import Flag, auto
import enum
import textwrap
from cached_property import cached_property
import abc
@@ -221,7 +222,7 @@ def can_bind(host: str, port: int) -> bool:
# moment. If that changes, we should use start using SO_REUSEADDR here
# too, to allow reusing ports more quickly.
# See https://github.com/neondatabase/neon/issues/801
#sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((host, port))
@@ -230,6 +231,8 @@ def can_bind(host: str, port: int) -> bool:
except socket.error:
log.info(f"Port {port} is in use, skipping")
return False
finally:
sock.close()
class PortDistributor:
@@ -262,6 +265,11 @@ def default_broker(request: Any, port_distributor: PortDistributor):
broker.stop()
@pytest.fixture(scope='session')
def run_id():
yield uuid.uuid4()
@pytest.fixture(scope='session')
def mock_s3_server(port_distributor: PortDistributor):
mock_s3_server = MockS3Server(port_distributor.get_port())
@@ -438,26 +446,46 @@ class MockS3Server:
def secret_key(self) -> str:
return 'test'
def access_env_vars(self) -> Dict[Any, Any]:
return {
'AWS_ACCESS_KEY_ID': self.access_key(),
'AWS_SECRET_ACCESS_KEY': self.secret_key(),
}
def kill(self):
self.subprocess.kill()
@enum.unique
class RemoteStorageKind(enum.Enum):
LOCAL_FS = "local_fs"
MOCK_S3 = "mock_s3"
REAL_S3 = "real_s3"
def available_remote_storages() -> List[RemoteStorageKind]:
remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None:
remote_storages.append(RemoteStorageKind.REAL_S3)
log.info("Enabling real s3 storage for tests")
else:
log.info("Using mock implementations to test remote storage")
return remote_storages
@dataclass
class LocalFsStorage:
local_path: Path
root: Path
@dataclass
class S3Storage:
bucket_name: str
bucket_region: str
endpoint: Optional[str]
access_key: str
secret_key: str
endpoint: Optional[str] = None
prefix_in_bucket: Optional[str] = None
def access_env_vars(self) -> Dict[str, str]:
return {
'AWS_ACCESS_KEY_ID': self.access_key,
'AWS_SECRET_ACCESS_KEY': self.secret_key,
}
RemoteStorage = Union[LocalFsStorage, S3Storage]
@@ -466,16 +494,20 @@ RemoteStorage = Union[LocalFsStorage, S3Storage]
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage):
if isinstance(remote_storage, LocalFsStorage):
res = f"local_path='{remote_storage.local_path}'"
remote_storage_config = f"local_path='{remote_storage.root}'"
elif isinstance(remote_storage, S3Storage):
res = f"bucket_name='{remote_storage.bucket_name}', bucket_region='{remote_storage.bucket_region}'"
remote_storage_config = f"bucket_name='{remote_storage.bucket_name}',\
bucket_region='{remote_storage.bucket_region}'"
if remote_storage.prefix_in_bucket is not None:
remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'"
if remote_storage.endpoint is not None:
res += f", endpoint='{remote_storage.endpoint}'"
else:
raise Exception(f'Unknown storage configuration {remote_storage}')
remote_storage_config += f",endpoint='{remote_storage.endpoint}'"
else:
raise Exception("invalid remote storage type")
return f"{{{res}}}"
return f"{{{remote_storage_config}}}"
class RemoteStorageUsers(Flag):
@@ -493,28 +525,31 @@ class NeonEnvBuilder:
cleaned up after the test has finished.
"""
def __init__(
self,
repo_dir: Path,
port_distributor: PortDistributor,
broker: Etcd,
mock_s3_server: MockS3Server,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster
safekeepers_enable_fsync: bool = False,
auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name=DEFAULT_BRANCH_NAME):
self,
repo_dir: Path,
port_distributor: PortDistributor,
broker: Etcd,
run_id: uuid.UUID,
mock_s3_server: MockS3Server,
remote_storage: Optional[RemoteStorage] = None,
remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 1,
# Use non-standard SK ids to check for various parsing bugs
safekeepers_id_start: int = 0,
# fsync is disabled by default to make the tests go faster
safekeepers_enable_fsync: bool = False,
auth_enabled: bool = False,
rust_log_override: Optional[str] = None,
default_branch_name=DEFAULT_BRANCH_NAME,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.remote_storage = remote_storage
self.remote_storage_users = remote_storage_users
self.broker = broker
self.run_id = run_id
self.mock_s3_server = mock_s3_server
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
@@ -523,6 +558,8 @@ class NeonEnvBuilder:
self.auth_enabled = auth_enabled
self.default_branch_name = default_branch_name
self.env: Optional[NeonEnv] = None
self.remote_storage_prefix: Optional[str] = None
self.keep_remote_storage_contents: bool = True
def init(self) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -538,41 +575,143 @@ class NeonEnvBuilder:
self.start()
return env
"""
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
"""
def enable_remote_storage(
self,
remote_storage_kind: RemoteStorageKind,
test_name: str,
force_enable: bool = True,
):
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
self.enable_local_fs_remote_storage(force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.MOCK_S3:
self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable)
elif remote_storage_kind == RemoteStorageKind.REAL_S3:
self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable)
else:
raise RuntimeError(f'Unknown storage type: {remote_storage_kind}')
def enable_local_fs_remote_storage(self, force_enable=True):
"""
Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
self.remote_storage = LocalFsStorage(Path(self.repo_dir / 'local_fs_remote_storage'))
"""
Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
Starts up the mock server, if that does not run yet.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
"""
def enable_s3_mock_remote_storage(self, bucket_name: str, force_enable=True):
def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable=True):
"""
Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already.
Starts up the mock server, if that does not run yet.
Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`.
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
mock_endpoint = self.mock_s3_server.endpoint()
mock_region = self.mock_s3_server.region()
boto3.client(
self.remote_storage_client = boto3.client(
's3',
endpoint_url=mock_endpoint,
region_name=mock_region,
aws_access_key_id=self.mock_s3_server.access_key(),
aws_secret_access_key=self.mock_s3_server.secret_key(),
).create_bucket(Bucket=bucket_name)
)
self.remote_storage_client.create_bucket(Bucket=bucket_name)
self.remote_storage = S3Storage(
bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region,
access_key=self.mock_s3_server.access_key(),
secret_key=self.mock_s3_server.secret_key(),
)
def enable_real_s3_remote_storage(self, test_name: str, force_enable=True):
"""
Sets up configuration to use real s3 endpoint without mock server
"""
assert force_enable or self.remote_storage is None, "remote storage is enabled already"
access_key = os.getenv("AWS_ACCESS_KEY_ID")
assert access_key, "no aws access key provided"
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
assert secret_key, "no aws access key provided"
# session token is needed for local runs with sso auth
session_token = os.getenv("AWS_SESSION_TOKEN")
bucket_name = os.getenv("REMOTE_STORAGE_S3_BUCKET")
assert bucket_name, "no remote storage bucket name provided"
region = os.getenv("REMOTE_STORAGE_S3_REGION")
assert region, "no remote storage region provided"
# do not leave data in real s3
self.keep_remote_storage_contents = False
# construct a prefix inside bucket for the particular test case and test run
self.remote_storage_prefix = f'{self.run_id}/{test_name}'
self.remote_storage_client = boto3.client(
's3',
region_name=region,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
aws_session_token=session_token,
)
self.remote_storage = S3Storage(bucket_name=bucket_name,
endpoint=mock_endpoint,
bucket_region=mock_region)
bucket_region=region,
access_key=access_key,
secret_key=secret_key,
prefix_in_bucket=self.remote_storage_prefix)
def cleanup_remote_storage(self):
# here wee check for true remote storage, no the local one
# local cleanup is not needed after test because in ci all env will be destroyed anyway
if self.remote_storage_prefix is None:
log.info("no remote storage was set up, skipping cleanup")
return
if self.keep_remote_storage_contents:
log.info("keep_remote_storage_contents skipping remote storage cleanup")
return
log.info("removing data from test s3 bucket %s by prefix %s",
self.remote_storage.bucket_name,
self.remote_storage_prefix)
paginator = self.remote_storage_client.get_paginator('list_objects_v2')
pages = paginator.paginate(
Bucket=self.remote_storage.bucket_name,
Prefix=self.remote_storage_prefix,
)
objects_to_delete = {'Objects': []}
cnt = 0
for item in pages.search('Contents'):
# weirdly when nothing is found it returns [None]
if item is None:
break
objects_to_delete['Objects'].append({'Key': item['Key']})
# flush once aws limit reached
if len(objects_to_delete['Objects']) >= 1000:
self.remote_storage_client.delete_objects(
Bucket=self.remote_storage.bucket_name,
Delete=objects_to_delete,
)
objects_to_delete = dict(Objects=[])
cnt += 1
# flush rest
if len(objects_to_delete['Objects']):
self.remote_storage_client.delete_objects(Bucket=self.remote_storage.bucket_name,
Delete=objects_to_delete)
log.info("deleted %s objects from remote storage", cnt)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
# Stop all the nodes.
if self.env:
log.info('Cleaning up all storage and compute nodes')
@@ -581,6 +720,8 @@ class NeonEnvBuilder:
sk.stop(immediate=True)
self.env.pageserver.stop(immediate=True)
self.cleanup_remote_storage()
class NeonEnv:
"""
@@ -713,10 +854,13 @@ class NeonEnv:
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(request: Any,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd) -> Iterator[NeonEnv]:
def _shared_simple_env(
request: Any,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd,
run_id: uuid.UUID,
) -> Iterator[NeonEnv]:
"""
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `neon_simple_env`.
@@ -730,8 +874,13 @@ def _shared_simple_env(request: Any,
repo_dir = os.path.join(str(top_output_dir), "shared_repo")
shutil.rmtree(repo_dir, ignore_errors=True)
with NeonEnvBuilder(Path(repo_dir), port_distributor, default_broker,
mock_s3_server) as builder:
with NeonEnvBuilder(
repo_dir=Path(repo_dir),
port_distributor=port_distributor,
broker=default_broker,
mock_s3_server=mock_s3_server,
run_id=run_id,
) as builder:
env = builder.init_start()
# For convenience in tests, create a branch from the freshly-initialized cluster.
@@ -756,10 +905,13 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
@pytest.fixture(scope='function')
def neon_env_builder(test_output_dir,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd) -> Iterator[NeonEnvBuilder]:
def neon_env_builder(
test_output_dir,
port_distributor: PortDistributor,
mock_s3_server: MockS3Server,
default_broker: Etcd,
run_id: uuid.UUID,
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -777,8 +929,13 @@ def neon_env_builder(test_output_dir,
repo_dir = os.path.join(test_output_dir, "repo")
# Return the builder to the caller
with NeonEnvBuilder(Path(repo_dir), port_distributor, default_broker,
mock_s3_server) as builder:
with NeonEnvBuilder(
repo_dir=Path(repo_dir),
port_distributor=port_distributor,
mock_s3_server=mock_s3_server,
broker=default_broker,
run_id=run_id,
) as builder:
yield builder
@@ -1183,7 +1340,10 @@ class NeonCli(AbstractNeonCli):
remote_storage_users=self.env.remote_storage_users,
pageserver_config_override=self.env.pageserver.config_override)
s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
s3_env_vars = None
if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage):
s3_env_vars = self.env.remote_storage.access_env_vars()
return self.raw_cli(start_args, extra_env_vars=s3_env_vars)
def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
@@ -1195,7 +1355,10 @@ class NeonCli(AbstractNeonCli):
return self.raw_cli(cmd)
def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]':
s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None
s3_env_vars = None
if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage):
s3_env_vars = self.env.remote_storage.access_env_vars()
return self.raw_cli(['safekeeper', 'start', str(id)], extra_env_vars=s3_env_vars)
def safekeeper_stop(self,
@@ -1337,7 +1500,7 @@ class NeonPageserver(PgProtocol):
return self
def __exit__(self, exc_type, exc, tb):
self.stop(True)
self.stop(immediate=True)
def http_client(self, auth_token: Optional[str] = None) -> NeonPageserverHttpClient:
return NeonPageserverHttpClient(
@@ -1354,6 +1517,7 @@ def append_pageserver_param_overrides(
):
if bool(remote_storage_users & RemoteStorageUsers.PAGESERVER) and remote_storage is not None:
remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage)
params_to_update.append(
f'--pageserver-config-override=remote_storage={remote_storage_toml_table}')
@@ -1860,8 +2024,8 @@ class Safekeeper:
started_at = time.time()
while True:
try:
http_cli = self.http_client()
http_cli.check_status()
with self.http_client() as http_cli:
http_cli.check_status()
except Exception as e:
elapsed = time.time() - started_at
if elapsed > 3:
@@ -2012,9 +2176,9 @@ class Etcd:
return f'http://127.0.0.1:{self.port}'
def check_status(self):
s = requests.Session()
s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry
s.get(f"{self.client_url()}/health").raise_for_status()
with requests.Session() as s:
s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry
s.get(f"{self.client_url()}/health").raise_for_status()
def try_start(self):
if self.handle is not None: