mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 20:50:37 +00:00
Merge branch 'problame/infallible-timeline-activate/2-pushup-tenant-and-timeline-activation' into problame/infallible-timeline-activate/3-funnel-storage-broker-client
This commit is contained in:
13
.github/workflows/build_and_test.yml
vendored
13
.github/workflows/build_and_test.yml
vendored
@@ -777,7 +777,7 @@ jobs:
|
||||
run:
|
||||
shell: sh -eu {0}
|
||||
env:
|
||||
VM_BUILDER_VERSION: v0.4.6
|
||||
VM_BUILDER_VERSION: v0.7.3-alpha3
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
@@ -787,21 +787,18 @@ jobs:
|
||||
|
||||
- name: Downloading vm-builder
|
||||
run: |
|
||||
curl -L https://github.com/neondatabase/neonvm/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder
|
||||
curl -fL https://github.com/neondatabase/autoscaling/releases/download/$VM_BUILDER_VERSION/vm-builder -o vm-builder
|
||||
chmod +x vm-builder
|
||||
|
||||
# Note: we need a separate pull step here because otherwise vm-builder will try to pull, and
|
||||
# it won't have the proper authentication (written at v0.6.0)
|
||||
- name: Pulling compute-node image
|
||||
run: |
|
||||
docker pull 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Building VM compute-node rootfs
|
||||
run: |
|
||||
docker build -t temp-vm-compute-node --build-arg SRC_IMAGE=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -f Dockerfile.vm-compute-node .
|
||||
|
||||
- name: Build vm image
|
||||
run: |
|
||||
# note: as of 2023-01-12, vm-builder requires a trailing ":latest" for local images
|
||||
./vm-builder -use-inittab -src=temp-vm-compute-node:latest -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
./vm-builder -src=369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}} -dst=369495373322.dkr.ecr.eu-central-1.amazonaws.com/vm-compute-node-${{ matrix.version }}:${{needs.tag.outputs.build-tag}}
|
||||
|
||||
- name: Pushing vm-compute-node image
|
||||
run: |
|
||||
|
||||
20
Cargo.lock
generated
20
Cargo.lock
generated
@@ -2820,7 +2820,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -2833,7 +2833,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-native-tls"
|
||||
version = "0.5.0"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
|
||||
dependencies = [
|
||||
"native-tls",
|
||||
"tokio",
|
||||
@@ -2844,7 +2844,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -2862,7 +2862,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4271,9 +4271,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio"
|
||||
version = "1.27.0"
|
||||
version = "1.28.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001"
|
||||
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"bytes",
|
||||
@@ -4284,7 +4284,7 @@ dependencies = [
|
||||
"signal-hook-registry",
|
||||
"socket2 0.4.9",
|
||||
"tokio-macros",
|
||||
"windows-sys 0.45.0",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4299,9 +4299,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tokio-macros"
|
||||
version = "2.0.0"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce"
|
||||
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@@ -4321,7 +4321,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=0bc41d8503c092b040142214aac3cf7d11d0c19f#0bc41d8503c092b040142214aac3cf7d11d0c19f"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9#2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
|
||||
12
Cargo.toml
12
Cargo.toml
@@ -126,11 +126,11 @@ env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
|
||||
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
|
||||
tokio-tar = { git = "https://github.com/neondatabase/tokio-tar.git", rev="404df61437de0feef49ba2ccdbdd94eb8ad6e142" }
|
||||
|
||||
## Other git libraries
|
||||
@@ -166,7 +166,7 @@ tonic-build = "0.9"
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
# TODO: we should probably fork `tokio-postgres-rustls` instead.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="0bc41d8503c092b040142214aac3cf7d11d0c19f" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="2e9b5f1ddc481d1a98fa79f6b9378ac4f170b7c9" }
|
||||
|
||||
# Changes the MAX_THREADS limit from 4096 to 32768.
|
||||
# This is a temporary workaround for using tracing from many threads in safekeepers code,
|
||||
|
||||
@@ -1,70 +0,0 @@
|
||||
# Note: this file *mostly* just builds on Dockerfile.compute-node
|
||||
|
||||
ARG SRC_IMAGE
|
||||
ARG VM_INFORMANT_VERSION=v0.1.14
|
||||
# on libcgroup update, make sure to check bootstrap.sh for changes
|
||||
ARG LIBCGROUP_VERSION=v2.0.3
|
||||
|
||||
# Pull VM informant, to copy from later
|
||||
FROM neondatabase/vm-informant:$VM_INFORMANT_VERSION as informant
|
||||
|
||||
# Build cgroup-tools
|
||||
#
|
||||
# At time of writing (2023-03-14), debian bullseye has a version of cgroup-tools (technically
|
||||
# libcgroup) that doesn't support cgroup v2 (version 0.41-11). Unfortunately, the vm-informant
|
||||
# requires cgroup v2, so we'll build cgroup-tools ourselves.
|
||||
FROM debian:bullseye-slim as libcgroup-builder
|
||||
ARG LIBCGROUP_VERSION
|
||||
|
||||
RUN set -exu \
|
||||
&& apt update \
|
||||
&& apt install --no-install-recommends -y \
|
||||
git \
|
||||
ca-certificates \
|
||||
automake \
|
||||
cmake \
|
||||
make \
|
||||
gcc \
|
||||
byacc \
|
||||
flex \
|
||||
libtool \
|
||||
libpam0g-dev \
|
||||
&& git clone --depth 1 -b $LIBCGROUP_VERSION https://github.com/libcgroup/libcgroup \
|
||||
&& INSTALL_DIR="/libcgroup-install" \
|
||||
&& mkdir -p "$INSTALL_DIR/bin" "$INSTALL_DIR/include" \
|
||||
&& cd libcgroup \
|
||||
# extracted from bootstrap.sh, with modified flags:
|
||||
&& (test -d m4 || mkdir m4) \
|
||||
&& autoreconf -fi \
|
||||
&& rm -rf autom4te.cache \
|
||||
&& CFLAGS="-O3" ./configure --prefix="$INSTALL_DIR" --sysconfdir=/etc --localstatedir=/var --enable-opaque-hierarchy="name=systemd" \
|
||||
# actually build the thing...
|
||||
&& make install
|
||||
|
||||
# Combine, starting from non-VM compute node image.
|
||||
FROM $SRC_IMAGE as base
|
||||
|
||||
# Temporarily set user back to root so we can run adduser, set inittab
|
||||
USER root
|
||||
RUN adduser vm-informant --disabled-password --no-create-home
|
||||
|
||||
RUN set -e \
|
||||
&& rm -f /etc/inittab \
|
||||
&& touch /etc/inittab
|
||||
|
||||
RUN set -e \
|
||||
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
|
||||
&& CONNSTR="dbname=postgres user=cloud_admin sslmode=disable" \
|
||||
&& ARGS="--auto-restart --cgroup=neon-postgres --pgconnstr=\"$CONNSTR\"" \
|
||||
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant $ARGS'" >> /etc/inittab
|
||||
|
||||
USER postgres
|
||||
|
||||
ADD vm-cgconfig.conf /etc/cgconfig.conf
|
||||
COPY --from=informant /usr/bin/vm-informant /usr/local/bin/vm-informant
|
||||
|
||||
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
|
||||
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/
|
||||
COPY --from=libcgroup-builder /libcgroup-install/sbin/* /usr/sbin/
|
||||
|
||||
ENTRYPOINT ["/usr/sbin/cgexec", "-g", "*:neon-postgres", "/usr/local/bin/compute_ctl"]
|
||||
@@ -797,7 +797,8 @@ impl PageServerConf {
|
||||
)?);
|
||||
}
|
||||
if let Some(max_lsn_wal_lag) = item.get("max_lsn_wal_lag") {
|
||||
t_conf.max_lsn_wal_lag = Some(parse_toml_from_str("max_lsn_wal_lag", max_lsn_wal_lag)?);
|
||||
t_conf.max_lsn_wal_lag =
|
||||
Some(deserialize_from_item("max_lsn_wal_lag", max_lsn_wal_lag)?);
|
||||
}
|
||||
if let Some(trace_read_requests) = item.get("trace_read_requests") {
|
||||
t_conf.trace_read_requests =
|
||||
|
||||
@@ -5,7 +5,7 @@ use std::ops::Range;
|
||||
///
|
||||
/// Represents a set of Keys, in a compact form.
|
||||
///
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct KeySpace {
|
||||
/// Contiguous ranges of keys that belong to the key space. In key order,
|
||||
/// and with no overlap.
|
||||
@@ -61,6 +61,18 @@ impl KeySpace {
|
||||
|
||||
KeyPartitioning { parts }
|
||||
}
|
||||
|
||||
///
|
||||
/// Check if key space contains overlapping range
|
||||
///
|
||||
pub fn overlaps(&self, range: &Range<Key>) -> bool {
|
||||
match self.ranges.binary_search_by_key(&range.end, |r| r.start) {
|
||||
Ok(0) => false,
|
||||
Err(0) => false,
|
||||
Ok(index) => self.ranges[index - 1].end > range.start,
|
||||
Err(index) => self.ranges[index - 1].end > range.start,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
@@ -129,3 +141,226 @@ impl KeySpaceAccum {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// A helper object, to collect a set of keys and key ranges into a KeySpace
|
||||
/// object. Key ranges may be inserted in any order and can overlap.
|
||||
///
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct KeySpaceRandomAccum {
|
||||
ranges: Vec<Range<Key>>,
|
||||
}
|
||||
|
||||
impl KeySpaceRandomAccum {
|
||||
pub fn new() -> Self {
|
||||
Self { ranges: Vec::new() }
|
||||
}
|
||||
|
||||
pub fn add_key(&mut self, key: Key) {
|
||||
self.add_range(singleton_range(key))
|
||||
}
|
||||
|
||||
pub fn add_range(&mut self, range: Range<Key>) {
|
||||
self.ranges.push(range);
|
||||
}
|
||||
|
||||
pub fn to_keyspace(mut self) -> KeySpace {
|
||||
let mut ranges = Vec::new();
|
||||
if !self.ranges.is_empty() {
|
||||
self.ranges.sort_by_key(|r| r.start);
|
||||
let mut start = self.ranges.first().unwrap().start;
|
||||
let mut end = self.ranges.first().unwrap().end;
|
||||
for r in self.ranges {
|
||||
assert!(r.start >= start);
|
||||
if r.start > end {
|
||||
ranges.push(start..end);
|
||||
start = r.start;
|
||||
end = r.end;
|
||||
} else if r.end > end {
|
||||
end = r.end;
|
||||
}
|
||||
}
|
||||
ranges.push(start..end);
|
||||
}
|
||||
KeySpace { ranges }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::fmt::Write;
|
||||
|
||||
// Helper function to create a key range.
|
||||
//
|
||||
// Make the tests below less verbose.
|
||||
fn kr(irange: Range<i128>) -> Range<Key> {
|
||||
Key::from_i128(irange.start)..Key::from_i128(irange.end)
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn dump_keyspace(ks: &KeySpace) {
|
||||
for r in ks.ranges.iter() {
|
||||
println!(" {}..{}", r.start.to_i128(), r.end.to_i128());
|
||||
}
|
||||
}
|
||||
|
||||
fn assert_ks_eq(actual: &KeySpace, expected: Vec<Range<Key>>) {
|
||||
if actual.ranges != expected {
|
||||
let mut msg = String::new();
|
||||
|
||||
writeln!(msg, "expected:").unwrap();
|
||||
for r in &expected {
|
||||
writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
|
||||
}
|
||||
writeln!(msg, "got:").unwrap();
|
||||
for r in &actual.ranges {
|
||||
writeln!(msg, " {}..{}", r.start.to_i128(), r.end.to_i128()).unwrap();
|
||||
}
|
||||
panic!("{}", msg);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn keyspace_add_range() {
|
||||
// two separate ranges
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(0..10));
|
||||
ks.add_range(kr(20..30));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..10), kr(20..30)]);
|
||||
|
||||
// two separate ranges, added in reverse order
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(20..30));
|
||||
ks.add_range(kr(0..10));
|
||||
|
||||
// add range that is adjacent to the end of an existing range
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
ks.add_range(kr(0..10));
|
||||
ks.add_range(kr(10..30));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that is adjacent to the start of an existing range
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(10..30));
|
||||
ks.add_range(kr(0..10));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that overlaps with the end of an existing range
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(0..10));
|
||||
ks.add_range(kr(5..30));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that overlaps with the start of an existing range
|
||||
//
|
||||
// #####
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(5..30));
|
||||
ks.add_range(kr(0..10));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that is fully covered by an existing range
|
||||
//
|
||||
// #########
|
||||
// #####
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(0..30));
|
||||
ks.add_range(kr(10..20));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add range that extends an existing range from both ends
|
||||
//
|
||||
// #####
|
||||
// #########
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(10..20));
|
||||
ks.add_range(kr(0..30));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
|
||||
// add a range that overlaps with two existing ranges, joining them
|
||||
//
|
||||
// ##### #####
|
||||
// #######
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(0..10));
|
||||
ks.add_range(kr(20..30));
|
||||
ks.add_range(kr(5..25));
|
||||
assert_ks_eq(&ks.to_keyspace(), vec![kr(0..30)]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn keyspace_overlaps() {
|
||||
let mut ks = KeySpaceRandomAccum::default();
|
||||
ks.add_range(kr(10..20));
|
||||
ks.add_range(kr(30..40));
|
||||
let ks = ks.to_keyspace();
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(0..5)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(5..9)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(5..10)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(5..11)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(10..15)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(15..20)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(15..25)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(22..28)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(25..30)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(ks.overlaps(&kr(35..35)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(40..45)));
|
||||
|
||||
// ##### #####
|
||||
// xxxx
|
||||
assert!(!ks.overlaps(&kr(45..50)));
|
||||
|
||||
// ##### #####
|
||||
// xxxxxxxxxxx
|
||||
assert!(ks.overlaps(&kr(0..30))); // XXXXX This fails currently!
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,8 +22,7 @@ use tracing::*;
|
||||
use utils::id::TenantTimelineId;
|
||||
|
||||
use std::cmp::{max, min, Ordering};
|
||||
use std::collections::BinaryHeap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{BinaryHeap, HashMap};
|
||||
use std::fs;
|
||||
use std::ops::{Deref, Range};
|
||||
use std::path::{Path, PathBuf};
|
||||
@@ -47,7 +46,7 @@ use crate::tenant::{
|
||||
};
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
|
||||
use crate::metrics::{TimelineMetrics, UNEXPECTED_ONDEMAND_DOWNLOADS};
|
||||
use crate::pgdatadir_mapping::LsnForTimestamp;
|
||||
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
|
||||
@@ -122,6 +121,17 @@ pub struct Timeline {
|
||||
|
||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
/// It is used by compaction task when it checks if new image layer should be created.
|
||||
/// Newly created image layer doesn't help to remove the delta layer, until the
|
||||
/// newly created image layer falls off the PITR horizon. So on next GC cycle,
|
||||
/// gc_timeline may still want the new image layer to be created. To avoid redundant
|
||||
/// image layers creation we should check if image layer exists but beyond PITR horizon.
|
||||
/// This is why we need remember GC cutoff LSN.
|
||||
///
|
||||
wanted_image_layers: Mutex<Option<(Lsn, KeySpace)>>,
|
||||
|
||||
last_freeze_at: AtomicLsn,
|
||||
// Atomic would be more appropriate here.
|
||||
last_freeze_ts: RwLock<Instant>,
|
||||
@@ -1350,6 +1360,7 @@ impl Timeline {
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: RwLock::new(LayerMap::default()),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
|
||||
walredo_mgr,
|
||||
walreceiver,
|
||||
@@ -2900,6 +2911,30 @@ impl Timeline {
|
||||
let layers = self.layers.read().unwrap();
|
||||
|
||||
let mut max_deltas = 0;
|
||||
{
|
||||
let wanted_image_layers = self.wanted_image_layers.lock().unwrap();
|
||||
if let Some((cutoff_lsn, wanted)) = &*wanted_image_layers {
|
||||
let img_range =
|
||||
partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end;
|
||||
if wanted.overlaps(&img_range) {
|
||||
//
|
||||
// gc_timeline only pays attention to image layers that are older than the GC cutoff,
|
||||
// but create_image_layers creates image layers at last-record-lsn.
|
||||
// So it's possible that gc_timeline wants a new image layer to be created for a key range,
|
||||
// but the range is already covered by image layers at more recent LSNs. Before we
|
||||
// create a new image layer, check if the range is already covered at more recent LSNs.
|
||||
if !layers
|
||||
.image_layer_exists(&img_range, &(Lsn::min(lsn, *cutoff_lsn)..lsn + 1))?
|
||||
{
|
||||
debug!(
|
||||
"Force generation of layer {}-{} wanted by GC, cutoff={}, lsn={})",
|
||||
img_range.start, img_range.end, cutoff_lsn, lsn
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for part_range in &partition.ranges {
|
||||
let image_coverage = layers.image_coverage(part_range, lsn)?;
|
||||
@@ -3019,6 +3054,12 @@ impl Timeline {
|
||||
image_layers.push(image_layer);
|
||||
}
|
||||
}
|
||||
// All layers that the GC wanted us to create have now been created.
|
||||
//
|
||||
// It's possible that another GC cycle happened while we were compacting, and added
|
||||
// something new to wanted_image_layers, and we now clear that before processing it.
|
||||
// That's OK, because the next GC iteration will put it back in.
|
||||
*self.wanted_image_layers.lock().unwrap() = None;
|
||||
|
||||
// Sync the new layer to disk before adding it to the layer map, to make sure
|
||||
// we don't garbage collect something based on the new layer, before it has
|
||||
@@ -3716,6 +3757,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
let mut layers_to_remove = Vec::new();
|
||||
let mut wanted_image_layers = KeySpaceRandomAccum::default();
|
||||
|
||||
// Scan all layers in the timeline (remote or on-disk).
|
||||
//
|
||||
@@ -3799,6 +3841,15 @@ impl Timeline {
|
||||
"keeping {} because it is the latest layer",
|
||||
l.filename().file_name()
|
||||
);
|
||||
// Collect delta key ranges that need image layers to allow garbage
|
||||
// collecting the layers.
|
||||
// It is not so obvious whether we need to propagate information only about
|
||||
// delta layers. Image layers can form "stairs" preventing old image from been deleted.
|
||||
// But image layers are in any case less sparse than delta layers. Also we need some
|
||||
// protection from replacing recent image layers with new one after each GC iteration.
|
||||
if l.is_incremental() && !LayerMap::is_l0(&*l) {
|
||||
wanted_image_layers.add_range(l.get_key_range());
|
||||
}
|
||||
result.layers_not_updated += 1;
|
||||
continue 'outer;
|
||||
}
|
||||
@@ -3811,6 +3862,10 @@ impl Timeline {
|
||||
);
|
||||
layers_to_remove.push(Arc::clone(&l));
|
||||
}
|
||||
self.wanted_image_layers
|
||||
.lock()
|
||||
.unwrap()
|
||||
.replace((new_gc_cutoff, wanted_image_layers.to_keyspace()));
|
||||
|
||||
let mut updates = layers.batch_update();
|
||||
if !layers_to_remove.is_empty() {
|
||||
|
||||
14
poetry.lock
generated
14
poetry.lock
generated
@@ -2092,21 +2092,21 @@ files = [
|
||||
|
||||
[[package]]
|
||||
name = "requests"
|
||||
version = "2.28.1"
|
||||
version = "2.31.0"
|
||||
description = "Python HTTP for Humans."
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = ">=3.7, <4"
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "requests-2.28.1-py3-none-any.whl", hash = "sha256:8fefa2a1a1365bf5520aac41836fbee479da67864514bdb821f31ce07ce65349"},
|
||||
{file = "requests-2.28.1.tar.gz", hash = "sha256:7c5599b102feddaa661c826c56ab4fee28bfd17f5abca1ebbe3e7f19d7c97983"},
|
||||
{file = "requests-2.31.0-py3-none-any.whl", hash = "sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f"},
|
||||
{file = "requests-2.31.0.tar.gz", hash = "sha256:942c5a758f98d790eaed1a29cb6eefc7ffb0d1cf7af05c3d2791656dbd6ad1e1"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
certifi = ">=2017.4.17"
|
||||
charset-normalizer = ">=2,<3"
|
||||
charset-normalizer = ">=2,<4"
|
||||
idna = ">=2.5,<4"
|
||||
urllib3 = ">=1.21.1,<1.27"
|
||||
urllib3 = ">=1.21.1,<3"
|
||||
|
||||
[package.extras]
|
||||
socks = ["PySocks (>=1.5.6,!=1.5.7)"]
|
||||
@@ -2611,4 +2611,4 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "b689ffd6eae32b966f1744b5ac3343fe0dd26b31ee1f50e13daf5045ee0623e1"
|
||||
content-hash = "a0bd73376a3e9479f2379265ccec8dd6ac9df2e525909d12b77d918d590fba55"
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Proxy
|
||||
|
||||
Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme and cluster routing method. Following backends are currently implemented:
|
||||
Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme and cluster routing method. Following routing backends are currently implemented:
|
||||
|
||||
* console
|
||||
new SCRAM-based console API; uses SNI info to select the destination project (endpoint soon)
|
||||
@@ -9,6 +9,90 @@ Proxy binary accepts `--auth-backend` CLI option, which determines auth scheme a
|
||||
* link
|
||||
sends login link for all usernames
|
||||
|
||||
Also proxy can expose following services to the external world:
|
||||
|
||||
* postgres protocol over TCP -- usual postgres endpoint compatible with usual
|
||||
postgres drivers
|
||||
* postgres protocol over WebSockets -- same protocol tunneled over websockets
|
||||
for environments where TCP connection is not available. We have our own
|
||||
implementation of a client that uses node-postgres and tunnels traffic through
|
||||
websockets: https://github.com/neondatabase/serverless
|
||||
* SQL over HTTP -- service that accepts POST requests with SQL text over HTTP
|
||||
and responds with JSON-serialised results.
|
||||
|
||||
|
||||
## SQL over HTTP
|
||||
|
||||
Contrary to the usual postgres proto over TCP and WebSockets using plain
|
||||
one-shot HTTP request achieves smaller amortized latencies in edge setups due to
|
||||
fewer round trips and an enhanced open connection reuse by the v8 engine. Also
|
||||
such endpoint could be used directly without any driver.
|
||||
|
||||
To play with it locally one may start proxy over a local postgres installation
|
||||
(see end of this page on how to generate certs with openssl):
|
||||
|
||||
```
|
||||
./target/debug/proxy -c server.crt -k server.key --auth-backend=postgres --auth-endpoint=postgres://stas@127.0.0.1:5432/stas --wss 0.0.0.0:4444
|
||||
```
|
||||
|
||||
If both postgres and proxy are running you may send a SQL query:
|
||||
```json
|
||||
curl -k -X POST 'https://proxy.localtest.me:4444/sql' \
|
||||
-H 'Neon-Connection-String: postgres://stas:pass@proxy.localtest.me:4444/postgres' \
|
||||
-H 'Content-Type: application/json' \
|
||||
--data '{
|
||||
"query":"SELECT $1::int[] as arr, $2::jsonb as obj, 42 as num",
|
||||
"params":[ "{{1,2},{\"3\",4}}", {"key":"val", "ikey":4242}]
|
||||
}' | jq
|
||||
|
||||
{
|
||||
"command": "SELECT",
|
||||
"fields": [
|
||||
{ "dataTypeID": 1007, "name": "arr" },
|
||||
{ "dataTypeID": 3802, "name": "obj" },
|
||||
{ "dataTypeID": 23, "name": "num" }
|
||||
],
|
||||
"rowCount": 1,
|
||||
"rows": [
|
||||
{
|
||||
"arr": [[1,2],[3,4]],
|
||||
"num": 42,
|
||||
"obj": {
|
||||
"ikey": 4242,
|
||||
"key": "val"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
With the current approach we made the following design decisions:
|
||||
|
||||
1. SQL injection protection: We employed the extended query protocol, modifying
|
||||
the rust-postgres driver to send queries in one roundtrip using a text
|
||||
protocol rather than binary, bypassing potential issues like those identified
|
||||
in sfackler/rust-postgres#1030.
|
||||
|
||||
2. Postgres type compatibility: As not all postgres types have binary
|
||||
representations (e.g., acl's in pg_class), we adjusted rust-postgres to
|
||||
respond with text protocol, simplifying serialization and fixing queries with
|
||||
text-only types in response.
|
||||
|
||||
3. Data type conversion: Considering JSON supports fewer data types than
|
||||
Postgres, we perform conversions where possible, passing all other types as
|
||||
strings. Key conversions include:
|
||||
- postgres int2, int4, float4, float8 -> json number (NaN and Inf remain
|
||||
text)
|
||||
- postgres bool, null, text -> json bool, null, string
|
||||
- postgres array -> json array
|
||||
- postgres json and jsonb -> json object
|
||||
|
||||
4. Alignment with node-postgres: To facilitate integration with js libraries,
|
||||
we've matched the response structure of node-postgres, returning command tags
|
||||
and column oids. Command tag capturing was added to the rust-postgres
|
||||
functionality as part of this change.
|
||||
|
||||
## Using SNI-based routing on localhost
|
||||
|
||||
Now proxy determines project name from the subdomain, request to the `round-rice-566201.somedomain.tld` will be routed to the project named `round-rice-566201`. Unfortunately, `/etc/hosts` does not support domain wildcards, so I usually use `*.localtest.me` which resolves to `127.0.0.1`. Now we can create self-signed certificate and play with proxy:
|
||||
|
||||
@@ -139,6 +139,16 @@ async fn auth_quirks(
|
||||
}
|
||||
|
||||
impl BackendType<'_, ClientCredentials<'_>> {
|
||||
/// Get compute endpoint name from the credentials.
|
||||
pub fn get_endpoint(&self) -> Option<String> {
|
||||
use BackendType::*;
|
||||
|
||||
match self {
|
||||
Console(_, creds) => creds.project.clone(),
|
||||
Postgres(_, creds) => creds.project.clone(),
|
||||
Link(_) => Some("link".to_owned()),
|
||||
}
|
||||
}
|
||||
/// Authenticate the client via the requested backend, possibly using credentials.
|
||||
#[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)]
|
||||
pub async fn authenticate(
|
||||
|
||||
@@ -100,9 +100,10 @@ impl CertResolver {
|
||||
is_default: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
let priv_key = {
|
||||
let key_bytes = std::fs::read(key_path).context("TLS key file")?;
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
|
||||
let key_bytes = std::fs::read(key_path)
|
||||
.context(format!("Failed to read TLS keys at '{key_path}'"))?;
|
||||
let mut keys = rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..])
|
||||
.context(format!("Failed to parse TLS keys at '{key_path}'"))?;
|
||||
|
||||
ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
|
||||
keys.pop().map(rustls::PrivateKey).unwrap()
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
//! directly relying on deps like `reqwest` (think loose coupling).
|
||||
|
||||
pub mod server;
|
||||
pub mod sql_over_http;
|
||||
pub mod websocket;
|
||||
|
||||
pub use reqwest::{Request, Response, StatusCode};
|
||||
|
||||
603
proxy/src/http/sql_over_http.rs
Normal file
603
proxy/src/http/sql_over_http.rs
Normal file
@@ -0,0 +1,603 @@
|
||||
use futures::pin_mut;
|
||||
use futures::StreamExt;
|
||||
use hyper::body::HttpBody;
|
||||
use hyper::{Body, HeaderMap, Request};
|
||||
use pq_proto::StartupMessageParams;
|
||||
use serde_json::json;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value;
|
||||
use tokio_postgres::types::Kind;
|
||||
use tokio_postgres::types::Type;
|
||||
use tokio_postgres::Row;
|
||||
use url::Url;
|
||||
|
||||
use crate::{auth, config::ProxyConfig, console};
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct QueryData {
|
||||
query: String,
|
||||
params: Vec<serde_json::Value>,
|
||||
}
|
||||
|
||||
const APP_NAME: &str = "sql_over_http";
|
||||
const MAX_RESPONSE_SIZE: usize = 1024 * 1024; // 1 MB
|
||||
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
|
||||
|
||||
//
|
||||
// Convert json non-string types to strings, so that they can be passed to Postgres
|
||||
// as parameters.
|
||||
//
|
||||
fn json_to_pg_text(json: Vec<Value>) -> Result<Vec<String>, serde_json::Error> {
|
||||
json.iter()
|
||||
.map(|value| {
|
||||
match value {
|
||||
Value::Null => serde_json::to_string(value),
|
||||
Value::Bool(_) => serde_json::to_string(value),
|
||||
Value::Number(_) => serde_json::to_string(value),
|
||||
Value::Object(_) => serde_json::to_string(value),
|
||||
|
||||
// no need to escape
|
||||
Value::String(s) => Ok(s.to_string()),
|
||||
|
||||
// special care for arrays
|
||||
Value::Array(_) => json_array_to_pg_array(value),
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
//
|
||||
// Serialize a JSON array to a Postgres array. Contrary to the strings in the params
|
||||
// in the array we need to escape the strings. Postgres is okay with arrays of form
|
||||
// '{1,"2",3}'::int[], so we don't check that array holds values of the same type, leaving
|
||||
// it for Postgres to check.
|
||||
//
|
||||
// Example of the same escaping in node-postgres: packages/pg/lib/utils.js
|
||||
//
|
||||
fn json_array_to_pg_array(value: &Value) -> Result<String, serde_json::Error> {
|
||||
match value {
|
||||
// same
|
||||
Value::Null => serde_json::to_string(value),
|
||||
Value::Bool(_) => serde_json::to_string(value),
|
||||
Value::Number(_) => serde_json::to_string(value),
|
||||
Value::Object(_) => serde_json::to_string(value),
|
||||
|
||||
// now needs to be escaped, as it is part of the array
|
||||
Value::String(_) => serde_json::to_string(value),
|
||||
|
||||
// recurse into array
|
||||
Value::Array(arr) => {
|
||||
let vals = arr
|
||||
.iter()
|
||||
.map(json_array_to_pg_array)
|
||||
.collect::<Result<Vec<_>, _>>()?
|
||||
.join(",");
|
||||
Ok(format!("{{{}}}", vals))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_conn_info(
|
||||
headers: &HeaderMap,
|
||||
sni_hostname: Option<String>,
|
||||
) -> Result<(String, String, String, String), anyhow::Error> {
|
||||
let connection_string = headers
|
||||
.get("Neon-Connection-String")
|
||||
.ok_or(anyhow::anyhow!("missing connection string"))?
|
||||
.to_str()?;
|
||||
|
||||
let connection_url = Url::parse(connection_string)?;
|
||||
|
||||
let protocol = connection_url.scheme();
|
||||
if protocol != "postgres" && protocol != "postgresql" {
|
||||
return Err(anyhow::anyhow!(
|
||||
"connection string must start with postgres: or postgresql:"
|
||||
));
|
||||
}
|
||||
|
||||
let mut url_path = connection_url
|
||||
.path_segments()
|
||||
.ok_or(anyhow::anyhow!("missing database name"))?;
|
||||
|
||||
let dbname = url_path
|
||||
.next()
|
||||
.ok_or(anyhow::anyhow!("invalid database name"))?;
|
||||
|
||||
let username = connection_url.username();
|
||||
if username.is_empty() {
|
||||
return Err(anyhow::anyhow!("missing username"));
|
||||
}
|
||||
|
||||
let password = connection_url
|
||||
.password()
|
||||
.ok_or(anyhow::anyhow!("no password"))?;
|
||||
|
||||
// TLS certificate selector now based on SNI hostname, so if we are running here
|
||||
// we are sure that SNI hostname is set to one of the configured domain names.
|
||||
let sni_hostname = sni_hostname.ok_or(anyhow::anyhow!("no SNI hostname set"))?;
|
||||
|
||||
let hostname = connection_url
|
||||
.host_str()
|
||||
.ok_or(anyhow::anyhow!("no host"))?;
|
||||
|
||||
let host_header = headers
|
||||
.get("host")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.and_then(|h| h.split(':').next());
|
||||
|
||||
if hostname != sni_hostname {
|
||||
return Err(anyhow::anyhow!("mismatched SNI hostname and hostname"));
|
||||
} else if let Some(h) = host_header {
|
||||
if h != hostname {
|
||||
return Err(anyhow::anyhow!("mismatched host header and hostname"));
|
||||
}
|
||||
}
|
||||
|
||||
Ok((
|
||||
username.to_owned(),
|
||||
dbname.to_owned(),
|
||||
hostname.to_owned(),
|
||||
password.to_owned(),
|
||||
))
|
||||
}
|
||||
|
||||
// TODO: return different http error codes
|
||||
pub async fn handle(
|
||||
config: &'static ProxyConfig,
|
||||
request: Request<Body>,
|
||||
sni_hostname: Option<String>,
|
||||
) -> anyhow::Result<Value> {
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
let headers = request.headers();
|
||||
let (username, dbname, hostname, password) = get_conn_info(headers, sni_hostname)?;
|
||||
let credential_params = StartupMessageParams::new([
|
||||
("user", &username),
|
||||
("database", &dbname),
|
||||
("application_name", APP_NAME),
|
||||
]);
|
||||
|
||||
//
|
||||
// Wake up the destination if needed. Code here is a bit involved because
|
||||
// we reuse the code from the usual proxy and we need to prepare few structures
|
||||
// that this code expects.
|
||||
//
|
||||
let tls = config.tls_config.as_ref();
|
||||
let common_names = tls.and_then(|tls| tls.common_names.clone());
|
||||
let creds = config
|
||||
.auth_backend
|
||||
.as_ref()
|
||||
.map(|_| auth::ClientCredentials::parse(&credential_params, Some(&hostname), common_names))
|
||||
.transpose()?;
|
||||
let extra = console::ConsoleReqExtra {
|
||||
session_id: uuid::Uuid::new_v4(),
|
||||
application_name: Some(APP_NAME),
|
||||
};
|
||||
let node = creds.wake_compute(&extra).await?.expect("msg");
|
||||
let conf = node.value.config;
|
||||
let port = *conf.get_ports().first().expect("no port");
|
||||
let host = match conf.get_hosts().first().expect("no host") {
|
||||
tokio_postgres::config::Host::Tcp(host) => host,
|
||||
tokio_postgres::config::Host::Unix(_) => {
|
||||
return Err(anyhow::anyhow!("unix socket is not supported"));
|
||||
}
|
||||
};
|
||||
|
||||
let request_content_length = match request.body().size_hint().upper() {
|
||||
Some(v) => v,
|
||||
None => MAX_REQUEST_SIZE + 1,
|
||||
};
|
||||
|
||||
if request_content_length > MAX_REQUEST_SIZE {
|
||||
return Err(anyhow::anyhow!(
|
||||
"request is too large (max {MAX_REQUEST_SIZE} bytes)"
|
||||
));
|
||||
}
|
||||
|
||||
//
|
||||
// Read the query and query params from the request body
|
||||
//
|
||||
let body = hyper::body::to_bytes(request.into_body()).await?;
|
||||
let QueryData { query, params } = serde_json::from_slice(&body)?;
|
||||
let query_params = json_to_pg_text(params)?;
|
||||
|
||||
//
|
||||
// Connenct to the destination
|
||||
//
|
||||
let (client, connection) = tokio_postgres::Config::new()
|
||||
.host(host)
|
||||
.port(port)
|
||||
.user(&username)
|
||||
.password(&password)
|
||||
.dbname(&dbname)
|
||||
.max_backend_message_size(MAX_RESPONSE_SIZE)
|
||||
.connect(tokio_postgres::NoTls)
|
||||
.await?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = connection.await {
|
||||
eprintln!("connection error: {}", e);
|
||||
}
|
||||
});
|
||||
|
||||
//
|
||||
// Now execute the query and return the result
|
||||
//
|
||||
let row_stream = client.query_raw_txt(query, query_params).await?;
|
||||
|
||||
// Manually drain the stream into a vector to leave row_stream hanging
|
||||
// around to get a command tag. Also check that the response is not too
|
||||
// big.
|
||||
pin_mut!(row_stream);
|
||||
let mut rows: Vec<tokio_postgres::Row> = Vec::new();
|
||||
let mut curret_size = 0;
|
||||
while let Some(row) = row_stream.next().await {
|
||||
let row = row?;
|
||||
curret_size += row.body_len();
|
||||
rows.push(row);
|
||||
if curret_size > MAX_RESPONSE_SIZE {
|
||||
return Err(anyhow::anyhow!("response too large"));
|
||||
}
|
||||
}
|
||||
|
||||
// grab the command tag and number of rows affected
|
||||
let command_tag = row_stream.command_tag().unwrap_or_default();
|
||||
let mut command_tag_split = command_tag.split(' ');
|
||||
let command_tag_name = command_tag_split.next().unwrap_or_default();
|
||||
let command_tag_count = if command_tag_name == "INSERT" {
|
||||
// INSERT returns OID first and then number of rows
|
||||
command_tag_split.nth(1)
|
||||
} else {
|
||||
// other commands return number of rows (if any)
|
||||
command_tag_split.next()
|
||||
}
|
||||
.and_then(|s| s.parse::<i64>().ok());
|
||||
|
||||
let fields = if !rows.is_empty() {
|
||||
rows[0]
|
||||
.columns()
|
||||
.iter()
|
||||
.map(|c| {
|
||||
json!({
|
||||
"name": Value::String(c.name().to_owned()),
|
||||
"dataTypeID": Value::Number(c.type_().oid().into()),
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
// convert rows to JSON
|
||||
let rows = rows
|
||||
.iter()
|
||||
.map(pg_text_row_to_json)
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
|
||||
// resulting JSON format is based on the format of node-postgres result
|
||||
Ok(json!({
|
||||
"command": command_tag_name,
|
||||
"rowCount": command_tag_count,
|
||||
"rows": rows,
|
||||
"fields": fields,
|
||||
}))
|
||||
}
|
||||
|
||||
//
|
||||
// Convert postgres row with text-encoded values to JSON object
|
||||
//
|
||||
pub fn pg_text_row_to_json(row: &Row) -> Result<Value, anyhow::Error> {
|
||||
let res = row
|
||||
.columns()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, column)| {
|
||||
let name = column.name();
|
||||
let pg_value = row.as_text(i)?;
|
||||
let json_value = pg_text_to_json(pg_value, column.type_())?;
|
||||
Ok((name.to_string(), json_value))
|
||||
})
|
||||
.collect::<Result<Map<String, Value>, anyhow::Error>>()?;
|
||||
|
||||
Ok(Value::Object(res))
|
||||
}
|
||||
|
||||
//
|
||||
// Convert postgres text-encoded value to JSON value
|
||||
//
|
||||
pub fn pg_text_to_json(pg_value: Option<&str>, pg_type: &Type) -> Result<Value, anyhow::Error> {
|
||||
if let Some(val) = pg_value {
|
||||
if val == "NULL" {
|
||||
return Ok(Value::Null);
|
||||
}
|
||||
|
||||
if let Kind::Array(elem_type) = pg_type.kind() {
|
||||
return pg_array_parse(val, elem_type);
|
||||
}
|
||||
|
||||
match *pg_type {
|
||||
Type::BOOL => Ok(Value::Bool(val == "t")),
|
||||
Type::INT2 | Type::INT4 => {
|
||||
let val = val.parse::<i32>()?;
|
||||
Ok(Value::Number(serde_json::Number::from(val)))
|
||||
}
|
||||
Type::FLOAT4 | Type::FLOAT8 => {
|
||||
let fval = val.parse::<f64>()?;
|
||||
let num = serde_json::Number::from_f64(fval);
|
||||
if let Some(num) = num {
|
||||
Ok(Value::Number(num))
|
||||
} else {
|
||||
// Pass Nan, Inf, -Inf as strings
|
||||
// JS JSON.stringify() does converts them to null, but we
|
||||
// want to preserve them, so we pass them as strings
|
||||
Ok(Value::String(val.to_string()))
|
||||
}
|
||||
}
|
||||
Type::JSON | Type::JSONB => Ok(serde_json::from_str(val)?),
|
||||
_ => Ok(Value::String(val.to_string())),
|
||||
}
|
||||
} else {
|
||||
Ok(Value::Null)
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Parse postgres array into JSON array.
|
||||
//
|
||||
// This is a bit involved because we need to handle nested arrays and quoted
|
||||
// values. Unlike postgres we don't check that all nested arrays have the same
|
||||
// dimensions, we just return them as is.
|
||||
//
|
||||
fn pg_array_parse(pg_array: &str, elem_type: &Type) -> Result<Value, anyhow::Error> {
|
||||
_pg_array_parse(pg_array, elem_type, false).map(|(v, _)| v)
|
||||
}
|
||||
|
||||
fn _pg_array_parse(
|
||||
pg_array: &str,
|
||||
elem_type: &Type,
|
||||
nested: bool,
|
||||
) -> Result<(Value, usize), anyhow::Error> {
|
||||
let mut pg_array_chr = pg_array.char_indices();
|
||||
let mut level = 0;
|
||||
let mut quote = false;
|
||||
let mut entries: Vec<Value> = Vec::new();
|
||||
let mut entry = String::new();
|
||||
|
||||
// skip bounds decoration
|
||||
if let Some('[') = pg_array.chars().next() {
|
||||
for (_, c) in pg_array_chr.by_ref() {
|
||||
if c == '=' {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while let Some((mut i, mut c)) = pg_array_chr.next() {
|
||||
let mut escaped = false;
|
||||
|
||||
if c == '\\' {
|
||||
escaped = true;
|
||||
(i, c) = pg_array_chr.next().unwrap();
|
||||
}
|
||||
|
||||
match c {
|
||||
'{' if !quote => {
|
||||
level += 1;
|
||||
if level > 1 {
|
||||
let (res, off) = _pg_array_parse(&pg_array[i..], elem_type, true)?;
|
||||
entries.push(res);
|
||||
for _ in 0..off - 1 {
|
||||
pg_array_chr.next();
|
||||
}
|
||||
}
|
||||
}
|
||||
'}' => {
|
||||
level -= 1;
|
||||
if level == 0 {
|
||||
if !entry.is_empty() {
|
||||
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
|
||||
}
|
||||
if nested {
|
||||
return Ok((Value::Array(entries), i));
|
||||
}
|
||||
}
|
||||
}
|
||||
'"' if !escaped => {
|
||||
if quote {
|
||||
// push even if empty
|
||||
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
|
||||
entry = String::new();
|
||||
}
|
||||
quote = !quote;
|
||||
}
|
||||
',' if !quote => {
|
||||
if !entry.is_empty() {
|
||||
entries.push(pg_text_to_json(Some(&entry), elem_type)?);
|
||||
entry = String::new();
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
entry.push(c);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if level != 0 {
|
||||
return Err(anyhow::anyhow!("unbalanced array"));
|
||||
}
|
||||
|
||||
Ok((Value::Array(entries), 0))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn test_atomic_types_to_pg_params() {
|
||||
let json = vec![Value::Bool(true), Value::Bool(false)];
|
||||
let pg_params = json_to_pg_text(json).unwrap();
|
||||
assert_eq!(pg_params, vec!["true", "false"]);
|
||||
|
||||
let json = vec![Value::Number(serde_json::Number::from(42))];
|
||||
let pg_params = json_to_pg_text(json).unwrap();
|
||||
assert_eq!(pg_params, vec!["42"]);
|
||||
|
||||
let json = vec![Value::String("foo\"".to_string())];
|
||||
let pg_params = json_to_pg_text(json).unwrap();
|
||||
assert_eq!(pg_params, vec!["foo\""]);
|
||||
|
||||
let json = vec![Value::Null];
|
||||
let pg_params = json_to_pg_text(json).unwrap();
|
||||
assert_eq!(pg_params, vec!["null"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_array_to_pg_array() {
|
||||
// atoms and escaping
|
||||
let json = "[true, false, null, 42, \"foo\", \"bar\\\"-\\\\\"]";
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]).unwrap();
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec!["{true,false,null,42,\"foo\",\"bar\\\"-\\\\\"}"]
|
||||
);
|
||||
|
||||
// nested arrays
|
||||
let json = "[[true, false], [null, 42], [\"foo\", \"bar\\\"-\\\\\"]]";
|
||||
let json: Value = serde_json::from_str(json).unwrap();
|
||||
let pg_params = json_to_pg_text(vec![json]).unwrap();
|
||||
assert_eq!(
|
||||
pg_params,
|
||||
vec!["{{true,false},{null,42},{\"foo\",\"bar\\\"-\\\\\"}}"]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_atomic_types_parse() {
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("foo"), &Type::TEXT).unwrap(),
|
||||
json!("foo")
|
||||
);
|
||||
assert_eq!(pg_text_to_json(None, &Type::TEXT).unwrap(), json!(null));
|
||||
assert_eq!(pg_text_to_json(Some("42"), &Type::INT4).unwrap(), json!(42));
|
||||
assert_eq!(pg_text_to_json(Some("42"), &Type::INT2).unwrap(), json!(42));
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42"), &Type::INT8).unwrap(),
|
||||
json!("42")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42.42"), &Type::FLOAT8).unwrap(),
|
||||
json!(42.42)
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("42.42"), &Type::FLOAT4).unwrap(),
|
||||
json!(42.42)
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("NaN"), &Type::FLOAT4).unwrap(),
|
||||
json!("NaN")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("Infinity"), &Type::FLOAT4).unwrap(),
|
||||
json!("Infinity")
|
||||
);
|
||||
assert_eq!(
|
||||
pg_text_to_json(Some("-Infinity"), &Type::FLOAT4).unwrap(),
|
||||
json!("-Infinity")
|
||||
);
|
||||
|
||||
let json: Value =
|
||||
serde_json::from_str("{\"s\":\"str\",\"n\":42,\"f\":4.2,\"a\":[null,3,\"a\"]}")
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
pg_text_to_json(
|
||||
Some(r#"{"s":"str","n":42,"f":4.2,"a":[null,3,"a"]}"#),
|
||||
&Type::JSONB
|
||||
)
|
||||
.unwrap(),
|
||||
json
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_text() {
|
||||
fn pt(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::TEXT).unwrap()
|
||||
}
|
||||
assert_eq!(
|
||||
pt(r#"{"aa\"\\\,a",cha,"bbbb"}"#),
|
||||
json!(["aa\"\\,a", "cha", "bbbb"])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"foo","bar"},{"bee","bop"}}"#),
|
||||
json!([["foo", "bar"], ["bee", "bop"]])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{{{"foo",NULL,"bop",bup}}}}"#),
|
||||
json!([[[["foo", null, "bop", "bup"]]]])
|
||||
);
|
||||
assert_eq!(
|
||||
pt(r#"{{"1",2,3},{4,NULL,6},{NULL,NULL,NULL}}"#),
|
||||
json!([["1", "2", "3"], ["4", null, "6"], [null, null, null]])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_bool() {
|
||||
fn pb(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::BOOL).unwrap()
|
||||
}
|
||||
assert_eq!(pb(r#"{t,f,t}"#), json!([true, false, true]));
|
||||
assert_eq!(pb(r#"{{t,f,t}}"#), json!([[true, false, true]]));
|
||||
assert_eq!(
|
||||
pb(r#"{{t,f},{f,t}}"#),
|
||||
json!([[true, false], [false, true]])
|
||||
);
|
||||
assert_eq!(
|
||||
pb(r#"{{t,NULL},{NULL,f}}"#),
|
||||
json!([[true, null], [null, false]])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_parse_numbers() {
|
||||
fn pn(pg_arr: &str, ty: &Type) -> Value {
|
||||
pg_array_parse(pg_arr, ty).unwrap()
|
||||
}
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT4), json!([1, 2, 3]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT2), json!([1, 2, 3]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::INT8), json!(["1", "2", "3"]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT4), json!([1.0, 2.0, 3.0]));
|
||||
assert_eq!(pn(r#"{1,2,3}"#, &Type::FLOAT8), json!([1.0, 2.0, 3.0]));
|
||||
assert_eq!(
|
||||
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT4),
|
||||
json!([1.1, 2.2, 3.3])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{1.1,2.2,3.3}"#, &Type::FLOAT8),
|
||||
json!([1.1, 2.2, 3.3])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT4),
|
||||
json!(["NaN", "Infinity", "-Infinity"])
|
||||
);
|
||||
assert_eq!(
|
||||
pn(r#"{NaN,Infinity,-Infinity}"#, &Type::FLOAT8),
|
||||
json!(["NaN", "Infinity", "-Infinity"])
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_pg_array_with_decoration() {
|
||||
fn p(pg_arr: &str) -> Value {
|
||||
pg_array_parse(pg_arr, &Type::INT2).unwrap()
|
||||
}
|
||||
assert_eq!(
|
||||
p(r#"[1:1][-2:-1][3:5]={{{1,2,3},{4,5,6}}}"#),
|
||||
json!([[[1, 2, 3], [4, 5, 6]]])
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -4,12 +4,17 @@ use crate::{
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
use hyper::{
|
||||
server::{accept, conn::AddrIncoming},
|
||||
server::{
|
||||
accept,
|
||||
conn::{AddrIncoming, AddrStream},
|
||||
},
|
||||
upgrade::Upgraded,
|
||||
Body, Request, Response, StatusCode,
|
||||
Body, Method, Request, Response, StatusCode,
|
||||
};
|
||||
use hyper_tungstenite::{tungstenite::Message, HyperWebsocket, WebSocketStream};
|
||||
use pin_project_lite::pin_project;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use std::{
|
||||
convert::Infallible,
|
||||
future::ready,
|
||||
@@ -21,6 +26,7 @@ use tls_listener::TlsListener;
|
||||
use tokio::{
|
||||
io::{self, AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf},
|
||||
net::TcpListener,
|
||||
select,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, info_span, warn, Instrument};
|
||||
@@ -30,6 +36,8 @@ use utils::http::{error::ApiError, json::json_response};
|
||||
// Tracking issue: https://github.com/rust-lang/rust/issues/98407.
|
||||
use sync_wrapper::SyncWrapper;
|
||||
|
||||
use super::sql_over_http;
|
||||
|
||||
pin_project! {
|
||||
/// This is a wrapper around a [`WebSocketStream`] that
|
||||
/// implements [`AsyncRead`] and [`AsyncWrite`].
|
||||
@@ -159,6 +167,7 @@ async fn ws_handler(
|
||||
config: &'static ProxyConfig,
|
||||
cancel_map: Arc<CancelMap>,
|
||||
session_id: uuid::Uuid,
|
||||
sni_hostname: Option<String>,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let host = request
|
||||
.headers()
|
||||
@@ -181,8 +190,44 @@ async fn ws_handler(
|
||||
|
||||
// Return the response so the spawned future can continue.
|
||||
Ok(response)
|
||||
// TODO: that deserves a refactor as now this function also handles http json client besides websockets.
|
||||
// Right now I don't want to blow up sql-over-http patch with file renames and do that as a follow up instead.
|
||||
} else if request.uri().path() == "/sql" && request.method() == Method::POST {
|
||||
let result = select! {
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
|
||||
Err(anyhow::anyhow!("Query timed out"))
|
||||
}
|
||||
response = sql_over_http::handle(config, request, sni_hostname) => {
|
||||
response
|
||||
}
|
||||
};
|
||||
let status_code = match result {
|
||||
Ok(_) => StatusCode::OK,
|
||||
Err(_) => StatusCode::BAD_REQUEST,
|
||||
};
|
||||
let json = match result {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
let message = format!("{:?}", e);
|
||||
let code = match e.downcast_ref::<tokio_postgres::Error>() {
|
||||
Some(e) => match e.code() {
|
||||
Some(e) => serde_json::to_value(e.code()).unwrap(),
|
||||
None => Value::Null,
|
||||
},
|
||||
None => Value::Null,
|
||||
};
|
||||
json!({ "message": message, "code": code })
|
||||
}
|
||||
};
|
||||
json_response(status_code, json).map(|mut r| {
|
||||
r.headers_mut().insert(
|
||||
"Access-Control-Allow-Origin",
|
||||
hyper::http::HeaderValue::from_static("*"),
|
||||
);
|
||||
r
|
||||
})
|
||||
} else {
|
||||
json_response(StatusCode::OK, "Connect with a websocket client")
|
||||
json_response(StatusCode::BAD_REQUEST, "query is not supported")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,20 +261,27 @@ pub async fn task_main(
|
||||
}
|
||||
});
|
||||
|
||||
let make_svc = hyper::service::make_service_fn(|_stream| async move {
|
||||
Ok::<_, Infallible>(hyper::service::service_fn(
|
||||
move |req: Request<Body>| async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
ws_handler(req, config, cancel_map, session_id)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = format_args!("{session_id}")
|
||||
))
|
||||
.await
|
||||
},
|
||||
))
|
||||
});
|
||||
let make_svc =
|
||||
hyper::service::make_service_fn(|stream: &tokio_rustls::server::TlsStream<AddrStream>| {
|
||||
let sni_name = stream.get_ref().1.sni_hostname().map(|s| s.to_string());
|
||||
|
||||
async move {
|
||||
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
|
||||
ws_handler(req, config, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = format_args!("{session_id}")
|
||||
))
|
||||
.await
|
||||
}
|
||||
}))
|
||||
}
|
||||
});
|
||||
|
||||
hyper::Server::builder(accept::from_stream(tls_listener))
|
||||
.serve(make_svc)
|
||||
|
||||
@@ -455,6 +455,9 @@ impl<'a, S> Client<'a, S> {
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
/// Let the client authenticate and connect to the designated compute node.
|
||||
// Instrumentation logs endpoint name everywhere. Doesn't work for link
|
||||
// auth; strictly speaking we don't know endpoint name in its case.
|
||||
#[tracing::instrument(name = "", fields(ep = self.creds.get_endpoint().unwrap_or("".to_owned())), skip_all)]
|
||||
async fn connect_to_db(
|
||||
self,
|
||||
session: cancellation::Session<'_>,
|
||||
|
||||
@@ -10,7 +10,7 @@ pytest = "^6.2.5"
|
||||
psycopg2-binary = "^2.9.1"
|
||||
typing-extensions = "^4.1.0"
|
||||
PyJWT = {version = "^2.1.0", extras = ["crypto"]}
|
||||
requests = "^2.26.0"
|
||||
requests = "^2.31.0"
|
||||
pytest-xdist = "^3.0.2"
|
||||
asyncpg = "^0.27.0"
|
||||
aiopg = "^1.3.1"
|
||||
|
||||
@@ -2042,15 +2042,19 @@ class NeonProxy(PgProtocol):
|
||||
proxy_port: int,
|
||||
http_port: int,
|
||||
mgmt_port: int,
|
||||
external_http_port: int,
|
||||
auth_backend: NeonProxy.AuthBackend,
|
||||
metric_collection_endpoint: Optional[str] = None,
|
||||
metric_collection_interval: Optional[str] = None,
|
||||
):
|
||||
host = "127.0.0.1"
|
||||
super().__init__(dsn=auth_backend.default_conn_url, host=host, port=proxy_port)
|
||||
domain = "proxy.localtest.me" # resolves to 127.0.0.1
|
||||
super().__init__(dsn=auth_backend.default_conn_url, host=domain, port=proxy_port)
|
||||
|
||||
self.domain = domain
|
||||
self.host = host
|
||||
self.http_port = http_port
|
||||
self.external_http_port = external_http_port
|
||||
self.neon_binpath = neon_binpath
|
||||
self.test_output_dir = test_output_dir
|
||||
self.proxy_port = proxy_port
|
||||
@@ -2062,11 +2066,42 @@ class NeonProxy(PgProtocol):
|
||||
|
||||
def start(self) -> NeonProxy:
|
||||
assert self._popen is None
|
||||
|
||||
# generate key of it doesn't exist
|
||||
crt_path = self.test_output_dir / "proxy.crt"
|
||||
key_path = self.test_output_dir / "proxy.key"
|
||||
|
||||
if not key_path.exists():
|
||||
r = subprocess.run(
|
||||
[
|
||||
"openssl",
|
||||
"req",
|
||||
"-new",
|
||||
"-x509",
|
||||
"-days",
|
||||
"365",
|
||||
"-nodes",
|
||||
"-text",
|
||||
"-out",
|
||||
str(crt_path),
|
||||
"-keyout",
|
||||
str(key_path),
|
||||
"-subj",
|
||||
"/CN=*.localtest.me",
|
||||
"-addext",
|
||||
"subjectAltName = DNS:*.localtest.me",
|
||||
]
|
||||
)
|
||||
assert r.returncode == 0
|
||||
|
||||
args = [
|
||||
str(self.neon_binpath / "proxy"),
|
||||
*["--http", f"{self.host}:{self.http_port}"],
|
||||
*["--proxy", f"{self.host}:{self.proxy_port}"],
|
||||
*["--mgmt", f"{self.host}:{self.mgmt_port}"],
|
||||
*["--wss", f"{self.host}:{self.external_http_port}"],
|
||||
*["-c", str(crt_path)],
|
||||
*["-k", str(key_path)],
|
||||
*self.auth_backend.extra_args(),
|
||||
]
|
||||
|
||||
@@ -2190,6 +2225,7 @@ def link_proxy(
|
||||
http_port = port_distributor.get_port()
|
||||
proxy_port = port_distributor.get_port()
|
||||
mgmt_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
|
||||
with NeonProxy(
|
||||
neon_binpath=neon_binpath,
|
||||
@@ -2197,6 +2233,7 @@ def link_proxy(
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
external_http_port=external_http_port,
|
||||
auth_backend=NeonProxy.Link(),
|
||||
) as proxy:
|
||||
proxy.start()
|
||||
@@ -2224,6 +2261,7 @@ def static_proxy(
|
||||
proxy_port = port_distributor.get_port()
|
||||
mgmt_port = port_distributor.get_port()
|
||||
http_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
|
||||
with NeonProxy(
|
||||
neon_binpath=neon_binpath,
|
||||
@@ -2231,6 +2269,7 @@ def static_proxy(
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
external_http_port=external_http_port,
|
||||
auth_backend=NeonProxy.Postgres(auth_endpoint),
|
||||
) as proxy:
|
||||
proxy.start()
|
||||
|
||||
76
test_runner/performance/test_gc_feedback.py
Normal file
76
test_runner/performance/test_gc_feedback.py
Normal file
@@ -0,0 +1,76 @@
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
|
||||
|
||||
@pytest.mark.timeout(10000)
|
||||
def test_gc_feedback(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenchmarker):
|
||||
"""
|
||||
Test that GC is able to collect all old layers even if them are forming
|
||||
"stairs" and there are not three delta layers since last image layer.
|
||||
|
||||
Information about image layers needed to collect old layers should
|
||||
be propagated by GC to compaction task which should take in in account
|
||||
when make a decision which new image layers needs to be created.
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_id, _ = env.neon_cli.create_tenant(
|
||||
conf={
|
||||
# disable default GC and compaction
|
||||
"gc_period": "1000 m",
|
||||
"compaction_period": "0 s",
|
||||
"gc_horizon": f"{1024 ** 2}",
|
||||
"checkpoint_distance": f"{1024 ** 2}",
|
||||
"compaction_target_size": f"{1024 ** 2}",
|
||||
# set PITR interval to be small, so we can do GC
|
||||
"pitr_interval": "10 s",
|
||||
# "compaction_threshold": "3",
|
||||
# "image_creation_threshold": "2",
|
||||
}
|
||||
)
|
||||
endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
|
||||
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
|
||||
n_steps = 10
|
||||
n_update_iters = 100
|
||||
step_size = 10000
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("SET statement_timeout='1000s'")
|
||||
cur.execute(
|
||||
"CREATE TABLE t(step bigint, count bigint default 0, payload text default repeat(' ', 100)) with (fillfactor=50)"
|
||||
)
|
||||
cur.execute("CREATE INDEX ON t(step)")
|
||||
# In each step, we insert 'step_size' new rows, and update the newly inserted rows
|
||||
# 'n_update_iters' times. This creates a lot of churn and generates lots of WAL at the end of the table,
|
||||
# without modifying the earlier parts of the table.
|
||||
for step in range(n_steps):
|
||||
cur.execute(f"INSERT INTO t (step) SELECT {step} FROM generate_series(1, {step_size})")
|
||||
for i in range(n_update_iters):
|
||||
cur.execute(f"UPDATE t set count=count+1 where step = {step}")
|
||||
cur.execute("vacuum t")
|
||||
|
||||
# cur.execute("select pg_table_size('t')")
|
||||
# logical_size = cur.fetchone()[0]
|
||||
logical_size = client.timeline_detail(tenant_id, timeline_id)["current_logical_size"]
|
||||
log.info(f"Logical storage size {logical_size}")
|
||||
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
# Do compaction and GC
|
||||
client.timeline_gc(tenant_id, timeline_id, 0)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
# One more iteration to check that no excessive image layers are generated
|
||||
client.timeline_gc(tenant_id, timeline_id, 0)
|
||||
client.timeline_compact(tenant_id, timeline_id)
|
||||
|
||||
physical_size = client.timeline_detail(tenant_id, timeline_id)["current_physical_size"]
|
||||
log.info(f"Physical storage size {physical_size}")
|
||||
|
||||
MB = 1024 * 1024
|
||||
zenbenchmark.record("logical_size", logical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
|
||||
zenbenchmark.record("physical_size", physical_size // MB, "Mb", MetricReport.LOWER_IS_BETTER)
|
||||
zenbenchmark.record(
|
||||
"physical/logical ratio", physical_size / logical_size, "", MetricReport.LOWER_IS_BETTER
|
||||
)
|
||||
@@ -204,6 +204,7 @@ def proxy_with_metric_collector(
|
||||
http_port = port_distributor.get_port()
|
||||
proxy_port = port_distributor.get_port()
|
||||
mgmt_port = port_distributor.get_port()
|
||||
external_http_port = port_distributor.get_port()
|
||||
|
||||
(host, port) = httpserver_listen_address
|
||||
metric_collection_endpoint = f"http://{host}:{port}/billing/api/v1/usage_events"
|
||||
@@ -215,6 +216,7 @@ def proxy_with_metric_collector(
|
||||
proxy_port=proxy_port,
|
||||
http_port=http_port,
|
||||
mgmt_port=mgmt_port,
|
||||
external_http_port=external_http_port,
|
||||
metric_collection_endpoint=metric_collection_endpoint,
|
||||
metric_collection_interval=metric_collection_interval,
|
||||
auth_backend=NeonProxy.Link(),
|
||||
|
||||
@@ -1,22 +1,32 @@
|
||||
import json
|
||||
import subprocess
|
||||
from typing import Any, List
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.neon_fixtures import PSQL, NeonProxy, VanillaPostgres
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_proxy_select_1(static_proxy: NeonProxy, option_name: str):
|
||||
def test_proxy_select_1(static_proxy: NeonProxy):
|
||||
"""
|
||||
A simplest smoke test: check proxy against a local postgres instance.
|
||||
"""
|
||||
|
||||
out = static_proxy.safe_psql("select 1", options=f"{option_name}=generic-project-name")
|
||||
# no SNI, deprecated `options=project` syntax (before we had several endpoint in project)
|
||||
out = static_proxy.safe_psql("select 1", sslsni=0, options="project=generic-project-name")
|
||||
assert out[0][0] == 1
|
||||
|
||||
# no SNI, new `options=endpoint` syntax
|
||||
out = static_proxy.safe_psql("select 1", sslsni=0, options="endpoint=generic-project-name")
|
||||
assert out[0][0] == 1
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_password_hack(static_proxy: NeonProxy, option_name: str):
|
||||
# with SNI
|
||||
out = static_proxy.safe_psql("select 42", host="generic-project-name.localtest.me")
|
||||
assert out[0][0] == 42
|
||||
|
||||
|
||||
def test_password_hack(static_proxy: NeonProxy):
|
||||
"""
|
||||
Check the PasswordHack auth flow: an alternative to SCRAM auth for
|
||||
clients which can't provide the project/endpoint name via SNI or `options`.
|
||||
@@ -24,14 +34,16 @@ def test_password_hack(static_proxy: NeonProxy, option_name: str):
|
||||
|
||||
user = "borat"
|
||||
password = "password"
|
||||
static_proxy.safe_psql(
|
||||
f"create role {user} with login password '{password}'",
|
||||
options=f"{option_name}=irrelevant",
|
||||
)
|
||||
static_proxy.safe_psql(f"create role {user} with login password '{password}'")
|
||||
|
||||
# Note the format of `magic`!
|
||||
magic = f"{option_name}=irrelevant;{password}"
|
||||
static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
|
||||
magic = f"project=irrelevant;{password}"
|
||||
out = static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
|
||||
assert out[0][0] == 1
|
||||
|
||||
magic = f"endpoint=irrelevant;{password}"
|
||||
out = static_proxy.safe_psql("select 1", sslsni=0, user=user, password=magic)
|
||||
assert out[0][0] == 1
|
||||
|
||||
# Must also check that invalid magic won't be accepted.
|
||||
with pytest.raises(psycopg2.OperationalError):
|
||||
@@ -69,52 +81,55 @@ def test_proxy_options(static_proxy: NeonProxy, option_name: str):
|
||||
"""
|
||||
|
||||
options = f"{option_name}=irrelevant -cproxytest.option=value"
|
||||
out = static_proxy.safe_psql("show proxytest.option", options=options)
|
||||
out = static_proxy.safe_psql("show proxytest.option", options=options, sslsni=0)
|
||||
assert out[0][0] == "value"
|
||||
|
||||
options = f"-c proxytest.foo=\\ str {option_name}=irrelevant"
|
||||
out = static_proxy.safe_psql("show proxytest.foo", options=options, sslsni=0)
|
||||
assert out[0][0] == " str"
|
||||
|
||||
options = "-cproxytest.option=value"
|
||||
out = static_proxy.safe_psql("show proxytest.option", options=options)
|
||||
assert out[0][0] == "value"
|
||||
|
||||
options = "-c proxytest.foo=\\ str"
|
||||
out = static_proxy.safe_psql("show proxytest.foo", options=options)
|
||||
assert out[0][0] == " str"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_auth_errors(static_proxy: NeonProxy, option_name: str):
|
||||
def test_auth_errors(static_proxy: NeonProxy):
|
||||
"""
|
||||
Check that we throw very specific errors in some unsuccessful auth scenarios.
|
||||
"""
|
||||
|
||||
# User does not exist
|
||||
with pytest.raises(psycopg2.Error) as exprinfo:
|
||||
static_proxy.connect(user="pinocchio", options=f"{option_name}=irrelevant")
|
||||
static_proxy.connect(user="pinocchio")
|
||||
text = str(exprinfo.value).strip()
|
||||
assert text.endswith("password authentication failed for user 'pinocchio'")
|
||||
assert text.find("password authentication failed for user 'pinocchio'") != -1
|
||||
|
||||
static_proxy.safe_psql(
|
||||
"create role pinocchio with login password 'magic'",
|
||||
options=f"{option_name}=irrelevant",
|
||||
)
|
||||
|
||||
# User exists, but password is missing
|
||||
with pytest.raises(psycopg2.Error) as exprinfo:
|
||||
static_proxy.connect(user="pinocchio", password=None, options=f"{option_name}=irrelevant")
|
||||
static_proxy.connect(user="pinocchio", password=None)
|
||||
text = str(exprinfo.value).strip()
|
||||
assert text.endswith("password authentication failed for user 'pinocchio'")
|
||||
assert text.find("password authentication failed for user 'pinocchio'") != -1
|
||||
|
||||
# User exists, but password is wrong
|
||||
with pytest.raises(psycopg2.Error) as exprinfo:
|
||||
static_proxy.connect(user="pinocchio", password="bad", options=f"{option_name}=irrelevant")
|
||||
static_proxy.connect(user="pinocchio", password="bad")
|
||||
text = str(exprinfo.value).strip()
|
||||
assert text.endswith("password authentication failed for user 'pinocchio'")
|
||||
assert text.find("password authentication failed for user 'pinocchio'") != -1
|
||||
|
||||
# Finally, check that the user can connect
|
||||
with static_proxy.connect(
|
||||
user="pinocchio", password="magic", options=f"{option_name}=irrelevant"
|
||||
):
|
||||
with static_proxy.connect(user="pinocchio", password="magic"):
|
||||
pass
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
|
||||
def test_forward_params_to_client(static_proxy: NeonProxy):
|
||||
"""
|
||||
Check that we forward all necessary PostgreSQL server params to client.
|
||||
"""
|
||||
@@ -140,7 +155,7 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
|
||||
where name = any(%s)
|
||||
"""
|
||||
|
||||
with static_proxy.connect(options=f"{option_name}=irrelevant") as conn:
|
||||
with static_proxy.connect() as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute(query, (reported_params_subset,))
|
||||
for name, value in cur.fetchall():
|
||||
@@ -148,18 +163,65 @@ def test_forward_params_to_client(static_proxy: NeonProxy, option_name: str):
|
||||
assert conn.get_parameter_status(name) == value
|
||||
|
||||
|
||||
@pytest.mark.parametrize("option_name", ["project", "endpoint"])
|
||||
@pytest.mark.timeout(5)
|
||||
def test_close_on_connections_exit(static_proxy: NeonProxy, option_name: str):
|
||||
def test_close_on_connections_exit(static_proxy: NeonProxy):
|
||||
# Open two connections, send SIGTERM, then ensure that proxy doesn't exit
|
||||
# until after connections close.
|
||||
with static_proxy.connect(options=f"{option_name}=irrelevant"), static_proxy.connect(
|
||||
options=f"{option_name}=irrelevant"
|
||||
):
|
||||
with static_proxy.connect(), static_proxy.connect():
|
||||
static_proxy.terminate()
|
||||
with pytest.raises(subprocess.TimeoutExpired):
|
||||
static_proxy.wait_for_exit(timeout=2)
|
||||
# Ensure we don't accept any more connections
|
||||
with pytest.raises(psycopg2.OperationalError):
|
||||
static_proxy.connect(options=f"{option_name}=irrelevant")
|
||||
static_proxy.connect()
|
||||
static_proxy.wait_for_exit()
|
||||
|
||||
|
||||
def test_sql_over_http(static_proxy: NeonProxy):
|
||||
static_proxy.safe_psql("create role http with login password 'http' superuser")
|
||||
|
||||
def q(sql: str, params: List[Any] = []) -> Any:
|
||||
connstr = f"postgresql://http:http@{static_proxy.domain}:{static_proxy.proxy_port}/postgres"
|
||||
response = requests.post(
|
||||
f"https://{static_proxy.domain}:{static_proxy.external_http_port}/sql",
|
||||
data=json.dumps({"query": sql, "params": params}),
|
||||
headers={"Content-Type": "application/sql", "Neon-Connection-String": connstr},
|
||||
verify=str(static_proxy.test_output_dir / "proxy.crt"),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
return response.json()
|
||||
|
||||
rows = q("select 42 as answer")["rows"]
|
||||
assert rows == [{"answer": 42}]
|
||||
|
||||
rows = q("select $1 as answer", [42])["rows"]
|
||||
assert rows == [{"answer": "42"}]
|
||||
|
||||
rows = q("select $1 * 1 as answer", [42])["rows"]
|
||||
assert rows == [{"answer": 42}]
|
||||
|
||||
rows = q("select $1::int[] as answer", [[1, 2, 3]])["rows"]
|
||||
assert rows == [{"answer": [1, 2, 3]}]
|
||||
|
||||
rows = q("select $1::json->'a' as answer", [{"a": {"b": 42}}])["rows"]
|
||||
assert rows == [{"answer": {"b": 42}}]
|
||||
|
||||
rows = q("select * from pg_class limit 1")["rows"]
|
||||
assert len(rows) == 1
|
||||
|
||||
res = q("create table t(id serial primary key, val int)")
|
||||
assert res["command"] == "CREATE"
|
||||
assert res["rowCount"] is None
|
||||
|
||||
res = q("insert into t(val) values (10), (20), (30) returning id")
|
||||
assert res["command"] == "INSERT"
|
||||
assert res["rowCount"] == 3
|
||||
assert res["rows"] == [{"id": 1}, {"id": 2}, {"id": 3}]
|
||||
|
||||
res = q("select * from t")
|
||||
assert res["command"] == "SELECT"
|
||||
assert res["rowCount"] == 3
|
||||
|
||||
res = q("drop table t")
|
||||
assert res["command"] == "DROP"
|
||||
assert res["rowCount"] is None
|
||||
|
||||
@@ -151,6 +151,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
|
||||
"eviction_policy": json.dumps(
|
||||
{"kind": "LayerAccessThreshold", "period": "80s", "threshold": "42h"}
|
||||
),
|
||||
"max_lsn_wal_lag": "13000000",
|
||||
}
|
||||
env.neon_cli.config_tenant(
|
||||
tenant_id=tenant,
|
||||
@@ -206,6 +207,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
|
||||
assert updated_effective_config["gc_horizon"] == 67108864
|
||||
assert updated_effective_config["image_creation_threshold"] == 2
|
||||
assert updated_effective_config["pitr_interval"] == "7days"
|
||||
assert updated_effective_config["max_lsn_wal_lag"] == 13000000
|
||||
|
||||
# restart the pageserver and ensure that the config is still correct
|
||||
env.pageserver.stop()
|
||||
@@ -265,6 +267,7 @@ eviction_policy = { "kind" = "LayerAccessThreshold", period = "20s", threshold =
|
||||
"period": "20s",
|
||||
"threshold": "23h",
|
||||
}
|
||||
assert final_effective_config["max_lsn_wal_lag"] == 10 * 1024 * 1024
|
||||
|
||||
# restart the pageserver and ensure that the config is still correct
|
||||
env.pageserver.stop()
|
||||
|
||||
Reference in New Issue
Block a user