Compare commits

..

13 Commits

Author SHA1 Message Date
fcdm
6de05cdafe foo 2024-02-16 16:27:16 +00:00
fcdm
c8f3bf8444 fin layer 2024-02-16 15:31:28 +00:00
fcdm
0c62fc8244 no extensions 2024-02-16 15:09:24 +00:00
fcdm
f0aac32517 initial plv8 2024-02-15 17:39:48 +00:00
fcdm
bbb0172ac8 postgis fix 2024-02-15 16:47:12 +00:00
fcdm
00f8954b91 postgis attempt 2024-02-15 16:32:26 +00:00
fcdm
25e32ede8c redo 2024-02-15 15:38:36 +00:00
fcdm
fe91482377 cleanup 2024-02-15 15:26:35 +00:00
fcdm
2f0ab4bf0a more comment 2024-02-14 16:44:34 +00:00
fcdm
5fc57e0ab2 set prefix 2024-02-14 16:42:20 +00:00
fcdm
e50d964688 for loop 2024-02-13 20:56:05 +00:00
fcdm
faf97f6c0a comment extensions 2024-02-13 15:59:46 +00:00
fcdm
9320511746 wip 2024-02-13 12:02:28 +00:00
41 changed files with 375 additions and 724 deletions

7
Cargo.lock generated
View File

