Compare commits

...

10 Commits

Author SHA1 Message Date
Bojan Serafimov
303f0a230b Use new project, don't delete it 2022-10-18 16:22:09 -04:00
Bojan Serafimov
43e35a68ac Don't run all benchmarks 2022-10-18 14:46:33 -04:00
Bojan Serafimov
ac0e58b8fb Merge branch 'main' into tmp-perf-captest-reuse-only 2022-10-18 14:43:50 -04:00
bojanserafimov
8fbe437768 Improve pageserver IO metrics (#2629) 2022-10-18 11:53:28 -04:00
Heikki Linnakangas
989d78aac8 Buffer the TCP incoming stream on libpq connections.
Reduces the number of syscalls needed to read the commands from the
compute.

Here's a snippet of strace output from the pageserver, when performing
a sequential scan on a table, with prefetch:

    3084934 recvfrom(47, "d", 1, 0, NULL, NULL) = 1
    3084934 recvfrom(47, "\0\0\0\37", 4, 0, NULL, NULL) = 4
    3084934 recvfrom(47, "\2\1\0\0\0\0\362\302\360\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\0\3", 27, 0, NULL, NULL) = 27
    3084934 pread64(28, "\0\0\0\1\0\0\0\0\0\0\0\253                    "..., 8192, 25190400) = 8192
    3084934 write(45, "B\0\0\0\25\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\3A\0\0\32\355\0\0\0\0\1"..., 7010) = 7010
    3084934 poll([{fd=46, events=POLLIN}, {fd=48, events=POLLIN}], 2, 60000) = 1 ([{fd=46, revents=POLLIN}])
    3084934 read(46, "\0\0\0\0p\311q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237\362\0\0\237\362\0"..., 8192) = 8192
    3084934 sendto(47, "d\0\0 \5f\0\0\0\0p\311q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237"..., 8198, MSG_NOSIGNAL, NULL, 0) = 8198
    3084934 recvfrom(47, "d", 1, 0, NULL, NULL) = 1
    3084934 recvfrom(47, "\0\0\0\37", 4, 0, NULL, NULL) = 4
    3084934 recvfrom(47, "\2\1\0\0\0\0\362\302\360\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\0\4", 27, 0, NULL, NULL) = 27
    3084934 pread64(28, "    \0=\0L\0\0\0\1\0\0\0\0\0\0\0\0\0\0\0\0;;\0\0\0\4\4\0"..., 8192, 25198592) = 8192
    3084934 write(45, "B\0\0\0\25\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\4A\0\0\32\355\0\0\0\0\1"..., 7010) = 7010
    3084934 poll([{fd=46, events=POLLIN}, {fd=48, events=POLLIN}], 2, 60000) = 1 ([{fd=46, revents=POLLIN}])
    3084934 read(46, "\0\0\0\0\260\344q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237\362\0\0\237\362\0"..., 8192) = 8192
    3084934 sendto(47, "d\0\0 \5f\0\0\0\0\260\344q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237"..., 8198, MSG_NOSIGNAL, NULL, 0) = 8198
    3084934 recvfrom(47, "d", 1, 0, NULL, NULL) = 1
    3084934 recvfrom(47, "\0\0\0\37", 4, 0, NULL, NULL) = 4
    3084934 recvfrom(47, "\2\1\0\0\0\0\362\302\360\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\0\5", 27, 0, NULL, NULL) = 27
    3084934 write(45, "B\0\0\0\25\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\5A\0\0\32\355\0\0\0\0\1"..., 7010) = 7010
    3084934 poll([{fd=46, events=POLLIN}, {fd=48, events=POLLIN}], 2, 60000) = 1 ([{fd=46, revents=POLLIN}])
    3084934 read(46, "\0\0\0\0\330\377q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237\362\0\0\237\362\0"..., 8192) = 8192
    3084934 sendto(47, "d\0\0 \5f\0\0\0\0\330\377q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237"..., 8198, MSG_NOSIGNAL, NULL, 0) = 8198

This shows the interaction for three get_page_at_lsn requests. For
each request, the pageserver performs three recvfrom syscalls to read
the incoming request from the socket. After this patch, those recvfrom
calls are gone:

    3086123 read(47, "\0\0\0\0\360\222q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237\362\0\0\237\362\0"..., 8192) = 8192
    3086123 sendto(45, "d\0\0 \5f\0\0\0\0\360\222q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237"..., 8198, MSG_NOSIGNAL, NULL, 0) = 8198
    3086123 pread64(29, "                                "..., 8192, 25182208) = 8192
    3086123 write(46, "B\0\0\0\25\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\2A\0\0\32\355\0\0\0\0\1"..., 7010) = 7010
    3086123 poll([{fd=47, events=POLLIN}, {fd=49, events=POLLIN}], 2, 60000) = 1 ([{fd=47, revents=POLLIN}])
    3086123 read(47, "\0\0\0\0000\256q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237\362\0\0\237\362\0"..., 8192) = 8192
    3086123 sendto(45, "d\0\0 \5f\0\0\0\0000\256q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237"..., 8198, MSG_NOSIGNAL, NULL, 0) = 8198
    3086123 pread64(29, "\0\0\0\1\0\0\0\0\0\0\0\253                    "..., 8192, 25190400) = 8192
    3086123 write(46, "B\0\0\0\25\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\3A\0\0\32\355\0\0\0\0\1"..., 7010) = 7010
    3086123 poll([{fd=47, events=POLLIN}, {fd=49, events=POLLIN}], 2, 60000) = 1 ([{fd=47, revents=POLLIN}])
    3086123 read(47, "\0\0\0\0p\311q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237\362\0\0\237\362\0"..., 8192) = 8192
    3086123 sendto(45, "d\0\0 \5f\0\0\0\0p\311q\1\0\0\4\0\f\1\200\1\0 \4 \0\0\0\0\200\237"..., 8198, MSG_NOSIGNAL, NULL, 0) = 8198
    3086123 pread64(29, "    \0=\0L\0\0\0\1\0\0\0\0\0\0\0\0\0\0\0\0;;\0\0\0\4\4\0"..., 8192, 25198592) = 8192
    3086123 write(46, "B\0\0\0\25\0\0\0\6\177\0\0002\276\0\0@\f\0\0\0\4A\0\0\32\355\0\0\0\0\1"..., 7010) = 7010
    3086123 poll([{fd=47, events=POLLIN}, {fd=49, events=POLLIN}], 2, 60000) = 1 ([{fd=47, revents=POLLIN}])

In this test, the compute sends a batch of prefetch requests, and they
are read from the socket in one syscall. That syscall was not captured
by the strace snippet above, but there are much fewer of them than
before.
2022-10-18 18:46:07 +03:00
Stas Kelvich
7ca72578f9 Enable plv8 again
Now with quickfix for https://github.com/plv8/plv8/issues/503
2022-10-18 18:34:27 +03:00
Heikki Linnakangas
41550ec8bf Remove unnecessary indirections of libpqwalproposer functions
In the Postgres backend, we cannot link directly with libpq (check the
pgsql-hackers arhive for all kinds of fun that ensued when we tried to
do that). Therefore, the libpq functions are used through the thin
wrapper functions in libpqwalreceiver.so, and libpqwalreceiver.so is
loaded dynamically. To hide the dynamic loading and make the calls
look like regular functions, we use macros to hide the function
pointers.

We had inherited the same indirections in libpqwalproposer, but it's
not needed since the neon extension is already a shared library that's
loaded dynamically. There's no problem calling the functions directly
there. Remove the indirections.
2022-10-18 18:25:30 +03:00
Sergey Melnikov
0cd2d91b9d Fix deploy-new job by installing sivel.toiletwater (#2641) 2022-10-18 14:44:19 +00:00
Bojan Serafimov
2a3cf6d83a Do init only 2022-10-11 18:57:31 -04:00
Bojan Serafimov
e0dfad6fcf Temporarily test only on neon-captest-reuse 2022-10-11 13:19:15 -04:00
13 changed files with 73 additions and 305 deletions

View File

@@ -36,106 +36,6 @@ concurrency:
cancel-in-progress: true
jobs:
bench:
# this workflow runs on self hosteed runner
# it's environment is quite different from usual guthub runner
# probably the most important difference is that it doesn't start from clean workspace each time
# e g if you install system packages they are not cleaned up since you install them directly in host machine
# not a container or something
# See documentation for more info: https://docs.github.com/en/actions/hosting-your-own-runners/about-self-hosted-runners
runs-on: [self-hosted, zenith-benchmarker]
env:
POSTGRES_DISTRIB_DIR: /usr/pgsql
DEFAULT_PG_VERSION: 14
steps:
- name: Checkout zenith repo
uses: actions/checkout@v3
# actions/setup-python@v2 is not working correctly on self-hosted runners
# see https://github.com/actions/setup-python/issues/162
# and probably https://github.com/actions/setup-python/issues/162#issuecomment-865387976 in particular
# so the simplest solution to me is to use already installed system python and spin virtualenvs for job runs.
# there is Python 3.7.10 already installed on the machine so use it to install poetry and then use poetry's virtuealenvs
- name: Install poetry & deps
run: |
python3 -m pip install --upgrade poetry wheel
# since pip/poetry caches are reused there shouldn't be any troubles with install every time
./scripts/pysync
- name: Show versions
run: |
echo Python
python3 --version
poetry run python3 --version
echo Poetry
poetry --version
echo Pgbench
${POSTGRES_DISTRIB_DIR}/v${DEFAULT_PG_VERSION}/bin/pgbench --version
- name: Create Neon Project
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
environment: ${{ github.event.inputs.environment || 'staging' }}
api_key: ${{ ( github.event.inputs.environment || 'staging' ) == 'staging' && secrets.NEON_STAGING_API_KEY || secrets.NEON_CAPTEST_API_KEY }}
- name: Run benchmark
# pgbench is installed system wide from official repo
# https://download.postgresql.org/pub/repos/yum/13/redhat/rhel-7-x86_64/
# via
# sudo tee /etc/yum.repos.d/pgdg.repo<<EOF
# [pgdg13]
# name=PostgreSQL 13 for RHEL/CentOS 7 - x86_64
# baseurl=https://download.postgresql.org/pub/repos/yum/13/redhat/rhel-7-x86_64/
# enabled=1
# gpgcheck=0
# EOF
# sudo yum makecache
# sudo yum install postgresql13-contrib
# actual binaries are located in /usr/pgsql-13/bin/
env:
# The pgbench test runs two tests of given duration against each scale.
# So the total runtime with these parameters is 2 * 2 * 300 = 1200, or 20 minutes.
# Plus time needed to initialize the test databases.
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
PLATFORM: "neon-staging"
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
REMOTE_ENV: "1" # indicate to test harness that we do not have zenith binaries locally
run: |
# just to be sure that no data was cached on self hosted runner
# since it might generate duplicates when calling ingest_perf_test_result.py
rm -rf perf-report-staging
mkdir -p perf-report-staging
# Set --sparse-ordering option of pytest-order plugin to ensure tests are running in order of appears in the file,
# it's important for test_perf_pgbench.py::test_pgbench_remote_* tests
./scripts/pytest test_runner/performance/ -v -m "remote_cluster" --sparse-ordering --out-dir perf-report-staging --timeout 5400
- name: Submit result
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
run: |
REPORT_FROM=$(realpath perf-report-staging) REPORT_TO=staging scripts/generate_and_push_perf_report.sh
- name: Delete Neon Project
if: ${{ always() }}
uses: ./.github/actions/neon-project-delete
with:
environment: staging
project_id: ${{ steps.create-neon-project.outputs.project_id }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1
with:
channel-id: "C033QLM5P7D" # dev-staging-stream
slack-message: "Periodic perf testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}"
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
pgbench-compare:
strategy:
@@ -144,15 +44,8 @@ jobs:
# neon-captest-new: Run pgbench in a freshly created project
# neon-captest-reuse: Same, but reusing existing project
# neon-captest-prefetch: Same, with prefetching enabled (new project)
platform: [ neon-captest-new, neon-captest-reuse, neon-captest-prefetch ]
platform: [ neon-captest-new ]
db_size: [ 10gb ]
include:
- platform: neon-captest-new
db_size: 50gb
- platform: neon-captest-prefetch
db_size: 50gb
- platform: rds-aurora
db_size: 50gb
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "60m"
@@ -238,32 +131,6 @@ jobs:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark simple-update
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_simple_update
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Benchmark select-only
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_select_only
env:
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
- name: Create Allure report
if: always()
uses: ./.github/actions/allure-report
@@ -271,14 +138,6 @@ jobs:
action: generate
build_type: ${{ env.BUILD_TYPE }}
- name: Delete Neon Project
if: ${{ steps.create-neon-project.outputs.project_id && always() }}
uses: ./.github/actions/neon-project-delete
with:
environment: dev
project_id: ${{ steps.create-neon-project.outputs.project_id }}
api_key: ${{ secrets.NEON_CAPTEST_API_KEY }}
- name: Post to a Slack channel
if: ${{ github.event.schedule && failure() }}
uses: slackapi/slack-github-action@v1

View File

@@ -774,6 +774,7 @@ jobs:
exit 1
fi
ansible-galaxy collection install sivel.toiletwater
ansible-playbook deploy.yaml -i staging.us-east-2.hosts.yaml -e @ssm_config -e CONSOLE_API_TOKEN=${{secrets.NEON_STAGING_API_KEY}}
rm -f neon_install.tar.gz .neon_current_version

View File

@@ -71,10 +71,12 @@ RUN apt update && \
RUN apt update && \
apt install -y --no-install-recommends -t testing binutils
# Sed is used to patch for https://github.com/plv8/plv8/issues/503
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
tar xvzf v3.1.4.tar.gz && \
cd plv8-3.1.4 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
sed -i 's/MemoryContextAlloc(/MemoryContextAllocZero(/' plv8.cc && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
rm -rf /plv8-* && \
@@ -116,8 +118,7 @@ RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3
#
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
COPY pgxn/ pgxn/

View File

@@ -76,10 +76,12 @@ RUN apt update && \
RUN apt update && \
apt install -y --no-install-recommends -t testing binutils
# Sed is used to patch for https://github.com/plv8/plv8/issues/503
RUN wget https://github.com/plv8/plv8/archive/refs/tags/v3.1.4.tar.gz && \
tar xvzf v3.1.4.tar.gz && \
cd plv8-3.1.4 && \
export PATH="/usr/local/pgsql/bin:$PATH" && \
sed -i 's/MemoryContextAlloc(/MemoryContextAllocZero(/' plv8.cc && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
rm -rf /plv8-* && \
@@ -121,8 +123,7 @@ RUN wget https://github.com/zachasme/h3-pg/archive/refs/tags/v4.0.1.tar.gz -O h3
#
FROM build-deps AS neon-pg-ext-build
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
# plv8 still sometimes crashes during the creation
# COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=plv8-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=h3-pg-build /h3/usr /
COPY pgxn/ pgxn/

View File

@@ -15,7 +15,7 @@ use std::sync::Arc;
use std::task::Poll;
use tracing::{debug, error, trace};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio_rustls::TlsAcceptor;
#[async_trait::async_trait]
@@ -66,8 +66,8 @@ pub enum ProcessMsgResult {
/// Always-writeable sock_split stream.
/// May not be readable. See [`PostgresBackend::take_stream_in`]
pub enum Stream {
Unencrypted(tokio::net::TcpStream),
Tls(Box<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>),
Unencrypted(BufReader<tokio::net::TcpStream>),
Tls(Box<tokio_rustls::server::TlsStream<BufReader<tokio::net::TcpStream>>>),
Broken,
}
@@ -157,7 +157,7 @@ impl PostgresBackend {
let peer_addr = socket.peer_addr()?;
Ok(Self {
stream: Stream::Unencrypted(socket),
stream: Stream::Unencrypted(BufReader::new(socket)),
buf_out: BytesMut::with_capacity(10 * 1024),
state: ProtoState::Initialization,
md5_salt: [0u8; 4],

View File

@@ -107,18 +107,20 @@ static CURRENT_LOGICAL_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
// Metrics for cloud upload. These metrics reflect data uploaded to cloud storage,
// or in testing they estimate how much we would upload if we did.
static NUM_PERSISTENT_FILES_CREATED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
static NUM_PERSISTENT_FILES_CREATED: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_created_persistent_files_total",
"Number of files created that are meant to be uploaded to cloud storage",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
static PERSISTENT_BYTES_WRITTEN: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
static PERSISTENT_BYTES_WRITTEN: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_written_persistent_bytes_total",
"Total bytes written that are meant to be uploaded to cloud storage",
&["tenant_id", "timeline_id"]
)
.expect("failed to define a metric")
});
@@ -386,8 +388,12 @@ impl TimelineMetrics {
let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let num_persistent_files_created = NUM_PERSISTENT_FILES_CREATED.clone();
let persistent_bytes_written = PERSISTENT_BYTES_WRITTEN.clone();
let num_persistent_files_created = NUM_PERSISTENT_FILES_CREATED
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
let persistent_bytes_written = PERSISTENT_BYTES_WRITTEN
.get_metric_with_label_values(&[&tenant_id, &timeline_id])
.unwrap();
TimelineMetrics {
tenant_id,
@@ -419,6 +425,8 @@ impl Drop for TimelineMetrics {
let _ = WAIT_LSN_TIME.remove_label_values(&[tenant_id, timeline_id]);
let _ = CURRENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]);
let _ = NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, timeline_id]);
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, timeline_id]);
for op in STORAGE_TIME_OPERATIONS {
let _ = STORAGE_TIME.remove_label_values(&[op, tenant_id, timeline_id]);

View File

@@ -10,51 +10,12 @@ struct WalProposerConn
PGconn *pg_conn;
bool is_nonblocking; /* whether the connection is non-blocking */
char *recvbuf; /* last received data from
* libpqprop_async_read */
* walprop_async_read */
};
/* Prototypes for exported functions */
static char *libpqprop_error_message(WalProposerConn * conn);
static WalProposerConnStatusType libpqprop_status(WalProposerConn * conn);
static WalProposerConn * libpqprop_connect_start(char *conninfo);
static WalProposerConnectPollStatusType libpqprop_connect_poll(WalProposerConn * conn);
static bool libpqprop_send_query(WalProposerConn * conn, char *query);
static WalProposerExecStatusType libpqprop_get_query_result(WalProposerConn * conn);
static pgsocket libpqprop_socket(WalProposerConn * conn);
static int libpqprop_flush(WalProposerConn * conn);
static void libpqprop_finish(WalProposerConn * conn);
static PGAsyncReadResult libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount);
static PGAsyncWriteResult libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size);
static bool libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size);
static WalProposerFunctionsType PQWalProposerFunctions =
{
libpqprop_error_message,
libpqprop_status,
libpqprop_connect_start,
libpqprop_connect_poll,
libpqprop_send_query,
libpqprop_get_query_result,
libpqprop_socket,
libpqprop_flush,
libpqprop_finish,
libpqprop_async_read,
libpqprop_async_write,
libpqprop_blocking_write,
};
/* Module initialization */
void
pg_init_libpqwalproposer(void)
{
if (WalProposerFunctions != NULL)
elog(ERROR, "libpqwalproposer already loaded");
WalProposerFunctions = &PQWalProposerFunctions;
}
/* Helper function */
static bool
ensure_nonblocking_status(WalProposerConn * conn, bool is_nonblocking)
ensure_nonblocking_status(WalProposerConn *conn, bool is_nonblocking)
{
/* If we're already correctly blocking or nonblocking, all good */
if (is_nonblocking == conn->is_nonblocking)
@@ -69,14 +30,14 @@ ensure_nonblocking_status(WalProposerConn * conn, bool is_nonblocking)
}
/* Exported function definitions */
static char *
libpqprop_error_message(WalProposerConn * conn)
char *
walprop_error_message(WalProposerConn *conn)
{
return PQerrorMessage(conn->pg_conn);
}
static WalProposerConnStatusType
libpqprop_status(WalProposerConn * conn)
WalProposerConnStatusType
walprop_status(WalProposerConn *conn)
{
switch (PQstatus(conn->pg_conn))
{
@@ -89,8 +50,8 @@ libpqprop_status(WalProposerConn * conn)
}
}
static WalProposerConn *
libpqprop_connect_start(char *conninfo)
WalProposerConn *
walprop_connect_start(char *conninfo)
{
WalProposerConn *conn;
PGconn *pg_conn;
@@ -119,8 +80,8 @@ libpqprop_connect_start(char *conninfo)
return conn;
}
static WalProposerConnectPollStatusType
libpqprop_connect_poll(WalProposerConn * conn)
WalProposerConnectPollStatusType
walprop_connect_poll(WalProposerConn *conn)
{
WalProposerConnectPollStatusType return_val;
@@ -160,8 +121,8 @@ libpqprop_connect_poll(WalProposerConn * conn)
return return_val;
}
static bool
libpqprop_send_query(WalProposerConn * conn, char *query)
bool
walprop_send_query(WalProposerConn *conn, char *query)
{
/*
* We need to be in blocking mode for sending the query to run without
@@ -177,8 +138,8 @@ libpqprop_send_query(WalProposerConn * conn, char *query)
return true;
}
static WalProposerExecStatusType
libpqprop_get_query_result(WalProposerConn * conn)
WalProposerExecStatusType
walprop_get_query_result(WalProposerConn *conn)
{
PGresult *result;
WalProposerExecStatusType return_val;
@@ -255,20 +216,20 @@ libpqprop_get_query_result(WalProposerConn * conn)
return return_val;
}
static pgsocket
libpqprop_socket(WalProposerConn * conn)
pgsocket
walprop_socket(WalProposerConn *conn)
{
return PQsocket(conn->pg_conn);
}
static int
libpqprop_flush(WalProposerConn * conn)
int
walprop_flush(WalProposerConn *conn)
{
return (PQflush(conn->pg_conn));
}
static void
libpqprop_finish(WalProposerConn * conn)
void
walprop_finish(WalProposerConn *conn)
{
if (conn->recvbuf != NULL)
PQfreemem(conn->recvbuf);
@@ -282,8 +243,8 @@ libpqprop_finish(WalProposerConn * conn)
* On success, the data is placed in *buf. It is valid until the next call
* to this function.
*/
static PGAsyncReadResult
libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount)
PGAsyncReadResult
walprop_async_read(WalProposerConn *conn, char **buf, int *amount)
{
int result;
@@ -353,8 +314,8 @@ libpqprop_async_read(WalProposerConn * conn, char **buf, int *amount)
}
}
static PGAsyncWriteResult
libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size)
PGAsyncWriteResult
walprop_async_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
@@ -408,8 +369,12 @@ libpqprop_async_write(WalProposerConn * conn, void const *buf, size_t size)
}
}
static bool
libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size)
/*
* This function is very similar to walprop_async_write. For more
* information, refer to the comments there.
*/
bool
walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size)
{
int result;
@@ -417,10 +382,6 @@ libpqprop_blocking_write(WalProposerConn * conn, void const *buf, size_t size)
if (!ensure_nonblocking_status(conn, false))
return false;
/*
* Ths function is very similar to libpqprop_async_write. For more
* information, refer to the comments there
*/
if ((result = PQputCopyData(conn->pg_conn, buf, size)) == -1)
return false;

View File

@@ -32,7 +32,6 @@ void
_PG_init(void)
{
pg_init_libpagestore();
pg_init_libpqwalproposer();
pg_init_walproposer();
EmitWarningsOnPlaceholders("neon");

View File

@@ -13,7 +13,6 @@
#define NEON_H
extern void pg_init_libpagestore(void);
extern void pg_init_libpqwalproposer(void);
extern void pg_init_walproposer(void);
#endif /* NEON_H */

View File

@@ -79,9 +79,6 @@ bool am_wal_proposer;
char *neon_timeline_walproposer = NULL;
char *neon_tenant_walproposer = NULL;
/* Declared in walproposer.h, defined here, initialized in libpqwalproposer.c */
WalProposerFunctionsType *WalProposerFunctions = NULL;
#define WAL_PROPOSER_SLOT_NAME "wal_proposer_slot"
static int n_safekeepers = 0;
@@ -438,10 +435,6 @@ WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId)
char *sep;
char *port;
/* Load the libpq-specific functions */
if (WalProposerFunctions == NULL)
elog(ERROR, "libpqwalproposer didn't initialize correctly");
load_file("libpqwalreceiver", false);
if (WalReceiverFunctions == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");

View File

@@ -446,31 +446,31 @@ typedef enum
} WalProposerConnStatusType;
/* Re-exported PQerrorMessage */
typedef char *(*walprop_error_message_fn) (WalProposerConn * conn);
extern char *walprop_error_message(WalProposerConn *conn);
/* Re-exported PQstatus */
typedef WalProposerConnStatusType(*walprop_status_fn) (WalProposerConn * conn);
extern WalProposerConnStatusType walprop_status(WalProposerConn *conn);
/* Re-exported PQconnectStart */
typedef WalProposerConn * (*walprop_connect_start_fn) (char *conninfo);
extern WalProposerConn * walprop_connect_start(char *conninfo);
/* Re-exported PQconectPoll */
typedef WalProposerConnectPollStatusType(*walprop_connect_poll_fn) (WalProposerConn * conn);
extern WalProposerConnectPollStatusType walprop_connect_poll(WalProposerConn *conn);
/* Blocking wrapper around PQsendQuery */
typedef bool (*walprop_send_query_fn) (WalProposerConn * conn, char *query);
extern bool walprop_send_query(WalProposerConn *conn, char *query);
/* Wrapper around PQconsumeInput + PQisBusy + PQgetResult */
typedef WalProposerExecStatusType(*walprop_get_query_result_fn) (WalProposerConn * conn);
extern WalProposerExecStatusType walprop_get_query_result(WalProposerConn *conn);
/* Re-exported PQsocket */
typedef pgsocket (*walprop_socket_fn) (WalProposerConn * conn);
extern pgsocket walprop_socket(WalProposerConn *conn);
/* Wrapper around PQconsumeInput (if socket's read-ready) + PQflush */
typedef int (*walprop_flush_fn) (WalProposerConn * conn);
extern int walprop_flush(WalProposerConn *conn);
/* Re-exported PQfinish */
typedef void (*walprop_finish_fn) (WalProposerConn * conn);
extern void walprop_finish(WalProposerConn *conn);
/*
* Ergonomic wrapper around PGgetCopyData
@@ -486,9 +486,7 @@ typedef void (*walprop_finish_fn) (WalProposerConn * conn);
* performs a bit of extra checking work that's always required and is normally
* somewhat verbose.
*/
typedef PGAsyncReadResult(*walprop_async_read_fn) (WalProposerConn * conn,
char **buf,
int *amount);
extern PGAsyncReadResult walprop_async_read(WalProposerConn *conn, char **buf, int *amount);
/*
* Ergonomic wrapper around PQputCopyData + PQflush
@@ -497,69 +495,14 @@ typedef PGAsyncReadResult(*walprop_async_read_fn) (WalProposerConn * conn,
*
* For information on the meaning of return codes, refer to PGAsyncWriteResult.
*/
typedef PGAsyncWriteResult(*walprop_async_write_fn) (WalProposerConn * conn,
void const *buf,
size_t size);
extern PGAsyncWriteResult walprop_async_write(WalProposerConn *conn, void const *buf, size_t size);
/*
* Blocking equivalent to walprop_async_write_fn
*
* Returns 'true' if successful, 'false' on failure.
*/
typedef bool (*walprop_blocking_write_fn) (WalProposerConn * conn, void const *buf, size_t size);
/* All libpqwalproposer exported functions collected together. */
typedef struct WalProposerFunctionsType
{
walprop_error_message_fn walprop_error_message;
walprop_status_fn walprop_status;
walprop_connect_start_fn walprop_connect_start;
walprop_connect_poll_fn walprop_connect_poll;
walprop_send_query_fn walprop_send_query;
walprop_get_query_result_fn walprop_get_query_result;
walprop_socket_fn walprop_socket;
walprop_flush_fn walprop_flush;
walprop_finish_fn walprop_finish;
walprop_async_read_fn walprop_async_read;
walprop_async_write_fn walprop_async_write;
walprop_blocking_write_fn walprop_blocking_write;
} WalProposerFunctionsType;
/* Allow the above functions to be "called" with normal syntax */
#define walprop_error_message(conn) \
WalProposerFunctions->walprop_error_message(conn)
#define walprop_status(conn) \
WalProposerFunctions->walprop_status(conn)
#define walprop_connect_start(conninfo) \
WalProposerFunctions->walprop_connect_start(conninfo)
#define walprop_connect_poll(conn) \
WalProposerFunctions->walprop_connect_poll(conn)
#define walprop_send_query(conn, query) \
WalProposerFunctions->walprop_send_query(conn, query)
#define walprop_get_query_result(conn) \
WalProposerFunctions->walprop_get_query_result(conn)
#define walprop_set_nonblocking(conn, arg) \
WalProposerFunctions->walprop_set_nonblocking(conn, arg)
#define walprop_socket(conn) \
WalProposerFunctions->walprop_socket(conn)
#define walprop_flush(conn) \
WalProposerFunctions->walprop_flush(conn)
#define walprop_finish(conn) \
WalProposerFunctions->walprop_finish(conn)
#define walprop_async_read(conn, buf, amount) \
WalProposerFunctions->walprop_async_read(conn, buf, amount)
#define walprop_async_write(conn, buf, size) \
WalProposerFunctions->walprop_async_write(conn, buf, size)
#define walprop_blocking_write(conn, buf, size) \
WalProposerFunctions->walprop_blocking_write(conn, buf, size)
/*
* The runtime location of the libpqwalproposer functions.
*
* This pointer is set by the initializer in libpqwalproposer, so that we
* can use it later.
*/
extern PGDLLIMPORT WalProposerFunctionsType * WalProposerFunctions;
extern bool walprop_blocking_write(WalProposerConn *conn, void const *buf, size_t size);
extern uint64 BackpressureThrottlingTime(void);

View File

@@ -130,11 +130,12 @@ class NeonCompare(PgCompare):
"size", timeline_size / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER
)
params = f'{{tenant_id="{self.env.initial_tenant}",timeline_id="{self.timeline}"}}'
total_files = self.zenbenchmark.get_int_counter_value(
self.env.pageserver, "pageserver_created_persistent_files_total"
self.env.pageserver, "pageserver_created_persistent_files_total" + params
)
total_bytes = self.zenbenchmark.get_int_counter_value(
self.env.pageserver, "pageserver_written_persistent_bytes_total"
self.env.pageserver, "pageserver_written_persistent_bytes_total" + params
)
self.zenbenchmark.record(
"data_uploaded", total_bytes / (1024 * 1024), "MB", report=MetricReport.LOWER_IS_BETTER

View File

@@ -60,4 +60,6 @@ PAGESERVER_PER_TENANT_METRICS = [
"pageserver_wait_lsn_seconds_bucket",
"pageserver_wait_lsn_seconds_count",
"pageserver_wait_lsn_seconds_sum",
"pageserver_created_persistent_files_total",
"pageserver_written_persistent_bytes_total",
]