Mirror of https://github.com/neondatabase/neon.git
Compare commits: ars/tmp...bojan/prox (20 commits)

Commit SHA1s:
1c40c26313, a6ace609a7, 29d72e8955, 66eb2a1dd3, 9424bfae22,
1d90b1b205, 949f8b4633, 26a68612d9, 850dfd02df, c8a1192b53,
137d616e76, 917c640818, c1b3836df1, 5120ba4b5f, e4670a5f1e,
7fae894648, 058123f7ef, 87edbd38c7, 58ee5d005f, 468366a28f
.circleci/ansible/ansible.cfg (new file, 10 lines)
@@ -0,0 +1,10 @@
[defaults]

localhost_warning = False
host_key_checking = False
timeout = 30

[ssh_connection]
ssh_args = -F ./ansible.ssh.cfg
scp_if_ssh = True
pipelining = True

.circleci/ansible/ansible.ssh.cfg (new file, 11 lines)
@@ -0,0 +1,11 @@
Host tele.zenith.tech
    User admin
    Port 3023
    StrictHostKeyChecking no
    UserKnownHostsFile /dev/null

Host * !tele.zenith.tech
    User admin
    StrictHostKeyChecking no
    UserKnownHostsFile /dev/null
    ProxyJump tele.zenith.tech
.circleci/ansible/deploy.yaml (new file, 174 lines)
@@ -0,0 +1,174 @@
- name: Upload Zenith binaries
  hosts: pageservers:safekeepers
  gather_facts: False
  remote_user: admin
  vars:
    force_deploy: false

  tasks:

  - name: get latest version of Zenith binaries
    ignore_errors: true
    register: current_version_file
    set_fact:
      current_version: "{{ lookup('file', '.zenith_current_version') | trim }}"
    tags:
    - pageserver
    - safekeeper

  - name: set zero value for current_version
    when: current_version_file is failed
    set_fact:
      current_version: "0"
    tags:
    - pageserver
    - safekeeper

  - name: get deployed version from content of remote file
    ignore_errors: true
    ansible.builtin.slurp:
      src: /usr/local/.zenith_current_version
    register: remote_version_file
    tags:
    - pageserver
    - safekeeper

  - name: decode remote file content
    when: remote_version_file is succeeded
    set_fact:
      remote_version: "{{ remote_version_file['content'] | b64decode | trim }}"
    tags:
    - pageserver
    - safekeeper

  - name: set zero value for remote_version
    when: remote_version_file is failed
    set_fact:
      remote_version: "0"
    tags:
    - pageserver
    - safekeeper

  - name: inform about versions
    debug: msg="Version to deploy - {{ current_version }}, version on storage node - {{ remote_version }}"
    tags:
    - pageserver
    - safekeeper

  - name: upload and extract Zenith binaries to /usr/local
    when: current_version > remote_version or force_deploy
    ansible.builtin.unarchive:
      owner: root
      group: root
      src: zenith_install.tar.gz
      dest: /usr/local
    become: true
    tags:
    - pageserver
    - safekeeper
    - binaries
    - putbinaries

- name: Deploy pageserver
  hosts: pageservers
  gather_facts: False
  remote_user: admin
  vars:
    force_deploy: false

  tasks:
  - name: init pageserver
    when: current_version > remote_version or force_deploy
    shell:
      cmd: sudo -u pageserver /usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" --init -D /storage/pageserver/data
    args:
      creates: "/storage/pageserver/data/tenants"
    environment:
      ZENITH_REPO_DIR: "/storage/pageserver/data"
      LD_LIBRARY_PATH: "/usr/local/lib"
    become: true
    tags:
    - pageserver

  - name: upload systemd service definition
    when: current_version > remote_version or force_deploy
    ansible.builtin.template:
      src: systemd/pageserver.service
      dest: /etc/systemd/system/pageserver.service
      owner: root
      group: root
      mode: '0644'
    become: true
    tags:
    - pageserver

  - name: start systemd service
    when: current_version > remote_version or force_deploy
    ansible.builtin.systemd:
      daemon_reload: yes
      name: pageserver
      enabled: yes
      state: restarted
    become: true
    tags:
    - pageserver

  - name: post version to console
    when: (current_version > remote_version or force_deploy) and console_mgmt_base_url is defined
    shell:
      cmd: |
        INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
        curl -sfS -d '{"version": {{ current_version }} }' -X POST {{ console_mgmt_base_url }}/api/v1/pageservers/$INSTANCE_ID
    tags:
    - pageserver

- name: Deploy safekeeper
  hosts: safekeepers
  gather_facts: False
  remote_user: admin
  vars:
    force_deploy: false

  tasks:

  # in the future safekeepers should discover pageservers by themselves,
  # but currently we use the first pageserver that was discovered
  - name: set first pageserver var for safekeepers
    when: current_version > remote_version or force_deploy
    set_fact:
      first_pageserver: "{{ hostvars[groups['pageservers'][0]]['inventory_hostname'] }}"
    tags:
    - safekeeper

  - name: upload systemd service definition
    when: current_version > remote_version or force_deploy
    ansible.builtin.template:
      src: systemd/safekeeper.service
      dest: /etc/systemd/system/safekeeper.service
      owner: root
      group: root
      mode: '0644'
    become: true
    tags:
    - safekeeper

  - name: start systemd service
    when: current_version > remote_version or force_deploy
    ansible.builtin.systemd:
      daemon_reload: yes
      name: safekeeper
      enabled: yes
      state: restarted
    become: true
    tags:
    - safekeeper

  - name: post version to console
    when: (current_version > remote_version or force_deploy) and console_mgmt_base_url is defined
    shell:
      cmd: |
        INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
        curl -sfS -d '{"version": {{ current_version }} }' -X POST {{ console_mgmt_base_url }}/api/v1/safekeepers/$INSTANCE_ID
    tags:
    - safekeeper
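For reference, a minimal sketch of driving this playbook by hand from `.circleci/ansible` (assuming the Teleport SSH key is already loaded in the agent, as the CI jobs do). `force_deploy` is the playbook variable defined above; it skips the version comparison. The binaries tarball comes from `get_binaries.sh`, shown next.

```sh
cd .circleci/ansible

# Fetch the binaries tarball and version file first (see get_binaries.sh below).
./get_binaries.sh

# Normal run: only deploys when the local version is newer than the remote one.
ansible-playbook deploy.yaml -i staging.hosts

# Force a re-deploy regardless of the version check.
ansible-playbook deploy.yaml -i staging.hosts -e force_deploy=true

# Production run, additionally reporting the version to the console API.
ansible-playbook deploy.yaml -i production.hosts -e console_mgmt_base_url=http://console-release.local
```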
.circleci/ansible/get_binaries.sh (new executable file, 52 lines)
@@ -0,0 +1,52 @@
#!/bin/bash

set -e

RELEASE=${RELEASE:-false}

# look at docker hub for the latest tag of the zenith docker image
if [ "${RELEASE}" = "true" ]; then
    echo "search latest release tag"
    VERSION=$(curl -s https://registry.hub.docker.com/v1/repositories/zenithdb/zenith/tags |jq -r -S '.[].name' | grep release | sed 's/release-//g' | tail -1)
    if [ -z "${VERSION}" ]; then
        echo "no docker tags found, exiting..."
        exit 1
    else
        TAG="release-${VERSION}"
    fi
else
    echo "search latest dev tag"
    VERSION=$(curl -s https://registry.hub.docker.com/v1/repositories/zenithdb/zenith/tags |jq -r -S '.[].name' | grep -v release | tail -1)
    if [ -z "${VERSION}" ]; then
        echo "no docker tags found, exiting..."
        exit 1
    else
        TAG="${VERSION}"
    fi
fi

echo "found ${VERSION}"

# do initial cleanup
rm -rf zenith_install postgres_install.tar.gz zenith_install.tar.gz .zenith_current_version
mkdir zenith_install

# retrieve binaries from docker image
echo "getting binaries from docker image"
docker pull --quiet zenithdb/zenith:${TAG}
ID=$(docker create zenithdb/zenith:${TAG})
docker cp ${ID}:/data/postgres_install.tar.gz .
tar -xzf postgres_install.tar.gz -C zenith_install
docker cp ${ID}:/usr/local/bin/pageserver zenith_install/bin/
docker cp ${ID}:/usr/local/bin/safekeeper zenith_install/bin/
docker cp ${ID}:/usr/local/bin/proxy zenith_install/bin/
docker cp ${ID}:/usr/local/bin/postgres zenith_install/bin/
docker rm -vf ${ID}

# store version to file (for ansible playbooks) and create binaries tarball
echo ${VERSION} > zenith_install/.zenith_current_version
echo ${VERSION} > .zenith_current_version
tar -czf zenith_install.tar.gz -C zenith_install .

# do final cleanup
rm -rf zenith_install postgres_install.tar.gz
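A short usage sketch: the script only needs docker, curl and jq on the PATH, and its single input is the RELEASE environment variable.

```sh
# Pick up the newest dev tag from Docker Hub and unpack its binaries.
./get_binaries.sh

# Or pick up the newest release-* tag instead, as the deploy-release CI job does.
RELEASE=true ./get_binaries.sh

# Both variants leave behind:
#   zenith_install.tar.gz    - binaries + postgres install, extracted to /usr/local by deploy.yaml
#   .zenith_current_version  - version number read back by the playbook's version check
```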
.circleci/ansible/production.hosts (new file, 7 lines)
@@ -0,0 +1,7 @@
[pageservers]
zenith-1-ps-1

[safekeepers]
zenith-1-sk-1
zenith-1-sk-2
zenith-1-sk-3

.circleci/ansible/staging.hosts (new file, 7 lines)
@@ -0,0 +1,7 @@
[pageservers]
zenith-us-stage-ps-1

[safekeepers]
zenith-us-stage-sk-1
zenith-us-stage-sk-2
zenith-us-stage-sk-3
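With these inventories and the ansible.cfg/ansible.ssh.cfg pair in the same directory, connectivity through the tele.zenith.tech jump host can be checked before a deploy. A hedged sketch (run from `.circleci/ansible`, with a key for the `admin` user loaded in the SSH agent):

```sh
cd .circleci/ansible

# Ad-hoc ping of each host group through the ProxyJump config.
ansible -i staging.hosts pageservers -u admin -m ping
ansible -i staging.hosts safekeepers -u admin -m ping
```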
.circleci/ansible/systemd/pageserver.service (new file, 18 lines)
@@ -0,0 +1,18 @@
[Unit]
Description=Zenith pageserver
After=network.target auditd.service

[Service]
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
Restart=on-failure
TimeoutSec=10
LimitNOFILE=30000000

[Install]
WantedBy=multi-user.target

.circleci/ansible/systemd/safekeeper.service (new file, 18 lines)
@@ -0,0 +1,18 @@
[Unit]
Description=Zenith safekeeper
After=network.target auditd.service

[Service]
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
Restart=on-failure
TimeoutSec=10
LimitNOFILE=30000000

[Install]
WantedBy=multi-user.target
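Once deploy.yaml has installed these units, the usual systemd tooling applies on the storage nodes; for example:

```sh
# On a pageserver or safekeeper host:
sudo systemctl status pageserver
sudo journalctl -u safekeeper --since "1 hour ago"

# The deployed version written by the playbook:
cat /usr/local/.zenith_current_version
```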
@@ -471,46 +471,78 @@ jobs:
            docker build -t zenithdb/compute-node:latest vendor/postgres && docker push zenithdb/compute-node:latest
            docker tag zenithdb/compute-node:latest zenithdb/compute-node:${DOCKER_TAG} && docker push zenithdb/compute-node:${DOCKER_TAG}

  # Build production zenithdb/zenith:release image and push it to Docker hub
  docker-image-release:
    docker:
      - image: cimg/base:2021.04
    steps:
      - checkout
      - setup_remote_docker:
          docker_layer_caching: true
      - run:
          name: Init postgres submodule
          command: git submodule update --init --depth 1
      - run:
          name: Build and push Docker image
          command: |
            echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
            DOCKER_TAG="release-$(git log --oneline|wc -l)"
            docker build --build-arg GIT_VERSION=$CIRCLE_SHA1 -t zenithdb/zenith:release . && docker push zenithdb/zenith:release
            docker tag zenithdb/zenith:release zenithdb/zenith:${DOCKER_TAG} && docker push zenithdb/zenith:${DOCKER_TAG}

  # Build production zenithdb/compute-node:release image and push it to Docker hub
  docker-image-compute-release:
    docker:
      - image: cimg/base:2021.04
    steps:
      - checkout
      - setup_remote_docker:
          docker_layer_caching: true
      # Build zenithdb/compute-tools:release image and push it to Docker hub
      # TODO: this should probably also use versioned tag, not just :latest.
      # XXX: but should it? We build and use it only locally now.
      - run:
          name: Build and push compute-tools Docker image
          command: |
            echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
            docker build -t zenithdb/compute-tools:release -f Dockerfile.compute-tools .
            docker push zenithdb/compute-tools:release
      - run:
          name: Init postgres submodule
          command: git submodule update --init --depth 1
      - run:
          name: Build and push compute-node Docker image
          command: |
            echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
            DOCKER_TAG="release-$(git log --oneline|wc -l)"
            docker build -t zenithdb/compute-node:release vendor/postgres && docker push zenithdb/compute-node:release
            docker tag zenithdb/compute-node:release zenithdb/compute-node:${DOCKER_TAG} && docker push zenithdb/compute-node:${DOCKER_TAG}

  deploy-staging:
    docker:
      - image: cimg/python:3.10
    steps:
      - checkout
      - setup_remote_docker
      - run:
          name: Get Zenith binaries
          command: |
            rm -rf zenith_install postgres_install.tar.gz zenith_install.tar.gz
            mkdir zenith_install
            DOCKER_TAG=$(git log --oneline|wc -l)
            docker pull --quiet zenithdb/zenith:${DOCKER_TAG}
            ID=$(docker create zenithdb/zenith:${DOCKER_TAG})
            docker cp $ID:/data/postgres_install.tar.gz .
            tar -xzf postgres_install.tar.gz -C zenith_install && rm postgres_install.tar.gz
            docker cp $ID:/usr/local/bin/pageserver zenith_install/bin/
            docker cp $ID:/usr/local/bin/safekeeper zenith_install/bin/
            docker cp $ID:/usr/local/bin/proxy zenith_install/bin/
            docker cp $ID:/usr/local/bin/postgres zenith_install/bin/
            docker rm -v $ID
            echo ${DOCKER_TAG} | tee zenith_install/.zenith_current_version
            tar -czf zenith_install.tar.gz -C zenith_install .
            ls -la zenith_install.tar.gz
      - run:
          name: Setup ansible
          command: |
            pip install --progress-bar off --user ansible boto3
            ansible-galaxy collection install amazon.aws
      - run:
          name: Apply re-deploy playbook
          environment:
            ANSIBLE_HOST_KEY_CHECKING: false
          name: Redeploy
          command: |
            echo "${STAGING_SSH_KEY}" | base64 --decode | ssh-add -
            export AWS_REGION=${STAGING_AWS_REGION}
            export AWS_ACCESS_KEY_ID=${STAGING_AWS_ACCESS_KEY_ID}
            export AWS_SECRET_ACCESS_KEY=${STAGING_AWS_SECRET_ACCESS_KEY}
            ansible-playbook .circleci/storage-redeploy.playbook.yml
            rm -f zenith_install.tar.gz
            cd "$(pwd)/.circleci/ansible"

            ./get_binaries.sh

            echo "${TELEPORT_SSH_KEY}" | tr -d '\n'| base64 --decode >ssh-key
            echo "${TELEPORT_SSH_CERT}" | tr -d '\n'| base64 --decode >ssh-key-cert.pub
            chmod 0600 ssh-key
            ssh-add ssh-key
            rm -f ssh-key ssh-key-cert.pub

            ansible-playbook deploy.yaml -i staging.hosts
            rm -f zenith_install.tar.gz .zenith_current_version

  deploy-staging-proxy:
    docker:

@@ -533,7 +565,57 @@
          name: Re-deploy proxy
          command: |
            DOCKER_TAG=$(git log --oneline|wc -l)
            helm upgrade zenith-proxy zenithdb/zenith-proxy --install -f .circleci/proxy.staging.yaml --set image.tag=${DOCKER_TAG} --wait
            helm upgrade zenith-proxy zenithdb/zenith-proxy --install -f .circleci/helm-values/staging.proxy.yaml --set image.tag=${DOCKER_TAG} --wait

  deploy-release:
    docker:
      - image: cimg/python:3.10
    steps:
      - checkout
      - setup_remote_docker
      - run:
          name: Setup ansible
          command: |
            pip install --progress-bar off --user ansible boto3
      - run:
          name: Redeploy
          command: |
            cd "$(pwd)/.circleci/ansible"

            RELEASE=true ./get_binaries.sh

            echo "${TELEPORT_SSH_KEY}" | tr -d '\n'| base64 --decode >ssh-key
            echo "${TELEPORT_SSH_CERT}" | tr -d '\n'| base64 --decode >ssh-key-cert.pub
            chmod 0600 ssh-key
            ssh-add ssh-key
            rm -f ssh-key ssh-key-cert.pub

            ansible-playbook deploy.yaml -i production.hosts -e console_mgmt_base_url=http://console-release.local
            rm -f zenith_install.tar.gz .zenith_current_version

  deploy-release-proxy:
    docker:
      - image: cimg/base:2021.04
    environment:
      KUBECONFIG: .kubeconfig
    steps:
      - checkout
      - run:
          name: Store kubeconfig file
          command: |
            echo "${PRODUCTION_KUBECONFIG_DATA}" | base64 --decode > ${KUBECONFIG}
            chmod 0600 ${KUBECONFIG}
      - run:
          name: Setup helm v3
          command: |
            curl -s https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
            helm repo add zenithdb https://zenithdb.github.io/helm-charts
      - run:
          name: Re-deploy proxy
          command: |
            DOCKER_TAG="release-$(git log --oneline|wc -l)"
            helm upgrade zenith-proxy zenithdb/zenith-proxy --install -f .circleci/helm-values/production.proxy.yaml --set image.tag=${DOCKER_TAG} --wait

  # Trigger a new remote CI job
  remote-ci-trigger:

@@ -669,6 +751,47 @@ workflows:
            - main
          requires:
            - docker-image

      - docker-image-release:
          # Context gives an ability to login
          context: Docker Hub
          # Build image only for commits to main
          filters:
            branches:
              only:
                - release
          requires:
            - pg_regress-tests-release
            - other-tests-release
      - docker-image-compute-release:
          # Context gives an ability to login
          context: Docker Hub
          # Build image only for commits to main
          filters:
            branches:
              only:
                - release
          requires:
            - pg_regress-tests-release
            - other-tests-release
      - deploy-release:
          # Context gives an ability to login
          context: Docker Hub
          # deploy only for commits to main
          filters:
            branches:
              only:
                - release
          requires:
            - docker-image-release
      - deploy-release-proxy:
          # deploy only for commits to main
          filters:
            branches:
              only:
                - release
          requires:
            - docker-image-release
      - remote-ci-trigger:
          # Context passes credentials for gh api
          context: CI_ACCESS_TOKEN
.circleci/helm-values/production.proxy.yaml (new file, 35 lines)
@@ -0,0 +1,35 @@
# Helm chart values for zenith-proxy.
# This is a YAML-formatted file.

settings:
  authEndpoint: "https://console.zenith.tech/authenticate_proxy_request/"
  uri: "https://console.zenith.tech/psql_session/"

# -- Additional labels for zenith-proxy pods
podLabels:
  zenith_service: proxy
  zenith_env: production
  zenith_region: us-west-2
  zenith_region_slug: oregon

service:
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: external
    service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
    service.beta.kubernetes.io/aws-load-balancer-scheme: internal
    external-dns.alpha.kubernetes.io/hostname: proxy-release.local
  type: LoadBalancer

exposedService:
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: external
    service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
    service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
    external-dns.alpha.kubernetes.io/hostname: start.zenith.tech

metrics:
  enabled: true
  serviceMonitor:
    enabled: true
    selector:
      release: kube-prometheus-stack
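These values are consumed by the `deploy-release-proxy` job shown above. A hedged sketch of inspecting what they expand to before letting CI apply them (assuming the zenithdb chart repo is already added, as that job does; the tag shown is hypothetical — CI derives it from `git log --oneline | wc -l`):

```sh
# Render the chart locally with the production values, without installing anything.
helm template zenith-proxy zenithdb/zenith-proxy \
    -f .circleci/helm-values/production.proxy.yaml \
    --set image.tag=release-1234

# The actual deployment command used by the CI job (DOCKER_TAG is set by the job):
helm upgrade zenith-proxy zenithdb/zenith-proxy --install \
    -f .circleci/helm-values/production.proxy.yaml \
    --set image.tag=${DOCKER_TAG} --wait
```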
@@ -1,138 +0,0 @@ (deleted file)
- name: discover storage nodes
  hosts: localhost
  connection: local
  gather_facts: False

  tasks:

  - name: discover safekeepers
    no_log: true
    ec2_instance_info:
      filters:
        "tag:zenith_env": "staging"
        "tag:zenith_service": "safekeeper"
    register: ec2_safekeepers

  - name: discover pageservers
    no_log: true
    ec2_instance_info:
      filters:
        "tag:zenith_env": "staging"
        "tag:zenith_service": "pageserver"
    register: ec2_pageservers

  - name: add safekeepers to host group
    no_log: true
    add_host:
      name: safekeeper-{{ ansible_loop.index }}
      ansible_host: "{{ item.public_ip_address }}"
      groups:
      - storage
      - safekeepers
    with_items: "{{ ec2_safekeepers.instances }}"
    loop_control:
      extended: yes

  - name: add pageservers to host group
    no_log: true
    add_host:
      name: pageserver-{{ ansible_loop.index }}
      ansible_host: "{{ item.public_ip_address }}"
      groups:
      - storage
      - pageservers
    with_items: "{{ ec2_pageservers.instances }}"
    loop_control:
      extended: yes

- name: Retrive versions
  hosts: storage
  gather_facts: False
  remote_user: admin

  tasks:

  - name: Get current version of binaries
    set_fact:
      current_version: "{{lookup('file', '../zenith_install/.zenith_current_version') }}"

  - name: Check that file with version exists on host
    stat:
      path: /usr/local/.zenith_current_version
    register: version_file

  - name: Try to get current version from the host
    when: version_file.stat.exists
    ansible.builtin.fetch:
      src: /usr/local/.zenith_current_version
      dest: .remote_version.{{ inventory_hostname }}
      fail_on_missing: no
      flat: yes

  - name: Store remote version to variable
    when: version_file.stat.exists
    set_fact:
      remote_version: "{{ lookup('file', '.remote_version.{{ inventory_hostname }}') }}"

  - name: Store default value of remote version to variable in case when remote version file not found
    when: not version_file.stat.exists
    set_fact:
      remote_version: "000"

- name: Extract Zenith binaries
  hosts: storage
  gather_facts: False
  remote_user: admin

  tasks:

  - name: Inform about version conflict
    when: current_version <= remote_version
    debug: msg="Current version {{ current_version }} LE than remote {{ remote_version }}"

  - name: Extract Zenith binaries to /usr/local
    when: current_version > remote_version
    ansible.builtin.unarchive:
      src: ../zenith_install.tar.gz
      dest: /usr/local
    become: true

- name: Restart safekeepers
  hosts: safekeepers
  gather_facts: False
  remote_user: admin

  tasks:

  - name: Inform about version conflict
    when: current_version <= remote_version
    debug: msg="Current version {{ current_version }} LE than remote {{ remote_version }}"

  - name: Restart systemd service
    when: current_version > remote_version
    ansible.builtin.systemd:
      daemon_reload: yes
      name: safekeeper
      enabled: yes
      state: restarted
    become: true

- name: Restart pageservers
  hosts: pageservers
  gather_facts: False
  remote_user: admin

  tasks:

  - name: Inform about version conflict
    when: current_version <= remote_version
    debug: msg="Current version {{ current_version }} LE than remote {{ remote_version }}"

  - name: Restart systemd service
    when: current_version > remote_version
    ansible.builtin.systemd:
      daemon_reload: yes
      name: pageserver
      enabled: yes
      state: restarted
    become: true
@@ -6,7 +6,7 @@
# Build Postgres separately --- this layer will be rebuilt only if one of
# mentioned paths will get any changes.
#
FROM zenithdb/build:buster AS pg-build
FROM zimg/rust:1.56 AS pg-build
WORKDIR /zenith
COPY ./vendor/postgres vendor/postgres
COPY ./Makefile Makefile

@@ -20,7 +20,7 @@ RUN rm -rf postgres_install/build
# TODO: build cargo deps as separate layer. We used cargo-chef before but that was
# net time waste in a lot of cases. Copying Cargo.lock with empty lib.rs should do the work.
#
FROM zenithdb/build:buster AS build
FROM zimg/rust:1.56 AS build

ARG GIT_VERSION
RUN if [ -z "$GIT_VERSION" ]; then echo "GIT_VERSION is reqired, use build_arg to pass it"; exit 1; fi

@@ -34,7 +34,7 @@ RUN GIT_VERSION=$GIT_VERSION cargo build --release
#
# Copy binaries to resulting image.
#
FROM debian:buster-slim
FROM debian:bullseye-slim
WORKDIR /data

RUN apt-get update && apt-get -yq install libreadline-dev libseccomp-dev openssl ca-certificates && \

@@ -1,16 +0,0 @@ (deleted file)
#
# Image with all the required dependencies to build https://github.com/zenithdb/zenith
# and Postgres from https://github.com/zenithdb/postgres
# Also includes some rust development and build tools.
# NB: keep in sync with rust image version in .circle/config.yml
#
FROM rust:1.56.1-slim-buster
WORKDIR /zenith

# Install postgres and zenith build dependencies
# clang is for rocksdb
RUN apt-get update && apt-get -yq install automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
    libseccomp-dev pkg-config libssl-dev clang

# Install rust tools
RUN rustup component add clippy && cargo install cargo-audit
@@ -171,7 +171,7 @@ impl PgQuote for PgIdent {
    /// always quotes provided string with `""` and escapes every `"`. Not idempotent,
    /// i.e. if string is already escaped it will be escaped again.
    fn quote(&self) -> String {
        let result = format!("\"{}\"", self.replace("\"", "\"\""));
        let result = format!("\"{}\"", self.replace('"', "\"\""));
        result
    }
}

@@ -215,7 +215,7 @@ pub fn handle_databases(spec: &ClusterSpec, client: &mut Client) -> Result<()> {
        if let Some(r) = pg_db {
            // XXX: db owner name is returned as quoted string from Postgres,
            // when quoting is needed.
            let new_owner = if r.owner.starts_with('\"') {
            let new_owner = if r.owner.starts_with('"') {
                db.owner.quote()
            } else {
                db.owner.clone()

@@ -12,7 +12,7 @@ use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId};
use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId};

use crate::safekeeper::SafekeeperNode;

@@ -47,9 +47,8 @@ pub struct LocalEnv {

    // Default tenant ID to use with the 'zenith' command line utility, when
    // --tenantid is not explicitly specified.
    #[serde(with = "opt_display_serde")]
    #[serde(default)]
    pub default_tenantid: Option<ZTenantId>,
    pub default_tenantid: Option<HexZTenantId>,

    // used to issue tokens during e.g pg start
    #[serde(default)]

@@ -185,7 +184,7 @@ impl LocalEnv {

        // If no initial tenant ID was given, generate it.
        if env.default_tenantid.is_none() {
            env.default_tenantid = Some(ZTenantId::generate());
            env.default_tenantid = Some(HexZTenantId::from(ZTenantId::generate()));
        }

        env.base_data_dir = base_path();
@@ -7,32 +7,14 @@ Currently we build two main images:

- [zenithdb/zenith](https://hub.docker.com/repository/docker/zenithdb/zenith) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
- [zenithdb/compute-node](https://hub.docker.com/repository/docker/zenithdb/compute-node) — compute node image with pre-built Postgres binaries from [zenithdb/postgres](https://github.com/zenithdb/postgres).

And two intermediate images used either to reduce build time or to deliver some additional binary tools from other repos:
And additional intermediate images:

- [zenithdb/build](https://hub.docker.com/repository/docker/zenithdb/build) — image with all the dependencies required to build Zenith and compute node images. This image is based on `rust:slim-buster`, so it also has a proper `rust` environment. Built from [/Dockerfile.build](/Dockerfile.build).
- [zenithdb/compute-tools](https://hub.docker.com/repository/docker/zenithdb/compute-tools) — compute node configuration management tools.

## Building pipeline

1. Image `zenithdb/compute-tools` is re-built automatically.

2. Image `zenithdb/build` is built manually. If you want to introduce any new compile time dependencies to Zenith or compute node you have to update this image as well, build it and push to Docker Hub.
2. Image `zenithdb/compute-node` is built independently in the [zenithdb/postgres](https://github.com/zenithdb/postgres) repo.

Build:
```sh
docker build -t zenithdb/build:buster -f Dockerfile.build .
```

Login:
```sh
docker login
```

Push to Docker Hub:
```sh
docker push zenithdb/build:buster
```

3. Image `zenithdb/compute-node` is built independently in the [zenithdb/postgres](https://github.com/zenithdb/postgres) repo.

4. Image `zenithdb/zenith` is built in this repo after a successful `release` tests run and pushed to Docker Hub automatically.
3. Image `zenithdb/zenith` is built in this repo after a successful `release` tests run and pushed to Docker Hub automatically.
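As a concrete instance of step 1, the compute-tools image is rebuilt by the `docker-image-compute-release` CI job with a plain docker build; run locally it would look roughly like this (Docker Hub credentials assumed to be available):

```sh
docker build -t zenithdb/compute-tools:release -f Dockerfile.compute-tools .
docker push zenithdb/compute-tools:release
```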
@@ -16,10 +16,9 @@ use std::{
};
use tracing::*;

use zenith_utils::crashsafe_dir;
use zenith_utils::logging;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::{crashsafe_dir, logging};

use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;

@@ -36,6 +36,9 @@ pub mod defaults {
    pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
    pub const DEFAULT_GC_PERIOD: &str = "100 s";

    pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
    pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";

    pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
    pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 100;
    pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;

@@ -59,6 +62,9 @@ pub mod defaults {
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}

#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'

#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}

# initial superuser role name to use when creating a new tenant

@@ -89,6 +95,12 @@ pub struct PageServerConf {

    pub gc_horizon: u64,
    pub gc_period: Duration,

    // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
    pub wait_lsn_timeout: Duration,
    // How long to wait for WAL redo to complete.
    pub wal_redo_timeout: Duration,

    pub superuser: String,

    pub page_cache_size: usize,

@@ -137,6 +149,10 @@ struct PageServerConfigBuilder {

    gc_horizon: BuilderValue<u64>,
    gc_period: BuilderValue<Duration>,

    wait_lsn_timeout: BuilderValue<Duration>,
    wal_redo_timeout: BuilderValue<Duration>,

    superuser: BuilderValue<String>,

    page_cache_size: BuilderValue<usize>,

@@ -168,6 +184,10 @@ impl Default for PageServerConfigBuilder {
            gc_horizon: Set(DEFAULT_GC_HORIZON),
            gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
                .expect("cannot parse default gc period")),
            wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
                .expect("cannot parse default wait lsn timeout")),
            wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
                .expect("cannot parse default wal redo timeout")),
            superuser: Set(DEFAULT_SUPERUSER.to_string()),
            page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
            max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),

@@ -208,6 +228,14 @@ impl PageServerConfigBuilder {
        self.gc_period = BuilderValue::Set(gc_period)
    }

    pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
        self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
    }

    pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
        self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
    }

    pub fn superuser(&mut self, superuser: String) {
        self.superuser = BuilderValue::Set(superuser)
    }

@@ -265,6 +293,12 @@ impl PageServerConfigBuilder {
                .gc_horizon
                .ok_or(anyhow::anyhow!("missing gc_horizon"))?,
            gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
            wait_lsn_timeout: self
                .wait_lsn_timeout
                .ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
            wal_redo_timeout: self
                .wal_redo_timeout
                .ok_or(anyhow::anyhow!("missing wal_redo_timeout"))?,
            superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?,
            page_cache_size: self
                .page_cache_size

@@ -414,6 +448,8 @@ impl PageServerConf {
            "checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?),
            "gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
            "gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
            "wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
            "wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
            "initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
            "page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
            "max_file_descriptors" => {

@@ -548,6 +584,8 @@ impl PageServerConf {
            checkpoint_period: Duration::from_secs(10),
            gc_horizon: defaults::DEFAULT_GC_HORIZON,
            gc_period: Duration::from_secs(10),
            wait_lsn_timeout: Duration::from_secs(60),
            wal_redo_timeout: Duration::from_secs(60),
            page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
            max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
            listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),

@@ -618,6 +656,9 @@ checkpoint_period = '111 s'
gc_period = '222 s'
gc_horizon = 222

wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'

page_cache_size = 444
max_file_descriptors = 333

@@ -650,6 +691,8 @@ id = 10
                checkpoint_period: humantime::parse_duration(defaults::DEFAULT_CHECKPOINT_PERIOD)?,
                gc_horizon: defaults::DEFAULT_GC_HORIZON,
                gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
                wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
                wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
                superuser: defaults::DEFAULT_SUPERUSER.to_string(),
                page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
                max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,

@@ -692,6 +735,8 @@ id = 10
                checkpoint_period: Duration::from_secs(111),
                gc_horizon: 222,
                gc_period: Duration::from_secs(222),
                wait_lsn_timeout: Duration::from_secs(111),
                wal_redo_timeout: Duration::from_secs(111),
                superuser: "zzzz".to_string(),
                page_cache_size: 444,
                max_file_descriptors: 333,
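The two new settings follow the same pattern as the existing pageserver options: they can go into `pageserver.toml` or be passed as `-c` overrides, the same mechanism the deploy playbook and systemd unit above already use for `pg_distrib_dir` and the listen addresses. A hedged sketch, assuming `-c` overrides are parsed through the same TOML path as the config file (the values are examples, not recommendations):

```sh
# Give slow WAL receivers more time before a GetPage@LSN request times out,
# and keep the default 60 s budget for WAL redo.
pageserver -c "pg_distrib_dir='/usr/local'" \
           -c "wait_lsn_timeout='120 s'" \
           -c "wal_redo_timeout='60 s'" \
           -D /storage/pageserver/data
```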
@@ -19,7 +19,8 @@ use zenith_utils::http::{
};
use zenith_utils::http::{RequestExt, RouterBuilder};
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{opt_display_serde, ZTimelineId};
use zenith_utils::zid::HexZTimelineId;
use zenith_utils::zid::ZTimelineId;

use super::models::BranchCreateRequest;
use super::models::StatusResponse;

@@ -198,8 +199,7 @@ enum TimelineInfo {
        timeline_id: ZTimelineId,
        #[serde(with = "hex")]
        tenant_id: ZTenantId,
        #[serde(with = "opt_display_serde")]
        ancestor_timeline_id: Option<ZTimelineId>,
        ancestor_timeline_id: Option<HexZTimelineId>,
        last_record_lsn: Lsn,
        prev_record_lsn: Lsn,
        disk_consistent_lsn: Lsn,

@@ -232,7 +232,9 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
        Some(timeline) => TimelineInfo::Local {
            timeline_id,
            tenant_id,
            ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
            ancestor_timeline_id: timeline
                .get_ancestor_timeline_id()
                .map(HexZTimelineId::from),
            disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
            last_record_lsn: timeline.get_last_record_lsn(),
            prev_record_lsn: timeline.get_prev_record_lsn(),
@@ -29,7 +29,7 @@ use std::ops::{Bound::Included, Deref};
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant};
use std::time::Instant;

use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config::PageServerConf;

@@ -64,7 +64,6 @@ mod inmemory_layer;
mod interval_tree;
mod layer_map;
pub mod metadata;
mod page_versions;
mod par_fsync;
mod storage_layer;

@@ -83,9 +82,6 @@ pub use crate::layered_repository::ephemeral_file::writeback as writeback_epheme

static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);

// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);

// Metrics collected on operations on the storage repository.
lazy_static! {
    static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(

@@ -816,7 +812,7 @@ impl Timeline for LayeredTimeline {
        );

        self.last_record_lsn
            .wait_for_timeout(lsn, TIMEOUT)
            .wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
            .with_context(|| {
                format!(
                    "Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}",

@@ -897,12 +893,11 @@ impl Timeline for LayeredTimeline {

        let seg = SegmentTag { rel, segno: 0 };

        let result;
        if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
            result = layer.get_seg_exists(lsn)?;
        let result = if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
            layer.get_seg_exists(lsn)?
        } else {
            result = false;
        }
            false
        };

        trace!("get_rel_exists: {} at {} -> {}", rel, lsn, result);
        Ok(result)

@@ -1944,22 +1939,21 @@ impl LayeredTimeline {
        // for redo.
        let rel = seg.rel;
        let rel_blknum = seg.segno * RELISH_SEG_SIZE + seg_blknum;
        let (cached_lsn_opt, cached_page_opt) = match self.lookup_cached_page(&rel, rel_blknum, lsn)
        {
        let cached_page_img = match self.lookup_cached_page(&rel, rel_blknum, lsn) {
            Some((cached_lsn, cached_img)) => {
                match cached_lsn.cmp(&lsn) {
                    cmp::Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
                    cmp::Ordering::Equal => return Ok(cached_img), // exact LSN match, return the image
                    cmp::Ordering::Greater => panic!(), // the returned lsn should never be after the requested lsn
                }
                (Some(cached_lsn), Some((cached_lsn, cached_img)))
                Some((cached_lsn, cached_img))
            }
            None => (None, None),
            None => None,
        };

        let mut data = PageReconstructData {
            records: Vec::new(),
            page_img: None,
            page_img: cached_page_img,
        };

        // Holds an Arc reference to 'layer_ref' when iterating in the loop below.

@@ -1972,15 +1966,14 @@ impl LayeredTimeline {
        let mut curr_lsn = lsn;
        loop {
            let result = layer_ref
                .get_page_reconstruct_data(seg_blknum, curr_lsn, cached_lsn_opt, &mut data)
                .get_page_reconstruct_data(seg_blknum, curr_lsn, &mut data)
                .with_context(|| {
                    format!(
                        "Failed to get reconstruct data {} {:?} {} {} {:?}",
                        "Failed to get reconstruct data {} {:?} {} {}",
                        layer_ref.get_seg_tag(),
                        layer_ref.filename(),
                        seg_blknum,
                        curr_lsn,
                        cached_lsn_opt,
                    )
                })?;
            match result {

@@ -2027,16 +2020,6 @@ impl LayeredTimeline {
                        lsn,
                    );
                }
                PageReconstructResult::Cached => {
                    let (cached_lsn, cached_img) = cached_page_opt.unwrap();
                    assert!(data.page_img.is_none());
                    if let Some((first_rec_lsn, first_rec)) = data.records.first() {
                        assert!(&cached_lsn < first_rec_lsn);
                        assert!(!first_rec.will_init());
                    }
                    data.page_img = Some(cached_img);
                    break;
                }
            }
        }

@@ -2058,12 +2041,12 @@ impl LayeredTimeline {

        // If we have a page image, and no WAL, we're all set
        if data.records.is_empty() {
            if let Some(img) = &data.page_img {
            if let Some((img_lsn, img)) = &data.page_img {
                trace!(
                    "found page image for blk {} in {} at {}, no WAL redo required",
                    rel_blknum,
                    rel,
                    request_lsn
                    img_lsn
                );
                Ok(img.clone())
            } else {

@@ -2090,11 +2073,13 @@ impl LayeredTimeline {
                );
                Ok(ZERO_PAGE.clone())
            } else {
                if data.page_img.is_some() {
                let base_img = if let Some((_lsn, img)) = data.page_img {
                    trace!("found {} WAL records and a base image for blk {} in {} at {}, performing WAL redo", data.records.len(), rel_blknum, rel, request_lsn);
                    Some(img)
                } else {
                    trace!("found {} WAL records that will init the page for blk {} in {} at {}, performing WAL redo", data.records.len(), rel_blknum, rel, request_lsn);
                }
                    None
                };

                let last_rec_lsn = data.records.last().unwrap().0;

@@ -2102,7 +2087,7 @@ impl LayeredTimeline {
                    rel,
                    rel_blknum,
                    request_lsn,
                    data.page_img.clone(),
                    base_img,
                    data.records,
                )?;

@@ -2361,3 +2346,157 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {

    bail!("couldn't find an unused backup number for {:?}", path)
}

///
/// Tests that are specific to the layered storage format.
///
/// There are more unit tests in repository.rs that work through the
/// Repository interface and are expected to work regardless of the
/// file format and directory layout. The test here are more low level.
///
#[cfg(test)]
mod tests {
    use super::*;
    use crate::repository::repo_harness::*;

    #[test]
    fn corrupt_metadata() -> Result<()> {
        const TEST_NAME: &str = "corrupt_metadata";
        let harness = RepoHarness::create(TEST_NAME)?;
        let repo = harness.load();

        repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
        drop(repo);

        let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);

        assert!(metadata_path.is_file());

        let mut metadata_bytes = std::fs::read(&metadata_path)?;
        assert_eq!(metadata_bytes.len(), 512);
        metadata_bytes[512 - 4 - 2] ^= 1;
        std::fs::write(metadata_path, metadata_bytes)?;

        let new_repo = harness.load();
        let err = new_repo.get_timeline(TIMELINE_ID).err().unwrap();
        assert_eq!(err.to_string(), "failed to load metadata");
        assert_eq!(
            err.source().unwrap().to_string(),
            "metadata checksum mismatch"
        );

        Ok(())
    }

    ///
    /// Test the logic in 'load_layer_map' that removes layer files that are
    /// newer than 'disk_consistent_lsn'.
    ///
    #[test]
    fn future_layerfiles() -> Result<()> {
        const TEST_NAME: &str = "future_layerfiles";
        let harness = RepoHarness::create(TEST_NAME)?;
        let repo = harness.load();

        // Create a timeline with disk_consistent_lsn = 8000
        let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
        let writer = tline.writer();
        writer.advance_last_record_lsn(Lsn(0x8000));
        drop(writer);
        repo.checkpoint_iteration(CheckpointConfig::Forced)?;
        drop(repo);

        let timeline_path = harness.timeline_path(&TIMELINE_ID);

        let make_empty_file = |filename: &str| -> std::io::Result<()> {
            let path = timeline_path.join(filename);

            assert!(!path.exists());
            std::fs::write(&path, &[])?;

            Ok(())
        };

        // Helper function to check that a relation file exists, and a corresponding
        // <filename>.0.old file does not.
        let assert_exists = |filename: &str| {
            let path = timeline_path.join(filename);
            assert!(path.exists(), "file {} was removed", filename);

            // Check that there is no .old file
            let backup_path = timeline_path.join(format!("{}.0.old", filename));
            assert!(
                !backup_path.exists(),
                "unexpected backup file {}",
                backup_path.display()
            );
        };

        // Helper function to check that a relation file does *not* exists, and a corresponding
        // <filename>.<num>.old file does.
        let assert_is_renamed = |filename: &str, num: u32| {
            let path = timeline_path.join(filename);
            assert!(
                !path.exists(),
                "file {} was not removed as expected",
                filename
            );

            let backup_path = timeline_path.join(format!("{}.{}.old", filename, num));
            assert!(
                backup_path.exists(),
                "backup file {} was not created",
                backup_path.display()
            );
        };

        // These files are considered to be in the future and will be renamed out
        // of the way
        let future_filenames = vec![
            format!("pg_control_0_{:016X}", 0x8001),
            format!("pg_control_0_{:016X}_{:016X}", 0x8001, 0x8008),
        ];
        // But these are not:
        let past_filenames = vec![
            format!("pg_control_0_{:016X}", 0x8000),
            format!("pg_control_0_{:016X}_{:016X}", 0x7000, 0x8001),
        ];

        for filename in future_filenames.iter().chain(past_filenames.iter()) {
            make_empty_file(filename)?;
        }

        // Load the timeline. This will cause the files in the "future" to be renamed
        // away.
        let new_repo = harness.load();
        new_repo.get_timeline(TIMELINE_ID).unwrap();
        drop(new_repo);

        for filename in future_filenames.iter() {
            assert_is_renamed(filename, 0);
        }
        for filename in past_filenames.iter() {
            assert_exists(filename);
        }

        // Create the future files again, and load again. They should be renamed to
        // *.1.old this time.
        for filename in future_filenames.iter() {
            make_empty_file(filename)?;
        }

        let new_repo = harness.load();
        new_repo.get_timeline(TIMELINE_ID).unwrap();
        drop(new_repo);

        for filename in future_filenames.iter() {
            assert_is_renamed(filename, 0);
            assert_is_renamed(filename, 1);
        }
        for filename in past_filenames.iter() {
            assert_exists(filename);
        }

        Ok(())
    }
}
@@ -208,16 +208,15 @@ impl Layer for DeltaLayer {
        &self,
        blknum: SegmentBlk,
        lsn: Lsn,
        cached_img_lsn: Option<Lsn>,
        reconstruct_data: &mut PageReconstructData,
    ) -> Result<PageReconstructResult> {
        let mut need_image = true;

        assert!((0..RELISH_SEG_SIZE).contains(&blknum));

        match &cached_img_lsn {
            Some(cached_lsn) if &self.end_lsn <= cached_lsn => {
                return Ok(PageReconstructResult::Cached)
        match &reconstruct_data.page_img {
            Some((cached_lsn, _)) if &self.end_lsn <= cached_lsn => {
                return Ok(PageReconstructResult::Complete)
            }
            _ => {}
        }

@@ -240,9 +239,9 @@ impl Layer for DeltaLayer {
            .iter()
            .rev();
        for ((_blknum, pv_lsn), blob_range) in iter {
            match &cached_img_lsn {
                Some(cached_lsn) if pv_lsn <= cached_lsn => {
                    return Ok(PageReconstructResult::Cached)
            match &reconstruct_data.page_img {
                Some((cached_lsn, _)) if pv_lsn <= cached_lsn => {
                    return Ok(PageReconstructResult::Complete)
                }
                _ => {}
            }

@@ -252,7 +251,7 @@ impl Layer for DeltaLayer {
            match pv {
                PageVersion::Page(img) => {
                    // Found a page image, return it
                    reconstruct_data.page_img = Some(img);
                    reconstruct_data.page_img = Some((*pv_lsn, img));
                    need_image = false;
                    break;
                }
@@ -145,14 +145,15 @@ impl Layer for ImageLayer {
        &self,
        blknum: SegmentBlk,
        lsn: Lsn,
        cached_img_lsn: Option<Lsn>,
        reconstruct_data: &mut PageReconstructData,
    ) -> Result<PageReconstructResult> {
        assert!((0..RELISH_SEG_SIZE).contains(&blknum));
        assert!(lsn >= self.lsn);

        match cached_img_lsn {
            Some(cached_lsn) if self.lsn <= cached_lsn => return Ok(PageReconstructResult::Cached),
        match reconstruct_data.page_img {
            Some((cached_lsn, _)) if self.lsn <= cached_lsn => {
                return Ok(PageReconstructResult::Complete)
            }
            _ => {}
        }

@@ -195,7 +196,7 @@ impl Layer for ImageLayer {
            }
        };

        reconstruct_data.page_img = Some(Bytes::from(buf));
        reconstruct_data.page_img = Some((self.lsn, Bytes::from(buf)));
        Ok(PageReconstructResult::Complete)
    }
@@ -20,13 +20,15 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{ensure, Result};
use bytes::Bytes;
use log::*;
use std::collections::HashMap;
use std::io::Seek;
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::vec_map::VecMap;

use super::page_versions::PageVersions;

pub struct InMemoryLayer {
    conf: &'static PageServerConf,
    tenantid: ZTenantId,

@@ -71,11 +73,15 @@ pub struct InMemoryLayerInner {
    /// The drop LSN is recorded in [`end_lsn`].
    dropped: bool,

    ///
    /// All versions of all pages in the layer are are kept here.
    /// Indexed by block number and LSN.
    ///
    page_versions: PageVersions,
    /// The PageVersion structs are stored in a serialized format in this file.
    /// Each serialized PageVersion is preceded by a 'u32' length field.
    /// 'page_versions' map stores offsets into this file.
    file: EphemeralFile,

    /// Metadata about all versions of all pages in the layer is kept
    /// here. Indexed by block number and LSN. The value is an offset
    /// into the ephemeral file where the page version is stored.
    page_versions: HashMap<SegmentBlk, VecMap<Lsn, u64>>,

    ///
    /// `seg_sizes` tracks the size of the segment at different points in time.

@@ -111,6 +117,50 @@ impl InMemoryLayerInner {
            panic!("could not find seg size in in-memory layer");
        }
    }

    ///
    /// Read a page version from the ephemeral file.
    ///
    fn read_pv(&self, off: u64) -> Result<PageVersion> {
        let mut buf = Vec::new();
        self.read_pv_bytes(off, &mut buf)?;
        Ok(PageVersion::des(&buf)?)
    }

    ///
    /// Read a page version from the ephemeral file, as raw bytes, at
    /// the given offset. The bytes are read into 'buf', which is
    /// expanded if necessary. Returns the size of the page version.
    ///
    fn read_pv_bytes(&self, off: u64, buf: &mut Vec<u8>) -> Result<usize> {
        // read length
        let mut lenbuf = [0u8; 4];
        self.file.read_exact_at(&mut lenbuf, off)?;
        let len = u32::from_ne_bytes(lenbuf) as usize;

        if buf.len() < len {
            buf.resize(len, 0);
        }
        self.file.read_exact_at(&mut buf[0..len], off + 4)?;
        Ok(len)
    }

    fn write_pv(&mut self, pv: &PageVersion) -> Result<u64> {
        // remember starting position
        let pos = self.file.stream_position()?;

        // make room for the 'length' field by writing zeros as a placeholder.
        self.file.seek(std::io::SeekFrom::Start(pos + 4)).unwrap();

        pv.ser_into(&mut self.file).unwrap();

        // write the 'length' field.
        let len = self.file.stream_position()? - pos - 4;
        let lenbuf = u32::to_ne_bytes(len as u32);
        self.file.write_all_at(&lenbuf, pos)?;

        Ok(pos)
    }
}

impl Layer for InMemoryLayer {

@@ -120,12 +170,11 @@ impl Layer for InMemoryLayer {
    fn filename(&self) -> PathBuf {
        let inner = self.inner.read().unwrap();

        let end_lsn;
        if let Some(drop_lsn) = inner.end_lsn {
            end_lsn = drop_lsn;
        let end_lsn = if let Some(drop_lsn) = inner.end_lsn {
            drop_lsn
        } else {
            end_lsn = Lsn(u64::MAX);
        }
            Lsn(u64::MAX)
        };

        let delta_filename = DeltaFileName {
            seg: self.seg,

@@ -174,7 +223,6 @@ impl Layer for InMemoryLayer {
        &self,
        blknum: SegmentBlk,
        lsn: Lsn,
        cached_img_lsn: Option<Lsn>,
        reconstruct_data: &mut PageReconstructData,
    ) -> Result<PageReconstructResult> {
        let mut need_image = true;

@@ -185,33 +233,31 @@ impl Layer for InMemoryLayer {
        let inner = self.inner.read().unwrap();

        // Scan the page versions backwards, starting from `lsn`.
        let iter = inner
            .page_versions
            .get_block_lsn_range(blknum, ..=lsn)
            .iter()
            .rev();
        for (entry_lsn, pos) in iter {
            match &cached_img_lsn {
                Some(cached_lsn) if entry_lsn <= cached_lsn => {
                    return Ok(PageReconstructResult::Cached)
        if let Some(vec_map) = inner.page_versions.get(&blknum) {
            let slice = vec_map.slice_range(..=lsn);
            for (entry_lsn, pos) in slice.iter().rev() {
                match &reconstruct_data.page_img {
                    Some((cached_lsn, _)) if entry_lsn <= cached_lsn => {
                        return Ok(PageReconstructResult::Complete)
                    }
                    _ => {}
                }
                _ => {}
            }

            let pv = inner.page_versions.read_pv(*pos)?;
            match pv {
                PageVersion::Page(img) => {
                    reconstruct_data.page_img = Some(img);
                    need_image = false;
                    break;
                }
                PageVersion::Wal(rec) => {
                    reconstruct_data.records.push((*entry_lsn, rec.clone()));
                    if rec.will_init() {
                        // This WAL record initializes the page, so no need to go further back
                let pv = inner.read_pv(*pos)?;
                match pv {
                    PageVersion::Page(img) => {
                        reconstruct_data.page_img = Some((*entry_lsn, img));
                        need_image = false;
                        break;
                    }
                    PageVersion::Wal(rec) => {
                        reconstruct_data.records.push((*entry_lsn, rec.clone()));
                        if rec.will_init() {
                            // This WAL record initializes the page, so no need to go further back
                            need_image = false;
                            break;
                        }
                    }
                }
            }
        }

@@ -317,14 +363,22 @@ impl Layer for InMemoryLayer {
            println!("seg_sizes {}: {}", k, v);
        }

        for (blknum, lsn, pos) in inner.page_versions.ordered_page_version_iter(None) {
            let pv = inner.page_versions.read_pv(pos)?;
            let pv_description = match pv {
                PageVersion::Page(_img) => "page",
                PageVersion::Wal(_rec) => "wal",
            };
        // List the blocks in order
        let mut page_versions: Vec<(&SegmentBlk, &VecMap<Lsn, u64>)> =
            inner.page_versions.iter().collect();
        page_versions.sort_by_key(|k| k.0);

            println!("blk {} at {}: {}\n", blknum, lsn, pv_description);
        for (blknum, versions) in page_versions {
            for (lsn, off) in versions.as_slice() {
                let pv = inner.read_pv(*off);
                let pv_description = match pv {
                    Ok(PageVersion::Page(_img)) => "page",
                    Ok(PageVersion::Wal(_rec)) => "wal",
                    Err(_err) => "INVALID",
                };

                println!("blk {} at {}: {}\n", blknum, lsn, pv_description);
            }
        }

        Ok(())

@@ -385,7 +439,8 @@ impl InMemoryLayer {
            inner: RwLock::new(InMemoryLayerInner {
                end_lsn: None,
                dropped: false,
                page_versions: PageVersions::new(file),
                file,
                page_versions: HashMap::new(),
                seg_sizes,
                latest_lsn: oldest_lsn,
            }),

@@ -427,14 +482,18 @@ impl InMemoryLayer {
        assert!(lsn >= inner.latest_lsn);
        inner.latest_lsn = lsn;

        let old = inner.page_versions.append_or_update_last(blknum, lsn, pv)?;

        if old.is_some() {
            // We already had an entry for this LSN. That's odd..
            warn!(
                "Page version of rel {} blk {} at {} already exists",
                self.seg.rel, blknum, lsn
            );
        // Write the page version to the file, and remember its offset in 'page_versions'
        {
            let off = inner.write_pv(&pv)?;
            let vec_map = inner.page_versions.entry(blknum).or_default();
            let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
            if old.is_some() {
                // We already had an entry for this LSN. That's odd..
                warn!(
                    "Page version of rel {} blk {} at {} already exists",
                    self.seg.rel, blknum, lsn
                );
            }
        }

        // Also update the relation size, if this extended the relation.

@@ -468,16 +527,19 @@ impl InMemoryLayer {
                    gapblknum,
                    blknum
                );
                let old = inner
                    .page_versions
                    .append_or_update_last(gapblknum, lsn, zeropv)?;
                // We already had an entry for this LSN. That's odd..

                if old.is_some() {
                    warn!(
                        "Page version of seg {} blk {} at {} already exists",
                        self.seg, blknum, lsn
                    );
                // Write the page version to the file, and remember its offset in
                // 'page_versions'
                {
                    let off = inner.write_pv(&zeropv)?;
                    let vec_map = inner.page_versions.entry(gapblknum).or_default();
|
||||
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
|
||||
if old.is_some() {
|
||||
warn!(
|
||||
"Page version of seg {} blk {} at {} already exists",
|
||||
self.seg, gapblknum, lsn
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -570,7 +632,8 @@ impl InMemoryLayer {
|
||||
inner: RwLock::new(InMemoryLayerInner {
|
||||
end_lsn: None,
|
||||
dropped: false,
|
||||
page_versions: PageVersions::new(file),
|
||||
file,
|
||||
page_versions: HashMap::new(),
|
||||
seg_sizes,
|
||||
latest_lsn: oldest_lsn,
|
||||
}),
|
||||
@@ -599,8 +662,10 @@ impl InMemoryLayer {
|
||||
assert!(lsn <= &end_lsn, "{:?} {:?}", lsn, end_lsn);
|
||||
}
|
||||
|
||||
for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) {
|
||||
assert!(lsn <= end_lsn);
|
||||
for (_blk, vec_map) in inner.page_versions.iter() {
|
||||
for (lsn, _pos) in vec_map.as_slice() {
|
||||
assert!(*lsn <= end_lsn);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -678,15 +743,19 @@ impl InMemoryLayer {
|
||||
self.is_dropped(),
|
||||
)?;
|
||||
|
||||
// Write all page versions
|
||||
// Write all page versions, in block + LSN order
|
||||
let mut buf: Vec<u8> = Vec::new();
|
||||
|
||||
let page_versions_iter = inner
|
||||
.page_versions
|
||||
.ordered_page_version_iter(Some(delta_end_lsn));
|
||||
for (blknum, lsn, pos) in page_versions_iter {
|
||||
let len = inner.page_versions.read_pv_bytes(pos, &mut buf)?;
|
||||
delta_layer_writer.put_page_version(blknum, lsn, &buf[..len])?;
|
||||
let pv_iter = inner.page_versions.iter();
|
||||
let mut pages: Vec<(&SegmentBlk, &VecMap<Lsn, u64>)> = pv_iter.collect();
|
||||
pages.sort_by_key(|(blknum, _vec_map)| *blknum);
|
||||
for (blknum, vec_map) in pages {
|
||||
for (lsn, pos) in vec_map.as_slice() {
|
||||
if *lsn < delta_end_lsn {
|
||||
let len = inner.read_pv_bytes(*pos, &mut buf)?;
|
||||
delta_layer_writer.put_page_version(*blknum, *lsn, &buf[..len])?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create seg_sizes
|
||||
|
||||
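With the PageVersions helper removed, ordered iteration is done inline: collect the page_versions HashMap entries into a Vec, sort by block number, then walk each block's (LSN, offset) slice and skip versions at or beyond the cutoff. A small sketch of that pattern follows, with simplified stand-in types (u32 block numbers, plain u64 LSNs and offsets in a Vec) rather than the real SegmentBlk, Lsn and VecMap.

use std::collections::HashMap;

// Return (blknum, lsn, offset) triples in (block, LSN) order, keeping only
// versions strictly below the cutoff (the role delta_end_lsn plays above).
fn ordered_versions(map: &HashMap<u32, Vec<(u64, u64)>>, cutoff_lsn: u64) -> Vec<(u32, u64, u64)> {
    let mut blocks: Vec<(&u32, &Vec<(u64, u64)>)> = map.iter().collect();
    blocks.sort_by_key(|(blknum, _)| **blknum);

    let mut out = Vec::new();
    for (blknum, versions) in blocks {
        for (lsn, off) in versions {
            if *lsn < cutoff_lsn {
                out.push((*blknum, *lsn, *off));
            }
        }
    }
    out
}

fn main() {
    let mut map = HashMap::new();
    map.insert(7u32, vec![(10u64, 0u64), (20, 64)]);
    map.insert(3, vec![(15, 128)]);
    assert_eq!(ordered_versions(&map, 100), vec![(3, 15, 128), (7, 10, 0), (7, 20, 64)]);
}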
@@ -1,268 +0,0 @@
|
||||
//!
|
||||
//! Data structure to ingest incoming WAL into an append-only file.
|
||||
//!
|
||||
//! - The file is considered temporary, and will be discarded on crash
|
||||
//! - based on a B-tree
|
||||
//!
|
||||
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::{collections::HashMap, ops::RangeBounds, slice};
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
use std::cmp::min;
|
||||
use std::io::Seek;
|
||||
|
||||
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
|
||||
|
||||
use super::storage_layer::PageVersion;
|
||||
use crate::layered_repository::ephemeral_file::EphemeralFile;
|
||||
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
|
||||
const EMPTY_SLICE: &[(Lsn, u64)] = &[];
|
||||
|
||||
pub struct PageVersions {
|
||||
map: HashMap<u32, VecMap<Lsn, u64>>,
|
||||
|
||||
/// The PageVersion structs are stored in a serialized format in this file.
|
||||
/// Each serialized PageVersion is preceded by a 'u32' length field.
|
||||
/// The 'map' stores offsets into this file.
|
||||
file: EphemeralFile,
|
||||
}
|
||||
|
||||
impl PageVersions {
|
||||
pub fn new(file: EphemeralFile) -> PageVersions {
|
||||
PageVersions {
|
||||
map: HashMap::new(),
|
||||
file,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn append_or_update_last(
|
||||
&mut self,
|
||||
blknum: u32,
|
||||
lsn: Lsn,
|
||||
page_version: PageVersion,
|
||||
) -> Result<Option<u64>> {
|
||||
// remember starting position
|
||||
let pos = self.file.stream_position()?;
|
||||
|
||||
// make room for the 'length' field by writing zeros as a placeholder.
|
||||
self.file.seek(std::io::SeekFrom::Start(pos + 4)).unwrap();
|
||||
|
||||
page_version.ser_into(&mut self.file).unwrap();
|
||||
|
||||
// write the 'length' field.
|
||||
let len = self.file.stream_position()? - pos - 4;
|
||||
let lenbuf = u32::to_ne_bytes(len as u32);
|
||||
self.file.write_all_at(&lenbuf, pos)?;
|
||||
|
||||
let map = self.map.entry(blknum).or_insert_with(VecMap::default);
|
||||
Ok(map.append_or_update_last(lsn, pos as u64).unwrap().0)
|
||||
}
|
||||
|
||||
/// Get all [`PageVersion`]s in a block
|
||||
fn get_block_slice(&self, blknum: u32) -> &[(Lsn, u64)] {
|
||||
self.map
|
||||
.get(&blknum)
|
||||
.map(VecMap::as_slice)
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
}
|
||||
|
||||
/// Get a range of [`PageVersions`] in a block
|
||||
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(&self, blknum: u32, range: R) -> &[(Lsn, u64)] {
|
||||
self.map
|
||||
.get(&blknum)
|
||||
.map(|vec_map| vec_map.slice_range(range))
|
||||
.unwrap_or(EMPTY_SLICE)
|
||||
}
|
||||
|
||||
/// Iterate through [`PageVersion`]s in (block, lsn) order.
|
||||
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
|
||||
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
|
||||
let mut ordered_blocks: Vec<u32> = self.map.keys().cloned().collect();
|
||||
ordered_blocks.sort_unstable();
|
||||
|
||||
let slice = ordered_blocks
|
||||
.first()
|
||||
.map(|&blknum| self.get_block_slice(blknum))
|
||||
.unwrap_or(EMPTY_SLICE);
|
||||
|
||||
OrderedPageVersionIter {
|
||||
page_versions: self,
|
||||
ordered_blocks,
|
||||
cur_block_idx: 0,
|
||||
cutoff_lsn,
|
||||
cur_slice_iter: slice.iter(),
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Read a page version.
|
||||
///
|
||||
pub fn read_pv(&self, off: u64) -> Result<PageVersion> {
|
||||
let mut buf = Vec::new();
|
||||
self.read_pv_bytes(off, &mut buf)?;
|
||||
Ok(PageVersion::des(&buf)?)
|
||||
}
|
||||
|
||||
///
|
||||
/// Read a page version, as raw bytes, at the given offset. The bytes
|
||||
/// are read into 'buf', which is expanded if necessary. Returns the
|
||||
/// size of the page version.
|
||||
///
|
||||
pub fn read_pv_bytes(&self, off: u64, buf: &mut Vec<u8>) -> Result<usize> {
|
||||
// read length
|
||||
let mut lenbuf = [0u8; 4];
|
||||
self.file.read_exact_at(&mut lenbuf, off)?;
|
||||
let len = u32::from_ne_bytes(lenbuf) as usize;
|
||||
|
||||
// Resize the buffer to fit the data, if needed.
|
||||
//
|
||||
// We don't shrink the buffer if it's larger than necessary. That avoids
|
||||
// repeatedly shrinking and expanding when you reuse the same buffer to
|
||||
// read multiple page versions. Expanding a Vec requires initializing the
|
||||
// new bytes, which is a waste of time because we're immediately overwriting
|
||||
// it, but there's no way to avoid it without resorting to unsafe code.
|
||||
if buf.len() < len {
|
||||
buf.resize(len, 0);
|
||||
}
|
||||
self.file.read_exact_at(&mut buf[0..len], off + 4)?;
|
||||
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PageVersionReader<'a> {
|
||||
file: &'a EphemeralFile,
|
||||
pos: u64,
|
||||
end_pos: u64,
|
||||
}
|
||||
|
||||
impl<'a> std::io::Read for PageVersionReader<'a> {
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
|
||||
let len = min(buf.len(), (self.end_pos - self.pos) as usize);
|
||||
let n = self.file.read_at(&mut buf[..len], self.pos)?;
|
||||
self.pos += n as u64;
|
||||
Ok(n)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OrderedPageVersionIter<'a> {
|
||||
page_versions: &'a PageVersions,
|
||||
|
||||
ordered_blocks: Vec<u32>,
|
||||
cur_block_idx: usize,
|
||||
|
||||
cutoff_lsn: Option<Lsn>,
|
||||
|
||||
cur_slice_iter: slice::Iter<'a, (Lsn, u64)>,
|
||||
}
|
||||
|
||||
impl OrderedPageVersionIter<'_> {
|
||||
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
|
||||
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
|
||||
lsn < cutoff_lsn
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for OrderedPageVersionIter<'a> {
|
||||
type Item = (u32, Lsn, u64);
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
loop {
|
||||
if let Some((lsn, pos)) = self.cur_slice_iter.next() {
|
||||
if self.is_lsn_before_cutoff(lsn) {
|
||||
let blknum = self.ordered_blocks[self.cur_block_idx];
|
||||
return Some((blknum, *lsn, *pos));
|
||||
}
|
||||
}
|
||||
|
||||
let next_block_idx = self.cur_block_idx + 1;
|
||||
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
|
||||
self.cur_block_idx = next_block_idx;
|
||||
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use bytes::Bytes;
|
||||
|
||||
use super::*;
|
||||
use crate::config::PageServerConf;
|
||||
use std::fs;
|
||||
use std::str::FromStr;
|
||||
use zenith_utils::zid::{ZTenantId, ZTimelineId};
|
||||
|
||||
fn repo_harness(test_name: &str) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId)> {
|
||||
let repo_dir = PageServerConf::test_repo_dir(test_name);
|
||||
let _ = fs::remove_dir_all(&repo_dir);
|
||||
let conf = PageServerConf::dummy_conf(repo_dir);
|
||||
// Make a static copy of the config. This can never be free'd, but that's
|
||||
// OK in a test.
|
||||
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
|
||||
|
||||
let tenantid = ZTenantId::from_str("11000000000000000000000000000000").unwrap();
|
||||
let timelineid = ZTimelineId::from_str("22000000000000000000000000000000").unwrap();
|
||||
fs::create_dir_all(conf.timeline_path(&timelineid, &tenantid))?;
|
||||
|
||||
Ok((conf, tenantid, timelineid))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ordered_iter() -> Result<()> {
|
||||
let (conf, tenantid, timelineid) = repo_harness("test_ordered_iter")?;
|
||||
|
||||
let file = EphemeralFile::create(conf, tenantid, timelineid)?;
|
||||
|
||||
let mut page_versions = PageVersions::new(file);
|
||||
|
||||
const BLOCKS: u32 = 1000;
|
||||
const LSNS: u64 = 50;
|
||||
|
||||
let empty_page = Bytes::from_static(&[0u8; 8192]);
|
||||
let empty_page_version = PageVersion::Page(empty_page);
|
||||
|
||||
for blknum in 0..BLOCKS {
|
||||
for lsn in 0..LSNS {
|
||||
let old = page_versions.append_or_update_last(
|
||||
blknum,
|
||||
Lsn(lsn),
|
||||
empty_page_version.clone(),
|
||||
)?;
|
||||
assert!(old.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
let mut iter = page_versions.ordered_page_version_iter(None);
|
||||
for blknum in 0..BLOCKS {
|
||||
for lsn in 0..LSNS {
|
||||
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
|
||||
assert_eq!(actual_blknum, blknum);
|
||||
assert_eq!(Lsn(lsn), actual_lsn);
|
||||
}
|
||||
}
|
||||
assert!(iter.next().is_none());
|
||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
||||
|
||||
const CUTOFF_LSN: Lsn = Lsn(30);
|
||||
let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN));
|
||||
for blknum in 0..BLOCKS {
|
||||
for lsn in 0..CUTOFF_LSN.0 {
|
||||
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
|
||||
assert_eq!(actual_blknum, blknum);
|
||||
assert_eq!(Lsn(lsn), actual_lsn);
|
||||
}
|
||||
}
|
||||
assert!(iter.next().is_none());
|
||||
assert!(iter.next().is_none()); // should be robust against excessive next() calls
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -71,15 +71,26 @@ pub enum PageVersion {
|
||||
}
|
||||
|
||||
///
|
||||
/// Data needed to reconstruct a page version
|
||||
/// Struct used to communicate across calls to 'get_page_reconstruct_data'.
|
||||
///
|
||||
/// 'page_img' is the old base image of the page to start the WAL replay with.
|
||||
/// It can be None, if the first WAL record initializes the page (will_init)
|
||||
/// 'records' contains the records to apply over the base image.
|
||||
/// Before first call to get_page_reconstruct_data, you can fill in 'page_img'
|
||||
/// if you have an older cached version of the page available. That can save
|
||||
/// work in 'get_page_reconstruct_data', as it can stop searching for page
|
||||
/// versions when all the WAL records going back to the cached image have been
|
||||
/// collected.
|
||||
///
|
||||
/// When get_page_reconstruct_data returns Complete, 'page_img' is set to an
|
||||
/// image of the page, or the oldest WAL record in 'records' is a will_init-type
|
||||
/// record that initializes the page without requiring a previous image.
|
||||
///
|
||||
/// If 'get_page_reconstruct_data' returns Continue, some 'records' may have
|
||||
/// been collected, but there are more records outside the current layer. Pass
|
||||
/// the same PageReconstructData struct in the next 'get_page_reconstruct_data'
|
||||
/// call, to collect more records.
|
||||
///
|
||||
pub struct PageReconstructData {
|
||||
pub records: Vec<(Lsn, ZenithWalRecord)>,
|
||||
pub page_img: Option<Bytes>,
|
||||
pub page_img: Option<(Lsn, Bytes)>,
|
||||
}
|
||||
|
||||
/// Return value from Layer::get_page_reconstruct_data
|
||||
@@ -93,8 +104,6 @@ pub enum PageReconstructResult {
|
||||
/// the returned LSN. This is usually considered an error, but might be OK
|
||||
/// in some circumstances.
|
||||
Missing(Lsn),
|
||||
/// Use the cached image at `cached_img_lsn` as the base image
|
||||
Cached,
|
||||
}
|
||||
|
||||
///
|
||||
@@ -138,19 +147,16 @@ pub trait Layer: Send + Sync {
|
||||
/// It is up to the caller to collect more data from previous layer and
|
||||
/// perform WAL redo, if necessary.
|
||||
///
|
||||
/// `cached_img_lsn` should be set to a cached page image's lsn < `lsn`.
|
||||
/// This function will only return data after `cached_img_lsn`.
|
||||
///
|
||||
/// See PageReconstructResult for possible return values. The collected data
|
||||
/// is appended to reconstruct_data; the caller should pass an empty struct
|
||||
/// on first call. If this returns PageReconstructResult::Continue, look up
|
||||
/// the predecessor layer and call again with the same 'reconstruct_data'
|
||||
/// to collect more data.
|
||||
/// on first call, or a struct with a cached older image of the page if one
|
||||
/// is available. If this returns PageReconstructResult::Continue, look up
|
||||
/// the predecessor layer and call again with the same 'reconstruct_data' to
|
||||
/// collect more data.
|
||||
fn get_page_reconstruct_data(
|
||||
&self,
|
||||
blknum: SegmentBlk,
|
||||
lsn: Lsn,
|
||||
cached_img_lsn: Option<Lsn>,
|
||||
reconstruct_data: &mut PageReconstructData,
|
||||
) -> Result<PageReconstructResult>;
|
||||
|
||||
|
||||
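The comments above describe the calling convention for get_page_reconstruct_data: the caller keeps one PageReconstructData (optionally pre-filled with a cached image), and on Continue moves to the predecessor layer with the same struct until a layer reports Complete. The sketch below shows that driving loop with simplified stand-in types; the FakeLayer enum, u64 LSNs and byte-vector payloads are inventions for illustration, not the real Layer trait or Lsn type.

// Stand-ins for the real types: u64 for Lsn, Vec<u8> for records and images.
struct PageReconstructData {
    records: Vec<(u64, Vec<u8>)>,
    page_img: Option<(u64, Vec<u8>)>,
}

enum PageReconstructResult {
    Complete,
    Continue,     // look at the predecessor layer with the same reconstruct_data
    Missing(u64), // no data found; usually treated as an error
}

// Hypothetical layers: a delta layer holding one WAL record and an image layer.
enum FakeLayer {
    Delta(&'static [u8]),
    Image(&'static [u8]),
}

fn get_page_reconstruct_data(
    layer: &FakeLayer,
    lsn: u64,
    data: &mut PageReconstructData,
) -> PageReconstructResult {
    match layer {
        FakeLayer::Delta(rec) => {
            // Collect the record, then ask the caller to continue in an older layer.
            data.records.push((lsn, rec.to_vec()));
            PageReconstructResult::Continue
        }
        FakeLayer::Image(img) => {
            // A full image ends the search.
            data.page_img = Some((lsn, img.to_vec()));
            PageReconstructResult::Complete
        }
    }
}

fn main() {
    // Newest layer first; the caller walks towards older layers on Continue,
    // reusing the same PageReconstructData for every call.
    let layers = [FakeLayer::Delta(b"wal record"), FakeLayer::Image(b"base image")];
    let mut data = PageReconstructData { records: Vec::new(), page_img: None };

    for layer in &layers {
        match get_page_reconstruct_data(layer, 100, &mut data) {
            PageReconstructResult::Complete => break,
            PageReconstructResult::Continue => continue,
            PageReconstructResult::Missing(lsn) => panic!("page missing at {}", lsn),
        }
    }

    assert_eq!(data.records.len(), 1);
    assert!(data.page_img.is_some());
}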
@@ -447,8 +447,6 @@ pub mod repo_harness {
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::layered_repository::metadata::METADATA_FILE_NAME;
|
||||
|
||||
use super::repo_harness::*;
|
||||
use super::*;
|
||||
use postgres_ffi::{pg_constants, xlog_utils::SIZEOF_CHECKPOINT};
|
||||
@@ -746,8 +744,8 @@ mod tests {
|
||||
|
||||
let mut lsn = 0x10;
|
||||
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
|
||||
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
|
||||
lsn += 0x10;
|
||||
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
|
||||
writer.put_page_image(TESTREL_A, blknum as BlockNumber, Lsn(lsn), img)?;
|
||||
}
|
||||
writer.advance_last_record_lsn(Lsn(lsn));
|
||||
@@ -1132,141 +1130,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn corrupt_metadata() -> Result<()> {
|
||||
const TEST_NAME: &str = "corrupt_metadata";
|
||||
let harness = RepoHarness::create(TEST_NAME)?;
|
||||
let repo = harness.load();
|
||||
|
||||
repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
|
||||
drop(repo);
|
||||
|
||||
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
|
||||
|
||||
assert!(metadata_path.is_file());
|
||||
|
||||
let mut metadata_bytes = std::fs::read(&metadata_path)?;
|
||||
assert_eq!(metadata_bytes.len(), 512);
|
||||
metadata_bytes[512 - 4 - 2] ^= 1;
|
||||
std::fs::write(metadata_path, metadata_bytes)?;
|
||||
|
||||
let new_repo = harness.load();
|
||||
let err = new_repo.get_timeline(TIMELINE_ID).err().unwrap();
|
||||
assert_eq!(err.to_string(), "failed to load metadata");
|
||||
assert_eq!(
|
||||
err.source().unwrap().to_string(),
|
||||
"metadata checksum mismatch"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn future_layerfiles() -> Result<()> {
|
||||
const TEST_NAME: &str = "future_layerfiles";
|
||||
let harness = RepoHarness::create(TEST_NAME)?;
|
||||
let repo = harness.load();
|
||||
|
||||
// Create a timeline with disk_consistent_lsn = 8000
|
||||
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
|
||||
let writer = tline.writer();
|
||||
writer.advance_last_record_lsn(Lsn(0x8000));
|
||||
drop(writer);
|
||||
repo.checkpoint_iteration(CheckpointConfig::Forced)?;
|
||||
drop(repo);
|
||||
|
||||
let timeline_path = harness.timeline_path(&TIMELINE_ID);
|
||||
|
||||
let make_empty_file = |filename: &str| -> std::io::Result<()> {
|
||||
let path = timeline_path.join(filename);
|
||||
|
||||
assert!(!path.exists());
|
||||
std::fs::write(&path, &[])?;
|
||||
|
||||
Ok(())
|
||||
};
|
||||
|
||||
// Helper function to check that a relation file exists, and a corresponding
|
||||
// <filename>.0.old file does not.
|
||||
let assert_exists = |filename: &str| {
|
||||
let path = timeline_path.join(filename);
|
||||
assert!(path.exists(), "file {} was removed", filename);
|
||||
|
||||
// Check that there is no .old file
|
||||
let backup_path = timeline_path.join(format!("{}.0.old", filename));
|
||||
assert!(
|
||||
!backup_path.exists(),
|
||||
"unexpected backup file {}",
|
||||
backup_path.display()
|
||||
);
|
||||
};
|
||||
|
||||
// Helper function to check that a relation file does *not* exist, and a corresponding
|
||||
// <filename>.<num>.old file does.
|
||||
let assert_is_renamed = |filename: &str, num: u32| {
|
||||
let path = timeline_path.join(filename);
|
||||
assert!(
|
||||
!path.exists(),
|
||||
"file {} was not removed as expected",
|
||||
filename
|
||||
);
|
||||
|
||||
let backup_path = timeline_path.join(format!("{}.{}.old", filename, num));
|
||||
assert!(
|
||||
backup_path.exists(),
|
||||
"backup file {} was not created",
|
||||
backup_path.display()
|
||||
);
|
||||
};
|
||||
|
||||
// These files are considered to be in the future and will be renamed out
|
||||
// of the way
|
||||
let future_filenames = vec![
|
||||
format!("pg_control_0_{:016X}", 0x8001),
|
||||
format!("pg_control_0_{:016X}_{:016X}", 0x8001, 0x8008),
|
||||
];
|
||||
// But these are not:
|
||||
let past_filenames = vec![
|
||||
format!("pg_control_0_{:016X}", 0x8000),
|
||||
format!("pg_control_0_{:016X}_{:016X}", 0x7000, 0x8001),
|
||||
];
|
||||
|
||||
for filename in future_filenames.iter().chain(past_filenames.iter()) {
|
||||
make_empty_file(filename)?;
|
||||
}
|
||||
|
||||
// Load the timeline. This will cause the files in the "future" to be renamed
|
||||
// away.
|
||||
let new_repo = harness.load();
|
||||
new_repo.get_timeline(TIMELINE_ID).unwrap();
|
||||
drop(new_repo);
|
||||
|
||||
for filename in future_filenames.iter() {
|
||||
assert_is_renamed(filename, 0);
|
||||
}
|
||||
for filename in past_filenames.iter() {
|
||||
assert_exists(filename);
|
||||
}
|
||||
|
||||
// Create the future files again, and load again. They should be renamed to
|
||||
// *.1.old this time.
|
||||
for filename in future_filenames.iter() {
|
||||
make_empty_file(filename)?;
|
||||
}
|
||||
|
||||
let new_repo = harness.load();
|
||||
new_repo.get_timeline(TIMELINE_ID).unwrap();
|
||||
drop(new_repo);
|
||||
|
||||
for filename in future_filenames.iter() {
|
||||
assert_is_renamed(filename, 0);
|
||||
assert_is_renamed(filename, 1);
|
||||
}
|
||||
for filename in past_filenames.iter() {
|
||||
assert_exists(filename);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -268,12 +268,11 @@ impl XlXactParsedRecord {
|
||||
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
|
||||
// The record starts with time of commit/abort
|
||||
let xact_time = buf.get_i64_le();
|
||||
let xinfo;
|
||||
if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
|
||||
xinfo = buf.get_u32_le();
|
||||
let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
|
||||
buf.get_u32_le()
|
||||
} else {
|
||||
xinfo = 0;
|
||||
}
|
||||
0
|
||||
};
|
||||
let db_id;
|
||||
let ts_id;
|
||||
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
|
||||
@@ -502,7 +501,6 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
0..=pg_constants::XLR_MAX_BLOCK_ID => {
|
||||
/* XLogRecordBlockHeader */
|
||||
let mut blk = DecodedBkpBlock::new();
|
||||
let fork_flags: u8;
|
||||
|
||||
if block_id <= max_block_id {
|
||||
// TODO
|
||||
@@ -515,7 +513,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
|
||||
}
|
||||
max_block_id = block_id;
|
||||
|
||||
fork_flags = buf.get_u8();
|
||||
let fork_flags: u8 = buf.get_u8();
|
||||
blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
|
||||
blk.flags = fork_flags;
|
||||
blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;
|
||||
|
||||
@@ -102,8 +102,6 @@ impl crate::walredo::WalRedoManager for DummyRedoManager {
|
||||
}
|
||||
}
|
||||
|
||||
static TIMEOUT: Duration = Duration::from_secs(20);
|
||||
|
||||
// Metrics collected on WAL redo operations
|
||||
//
|
||||
// We collect the time spent in actual WAL redo ('redo'), and time waiting
|
||||
@@ -221,7 +219,14 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
let result = if batch_zenith {
|
||||
self.apply_batch_zenith(rel, blknum, lsn, img, &records[batch_start..i])
|
||||
} else {
|
||||
self.apply_batch_postgres(rel, blknum, lsn, img, &records[batch_start..i])
|
||||
self.apply_batch_postgres(
|
||||
rel,
|
||||
blknum,
|
||||
lsn,
|
||||
img,
|
||||
&records[batch_start..i],
|
||||
self.conf.wal_redo_timeout,
|
||||
)
|
||||
};
|
||||
img = Some(result?);
|
||||
|
||||
@@ -233,7 +238,14 @@ impl WalRedoManager for PostgresRedoManager {
|
||||
if batch_zenith {
|
||||
self.apply_batch_zenith(rel, blknum, lsn, img, &records[batch_start..])
|
||||
} else {
|
||||
self.apply_batch_postgres(rel, blknum, lsn, img, &records[batch_start..])
|
||||
self.apply_batch_postgres(
|
||||
rel,
|
||||
blknum,
|
||||
lsn,
|
||||
img,
|
||||
&records[batch_start..],
|
||||
self.conf.wal_redo_timeout,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -261,6 +273,7 @@ impl PostgresRedoManager {
|
||||
lsn: Lsn,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, ZenithWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, WalRedoError> {
|
||||
let start_time = Instant::now();
|
||||
|
||||
@@ -281,7 +294,7 @@ impl PostgresRedoManager {
|
||||
let result = if let RelishTag::Relation(rel) = rel {
|
||||
// Relational WAL records are applied using wal-redo-postgres
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
apply_result = process.apply_wal_records(buf_tag, base_img, records);
|
||||
apply_result = process.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout);
|
||||
|
||||
apply_result.map_err(WalRedoError::IoError)
|
||||
} else {
|
||||
@@ -603,6 +616,7 @@ impl PostgresRedoProcess {
|
||||
tag: BufferTag,
|
||||
base_img: Option<Bytes>,
|
||||
records: &[(Lsn, ZenithWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> Result<Bytes, std::io::Error> {
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
@@ -653,7 +667,7 @@ impl PostgresRedoProcess {
|
||||
// If we have more data to write, wake up if 'stdin' becomes writeable or
|
||||
// we have data to read. Otherwise only wake up if there's data to read.
|
||||
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
|
||||
let n = nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
|
||||
let n = nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32)?;
|
||||
|
||||
if n == 0 {
|
||||
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
|
||||
|
||||
25
poetry.lock
generated
@@ -91,6 +91,14 @@ botocore = ">=1.11.3"
|
||||
future = "*"
|
||||
wrapt = "*"
|
||||
|
||||
[[package]]
|
||||
name = "backoff"
|
||||
version = "1.11.1"
|
||||
description = "Function decoration for backoff and retry"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
|
||||
|
||||
[[package]]
|
||||
name = "boto3"
|
||||
version = "1.20.40"
|
||||
@@ -814,11 +822,11 @@ python-versions = "*"
|
||||
|
||||
[[package]]
|
||||
name = "moto"
|
||||
version = "3.0.0"
|
||||
version = "3.0.4"
|
||||
description = "A library that allows your python tests to easily mock out the boto library"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
python-versions = ">=3.6"
|
||||
|
||||
[package.dependencies]
|
||||
aws-xray-sdk = {version = ">=0.93,<0.96 || >0.96", optional = true, markers = "extra == \"server\""}
|
||||
@@ -848,7 +856,8 @@ xmltodict = "*"
|
||||
|
||||
[package.extras]
|
||||
all = ["PyYAML (>=5.1)", "python-jose[cryptography] (>=3.1.0,<4.0.0)", "ecdsa (!=0.15)", "docker (>=2.5.1)", "graphql-core", "jsondiff (>=1.1.2)", "aws-xray-sdk (>=0.93,!=0.96)", "idna (>=2.5,<4)", "cfn-lint (>=0.4.0)", "sshpubkeys (>=3.1.0)", "setuptools"]
|
||||
apigateway = ["python-jose[cryptography] (>=3.1.0,<4.0.0)", "ecdsa (!=0.15)"]
|
||||
apigateway = ["PyYAML (>=5.1)", "python-jose[cryptography] (>=3.1.0,<4.0.0)", "ecdsa (!=0.15)"]
|
||||
apigatewayv2 = ["PyYAML (>=5.1)"]
|
||||
appsync = ["graphql-core"]
|
||||
awslambda = ["docker (>=2.5.1)"]
|
||||
batch = ["docker (>=2.5.1)"]
|
||||
@@ -1352,7 +1361,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
|
||||
[metadata]
|
||||
lock-version = "1.1"
|
||||
python-versions = "^3.7"
|
||||
content-hash = "0fa6c9377fbc827240d18d8b7e3742def37e90fc3277fddf8525d82dabd13090"
|
||||
content-hash = "58762accad4122026c650fa43421a900546e89f9908e2268410e7b11cc8c6c4e"
|
||||
|
||||
[metadata.files]
|
||||
aiopg = [
|
||||
@@ -1395,6 +1404,10 @@ aws-xray-sdk = [
|
||||
{file = "aws-xray-sdk-2.9.0.tar.gz", hash = "sha256:b0cd972db218d4d8f7b53ad806fc6184626b924c4997ae58fc9f2a8cd1281568"},
|
||||
{file = "aws_xray_sdk-2.9.0-py2.py3-none-any.whl", hash = "sha256:98216b3ac8281b51b59a8703f8ec561c460807d9d0679838f5c0179d381d7e58"},
|
||||
]
|
||||
backoff = [
|
||||
{file = "backoff-1.11.1-py2.py3-none-any.whl", hash = "sha256:61928f8fa48d52e4faa81875eecf308eccfb1016b018bb6bd21e05b5d90a96c5"},
|
||||
{file = "backoff-1.11.1.tar.gz", hash = "sha256:ccb962a2378418c667b3c979b504fdeb7d9e0d29c0579e3b13b86467177728cb"},
|
||||
]
|
||||
boto3 = [
|
||||
{file = "boto3-1.20.40-py3-none-any.whl", hash = "sha256:cfe85589e4a0a997c7b9ae7432400b03fa6fa5fea29fdc48db3099a903b76998"},
|
||||
{file = "boto3-1.20.40.tar.gz", hash = "sha256:66aef9a6d8cad393f69166112ba49e14e2c6766f9278c96134101314a9af2992"},
|
||||
@@ -1666,8 +1679,8 @@ mccabe = [
|
||||
{file = "mccabe-0.6.1.tar.gz", hash = "sha256:dd8d182285a0fe56bace7f45b5e7d1a6ebcbf524e8f3bd87eb0f125271b8831f"},
|
||||
]
|
||||
moto = [
|
||||
{file = "moto-3.0.0-py2.py3-none-any.whl", hash = "sha256:762d33bbad3642c687f6495e69331318bef43f9aa662174397706ec3ad2a3578"},
|
||||
{file = "moto-3.0.0.tar.gz", hash = "sha256:d6b00a2663290e7ebb06823d5ffcb124c8dc9bf526b878539ef7c4a377fd8255"},
|
||||
{file = "moto-3.0.4-py2.py3-none-any.whl", hash = "sha256:79646213d8438385182f4eea79e28725f94b3d0d3dc9a3eda81db47e0ebef6cc"},
|
||||
{file = "moto-3.0.4.tar.gz", hash = "sha256:168b8a3cb4dd8a6df8e51d582761cefa9657b9f45ac7e1eb24dae394ebc9e000"},
|
||||
]
|
||||
mypy = [
|
||||
{file = "mypy-0.910-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:a155d80ea6cee511a3694b108c4494a39f42de11ee4e61e72bc424c490e46457"},
|
||||
|
||||
@@ -7,11 +7,13 @@ use std::collections::HashMap;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use zenith_utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage, FeMessage as Fe};
|
||||
|
||||
// TODO rename the struct to ClientParams or something
|
||||
/// Various client credentials which we use for authentication.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct ClientCredentials {
|
||||
pub user: String,
|
||||
pub dbname: String,
|
||||
pub options: Option<String>,
|
||||
}
|
||||
|
||||
impl TryFrom<HashMap<String, String>> for ClientCredentials {
|
||||
@@ -25,9 +27,22 @@ impl TryFrom<HashMap<String, String>> for ClientCredentials {
|
||||
};
|
||||
|
||||
let user = get_param("user")?;
|
||||
let db = get_param("database")?;
|
||||
let dbname = get_param("database")?;
|
||||
|
||||
Ok(Self { user, dbname: db })
|
||||
// TODO see what other options should be recognized, possibly all.
|
||||
let options = match get_param("search_path") {
|
||||
Ok(path) => Some(format!("-c search_path={}", path)),
|
||||
Err(_) => None,
|
||||
};
|
||||
|
||||
// TODO investigate why "" is always a key
|
||||
// TODO warn on unrecognized options?
|
||||
|
||||
Ok(Self {
|
||||
user,
|
||||
dbname,
|
||||
options,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,6 +100,7 @@ async fn handle_static(
|
||||
dbname: creds.dbname.clone(),
|
||||
user: creds.user.clone(),
|
||||
password: Some(cleartext_password.into()),
|
||||
options: creds.options,
|
||||
};
|
||||
|
||||
client
|
||||
@@ -117,15 +133,22 @@ async fn handle_existing_user(
|
||||
.ok_or_else(|| anyhow!("unexpected password message"))?;
|
||||
|
||||
let cplane = CPlaneApi::new(&config.auth_endpoint);
|
||||
let db_info = cplane
|
||||
.authenticate_proxy_request(creds, md5_response, &md5_salt, &psql_session_id)
|
||||
let db_info_response = cplane
|
||||
.authenticate_proxy_request(&creds, md5_response, &md5_salt, &psql_session_id)
|
||||
.await?;
|
||||
|
||||
client
|
||||
.write_message_noflush(&Be::AuthenticationOk)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
|
||||
|
||||
Ok(db_info)
|
||||
Ok(DatabaseInfo {
|
||||
host: db_info_response.host,
|
||||
port: db_info_response.port,
|
||||
dbname: db_info_response.dbname,
|
||||
user: db_info_response.user,
|
||||
password: db_info_response.password,
|
||||
options: creds.options,
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_new_user(
|
||||
@@ -135,7 +158,7 @@ async fn handle_new_user(
|
||||
let psql_session_id = new_psql_session_id();
|
||||
let greeting = hello_message(&config.redirect_uri, &psql_session_id);
|
||||
|
||||
let db_info = cplane_api::with_waiter(psql_session_id, |waiter| async {
|
||||
let db_info_response = cplane_api::with_waiter(psql_session_id, |waiter| async {
|
||||
// Give user a URL to spawn a new database
|
||||
client
|
||||
.write_message_noflush(&Be::AuthenticationOk)?
|
||||
@@ -150,7 +173,14 @@ async fn handle_new_user(
|
||||
|
||||
client.write_message_noflush(&Be::NoticeResponse("Connecting to database.".into()))?;
|
||||
|
||||
Ok(db_info)
|
||||
Ok(DatabaseInfo {
|
||||
host: db_info_response.host,
|
||||
port: db_info_response.port,
|
||||
dbname: db_info_response.dbname,
|
||||
user: db_info_response.user,
|
||||
password: db_info_response.password,
|
||||
options: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn hello_message(redirect_uri: &str, session_id: &str) -> String {
|
||||
|
||||
@@ -10,6 +10,7 @@ pub struct DatabaseInfo {
|
||||
pub dbname: String,
|
||||
pub user: String,
|
||||
pub password: Option<String>,
|
||||
pub options: Option<String>,
|
||||
}
|
||||
|
||||
impl DatabaseInfo {
|
||||
@@ -33,6 +34,10 @@ impl From<DatabaseInfo> for tokio_postgres::Config {
|
||||
.dbname(&db_info.dbname)
|
||||
.user(&db_info.user);
|
||||
|
||||
if let Some(options) = db_info.options {
|
||||
config.options(&options);
|
||||
}
|
||||
|
||||
if let Some(password) = db_info.password {
|
||||
config.password(password);
|
||||
}
|
||||
|
||||
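Taken together, the proxy hunks thread one options string end to end: ClientCredentials turns a requested search_path into "-c search_path=...", DatabaseInfo carries it, and the From impl hands it to tokio_postgres, which sends it as the libpq-style options startup parameter. The sketch below restates that last step in one self-contained piece; it assumes the tokio-postgres crate as a dependency, and the struct literal values are made up for illustration.

use tokio_postgres::Config;

struct DatabaseInfo {
    host: String,
    port: u16,
    dbname: String,
    user: String,
    password: Option<String>,
    options: Option<String>,
}

fn to_config(db_info: DatabaseInfo) -> Config {
    let mut config = Config::new();
    config
        .host(&db_info.host)
        .port(db_info.port)
        .dbname(&db_info.dbname)
        .user(&db_info.user);

    if let Some(options) = db_info.options {
        // Forwarded to the server as the `options` startup parameter,
        // i.e. the same thing as repeated `-c name=value` switches.
        config.options(&options);
    }
    if let Some(password) = db_info.password {
        config.password(password);
    }
    config
}

fn main() {
    let db_info = DatabaseInfo {
        host: "127.0.0.1".into(),
        port: 5432,
        dbname: "postgres".into(),
        user: "pytest".into(),
        password: Some("pytest".into()),
        options: Some("-c search_path=tmp_schema_1".into()),
    };
    let _config = to_config(db_info); // ready to be passed to tokio_postgres::connect
}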
@@ -1,25 +1,37 @@
|
||||
use crate::auth::ClientCredentials;
|
||||
use crate::compute::DatabaseInfo;
|
||||
use crate::waiters::{Waiter, Waiters};
|
||||
use anyhow::{anyhow, bail};
|
||||
use lazy_static::lazy_static;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Part of the legacy cplane responses
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
pub struct DatabaseInfoResponse {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub dbname: String,
|
||||
pub user: String,
|
||||
pub password: Option<String>,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
static ref CPLANE_WAITERS: Waiters<Result<DatabaseInfo, String>> = Default::default();
|
||||
static ref CPLANE_WAITERS: Waiters<Result<DatabaseInfoResponse, String>> = Default::default();
|
||||
}
|
||||
|
||||
/// Give caller an opportunity to wait for cplane's reply.
|
||||
pub async fn with_waiter<F, R, T>(psql_session_id: impl Into<String>, f: F) -> anyhow::Result<T>
|
||||
where
|
||||
F: FnOnce(Waiter<'static, Result<DatabaseInfo, String>>) -> R,
|
||||
F: FnOnce(Waiter<'static, Result<DatabaseInfoResponse, String>>) -> R,
|
||||
R: std::future::Future<Output = anyhow::Result<T>>,
|
||||
{
|
||||
let waiter = CPLANE_WAITERS.register(psql_session_id.into())?;
|
||||
f(waiter).await
|
||||
}
|
||||
|
||||
pub fn notify(psql_session_id: &str, msg: Result<DatabaseInfo, String>) -> anyhow::Result<()> {
|
||||
pub fn notify(
|
||||
psql_session_id: &str,
|
||||
msg: Result<DatabaseInfoResponse, String>,
|
||||
) -> anyhow::Result<()> {
|
||||
CPLANE_WAITERS.notify(psql_session_id, msg)
|
||||
}
|
||||
|
||||
@@ -37,11 +49,11 @@ impl<'a> CPlaneApi<'a> {
|
||||
impl CPlaneApi<'_> {
|
||||
pub async fn authenticate_proxy_request(
|
||||
&self,
|
||||
creds: ClientCredentials,
|
||||
creds: &ClientCredentials,
|
||||
md5_response: &[u8],
|
||||
salt: &[u8; 4],
|
||||
psql_session_id: &str,
|
||||
) -> anyhow::Result<DatabaseInfo> {
|
||||
) -> anyhow::Result<DatabaseInfoResponse> {
|
||||
let mut url = reqwest::Url::parse(self.auth_endpoint)?;
|
||||
url.query_pairs_mut()
|
||||
.append_pair("login", &creds.user)
|
||||
@@ -77,7 +89,7 @@ impl CPlaneApi<'_> {
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(untagged)]
|
||||
enum ProxyAuthResponse {
|
||||
Ready { conn_info: DatabaseInfo },
|
||||
Ready { conn_info: DatabaseInfoResponse },
|
||||
Error { error: String },
|
||||
NotReady { ready: bool }, // TODO: get rid of `ready`
|
||||
}
|
||||
@@ -92,13 +104,13 @@ mod tests {
|
||||
// Ready
|
||||
let auth: ProxyAuthResponse = serde_json::from_value(json!({
|
||||
"ready": true,
|
||||
"conn_info": DatabaseInfo::default(),
|
||||
"conn_info": DatabaseInfoResponse::default(),
|
||||
}))
|
||||
.unwrap();
|
||||
assert!(matches!(
|
||||
auth,
|
||||
ProxyAuthResponse::Ready {
|
||||
conn_info: DatabaseInfo { .. }
|
||||
conn_info: DatabaseInfoResponse { .. }
|
||||
}
|
||||
));
|
||||
|
||||
|
||||
@@ -122,7 +122,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
None => RouterConfig::Dynamic(auth_method),
|
||||
Some(addr) => {
|
||||
if let ClientAuthMethod::Password = auth_method {
|
||||
let (host, port) = addr.split_once(":").unwrap();
|
||||
let (host, port) = addr.split_once(':').unwrap();
|
||||
RouterConfig::Static {
|
||||
host: host.to_string(),
|
||||
port: port.parse().unwrap(),
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::{compute::DatabaseInfo, cplane_api};
|
||||
use crate::cplane_api;
|
||||
use anyhow::Context;
|
||||
use serde::Deserialize;
|
||||
use std::{
|
||||
@@ -75,7 +75,7 @@ struct PsqlSessionResponse {
|
||||
|
||||
#[derive(Deserialize)]
|
||||
enum PsqlSessionResult {
|
||||
Success(DatabaseInfo),
|
||||
Success(cplane_api::DatabaseInfoResponse),
|
||||
Failure(String),
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::auth;
|
||||
use crate::auth::{self, ClientCredentials};
|
||||
use crate::cancellation::{self, CancelClosure, CancelMap};
|
||||
use crate::compute::DatabaseInfo;
|
||||
use crate::config::{ProxyConfig, TlsConfig};
|
||||
@@ -138,7 +138,6 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
stream.write_message(&Be::ErrorResponse(msg)).await?;
|
||||
bail!(msg);
|
||||
}
|
||||
|
||||
break Ok(Some((stream, params.try_into()?)));
|
||||
}
|
||||
CancelRequest(cancel_key_data) => {
|
||||
|
||||
@@ -21,6 +21,7 @@ types-psycopg2 = "^2.9.6"
|
||||
boto3 = "^1.20.40"
|
||||
boto3-stubs = "^1.20.40"
|
||||
moto = {version = "^3.0.0", extras = ["server"]}
|
||||
backoff = "^1.11.1"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
yapf = "==0.31.0"
|
||||
|
||||
@@ -93,7 +93,7 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv
|
||||
|
||||
def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder):
|
||||
zenith_env_builder.num_safekeepers = 1
|
||||
env = zenith_env_builder.init()
|
||||
env = zenith_env_builder.init_start()
|
||||
# Create a branch for us
|
||||
env.zenith_cli.create_branch("test_backpressure", "main")
|
||||
|
||||
|
||||
14
test_runner/batch_others/test_proxy.py
Normal file
@@ -0,0 +1,14 @@
|
||||
import pytest
|
||||
|
||||
|
||||
def test_proxy_select_1(static_proxy):
|
||||
static_proxy.safe_psql("select 1;")
|
||||
|
||||
|
||||
def test_proxy_options(static_proxy):
|
||||
schema_name = "tmp_schema_1"
|
||||
with static_proxy.connect(schema=schema_name) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("SHOW search_path;")
|
||||
search_path = cur.fetchall()[0][0]
|
||||
assert schema_name == search_path
|
||||
@@ -78,8 +78,8 @@ def test_twophase(zenith_simple_env: ZenithEnv):
|
||||
cur2.execute("ROLLBACK PREPARED 'insert_two'")
|
||||
|
||||
cur2.execute('SELECT * FROM foo')
|
||||
assert cur2.fetchall() == [('one', ), ('three', )] # type: ignore[comparison-overlap]
|
||||
assert cur2.fetchall() == [('one', ), ('three', )]
|
||||
|
||||
# Only one committed insert is visible on the original branch
|
||||
cur.execute('SELECT * FROM foo')
|
||||
assert cur.fetchall() == [('three', )] # type: ignore[comparison-overlap]
|
||||
assert cur.fetchall() == [('three', )]
|
||||
|
||||
@@ -32,6 +32,7 @@ from typing_extensions import Literal
|
||||
import pytest
|
||||
|
||||
import requests
|
||||
import backoff # type: ignore
|
||||
|
||||
from .utils import (get_self_dir, mkdir_if_needed, subprocess_capture)
|
||||
from fixtures.log_helper import log
|
||||
@@ -237,14 +238,24 @@ def port_distributor(worker_base_port):
|
||||
|
||||
class PgProtocol:
|
||||
""" Reusable connection logic """
|
||||
def __init__(self, host: str, port: int, username: Optional[str] = None):
|
||||
def __init__(self,
|
||||
host: str,
|
||||
port: int,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None,
|
||||
dbname: Optional[str] = None,
|
||||
schema: Optional[str] = None):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.dbname = dbname
|
||||
self.schema = schema
|
||||
|
||||
def connstr(self,
|
||||
*,
|
||||
dbname: str = 'postgres',
|
||||
dbname: Optional[str] = None,
|
||||
schema: Optional[str] = None,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None) -> str:
|
||||
"""
|
||||
@@ -252,6 +263,9 @@ class PgProtocol:
|
||||
"""
|
||||
|
||||
username = username or self.username
|
||||
password = password or self.password
|
||||
dbname = dbname or self.dbname or "postgres"
|
||||
schema = schema or self.schema
|
||||
res = f'host={self.host} port={self.port} dbname={dbname}'
|
||||
|
||||
if username:
|
||||
@@ -260,13 +274,17 @@ class PgProtocol:
|
||||
if password:
|
||||
res = f'{res} password={password}'
|
||||
|
||||
if schema:
|
||||
res = f"{res} options='-c search_path={schema}'"
|
||||
|
||||
return res
|
||||
|
||||
# autocommit=True here by default because that's what we need most of the time
|
||||
def connect(self,
|
||||
*,
|
||||
autocommit=True,
|
||||
dbname: str = 'postgres',
|
||||
dbname: Optional[str] = None,
|
||||
schema: Optional[str] = None,
|
||||
username: Optional[str] = None,
|
||||
password: Optional[str] = None) -> PgConnection:
|
||||
"""
|
||||
@@ -275,11 +293,13 @@ class PgProtocol:
|
||||
This method passes all extra params to connstr.
|
||||
"""
|
||||
|
||||
conn = psycopg2.connect(self.connstr(
|
||||
dbname=dbname,
|
||||
username=username,
|
||||
password=password,
|
||||
))
|
||||
conn = psycopg2.connect(
|
||||
self.connstr(
|
||||
dbname=dbname,
|
||||
schema=schema,
|
||||
username=username,
|
||||
password=password,
|
||||
))
|
||||
# WARNING: this setting affects *all* tests!
|
||||
conn.autocommit = autocommit
|
||||
return conn
|
||||
@@ -401,6 +421,7 @@ class ZenithEnvBuilder:
|
||||
repo_dir: Path,
|
||||
port_distributor: PortDistributor,
|
||||
pageserver_remote_storage: Optional[RemoteStorage] = None,
|
||||
pageserver_config_override: Optional[str] = None,
|
||||
num_safekeepers: int = 0,
|
||||
pageserver_auth_enabled: bool = False,
|
||||
rust_log_override: Optional[str] = None):
|
||||
@@ -408,6 +429,7 @@ class ZenithEnvBuilder:
|
||||
self.rust_log_override = rust_log_override
|
||||
self.port_distributor = port_distributor
|
||||
self.pageserver_remote_storage = pageserver_remote_storage
|
||||
self.pageserver_config_override = pageserver_config_override
|
||||
self.num_safekeepers = num_safekeepers
|
||||
self.pageserver_auth_enabled = pageserver_auth_enabled
|
||||
self.env: Optional[ZenithEnv] = None
|
||||
@@ -557,7 +579,8 @@ class ZenithEnv:
|
||||
# Create a corresponding ZenithPageserver object
|
||||
self.pageserver = ZenithPageserver(self,
|
||||
port=pageserver_port,
|
||||
remote_storage=config.pageserver_remote_storage)
|
||||
remote_storage=config.pageserver_remote_storage,
|
||||
config_override=config.pageserver_config_override)
|
||||
|
||||
# Create config and a Safekeeper object for each safekeeper
|
||||
for i in range(1, config.num_safekeepers + 1):
|
||||
@@ -837,14 +860,17 @@ class ZenithCli:
|
||||
tmp.flush()
|
||||
|
||||
cmd = ['init', f'--config={tmp.name}']
|
||||
append_pageserver_param_overrides(cmd, self.env.pageserver.remote_storage)
|
||||
append_pageserver_param_overrides(cmd,
|
||||
self.env.pageserver.remote_storage,
|
||||
self.env.pageserver.config_override)
|
||||
|
||||
return self.raw_cli(cmd)
|
||||
|
||||
def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
|
||||
start_args = ['pageserver', 'start', *overrides]
|
||||
|
||||
append_pageserver_param_overrides(start_args, self.env.pageserver.remote_storage)
|
||||
append_pageserver_param_overrides(start_args,
|
||||
self.env.pageserver.remote_storage,
|
||||
self.env.pageserver.config_override)
|
||||
return self.raw_cli(start_args)
|
||||
|
||||
def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
|
||||
@@ -989,16 +1015,19 @@ class ZenithPageserver(PgProtocol):
|
||||
env: ZenithEnv,
|
||||
port: PageserverPort,
|
||||
remote_storage: Optional[RemoteStorage] = None,
|
||||
config_override: Optional[str] = None,
|
||||
enable_auth=False):
|
||||
super().__init__(host='localhost', port=port.pg, username='zenith_admin')
|
||||
self.env = env
|
||||
self.running = False
|
||||
self.service_port = port # do not shadow PgProtocol.port which is just int
|
||||
self.remote_storage = remote_storage
|
||||
self.config_override = config_override
|
||||
|
||||
def start(self, overrides=()) -> 'ZenithPageserver':
|
||||
"""
|
||||
Start the page server.
|
||||
`overrides` allows to add some config to this pageserver start.
|
||||
Returns self.
|
||||
"""
|
||||
assert self.running == False
|
||||
@@ -1031,8 +1060,11 @@ class ZenithPageserver(PgProtocol):
|
||||
)
|
||||
|
||||
|
||||
def append_pageserver_param_overrides(params_to_update: List[str],
|
||||
pageserver_remote_storage: Optional[RemoteStorage]):
|
||||
def append_pageserver_param_overrides(
|
||||
params_to_update: List[str],
|
||||
pageserver_remote_storage: Optional[RemoteStorage],
|
||||
pageserver_config_override: Optional[str] = None,
|
||||
):
|
||||
if pageserver_remote_storage is not None:
|
||||
if isinstance(pageserver_remote_storage, LocalFsStorage):
|
||||
pageserver_storage_override = f"local_path='{pageserver_remote_storage.root}'"
|
||||
@@ -1058,6 +1090,12 @@ def append_pageserver_param_overrides(params_to_update: List[str],
|
||||
f'--pageserver-config-override={o.strip()}' for o in env_overrides.split(';')
|
||||
]
|
||||
|
||||
if pageserver_config_override is not None:
|
||||
params_to_update += [
|
||||
f'--pageserver-config-override={o.strip()}'
|
||||
for o in pageserver_config_override.split(';')
|
||||
]
|
||||
|
||||
|
||||
class PgBin:
|
||||
""" A helper class for executing postgres binaries """
|
||||
@@ -1164,6 +1202,50 @@ def vanilla_pg(test_output_dir: str) -> Iterator[VanillaPostgres]:
|
||||
yield vanilla_pg
|
||||
|
||||
|
||||
class ZenithProxy(PgProtocol):
|
||||
def __init__(self, port: int):
|
||||
super().__init__(host="127.0.0.1", username="pytest", password="pytest", port=port)
|
||||
self.http_port = 7001
|
||||
self._popen: Optional[subprocess.Popen[bytes]] = None
|
||||
|
||||
def start_static(self, addr="127.0.0.1:5432") -> None:
|
||||
assert self._popen is None
|
||||
|
||||
# Start proxy
|
||||
bin_proxy = os.path.join(str(zenith_binpath), 'proxy')
|
||||
args = [bin_proxy]
|
||||
args.extend(["--http", f"{self.host}:{self.http_port}"])
|
||||
args.extend(["--proxy", f"{self.host}:{self.port}"])
|
||||
args.extend(["--auth-method", "password"])
|
||||
args.extend(["--static-router", addr])
|
||||
self._popen = subprocess.Popen(args)
|
||||
self._wait_until_ready()
|
||||
|
||||
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
|
||||
def _wait_until_ready(self):
|
||||
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
if self._popen is not None:
|
||||
# NOTE the process will die when we're done with tests anyway, because
|
||||
# it's a child process. This is mostly to clean up in between different tests.
|
||||
self._popen.kill()
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def static_proxy(vanilla_pg) -> Iterator[ZenithProxy]:
|
||||
"""Zenith proxy that routes directly to vanilla postgres."""
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user pytest with password 'pytest';")
|
||||
|
||||
with ZenithProxy(4432) as proxy:
|
||||
proxy.start_static()
|
||||
yield proxy
|
||||
|
||||
|
||||
class Postgres(PgProtocol):
|
||||
""" An object representing a running postgres daemon. """
|
||||
def __init__(self, env: ZenithEnv, tenant_id: uuid.UUID, port: int):
|
||||
|
||||
2
vendor/postgres
vendored
Submodule vendor/postgres updated: a3709cc364...31dc24ab29
@@ -41,6 +41,12 @@ fn main() -> Result<()> {
|
||||
.takes_value(true)
|
||||
.help("Path to the safekeeper data directory"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("init")
|
||||
.long("init")
|
||||
.takes_value(false)
|
||||
.help("Initialize safekeeper with ID"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("listen-pg")
|
||||
.short('l')
|
||||
@@ -151,10 +157,10 @@ fn main() -> Result<()> {
|
||||
));
|
||||
}
|
||||
|
||||
start_safekeeper(conf, given_id)
|
||||
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
|
||||
}
|
||||
|
||||
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
|
||||
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: bool) -> Result<()> {
|
||||
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
|
||||
|
||||
info!("version: {}", GIT_VERSION);
|
||||
@@ -171,6 +177,9 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>) -> Resu
|
||||
|
||||
// Set or read our ID.
|
||||
set_id(&mut conf, given_id)?;
|
||||
if init {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
|
||||
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
use crate::receive_wal::ReceiveWalConn;
|
||||
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
|
||||
use crate::send_wal::ReplicationConn;
|
||||
use crate::timeline::{Timeline, TimelineTools};
|
||||
use crate::SafeKeeperConf;
|
||||
@@ -160,6 +161,17 @@ impl SafekeeperPostgresHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Shortcut for calling `process_msg` in the timeline.
|
||||
pub fn process_safekeeper_msg(
|
||||
&self,
|
||||
msg: &ProposerAcceptorMessage,
|
||||
) -> Result<Option<AcceptorProposerMessage>> {
|
||||
self.timeline
|
||||
.get()
|
||||
.process_msg(msg)
|
||||
.context("failed to process ProposerAcceptorMessage")
|
||||
}
|
||||
|
||||
///
|
||||
/// Handle IDENTIFY_SYSTEM replication command
|
||||
///
|
||||
|
||||
@@ -2,15 +2,21 @@
|
||||
//! Gets messages from the network, passes them down to consensus module and
|
||||
//! sends replies back.
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
|
||||
use bytes::BytesMut;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::*;
|
||||
use zenith_utils::sock_split::ReadStream;
|
||||
|
||||
use crate::timeline::Timeline;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::mpsc::channel;
|
||||
use std::sync::mpsc::Receiver;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
use crate::safekeeper::AcceptorProposerMessage;
|
||||
use crate::safekeeper::ProposerAcceptorMessage;
|
||||
@@ -46,21 +52,6 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
}
|
||||
}
|
||||
|
||||
// Read and extract the bytes of a `CopyData` message from the postgres instance
|
||||
fn read_msg_bytes(&mut self) -> Result<Bytes> {
|
||||
match self.pg_backend.read_message()? {
|
||||
Some(FeMessage::CopyData(bytes)) => Ok(bytes),
|
||||
Some(msg) => bail!("expected `CopyData` message, found {:?}", msg),
|
||||
None => bail!("connection closed unexpectedly"),
|
||||
}
|
||||
}
|
||||
|
||||
// Read and parse message sent from the postgres instance
|
||||
fn read_msg(&mut self) -> Result<ProposerAcceptorMessage> {
|
||||
let data = self.read_msg_bytes()?;
|
||||
ProposerAcceptorMessage::parse(data)
|
||||
}
|
||||
|
||||
// Send message to the postgres
|
||||
fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> {
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
@@ -77,18 +68,22 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
self.pg_backend
|
||||
.write_message(&BeMessage::CopyBothResponse)?;
|
||||
|
||||
let r = self
|
||||
.pg_backend
|
||||
.take_stream_in()
|
||||
.ok_or_else(|| anyhow!("failed to take read stream from pgbackend"))?;
|
||||
let mut poll_reader = ProposerPollStream::new(r)?;
|
||||
|
||||
// Receive information about server
|
||||
let mut msg = self
|
||||
.read_msg()
|
||||
.context("failed to receive proposer greeting")?;
|
||||
match msg {
|
||||
let next_msg = poll_reader.recv_msg()?;
|
||||
match next_msg {
|
||||
ProposerAcceptorMessage::Greeting(ref greeting) => {
|
||||
info!(
|
||||
"start handshake with wal proposer {} sysid {} timeline {}",
|
||||
self.peer_addr, greeting.system_id, greeting.tli,
|
||||
);
|
||||
}
|
||||
_ => bail!("unexpected message {:?} instead of greeting", msg),
|
||||
_ => bail!("unexpected message {:?} instead of greeting", next_msg),
|
||||
}
|
||||
|
||||
// Register the connection and defer unregister.
|
||||
@@ -100,16 +95,97 @@ impl<'pg> ReceiveWalConn<'pg> {
|
||||
callmemaybe_tx: spg.tx.clone(),
|
||||
};
|
||||
|
||||
let mut next_msg = Some(next_msg);
|
||||
|
||||
loop {
|
||||
let reply = spg
|
||||
.timeline
|
||||
.get()
|
||||
.process_msg(&msg)
|
||||
.context("failed to process ProposerAcceptorMessage")?;
|
||||
if let Some(reply) = reply {
|
||||
self.write_msg(&reply)?;
|
||||
if matches!(next_msg, Some(ProposerAcceptorMessage::AppendRequest(_))) {
|
||||
// poll AppendRequest's without blocking and write WAL to disk without flushing,
|
||||
// while it's readily available
|
||||
while let Some(ProposerAcceptorMessage::AppendRequest(append_request)) = next_msg {
|
||||
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
|
||||
|
||||
let reply = spg.process_safekeeper_msg(&msg)?;
|
||||
if let Some(reply) = reply {
|
||||
self.write_msg(&reply)?;
|
||||
}
|
||||
|
||||
next_msg = poll_reader.poll_msg();
|
||||
}
|
||||
|
||||
// flush all written WAL to the disk
|
||||
let reply = spg.process_safekeeper_msg(&ProposerAcceptorMessage::FlushWAL)?;
|
||||
if let Some(reply) = reply {
|
||||
self.write_msg(&reply)?;
|
||||
}
|
||||
} else if let Some(msg) = next_msg.take() {
|
||||
// process other message
|
||||
let reply = spg.process_safekeeper_msg(&msg)?;
|
||||
if let Some(reply) = reply {
|
||||
self.write_msg(&reply)?;
|
||||
}
|
||||
}
|
||||
msg = self.read_msg()?;
|
||||
|
||||
// blocking wait for the next message
|
||||
if next_msg.is_none() {
|
||||
next_msg = Some(poll_reader.recv_msg()?);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ProposerPollStream {
|
||||
msg_rx: Receiver<ProposerAcceptorMessage>,
|
||||
read_thread: Option<thread::JoinHandle<Result<()>>>,
|
||||
}
|
||||
|
||||
impl ProposerPollStream {
|
||||
fn new(mut r: ReadStream) -> Result<Self> {
|
||||
let (msg_tx, msg_rx) = channel();
|
||||
|
||||
let read_thread = thread::Builder::new()
|
||||
.name("Read WAL thread".into())
|
||||
.spawn(move || -> Result<()> {
|
||||
loop {
|
||||
let copy_data = match FeMessage::read(&mut r)? {
|
||||
Some(FeMessage::CopyData(bytes)) => bytes,
|
||||
Some(msg) => bail!("expected `CopyData` message, found {:?}", msg),
|
||||
None => bail!("connection closed unexpectedly"),
|
||||
};
|
||||
|
||||
let msg = ProposerAcceptorMessage::parse(copy_data)?;
|
||||
msg_tx.send(msg)?;
|
||||
}
|
||||
// msg_tx will be dropped here, this will also close msg_rx
|
||||
})?;
|
||||
|
||||
Ok(Self {
|
||||
msg_rx,
|
||||
read_thread: Some(read_thread),
|
||||
})
|
||||
}
|
||||
|
||||
fn recv_msg(&mut self) -> Result<ProposerAcceptorMessage> {
|
||||
self.msg_rx.recv().map_err(|_| {
|
||||
// return error from the read thread
|
||||
let res = match self.read_thread.take() {
|
||||
Some(thread) => thread.join(),
|
||||
None => return anyhow!("read thread is gone"),
|
||||
};
|
||||
|
||||
match res {
|
||||
Ok(Ok(())) => anyhow!("unexpected result from read thread"),
|
||||
Err(err) => anyhow!("read thread panicked: {:?}", err),
|
||||
Ok(Err(err)) => err,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn poll_msg(&mut self) -> Option<ProposerAcceptorMessage> {
|
||||
let res = self.msg_rx.try_recv();
|
||||
|
||||
match res {
|
||||
Err(_) => None,
|
||||
Ok(msg) => Some(msg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
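The loop above drains already-queued `AppendRequest`s without blocking (writing WAL without an fsync), issues one `FlushWAL` once the queue runs dry, and only then blocks for the next message. Below is a minimal, self-contained sketch of that drain-then-block pattern over `std::sync::mpsc`; the `Msg` enum and the printed actions are illustrative stand-ins, not the safekeeper's real types:

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::thread;

// Hypothetical stand-in for ProposerAcceptorMessage.
enum Msg {
    Append(u64),   // write without flushing
    Other(String), // any other protocol message
}

// Process everything that is already queued, flush once the queue is empty,
// then block until the next message arrives.
fn receive_loop(rx: Receiver<Msg>) {
    let mut next = rx.recv().ok();
    while let Some(msg) = next.take() {
        match msg {
            Msg::Append(lsn) => println!("write (no flush) up to {}", lsn),
            Msg::Other(s) => println!("process {}", s),
        }
        next = match rx.try_recv() {
            // More work is readily available: keep going without flushing.
            Ok(m) => Some(m),
            Err(TryRecvError::Empty) => {
                println!("flush WAL to disk");
                rx.recv().ok() // blocking wait for the next message
            }
            Err(TryRecvError::Disconnected) => None,
        };
    }
}

fn main() {
    let (tx, rx) = channel();
    let producer = thread::spawn(move || {
        for lsn in 1..=3 {
            tx.send(Msg::Append(lsn)).unwrap();
        }
        tx.send(Msg::Other("vote".into())).unwrap();
    });
    receive_loop(rx);
    producer.join().unwrap();
}
```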
@@ -301,6 +301,8 @@ pub enum ProposerAcceptorMessage {
VoteRequest(VoteRequest),
Elected(ProposerElected),
AppendRequest(AppendRequest),
NoFlushAppendRequest(AppendRequest),
FlushWAL,
}

impl ProposerAcceptorMessage {
@@ -499,7 +501,11 @@ where
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg, true),
ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
self.handle_append_request(msg, false)
}
ProposerAcceptorMessage::FlushWAL => self.handle_flush(),
}
}

@@ -605,7 +611,10 @@ where
return Ok(None);
}

// truncate wal, update the lsns
// TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
// intersection of our history and history from msg

// truncate wal, update the LSNs
self.wal_store.truncate_wal(msg.start_streaming_at)?;

// and now adopt term history from proposer
@@ -622,6 +631,7 @@ where
fn handle_append_request(
&mut self,
msg: &AppendRequest,
mut require_flush: bool,
) -> Result<Option<AcceptorProposerMessage>> {
if self.s.acceptor_state.term < msg.h.term {
bail!("got AppendRequest before ProposerElected");
@@ -650,9 +660,15 @@ where
if self.s.wal_start_lsn == Lsn(0) {
self.s.wal_start_lsn = msg.h.begin_lsn;
sync_control_file = true;
require_flush = true;
}
}

// flush wal to the disk, if required
if require_flush {
self.wal_store.flush_wal()?;
}

// Advance commit_lsn taking into account what we have locally.
// commit_lsn can be 0 when it is not yet known to a new walproposer that hasn't
// collected a majority of epoch acks yet; ignore it in this case.
@@ -670,11 +686,9 @@ where
}

self.truncate_lsn = msg.h.truncate_lsn;
/*
* Update truncate and commit LSN in control file.
* To avoid negative impact on performance of extra fsync, do it only
* when truncate_lsn delta exceeds WAL segment size.
*/
// Update truncate and commit LSN in control file.
// To avoid negative impact on performance of extra fsync, do it only
// when truncate_lsn delta exceeds WAL segment size.
sync_control_file |=
self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn;
if sync_control_file {
@@ -686,6 +700,11 @@ where
self.control_store.persist(&self.s)?;
}

// If flush_lsn hasn't updated, AppendResponse is not very useful.
if !require_flush {
return Ok(None);
}

let resp = self.append_response();
trace!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}",
@@ -697,6 +716,14 @@ where
);
Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
}

/// Flush WAL to disk. Return AppendResponse with latest LSNs.
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
self.wal_store.flush_wal()?;
Ok(Some(AcceptorProposerMessage::AppendResponse(
self.append_response(),
)))
}
}

#[cfg(test)]
@@ -738,6 +765,10 @@ mod tests {
self.lsn = end_pos;
Ok(())
}

fn flush_wal(&mut self) -> Result<()> {
Ok(())
}
}

#[test]

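The control-file handling above only pays for an extra fsync when `truncate_lsn` has advanced by more than one WAL segment since the last persisted value. A tiny sketch of that throttling condition with plain `u64` LSNs (the function name and the 16 MiB segment size are assumptions for the example, not part of the patch):

```rust
/// Re-persist the control file only when the in-memory truncate_lsn has moved
/// more than one WAL segment past the value that is already durable.
fn should_sync_control_file(persisted_truncate_lsn: u64, new_truncate_lsn: u64, wal_seg_size: u64) -> bool {
    persisted_truncate_lsn + wal_seg_size < new_truncate_lsn
}

fn main() {
    let seg = 16 * 1024 * 1024; // assumed 16 MiB segments
    assert!(!should_sync_control_file(0, seg, seg)); // moved exactly one segment: skip the fsync
    assert!(should_sync_control_file(0, seg + 1, seg)); // moved past one segment: persist
}
```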
@@ -7,7 +7,7 @@
//!
//! Note that last file has `.partial` suffix, that's different from postgres.

use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use std::io::{Read, Seek, SeekFrom};

use lazy_static::lazy_static;
@@ -58,12 +58,20 @@ lazy_static! {
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_write_wal_seconds histogram vec");
static ref FLUSH_WAL_SECONDS: HistogramVec = register_histogram_vec!(
"safekeeper_flush_wal_seconds",
"Seconds spent syncing WAL to a disk, grouped by timeline",
&["tenant_id", "timeline_id"],
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_flush_wal_seconds histogram vec");
}

struct WalStorageMetrics {
flush_lsn: Gauge,
write_wal_bytes: Histogram,
write_wal_seconds: Histogram,
flush_wal_seconds: Histogram,
}

impl WalStorageMetrics {
@@ -74,24 +82,38 @@ impl WalStorageMetrics {
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]),
write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
flush_wal_seconds: FLUSH_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
}
}
}

pub trait Storage {
/// lsn of last durably stored WAL record.
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;

/// Init storage with wal_seg_size and read WAL from disk to get latest lsn.
/// Init storage with wal_seg_size and read WAL from disk to get latest LSN.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>;

/// Write piece of wal in buf to disk and sync it.
/// Write piece of WAL from buf to disk, but not necessarily sync it.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;

// Truncate WAL at specified LSN.
/// Truncate WAL at specified LSN, which must be the end of WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;

/// Durably store WAL on disk, up to the last written WAL record.
fn flush_wal(&mut self) -> Result<()>;
}

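For unit tests such as the `mod tests` hunk earlier, an in-memory implementation of a trait like this is usually enough. Below is a simplified sketch (plain `u64` in place of `Lsn`, no `SafeKeeperState`, and hypothetical `WalStore`/`InMemoryWalStore` names) that tracks the written vs. durable position without touching disk:

```rust
use anyhow::Result;

/// Simplified stand-in for the storage trait above, for test purposes only.
trait WalStore {
    fn flush_lsn(&self) -> u64;
    fn write_wal(&mut self, startpos: u64, buf: &[u8]) -> Result<()>;
    fn truncate_wal(&mut self, end_pos: u64) -> Result<()>;
    fn flush_wal(&mut self) -> Result<()>;
}

#[derive(Default)]
struct InMemoryWalStore {
    write_lsn: u64, // everything written so far
    flush_lsn: u64, // everything durably "flushed"
}

impl WalStore for InMemoryWalStore {
    fn flush_lsn(&self) -> u64 {
        self.flush_lsn
    }
    fn write_wal(&mut self, startpos: u64, buf: &[u8]) -> Result<()> {
        // Mirror the real storage's rule: no gaps, no overwrites.
        anyhow::ensure!(startpos == self.write_lsn, "non-sequential write");
        self.write_lsn += buf.len() as u64;
        Ok(())
    }
    fn truncate_wal(&mut self, end_pos: u64) -> Result<()> {
        self.write_lsn = end_pos;
        self.flush_lsn = self.flush_lsn.min(end_pos);
        Ok(())
    }
    fn flush_wal(&mut self) -> Result<()> {
        self.flush_lsn = self.write_lsn; // nothing to actually sync in memory
        Ok(())
    }
}
```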
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage must be initialized before use.
///
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
/// its filename and may not be fully flushed.
///
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is just created, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
zttid: ZTenantTimelineId,
@@ -99,27 +121,29 @@ pub struct PhysicalStorage {
conf: SafeKeeperConf,

// fields below are filled upon initialization

// None if uninitialized, Some(lsn) if storage is initialized
/// None if uninitialized, Some(usize) if storage is initialized.
wal_seg_size: Option<usize>,

// Relationship of lsns:
// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
//
// All lsns are zeroes, if storage is just created, and there are no segments on disk.

// Written to disk, but possibly still in the cache and not fully persisted.
// Also can be ahead of record_lsn, if it happens to be in the middle of a WAL record.
/// Written to disk, but possibly still in the cache and not fully persisted.
/// Also can be ahead of record_lsn, if it happens to be in the middle of a WAL record.
write_lsn: Lsn,

// The LSN of the last WAL record written to disk. Still can be not fully flushed.
/// The LSN of the last WAL record written to disk. Still can be not fully flushed.
write_record_lsn: Lsn,

// The LSN of the last WAL record flushed to disk.
/// The LSN of the last WAL record flushed to disk.
flush_record_lsn: Lsn,

// Decoder is required for detecting boundaries of WAL records.
/// Decoder is required for detecting boundaries of WAL records.
decoder: WalStreamDecoder,

/// Cached open file for the last segment.
///
/// If Some(file) is open, then it always:
/// - has ".partial" suffix
/// - points to write_lsn, so no seek is needed for writing
/// - doesn't point to the end of the segment
file: Option<File>,
}

impl PhysicalStorage {
@@ -135,128 +159,146 @@ impl PhysicalStorage {
write_record_lsn: Lsn(0),
flush_record_lsn: Lsn(0),
decoder: WalStreamDecoder::new(Lsn(0)),
file: None,
}
}

// wrapper for flush_lsn updates that also updates metrics
/// Wrapper for flush_lsn updates that also updates metrics.
fn update_flush_lsn(&mut self) {
self.flush_record_lsn = self.write_record_lsn;
self.metrics.flush_lsn.set(self.flush_record_lsn.0 as f64);
}

/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(&self, segno: XLogSegNo) -> Result<(PathBuf, PathBuf)> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;

let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name.clone());
let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial");
Ok((wal_file_path, wal_file_partial_path))
/// Call fdatasync if config requires so.
fn fdatasync_file(&self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {
self.metrics
.flush_wal_seconds
.observe_closure_duration(|| file.sync_data())?;
}
Ok(())
}

// TODO: this function is going to be refactored soon, what will change:
// - flush will be called separately from write_wal, this function
//   will only write bytes to disk
// - File will be cached in PhysicalStorage, to remove extra syscalls,
//   such as open(), seek(), close()
fn write_and_flush(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
/// Call fsync if config requires so.
fn fsync_file(&self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {
self.metrics
.flush_wal_seconds
.observe_closure_duration(|| file.sync_all())?;
}
Ok(())
}

/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;

// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
Ok((file, false))
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
// Try to open existing partial file
Ok((file, true))
} else {
// Create and fill new partial file
let mut file = OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;

write_zeroes(&mut file, wal_seg_size)?;
self.fsync_file(&mut file)?;
Ok((file, true))
}
}

/// Write WAL bytes, which are known to be located in a single WAL segment.
fn write_in_segment(
&mut self,
segno: u64,
xlogoff: usize,
buf: &[u8],
wal_seg_size: usize,
) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
};

file.write_all(buf)?;

if xlogoff + buf.len() == wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file)?;

// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
fs::rename(&wal_file_partial_path, &wal_file_path)?;
} else {
// otherwise, file can be reused later
self.file = Some(file);
}

Ok(())
}

/// Writes WAL to the segment files, until everything is written. If some segments
/// are fully written, they are flushed to disk. The last (partial) segment can
/// be flushed separately later.
///
/// Updates `write_lsn`.
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;

let mut bytes_left: usize = buf.len();
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];

/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;

while bytes_left != 0 {
let bytes_to_write;

/*
* If crossing a WAL boundary, only write up until we reach wal
* segment size.
*/
if xlogoff + bytes_left > wal_seg_size {
bytes_to_write = wal_seg_size - xlogoff;
} else {
bytes_to_write = bytes_left;
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(mut file) = self.file.take() {
self.fdatasync_file(&mut file)?;
}

/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path)
{
/* Try to open existing partial file */
wal_file = file;
partial = true;
} else {
/* Create and fill new partial file */
partial = true;
match OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(ZERO_BLOCK)?;
}
wal_file = file;
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
}
}
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;

// Flush file, if not said otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
start_pos += bytes_to_write as u64;
xlogoff += bytes_to_write;

/* Did we reach the end of a WAL segment? */
if start_pos.segment_offset(wal_seg_size) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
}
}
self.write_lsn = pos;
}

while !buf.is_empty() {
// Extract WAL location for this block
let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(wal_seg_size);

// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > wal_seg_size {
wal_seg_size - xlogoff
} else {
buf.len()
};

self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?;
self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..];
}

Ok(())
}
}

impl Storage for PhysicalStorage {
// flush_lsn returns lsn of last durably stored WAL record.
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
}

// Storage needs to know wal_seg_size to know which segment to read/write, but
// wal_seg_size is not always known at the moment of storage creation. This method
// allows postponing its initialization.
/// Storage needs to know wal_seg_size to know which segment to read/write, but
/// wal_seg_size is not always known at the moment of storage creation. This method
/// allows postponing its initialization.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> {
if state.server.wal_seg_size == 0 {
// wal_seg_size is still unknown
@@ -294,29 +336,31 @@ impl Storage for PhysicalStorage {
Ok(())
}

// Write and flush WAL to disk.
/// Write WAL to disk.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.
// If we need to move the pointer, use truncate_wal() instead.
if self.write_lsn > startpos {
warn!(
bail!(
"write_wal rewrites WAL written before, write_lsn={}, startpos={}",
self.write_lsn, startpos
self.write_lsn,
startpos
);
}
if self.write_lsn < startpos {
warn!(
if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
bail!(
"write_wal creates gap in written WAL, write_lsn={}, startpos={}",
self.write_lsn, startpos
self.write_lsn,
startpos
);
// TODO: return error if write_lsn is not zero
}

{
let _timer = self.metrics.write_wal_seconds.start_timer();
self.write_and_flush(startpos, buf)?;
self.write_exact(startpos, buf)?;
}

// WAL is written and flushed, updating lsns
self.write_lsn = startpos + buf.len() as u64;
// WAL is written, updating write metrics
self.metrics.write_wal_bytes.observe(buf.len() as f64);

// figure out last record's end lsn for reporting (if we got the
@@ -339,69 +383,67 @@ impl Storage for PhysicalStorage {
}
}

Ok(())
}

fn flush_wal(&mut self) -> Result<()> {
if self.flush_record_lsn == self.write_record_lsn {
// no need to do extra flush
return Ok(());
}

if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
self.file = Some(unflushed_file);
} else {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if last file was fully written and flushed,
// but haven't updated flush_lsn yet.
assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0);
}

// everything is flushed now, let's update flush_lsn
self.update_flush_lsn();
Ok(())
}

// Truncate written WAL by removing all WAL segments after the given LSN.
// end_pos must point to the end of the WAL record.
/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;

// TODO: cross check divergence point

// nothing to truncate
if self.write_lsn == Lsn(0) {
return Ok(());
}

// Streaming must not create a hole, so truncate cannot be called on non-written lsn
assert!(self.write_lsn >= end_pos);
assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos);

// open segment files and delete or fill end with zeroes

let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];

/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;

/* Open file */
let mut segno = end_pos.segment_number(wal_seg_size);
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else {
wal_file = OpenOptions::new()
.write(true)
.open(&wal_file_partial_path)?;
partial = true;
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
while xlogoff < wal_seg_size {
let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff);
wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?;
xlogoff += bytes_to_write;
}
// Flush file, if not said otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
}
if !partial {

let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
let segno = end_pos.segment_number(wal_seg_size);
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;

// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?;
write_zeroes(&mut file, wal_seg_size - xlogoff)?;
self.fdatasync_file(&mut file)?;

if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}

// Remove all subsequent segments
let mut segno = segno;
loop {
segno += 1;
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// TODO: better to use fs::try_exists, which is currently available only in nightly builds
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
@@ -412,7 +454,7 @@ impl Storage for PhysicalStorage {
}
}

// Update lsns
// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.update_flush_lsn();
@@ -491,3 +533,28 @@ impl WalReader {
})
}
}

/// Zero block for filling created WAL segments.
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];

/// Helper for filling file with zeroes.
fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
while count >= XLOG_BLCKSZ {
file.write_all(ZERO_BLOCK)?;
count -= XLOG_BLCKSZ;
}
file.write_all(&ZERO_BLOCK[0..count])?;
Ok(())
}

/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(
timeline_dir: &Path,
segno: XLogSegNo,
wal_seg_size: usize,
) -> Result<(PathBuf, PathBuf)> {
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = timeline_dir.join(wal_file_name.clone());
let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
Ok((wal_file_path, wal_file_partial_path))
}

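All of the segment handling above reduces to the same arithmetic: an LSN selects a segment file by integer division and a position inside it by the remainder, and writes are split at segment boundaries. A small illustrative sketch with plain integers (the helper names mirror `segment_number`/`segment_offset` but are not the real `Lsn` methods):

```rust
/// Which segment file a byte position falls into.
fn segment_number(lsn: u64, wal_seg_size: u64) -> u64 {
    lsn / wal_seg_size
}

/// Offset of a byte position inside its segment file.
fn segment_offset(lsn: u64, wal_seg_size: u64) -> u64 {
    lsn % wal_seg_size
}

fn main() {
    let wal_seg_size = 16 * 1024 * 1024; // assumed 16 MiB segments
    let lsn = 3 * wal_seg_size + 8192;
    assert_eq!(segment_number(lsn, wal_seg_size), 3);
    assert_eq!(segment_offset(lsn, wal_seg_size), 8192);

    // A write that would cross the segment boundary is capped at the boundary,
    // as in write_exact(): write at most up to the end of the current segment.
    let to_write = (wal_seg_size - segment_offset(lsn, wal_seg_size)).min(64 * 1024);
    assert_eq!(to_write, 64 * 1024);
}
```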
@@ -392,7 +392,7 @@ fn get_tenantid(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<ZTe
if let Some(tenantid_cmd) = sub_match.value_of("tenantid") {
Ok(ZTenantId::from_str(tenantid_cmd)?)
} else if let Some(tenantid_conf) = env.default_tenantid {
Ok(tenantid_conf)
Ok(ZTenantId::from(tenantid_conf))
} else {
bail!("No tenantid. Use --tenantid, or set 'default_tenantid' in the config file");
}
@@ -418,7 +418,7 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.init(
// default_tenantid was generated by the `env.init()` call above
Some(&env.default_tenantid.unwrap().to_string()),
Some(&ZTenantId::from(env.default_tenantid.unwrap()).to_string()),
&pageserver_config_overrides(init_match),
) {
eprintln!("pageserver init failed: {}", e);

@@ -5,9 +5,7 @@
// The second one is that we wanted to use ed25519 keys, but they are also not supported until next version. So we go with RSA keys for now.
// Relevant issue: https://github.com/Keats/jsonwebtoken/issues/162

use hex::{self, FromHex};
use serde::de::Error;
use serde::{self, Deserializer, Serializer};
use serde;
use std::fs;
use std::path::Path;

@@ -17,7 +15,7 @@ use jsonwebtoken::{
};
use serde::{Deserialize, Serialize};

use crate::zid::ZTenantId;
use crate::zid::{HexZTenantId, ZTenantId};

const JWT_ALGORITHM: Algorithm = Algorithm::RS256;

@@ -28,44 +26,18 @@ pub enum Scope {
PageServerApi,
}

pub fn to_hex_option<S>(value: &Option<ZTenantId>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match value {
Some(tid) => hex::serialize(tid, serializer),
None => Option::serialize(value, serializer),
}
}

fn from_hex_option<'de, D>(deserializer: D) -> Result<Option<ZTenantId>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<String> = Option::deserialize(deserializer)?;
match opt {
Some(tid) => Ok(Some(ZTenantId::from_hex(tid).map_err(Error::custom)?)),
None => Ok(None),
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Claims {
// this custom serialize/deserialize_with is needed because Option is not transparent to serde
// so clearest option is serde(with = "hex") but it is not working, for details see https://github.com/serde-rs/serde/issues/1301
#[serde(
default,
skip_serializing_if = "Option::is_none",
serialize_with = "to_hex_option",
deserialize_with = "from_hex_option"
)]
pub tenant_id: Option<ZTenantId>,
pub tenant_id: Option<HexZTenantId>,
pub scope: Scope,
}

impl Claims {
pub fn new(tenant_id: Option<ZTenantId>, scope: Scope) -> Self {
Self { tenant_id, scope }
Self {
tenant_id: tenant_id.map(HexZTenantId::from),
scope,
}
}
}

@@ -75,7 +47,7 @@ pub fn check_permission(claims: &Claims, tenantid: Option<ZTenantId>) -> Result<
bail!("Attempt to access management api with tenant scope. Permission denied")
}
(Scope::Tenant, Some(tenantid)) => {
if claims.tenant_id.unwrap() != tenantid {
if ZTenantId::from(claims.tenant_id.unwrap()) != tenantid {
bail!("Tenant id mismatch. Permission denied")
}
Ok(())

@@ -2,13 +2,100 @@ use std::{fmt, str::FromStr};

use hex::FromHex;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde::{
de::{self, Visitor},
Deserialize, Serialize,
};

// Zenith ID is a 128-bit random ID.
// Used to represent various identifiers. Provides handy utility methods and impls.
macro_rules! mutual_from {
($id1:ident, $id2:ident) => {
impl From<$id1> for $id2 {
fn from(id1: $id1) -> Self {
Self(id1.0.into())
}
}

impl From<$id2> for $id1 {
fn from(id2: $id2) -> Self {
Self(id2.0.into())
}
}
};
}

/// Zenith ID is a 128-bit random ID.
/// Used to represent various identifiers. Provides handy utility methods and impls.
///
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// Use [`HexZId`] to serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
struct ZId([u8; 16]);

/// [`ZId`] version that serializes and deserializes as a hex string.
/// Useful for various json serializations, where the hex byte array from the original id is not convenient.
///
/// Plain `ZId` could be (de)serialized into hex string with `#[serde(with = "hex")]` attribute.
/// This however won't work on nested types like `Option<ZId>` or `Vec<ZId>`, see https://github.com/serde-rs/serde/issues/723 for the details.
/// Every such type currently needs its own (de)serialization methods.
///
/// To provide a generic way to serialize the ZId as a hex string where `#[serde(with = "hex")]` is not enough, this wrapper is created.
/// The default wrapper serialization is left unchanged due to
/// * byte array (de)serialization being faster and simpler
/// * byte deserialization being used in Safekeeper already, with those bytes coming from compute (see `ProposerGreeting` in safekeeper)
/// * current `HexZId`'s deserialization impl breaks on compute byte array deserialization, having it by default is dangerous
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
struct HexZId([u8; 16]);

impl Serialize for HexZId {
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
hex::encode(self.0).serialize(ser)
}
}

impl<'de> Deserialize<'de> for HexZId {
fn deserialize<D>(de: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
de.deserialize_bytes(HexVisitor)
}
}

struct HexVisitor;

impl<'de> Visitor<'de> for HexVisitor {
type Value = HexZId;

fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"A hexadecimal representation of a 128-bit random Zenith ID"
)
}

fn visit_bytes<E>(self, hex_bytes: &[u8]) -> Result<Self::Value, E>
where
E: de::Error,
{
ZId::from_hex(hex_bytes)
.map(HexZId::from)
.map_err(de::Error::custom)
}

fn visit_str<E>(self, hex_bytes_str: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Self::visit_bytes(self, hex_bytes_str.as_bytes())
}
}

mutual_from!(ZId, HexZId);

impl ZId {
pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZId {
let mut arr = [0u8; 16];
@@ -155,46 +242,80 @@ macro_rules! zid_newtype {
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
///
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// Use [`HexZTimelineId`] to serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ZTimelineId(ZId);

zid_newtype!(ZTimelineId);
/// A [`ZTimelineId`] version that gets (de)serialized as a hex string.
/// Use in complex types, where `#[serde(with = "hex")]` does not work.
/// See [`HexZId`] for more details.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct HexZTimelineId(HexZId);

// Zenith Tenant Id represents identifier of a particular tenant.
// Is used for distinguishing requests and data belonging to different users.
impl std::fmt::Debug for HexZTimelineId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
ZTimelineId::from(*self).fmt(f)
}
}

impl std::fmt::Display for HexZTimelineId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
ZTimelineId::from(*self).fmt(f)
}
}

impl FromStr for HexZTimelineId {
type Err = <ZTimelineId as FromStr>::Err;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(HexZTimelineId::from(ZTimelineId::from_str(s)?))
}
}

zid_newtype!(ZTimelineId);
mutual_from!(ZTimelineId, HexZTimelineId);

/// Zenith Tenant Id represents identifier of a particular tenant.
/// Is used for distinguishing requests and data belonging to different users.
///
/// NOTE: It (de)serializes as an array of hex bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// Use [`HexZTenantId`] to serialize it as hex string instead: `ad50847381e248feaac9876cc71ae418`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ZTenantId(ZId);

zid_newtype!(ZTenantId);
/// A [`ZTenantId`] version that gets (de)serialized as a hex string.
/// Use in complex types, where `#[serde(with = "hex")]` does not work.
/// See [`HexZId`] for more details.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct HexZTenantId(HexZId);

/// Serde routines for Option<T> (de)serialization, using `T: Display` representations for inner values.
/// Useful for Option<ZTenantId> and Option<ZTimelineId> to get their hex representations into serialized string and deserialize them back.
pub mod opt_display_serde {
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::{fmt::Display, str::FromStr};

pub fn serialize<S, Id>(id: &Option<Id>, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Id: Display,
{
id.as_ref().map(ToString::to_string).serialize(ser)
}

pub fn deserialize<'de, D, Id>(des: D) -> Result<Option<Id>, D::Error>
where
D: Deserializer<'de>,
Id: FromStr,
<Id as FromStr>::Err: Display,
{
Ok(if let Some(s) = Option::<String>::deserialize(des)? {
Some(Id::from_str(&s).map_err(de::Error::custom)?)
} else {
None
})
impl std::fmt::Debug for HexZTenantId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
ZTenantId::from(*self).fmt(f)
}
}

impl std::fmt::Display for HexZTenantId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
ZTenantId::from(*self).fmt(f)
}
}

impl FromStr for HexZTenantId {
type Err = <ZTenantId as FromStr>::Err;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(HexZTenantId::from(ZTenantId::from_str(s)?))
}
}

zid_newtype!(ZTenantId);
mutual_from!(ZTenantId, HexZTenantId);

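The practical difference between the two serde forms discussed above can be seen with a miniature example: a plain derived `[u8; 16]` newtype serializes as an array of numbers, while a hex wrapper serializes as a single hex string. This sketch assumes the `serde` derive feature, `serde_json`, and the `hex` crate with its `serde` feature enabled; the type names are illustrative stand-ins, not the real `ZId`/`HexZId`:

```rust
use serde::{Deserialize, Serialize};

// Default derive: serializes as an array of byte values.
#[derive(Serialize, Deserialize)]
struct RawId([u8; 16]);

// Hex form: serializes as a hex string via the hex crate's serde support.
#[derive(Serialize, Deserialize)]
struct HexId(#[serde(with = "hex")] [u8; 16]);

fn main() {
    let bytes = *b"0123456789abcdef";
    // Prints something like [48,49,50,...]
    println!("{}", serde_json::to_string(&RawId(bytes)).unwrap());
    // Prints "30313233343536373839616263646566"
    println!("{}", serde_json::to_string(&HexId(bytes)).unwrap());
}
```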
// A pair uniquely identifying a Zenith instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ZTenantTimelineId {
@@ -243,16 +364,15 @@ mod tests {

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct TestStruct<E: Display, T: FromStr<Err = E> + Display> {
#[serde(with = "opt_display_serde")]
field: Option<T>,
}

#[test]
fn test_hex_serializations_tenant_id() {
let original_struct = TestStruct {
field: Some(ZTenantId::from_array(hex!(
field: Some(HexZTenantId::from(ZTenantId::from_array(hex!(
"11223344556677881122334455667788"
))),
)))),
};

let serialized_string = serde_json::to_string(&original_struct).unwrap();
@@ -261,7 +381,7 @@ mod tests {
r#"{"field":"11223344556677881122334455667788"}"#
);

let deserialized_struct: TestStruct<FromHexError, ZTenantId> =
let deserialized_struct: TestStruct<FromHexError, HexZTenantId> =
serde_json::from_str(&serialized_string).unwrap();
assert_eq!(original_struct, deserialized_struct);
}
@@ -269,9 +389,9 @@ mod tests {
#[test]
fn test_hex_serializations_timeline_id() {
let original_struct = TestStruct {
field: Some(ZTimelineId::from_array(hex!(
field: Some(HexZTimelineId::from(ZTimelineId::from_array(hex!(
"AA223344556677881122334455667788"
))),
)))),
};

let serialized_string = serde_json::to_string(&original_struct).unwrap();
@@ -280,7 +400,7 @@ mod tests {
r#"{"field":"aa223344556677881122334455667788"}"#
);

let deserialized_struct: TestStruct<FromHexError, ZTimelineId> =
let deserialized_struct: TestStruct<FromHexError, HexZTimelineId> =
serde_json::from_str(&serialized_string).unwrap();
assert_eq!(original_struct, deserialized_struct);
}