Compare commits

..

2 Commits

Author SHA1 Message Date
Heikki Linnakangas
1e35ff22cd Hack to work around broken POSTGRES_DISTRIB_DIR path since v15 support 2022-09-22 17:58:59 +03:00
Heikki Linnakangas
552e9dd7f8 Run only neon-captest-reuse benchmark [DO NOT MERGE]
I just want to run the one benchmark manually
2022-09-22 17:58:59 +03:00
36 changed files with 211 additions and 605 deletions

View File

@@ -12,9 +12,6 @@ inputs:
description: "Allow to skip if file doesn't exist, fail otherwise"
default: false
required: false
prefix:
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
runs:
using: "composite"
@@ -26,18 +23,18 @@ runs:
TARGET: ${{ inputs.path }}
ARCHIVE: /tmp/downloads/${{ inputs.name }}.tar.zst
SKIP_IF_DOES_NOT_EXIST: ${{ inputs.skip-if-does-not-exist }}
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}', github.run_id, github.run_attempt) }}
run: |
BUCKET=neon-github-public-dev
PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=$(basename $ARCHIVE)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX%$GITHUB_RUN_ATTEMPT} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
if [ "${SKIP_IF_DOES_NOT_EXIST}" = "true" ]; then
echo '::set-output name=SKIPPED::true'
exit 0
else
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
fi

View File

@@ -127,7 +127,7 @@ runs:
# Wake up the cluster if we use remote neon instance
if [ "${{ inputs.build_type }}" = "remote" ] && [ -n "${BENCHMARK_CONNSTR}" ]; then
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
${POSTGRES_DISTRIB_DIR}/v14/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
fi
# Run the tests.

View File

@@ -7,9 +7,6 @@ inputs:
path:
description: "A directory or file to upload"
required: true
prefix:
description: "S3 prefix. Default is '${GITHUB_RUN_ID}/${GITHUB_RUN_ATTEMPT}'"
required: false
runs:
using: "composite"
@@ -45,14 +42,14 @@ runs:
env:
SOURCE: ${{ inputs.path }}
ARCHIVE: /tmp/uploads/${{ inputs.name }}.tar.zst
PREFIX: artifacts/${{ inputs.prefix || format('{0}/{1}', github.run_id, github.run_attempt) }}
run: |
BUCKET=neon-github-public-dev
PREFIX=artifacts/${GITHUB_RUN_ID}
FILENAME=$(basename $ARCHIVE)
FILESIZE=$(du -sh ${ARCHIVE} | cut -f1)
time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${FILENAME}
time aws s3 mv --only-show-errors ${ARCHIVE} s3://${BUCKET}/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}
# Ref https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-job-summary
echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY}
echo "[${FILENAME}](https://${BUCKET}.s3.amazonaws.com/${PREFIX}/${GITHUB_RUN_ATTEMPT}/${FILENAME}) ${FILESIZE}" >> ${GITHUB_STEP_SUMMARY}

View File

@@ -58,23 +58,23 @@
creates: "/storage/pageserver/data/tenants"
environment:
NEON_REPO_DIR: "/storage/pageserver/data"
LD_LIBRARY_PATH: "/usr/local/v14/lib"
LD_LIBRARY_PATH: "/usr/local/lib"
become: true
tags:
- pageserver
- name: update remote storage (s3) config
lineinfile:
path: /storage/pageserver/data/pageserver.toml
line: "{{ item }}"
loop:
- "[remote_storage]"
- "bucket_name = '{{ bucket_name }}'"
- "bucket_region = '{{ bucket_region }}'"
- "prefix_in_bucket = '{{ inventory_hostname }}'"
become: true
tags:
- pageserver
# - name: update remote storage (s3) config
# lineinfile:
# path: /storage/pageserver/data/pageserver.toml
# line: "{{ item }}"
# loop:
# - "[remote_storage]"
# - "bucket_name = '{{ bucket_name }}'"
# - "bucket_region = '{{ bucket_region }}'"
# - "prefix_in_bucket = '{{ inventory_hostname }}'"
# become: true
# tags:
# - pageserver
- name: upload systemd service definition
ansible.builtin.template:
@@ -87,15 +87,15 @@
tags:
- pageserver
- name: start systemd service
ansible.builtin.systemd:
daemon_reload: yes
name: pageserver
enabled: yes
state: restarted
become: true
tags:
- pageserver
# - name: start systemd service
# ansible.builtin.systemd:
# daemon_reload: yes
# name: pageserver
# enabled: yes
# state: restarted
# become: true
# tags:
# - pageserver
- name: post version to console
when: console_mgmt_base_url is defined
@@ -132,7 +132,7 @@
creates: "/storage/safekeeper/data/safekeeper.id"
environment:
NEON_REPO_DIR: "/storage/safekeeper/data"
LD_LIBRARY_PATH: "/usr/local/v14/lib"
LD_LIBRARY_PATH: "/usr/local/lib"
become: true
tags:
- safekeeper

View File

@@ -21,14 +21,10 @@ docker pull --quiet neondatabase/neon:${DOCKER_TAG}
ID=$(docker create neondatabase/neon:${DOCKER_TAG})
docker cp ${ID}:/data/postgres_install.tar.gz .
tar -xzf postgres_install.tar.gz -C neon_install
mkdir neon_install/bin/
docker cp ${ID}:/usr/local/bin/pageserver neon_install/bin/
docker cp ${ID}:/usr/local/bin/safekeeper neon_install/bin/
docker cp ${ID}:/usr/local/bin/proxy neon_install/bin/
docker cp ${ID}:/usr/local/v14/bin/ neon_install/v14/bin/
docker cp ${ID}:/usr/local/v15/bin/ neon_install/v15/bin/
docker cp ${ID}:/usr/local/v14/lib/ neon_install/v14/lib/
docker cp ${ID}:/usr/local/v15/lib/ neon_install/v15/lib/
docker cp ${ID}:/usr/local/bin/postgres neon_install/bin/
docker rm -vf ${ID}
# store version to file (for ansible playbooks) and create binaries tarball

View File

@@ -3,15 +3,11 @@
zenith-us-stage-ps-2 console_region_id=27
zenith-us-stage-ps-3 console_region_id=27
zenith-us-stage-ps-4 console_region_id=27
zenith-us-stage-test-ps-1 console_region_id=28
[safekeepers]
zenith-us-stage-sk-4 console_region_id=27
zenith-us-stage-sk-5 console_region_id=27
zenith-us-stage-sk-6 console_region_id=27
zenith-us-stage-test-sk-1 console_region_id=28
zenith-us-stage-test-sk-2 console_region_id=28
zenith-us-stage-test-sk-3 console_region_id=28
[storage:children]
pageservers

View File

