From 71f39bac3d489abcc4f07a06e4fa89c8af546cbb Mon Sep 17 00:00:00 2001 From: Alexander Bayandin Date: Tue, 2 Aug 2022 13:57:26 +0100 Subject: [PATCH 1/9] github/workflows: upload artifacts to S3 (#2071) --- .github/actions/download/action.yml | 56 +++++++++ .../actions/run-python-test-set/action.yml | 17 +-- .github/actions/save-coverage-data/action.yml | 15 ++- .github/actions/upload/action.yml | 51 ++++++++ .github/workflows/build_and_test.yml | 113 +++++++++--------- 5 files changed, 175 insertions(+), 77 deletions(-) create mode 100644 .github/actions/download/action.yml create mode 100644 .github/actions/upload/action.yml diff --git a/.github/actions/download/action.yml b/.github/actions/download/action.yml new file mode 100644 index 0000000000..5aa45164e7 --- /dev/null +++ b/.github/actions/download/action.yml @@ -0,0 +1,56 @@ +name: "Download an artifact" +description: "Custom download action" +inputs: + name: + description: "Artifact name" + required: true + path: + description: "A directory to put artifact into" + default: "." + required: false + skip-if-does-not-exist: + description: "Allow to skip if file doesn't exist, fail otherwise" + default: false + required: false + +runs: + using: "composite" + steps: + - name: Download artifact + id: download-artifact + shell: bash -euxo pipefail {0} + env: + TARGET: ${{ inputs.path }} + ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst + SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }} + run: | + BUCKET=neon-github-public-dev + PREFIX=artifacts/${GITHUB_RUN_ID} + FILENAME=$(basename $ARCHIVE) + + S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true) + if [ -z "${S3_KEY}" ]; then + if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then + echo '::set-output name=SKIPPED::true' + exit 0 + else + echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME} nor its version from previous attempts exist" + exit 1 + fi + fi + + echo '::set-output name=SKIPPED::false' + + mkdir -p $(dirname $ARCHIVE) + time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} ${ARCHIVE} + + - name: Extract artifact + if: ${{ steps.download-artifact.outputs.SKIPPED == 'false' }} + shell: bash -euxo pipefail {0} + env: + TARGET: ${{ inputs.path }} + ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst + run: | + mkdir -p ${TARGET} + time tar -xf ${ARCHIVE} -C ${TARGET} + rm -f ${ARCHIVE} diff --git a/.github/actions/run-python-test-set/action.yml b/.github/actions/run-python-test-set/action.yml index 6dc377a809..c9987053ce 100644 --- a/.github/actions/run-python-test-set/action.yml +++ b/.github/actions/run-python-test-set/action.yml @@ -31,18 +31,11 @@ inputs: runs: using: "composite" steps: - - name: Get Neon artifact for restoration - uses: actions/download-artifact@v3 + - name: Get Neon artifact + uses: ./.github/actions/download with: name: neon-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-artifact - path: ./neon-artifact/ - - - name: Extract Neon artifact - shell: bash -euxo pipefail {0} - run: | - mkdir -p /tmp/neon/ - tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/ - rm -rf ./neon-artifact/ + path: /tmp/neon - name: Checkout if: inputs.needs_postgres_source == 'true' @@ -132,9 +125,7 @@ runs: - name: Upload python test logs if: always() - uses: actions/upload-artifact@v3 + uses: ./.github/actions/upload with: - retention-days: 7 - if-no-files-found: error name: python-test-${{ inputs.test_selection }}-${{ runner.os }}-${{ inputs.build_type }}-${{ inputs.rust_toolchain }}-logs path: /tmp/test_output/ diff --git a/.github/actions/save-coverage-data/action.yml b/.github/actions/save-coverage-data/action.yml index bcfd7cb47e..6fbe19a96e 100644 --- a/.github/actions/save-coverage-data/action.yml +++ b/.github/actions/save-coverage-data/action.yml @@ -8,10 +8,15 @@ runs: shell: bash -euxo pipefail {0} run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge - - name: Upload coverage data - uses: actions/upload-artifact@v3 + - name: Download previous coverage data into the same directory + uses: ./.github/actions/download with: - retention-days: 7 - if-no-files-found: error name: coverage-data-artifact - path: /tmp/coverage/ + path: /tmp/coverage + skip-if-does-not-exist: true # skip if there's no previous coverage to download + + - name: Upload coverage data + uses: ./.github/actions/upload + with: + name: coverage-data-artifact + path: /tmp/coverage diff --git a/.github/actions/upload/action.yml b/.github/actions/upload/action.yml new file mode 100644 index 0000000000..28e7d1fb1a --- /dev/null +++ b/.github/actions/upload/action.yml @@ -0,0 +1,51 @@ +name: "Upload an artifact" +description: "Custom upload action" +inputs: + name: + description: "Artifact name" + required: true + path: + description: "A directory or file to upload" + required: true + +runs: + using: "composite" + steps: + - name: Prepare artifact + shell: bash -euxo pipefail {0} + env: + SOURCE: ${{ inputs.path }} + ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst + run: | + mkdir -p $(dirname $ARCHIVE) + + if [ -f ${ARCHIVE} ]; then + echo 2>&1 "File ${ARCHIVE} already exist. Something went wrong before" + exit 1 + fi + + ZSTD_NBTHREADS=0 + if [ -d ${SOURCE} ]; then + time tar -C ${SOURCE} -cf ${ARCHIVE} --zstd . + elif [ -f ${SOURCE} ]; then + time tar -cf ${ARCHIVE} --zstd ${SOURCE} + else + echo 2>&1 "${SOURCE} neither directory nor file, don't know how to handle it" + fi + + - name: Upload artifact + shell: bash -euxo pipefail {0} + env: + SOURCE: ${{ inputs.path }} + ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst + run: | + BUCKET=neon-github-public-dev + PREFIX=artifacts/${GITHUB_RUN_ID} + FILENAME=$(basename $ARCHIVE) + + FILESIZE=$(du -sh ${ARCHIVE} | cut -f1) + + time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME} + + # Ref https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-job-summary + echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY} diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 312b4d1f46..0be108400c 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -3,8 +3,8 @@ name: Test and Deploy on: push: branches: - - main - - release + - main + - release pull_request: defaults: @@ -22,7 +22,8 @@ env: jobs: build-neon: - runs-on: [ self-hosted, Linux, k8s-runner ] + runs-on: dev + container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948 strategy: fail-fast: false matrix: @@ -31,6 +32,7 @@ jobs: env: BUILD_TYPE: ${{ matrix.build_type }} + GIT_VERSION: ${{ github.sha }} steps: - name: Checkout @@ -123,6 +125,7 @@ jobs: mkdir -p /tmp/coverage/ mkdir -p /tmp/neon/test_bin/ + test_exe_paths=$( ${cov_prefix} cargo test $CARGO_FLAGS --message-format=json --no-run | jq -r '.executable | select(. != null)' @@ -145,25 +148,20 @@ jobs: - name: Install postgres binaries run: cp -a tmp_install /tmp/neon/pg_install - - name: Prepare neon artifact - run: ZSTD_NBTHREADS=0 tar -C /tmp/neon/ -cf ./neon.tar.zst --zstd . - - - name: Upload neon binaries - uses: actions/upload-artifact@v3 + - name: Upload Neon artifact + uses: ./.github/actions/upload with: - retention-days: 7 - if-no-files-found: error name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact - path: ./neon.tar.zst + path: /tmp/neon # XXX: keep this after the binaries.list is formed, so the coverage can properly work later - name: Merge and upload coverage data if: matrix.build_type == 'debug' uses: ./.github/actions/save-coverage-data - pg_regress-tests: - runs-on: [ self-hosted, Linux, k8s-runner ] + runs-on: dev + container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948 needs: [ build-neon ] strategy: fail-fast: false @@ -190,7 +188,8 @@ jobs: uses: ./.github/actions/save-coverage-data other-tests: - runs-on: [ self-hosted, Linux, k8s-runner ] + runs-on: dev + container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948 needs: [ build-neon ] strategy: fail-fast: false @@ -216,7 +215,8 @@ jobs: uses: ./.github/actions/save-coverage-data benchmarks: - runs-on: [ self-hosted, Linux, k8s-runner ] + runs-on: dev + container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948 needs: [ build-neon ] strategy: fail-fast: false @@ -245,7 +245,8 @@ jobs: # while coverage is currently collected for the debug ones coverage-report: - runs-on: [ self-hosted, Linux, k8s-runner ] + runs-on: dev + container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948 needs: [ other-tests, pg_regress-tests ] strategy: fail-fast: false @@ -270,23 +271,17 @@ jobs: target/ key: v3-${{ runner.os }}-${{ matrix.build_type }}-cargo-${{ matrix.rust_toolchain }}-${{ hashFiles('Cargo.lock') }} - - name: Get Neon artifact for restoration - uses: actions/download-artifact@v3 + - name: Get Neon artifact + uses: ./.github/actions/download with: name: neon-${{ runner.os }}-${{ matrix.build_type }}-${{ matrix.rust_toolchain }}-artifact - path: ./neon-artifact/ + path: /tmp/neon - - name: Extract Neon artifact - run: | - mkdir -p /tmp/neon/ - tar -xf ./neon-artifact/neon.tar.zst -C /tmp/neon/ - rm -rf ./neon-artifact/ - - - name: Restore coverage data - uses: actions/download-artifact@v3 + - name: Get coverage artifact + uses: ./.github/actions/download with: name: coverage-data-artifact - path: /tmp/coverage/ + path: /tmp/coverage - name: Merge coverage data run: scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage merge @@ -324,40 +319,40 @@ jobs: }" trigger-e2e-tests: - runs-on: [ self-hosted, Linux, k8s-runner ] - needs: [ build-neon ] - steps: - - name: Set PR's status to pending and request a remote CI test - run: | - COMMIT_SHA=${{ github.event.pull_request.head.sha }} - COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}} + runs-on: [ self-hosted, Linux, k8s-runner ] + needs: [ build-neon ] + steps: + - name: Set PR's status to pending and request a remote CI test + run: | + COMMIT_SHA=${{ github.event.pull_request.head.sha }} + COMMIT_SHA=${COMMIT_SHA:-${{ github.sha }}} - REMOTE_REPO="${{ github.repository_owner }}/cloud" + REMOTE_REPO="${{ github.repository_owner }}/cloud" - curl -f -X POST \ - https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \ - -H "Accept: application/vnd.github.v3+json" \ - --user "${{ secrets.CI_ACCESS_TOKEN }}" \ - --data \ - "{ - \"state\": \"pending\", - \"context\": \"neon-cloud-e2e\", - \"description\": \"[$REMOTE_REPO] Remote CI job is about to start\" - }" + curl -f -X POST \ + https://api.github.com/repos/${{ github.repository }}/statuses/$COMMIT_SHA \ + -H "Accept: application/vnd.github.v3+json" \ + --user "${{ secrets.CI_ACCESS_TOKEN }}" \ + --data \ + "{ + \"state\": \"pending\", + \"context\": \"neon-cloud-e2e\", + \"description\": \"[$REMOTE_REPO] Remote CI job is about to start\" + }" - curl -f -X POST \ - https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \ - -H "Accept: application/vnd.github.v3+json" \ - --user "${{ secrets.CI_ACCESS_TOKEN }}" \ - --data \ - "{ - \"ref\": \"main\", - \"inputs\": { - \"ci_job_name\": \"neon-cloud-e2e\", - \"commit_hash\": \"$COMMIT_SHA\", - \"remote_repo\": \"${{ github.repository }}\" - } - }" + curl -f -X POST \ + https://api.github.com/repos/$REMOTE_REPO/actions/workflows/testing.yml/dispatches \ + -H "Accept: application/vnd.github.v3+json" \ + --user "${{ secrets.CI_ACCESS_TOKEN }}" \ + --data \ + "{ + \"ref\": \"main\", + \"inputs\": { + \"ci_job_name\": \"neon-cloud-e2e\", + \"commit_hash\": \"$COMMIT_SHA\", + \"remote_repo\": \"${{ github.repository }}\" + } + }" docker-image: runs-on: [ self-hosted, Linux, k8s-runner ] From b4f2c5b51448c5cf7ba1930edb4c3b3de06483c2 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Mon, 1 Aug 2022 17:57:46 +0300 Subject: [PATCH 2/9] run benchmarks conditionally, on main or if run_benchmarks label is set --- .github/workflows/build_and_test.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 0be108400c..ec7579a0d1 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -218,6 +218,7 @@ jobs: runs-on: dev container: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rustlegacy:2746987948 needs: [ build-neon ] + if: github.ref_name == 'main' || contains(github.event.pull_request.labels.*.name, 'run-benchmarks') strategy: fail-fast: false matrix: From 5f71aa09d313f829875012752f65ae99e67fc3fe Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Thu, 21 Jul 2022 20:59:07 +0300 Subject: [PATCH 3/9] support running tests against real s3 implementation without mocking --- Cargo.lock | 47 ++- control_plane/src/lib.rs | 6 +- libs/remote_storage/src/lib.rs | 3 + libs/remote_storage/src/local_fs.rs | 2 +- libs/remote_storage/src/s3_bucket.rs | 40 +-- neon_local/src/main.rs | 4 +- pageserver/src/storage_sync/download.rs | 10 + .../batch_others/test_ancestor_branch.py | 3 +- .../batch_others/test_remote_storage.py | 20 +- .../test_tenants_with_remote_storage.py | 23 +- test_runner/batch_others/test_wal_acceptor.py | 35 ++- test_runner/fixtures/neon_fixtures.py | 278 ++++++++++++++---- 12 files changed, 315 insertions(+), 156 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5031ae02e3..4a78b2e504 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -154,9 +154,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.5.12" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16705af05732b7d3258ec0f7b73c03a658a28925e050d8852d5b568ee8bcf4e" +checksum = "6b9496f0c1d1afb7a2af4338bbe1d969cddfead41d87a9fb3aaa6d0bbc7af648" dependencies = [ "async-trait", "axum-core", @@ -317,15 +317,6 @@ dependencies = [ "serde", ] -[[package]] -name = "cast" -version = "0.2.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c24dab4283a142afa2fdca129b80ad2c6284e073930f964c3a1293c225ee39a" -dependencies = [ - "rustc_version", -] - [[package]] name = "cast" version = "0.3.0" @@ -579,7 +570,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f" dependencies = [ "atty", - "cast 0.3.0", + "cast", "clap 2.34.0", "criterion-plot", "csv", @@ -600,11 +591,11 @@ dependencies = [ [[package]] name = "criterion-plot" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00996de9f2f7559f7f4dc286073197f83e92256a59ed395f9aac01fe717da57" +checksum = "2673cc8207403546f45f5fd319a974b1e6983ad1a3ee7e6041650013be041876" dependencies = [ - "cast 0.2.7", + "cast", "itertools", ] @@ -680,9 +671,9 @@ dependencies = [ [[package]] name = "crypto-common" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ccfd8c0ee4cce11e45b3fd6f9d5e69e0cc62912aa6a0cb1bf4617b0eba5a12f" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", "typenum", @@ -1116,9 +1107,9 @@ dependencies = [ [[package]] name = "gimli" -version = "0.26.1" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" +checksum = "22030e2c5a68ec659fde1e949a745124b48e6fa8b045b7ed5bd1fe4ccc5c4e5d" [[package]] name = "git-version" @@ -1184,9 +1175,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.12.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "607c8a29735385251a339424dd462993c0fed8fa09d378f259377df08c126022" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "heck" @@ -1388,7 +1379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" dependencies = [ "autocfg", - "hashbrown 0.12.2", + "hashbrown 0.12.3", ] [[package]] @@ -1851,9 +1842,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.1.0" +version = "6.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" +checksum = "648001efe5d5c0102d8cea768e348da85d90af8ba91f0bea908f157951493cd4" [[package]] name = "pageserver" @@ -2735,9 +2726,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0a5f7c728f5d284929a1cccb5bc19884422bfe6ef4d6c409da2c41838983fcf" +checksum = "24c8ad4f0c00e1eb5bc7614d236a7f1300e3dbd76b68cac8e06fb00b015ad8d8" [[package]] name = "ryu" @@ -3617,9 +3608,9 @@ checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" [[package]] name = "unicode-ident" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" +checksum = "15c61ba63f9235225a22310255a29b806b907c9b8c964bcbd0a2c70f3f2deea7" [[package]] name = "unicode-normalization" diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 4dfca588ad..17232ccf45 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -51,7 +51,11 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command { } fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command { - for env_key in ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] { + for env_key in [ + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "AWS_SESSION_TOKEN", + ] { if let Ok(value) = std::env::var(env_key) { cmd = cmd.env(env_key, value); } diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index dec79e4580..07f8cb08aa 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -66,6 +66,9 @@ pub trait RemoteStorage: Send + Sync { async fn list(&self) -> anyhow::Result>; /// Lists all top level subdirectories for a given prefix + /// Note: here we assume that if the prefix is passed it was obtained via remote_object_id + /// which already takes into account any kind of global prefix (prefix_in_bucket for S3 or storage_root for LocalFS) + /// so this method doesnt need to. async fn list_prefixes( &self, prefix: Option, diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index df1581fb51..07b04084b9 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -116,7 +116,7 @@ impl RemoteStorage for LocalFs { prefix: Option, ) -> anyhow::Result> { let path = match prefix { - Some(prefix) => Cow::Owned(self.storage_root.join(prefix)), + Some(prefix) => Cow::Owned(prefix), None => Cow::Borrowed(&self.storage_root), }; get_all_files(path.as_ref(), false).await diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index ff52f033d1..1b241fe4ed 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -171,17 +171,25 @@ impl S3Bucket { let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").ok(); let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").ok(); + // session token is used when authorizing through sso + // which is typically the case when testing locally on developer machine + let session_token = std::env::var("AWS_SESSION_TOKEN").ok(); let client = if access_key_id.is_none() && secret_access_key.is_none() { debug!("Using IAM-based AWS access"); S3Client::new_with(request_dispatcher, InstanceMetadataProvider::new(), region) } else { - debug!("Using credentials-based AWS access"); + debug!( + "Using credentials-based AWS access. Session token is set: {}", + session_token.is_some() + ); S3Client::new_with( request_dispatcher, - StaticProvider::new_minimal( + StaticProvider::new( access_key_id.unwrap_or_default(), secret_access_key.unwrap_or_default(), + session_token, + None, ), region, ) @@ -304,32 +312,24 @@ impl RemoteStorage for S3Bucket { Ok(document_keys) } + /// See the doc for `RemoteStorage::list_prefixes` /// Note: it wont include empty "directories" async fn list_prefixes( &self, prefix: Option, ) -> anyhow::Result> { - let list_prefix = match prefix { - Some(prefix) => { - let mut prefix_in_bucket = self.prefix_in_bucket.clone().unwrap_or_default(); - // if there is no trailing / in default prefix and - // supplied prefix does not start with "/" insert it - if !(prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) - || prefix.0.starts_with(S3_PREFIX_SEPARATOR)) - { - prefix_in_bucket.push(S3_PREFIX_SEPARATOR); - } - - prefix_in_bucket.push_str(&prefix.0); + // get the passed prefix or if it is not set use prefix_in_bucket value + let list_prefix = prefix + .map(|p| p.0) + .or_else(|| self.prefix_in_bucket.clone()) + .map(|mut p| { // required to end with a separator // otherwise request will return only the entry of a prefix - if !prefix_in_bucket.ends_with(S3_PREFIX_SEPARATOR) { - prefix_in_bucket.push(S3_PREFIX_SEPARATOR); + if !p.ends_with(S3_PREFIX_SEPARATOR) { + p.push(S3_PREFIX_SEPARATOR); } - Some(prefix_in_bucket) - } - None => self.prefix_in_bucket.clone(), - }; + p + }); let mut document_keys = Vec::new(); diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index e6f5c6125d..24b40b72d6 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -884,7 +884,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul match sub_match.subcommand() { Some(("start", start_match)) => { if let Err(e) = pageserver.start(&pageserver_config_overrides(start_match)) { - eprintln!("pageserver start failed: {}", e); + eprintln!("pageserver start failed: {e}"); exit(1); } } @@ -906,7 +906,7 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul } if let Err(e) = pageserver.start(&pageserver_config_overrides(restart_match)) { - eprintln!("pageserver start failed: {}", e); + eprintln!("pageserver start failed: {e}"); exit(1); } } diff --git a/pageserver/src/storage_sync/download.rs b/pageserver/src/storage_sync/download.rs index a91eaaa7ca..441d5e563e 100644 --- a/pageserver/src/storage_sync/download.rs +++ b/pageserver/src/storage_sync/download.rs @@ -130,6 +130,7 @@ where tenant_path.display() ) })?; + let timelines = storage .list_prefixes(Some(tenant_storage_path)) .await @@ -140,6 +141,13 @@ where ) })?; + if timelines.is_empty() { + anyhow::bail!( + "no timelines found on the remote storage for tenant {}", + tenant_id + ) + } + let mut sync_ids = HashSet::new(); for timeline_remote_storage_key in timelines { @@ -194,6 +202,8 @@ where }) .map_err(DownloadError::BadInput)?; + warn!("part_storage_path {:?}", part_storage_path); + let mut index_part_download = storage.download(&part_storage_path).await?; let mut index_part_bytes = Vec::new(); diff --git a/test_runner/batch_others/test_ancestor_branch.py b/test_runner/batch_others/test_ancestor_branch.py index d8ba0a1b06..c4d36da043 100644 --- a/test_runner/batch_others/test_ancestor_branch.py +++ b/test_runner/batch_others/test_ancestor_branch.py @@ -1,6 +1,5 @@ -import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException +from fixtures.neon_fixtures import NeonEnvBuilder from fixtures.utils import query_scalar diff --git a/test_runner/batch_others/test_remote_storage.py b/test_runner/batch_others/test_remote_storage.py index 6a8497a559..72963ffe21 100644 --- a/test_runner/batch_others/test_remote_storage.py +++ b/test_runner/batch_others/test_remote_storage.py @@ -2,11 +2,10 @@ # env ZENITH_PAGESERVER_OVERRIDES="remote_storage={local_path='/tmp/neon_zzz/'}" poetry ...... import shutil, os -from contextlib import closing from pathlib import Path import time from uuid import UUID -from fixtures.neon_fixtures import NeonEnvBuilder, assert_timeline_local, wait_until, wait_for_last_record_lsn, wait_for_upload +from fixtures.neon_fixtures import NeonEnvBuilder, RemoteStorageKind, assert_timeline_local, available_remote_storages, wait_until, wait_for_last_record_lsn, wait_for_upload from fixtures.log_helper import log from fixtures.utils import lsn_from_hex, query_scalar import pytest @@ -29,18 +28,19 @@ import pytest # * queries the specific data, ensuring that it matches the one stored before # # The tests are done for all types of remote storage pageserver supports. -@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3']) -def test_remote_storage_backup_and_restore(neon_env_builder: NeonEnvBuilder, storage_type: str): +@pytest.mark.parametrize('remote_storatge_kind', available_remote_storages()) +def test_remote_storage_backup_and_restore( + neon_env_builder: NeonEnvBuilder, + remote_storatge_kind: RemoteStorageKind, +): # Use this test to check more realistic SK ids: some etcd key parsing bugs were related, # and this test needs SK to write data to pageserver, so it will be visible neon_env_builder.safekeepers_id_start = 12 - if storage_type == 'local_fs': - neon_env_builder.enable_local_fs_remote_storage() - elif storage_type == 'mock_s3': - neon_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore') - else: - raise RuntimeError(f'Unknown storage type: {storage_type}') + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storatge_kind, + test_name='test_remote_storage_backup_and_restore', + ) data_id = 1 data_secret = 'very secret secret' diff --git a/test_runner/batch_others/test_tenants_with_remote_storage.py b/test_runner/batch_others/test_tenants_with_remote_storage.py index 8ddb4d1b92..636616a45b 100644 --- a/test_runner/batch_others/test_tenants_with_remote_storage.py +++ b/test_runner/batch_others/test_tenants_with_remote_storage.py @@ -13,7 +13,7 @@ from uuid import UUID import pytest -from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres, wait_for_last_record_lsn, wait_for_upload +from fixtures.neon_fixtures import NeonEnvBuilder, NeonEnv, Postgres, RemoteStorageKind, available_remote_storages, wait_for_last_record_lsn, wait_for_upload from fixtures.utils import lsn_from_hex @@ -38,7 +38,7 @@ async def tenant_workload(env: NeonEnv, pg: Postgres): async def all_tenants_workload(env: NeonEnv, tenants_pgs): workers = [] - for tenant, pg in tenants_pgs: + for _, pg in tenants_pgs: worker = tenant_workload(env, pg) workers.append(asyncio.create_task(worker)) @@ -46,23 +46,18 @@ async def all_tenants_workload(env: NeonEnv, tenants_pgs): await asyncio.gather(*workers) -@pytest.mark.parametrize('storage_type', ['local_fs', 'mock_s3']) -def test_tenants_many(neon_env_builder: NeonEnvBuilder, storage_type: str): - - if storage_type == 'local_fs': - neon_env_builder.enable_local_fs_remote_storage() - elif storage_type == 'mock_s3': - neon_env_builder.enable_s3_mock_remote_storage('test_remote_storage_backup_and_restore') - else: - raise RuntimeError(f'Unknown storage type: {storage_type}') - - neon_env_builder.enable_local_fs_remote_storage() +@pytest.mark.parametrize('remote_storatge_kind', available_remote_storages()) +def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storatge_kind: RemoteStorageKind): + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storatge_kind, + test_name='test_tenants_many', + ) env = neon_env_builder.init_start() tenants_pgs: List[Tuple[UUID, Postgres]] = [] - for i in range(1, 5): + for _ in range(1, 5): # Use a tiny checkpoint distance, to create a lot of layers quickly tenant, _ = env.neon_cli.create_tenant( conf={ diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index da861bb9f3..6544681bb0 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -12,9 +12,8 @@ import uuid from contextlib import closing from dataclasses import dataclass, field -from multiprocessing import Process, Value from pathlib import Path -from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload +from fixtures.neon_fixtures import NeonPageserver, PgBin, Etcd, Postgres, RemoteStorageKind, RemoteStorageUsers, Safekeeper, NeonEnv, NeonEnvBuilder, PortDistributor, SafekeeperPort, available_remote_storages, neon_binpath, PgProtocol, wait_for_last_record_lsn, wait_for_upload from fixtures.utils import get_dir_size, lsn_to_hex, lsn_from_hex, query_scalar from fixtures.log_helper import log from typing import List, Optional, Any @@ -377,15 +376,15 @@ def wait_wal_trim(tenant_id, timeline_id, sk, target_size): time.sleep(0.5) -@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs']) -def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str): +@pytest.mark.parametrize('remote_storatge_kind', available_remote_storages()) +def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storatge_kind: RemoteStorageKind): neon_env_builder.num_safekeepers = 3 - if storage_type == 'local_fs': - neon_env_builder.enable_local_fs_remote_storage() - elif storage_type == 'mock_s3': - neon_env_builder.enable_s3_mock_remote_storage('test_safekeepers_wal_backup') - else: - raise RuntimeError(f'Unknown storage type: {storage_type}') + + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storatge_kind, + test_name='test_safekeepers_wal_backup', + ) + neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER env = neon_env_builder.init_start() @@ -425,15 +424,15 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, storage_type: str): wait_segment_offload(tenant_id, timeline_id, env.safekeepers[1], '0/5000000') -@pytest.mark.parametrize('storage_type', ['mock_s3', 'local_fs']) -def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, storage_type: str): +@pytest.mark.parametrize('remote_storatge_kind', available_remote_storages()) +def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storatge_kind: RemoteStorageKind): neon_env_builder.num_safekeepers = 3 - if storage_type == 'local_fs': - neon_env_builder.enable_local_fs_remote_storage() - elif storage_type == 'mock_s3': - neon_env_builder.enable_s3_mock_remote_storage('test_s3_wal_replay') - else: - raise RuntimeError(f'Unknown storage type: {storage_type}') + + neon_env_builder.enable_remote_storage( + remote_storage_kind=remote_storatge_kind, + test_name='test_s3_wal_replay', + ) + neon_env_builder.remote_storage_users = RemoteStorageUsers.SAFEKEEPER env = neon_env_builder.init_start() diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 6783ab710b..87a598b387 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import field from contextlib import contextmanager from enum import Flag, auto +import enum import textwrap from cached_property import cached_property import abc @@ -262,6 +263,11 @@ def default_broker(request: Any, port_distributor: PortDistributor): broker.stop() +@pytest.fixture(scope='session') +def run_id(): + yield uuid.uuid4() + + @pytest.fixture(scope='session') def mock_s3_server(port_distributor: PortDistributor): mock_s3_server = MockS3Server(port_distributor.get_port()) @@ -438,26 +444,43 @@ class MockS3Server: def secret_key(self) -> str: return 'test' - def access_env_vars(self) -> Dict[Any, Any]: - return { - 'AWS_ACCESS_KEY_ID': self.access_key(), - 'AWS_SECRET_ACCESS_KEY': self.secret_key(), - } - def kill(self): self.subprocess.kill() +@enum.unique +class RemoteStorageKind(enum.Enum): + LOCAL_FS = "local_fs" + MOCK_S3 = "mock_s3" + REAL_S3 = "real_s3" + + +def available_remote_storages() -> List[RemoteStorageKind]: + remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3] + if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"): + remote_storages.append(RemoteStorageKind.REAL_S3) + return remote_storages + + @dataclass class LocalFsStorage: - local_path: Path + root: Path @dataclass class S3Storage: bucket_name: str bucket_region: str - endpoint: Optional[str] + access_key: str + secret_key: str + endpoint: Optional[str] = None + prefix_in_bucket: Optional[str] = None + + def access_env_vars(self) -> Dict[str, str]: + return { + 'AWS_ACCESS_KEY_ID': self.access_key, + 'AWS_SECRET_ACCESS_KEY': self.secret_key, + } RemoteStorage = Union[LocalFsStorage, S3Storage] @@ -466,16 +489,20 @@ RemoteStorage = Union[LocalFsStorage, S3Storage] # serialize as toml inline table def remote_storage_to_toml_inline_table(remote_storage): if isinstance(remote_storage, LocalFsStorage): - res = f"local_path='{remote_storage.local_path}'" + remote_storage_config = f"local_path='{remote_storage.root}'" elif isinstance(remote_storage, S3Storage): - res = f"bucket_name='{remote_storage.bucket_name}', bucket_region='{remote_storage.bucket_region}'" + remote_storage_config = f"bucket_name='{remote_storage.bucket_name}',\ + bucket_region='{remote_storage.bucket_region}'" + + if remote_storage.prefix_in_bucket is not None: + remote_storage_config += f",prefix_in_bucket='{remote_storage.prefix_in_bucket}'" + if remote_storage.endpoint is not None: - res += f", endpoint='{remote_storage.endpoint}'" - else: - raise Exception(f'Unknown storage configuration {remote_storage}') + remote_storage_config += f",endpoint='{remote_storage.endpoint}'" else: raise Exception("invalid remote storage type") - return f"{{{res}}}" + + return f"{{{remote_storage_config}}}" class RemoteStorageUsers(Flag): @@ -493,28 +520,31 @@ class NeonEnvBuilder: cleaned up after the test has finished. """ def __init__( - self, - repo_dir: Path, - port_distributor: PortDistributor, - broker: Etcd, - mock_s3_server: MockS3Server, - remote_storage: Optional[RemoteStorage] = None, - remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER, - pageserver_config_override: Optional[str] = None, - num_safekeepers: int = 1, - # Use non-standard SK ids to check for various parsing bugs - safekeepers_id_start: int = 0, - # fsync is disabled by default to make the tests go faster - safekeepers_enable_fsync: bool = False, - auth_enabled: bool = False, - rust_log_override: Optional[str] = None, - default_branch_name=DEFAULT_BRANCH_NAME): + self, + repo_dir: Path, + port_distributor: PortDistributor, + broker: Etcd, + run_id: uuid.UUID, + mock_s3_server: MockS3Server, + remote_storage: Optional[RemoteStorage] = None, + remote_storage_users: RemoteStorageUsers = RemoteStorageUsers.PAGESERVER, + pageserver_config_override: Optional[str] = None, + num_safekeepers: int = 1, + # Use non-standard SK ids to check for various parsing bugs + safekeepers_id_start: int = 0, + # fsync is disabled by default to make the tests go faster + safekeepers_enable_fsync: bool = False, + auth_enabled: bool = False, + rust_log_override: Optional[str] = None, + default_branch_name=DEFAULT_BRANCH_NAME, + ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override self.port_distributor = port_distributor self.remote_storage = remote_storage self.remote_storage_users = remote_storage_users self.broker = broker + self.run_id = run_id self.mock_s3_server = mock_s3_server self.pageserver_config_override = pageserver_config_override self.num_safekeepers = num_safekeepers @@ -523,6 +553,8 @@ class NeonEnvBuilder: self.auth_enabled = auth_enabled self.default_branch_name = default_branch_name self.env: Optional[NeonEnv] = None + self.remote_storage_prefix: Optional[str] = None + self.keep_remote_storage_contents: bool = True def init(self) -> NeonEnv: # Cannot create more than one environment from one builder @@ -538,41 +570,142 @@ class NeonEnvBuilder: self.start() return env - """ - Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path. - Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. - """ + def enable_remote_storage( + self, + remote_storage_kind: RemoteStorageKind, + test_name: str, + force_enable: bool = True, + ): + if remote_storage_kind == RemoteStorageKind.LOCAL_FS: + self.enable_local_fs_remote_storage(force_enable=force_enable) + elif remote_storage_kind == RemoteStorageKind.MOCK_S3: + self.enable_mock_s3_remote_storage(bucket_name=test_name, force_enable=force_enable) + elif remote_storage_kind == RemoteStorageKind.REAL_S3: + self.enable_real_s3_remote_storage(test_name=test_name, force_enable=force_enable) + else: + raise RuntimeError(f'Unknown storage type: {remote_storage_kind}') def enable_local_fs_remote_storage(self, force_enable=True): + """ + Sets up the pageserver to use the local fs at the `test_dir/local_fs_remote_storage` path. + Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. + """ assert force_enable or self.remote_storage is None, "remote storage is enabled already" self.remote_storage = LocalFsStorage(Path(self.repo_dir / 'local_fs_remote_storage')) - """ - Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already. - Starts up the mock server, if that does not run yet. - Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. - """ - - def enable_s3_mock_remote_storage(self, bucket_name: str, force_enable=True): + def enable_mock_s3_remote_storage(self, bucket_name: str, force_enable=True): + """ + Sets up the pageserver to use the S3 mock server, creates the bucket, if it's not present already. + Starts up the mock server, if that does not run yet. + Errors, if the pageserver has some remote storage configuration already, unless `force_enable` is not set to `True`. + """ assert force_enable or self.remote_storage is None, "remote storage is enabled already" mock_endpoint = self.mock_s3_server.endpoint() mock_region = self.mock_s3_server.region() - boto3.client( + + self.remote_storage_client = boto3.client( 's3', endpoint_url=mock_endpoint, region_name=mock_region, aws_access_key_id=self.mock_s3_server.access_key(), aws_secret_access_key=self.mock_s3_server.secret_key(), ).create_bucket(Bucket=bucket_name) + + self.remote_storage = S3Storage( + bucket_name=bucket_name, + endpoint=mock_endpoint, + bucket_region=mock_region, + access_key=self.mock_s3_server.access_key(), + secret_key=self.mock_s3_server.secret_key(), + ) + + def enable_real_s3_remote_storage(self, test_name: str, force_enable=True): + """ + Sets up configuration to use real s3 endpoint without mock server + """ + assert force_enable or self.remote_storage is None, "remote storage is enabled already" + + access_key = os.getenv("AWS_ACCESS_KEY_ID") + assert access_key, "no aws access key provided" + secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") + assert secret_key, "no aws access key provided" + + # session token is needed for local runs with sso auth + session_token = os.getenv("AWS_SESSION_TOKEN") + + bucket_name = os.getenv("REMOTE_STORAGE_S3_BUCKET") + assert bucket_name, "no remote storage bucket name provided" + region = os.getenv("REMOTE_STORAGE_S3_REGION") + assert region, "no remote storage region provided" + + # do not leave data in real s3 + self.keep_remote_storage_contents = False + + # construct a prefix inside bucket for the particular test case and test run + self.remote_storage_prefix = f'{self.run_id}/{test_name}' + + self.remote_storage_client = boto3.client( + 's3', + region_name=region, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + aws_session_token=session_token, + ) self.remote_storage = S3Storage(bucket_name=bucket_name, - endpoint=mock_endpoint, - bucket_region=mock_region) + bucket_region=region, + access_key=access_key, + secret_key=secret_key, + prefix_in_bucket=self.remote_storage_prefix) + + def cleanup_remote_storage(self): + # here wee check for true remote storage, no the local one + # local cleanup is not needed after test because in ci all env will be destroyed anyway + if self.remote_storage_prefix is None: + log.info("no remote storage was set up, skipping cleanup") + return + + if self.keep_remote_storage_contents: + log.info("keep_remote_storage_contents skipping remote storage cleanup") + return + + log.info("removing data from test s3 bucket %s by prefix %s", + self.remote_storage.bucket_name, + self.remote_storage_prefix) + paginator = self.remote_storage_client.get_paginator('list_objects_v2') + pages = paginator.paginate( + Bucket=self.remote_storage.bucket_name, + Prefix=self.remote_storage_prefix, + ) + + objects_to_delete = {'Objects': []} + cnt = 0 + for item in pages.search('Contents'): + # weirdly when nothing is found it returns [None] + if item is None: + break + + objects_to_delete['Objects'].append({'Key': item['Key']}) + + # flush once aws limit reached + if len(objects_to_delete['Objects']) >= 1000: + self.remote_storage_client.delete_objects( + Bucket=self.remote_storage.bucket_name, + Delete=objects_to_delete, + ) + objects_to_delete = dict(Objects=[]) + cnt += 1 + + # flush rest + if len(objects_to_delete['Objects']): + self.remote_storage_client.delete_objects(Bucket=self.remote_storage.bucket_name, + Delete=objects_to_delete) + + log.info("deleted %s objects from remote storage", cnt) def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): - # Stop all the nodes. if self.env: log.info('Cleaning up all storage and compute nodes') @@ -581,6 +714,8 @@ class NeonEnvBuilder: sk.stop(immediate=True) self.env.pageserver.stop(immediate=True) + self.cleanup_remote_storage() + class NeonEnv: """ @@ -713,10 +848,13 @@ class NeonEnv: @pytest.fixture(scope=shareable_scope) -def _shared_simple_env(request: Any, - port_distributor: PortDistributor, - mock_s3_server: MockS3Server, - default_broker: Etcd) -> Iterator[NeonEnv]: +def _shared_simple_env( + request: Any, + port_distributor: PortDistributor, + mock_s3_server: MockS3Server, + default_broker: Etcd, + run_id: uuid.UUID, +) -> Iterator[NeonEnv]: """ # Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES is set, this is shared by all tests using `neon_simple_env`. @@ -730,8 +868,13 @@ def _shared_simple_env(request: Any, repo_dir = os.path.join(str(top_output_dir), "shared_repo") shutil.rmtree(repo_dir, ignore_errors=True) - with NeonEnvBuilder(Path(repo_dir), port_distributor, default_broker, - mock_s3_server) as builder: + with NeonEnvBuilder( + repo_dir=Path(repo_dir), + port_distributor=port_distributor, + broker=default_broker, + mock_s3_server=mock_s3_server, + run_id=run_id, + ) as builder: env = builder.init_start() # For convenience in tests, create a branch from the freshly-initialized cluster. @@ -756,10 +899,13 @@ def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]: @pytest.fixture(scope='function') -def neon_env_builder(test_output_dir, - port_distributor: PortDistributor, - mock_s3_server: MockS3Server, - default_broker: Etcd) -> Iterator[NeonEnvBuilder]: +def neon_env_builder( + test_output_dir, + port_distributor: PortDistributor, + mock_s3_server: MockS3Server, + default_broker: Etcd, + run_id: uuid.UUID, +) -> Iterator[NeonEnvBuilder]: """ Fixture to create a Neon environment for test. @@ -777,8 +923,13 @@ def neon_env_builder(test_output_dir, repo_dir = os.path.join(test_output_dir, "repo") # Return the builder to the caller - with NeonEnvBuilder(Path(repo_dir), port_distributor, default_broker, - mock_s3_server) as builder: + with NeonEnvBuilder( + repo_dir=Path(repo_dir), + port_distributor=port_distributor, + mock_s3_server=mock_s3_server, + broker=default_broker, + run_id=run_id, + ) as builder: yield builder @@ -1183,7 +1334,10 @@ class NeonCli(AbstractNeonCli): remote_storage_users=self.env.remote_storage_users, pageserver_config_override=self.env.pageserver.config_override) - s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None + s3_env_vars = None + if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage): + s3_env_vars = self.env.remote_storage.access_env_vars() + return self.raw_cli(start_args, extra_env_vars=s3_env_vars) def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]': @@ -1195,7 +1349,10 @@ class NeonCli(AbstractNeonCli): return self.raw_cli(cmd) def safekeeper_start(self, id: int) -> 'subprocess.CompletedProcess[str]': - s3_env_vars = self.env.s3_mock_server.access_env_vars() if self.env.s3_mock_server else None + s3_env_vars = None + if self.env.remote_storage is not None and isinstance(self.env.remote_storage, S3Storage): + s3_env_vars = self.env.remote_storage.access_env_vars() + return self.raw_cli(['safekeeper', 'start', str(id)], extra_env_vars=s3_env_vars) def safekeeper_stop(self, @@ -1337,7 +1494,7 @@ class NeonPageserver(PgProtocol): return self def __exit__(self, exc_type, exc, tb): - self.stop(True) + self.stop(immediate=True) def http_client(self, auth_token: Optional[str] = None) -> NeonPageserverHttpClient: return NeonPageserverHttpClient( @@ -1354,6 +1511,7 @@ def append_pageserver_param_overrides( ): if bool(remote_storage_users & RemoteStorageUsers.PAGESERVER) and remote_storage is not None: remote_storage_toml_table = remote_storage_to_toml_inline_table(remote_storage) + params_to_update.append( f'--pageserver-config-override=remote_storage={remote_storage_toml_table}') From bc2cb5382b3b7101d029a2b780d07ee81f898a97 Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 27 Jul 2022 16:03:09 +0300 Subject: [PATCH 4/9] run real s3 tests in CI --- .../actions/run-python-test-set/action.yml | 32 ++++++++++++++++++- .github/workflows/build_and_test.yml | 6 +++- test_runner/fixtures/neon_fixtures.py | 8 +++-- 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/.github/actions/run-python-test-set/action.yml b/.github/actions/run-python-test-set/action.yml index c9987053ce..fcc8983e40 100644 --- a/.github/actions/run-python-test-set/action.yml +++ b/.github/actions/run-python-test-set/action.yml @@ -27,6 +27,26 @@ inputs: description: 'Whether to upload the performance report' required: false default: 'false' + run_with_real_s3: + description: 'Whether to pass real s3 credentials to the test suite' + required: false + default: 'false' + real_s3_bucket: + description: 'Bucket name for real s3 tests' + required: false + default: '' + real_s3_region: + description: 'Region name for real s3 tests' + required: false + default: '' + real_s3_access_key_id: + description: 'Access key id' + required: false + default: '' + real_s3_secret_access_key: + description: 'Secret access key' + required: false + default: '' runs: using: "composite" @@ -63,7 +83,9 @@ runs: # this variable will be embedded in perf test report # and is needed to distinguish different environments PLATFORM: github-actions-selfhosted - shell: bash -euxo pipefail {0} + AWS_ACCESS_KEY_ID: ${{ inputs.real_s3_access_key_id }} + AWS_SECRET_ACCESS_KEY: ${{ inputs.real_s3_secret_access_key }} + shell: bash -euxo pipefail {0} {0} run: | PERF_REPORT_DIR="$(realpath test_runner/perf-report-local)" rm -rf $PERF_REPORT_DIR @@ -77,6 +99,14 @@ runs: if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then EXTRA_PARAMS="-n4 $EXTRA_PARAMS" fi + + if [[ "${{ inputs.run_with_real_s3 }}" == "true" ]]; then + echo "REAL S3 ENABLED" + export ENABLE_REAL_S3_REMOTE_STORAGE=nonempty + export REMOTE_STORAGE_S3_BUCKET=${{ inputs.real_s3_bucket }} + export REMOTE_STORAGE_S3_REGION=${{ inputs.real_s3_region }} + fi + if [[ "${{ inputs.save_perf_report }}" == "true" ]]; then if [[ "$GITHUB_REF" == "refs/heads/main" ]]; then mkdir -p "$PERF_REPORT_DIR" diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index ec7579a0d1..4e784f0920 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -209,7 +209,11 @@ jobs: build_type: ${{ matrix.build_type }} rust_toolchain: ${{ matrix.rust_toolchain }} test_selection: batch_others - + run_with_real_s3: true + real_s3_bucket: ci-tests-s3 + real_s3_region: us-west-2 + real_s3_access_key_id: "${{ secrets.AWS_ACCESS_KEY_ID_CI_TESTS_S3 }}" + real_s3_secret_access_key: "${{ secrets.AWS_SECRET_ACCESS_KEY_CI_TESTS_S3 }}" - name: Merge and upload coverage data if: matrix.build_type == 'debug' uses: ./.github/actions/save-coverage-data diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 87a598b387..9b39bf2b39 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -457,8 +457,11 @@ class RemoteStorageKind(enum.Enum): def available_remote_storages() -> List[RemoteStorageKind]: remote_storages = [RemoteStorageKind.LOCAL_FS, RemoteStorageKind.MOCK_S3] - if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"): + if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE") is not None: remote_storages.append(RemoteStorageKind.REAL_S3) + log.info("Enabling real s3 storage for tests") + else: + log.info("Using mock implementations to test remote storage") return remote_storages @@ -609,7 +612,8 @@ class NeonEnvBuilder: region_name=mock_region, aws_access_key_id=self.mock_s3_server.access_key(), aws_secret_access_key=self.mock_s3_server.secret_key(), - ).create_bucket(Bucket=bucket_name) + ) + self.remote_storage_client.create_bucket(Bucket=bucket_name) self.remote_storage = S3Storage( bucket_name=bucket_name, From 52ce1c9d5352d13ffedf6f2a0a261abe07785c3c Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 4 Aug 2022 12:57:15 +0300 Subject: [PATCH 5/9] Speed up test shutdown, by polling more frequently. A fair amount of the time in our python tests is spent waiting for the pageserver and safekeeper processes to shut down. It doesn't matter so much when you're running a lot of tests in parallel, but it's quite noticeable when running them sequentially. A big part of the slowness is that is that after sending the SIGTERM signal, we poll to see if the process is still running, and the polling happened at 1 s interval. Reduce it to 0.1 s. --- control_plane/src/safekeeper.rs | 10 ++++++---- control_plane/src/storage.rs | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/control_plane/src/safekeeper.rs b/control_plane/src/safekeeper.rs index d87be95b82..0cae479d71 100644 --- a/control_plane/src/safekeeper.rs +++ b/control_plane/src/safekeeper.rs @@ -247,7 +247,7 @@ impl SafekeeperNode { // Shutting down may take a long time, // if safekeeper flushes a lot of data let mut tcp_stopped = false; - for _ in 0..100 { + for i in 0..600 { if !tcp_stopped { if let Err(err) = TcpStream::connect(&address) { tcp_stopped = true; @@ -272,9 +272,11 @@ impl SafekeeperNode { } } } - print!("."); - io::stdout().flush().unwrap(); - thread::sleep(Duration::from_secs(1)); + if i % 10 == 0 { + print!("."); + io::stdout().flush().unwrap(); + } + thread::sleep(Duration::from_millis(100)); } bail!("Failed to stop safekeeper with pid {}", pid); diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 13d64a79f0..c2ed3fc824 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -318,7 +318,7 @@ impl PageServerNode { // Shutting down may take a long time, // if pageserver checkpoints a lot of data let mut tcp_stopped = false; - for _ in 0..100 { + for i in 0..600 { if !tcp_stopped { if let Err(err) = TcpStream::connect(&address) { tcp_stopped = true; @@ -344,9 +344,11 @@ impl PageServerNode { } } } - print!("."); - io::stdout().flush().unwrap(); - thread::sleep(Duration::from_secs(1)); + if i % 10 == 0 { + print!("."); + io::stdout().flush().unwrap(); + } + thread::sleep(Duration::from_millis(100)); } bail!("Failed to stop pageserver with pid {}", pid); From e54941b8118a81b069de5613c5ef60a93a5b51ef Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 3 Aug 2022 12:25:20 +0300 Subject: [PATCH 6/9] treat pytest warnings as errors --- pytest.ini | 3 + .../batch_others/test_branch_and_gc.py | 2 + .../batch_others/test_pageserver_api.py | 90 ++++++++++--------- test_runner/fixtures/neon_fixtures.py | 14 +-- 4 files changed, 61 insertions(+), 48 deletions(-) diff --git a/pytest.ini b/pytest.ini index da9ab8c12f..104d0e0244 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,7 @@ [pytest] +filterwarnings = + error::pytest.PytestUnhandledThreadExceptionWarning + error::UserWarning addopts = -m 'not remote_cluster' markers = diff --git a/test_runner/batch_others/test_branch_and_gc.py b/test_runner/batch_others/test_branch_and_gc.py index 76a77357ae..8e433f65ad 100644 --- a/test_runner/batch_others/test_branch_and_gc.py +++ b/test_runner/batch_others/test_branch_and_gc.py @@ -167,3 +167,5 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv): # The starting LSN is invalid as the corresponding record is scheduled to be removed by in-queue GC. with pytest.raises(Exception, match="invalid branch start lsn"): env.neon_cli.create_branch('b1', 'b0', tenant_id=tenant, ancestor_start_lsn=lsn) + + thread.join() diff --git a/test_runner/batch_others/test_pageserver_api.py b/test_runner/batch_others/test_pageserver_api.py index 95791888a5..51df41699a 100644 --- a/test_runner/batch_others/test_pageserver_api.py +++ b/test_runner/batch_others/test_pageserver_api.py @@ -60,17 +60,38 @@ def check_client(client: NeonPageserverHttpClient, initial_tenant: UUID): def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv): env = neon_simple_env - client = env.pageserver.http_client() + with env.pageserver.http_client() as client: + tenant_id, timeline_id = env.neon_cli.create_tenant() - tenant_id, timeline_id = env.neon_cli.create_tenant() + timeline_details = client.timeline_detail(tenant_id=tenant_id, + timeline_id=timeline_id, + include_non_incremental_logical_size=True) - timeline_details = client.timeline_detail(tenant_id=tenant_id, - timeline_id=timeline_id, - include_non_incremental_logical_size=True) + assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running' - assert timeline_details.get('wal_source_connstr') is None, 'Should not be able to connect to WAL streaming without PG compute node running' - assert timeline_details.get('last_received_msg_lsn') is None, 'Should not be able to connect to WAL streaming without PG compute node running' - assert timeline_details.get('last_received_msg_ts') is None, 'Should not be able to connect to WAL streaming without PG compute node running' + +def expect_updated_msg_lsn(client: NeonPageserverHttpClient, + tenant_id: UUID, + timeline_id: UUID, + prev_msg_lsn: Optional[int]) -> int: + timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id) + + # a successful `timeline_details` response must contain the below fields + local_timeline_details = timeline_details['local'] + assert "wal_source_connstr" in local_timeline_details.keys() + assert "last_received_msg_lsn" in local_timeline_details.keys() + assert "last_received_msg_ts" in local_timeline_details.keys() + + assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty" + + last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"]) + assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \ + f"the last received message's LSN {last_msg_lsn} hasn't been updated \ + compared to the previous message's LSN {prev_msg_lsn}" + + return last_msg_lsn # Test the WAL-receiver related fields in the response to `timeline_details` API call @@ -79,44 +100,29 @@ def test_pageserver_http_get_wal_receiver_not_found(neon_simple_env: NeonEnv): # `timeline_details` now. def test_pageserver_http_get_wal_receiver_success(neon_simple_env: NeonEnv): env = neon_simple_env - client = env.pageserver.http_client() + with env.pageserver.http_client() as client: + tenant_id, timeline_id = env.neon_cli.create_tenant() + pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id) - tenant_id, timeline_id = env.neon_cli.create_tenant() - pg = env.postgres.create_start(DEFAULT_BRANCH_NAME, tenant_id=tenant_id) + # Wait to make sure that we get a latest WAL receiver data. + # We need to wait here because it's possible that we don't have access to + # the latest WAL yet, when the `timeline_detail` API is first called. + # See: https://github.com/neondatabase/neon/issues/1768. + lsn = wait_until(number_of_iterations=5, + interval=1, + func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, None)) - def expect_updated_msg_lsn(prev_msg_lsn: Optional[int]) -> int: - timeline_details = client.timeline_detail(tenant_id, timeline_id=timeline_id) - - # a successful `timeline_details` response must contain the below fields - local_timeline_details = timeline_details['local'] - assert "wal_source_connstr" in local_timeline_details.keys() - assert "last_received_msg_lsn" in local_timeline_details.keys() - assert "last_received_msg_ts" in local_timeline_details.keys() - - assert local_timeline_details["last_received_msg_lsn"] is not None, "the last received message's LSN is empty" - - last_msg_lsn = lsn_from_hex(local_timeline_details["last_received_msg_lsn"]) - assert prev_msg_lsn is None or prev_msg_lsn < last_msg_lsn, \ - f"the last received message's LSN {last_msg_lsn} hasn't been updated \ - compared to the previous message's LSN {prev_msg_lsn}" - - return last_msg_lsn - - # Wait to make sure that we get a latest WAL receiver data. - # We need to wait here because it's possible that we don't have access to - # the latest WAL yet, when the `timeline_detail` API is first called. - # See: https://github.com/neondatabase/neon/issues/1768. - lsn = wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(None)) - - # Make a DB modification then expect getting a new WAL receiver's data. - pg.safe_psql("CREATE TABLE t(key int primary key, value text)") - wait_until(number_of_iterations=5, interval=1, func=lambda: expect_updated_msg_lsn(lsn)) + # Make a DB modification then expect getting a new WAL receiver's data. + pg.safe_psql("CREATE TABLE t(key int primary key, value text)") + wait_until(number_of_iterations=5, + interval=1, + func=lambda: expect_updated_msg_lsn(client, tenant_id, timeline_id, lsn)) def test_pageserver_http_api_client(neon_simple_env: NeonEnv): env = neon_simple_env - client = env.pageserver.http_client() - check_client(client, env.initial_tenant) + with env.pageserver.http_client() as client: + check_client(client, env.initial_tenant) def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilder): @@ -125,5 +131,5 @@ def test_pageserver_http_api_client_auth_enabled(neon_env_builder: NeonEnvBuilde management_token = env.auth_keys.generate_management_token() - client = env.pageserver.http_client(auth_token=management_token) - check_client(client, env.initial_tenant) + with env.pageserver.http_client(auth_token=management_token) as client: + check_client(client, env.initial_tenant) diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 9b39bf2b39..3848aee05a 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -222,7 +222,7 @@ def can_bind(host: str, port: int) -> bool: # moment. If that changes, we should use start using SO_REUSEADDR here # too, to allow reusing ports more quickly. # See https://github.com/neondatabase/neon/issues/801 - #sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) try: sock.bind((host, port)) @@ -231,6 +231,8 @@ def can_bind(host: str, port: int) -> bool: except socket.error: log.info(f"Port {port} is in use, skipping") return False + finally: + sock.close() class PortDistributor: @@ -2022,8 +2024,8 @@ class Safekeeper: started_at = time.time() while True: try: - http_cli = self.http_client() - http_cli.check_status() + with self.http_client() as http_cli: + http_cli.check_status() except Exception as e: elapsed = time.time() - started_at if elapsed > 3: @@ -2174,9 +2176,9 @@ class Etcd: return f'http://127.0.0.1:{self.port}' def check_status(self): - s = requests.Session() - s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry - s.get(f"{self.client_url()}/health").raise_for_status() + with requests.Session() as s: + s.mount('http://', requests.adapters.HTTPAdapter(max_retries=1)) # do not retry + s.get(f"{self.client_url()}/health").raise_for_status() def try_start(self): if self.handle is not None: From f7d8db7e3990a3c151859b39ef103d124a61453f Mon Sep 17 00:00:00 2001 From: Dmitry Rodionov Date: Wed, 3 Aug 2022 17:40:01 +0300 Subject: [PATCH 7/9] silence https://github.com/neondatabase/neon/issues/2211 --- test_runner/batch_others/test_tenant_detach.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/test_runner/batch_others/test_tenant_detach.py b/test_runner/batch_others/test_tenant_detach.py index 2df5409b4f..afc4f89bbf 100644 --- a/test_runner/batch_others/test_tenant_detach.py +++ b/test_runner/batch_others/test_tenant_detach.py @@ -1,10 +1,19 @@ from threading import Thread from uuid import uuid4 +import uuid import psycopg2 import pytest from fixtures.log_helper import log -from fixtures.neon_fixtures import NeonEnvBuilder, NeonPageserverApiException +from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, NeonPageserverApiException + + +def do_gc_target(env: NeonEnv, tenant_id: uuid.UUID, timeline_id: uuid.UUID): + """Hack to unblock main, see https://github.com/neondatabase/neon/issues/2211""" + try: + env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {timeline_id.hex} 0') + except Exception as e: + log.error("do_gc failed: %s", e) def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): @@ -36,8 +45,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder): env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {uuid4().hex} 0') # try to concurrently run gc and detach - gc_thread = Thread( - target=lambda: env.pageserver.safe_psql(f'do_gc {tenant_id.hex} {timeline_id.hex} 0'), ) + gc_thread = Thread(target=lambda: do_gc_target(env, tenant_id, timeline_id)) gc_thread.start() last_error = None From 1bbc8090f3d7b750d8f87bd79664bade2f7bc345 Mon Sep 17 00:00:00 2001 From: Vadim Kharitonov Date: Mon, 1 Aug 2022 12:53:52 +0200 Subject: [PATCH 8/9] [issue #1591] Add `neon_local pageserver status` handler --- neon_local/src/main.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index 24b40b72d6..c4dd52e183 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -910,6 +910,15 @@ fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul exit(1); } } + + Some(("status", _)) => match PageServerNode::from_env(env).check_status() { + Ok(_) => println!("Page server is up and running"), + Err(err) => { + eprintln!("Page server is not available: {}", err); + exit(1); + } + }, + Some((sub_name, _)) => bail!("Unexpected pageserver subcommand '{}'", sub_name), None => bail!("no pageserver subcommand provided"), } From 0a958b0ea1da2cf9f097cdb45b6e1740fccc4484 Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 4 Aug 2022 11:39:52 +0000 Subject: [PATCH 9/9] Check find_end_of_wal errors instead of unwrap --- libs/postgres_ffi/src/xlog_utils.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 520870cc53..8cdfd92fc1 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -16,7 +16,7 @@ use crate::XLogRecord; use crate::XLOG_PAGE_MAGIC; use crate::pg_constants::WAL_SEGMENT_SIZE; -use anyhow::{bail, ensure}; +use anyhow::{anyhow, bail, ensure}; use byteorder::{ByteOrder, LittleEndian}; use bytes::BytesMut; use bytes::{Buf, Bytes}; @@ -159,7 +159,7 @@ fn find_end_of_wal_segment( let mut buf = [0u8; XLOG_BLCKSZ]; let file_name = XLogFileName(tli, segno, wal_seg_size); let mut last_valid_rec_pos: usize = start_offset; // assume at given start_offset begins new record - let mut file = File::open(data_dir.join(file_name.clone() + ".partial")).unwrap(); + let mut file = File::open(data_dir.join(file_name.clone() + ".partial"))?; file.seek(SeekFrom::Start(offs as u64))?; // xl_crc is the last field in XLogRecord, will not be read into rec_hdr const_assert!(XLOG_RECORD_CRC_OFFS + 4 == XLOG_SIZE_OF_XLOG_RECORD); @@ -396,10 +396,13 @@ pub fn find_end_of_wal( let mut high_tli: TimeLineID = 0; let mut high_ispartial = false; - for entry in fs::read_dir(data_dir).unwrap().flatten() { + for entry in fs::read_dir(data_dir)?.flatten() { let ispartial: bool; let entry_name = entry.file_name(); - let fname = entry_name.to_str().unwrap(); + let fname = entry_name + .to_str() + .ok_or_else(|| anyhow!("Invalid file name"))?; + /* * Check if the filename looks like an xlog file, or a .partial file. */ @@ -411,7 +414,7 @@ pub fn find_end_of_wal( continue; } let (segno, tli) = XLogFromFileName(fname, wal_seg_size); - if !ispartial && entry.metadata().unwrap().len() != wal_seg_size as u64 { + if !ispartial && entry.metadata()?.len() != wal_seg_size as u64 { continue; } if segno > high_segno