mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-20 06:30:43 +00:00
Compare commits
25 Commits
jcsp/storc
...
bodobolero
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b774e1655d | ||
|
|
4ab4510a84 | ||
|
|
6c0cf1c1cc | ||
|
|
95d50f918a | ||
|
|
30ba4ac50b | ||
|
|
2aef4a93e0 | ||
|
|
6638abda65 | ||
|
|
306017ebc7 | ||
|
|
8467fe9b05 | ||
|
|
08c2021881 | ||
|
|
4975dfd3cf | ||
|
|
5b26876ac7 | ||
|
|
e9ed53b14f | ||
|
|
a338aee132 | ||
|
|
96243af651 | ||
|
|
ef8bfacd6b | ||
|
|
ceacc29609 | ||
|
|
b31ed0acd1 | ||
|
|
b2d0e1a519 | ||
|
|
d1bc36f536 | ||
|
|
0b9032065e | ||
|
|
09fe3b025c | ||
|
|
12053cf832 | ||
|
|
de199d71e1 | ||
|
|
22a6460010 |
48
.github/actions/neon-project-create/action.yml
vendored
48
.github/actions/neon-project-create/action.yml
vendored
@@ -17,6 +17,31 @@ inputs:
|
||||
compute_units:
|
||||
description: '[Min, Max] compute units'
|
||||
default: '[1, 1]'
|
||||
# settings below only needed if you want the project to be sharded from the beginning
|
||||
shard_split_project:
|
||||
description: 'by default new projects are not shard-split, specify true to shard-split'
|
||||
required: false
|
||||
default: 'false'
|
||||
admin_api_key:
|
||||
description: 'Admin API Key needed for shard-splitting. Must be specified if shard_split_project is true'
|
||||
required: false
|
||||
shard_count:
|
||||
description: 'Number of shards to split the project into, only applies if shard_split_project is true'
|
||||
required: false
|
||||
default: '8'
|
||||
stripe_size:
|
||||
description: 'Stripe size, optional, in 8kiB pages. e.g. set 2048 for 16MB stripes. Default is 128 MiB, only applies if shard_split_project is true'
|
||||
required: false
|
||||
default: '32768'
|
||||
psql_path:
|
||||
description: 'Path to psql binary - it is caller responsibility to provision the psql binary'
|
||||
required: false
|
||||
default: '/tmp/neon/pg_install/v16/bin/psql'
|
||||
libpq_lib_path:
|
||||
description: 'Path to directory containing libpq library - it is caller responsibility to provision the libpq library'
|
||||
required: false
|
||||
default: '/tmp/neon/pg_install/v16/lib'
|
||||
|
||||
|
||||
outputs:
|
||||
dsn:
|
||||
@@ -63,6 +88,23 @@ runs:
|
||||
echo "project_id=${project_id}" >> $GITHUB_OUTPUT
|
||||
|
||||
echo "Project ${project_id} has been created"
|
||||
|
||||
if [ "${SHARD_SPLIT_PROJECT}" = "true" ]; then
|
||||
# determine tenant ID
|
||||
TENANT_ID=`${PSQL} ${dsn} -t -A -c "SHOW neon.tenant_id"`
|
||||
|
||||
echo "Splitting project ${project_id} with tenant_id ${TENANT_ID} into $((SHARD_COUNT)) shards with stripe size $((STRIPE_SIZE))"
|
||||
|
||||
echo "Sending PUT request to https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/shard_split"
|
||||
echo "with body {\"new_shard_count\": $((SHARD_COUNT)), \"new_stripe_size\": $((STRIPE_SIZE))}"
|
||||
|
||||
# we need an ADMIN API KEY to invoke storage controller API for shard splitting (bash -u above checks that the variable is set)
|
||||
curl -X PUT \
|
||||
"https://${API_HOST}/regions/${REGION_ID}/api/v1/admin/storage/proxy/control/v1/tenant/${TENANT_ID}/shard_split" \
|
||||
-H "Accept: application/json" -H "Content-Type: application/json" -H "Authorization: Bearer ${ADMIN_API_KEY}" \
|
||||
-d "{\"new_shard_count\": $SHARD_COUNT, \"new_stripe_size\": $STRIPE_SIZE}"
|
||||
fi
|
||||
|
||||
env:
|
||||
API_HOST: ${{ inputs.api_host }}
|
||||
API_KEY: ${{ inputs.api_key }}
|
||||
@@ -70,3 +112,9 @@ runs:
|
||||
POSTGRES_VERSION: ${{ inputs.postgres_version }}
|
||||
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
|
||||
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}
|
||||
SHARD_SPLIT_PROJECT: ${{ inputs.shard_split_project }}
|
||||
ADMIN_API_KEY: ${{ inputs.admin_api_key }}
|
||||
SHARD_COUNT: ${{ inputs.shard_count }}
|
||||
STRIPE_SIZE: ${{ inputs.stripe_size }}
|
||||
PSQL: ${{ inputs.psql_path }}
|
||||
LD_LIBRARY_PATH: ${{ inputs.libpq_lib_path }}
|
||||
|
||||
30
.github/workflows/ingest_benchmark.yml
vendored
30
.github/workflows/ingest_benchmark.yml
vendored
@@ -28,7 +28,31 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false # allow other variants to continue even if one fails
|
||||
matrix:
|
||||
target_project: [new_empty_project, large_existing_project]
|
||||
include:
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 128 # 1 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 256 # 2 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 512 # 4 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 1024 # 8 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 2048 # 16 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 4096 # 32 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 8192 # 64 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 16384 # 128 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: 32768 # 256 MiB # note that this is different from null because using null will shard_split the project only if it reaches the threshold
|
||||
# while here it is sharded from the beginning with a shard size of 256 MiB
|
||||
- target_project: new_empty_project
|
||||
stripe_size: null # run with neon defaults which will shard split only when reaching the threshold
|
||||
- target_project: large_existing_project
|
||||
stripe_size: null # cannot re-shared or choose different stripe size for existing, already sharded project
|
||||
max-parallel: 1 # we want to run each stripe size sequentially to be able to compare the results
|
||||
permissions:
|
||||
contents: write
|
||||
statuses: write
|
||||
@@ -75,6 +99,10 @@ jobs:
|
||||
postgres_version: 16
|
||||
compute_units: '[7, 7]' # we want to test large compute here to avoid compute-side bottleneck
|
||||
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
|
||||
shard_split_project: ${{ matrix.stripe_size != null && 'true' || 'false' }}
|
||||
admin_api_key: ${{ secrets.NEON_STAGING_ADMIN_API_KEY }}
|
||||
shard_count: 8
|
||||
stripe_size: ${{ matrix.stripe_size }}
|
||||
|
||||
- name: Initialize Neon project
|
||||
if: ${{ matrix.target_project == 'new_empty_project' }}
|
||||
|
||||
55
Cargo.lock
generated
55
Cargo.lock
generated
@@ -1605,6 +1605,32 @@ dependencies = [
|
||||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "curve25519-dalek"
|
||||
version = "4.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"cpufeatures",
|
||||
"curve25519-dalek-derive",
|
||||
"digest",
|
||||
"fiat-crypto",
|
||||
"rustc_version",
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "curve25519-dalek-derive"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.90",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "darling"
|
||||
version = "0.20.1"
|
||||
@@ -1875,6 +1901,28 @@ dependencies = [
|
||||
"spki 0.7.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ed25519"
|
||||
version = "2.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53"
|
||||
dependencies = [
|
||||
"signature 2.2.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ed25519-dalek"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871"
|
||||
dependencies = [
|
||||
"curve25519-dalek",
|
||||
"ed25519",
|
||||
"rand_core 0.6.4",
|
||||
"sha2",
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.8.1"
|
||||
@@ -2113,6 +2161,12 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fiat-crypto"
|
||||
version = "0.2.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d"
|
||||
|
||||
[[package]]
|
||||
name = "filetime"
|
||||
version = "0.2.22"
|
||||
@@ -4745,6 +4799,7 @@ dependencies = [
|
||||
"consumption_metrics",
|
||||
"dashmap 5.5.0",
|
||||
"ecdsa 0.16.9",
|
||||
"ed25519-dalek",
|
||||
"env_logger 0.10.2",
|
||||
"fallible-iterator",
|
||||
"flate2",
|
||||
|
||||
@@ -71,6 +71,7 @@ RUN set -e \
|
||||
ca-certificates \
|
||||
# System postgres for use with client libraries (e.g. in storage controller)
|
||||
postgresql-15 \
|
||||
openssl \
|
||||
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
|
||||
&& useradd -d /data neon \
|
||||
&& chown -R neon:neon /data
|
||||
|
||||
3
Makefile
3
Makefile
@@ -3,7 +3,6 @@ ROOT_PROJECT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
|
||||
# Where to install Postgres, default is ./pg_install, maybe useful for package managers
|
||||
POSTGRES_INSTALL_DIR ?= $(ROOT_PROJECT_DIR)/pg_install/
|
||||
|
||||
OPENSSL_PREFIX_DIR := /usr/local/openssl
|
||||
ICU_PREFIX_DIR := /usr/local/icu
|
||||
|
||||
#
|
||||
@@ -26,11 +25,9 @@ endif
|
||||
ifeq ($(shell test -e /home/nonroot/.docker_build && echo -n yes),yes)
|
||||
# Exclude static build openssl, icu for local build (MacOS, Linux)
|
||||
# Only keep for build type release and debug
|
||||
PG_CFLAGS += -I$(OPENSSL_PREFIX_DIR)/include
|
||||
PG_CONFIGURE_OPTS += --with-icu
|
||||
PG_CONFIGURE_OPTS += ICU_CFLAGS='-I/$(ICU_PREFIX_DIR)/include -DU_STATIC_IMPLEMENTATION'
|
||||
PG_CONFIGURE_OPTS += ICU_LIBS='-L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -licui18n -licuuc -licudata -lstdc++ -Wl,-Bdynamic -lm'
|
||||
PG_CONFIGURE_OPTS += LDFLAGS='-L$(OPENSSL_PREFIX_DIR)/lib -L$(OPENSSL_PREFIX_DIR)/lib64 -L$(ICU_PREFIX_DIR)/lib -L$(ICU_PREFIX_DIR)/lib64 -Wl,-Bstatic -lssl -lcrypto -Wl,-Bdynamic -lrt -lm -ldl -lpthread'
|
||||
endif
|
||||
|
||||
UNAME_S := $(shell uname -s)
|
||||
|
||||
@@ -190,21 +190,6 @@ RUN for package in Capture::Tiny DateTime Devel::Cover Digest::MD5 File::Spec JS
|
||||
&& make install \
|
||||
&& rm -rf ../lcov.tar.gz
|
||||
|
||||
# Compile and install the static OpenSSL library
|
||||
ENV OPENSSL_VERSION=1.1.1w
|
||||
ENV OPENSSL_PREFIX=/usr/local/openssl
|
||||
RUN wget -O /tmp/openssl-${OPENSSL_VERSION}.tar.gz https://www.openssl.org/source/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
echo "cf3098950cb4d853ad95c0841f1f9c6d3dc102dccfcacd521d93925208b76ac8 /tmp/openssl-${OPENSSL_VERSION}.tar.gz" | sha256sum --check && \
|
||||
cd /tmp && \
|
||||
tar xzvf /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
rm /tmp/openssl-${OPENSSL_VERSION}.tar.gz && \
|
||||
cd /tmp/openssl-${OPENSSL_VERSION} && \
|
||||
./config --prefix=${OPENSSL_PREFIX} -static --static no-shared -fPIC && \
|
||||
make -j "$(nproc)" && \
|
||||
make install && \
|
||||
cd /tmp && \
|
||||
rm -rf /tmp/openssl-${OPENSSL_VERSION}
|
||||
|
||||
# Use the same version of libicu as the compute nodes so that
|
||||
# clusters created using inidb on pageserver can be used by computes.
|
||||
#
|
||||
|
||||
@@ -170,7 +170,6 @@ RUN case "${PG_VERSION}" in \
|
||||
wget https://download.osgeo.org/postgis/source/postgis-${POSTGIS_VERSION}.tar.gz -O postgis.tar.gz && \
|
||||
echo "${POSTGIS_CHECKSUM} postgis.tar.gz" | sha256sum --check && \
|
||||
mkdir postgis-src && cd postgis-src && tar xzf ../postgis.tar.gz --strip-components=1 -C . && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
|
||||
./autogen.sh && \
|
||||
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
@@ -220,11 +219,7 @@ RUN case "${PG_VERSION}" in \
|
||||
cmake -GNinja -DCMAKE_BUILD_TYPE=Release .. && \
|
||||
ninja -j $(getconf _NPROCESSORS_ONLN) && \
|
||||
ninja -j $(getconf _NPROCESSORS_ONLN) install && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\
|
||||
cp /usr/local/pgsql/share/extension/pgrouting.control /extensions/postgis && \
|
||||
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
|
||||
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/postgis.tar.zst -T -
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -842,13 +837,8 @@ RUN case "${PG_VERSION}" in "v17") \
|
||||
wget https://github.com/neondatabase/postgresql_anonymizer/archive/refs/tags/neon_1.1.1.tar.gz -O pg_anon.tar.gz && \
|
||||
echo "321ea8d5c1648880aafde850a2c576e4a9e7b9933a34ce272efc839328999fa9 pg_anon.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_anon-src && cd pg_anon-src && tar xzf ../pg_anon.tar.gz --strip-components=1 -C . && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /before.txt &&\
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control && \
|
||||
find /usr/local/pgsql -type f | sed 's|^/usr/local/pgsql/||' > /after.txt &&\
|
||||
mkdir -p /extensions/anon && cp /usr/local/pgsql/share/extension/anon.control /extensions/anon && \
|
||||
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
|
||||
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/anon.tar.zst -T -
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/anon.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
@@ -1065,8 +1055,8 @@ ARG PG_VERSION
|
||||
# NOTE: local_proxy depends on the version of pg_session_jwt
|
||||
# Do not update without approve from proxy team
|
||||
# Make sure the version is reflected in proxy/src/serverless/local_conn_pool.rs
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.1.2-v17.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "c8ecbed9cb8c6441bce5134a176002b043018adf9d05a08e457dda233090a86e pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
RUN wget https://github.com/neondatabase/pg_session_jwt/archive/refs/tags/v0.2.0.tar.gz -O pg_session_jwt.tar.gz && \
|
||||
echo "5ace028e591f2e000ca10afa5b1ca62203ebff014c2907c0ec3b29c36f28a1bb pg_session_jwt.tar.gz" | sha256sum --check && \
|
||||
mkdir pg_session_jwt-src && cd pg_session_jwt-src && tar xzf ../pg_session_jwt.tar.gz --strip-components=1 -C . && \
|
||||
sed -i 's/pgrx = "0.12.6"/pgrx = { version = "=0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
|
||||
cargo pgrx install --release
|
||||
|
||||
@@ -483,7 +483,6 @@ impl LocalEnv {
|
||||
.iter()
|
||||
.find(|(mapped_tenant_id, _)| mapped_tenant_id == &tenant_id)
|
||||
.map(|&(_, timeline_id)| timeline_id)
|
||||
.map(TimelineId::from)
|
||||
}
|
||||
|
||||
pub fn timeline_name_mappings(&self) -> HashMap<TenantTimelineId, String> {
|
||||
|
||||
@@ -822,10 +822,7 @@ impl StorageController {
|
||||
self.dispatch(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_shard_id}/migrate"),
|
||||
Some(TenantShardMigrateRequest {
|
||||
tenant_shard_id,
|
||||
node_id,
|
||||
}),
|
||||
Some(TenantShardMigrateRequest { node_id }),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -112,6 +112,13 @@ enum Command {
|
||||
#[arg(long)]
|
||||
node: NodeId,
|
||||
},
|
||||
/// Migrate the secondary location for a tenant shard to a specific pageserver.
|
||||
TenantShardMigrateSecondary {
|
||||
#[arg(long)]
|
||||
tenant_shard_id: TenantShardId,
|
||||
#[arg(long)]
|
||||
node: NodeId,
|
||||
},
|
||||
/// Cancel any ongoing reconciliation for this shard
|
||||
TenantShardCancelReconcile {
|
||||
#[arg(long)]
|
||||
@@ -540,10 +547,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest {
|
||||
tenant_shard_id,
|
||||
node_id: node,
|
||||
};
|
||||
let req = TenantShardMigrateRequest { node_id: node };
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
@@ -553,6 +557,20 @@ async fn main() -> anyhow::Result<()> {
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::TenantShardMigrateSecondary {
|
||||
tenant_shard_id,
|
||||
node,
|
||||
} => {
|
||||
let req = TenantShardMigrateRequest { node_id: node };
|
||||
|
||||
storcon_client
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{tenant_shard_id}/migrate_secondary"),
|
||||
Some(req),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Command::TenantShardCancelReconcile { tenant_shard_id } => {
|
||||
storcon_client
|
||||
.dispatch::<(), ()>(
|
||||
@@ -915,10 +933,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
.dispatch::<TenantShardMigrateRequest, TenantShardMigrateResponse>(
|
||||
Method::PUT,
|
||||
format!("control/v1/tenant/{}/migrate", mv.tenant_shard_id),
|
||||
Some(TenantShardMigrateRequest {
|
||||
tenant_shard_id: mv.tenant_shard_id,
|
||||
node_id: mv.to,
|
||||
}),
|
||||
Some(TenantShardMigrateRequest { node_id: mv.to }),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| (mv.tenant_shard_id, mv.from, mv.to, e))
|
||||
|
||||
@@ -179,7 +179,6 @@ pub struct TenantDescribeResponseShard {
|
||||
/// specifies some constraints, e.g. asking it to get off particular node(s)
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct TenantShardMigrateRequest {
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub node_id: NodeId,
|
||||
}
|
||||
|
||||
|
||||
@@ -706,7 +706,7 @@ pub fn repl_origin_key_range() -> Range<Key> {
|
||||
/// Non inherited range for vectored get.
|
||||
pub const NON_INHERITED_RANGE: Range<Key> = AUX_FILES_KEY..AUX_FILES_KEY.next();
|
||||
/// Sparse keyspace range for vectored get. Missing key error will be ignored for this range.
|
||||
pub const NON_INHERITED_SPARSE_RANGE: Range<Key> = Key::metadata_key_range();
|
||||
pub const SPARSE_RANGE: Range<Key> = Key::metadata_key_range();
|
||||
|
||||
impl Key {
|
||||
// AUX_FILES currently stores only data for logical replication (slots etc), and
|
||||
@@ -714,7 +714,42 @@ impl Key {
|
||||
// switch (and generally it likely should be optional), so ignore these.
|
||||
#[inline(always)]
|
||||
pub fn is_inherited_key(self) -> bool {
|
||||
!NON_INHERITED_RANGE.contains(&self) && !NON_INHERITED_SPARSE_RANGE.contains(&self)
|
||||
if self.is_sparse() {
|
||||
self.is_inherited_sparse_key()
|
||||
} else {
|
||||
!NON_INHERITED_RANGE.contains(&self)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn is_sparse(self) -> bool {
|
||||
self.field1 >= METADATA_KEY_BEGIN_PREFIX && self.field1 < METADATA_KEY_END_PREFIX
|
||||
}
|
||||
|
||||
/// Check if the key belongs to the inherited keyspace.
|
||||
fn is_inherited_sparse_key(self) -> bool {
|
||||
debug_assert!(self.is_sparse());
|
||||
self.field1 == RELATION_SIZE_PREFIX
|
||||
}
|
||||
|
||||
pub fn sparse_non_inherited_keyspace() -> Range<Key> {
|
||||
// The two keys are adjacent; if we will have non-adjancent keys in the future, we should return a keyspace
|
||||
debug_assert_eq!(AUX_KEY_PREFIX + 1, REPL_ORIGIN_KEY_PREFIX);
|
||||
Key {
|
||||
field1: AUX_KEY_PREFIX,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}..Key {
|
||||
field1: REPL_ORIGIN_KEY_PREFIX + 1,
|
||||
field2: 0,
|
||||
field3: 0,
|
||||
field4: 0,
|
||||
field5: 0,
|
||||
field6: 0,
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
|
||||
@@ -44,7 +44,7 @@ pub struct ProtocolVersion(u32);
|
||||
|
||||
impl ProtocolVersion {
|
||||
pub const fn new(major: u16, minor: u16) -> Self {
|
||||
Self((major as u32) << 16 | minor as u32)
|
||||
Self(((major as u32) << 16) | minor as u32)
|
||||
}
|
||||
pub const fn minor(self) -> u16 {
|
||||
self.0 as u16
|
||||
|
||||
@@ -112,9 +112,9 @@ impl Serialize for Generation {
|
||||
// We should never be asked to serialize a None. Structures
|
||||
// that include an optional generation should convert None to an
|
||||
// Option<Generation>::None
|
||||
Err(serde::ser::Error::custom(
|
||||
"Tried to serialize invalid generation ({self})",
|
||||
))
|
||||
Err(serde::ser::Error::custom(format!(
|
||||
"Tried to serialize invalid generation ({self:?})"
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use routerify::ext::RequestExt;
|
||||
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tokio::sync::{mpsc, Mutex, Notify};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::{debug, info, info_span, warn, Instrument};
|
||||
@@ -350,33 +350,53 @@ pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, A
|
||||
};
|
||||
let seconds = match parse_query_param(&req, "seconds")? {
|
||||
None => 5,
|
||||
Some(seconds @ 1..=30) => seconds,
|
||||
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-30 secs"))),
|
||||
Some(seconds @ 1..=60) => seconds,
|
||||
Some(_) => return Err(ApiError::BadRequest(anyhow!("duration must be 1-60 secs"))),
|
||||
};
|
||||
let frequency_hz = match parse_query_param(&req, "frequency")? {
|
||||
None => 99,
|
||||
Some(1001..) => return Err(ApiError::BadRequest(anyhow!("frequency must be <=1000 Hz"))),
|
||||
Some(frequency) => frequency,
|
||||
};
|
||||
|
||||
// Only allow one profiler at a time.
|
||||
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
|
||||
let _lock = PROFILE_LOCK
|
||||
.try_lock()
|
||||
.map_err(|_| ApiError::Conflict("profiler already running".into()))?;
|
||||
let force: bool = parse_query_param(&req, "force")?.unwrap_or_default();
|
||||
|
||||
// Take the profile.
|
||||
let report = tokio::task::spawn_blocking(move || {
|
||||
static PROFILE_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
|
||||
static PROFILE_CANCEL: Lazy<Notify> = Lazy::new(Notify::new);
|
||||
|
||||
let report = {
|
||||
// Only allow one profiler at a time. If force is true, cancel a running profile (e.g. a
|
||||
// Grafana continuous profile). We use a try_lock() loop when cancelling instead of waiting
|
||||
// for a lock(), to avoid races where the notify isn't currently awaited.
|
||||
let _lock = loop {
|
||||
match PROFILE_LOCK.try_lock() {
|
||||
Ok(lock) => break lock,
|
||||
Err(_) if force => PROFILE_CANCEL.notify_waiters(),
|
||||
Err(_) => {
|
||||
return Err(ApiError::Conflict(
|
||||
"profiler already running (use ?force=true to cancel it)".into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(1)).await; // don't busy-wait
|
||||
};
|
||||
|
||||
let guard = ProfilerGuardBuilder::default()
|
||||
.frequency(frequency_hz)
|
||||
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
|
||||
.build()?;
|
||||
std::thread::sleep(Duration::from_secs(seconds));
|
||||
guard.report().build()
|
||||
})
|
||||
.await
|
||||
.map_err(|join_err| ApiError::InternalServerError(join_err.into()))?
|
||||
.map_err(|pprof_err| ApiError::InternalServerError(pprof_err.into()))?;
|
||||
.build()
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?;
|
||||
|
||||
tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_secs(seconds)) => {},
|
||||
_ = PROFILE_CANCEL.notified() => {},
|
||||
};
|
||||
|
||||
guard
|
||||
.report()
|
||||
.build()
|
||||
.map_err(|err| ApiError::InternalServerError(err.into()))?
|
||||
};
|
||||
|
||||
// Return the report in the requested format.
|
||||
match format {
|
||||
|
||||
@@ -260,7 +260,7 @@ impl FromStr for Lsn {
|
||||
{
|
||||
let left_num = u32::from_str_radix(left, 16).map_err(|_| LsnParseError)?;
|
||||
let right_num = u32::from_str_radix(right, 16).map_err(|_| LsnParseError)?;
|
||||
Ok(Lsn((left_num as u64) << 32 | right_num as u64))
|
||||
Ok(Lsn(((left_num as u64) << 32) | right_num as u64))
|
||||
} else {
|
||||
Err(LsnParseError)
|
||||
}
|
||||
|
||||
@@ -5682,7 +5682,7 @@ mod tests {
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use hex_literal::hex;
|
||||
use itertools::Itertools;
|
||||
use pageserver_api::key::{Key, AUX_KEY_PREFIX, NON_INHERITED_RANGE};
|
||||
use pageserver_api::key::{Key, AUX_KEY_PREFIX, NON_INHERITED_RANGE, RELATION_SIZE_PREFIX};
|
||||
use pageserver_api::keyspace::KeySpace;
|
||||
use pageserver_api::models::{CompactionAlgorithm, CompactionAlgorithmSettings};
|
||||
use pageserver_api::value::Value;
|
||||
@@ -7741,7 +7741,18 @@ mod tests {
|
||||
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
let base_key_child = Key::from_hex("620000000033333333444444445500000001").unwrap();
|
||||
let base_key_nonexist = Key::from_hex("620000000033333333444444445500000002").unwrap();
|
||||
let base_key_overwrite = Key::from_hex("620000000033333333444444445500000003").unwrap();
|
||||
|
||||
let base_inherited_key = Key::from_hex("610000000033333333444444445500000000").unwrap();
|
||||
let base_inherited_key_child =
|
||||
Key::from_hex("610000000033333333444444445500000001").unwrap();
|
||||
let base_inherited_key_nonexist =
|
||||
Key::from_hex("610000000033333333444444445500000002").unwrap();
|
||||
let base_inherited_key_overwrite =
|
||||
Key::from_hex("610000000033333333444444445500000003").unwrap();
|
||||
|
||||
assert_eq!(base_key.field1, AUX_KEY_PREFIX); // in case someone accidentally changed the prefix...
|
||||
assert_eq!(base_inherited_key.field1, RELATION_SIZE_PREFIX);
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
@@ -7750,7 +7761,18 @@ mod tests {
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // delta layers
|
||||
vec![(Lsn(0x20), vec![(base_key, test_img("metadata key 1"))])], // image layers
|
||||
vec![(
|
||||
Lsn(0x20),
|
||||
vec![
|
||||
(base_inherited_key, test_img("metadata inherited key 1")),
|
||||
(
|
||||
base_inherited_key_overwrite,
|
||||
test_img("metadata key overwrite 1a"),
|
||||
),
|
||||
(base_key, test_img("metadata key 1")),
|
||||
(base_key_overwrite, test_img("metadata key overwrite 1b")),
|
||||
],
|
||||
)], // image layers
|
||||
Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN
|
||||
)
|
||||
.await?;
|
||||
@@ -7764,7 +7786,18 @@ mod tests {
|
||||
Vec::new(), // delta layers
|
||||
vec![(
|
||||
Lsn(0x30),
|
||||
vec![(base_key_child, test_img("metadata key 2"))],
|
||||
vec![
|
||||
(
|
||||
base_inherited_key_child,
|
||||
test_img("metadata inherited key 2"),
|
||||
),
|
||||
(
|
||||
base_inherited_key_overwrite,
|
||||
test_img("metadata key overwrite 2a"),
|
||||
),
|
||||
(base_key_child, test_img("metadata key 2")),
|
||||
(base_key_overwrite, test_img("metadata key overwrite 2b")),
|
||||
],
|
||||
)], // image layers
|
||||
Lsn(0x30),
|
||||
)
|
||||
@@ -7786,6 +7819,26 @@ mod tests {
|
||||
get_vectored_impl_wrapper(&tline, base_key_nonexist, lsn, &ctx).await?,
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, base_key_overwrite, lsn, &ctx).await?,
|
||||
Some(test_img("metadata key overwrite 1b"))
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, base_inherited_key, lsn, &ctx).await?,
|
||||
Some(test_img("metadata inherited key 1"))
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, base_inherited_key_child, lsn, &ctx).await?,
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, base_inherited_key_nonexist, lsn, &ctx).await?,
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, base_inherited_key_overwrite, lsn, &ctx).await?,
|
||||
Some(test_img("metadata key overwrite 1a"))
|
||||
);
|
||||
|
||||
// test vectored get on child timeline
|
||||
assert_eq!(
|
||||
@@ -7800,6 +7853,82 @@ mod tests {
|
||||
get_vectored_impl_wrapper(&child, base_key_nonexist, lsn, &ctx).await?,
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&child, base_inherited_key, lsn, &ctx).await?,
|
||||
Some(test_img("metadata inherited key 1"))
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&child, base_inherited_key_child, lsn, &ctx).await?,
|
||||
Some(test_img("metadata inherited key 2"))
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&child, base_inherited_key_nonexist, lsn, &ctx).await?,
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&child, base_key_overwrite, lsn, &ctx).await?,
|
||||
Some(test_img("metadata key overwrite 2b"))
|
||||
);
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&child, base_inherited_key_overwrite, lsn, &ctx).await?,
|
||||
Some(test_img("metadata key overwrite 2a"))
|
||||
);
|
||||
|
||||
// test vectored scan on parent timeline
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let res = tline
|
||||
.get_vectored_impl(
|
||||
KeySpace::single(Key::metadata_key_range()),
|
||||
lsn,
|
||||
&mut reconstruct_state,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
res.into_iter()
|
||||
.map(|(k, v)| (k, v.unwrap()))
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
(base_inherited_key, test_img("metadata inherited key 1")),
|
||||
(
|
||||
base_inherited_key_overwrite,
|
||||
test_img("metadata key overwrite 1a")
|
||||
),
|
||||
(base_key, test_img("metadata key 1")),
|
||||
(base_key_overwrite, test_img("metadata key overwrite 1b")),
|
||||
]
|
||||
);
|
||||
|
||||
// test vectored scan on child timeline
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
let res = child
|
||||
.get_vectored_impl(
|
||||
KeySpace::single(Key::metadata_key_range()),
|
||||
lsn,
|
||||
&mut reconstruct_state,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
res.into_iter()
|
||||
.map(|(k, v)| (k, v.unwrap()))
|
||||
.collect::<Vec<_>>(),
|
||||
vec![
|
||||
(base_inherited_key, test_img("metadata inherited key 1")),
|
||||
(
|
||||
base_inherited_key_child,
|
||||
test_img("metadata inherited key 2")
|
||||
),
|
||||
(
|
||||
base_inherited_key_overwrite,
|
||||
test_img("metadata key overwrite 2a")
|
||||
),
|
||||
(base_key_child, test_img("metadata key 2")),
|
||||
(base_key_overwrite, test_img("metadata key overwrite 2b")),
|
||||
]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
pub(crate) use pageserver_api::config::TenantConfigToml as TenantConf;
|
||||
use pageserver_api::models::CompactionAlgorithmSettings;
|
||||
use pageserver_api::models::EvictionPolicy;
|
||||
use pageserver_api::models::{self, TenantConfigPatch, ThrottleConfig};
|
||||
use pageserver_api::models::{self, TenantConfigPatch};
|
||||
use pageserver_api::shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize};
|
||||
use serde::de::IntoDeserializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -597,7 +597,7 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
.map(humantime),
|
||||
heatmap_period: value.heatmap_period.map(humantime),
|
||||
lazy_slru_download: value.lazy_slru_download,
|
||||
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
|
||||
timeline_get_throttle: value.timeline_get_throttle,
|
||||
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
|
||||
lsn_lease_length: value.lsn_lease_length.map(humantime),
|
||||
lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime),
|
||||
|
||||
@@ -84,17 +84,17 @@ impl Value {
|
||||
|
||||
fn to_u64(self) -> u64 {
|
||||
let b = &self.0;
|
||||
(b[0] as u64) << 32
|
||||
| (b[1] as u64) << 24
|
||||
| (b[2] as u64) << 16
|
||||
| (b[3] as u64) << 8
|
||||
((b[0] as u64) << 32)
|
||||
| ((b[1] as u64) << 24)
|
||||
| ((b[2] as u64) << 16)
|
||||
| ((b[3] as u64) << 8)
|
||||
| b[4] as u64
|
||||
}
|
||||
|
||||
fn to_blknum(self) -> u32 {
|
||||
let b = &self.0;
|
||||
assert!(b[0] == 0x80);
|
||||
(b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
|
||||
((b[1] as u32) << 24) | ((b[2] as u32) << 16) | ((b[3] as u32) << 8) | b[4] as u32
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ pub mod merge_iterator;
|
||||
|
||||
use crate::context::{AccessStatsBehavior, RequestContext};
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::{Key, NON_INHERITED_SPARSE_RANGE};
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
use pageserver_api::value::Value;
|
||||
@@ -209,7 +209,7 @@ impl ValuesReconstructState {
|
||||
.keys
|
||||
.entry(*key)
|
||||
.or_insert(Ok(VectoredValueReconstructState::default()));
|
||||
let is_sparse_key = NON_INHERITED_SPARSE_RANGE.contains(key);
|
||||
let is_sparse_key = key.is_sparse();
|
||||
if let Ok(state) = state {
|
||||
let key_done = match state.situation {
|
||||
ValueReconstructSituation::Complete => {
|
||||
|
||||
@@ -112,8 +112,8 @@ const MAX_SUPPORTED_BLOB_LEN_BITS: usize = {
|
||||
///
|
||||
/// Layout:
|
||||
/// - 1 bit: `will_init`
|
||||
/// - [`MAX_SUPPORTED_BLOB_LEN_BITS`]: `len`
|
||||
/// - [`MAX_SUPPORTED_POS_BITS`]: `pos`
|
||||
/// - [`MAX_SUPPORTED_BLOB_LEN_BITS`][]: `len`
|
||||
/// - [`MAX_SUPPORTED_POS_BITS`](IndexEntry::MAX_SUPPORTED_POS_BITS): `pos`
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct IndexEntry(u64);
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ use pageserver_api::{
|
||||
config::tenant_conf_defaults::DEFAULT_COMPACTION_THRESHOLD,
|
||||
key::{
|
||||
KEY_SIZE, METADATA_KEY_BEGIN_PREFIX, METADATA_KEY_END_PREFIX, NON_INHERITED_RANGE,
|
||||
NON_INHERITED_SPARSE_RANGE,
|
||||
SPARSE_RANGE,
|
||||
},
|
||||
keyspace::{KeySpaceAccum, KeySpaceRandomAccum, SparseKeyPartitioning},
|
||||
models::{
|
||||
@@ -3221,7 +3221,7 @@ impl Timeline {
|
||||
// We don't return a blanket [`GetVectoredError::MissingKey`] to avoid
|
||||
// stalling compaction.
|
||||
keyspace.remove_overlapping_with(&KeySpace {
|
||||
ranges: vec![NON_INHERITED_RANGE, NON_INHERITED_SPARSE_RANGE],
|
||||
ranges: vec![NON_INHERITED_RANGE, Key::sparse_non_inherited_keyspace()],
|
||||
});
|
||||
|
||||
// Keyspace is fully retrieved
|
||||
@@ -3242,7 +3242,11 @@ impl Timeline {
|
||||
// keys from `keyspace`, we expect there to be no overlap between it and the image covered key
|
||||
// space. If that's not the case, we had at least one key encounter a gap in the image layer
|
||||
// and stop the search as a result of that.
|
||||
let removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
|
||||
let mut removed = keyspace.remove_overlapping_with(&image_covered_keyspace);
|
||||
// Do not fire missing key error for sparse keys.
|
||||
removed.remove_overlapping_with(&KeySpace {
|
||||
ranges: vec![SPARSE_RANGE],
|
||||
});
|
||||
if !removed.is_empty() {
|
||||
break Some(removed);
|
||||
}
|
||||
@@ -3257,6 +3261,21 @@ impl Timeline {
|
||||
timeline = &*timeline_owned;
|
||||
};
|
||||
|
||||
// Remove sparse keys from the keyspace so that it doesn't fire errors.
|
||||
let missing_keyspace = if let Some(missing_keyspace) = missing_keyspace {
|
||||
let mut missing_keyspace = missing_keyspace;
|
||||
missing_keyspace.remove_overlapping_with(&KeySpace {
|
||||
ranges: vec![SPARSE_RANGE],
|
||||
});
|
||||
if missing_keyspace.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(missing_keyspace)
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(missing_keyspace) = missing_keyspace {
|
||||
return Err(GetVectoredError::MissingKey(MissingKeyError {
|
||||
key: missing_keyspace.start().unwrap(), /* better if we can store the full keyspace */
|
||||
|
||||
@@ -403,7 +403,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// need to advance last record LSN on all shards. If we've not ingested the latest
|
||||
// record, then set the LSN of the modification past it. This way all shards
|
||||
// advance their last record LSN at the same time.
|
||||
let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) {
|
||||
let needs_last_record_lsn_advance = match next_record_lsn {
|
||||
Some(lsn) if lsn > modification.get_lsn() => {
|
||||
modification.set_lsn(lsn).unwrap();
|
||||
true
|
||||
|
||||
@@ -308,7 +308,7 @@ impl WalIngest {
|
||||
epoch -= 1;
|
||||
}
|
||||
|
||||
Ok((epoch as u64) << 32 | xid as u64)
|
||||
Ok(((epoch as u64) << 32) | xid as u64)
|
||||
}
|
||||
|
||||
async fn ingest_clear_vm_bits(
|
||||
|
||||
@@ -106,6 +106,7 @@ jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
|
||||
signature = "2"
|
||||
ecdsa = "0.16"
|
||||
p256 = { version = "0.13", features = ["jwk"] }
|
||||
ed25519-dalek = { version = "2", default-features = false, features = ["rand_core"] }
|
||||
rsa = "0.9"
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -3,9 +3,9 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use ed25519_dalek::SigningKey;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
|
||||
use p256::ecdsa::SigningKey;
|
||||
use p256::elliptic_curve::JwkEcKey;
|
||||
use jose_jwk::jose_b64;
|
||||
use rand::rngs::OsRng;
|
||||
use tokio::net::{lookup_host, TcpStream};
|
||||
use tracing::field::display;
|
||||
@@ -354,9 +354,15 @@ impl PoolingBackend {
|
||||
}
|
||||
}
|
||||
|
||||
fn create_random_jwk() -> (SigningKey, JwkEcKey) {
|
||||
let key = SigningKey::random(&mut OsRng);
|
||||
let jwk = p256::PublicKey::from(key.verifying_key()).to_jwk();
|
||||
fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
|
||||
let key = SigningKey::generate(&mut OsRng);
|
||||
|
||||
let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
|
||||
crv: jose_jwk::OkpCurves::Ed25519,
|
||||
x: jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
|
||||
d: None,
|
||||
});
|
||||
|
||||
(key, jwk)
|
||||
}
|
||||
|
||||
|
||||
@@ -16,17 +16,16 @@ use std::sync::Arc;
|
||||
use std::task::{ready, Poll};
|
||||
use std::time::Duration;
|
||||
|
||||
use ed25519_dalek::{Signature, Signer, SigningKey};
|
||||
use futures::future::poll_fn;
|
||||
use futures::Future;
|
||||
use indexmap::IndexMap;
|
||||
use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding};
|
||||
use p256::ecdsa::{Signature, SigningKey};
|
||||
use parking_lot::RwLock;
|
||||
use postgres_client::tls::NoTlsStream;
|
||||
use postgres_client::types::ToSql;
|
||||
use postgres_client::AsyncMessage;
|
||||
use serde_json::value::RawValue;
|
||||
use signature::Signer;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -42,7 +41,7 @@ use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
|
||||
use crate::metrics::Metrics;
|
||||
|
||||
pub(crate) const EXT_NAME: &str = "pg_session_jwt";
|
||||
pub(crate) const EXT_VERSION: &str = "0.1.2";
|
||||
pub(crate) const EXT_VERSION: &str = "0.2.0";
|
||||
pub(crate) const EXT_SCHEMA: &str = "auth";
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -339,8 +338,8 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
|
||||
let cap = jwt.capacity();
|
||||
|
||||
// we only need an empty header with the alg specified.
|
||||
// base64url(r#"{"alg":"ES256"}"#) == "eyJhbGciOiJFUzI1NiJ9"
|
||||
jwt.push_str("eyJhbGciOiJFUzI1NiJ9.");
|
||||
// base64url(r#"{"alg":"EdDSA"}"#) == "eyJhbGciOiJFZERTQSJ9"
|
||||
jwt.push_str("eyJhbGciOiJFZERTQSJ9.");
|
||||
|
||||
// encode the jwt payload in-place
|
||||
base64::encode_config_buf(payload, base64::URL_SAFE_NO_PAD, &mut jwt);
|
||||
@@ -366,14 +365,14 @@ fn sign_jwt(sk: &SigningKey, payload: &[u8]) -> String {
|
||||
#[cfg(test)]
|
||||
#[expect(clippy::unwrap_used)]
|
||||
mod tests {
|
||||
use p256::ecdsa::SigningKey;
|
||||
use ed25519_dalek::SigningKey;
|
||||
use typed_json::json;
|
||||
|
||||
use super::resign_jwt;
|
||||
|
||||
#[test]
|
||||
fn jwt_token_snapshot() {
|
||||
let key = SigningKey::from_bytes(&[1; 32].into()).unwrap();
|
||||
let key = SigningKey::from_bytes(&[1; 32]);
|
||||
let data =
|
||||
json!({"foo":"bar","jti":"foo\nbar","nested":{"jti":"tricky nesting"}}).to_string();
|
||||
|
||||
@@ -381,12 +380,17 @@ mod tests {
|
||||
|
||||
// To validate the JWT, copy the JWT string and paste it into https://jwt.io/.
|
||||
// In the public-key box, paste the following jwk public key
|
||||
// `{"kty":"EC","crv":"P-256","x":"b_A7lJJBzh2t1DUZ5pYOCoW0GmmgXDKBA6orzhWUyhY","y":"PE91OlW_AdxT9sCwx-7ni0DG_30lqW4igrmJzvccFEo"}`
|
||||
// `{"kty":"OKP","crv":"Ed25519","x":"iojj3XQJ8ZX9UtstPLpdcspnCb8dlBIb83SIAbQPb1w"}`
|
||||
// Note - jwt.io doesn't support EdDSA :(
|
||||
// https://github.com/jsonwebtoken/jsonwebtoken.github.io/issues/509
|
||||
|
||||
// let pub_key = p256::ecdsa::VerifyingKey::from(&key);
|
||||
// let pub_key = p256::PublicKey::from(pub_key);
|
||||
// println!("{}", pub_key.to_jwk_string());
|
||||
// let jwk = jose_jwk::Key::Okp(jose_jwk::Okp {
|
||||
// crv: jose_jwk::OkpCurves::Ed25519,
|
||||
// x: jose_jwk::jose_b64::serde::Bytes::from(key.verifying_key().to_bytes().to_vec()),
|
||||
// d: None,
|
||||
// });
|
||||
// println!("{}", serde_json::to_string(&jwk).unwrap());
|
||||
|
||||
assert_eq!(jwt, "eyJhbGciOiJFUzI1NiJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.pYf0LxoJ8sDgpmsYOgrbNecOSipnPBEGwnZzB-JhW2cONrKlqRsgXwK8_cOsyolGy-hTTe8GXbWTl_UdpF5RyA");
|
||||
assert_eq!(jwt, "eyJhbGciOiJFZERTQSJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.Cvyc2By33KI0f0obystwdy8PN111L3Sc9_Mr2CU3XshtSqSdxuRxNEZGbb_RvyJf2IzheC_s7aBZ-jLeQ9N0Bg");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,7 +124,10 @@ impl ComputeHookTenant {
|
||||
if let Some(shard_idx) = shard_idx {
|
||||
sharded.shards.remove(shard_idx);
|
||||
} else {
|
||||
tracing::warn!("Shard not found while handling detach")
|
||||
// This is a valid but niche case, where the tenant was previously attached
|
||||
// as a Secondary location and then detached, so has no previously notified
|
||||
// state.
|
||||
tracing::info!("Shard not found while handling detach")
|
||||
}
|
||||
}
|
||||
ComputeHookTenant::Unsharded(_) => {
|
||||
@@ -761,7 +764,10 @@ impl ComputeHook {
|
||||
let mut state_locked = self.state.lock().unwrap();
|
||||
match state_locked.entry(tenant_shard_id.tenant_id) {
|
||||
Entry::Vacant(_) => {
|
||||
tracing::warn!("Compute hook tenant not found for detach");
|
||||
// This is a valid but niche case, where the tenant was previously attached
|
||||
// as a Secondary location and then detached, so has no previously notified
|
||||
// state.
|
||||
tracing::info!("Compute hook tenant not found for detach");
|
||||
}
|
||||
Entry::Occupied(mut e) => {
|
||||
let sharded = e.get().is_sharded();
|
||||
|
||||
@@ -690,7 +690,8 @@ async fn handle_node_list(req: Request<Body>) -> Result<Response<Body>, ApiError
|
||||
};
|
||||
|
||||
let state = get_state(&req);
|
||||
let nodes = state.service.node_list().await?;
|
||||
let mut nodes = state.service.node_list().await?;
|
||||
nodes.sort_by_key(|n| n.get_id());
|
||||
let api_nodes = nodes.into_iter().map(|n| n.describe()).collect::<Vec<_>>();
|
||||
|
||||
json_response(StatusCode::OK, api_nodes)
|
||||
@@ -1005,6 +1006,29 @@ async fn handle_tenant_shard_migrate(
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_shard_migrate_secondary(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
check_permissions(&req, Scope::Admin)?;
|
||||
|
||||
let mut req = match maybe_forward(req).await {
|
||||
ForwardOutcome::Forwarded(res) => {
|
||||
return res;
|
||||
}
|
||||
ForwardOutcome::NotForwarded(req) => req,
|
||||
};
|
||||
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&req, "tenant_shard_id")?;
|
||||
let migrate_req = json_request::<TenantShardMigrateRequest>(&mut req).await?;
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
service
|
||||
.tenant_shard_migrate_secondary(tenant_shard_id, migrate_req)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle_tenant_shard_cancel_reconcile(
|
||||
service: Arc<Service>,
|
||||
req: Request<Body>,
|
||||
@@ -1855,6 +1879,16 @@ pub fn make_router(
|
||||
RequestName("control_v1_tenant_migrate"),
|
||||
)
|
||||
})
|
||||
.put(
|
||||
"/control/v1/tenant/:tenant_shard_id/migrate_secondary",
|
||||
|r| {
|
||||
tenant_service_handler(
|
||||
r,
|
||||
handle_tenant_shard_migrate_secondary,
|
||||
RequestName("control_v1_tenant_migrate_secondary"),
|
||||
)
|
||||
},
|
||||
)
|
||||
.put(
|
||||
"/control/v1/tenant/:tenant_shard_id/cancel_reconcile",
|
||||
|r| {
|
||||
|
||||
@@ -696,6 +696,11 @@ impl Reconciler {
|
||||
/// First we apply special case handling (e.g. for live migrations), and then a
|
||||
/// general case reconciliation where we walk through the intent by pageserver
|
||||
/// and call out to the pageserver to apply the desired state.
|
||||
///
|
||||
/// An Ok(()) result indicates that we successfully attached the tenant, but _not_ that
|
||||
/// all locations for the tenant are in the expected state. When nodes that are to be detached
|
||||
/// or configured as secondary are unavailable, we may return Ok(()) but leave the shard in a
|
||||
/// state where it still requires later reconciliation.
|
||||
pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
|
||||
// Prepare: if we have uncertain `observed` state for our would-be attachement location, then refresh it
|
||||
self.maybe_refresh_observed().await?;
|
||||
@@ -784,10 +789,18 @@ impl Reconciler {
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
|
||||
}
|
||||
_ => {
|
||||
// In all cases other than a matching observed configuration, we will
|
||||
// reconcile this location.
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
|
||||
changes.push((node.clone(), wanted_conf))
|
||||
// Only try and configure secondary locations on nodes that are available. This
|
||||
// allows the reconciler to "succeed" while some secondaries are offline (e.g. after
|
||||
// a node failure, where the failed node will have a secondary intent)
|
||||
if node.is_available() {
|
||||
tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
|
||||
changes.push((node.clone(), wanted_conf))
|
||||
} else {
|
||||
tracing::info!(node_id=%node.get_id(), "Skipping configuration as secondary, node is unavailable");
|
||||
self.observed
|
||||
.locations
|
||||
.insert(node.get_id(), ObservedStateLocation { conf: None });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5055,6 +5055,69 @@ impl Service {
|
||||
Ok(TenantShardMigrateResponse {})
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_shard_migrate_secondary(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
migrate_req: TenantShardMigrateRequest,
|
||||
) -> Result<TenantShardMigrateResponse, ApiError> {
|
||||
let waiter = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let (nodes, tenants, scheduler) = locked.parts_mut();
|
||||
|
||||
let Some(node) = nodes.get(&migrate_req.node_id) else {
|
||||
return Err(ApiError::BadRequest(anyhow::anyhow!(
|
||||
"Node {} not found",
|
||||
migrate_req.node_id
|
||||
)));
|
||||
};
|
||||
|
||||
if !node.is_available() {
|
||||
// Warn but proceed: the caller may intend to manually adjust the placement of
|
||||
// a shard even if the node is down, e.g. if intervening during an incident.
|
||||
tracing::warn!("Migrating to unavailable node {node}");
|
||||
}
|
||||
|
||||
let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
|
||||
return Err(ApiError::NotFound(
|
||||
anyhow::anyhow!("Tenant shard not found").into(),
|
||||
));
|
||||
};
|
||||
|
||||
if shard.intent.get_secondary().len() == 1
|
||||
&& shard.intent.get_secondary()[0] == migrate_req.node_id
|
||||
{
|
||||
tracing::info!(
|
||||
"Migrating secondary to {node}: intent is unchanged {:?}",
|
||||
shard.intent
|
||||
);
|
||||
} else if shard.intent.get_attached() == &Some(migrate_req.node_id) {
|
||||
tracing::info!("Migrating secondary to {node}: already attached where we were asked to create a secondary");
|
||||
} else {
|
||||
let old_secondaries = shard.intent.get_secondary().clone();
|
||||
for secondary in old_secondaries {
|
||||
shard.intent.remove_secondary(scheduler, secondary);
|
||||
}
|
||||
|
||||
shard.intent.push_secondary(scheduler, migrate_req.node_id);
|
||||
shard.sequence = shard.sequence.next();
|
||||
tracing::info!(
|
||||
"Migrating secondary to {node}: new intent {:?}",
|
||||
shard.intent
|
||||
);
|
||||
}
|
||||
|
||||
self.maybe_reconcile_shard(shard, nodes)
|
||||
};
|
||||
|
||||
if let Some(waiter) = waiter {
|
||||
waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
|
||||
} else {
|
||||
tracing::info!("Migration is a no-op");
|
||||
}
|
||||
|
||||
Ok(TenantShardMigrateResponse {})
|
||||
}
|
||||
|
||||
/// 'cancel' in this context means cancel any ongoing reconcile
|
||||
pub(crate) async fn tenant_shard_cancel_reconcile(
|
||||
&self,
|
||||
@@ -5256,7 +5319,8 @@ impl Service {
|
||||
expect_nodes.sort_by_key(|n| n.node_id);
|
||||
nodes.sort_by_key(|n| n.node_id);
|
||||
|
||||
if nodes != expect_nodes {
|
||||
// Errors relating to nodes are deferred so that we don't skip the shard checks below if we have a node error
|
||||
let node_result = if nodes != expect_nodes {
|
||||
tracing::error!("Consistency check failed on nodes.");
|
||||
tracing::error!(
|
||||
"Nodes in memory: {}",
|
||||
@@ -5268,10 +5332,12 @@ impl Service {
|
||||
serde_json::to_string(&nodes)
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?
|
||||
);
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Node consistency failure"
|
||||
)));
|
||||
}
|
||||
)))
|
||||
} else {
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let mut persistent_shards = self.persistence.load_active_tenant_shards().await?;
|
||||
persistent_shards
|
||||
@@ -5281,6 +5347,7 @@ impl Service {
|
||||
|
||||
if persistent_shards != expect_shards {
|
||||
tracing::error!("Consistency check failed on shards.");
|
||||
|
||||
tracing::error!(
|
||||
"Shards in memory: {}",
|
||||
serde_json::to_string(&expect_shards)
|
||||
@@ -5291,12 +5358,57 @@ impl Service {
|
||||
serde_json::to_string(&persistent_shards)
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?
|
||||
);
|
||||
|
||||
// The total dump log lines above are useful in testing but in the field grafana will
|
||||
// usually just drop them because they're so large. So we also do some explicit logging
|
||||
// of just the diffs.
|
||||
let persistent_shards = persistent_shards
|
||||
.into_iter()
|
||||
.map(|tsp| (tsp.get_tenant_shard_id().unwrap(), tsp))
|
||||
.collect::<HashMap<_, _>>();
|
||||
let expect_shards = expect_shards
|
||||
.into_iter()
|
||||
.map(|tsp| (tsp.get_tenant_shard_id().unwrap(), tsp))
|
||||
.collect::<HashMap<_, _>>();
|
||||
for (tenant_shard_id, persistent_tsp) in &persistent_shards {
|
||||
match expect_shards.get(tenant_shard_id) {
|
||||
None => {
|
||||
tracing::error!(
|
||||
"Shard {} found in database but not in memory",
|
||||
tenant_shard_id
|
||||
);
|
||||
}
|
||||
Some(expect_tsp) => {
|
||||
if expect_tsp != persistent_tsp {
|
||||
tracing::error!(
|
||||
"Shard {} is inconsistent. In memory: {}, database has: {}",
|
||||
tenant_shard_id,
|
||||
serde_json::to_string(expect_tsp).unwrap(),
|
||||
serde_json::to_string(&persistent_tsp).unwrap()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Having already logged any differences, log any shards that simply aren't present in the database
|
||||
for (tenant_shard_id, memory_tsp) in &expect_shards {
|
||||
if !persistent_shards.contains_key(tenant_shard_id) {
|
||||
tracing::error!(
|
||||
"Shard {} found in memory but not in database: {}",
|
||||
tenant_shard_id,
|
||||
serde_json::to_string(memory_tsp)
|
||||
.map_err(|e| ApiError::InternalServerError(e.into()))?
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Shard consistency failure"
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
node_result
|
||||
}
|
||||
|
||||
/// For debug/support: a JSON dump of the [`Scheduler`]. Returns a response so that
|
||||
|
||||
@@ -1122,10 +1122,15 @@ impl TenantShard {
|
||||
let result = reconciler.reconcile().await;
|
||||
|
||||
// If we know we had a pending compute notification from some previous action, send a notification irrespective
|
||||
// of whether the above reconcile() did any work
|
||||
// of whether the above reconcile() did any work. It has to be Ok() though, because otherwise we might be
|
||||
// sending a notification of a location that isn't really attached.
|
||||
if result.is_ok() && must_notify {
|
||||
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
|
||||
reconciler.compute_notify().await.ok();
|
||||
} else if must_notify {
|
||||
// Carry this flag so that the reconciler's result will indicate that it still needs to retry
|
||||
// the compute hook notification eventually.
|
||||
reconciler.compute_notify_failure = true;
|
||||
}
|
||||
|
||||
// Update result counter
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import ssl
|
||||
|
||||
import asyncpg
|
||||
import pytest
|
||||
import websocket_tunnel
|
||||
import websockets
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonProxy
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -196,3 +201,53 @@ async def test_websockets_pipelined(static_proxy: NeonProxy):
|
||||
# close
|
||||
await websocket.send(b"X\x00\x00\x00\x04")
|
||||
await websocket.wait_closed()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_websockets_tunneled(static_proxy: NeonProxy, port_distributor: PortDistributor):
|
||||
static_proxy.safe_psql("create user ws_auth with password 'ws' superuser")
|
||||
|
||||
user = "ws_auth"
|
||||
password = "ws"
|
||||
|
||||
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
||||
ssl_context.load_verify_locations(str(static_proxy.test_output_dir / "proxy.crt"))
|
||||
|
||||
# Launch a tunnel service so that we can speak the websockets protocol to
|
||||
# the proxy
|
||||
tunnel_port = port_distributor.get_port()
|
||||
tunnel_server = await websocket_tunnel.start_server(
|
||||
"127.0.0.1",
|
||||
tunnel_port,
|
||||
f"wss://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
|
||||
ssl_context,
|
||||
)
|
||||
log.info(f"websockets tunnel listening for connections on port {tunnel_port}")
|
||||
|
||||
async with tunnel_server:
|
||||
|
||||
async def run_tunnel():
|
||||
try:
|
||||
async with tunnel_server:
|
||||
await tunnel_server.serve_forever()
|
||||
except Exception as e:
|
||||
log.error(f"Error in tunnel task: {e}")
|
||||
|
||||
tunnel_task = asyncio.create_task(run_tunnel())
|
||||
|
||||
# Ok, the tunnel is now running. Check that we can connect to the proxy's
|
||||
# websocket interface, through the tunnel
|
||||
tunnel_connstring = f"postgres://{user}:{password}@127.0.0.1:{tunnel_port}/postgres"
|
||||
|
||||
log.info(f"connecting to {tunnel_connstring}")
|
||||
conn = await asyncpg.connect(tunnel_connstring)
|
||||
res = await conn.fetchval("SELECT 123")
|
||||
assert res == 123
|
||||
await conn.close()
|
||||
log.info("Ran a query successfully through the tunnel")
|
||||
|
||||
tunnel_server.close()
|
||||
try:
|
||||
await tunnel_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
@@ -822,6 +822,122 @@ def test_storage_controller_stuck_compute_hook(
|
||||
env.storage_controller.consistency_check()
|
||||
|
||||
|
||||
@run_only_on_default_postgres("postgres behavior is not relevant")
|
||||
def test_storage_controller_compute_hook_retry(
|
||||
httpserver: HTTPServer,
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
httpserver_listen_address: ListenAddress,
|
||||
):
|
||||
"""
|
||||
Test that when a reconciler can't do its compute hook notification, it will keep
|
||||
trying until it succeeds.
|
||||
|
||||
Reproducer for https://github.com/neondatabase/cloud/issues/22612
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
(host, port) = httpserver_listen_address
|
||||
neon_env_builder.control_plane_compute_hook_api = f"http://{host}:{port}/notify"
|
||||
|
||||
handle_params = {"status": 200}
|
||||
|
||||
notifications = []
|
||||
|
||||
def handler(request: Request):
|
||||
status = handle_params["status"]
|
||||
log.info(f"Notify request[{status}]: {request}")
|
||||
notifications.append(request.json)
|
||||
return Response(status=status)
|
||||
|
||||
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
|
||||
|
||||
# Start running
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
env.create_tenant(tenant_id, placement_policy='{"Attached": 1}')
|
||||
|
||||
# Initial notification from tenant creation
|
||||
assert len(notifications) == 1
|
||||
expect: dict[str, list[dict[str, int]] | str | None | int] = {
|
||||
"tenant_id": str(tenant_id),
|
||||
"stripe_size": None,
|
||||
"shards": [{"node_id": int(env.pageservers[0].id), "shard_number": 0}],
|
||||
"preferred_az": DEFAULT_AZ_ID,
|
||||
}
|
||||
assert notifications[0] == expect
|
||||
|
||||
# Block notifications, and fail a node
|
||||
handle_params["status"] = 423
|
||||
env.pageservers[0].stop()
|
||||
env.storage_controller.allowed_errors.append(NOTIFY_BLOCKED_LOG)
|
||||
env.storage_controller.allowed_errors.extend(NOTIFY_FAILURE_LOGS)
|
||||
|
||||
# Avoid waiting for heartbeats
|
||||
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Offline"})
|
||||
|
||||
# Make reconciler run and fail: it should leave itself in a state where the shard will retry notification later,
|
||||
# and we will check that that happens
|
||||
notifications = []
|
||||
try:
|
||||
assert env.storage_controller.reconcile_all() == 1
|
||||
except StorageControllerApiException as e:
|
||||
assert "Control plane tenant busy" in str(e)
|
||||
assert len(notifications) == 1
|
||||
assert (
|
||||
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
|
||||
"is_pending_compute_notification"
|
||||
]
|
||||
is True
|
||||
)
|
||||
|
||||
# Try reconciling again, it should try notifying again
|
||||
notifications = []
|
||||
try:
|
||||
assert env.storage_controller.reconcile_all() == 1
|
||||
except StorageControllerApiException as e:
|
||||
assert "Control plane tenant busy" in str(e)
|
||||
assert len(notifications) == 1
|
||||
assert (
|
||||
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
|
||||
"is_pending_compute_notification"
|
||||
]
|
||||
is True
|
||||
)
|
||||
|
||||
# The describe API should indicate that a notification is pending
|
||||
assert (
|
||||
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
|
||||
"is_pending_compute_notification"
|
||||
]
|
||||
is True
|
||||
)
|
||||
|
||||
# Unblock notifications: reconcile should work now
|
||||
handle_params["status"] = 200
|
||||
notifications = []
|
||||
assert env.storage_controller.reconcile_all() == 1
|
||||
assert len(notifications) == 1
|
||||
assert (
|
||||
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
|
||||
"is_pending_compute_notification"
|
||||
]
|
||||
is False
|
||||
)
|
||||
|
||||
# Reconciler should be idle now that it succeeded in its compute notification
|
||||
notifications = []
|
||||
assert env.storage_controller.reconcile_all() == 0
|
||||
assert len(notifications) == 0
|
||||
assert (
|
||||
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
|
||||
"is_pending_compute_notification"
|
||||
]
|
||||
is False
|
||||
)
|
||||
|
||||
|
||||
@run_only_on_default_postgres("this test doesn't start an endpoint")
|
||||
def test_storage_controller_compute_hook_revert(
|
||||
httpserver: HTTPServer,
|
||||
@@ -936,7 +1052,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
that just hits the endpoints to check that they don't bitrot.
|
||||
"""
|
||||
|
||||
neon_env_builder.num_pageservers = 2
|
||||
neon_env_builder.num_pageservers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
@@ -961,7 +1077,7 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
"GET", f"{env.storage_controller_api}/debug/v1/scheduler"
|
||||
)
|
||||
# Two nodes, in a dict of node_id->node
|
||||
assert len(response.json()["nodes"]) == 2
|
||||
assert len(response.json()["nodes"]) == 3
|
||||
assert sum(v["shard_count"] for v in response.json()["nodes"].values()) == 3
|
||||
assert all(v["may_schedule"] for v in response.json()["nodes"].values())
|
||||
|
||||
@@ -972,13 +1088,25 @@ def test_storage_controller_debug_apis(neon_env_builder: NeonEnvBuilder):
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
# Secondary migration API: superficial check that it migrates
|
||||
secondary_dest = env.pageservers[2].id
|
||||
env.storage_controller.request(
|
||||
"PUT",
|
||||
f"{env.storage_controller_api}/control/v1/tenant/{tenant_id}-0002/migrate_secondary",
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
json={"tenant_shard_id": f"{tenant_id}-0002", "node_id": secondary_dest},
|
||||
)
|
||||
assert env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_secondary"] == [
|
||||
secondary_dest
|
||||
]
|
||||
|
||||
# Node unclean drop API
|
||||
response = env.storage_controller.request(
|
||||
"POST",
|
||||
f"{env.storage_controller_api}/debug/v1/node/{env.pageservers[1].id}/drop",
|
||||
headers=env.storage_controller.headers(TokenScope.ADMIN),
|
||||
)
|
||||
assert len(env.storage_controller.node_list()) == 1
|
||||
assert len(env.storage_controller.node_list()) == 2
|
||||
|
||||
# Tenant unclean drop API
|
||||
response = env.storage_controller.request(
|
||||
@@ -1696,7 +1824,13 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
output_dir = neon_env_builder.test_output_dir
|
||||
shard_count = 4
|
||||
env = neon_env_builder.init_start(initial_tenant_shard_count=shard_count)
|
||||
neon_env_builder.num_pageservers = 2
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
env.create_tenant(tenant_id, placement_policy='{"Attached":1}', shard_count=shard_count)
|
||||
|
||||
base_args = [env.neon_binpath / "storcon_cli", "--api", env.storage_controller_api]
|
||||
|
||||
def storcon_cli(args):
|
||||
@@ -1725,7 +1859,7 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
# List nodes
|
||||
node_lines = storcon_cli(["nodes"])
|
||||
# Table header, footer, and one line of data
|
||||
assert len(node_lines) == 5
|
||||
assert len(node_lines) == 7
|
||||
assert "localhost" in node_lines[3]
|
||||
|
||||
# Pause scheduling onto a node
|
||||
@@ -1743,10 +1877,21 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
storcon_cli(["node-configure", "--node-id", "1", "--availability", "offline"])
|
||||
assert "Offline" in storcon_cli(["nodes"])[3]
|
||||
|
||||
# Restore node, verify status changes in CLI output
|
||||
env.pageservers[0].start()
|
||||
|
||||
def is_online():
|
||||
assert "Offline" not in storcon_cli(["nodes"])
|
||||
|
||||
wait_until(is_online)
|
||||
|
||||
# Let everything stabilize after node failure to avoid interfering with subsequent steps
|
||||
env.storage_controller.reconcile_until_idle(timeout_secs=10)
|
||||
|
||||
# List tenants
|
||||
tenant_lines = storcon_cli(["tenants"])
|
||||
assert len(tenant_lines) == 5
|
||||
assert str(env.initial_tenant) in tenant_lines[3]
|
||||
assert str(tenant_id) in tenant_lines[3]
|
||||
|
||||
# Setting scheduling policies intentionally result in warnings, they're for rare use.
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
@@ -1754,23 +1899,58 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
)
|
||||
|
||||
# Describe a tenant
|
||||
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(env.initial_tenant)])
|
||||
tenant_lines = storcon_cli(["tenant-describe", "--tenant-id", str(tenant_id)])
|
||||
assert len(tenant_lines) >= 3 + shard_count * 2
|
||||
assert str(env.initial_tenant) in tenant_lines[0]
|
||||
assert str(tenant_id) in tenant_lines[0]
|
||||
|
||||
# Migrate an attached location
|
||||
def other_ps_id(current_ps_id):
|
||||
return (
|
||||
env.pageservers[0].id
|
||||
if current_ps_id == env.pageservers[1].id
|
||||
else env.pageservers[1].id
|
||||
)
|
||||
|
||||
storcon_cli(
|
||||
[
|
||||
"tenant-shard-migrate",
|
||||
"--tenant-shard-id",
|
||||
f"{tenant_id}-0004",
|
||||
"--node",
|
||||
str(
|
||||
other_ps_id(
|
||||
env.storage_controller.tenant_describe(tenant_id)["shards"][0]["node_attached"]
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
# Migrate a secondary location
|
||||
storcon_cli(
|
||||
[
|
||||
"tenant-shard-migrate-secondary",
|
||||
"--tenant-shard-id",
|
||||
f"{tenant_id}-0004",
|
||||
"--node",
|
||||
str(
|
||||
other_ps_id(
|
||||
env.storage_controller.tenant_describe(tenant_id)["shards"][0][
|
||||
"node_secondary"
|
||||
][0]
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
# Pause changes on a tenant
|
||||
storcon_cli(["tenant-policy", "--tenant-id", str(env.initial_tenant), "--scheduling", "stop"])
|
||||
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--scheduling", "stop"])
|
||||
assert "Stop" in storcon_cli(["tenants"])[3]
|
||||
|
||||
# Cancel ongoing reconcile on a tenant
|
||||
storcon_cli(
|
||||
["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{env.initial_tenant}-0104"]
|
||||
)
|
||||
storcon_cli(["tenant-shard-cancel-reconcile", "--tenant-shard-id", f"{tenant_id}-0104"])
|
||||
|
||||
# Change a tenant's placement
|
||||
storcon_cli(
|
||||
["tenant-policy", "--tenant-id", str(env.initial_tenant), "--placement", "secondary"]
|
||||
)
|
||||
storcon_cli(["tenant-policy", "--tenant-id", str(tenant_id), "--placement", "secondary"])
|
||||
assert "Secondary" in storcon_cli(["tenants"])[3]
|
||||
|
||||
# Modify a tenant's config
|
||||
@@ -1778,7 +1958,7 @@ def test_storcon_cli(neon_env_builder: NeonEnvBuilder):
|
||||
[
|
||||
"patch-tenant-config",
|
||||
"--tenant-id",
|
||||
str(env.initial_tenant),
|
||||
str(tenant_id),
|
||||
"--config",
|
||||
json.dumps({"pitr_interval": "1m"}),
|
||||
]
|
||||
|
||||
154
test_runner/websocket_tunnel.py
Executable file
154
test_runner/websocket_tunnel.py
Executable file
@@ -0,0 +1,154 @@
|
||||
#!/usr/bin/env python3
|
||||
#
|
||||
# This program helps to test the WebSocket tunneling in proxy. It listens for a TCP
|
||||
# connection on a port, and when you connect to it, it opens a websocket connection,
|
||||
# and forwards all the traffic to the websocket connection, wrapped in WebSocket binary
|
||||
# frames.
|
||||
#
|
||||
# This is used in the test_proxy::test_websockets test, but it is handy for manual testing too.
|
||||
#
|
||||
# Usage for manual testing:
|
||||
#
|
||||
# ## Launch Posgres on port 3000:
|
||||
# postgres -D data -p3000
|
||||
#
|
||||
# ## Launch proxy with WSS enabled:
|
||||
# openssl req -new -x509 -days 365 -nodes -text -out server.crt -keyout server.key -subj '/CN=*.neon.localtest.me'
|
||||
# ./target/debug/proxy --wss 127.0.0.1:40433 --http 127.0.0.1:28080 --mgmt 127.0.0.1:9099 --proxy 127.0.0.1:4433 --tls-key server.key --tls-cert server.crt --auth-backend postgres
|
||||
#
|
||||
# ## Launch the tunnel:
|
||||
#
|
||||
# poetry run ./test_runner/websocket_tunnel.py --ws-port 40433 --ws-url "wss://ep-test.neon.localtest.me"
|
||||
#
|
||||
# ## Now you can connect with psql:
|
||||
# psql "postgresql://heikki@localhost:40433/postgres"
|
||||
#
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import logging
|
||||
import ssl
|
||||
from ssl import Purpose
|
||||
|
||||
import websockets
|
||||
from fixtures.log_helper import log
|
||||
|
||||
|
||||
# Enable verbose logging of all the traffic
|
||||
def enable_verbose_logging():
|
||||
logger = logging.getLogger("websockets")
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.addHandler(logging.StreamHandler())
|
||||
|
||||
|
||||
async def start_server(tcp_listen_host, tcp_listen_port, ws_url, ctx):
|
||||
server = await asyncio.start_server(
|
||||
lambda r, w: handle_client(r, w, ws_url, ctx), tcp_listen_host, tcp_listen_port
|
||||
)
|
||||
return server
|
||||
|
||||
|
||||
async def handle_tcp_to_websocket(tcp_reader, ws):
|
||||
try:
|
||||
while not tcp_reader.at_eof():
|
||||
data = await tcp_reader.read(1024)
|
||||
|
||||
await ws.send(data)
|
||||
except websockets.exceptions.ConnectionClosedError as e:
|
||||
log.debug(f"connection closed: {e}")
|
||||
except websockets.exceptions.ConnectionClosedOK:
|
||||
log.debug("connection closed")
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
|
||||
async def handle_websocket_to_tcp(ws, tcp_writer):
|
||||
try:
|
||||
async for message in ws:
|
||||
tcp_writer.write(message)
|
||||
await tcp_writer.drain()
|
||||
except websockets.exceptions.ConnectionClosedError as e:
|
||||
log.debug(f"connection closed: {e}")
|
||||
except websockets.exceptions.ConnectionClosedOK:
|
||||
log.debug("connection closed")
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
|
||||
async def handle_client(tcp_reader, tcp_writer, ws_url: str, ctx: ssl.SSLContext):
|
||||
try:
|
||||
log.info("Received TCP connection. Connecting to websockets proxy.")
|
||||
|
||||
async with websockets.connect(ws_url, ssl=ctx) as ws:
|
||||
try:
|
||||
log.info("Connected to websockets proxy")
|
||||
|
||||
async with asyncio.TaskGroup() as tg:
|
||||
task1 = tg.create_task(handle_tcp_to_websocket(tcp_reader, ws))
|
||||
task2 = tg.create_task(handle_websocket_to_tcp(ws, tcp_writer))
|
||||
|
||||
done, pending = await asyncio.wait(
|
||||
[task1, task2], return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
tcp_writer.close()
|
||||
await ws.close()
|
||||
|
||||
except* Exception as ex:
|
||||
log.error(ex.exceptions)
|
||||
except Exception as e:
|
||||
log.error(e)
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
"--tcp-listen-addr",
|
||||
default="localhost",
|
||||
help="TCP addr to listen on",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--tcp-listen-port",
|
||||
default="40444",
|
||||
help="TCP port to listen on",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--ws-url",
|
||||
default="wss://localhost/",
|
||||
help="websocket URL to connect to. This determines the Host header sent to the server",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ws-host",
|
||||
default="127.0.0.1",
|
||||
help="websockets host to connect to",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--ws-port",
|
||||
type=int,
|
||||
default=443,
|
||||
help="websockets port to connect to",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose",
|
||||
action="store_true",
|
||||
help="enable verbose logging",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.verbose:
|
||||
enable_verbose_logging()
|
||||
|
||||
ctx = ssl.create_default_context(Purpose.SERVER_AUTH)
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
|
||||
server = await start_server(args.tcp_listen_addr, args.tcp_listen_port, args.ws_url, ctx)
|
||||
print(
|
||||
f"Listening for connections at {args.tcp_listen_addr}:{args.tcp_listen_port}, forwarding them to {args.ws_host}:{args.ws_port}"
|
||||
)
|
||||
async with server:
|
||||
await server.serve_forever()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.2",
|
||||
"7e3f3974bc8895938308f94d0e96879ffae638cd"
|
||||
"9c9e9a78a93aebec2f6a2f54644442d35ffa245c"
|
||||
],
|
||||
"v16": [
|
||||
"16.6",
|
||||
"97f9fde349c6de6d573f5ce96db07eca60ce6185"
|
||||
"f63b141cfb0c813725a6b2574049565bff643018"
|
||||
],
|
||||
"v15": [
|
||||
"15.10",
|
||||
"f262d631ad477a1819e84a183e5a7ef561830085"
|
||||
"d3141e17a7155e3d07c8deba4a10c748a29ba1e6"
|
||||
],
|
||||
"v14": [
|
||||
"14.15",
|
||||
"c2f65b3201591e02ce45b66731392f98d3388e73"
|
||||
"210a0ba3afd8134ea910b203f274b165bd4f05d7"
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user