@@ -5,7 +5,7 @@ After=network.target auditd.service
[Service]
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/v14/lib
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -c "broker_endpoints=['{{ etcd_endpoints }}']" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed

View File

@@ -5,7 +5,7 @@ After=network.target auditd.service
[Service]
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/v14/lib
Environment=RUST_BACKTRACE=1 NEON_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -D /storage/safekeeper/data --broker-endpoints={{ etcd_endpoints }} --remote-storage='{bucket_name="{{bucket_name}}", bucket_region="{{bucket_region}}", prefix_in_bucket="{{ env_name }}/wal"}'
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed

View File

@@ -46,8 +46,7 @@ jobs:
runs-on: [self-hosted, zenith-benchmarker]
env:
POSTGRES_DISTRIB_DIR: /tmp/pg_install
DEFAULT_PG_VERSION: 14
POSTGRES_DISTRIB_DIR: "/usr/pgsql-14"
steps:
- name: Checkout zenith repo
@@ -72,7 +71,7 @@ jobs:
echo Poetry
poetry --version
echo Pgbench
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
$POSTGRES_DISTRIB_DIR/bin/pgbench --version
- name: Create Neon Project
id: create-neon-project
@@ -141,8 +140,7 @@ jobs:
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
TEST_PG_BENCH_SCALES_MATRIX: "10gb"
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 14
POSTGRES_DISTRIB_DIR: /tmp/pg_install
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref == 'refs/heads/main' ) }}
@@ -153,7 +151,7 @@ jobs:
# neon-captest-new: Run pgbench in a freshly created project
# neon-captest-reuse: Same, but reusing existing project
# neon-captest-prefetch: Same, with prefetching enabled (new project)
platform: [ neon-captest-new, neon-captest-reuse, neon-captest-prefetch, rds-aurora ]
platform: [ neon-captest-reuse ]
runs-on: dev
container:
@@ -165,17 +163,10 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Add Postgres binaries to PATH
- name: Install Deps
run: |
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
echo "${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin" >> $GITHUB_PATH
sudo apt -y update
sudo apt install -y postgresql-14
- name: Create Neon Project
if: matrix.platform != 'neon-captest-reuse'
@@ -210,6 +201,13 @@ jobs:
env:
PLATFORM: ${{ matrix.platform }}
- name: Hack psql path
run: |
mkdir /tmp/pg_install
ln -s /usr/ /tmp/pg_install/v14
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
- name: Set database options
if: matrix.platform == 'neon-captest-prefetch'
run: |

View File

@@ -268,32 +268,6 @@ jobs:
if: matrix.build_type == 'debug'
uses: ./.github/actions/save-coverage-data
upload-latest-artifacts:
runs-on: dev
container:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
needs: [ regress-tests ]
if: github.ref_name == 'main'
steps:
- name: Copy Neon artifact to the latest directory
shell: bash -euxo pipefail {0}
env:
BUCKET: neon-github-public-dev
PREFIX: artifacts/${{ github.run_id }}
run: |
for build_type in debug release; do
FILENAME=neon-${{ runner.os }}-${build_type}-artifact.tar.zst
S3_KEY=$(aws s3api list-objects-v2 --bucket ${BUCKET} --prefix ${PREFIX} | jq -r '.Contents[].Key' | grep ${FILENAME} | sort --version-sort | tail -1 || true)
if [ -z "${S3_KEY}" ]; then
echo 2>&1 "Neither s3://${BUCKET}/${PREFIX}/${FILENAME} nor its version from previous attempts exist"
exit 1
fi
time aws s3 cp --only-show-errors s3://${BUCKET}/${S3_KEY} s3://${BUCKET}/artifacts/latest/${FILENAME}
done
benchmarks:
runs-on: dev
container:
@@ -361,6 +335,9 @@ jobs:
curl --fail --output suites.json ${REPORT_URL%/index.html}/data/suites.json
./scripts/pysync
# Workaround for https://github.com/neondatabase/cloud/issues/2188
psql "$TEST_RESULT_CONNSTR" -c "SELECT 1;" || sleep 10
DATABASE_URL="$TEST_RESULT_CONNSTR" poetry run python3 scripts/ingest_regress_test_result.py --revision ${SHA} --reference ${GITHUB_REF} --build-type ${BUILD_TYPE} --ingest suites.json
coverage-report:
@@ -611,16 +588,7 @@ jobs:
- name: Pull rust image from ECR
run: crane pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned rust
- name: Push images to production ECR
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'
run: |
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/neon:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/neon:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-tools:latest
crane copy 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node:$GITHUB_RUN_ID 093970136003.dkr.ecr.us-east-2.amazonaws.com/compute-node:latest
- name: Configure Docker Hub login
- name: Configure docker login
run: |
# ECR Credential Helper & Docker Hub don't work together in config, hence reset
echo "" > /github/home/.docker/config.json
@@ -641,7 +609,7 @@ jobs:
- name: Push rust image to Docker Hub
run: crane push rust neondatabase/rust:pinned
- name: Add latest tag to images in Docker Hub
- name: Add latest tag to images
if: |
(github.ref_name == 'main' || github.ref_name == 'release') &&
github.event_name != 'workflow_dispatch'

View File

@@ -19,8 +19,9 @@ COPY --chown=nonroot scripts/ninstall.sh scripts/ninstall.sh
ENV BUILD_TYPE release
RUN set -e \
&& mold -run make -j $(nproc) -s neon-pg-ext \
&& rm -rf pg_install/build \
&& tar -C pg_install -czf /home/nonroot/postgres_install.tar.gz .
&& rm -rf pg_install/v14/build \
&& rm -rf pg_install/v15/build \
&& tar -C pg_install/v14 -czf /home/nonroot/postgres_install.tar.gz .
# Build neon binaries
FROM $REPOSITORY/$IMAGE:$TAG AS build

View File

