Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-24 22:00:37 +00:00

Compare commits: release-co... vlad/debug (7 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 750d65fa77 |  |
|  | 2b0248cd76 |  |
|  | 7b03216dca |  |
|  | 992aa91075 |  |
|  | afe9b27983 |  |
|  | 5d91d4e843 |  |
|  | 2465e9141f |  |
.github/workflows/build-macos.yml (vendored, 63 lines changed)

@@ -63,13 +63,8 @@ jobs:
       - name: Cache postgres ${{ matrix.postgres-version }} build
         id: cache_pg
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: pg_install/${{ matrix.postgres-version }}
           key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-${{ matrix.postgres-version }}-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -134,25 +129,15 @@ jobs:
       - name: Cache postgres v17 build
         id: cache_pg
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: pg_install/v17
           key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

       - name: Cache walproposer-lib
         id: cache_walproposer_lib
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: pg_install/build/walproposer-lib
           key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
@@ -218,57 +203,32 @@ jobs:
       - name: Cache postgres v14 build
         id: cache_pg
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: pg_install/v14
           key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v14-${{ steps.pg_rev_v14.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
       - name: Cache postgres v15 build
         id: cache_pg_v15
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: pg_install/v15
           key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v15-${{ steps.pg_rev_v15.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
       - name: Cache postgres v16 build
         id: cache_pg_v16
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: pg_install/v16
           key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v16-${{ steps.pg_rev_v16.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
       - name: Cache postgres v17 build
         id: cache_pg_v17
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: pg_install/v17
           key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-pg-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}

       - name: Cache cargo deps (only for v17)
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: |
             ~/.cargo/registry
             !~/.cargo/registry/src
@@ -278,13 +238,8 @@ jobs:
       - name: Cache walproposer-lib
         id: cache_walproposer_lib
-        uses: tespkg/actions-cache@b7bf5fcc2f98a52ac6080eb0fd282c2f752074b1 # v1.8.0
+        uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
         with:
-          endpoint: ${{ vars.HETZNER_CACHE_REGION }}.${{ vars.HETZNER_CACHE_ENDPOINT }}
-          bucket: ${{ vars.HETZNER_CACHE_BUCKET }}
-          accessKey: ${{ secrets.HETZNER_CACHE_ACCESS_KEY }}
-          secretKey: ${{ secrets.HETZNER_CACHE_SECRET_KEY }}
-          use-fallback: false
           path: pg_install/build/walproposer-lib
           key: v1-${{ runner.os }}-${{ runner.arch }}-${{ env.BUILD_TYPE }}-walproposer_lib-v17-${{ steps.pg_rev_v17.outputs.pg_rev }}-${{ hashFiles('Makefile') }}
Cargo.lock (generated, 1 line changed)

@@ -1323,7 +1323,6 @@ dependencies = [
  "serde_json",
  "serde_with",
  "signal-hook",
- "spki 0.7.3",
  "tar",
  "thiserror 1.0.69",
  "tokio",
@@ -44,7 +44,6 @@ serde.workspace = true
 serde_with.workspace = true
 serde_json.workspace = true
 signal-hook.workspace = true
-spki = { version = "0.7.3", features = ["std"] }
 tar.workspace = true
 tower.workspace = true
 tower-http.workspace = true
@@ -3,7 +3,6 @@ use std::{io::Write, os::unix::fs::OpenOptionsExt, path::Path, time::Duration};
 use anyhow::{Context, Result, bail};
 use compute_api::responses::TlsConfig;
 use ring::digest;
-use spki::der::{Decode, PemReader};
 use x509_cert::Certificate;

 #[derive(Clone, Copy)]
@@ -52,7 +51,7 @@ pub fn update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) {
     match try_update_key_path_blocking(pg_data, tls_config) {
         Ok(()) => break,
         Err(e) => {
-            tracing::error!("could not create key file {e:?}");
+            tracing::error!(error = ?e, "could not create key file");
             std::thread::sleep(Duration::from_secs(1))
         }
     }
@@ -92,8 +91,14 @@ fn try_update_key_path_blocking(pg_data: &Path, tls_config: &TlsConfig) -> Resul
 fn verify_key_cert(key: &str, cert: &str) -> Result<()> {
     use x509_cert::der::oid::db::rfc5912::ECDSA_WITH_SHA_256;

-    let cert = Certificate::decode(&mut PemReader::new(cert.as_bytes()).context("pem reader")?)
-        .context("decode cert")?;
+    let certs = Certificate::load_pem_chain(cert.as_bytes())
+        .context("decoding PEM encoded certificates")?;
+
+    // First certificate is our server-cert,
+    // all the rest of the certs are the CA cert chain.
+    let Some(cert) = certs.first() else {
+        bail!("no certificates found");
+    };

     match cert.signature_algorithm.oid {
         ECDSA_WITH_SHA_256 => {
@@ -115,3 +120,82 @@ fn verify_key_cert(key: &str, cert: &str) -> Result<()> {

     Ok(())
 }
+
+#[cfg(test)]
+mod tests {
+    use super::verify_key_cert;
+
+    /// Real certificate chain file, generated by cert-manager in dev.
+    /// The server auth certificate has expired since 2025-04-24T15:41:35Z.
+    const CERT: &str = "
+-----BEGIN CERTIFICATE-----
+MIICCDCCAa+gAwIBAgIQKhLomFcNULbZA/bPdGzaSzAKBggqhkjOPQQDAjBEMQsw
+CQYDVQQGEwJVUzESMBAGA1UEChMJTmVvbiBJbmMuMSEwHwYDVQQDExhOZW9uIEs4
+cyBJbnRlcm1lZGlhdGUgQ0EwHhcNMjUwNDIzMTU0MTM1WhcNMjUwNDI0MTU0MTM1
+WjBBMT8wPQYDVQQDEzZjb21wdXRlLXdpc3B5LWdyYXNzLXcwY21laWp3LmRlZmF1
+bHQuc3ZjLmNsdXN0ZXIubG9jYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAATF
+QCcG2m/EVHAiZtSsYgVnHgoTjUL/Jtwfdrpvz2t0bVRZmBmSKhlo53uPV9Y5eKFG
+AmR54p9/gT2eO3xU7vAgo4GFMIGCMA4GA1UdDwEB/wQEAwIFoDAMBgNVHRMBAf8E
+AjAAMB8GA1UdIwQYMBaAFFR2JAhXkeiNQNEixTvAYIwxUu3QMEEGA1UdEQQ6MDiC
+NmNvbXB1dGUtd2lzcHktZ3Jhc3MtdzBjbWVpancuZGVmYXVsdC5zdmMuY2x1c3Rl
+ci5sb2NhbDAKBggqhkjOPQQDAgNHADBEAiBLG22wKG8XS9e9RxBT+kmUx/kIThcP
+DIpp7jx0PrFcdQIgEMTdnXpx5Cv/Z0NIEDxtMHUD7G0vuRPfztki36JuakM=
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIICFzCCAb6gAwIBAgIUbbX98N2Ip6lWAONRk8dU9hSz+YIwCgYIKoZIzj0EAwIw
+RDELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UEAxMYTmVv
+biBBV1MgSW50ZXJtZWRpYXRlIENBMB4XDTI1MDQyMjE1MTAxMFoXDTI1MDcyMTE1
+MTAxMFowRDELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UE
+AxMYTmVvbiBLOHMgSW50ZXJtZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0D
+AQcDQgAE5++m5owqNI4BPMTVNIUQH0qvU7pYhdpHGVGhdj/Lgars6ROvE6uSNQV4
+SAmJN5HBzj5/6kLQaTPWpXW7EHXjK6OBjTCBijAOBgNVHQ8BAf8EBAMCAQYwEgYD
+VR0TAQH/BAgwBgEB/wIBADAdBgNVHQ4EFgQUVHYkCFeR6I1A0SLFO8BgjDFS7dAw
+HwYDVR0jBBgwFoAUgHfNXfyKtHO0V9qoLOWCjkNiaI8wJAYDVR0eAQH/BBowGKAW
+MBSCEi5zdmMuY2x1c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNHADBEAiBObVFFdXaL
+QpOXmN60dYUNnQRwjKreFduEkQgOdOlssgIgVAdJJQFgvlrvEOBhY8j5WyeKRwUN
+k/ALs6KpgaFBCGY=
+-----END CERTIFICATE-----
+-----BEGIN CERTIFICATE-----
+MIIB4jCCAYegAwIBAgIUFlxWFn/11yoGdmD+6gf+yQMToS0wCgYIKoZIzj0EAwIw
+ODELMAkGA1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEVMBMGA1UEAxMMTmVv
+biBSb290IENBMB4XDTI1MDQwMzA3MTUyMloXDTI2MDQwMzA3MTUyMlowRDELMAkG
+A1UEBhMCVVMxEjAQBgNVBAoTCU5lb24gSW5jLjEhMB8GA1UEAxMYTmVvbiBBV1Mg
+SW50ZXJtZWRpYXRlIENBMFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEqonG/IQ6
+ZxtEtOUTkkoNopPieXDO5CBKUkNFTGeJEB7OxRlSpYJgsBpaYIaD6Vc4sVk3thIF
+p+pLw52idQOIN6NjMGEwDgYDVR0PAQH/BAQDAgEGMA8GA1UdEwEB/wQFMAMBAf8w
+HQYDVR0OBBYEFIB3zV38irRztFfaqCzlgo5DYmiPMB8GA1UdIwQYMBaAFKh7M4/G
+FHvr/ORDQZt4bMLlJvHCMAoGCCqGSM49BAMCA0kAMEYCIQCbS4x7QPslONzBYbjC
+UQaQ0QLDW4CJHvQ4u4gbWFG87wIhAJMsHQHjP9qTT27Q65zQCR7O8QeLAfha1jrH
+Ag/LsxSr
+-----END CERTIFICATE-----
+";
+
+    /// The key corresponding to [`CERT`]
+    const KEY: &str = "
+-----BEGIN EC PRIVATE KEY-----
+MHcCAQEEIDnAnrqmIJjndCLWP1iIO5X3X63Aia48TGpGuMXwvm6IoAoGCCqGSM49
+AwEHoUQDQgAExUAnBtpvxFRwImbUrGIFZx4KE41C/ybcH3a6b89rdG1UWZgZkioZ
+aOd7j1fWOXihRgJkeeKff4E9njt8VO7wIA==
+-----END EC PRIVATE KEY-----
+";
+
+    /// An incorrect key.
+    const INCORRECT_KEY: &str = "
+-----BEGIN EC PRIVATE KEY-----
+MHcCAQEEIL6WqqBDyvM0HWz7Ir5M5+jhFWB7IzOClGn26OPrzHCXoAoGCCqGSM49
+AwEHoUQDQgAE7XVvdOy5lfwtNKb+gJEUtnG+DrnnXLY5LsHDeGQKV9PTRcEMeCrG
+YZzHyML4P6Sr4yi2ts+4B9i47uvAG8+XwQ==
+-----END EC PRIVATE KEY-----
+";
+
+    #[test]
+    fn certificate_verification() {
+        verify_key_cert(KEY, CERT).unwrap();
+    }
+
+    #[test]
+    #[should_panic(expected = "private key file does not match certificate")]
+    fn certificate_verification_fail() {
+        verify_key_cert(INCORRECT_KEY, CERT).unwrap();
+    }
+}
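A small usage sketch of the chain-aware path (not part of the diff): `verify_key_cert` takes the private key first and a PEM string that may contain the whole chain, and only the first (leaf) certificate is matched against the key, as the tests above do. The file names here are hypothetical placeholders.

// Hedged sketch: read a cert-manager style key/chain pair from disk and
// verify it from inside the same module that defines verify_key_cert.
fn verify_from_disk() -> anyhow::Result<()> {
    // "tls.key" / "tls.crt" are placeholder paths, not ones used by the code above.
    let key = std::fs::read_to_string("tls.key")?;
    // May contain leaf + intermediate + root certificates, like the CERT test fixture.
    let chain = std::fs::read_to_string("tls.crt")?;
    verify_key_cert(&key, &chain)?;
    Ok(())
}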
@@ -9,21 +9,20 @@
 # to verify custom image builds (e.g pre-published ones).
 #
 # A test script for postgres extensions
-# Currently supports only v16
+# Currently supports only v16+
 #
 set -eux -o pipefail

-COMPOSE_FILE='docker-compose.yml'
-cd $(dirname $0)
-COMPUTE_CONTAINER_NAME=docker-compose-compute-1
-TEST_CONTAINER_NAME=docker-compose-neon-test-extensions-1
+export COMPOSE_FILE='docker-compose.yml'
+export COMPOSE_PROFILES=test-extensions
+cd "$(dirname "${0}")"
 PSQL_OPTION="-h localhost -U cloud_admin -p 55433 -d postgres"

-cleanup() {
+function cleanup() {
     echo "show container information"
     docker ps
     echo "stop containers..."
-    docker compose --profile test-extensions -f $COMPOSE_FILE down
+    docker compose down
 }

 for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
@@ -31,55 +30,55 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
     echo "clean up containers if exists"
     cleanup
     PG_TEST_VERSION=$((pg_version < 16 ? 16 : pg_version))
-    PG_VERSION=$pg_version PG_TEST_VERSION=$PG_TEST_VERSION docker compose --profile test-extensions -f $COMPOSE_FILE up --quiet-pull --build -d
+    PG_VERSION=${pg_version} PG_TEST_VERSION=${PG_TEST_VERSION} docker compose up --quiet-pull --build -d

     echo "wait until the compute is ready. timeout after 60s. "
     cnt=0
     while sleep 3; do
         # check timeout
-        cnt=`expr $cnt + 3`
-        if [ $cnt -gt 60 ]; then
+        (( cnt += 3 ))
+        if [[ ${cnt} -gt 60 ]]; then
             echo "timeout before the compute is ready."
             exit 1
         fi
-        if docker compose --profile test-extensions -f $COMPOSE_FILE logs "compute_is_ready" | grep -q "accepting connections"; then
+        if docker compose logs "compute_is_ready" | grep -q "accepting connections"; then
             echo "OK. The compute is ready to connect."
             echo "execute simple queries."
-            docker exec $COMPUTE_CONTAINER_NAME /bin/bash -c "psql $PSQL_OPTION"
+            docker compose exec compute /bin/bash -c "psql ${PSQL_OPTION} -c 'SELECT 1'"
             break
         fi
     done

-    if [ $pg_version -ge 16 ]; then
+    if [[ ${pg_version} -ge 16 ]]; then
         # This is required for the pg_hint_plan test, to prevent flaky log message causing the test to fail
         # It cannot be moved to Dockerfile now because the database directory is created after the start of the container
         echo Adding dummy config
-        docker exec $COMPUTE_CONTAINER_NAME touch /var/db/postgres/compute/compute_ctl_temp_override.conf
+        docker compose exec compute touch /var/db/postgres/compute/compute_ctl_temp_override.conf
         # The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
         TMPDIR=$(mktemp -d)
-        docker cp $TEST_CONTAINER_NAME:/ext-src/pg_hint_plan-src/data $TMPDIR/data
-        docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/ext-src/pg_hint_plan-src/
-        rm -rf $TMPDIR
+        docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${TMPDIR}/data"
+        docker compose cp "${TMPDIR}/data" compute:/ext-src/pg_hint_plan-src/
+        rm -rf "${TMPDIR}"
         # The following block does the same for the contrib/file_fdw test
         TMPDIR=$(mktemp -d)
-        docker cp $TEST_CONTAINER_NAME:/postgres/contrib/file_fdw/data $TMPDIR/data
-        docker cp $TMPDIR/data $COMPUTE_CONTAINER_NAME:/postgres/contrib/file_fdw/data
-        rm -rf $TMPDIR
+        docker compose cp neon-test-extensions:/postgres/contrib/file_fdw/data "${TMPDIR}/data"
+        docker compose cp "${TMPDIR}/data" compute:/postgres/contrib/file_fdw/data
+        rm -rf "${TMPDIR}"
         # Apply patches
-        cat ../compute/patches/contrib_pg${pg_version}.patch | docker exec -i $TEST_CONTAINER_NAME bash -c "(cd /postgres && patch -p1)"
+        docker compose exec -i neon-test-extensions bash -c "(cd /postgres && patch -p1)" <"../compute/patches/contrib_pg${pg_version}.patch"
        # We are running tests now
        rm -f testout.txt testout_contrib.txt
-        docker exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
-            $TEST_CONTAINER_NAME /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
-        docker exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
-            $TEST_CONTAINER_NAME /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
-        if [ $EXT_SUCCESS -eq 0 ] || [ $CONTRIB_SUCCESS -eq 0 ]; then
+        docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
+            neon-test-extensions /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
+        docker compose exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
+            neon-test-extensions /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0
+        if [[ ${EXT_SUCCESS} -eq 0 || ${CONTRIB_SUCCESS} -eq 0 ]]; then
             CONTRIB_FAILED=
             FAILED=
-            [ $EXT_SUCCESS -eq 0 ] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
-            [ $CONTRIB_SUCCESS -eq 0 ] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
-            for d in $FAILED $CONTRIB_FAILED; do
-                docker exec $TEST_CONTAINER_NAME bash -c 'for file in $(find '"$d"' -name regression.diffs -o -name regression.out); do cat $file; done' || [ $? -eq 1 ]
+            [[ ${EXT_SUCCESS} -eq 0 ]] && FAILED=$(tail -1 testout.txt | awk '{for(i=1;i<=NF;i++){print "/ext-src/"$i;}}')
+            [[ ${CONTRIB_SUCCESS} -eq 0 ]] && CONTRIB_FAILED=$(tail -1 testout_contrib.txt | awk '{for(i=0;i<=NF;i++){print "/postgres/contrib/"$i;}}')
+            for d in ${FAILED} ${CONTRIB_FAILED}; do
+                docker compose exec neon-test-extensions bash -c 'for file in $(find '"${d}"' -name regression.diffs -o -name regression.out); do cat ${file}; done' || [[ ${?} -eq 1 ]]
             done
             exit 1
         fi
@@ -21,7 +21,7 @@ use pageserver_api::config::{
     PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
     PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
 };
-use pageserver_api::key::rel_block_to_key;
+use pageserver_api::key::{Key, rel_block_to_key};
 use pageserver_api::models::{
     self, PageTraceEvent, PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
     PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
@@ -29,7 +29,7 @@ use pageserver_api::models::{
     PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
     PagestreamProtocolVersion, PagestreamRequest, TenantState,
 };
-use pageserver_api::reltag::SlruKind;
+use pageserver_api::reltag::{RelTag, SlruKind};
 use pageserver_api::shard::TenantShardId;
 use postgres_backend::{
     AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
@@ -1035,10 +1035,10 @@ impl PageServerHandler {
         // avoid a somewhat costly Span::record() by constructing the entire span in one go.
         macro_rules! mkspan {
             (before shard routing) => {{
-                tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn)
+                tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, not_modified_since_lsn = %req.hdr.not_modified_since)
             }};
             ($shard_id:expr) => {{
-                tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, shard_id = %$shard_id)
+                tracing::info_span!(parent: &parent_span, "handle_get_page_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.hdr.request_lsn, not_modified_since_lsn = %req.hdr.not_modified_since, shard_id = %$shard_id)
             }};
         }
@@ -1140,9 +1140,10 @@ impl PageServerHandler {
             .await?;

         // We're holding the Handle
+        let last_record_lsn = shard.get_last_record_lsn();
         let effective_request_lsn = match Self::effective_request_lsn(
             &shard,
-            shard.get_last_record_lsn(),
+            last_record_lsn,
             req.hdr.request_lsn,
             req.hdr.not_modified_since,
             &shard.get_applied_gc_cutoff_lsn(),
@@ -1153,6 +1154,22 @@ impl PageServerHandler {
             }
         };

+        let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
+        let trouble_rel = RelTag {
+            spcnode: trouble_key.field2,
+            dbnode: trouble_key.field3,
+            relnode: trouble_key.field4,
+            forknum: trouble_key.field5,
+        };
+        if req.rel == trouble_rel {
+            tracing::info!(
+                request_lsn=%req.hdr.request_lsn,
+                not_modified_since_lsn=%req.hdr.not_modified_since,
+                %last_record_lsn,
+                "effective_request_lsn for {} is {}", key, effective_request_lsn
+            );
+        }
+
         BatchedFeMessage::GetPage {
             span,
             shard: shard.downgrade(),
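The same hard-coded trouble key and the same field-by-field Key to RelTag projection recur in several hunks below. A minimal sketch of how that repetition could be factored out; the helper names are hypothetical and not part of this branch, but the field2..field5 mapping is exactly the one the diff uses:

// Hypothetical helper: project a Key onto the relation it belongs to,
// mirroring the field2..field5 assignments in the debug hunks.
use pageserver_api::key::Key;
use pageserver_api::reltag::RelTag;

fn rel_tag_of(key: &Key) -> RelTag {
    RelTag {
        spcnode: key.field2,
        dbnode: key.field3,
        relnode: key.field4,
        forknum: key.field5,
    }
}

// The hard-coded key this branch singles out for extra logging.
fn trouble_rel() -> RelTag {
    let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
    rel_tag_of(&trouble_key)
}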
@@ -185,6 +185,7 @@ impl Timeline {
             pending_directory_entries: Vec::new(),
             pending_metadata_bytes: 0,
             lsn,
+            extra_log: false,
         }
     }
@@ -265,6 +266,14 @@ impl Timeline {
         let mut req_keyspaces: HashMap<Lsn, KeySpaceRandomAccum> =
             HashMap::with_capacity(pages.len());

+        let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
+        let trouble_rel = RelTag {
+            spcnode: trouble_key.field2,
+            dbnode: trouble_key.field3,
+            relnode: trouble_key.field4,
+            forknum: trouble_key.field5,
+        };
+
         for (response_slot_idx, (tag, blknum, lsn, ctx)) in pages.enumerate() {
             if tag.relnode == 0 {
                 result_slots[response_slot_idx].write(Err(PageReconstructError::Other(
@@ -275,6 +284,14 @@ impl Timeline {
                 continue;
             }

+            if *tag == trouble_rel {
+                tracing::info!(
+                    "Getting rel size for {} at LSN {}",
+                    rel_block_to_key(*tag, *blknum),
+                    lsn
+                );
+            }
+
             let nblocks = match self
                 .get_rel_size(*tag, Version::Lsn(lsn), &ctx)
                 .maybe_perf_instrument(&ctx, |crnt_perf_span| {
@@ -1402,6 +1419,8 @@ pub struct DatadirModification<'a> {

     /// An **approximation** of how many metadata bytes will be written to the EphemeralFile.
     pending_metadata_bytes: usize,
+
+    extra_log: bool,
 }

 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -1620,6 +1639,32 @@ impl DatadirModification<'_> {
     ) -> Result<(), WalIngestError> {
         let mut gaps_at_lsns = Vec::default();

+        let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
+        let trouble_rel = RelTag {
+            spcnode: trouble_key.field2,
+            dbnode: trouble_key.field3,
+            relnode: trouble_key.field4,
+            forknum: trouble_key.field5,
+        };
+
+        for meta in batch.metadata.iter().filter_map(|m| match m {
+            ValueMeta::Serialized(serialized_value_meta) => Some(serialized_value_meta),
+            ValueMeta::Observed(_) => None,
+        }) {
+            let key = Key::from_compact(meta.key);
+            let rel = RelTag {
+                spcnode: key.field2,
+                dbnode: key.field3,
+                relnode: key.field4,
+                forknum: key.field5,
+            };
+
+            if rel == trouble_rel {
+                tracing::info!("Put for {key} at LSN {}", meta.lsn);
+                self.extra_log = true;
+            }
+        }
+
         for meta in batch.metadata.iter() {
             let key = Key::from_compact(meta.key());
             let (rel, blkno) = key
@@ -1950,6 +1995,19 @@ impl DatadirModification<'_> {
                 "invalid relnode"
             )))?;
         }
+
+        let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
+        let trouble_rel = RelTag {
+            spcnode: trouble_key.field2,
+            dbnode: trouble_key.field3,
+            relnode: trouble_key.field4,
+            forknum: trouble_key.field5,
+        };
+
+        if rel == trouble_rel {
+            self.extra_log = true;
+        }
+
         // It's possible that this is the first rel for this db in this
         // tablespace. Create the reldir entry for it if so.
         let mut dbdir = DbDirectory::des(&self.get(DBDIR_KEY, ctx).await?)?;
@@ -1969,6 +2027,10 @@ impl DatadirModification<'_> {
             true
         };

+        if rel == trouble_rel {
+            tracing::info!(%dbdir_exists, "Maybe created db dir for {} at LSN {}", trouble_key.to_compact(), self.lsn);
+        }
+
         let rel_dir_key = rel_dir_to_key(rel.spcnode, rel.dbnode);
         let mut rel_dir = if !dbdir_exists {
             // Create the RelDirectory
@@ -2014,6 +2076,10 @@ impl DatadirModification<'_> {
             }
             self.pending_directory_entries
                 .push((DirectoryKind::RelV2, MetricsUpdate::Add(1)));
+
+            if rel == trouble_rel {
+                tracing::info!(%dbdir_exists, "Created v2 rel for {} at LSN {}", trouble_key.to_compact(), self.lsn);
+            }
         } else {
             // Add the new relation to the rel directory entry, and write it back
             if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
@@ -2029,6 +2095,10 @@ impl DatadirModification<'_> {
                 rel_dir_key,
                 Value::Image(Bytes::from(RelDirectory::ser(&rel_dir)?)),
             );
+
+            if rel == trouble_rel {
+                tracing::info!(%dbdir_exists, "Created v1 rel for {} at LSN {}", trouble_key.to_compact(), self.lsn);
+            }
         }

         // Put size
@@ -2463,11 +2533,19 @@ impl DatadirModification<'_> {
         };

         if let Some(batch) = maybe_batch {
-            tracing::debug!(
-                "Flushing batch with max_lsn={}. Last record LSN is {}",
-                batch.max_lsn,
-                self.tline.get_last_record_lsn()
-            );
+            if self.extra_log {
+                tracing::info!(
+                    "Flushing batch with max_lsn={}. Last record LSN is {}",
+                    batch.max_lsn,
+                    self.tline.get_last_record_lsn()
+                );
+            } else {
+                tracing::debug!(
+                    "Flushing batch with max_lsn={}. Last record LSN is {}",
+                    batch.max_lsn,
+                    self.tline.get_last_record_lsn()
+                );
+            }

             // This bails out on first error without modifying pending_updates.
             // That's Ok, cf this function's doc comment.
@@ -2501,6 +2579,14 @@ impl DatadirModification<'_> {

         self.pending_metadata_bytes = 0;

+        if self.extra_log {
+            tracing::info!(
+                "Flushed batch. Last record LSN is {}",
+                self.tline.get_last_record_lsn()
+            );
+            self.extra_log = false;
+        }
+
         Ok(())
     }
@@ -2591,6 +2677,26 @@ impl DatadirModification<'_> {
             .pending_data_batch
             .get_or_insert_with(SerializedValueBatch::default);
         batch.put(key, val, self.lsn);
+
+        let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF").unwrap();
+        let trouble_rel = RelTag {
+            spcnode: trouble_key.field2,
+            dbnode: trouble_key.field3,
+            relnode: trouble_key.field4,
+            forknum: trouble_key.field5,
+        };
+        let key = Key::from_compact(key);
+        let rel = RelTag {
+            spcnode: key.field2,
+            dbnode: key.field3,
+            relnode: key.field4,
+            forknum: key.field5,
+        };
+
+        if rel == trouble_rel {
+            tracing::info!("Put for {key} at LSN {}", self.lsn);
+            self.extra_log = true;
+        }
     }

     fn put_metadata(&mut self, key: CompactKey, val: Value) {
@@ -2617,6 +2723,14 @@ impl DatadirModification<'_> {
         if key == CHECKPOINT_KEY.to_compact() {
             tracing::debug!("Checkpoint key added to pending with size {val_serialized_size}");
         }
+
+        let trouble_key = Key::from_hex("000000067F000040000000400600FFFFFFFF")
+            .unwrap()
+            .to_compact();
+        if key == trouble_key {
+            tracing::info!("Put for {trouble_key} at LSN {}", self.lsn);
+            self.extra_log = true;
+        }
     }

     fn delete(&mut self, key_range: Range<Key>) {
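Taken together, the hunks above follow one debug pattern: any put that touches the hard-coded trouble key raises the extra_log flag on the modification, the next flush logs at info level instead of debug, and the flag is cleared once that flush completes. A stripped-down sketch of that lifecycle; DebugFlag and its methods are hypothetical names, not the real DatadirModification:

// Hypothetical, minimal model of the extra_log flag added in this branch.
struct DebugFlag {
    extra_log: bool,
}

impl DebugFlag {
    // Called from the various put paths: raise the flag when the write
    // touches the relation singled out for debugging.
    fn on_put(&mut self, touches_trouble_key: bool) {
        if touches_trouble_key {
            self.extra_log = true;
        }
    }

    // Called when the pending batch has been flushed: log once at info level
    // instead of debug, then reset so later batches go back to debug.
    fn on_flush_complete(&mut self) {
        if self.extra_log {
            tracing::info!("flushed a batch containing the trouble key");
            self.extra_log = false;
        }
    }
}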
@@ -1442,6 +1442,19 @@ impl DeltaLayerInner {
     }

     pub fn iter<'a>(&'a self, ctx: &'a RequestContext) -> DeltaLayerIterator<'a> {
+        self.iter_with_options(
+            ctx,
+            1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
+            1024, // The default value. Unit tests might use a different value
+        )
+    }
+
+    pub fn iter_with_options<'a>(
+        &'a self,
+        ctx: &'a RequestContext,
+        max_read_size: u64,
+        max_batch_size: usize,
+    ) -> DeltaLayerIterator<'a> {
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let tree_reader =
             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -1451,10 +1464,7 @@ impl DeltaLayerInner {
             index_iter: tree_reader.iter(&[0; DELTA_KEY_SIZE], ctx),
             key_values_batch: std::collections::VecDeque::new(),
             is_end: false,
-            planner: StreamingVectoredReadPlanner::new(
-                1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
-                1024, // The default value. Unit tests might use a different value
-            ),
+            planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
         }
     }
@@ -685,6 +685,19 @@ impl ImageLayerInner {
     }

     pub(crate) fn iter<'a>(&'a self, ctx: &'a RequestContext) -> ImageLayerIterator<'a> {
+        self.iter_with_options(
+            ctx,
+            1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
+            1024, // The default value. Unit tests might use a different value
+        )
+    }
+
+    pub(crate) fn iter_with_options<'a>(
+        &'a self,
+        ctx: &'a RequestContext,
+        max_read_size: u64,
+        max_batch_size: usize,
+    ) -> ImageLayerIterator<'a> {
         let block_reader = FileBlockReader::new(&self.file, self.file_id);
         let tree_reader =
             DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, block_reader);
@@ -694,10 +707,7 @@ impl ImageLayerInner {
             index_iter: tree_reader.iter(&[0; KEY_SIZE], ctx),
             key_values_batch: VecDeque::new(),
             is_end: false,
-            planner: StreamingVectoredReadPlanner::new(
-                1024 * 8192, // The default value. Unit tests might use a different value. 1024 * 8K = 8MB buffer.
-                1024, // The default value. Unit tests might use a different value
-            ),
+            planner: StreamingVectoredReadPlanner::new(max_read_size, max_batch_size),
         }
     }
@@ -19,6 +19,7 @@ pub(crate) enum LayerRef<'a> {
 }

 impl<'a> LayerRef<'a> {
+    #[allow(dead_code)]
     fn iter(self, ctx: &'a RequestContext) -> LayerIterRef<'a> {
         match self {
             Self::Image(x) => LayerIterRef::Image(x.iter(ctx)),
@@ -26,6 +27,22 @@ impl<'a> LayerRef<'a> {
         }
     }

+    fn iter_with_options(
+        self,
+        ctx: &'a RequestContext,
+        max_read_size: u64,
+        max_batch_size: usize,
+    ) -> LayerIterRef<'a> {
+        match self {
+            Self::Image(x) => {
+                LayerIterRef::Image(x.iter_with_options(ctx, max_read_size, max_batch_size))
+            }
+            Self::Delta(x) => {
+                LayerIterRef::Delta(x.iter_with_options(ctx, max_read_size, max_batch_size))
+            }
+        }
+    }
+
     fn layer_dbg_info(&self) -> String {
         match self {
             Self::Image(x) => x.layer_dbg_info(),
@@ -66,6 +83,8 @@ pub(crate) enum IteratorWrapper<'a> {
         first_key_lower_bound: (Key, Lsn),
         layer: LayerRef<'a>,
         source_desc: Arc<PersistentLayerKey>,
+        max_read_size: u64,
+        max_batch_size: usize,
     },
     Loaded {
         iter: PeekableLayerIterRef<'a>,
@@ -146,6 +165,8 @@ impl<'a> IteratorWrapper<'a> {
     pub fn create_from_image_layer(
         image_layer: &'a ImageLayerInner,
         ctx: &'a RequestContext,
+        max_read_size: u64,
+        max_batch_size: usize,
     ) -> Self {
         Self::NotLoaded {
             layer: LayerRef::Image(image_layer),
@@ -157,12 +178,16 @@ impl<'a> IteratorWrapper<'a> {
                 is_delta: false,
             }
             .into(),
+            max_read_size,
+            max_batch_size,
         }
     }

     pub fn create_from_delta_layer(
         delta_layer: &'a DeltaLayerInner,
         ctx: &'a RequestContext,
+        max_read_size: u64,
+        max_batch_size: usize,
     ) -> Self {
         Self::NotLoaded {
             layer: LayerRef::Delta(delta_layer),
@@ -174,6 +199,8 @@ impl<'a> IteratorWrapper<'a> {
                 is_delta: true,
             }
             .into(),
+            max_read_size,
+            max_batch_size,
         }
     }
@@ -204,11 +231,13 @@ impl<'a> IteratorWrapper<'a> {
             first_key_lower_bound,
             layer,
             source_desc,
+            max_read_size,
+            max_batch_size,
         } = self
         else {
             unreachable!()
         };
-        let iter = layer.iter(ctx);
+        let iter = layer.iter_with_options(ctx, *max_read_size, *max_batch_size);
         let iter = PeekableLayerIterRef::create(iter).await?;
         if let Some((k1, l1, _)) = iter.peek() {
             let (k2, l2) = first_key_lower_bound;
@@ -293,21 +322,41 @@ impl MergeIteratorItem for ((Key, Lsn, Value), Arc<PersistentLayerKey>) {
 }

 impl<'a> MergeIterator<'a> {
+    pub fn create_with_options(
+        deltas: &[&'a DeltaLayerInner],
+        images: &[&'a ImageLayerInner],
+        ctx: &'a RequestContext,
+        max_read_size: u64,
+        max_batch_size: usize,
+    ) -> Self {
+        let mut heap = Vec::with_capacity(images.len() + deltas.len());
+        for image in images {
+            heap.push(IteratorWrapper::create_from_image_layer(
+                image,
+                ctx,
+                max_read_size,
+                max_batch_size,
+            ));
+        }
+        for delta in deltas {
+            heap.push(IteratorWrapper::create_from_delta_layer(
+                delta,
+                ctx,
+                max_read_size,
+                max_batch_size,
+            ));
+        }
+        Self {
+            heap: BinaryHeap::from(heap),
+        }
+    }
+
     pub fn create(
         deltas: &[&'a DeltaLayerInner],
         images: &[&'a ImageLayerInner],
         ctx: &'a RequestContext,
     ) -> Self {
-        let mut heap = Vec::with_capacity(images.len() + deltas.len());
-        for image in images {
-            heap.push(IteratorWrapper::create_from_image_layer(image, ctx));
-        }
-        for delta in deltas {
-            heap.push(IteratorWrapper::create_from_delta_layer(delta, ctx));
-        }
-        Self {
-            heap: BinaryHeap::from(heap),
-        }
+        Self::create_with_options(deltas, images, ctx, 1024 * 8192, 1024)
     }

     pub(crate) async fn next_inner<R: MergeIteratorItem>(&mut self) -> anyhow::Result<Option<R>> {
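A short usage sketch (not from the diff) of how a caller picks between the two constructors; `delta_layers`, `image_layers`, and `ctx` stand in for whatever the call site already has in scope:

// Old behaviour: per-iterator defaults of an 8 MiB read buffer and
// 1024-entry batches (1024 * 8192 bytes, 1024 entries), now routed
// through create_with_options.
let default_iter = MergeIterator::create(&delta_layers, &image_layers, ctx);

// Memory-conscious call sites can cap both knobs explicitly; 128 * 8192
// bytes (1 MiB) and 128 entries are the values passed by the
// gc-compaction hunk further down this compare view.
let bounded_iter = MergeIterator::create_with_options(
    &delta_layers,
    &image_layers,
    ctx,
    128 * 8192,
    128,
);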
@@ -2828,6 +2828,41 @@ impl Timeline {
         Ok(())
     }

+    /// Check if the memory usage is within the limit.
+    async fn check_memory_usage(
+        self: &Arc<Self>,
+        layer_selection: &[Layer],
+    ) -> Result<(), CompactionError> {
+        let mut estimated_memory_usage_mb = 0.0;
+        let mut num_image_layers = 0;
+        let mut num_delta_layers = 0;
+        let target_layer_size_bytes = 256 * 1024 * 1024;
+        for layer in layer_selection {
+            let layer_desc = layer.layer_desc();
+            if layer_desc.is_delta() {
+                // Delta layers at most have 1MB buffer; 3x to make it safe (there're deltas as large as 16KB).
+                // Multiply the layer size so that tests can pass.
+                estimated_memory_usage_mb +=
+                    3.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
+                num_delta_layers += 1;
+            } else {
+                // Image layers at most have 1MB buffer but it might be compressed; assume 5x compression ratio.
+                estimated_memory_usage_mb +=
+                    5.0 * (layer_desc.file_size / target_layer_size_bytes) as f64;
+                num_image_layers += 1;
+            }
+        }
+        if estimated_memory_usage_mb > 1024.0 {
+            return Err(CompactionError::Other(anyhow!(
+                "estimated memory usage is too high: {}MB, giving up compaction; num_image_layers={}, num_delta_layers={}",
+                estimated_memory_usage_mb,
+                num_image_layers,
+                num_delta_layers
+            )));
+        }
+        Ok(())
+    }
+
     /// Get a watermark for gc-compaction, that is the lowest LSN that we can use as the `gc_horizon` for
     /// the compaction algorithm. It is min(space_cutoff, time_cutoff, latest_gc_cutoff, standby_horizon).
     /// Leases and retain_lsns are considered in the gc-compaction job itself so we don't need to account for them
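To see what the estimate produces, here is a standalone sketch of the same arithmetic with simplified inputs (plain tuples rather than the pageserver's layer types; the function name is made up). Note that `file_size / target_layer_size_bytes` is integer division on byte counts, so a layer smaller than 256 MiB contributes nothing to the total.

// Sketch of the estimate used by check_memory_usage above, with
// (file_size_bytes, is_delta) pairs standing in for layer descriptors.
fn estimated_memory_usage_mb(layers: &[(u64, bool)]) -> f64 {
    let target_layer_size_bytes: u64 = 256 * 1024 * 1024;
    let mut estimate_mb = 0.0;
    for &(file_size, is_delta) in layers {
        // 3x for delta layers, 5x for (possibly compressed) image layers.
        let factor = if is_delta { 3.0 } else { 5.0 };
        estimate_mb += factor * (file_size / target_layer_size_bytes) as f64;
    }
    estimate_mb
}

fn main() {
    // One 512 MiB delta layer and one 256 MiB image layer:
    // 3.0 * 2 + 5.0 * 1 = 11.0 "MB", far below the 1024.0 cut-off.
    let layers = [(512 * 1024 * 1024, true), (256 * 1024 * 1024, false)];
    println!("{}", estimated_memory_usage_mb(&layers));
}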
@@ -3264,6 +3299,17 @@ impl Timeline {
         self.check_compaction_space(&job_desc.selected_layers)
             .await?;

+        self.check_memory_usage(&job_desc.selected_layers).await?;
+        if job_desc.selected_layers.len() > 100
+            && job_desc.rewrite_layers.len() as f64 >= job_desc.selected_layers.len() as f64 * 0.7
+        {
+            return Err(CompactionError::Other(anyhow!(
+                "too many layers to rewrite: {} / {}, giving up compaction",
+                job_desc.rewrite_layers.len(),
+                job_desc.selected_layers.len()
+            )));
+        }
+
         // Generate statistics for the compaction
         for layer in &job_desc.selected_layers {
             let desc = layer.layer_desc();
@@ -3359,7 +3405,13 @@ impl Timeline {
             .context("failed to collect gc compaction keyspace")
             .map_err(CompactionError::Other)?;
         let mut merge_iter = FilterIterator::create(
-            MergeIterator::create(&delta_layers, &image_layers, ctx),
+            MergeIterator::create_with_options(
+                &delta_layers,
+                &image_layers,
+                ctx,
+                128 * 8192, /* 1MB buffer for each of the inner iterators */
+                128,
+            ),
             dense_ks,
             sparse_ks,
         )
poetry.lock (generated, 18 lines changed)

@@ -1274,14 +1274,14 @@ files = [

 [[package]]
 name = "h11"
-version = "0.14.0"
+version = "0.16.0"
 description = "A pure-Python, bring-your-own-I/O implementation of HTTP/1.1"
 optional = false
-python-versions = ">=3.7"
+python-versions = ">=3.8"
 groups = ["main"]
 files = [
-    {file = "h11-0.14.0-py3-none-any.whl", hash = "sha256:e3fe4ac4b851c468cc8363d500db52c2ead036020723024a109d37346efaa761"},
-    {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"},
+    {file = "h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86"},
+    {file = "h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1"},
 ]

 [[package]]
@@ -1314,25 +1314,25 @@ files = [

 [[package]]
 name = "httpcore"
-version = "1.0.3"
+version = "1.0.9"
 description = "A minimal low-level HTTP client."
 optional = false
 python-versions = ">=3.8"
 groups = ["main"]
 files = [
-    {file = "httpcore-1.0.3-py3-none-any.whl", hash = "sha256:9a6a501c3099307d9fd76ac244e08503427679b1e81ceb1d922485e2f2462ad2"},
-    {file = "httpcore-1.0.3.tar.gz", hash = "sha256:5c0f9546ad17dac4d0772b0808856eb616eb8b48ce94f49ed819fd6982a8a544"},
+    {file = "httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55"},
+    {file = "httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8"},
 ]

 [package.dependencies]
 certifi = "*"
-h11 = ">=0.13,<0.15"
+h11 = ">=0.16"

 [package.extras]
 asyncio = ["anyio (>=4.0,<5.0)"]
 http2 = ["h2 (>=3,<5)"]
 socks = ["socksio (==1.*)"]
-trio = ["trio (>=0.22.0,<0.24.0)"]
+trio = ["trio (>=0.22.0,<1.0)"]

 [[package]]
 name = "httpx"
@@ -8,7 +8,7 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError};
 use crate::proxy::retry::CouldRetry;

 /// A go-to error message which doesn't leak any detail.
-pub(crate) const REQUEST_FAILED: &str = "Console request failed";
+pub(crate) const REQUEST_FAILED: &str = "Control plane request failed";

 /// Common console API error.
 #[derive(Debug, Error)]
@@ -323,6 +323,7 @@ class NeonProject:
         if self.restart_pgbench_on_console_errors and (
             "ERROR: Couldn't connect to compute node" in err
             or "ERROR: Console request failed" in err
+            or "ERROR: Control plane request failed" in err
         ):
             log.info("Restarting benchmark for %s", target)
             self.benchmarks.pop(target)