Compare commits

..

20 Commits

Author SHA1 Message Date
Bojan Serafimov
1c40c26313 Parse search_path option 2022-03-07 18:50:52 -05:00
Bojan Serafimov
a6ace609a7 Fix typo 2022-03-07 17:56:12 -05:00
Bojan Serafimov
29d72e8955 Add proxy test 2022-03-07 14:32:24 -05:00
Kirill Bulatov
66eb2a1dd3 Replace zenith/build build image with zimg/* ones 2022-03-04 13:46:44 +02:00
Kirill Bulatov
9424bfae22 Use a separate newtype for ZId that (de)serialize as hex strings 2022-03-04 10:58:40 +02:00
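For illustration only, a minimal sketch of the hex-serializing newtype idea from the commit above, assuming a 128-bit id; the type and trait impls below are invented for the example and are not the actual zenith_utils::zid definitions:

// Hypothetical sketch: a newtype over a 128-bit id that (de)serializes as a
// lowercase hex string instead of a number or byte array.
use serde::{Deserialize, Deserializer, Serialize, Serializer};

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct HexId(pub u128);

impl Serialize for HexId {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // Fixed-width, zero-padded hex, e.g. "0000...00ff".
        serializer.serialize_str(&format!("{:032x}", self.0))
    }
}

impl<'de> Deserialize<'de> for HexId {
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        let s = String::deserialize(deserializer)?;
        u128::from_str_radix(&s, 16)
            .map(HexId)
            .map_err(serde::de::Error::custom)
    }
}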
Dmitry Rodionov
1d90b1b205 add node id to pageserver (#1310)
* Add --id argument to safekeeper setting its unique u64 id.

In preparation for storage node messaging. IDs are supposed to be assigned
monotonically by the console. In tests they are issued by ZenithEnv; at the
zenith CLI level and in the fixtures, the string name is completely replaced
by an integer id. Example TOML configs are adjusted accordingly.

Sequential ids are chosen over Zid mainly because they are compact and easy to
type/remember.

* add node id to pageserver

This adds a node id parameter to the pageserver configuration. I also use a
simple builder to construct the pageserver config struct, to avoid setting
the node id to some temporary invalid value. Some of the changes in the test
fixtures are needed to split the init and start operations for the environment.

Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
2022-03-04 01:10:42 +03:00
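As a rough illustration of the builder mentioned in the commit above, here is a sketch of a config builder that refuses to produce a config without a node id instead of defaulting to a dummy value; the NodeConf/NodeConfBuilder names and fields are invented for the example and differ from the real PageServerConf:

// Hypothetical sketch: build() fails if node_id was never set, so the config
// never carries a temporary invalid id.
#[derive(Debug)]
struct NodeConf {
    node_id: u64,
    listen_addr: String,
}

#[derive(Default)]
struct NodeConfBuilder {
    node_id: Option<u64>,
    listen_addr: Option<String>,
}

impl NodeConfBuilder {
    fn node_id(mut self, id: u64) -> Self {
        self.node_id = Some(id);
        self
    }
    fn listen_addr(mut self, addr: &str) -> Self {
        self.listen_addr = Some(addr.to_string());
        self
    }
    fn build(self) -> Result<NodeConf, String> {
        Ok(NodeConf {
            // No placeholder: a missing id is a hard error at build time.
            node_id: self.node_id.ok_or("missing node id")?,
            listen_addr: self
                .listen_addr
                .unwrap_or_else(|| "127.0.0.1:6400".to_string()),
        })
    }
}

fn main() {
    let conf = NodeConfBuilder::default().node_id(7).build().unwrap();
    println!("node {} listens on {}", conf.node_id, conf.listen_addr);
}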
Kirill Bulatov
949f8b4633 Fix 1.59 rustc clippy warnings 2022-03-02 21:35:34 +02:00
Andrey Taranik
26a68612d9 safekeeper to console call fix (#1333) 2022-02-27 01:36:40 +03:00
Andrey Taranik
850dfd02df Release deployment (#1331)
* new deployment flow for staging and production

* ansible playbooks and circleci config fixes

* cleanup before merge

* additional cleanup before merge

* debug deployment to staging env

* debug deployment to staging env

* debug deployment to staging env

* debug deployment to staging env

* debug deployment to staging env

* debug deployment to staging env

* binaries artifacts path fix for ansible playbooks

* deployment flow refactored

* base64 decode fix for ssh key

* fix for console notification and production deploy settings

* cleanup after deployment tests

* fix - trigger release binaries download for production deploy
2022-02-26 23:33:16 +03:00
Arthur Petukhovsky
c8a1192b53 Optimize WAL storage in safekeeper (#1318)
When several AppendRequests can be read from the socket without blocking,
they are processed together and fsync() on the segment file is called only
once. The segment file is no longer opened for every write request; the last
opened file is now cached inside PhysicalStorage. A new metric for WAL
flushes, FLUSH_WAL_SECONDS, was added to the storage. More errors were added
to the storage for non-sequential WAL writes; write_lsn can now be moved only
by calls to truncate_lsn(new_lsn).

New messages have been added to the ProposerAcceptorMessage enum. They can't
be deserialized directly and are currently used only for optimizing flushes.
The existing protocol wasn't changed, and flush will be called for every
AppendRequest, as before.
2022-02-25 18:52:21 +03:00
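To make the batching idea in the commit message above concrete, here is a simplified sketch; the AppendRequest struct and process_batch function are illustrative and not the real safekeeper/PhysicalStorage API:

// Hypothetical sketch: write every request that is already buffered, then
// fsync the segment file once for the whole batch.
use std::fs::File;
use std::io::Write;

struct AppendRequest {
    data: Vec<u8>,
}

fn process_batch(
    segment: &mut File,
    mut next_nonblocking: impl FnMut() -> Option<AppendRequest>,
) -> std::io::Result<usize> {
    let mut appended = 0;
    // Drain everything that can be read without blocking.
    while let Some(req) = next_nonblocking() {
        segment.write_all(&req.data)?;
        appended += 1;
    }
    // One fsync for the whole batch instead of one per request.
    if appended > 0 {
        segment.sync_all()?;
    }
    Ok(appended)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("wal_batch_demo.seg");
    let mut segment = File::create(&path)?;
    let mut pending = vec![
        AppendRequest { data: b"rec1".to_vec() },
        AppendRequest { data: b"rec2".to_vec() },
    ];
    let n = process_batch(&mut segment, || pending.pop())?;
    println!("appended {} records with a single fsync", n);
    Ok(())
}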
bojanserafimov
137d616e76 [proxy] Add pytest fixture (#1311) 2022-02-24 11:20:07 -05:00
Kirill Bulatov
917c640818 Fix mypy for the new Python 2022-02-24 14:24:36 +03:00
anastasia
c1b3836df1 Bump vendor/postgres 2022-02-24 12:52:12 +03:00
Heikki Linnakangas
5120ba4b5f Refactor the interface for using cached page image.
Instead of passing it as a separate argument to get_page_reconstruct_data,
the caller now fills it into the PageReconstructData struct.
2022-02-24 10:37:12 +02:00
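A self-contained sketch of the resulting calling convention, based on the diff further down (page_img becomes an Option of (LSN, image) that the caller may pre-seed); the types here are simplified stand-ins, not the pageserver's real ones:

// Sketch: the caller seeds PageReconstructData with any cached base image
// before asking a layer for reconstruct data; no separate cached-LSN argument.
type Lsn = u64;

struct PageReconstructData {
    // WAL records still needed for redo, paired with their LSNs.
    records: Vec<(Lsn, Vec<u8>)>,
    // Optional base page image and the LSN it is valid at.
    page_img: Option<(Lsn, Vec<u8>)>,
}

fn get_page_reconstruct_data(data: &mut PageReconstructData, upto_lsn: Lsn) {
    // A real layer stops walking back once it reaches the cached image's LSN;
    // here we only show the shape of the interface.
    if data.page_img.is_none() {
        data.page_img = Some((upto_lsn, vec![0u8; 8192]));
    }
}

fn main() {
    let cached = Some((0x10_u64, vec![0u8; 8192]));
    let mut data = PageReconstructData {
        records: Vec::new(),
        page_img: cached,
    };
    get_page_reconstruct_data(&mut data, 0x20);
    assert!(data.page_img.is_some() && data.records.is_empty());
}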
Heikki Linnakangas
e4670a5f1e Remove the PageVersions abstraction.
Since commit fdd987c3ad, it was only used in InMemoryLayers. Let's
just "inline" the code into InMemoryLayer itself.

I originally did this as part of a bigger PR (#1267). With that PR,
one in-memory layer, and one ephemeral file, would hold page versions
belonging to multiple segments. Currently, PageVersions can only hold
versions for a single segment, so that would need to be changed.
Rather than modify PageVersions to support that, just remove it
altogether.
2022-02-23 21:04:39 +02:00
Heikki Linnakangas
7fae894648 Move a few unit tests specific to layered file format.
These tests have intimate knowledge of the directory layeout and layer
file names used by the LayeredRepository implementation of the
Repository trait. Move them, so that all the tests that remain in
repository.rs are expected to work without changes with any
implementation of Repository. Not that we have any plans to create
another Repository implementaiton any time soon, but as long as we
have the Repository interface, let's try to maintain that abstraction
in the tests too.
2022-02-23 20:32:06 +02:00
Stas Kelvich
058123f7ef Bump postgres to fix zenith_test_utils linkage on macOS. 2022-02-23 20:33:47 +03:00
anastasia
87edbd38c7 Add 'wait_lsn_timeout' and 'wal_redo_timeout' pageserver config options instead of hardcoded defaults 2022-02-23 19:59:35 +03:00
anastasia
58ee5d005f Add --pageserver-config-override to ZenithEnvBuilder to tune checkpointer and GC in tests.
Usage example:
zenith_env_builder.pageserver_config_override = "checkpoint_period = '100 s'; checkpoint_distance = 1073741824"
2022-02-23 19:59:35 +03:00
Heikki Linnakangas
468366a28f Fix wrong 'lsn' stored in test page image
The test creates a page version with a string like "foo 123 at 0/10"
as the content. But the LSN stored in that string was wrong: the page
version stored at LSN 0/20 would say "foo <blk> at 0/10".
2022-02-23 11:33:17 +02:00
51 changed files with 1690 additions and 1100 deletions

View File

@@ -0,0 +1,10 @@
[defaults]
localhost_warning = False
host_key_checking = False
timeout = 30
[ssh_connection]
ssh_args = -F ./ansible.ssh.cfg
scp_if_ssh = True
pipelining = True

View File

@@ -0,0 +1,11 @@
Host tele.zenith.tech
User admin
Port 3023
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
Host * !tele.zenith.tech
User admin
StrictHostKeyChecking no
UserKnownHostsFile /dev/null
ProxyJump tele.zenith.tech

View File

@@ -0,0 +1,174 @@
- name: Upload Zenith binaries
hosts: pageservers:safekeepers
gather_facts: False
remote_user: admin
vars:
force_deploy: false
tasks:
- name: get latest version of Zenith binaries
ignore_errors: true
register: current_version_file
set_fact:
current_version: "{{ lookup('file', '.zenith_current_version') | trim }}"
tags:
- pageserver
- safekeeper
- name: set zero value for current_version
when: current_version_file is failed
set_fact:
current_version: "0"
tags:
- pageserver
- safekeeper
- name: get deployed version from content of remote file
ignore_errors: true
ansible.builtin.slurp:
src: /usr/local/.zenith_current_version
register: remote_version_file
tags:
- pageserver
- safekeeper
- name: decode remote file content
when: remote_version_file is succeeded
set_fact:
remote_version: "{{ remote_version_file['content'] | b64decode | trim }}"
tags:
- pageserver
- safekeeper
- name: set zero value for remote_version
when: remote_version_file is failed
set_fact:
remote_version: "0"
tags:
- pageserver
- safekeeper
- name: inform about versions
debug: msg="Version to deploy - {{ current_version }}, version on storage node - {{ remote_version }}"
tags:
- pageserver
- safekeeper
- name: upload and extract Zenith binaries to /usr/local
when: current_version > remote_version or force_deploy
ansible.builtin.unarchive:
owner: root
group: root
src: zenith_install.tar.gz
dest: /usr/local
become: true
tags:
- pageserver
- safekeeper
- binaries
- putbinaries
- name: Deploy pageserver
hosts: pageservers
gather_facts: False
remote_user: admin
vars:
force_deploy: false
tasks:
- name: init pageserver
when: current_version > remote_version or force_deploy
shell:
cmd: sudo -u pageserver /usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" --init -D /storage/pageserver/data
args:
creates: "/storage/pageserver/data/tenants"
environment:
ZENITH_REPO_DIR: "/storage/pageserver/data"
LD_LIBRARY_PATH: "/usr/local/lib"
become: true
tags:
- pageserver
- name: upload systemd service definition
when: current_version > remote_version or force_deploy
ansible.builtin.template:
src: systemd/pageserver.service
dest: /etc/systemd/system/pageserver.service
owner: root
group: root
mode: '0644'
become: true
tags:
- pageserver
- name: start systemd service
when: current_version > remote_version or force_deploy
ansible.builtin.systemd:
daemon_reload: yes
name: pageserver
enabled: yes
state: restarted
become: true
tags:
- pageserver
- name: post version to console
when: (current_version > remote_version or force_deploy) and console_mgmt_base_url is defined
shell:
cmd: |
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
curl -sfS -d '{"version": {{ current_version }} }' -X POST {{ console_mgmt_base_url }}/api/v1/pageservers/$INSTANCE_ID
tags:
- pageserver
- name: Deploy safekeeper
hosts: safekeepers
gather_facts: False
remote_user: admin
vars:
force_deploy: false
tasks:
# in the future safekeepers should discover pageservers by themselves
# but for now they use the first pageserver that was discovered
- name: set first pageserver var for safekeepers
when: current_version > remote_version or force_deploy
set_fact:
first_pageserver: "{{ hostvars[groups['pageservers'][0]]['inventory_hostname'] }}"
tags:
- safekeeper
- name: upload systemd service definition
when: current_version > remote_version or force_deploy
ansible.builtin.template:
src: systemd/safekeeper.service
dest: /etc/systemd/system/safekeeper.service
owner: root
group: root
mode: '0644'
become: true
tags:
- safekeeper
- name: start systemd service
when: current_version > remote_version or force_deploy
ansible.builtin.systemd:
daemon_reload: yes
name: safekeeper
enabled: yes
state: restarted
become: true
tags:
- safekeeper
- name: post version to console
when: (current_version > remote_version or force_deploy) and console_mgmt_base_url is defined
shell:
cmd: |
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
curl -sfS -d '{"version": {{ current_version }} }' -X POST {{ console_mgmt_base_url }}/api/v1/safekeepers/$INSTANCE_ID
tags:
- safekeeper

View File

@@ -0,0 +1,52 @@
#!/bin/bash
set -e
RELEASE=${RELEASE:-false}
# look at docker hub for the latest tag for the zenith docker image
if [ "${RELEASE}" = "true" ]; then
echo "search latest relase tag"
VERSION=$(curl -s https://registry.hub.docker.com/v1/repositories/zenithdb/zenith/tags |jq -r -S '.[].name' | grep release | sed 's/release-//g' | tail -1)
if [ -z "${VERSION}" ]; then
echo "no any docker tags found, exiting..."
exit 1
else
TAG="release-${VERSION}"
fi
else
echo "search latest dev tag"
VERSION=$(curl -s https://registry.hub.docker.com/v1/repositories/zenithdb/zenith/tags |jq -r -S '.[].name' | grep -v release | tail -1)
if [ -z "${VERSION}" ]; then
echo "no any docker tags found, exiting..."
exit 1
else
TAG="${VERSION}"
fi
fi
echo "found ${VERSION}"
# do initial cleanup
rm -rf zenith_install postgres_install.tar.gz zenith_install.tar.gz .zenith_current_version
mkdir zenith_install
# retrieve binaries from docker image
echo "getting binaries from docker image"
docker pull --quiet zenithdb/zenith:${TAG}
ID=$(docker create zenithdb/zenith:${TAG})
docker cp ${ID}:/data/postgres_install.tar.gz .
tar -xzf postgres_install.tar.gz -C zenith_install
docker cp ${ID}:/usr/local/bin/pageserver zenith_install/bin/
docker cp ${ID}:/usr/local/bin/safekeeper zenith_install/bin/
docker cp ${ID}:/usr/local/bin/proxy zenith_install/bin/
docker cp ${ID}:/usr/local/bin/postgres zenith_install/bin/
docker rm -vf ${ID}
# store version to file (for ansible playbooks) and create binaries tarball
echo ${VERSION} > zenith_install/.zenith_current_version
echo ${VERSION} > .zenith_current_version
tar -czf zenith_install.tar.gz -C zenith_install .
# do final cleanup
rm -rf zenith_install postgres_install.tar.gz

View File

@@ -0,0 +1,7 @@
[pageservers]
zenith-1-ps-1
[safekeepers]
zenith-1-sk-1
zenith-1-sk-2
zenith-1-sk-3

View File

@@ -0,0 +1,7 @@
[pageservers]
zenith-us-stage-ps-1
[safekeepers]
zenith-us-stage-sk-1
zenith-us-stage-sk-2
zenith-us-stage-sk-3

View File

@@ -0,0 +1,18 @@
[Unit]
Description=Zenith pageserver
After=network.target auditd.service
[Service]
Type=simple
User=pageserver
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/pageserver LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/pageserver -c "pg_distrib_dir='/usr/local'" -c "listen_pg_addr='0.0.0.0:6400'" -c "listen_http_addr='0.0.0.0:9898'" -D /storage/pageserver/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
Restart=on-failure
TimeoutSec=10
LimitNOFILE=30000000
[Install]
WantedBy=multi-user.target

View File

@@ -0,0 +1,18 @@
[Unit]
Description=Zenith safekeeper
After=network.target auditd.service
[Service]
Type=simple
User=safekeeper
Environment=RUST_BACKTRACE=1 ZENITH_REPO_DIR=/storage/safekeeper/data LD_LIBRARY_PATH=/usr/local/lib
ExecStart=/usr/local/bin/safekeeper -l {{ inventory_hostname }}.local:6500 --listen-http {{ inventory_hostname }}.local:7676 -p {{ first_pageserver }}:6400 -D /storage/safekeeper/data
ExecReload=/bin/kill -HUP $MAINPID
KillMode=mixed
KillSignal=SIGINT
Restart=on-failure
TimeoutSec=10
LimitNOFILE=30000000
[Install]
WantedBy=multi-user.target

View File

@@ -471,46 +471,78 @@ jobs:
docker build -t zenithdb/compute-node:latest vendor/postgres && docker push zenithdb/compute-node:latest
docker tag zenithdb/compute-node:latest zenithdb/compute-node:${DOCKER_TAG} && docker push zenithdb/compute-node:${DOCKER_TAG}
# Build production zenithdb/zenith:release image and push it to Docker hub
docker-image-release:
docker:
- image: cimg/base:2021.04
steps:
- checkout
- setup_remote_docker:
docker_layer_caching: true
- run:
name: Init postgres submodule
command: git submodule update --init --depth 1
- run:
name: Build and push Docker image
command: |
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
DOCKER_TAG="release-$(git log --oneline|wc -l)"
docker build --build-arg GIT_VERSION=$CIRCLE_SHA1 -t zenithdb/zenith:release . && docker push zenithdb/zenith:release
docker tag zenithdb/zenith:release zenithdb/zenith:${DOCKER_TAG} && docker push zenithdb/zenith:${DOCKER_TAG}
# Build production zenithdb/compute-node:release image and push it to Docker hub
docker-image-compute-release:
docker:
- image: cimg/base:2021.04
steps:
- checkout
- setup_remote_docker:
docker_layer_caching: true
# Build zenithdb/compute-tools:release image and push it to Docker hub
# TODO: this should probably also use versioned tag, not just :latest.
# XXX: but should it? We build and use it only locally now.
- run:
name: Build and push compute-tools Docker image
command: |
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
docker build -t zenithdb/compute-tools:release -f Dockerfile.compute-tools .
docker push zenithdb/compute-tools:release
- run:
name: Init postgres submodule
command: git submodule update --init --depth 1
- run:
name: Build and push compute-node Docker image
command: |
echo $DOCKER_PWD | docker login -u $DOCKER_LOGIN --password-stdin
DOCKER_TAG="release-$(git log --oneline|wc -l)"
docker build -t zenithdb/compute-node:release vendor/postgres && docker push zenithdb/compute-node:release
docker tag zenithdb/compute-node:release zenithdb/compute-node:${DOCKER_TAG} && docker push zenithdb/compute-node:${DOCKER_TAG}
deploy-staging:
docker:
- image: cimg/python:3.10
steps:
- checkout
- setup_remote_docker
- run:
name: Get Zenith binaries
command: |
rm -rf zenith_install postgres_install.tar.gz zenith_install.tar.gz
mkdir zenith_install
DOCKER_TAG=$(git log --oneline|wc -l)
docker pull --quiet zenithdb/zenith:${DOCKER_TAG}
ID=$(docker create zenithdb/zenith:${DOCKER_TAG})
docker cp $ID:/data/postgres_install.tar.gz .
tar -xzf postgres_install.tar.gz -C zenith_install && rm postgres_install.tar.gz
docker cp $ID:/usr/local/bin/pageserver zenith_install/bin/
docker cp $ID:/usr/local/bin/safekeeper zenith_install/bin/
docker cp $ID:/usr/local/bin/proxy zenith_install/bin/
docker cp $ID:/usr/local/bin/postgres zenith_install/bin/
docker rm -v $ID
echo ${DOCKER_TAG} | tee zenith_install/.zenith_current_version
tar -czf zenith_install.tar.gz -C zenith_install .
ls -la zenith_install.tar.gz
- run:
name: Setup ansible
command: |
pip install --progress-bar off --user ansible boto3
ansible-galaxy collection install amazon.aws
- run:
name: Apply re-deploy playbook
environment:
ANSIBLE_HOST_KEY_CHECKING: false
name: Redeploy
command: |
echo "${STAGING_SSH_KEY}" | base64 --decode | ssh-add -
export AWS_REGION=${STAGING_AWS_REGION}
export AWS_ACCESS_KEY_ID=${STAGING_AWS_ACCESS_KEY_ID}
export AWS_SECRET_ACCESS_KEY=${STAGING_AWS_SECRET_ACCESS_KEY}
ansible-playbook .circleci/storage-redeploy.playbook.yml
rm -f zenith_install.tar.gz
cd "$(pwd)/.circleci/ansible"
./get_binaries.sh
echo "${TELEPORT_SSH_KEY}" | tr -d '\n'| base64 --decode >ssh-key
echo "${TELEPORT_SSH_CERT}" | tr -d '\n'| base64 --decode >ssh-key-cert.pub
chmod 0600 ssh-key
ssh-add ssh-key
rm -f ssh-key ssh-key-cert.pub
ansible-playbook deploy.yaml -i staging.hosts
rm -f zenith_install.tar.gz .zenith_current_version
deploy-staging-proxy:
docker:
@@ -533,7 +565,57 @@ jobs:
name: Re-deploy proxy
command: |
DOCKER_TAG=$(git log --oneline|wc -l)
helm upgrade zenith-proxy zenithdb/zenith-proxy --install -f .circleci/proxy.staging.yaml --set image.tag=${DOCKER_TAG} --wait
helm upgrade zenith-proxy zenithdb/zenith-proxy --install -f .circleci/helm-values/staging.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
deploy-release:
docker:
- image: cimg/python:3.10
steps:
- checkout
- setup_remote_docker
- run:
name: Setup ansible
command: |
pip install --progress-bar off --user ansible boto3
- run:
name: Redeploy
command: |
cd "$(pwd)/.circleci/ansible"
RELEASE=true ./get_binaries.sh
echo "${TELEPORT_SSH_KEY}" | tr -d '\n'| base64 --decode >ssh-key
echo "${TELEPORT_SSH_CERT}" | tr -d '\n'| base64 --decode >ssh-key-cert.pub
chmod 0600 ssh-key
ssh-add ssh-key
rm -f ssh-key ssh-key-cert.pub
ansible-playbook deploy.yaml -i production.hosts -e console_mgmt_base_url=http://console-release.local
rm -f zenith_install.tar.gz .zenith_current_version
deploy-release-proxy:
docker:
- image: cimg/base:2021.04
environment:
KUBECONFIG: .kubeconfig
steps:
- checkout
- run:
name: Store kubeconfig file
command: |
echo "${PRODUCTION_KUBECONFIG_DATA}" | base64 --decode > ${KUBECONFIG}
chmod 0600 ${KUBECONFIG}
- run:
name: Setup helm v3
command: |
curl -s https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
helm repo add zenithdb https://zenithdb.github.io/helm-charts
- run:
name: Re-deploy proxy
command: |
DOCKER_TAG="release-$(git log --oneline|wc -l)"
helm upgrade zenith-proxy zenithdb/zenith-proxy --install -f .circleci/helm-values/production.proxy.yaml --set image.tag=${DOCKER_TAG} --wait
# Trigger a new remote CI job
remote-ci-trigger:
@@ -669,6 +751,47 @@ workflows:
- main
requires:
- docker-image
- docker-image-release:
# Context gives an ability to login
context: Docker Hub
# Build image only for commits to release
filters:
branches:
only:
- release
requires:
- pg_regress-tests-release
- other-tests-release
- docker-image-compute-release:
# Context gives an ability to login
context: Docker Hub
# Build image only for commits to release
filters:
branches:
only:
- release
requires:
- pg_regress-tests-release
- other-tests-release
- deploy-release:
# Context gives an ability to login
context: Docker Hub
# deploy only for commits to release
filters:
branches:
only:
- release
requires:
- docker-image-release
- deploy-release-proxy:
# deploy only for commits to release
filters:
branches:
only:
- release
requires:
- docker-image-release
- remote-ci-trigger:
# Context passes credentials for gh api
context: CI_ACCESS_TOKEN

View File

@@ -0,0 +1,35 @@
# Helm chart values for zenith-proxy.
# This is a YAML-formatted file.
settings:
authEndpoint: "https://console.zenith.tech/authenticate_proxy_request/"
uri: "https://console.zenith.tech/psql_session/"
# -- Additional labels for zenith-proxy pods
podLabels:
zenith_service: proxy
zenith_env: production
zenith_region: us-west-2
zenith_region_slug: oregon
service:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internal
external-dns.alpha.kubernetes.io/hostname: proxy-release.local
type: LoadBalancer
exposedService:
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: external
service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip
service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing
external-dns.alpha.kubernetes.io/hostname: start.zenith.tech
metrics:
enabled: true
serviceMonitor:
enabled: true
selector:
release: kube-prometheus-stack

View File

@@ -1,138 +0,0 @@
- name: discover storage nodes
hosts: localhost
connection: local
gather_facts: False
tasks:
- name: discover safekeepers
no_log: true
ec2_instance_info:
filters:
"tag:zenith_env": "staging"
"tag:zenith_service": "safekeeper"
register: ec2_safekeepers
- name: discover pageservers
no_log: true
ec2_instance_info:
filters:
"tag:zenith_env": "staging"
"tag:zenith_service": "pageserver"
register: ec2_pageservers
- name: add safekeepers to host group
no_log: true
add_host:
name: safekeeper-{{ ansible_loop.index }}
ansible_host: "{{ item.public_ip_address }}"
groups:
- storage
- safekeepers
with_items: "{{ ec2_safekeepers.instances }}"
loop_control:
extended: yes
- name: add pageservers to host group
no_log: true
add_host:
name: pageserver-{{ ansible_loop.index }}
ansible_host: "{{ item.public_ip_address }}"
groups:
- storage
- pageservers
with_items: "{{ ec2_pageservers.instances }}"
loop_control:
extended: yes
- name: Retrieve versions
hosts: storage
gather_facts: False
remote_user: admin
tasks:
- name: Get current version of binaries
set_fact:
current_version: "{{lookup('file', '../zenith_install/.zenith_current_version') }}"
- name: Check that file with version exists on host
stat:
path: /usr/local/.zenith_current_version
register: version_file
- name: Try to get current version from the host
when: version_file.stat.exists
ansible.builtin.fetch:
src: /usr/local/.zenith_current_version
dest: .remote_version.{{ inventory_hostname }}
fail_on_missing: no
flat: yes
- name: Store remote version to variable
when: version_file.stat.exists
set_fact:
remote_version: "{{ lookup('file', '.remote_version.{{ inventory_hostname }}') }}"
- name: Store default value of remote version to variable in case the remote version file is not found
when: not version_file.stat.exists
set_fact:
remote_version: "000"
- name: Extract Zenith binaries
hosts: storage
gather_facts: False
remote_user: admin
tasks:
- name: Inform about version conflict
when: current_version <= remote_version
debug: msg="Current version {{ current_version }} LE than remote {{ remote_version }}"
- name: Extract Zenith binaries to /usr/local
when: current_version > remote_version
ansible.builtin.unarchive:
src: ../zenith_install.tar.gz
dest: /usr/local
become: true
- name: Restart safekeepers
hosts: safekeepers
gather_facts: False
remote_user: admin
tasks:
- name: Inform about version conflict
when: current_version <= remote_version
debug: msg="Current version {{ current_version }} LE than remote {{ remote_version }}"
- name: Restart systemd service
when: current_version > remote_version
ansible.builtin.systemd:
daemon_reload: yes
name: safekeeper
enabled: yes
state: restarted
become: true
- name: Restart pageservers
hosts: pageservers
gather_facts: False
remote_user: admin
tasks:
- name: Inform about version conflict
when: current_version <= remote_version
debug: msg="Current version {{ current_version }} LE than remote {{ remote_version }}"
- name: Restart systemd service
when: current_version > remote_version
ansible.builtin.systemd:
daemon_reload: yes
name: pageserver
enabled: yes
state: restarted
become: true

View File

@@ -6,7 +6,7 @@
# Build Postgres separately --- this layer will be rebuilt only if one of
# mentioned paths will get any changes.
#
FROM zenithdb/build:buster AS pg-build
FROM zimg/rust:1.56 AS pg-build
WORKDIR /zenith
COPY ./vendor/postgres vendor/postgres
COPY ./Makefile Makefile
@@ -20,7 +20,7 @@ RUN rm -rf postgres_install/build
# TODO: build cargo deps as separate layer. We used cargo-chef before but that was
# net time waste in a lot of cases. Copying Cargo.lock with empty lib.rs should do the work.
#
FROM zenithdb/build:buster AS build
FROM zimg/rust:1.56 AS build
ARG GIT_VERSION
RUN if [ -z "$GIT_VERSION" ]; then echo "GIT_VERSION is reqired, use build_arg to pass it"; exit 1; fi
@@ -34,7 +34,7 @@ RUN GIT_VERSION=$GIT_VERSION cargo build --release
#
# Copy binaries to resulting image.
#
FROM debian:buster-slim
FROM debian:bullseye-slim
WORKDIR /data
RUN apt-get update && apt-get -yq install libreadline-dev libseccomp-dev openssl ca-certificates && \

View File

@@ -1,16 +0,0 @@
#
# Image with all the required dependencies to build https://github.com/zenithdb/zenith
# and Postgres from https://github.com/zenithdb/postgres
# Also includes some rust development and build tools.
# NB: keep in sync with rust image version in .circle/config.yml
#
FROM rust:1.56.1-slim-buster
WORKDIR /zenith
# Install postgres and zenith build dependencies
# clang is for rocksdb
RUN apt-get update && apt-get -yq install automake libtool build-essential bison flex libreadline-dev zlib1g-dev libxml2-dev \
libseccomp-dev pkg-config libssl-dev clang
# Install rust tools
RUN rustup component add clippy && cargo install cargo-audit

View File

@@ -171,7 +171,7 @@ impl PgQuote for PgIdent {
/// always quotes provided string with `""` and escapes every `"`. Not idempotent,
/// i.e. if string is already escaped it will be escaped again.
fn quote(&self) -> String {
let result = format!("\"{}\"", self.replace("\"", "\"\""));
let result = format!("\"{}\"", self.replace('"', "\"\""));
result
}
}

View File

@@ -215,7 +215,7 @@ pub fn handle_databases(spec: &ClusterSpec, client: &mut Client) -> Result<()> {
if let Some(r) = pg_db {
// XXX: db owner name is returned as quoted string from Postgres,
// when quoting is needed.
let new_owner = if r.owner.starts_with('\"') {
let new_owner = if r.owner.starts_with('"') {
db.owner.quote()
} else {
db.owner.clone()

View File

@@ -12,7 +12,7 @@ use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use zenith_utils::auth::{encode_from_key_file, Claims, Scope};
use zenith_utils::postgres_backend::AuthType;
use zenith_utils::zid::{opt_display_serde, ZNodeId, ZTenantId};
use zenith_utils::zid::{HexZTenantId, ZNodeId, ZTenantId};
use crate::safekeeper::SafekeeperNode;
@@ -47,9 +47,8 @@ pub struct LocalEnv {
// Default tenant ID to use with the 'zenith' command line utility, when
// --tenantid is not explicitly specified.
#[serde(with = "opt_display_serde")]
#[serde(default)]
pub default_tenantid: Option<ZTenantId>,
pub default_tenantid: Option<HexZTenantId>,
// used to issue tokens during e.g pg start
#[serde(default)]
@@ -185,7 +184,7 @@ impl LocalEnv {
// If no initial tenant ID was given, generate it.
if env.default_tenantid.is_none() {
env.default_tenantid = Some(ZTenantId::generate());
env.default_tenantid = Some(HexZTenantId::from(ZTenantId::generate()));
}
env.base_data_dir = base_path();

View File

@@ -7,32 +7,14 @@ Currently we build two main images:
- [zenithdb/zenith](https://hub.docker.com/repository/docker/zenithdb/zenith) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
- [zenithdb/compute-node](https://hub.docker.com/repository/docker/zenithdb/compute-node) — compute node image with pre-built Postgres binaries from [zenithdb/postgres](https://github.com/zenithdb/postgres).
And two intermediate images used either to reduce build time or to deliver some additional binary tools from other repos:
And additional intermediate images:
- [zenithdb/build](https://hub.docker.com/repository/docker/zenithdb/build) — image with all the dependencies required to build Zenith and compute node images. This image is based on `rust:slim-buster`, so it also has a proper `rust` environment. Built from [/Dockerfile.build](/Dockerfile.build).
- [zenithdb/compute-tools](https://hub.docker.com/repository/docker/zenithdb/compute-tools) — compute node configuration management tools.
## Building pipeline
1. Image `zenithdb/compute-tools` is re-built automatically.
2. Image `zenithdb/build` is built manually. If you want to introduce any new compile time dependencies to Zenith or compute node you have to update this image as well, build it and push to Docker Hub.
2. Image `zenithdb/compute-node` is built independently in the [zenithdb/postgres](https://github.com/zenithdb/postgres) repo.
Build:
```sh
docker build -t zenithdb/build:buster -f Dockerfile.build .
```
Login:
```sh
docker login
```
Push to Docker Hub:
```sh
docker push zenithdb/build:buster
```
3. Image `zenithdb/compute-node` is built independently in the [zenithdb/postgres](https://github.com/zenithdb/postgres) repo.
4. Image `zenithdb/zenith` is built in this repo after a successful `release` tests run and pushed to Docker Hub automatically.
3. Image `zenithdb/zenith` is built in this repo after a successful `release` tests run and pushed to Docker Hub automatically.

View File

@@ -16,10 +16,9 @@ use std::{
};
use tracing::*;
use zenith_utils::crashsafe_dir;
use zenith_utils::logging;
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
use zenith_utils::{crashsafe_dir, logging};
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;

View File

@@ -36,6 +36,9 @@ pub mod defaults {
pub const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024;
pub const DEFAULT_GC_PERIOD: &str = "100 s";
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
pub const DEFAULT_SUPERUSER: &str = "zenith_admin";
pub const DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNC: usize = 100;
pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10;
@@ -59,6 +62,9 @@ pub mod defaults {
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
#wait_lsn_timeout = '{DEFAULT_WAIT_LSN_TIMEOUT}'
#wal_redo_timeout = '{DEFAULT_WAL_REDO_TIMEOUT}'
#max_file_descriptors = {DEFAULT_MAX_FILE_DESCRIPTORS}
# initial superuser role name to use when creating a new tenant
@@ -89,6 +95,12 @@ pub struct PageServerConf {
pub gc_horizon: u64,
pub gc_period: Duration,
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
pub wait_lsn_timeout: Duration,
// How long to wait for WAL redo to complete.
pub wal_redo_timeout: Duration,
pub superuser: String,
pub page_cache_size: usize,
@@ -137,6 +149,10 @@ struct PageServerConfigBuilder {
gc_horizon: BuilderValue<u64>,
gc_period: BuilderValue<Duration>,
wait_lsn_timeout: BuilderValue<Duration>,
wal_redo_timeout: BuilderValue<Duration>,
superuser: BuilderValue<String>,
page_cache_size: BuilderValue<usize>,
@@ -168,6 +184,10 @@ impl Default for PageServerConfigBuilder {
gc_horizon: Set(DEFAULT_GC_HORIZON),
gc_period: Set(humantime::parse_duration(DEFAULT_GC_PERIOD)
.expect("cannot parse default gc period")),
wait_lsn_timeout: Set(humantime::parse_duration(DEFAULT_WAIT_LSN_TIMEOUT)
.expect("cannot parse default wait lsn timeout")),
wal_redo_timeout: Set(humantime::parse_duration(DEFAULT_WAL_REDO_TIMEOUT)
.expect("cannot parse default wal redo timeout")),
superuser: Set(DEFAULT_SUPERUSER.to_string()),
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
@@ -208,6 +228,14 @@ impl PageServerConfigBuilder {
self.gc_period = BuilderValue::Set(gc_period)
}
pub fn wait_lsn_timeout(&mut self, wait_lsn_timeout: Duration) {
self.wait_lsn_timeout = BuilderValue::Set(wait_lsn_timeout)
}
pub fn wal_redo_timeout(&mut self, wal_redo_timeout: Duration) {
self.wal_redo_timeout = BuilderValue::Set(wal_redo_timeout)
}
pub fn superuser(&mut self, superuser: String) {
self.superuser = BuilderValue::Set(superuser)
}
@@ -265,6 +293,12 @@ impl PageServerConfigBuilder {
.gc_horizon
.ok_or(anyhow::anyhow!("missing gc_horizon"))?,
gc_period: self.gc_period.ok_or(anyhow::anyhow!("missing gc_period"))?,
wait_lsn_timeout: self
.wait_lsn_timeout
.ok_or(anyhow::anyhow!("missing wait_lsn_timeout"))?,
wal_redo_timeout: self
.wal_redo_timeout
.ok_or(anyhow::anyhow!("missing wal_redo_timeout"))?,
superuser: self.superuser.ok_or(anyhow::anyhow!("missing superuser"))?,
page_cache_size: self
.page_cache_size
@@ -414,6 +448,8 @@ impl PageServerConf {
"checkpoint_period" => builder.checkpoint_period(parse_toml_duration(key, item)?),
"gc_horizon" => builder.gc_horizon(parse_toml_u64(key, item)?),
"gc_period" => builder.gc_period(parse_toml_duration(key, item)?),
"wait_lsn_timeout" => builder.wait_lsn_timeout(parse_toml_duration(key, item)?),
"wal_redo_timeout" => builder.wal_redo_timeout(parse_toml_duration(key, item)?),
"initial_superuser_name" => builder.superuser(parse_toml_string(key, item)?),
"page_cache_size" => builder.page_cache_size(parse_toml_u64(key, item)? as usize),
"max_file_descriptors" => {
@@ -548,6 +584,8 @@ impl PageServerConf {
checkpoint_period: Duration::from_secs(10),
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: Duration::from_secs(10),
wait_lsn_timeout: Duration::from_secs(60),
wal_redo_timeout: Duration::from_secs(60),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
@@ -618,6 +656,9 @@ checkpoint_period = '111 s'
gc_period = '222 s'
gc_horizon = 222
wait_lsn_timeout = '111 s'
wal_redo_timeout = '111 s'
page_cache_size = 444
max_file_descriptors = 333
@@ -650,6 +691,8 @@ id = 10
checkpoint_period: humantime::parse_duration(defaults::DEFAULT_CHECKPOINT_PERIOD)?,
gc_horizon: defaults::DEFAULT_GC_HORIZON,
gc_period: humantime::parse_duration(defaults::DEFAULT_GC_PERIOD)?,
wait_lsn_timeout: humantime::parse_duration(defaults::DEFAULT_WAIT_LSN_TIMEOUT)?,
wal_redo_timeout: humantime::parse_duration(defaults::DEFAULT_WAL_REDO_TIMEOUT)?,
superuser: defaults::DEFAULT_SUPERUSER.to_string(),
page_cache_size: defaults::DEFAULT_PAGE_CACHE_SIZE,
max_file_descriptors: defaults::DEFAULT_MAX_FILE_DESCRIPTORS,
@@ -692,6 +735,8 @@ id = 10
checkpoint_period: Duration::from_secs(111),
gc_horizon: 222,
gc_period: Duration::from_secs(222),
wait_lsn_timeout: Duration::from_secs(111),
wal_redo_timeout: Duration::from_secs(111),
superuser: "zzzz".to_string(),
page_cache_size: 444,
max_file_descriptors: 333,

View File

@@ -19,7 +19,8 @@ use zenith_utils::http::{
};
use zenith_utils::http::{RequestExt, RouterBuilder};
use zenith_utils::lsn::Lsn;
use zenith_utils::zid::{opt_display_serde, ZTimelineId};
use zenith_utils::zid::HexZTimelineId;
use zenith_utils::zid::ZTimelineId;
use super::models::BranchCreateRequest;
use super::models::StatusResponse;
@@ -198,8 +199,7 @@ enum TimelineInfo {
timeline_id: ZTimelineId,
#[serde(with = "hex")]
tenant_id: ZTenantId,
#[serde(with = "opt_display_serde")]
ancestor_timeline_id: Option<ZTimelineId>,
ancestor_timeline_id: Option<HexZTimelineId>,
last_record_lsn: Lsn,
prev_record_lsn: Lsn,
disk_consistent_lsn: Lsn,
@@ -232,7 +232,9 @@ async fn timeline_detail_handler(request: Request<Body>) -> Result<Response<Body
Some(timeline) => TimelineInfo::Local {
timeline_id,
tenant_id,
ancestor_timeline_id: timeline.get_ancestor_timeline_id(),
ancestor_timeline_id: timeline
.get_ancestor_timeline_id()
.map(HexZTimelineId::from),
disk_consistent_lsn: timeline.get_disk_consistent_lsn(),
last_record_lsn: timeline.get_last_record_lsn(),
prev_record_lsn: timeline.get_prev_record_lsn(),

View File

@@ -29,7 +29,7 @@ use std::ops::{Bound::Included, Deref};
use std::path::{Path, PathBuf};
use std::sync::atomic::{self, AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant};
use std::time::Instant;
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
use crate::config::PageServerConf;
@@ -64,7 +64,6 @@ mod inmemory_layer;
mod interval_tree;
mod layer_map;
pub mod metadata;
mod page_versions;
mod par_fsync;
mod storage_layer;
@@ -83,9 +82,6 @@ pub use crate::layered_repository::ephemeral_file::writeback as writeback_epheme
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
// Metrics collected on operations on the storage repository.
lazy_static! {
static ref STORAGE_TIME: HistogramVec = register_histogram_vec!(
@@ -816,7 +812,7 @@ impl Timeline for LayeredTimeline {
);
self.last_record_lsn
.wait_for_timeout(lsn, TIMEOUT)
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.with_context(|| {
format!(
"Timed out while waiting for WAL record at LSN {} to arrive, last_record_lsn {} disk consistent LSN={}",
@@ -897,12 +893,11 @@ impl Timeline for LayeredTimeline {
let seg = SegmentTag { rel, segno: 0 };
let result;
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
result = layer.get_seg_exists(lsn)?;
let result = if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
layer.get_seg_exists(lsn)?
} else {
result = false;
}
false
};
trace!("get_rel_exists: {} at {} -> {}", rel, lsn, result);
Ok(result)
@@ -1944,22 +1939,21 @@ impl LayeredTimeline {
// for redo.
let rel = seg.rel;
let rel_blknum = seg.segno * RELISH_SEG_SIZE + seg_blknum;
let (cached_lsn_opt, cached_page_opt) = match self.lookup_cached_page(&rel, rel_blknum, lsn)
{
let cached_page_img = match self.lookup_cached_page(&rel, rel_blknum, lsn) {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
cmp::Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
cmp::Ordering::Equal => return Ok(cached_img), // exact LSN match, return the image
cmp::Ordering::Greater => panic!(), // the returned lsn should never be after the requested lsn
}
(Some(cached_lsn), Some((cached_lsn, cached_img)))
Some((cached_lsn, cached_img))
}
None => (None, None),
None => None,
};
let mut data = PageReconstructData {
records: Vec::new(),
page_img: None,
page_img: cached_page_img,
};
// Holds an Arc reference to 'layer_ref' when iterating in the loop below.
@@ -1972,15 +1966,14 @@ impl LayeredTimeline {
let mut curr_lsn = lsn;
loop {
let result = layer_ref
.get_page_reconstruct_data(seg_blknum, curr_lsn, cached_lsn_opt, &mut data)
.get_page_reconstruct_data(seg_blknum, curr_lsn, &mut data)
.with_context(|| {
format!(
"Failed to get reconstruct data {} {:?} {} {} {:?}",
"Failed to get reconstruct data {} {:?} {} {}",
layer_ref.get_seg_tag(),
layer_ref.filename(),
seg_blknum,
curr_lsn,
cached_lsn_opt,
)
})?;
match result {
@@ -2027,16 +2020,6 @@ impl LayeredTimeline {
lsn,
);
}
PageReconstructResult::Cached => {
let (cached_lsn, cached_img) = cached_page_opt.unwrap();
assert!(data.page_img.is_none());
if let Some((first_rec_lsn, first_rec)) = data.records.first() {
assert!(&cached_lsn < first_rec_lsn);
assert!(!first_rec.will_init());
}
data.page_img = Some(cached_img);
break;
}
}
}
@@ -2058,12 +2041,12 @@ impl LayeredTimeline {
// If we have a page image, and no WAL, we're all set
if data.records.is_empty() {
if let Some(img) = &data.page_img {
if let Some((img_lsn, img)) = &data.page_img {
trace!(
"found page image for blk {} in {} at {}, no WAL redo required",
rel_blknum,
rel,
request_lsn
img_lsn
);
Ok(img.clone())
} else {
@@ -2090,11 +2073,13 @@ impl LayeredTimeline {
);
Ok(ZERO_PAGE.clone())
} else {
if data.page_img.is_some() {
let base_img = if let Some((_lsn, img)) = data.page_img {
trace!("found {} WAL records and a base image for blk {} in {} at {}, performing WAL redo", data.records.len(), rel_blknum, rel, request_lsn);
Some(img)
} else {
trace!("found {} WAL records that will init the page for blk {} in {} at {}, performing WAL redo", data.records.len(), rel_blknum, rel, request_lsn);
}
None
};
let last_rec_lsn = data.records.last().unwrap().0;
@@ -2102,7 +2087,7 @@ impl LayeredTimeline {
rel,
rel_blknum,
request_lsn,
data.page_img.clone(),
base_img,
data.records,
)?;
@@ -2361,3 +2346,157 @@ fn rename_to_backup(path: PathBuf) -> anyhow::Result<()> {
bail!("couldn't find an unused backup number for {:?}", path)
}
///
/// Tests that are specific to the layered storage format.
///
/// There are more unit tests in repository.rs that work through the
/// Repository interface and are expected to work regardless of the
/// file format and directory layout. The tests here are more low-level.
///
#[cfg(test)]
mod tests {
use super::*;
use crate::repository::repo_harness::*;
#[test]
fn corrupt_metadata() -> Result<()> {
const TEST_NAME: &str = "corrupt_metadata";
let harness = RepoHarness::create(TEST_NAME)?;
let repo = harness.load();
repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
drop(repo);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
assert!(metadata_path.is_file());
let mut metadata_bytes = std::fs::read(&metadata_path)?;
assert_eq!(metadata_bytes.len(), 512);
metadata_bytes[512 - 4 - 2] ^= 1;
std::fs::write(metadata_path, metadata_bytes)?;
let new_repo = harness.load();
let err = new_repo.get_timeline(TIMELINE_ID).err().unwrap();
assert_eq!(err.to_string(), "failed to load metadata");
assert_eq!(
err.source().unwrap().to_string(),
"metadata checksum mismatch"
);
Ok(())
}
///
/// Test the logic in 'load_layer_map' that removes layer files that are
/// newer than 'disk_consistent_lsn'.
///
#[test]
fn future_layerfiles() -> Result<()> {
const TEST_NAME: &str = "future_layerfiles";
let harness = RepoHarness::create(TEST_NAME)?;
let repo = harness.load();
// Create a timeline with disk_consistent_lsn = 8000
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
let writer = tline.writer();
writer.advance_last_record_lsn(Lsn(0x8000));
drop(writer);
repo.checkpoint_iteration(CheckpointConfig::Forced)?;
drop(repo);
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let make_empty_file = |filename: &str| -> std::io::Result<()> {
let path = timeline_path.join(filename);
assert!(!path.exists());
std::fs::write(&path, &[])?;
Ok(())
};
// Helper function to check that a relation file exists, and a corresponding
// <filename>.0.old file does not.
let assert_exists = |filename: &str| {
let path = timeline_path.join(filename);
assert!(path.exists(), "file {} was removed", filename);
// Check that there is no .old file
let backup_path = timeline_path.join(format!("{}.0.old", filename));
assert!(
!backup_path.exists(),
"unexpected backup file {}",
backup_path.display()
);
};
// Helper function to check that a relation file does *not* exist, and a corresponding
// <filename>.<num>.old file does.
let assert_is_renamed = |filename: &str, num: u32| {
let path = timeline_path.join(filename);
assert!(
!path.exists(),
"file {} was not removed as expected",
filename
);
let backup_path = timeline_path.join(format!("{}.{}.old", filename, num));
assert!(
backup_path.exists(),
"backup file {} was not created",
backup_path.display()
);
};
// These files are considered to be in the future and will be renamed out
// of the way
let future_filenames = vec![
format!("pg_control_0_{:016X}", 0x8001),
format!("pg_control_0_{:016X}_{:016X}", 0x8001, 0x8008),
];
// But these are not:
let past_filenames = vec![
format!("pg_control_0_{:016X}", 0x8000),
format!("pg_control_0_{:016X}_{:016X}", 0x7000, 0x8001),
];
for filename in future_filenames.iter().chain(past_filenames.iter()) {
make_empty_file(filename)?;
}
// Load the timeline. This will cause the files in the "future" to be renamed
// away.
let new_repo = harness.load();
new_repo.get_timeline(TIMELINE_ID).unwrap();
drop(new_repo);
for filename in future_filenames.iter() {
assert_is_renamed(filename, 0);
}
for filename in past_filenames.iter() {
assert_exists(filename);
}
// Create the future files again, and load again. They should be renamed to
// *.1.old this time.
for filename in future_filenames.iter() {
make_empty_file(filename)?;
}
let new_repo = harness.load();
new_repo.get_timeline(TIMELINE_ID).unwrap();
drop(new_repo);
for filename in future_filenames.iter() {
assert_is_renamed(filename, 0);
assert_is_renamed(filename, 1);
}
for filename in past_filenames.iter() {
assert_exists(filename);
}
Ok(())
}
}

View File

@@ -208,16 +208,15 @@ impl Layer for DeltaLayer {
&self,
blknum: SegmentBlk,
lsn: Lsn,
cached_img_lsn: Option<Lsn>,
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult> {
let mut need_image = true;
assert!((0..RELISH_SEG_SIZE).contains(&blknum));
match &cached_img_lsn {
Some(cached_lsn) if &self.end_lsn <= cached_lsn => {
return Ok(PageReconstructResult::Cached)
match &reconstruct_data.page_img {
Some((cached_lsn, _)) if &self.end_lsn <= cached_lsn => {
return Ok(PageReconstructResult::Complete)
}
_ => {}
}
@@ -240,9 +239,9 @@ impl Layer for DeltaLayer {
.iter()
.rev();
for ((_blknum, pv_lsn), blob_range) in iter {
match &cached_img_lsn {
Some(cached_lsn) if pv_lsn <= cached_lsn => {
return Ok(PageReconstructResult::Cached)
match &reconstruct_data.page_img {
Some((cached_lsn, _)) if pv_lsn <= cached_lsn => {
return Ok(PageReconstructResult::Complete)
}
_ => {}
}
@@ -252,7 +251,7 @@ impl Layer for DeltaLayer {
match pv {
PageVersion::Page(img) => {
// Found a page image, return it
reconstruct_data.page_img = Some(img);
reconstruct_data.page_img = Some((*pv_lsn, img));
need_image = false;
break;
}

View File

@@ -145,14 +145,15 @@ impl Layer for ImageLayer {
&self,
blknum: SegmentBlk,
lsn: Lsn,
cached_img_lsn: Option<Lsn>,
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult> {
assert!((0..RELISH_SEG_SIZE).contains(&blknum));
assert!(lsn >= self.lsn);
match cached_img_lsn {
Some(cached_lsn) if self.lsn <= cached_lsn => return Ok(PageReconstructResult::Cached),
match reconstruct_data.page_img {
Some((cached_lsn, _)) if self.lsn <= cached_lsn => {
return Ok(PageReconstructResult::Complete)
}
_ => {}
}
@@ -195,7 +196,7 @@ impl Layer for ImageLayer {
}
};
reconstruct_data.page_img = Some(Bytes::from(buf));
reconstruct_data.page_img = Some((self.lsn, Bytes::from(buf)));
Ok(PageReconstructResult::Complete)
}

View File

@@ -20,13 +20,15 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{ensure, Result};
use bytes::Bytes;
use log::*;
use std::collections::HashMap;
use std::io::Seek;
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
use zenith_utils::vec_map::VecMap;
use super::page_versions::PageVersions;
pub struct InMemoryLayer {
conf: &'static PageServerConf,
tenantid: ZTenantId,
@@ -71,11 +73,15 @@ pub struct InMemoryLayerInner {
/// The drop LSN is recorded in [`end_lsn`].
dropped: bool,
///
/// All versions of all pages in the layer are kept here.
/// Indexed by block number and LSN.
///
page_versions: PageVersions,
/// The PageVersion structs are stored in a serialized format in this file.
/// Each serialized PageVersion is preceded by a 'u32' length field.
/// 'page_versions' map stores offsets into this file.
file: EphemeralFile,
/// Metadata about all versions of all pages in the layer is kept
/// here. Indexed by block number and LSN. The value is an offset
/// into the ephemeral file where the page version is stored.
page_versions: HashMap<SegmentBlk, VecMap<Lsn, u64>>,
///
/// `seg_sizes` tracks the size of the segment at different points in time.
@@ -111,6 +117,50 @@ impl InMemoryLayerInner {
panic!("could not find seg size in in-memory layer");
}
}
///
/// Read a page version from the ephemeral file.
///
fn read_pv(&self, off: u64) -> Result<PageVersion> {
let mut buf = Vec::new();
self.read_pv_bytes(off, &mut buf)?;
Ok(PageVersion::des(&buf)?)
}
///
/// Read a page version from the ephemeral file, as raw bytes, at
/// the given offset. The bytes are read into 'buf', which is
/// expanded if necessary. Returns the size of the page version.
///
fn read_pv_bytes(&self, off: u64, buf: &mut Vec<u8>) -> Result<usize> {
// read length
let mut lenbuf = [0u8; 4];
self.file.read_exact_at(&mut lenbuf, off)?;
let len = u32::from_ne_bytes(lenbuf) as usize;
if buf.len() < len {
buf.resize(len, 0);
}
self.file.read_exact_at(&mut buf[0..len], off + 4)?;
Ok(len)
}
fn write_pv(&mut self, pv: &PageVersion) -> Result<u64> {
// remember starting position
let pos = self.file.stream_position()?;
// make room for the 'length' field by writing zeros as a placeholder.
self.file.seek(std::io::SeekFrom::Start(pos + 4)).unwrap();
pv.ser_into(&mut self.file).unwrap();
// write the 'length' field.
let len = self.file.stream_position()? - pos - 4;
let lenbuf = u32::to_ne_bytes(len as u32);
self.file.write_all_at(&lenbuf, pos)?;
Ok(pos)
}
}
impl Layer for InMemoryLayer {
@@ -120,12 +170,11 @@ impl Layer for InMemoryLayer {
fn filename(&self) -> PathBuf {
let inner = self.inner.read().unwrap();
let end_lsn;
if let Some(drop_lsn) = inner.end_lsn {
end_lsn = drop_lsn;
let end_lsn = if let Some(drop_lsn) = inner.end_lsn {
drop_lsn
} else {
end_lsn = Lsn(u64::MAX);
}
Lsn(u64::MAX)
};
let delta_filename = DeltaFileName {
seg: self.seg,
@@ -174,7 +223,6 @@ impl Layer for InMemoryLayer {
&self,
blknum: SegmentBlk,
lsn: Lsn,
cached_img_lsn: Option<Lsn>,
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult> {
let mut need_image = true;
@@ -185,33 +233,31 @@ impl Layer for InMemoryLayer {
let inner = self.inner.read().unwrap();
// Scan the page versions backwards, starting from `lsn`.
let iter = inner
.page_versions
.get_block_lsn_range(blknum, ..=lsn)
.iter()
.rev();
for (entry_lsn, pos) in iter {
match &cached_img_lsn {
Some(cached_lsn) if entry_lsn <= cached_lsn => {
return Ok(PageReconstructResult::Cached)
if let Some(vec_map) = inner.page_versions.get(&blknum) {
let slice = vec_map.slice_range(..=lsn);
for (entry_lsn, pos) in slice.iter().rev() {
match &reconstruct_data.page_img {
Some((cached_lsn, _)) if entry_lsn <= cached_lsn => {
return Ok(PageReconstructResult::Complete)
}
_ => {}
}
_ => {}
}
let pv = inner.page_versions.read_pv(*pos)?;
match pv {
PageVersion::Page(img) => {
reconstruct_data.page_img = Some(img);
need_image = false;
break;
}
PageVersion::Wal(rec) => {
reconstruct_data.records.push((*entry_lsn, rec.clone()));
if rec.will_init() {
// This WAL record initializes the page, so no need to go further back
let pv = inner.read_pv(*pos)?;
match pv {
PageVersion::Page(img) => {
reconstruct_data.page_img = Some((*entry_lsn, img));
need_image = false;
break;
}
PageVersion::Wal(rec) => {
reconstruct_data.records.push((*entry_lsn, rec.clone()));
if rec.will_init() {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
@@ -317,14 +363,22 @@ impl Layer for InMemoryLayer {
println!("seg_sizes {}: {}", k, v);
}
for (blknum, lsn, pos) in inner.page_versions.ordered_page_version_iter(None) {
let pv = inner.page_versions.read_pv(pos)?;
let pv_description = match pv {
PageVersion::Page(_img) => "page",
PageVersion::Wal(_rec) => "wal",
};
// List the blocks in order
let mut page_versions: Vec<(&SegmentBlk, &VecMap<Lsn, u64>)> =
inner.page_versions.iter().collect();
page_versions.sort_by_key(|k| k.0);
println!("blk {} at {}: {}\n", blknum, lsn, pv_description);
for (blknum, versions) in page_versions {
for (lsn, off) in versions.as_slice() {
let pv = inner.read_pv(*off);
let pv_description = match pv {
Ok(PageVersion::Page(_img)) => "page",
Ok(PageVersion::Wal(_rec)) => "wal",
Err(_err) => "INVALID",
};
println!("blk {} at {}: {}\n", blknum, lsn, pv_description);
}
}
Ok(())
@@ -385,7 +439,8 @@ impl InMemoryLayer {
inner: RwLock::new(InMemoryLayerInner {
end_lsn: None,
dropped: false,
page_versions: PageVersions::new(file),
file,
page_versions: HashMap::new(),
seg_sizes,
latest_lsn: oldest_lsn,
}),
@@ -427,14 +482,18 @@ impl InMemoryLayer {
assert!(lsn >= inner.latest_lsn);
inner.latest_lsn = lsn;
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv)?;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!(
"Page version of rel {} blk {} at {} already exists",
self.seg.rel, blknum, lsn
);
// Write the page version to the file, and remember its offset in 'page_versions'
{
let off = inner.write_pv(&pv)?;
let vec_map = inner.page_versions.entry(blknum).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!(
"Page version of rel {} blk {} at {} already exists",
self.seg.rel, blknum, lsn
);
}
}
// Also update the relation size, if this extended the relation.
@@ -468,16 +527,19 @@ impl InMemoryLayer {
gapblknum,
blknum
);
let old = inner
.page_versions
.append_or_update_last(gapblknum, lsn, zeropv)?;
// We already had an entry for this LSN. That's odd..
if old.is_some() {
warn!(
"Page version of seg {} blk {} at {} already exists",
self.seg, blknum, lsn
);
// Write the page version to the file, and remember its offset in
// 'page_versions'
{
let off = inner.write_pv(&zeropv)?;
let vec_map = inner.page_versions.entry(gapblknum).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
if old.is_some() {
warn!(
"Page version of seg {} blk {} at {} already exists",
self.seg, gapblknum, lsn
);
}
}
}
@@ -570,7 +632,8 @@ impl InMemoryLayer {
inner: RwLock::new(InMemoryLayerInner {
end_lsn: None,
dropped: false,
page_versions: PageVersions::new(file),
file,
page_versions: HashMap::new(),
seg_sizes,
latest_lsn: oldest_lsn,
}),
@@ -599,8 +662,10 @@ impl InMemoryLayer {
assert!(lsn <= &end_lsn, "{:?} {:?}", lsn, end_lsn);
}
for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) {
assert!(lsn <= end_lsn);
for (_blk, vec_map) in inner.page_versions.iter() {
for (lsn, _pos) in vec_map.as_slice() {
assert!(*lsn <= end_lsn);
}
}
}
}
@@ -678,15 +743,19 @@ impl InMemoryLayer {
self.is_dropped(),
)?;
// Write all page versions
// Write all page versions, in block + LSN order
let mut buf: Vec<u8> = Vec::new();
let page_versions_iter = inner
.page_versions
.ordered_page_version_iter(Some(delta_end_lsn));
for (blknum, lsn, pos) in page_versions_iter {
let len = inner.page_versions.read_pv_bytes(pos, &mut buf)?;
delta_layer_writer.put_page_version(blknum, lsn, &buf[..len])?;
let pv_iter = inner.page_versions.iter();
let mut pages: Vec<(&SegmentBlk, &VecMap<Lsn, u64>)> = pv_iter.collect();
pages.sort_by_key(|(blknum, _vec_map)| *blknum);
for (blknum, vec_map) in pages {
for (lsn, pos) in vec_map.as_slice() {
if *lsn < delta_end_lsn {
let len = inner.read_pv_bytes(*pos, &mut buf)?;
delta_layer_writer.put_page_version(*blknum, *lsn, &buf[..len])?;
}
}
}
// Create seg_sizes

View File

@@ -1,268 +0,0 @@
//!
//! Data structure to ingest incoming WAL into an append-only file.
//!
//! - The file is considered temporary, and will be discarded on crash
//! - based on a B-tree
//!
use std::os::unix::fs::FileExt;
use std::{collections::HashMap, ops::RangeBounds, slice};
use anyhow::Result;
use std::cmp::min;
use std::io::Seek;
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
use crate::layered_repository::ephemeral_file::EphemeralFile;
use zenith_utils::bin_ser::BeSer;
const EMPTY_SLICE: &[(Lsn, u64)] = &[];
pub struct PageVersions {
map: HashMap<u32, VecMap<Lsn, u64>>,
/// The PageVersion structs are stored in a serialized format in this file.
/// Each serialized PageVersion is preceded by a 'u32' length field.
/// The 'map' stores offsets into this file.
file: EphemeralFile,
}
impl PageVersions {
pub fn new(file: EphemeralFile) -> PageVersions {
PageVersions {
map: HashMap::new(),
file,
}
}
pub fn append_or_update_last(
&mut self,
blknum: u32,
lsn: Lsn,
page_version: PageVersion,
) -> Result<Option<u64>> {
// remember starting position
let pos = self.file.stream_position()?;
// make room for the 'length' field by writing zeros as a placeholder.
self.file.seek(std::io::SeekFrom::Start(pos + 4)).unwrap();
page_version.ser_into(&mut self.file).unwrap();
// write the 'length' field.
let len = self.file.stream_position()? - pos - 4;
let lenbuf = u32::to_ne_bytes(len as u32);
self.file.write_all_at(&lenbuf, pos)?;
let map = self.map.entry(blknum).or_insert_with(VecMap::default);
Ok(map.append_or_update_last(lsn, pos as u64).unwrap().0)
}
/// Get all [`PageVersion`]s in a block
fn get_block_slice(&self, blknum: u32) -> &[(Lsn, u64)] {
self.map
.get(&blknum)
.map(VecMap::as_slice)
.unwrap_or(EMPTY_SLICE)
}
/// Get a range of [`PageVersions`] in a block
pub fn get_block_lsn_range<R: RangeBounds<Lsn>>(&self, blknum: u32, range: R) -> &[(Lsn, u64)] {
self.map
.get(&blknum)
.map(|vec_map| vec_map.slice_range(range))
.unwrap_or(EMPTY_SLICE)
}
/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {
let mut ordered_blocks: Vec<u32> = self.map.keys().cloned().collect();
ordered_blocks.sort_unstable();
let slice = ordered_blocks
.first()
.map(|&blknum| self.get_block_slice(blknum))
.unwrap_or(EMPTY_SLICE);
OrderedPageVersionIter {
page_versions: self,
ordered_blocks,
cur_block_idx: 0,
cutoff_lsn,
cur_slice_iter: slice.iter(),
}
}
///
/// Read a page version.
///
pub fn read_pv(&self, off: u64) -> Result<PageVersion> {
let mut buf = Vec::new();
self.read_pv_bytes(off, &mut buf)?;
Ok(PageVersion::des(&buf)?)
}
///
/// Read a page version, as raw bytes, at the given offset. The bytes
/// are read into 'buf', which is expanded if necessary. Returns the
/// size of the page version.
///
pub fn read_pv_bytes(&self, off: u64, buf: &mut Vec<u8>) -> Result<usize> {
// read length
let mut lenbuf = [0u8; 4];
self.file.read_exact_at(&mut lenbuf, off)?;
let len = u32::from_ne_bytes(lenbuf) as usize;
// Resize the buffer to fit the data, if needed.
//
// We don't shrink the buffer if it's larger than necessary. That avoids
// repeatedly shrinking and expanding when you reuse the same buffer to
// read multiple page versions. Expanding a Vec requires initializing the
// new bytes, which is a waste of time because we're immediately overwriting
// it, but there's no way to avoid it without resorting to unsafe code.
if buf.len() < len {
buf.resize(len, 0);
}
self.file.read_exact_at(&mut buf[0..len], off + 4)?;
Ok(len)
}
}
pub struct PageVersionReader<'a> {
file: &'a EphemeralFile,
pos: u64,
end_pos: u64,
}
impl<'a> std::io::Read for PageVersionReader<'a> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
let len = min(buf.len(), (self.end_pos - self.pos) as usize);
let n = self.file.read_at(&mut buf[..len], self.pos)?;
self.pos += n as u64;
Ok(n)
}
}
pub struct OrderedPageVersionIter<'a> {
page_versions: &'a PageVersions,
ordered_blocks: Vec<u32>,
cur_block_idx: usize,
cutoff_lsn: Option<Lsn>,
cur_slice_iter: slice::Iter<'a, (Lsn, u64)>,
}
impl OrderedPageVersionIter<'_> {
fn is_lsn_before_cutoff(&self, lsn: &Lsn) -> bool {
if let Some(cutoff_lsn) = self.cutoff_lsn.as_ref() {
lsn < cutoff_lsn
} else {
true
}
}
}
impl<'a> Iterator for OrderedPageVersionIter<'a> {
type Item = (u32, Lsn, u64);
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some((lsn, pos)) = self.cur_slice_iter.next() {
if self.is_lsn_before_cutoff(lsn) {
let blknum = self.ordered_blocks[self.cur_block_idx];
return Some((blknum, *lsn, *pos));
}
}
let next_block_idx = self.cur_block_idx + 1;
let blknum: u32 = *self.ordered_blocks.get(next_block_idx)?;
self.cur_block_idx = next_block_idx;
self.cur_slice_iter = self.page_versions.get_block_slice(blknum).iter();
}
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use super::*;
use crate::config::PageServerConf;
use std::fs;
use std::str::FromStr;
use zenith_utils::zid::{ZTenantId, ZTimelineId};
fn repo_harness(test_name: &str) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId)> {
let repo_dir = PageServerConf::test_repo_dir(test_name);
let _ = fs::remove_dir_all(&repo_dir);
let conf = PageServerConf::dummy_conf(repo_dir);
// Make a static copy of the config. This can never be free'd, but that's
// OK in a test.
let conf: &'static PageServerConf = Box::leak(Box::new(conf));
let tenantid = ZTenantId::from_str("11000000000000000000000000000000").unwrap();
let timelineid = ZTimelineId::from_str("22000000000000000000000000000000").unwrap();
fs::create_dir_all(conf.timeline_path(&timelineid, &tenantid))?;
Ok((conf, tenantid, timelineid))
}
#[test]
fn test_ordered_iter() -> Result<()> {
let (conf, tenantid, timelineid) = repo_harness("test_ordered_iter")?;
let file = EphemeralFile::create(conf, tenantid, timelineid)?;
let mut page_versions = PageVersions::new(file);
const BLOCKS: u32 = 1000;
const LSNS: u64 = 50;
let empty_page = Bytes::from_static(&[0u8; 8192]);
let empty_page_version = PageVersion::Page(empty_page);
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {
let old = page_versions.append_or_update_last(
blknum,
Lsn(lsn),
empty_page_version.clone(),
)?;
assert!(old.is_none());
}
}
let mut iter = page_versions.ordered_page_version_iter(None);
for blknum in 0..BLOCKS {
for lsn in 0..LSNS {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
const CUTOFF_LSN: Lsn = Lsn(30);
let mut iter = page_versions.ordered_page_version_iter(Some(CUTOFF_LSN));
for blknum in 0..BLOCKS {
for lsn in 0..CUTOFF_LSN.0 {
let (actual_blknum, actual_lsn, _pv) = iter.next().unwrap();
assert_eq!(actual_blknum, blknum);
assert_eq!(Lsn(lsn), actual_lsn);
}
}
assert!(iter.next().is_none());
assert!(iter.next().is_none()); // should be robust against excessive next() calls
Ok(())
}
}


@@ -71,15 +71,26 @@ pub enum PageVersion {
}
///
/// Data needed to reconstruct a page version
/// Struct used to communicate across calls to 'get_page_reconstruct_data'.
///
/// 'page_img' is the old base image of the page to start the WAL replay with.
/// It can be None, if the first WAL record initializes the page (will_init)
/// 'records' contains the records to apply over the base image.
/// Before first call to get_page_reconstruct_data, you can fill in 'page_img'
/// if you have an older cached version of the page available. That can save
/// work in 'get_page_reconstruct_data', as it can stop searching for page
/// versions when all the WAL records going back to the cached image have been
/// collected.
///
/// When get_page_reconstruct_data returns Complete, 'page_img' is set to an
/// image of the page, or the oldest WAL record in 'records' is a will_init-type
/// record that initializes the page without requiring a previous image.
///
/// If 'get_page_reconstruct_data' returns Continue, some 'records' may have
/// been collected, but there are more records outside the current layer. Pass
/// the same PageReconstructData struct in the next 'get_page_reconstruct_data'
/// call, to collect more records.
///
pub struct PageReconstructData {
pub records: Vec<(Lsn, ZenithWalRecord)>,
pub page_img: Option<Bytes>,
pub page_img: Option<(Lsn, Bytes)>,
}
/// Return value from Layer::get_page_reconstruct_data
@@ -93,8 +104,6 @@ pub enum PageReconstructResult {
/// the returned LSN. This is usually considered an error, but might be OK
/// in some circumstances.
Missing(Lsn),
/// Use the cached image at `cached_img_lsn` as the base image
Cached,
}
///
@@ -138,19 +147,16 @@ pub trait Layer: Send + Sync {
/// It is up to the caller to collect more data from previous layer and
/// perform WAL redo, if necessary.
///
/// `cached_img_lsn` should be set to a cached page image's lsn < `lsn`.
/// This function will only return data after `cached_img_lsn`.
///
/// See PageReconstructResult for possible return values. The collected data
/// is appended to reconstruct_data; the caller should pass an empty struct
/// on first call. If this returns PageReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data'
/// to collect more data.
/// on first call, or a struct with a cached older image of the page if one
/// is available. If this returns PageReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_page_reconstruct_data(
&self,
blknum: SegmentBlk,
lsn: Lsn,
cached_img_lsn: Option<Lsn>,
reconstruct_data: &mut PageReconstructData,
) -> Result<PageReconstructResult>;
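
To make the calling convention above concrete, here is a minimal, hedged sketch of how a caller could seed the struct with a cached image before the first call. It is not the pageserver's actual layer-walk code: `layer`, `blknum`, `lsn` and `cached_image` are assumed to be in scope, and the surrounding lookup loop is only described in comments.

    // Hedged sketch only: not the repository's actual layer-walk code.
    let mut reconstruct_data = PageReconstructData {
        records: Vec::new(),
        // Seed with a cached older image, if one is available; its LSN lets the
        // layer stop collecting WAL records once it reaches the cached version.
        page_img: cached_image, // e.g. Some((cached_lsn, image_bytes)) or None
    };
    // First call. If it returns PageReconstructResult::Continue, the caller is
    // expected to look up the predecessor layer and call the method again with
    // the same `reconstruct_data`, until Complete (or Missing) is returned.
    let result = layer.get_page_reconstruct_data(blknum, lsn, &mut reconstruct_data)?;
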


@@ -447,8 +447,6 @@ pub mod repo_harness {
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use crate::layered_repository::metadata::METADATA_FILE_NAME;
use super::repo_harness::*;
use super::*;
use postgres_ffi::{pg_constants, xlog_utils::SIZEOF_CHECKPOINT};
@@ -746,8 +744,8 @@ mod tests {
let mut lsn = 0x10;
for blknum in 0..pg_constants::RELSEG_SIZE + 1 {
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
lsn += 0x10;
let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn)));
writer.put_page_image(TESTREL_A, blknum as BlockNumber, Lsn(lsn), img)?;
}
writer.advance_last_record_lsn(Lsn(lsn));
@@ -1132,141 +1130,4 @@ mod tests {
Ok(())
}
#[test]
fn corrupt_metadata() -> Result<()> {
const TEST_NAME: &str = "corrupt_metadata";
let harness = RepoHarness::create(TEST_NAME)?;
let repo = harness.load();
repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
drop(repo);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
assert!(metadata_path.is_file());
let mut metadata_bytes = std::fs::read(&metadata_path)?;
assert_eq!(metadata_bytes.len(), 512);
metadata_bytes[512 - 4 - 2] ^= 1;
std::fs::write(metadata_path, metadata_bytes)?;
let new_repo = harness.load();
let err = new_repo.get_timeline(TIMELINE_ID).err().unwrap();
assert_eq!(err.to_string(), "failed to load metadata");
assert_eq!(
err.source().unwrap().to_string(),
"metadata checksum mismatch"
);
Ok(())
}
#[test]
fn future_layerfiles() -> Result<()> {
const TEST_NAME: &str = "future_layerfiles";
let harness = RepoHarness::create(TEST_NAME)?;
let repo = harness.load();
// Create a timeline with disk_consistent_lsn = 8000
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
let writer = tline.writer();
writer.advance_last_record_lsn(Lsn(0x8000));
drop(writer);
repo.checkpoint_iteration(CheckpointConfig::Forced)?;
drop(repo);
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let make_empty_file = |filename: &str| -> std::io::Result<()> {
let path = timeline_path.join(filename);
assert!(!path.exists());
std::fs::write(&path, &[])?;
Ok(())
};
// Helper function to check that a relation file exists, and a corresponding
// <filename>.0.old file does not.
let assert_exists = |filename: &str| {
let path = timeline_path.join(filename);
assert!(path.exists(), "file {} was removed", filename);
// Check that there is no .old file
let backup_path = timeline_path.join(format!("{}.0.old", filename));
assert!(
!backup_path.exists(),
"unexpected backup file {}",
backup_path.display()
);
};
// Helper function to check that a relation file does *not* exists, and a corresponding
// <filename>.<num>.old file does.
let assert_is_renamed = |filename: &str, num: u32| {
let path = timeline_path.join(filename);
assert!(
!path.exists(),
"file {} was not removed as expected",
filename
);
let backup_path = timeline_path.join(format!("{}.{}.old", filename, num));
assert!(
backup_path.exists(),
"backup file {} was not created",
backup_path.display()
);
};
// These files are considered to be in the future and will be renamed out
// of the way
let future_filenames = vec![
format!("pg_control_0_{:016X}", 0x8001),
format!("pg_control_0_{:016X}_{:016X}", 0x8001, 0x8008),
];
// But these are not:
let past_filenames = vec![
format!("pg_control_0_{:016X}", 0x8000),
format!("pg_control_0_{:016X}_{:016X}", 0x7000, 0x8001),
];
for filename in future_filenames.iter().chain(past_filenames.iter()) {
make_empty_file(filename)?;
}
// Load the timeline. This will cause the files in the "future" to be renamed
// away.
let new_repo = harness.load();
new_repo.get_timeline(TIMELINE_ID).unwrap();
drop(new_repo);
for filename in future_filenames.iter() {
assert_is_renamed(filename, 0);
}
for filename in past_filenames.iter() {
assert_exists(filename);
}
// Create the future files again, and load again. They should be renamed to
// *.1.old this time.
for filename in future_filenames.iter() {
make_empty_file(filename)?;
}
let new_repo = harness.load();
new_repo.get_timeline(TIMELINE_ID).unwrap();
drop(new_repo);
for filename in future_filenames.iter() {
assert_is_renamed(filename, 0);
assert_is_renamed(filename, 1);
}
for filename in past_filenames.iter() {
assert_exists(filename);
}
Ok(())
}
}


@@ -268,12 +268,11 @@ impl XlXactParsedRecord {
let info = xl_info & pg_constants::XLOG_XACT_OPMASK;
// The record starts with time of commit/abort
let xact_time = buf.get_i64_le();
let xinfo;
if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
xinfo = buf.get_u32_le();
let xinfo = if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
buf.get_u32_le()
} else {
xinfo = 0;
}
0
};
let db_id;
let ts_id;
if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
@@ -502,7 +501,6 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
0..=pg_constants::XLR_MAX_BLOCK_ID => {
/* XLogRecordBlockHeader */
let mut blk = DecodedBkpBlock::new();
let fork_flags: u8;
if block_id <= max_block_id {
// TODO
@@ -515,7 +513,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
}
max_block_id = block_id;
fork_flags = buf.get_u8();
let fork_flags: u8 = buf.get_u8();
blk.forknum = fork_flags & pg_constants::BKPBLOCK_FORK_MASK;
blk.flags = fork_flags;
blk.has_image = (fork_flags & pg_constants::BKPBLOCK_HAS_IMAGE) != 0;


@@ -102,8 +102,6 @@ impl crate::walredo::WalRedoManager for DummyRedoManager {
}
}
static TIMEOUT: Duration = Duration::from_secs(20);
// Metrics collected on WAL redo operations
//
// We collect the time spent in actual WAL redo ('redo'), and time waiting
@@ -221,7 +219,14 @@ impl WalRedoManager for PostgresRedoManager {
let result = if batch_zenith {
self.apply_batch_zenith(rel, blknum, lsn, img, &records[batch_start..i])
} else {
self.apply_batch_postgres(rel, blknum, lsn, img, &records[batch_start..i])
self.apply_batch_postgres(
rel,
blknum,
lsn,
img,
&records[batch_start..i],
self.conf.wal_redo_timeout,
)
};
img = Some(result?);
@@ -233,7 +238,14 @@ impl WalRedoManager for PostgresRedoManager {
if batch_zenith {
self.apply_batch_zenith(rel, blknum, lsn, img, &records[batch_start..])
} else {
self.apply_batch_postgres(rel, blknum, lsn, img, &records[batch_start..])
self.apply_batch_postgres(
rel,
blknum,
lsn,
img,
&records[batch_start..],
self.conf.wal_redo_timeout,
)
}
}
}
@@ -261,6 +273,7 @@ impl PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: &[(Lsn, ZenithWalRecord)],
wal_redo_timeout: Duration,
) -> Result<Bytes, WalRedoError> {
let start_time = Instant::now();
@@ -281,7 +294,7 @@ impl PostgresRedoManager {
let result = if let RelishTag::Relation(rel) = rel {
// Relational WAL records are applied using wal-redo-postgres
let buf_tag = BufferTag { rel, blknum };
apply_result = process.apply_wal_records(buf_tag, base_img, records);
apply_result = process.apply_wal_records(buf_tag, base_img, records, wal_redo_timeout);
apply_result.map_err(WalRedoError::IoError)
} else {
@@ -603,6 +616,7 @@ impl PostgresRedoProcess {
tag: BufferTag,
base_img: Option<Bytes>,
records: &[(Lsn, ZenithWalRecord)],
wal_redo_timeout: Duration,
) -> Result<Bytes, std::io::Error> {
// Serialize all the messages to send the WAL redo process first.
//
@@ -653,7 +667,7 @@ impl PostgresRedoProcess {
// If we have more data to write, wake up if 'stdin' becomes writeable or
// we have data to read. Otherwise only wake up if there's data to read.
let nfds = if nwrite < writebuf.len() { 3 } else { 2 };
let n = nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
let n = nix::poll::poll(&mut pollfds[0..nfds], wal_redo_timeout.as_millis() as i32)?;
if n == 0 {
return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));

poetry.lock generated

@@ -91,6 +91,14 @@ botocore = ">=1.11.3"
future = "*"
wrapt = "*"
[[package]]
name = "backoff"
version = "1.11.1"
description = "Function decoration for backoff and retry"
category = "main"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
[[package]]
name = "boto3"
version = "1.20.40"
@@ -814,11 +822,11 @@ python-versions = "*"
[[package]]
name = "moto"
version = "3.0.0"
version = "3.0.4"
description = "A library that allows your python tests to easily mock out the boto library"
category = "main"
optional = false
python-versions = "*"
python-versions = ">=3.6"
[package.dependencies]
aws-xray-sdk = {version = ">=0.93,<0.96 || >0.96", optional = true, markers = "extra == \"server\""}
@@ -848,7 +856,8 @@ xmltodict = "*"
[package.extras]
all = ["PyYAML (>=5.1)", "python-jose[cryptography] (>=3.1.0,<4.0.0)", "ecdsa (!=0.15)", "docker (>=2.5.1)", "graphql-core", "jsondiff (>=1.1.2)", "aws-xray-sdk (>=0.93,!=0.96)", "idna (>=2.5,<4)", "cfn-lint (>=0.4.0)", "sshpubkeys (>=3.1.0)", "setuptools"]
apigateway = ["python-jose[cryptography] (>=3.1.0,<4.0.0)", "ecdsa (!=0.15)"]
apigateway = ["PyYAML (>=5.1)", "python-jose[cryptography] (>=3.1.0,<4.0.0)", "ecdsa (!=0.15)"]
apigatewayv2 = ["PyYAML (>=5.1)"]
appsync = ["graphql-core"]
awslambda = ["docker (>=2.5.1)"]
batch = ["docker (>=2.5.1)"]
@@ -1352,7 +1361,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata]
lock-version = "1.1"
python-versions = "^3.7"
content-hash = "0fa6c9377fbc827240d18d8b7e3742def37e90fc3277fddf8525d82dabd13090"
content-hash = "58762accad4122026c650fa43421a900546e89f9908e2268410e7b11cc8c6c4e"
[metadata.files]
aiopg = [
@@ -1395,6 +1404,10 @@ aws-xray-sdk = [
{file = "aws-xray-sdk-2.9.0.tar.gz", hash = "sha256:b0cd972db218d4d8f7b53ad806fc6184626b924c4997ae58fc9f2a8cd1281568"},
{file = "aws_xray_sdk-2.9.0-py2.py3-none-any.whl", hash = "sha256:98216b3ac8281b51b59a8703f8ec561c460807d9d0679838f5c0179d381d7e58"},
]
backoff = [
{file = "backoff-1.11.1-py2.py3-none-any.whl", hash = "sha256:61928f8fa48d52e4faa81875eecf308eccfb1016b018bb6bd21e05b5d90a96c5"},
{file = "backoff-1.11.1.tar.gz", hash = "sha256:ccb962a2378418c667b3c979b504fdeb7d9e0d29c0579e3b13b86467177728cb"},
]
boto3 = [
{file = "boto3-1.20.40-py3-none-any.whl", hash = "sha256:cfe85589e4a0a997c7b9ae7432400b03fa6fa5fea29fdc48db3099a903b76998"},
{file = "boto3-1.20.40.tar.gz", hash = "sha256:66aef9a6d8cad393f69166112ba49e14e2c6766f9278c96134101314a9af2992"},
@@ -1666,8 +1679,8 @@ mccabe = [
{file = "mccabe-0.6.1.tar.gz", hash = "sha256:dd8d182285a0fe56bace7f45b5e7d1a6ebcbf524e8f3bd87eb0f125271b8831f"},
]
moto = [
{file = "moto-3.0.0-py2.py3-none-any.whl", hash = "sha256:762d33bbad3642c687f6495e69331318bef43f9aa662174397706ec3ad2a3578"},
{file = "moto-3.0.0.tar.gz", hash = "sha256:d6b00a2663290e7ebb06823d5ffcb124c8dc9bf526b878539ef7c4a377fd8255"},
{file = "moto-3.0.4-py2.py3-none-any.whl", hash = "sha256:79646213d8438385182f4eea79e28725f94b3d0d3dc9a3eda81db47e0ebef6cc"},
{file = "moto-3.0.4.tar.gz", hash = "sha256:168b8a3cb4dd8a6df8e51d582761cefa9657b9f45ac7e1eb24dae394ebc9e000"},
]
mypy = [
{file = "mypy-0.910-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:a155d80ea6cee511a3694b108c4494a39f42de11ee4e61e72bc424c490e46457"},


@@ -7,11 +7,13 @@ use std::collections::HashMap;
use tokio::io::{AsyncRead, AsyncWrite};
use zenith_utils::pq_proto::{BeMessage as Be, BeParameterStatusMessage, FeMessage as Fe};
// TODO rename the struct to ClientParams or something
/// Various client credentials which we use for authentication.
#[derive(Debug, PartialEq, Eq)]
pub struct ClientCredentials {
pub user: String,
pub dbname: String,
pub options: Option<String>,
}
impl TryFrom<HashMap<String, String>> for ClientCredentials {
@@ -25,9 +27,22 @@ impl TryFrom<HashMap<String, String>> for ClientCredentials {
};
let user = get_param("user")?;
let db = get_param("database")?;
let dbname = get_param("database")?;
Ok(Self { user, dbname: db })
// TODO see what other options should be recognized, possibly all.
let options = match get_param("search_path") {
Ok(path) => Some(format!("-c search_path={}", path)),
Err(_) => None,
};
// TODO investigate why "" is always a key
// TODO warn on unrecognized options?
Ok(Self {
user,
dbname,
options,
})
}
}
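
As an illustration of the conversion above, a hedged, unit-test-style sketch with hypothetical values (it assumes the module's imports, e.g. std::collections::HashMap and TryFrom, are in scope):

    // Hypothetical example of parsing startup parameters into ClientCredentials.
    let mut params = HashMap::new();
    params.insert("user".to_owned(), "alice".to_owned());
    params.insert("database".to_owned(), "main".to_owned());
    params.insert("search_path".to_owned(), "tmp_schema_1".to_owned());

    let creds = ClientCredentials::try_from(params).unwrap();
    assert_eq!(creds.user, "alice");
    assert_eq!(creds.dbname, "main");
    // The recognized search_path option is turned into a postgres startup option
    // that will later be forwarded to the compute node.
    assert_eq!(creds.options.as_deref(), Some("-c search_path=tmp_schema_1"));
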
@@ -85,6 +100,7 @@ async fn handle_static(
dbname: creds.dbname.clone(),
user: creds.user.clone(),
password: Some(cleartext_password.into()),
options: creds.options,
};
client
@@ -117,15 +133,22 @@ async fn handle_existing_user(
.ok_or_else(|| anyhow!("unexpected password message"))?;
let cplane = CPlaneApi::new(&config.auth_endpoint);
let db_info = cplane
.authenticate_proxy_request(creds, md5_response, &md5_salt, &psql_session_id)
let db_info_response = cplane
.authenticate_proxy_request(&creds, md5_response, &md5_salt, &psql_session_id)
.await?;
client
.write_message_noflush(&Be::AuthenticationOk)?
.write_message_noflush(&BeParameterStatusMessage::encoding())?;
Ok(db_info)
Ok(DatabaseInfo {
host: db_info_response.host,
port: db_info_response.port,
dbname: db_info_response.dbname,
user: db_info_response.user,
password: db_info_response.password,
options: creds.options,
})
}
async fn handle_new_user(
@@ -135,7 +158,7 @@ async fn handle_new_user(
let psql_session_id = new_psql_session_id();
let greeting = hello_message(&config.redirect_uri, &psql_session_id);
let db_info = cplane_api::with_waiter(psql_session_id, |waiter| async {
let db_info_response = cplane_api::with_waiter(psql_session_id, |waiter| async {
// Give user a URL to spawn a new database
client
.write_message_noflush(&Be::AuthenticationOk)?
@@ -150,7 +173,14 @@ async fn handle_new_user(
client.write_message_noflush(&Be::NoticeResponse("Connecting to database.".into()))?;
Ok(db_info)
Ok(DatabaseInfo {
host: db_info_response.host,
port: db_info_response.port,
dbname: db_info_response.dbname,
user: db_info_response.user,
password: db_info_response.password,
options: None,
})
}
fn hello_message(redirect_uri: &str, session_id: &str) -> String {


@@ -10,6 +10,7 @@ pub struct DatabaseInfo {
pub dbname: String,
pub user: String,
pub password: Option<String>,
pub options: Option<String>,
}
impl DatabaseInfo {
@@ -33,6 +34,10 @@ impl From<DatabaseInfo> for tokio_postgres::Config {
.dbname(&db_info.dbname)
.user(&db_info.user);
if let Some(options) = db_info.options {
config.options(&options);
}
if let Some(password) = db_info.password {
config.password(password);
}
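
Downstream, that options string ends up on the tokio_postgres::Config via the conversion above. A hedged usage sketch with hypothetical values (not taken from the proxy code):

    let db_info = DatabaseInfo {
        host: "127.0.0.1".into(),
        port: 5432,
        dbname: "main".into(),
        user: "alice".into(),
        password: Some("secret".into()),
        options: Some("-c search_path=tmp_schema_1".into()),
    };
    // The `options` value becomes the libpq-style `options` startup parameter,
    // so the compute session starts with the requested search_path.
    let config: tokio_postgres::Config = db_info.into();
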


@@ -1,25 +1,37 @@
use crate::auth::ClientCredentials;
use crate::compute::DatabaseInfo;
use crate::waiters::{Waiter, Waiters};
use anyhow::{anyhow, bail};
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
/// Part of the legacy cplane responses
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct DatabaseInfoResponse {
pub host: String,
pub port: u16,
pub dbname: String,
pub user: String,
pub password: Option<String>,
}
lazy_static! {
static ref CPLANE_WAITERS: Waiters<Result<DatabaseInfo, String>> = Default::default();
static ref CPLANE_WAITERS: Waiters<Result<DatabaseInfoResponse, String>> = Default::default();
}
/// Give caller an opportunity to wait for cplane's reply.
pub async fn with_waiter<F, R, T>(psql_session_id: impl Into<String>, f: F) -> anyhow::Result<T>
where
F: FnOnce(Waiter<'static, Result<DatabaseInfo, String>>) -> R,
F: FnOnce(Waiter<'static, Result<DatabaseInfoResponse, String>>) -> R,
R: std::future::Future<Output = anyhow::Result<T>>,
{
let waiter = CPLANE_WAITERS.register(psql_session_id.into())?;
f(waiter).await
}
pub fn notify(psql_session_id: &str, msg: Result<DatabaseInfo, String>) -> anyhow::Result<()> {
pub fn notify(
psql_session_id: &str,
msg: Result<DatabaseInfoResponse, String>,
) -> anyhow::Result<()> {
CPLANE_WAITERS.notify(psql_session_id, msg)
}
@@ -37,11 +49,11 @@ impl<'a> CPlaneApi<'a> {
impl CPlaneApi<'_> {
pub async fn authenticate_proxy_request(
&self,
creds: ClientCredentials,
creds: &ClientCredentials,
md5_response: &[u8],
salt: &[u8; 4],
psql_session_id: &str,
) -> anyhow::Result<DatabaseInfo> {
) -> anyhow::Result<DatabaseInfoResponse> {
let mut url = reqwest::Url::parse(self.auth_endpoint)?;
url.query_pairs_mut()
.append_pair("login", &creds.user)
@@ -77,7 +89,7 @@ impl CPlaneApi<'_> {
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
enum ProxyAuthResponse {
Ready { conn_info: DatabaseInfo },
Ready { conn_info: DatabaseInfoResponse },
Error { error: String },
NotReady { ready: bool }, // TODO: get rid of `ready`
}
@@ -92,13 +104,13 @@ mod tests {
// Ready
let auth: ProxyAuthResponse = serde_json::from_value(json!({
"ready": true,
"conn_info": DatabaseInfo::default(),
"conn_info": DatabaseInfoResponse::default(),
}))
.unwrap();
assert!(matches!(
auth,
ProxyAuthResponse::Ready {
conn_info: DatabaseInfo { .. }
conn_info: DatabaseInfoResponse { .. }
}
));


@@ -122,7 +122,7 @@ async fn main() -> anyhow::Result<()> {
None => RouterConfig::Dynamic(auth_method),
Some(addr) => {
if let ClientAuthMethod::Password = auth_method {
let (host, port) = addr.split_once(":").unwrap();
let (host, port) = addr.split_once(':').unwrap();
RouterConfig::Static {
host: host.to_string(),
port: port.parse().unwrap(),


@@ -1,4 +1,4 @@
use crate::{compute::DatabaseInfo, cplane_api};
use crate::cplane_api;
use anyhow::Context;
use serde::Deserialize;
use std::{
@@ -75,7 +75,7 @@ struct PsqlSessionResponse {
#[derive(Deserialize)]
enum PsqlSessionResult {
Success(DatabaseInfo),
Success(cplane_api::DatabaseInfoResponse),
Failure(String),
}


@@ -1,4 +1,4 @@
use crate::auth;
use crate::auth::{self, ClientCredentials};
use crate::cancellation::{self, CancelClosure, CancelMap};
use crate::compute::DatabaseInfo;
use crate::config::{ProxyConfig, TlsConfig};
@@ -138,7 +138,6 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
stream.write_message(&Be::ErrorResponse(msg)).await?;
bail!(msg);
}
break Ok(Some((stream, params.try_into()?)));
}
CancelRequest(cancel_key_data) => {


@@ -21,6 +21,7 @@ types-psycopg2 = "^2.9.6"
boto3 = "^1.20.40"
boto3-stubs = "^1.20.40"
moto = {version = "^3.0.0", extras = ["server"]}
backoff = "^1.11.1"
[tool.poetry.dev-dependencies]
yapf = "==0.31.0"


@@ -93,7 +93,7 @@ def check_backpressure(pg: Postgres, stop_event: threading.Event, polling_interv
def test_backpressure_received_lsn_lag(zenith_env_builder: ZenithEnvBuilder):
zenith_env_builder.num_safekeepers = 1
env = zenith_env_builder.init()
env = zenith_env_builder.init_start()
# Create a branch for us
env.zenith_cli.create_branch("test_backpressure", "main")


@@ -0,0 +1,14 @@
import pytest
def test_proxy_select_1(static_proxy):
static_proxy.safe_psql("select 1;")
def test_proxy_options(static_proxy):
schema_name = "tmp_schema_1"
with static_proxy.connect(schema=schema_name) as conn:
with conn.cursor() as cur:
cur.execute("SHOW search_path;")
search_path = cur.fetchall()[0][0]
assert schema_name == search_path


@@ -78,8 +78,8 @@ def test_twophase(zenith_simple_env: ZenithEnv):
cur2.execute("ROLLBACK PREPARED 'insert_two'")
cur2.execute('SELECT * FROM foo')
assert cur2.fetchall() == [('one', ), ('three', )] # type: ignore[comparison-overlap]
assert cur2.fetchall() == [('one', ), ('three', )]
# Only one committed insert is visible on the original branch
cur.execute('SELECT * FROM foo')
assert cur.fetchall() == [('three', )] # type: ignore[comparison-overlap]
assert cur.fetchall() == [('three', )]


@@ -32,6 +32,7 @@ from typing_extensions import Literal
import pytest
import requests
import backoff # type: ignore
from .utils import (get_self_dir, mkdir_if_needed, subprocess_capture)
from fixtures.log_helper import log
@@ -237,14 +238,24 @@ def port_distributor(worker_base_port):
class PgProtocol:
""" Reusable connection logic """
def __init__(self, host: str, port: int, username: Optional[str] = None):
def __init__(self,
host: str,
port: int,
username: Optional[str] = None,
password: Optional[str] = None,
dbname: Optional[str] = None,
schema: Optional[str] = None):
self.host = host
self.port = port
self.username = username
self.password = password
self.dbname = dbname
self.schema = schema
def connstr(self,
*,
dbname: str = 'postgres',
dbname: Optional[str] = None,
schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None) -> str:
"""
@@ -252,6 +263,9 @@ class PgProtocol:
"""
username = username or self.username
password = password or self.password
dbname = dbname or self.dbname or "postgres"
schema = schema or self.schema
res = f'host={self.host} port={self.port} dbname={dbname}'
if username:
@@ -260,13 +274,17 @@ class PgProtocol:
if password:
res = f'{res} password={password}'
if schema:
res = f"{res} options='-c search_path={schema}'"
return res
# autocommit=True here by default because that's what we need most of the time
def connect(self,
*,
autocommit=True,
dbname: str = 'postgres',
dbname: Optional[str] = None,
schema: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None) -> PgConnection:
"""
@@ -275,11 +293,13 @@ class PgProtocol:
This method passes all extra params to connstr.
"""
conn = psycopg2.connect(self.connstr(
dbname=dbname,
username=username,
password=password,
))
conn = psycopg2.connect(
self.connstr(
dbname=dbname,
schema=schema,
username=username,
password=password,
))
# WARNING: this setting affects *all* tests!
conn.autocommit = autocommit
return conn
@@ -401,6 +421,7 @@ class ZenithEnvBuilder:
repo_dir: Path,
port_distributor: PortDistributor,
pageserver_remote_storage: Optional[RemoteStorage] = None,
pageserver_config_override: Optional[str] = None,
num_safekeepers: int = 0,
pageserver_auth_enabled: bool = False,
rust_log_override: Optional[str] = None):
@@ -408,6 +429,7 @@ class ZenithEnvBuilder:
self.rust_log_override = rust_log_override
self.port_distributor = port_distributor
self.pageserver_remote_storage = pageserver_remote_storage
self.pageserver_config_override = pageserver_config_override
self.num_safekeepers = num_safekeepers
self.pageserver_auth_enabled = pageserver_auth_enabled
self.env: Optional[ZenithEnv] = None
@@ -557,7 +579,8 @@ class ZenithEnv:
# Create a corresponding ZenithPageserver object
self.pageserver = ZenithPageserver(self,
port=pageserver_port,
remote_storage=config.pageserver_remote_storage)
remote_storage=config.pageserver_remote_storage,
config_override=config.pageserver_config_override)
# Create config and a Safekeeper object for each safekeeper
for i in range(1, config.num_safekeepers + 1):
@@ -837,14 +860,17 @@ class ZenithCli:
tmp.flush()
cmd = ['init', f'--config={tmp.name}']
append_pageserver_param_overrides(cmd, self.env.pageserver.remote_storage)
append_pageserver_param_overrides(cmd,
self.env.pageserver.remote_storage,
self.env.pageserver.config_override)
return self.raw_cli(cmd)
def pageserver_start(self, overrides=()) -> 'subprocess.CompletedProcess[str]':
start_args = ['pageserver', 'start', *overrides]
append_pageserver_param_overrides(start_args, self.env.pageserver.remote_storage)
append_pageserver_param_overrides(start_args,
self.env.pageserver.remote_storage,
self.env.pageserver.config_override)
return self.raw_cli(start_args)
def pageserver_stop(self, immediate=False) -> 'subprocess.CompletedProcess[str]':
@@ -989,16 +1015,19 @@ class ZenithPageserver(PgProtocol):
env: ZenithEnv,
port: PageserverPort,
remote_storage: Optional[RemoteStorage] = None,
config_override: Optional[str] = None,
enable_auth=False):
super().__init__(host='localhost', port=port.pg, username='zenith_admin')
self.env = env
self.running = False
self.service_port = port # do not shadow PgProtocol.port which is just int
self.remote_storage = remote_storage
self.config_override = config_override
def start(self, overrides=()) -> 'ZenithPageserver':
"""
Start the page server.
`overrides` allows to add some config to this pageserver start.
Returns self.
"""
assert self.running == False
@@ -1031,8 +1060,11 @@ class ZenithPageserver(PgProtocol):
)
def append_pageserver_param_overrides(params_to_update: List[str],
pageserver_remote_storage: Optional[RemoteStorage]):
def append_pageserver_param_overrides(
params_to_update: List[str],
pageserver_remote_storage: Optional[RemoteStorage],
pageserver_config_override: Optional[str] = None,
):
if pageserver_remote_storage is not None:
if isinstance(pageserver_remote_storage, LocalFsStorage):
pageserver_storage_override = f"local_path='{pageserver_remote_storage.root}'"
@@ -1058,6 +1090,12 @@ def append_pageserver_param_overrides(params_to_update: List[str],
f'--pageserver-config-override={o.strip()}' for o in env_overrides.split(';')
]
if pageserver_config_override is not None:
params_to_update += [
f'--pageserver-config-override={o.strip()}'
for o in pageserver_config_override.split(';')
]
class PgBin:
""" A helper class for executing postgres binaries """
@@ -1164,6 +1202,50 @@ def vanilla_pg(test_output_dir: str) -> Iterator[VanillaPostgres]:
yield vanilla_pg
class ZenithProxy(PgProtocol):
def __init__(self, port: int):
super().__init__(host="127.0.0.1", username="pytest", password="pytest", port=port)
self.http_port = 7001
self._popen: Optional[subprocess.Popen[bytes]] = None
def start_static(self, addr="127.0.0.1:5432") -> None:
assert self._popen is None
# Start proxy
bin_proxy = os.path.join(str(zenith_binpath), 'proxy')
args = [bin_proxy]
args.extend(["--http", f"{self.host}:{self.http_port}"])
args.extend(["--proxy", f"{self.host}:{self.port}"])
args.extend(["--auth-method", "password"])
args.extend(["--static-router", addr])
self._popen = subprocess.Popen(args)
self._wait_until_ready()
@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=10)
def _wait_until_ready(self):
requests.get(f"http://{self.host}:{self.http_port}/v1/status")
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
if self._popen is not None:
# NOTE the process will die when we're done with tests anyway, because
# it's a child process. This is mostly to clean up in between different tests.
self._popen.kill()
@pytest.fixture(scope='function')
def static_proxy(vanilla_pg) -> Iterator[ZenithProxy]:
"""Zenith proxy that routes directly to vanilla postgres."""
vanilla_pg.start()
vanilla_pg.safe_psql("create user pytest with password 'pytest';")
with ZenithProxy(4432) as proxy:
proxy.start_static()
yield proxy
class Postgres(PgProtocol):
""" An object representing a running postgres daemon. """
def __init__(self, env: ZenithEnv, tenant_id: uuid.UUID, port: int):


@@ -41,6 +41,12 @@ fn main() -> Result<()> {
.takes_value(true)
.help("Path to the safekeeper data directory"),
)
.arg(
Arg::new("init")
.long("init")
.takes_value(false)
.help("Initialize safekeeper with ID"),
)
.arg(
Arg::new("listen-pg")
.short('l')
@@ -151,10 +157,10 @@ fn main() -> Result<()> {
));
}
start_safekeeper(conf, given_id)
start_safekeeper(conf, given_id, arg_matches.is_present("init"))
}
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>) -> Result<()> {
fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>, init: bool) -> Result<()> {
let log_file = logging::init("safekeeper.log", conf.daemonize)?;
info!("version: {}", GIT_VERSION);
@@ -171,6 +177,9 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<ZNodeId>) -> Resu
// Set or read our ID.
set_id(&mut conf, given_id)?;
if init {
return Ok(());
}
let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
error!("failed to bind to address {}: {}", conf.listen_http_addr, e);


@@ -3,6 +3,7 @@
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::receive_wal::ReceiveWalConn;
use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
use crate::send_wal::ReplicationConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::SafeKeeperConf;
@@ -160,6 +161,17 @@ impl SafekeeperPostgresHandler {
}
}
/// Shortcut for calling `process_msg` in the timeline.
pub fn process_safekeeper_msg(
&self,
msg: &ProposerAcceptorMessage,
) -> Result<Option<AcceptorProposerMessage>> {
self.timeline
.get()
.process_msg(msg)
.context("failed to process ProposerAcceptorMessage")
}
///
/// Handle IDENTIFY_SYSTEM replication command
///


@@ -2,15 +2,21 @@
//! Gets messages from the network, passes them down to consensus module and
//! sends replies back.
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use anyhow::{anyhow, bail, Result};
use bytes::BytesMut;
use tokio::sync::mpsc::UnboundedSender;
use tracing::*;
use zenith_utils::sock_split::ReadStream;
use crate::timeline::Timeline;
use std::net::SocketAddr;
use std::sync::mpsc::channel;
use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::thread;
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
@@ -46,21 +52,6 @@ impl<'pg> ReceiveWalConn<'pg> {
}
}
// Read and extract the bytes of a `CopyData` message from the postgres instance
fn read_msg_bytes(&mut self) -> Result<Bytes> {
match self.pg_backend.read_message()? {
Some(FeMessage::CopyData(bytes)) => Ok(bytes),
Some(msg) => bail!("expected `CopyData` message, found {:?}", msg),
None => bail!("connection closed unexpectedly"),
}
}
// Read and parse message sent from the postgres instance
fn read_msg(&mut self) -> Result<ProposerAcceptorMessage> {
let data = self.read_msg_bytes()?;
ProposerAcceptorMessage::parse(data)
}
// Send message to the postgres
fn write_msg(&mut self, msg: &AcceptorProposerMessage) -> Result<()> {
let mut buf = BytesMut::with_capacity(128);
@@ -77,18 +68,22 @@ impl<'pg> ReceiveWalConn<'pg> {
self.pg_backend
.write_message(&BeMessage::CopyBothResponse)?;
let r = self
.pg_backend
.take_stream_in()
.ok_or_else(|| anyhow!("failed to take read stream from pgbackend"))?;
let mut poll_reader = ProposerPollStream::new(r)?;
// Receive information about server
let mut msg = self
.read_msg()
.context("failed to receive proposer greeting")?;
match msg {
let next_msg = poll_reader.recv_msg()?;
match next_msg {
ProposerAcceptorMessage::Greeting(ref greeting) => {
info!(
"start handshake with wal proposer {} sysid {} timeline {}",
self.peer_addr, greeting.system_id, greeting.tli,
);
}
_ => bail!("unexpected message {:?} instead of greeting", msg),
_ => bail!("unexpected message {:?} instead of greeting", next_msg),
}
// Register the connection and defer unregister.
@@ -100,16 +95,97 @@ impl<'pg> ReceiveWalConn<'pg> {
callmemaybe_tx: spg.tx.clone(),
};
let mut next_msg = Some(next_msg);
loop {
let reply = spg
.timeline
.get()
.process_msg(&msg)
.context("failed to process ProposerAcceptorMessage")?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
if matches!(next_msg, Some(ProposerAcceptorMessage::AppendRequest(_))) {
// poll AppendRequests without blocking and write WAL to disk without flushing,
// while it's readily available
while let Some(ProposerAcceptorMessage::AppendRequest(append_request)) = next_msg {
let msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
let reply = spg.process_safekeeper_msg(&msg)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
next_msg = poll_reader.poll_msg();
}
// flush all written WAL to the disk
let reply = spg.process_safekeeper_msg(&ProposerAcceptorMessage::FlushWAL)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
} else if let Some(msg) = next_msg.take() {
// process other message
let reply = spg.process_safekeeper_msg(&msg)?;
if let Some(reply) = reply {
self.write_msg(&reply)?;
}
}
msg = self.read_msg()?;
// blocking wait for the next message
if next_msg.is_none() {
next_msg = Some(poll_reader.recv_msg()?);
}
}
}
}
struct ProposerPollStream {
msg_rx: Receiver<ProposerAcceptorMessage>,
read_thread: Option<thread::JoinHandle<Result<()>>>,
}
impl ProposerPollStream {
fn new(mut r: ReadStream) -> Result<Self> {
let (msg_tx, msg_rx) = channel();
let read_thread = thread::Builder::new()
.name("Read WAL thread".into())
.spawn(move || -> Result<()> {
loop {
let copy_data = match FeMessage::read(&mut r)? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(msg) => bail!("expected `CopyData` message, found {:?}", msg),
None => bail!("connection closed unexpectedly"),
};
let msg = ProposerAcceptorMessage::parse(copy_data)?;
msg_tx.send(msg)?;
}
// msg_tx will be dropped here, which will also close msg_rx
})?;
Ok(Self {
msg_rx,
read_thread: Some(read_thread),
})
}
fn recv_msg(&mut self) -> Result<ProposerAcceptorMessage> {
self.msg_rx.recv().map_err(|_| {
// return error from the read thread
let res = match self.read_thread.take() {
Some(thread) => thread.join(),
None => return anyhow!("read thread is gone"),
};
match res {
Ok(Ok(())) => anyhow!("unexpected result from read thread"),
Err(err) => anyhow!("read thread panicked: {:?}", err),
Ok(Err(err)) => err,
}
})
}
fn poll_msg(&mut self) -> Option<ProposerAcceptorMessage> {
let res = self.msg_rx.try_recv();
match res {
Err(_) => None,
Ok(msg) => Some(msg),
}
}
}


@@ -301,6 +301,8 @@ pub enum ProposerAcceptorMessage {
VoteRequest(VoteRequest),
Elected(ProposerElected),
AppendRequest(AppendRequest),
NoFlushAppendRequest(AppendRequest),
FlushWAL,
}
impl ProposerAcceptorMessage {
@@ -499,7 +501,11 @@ where
ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg),
ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg),
ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg),
ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg, true),
ProposerAcceptorMessage::NoFlushAppendRequest(msg) => {
self.handle_append_request(msg, false)
}
ProposerAcceptorMessage::FlushWAL => self.handle_flush(),
}
}
@@ -605,7 +611,10 @@ where
return Ok(None);
}
// truncate wal, update the lsns
// TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
// intersection of our history and history from msg
// truncate wal, update the LSNs
self.wal_store.truncate_wal(msg.start_streaming_at)?;
// and now adopt term history from proposer
@@ -622,6 +631,7 @@ where
fn handle_append_request(
&mut self,
msg: &AppendRequest,
mut require_flush: bool,
) -> Result<Option<AcceptorProposerMessage>> {
if self.s.acceptor_state.term < msg.h.term {
bail!("got AppendRequest before ProposerElected");
@@ -650,9 +660,15 @@ where
if self.s.wal_start_lsn == Lsn(0) {
self.s.wal_start_lsn = msg.h.begin_lsn;
sync_control_file = true;
require_flush = true;
}
}
// flush wal to the disk, if required
if require_flush {
self.wal_store.flush_wal()?;
}
// Advance commit_lsn taking into account what we have locally.
// commit_lsn can be 0, being unknown to new walproposer while he hasn't
// collected majority of its epoch acks yet, ignore it in this case.
@@ -670,11 +686,9 @@ where
}
self.truncate_lsn = msg.h.truncate_lsn;
/*
* Update truncate and commit LSN in control file.
* To avoid negative impact on performance of extra fsync, do it only
* when truncate_lsn delta exceeds WAL segment size.
*/
// Update truncate and commit LSN in control file.
// To avoid negative impact on performance of extra fsync, do it only
// when truncate_lsn delta exceeds WAL segment size.
sync_control_file |=
self.s.truncate_lsn + (self.s.server.wal_seg_size as u64) < self.truncate_lsn;
if sync_control_file {
@@ -686,6 +700,11 @@ where
self.control_store.persist(&self.s)?;
}
// If flush_lsn hasn't been updated, AppendResponse is not very useful.
if !require_flush {
return Ok(None);
}
let resp = self.append_response();
trace!(
"processed AppendRequest of len {}, end_lsn={:?}, commit_lsn={:?}, truncate_lsn={:?}, resp {:?}",
@@ -697,6 +716,14 @@ where
);
Ok(Some(AcceptorProposerMessage::AppendResponse(resp)))
}
/// Flush WAL to disk. Return AppendResponse with latest LSNs.
fn handle_flush(&mut self) -> Result<Option<AcceptorProposerMessage>> {
self.wal_store.flush_wal()?;
Ok(Some(AcceptorProposerMessage::AppendResponse(
self.append_response(),
)))
}
}
#[cfg(test)]
@@ -738,6 +765,10 @@ mod tests {
self.lsn = end_pos;
Ok(())
}
fn flush_wal(&mut self) -> Result<()> {
Ok(())
}
}
#[test]


@@ -7,7 +7,7 @@
//!
//! Note that last file has `.partial` suffix, that's different from postgres.
use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, bail, Context, Result};
use std::io::{Read, Seek, SeekFrom};
use lazy_static::lazy_static;
@@ -58,12 +58,20 @@ lazy_static! {
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_write_wal_seconds histogram vec");
static ref FLUSH_WAL_SECONDS: HistogramVec = register_histogram_vec!(
"safekeeper_flush_wal_seconds",
"Seconds spent syncing WAL to a disk, grouped by timeline",
&["tenant_id", "timeline_id"],
DISK_WRITE_SECONDS_BUCKETS.to_vec()
)
.expect("Failed to register safekeeper_flush_wal_seconds histogram vec");
}
struct WalStorageMetrics {
flush_lsn: Gauge,
write_wal_bytes: Histogram,
write_wal_seconds: Histogram,
flush_wal_seconds: Histogram,
}
impl WalStorageMetrics {
@@ -74,24 +82,38 @@ impl WalStorageMetrics {
flush_lsn: FLUSH_LSN_GAUGE.with_label_values(&[&tenant_id, &timeline_id]),
write_wal_bytes: WRITE_WAL_BYTES.with_label_values(&[&tenant_id, &timeline_id]),
write_wal_seconds: WRITE_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
flush_wal_seconds: FLUSH_WAL_SECONDS.with_label_values(&[&tenant_id, &timeline_id]),
}
}
}
pub trait Storage {
/// lsn of last durably stored WAL record.
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
/// Init storage with wal_seg_size and read WAL from disk to get latest lsn.
/// Init storage with wal_seg_size and read WAL from disk to get latest LSN.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()>;
/// Write piece of wal in buf to disk and sync it.
/// Write piece of WAL from buf to disk, but not necessarily sync it.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
// Truncate WAL at specified LSN.
/// Truncate WAL at specified LSN, which must be the end of WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
/// Durably store WAL on disk, up to the last written WAL record.
fn flush_wal(&mut self) -> Result<()>;
}
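
The point of splitting write_wal from flush_wal is that a batch of appends can share one sync. Below is a minimal, hedged sketch of the intended call pattern with a hypothetical caller; the real batching lives in the safekeeper's receive_wal handler, and the `Result` alias is assumed to be the module's anyhow::Result.

    // Hedged sketch: write a batch of WAL chunks, then sync once.
    fn append_batch<S: Storage>(storage: &mut S, batch: &[(Lsn, Vec<u8>)]) -> Result<()> {
        for (start_lsn, bytes) in batch {
            // Goes to the (cached) segment file, not necessarily synced yet.
            storage.write_wal(*start_lsn, bytes)?;
        }
        // One fdatasync for the whole batch; flush_lsn() then covers all of it.
        storage.flush_wal()?;
        Ok(())
    }
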
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage must be initialized before use.
///
/// WAL is stored in segments; each segment is a file. The last segment has a ".partial" suffix in
/// its filename and may not be fully flushed.
///
/// Relationship of LSNs:
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is just created, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
zttid: ZTenantTimelineId,
@@ -99,27 +121,29 @@ pub struct PhysicalStorage {
conf: SafeKeeperConf,
// fields below are filled upon initialization
// None if unitialized, Some(lsn) if storage is initialized
/// None if uninitialized, Some(usize) if storage is initialized.
wal_seg_size: Option<usize>,
// Relationship of lsns:
// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
//
// All lsns are zeroes, if storage is just created, and there are no segments on disk.
// Written to disk, but possibly still in the cache and not fully persisted.
// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
/// Written to disk, but possibly still in the cache and not fully persisted.
/// Can also be ahead of record_lsn, if it happens to be in the middle of a WAL record.
write_lsn: Lsn,
// The LSN of the last WAL record written to disk. Still can be not fully flushed.
/// The LSN of the last WAL record written to disk. Still can be not fully flushed.
write_record_lsn: Lsn,
// The LSN of the last WAL record flushed to disk.
/// The LSN of the last WAL record flushed to disk.
flush_record_lsn: Lsn,
// Decoder is required for detecting boundaries of WAL records.
/// Decoder is required for detecting boundaries of WAL records.
decoder: WalStreamDecoder,
/// Cached open file for the last segment.
///
/// If Some(file) is open, then it always:
/// - has ".partial" suffix
/// - points to write_lsn, so no seek is needed for writing
/// - doesn't point to the end of the segment
file: Option<File>,
}
impl PhysicalStorage {
@@ -135,128 +159,146 @@ impl PhysicalStorage {
write_record_lsn: Lsn(0),
flush_record_lsn: Lsn(0),
decoder: WalStreamDecoder::new(Lsn(0)),
file: None,
}
}
// wrapper for flush_lsn updates that also updates metrics
/// Wrapper for flush_lsn updates that also updates metrics.
fn update_flush_lsn(&mut self) {
self.flush_record_lsn = self.write_record_lsn;
self.metrics.flush_lsn.set(self.flush_record_lsn.0 as f64);
}
/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(&self, segno: XLogSegNo) -> Result<(PathBuf, PathBuf)> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = self.timeline_dir.join(wal_file_name.clone());
let wal_file_partial_path = self.timeline_dir.join(wal_file_name + ".partial");
Ok((wal_file_path, wal_file_partial_path))
/// Call fdatasync if the config requires it.
fn fdatasync_file(&self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {
self.metrics
.flush_wal_seconds
.observe_closure_duration(|| file.sync_data())?;
}
Ok(())
}
// TODO: this function is going to be refactored soon, what will change:
// - flush will be called separately from write_wal, this function
// will only write bytes to disk
// - File will be cached in PhysicalStorage, to remove extra syscalls,
// such as open(), seek(), close()
fn write_and_flush(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
/// Call fsync if the config requires it.
fn fsync_file(&self, file: &mut File) -> Result<()> {
if !self.conf.no_sync {
self.metrics
.flush_wal_seconds
.observe_closure_duration(|| file.sync_all())?;
}
Ok(())
}
/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
Ok((file, false))
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
// Try to open existing partial file
Ok((file, true))
} else {
// Create and fill new partial file
let mut file = OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
write_zeroes(&mut file, wal_seg_size)?;
self.fsync_file(&mut file)?;
Ok((file, true))
}
}
/// Write WAL bytes, which are known to be located in a single WAL segment.
fn write_in_segment(
&mut self,
segno: u64,
xlogoff: usize,
buf: &[u8],
wal_seg_size: usize,
) -> Result<()> {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
};
file.write_all(buf)?;
if xlogoff + buf.len() == wal_seg_size {
// If we reached the end of a WAL segment, flush and close it.
self.fdatasync_file(&mut file)?;
// Rename partial file to completed file
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
fs::rename(&wal_file_partial_path, &wal_file_path)?;
} else {
// otherwise, file can be reused later
self.file = Some(file);
}
Ok(())
}
/// Writes WAL to the segment files until everything is written. If some segments
/// are fully written, they are flushed to disk. The last (partial) segment can
/// be flushed separately later.
///
/// Updates `write_lsn`.
fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
let mut bytes_left: usize = buf.len();
let mut bytes_written: usize = 0;
let mut partial;
let mut start_pos = startpos;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/* Extract WAL location for this block */
let mut xlogoff = start_pos.segment_offset(wal_seg_size) as usize;
while bytes_left != 0 {
let bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach wal
* segment size.
*/
if xlogoff + bytes_left > wal_seg_size {
bytes_to_write = wal_seg_size - xlogoff;
} else {
bytes_to_write = bytes_left;
if self.write_lsn != pos {
// need to flush the file before discarding it
if let Some(mut file) = self.file.take() {
self.fdatasync_file(&mut file)?;
}
/* Open file */
let segno = start_pos.segment_number(wal_seg_size);
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path)
{
/* Try to open existed partial file */
wal_file = file;
partial = true;
} else {
/* Create and fill new partial file */
partial = true;
match OpenOptions::new()
.create(true)
.write(true)
.open(&wal_file_partial_path)
{
Ok(mut file) => {
for _ in 0..(wal_seg_size / XLOG_BLCKSZ) {
file.write_all(ZERO_BLOCK)?;
}
wal_file = file;
}
Err(e) => {
error!("Failed to open log file {:?}: {}", &wal_file_path, e);
return Err(e.into());
}
}
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
wal_file.write_all(&buf[bytes_written..(bytes_written + bytes_to_write)])?;
// Flush file, if not said otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
start_pos += bytes_to_write as u64;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if start_pos.segment_offset(wal_seg_size) == 0 {
xlogoff = 0;
if partial {
fs::rename(&wal_file_partial_path, &wal_file_path)?;
}
}
self.write_lsn = pos;
}
while !buf.is_empty() {
// Extract WAL location for this block
let xlogoff = self.write_lsn.segment_offset(wal_seg_size) as usize;
let segno = self.write_lsn.segment_number(wal_seg_size);
// If crossing a WAL boundary, only write up until we reach wal segment size.
let bytes_write = if xlogoff + buf.len() > wal_seg_size {
wal_seg_size - xlogoff
} else {
buf.len()
};
self.write_in_segment(segno, xlogoff, &buf[..bytes_write], wal_seg_size)?;
self.write_lsn += bytes_write as u64;
buf = &buf[bytes_write..];
}
Ok(())
}
}
impl Storage for PhysicalStorage {
// flush_lsn returns lsn of last durably stored WAL record.
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
}
// Storage needs to know wal_seg_size to know which segment to read/write, but
// wal_seg_size is not always known at the moment of storage creation. This method
// allows to postpone its initialization.
/// Storage needs to know wal_seg_size to know which segment to read/write, but
/// wal_seg_size is not always known at the moment of storage creation. This method
/// allows postponing its initialization.
fn init_storage(&mut self, state: &SafeKeeperState) -> Result<()> {
if state.server.wal_seg_size == 0 {
// wal_seg_size is still unknown
@@ -294,29 +336,31 @@ impl Storage for PhysicalStorage {
Ok(())
}
// Write and flush WAL to disk.
/// Write WAL to disk.
fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.
// If we need to move the pointer, use truncate_wal() instead.
if self.write_lsn > startpos {
warn!(
bail!(
"write_wal rewrites WAL written before, write_lsn={}, startpos={}",
self.write_lsn, startpos
self.write_lsn,
startpos
);
}
if self.write_lsn < startpos {
warn!(
if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
bail!(
"write_wal creates gap in written WAL, write_lsn={}, startpos={}",
self.write_lsn, startpos
self.write_lsn,
startpos
);
// TODO: return error if write_lsn is not zero
}
{
let _timer = self.metrics.write_wal_seconds.start_timer();
self.write_and_flush(startpos, buf)?;
self.write_exact(startpos, buf)?;
}
// WAL is written and flushed, updating lsns
self.write_lsn = startpos + buf.len() as u64;
// WAL is written, updating write metrics
self.metrics.write_wal_bytes.observe(buf.len() as f64);
// figure out last record's end lsn for reporting (if we got the
@@ -339,69 +383,67 @@ impl Storage for PhysicalStorage {
}
}
Ok(())
}
fn flush_wal(&mut self) -> Result<()> {
if self.flush_record_lsn == self.write_record_lsn {
// no need to do extra flush
return Ok(());
}
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
self.file = Some(unflushed_file);
} else {
// We have unflushed data (write_lsn != flush_lsn), but no file.
// This should only happen if the last file was fully written and flushed,
// but flush_lsn hasn't been updated yet.
assert!(self.write_lsn.segment_offset(self.wal_seg_size.unwrap()) == 0);
}
// everything is flushed now, let's update flush_lsn
self.update_flush_lsn();
Ok(())
}
// Truncate written WAL by removing all WAL segments after the given LSN.
// end_pos must point to the end of the WAL record.
/// Truncate written WAL by removing all WAL segments after the given LSN.
/// end_pos must point to the end of the WAL record.
fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
let wal_seg_size = self
.wal_seg_size
.ok_or_else(|| anyhow!("wal_seg_size is not initialized"))?;
// TODO: cross check divergence point
// nothing to truncate
if self.write_lsn == Lsn(0) {
return Ok(());
}
// Streaming must not create a hole, so truncate cannot be called on non-written lsn
assert!(self.write_lsn >= end_pos);
assert!(self.write_lsn == Lsn(0) || self.write_lsn >= end_pos);
// open segment files and delete or fill end with zeroes
let partial;
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/* Extract WAL location for this block */
let mut xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
/* Open file */
let mut segno = end_pos.segment_number(wal_seg_size);
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
{
let mut wal_file: File;
/* Try to open already completed segment */
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
wal_file = file;
partial = false;
} else {
wal_file = OpenOptions::new()
.write(true)
.open(&wal_file_partial_path)?;
partial = true;
}
wal_file.seek(SeekFrom::Start(xlogoff as u64))?;
while xlogoff < wal_seg_size {
let bytes_to_write = min(XLOG_BLCKSZ, wal_seg_size - xlogoff);
wal_file.write_all(&ZERO_BLOCK[0..bytes_to_write])?;
xlogoff += bytes_to_write;
}
// Flush the file, unless configured otherwise
if !self.conf.no_sync {
wal_file.sync_all()?;
}
// Close previously opened file, if any
if let Some(mut unflushed_file) = self.file.take() {
self.fdatasync_file(&mut unflushed_file)?;
}
if !partial {
let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
let segno = end_pos.segment_number(wal_seg_size);
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?;
write_zeroes(&mut file, wal_seg_size - xlogoff)?;
self.fdatasync_file(&mut file)?;
if !is_partial {
// Make segment partial once again
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
fs::rename(&wal_file_path, &wal_file_partial_path)?;
}
// Remove all subsequent segments
let mut segno = segno;
loop {
segno += 1;
let (wal_file_path, wal_file_partial_path) = self.wal_file_paths(segno)?;
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;
// TODO: better to use fs::try_exists, which is currently available only in nightly builds
if wal_file_path.exists() {
fs::remove_file(&wal_file_path)?;
@@ -412,7 +454,7 @@ impl Storage for PhysicalStorage {
}
}
// Update lsns
// Update LSNs
self.write_lsn = end_pos;
self.write_record_lsn = end_pos;
self.update_flush_lsn();
@@ -491,3 +533,28 @@ impl WalReader {
})
}
}
/// Zero block for filling created WAL segments.
const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
/// Helper for filling file with zeroes.
fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
while count >= XLOG_BLCKSZ {
file.write_all(ZERO_BLOCK)?;
count -= XLOG_BLCKSZ;
}
file.write_all(&ZERO_BLOCK[0..count])?;
Ok(())
}
/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(
timeline_dir: &Path,
segno: XLogSegNo,
wal_seg_size: usize,
) -> Result<(PathBuf, PathBuf)> {
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = timeline_dir.join(wal_file_name.clone());
let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
Ok((wal_file_path, wal_file_partial_path))
}
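A small usage sketch for the helper above (illustrative only; the file names shown assume Postgres' standard segment naming with timeline 1 and the default 16 MiB segment size):

// Sketch only, not part of the diff: what wal_file_paths would typically return.
fn show_paths(timeline_dir: &std::path::Path) -> anyhow::Result<()> {
    let (full, partial) = wal_file_paths(timeline_dir, 1, 16 * 1024 * 1024)?;
    // full    ~ <timeline_dir>/000000010000000000000001
    // partial ~ <timeline_dir>/000000010000000000000001.partial
    println!("{} / {}", full.display(), partial.display());
    Ok(())
}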


@@ -392,7 +392,7 @@ fn get_tenantid(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<ZTe
if let Some(tenantid_cmd) = sub_match.value_of("tenantid") {
Ok(ZTenantId::from_str(tenantid_cmd)?)
} else if let Some(tenantid_conf) = env.default_tenantid {
Ok(tenantid_conf)
Ok(ZTenantId::from(tenantid_conf))
} else {
bail!("No tenantid. Use --tenantid, or set 'default_tenantid' in the config file");
}
@@ -418,7 +418,7 @@ fn handle_init(init_match: &ArgMatches) -> Result<()> {
let pageserver = PageServerNode::from_env(&env);
if let Err(e) = pageserver.init(
// default_tenantid was generated by the `env.init()` call above
Some(&env.default_tenantid.unwrap().to_string()),
Some(&ZTenantId::from(env.default_tenantid.unwrap()).to_string()),
&pageserver_config_overrides(init_match),
) {
eprintln!("pageserver init failed: {}", e);


@@ -5,9 +5,7 @@
// The second one is that we wanted to use ed25519 keys, but they are also not supported until the next version. So we go with RSA keys for now.
// Relevant issue: https://github.com/Keats/jsonwebtoken/issues/162
use hex::{self, FromHex};
use serde::de::Error;
use serde::{self, Deserializer, Serializer};
use serde;
use std::fs;
use std::path::Path;
@@ -17,7 +15,7 @@ use jsonwebtoken::{
};
use serde::{Deserialize, Serialize};
use crate::zid::ZTenantId;
use crate::zid::{HexZTenantId, ZTenantId};
const JWT_ALGORITHM: Algorithm = Algorithm::RS256;
@@ -28,44 +26,18 @@ pub enum Scope {
PageServerApi,
}
pub fn to_hex_option<S>(value: &Option<ZTenantId>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match value {
Some(tid) => hex::serialize(tid, serializer),
None => Option::serialize(value, serializer),
}
}
fn from_hex_option<'de, D>(deserializer: D) -> Result<Option<ZTenantId>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<String> = Option::deserialize(deserializer)?;
match opt {
Some(tid) => Ok(Some(ZTenantId::from_hex(tid).map_err(Error::custom)?)),
None => Ok(None),
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Claims {
// this custom serialize/deserialize_with is needed because Option is not transparent to serde
// so clearest option is serde(with = "hex") but it is not working, for details see https://github.com/serde-rs/serde/issues/1301
#[serde(
default,
skip_serializing_if = "Option::is_none",
serialize_with = "to_hex_option",
deserialize_with = "from_hex_option"
)]
pub tenant_id: Option<ZTenantId>,
pub tenant_id: Option<HexZTenantId>,
pub scope: Scope,
}
impl Claims {
pub fn new(tenant_id: Option<ZTenantId>, scope: Scope) -> Self {
Self { tenant_id, scope }
Self {
tenant_id: tenant_id.map(HexZTenantId::from),
scope,
}
}
}
@@ -75,7 +47,7 @@ pub fn check_permission(claims: &Claims, tenantid: Option<ZTenantId>) -> Result<
bail!("Attempt to access management api with tenant scope. Permission denied")
}
(Scope::Tenant, Some(tenantid)) => {
if claims.tenant_id.unwrap() != tenantid {
if ZTenantId::from(claims.tenant_id.unwrap()) != tenantid {
bail!("Tenant id mismatch. Permission denied")
}
Ok(())


@@ -2,13 +2,100 @@ use std::{fmt, str::FromStr};
use hex::FromHex;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde::{
de::{self, Visitor},
Deserialize, Serialize,
};
// Zenith ID is a 128-bit random ID.
// Used to represent various identifiers. Provides handy utility methods and impls.
macro_rules! mutual_from {
($id1:ident, $id2:ident) => {
impl From<$id1> for $id2 {
fn from(id1: $id1) -> Self {
Self(id1.0.into())
}
}
impl From<$id2> for $id1 {
fn from(id2: $id2) -> Self {
Self(id2.0.into())
}
}
};
}
/// Zenith ID is a 128-bit random ID.
/// Used to represent various identifiers. Provides handy utility methods and impls.
///
/// NOTE: It (de)serializes as an array of bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// Use [`HexZId`] to serialize it as a hex string instead: `ad50847381e248feaac9876cc71ae418`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
struct ZId([u8; 16]);
/// [`ZId`] version that serializes and deserializes as a hex string.
/// Useful for various json serializations, where the byte-array form of the original id is not convenient.
///
/// A plain `ZId` can be (de)serialized as a hex string with the `#[serde(with = "hex")]` attribute.
/// This however won't work on nested types like `Option<ZId>` or `Vec<ZId>`, see https://github.com/serde-rs/serde/issues/723 for the details.
/// Every such nested type currently needs its own (de)serializing method.
///
/// To provide a generic way to serialize a ZId as a hex string where `#[serde(with = "hex")]` is not enough, this wrapper is created.
/// The default `ZId` serialization is left unchanged because
/// * byte array (de)serialization is faster and simpler
/// * byte deserialization is already used in the safekeeper, with those bytes coming from compute (see `ProposerGreeting` in safekeeper)
/// * the current `HexZId` deserialization impl breaks on the byte arrays coming from compute, so having it as the default would be dangerous
#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
struct HexZId([u8; 16]);
impl Serialize for HexZId {
fn serialize<S>(&self, ser: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
hex::encode(self.0).serialize(ser)
}
}
impl<'de> Deserialize<'de> for HexZId {
fn deserialize<D>(de: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
de.deserialize_bytes(HexVisitor)
}
}
struct HexVisitor;
impl<'de> Visitor<'de> for HexVisitor {
type Value = HexZId;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"A hexadecimal representation of a 128-bit random Zenith ID"
)
}
fn visit_bytes<E>(self, hex_bytes: &[u8]) -> Result<Self::Value, E>
where
E: de::Error,
{
ZId::from_hex(hex_bytes)
.map(HexZId::from)
.map_err(de::Error::custom)
}
fn visit_str<E>(self, hex_bytes_str: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
Self::visit_bytes(self, hex_bytes_str.as_bytes())
}
}
mutual_from!(ZId, HexZId);
impl ZId {
pub fn get_from_buf(buf: &mut dyn bytes::Buf) -> ZId {
let mut arr = [0u8; 16];
@@ -155,46 +242,80 @@ macro_rules! zid_newtype {
/// is separate from PostgreSQL timelines, and doesn't have those
/// limitations. A zenith timeline is identified by a 128-bit ID, which
/// is usually printed out as a hex string.
///
/// NOTE: It (de)serializes as an array of bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// Use [`HexZTimelineId`] to serialize it as a hex string instead: `ad50847381e248feaac9876cc71ae418`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ZTimelineId(ZId);
zid_newtype!(ZTimelineId);
/// A [`ZTimelineId`] version that gets (de)serialized as a hex string.
/// Use in complex types, where `#[serde(with = "hex")]` does not work.
/// See [`HexZId`] for more details.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct HexZTimelineId(HexZId);
// Zenith Tenant Id represents the identifier of a particular tenant.
// Is used for distinguishing requests and data belonging to different users.
impl std::fmt::Debug for HexZTimelineId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
ZTimelineId::from(*self).fmt(f)
}
}
impl std::fmt::Display for HexZTimelineId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
ZTimelineId::from(*self).fmt(f)
}
}
impl FromStr for HexZTimelineId {
type Err = <ZTimelineId as FromStr>::Err;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(HexZTimelineId::from(ZTimelineId::from_str(s)?))
}
}
zid_newtype!(ZTimelineId);
mutual_from!(ZTimelineId, HexZTimelineId);
/// Zenith Tenant Id represents the identifier of a particular tenant.
/// Is used for distinguishing requests and data belonging to different users.
///
/// NOTE: It (de)serializes as an array of bytes, so the string representation would look
/// like `[173,80,132,115,129,226,72,254,170,201,135,108,199,26,228,24]`.
/// Use [`HexZTenantId`] to serialize it as a hex string instead: `ad50847381e248feaac9876cc71ae418`.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct ZTenantId(ZId);
zid_newtype!(ZTenantId);
/// A [`ZTenantId`] version that gets (de)serialized as a hex string.
/// Use in complex types, where `#[serde(with = "hex")]` does not work.
/// See [`HexZId`] for more details.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
pub struct HexZTenantId(HexZId);
/// Serde routines for Option<T> (de)serialization, using `T: Display` representations for inner values.
/// Useful for Option<ZTenantId> and Option<ZTimelineId> to get their hex representations into the serialized string and deserialize them back.
pub mod opt_display_serde {
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
use std::{fmt::Display, str::FromStr};
pub fn serialize<S, Id>(id: &Option<Id>, ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Id: Display,
{
id.as_ref().map(ToString::to_string).serialize(ser)
}
pub fn deserialize<'de, D, Id>(des: D) -> Result<Option<Id>, D::Error>
where
D: Deserializer<'de>,
Id: FromStr,
<Id as FromStr>::Err: Display,
{
Ok(if let Some(s) = Option::<String>::deserialize(des)? {
Some(Id::from_str(&s).map_err(de::Error::custom)?)
} else {
None
})
impl std::fmt::Debug for HexZTenantId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
ZTenantId::from(*self).fmt(f)
}
}
impl std::fmt::Display for HexZTenantId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
ZTenantId::from(*self).fmt(f)
}
}
impl FromStr for HexZTenantId {
type Err = <ZTenantId as FromStr>::Err;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(HexZTenantId::from(ZTenantId::from_str(s)?))
}
}
zid_newtype!(ZTenantId);
mutual_from!(ZTenantId, HexZTenantId);
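To make the two representations concrete, a minimal sketch of serializing the same id through both newtypes (not part of the diff; struct and field names are invented, and serde_json is assumed, as in the tests below):

// Sketch only: the same 128-bit id serialized through the plain and the hex-string newtypes.
#[derive(serde::Serialize)]
struct PlainPayload {
    tenant_id: ZTenantId, // serializes as an array of 16 bytes
}

#[derive(serde::Serialize)]
struct HexPayload {
    tenant_id: HexZTenantId, // serializes as a 32-character hex string
}

fn demo(id: ZTenantId) {
    let plain = serde_json::to_string(&PlainPayload { tenant_id: id }).unwrap();
    let hex = serde_json::to_string(&HexPayload { tenant_id: HexZTenantId::from(id) }).unwrap();
    // plain ~ {"tenant_id":[173,80,132,115,...]}
    // hex   ~ {"tenant_id":"ad50847381e248feaac9876cc71ae418"}
    println!("{}\n{}", plain, hex);
}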
// A pair uniquely identifying a Zenith instance.
#[derive(Debug, Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct ZTenantTimelineId {
@@ -243,16 +364,15 @@ mod tests {
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
struct TestStruct<E: Display, T: FromStr<Err = E> + Display> {
#[serde(with = "opt_display_serde")]
field: Option<T>,
}
#[test]
fn test_hex_serializations_tenant_id() {
let original_struct = TestStruct {
field: Some(ZTenantId::from_array(hex!(
field: Some(HexZTenantId::from(ZTenantId::from_array(hex!(
"11223344556677881122334455667788"
))),
)))),
};
let serialized_string = serde_json::to_string(&original_struct).unwrap();
@@ -261,7 +381,7 @@ mod tests {
r#"{"field":"11223344556677881122334455667788"}"#
);
let deserialized_struct: TestStruct<FromHexError, ZTenantId> =
let deserialized_struct: TestStruct<FromHexError, HexZTenantId> =
serde_json::from_str(&serialized_string).unwrap();
assert_eq!(original_struct, deserialized_struct);
}
@@ -269,9 +389,9 @@ mod tests {
#[test]
fn test_hex_serializations_timeline_id() {
let original_struct = TestStruct {
field: Some(ZTimelineId::from_array(hex!(
field: Some(HexZTimelineId::from(ZTimelineId::from_array(hex!(
"AA223344556677881122334455667788"
))),
)))),
};
let serialized_string = serde_json::to_string(&original_struct).unwrap();
@@ -280,7 +400,7 @@ mod tests {
r#"{"field":"aa223344556677881122334455667788"}"#
);
let deserialized_struct: TestStruct<FromHexError, ZTimelineId> =
let deserialized_struct: TestStruct<FromHexError, HexZTimelineId> =
serde_json::from_str(&serialized_string).unwrap();
assert_eq!(original_struct, deserialized_struct);
}