@@ -8,12 +8,9 @@ ARG TAG=pinned
# Layer "build-deps"
#
FROM debian:bullseye-slim AS build-deps
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libglib2.0-dev
libcurl4-openssl-dev libossp-uuid-dev
#
# Layer "pg-build"
@@ -40,7 +37,7 @@ RUN cd postgres && \
FROM build-deps AS postgis-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc
apt install -y gdal-bin libgdal-dev libprotobuf-c-dev protobuf-c-compiler xsltproc wget
RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
tar xvzf postgis-3.3.0.tar.gz && \
@@ -62,13 +59,15 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.0.tar.gz && \
# Build plv8
#
FROM build-deps AS plv8-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt update && \
apt install -y ninja-build python3-dev libc++-dev libc++abi-dev libncurses5
apt install -y git curl wget make ninja-build build-essential libncurses5 python3-dev pkg-config libc++-dev libc++abi-dev libglib2.0-dev
# https://github.com/plv8/plv8/issues/475
# Debian bullseye provides binutils 2.35 when >= 2.38 is necessary
RUN apt update && \
RUN echo "deb http://ftp.debian.org/debian testing main" >> /etc/apt/sources.list && \
echo "APT::Default-Release \"stable\";" > /etc/apt/apt.conf.d/default-release && \
apt update && \
apt install -y --no-install-recommends -t testing binutils
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
@@ -80,46 +79,12 @@ RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
rm -rf /plv8-* && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/plv8.control
#
# Layer "h3-pg-build"
# Build h3_pg
#
FROM build-deps AS h3-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
# packaged cmake is too old
RUN apt update && \
apt install -y --no-install-recommends -t testing cmake
RUN wget https://github.com/uber/h3/archive/refs/tags/v4.0.1.tar.gz -O h3.tgz && \
tar xvzf h3.tgz && \
cd h3-4.0.1 && \
mkdir build && \
cd build && \
cmake .. -DCMAKE_BUILD_TYPE=Release && \
make -j $(getconf _NPROCESSORS_ONLN) && \
DESTDIR=/h3 make install && \
cp -R /h3/usr / && \
rm -rf build
RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3-pg.tgz && \
tar xvzf h3-pg.tgz && \
cd h3-pg-4.0.1 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/h3.control
#
# Layer "neon-pg-ext-build"
# compile neon extensions
#
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \
@@ -167,6 +132,8 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
chmod 0750 /var/db/postgres/compute && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig
# TODO: Check if we can make the extension setup more modular versus a linear build
# currently plv8-build copies the output /usr/local/pgsql from postgis-build, etc#
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl

View File

@@ -285,7 +285,7 @@ impl LocalEnv {
branch_name: &str,
tenant_id: TenantId,
) -> Option<TimelineId> {
dbg!(&self.branch_name_mappings)
self.branch_name_mappings
.get(branch_name)?
.iter()
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)

View File

@@ -1,111 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
import textwrap
import uuid
from pathlib import Path
import testgres
def run_command(args):
print('> Cmd:', ' '.join(args))
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
ret = p.wait()
output = p.stdout.read().strip()
if output:
print(textwrap.indent(output, '>> '))
if ret != 0:
raise subprocess.CalledProcessError(ret, args)
def make_tarfile(output_filename, source_dir):
print("* Packing the backup into a tarball")
cmd = ["tar", r"--transform=s/\.\///", "-C", str(source_dir), "-cf", str(output_filename), "."]
run_command(cmd)
def create_tenant(tenant_id):
print("* Creating a new tenant")
cmd = ["neon_local", "tenant", "create", f"--tenant-id={tenant_id}"]
run_command(cmd)
def import_backup(args, backup_dir: Path):
tar = Path('/tmp/base.tar')
make_tarfile(tar, backup_dir / 'data')
print("* Importing the timeline into the pageserver")
manifest = json.loads((backup_dir / "data" / "backup_manifest").read_text())
start_lsn = manifest["WAL-Ranges"][0]["Start-LSN"]
end_lsn = manifest["WAL-Ranges"][0]["End-LSN"]
print("> LSNs:", start_lsn, end_lsn)
cmd = (
"neon_local timeline import "
f"--tenant-id {args.tenant_id} "
f"--base-lsn {start_lsn} "
f"--end-lsn {end_lsn} "
f"--base-tarfile {tar} "
f"--timeline-id {args.timeline_id} "
f"--node-name {args.node}"
)
run_command(cmd.split())
def debug_prints(node):
tuples = node.execute("table foo")
oid = node.execute("select 'foo'::regclass::oid")[0][0]
print("> foo's tuples:", tuples, "&", "oid:", oid)
print("> DBs:", node.execute("select oid, datname from pg_database"))
def main(args):
print("* Creating a node")
node = testgres.get_new_node()
node.init(unix_sockets=False, allow_streaming=True).start()
node.execute("create table foo as select 1")
debug_prints(node)
# node.pgbench_init(scale=1)
print("* Creating a backup")
backup = node.backup()
backup_dir = Path(backup.base_dir)
print("> Backup dir:", backup_dir)
# pr = backup.spawn_primary().start()
# debug_prints(pr)
# exit(1)
create_tenant(args.tenant_id)
import_backup(args, backup_dir)
print("> Tenant:", args.tenant_id)
print("> Timeline:", args.timeline_id)
print("> Node:", args.node)
print("* Starting postgres")
cmd = ["neon_local", "pg", "start", f"--tenant-id={args.tenant_id}", f"--timeline-id={args.timeline_id}", args.node]
run_command(cmd)
print("* Opening psql session...")
cmd = ["psql", f"host=127.0.0.1 port=55432 user={os.getlogin()} dbname=postgres"]
subprocess.call(cmd)
if __name__ == "__main__":
tenant_id = uuid.uuid4().hex
parser = argparse.ArgumentParser()
parser.add_argument("--tenant-id", default=tenant_id)
parser.add_argument("--timeline-id", default=tenant_id)
parser.add_argument("node")
args = parser.parse_args(sys.argv[1:])
main(args)

View File

