Compare commits

..

7 Commits

Author SHA1 Message Date
Alexey Kondratov
89cc2c517a Polish API handler and refresh OpenAPI spec 2023-04-05 23:35:35 +03:00
Alexey Kondratov
7cdf703345 Use Condvar and make configuration API blocking 2023-04-05 23:19:10 +03:00
Alexey Kondratov
70383087be Allow starting compute_ctl without spec
With this commit one can start compute with something like
```shell
cargo run --bin compute_ctl -- -i no-compute \
 -p http://localhost:9095 \
 -D compute_pgdata \
 -C "postgresql://cloud_admin@127.0.0.1:5434/postgres" \
 -b ./pg_install/v15/bin/postgres
```
and it will hang waiting for spec.

Then send one spec
```shell
curl -d "$(cat ./compute-spec.json)" http://localhost:3080/spec
```
Postgres will be started and configured.

Then reconfigure it with
```shell
curl -d "$(cat ./compute-spec-new.json)" http://localhost:3080/spec
```

Most of safeguards and comments are added. Some polishing especially
around HTTP API is still needed.
2023-04-05 22:09:43 +03:00
Alexey Kondratov
66dd3f8ca5 Implement live reconfiguration in the compute_ctl
Accept spec in JSON format and request compute reconfiguration from
the configurator thread. If anything goes wrong after we set the
compute state to `ConfigurationPending` and / or sent spec to the
configurator thread, we basically leave compute in the potentially
wrong state. That said, it's control-plane's responsibility to
watch compute state after reconfiguration request and to clean
restart it in case of errors.

It still lacks ability of starting up without spec and some validations,
i.e. that live reconfiguration should be only available with
`--compute-id` and `--control-plane-uri` options.

Otherwise, it works fine and could be tested by running `compute_ctl`
locally, then sending it a new spec:
```shell
curl -d "$(cat ./compute-spec-new.json)" http://localhost:3080/spec
```

We have one configurator thread and async http server, so generally we
have single consumer - multiple producers pattern here. That's why we
use `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of
producers is limited to one due to code logic, but we still need an
ability to potentially pass `Sender` to several threads.

Next, we use async `hyper` + `tokio` http server, but all the other code
is completely synchronous. So we need to send data from async to sync,
that's why we use `mpsc::unbounded_channel` here, not `mpsc::channel`.
It doesn't make much sense to rewrite all code to async now, but we can
consider doing this in the future.

I think that a combination of `Mutex` and `CondVar` would work just fine
too, but as we already have `tokio`, I decided to try something from it.
2023-04-05 21:31:44 +03:00
Heikki Linnakangas
1f2946af17 try to fix tests 2023-04-05 20:04:14 +03:00
Heikki Linnakangas
2735f1c41e Rename "Postgres nodes" in control_plane to endpoints.
We use the term "endpoint" in for compute Postgres nodes in the web UI
and user-facing documentation now. Adjust the nomenclature in the code.

This changes the name of the "neon_local pg" command to "neon_local
endpoint". Also adjust names of classes, variables etc. in the python
tests accordingly.

This also changes the directory structure so that endpoints are now
stored in:

    .neon/endpoints/<endpoint id>

instead of:

    .neon/pgdatadirs/tenants/<tenant_id>/<endpoint (node) name>

The tenant ID is no longer part of the path. That means that you
cannot have two endpoints with the same name/ID in two different
tenants anymore. That's consistent with how we treat endpoints in the
real control plane and proxy: the endpoint ID must be globally unique.
2023-04-05 19:49:25 +03:00
Heikki Linnakangas
8e06018dae Move compute_ctl structs used in HTTP API and spec file to separate crate.
This is in preparation of using compute_ctl to launch postgres nodes
in the neon_local control plane. And seems like a good idea to
separate the public interfaces anyway.

One non-mechanical change here is that we now use a RwLock rather than
atomics to protect the ComputeNode::metrics field. We were not using
atomics for performance but for convenience here, and an RwLock is now
more convenient.
2023-04-05 19:49:08 +03:00
137 changed files with 2615 additions and 2994 deletions

View File

@@ -14,12 +14,6 @@ inputs:
api_host:
desctiption: 'Neon API host'
default: console.stage.neon.tech
provisioner:
desctiption: 'k8s-pod or k8s-neonvm'
default: 'k8s-pod'
compute_units:
desctiption: '[Min, Max] compute units; Min and Max are used for k8s-neonvm with autoscaling, for k8s-pod values Min and Max should be equal'
default: '[1, 1]'
outputs:
dsn:
@@ -37,10 +31,6 @@ runs:
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
if [ "${PROVISIONER}" == "k8s-pod" ] && [ "${MIN_CU}" != "${MAX_CU}" ]; then
echo >&2 "For k8s-pod provisioner MIN_CU should be equal to MAX_CU"
fi
project=$(curl \
"https://${API_HOST}/api/v2/projects" \
--fail \
@@ -52,9 +42,6 @@ runs:
\"name\": \"Created by actions/neon-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"pg_version\": ${POSTGRES_VERSION},
\"region_id\": \"${REGION_ID}\",
\"provisioner\": \"${PROVISIONER}\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": { }
}
}")
@@ -75,6 +62,3 @@ runs:
API_KEY: ${{ inputs.api_key }}
REGION_ID: ${{ inputs.region_id }}
POSTGRES_VERSION: ${{ inputs.postgres_version }}
PROVISIONER: ${{ inputs.provisioner }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}

View File

@@ -3,8 +3,6 @@
# fetch params from meta-data service
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
AZ_ID=$(curl -s http://169.254.169.254/latest/meta-data/placement/availability-zone)
INSTANCE_TYPE=$(curl -s http://169.254.169.254/latest/meta-data/instance-type)
DISK_SIZE=$(df -B1 /storage | tail -1 | awk '{print $2}')
# store fqdn hostname in var
HOST=$(hostname -f)
@@ -20,9 +18,7 @@ cat <<EOF | tee /tmp/payload
"http_host": "${HOST}",
"http_port": 9898,
"active": false,
"availability_zone_id": "${AZ_ID}",
"disk_size": ${DISK_SIZE},
"instance_type": "${INSTANCE_TYPE}"
"availability_zone_id": "${AZ_ID}"
}
EOF

View File

@@ -23,7 +23,6 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.build/management/api/v2"
domain: "*.us-east-2.aws.neon.build"
extraDomains: ["*.us-east-2.postgres.zenith.tech", "*.us-east-2.retooldb-staging.com"]
sentryEnvironment: "staging"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.build/billing/api/v1/usage_events"

View File

@@ -24,7 +24,6 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.ap-southeast-1.aws.neon.tech"
# extraDomains: ["*.ap-southeast-1.retooldb.com", "*.ap-southeast-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -24,7 +24,6 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.eu-central-1.aws.neon.tech"
# extraDomains: ["*.eu-central-1.retooldb.com", "*.eu-central-1.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -24,7 +24,6 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-east-2.aws.neon.tech"
# extraDomains: ["*.us-east-2.retooldb.com", "*.us-east-2.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -24,7 +24,6 @@ settings:
authBackend: "console"
authEndpoint: "http://neon-internal-api.aws.neon.tech/management/api/v2"
domain: "*.us-west-2.aws.neon.tech"
# extraDomains: ["*.us-west-2.retooldb.com", "*.us-west-2.postgres.vercel-storage.com"]
sentryEnvironment: "production"
wssPort: 8443
metricCollectionEndpoint: "http://neon-internal-api.aws.neon.tech/billing/api/v1/usage_events"

View File

@@ -107,65 +107,25 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
generate-matrices:
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
#
# Available platforms:
# - neon-captest-new: Freshly created project (1 CU)
# - neon-captest-freetier: Use freetier-sized compute (0.25 CU)
# - neon-captest-reuse: Reusing existing project
# - rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# - rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
runs-on: ubuntu-latest
outputs:
pgbench-compare-matrix: ${{ steps.pgbench-compare-matrix.outputs.matrix }}
olap-compare-matrix: ${{ steps.olap-compare-matrix.outputs.matrix }}
steps:
- name: Generate matrix for pgbench benchmark
id: pgbench-compare-matrix
run: |
matrix='{
"platform": [
"neon-captest-new",
"neon-captest-reuse"
],
"db_size": [ "10gb" ],
"include": [
{ "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "platform": "neon-captest-new", "db_size": "50gb" }
]
}'
if [ "$(date +%A)" = "Saturday" ]; then
matrix=$(echo $matrix | jq '.include += [{ "platform": "rds-postgres", "db_size": "10gb"},
{ "platform": "rds-aurora", "db_size": "50gb"}]')
fi
echo "matrix=$(echo $matrix | jq --compact-output '.')" >> $GITHUB_OUTPUT
- name: Generate matrix for OLAP benchmarks
id: olap-compare-matrix
run: |
matrix='{
"platform": [
"neon-captest-reuse"
]
}'
if [ "$(date +%A)" = "Saturday" ]; then
matrix=$(echo $matrix | jq '.include += [{ "platform": "rds-postgres" },
{ "platform": "rds-aurora" }]')
fi
echo "matrix=$(echo $matrix | jq --compact-output '.')" >> $GITHUB_OUTPUT
pgbench-compare:
needs: [ generate-matrices ]
strategy:
fail-fast: false
matrix: ${{fromJson(needs.generate-matrices.outputs.pgbench-compare-matrix)}}
matrix:
# neon-captest-new: Run pgbench in a freshly created project
# neon-captest-reuse: Same, but reusing existing project
# neon-captest-prefetch: Same, with prefetching enabled (new project)
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-reuse, neon-captest-prefetch, rds-postgres ]
db_size: [ 10gb ]
runner: [ us-east-2 ]
include:
- platform: neon-captest-prefetch
db_size: 50gb
runner: us-east-2
- platform: rds-aurora
db_size: 50gb
runner: us-east-2
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
@@ -177,7 +137,7 @@ jobs:
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
PLATFORM: ${{ matrix.platform }}
runs-on: [ self-hosted, us-east-2, x64 ]
runs-on: [ self-hosted, "${{ matrix.runner }}", x64 ]
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
@@ -200,14 +160,13 @@ jobs:
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
- name: Create Neon Project
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier"]'), matrix.platform)
if: contains(fromJson('["neon-captest-new", "neon-captest-prefetch"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
compute_units: ${{ (matrix.platform == 'neon-captest-freetier' && '[0.25, 0.25]') || '[1, 1]' }}
- name: Set up Connection String
id: set-up-connstr
@@ -216,7 +175,7 @@ jobs:
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
neon-captest-new | neon-captest-freetier)
neon-captest-new | neon-captest-prefetch)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -226,7 +185,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-freetier', 'rds-aurora', or 'rds-postgres'"
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'neon-captest-new', 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -235,6 +194,17 @@ jobs:
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
DB_NAME=$(psql ${BENCHMARK_CONNSTR} --no-align --quiet -t -c "SELECT current_database()")
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET effective_io_concurrency=32"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET maintenance_io_concurrency=32"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Benchmark init
uses: ./.github/actions/run-python-test-set
with:
@@ -306,11 +276,15 @@ jobs:
# *_CLICKBENCH_CONNSTR: Genuine ClickBench DB with ~100M rows
# *_CLICKBENCH_10M_CONNSTR: DB with the first 10M rows of ClickBench DB
if: success() || failure()
needs: [ generate-matrices, pgbench-compare ]
needs: [ pgbench-compare ]
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
matrix:
# neon-captest-prefetch: We have pre-created projects with prefetch enabled
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-prefetch, rds-postgres, rds-aurora ]
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
@@ -346,7 +320,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-reuse)
neon-captest-prefetch)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR }}
;;
rds-aurora)
@@ -356,7 +330,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -365,6 +339,17 @@ jobs:
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
DB_NAME=$(psql ${BENCHMARK_CONNSTR} --no-align --quiet -t -c "SELECT current_database()")
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET effective_io_concurrency=32"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET maintenance_io_concurrency=32"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: ClickBench benchmark
uses: ./.github/actions/run-python-test-set
with:
@@ -402,11 +387,15 @@ jobs:
#
# *_TPCH_S10_CONNSTR: DB generated with scale factor 10 (~10 GB)
if: success() || failure()
needs: [ generate-matrices, clickbench-compare ]
needs: [ clickbench-compare ]
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
matrix:
# neon-captest-prefetch: We have pre-created projects with prefetch enabled
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-prefetch, rds-postgres, rds-aurora ]
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
@@ -442,7 +431,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-reuse)
neon-captest-prefetch)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_TPCH_S10_CONNSTR }}
;;
rds-aurora)
@@ -452,7 +441,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_TPCH_S10_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -461,6 +450,17 @@ jobs:
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
DB_NAME=$(psql ${BENCHMARK_CONNSTR} --no-align --quiet -t -c "SELECT current_database()")
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET effective_io_concurrency=32"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET maintenance_io_concurrency=32"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Run TPC-H benchmark
uses: ./.github/actions/run-python-test-set
with:
@@ -492,11 +492,15 @@ jobs:
user-examples-compare:
if: success() || failure()
needs: [ generate-matrices, tpch-compare ]
needs: [ tpch-compare ]
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.generate-matrices.outputs.olap-compare-matrix) }}
matrix:
# neon-captest-prefetch: We have pre-created projects with prefetch enabled
# rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
platform: [ neon-captest-prefetch, rds-postgres, rds-aurora ]
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
@@ -532,7 +536,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neon-captest-reuse)
neon-captest-prefetch)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_CAPTEST_CONNSTR }}
;;
rds-aurora)
@@ -542,7 +546,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_POSTGRES_CONNSTR }}
;;
*)
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo 2>&1 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-prefetch', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -551,6 +555,17 @@ jobs:
psql ${CONNSTR} -c "SELECT version();"
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |
DB_NAME=$(psql ${BENCHMARK_CONNSTR} --no-align --quiet -t -c "SELECT current_database()")
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET enable_seqscan_prefetch=on"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET effective_io_concurrency=32"
psql ${BENCHMARK_CONNSTR} -c "ALTER DATABASE ${DB_NAME} SET maintenance_io_concurrency=32"
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Run user examples
uses: ./.github/actions/run-python-test-set
with:

View File

@@ -338,7 +338,6 @@ jobs:
rerun_flaky: true
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
- name: Merge and upload coverage data
if: matrix.build_type == 'debug'
@@ -410,8 +409,6 @@ jobs:
steps.create-allure-report-release.outputs.report-url
)
with:
# Retry script for 5XX server errors: https://github.com/actions/github-script#retries
retries: 5
script: |
const reports = [{
buildType: "debug",

2
Cargo.lock generated
View File

@@ -879,7 +879,6 @@ dependencies = [
"tracing-subscriber",
"tracing-utils",
"url",
"utils",
"workspace_hack",
]
@@ -3368,7 +3367,6 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"toml_edit",
"tracing",

View File

@@ -101,7 +101,6 @@ test-context = "0.1"
thiserror = "1.0"
tls-listener = { version = "0.6", features = ["rustls", "hyper-h1"] }
tokio = { version = "1.17", features = ["macros"] }
tokio-io-timeout = "1.2.0"
tokio-postgres-rustls = "0.9.0"
tokio-rustls = "0.23"
tokio-stream = "0.1"

View File

@@ -38,7 +38,6 @@ RUN cd postgres && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/moddatetime.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_stat_statements.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
@@ -301,27 +300,6 @@ RUN wget https://github.com/okbob/plpgsql_check/archive/refs/tags/v2.3.2.tar.gz
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plpgsql_check.control
#########################################################################################
#
# Layer "timescaledb-pg-build"
# compile timescaledb extension
#
#########################################################################################
FROM build-deps AS timescaledb-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin:$PATH"
RUN apt-get update && \
apt-get install -y cmake && \
wget https://github.com/timescale/timescaledb/archive/refs/tags/2.10.1.tar.gz -O timescaledb.tar.gz && \
mkdir timescaledb-src && cd timescaledb-src && tar xvzf ../timescaledb.tar.gz --strip-components=1 -C . && \
./bootstrap -DSEND_TELEMETRY_DEFAULT:BOOL=OFF -DUSE_TELEMETRY:BOOL=OFF -DAPACHE_ONLY:BOOL=ON && \
cd build && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make install -j $(getconf _NPROCESSORS_ONLN) && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/timescaledb.control
#########################################################################################
#
# Layer "rust extensions"
@@ -426,7 +404,6 @@ COPY --from=pgtap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=prefix-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=hll-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plpgsql-check-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=timescaledb-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -147,15 +147,15 @@ Created an initial timeline 'de200bd42b49cc1814412c7e592dd6e9' at Lsn 0/16B5A50
Setting tenant 9ef87a5bf0d92544f6fafeeb3239695c as a default one
# start postgres compute node
> ./target/debug/neon_local pg start main
Starting new postgres (v14) main on timeline de200bd42b49cc1814412c7e592dd6e9 ...
> ./target/debug/neon_local endpoint start main
Starting new endpoint main (PostgreSQL v14) on timeline de200bd42b49cc1814412c7e592dd6e9 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/main port=55432
Starting postgres node at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
Starting postgres at 'host=127.0.0.1 port=55432 user=cloud_admin dbname=postgres'
# check list of running postgres instances
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16B5BA8 running
```
2. Now, it is possible to connect to postgres and run some queries:
@@ -184,14 +184,14 @@ Created timeline 'b3b863fa45fa9e57e615f9f2d944e601' at Lsn 0/16F9A00 for tenant:
(L) ┗━ @0/16F9A00: migration_check [b3b863fa45fa9e57e615f9f2d944e601]
# start postgres on that branch
> ./target/debug/neon_local pg start migration_check --branch-name migration_check
Starting new postgres migration_check on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
> ./target/debug/neon_local endpoint start migration_check --branch-name migration_check
Starting new endpoint migration_check (PostgreSQL v14) on timeline b3b863fa45fa9e57e615f9f2d944e601 ...
Extracting base backup to create postgres instance: path=.neon/pgdatadirs/tenants/9ef87a5bf0d92544f6fafeeb3239695c/migration_check port=55433
Starting postgres node at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
Starting postgres at 'host=127.0.0.1 port=55433 user=cloud_admin dbname=postgres'
# check the new list of running postgres instances
> ./target/debug/neon_local pg list
NODE ADDRESS TIMELINE BRANCH NAME LSN STATUS
> ./target/debug/neon_local endpoint list
ENDPOINT ADDRESS TIMELINE BRANCH NAME LSN STATUS
main 127.0.0.1:55432 de200bd42b49cc1814412c7e592dd6e9 main 0/16F9A38 running
migration_check 127.0.0.1:55433 b3b863fa45fa9e57e615f9f2d944e601 migration_check 0/16F9A70 running

View File

