Compare commits

..

23 Commits

Author SHA1 Message Date
Christian Schwarz
7b5e3d6d40 Revert "work around copy_to_user failure on reads"
Not necessary as of 271b19bd3e

Reason why that commit fixes the issues is
https://neondb.slack.com/archives/C08SXUSNFBP/p1747837985950309

 > implemented a fix in the kernel module, no need for userspace to
> pre-fault mmapped pages in; the reason for the EFAULT was that we were
> holding a spinlock during copy_to_user, which disables preemption, which
> in turn makes the page fault handler give up and cause copy_to_user to
> fail; commit to be used is 271b19bd3e2de7777770ac6b8b1b1c94bb33830b
> (edited)

This reverts commit 7b818f8d64.
2025-05-23 12:11:01 +02:00
Christian Schwarz
e086568e21 demote log levels to avoid flooding the logs 2025-05-23 12:10:53 +02:00
Heikki Linnakangas
7b818f8d64 work around copy_to_user failure on reads 2025-05-21 16:33:52 +03:00
Heikki Linnakangas
14fefd261f add separate counters for ioctl read misses, and some debugging LOG messages 2025-05-21 01:33:58 +03:00
Heikki Linnakangas
01abd4afc5 fix address argument again 2025-05-21 01:33:43 +03:00
Heikki Linnakangas
c8541ad29f fix bogus pointer 2025-05-21 00:20:56 +03:00
Heikki Linnakangas
eaad1db9f0 Add bespoken metrics for kernel module cache misses 2025-05-21 00:20:39 +03:00
Heikki Linnakangas
6ddcf68829 use correct request code for writes 2025-05-20 23:53:56 +03:00
Heikki Linnakangas
d701f8285c hack permissions on /dev/clockcache_dev to allow access to everyone 2025-05-20 23:52:58 +03:00
Heikki Linnakangas
77082a0f63 Implement using the kernel module
Enabled when you set "lfc_use_kernel_module=on" in postgresql.conf

