mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
Compare commits
2 Commits
release-70
...
erik/durab
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab34e34ba9 | ||
|
|
d6ab04b8e2 |
36
.github/actions/set-docker-config-dir/action.yml
vendored
Normal file
36
.github/actions/set-docker-config-dir/action.yml
vendored
Normal file
@@ -0,0 +1,36 @@
|
||||
name: "Set custom docker config directory"
|
||||
description: "Create a directory for docker config and set DOCKER_CONFIG"
|
||||
|
||||
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
|
||||
runs:
|
||||
using: "composite"
|
||||
steps:
|
||||
- name: Show warning on GitHub-hosted runners
|
||||
if: runner.environment == 'github-hosted'
|
||||
shell: bash -euo pipefail {0}
|
||||
run: |
|
||||
# Using the following environment variables to find a path to the workflow file
|
||||
# ${GITHUB_WORKFLOW_REF} - octocat/hello-world/.github/workflows/my-workflow.yml@refs/heads/my_branch
|
||||
# ${GITHUB_REPOSITORY} - octocat/hello-world
|
||||
# ${GITHUB_REF} - refs/heads/my_branch
|
||||
# From https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/variables
|
||||
|
||||
filename_with_ref=${GITHUB_WORKFLOW_REF#"$GITHUB_REPOSITORY/"}
|
||||
filename=${filename_with_ref%"@$GITHUB_REF"}
|
||||
|
||||
# https://docs.github.com/en/actions/writing-workflows/choosing-what-your-workflow-does/workflow-commands-for-github-actions#setting-a-warning-message
|
||||
title='Unnecessary usage of `.github/actions/set-docker-config-dir`'
|
||||
message='No need to use `.github/actions/set-docker-config-dir` action on GitHub-hosted runners'
|
||||
echo "::warning file=${filename},title=${title}::${message}"
|
||||
|
||||
- uses: pyTooling/Actions/with-post-step@74afc5a42a17a046c90c68cb5cfa627e5c6c5b6b # v1.0.7
|
||||
env:
|
||||
DOCKER_CONFIG: .docker-custom-${{ github.run_id }}-${{ github.run_attempt }}
|
||||
with:
|
||||
main: |
|
||||
mkdir -p "${DOCKER_CONFIG}"
|
||||
echo DOCKER_CONFIG=${DOCKER_CONFIG} | tee -a $GITHUB_ENV
|
||||
post: |
|
||||
if [ -d "${DOCKER_CONFIG}" ]; then
|
||||
rm -r "${DOCKER_CONFIG}"
|
||||
fi
|
||||
37
.github/workflows/_check-codestyle-python.yml
vendored
37
.github/workflows/_check-codestyle-python.yml
vendored
@@ -1,37 +0,0 @@
|
||||
name: Check Codestyle Python
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
build-tools-image:
|
||||
description: 'build-tools image'
|
||||
required: true
|
||||
type: string
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
jobs:
|
||||
check-codestyle-python:
|
||||
runs-on: [ self-hosted, small ]
|
||||
container:
|
||||
image: ${{ inputs.build-tools-image }}
|
||||
credentials:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: actions/cache@v4
|
||||
with:
|
||||
path: ~/.cache/pypoetry/virtualenvs
|
||||
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
|
||||
|
||||
- run: ./scripts/pysync
|
||||
|
||||
- run: poetry run ruff check .
|
||||
- run: poetry run ruff format --check .
|
||||
- run: poetry run mypy .
|
||||
@@ -64,7 +64,7 @@ jobs:
|
||||
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
|
||||
- uses: ./.github/actions/set-docker-config-dir
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
with:
|
||||
cache-binary: false
|
||||
|
||||
42
.github/workflows/build_and_test.yml
vendored
42
.github/workflows/build_and_test.yml
vendored
@@ -90,10 +90,35 @@ jobs:
|
||||
|
||||
check-codestyle-python:
|
||||
needs: [ check-permissions, build-build-tools-image ]
|
||||
uses: ./.github/workflows/_check-codestyle-python.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
secrets: inherit
|
||||
runs-on: [ self-hosted, small ]
|
||||
container:
|
||||
image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
credentials:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
|
||||
options: --init
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Cache poetry deps
|
||||
uses: actions/cache@v4
|
||||
with:
|
||||
path: ~/.cache/pypoetry/virtualenvs
|
||||
key: v2-${{ runner.os }}-${{ runner.arch }}-python-deps-bookworm-${{ hashFiles('poetry.lock') }}
|
||||
|
||||
- name: Install Python deps
|
||||
run: ./scripts/pysync
|
||||
|
||||
- name: Run `ruff check` to ensure code format
|
||||
run: poetry run ruff check .
|
||||
|
||||
- name: Run `ruff format` to ensure code format
|
||||
run: poetry run ruff format --check .
|
||||
|
||||
- name: Run mypy to check types
|
||||
run: poetry run mypy .
|
||||
|
||||
check-codestyle-jsonnet:
|
||||
needs: [ check-permissions, build-build-tools-image ]
|
||||
@@ -116,7 +141,6 @@ jobs:
|
||||
# Check that the vendor/postgres-* submodules point to the
|
||||
# corresponding REL_*_STABLE_neon branches.
|
||||
check-submodules:
|
||||
needs: [ check-permissions ]
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -528,7 +552,7 @@ jobs:
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
|
||||
- uses: ./.github/actions/set-docker-config-dir
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
with:
|
||||
cache-binary: false
|
||||
@@ -619,7 +643,7 @@ jobs:
|
||||
with:
|
||||
submodules: true
|
||||
|
||||
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
|
||||
- uses: ./.github/actions/set-docker-config-dir
|
||||
- uses: docker/setup-buildx-action@v3
|
||||
with:
|
||||
cache-binary: false
|
||||
@@ -800,7 +824,7 @@ jobs:
|
||||
curl -fL https://github.com/neondatabase/autoscaling/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder
|
||||
chmod +x vm-builder
|
||||
|
||||
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
|
||||
- uses: ./.github/actions/set-docker-config-dir
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
@@ -836,7 +860,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
- uses: neondatabase/dev-actions/set-docker-config-dir@6094485bf440001c94a94a3f9e221e81ff6b6193
|
||||
- uses: ./.github/actions/set-docker-config-dir
|
||||
- uses: docker/login-action@v3
|
||||
with:
|
||||
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
|
||||
|
||||
94
.github/workflows/pre-merge-checks.yml
vendored
94
.github/workflows/pre-merge-checks.yml
vendored
@@ -1,94 +0,0 @@
|
||||
name: Pre-merge checks
|
||||
|
||||
on:
|
||||
merge_group:
|
||||
branches:
|
||||
- main
|
||||
|
||||
defaults:
|
||||
run:
|
||||
shell: bash -euxo pipefail {0}
|
||||
|
||||
# No permission for GITHUB_TOKEN by default; the **minimal required** set of permissions should be granted in each job.
|
||||
permissions: {}
|
||||
|
||||
jobs:
|
||||
get-changed-files:
|
||||
runs-on: ubuntu-22.04
|
||||
outputs:
|
||||
python-changed: ${{ steps.python-src.outputs.any_changed }}
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: tj-actions/changed-files@4edd678ac3f81e2dc578756871e4d00c19191daf # v45.0.4
|
||||
id: python-src
|
||||
with:
|
||||
files: |
|
||||
.github/workflows/pre-merge-checks.yml
|
||||
**/**.py
|
||||
poetry.lock
|
||||
pyproject.toml
|
||||
|
||||
- name: PRINT ALL CHANGED FILES FOR DEBUG PURPOSES
|
||||
env:
|
||||
PYTHON_CHANGED_FILES: ${{ steps.python-src.outputs.all_changed_files }}
|
||||
run: |
|
||||
echo "${PYTHON_CHANGED_FILES}"
|
||||
|
||||
check-build-tools-image:
|
||||
if: needs.get-changed-files.outputs.python-changed == 'true'
|
||||
needs: [ get-changed-files ]
|
||||
uses: ./.github/workflows/check-build-tools-image.yml
|
||||
|
||||
build-build-tools-image:
|
||||
needs: [ check-build-tools-image ]
|
||||
uses: ./.github/workflows/build-build-tools-image.yml
|
||||
with:
|
||||
image-tag: ${{ needs.check-build-tools-image.outputs.image-tag }}
|
||||
secrets: inherit
|
||||
|
||||
check-codestyle-python:
|
||||
if: needs.get-changed-files.outputs.python-changed == 'true'
|
||||
needs: [ get-changed-files, build-build-tools-image ]
|
||||
uses: ./.github/workflows/_check-codestyle-python.yml
|
||||
with:
|
||||
build-tools-image: ${{ needs.build-build-tools-image.outputs.image }}-bookworm
|
||||
secrets: inherit
|
||||
|
||||
# To get items from the merge queue merged into main we need to satisfy "Status checks that are required".
|
||||
# Currently we require 2 jobs (checks with exact name):
|
||||
# - conclusion
|
||||
# - neon-cloud-e2e
|
||||
conclusion:
|
||||
if: always()
|
||||
permissions:
|
||||
statuses: write # for `github.repos.createCommitStatus(...)`
|
||||
needs:
|
||||
- get-changed-files
|
||||
- check-codestyle-python
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Create fake `neon-cloud-e2e` check
|
||||
uses: actions/github-script@v7
|
||||
with:
|
||||
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
|
||||
retries: 5
|
||||
script: |
|
||||
const { repo, owner } = context.repo;
|
||||
const targetUrl = `${context.serverUrl}/${owner}/${repo}/actions/runs/${context.runId}`;
|
||||
|
||||
await github.rest.repos.createCommitStatus({
|
||||
owner: owner,
|
||||
repo: repo,
|
||||
sha: context.sha,
|
||||
context: `neon-cloud-e2e`,
|
||||
state: `success`,
|
||||
target_url: targetUrl,
|
||||
description: `fake check for merge queue`,
|
||||
});
|
||||
|
||||
- name: Fail the job if any of the dependencies do not succeed or skipped
|
||||
run: exit 1
|
||||
if: |
|
||||
(contains(needs.check-codestyle-python.result, 'skipped') && needs.get-changed-files.outputs.python-changed == 'true')
|
||||
|| contains(needs.*.result, 'failure')
|
||||
|| contains(needs.*.result, 'cancelled')
|
||||
1
.github/workflows/report-workflow-stats.yml
vendored
1
.github/workflows/report-workflow-stats.yml
vendored
@@ -23,7 +23,6 @@ on:
|
||||
- Test Postgres client libraries
|
||||
- Trigger E2E Tests
|
||||
- cleanup caches by a branch
|
||||
- Pre-merge checks
|
||||
types: [completed]
|
||||
|
||||
jobs:
|
||||
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4743,7 +4743,6 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.22.4",
|
||||
"rustls-native-certs 0.7.0",
|
||||
"rustls-pemfile 2.1.1",
|
||||
"rustls-pki-types",
|
||||
"serde",
|
||||
|
||||
@@ -1,66 +1,12 @@
|
||||
ARG DEBIAN_VERSION=bullseye
|
||||
|
||||
FROM debian:bookworm-slim AS pgcopydb_builder
|
||||
ARG DEBIAN_VERSION
|
||||
|
||||
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
|
||||
set -e && \
|
||||
apt update && \
|
||||
apt install -y --no-install-recommends \
|
||||
ca-certificates wget gpg && \
|
||||
wget -qO - https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor -o /usr/share/keyrings/postgresql-keyring.gpg && \
|
||||
echo "deb [signed-by=/usr/share/keyrings/postgresql-keyring.gpg] http://apt.postgresql.org/pub/repos/apt bookworm-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
|
||||
apt-get update && \
|
||||
apt install -y --no-install-recommends \
|
||||
build-essential \
|
||||
autotools-dev \
|
||||
libedit-dev \
|
||||
libgc-dev \
|
||||
libpam0g-dev \
|
||||
libreadline-dev \
|
||||
libselinux1-dev \
|
||||
libxslt1-dev \
|
||||
libssl-dev \
|
||||
libkrb5-dev \
|
||||
zlib1g-dev \
|
||||
liblz4-dev \
|
||||
libpq5 \
|
||||
libpq-dev \
|
||||
libzstd-dev \
|
||||
postgresql-16 \
|
||||
postgresql-server-dev-16 \
|
||||
postgresql-common \
|
||||
python3-sphinx && \
|
||||
wget -O /tmp/pgcopydb.tar.gz https://github.com/dimitri/pgcopydb/archive/refs/tags/v0.17.tar.gz && \
|
||||
mkdir /tmp/pgcopydb && \
|
||||
tar -xzf /tmp/pgcopydb.tar.gz -C /tmp/pgcopydb --strip-components=1 && \
|
||||
cd /tmp/pgcopydb && \
|
||||
make -s clean && \
|
||||
make -s -j12 install && \
|
||||
libpq_path=$(find /lib /usr/lib -name "libpq.so.5" | head -n 1) && \
|
||||
mkdir -p /pgcopydb/lib && \
|
||||
cp "$libpq_path" /pgcopydb/lib/; \
|
||||
else \
|
||||
# copy command below will fail if we don't have dummy files, so we create them for other debian versions
|
||||
mkdir -p /usr/lib/postgresql/16/bin && touch /usr/lib/postgresql/16/bin/pgcopydb && \
|
||||
mkdir -p mkdir -p /pgcopydb/lib && touch /pgcopydb/lib/libpq.so.5; \
|
||||
fi
|
||||
|
||||
FROM debian:${DEBIAN_VERSION}-slim AS build_tools
|
||||
FROM debian:${DEBIAN_VERSION}-slim
|
||||
ARG DEBIAN_VERSION
|
||||
|
||||
# Add nonroot user
|
||||
RUN useradd -ms /bin/bash nonroot -b /home
|
||||
SHELL ["/bin/bash", "-c"]
|
||||
|
||||
RUN mkdir -p /pgcopydb/bin && \
|
||||
mkdir -p /pgcopydb/lib && \
|
||||
chmod -R 755 /pgcopydb && \
|
||||
chown -R nonroot:nonroot /pgcopydb
|
||||
|
||||
COPY --from=pgcopydb_builder /usr/lib/postgresql/16/bin/pgcopydb /pgcopydb/bin/pgcopydb
|
||||
COPY --from=pgcopydb_builder /pgcopydb/lib/libpq.so.5 /pgcopydb/lib/libpq.so.5
|
||||
|
||||
# System deps
|
||||
#
|
||||
# 'gdb' is included so that we get backtraces of core dumps produced in
|
||||
@@ -92,7 +38,7 @@ RUN set -e \
|
||||
libseccomp-dev \
|
||||
libsqlite3-dev \
|
||||
libssl-dev \
|
||||
$([[ "${DEBIAN_VERSION}" = "bullseye" ]] && echo libstdc++-10-dev || echo libstdc++-11-dev) \
|
||||
$([[ "${DEBIAN_VERSION}" = "bullseye" ]] && libstdc++-10-dev || libstdc++-11-dev) \
|
||||
libtool \
|
||||
libxml2-dev \
|
||||
libxmlsec1-dev \
|
||||
@@ -289,13 +235,7 @@ RUN whoami \
|
||||
&& cargo --version --verbose \
|
||||
&& rustup --version --verbose \
|
||||
&& rustc --version --verbose \
|
||||
&& clang --version
|
||||
|
||||
RUN if [ "${DEBIAN_VERSION}" = "bookworm" ]; then \
|
||||
LD_LIBRARY_PATH=/pgcopydb/lib /pgcopydb/bin/pgcopydb --version; \
|
||||
else \
|
||||
echo "pgcopydb is not available for ${DEBIAN_VERSION}"; \
|
||||
fi
|
||||
&& clang --version
|
||||
|
||||
# Set following flag to check in Makefile if its running in Docker
|
||||
RUN touch /home/nonroot/.docker_build
|
||||
|
||||
@@ -559,8 +559,8 @@ RUN case "${PG_VERSION}" in \
|
||||
export TIMESCALEDB_CHECKSUM=584a351c7775f0e067eaa0e7277ea88cab9077cc4c455cbbf09a5d9723dce95d \
|
||||
;; \
|
||||
"v17") \
|
||||
export TIMESCALEDB_VERSION=2.17.1 \
|
||||
export TIMESCALEDB_CHECKSUM=6277cf43f5695e23dae1c5cfeba00474d730b66ed53665a84b787a6bb1a57e28 \
|
||||
export TIMESCALEDB_VERSION=2.17.0 \
|
||||
export TIMESCALEDB_CHECKSUM=155bf64391d3558c42f31ca0e523cfc6252921974f75298c9039ccad1c89811a \
|
||||
;; \
|
||||
esac && \
|
||||
wget https://github.com/timescale/timescaledb/archive/refs/tags/${TIMESCALEDB_VERSION}.tar.gz -O timescaledb.tar.gz && \
|
||||
|
||||
@@ -364,29 +364,11 @@ impl ComputeNode {
|
||||
let pageserver_connect_micros = start_time.elapsed().as_micros() as u64;
|
||||
|
||||
let basebackup_cmd = match lsn {
|
||||
Lsn(0) => {
|
||||
if spec.spec.mode != ComputeMode::Primary {
|
||||
format!(
|
||||
"basebackup {} {} --gzip --replica",
|
||||
spec.tenant_id, spec.timeline_id
|
||||
)
|
||||
} else {
|
||||
format!("basebackup {} {} --gzip", spec.tenant_id, spec.timeline_id)
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
if spec.spec.mode != ComputeMode::Primary {
|
||||
format!(
|
||||
"basebackup {} {} {} --gzip --replica",
|
||||
spec.tenant_id, spec.timeline_id, lsn
|
||||
)
|
||||
} else {
|
||||
format!(
|
||||
"basebackup {} {} {} --gzip",
|
||||
spec.tenant_id, spec.timeline_id, lsn
|
||||
)
|
||||
}
|
||||
}
|
||||
Lsn(0) => format!("basebackup {} {} --gzip", spec.tenant_id, spec.timeline_id),
|
||||
_ => format!(
|
||||
"basebackup {} {} {} --gzip",
|
||||
spec.tenant_id, spec.timeline_id, lsn
|
||||
),
|
||||
};
|
||||
|
||||
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
|
||||
|
||||
@@ -73,19 +73,6 @@ pub fn write_postgres_conf(
|
||||
)?;
|
||||
}
|
||||
|
||||
// Locales
|
||||
if cfg!(target_os = "macos") {
|
||||
writeln!(file, "lc_messages='C'")?;
|
||||
writeln!(file, "lc_monetary='C'")?;
|
||||
writeln!(file, "lc_time='C'")?;
|
||||
writeln!(file, "lc_numeric='C'")?;
|
||||
} else {
|
||||
writeln!(file, "lc_messages='C.UTF-8'")?;
|
||||
writeln!(file, "lc_monetary='C.UTF-8'")?;
|
||||
writeln!(file, "lc_time='C.UTF-8'")?;
|
||||
writeln!(file, "lc_numeric='C.UTF-8'")?;
|
||||
}
|
||||
|
||||
match spec.mode {
|
||||
ComputeMode::Primary => {}
|
||||
ComputeMode::Static(lsn) => {
|
||||
|
||||
@@ -944,9 +944,6 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
other: Default::default(),
|
||||
// Typical developer machines use disks with slow fsync, and we don't care
|
||||
// about data integrity: disable disk syncs.
|
||||
no_sync: true,
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
|
||||
@@ -225,7 +225,6 @@ pub struct PageServerConf {
|
||||
pub listen_http_addr: String,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
pub no_sync: bool,
|
||||
}
|
||||
|
||||
impl Default for PageServerConf {
|
||||
@@ -236,7 +235,6 @@ impl Default for PageServerConf {
|
||||
listen_http_addr: String::new(),
|
||||
pg_auth_type: AuthType::Trust,
|
||||
http_auth_type: AuthType::Trust,
|
||||
no_sync: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -251,8 +249,6 @@ pub struct NeonLocalInitPageserverConf {
|
||||
pub listen_http_addr: String,
|
||||
pub pg_auth_type: AuthType,
|
||||
pub http_auth_type: AuthType,
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
pub no_sync: bool,
|
||||
#[serde(flatten)]
|
||||
pub other: HashMap<String, toml::Value>,
|
||||
}
|
||||
@@ -265,7 +261,6 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
listen_http_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
no_sync,
|
||||
other: _,
|
||||
} = conf;
|
||||
Self {
|
||||
@@ -274,7 +269,6 @@ impl From<&NeonLocalInitPageserverConf> for PageServerConf {
|
||||
listen_http_addr: listen_http_addr.clone(),
|
||||
pg_auth_type: *pg_auth_type,
|
||||
http_auth_type: *http_auth_type,
|
||||
no_sync: *no_sync,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -575,8 +569,6 @@ impl LocalEnv {
|
||||
listen_http_addr: String,
|
||||
pg_auth_type: AuthType,
|
||||
http_auth_type: AuthType,
|
||||
#[serde(default)]
|
||||
no_sync: bool,
|
||||
}
|
||||
let config_toml_path = dentry.path().join("pageserver.toml");
|
||||
let config_toml: PageserverConfigTomlSubset = toml_edit::de::from_str(
|
||||
@@ -599,7 +591,6 @@ impl LocalEnv {
|
||||
listen_http_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
no_sync,
|
||||
} = config_toml;
|
||||
let IdentityTomlSubset {
|
||||
id: identity_toml_id,
|
||||
@@ -616,7 +607,6 @@ impl LocalEnv {
|
||||
listen_http_addr,
|
||||
pg_auth_type,
|
||||
http_auth_type,
|
||||
no_sync,
|
||||
};
|
||||
pageservers.push(conf);
|
||||
}
|
||||
|
||||
@@ -273,7 +273,6 @@ impl PageServerNode {
|
||||
)
|
||||
})?;
|
||||
let args = vec!["-D", datadir_path_str];
|
||||
|
||||
background_process::start_process(
|
||||
"pageserver",
|
||||
&datadir,
|
||||
|
||||
@@ -64,7 +64,6 @@ pub struct ConfigToml {
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub wal_redo_timeout: Duration,
|
||||
pub superuser: String,
|
||||
pub locale: String,
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
pub pg_distrib_dir: Option<Utf8PathBuf>,
|
||||
@@ -107,8 +106,6 @@ pub struct ConfigToml {
|
||||
pub ephemeral_bytes_per_memory_kb: usize,
|
||||
pub l0_flush: Option<crate::models::L0FlushConfig>,
|
||||
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub no_sync: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
@@ -277,11 +274,6 @@ pub mod defaults {
|
||||
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
|
||||
|
||||
pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
|
||||
pub const DEFAULT_LOCALE: &str = if cfg!(target_os = "macos") {
|
||||
"C"
|
||||
} else {
|
||||
"C.UTF-8"
|
||||
};
|
||||
|
||||
pub const DEFAULT_PAGE_CACHE_SIZE: usize = 8192;
|
||||
pub const DEFAULT_MAX_FILE_DESCRIPTORS: usize = 100;
|
||||
@@ -332,7 +324,6 @@ impl Default for ConfigToml {
|
||||
wal_redo_timeout: (humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
|
||||
.expect("cannot parse default wal redo timeout")),
|
||||
superuser: (DEFAULT_SUPERUSER.to_string()),
|
||||
locale: DEFAULT_LOCALE.to_string(),
|
||||
page_cache_size: (DEFAULT_PAGE_CACHE_SIZE),
|
||||
max_file_descriptors: (DEFAULT_MAX_FILE_DESCRIPTORS),
|
||||
pg_distrib_dir: None, // Utf8PathBuf::from("./pg_install"), // TODO: formely, this was std::env::current_dir()
|
||||
@@ -398,7 +389,6 @@ impl Default for ConfigToml {
|
||||
l0_flush: None,
|
||||
virtual_file_io_mode: None,
|
||||
tenant_config: TenantConfigToml::default(),
|
||||
no_sync: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
#!/usr/bin/env bash
|
||||
#!/bin/bash
|
||||
|
||||
set -euxo pipefail
|
||||
|
||||
@@ -6,44 +6,9 @@ PG_BIN=$1
|
||||
WAL_PATH=$2
|
||||
DATA_DIR=$3
|
||||
PORT=$4
|
||||
PG_VERSION=$5
|
||||
SYSID=$(od -A n -j 24 -N 8 -t d8 "$WAL_PATH"/000000010000000000000002* | cut -c 3-)
|
||||
|
||||
# The way that initdb is invoked must match how the pageserver runs initdb.
|
||||
function initdb_with_args {
|
||||
local cmd=(
|
||||
"$PG_BIN"/initdb
|
||||
-E utf8
|
||||
-U cloud_admin
|
||||
-D "$DATA_DIR"
|
||||
--locale 'C.UTF-8'
|
||||
--lc-collate 'C.UTF-8'
|
||||
--lc-ctype 'C.UTF-8'
|
||||
--lc-messages 'C.UTF-8'
|
||||
--lc-monetary 'C.UTF-8'
|
||||
--lc-numeric 'C.UTF-8'
|
||||
--lc-time 'C.UTF-8'
|
||||
--sysid="$SYSID"
|
||||
)
|
||||
|
||||
case "$PG_VERSION" in
|
||||
14)
|
||||
# Postgres 14 and below didn't support --locale-provider
|
||||
;;
|
||||
15 | 16)
|
||||
cmd+=(--locale-provider 'libc')
|
||||
;;
|
||||
*)
|
||||
# Postgres 17 added the builtin provider
|
||||
cmd+=(--locale-provider 'builtin')
|
||||
;;
|
||||
esac
|
||||
|
||||
eval env -i LD_LIBRARY_PATH="$PG_BIN"/../lib "${cmd[*]}"
|
||||
}
|
||||
|
||||
rm -fr "$DATA_DIR"
|
||||
initdb_with_args
|
||||
env -i LD_LIBRARY_PATH="$PG_BIN"/../lib "$PG_BIN"/initdb -E utf8 -U cloud_admin -D "$DATA_DIR" --sysid="$SYSID"
|
||||
echo "port=$PORT" >> "$DATA_DIR"/postgresql.conf
|
||||
echo "shared_preload_libraries='\$libdir/neon_rmgr.so'" >> "$DATA_DIR"/postgresql.conf
|
||||
REDO_POS=0x$("$PG_BIN"/pg_controldata -D "$DATA_DIR" | grep -F "REDO location"| cut -c 42-)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::os::fd::AsRawFd;
|
||||
use std::time::Duration;
|
||||
use std::{
|
||||
borrow::Cow,
|
||||
fs::{self, File},
|
||||
@@ -125,6 +126,7 @@ pub async fn fsync_async_opt(
|
||||
|
||||
/// Like postgres' durable_rename, renames file issuing fsyncs do make it
|
||||
/// durable. After return, file and rename are guaranteed to be persisted.
|
||||
/// Returns the fsync latencies, for metrics (this is kind of a kludge).
|
||||
///
|
||||
/// Unlike postgres, it only does fsyncs to 1) file to be renamed to make
|
||||
/// contents durable; 2) its directory entry to make rename durable 3) again to
|
||||
@@ -142,24 +144,35 @@ pub async fn durable_rename(
|
||||
old_path: impl AsRef<Utf8Path>,
|
||||
new_path: impl AsRef<Utf8Path>,
|
||||
do_fsync: bool,
|
||||
) -> io::Result<()> {
|
||||
) -> io::Result<[Duration; 3]> {
|
||||
async fn maybe_fsync_with_latency(path: &Utf8Path, do_fsync: bool) -> io::Result<Duration> {
|
||||
if !do_fsync {
|
||||
return Ok(Duration::ZERO);
|
||||
}
|
||||
let start = std::time::Instant::now();
|
||||
fsync_async(path).await?;
|
||||
Ok(start.elapsed())
|
||||
}
|
||||
|
||||
let mut latency = [Duration::ZERO; 3];
|
||||
|
||||
// first fsync the file
|
||||
fsync_async_opt(old_path.as_ref(), do_fsync).await?;
|
||||
latency[0] = maybe_fsync_with_latency(old_path.as_ref(), do_fsync).await?;
|
||||
|
||||
// Time to do the real deal.
|
||||
tokio::fs::rename(old_path.as_ref(), new_path.as_ref()).await?;
|
||||
|
||||
// Postgres'ish fsync of renamed file.
|
||||
fsync_async_opt(new_path.as_ref(), do_fsync).await?;
|
||||
latency[1] = maybe_fsync_with_latency(new_path.as_ref(), do_fsync).await?;
|
||||
|
||||
// Now fsync the parent
|
||||
let parent = match new_path.as_ref().parent() {
|
||||
Some(p) => p,
|
||||
None => Utf8Path::new("./"), // assume current dir if there is no parent
|
||||
};
|
||||
fsync_async_opt(parent, do_fsync).await?;
|
||||
latency[2] = maybe_fsync_with_latency(parent, do_fsync).await?;
|
||||
|
||||
Ok(())
|
||||
Ok(latency)
|
||||
}
|
||||
|
||||
/// Writes a file to the specified `final_path` in a crash safe fasion, using [`std::fs`].
|
||||
|
||||
@@ -154,17 +154,13 @@ fn main() -> anyhow::Result<()> {
|
||||
},
|
||||
};
|
||||
|
||||
if conf.no_sync {
|
||||
info!("Skipping syncfs on startup");
|
||||
} else {
|
||||
let started = Instant::now();
|
||||
syncfs(dirfd)?;
|
||||
let elapsed = started.elapsed();
|
||||
info!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"made tenant directory contents durable"
|
||||
);
|
||||
}
|
||||
let started = Instant::now();
|
||||
syncfs(dirfd)?;
|
||||
let elapsed = started.elapsed();
|
||||
info!(
|
||||
elapsed_ms = elapsed.as_millis(),
|
||||
"made tenant directory contents durable"
|
||||
);
|
||||
}
|
||||
|
||||
// Initialize up failpoints support
|
||||
|
||||
@@ -69,7 +69,6 @@ pub struct PageServerConf {
|
||||
pub wal_redo_timeout: Duration,
|
||||
|
||||
pub superuser: String,
|
||||
pub locale: String,
|
||||
|
||||
pub page_cache_size: usize,
|
||||
pub max_file_descriptors: usize,
|
||||
@@ -179,9 +178,6 @@ pub struct PageServerConf {
|
||||
|
||||
/// Direct IO settings
|
||||
pub virtual_file_io_mode: virtual_file::IoMode,
|
||||
|
||||
/// Optionally disable disk syncs (unsafe!)
|
||||
pub no_sync: bool,
|
||||
}
|
||||
|
||||
/// Token for authentication to safekeepers
|
||||
@@ -302,7 +298,6 @@ impl PageServerConf {
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
superuser,
|
||||
locale,
|
||||
page_cache_size,
|
||||
max_file_descriptors,
|
||||
pg_distrib_dir,
|
||||
@@ -337,7 +332,6 @@ impl PageServerConf {
|
||||
concurrent_tenant_size_logical_size_queries,
|
||||
virtual_file_io_engine,
|
||||
tenant_config,
|
||||
no_sync,
|
||||
} = config_toml;
|
||||
|
||||
let mut conf = PageServerConf {
|
||||
@@ -350,7 +344,6 @@ impl PageServerConf {
|
||||
wait_lsn_timeout,
|
||||
wal_redo_timeout,
|
||||
superuser,
|
||||
locale,
|
||||
page_cache_size,
|
||||
max_file_descriptors,
|
||||
http_auth_type,
|
||||
@@ -416,7 +409,6 @@ impl PageServerConf {
|
||||
.map(crate::l0_flush::L0FlushConfig::from)
|
||||
.unwrap_or_default(),
|
||||
virtual_file_io_mode: virtual_file_io_mode.unwrap_or(virtual_file::IoMode::preferred()),
|
||||
no_sync: no_sync.unwrap_or(false),
|
||||
};
|
||||
|
||||
// ------------------------------------------------------------
|
||||
|
||||
@@ -2002,9 +2002,9 @@ async fn timeline_offload_handler(
|
||||
"timeline has attached children".into(),
|
||||
));
|
||||
}
|
||||
if let (false, reason) = timeline.can_offload() {
|
||||
if !timeline.can_offload() {
|
||||
return Err(ApiError::PreconditionFailed(
|
||||
format!("Timeline::can_offload() check failed: {}", reason) .into(),
|
||||
"Timeline::can_offload() returned false".into(),
|
||||
));
|
||||
}
|
||||
offload_timeline(&tenant, &timeline)
|
||||
@@ -2169,21 +2169,6 @@ async fn timeline_detach_ancestor_handler(
|
||||
let ctx = RequestContext::new(TaskKind::DetachAncestor, DownloadBehavior::Download);
|
||||
let ctx = &ctx;
|
||||
|
||||
// Flush the upload queues of all timelines before detaching ancestor. We do the same thing again
|
||||
// during shutdown. This early upload ensures the pageserver does not need to upload too many
|
||||
// things and creates downtime during timeline reloads.
|
||||
for timeline in tenant.list_timelines() {
|
||||
timeline
|
||||
.remote_client
|
||||
.wait_completion()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ApiError::PreconditionFailed(format!("cannot drain upload queue: {e}").into())
|
||||
})?;
|
||||
}
|
||||
|
||||
tracing::info!("all timeline upload queues are drained");
|
||||
|
||||
let timeline = tenant.get_timeline(timeline_id, true)?;
|
||||
|
||||
let progress = timeline
|
||||
|
||||
@@ -45,7 +45,7 @@ use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
|
||||
|
||||
/// Max number of aux-file-related delta layers. The compaction will create a new image layer once this threshold is reached.
|
||||
pub const MAX_AUX_FILE_V2_DELTAS: usize = 16;
|
||||
pub const MAX_AUX_FILE_V2_DELTAS: usize = 64;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum LsnForTimestamp {
|
||||
|
||||
@@ -2493,8 +2493,7 @@ impl Tenant {
|
||||
timelines_to_compact_or_offload = timelines
|
||||
.iter()
|
||||
.filter_map(|(timeline_id, timeline)| {
|
||||
let (is_active, (can_offload, _)) =
|
||||
(timeline.is_active(), timeline.can_offload());
|
||||
let (is_active, can_offload) = (timeline.is_active(), timeline.can_offload());
|
||||
let has_no_unoffloaded_children = {
|
||||
!timelines
|
||||
.iter()
|
||||
@@ -4780,12 +4779,10 @@ async fn run_initdb(
|
||||
|
||||
let _permit = INIT_DB_SEMAPHORE.acquire().await;
|
||||
|
||||
let mut initdb_command = tokio::process::Command::new(&initdb_bin_path);
|
||||
initdb_command
|
||||
let initdb_command = tokio::process::Command::new(&initdb_bin_path)
|
||||
.args(["--pgdata", initdb_target_dir.as_ref()])
|
||||
.args(["--username", &conf.superuser])
|
||||
.args(["--encoding", "utf8"])
|
||||
.args(["--locale", &conf.locale])
|
||||
.arg("--no-instructions")
|
||||
.arg("--no-sync")
|
||||
.env_clear()
|
||||
@@ -4795,27 +4792,15 @@ async fn run_initdb(
|
||||
// stdout invocation produces the same output every time, we don't need it
|
||||
.stdout(std::process::Stdio::null())
|
||||
// we would be interested in the stderr output, if there was any
|
||||
.stderr(std::process::Stdio::piped());
|
||||
|
||||
// Before version 14, only the libc provide was available.
|
||||
if pg_version > 14 {
|
||||
// Version 17 brought with it a builtin locale provider which only provides
|
||||
// C and C.UTF-8. While being safer for collation purposes since it is
|
||||
// guaranteed to be consistent throughout a major release, it is also more
|
||||
// performant.
|
||||
let locale_provider = if pg_version >= 17 { "builtin" } else { "libc" };
|
||||
|
||||
initdb_command.args(["--locale-provider", locale_provider]);
|
||||
}
|
||||
|
||||
let initdb_proc = initdb_command.spawn()?;
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
// Ideally we'd select here with the cancellation token, but the problem is that
|
||||
// we can't safely terminate initdb: it launches processes of its own, and killing
|
||||
// initdb doesn't kill them. After we return from this function, we want the target
|
||||
// directory to be able to be cleaned up.
|
||||
// See https://github.com/neondatabase/neon/issues/6385
|
||||
let initdb_output = initdb_proc.wait_with_output().await?;
|
||||
let initdb_output = initdb_command.wait_with_output().await?;
|
||||
if !initdb_output.status.success() {
|
||||
return Err(InitdbError::Failed(
|
||||
initdb_output.status,
|
||||
|
||||
@@ -1959,7 +1959,7 @@ impl TenantManager {
|
||||
attempt.before_reset_tenant();
|
||||
|
||||
let (_guard, progress) = utils::completion::channel();
|
||||
match tenant.shutdown(progress, ShutdownMode::Flush).await {
|
||||
match tenant.shutdown(progress, ShutdownMode::Hard).await {
|
||||
Ok(()) => {
|
||||
slot_guard.drop_old_value().expect("it was just shutdown");
|
||||
}
|
||||
|
||||
@@ -2201,18 +2201,6 @@ impl RemoteTimelineClient {
|
||||
inner.initialized_mut()?;
|
||||
Ok(UploadQueueAccessor { inner })
|
||||
}
|
||||
|
||||
pub(crate) fn no_pending_work(&self) -> bool {
|
||||
let inner = self.upload_queue.lock().unwrap();
|
||||
match &*inner {
|
||||
UploadQueue::Uninitialized
|
||||
| UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => true,
|
||||
UploadQueue::Stopped(UploadQueueStopped::Deletable(x)) => {
|
||||
x.upload_queue_for_deletion.no_pending_work()
|
||||
}
|
||||
UploadQueue::Initialized(x) => x.no_pending_work(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct UploadQueueAccessor<'a> {
|
||||
|
||||
@@ -67,8 +67,6 @@ pub struct InMemoryLayer {
|
||||
/// The above fields never change, except for `end_lsn`, which is only set once.
|
||||
/// All other changing parts are in `inner`, and protected by a mutex.
|
||||
inner: RwLock<InMemoryLayerInner>,
|
||||
|
||||
estimated_in_mem_size: AtomicU64,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InMemoryLayer {
|
||||
@@ -545,10 +543,6 @@ impl InMemoryLayer {
|
||||
Ok(inner.file.len())
|
||||
}
|
||||
|
||||
pub fn estimated_in_mem_size(&self) -> u64 {
|
||||
self.estimated_in_mem_size.load(AtomicOrdering::Relaxed)
|
||||
}
|
||||
|
||||
/// Create a new, empty, in-memory layer
|
||||
pub async fn create(
|
||||
conf: &'static PageServerConf,
|
||||
@@ -578,7 +572,6 @@ impl InMemoryLayer {
|
||||
file,
|
||||
resource_units: GlobalResourceUnits::new(),
|
||||
}),
|
||||
estimated_in_mem_size: AtomicU64::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -649,12 +642,6 @@ impl InMemoryLayer {
|
||||
// because this case is unexpected, and we would like tests to fail if this happens.
|
||||
warn!("Key {} at {} written twice at same LSN", key, lsn);
|
||||
}
|
||||
self.estimated_in_mem_size.fetch_add(
|
||||
(std::mem::size_of::<CompactKey>()
|
||||
+ std::mem::size_of::<Lsn>()
|
||||
+ std::mem::size_of::<IndexEntry>()) as u64,
|
||||
AtomicOrdering::Relaxed,
|
||||
);
|
||||
}
|
||||
|
||||
inner.resource_units.maybe_publish_size(new_size);
|
||||
|
||||
@@ -23,7 +23,6 @@ use handle::ShardTimelineId;
|
||||
use offload::OffloadError;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
config::tenant_conf_defaults::DEFAULT_COMPACTION_THRESHOLD,
|
||||
key::{
|
||||
KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
|
||||
NON_INHERITED_SPARSE_RANGE,
|
||||
@@ -853,10 +852,6 @@ pub(crate) enum ShutdownMode {
|
||||
/// While we are flushing, we continue to accept read I/O for LSNs ingested before
|
||||
/// the call to [`Timeline::shutdown`].
|
||||
FreezeAndFlush,
|
||||
/// Only flush the layers to the remote storage without freezing any open layers. This is the
|
||||
/// mode used by ancestor detach and any other operations that reloads a tenant but not increasing
|
||||
/// the generation number.
|
||||
Flush,
|
||||
/// Shut down immediately, without waiting for any open layers to flush.
|
||||
Hard,
|
||||
}
|
||||
@@ -1570,16 +1565,12 @@ impl Timeline {
|
||||
///
|
||||
/// This is neccessary but not sufficient for offloading of the timeline as it might have
|
||||
/// child timelines that are not offloaded yet.
|
||||
pub(crate) fn can_offload(&self) -> (bool, &'static str) {
|
||||
pub(crate) fn can_offload(&self) -> bool {
|
||||
if self.remote_client.is_archived() != Some(true) {
|
||||
return (false, "the timeline is not archived");
|
||||
}
|
||||
if !self.remote_client.no_pending_work() {
|
||||
// if the remote client is still processing some work, we can't offload
|
||||
return (false, "the upload queue is not drained yet");
|
||||
return false;
|
||||
}
|
||||
|
||||
(true, "ok")
|
||||
true
|
||||
}
|
||||
|
||||
/// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending
|
||||
@@ -1687,6 +1678,11 @@ impl Timeline {
|
||||
pub(crate) async fn shutdown(&self, mode: ShutdownMode) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let try_freeze_and_flush = match mode {
|
||||
ShutdownMode::FreezeAndFlush => true,
|
||||
ShutdownMode::Hard => false,
|
||||
};
|
||||
|
||||
// Regardless of whether we're going to try_freeze_and_flush
|
||||
// or not, stop ingesting any more data. Walreceiver only provides
|
||||
// cancellation but no "wait until gone", because it uses the Timeline::gate.
|
||||
@@ -1708,7 +1704,7 @@ impl Timeline {
|
||||
// ... and inform any waiters for newer LSNs that there won't be any.
|
||||
self.last_record_lsn.shutdown();
|
||||
|
||||
if let ShutdownMode::FreezeAndFlush = mode {
|
||||
if try_freeze_and_flush {
|
||||
if let Some((open, frozen)) = self
|
||||
.layers
|
||||
.read()
|
||||
@@ -1750,20 +1746,6 @@ impl Timeline {
|
||||
warn!("failed to freeze and flush: {e:#}");
|
||||
}
|
||||
}
|
||||
|
||||
// `self.remote_client.shutdown().await` above should have already flushed everything from the queue, but
|
||||
// we also do a final check here to ensure that the queue is empty.
|
||||
if !self.remote_client.no_pending_work() {
|
||||
warn!("still have pending work in remote upload queue, but continuing shutting down anyways");
|
||||
}
|
||||
}
|
||||
|
||||
if let ShutdownMode::Flush = mode {
|
||||
// drain the upload queue
|
||||
self.remote_client.shutdown().await;
|
||||
if !self.remote_client.no_pending_work() {
|
||||
warn!("still have pending work in remote upload queue, but continuing shutting down anyways");
|
||||
}
|
||||
}
|
||||
|
||||
// Signal any subscribers to our cancellation token to drop out
|
||||
@@ -3506,37 +3488,18 @@ impl Timeline {
|
||||
|
||||
let timer = self.metrics.flush_time_histo.start_timer();
|
||||
|
||||
let num_frozen_layers;
|
||||
let frozen_layer_total_size;
|
||||
let layer_to_flush = {
|
||||
let guard = self.layers.read().await;
|
||||
let Ok(lm) = guard.layer_map() else {
|
||||
info!("dropping out of flush loop for timeline shutdown");
|
||||
return;
|
||||
};
|
||||
num_frozen_layers = lm.frozen_layers.len();
|
||||
frozen_layer_total_size = lm
|
||||
.frozen_layers
|
||||
.iter()
|
||||
.map(|l| l.estimated_in_mem_size())
|
||||
.sum::<u64>();
|
||||
lm.frozen_layers.front().cloned()
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
let Some(layer_to_flush) = layer_to_flush else {
|
||||
break Ok(());
|
||||
};
|
||||
if num_frozen_layers
|
||||
> std::cmp::max(
|
||||
self.get_compaction_threshold(),
|
||||
DEFAULT_COMPACTION_THRESHOLD,
|
||||
)
|
||||
&& frozen_layer_total_size >= /* 128 MB */ 128000000
|
||||
{
|
||||
tracing::warn!(
|
||||
"too many frozen layers: {num_frozen_layers} layers with estimated in-mem size of {frozen_layer_total_size} bytes",
|
||||
);
|
||||
}
|
||||
match self.flush_frozen_layer(layer_to_flush, ctx).await {
|
||||
Ok(this_layer_to_lsn) => {
|
||||
flushed_to_lsn = std::cmp::max(flushed_to_lsn, this_layer_to_lsn);
|
||||
@@ -4127,7 +4090,6 @@ impl Timeline {
|
||||
) -> Result<ImageLayerCreationOutcome, CreateImageLayersError> {
|
||||
// Metadata keys image layer creation.
|
||||
let mut reconstruct_state = ValuesReconstructState::default();
|
||||
let begin = Instant::now();
|
||||
let data = self
|
||||
.get_vectored_impl(partition.clone(), lsn, &mut reconstruct_state, ctx)
|
||||
.await?;
|
||||
@@ -4144,11 +4106,14 @@ impl Timeline {
|
||||
(new_data, total_kb_retrieved / 1024, total_keys_retrieved)
|
||||
};
|
||||
let delta_files_accessed = reconstruct_state.get_delta_layers_visited();
|
||||
let elapsed = begin.elapsed();
|
||||
|
||||
let trigger_generation = delta_files_accessed as usize >= MAX_AUX_FILE_V2_DELTAS;
|
||||
info!(
|
||||
"metadata key compaction: trigger_generation={trigger_generation}, delta_files_accessed={delta_files_accessed}, total_kb_retrieved={total_kb_retrieved}, total_keys_retrieved={total_keys_retrieved}, read_time={}s", elapsed.as_secs_f64()
|
||||
debug!(
|
||||
trigger_generation,
|
||||
delta_files_accessed,
|
||||
total_kb_retrieved,
|
||||
total_keys_retrieved,
|
||||
"generate metadata images"
|
||||
);
|
||||
|
||||
if !trigger_generation && mode == ImageLayerCreationMode::Try {
|
||||
|
||||
@@ -47,18 +47,21 @@ pub(crate) async fn offload_timeline(
|
||||
match is_archived {
|
||||
Some(true) => (),
|
||||
Some(false) => {
|
||||
tracing::warn!("tried offloading a non-archived timeline");
|
||||
tracing::warn!(?is_archived, "tried offloading a non-archived timeline");
|
||||
return Err(OffloadError::NotArchived);
|
||||
}
|
||||
None => {
|
||||
// This is legal: calls to this function can race with the timeline shutting down
|
||||
tracing::info!("tried offloading a timeline whose remote storage is not initialized");
|
||||
tracing::info!(
|
||||
?is_archived,
|
||||
"tried offloading a timeline whose remote storage is not initialized"
|
||||
);
|
||||
return Err(OffloadError::Cancelled);
|
||||
}
|
||||
}
|
||||
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
timeline.shutdown(super::ShutdownMode::Flush).await;
|
||||
timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
|
||||
// TODO extend guard mechanism above with method
|
||||
// to make deletions possible while offloading is in progress
|
||||
|
||||
@@ -60,7 +60,7 @@ prometheus.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
remote_storage = { version = "0.1", path = "../libs/remote_storage/" }
|
||||
reqwest = { workspace = true, features = ["rustls-tls-native-roots"] }
|
||||
reqwest.workspace = true
|
||||
reqwest-middleware = { workspace = true, features = ["json"] }
|
||||
reqwest-retry.workspace = true
|
||||
reqwest-tracing.workspace = true
|
||||
|
||||
@@ -7,11 +7,8 @@ use arc_swap::ArcSwapOption;
|
||||
use dashmap::DashMap;
|
||||
use jose_jwk::crypto::KeyInfo;
|
||||
use reqwest::{redirect, Client};
|
||||
use reqwest_retry::policies::ExponentialBackoff;
|
||||
use reqwest_retry::RetryTransientMiddleware;
|
||||
use serde::de::Visitor;
|
||||
use serde::{Deserialize, Deserializer};
|
||||
use serde_json::value::RawValue;
|
||||
use signature::Verifier;
|
||||
use thiserror::Error;
|
||||
use tokio::time::Instant;
|
||||
@@ -19,7 +16,7 @@ use tokio::time::Instant;
|
||||
use crate::auth::backend::ComputeCredentialKeys;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::errors::GetEndpointJwksError;
|
||||
use crate::http::read_body_with_limit;
|
||||
use crate::http::parse_json_body_with_limit;
|
||||
use crate::intern::RoleNameInt;
|
||||
use crate::types::{EndpointId, RoleName};
|
||||
|
||||
@@ -31,10 +28,6 @@ const MAX_RENEW: Duration = Duration::from_secs(3600);
|
||||
const MAX_JWK_BODY_SIZE: usize = 64 * 1024;
|
||||
const JWKS_USER_AGENT: &str = "neon-proxy";
|
||||
|
||||
const JWKS_CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
const JWKS_FETCH_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const JWKS_FETCH_RETRIES: u32 = 3;
|
||||
|
||||
/// How to get the JWT auth rules
|
||||
pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static {
|
||||
fn fetch_auth_rules(
|
||||
@@ -62,7 +55,7 @@ pub(crate) struct AuthRule {
|
||||
}
|
||||
|
||||
pub struct JwkCache {
|
||||
client: reqwest_middleware::ClientWithMiddleware,
|
||||
client: reqwest::Client,
|
||||
|
||||
map: DashMap<(EndpointId, RoleName), Arc<JwkCacheEntryLock>>,
|
||||
}
|
||||
@@ -124,14 +117,6 @@ impl Default for JwkCacheEntryLock {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct JwkSet<'a> {
|
||||
/// we parse into raw-value because not all keys in a JWKS are ones
|
||||
/// we can parse directly, so we parse them lazily.
|
||||
#[serde(borrow)]
|
||||
keys: Vec<&'a RawValue>,
|
||||
}
|
||||
|
||||
impl JwkCacheEntryLock {
|
||||
async fn acquire_permit<'a>(self: &'a Arc<Self>) -> JwkRenewalPermit<'a> {
|
||||
JwkRenewalPermit::acquire_permit(self).await
|
||||
@@ -145,7 +130,7 @@ impl JwkCacheEntryLock {
|
||||
&self,
|
||||
_permit: JwkRenewalPermit<'_>,
|
||||
ctx: &RequestMonitoring,
|
||||
client: &reqwest_middleware::ClientWithMiddleware,
|
||||
client: &reqwest::Client,
|
||||
endpoint: EndpointId,
|
||||
auth_rules: &F,
|
||||
) -> Result<Arc<JwkCacheEntry>, JwtError> {
|
||||
@@ -169,73 +154,22 @@ impl JwkCacheEntryLock {
|
||||
let req = client.get(rule.jwks_url.clone());
|
||||
// TODO(conrad): eventually switch to using reqwest_middleware/`new_client_with_timeout`.
|
||||
// TODO(conrad): We need to filter out URLs that point to local resources. Public internet only.
|
||||
match req.send().await.and_then(|r| {
|
||||
r.error_for_status()
|
||||
.map_err(reqwest_middleware::Error::Reqwest)
|
||||
}) {
|
||||
match req.send().await.and_then(|r| r.error_for_status()) {
|
||||
// todo: should we re-insert JWKs if we want to keep this JWKs URL?
|
||||
// I expect these failures would be quite sparse.
|
||||
Err(e) => tracing::warn!(url=?rule.jwks_url, error=?e, "could not fetch JWKs"),
|
||||
Ok(r) => {
|
||||
let resp: http::Response<reqwest::Body> = r.into();
|
||||
|
||||
let bytes = match read_body_with_limit(resp.into_body(), MAX_JWK_BODY_SIZE)
|
||||
.await
|
||||
match parse_json_body_with_limit::<jose_jwk::JwkSet>(
|
||||
resp.into_body(),
|
||||
MAX_JWK_BODY_SIZE,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
tracing::warn!(url=?rule.jwks_url, error=?e, "could not decode JWKs");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match serde_json::from_slice::<JwkSet>(&bytes) {
|
||||
Err(e) => {
|
||||
tracing::warn!(url=?rule.jwks_url, error=?e, "could not decode JWKs");
|
||||
}
|
||||
Ok(jwks) => {
|
||||
// size_of::<&RawValue>() == 16
|
||||
// size_of::<jose_jwk::Jwk>() == 288
|
||||
// better to not pre-allocate this as it might be pretty large - especially if it has many
|
||||
// keys we don't want or need.
|
||||
// trivial 'attack': `{"keys":[` + repeat(`0`).take(30000).join(`,`) + `]}`
|
||||
// this would consume 8MiB just like that!
|
||||
let mut keys = vec![];
|
||||
let mut failed = 0;
|
||||
for key in jwks.keys {
|
||||
match serde_json::from_str::<jose_jwk::Jwk>(key.get()) {
|
||||
Ok(key) => {
|
||||
// if `use` (called `cls` in rust) is specified to be something other than signing,
|
||||
// we can skip storing it.
|
||||
if key
|
||||
.prm
|
||||
.cls
|
||||
.as_ref()
|
||||
.is_some_and(|c| *c != jose_jwk::Class::Signing)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
keys.push(key);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::debug!(url=?rule.jwks_url, failed=?e, "could not decode JWK");
|
||||
failed += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
keys.shrink_to_fit();
|
||||
|
||||
if failed > 0 {
|
||||
tracing::warn!(url=?rule.jwks_url, failed, "could not decode JWKs");
|
||||
}
|
||||
|
||||
if keys.is_empty() {
|
||||
tracing::warn!(url=?rule.jwks_url, "no valid JWKs found inside the response body");
|
||||
continue;
|
||||
}
|
||||
|
||||
let jwks = jose_jwk::JwkSet { keys };
|
||||
key_sets.insert(
|
||||
rule.id,
|
||||
KeySet {
|
||||
@@ -245,7 +179,7 @@ impl JwkCacheEntryLock {
|
||||
},
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -262,7 +196,7 @@ impl JwkCacheEntryLock {
|
||||
async fn get_or_update_jwk_cache<F: FetchAuthRules>(
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestMonitoring,
|
||||
client: &reqwest_middleware::ClientWithMiddleware,
|
||||
client: &reqwest::Client,
|
||||
endpoint: EndpointId,
|
||||
fetch: &F,
|
||||
) -> Result<Arc<JwkCacheEntry>, JwtError> {
|
||||
@@ -316,7 +250,7 @@ impl JwkCacheEntryLock {
|
||||
self: &Arc<Self>,
|
||||
ctx: &RequestMonitoring,
|
||||
jwt: &str,
|
||||
client: &reqwest_middleware::ClientWithMiddleware,
|
||||
client: &reqwest::Client,
|
||||
endpoint: EndpointId,
|
||||
role_name: &RoleName,
|
||||
fetch: &F,
|
||||
@@ -435,19 +369,8 @@ impl Default for JwkCache {
|
||||
let client = Client::builder()
|
||||
.user_agent(JWKS_USER_AGENT)
|
||||
.redirect(redirect::Policy::none())
|
||||
.tls_built_in_native_certs(true)
|
||||
.connect_timeout(JWKS_CONNECT_TIMEOUT)
|
||||
.timeout(JWKS_FETCH_TIMEOUT)
|
||||
.build()
|
||||
.expect("client config should be valid");
|
||||
|
||||
// Retry up to 3 times with increasing intervals between attempts.
|
||||
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(JWKS_FETCH_RETRIES);
|
||||
|
||||
let client = reqwest_middleware::ClientBuilder::new(client)
|
||||
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
|
||||
.build();
|
||||
|
||||
.expect("using &str and standard redirect::Policy");
|
||||
JwkCache {
|
||||
client,
|
||||
map: DashMap::default(),
|
||||
@@ -1286,63 +1209,4 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn check_jwk_keycloak_regression() {
|
||||
let (rs, valid_jwk) = new_rsa_jwk(RS1, "rs1".into());
|
||||
let valid_jwk = serde_json::to_value(valid_jwk).unwrap();
|
||||
|
||||
// This is valid, but we cannot parse it as we have no support for encryption JWKs, only signature based ones.
|
||||
// This is taken directly from keycloak.
|
||||
let invalid_jwk = serde_json::json! {
|
||||
{
|
||||
"kid": "U-Jc9xRli84eNqRpYQoIPF-GNuRWV3ZvAIhziRW2sbQ",
|
||||
"kty": "RSA",
|
||||
"alg": "RSA-OAEP",
|
||||
"use": "enc",
|
||||
"n": "yypYWsEKmM_wWdcPnSGLSm5ytw1WG7P7EVkKSulcDRlrM6HWj3PR68YS8LySYM2D9Z-79oAdZGKhIfzutqL8rK1vS14zDuPpAM-RWY3JuQfm1O_-1DZM8-07PmVRegP5KPxsKblLf_My8ByH6sUOIa1p2rbe2q_b0dSTXYu1t0dW-cGL5VShc400YymvTwpc-5uYNsaVxZajnB7JP1OunOiuCJ48AuVp3PqsLzgoXqlXEB1ZZdch3xT3bxaTtNruGvG4xmLZY68O_T3yrwTCNH2h_jFdGPyXdyZToCMSMK2qSbytlfwfN55pT9Vv42Lz1YmoB7XRjI9aExKPc5AxFw",
|
||||
"e": "AQAB",
|
||||
"x5c": [
|
||||
"MIICmzCCAYMCBgGS41E6azANBgkqhkiG9w0BAQsFADARMQ8wDQYDVQQDDAZtYXN0ZXIwHhcNMjQxMDMxMTYwMTQ0WhcNMzQxMDMxMTYwMzI0WjARMQ8wDQYDVQQDDAZtYXN0ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDLKlhawQqYz/BZ1w+dIYtKbnK3DVYbs/sRWQpK6VwNGWszodaPc9HrxhLwvJJgzYP1n7v2gB1kYqEh/O62ovysrW9LXjMO4+kAz5FZjcm5B+bU7/7UNkzz7Ts+ZVF6A/ko/GwpuUt/8zLwHIfqxQ4hrWnatt7ar9vR1JNdi7W3R1b5wYvlVKFzjTRjKa9PClz7m5g2xpXFlqOcHsk/U66c6K4InjwC5Wnc+qwvOCheqVcQHVll1yHfFPdvFpO02u4a8bjGYtljrw79PfKvBMI0faH+MV0Y/Jd3JlOgIxIwrapJvK2V/B83nmlP1W/jYvPViagHtdGMj1oTEo9zkDEXAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAECYX59+Q9v6c9sb6Q0/C6IgLWG2nVCgVE1YWwIzz+68WrhlmNCRuPjY94roB+tc2tdHbj+Nh3LMzJk7L1KCQoW1+LPK6A6E8W9ad0YPcuw8csV2pUA3+H56exQMH0fUAPQAU7tXWvnQ7otcpV1XA8afn/NTMTsnxi9mSkor8MLMYQ3aeRyh1+LAchHBthWiltqsSUqXrbJF59u5p0ghquuKcWR3TXsA7klGYBgGU5KAJifr9XT87rN0bOkGvbeWAgKvnQnjZwxdnLqTfp/pRY/PiJJHhgIBYPIA7STGnMPjmJ995i34zhnbnd8WHXJA3LxrIMqLW/l8eIdvtM1w8KI="
|
||||
],
|
||||
"x5t": "QhfzMMnuAfkReTgZ1HtrfyOeeZs",
|
||||
"x5t#S256": "cmHDUdKgLiRCEN28D5FBy9IJLFmR7QWfm77SLhGTCTU"
|
||||
}
|
||||
};
|
||||
|
||||
let jwks = serde_json::json! {{ "keys": [invalid_jwk, valid_jwk ] }};
|
||||
let jwks_addr = jwks_server(move |path| match path {
|
||||
"/" => Some(serde_json::to_vec(&jwks).unwrap()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let role_name = RoleName::from("anonymous");
|
||||
let role = RoleNameInt::from(&role_name);
|
||||
|
||||
let rules = vec![AuthRule {
|
||||
id: "foo".to_owned(),
|
||||
jwks_url: format!("http://{jwks_addr}/").parse().unwrap(),
|
||||
audience: None,
|
||||
role_names: vec![role],
|
||||
}];
|
||||
|
||||
let fetch = Fetch(rules);
|
||||
let jwk_cache = JwkCache::default();
|
||||
|
||||
let endpoint = EndpointId::from("ep");
|
||||
|
||||
let token = new_rsa_jwt("rs1".into(), rs);
|
||||
|
||||
jwk_cache
|
||||
.check_jwt(
|
||||
&RequestMonitoring::test(),
|
||||
endpoint.clone(),
|
||||
&role_name,
|
||||
&fetch,
|
||||
&token,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ pub mod health_server;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::bail;
|
||||
use bytes::Bytes;
|
||||
use http::Method;
|
||||
use http_body_util::BodyExt;
|
||||
@@ -15,7 +16,7 @@ use reqwest_middleware::RequestBuilder;
|
||||
pub(crate) use reqwest_middleware::{ClientWithMiddleware, Error};
|
||||
pub(crate) use reqwest_retry::policies::ExponentialBackoff;
|
||||
pub(crate) use reqwest_retry::RetryTransientMiddleware;
|
||||
use thiserror::Error;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use crate::metrics::{ConsoleRequest, Metrics};
|
||||
use crate::url::ApiUrl;
|
||||
@@ -121,19 +122,10 @@ impl Endpoint {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub(crate) enum ReadBodyError {
|
||||
#[error("Content length exceeds limit of {limit} bytes")]
|
||||
BodyTooLarge { limit: usize },
|
||||
|
||||
#[error(transparent)]
|
||||
Read(#[from] reqwest::Error),
|
||||
}
|
||||
|
||||
pub(crate) async fn read_body_with_limit(
|
||||
pub(crate) async fn parse_json_body_with_limit<D: DeserializeOwned>(
|
||||
mut b: impl Body<Data = Bytes, Error = reqwest::Error> + Unpin,
|
||||
limit: usize,
|
||||
) -> Result<Vec<u8>, ReadBodyError> {
|
||||
) -> anyhow::Result<D> {
|
||||
// We could use `b.limited().collect().await.to_bytes()` here
|
||||
// but this ends up being slightly more efficient as far as I can tell.
|
||||
|
||||
@@ -141,20 +133,20 @@ pub(crate) async fn read_body_with_limit(
|
||||
// in reqwest, this value is influenced by the Content-Length header.
|
||||
let lower_bound = match usize::try_from(b.size_hint().lower()) {
|
||||
Ok(bound) if bound <= limit => bound,
|
||||
_ => return Err(ReadBodyError::BodyTooLarge { limit }),
|
||||
_ => bail!("Content length exceeds limit of {limit} bytes"),
|
||||
};
|
||||
let mut bytes = Vec::with_capacity(lower_bound);
|
||||
|
||||
while let Some(frame) = b.frame().await.transpose()? {
|
||||
if let Ok(data) = frame.into_data() {
|
||||
if bytes.len() + data.len() > limit {
|
||||
return Err(ReadBodyError::BodyTooLarge { limit });
|
||||
bail!("Content length exceeds limit of {limit} bytes")
|
||||
}
|
||||
bytes.extend_from_slice(&data);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(bytes)
|
||||
Ok(serde_json::from_slice::<D>(&bytes)?)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,6 +1,12 @@
|
||||
// rustc lints/lint groups
|
||||
// https://doc.rust-lang.org/rustc/lints/groups.html
|
||||
#![deny(deprecated, future_incompatible, let_underscore, nonstandard_style)]
|
||||
#![deny(
|
||||
deprecated,
|
||||
future_incompatible,
|
||||
let_underscore,
|
||||
nonstandard_style,
|
||||
rust_2024_compatibility
|
||||
)]
|
||||
#![warn(clippy::all, clippy::pedantic, clippy::cargo)]
|
||||
// List of denied lints from the clippy::restriction group.
|
||||
// https://rust-lang.github.io/rust-clippy/master/index.html#?groups=restriction
|
||||
|
||||
@@ -16,7 +16,8 @@ use super::http_conn_pool::ClientDataHttp;
|
||||
use super::local_conn_pool::ClientDataLocal;
|
||||
use crate::auth::backend::ComputeUserInfo;
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::control_plane::messages::ColdStartInfo;
|
||||
use crate::control_plane::messages::MetricsAuxInfo;
|
||||
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
|
||||
use crate::types::{DbName, EndpointCacheKey, RoleName};
|
||||
use crate::usage_metrics::{Ids, MetricCounter, USAGE_METRICS};
|
||||
|
||||
@@ -7,6 +7,7 @@ use hyper::client::conn::http2;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use parking_lot::RwLock;
|
||||
use rand::Rng;
|
||||
use std::result::Result::Ok;
|
||||
use tokio::net::TcpStream;
|
||||
use tracing::{debug, error, info, info_span, Instrument};
|
||||
|
||||
|
||||
@@ -108,11 +108,16 @@ pub type ReadGuardSharedState<'a> = RwLockReadGuard<'a, SharedState>;
|
||||
pub struct WriteGuardSharedState<'a> {
|
||||
tli: Arc<Timeline>,
|
||||
guard: RwLockWriteGuard<'a, SharedState>,
|
||||
skip_update: bool,
|
||||
}
|
||||
|
||||
impl<'a> WriteGuardSharedState<'a> {
|
||||
fn new(tli: Arc<Timeline>, guard: RwLockWriteGuard<'a, SharedState>) -> Self {
|
||||
WriteGuardSharedState { tli, guard }
|
||||
WriteGuardSharedState {
|
||||
tli,
|
||||
guard,
|
||||
skip_update: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -154,10 +159,12 @@ impl Drop for WriteGuardSharedState<'_> {
|
||||
}
|
||||
});
|
||||
|
||||
// send notification about shared state update
|
||||
self.tli.shared_state_version_tx.send_modify(|old| {
|
||||
*old += 1;
|
||||
});
|
||||
if !self.skip_update {
|
||||
// send notification about shared state update
|
||||
self.tli.shared_state_version_tx.send_modify(|old| {
|
||||
*old += 1;
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::cmp::{max, min};
|
||||
use std::future::Future;
|
||||
use std::io::{self, SeekFrom};
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
use tokio::fs::{self, remove_file, File, OpenOptions};
|
||||
use tokio::io::{AsyncRead, AsyncWriteExt};
|
||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||
@@ -274,15 +275,22 @@ impl PhysicalStorage {
|
||||
});
|
||||
file.set_len(self.wal_seg_size as u64).await?;
|
||||
|
||||
// Note: this doesn't get into observe_flush_seconds metric. But
|
||||
// segment init should be separate metric, if any.
|
||||
if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
|
||||
// Probably rename succeeded, but fsync of it failed. Remove
|
||||
// the file then to avoid using it.
|
||||
remove_file(wal_file_partial_path)
|
||||
.await
|
||||
.or_else(utils::fs_ext::ignore_not_found)?;
|
||||
return Err(e.into());
|
||||
match durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
|
||||
Ok(fsync_latencies) => {
|
||||
for latency in fsync_latencies {
|
||||
if latency != Duration::ZERO {
|
||||
self.metrics.observe_flush_seconds(latency.as_secs_f64());
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Probably rename succeeded, but fsync of it failed. Remove
|
||||
// the file then to avoid using it.
|
||||
remove_file(wal_file_partial_path)
|
||||
.await
|
||||
.or_else(utils::fs_ext::ignore_not_found)?;
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
Ok((file, true))
|
||||
}
|
||||
|
||||
@@ -45,4 +45,3 @@ class TokenScope(str, Enum):
|
||||
SAFEKEEPER_DATA = "safekeeperdata"
|
||||
TENANT = "tenant"
|
||||
SCRUBBER = "scrubber"
|
||||
INFRA = "infra"
|
||||
|
||||
@@ -80,13 +80,7 @@ class PgBenchRunResult:
|
||||
):
|
||||
stdout_lines = stdout.splitlines()
|
||||
|
||||
number_of_clients = 0
|
||||
number_of_threads = 0
|
||||
number_of_transactions_actually_processed = 0
|
||||
latency_average = 0.0
|
||||
latency_stddev = None
|
||||
tps = 0.0
|
||||
scale = 0
|
||||
|
||||
# we know significant parts of these values from test input
|
||||
# but to be precise take them from output
|
||||
|
||||
@@ -8,7 +8,7 @@ from contextlib import _GeneratorContextManager, contextmanager
|
||||
|
||||
# Type-related stuff
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, final
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
@@ -70,12 +70,12 @@ class PgCompare(ABC):
|
||||
|
||||
@contextmanager
|
||||
@abstractmethod
|
||||
def record_pageserver_writes(self, out_name: str) -> Iterator[None]:
|
||||
def record_pageserver_writes(self, out_name: str):
|
||||
pass
|
||||
|
||||
@contextmanager
|
||||
@abstractmethod
|
||||
def record_duration(self, out_name: str) -> Iterator[None]:
|
||||
def record_duration(self, out_name: str):
|
||||
pass
|
||||
|
||||
@contextmanager
|
||||
@@ -105,7 +105,6 @@ class PgCompare(ABC):
|
||||
return results
|
||||
|
||||
|
||||
@final
|
||||
class NeonCompare(PgCompare):
|
||||
"""PgCompare interface for the neon stack."""
|
||||
|
||||
@@ -207,7 +206,6 @@ class NeonCompare(PgCompare):
|
||||
return self.zenbenchmark.record_duration(out_name)
|
||||
|
||||
|
||||
@final
|
||||
class VanillaCompare(PgCompare):
|
||||
"""PgCompare interface for vanilla postgres."""
|
||||
|
||||
@@ -273,7 +271,6 @@ class VanillaCompare(PgCompare):
|
||||
return self.zenbenchmark.record_duration(out_name)
|
||||
|
||||
|
||||
@final
|
||||
class RemoteCompare(PgCompare):
|
||||
"""PgCompare interface for a remote postgres instance."""
|
||||
|
||||
|
||||
@@ -4,14 +4,11 @@ https://python-hyper.org/projects/hyper-h2/en/stable/asyncio-example.html
|
||||
auth-broker -> local-proxy needs a h2 connection, so we need a h2 server :)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import collections
|
||||
import io
|
||||
import json
|
||||
from collections.abc import AsyncIterable
|
||||
from typing import TYPE_CHECKING, final
|
||||
|
||||
import pytest_asyncio
|
||||
from h2.config import H2Configuration
|
||||
@@ -28,45 +25,34 @@ from h2.events import (
|
||||
)
|
||||
from h2.exceptions import ProtocolError, StreamClosedError
|
||||
from h2.settings import SettingCodes
|
||||
from typing_extensions import override
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any, Optional
|
||||
|
||||
|
||||
RequestData = collections.namedtuple("RequestData", ["headers", "data"])
|
||||
|
||||
|
||||
@final
|
||||
class H2Server:
|
||||
def __init__(self, host: str, port: int) -> None:
|
||||
def __init__(self, host, port) -> None:
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
|
||||
@final
|
||||
class H2Protocol(asyncio.Protocol):
|
||||
def __init__(self):
|
||||
config = H2Configuration(client_side=False, header_encoding="utf-8")
|
||||
self.conn = H2Connection(config=config)
|
||||
self.transport: Optional[asyncio.Transport] = None
|
||||
self.stream_data: dict[int, RequestData] = {}
|
||||
self.flow_control_futures: dict[int, asyncio.Future[Any]] = {}
|
||||
self.transport = None
|
||||
self.stream_data = {}
|
||||
self.flow_control_futures = {}
|
||||
|
||||
@override
|
||||
def connection_made(self, transport: asyncio.BaseTransport):
|
||||
assert isinstance(transport, asyncio.Transport)
|
||||
def connection_made(self, transport: asyncio.Transport): # type: ignore[override]
|
||||
self.transport = transport
|
||||
self.conn.initiate_connection()
|
||||
self.transport.write(self.conn.data_to_send())
|
||||
|
||||
@override
|
||||
def connection_lost(self, exc: Optional[Exception]):
|
||||
def connection_lost(self, _exc):
|
||||
for future in self.flow_control_futures.values():
|
||||
future.cancel()
|
||||
self.flow_control_futures = {}
|
||||
|
||||
@override
|
||||
def data_received(self, data: bytes):
|
||||
assert self.transport is not None
|
||||
try:
|
||||
@@ -91,7 +77,7 @@ class H2Protocol(asyncio.Protocol):
|
||||
self.window_updated(event.stream_id, event.delta)
|
||||
elif isinstance(event, RemoteSettingsChanged):
|
||||
if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
|
||||
self.window_updated(0, 0)
|
||||
self.window_updated(None, 0)
|
||||
|
||||
self.transport.write(self.conn.data_to_send())
|
||||
|
||||
@@ -137,7 +123,7 @@ class H2Protocol(asyncio.Protocol):
|
||||
else:
|
||||
stream_data.data.write(data)
|
||||
|
||||
def stream_reset(self, stream_id: int):
|
||||
def stream_reset(self, stream_id):
|
||||
"""
|
||||
A stream reset was sent. Stop sending data.
|
||||
"""
|
||||
@@ -145,7 +131,7 @@ class H2Protocol(asyncio.Protocol):
|
||||
future = self.flow_control_futures.pop(stream_id)
|
||||
future.cancel()
|
||||
|
||||
async def send_data(self, data: bytes, stream_id: int):
|
||||
async def send_data(self, data, stream_id):
|
||||
"""
|
||||
Send data according to the flow control rules.
|
||||
"""
|
||||
@@ -175,7 +161,7 @@ class H2Protocol(asyncio.Protocol):
|
||||
self.transport.write(self.conn.data_to_send())
|
||||
data = data[chunk_size:]
|
||||
|
||||
async def wait_for_flow_control(self, stream_id: int):
|
||||
async def wait_for_flow_control(self, stream_id):
|
||||
"""
|
||||
Waits for a Future that fires when the flow control window is opened.
|
||||
"""
|
||||
@@ -183,7 +169,7 @@ class H2Protocol(asyncio.Protocol):
|
||||
self.flow_control_futures[stream_id] = f
|
||||
await f
|
||||
|
||||
def window_updated(self, stream_id: int, delta):
|
||||
def window_updated(self, stream_id, delta):
|
||||
"""
|
||||
A window update frame was received. Unblock some number of flow control
|
||||
Futures.
|
||||
|
||||
@@ -1782,7 +1782,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.api}/control/v1/node/{node_id}/drain",
|
||||
headers=self.headers(TokenScope.INFRA),
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def cancel_node_drain(self, node_id):
|
||||
@@ -1790,7 +1790,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
self.request(
|
||||
"DELETE",
|
||||
f"{self.api}/control/v1/node/{node_id}/drain",
|
||||
headers=self.headers(TokenScope.INFRA),
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def node_fill(self, node_id):
|
||||
@@ -1798,7 +1798,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
self.request(
|
||||
"PUT",
|
||||
f"{self.api}/control/v1/node/{node_id}/fill",
|
||||
headers=self.headers(TokenScope.INFRA),
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def cancel_node_fill(self, node_id):
|
||||
@@ -1806,14 +1806,14 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
self.request(
|
||||
"DELETE",
|
||||
f"{self.api}/control/v1/node/{node_id}/fill",
|
||||
headers=self.headers(TokenScope.INFRA),
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def node_status(self, node_id):
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.api}/control/v1/node/{node_id}",
|
||||
headers=self.headers(TokenScope.INFRA),
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
return response.json()
|
||||
|
||||
@@ -1829,7 +1829,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
response = self.request(
|
||||
"GET",
|
||||
f"{self.api}/control/v1/node",
|
||||
headers=self.headers(TokenScope.INFRA),
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
return response.json()
|
||||
|
||||
@@ -1857,7 +1857,7 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
shard_count: Optional[int] = None,
|
||||
shard_stripe_size: Optional[int] = None,
|
||||
tenant_config: Optional[dict[Any, Any]] = None,
|
||||
placement_policy: Optional[Union[dict[Any, Any], str]] = None,
|
||||
placement_policy: Optional[Union[dict[Any, Any] | str]] = None,
|
||||
):
|
||||
"""
|
||||
Use this rather than pageserver_api() when you need to include shard parameters
|
||||
|
||||
@@ -93,8 +93,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
".*WARN.*path=/v1/utilization .*request was dropped before completing",
|
||||
# Can happen during shutdown
|
||||
".*scheduling deletion on drop failed: queue is in state Stopped.*",
|
||||
# Too many frozen layers error is normal during intensive benchmarks
|
||||
".*too many frozen layers.*",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -316,7 +316,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
def tenant_location_conf(
|
||||
self,
|
||||
tenant_id: Union[TenantId, TenantShardId],
|
||||
location_conf: dict[str, Any],
|
||||
location_conf=dict[str, Any],
|
||||
flush_ms=None,
|
||||
lazy: Optional[bool] = None,
|
||||
):
|
||||
|
||||
@@ -56,8 +56,6 @@ def wait_for_upload(
|
||||
lsn: Lsn,
|
||||
):
|
||||
"""waits for local timeline upload up to specified lsn"""
|
||||
|
||||
current_lsn = Lsn(0)
|
||||
for i in range(20):
|
||||
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
@@ -205,8 +203,6 @@ def wait_for_last_record_lsn(
|
||||
lsn: Lsn,
|
||||
) -> Lsn:
|
||||
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
|
||||
|
||||
current_lsn = Lsn(0)
|
||||
for i in range(1000):
|
||||
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
|
||||
if current_lsn >= lsn:
|
||||
|
||||
@@ -112,7 +112,7 @@ def compatibility_snapshot_dir() -> Iterator[Path]:
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def compatibility_neon_binpath() -> Iterator[Optional[Path]]:
|
||||
def compatibility_neon_binpath() -> Optional[Iterator[Path]]:
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
return
|
||||
comp_binpath = None
|
||||
@@ -133,7 +133,7 @@ def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def compatibility_pg_distrib_dir() -> Iterator[Optional[Path]]:
|
||||
def compatibility_pg_distrib_dir() -> Optional[Iterator[Path]]:
|
||||
compat_distrib_dir = None
|
||||
if env_compat_postgres_bin := os.environ.get("COMPATIBILITY_POSTGRES_DISTRIB_DIR"):
|
||||
compat_distrib_dir = Path(env_compat_postgres_bin).resolve()
|
||||
|
||||
@@ -2,13 +2,11 @@ from __future__ import annotations
|
||||
|
||||
from contextlib import closing
|
||||
from io import BufferedReader, RawIOBase
|
||||
from typing import Optional, final
|
||||
from typing import Optional
|
||||
|
||||
from fixtures.compare_fixtures import PgCompare
|
||||
from typing_extensions import override
|
||||
|
||||
|
||||
@final
|
||||
class CopyTestData(RawIOBase):
|
||||
def __init__(self, rows: int):
|
||||
self.rows = rows
|
||||
@@ -16,7 +14,6 @@ class CopyTestData(RawIOBase):
|
||||
self.linebuf: Optional[bytes] = None
|
||||
self.ptr = 0
|
||||
|
||||
@override
|
||||
def readable(self):
|
||||
return True
|
||||
|
||||
|
||||
@@ -1,61 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
from fixtures.pg_version import PgVersion
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
|
||||
def test_default_locales(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that the default locales for compute databases is C.UTF-8.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
domain_locales = cast(
|
||||
"Sequence[str]",
|
||||
endpoint.safe_psql(
|
||||
"SELECT current_setting('lc_messages') AS lc_messages,"
|
||||
+ "current_setting('lc_monetary') AS lc_monetary,"
|
||||
+ "current_setting('lc_numeric') AS lc_numeric,"
|
||||
+ "current_setting('lc_time') AS lc_time"
|
||||
)[0],
|
||||
)
|
||||
for dl in domain_locales:
|
||||
assert dl == "C.UTF-8"
|
||||
|
||||
# Postgres 15 added the locale providers
|
||||
if env.pg_version < PgVersion.V15:
|
||||
results = cast(
|
||||
"Sequence[str]",
|
||||
endpoint.safe_psql(
|
||||
"SELECT datcollate, datctype FROM pg_database WHERE datname = current_database()"
|
||||
)[0],
|
||||
)
|
||||
|
||||
datcollate = results[0]
|
||||
datctype = results[1]
|
||||
else:
|
||||
results = cast(
|
||||
"Sequence[str]",
|
||||
endpoint.safe_psql(
|
||||
"SELECT datlocprovider, datcollate, datctype FROM pg_database WHERE datname = current_database()"
|
||||
)[0],
|
||||
)
|
||||
datlocprovider = results[0]
|
||||
datcollate = results[1]
|
||||
datctype = results[2]
|
||||
|
||||
if env.pg_version >= PgVersion.V17:
|
||||
assert datlocprovider == "b", "The locale provider is not builtin"
|
||||
else:
|
||||
assert datlocprovider == "c", "The locale provider is not libc"
|
||||
|
||||
assert datcollate == "C.UTF-8"
|
||||
assert datctype == "C.UTF-8"
|
||||
@@ -656,7 +656,6 @@ def test_upgrade_generationless_local_file_paths(
|
||||
workload.write_rows(1000)
|
||||
|
||||
attached_pageserver = env.get_tenant_pageserver(tenant_id)
|
||||
assert attached_pageserver is not None
|
||||
secondary_pageserver = list([ps for ps in env.pageservers if ps.id != attached_pageserver.id])[
|
||||
0
|
||||
]
|
||||
|
||||
@@ -1,50 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, logical_replication_sync
|
||||
|
||||
|
||||
def test_physical_and_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
env = neon_simple_env
|
||||
|
||||
n_records = 100000
|
||||
|
||||
primary = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
config_lines=["min_wal_size=32MB", "max_wal_size=64MB"],
|
||||
)
|
||||
p_con = primary.connect()
|
||||
p_cur = p_con.cursor()
|
||||
p_cur.execute("CREATE TABLE t(pk bigint primary key, payload text default repeat('?',200))")
|
||||
p_cur.execute("create publication pub1 for table t")
|
||||
|
||||
# start subscriber to primary
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("CREATE TABLE t(pk bigint primary key, payload text)")
|
||||
connstr = primary.connstr().replace("'", "''")
|
||||
vanilla_pg.safe_psql(f"create subscription sub1 connection '{connstr}' publication pub1")
|
||||
|
||||
time.sleep(1)
|
||||
secondary = env.endpoints.new_replica_start(
|
||||
origin=primary,
|
||||
endpoint_id="secondary",
|
||||
config_lines=["min_wal_size=32MB", "max_wal_size=64MB"],
|
||||
)
|
||||
|
||||
s_con = secondary.connect()
|
||||
s_cur = s_con.cursor()
|
||||
|
||||
for pk in range(n_records):
|
||||
p_cur.execute("insert into t (pk) values (%s)", (pk,))
|
||||
|
||||
s_cur.execute("select count(*) from t")
|
||||
assert s_cur.fetchall()[0][0] == n_records
|
||||
|
||||
logical_replication_sync(vanilla_pg, primary)
|
||||
assert vanilla_pg.safe_psql("select count(*) from t")[0][0] == n_records
|
||||
|
||||
# Check that LR slot is not copied to replica
|
||||
s_cur.execute("select count(*) from pg_replication_slots")
|
||||
assert s_cur.fetchall()[0][0] == 0
|
||||
@@ -37,7 +37,7 @@ async def test_websockets(static_proxy: NeonProxy):
|
||||
startup_message.extend(b"\0")
|
||||
length = (4 + len(startup_message)).to_bytes(4, byteorder="big")
|
||||
|
||||
await websocket.send([length, bytes(startup_message)])
|
||||
await websocket.send([length, startup_message])
|
||||
|
||||
startup_response = await websocket.recv()
|
||||
assert isinstance(startup_response, bytes)
|
||||
|
||||
@@ -256,7 +256,6 @@ def test_sharding_split_compaction(
|
||||
# Cleanup part 1: while layers are still in PITR window, we should only drop layers that are fully redundant
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
assert ps is not None
|
||||
|
||||
# Invoke compaction: this should drop any layers that don't overlap with the shard's key stripes
|
||||
detail_before = ps.http_client().timeline_detail(shard, timeline_id)
|
||||
|
||||
@@ -1237,7 +1237,6 @@ def test_storage_controller_tenant_deletion(
|
||||
# Assert attachments all have local content
|
||||
for shard_id in shard_ids:
|
||||
pageserver = env.get_tenant_pageserver(shard_id)
|
||||
assert pageserver is not None
|
||||
assert pageserver.tenant_dir(shard_id).exists()
|
||||
|
||||
# Assert all shards have some content in remote storage
|
||||
@@ -2746,7 +2745,6 @@ def test_storage_controller_validate_during_migration(neon_env_builder: NeonEnvB
|
||||
|
||||
# Upload but don't compact
|
||||
origin_pageserver = env.get_tenant_pageserver(tenant_id)
|
||||
assert origin_pageserver is not None
|
||||
dest_ps_id = [p.id for p in env.pageservers if p.id != origin_pageserver.id][0]
|
||||
origin_pageserver.http_client().timeline_checkpoint(
|
||||
tenant_id, timeline_id, wait_until_uploaded=True, compact=False
|
||||
|
||||
@@ -245,7 +245,6 @@ def test_scrubber_physical_gc_ancestors(
|
||||
workload.write_rows(100, upload=False)
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
assert ps is not None
|
||||
log.info(f"Waiting for shard {shard} on pageserver {ps.id}")
|
||||
ps.http_client().timeline_checkpoint(
|
||||
shard, timeline_id, compact=False, wait_until_uploaded=True
|
||||
@@ -271,7 +270,6 @@ def test_scrubber_physical_gc_ancestors(
|
||||
workload.churn_rows(100)
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
assert ps is not None
|
||||
ps.http_client().timeline_compact(shard, timeline_id, force_image_layer_creation=True)
|
||||
ps.http_client().timeline_gc(shard, timeline_id, 0)
|
||||
|
||||
@@ -338,15 +336,12 @@ def test_scrubber_physical_gc_timeline_deletion(neon_env_builder: NeonEnvBuilder
|
||||
|
||||
# Issue a deletion queue flush so that the parent shard can't leave behind layers
|
||||
# that will look like unexpected garbage to the scrubber
|
||||
ps = env.get_tenant_pageserver(tenant_id)
|
||||
assert ps is not None
|
||||
ps.http_client().deletion_queue_flush(execute=True)
|
||||
env.get_tenant_pageserver(tenant_id).http_client().deletion_queue_flush(execute=True)
|
||||
|
||||
new_shard_count = 4
|
||||
shards = env.storage_controller.tenant_shard_split(tenant_id, shard_count=new_shard_count)
|
||||
for shard in shards:
|
||||
ps = env.get_tenant_pageserver(shard)
|
||||
assert ps is not None
|
||||
log.info(f"Waiting for shard {shard} on pageserver {ps.id}")
|
||||
ps.http_client().timeline_checkpoint(
|
||||
shard, timeline_id, compact=False, wait_until_uploaded=True
|
||||
|
||||
@@ -315,7 +315,6 @@ def test_single_branch_get_tenant_size_grows(
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> tuple[Lsn, int]:
|
||||
size = 0
|
||||
consistent = False
|
||||
size_debug = None
|
||||
|
||||
@@ -361,7 +360,7 @@ def test_single_branch_get_tenant_size_grows(
|
||||
collected_responses.append(("CREATE", current_lsn, size))
|
||||
|
||||
batch_size = 100
|
||||
prev_size = 0
|
||||
|
||||
for i in range(3):
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute(
|
||||
|
||||
@@ -146,7 +146,6 @@ def test_threshold_based_eviction(
|
||||
out += [f" {remote} {layer.layer_file_name}"]
|
||||
return "\n".join(out)
|
||||
|
||||
stable_for: float = 0
|
||||
observation_window = 8 * eviction_threshold
|
||||
consider_stable_when_no_change_for_seconds = 3 * eviction_threshold
|
||||
poll_interval = eviction_threshold / 3
|
||||
|
||||
@@ -1506,10 +1506,15 @@ class SafekeeperEnv:
|
||||
port=port.http,
|
||||
auth_token=None,
|
||||
)
|
||||
safekeeper_process = start_in_background(
|
||||
cmd, safekeeper_dir, "safekeeper.log", safekeeper_client.check_status
|
||||
)
|
||||
return safekeeper_process
|
||||
try:
|
||||
safekeeper_process = start_in_background(
|
||||
cmd, safekeeper_dir, "safekeeper.log", safekeeper_client.check_status
|
||||
)
|
||||
return safekeeper_process
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
safekeeper_process.kill()
|
||||
raise Exception(f"Failed to start safekepeer as {cmd}, reason: {e}") from e
|
||||
|
||||
def get_safekeeper_connstrs(self):
|
||||
assert self.safekeepers is not None, "safekeepers are not initialized"
|
||||
|
||||
@@ -64,7 +64,6 @@ def test_wal_restore(
|
||||
),
|
||||
str(data_dir),
|
||||
str(port),
|
||||
env.pg_version,
|
||||
]
|
||||
)
|
||||
restored.start()
|
||||
@@ -128,7 +127,6 @@ def test_wal_restore_initdb(
|
||||
),
|
||||
str(data_dir),
|
||||
str(port),
|
||||
env.pg_version,
|
||||
]
|
||||
)
|
||||
restored.start()
|
||||
|
||||
@@ -64,7 +64,7 @@ rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
|
||||
regex-syntax = { version = "0.8" }
|
||||
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
|
||||
reqwest = { version = "0.12", default-features = false, features = ["blocking", "json", "rustls-tls", "stream"] }
|
||||
rustls = { version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
|
||||
scopeguard = { version = "1" }
|
||||
serde = { version = "1", features = ["alloc", "derive"] }
|
||||
|
||||
Reference in New Issue
Block a user