mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-21 15:10:44 +00:00
Compare commits
23 Commits
bodobolero
...
heikki/hac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b5e3d6d40 | ||
|
|
e086568e21 | ||
|
|
7b818f8d64 | ||
|
|
14fefd261f | ||
|
|
01abd4afc5 | ||
|
|
c8541ad29f | ||
|
|
eaad1db9f0 | ||
|
|
6ddcf68829 | ||
|
|
d701f8285c | ||
|
|
77082a0f63 | ||
|
|
e94acbc816 | ||
|
|
f4150614d0 | ||
|
|
38dbc5f67f | ||
|
|
3685ad606d | ||
|
|
76a7d37f7e | ||
|
|
cdb6479c8a | ||
|
|
81c557d87e | ||
|
|
e963129678 | ||
|
|
4f0a9fc569 | ||
|
|
81c6a5a796 | ||
|
|
8e05639dbf | ||
|
|
deed46015d | ||
|
|
532d9b646e |
2
.github/workflows/build_and_test.yml
vendored
2
.github/workflows/build_and_test.yml
vendored
@@ -963,7 +963,7 @@ jobs:
|
||||
fi
|
||||
|
||||
- name: Verify docker-compose example and test extensions
|
||||
timeout-minutes: 20
|
||||
timeout-minutes: 60
|
||||
env:
|
||||
TAG: >-
|
||||
${{
|
||||
|
||||
21
Cargo.lock
generated
21
Cargo.lock
generated
@@ -3794,6 +3794,16 @@ version = "0.8.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
|
||||
|
||||
[[package]]
|
||||
name = "neon-shmem"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"nix 0.30.1",
|
||||
"tempfile",
|
||||
"thiserror 1.0.69",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "never-say-never"
|
||||
version = "6.6.666"
|
||||
@@ -4424,6 +4434,16 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"prost 0.13.3",
|
||||
"tonic",
|
||||
"tonic-build",
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "papaya"
|
||||
version = "0.2.1"
|
||||
@@ -8482,6 +8502,7 @@ dependencies = [
|
||||
"log",
|
||||
"memchr",
|
||||
"nix 0.26.4",
|
||||
"nix 0.30.1",
|
||||
"nom",
|
||||
"num",
|
||||
"num-bigint",
|
||||
|
||||
@@ -9,6 +9,7 @@ members = [
|
||||
"pageserver/ctl",
|
||||
"pageserver/client",
|
||||
"pageserver/pagebench",
|
||||
"pageserver/page_api",
|
||||
"proxy",
|
||||
"safekeeper",
|
||||
"safekeeper/client",
|
||||
@@ -23,6 +24,7 @@ members = [
|
||||
"libs/postgres_ffi",
|
||||
"libs/safekeeper_api",
|
||||
"libs/desim",
|
||||
"libs/neon-shmem",
|
||||
"libs/utils",
|
||||
"libs/consumption_metrics",
|
||||
"libs/postgres_backend",
|
||||
@@ -127,7 +129,7 @@ md5 = "0.7.0"
|
||||
measured = { version = "0.0.22", features=["lasso"] }
|
||||
measured-process = { version = "0.0.22" }
|
||||
memoffset = "0.9"
|
||||
nix = { version = "0.30.1", features = ["dir", "fs", "process", "socket", "signal", "poll"] }
|
||||
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
|
||||
# Do not update to >= 7.0.0, at least. The update will have a significant impact
|
||||
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
|
||||
notify = "6.0.0"
|
||||
@@ -251,6 +253,7 @@ pageserver = { path = "./pageserver" }
|
||||
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
|
||||
pageserver_client = { path = "./pageserver/client" }
|
||||
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
|
||||
pageserver_page_api = { path = "./pageserver/page_api" }
|
||||
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
|
||||
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
|
||||
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
|
||||
|
||||
@@ -144,7 +144,6 @@ RUN set -e \
|
||||
openssh-client \
|
||||
parallel \
|
||||
pkg-config \
|
||||
sudo \
|
||||
unzip \
|
||||
wget \
|
||||
xz-utils \
|
||||
|
||||
@@ -7,7 +7,7 @@ index 255e616..1c6edb7 100644
|
||||
RelationGetRelationName(index));
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_start_unlogged_build(index->rd_smgr);
|
||||
+ smgr_start_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
initRumState(&buildstate.rumstate, index);
|
||||
@@ -18,7 +18,7 @@ index 255e616..1c6edb7 100644
|
||||
rumUpdateStats(index, &buildstate.buildStats, buildstate.rumstate.isBuild);
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_finish_unlogged_build_phase_1(index->rd_smgr);
|
||||
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
/*
|
||||
@@ -29,7 +29,7 @@ index 255e616..1c6edb7 100644
|
||||
}
|
||||
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_end_unlogged_build(index->rd_smgr);
|
||||
+ smgr_end_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
/*
|
||||
|
||||
@@ -15,6 +15,10 @@ commands:
|
||||
user: root
|
||||
sysvInitAction: sysinit
|
||||
shell: 'chmod 711 /neonvm/bin/set-disk-quota'
|
||||
- name: chmod-clockcache_dev
|
||||
user: root
|
||||
sysvInitAction: sysinit
|
||||
shell: 'chmod 777 /dev/clockcache_dev' # FIXME: not very secure
|
||||
- name: pgbouncer
|
||||
user: postgres
|
||||
sysvInitAction: respawn
|
||||
|
||||
@@ -213,8 +213,10 @@ impl Escaping for PgIdent {
|
||||
|
||||
// Find the first suitable tag that is not present in the string.
|
||||
// Postgres' max role/DB name length is 63 bytes, so even in the
|
||||
// worst case it won't take long.
|
||||
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
|
||||
// worst case it won't take long. Outer tag is always `tag + "x"`,
|
||||
// so if `tag` is not present in the string, `outer_tag` is not
|
||||
// present in the string either.
|
||||
while self.contains(&tag.to_string()) {
|
||||
tag += "x";
|
||||
outer_tag = tag.clone() + "x";
|
||||
}
|
||||
|
||||
@@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
|
||||
("name$$$", ("$x$name$$$$x$", "xx")),
|
||||
("name$$$$", ("$x$name$$$$$x$", "xx")),
|
||||
("name$x$", ("$xx$name$x$$xx$", "xxx")),
|
||||
("x", ("$xx$x$xx$", "xxx")),
|
||||
("xx", ("$xxx$xx$xxx$", "xxxx")),
|
||||
("$x", ("$xx$$x$xx$", "xxx")),
|
||||
("x$", ("$xx$x$$xx$", "xxx")),
|
||||
("$x$", ("$xx$$x$$xx$", "xxx")),
|
||||
("xx$", ("$xxx$xx$$xxx$", "xxxx")),
|
||||
("$xx", ("$xxx$$xx$xxx$", "xxxx")),
|
||||
("$xx$", ("$xxx$$xx$$xxx$", "xxxx")),
|
||||
];
|
||||
|
||||
for (input, expected) in test_cases {
|
||||
|
||||
@@ -462,6 +462,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
|
||||
if var(REAL_S3_ENV).is_ok() {
|
||||
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
assert!(body.contains("process_threads"));
|
||||
}
|
||||
|
||||
|
||||
13
libs/neon-shmem/Cargo.toml
Normal file
13
libs/neon-shmem/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "neon-shmem"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
thiserror.workspace = true
|
||||
nix.workspace=true
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
[target.'cfg(target_os = "macos")'.dependencies]
|
||||
tempfile = "3.14.0"
|
||||
418
libs/neon-shmem/src/lib.rs
Normal file
418
libs/neon-shmem/src/lib.rs
Normal file
@@ -0,0 +1,418 @@
|
||||
//! Shared memory utilities for neon communicator
|
||||
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
|
||||
use std::ptr::NonNull;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use nix::errno::Errno;
|
||||
use nix::sys::mman::MapFlags;
|
||||
use nix::sys::mman::ProtFlags;
|
||||
use nix::sys::mman::mmap as nix_mmap;
|
||||
use nix::sys::mman::munmap as nix_munmap;
|
||||
use nix::unistd::ftruncate as nix_ftruncate;
|
||||
|
||||
/// ShmemHandle represents a shared memory area that can be shared by processes over fork().
|
||||
/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's
|
||||
/// specified at creation.
|
||||
///
|
||||
/// The area is backed by an anonymous file created with memfd_create(). The full address space for
|
||||
/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`],
|
||||
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
|
||||
/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the
|
||||
/// future.
|
||||
pub struct ShmemHandle {
|
||||
/// memfd file descriptor
|
||||
fd: OwnedFd,
|
||||
|
||||
max_size: usize,
|
||||
|
||||
// Pointer to the beginning of the shared memory area. The header is stored there.
|
||||
shared_ptr: NonNull<SharedStruct>,
|
||||
|
||||
// Pointer to the beginning of the user data
|
||||
pub data_ptr: NonNull<u8>,
|
||||
}
|
||||
|
||||
/// This is stored at the beginning in the shared memory area.
|
||||
struct SharedStruct {
|
||||
max_size: usize,
|
||||
|
||||
/// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag
|
||||
current_size: AtomicUsize,
|
||||
}
|
||||
|
||||
const RESIZE_IN_PROGRESS: usize = 1 << 63;
|
||||
|
||||
const HEADER_SIZE: usize = std::mem::size_of::<SharedStruct>();
|
||||
|
||||
/// Error type returned by the ShmemHandle functions.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
#[error("{msg}: {errno}")]
|
||||
pub struct Error {
|
||||
pub msg: String,
|
||||
pub errno: Errno,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
fn new(msg: &str, errno: Errno) -> Error {
|
||||
Error {
|
||||
msg: msg.to_string(),
|
||||
errno,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ShmemHandle {
|
||||
/// Create a new shared memory area. To communicate between processes, the processes need to be
|
||||
/// fork()'d after calling this, so that the ShmemHandle is inherited by all processes.
|
||||
///
|
||||
/// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other
|
||||
/// processes can continue using it, however.
|
||||
pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result<ShmemHandle, Error> {
|
||||
// create the backing anonymous file.
|
||||
let fd = create_backing_file(name)?;
|
||||
|
||||
Self::new_with_fd(fd, initial_size, max_size)
|
||||
}
|
||||
|
||||
fn new_with_fd(
|
||||
fd: OwnedFd,
|
||||
initial_size: usize,
|
||||
max_size: usize,
|
||||
) -> Result<ShmemHandle, Error> {
|
||||
// We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size
|
||||
// is a little larger than this because of the SharedStruct header. Make the upper limit
|
||||
// somewhat smaller than that, because with anything close to that, you'll run out of
|
||||
// memory anyway.
|
||||
if max_size >= 1 << 48 {
|
||||
panic!("max size {} too large", max_size);
|
||||
}
|
||||
if initial_size > max_size {
|
||||
panic!("initial size {initial_size} larger than max size {max_size}");
|
||||
}
|
||||
|
||||
// The actual initial / max size is the one given by the caller, plus the size of
|
||||
// 'SharedStruct'.
|
||||
let initial_size = HEADER_SIZE + initial_size;
|
||||
let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap();
|
||||
|
||||
// Reserve address space for it with mmap
|
||||
//
|
||||
// TODO: Use MAP_HUGETLB if possible
|
||||
let start_ptr = unsafe {
|
||||
nix_mmap(
|
||||
None,
|
||||
max_size,
|
||||
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
|
||||
MapFlags::MAP_SHARED,
|
||||
&fd,
|
||||
0,
|
||||
)
|
||||
}
|
||||
.map_err(|e| Error::new("mmap failed: {e}", e))?;
|
||||
|
||||
// Reserve space for the initial size
|
||||
enlarge_file(fd.as_fd(), initial_size as u64)?;
|
||||
|
||||
// Initialize the header
|
||||
let shared: NonNull<SharedStruct> = start_ptr.cast();
|
||||
unsafe {
|
||||
shared.write(SharedStruct {
|
||||
max_size: max_size.into(),
|
||||
current_size: AtomicUsize::new(initial_size),
|
||||
})
|
||||
};
|
||||
|
||||
// The user data begins after the header
|
||||
let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) };
|
||||
|
||||
Ok(ShmemHandle {
|
||||
fd,
|
||||
max_size: max_size.into(),
|
||||
shared_ptr: shared,
|
||||
data_ptr,
|
||||
})
|
||||
}
|
||||
|
||||
// return reference to the header
|
||||
fn shared(&self) -> &SharedStruct {
|
||||
unsafe { self.shared_ptr.as_ref() }
|
||||
}
|
||||
|
||||
/// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified
|
||||
/// when creating the area.
|
||||
///
|
||||
/// This may only be called from one process/thread concurrently. We detect that case
|
||||
/// and return an Error.
|
||||
pub fn set_size(&self, new_size: usize) -> Result<(), Error> {
|
||||
let new_size = new_size + HEADER_SIZE;
|
||||
let shared = self.shared();
|
||||
|
||||
if new_size > self.max_size {
|
||||
panic!(
|
||||
"new size ({} is greater than max size ({})",
|
||||
new_size, self.max_size
|
||||
);
|
||||
}
|
||||
assert_eq!(self.max_size, shared.max_size);
|
||||
|
||||
// Lock the area by setting the bit in 'current_size'
|
||||
//
|
||||
// Ordering::Relaxed would probably be sufficient here, as we don't access any other memory
|
||||
// and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But
|
||||
// since this is not performance-critical, better safe than sorry .
|
||||
let mut old_size = shared.current_size.load(Ordering::Acquire);
|
||||
loop {
|
||||
if (old_size & RESIZE_IN_PROGRESS) != 0 {
|
||||
return Err(Error::new(
|
||||
"concurrent resize detected",
|
||||
Errno::UnknownErrno,
|
||||
));
|
||||
}
|
||||
match shared.current_size.compare_exchange(
|
||||
old_size,
|
||||
new_size,
|
||||
Ordering::Acquire,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => break,
|
||||
Err(x) => old_size = x,
|
||||
}
|
||||
}
|
||||
|
||||
// Ok, we got the lock.
|
||||
//
|
||||
// NB: If anything goes wrong, we *must* clear the bit!
|
||||
let result = {
|
||||
use std::cmp::Ordering::{Equal, Greater, Less};
|
||||
match new_size.cmp(&old_size) {
|
||||
Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| {
|
||||
Error::new("could not shrink shmem segment, ftruncate failed: {e}", e)
|
||||
}),
|
||||
Equal => Ok(()),
|
||||
Greater => enlarge_file(self.fd.as_fd(), new_size as u64),
|
||||
}
|
||||
};
|
||||
|
||||
// Unlock
|
||||
shared.current_size.store(
|
||||
if result.is_ok() { new_size } else { old_size },
|
||||
Ordering::Release,
|
||||
);
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Returns the current user-visible size of the shared memory segment.
|
||||
///
|
||||
/// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's
|
||||
/// responsibility not to access the area beyond the current size.
|
||||
pub fn current_size(&self) -> usize {
|
||||
let total_current_size =
|
||||
self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS;
|
||||
total_current_size - HEADER_SIZE
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ShmemHandle {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: The pointer was obtained from mmap() with the given size.
|
||||
// We unmap the entire region.
|
||||
let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) };
|
||||
// The fd is dropped automatically by OwnedFd.
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an
|
||||
/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for
|
||||
/// development and testing, but in production we want the file to stay in memory.
|
||||
///
|
||||
/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused.
|
||||
#[allow(unused_variables)]
|
||||
fn create_backing_file(name: &str) -> Result<OwnedFd, Error> {
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
{
|
||||
nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty())
|
||||
.map_err(|e| Error::new("memfd_create failed: {e}", e))
|
||||
}
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
let file = tempfile::tempfile().map_err(|e| {
|
||||
Error::new(
|
||||
"could not create temporary file to back shmem area: {e}",
|
||||
nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)),
|
||||
)
|
||||
})?;
|
||||
Ok(OwnedFd::from(file))
|
||||
}
|
||||
}
|
||||
|
||||
fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> {
|
||||
// Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that
|
||||
// we don't get a segfault later when trying to actually use it.
|
||||
#[cfg(not(target_os = "macos"))]
|
||||
{
|
||||
nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| {
|
||||
Error::new(
|
||||
"could not grow shmem segment, posix_fallocate failed: {e}",
|
||||
e,
|
||||
)
|
||||
})
|
||||
}
|
||||
// As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate'
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
nix::unistd::ftruncate(fd, size as i64)
|
||||
.map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use nix::unistd::ForkResult;
|
||||
use std::ops::Range;
|
||||
|
||||
/// check that all bytes in given range have the expected value.
|
||||
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
|
||||
for i in range {
|
||||
let b = unsafe { *(ptr.add(i)) };
|
||||
assert_eq!(expected, b, "unexpected byte at offset {}", i);
|
||||
}
|
||||
}
|
||||
|
||||
/// Write 'b' to all bytes in the given range
|
||||
fn write_range(ptr: *mut u8, b: u8, range: Range<usize>) {
|
||||
unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) };
|
||||
}
|
||||
|
||||
// simple single-process test of growing and shrinking
|
||||
#[test]
|
||||
fn test_shmem_resize() -> Result<(), Error> {
|
||||
let max_size = 1024 * 1024;
|
||||
let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?;
|
||||
|
||||
assert_eq!(init_struct.current_size(), 0);
|
||||
|
||||
// Initial grow
|
||||
let size1 = 10000;
|
||||
init_struct.set_size(size1).unwrap();
|
||||
assert_eq!(init_struct.current_size(), size1);
|
||||
|
||||
// Write some data
|
||||
let data_ptr = init_struct.data_ptr.as_ptr();
|
||||
write_range(data_ptr, 0xAA, 0..size1);
|
||||
assert_range(data_ptr, 0xAA, 0..size1);
|
||||
|
||||
// Shrink
|
||||
let size2 = 5000;
|
||||
init_struct.set_size(size2).unwrap();
|
||||
assert_eq!(init_struct.current_size(), size2);
|
||||
|
||||
// Grow again
|
||||
let size3 = 20000;
|
||||
init_struct.set_size(size3).unwrap();
|
||||
assert_eq!(init_struct.current_size(), size3);
|
||||
|
||||
// Try to read it. The area that was shrunk and grown again should read as all zeros now
|
||||
assert_range(data_ptr, 0xAA, 0..5000);
|
||||
assert_range(data_ptr, 0, 5000..size1);
|
||||
|
||||
// Try to grow beyond max_size
|
||||
//let size4 = max_size + 1;
|
||||
//assert!(init_struct.set_size(size4).is_err());
|
||||
|
||||
// Dropping init_struct should unmap the memory
|
||||
drop(init_struct);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// This is used in tests to coordinate between test processes. It's like std::sync::Barrier,
|
||||
/// but is stored in the shared memory area and works across processes. It's implemented by
|
||||
/// polling, because e.g. standard rust mutexes are not guaranteed to work across processes.
|
||||
struct SimpleBarrier {
|
||||
num_procs: usize,
|
||||
count: AtomicUsize,
|
||||
}
|
||||
|
||||
impl SimpleBarrier {
|
||||
unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) {
|
||||
unsafe {
|
||||
*ptr = SimpleBarrier {
|
||||
num_procs,
|
||||
count: AtomicUsize::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wait(&self) {
|
||||
let old = self.count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let generation = old / self.num_procs;
|
||||
|
||||
let mut current = old + 1;
|
||||
while current < (generation + 1) * self.num_procs {
|
||||
std::thread::sleep(std::time::Duration::from_millis(10));
|
||||
current = self.count.load(Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multi_process() {
|
||||
// Initialize
|
||||
let max_size = 1_000_000_000_000;
|
||||
let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap();
|
||||
let ptr = init_struct.data_ptr.as_ptr();
|
||||
|
||||
// Store the SimpleBarrier in the first 1k of the area.
|
||||
init_struct.set_size(10000).unwrap();
|
||||
let barrier_ptr: *mut SimpleBarrier = unsafe {
|
||||
ptr.add(ptr.align_offset(std::mem::align_of::<SimpleBarrier>()))
|
||||
.cast()
|
||||
};
|
||||
unsafe { SimpleBarrier::init(barrier_ptr, 2) };
|
||||
let barrier = unsafe { barrier_ptr.as_ref().unwrap() };
|
||||
|
||||
// Fork another test process. The code after this runs in both processes concurrently.
|
||||
let fork_result = unsafe { nix::unistd::fork().unwrap() };
|
||||
|
||||
// In the parent, fill bytes between 1000..2000. In the child, between 2000..3000
|
||||
if fork_result.is_parent() {
|
||||
write_range(ptr, 0xAA, 1000..2000);
|
||||
} else {
|
||||
write_range(ptr, 0xBB, 2000..3000);
|
||||
}
|
||||
barrier.wait();
|
||||
// Verify the contents. (in both processes)
|
||||
assert_range(ptr, 0xAA, 1000..2000);
|
||||
assert_range(ptr, 0xBB, 2000..3000);
|
||||
|
||||
// Grow, from the child this time
|
||||
let size = 10_000_000;
|
||||
if !fork_result.is_parent() {
|
||||
init_struct.set_size(size).unwrap();
|
||||
}
|
||||
barrier.wait();
|
||||
|
||||
// make some writes at the end
|
||||
if fork_result.is_parent() {
|
||||
write_range(ptr, 0xAA, (size - 10)..size);
|
||||
} else {
|
||||
write_range(ptr, 0xBB, (size - 20)..(size - 10));
|
||||
}
|
||||
barrier.wait();
|
||||
|
||||
// Verify the contents. (This runs in both processes)
|
||||
assert_range(ptr, 0, (size - 1000)..(size - 20));
|
||||
assert_range(ptr, 0xBB, (size - 20)..(size - 10));
|
||||
assert_range(ptr, 0xAA, (size - 10)..size);
|
||||
|
||||
if let ForkResult::Parent { child } = fork_result {
|
||||
nix::sys::wait::waitpid(child, None).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -235,7 +235,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
|
||||
ScatteredLsn,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
|
||||
#[serde(tag = "mode", rename_all = "kebab-case")]
|
||||
pub enum GetVectoredConcurrentIo {
|
||||
/// The read path is fully sequential: layers are visited
|
||||
|
||||
13
pageserver/page_api/Cargo.toml
Normal file
13
pageserver/page_api/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "pageserver_page_api"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
prost.workspace = true
|
||||
tonic.workspace = true
|
||||
workspace_hack.workspace = true
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build.workspace = true
|
||||
13
pageserver/page_api/build.rs
Normal file
13
pageserver/page_api/build.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
use std::env;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Generates Rust code from .proto Protobuf schemas, along with a binary file
|
||||
/// descriptor set for Protobuf schema reflection.
|
||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let out_dir = PathBuf::from(env::var("OUT_DIR")?);
|
||||
tonic_build::configure()
|
||||
.bytes(["."])
|
||||
.file_descriptor_set_path(out_dir.join("page_api_descriptor.bin"))
|
||||
.compile_protos(&["proto/page_service.proto"], &["proto"])
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
233
pageserver/page_api/proto/page_service.proto
Normal file
233
pageserver/page_api/proto/page_service.proto
Normal file
@@ -0,0 +1,233 @@
|
||||
// Page service, presented by pageservers for computes.
|
||||
//
|
||||
// This is the compute read path. It primarily serves page versions at given
|
||||
// LSNs, but also base backups, SLRU segments, and relation metadata.
|
||||
//
|
||||
// EXPERIMENTAL: this is still under development and subject to change.
|
||||
//
|
||||
// Request metadata headers:
|
||||
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
|
||||
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
|
||||
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
|
||||
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
|
||||
//
|
||||
// The service can be accessed via e.g. grpcurl:
|
||||
//
|
||||
// ```
|
||||
// grpcurl \
|
||||
// -plaintext \
|
||||
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
|
||||
// -H "neon-shard-id: 0b10" \
|
||||
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
|
||||
// -H "authorization: Bearer $JWT" \
|
||||
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
|
||||
// localhost:51051 page_api.PageService/CheckRelExists
|
||||
// ```
|
||||
//
|
||||
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
|
||||
// However, this will require reconnecting when changing modes.
|
||||
//
|
||||
// TODO: write implementation guidance on
|
||||
// - Health checks
|
||||
// - Tracing, OpenTelemetry
|
||||
// - Compression
|
||||
|
||||
syntax = "proto3";
|
||||
package page_api;
|
||||
|
||||
service PageService {
|
||||
// Returns whether a relation exists.
|
||||
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
|
||||
|
||||
// Fetches a base backup.
|
||||
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
|
||||
|
||||
// Returns the total size of a database, as # of bytes.
|
||||
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);
|
||||
|
||||
// Fetches pages.
|
||||
//
|
||||
// This is implemented as a bidirectional streaming RPC for performance. Unary
|
||||
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
|
||||
// authentication, and so on -- with streaming, we only pay these costs during
|
||||
// the initial stream setup. This ~doubles throughput in benchmarks. Other
|
||||
// RPCs use regular unary requests, since they are not as frequent and
|
||||
// performance-critical, and this simplifies implementation.
|
||||
//
|
||||
// NB: a status response (e.g. errors) will terminate the stream. The stream
|
||||
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
|
||||
// Most errors are therefore sent as GetPageResponse.status instead.
|
||||
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
|
||||
|
||||
// Returns the size of a relation, as # of blocks.
|
||||
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);
|
||||
|
||||
// Fetches an SLRU segment.
|
||||
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
|
||||
}
|
||||
|
||||
// The LSN a request should read at.
|
||||
message ReadLsn {
|
||||
// The request's read LSN. Required.
|
||||
uint64 request_lsn = 1;
|
||||
// If given, the caller guarantees that the page has not been modified since
|
||||
// this LSN. Must be smaller than or equal to request_lsn. This allows the
|
||||
// Pageserver to serve an old page without waiting for the request LSN to
|
||||
// arrive. Valid for all request types.
|
||||
//
|
||||
// It is undefined behaviour to make a request such that the page was, in
|
||||
// fact, modified between request_lsn and not_modified_since_lsn. The
|
||||
// Pageserver might detect it and return an error, or it might return the old
|
||||
// page version or the new page version. Setting not_modified_since_lsn equal
|
||||
// to request_lsn is always safe, but can lead to unnecessary waiting.
|
||||
uint64 not_modified_since_lsn = 2;
|
||||
}
|
||||
|
||||
// A relation identifier.
|
||||
message RelTag {
|
||||
uint32 spc_oid = 1;
|
||||
uint32 db_oid = 2;
|
||||
uint32 rel_number = 3;
|
||||
uint32 fork_number = 4;
|
||||
}
|
||||
|
||||
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
|
||||
// other shards will error.
|
||||
message CheckRelExistsRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message CheckRelExistsResponse {
|
||||
bool exists = 1;
|
||||
}
|
||||
|
||||
// Requests a base backup at a given LSN.
|
||||
message GetBaseBackupRequest {
|
||||
// The LSN to fetch a base backup at.
|
||||
ReadLsn read_lsn = 1;
|
||||
// If true, logical replication slots will not be created.
|
||||
bool replica = 2;
|
||||
}
|
||||
|
||||
// Base backup response chunk, returned as an ordered stream.
|
||||
message GetBaseBackupResponseChunk {
|
||||
// A basebackup data chunk. The size is undefined, but bounded by the 4 MB
|
||||
// gRPC message size limit.
|
||||
bytes chunk = 1;
|
||||
}
|
||||
|
||||
// Requests the size of a database, as # of bytes. Only valid on shard 0, other
|
||||
// shards will error.
|
||||
message GetDbSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
uint32 db_oid = 2;
|
||||
}
|
||||
|
||||
message GetDbSizeResponse {
|
||||
uint64 num_bytes = 1;
|
||||
}
|
||||
|
||||
// Requests one or more pages.
|
||||
message GetPageRequest {
|
||||
// A request ID. Will be included in the response. Should be unique for
|
||||
// in-flight requests on the stream.
|
||||
uint64 request_id = 1;
|
||||
// The request class.
|
||||
GetPageClass request_class = 2;
|
||||
// The LSN to read at.
|
||||
ReadLsn read_lsn = 3;
|
||||
// The relation to read from.
|
||||
RelTag rel = 4;
|
||||
// Page numbers to read. Must belong to the remote shard.
|
||||
//
|
||||
// Multiple pages will be executed as a single batch by the Pageserver,
|
||||
// amortizing layer access costs and parallelizing them. This may increase the
|
||||
// latency of any individual request, but improves the overall latency and
|
||||
// throughput of the batch as a whole.
|
||||
//
|
||||
// TODO: this causes an allocation in the common single-block case. The sender
|
||||
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
|
||||
// into a heap-allocated Vec. Consider optimizing this.
|
||||
//
|
||||
// TODO: we might be able to avoid a sort or something if we mandate that these
|
||||
// are always in order. But we can't currenly rely on this on the server, because
|
||||
// of compatibility with the libpq protocol handler.
|
||||
repeated uint32 block_number = 5;
|
||||
}
|
||||
|
||||
// A GetPageRequest class. Primarily intended for observability, but may also be
|
||||
// used for prioritization in the future.
|
||||
enum GetPageClass {
|
||||
// Unknown class. For forwards compatibility: used when the client sends a
|
||||
// class that the server doesn't know about.
|
||||
GET_PAGE_CLASS_UNKNOWN = 0;
|
||||
// A normal request. This is the default.
|
||||
GET_PAGE_CLASS_NORMAL = 1;
|
||||
// A prefetch request. NB: can only be classified on pg < 18.
|
||||
GET_PAGE_CLASS_PREFETCH = 2;
|
||||
// A background request (e.g. vacuum).
|
||||
GET_PAGE_CLASS_BACKGROUND = 3;
|
||||
}
|
||||
|
||||
// A GetPage response.
|
||||
//
|
||||
// A batch response will contain all of the requested pages. We could eagerly
|
||||
// emit individual pages as soon as they are ready, but on a readv() Postgres
|
||||
// holds buffer pool locks on all pages in the batch and we'll only return once
|
||||
// the entire batch is ready, so no one can make use of the individual pages.
|
||||
message GetPageResponse {
|
||||
// The original request's ID.
|
||||
uint64 request_id = 1;
|
||||
// The response status code.
|
||||
GetPageStatus status = 2;
|
||||
// A string describing the status, if any.
|
||||
string reason = 3;
|
||||
// The 8KB page images, in the same order as the request. Empty if status != OK.
|
||||
repeated bytes page_image = 4;
|
||||
}
|
||||
|
||||
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
|
||||
// want to send errors as gRPC statuses, since this would terminate the stream.
|
||||
enum GetPageStatus {
|
||||
// Unknown status. For forwards compatibility: used when the server sends a
|
||||
// status code that the client doesn't know about.
|
||||
GET_PAGE_STATUS_UNKNOWN = 0;
|
||||
// The request was successful.
|
||||
GET_PAGE_STATUS_OK = 1;
|
||||
// The page did not exist. The tenant/timeline/shard has already been
|
||||
// validated during stream setup.
|
||||
GET_PAGE_STATUS_NOT_FOUND = 2;
|
||||
// The request was invalid.
|
||||
GET_PAGE_STATUS_INVALID = 3;
|
||||
// The tenant is rate limited. Slow down and retry later.
|
||||
GET_PAGE_STATUS_SLOW_DOWN = 4;
|
||||
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
|
||||
// layer download. This could free up the server task to process other
|
||||
// requests while the layer download is in progress.
|
||||
}
|
||||
|
||||
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
|
||||
// shard 0, other shards will error.
|
||||
message GetRelSizeRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
RelTag rel = 2;
|
||||
}
|
||||
|
||||
message GetRelSizeResponse {
|
||||
uint32 num_blocks = 1;
|
||||
}
|
||||
|
||||
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
|
||||
message GetSlruSegmentRequest {
|
||||
ReadLsn read_lsn = 1;
|
||||
uint32 kind = 2;
|
||||
uint32 segno = 3;
|
||||
}
|
||||
|
||||
// Returns an SLRU segment.
|
||||
//
|
||||
// These are up 32 pages (256 KB), so we can send them as a single response.
|
||||
message GetSlruSegmentResponse {
|
||||
bytes segment = 1;
|
||||
}
|
||||
19
pageserver/page_api/src/lib.rs
Normal file
19
pageserver/page_api/src/lib.rs
Normal file
@@ -0,0 +1,19 @@
|
||||
//! This crate provides the Pageserver's page API. It contains:
|
||||
//!
|
||||
//! * proto/page_service.proto: the Protobuf schema for the page API.
|
||||
//! * proto: auto-generated Protobuf types for gRPC.
|
||||
//!
|
||||
//! This crate is used by both the client and the server. Try to keep it slim.
|
||||
|
||||
// Code generated by protobuf.
|
||||
pub mod proto {
|
||||
tonic::include_proto!("page_api");
|
||||
|
||||
/// File descriptor set for Protobuf schema reflection. This allows using
|
||||
/// e.g. grpcurl with the API.
|
||||
pub const FILE_DESCRIPTOR_SET: &[u8] =
|
||||
tonic::include_file_descriptor_set!("page_api_descriptor");
|
||||
|
||||
pub use page_service_client::PageServiceClient;
|
||||
pub use page_service_server::{PageService, PageServiceServer};
|
||||
}
|
||||
@@ -144,7 +144,7 @@ where
|
||||
replica,
|
||||
ctx,
|
||||
io_concurrency: IoConcurrency::spawn_from_conf(
|
||||
timeline.conf,
|
||||
timeline.conf.get_vectored_concurrent_io,
|
||||
timeline
|
||||
.gate
|
||||
.enter()
|
||||
|
||||
@@ -3199,7 +3199,7 @@ async fn list_aux_files(
|
||||
.await?;
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
state.conf,
|
||||
state.conf.get_vectored_concurrent_io,
|
||||
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
|
||||
);
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use itertools::Itertools;
|
||||
use jsonwebtoken::TokenData;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::config::{
|
||||
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
|
||||
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
|
||||
};
|
||||
use pageserver_api::key::rel_block_to_key;
|
||||
@@ -331,10 +331,10 @@ async fn page_service_conn_main(
|
||||
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
|
||||
// and create the per-query context in process_query ourselves.
|
||||
let mut conn_handler = PageServerHandler::new(
|
||||
conf,
|
||||
tenant_manager,
|
||||
auth,
|
||||
pipelining_config,
|
||||
conf.get_vectored_concurrent_io,
|
||||
perf_span_fields,
|
||||
connection_ctx,
|
||||
cancel.clone(),
|
||||
@@ -371,7 +371,6 @@ async fn page_service_conn_main(
|
||||
}
|
||||
|
||||
struct PageServerHandler {
|
||||
conf: &'static PageServerConf,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
claims: Option<Claims>,
|
||||
|
||||
@@ -389,6 +388,7 @@ struct PageServerHandler {
|
||||
timeline_handles: Option<TimelineHandles>,
|
||||
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
|
||||
gate_guard: GateGuard,
|
||||
}
|
||||
@@ -844,17 +844,16 @@ impl BatchedFeMessage {
|
||||
impl PageServerHandler {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
pipelining_config: PageServicePipeliningConfig,
|
||||
get_vectored_concurrent_io: GetVectoredConcurrentIo,
|
||||
perf_span_fields: ConnectionPerfSpanFields,
|
||||
connection_ctx: RequestContext,
|
||||
cancel: CancellationToken,
|
||||
gate_guard: GateGuard,
|
||||
) -> Self {
|
||||
PageServerHandler {
|
||||
conf,
|
||||
auth,
|
||||
claims: None,
|
||||
connection_ctx,
|
||||
@@ -862,6 +861,7 @@ impl PageServerHandler {
|
||||
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
|
||||
cancel,
|
||||
pipelining_config,
|
||||
get_vectored_concurrent_io,
|
||||
gate_guard,
|
||||
}
|
||||
}
|
||||
@@ -1278,7 +1278,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
#[instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||
async fn pagesteam_handle_batched_message<IO>(
|
||||
async fn pagestream_handle_batched_message<IO>(
|
||||
&mut self,
|
||||
pgb_writer: &mut PostgresBackend<IO>,
|
||||
batch: BatchedFeMessage,
|
||||
@@ -1623,7 +1623,7 @@ impl PageServerHandler {
|
||||
}
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.get_vectored_concurrent_io,
|
||||
match self.gate_guard.try_clone() {
|
||||
Ok(guard) => guard,
|
||||
Err(_) => {
|
||||
@@ -1733,7 +1733,7 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
let result = self
|
||||
.pagesteam_handle_batched_message(
|
||||
.pagestream_handle_batched_message(
|
||||
pgb_writer,
|
||||
msg,
|
||||
io_concurrency.clone(),
|
||||
@@ -1909,7 +1909,7 @@ impl PageServerHandler {
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
self.pagesteam_handle_batched_message(
|
||||
self.pagestream_handle_batched_message(
|
||||
pgb_writer,
|
||||
batch,
|
||||
io_concurrency.clone(),
|
||||
|
||||
@@ -586,7 +586,7 @@ impl Timeline {
|
||||
// scan directory listing (new), merge with the old results
|
||||
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
@@ -645,7 +645,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
@@ -885,7 +885,7 @@ impl Timeline {
|
||||
);
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| PageReconstructError::Cancelled)?,
|
||||
|
||||
@@ -8596,8 +8596,10 @@ mod tests {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Option<Bytes>, GetVectoredError> {
|
||||
let io_concurrency =
|
||||
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
tline.conf.get_vectored_concurrent_io,
|
||||
tline.gate.enter().unwrap(),
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
|
||||
let mut res = tline
|
||||
|
||||
@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
|
||||
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
|
||||
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
|
||||
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
|
||||
use pageserver_api::record::NeonWalRecord;
|
||||
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
|
||||
use super::PageReconstructError;
|
||||
use super::layer_map::InMemoryLayerDesc;
|
||||
use super::timeline::{GetVectoredError, ReadPath};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{
|
||||
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
|
||||
};
|
||||
@@ -318,11 +318,10 @@ impl IoConcurrency {
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_from_conf(
|
||||
conf: &'static PageServerConf,
|
||||
conf: GetVectoredConcurrentIo,
|
||||
gate_guard: GateGuard,
|
||||
) -> IoConcurrency {
|
||||
use pageserver_api::config::GetVectoredConcurrentIo;
|
||||
let selected = match conf.get_vectored_concurrent_io {
|
||||
let selected = match conf {
|
||||
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
|
||||
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
|
||||
};
|
||||
|
||||
@@ -3530,7 +3530,7 @@ impl Timeline {
|
||||
};
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self_ref.conf,
|
||||
self_ref.conf.get_vectored_concurrent_io,
|
||||
self_ref
|
||||
.gate
|
||||
.enter()
|
||||
@@ -5559,7 +5559,7 @@ impl Timeline {
|
||||
});
|
||||
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
self.conf,
|
||||
self.conf.get_vectored_concurrent_io,
|
||||
self.gate
|
||||
.enter()
|
||||
.map_err(|_| CreateImageLayersError::Cancelled)?,
|
||||
|
||||
@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
|
||||
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
|
||||
);
|
||||
let io_concurrency = IoConcurrency::spawn_from_conf(
|
||||
detached.conf,
|
||||
detached.conf.get_vectored_concurrent_io,
|
||||
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
|
||||
);
|
||||
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "postgres.h"
|
||||
|
||||
#include <sys/file.h>
|
||||
#include <sys/ioctl.h>
|
||||
#include <unistd.h>
|
||||
#include <fcntl.h>
|
||||
|
||||
@@ -52,6 +53,10 @@
|
||||
#include "pagestore_client.h"
|
||||
#include "communicator.h"
|
||||
|
||||
/* For the kernel module */
|
||||
#include "neon_pagecache.h"
|
||||
#define CLOCKCACHE_DEV_PATH "/dev/clockcache_dev"
|
||||
|
||||
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
|
||||
|
||||
/*
|
||||
@@ -159,6 +164,13 @@ typedef struct FileCacheControl
|
||||
uint64 time_write; /* time spent writing (us) */
|
||||
uint64 resizes; /* number of LFC resizes */
|
||||
uint64 evicted_pages; /* number of evicted pages */
|
||||
|
||||
/* FIXME: should make these atomic, they're not protected by any locks */
|
||||
uint64 kernel_module_read_hits; /* success returns from read ioctl */
|
||||
uint64 kernel_module_read_misses; /* ENOENT returns from read ioctl */
|
||||
uint64 kernel_module_write_hits; /* success returns from write ioctl */
|
||||
uint64 kernel_module_write_misses; /* ENOMEM returns from write ioctl */
|
||||
|
||||
dlist_head lru; /* double linked list for LRU replacement
|
||||
* algorithm */
|
||||
dlist_head holes; /* double linked list of punched holes */
|
||||
@@ -183,6 +195,7 @@ typedef struct FileCacheControl
|
||||
static HTAB *lfc_hash;
|
||||
static int lfc_desc = -1;
|
||||
static LWLockId lfc_lock;
|
||||
|
||||
static int lfc_max_size;
|
||||
static int lfc_size_limit;
|
||||
static int lfc_prewarm_limit;
|
||||
@@ -190,6 +203,8 @@ static int lfc_prewarm_batch;
|
||||
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
|
||||
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
|
||||
static char *lfc_path;
|
||||
static bool lfc_use_kernel_module;
|
||||
|
||||
static uint64 lfc_generation;
|
||||
static FileCacheControl *lfc_ctl;
|
||||
static bool lfc_do_prewarm;
|
||||
@@ -203,6 +218,9 @@ bool lfc_prewarm_update_ws_estimation;
|
||||
|
||||
#define LFC_ENABLED() (lfc_ctl->limit != 0)
|
||||
|
||||
static int pread_with_ioctl(void *buffer, uint64 blkno);
|
||||
static int pwrite_with_ioctl(const void *buffer, uint64 blkno);
|
||||
|
||||
/*
|
||||
* Close LFC file if opened.
|
||||
* All backends should close their LFC files once LFC is disabled.
|
||||
@@ -251,14 +269,19 @@ lfc_switch_off(void)
|
||||
/*
|
||||
* We need to use unlink to to avoid races in LFC write, because it is not
|
||||
* protected by lock
|
||||
*
|
||||
* FIXME: how to clean up the kernel module device on trouble?
|
||||
*/
|
||||
unlink(lfc_path);
|
||||
if (!lfc_use_kernel_module)
|
||||
{
|
||||
unlink(lfc_path);
|
||||
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
|
||||
else
|
||||
close(fd);
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
|
||||
else
|
||||
close(fd);
|
||||
}
|
||||
|
||||
/* Wakeup waiting backends */
|
||||
for (int i = 0; i < N_COND_VARS; i++)
|
||||
@@ -270,7 +293,8 @@ lfc_switch_off(void)
|
||||
static void
|
||||
lfc_disable(char const *op)
|
||||
{
|
||||
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
|
||||
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache",
|
||||
op, lfc_use_kernel_module ? CLOCKCACHE_DEV_PATH : lfc_path);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
lfc_switch_off();
|
||||
@@ -301,7 +325,9 @@ lfc_ensure_opened(void)
|
||||
/* Open cache file if not done yet */
|
||||
if (lfc_desc < 0)
|
||||
{
|
||||
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
|
||||
lfc_desc = BasicOpenFile(
|
||||
lfc_use_kernel_module ? CLOCKCACHE_DEV_PATH : lfc_path,
|
||||
O_RDWR);
|
||||
|
||||
if (lfc_desc < 0)
|
||||
{
|
||||
@@ -351,10 +377,16 @@ lfc_shmem_startup(void)
|
||||
initSHLL(&lfc_ctl->wss_estimation);
|
||||
|
||||
/* Recreate file cache on restart */
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (lfc_use_kernel_module)
|
||||
fd = BasicOpenFile(CLOCKCACHE_DEV_PATH, O_RDWR);
|
||||
else
|
||||
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
|
||||
if (fd < 0)
|
||||
{
|
||||
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
|
||||
if (lfc_use_kernel_module)
|
||||
elog(WARNING, "LFC: failed to open " CLOCKCACHE_DEV_PATH ": %m");
|
||||
else
|
||||
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
|
||||
lfc_ctl->limit = 0;
|
||||
}
|
||||
else
|
||||
@@ -613,6 +645,15 @@ lfc_init(void)
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomBoolVariable("neon.use_kernel_module",
|
||||
"Use neon_pagecache kernel module instead of a regular file (EXPERIMENTAL)",
|
||||
NULL,
|
||||
&lfc_use_kernel_module,
|
||||
true,
|
||||
PGC_POSTMASTER,
|
||||
0,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
if (lfc_max_size == 0)
|
||||
return;
|
||||
|
||||
@@ -936,6 +977,44 @@ lfc_prewarm_main(Datum main_arg)
|
||||
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
|
||||
}
|
||||
|
||||
void
|
||||
lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
|
||||
{
|
||||
BufferTag tag;
|
||||
FileCacheEntry *entry;
|
||||
uint32 hash;
|
||||
|
||||
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
|
||||
return;
|
||||
|
||||
CopyNRelFileInfoToBufTag(tag, rinfo);
|
||||
tag.forkNum = forkNum;
|
||||
|
||||
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
|
||||
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
if (LFC_ENABLED())
|
||||
{
|
||||
for (BlockNumber blkno = 0; blkno < nblocks; blkno += lfc_blocks_per_chunk)
|
||||
{
|
||||
tag.blockNum = blkno;
|
||||
hash = get_hash_value(lfc_hash, &tag);
|
||||
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
|
||||
if (entry != NULL)
|
||||
{
|
||||
for (int i = 0; i < lfc_blocks_per_chunk; i++)
|
||||
{
|
||||
if (GET_STATE(entry, i) == AVAILABLE)
|
||||
{
|
||||
lfc_ctl->used_pages -= 1;
|
||||
SET_STATE(entry, i, UNAVAILABLE);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if page is present in the cache.
|
||||
@@ -1259,27 +1338,57 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
/* offset of first IOV */
|
||||
first_read_offset += chunk_offs + first_block_in_chunk_read;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
|
||||
/* Read only the blocks we're interested in, limiting */
|
||||
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
|
||||
nwrite, first_read_offset * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != (BLCKSZ * nwrite))
|
||||
if (lfc_use_kernel_module)
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
|
||||
{
|
||||
if (!BITMAP_ISSET(chunk_mask, i))
|
||||
continue;
|
||||
Assert(iov[i].iov_len == BLCKSZ);
|
||||
rc = pread_with_ioctl(iov[i].iov_base, first_read_offset + i - first_block_in_chunk_read);
|
||||
if (rc < 0 && errno == ENOENT)
|
||||
{
|
||||
/* The kernel module evicted the page */
|
||||
elog(DEBUG1, "kernel module had evicted block");
|
||||
}
|
||||
else if (rc < 0)
|
||||
{
|
||||
pgstat_report_wait_end();
|
||||
lfc_disable("ioctl read");
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* success! */
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
}
|
||||
}
|
||||
pgstat_report_wait_end();
|
||||
}
|
||||
|
||||
/*
|
||||
* We successfully read the pages we know were valid when we
|
||||
* started reading; now mark those pages as read
|
||||
*/
|
||||
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
|
||||
else
|
||||
{
|
||||
if (BITMAP_ISSET(chunk_mask, i))
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
|
||||
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
|
||||
nwrite, first_read_offset * BLCKSZ);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != (BLCKSZ * nwrite))
|
||||
{
|
||||
lfc_disable("read");
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* We successfully read the pages we know were valid when we
|
||||
* started reading; now mark those pages as read
|
||||
*/
|
||||
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
|
||||
{
|
||||
if (BITMAP_ISSET(chunk_mask, i))
|
||||
BITMAP_SET(mask, buf_offset + i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1326,6 +1435,65 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
return blocks_read;
|
||||
}
|
||||
|
||||
static int
|
||||
pread_with_ioctl(void *buffer, uint64 blkno)
|
||||
{
|
||||
struct neon_rw_args args = {
|
||||
.key = {
|
||||
.key_hi = 0,
|
||||
.key_lo = blkno
|
||||
},
|
||||
.offset = 0,
|
||||
.length = POSTGRES_PAGE_SIZE,
|
||||
.buffer = (__u64)(uintptr_t) buffer
|
||||
};
|
||||
int rc;
|
||||
|
||||
errno = 0;
|
||||
|
||||
elog(LOG, "calling ioctl read for blk %lu with buffer=%p (shared_buffers is at %p-%p)",
|
||||
blkno,
|
||||
buffer,
|
||||
BufferBlocks,
|
||||
BufferBlocks + BLCKSZ * NBuffers);
|
||||
rc = ioctl(lfc_desc, NEON_IOCTL_READ, &args);
|
||||
if (rc >= 0)
|
||||
lfc_ctl->kernel_module_read_hits++;
|
||||
else if (rc < 0 && errno == ENOENT)
|
||||
lfc_ctl->kernel_module_read_misses++;
|
||||
else
|
||||
elog(LOG, "ioctl read failed for blk %lu with buffer=%p: %m", blkno, buffer);
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int
|
||||
pwrite_with_ioctl(const void *buffer, uint64 blkno)
|
||||
{
|
||||
struct neon_rw_args args = {
|
||||
.key = {
|
||||
.key_hi = 0,
|
||||
.key_lo = blkno
|
||||
},
|
||||
.offset = 0,
|
||||
.length = POSTGRES_PAGE_SIZE,
|
||||
.buffer = (__u64)(uintptr_t) buffer
|
||||
};
|
||||
int rc;
|
||||
|
||||
elog(LOG, "calling ioctl write for blk %lu with buffer=%p (shared_buffers is at %p-%p)",
|
||||
blkno,
|
||||
buffer,
|
||||
BufferBlocks,
|
||||
BufferBlocks + BLCKSZ * NBuffers);
|
||||
|
||||
rc = ioctl(lfc_desc, NEON_IOCTL_WRITE, &args);
|
||||
if (rc >= 0)
|
||||
lfc_ctl->kernel_module_write_hits++;
|
||||
else if (rc < 0 && errno == ENOMEM)
|
||||
lfc_ctl->kernel_module_write_misses++;
|
||||
return rc;
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize new LFC hash entry, perform eviction if needed.
|
||||
* Returns false if there are no unpinned entries and chunk can not be added.
|
||||
@@ -1446,7 +1614,6 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
{
|
||||
BufferTag tag;
|
||||
FileCacheEntry *entry;
|
||||
ssize_t rc;
|
||||
bool found;
|
||||
uint32 hash;
|
||||
uint64 generation;
|
||||
@@ -1455,6 +1622,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
ConditionVariable* cv;
|
||||
FileCacheBlockState state;
|
||||
XLogRecPtr lwlsn;
|
||||
bool success;
|
||||
|
||||
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
|
||||
|
||||
@@ -1533,16 +1701,60 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
|
||||
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
rc = pwrite(lfc_desc, buffer, BLCKSZ,
|
||||
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != BLCKSZ)
|
||||
if (lfc_use_kernel_module)
|
||||
{
|
||||
lfc_disable("write");
|
||||
int rc;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
rc = pwrite_with_ioctl(buffer,
|
||||
entry_offset * lfc_blocks_per_chunk + chunk_offs);
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc < 0 && errno == ENOMEM)
|
||||
{
|
||||
/*
|
||||
* Write was wasted.
|
||||
*
|
||||
* FIXME: We could mark the page in the chunk as UNAVAILABLE,
|
||||
* since we know it was not actually present in the kernel
|
||||
* cache. Any subsequent read on it will inevitably fail with
|
||||
* ENOENT. That's not a correctness issue however, assuming that
|
||||
* the call never returns ENOMEM when the old version of the page
|
||||
* is still in the cache.
|
||||
*/
|
||||
success = true;
|
||||
}
|
||||
else if (rc < 0)
|
||||
{
|
||||
success = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* successful write */
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ssize_t rc;
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
|
||||
rc = pwrite(lfc_desc, buffer, BLCKSZ,
|
||||
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
|
||||
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
success = (rc == BLCKSZ);
|
||||
}
|
||||
|
||||
if (!success)
|
||||
{
|
||||
lfc_disable(lfc_use_kernel_module ? "write ioctl" : "write");
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -1718,19 +1930,60 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
|
||||
}
|
||||
LWLockRelease(lfc_lock);
|
||||
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
|
||||
if (rc != BLCKSZ * blocks_in_chunk)
|
||||
/* Perform the write */
|
||||
if (lfc_use_kernel_module)
|
||||
{
|
||||
lfc_disable("write");
|
||||
return;
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
for (int i = 0; i < blocks_in_chunk; i++)
|
||||
{
|
||||
int rc;
|
||||
|
||||
rc = pwrite_with_ioctl(
|
||||
iov[i].iov_base,
|
||||
entry_offset * lfc_blocks_per_chunk + chunk_offs
|
||||
);
|
||||
if (rc < 0 && errno == ENOMEM)
|
||||
{
|
||||
/*
|
||||
* Write was wasted.
|
||||
*
|
||||
* FIXME: We could mark the page in the chunk as UNAVAILABLE,
|
||||
* since we know it was not actually present in the kernel
|
||||
* cache. Any subsequent read on it will inevitably fail with
|
||||
* ENOENT. That's not a correctness issue however, assuming that
|
||||
* the call never returns ENOMEM when the old version of the page
|
||||
* is still in the cache.
|
||||
*/
|
||||
}
|
||||
else if (rc < 0)
|
||||
{
|
||||
/* other error, not expected */
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
lfc_disable("write ioctl");
|
||||
return;
|
||||
}
|
||||
}
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
}
|
||||
else
|
||||
{
|
||||
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
|
||||
INSTR_TIME_SET_CURRENT(io_start);
|
||||
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
|
||||
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
|
||||
INSTR_TIME_SET_CURRENT(io_end);
|
||||
pgstat_report_wait_end();
|
||||
if (rc != BLCKSZ * blocks_in_chunk)
|
||||
{
|
||||
lfc_disable("write");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* success */
|
||||
{
|
||||
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||
|
||||
@@ -1884,6 +2137,26 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->pinned;
|
||||
break;
|
||||
case 10:
|
||||
key = "file_cache_kernel_module_read_hits";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->kernel_module_read_hits;
|
||||
break;
|
||||
case 11:
|
||||
key = "file_cache_kernel_module_read_misses";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->kernel_module_read_misses;
|
||||
break;
|
||||
case 12:
|
||||
key = "file_cache_kernel_module_write_hits";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->kernel_module_write_hits;
|
||||
break;
|
||||
case 13:
|
||||
key = "file_cache_kernel_module_write_misses";
|
||||
if (lfc_ctl)
|
||||
value = lfc_ctl->kernel_module_write_misses;
|
||||
break;
|
||||
default:
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ typedef struct FileCacheState
|
||||
extern bool lfc_store_prefetch_result;
|
||||
|
||||
/* functions for local file cache */
|
||||
extern void lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks);
|
||||
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
|
||||
BlockNumber blkno, const void *const *buffers,
|
||||
BlockNumber nblocks);
|
||||
|
||||
35
pgxn/neon/neon_pagecache.h
Normal file
35
pgxn/neon/neon_pagecache.h
Normal file
@@ -0,0 +1,35 @@
|
||||
/*
|
||||
* This is for the special ioctl in the neon_pagecache kernel module.
|
||||
*
|
||||
* DO NOT MODIFY! This header must agree with what the kernel module was
|
||||
* compiled with!
|
||||
*/
|
||||
|
||||
#ifndef NEON_PAGECACHE_H
|
||||
#define NEON_PAGECACHE_H
|
||||
|
||||
#include <linux/types.h>
|
||||
|
||||
|
||||
#define POSTGRES_PAGE_SIZE 8192 // 8 KiB
|
||||
|
||||
struct neon_key {
|
||||
__u64 key_hi; // Upper 64 bits of 128-bit key
|
||||
__u64 key_lo; // Lower 64 bits of 128-bit key
|
||||
};
|
||||
|
||||
struct neon_rw_args {
|
||||
struct neon_key key;
|
||||
__u32 offset; // Offset within page (0-8191)
|
||||
__u32 length; // Length to read/write
|
||||
__u64 buffer; // User buffer address
|
||||
};
|
||||
|
||||
#define NEON_IOC_MAGIC 'N'
|
||||
|
||||
#define NEON_IOCTL_READ _IOWR(NEON_IOC_MAGIC, 1, struct neon_rw_args)
|
||||
#define NEON_IOCTL_WRITE _IOWR(NEON_IOC_MAGIC, 2, struct neon_rw_args)
|
||||
|
||||
|
||||
|
||||
#endif /* NEON_PAGECACHE_H */
|
||||
@@ -86,7 +86,7 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
|
||||
|
||||
#define InvalidRelFileNumber InvalidOid
|
||||
|
||||
#define SMgrRelGetRelInfo(reln) \
|
||||
#define SMgrRelGetRelInfo(reln) \
|
||||
(reln->smgr_rnode.node)
|
||||
|
||||
#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
|
||||
@@ -148,6 +148,12 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
|
||||
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
|
||||
#endif
|
||||
|
||||
#define NRelFileInfoInvalidate(rinfo) do { \
|
||||
NInfoGetSpcOid(rinfo) = InvalidOid; \
|
||||
NInfoGetDbOid(rinfo) = InvalidOid; \
|
||||
NInfoGetRelNumber(rinfo) = InvalidRelFileNumber; \
|
||||
} while (0)
|
||||
|
||||
#if PG_MAJORVERSION_NUM < 17
|
||||
#define ProcNumber BackendId
|
||||
#define INVALID_PROC_NUMBER InvalidBackendId
|
||||
|
||||
@@ -108,7 +108,7 @@ typedef enum
|
||||
UNLOGGED_BUILD_NOT_PERMANENT
|
||||
} UnloggedBuildPhase;
|
||||
|
||||
static SMgrRelation unlogged_build_rel = NULL;
|
||||
static NRelFileInfo unlogged_build_rel_info;
|
||||
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
|
||||
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
|
||||
@@ -912,16 +912,19 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
|
||||
{
|
||||
case 0:
|
||||
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdextend(reln, forkNum, blkno, buffer, skipFsync);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
mdextend(reln, forkNum, blkno, buffer, skipFsync);
|
||||
/* Update LFC in case of unlogged index build */
|
||||
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
|
||||
lfc_write(InfoFromSMgrRel(reln), forkNum, blkno, buffer);
|
||||
return;
|
||||
|
||||
default:
|
||||
@@ -1003,21 +1006,19 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
|
||||
{
|
||||
case 0:
|
||||
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
|
||||
/* Update LFC in case of unlogged index build */
|
||||
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
|
||||
{
|
||||
for (int i = 0; i < nblocks; i++)
|
||||
{
|
||||
lfc_write(InfoFromSMgrRel(reln), forkNum, blocknum + i, buffer.data);
|
||||
}
|
||||
}
|
||||
return;
|
||||
|
||||
default:
|
||||
@@ -1387,8 +1388,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
|
||||
{
|
||||
case 0:
|
||||
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdread(reln, forkNum, blkno, buffer);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1474,8 +1481,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
|
||||
{
|
||||
case 0:
|
||||
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdreadv(reln, forknum, blocknum, buffers, nblocks);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1608,6 +1621,15 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
#if PG_MAJORVERSION_NUM >= 17
|
||||
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
|
||||
#else
|
||||
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
|
||||
#endif
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1617,9 +1639,6 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
|
||||
#else
|
||||
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
|
||||
#endif
|
||||
/* Update LFC in case of unlogged index build */
|
||||
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
|
||||
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
|
||||
return;
|
||||
default:
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
@@ -1680,14 +1699,16 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
|
||||
/* Update LFC in case of unlogged index build */
|
||||
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
|
||||
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
|
||||
return;
|
||||
default:
|
||||
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
|
||||
@@ -1723,6 +1744,10 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
return mdnblocks(reln, forknum);
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1792,6 +1817,11 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_PERMANENT:
|
||||
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
|
||||
{
|
||||
mdtruncate(reln, forknum, old_blocks, nblocks);
|
||||
return;
|
||||
}
|
||||
break;
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
@@ -1930,7 +1960,6 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
*/
|
||||
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
|
||||
neon_log(ERROR, "unlogged relation build is already in progress");
|
||||
Assert(unlogged_build_rel == NULL);
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
|
||||
@@ -1947,7 +1976,7 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
|
||||
case RELPERSISTENCE_TEMP:
|
||||
case RELPERSISTENCE_UNLOGGED:
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_rel_info = InfoFromSMgrRel(reln);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
|
||||
#ifdef DEBUG_COMPARE_LOCAL
|
||||
if (!IsParallelWorker())
|
||||
@@ -1968,12 +1997,9 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
|
||||
#endif
|
||||
|
||||
unlogged_build_rel = reln;
|
||||
unlogged_build_rel_info = InfoFromSMgrRel(reln);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
|
||||
|
||||
/* Make the relation look like it's unlogged */
|
||||
reln->smgr_relpersistence = RELPERSISTENCE_UNLOGGED;
|
||||
|
||||
/*
|
||||
* Create the local file. In a parallel build, the leader is expected to
|
||||
* call this first and do it.
|
||||
@@ -2000,17 +2026,16 @@ neon_start_unlogged_build(SMgrRelation reln)
|
||||
static void
|
||||
neon_finish_unlogged_build_phase_1(SMgrRelation reln)
|
||||
{
|
||||
Assert(unlogged_build_rel == reln);
|
||||
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
|
||||
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
|
||||
RelFileInfoFmt((unlogged_build_rel_info)))));
|
||||
|
||||
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
|
||||
return;
|
||||
|
||||
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_1);
|
||||
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
|
||||
|
||||
/*
|
||||
* In a parallel build, (only) the leader process performs the 2nd
|
||||
@@ -2018,7 +2043,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
|
||||
*/
|
||||
if (IsParallelWorker())
|
||||
{
|
||||
unlogged_build_rel = NULL;
|
||||
NRelFileInfoInvalidate(unlogged_build_rel_info);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
}
|
||||
else
|
||||
@@ -2039,11 +2064,11 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
{
|
||||
NRelFileInfoBackend rinfob = InfoBFromSMgrRel(reln);
|
||||
|
||||
Assert(unlogged_build_rel == reln);
|
||||
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
|
||||
|
||||
ereport(SmgrTrace,
|
||||
(errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
|
||||
RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
|
||||
RelFileInfoFmt(unlogged_build_rel_info))));
|
||||
|
||||
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
|
||||
{
|
||||
@@ -2051,7 +2076,6 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
BlockNumber nblocks;
|
||||
|
||||
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_2);
|
||||
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
|
||||
|
||||
/*
|
||||
* Update the last-written LSN cache.
|
||||
@@ -2072,9 +2096,6 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
InfoFromNInfoB(rinfob),
|
||||
MAIN_FORKNUM);
|
||||
|
||||
/* Make the relation look permanent again */
|
||||
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
|
||||
|
||||
/* Remove local copy */
|
||||
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
|
||||
{
|
||||
@@ -2083,6 +2104,8 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
forknum);
|
||||
|
||||
forget_cached_relsize(InfoFromNInfoB(rinfob), forknum);
|
||||
lfc_invalidate(InfoFromNInfoB(rinfob), forknum, nblocks);
|
||||
|
||||
mdclose(reln, forknum);
|
||||
#ifndef DEBUG_COMPARE_LOCAL
|
||||
/* use isRedo == true, so that we drop it immediately */
|
||||
@@ -2093,7 +2116,7 @@ neon_end_unlogged_build(SMgrRelation reln)
|
||||
mdunlink(rinfob, INIT_FORKNUM, true);
|
||||
#endif
|
||||
}
|
||||
unlogged_build_rel = NULL;
|
||||
NRelFileInfoInvalidate(unlogged_build_rel_info);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
}
|
||||
|
||||
@@ -2166,7 +2189,7 @@ AtEOXact_neon(XactEvent event, void *arg)
|
||||
* Forget about any build we might have had in progress. The local
|
||||
* file will be unlinked by smgrDoPendingDeletes()
|
||||
*/
|
||||
unlogged_build_rel = NULL;
|
||||
NRelFileInfoInvalidate(unlogged_build_rel_info);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
break;
|
||||
|
||||
@@ -2178,7 +2201,7 @@ AtEOXact_neon(XactEvent event, void *arg)
|
||||
case XACT_EVENT_PRE_PREPARE:
|
||||
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
|
||||
{
|
||||
unlogged_build_rel = NULL;
|
||||
NRelFileInfoInvalidate(unlogged_build_rel_info);
|
||||
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INTERNAL_ERROR),
|
||||
|
||||
15
poetry.lock
generated
15
poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -1145,18 +1145,19 @@ dotenv = ["python-dotenv"]
|
||||
|
||||
[[package]]
|
||||
name = "flask-cors"
|
||||
version = "5.0.0"
|
||||
description = "A Flask extension adding a decorator for CORS support"
|
||||
version = "6.0.0"
|
||||
description = "A Flask extension simplifying CORS support"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
python-versions = "<4.0,>=3.9"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
|
||||
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
|
||||
{file = "flask_cors-6.0.0-py3-none-any.whl", hash = "sha256:6332073356452343a8ccddbfec7befdc3fdd040141fe776ec9b94c262f058657"},
|
||||
{file = "flask_cors-6.0.0.tar.gz", hash = "sha256:4592c1570246bf7beee96b74bc0adbbfcb1b0318f6ba05c412e8909eceec3393"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
Flask = ">=0.9"
|
||||
flask = ">=0.9"
|
||||
Werkzeug = ">=0.7"
|
||||
|
||||
[[package]]
|
||||
name = "frozenlist"
|
||||
|
||||
@@ -161,8 +161,11 @@ struct ProxyCliArgs {
|
||||
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
|
||||
redis_rps_limit: Vec<RateBucketInfo>,
|
||||
/// Cancellation channel size (max queue size for redis kv client)
|
||||
#[clap(long, default_value = "1024")]
|
||||
#[clap(long, default_value_t = 1024)]
|
||||
cancellation_ch_size: usize,
|
||||
/// Cancellation ops batch size for redis
|
||||
#[clap(long, default_value_t = 8)]
|
||||
cancellation_batch_size: usize,
|
||||
/// cache for `allowed_ips` (use `size=0` to disable)
|
||||
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
|
||||
allowed_ips_cache: String,
|
||||
@@ -542,7 +545,12 @@ pub async fn run() -> anyhow::Result<()> {
|
||||
if let Some(mut redis_kv_client) = redis_kv_client {
|
||||
maintenance_tasks.spawn(async move {
|
||||
redis_kv_client.try_connect().await?;
|
||||
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
|
||||
handle_cancel_messages(
|
||||
&mut redis_kv_client,
|
||||
rx_cancel,
|
||||
args.cancellation_batch_size,
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(redis_kv_client);
|
||||
|
||||
|
||||
@@ -30,8 +30,6 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
|
||||
type IpSubnetKey = IpNet;
|
||||
|
||||
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
|
||||
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
|
||||
const BATCH_SIZE: usize = 8;
|
||||
|
||||
// Message types for sending through mpsc channel
|
||||
pub enum CancelKeyOp {
|
||||
@@ -231,12 +229,13 @@ impl CancelReplyOp {
|
||||
pub async fn handle_cancel_messages(
|
||||
client: &mut RedisKVClient,
|
||||
mut rx: mpsc::Receiver<CancelKeyOp>,
|
||||
batch_size: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut batch = Vec::with_capacity(BATCH_SIZE);
|
||||
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
|
||||
let mut batch = Vec::with_capacity(batch_size);
|
||||
let mut pipeline = Pipeline::with_capacity(batch_size);
|
||||
|
||||
loop {
|
||||
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
|
||||
if rx.recv_many(&mut batch, batch_size).await == 0 {
|
||||
warn!("shutting down cancellation queue");
|
||||
break Ok(());
|
||||
}
|
||||
@@ -367,8 +366,7 @@ impl CancellationHandler {
|
||||
return Err(CancelError::InternalError);
|
||||
};
|
||||
|
||||
tx.send_timeout(op, REDIS_SEND_TIMEOUT)
|
||||
.await
|
||||
tx.try_send(op)
|
||||
.map_err(|e| {
|
||||
tracing::warn!("failed to send GetCancelData for {key}: {e}");
|
||||
})
|
||||
@@ -570,7 +568,7 @@ impl Session {
|
||||
}
|
||||
|
||||
// Send the store key op to the cancellation handler and set TTL for the key
|
||||
pub(crate) async fn write_cancel_key(
|
||||
pub(crate) fn write_cancel_key(
|
||||
&self,
|
||||
cancel_closure: CancelClosure,
|
||||
) -> Result<(), CancelError> {
|
||||
@@ -596,14 +594,14 @@ impl Session {
|
||||
expire: CANCEL_KEY_TTL,
|
||||
};
|
||||
|
||||
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
|
||||
let _ = tx.try_send(op).map_err(|e| {
|
||||
let key = self.key;
|
||||
tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
|
||||
pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
|
||||
let Some(tx) = &self.cancellation_handler.tx else {
|
||||
tracing::warn!("cancellation handler is not available");
|
||||
return Err(CancelError::InternalError);
|
||||
@@ -619,7 +617,7 @@ impl Session {
|
||||
.guard(RedisMsgKind::HDel),
|
||||
};
|
||||
|
||||
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
|
||||
let _ = tx.try_send(op).map_err(|e| {
|
||||
let key = self.key;
|
||||
tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
|
||||
});
|
||||
|
||||
@@ -244,9 +244,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session = cancellation_handler_clone.get_key();
|
||||
|
||||
session
|
||||
.write_cancel_key(node.cancel_closure.clone())
|
||||
.await?;
|
||||
session.write_cancel_key(node.cancel_closure.clone())?;
|
||||
|
||||
prepare_client_connection(&node, *session.key(), &mut stream).await?;
|
||||
|
||||
|
||||
@@ -383,9 +383,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
|
||||
let session = cancellation_handler_clone.get_key();
|
||||
|
||||
session
|
||||
.write_cancel_key(node.cancel_closure.clone())
|
||||
.await?;
|
||||
session.write_cancel_key(node.cancel_closure.clone())?;
|
||||
|
||||
prepare_client_connection(&node, *session.key(), &mut stream).await?;
|
||||
|
||||
|
||||
@@ -94,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
|
||||
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
|
||||
}
|
||||
|
||||
drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
|
||||
drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
@@ -103,7 +103,7 @@ class AbstractNeonCli:
|
||||
else:
|
||||
stdout = ""
|
||||
|
||||
log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}")
|
||||
log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
|
||||
raise
|
||||
|
||||
indent = " "
|
||||
|
||||
@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
|
||||
{"name": "role$"},
|
||||
{"name": "role$$"},
|
||||
{"name": "role$x$"},
|
||||
{"name": "x"},
|
||||
{"name": "xx"},
|
||||
{"name": "$x"},
|
||||
{"name": "x$"},
|
||||
{"name": "$x$"},
|
||||
{"name": "xx$"},
|
||||
{"name": "$xx"},
|
||||
{"name": "$xx$"},
|
||||
# 63 bytes is the limit for role/DB names in Postgres
|
||||
{"name": "x" * 63},
|
||||
]
|
||||
|
||||
TEST_DB_NAMES = [
|
||||
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
|
||||
"name": "db name$x$",
|
||||
"owner": "role$x$",
|
||||
},
|
||||
{
|
||||
"name": "x",
|
||||
"owner": "x",
|
||||
},
|
||||
{
|
||||
"name": "xx",
|
||||
"owner": "xx",
|
||||
},
|
||||
{
|
||||
"name": "$x",
|
||||
"owner": "$x",
|
||||
},
|
||||
{
|
||||
"name": "x$",
|
||||
"owner": "x$",
|
||||
},
|
||||
{
|
||||
"name": "$x$",
|
||||
"owner": "$x$",
|
||||
},
|
||||
{
|
||||
"name": "xx$",
|
||||
"owner": "xx$",
|
||||
},
|
||||
{
|
||||
"name": "$xx",
|
||||
"owner": "$xx",
|
||||
},
|
||||
{
|
||||
"name": "$xx$",
|
||||
"owner": "$xx$",
|
||||
},
|
||||
# 63 bytes is the limit for role/DB names in Postgres
|
||||
{
|
||||
"name": "x" * 63,
|
||||
"owner": "x" * 63,
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
"""
|
||||
Test that compute_ctl can create and work with databases and roles
|
||||
with special characters (whitespaces, %, tabs, etc.) in the name.
|
||||
Also use `drop_subscriptions_before_start: true`. We do not actually
|
||||
have any subscriptions in this test, so it should be no-op, but it
|
||||
i) simulates the case when we create a second dev branch together with
|
||||
a new project creation, and ii) just generally stresses more code paths.
|
||||
"""
|
||||
env = neon_simple_env
|
||||
|
||||
@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"drop_subscriptions_before_start": True,
|
||||
"cluster": {
|
||||
"roles": TEST_ROLE_NAMES,
|
||||
"databases": TEST_DB_NAMES,
|
||||
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
|
||||
**{
|
||||
"spec": {
|
||||
"skip_pg_catalog_updates": False,
|
||||
"drop_subscriptions_before_start": True,
|
||||
"cluster": {
|
||||
"roles": [],
|
||||
"databases": [],
|
||||
|
||||
@@ -510,7 +510,7 @@ def list_elegible_layers(
|
||||
except KeyError:
|
||||
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
|
||||
# matches what's on disk.
|
||||
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
|
||||
log.warning(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
|
||||
raise
|
||||
|
||||
return list(c for c in candidates if is_visible(c))
|
||||
@@ -636,7 +636,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
|
||||
except:
|
||||
# On assertion failures, log some details to help with debugging
|
||||
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
|
||||
log.warn(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
|
||||
log.warning(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
|
||||
raise
|
||||
|
||||
# Scrub the remote storage
|
||||
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 4cca6f8083...55c0d45abe
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: daa81cffcf...de7640f55d
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 15710a76b7...0bf96bd6d7
2
vendor/postgres-v17
vendored
2
vendor/postgres-v17
vendored
Submodule vendor/postgres-v17 updated: e5374b7299...8be779fd3a
8
vendor/revisions.json
vendored
8
vendor/revisions.json
vendored
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"v17": [
|
||||
"17.5",
|
||||
"e5374b72997b0afc8374137674e873f7a558120a"
|
||||
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
|
||||
],
|
||||
"v16": [
|
||||
"16.9",
|
||||
"15710a76b7d07912110fcbbaf0c8ad6d7e5a9fbc"
|
||||
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
|
||||
],
|
||||
"v15": [
|
||||
"15.13",
|
||||
"daa81cffcf063c54b29a9aabdb6604625f675ad0"
|
||||
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
|
||||
],
|
||||
"v14": [
|
||||
"14.18",
|
||||
"4cca6f8083483dda9e12eae292cf788d45bd561f"
|
||||
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -60,7 +60,8 @@ lazy_static = { version = "1", default-features = false, features = ["spin_no_st
|
||||
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
memchr = { version = "2" }
|
||||
nix = { version = "0.26" }
|
||||
nix-2f80eeee3b1b6c7e = { package = "nix", version = "0.26" }
|
||||
nix-fa1f6196edfd7249 = { package = "nix", version = "0.30", features = ["dir", "ioctl", "mman", "poll", "signal", "socket"] }
|
||||
nom = { version = "7" }
|
||||
num = { version = "0.4" }
|
||||
num-bigint = { version = "0.4" }
|
||||
|
||||
Reference in New Issue
Block a user