XXX: This compiles, but is 100% untested
2025-05-20 17:45:40 +03:00
Alexey Kondratov
e94acbc816 fix(compute_ctl): Dollar escaping and tests (#11969)
## Problem

In the escaping path we were checking that `${tag}$` or `${outer_tag}$`
are present in the string, but that's not enough, as original string
surrounded by `$` can also form a 'tag', like `$x$xx$x$`, which is fine
on it's own, but cannot be used in the string escaped with `$xx$`.

## Summary of changes

Remove `$` from the checks, just check if `{tag}` or `{outer_tag}` are
present. Add more test cases and change the catalog test to stress the
`drop_subscriptions_before_start: true` path as well.

Fixes https://github.com/neondatabase/cloud/issues/29198
2025-05-20 09:03:36 +00:00
Erik Grinaker
f4150614d0 pageserver: don't pass config to PageHandler (#11973)
## Problem

The gRPC page service API will require decoupling the `PageHandler` from
the libpq protocol implementation. As preparation for this, avoid
passing in the entire server config to `PageHandler`, and instead
explicitly pass in the relevant fields.

Touches https://github.com/neondatabase/neon/issues/11728.

## Summary of changes

* Change `PageHandler` to take a `GetVectoredConcurrentIo` instead of
the entire config.
* Change `IoConcurrency::spawn_from_conf` to take a
`GetVectoredConcurrentIo`.
2025-05-19 15:47:40 +00:00
Erik Grinaker
38dbc5f67f pageserver/page_api: add binary Protobuf descriptor (#11968)
## Problem

A binary Protobuf schema descriptor can be used to expose an API
reflection service, which in turn allows convenient usage of e.g.
`grpcurl` against the gRPC server.

Touches #11728.

## Summary of changes

* Generate a binary schema descriptor as
`pageserver_page_api::proto::FILE_DESCRIPTOR_SET`.
* Opportunistically rename the Protobuf package from `page_service` to
`page_api`.
2025-05-19 11:17:45 +00:00
Folke Behrens
3685ad606d endpoint_storage: Fix metrics test by excluding assertion on macos (#11952) 2025-05-19 10:56:03 +00:00
Ivan Efremov
76a7d37f7e proxy: Drop cancellation ops if they don't fit into the queue (#11950)
Add a redis ops batch size argument for proxy and remove timeouts by
using try_send()
2025-05-19 10:10:55 +00:00
Erik Grinaker
cdb6479c8a pageserver: add gRPC page service schema (#11815)
## Problem

For the [communicator
project](https://github.com/neondatabase/company_projects/issues/352),
we want to move to gRPC for the page service protocol.

Touches #11728.

## Summary of changes

This patch adds an experimental gRPC Protobuf schema for the page
service. It is equivalent to the current page service, but with several
improvements, e.g.:

* Connection multiplexing.
* Reduced head-of-line blocking.
* Client-side batching.
* Explicit tenant shard routing.
* GetPage request classification (normal vs. prefetch).
* Explicit rate limiting ("slow down" response status).

The API is exposed as a new `pageserver/page_api` package. This is
separate from the `pageserver_api` package to reduce the dependency
footprint for the communicator. The longer-term plan is to also split
out e.g. the WAL ingestion service to a separate gRPC package, e.g.
`pageserver/wal_api`.

Subsequent PRs will: add Rust domain types for the Protobuf types,
expose a gRPC server, and implement the page service.

Preliminary prototype benchmarks of this gRPC API is within 10% of
baseline libpq performance. We'll do further benchmarking and
optimization as the implementation lands in `main` and is deployed to
staging.
2025-05-19 09:03:06 +00:00
Konstantin Knizhnik
81c557d87e Unlogged build get smgr (#11954)
## Problem

See https://github.com/neondatabase/neon/issues/11910
and https://neondb.slack.com/archives/C04DGM6SMTM/p1747314649059129

## Summary of changes

Do not change persistence in `start_unlogged_build`

Postgres PRs:
https://github.com/neondatabase/postgres/pull/642
https://github.com/neondatabase/postgres/pull/641
https://github.com/neondatabase/postgres/pull/640
https://github.com/neondatabase/postgres/pull/639

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-18 05:02:47 +00:00
Trung Dinh
e963129678 pagesteam_handle_batched_message -> pagestream_handle_batched_message (#11916)
## Problem
Found a typo in code.

## Summary of changes

Co-authored-by: Trung Dinh <tdinh@roblox.com>
Co-authored-by: Erik Grinaker <erik@neon.tech>
2025-05-17 22:30:29 +00:00
dependabot[bot]
4f0a9fc569 chore(deps): bump flask-cors from 5.0.0 to 6.0.0 in the pip group across 1 directory (#11960)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-05-17 22:06:32 +00:00
Emmanuel Ferdman
81c6a5a796 Migrate to correct logger interface (#11956)
## Problem
Currently the `logger` library throws annoying deprecation warnings:
```python
DeprecationWarning: The 'warn' method is deprecated, use 'warning' instead
```

## Summary of changes
This small PR resolves the annoying deprecation warnings by migrating to
`.warning` as suggested.

Signed-off-by: Emmanuel Ferdman <emmanuelferdman@gmail.com>
2025-05-17 21:12:01 +00:00
Konstantin Knizhnik
8e05639dbf Invalidate LFC after unlogged build (#11951)
## Problem


See https://neondb.slack.com/archives/C04DGM6SMTM/p1747391617951239

LFC is not always properly updated during unlogged build so it can
contain stale content.

## Summary of changes

Invalidate LFC content at the end of unlogged build

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
2025-05-17 19:06:59 +00:00
Alexander Bayandin
deed46015d CI(test-images): increase timeout from 20m to 60m (#11955)
## Problem

For some reason (unknown yet) 20m timeout is not enough for
`test-images` job on arm runners.
Ref:
https://github.com/neondatabase/neon/actions/runs/15075321681/job/42387530399?pr=11953

## Summary of changes
- Increase the timeout from 20m to 1h
2025-05-17 06:34:54 +00:00
Heikki Linnakangas
532d9b646e Add simple facility for an extendable shared memory area (#11929)
You still need to provide a max size up-front, but memory is only
allocated for the portion that is in use.

The module is currently unused, but will be used by the new compute
communicator project, in the neon Postgres extension. See
https://github.com/neondatabase/neon/issues/11729

---------

Co-authored-by: Erik Grinaker <erik@neon.tech>
2025-05-16 21:22:36 +00:00
44 changed files with 1300 additions and 156 deletions

View File

@@ -963,7 +963,7 @@ jobs:
fi
- name: Verify docker-compose example and test extensions
timeout-minutes: 20
timeout-minutes: 60
env:
TAG: >-
${{

21
Cargo.lock generated
View File

@@ -3794,6 +3794,16 @@ version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]]
name = "neon-shmem"
version = "0.1.0"
dependencies = [
"nix 0.30.1",
"tempfile",
"thiserror 1.0.69",
"workspace_hack",
]
[[package]]
name = "never-say-never"
version = "6.6.666"
@@ -4424,6 +4434,16 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_page_api"
version = "0.1.0"
dependencies = [
"prost 0.13.3",
"tonic",
"tonic-build",
"workspace_hack",
]
[[package]]
name = "papaya"
version = "0.2.1"
@@ -8482,6 +8502,7 @@ dependencies = [
"log",
"memchr",
"nix 0.26.4",
"nix 0.30.1",
"nom",
"num",
"num-bigint",

View File

@@ -9,6 +9,7 @@ members = [
"pageserver/ctl",
"pageserver/client",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
"safekeeper",
"safekeeper/client",
@@ -23,6 +24,7 @@ members = [
"libs/postgres_ffi",
"libs/safekeeper_api",
"libs/desim",
"libs/neon-shmem",
"libs/utils",
"libs/consumption_metrics",
"libs/postgres_backend",
@@ -127,7 +129,7 @@ md5 = "0.7.0"
measured = { version = "0.0.22", features=["lasso"] }
measured-process = { version = "0.0.22" }
memoffset = "0.9"
nix = { version = "0.30.1", features = ["dir", "fs", "process", "socket", "signal", "poll"] }
nix = { version = "0.30.1", features = ["dir", "fs", "mman", "process", "socket", "signal", "poll"] }
# Do not update to >= 7.0.0, at least. The update will have a significant impact
# on compute startup metrics (start_postgres_ms), >= 25% degradation.
notify = "6.0.0"
@@ -251,6 +253,7 @@ pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }

View File

@@ -144,7 +144,6 @@ RUN set -e \
openssh-client \
parallel \
pkg-config \
sudo \
unzip \
wget \
xz-utils \

View File

@@ -7,7 +7,7 @@ index 255e616..1c6edb7 100644
RelationGetRelationName(index));
+#ifdef NEON_SMGR
+ smgr_start_unlogged_build(index->rd_smgr);
+ smgr_start_unlogged_build(RelationGetSmgr(index));
+#endif
+
initRumState(&buildstate.rumstate, index);
@@ -18,7 +18,7 @@ index 255e616..1c6edb7 100644
rumUpdateStats(index, &buildstate.buildStats, buildstate.rumstate.isBuild);
+#ifdef NEON_SMGR
+ smgr_finish_unlogged_build_phase_1(index->rd_smgr);
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
+#endif
+
/*
@@ -29,7 +29,7 @@ index 255e616..1c6edb7 100644
}
+#ifdef NEON_SMGR
+ smgr_end_unlogged_build(index->rd_smgr);
+ smgr_end_unlogged_build(RelationGetSmgr(index));
+#endif
+
/*

View File

@@ -15,6 +15,10 @@ commands:
user: root
sysvInitAction: sysinit
shell: 'chmod 711 /neonvm/bin/set-disk-quota'
- name: chmod-clockcache_dev
user: root
sysvInitAction: sysinit
shell: 'chmod 777 /dev/clockcache_dev' # FIXME: not very secure
- name: pgbouncer
user: postgres
sysvInitAction: respawn

View File

@@ -213,8 +213,10 @@ impl Escaping for PgIdent {
// Find the first suitable tag that is not present in the string.
// Postgres' max role/DB name length is 63 bytes, so even in the
// worst case it won't take long.
while self.contains(&format!("${tag}$")) || self.contains(&format!("${outer_tag}$")) {
// worst case it won't take long. Outer tag is always `tag + "x"`,
// so if `tag` is not present in the string, `outer_tag` is not
// present in the string either.
while self.contains(&tag.to_string()) {
tag += "x";
outer_tag = tag.clone() + "x";
}

View File

@@ -71,6 +71,14 @@ test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hoor
("name$$$", ("$x$name$$$$x$", "xx")),
("name$$$$", ("$x$name$$$$$x$", "xx")),
("name$x$", ("$xx$name$x$$xx$", "xxx")),
("x", ("$xx$x$xx$", "xxx")),
("xx", ("$xxx$xx$xxx$", "xxxx")),
("$x", ("$xx$$x$xx$", "xxx")),
("x$", ("$xx$x$$xx$", "xxx")),
("$x$", ("$xx$$x$$xx$", "xxx")),
("xx$", ("$xxx$xx$$xxx$", "xxxx")),
("$xx", ("$xxx$$xx$xxx$", "xxxx")),
("$xx$", ("$xxx$$xx$$xxx$", "xxxx")),
];
for (input, expected) in test_cases {

View File

@@ -462,6 +462,8 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
if var(REAL_S3_ENV).is_ok() {
assert!(body.contains("remote_storage_s3_deleted_objects_total"));
}
#[cfg(target_os = "linux")]
assert!(body.contains("process_threads"));
}

View File

@@ -0,0 +1,13 @@
[package]
name = "neon-shmem"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
thiserror.workspace = true
nix.workspace=true
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
[target.'cfg(target_os = "macos")'.dependencies]
tempfile = "3.14.0"

418
libs/neon-shmem/src/lib.rs Normal file
View File

@@ -0,0 +1,418 @@
//! Shared memory utilities for neon communicator
use std::num::NonZeroUsize;
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use nix::errno::Errno;
use nix::sys::mman::MapFlags;
use nix::sys::mman::ProtFlags;
use nix::sys::mman::mmap as nix_mmap;
use nix::sys::mman::munmap as nix_munmap;
use nix::unistd::ftruncate as nix_ftruncate;
/// ShmemHandle represents a shared memory area that can be shared by processes over fork().
/// Unlike shared memory allocated by Postgres, this area is resizable, up to 'max_size' that's
/// specified at creation.
///
/// The area is backed by an anonymous file created with memfd_create(). The full address space for
/// 'max_size' is reserved up-front with mmap(), but whenever you call [`ShmemHandle::set_size`],
/// the underlying file is resized. Do not access the area beyond the current size. Currently, that
/// will cause the file to be expanded, but we might use mprotect() etc. to enforce that in the
/// future.
pub struct ShmemHandle {
/// memfd file descriptor
fd: OwnedFd,
max_size: usize,
// Pointer to the beginning of the shared memory area. The header is stored there.
shared_ptr: NonNull<SharedStruct>,
// Pointer to the beginning of the user data
pub data_ptr: NonNull<u8>,
}
/// This is stored at the beginning in the shared memory area.
struct SharedStruct {
max_size: usize,
/// Current size of the backing file. The high-order bit is used for the RESIZE_IN_PROGRESS flag
current_size: AtomicUsize,
}
const RESIZE_IN_PROGRESS: usize = 1 << 63;
const HEADER_SIZE: usize = std::mem::size_of::<SharedStruct>();
/// Error type returned by the ShmemHandle functions.
#[derive(thiserror::Error, Debug)]
#[error("{msg}: {errno}")]
pub struct Error {
pub msg: String,
pub errno: Errno,
}
impl Error {
fn new(msg: &str, errno: Errno) -> Error {
Error {
msg: msg.to_string(),
errno,
}
}
}
impl ShmemHandle {
/// Create a new shared memory area. To communicate between processes, the processes need to be
/// fork()'d after calling this, so that the ShmemHandle is inherited by all processes.
///
/// If the ShmemHandle is dropped, the memory is unmapped from the current process. Other
/// processes can continue using it, however.
pub fn new(name: &str, initial_size: usize, max_size: usize) -> Result<ShmemHandle, Error> {
// create the backing anonymous file.
let fd = create_backing_file(name)?;
Self::new_with_fd(fd, initial_size, max_size)
}
fn new_with_fd(
fd: OwnedFd,
initial_size: usize,
max_size: usize,
) -> Result<ShmemHandle, Error> {
// We reserve the high-order bit for the RESIZE_IN_PROGRESS flag, and the actual size
// is a little larger than this because of the SharedStruct header. Make the upper limit
// somewhat smaller than that, because with anything close to that, you'll run out of
// memory anyway.
if max_size >= 1 << 48 {
panic!("max size {} too large", max_size);
}
if initial_size > max_size {
panic!("initial size {initial_size} larger than max size {max_size}");
}
// The actual initial / max size is the one given by the caller, plus the size of
// 'SharedStruct'.
let initial_size = HEADER_SIZE + initial_size;
let max_size = NonZeroUsize::new(HEADER_SIZE + max_size).unwrap();
// Reserve address space for it with mmap
//
// TODO: Use MAP_HUGETLB if possible
let start_ptr = unsafe {
nix_mmap(
None,
max_size,
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
MapFlags::MAP_SHARED,
&fd,
0,
)
}
.map_err(|e| Error::new("mmap failed: {e}", e))?;
// Reserve space for the initial size
enlarge_file(fd.as_fd(), initial_size as u64)?;
// Initialize the header
let shared: NonNull<SharedStruct> = start_ptr.cast();
unsafe {
shared.write(SharedStruct {
max_size: max_size.into(),
current_size: AtomicUsize::new(initial_size),
})
};
// The user data begins after the header
let data_ptr = unsafe { start_ptr.cast().add(HEADER_SIZE) };
Ok(ShmemHandle {
fd,
max_size: max_size.into(),
shared_ptr: shared,
data_ptr,
})
}
// return reference to the header
fn shared(&self) -> &SharedStruct {
unsafe { self.shared_ptr.as_ref() }
}
/// Resize the shared memory area. 'new_size' must not be larger than the 'max_size' specified
/// when creating the area.
///
/// This may only be called from one process/thread concurrently. We detect that case
/// and return an Error.
pub fn set_size(&self, new_size: usize) -> Result<(), Error> {
let new_size = new_size + HEADER_SIZE;
let shared = self.shared();
if new_size > self.max_size {
panic!(
"new size ({} is greater than max size ({})",
new_size, self.max_size
);
}
assert_eq!(self.max_size, shared.max_size);
// Lock the area by setting the bit in 'current_size'
//
// Ordering::Relaxed would probably be sufficient here, as we don't access any other memory
// and the posix_fallocate/ftruncate call is surely a synchronization point anyway. But
// since this is not performance-critical, better safe than sorry .
let mut old_size = shared.current_size.load(Ordering::Acquire);
loop {
if (old_size & RESIZE_IN_PROGRESS) != 0 {
return Err(Error::new(
"concurrent resize detected",
Errno::UnknownErrno,
));
}
match shared.current_size.compare_exchange(
old_size,
new_size,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(x) => old_size = x,
}
}
// Ok, we got the lock.
//
// NB: If anything goes wrong, we *must* clear the bit!
let result = {
use std::cmp::Ordering::{Equal, Greater, Less};
match new_size.cmp(&old_size) {
Less => nix_ftruncate(&self.fd, new_size as i64).map_err(|e| {
Error::new("could not shrink shmem segment, ftruncate failed: {e}", e)
}),
Equal => Ok(()),
Greater => enlarge_file(self.fd.as_fd(), new_size as u64),
}
};
// Unlock
shared.current_size.store(
if result.is_ok() { new_size } else { old_size },
Ordering::Release,
);
result
}
/// Returns the current user-visible size of the shared memory segment.
///
/// NOTE: a concurrent set_size() call can change the size at any time. It is the caller's
/// responsibility not to access the area beyond the current size.
pub fn current_size(&self) -> usize {
let total_current_size =
self.shared().current_size.load(Ordering::Relaxed) & !RESIZE_IN_PROGRESS;
total_current_size - HEADER_SIZE
}
}
impl Drop for ShmemHandle {
fn drop(&mut self) {
// SAFETY: The pointer was obtained from mmap() with the given size.
// We unmap the entire region.
let _ = unsafe { nix_munmap(self.shared_ptr.cast(), self.max_size) };
// The fd is dropped automatically by OwnedFd.
}
}
/// Create a "backing file" for the shared memory area. On Linux, use memfd_create(), to create an
/// anonymous in-memory file. One macos, fall back to a regular file. That's good enough for
/// development and testing, but in production we want the file to stay in memory.
///
/// disable 'unused_variables' warnings, because in the macos path, 'name' is unused.
#[allow(unused_variables)]
fn create_backing_file(name: &str) -> Result<OwnedFd, Error> {
#[cfg(not(target_os = "macos"))]
{
nix::sys::memfd::memfd_create(name, nix::sys::memfd::MFdFlags::empty())
.map_err(|e| Error::new("memfd_create failed: {e}", e))
}
#[cfg(target_os = "macos")]
{
let file = tempfile::tempfile().map_err(|e| {
Error::new(
"could not create temporary file to back shmem area: {e}",
nix::errno::Errno::from_raw(e.raw_os_error().unwrap_or(0)),
)
})?;
Ok(OwnedFd::from(file))
}
}
fn enlarge_file(fd: BorrowedFd, size: u64) -> Result<(), Error> {
// Use posix_fallocate() to enlarge the file. It reserves the space correctly, so that
// we don't get a segfault later when trying to actually use it.
#[cfg(not(target_os = "macos"))]
{
nix::fcntl::posix_fallocate(fd, 0, size as i64).map_err(|e| {
Error::new(
"could not grow shmem segment, posix_fallocate failed: {e}",
e,
)
})
}
// As a fallback on macos, which doesn't have posix_fallocate, use plain 'fallocate'
#[cfg(target_os = "macos")]
{
nix::unistd::ftruncate(fd, size as i64)
.map_err(|e| Error::new("could not grow shmem segment, ftruncate failed: {e}", e))
}
}
#[cfg(test)]
mod tests {
use super::*;
use nix::unistd::ForkResult;
use std::ops::Range;
/// check that all bytes in given range have the expected value.
fn assert_range(ptr: *const u8, expected: u8, range: Range<usize>) {
for i in range {
let b = unsafe { *(ptr.add(i)) };
assert_eq!(expected, b, "unexpected byte at offset {}", i);
}
}
/// Write 'b' to all bytes in the given range
fn write_range(ptr: *mut u8, b: u8, range: Range<usize>) {
unsafe { std::ptr::write_bytes(ptr.add(range.start), b, range.end - range.start) };
}
// simple single-process test of growing and shrinking
#[test]
fn test_shmem_resize() -> Result<(), Error> {
let max_size = 1024 * 1024;
let init_struct = ShmemHandle::new("test_shmem_resize", 0, max_size)?;
assert_eq!(init_struct.current_size(), 0);
// Initial grow
let size1 = 10000;
init_struct.set_size(size1).unwrap();
assert_eq!(init_struct.current_size(), size1);
// Write some data
let data_ptr = init_struct.data_ptr.as_ptr();
write_range(data_ptr, 0xAA, 0..size1);
assert_range(data_ptr, 0xAA, 0..size1);
// Shrink
let size2 = 5000;
init_struct.set_size(size2).unwrap();
assert_eq!(init_struct.current_size(), size2);
// Grow again
let size3 = 20000;
init_struct.set_size(size3).unwrap();
assert_eq!(init_struct.current_size(), size3);
// Try to read it. The area that was shrunk and grown again should read as all zeros now
assert_range(data_ptr, 0xAA, 0..5000);
assert_range(data_ptr, 0, 5000..size1);
// Try to grow beyond max_size
//let size4 = max_size + 1;
//assert!(init_struct.set_size(size4).is_err());
// Dropping init_struct should unmap the memory
drop(init_struct);
Ok(())
}
/// This is used in tests to coordinate between test processes. It's like std::sync::Barrier,
/// but is stored in the shared memory area and works across processes. It's implemented by
/// polling, because e.g. standard rust mutexes are not guaranteed to work across processes.
struct SimpleBarrier {
num_procs: usize,
count: AtomicUsize,
}
impl SimpleBarrier {
unsafe fn init(ptr: *mut SimpleBarrier, num_procs: usize) {
unsafe {
*ptr = SimpleBarrier {
num_procs,
count: AtomicUsize::new(0),
}
}
}
pub fn wait(&self) {
let old = self.count.fetch_add(1, Ordering::Relaxed);
let generation = old / self.num_procs;
let mut current = old + 1;
while current < (generation + 1) * self.num_procs {
std::thread::sleep(std::time::Duration::from_millis(10));
current = self.count.load(Ordering::Relaxed);
}
}
}
#[test]
fn test_multi_process() {
// Initialize
let max_size = 1_000_000_000_000;
let init_struct = ShmemHandle::new("test_multi_process", 0, max_size).unwrap();
let ptr = init_struct.data_ptr.as_ptr();
// Store the SimpleBarrier in the first 1k of the area.
init_struct.set_size(10000).unwrap();
let barrier_ptr: *mut SimpleBarrier = unsafe {
ptr.add(ptr.align_offset(std::mem::align_of::<SimpleBarrier>()))
.cast()
};
unsafe { SimpleBarrier::init(barrier_ptr, 2) };
let barrier = unsafe { barrier_ptr.as_ref().unwrap() };
// Fork another test process. The code after this runs in both processes concurrently.
let fork_result = unsafe { nix::unistd::fork().unwrap() };
// In the parent, fill bytes between 1000..2000. In the child, between 2000..3000
if fork_result.is_parent() {
write_range(ptr, 0xAA, 1000..2000);
} else {
write_range(ptr, 0xBB, 2000..3000);
}
barrier.wait();
// Verify the contents. (in both processes)
assert_range(ptr, 0xAA, 1000..2000);
assert_range(ptr, 0xBB, 2000..3000);
// Grow, from the child this time
let size = 10_000_000;
if !fork_result.is_parent() {
init_struct.set_size(size).unwrap();
}
barrier.wait();
// make some writes at the end
if fork_result.is_parent() {
write_range(ptr, 0xAA, (size - 10)..size);
} else {
write_range(ptr, 0xBB, (size - 20)..(size - 10));
}
barrier.wait();
// Verify the contents. (This runs in both processes)
assert_range(ptr, 0, (size - 1000)..(size - 20));
assert_range(ptr, 0xBB, (size - 20)..(size - 10));
assert_range(ptr, 0xAA, (size - 10)..size);
if let ForkResult::Parent { child } = fork_result {
nix::sys::wait::waitpid(child, None).unwrap();
}
}
}

View File

@@ -235,7 +235,7 @@ pub enum PageServiceProtocolPipelinedBatchingStrategy {
ScatteredLsn,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case")]
pub enum GetVectoredConcurrentIo {
/// The read path is fully sequential: layers are visited

View File

@@ -0,0 +1,13 @@
[package]
name = "pageserver_page_api"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
prost.workspace = true
tonic.workspace = true
workspace_hack.workspace = true
[build-dependencies]
tonic-build.workspace = true

View File

@@ -0,0 +1,13 @@
use std::env;
use std::path::PathBuf;
/// Generates Rust code from .proto Protobuf schemas, along with a binary file
/// descriptor set for Protobuf schema reflection.
fn main() -> Result<(), Box<dyn std::error::Error>> {
let out_dir = PathBuf::from(env::var("OUT_DIR")?);
tonic_build::configure()
.bytes(["."])
.file_descriptor_set_path(out_dir.join("page_api_descriptor.bin"))
.compile_protos(&["proto/page_service.proto"], &["proto"])
.map_err(|err| err.into())
}

View File

@@ -0,0 +1,233 @@
// Page service, presented by pageservers for computes.
//
// This is the compute read path. It primarily serves page versions at given
// LSNs, but also base backups, SLRU segments, and relation metadata.
//
// EXPERIMENTAL: this is still under development and subject to change.
//
// Request metadata headers:
// - authorization: JWT token ("Bearer <token>"), if auth is enabled
// - neon-tenant-id: tenant ID ("7c4a1f9e3bd6470c8f3e21a65bd2e980")
// - neon-shard-id: shard ID, as <number><count> in hex ("0b10" = shard 11 of 16, 0-based)
// - neon-timeline-id: timeline ID ("f08c4e9a2d5f76b1e3a7c2d8910f4b3e")
//
// The service can be accessed via e.g. grpcurl:
//
// ```
// grpcurl \
// -plaintext \
// -H "neon-tenant-id: 7c4a1f9e3bd6470c8f3e21a65bd2e980" \
// -H "neon-shard-id: 0b10" \
// -H "neon-timeline-id: f08c4e9a2d5f76b1e3a7c2d8910f4b3e" \
// -H "authorization: Bearer $JWT" \
// -d '{"read_lsn": {"request_lsn": 1234567890}, "rel": {"spc_oid": 1663, "db_oid": 1234, "rel_number": 5678, "fork_number": 0}}'
// localhost:51051 page_api.PageService/CheckRelExists
// ```
//
// TODO: consider adding neon-compute-mode ("primary", "static", "replica").
// However, this will require reconnecting when changing modes.
//
// TODO: write implementation guidance on
// - Health checks
// - Tracing, OpenTelemetry
// - Compression
syntax = "proto3";
package page_api;
service PageService {
// Returns whether a relation exists.
rpc CheckRelExists(CheckRelExistsRequest) returns (CheckRelExistsResponse);
// Fetches a base backup.
rpc GetBaseBackup (GetBaseBackupRequest) returns (stream GetBaseBackupResponseChunk);
// Returns the total size of a database, as # of bytes.
rpc GetDbSize (GetDbSizeRequest) returns (GetDbSizeResponse);
// Fetches pages.
//
// This is implemented as a bidirectional streaming RPC for performance. Unary
// requests incur costs for e.g. HTTP/2 stream setup, header parsing,
// authentication, and so on -- with streaming, we only pay these costs during
// the initial stream setup. This ~doubles throughput in benchmarks. Other
// RPCs use regular unary requests, since they are not as frequent and
// performance-critical, and this simplifies implementation.
//
// NB: a status response (e.g. errors) will terminate the stream. The stream
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
// Most errors are therefore sent as GetPageResponse.status instead.
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
// Returns the size of a relation, as # of blocks.
rpc GetRelSize (GetRelSizeRequest) returns (GetRelSizeResponse);
// Fetches an SLRU segment.
rpc GetSlruSegment (GetSlruSegmentRequest) returns (GetSlruSegmentResponse);
}
// The LSN a request should read at.
message ReadLsn {
// The request's read LSN. Required.
uint64 request_lsn = 1;
// If given, the caller guarantees that the page has not been modified since
// this LSN. Must be smaller than or equal to request_lsn. This allows the
// Pageserver to serve an old page without waiting for the request LSN to
// arrive. Valid for all request types.
//
// It is undefined behaviour to make a request such that the page was, in
// fact, modified between request_lsn and not_modified_since_lsn. The
// Pageserver might detect it and return an error, or it might return the old
// page version or the new page version. Setting not_modified_since_lsn equal
// to request_lsn is always safe, but can lead to unnecessary waiting.
uint64 not_modified_since_lsn = 2;
}
// A relation identifier.
message RelTag {
uint32 spc_oid = 1;
uint32 db_oid = 2;
uint32 rel_number = 3;
uint32 fork_number = 4;
}
// Checks whether a relation exists, at the given LSN. Only valid on shard 0,
// other shards will error.
message CheckRelExistsRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message CheckRelExistsResponse {
bool exists = 1;
}
// Requests a base backup at a given LSN.
message GetBaseBackupRequest {
// The LSN to fetch a base backup at.
ReadLsn read_lsn = 1;
// If true, logical replication slots will not be created.
bool replica = 2;
}
// Base backup response chunk, returned as an ordered stream.
message GetBaseBackupResponseChunk {
// A basebackup data chunk. The size is undefined, but bounded by the 4 MB
// gRPC message size limit.
bytes chunk = 1;
}
// Requests the size of a database, as # of bytes. Only valid on shard 0, other
// shards will error.
message GetDbSizeRequest {
ReadLsn read_lsn = 1;
uint32 db_oid = 2;
}
message GetDbSizeResponse {
uint64 num_bytes = 1;
}
// Requests one or more pages.
message GetPageRequest {
// A request ID. Will be included in the response. Should be unique for
// in-flight requests on the stream.
uint64 request_id = 1;
// The request class.
GetPageClass request_class = 2;
// The LSN to read at.
ReadLsn read_lsn = 3;
// The relation to read from.
RelTag rel = 4;
// Page numbers to read. Must belong to the remote shard.
//
// Multiple pages will be executed as a single batch by the Pageserver,
// amortizing layer access costs and parallelizing them. This may increase the
// latency of any individual request, but improves the overall latency and
// throughput of the batch as a whole.
//
// TODO: this causes an allocation in the common single-block case. The sender
// can use a SmallVec to stack-allocate it, but Prost will always deserialize
// into a heap-allocated Vec. Consider optimizing this.
//
// TODO: we might be able to avoid a sort or something if we mandate that these
// are always in order. But we can't currenly rely on this on the server, because
// of compatibility with the libpq protocol handler.
repeated uint32 block_number = 5;
}
// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
// Unknown class. For forwards compatibility: used when the client sends a
// class that the server doesn't know about.
GET_PAGE_CLASS_UNKNOWN = 0;
// A normal request. This is the default.
GET_PAGE_CLASS_NORMAL = 1;
// A prefetch request. NB: can only be classified on pg < 18.
GET_PAGE_CLASS_PREFETCH = 2;
// A background request (e.g. vacuum).
GET_PAGE_CLASS_BACKGROUND = 3;
}
// A GetPage response.
//
// A batch response will contain all of the requested pages. We could eagerly
// emit individual pages as soon as they are ready, but on a readv() Postgres
// holds buffer pool locks on all pages in the batch and we'll only return once
// the entire batch is ready, so no one can make use of the individual pages.
message GetPageResponse {
// The original request's ID.
uint64 request_id = 1;
// The response status code.
GetPageStatus status = 2;
// A string describing the status, if any.
string reason = 3;
// The 8KB page images, in the same order as the request. Empty if status != OK.
repeated bytes page_image = 4;
}
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
// want to send errors as gRPC statuses, since this would terminate the stream.
enum GetPageStatus {
// Unknown status. For forwards compatibility: used when the server sends a
// status code that the client doesn't know about.
GET_PAGE_STATUS_UNKNOWN = 0;
// The request was successful.
GET_PAGE_STATUS_OK = 1;
// The page did not exist. The tenant/timeline/shard has already been
// validated during stream setup.
GET_PAGE_STATUS_NOT_FOUND = 2;
// The request was invalid.
GET_PAGE_STATUS_INVALID = 3;
// The tenant is rate limited. Slow down and retry later.
GET_PAGE_STATUS_SLOW_DOWN = 4;
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
// layer download. This could free up the server task to process other
// requests while the layer download is in progress.
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on
// shard 0, other shards will error.
message GetRelSizeRequest {
ReadLsn read_lsn = 1;
RelTag rel = 2;
}
message GetRelSizeResponse {
uint32 num_blocks = 1;
}
// Requests an SLRU segment. Only valid on shard 0, other shards will error.
message GetSlruSegmentRequest {
ReadLsn read_lsn = 1;
uint32 kind = 2;
uint32 segno = 3;
}
// Returns an SLRU segment.
//
// These are up 32 pages (256 KB), so we can send them as a single response.
message GetSlruSegmentResponse {
bytes segment = 1;
}

View File

@@ -0,0 +1,19 @@
//! This crate provides the Pageserver's page API. It contains:
//!
//! * proto/page_service.proto: the Protobuf schema for the page API.
//! * proto: auto-generated Protobuf types for gRPC.
//!
//! This crate is used by both the client and the server. Try to keep it slim.
// Code generated by protobuf.
pub mod proto {
tonic::include_proto!("page_api");
/// File descriptor set for Protobuf schema reflection. This allows using
/// e.g. grpcurl with the API.
pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("page_api_descriptor");
pub use page_service_client::PageServiceClient;
pub use page_service_server::{PageService, PageServiceServer};
}

View File

@@ -144,7 +144,7 @@ where
replica,
ctx,
io_concurrency: IoConcurrency::spawn_from_conf(
timeline.conf,
timeline.conf.get_vectored_concurrent_io,
timeline
.gate
.enter()

View File

@@ -3199,7 +3199,7 @@ async fn list_aux_files(
.await?;
let io_concurrency = IoConcurrency::spawn_from_conf(
state.conf,
state.conf.get_vectored_concurrent_io,
timeline.gate.enter().map_err(|_| ApiError::Cancelled)?,
);

View File

@@ -18,7 +18,7 @@ use itertools::Itertools;
use jsonwebtoken::TokenData;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
PageServiceProtocolPipelinedBatchingStrategy, PageServiceProtocolPipelinedExecutionStrategy,
};
use pageserver_api::key::rel_block_to_key;
@@ -331,10 +331,10 @@ async fn page_service_conn_main(
// But it's in a shared crate, so, we store connection_ctx inside PageServerHandler
// and create the per-query context in process_query ourselves.
let mut conn_handler = PageServerHandler::new(
conf,
tenant_manager,
auth,
pipelining_config,
conf.get_vectored_concurrent_io,
perf_span_fields,
connection_ctx,
cancel.clone(),
@@ -371,7 +371,6 @@ async fn page_service_conn_main(
}
struct PageServerHandler {
conf: &'static PageServerConf,
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
@@ -389,6 +388,7 @@ struct PageServerHandler {
timeline_handles: Option<TimelineHandles>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
gate_guard: GateGuard,
}
@@ -844,17 +844,16 @@ impl BatchedFeMessage {
impl PageServerHandler {
#[allow(clippy::too_many_arguments)]
pub fn new(
conf: &'static PageServerConf,
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
pipelining_config: PageServicePipeliningConfig,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
perf_span_fields: ConnectionPerfSpanFields,
connection_ctx: RequestContext,
cancel: CancellationToken,
gate_guard: GateGuard,
) -> Self {
PageServerHandler {
conf,
auth,
claims: None,
connection_ctx,
@@ -862,6 +861,7 @@ impl PageServerHandler {
timeline_handles: Some(TimelineHandles::new(tenant_manager)),
cancel,
pipelining_config,
get_vectored_concurrent_io,
gate_guard,
}
}
@@ -1278,7 +1278,7 @@ impl PageServerHandler {
}
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn pagesteam_handle_batched_message<IO>(
async fn pagestream_handle_batched_message<IO>(
&mut self,
pgb_writer: &mut PostgresBackend<IO>,
batch: BatchedFeMessage,
@@ -1623,7 +1623,7 @@ impl PageServerHandler {
}
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.get_vectored_concurrent_io,
match self.gate_guard.try_clone() {
Ok(guard) => guard,
Err(_) => {
@@ -1733,7 +1733,7 @@ impl PageServerHandler {
};
let result = self
.pagesteam_handle_batched_message(
.pagestream_handle_batched_message(
pgb_writer,
msg,
io_concurrency.clone(),
@@ -1909,7 +1909,7 @@ impl PageServerHandler {
return Err(e);
}
};
self.pagesteam_handle_batched_message(
self.pagestream_handle_batched_message(
pgb_writer,
batch,
io_concurrency.clone(),

View File

@@ -586,7 +586,7 @@ impl Timeline {
// scan directory listing (new), merge with the old results
let key_range = rel_tag_sparse_key_range(spcnode, dbnode);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -645,7 +645,7 @@ impl Timeline {
);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,
@@ -885,7 +885,7 @@ impl Timeline {
);
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| PageReconstructError::Cancelled)?,

View File

@@ -8596,8 +8596,10 @@ mod tests {
lsn: Lsn,
ctx: &RequestContext,
) -> Result<Option<Bytes>, GetVectoredError> {
let io_concurrency =
IoConcurrency::spawn_from_conf(tline.conf, tline.gate.enter().unwrap());
let io_concurrency = IoConcurrency::spawn_from_conf(
tline.conf.get_vectored_concurrent_io,
tline.gate.enter().unwrap(),
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);
let query = VersionedKeySpaceQuery::uniform(KeySpace::single(key..key.next()), lsn);
let mut res = tline

View File

@@ -31,6 +31,7 @@ pub use inmemory_layer::InMemoryLayer;
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
use pageserver_api::config::GetVectoredConcurrentIo;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::record::NeonWalRecord;
@@ -43,7 +44,6 @@ use self::inmemory_layer::InMemoryLayerFileId;
use super::PageReconstructError;
use super::layer_map::InMemoryLayerDesc;
use super::timeline::{GetVectoredError, ReadPath};
use crate::config::PageServerConf;
use crate::context::{
AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
@@ -318,11 +318,10 @@ impl IoConcurrency {
}
pub(crate) fn spawn_from_conf(
conf: &'static PageServerConf,
conf: GetVectoredConcurrentIo,
gate_guard: GateGuard,
) -> IoConcurrency {
use pageserver_api::config::GetVectoredConcurrentIo;
let selected = match conf.get_vectored_concurrent_io {
let selected = match conf {
GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
};

View File

@@ -3530,7 +3530,7 @@ impl Timeline {
};
let io_concurrency = IoConcurrency::spawn_from_conf(
self_ref.conf,
self_ref.conf.get_vectored_concurrent_io,
self_ref
.gate
.enter()
@@ -5559,7 +5559,7 @@ impl Timeline {
});
let io_concurrency = IoConcurrency::spawn_from_conf(
self.conf,
self.conf.get_vectored_concurrent_io,
self.gate
.enter()
.map_err(|_| CreateImageLayersError::Cancelled)?,

View File

@@ -188,7 +188,7 @@ pub(crate) async fn generate_tombstone_image_layer(
"removing non-inherited keys by writing an image layer with tombstones at the detach LSN"
);
let io_concurrency = IoConcurrency::spawn_from_conf(
detached.conf,
detached.conf.get_vectored_concurrent_io,
detached.gate.enter().map_err(|_| Error::ShuttingDown)?,
);
let mut reconstruct_state = ValuesReconstructState::new(io_concurrency);

View File

@@ -12,6 +12,7 @@
#include "postgres.h"
#include <sys/file.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
@@ -52,6 +53,10 @@
#include "pagestore_client.h"
#include "communicator.h"
/* For the kernel module */
#include "neon_pagecache.h"
#define CLOCKCACHE_DEV_PATH "/dev/clockcache_dev"
#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "LFC: assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0)
/*
@@ -159,6 +164,13 @@ typedef struct FileCacheControl
uint64 time_write; /* time spent writing (us) */
uint64 resizes; /* number of LFC resizes */
uint64 evicted_pages; /* number of evicted pages */
/* FIXME: should make these atomic, they're not protected by any locks */
uint64 kernel_module_read_hits; /* success returns from read ioctl */
uint64 kernel_module_read_misses; /* ENOENT returns from read ioctl */
uint64 kernel_module_write_hits; /* success returns from write ioctl */
uint64 kernel_module_write_misses; /* ENOMEM returns from write ioctl */
dlist_head lru; /* double linked list for LRU replacement
* algorithm */
dlist_head holes; /* double linked list of punched holes */
@@ -183,6 +195,7 @@ typedef struct FileCacheControl
static HTAB *lfc_hash;
static int lfc_desc = -1;
static LWLockId lfc_lock;
static int lfc_max_size;
static int lfc_size_limit;
static int lfc_prewarm_limit;
@@ -190,6 +203,8 @@ static int lfc_prewarm_batch;
static int lfc_chunk_size_log = MAX_BLOCKS_PER_CHUNK_LOG;
static int lfc_blocks_per_chunk = MAX_BLOCKS_PER_CHUNK;
static char *lfc_path;
static bool lfc_use_kernel_module;
static uint64 lfc_generation;
static FileCacheControl *lfc_ctl;
static bool lfc_do_prewarm;
@@ -203,6 +218,9 @@ bool lfc_prewarm_update_ws_estimation;
#define LFC_ENABLED() (lfc_ctl->limit != 0)
static int pread_with_ioctl(void *buffer, uint64 blkno);
static int pwrite_with_ioctl(const void *buffer, uint64 blkno);
/*
* Close LFC file if opened.
* All backends should close their LFC files once LFC is disabled.
@@ -251,14 +269,19 @@ lfc_switch_off(void)
/*
* We need to use unlink to to avoid races in LFC write, because it is not
* protected by lock
*
* FIXME: how to clean up the kernel module device on trouble?
*/
unlink(lfc_path);
if (!lfc_use_kernel_module)
{
unlink(lfc_path);
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
elog(WARNING, "LFC: failed to recreate local file cache %s: %m", lfc_path);
else
close(fd);
}
/* Wakeup waiting backends */
for (int i = 0; i < N_COND_VARS; i++)
@@ -270,7 +293,8 @@ lfc_switch_off(void)
static void
lfc_disable(char const *op)
{
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
elog(WARNING, "LFC: failed to %s local file cache at %s: %m, disabling local file cache",
op, lfc_use_kernel_module ? CLOCKCACHE_DEV_PATH : lfc_path);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
lfc_switch_off();
@@ -301,7 +325,9 @@ lfc_ensure_opened(void)
/* Open cache file if not done yet */
if (lfc_desc < 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
lfc_desc = BasicOpenFile(
lfc_use_kernel_module ? CLOCKCACHE_DEV_PATH : lfc_path,
O_RDWR);
if (lfc_desc < 0)
{
@@ -351,10 +377,16 @@ lfc_shmem_startup(void)
initSHLL(&lfc_ctl->wss_estimation);
/* Recreate file cache on restart */
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (lfc_use_kernel_module)
fd = BasicOpenFile(CLOCKCACHE_DEV_PATH, O_RDWR);
else
fd = BasicOpenFile(lfc_path, O_RDWR | O_CREAT | O_TRUNC);
if (fd < 0)
{
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
if (lfc_use_kernel_module)
elog(WARNING, "LFC: failed to open " CLOCKCACHE_DEV_PATH ": %m");
else
elog(WARNING, "LFC: failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
@@ -613,6 +645,15 @@ lfc_init(void)
NULL,
NULL);
DefineCustomBoolVariable("neon.use_kernel_module",
"Use neon_pagecache kernel module instead of a regular file (EXPERIMENTAL)",
NULL,
&lfc_use_kernel_module,
true,
PGC_POSTMASTER,
0,
NULL, NULL, NULL);
if (lfc_max_size == 0)
return;
@@ -936,6 +977,44 @@ lfc_prewarm_main(Datum main_arg)
lfc_ctl->prewarm_workers[worker_id].completed = GetCurrentTimestamp();
}
void
lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks)
{
BufferTag tag;
FileCacheEntry *entry;
uint32 hash;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
tag.forkNum = forkNum;
CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (LFC_ENABLED())
{
for (BlockNumber blkno = 0; blkno < nblocks; blkno += lfc_blocks_per_chunk)
{
tag.blockNum = blkno;
hash = get_hash_value(lfc_hash, &tag);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry != NULL)
{
for (int i = 0; i < lfc_blocks_per_chunk; i++)
{
if (GET_STATE(entry, i) == AVAILABLE)
{
lfc_ctl->used_pages -= 1;
SET_STATE(entry, i, UNAVAILABLE);
}
}
}
}
}
LWLockRelease(lfc_lock);
}
/*
* Check if page is present in the cache.
@@ -1259,27 +1338,57 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* offset of first IOV */
first_read_offset += chunk_offs + first_block_in_chunk_read;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
/* Read only the blocks we're interested in, limiting */
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
nwrite, first_read_offset * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * nwrite))
if (lfc_use_kernel_module)
{
lfc_disable("read");
return -1;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
if (!BITMAP_ISSET(chunk_mask, i))
continue;
Assert(iov[i].iov_len == BLCKSZ);
rc = pread_with_ioctl(iov[i].iov_base, first_read_offset + i - first_block_in_chunk_read);
if (rc < 0 && errno == ENOENT)
{
/* The kernel module evicted the page */
elog(DEBUG1, "kernel module had evicted block");
}
else if (rc < 0)
{
pgstat_report_wait_end();
lfc_disable("ioctl read");
return -1;
}
else
{
/* success! */
BITMAP_SET(mask, buf_offset + i);
}
}
pgstat_report_wait_end();
}
/*
* We successfully read the pages we know were valid when we
* started reading; now mark those pages as read
*/
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
else
{
if (BITMAP_ISSET(chunk_mask, i))
BITMAP_SET(mask, buf_offset + i);
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_READ);
rc = preadv(lfc_desc, &iov[first_block_in_chunk_read],
nwrite, first_read_offset * BLCKSZ);
pgstat_report_wait_end();
if (rc != (BLCKSZ * nwrite))
{
lfc_disable("read");
return -1;
}
/*
* We successfully read the pages we know were valid when we
* started reading; now mark those pages as read
*/
for (int i = first_block_in_chunk_read; i < iov_last_used; i++)
{
if (BITMAP_ISSET(chunk_mask, i))
BITMAP_SET(mask, buf_offset + i);
}
}
}
@@ -1326,6 +1435,65 @@ lfc_readv_select(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
return blocks_read;
}
static int
pread_with_ioctl(void *buffer, uint64 blkno)
{
struct neon_rw_args args = {
.key = {
.key_hi = 0,
.key_lo = blkno
},
.offset = 0,
.length = POSTGRES_PAGE_SIZE,
.buffer = (__u64)(uintptr_t) buffer
};
int rc;
errno = 0;
elog(LOG, "calling ioctl read for blk %lu with buffer=%p (shared_buffers is at %p-%p)",
blkno,
buffer,
BufferBlocks,
BufferBlocks + BLCKSZ * NBuffers);
rc = ioctl(lfc_desc, NEON_IOCTL_READ, &args);
if (rc >= 0)
lfc_ctl->kernel_module_read_hits++;
else if (rc < 0 && errno == ENOENT)
lfc_ctl->kernel_module_read_misses++;
else
elog(LOG, "ioctl read failed for blk %lu with buffer=%p: %m", blkno, buffer);
return rc;
}
static int
pwrite_with_ioctl(const void *buffer, uint64 blkno)
{
struct neon_rw_args args = {
.key = {
.key_hi = 0,
.key_lo = blkno
},
.offset = 0,
.length = POSTGRES_PAGE_SIZE,
.buffer = (__u64)(uintptr_t) buffer
};
int rc;
elog(LOG, "calling ioctl write for blk %lu with buffer=%p (shared_buffers is at %p-%p)",
blkno,
buffer,
BufferBlocks,
BufferBlocks + BLCKSZ * NBuffers);
rc = ioctl(lfc_desc, NEON_IOCTL_WRITE, &args);
if (rc >= 0)
lfc_ctl->kernel_module_write_hits++;
else if (rc < 0 && errno == ENOMEM)
lfc_ctl->kernel_module_write_misses++;
return rc;
}
/*
* Initialize new LFC hash entry, perform eviction if needed.
* Returns false if there are no unpinned entries and chunk can not be added.
@@ -1446,7 +1614,6 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
{
BufferTag tag;
FileCacheEntry *entry;
ssize_t rc;
bool found;
uint32 hash;
uint64 generation;
@@ -1455,6 +1622,7 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
ConditionVariable* cv;
FileCacheBlockState state;
XLogRecPtr lwlsn;
bool success;
int chunk_offs = BLOCK_TO_CHUNK_OFF(blkno);
@@ -1533,16 +1701,60 @@ lfc_prefetch(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno,
LWLockRelease(lfc_lock);
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwrite(lfc_desc, buffer, BLCKSZ,
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
if (rc != BLCKSZ)
if (lfc_use_kernel_module)
{
lfc_disable("write");
int rc;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwrite_with_ioctl(buffer,
entry_offset * lfc_blocks_per_chunk + chunk_offs);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
if (rc < 0 && errno == ENOMEM)
{
/*
* Write was wasted.
*
* FIXME: We could mark the page in the chunk as UNAVAILABLE,
* since we know it was not actually present in the kernel
* cache. Any subsequent read on it will inevitably fail with
* ENOENT. That's not a correctness issue however, assuming that
* the call never returns ENOMEM when the old version of the page
* is still in the cache.
*/
success = true;
}
else if (rc < 0)
{
success = false;
}
else
{
/* successful write */
success = true;
}
}
else
{
ssize_t rc;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwrite(lfc_desc, buffer, BLCKSZ,
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
success = (rc == BLCKSZ);
}
if (!success)
{
lfc_disable(lfc_use_kernel_module ? "write ioctl" : "write");
}
else
{
@@ -1718,19 +1930,60 @@ lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
}
LWLockRelease(lfc_lock);
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
if (rc != BLCKSZ * blocks_in_chunk)
/* Perform the write */
if (lfc_use_kernel_module)
{
lfc_disable("write");
return;
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
for (int i = 0; i < blocks_in_chunk; i++)
{
int rc;
rc = pwrite_with_ioctl(
iov[i].iov_base,
entry_offset * lfc_blocks_per_chunk + chunk_offs
);
if (rc < 0 && errno == ENOMEM)
{
/*
* Write was wasted.
*
* FIXME: We could mark the page in the chunk as UNAVAILABLE,
* since we know it was not actually present in the kernel
* cache. Any subsequent read on it will inevitably fail with
* ENOENT. That's not a correctness issue however, assuming that
* the call never returns ENOMEM when the old version of the page
* is still in the cache.
*/
}
else if (rc < 0)
{
/* other error, not expected */
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
lfc_disable("write ioctl");
return;
}
}
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
}
else
{
pgstat_report_wait_start(WAIT_EVENT_NEON_LFC_WRITE);
INSTR_TIME_SET_CURRENT(io_start);
rc = pwritev(lfc_desc, iov, blocks_in_chunk,
((off_t) entry_offset * lfc_blocks_per_chunk + chunk_offs) * BLCKSZ);
INSTR_TIME_SET_CURRENT(io_end);
pgstat_report_wait_end();
if (rc != BLCKSZ * blocks_in_chunk)
{
lfc_disable("write");
return;
}
}
/* success */
{
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
@@ -1884,6 +2137,26 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS)
if (lfc_ctl)
value = lfc_ctl->pinned;
break;
case 10:
key = "file_cache_kernel_module_read_hits";
if (lfc_ctl)
value = lfc_ctl->kernel_module_read_hits;
break;
case 11:
key = "file_cache_kernel_module_read_misses";
if (lfc_ctl)
value = lfc_ctl->kernel_module_read_misses;
break;
case 12:
key = "file_cache_kernel_module_write_hits";
if (lfc_ctl)
value = lfc_ctl->kernel_module_write_hits;
break;
case 13:
key = "file_cache_kernel_module_write_misses";
if (lfc_ctl)
value = lfc_ctl->kernel_module_write_misses;
break;
default:
SRF_RETURN_DONE(funcctx);
}

View File

@@ -28,6 +28,7 @@ typedef struct FileCacheState
extern bool lfc_store_prefetch_result;
/* functions for local file cache */
extern void lfc_invalidate(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber nblocks);
extern void lfc_writev(NRelFileInfo rinfo, ForkNumber forkNum,
BlockNumber blkno, const void *const *buffers,
BlockNumber nblocks);

View File

@@ -0,0 +1,35 @@
/*
* This is for the special ioctl in the neon_pagecache kernel module.
*
* DO NOT MODIFY! This header must agree with what the kernel module was
* compiled with!
*/
#ifndef NEON_PAGECACHE_H
#define NEON_PAGECACHE_H
#include <linux/types.h>
#define POSTGRES_PAGE_SIZE 8192 // 8 KiB
struct neon_key {
__u64 key_hi; // Upper 64 bits of 128-bit key
__u64 key_lo; // Lower 64 bits of 128-bit key
};
struct neon_rw_args {
struct neon_key key;
__u32 offset; // Offset within page (0-8191)
__u32 length; // Length to read/write
__u64 buffer; // User buffer address
};
#define NEON_IOC_MAGIC 'N'
#define NEON_IOCTL_READ _IOWR(NEON_IOC_MAGIC, 1, struct neon_rw_args)
#define NEON_IOCTL_WRITE _IOWR(NEON_IOC_MAGIC, 2, struct neon_rw_args)
#endif /* NEON_PAGECACHE_H */

View File

@@ -86,7 +86,7 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define InvalidRelFileNumber InvalidOid
#define SMgrRelGetRelInfo(reln) \
#define SMgrRelGetRelInfo(reln) \
(reln->smgr_rnode.node)
#define DropRelationAllLocalBuffers DropRelFileNodeAllLocalBuffers
@@ -148,6 +148,12 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#define DropRelationAllLocalBuffers DropRelationAllLocalBuffers
#endif
#define NRelFileInfoInvalidate(rinfo) do { \
NInfoGetSpcOid(rinfo) = InvalidOid; \
NInfoGetDbOid(rinfo) = InvalidOid; \
NInfoGetRelNumber(rinfo) = InvalidRelFileNumber; \
} while (0)
#if PG_MAJORVERSION_NUM < 17
#define ProcNumber BackendId
#define INVALID_PROC_NUMBER InvalidBackendId

View File

@@ -108,7 +108,7 @@ typedef enum
UNLOGGED_BUILD_NOT_PERMANENT
} UnloggedBuildPhase;
static SMgrRelation unlogged_build_rel = NULL;
static NRelFileInfo unlogged_build_rel_info;
static UnloggedBuildPhase unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
static bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
@@ -912,16 +912,19 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
{
case 0:
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdextend(reln, forkNum, blkno, buffer, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdextend(reln, forkNum, blkno, buffer, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_write(InfoFromSMgrRel(reln), forkNum, blkno, buffer);
return;
default:
@@ -1003,21 +1006,19 @@ neon_zeroextend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blocknum,
{
case 0:
neon_log(ERROR, "cannot call smgrextend() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdzeroextend(reln, forkNum, blocknum, nblocks, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
{
for (int i = 0; i < nblocks; i++)
{
lfc_write(InfoFromSMgrRel(reln), forkNum, blocknum + i, buffer.data);
}
}
return;
default:
@@ -1387,8 +1388,14 @@ neon_read(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno, void *buffer
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdread(reln, forkNum, blkno, buffer);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1474,8 +1481,14 @@ neon_readv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
{
case 0:
neon_log(ERROR, "cannot call smgrread() on rel with unknown persistence");
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdreadv(reln, forknum, blocknum, buffers, nblocks);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1608,6 +1621,15 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
#if PG_MAJORVERSION_NUM >= 17
mdwritev(reln, forknum, blocknum, &buffer, 1, skipFsync);
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1617,9 +1639,6 @@ neon_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, const vo
#else
mdwrite(reln, forknum, blocknum, buffer, skipFsync);
#endif
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_write(InfoFromSMgrRel(reln), forknum, blocknum, buffer);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
@@ -1680,14 +1699,16 @@ neon_writev(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno,
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
return;
}
break;
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
mdwritev(reln, forknum, blkno, buffers, nblocks, skipFsync);
/* Update LFC in case of unlogged index build */
if (reln == unlogged_build_rel && unlogged_build_phase == UNLOGGED_BUILD_PHASE_2)
lfc_writev(InfoFromSMgrRel(reln), forknum, blkno, buffers, nblocks);
return;
default:
neon_log(ERROR, "unknown relpersistence '%c'", reln->smgr_relpersistence);
@@ -1723,6 +1744,10 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
return mdnblocks(reln, forknum);
}
break;
case RELPERSISTENCE_TEMP:
@@ -1792,6 +1817,11 @@ neon_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber old_blocks, Blo
break;
case RELPERSISTENCE_PERMANENT:
if (RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)))
{
mdtruncate(reln, forknum, old_blocks, nblocks);
return;
}
break;
case RELPERSISTENCE_TEMP:
@@ -1930,7 +1960,6 @@ neon_start_unlogged_build(SMgrRelation reln)
*/
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
neon_log(ERROR, "unlogged relation build is already in progress");
Assert(unlogged_build_rel == NULL);
ereport(SmgrTrace,
(errmsg(NEON_TAG "starting unlogged build of relation %u/%u/%u",
@@ -1947,7 +1976,7 @@ neon_start_unlogged_build(SMgrRelation reln)
case RELPERSISTENCE_TEMP:
case RELPERSISTENCE_UNLOGGED:
unlogged_build_rel = reln;
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_NOT_PERMANENT;
#ifdef DEBUG_COMPARE_LOCAL
if (!IsParallelWorker())
@@ -1968,12 +1997,9 @@ neon_start_unlogged_build(SMgrRelation reln)
neon_log(ERROR, "cannot perform unlogged index build, index is not empty ");
#endif
unlogged_build_rel = reln;
unlogged_build_rel_info = InfoFromSMgrRel(reln);
unlogged_build_phase = UNLOGGED_BUILD_PHASE_1;
/* Make the relation look like it's unlogged */
reln->smgr_relpersistence = RELPERSISTENCE_UNLOGGED;
/*
* Create the local file. In a parallel build, the leader is expected to
* call this first and do it.
@@ -2000,17 +2026,16 @@ neon_start_unlogged_build(SMgrRelation reln)
static void
neon_finish_unlogged_build_phase_1(SMgrRelation reln)
{
Assert(unlogged_build_rel == reln);
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
ereport(SmgrTrace,
(errmsg(NEON_TAG "finishing phase 1 of unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromSMgrRel(reln)))));
RelFileInfoFmt((unlogged_build_rel_info)))));
if (unlogged_build_phase == UNLOGGED_BUILD_NOT_PERMANENT)
return;
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_1);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
/*
* In a parallel build, (only) the leader process performs the 2nd
@@ -2018,7 +2043,7 @@ neon_finish_unlogged_build_phase_1(SMgrRelation reln)
*/
if (IsParallelWorker())
{
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
else
@@ -2039,11 +2064,11 @@ neon_end_unlogged_build(SMgrRelation reln)
{
NRelFileInfoBackend rinfob = InfoBFromSMgrRel(reln);
Assert(unlogged_build_rel == reln);
Assert(RelFileInfoEquals(unlogged_build_rel_info, InfoFromSMgrRel(reln)));
ereport(SmgrTrace,
(errmsg(NEON_TAG "ending unlogged build of relation %u/%u/%u",
RelFileInfoFmt(InfoFromNInfoB(rinfob)))));
RelFileInfoFmt(unlogged_build_rel_info))));
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_PERMANENT)
{
@@ -2051,7 +2076,6 @@ neon_end_unlogged_build(SMgrRelation reln)
BlockNumber nblocks;
Assert(unlogged_build_phase == UNLOGGED_BUILD_PHASE_2);
Assert(reln->smgr_relpersistence == RELPERSISTENCE_UNLOGGED);
/*
* Update the last-written LSN cache.
@@ -2072,9 +2096,6 @@ neon_end_unlogged_build(SMgrRelation reln)
InfoFromNInfoB(rinfob),
MAIN_FORKNUM);
/* Make the relation look permanent again */
reln->smgr_relpersistence = RELPERSISTENCE_PERMANENT;
/* Remove local copy */
for (int forknum = 0; forknum <= MAX_FORKNUM; forknum++)
{
@@ -2083,6 +2104,8 @@ neon_end_unlogged_build(SMgrRelation reln)
forknum);
forget_cached_relsize(InfoFromNInfoB(rinfob), forknum);
lfc_invalidate(InfoFromNInfoB(rinfob), forknum, nblocks);
mdclose(reln, forknum);
#ifndef DEBUG_COMPARE_LOCAL
/* use isRedo == true, so that we drop it immediately */
@@ -2093,7 +2116,7 @@ neon_end_unlogged_build(SMgrRelation reln)
mdunlink(rinfob, INIT_FORKNUM, true);
#endif
}
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
}
@@ -2166,7 +2189,7 @@ AtEOXact_neon(XactEvent event, void *arg)
* Forget about any build we might have had in progress. The local
* file will be unlinked by smgrDoPendingDeletes()
*/
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
break;
@@ -2178,7 +2201,7 @@ AtEOXact_neon(XactEvent event, void *arg)
case XACT_EVENT_PRE_PREPARE:
if (unlogged_build_phase != UNLOGGED_BUILD_NOT_IN_PROGRESS)
{
unlogged_build_rel = NULL;
NRelFileInfoInvalidate(unlogged_build_rel_info);
unlogged_build_phase = UNLOGGED_BUILD_NOT_IN_PROGRESS;
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),

15
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -1145,18 +1145,19 @@ dotenv = ["python-dotenv"]
[[package]]
name = "flask-cors"
version = "5.0.0"
description = "A Flask extension adding a decorator for CORS support"
version = "6.0.0"
description = "A Flask extension simplifying CORS support"
optional = false
python-versions = "*"
python-versions = "<4.0,>=3.9"
groups = ["main"]
files = [
{file = "Flask_Cors-5.0.0-py2.py3-none-any.whl", hash = "sha256:b9e307d082a9261c100d8fb0ba909eec6a228ed1b60a8315fd85f783d61910bc"},
{file = "flask_cors-5.0.0.tar.gz", hash = "sha256:5aadb4b950c4e93745034594d9f3ea6591f734bb3662e16e255ffbf5e89c88ef"},
{file = "flask_cors-6.0.0-py3-none-any.whl", hash = "sha256:6332073356452343a8ccddbfec7befdc3fdd040141fe776ec9b94c262f058657"},
{file = "flask_cors-6.0.0.tar.gz", hash = "sha256:4592c1570246bf7beee96b74bc0adbbfcb1b0318f6ba05c412e8909eceec3393"},
]
[package.dependencies]
Flask = ">=0.9"
flask = ">=0.9"
Werkzeug = ">=0.7"
[[package]]
name = "frozenlist"

View File

@@ -161,8 +161,11 @@ struct ProxyCliArgs {
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// Cancellation channel size (max queue size for redis kv client)
#[clap(long, default_value = "1024")]
#[clap(long, default_value_t = 1024)]
cancellation_ch_size: usize,
/// Cancellation ops batch size for redis
#[clap(long, default_value_t = 8)]
cancellation_batch_size: usize,
/// cache for `allowed_ips` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
allowed_ips_cache: String,
@@ -542,7 +545,12 @@ pub async fn run() -> anyhow::Result<()> {
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
drop(redis_kv_client);

View File

@@ -30,8 +30,6 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
const BATCH_SIZE: usize = 8;
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
@@ -231,12 +229,13 @@ impl CancelReplyOp {
pub async fn handle_cancel_messages(
client: &mut RedisKVClient,
mut rx: mpsc::Receiver<CancelKeyOp>,
batch_size: usize,
) -> anyhow::Result<()> {
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
let mut batch = Vec::with_capacity(batch_size);
let mut pipeline = Pipeline::with_capacity(batch_size);
loop {
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
if rx.recv_many(&mut batch, batch_size).await == 0 {
warn!("shutting down cancellation queue");
break Ok(());
}
@@ -367,8 +366,7 @@ impl CancellationHandler {
return Err(CancelError::InternalError);
};
tx.send_timeout(op, REDIS_SEND_TIMEOUT)
.await
tx.try_send(op)
.map_err(|e| {
tracing::warn!("failed to send GetCancelData for {key}: {e}");
})
@@ -570,7 +568,7 @@ impl Session {
}
// Send the store key op to the cancellation handler and set TTL for the key
pub(crate) async fn write_cancel_key(
pub(crate) fn write_cancel_key(
&self,
cancel_closure: CancelClosure,
) -> Result<(), CancelError> {
@@ -596,14 +594,14 @@ impl Session {
expire: CANCEL_KEY_TTL,
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let _ = tx.try_send(op).map_err(|e| {
let key = self.key;
tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
});
Ok(())
}
pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
let Some(tx) = &self.cancellation_handler.tx else {
tracing::warn!("cancellation handler is not available");
return Err(CancelError::InternalError);
@@ -619,7 +617,7 @@ impl Session {
.guard(RedisMsgKind::HDel),
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let _ = tx.try_send(op).map_err(|e| {
let key = self.key;
tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
});

View File

@@ -244,9 +244,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
session.write_cancel_key(node.cancel_closure.clone())?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -383,9 +383,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
session.write_cancel_key(node.cancel_closure.clone())?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -94,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
}
drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
res
}

View File

@@ -103,7 +103,7 @@ class AbstractNeonCli:
else:
stdout = ""
log.warn(f"CLI timeout: stderr={stderr}, stdout={stdout}")
log.warning(f"CLI timeout: stderr={stderr}, stdout={stdout}")
raise
indent = " "

View File

@@ -19,6 +19,16 @@ TEST_ROLE_NAMES = [
{"name": "role$"},
{"name": "role$$"},
{"name": "role$x$"},
{"name": "x"},
{"name": "xx"},
{"name": "$x"},
{"name": "x$"},
{"name": "$x$"},
{"name": "xx$"},
{"name": "$xx"},
{"name": "$xx$"},
# 63 bytes is the limit for role/DB names in Postgres
{"name": "x" * 63},
]
TEST_DB_NAMES = [
@@ -74,6 +84,43 @@ TEST_DB_NAMES = [
"name": "db name$x$",
"owner": "role$x$",
},
{
"name": "x",
"owner": "x",
},
{
"name": "xx",
"owner": "xx",
},
{
"name": "$x",
"owner": "$x",
},
{
"name": "x$",
"owner": "x$",
},
{
"name": "$x$",
"owner": "$x$",
},
{
"name": "xx$",
"owner": "xx$",
},
{
"name": "$xx",
"owner": "$xx",
},
{
"name": "$xx$",
"owner": "$xx$",
},
# 63 bytes is the limit for role/DB names in Postgres
{
"name": "x" * 63,
"owner": "x" * 63,
},
]
@@ -146,6 +193,10 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
"""
Test that compute_ctl can create and work with databases and roles
with special characters (whitespaces, %, tabs, etc.) in the name.
Also use `drop_subscriptions_before_start: true`. We do not actually
have any subscriptions in this test, so it should be no-op, but it
i) simulates the case when we create a second dev branch together with
a new project creation, and ii) just generally stresses more code paths.
"""
env = neon_simple_env
@@ -159,6 +210,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"spec": {
"skip_pg_catalog_updates": False,
"drop_subscriptions_before_start": True,
"cluster": {
"roles": TEST_ROLE_NAMES,
"databases": TEST_DB_NAMES,
@@ -202,6 +254,7 @@ def test_compute_create_drop_dbs_and_roles(neon_simple_env: NeonEnv):
**{
"spec": {
"skip_pg_catalog_updates": False,
"drop_subscriptions_before_start": True,
"cluster": {
"roles": [],
"databases": [],

View File

@@ -510,7 +510,7 @@ def list_elegible_layers(
except KeyError:
# Unexpected: tests should call this when pageservers are in a quiet state such that the layer map
# matches what's on disk.
log.warn(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
log.warning(f"Lookup {layer_file_name} from {list(visible_map.keys())}")
raise
return list(c for c in candidates if is_visible(c))
@@ -636,7 +636,7 @@ def test_secondary_downloads(neon_env_builder: NeonEnvBuilder):
except:
# On assertion failures, log some details to help with debugging
heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id)
log.warn(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
log.warning(f"heatmap contents: {json.dumps(heatmap, indent=2)}")
raise
# Scrub the remote storage

View File

@@ -1,18 +1,18 @@
{
"v17": [
"17.5",
"e5374b72997b0afc8374137674e873f7a558120a"
"8be779fd3ab9e87206da96a7e4842ef1abf04f44"
],
"v16": [
"16.9",
"15710a76b7d07912110fcbbaf0c8ad6d7e5a9fbc"
"0bf96bd6d70301a0b43b0b3457bb3cf8fb43c198"
],
"v15": [
"15.13",
"daa81cffcf063c54b29a9aabdb6604625f675ad0"
"de7640f55da07512834d5cc40c4b3fb376b5f04f"
],
"v14": [
"14.18",
"4cca6f8083483dda9e12eae292cf788d45bd561f"
"55c0d45abe6467c02084c2192bca117eda6ce1e7"
]
}

View File

@@ -60,7 +60,8 @@ lazy_static = { version = "1", default-features = false, features = ["spin_no_st
libc = { version = "0.2", features = ["extra_traits", "use_std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
memchr = { version = "2" }
nix = { version = "0.26" }
nix-2f80eeee3b1b6c7e = { package = "nix", version = "0.26" }
nix-fa1f6196edfd7249 = { package = "nix", version = "0.30", features = ["dir", "ioctl", "mman", "poll", "signal", "socket"] }
nom = { version = "7" }
num = { version = "0.4" }
num-bigint = { version = "0.4" }