mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-03 13:30:38 +00:00
Compare commits
18 Commits
jcsp/pages
...
sk-skip-de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f08493f498 | ||
|
|
024372a3db | ||
|
|
fff2468aa2 | ||
|
|
c7538a2c20 | ||
|
|
a2d0d44b42 | ||
|
|
7d3cdc05d4 | ||
|
|
840abe3954 | ||
|
|
774a6e7475 | ||
|
|
df5d588f63 | ||
|
|
f39b0fce9b | ||
|
|
a9ec4eb4fc | ||
|
|
a97b54e3b9 | ||
|
|
a5114a99b2 | ||
|
|
ee7bbdda0e | ||
|
|
b6e070bf85 | ||
|
|
7fa732c96c | ||
|
|
331935df91 | ||
|
|
a8eb4042ba |
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -2263,11 +2263,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hashlink"
|
||||
version = "0.8.2"
|
||||
version = "0.8.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0761a1b9491c4f2e3d66aa0f62d0fba0af9a0e2852e4d48ea506632a4b56e6aa"
|
||||
checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
|
||||
dependencies = [
|
||||
"hashbrown 0.13.2",
|
||||
"hashbrown 0.14.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3952,6 +3952,7 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
"postgres-protocol",
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
|
||||
@@ -81,7 +81,7 @@ futures-core = "0.3"
|
||||
futures-util = "0.3"
|
||||
git-version = "0.3"
|
||||
hashbrown = "0.13"
|
||||
hashlink = "0.8.1"
|
||||
hashlink = "0.8.4"
|
||||
hdrhistogram = "7.5.2"
|
||||
hex = "0.4"
|
||||
hex-literal = "0.4"
|
||||
|
||||
@@ -820,6 +820,10 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
|
||||
-C pgxn/neon_utils \
|
||||
-s install && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
|
||||
-C pgxn/neon_test_utils \
|
||||
-s install && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
|
||||
-C pgxn/neon_rmgr \
|
||||
|
||||
@@ -2,7 +2,7 @@ use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::io::BufRead;
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::os::unix::fs::{symlink, PermissionsExt};
|
||||
use std::path::Path;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::str::FromStr;
|
||||
@@ -634,6 +634,48 @@ impl ComputeNode {
|
||||
// Update pg_hba.conf received with basebackup.
|
||||
update_pg_hba(pgdata_path)?;
|
||||
|
||||
// Place pg_dynshmem under /dev/shm. This allows us to use
|
||||
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
|
||||
// /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
|
||||
//
|
||||
// Why on earth don't we just stick to the 'posix' default, you might
|
||||
// ask. It turns out that making large allocations with 'posix' doesn't
|
||||
// work very well with autoscaling. The behavior we want is that:
|
||||
//
|
||||
// 1. You can make large DSM allocations, larger than the current RAM
|
||||
// size of the VM, without errors
|
||||
//
|
||||
// 2. If the allocated memory is really used, the VM is scaled up
|
||||
// automatically to accommodate that
|
||||
//
|
||||
// We try to make that possible by having swap in the VM. But with the
|
||||
// default 'posix' DSM implementation, we fail step 1, even when there's
|
||||
// plenty of swap available. PostgreSQL uses posix_fallocate() to create
|
||||
// the shmem segment, which is really just a file in /dev/shm in Linux,
|
||||
// but posix_fallocate() on tmpfs returns ENOMEM if the size is larger
|
||||
// than available RAM.
|
||||
//
|
||||
// Using 'dynamic_shared_memory_type = mmap' works around that, because
|
||||
// the Postgres 'mmap' DSM implementation doesn't use
|
||||
// posix_fallocate(). Instead, it uses repeated calls to write(2) to
|
||||
// fill the file with zeros. It's weird that that differs between
|
||||
// 'posix' and 'mmap', but we take advantage of it. When the file is
|
||||
// filled slowly with write(2), the kernel allows it to grow larger, as
|
||||
// long as there's swap available.
|
||||
//
|
||||
// In short, using 'dynamic_shared_memory_type = mmap' allows us one DSM
|
||||
// segment to be larger than currently available RAM. But because we
|
||||
// don't want to store it on a real file, which the kernel would try to
|
||||
// flush to disk, so symlink pg_dynshm to /dev/shm.
|
||||
//
|
||||
// We don't set 'dynamic_shared_memory_type = mmap' here, we let the
|
||||
// control plane control that option. If 'mmap' is not used, this
|
||||
// symlink doesn't affect anything.
|
||||
//
|
||||
// See https://github.com/neondatabase/autoscaling/issues/800
|
||||
std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
|
||||
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
|
||||
|
||||
match spec.mode {
|
||||
ComputeMode::Primary => {}
|
||||
ComputeMode::Replica | ComputeMode::Static(..) => {
|
||||
|
||||
@@ -381,7 +381,6 @@ impl Persistence {
|
||||
//
|
||||
// We create the child shards here, so that they will be available for increment_generation calls
|
||||
// if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn begin_shard_split(
|
||||
&self,
|
||||
old_shard_count: ShardCount,
|
||||
@@ -449,7 +448,6 @@ impl Persistence {
|
||||
|
||||
// When we finish shard splitting, we must atomically clean up the old shards
|
||||
// and insert the new shards, and clear the splitting marker.
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn complete_shard_split(
|
||||
&self,
|
||||
split_tenant_id: TenantId,
|
||||
|
||||
@@ -115,7 +115,6 @@ pub fn set_build_info_metric(revision: &str, build_tag: &str) {
|
||||
// performed by the process.
|
||||
// We know the size of the block, so we can determine the I/O bytes out of it.
|
||||
// The value might be not 100% exact, but should be fine for Prometheus metrics in this case.
|
||||
#[allow(clippy::unnecessary_cast)]
|
||||
fn update_rusage_metrics() {
|
||||
let rusage_stats = get_rusage_stats();
|
||||
|
||||
|
||||
@@ -494,6 +494,8 @@ pub struct TimelineInfo {
|
||||
pub current_logical_size: u64,
|
||||
pub current_logical_size_is_accurate: bool,
|
||||
|
||||
pub directory_entries_counts: Vec<u64>,
|
||||
|
||||
/// Sum of the size of all layer files.
|
||||
/// If a layer is present in both local FS and S3, it counts only once.
|
||||
pub current_physical_size: Option<u64>, // is None when timeline is Unloaded
|
||||
|
||||
@@ -124,6 +124,7 @@ impl RelTag {
|
||||
Ord,
|
||||
strum_macros::EnumIter,
|
||||
strum_macros::FromRepr,
|
||||
enum_map::Enum,
|
||||
)]
|
||||
#[repr(u8)]
|
||||
pub enum SlruKind {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
#![allow(non_snake_case)]
|
||||
// bindgen creates some unsafe code with no doc comments.
|
||||
#![allow(clippy::missing_safety_doc)]
|
||||
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
|
||||
// noted at 1.63 that in many cases there's u32 -> u32 transmutes in bindgen code.
|
||||
#![allow(clippy::useless_transmute)]
|
||||
// modules included with the postgres_ffi macro depend on the types of the specific version's
|
||||
// types, and trigger a too eager lint.
|
||||
|
||||
@@ -13,5 +13,6 @@ rand.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
thiserror.workspace = true
|
||||
serde.workspace = true
|
||||
|
||||
workspace_hack.workspace = true
|
||||
|
||||
@@ -7,6 +7,7 @@ pub mod framed;
|
||||
|
||||
use byteorder::{BigEndian, ReadBytesExt};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{borrow::Cow, collections::HashMap, fmt, io, str};
|
||||
|
||||
// re-export for use in utils pageserver_feedback.rs
|
||||
@@ -123,7 +124,7 @@ impl StartupMessageParams {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct CancelKeyData {
|
||||
pub backend_pid: i32,
|
||||
pub cancel_key: i32,
|
||||
|
||||
@@ -435,7 +435,6 @@ impl RemoteStorage for LocalFs {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::diverging_sub_expression)]
|
||||
async fn time_travel_recover(
|
||||
&self,
|
||||
_prefix: Option<&RemotePath>,
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
#![allow(unused)]
|
||||
|
||||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use utils::id;
|
||||
|
||||
|
||||
@@ -234,7 +234,7 @@ impl DeletionHeader {
|
||||
let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?;
|
||||
let header_path = conf.deletion_header_path();
|
||||
let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX);
|
||||
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes)
|
||||
VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes)
|
||||
.await
|
||||
.maybe_fatal_err("save deletion header")?;
|
||||
|
||||
@@ -325,7 +325,7 @@ impl DeletionList {
|
||||
let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX);
|
||||
|
||||
let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list");
|
||||
VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes)
|
||||
VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes)
|
||||
.await
|
||||
.maybe_fatal_err("save deletion list")
|
||||
.map_err(Into::into)
|
||||
@@ -834,7 +834,6 @@ mod test {
|
||||
}
|
||||
|
||||
impl ControlPlaneGenerationsApi for MockControlPlane {
|
||||
#[allow(clippy::diverging_sub_expression)] // False positive via async_trait
|
||||
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
@@ -351,7 +351,6 @@ pub enum IterationOutcome<U> {
|
||||
Finished(IterationOutcomeFinished<U>),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct IterationOutcomeFinished<U> {
|
||||
/// The actual usage observed before we started the iteration.
|
||||
@@ -366,7 +365,6 @@ pub struct IterationOutcomeFinished<U> {
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[allow(dead_code)]
|
||||
struct AssumedUsage<U> {
|
||||
/// The expected value for `after`, after phase 2.
|
||||
projected_after: U,
|
||||
@@ -374,14 +372,12 @@ struct AssumedUsage<U> {
|
||||
failed: LayerCount,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Serialize)]
|
||||
struct PlannedUsage<U> {
|
||||
respecting_tenant_min_resident_size: U,
|
||||
fallback_to_global_lru: Option<U>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Default, Serialize)]
|
||||
struct LayerCount {
|
||||
file_sizes: u64,
|
||||
@@ -565,7 +561,6 @@ pub(crate) struct EvictionSecondaryLayer {
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum EvictionLayer {
|
||||
Attached(Layer),
|
||||
#[allow(dead_code)]
|
||||
Secondary(EvictionSecondaryLayer),
|
||||
}
|
||||
|
||||
@@ -1105,7 +1100,6 @@ mod filesystem_level_usage {
|
||||
use super::DiskUsageEvictionTaskConfig;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
#[allow(dead_code)]
|
||||
pub struct Usage<'a> {
|
||||
config: &'a DiskUsageEvictionTaskConfig,
|
||||
|
||||
|
||||
@@ -422,6 +422,7 @@ async fn build_timeline_info_common(
|
||||
tenant::timeline::logical_size::Accuracy::Approximate => false,
|
||||
tenant::timeline::logical_size::Accuracy::Exact => true,
|
||||
},
|
||||
directory_entries_counts: timeline.get_directory_metrics().to_vec(),
|
||||
current_physical_size,
|
||||
current_logical_size_non_incremental: None,
|
||||
timeline_dir_layer_file_size_sum: None,
|
||||
@@ -2213,7 +2214,7 @@ pub fn make_router(
|
||||
)
|
||||
.get(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|
||||
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
|
||||
|r| api_handler(r, timeline_collect_keyspace),
|
||||
)
|
||||
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
|
||||
.any(handler_404))
|
||||
|
||||
@@ -602,6 +602,15 @@ pub(crate) mod initial_logical_size {
|
||||
});
|
||||
}
|
||||
|
||||
static DIRECTORY_ENTRIES_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_directory_entries_count",
|
||||
"Sum of the entries in pageserver-stored directory listings",
|
||||
&["tenant_id", "shard_id", "timeline_id"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
|
||||
register_uint_gauge_vec!(
|
||||
"pageserver_tenant_states_count",
|
||||
@@ -1809,6 +1818,7 @@ pub(crate) struct TimelineMetrics {
|
||||
resident_physical_size_gauge: UIntGauge,
|
||||
/// copy of LayeredTimeline.current_logical_size
|
||||
pub current_logical_size_gauge: UIntGauge,
|
||||
pub directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>>,
|
||||
pub num_persistent_files_created: IntCounter,
|
||||
pub persistent_bytes_written: IntCounter,
|
||||
pub evictions: IntCounter,
|
||||
@@ -1818,12 +1828,12 @@ pub(crate) struct TimelineMetrics {
|
||||
impl TimelineMetrics {
|
||||
pub fn new(
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline_id: &TimelineId,
|
||||
timeline_id_raw: &TimelineId,
|
||||
evictions_with_low_residence_duration_builder: EvictionsWithLowResidenceDurationBuilder,
|
||||
) -> Self {
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", tenant_shard_id.shard_slug());
|
||||
let timeline_id = timeline_id.to_string();
|
||||
let timeline_id = timeline_id_raw.to_string();
|
||||
let flush_time_histo = StorageTimeMetrics::new(
|
||||
StorageTimeOperation::LayerFlush,
|
||||
&tenant_id,
|
||||
@@ -1876,6 +1886,22 @@ impl TimelineMetrics {
|
||||
let current_logical_size_gauge = CURRENT_LOGICAL_SIZE
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
// TODO use impl Trait syntax here once we have ability to use it: https://github.com/rust-lang/rust/issues/63065
|
||||
let directory_entries_count_gauge_closure = {
|
||||
let tenant_shard_id = *tenant_shard_id;
|
||||
let timeline_id_raw = *timeline_id_raw;
|
||||
move || {
|
||||
let tenant_id = tenant_shard_id.tenant_id.to_string();
|
||||
let shard_id = format!("{}", tenant_shard_id.shard_slug());
|
||||
let timeline_id = timeline_id_raw.to_string();
|
||||
let gauge: UIntGauge = DIRECTORY_ENTRIES_COUNT
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
gauge
|
||||
}
|
||||
};
|
||||
let directory_entries_count_gauge: Lazy<UIntGauge, Box<dyn Send + Fn() -> UIntGauge>> =
|
||||
Lazy::new(Box::new(directory_entries_count_gauge_closure));
|
||||
let num_persistent_files_created = NUM_PERSISTENT_FILES_CREATED
|
||||
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
|
||||
.unwrap();
|
||||
@@ -1902,6 +1928,7 @@ impl TimelineMetrics {
|
||||
last_record_gauge,
|
||||
resident_physical_size_gauge,
|
||||
current_logical_size_gauge,
|
||||
directory_entries_count_gauge,
|
||||
num_persistent_files_created,
|
||||
persistent_bytes_written,
|
||||
evictions,
|
||||
@@ -1944,6 +1971,9 @@ impl Drop for TimelineMetrics {
|
||||
RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
}
|
||||
let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) {
|
||||
let _ = metric.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
}
|
||||
let _ =
|
||||
NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, &shard_id, timeline_id]);
|
||||
|
||||
@@ -14,6 +14,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_i
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::{ensure, Context};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use enum_map::Enum;
|
||||
use pageserver_api::key::{
|
||||
dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key,
|
||||
rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||
@@ -155,6 +156,8 @@ impl Timeline {
|
||||
pending_updates: HashMap::new(),
|
||||
pending_deletions: Vec::new(),
|
||||
pending_nblocks: 0,
|
||||
pending_aux_files: None,
|
||||
pending_directory_entries: Vec::new(),
|
||||
lsn,
|
||||
}
|
||||
}
|
||||
@@ -868,6 +871,15 @@ pub struct DatadirModification<'a> {
|
||||
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
|
||||
pending_deletions: Vec<(Range<Key>, Lsn)>,
|
||||
pending_nblocks: i64,
|
||||
|
||||
// If we already wrote any aux file changes in this modification, stash the latest dir. If set,
|
||||
// [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
|
||||
// if AUX_FILES_KEY is already set.
|
||||
pending_aux_files: Option<AuxFilesDirectory>,
|
||||
|
||||
/// For special "directory" keys that store key-value maps, track the size of the map
|
||||
/// if it was updated in this modification.
|
||||
pending_directory_entries: Vec<(DirectoryKind, usize)>,
|
||||
}
|
||||
|
||||
impl<'a> DatadirModification<'a> {
|
||||
@@ -899,6 +911,7 @@ impl<'a> DatadirModification<'a> {
|
||||
let buf = DbDirectory::ser(&DbDirectory {
|
||||
dbdirs: HashMap::new(),
|
||||
})?;
|
||||
self.pending_directory_entries.push((DirectoryKind::Db, 0));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
// Create AuxFilesDirectory
|
||||
@@ -907,16 +920,24 @@ impl<'a> DatadirModification<'a> {
|
||||
let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
|
||||
xids: HashSet::new(),
|
||||
})?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::TwoPhase, 0));
|
||||
self.put(TWOPHASEDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
|
||||
let empty_dir = Value::Image(buf);
|
||||
self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
|
||||
self.put(
|
||||
slru_dir_to_key(SlruKind::MultiXactMembers),
|
||||
empty_dir.clone(),
|
||||
);
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
|
||||
self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1017,6 +1038,7 @@ impl<'a> DatadirModification<'a> {
|
||||
let buf = RelDirectory::ser(&RelDirectory {
|
||||
rels: HashSet::new(),
|
||||
})?;
|
||||
self.pending_directory_entries.push((DirectoryKind::Rel, 0));
|
||||
self.put(
|
||||
rel_dir_to_key(spcnode, dbnode),
|
||||
Value::Image(Bytes::from(buf)),
|
||||
@@ -1039,6 +1061,8 @@ impl<'a> DatadirModification<'a> {
|
||||
if !dir.xids.insert(xid) {
|
||||
anyhow::bail!("twophase file for xid {} already exists", xid);
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::TwoPhase, dir.xids.len()));
|
||||
self.put(
|
||||
TWOPHASEDIR_KEY,
|
||||
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
|
||||
@@ -1074,6 +1098,8 @@ impl<'a> DatadirModification<'a> {
|
||||
let mut dir = DbDirectory::des(&buf)?;
|
||||
if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() {
|
||||
let buf = DbDirectory::ser(&dir)?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dir.dbdirs.len()));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
} else {
|
||||
warn!(
|
||||
@@ -1111,6 +1137,8 @@ impl<'a> DatadirModification<'a> {
|
||||
// Didn't exist. Update dbdir
|
||||
dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false);
|
||||
let buf = DbDirectory::ser(&dbdir).context("serialize db")?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Db, dbdir.dbdirs.len()));
|
||||
self.put(DBDIR_KEY, Value::Image(buf.into()));
|
||||
|
||||
// and create the RelDirectory
|
||||
@@ -1125,6 +1153,10 @@ impl<'a> DatadirModification<'a> {
|
||||
if !rel_dir.rels.insert((rel.relnode, rel.forknum)) {
|
||||
return Err(RelationError::AlreadyExists);
|
||||
}
|
||||
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, rel_dir.rels.len()));
|
||||
|
||||
self.put(
|
||||
rel_dir_key,
|
||||
Value::Image(Bytes::from(
|
||||
@@ -1216,6 +1248,9 @@ impl<'a> DatadirModification<'a> {
|
||||
let buf = self.get(dir_key, ctx).await?;
|
||||
let mut dir = RelDirectory::des(&buf)?;
|
||||
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::Rel, dir.rels.len()));
|
||||
|
||||
if dir.rels.remove(&(rel.relnode, rel.forknum)) {
|
||||
self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?)));
|
||||
} else {
|
||||
@@ -1251,6 +1286,8 @@ impl<'a> DatadirModification<'a> {
|
||||
if !dir.segments.insert(segno) {
|
||||
anyhow::bail!("slru segment {kind:?}/{segno} already exists");
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
|
||||
self.put(
|
||||
dir_key,
|
||||
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
|
||||
@@ -1295,6 +1332,8 @@ impl<'a> DatadirModification<'a> {
|
||||
if !dir.segments.remove(&segno) {
|
||||
warn!("slru segment {:?}/{} does not exist", kind, segno);
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::SlruSegment(kind), dir.segments.len()));
|
||||
self.put(
|
||||
dir_key,
|
||||
Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)),
|
||||
@@ -1325,6 +1364,8 @@ impl<'a> DatadirModification<'a> {
|
||||
if !dir.xids.remove(&xid) {
|
||||
warn!("twophase file for xid {} does not exist", xid);
|
||||
}
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::TwoPhase, dir.xids.len()));
|
||||
self.put(
|
||||
TWOPHASEDIR_KEY,
|
||||
Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)),
|
||||
@@ -1340,6 +1381,8 @@ impl<'a> DatadirModification<'a> {
|
||||
let buf = AuxFilesDirectory::ser(&AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
})?;
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::AuxFiles, 0));
|
||||
self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf)));
|
||||
Ok(())
|
||||
}
|
||||
@@ -1350,28 +1393,76 @@ impl<'a> DatadirModification<'a> {
|
||||
content: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
|
||||
Ok(buf) => AuxFilesDirectory::des(&buf)?,
|
||||
Err(e) => {
|
||||
// This is expected: historical databases do not have the key.
|
||||
debug!("Failed to get info about AUX files: {}", e);
|
||||
AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
let file_path = path.to_string();
|
||||
let content = if content.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(Bytes::copy_from_slice(content))
|
||||
};
|
||||
|
||||
let dir = if let Some(mut dir) = self.pending_aux_files.take() {
|
||||
// We already updated aux files in `self`: emit a delta and update our latest value
|
||||
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
file_path: file_path.clone(),
|
||||
content: content.clone(),
|
||||
}),
|
||||
);
|
||||
|
||||
dir.upsert(file_path, content);
|
||||
dir
|
||||
} else {
|
||||
// Check if the AUX_FILES_KEY is initialized
|
||||
match self.get(AUX_FILES_KEY, ctx).await {
|
||||
Ok(dir_bytes) => {
|
||||
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
|
||||
// Key is already set, we may append a delta
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::WalRecord(NeonWalRecord::AuxFile {
|
||||
file_path: file_path.clone(),
|
||||
content: content.clone(),
|
||||
}),
|
||||
);
|
||||
dir.upsert(file_path, content);
|
||||
dir
|
||||
}
|
||||
Err(
|
||||
e @ (PageReconstructError::AncestorStopping(_)
|
||||
| PageReconstructError::Cancelled
|
||||
| PageReconstructError::AncestorLsnTimeout(_)),
|
||||
) => {
|
||||
// Important that we do not interpret a shutdown error as "not found" and thereby
|
||||
// reset the map.
|
||||
return Err(e.into());
|
||||
}
|
||||
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
|
||||
// we are assuming that all _other_ possible errors represents a missing key. If some
|
||||
// other error occurs, we may incorrectly reset the map of aux files.
|
||||
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
|
||||
// Key is missing, we must insert an image as the basis for subsequent deltas.
|
||||
|
||||
let mut dir = AuxFilesDirectory {
|
||||
files: HashMap::new(),
|
||||
};
|
||||
dir.upsert(file_path, content);
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::Image(Bytes::from(
|
||||
AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
dir
|
||||
}
|
||||
}
|
||||
};
|
||||
let path = path.to_string();
|
||||
if content.is_empty() {
|
||||
dir.files.remove(&path);
|
||||
} else {
|
||||
dir.files.insert(path, Bytes::copy_from_slice(content));
|
||||
}
|
||||
self.put(
|
||||
AUX_FILES_KEY,
|
||||
Value::Image(Bytes::from(
|
||||
AuxFilesDirectory::ser(&dir).context("serialize")?,
|
||||
)),
|
||||
);
|
||||
|
||||
self.pending_directory_entries
|
||||
.push((DirectoryKind::AuxFiles, dir.files.len()));
|
||||
self.pending_aux_files = Some(dir);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1427,6 +1518,10 @@ impl<'a> DatadirModification<'a> {
|
||||
self.pending_nblocks = 0;
|
||||
}
|
||||
|
||||
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
|
||||
writer.update_directory_entries_count(kind, count as u64);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1464,6 +1559,10 @@ impl<'a> DatadirModification<'a> {
|
||||
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
|
||||
}
|
||||
|
||||
for (kind, count) in std::mem::take(&mut self.pending_directory_entries) {
|
||||
writer.update_directory_entries_count(kind, count as u64);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1573,8 +1672,18 @@ struct RelDirectory {
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Default)]
|
||||
struct AuxFilesDirectory {
|
||||
files: HashMap<String, Bytes>,
|
||||
pub(crate) struct AuxFilesDirectory {
|
||||
pub(crate) files: HashMap<String, Bytes>,
|
||||
}
|
||||
|
||||
impl AuxFilesDirectory {
|
||||
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
|
||||
if let Some(value) = value {
|
||||
self.files.insert(key, value);
|
||||
} else {
|
||||
self.files.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -1588,13 +1697,82 @@ struct SlruSegmentDirectory {
|
||||
segments: HashSet<u32>,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)]
|
||||
#[repr(u8)]
|
||||
pub(crate) enum DirectoryKind {
|
||||
Db,
|
||||
TwoPhase,
|
||||
Rel,
|
||||
AuxFiles,
|
||||
SlruSegment(SlruKind),
|
||||
}
|
||||
|
||||
impl DirectoryKind {
|
||||
pub(crate) const KINDS_NUM: usize = <DirectoryKind as Enum>::LENGTH;
|
||||
pub(crate) fn offset(&self) -> usize {
|
||||
self.into_usize()
|
||||
}
|
||||
}
|
||||
|
||||
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
|
||||
|
||||
#[allow(clippy::bool_assert_comparison)]
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
//use super::repo_harness::*;
|
||||
//use super::*;
|
||||
use hex_literal::hex;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use super::*;
|
||||
|
||||
use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
|
||||
|
||||
/// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
|
||||
#[tokio::test]
|
||||
async fn aux_files_round_trip() -> anyhow::Result<()> {
|
||||
let name = "aux_files_round_trip";
|
||||
let harness = TenantHarness::create(name)?;
|
||||
|
||||
pub const TIMELINE_ID: TimelineId =
|
||||
TimelineId::from_array(hex!("11223344556677881122334455667788"));
|
||||
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
let tline = tline.raw_timeline().unwrap();
|
||||
|
||||
// First modification: insert two keys
|
||||
let mut modification = tline.begin_modification(Lsn(0x1000));
|
||||
modification.put_file("foo/bar1", b"content1", &ctx).await?;
|
||||
modification.set_lsn(Lsn(0x1008))?;
|
||||
modification.put_file("foo/bar2", b"content2", &ctx).await?;
|
||||
modification.commit(&ctx).await?;
|
||||
let expect_1008 = HashMap::from([
|
||||
("foo/bar1".to_string(), Bytes::from_static(b"content1")),
|
||||
("foo/bar2".to_string(), Bytes::from_static(b"content2")),
|
||||
]);
|
||||
|
||||
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
|
||||
assert_eq!(readback, expect_1008);
|
||||
|
||||
// Second modification: update one key, remove the other
|
||||
let mut modification = tline.begin_modification(Lsn(0x2000));
|
||||
modification.put_file("foo/bar1", b"content3", &ctx).await?;
|
||||
modification.set_lsn(Lsn(0x2008))?;
|
||||
modification.put_file("foo/bar2", b"", &ctx).await?;
|
||||
modification.commit(&ctx).await?;
|
||||
let expect_2008 =
|
||||
HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
|
||||
|
||||
let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
|
||||
assert_eq!(readback, expect_2008);
|
||||
|
||||
// Reading back in time works
|
||||
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
|
||||
assert_eq!(readback, expect_1008);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/*
|
||||
fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {
|
||||
|
||||
@@ -30,10 +30,6 @@
|
||||
//! only a single tenant or timeline.
|
||||
//!
|
||||
|
||||
// Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
|
||||
// Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
|
||||
#![allow(clippy::declare_interior_mutable_const)]
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
@@ -312,7 +308,6 @@ struct MutableTaskState {
|
||||
}
|
||||
|
||||
struct PageServerTask {
|
||||
#[allow(dead_code)] // unused currently
|
||||
task_id: PageserverTaskId,
|
||||
|
||||
kind: TaskKind,
|
||||
|
||||
@@ -2880,7 +2880,7 @@ impl Tenant {
|
||||
let config_path = config_path.to_owned();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Handle::current().block_on(async move {
|
||||
let conf_content = conf_content.as_bytes();
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| {
|
||||
@@ -2917,7 +2917,7 @@ impl Tenant {
|
||||
let target_config_path = target_config_path.to_owned();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
Handle::current().block_on(async move {
|
||||
let conf_content = conf_content.as_bytes();
|
||||
let conf_content = conf_content.into_bytes();
|
||||
VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content)
|
||||
.await
|
||||
.with_context(|| {
|
||||
@@ -3914,6 +3914,7 @@ pub(crate) mod harness {
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::deletion_queue::mock::MockDeletionQueue;
|
||||
use crate::walredo::apply_neon;
|
||||
use crate::{
|
||||
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
|
||||
};
|
||||
@@ -4173,20 +4174,34 @@ pub(crate) mod harness {
|
||||
records: Vec<(Lsn, NeonWalRecord)>,
|
||||
_pg_version: u32,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let s = format!(
|
||||
"redo for {} to get to {}, with {} and {} records",
|
||||
key,
|
||||
lsn,
|
||||
if base_img.is_some() {
|
||||
"base image"
|
||||
} else {
|
||||
"no base image"
|
||||
},
|
||||
records.len()
|
||||
);
|
||||
println!("{s}");
|
||||
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
|
||||
|
||||
Ok(TEST_IMG(&s))
|
||||
if records_neon {
|
||||
// For Neon wal records, we can decode without spawning postgres, so do so.
|
||||
let base_img = base_img.expect("Neon WAL redo requires base image").1;
|
||||
let mut page = BytesMut::new();
|
||||
page.extend_from_slice(&base_img);
|
||||
for (_record_lsn, record) in records {
|
||||
apply_neon::apply_in_neon(&record, key, &mut page)?;
|
||||
}
|
||||
Ok(page.freeze())
|
||||
} else {
|
||||
// We never spawn a postgres walredo process in unit tests: just log what we might have done.
|
||||
let s = format!(
|
||||
"redo for {} to get to {}, with {} and {} records",
|
||||
key,
|
||||
lsn,
|
||||
if base_img.is_some() {
|
||||
"base image"
|
||||
} else {
|
||||
"no base image"
|
||||
},
|
||||
records.len()
|
||||
);
|
||||
println!("{s}");
|
||||
|
||||
Ok(TEST_IMG(&s))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4358,7 +4373,6 @@ mod tests {
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut lsn = start_lsn;
|
||||
#[allow(non_snake_case)]
|
||||
{
|
||||
let writer = tline.writer().await;
|
||||
// Create a relation on the timeline
|
||||
|
||||
@@ -131,27 +131,23 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
|
||||
&mut self,
|
||||
src_buf: B,
|
||||
) -> (B::Buf, Result<(), Error>) {
|
||||
let src_buf_len = src_buf.bytes_init();
|
||||
let (src_buf, res) = if src_buf_len > 0 {
|
||||
let src_buf = src_buf.slice(0..src_buf_len);
|
||||
let res = self.inner.write_all(&src_buf).await;
|
||||
let src_buf = Slice::into_inner(src_buf);
|
||||
(src_buf, res)
|
||||
} else {
|
||||
let res = self.inner.write_all(&[]).await;
|
||||
(Slice::into_inner(src_buf.slice_full()), res)
|
||||
let (src_buf, res) = self.inner.write_all(src_buf).await;
|
||||
let nbytes = match res {
|
||||
Ok(nbytes) => nbytes,
|
||||
Err(e) => return (src_buf, Err(e)),
|
||||
};
|
||||
if let Ok(()) = &res {
|
||||
self.offset += src_buf_len as u64;
|
||||
}
|
||||
(src_buf, res)
|
||||
self.offset += nbytes as u64;
|
||||
(src_buf, Ok(()))
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
/// Flushes the internal buffer to the underlying `VirtualFile`.
|
||||
pub async fn flush_buffer(&mut self) -> Result<(), Error> {
|
||||
self.inner.write_all(&self.buf).await?;
|
||||
self.buf.clear();
|
||||
let buf = std::mem::take(&mut self.buf);
|
||||
let (mut buf, res) = self.inner.write_all(buf).await;
|
||||
res?;
|
||||
buf.clear();
|
||||
self.buf = buf;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,6 @@ use crate::{
|
||||
pub const VALUE_SZ: usize = 5;
|
||||
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub const PAGE_SZ: usize = 8192;
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PAGE_SZ};
|
||||
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use bytes::BytesMut;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::cmp::min;
|
||||
@@ -26,7 +27,10 @@ pub struct EphemeralFile {
|
||||
/// An ephemeral file is append-only.
|
||||
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
|
||||
/// The other pages, which can no longer be modified, are accessed through the page cache.
|
||||
mutable_tail: [u8; PAGE_SZ],
|
||||
///
|
||||
/// None <=> IO is ongoing.
|
||||
/// Size is fixed to PAGE_SZ at creation time and must not be changed.
|
||||
mutable_tail: Option<BytesMut>,
|
||||
}
|
||||
|
||||
impl EphemeralFile {
|
||||
@@ -60,7 +64,7 @@ impl EphemeralFile {
|
||||
_timeline_id: timeline_id,
|
||||
file,
|
||||
len: 0,
|
||||
mutable_tail: [0u8; PAGE_SZ],
|
||||
mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -103,7 +107,13 @@ impl EphemeralFile {
|
||||
};
|
||||
} else {
|
||||
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
|
||||
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
|
||||
Ok(BlockLease::EphemeralFileMutableTail(
|
||||
self.mutable_tail
|
||||
.as_deref()
|
||||
.expect("we're not doing IO, it must be Some()")
|
||||
.try_into()
|
||||
.expect("we ensure that it's always PAGE_SZ"),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -135,21 +145,27 @@ impl EphemeralFile {
|
||||
) -> Result<(), io::Error> {
|
||||
let mut src_remaining = src;
|
||||
while !src_remaining.is_empty() {
|
||||
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
|
||||
let dst_remaining = &mut self
|
||||
.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref_mut()
|
||||
.expect("IO is not yet ongoing")[self.off..];
|
||||
let n = min(dst_remaining.len(), src_remaining.len());
|
||||
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
|
||||
self.off += n;
|
||||
src_remaining = &src_remaining[n..];
|
||||
if self.off == PAGE_SZ {
|
||||
match self
|
||||
let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
|
||||
.expect("IO is not yet ongoing");
|
||||
let (mutable_tail, res) = self
|
||||
.ephemeral_file
|
||||
.file
|
||||
.write_all_at(
|
||||
&self.ephemeral_file.mutable_tail,
|
||||
self.blknum as u64 * PAGE_SZ as u64,
|
||||
)
|
||||
.await
|
||||
{
|
||||
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
|
||||
.await;
|
||||
// TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
|
||||
// I.e., the IO isn't retryable if we panic.
|
||||
self.ephemeral_file.mutable_tail = Some(mutable_tail);
|
||||
match res {
|
||||
Ok(_) => {
|
||||
// Pre-warm the page cache with what we just wrote.
|
||||
// This isn't necessary for coherency/correctness, but it's how we've always done it.
|
||||
@@ -169,7 +185,12 @@ impl EphemeralFile {
|
||||
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
buf.copy_from_slice(
|
||||
self.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref()
|
||||
.expect("IO is not ongoing"),
|
||||
);
|
||||
let _ = write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
@@ -181,7 +202,11 @@ impl EphemeralFile {
|
||||
// Zero the buffer for re-use.
|
||||
// Zeroing is critical for correcntess because the write_blob code below
|
||||
// and similarly read_blk expect zeroed pages.
|
||||
self.ephemeral_file.mutable_tail.fill(0);
|
||||
self.ephemeral_file
|
||||
.mutable_tail
|
||||
.as_deref_mut()
|
||||
.expect("IO is not ongoing")
|
||||
.fill(0);
|
||||
// This block is done, move to next one.
|
||||
self.blknum += 1;
|
||||
self.off = 0;
|
||||
|
||||
@@ -279,7 +279,7 @@ pub async fn save_metadata(
|
||||
let path = conf.metadata_path(tenant_shard_id, timeline_id);
|
||||
let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX);
|
||||
let metadata_bytes = data.to_bytes().context("serialize metadata")?;
|
||||
VirtualFile::crashsafe_overwrite(&path, &temp_path, &metadata_bytes)
|
||||
VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes)
|
||||
.await
|
||||
.context("write metadata")?;
|
||||
Ok(())
|
||||
|
||||
@@ -486,7 +486,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
let heatmap_path_bg = heatmap_path.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
tokio::runtime::Handle::current().block_on(async move {
|
||||
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await
|
||||
VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await
|
||||
})
|
||||
})
|
||||
.await
|
||||
|
||||
@@ -461,7 +461,8 @@ impl DeltaLayerWriterInner {
|
||||
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))
|
||||
.await?;
|
||||
for buf in block_buf.blocks {
|
||||
file.write_all(buf.as_ref()).await?;
|
||||
let (_buf, res) = file.write_all(buf).await;
|
||||
res?;
|
||||
}
|
||||
assert!(self.lsn_range.start < self.lsn_range.end);
|
||||
// Fill in the summary on blk 0
|
||||
@@ -476,17 +477,12 @@ impl DeltaLayerWriterInner {
|
||||
index_root_blk,
|
||||
};
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
// TODO: could use smallvec here but it's a pain with Slice<T>
|
||||
Summary::ser_into(&summary, &mut buf)?;
|
||||
if buf.spilled() {
|
||||
// This is bad as we only have one free block for the summary
|
||||
warn!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
);
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
let (_buf, res) = file.write_all(buf).await;
|
||||
res?;
|
||||
|
||||
let metadata = file
|
||||
.metadata()
|
||||
@@ -679,18 +675,12 @@ impl DeltaLayer {
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
// TODO: could use smallvec here, but it's a pain with Slice<T>
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
if buf.spilled() {
|
||||
// The code in DeltaLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
)));
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
let (_buf, res) = file.write_all(buf).await;
|
||||
res?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,18 +341,12 @@ impl ImageLayer {
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
// TODO: could use smallvec here but it's a pain with Slice<T>
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
if buf.spilled() {
|
||||
// The code in ImageLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
)));
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
let (_buf, res) = file.write_all(buf).await;
|
||||
res?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -555,7 +549,8 @@ impl ImageLayerWriterInner {
|
||||
.await?;
|
||||
let (index_root_blk, block_buf) = self.tree.finish()?;
|
||||
for buf in block_buf.blocks {
|
||||
file.write_all(buf.as_ref()).await?;
|
||||
let (_buf, res) = file.write_all(buf).await;
|
||||
res?;
|
||||
}
|
||||
|
||||
// Fill in the summary on blk 0
|
||||
@@ -570,17 +565,12 @@ impl ImageLayerWriterInner {
|
||||
index_root_blk,
|
||||
};
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
let mut buf = Vec::with_capacity(PAGE_SZ);
|
||||
// TODO: could use smallvec here but it's a pain with Slice<T>
|
||||
Summary::ser_into(&summary, &mut buf)?;
|
||||
if buf.spilled() {
|
||||
// This is bad as we only have one free block for the summary
|
||||
warn!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
);
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
let (_buf, res) = file.write_all(buf).await;
|
||||
res?;
|
||||
|
||||
let metadata = file
|
||||
.metadata()
|
||||
|
||||
@@ -14,6 +14,7 @@ use enumset::EnumSet;
|
||||
use fail::fail_point;
|
||||
use futures::stream::StreamExt;
|
||||
use itertools::Itertools;
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::{
|
||||
keyspace::{key_range_size, KeySpaceAccum},
|
||||
models::{
|
||||
@@ -34,17 +35,22 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
use std::time::{Duration, Instant, SystemTime};
|
||||
use std::{
|
||||
array,
|
||||
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
|
||||
sync::atomic::AtomicU64,
|
||||
};
|
||||
use std::{
|
||||
cmp::{max, min, Ordering},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::pgdatadir_mapping::DirectoryKind;
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
layer_map::{LayerMap, SearchResult},
|
||||
@@ -258,6 +264,8 @@ pub struct Timeline {
|
||||
// in `crate::page_service` writes these metrics.
|
||||
pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline,
|
||||
|
||||
directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM],
|
||||
|
||||
/// Ensures layers aren't frozen by checkpointer between
|
||||
/// [`Timeline::get_layer_for_write`] and layer reads.
|
||||
/// Locked automatically by [`TimelineWriter`] and checkpointer.
|
||||
@@ -790,6 +798,10 @@ impl Timeline {
|
||||
self.metrics.resident_physical_size_get()
|
||||
}
|
||||
|
||||
pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] {
|
||||
array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed))
|
||||
}
|
||||
|
||||
///
|
||||
/// Wait until WAL has been received and processed up to this LSN.
|
||||
///
|
||||
@@ -1496,6 +1508,8 @@ impl Timeline {
|
||||
&timeline_id,
|
||||
),
|
||||
|
||||
directory_metrics: array::from_fn(|_| AtomicU64::new(0)),
|
||||
|
||||
flush_loop_state: Mutex::new(FlushLoopState::NotStarted),
|
||||
|
||||
layer_flush_start_tx,
|
||||
@@ -2264,6 +2278,29 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) {
|
||||
self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed);
|
||||
let aux_metric =
|
||||
self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed);
|
||||
|
||||
let sum_of_entries = self
|
||||
.directory_metrics
|
||||
.iter()
|
||||
.map(|v| v.load(AtomicOrdering::Relaxed))
|
||||
.sum();
|
||||
// Set a high general threshold and a lower threshold for the auxiliary files,
|
||||
// as we can have large numbers of relations in the db directory.
|
||||
const SUM_THRESHOLD: u64 = 5000;
|
||||
const AUX_THRESHOLD: u64 = 1000;
|
||||
if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD {
|
||||
self.metrics
|
||||
.directory_entries_count_gauge
|
||||
.set(sum_of_entries);
|
||||
} else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) {
|
||||
metric.set(sum_of_entries);
|
||||
}
|
||||
}
|
||||
|
||||
async fn find_layer(&self, layer_file_name: &str) -> Option<Layer> {
|
||||
let guard = self.layers.read().await;
|
||||
for historic_layer in guard.layer_map().iter_historic_layers() {
|
||||
|
||||
@@ -196,13 +196,13 @@ impl Timeline {
|
||||
ControlFlow::Continue(()) => (),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Default)]
|
||||
struct EvictionStats {
|
||||
candidates: usize,
|
||||
evicted: usize,
|
||||
errors: usize,
|
||||
not_evictable: usize,
|
||||
#[allow(dead_code)]
|
||||
skipped_for_shutdown: usize,
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
@@ -410,10 +410,10 @@ impl VirtualFile {
|
||||
/// step, the tmp path is renamed to the final path. As renames are
|
||||
/// atomic, a crash during the write operation will never leave behind a
|
||||
/// partially written file.
|
||||
pub async fn crashsafe_overwrite(
|
||||
pub async fn crashsafe_overwrite<B: BoundedBuf>(
|
||||
final_path: &Utf8Path,
|
||||
tmp_path: &Utf8Path,
|
||||
content: &[u8],
|
||||
content: B,
|
||||
) -> std::io::Result<()> {
|
||||
let Some(final_path_parent) = final_path.parent() else {
|
||||
return Err(std::io::Error::from_raw_os_error(
|
||||
@@ -430,7 +430,8 @@ impl VirtualFile {
|
||||
.create_new(true),
|
||||
)
|
||||
.await?;
|
||||
file.write_all(content).await?;
|
||||
let (_content, res) = file.write_all(content).await;
|
||||
res?;
|
||||
file.sync_all().await?;
|
||||
drop(file); // before the rename, that's important!
|
||||
// renames are atomic
|
||||
@@ -581,43 +582,69 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
|
||||
pub async fn write_all_at<B: BoundedBuf>(
|
||||
&self,
|
||||
buf: B,
|
||||
mut offset: u64,
|
||||
) -> (B::Buf, Result<(), Error>) {
|
||||
let buf_len = buf.bytes_init();
|
||||
if buf_len == 0 {
|
||||
return (Slice::into_inner(buf.slice_full()), Ok(()));
|
||||
}
|
||||
let mut buf = buf.slice(0..buf_len);
|
||||
while !buf.is_empty() {
|
||||
match self.write_at(buf, offset).await {
|
||||
// TODO: push `buf` further down
|
||||
match self.write_at(&buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
));
|
||||
return (
|
||||
Slice::into_inner(buf),
|
||||
Err(Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
)),
|
||||
);
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &buf[n..];
|
||||
buf = buf.slice(n..);
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
Err(e) => return (Slice::into_inner(buf), Err(e)),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
(Slice::into_inner(buf), Ok(()))
|
||||
}
|
||||
|
||||
pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> {
|
||||
/// Writes `buf.slice(0..buf.bytes_init())`.
|
||||
/// Returns the IoBuf that is underlying the BoundedBuf `buf`.
|
||||
/// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
|
||||
/// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
|
||||
pub async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> (B::Buf, Result<usize, Error>) {
|
||||
let nbytes = buf.bytes_init();
|
||||
if nbytes == 0 {
|
||||
return (Slice::into_inner(buf.slice_full()), Ok(0));
|
||||
}
|
||||
let mut buf = buf.slice(0..nbytes);
|
||||
while !buf.is_empty() {
|
||||
match self.write(buf).await {
|
||||
// TODO: push `Slice` further down
|
||||
match self.write(&buf).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
));
|
||||
return (
|
||||
Slice::into_inner(buf),
|
||||
Err(Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
)),
|
||||
);
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &buf[n..];
|
||||
buf = buf.slice(n..);
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
Err(e) => return (Slice::into_inner(buf), Err(e)),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
(Slice::into_inner(buf), Ok(nbytes))
|
||||
}
|
||||
|
||||
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||
@@ -676,7 +703,6 @@ where
|
||||
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
|
||||
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
|
||||
{
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
|
||||
while buf.bytes_total() != 0 {
|
||||
let res;
|
||||
@@ -1051,10 +1077,19 @@ mod tests {
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
|
||||
}
|
||||
}
|
||||
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
|
||||
async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
|
||||
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
|
||||
MaybeVirtualFile::VirtualFile(file) => {
|
||||
let (_buf, res) = file.write_all_at(buf, offset).await;
|
||||
res
|
||||
}
|
||||
MaybeVirtualFile::File(file) => {
|
||||
let buf_len = buf.bytes_init();
|
||||
if buf_len == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
file.write_all_at(&buf.slice(0..buf_len), offset)
|
||||
}
|
||||
}
|
||||
}
|
||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
@@ -1063,10 +1098,19 @@ mod tests {
|
||||
MaybeVirtualFile::File(file) => file.seek(pos),
|
||||
}
|
||||
}
|
||||
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
|
||||
async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
|
||||
MaybeVirtualFile::File(file) => file.write_all(buf),
|
||||
MaybeVirtualFile::VirtualFile(file) => {
|
||||
let (_buf, res) = file.write_all(buf).await;
|
||||
res.map(|_| ())
|
||||
}
|
||||
MaybeVirtualFile::File(file) => {
|
||||
let buf_len = buf.bytes_init();
|
||||
if buf_len == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
file.write_all(&buf.slice(0..buf_len))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1141,7 +1185,7 @@ mod tests {
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
file_a.write_all(b"foobar").await?;
|
||||
file_a.write_all(b"foobar".to_vec()).await?;
|
||||
|
||||
// cannot read from a file opened in write-only mode
|
||||
let _ = file_a.read_string().await.unwrap_err();
|
||||
@@ -1150,7 +1194,7 @@ mod tests {
|
||||
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
|
||||
|
||||
// cannot write to a file opened in read-only mode
|
||||
let _ = file_a.write_all(b"bar").await.unwrap_err();
|
||||
let _ = file_a.write_all(b"bar".to_vec()).await.unwrap_err();
|
||||
|
||||
// Try simple read
|
||||
assert_eq!("foobar", file_a.read_string().await?);
|
||||
@@ -1192,8 +1236,8 @@ mod tests {
|
||||
.to_owned(),
|
||||
)
|
||||
.await?;
|
||||
file_b.write_all_at(b"BAR", 3).await?;
|
||||
file_b.write_all_at(b"FOO", 0).await?;
|
||||
file_b.write_all_at(b"BAR".to_vec(), 3).await?;
|
||||
file_b.write_all_at(b"FOO".to_vec(), 0).await?;
|
||||
|
||||
assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");
|
||||
|
||||
@@ -1293,7 +1337,7 @@ mod tests {
|
||||
let path = testdir.join("myfile");
|
||||
let tmp_path = testdir.join("myfile.tmp");
|
||||
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
|
||||
@@ -1302,7 +1346,7 @@ mod tests {
|
||||
assert!(!tmp_path.exists());
|
||||
drop(file);
|
||||
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar")
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
|
||||
@@ -1324,7 +1368,7 @@ mod tests {
|
||||
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
|
||||
assert!(tmp_path.exists());
|
||||
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
|
||||
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
||||
@@ -44,6 +44,11 @@ pub enum NeonWalRecord {
|
||||
moff: MultiXactOffset,
|
||||
members: Vec<MultiXactMember>,
|
||||
},
|
||||
/// Update the map of AUX files, either writing or dropping an entry
|
||||
AuxFile {
|
||||
file_path: String,
|
||||
content: Option<Bytes>,
|
||||
},
|
||||
}
|
||||
|
||||
impl NeonWalRecord {
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
mod process;
|
||||
|
||||
/// Code to apply [`NeonWalRecord`]s.
|
||||
mod apply_neon;
|
||||
pub(crate) mod apply_neon;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::metrics::{
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use crate::pgdatadir_mapping::AuxFilesDirectory;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::BytesMut;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use postgres_ffi::pg_constants;
|
||||
@@ -12,6 +13,7 @@ use postgres_ffi::v14::nonrelfile_utils::{
|
||||
};
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use tracing::*;
|
||||
use utils::bin_ser::BeSer;
|
||||
|
||||
/// Can this request be served by neon redo functions
|
||||
/// or we need to pass it to wal-redo postgres process?
|
||||
@@ -230,6 +232,72 @@ pub(crate) fn apply_in_neon(
|
||||
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
|
||||
}
|
||||
}
|
||||
NeonWalRecord::AuxFile { file_path, content } => {
|
||||
let mut dir = AuxFilesDirectory::des(page)?;
|
||||
dir.upsert(file_path.clone(), content.clone());
|
||||
|
||||
page.clear();
|
||||
let mut writer = page.writer();
|
||||
dir.ser_into(&mut writer)?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use bytes::Bytes;
|
||||
use pageserver_api::key::AUX_FILES_KEY;
|
||||
|
||||
use super::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};
|
||||
|
||||
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
|
||||
#[test]
|
||||
fn apply_aux_file_deltas() -> anyhow::Result<()> {
|
||||
let base_dir = AuxFilesDirectory {
|
||||
files: HashMap::from([
|
||||
("two".to_string(), Bytes::from_static(b"content0")),
|
||||
("three".to_string(), Bytes::from_static(b"contentX")),
|
||||
]),
|
||||
};
|
||||
let base_image = AuxFilesDirectory::ser(&base_dir)?;
|
||||
|
||||
let deltas = vec![
|
||||
// Insert
|
||||
NeonWalRecord::AuxFile {
|
||||
file_path: "one".to_string(),
|
||||
content: Some(Bytes::from_static(b"content1")),
|
||||
},
|
||||
// Update
|
||||
NeonWalRecord::AuxFile {
|
||||
file_path: "two".to_string(),
|
||||
content: Some(Bytes::from_static(b"content99")),
|
||||
},
|
||||
// Delete
|
||||
NeonWalRecord::AuxFile {
|
||||
file_path: "three".to_string(),
|
||||
content: None,
|
||||
},
|
||||
];
|
||||
|
||||
let file_path = AUX_FILES_KEY;
|
||||
let mut page = BytesMut::from_iter(base_image);
|
||||
|
||||
for record in deltas {
|
||||
apply_in_neon(&record, file_path, &mut page)?;
|
||||
}
|
||||
|
||||
let reconstructed = AuxFilesDirectory::des(&page)?;
|
||||
let expect = HashMap::from([
|
||||
("one".to_string(), Bytes::from_static(b"content1")),
|
||||
("two".to_string(), Bytes::from_static(b"content99")),
|
||||
]);
|
||||
|
||||
assert_eq!(reconstructed.files, expect);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3079,14 +3079,6 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno);
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Out of an abundance of caution, we always run redo on shared catalogs,
|
||||
* regardless of whether the block is stored in shared buffers. See also
|
||||
* this function's top comment.
|
||||
*/
|
||||
if (!OidIsValid(NInfoGetDbOid(rinfo)))
|
||||
return false;
|
||||
|
||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||
tag.forkNum = forknum;
|
||||
tag.blockNum = blkno;
|
||||
@@ -3100,17 +3092,28 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||
*/
|
||||
LWLockAcquire(partitionLock, LW_SHARED);
|
||||
|
||||
/* Try to find the relevant buffer */
|
||||
buffer = BufTableLookup(&tag, hash);
|
||||
|
||||
no_redo_needed = buffer < 0;
|
||||
/*
|
||||
* Out of an abundance of caution, we always run redo on shared catalogs,
|
||||
* regardless of whether the block is stored in shared buffers. See also
|
||||
* this function's top comment.
|
||||
*/
|
||||
if (!OidIsValid(NInfoGetDbOid(rinfo)))
|
||||
{
|
||||
no_redo_needed = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* Try to find the relevant buffer */
|
||||
buffer = BufTableLookup(&tag, hash);
|
||||
|
||||
no_redo_needed = buffer < 0;
|
||||
}
|
||||
/* In both cases st lwlsn past this WAL record */
|
||||
SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno);
|
||||
|
||||
/*
|
||||
* we don't have the buffer in memory, update lwLsn past this record, also
|
||||
* evict page fro file cache
|
||||
* evict page from file cache
|
||||
*/
|
||||
if (no_redo_needed)
|
||||
lfc_evict(rinfo, forknum, blkno);
|
||||
|
||||
@@ -7,6 +7,24 @@ AS 'MODULE_PATHNAME', 'test_consume_xids'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION test_consume_cpu(seconds int)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'test_consume_cpu'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION test_consume_memory(megabytes int)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'test_consume_memory'
|
||||
LANGUAGE C STRICT
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION test_release_memory(megabytes int DEFAULT NULL)
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'test_release_memory'
|
||||
LANGUAGE C
|
||||
PARALLEL UNSAFE;
|
||||
|
||||
CREATE FUNCTION clear_buffer_cache()
|
||||
RETURNS VOID
|
||||
AS 'MODULE_PATHNAME', 'clear_buffer_cache'
|
||||
|
||||
@@ -3,3 +3,4 @@ comment = 'helpers for neon testing and debugging'
|
||||
default_version = '1.0'
|
||||
module_pathname = '$libdir/neon_test_utils'
|
||||
relocatable = true
|
||||
trusted = true
|
||||
|
||||
@@ -21,10 +21,12 @@
|
||||
#include "miscadmin.h"
|
||||
#include "storage/buf_internals.h"
|
||||
#include "storage/bufmgr.h"
|
||||
#include "storage/fd.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/varlena.h"
|
||||
#include "utils/wait_event.h"
|
||||
#include "../neon/pagestore_client.h"
|
||||
|
||||
PG_MODULE_MAGIC;
|
||||
@@ -32,6 +34,9 @@ PG_MODULE_MAGIC;
|
||||
extern void _PG_init(void);
|
||||
|
||||
PG_FUNCTION_INFO_V1(test_consume_xids);
|
||||
PG_FUNCTION_INFO_V1(test_consume_cpu);
|
||||
PG_FUNCTION_INFO_V1(test_consume_memory);
|
||||
PG_FUNCTION_INFO_V1(test_release_memory);
|
||||
PG_FUNCTION_INFO_V1(clear_buffer_cache);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
|
||||
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
|
||||
@@ -97,6 +102,119 @@ test_consume_xids(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* test_consume_cpu(seconds int). Keeps one CPU busy for the given number of seconds.
|
||||
*/
|
||||
Datum
|
||||
test_consume_cpu(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 seconds = PG_GETARG_INT32(0);
|
||||
TimestampTz start;
|
||||
uint64 total_iterations = 0;
|
||||
|
||||
start = GetCurrentTimestamp();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
TimestampTz elapsed;
|
||||
|
||||
elapsed = GetCurrentTimestamp() - start;
|
||||
if (elapsed > (TimestampTz) seconds * USECS_PER_SEC)
|
||||
break;
|
||||
|
||||
/* keep spinning */
|
||||
for (int i = 0; i < 1000000; i++)
|
||||
total_iterations++;
|
||||
elog(DEBUG2, "test_consume_cpu(): %lu iterations in total", total_iterations);
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
static MemoryContext consume_cxt = NULL;
|
||||
static slist_head consumed_memory_chunks;
|
||||
static int64 num_memory_chunks;
|
||||
|
||||
/*
|
||||
* test_consume_memory(megabytes int).
|
||||
*
|
||||
* Consume given amount of memory. The allocation is made in TopMemoryContext,
|
||||
* so it outlives the function, until you call test_release_memory to
|
||||
* explicitly release it, or close the session.
|
||||
*/
|
||||
Datum
|
||||
test_consume_memory(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int32 megabytes = PG_GETARG_INT32(0);
|
||||
|
||||
/*
|
||||
* Consume the memory in a new memory context, so that it's convenient to
|
||||
* release and to display it separately in a possible memory context dump.
|
||||
*/
|
||||
if (consume_cxt == NULL)
|
||||
consume_cxt = AllocSetContextCreate(TopMemoryContext,
|
||||
"test_consume_memory",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
for (int32 i = 0; i < megabytes; i++)
|
||||
{
|
||||
char *p;
|
||||
|
||||
p = MemoryContextAllocZero(consume_cxt, 1024 * 1024);
|
||||
|
||||
/* touch the memory, so that it's really allocated by the kernel */
|
||||
for (int j = 0; j < 1024 * 1024; j += 1024)
|
||||
p[j] = j % 0xFF;
|
||||
|
||||
slist_push_head(&consumed_memory_chunks, (slist_node *) p);
|
||||
num_memory_chunks++;
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* test_release_memory(megabytes int). NULL releases all
|
||||
*/
|
||||
Datum
|
||||
test_release_memory(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TimestampTz start;
|
||||
|
||||
if (PG_ARGISNULL(0))
|
||||
{
|
||||
if (consume_cxt)
|
||||
{
|
||||
MemoryContextDelete(consume_cxt);
|
||||
consume_cxt = NULL;
|
||||
num_memory_chunks = 0;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
int32 chunks_to_release = PG_GETARG_INT32(0);
|
||||
|
||||
if (chunks_to_release > num_memory_chunks)
|
||||
{
|
||||
elog(WARNING, "only %lu MB is consumed, releasing it all", num_memory_chunks);
|
||||
chunks_to_release = num_memory_chunks;
|
||||
}
|
||||
|
||||
for (int32 i = 0; i < chunks_to_release; i++)
|
||||
{
|
||||
slist_node *chunk = slist_pop_head_node(&consumed_memory_chunks);
|
||||
|
||||
pfree(chunk);
|
||||
num_memory_chunks--;
|
||||
}
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
/*
|
||||
* Flush the buffer cache, evicting all pages that are not currently pinned.
|
||||
*/
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use futures::future::Either;
|
||||
use proxy::auth;
|
||||
use proxy::auth::backend::MaybeOwned;
|
||||
use proxy::cancellation::CancelMap;
|
||||
use proxy::cancellation::CancellationHandler;
|
||||
use proxy::config::AuthenticationConfig;
|
||||
use proxy::config::CacheOptions;
|
||||
use proxy::config::HttpConfig;
|
||||
@@ -12,6 +14,7 @@ use proxy::rate_limiter::EndpointRateLimiter;
|
||||
use proxy::rate_limiter::RateBucketInfo;
|
||||
use proxy::rate_limiter::RateLimiterConfig;
|
||||
use proxy::redis::notifications;
|
||||
use proxy::redis::publisher::RedisPublisherClient;
|
||||
use proxy::serverless::GlobalConnPoolOptions;
|
||||
use proxy::usage_metrics;
|
||||
|
||||
@@ -22,6 +25,7 @@ use std::net::SocketAddr;
|
||||
use std::pin::pin;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
@@ -129,6 +133,9 @@ struct ProxyCliArgs {
|
||||
/// Can be given multiple times for different bucket sizes.
|
||||
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
|
||||
endpoint_rps_limit: Vec<RateBucketInfo>,
|
||||
/// Redis rate limiter max number of requests per second.
|
||||
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
|
||||
redis_rps_limit: Vec<RateBucketInfo>,
|
||||
/// Initial limit for dynamic rate limiter. Makes sense only if `rate_limit_algorithm` is *not* `None`.
|
||||
#[clap(long, default_value_t = 100)]
|
||||
initial_limit: usize,
|
||||
@@ -225,6 +232,19 @@ async fn main() -> anyhow::Result<()> {
|
||||
let cancellation_token = CancellationToken::new();
|
||||
|
||||
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
|
||||
let cancel_map = CancelMap::default();
|
||||
let redis_publisher = match &args.redis_notifications {
|
||||
Some(url) => Some(Arc::new(Mutex::new(RedisPublisherClient::new(
|
||||
url,
|
||||
args.region.clone(),
|
||||
&config.redis_rps_limit,
|
||||
)?))),
|
||||
None => None,
|
||||
};
|
||||
let cancellation_handler = Arc::new(CancellationHandler::new(
|
||||
cancel_map.clone(),
|
||||
redis_publisher,
|
||||
));
|
||||
|
||||
// client facing tasks. these will exit on error or on cancellation
|
||||
// cancellation returns Ok(())
|
||||
@@ -234,6 +254,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
proxy_listener,
|
||||
cancellation_token.clone(),
|
||||
endpoint_rate_limiter.clone(),
|
||||
cancellation_handler.clone(),
|
||||
));
|
||||
|
||||
// TODO: rename the argument to something like serverless.
|
||||
@@ -248,6 +269,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
serverless_listener,
|
||||
cancellation_token.clone(),
|
||||
endpoint_rate_limiter.clone(),
|
||||
cancellation_handler.clone(),
|
||||
));
|
||||
}
|
||||
|
||||
@@ -271,7 +293,12 @@ async fn main() -> anyhow::Result<()> {
|
||||
let cache = api.caches.project_info.clone();
|
||||
if let Some(url) = args.redis_notifications {
|
||||
info!("Starting redis notifications listener ({url})");
|
||||
maintenance_tasks.spawn(notifications::task_main(url.to_owned(), cache.clone()));
|
||||
maintenance_tasks.spawn(notifications::task_main(
|
||||
url.to_owned(),
|
||||
cache.clone(),
|
||||
cancel_map.clone(),
|
||||
args.region.clone(),
|
||||
));
|
||||
}
|
||||
maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
|
||||
}
|
||||
@@ -403,6 +430,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
|
||||
let mut endpoint_rps_limit = args.endpoint_rps_limit.clone();
|
||||
RateBucketInfo::validate(&mut endpoint_rps_limit)?;
|
||||
let mut redis_rps_limit = args.redis_rps_limit.clone();
|
||||
RateBucketInfo::validate(&mut redis_rps_limit)?;
|
||||
|
||||
let config = Box::leak(Box::new(ProxyConfig {
|
||||
tls_config,
|
||||
@@ -414,6 +443,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
|
||||
require_client_ip: args.require_client_ip,
|
||||
disable_ip_check_for_http: args.disable_ip_check_for_http,
|
||||
endpoint_rps_limit,
|
||||
redis_rps_limit,
|
||||
handshake_timeout: args.handshake_timeout,
|
||||
// TODO: add this argument
|
||||
region: args.region.clone(),
|
||||
|
||||
@@ -1,16 +1,28 @@
|
||||
use async_trait::async_trait;
|
||||
use dashmap::DashMap;
|
||||
use pq_proto::CancelKeyData;
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_postgres::{CancelToken, NoTls};
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::error::ReportableError;
|
||||
use crate::{
|
||||
error::ReportableError, metrics::NUM_CANCELLATION_REQUESTS,
|
||||
redis::publisher::RedisPublisherClient,
|
||||
};
|
||||
|
||||
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
|
||||
|
||||
/// Enables serving `CancelRequest`s.
|
||||
#[derive(Default)]
|
||||
pub struct CancelMap(DashMap<CancelKeyData, Option<CancelClosure>>);
|
||||
///
|
||||
/// If there is a `RedisPublisherClient` available, it will be used to publish the cancellation key to other proxy instances.
|
||||
pub struct CancellationHandler {
|
||||
map: CancelMap,
|
||||
redis_client: Option<Arc<Mutex<RedisPublisherClient>>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CancelError {
|
||||
@@ -32,15 +44,43 @@ impl ReportableError for CancelError {
|
||||
}
|
||||
}
|
||||
|
||||
impl CancelMap {
|
||||
impl CancellationHandler {
|
||||
pub fn new(map: CancelMap, redis_client: Option<Arc<Mutex<RedisPublisherClient>>>) -> Self {
|
||||
Self { map, redis_client }
|
||||
}
|
||||
/// Cancel a running query for the corresponding connection.
|
||||
pub async fn cancel_session(&self, key: CancelKeyData) -> Result<(), CancelError> {
|
||||
pub async fn cancel_session(
|
||||
&self,
|
||||
key: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
) -> Result<(), CancelError> {
|
||||
let from = "from_client";
|
||||
// NB: we should immediately release the lock after cloning the token.
|
||||
let Some(cancel_closure) = self.0.get(&key).and_then(|x| x.clone()) else {
|
||||
let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else {
|
||||
tracing::warn!("query cancellation key not found: {key}");
|
||||
if let Some(redis_client) = &self.redis_client {
|
||||
NUM_CANCELLATION_REQUESTS
|
||||
.with_label_values(&[from, "not_found"])
|
||||
.inc();
|
||||
info!("publishing cancellation key to Redis");
|
||||
match redis_client.lock().await.try_publish(key, session_id).await {
|
||||
Ok(()) => {
|
||||
info!("cancellation key successfuly published to Redis");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to publish a message: {e}");
|
||||
return Err(CancelError::IO(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
e.to_string(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
NUM_CANCELLATION_REQUESTS
|
||||
.with_label_values(&[from, "found"])
|
||||
.inc();
|
||||
info!("cancelling query per user's request using key {key}");
|
||||
cancel_closure.try_cancel_query().await
|
||||
}
|
||||
@@ -57,7 +97,7 @@ impl CancelMap {
|
||||
|
||||
// Random key collisions are unlikely to happen here, but they're still possible,
|
||||
// which is why we have to take care not to rewrite an existing key.
|
||||
match self.0.entry(key) {
|
||||
match self.map.entry(key) {
|
||||
dashmap::mapref::entry::Entry::Occupied(_) => continue,
|
||||
dashmap::mapref::entry::Entry::Vacant(e) => {
|
||||
e.insert(None);
|
||||
@@ -69,18 +109,46 @@ impl CancelMap {
|
||||
info!("registered new query cancellation key {key}");
|
||||
Session {
|
||||
key,
|
||||
cancel_map: self,
|
||||
cancellation_handler: self,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn contains(&self, session: &Session) -> bool {
|
||||
self.0.contains_key(&session.key)
|
||||
self.map.contains_key(&session.key)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn is_empty(&self) -> bool {
|
||||
self.0.is_empty()
|
||||
self.map.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait NotificationsCancellationHandler {
|
||||
async fn cancel_session_no_publish(&self, key: CancelKeyData) -> Result<(), CancelError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl NotificationsCancellationHandler for CancellationHandler {
|
||||
async fn cancel_session_no_publish(&self, key: CancelKeyData) -> Result<(), CancelError> {
|
||||
let from = "from_redis";
|
||||
let cancel_closure = self.map.get(&key).and_then(|x| x.clone());
|
||||
match cancel_closure {
|
||||
Some(cancel_closure) => {
|
||||
NUM_CANCELLATION_REQUESTS
|
||||
.with_label_values(&[from, "found"])
|
||||
.inc();
|
||||
cancel_closure.try_cancel_query().await
|
||||
}
|
||||
None => {
|
||||
NUM_CANCELLATION_REQUESTS
|
||||
.with_label_values(&[from, "not_found"])
|
||||
.inc();
|
||||
tracing::warn!("query cancellation key not found: {key}");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,7 +183,7 @@ pub struct Session {
|
||||
/// The user-facing key identifying this session.
|
||||
key: CancelKeyData,
|
||||
/// The [`CancelMap`] this session belongs to.
|
||||
cancel_map: Arc<CancelMap>,
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
@@ -123,7 +191,9 @@ impl Session {
|
||||
/// This enables query cancellation in `crate::proxy::prepare_client_connection`.
|
||||
pub fn enable_query_cancellation(&self, cancel_closure: CancelClosure) -> CancelKeyData {
|
||||
info!("enabling query cancellation for this session");
|
||||
self.cancel_map.0.insert(self.key, Some(cancel_closure));
|
||||
self.cancellation_handler
|
||||
.map
|
||||
.insert(self.key, Some(cancel_closure));
|
||||
|
||||
self.key
|
||||
}
|
||||
@@ -131,7 +201,7 @@ impl Session {
|
||||
|
||||
impl Drop for Session {
|
||||
fn drop(&mut self) {
|
||||
self.cancel_map.0.remove(&self.key);
|
||||
self.cancellation_handler.map.remove(&self.key);
|
||||
info!("dropped query cancellation key {}", &self.key);
|
||||
}
|
||||
}
|
||||
@@ -142,13 +212,16 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn check_session_drop() -> anyhow::Result<()> {
|
||||
let cancel_map: Arc<CancelMap> = Default::default();
|
||||
let cancellation_handler = Arc::new(CancellationHandler {
|
||||
map: CancelMap::default(),
|
||||
redis_client: None,
|
||||
});
|
||||
|
||||
let session = cancel_map.clone().get_session();
|
||||
assert!(cancel_map.contains(&session));
|
||||
let session = cancellation_handler.clone().get_session();
|
||||
assert!(cancellation_handler.contains(&session));
|
||||
drop(session);
|
||||
// Check that the session has been dropped.
|
||||
assert!(cancel_map.is_empty());
|
||||
assert!(cancellation_handler.is_empty());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ pub struct ProxyConfig {
|
||||
pub require_client_ip: bool,
|
||||
pub disable_ip_check_for_http: bool,
|
||||
pub endpoint_rps_limit: Vec<RateBucketInfo>,
|
||||
pub redis_rps_limit: Vec<RateBucketInfo>,
|
||||
pub region: String,
|
||||
pub handshake_timeout: Duration,
|
||||
}
|
||||
|
||||
@@ -152,6 +152,15 @@ pub static NUM_OPEN_CLIENTS_IN_HTTP_POOL: Lazy<IntGauge> = Lazy::new(|| {
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CANCELLATION_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_cancellation_requests_total",
|
||||
"Number of cancellation requests (per found/not_found).",
|
||||
&["source", "kind"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LatencyTimer {
|
||||
// time since the stopwatch was started
|
||||
|
||||
@@ -10,7 +10,7 @@ pub mod wake_compute;
|
||||
|
||||
use crate::{
|
||||
auth,
|
||||
cancellation::{self, CancelMap},
|
||||
cancellation::{self, CancellationHandler},
|
||||
compute,
|
||||
config::{ProxyConfig, TlsConfig},
|
||||
context::RequestMonitoring,
|
||||
@@ -62,6 +62,7 @@ pub async fn task_main(
|
||||
listener: tokio::net::TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("proxy has shut down");
|
||||
@@ -72,7 +73,6 @@ pub async fn task_main(
|
||||
socket2::SockRef::from(&listener).set_keepalive(true)?;
|
||||
|
||||
let connections = tokio_util::task::task_tracker::TaskTracker::new();
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
|
||||
while let Some(accept_result) =
|
||||
run_until_cancelled(listener.accept(), &cancellation_token).await
|
||||
@@ -80,7 +80,7 @@ pub async fn task_main(
|
||||
let (socket, peer_addr) = accept_result?;
|
||||
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancel_map = Arc::clone(&cancel_map);
|
||||
let cancellation_handler = Arc::clone(&cancellation_handler);
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
|
||||
let session_span = info_span!(
|
||||
@@ -113,7 +113,7 @@ pub async fn task_main(
|
||||
let res = handle_client(
|
||||
config,
|
||||
&mut ctx,
|
||||
cancel_map,
|
||||
cancellation_handler,
|
||||
socket,
|
||||
ClientMode::Tcp,
|
||||
endpoint_rate_limiter,
|
||||
@@ -227,7 +227,7 @@ impl ReportableError for ClientRequestError {
|
||||
pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
config: &'static ProxyConfig,
|
||||
ctx: &mut RequestMonitoring,
|
||||
cancel_map: Arc<CancelMap>,
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
stream: S,
|
||||
mode: ClientMode,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
@@ -253,8 +253,8 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? {
|
||||
HandshakeData::Startup(stream, params) => (stream, params),
|
||||
HandshakeData::Cancel(cancel_key_data) => {
|
||||
return Ok(cancel_map
|
||||
.cancel_session(cancel_key_data)
|
||||
return Ok(cancellation_handler
|
||||
.cancel_session(cancel_key_data, ctx.session_id)
|
||||
.await
|
||||
.map(|()| None)?)
|
||||
}
|
||||
@@ -315,7 +315,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
let session = cancel_map.get_session();
|
||||
let session = cancellation_handler.get_session();
|
||||
prepare_client_connection(&node, &session, &mut stream).await?;
|
||||
|
||||
// Before proxy passing, forward to compute whatever data is left in the
|
||||
@@ -331,6 +331,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
compute: node,
|
||||
req: _request_gauge,
|
||||
conn: _client_gauge,
|
||||
cancel: session,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
@@ -122,25 +122,24 @@ where
|
||||
|
||||
error!(error = ?err, "could not connect to compute node");
|
||||
|
||||
let node_info =
|
||||
if err.get_error_kind() == crate::error::ErrorKind::Postgres || !node_info.cached() {
|
||||
// If the error is Postgres, that means that we managed to connect to the compute node, but there was an error.
|
||||
// Do not need to retrieve a new node_info, just return the old one.
|
||||
if !err.should_retry(num_retries) {
|
||||
return Err(err.into());
|
||||
}
|
||||
node_info
|
||||
} else {
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
ctx.latency_timer.cache_miss();
|
||||
let old_node_info = invalidate_cache(node_info);
|
||||
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
|
||||
node_info.reuse_settings(old_node_info);
|
||||
let node_info = if !node_info.cached() {
|
||||
// If we just recieved this from cplane and dodn't get it from cache, we shouldn't retry.
|
||||
// Do not need to retrieve a new node_info, just return the old one.
|
||||
if !err.should_retry(num_retries) {
|
||||
return Err(err.into());
|
||||
}
|
||||
node_info
|
||||
} else {
|
||||
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
|
||||
info!("compute node's state has likely changed; requesting a wake-up");
|
||||
ctx.latency_timer.cache_miss();
|
||||
let old_node_info = invalidate_cache(node_info);
|
||||
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
|
||||
node_info.reuse_settings(old_node_info);
|
||||
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
node_info
|
||||
};
|
||||
mechanism.update_connect_config(&mut node_info.config);
|
||||
node_info
|
||||
};
|
||||
|
||||
// now that we have a new node, try connect to it repeatedly.
|
||||
// this can error for a few reasons, for instance:
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::{
|
||||
cancellation,
|
||||
compute::PostgresConnection,
|
||||
console::messages::MetricsAuxInfo,
|
||||
metrics::NUM_BYTES_PROXIED_COUNTER,
|
||||
@@ -57,6 +58,7 @@ pub struct ProxyPassthrough<S> {
|
||||
|
||||
pub req: IntCounterPairGuard,
|
||||
pub conn: IntCounterPairGuard,
|
||||
pub cancel: cancellation::Session,
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
|
||||
|
||||
@@ -375,8 +375,6 @@ enum ConnectAction {
|
||||
Connect,
|
||||
Retry,
|
||||
Fail,
|
||||
RetryPg,
|
||||
FailPg,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -466,14 +464,6 @@ impl ConnectMechanism for TestConnectMechanism {
|
||||
retryable: false,
|
||||
kind: ErrorKind::Compute,
|
||||
}),
|
||||
ConnectAction::FailPg => Err(TestConnectError {
|
||||
retryable: false,
|
||||
kind: ErrorKind::Postgres,
|
||||
}),
|
||||
ConnectAction::RetryPg => Err(TestConnectError {
|
||||
retryable: true,
|
||||
kind: ErrorKind::Postgres,
|
||||
}),
|
||||
x => panic!("expecting action {:?}, connect is called instead", x),
|
||||
}
|
||||
}
|
||||
@@ -572,32 +562,6 @@ async fn connect_to_compute_retry() {
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_retry_pg() {
|
||||
let _ = env_logger::try_init();
|
||||
use ConnectAction::*;
|
||||
let mut ctx = RequestMonitoring::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, RetryPg, Connect]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
|
||||
.await
|
||||
.unwrap();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_fail_pg() {
|
||||
let _ = env_logger::try_init();
|
||||
use ConnectAction::*;
|
||||
let mut ctx = RequestMonitoring::test();
|
||||
let mechanism = TestConnectMechanism::new(vec![Wake, FailPg]);
|
||||
let user_info = helper_create_connect_info(&mechanism);
|
||||
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
|
||||
.await
|
||||
.unwrap_err();
|
||||
mechanism.verify();
|
||||
}
|
||||
|
||||
/// Test that we don't retry if the error is not retryable.
|
||||
#[tokio::test]
|
||||
async fn connect_to_compute_non_retry_1() {
|
||||
|
||||
@@ -4,4 +4,4 @@ mod limiter;
|
||||
pub use aimd::Aimd;
|
||||
pub use limit_algorithm::{AimdConfig, Fixed, RateLimitAlgorithm, RateLimiterConfig};
|
||||
pub use limiter::Limiter;
|
||||
pub use limiter::{EndpointRateLimiter, RateBucketInfo};
|
||||
pub use limiter::{EndpointRateLimiter, RateBucketInfo, RedisRateLimiter};
|
||||
|
||||
@@ -22,6 +22,44 @@ use super::{
|
||||
RateLimiterConfig,
|
||||
};
|
||||
|
||||
pub struct RedisRateLimiter {
|
||||
data: Vec<RateBucket>,
|
||||
info: &'static [RateBucketInfo],
|
||||
}
|
||||
|
||||
impl RedisRateLimiter {
|
||||
pub fn new(info: &'static [RateBucketInfo]) -> Self {
|
||||
Self {
|
||||
data: vec![
|
||||
RateBucket {
|
||||
start: Instant::now(),
|
||||
count: 0,
|
||||
};
|
||||
info.len()
|
||||
],
|
||||
info,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check that number of connections is below `max_rps` rps.
|
||||
pub fn check(&mut self) -> bool {
|
||||
let now = Instant::now();
|
||||
|
||||
let should_allow_request = self
|
||||
.data
|
||||
.iter_mut()
|
||||
.zip(self.info)
|
||||
.all(|(bucket, info)| bucket.should_allow_request(info, now));
|
||||
|
||||
if should_allow_request {
|
||||
// only increment the bucket counts if the request will actually be accepted
|
||||
self.data.iter_mut().for_each(RateBucket::inc);
|
||||
}
|
||||
|
||||
should_allow_request
|
||||
}
|
||||
}
|
||||
|
||||
// Simple per-endpoint rate limiter.
|
||||
//
|
||||
// Check that number of connections to the endpoint is below `max_rps` rps.
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
pub mod notifications;
|
||||
pub mod publisher;
|
||||
|
||||
@@ -1,38 +1,44 @@
|
||||
use std::{convert::Infallible, sync::Arc};
|
||||
|
||||
use futures::StreamExt;
|
||||
use pq_proto::CancelKeyData;
|
||||
use redis::aio::PubSub;
|
||||
use serde::Deserialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
cache::project_info::ProjectInfoCache,
|
||||
cancellation::{CancelMap, CancellationHandler, NotificationsCancellationHandler},
|
||||
intern::{ProjectIdInt, RoleNameInt},
|
||||
};
|
||||
|
||||
const CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
|
||||
const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates";
|
||||
pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates";
|
||||
const RECONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
|
||||
const INVALIDATION_LAG: std::time::Duration = std::time::Duration::from_secs(20);
|
||||
|
||||
struct ConsoleRedisClient {
|
||||
struct RedisConsumerClient {
|
||||
client: redis::Client,
|
||||
}
|
||||
|
||||
impl ConsoleRedisClient {
|
||||
impl RedisConsumerClient {
|
||||
pub fn new(url: &str) -> anyhow::Result<Self> {
|
||||
let client = redis::Client::open(url)?;
|
||||
Ok(Self { client })
|
||||
}
|
||||
async fn try_connect(&self) -> anyhow::Result<PubSub> {
|
||||
let mut conn = self.client.get_async_connection().await?.into_pubsub();
|
||||
tracing::info!("subscribing to a channel `{CHANNEL_NAME}`");
|
||||
conn.subscribe(CHANNEL_NAME).await?;
|
||||
tracing::info!("subscribing to a channel `{CPLANE_CHANNEL_NAME}`");
|
||||
conn.subscribe(CPLANE_CHANNEL_NAME).await?;
|
||||
tracing::info!("subscribing to a channel `{PROXY_CHANNEL_NAME}`");
|
||||
conn.subscribe(PROXY_CHANNEL_NAME).await?;
|
||||
Ok(conn)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
#[serde(tag = "topic", content = "data")]
|
||||
enum Notification {
|
||||
pub(crate) enum Notification {
|
||||
#[serde(
|
||||
rename = "/allowed_ips_updated",
|
||||
deserialize_with = "deserialize_json_string"
|
||||
@@ -45,16 +51,25 @@ enum Notification {
|
||||
deserialize_with = "deserialize_json_string"
|
||||
)]
|
||||
PasswordUpdate { password_update: PasswordUpdate },
|
||||
#[serde(rename = "/cancel_session")]
|
||||
Cancel(CancelSession),
|
||||
}
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
|
||||
struct AllowedIpsUpdate {
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct AllowedIpsUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
}
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
|
||||
struct PasswordUpdate {
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct PasswordUpdate {
|
||||
project_id: ProjectIdInt,
|
||||
role_name: RoleNameInt,
|
||||
}
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
|
||||
pub(crate) struct CancelSession {
|
||||
pub region_id: Option<String>,
|
||||
pub cancel_key_data: CancelKeyData,
|
||||
pub session_id: Uuid,
|
||||
}
|
||||
|
||||
fn deserialize_json_string<'de, D, T>(deserializer: D) -> Result<T, D::Error>
|
||||
where
|
||||
T: for<'de2> serde::Deserialize<'de2>,
|
||||
@@ -64,6 +79,88 @@ where
|
||||
serde_json::from_str(&s).map_err(<D::Error as serde::de::Error>::custom)
|
||||
}
|
||||
|
||||
struct MessageHandler<
|
||||
C: ProjectInfoCache + Send + Sync + 'static,
|
||||
H: NotificationsCancellationHandler + Send + Sync + 'static,
|
||||
> {
|
||||
cache: Arc<C>,
|
||||
cancellation_handler: Arc<H>,
|
||||
region_id: String,
|
||||
}
|
||||
|
||||
impl<
|
||||
C: ProjectInfoCache + Send + Sync + 'static,
|
||||
H: NotificationsCancellationHandler + Send + Sync + 'static,
|
||||
> MessageHandler<C, H>
|
||||
{
|
||||
pub fn new(cache: Arc<C>, cancellation_handler: Arc<H>, region_id: String) -> Self {
|
||||
Self {
|
||||
cache,
|
||||
cancellation_handler,
|
||||
region_id,
|
||||
}
|
||||
}
|
||||
pub fn disable_ttl(&self) {
|
||||
self.cache.disable_ttl();
|
||||
}
|
||||
pub fn enable_ttl(&self) {
|
||||
self.cache.enable_ttl();
|
||||
}
|
||||
#[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))]
|
||||
async fn handle_message(&self, msg: redis::Msg) -> anyhow::Result<()> {
|
||||
use Notification::*;
|
||||
let payload: String = msg.get_payload()?;
|
||||
tracing::debug!(?payload, "received a message payload");
|
||||
|
||||
let msg: Notification = match serde_json::from_str(&payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
tracing::error!("broken message: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
tracing::debug!(?msg, "received a message");
|
||||
match msg {
|
||||
Cancel(cancel_session) => {
|
||||
tracing::Span::current().record(
|
||||
"session_id",
|
||||
&tracing::field::display(cancel_session.session_id),
|
||||
);
|
||||
if let Some(cancel_region) = cancel_session.region_id {
|
||||
// If the message is not for this region, ignore it.
|
||||
if cancel_region != self.region_id {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
// This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message.
|
||||
match self
|
||||
.cancellation_handler
|
||||
.cancel_session_no_publish(cancel_session.cancel_key_data)
|
||||
.await
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to cancel session: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
invalidate_cache(self.cache.clone(), msg.clone());
|
||||
// It might happen that the invalid entry is on the way to be cached.
|
||||
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
|
||||
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
|
||||
let cache = self.cache.clone();
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(INVALIDATION_LAG).await;
|
||||
invalidate_cache(cache, msg);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
|
||||
use Notification::*;
|
||||
match msg {
|
||||
@@ -74,50 +171,33 @@ fn invalidate_cache<C: ProjectInfoCache>(cache: Arc<C>, msg: Notification) {
|
||||
password_update.project_id,
|
||||
password_update.role_name,
|
||||
),
|
||||
Cancel(_) => unreachable!("cancel message should be handled separately"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(cache))]
|
||||
fn handle_message<C>(msg: redis::Msg, cache: Arc<C>) -> anyhow::Result<()>
|
||||
where
|
||||
C: ProjectInfoCache + Send + Sync + 'static,
|
||||
{
|
||||
let payload: String = msg.get_payload()?;
|
||||
tracing::debug!(?payload, "received a message payload");
|
||||
|
||||
let msg: Notification = match serde_json::from_str(&payload) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
tracing::error!("broken message: {e}");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
tracing::debug!(?msg, "received a message");
|
||||
invalidate_cache(cache.clone(), msg.clone());
|
||||
// It might happen that the invalid entry is on the way to be cached.
|
||||
// To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds.
|
||||
// TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message.
|
||||
tokio::spawn(async move {
|
||||
tokio::time::sleep(INVALIDATION_LAG).await;
|
||||
invalidate_cache(cache, msg.clone());
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle console's invalidation messages.
|
||||
#[tracing::instrument(name = "console_notifications", skip_all)]
|
||||
pub async fn task_main<C>(url: String, cache: Arc<C>) -> anyhow::Result<Infallible>
|
||||
pub async fn task_main<C>(
|
||||
url: String,
|
||||
cache: Arc<C>,
|
||||
cancel_map: CancelMap,
|
||||
region_id: String,
|
||||
) -> anyhow::Result<Infallible>
|
||||
where
|
||||
C: ProjectInfoCache + Send + Sync + 'static,
|
||||
{
|
||||
cache.enable_ttl();
|
||||
let handler = MessageHandler::new(
|
||||
cache,
|
||||
Arc::new(CancellationHandler::new(cancel_map, None)),
|
||||
region_id,
|
||||
);
|
||||
|
||||
loop {
|
||||
let redis = ConsoleRedisClient::new(&url)?;
|
||||
let redis = RedisConsumerClient::new(&url)?;
|
||||
let conn = match redis.try_connect().await {
|
||||
Ok(conn) => {
|
||||
cache.disable_ttl();
|
||||
handler.disable_ttl();
|
||||
conn
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -130,7 +210,7 @@ where
|
||||
};
|
||||
let mut stream = conn.into_on_message();
|
||||
while let Some(msg) = stream.next().await {
|
||||
match handle_message(msg, cache.clone()) {
|
||||
match handler.handle_message(msg).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to handle message: {e}, will try to reconnect");
|
||||
@@ -138,7 +218,7 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
cache.enable_ttl();
|
||||
handler.enable_ttl();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,6 +278,33 @@ mod tests {
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
#[test]
|
||||
fn parse_cancel_session() -> anyhow::Result<()> {
|
||||
let cancel_key_data = CancelKeyData {
|
||||
backend_pid: 42,
|
||||
cancel_key: 41,
|
||||
};
|
||||
let uuid = uuid::Uuid::new_v4();
|
||||
let msg = Notification::Cancel(CancelSession {
|
||||
cancel_key_data,
|
||||
region_id: None,
|
||||
session_id: uuid,
|
||||
});
|
||||
let text = serde_json::to_string(&msg)?;
|
||||
let result: Notification = serde_json::from_str(&text)?;
|
||||
assert_eq!(msg, result);
|
||||
|
||||
let msg = Notification::Cancel(CancelSession {
|
||||
cancel_key_data,
|
||||
region_id: Some("region".to_string()),
|
||||
session_id: uuid,
|
||||
});
|
||||
let text = serde_json::to_string(&msg)?;
|
||||
let result: Notification = serde_json::from_str(&text)?;
|
||||
assert_eq!(msg, result,);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
80
proxy/src/redis/publisher.rs
Normal file
80
proxy/src/redis/publisher.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use pq_proto::CancelKeyData;
|
||||
use redis::AsyncCommands;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter};
|
||||
|
||||
use super::notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME};
|
||||
|
||||
pub struct RedisPublisherClient {
|
||||
client: redis::Client,
|
||||
publisher: Option<redis::aio::Connection>,
|
||||
region_id: String,
|
||||
limiter: RedisRateLimiter,
|
||||
}
|
||||
|
||||
impl RedisPublisherClient {
|
||||
pub fn new(
|
||||
url: &str,
|
||||
region_id: String,
|
||||
info: &'static [RateBucketInfo],
|
||||
) -> anyhow::Result<Self> {
|
||||
let client = redis::Client::open(url)?;
|
||||
Ok(Self {
|
||||
client,
|
||||
publisher: None,
|
||||
region_id,
|
||||
limiter: RedisRateLimiter::new(info),
|
||||
})
|
||||
}
|
||||
pub async fn try_publish(
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
) -> anyhow::Result<()> {
|
||||
if !self.limiter.check() {
|
||||
tracing::info!("Rate limit exceeded. Skipping cancellation message");
|
||||
return Err(anyhow::anyhow!("Rate limit exceeded"));
|
||||
}
|
||||
match self.publish(cancel_key_data, session_id).await {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(e) => {
|
||||
tracing::error!("failed to publish a message: {e}");
|
||||
self.publisher = None;
|
||||
}
|
||||
}
|
||||
tracing::info!("Publisher is disconnected. Reconnectiong...");
|
||||
self.try_connect().await?;
|
||||
self.publish(cancel_key_data, session_id).await
|
||||
}
|
||||
|
||||
async fn publish(
|
||||
&mut self,
|
||||
cancel_key_data: CancelKeyData,
|
||||
session_id: Uuid,
|
||||
) -> anyhow::Result<()> {
|
||||
let conn = self
|
||||
.publisher
|
||||
.as_mut()
|
||||
.ok_or_else(|| anyhow::anyhow!("not connected"))?;
|
||||
let payload = serde_json::to_string(&Notification::Cancel(CancelSession {
|
||||
region_id: Some(self.region_id.clone()),
|
||||
cancel_key_data,
|
||||
session_id,
|
||||
}))?;
|
||||
conn.publish(PROXY_CHANNEL_NAME, payload).await?;
|
||||
Ok(())
|
||||
}
|
||||
pub async fn try_connect(&mut self) -> anyhow::Result<()> {
|
||||
match self.client.get_async_connection().await {
|
||||
Ok(conn) => {
|
||||
self.publisher = Some(conn);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to connect to redis: {e}");
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -24,7 +24,7 @@ use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
|
||||
use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::serverless::backend::PoolingBackend;
|
||||
use crate::{cancellation::CancelMap, config::ProxyConfig};
|
||||
use crate::{cancellation::CancellationHandler, config::ProxyConfig};
|
||||
use futures::StreamExt;
|
||||
use hyper::{
|
||||
server::{
|
||||
@@ -50,6 +50,7 @@ pub async fn task_main(
|
||||
ws_listener: TcpListener,
|
||||
cancellation_token: CancellationToken,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
) -> anyhow::Result<()> {
|
||||
scopeguard::defer! {
|
||||
info!("websocket server has shut down");
|
||||
@@ -115,7 +116,7 @@ pub async fn task_main(
|
||||
let backend = backend.clone();
|
||||
let ws_connections = ws_connections.clone();
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
|
||||
let cancellation_handler = cancellation_handler.clone();
|
||||
async move {
|
||||
let peer_addr = match client_addr {
|
||||
Some(addr) => addr,
|
||||
@@ -127,9 +128,9 @@ pub async fn task_main(
|
||||
let backend = backend.clone();
|
||||
let ws_connections = ws_connections.clone();
|
||||
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
|
||||
let cancellation_handler = cancellation_handler.clone();
|
||||
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
|
||||
request_handler(
|
||||
@@ -137,7 +138,7 @@ pub async fn task_main(
|
||||
config,
|
||||
backend,
|
||||
ws_connections,
|
||||
cancel_map,
|
||||
cancellation_handler,
|
||||
session_id,
|
||||
peer_addr.ip(),
|
||||
endpoint_rate_limiter,
|
||||
@@ -205,7 +206,7 @@ async fn request_handler(
|
||||
config: &'static ProxyConfig,
|
||||
backend: Arc<PoolingBackend>,
|
||||
ws_connections: TaskTracker,
|
||||
cancel_map: Arc<CancelMap>,
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
session_id: uuid::Uuid,
|
||||
peer_addr: IpAddr,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
@@ -232,7 +233,7 @@ async fn request_handler(
|
||||
config,
|
||||
ctx,
|
||||
websocket,
|
||||
cancel_map,
|
||||
cancellation_handler,
|
||||
host,
|
||||
endpoint_rate_limiter,
|
||||
)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
cancellation::CancelMap,
|
||||
cancellation::CancellationHandler,
|
||||
config::ProxyConfig,
|
||||
context::RequestMonitoring,
|
||||
error::{io_error, ReportableError},
|
||||
@@ -133,7 +133,7 @@ pub async fn serve_websocket(
|
||||
config: &'static ProxyConfig,
|
||||
mut ctx: RequestMonitoring,
|
||||
websocket: HyperWebsocket,
|
||||
cancel_map: Arc<CancelMap>,
|
||||
cancellation_handler: Arc<CancellationHandler>,
|
||||
hostname: Option<String>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
) -> anyhow::Result<()> {
|
||||
@@ -141,7 +141,7 @@ pub async fn serve_websocket(
|
||||
let res = handle_client(
|
||||
config,
|
||||
&mut ctx,
|
||||
cancel_map,
|
||||
cancellation_handler,
|
||||
WebSocketRw::new(websocket),
|
||||
ClientMode::Websockets { hostname },
|
||||
endpoint_rate_limiter,
|
||||
|
||||
@@ -1,11 +1,7 @@
|
||||
#![allow(unused)]
|
||||
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use hex::FromHex;
|
||||
use pageserver::tenant::Tenant;
|
||||
use reqwest::{header, Client, StatusCode, Url};
|
||||
use serde::Deserialize;
|
||||
use tokio::sync::Semaphore;
|
||||
@@ -290,7 +286,7 @@ impl CloudAdminApiClient {
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
continue;
|
||||
}
|
||||
status => {
|
||||
_status => {
|
||||
return Err(Error::new(
|
||||
"List active projects".to_string(),
|
||||
ErrorKind::ResponseStatus(response.status()),
|
||||
|
||||
@@ -116,6 +116,9 @@ pub struct SharedState {
|
||||
/// when tli is inactive instead of having this flag.
|
||||
active: bool,
|
||||
last_removed_segno: XLogSegNo,
|
||||
/// True if local and remote deletion has been done, allows to skip visiting
|
||||
/// s3 on retries.
|
||||
fully_deleted: bool,
|
||||
}
|
||||
|
||||
impl SharedState {
|
||||
@@ -155,6 +158,7 @@ impl SharedState {
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
last_removed_segno: 0,
|
||||
fully_deleted: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -174,6 +178,7 @@ impl SharedState {
|
||||
wal_backup_active: false,
|
||||
active: false,
|
||||
last_removed_segno: 0,
|
||||
fully_deleted: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -482,6 +487,10 @@ impl Timeline {
|
||||
shared_state: &mut MutexGuard<'_, SharedState>,
|
||||
only_local: bool,
|
||||
) -> Result<(bool, bool)> {
|
||||
if shared_state.fully_deleted {
|
||||
return Ok((false, false));
|
||||
};
|
||||
|
||||
let was_active = shared_state.active;
|
||||
self.cancel(shared_state);
|
||||
|
||||
@@ -496,6 +505,9 @@ impl Timeline {
|
||||
wal_backup::delete_timeline(&self.ttid).await?;
|
||||
}
|
||||
let dir_existed = delete_dir(&self.timeline_dir).await?;
|
||||
if !only_local {
|
||||
shared_state.fully_deleted = true;
|
||||
}
|
||||
Ok((dir_existed, was_active))
|
||||
}
|
||||
|
||||
|
||||
@@ -96,5 +96,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
|
||||
"pageserver_evictions_total",
|
||||
"pageserver_evictions_with_low_residence_duration_total",
|
||||
*PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS,
|
||||
# "pageserver_directory_entries_count", -- only used if above a certain threshold
|
||||
# "pageserver_broken_tenants_count" -- used only for broken
|
||||
)
|
||||
|
||||
@@ -3967,27 +3967,24 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
|
||||
|
||||
# pg is the existing and running compute node, that we want to compare with a basebackup
|
||||
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint: Endpoint):
|
||||
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
|
||||
|
||||
# Get the timeline ID. We need it for the 'basebackup' command
|
||||
timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
|
||||
|
||||
# many tests already checkpoint, but do it just in case
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
cur.execute("CHECKPOINT")
|
||||
|
||||
# wait for pageserver to catch up
|
||||
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
|
||||
# stop postgres to ensure that files won't change
|
||||
endpoint.stop()
|
||||
|
||||
# Read the shutdown checkpoint's LSN
|
||||
pg_controldata_path = os.path.join(pg_bin.pg_bin_path, "pg_controldata")
|
||||
cmd = f"{pg_controldata_path} -D {endpoint.pgdata_dir}"
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
|
||||
checkpoint_lsn = re.findall(
|
||||
"Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
|
||||
)[0]
|
||||
log.debug(f"last checkpoint at {checkpoint_lsn}")
|
||||
|
||||
# Take a basebackup from pageserver
|
||||
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
|
||||
restored_dir_path.mkdir(exist_ok=True)
|
||||
|
||||
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
|
||||
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
|
||||
|
||||
pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
|
||||
@@ -3995,7 +3992,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
|
||||
{psql_path} \
|
||||
--no-psqlrc \
|
||||
postgres://localhost:{env.get_pageserver(pageserver_id).service_port.pg} \
|
||||
-c 'basebackup {endpoint.tenant_id} {timeline_id} {checkpoint_lsn}' \
|
||||
-c 'basebackup {endpoint.tenant_id} {timeline_id}' \
|
||||
| tar -x -C {restored_dir_path}
|
||||
"""
|
||||
|
||||
|
||||
@@ -893,37 +893,14 @@ def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
|
||||
# in its heatmap
|
||||
ps_secondary.http_client().tenant_secondary_download(tenant_id)
|
||||
|
||||
# Configure the secondary pageserver to have a phony small disk size
|
||||
ps_secondary.stop()
|
||||
total_size, _, _ = env.timelines_du(ps_secondary)
|
||||
blocksize = 512
|
||||
total_blocks = (total_size + (blocksize - 1)) // blocksize
|
||||
evict_bytes = total_size // 3
|
||||
|
||||
min_avail_bytes = total_size // 3
|
||||
|
||||
env.pageserver_start_with_disk_usage_eviction(
|
||||
ps_secondary,
|
||||
period="1s",
|
||||
max_usage_pct=100,
|
||||
min_avail_bytes=min_avail_bytes,
|
||||
mock_behavior={
|
||||
"type": "Success",
|
||||
"blocksize": blocksize,
|
||||
"total_blocks": total_blocks,
|
||||
# Only count layer files towards used bytes in the mock_statvfs.
|
||||
# This avoids accounting for metadata files & tenant conf in the tests.
|
||||
"name_filter": ".*__.*",
|
||||
},
|
||||
eviction_order=EvictionOrder.ABSOLUTE_ORDER,
|
||||
)
|
||||
|
||||
def relieved_log_message():
|
||||
assert ps_secondary.log_contains(".*disk usage pressure relieved")
|
||||
|
||||
wait_until(10, 1, relieved_log_message)
|
||||
response = ps_secondary.http_client().disk_usage_eviction_run({"evict_bytes": evict_bytes})
|
||||
log.info(f"{response}")
|
||||
|
||||
post_eviction_total_size, _, _ = env.timelines_du(ps_secondary)
|
||||
|
||||
assert (
|
||||
total_size - post_eviction_total_size >= min_avail_bytes
|
||||
), "we requested at least min_avail_bytes worth of free space"
|
||||
total_size - post_eviction_total_size >= evict_bytes
|
||||
), "we requested at least evict_bytes worth of free space"
|
||||
|
||||
28
test_runner/sql_regress/expected/neon-test-utils.out
Normal file
28
test_runner/sql_regress/expected/neon-test-utils.out
Normal file
@@ -0,0 +1,28 @@
|
||||
-- Test the test utils in pgxn/neon_test_utils. We don't test that
|
||||
-- these actually consume resources like they should - that would be
|
||||
-- tricky - but at least we check that they don't crash.
|
||||
CREATE EXTENSION neon_test_utils;
|
||||
select test_consume_cpu(1);
|
||||
test_consume_cpu
|
||||
------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_consume_memory(20); -- Allocate 20 MB
|
||||
test_consume_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_release_memory(5); -- Release 5 MB
|
||||
test_release_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
select test_release_memory(); -- Release the remaining 15 MB
|
||||
test_release_memory
|
||||
---------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
@@ -7,4 +7,5 @@
|
||||
test: neon-cid
|
||||
test: neon-rel-truncate
|
||||
test: neon-clog
|
||||
test: neon-test-utils
|
||||
test: neon-vacuum-full
|
||||
|
||||
11
test_runner/sql_regress/sql/neon-test-utils.sql
Normal file
11
test_runner/sql_regress/sql/neon-test-utils.sql
Normal file
@@ -0,0 +1,11 @@
|
||||
-- Test the test utils in pgxn/neon_test_utils. We don't test that
|
||||
-- these actually consume resources like they should - that would be
|
||||
-- tricky - but at least we check that they don't crash.
|
||||
|
||||
CREATE EXTENSION neon_test_utils;
|
||||
|
||||
select test_consume_cpu(1);
|
||||
|
||||
select test_consume_memory(20); -- Allocate 20 MB
|
||||
select test_release_memory(5); -- Release 5 MB
|
||||
select test_release_memory(); -- Release the remaining 15 MB
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 018fb05201...9dd9956c55
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 6ee78a3c29...ca2def9993
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 550cdd26d4...9c37a49884
6
vendor/revisions.json
vendored
6
vendor/revisions.json
vendored
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361",
|
||||
"postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9",
|
||||
"postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1"
|
||||
"postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
|
||||
"postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
|
||||
"postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ futures-io = { version = "0.3" }
|
||||
futures-sink = { version = "0.3" }
|
||||
futures-util = { version = "0.3", features = ["channel", "io", "sink"] }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }
|
||||
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["raw"] }
|
||||
hashbrown-594e8ee84c453af0 = { package = "hashbrown", version = "0.13", features = ["raw"] }
|
||||
hex = { version = "0.4", features = ["serde"] }
|
||||
hmac = { version = "0.12", default-features = false, features = ["reset"] }
|
||||
@@ -91,7 +91,7 @@ cc = { version = "1", default-features = false, features = ["parallel"] }
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] }
|
||||
either = { version = "1" }
|
||||
getrandom = { version = "0.2", default-features = false, features = ["std"] }
|
||||
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] }
|
||||
hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["raw"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
itertools = { version = "0.10" }
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
|
||||
Reference in New Issue
Block a user