@@ -28,5 +28,4 @@ tracing-utils.workspace = true
url.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -34,23 +34,24 @@ use std::fs::File;
use std::panic;
use std::path::Path;
use std::process::exit;
use std::sync::{mpsc, Arc, Condvar, Mutex};
use std::sync::{Arc, Condvar, Mutex};
use std::{thread, time::Duration};
use anyhow::{Context, Result};
use anyhow::{anyhow, Context, Result};
use chrono::Utc;
use clap::Arg;
use tracing::{error, info};
use url::Url;
use compute_api::responses::ComputeStatus;
use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus};
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::compute::{ComputeNode, ComputeNodeInner, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
use compute_tools::spec::get_spec_from_control_plane;
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
@@ -103,24 +104,37 @@ fn main() -> Result<()> {
}
};
let mut new_state = ComputeState::new();
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
spec_set = true;
} else {
spec_set = false;
}
// Volatile compute state under mutex and condition variable to notify everyone
// who is interested in the state changes.
let compute_node = ComputeNode {
start_time: Utc::now(),
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
inner: Mutex::new(ComputeNodeInner {
state: ComputeState {
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
},
spec: None,
metrics: ComputeMetrics::default(),
}),
state_changed: Condvar::new()
};
// If we have a spec already, go immediately into Init state.
let spec_set = spec.is_some();
if let Some(spec) = spec {
let mut inner = compute_node.inner.lock().unwrap();
let parsed_spec = ParsedSpec::try_from(spec)
.map_err(|msg| anyhow!("error parsing compute spec: {msg}"))?;
inner.spec = Some(parsed_spec);
inner.state.status = ComputeStatus::Init;
}
let compute = Arc::new(compute_node);
// Launch http service first, so we were able to serve control-plane
@@ -128,27 +142,27 @@ fn main() -> Result<()> {
let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread");
if !spec_set {
// No spec provided, hang waiting for it.
// No spec was provided earlier, hang waiting for it.
info!("no compute spec provided, waiting");
let mut state = compute.state.lock().unwrap();
while state.status != ComputeStatus::ConfigurationPending {
state = compute.state_changed.wait(state).unwrap();
if state.status == ComputeStatus::ConfigurationPending {
let mut inner = compute.inner.lock().unwrap();
while inner.state.status != ComputeStatus::ConfigurationPending {
inner = compute.state_changed.wait(inner).unwrap();
if inner.state.status == ComputeStatus::ConfigurationPending {
info!("got spec, continue configuration");
// Spec is already set by the http server handler.
inner.state.status = ComputeStatus::Init;
break;
}
}
}
};
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
let pspec = state.pspec.as_ref().expect("spec must be set");
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
state.status = ComputeStatus::Init;
compute.state_changed.notify_all();
drop(state);
// We got the spec. Start up
let startup_tracing_context = {
let inner = compute.inner.lock().unwrap();
inner.spec.as_ref().unwrap().spec.startup_tracing_context.clone()
};
// Extract OpenTelemetry context for the startup actions from the spec, and
// attach it to the current tracing context.
@@ -175,6 +189,8 @@ fn main() -> Result<()> {
// Launch remaining service threads
let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread");
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// Start Postgres
let mut delay_exit = false;
@@ -183,10 +199,10 @@ fn main() -> Result<()> {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:?}", err);
let mut state = compute.state.lock().unwrap();
state.error = Some(format!("{:?}", err));
state.status = ComputeStatus::Failed;
drop(state);
let mut inner = compute.inner.lock().unwrap();
inner.state.error = Some(format!("{:?}", err));
inner.state.status = ComputeStatus::Failed;
drop(inner);
delay_exit = true;
None
}
@@ -216,25 +232,10 @@ fn main() -> Result<()> {
thread::sleep(Duration::from_secs(30));
}
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit. Shutting down OTEL tracing provider may
// hang for quite some time, see, for example:
// - https://github.com/open-telemetry/opentelemetry-rust/issues/868
// - and our problems with staging https://github.com/neondatabase/cloud/issues/3707#issuecomment-1493983636
//
// Yet, we want computes to shut down fast enough, as we may need a new one
// for the same timeline ASAP. So wait no longer than 2s for the shutdown to
// complete, then just error out and exit the main thread.
info!("shutting down tracing");
let (sender, receiver) = mpsc::channel();
let _ = thread::spawn(move || {
tracing_utils::shutdown_tracing();
sender.send(()).ok()
});
let shutdown_res = receiver.recv_timeout(Duration::from_millis(2000));
if shutdown_res.is_err() {
error!("timed out while shutting down tracing, exiting anyway");
}
// Shutdown trace pipeline gracefully, so that it has a chance to send any
// pending traces before we exit.
tracing_utils::shutdown_tracing();
info!("shutting down");
exit(exit_code.unwrap_or(1))

View File

@@ -26,10 +26,8 @@ use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use tokio_postgres;
use tracing::{info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus};
use compute_api::spec::ComputeSpec;
use crate::checker::create_writability_check_data;
@@ -44,66 +42,47 @@ pub struct ComputeNode {
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
/// - we start compute with some spec provided as argument
/// - we push new spec and it does reconfiguration
/// - but then something happens and compute pod / VM is destroyed,
/// so k8s controller starts it again with the **old** spec
/// and the same for empty computes:
/// - we started compute without any spec
/// - we push spec and it does configuration
/// - but then it is restarted without any spec again
// We only allow live re- / configuration of the compute node if
// it uses 'pull model', i.e. it can go to control-plane and fetch
// the latest configuration. Otherwise, there could be a case:
// - we start compute with some spec provided as argument
// - we push new spec and it does reconfiguration
// - but then something happens and compute pod / VM is destroyed,
// so k8s controller starts it again with the **old** spec
pub live_config_allowed: bool,
/// Volatile part of the `ComputeNode`, which should be used under `Mutex`.
/// To allow HTTP API server to serving status requests, while configuration
/// is in progress, lock should be held only for short periods of time to do
/// read/write, not the whole configuration process.
pub state: Mutex<ComputeState>,
/// `Condvar` to allow notifying waiters about state changes.
/// Coupled with `Condvar` to allow notifying HTTP API and configurator
/// thread about state changes. To allow HTTP API server to serving status
/// requests, while configuration is in progress, lock should be held only
/// for short periods of time to do read/write, not the whole configuration
/// process.
pub inner: Mutex<ComputeNodeInner>,
pub state_changed: Condvar,
}
#[derive(Clone, Debug)]
pub struct ComputeState {
pub status: ComputeStatus,
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
pub error: Option<String>,
pub pspec: Option<ParsedSpec>,
pub struct ComputeNodeInner {
pub state: ComputeState,
pub spec: Option<ParsedSpec>,
pub metrics: ComputeMetrics,
}
impl ComputeState {
pub fn new() -> Self {
Self {
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
pspec: None,
metrics: ComputeMetrics::default(),
}
}
}
impl Default for ComputeState {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// extra fields extracted from 'spec'.
pub tenant: String,
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
let pageserver_connstr = spec
.cluster
@@ -111,40 +90,44 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.find("neon.pageserver_connstring")
.ok_or("pageserver connstr should be provided")?;
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = spec
let tenant = spec
.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?;
let timeline_id: TimelineId = spec
.ok_or("tenant id should be provided")?;
let timeline = spec
.cluster
.settings
.find("neon.timeline_id")
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?;
.ok_or("tenant id should be provided")?;
Ok(ParsedSpec {
spec,
pageserver_connstr,
storage_auth_token,
tenant_id,
timeline_id,
tenant,
timeline,
})
}
}
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
state.status = status;
let mut inner = self.inner.lock().unwrap();
inner.state.status = status;
self.state_changed.notify_all();
}
pub fn get_status(&self) -> ComputeStatus {
self.state.lock().unwrap().status
self.inner.lock().unwrap().state.status
}
pub fn get_state(&self) -> ComputeState {
self.inner.lock().unwrap().state.clone()
}
pub fn get_metrics(&self) -> ComputeMetrics {
self.inner.lock().unwrap().metrics.clone()
}
// Remove `pgdata` directory and create it again with right permissions.
@@ -160,9 +143,8 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
#[instrument(skip(self, spec))]
fn get_basebackup(&self, spec: &ParsedSpec, lsn: &str) -> Result<()> {
let start_time = Utc::now();
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
@@ -178,8 +160,8 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let basebackup_cmd = match lsn {
Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id), // First start of the compute
_ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
"0/0" => format!("basebackup {} {}", &spec.tenant, &spec.timeline), // First start of the compute
_ => format!("basebackup {} {} {}", &spec.tenant, &spec.timeline, lsn),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -192,7 +174,7 @@ impl ComputeNode {
ar.set_ignore_zeros(true);
ar.unpack(&self.pgdata)?;
self.state.lock().unwrap().metrics.basebackup_ms = Utc::now()
self.inner.lock().unwrap().metrics.basebackup_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
@@ -203,7 +185,7 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self, storage_auth_token))]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<String> {
let start_time = Utc::now();
let sync_handle = Command::new(&self.pgbin)
@@ -234,42 +216,41 @@ impl ComputeNode {
);
}
self.state.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
self.inner.lock().unwrap().metrics.sync_safekeepers_ms = Utc::now()
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim());
Ok(lsn)
}
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
#[instrument(skip(self, spec))]
fn prepare_pgdata(&self, spec: &ParsedSpec) -> Result<()> {
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec.spec)?;
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.sync_safekeepers(spec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
info!(
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
lsn, &spec.pageserver_connstr
);
self.get_basebackup(compute_state, lsn).with_context(|| {
self.get_basebackup(spec, &lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
lsn, &spec.pageserver_connstr
)
})?;
@@ -305,8 +286,8 @@ impl ComputeNode {
}
/// Do initial configuration of the already started Postgres.
#[instrument(skip(self, compute_state))]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {
#[instrument(skip(self, spec))]
fn apply_config(&self, spec: &ParsedSpec) -> Result<()> {
// If connection fails,
// it may be the old node with `zenith_admin` superuser.
//
@@ -337,20 +318,64 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
handle_roles(&spec.spec, &mut client)?;
handle_databases(&spec.spec, &mut client)?;
handle_role_deletions(&spec.spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec.spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(spec, &mut client)?;
handle_extensions(&spec.spec, &mut client)?;
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
spec.cluster.cluster_id
spec.spec.cluster.cluster_id
);
Ok(())
}
// We could've wrapped this around `pg_ctl reload`, but right now we don't use
// `pg_ctl` for start / stop, so this just seems much easier to do as we already
// have opened connection to Postgres and superuser access.
#[instrument(skip(self, client))]
fn pg_reload_conf(&self, client: &mut Client) -> Result<()> {
client.simple_query("SELECT pg_reload_conf()")?;
Ok(())
}
/// Similar to `apply_config()`, but does a bit different sequence of operations,
/// as it's used to reconfigure a previously started and configured Postgres node.
#[instrument(skip(self))]
pub fn reconfigure(&self) -> Result<()> {
let spec = {
let inner = self.inner.lock().unwrap();
inner.spec.as_ref().expect("cannot start_compute without spec").spec.clone()
};
// Write new config
let pgdata_path = Path::new(&self.pgdata);
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
let mut client = Client::connect(self.connstr.as_str(), NoTls)?;
self.pg_reload_conf(&mut client)?;
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_extensions(&spec, &mut client)?;
// 'Close' connection
drop(client);
let unknown_op = "unknown".to_string();
let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op);
info!(
"finished reconfiguration of compute node for operation {}",
op_id
);
Ok(())
@@ -358,33 +383,39 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = self
.inner
.lock()
.unwrap()
.spec
.as_ref()
.expect("cannot start_compute without spec")
.clone();
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
spec.spec.cluster.cluster_id,
spec.spec.operation_uuid.as_deref().unwrap_or("None"),
spec.tenant_id,
spec.timeline_id,
spec.spec.operation_uuid.as_ref().unwrap(),
spec.tenant,
spec.timeline,
);
self.prepare_pgdata(&compute_state)?;
self.prepare_pgdata(&spec)?;
let start_time = Utc::now();
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
self.apply_config(&compute_state)?;
self.apply_config(&spec)?;
let startup_end_time = Utc::now();
{
let mut state = self.state.lock().unwrap();
state.metrics.config_ms = startup_end_time
let mut inner = self.inner.lock().unwrap();
inner.metrics.config_ms = startup_end_time
.signed_duration_since(start_time)
.to_std()
.unwrap()
.as_millis() as u64;
state.metrics.total_startup_ms = startup_end_time
inner.metrics.total_startup_ms = startup_end_time
.signed_duration_since(self.start_time)
.to_std()
.unwrap()

View File

@@ -0,0 +1,53 @@
use std::sync::Arc;
use std::thread;
use anyhow::Result;
use tracing::{error, info, instrument};
use crate::compute::ComputeNode;
use compute_api::models::ComputeStatus;
#[instrument(skip(compute))]
fn configurator_main_loop(compute: &Arc<ComputeNode>) {
info!("waiting for reconfiguration requests");
loop {
let inner = compute.inner.lock().unwrap();
let mut inner = compute.state_changed.wait(inner).unwrap();
if inner.state.status == ComputeStatus::ConfigurationPending {
info!("got configuration request");
inner.state.status = ComputeStatus::Configuration;
compute.state_changed.notify_all();
drop(inner);
let mut new_status = ComputeStatus::Failed;
if let Err(e) = compute.reconfigure() {
error!("could not configure compute node: {}", e);
} else {
new_status = ComputeStatus::Running;
info!("compute node configured");
}
// XXX: used to test that API is blocking
// std::thread::sleep(std::time::Duration::from_millis(2000));
compute.set_status(new_status);
} else if inner.state.status == ComputeStatus::Failed {
info!("compute node is now in Failed state, exiting");
break;
} else {
info!("woken up for compute status: {:?}, sleeping", inner.state.status);
}
}
}
pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let compute = Arc::clone(compute);
Ok(thread::Builder::new()
.name("compute-configurator".into())
.spawn(move || {
configurator_main_loop(&compute);
info!("configurator thread is exited");
})?)
}

View File

