diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index f99a037489..a4a597acde 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -222,13 +222,20 @@ jobs: id: create-allure-report if: ${{ !cancelled() }} uses: ./.github/actions/allure-report-generate + with: + store-test-results-into-db: true + env: + REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} - name: Post to a Slack channel if: ${{ github.event.schedule && failure() }} uses: slackapi/slack-github-action@v1 with: - channel-id: "C033QLM5P7D" # dev-staging-stream - slack-message: "Periodic replication testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" + channel-id: "C06T9AMNDQQ" # on-call-compute-staging-stream + slack-message: | + Periodic replication testing: ${{ job.status }} + <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run> + <${{ steps.create-allure-report.outputs.report-url }}|Allure report> env: SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} @@ -330,7 +337,7 @@ jobs: prepare_AWS_RDS_databases: uses: ./.github/workflows/_benchmarking_preparation.yml secrets: inherit - + pgbench-compare: if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }} needs: [ generate-matrices, prepare_AWS_RDS_databases ] diff --git a/.github/workflows/label-for-external-users.yml b/.github/workflows/label-for-external-users.yml index 7cf5ee254c..585d118dfb 100644 --- a/.github/workflows/label-for-external-users.yml +++ b/.github/workflows/label-for-external-users.yml @@ -4,7 +4,7 @@ on: issues: types: - opened - pull_request: + pull_request_target: types: - opened @@ -25,7 +25,7 @@ jobs: - name: Check whether `${{ github.actor }}` is a member of `${{ github.repository_owner }}` id: check-user env: - GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }} run: | if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then is_member=true @@ -45,10 +45,10 @@ jobs: issues: write # for `gh issue edit` steps: - - name: Label new ${{ github.event_name }} + - name: Add `${{ env.LABEL }}` label env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request' && 'pull_request' || 'issue'].number }} - GH_CLI_COMMAND: ${{ github.event_name == 'pull_request' && 'pr' || 'issue' }} + ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request_target' && 'pull_request' || 'issue'].number }} + GH_CLI_COMMAND: ${{ github.event_name == 'pull_request_target' && 'pr' || 'issue' }} run: | gh ${GH_CLI_COMMAND} --repo ${GITHUB_REPOSITORY} edit --add-label=${LABEL} ${ITEM_NUMBER} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b4d908b130..01e77fa1b1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1645,6 +1645,20 @@ impl Timeline { self.last_record_lsn.shutdown(); if try_freeze_and_flush { + if let Some((open, frozen)) = self + .layers + .read() + .await + .layer_map() + .map(|lm| (lm.open_layer.is_some(), lm.frozen_layers.len())) + .ok() + .filter(|(open, frozen)| *open || *frozen > 0) + { + tracing::info!(?open, frozen, "flushing and freezing on shutdown"); + } else { + // this is double-shutdown, ignore it + } + // we shut down walreceiver above, so, we won't add anything more // to the InMemoryLayer; freeze it and wait for all frozen layers // to reach the disk & upload queue, then shut the upload queue and diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 4b4ffc1fee..c4e42a7834 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -262,3 +262,85 @@ def test_publisher_restart( sub_workload.terminate() finally: pub_workload.terminate() + + +@pytest.mark.remote_cluster +@pytest.mark.timeout(2 * 60 * 60) +def test_snap_files( + pg_bin: PgBin, + benchmark_project_pub: NeonApiEndpoint, + zenbenchmark: NeonBenchmarker, +): + """ + Creates a node with a replication slot. Generates pgbench into the replication slot, + then runs pgbench inserts while generating large numbers of snapfiles. Then restarts + the node and tries to peek the replication changes. + """ + test_duration_min = 60 + test_interval_min = 5 + pgbench_duration = f"-T{test_duration_min * 60 * 2}" + + env = benchmark_project_pub.pgbench_env + connstr = benchmark_project_pub.connstr + pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env) + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'") + is_super = cur.fetchall()[0] + assert is_super, "This benchmark won't work if we don't have superuser" + + conn = psycopg2.connect(connstr) + conn.autocommit = True + cur = conn.cursor() + cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1") + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute("SELECT pg_reload_conf()") + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 + FROM pg_replication_slots + WHERE slot_name = 'slotter' + ) THEN + PERFORM pg_drop_replication_slot('slotter'); + END IF; + END $$; + """ + ) + cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')") + + workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env) + try: + start = time.time() + prev_measurement = time.time() + while time.time() - start < test_duration_min * 60: + with psycopg2.connect(connstr) as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" + ) + check_pgbench_still_running(workload) + cur.execute( + "SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())" + ) + + # Measure storage + if time.time() - prev_measurement > test_interval_min * 60: + storage = benchmark_project_pub.get_synthetic_storage_size() + zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) + prev_measurement = time.time() + time.sleep(test_interval_min * 60 / 3) + + finally: + workload.terminate()