@@ -170,7 +170,7 @@ pub fn find_end_of_wal(
let mut curr_lsn = start_lsn;
let mut buf = [0u8; XLOG_BLCKSZ];
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
debug!("find_end_of_wal PG_VERSION: {}", pg_version);
info!("find_end_of_wal PG_VERSION: {}", pg_version);
let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
@@ -182,7 +182,7 @@ pub fn find_end_of_wal(
match open_wal_segment(&seg_file_path)? {
None => {
// no more segments
debug!(
info!(
"find_end_of_wal reached end at {:?}, segment {:?} doesn't exist",
result, seg_file_path
);
@@ -205,7 +205,7 @@ pub fn find_end_of_wal(
match decoder.poll_decode() {
Ok(Some(record)) => result = record.0,
Err(e) => {
debug!(
info!(
"find_end_of_wal reached end at {:?}, decode error: {:?}",
result, e
);

View File

@@ -240,6 +240,7 @@ where
mod tests {
use super::*;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
impl MonotonicCounter<i32> for i32 {
@@ -257,19 +258,17 @@ mod tests {
let seq = Arc::new(SeqWait::new(0));
let seq2 = Arc::clone(&seq);
let seq3 = Arc::clone(&seq);
let jh1 = tokio::task::spawn(async move {
tokio::task::spawn(async move {
seq2.wait_for(42).await.expect("wait_for 42");
let old = seq2.advance(100);
assert_eq!(old, 99);
seq2.wait_for_timeout(999, Duration::from_millis(100))
.await
.expect_err("no 999");
seq2.wait_for(999).await.expect_err("no 999");
});
let jh2 = tokio::task::spawn(async move {
tokio::task::spawn(async move {
seq3.wait_for(42).await.expect("wait_for 42");
seq3.wait_for(0).await.expect("wait_for 0");
});
tokio::time::sleep(Duration::from_millis(200)).await;
sleep(Duration::from_secs(1));
let old = seq.advance(99);
assert_eq!(old, 0);
seq.wait_for(100).await.expect("wait_for 100");
@@ -278,9 +277,6 @@ mod tests {
assert_eq!(seq.advance(98), 100);
assert_eq!(seq.load(), 100);
jh1.await.unwrap();
jh2.await.unwrap();
seq.shutdown();
}
@@ -288,18 +284,15 @@ mod tests {
async fn seqwait_timeout() {
let seq = Arc::new(SeqWait::new(0));
let seq2 = Arc::clone(&seq);
let jh = tokio::task::spawn(async move {
tokio::task::spawn(async move {
let timeout = Duration::from_millis(1);
let res = seq2.wait_for_timeout(42, timeout).await;
assert_eq!(res, Err(SeqWaitError::Timeout));
});
tokio::time::sleep(Duration::from_millis(200)).await;
tokio::time::sleep(Duration::from_secs(1)).await;
// This will attempt to wake, but nothing will happen
// because the waiter already dropped its Receiver.
let old = seq.advance(99);
assert_eq!(old, 0);
jh.await.unwrap();
seq.shutdown();
assert_eq!(old, 0)
}
}

View File

@@ -1,9 +1,8 @@
use metrics::core::{AtomicU64, GenericCounter};
use metrics::{
register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
UIntGaugeVec,
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, Histogram, HistogramVec,
IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
use utils::id::{TenantId, TimelineId};
@@ -205,34 +204,12 @@ pub static REMAINING_SYNC_ITEMS: Lazy<IntGauge> = Lazy::new(|| {
.expect("failed to register pageserver remote storage remaining sync items int gauge")
});
pub static IMAGE_SYNC_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_remote_storage_image_sync_duration",
"Time spent to synchronize (up/download) a whole pageserver image",
&["tenant_id", "timeline_id"],
)
.expect("failed to register per-timeline pageserver image sync time vec")
});
pub static IMAGE_SYNC_OPERATION_KINDS: &[&str] = &["upload", "download", "delete"];
pub static IMAGE_SYNC_STATUS: &[&str] = &["success", "failure", "abort"];
pub static IMAGE_SYNC_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_remote_storage_image_sync_count",
"Number of synchronization operations executed for pageserver images. \
Grouped by tenant, timeline, operation_kind and status",
&["tenant_id", "timeline_id", "operation_kind", "status"]
)
.expect("failed to register pageserver image sync count vec")
});
pub static IMAGE_SYNC_TIME_HISTOGRAM: Lazy<HistogramVec> = Lazy::new(|| {
pub static IMAGE_SYNC_TIME: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"pageserver_remote_storage_image_sync_seconds",
"Time took to synchronize (download or upload) a whole pageserver image. \
Grouped by operation_kind and status",
&["operation_kind", "status"],
Grouped by tenant and timeline ids, `operation_kind` (upload|download) and `status` (success|failure)",
&["tenant_id", "timeline_id", "operation_kind", "status"],
vec![0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 3.0, 10.0, 20.0]
)
.expect("failed to register pageserver image sync time histogram vec")
@@ -279,7 +256,7 @@ macro_rules! redo_histogram_time_buckets {
() => {
vec![
0.000_005, 0.000_010, 0.000_025, 0.000_050, 0.000_100, 0.000_250, 0.000_500, 0.001_000,
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000, 0.100_000, 0.250_000,
0.002_500, 0.005_000, 0.010_000, 0.025_000, 0.050_000,
]
};
}
@@ -434,14 +411,6 @@ impl Drop for TimelineMetrics {
for op in SMGR_QUERY_TIME_OPERATIONS {
let _ = SMGR_QUERY_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
}
for op in IMAGE_SYNC_OPERATION_KINDS {
for status in IMAGE_SYNC_STATUS {
let _ = IMAGE_SYNC_COUNT.remove_label_values(&[tenant_id, timeline_id, op, status]);
}
}
let _ = IMAGE_SYNC_TIME.remove_label_values(&[tenant_id, timeline_id]);
}
}

View File

@@ -178,7 +178,6 @@ use crate::{
TenantTimelineValues,
};
use crate::metrics::{IMAGE_SYNC_COUNT, IMAGE_SYNC_TIME_HISTOGRAM};
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use self::download::download_index_parts;
@@ -836,6 +835,7 @@ async fn process_sync_task_batch(
sync_id,
upload_data,
sync_start,
"upload",
)
.await
}
@@ -879,6 +879,7 @@ async fn process_sync_task_batch(
sync_id,
download_data,
sync_start,
"download",
)
.await;
}
@@ -910,6 +911,7 @@ async fn process_sync_task_batch(
sync_id,
delete_data,
sync_start,
"delete",
)
.instrument(info_span!("delete_timeline_data"))
.await;
@@ -946,9 +948,8 @@ async fn download_timeline_data(
sync_id: TenantTimelineId,
new_download_data: SyncData<LayersDownload>,
sync_start: Instant,
task_name: &str,
) -> DownloadStatus {
static TASK_NAME: &str = "download";
match download_timeline_layers(
conf,
storage,
@@ -960,19 +961,19 @@ async fn download_timeline_data(
.await
{
DownloadedTimeline::Abort => {
register_sync_status(sync_id, sync_start, TASK_NAME, None);
register_sync_status(sync_id, sync_start, task_name, None);
if let Err(e) = index.write().await.set_awaits_download(&sync_id, false) {
error!("Timeline {sync_id} was expected to be in the remote index after a download attempt, but it's absent: {e:?}");
}
}
DownloadedTimeline::FailedAndRescheduled => {
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
}
DownloadedTimeline::Successful(mut download_data) => {
match update_local_metadata(conf, sync_id, current_remote_timeline).await {
Ok(()) => match index.write().await.set_awaits_download(&sync_id, false) {
Ok(()) => {
register_sync_status(sync_id, sync_start, TASK_NAME, Some(true));
register_sync_status(sync_id, sync_start, task_name, Some(true));
return DownloadStatus::Downloaded;
}
Err(e) => {
@@ -983,7 +984,7 @@ async fn download_timeline_data(
error!("Failed to update local timeline metadata: {e:?}");
download_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Download(download_data));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
}
}
}
@@ -1059,9 +1060,8 @@ async fn delete_timeline_data(
sync_id: TenantTimelineId,
mut new_delete_data: SyncData<LayersDeletion>,
sync_start: Instant,
task_name: &str,
) {
static TASK_NAME: &str = "delete";
let timeline_delete = &mut new_delete_data.data;
if !timeline_delete.deletion_registered {
@@ -1077,14 +1077,14 @@ async fn delete_timeline_data(
error!("Failed to update remote timeline {sync_id}: {e:?}");
new_delete_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Delete(new_delete_data));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
return;
}
}
timeline_delete.deletion_registered = true;
let sync_status = delete_timeline_layers(storage, sync_queue, sync_id, new_delete_data).await;
register_sync_status(sync_id, sync_start, TASK_NAME, Some(sync_status));
register_sync_status(sync_id, sync_start, task_name, Some(sync_status));
}
async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result<TimelineMetadata> {
@@ -1103,8 +1103,8 @@ async fn upload_timeline_data(
sync_id: TenantTimelineId,
new_upload_data: SyncData<LayersUpload>,
sync_start: Instant,
task_name: &str,
) -> UploadStatus {
static TASK_NAME: &str = "upload";
let mut uploaded_data = match upload_timeline_layers(
storage,
sync_queue,
@@ -1115,7 +1115,7 @@ async fn upload_timeline_data(
.await
{
UploadedTimeline::FailedAndRescheduled(e) => {
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
return UploadStatus::Failed(e);
}
UploadedTimeline::Successful(upload_data) => upload_data,
@@ -1134,14 +1134,14 @@ async fn upload_timeline_data(
.await
{
Ok(()) => {
register_sync_status(sync_id, sync_start, TASK_NAME, Some(true));
register_sync_status(sync_id, sync_start, task_name, Some(true));
UploadStatus::Uploaded
}
Err(e) => {
error!("Failed to update remote timeline {sync_id}: {e:?}");
uploaded_data.retries += 1;
sync_queue.push(sync_id, SyncTask::Upload(uploaded_data));
register_sync_status(sync_id, sync_start, TASK_NAME, Some(false));
register_sync_status(sync_id, sync_start, task_name, Some(false));
UploadStatus::Failed(e)
}
}
@@ -1391,22 +1391,16 @@ fn register_sync_status(
let tenant_id = sync_id.tenant_id.to_string();
let timeline_id = sync_id.timeline_id.to_string();
let sync_status = match sync_status {
Some(true) => "success",
Some(false) => "failure",
None => "abort",
};
IMAGE_SYNC_TIME_HISTOGRAM
.with_label_values(&[sync_name, sync_status])
.observe(secs_elapsed);
IMAGE_SYNC_TIME
.with_label_values(&[&tenant_id, &timeline_id])
.add(secs_elapsed);
IMAGE_SYNC_COUNT
.with_label_values(&[&tenant_id, &timeline_id, sync_name, sync_status])
.inc();
match sync_status {
Some(true) => {
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "success"])
}
Some(false) => {
IMAGE_SYNC_TIME.with_label_values(&[&tenant_id, &timeline_id, sync_name, "failure"])
}
None => return,
}
.observe(secs_elapsed)
}
#[cfg(test)]

View File

@@ -17,6 +17,7 @@ use tracing::*;
use utils::crashsafe_dir::path_with_suffix_extension;
use std::cmp::min;
use std::collections::hash_map;
use std::collections::hash_map::Entry;
use std::collections::BTreeSet;
use std::collections::HashMap;
@@ -245,12 +246,12 @@ impl Tenant {
let ancestor_ancestor_lsn = ancestor_timeline.get_ancestor_lsn();
if ancestor_ancestor_lsn > *lsn {
// can we safely just branch from the ancestor instead?
bail!(
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
lsn,
ancestor_timeline_id,
ancestor_ancestor_lsn,
);
anyhow::bail!(
"invalid start lsn {} for ancestor timeline {}: less than timeline ancestor lsn {}",
lsn,
ancestor_timeline_id,
ancestor_ancestor_lsn,
);
}
}
@@ -405,11 +406,11 @@ impl Tenant {
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
match timelines_accessor.entry(timeline.timeline_id) {
Entry::Occupied(_) => bail!(
hash_map::Entry::Occupied(_) => anyhow::bail!(
"Found freshly initialized timeline {} in the tenant map",
timeline.timeline_id
),
Entry::Vacant(v) => {
hash_map::Entry::Vacant(v) => {
v.insert(timeline);
}
}
@@ -767,7 +768,7 @@ impl Tenant {
})
.with_context(|| {
format!(
"Failed to fsync on first save for config {}",
"Failed to fsync on firts save for config {}",
target_config_path.display()
)
})?;
@@ -1090,11 +1091,11 @@ impl Tenant {
})?;
match timelines.entry(new_timeline_id) {
Entry::Occupied(_) => bail!(
hash_map::Entry::Occupied(_) => anyhow::bail!(
"Found freshly initialized timeline {} in the tenant map",
new_timeline_id
),
Entry::Vacant(v) => {
hash_map::Entry::Vacant(v) => {
v.insert(Arc::clone(&new_timeline));
}
}

View File

@@ -343,9 +343,7 @@ impl Timeline {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => return Ok(cached_img), // exact LSN match, return the image
Ordering::Greater => {
unreachable!("the returned lsn should never be after the requested lsn")
}
Ordering::Greater => panic!(), // the returned lsn should never be after the requested lsn
}
Some((cached_lsn, cached_img))
}
@@ -728,10 +726,10 @@ impl Timeline {
Ok(())
}
pub fn layer_removal_guard(&self) -> anyhow::Result<MutexGuard<()>> {
pub fn layer_removal_guard(&self) -> Result<MutexGuard<()>, anyhow::Error> {
self.layer_removal_cs
.try_lock()
.map_err(|e| anyhow!("cannot lock compaction critical section {e}"))
.map_err(|e| anyhow::anyhow!("cannot lock compaction critical section {e}"))
}
/// Retrieve current logical size of the timeline.

View File

@@ -31,6 +31,7 @@ use etcd_broker::Client;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use std::future::Future;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::*;
use url::Url;
@@ -87,44 +88,37 @@ pub fn is_etcd_client_initialized() -> bool {
/// That may lead to certain events not being observed by the listener.
#[derive(Debug)]
pub struct TaskHandle<E> {
join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
events_receiver: watch::Receiver<TaskStateUpdate<E>>,
events_receiver: watch::Receiver<TaskEvent<E>>,
cancellation: watch::Sender<()>,
}
pub enum TaskEvent<E> {
Update(TaskStateUpdate<E>),
End(anyhow::Result<()>),
}
#[derive(Debug, Clone)]
pub enum TaskStateUpdate<E> {
Init,
pub enum TaskEvent<E> {
Started,
Progress(E),
NewEvent(E),
End,
}
impl<E: Clone> TaskHandle<E> {
/// Initializes the task, starting it immediately after the creation.
pub fn spawn<Fut>(
task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, watch::Receiver<()>) -> Fut
+ Send
+ 'static,
task: impl FnOnce(Arc<watch::Sender<TaskEvent<E>>>, watch::Receiver<()>) -> Fut + Send + 'static,
) -> Self
where
Fut: Future<Output = anyhow::Result<()>> + Send,
E: Send + Sync + 'static,
Fut: Future<Output = Result<(), String>> + Send,
E: Sync + Send + 'static,
{
let (cancellation, cancellation_receiver) = watch::channel(());
let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
let (events_sender, events_receiver) = watch::channel(TaskEvent::Started);
let events_sender = Arc::new(events_sender);
let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
events_sender.send(TaskStateUpdate::Started).ok();
task(events_sender, cancellation_receiver).await
let sender = Arc::clone(&events_sender);
let _ = WALRECEIVER_RUNTIME.spawn(async move {
events_sender.send(TaskEvent::Started).ok();
task(sender, cancellation_receiver).await
});
TaskHandle {
join_handle: Some(join_handle),
events_receiver,
cancellation,
}
@@ -132,45 +126,15 @@ impl<E: Clone> TaskHandle<E> {
async fn next_task_event(&mut self) -> TaskEvent<E> {
match self.events_receiver.changed().await {
Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
Err(_task_channel_part_dropped) => {
TaskEvent::End(match self.join_handle.take() {
Some(jh) => {
if !jh.is_finished() {
warn!("sender is dropped while join handle is still alive");
}
jh.await
.map_err(|e| anyhow::anyhow!("Failed to join task: {e}"))
.and_then(|x| x)
}
None => {
// Another option is to have an enum, join handle or result and give away the reference to it
Err(anyhow::anyhow!("Task was joined more than once"))
}
})
}
Ok(()) => self.events_receiver.borrow().clone(),
Err(_task_channel_part_dropped) => TaskEvent::End,
}
}
/// Aborts current task, waiting for it to finish.
pub async fn shutdown(self) {
match self.join_handle {
Some(jh) => {
self.cancellation.send(()).ok();
match jh.await {
Ok(Ok(())) => debug!("Shutdown success"),
Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
Err(join_error) => {
if join_error.is_cancelled() {
error!("Shutdown task was cancelled");
} else {
error!("Shutdown task join error: {join_error}")
}
}
}
}
None => {}
}
pub async fn shutdown(mut self) {
self.cancellation.send(()).ok();
// wait until the sender is dropped
while self.events_receiver.changed().await.is_ok() {}
}
}

View File

@@ -16,10 +16,10 @@ use std::{
time::Duration,
};
use crate::task_mgr;
use crate::task_mgr::TaskKind;
use crate::task_mgr::WALRECEIVER_RUNTIME;
use crate::tenant::Timeline;
use crate::{task_mgr, walreceiver::TaskStateUpdate};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
use etcd_broker::{
@@ -145,26 +145,19 @@ async fn connection_manager_loop_step(
let wal_connection = walreceiver_state.wal_connection.as_mut()
.expect("Should have a connection, as checked by the corresponding select! guard");
match wal_connection_update {
TaskEvent::Update(c) => {
match c {
TaskStateUpdate::Init | TaskStateUpdate::Started => {},
TaskStateUpdate::Progress(status) => {
if status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = status.to_owned();
}
TaskEvent::Started => {},
TaskEvent::NewEvent(status) => {
if status.has_processed_wal {
// We have advanced last_record_lsn by processing the WAL received
// from this safekeeper. This is good enough to clean unsuccessful
// retries history and allow reconnecting to this safekeeper without
// sleeping for a long time.
walreceiver_state.wal_connection_retries.remove(&wal_connection.sk_id);
}
wal_connection.status = status;
},
TaskEvent::End(walreceiver_task_result) => {
match walreceiver_task_result {
Ok(()) => debug!("WAL receiving task finished"),
Err(e) => error!("wal receiver task finished with an error: {e:?}"),
}
TaskEvent::End => {
debug!("WAL receiving task finished");
walreceiver_state.drop_old_connection(false).await;
},
}
@@ -370,13 +363,13 @@ impl WalreceiverState {
async move {
super::walreceiver_connection::handle_walreceiver_connection(
timeline,
new_wal_source_connstr,
events_sender,
&new_wal_source_connstr,
events_sender.as_ref(),
cancellation,
connect_timeout,
)
.await
.context("walreceiver connection handling failure")
.map_err(|e| format!("walreceiver connection handling failure: {e:#}"))
}
.instrument(info_span!("walreceiver_connection", id = %id))
});
@@ -892,7 +885,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status.clone()))
.send(TaskEvent::NewEvent(connection_status.clone()))
.ok();
Ok(())
}),
@@ -1152,7 +1145,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status.clone()))
.send(TaskEvent::NewEvent(connection_status.clone()))
.ok();
Ok(())
}),
@@ -1240,7 +1233,7 @@ mod tests {
status: connection_status.clone(),
connection_task: TaskHandle::spawn(move |sender, _| async move {
sender
.send(TaskStateUpdate::Progress(connection_status.clone()))
.send(TaskEvent::NewEvent(connection_status.clone()))
.ok();
Ok(())
}),

View File

@@ -16,9 +16,10 @@ use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::{pin, select, sync::watch, time};
use tokio_postgres::{replication::ReplicationStream, Client};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use crate::{metrics::LIVE_CONNECTIONS_COUNT, walreceiver::TaskStateUpdate};
use super::TaskEvent;
use crate::metrics::LIVE_CONNECTIONS_COUNT;
use crate::{
task_mgr,
task_mgr::TaskKind,
@@ -54,8 +55,8 @@ pub struct WalConnectionStatus {
/// messages as we go.
pub async fn handle_walreceiver_connection(
timeline: Arc<Timeline>,
wal_source_connstr: String,
events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
wal_source_connstr: &str,
events_sender: &watch::Sender<TaskEvent<WalConnectionStatus>>,
mut cancellation: watch::Receiver<()>,
connect_timeout: Duration,
) -> anyhow::Result<()> {
@@ -80,7 +81,7 @@ pub async fn handle_walreceiver_connection(
streaming_lsn: None,
commit_lsn: None,
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
return Ok(());
}
@@ -111,7 +112,8 @@ pub async fn handle_walreceiver_connection(
_ = connection_cancellation.changed() => info!("Connection cancelled"),
}
Ok(())
},
}
.instrument(info_span!("walreceiver connection")),
);
// Immediately increment the gauge, then create a job to decrement it on task exit.
@@ -132,7 +134,7 @@ pub async fn handle_walreceiver_connection(
connection_status.latest_connection_update = Utc::now().naive_utc();
connection_status.latest_wal_update = Utc::now().naive_utc();
connection_status.commit_lsn = Some(end_of_wal);
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
return Ok(());
}
@@ -200,7 +202,7 @@ pub async fn handle_walreceiver_connection(
}
&_ => {}
};
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone())) {
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}
@@ -266,8 +268,7 @@ pub async fn handle_walreceiver_connection(
if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
// We have successfully processed at least one WAL record.
connection_status.has_processed_wal = true;
if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status.clone()))
{
if let Err(e) = events_sender.send(TaskEvent::NewEvent(connection_status.clone())) {
warn!("Wal connection event listener dropped, aborting the connection: {e}");
return Ok(());
}

View File

@@ -259,15 +259,3 @@ fn parse_host_port(input: &str) -> Option<(&str, u16)> {
let (host, port) = input.split_once(':')?;
Some((host, port.parse().ok()?))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_host_port() {
let (host, port) = parse_host_port("127.0.0.1:5432").expect("failed to parse");
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 5432);
}
}

View File

@@ -54,10 +54,13 @@ impl<'a> ClientCredentials<'a> {
let dbname = get_param("database")?;
// Project name might be passed via PG's command-line options.
let project_a = params.options_raw().and_then(|mut options| {
options
.find_map(|opt| opt.strip_prefix("project="))
.map(Cow::Borrowed)
let project_a = params.options_raw().and_then(|options| {
for opt in options {
if let Some(value) = opt.strip_prefix("project=") {
return Some(Cow::Borrowed(value));
}
}
None
});
// Alternative project name is in fact a subdomain from SNI.

View File

@@ -52,16 +52,6 @@ impl CancelMap {
let session = Session::new(key, self);
f(session).await
}
#[cfg(test)]
fn contains(&self, session: &Session) -> bool {
self.0.lock().contains_key(&session.key)
}
#[cfg(test)]
fn is_empty(&self) -> bool {
self.0.lock().is_empty()
}
}
/// This should've been a [`std::future::Future`], but
@@ -114,39 +104,3 @@ impl<'a> Session<'a> {
self.key
}
}
#[cfg(test)]
mod tests {
use super::*;
use once_cell::sync::Lazy;
#[tokio::test]
async fn check_session_drop() -> anyhow::Result<()> {
static CANCEL_MAP: Lazy<CancelMap> = Lazy::new(Default::default);
let (tx, rx) = tokio::sync::oneshot::channel();
let task = tokio::spawn(CANCEL_MAP.with_session(|session| async move {
assert!(CANCEL_MAP.contains(&session));
tx.send(()).expect("failed to send");
let () = futures::future::pending().await; // sleep forever
Ok(())
}));
// Wait until the task has been spawned.
let () = rx.await.context("failed to hear from the task")?;
// Drop the session's entry by cancelling the task.
task.abort();
let error = task.await.expect_err("task should have failed");
if !error.is_cancelled() {
anyhow::bail!(error);
}
// Check that the session has been dropped.
assert!(CANCEL_MAP.is_empty());
Ok(())
}
}

View File

@@ -1,5 +1,6 @@
//! Small parsing helpers.
use std::convert::TryInto;
use std::ffi::CStr;
pub fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
@@ -9,36 +10,9 @@ pub fn split_cstr(bytes: &[u8]) -> Option<(&CStr, &[u8])> {
Some((unsafe { CStr::from_bytes_with_nul_unchecked(cstr) }, other))
}
/// See <https://doc.rust-lang.org/std/primitive.slice.html#method.split_array_ref>.
pub fn split_at_const<const N: usize>(bytes: &[u8]) -> Option<(&[u8; N], &[u8])> {
(bytes.len() >= N).then(|| {
let (head, tail) = bytes.split_at(N);
(head.try_into().unwrap(), tail)
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_split_cstr() {
assert!(split_cstr(b"").is_none());
assert!(split_cstr(b"foo").is_none());
let (cstr, rest) = split_cstr(b"\0").expect("uh-oh");
assert_eq!(cstr.to_bytes(), b"");
assert_eq!(rest, b"");
let (cstr, rest) = split_cstr(b"foo\0bar").expect("uh-oh");
assert_eq!(cstr.to_bytes(), b"foo");
assert_eq!(rest, b"bar");
}
#[test]
fn test_split_at_const() {
assert!(split_at_const::<0>(b"").is_some());
assert!(split_at_const::<1>(b"").is_none());
assert!(matches!(split_at_const::<1>(b"ok"), Some((b"o", b"k"))));
}
}

View File

@@ -248,18 +248,6 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<SafeKeeperState>
oldstate.timeline_start_lsn = Lsn(1);
oldstate.local_start_lsn = Lsn(1);
return Ok(oldstate);
} else if version == 6 {
info!("reading safekeeper control file version {}", version);
let mut oldstate = SafeKeeperState::des(&buf[..buf.len()])?;
if oldstate.server.pg_version != 0 {
return Ok(oldstate);
}
// set pg_version to the default v14
info!("setting pg_version to 140005");
oldstate.server.pg_version = 140005;
return Ok(oldstate);
}
bail!("unsupported safekeeper control file version {}", version)

View File

@@ -2,7 +2,7 @@
use std::time::{Instant, SystemTime};
use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS};
use ::metrics::{register_histogram, GaugeVec, Histogram, DISK_WRITE_SECONDS_BUCKETS};
use anyhow::Result;
use metrics::{
core::{AtomicU64, Collector, Desc, GenericGaugeVec, Opts},
@@ -135,7 +135,6 @@ pub struct TimelineCollector {
written_wal_seconds: GaugeVec,
flushed_wal_seconds: GaugeVec,
collect_timeline_metrics: Gauge,
timelines_count: IntGauge,
}
impl Default for TimelineCollector {
@@ -312,13 +311,6 @@ impl TimelineCollector {
.unwrap();
descs.extend(collect_timeline_metrics.desc().into_iter().cloned());
let timelines_count = IntGauge::new(
"safekeeper_timelines",
"Total number of timelines loaded in-memory",
)
.unwrap();
descs.extend(timelines_count.desc().into_iter().cloned());
TimelineCollector {
descs,
commit_lsn,
@@ -338,7 +330,6 @@ impl TimelineCollector {
written_wal_seconds,
flushed_wal_seconds,
collect_timeline_metrics,
timelines_count,
}
}
}
@@ -370,7 +361,6 @@ impl Collector for TimelineCollector {
self.flushed_wal_seconds.reset();
let timelines = GlobalTimelines::get_all();
let timelines_count = timelines.len();
for arc_tli in timelines {
let tli = arc_tli.info_for_metrics();
@@ -484,10 +474,6 @@ impl Collector for TimelineCollector {
self.collect_timeline_metrics.set(elapsed);
mfs.extend(self.collect_timeline_metrics.collect());
// report total number of timelines
self.timelines_count.set(timelines_count as i64);
mfs.extend(self.timelines_count.collect());
mfs
}
}

View File

@@ -25,7 +25,7 @@ use utils::{
};
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 7;
pub const SK_FORMAT_VERSION: u32 = 6;
const SK_PROTOCOL_VERSION: u32 = 2;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
@@ -639,6 +639,7 @@ where
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
if msg.pg_version != UNKNOWN_SERVER_VERSION {
state.server.pg_version = msg.pg_version;
}

View File

@@ -314,8 +314,6 @@ impl Timeline {
ttid: TenantTimelineId,
wal_backup_launcher_tx: Sender<TenantTimelineId>,
) -> Result<Timeline> {
let _enter = info_span!("load_timeline", timeline = %ttid.timeline_id).entered();
let shared_state = SharedState::restore(&conf, &ttid)?;
let (commit_lsn_watch_tx, commit_lsn_watch_rx) =
watch::channel(shared_state.sk.state.commit_lsn);

View File

@@ -111,10 +111,6 @@ impl PhysicalStorage {
// Find out where stored WAL ends, starting at commit_lsn which is a
// known recent record boundary (unless we don't have WAL at all).
//
// NB: find_end_of_wal MUST be backwards compatible with the previously
// written WAL. If find_end_of_wal fails to read any WAL written by an
// older version of the code, we could lose data forever.
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
@@ -129,7 +125,7 @@ impl PhysicalStorage {
wal_seg_size,
state.commit_lsn,
)?,
_ => bail!("unsupported postgres version: {}", state.server.pg_version),
_ => bail!("unsupported postgres version"),
}
};
@@ -137,7 +133,7 @@ impl PhysicalStorage {
// If not, maybe it's better to call fsync() here to be sure?
let flush_lsn = write_lsn;
debug!(
info!(
"initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
);

View File

@@ -710,8 +710,8 @@ if __name__ == "__main__":
"--psql-path",
dest="psql_path",
required=False,
default="/usr/local/v14/bin/psql",
help="Path to the psql binary. Default: /usr/local/v14/bin/psql",
default="/usr/local/bin/psql",
help="Path to the psql binary. Default: /usr/local/bin/psql",
)
parser.add_argument(
"--only-import",

View File

@@ -1,5 +1,6 @@
import logging
import logging.config
import re
"""
This file configures logging to use in python tests.
@@ -29,6 +30,17 @@ LOGGING = {
}
class PasswordFilter(logging.Filter):
"""Filter out password from logs."""
# Good enough to filter our passwords produced by PgProtocol.connstr
FILTER = re.compile(r"(\s*)password=[^\s]+(\s*)")
def filter(self, record: logging.LogRecord) -> bool:
record.msg = self.FILTER.sub(r"\1password=<hidden>\2", str(record.msg))
return True
def getLogger(name="root") -> logging.Logger:
"""Method to get logger for tests.
@@ -38,5 +50,6 @@ def getLogger(name="root") -> logging.Logger:
# default logger for tests
log = getLogger()
log.addFilter(PasswordFilter())
logging.config.dictConfig(LOGGING)

View File

@@ -283,15 +283,10 @@ class PgProtocol:
return str(make_dsn(**self.conn_options(**kwargs)))
def conn_options(self, **kwargs):
"""
Construct a dictionary of connection options from default values and extra parameters.
An option can be dropped from the returning dictionary by None-valued extra parameter.
"""
result = self.default_options.copy()
if "dsn" in kwargs:
result.update(parse_dsn(kwargs["dsn"]))
result.update(kwargs)
result = {k: v for k, v in result.items() if v is not None}
# Individual statement timeout in seconds. 2 minutes should be
# enough for our tests, but if you need a longer, you can

View File

@@ -4,7 +4,7 @@ import os
import timeit
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from typing import List
import pytest
from fixtures.benchmark_fixture import MetricReport, PgBenchInitResult, PgBenchRunResult
@@ -24,18 +24,14 @@ def utc_now_timestamp() -> int:
return calendar.timegm(datetime.utcnow().utctimetuple())
def init_pgbench(env: PgCompare, cmdline, password: None):
environ: Dict[str, str] = {}
if password is not None:
environ["PGPASSWORD"] = password
def init_pgbench(env: PgCompare, cmdline):
# calculate timestamps and durations separately
# timestamp is intended to be used for linking to grafana and logs
# duration is actually a metric and uses float instead of int for timestamp
start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
with env.record_pageserver_writes("init.pageserver_writes"):
out = env.pg_bin.run_capture(cmdline, env=environ)
out = env.pg_bin.run_capture(cmdline)
env.flush()
duration = timeit.default_timer() - t0
@@ -52,15 +48,13 @@ def init_pgbench(env: PgCompare, cmdline, password: None):
env.zenbenchmark.record_pg_bench_init_result("init", res)
def run_pgbench(env: PgCompare, prefix: str, cmdline, password: None):
environ: Dict[str, str] = {}
if password is not None:
environ["PGPASSWORD"] = password
def run_pgbench(env: PgCompare, prefix: str, cmdline):
with env.record_pageserver_writes(f"{prefix}.pageserver_writes"):
run_start_timestamp = utc_now_timestamp()
t0 = timeit.default_timer()
out = env.pg_bin.run_capture(cmdline, env=environ)
out = env.pg_bin.run_capture(
cmdline,
)
run_duration = timeit.default_timer() - t0
run_end_timestamp = utc_now_timestamp()
env.flush()
@@ -88,14 +82,10 @@ def run_pgbench(env: PgCompare, prefix: str, cmdline, password: None):
def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: PgBenchLoadType):
env.zenbenchmark.record("scale", scale, "", MetricReport.TEST_PARAM)
password = env.pg.default_options.get("password", None)
options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
# drop password from the connection string by passing password=None and set password separately
connstr = env.pg.connstr(password=None, options=options)
if workload_type == PgBenchLoadType.INIT:
# Run initialize
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", connstr], password=password)
options = "-cstatement_timeout=1h " + env.pg.default_options.get("options", "")
init_pgbench(env, ["pgbench", f"-s{scale}", "-i", env.pg.connstr(options=options)])
if workload_type == PgBenchLoadType.SIMPLE_UPDATE:
# Run simple-update workload
@@ -109,9 +99,8 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
f"-T{duration}",
"-P2",
"--progress-timestamp",
connstr,
env.pg.connstr(),
],
password=password,
)
if workload_type == PgBenchLoadType.SELECT_ONLY:
@@ -126,9 +115,8 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
f"-T{duration}",
"-P2",
"--progress-timestamp",
connstr,
env.pg.connstr(),
],
password=password,
)
env.report_size()