@@ -3,33 +3,54 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use crate::compute::{ComputeNode, ParsedSpec};
use crate::http::models::{ConfigurationRequest, GenericAPIError};
use compute_api::models::ComputeStatus;
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use num_cpus;
use serde_json;
use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
tenant: state
.pspec
.as_ref()
.map(|pspec| pspec.tenant_id.to_string()),
timeline: state
.pspec
.as_ref()
.map(|pspec| pspec.timeline_id.to_string()),
status: state.status,
last_active: state.last_active,
error: state.error.clone(),
async fn handle_spec_request(req: Request<Body>, compute: &Arc<ComputeNode>) -> Result<(), (String, StatusCode)> {
if !compute.live_config_allowed {
return Err(("live reconfiguration is not allowed for this compute node".to_string(), StatusCode::PRECONDITION_FAILED));
}
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
let request = serde_json::from_str::<ConfigurationRequest>(&spec_raw)
.map_err(|err| (format!("could not parse request json: {err}"), StatusCode::BAD_REQUEST))?;
let spec = ParsedSpec::try_from(request.spec)
.map_err(|err| (format!("could not parse spec: {err}"), StatusCode::BAD_REQUEST))?;
let mut inner = compute.inner.lock().unwrap();
if !(inner.state.status == ComputeStatus::Empty
|| inner.state.status == ComputeStatus::Running)
{
return Err((format!(
"invalid compute status for reconfiguration request: {}",
serde_json::to_string(&inner.state).unwrap()
), StatusCode::PRECONDITION_FAILED));
}
inner.spec = Some(spec);
inner.state.status = ComputeStatus::ConfigurationPending;
compute.state_changed.notify_all();
info!("set new spec and notified configurator");
while inner.state.status != ComputeStatus::Running {
inner = compute.state_changed.wait(inner).unwrap();
info!(
"waiting for compute to become Running, current status: {:?}",
inner.state.status
);
}
drop(inner);
Ok(())
}
// Service function to handle all available routes.
@@ -43,16 +64,15 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
// Serialized compute state.
(&Method::GET, "/status") => {
info!("serving /status GET request");
let state = compute.state.lock().unwrap();
let status_response = status_response_from_state(&state);
Response::new(Body::from(serde_json::to_string(&status_response).unwrap()))
let state = compute.get_state();
Response::new(Body::from(serde_json::to_string(&state).unwrap()))
}
// Startup metrics in JSON format. Keep /metrics reserved for a possible
// future use for Prometheus metrics format.
(&Method::GET, "/metrics.json") => {
info!("serving /metrics.json GET request");
let metrics = compute.state.lock().unwrap().metrics.clone();
let metrics = compute.get_metrics();
Response::new(Body::from(serde_json::to_string(&metrics).unwrap()))
}
@@ -100,18 +120,19 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
))
}
// Accept spec in JSON format and request compute configuration. If
// anything goes wrong after we set the compute status to `ConfigurationPending`
// and update compute state with new spec, we basically leave compute
// in the potentially wrong state. That said, it's control-plane's
// responsibility to watch compute state after reconfiguration request
// and to clean restart in case of errors.
// Accept spec in JSON format and request compute configuration from
// the configurator thread. If anything goes wrong after we set the
// compute state to `ConfigurationPending` and / or sent spec to the
// configurator thread, we basically leave compute in the potentially
// wrong state. That said, it's control-plane's responsibility to
// watch compute state after reconfiguration request and to clean
// restart in case of errors.
(&Method::POST, "/configure") => {
info!("serving /configure POST request");
match handle_configure_request(req, compute).await {
Ok(msg) => Response::new(Body::from(msg)),
Err((msg, code)) => {
error!("error handling /configure request: {msg}");
match handle_spec_request(req, compute).await {
Ok(()) => Response::new(Body::from("ok")),
Err((msg, code) ) => {
error!("error handling /spec request: {msg}");
render_json_error(&msg, code)
}
}
@@ -126,84 +147,6 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
async fn handle_configure_request(
req: Request<Body>,
compute: &Arc<ComputeNode>,
) -> Result<String, (String, StatusCode)> {
if !compute.live_config_allowed {
return Err((
"live configuration is not allowed for this compute node".to_string(),
StatusCode::PRECONDITION_FAILED,
));
}
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
let spec = request.spec;
let parsed_spec = match ParsedSpec::try_from(spec) {
Ok(ps) => ps,
Err(msg) => return Err((msg, StatusCode::PRECONDITION_FAILED)),
};
// XXX: wrap state update under lock in code blocks. Otherwise,
// we will try to `Send` `mut state` into the spawned thread
// bellow, which will cause error:
// ```
// error: future cannot be sent between threads safely
// ```
{
let mut state = compute.state.lock().unwrap();
if state.status != ComputeStatus::Empty {
let msg = format!(
"invalid compute status for configuration request: {:?}",
state.status.clone()
);
return Err((msg, StatusCode::PRECONDITION_FAILED));
}
state.pspec = Some(parsed_spec);
state.status = ComputeStatus::ConfigurationPending;
compute.state_changed.notify_all();
drop(state);
info!("set new spec and notified waiters");
}
// Spawn a blocking thread to wait for compute to become Running.
// This is needed to do not block the main pool of workers and
// be able to serve other requests while some particular request
// is waiting for compute to finish configuration.
let c = compute.clone();
task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while state.status != ComputeStatus::Running {
state = c.state_changed.wait(state).unwrap();
info!(
"waiting for compute to become Running, current status: {:?}",
state.status
);
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {:?}", err);
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
}
}
Ok(())
})
.await
.unwrap()?;
// Return current compute state if everything went well.
let state = compute.state.lock().unwrap().clone();
let status_response = status_response_from_state(&state);
Ok(serde_json::to_string(&status_response).unwrap())
} else {
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
}
}
fn render_json_error(e: &str, status: StatusCode) -> Response<Body> {
let error = GenericAPIError {
error: e.to_string(),
@@ -254,7 +197,6 @@ async fn serve(state: Arc<ComputeNode>) {
/// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`.
pub fn launch_http_server(state: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> {
let state = Arc::clone(state);
Ok(thread::Builder::new()
.name("http-endpoint".into())
.spawn(move || serve(state))?)

View File

@@ -1 +1,2 @@
pub mod api;
pub mod models;

View File

@@ -1,10 +1,7 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use serde::{Deserialize, Serialize};
use crate::spec::ComputeSpec;
use serde::Deserialize;
use compute_api::spec::ComputeSpec;
/// Request of the /configure API
///
/// We now pass only `spec` in the configuration request, but later we can
/// extend it and something like `restart: bool` or something else. So put
/// `spec` into a struct initially to be more flexible in the future.
@@ -12,3 +9,8 @@ use serde::Deserialize;
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
pub error: String,
}

View File

@@ -89,11 +89,12 @@ paths:
post:
tags:
- Configure
summary: Perform compute node configuration.
summary: Request compute node configuration.
description: |
This is a blocking API endpoint, i.e. it blocks waiting until
compute is finished configuration and is in `Running` state.
Optional non-blocking mode could be added later.
Optional non-blocking mode could be added later. Currently,
it's also assumed that reconfiguration doesn't require restart.
operationId: configureCompute
requestBody:
description: Configuration request.
@@ -131,14 +132,6 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
500:
description: |
Compute configuration request was processed, but error
occurred. Compute will likely shutdown soon.
content:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
components:
securitySchemes:

View File

@@ -4,6 +4,7 @@
//!
pub mod checker;
pub mod config;
pub mod configurator;
pub mod http;
#[macro_use]
pub mod logger;

View File

@@ -46,7 +46,7 @@ fn watch_compute_activity(compute: &ComputeNode) {
AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors?
&[],
);
let mut last_active = compute.state.lock().unwrap().last_active;
let mut last_active = compute.inner.lock().unwrap().state.last_active;
if let Ok(backs) = backends {
let mut idle_backs: Vec<DateTime<Utc>> = vec![];
@@ -87,9 +87,9 @@ fn watch_compute_activity(compute: &ComputeNode) {
}
// Update the last activity in the shared state if we got a more recent one.
let mut state = compute.state.lock().unwrap();
if last_active > state.last_active {
state.last_active = last_active;
let mut inner = compute.inner.lock().unwrap();
if last_active > inner.state.last_active {
inner.state.last_active = last_active;
debug!("set the last compute activity time to: {}", last_active);
}
}

View File

@@ -7,7 +7,7 @@
//!
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::compute::ComputeControlPlane;
use control_plane::endpoint::ComputeControlPlane;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -106,8 +106,8 @@ fn main() -> Result<()> {
"start" => handle_start_all(sub_args, &env),
"stop" => handle_stop_all(sub_args, &env),
"pageserver" => handle_pageserver(sub_args, &env),
"pg" => handle_pg(sub_args, &env),
"safekeeper" => handle_safekeeper(sub_args, &env),
"endpoint" => handle_endpoint(sub_args, &env),
_ => bail!("unexpected subcommand {sub_name}"),
};
@@ -470,10 +470,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -521,10 +521,10 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
Ok(())
}
fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match pg_match.subcommand() {
Some(pg_subcommand_data) => pg_subcommand_data,
None => bail!("no pg subcommand provided"),
fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
let (sub_name, sub_args) = match ep_match.subcommand() {
Some(ep_subcommand_data) => ep_subcommand_data,
None => bail!("no endpoint subcommand provided"),
};
let mut cplane = ComputeControlPlane::load(env.clone())?;
@@ -546,7 +546,7 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
table.load_preset(comfy_table::presets::NOTHING);
table.set_header([
"NODE",
"ENDPOINT",
"ADDRESS",
"TIMELINE",
"BRANCH NAME",
@@ -554,39 +554,39 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
"STATUS",
]);
for ((_, node_name), node) in cplane
.nodes
for (endpoint_id, endpoint) in cplane
.endpoints
.iter()
.filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
let lsn_str = match node.lsn {
let lsn_str = match endpoint.lsn {
None => {
// -> primary node
// -> primary endpoint
// Use the LSN at the end of the timeline.
timeline_infos
.get(&node.timeline_id)
.get(&endpoint.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {
// -> read-only node
// Use the node's LSN.
// -> read-only endpoint
// Use the endpoint's LSN.
lsn.to_string()
}
};
let branch_name = timeline_name_mappings
.get(&TenantTimelineId::new(tenant_id, node.timeline_id))
.get(&TenantTimelineId::new(tenant_id, endpoint.timeline_id))
.map(|name| name.as_str())
.unwrap_or("?");
table.add_row([
node_name.as_str(),
&node.address.to_string(),
&node.timeline_id.to_string(),
endpoint_id.as_str(),
&endpoint.address.to_string(),
&endpoint.timeline_id.to_string(),
branch_name,
lsn_str.as_str(),
node.status(),
endpoint.status(),
]);
}
@@ -597,10 +597,10 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.get_one::<String>("branch-name")
.map(|s| s.as_str())
.unwrap_or(DEFAULT_BRANCH_NAME);
let node_name = sub_args
.get_one::<String>("node")
.map(|node_name| node_name.to_string())
.unwrap_or_else(|| format!("{branch_name}_node"));
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.map(String::to_string)
.unwrap_or_else(|| format!("ep-{branch_name}"));
let lsn = sub_args
.get_one::<String>("lsn")
@@ -618,15 +618,15 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.copied()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
let node_name = sub_args
.get_one::<String>("node")
.ok_or_else(|| anyhow!("No node name was provided to start"))?;
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to start"))?;
let node = cplane.nodes.get(&(tenant_id, node_name.to_string()));
let endpoint = cplane.endpoints.get(endpoint_id.as_str());
let auth_token = if matches!(env.pageserver.pg_auth_type, AuthType::NeonJWT) {
let claims = Claims::new(Some(tenant_id), Scope::Tenant);
@@ -636,9 +636,9 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
None
};
if let Some(node) = node {
println!("Starting existing postgres {node_name}...");
node.start(&auth_token)?;
if let Some(endpoint) = endpoint {
println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?;
} else {
let branch_name = sub_args
.get_one::<String>("branch-name")
@@ -663,27 +663,33 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// start --port X
// stop
// start <-- will also use port X even without explicit port argument
println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
println!("Starting new endpoint {endpoint_id} (PostgreSQL v{pg_version}) on timeline {timeline_id} ...");
let node =
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
node.start(&auth_token)?;
let ep = cplane.new_endpoint(
tenant_id,
endpoint_id,
timeline_id,
lsn,
port,
pg_version,
)?;
ep.start(&auth_token)?;
}
}
"stop" => {
let node_name = sub_args
.get_one::<String>("node")
.ok_or_else(|| anyhow!("No node name was provided to stop"))?;
let endpoint_id = sub_args
.get_one::<String>("endpoint_id")
.ok_or_else(|| anyhow!("No endpoint ID was provided to stop"))?;
let destroy = sub_args.get_flag("destroy");
let node = cplane
.nodes
.get(&(tenant_id, node_name.to_string()))
.with_context(|| format!("postgres {node_name} is not found"))?;
node.stop(destroy)?;
let endpoint = cplane
.endpoints
.get(endpoint_id.as_str())
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?;
endpoint.stop(destroy)?;
}
_ => bail!("Unexpected pg subcommand '{sub_name}'"),
_ => bail!("Unexpected endpoint subcommand '{sub_name}'"),
}
Ok(())
@@ -802,7 +808,7 @@ fn handle_safekeeper(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Resul
}
fn handle_start_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> anyhow::Result<()> {
// Postgres nodes are not started automatically
// Endpoints are not started automatically
broker::start_broker_process(env)?;
@@ -836,10 +842,10 @@ fn handle_stop_all(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<
fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) {
let pageserver = PageServerNode::from_env(env);
// Stop all compute nodes
// Stop all endpoints
match ComputeControlPlane::load(env.clone()) {
Ok(cplane) => {
for (_k, node) in cplane.nodes {
for (_k, node) in cplane.endpoints {
if let Err(e) = node.stop(false) {
eprintln!("postgres stop failed: {e:#}");
}
@@ -872,7 +878,9 @@ fn cli() -> Command {
.help("Name of the branch to be created or used as an alias for other services")
.required(false);
let pg_node_arg = Arg::new("node").help("Postgres node name").required(false);
let endpoint_id_arg = Arg::new("endpoint_id")
.help("Postgres endpoint id")
.required(false);
let safekeeper_id_arg = Arg::new("id").help("safekeeper id").required(false);
@@ -1026,27 +1034,27 @@ fn cli() -> Command {
)
)
.subcommand(
Command::new("pg")
Command::new("endpoint")
.arg_required_else_help(true)
.about("Manage postgres instances")
.subcommand(Command::new("list").arg(tenant_id_arg.clone()))
.subcommand(Command::new("create")
.about("Create a postgres compute node")
.arg(pg_node_arg.clone())
.about("Create a compute endpoint")
.arg(endpoint_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(tenant_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone())
.arg(
Arg::new("config-only")
.help("Don't do basebackup, create compute node with only config files")
.help("Don't do basebackup, create endpoint directory with only config files")
.long("config-only")
.required(false))
.arg(pg_version_arg.clone())
)
.subcommand(Command::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone())
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
.arg(endpoint_id_arg.clone())
.arg(tenant_id_arg.clone())
.arg(branch_name_arg)
.arg(timeline_id_arg)
@@ -1056,7 +1064,7 @@ fn cli() -> Command {
)
.subcommand(
Command::new("stop")
.arg(pg_node_arg)
.arg(endpoint_id_arg)
.arg(tenant_id_arg)
.arg(
Arg::new("destroy")

View File

@@ -25,54 +25,45 @@ use crate::postgresql_conf::PostgresConf;
//
pub struct ComputeControlPlane {
base_port: u16,
pageserver: Arc<PageServerNode>,
pub nodes: BTreeMap<(TenantId, String), Arc<PostgresNode>>,
// endpoint ID is the key
pub endpoints: BTreeMap<String, Arc<Endpoint>>,
env: LocalEnv,
pageserver: Arc<PageServerNode>,
}
impl ComputeControlPlane {
// Load current nodes with ports from data directories on disk
// Directory structure has the following layout:
// pgdatadirs
// |- tenants
// | |- <tenant_id>
// | | |- <node name>
// Load current endpoints from the endpoints/ subdirectories
pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
let pageserver = Arc::new(PageServerNode::from_env(&env));
let mut nodes = BTreeMap::default();
let pgdatadirspath = &env.pg_data_dirs_path();
for tenant_dir in fs::read_dir(pgdatadirspath)
.with_context(|| format!("failed to list {}", pgdatadirspath.display()))?
let mut endpoints = BTreeMap::default();
for endpoint_dir in fs::read_dir(env.endpoints_path())
.with_context(|| format!("failed to list {}", env.endpoints_path().display()))?
{
let tenant_dir = tenant_dir?;
for timeline_dir in fs::read_dir(tenant_dir.path())
.with_context(|| format!("failed to list {}", tenant_dir.path().display()))?
{
let node = PostgresNode::from_dir_entry(timeline_dir?, &env, &pageserver)?;
nodes.insert((node.tenant_id, node.name.clone()), Arc::new(node));
}
let ep = Endpoint::from_dir_entry(endpoint_dir?, &env, &pageserver)?;
endpoints.insert(ep.name.clone(), Arc::new(ep));
}
Ok(ComputeControlPlane {
base_port: 55431,
pageserver,
nodes,
endpoints,
env,
pageserver,
})
}
fn get_port(&mut self) -> u16 {
1 + self
.nodes
.endpoints
.values()
.map(|node| node.address.port())
.map(|ep| ep.address.port())
.max()
.unwrap_or(self.base_port)
}
pub fn new_node(
pub fn new_endpoint(
&mut self,
tenant_id: TenantId,
name: &str,
@@ -80,9 +71,9 @@ impl ComputeControlPlane {
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<PostgresNode>> {
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
let node = Arc::new(PostgresNode {
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
@@ -93,39 +84,45 @@ impl ComputeControlPlane {
pg_version,
});
node.create_pgdata()?;
node.setup_pg_conf()?;
ep.create_pgdata()?;
ep.setup_pg_conf()?;
self.nodes
.insert((tenant_id, node.name.clone()), Arc::clone(&node));
self.endpoints.insert(ep.name.clone(), Arc::clone(&ep));
Ok(node)
Ok(ep)
}
}
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct PostgresNode {
pub address: SocketAddr,
pub struct Endpoint {
/// used as the directory name
name: String,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
pub lsn: Option<Lsn>,
// port and address of the Postgres server
pub address: SocketAddr,
pg_version: u32,
// These are not part of the endpoint as such, but the environment
// the endpoint runs in.
pub env: LocalEnv,
pageserver: Arc<PageServerNode>,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub tenant_id: TenantId,
pg_version: u32,
}
impl PostgresNode {
impl Endpoint {
fn from_dir_entry(
entry: std::fs::DirEntry,
env: &LocalEnv,
pageserver: &Arc<PageServerNode>,
) -> Result<PostgresNode> {
) -> Result<Endpoint> {
if !entry.file_type()?.is_dir() {
anyhow::bail!(
"PostgresNode::from_dir_entry failed: '{}' is not a directory",
"Endpoint::from_dir_entry failed: '{}' is not a directory",
entry.path().display()
);
}
@@ -135,7 +132,7 @@ impl PostgresNode {
let name = fname.to_str().unwrap().to_string();
// Read config file into memory
let cfg_path = entry.path().join("postgresql.conf");
let cfg_path = entry.path().join("pgdata").join("postgresql.conf");
let cfg_path_str = cfg_path.to_string_lossy();
let mut conf_file = File::open(&cfg_path)
.with_context(|| format!("failed to open config file in {}", cfg_path_str))?;
@@ -161,7 +158,7 @@ impl PostgresNode {
conf.parse_field_optional("recovery_target_lsn", &context)?;
// ok now
Ok(PostgresNode {
Ok(Endpoint {
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
name,
env: env.clone(),
@@ -269,7 +266,7 @@ impl PostgresNode {
}
// Write postgresql.conf with default configuration
// and PG_VERSION file to the data directory of a new node.
// and PG_VERSION file to the data directory of a new endpoint.
fn setup_pg_conf(&self) -> Result<()> {
let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10");
@@ -289,7 +286,7 @@ impl PostgresNode {
// walproposer panics when basebackup is invalid, it is pointless to restart in this case.
conf.append("restart_after_crash", "off");
// Configure the node to fetch pages from pageserver
// Configure the Neon Postgres extension to fetch pages from pageserver
let pageserver_connstr = {
let config = &self.pageserver.pg_connection_config;
let (host, port) = (config.host(), config.port());
@@ -325,7 +322,7 @@ impl PostgresNode {
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure the node to connect to the safekeepers
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
@@ -380,8 +377,12 @@ impl PostgresNode {
Ok(())
}
pub fn endpoint_path(&self) -> PathBuf {
self.env.endpoints_path().join(&self.name)
}
pub fn pgdata(&self) -> PathBuf {
self.env.pg_data_dir(&self.tenant_id, &self.name)
self.endpoint_path().join("pgdata")
}
pub fn status(&self) -> &str {
@@ -443,12 +444,11 @@ impl PostgresNode {
}
pub fn start(&self, auth_token: &Option<String>) -> Result<()> {
// Bail if the node already running.
if self.status() == "running" {
anyhow::bail!("The node is already running");
anyhow::bail!("The endpoint is already running");
}
// 1. We always start compute node from scratch, so
// 1. We always start Postgres from scratch, so
// if old dir exists, preserve 'postgresql.conf' and drop the directory
let postgresql_conf_path = self.pgdata().join("postgresql.conf");
let postgresql_conf = fs::read(&postgresql_conf_path).with_context(|| {
@@ -470,8 +470,8 @@ impl PostgresNode {
File::create(self.pgdata().join("standby.signal"))?;
}
// 4. Finally start the compute node postgres
println!("Starting postgres node at '{}'", self.connstr());
// 4. Finally start postgres
println!("Starting postgres at '{}'", self.connstr());
self.pg_ctl(&["start"], auth_token)
}
@@ -480,7 +480,7 @@ impl PostgresNode {
// use immediate shutdown mode, otherwise,
// shutdown gracefully to leave the data directory sane.
//
// Compute node always starts from scratch, so stop
// Postgres is always started from scratch, so stop
// without destroy only used for testing and debugging.
//
if destroy {
@@ -489,7 +489,7 @@ impl PostgresNode {
"Destroying postgres data directory '{}'",
self.pgdata().to_str().unwrap()
);
fs::remove_dir_all(self.pgdata())?;
fs::remove_dir_all(self.endpoint_path())?;
} else {
self.pg_ctl(&["stop"], &None)?;
}

View File

@@ -9,7 +9,7 @@
mod background_process;
pub mod broker;
pub mod compute;
pub mod endpoint;
pub mod local_env;
pub mod pageserver;
pub mod postgresql_conf;

View File

@@ -200,14 +200,8 @@ impl LocalEnv {
self.neon_distrib_dir.join("storage_broker")
}
pub fn pg_data_dirs_path(&self) -> PathBuf {
self.base_data_dir.join("pgdatadirs").join("tenants")
}
pub fn pg_data_dir(&self, tenant_id: &TenantId, branch_name: &str) -> PathBuf {
self.pg_data_dirs_path()
.join(tenant_id.to_string())
.join(branch_name)
pub fn endpoints_path(&self) -> PathBuf {
self.base_data_dir.join("endpoints")
}
// TODO: move pageserver files into ./pageserver
@@ -427,7 +421,7 @@ impl LocalEnv {
}
}
fs::create_dir_all(self.pg_data_dirs_path())?;
fs::create_dir_all(self.endpoints_path())?;
for safekeeper in &self.safekeepers {
fs::create_dir_all(SafekeeperNode::datadir_path_by_id(self, safekeeper.id))?;

View File

@@ -1,3 +1,2 @@
pub mod requests;
pub mod responses;
pub mod models;
pub mod spec;

View File

@@ -1,26 +1,10 @@
//! Structs representing the JSON formats used in the compute_ctl's HTTP API.
use chrono::{DateTime, Utc};
use serde::{Serialize, Serializer};
#[derive(Serialize, Debug)]
pub struct GenericAPIError {
pub error: String,
}
/// Response of the /status API
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub struct ComputeStatusResponse {
pub tenant: Option<String>,
pub timeline: Option<String>,
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,
pub error: Option<String>,
}
#[derive(Serialize)]
///
#[derive(Clone, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct ComputeState {
pub status: ComputeStatus,
@@ -30,14 +14,12 @@ pub struct ComputeState {
pub error: Option<String>,
}
#[derive(Serialize, Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)]
#[serde(rename_all = "snake_case")]
pub enum ComputeStatus {
// Spec wasn't provided at start, waiting for it to be
// Spec wasn't provided as start, waiting for it to be
// provided by control-plane.
Empty,
// Compute configuration was requested.
ConfigurationPending,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
@@ -47,6 +29,10 @@ pub enum ComputeStatus {
// compute will exit soon or is waiting for
// control-plane to terminate it.
Failed,
// Control-plane requested reconfiguration.
ConfigurationPending,
// New spec is being applied.
Configuration,
}
fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
@@ -57,7 +43,7 @@ where
}
/// Response of the /metrics.json API
#[derive(Clone, Debug, Default, Serialize)]
#[derive(Clone, Default, Serialize)]
pub struct ComputeMetrics {
pub sync_safekeepers_ms: u64,
pub basebackup_ms: u64,

View File

@@ -12,13 +12,10 @@ pub type PgIdent = String;
/// Cluster spec or configuration represented as an optional number of
/// delta operations + final cluster state description.
#[derive(Clone, Debug, Default, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct ComputeSpec {
pub format_version: f32,
// The control plane also includes a 'timestamp' field in the JSON document,
// but we don't use it for anything. Serde will ignore missing fields when
// deserializing it.
pub timestamp: String,
pub operation_uuid: Option<String>,
/// Expected cluster state at the end of transition process.
pub cluster: Cluster,
@@ -29,7 +26,7 @@ pub struct ComputeSpec {
pub startup_tracing_context: Option<HashMap<String, String>>,
}
#[derive(Clone, Debug, Default, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct Cluster {
pub cluster_id: String,
pub name: String,

View File

@@ -54,7 +54,7 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(
e.kind(),
ConnectionRefused | ConnectionAborted | ConnectionReset | TimedOut
ConnectionRefused | ConnectionAborted | ConnectionReset
)
}
@@ -320,17 +320,9 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
if let ProtoState::Closed = self.state {
Ok(None)
} else {
match self.framed.read_message().await {
Ok(m) => {
trace!("read msg {:?}", m);
Ok(m)
}
Err(e) => {
// remember not to try to read anymore
self.state = ProtoState::Closed;
Err(e)
}
}
let m = self.framed.read_message().await?;
trace!("read msg {:?}", m);
Ok(m)
}
}
@@ -501,10 +493,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
MaybeWriteOnly::Full(framed) => {
let (reader, writer) = framed.split();
self.framed = MaybeWriteOnly::WriteOnly(writer);
Ok(PostgresBackendReader {
reader,
closed: false,
})
Ok(PostgresBackendReader(reader))
}
MaybeWriteOnly::WriteOnly(_) => {
anyhow::bail!("PostgresBackend is already split")
@@ -521,12 +510,8 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
anyhow::bail!("PostgresBackend is not split")
}
MaybeWriteOnly::WriteOnly(writer) => {
let joined = Framed::unsplit(reader.reader, writer);
let joined = Framed::unsplit(reader.0, writer);
self.framed = MaybeWriteOnly::Full(joined);
// if reader encountered connection error, do not attempt reading anymore
if reader.closed {
self.state = ProtoState::Closed;
}
Ok(())
}
MaybeWriteOnly::Broken => panic!("unsplit on framed in invalid state"),
@@ -812,25 +797,15 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
}
}
pub struct PostgresBackendReader<IO> {
reader: FramedReader<MaybeTlsStream<IO>>,
closed: bool, // true if received error closing the connection
}
pub struct PostgresBackendReader<IO>(FramedReader<MaybeTlsStream<IO>>);
impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackendReader<IO> {
/// Read full message or return None if connection is cleanly closed with no
/// unprocessed data.
pub async fn read_message(&mut self) -> Result<Option<FeMessage>, ConnectionError> {
match self.reader.read_message().await {
Ok(m) => {
trace!("read msg {:?}", m);
Ok(m)
}
Err(e) => {
self.closed = true;
Err(e)
}
}
let m = self.0.read_message().await?;
trace!("read msg {:?}", m);
Ok(m)
}
/// Get CopyData contents of the next message in COPY stream or error
@@ -948,7 +923,7 @@ pub enum CopyStreamHandlerEnd {
#[error("EOF on COPY stream")]
EOF,
/// The connection was lost
#[error("connection error: {0}")]
#[error(transparent)]
Disconnected(#[from] ConnectionError),
/// Some other error
#[error(transparent)]

View File

@@ -293,9 +293,6 @@ impl FeStartupPacket {
// We shouldn't advance `buf` as probably full message is not there yet,
// so can't directly use Bytes::get_u32 etc.
let len = (&buf[0..4]).read_u32::<BigEndian>().unwrap() as usize;
// The proposed replacement is `!(4..=MAX_STARTUP_PACKET_LENGTH).contains(&len)`
// which is less readable
#[allow(clippy::manual_range_contains)]
if len < 4 || len > MAX_STARTUP_PACKET_LENGTH {
return Err(ProtocolError::Protocol(format!(
"invalid startup packet message length {}",

View File

@@ -13,6 +13,7 @@ use std::{
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
ops::Deref,
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
@@ -89,7 +90,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
/// Streams the local file contents into remote into the remote storage entry.
async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,
@@ -160,67 +161,14 @@ pub enum GenericRemoteStorage {
Unreliable(Arc<UnreliableWrapper>),
}
impl GenericRemoteStorage {
pub async fn list_prefixes(
&self,
prefix: Option<&RemotePath>,
) -> Result<Vec<RemotePath>, DownloadError> {
match self {
Self::LocalFs(s) => s.list_prefixes(prefix).await,
Self::AwsS3(s) => s.list_prefixes(prefix).await,
Self::Unreliable(s) => s.list_prefixes(prefix).await,
}
}
impl Deref for GenericRemoteStorage {
type Target = dyn RemoteStorage;
pub async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,
) -> anyhow::Result<()> {
fn deref(&self) -> &Self::Target {
match self {
Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await,
}
}
pub async fn download(&self, from: &RemotePath) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => s.download(from).await,
Self::AwsS3(s) => s.download(from).await,
Self::Unreliable(s) => s.download(from).await,
}
}
pub async fn download_byte_range(
&self,
from: &RemotePath,
start_inclusive: u64,
end_exclusive: Option<u64>,
) -> Result<Download, DownloadError> {
match self {
Self::LocalFs(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
Self::AwsS3(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
Self::Unreliable(s) => {
s.download_byte_range(from, start_inclusive, end_exclusive)
.await
}
}
}
pub async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> {
match self {
Self::LocalFs(s) => s.delete(path).await,
Self::AwsS3(s) => s.delete(path).await,
Self::Unreliable(s) => s.delete(path).await,
GenericRemoteStorage::LocalFs(local_fs) => local_fs,
GenericRemoteStorage::AwsS3(s3_bucket) => s3_bucket.as_ref(),
GenericRemoteStorage::Unreliable(s) => s.as_ref(),
}
}
}
@@ -251,7 +199,7 @@ impl GenericRemoteStorage {
/// this path is used for the remote object id conversion only.
pub async fn upload_storage_object(
&self,
from: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
from: Box<dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static>,
from_size_bytes: usize,
to: &RemotePath,
) -> anyhow::Result<()> {

View File

@@ -118,7 +118,7 @@ impl RemoteStorage for LocalFs {
async fn upload(
&self,
data: impl io::AsyncRead + Unpin + Send + Sync + 'static,
data: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
data_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,

View File

@@ -343,7 +343,7 @@ impl RemoteStorage for S3Bucket {
async fn upload(
&self,
from: impl io::AsyncRead + Unpin + Send + Sync + 'static,
from: Box<(dyn io::AsyncRead + Unpin + Send + Sync + 'static)>,
from_size_bytes: usize,
to: &RemotePath,
metadata: Option<StorageMetadata>,

View File

@@ -84,7 +84,7 @@ impl RemoteStorage for UnreliableWrapper {
async fn upload(
&self,
data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,
data: Box<(dyn tokio::io::AsyncRead + Unpin + Send + Sync + 'static)>,
// S3 PUT request requires the content length to be specified,
// otherwise it starts to fail with the concurrent connection count increasing.
data_size_bytes: usize,

View File

@@ -11,14 +11,6 @@ use serde::{Deserialize, Serialize};
pub struct Percent(#[serde(deserialize_with = "deserialize_pct_0_to_100")] u8);
impl Percent {
pub const fn new(pct: u8) -> Option<Self> {
if pct <= 100 {
Some(Percent(pct))
} else {
None
}
}
pub fn get(&self) -> u8 {
self.0
}

View File

@@ -639,7 +639,7 @@ mod filesystem_level_usage {
),
(
"max_usage_pct",
usage_pct >= self.config.max_usage_pct.get() as u64,
usage_pct > self.config.max_usage_pct.get() as u64,
),
];
@@ -686,43 +686,4 @@ mod filesystem_level_usage {
avail_bytes,
})
}
#[test]
fn max_usage_pct_pressure() {
use super::Usage as _;
use std::time::Duration;
use utils::serde_percent::Percent;
let mut usage = Usage {
config: &DiskUsageEvictionTaskConfig {
max_usage_pct: Percent::new(85).unwrap(),
min_avail_bytes: 0,
period: Duration::MAX,
#[cfg(feature = "testing")]
mock_statvfs: None,
},
total_bytes: 100_000,
avail_bytes: 0,
};
assert!(usage.has_pressure(), "expected pressure at 100%");
usage.add_available_bytes(14_000);
assert!(usage.has_pressure(), "expected pressure at 86%");
usage.add_available_bytes(999);
assert!(usage.has_pressure(), "expected pressure at 85.001%");
usage.add_available_bytes(1);
assert!(usage.has_pressure(), "expected pressure at precisely 85%");
usage.add_available_bytes(1);
assert!(!usage.has_pressure(), "no pressure at 84.999%");
usage.add_available_bytes(999);
assert!(!usage.has_pressure(), "no pressure at 84%");
usage.add_available_bytes(16_000);
assert!(!usage.has_pressure());
}
}

View File

@@ -177,9 +177,9 @@ impl UninitializedTimeline<'_> {
///
/// The new timeline is initialized in Active state, and its background jobs are
/// started
pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
pub fn initialize(self, _ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
self.initialize_with_lock(ctx, &mut timelines, true, true)
self.initialize_with_lock(&mut timelines, true, true)
}
/// Like `initialize`, but the caller is already holding lock on Tenant::timelines.
@@ -189,7 +189,6 @@ impl UninitializedTimeline<'_> {
/// been initialized.
fn initialize_with_lock(
mut self,
ctx: &RequestContext,
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
load_layer_map: bool,
activate: bool,
@@ -230,9 +229,7 @@ impl UninitializedTimeline<'_> {
new_timeline.maybe_spawn_flush_loop();
if activate {
new_timeline
.activate(ctx)
.context("initializing timeline activation")?;
new_timeline.activate();
}
}
}
@@ -472,7 +469,7 @@ impl Tenant {
local_metadata: Option<TimelineMetadata>,
ancestor: Option<Arc<Timeline>>,
first_save: bool,
ctx: &RequestContext,
_ctx: &RequestContext,
) -> anyhow::Result<()> {
let tenant_id = self.tenant_id;
@@ -507,7 +504,7 @@ impl Tenant {
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
// But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
// will ingest data which may require looking at the layers which are not yet available locally
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) {
match timeline.initialize_with_lock(&mut timelines_accessor, true, false) {
Ok(new_timeline) => new_timeline,
Err(e) => {
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
@@ -632,7 +629,7 @@ impl Tenant {
///
/// Background task that downloads all data for a tenant and brings it to Active state.
///
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
#[instrument(skip(self, ctx), fields(tenant_id=%self.tenant_id))]
async fn attach(self: &Arc<Tenant>, ctx: RequestContext) -> anyhow::Result<()> {
// Create directory with marker file to indicate attaching state.
// The load_local_tenants() function in tenant::mgr relies on the marker file
@@ -753,7 +750,7 @@ impl Tenant {
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(&ctx)?;
self.activate()?;
info!("Done");
@@ -1025,7 +1022,7 @@ impl Tenant {
// Start background operations and open the tenant for business.
// The loops will shut themselves down when they notice that the tenant is inactive.
self.activate(ctx)?;
self.activate()?;
info!("Done");
@@ -1361,7 +1358,12 @@ impl Tenant {
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.tenant_id),
Some(timeline_id),
)
.await;
debug!("wal receiver shutdown confirmed");
info!("waiting for timeline tasks to shutdown");
@@ -1448,7 +1450,7 @@ impl Tenant {
}
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(&self, ctx: &RequestContext) -> anyhow::Result<()> {
fn activate(&self) -> anyhow::Result<()> {
let mut result = Ok(());
self.state.send_modify(|current_state| {
match *current_state {
@@ -1482,20 +1484,7 @@ impl Tenant {
tasks::start_background_loops(self.tenant_id);
for timeline in not_broken_timelines {
match timeline
.activate(ctx)
.context("timeline activation for activating tenant")
{
Ok(()) => {}
Err(e) => {
error!(
"Failed to activate timeline {}: {:#}",
timeline.timeline_id, e
);
timeline.set_state(TimelineState::Broken);
*current_state = TenantState::Broken;
}
}
timeline.activate();
}
}
}
@@ -2104,7 +2093,7 @@ impl Tenant {
src_timeline: &Arc<Timeline>,
dst_id: TimelineId,
start_lsn: Option<Lsn>,
ctx: &RequestContext,
_ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let src_id = src_timeline.timeline_id;
@@ -2197,7 +2186,7 @@ impl Tenant {
false,
Some(Arc::clone(src_timeline)),
)?
.initialize_with_lock(ctx, &mut timelines, true, true)?;
.initialize_with_lock(&mut timelines, true, true)?;
drop(timelines);
// Root timeline gets its layers during creation and uploads them along with the metadata.
@@ -2310,7 +2299,7 @@ impl Tenant {
let timeline = {
let mut timelines = self.timelines.lock().unwrap();
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
raw_timeline.initialize_with_lock(&mut timelines, false, true)?
};
info!(

View File

@@ -14,7 +14,6 @@ use pageserver_api::models::{
DownloadRemoteLayersTaskState, LayerMapInfo, LayerResidenceStatus, TimelineState,
};
use remote_storage::GenericRemoteStorage;
use storage_broker::BrokerClientChannel;
use tokio::sync::{oneshot, watch, Semaphore, TryAcquireError};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -31,7 +30,7 @@ use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::broker_client::{get_broker_client, is_broker_client_initialized};
use crate::broker_client::is_broker_client_initialized;
use crate::context::{DownloadBehavior, RequestContext};
use crate::tenant::remote_timeline_client::{self, index::LayerFileMetadata};
use crate::tenant::storage_layer::{
@@ -72,10 +71,10 @@ use crate::walredo::WalRedoManager;
use crate::METADATA_FILE_NAME;
use crate::ZERO_PAGE;
use crate::{is_temporary, task_mgr};
use walreceiver::spawn_connection_manager_task;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
@@ -215,7 +214,6 @@ pub struct Timeline {
/// or None if WAL receiver has not received anything for this timeline
/// yet.
pub last_received_wal: Mutex<Option<WalReceiverInfo>>,
pub walreceiver: WalReceiver,
/// Relation size cache
pub rel_size_cache: RwLock<HashMap<RelTag, (Lsn, BlockNumber)>>,
@@ -868,18 +866,10 @@ impl Timeline {
Ok(())
}
pub fn activate(self: &Arc<Self>, ctx: &RequestContext) -> anyhow::Result<()> {
if is_broker_client_initialized() {
self.launch_wal_receiver(ctx, get_broker_client().clone())?;
} else if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
} else {
anyhow::bail!("broker client not initialized");
}
pub fn activate(self: &Arc<Self>) {
self.set_state(TimelineState::Active);
self.launch_wal_receiver();
self.launch_eviction_task();
Ok(())
}
pub fn set_state(&self, new_state: TimelineState) {
@@ -1230,31 +1220,7 @@ impl Timeline {
let (layer_flush_start_tx, _) = tokio::sync::watch::channel(0);
let (layer_flush_done_tx, _) = tokio::sync::watch::channel((0, Ok(())));
let tenant_conf_guard = tenant_conf.read().unwrap();
let wal_connect_timeout = tenant_conf_guard
.walreceiver_connect_timeout
.unwrap_or(conf.default_tenant_conf.walreceiver_connect_timeout);
let lagging_wal_timeout = tenant_conf_guard
.lagging_wal_timeout
.unwrap_or(conf.default_tenant_conf.lagging_wal_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
Arc::new_cyclic(|myself| {
let walreceiver = WalReceiver::new(
TenantTimelineId::new(tenant_id, timeline_id),
Weak::clone(myself),
WalReceiverConf {
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: conf.availability_zone.clone(),
},
);
let mut result = Timeline {
conf,
tenant_conf,
@@ -1265,7 +1231,6 @@ impl Timeline {
layers: RwLock::new(LayerMap::default()),
walredo_mgr,
walreceiver,
remote_client: remote_client.map(Arc::new),
@@ -1385,17 +1350,44 @@ impl Timeline {
*flush_loop_state = FlushLoopState::Running;
}
pub(super) fn launch_wal_receiver(
&self,
ctx: &RequestContext,
broker_client: BrokerClientChannel,
) -> anyhow::Result<()> {
pub(super) fn launch_wal_receiver(self: &Arc<Self>) {
if !is_broker_client_initialized() {
if cfg!(test) {
info!("not launching WAL receiver because broker client hasn't been initialized");
return;
} else {
panic!("broker client not initialized");
}
}
info!(
"launching WAL receiver for timeline {} of tenant {}",
self.timeline_id, self.tenant_id
);
self.walreceiver.start(ctx, broker_client)?;
Ok(())
let tenant_conf_guard = self.tenant_conf.read().unwrap();
let lagging_wal_timeout = tenant_conf_guard
.lagging_wal_timeout
.unwrap_or(self.conf.default_tenant_conf.lagging_wal_timeout);
let walreceiver_connect_timeout = tenant_conf_guard
.walreceiver_connect_timeout
.unwrap_or(self.conf.default_tenant_conf.walreceiver_connect_timeout);
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
drop(tenant_conf_guard);
let self_clone = Arc::clone(self);
let background_ctx =
// XXX: this is a detached_child. Plumb through the ctx from call sites.
RequestContext::todo_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
spawn_connection_manager_task(
self_clone,
walreceiver_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
self.conf.availability_zone.clone(),
background_ctx,
);
}
///

View File

@@ -23,133 +23,14 @@
mod connection_manager;
mod walreceiver_connection;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
use crate::tenant::timeline::walreceiver::connection_manager::{
connection_manager_loop_step, ConnectionManagerState,
};
use crate::task_mgr::WALRECEIVER_RUNTIME;
use anyhow::Context;
use std::future::Future;
use std::num::NonZeroU64;
use std::ops::ControlFlow;
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Weak};
use std::time::Duration;
use storage_broker::BrokerClientChannel;
use tokio::select;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantTimelineId;
use super::Timeline;
#[derive(Clone)]
pub struct WalReceiverConf {
/// The timeout on the connection to safekeeper for WAL streaming.
pub wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
pub lagging_wal_timeout: Duration,
/// The Lsn lag to use to determine when the current connection is lagging to much behind and reconnect to the other one.
pub max_lsn_wal_lag: NonZeroU64,
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
}
pub struct WalReceiver {
timeline: TenantTimelineId,
timeline_ref: Weak<Timeline>,
conf: WalReceiverConf,
started: AtomicBool,
}
impl WalReceiver {
pub fn new(
timeline: TenantTimelineId,
timeline_ref: Weak<Timeline>,
conf: WalReceiverConf,
) -> Self {
Self {
timeline,
timeline_ref,
conf,
started: AtomicBool::new(false),
}
}
pub fn start(
&self,
ctx: &RequestContext,
mut broker_client: BrokerClientChannel,
) -> anyhow::Result<()> {
if self.started.load(atomic::Ordering::Acquire) {
anyhow::bail!("Wal receiver is already started");
}
let timeline = self.timeline_ref.upgrade().with_context(|| {
format!("walreceiver start on a dropped timeline {}", self.timeline)
})?;
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
let walreceiver_ctx =
ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
let wal_receiver_conf = self.conf.clone();
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver manager started, connecting to broker");
let mut connection_manager_state = ConnectionManagerState::new(
timeline,
wal_receiver_conf,
);
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut connection_manager_state,
&walreceiver_ctx,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
connection_manager_state.shutdown().await;
return Ok(());
}
},
}
}
}.instrument(info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id))
);
self.started.store(true, atomic::Ordering::Release);
Ok(())
}
pub async fn stop(&self) {
task_mgr::shutdown_tasks(
Some(TaskKind::WalReceiverManager),
Some(self.timeline.tenant_id),
Some(self.timeline.timeline_id),
)
.await;
self.started.store(false, atomic::Ordering::Release);
}
}
pub use connection_manager::spawn_connection_manager_task;
/// A handle of an asynchronous task.
/// The task has a channel that it can use to communicate its lifecycle events in a certain form, see [`TaskEvent`]
@@ -158,26 +39,26 @@ impl WalReceiver {
/// Note that the communication happens via the `watch` channel, that does not accumulate the events, replacing the old one with the never one on submission.
/// That may lead to certain events not being observed by the listener.
#[derive(Debug)]
struct TaskHandle<E> {
pub struct TaskHandle<E> {
join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
events_receiver: watch::Receiver<TaskStateUpdate<E>>,
cancellation: CancellationToken,
}
enum TaskEvent<E> {
pub enum TaskEvent<E> {
Update(TaskStateUpdate<E>),
End(anyhow::Result<()>),
}
#[derive(Debug, Clone)]
enum TaskStateUpdate<E> {
pub enum TaskStateUpdate<E> {
Started,
Progress(E),
}
impl<E: Clone> TaskHandle<E> {
/// Initializes the task, starting it immediately after the creation.
fn spawn<Fut>(
pub fn spawn<Fut>(
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
) -> Self
where
@@ -250,7 +131,7 @@ impl<E: Clone> TaskHandle<E> {
}
/// Aborts current task, waiting for it to finish.
async fn shutdown(self) {
pub async fn shutdown(self) {
if let Some(jh) = self.join_handle {
self.cancellation.cancel();
match jh.await {

View File

@@ -11,9 +11,11 @@
use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
use super::{TaskStateUpdate, WalReceiverConf};
use super::TaskStateUpdate;
use crate::broker_client::get_broker_client;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::task_mgr::{self, TaskKind};
use crate::tenant::Timeline;
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
@@ -36,17 +38,75 @@ use utils::{
use super::{walreceiver_connection::WalConnectionStatus, TaskEvent, TaskHandle};
/// Spawns the loop to take care of the timeline's WAL streaming connection.
pub fn spawn_connection_manager_task(
timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
ctx: RequestContext,
) {
let mut broker_client = get_broker_client().clone();
let tenant_id = timeline.tenant_id;
let timeline_id = timeline.timeline_id;
task_mgr::spawn(
WALRECEIVER_RUNTIME.handle(),
TaskKind::WalReceiverManager,
Some(tenant_id),
Some(timeline_id),
&format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
false,
async move {
info!("WAL receiver manager started, connecting to broker");
let mut walreceiver_state = WalreceiverState::new(
timeline,
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
auth_token,
availability_zone,
);
loop {
select! {
_ = task_mgr::shutdown_watcher() => {
info!("WAL receiver shutdown requested, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
},
loop_step_result = connection_manager_loop_step(
&mut broker_client,
&mut walreceiver_state,
&ctx,
) => match loop_step_result {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(()) => {
info!("Connection manager loop ended, shutting down");
walreceiver_state.shutdown().await;
return Ok(());
}
},
}
}
}
.instrument(
info_span!(parent: None, "wal_connection_manager", tenant = %tenant_id, timeline = %timeline_id),
),
);
}
/// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
/// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
/// If storage broker subscription is cancelled, exits.
pub(super) async fn connection_manager_loop_step(
async fn connection_manager_loop_step(
broker_client: &mut BrokerClientChannel,
connection_manager_state: &mut ConnectionManagerState,
walreceiver_state: &mut WalreceiverState,
ctx: &RequestContext,
) -> ControlFlow<(), ()> {
let mut timeline_state_updates = connection_manager_state
.timeline
.subscribe_for_state_updates();
let mut timeline_state_updates = walreceiver_state.timeline.subscribe_for_state_updates();
match wait_for_active_timeline(&mut timeline_state_updates).await {
ControlFlow::Continue(()) => {}
@@ -57,8 +117,8 @@ pub(super) async fn connection_manager_loop_step(
}
let id = TenantTimelineId {
tenant_id: connection_manager_state.timeline.tenant_id,
timeline_id: connection_manager_state.timeline.timeline_id,
tenant_id: walreceiver_state.timeline.tenant_id,
timeline_id: walreceiver_state.timeline.timeline_id,
};
// Subscribe to the broker updates. Stream shares underlying TCP connection
@@ -68,7 +128,7 @@ pub(super) async fn connection_manager_loop_step(
info!("Subscribed for broker timeline updates");
loop {
let time_until_next_retry = connection_manager_state.time_until_next_retry();
let time_until_next_retry = walreceiver_state.time_until_next_retry();
// These things are happening concurrently:
//
@@ -81,12 +141,12 @@ pub(super) async fn connection_manager_loop_step(
// - timeline state changes to something that does not allow walreceiver to run concurrently
select! {
Some(wal_connection_update) = async {
match connection_manager_state.wal_connection.as_mut() {
match walreceiver_state.wal_connection.as_mut() {
Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
None => None,
}
} => {
let wal_connection = connection_manager_state.wal_connection.as_mut()
let wal_connection = walreceiver_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Update(TaskStateUpdate::Started) => {},
@@ -96,7 +156,7 @@ pub(super) async fn connection_manager_loop_step(
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = new_status;
}
@@ -105,7 +165,7 @@ pub(super) async fn connection_manager_loop_step(
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => error!("wal receiver task finished with an error: {e:?}"),
}
connection_manager_state.drop_old_connection(false).await;
walreceiver_state.drop_old_connection(false).await;
},
}
},
@@ -113,7 +173,7 @@ pub(super) async fn connection_manager_loop_step(
// Got a new update from the broker
broker_update = broker_subscription.message() => {
match broker_update {
Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
Ok(Some(broker_update)) => walreceiver_state.register_timeline_update(broker_update),
Err(e) => {
error!("broker subscription failed: {e}");
return ControlFlow::Continue(());
@@ -127,12 +187,12 @@ pub(super) async fn connection_manager_loop_step(
new_event = async {
loop {
if connection_manager_state.timeline.current_state() == TimelineState::Loading {
if walreceiver_state.timeline.current_state() == TimelineState::Loading {
warn!("wal connection manager should only be launched after timeline has become active");
}
match timeline_state_updates.changed().await {
Ok(()) => {
let new_state = connection_manager_state.timeline.current_state();
let new_state = walreceiver_state.timeline.current_state();
match new_state {
// we're already active as walreceiver, no need to reactivate
TimelineState::Active => continue,
@@ -174,9 +234,9 @@ pub(super) async fn connection_manager_loop_step(
} => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
}
if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
if let Some(new_candidate) = walreceiver_state.next_connection_candidate() {
info!("Switching to new connection candidate: {new_candidate:?}");
connection_manager_state
walreceiver_state
.change_connection(new_candidate, ctx)
.await
}
@@ -254,17 +314,25 @@ const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
pub(super) struct ConnectionManagerState {
struct WalreceiverState {
id: TenantTimelineId,
/// Use pageserver data about the timeline to filter out some of the safekeepers.
timeline: Arc<Timeline>,
conf: WalReceiverConf,
/// The timeout on the connection to safekeeper for WAL streaming.
wal_connect_timeout: Duration,
/// The timeout to use to determine when the current connection is "stale" and reconnect to the other one.
lagging_wal_timeout: Duration,
/// The Lsn lag to use to determine when the current connection is lagging to much behind and reconnect to the other one.
max_lsn_wal_lag: NonZeroU64,
/// Current connection to safekeeper for WAL streaming.
wal_connection: Option<WalConnection>,
/// Info about retries and unsuccessful attempts to connect to safekeepers.
wal_connection_retries: HashMap<NodeId, RetryInfo>,
/// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
}
/// Current connection data.
@@ -307,8 +375,15 @@ struct BrokerSkTimeline {
latest_update: NaiveDateTime,
}
impl ConnectionManagerState {
pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
impl WalreceiverState {
fn new(
timeline: Arc<Timeline>,
wal_connect_timeout: Duration,
lagging_wal_timeout: Duration,
max_lsn_wal_lag: NonZeroU64,
auth_token: Option<Arc<String>>,
availability_zone: Option<String>,
) -> Self {
let id = TenantTimelineId {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
@@ -316,10 +391,14 @@ impl ConnectionManagerState {
Self {
id,
timeline,
conf,
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_retries: HashMap::new(),
auth_token,
availability_zone,
}
}
@@ -328,7 +407,7 @@ impl ConnectionManagerState {
self.drop_old_connection(true).await;
let id = self.id;
let connect_timeout = self.conf.wal_connect_timeout;
let connect_timeout = self.wal_connect_timeout;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -484,7 +563,7 @@ impl ConnectionManagerState {
(now - existing_wal_connection.status.latest_connection_update).to_std()
{
// Drop connection if we haven't received keepalive message for a while.
if latest_interaciton > self.conf.wal_connect_timeout {
if latest_interaciton > self.wal_connect_timeout {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
@@ -494,7 +573,7 @@ impl ConnectionManagerState {
existing_wal_connection.status.latest_connection_update,
),
check_time: now,
threshold: self.conf.wal_connect_timeout,
threshold: self.wal_connect_timeout,
},
});
}
@@ -510,7 +589,7 @@ impl ConnectionManagerState {
// Check if the new candidate has much more WAL than the current one.
match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
Some(new_sk_lsn_advantage) => {
if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
if new_sk_lsn_advantage >= self.max_lsn_wal_lag.get() {
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
wal_source_connconf: new_wal_source_connconf,
@@ -518,16 +597,16 @@ impl ConnectionManagerState {
reason: ReconnectReason::LaggingWal {
current_commit_lsn,
new_commit_lsn,
threshold: self.conf.max_lsn_wal_lag,
threshold: self.max_lsn_wal_lag,
},
});
}
// If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
// and the current one is not, switch to the new one.
if self.conf.availability_zone.is_some()
if self.availability_zone.is_some()
&& existing_wal_connection.availability_zone
!= self.conf.availability_zone
&& self.conf.availability_zone == new_availability_zone
!= self.availability_zone
&& self.availability_zone == new_availability_zone
{
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
@@ -598,7 +677,7 @@ impl ConnectionManagerState {
if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
if candidate_commit_lsn > current_commit_lsn
&& waiting_for_new_wal > self.conf.lagging_wal_timeout
&& waiting_for_new_wal > self.lagging_wal_timeout
{
return Some(NewWalConnectionCandidate {
safekeeper_id: new_sk_id,
@@ -612,7 +691,7 @@ impl ConnectionManagerState {
existing_wal_connection.status.latest_wal_update,
),
check_time: now,
threshold: self.conf.lagging_wal_timeout,
threshold: self.lagging_wal_timeout,
},
});
}
@@ -678,11 +757,11 @@ impl ConnectionManagerState {
match wal_stream_connection_config(
self.id,
info.safekeeper_connstr.as_ref(),
match &self.conf.auth_token {
match &self.auth_token {
None => None,
Some(x) => Some(x),
},
self.conf.availability_zone.as_deref(),
self.availability_zone.as_deref(),
) {
Ok(connstr) => Some((*sk_id, info, connstr)),
Err(e) => {
@@ -696,7 +775,7 @@ impl ConnectionManagerState {
/// Remove candidates which haven't sent broker updates for a while.
fn cleanup_old_candidates(&mut self) {
let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
let lagging_wal_timeout = self.conf.lagging_wal_timeout;
let lagging_wal_timeout = self.lagging_wal_timeout;
self.wal_stream_candidates.retain(|node_id, broker_info| {
if let Ok(time_since_latest_broker_update) =
@@ -720,7 +799,7 @@ impl ConnectionManagerState {
}
}
pub(super) async fn shutdown(mut self) {
async fn shutdown(mut self) {
if let Some(wal_connection) = self.wal_connection.take() {
wal_connection.connection_task.shutdown().await;
}
@@ -824,7 +903,7 @@ mod tests {
let mut state = dummy_state(&harness).await;
let now = Utc::now().naive_utc();
let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
state.wal_connection = None;
@@ -835,7 +914,7 @@ mod tests {
(
NodeId(3),
dummy_broker_sk_timeline(
1 + state.conf.max_lsn_wal_lag.get(),
1 + state.max_lsn_wal_lag.get(),
"delay_over_threshold",
delay_over_threshold,
),
@@ -869,7 +948,7 @@ mod tests {
streaming_lsn: Some(Lsn(current_lsn)),
};
state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
state.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
state.wal_connection = Some(WalConnection {
started_at: now,
sk_id: connected_sk_id,
@@ -887,7 +966,7 @@ mod tests {
(
connected_sk_id,
dummy_broker_sk_timeline(
current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
current_lsn + state.max_lsn_wal_lag.get() * 2,
DUMMY_SAFEKEEPER_HOST,
now,
),
@@ -899,7 +978,7 @@ mod tests {
(
NodeId(2),
dummy_broker_sk_timeline(
current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
current_lsn + state.max_lsn_wal_lag.get() / 2,
"not_enough_advanced_lsn",
now,
),
@@ -924,11 +1003,7 @@ mod tests {
state.wal_connection = None;
state.wal_stream_candidates = HashMap::from([(
NodeId(0),
dummy_broker_sk_timeline(
1 + state.conf.max_lsn_wal_lag.get(),
DUMMY_SAFEKEEPER_HOST,
now,
),
dummy_broker_sk_timeline(1 + state.max_lsn_wal_lag.get(), DUMMY_SAFEKEEPER_HOST, now),
)]);
let only_candidate = state
@@ -1026,7 +1101,7 @@ mod tests {
let now = Utc::now().naive_utc();
let connected_sk_id = NodeId(0);
let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
let new_lsn = Lsn(current_lsn.0 + state.max_lsn_wal_lag.get() + 1);
let connection_status = WalConnectionStatus {
is_connected: true,
@@ -1071,7 +1146,7 @@ mod tests {
ReconnectReason::LaggingWal {
current_commit_lsn: current_lsn,
new_commit_lsn: new_lsn,
threshold: state.conf.max_lsn_wal_lag
threshold: state.max_lsn_wal_lag
},
"Should select bigger WAL safekeeper if it starts to lag enough"
);
@@ -1090,7 +1165,7 @@ mod tests {
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();
let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
let wal_connect_timeout = chrono::Duration::from_std(state.wal_connect_timeout)?;
let time_over_threshold =
Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
@@ -1133,7 +1208,7 @@ mod tests {
..
} => {
assert_eq!(last_keep_alive, Some(time_over_threshold));
assert_eq!(threshold, state.conf.lagging_wal_timeout);
assert_eq!(threshold, state.lagging_wal_timeout);
}
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
@@ -1153,7 +1228,7 @@ mod tests {
let new_lsn = Lsn(100_100).align();
let now = Utc::now().naive_utc();
let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
let lagging_wal_timeout = chrono::Duration::from_std(state.lagging_wal_timeout)?;
let time_over_threshold =
Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
@@ -1200,7 +1275,7 @@ mod tests {
assert_eq!(current_commit_lsn, current_lsn);
assert_eq!(candidate_commit_lsn, new_lsn);
assert_eq!(last_wal_interaction, Some(time_over_threshold));
assert_eq!(threshold, state.conf.lagging_wal_timeout);
assert_eq!(threshold, state.lagging_wal_timeout);
}
unexpected => panic!("Unexpected reason: {unexpected:?}"),
}
@@ -1214,29 +1289,27 @@ mod tests {
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
async fn dummy_state(harness: &TenantHarness<'_>) -> WalreceiverState {
let (tenant, ctx) = harness.load().await;
let timeline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
.expect("Failed to create an empty timeline for dummy wal connection manager");
let timeline = timeline.initialize(&ctx).unwrap();
ConnectionManagerState {
WalreceiverState {
id: TenantTimelineId {
tenant_id: harness.tenant_id,
timeline_id: TIMELINE_ID,
},
timeline,
conf: WalReceiverConf {
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
auth_token: None,
availability_zone: None,
},
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
wal_connection: None,
wal_stream_candidates: HashMap::new(),
wal_connection_retries: HashMap::new(),
auth_token: None,
availability_zone: None,
}
}
@@ -1248,7 +1321,7 @@ mod tests {
let harness = TenantHarness::create("switch_to_same_availability_zone")?;
let mut state = dummy_state(&harness).await;
state.conf.availability_zone = test_az.clone();
state.availability_zone = test_az.clone();
let current_lsn = Lsn(100_000).align();
let now = Utc::now().naive_utc();

View File

@@ -42,7 +42,7 @@ use utils::lsn::Lsn;
/// Status of the connection.
#[derive(Debug, Clone, Copy)]
pub(super) struct WalConnectionStatus {
pub struct WalConnectionStatus {
/// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
pub is_connected: bool,
/// Defines a healthy connection as one on which pageserver received WAL from safekeeper
@@ -60,7 +60,7 @@ pub(super) struct WalConnectionStatus {
/// Open a connection to the given safekeeper and receive WAL, sending back progress
/// messages as we go.
pub(super) async fn handle_walreceiver_connection(
pub async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
wal_source_connconf: PgConnectionConfig,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,

View File

@@ -9,14 +9,6 @@
* To prevent this, it has been decided to limit possible interactions
* with the outside world using the Secure Computing BPF mode.
*
* This code is intended to support both x86_64 and aarch64. The latter
* doesn't implement some syscalls like open and select. We allow both
* select (absent on aarch64) and pselect6 (present on both architectures)
* We call select(2) through libc, and the libc wrapper calls select or pselect6
* depending on the architecture. You can check which syscalls are present on
* different architectures with the `scmp_sys_resolver` tool from the
* seccomp package.
*
* We use this mode to disable all syscalls not in the allowlist. This
* approach has its pros & cons:
*
@@ -81,6 +73,8 @@
* I suspect that certain libc functions might involve slightly
* different syscalls, e.g. select/pselect6/pselect6_time64/whatever.
*
* - Test on any arch other than amd64 to see if it works there.
*
*-------------------------------------------------------------------------
*/
@@ -128,10 +122,9 @@ seccomp_load_rules(PgSeccompRule *rules, int count)
/*
* First, check that open of a well-known file works.
* XXX: We use raw syscall() to call the very openat() which is
* present both on x86_64 and on aarch64.
* XXX: We use raw syscall() to call the very open().
*/
fd = syscall(SCMP_SYS(openat), AT_FDCWD, "/dev/null", O_RDONLY, 0);
fd = syscall(SCMP_SYS(open), "/dev/null", O_RDONLY, 0);
if (seccomp_test_sighandler_done)
ereport(FATAL,
(errcode(ERRCODE_SYSTEM_ERROR),
@@ -142,15 +135,15 @@ seccomp_load_rules(PgSeccompRule *rules, int count)
errmsg("seccomp: could not open /dev/null for seccomp testing: %m")));
close((int) fd);
/* Set a trap on openat() to test seccomp bpf */
rule = PG_SCMP(openat, SCMP_ACT_TRAP);
/* Set a trap on open() to test seccomp bpf */
rule = PG_SCMP(open, SCMP_ACT_TRAP);
if (do_seccomp_load_rules(&rule, 1, SCMP_ACT_ALLOW) != 0)
ereport(FATAL,
(errcode(ERRCODE_SYSTEM_ERROR),
errmsg("seccomp: could not load test trap")));
/* Finally, check that openat() now raises SIGSYS */
(void) syscall(SCMP_SYS(openat), AT_FDCWD, "/dev/null", O_RDONLY, 0);
/* Finally, check that open() now raises SIGSYS */
(void) syscall(SCMP_SYS(open), "/dev/null", O_RDONLY, 0);
if (!seccomp_test_sighandler_done)
ereport(FATAL,
(errcode(ERRCODE_SYSTEM_ERROR),
@@ -231,7 +224,7 @@ seccomp_test_sighandler(int signum, siginfo_t *info, void *cxt pg_attribute_unus
die(1, DIE_PREFIX "bad signal number\n");
/* TODO: maybe somehow extract the hardcoded syscall number */
if (info->si_syscall != SCMP_SYS(openat))
if (info->si_syscall != SCMP_SYS(open))
die(1, DIE_PREFIX "bad syscall number\n");
#undef DIE_PREFIX

View File

@@ -53,7 +53,7 @@ pub async fn password_hack(
.await?;
info!(project = &payload.project, "received missing parameter");
creds.project = Some(payload.project);
creds.project = Some(payload.project.into());
let mut node = api.wake_compute(extra, creds).await?;
node.config.password(payload.password);

View File

@@ -2,7 +2,7 @@
use crate::error::UserFacingError;
use pq_proto::StartupMessageParams;
use std::collections::HashSet;
use std::borrow::Cow;
use thiserror::Error;
use tracing::info;
@@ -19,10 +19,11 @@ pub enum ClientCredsParseError {
InconsistentProjectNames { domain: String, option: String },
#[error(
"Common name inferred from SNI ('{}') is not known",
.cn,
"SNI ('{}') inconsistently formatted with respect to common name ('{}'). \
SNI should be formatted as '<project-name>.{}'.",
.sni, .cn, .cn,
)]
UnknownCommonName { cn: String },
InconsistentSni { sni: String, cn: String },
#[error("Project name ('{0}') must contain only alphanumeric characters and hyphen.")]
MalformedProjectName(String),
@@ -36,7 +37,7 @@ impl UserFacingError for ClientCredsParseError {}
pub struct ClientCredentials<'a> {
pub user: &'a str,
// TODO: this is a severe misnomer! We should think of a new name ASAP.
pub project: Option<String>,
pub project: Option<Cow<'a, str>>,
}
impl ClientCredentials<'_> {
@@ -50,7 +51,7 @@ impl<'a> ClientCredentials<'a> {
pub fn parse(
params: &'a StartupMessageParams,
sni: Option<&str>,
common_names: Option<HashSet<String>>,
common_name: Option<&str>,
) -> Result<Self, ClientCredsParseError> {
use ClientCredsParseError::*;
@@ -59,43 +60,37 @@ impl<'a> ClientCredentials<'a> {
let user = get_param("user")?;
// Project name might be passed via PG's command-line options.
let project_option = params
.options_raw()
.and_then(|mut options| options.find_map(|opt| opt.strip_prefix("project=")))
.map(|name| name.to_string());
let project_option = params.options_raw().and_then(|mut options| {
options
.find_map(|opt| opt.strip_prefix("project="))
.map(Cow::Borrowed)
});
let project_from_domain = if let Some(sni_str) = sni {
if let Some(cn) = common_names {
let common_name_from_sni = sni_str.split_once('.').map(|(_, domain)| domain);
let project = common_name_from_sni
.and_then(|domain| {
if cn.contains(domain) {
subdomain_from_sni(sni_str, domain)
} else {
None
}
// Alternative project name is in fact a subdomain from SNI.
// NOTE: we do not consider SNI if `common_name` is missing.
let project_domain = sni
.zip(common_name)
.map(|(sni, cn)| {
subdomain_from_sni(sni, cn)
.ok_or_else(|| InconsistentSni {
sni: sni.into(),
cn: cn.into(),
})
.ok_or_else(|| UnknownCommonName {
cn: common_name_from_sni.unwrap_or("").into(),
})?;
.map(Cow::<'static, str>::Owned)
})
.transpose()?;
Some(project)
} else {
None
}
} else {
None
};
let project = match (project_option, project_from_domain) {
let project = match (project_option, project_domain) {
// Invariant: if we have both project name variants, they should match.
(Some(option), Some(domain)) if option != domain => {
Some(Err(InconsistentProjectNames { domain, option }))
Some(Err(InconsistentProjectNames {
domain: domain.into(),
option: option.into(),
}))
}
// Invariant: project name may not contain certain characters.
(a, b) => a.or(b).map(|name| match project_name_valid(&name) {
false => Err(MalformedProjectName(name)),
false => Err(MalformedProjectName(name.into())),
true => Ok(name),
}),
}
@@ -154,9 +149,9 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("foo.localhost");
let common_names = Some(["localhost".into()].into());
let common_name = Some("localhost");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
let creds = ClientCredentials::parse(&options, sni, common_name)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("foo"));
@@ -182,41 +177,24 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe"), ("options", "project=baz")]);
let sni = Some("baz.localhost");
let common_names = Some(["localhost".into()].into());
let common_name = Some("localhost");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
let creds = ClientCredentials::parse(&options, sni, common_name)?;
assert_eq!(creds.user, "john_doe");
assert_eq!(creds.project.as_deref(), Some("baz"));
Ok(())
}
#[test]
fn parse_multi_common_names() -> anyhow::Result<()> {
let options = StartupMessageParams::new([("user", "john_doe")]);
let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.a.com");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("p1"));
let common_names = Some(["a.com".into(), "b.com".into()].into());
let sni = Some("p1.b.com");
let creds = ClientCredentials::parse(&options, sni, common_names)?;
assert_eq!(creds.project.as_deref(), Some("p1"));
Ok(())
}
#[test]
fn parse_projects_different() {
let options =
StartupMessageParams::new([("user", "john_doe"), ("options", "project=first")]);
let sni = Some("second.localhost");
let common_names = Some(["localhost".into()].into());
let common_name = Some("localhost");
let err = ClientCredentials::parse(&options, sni, common_names).expect_err("should fail");
let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
match err {
InconsistentProjectNames { domain, option } => {
assert_eq!(option, "first");
@@ -231,12 +209,13 @@ mod tests {
let options = StartupMessageParams::new([("user", "john_doe")]);
let sni = Some("project.localhost");
let common_names = Some(["example.com".into()].into());
let common_name = Some("example.com");
let err = ClientCredentials::parse(&options, sni, common_names).expect_err("should fail");
let err = ClientCredentials::parse(&options, sni, common_name).expect_err("should fail");
match err {
UnknownCommonName { cn } => {
assert_eq!(cn, "localhost");
InconsistentSni { sni, cn } => {
assert_eq!(sni, "project.localhost");
assert_eq!(cn, "example.com");
}
_ => panic!("bad error: {err:?}"),
}

View File

@@ -1,12 +1,6 @@
use crate::auth;
use anyhow::{bail, ensure, Context, Ok};
use rustls::sign;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
time::Duration,
};
use anyhow::{bail, ensure, Context};
use std::{str::FromStr, sync::Arc, time::Duration};
pub struct ProxyConfig {
pub tls_config: Option<TlsConfig>,
@@ -22,7 +16,7 @@ pub struct MetricCollectionConfig {
pub struct TlsConfig {
pub config: Arc<rustls::ServerConfig>,
pub common_names: Option<HashSet<String>>,
pub common_name: Option<String>,
}
impl TlsConfig {
@@ -32,34 +26,28 @@ impl TlsConfig {
}
/// Configure TLS for the main endpoint.
pub fn configure_tls(
key_path: &str,
cert_path: &str,
certs_dir: Option<&String>,
) -> anyhow::Result<TlsConfig> {
let mut cert_resolver = CertResolver::new();
pub fn configure_tls(key_path: &str, cert_path: &str) -> anyhow::Result<TlsConfig> {
let key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
// add default certificate
cert_resolver.add_cert(key_path, cert_path)?;
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()
};
// add extra certificates
if let Some(certs_dir) = certs_dir {
for entry in std::fs::read_dir(certs_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
// file names aligned with default cert-manager names
let key_path = path.join("tls.key");
let cert_path = path.join("tls.crt");
if key_path.exists() && cert_path.exists() {
cert_resolver
.add_cert(&key_path.to_string_lossy(), &cert_path.to_string_lossy())?;
}
}
}
}
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let common_names = cert_resolver.get_common_names();
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.context(format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
))?
.into_iter()
.map(rustls::Certificate)
.collect()
};
let config = rustls::ServerConfig::builder()
.with_safe_default_cipher_suites()
@@ -67,116 +55,27 @@ pub fn configure_tls(
// allow TLS 1.2 to be compatible with older client libraries
.with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])?
.with_no_client_auth()
.with_cert_resolver(Arc::new(cert_resolver))
.with_single_cert(cert_chain, key)?
.into();
// determine common name from tls-cert (-c server.crt param).
// used in asserting project name formatting invariant.
let common_name = {
let pem = x509_parser::pem::parse_x509_pem(&cert_chain_bytes)
.context(format!(
"Failed to parse PEM object from bytes from file at '{cert_path}'."
))?
.1;
let common_name = pem.parse_x509()?.subject().to_string();
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
};
Ok(TlsConfig {
config,
common_names: Some(common_names),
common_name,
})
}
struct CertResolver {
certs: HashMap<String, Arc<rustls::sign::CertifiedKey>>,
}
impl CertResolver {
fn new() -> Self {
Self {
certs: HashMap::new(),
}
}
fn add_cert(&mut self, key_path: &str, cert_path: &str) -> anyhow::Result<()> {
let priv_key = {
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
keys.pop().map(rustls::PrivateKey).unwrap()
};
let key = sign::any_supported_type(&priv_key).context("invalid private key")?;
let cert_chain_bytes = std::fs::read(cert_path)
.context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
let cert_chain = {
rustls_pemfile::certs(&mut &cert_chain_bytes[..])
.context(format!(
"Failed to read TLS certificate chain from bytes from file at '{cert_path}'."
))?
.into_iter()
.map(rustls::Certificate)
.collect()
};
let common_name = {
let pem = x509_parser::pem::parse_x509_pem(&cert_chain_bytes)
.context(format!(
"Failed to parse PEM object from bytes from file at '{cert_path}'."
))?
.1;
let common_name = pem.parse_x509()?.subject().to_string();
// We only use non-wildcard certificates in link proxy so it seems okay to treat them the same as
// wildcard ones as we don't use SNI there. That treatment only affects certificate selection, so
// verify-full will still check wildcard match. Old coding here just ignored non-wildcard common names
// and passed None instead, which blows up number of cases downstream code should handle. Proper coding
// here should better avoid Option for common_names, and do wildcard-based certificate selection instead
// of cutting off '*.' parts.
if common_name.starts_with("CN=*.") {
common_name.strip_prefix("CN=*.").map(|s| s.to_string())
} else {
common_name.strip_prefix("CN=").map(|s| s.to_string())
}
}
.context(format!(
"Failed to parse common name from certificate at '{cert_path}'."
))?;
self.certs.insert(
common_name,
Arc::new(rustls::sign::CertifiedKey::new(cert_chain, key)),
);
Ok(())
}
fn get_common_names(&self) -> HashSet<String> {
self.certs.keys().map(|s| s.to_string()).collect()
}
}
impl rustls::server::ResolvesServerCert for CertResolver {
fn resolve(
&self,
_client_hello: rustls::server::ClientHello,
) -> Option<Arc<rustls::sign::CertifiedKey>> {
// loop here and cut off more and more subdomains until we find
// a match to get a proper wildcard support. OTOH, we now do not
// use nested domains, so keep this simple for now.
//
// With the current coding foo.com will match *.foo.com and that
// repeats behavior of the old code.
if let Some(mut sni_name) = _client_hello.server_name() {
loop {
if let Some(cert) = self.certs.get(sni_name) {
return Some(cert.clone());
}
if let Some((_, rest)) = sni_name.split_once('.') {
sni_name = rest;
} else {
return None;
}
}
} else {
None
}
}
}
/// Helper for cmdline cache options parsing.
pub struct CacheOptions {
/// Max number of entries.

View File

@@ -132,11 +132,7 @@ fn build_config(args: &clap::ArgMatches) -> anyhow::Result<&'static ProxyConfig>
args.get_one::<String>("tls-key"),
args.get_one::<String>("tls-cert"),
) {
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(
key_path,
cert_path,
args.get_one::<String>("certs-dir"),
)?),
(Some(key_path), Some(cert_path)) => Some(config::configure_tls(key_path, cert_path)?),
(None, None) => None,
_ => bail!("either both or neither tls-key and tls-cert must be specified"),
};
@@ -258,12 +254,6 @@ fn cli() -> clap::Command {
.alias("ssl-cert") // backwards compatibility
.help("path to TLS cert for client postgres connections"),
)
// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
.arg(
Arg::new("certs-dir")
.long("certs-dir")
.help("path to directory with TLS certificates for client postgres connections"),
)
.arg(
Arg::new("metric-collection-endpoint")
.long("metric-collection-endpoint")

View File

@@ -5,7 +5,7 @@ use chrono::{DateTime, Utc};
use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE};
use serde::Serialize;
use std::collections::HashMap;
use tracing::{error, info, instrument, trace, warn};
use tracing::{debug, error, info, instrument, trace};
const PROXY_IO_BYTES_PER_CLIENT: &str = "proxy_io_bytes_per_client";
@@ -84,14 +84,10 @@ fn gather_proxy_io_bytes_per_client() -> Vec<(Ids, (u64, DateTime<Utc>))> {
let value = ms.get_counter().get_value() as u64;
// Report if the metric value is suspiciously large
if value > (1u64 << 40) {
warn!(
"potentially abnormal counter value: branch_id {} endpoint_id {} val: {}",
branch_id, endpoint_id, value
);
}
debug!(
"branch_id {} endpoint_id {} val: {}",
branch_id, endpoint_id, value
);
current_metrics.push((
Ids {
endpoint_id: endpoint_id.to_string(),
@@ -128,15 +124,11 @@ async fn collect_metrics_iteration(
let mut value = *curr_val;
if let Some((prev_val, prev_time)) = cached_metrics.get(curr_key) {
// Only send metrics updates if the metric has increased
if curr_val > prev_val {
// Only send metrics updates if the metric has changed
if curr_val - prev_val > 0 {
value = curr_val - prev_val;
start_time = *prev_time;
} else {
if curr_val < prev_val {
error!("proxy_io_bytes_per_client metric value decreased from {} to {} for key {:?}",
prev_val, curr_val, curr_key);
}
return None;
}
};
@@ -197,7 +189,7 @@ async fn collect_metrics_iteration(
})
// update cached value (add delta) and time
.and_modify(|e| {
e.0 = e.0.saturating_add(send_metric.value);
e.0 += send_metric.value;
e.1 = stop_time
})
// cache new metric

View File

@@ -124,11 +124,11 @@ pub async fn handle_ws_client(
// Extract credentials which we're going to use for auth.
let creds = {
let common_names = tls.and_then(|tls| tls.common_names.clone());
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, hostname, common_names))
.map(|_| auth::ClientCredentials::parse(&params, hostname, common_name))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?
@@ -163,11 +163,11 @@ async fn handle_client(
// Extract credentials which we're going to use for auth.
let creds = {
let sni = stream.get_ref().sni_hostname();
let common_names = tls.and_then(|tls| tls.common_names.clone());
let common_name = tls.and_then(|tls| tls.common_name.as_deref());
let result = config
.auth_backend
.as_ref()
.map(|_| auth::ClientCredentials::parse(&params, sni, common_names))
.map(|_| auth::ClientCredentials::parse(&params, sni, common_name))
.transpose();
async { result }.or_else(|e| stream.throw_error(e)).await?

View File

@@ -54,11 +54,9 @@ fn generate_tls_config<'a>(
.with_single_cert(vec![cert], key)?
.into();
let common_names = Some([common_name.to_owned()].iter().cloned().collect());
TlsConfig {
config,
common_names,
common_name: Some(common_name.to_string()),
}
};

View File

@@ -8,7 +8,13 @@
# warnings and errors right in the editor.
# In vscode, this setting is Rust-analyzer>Check On Save:Command
# manual-range-contains wants
# !(4..=MAX_STARTUP_PACKET_LENGTH).contains(&len)
# instead of
# len < 4 || len > MAX_STARTUP_PACKET_LENGTH
# , let's disagree.
# * `-A unknown_lints` do not warn about unknown lint suppressions
# that people with newer toolchains might use
# * `-D warnings` - fail on any warnings (`cargo` returns non-zero exit status)
cargo clippy --locked --all --all-targets --all-features -- -A unknown_lints -D warnings
cargo clippy --locked --all --all-targets --all-features -- -A unknown_lints -A clippy::manual-range-contains -D warnings

View File

@@ -30,7 +30,6 @@ serde_with.workspace = true
signal-hook.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["fs"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
toml_edit.workspace = true
tracing.workspace = true

View File

@@ -674,8 +674,7 @@ impl Timeline {
bail!(TimelineError::Cancelled(self.ttid));
}
let mut state = self.write_shared_state();
state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn);
self.write_shared_state().sk.inmem.backup_lsn = backup_lsn;
// we should check whether to shut down offloader, but this will be done
// soon by peer communication anyway.
Ok(())

View File

@@ -323,8 +323,7 @@ impl WalBackupTask {
}
match backup_lsn_range(
&self.timeline,
&mut backup_lsn,
backup_lsn,
commit_lsn,
self.wal_seg_size,
&self.timeline_dir,
@@ -332,7 +331,13 @@ impl WalBackupTask {
)
.await
{
Ok(()) => {
Ok(backup_lsn_result) => {
backup_lsn = backup_lsn_result;
let res = self.timeline.set_wal_backup_lsn(backup_lsn_result);
if let Err(e) = res {
error!("failed to set wal_backup_lsn: {}", e);
return;
}
retry_attempt = 0;
}
Err(e) => {
@@ -349,25 +354,20 @@ impl WalBackupTask {
}
pub async fn backup_lsn_range(
timeline: &Arc<Timeline>,
backup_lsn: &mut Lsn,
start_lsn: Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
timeline_dir: &Path,
workspace_dir: &Path,
) -> Result<()> {
let start_lsn = *backup_lsn;
) -> Result<Lsn> {
let mut res = start_lsn;
let segments = get_segments(start_lsn, end_lsn, wal_seg_size);
for s in &segments {
backup_single_segment(s, timeline_dir, workspace_dir)
.await
.with_context(|| format!("offloading segno {}", s.seg_no))?;
let new_backup_lsn = s.end_lsn;
timeline
.set_wal_backup_lsn(new_backup_lsn)
.context("setting wal_backup_lsn")?;
*backup_lsn = new_backup_lsn;
res = s.end_lsn;
}
info!(
"offloaded segnos {:?} up to {}, previous backup_lsn {}",
@@ -375,7 +375,7 @@ pub async fn backup_lsn_range(
end_lsn,
start_lsn,
);
Ok(())
Ok(res)
}
async fn backup_single_segment(

View File

@@ -4,9 +4,8 @@
//!
use anyhow::{Context, Result};
use postgres_backend::QueryError;
use std::{future, thread, time::Duration};
use std::{future, thread};
use tokio::net::TcpStream;
use tokio_io_timeout::TimeoutReader;
use tracing::*;
use utils::measured_stream::MeasuredStream;
@@ -68,52 +67,41 @@ fn handle_socket(
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let local = tokio::task::LocalSet::new();
socket.set_nodelay(true)?;
let peer_addr = socket.peer_addr()?;
// TimeoutReader wants async runtime during creation.
runtime.block_on(async move {
// Set timeout on reading from the socket. It prevents hanged up connection
// if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by
// default, and tokio doesn't provide ability to set it out of the box.
let mut socket = TimeoutReader::new(socket);
let wal_service_timeout = Duration::from_secs(60 * 10);
socket.set_timeout(Some(wal_service_timeout));
// pin! is here because TimeoutReader (due to storing sleep future inside)
// is not Unpin, and all pgbackend/framed/tokio dependencies require stream
// to be Unpin. Which is reasonable, as indeed something like TimeoutReader
// shouldn't be moved.
tokio::pin!(socket);
let traffic_metrics = TrafficMetrics::new();
if let Some(current_az) = conf.availability_zone.as_deref() {
traffic_metrics.set_sk_az(current_az);
}
let traffic_metrics = TrafficMetrics::new();
if let Some(current_az) = conf.availability_zone.as_deref() {
traffic_metrics.set_sk_az(current_az);
}
let socket = MeasuredStream::new(
socket,
|cnt| {
traffic_metrics.observe_read(cnt);
},
|cnt| {
traffic_metrics.observe_write(cnt);
},
);
let socket = MeasuredStream::new(
socket,
|cnt| {
traffic_metrics.observe_read(cnt);
},
|cnt| {
traffic_metrics.observe_write(cnt);
},
);
let auth_type = match conf.auth {
None => AuthType::Trust,
Some(_) => AuthType::NeonJWT,
};
let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
local.block_on(
&runtime,
pgbackend.run(&mut conn_handler, future::pending::<()>),
)?;
let auth_type = match conf.auth {
None => AuthType::Trust,
Some(_) => AuthType::NeonJWT,
};
let mut conn_handler =
SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone()));
let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?;
// libpq protocol between safekeeper and walproposer / pageserver
// We don't use shutdown.
pgbackend
.run(&mut conn_handler, future::pending::<()>)
.await
})
Ok(())
}
/// Unique WAL service connection ids are logged in spans for observability.

View File

@@ -114,7 +114,7 @@ class NeonCompare(PgCompare):
self.timeline = self.env.neon_cli.create_timeline(branch_name, tenant_id=self.tenant)
# Start pg
self._pg = self.env.postgres.create_start(branch_name, "main", self.tenant)
self._pg = self.env.endpoints.create_start(branch_name, "main", self.tenant)
@property
def pg(self) -> PgProtocol:

File diff suppressed because it is too large Load Diff

View File

@@ -1,545 +0,0 @@
from __future__ import annotations
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import requests
from fixtures.log_helper import log
from fixtures.metrics import Metrics, parse_metrics
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import Fn
class PageserverApiException(Exception):
def __init__(self, message, status_code: int):
super().__init__(message)
self.status_code = status_code
@dataclass
class InMemoryLayerInfo:
kind: str
lsn_start: str
lsn_end: Optional[str]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> InMemoryLayerInfo:
return InMemoryLayerInfo(
kind=d["kind"],
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
)
@dataclass(frozen=True)
class HistoricLayerInfo:
kind: str
layer_file_name: str
layer_file_size: Optional[int]
lsn_start: str
lsn_end: Optional[str]
remote: bool
@classmethod
def from_json(cls, d: Dict[str, Any]) -> HistoricLayerInfo:
return HistoricLayerInfo(
kind=d["kind"],
layer_file_name=d["layer_file_name"],
layer_file_size=d.get("layer_file_size"),
lsn_start=d["lsn_start"],
lsn_end=d.get("lsn_end"),
remote=d["remote"],
)
@dataclass
class LayerMapInfo:
in_memory_layers: List[InMemoryLayerInfo]
historic_layers: List[HistoricLayerInfo]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> LayerMapInfo:
info = LayerMapInfo(in_memory_layers=[], historic_layers=[])
json_in_memory_layers = d["in_memory_layers"]
assert isinstance(json_in_memory_layers, List)
for json_in_memory_layer in json_in_memory_layers:
info.in_memory_layers.append(InMemoryLayerInfo.from_json(json_in_memory_layer))
json_historic_layers = d["historic_layers"]
assert isinstance(json_historic_layers, List)
for json_historic_layer in json_historic_layers:
info.historic_layers.append(HistoricLayerInfo.from_json(json_historic_layer))
return info
def kind_count(self) -> Dict[str, int]:
counts: Dict[str, int] = defaultdict(int)
for inmem_layer in self.in_memory_layers:
counts[inmem_layer.kind] += 1
for hist_layer in self.historic_layers:
counts[hist_layer.kind] += 1
return counts
@dataclass
class TenantConfig:
tenant_specific_overrides: Dict[str, Any]
effective_config: Dict[str, Any]
@classmethod
def from_json(cls, d: Dict[str, Any]) -> TenantConfig:
return TenantConfig(
tenant_specific_overrides=d["tenant_specific_overrides"],
effective_config=d["effective_config"],
)
class PageserverHttpClient(requests.Session):
def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled_or_skip = is_testing_enabled_or_skip
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"
def verbose_error(self, res: requests.Response):
try:
res.raise_for_status()
except requests.RequestException as e:
try:
msg = res.json()["msg"]
except: # noqa: E722
msg = ""
raise PageserverApiException(msg, res.status_code) from e
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
self.is_testing_enabled_or_skip()
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
log.info(f"Requesting config failpoints: {repr(pairs)}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
def tenant_list(self) -> List[Dict[Any, Any]]:
res = self.get(f"http://localhost:{self.port}/v1/tenant")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def tenant_create(self, new_tenant_id: Optional[TenantId] = None) -> TenantId:
res = self.post(
f"http://localhost:{self.port}/v1/tenant",
json={
"new_tenant_id": str(new_tenant_id) if new_tenant_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create tenant: already exists for id {new_tenant_id}")
new_tenant_id = res.json()
assert isinstance(new_tenant_id, str)
return TenantId(new_tenant_id)
def tenant_attach(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/attach")
self.verbose_error(res)
def tenant_detach(self, tenant_id: TenantId, detach_ignored=False):
params = {}
if detach_ignored:
params["detach_ignored"] = "true"
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/detach", params=params)
self.verbose_error(res)
def tenant_load(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/load")
self.verbose_error(res)
def tenant_ignore(self, tenant_id: TenantId):
res = self.post(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/ignore")
self.verbose_error(res)
def tenant_status(self, tenant_id: TenantId) -> Dict[Any, Any]:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def tenant_config(self, tenant_id: TenantId) -> TenantConfig:
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/config")
self.verbose_error(res)
return TenantConfig.from_json(res.json())
def set_tenant_config(self, tenant_id: TenantId, config: dict[str, Any]):
assert "tenant_id" not in config.keys()
res = self.put(
f"http://localhost:{self.port}/v1/tenant/config",
json={**config, "tenant_id": str(tenant_id)},
)
self.verbose_error(res)
def patch_tenant_config_client_side(
self,
tenant_id: TenantId,
inserts: Optional[Dict[str, Any]] = None,
removes: Optional[List[str]] = None,
):
current = self.tenant_config(tenant_id).tenant_specific_overrides
if inserts is not None:
current.update(inserts)
if removes is not None:
for key in removes:
del current[key]
self.set_tenant_config(tenant_id, current)
def tenant_size(self, tenant_id: TenantId) -> int:
return self.tenant_size_and_modelinputs(tenant_id)[0]
def tenant_size_and_modelinputs(self, tenant_id: TenantId) -> Tuple[int, Dict[str, Any]]:
"""
Returns the tenant size, together with the model inputs as the second tuple item.
"""
res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size")
self.verbose_error(res)
res = res.json()
assert isinstance(res, dict)
assert TenantId(res["id"]) == tenant_id
size = res["size"]
assert type(size) == int
inputs = res["inputs"]
assert type(inputs) is dict
return (size, inputs)
def tenant_size_debug(self, tenant_id: TenantId) -> str:
"""
Returns the tenant size debug info, as an HTML string
"""
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size",
headers={"Accept": "text/html"},
)
return res.text
def timeline_list(
self,
tenant_id: TenantId,
include_non_incremental_logical_size: bool = False,
include_timeline_dir_layer_file_size_sum: bool = False,
) -> List[Dict[str, Any]]:
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline", params=params
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, list)
return res_json
def timeline_create(
self,
tenant_id: TenantId,
new_timeline_id: Optional[TimelineId] = None,
ancestor_timeline_id: Optional[TimelineId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
) -> Dict[Any, Any]:
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline",
json={
"new_timeline_id": str(new_timeline_id) if new_timeline_id else None,
"ancestor_start_lsn": str(ancestor_start_lsn) if ancestor_start_lsn else None,
"ancestor_timeline_id": str(ancestor_timeline_id) if ancestor_timeline_id else None,
},
)
self.verbose_error(res)
if res.status_code == 409:
raise Exception(f"could not create timeline: already exists for id {new_timeline_id}")
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_detail(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
include_non_incremental_logical_size: bool = False,
include_timeline_dir_layer_file_size_sum: bool = False,
**kwargs,
) -> Dict[Any, Any]:
params = {}
if include_non_incremental_logical_size:
params["include-non-incremental-logical-size"] = "true"
if include_timeline_dir_layer_file_size_sum:
params["include-timeline-dir-layer-file-size-sum"] = "true"
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
params=params,
**kwargs,
)
self.verbose_error(res)
res_json = res.json()
assert isinstance(res_json, dict)
return res_json
def timeline_delete(self, tenant_id: TenantId, timeline_id: TimelineId):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}"
)
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_gc(
self, tenant_id: TenantId, timeline_id: TimelineId, gc_horizon: Optional[int]
) -> dict[str, Any]:
self.is_testing_enabled_or_skip()
log.info(
f"Requesting GC: tenant {tenant_id}, timeline {timeline_id}, gc_horizon {repr(gc_horizon)}"
)
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/do_gc",
json={"gc_horizon": gc_horizon},
)
log.info(f"Got GC request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
return res_json
def timeline_compact(self, tenant_id: TenantId, timeline_id: TimelineId):
self.is_testing_enabled_or_skip()
log.info(f"Requesting compact: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact"
)
log.info(f"Got compact request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_get_lsn_by_timestamp(
self, tenant_id: TenantId, timeline_id: TimelineId, timestamp
):
log.info(
f"Requesting lsn by timestamp {timestamp}, tenant {tenant_id}, timeline {timeline_id}"
)
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/get_lsn_by_timestamp?timestamp={timestamp}",
)
self.verbose_error(res)
res_json = res.json()
return res_json
def timeline_checkpoint(self, tenant_id: TenantId, timeline_id: TimelineId):
self.is_testing_enabled_or_skip()
log.info(f"Requesting checkpoint: tenant {tenant_id}, timeline {timeline_id}")
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/checkpoint"
)
log.info(f"Got checkpoint request response code: {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
def timeline_spawn_download_remote_layers(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
max_concurrent_downloads: int,
) -> dict[str, Any]:
body = {
"max_concurrent_downloads": max_concurrent_downloads,
}
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers",
json=body,
)
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
return res_json
def timeline_poll_download_remote_layers_status(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
spawn_response: dict[str, Any],
poll_state=None,
) -> None | dict[str, Any]:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/download_remote_layers",
)
self.verbose_error(res)
res_json = res.json()
assert res_json is not None
assert isinstance(res_json, dict)
# assumption in this API client here is that nobody else spawns the task
assert res_json["task_id"] == spawn_response["task_id"]
if poll_state is None or res_json["state"] == poll_state:
return res_json
return None
def timeline_download_remote_layers(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
max_concurrent_downloads: int,
errors_ok=False,
at_least_one_download=True,
):
res = self.timeline_spawn_download_remote_layers(
tenant_id, timeline_id, max_concurrent_downloads
)
while True:
completed = self.timeline_poll_download_remote_layers_status(
tenant_id, timeline_id, res, poll_state="Completed"
)
if not completed:
time.sleep(0.1)
continue
if not errors_ok:
assert completed["failed_download_count"] == 0
if at_least_one_download:
assert completed["successful_download_count"] > 0
return completed
def get_metrics_str(self) -> str:
"""You probably want to use get_metrics() instead."""
res = self.get(f"http://localhost:{self.port}/metrics")
self.verbose_error(res)
return res.text
def get_metrics(self) -> Metrics:
res = self.get_metrics_str()
return parse_metrics(res)
def get_timeline_metric(
self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str
) -> float:
metrics = self.get_metrics()
return metrics.query_one(
metric_name,
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
).value
def get_remote_timeline_client_metric(
self,
metric_name: str,
tenant_id: TenantId,
timeline_id: TimelineId,
file_kind: str,
op_kind: str,
) -> Optional[float]:
metrics = self.get_metrics()
matches = metrics.query_all(
name=metric_name,
filter={
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"file_kind": str(file_kind),
"op_kind": str(op_kind),
},
)
if len(matches) == 0:
value = None
elif len(matches) == 1:
value = matches[0].value
assert value is not None
else:
assert len(matches) < 2, "above filter should uniquely identify metric"
return value
def get_metric_value(
self, name: str, filter: Optional[Dict[str, str]] = None
) -> Optional[float]:
metrics = self.get_metrics()
results = metrics.query_all(name, filter=filter)
if not results:
log.info(f'could not find metric "{name}"')
return None
assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
return results[0].value
def layer_map_info(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> LayerMapInfo:
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/",
)
self.verbose_error(res)
return LayerMapInfo.from_json(res.json())
def download_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.get(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
)
self.verbose_error(res)
assert res.status_code == 200
def evict_layer(self, tenant_id: TenantId, timeline_id: TimelineId, layer_name: str):
res = self.delete(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/layer/{layer_name}",
)
self.verbose_error(res)
assert res.status_code == 200
def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
info = self.layer_map_info(tenant_id, timeline_id)
for layer in info.historic_layers:
self.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
def disk_usage_eviction_run(self, request: dict[str, Any]):
res = self.put(
f"http://localhost:{self.port}/v1/disk_usage_eviction/run",
json=request,
)
self.verbose_error(res)
return res.json()
def tenant_break(self, tenant_id: TenantId):
res = self.put(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/break")
self.verbose_error(res)

View File

@@ -1,145 +0,0 @@
import time
from fixtures.log_helper import log
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import Lsn, TenantId, TimelineId
def assert_tenant_status(
pageserver_http: PageserverHttpClient, tenant: TenantId, expected_status: str
):
tenant_status = pageserver_http.tenant_status(tenant)
log.info(f"tenant_status: {tenant_status}")
assert tenant_status["state"] == expected_status, tenant_status
def tenant_exists(pageserver_http: PageserverHttpClient, tenant_id: TenantId):
tenants = pageserver_http.tenant_list()
matching = [t for t in tenants if TenantId(t["id"]) == tenant_id]
assert len(matching) < 2
if len(matching) == 0:
return None
return matching[0]
def remote_consistent_lsn(
pageserver_http: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http.timeline_detail(tenant, timeline)
if detail["remote_consistent_lsn"] is None:
# No remote information at all. This happens right after creating
# a timeline, before any part of it has been uploaded to remote
# storage yet.
return Lsn(0)
else:
lsn_str = detail["remote_consistent_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)
def wait_for_upload(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
):
"""waits for local timeline upload up to specified lsn"""
for i in range(20):
current_lsn = remote_consistent_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
log.info("wait finished")
return
log.info(
"waiting for remote_consistent_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for remote_consistent_lsn to reach {}, was {}".format(
lsn, current_lsn
)
)
def wait_until_tenant_state(
pageserver_http: PageserverHttpClient,
tenant_id: TenantId,
expected_state: str,
iterations: int,
) -> bool:
"""
Does not use `wait_until` for debugging purposes
"""
for _ in range(iterations):
try:
tenant = pageserver_http.tenant_status(tenant_id=tenant_id)
log.debug(f"Tenant {tenant_id} data: {tenant}")
if tenant["state"] == expected_state:
return True
except Exception as e:
log.debug(f"Tenant {tenant_id} state retrieval failure: {e}")
time.sleep(1)
raise Exception(f"Tenant {tenant_id} did not become {expected_state} in {iterations} seconds")
def wait_until_tenant_active(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, iterations: int = 30
):
wait_until_tenant_state(
pageserver_http, tenant_id, expected_state="Active", iterations=iterations
)
def last_record_lsn(
pageserver_http_client: PageserverHttpClient, tenant: TenantId, timeline: TimelineId
) -> Lsn:
detail = pageserver_http_client.timeline_detail(tenant, timeline)
lsn_str = detail["last_record_lsn"]
assert isinstance(lsn_str, str)
return Lsn(lsn_str)
def wait_for_last_record_lsn(
pageserver_http: PageserverHttpClient,
tenant: TenantId,
timeline: TimelineId,
lsn: Lsn,
) -> Lsn:
"""waits for pageserver to catch up to a certain lsn, returns the last observed lsn."""
for i in range(10):
current_lsn = last_record_lsn(pageserver_http, tenant, timeline)
if current_lsn >= lsn:
return current_lsn
log.info(
"waiting for last_record_lsn to reach {}, now {}, iteration {}".format(
lsn, current_lsn, i + 1
)
)
time.sleep(1)
raise Exception(
"timed out while waiting for last_record_lsn to reach {}, was {}".format(lsn, current_lsn)
)
def wait_for_upload_queue_empty(
pageserver_http: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
while True:
all_metrics = pageserver_http.get_metrics()
tl = all_metrics.query_all(
"pageserver_remote_timeline_client_calls_unfinished",
{
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
},
)
assert len(tl) > 0
log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
if all(m.value == 0 for m in tl):
return
time.sleep(0.2)

View File

@@ -1,5 +1,4 @@
import contextlib
import json
import os
import re
import subprocess
@@ -7,7 +6,6 @@ import tarfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Tuple, TypeVar
from urllib.parse import urlencode
import allure
from psycopg2.extensions import cursor
@@ -186,46 +184,6 @@ def allure_attach_from_dir(dir: Path):
allure.attach.file(source, name, attachment_type, extension)
DATASOURCE_ID = "xHHYY0dVz"
def allure_add_grafana_links(host: str, start_ms: int, end_ms: int):
"""Add links to server logs in Grafana to Allure report"""
# We expect host to be in format like ep-divine-night-159320.us-east-2.aws.neon.build
endpoint_id, region_id, _ = host.split(".", 2)
expressions = {
"compute logs": f'{{app="compute-node-{endpoint_id}", neon_region="{region_id}"}}',
"k8s events": f'{{job="integrations/kubernetes/eventhandler"}} |~ "name=compute-node-{endpoint_id}-"',
"console logs": f'{{neon_service="console", neon_region="{region_id}"}} | json | endpoint_id = "{endpoint_id}"',
"proxy logs": f'{{neon_service="proxy-scram", neon_region="{region_id}"}}',
}
params: Dict[str, Any] = {
"datasource": DATASOURCE_ID,
"queries": [
{
"expr": "<PUT AN EXPRESSION HERE>",
"refId": "A",
"datasource": {"type": "loki", "uid": DATASOURCE_ID},
"editorMode": "code",
"queryType": "range",
}
],
"range": {
"from": str(start_ms),
"to": str(end_ms),
},
}
for name, expr in expressions.items():
params["queries"][0]["expr"] = expr
query_string = urlencode({"orgId": 1, "left": json.dumps(params)})
link = f"https://neonprod.grafana.net/explore?{query_string}"
allure.dynamic.link(link, name=name)
log.info(f"{name}: {link}")
def start_in_background(
command: list[str], cwd: Path, log_file_name: str, is_started: Fn
) -> subprocess.Popen[bytes]:
@@ -278,19 +236,3 @@ def wait_until(number_of_iterations: int, interval: float, func: Fn):
continue
return res
raise Exception("timed out while waiting for %s" % func) from last_exception
def wait_while(number_of_iterations: int, interval: float, func):
"""
Wait until 'func' returns false, or throws an exception.
"""
for i in range(number_of_iterations):
try:
if not func():
return
log.info("waiting for %s iteration %s failed", func, i + 1)
time.sleep(interval)
continue
except Exception:
return
raise Exception("timed out while waiting for %s" % func)

View File

@@ -10,7 +10,7 @@ import pytest
from fixtures.benchmark_fixture import MetricReport
from fixtures.compare_fixtures import NeonCompare
from fixtures.log_helper import log
from fixtures.pageserver.utils import wait_for_last_record_lsn
from fixtures.neon_fixtures import wait_for_last_record_lsn
from fixtures.types import Lsn
@@ -52,13 +52,13 @@ def test_branch_creation_heavy_write(neon_compare: NeonCompare, n_branches: int)
def run_pgbench(branch: str):
log.info(f"Start a pgbench workload on branch {branch}")
pg = env.postgres.create_start(branch, tenant_id=tenant)
connstr = pg.connstr()
endpoint = env.endpoints.create_start(branch, tenant_id=tenant)
connstr = endpoint.connstr()
pg_bin.run_capture(["pgbench", "-i", connstr])
pg_bin.run_capture(["pgbench", "-c10", "-T10", connstr])
pg.stop()
endpoint.stop()
env.neon_cli.create_branch("b0", tenant_id=tenant)
@@ -96,8 +96,8 @@ def test_branch_creation_many(neon_compare: NeonCompare, n_branches: int):
env.neon_cli.create_branch("b0")
pg = env.postgres.create_start("b0")
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", pg.connstr()])
endpoint = env.endpoints.create_start("b0")
neon_compare.pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()])
branch_creation_durations = []
@@ -124,15 +124,15 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
timeline_id = env.neon_cli.create_branch("root")
pg = env.postgres.create_start("root")
with closing(pg.connect()) as conn:
endpoint = env.endpoints.create_start("root")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(10000):
cur.execute(f"CREATE TABLE t{i} as SELECT g FROM generate_series(1, 1000) g")
# Wait for the pageserver to finish processing all the pending WALs,
# as we don't want the LSN wait time to be included during the branch creation
flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
flush_lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(
env.pageserver.http_client(), env.initial_tenant, timeline_id, flush_lsn
)
@@ -142,7 +142,7 @@ def test_branch_creation_many_relations(neon_compare: NeonCompare):
# run a concurrent insertion to make the ancestor "busy" during the branch creation
thread = threading.Thread(
target=pg.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
target=endpoint.safe_psql, args=("INSERT INTO t0 VALUES (generate_series(1, 100000))",)
)
thread.start()

View File

@@ -42,41 +42,41 @@ def test_compare_child_and_root_pgbench_perf(neon_compare: NeonCompare):
neon_compare.zenbenchmark.record_pg_bench_result(branch, res)
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
pg_bin.run_capture(["pgbench", "-i", pg_root.connstr(), "-s10"])
endpoint_root = env.endpoints.create_start("root")
pg_bin.run_capture(["pgbench", "-i", endpoint_root.connstr(), "-s10"])
fork_at_current_lsn(env, pg_root, "child", "root")
fork_at_current_lsn(env, endpoint_root, "child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", pg_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", pg_child.connstr()])
run_pgbench_on_branch("root", ["pgbench", "-c10", "-T10", endpoint_root.connstr()])
run_pgbench_on_branch("child", ["pgbench", "-c10", "-T10", endpoint_child.connstr()])
def test_compare_child_and_root_write_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
endpoint_root = env.endpoints.create_start("root")
pg_root.safe_psql(
endpoint_root.safe_psql(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
)
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
with neon_compare.record_duration("root_run_duration"):
pg_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
endpoint_root.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
with neon_compare.record_duration("child_run_duration"):
pg_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
endpoint_child.safe_psql("INSERT INTO foo SELECT FROM generate_series(1,1000000)")
def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
env = neon_compare.env
env.neon_cli.create_branch("root")
pg_root = env.postgres.create_start("root")
endpoint_root = env.endpoints.create_start("root")
pg_root.safe_psql_many(
endpoint_root.safe_psql_many(
[
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')",
"INSERT INTO foo SELECT FROM generate_series(1,1000000)",
@@ -84,12 +84,12 @@ def test_compare_child_and_root_read_perf(neon_compare: NeonCompare):
)
env.neon_cli.create_branch("child", "root")
pg_child = env.postgres.create_start("child")
endpoint_child = env.endpoints.create_start("child")
with neon_compare.record_duration("root_run_duration"):
pg_root.safe_psql("SELECT count(*) from foo")
endpoint_root.safe_psql("SELECT count(*) from foo")
with neon_compare.record_duration("child_run_duration"):
pg_child.safe_psql("SELECT count(*) from foo")
endpoint_child.safe_psql("SELECT count(*) from foo")
# -----------------------------------------------------------------------

View File

@@ -35,14 +35,14 @@ def test_bulk_tenant_create(
# if use_safekeepers == 'with_sa':
# wa_factory.start_n_new(3)
pg_tenant = env.postgres.create_start(
endpoint_tenant = env.endpoints.create_start(
f"test_bulk_tenant_create_{tenants_count}_{i}", tenant_id=tenant
)
end = timeit.default_timer()
time_slices.append(end - start)
pg_tenant.stop()
endpoint_tenant.stop()
zenbenchmark.record(
"tenant_creation_time",

View File

@@ -18,8 +18,8 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
timeline_id = env.neon_cli.create_branch("test_bulk_update")
tenant_id = env.initial_tenant
pg = env.postgres.create_start("test_bulk_update")
cur = pg.connect().cursor()
endpoint = env.endpoints.create_start("test_bulk_update")
cur = endpoint.connect().cursor()
cur.execute("set statement_timeout=0")
cur.execute(f"create table t(x integer) WITH (fillfactor={fillfactor})")
@@ -28,13 +28,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
cur.execute(f"insert into t values (generate_series(1,{n_records}))")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-no-prefetch"):
cur.execute("update t set x=x+1")
cur.execute("vacuum t")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-no-prefetch"):
cur.execute("delete from t")
@@ -50,13 +50,13 @@ def test_bulk_update(neon_env_builder: NeonEnvBuilder, zenbenchmark, fillfactor)
cur.execute(f"insert into t2 values (generate_series(1,{n_records}))")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("update-with-prefetch"):
cur.execute("update t2 set x=x+1")
cur.execute("vacuum t2")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
with zenbenchmark.record_duration("delete-with-prefetch"):
cur.execute("delete from t2")

View File

@@ -33,11 +33,11 @@ def test_compaction(neon_compare: NeonCompare):
# Create some tables, and run a bunch of INSERTs and UPDATes on them,
# to generate WAL and layers
pg = env.postgres.create_start(
endpoint = env.endpoints.create_start(
"main", tenant_id=tenant_id, config_lines=["shared_buffers=512MB"]
)
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(100):
cur.execute(f"create table tbl{i} (i int, j int);")
@@ -45,7 +45,7 @@ def test_compaction(neon_compare: NeonCompare):
for j in range(100):
cur.execute(f"update tbl{i} set j = {j};")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
# First compaction generates L1 layers
with neon_compare.zenbenchmark.record_duration("compaction"):

View File

@@ -2,13 +2,13 @@ import threading
import pytest
from fixtures.compare_fixtures import PgCompare
from fixtures.neon_fixtures import Postgres
from fixtures.neon_fixtures import PgProtocol
from performance.test_perf_pgbench import get_scales_matrix
from performance.test_wal_backpressure import record_read_latency
def start_write_workload(pg: Postgres, scale: int = 10):
def start_write_workload(pg: PgProtocol, scale: int = 10):
with pg.connect().cursor() as cur:
cur.execute(f"create table big as select generate_series(1,{scale*100_000})")

View File

@@ -25,8 +25,8 @@ def test_layer_map(neon_env_builder: NeonEnvBuilder, zenbenchmark):
)
env.neon_cli.create_timeline("test_layer_map", tenant_id=tenant)
pg = env.postgres.create_start("test_layer_map", tenant_id=tenant)
cur = pg.connect().cursor()
endpoint = env.endpoints.create_start("test_layer_map", tenant_id=tenant)
cur = endpoint.connect().cursor()
cur.execute("create table t(x integer)")
for i in range(n_iters):
cur.execute(f"insert into t values (generate_series(1,{n_records}))")

View File

@@ -14,19 +14,19 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
# Start
env.neon_cli.create_branch("test_startup")
with zenbenchmark.record_duration("startup_time"):
pg = env.postgres.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint = env.endpoints.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Restart
pg.stop_and_destroy()
endpoint.stop_and_destroy()
with zenbenchmark.record_duration("restart_time"):
pg.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Fill up
num_rows = 1000000 # 30 MB
num_tables = 100
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
for i in range(num_tables):
cur.execute(f"create table t_{i} (i integer);")
@@ -34,18 +34,18 @@ def test_startup(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker
# Read
with zenbenchmark.record_duration("read_time"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")
# Read again
with zenbenchmark.record_duration("second_read_time"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")
# Restart
pg.stop_and_destroy()
endpoint.stop_and_destroy()
with zenbenchmark.record_duration("restart_with_data"):
pg.create_start("test_startup")
pg.safe_psql("select 1;")
endpoint.create_start("test_startup")
endpoint.safe_psql("select 1;")
# Read
with zenbenchmark.record_duration("read_after_restart"):
pg.safe_psql("select * from t_0;")
endpoint.safe_psql("select * from t_0;")

View File

@@ -22,8 +22,8 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
pageserver_http.configure_failpoints(("flush-frozen-before-sync", "sleep(10000)"))
pg_branch0 = env.postgres.create_start("main", tenant_id=tenant)
branch0_cur = pg_branch0.connect().cursor()
endpoint_branch0 = env.endpoints.create_start("main", tenant_id=tenant)
branch0_cur = endpoint_branch0.connect().cursor()
branch0_timeline = TimelineId(query_scalar(branch0_cur, "SHOW neon.timeline_id"))
log.info(f"b0 timeline {branch0_timeline}")
@@ -44,10 +44,10 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
# Create branch1.
env.neon_cli.create_branch("branch1", "main", tenant_id=tenant, ancestor_start_lsn=lsn_100)
pg_branch1 = env.postgres.create_start("branch1", tenant_id=tenant)
endpoint_branch1 = env.endpoints.create_start("branch1", tenant_id=tenant)
log.info("postgres is running on 'branch1' branch")
branch1_cur = pg_branch1.connect().cursor()
branch1_cur = endpoint_branch1.connect().cursor()
branch1_timeline = TimelineId(query_scalar(branch1_cur, "SHOW neon.timeline_id"))
log.info(f"b1 timeline {branch1_timeline}")
@@ -67,9 +67,9 @@ def test_ancestor_branch(neon_env_builder: NeonEnvBuilder):
# Create branch2.
env.neon_cli.create_branch("branch2", "branch1", tenant_id=tenant, ancestor_start_lsn=lsn_200)
pg_branch2 = env.postgres.create_start("branch2", tenant_id=tenant)
endpoint_branch2 = env.endpoints.create_start("branch2", tenant_id=tenant)
log.info("postgres is running on 'branch2' branch")
branch2_cur = pg_branch2.connect().cursor()
branch2_cur = endpoint_branch2.connect().cursor()
branch2_timeline = TimelineId(query_scalar(branch2_cur, "SHOW neon.timeline_id"))
log.info(f"b2 timeline {branch2_timeline}")

View File

@@ -1,8 +1,7 @@
from contextlib import closing
import pytest
from fixtures.neon_fixtures import NeonEnvBuilder, PgProtocol
from fixtures.pageserver.http import PageserverApiException
from fixtures.neon_fixtures import NeonEnvBuilder, PageserverApiException, PgProtocol
from fixtures.types import TenantId
@@ -64,9 +63,9 @@ def test_compute_auth_to_pageserver(neon_env_builder: NeonEnvBuilder):
branch = "test_compute_auth_to_pageserver"
env.neon_cli.create_branch(branch)
pg = env.postgres.create_start(branch)
endpoint = env.endpoints.create_start(branch)
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# we rely upon autocommit after each statement
# as waiting for acceptors happens there
@@ -83,7 +82,7 @@ def test_auth_failures(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
branch = f"test_auth_failures_auth_enabled_{auth_enabled}"
timeline_id = env.neon_cli.create_branch(branch)
env.postgres.create_start(branch)
env.endpoints.create_start(branch)
tenant_token = env.auth_keys.generate_tenant_token(env.initial_tenant)
invalid_tenant_token = env.auth_keys.generate_tenant_token(TenantId.generate())

View File

@@ -5,7 +5,7 @@ from contextlib import closing, contextmanager
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnvBuilder
pytest_plugins = "fixtures.neon_fixtures"
@@ -20,10 +20,10 @@ def pg_cur(pg):
# Periodically check that all backpressure lags are below the configured threshold,
# assert if they are not.
# If the check query fails, stop the thread. Main thread should notice that and stop the test.
def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interval=5):
def check_backpressure(endpoint: Endpoint, stop_event: threading.Event, polling_interval=5):
log.info("checks started")
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
cur.execute("CREATE EXTENSION neon") # TODO move it to neon_fixtures?
cur.execute("select pg_size_bytes(current_setting('max_replication_write_lag'))")
@@ -41,7 +41,7 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv
max_replication_apply_lag_bytes = res[0]
log.info(f"max_replication_apply_lag: {max_replication_apply_lag_bytes} bytes")
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
while not stop_event.is_set():
try:
cur.execute(
@@ -102,14 +102,14 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# Create a branch for us
env.neon_cli.create_branch("test_backpressure")
pg = env.postgres.create_start(
endpoint = env.endpoints.create_start(
"test_backpressure", config_lines=["max_replication_write_lag=30MB"]
)
log.info("postgres is running on 'test_backpressure' branch")
# setup check thread
check_stop_event = threading.Event()
check_thread = threading.Thread(target=check_backpressure, args=(pg, check_stop_event))
check_thread = threading.Thread(target=check_backpressure, args=(endpoint, check_stop_event))
check_thread.start()
# Configure failpoint to slow down walreceiver ingest
@@ -125,7 +125,7 @@ def test_backpressure_received_lsn_lag(neon_env_builder: NeonEnvBuilder):
# because of the lag and waiting for lsn to replay to arrive.
time.sleep(2)
with pg_cur(pg) as cur:
with pg_cur(endpoint) as cur:
# Create and initialize test table
cur.execute("CREATE TABLE foo(x bigint)")

View File

@@ -15,4 +15,4 @@ def test_basebackup_error(neon_simple_env: NeonEnv):
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
with pytest.raises(Exception, match="basebackup-before-control-file"):
env.postgres.create_start("test_basebackup_error")
env.endpoints.create_start("test_basebackup_error")

View File

@@ -67,9 +67,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
)
timeline_main = env.neon_cli.create_timeline("test_main", tenant_id=tenant)
pg_main = env.postgres.create_start("test_main", tenant_id=tenant)
endpoint_main = env.endpoints.create_start("test_main", tenant_id=tenant)
main_cur = pg_main.connect().cursor()
main_cur = endpoint_main.connect().cursor()
main_cur.execute(
"CREATE TABLE foo(key serial primary key, t text default 'foooooooooooooooooooooooooooooooooooooooooooooooooooo')"
@@ -90,9 +90,9 @@ def test_branch_and_gc(neon_simple_env: NeonEnv):
env.neon_cli.create_branch(
"test_branch", "test_main", tenant_id=tenant, ancestor_start_lsn=lsn1
)
pg_branch = env.postgres.create_start("test_branch", tenant_id=tenant)
endpoint_branch = env.endpoints.create_start("test_branch", tenant_id=tenant)
branch_cur = pg_branch.connect().cursor()
branch_cur = endpoint_branch.connect().cursor()
branch_cur.execute("INSERT INTO foo SELECT FROM generate_series(1, 100000)")
assert query_scalar(branch_cur, "SELECT count(*) FROM foo") == 200000
@@ -142,8 +142,8 @@ def test_branch_creation_before_gc(neon_simple_env: NeonEnv):
)
b0 = env.neon_cli.create_branch("b0", tenant_id=tenant)
pg0 = env.postgres.create_start("b0", tenant_id=tenant)
res = pg0.safe_psql_many(
endpoint0 = env.endpoints.create_start("b0", tenant_id=tenant)
res = endpoint0.safe_psql_many(
queries=[
"CREATE TABLE t(key serial primary key)",
"INSERT INTO t SELECT FROM generate_series(1, 100000)",

View File

@@ -18,10 +18,10 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
# Branch at the point where only 100 rows were inserted
env.neon_cli.create_branch("test_branch_behind")
pgmain = env.postgres.create_start("test_branch_behind")
endpoint_main = env.endpoints.create_start("test_branch_behind")
log.info("postgres is running on 'test_branch_behind' branch")
main_cur = pgmain.connect().cursor()
main_cur = endpoint_main.connect().cursor()
timeline = TimelineId(query_scalar(main_cur, "SHOW neon.timeline_id"))
@@ -74,15 +74,15 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
"test_branch_behind_more", "test_branch_behind", ancestor_start_lsn=lsn_b
)
pg_hundred = env.postgres.create_start("test_branch_behind_hundred")
pg_more = env.postgres.create_start("test_branch_behind_more")
endpoint_hundred = env.endpoints.create_start("test_branch_behind_hundred")
endpoint_more = env.endpoints.create_start("test_branch_behind_more")
# On the 'hundred' branch, we should see only 100 rows
hundred_cur = pg_hundred.connect().cursor()
hundred_cur = endpoint_hundred.connect().cursor()
assert query_scalar(hundred_cur, "SELECT count(*) FROM foo") == 100
# On the 'more' branch, we should see 100200 rows
more_cur = pg_more.connect().cursor()
more_cur = endpoint_more.connect().cursor()
assert query_scalar(more_cur, "SELECT count(*) FROM foo") == 200100
# All the rows are visible on the main branch
@@ -94,8 +94,8 @@ def test_branch_behind(neon_env_builder: NeonEnvBuilder):
env.neon_cli.create_branch(
"test_branch_segment_boundary", "test_branch_behind", ancestor_start_lsn=Lsn("0/3000000")
)
pg = env.postgres.create_start("test_branch_segment_boundary")
assert pg.safe_psql("SELECT 1")[0][0] == 1
endpoint = env.endpoints.create_start("test_branch_segment_boundary")
assert endpoint.safe_psql("SELECT 1")[0][0] == 1
# branch at pre-initdb lsn
with pytest.raises(Exception, match="invalid branch start lsn: .*"):

View File

@@ -5,7 +5,7 @@ from typing import List
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgBin
from fixtures.types import Lsn
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
@@ -40,20 +40,20 @@ def test_branching_with_pgbench(
}
)
def run_pgbench(pg: Postgres):
connstr = pg.connstr()
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-T15", connstr])
env.neon_cli.create_branch("b0", tenant_id=tenant)
pgs: List[Postgres] = []
pgs.append(env.postgres.create_start("b0", tenant_id=tenant))
endpoints: List[Endpoint] = []
endpoints.append(env.endpoints.create_start("b0", tenant_id=tenant))
threads: List[threading.Thread] = []
threads.append(threading.Thread(target=run_pgbench, args=(pgs[0],), daemon=True))
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[0].connstr(),), daemon=True)
)
threads[-1].start()
thread_limit = 4
@@ -79,16 +79,18 @@ def test_branching_with_pgbench(
else:
env.neon_cli.create_branch("b{}".format(i + 1), "b0", tenant_id=tenant)
pgs.append(env.postgres.create_start("b{}".format(i + 1), tenant_id=tenant))
endpoints.append(env.endpoints.create_start("b{}".format(i + 1), tenant_id=tenant))
threads.append(threading.Thread(target=run_pgbench, args=(pgs[-1],), daemon=True))
threads.append(
threading.Thread(target=run_pgbench, args=(endpoints[-1].connstr(),), daemon=True)
)
threads[-1].start()
for thread in threads:
thread.join()
for pg in pgs:
res = pg.safe_psql("SELECT count(*) from pgbench_accounts")
for ep in endpoints:
res = ep.safe_psql("SELECT count(*) from pgbench_accounts")
assert res[0] == (100000 * scale,)
@@ -110,11 +112,11 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
env = neon_simple_env
env.neon_cli.create_branch("b0")
pg0 = env.postgres.create_start("b0")
endpoint0 = env.endpoints.create_start("b0")
pg_bin.run_capture(["pgbench", "-i", pg0.connstr()])
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
with pg0.cursor() as cur:
with endpoint0.cursor() as cur:
curr_lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_flush_lsn()"))
# Specify the `start_lsn` as a number that is divided by `XLOG_BLCKSZ`
@@ -123,6 +125,6 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
pg1 = env.postgres.create_start("b1")
endpoint1 = env.endpoints.create_start("b1")
pg_bin.run_capture(["pgbench", "-i", pg1.connstr()])
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])

View File

@@ -4,7 +4,7 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, Postgres
from fixtures.neon_fixtures import Endpoint, NeonEnv, NeonEnvBuilder
from fixtures.types import TenantId, TimelineId
@@ -24,17 +24,17 @@ def test_broken_timeline(neon_env_builder: NeonEnvBuilder):
]
)
tenant_timelines: List[Tuple[TenantId, TimelineId, Postgres]] = []
tenant_timelines: List[Tuple[TenantId, TimelineId, Endpoint]] = []
for n in range(4):
tenant_id, timeline_id = env.neon_cli.create_tenant()
pg = env.postgres.create_start("main", tenant_id=tenant_id)
with pg.cursor() as cur:
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE t(key int primary key, value text)")
cur.execute("INSERT INTO t SELECT generate_series(1,100), 'payload'")
pg.stop()
tenant_timelines.append((tenant_id, timeline_id, pg))
endpoint.stop()
tenant_timelines.append((tenant_id, timeline_id, endpoint))
# Stop the pageserver
env.pageserver.stop()

View File

@@ -24,14 +24,14 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
"autovacuum_freeze_max_age=100000",
]
pg = env.postgres.create_start("test_clog_truncate", config_lines=config)
endpoint = env.endpoints.create_start("test_clog_truncate", config_lines=config)
log.info("postgres is running on test_clog_truncate branch")
# Install extension containing function needed for test
pg.safe_psql("CREATE EXTENSION neon_test_utils")
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
# Consume many xids to advance clog
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("select test_consume_xids(1000*1000*10);")
log.info("xids consumed")
@@ -44,7 +44,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
# wait for autovacuum to truncate the pg_xact
# XXX Is it worth to add a timeout here?
pg_xact_0000_path = os.path.join(pg.pg_xact_dir_path(), "0000")
pg_xact_0000_path = os.path.join(endpoint.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path = {pg_xact_0000_path}")
while os.path.isfile(pg_xact_0000_path):
@@ -52,7 +52,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
time.sleep(5)
# checkpoint to advance latest lsn
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("CHECKPOINT;")
lsn_after_truncation = query_scalar(cur, "select pg_current_wal_insert_lsn()")
@@ -61,10 +61,10 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
env.neon_cli.create_branch(
"test_clog_truncate_new", "test_clog_truncate", ancestor_start_lsn=lsn_after_truncation
)
pg2 = env.postgres.create_start("test_clog_truncate_new")
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
log.info("postgres is running on test_clog_truncate_new branch")
# check that new node doesn't contain truncated segment
pg_xact_0000_path_new = os.path.join(pg2.pg_xact_dir_path(), "0000")
pg_xact_0000_path_new = os.path.join(endpoint2.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path_new = {pg_xact_0000_path_new}")
assert os.path.isfile(pg_xact_0000_path_new) is False

View File

@@ -24,8 +24,8 @@ def test_lsof_pageserver_pid(neon_simple_env: NeonEnv):
def start_workload():
env.neon_cli.create_branch("test_lsof_pageserver_pid")
pg = env.postgres.create_start("test_lsof_pageserver_pid")
with closing(pg.connect()) as conn:
endpoint = env.endpoints.create_start("test_lsof_pageserver_pid")
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CREATE TABLE foo as SELECT x FROM generate_series(1,100000) x")
cur.execute("update foo set x=x+1")

View File

@@ -1,3 +1,4 @@
import copy
import os
import shutil
import subprocess
@@ -10,11 +11,12 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonCli,
NeonEnvBuilder,
PageserverHttpClient,
PgBin,
PortDistributor,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn
from pytest import FixtureRequest
@@ -33,15 +35,9 @@ from pytest import FixtureRequest
# - check_neon_works performs the test itself, feel free to add more checks there.
#
check_ondisk_data_compatibility_if_enabled = pytest.mark.skipif(
os.environ.get("CHECK_ONDISK_DATA_COMPATIBILITY") is None,
reason="CHECK_ONDISK_DATA_COMPATIBILITY env is not set",
)
# Note: if renaming this test, don't forget to update a reference to it in a workflow file:
# "Upload compatibility snapshot" step in .github/actions/run-python-test-set/action.yml
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(before="test_forward_compatibility")
def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_output_dir: Path):
@@ -55,29 +51,31 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
neon_env_builder.preserve_database_files = True
env = neon_env_builder.init_start()
pg = env.postgres.create_start("main")
endpoint = env.endpoints.create_start("main")
# FIXME: Is this expected?
env.pageserver.allowed_errors.append(
".*init_tenant_mgr: marking .* as locally complete, while it doesnt exist in remote index.*"
)
pg_bin.run(["pgbench", "--initialize", "--scale=10", pg.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", pg.connstr()])
pg_bin.run(["pg_dumpall", f"--dbname={pg.connstr()}", f"--file={test_output_dir / 'dump.sql'}"])
pg_bin.run(["pgbench", "--initialize", "--scale=10", endpoint.connstr()])
pg_bin.run(["pgbench", "--time=60", "--progress=2", endpoint.connstr()])
pg_bin.run(
["pg_dumpall", f"--dbname={endpoint.connstr()}", f"--file={test_output_dir / 'dump.sql'}"]
)
snapshot_config = toml.load(test_output_dir / "repo" / "config")
tenant_id = snapshot_config["default_tenant_id"]
timeline_id = dict(snapshot_config["branch_name_mappings"]["main"])[tenant_id]
pageserver_http = env.pageserver.http_client()
lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0])
wait_for_last_record_lsn(pageserver_http, tenant_id, timeline_id, lsn)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload(pageserver_http, tenant_id, timeline_id, lsn)
env.postgres.stop_all()
env.endpoints.stop_all()
for sk in env.safekeepers:
sk.stop()
env.pageserver.stop()
@@ -86,7 +84,6 @@ def test_create_snapshot(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, test_o
# Directory `test_output_dir / "compatibility_snapshot_pg14"` is uploaded to S3 in a workflow, keep the name in sync with it
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_backward_compatibility(
@@ -98,6 +95,9 @@ def test_backward_compatibility(
pg_version: str,
request: FixtureRequest,
):
"""
Test that the new binaries can read old data
"""
compatibility_snapshot_dir_env = os.environ.get("COMPATIBILITY_SNAPSHOT_DIR")
assert (
compatibility_snapshot_dir_env is not None
@@ -120,6 +120,7 @@ def test_backward_compatibility(
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
neon_binpath,
neon_binpath,
pg_distrib_dir,
pg_version,
port_distributor,
@@ -140,7 +141,6 @@ def test_backward_compatibility(
), "Breaking changes are allowed by ALLOW_BACKWARD_COMPATIBILITY_BREAKAGE, but the test has passed without any breakage"
@check_ondisk_data_compatibility_if_enabled
@pytest.mark.xdist_group("compatibility")
@pytest.mark.order(after="test_create_snapshot")
def test_forward_compatibility(
@@ -148,7 +148,11 @@ def test_forward_compatibility(
port_distributor: PortDistributor,
pg_version: str,
request: FixtureRequest,
neon_binpath: Path,
):
"""
Test that the old binaries can read new data
"""
compatibility_neon_bin_env = os.environ.get("COMPATIBILITY_NEON_BIN")
assert compatibility_neon_bin_env is not None, (
"COMPATIBILITY_NEON_BIN is not set. It should be set to a path with Neon binaries "
@@ -183,6 +187,7 @@ def test_forward_compatibility(
check_neon_works(
test_output_dir / "compatibility_snapshot" / "repo",
compatibility_neon_bin,
neon_binpath,
compatibility_postgres_distrib_dir,
pg_version,
port_distributor,
@@ -223,9 +228,13 @@ def prepare_snapshot(
for logfile in repo_dir.glob("**/*.log"):
logfile.unlink()
# Remove tenants data for compute
for tenant in (repo_dir / "pgdatadirs" / "tenants").glob("*"):
shutil.rmtree(tenant)
# Remove old computes in 'endpoints'. Old versions of the control plane used a directory
# called "pgdatadirs". Delete it, too.
if (repo_dir / "endpoints").exists():
shutil.rmtree(repo_dir / "endpoints")
if (repo_dir / "pgdatadirs").exists():
shutil.rmtree(repo_dir / "pgdatadirs")
os.mkdir(repo_dir / "endpoints")
# Remove wal-redo temp directory if it exists. Newer pageserver versions don't create
# them anymore, but old versions did.
@@ -326,7 +335,8 @@ def get_neon_version(neon_binpath: Path):
def check_neon_works(
repo_dir: Path,
neon_binpath: Path,
neon_target_binpath: Path,
neon_current_binpath: Path,
pg_distrib_dir: Path,
pg_version: str,
port_distributor: PortDistributor,
@@ -336,7 +346,7 @@ def check_neon_works(
):
snapshot_config_toml = repo_dir / "config"
snapshot_config = toml.load(snapshot_config_toml)
snapshot_config["neon_distrib_dir"] = str(neon_binpath)
snapshot_config["neon_distrib_dir"] = str(neon_target_binpath)
snapshot_config["postgres_distrib_dir"] = str(pg_distrib_dir)
with (snapshot_config_toml).open("w") as f:
toml.dump(snapshot_config, f)
@@ -347,17 +357,25 @@ def check_neon_works(
config.repo_dir = repo_dir
config.pg_version = pg_version
config.initial_tenant = snapshot_config["default_tenant_id"]
config.neon_binpath = neon_binpath
config.pg_distrib_dir = pg_distrib_dir
config.preserve_database_files = True
cli = NeonCli(config)
cli.raw_cli(["start"])
request.addfinalizer(lambda: cli.raw_cli(["stop"]))
# Use the "target" binaries to launch the storage nodes
config_target = config
config_target.neon_binpath = neon_target_binpath
cli_target = NeonCli(config_target)
# And the current binaries to launch computes
config_current = copy.copy(config)
config_current.neon_binpath = neon_current_binpath
cli_current = NeonCli(config_current)
cli_target.raw_cli(["start"])
request.addfinalizer(lambda: cli_target.raw_cli(["stop"]))
pg_port = port_distributor.get_port()
cli.pg_start("main", port=pg_port)
request.addfinalizer(lambda: cli.pg_stop("main"))
cli_current.endpoint_start("main", port=pg_port)
request.addfinalizer(lambda: cli_current.endpoint_stop("main"))
connstr = f"host=127.0.0.1 port={pg_port} user=cloud_admin dbname=postgres"
pg_bin.run(["pg_dumpall", f"--dbname={connstr}", f"--file={test_output_dir / 'dump.sql'}"])

View File

@@ -13,10 +13,10 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
ctl = ComputeCtl(env)
env.neon_cli.create_branch("test_compute_ctl", "main")
pg = env.postgres.create_start("test_compute_ctl")
pg.safe_psql("CREATE TABLE t(key int primary key, value text)")
endpoint = env.endpoints.create_start("test_compute_ctl")
endpoint.safe_psql("CREATE TABLE t(key int primary key, value text)")
with open(pg.config_file_path(), "r") as f:
with open(endpoint.config_file_path(), "r") as f:
cfg_lines = f.readlines()
cfg_map = {}
for line in cfg_lines:
@@ -24,10 +24,13 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
k, v = line.split("=")
cfg_map[k] = v.strip("\n '\"")
log.info(f"postgres config: {cfg_map}")
pgdata = pg.pg_data_dir_path()
pgdata = endpoint.pg_data_dir_path()
pg_bin_path = os.path.join(pg_bin.pg_bin_path, "postgres")
pg.stop_and_destroy()
endpoint.stop_and_destroy()
# stop_and_destroy removes the whole endpoint directory. Recreate it.
Path(pgdata).mkdir(parents=True)
spec = (
"""

View File

@@ -12,10 +12,10 @@ def test_config(neon_simple_env: NeonEnv):
env.neon_cli.create_branch("test_config", "empty")
# change config
pg = env.postgres.create_start("test_config", config_lines=["log_min_messages=debug1"])
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
log.info("postgres is running on test_config branch")
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute(
"""

View File

@@ -21,11 +21,11 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_crafted_wal_end")
pg = env.postgres.create("test_crafted_wal_end")
endpoint = env.endpoints.create("test_crafted_wal_end")
wal_craft = WalCraft(env)
pg.config(wal_craft.postgres_config())
pg.start()
res = pg.safe_psql_many(
endpoint.config(wal_craft.postgres_config())
endpoint.start()
res = endpoint.safe_psql_many(
queries=[
"CREATE TABLE keys(key int primary key)",
"INSERT INTO keys SELECT generate_series(1, 100)",
@@ -34,7 +34,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
)
assert res[-1][0] == (5050,)
wal_craft.in_existing(wal_type, pg.connstr())
wal_craft.in_existing(wal_type, endpoint.connstr())
log.info("Restarting all safekeepers and pageservers")
env.pageserver.stop()
@@ -43,7 +43,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env.pageserver.start()
log.info("Trying more queries")
res = pg.safe_psql_many(
res = endpoint.safe_psql_many(
queries=[
"SELECT SUM(key) FROM keys",
"INSERT INTO keys SELECT generate_series(101, 200)",
@@ -60,7 +60,7 @@ def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env.pageserver.start()
log.info("Trying more queries (again)")
res = pg.safe_psql_many(
res = endpoint.safe_psql_many(
queries=[
"SELECT SUM(key) FROM keys",
"INSERT INTO keys SELECT generate_series(201, 300)",

View File

@@ -13,10 +13,10 @@ def test_createdb(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_createdb", "empty")
pg = env.postgres.create_start("test_createdb")
endpoint = env.endpoints.create_start("test_createdb")
log.info("postgres is running on 'test_createdb' branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("VACUUM FULL pg_class")
@@ -26,10 +26,10 @@ def test_createdb(neon_simple_env: NeonEnv):
# Create a branch
env.neon_cli.create_branch("test_createdb2", "test_createdb", ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start("test_createdb2")
endpoint2 = env.endpoints.create_start("test_createdb2")
# Test that you can connect to the new database on both branches
for db in (pg, pg2):
for db in (endpoint, endpoint2):
with db.cursor(dbname="foodb") as cur:
# Check database size in both branches
cur.execute(
@@ -55,17 +55,17 @@ def test_createdb(neon_simple_env: NeonEnv):
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
env.neon_cli.create_branch("test_dropdb", "empty")
pg = env.postgres.create_start("test_dropdb")
endpoint = env.endpoints.create_start("test_dropdb")
log.info("postgres is running on 'test_dropdb' branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("CREATE DATABASE foodb")
lsn_before_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
dboid = query_scalar(cur, "SELECT oid FROM pg_database WHERE datname='foodb';")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("DROP DATABASE foodb")
cur.execute("CHECKPOINT")
@@ -76,29 +76,29 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env.neon_cli.create_branch(
"test_before_dropdb", "test_dropdb", ancestor_start_lsn=lsn_before_drop
)
pg_before = env.postgres.create_start("test_before_dropdb")
endpoint_before = env.endpoints.create_start("test_before_dropdb")
env.neon_cli.create_branch(
"test_after_dropdb", "test_dropdb", ancestor_start_lsn=lsn_after_drop
)
pg_after = env.postgres.create_start("test_after_dropdb")
endpoint_after = env.endpoints.create_start("test_after_dropdb")
# Test that database exists on the branch before drop
pg_before.connect(dbname="foodb").close()
endpoint_before.connect(dbname="foodb").close()
# Test that database subdir exists on the branch before drop
assert pg_before.pgdata_dir
dbpath = pathlib.Path(pg_before.pgdata_dir) / "base" / str(dboid)
assert endpoint_before.pgdata_dir
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is True
# Test that database subdir doesn't exist on the branch after drop
assert pg_after.pgdata_dir
dbpath = pathlib.Path(pg_after.pgdata_dir) / "base" / str(dboid)
assert endpoint_after.pgdata_dir
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
log.info(dbpath)
assert os.path.isdir(dbpath) is False
# Check that we restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, pg)
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -9,10 +9,10 @@ from fixtures.utils import query_scalar
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_createuser", "empty")
pg = env.postgres.create_start("test_createuser")
endpoint = env.endpoints.create_start("test_createuser")
log.info("postgres is running on 'test_createuser' branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
cur.execute("CREATE USER testuser with password %s", ("testpwd",))
@@ -22,7 +22,7 @@ def test_createuser(neon_simple_env: NeonEnv):
# Create a branch
env.neon_cli.create_branch("test_createuser2", "test_createuser", ancestor_start_lsn=lsn)
pg2 = env.postgres.create_start("test_createuser2")
endpoint2 = env.endpoints.create_start("test_createuser2")
# Test that you can connect to new branch as a new user
assert pg2.safe_psql("select current_user", user="testuser") == [("testuser",)]
assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]

View File

@@ -11,14 +11,14 @@ from fixtures.neon_fixtures import (
LocalFsStorage,
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
PgBin,
RemoteStorageKind,
wait_for_last_flush_lsn,
wait_for_upload_queue_empty,
wait_until,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import wait_until
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
@@ -91,8 +91,8 @@ class EvictionEnv:
This assumes that the tenant is still at the state after pbench -i.
"""
lsn = self.pgbench_init_lsns[tenant_id]
with self.neon_env.postgres.create_start("main", tenant_id=tenant_id, lsn=lsn) as pg:
self.pg_bin.run(["pgbench", "-S", pg.connstr()])
with self.neon_env.endpoints.create_start("main", tenant_id=tenant_id, lsn=lsn) as endpoint:
self.pg_bin.run(["pgbench", "-S", endpoint.connstr()])
def pageserver_start_with_disk_usage_eviction(
self, period, max_usage_pct, min_avail_bytes, mock_behavior
@@ -138,7 +138,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
# remove the initial tenant
## why wait for upload queue? => https://github.com/neondatabase/neon/issues/3865
assert env.initial_timeline
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, env.initial_timeline)
wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, env.initial_timeline)
pageserver_http.tenant_detach(env.initial_tenant)
assert isinstance(env.remote_storage, LocalFsStorage)
tenant_remote_storage = env.remote_storage.root / "tenants" / str(env.initial_tenant)
@@ -168,9 +168,9 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
}
)
with env.postgres.create_start("main", tenant_id=tenant_id) as pg:
pg_bin.run(["pgbench", "-i", f"-s{scale}", pg.connstr()])
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
pg_bin.run(["pgbench", "-i", f"-s{scale}", endpoint.connstr()])
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
timelines.append((tenant_id, timeline_id))
@@ -182,7 +182,7 @@ def eviction_env(request, neon_env_builder: NeonEnvBuilder, pg_bin: PgBin) -> Ev
# after stopping the safekeepers, we know that no new WAL will be coming in
for tenant_id, timeline_id in timelines:
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)
wait_for_upload_queue_empty(env.pageserver, tenant_id, timeline_id)
tl_info = pageserver_http.timeline_detail(tenant_id, timeline_id)
assert tl_info["last_record_lsn"] == tl_info["disk_consistent_lsn"]
assert tl_info["disk_consistent_lsn"] == tl_info["remote_consistent_lsn"]

View File

@@ -4,7 +4,7 @@ from fixtures.neon_fixtures import NeonEnvBuilder
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fsm_truncate")
pg = env.postgres.create_start("test_fsm_truncate")
pg.safe_psql(
endpoint = env.endpoints.create_start("test_fsm_truncate")
endpoint.safe_psql(
"CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
)

View File

@@ -24,10 +24,10 @@ def test_fullbackup(
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_fullbackup")
pgmain = env.postgres.create_start("test_fullbackup")
endpoint_main = env.endpoints.create_start("test_fullbackup")
log.info("postgres is running on 'test_fullbackup' branch")
with pgmain.cursor() as cur:
with endpoint_main.cursor() as cur:
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# data loading may take a while, so increase statement timeout

View File

@@ -5,9 +5,9 @@ import random
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
Postgres,
RemoteStorageKind,
wait_for_last_flush_lsn,
)
@@ -26,9 +26,9 @@ updates_performed = 0
# Run random UPDATEs on test table
async def update_table(pg: Postgres):
async def update_table(endpoint: Endpoint):
global updates_performed
pg_conn = await pg.connect_async()
pg_conn = await endpoint.connect_async()
while updates_performed < updates_to_perform:
updates_performed += 1
@@ -52,10 +52,10 @@ async def gc(env: NeonEnv, timeline: TimelineId):
# At the same time, run UPDATEs and GC
async def update_and_gc(env: NeonEnv, pg: Postgres, timeline: TimelineId):
async def update_and_gc(env: NeonEnv, endpoint: Endpoint, timeline: TimelineId):
workers = []
for worker_id in range(num_connections):
workers.append(asyncio.create_task(update_table(pg)))
workers.append(asyncio.create_task(update_table(endpoint)))
workers.append(asyncio.create_task(gc(env, timeline)))
# await all workers
@@ -72,10 +72,10 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
neon_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}"
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_aggressive", "main")
pg = env.postgres.create_start("test_gc_aggressive")
endpoint = env.endpoints.create_start("test_gc_aggressive")
log.info("postgres is running on test_gc_aggressive branch")
with pg.cursor() as cur:
with endpoint.cursor() as cur:
timeline = TimelineId(query_scalar(cur, "SHOW neon.timeline_id"))
# Create table, and insert the first 100 rows
@@ -89,7 +89,7 @@ def test_gc_aggressive(neon_env_builder: NeonEnvBuilder):
)
cur.execute("CREATE INDEX ON foo(id)")
asyncio.run(update_and_gc(env, pg, timeline))
asyncio.run(update_and_gc(env, endpoint, timeline))
cur.execute("SELECT COUNT(*), SUM(counter) FROM foo")
r = cur.fetchone()
@@ -110,11 +110,11 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_gc_index_upload", "main")
pg = env.postgres.create_start("test_gc_index_upload")
endpoint = env.endpoints.create_start("test_gc_index_upload")
pageserver_http = env.pageserver.http_client()
pg_conn = pg.connect()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
@@ -146,7 +146,7 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:
return int(total)
# Sanity check that the metric works
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)
before = get_num_remote_ops("index", "upload")

View File

@@ -31,8 +31,8 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"image_creation_threshold": "2",
}
)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
connstr = pg.connstr(options="-csynchronous_commit=off")
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
connstr = endpoint.connstr(options="-csynchronous_commit=off")
pg_bin.run_capture(["pgbench", "-i", "-s10", connstr])
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))

View File

@@ -9,12 +9,13 @@ from pathlib import Path
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
NeonEnv,
NeonEnvBuilder,
PgBin,
Postgres,
wait_for_last_record_lsn,
wait_for_upload,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import subprocess_capture
@@ -72,7 +73,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
node_name = "import_from_vanilla"
endpoint_id = "ep-import_from_vanilla"
tenant = TenantId.generate()
timeline = TimelineId.generate()
@@ -113,7 +114,7 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
"--timeline-id",
str(timeline),
"--node-name",
node_name,
endpoint_id,
"--base-lsn",
start_lsn,
"--base-tarfile",
@@ -153,8 +154,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
wait_for_upload(client, tenant, timeline, Lsn(end_lsn))
# Check it worked
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql("select count(*) from t") == [(300000,)]
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant)
assert endpoint.safe_psql("select count(*) from t") == [(300000,)]
@pytest.mark.timeout(600)
@@ -168,10 +169,10 @@ def test_import_from_pageserver_small(pg_bin: PgBin, neon_env_builder: NeonEnvBu
)
timeline = env.neon_cli.create_branch("test_import_from_pageserver_small")
pg = env.postgres.create_start("test_import_from_pageserver_small")
endpoint = env.endpoints.create_start("test_import_from_pageserver_small")
num_rows = 3000
lsn = _generate_data(num_rows, pg)
lsn = _generate_data(num_rows, endpoint)
_import(num_rows, lsn, env, pg_bin, timeline, env.pg_distrib_dir)
@@ -185,14 +186,14 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne
env = neon_env_builder.init_start()
timeline = env.neon_cli.create_branch("test_import_from_pageserver_multisegment")
pg = env.postgres.create_start("test_import_from_pageserver_multisegment")
endpoint = env.endpoints.create_start("test_import_from_pageserver_multisegment")
# For `test_import_from_pageserver_multisegment`, we want to make sure that the data
# is large enough to create multi-segment files. Typically, a segment file's size is
# at most 1GB. A large number of inserted rows (`30000000`) is used to increase the
# DB size to above 1GB. Related: https://github.com/neondatabase/neon/issues/2097.
num_rows = 30000000
lsn = _generate_data(num_rows, pg)
lsn = _generate_data(num_rows, endpoint)
logical_size = env.pageserver.http_client().timeline_detail(env.initial_tenant, timeline)[
"current_logical_size"
@@ -213,12 +214,12 @@ def test_import_from_pageserver_multisegment(pg_bin: PgBin, neon_env_builder: Ne
assert cnt_seg_files > 0
def _generate_data(num_rows: int, pg: Postgres) -> Lsn:
def _generate_data(num_rows: int, endpoint: Endpoint) -> Lsn:
"""Generate a table with `num_rows` rows.
Returns:
the latest insert WAL's LSN"""
with closing(pg.connect()) as conn:
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
# data loading may take a while, so increase statement timeout
cur.execute("SET statement_timeout='300s'")
@@ -263,7 +264,7 @@ def _import(
tar_output_file = result_basepath + ".stdout"
# Stop the first pageserver instance, erase all its data
env.postgres.stop_all()
env.endpoints.stop_all()
env.pageserver.stop()
dir_to_clear = Path(env.repo_dir) / "tenants"
@@ -278,7 +279,7 @@ def _import(
tenant = TenantId.generate()
# Import to pageserver
node_name = "import_from_pageserver"
endpoint_id = "ep-import_from_pageserver"
client = env.pageserver.http_client()
client.tenant_create(tenant)
env.neon_cli.raw_cli(
@@ -290,7 +291,7 @@ def _import(
"--timeline-id",
str(timeline),
"--node-name",
node_name,
endpoint_id,
"--base-lsn",
str(lsn),
"--base-tarfile",
@@ -305,8 +306,8 @@ def _import(
wait_for_upload(client, tenant, timeline, lsn)
# Check it worked
pg = env.postgres.create_start(node_name, tenant_id=tenant)
assert pg.safe_psql("select count(*) from tbl") == [(expected_num_rows,)]
endpoint = env.endpoints.create_start(endpoint_id, tenant_id=tenant)
assert endpoint.safe_psql("select count(*) from tbl") == [(expected_num_rows,)]
# Take another fullbackup
query = f"fullbackup { tenant} {timeline} {lsn}"

View File

@@ -15,9 +15,9 @@ from fixtures.neon_fixtures import NeonEnvBuilder
def test_large_schema(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
pg = env.postgres.create_start("main")
endpoint = env.endpoints.create_start("main")
conn = pg.connect()
conn = endpoint.connect()
cur = conn.cursor()
tables = 2 # 10 is too much for debug build
@@ -27,18 +27,18 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
# Restart compute. Restart is actually not strictly needed.
# It is done mostly because this test originally tries to model the problem reported by Ketteq.
pg.stop()
endpoint.stop()
# Kill and restart the pageserver.
# env.pageserver.stop(immediate=True)
# env.pageserver.start()
pg.start()
endpoint.start()
retry_sleep = 0.5
max_retries = 200
retries = 0
while True:
try:
conn = pg.connect()
conn = endpoint.connect()
cur = conn.cursor()
cur.execute(f"CREATE TABLE if not exists t_{i}(pk integer) partition by range (pk)")
for j in range(1, partitions + 1):
@@ -63,7 +63,7 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
raise
break
conn = pg.connect()
conn = endpoint.connect()
cur = conn.cursor()
for i in range(1, tables + 1):
@@ -74,8 +74,8 @@ def test_large_schema(neon_env_builder: NeonEnvBuilder):
cur.execute("select * from pg_depend order by refclassid, refobjid, refobjsubid")
# Check layer file sizes
tenant_id = pg.safe_psql("show neon.tenant_id")[0][0]
timeline_id = pg.safe_psql("show neon.timeline_id")[0][0]
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
timeline_path = "{}/tenants/{}/timelines/{}/".format(env.repo_dir, tenant_id, timeline_id)
for filename in os.listdir(timeline_path):
if filename.startswith("00000"):

View File

@@ -6,9 +6,10 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
RemoteStorageKind,
wait_for_last_flush_lsn,
wait_for_last_record_lsn,
wait_for_sk_commit_lsn_to_reach_remote_storage,
wait_for_upload,
)
from fixtures.pageserver.utils import wait_for_last_record_lsn, wait_for_upload
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar
@@ -27,13 +28,13 @@ def test_basic_eviction(
env = neon_env_builder.init_start()
client = env.pageserver.http_client()
pg = env.postgres.create_start("main")
endpoint = env.endpoints.create_start("main")
tenant_id = TenantId(pg.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(pg.safe_psql("show neon.timeline_id")[0][0])
tenant_id = TenantId(endpoint.safe_psql("show neon.tenant_id")[0][0])
timeline_id = TimelineId(endpoint.safe_psql("show neon.timeline_id")[0][0])
# Create a number of layers in the tenant
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("CREATE TABLE foo (t text)")
cur.execute(
"""
@@ -172,15 +173,15 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
env.initial_tenant = tenant_id # update_and_gc relies on this
ps_http = env.pageserver.http_client()
pg = env.postgres.create_start("main")
endpoint = env.endpoints.create_start("main")
log.info("fill with data, creating delta & image layers, some of which are GC'able after")
# no particular reason to create the layers like this, but we are sure
# not to hit the image_creation_threshold here.
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("create table a (id bigserial primary key, some_value bigint not null)")
cur.execute("insert into a(some_value) select i from generate_series(1, 10000) s(i)")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
ps_http.timeline_checkpoint(tenant_id, timeline_id)
# Create delta layers, then turn them into image layers.
@@ -191,19 +192,19 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
for i in range(0, 2):
for j in range(0, 3):
# create a minimal amount of "delta difficulty" for this table
with pg.cursor() as cur:
with endpoint.cursor() as cur:
cur.execute("update a set some_value = -some_value + %s", (j,))
with pg.cursor() as cur:
with endpoint.cursor() as cur:
# vacuuming should aid to reuse keys, though it's not really important
# with image_creation_threshold=1 which we will use on the last compaction
cur.execute("vacuum")
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
if i == 1 and j == 2 and k == 1:
# last iteration; stop before checkpoint to avoid leaving an inmemory layer
pg.stop_and_destroy()
endpoint.stop_and_destroy()
ps_http.timeline_checkpoint(tenant_id, timeline_id)

View File

@@ -20,7 +20,7 @@ def test_image_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
}
)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
pg = env.endpoints.create_start("main", tenant_id=tenant_id)
pg.safe_psql_many(
[
"CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)",
@@ -64,8 +64,8 @@ def test_delta_layer_writer_fail_before_finish(neon_simple_env: NeonEnv):
}
)
pg = env.postgres.create_start("main", tenant_id=tenant_id)
pg.safe_psql_many(
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
endpoint.safe_psql_many(
[
"CREATE TABLE foo (t text) WITH (autovacuum_enabled = off)",
"""INSERT INTO foo

View File

@@ -12,10 +12,10 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
new_timeline_id = env.neon_cli.create_branch("test_lsn_mapping")
pgmain = env.postgres.create_start("test_lsn_mapping")
endpoint_main = env.endpoints.create_start("test_lsn_mapping")
log.info("postgres is running on 'test_lsn_mapping' branch")
cur = pgmain.connect().cursor()
cur = endpoint_main.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
#
@@ -35,7 +35,7 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
cur.execute("INSERT INTO foo VALUES (-1)")
# Wait until WAL is received by pageserver
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
wait_for_last_flush_lsn(env, endpoint_main, env.initial_tenant, new_timeline_id)
with env.pageserver.http_client() as client:
# Check edge cases: timestamp in the future
@@ -61,9 +61,9 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Call get_lsn_by_timestamp to get the LSN
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(
branch_name="test_lsn_mapping", node_name="test_lsn_mapping_read", lsn=lsn
endpoint_here = env.endpoints.create_start(
branch_name="test_lsn_mapping", endpoint_id="ep-lsn_mapping_read", lsn=lsn
)
assert pg_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
assert endpoint_here.safe_psql("SELECT max(x) FROM foo")[0][0] == i
pg_here.stop_and_destroy()
endpoint_here.stop_and_destroy()

View File

@@ -123,9 +123,9 @@ def test_metric_collection(
# before pageserver, pageserver log might contain such errors in the end.
env.pageserver.allowed_errors.append(".*metrics endpoint refused the sent metrics*")
env.neon_cli.create_branch("test_metric_collection")
pg = env.postgres.create_start("test_metric_collection")
endpoint = env.endpoints.create_start("test_metric_collection")
pg_conn = pg.connect()
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
tenant_id = TenantId(query_scalar(cur, "SHOW neon.tenant_id"))
@@ -158,7 +158,7 @@ def test_metric_collection(
# upload some data to remote storage
if remote_storage_kind == RemoteStorageKind.LOCAL_FS:
wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)
pageserver_http = env.pageserver.http_client()
pageserver_http.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http.timeline_gc(tenant_id, timeline_id, 10000)

View File

@@ -12,10 +12,10 @@ from fixtures.utils import query_scalar
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
env.neon_cli.create_branch("test_multixact", "empty")
pg = env.postgres.create_start("test_multixact")
endpoint = env.endpoints.create_start("test_multixact")
log.info("postgres is running on 'test_multixact' branch")
cur = pg.connect().cursor()
cur = endpoint.connect().cursor()
cur.execute(
"""
CREATE TABLE t1(i int primary key);
@@ -32,7 +32,7 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
connections = []
for i in range(nclients):
# Do not turn on autocommit. We want to hold the key-share locks.
conn = pg.connect(autocommit=False)
conn = endpoint.connect(autocommit=False)
connections.append(conn)
# On each iteration, we commit the previous transaction on a connection,
@@ -65,10 +65,10 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
# Branch at this point
env.neon_cli.create_branch("test_multixact_new", "test_multixact", ancestor_start_lsn=lsn)
pg_new = env.postgres.create_start("test_multixact_new")
endpoint_new = env.endpoints.create_start("test_multixact_new")
log.info("postgres is running on 'test_multixact_new' branch")
next_multixact_id_new = pg_new.safe_psql(
next_multixact_id_new = endpoint_new.safe_psql(
"SELECT next_multixact_id FROM pg_control_checkpoint()"
)[0][0]
@@ -76,4 +76,4 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
assert next_multixact_id_new == next_multixact_id
# Check that we can restore the content of the datadir correctly
check_restored_datadir_content(test_output_dir, env, pg)
check_restored_datadir_content(test_output_dir, env, endpoint)

View File

@@ -5,8 +5,8 @@ from fixtures.neon_fixtures import (
DEFAULT_BRANCH_NAME,
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
)
from fixtures.pageserver.http import PageserverHttpClient
from fixtures.types import TenantId, TimelineId

View File

@@ -9,9 +9,11 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por
try:
env.neon_cli.start()
env.neon_cli.create_tenant(tenant_id=env.initial_tenant, set_default=True)
env.neon_cli.pg_start(node_name="main", port=port_distributor.get_port())
env.neon_cli.endpoint_start(endpoint_id="ep-main", port=port_distributor.get_port())
env.neon_cli.create_branch(new_branch_name="migration_check")
env.neon_cli.pg_start(node_name="migration_check", port=port_distributor.get_port())
env.neon_cli.endpoint_start(
endpoint_id="ep-migration_check", port=port_distributor.get_port()
)
finally:
env.neon_cli.stop()

Some files were not shown because too many files have changed in this diff Show More