@@ -2263,11 +2263,11 @@ dependencies = [
[[package]]
name = "hashlink"
version = "0.8.4"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
checksum = "0761a1b9491c4f2e3d66aa0f62d0fba0af9a0e2852e4d48ea506632a4b56e6aa"
dependencies = [
"hashbrown 0.14.0",
"hashbrown 0.13.2",
]
[[package]]
@@ -3952,7 +3952,6 @@ dependencies = [
"pin-project-lite",
"postgres-protocol",
"rand 0.8.5",
"serde",
"thiserror",
"tokio",
"tracing",

View File

@@ -81,7 +81,7 @@ futures-core = "0.3"
futures-util = "0.3"
git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.4"
hashlink = "0.8.1"
hdrhistogram = "7.5.2"
hex = "0.4"
hex-literal = "0.4"

View File

@@ -0,0 +1,179 @@
ARG REPOSITORY=neondatabase
ARG IMAGE=build-tools
ARG TAG=pinned
ARG BUILD_TAG
#########################################################################################
#
# Layer "build-deps"
#
#########################################################################################
FROM debian:bullseye-slim AS build-deps
RUN apt update && \
apt install -y git autoconf automake libtool build-essential bison flex libreadline-dev \
zlib1g-dev libxml2-dev libcurl4-openssl-dev libossp-uuid-dev wget pkg-config libssl-dev \
libicu-dev libxslt1-dev liblz4-dev libzstd-dev zstd
#########################################################################################
#
# Layer "pg-build"
# Build Postgres from the neon postgres repository.
#
#########################################################################################
FROM build-deps AS pg-build
COPY "vendor/postgres-v14" /postgres-v14
COPY "vendor/postgres-v15" /postgres-v15
COPY "vendor/postgres-v16" /postgres-v16
RUN for pg_version in v14 v15 v16; do \
install_dir="/postgres-$pg_version"; \
cd "$install_dir"; \
prefix="/usr/local/pgsql-${pg_version}"; \
export CONFIGURE_CMD="./configure --prefix ${prefix} CFLAGS='-O2 -g3' --enable-debug --with-openssl --with-uuid=ossp \
--with-icu --with-libxml --with-libxslt --with-lz4" && \
if [ "${pg_version}" != "v14" ]; then \
# zstd is available only from PG15
export CONFIGURE_CMD="${CONFIGURE_CMD} --with-zstd"; \
fi && \
eval $CONFIGURE_CMD && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C contrib/ install && \
# Install headers
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
extension_dir="${prefix}/share/extension" && \
# Enable some of contrib extensions
echo 'trusted = true' >> $extension_dir/autoinc.control && \
echo 'trusted = true' >> $extension_dir/bloom.control && \
echo 'trusted = true' >> $extension_dir/earthdistance.control && \
echo 'trusted = true' >> $extension_dir/insert_username.control && \
echo 'trusted = true' >> $extension_dir/intagg.control && \
echo 'trusted = true' >> $extension_dir/moddatetime.control && \
echo 'trusted = true' >> $extension_dir/pg_stat_statements.control && \
echo 'trusted = true' >> $extension_dir/pgrowlocks.control && \
echo 'trusted = true' >> $extension_dir/pgstattuple.control && \
echo 'trusted = true' >> $extension_dir/refint.control && \
echo 'trusted = true' >> $extension_dir/xml2.control && \
# We need to grant EXECUTE on pg_stat_statements_reset() to neon_superuser.
# In vanilla postgres this function is limited to Postgres role superuser.
# In neon we have neon_superuser role that is not a superuser but replaces superuser in some cases.
# We could add the additional grant statements to the postgres repository but it would be hard to maintain,
# whenever we need to pick up a new postgres version and we want to limit the changes in our postgres fork,
# so we do it here.
old_list="pg_stat_statements--1.0--1.1.sql pg_stat_statements--1.1--1.2.sql pg_stat_statements--1.2--1.3.sql pg_stat_statements--1.3--1.4.sql pg_stat_statements--1.4--1.5.sql pg_stat_statements--1.4.sql pg_stat_statements--1.5--1.6.sql"; \
# the first loop is for pg_stat_statement extension version <= 1.6
for file in $prefix/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
if echo "$old_list" | grep -q -F "$filename"; then \
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset() TO neon_superuser;' >> $file; \
fi; \
done; \
# the second loop is for pg_stat_statement extension versions >= 1.7,
# where pg_stat_statement_reset() got 3 additional arguments
for file in $prefix/share/extension/pg_stat_statements--*.sql; do \
filename=$(basename "$file"); \
if ! echo "$old_list" | grep -q -F "$filename"; then \
echo 'GRANT EXECUTE ON FUNCTION pg_stat_statements_reset(Oid, Oid, bigint) TO neon_superuser;' >> $file; \
fi; \
done; \
# Go back to root dir from `/postgres-v<version>` dir
cd ..; \
done
#########################################################################################
#
# Compile and run the Neon-specific `compute_ctl` binary
#
#########################################################################################
FROM $REPOSITORY/$IMAGE:$TAG AS compute-tools
ARG BUILD_TAG
ENV BUILD_TAG=$BUILD_TAG
USER nonroot
# Copy entire project to get Cargo.* files with proper dependencies for the whole project
COPY --chown=nonroot . .
RUN cd compute_tools && cargo build --locked --profile release-line-debug-size-lto
#########################################################################################
#
# Clean up postgres folder before inclusion
#
#########################################################################################
FROM pg-build AS postgres-cleanup-layer
# COPY --from=neon-pg-ext-build /usr/local/pgsql /usr/local/pgsql
RUN for pg_version in v14 v15 v16; do \
prefix="/usr/local/pgsql-${pg_version}"; \
# Remove binaries from /bin/ that we won't use (or would manually copy & install otherwise)
cd "${prefix}/bin" && rm ecpg raster2pgsql shp2pgsql pgtopo_export pgtopo_import pgsql2shp; \
cd ..; \
# Remove headers that we won't need anymore - we've completed installation of all extensions
rm -r "${prefix}/include"; \
# Remove static postgresql libraries - all compilation is finished, so we
# can now remove these files - they must be included in other binaries by now
# if they were to be used by other libraries.
rm ${prefix}/lib/lib*.a; \
done
#########################################################################################
#
# Final layer
# Put it all together into the final image
#
#########################################################################################
FROM debian:bullseye-slim
# Add user postgres
RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
echo "postgres:test_console_pass" | chpasswd && \
mkdir /var/db/postgres/compute && mkdir /var/db/postgres/specs && \
mkdir /var/db/postgres/pgbouncer && \
chown -R postgres:postgres /var/db/postgres && \
chmod 0750 /var/db/postgres/compute && \
chmod 0750 /var/db/postgres/pgbouncer && \
echo '/usr/local/lib' >> /etc/ld.so.conf && /sbin/ldconfig && \
# create folder for file cache
mkdir -p -m 777 /neon/cache
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql-v14 /usr/local/pgsql-v14
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql-v15 /usr/local/pgsql-v15
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql-v16 /usr/local/pgsql-v16
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)
# liblz4-1 for lz4
# libossp-uuid16 for extension ossp-uuid
# libgeos, libgdal, libsfcgal1, libproj and libprotobuf-c1 for PostGIS
# libxml2, libxslt1.1 for xml2
# libzstd1 for zstd
# libboost* for rdkit
# ca-certificates for communicating with s3 by compute_ctl
RUN apt update && \
apt install --no-install-recommends -y \
gdb \
libicu67 \
liblz4-1 \
libreadline8 \
libboost-iostreams1.74.0 \
libboost-regex1.74.0 \
libboost-serialization1.74.0 \
libboost-system1.74.0 \
libossp-uuid16 \
libgeos-c1v5 \
libgdal28 \
libproj19 \
libprotobuf-c1 \
libsfcgal1 \
libxml2 \
libxslt1.1 \
libzstd1 \
libcurl4-openssl-dev \
locales \
procps \
ca-certificates && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
localedef -i en_US -c -f UTF-8 -A /usr/share/locale/locale.alias en_US.UTF-8
ENV LANG en_US.utf8
USER postgres
ENTRYPOINT ["/usr/local/bin/compute_ctl"]

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::BufRead;
use std::os::unix::fs::{symlink, PermissionsExt};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
@@ -634,48 +634,6 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
// /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
//
// Why on earth don't we just stick to the 'posix' default, you might
// ask. It turns out that making large allocations with 'posix' doesn't
// work very well with autoscaling. The behavior we want is that:
//
// 1. You can make large DSM allocations, larger than the current RAM
// size of the VM, without errors
//
// 2. If the allocated memory is really used, the VM is scaled up
// automatically to accommodate that
//
// We try to make that possible by having swap in the VM. But with the
// default 'posix' DSM implementation, we fail step 1, even when there's
// plenty of swap available. PostgreSQL uses posix_fallocate() to create
// the shmem segment, which is really just a file in /dev/shm in Linux,
// but posix_fallocate() on tmpfs returns ENOMEM if the size is larger
// than available RAM.
//
// Using 'dynamic_shared_memory_type = mmap' works around that, because
// the Postgres 'mmap' DSM implementation doesn't use
// posix_fallocate(). Instead, it uses repeated calls to write(2) to
// fill the file with zeros. It's weird that that differs between
// 'posix' and 'mmap', but we take advantage of it. When the file is
// filled slowly with write(2), the kernel allows it to grow larger, as
// long as there's swap available.
//
// In short, using 'dynamic_shared_memory_type = mmap' allows us one DSM
// segment to be larger than currently available RAM. But because we
// don't want to store it on a real file, which the kernel would try to
// flush to disk, so symlink pg_dynshm to /dev/shm.
//
// We don't set 'dynamic_shared_memory_type = mmap' here, we let the
// control plane control that option. If 'mmap' is not used, this
// symlink doesn't affect anything.
//
// See https://github.com/neondatabase/autoscaling/issues/800
std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Replica | ComputeMode::Static(..) => {

View File

@@ -494,8 +494,6 @@ pub struct TimelineInfo {
pub current_logical_size: u64,
pub current_logical_size_is_accurate: bool,
pub directory_entries_counts: Vec<u64>,
/// Sum of the size of all layer files.
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded

View File

@@ -124,7 +124,6 @@ impl RelTag {
Ord,
strum_macros::EnumIter,
strum_macros::FromRepr,
enum_map::Enum,
)]
#[repr(u8)]
pub enum SlruKind {

View File

@@ -13,6 +13,5 @@ rand.workspace = true
tokio.workspace = true
tracing.workspace = true
thiserror.workspace = true
serde.workspace = true
workspace_hack.workspace = true

View File

@@ -7,7 +7,6 @@ pub mod framed;
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, collections::HashMap, fmt, io, str};
// re-export for use in utils pageserver_feedback.rs
@@ -124,7 +123,7 @@ impl StartupMessageParams {
}
}
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
pub struct CancelKeyData {
pub backend_pid: i32,
pub cancel_key: i32,

View File

@@ -234,7 +234,7 @@ impl DeletionHeader {
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
let header_path = conf.deletion_header_path();
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
.await
.maybe_fatal_err("save deletion header")?;
@@ -325,7 +325,7 @@ impl DeletionList {
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
.await
.maybe_fatal_err("save deletion list")
.map_err(Into::into)

View File

@@ -422,7 +422,6 @@ async fn build_timeline_info_common(
tenant::timeline::logical_size::Accuracy::Approximate => false,
tenant::timeline::logical_size::Accuracy::Exact => true,
},
directory_entries_counts: timeline.get_directory_metrics().to_vec(),
current_physical_size,
current_logical_size_non_incremental: None,
timeline_dir_layer_file_size_sum: None,

View File

@@ -602,15 +602,6 @@ pub(crate) mod initial_logical_size {
});
}
static DIRECTORY_ENTRIES_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_directory_entries_count",
"Sum of the entries in pageserver-stored directory listings",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});
pub(crate) static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_tenant_states_count",
@@ -1818,7 +1809,6 @@ pub(crate) struct TimelineMetrics {
resident_physical_size_gauge: UIntGauge,
/// copy of LayeredTimeline.current_logical_size
pub current_logical_size_gauge: UIntGauge,
pub directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>>,
pub num_persistent_files_created: IntCounter,
pub persistent_bytes_written: IntCounter,
pub evictions: IntCounter,
@@ -1828,12 +1818,12 @@ pub(crate) struct TimelineMetrics {
impl TimelineMetrics {
pub fn new(
tenant_shard_id: &TenantShardId,
timeline_id_raw: &TimelineId,
timeline_id: &TimelineId,
evictions_with_low_residence_duration_builder: EvictionsWithLowResidenceDurationBuilder,
) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", tenant_shard_id.shard_slug());
let timeline_id = timeline_id_raw.to_string();
let timeline_id = timeline_id.to_string();
let flush_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::LayerFlush,
&tenant_id,
@@ -1886,22 +1876,6 @@ impl TimelineMetrics {
let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
// TODO use impl Trait syntax here once we have ability to use it: https://github.com/rust-lang/rust/issues/63065
let directory_entries_count_gauge_closure = {
let tenant_shard_id = *tenant_shard_id;
let timeline_id_raw = *timeline_id_raw;
move || {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", tenant_shard_id.shard_slug());
let timeline_id = timeline_id_raw.to_string();
let gauge: UIntGauge = DIRECTORY_ENTRIES_COUNT
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
gauge
}
};
let directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>> =
Lazy::new(Box::new(directory_entries_count_gauge_closure));
let num_persistent_files_created = NUM_PERSISTENT_FILES_CREATED
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
@@ -1928,7 +1902,6 @@ impl TimelineMetrics {
last_record_gauge,
resident_physical_size_gauge,
current_logical_size_gauge,
directory_entries_count_gauge,
num_persistent_files_created,
persistent_bytes_written,
evictions,
@@ -1971,9 +1944,6 @@ impl Drop for TimelineMetrics {
RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
}
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
let _ = metric.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
}
let _ =
NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, &shard_id, timeline_id]);

View File

@@ -14,7 +14,6 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_i
use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
@@ -156,7 +155,6 @@ impl Timeline {
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
lsn,
}
}
@@ -870,7 +868,6 @@ pub struct DatadirModification<'a> {
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
pending_directory_entries: Vec<(DirectoryKind, usize)>,
}
impl<'a> DatadirModification<'a> {
@@ -902,7 +899,6 @@ impl<'a> DatadirModification<'a> {
let buf = DbDirectory::ser(&DbDirectory {
dbdirs: HashMap::new(),
})?;
self.pending_directory_entries.push((DirectoryKind::Db, 0));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// Create AuxFilesDirectory
@@ -911,24 +907,16 @@ impl<'a> DatadirModification<'a> {
let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
xids: HashSet::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, 0));
self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
let empty_dir = Value::Image(buf);
self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(
slru_dir_to_key(SlruKind::MultiXactMembers),
empty_dir.clone(),
);
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
Ok(())
}
@@ -1029,7 +1017,6 @@ impl<'a> DatadirModification<'a> {
let buf = RelDirectory::ser(&RelDirectory {
rels: HashSet::new(),
})?;
self.pending_directory_entries.push((DirectoryKind::Rel, 0));
self.put(
rel_dir_to_key(spcnode, dbnode),
Value::Image(Bytes::from(buf)),
@@ -1052,8 +1039,6 @@ impl<'a> DatadirModification<'a> {
if !dir.xids.insert(xid) {
anyhow::bail!("twophase file for xid {} already exists", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
self.put(
TWOPHASEDIR_KEY,
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
@@ -1089,8 +1074,6 @@ impl<'a> DatadirModification<'a> {
let mut dir = DbDirectory::des(&buf)?;
if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
let buf = DbDirectory::ser(&dir)?;
self.pending_directory_entries
.push((DirectoryKind::Db, dir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
} else {
warn!(
@@ -1128,8 +1111,6 @@ impl<'a> DatadirModification<'a> {
// Didn't exist. Update dbdir
dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
self.pending_directory_entries
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
self.put(DBDIR_KEY, Value::Image(buf.into()));
// and create the RelDirectory
@@ -1144,10 +1125,6 @@ impl<'a> DatadirModification<'a> {
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
return Err(RelationError::AlreadyExists);
}
self.pending_directory_entries
.push((DirectoryKind::Rel, rel_dir.rels.len()));
self.put(
rel_dir_key,
Value::Image(Bytes::from(
@@ -1239,9 +1216,6 @@ impl<'a> DatadirModification<'a> {
let buf = self.get(dir_key, ctx).await?;
let mut dir = RelDirectory::des(&buf)?;
self.pending_directory_entries
.push((DirectoryKind::Rel, dir.rels.len()));
if dir.rels.remove(&(rel.relnode, rel.forknum)) {
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
} else {
@@ -1277,8 +1251,6 @@ impl<'a> DatadirModification<'a> {
if !dir.segments.insert(segno) {
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
}
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
self.put(
dir_key,
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
@@ -1323,8 +1295,6 @@ impl<'a> DatadirModification<'a> {
if !dir.segments.remove(&segno) {
warn!("slru segment {:?}/{} does not exist", kind, segno);
}
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
self.put(
dir_key,
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
@@ -1355,8 +1325,6 @@ impl<'a> DatadirModification<'a> {
if !dir.xids.remove(&xid) {
warn!("twophase file for xid {} does not exist", xid);
}
self.pending_directory_entries
.push((DirectoryKind::TwoPhase, dir.xids.len()));
self.put(
TWOPHASEDIR_KEY,
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
@@ -1372,8 +1340,6 @@ impl<'a> DatadirModification<'a> {
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
files: HashMap::new(),
})?;
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, 0));
self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
Ok(())
}
@@ -1400,9 +1366,6 @@ impl<'a> DatadirModification<'a> {
} else {
dir.files.insert(path, Bytes::copy_from_slice(content));
}
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, dir.files.len()));
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
@@ -1464,10 +1427,6 @@ impl<'a> DatadirModification<'a> {
self.pending_nblocks = 0;
}
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
writer.update_directory_entries_count(kind, count as u64);
}
Ok(())
}
@@ -1505,10 +1464,6 @@ impl<'a> DatadirModification<'a> {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
}
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
writer.update_directory_entries_count(kind, count as u64);
}
Ok(())
}
@@ -1633,23 +1588,6 @@ struct SlruSegmentDirectory {
segments: HashSet<u32>,
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
#[repr(u8)]
pub(crate) enum DirectoryKind {
Db,
TwoPhase,
Rel,
AuxFiles,
SlruSegment(SlruKind),
}
impl DirectoryKind {
pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
pub(crate) fn offset(&self) -> usize {
self.into_usize()
}
}
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
#[allow(clippy::bool_assert_comparison)]

View File

@@ -2880,7 +2880,7 @@ impl Tenant {
let config_path = config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
let conf_content = conf_content.as_bytes();
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
.await
.with_context(|| {
@@ -2917,7 +2917,7 @@ impl Tenant {
let target_config_path = target_config_path.to_owned();
tokio::task::spawn_blocking(move || {
Handle::current().block_on(async move {
let conf_content = conf_content.into_bytes();
let conf_content = conf_content.as_bytes();
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
.await
.with_context(|| {

View File

@@ -131,23 +131,27 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
&mut self,
src_buf: B,
) -> (B::Buf, Result<(), Error>) {
let (src_buf, res) = self.inner.write_all(src_buf).await;
let nbytes = match res {
Ok(nbytes) => nbytes,
Err(e) => return (src_buf, Err(e)),
let src_buf_len = src_buf.bytes_init();
let (src_buf, res) = if src_buf_len > 0 {
let src_buf = src_buf.slice(0..src_buf_len);
let res = self.inner.write_all(&src_buf).await;
let src_buf = Slice::into_inner(src_buf);
(src_buf, res)
} else {
let res = self.inner.write_all(&[]).await;
(Slice::into_inner(src_buf.slice_full()), res)
};
self.offset += nbytes as u64;
(src_buf, Ok(()))
if let Ok(()) = &res {
self.offset += src_buf_len as u64;
}
(src_buf, res)
}
#[inline(always)]
/// Flushes the internal buffer to the underlying `VirtualFile`.
pub async fn flush_buffer(&mut self) -> Result<(), Error> {
let buf = std::mem::take(&mut self.buf);
let (mut buf, res) = self.inner.write_all(buf).await;
res?;
buf.clear();
self.buf = buf;
self.inner.write_all(&self.buf).await?;
self.buf.clear();
Ok(())
}

View File

@@ -279,7 +279,7 @@ pub async fn save_metadata(
let path = conf.metadata_path(tenant_shard_id, timeline_id);
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
VirtualFile::crashsafe_overwrite(&path, &temp_path, &metadata_bytes)
.await
.context("write metadata")?;
Ok(())

View File

@@ -486,7 +486,7 @@ impl<'a> TenantDownloader<'a> {
let heatmap_path_bg = heatmap_path.clone();
tokio::task::spawn_blocking(move || {
tokio::runtime::Handle::current().block_on(async move {
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
})
})
.await

View File

@@ -461,8 +461,7 @@ impl DeltaLayerWriterInner {
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
.await?;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf).await;
res?;
file.write_all(buf.as_ref()).await?;
}
assert!(self.lsn_range.start < self.lsn_range.end);
// Fill in the summary on blk 0
@@ -477,12 +476,17 @@ impl DeltaLayerWriterInner {
index_root_blk,
};
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&summary, &mut buf)?;
if buf.spilled() {
// This is bad as we only have one free block for the summary
warn!(
"Used more than one page size for summary buffer: {}",
buf.len()
);
}
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf).await;
res?;
file.write_all(&buf).await?;
let metadata = file
.metadata()
@@ -675,12 +679,18 @@ impl DeltaLayer {
let new_summary = rewrite(actual_summary);
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here, but it's a pain with Slice<T>
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in DeltaLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf).await;
res?;
file.write_all(&buf).await?;
Ok(())
}
}

View File

@@ -341,12 +341,18 @@ impl ImageLayer {
let new_summary = rewrite(actual_summary);
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf).await;
res?;
file.write_all(&buf).await?;
Ok(())
}
}
@@ -549,8 +555,7 @@ impl ImageLayerWriterInner {
.await?;
let (index_root_blk, block_buf) = self.tree.finish()?;
for buf in block_buf.blocks {
let (_buf, res) = file.write_all(buf).await;
res?;
file.write_all(buf.as_ref()).await?;
}
// Fill in the summary on blk 0
@@ -565,12 +570,17 @@ impl ImageLayerWriterInner {
index_root_blk,
};
let mut buf = Vec::with_capacity(PAGE_SZ);
// TODO: could use smallvec here but it's a pain with Slice<T>
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&summary, &mut buf)?;
if buf.spilled() {
// This is bad as we only have one free block for the summary
warn!(
"Used more than one page size for summary buffer: {}",
buf.len()
);
}
file.seek(SeekFrom::Start(0)).await?;
let (_buf, res) = file.write_all(buf).await;
res?;
file.write_all(&buf).await?;
let metadata = file
.metadata()

View File

@@ -14,7 +14,6 @@ use enumset::EnumSet;
use fail::fail_point;
use futures::stream::StreamExt;
use itertools::Itertools;
use once_cell::sync::Lazy;
use pageserver_api::{
keyspace::{key_range_size, KeySpaceAccum},
models::{
@@ -35,22 +34,17 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::sync::gate::Gate;
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use std::{
array,
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
sync::atomic::AtomicU64,
};
use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use crate::pgdatadir_mapping::DirectoryKind;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
layer_map::{LayerMap, SearchResult},
@@ -264,8 +258,6 @@ pub struct Timeline {
// in `crate::page_service` writes these metrics.
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
/// Ensures layers aren't frozen by checkpointer between
/// [`Timeline::get_layer_for_write`] and layer reads.
/// Locked automatically by [`TimelineWriter`] and checkpointer.
@@ -798,10 +790,6 @@ impl Timeline {
self.metrics.resident_physical_size_get()
}
pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
}
///
/// Wait until WAL has been received and processed up to this LSN.
///
@@ -1508,8 +1496,6 @@ impl Timeline {
&timeline_id,
),
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
layer_flush_start_tx,
@@ -2278,29 +2264,6 @@ impl Timeline {
}
}
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
let aux_metric =
self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
let sum_of_entries = self
.directory_metrics
.iter()
.map(|v| v.load(AtomicOrdering::Relaxed))
.sum();
// Set a high general threshold and a lower threshold for the auxiliary files,
// as we can have large numbers of relations in the db directory.
const SUM_THRESHOLD: u64 = 5000;
const AUX_THRESHOLD: u64 = 1000;
if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
self.metrics
.directory_entries_count_gauge
.set(sum_of_entries);
} else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
metric.set(sum_of_entries);
}
}
async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
let guard = self.layers.read().await;
for historic_layer in guard.layer_map().iter_historic_layers() {

View File

@@ -19,7 +19,7 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use tokio_epoll_uring::IoBufMut;
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
@@ -410,10 +410,10 @@ impl VirtualFile {
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite<B: BoundedBuf>(
pub async fn crashsafe_overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: B,
content: &[u8],
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
@@ -430,8 +430,7 @@ impl VirtualFile {
.create_new(true),
)
.await?;
let (_content, res) = file.write_all(content).await;
res?;
file.write_all(content).await?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
@@ -602,36 +601,23 @@ impl VirtualFile {
Ok(())
}
/// Writes `buf.slice(0..buf.bytes_init())`.
/// Returns the IoBuf that is underlying the BoundedBuf `buf`.
/// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
/// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
pub async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(0));
}
let mut buf = buf.slice(0..nbytes);
pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> {
while !buf.is_empty() {
// TODO: push `Slice` further down
match self.write(&buf).await {
match self.write(buf).await {
Ok(0) => {
return (
Slice::into_inner(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
}
Ok(n) => {
buf = buf.slice(n..);
buf = &buf[n..];
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (Slice::into_inner(buf), Err(e)),
Err(e) => return Err(e),
}
}
(Slice::into_inner(buf), Ok(nbytes))
Ok(())
}
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
@@ -690,6 +676,7 @@ where
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
use tokio_epoll_uring::BoundedBuf;
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
while buf.bytes_total() != 0 {
let res;
@@ -1076,19 +1063,10 @@ mod tests {
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> Result<(), Error> {
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf).await;
res.map(|_| ())
}
MaybeVirtualFile::File(file) => {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return Ok(());
}
file.write_all(&buf.slice(0..buf_len))
}
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf),
}
}
@@ -1163,7 +1141,7 @@ mod tests {
.to_owned(),
)
.await?;
file_a.write_all(b"foobar".to_vec()).await?;
file_a.write_all(b"foobar").await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string().await.unwrap_err();
@@ -1172,7 +1150,7 @@ mod tests {
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar".to_vec()).await.unwrap_err();
let _ = file_a.write_all(b"bar").await.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string().await?);
@@ -1315,7 +1293,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1324,7 +1302,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar")
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1346,7 +1324,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
.await
.unwrap();

View File

@@ -3079,6 +3079,14 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
#endif
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers. See also
* this function's top comment.
*/
if (!OidIsValid(NInfoGetDbOid(rinfo)))
return false;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forknum;
tag.blockNum = blkno;
@@ -3092,28 +3100,17 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
*/
LWLockAcquire(partitionLock, LW_SHARED);
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers. See also
* this function's top comment.
*/
if (!OidIsValid(NInfoGetDbOid(rinfo)))
{
no_redo_needed = false;
}
else
{
/* Try to find the relevant buffer */
buffer = BufTableLookup(&tag, hash);
/* Try to find the relevant buffer */
buffer = BufTableLookup(&tag, hash);
no_redo_needed = buffer < 0;
no_redo_needed = buffer < 0;
}
/* In both cases st lwlsn past this WAL record */
SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
/*
* we don't have the buffer in memory, update lwLsn past this record, also
* evict page from file cache
* evict page fro file cache
*/
if (no_redo_needed)
lfc_evict(rinfo, forknum, blkno);

View File

@@ -1,8 +1,6 @@
use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::MaybeOwned;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
use proxy::config::AuthenticationConfig;
use proxy::config::CacheOptions;
use proxy::config::HttpConfig;
@@ -14,7 +12,6 @@ use proxy::rate_limiter::EndpointRateLimiter;
use proxy::rate_limiter::RateBucketInfo;
use proxy::rate_limiter::RateLimiterConfig;
use proxy::redis::notifications;
use proxy::redis::publisher::RedisPublisherClient;
use proxy::serverless::GlobalConnPoolOptions;
use proxy::usage_metrics;
@@ -25,7 +22,6 @@ use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::info;
@@ -133,9 +129,6 @@ struct ProxyCliArgs {
/// Can be given multiple times for different bucket sizes.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
endpoint_rps_limit: Vec<RateBucketInfo>,
/// Redis rate limiter max number of requests per second.
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// Initial limit for dynamic rate limiter. Makes sense only if `rate_limit_algorithm` is *not* `None`.
#[clap(long, default_value_t = 100)]
initial_limit: usize,
@@ -232,19 +225,6 @@ async fn main() -> anyhow::Result<()> {
let cancellation_token = CancellationToken::new();
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
let cancel_map = CancelMap::default();
let redis_publisher = match &args.redis_notifications {
Some(url) => Some(Arc::new(Mutex::new(RedisPublisherClient::new(
url,
args.region.clone(),
&config.redis_rps_limit,
)?))),
None => None,
};
let cancellation_handler = Arc::new(CancellationHandler::new(
cancel_map.clone(),
redis_publisher,
));
// client facing tasks. these will exit on error or on cancellation
// cancellation returns Ok(())
@@ -254,7 +234,6 @@ async fn main() -> anyhow::Result<()> {
proxy_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
cancellation_handler.clone(),
));
// TODO: rename the argument to something like serverless.
@@ -269,7 +248,6 @@ async fn main() -> anyhow::Result<()> {
serverless_listener,
cancellation_token.clone(),
endpoint_rate_limiter.clone(),
cancellation_handler.clone(),
));
}
@@ -293,12 +271,7 @@ async fn main() -> anyhow::Result<()> {
let cache = api.caches.project_info.clone();
if let Some(url) = args.redis_notifications {
info!("Starting redis notifications listener ({url})");
maintenance_tasks.spawn(notifications::task_main(
url.to_owned(),
cache.clone(),
cancel_map.clone(),
args.region.clone(),
));
maintenance_tasks.spawn(notifications::task_main(url.to_owned(), cache.clone()));
}
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
}
@@ -430,8 +403,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
let mut redis_rps_limit = args.redis_rps_limit.clone();
RateBucketInfo::validate(&mut redis_rps_limit)?;
let config = Box::leak(Box::new(ProxyConfig {
tls_config,
@@ -443,7 +414,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
require_client_ip: args.require_client_ip,
disable_ip_check_for_http: args.disable_ip_check_for_http,
endpoint_rps_limit,
redis_rps_limit,
handshake_timeout: args.handshake_timeout,
// TODO: add this argument
region: args.region.clone(),

View File

@@ -1,28 +1,16 @@
use async_trait::async_trait;
use dashmap::DashMap;
use pq_proto::CancelKeyData;
use std::{net::SocketAddr, sync::Arc};
use thiserror::Error;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_postgres::{CancelToken, NoTls};
use tracing::info;
use uuid::Uuid;
use crate::{
error::ReportableError, metrics::NUM_CANCELLATION_REQUESTS,
redis::publisher::RedisPublisherClient,
};
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
use crate::error::ReportableError;
/// Enables serving `CancelRequest`s.
///
/// If there is a `RedisPublisherClient` available, it will be used to publish the cancellation key to other proxy instances.
pub struct CancellationHandler {
map: CancelMap,
redis_client: Option<Arc<Mutex<RedisPublisherClient>>>,
}
#[derive(Default)]
pub struct CancelMap(DashMap<CancelKeyData, Option<CancelClosure>>);
#[derive(Debug, Error)]
pub enum CancelError {
@@ -44,43 +32,15 @@ impl ReportableError for CancelError {
}
}
impl CancellationHandler {
pub fn new(map: CancelMap, redis_client: Option<Arc<Mutex<RedisPublisherClient>>>) -> Self {
Self { map, redis_client }
}
impl CancelMap {
/// Cancel a running query for the corresponding connection.
pub async fn cancel_session(
&self,
key: CancelKeyData,
session_id: Uuid,
) -> Result<(), CancelError> {
let from = "from_client";
pub async fn cancel_session(&self, key: CancelKeyData) -> Result<(), CancelError> {
// NB: we should immediately release the lock after cloning the token.
let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else {
let Some(cancel_closure) = self.0.get(&key).and_then(|x| x.clone()) else {
tracing::warn!("query cancellation key not found: {key}");
if let Some(redis_client) = &self.redis_client {
NUM_CANCELLATION_REQUESTS
.with_label_values(&[from, "not_found"])
.inc();
info!("publishing cancellation key to Redis");
match redis_client.lock().await.try_publish(key, session_id).await {
Ok(()) => {
info!("cancellation key successfuly published to Redis");
}
Err(e) => {
tracing::error!("failed to publish a message: {e}");
return Err(CancelError::IO(std::io::Error::new(
std::io::ErrorKind::Other,
e.to_string(),
)));
}
}
}
return Ok(());
};
NUM_CANCELLATION_REQUESTS
.with_label_values(&[from, "found"])
.inc();
info!("cancelling query per user's request using key {key}");
cancel_closure.try_cancel_query().await
}
@@ -97,7 +57,7 @@ impl CancellationHandler {
// Random key collisions are unlikely to happen here, but they're still possible,
// which is why we have to take care not to rewrite an existing key.
match self.map.entry(key) {
match self.0.entry(key) {
dashmap::mapref::entry::Entry::Occupied(_) => continue,
dashmap::mapref::entry::Entry::Vacant(e) => {
e.insert(None);
@@ -109,46 +69,18 @@ impl CancellationHandler {
info!("registered new query cancellation key {key}");
Session {
key,
cancellation_handler: self,
cancel_map: self,
}
}
#[cfg(test)]
fn contains(&self, session: &Session) -> bool {
self.map.contains_key(&session.key)
self.0.contains_key(&session.key)
}
#[cfg(test)]
fn is_empty(&self) -> bool {
self.map.is_empty()
}
}
#[async_trait]
pub trait NotificationsCancellationHandler {
async fn cancel_session_no_publish(&self, key: CancelKeyData) -> Result<(), CancelError>;
}
#[async_trait]
impl NotificationsCancellationHandler for CancellationHandler {
async fn cancel_session_no_publish(&self, key: CancelKeyData) -> Result<(), CancelError> {
let from = "from_redis";
let cancel_closure = self.map.get(&key).and_then(|x| x.clone());
match cancel_closure {
Some(cancel_closure) => {
NUM_CANCELLATION_REQUESTS
.with_label_values(&[from, "found"])
.inc();
cancel_closure.try_cancel_query().await
}
None => {
NUM_CANCELLATION_REQUESTS
.with_label_values(&[from, "not_found"])
.inc();
tracing::warn!("query cancellation key not found: {key}");
Ok(())
}
}
self.0.is_empty()
}
}
@@ -183,7 +115,7 @@ pub struct Session {
/// The user-facing key identifying this session.
key: CancelKeyData,
/// The [`CancelMap`] this session belongs to.
cancellation_handler: Arc<CancellationHandler>,
cancel_map: Arc<CancelMap>,
}
impl Session {
@@ -191,9 +123,7 @@ impl Session {
/// This enables query cancellation in `crate::proxy::prepare_client_connection`.
pub fn enable_query_cancellation(&self, cancel_closure: CancelClosure) -> CancelKeyData {
info!("enabling query cancellation for this session");
self.cancellation_handler
.map
.insert(self.key, Some(cancel_closure));
self.cancel_map.0.insert(self.key, Some(cancel_closure));
self.key
}
@@ -201,7 +131,7 @@ impl Session {
impl Drop for Session {
fn drop(&mut self) {
self.cancellation_handler.map.remove(&self.key);
self.cancel_map.0.remove(&self.key);
info!("dropped query cancellation key {}", &self.key);
}
}
@@ -212,16 +142,13 @@ mod tests {
#[tokio::test]
async fn check_session_drop() -> anyhow::Result<()> {
let cancellation_handler = Arc::new(CancellationHandler {
map: CancelMap::default(),
redis_client: None,
});
let cancel_map: Arc<CancelMap> = Default::default();
let session = cancellation_handler.clone().get_session();
assert!(cancellation_handler.contains(&session));
let session = cancel_map.clone().get_session();
assert!(cancel_map.contains(&session));
drop(session);
// Check that the session has been dropped.
assert!(cancellation_handler.is_empty());
assert!(cancel_map.is_empty());
Ok(())
}

View File

@@ -21,7 +21,6 @@ pub struct ProxyConfig {
pub require_client_ip: bool,
pub disable_ip_check_for_http: bool,
pub endpoint_rps_limit: Vec<RateBucketInfo>,
pub redis_rps_limit: Vec<RateBucketInfo>,
pub region: String,
pub handshake_timeout: Duration,
}

View File

@@ -152,15 +152,6 @@ pub static NUM_OPEN_CLIENTS_IN_HTTP_POOL: Lazy<IntGauge> = Lazy::new(|| {
.unwrap()
});
pub static NUM_CANCELLATION_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"proxy_cancellation_requests_total",
"Number of cancellation requests (per found/not_found).",
&["source", "kind"],
)
.unwrap()
});
#[derive(Clone)]
pub struct LatencyTimer {
// time since the stopwatch was started

View File

@@ -10,7 +10,7 @@ pub mod wake_compute;
use crate::{
auth,
cancellation::{self, CancellationHandler},
cancellation::{self, CancelMap},
compute,
config::{ProxyConfig, TlsConfig},
context::RequestMonitoring,
@@ -62,7 +62,6 @@ pub async fn task_main(
listener: tokio::net::TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_handler: Arc<CancellationHandler>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("proxy has shut down");
@@ -73,6 +72,7 @@ pub async fn task_main(
socket2::SockRef::from(&listener).set_keepalive(true)?;
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let cancel_map = Arc::new(CancelMap::default());
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
@@ -80,7 +80,7 @@ pub async fn task_main(
let (socket, peer_addr) = accept_result?;
let session_id = uuid::Uuid::new_v4();
let cancellation_handler = Arc::clone(&cancellation_handler);
let cancel_map = Arc::clone(&cancel_map);
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
let session_span = info_span!(
@@ -113,7 +113,7 @@ pub async fn task_main(
let res = handle_client(
config,
&mut ctx,
cancellation_handler,
cancel_map,
socket,
ClientMode::Tcp,
endpoint_rate_limiter,
@@ -227,7 +227,7 @@ impl ReportableError for ClientRequestError {
pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config: &'static ProxyConfig,
ctx: &mut RequestMonitoring,
cancellation_handler: Arc<CancellationHandler>,
cancel_map: Arc<CancelMap>,
stream: S,
mode: ClientMode,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
@@ -253,8 +253,8 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
HandshakeData::Startup(stream, params) => (stream, params),
HandshakeData::Cancel(cancel_key_data) => {
return Ok(cancellation_handler
.cancel_session(cancel_key_data, ctx.session_id)
return Ok(cancel_map
.cancel_session(cancel_key_data)
.await
.map(|()| None)?)
}
@@ -315,7 +315,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
.or_else(|e| stream.throw_error(e))
.await?;
let session = cancellation_handler.get_session();
let session = cancel_map.get_session();
prepare_client_connection(&node, &session, &mut stream).await?;
// Before proxy passing, forward to compute whatever data is left in the
@@ -331,7 +331,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
compute: node,
req: _request_gauge,
conn: _client_gauge,
cancel: session,
}))
}

View File

@@ -1,5 +1,4 @@
use crate::{
cancellation,
compute::PostgresConnection,
console::messages::MetricsAuxInfo,
metrics::NUM_BYTES_PROXIED_COUNTER,
@@ -58,7 +57,6 @@ pub struct ProxyPassthrough<S> {
pub req: IntCounterPairGuard,
pub conn: IntCounterPairGuard,
pub cancel: cancellation::Session,
}
impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {

View File

@@ -4,4 +4,4 @@ mod limiter;
pub use aimd::Aimd;
pub use limit_algorithm::{AimdConfig, Fixed, RateLimitAlgorithm, RateLimiterConfig};
pub use limiter::Limiter;
pub use limiter::{EndpointRateLimiter, RateBucketInfo, RedisRateLimiter};
pub use limiter::{EndpointRateLimiter, RateBucketInfo};

View File

@@ -22,44 +22,6 @@ use super::{
RateLimiterConfig,
};
pub struct RedisRateLimiter {
data: Vec<RateBucket>,
info: &'static [RateBucketInfo],
}
impl RedisRateLimiter {
pub fn new(info: &'static [RateBucketInfo]) -> Self {
Self {
data: vec![
RateBucket {
start: Instant::now(),
count: 0,
};
info.len()
],
info,
}
}
/// Check that number of connections is below `max_rps` rps.
pub fn check(&mut self) -> bool {
let now = Instant::now();
let should_allow_request = self
.data
.iter_mut()
.zip(self.info)
.all(|(bucket, info)| bucket.should_allow_request(info, now));
if should_allow_request {
// only increment the bucket counts if the request will actually be accepted
self.data.iter_mut().for_each(RateBucket::inc);
}
should_allow_request
}
}
// Simple per-endpoint rate limiter.
//
// Check that number of connections to the endpoint is below `max_rps` rps.

View File

@@ -1,2 +1 @@
pub mod notifications;
pub mod publisher;

View File

@@ -1,44 +1,38 @@
use std::{convert::Infallible, sync::Arc};
use futures::StreamExt;
use pq_proto::CancelKeyData;
use redis::aio::PubSub;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use serde::Deserialize;
use crate::{
cache::project_info::ProjectInfoCache,
cancellation::{CancelMap, CancellationHandler, NotificationsCancellationHandler},
intern::{ProjectIdInt, RoleNameInt},
};
const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates";
const CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
const RECONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
const INVALIDATION_LAG: std::time::Duration = std::time::Duration::from_secs(20);
struct RedisConsumerClient {
struct ConsoleRedisClient {
client: redis::Client,
}
impl RedisConsumerClient {
impl ConsoleRedisClient {
pub fn new(url: &str) -> anyhow::Result<Self> {
let client = redis::Client::open(url)?;
Ok(Self { client })
}
async fn try_connect(&self) -> anyhow::Result<PubSub> {
let mut conn = self.client.get_async_connection().await?.into_pubsub();
tracing::info!("subscribing to a channel `{CPLANE_CHANNEL_NAME}`");
conn.subscribe(CPLANE_CHANNEL_NAME).await?;
tracing::info!("subscribing to a channel `{PROXY_CHANNEL_NAME}`");
conn.subscribe(PROXY_CHANNEL_NAME).await?;
tracing::info!("subscribing to a channel `{CHANNEL_NAME}`");
conn.subscribe(CHANNEL_NAME).await?;
Ok(conn)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
#[serde(tag = "topic", content = "data")]
pub(crate) enum Notification {
enum Notification {
#[serde(
rename = "/allowed_ips_updated",
deserialize_with = "deserialize_json_string"
@@ -51,25 +45,16 @@ pub(crate) enum Notification {
deserialize_with = "deserialize_json_string"
)]
PasswordUpdate { password_update: PasswordUpdate },
#[serde(rename = "/cancel_session")]
Cancel(CancelSession),
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct AllowedIpsUpdate {
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct AllowedIpsUpdate {
project_id: ProjectIdInt,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct PasswordUpdate {
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct PasswordUpdate {
project_id: ProjectIdInt,
role_name: RoleNameInt,
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub(crate) struct CancelSession {
pub region_id: Option<String>,
pub cancel_key_data: CancelKeyData,
pub session_id: Uuid,
}
fn deserialize_json_string<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
T: for<'de2> serde::Deserialize<'de2>,
@@ -79,88 +64,6 @@ where
serde_json::from_str(&s).map_err(<D::Error as serde::de::Error>::custom)
}
struct MessageHandler<
C: ProjectInfoCache + Send + Sync + 'static,
H: NotificationsCancellationHandler + Send + Sync + 'static,
> {
cache: Arc<C>,
cancellation_handler: Arc<H>,
region_id: String,
}
impl<
C: ProjectInfoCache + Send + Sync + 'static,
H: NotificationsCancellationHandler + Send + Sync + 'static,
> MessageHandler<C, H>
{
pub fn new(cache: Arc<C>, cancellation_handler: Arc<H>, region_id: String) -> Self {
Self {
cache,
cancellation_handler,
region_id,
}
}
pub fn disable_ttl(&self) {
self.cache.disable_ttl();
}
pub fn enable_ttl(&self) {
self.cache.enable_ttl();
}
#[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))]
async fn handle_message(&self, msg: redis::Msg) -> anyhow::Result<()> {
use Notification::*;
let payload: String = msg.get_payload()?;
tracing::debug!(?payload, "received a message payload");
let msg: Notification = match serde_json::from_str(&payload) {
Ok(msg) => msg,
Err(e) => {
tracing::error!("broken message: {e}");
return Ok(());
}
};
tracing::debug!(?msg, "received a message");
match msg {
Cancel(cancel_session) => {
tracing::Span::current().record(
"session_id",
&tracing::field::display(cancel_session.session_id),
);
if let Some(cancel_region) = cancel_session.region_id {
// If the message is not for this region, ignore it.
if cancel_region != self.region_id {
return Ok(());
}
}
// This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message.
match self
.cancellation_handler
.cancel_session_no_publish(cancel_session.cancel_key_data)
.await
{
Ok(()) => {}
Err(e) => {
tracing::error!("failed to cancel session: {e}");
}
}
}
_ => {
invalidate_cache(self.cache.clone(), msg.clone());
// It might happen that the invalid entry is on the way to be cached.
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
let cache = self.cache.clone();
tokio::spawn(async move {
tokio::time::sleep(INVALIDATION_LAG).await;
invalidate_cache(cache, msg);
});
}
}
Ok(())
}
}
fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
use Notification::*;
match msg {
@@ -171,33 +74,50 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
password_update.project_id,
password_update.role_name,
),
Cancel(_) => unreachable!("cancel message should be handled separately"),
}
}
#[tracing::instrument(skip(cache))]
fn handle_message<C>(msg: redis::Msg, cache: Arc<C>) -> anyhow::Result<()>
where
C: ProjectInfoCache + Send + Sync + 'static,
{
let payload: String = msg.get_payload()?;
tracing::debug!(?payload, "received a message payload");
let msg: Notification = match serde_json::from_str(&payload) {
Ok(msg) => msg,
Err(e) => {
tracing::error!("broken message: {e}");
return Ok(());
}
};
tracing::debug!(?msg, "received a message");
invalidate_cache(cache.clone(), msg.clone());
// It might happen that the invalid entry is on the way to be cached.
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
tokio::spawn(async move {
tokio::time::sleep(INVALIDATION_LAG).await;
invalidate_cache(cache, msg.clone());
});
Ok(())
}
/// Handle console's invalidation messages.
#[tracing::instrument(name = "console_notifications", skip_all)]
pub async fn task_main<C>(
url: String,
cache: Arc<C>,
cancel_map: CancelMap,
region_id: String,
) -> anyhow::Result<Infallible>
pub async fn task_main<C>(url: String, cache: Arc<C>) -> anyhow::Result<Infallible>
where
C: ProjectInfoCache + Send + Sync + 'static,
{
cache.enable_ttl();
let handler = MessageHandler::new(
cache,
Arc::new(CancellationHandler::new(cancel_map, None)),
region_id,
);
loop {
let redis = RedisConsumerClient::new(&url)?;
let redis = ConsoleRedisClient::new(&url)?;
let conn = match redis.try_connect().await {
Ok(conn) => {
handler.disable_ttl();
cache.disable_ttl();
conn
}
Err(e) => {
@@ -210,7 +130,7 @@ where
};
let mut stream = conn.into_on_message();
while let Some(msg) = stream.next().await {
match handler.handle_message(msg).await {
match handle_message(msg, cache.clone()) {
Ok(()) => {}
Err(e) => {
tracing::error!("failed to handle message: {e}, will try to reconnect");
@@ -218,7 +138,7 @@ where
}
}
}
handler.enable_ttl();
cache.enable_ttl();
}
}
@@ -278,33 +198,6 @@ mod tests {
}
);
Ok(())
}
#[test]
fn parse_cancel_session() -> anyhow::Result<()> {
let cancel_key_data = CancelKeyData {
backend_pid: 42,
cancel_key: 41,
};
let uuid = uuid::Uuid::new_v4();
let msg = Notification::Cancel(CancelSession {
cancel_key_data,
region_id: None,
session_id: uuid,
});
let text = serde_json::to_string(&msg)?;
let result: Notification = serde_json::from_str(&text)?;
assert_eq!(msg, result);
let msg = Notification::Cancel(CancelSession {
cancel_key_data,
region_id: Some("region".to_string()),
session_id: uuid,
});
let text = serde_json::to_string(&msg)?;
let result: Notification = serde_json::from_str(&text)?;
assert_eq!(msg, result,);
Ok(())
}
}

View File

@@ -1,80 +0,0 @@
use pq_proto::CancelKeyData;
use redis::AsyncCommands;
use uuid::Uuid;
use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter};
use super::notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME};
pub struct RedisPublisherClient {
client: redis::Client,
publisher: Option<redis::aio::Connection>,
region_id: String,
limiter: RedisRateLimiter,
}
impl RedisPublisherClient {
pub fn new(
url: &str,
region_id: String,
info: &'static [RateBucketInfo],
) -> anyhow::Result<Self> {
let client = redis::Client::open(url)?;
Ok(Self {
client,
publisher: None,
region_id,
limiter: RedisRateLimiter::new(info),
})
}
pub async fn try_publish(
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
) -> anyhow::Result<()> {
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping cancellation message");
return Err(anyhow::anyhow!("Rate limit exceeded"));
}
match self.publish(cancel_key_data, session_id).await {
Ok(()) => return Ok(()),
Err(e) => {
tracing::error!("failed to publish a message: {e}");
self.publisher = None;
}
}
tracing::info!("Publisher is disconnected. Reconnectiong...");
self.try_connect().await?;
self.publish(cancel_key_data, session_id).await
}
async fn publish(
&mut self,
cancel_key_data: CancelKeyData,
session_id: Uuid,
) -> anyhow::Result<()> {
let conn = self
.publisher
.as_mut()
.ok_or_else(|| anyhow::anyhow!("not connected"))?;
let payload = serde_json::to_string(&Notification::Cancel(CancelSession {
region_id: Some(self.region_id.clone()),
cancel_key_data,
session_id,
}))?;
conn.publish(PROXY_CHANNEL_NAME, payload).await?;
Ok(())
}
pub async fn try_connect(&mut self) -> anyhow::Result<()> {
match self.client.get_async_connection().await {
Ok(conn) => {
self.publisher = Some(conn);
}
Err(e) => {
tracing::error!("failed to connect to redis: {e}");
return Err(e.into());
}
}
Ok(())
}
}

View File

@@ -24,7 +24,7 @@ use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
use crate::rate_limiter::EndpointRateLimiter;
use crate::serverless::backend::PoolingBackend;
use crate::{cancellation::CancellationHandler, config::ProxyConfig};
use crate::{cancellation::CancelMap, config::ProxyConfig};
use futures::StreamExt;
use hyper::{
server::{
@@ -50,7 +50,6 @@ pub async fn task_main(
ws_listener: TcpListener,
cancellation_token: CancellationToken,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
cancellation_handler: Arc<CancellationHandler>,
) -> anyhow::Result<()> {
scopeguard::defer! {
info!("websocket server has shut down");
@@ -116,7 +115,7 @@ pub async fn task_main(
let backend = backend.clone();
let ws_connections = ws_connections.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
let cancellation_handler = cancellation_handler.clone();
async move {
let peer_addr = match client_addr {
Some(addr) => addr,
@@ -128,9 +127,9 @@ pub async fn task_main(
let backend = backend.clone();
let ws_connections = ws_connections.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
let cancellation_handler = cancellation_handler.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
let session_id = uuid::Uuid::new_v4();
request_handler(
@@ -138,7 +137,7 @@ pub async fn task_main(
config,
backend,
ws_connections,
cancellation_handler,
cancel_map,
session_id,
peer_addr.ip(),
endpoint_rate_limiter,
@@ -206,7 +205,7 @@ async fn request_handler(
config: &'static ProxyConfig,
backend: Arc<PoolingBackend>,
ws_connections: TaskTracker,
cancellation_handler: Arc<CancellationHandler>,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
peer_addr: IpAddr,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
@@ -233,7 +232,7 @@ async fn request_handler(
config,
ctx,
websocket,
cancellation_handler,
cancel_map,
host,
endpoint_rate_limiter,
)

View File

@@ -1,5 +1,5 @@
use crate::{
cancellation::CancellationHandler,
cancellation::CancelMap,
config::ProxyConfig,
context::RequestMonitoring,
error::{io_error, ReportableError},
@@ -133,7 +133,7 @@ pub async fn serve_websocket(
config: &'static ProxyConfig,
mut ctx: RequestMonitoring,
websocket: HyperWebsocket,
cancellation_handler: Arc<CancellationHandler>,
cancel_map: Arc<CancelMap>,
hostname: Option<String>,
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
) -> anyhow::Result<()> {
@@ -141,7 +141,7 @@ pub async fn serve_websocket(
let res = handle_client(
config,
&mut ctx,
cancellation_handler,
cancel_map,
WebSocketRw::new(websocket),
ClientMode::Websockets { hostname },
endpoint_rate_limiter,

View File

@@ -96,6 +96,5 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
"pageserver_evictions_total",
"pageserver_evictions_with_low_residence_duration_total",
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
# "pageserver_directory_entries_count", -- only used if above a certain threshold
# "pageserver_broken_tenants_count" -- used only for broken
)

View File

@@ -3972,9 +3972,6 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
# Get the timeline ID. We need it for the 'basebackup' command
timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
# wait for all pageserver shards to catch up
pre_shutdown = wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
# stop postgres to ensure that files won't change
endpoint.stop()
@@ -3985,9 +3982,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
checkpoint_lsn = re.findall(
"Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
)[0]
log.debug(
f"last checkpoint at {checkpoint_lsn} after shutdown (before shutdown was {pre_shutdown})"
)
log.debug(f"last checkpoint at {checkpoint_lsn}")
# Take a basebackup from pageserver
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
"postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
"postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
"postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361",
"postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9",
"postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1"
}

View File

@@ -38,7 +38,7 @@ futures-io = { version = "0.3" }
futures-sink = { version = "0.3" }
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["raw"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }
hashbrown-594e8ee84c453af0 = { package = "hashbrown", version = "0.13", features = ["raw"] }
hex = { version = "0.4", features = ["serde"] }
hmac = { version = "0.12", default-features = false, features = ["reset"] }
@@ -91,7 +91,7 @@ cc = { version = "1", default-features = false, features = ["parallel"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
either = { version = "1" }
getrandom = { version = "0.2", default-features = false, features = ["std"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["raw"] }
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }
indexmap = { version = "1", default-features = false, features = ["std"] }
itertools = { version = "0.10" }
libc = { version = "0.2", features = ["extra_traits", "use_std"] }