Compare commits


2 Commits

Author          SHA1        Message                Date
Conrad Ludgate  0d74bc0492  fix exit condition     2024-02-14 15:56:50 +00:00
Conrad Ludgate  c994c80962  add redis stream test  2024-02-14 14:54:01 +00:00
39 changed files with 238 additions and 599 deletions

View File

@@ -820,10 +820,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_test_utils \
-s install && \
make -j $(getconf _NPROCESSORS_ONLN) \
PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-C pgxn/neon_rmgr \

View File

@@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::env;
use std::fs;
use std::io::BufRead;
use std::os::unix::fs::{symlink, PermissionsExt};
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
@@ -634,48 +634,6 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
// Place pg_dynshmem under /dev/shm. This allows us to use
// 'dynamic_shared_memory_type = mmap' so that the files are placed in
// /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works.
//
// Why on earth don't we just stick to the 'posix' default, you might
// ask. It turns out that making large allocations with 'posix' doesn't
// work very well with autoscaling. The behavior we want is that:
//
// 1. You can make large DSM allocations, larger than the current RAM
// size of the VM, without errors
//
// 2. If the allocated memory is really used, the VM is scaled up
// automatically to accommodate that
//
// We try to make that possible by having swap in the VM. But with the
// default 'posix' DSM implementation, we fail step 1, even when there's
// plenty of swap available. PostgreSQL uses posix_fallocate() to create
// the shmem segment, which is really just a file in /dev/shm in Linux,
// but posix_fallocate() on tmpfs returns ENOMEM if the size is larger
// than available RAM.
//
// Using 'dynamic_shared_memory_type = mmap' works around that, because
// the Postgres 'mmap' DSM implementation doesn't use
// posix_fallocate(). Instead, it uses repeated calls to write(2) to
// fill the file with zeros. It's weird that that differs between
// 'posix' and 'mmap', but we take advantage of it. When the file is
// filled slowly with write(2), the kernel allows it to grow larger, as
// long as there's swap available.
//
// In short, using 'dynamic_shared_memory_type = mmap' allows a DSM
// segment to be larger than currently available RAM. But because we
// don't want to store it in a real file, which the kernel would try to
// flush to disk, we symlink pg_dynshmem to /dev/shm.
//
// We don't set 'dynamic_shared_memory_type = mmap' here, we let the
// control plane control that option. If 'mmap' is not used, this
// symlink doesn't affect anything.
//
// See https://github.com/neondatabase/autoscaling/issues/800
std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?;
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Replica | ComputeMode::Static(..) => {
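
Aside, not part of the changeset: the comment removed in this hunk is the clearest description of the pg_dynshmem trick, so a minimal standalone sketch of the behaviour it relies on follows. It is illustrative only and assumes Linux, a tmpfs mounted at /dev/shm, the libc crate, and a requested size larger than available RAM; the actual outcome also depends on the tmpfs mount size and on how much swap is configured.

use std::fs::File;
use std::io::Write;
use std::os::unix::io::AsRawFd;

fn main() -> std::io::Result<()> {
    // Pretend this is larger than the VM's current RAM.
    let size: i64 = 8 * 1024 * 1024 * 1024;

    // 'dynamic_shared_memory_type = posix': PostgreSQL sizes the segment with
    // posix_fallocate(), which on tmpfs returns ENOMEM if the size exceeds
    // available RAM, even when swap could back the pages later.
    let f = File::create("/dev/shm/fallocate_demo")?;
    let rc = unsafe { libc::posix_fallocate(f.as_raw_fd(), 0, size as libc::off_t) };
    println!("posix_fallocate returned {rc} (ENOMEM is {})", libc::ENOMEM);

    // 'dynamic_shared_memory_type = mmap': the segment file is filled gradually
    // with write(2), so the kernel lets it grow and can page it out to swap
    // instead of rejecting the allocation up front.
    let mut f = File::create("/dev/shm/write_demo")?;
    let zeros = vec![0u8; 1 << 20];
    let mut written: i64 = 0;
    while written < size {
        f.write_all(&zeros)?;
        written += zeros.len() as i64;
    }
    println!("filled {written} bytes with write()");
    Ok(())
}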

View File

@@ -381,6 +381,7 @@ impl Persistence {
//
// We create the child shards here, so that they will be available for increment_generation calls
// if some pageserver holding a child shard needs to restart before the overall tenant split is complete.
#[allow(dead_code)]
pub(crate) async fn begin_shard_split(
&self,
old_shard_count: ShardCount,
@@ -448,6 +449,7 @@ impl Persistence {
// When we finish shard splitting, we must atomically clean up the old shards
// and insert the new shards, and clear the splitting marker.
#[allow(dead_code)]
pub(crate) async fn complete_shard_split(
&self,
split_tenant_id: TenantId,

View File

@@ -115,6 +115,7 @@ pub fn set_build_info_metric(revision: &str, build_tag: &str) {
// performed by the process.
// We know the size of the block, so we can determine the I/O bytes out of it.
// The value might be not 100% exact, but should be fine for Prometheus metrics in this case.
#[allow(clippy::unnecessary_cast)]
fn update_rusage_metrics() {
let rusage_stats = get_rusage_stats();
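
Aside, not part of the changeset: the hunk only shows the clippy allow being added, so a hedged sketch of the conversion the comment above describes may help. The function body and names are illustrative rather than the pageserver's real code; it assumes the libc crate and the conventional 512-byte unit that getrusage(2) uses for ru_inblock/ru_oublock on Linux.

/// Illustrative only: turn getrusage(2) block counts into I/O byte totals.
fn rusage_io_bytes() -> (u64, u64) {
    let mut usage: libc::rusage = unsafe { std::mem::zeroed() };
    let rc = unsafe { libc::getrusage(libc::RUSAGE_SELF, &mut usage) };
    assert_eq!(rc, 0, "getrusage failed");
    // ru_inblock / ru_oublock count 512-byte blocks, so multiply to get bytes.
    const BLOCK_SIZE: u64 = 512;
    (
        usage.ru_inblock as u64 * BLOCK_SIZE, // bytes read by the process
        usage.ru_oublock as u64 * BLOCK_SIZE, // bytes written by the process
    )
}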

View File

@@ -3,7 +3,7 @@
#![allow(non_snake_case)]
// bindgen creates some unsafe code with no doc comments.
#![allow(clippy::missing_safety_doc)]
// noted at 1.63 that in many cases there's u32 -> u32 transmutes in bindgen code.
// noted at 1.63 that in many cases there's a u32 -> u32 transmutes in bindgen code.
#![allow(clippy::useless_transmute)]
// modules included with the postgres_ffi macro depend on the types of the specific version's
// types, and trigger a too eager lint.

View File

@@ -435,6 +435,7 @@ impl RemoteStorage for LocalFs {
Ok(())
}
#[allow(clippy::diverging_sub_expression)]
async fn time_travel_recover(
&self,
_prefix: Option<&RemotePath>,

View File

@@ -1,3 +1,5 @@
#![allow(unused)]
use criterion::{criterion_group, criterion_main, Criterion};
use utils::id;

View File

@@ -834,6 +834,7 @@ mod test {
}
impl ControlPlaneGenerationsApi for MockControlPlane {
#[allow(clippy::diverging_sub_expression)] // False positive via async_trait
async fn re_attach(&self) -> Result<HashMap<TenantShardId, Generation>, RetryForeverError> {
unimplemented!()
}

View File

@@ -351,6 +351,7 @@ pub enum IterationOutcome<U> {
Finished(IterationOutcomeFinished<U>),
}
#[allow(dead_code)]
#[derive(Debug, Serialize)]
pub struct IterationOutcomeFinished<U> {
/// The actual usage observed before we started the iteration.
@@ -365,6 +366,7 @@ pub struct IterationOutcomeFinished<U> {
}
#[derive(Debug, Serialize)]
#[allow(dead_code)]
struct AssumedUsage<U> {
/// The expected value for `after`, after phase 2.
projected_after: U,
@@ -372,12 +374,14 @@ struct AssumedUsage<U> {
failed: LayerCount,
}
#[allow(dead_code)]
#[derive(Debug, Serialize)]
struct PlannedUsage<U> {
respecting_tenant_min_resident_size: U,
fallback_to_global_lru: Option<U>,
}
#[allow(dead_code)]
#[derive(Debug, Default, Serialize)]
struct LayerCount {
file_sizes: u64,
@@ -561,6 +565,7 @@ pub(crate) struct EvictionSecondaryLayer {
#[derive(Clone)]
pub(crate) enum EvictionLayer {
Attached(Layer),
#[allow(dead_code)]
Secondary(EvictionSecondaryLayer),
}
@@ -1100,6 +1105,7 @@ mod filesystem_level_usage {
use super::DiskUsageEvictionTaskConfig;
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
pub struct Usage<'a> {
config: &'a DiskUsageEvictionTaskConfig,

View File

@@ -2214,7 +2214,7 @@ pub fn make_router(
)
.get(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|r| api_handler(r, timeline_collect_keyspace),
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
)
.put("/v1/io_engine", |r| api_handler(r, put_io_engine_handler))
.any(handler_404))

View File

@@ -156,7 +156,6 @@ impl Timeline {
pending_updates: HashMap::new(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_aux_files: None,
pending_directory_entries: Vec::new(),
lsn,
}
@@ -871,14 +870,6 @@ pub struct DatadirModification<'a> {
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
// If we already wrote any aux file changes in this modification, stash the latest dir. If set,
// [`Self::put_file`] may assume that it is safe to emit a delta rather than checking
// if AUX_FILES_KEY is already set.
pending_aux_files: Option<AuxFilesDirectory>,
/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
}
@@ -1393,76 +1384,31 @@ impl<'a> DatadirModification<'a> {
content: &[u8],
ctx: &RequestContext,
) -> anyhow::Result<()> {
let file_path = path.to_string();
let content = if content.is_empty() {
None
} else {
Some(Bytes::copy_from_slice(content))
};
let dir = if let Some(mut dir) = self.pending_aux_files.take() {
// We already updated aux files in `self`: emit a delta and update our latest value
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir
} else {
// Check if the AUX_FILES_KEY is initialized
match self.get(AUX_FILES_KEY, ctx).await {
Ok(dir_bytes) => {
let mut dir = AuxFilesDirectory::des(&dir_bytes)?;
// Key is already set, we may append a delta
self.put(
AUX_FILES_KEY,
Value::WalRecord(NeonWalRecord::AuxFile {
file_path: file_path.clone(),
content: content.clone(),
}),
);
dir.upsert(file_path, content);
dir
}
Err(
e @ (PageReconstructError::AncestorStopping(_)
| PageReconstructError::Cancelled
| PageReconstructError::AncestorLsnTimeout(_)),
) => {
// Important that we do not interpret a shutdown error as "not found" and thereby
// reset the map.
return Err(e.into());
}
// FIXME: PageReconstructError doesn't have an explicit variant for key-not-found, so
// we are assuming that all _other_ possible errors represents a missing key. If some
// other error occurs, we may incorrectly reset the map of aux files.
Err(PageReconstructError::Other(_) | PageReconstructError::WalRedo(_)) => {
// Key is missing, we must insert an image as the basis for subsequent deltas.
let mut dir = AuxFilesDirectory {
files: HashMap::new(),
};
dir.upsert(file_path, content);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
dir
let mut dir = match self.get(AUX_FILES_KEY, ctx).await {
Ok(buf) => AuxFilesDirectory::des(&buf)?,
Err(e) => {
// This is expected: historical databases do not have the key.
debug!("Failed to get info about AUX files: {}", e);
AuxFilesDirectory {
files: HashMap::new(),
}
}
};
let path = path.to_string();
if content.is_empty() {
dir.files.remove(&path);
} else {
dir.files.insert(path, Bytes::copy_from_slice(content));
}
self.pending_directory_entries
.push((DirectoryKind::AuxFiles, dir.files.len()));
self.pending_aux_files = Some(dir);
self.put(
AUX_FILES_KEY,
Value::Image(Bytes::from(
AuxFilesDirectory::ser(&dir).context("serialize")?,
)),
);
Ok(())
}
@@ -1672,18 +1618,8 @@ struct RelDirectory {
}
#[derive(Debug, Serialize, Deserialize, Default)]
pub(crate) struct AuxFilesDirectory {
pub(crate) files: HashMap<String, Bytes>,
}
impl AuxFilesDirectory {
pub(crate) fn upsert(&mut self, key: String, value: Option<Bytes>) {
if let Some(value) = value {
self.files.insert(key, value);
} else {
self.files.remove(&key);
}
}
struct AuxFilesDirectory {
files: HashMap<String, Bytes>,
}
#[derive(Debug, Serialize, Deserialize)]
@@ -1719,60 +1655,8 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
#[allow(clippy::bool_assert_comparison)]
#[cfg(test)]
mod tests {
use hex_literal::hex;
use utils::id::TimelineId;
use super::*;
use crate::{tenant::harness::TenantHarness, DEFAULT_PG_VERSION};
/// Test a round trip of aux file updates, from DatadirModification to reading back from the Timeline
#[tokio::test]
async fn aux_files_round_trip() -> anyhow::Result<()> {
let name = "aux_files_round_trip";
let harness = TenantHarness::create(name)?;
pub const TIMELINE_ID: TimelineId =
TimelineId::from_array(hex!("11223344556677881122334455667788"));
let (tenant, ctx) = harness.load().await;
let tline = tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = tline.raw_timeline().unwrap();
// First modification: insert two keys
let mut modification = tline.begin_modification(Lsn(0x1000));
modification.put_file("foo/bar1", b"content1", &ctx).await?;
modification.set_lsn(Lsn(0x1008))?;
modification.put_file("foo/bar2", b"content2", &ctx).await?;
modification.commit(&ctx).await?;
let expect_1008 = HashMap::from([
("foo/bar1".to_string(), Bytes::from_static(b"content1")),
("foo/bar2".to_string(), Bytes::from_static(b"content2")),
]);
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
assert_eq!(readback, expect_1008);
// Second modification: update one key, remove the other
let mut modification = tline.begin_modification(Lsn(0x2000));
modification.put_file("foo/bar1", b"content3", &ctx).await?;
modification.set_lsn(Lsn(0x2008))?;
modification.put_file("foo/bar2", b"", &ctx).await?;
modification.commit(&ctx).await?;
let expect_2008 =
HashMap::from([("foo/bar1".to_string(), Bytes::from_static(b"content3"))]);
let readback = tline.list_aux_files(Lsn(0x2008), &ctx).await?;
assert_eq!(readback, expect_2008);
// Reading back in time works
let readback = tline.list_aux_files(Lsn(0x1008), &ctx).await?;
assert_eq!(readback, expect_1008);
Ok(())
}
//use super::repo_harness::*;
//use super::*;
/*
fn assert_current_logical_size<R: Repository>(timeline: &DatadirTimeline<R>, lsn: Lsn) {

View File

@@ -30,6 +30,10 @@
//! only a single tenant or timeline.
//!
// Clippy 1.60 incorrectly complains about the tokio::task_local!() macro.
// Silence it. See https://github.com/rust-lang/rust-clippy/issues/9224.
#![allow(clippy::declare_interior_mutable_const)]
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
@@ -308,6 +312,7 @@ struct MutableTaskState {
}
struct PageServerTask {
#[allow(dead_code)] // unused currently
task_id: PageserverTaskId,
kind: TaskKind,

View File

@@ -3914,7 +3914,6 @@ pub(crate) mod harness {
use utils::lsn::Lsn;
use crate::deletion_queue::mock::MockDeletionQueue;
use crate::walredo::apply_neon;
use crate::{
config::PageServerConf, repository::Key, tenant::Tenant, walrecord::NeonWalRecord,
};
@@ -4174,34 +4173,20 @@ pub(crate) mod harness {
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> anyhow::Result<Bytes> {
let records_neon = records.iter().all(|r| apply_neon::can_apply_in_neon(&r.1));
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,
lsn,
if base_img.is_some() {
"base image"
} else {
"no base image"
},
records.len()
);
println!("{s}");
if records_neon {
// For Neon wal records, we can decode without spawning postgres, so do so.
let base_img = base_img.expect("Neon WAL redo requires base image").1;
let mut page = BytesMut::new();
page.extend_from_slice(&base_img);
for (_record_lsn, record) in records {
apply_neon::apply_in_neon(&record, key, &mut page)?;
}
Ok(page.freeze())
} else {
// We never spawn a postgres walredo process in unit tests: just log what we might have done.
let s = format!(
"redo for {} to get to {}, with {} and {} records",
key,
lsn,
if base_img.is_some() {
"base image"
} else {
"no base image"
},
records.len()
);
println!("{s}");
Ok(TEST_IMG(&s))
}
Ok(TEST_IMG(&s))
}
}
}
@@ -4373,6 +4358,7 @@ mod tests {
ctx: &RequestContext,
) -> anyhow::Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
let writer = tline.writer().await;
// Create a relation on the timeline

View File

@@ -36,6 +36,7 @@ use crate::{
pub const VALUE_SZ: usize = 5;
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
#[allow(dead_code)]
pub const PAGE_SZ: usize = 8192;
#[derive(Clone, Copy, Debug)]

View File

@@ -6,7 +6,6 @@ use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::{self, VirtualFile};
use bytes::BytesMut;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use std::cmp::min;
@@ -27,10 +26,7 @@ pub struct EphemeralFile {
/// An ephemeral file is append-only.
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
/// The other pages, which can no longer be modified, are accessed through the page cache.
///
/// None <=> IO is ongoing.
/// Size is fixed to PAGE_SZ at creation time and must not be changed.
mutable_tail: Option<BytesMut>,
mutable_tail: [u8; PAGE_SZ],
}
impl EphemeralFile {
@@ -64,7 +60,7 @@ impl EphemeralFile {
_timeline_id: timeline_id,
file,
len: 0,
mutable_tail: Some(BytesMut::zeroed(PAGE_SZ)),
mutable_tail: [0u8; PAGE_SZ],
})
}
@@ -107,13 +103,7 @@ impl EphemeralFile {
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(
self.mutable_tail
.as_deref()
.expect("we're not doing IO, it must be Some()")
.try_into()
.expect("we ensure that it's always PAGE_SZ"),
))
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
}
}
@@ -145,27 +135,21 @@ impl EphemeralFile {
) -> Result<(), io::Error> {
let mut src_remaining = src;
while !src_remaining.is_empty() {
let dst_remaining = &mut self
.ephemeral_file
.mutable_tail
.as_deref_mut()
.expect("IO is not yet ongoing")[self.off..];
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
let n = min(dst_remaining.len(), src_remaining.len());
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
let mutable_tail = std::mem::take(&mut self.ephemeral_file.mutable_tail)
.expect("IO is not yet ongoing");
let (mutable_tail, res) = self
match self
.ephemeral_file
.file
.write_all_at(mutable_tail, self.blknum as u64 * PAGE_SZ as u64)
.await;
// TODO: If we panic before we can put the mutable_tail back, subsequent calls will fail.
// I.e., the IO isn't retryable if we panic.
self.ephemeral_file.mutable_tail = Some(mutable_tail);
match res {
.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
)
.await
{
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
@@ -185,12 +169,7 @@ impl EphemeralFile {
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(
self.ephemeral_file
.mutable_tail
.as_deref()
.expect("IO is not ongoing"),
);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
let _ = write_guard.mark_valid();
// pre-warm successful
}
@@ -202,11 +181,7 @@ impl EphemeralFile {
// Zero the buffer for re-use.
// Zeroing is critical for correctness because the write_blob code below
// and similarly read_blk expect zeroed pages.
self.ephemeral_file
.mutable_tail
.as_deref_mut()
.expect("IO is not ongoing")
.fill(0);
self.ephemeral_file.mutable_tail.fill(0);
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;

View File

@@ -196,13 +196,13 @@ impl Timeline {
ControlFlow::Continue(()) => (),
}
#[allow(dead_code)]
#[derive(Debug, Default)]
struct EvictionStats {
candidates: usize,
evicted: usize,
errors: usize,
not_evictable: usize,
#[allow(dead_code)]
skipped_for_shutdown: usize,
}

View File

@@ -582,37 +582,24 @@ impl VirtualFile {
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
pub async fn write_all_at<B: BoundedBuf>(
&self,
buf: B,
mut offset: u64,
) -> (B::Buf, Result<(), Error>) {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(()));
}
let mut buf = buf.slice(0..buf_len);
pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
// TODO: push `buf` further down
match self.write_at(&buf, offset).await {
match self.write_at(buf, offset).await {
Ok(0) => {
return (
Slice::into_inner(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
}
Ok(n) => {
buf = buf.slice(n..);
buf = &buf[n..];
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return (Slice::into_inner(buf), Err(e)),
Err(e) => return Err(e),
}
}
(Slice::into_inner(buf), Ok(()))
Ok(())
}
/// Writes `buf.slice(0..buf.bytes_init())`.
@@ -1077,19 +1064,10 @@ mod tests {
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at<B: BoundedBuf>(&self, buf: B, offset: u64) -> Result<(), Error> {
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all_at(buf, offset).await;
res
}
MaybeVirtualFile::File(file) => {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return Ok(());
}
file.write_all_at(&buf.slice(0..buf_len), offset)
}
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
@@ -1236,8 +1214,8 @@ mod tests {
.to_owned(),
)
.await?;
file_b.write_all_at(b"BAR".to_vec(), 3).await?;
file_b.write_all_at(b"FOO".to_vec(), 0).await?;
file_b.write_all_at(b"BAR", 3).await?;
file_b.write_all_at(b"FOO", 0).await?;
assert_eq!(file_b.read_string_at(2, 3).await?, "OBA");

View File

@@ -44,11 +44,6 @@ pub enum NeonWalRecord {
moff: MultiXactOffset,
members: Vec<MultiXactMember>,
},
/// Update the map of AUX files, either writing or dropping an entry
AuxFile {
file_path: String,
content: Option<Bytes>,
},
}
impl NeonWalRecord {

View File

@@ -22,7 +22,7 @@
mod process;
/// Code to apply [`NeonWalRecord`]s.
pub(crate) mod apply_neon;
mod apply_neon;
use crate::config::PageServerConf;
use crate::metrics::{

View File

@@ -1,8 +1,7 @@
use crate::pgdatadir_mapping::AuxFilesDirectory;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, BytesMut};
use bytes::BytesMut;
use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
use pageserver_api::reltag::SlruKind;
use postgres_ffi::pg_constants;
@@ -13,7 +12,6 @@ use postgres_ffi::v14::nonrelfile_utils::{
};
use postgres_ffi::BLCKSZ;
use tracing::*;
use utils::bin_ser::BeSer;
/// Can this request be served by neon redo functions
/// or we need to pass it to wal-redo postgres process?
@@ -232,72 +230,6 @@ pub(crate) fn apply_in_neon(
LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
}
}
NeonWalRecord::AuxFile { file_path, content } => {
let mut dir = AuxFilesDirectory::des(page)?;
dir.upsert(file_path.clone(), content.clone());
page.clear();
let mut writer = page.writer();
dir.ser_into(&mut writer)?;
}
}
Ok(())
}
#[cfg(test)]
mod test {
use bytes::Bytes;
use pageserver_api::key::AUX_FILES_KEY;
use super::*;
use std::collections::HashMap;
use crate::{pgdatadir_mapping::AuxFilesDirectory, walrecord::NeonWalRecord};
/// Test [`apply_in_neon`]'s handling of NeonWalRecord::AuxFile
#[test]
fn apply_aux_file_deltas() -> anyhow::Result<()> {
let base_dir = AuxFilesDirectory {
files: HashMap::from([
("two".to_string(), Bytes::from_static(b"content0")),
("three".to_string(), Bytes::from_static(b"contentX")),
]),
};
let base_image = AuxFilesDirectory::ser(&base_dir)?;
let deltas = vec![
// Insert
NeonWalRecord::AuxFile {
file_path: "one".to_string(),
content: Some(Bytes::from_static(b"content1")),
},
// Update
NeonWalRecord::AuxFile {
file_path: "two".to_string(),
content: Some(Bytes::from_static(b"content99")),
},
// Delete
NeonWalRecord::AuxFile {
file_path: "three".to_string(),
content: None,
},
];
let file_path = AUX_FILES_KEY;
let mut page = BytesMut::from_iter(base_image);
for record in deltas {
apply_in_neon(&record, file_path, &mut page)?;
}
let reconstructed = AuxFilesDirectory::des(&page)?;
let expect = HashMap::from([
("one".to_string(), Bytes::from_static(b"content1")),
("two".to_string(), Bytes::from_static(b"content99")),
]);
assert_eq!(reconstructed.files, expect);
Ok(())
}
}

View File

@@ -7,24 +7,6 @@ AS 'MODULE_PATHNAME', 'test_consume_xids'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION test_consume_cpu(seconds int)
RETURNS VOID
AS 'MODULE_PATHNAME', 'test_consume_cpu'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION test_consume_memory(megabytes int)
RETURNS VOID
AS 'MODULE_PATHNAME', 'test_consume_memory'
LANGUAGE C STRICT
PARALLEL UNSAFE;
CREATE FUNCTION test_release_memory(megabytes int DEFAULT NULL)
RETURNS VOID
AS 'MODULE_PATHNAME', 'test_release_memory'
LANGUAGE C
PARALLEL UNSAFE;
CREATE FUNCTION clear_buffer_cache()
RETURNS VOID
AS 'MODULE_PATHNAME', 'clear_buffer_cache'

View File

@@ -3,4 +3,3 @@ comment = 'helpers for neon testing and debugging'
default_version = '1.0'
module_pathname = '$libdir/neon_test_utils'
relocatable = true
trusted = true

View File

@@ -21,12 +21,10 @@
#include "miscadmin.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "utils/wait_event.h"
#include "../neon/pagestore_client.h"
PG_MODULE_MAGIC;
@@ -34,9 +32,6 @@ PG_MODULE_MAGIC;
extern void _PG_init(void);
PG_FUNCTION_INFO_V1(test_consume_xids);
PG_FUNCTION_INFO_V1(test_consume_cpu);
PG_FUNCTION_INFO_V1(test_consume_memory);
PG_FUNCTION_INFO_V1(test_release_memory);
PG_FUNCTION_INFO_V1(clear_buffer_cache);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn);
PG_FUNCTION_INFO_V1(get_raw_page_at_lsn_ex);
@@ -102,119 +97,6 @@ test_consume_xids(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* test_consume_cpu(seconds int). Keeps one CPU busy for the given number of seconds.
*/
Datum
test_consume_cpu(PG_FUNCTION_ARGS)
{
int32 seconds = PG_GETARG_INT32(0);
TimestampTz start;
uint64 total_iterations = 0;
start = GetCurrentTimestamp();
for (;;)
{
TimestampTz elapsed;
elapsed = GetCurrentTimestamp() - start;
if (elapsed > (TimestampTz) seconds * USECS_PER_SEC)
break;
/* keep spinning */
for (int i = 0; i < 1000000; i++)
total_iterations++;
elog(DEBUG2, "test_consume_cpu(): %lu iterations in total", total_iterations);
CHECK_FOR_INTERRUPTS();
}
PG_RETURN_VOID();
}
static MemoryContext consume_cxt = NULL;
static slist_head consumed_memory_chunks;
static int64 num_memory_chunks;
/*
* test_consume_memory(megabytes int).
*
* Consume given amount of memory. The allocation is made in TopMemoryContext,
* so it outlives the function, until you call test_release_memory to
* explicitly release it, or close the session.
*/
Datum
test_consume_memory(PG_FUNCTION_ARGS)
{
int32 megabytes = PG_GETARG_INT32(0);
/*
* Consume the memory in a new memory context, so that it's convenient to
* release and to display it separately in a possible memory context dump.
*/
if (consume_cxt == NULL)
consume_cxt = AllocSetContextCreate(TopMemoryContext,
"test_consume_memory",
ALLOCSET_DEFAULT_SIZES);
for (int32 i = 0; i < megabytes; i++)
{
char *p;
p = MemoryContextAllocZero(consume_cxt, 1024 * 1024);
/* touch the memory, so that it's really allocated by the kernel */
for (int j = 0; j < 1024 * 1024; j += 1024)
p[j] = j % 0xFF;
slist_push_head(&consumed_memory_chunks, (slist_node *) p);
num_memory_chunks++;
}
PG_RETURN_VOID();
}
/*
* test_release_memory(megabytes int). NULL releases all
*/
Datum
test_release_memory(PG_FUNCTION_ARGS)
{
TimestampTz start;
if (PG_ARGISNULL(0))
{
if (consume_cxt)
{
MemoryContextDelete(consume_cxt);
consume_cxt = NULL;
num_memory_chunks = 0;
}
}
else
{
int32 chunks_to_release = PG_GETARG_INT32(0);
if (chunks_to_release > num_memory_chunks)
{
elog(WARNING, "only %lu MB is consumed, releasing it all", num_memory_chunks);
chunks_to_release = num_memory_chunks;
}
for (int32 i = 0; i < chunks_to_release; i++)
{
slist_node *chunk = slist_pop_head_node(&consumed_memory_chunks);
pfree(chunk);
num_memory_chunks--;
}
}
PG_RETURN_VOID();
}
/*
* Flush the buffer cache, evicting all pages that are not currently pinned.
*/

View File

@@ -234,11 +234,9 @@ async fn main() -> anyhow::Result<()> {
let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit));
let cancel_map = CancelMap::default();
let redis_publisher = match &args.redis_notifications {
Some(url) => Some(Arc::new(Mutex::new(RedisPublisherClient::new(
url,
args.region.clone(),
&config.redis_rps_limit,
)?))),
Some(url) => Some(Arc::new(Mutex::new(
RedisPublisherClient::new(url, args.region.clone(), &config.redis_rps_limit).await?,
))),
None => None,
};
let cancellation_handler = Arc::new(CancellationHandler::new(

View File

@@ -331,7 +331,6 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
compute: node,
req: _request_gauge,
conn: _client_gauge,
cancel: session,
}))
}

View File

@@ -122,24 +122,25 @@ where
error!(error = ?err, "could not connect to compute node");
let node_info = if !node_info.cached() {
// If we just received this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if !err.should_retry(num_retries) {
return Err(err.into());
}
node_info
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
ctx.latency_timer.cache_miss();
let old_node_info = invalidate_cache(node_info);
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
node_info.reuse_settings(old_node_info);
let node_info =
if err.get_error_kind() == crate::error::ErrorKind::Postgres || !node_info.cached() {
// If the error is Postgres, that means that we managed to connect to the compute node, but there was an error.
// Do not need to retrieve a new node_info, just return the old one.
if !err.should_retry(num_retries) {
return Err(err.into());
}
node_info
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
ctx.latency_timer.cache_miss();
let old_node_info = invalidate_cache(node_info);
let mut node_info = wake_compute(&mut num_retries, ctx, user_info).await?;
node_info.reuse_settings(old_node_info);
mechanism.update_connect_config(&mut node_info.config);
node_info
};
mechanism.update_connect_config(&mut node_info.config);
node_info
};
// now that we have a new node, try connect to it repeatedly.
// this can error for a few reasons, for instance:
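
Aside, not part of the changeset: the new branch condition above boils down to the small decision function below. The names are illustrative, not the proxy's real types; it only restates the logic visible in the hunk.

// Reuse the node we already have when the error came from Postgres itself
// (so the compute was reachable) or when node_info was freshly fetched from
// cplane; otherwise invalidate the cache and wake a new compute node.
enum NextStep {
    RetrySameNode,
    WakeNewCompute,
}

fn next_step(error_is_postgres: bool, node_info_was_cached: bool) -> NextStep {
    if error_is_postgres || !node_info_was_cached {
        NextStep::RetrySameNode
    } else {
        NextStep::WakeNewCompute
    }
}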

View File

@@ -1,5 +1,4 @@
use crate::{
cancellation,
compute::PostgresConnection,
console::messages::MetricsAuxInfo,
metrics::NUM_BYTES_PROXIED_COUNTER,
@@ -58,7 +57,6 @@ pub struct ProxyPassthrough<S> {
pub req: IntCounterPairGuard,
pub conn: IntCounterPairGuard,
pub cancel: cancellation::Session,
}
impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {

View File

@@ -375,6 +375,8 @@ enum ConnectAction {
Connect,
Retry,
Fail,
RetryPg,
FailPg,
}
#[derive(Clone)]
@@ -464,6 +466,14 @@ impl ConnectMechanism for TestConnectMechanism {
retryable: false,
kind: ErrorKind::Compute,
}),
ConnectAction::FailPg => Err(TestConnectError {
retryable: false,
kind: ErrorKind::Postgres,
}),
ConnectAction::RetryPg => Err(TestConnectError {
retryable: true,
kind: ErrorKind::Postgres,
}),
x => panic!("expecting action {:?}, connect is called instead", x),
}
}
@@ -562,6 +572,32 @@ async fn connect_to_compute_retry() {
mechanism.verify();
}
#[tokio::test]
async fn connect_to_compute_retry_pg() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, RetryPg, Connect]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap();
mechanism.verify();
}
#[tokio::test]
async fn connect_to_compute_fail_pg() {
let _ = env_logger::try_init();
use ConnectAction::*;
let mut ctx = RequestMonitoring::test();
let mechanism = TestConnectMechanism::new(vec![Wake, FailPg]);
let user_info = helper_create_connect_info(&mechanism);
connect_to_compute(&mut ctx, &mechanism, &user_info, false)
.await
.unwrap_err();
mechanism.verify();
}
/// Test that we don't retry if the error is not retryable.
#[tokio::test]
async fn connect_to_compute_non_retry_1() {

View File

@@ -1,5 +1,12 @@
use std::hash::BuildHasherDefault;
use lasso::{Spur, ThreadedRodeo};
use pq_proto::CancelKeyData;
use redis::AsyncCommands;
use redis::{
streams::{StreamReadOptions, StreamReadReply},
AsyncCommands, FromRedisValue,
};
use rustc_hash::FxHasher;
use uuid::Uuid;
use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter};
@@ -14,15 +21,74 @@ pub struct RedisPublisherClient {
}
impl RedisPublisherClient {
pub fn new(
pub async fn new(
url: &str,
region_id: String,
info: &'static [RateBucketInfo],
) -> anyhow::Result<Self> {
let client = redis::Client::open(url)?;
let mut conn = client.get_async_connection().await.unwrap();
let len: u64 = conn.xlen("neon:endpoints:testing").await.unwrap();
tracing::info!("starting with {len} endpoints in redis");
for i in len..300000u64 {
let _key: String = conn
.xadd(
"neon:endpoints:testing",
"*",
&[("endpoint_id", format!("ep-hello-world-{i}"))],
)
.await
.unwrap();
if i.is_power_of_two() {
tracing::debug!("endpoints written {i}");
}
// client.
}
tracing::info!("written endpoints to redis");
// start reading
let s = ThreadedRodeo::<Spur, BuildHasherDefault<FxHasher>>::with_hasher(
BuildHasherDefault::default(),
);
let opts = StreamReadOptions::default().count(1000);
let mut id = "0-0".to_string();
loop {
let mut res: StreamReadReply = conn
.xread_options(&["neon:endpoints:testing"], &[&id], &opts)
.await
.unwrap();
if res.keys.is_empty() {
break;
}
assert_eq!(res.keys.len(), 1);
let res = res.keys.pop().unwrap();
assert_eq!(res.key, "neon:endpoints:testing");
if res.ids.is_empty() {
break;
}
for x in res.ids {
id = x.id;
if let Some(ep) = x.map.get("endpoint_id") {
let ep = String::from_redis_value(ep).unwrap();
s.get_or_intern(ep);
if s.len().is_power_of_two() {
tracing::debug!("endpoints read {}", s.len());
}
}
}
}
tracing::info!("read {} endpoints from redis", s.len());
Ok(Self {
client,
publisher: None,
publisher: Some(conn),
region_id,
limiter: RedisRateLimiter::new(info),
})

View File

@@ -1,7 +1,11 @@
#![allow(unused)]
use std::str::FromStr;
use std::time::Duration;
use chrono::{DateTime, Utc};
use hex::FromHex;
use pageserver::tenant::Tenant;
use reqwest::{header, Client, StatusCode, Url};
use serde::Deserialize;
use tokio::sync::Semaphore;
@@ -286,7 +290,7 @@ impl CloudAdminApiClient {
tokio::time::sleep(Duration::from_millis(500)).await;
continue;
}
_status => {
status => {
return Err(Error::new(
"List active projects".to_string(),
ErrorKind::ResponseStatus(response.status()),

View File

@@ -116,9 +116,6 @@ pub struct SharedState {
/// when tli is inactive instead of having this flag.
active: bool,
last_removed_segno: XLogSegNo,
/// True if local and remote deletion has been done, allows to skip visiting
/// s3 on retries.
fully_deleted: bool,
}
impl SharedState {
@@ -158,7 +155,6 @@ impl SharedState {
wal_backup_active: false,
active: false,
last_removed_segno: 0,
fully_deleted: false,
})
}
@@ -178,7 +174,6 @@ impl SharedState {
wal_backup_active: false,
active: false,
last_removed_segno: 0,
fully_deleted: false,
})
}
@@ -487,10 +482,6 @@ impl Timeline {
shared_state: &mut MutexGuard<'_, SharedState>,
only_local: bool,
) -> Result<(bool, bool)> {
if shared_state.fully_deleted {
return Ok((false, false));
};
let was_active = shared_state.active;
self.cancel(shared_state);
@@ -505,9 +496,6 @@ impl Timeline {
wal_backup::delete_timeline(&self.ttid).await?;
}
let dir_existed = delete_dir(&self.timeline_dir).await?;
if !only_local {
shared_state.fully_deleted = true;
}
Ok((dir_existed, was_active))
}

View File

@@ -3967,24 +3967,27 @@ def list_files_to_compare(pgdata_dir: Path) -> List[str]:
# pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint: Endpoint):
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
# Get the timeline ID. We need it for the 'basebackup' command
timeline_id = TimelineId(endpoint.safe_psql("SHOW neon.timeline_id")[0][0])
# many tests already checkpoint, but do it just in case
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:
cur.execute("CHECKPOINT")
# wait for pageserver to catch up
wait_for_last_flush_lsn(env, endpoint, endpoint.tenant_id, timeline_id)
# stop postgres to ensure that files won't change
endpoint.stop()
# Read the shutdown checkpoint's LSN
pg_controldata_path = os.path.join(pg_bin.pg_bin_path, "pg_controldata")
cmd = f"{pg_controldata_path} -D {endpoint.pgdata_dir}"
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
checkpoint_lsn = re.findall(
"Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
)[0]
log.debug(f"last checkpoint at {checkpoint_lsn}")
# Take a basebackup from pageserver
restored_dir_path = env.repo_dir / f"{endpoint.endpoint_id}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir, env.pg_distrib_dir, env.pg_version)
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
pageserver_id = env.attachment_service.locate(endpoint.tenant_id)[0]["node_id"]
@@ -3992,7 +3995,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, endpoint
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.get_pageserver(pageserver_id).service_port.pg} \
-c 'basebackup {endpoint.tenant_id} {timeline_id}' \
-c 'basebackup {endpoint.tenant_id} {timeline_id} {checkpoint_lsn}' \
| tar -x -C {restored_dir_path}
"""

View File

@@ -1,28 +0,0 @@
-- Test the test utils in pgxn/neon_test_utils. We don't test that
-- these actually consume resources like they should - that would be
-- tricky - but at least we check that they don't crash.
CREATE EXTENSION neon_test_utils;
select test_consume_cpu(1);
test_consume_cpu
------------------
(1 row)
select test_consume_memory(20); -- Allocate 20 MB
test_consume_memory
---------------------
(1 row)
select test_release_memory(5); -- Release 5 MB
test_release_memory
---------------------
(1 row)
select test_release_memory(); -- Release the remaining 15 MB
test_release_memory
---------------------
(1 row)

View File

@@ -7,5 +7,4 @@
test: neon-cid
test: neon-rel-truncate
test: neon-clog
test: neon-test-utils
test: neon-vacuum-full

View File

@@ -1,11 +0,0 @@
-- Test the test utils in pgxn/neon_test_utils. We don't test that
-- these actually consume resources like they should - that would be
-- tricky - but at least we check that they don't crash.
CREATE EXTENSION neon_test_utils;
select test_consume_cpu(1);
select test_consume_memory(20); -- Allocate 20 MB
select test_release_memory(5); -- Release 5 MB
select test_release_memory(); -- Release the remaining 15 MB

View File

@@ -1,5 +1,5 @@
{
"postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269",
"postgres-v15": "ca2def999368d9df098a637234ad5a9003189463",
"postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5"
"postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361",
"postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9",
"postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1"
}